# Source code for qns.network.protocol.entanglement_distribution

#    SimQN: a discrete-event simulator for the quantum networks
#    Copyright (C) 2021-2022 Lutong Chen, Jian Li, Kaiping Xue
#    University of Science and Technology of China, USTC.
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.

from typing import Dict, Optional
import uuid

from qns.entity.cchannel.cchannel import ClassicChannel, ClassicPacket, RecvClassicPacket
from qns.entity.memory.memory import QuantumMemory
from qns.entity.node.app import Application
from qns.entity.node.node import QNode
from qns.entity.qchannel.qchannel import QuantumChannel, RecvQubitPacket
from qns.models.core.backend import QuantumModel
from qns.network.requests import Request
from qns.simulator.event import Event, func_to_event
from qns.simulator.simulator import Simulator
from qns.network import QuantumNetwork
from qns.models.epr import WernerStateEntanglement
from qns.simulator.ts import Time
import qns.utils.log as log


class Transmit():
    """Per-node bookkeeping record for one entanglement distribution attempt.

    An instance is stored in ``EntanglementDistributionApp.state`` under the
    transmit id and remembers the end points of the distribution together
    with the names of the EPR pairs that this node keeps for it.

    Args:
        id: identifier shared by every node that takes part in the attempt.
        src: node that initiated the distribution.
        dst: node the entanglement should finally reach.
        first_epr_name: name of the EPR pair received from the previous hop
            (``None`` when there is none, e.g. on the initiating node).
        second_epr_name: name of the EPR pair generated on this node.
    """

    # NOTE: the parameter is called ``id`` (shadowing the builtin) because
    # callers pass it by keyword; renaming it would break them.
    def __init__(self, id: str, src: "QNode", dst: "QNode",
                 first_epr_name: Optional[str] = None,
                 second_epr_name: Optional[str] = None):
        self.id = id
        self.src = src
        self.dst = dst
        self.first_epr_name = first_epr_name
        self.second_epr_name = second_epr_name

    def __repr__(self) -> str:
        # Adjacent literals are concatenated by the parser; a backslash
        # continuation inside the literal would leak stray characters
        # (the continuation line's indentation) into the text.
        return (f"<transmit {self.id}: {self.src} -> {self.dst}, "
                f"epr: {self.first_epr_name}, {self.second_epr_name}>")


