Source code for scheduling.simulation

"""
Simulation Of PGAs Scheduling
-----------------------------
This module provides classes and functions to simulate the scheduling of
Packet Generation Attempts (PGAs) in a quantum network. Each PGA tries to
generate entangled EPR pairs over a specified route within a defined time
window, considering resource availability and link busy times. The function,
`simulate_static`, simulates a static schedule of PGAs and returns performance
metrics, link utilizations, and other relevant data. While the function
`simulate_dynamic` implement a dynamic scheduling approach.
"""

import heapq
import re
import time
from typing import Any, Dict, List, Tuple

import numpy as np
import pandas as pd

from scheduling.routing import (
    compute_path_durations,
    dynamic_routing,
    rerouting,
)
from utils.helper import compute_link_utilization, track_link_waiting

INIT_PGA_RE = re.compile(r"^([A-Za-z]+)(\d+)$")
EPS = 1e-12


[docs] class PGA: def __init__( self, name: str, arrival: float, start: float, end: float, route: List[str], resources: Dict[Tuple[str, str], float], link_busy: Dict[Tuple[str, str], float], link_p_gens: List[float] | np.ndarray, epr_pairs: int, slot_duration: float, rng: np.random.Generator, log: List[Dict[str, Any]], policy: str, p_swap: float, memory: int, deadline: float | None = None, route_links: List[Tuple[str, str]] | None = None, ) -> None: """Packet Generation Attempt (PGA) simulation. A PGA tries to generate EPR pairs over a specified route within a defined time window, considering resource availability and link busy times. The possible outcomes for a PGA: - If the PGA starts but cannot generate the required E2E EPR pairs within its time window, it is marked as "failed". - If the PGA successfully generates the required E2E EPR pairs within its time window, it is marked as "completed". The conditions for EPR generation: - Each link attempts to generate EPR pairs in discrete time slots. - The number of trials needed for a successful EPR pair generation on each link follows a geometric distribution with success probability `p_gen`. - The first success across all links must occur within the memory of the first generated pair to be considered valid. - If there are swaps involved, each swap must also succeed based on the swap probability `p_swap` for the end-to-end entanglement to be successful. Args: name (str): PGA identifier. arrival (float): Arrival time of the PGA in the simulation. start (float): Start time of the PGA in the simulation. end (float): End time of the PGA in the simulation. route (List[str]): List of nodes in the PGA's route. resources (Dict[Tuple[str, str], float]): Dictionary tracking when undirected links become free. link_busy (Dict[Tuple[str, str], float]): Dictionary to track busy time of links. link_p_gens (List[float] | np.ndarray): Per-link probabilities of generating an EPR pair in a single trial, in route_links order. epr_pairs (int): Number of EPR pairs to generate for this PGA. slot_duration (float): Duration of a time slot for EPR generation. rng (np.random.Generator): Random number generator for probabilistic events. log (List[Dict[str, Any]]): Log to record PGA performance metrics. policy (str): Scheduling policy for the PGA, either "best_effort" or "deadline". If "deadline", the PGA will attempt to complete within the maximum burst time defined in durations. p_swap (float): Probability of swapping an EPR pair. memory (int): Memory: number of independent link-generation trials per slot. deadline (float, optional): Deadline time for the PGA. Defaults to None, which means no deadline. """ self.name = name self.arrival = float(arrival) self.start = float(start) self.end = float(end) self.route = route self.resources = resources self.link_busy = link_busy self.epr_pairs = int(epr_pairs) self.slot_duration = float(slot_duration) self.rng = rng self.log = log self.policy = policy self.deadline = None if deadline is None else float(deadline) self.links = route_links self.n_swap = max(0, len(self.route) - 2) self.p_swap = float(p_swap) self.memory = max(0, int(memory)) self.link_p_gens = np.asarray(link_p_gens, dtype=float) def _simulate_e2e_attempts(self, max_attempts: int) -> np.ndarray: """Single end-to-end entanglement for a batch of attempts.""" if self.memory <= 0 or max_attempts <= 0: return np.zeros(max_attempts, dtype=bool) n_links = self.n_swap + 1 t_mem = self.memory size = (max_attempts, n_links) if np.any(self.link_p_gens <= 0.0): return np.zeros(max_attempts, dtype=bool) starts = self.rng.geometric(self.link_p_gens, size=size) - 1 ends = starts + (t_mem - 1) candidate = starts.max(axis=1) last_valid = ends.min(axis=1) succ = (candidate < t_mem) & (last_valid >= candidate) if self.n_swap > 0: if self.p_swap < 1.0: p_bsms = self.p_swap**self.n_swap swap_ok = self.rng.random(max_attempts) < p_bsms succ &= swap_ok return succ def _update_resources_and_links( self, completion: float, attempts_run: int, ) -> None: """Mark nodes and links busy through ``completion``.""" for link in self.links: self.resources[link] = max( self.resources.get(link, 0.0), completion ) if attempts_run > 0 and self.links: busy = attempts_run * self.slot_duration for link in self.links: self.link_busy[link] = self.link_busy.get(link, 0.0) + busy
[docs] def run(self) -> Dict[str, Any]: attempts_run = 0 pairs_generated = 0 wait_until = 0.0 blocking_links = [] for link in self.links: lk_b_t = self.resources.get(link, 0.0) if lk_b_t > wait_until: wait_until = lk_b_t for link in self.links: lk_b_t = self.resources.get(link, 0.0) if abs(lk_b_t - wait_until) < EPS: blocking_links.append(link) current_time = self.start diff = self.end - self.start t_budget = diff if diff > 0.0 else 0.0 status = "failed" if t_budget > EPS and self.policy == "deadline": max_attempts = int((t_budget + EPS) // self.slot_duration) succ = self._simulate_e2e_attempts(max_attempts) csum = ( np.cumsum(succ, dtype=int) if len(succ) else np.array([], dtype=int) ) hit = ( np.searchsorted(csum, self.epr_pairs, side="left") if len(csum) else len(csum) ) if len(csum) and hit < len(csum): attempts_run = int(hit + 1) pairs_generated = int(csum[attempts_run - 1]) status = "completed" else: attempts_run = max_attempts pairs_generated = int(csum[-1]) if len(csum) else 0 current_time = self.start + attempts_run * self.slot_duration completion = ( current_time if current_time < self.end else self.end ) self._update_resources_and_links(completion, attempts_run) else: completion = self.start burst = completion - self.start turnaround = max(0.0, completion - self.arrival) waiting = max(0.0, turnaround - burst) result = { "pga": self.name, "arrival_time": self.arrival, "start_time": self.start, "burst_time": burst, "completion_time": completion, "turnaround_time": turnaround, "waiting_time": waiting, "pairs_generated": pairs_generated, "status": status, "deadline": self.deadline if self.policy == "deadline" else None, "blocking_links": blocking_links, } self.log.append(result) return result
[docs] def simulate_dynamic( app_specs: Dict[str, Dict[str, Any]], durations: Dict[str, float], pga_parameters: Dict[str, Dict[str, float]], pga_rel_times: Dict[str, float], pga_network_paths: Dict[str, List[List[str]]], rng: np.random.Generator, full_dynamic: bool = True, rerouting_mode: bool = False, all_links: List[Tuple[str, str]] | None = None, simple_paths: Dict[str, List[List[str]]] | None = None, static_routing_mode: bool = False, horizon_time: float | None = None, warmup_time: float = 0.0, rng_routing: np.random.Generator | None = None, rng_arrivals: Dict[str, np.random.Generator] | None = None, instance_arrival_rate: float = 10.0, rates: Dict[Tuple[str, str], float] = None, ): log = [] defer_counts = {} pga_release_times = {} pga_names = [] seen_pgas = set() pga_route_links = { app: [ tuple(sorted((u, v))) for u, v in zip(paths[0][:-1], paths[0][1:], strict=False) ] for app, paths in pga_network_paths.items() } resources = {link: 0.0 for link in all_links} link_busy = dict.fromkeys(all_links, 0.0) link_busy_record = dict.fromkeys(all_links, 0.0) link_waiting_routing = ( { link: {"total_waiting_time": 0.0, "pga_waited": 0} for link in all_links } if full_dynamic and simple_paths is not None else None ) link_waiting = { link: {"total_waiting_time": 0.0, "pga_waited": 0} for link in all_links } min_arrival = float("inf") max_completion = 0.0 routing_decision_cpt = 0 routing_decision_runtime = 0.0 deadline_budgets = { app: app_specs[app].get("deadline_budget") for app in app_specs } base_release = {app: pga_rel_times.get(app, 0.0) for app in app_specs} max_instances = { app: max(0, int(app_specs[app].get("instances", 0))) for app in app_specs } release_indices = {app: 0 for app in app_specs} mean_instance_interarrival = 1.0 / instance_arrival_rate poisson_next_release = { app: float(base_release.get(app, 0.0)) for app in app_specs } events_queue = [] ready_queue = [] routing_metadata = {} pga_best = {} rerouting_candidates = {} if full_dynamic and simple_paths is not None: for app in app_specs: _t0 = time.perf_counter() routing_metadata[app] = compute_path_durations( pga_parameters[app], simple_paths=simple_paths, src=app_specs[app]["src"], dst=app_specs[app]["dst"], rates=rates, ) routing_decision_runtime += time.perf_counter() - _t0 min_fid = app_specs[app].get("min_fidelity", 0.0) feasible_durs = [ dur for fid, _, _, dur in routing_metadata[app] if fid >= min_fid ] pga_best[app] = ( min(feasible_durs) if feasible_durs else float("nan") ) elif rerouting_mode: for app, app_paths in pga_network_paths.items(): _t0 = time.perf_counter() rerouting_candidates[app] = compute_path_durations( pga_parameters[app], provisioned_paths=app_paths, rates=rates, ) routing_decision_runtime += time.perf_counter() - _t0 def enqueue_release(app: str) -> None: idx = release_indices[app] deadline_budget = deadline_budgets[app] if max_instances[app] > 0 and idx >= max_instances[app]: return release = poisson_next_release[app] _rng_arr = ( rng_arrivals.get(app) if rng_arrivals is not None else None ) or rng poisson_next_release[app] = release + _rng_arr.exponential( mean_instance_interarrival ) if release >= horizon_time: return deadline = release + deadline_budget heapq.heappush( events_queue, (release, deadline, release, app, idx, release, "release"), ) release_indices[app] += 1 for app in app_specs: enqueue_release(app) cur_t = 0.0 while events_queue or ready_queue: if not ready_queue: cur_t = events_queue[0][0] if cur_t >= horizon_time: break while events_queue and events_queue[0][0] <= cur_t + EPS: ( event_time, deadline, arrival_time, app, i, ready_time, event_type, ) = heapq.heappop(events_queue) if event_type == "release": enqueue_release(app) if event_time >= horizon_time: continue heapq.heappush( ready_queue, (deadline, ready_time, arrival_time, app, i, event_time), ) if not ready_queue: continue while ready_queue: deadline, rdy_t, arrival_time, app, i, _ = heapq.heappop( ready_queue ) at = float(arrival_time) if at < min_arrival: min_arrival = at pga_name = f"{app}{i}" if pga_name not in seen_pgas: seen_pgas.add(pga_name) pga_names.append(pga_name) pga_release_times[pga_name] = arrival_time routed_fid = np.nan if static_routing_mode: route_links = pga_route_links.get(app, []) selected_path = pga_network_paths[app][0] duration = durations.get(app, 0.0) last_available = max( (resources.get(lk, 0.0) for lk in route_links), default=0.0, ) elif full_dynamic and simple_paths is not None: routing_decision_cpt += 1 _t0 = time.perf_counter() routed, next_avail = dynamic_routing( routing_metadata[app], app_specs[app]["min_fidelity"], deadline, cur_t, resources, ) routing_decision_runtime += time.perf_counter() - _t0 if routed is None: if next_avail is not None: defer_counts[pga_name] = ( defer_counts.get(pga_name, 0) + 1 ) heapq.heappush( events_queue, ( next_avail, deadline, arrival_time, app, i, rdy_t, "resume" ), ) else: log.append({ "pga": pga_name, "arrival_time": arrival_time, "ready_time": rdy_t, "start_time": np.nan, "burst_time": 0.0, "completion_time": cur_t, "turnaround_time": max(0.0, cur_t - arrival_time), "waiting_time": max(0.0, cur_t - rdy_t), "pairs_generated": 0, "status": "drop", "deadline": deadline, }) continue ( selected_path, route_links, last_available, duration, routed_fid, ) = routed else: route_links = pga_route_links.get(app, []) selected_path = pga_network_paths[app][0] duration = durations.get(app, 0.0) last_available = 0.0 for link in route_links: last_available = max( last_available, resources.get(link, 0.0) ) would_drop = last_available + duration > deadline + EPS if ( last_available > cur_t + EPS and rerouting_mode and would_drop ): routing_decision_cpt += 1 _t0 = time.perf_counter() alt_path = rerouting( rerouting_candidates, deadline, cur_t, app, resources, ) routing_decision_runtime += time.perf_counter() - _t0 if alt_path is not None: ( selected_path, route_links, last_available, duration, routed_fid, ) = alt_path _stamp = ( { "e2e_fidelity": routed_fid, "pga_duration": duration, "hops": len(selected_path) - 1, } if (full_dynamic and simple_paths is not None) or rerouting_mode else {} ) if full_dynamic and simple_paths is not None: best = pga_best.get(app, float("nan")) _stamp["routing_efficiency"] = ( best / duration if duration > 0 and not np.isnan(best) else float("nan") ) if last_available > cur_t + EPS: if last_available + duration <= deadline + EPS: defer_counts[pga_name] = ( defer_counts.get(pga_name, 0) + 1 ) heapq.heappush( events_queue, ( last_available, deadline, arrival_time, app, i, rdy_t, "resume", ), ) else: log.append({ "pga": pga_name, "arrival_time": arrival_time, "ready_time": rdy_t, "start_time": np.nan, "burst_time": 0.0, "completion_time": cur_t, "turnaround_time": max(0.0, cur_t - arrival_time), "waiting_time": max(0.0, cur_t - rdy_t), "pairs_generated": 0, "status": "drop", "deadline": deadline, **_stamp, }) continue start_time = cur_t completion = start_time + duration if ( app_specs[app].get("policy") == "deadline" and deadline is not None and start_time + duration > deadline + EPS ): log.append({ "pga": pga_name, "arrival_time": arrival_time, "ready_time": rdy_t, "start_time": np.nan, "burst_time": 0.0, "completion_time": cur_t, "turnaround_time": max(0.0, cur_t - arrival_time), "waiting_time": max(0.0, cur_t - rdy_t), "pairs_generated": 0, "status": "drop", "deadline": deadline, **_stamp, }) continue recording = start_time >= warmup_time link_p_gens = [rates[lk] for lk in route_links] pga = PGA( name=pga_name, arrival=arrival_time, start=start_time, end=completion, route=selected_path, resources=resources, link_busy=link_busy_record if recording else link_busy, link_p_gens=link_p_gens, epr_pairs=int(pga_parameters[app]["epr_pairs"]), slot_duration=pga_parameters[app]["slot_duration"], rng=rng, log=log, policy=app_specs[app].get("policy"), p_swap=pga_parameters[app]["p_swap"], memory=pga_parameters[app]["memory"], deadline=deadline, route_links=route_links, ) result = pga.run() result["ready_time"] = float(rdy_t) result["waiting_time"] = max(0.0, start_time - rdy_t) result.update(_stamp) if link_waiting_routing is not None: track_link_waiting( result.get("waiting_time", 0.0), link_waiting_routing, blocking_links=result.get("blocking_links"), ) if recording: track_link_waiting( result.get("waiting_time", 0.0), link_waiting, blocking_links=result.get("blocking_links"), ) if result["completion_time"] > horizon_time: excess = result["completion_time"] - horizon_time for lk in route_links: if lk in link_busy_record: link_busy_record[lk] = max( 0.0, link_busy_record[lk] - excess ) max_completion = max(max_completion, result["completion_time"]) status = result.get("status", "") if status == "completed": continue df = pd.DataFrame(log) del log link_utilization = compute_link_utilization( link_busy_record, warmup_time, horizon_time, ) return ( df, pga_names, pga_release_times, link_utilization, link_waiting, routing_decision_cpt, routing_decision_runtime, defer_counts, )