| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- # simulation.py
- # -*- coding: utf-8 -*-
- from __future__ import annotations
- from dataclasses import dataclass
- from typing import Dict, Any, List
- import csv, os, math, random
- # ===== 既存ネットワークAPIに合わせたアダプタ =====
- class Adapter:
- """
- あなたの network.py / nb_protocol.py を変更せずに使うための薄いラッパ。
- - QuantumNetwork(path_num, fidelity_list, noise_model) を自前で構築
- - 単一ペア 'Alice-Bob' に path_id=1..path_num を割当
- - スケジューラが期待する nb_protocol 互換API(sample_path / true_fidelity)を Shim で提供
- """
- def __init__(self, noise_model: str, path_num: int, fidelity_list: List[float], seed: int | None = None):
- if seed is not None:
- random.seed(seed)
- import network as qnet
- # QuantumNetwork を直接構築(network.py を変更しない)
- self.net = qnet.QuantumNetwork(path_num=path_num, fidelity_list=fidelity_list, noise_model=noise_model)
- self.pairs = ["Alice-Bob"]
- self.paths_map = {"Alice-Bob": list(range(1, path_num + 1))}
- # nb_protocol 互換 Shim
- self.nbp = _NBPShim(self.net)
- # ---- ヘルパ ----
- def true_fidelity(self, path_id: Any) -> float:
- return self.nbp.true_fidelity(self.net, path_id)
- def list_pairs(self) -> List[Any]:
- return list(self.pairs)
- def list_paths_of(self, pair_id: Any) -> List[Any]:
- return list(self.paths_map.get(pair_id, []))
- # ---- スケジューラ呼び出し ----
- def run_scheduler(self, scheduler_name: str, budget_target: int,
- importance: Dict[Any, float]) -> Dict[str, Any]:
- """
- スケジューラに共通IFで実行要求する。
- 返り値の想定(辞書):
- {
- 'used_cost_total': int,
- 'per_pair_details': [
- {
- 'pair_id': pair_id,
- 'alloc_by_path': {path_id: sample_count, ...},
- 'est_fid_by_path': {path_id: mean_estimate, ...},
- 'best_pred_path': path_id,
- }, ...
- ]
- }
- """
- if scheduler_name == "greedy":
- from schedulers.greedy_scheduler import run as greedy_run
- return greedy_run(self.net, self.pairs, self.paths_map, budget_target, importance, self.nbp)
- elif scheduler_name == "naive":
- from schedulers.lnaive_scheduler import run as naive_run
- return naive_run(self.net, self.pairs, self.paths_map, budget_target, importance, self.nbp)
- elif scheduler_name == "online_nb":
- from schedulers.lonline_nb import run as onb_run
- return onb_run(self.net, self.pairs, self.paths_map, budget_target, importance, self.nbp)
- else:
- raise ValueError(f"unknown scheduler: {scheduler_name}")
- # ===== 便利関数 =====
- def hoeffding_radius(n: int, delta: float = 0.05) -> float:
- if n <= 0:
- return 1.0
- return math.sqrt(0.5 * math.log(2.0 / delta) / n)
- def clamp01(x: float) -> float:
- return 0.0 if x < 0.0 else (1.0 if x > 1.0 else x)
- # ===== CSV I/O =====
- CSV_HEADER = [
- "run_id", "noise", "scheduler", "budget_target",
- "used_cost_total",
- "pair_id", "path_id",
- "importance", # I_d
- "samples", # B_{d,l}
- "est_mean", "lb", "ub", "width",
- "is_best_true", "is_best_pred"
- ]
- def open_csv(path: str):
- os.makedirs(os.path.dirname(path), exist_ok=True)
- exists = os.path.exists(path)
- f = open(path, "a", newline="")
- w = csv.writer(f)
- if not exists:
- w.writerow(CSV_HEADER)
- return f, w
- # ===== メインシミュレーション =====
- @dataclass
- class ExperimentConfig:
- noise_model: str
- budgets: List[int]
- schedulers: List[str] # ["greedy", "naive", "online_nb", ...]
- repeats: int
- importance_mode: str = "both" # "both" / "weighted_only" / "unweighted_only"
- delta_ci: float = 0.05 # 95%CI相当
- out_csv: str = "outputs/raw_simulation_data.csv"
- seed: int | None = None
- # QuantumNetwork 構築用
- path_num: int = 5
- fidelity_list: List[float] | None = None
- def _importance_for_pairs(pairs: List[Any], mode: str) -> Dict[str, Dict[Any, float]]:
- res: Dict[str, Dict[Any, float]] = {}
- if mode in ("both", "unweighted_only"):
- res["unweighted"] = {p: 1.0 for p in pairs}
- if mode in ("both", "weighted_only"):
- # 重要度は例として一様乱数(必要なら差替え)
- res["weighted"] = {p: 0.5 + random.random() for p in pairs}
- return res
- def run_and_append_csv(cfg: ExperimentConfig) -> str:
- fid = cfg.fidelity_list or _default_fidelities(cfg.path_num)
- adp = Adapter(cfg.noise_model, cfg.path_num, fid, seed=cfg.seed)
- pairs = adp.list_pairs()
- importance_sets = _importance_for_pairs(pairs, cfg.importance_mode)
- f, w = open_csv(cfg.out_csv)
- try:
- run_id = 0
- for _ in range(cfg.repeats):
- run_id += 1
- for budget in cfg.budgets:
- for sched in cfg.schedulers:
- for imp_tag, I in importance_sets.items():
- # スケジューラ実行
- result = adp.run_scheduler(sched, budget, I)
- used_cost_total = int(result.get("used_cost_total", budget))
- per_pair_details: List[Dict[str, Any]] = result.get("per_pair_details", [])
- # 真の最良パス(正答率判定用)
- true_best_by_pair = {}
- for pair in pairs:
- paths = adp.list_paths_of(pair)
- best = None
- bestv = -1.0
- for pid in paths:
- tf = adp.true_fidelity(pid)
- if tf > bestv:
- bestv, best = tf, pid
- true_best_by_pair[pair] = best
- # CSV行を形成
- for det in per_pair_details:
- pair_id = det["pair_id"]
- alloc = det.get("alloc_by_path", {}) or {}
- est = det.get("est_fid_by_path", {}) or {}
- pred = det.get("best_pred_path")
- for path_id, samples in alloc.items():
- m = float(est.get(path_id, 0.5))
- r = hoeffding_radius(int(samples), cfg.delta_ci)
- lb = clamp01(m - r)
- ub = clamp01(m + r)
- width = ub - lb
- is_true_best = (true_best_by_pair.get(pair_id) == path_id)
- is_best_pred = (pred == path_id)
- w.writerow([
- f"{run_id}-{imp_tag}",
- cfg.noise_model,
- sched,
- budget,
- used_cost_total,
- pair_id,
- path_id,
- I.get(pair_id, 1.0),
- int(samples),
- m, lb, ub, width,
- int(is_true_best), int(is_best_pred),
- ])
- finally:
- f.close()
- return cfg.out_csv
- # ===== nb_protocol 互換 Shim =====
- class _NBPShim:
- """
- スケジューラが期待する nb_protocol 風のAPIを提供:
- - sample_path(net, path_id, n): QuantumNetwork.benchmark_path を呼ぶ
- - true_fidelity(net, path_id): 量子チャネルの ground truth を返す
- """
- def __init__(self, net):
- self.net = net
- def sample_path(self, net, path_id: int, n: int) -> float:
- # 1-bounce を n 回の測定にマップ(nb_protocol.NBProtocolAlice の設計に整合)
- p, _cost = self.net.benchmark_path(path_id, bounces=[1], sample_times={1: int(n)})
- return float(p)
- def true_fidelity(self, net, path_id: int) -> float:
- return float(self.net.quantum_channels[path_id - 1].fidelity)
- # ===== デフォルト忠実度の簡易生成(必要なら差替え) =====
- def _default_fidelities(path_num: int) -> List[float]:
- alpha, beta, var = 0.93, 0.85, 0.02
- res = [max(0.8, min(0.999, random.gauss(beta, var))) for _ in range(path_num)]
- best_idx = random.randrange(path_num)
- res[best_idx] = max(0.85, min(0.999, random.gauss(alpha, var)))
- return res
|