Source code for scheduling.main

"""
QNScheduling
============

Overview:
---------
This script simulates the scheduling of quantum network applications. It
generates a set of applications based on a given network configuration,
computes their durations using Packet Generation Attempt (PGA). There are two
scheduling strategies: static scheduling (deprecated) using Earliest Deadline
First (EDF) that precomputes a schedule, and dynamic scheduling that makes
decisions online based on application arrivals. Application releases are drawn
from a Poisson process with rate ``--arrival-rate`` over the observation
horizon. The simulation runs for a fixed time horizon; any pending releases
past the horizon are dropped. Metrics collected over the window [warmup,
horizon].
"""

import argparse
import json
import os
import re
import time

import numpy as np

from scheduling.fidelity import fidelity_bounds_and_paths
from scheduling.pga import compute_durations
from scheduling.routing import find_feasible_path, static_routing
from scheduling.simulation import simulate_dynamic
from utils.graph_generator import fat_tree, generate_waxman_graph
from utils.helper import (
    all_simple_paths,
    app_params_sim,
    generate_n_apps,
    gml_data,
    save_results,
)


[docs] def run_simulation( config: str, arrival_rate: float, inst_range: int, epr_range: tuple[int, int], deadline_range: tuple[float, float], p_packet: float, memory: float, p_swap: float, time_slot_duration: float, seed: int, output_dir: str, instance_arrival_rate: float = 10.0, routing: str = "shortest", save_csv: bool = True, verbose: bool = True, graph: str | None = None, provisioning: bool = True, full_dynamic: bool = True, static_routing_mode: bool = False, windows: tuple[float, float] | None = None, ): """Run the quantum network scheduling simulation. Args: config (yaml or gml): Configuration file path in YAML or GML format. arrival_rate (float): Mean rate lambda for the Poisson arrival process. The number of applications is drawn from this rate over the observation horizon. inst_range (tuple[int, int]): Range (min, max) for the number of instances per application. epr_range (tuple[int, int]): Range (min, max) for the number of EPR pairs to generate per application. deadline_range (tuple[float, float]): Range (min, max) for the relative deadline budget of each application (deadline = release + deadline_budget). p_packet (float): Probability of a packet being generated. memory (float): Memory: number of independent link-generation trials per slot. p_swap (float): Probability of swapping an EPR pair in a single trial. time_slot_duration (float): Duration of a time slot in seconds. seed (int): Random seed for reproducibility of the simulation. output_dir (str): Directory where the results will be saved. windows (tuple[float, float] | None): Post-warm-up observation window as (min_time, max_time). In dynamic mode, max_time is used as the simulation horizon. Returns: tuple[bool, dict]: A tuple containing: - bool: whether the schedule is feasible - dict: summary metrics dictionary """ ss = np.random.SeedSequence(seed) child_seeds = ss.spawn(4) rng, rng_arrivals, rng_routing = ( np.random.default_rng(s) for s in child_seeds[:3] ) ss_app_arrivals = child_seeds[3] # Generate network data and applications based on the configuration file fidelities = {} simple_paths = {} avg_deg = float("nan") diameter = float("nan") rates = None if graph == "waxman": nodes, edges, fidelities, rates, avg_deg, diameter = ( generate_waxman_graph(rng=rng) ) if not nodes or not edges: print("Failed to generate a connected Waxman graph.") return False, {} elif graph == "fat": nodes, edges, fidelities, rates, qpus, diameter = fat_tree() nodes = qpus elif graph == "gml": nodes, edges, distances, fidelities, rates, diameter = gml_data(config) bounds, simple_paths = fidelity_bounds_and_paths( nodes, fidelities, diameter + 2 ) all_links = {tuple(sorted((u, v))) for u, v in edges} # Poisson arrival times for each application over the observation horizon if float(arrival_rate) <= 0.0: raise ValueError("arrival_rate must be positive") windows_max = windows[1] if windows is not None else float("inf") mean_interarrival = 1.0 / float(arrival_rate) arrival_times = [] t = 0.0 while True: t += float(rng_arrivals.exponential(mean_interarrival)) if t > windows_max: break arrival_times.append(t) app_specs = generate_n_apps( nodes, bounds, n_apps=len(arrival_times), inst_range=inst_range, epr_range=epr_range, deadline_range=deadline_range, list_policies=["deadline"], rng=rng, ) pga_rel_times = { app: arrival_times[i] for i, app in enumerate(app_specs.keys()) } rng_arrivals_per_app = { app: np.random.default_rng(s) for app, s in zip( app_specs.keys(), ss_app_arrivals.spawn(len(app_specs)), strict=True, ) } # Find feasible paths for each application based on fidelity/routing mode app_requests = { name: { "src": spec["src"], "dst": spec["dst"], "min_fidelity": spec.get("min_fidelity", 0.0), "instances": spec.get("instances", 0), "epr": spec.get("epr", 0), "deadline_budget": spec.get("deadline_budget", 0.0), "arrival_time": pga_rel_times[name], } for name, spec in app_specs.items() } total_apps = len(app_specs) routing_mode = str(routing) admitted_specs = {} static_routing_time = 0.0 if static_routing_mode: _t0 = time.perf_counter() paths, app_e2e_fidelities = static_routing(app_requests, simple_paths) static_routing_time = time.perf_counter() - _t0 elif full_dynamic: paths = { app: [[req["src"], req["dst"]]] for app, req in app_requests.items() } app_e2e_fidelities = {app: float("nan") for app in app_requests} static_routing_time = 0.0 else: _t0 = time.perf_counter() paths, app_e2e_fidelities = find_feasible_path( edges, simple_paths, app_requests, fidelities, pga_rel_times=pga_rel_times, routing_mode=routing_mode, p_packet=p_packet, memory=memory, p_swap=p_swap, rates=rates, time_slot_duration=time_slot_duration, rng=rng_routing, provisioning=provisioning, ) static_routing_time = time.perf_counter() - _t0 admitted_paths = { app: path_list for app, path_list in paths.items() if path_list } admitted_specs = {app: app_specs[app] for app in admitted_paths.keys()} admitted_apps = len(admitted_specs) if admitted_apps == 0: return False, {} app_e2e_fidelities = { app: app_e2e_fidelities[app] for app in admitted_paths } app_specs = admitted_specs paths = admitted_paths single_path_cpt = 0 two_path_cpt = 0 for spec in app_specs.values(): src, dst = spec["src"], spec["dst"] min_fid = spec.get("min_fidelity", 0.0) feasible_paths = [ (fid, p) for fid, p in all_simple_paths(simple_paths, src, dst) if fid >= min_fid ] if len(feasible_paths) == 1: single_path_cpt += 1 if len(feasible_paths) <= 2: two_path_cpt += 1 single_path_share = single_path_cpt / total_apps if total_apps > 0 else 0.0 two_path_share = two_path_cpt / total_apps if total_apps > 0 else 0.0 initial_paths = {app: path_list[0] for app, path_list in paths.items()} epr_pairs = {name: spec["epr"] for name, spec in app_specs.items()} # Compute durations for each application durations = ( {} if full_dynamic else compute_durations( initial_paths, epr_pairs, p_packet, memory, p_swap, time_slot_duration, rates=rates, ) ) pga_parameters = app_params_sim( initial_paths, app_specs, p_packet, memory, p_swap, time_slot_duration, ) # Run simulation if save_csv: os.makedirs(output_dir, exist_ok=True) routing_decision_cpt = None routing_decision_runtime = None defer_counts = None ( df, pga_names, pga_release_times, link_utilization, link_waiting, routing_decision_cpt, routing_decision_runtime, defer_counts, ) = simulate_dynamic( app_specs, durations, pga_parameters, pga_rel_times, paths, rng, full_dynamic, provisioning, all_links, simple_paths, static_routing_mode, horizon_time=windows[1] if windows is not None else None, warmup_time=windows[0] if windows is not None else 0.0, rng_routing=rng_routing, rng_arrivals=rng_arrivals_per_app, instance_arrival_rate=instance_arrival_rate, rates=rates, ) feasible = True if not full_dynamic: if static_routing_mode: routing_decision_cpt += 1 else: routing_decision_cpt += len(app_specs) # Save results routing_decision_runtime = ( routing_decision_runtime or 0.0 ) + static_routing_time if link_utilization is None: link_utilization = {} if link_waiting is None: link_waiting = {} for link in all_links: link_utilization.setdefault( link, { "busy_time": 0.0, "utilization": 0.0, }, ) link_waiting.setdefault( link, { "total_waiting_time": 0.0, "pga_waited": 0, }, ) summary = save_results( df, pga_names, pga_release_times, app_specs, durations=durations, pga_network_paths=initial_paths, n_edges=len(edges), link_utilization=link_utilization, link_waiting=link_waiting, admitted_apps=admitted_apps, total_apps=total_apps, app_e2e_fidelities=app_e2e_fidelities, single_path_share=single_path_share, two_path_share=two_path_share, avg_deg=avg_deg, output_dir=output_dir, save_csv=save_csv, verbose=verbose, routing_decision_cpt=routing_decision_cpt, routing_decision_runtime=routing_decision_runtime, warmup=windows[0] if windows is not None else None, end_time=windows[1] if windows is not None else None, defer_counts=defer_counts, ) del df return feasible, summary
[docs] def main(): parser = argparse.ArgumentParser( description="Simulate scheduling of quantum network applications" ) parser.add_argument( "--config", "-c", type=str, default="configurations/network/basic/2_equal_paths.gml", help="Path to YAML or GML config", ) parser.add_argument( "--inst", "-i", type=int, default=100, help="Number of instances to generate per application", ) parser.add_argument( "--epr", "-e", type=int, nargs=2, metavar=("MIN", "MAX"), default=[2, 2], help="Maximum number of EPR pairs to generate per application" "(e.g., --epr 1 5)", ) parser.add_argument( "--deadline", "-d", type=float, nargs=2, metavar=("MIN", "MAX"), default=[1.0, 1.0], help="Relative deadline budget per application: deadline = release" " + budget (e.g., --deadline 1.0 5.0)", ) parser.add_argument( "--ppacket", "-pp", type=float, default=0.2, help="Probability of a packet being generated", ) parser.add_argument( "--memory", "-m", type=int, default=1000, help="Number of independent link-generation trials per slot in the" "multiplexed memory", ) parser.add_argument( "--pswap", "-ps", type=float, default=0.6, help="Probability of swapping an EPR pair in a single trial", ) parser.add_argument( "--slot-duration", "-sd", type=float, default=1e-4, help="Duration of a time slot in seconds", ) parser.add_argument( "--arrival-rate", "-ar", type=float, default=1.0, help="Mean arrival rate (lambda) for the Poisson arrival process", ) parser.add_argument( "--routing", "-r", type=str, choices=["shortest", "smallest", "least", "highest"], default="shortest", help=( "Routing: 'shortest' (Dijkstra),'smallest' (smallest bottleneck)," "'least' (least total capacity), or 'highest' (highest " "E2E fidelity)." ), ) parser.add_argument( "--graph", "-g", type=str, choices=["waxman", "fat", "gml"], default=None, help="Graph generator (e.g., 'waxman', 'fat', 'gml')", ) parser.add_argument( "--routing-strategy", "-rs", type=str, choices=["static", "hybrid", "rerouting", "dynamic"], default=None, help="Routing strategy: 'static' (fixed static paths), 'hybrid' (" "static no rerouting), 'rerouting' (static rerouting)," "or 'dynamic' (dynamic routing)", ) parser.add_argument( "--seed", "-s", type=int, default=42, help="Random seed for simulation (optional)", ) parser.add_argument( "--output", "-o", default="results", help="Directory to save results", ) args = parser.parse_args() max_observation_window = 3 * args.inst * 10 * (1.0 / 10.0) warmup = 0.15 * max_observation_window windows = (warmup, max_observation_window) seed_dir = os.path.join(args.output, f"seed_{args.seed}") run_number = 1 pattern = re.compile(r"run(\d+)$") if os.path.isdir(seed_dir): for name in os.listdir(seed_dir): m = pattern.match(name) if m: run_number = max(run_number, int(m.group(1)) + 1) run_dir = os.path.join(seed_dir, f"run{run_number}") t0 = time.perf_counter() feasible, _ = run_simulation( config=args.config, arrival_rate=args.arrival_rate, inst_range=args.inst, epr_range=args.epr, deadline_range=args.deadline, windows=windows, p_packet=args.ppacket, memory=args.memory, p_swap=args.pswap, time_slot_duration=args.slot_duration, seed=args.seed, output_dir=run_dir, routing=args.routing, graph=args.graph, provisioning=args.routing_strategy == "rerouting", full_dynamic=args.routing_strategy == "dynamic", static_routing_mode=args.routing_strategy == "static", ) t1 = time.perf_counter() if not feasible: return runtime = t1 - t0 print(f"Run time: {runtime:.3f} seconds\n") params = { "config": args.config, "arrival_rate": args.arrival_rate, "inst_range": args.inst, "epr_min": args.epr[0], "epr_max": args.epr[1], "deadline_min": args.deadline[0], "deadline_max": args.deadline[1], "windows_min": windows[0], "windows_max": windows[1], "p_packet": args.ppacket, "memory": args.memory, "p_swap": args.pswap, "time_slot_duration": args.slot_duration, "seed": args.seed, "runtime_seconds": runtime, "run_number": run_number, "routing": args.routing, "routing_strategy": args.routing_strategy, } with open(os.path.join(run_dir, "params.json"), "w") as f: json.dump(params, f, indent=2) path_results = os.path.join(run_dir, "pga_results.parquet") path_params = os.path.join(run_dir, "params.json") path_link_util = os.path.join(run_dir, "link_utilization.csv") path_link_wait = os.path.join(run_dir, "link_waiting.csv") print(f"Saved results to: {path_results}") print(f"Saved parameters to: {path_params}") print(f"Saved link utilization to: {path_link_util}") print(f"Saved link waiting metrics to: {path_link_wait}")
if __name__ == "__main__": main()