import networkx as nx
import numpy as np
from utils.helper import compute_edge_fidelities, compute_edge_rates
[docs]
def generate_waxman_graph(
n: int = 48,
alpha: float = 0.2,
beta: float = 0.3,
rng: np.random.Generator | None = None,
max_retries: int = 5000,
max_avg_degree: float = 3.0,
max_hops: int = 8,
) -> tuple[list, list, dict, dict, float, float]:
"""Generates a Waxman graph with constraints on connectivity,
average degree, and diameter.
Args:
n (int, optional): Number of nodes in the graph.
alpha (float, optional:): Controls the influence of distance on edge
probability. Higher alpha values lead to fewer edges between distant
nodes.
beta (float, optional): Controls the overall density of the graph.
Higher beta values lead to more edges.
rng (np.random.Generator | None, optional): Random number generator.
max_retries (int, optional): Maximum number of retries to generate a
valid graph (connected, with average degree <= max_avg_degree,
and diameter <= max_hops).
max_avg_degree (float, optional): Maximum average degree of the graph.
max_hops (int, optional): Maximum diameter of the graph. Defaults to 8.
Returns:
tuple[list, list, dict, dict, float, float]: A tuple containing:
- List of nodes in the graph.
- List of edges in the graph.
- Dictionary mapping edges to their fidelities.
- Dictionary mapping edges to their p_gen rates.
- Average degree of the graph.
- Diameter of the graph.
"""
G = None
diameter = float("nan")
for _ in range(max_retries):
G = nx.waxman_graph(n, alpha=alpha, beta=beta, seed=rng)
if not nx.is_connected(G):
continue
avg_deg = 2 * G.number_of_edges() / G.number_of_nodes()
if avg_deg > max_avg_degree:
continue
diameter = float(nx.diameter(G))
if diameter > max_hops:
continue
break
else:
return [], [], {}, {}, 0.0, float("nan")
nodes = sorted(G.nodes(), key=str)
edges = sorted(G.edges(), key=lambda edge: (str(edge[0]), str(edge[1])))
pos = nx.get_node_attributes(G, "pos")
distances = {}
for u, v in G.edges():
sq_d = (pos[u][0] - pos[v][0]) ** 2 + (pos[u][1] - pos[v][1]) ** 2
d = sq_d**0.5
distances[(u, v)] = d
G[u][v]["dist"] = d
fidelites = compute_edge_fidelities(G, distances)
rates = compute_edge_rates(G, distances)
return nodes, edges, fidelites, rates, avg_deg, diameter
[docs]
def fat_tree(
    k: int = 4,
    qpu_edge_dist: float = 1.0,
    edge_aggregate_dist: float = 3.0,
    aggregate_core_dist: float = 6.0,
    L_dep: float = 50.0,
) -> tuple[list, list, dict, dict, list, float]:
    """Generates a fat-tree topology with k pods. Each pod contains k/2 edge
    switches and k/2 aggregate switches. The core layer has (k/2)^2 core
    switches. Each edge switch connects to k/2 hosts (QPUs). See:
    https://arxiv.org/pdf/2601.01353

    Nodes carry a ``layer`` attribute (0 core, 1 aggregate, 2 edge, 3 QPU)
    and, below the core layer, a ``pod`` attribute. Every edge carries its
    length in a ``dist`` attribute.

    Args:
        k (int, optional): Number of pods. Must be >= 2. All per-pod counts
            use ``k // 2``, so an odd ``k`` is rounded down for those.
        qpu_edge_dist (float, optional): Distance between QPUs and edge
            switches.
        edge_aggregate_dist (float, optional): Distance between edge and
            aggregate switches.
        aggregate_core_dist (float, optional): Distance between aggregate and
            core switches.
        L_dep (float, optional): Depolarization length in km for edge
            fidelities.

    Returns:
        tuple[list, list, dict, dict, list, float]: A tuple containing:
            - List of nodes, sorted by name.
            - List of edges, sorted by endpoint names.
            - Edge fidelities as returned by ``compute_edge_fidelities``.
            - Edge p_gen rates as returned by ``compute_edge_rates``.
            - List of QPUs (hosts) in the graph, in creation order.
            - Diameter of the graph.

    Raises:
        ValueError: If ``k`` is smaller than 2, which would leave the graph
            without pods or without switches.
    """
    if k < 2:
        raise ValueError(f"fat_tree requires k >= 2, got k={k}")

    half = k // 2
    # Convert the link lengths once instead of on every edge insertion.
    qpu_edge_len = float(qpu_edge_dist)
    edge_agg_len = float(edge_aggregate_dist)
    agg_core_len = float(aggregate_core_dist)

    G = nx.Graph()
    qpus = []

    # Core
    cores = [f"core_{i}" for i in range(half**2)]
    G.add_nodes_from(cores, layer=0)

    for p in range(k):
        aggregate = [f"pod{p}_agg_{i}" for i in range(half)]
        edge_switches = [f"pod{p}_edge_{i}" for i in range(half)]
        G.add_nodes_from(aggregate, layer=1, pod=p)
        G.add_nodes_from(edge_switches, layer=2, pod=p)

        # core <-> aggregate: the i-th aggregate switch of every pod uplinks
        # to the i-th group of k/2 consecutive core switches.
        for i, agg in enumerate(aggregate):
            for core in cores[i * half:(i + 1) * half]:
                G.add_edge(agg, core, dist=agg_core_len)

        # aggregate <-> edge: full bipartite mesh inside the pod.
        for edge_sw in edge_switches:
            for agg in aggregate:
                G.add_edge(edge_sw, agg, dist=edge_agg_len)

        # edge <-> qpu
        for e_i, edge_sw in enumerate(edge_switches):
            for q_i in range(half):
                qpu = f"pod{p}_qpu_{e_i}_{q_i}"
                qpus.append(qpu)
                G.add_node(qpu, layer=3, pod=p)
                G.add_edge(edge_sw, qpu, dist=qpu_edge_len)

    nodes = sorted(G.nodes(), key=str)
    edges = sorted(G.edges(), key=lambda edge: (str(edge[0]), str(edge[1])))
    distances = {(u, v): float(G.edges[u, v]["dist"]) for (u, v) in edges}
    fidelities = compute_edge_fidelities(G, distances, L_dep=L_dep)
    rates = compute_edge_rates(G, distances)
    diameter = float(nx.diameter(G))
    return nodes, edges, fidelities, rates, qpus, diameter