| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- import random as rd
- import netsquid as ns
- import numpy as np
- from netsquid.protocols import NodeProtocol
- from netsquid.qubits import QFormalism, ketstates
- from netsquid.qubits import operators as ops
- from netsquid.qubits import qubitapi
- from netsquid.qubits.cliffords import local_cliffords
- from netsquid.qubits.operators import Operator
- from netsquid.qubits.qubitapi import create_qubits, measure, operate
- from scipy.optimize import curve_fit
- CLIFFORD_OPERATORS = [Operator(op.name, op.arr) for op in local_cliffords]
- ns.set_qstate_formalism(QFormalism.DM)
- def EXP(x, p, A):
- return A * p**(2 * x)
- def REGRESSION(bounces, mean_bm):
- popt_AB, pcov_AB = curve_fit(EXP, bounces, mean_bm, p0=[0.9, 0.5], maxfev=100000)
- # print("poptAB", popt_AB[0])
- return [popt_AB[0], popt_AB[1]]
- def GET_FIDELITY(info_qubit, gates):
- [ref_qubit1, ref_qubit2] = create_qubits(2)
- # qubitapi.assign_qstate(ref_qubit1, ketstates.s1)
- # qubitapi.assign_qstate(ref_qubit2, ketstates.s1)
- operate(ref_qubit2, ops.X)
- for gate_instr in gates:
- operate(ref_qubit1, gate_instr)
- operate(ref_qubit2, gate_instr)
- fidelity = qubitapi.exp_value(
- info_qubit, ops.Operator("ref", (ns.qubits.reduced_dm(ref_qubit1) - ns.qubits.reduced_dm(ref_qubit2)) / 2))
- # fidelity = abs(fidelity)
- # if fidelity < 0:
- # print("NEGATIVE", fidelity)
- # # print("A:", ns.qubits.reduced_dm(ref_qubit1))
- # # print("B:", ns.qubits.reduced_dm(ref_qubit2))
- # # print("Operator:", (ns.qubits.reduced_dm(ref_qubit1) - ns.qubits.reduced_dm(ref_qubit2)) / 2)
- # # Add Gaussian noise to simulate measurement noise, but we need to make sure fidelity is within a valid range
- # if fidelity < 0.005:
- # # If the fidelity is too small, adding noise will likely to make it negative, so we skip adding noise
- # return fidelity
- # while True:
- # # measure_error = np.random.normal(0, np.sqrt(((1 + fidelity) * (1 - fidelity))) / np.sqrt(1000))
- # noisy_fidelity = fidelity + np.random.normal(0, 0.015)
- # if noisy_fidelity >= 0 and noisy_fidelity <= 1:
- # break
- noisy_fidelity = fidelity + np.random.normal(0, 0.015)
- return noisy_fidelity
- def teleport(epr_qubit, info_qubit):
- """Perform teleportation and return two classical bits."""
- operate([epr_qubit, info_qubit], ns.CNOT)
- operate(epr_qubit, ns.H)
- m1, _ = measure(epr_qubit)
- m2, _ = measure(info_qubit)
- return [m1, m2]
- def correction(epr_qubit, measurement_results):
- """Perform correction to recover the information qubit."""
- if measurement_results[0]:
- operate(epr_qubit, ns.Z)
- if measurement_results[1]:
- operate(epr_qubit, ns.X)
- return epr_qubit
- class NBProtocolAlice(NodeProtocol):
- # bounce: bounce number, type: list
- # num_samples: repetition times for each bounce, type: dict bounce: times
- def __init__(self, node, bounce=[], num_samples={}, qconn=None):
- super().__init__(node)
- self._qconn = qconn
- if isinstance(bounce, list):
- self._bounce_list = bounce
- elif isinstance(bounce, int):
- self._bounce_list = [bounce]
- self._num_samples = num_samples
- self._gates = [] # record the clifford operations we used.
- self._data_record = {}
- self._target_protocol = None
- self._cost = 0 # The totoal number of bounces
- self.add_signal("ALICE_MEASUREMENT_READY")
- self.add_signal("BOB_MEASUREMENT_READY")
- self.add_signal("ENTANGLEMENT_READY")
- def set_target_protocol(self, bob_protocol):
- self._target_protocol = bob_protocol
- def request_ERP(self):
- """Generate an EPR pair by triggering the quantum source of the quantum channel."""
- self._qconn.subcomponents["qsource"].ports["trigger"].tx_input("trigger")
- yield self.await_timer(100) # Wait for Alice and Bob to receive and store their qubits
- def run(self):
- for current_bounce in self._bounce_list:
- current_max_sample = self._num_samples[current_bounce]
- if current_bounce not in self._data_record:
- self._data_record[current_bounce] = []
- # print("current bounce:", current_bounce, "bounce_list:", self._bounce_list)
- for current_sample in range(current_max_sample):
- # print("current sample:", current_sample)
- self._gates.clear()
- info_qubit = create_qubits(1)[0]
- # qubitapi.assign_qstate(info_qubit, ketstates.s1)
- for _ in range(current_bounce):
- # Start one bounce
- # clifford operation to info qubit
- instr = rd.choice(CLIFFORD_OPERATORS)
- self._gates.append(instr)
- operate(info_qubit, instr)
- # Request an ERP pair
- self._qconn.subcomponents["qsource"].ports["trigger"].tx_input("trigger")
- yield self.await_timer(100) # Wait for Alice and Bob to receive and store their qubits
- # Extract EPR pair
- # print(f"At {ns.sim_time()} {self.node} get one EPR pair and starts teleportation")
- epr_qubit = self.node.qmemory.pop(0)[0]
- # print('At', ns.sim_time(), "alice's epr pair", epr_qubit.qstate.qrepr)
- # Teleport info qubit to Bob using the EPR pair
- measurement_results = teleport(epr_qubit, info_qubit)
- # msg = MeasurementResult(measurement_results)
- self.send_signal("ALICE_MEASUREMENT_READY", result=measurement_results)
- self._qconn.subcomponents["qsource"].ports["trigger"].tx_input("trigger")
- yield self.await_timer(100) # Wait for Alice and Bob to receive and store their qubits
- # epr_qubit = self.node.qmemory.pop(entanglement.source_position)[0]
- epr_qubit = self.node.qmemory.pop(0)[0]
- self.send_signal("ENTANGLEMENT_READY")
- # print('At', ns.sim_time(), "alice's epr pair", epr_qubit.qstate.qrepr)
- yield self.await_signal(self._target_protocol, "BOB_MEASUREMENT_READY")
- measurement_results, instrfrombob = self._target_protocol.get_signal_result("BOB_MEASUREMENT_READY")
- self._gates.append(instrfrombob)
- info_qubit = correction(epr_qubit, measurement_results)
- self._cost += 1 # Finish one bounce
- fidelity = GET_FIDELITY(info_qubit, self._gates)
- self._data_record[current_bounce].append(fidelity)
- # print(f"Finished bounce {current_bounce}")
- # result = self.data_processing()
- # print(f"Estimated fidelity: {result}, cost: {self._cost}")
- def data_processing(self):
- raw_data = self._data_record
- bounces = list(raw_data.keys())
- mean_values = [np.mean(raw_data[key]) for key in bounces]
- # if sorted(mean_values)[0] < 0:
- # print("Some mean value is negative,", mean_values)
- # print('bounces:', bounces)
- # print("mean_values:", mean_values)
- p, _ = REGRESSION(bounces, mean_values)
- return p, self._cost
- def return_data(self):
- raw_data = self._data_record
- print(raw_data)
- # print()
- bounces = list(raw_data.keys())
- mean_values = [np.mean(raw_data[key]) for key in bounces]
- # assert len(mean_values) == len(self._bounce_list)
- # print('bounces:', bounces)
- # print("mean_values:",mean_values)
- return mean_values
- class NBProtocolBob(NodeProtocol):
- def __init__(self, node):
- super().__init__(node)
- self.add_signal("ALICE_MEASUREMENT_READY")
- self.add_signal("BOB_MEASUREMENT_READY")
- self.add_signal("ENTANGLEMENT_READY")
- def set_target_protocol(self, alice_protocol):
- self._target_protocol = alice_protocol
- def run(self):
- while True:
- yield self.await_signal(self._target_protocol, signal_label="ALICE_MEASUREMENT_READY")
- measurement_results_from_target = self._target_protocol.get_signal_result("ALICE_MEASUREMENT_READY")
- # entanglement = measurement_result.entanglement
- epr_qubit = self.node.qmemory.pop(0)[0]
- # measurement_results_from_target = measurement_result.measurement_results
- info_qubit = correction(epr_qubit, measurement_results_from_target)
- yield self.await_signal(self._target_protocol, "ENTANGLEMENT_READY")
- # entanglement = self._target_protocol.get_signal_result("ENTANGLEMENT_READY")
- epr_qubit = self.node.qmemory.pop(0)[0]
- # teleport the qubit to the next node Bob
- instr = rd.choice(CLIFFORD_OPERATORS)
- operate(info_qubit, instr)
- measurement_results = teleport(epr_qubit, info_qubit)
- self.send_signal("BOB_MEASUREMENT_READY", result=[measurement_results, instr])
|