Source code for utils.parallel_simulations

import json
import multiprocessing as mp
import os
import shutil
import signal
import tempfile
from typing import Any, Optional, Sequence

import pandas as pd
from tqdm.auto import tqdm

from scheduling.main import run_simulation
from utils.helper import (
    build_default_sim_args,
    ppacket_dirname,
    prepare_run_dir,
)

_WORKER_DEFAULT_KWARGS = None
_WORKER_RUN_DIR = None


def _init_worker(default_kwargs: dict[str, Any], run_dir: str) -> None:
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    global _WORKER_DEFAULT_KWARGS, _WORKER_RUN_DIR
    _WORKER_DEFAULT_KWARGS = default_kwargs
    _WORKER_RUN_DIR = run_dir


[docs] def simulate_one_ppacket(args: tuple) -> dict: (p_packet, run_seed, arrival_rate, inst_range, keep_seed_outputs) = args default_kwargs = _WORKER_DEFAULT_KWARGS run_dir = _WORKER_RUN_DIR if keep_seed_outputs: base_dir = run_dir ppacket_dir = os.path.join(base_dir, ppacket_dirname(p_packet)) sd_dir = os.path.join(ppacket_dir, f"seed_{run_seed}") os.makedirs(sd_dir, exist_ok=True) cleanup = False else: sd_dir = tempfile.mkdtemp(prefix=f"seed_{run_seed}_") cleanup = True sim_kwargs = {**default_kwargs} sim_kwargs.update( p_packet=p_packet, arrival_rate=arrival_rate, seed=run_seed, output_dir=sd_dir, save_csv=keep_seed_outputs, verbose=False, ) if inst_range is not None: sim_kwargs["inst_range"] = inst_range effective_inst = sim_kwargs.get("inst_range", 100) effective_rate = sim_kwargs.get("instance_arrival_rate", 10.0) or 10.0 max_obs = 3 * effective_inst * 10 * (1.0 / effective_rate) sim_kwargs["windows"] = (0.15 * max_obs, max_obs) try: _, summary = run_simulation(**sim_kwargs) finally: if cleanup: shutil.rmtree(sd_dir, ignore_errors=True) summary_metrics = { "throughput": float("nan"), "app_throughput": float("nan"), "service_ratio": float("nan"), "completed_ratio": float("nan"), "drop_ratio": float("nan"), "avg_waiting_time": float("nan"), "avg_turnaround_time": float("nan"), "avg_service_time": float("nan"), "avg_hops": float("nan"), "avg_e2e_fidelity": float("nan"), "avg_pga_duration": float("nan"), "avg_routing_efficiency": float("nan"), "avg_link_utilization": float("nan"), "p90_link_utilization": float("nan"), "p95_link_utilization": float("nan"), "p90_link_avg_wait": float("nan"), "p95_link_avg_wait": float("nan"), "avg_queue_length": float("nan"), "avg_deg": float("nan"), "fairness": float("nan"), "routing_decision_count": 0, "routing_decision_runtime": float("nan"), } if summary is not None: summary_metrics.update(summary) result = { "p_packet": p_packet, "arrival_rate": arrival_rate, "seed": run_seed, **summary_metrics, } if inst_range is not None: result["inst_range"] = inst_range return result
[docs] def run_parallel_sims( tasks: list[tuple[Any, ...]], max_workers: int, show_progress: bool, default_kwargs: dict[str, Any], run_dir: str, ) -> list[dict[str, Any]]: mp_ctx = mp.get_context("spawn") n_tasks = len(tasks) if n_tasks == 0: return [] n_procs = min(max_workers, n_tasks) chunksize = 1 pool = mp_ctx.Pool( processes=n_procs, initializer=_init_worker, initargs=(default_kwargs, run_dir), ) try: it = pool.imap_unordered(simulate_one_ppacket, tasks, chunksize) if show_progress: it = tqdm(it, total=n_tasks, desc="Simulations", unit="run") records = list(it) pool.close() return records except (KeyboardInterrupt, Exception): pool.terminate() raise finally: pool.join()
[docs] def run_ppacket_parallel_simulations( ppacket_values: Sequence[float], arrival_rate_values: Sequence[float], simulations_per_point: int, seed_start: int, run_dir: str, default_kwargs: dict, keep_seed_outputs: bool, inst_range_values: Sequence[int] | None = None, max_workers: Optional[int] = None, show_progress: bool = True, raw_csv_path: str | None = None, ) -> pd.DataFrame: seed_pool = [seed_start + i for i in range(simulations_per_point)] inst_range_sweep = inst_range_values or [None] tasks = [ (p_packet, run_seed, arrival_rate, inst_range, keep_seed_outputs) for p_packet in ppacket_values for arrival_rate in arrival_rate_values for inst_range in inst_range_sweep for run_seed in seed_pool ] workers = max_workers or os.cpu_count() or 1 records = run_parallel_sims( tasks=tasks, max_workers=workers, show_progress=show_progress, default_kwargs=default_kwargs, run_dir=run_dir, ) results_df = pd.DataFrame(records) if raw_csv_path: results_df.to_csv(raw_csv_path, index=False) return results_df
[docs] def run_ppacket_sweep_to_csv( ppacket_values: Sequence[float], arrival_rate_values: Sequence[float], simulations_per_point: int, seed_start: int = 0, config: str = "configurations/network/Dumbbell.gml", output_dir: str = "results", simulation_kwargs: dict | None = None, keep_seed_outputs: bool = False, inst_range_values: Sequence[int] | None = None, max_workers: Optional[int] = None, show_progress: bool = True, ) -> tuple[pd.DataFrame, str]: run_dir, timestamp = prepare_run_dir( output_dir, ppacket_values, keep_seed_outputs=keep_seed_outputs, ) raw_csv_path = os.path.join(run_dir, f"{timestamp}_raw.csv") default_kwargs = build_default_sim_args(config, simulation_kwargs) df = run_ppacket_parallel_simulations( ppacket_values=ppacket_values, arrival_rate_values=arrival_rate_values, simulations_per_point=simulations_per_point, seed_start=seed_start, run_dir=run_dir, default_kwargs=default_kwargs, keep_seed_outputs=keep_seed_outputs, inst_range_values=inst_range_values, max_workers=max_workers, show_progress=show_progress, raw_csv_path=raw_csv_path, ) args_json_path = os.path.join(run_dir, f"{timestamp}_args.json") args_record = { "ppacket_values": list(ppacket_values), "arrival_rate_values": list(arrival_rate_values), "inst_range_values": ( list(inst_range_values) if inst_range_values is not None else None ), "simulations_per_point": simulations_per_point, "seed_start": seed_start, "config": config, "keep_seed_outputs": keep_seed_outputs, "max_workers": max_workers, "simulation_kwargs": default_kwargs, } with open(args_json_path, "w") as f: json.dump(args_record, f, indent=2) return df, raw_csv_path