from collections import defaultdict
from typing import Any, Dict, List, Tuple
import networkx as nx
import numpy as np
from scheduling.pga import duration_pga
from utils.helper import all_simple_paths
EPS = 1e-12
[docs]
def shortest_paths(
edges: List[Tuple[str, str]], app_requests: Dict[str, Dict[str, Any]]
) -> Dict[str, List[str]]:
"""Find shortest paths for each applications in a quantum network graph
represented by edges.
Args:
edges (List[Tuple[str, str]]): List of edges in the quantum network,
where each edge is a tuple of nodes (src, dst).
app_requests (Dict[str, Dict[str, Any]]): A dictionary where keys are
application names and values are dictionaries containing source and
destination nodes (src, dst) for the application. For example:
{
'A': {'src': 'Alice', 'dst': 'Bob'},
'B': {'src': 'Alice', 'dst': 'Charlie'},
'C': {'src': 'Charlie', 'dst': 'David'},
'D': {'src': 'Bob', 'dst': 'David'}
}
Returns:
Dict[str, List[str]]: A dictionary where keys are application names and
values are lists of nodes representing the shortest path from source to
destination for that application.
"""
G = nx.Graph()
G.add_edges_from(edges)
return {
application: [nx.shortest_path(G, req["src"], req["dst"])]
for application, req in app_requests.items()
}
def _compute_delta_and_links(
path: List[str],
req: Dict[str, Any],
p_packet: float | None,
memory: int,
p_swap: float,
rates: Dict[Tuple[str, str], float],
time_slot_duration: float,
) -> Tuple[float, List[Tuple[str, str]]]:
n_swaps = max(0, len(path) - 2)
links = [
(min(u, v), max(u, v))
for u, v in zip(path[:-1], path[1:], strict=False)
]
effective_p_gen = min(rates[lk] for lk in links)
pga_duration = duration_pga(
p_packet=p_packet,
epr_pairs=req["epr"],
n_swap=n_swaps,
memory=memory,
p_swap=p_swap,
p_gen=effective_p_gen,
time_slot_duration=time_slot_duration,
)
delta = float(pga_duration) / float(req["deadline_budget"])
return delta, links
[docs]
def smallest_bottleneck(
simple_paths: Dict[Tuple[str, str], List[List[str]]],
src: str,
dst: str,
req: Dict[str, Any],
cap: Dict[Tuple[str, str], float],
p_packet: float | None,
memory: int,
p_swap: float,
rates: Dict[Tuple[str, str], float],
time_slot_duration: float,
k: int | None = None,
) -> Tuple[List[List[str]], float, float]:
candidates = []
for e2e_fid, path in all_simple_paths(simple_paths, src, dst):
if e2e_fid < req["min_fidelity"]:
continue
delta, links = _compute_delta_and_links(
path, req, p_packet, memory, p_swap, rates, time_slot_duration
)
post_loads = [cap[lk] + delta for lk in links]
bottleneck = max(post_loads)
candidates.append((bottleneck, path, delta, e2e_fid))
if not candidates:
return [], 0.0, float("nan")
candidates.sort(key=lambda x: x[0])
if k is not None:
candidates = candidates[:k]
selected = candidates[0]
result = [list(c[1]) for c in candidates]
return result, selected[2], selected[3]
[docs]
def least_capacity(
simple_paths: Dict[Tuple[str, str], List[List[str]]],
src: str,
dst: str,
req: Dict[str, Any],
cap: Dict[Tuple[str, str], float],
p_packet: float | None,
memory: int,
p_swap: float,
rates: Dict[Tuple[str, str], float],
time_slot_duration: float,
rng: np.random.Generator | None = None,
provisioning: bool = True,
) -> Tuple[List[List[str]], float, float]:
"""Select the path with the least total capacity utilization among all
paths that meet the fidelity requirement. The total capacity utilization of
a path is defined as the sum of the capacity utilizations of all edges
along the path after accounting for the additional load (delta) introduced
by the new request.
"""
min_val = None
tied_candidates = []
all_paths = all_simple_paths(simple_paths, src, dst)
for path in all_paths:
e2e_fid, path = path[0], path[1]
if e2e_fid < req["min_fidelity"]:
continue
delta, links = _compute_delta_and_links(
path, req, p_packet, memory, p_swap, rates, time_slot_duration
)
sum_cap = sum(cap[lk] + delta for lk in links)
if min_val is None or sum_cap < min_val:
min_val = sum_cap
tied_candidates = [(path, delta, e2e_fid)]
elif np.isclose(sum_cap, min_val):
tied_candidates.append((path, delta, e2e_fid))
if not tied_candidates:
return [], 0.0, float("nan")
initial_idx = (
0
if rng is None or len(tied_candidates) == 1
else int(rng.integers(len(tied_candidates)))
)
selected_path, selected_delta, selected_e2e_fid = (
tied_candidates[initial_idx]
)
if provisioning:
result = [list(selected_path)] + [
list(p)
for i, (p, _, _) in enumerate(tied_candidates)
if i != initial_idx
]
else:
result = [list(selected_path)]
return result, selected_delta, selected_e2e_fid
[docs]
def fidelity_shortest(
simple_paths: Dict[Tuple[str, str], List[List[str]]],
src: str,
dst: str,
min_fidelity: float,
rng: np.random.Generator,
provisioning: bool = True,
) -> Tuple[List[List[str]], float]:
candidate_paths = []
shortest_length = None
for path in all_simple_paths(simple_paths, src, dst):
e2e_fid, path_nodes = path[0], path[1]
if e2e_fid < min_fidelity:
continue
path_len = len(path_nodes)
if shortest_length is None:
shortest_length = path_len
elif path_len > shortest_length:
break
candidate_paths.append((e2e_fid, list(path_nodes)))
if not candidate_paths:
return [], float("nan")
initial_idx = int(rng.integers(len(candidate_paths)))
initial_fid, initial_path = candidate_paths[initial_idx]
if provisioning:
result = [initial_path] + [
p for i, (_, p) in enumerate(candidate_paths) if i != initial_idx
]
else:
result = [initial_path]
return result, initial_fid
[docs]
def highest_fidelity(
simple_paths: Dict[Tuple[str, str], List[List[str]]],
src: str,
dst: str,
min_fidelity: float,
rng: np.random.Generator,
provisioning: bool = True,
) -> Tuple[List[List[str]], float]:
"""Select the path with the highest E2E fidelity among all paths that
meet the minimum fidelity requirement. Ties are broken randomly.
Args:
simple_paths: Pre-computed (fidelity, path) pairs keyed by node pair.
src: Source node.
dst: Destination node.
min_fidelity: Minimum acceptable E2E fidelity.
rng: Random number generator for tie-breaking.
provisioning: If True, return selected path plus all tied candidates;
otherwise return only the selected path.
Returns:
Tuple of (list of paths with selected first, e2e fidelity or nan).
"""
best_fid = float("nan")
tied_candidates = []
all_paths = all_simple_paths(simple_paths, src, dst)
for path in all_paths:
e2e_fid = path[0]
if e2e_fid < min_fidelity:
continue
if not tied_candidates or e2e_fid > best_fid:
best_fid = e2e_fid
tied_candidates = [list(path[1])]
elif np.isclose(e2e_fid, best_fid):
tied_candidates.append(list(path[1]))
if not tied_candidates:
return [], float("nan")
initial_idx = (
0
if len(tied_candidates) == 1
else int(rng.integers(len(tied_candidates)))
)
if provisioning:
result = [tied_candidates[initial_idx]] + [
p for i, p in enumerate(tied_candidates) if i != initial_idx
]
else:
result = [tied_candidates[initial_idx]]
return result, best_fid
def _update_capacity(
path: List[str],
delta: float,
cap: Dict[Tuple[str, str], float],
) -> None:
for u, v in zip(path[:-1], path[1:], strict=False):
link = tuple(sorted((u, v)))
cap[link] += delta
[docs]
def find_feasible_path(
edges: List[Tuple[str, str]],
simple_paths: Dict[Tuple[str, str], List[List[str]]],
app_requests: Dict[str, Dict[str, Any]],
fidelities: Dict[Tuple[str, str], float] | None,
pga_rel_times: Dict[str, float] | None = None,
routing_mode: str = "shortest",
p_packet: float | None = None,
memory: int = 1,
p_swap: float = 0.6,
rates: Dict[Tuple[str, str], float] | None = None,
time_slot_duration: float = 1e-4,
rng: np.random.Generator | None = None,
provisioning: bool = True,
) -> Dict[str, List[List[str]]]:
"""Find feasible paths for each application request based on the specified
routing and the fidelity threshold.
Args:
edges (List[Tuple[str, str]]): List of edges in the quantum network,
where each edge is a tuple of nodes (src, dst).
app_requests (Dict[str, Dict[str, Any]]): A dictionary where keys are
application names and values are dictionaries containing source and
destination nodes (src, dst) and minimum fidelity (min_fidelity)
for the application.
fidelities (Dict[Tuple[str, str], float]): Per-edge fidelities in
the quantum network, where keys are directed edges (src, dst) and
values are the fidelities of those edges.
routing_mode (str, optional): Routing mode, either "shortest", "random"
, "degree", or "capacity". In "capacity" mode, edges that have
reached the capacity threshold are excluded from path selection. In
"random" mode, a random path is selected among all shortest paths
that meet the fidelity requirement. In "degree" mode, the path with
the lowest maximum degree of internal nodes is selected among all
shortest paths that meet the fidelity requirement.
p_packet (float, optional): Probability of a packet being generated.
memory (int, optional): Number of independent link-generation trials
per slot.
p_swap (float, optional): Probability of swapping an EPR pair in a
single trial.
rates (Dict[Tuple[str, str], float], optional): Per-link p_gen.
time_slot_duration (float, optional): Duration of a time slot in
seconds.
provisioning (bool, optional): Whether to enable provisioning for
routing.
Returns:
Tuple[
Dict[str, List[List[str]]],
Dict[str, float],
]: A tuple of:
- paths: dict mapping application names to lists of feasible paths
fidelity of the selected path, or nan if none was found.
- e2e_fids: dict mapping application names to the end-to-end
fidelity of the selected path, or nan if none was found.
"""
if fidelities is None or not fidelities:
return (
{app: [] for app in app_requests.keys()},
{app: float("nan") for app in app_requests.keys()},
)
G = nx.Graph()
G.add_edges_from(edges)
ret = {}
e2e_fids = {}
cap = defaultdict(float)
apps_ordered = list(app_requests.keys())
if pga_rel_times is not None:
apps_ordered.sort(
key=lambda app: (float(pga_rel_times.get(app, 0.0)), str(app))
)
for app in apps_ordered:
req = app_requests[app]
src = req["src"]
dst = req["dst"]
min_fidelity = req["min_fidelity"]
if min_fidelity <= 0.5:
ret[app] = []
e2e_fids[app] = float("nan")
continue
if not nx.has_path(G, src, dst):
ret[app] = []
e2e_fids[app] = float("nan")
continue
if routing_mode == "smallest":
path_list, selected_delta, selected_e2e_fid = smallest_bottleneck(
simple_paths,
src,
dst,
req,
cap,
p_packet,
memory,
p_swap,
rates,
time_slot_duration,
k=None if provisioning else 1,
)
if not path_list:
ret[app] = []
e2e_fids[app] = float("nan")
continue
_update_capacity(path_list[0], selected_delta, cap)
ret[app] = path_list
e2e_fids[app] = selected_e2e_fid
continue
elif routing_mode == "least":
path_list, selected_delta, selected_e2e_fid = least_capacity(
simple_paths,
src,
dst,
req,
cap,
p_packet,
memory,
p_swap,
rates,
time_slot_duration,
rng,
provisioning,
)
if not path_list:
ret[app] = []
e2e_fids[app] = float("nan")
continue
_update_capacity(path_list[0], selected_delta, cap)
ret[app] = path_list
e2e_fids[app] = selected_e2e_fid
continue
elif routing_mode == "highest":
path_list, selected_e2e_fid = highest_fidelity(
simple_paths,
src,
dst,
min_fidelity,
rng,
provisioning,
)
ret[app] = path_list
e2e_fids[app] = selected_e2e_fid
continue
else:
path_list, selected_e2e_fid = fidelity_shortest(
simple_paths,
src,
dst,
min_fidelity,
rng,
provisioning,
)
ret[app] = path_list
e2e_fids[app] = selected_e2e_fid
return ret, e2e_fids
[docs]
def rerouting(
precomputed: Dict[str, List[Tuple]],
deadline: float,
cur_t: float,
app: str,
resources: Dict[Tuple[str, str], float] | None = None,
) -> Tuple[List[str], List[Tuple[str, str]], float, float, float] | None:
best = None
best_score = None
for e2e_fid, path, links, pga_duration in precomputed.get(app, []):
waits = [max(resources.get(lnk, 0.0) - cur_t, 0.0) for lnk in links]
start = cur_t + max(waits, default=0.0)
finish = start + pga_duration
if finish > deadline + EPS:
continue
sum_wait = sum(waits)
score = (finish, sum_wait)
if best_score is None or score < best_score:
best_score = score
best = (path, links, start, pga_duration, e2e_fid)
return best
[docs]
def compute_path_durations(
pga_params: Dict[str, float],
rates: Dict[Tuple[str, str], float],
simple_paths: Dict[Tuple[str, str], List] | None = None,
provisioned_paths: List[List[str]] | None = None,
src: str | None = None,
dst: str | None = None,
) -> List[Tuple]:
duration_cache = {}
result = []
if provisioned_paths is not None:
items = ((None, p) for p in provisioned_paths)
else:
items = (
(item[0], item[1])
for item in all_simple_paths(simple_paths, src, dst)
)
for e2e_fid, path in items:
links = [
(min(u, v), max(u, v))
for u, v in zip(path[:-1], path[1:], strict=False)
]
n_swap = max(0, len(path) - 2)
effective_p_gen = min(rates[lk] for lk in links)
key = (n_swap, effective_p_gen)
if key not in duration_cache:
duration_cache[key] = duration_pga(
p_packet=pga_params["p_packet"],
epr_pairs=int(pga_params["epr_pairs"]),
n_swap=n_swap,
memory=pga_params["memory"],
p_swap=pga_params["p_swap"],
p_gen=effective_p_gen,
time_slot_duration=pga_params["slot_duration"],
)
result.append((e2e_fid, path, links, duration_cache[key]))
return result
[docs]
def dynamic_routing(
candidate_paths: List[Tuple[float, Any, List[Tuple[str, str]], float]],
min_fidelity: float,
deadline: float,
cur_t: float = 0.0,
resources: Dict[Tuple[str, str], float] = None,
) -> Tuple[Tuple | None, float | None]:
next_avail = None
best = None
best_finish = float("inf")
deadline_eps = deadline + EPS
cur_t_eps = cur_t + EPS
for e2e_fid, path, links, pga_duration in candidate_paths:
if e2e_fid < min_fidelity:
continue
if cur_t + pga_duration > deadline_eps:
continue
avail = cur_t
for lnk in links:
v = resources.get(lnk, 0.0)
if v > avail:
avail = v
finish = avail + pga_duration
if finish > deadline_eps:
continue
if avail > cur_t_eps:
if next_avail is None or avail < next_avail:
next_avail = avail
continue
if finish < best_finish:
best_finish = finish
best = (path, links, cur_t, pga_duration, e2e_fid)
return best, next_avail
[docs]
def static_routing(
app_requests: Dict[str, Dict[str, Any]],
simple_paths: Dict[Tuple[str, str], List[List[str]]],
) -> Tuple[Dict[str, List[List[str]]], Dict[str, float]]:
ret = {}
e2e_fids = {}
path_cache = {}
for app, req in app_requests.items():
src, dst = req["src"], req["dst"]
if (src, dst) in path_cache:
chosen_path, best_fid = path_cache[(src, dst)]
min_fid = req.get("min_fidelity", 0.0)
if not chosen_path or best_fid < min_fid:
ret[app] = []
e2e_fids[app] = float("nan")
else:
ret[app] = [chosen_path]
e2e_fids[app] = best_fid
continue
best_fid = float("-inf")
chosen_path = []
for path in all_simple_paths(simple_paths, src, dst):
e2e_fid, path_nodes = path[0], path[1]
path_nodes_list = list(path_nodes)
if e2e_fid > best_fid:
best_fid = e2e_fid
chosen_path = path_nodes_list
min_fid = req.get("min_fidelity", 0.0)
if not chosen_path:
path_cache[(src, dst)] = ([], float("nan"))
ret[app] = []
e2e_fids[app] = float("nan")
else:
chosen_fid = best_fid
if chosen_fid < min_fid:
path_cache[(src, dst)] = ([], chosen_fid)
ret[app] = []
e2e_fids[app] = float("nan")
else:
path_cache[(src, dst)] = (chosen_path, chosen_fid)
ret[app] = [chosen_path]
e2e_fids[app] = chosen_fid
return ret, e2e_fids