class EntanglementDistributionApp(Application):
    """Hop-by-hop entanglement distribution application.

    The sending node periodically generates an EPR pair, stores it in its
    first memory and sends it over the quantum channel to the next hop on the
    route. The receiving node generates a second pair, stores both and
    answers over the classic channel with one of the commands handled in
    :meth:`handle_response`:

    * ``swap``   - sent by the receiver once both pairs are stored,
    * ``next``   - reply to ``swap``; the receiver either finishes (it is the
      destination) or forwards the request to its own next hop,
    * ``succ``   - sent by the destination to the source on success,
    * ``revoke`` - sent when storing failed, so peers can free their memory.

    Args:
        send_rate: number of distribution attempts started per second by a
            sender. May be overridden by the ``send_rate`` attribute of the
            node's first request during :meth:`install`.
        init_fidelity: fidelity given to every freshly generated
            ``WernerStateEntanglement``.
    """

    def __init__(self, send_rate: Optional[int] = None,
                 init_fidelity: float = 0.99):
        super().__init__()
        self.init_fidelity = init_fidelity
        # The following four are filled in by install().
        self.net: Optional[QuantumNetwork] = None
        self.own: Optional[QNode] = None
        self.memory: Optional[QuantumMemory] = None
        self.src: Optional[QNode] = None
        self.dst: Optional[QNode] = None
        self.send_rate: Optional[int] = send_rate

        # transmit id -> Transmit record; finished/aborted entries are None
        self.state: Dict[str, Transmit] = {}
        self.success = []
        self.success_count = 0
        self.send_count = 0

        self.add_handler(self.RecvQubitHandler, [RecvQubitPacket])
        self.add_handler(self.RecvClassicPacketHandler, [RecvClassicPacket])

    def install(self, node: QNode, simulator: Simulator):
        """Bind the application to ``node`` and schedule the first attempt.

        The role of the node is derived from its first request: the request's
        source becomes a sender (``self.dst`` is set), the request's
        destination records ``self.src``. A node without requests keeps both
        unset. Senders get a ``new_distribution`` event at ``simulator.ts``.
        """
        super().install(node, simulator)
        self.own = self._node
        self.memory = self.own.memories[0]
        self.net = self.own.network

        # Only the lookup itself can legitimately raise IndexError.
        try:
            request: Optional[Request] = self.own.requests[0]
        except IndexError:
            request = None

        if request is not None:
            if self.own == request.src:
                self.dst = request.dest
            elif self.own == request.dest:
                self.src = request.src
            # Keep the constructor's send_rate when the request does not
            # carry one; overwriting it with None would make the first
            # `1 / self.send_rate` in new_distribution() fail.
            requested_rate = request.attr.get("send_rate")
            if requested_rate is not None:
                self.send_rate = requested_rate

        if self.dst is not None:
            # I am a sender
            t = simulator.ts
            event = func_to_event(t, self.new_distribution, by=self)
            self._simulator.add_event(event)

    def RecvQubitHandler(self, node: QNode, event: Event):
        """Handler for ``RecvQubitPacket`` events; see response_distribution."""
        self.response_distribution(event)

    def RecvClassicPacketHandler(self, node: QNode, event: Event):
        """Handler for ``RecvClassicPacket`` events; see handle_response."""
        self.handle_response(event)

    def new_distribution(self):
        """Start one distribution attempt and schedule the following one."""
        # insert the next send event
        t = self._simulator.tc + Time(sec=1 / self.send_rate)
        event = func_to_event(t, self.new_distribution, by=self)
        self._simulator.add_event(event)
        log.debug(f"{self.own}: start new request")

        # generate new entanglement
        epr = self.generate_qubit(self.own, self.dst, None)
        log.debug(f"{self.own}: generate epr {epr.name}")

        self.state[epr.transmit_id] = Transmit(
            id=epr.transmit_id,
            src=self.own,
            dst=self.dst,
            second_epr_name=epr.name)
        log.debug(f"{self.own}: generate transmit {self.state[epr.transmit_id]}")

        if not self.memory.write(epr):
            # storing failed: drop the pair and mark the attempt as aborted,
            # request_distrbution() below then returns without sending
            self.memory.read(epr)
            self.state[epr.transmit_id] = None
        self.send_count += 1
        self.request_distrbution(epr.transmit_id)

    def request_distrbution(self, transmit_id: str):
        """Send this node's second EPR pair of ``transmit_id`` to the next hop.

        Does nothing when the transmit record or the stored pair is missing.

        Raises:
            Exception: when no route to the destination or no quantum channel
                to the next hop exists.
        """
        transmit = self.state.get(transmit_id)
        if transmit is None:
            return
        epr_name = transmit.second_epr_name
        epr = self.memory.get(epr_name)
        if epr is None:
            return
        dst = transmit.dst

        # get next hop
        route_result = self.net.query_route(self.own, dst)
        try:
            next_hop: QNode = route_result[0][1]
        except IndexError as err:
            raise Exception("Route error") from err

        qchannel: QuantumChannel = self.own.get_qchannel(next_hop)
        if qchannel is None:
            raise Exception("No such quantum channel")

        # send the entanglement
        log.debug(f"{self.own}: send epr {epr.name} to {next_hop}")
        qchannel.send(epr, next_hop)

    def response_distribution(self, packet: RecvQubitPacket):
        """React to an EPR pair arriving over a quantum channel.

        A second pair is generated and both are written to memory. On success
        a ``swap`` command is sent back to the sending neighbour; if either
        write fails both pairs are removed again and ``revoke`` is sent.

        Raises:
            Exception: when there is no classic channel to the neighbour.
        """
        qchannel: QuantumChannel = packet.qchannel
        # the neighbour is whichever end of the channel is not this node
        from_node: QNode = qchannel.node_list[0] \
            if qchannel.node_list[1] == self.own else qchannel.node_list[1]

        cchannel: ClassicChannel = self.own.get_cchannel(from_node)
        if cchannel is None:
            raise Exception("No such classic channel")

        # receive the first epr
        epr: WernerStateEntanglement = packet.qubit
        log.debug(f"{self.own}: recv epr {epr.name} from {from_node}")

        # generate the second epr
        next_epr = self.generate_qubit(
            src=epr.src, dst=epr.dst, transmit_id=epr.transmit_id)
        log.debug(f"{self.own}: generate epr {next_epr.name}")
        self.state[epr.transmit_id] = Transmit(
            id=epr.transmit_id,
            src=epr.src,
            dst=epr.dst,
            first_epr_name=epr.name,
            second_epr_name=next_epr.name)
        log.debug(
            f"{self.own}: generate transmit {self.state[epr.transmit_id]}")

        log.debug(f"{self.own}: store {epr.name} and {next_epr.name}")
        ret1 = self.memory.write(epr)
        ret2 = self.memory.write(next_epr)
        if not ret1 or not ret2:
            log.debug(f"{self.own}: store fail, destory {epr} and {next_epr}")
            # if failed (memory is full), destroy all entanglements
            self.memory.read(epr)
            self.memory.read(next_epr)
            # mark the attempt as aborted, as new_distribution() does, so no
            # record pointing at removed pairs is left behind
            self.state[epr.transmit_id] = None
            classic_packet = ClassicPacket(
                msg={"cmd": "revoke", "transmit_id": epr.transmit_id},
                src=self.own, dest=from_node)
            cchannel.send(classic_packet, next_hop=from_node)
            log.debug(f"{self.own}: send {classic_packet.msg} to {from_node}")
            return

        classic_packet = ClassicPacket(
            msg={"cmd": "swap", "transmit_id": epr.transmit_id},
            src=self.own, dest=from_node)
        cchannel.send(classic_packet, next_hop=from_node)
        log.debug(
            f"{self.own}: send {classic_packet.msg} from {self.own} to {from_node}")

    def handle_response(self, packet: RecvClassicPacket):
        """Process a ``swap``/``next``/``succ``/``revoke`` control message.

        Messages that refer to a transmit id this node has no live record for
        are logged and ignored.
        """
        msg = packet.packet.get()
        cchannel = packet.cchannel

        # the neighbour is whichever end of the channel is not this node
        from_node: QNode = cchannel.node_list[0] \
            if cchannel.node_list[1] == self.own else cchannel.node_list[1]

        log.debug(f"{self.own}: recv {msg} from {from_node}")

        cmd = msg["cmd"]
        transmit_id = msg["transmit_id"]
        transmit = self.state.get(transmit_id)
        if transmit is None:
            # Unknown or already finished/aborted attempt (entries are reset
            # to None): every branch below needs the record, so stop here
            # instead of failing with an AttributeError.
            log.debug(
                f"{self.own}: ignore {cmd} for unknown transmit {transmit_id}")
            return

        if cmd == "swap":
            if self.own != transmit.src:
                # perform entanglement swapping
                first_epr: WernerStateEntanglement = self.memory.read(
                    transmit.first_epr_name)
                second_epr: WernerStateEntanglement = self.memory.read(
                    transmit.second_epr_name)
                new_epr = first_epr.swapping(second_epr, name=uuid.uuid4().hex)
                log.debug(
                    f"{self.own}:perform swap use {first_epr} and {second_epr}")
                log.debug(f"{self.own}:perform swap generate {new_epr}")

                # the swapped pair replaces the source's second pair ...
                src: QNode = transmit.src
                app: EntanglementDistributionApp = src.get_apps(
                    EntanglementDistributionApp)[0]
                app.set_second_epr(new_epr, transmit_id=transmit_id)

                # ... and the first pair of the neighbour that asked for it
                app: EntanglementDistributionApp = from_node.get_apps(
                    EntanglementDistributionApp)[0]
                app.set_first_epr(new_epr, transmit_id=transmit_id)

            classic_packet = ClassicPacket(
                msg={"cmd": "next", "transmit_id": transmit_id},
                src=self.own, dest=from_node)
            cchannel.send(classic_packet, next_hop=from_node)
            log.debug(f"{self.own}: send {classic_packet.msg} to {from_node}")
        elif cmd == "next":
            # finish or request to the next hop
            if self.own == transmit.dst:
                result_epr = self.memory.read(transmit.first_epr_name)
                self.memory.read(transmit.second_epr_name)
                self.success.append(result_epr)
                self.state[transmit_id] = None
                self.success_count += 1
                log.debug(f"{self.own}: successful distribute {result_epr}")

                classic_packet = ClassicPacket(
                    msg={"cmd": "succ", "transmit_id": transmit_id},
                    src=self.own, dest=transmit.src)
                cchannel = self.own.get_cchannel(transmit.src)
                if cchannel is not None:
                    # the packet goes to the source, not to from_node
                    log.debug(
                        f"{self.own}: send {classic_packet} to {transmit.src}")
                    cchannel.send(classic_packet, next_hop=transmit.src)
            else:
                log.debug(f"{self.own}: begin new request {transmit_id}")
                self.request_distrbution(transmit_id)
        elif cmd == "succ":
            # the source is notified that the distribution succeeded
            result_epr = self.memory.read(transmit.second_epr_name)
            log.debug(f"{self.own}: recv success distribution {result_epr}")
            self.state[transmit_id] = None
            self.success_count += 1
        elif cmd == "revoke":
            # clean memory
            log.debug(
                f"{self.own}: clean memory {transmit.first_epr_name} "
                f"and {transmit.second_epr_name}")
            self.memory.read(transmit.first_epr_name)
            self.memory.read(transmit.second_epr_name)
            self.state[transmit_id] = None
            if self.own != transmit.src:
                classic_packet = ClassicPacket(
                    msg={"cmd": "revoke", "transmit_id": transmit_id},
                    src=self.own, dest=transmit.src)
                cchannel = self.own.get_cchannel(transmit.src)
                if cchannel is not None:
                    # the packet goes to the source, not to from_node
                    log.debug(
                        f"{self.own}: send {classic_packet} to {transmit.src}")
                    cchannel.send(classic_packet, next_hop=transmit.src)

    def generate_qubit(self, src: QNode, dst: QNode,
                       transmit_id: Optional[str] = None) -> QuantumModel:
        """Create a ``WernerStateEntanglement`` tagged with routing metadata.

        Args:
            src: stored on the pair as ``epr.src``.
            dst: stored on the pair as ``epr.dst``.
            transmit_id: stored as ``epr.transmit_id``; a fresh random hex id
                is generated when omitted.

        Returns:
            The new pair, with fidelity ``self.init_fidelity`` and a random
            hex name.
        """
        epr = WernerStateEntanglement(
            fidelity=self.init_fidelity, name=uuid.uuid4().hex)
        epr.src = src
        epr.dst = dst
        epr.transmit_id = transmit_id if transmit_id is not None else uuid.uuid4().hex
        return epr

    def set_first_epr(self, epr: QuantumModel, transmit_id: str):
        """Replace the stored first EPR pair of ``transmit_id`` with ``epr``.

        No-op when the transmit is unknown or has no first pair.
        """
        transmit = self.state.get(transmit_id, None)
        if transmit is None or transmit.first_epr_name is None:
            return
        self.memory.read(transmit.first_epr_name)
        self.memory.write(epr)
        transmit.first_epr_name = epr.name

    def set_second_epr(self, epr: QuantumModel, transmit_id: str):
        """Replace the stored second EPR pair of ``transmit_id`` with ``epr``.

        No-op when the transmit is unknown or has no second pair.
        """
        transmit = self.state.get(transmit_id, None)
        if transmit is None or transmit.second_epr_name is None:
            return
        self.memory.read(transmit.second_epr_name)
        self.memory.write(epr)
        transmit.second_epr_name = epr.name