# Source code for scheduling.pga

"""
Packet Generation Attempt
-------------------------
This module provides functions to calculate the end-to-end probability of
generating EPR pairs in a quantum network, check if the probability of
generating a certain number of EPR pairs exceeds a given threshold, and
compute the duration of a Packet Generation Attempt (PGA) based on these
probabilities.
"""

from functools import lru_cache

from scipy.stats import binom


def probability_e2e(
    n_swap: int, memory: int = 1, p_gen: float = 0.001, p_swap: float = 0.6
) -> float:
    """Return the end-to-end success probability of one attempt on a path.

    A path with ``n_swap`` swaps is treated as ``n_swap + 1`` links. Every
    link must succeed in at least one of its ``memory`` independent
    generation trials, and every one of the ``n_swap`` swaps must succeed.

    Args:
        n_swap (int): Number of swaps performed.
        memory (int, optional): Number of independent link-generation trials
            per slot.
        p_gen (float, optional): Probability of generating an EPR pair in a
            single trial.
        p_swap (float, optional): Probability of swapping an EPR pair in a
            single trial.

    Returns:
        float: End-to-end probability of generating EPR pairs.
    """
    # Complement of "all `memory` trials on this link failed".
    link_ok = 1 - (1 - p_gen) ** memory
    link_count = n_swap + 1
    # (every link succeeds) times (every swap succeeds)
    return link_ok**link_count * p_swap**n_swap
def exceeds_p_packet(n: int, k: int, p_e2e: float, p_packet: float) -> bool:
    """Tell whether ``n`` trials yield at least ``k`` EPR pairs often enough.

    The number of successes is modelled as a binomial variable with ``n``
    trials and per-trial success probability ``p_e2e``.

    Args:
        n (int): Number of trials.
        k (int): Number of successes (number of EPR pairs generated).
        p_e2e (float): Probability of generating an EPR pair end-to-end in a
            single trial.
        p_packet (float): Probability of a packet being generated.

    Returns:
        bool: True if the probability of generating at least k EPR pairs in
        n trials is greater than or equal to p_packet.
    """
    trials = float(n)
    # sf(k - 1) is P(X > k - 1), i.e. P(X >= k) for an integer-valued X.
    p_at_least_k = binom.sf(k - 1, trials, p_e2e)
    return p_at_least_k >= p_packet
@lru_cache(maxsize=None)
def duration_pga(
    p_packet: float,
    epr_pairs: int,
    n_swap: int,
    memory: int = 1,
    p_swap: float = 0.6,
    p_gen: float = 0.001,
    time_slot_duration: float = 1e-4,
) -> float:
    """Calculate the duration of a PGA (Packet Generation Attempt).

    Finds the smallest number of time slots ``n >= epr_pairs`` for which the
    probability of obtaining at least ``epr_pairs`` end-to-end pairs in ``n``
    trials reaches ``p_packet``, and converts it to seconds. Results are
    memoized per argument combination, so all arguments must be hashable.

    Args:
        p_packet (float): Probability of a packet being generated. Must be
            strictly below 1.
        epr_pairs (int): Number of successes (number of EPR pairs generated).
            Must be non-negative.
        n_swap (int): Number of swaps performed.
        memory (int, optional): Number of independent link-generation trials
            per slot.
        p_swap (float, optional): Probability of swapping an EPR pair in a
            single trial.
        p_gen (float, optional): Probability of generating an EPR pair in a
            single trial.
        time_slot_duration (float, optional): Duration of a time slot in
            seconds.

    Returns:
        float: Duration of a PGA in seconds.

    Raises:
        ValueError: If ``p_packet`` is 1.0, above 1.0 or NaN; if ``epr_pairs``
            is negative; or if a positive ``p_packet`` is requested for at
            least one pair while the end-to-end success probability is not
            positive. In all of these cases no finite number of slots can
            satisfy the request and the search below would never converge.
    """
    if p_packet == 1.0:
        raise ValueError(
            "p_packet cannot be 1.0, as it would lead to infinite duration."
        )
    # Written as a negated "<" so that NaN is rejected along with values > 1.
    if not p_packet < 1.0:
        raise ValueError(f"p_packet must be below 1.0, got {p_packet!r}.")
    if epr_pairs < 0:
        raise ValueError(f"epr_pairs must be non-negative, got {epr_pairs!r}.")

    p_e2e = probability_e2e(n_swap, memory, p_gen, p_swap)
    # With no chance of end-to-end success, a positive target probability is
    # unreachable for any number of slots; fail fast instead of doubling the
    # upper bound until the trial count overflows.
    if epr_pairs > 0 and p_packet > 0 and not p_e2e > 0:
        raise ValueError(
            "End-to-end success probability must be positive to reach "
            f"p_packet={p_packet!r}, got {p_e2e!r}."
        )

    # Exponential search: double the upper bound until it satisfies p_packet.
    low = epr_pairs
    high = low
    while not exceeds_p_packet(high, epr_pairs, p_e2e, p_packet):
        high *= 2

    # Binary search for the smallest slot count in [low, high] that suffices.
    while low < high:
        mid = (low + high) // 2
        if exceeds_p_packet(mid, epr_pairs, p_e2e, p_packet):
            high = mid
        else:
            low = mid + 1

    return low * time_slot_duration
[docs] def compute_durations( paths: dict[str, list[str] | None], epr_pairs: dict[str, int], p_packet: float, memory: int, p_swap: float, time_slot_duration: float, rates: dict[tuple, float], ) -> dict[str, float]: """Compute the duration of each application based on the paths and link parameters. Args: paths (dict[str, list[str]]): Paths for each application in the network. epr_pairs (dict[str, int]): Entanglement generation pairs for each application, indicating how many EPR pairs are to be generated. p_packet (float): Probability of a packet being generated. memory (int): Number of independent link-generation trials per slot. p_swap (float): Probability of swapping an EPR pair in a single trial. time_slot_duration (float): Duration of a time slot in seconds. rates (dict[tuple, float]): Per-link p_gen, keyed by sorted-tuple edges. The effective p_gen for a route is the minimum across its edges. Returns: dict[str, float]: A dictionary mapping each application to its total duration, which includes the time taken for probabilistic generation of EPR pairs and the latency based on the distance of the path. """ durations = {} for app, route in paths.items(): if not route: continue length_route = len(route) n_swaps = length_route - 2 if length_route <= 2: n_swaps = 0 effective_p_gen = min( rates[(min(u, v), max(u, v))] for u, v in zip(route[:-1], route[1:], strict=False) ) pga_time = duration_pga( p_packet=p_packet, epr_pairs=epr_pairs[app], n_swap=n_swaps, memory=memory, p_swap=p_swap, p_gen=effective_p_gen, time_slot_duration=time_slot_duration, ) durations[app] = pga_time return durations