From 73d8ebb5c1bf7f02eee595a58267fd0288f33b50 Mon Sep 17 00:00:00 2001 From: yuyr Date: Sun, 15 Mar 2026 22:49:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20payload=20replay=20for=20s?= =?UTF-8?q?napshot=EF=BC=8C20260313=20=E8=BF=AD=E4=BB=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/payload_replay/README.md | 100 ++ .../compare_with_routinator_record.sh | 110 +++ .../report_to_routinator_csv.py | 57 ++ scripts/payload_replay/run_apnic_replay.sh | 150 +++ src/cli.rs | 212 +++- src/lib.rs | 2 + src/replay/archive.rs | 909 ++++++++++++++++++ src/replay/fetch_http.rs | 399 ++++++++ src/replay/fetch_rsync.rs | 253 +++++ src/replay/mod.rs | 3 + src/sync/repo.rs | 310 ++++++ src/validation/run.rs | 1 + src/validation/run_tree_from_tal.rs | 336 ++++++- src/validation/tree_runner.rs | 45 +- tests/test_apnic_stats_live_stage2.rs | 1 + tests/test_cli_payload_replay_smoke.rs | 69 ++ tests/test_payload_replay_tools.rs | 124 +++ 17 files changed, 3060 insertions(+), 21 deletions(-) create mode 100644 scripts/payload_replay/README.md create mode 100755 scripts/payload_replay/compare_with_routinator_record.sh create mode 100755 scripts/payload_replay/report_to_routinator_csv.py create mode 100755 scripts/payload_replay/run_apnic_replay.sh create mode 100644 src/replay/archive.rs create mode 100644 src/replay/fetch_http.rs create mode 100644 src/replay/fetch_rsync.rs create mode 100644 src/replay/mod.rs create mode 100644 tests/test_cli_payload_replay_smoke.rs create mode 100644 tests/test_payload_replay_tools.rs diff --git a/scripts/payload_replay/README.md b/scripts/payload_replay/README.md new file mode 100644 index 0000000..2d44757 --- /dev/null +++ b/scripts/payload_replay/README.md @@ -0,0 +1,100 @@ +# Payload Replay Scripts + +本目录提供基于本地 payload archive 的手工 replay 入口。 + +## `run_apnic_replay.sh` + +默认使用: + +- `tests/fixtures/tal/apnic-rfc7730-https.tal` +- `tests/fixtures/ta/apnic-ta.cer` +- `target/live/payload_replay/payload-archive` +- `target/live/payload_replay/locks.json` + +运行: + +```bash +./scripts/payload_replay/run_apnic_replay.sh +``` + +产物默认输出到: + +- `target/live/payload_replay_runs/` + +包含: + +- replay DB 目录 +- `report.json` +- `run.log` +- `meta.json` +- `summary.md` + +## 环境变量 + +可覆盖: + +- `TAL_PATH` +- `TA_PATH` +- `PAYLOAD_REPLAY_ARCHIVE` +- `PAYLOAD_REPLAY_LOCKS` +- `VALIDATION_TIME` +- `MAX_DEPTH` +- `MAX_INSTANCES` +- `OUT_DIR` +- `RUN_NAME` +- `DB_DIR` +- `REPORT_JSON` +- `RUN_LOG` +- `META_JSON` +- `SUMMARY_MD` + +## 说明 + +- 该脚本依赖 `rpki` CLI 已支持: + - `--payload-replay-archive` + - `--payload-replay-locks` +- replay 模式必须搭配离线 TAL/TA 输入,不会去访问真实 RRDP / rsync 网络源。 + +## `report_to_routinator_csv.py` + +把 `rpki` 生成的 `report.json` 转成 Routinator 风格的 VRP CSV: + +```bash +python3 scripts/payload_replay/report_to_routinator_csv.py \ + --report target/live/payload_replay_runs/_report.json \ + --out target/live/payload_replay_runs/_vrps.csv \ + --trust-anchor apnic +``` + +输出列为: + +- `ASN` +- `IP Prefix` +- `Max Length` +- `Trust Anchor` + +## `compare_with_routinator_record.sh` + +把 ours 生成的 VRP CSV 与 Routinator 的 `record.csv` 做对比: + +```bash +./scripts/payload_replay/compare_with_routinator_record.sh \ + target/live/payload_replay_runs/_vrps.csv \ + target/live/payload_replay/record.csv +``` + +会产出: + +- compare summary Markdown +- `only_in_ours.csv` +- `only_in_record.csv` + +## `run_apnic_replay.sh` 现有额外产物 + +脚本现在除了 `report/meta/summary`,还会额外生成: + +- `vrps.csv` +- 若 `ROUTINATOR_RECORD_CSV` 存在,则生成: + - compare summary + - `only_in_ours.csv` + - `only_in_record.csv` diff --git a/scripts/payload_replay/compare_with_routinator_record.sh b/scripts/payload_replay/compare_with_routinator_record.sh new file mode 100755 index 0000000..ee72e76 --- /dev/null +++ b/scripts/payload_replay/compare_with_routinator_record.sh @@ -0,0 +1,110 @@ +#!/usr/bin/env bash +set -euo pipefail + +if [[ $# -lt 2 || $# -gt 5 ]]; then + echo "Usage: $0 [summary.md] [only_in_ours.csv] [only_in_record.csv]" >&2 + exit 2 +fi + +OURS_CSV="$1" +RECORD_CSV="$2" +SUMMARY_MD="${3:-}" +ONLY_IN_OURS_CSV="${4:-}" +ONLY_IN_RECORD_CSV="${5:-}" + +if [[ -z "$SUMMARY_MD" ]]; then + SUMMARY_MD="$(dirname "$OURS_CSV")/$(basename "$OURS_CSV" .csv)_vs_routinator_summary.md" +fi +if [[ -z "$ONLY_IN_OURS_CSV" ]]; then + ONLY_IN_OURS_CSV="$(dirname "$OURS_CSV")/$(basename "$OURS_CSV" .csv)_only_in_ours.csv" +fi +if [[ -z "$ONLY_IN_RECORD_CSV" ]]; then + ONLY_IN_RECORD_CSV="$(dirname "$OURS_CSV")/$(basename "$OURS_CSV" .csv)_only_in_record.csv" +fi + +python3 - "$OURS_CSV" "$RECORD_CSV" "$SUMMARY_MD" "$ONLY_IN_OURS_CSV" "$ONLY_IN_RECORD_CSV" <<'PY' +import csv +import ipaddress +import sys +from pathlib import Path + +ours_csv = Path(sys.argv[1]) +record_csv = Path(sys.argv[2]) +summary_md = Path(sys.argv[3]) +only_in_ours_csv = Path(sys.argv[4]) +only_in_record_csv = Path(sys.argv[5]) + + +def normalize_row(row: dict): + asn = row["ASN"].strip().upper() + prefix = row["IP Prefix"].strip() + max_len = str(int(row["Max Length"])) + ta = row["Trust Anchor"].strip() + network = ipaddress.ip_network(prefix, strict=False) + return { + "ASN": asn, + "IP Prefix": str(network), + "Max Length": max_len, + "Trust Anchor": ta, + } + + +def read_rows(path: Path): + with path.open(encoding="utf-8", newline="") as f: + rows = [normalize_row(r) for r in csv.DictReader(f)] + return rows + + +def row_key(row: dict): + network = ipaddress.ip_network(row["IP Prefix"], strict=False) + return ( + row["ASN"], + network.version, + int(network.network_address), + network.prefixlen, + int(row["Max Length"]), + row["Trust Anchor"], + ) + + +def write_rows(path: Path, rows): + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8", newline="") as f: + writer = csv.DictWriter(f, fieldnames=["ASN", "IP Prefix", "Max Length", "Trust Anchor"]) + writer.writeheader() + for row in rows: + writer.writerow(row) + +ours = read_rows(ours_csv) +record = read_rows(record_csv) +ours_map = {row_key(r): r for r in ours} +record_map = {row_key(r): r for r in record} +only_in_ours = [ours_map[k] for k in sorted(set(ours_map) - set(record_map))] +only_in_record = [record_map[k] for k in sorted(set(record_map) - set(ours_map))] +intersection = len(set(ours_map) & set(record_map)) + +write_rows(only_in_ours_csv, only_in_ours) +write_rows(only_in_record_csv, only_in_record) + +summary_md.parent.mkdir(parents=True, exist_ok=True) +summary = [] +summary.append("# Replay vs Routinator VRP Compare\n\n") +summary.append(f"- ours_csv: `{ours_csv}`\n") +summary.append(f"- record_csv: `{record_csv}`\n") +summary.append(f"- only_in_ours_csv: `{only_in_ours_csv}`\n") +summary.append(f"- only_in_record_csv: `{only_in_record_csv}`\n\n") +summary.append("| metric | value |\n") +summary.append("|---|---:|\n") +summary.append(f"| ours_total | {len(ours_map)} |\n") +summary.append(f"| record_total | {len(record_map)} |\n") +summary.append(f"| intersection | {intersection} |\n") +summary.append(f"| only_in_ours | {len(only_in_ours)} |\n") +summary.append(f"| only_in_record | {len(only_in_record)} |\n") +summary_md.write_text("".join(summary), encoding="utf-8") +print(summary_md) +PY + +echo "== compare complete ==" >&2 +echo "- summary: $SUMMARY_MD" >&2 +echo "- only_in_ours: $ONLY_IN_OURS_CSV" >&2 +echo "- only_in_record: $ONLY_IN_RECORD_CSV" >&2 diff --git a/scripts/payload_replay/report_to_routinator_csv.py b/scripts/payload_replay/report_to_routinator_csv.py new file mode 100755 index 0000000..5d58589 --- /dev/null +++ b/scripts/payload_replay/report_to_routinator_csv.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +import argparse +import csv +import ipaddress +import json +from pathlib import Path + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Convert rpki report.json VRPs into Routinator-compatible CSV" + ) + p.add_argument("--report", required=True, help="path to rpki report.json") + p.add_argument("--out", required=True, help="output CSV path") + p.add_argument( + "--trust-anchor", + default="unknown", + help="Trust Anchor column value (default: unknown)", + ) + return p.parse_args() + + +def sort_key(vrp: dict): + network = ipaddress.ip_network(vrp["prefix"], strict=False) + return ( + int(vrp["asn"]), + network.version, + int(network.network_address), + network.prefixlen, + int(vrp["max_length"]), + ) + + +def main() -> int: + args = parse_args() + report = json.loads(Path(args.report).read_text(encoding="utf-8")) + vrps = list(report.get("vrps") or []) + vrps.sort(key=sort_key) + + out_path = Path(args.out) + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", encoding="utf-8", newline="") as f: + w = csv.writer(f) + w.writerow(["ASN", "IP Prefix", "Max Length", "Trust Anchor"]) + for vrp in vrps: + w.writerow([ + f"AS{vrp['asn']}", + vrp["prefix"], + vrp["max_length"], + args.trust_anchor, + ]) + print(out_path) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/payload_replay/run_apnic_replay.sh b/scripts/payload_replay/run_apnic_replay.sh new file mode 100755 index 0000000..fa618df --- /dev/null +++ b/scripts/payload_replay/run_apnic_replay.sh @@ -0,0 +1,150 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +cd "$ROOT_DIR" + +TAL_PATH="${TAL_PATH:-$ROOT_DIR/tests/fixtures/tal/apnic-rfc7730-https.tal}" +TA_PATH="${TA_PATH:-$ROOT_DIR/tests/fixtures/ta/apnic-ta.cer}" +PAYLOAD_REPLAY_ARCHIVE="${PAYLOAD_REPLAY_ARCHIVE:-$ROOT_DIR/target/live/payload_replay/payload-archive}" +PAYLOAD_REPLAY_LOCKS="${PAYLOAD_REPLAY_LOCKS:-$ROOT_DIR/target/live/payload_replay/locks.json}" +VALIDATION_TIME="${VALIDATION_TIME:-2026-03-13T02:30:00Z}" +TRUST_ANCHOR="${TRUST_ANCHOR:-apnic}" +ROUTINATOR_RECORD_CSV="${ROUTINATOR_RECORD_CSV:-$ROOT_DIR/target/live/payload_replay/record.csv}" +MAX_DEPTH="${MAX_DEPTH:-}" +MAX_INSTANCES="${MAX_INSTANCES:-}" +OUT_DIR="${OUT_DIR:-$ROOT_DIR/target/live/payload_replay_runs}" +mkdir -p "$OUT_DIR" + +TS="$(date -u +%Y%m%dT%H%M%SZ)" +RUN_NAME="${RUN_NAME:-apnic_replay_${TS}}" +DB_DIR="${DB_DIR:-$OUT_DIR/${RUN_NAME}_db}" +REPORT_JSON="${REPORT_JSON:-$OUT_DIR/${RUN_NAME}_report.json}" +RUN_LOG="${RUN_LOG:-$OUT_DIR/${RUN_NAME}_run.log}" +META_JSON="${META_JSON:-$OUT_DIR/${RUN_NAME}_meta.json}" +SUMMARY_MD="${SUMMARY_MD:-$OUT_DIR/${RUN_NAME}_summary.md}" +VRPS_CSV="${VRPS_CSV:-$OUT_DIR/${RUN_NAME}_vrps.csv}" +COMPARE_SUMMARY_MD="${COMPARE_SUMMARY_MD:-$OUT_DIR/${RUN_NAME}_compare_summary.md}" +ONLY_IN_OURS_CSV="${ONLY_IN_OURS_CSV:-$OUT_DIR/${RUN_NAME}_only_in_ours.csv}" +ONLY_IN_RECORD_CSV="${ONLY_IN_RECORD_CSV:-$OUT_DIR/${RUN_NAME}_only_in_record.csv}" + +cmd=(cargo run --release --bin rpki -- + --db "$DB_DIR" + --tal-path "$TAL_PATH" + --ta-path "$TA_PATH" + --payload-replay-archive "$PAYLOAD_REPLAY_ARCHIVE" + --payload-replay-locks "$PAYLOAD_REPLAY_LOCKS" + --validation-time "$VALIDATION_TIME" + --report-json "$REPORT_JSON") + +if [[ -n "$MAX_DEPTH" ]]; then + cmd+=(--max-depth "$MAX_DEPTH") +fi +if [[ -n "$MAX_INSTANCES" ]]; then + cmd+=(--max-instances "$MAX_INSTANCES") +fi + +run_start_s="$(date +%s)" +( + echo "# command:" + printf '%q ' "${cmd[@]}" + echo + echo + "${cmd[@]}" +) 2>&1 | tee "$RUN_LOG" >/dev/null +run_end_s="$(date +%s)" +run_duration_s="$((run_end_s - run_start_s))" + +TAL_PATH="$TAL_PATH" \ +TA_PATH="$TA_PATH" \ +PAYLOAD_REPLAY_ARCHIVE="$PAYLOAD_REPLAY_ARCHIVE" \ +PAYLOAD_REPLAY_LOCKS="$PAYLOAD_REPLAY_LOCKS" \ +DB_DIR="$DB_DIR" \ +REPORT_JSON="$REPORT_JSON" \ +RUN_LOG="$RUN_LOG" \ +VALIDATION_TIME="$VALIDATION_TIME" \ +RUN_DURATION_S="$run_duration_s" \ +python3 - "$REPORT_JSON" "$META_JSON" "$SUMMARY_MD" <<'PY' +import json +import os +import sys +from datetime import datetime, timezone +from pathlib import Path + +report_path = Path(sys.argv[1]) +meta_path = Path(sys.argv[2]) +summary_path = Path(sys.argv[3]) +rep = json.loads(report_path.read_text(encoding="utf-8")) +now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") +meta = { + "recorded_at_utc": now, + "tal_path": os.environ["TAL_PATH"], + "ta_path": os.environ["TA_PATH"], + "payload_replay_archive": os.environ["PAYLOAD_REPLAY_ARCHIVE"], + "payload_replay_locks": os.environ["PAYLOAD_REPLAY_LOCKS"], + "db_dir": os.environ["DB_DIR"], + "report_json": os.environ["REPORT_JSON"], + "run_log": os.environ["RUN_LOG"], + "validation_time_arg": os.environ["VALIDATION_TIME"], + "durations_secs": { + "rpki_run": int(os.environ["RUN_DURATION_S"]), + }, + "counts": { + "publication_points_processed": rep["tree"]["instances_processed"], + "publication_points_failed": rep["tree"]["instances_failed"], + "vrps": len(rep["vrps"]), + "aspas": len(rep["aspas"]), + "audit_publication_points": len(rep["publication_points"]), + }, +} +meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") +summary = [] +summary.append("# Payload Replay Summary\n\n") +summary.append(f"- recorded_at_utc: `{now}`\n") +summary.append(f"- tal_path: `{meta['tal_path']}`\n") +summary.append(f"- ta_path: `{meta['ta_path']}`\n") +summary.append(f"- payload_replay_archive: `{meta['payload_replay_archive']}`\n") +summary.append(f"- payload_replay_locks: `{meta['payload_replay_locks']}`\n") +summary.append(f"- db: `{meta['db_dir']}`\n") +summary.append(f"- report_json: `{meta['report_json']}`\n") +summary.append(f"- validation_time_arg: `{meta['validation_time_arg']}`\n\n") +summary.append("## Results\n\n") +summary.append("| metric | value |\n") +summary.append("|---|---:|\n") +for k, v in meta["counts"].items(): + summary.append(f"| {k} | {v} |\n") +summary.append("\n## Durations\n\n") +summary.append("| step | seconds |\n") +summary.append("|---|---:|\n") +for k, v in meta["durations_secs"].items(): + summary.append(f"| {k} | {v} |\n") +summary_path.write_text("".join(summary), encoding="utf-8") +print(summary_path) +PY + +python3 scripts/payload_replay/report_to_routinator_csv.py \ + --report "$REPORT_JSON" \ + --out "$VRPS_CSV" \ + --trust-anchor "$TRUST_ANCHOR" >/dev/null + +if [[ -f "$ROUTINATOR_RECORD_CSV" ]]; then + ./scripts/payload_replay/compare_with_routinator_record.sh \ + "$VRPS_CSV" \ + "$ROUTINATOR_RECORD_CSV" \ + "$COMPARE_SUMMARY_MD" \ + "$ONLY_IN_OURS_CSV" \ + "$ONLY_IN_RECORD_CSV" >/dev/null +fi + +echo "== payload replay run complete ==" >&2 +echo "- db: $DB_DIR" >&2 +echo "- report: $REPORT_JSON" >&2 +echo "- run log: $RUN_LOG" >&2 +echo "- meta json: $META_JSON" >&2 +echo "- summary md: $SUMMARY_MD" >&2 +echo "- vrps csv: $VRPS_CSV" >&2 +if [[ -f "$COMPARE_SUMMARY_MD" ]]; then + echo "- compare summary: $COMPARE_SUMMARY_MD" >&2 + echo "- only in ours: $ONLY_IN_OURS_CSV" >&2 + echo "- only in record: $ONLY_IN_RECORD_CSV" >&2 +fi diff --git a/src/cli.rs b/src/cli.rs index 7ad1a37..4225530 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -11,7 +11,9 @@ use crate::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher}; use crate::policy::Policy; use crate::storage::RocksStore; use crate::validation::run_tree_from_tal::{ - RunTreeFromTalAuditOutput, run_tree_from_tal_and_ta_der_serial_audit, + RunTreeFromTalAuditOutput, run_tree_from_tal_and_ta_der_payload_replay_serial_audit, + run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing, + run_tree_from_tal_and_ta_der_serial_audit, run_tree_from_tal_and_ta_der_serial_audit_with_timing, run_tree_from_tal_url_serial_audit, run_tree_from_tal_url_serial_audit_with_timing, }; @@ -26,6 +28,8 @@ pub struct CliArgs { pub db_path: PathBuf, pub policy_path: Option, pub report_json_path: Option, + pub payload_replay_archive: Option, + pub payload_replay_locks: Option, pub rsync_local_dir: Option, @@ -53,6 +57,8 @@ Options: --db RocksDB directory path (required) --policy Policy TOML path (optional) --report-json Write full audit report as JSON (optional) + --payload-replay-archive Use local payload replay archive root (offline replay mode) + --payload-replay-locks Use local payload replay locks.json (offline replay mode) --tal-url TAL URL (downloads TAL + TA over HTTPS) --tal-path TAL file path (offline-friendly; requires --ta-path) @@ -81,6 +87,8 @@ pub fn parse_args(argv: &[String]) -> Result { let mut db_path: Option = None; let mut policy_path: Option = None; let mut report_json_path: Option = None; + let mut payload_replay_archive: Option = None; + let mut payload_replay_locks: Option = None; let mut rsync_local_dir: Option = None; let mut http_timeout_secs: u64 = 20; @@ -127,6 +135,20 @@ pub fn parse_args(argv: &[String]) -> Result { let v = argv.get(i).ok_or("--report-json requires a value")?; report_json_path = Some(PathBuf::from(v)); } + "--payload-replay-archive" => { + i += 1; + let v = argv + .get(i) + .ok_or("--payload-replay-archive requires a value")?; + payload_replay_archive = Some(PathBuf::from(v)); + } + "--payload-replay-locks" => { + i += 1; + let v = argv + .get(i) + .ok_or("--payload-replay-locks requires a value")?; + payload_replay_locks = Some(PathBuf::from(v)); + } "--rsync-local-dir" => { i += 1; let v = argv.get(i).ok_or("--rsync-local-dir requires a value")?; @@ -202,6 +224,43 @@ pub fn parse_args(argv: &[String]) -> Result { )); } + let replay_mode_count = + payload_replay_archive.is_some() as u8 + payload_replay_locks.is_some() as u8; + if replay_mode_count == 1 { + return Err(format!( + "--payload-replay-archive and --payload-replay-locks must be provided together + +{}", + usage() + )); + } + if replay_mode_count == 2 { + if tal_url.is_some() { + return Err(format!( + "payload replay mode requires --tal-path and --ta-path; --tal-url is not supported + +{}", + usage() + )); + } + if tal_path.is_none() || ta_path.is_none() { + return Err(format!( + "payload replay mode requires --tal-path and --ta-path + +{}", + usage() + )); + } + if rsync_local_dir.is_some() { + return Err(format!( + "payload replay mode cannot be combined with --rsync-local-dir + +{}", + usage() + )); + } + } + Ok(CliArgs { tal_url, tal_path, @@ -209,6 +268,8 @@ pub fn parse_args(argv: &[String]) -> Result { db_path, policy_path, report_json_path, + payload_replay_archive, + payload_replay_locks, rsync_local_dir, http_timeout_secs, rsync_timeout_secs, @@ -340,16 +401,11 @@ pub fn run(argv: &[String]) -> Result<(), String> { .unwrap_or_else(time::OffsetDateTime::now_utc); let store = RocksStore::open(&args.db_path).map_err(|e| e.to_string())?; - let http = BlockingHttpFetcher::new(HttpFetcherConfig { - timeout: std::time::Duration::from_secs(args.http_timeout_secs.max(1)), - ..HttpFetcherConfig::default() - }) - .map_err(|e| e.to_string())?; - let config = TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, }; + let replay_mode = args.payload_replay_archive.is_some(); use time::format_description::well_known::Rfc3339; let mut timing: Option<(std::path::PathBuf, TimingHandle)> = None; @@ -411,7 +467,61 @@ pub fn run(argv: &[String]) -> Result<(), String> { None }; - let out = if let Some(dir) = args.rsync_local_dir.as_ref() { + let out = if replay_mode { + let tal_path = args + .tal_path + .as_ref() + .expect("validated by parse_args for replay mode"); + let ta_path = args + .ta_path + .as_ref() + .expect("validated by parse_args for replay mode"); + let archive_root = args + .payload_replay_archive + .as_ref() + .expect("validated by parse_args for replay mode"); + let locks_path = args + .payload_replay_locks + .as_ref() + .expect("validated by parse_args for replay mode"); + let tal_bytes = std::fs::read(tal_path) + .map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?; + let ta_der = std::fs::read(ta_path) + .map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?; + if let Some((_, t)) = timing.as_ref() { + run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing( + &store, + &policy, + &tal_bytes, + &ta_der, + None, + archive_root, + locks_path, + validation_time, + &config, + t, + ) + .map_err(|e| e.to_string())? + } else { + run_tree_from_tal_and_ta_der_payload_replay_serial_audit( + &store, + &policy, + &tal_bytes, + &ta_der, + None, + archive_root, + locks_path, + validation_time, + &config, + ) + .map_err(|e| e.to_string())? + } + } else if let Some(dir) = args.rsync_local_dir.as_ref() { + let http = BlockingHttpFetcher::new(HttpFetcherConfig { + timeout: std::time::Duration::from_secs(args.http_timeout_secs.max(1)), + ..HttpFetcherConfig::default() + }) + .map_err(|e| e.to_string())?; let rsync = LocalDirRsyncFetcher::new(dir); match ( args.tal_url.as_ref(), @@ -481,6 +591,11 @@ pub fn run(argv: &[String]) -> Result<(), String> { _ => unreachable!("validated by parse_args"), } } else { + let http = BlockingHttpFetcher::new(HttpFetcherConfig { + timeout: std::time::Duration::from_secs(args.http_timeout_secs.max(1)), + ..HttpFetcherConfig::default() + }) + .map_err(|e| e.to_string())?; let rsync = SystemRsyncFetcher::new(SystemRsyncConfig { timeout: std::time::Duration::from_secs(args.rsync_timeout_secs.max(1)), mirror_root: args.rsync_mirror_root.clone(), @@ -781,6 +896,87 @@ mod tests { assert_eq!(args.max_depth, Some(0)); } + #[test] + fn parse_accepts_payload_replay_mode_with_offline_tal_and_ta() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-path".to_string(), + "a.tal".to_string(), + "--ta-path".to_string(), + "ta.cer".to_string(), + "--payload-replay-archive".to_string(), + "archive".to_string(), + "--payload-replay-locks".to_string(), + "locks.json".to_string(), + ]; + let args = parse_args(&argv).expect("parse replay mode"); + assert_eq!( + args.payload_replay_archive.as_deref(), + Some(Path::new("archive")) + ); + assert_eq!( + args.payload_replay_locks.as_deref(), + Some(Path::new("locks.json")) + ); + } + + #[test] + fn parse_rejects_partial_payload_replay_arguments() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-path".to_string(), + "a.tal".to_string(), + "--ta-path".to_string(), + "ta.cer".to_string(), + "--payload-replay-archive".to_string(), + "archive".to_string(), + ]; + let err = parse_args(&argv).unwrap_err(); + assert!(err.contains("must be provided together"), "{err}"); + } + + #[test] + fn parse_rejects_payload_replay_with_tal_url_or_rsync_local_dir() { + let argv_url = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/x.tal".to_string(), + "--payload-replay-archive".to_string(), + "archive".to_string(), + "--payload-replay-locks".to_string(), + "locks.json".to_string(), + ]; + let err = parse_args(&argv_url).unwrap_err(); + assert!(err.contains("--tal-url is not supported"), "{err}"); + + let argv_rsync = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-path".to_string(), + "a.tal".to_string(), + "--ta-path".to_string(), + "ta.cer".to_string(), + "--payload-replay-archive".to_string(), + "archive".to_string(), + "--payload-replay-locks".to_string(), + "locks.json".to_string(), + "--rsync-local-dir".to_string(), + "repo".to_string(), + ]; + let err = parse_args(&argv_rsync).unwrap_err(); + assert!( + err.contains("cannot be combined with --rsync-local-dir"), + "{err}" + ); + } + #[test] fn parse_accepts_validation_time_rfc3339() { let argv = vec![ diff --git a/src/lib.rs b/src/lib.rs index 44238fb..916d511 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,8 @@ pub mod fetch; #[cfg(feature = "full")] pub mod policy; #[cfg(feature = "full")] +pub mod replay; +#[cfg(feature = "full")] pub mod report; #[cfg(feature = "full")] pub mod storage; diff --git a/src/replay/archive.rs b/src/replay/archive.rs new file mode 100644 index 0000000..bcac336 --- /dev/null +++ b/src/replay/archive.rs @@ -0,0 +1,909 @@ +use std::collections::BTreeMap; +use std::fs; +use std::path::{Path, PathBuf}; + +use serde::Deserialize; +use sha2::Digest; + +#[derive(Debug, thiserror::Error)] +pub enum ReplayArchiveError { + #[error("read {entity} failed: {path}: {detail}")] + ReadFile { + entity: &'static str, + path: String, + detail: String, + }, + + #[error("parse {entity} JSON failed: {path}: {detail}")] + ParseJson { + entity: &'static str, + path: String, + detail: String, + }, + + #[error("unsupported {entity} version: expected 1, got {version}")] + UnsupportedVersion { entity: &'static str, version: u32 }, + + #[error("capture directory not found: {0}")] + MissingCaptureDirectory(String), + + #[error("capture.json captureId mismatch: locks={locks_capture}, capture={capture_json}")] + CaptureIdMismatch { + locks_capture: String, + capture_json: String, + }, + + #[error("RRDP lock entry invalid for {notify_uri}: {detail}")] + InvalidRrdpLock { notify_uri: String, detail: String }, + + #[error("RRDP repo bucket not found for {notify_uri}: {path}")] + MissingRrdpRepoBucket { notify_uri: String, path: String }, + + #[error("RRDP repo meta rpkiNotify mismatch: expected {expected}, actual {actual}")] + RrdpMetaMismatch { expected: String, actual: String }, + + #[error("RRDP session directory not found for {notify_uri}: {path}")] + MissingRrdpSessionDir { notify_uri: String, path: String }, + + #[error("locked notification file not found for {notify_uri}: {path}")] + MissingLockedNotification { notify_uri: String, path: String }, + + #[error("locked snapshot file not found for {notify_uri} at serial {serial} in {session_dir}")] + MissingLockedSnapshot { + notify_uri: String, + serial: u64, + session_dir: String, + }, + + #[error( + "multiple locked snapshot files found for {notify_uri} at serial {serial} in {session_dir}" + )] + AmbiguousLockedSnapshot { + notify_uri: String, + serial: u64, + session_dir: String, + }, + + #[error("rsync URI is not a valid module/base URI: {uri}: {detail}")] + InvalidRsyncUri { uri: String, detail: String }, + + #[error("rsync module bucket not found for {module_uri}: {path}")] + MissingRsyncModuleBucket { module_uri: String, path: String }, + + #[error("rsync module meta mismatch: expected {expected}, actual {actual}")] + RsyncMetaMismatch { expected: String, actual: String }, + + #[error("rsync module tree not found for {module_uri}: {path}")] + MissingRsyncTree { module_uri: String, path: String }, + + #[error("no replay lock found for RRDP notification URI: {0}")] + MissingRrdpLock(String), + + #[error("no replay lock found for rsync module: {0}")] + MissingRsyncLock(String), +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ReplayTransport { + Rrdp, + Rsync, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +pub struct ReplayRrdpLock { + pub transport: ReplayTransport, + pub session: Option, + pub serial: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +pub struct ReplayRsyncLock { + pub transport: ReplayTransport, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +pub struct ReplayLocks { + pub version: u32, + pub capture: String, + pub rrdp: BTreeMap, + pub rsync: BTreeMap, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +pub struct ReplayCaptureMeta { + pub version: u32, + #[serde(rename = "captureId")] + pub capture_id: String, + #[serde(rename = "createdAt")] + pub created_at: String, + pub notes: String, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +pub struct ReplayRrdpRepoMeta { + pub version: u32, + #[serde(rename = "rpkiNotify")] + pub rpki_notify: String, + #[serde(rename = "createdAt")] + pub created_at: String, + #[serde(rename = "lastSeenAt")] + pub last_seen_at: String, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +pub struct ReplayRsyncModuleMeta { + pub version: u32, + pub module: String, + #[serde(rename = "createdAt")] + pub created_at: String, + #[serde(rename = "lastSeenAt")] + pub last_seen_at: String, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ReplayRrdpRepo { + pub notify_uri: String, + pub bucket_hash: String, + pub bucket_dir: PathBuf, + pub meta: ReplayRrdpRepoMeta, + pub locked_session: String, + pub locked_serial: u64, + pub session_dir: PathBuf, + pub locked_notification_path: PathBuf, + pub locked_snapshot_path: PathBuf, + pub available_delta_paths: Vec, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ReplayRsyncModule { + pub module_uri: String, + pub bucket_hash: String, + pub bucket_dir: PathBuf, + pub meta: ReplayRsyncModuleMeta, + pub tree_dir: PathBuf, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ReplayArchiveIndex { + pub archive_root: PathBuf, + pub capture_root: PathBuf, + pub locks_path: PathBuf, + pub locks: ReplayLocks, + pub capture_meta: ReplayCaptureMeta, + pub rrdp_repos: BTreeMap, + pub rsync_modules: BTreeMap, +} + +impl ReplayArchiveIndex { + pub fn load( + archive_root: impl AsRef, + locks_path: impl AsRef, + ) -> Result { + let archive_root = archive_root.as_ref().to_path_buf(); + let locks_path = locks_path.as_ref().to_path_buf(); + + let locks: ReplayLocks = read_json_file(&locks_path, "payload replay locks")?; + ensure_version("payload replay locks", locks.version)?; + + let capture_root = archive_root + .join("v1") + .join("captures") + .join(&locks.capture); + if !capture_root.is_dir() { + return Err(ReplayArchiveError::MissingCaptureDirectory( + capture_root.display().to_string(), + )); + } + + let capture_meta_path = capture_root.join("capture.json"); + let capture_meta: ReplayCaptureMeta = read_json_file(&capture_meta_path, "capture meta")?; + ensure_version("capture meta", capture_meta.version)?; + if capture_meta.capture_id != locks.capture { + return Err(ReplayArchiveError::CaptureIdMismatch { + locks_capture: locks.capture.clone(), + capture_json: capture_meta.capture_id.clone(), + }); + } + + let mut rrdp_repos = BTreeMap::new(); + for (notify_uri, lock) in &locks.rrdp { + match lock.transport { + ReplayTransport::Rrdp => { + let repo = load_rrdp_repo(&capture_root, notify_uri, lock)?; + rrdp_repos.insert(notify_uri.clone(), repo); + } + ReplayTransport::Rsync => { + validate_rsync_transport_lock(notify_uri, lock)?; + } + } + } + + let mut rsync_modules = BTreeMap::new(); + for (module_uri, lock) in &locks.rsync { + if lock.transport != ReplayTransport::Rsync { + return Err(ReplayArchiveError::InvalidRrdpLock { + notify_uri: module_uri.clone(), + detail: "rsync lock transport must be rsync".to_string(), + }); + } + let module = load_rsync_module(&capture_root, module_uri)?; + rsync_modules.insert(module.module_uri.clone(), module); + } + + Ok(Self { + archive_root, + capture_root, + locks_path, + locks, + capture_meta, + rrdp_repos, + rsync_modules, + }) + } + + pub fn rrdp_lock(&self, notify_uri: &str) -> Option<&ReplayRrdpLock> { + self.locks.rrdp.get(notify_uri) + } + + pub fn rrdp_repo(&self, notify_uri: &str) -> Option<&ReplayRrdpRepo> { + self.rrdp_repos.get(notify_uri) + } + + pub fn require_rrdp_repo( + &self, + notify_uri: &str, + ) -> Result<&ReplayRrdpRepo, ReplayArchiveError> { + self.rrdp_repos + .get(notify_uri) + .ok_or_else(|| ReplayArchiveError::MissingRrdpLock(notify_uri.to_string())) + } + + pub fn rsync_module(&self, module_uri: &str) -> Option<&ReplayRsyncModule> { + self.rsync_modules.get(module_uri) + } + + pub fn resolve_rsync_module_for_base_uri( + &self, + rsync_base_uri: &str, + ) -> Result<&ReplayRsyncModule, ReplayArchiveError> { + let module_uri = canonical_rsync_module(rsync_base_uri)?; + self.rsync_modules + .get(&module_uri) + .ok_or(ReplayArchiveError::MissingRsyncLock(module_uri)) + } +} + +fn load_rrdp_repo( + capture_root: &Path, + notify_uri: &str, + lock: &ReplayRrdpLock, +) -> Result { + let session = lock + .session + .as_deref() + .ok_or_else(|| ReplayArchiveError::InvalidRrdpLock { + notify_uri: notify_uri.to_string(), + detail: "transport=rrdp requires non-null session".to_string(), + })?; + let serial = lock + .serial + .ok_or_else(|| ReplayArchiveError::InvalidRrdpLock { + notify_uri: notify_uri.to_string(), + detail: "transport=rrdp requires non-null serial".to_string(), + })?; + + let bucket_hash = sha256_hex(notify_uri.as_bytes()); + let bucket_dir = capture_root.join("rrdp").join("repos").join(&bucket_hash); + if !bucket_dir.is_dir() { + return Err(ReplayArchiveError::MissingRrdpRepoBucket { + notify_uri: notify_uri.to_string(), + path: bucket_dir.display().to_string(), + }); + } + + let meta_path = bucket_dir.join("meta.json"); + let meta: ReplayRrdpRepoMeta = read_json_file(&meta_path, "RRDP repo meta")?; + ensure_version("RRDP repo meta", meta.version)?; + if meta.rpki_notify != notify_uri { + return Err(ReplayArchiveError::RrdpMetaMismatch { + expected: notify_uri.to_string(), + actual: meta.rpki_notify.clone(), + }); + } + + let session_dir = bucket_dir.join(session); + if !session_dir.is_dir() { + return Err(ReplayArchiveError::MissingRrdpSessionDir { + notify_uri: notify_uri.to_string(), + path: session_dir.display().to_string(), + }); + } + + let locked_notification_path = session_dir.join(format!("notification-{serial}.xml")); + if !locked_notification_path.is_file() { + return Err(ReplayArchiveError::MissingLockedNotification { + notify_uri: notify_uri.to_string(), + path: locked_notification_path.display().to_string(), + }); + } + + let mut snapshot_candidates: Vec = fs::read_dir(&session_dir) + .map_err(|e| ReplayArchiveError::ReadFile { + entity: "RRDP session directory", + path: session_dir.display().to_string(), + detail: e.to_string(), + })? + .filter_map(|entry| entry.ok().map(|e| e.path())) + .filter(|path| path.is_file()) + .filter(|path| { + path.file_name() + .and_then(|name| name.to_str()) + .is_some_and(|name| { + name.starts_with(&format!("snapshot-{serial}-")) && name.ends_with(".xml") + }) + }) + .collect(); + snapshot_candidates.sort(); + let locked_snapshot_path = match snapshot_candidates.len() { + 0 => { + return Err(ReplayArchiveError::MissingLockedSnapshot { + notify_uri: notify_uri.to_string(), + serial, + session_dir: session_dir.display().to_string(), + }); + } + 1 => snapshot_candidates.remove(0), + _ => { + return Err(ReplayArchiveError::AmbiguousLockedSnapshot { + notify_uri: notify_uri.to_string(), + serial, + session_dir: session_dir.display().to_string(), + }); + } + }; + + let mut available_delta_paths: Vec = fs::read_dir(&session_dir) + .map_err(|e| ReplayArchiveError::ReadFile { + entity: "RRDP session directory", + path: session_dir.display().to_string(), + detail: e.to_string(), + })? + .filter_map(|entry| entry.ok().map(|e| e.path())) + .filter(|path| path.is_file()) + .filter(|path| { + path.file_name() + .and_then(|name| name.to_str()) + .is_some_and(|name| name.starts_with("delta-") && name.ends_with(".xml")) + }) + .collect(); + available_delta_paths.sort(); + + Ok(ReplayRrdpRepo { + notify_uri: notify_uri.to_string(), + bucket_hash, + bucket_dir, + meta, + locked_session: session.to_string(), + locked_serial: serial, + session_dir, + locked_notification_path, + locked_snapshot_path, + available_delta_paths, + }) +} + +fn validate_rsync_transport_lock( + notify_uri: &str, + lock: &ReplayRrdpLock, +) -> Result<(), ReplayArchiveError> { + if lock.session.is_some() || lock.serial.is_some() { + return Err(ReplayArchiveError::InvalidRrdpLock { + notify_uri: notify_uri.to_string(), + detail: "transport=rsync requires null session and serial".to_string(), + }); + } + Ok(()) +} + +fn load_rsync_module( + capture_root: &Path, + module_uri: &str, +) -> Result { + let canonical_module = canonical_rsync_module(module_uri)?; + let bucket_hash = sha256_hex(canonical_module.as_bytes()); + let bucket_dir = capture_root + .join("rsync") + .join("modules") + .join(&bucket_hash); + if !bucket_dir.is_dir() { + return Err(ReplayArchiveError::MissingRsyncModuleBucket { + module_uri: canonical_module.clone(), + path: bucket_dir.display().to_string(), + }); + } + + let meta_path = bucket_dir.join("meta.json"); + let meta: ReplayRsyncModuleMeta = read_json_file(&meta_path, "rsync module meta")?; + ensure_version("rsync module meta", meta.version)?; + if meta.module != canonical_module { + return Err(ReplayArchiveError::RsyncMetaMismatch { + expected: canonical_module.clone(), + actual: meta.module.clone(), + }); + } + + let tree_dir = bucket_dir.join("tree"); + if !tree_dir.is_dir() { + return Err(ReplayArchiveError::MissingRsyncTree { + module_uri: canonical_module.clone(), + path: tree_dir.display().to_string(), + }); + } + + Ok(ReplayRsyncModule { + module_uri: canonical_module, + bucket_hash, + bucket_dir, + meta, + tree_dir, + }) +} + +pub fn canonical_rsync_module(rsync_uri: &str) -> Result { + let raw = rsync_uri.trim(); + let prefix = "rsync://"; + let rest = raw + .strip_prefix(prefix) + .ok_or_else(|| ReplayArchiveError::InvalidRsyncUri { + uri: rsync_uri.to_string(), + detail: "URI must start with rsync://".to_string(), + })?; + + let mut parts = rest.split('/'); + let authority = parts.next().unwrap_or_default(); + let module = parts.next().unwrap_or_default(); + if authority.is_empty() || module.is_empty() { + return Err(ReplayArchiveError::InvalidRsyncUri { + uri: rsync_uri.to_string(), + detail: "URI must contain authority and module".to_string(), + }); + } + + Ok(format!("rsync://{authority}/{module}/")) +} + +pub fn sha256_hex(bytes: &[u8]) -> String { + let digest = sha2::Sha256::digest(bytes); + hex::encode(digest) +} + +fn ensure_version(entity: &'static str, version: u32) -> Result<(), ReplayArchiveError> { + if version == 1 { + Ok(()) + } else { + Err(ReplayArchiveError::UnsupportedVersion { entity, version }) + } +} + +fn read_json_file Deserialize<'de>>( + path: &Path, + entity: &'static str, +) -> Result { + let bytes = fs::read(path).map_err(|e| ReplayArchiveError::ReadFile { + entity, + path: path.display().to_string(), + detail: e.to_string(), + })?; + serde_json::from_slice(&bytes).map_err(|e| ReplayArchiveError::ParseJson { + entity, + path: path.display().to_string(), + detail: e.to_string(), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn build_minimal_archive() -> (tempfile::TempDir, PathBuf, PathBuf, String, String) { + let temp = tempfile::tempdir().expect("tempdir"); + let archive_root = temp.path().join("payload-archive"); + let capture = "capture-001"; + let capture_root = archive_root.join("v1").join("captures").join(capture); + std::fs::create_dir_all(&capture_root).expect("mkdir capture root"); + std::fs::write( + capture_root.join("capture.json"), + format!( + r#"{{"version":1,"captureId":"{capture}","createdAt":"2026-03-13T00:00:00Z","notes":""}}"# + ), + ) + .expect("write capture json"); + + let notify_uri = "https://rrdp.example.test/notification.xml".to_string(); + let session = "11111111-1111-1111-1111-111111111111".to_string(); + let serial = 42u64; + let repo_hash = sha256_hex(notify_uri.as_bytes()); + let repo_dir = capture_root.join("rrdp").join("repos").join(&repo_hash); + let session_dir = repo_dir.join(&session); + std::fs::create_dir_all(&session_dir).expect("mkdir session dir"); + std::fs::write( + repo_dir.join("meta.json"), + format!( + r#"{{"version":1,"rpkiNotify":"{notify_uri}","createdAt":"2026-03-13T00:00:00Z","lastSeenAt":"2026-03-13T00:00:01Z"}}"# + ), + ) + .expect("write repo meta"); + std::fs::write(session_dir.join("notification-42.xml"), b"") + .expect("write notification"); + std::fs::write(session_dir.join("snapshot-42-deadbeef.xml"), b"") + .expect("write snapshot"); + + let module_uri = "rsync://rsync.example.test/repo/".to_string(); + let mod_hash = sha256_hex(module_uri.as_bytes()); + let mod_dir = capture_root.join("rsync").join("modules").join(&mod_hash); + let tree_dir = mod_dir.join("tree").join("rsync.example.test").join("repo"); + std::fs::create_dir_all(&tree_dir).expect("mkdir tree dir"); + std::fs::write( + mod_dir.join("meta.json"), + format!( + r#"{{"version":1,"module":"{module_uri}","createdAt":"2026-03-13T00:00:00Z","lastSeenAt":"2026-03-13T00:00:01Z"}}"# + ), + ) + .expect("write rsync meta"); + std::fs::write(tree_dir.join("a.roa"), b"roa").expect("write rsync object"); + + let locks_path = temp.path().join("locks.json"); + std::fs::write( + &locks_path, + format!( + r#"{{ + "version":1, + "capture":"{capture}", + "rrdp":{{ + "{notify_uri}":{{"transport":"rrdp","session":"{session}","serial":{serial}}}, + "https://rrdp-fallback.example.test/notification.xml":{{"transport":"rsync","session":null,"serial":null}} + }}, + "rsync":{{ + "{module_uri}":{{"transport":"rsync"}} + }} +}}"# + ), + ) + .expect("write locks"); + + (temp, archive_root, locks_path, notify_uri, module_uri) + } + + #[test] + fn canonical_rsync_module_normalizes_subpaths_and_trailing_slash() { + let normalized = canonical_rsync_module("rsync://example.test/repo/sub/dir/file.roa") + .expect("normalize module"); + assert_eq!(normalized, "rsync://example.test/repo/"); + + let normalized = canonical_rsync_module("rsync://example.test/repo") + .expect("normalize module without slash"); + assert_eq!(normalized, "rsync://example.test/repo/"); + } + + #[test] + fn replay_archive_index_loads_rrdp_and_rsync_entries() { + let (_temp, archive_root, locks_path, notify_uri, module_uri) = build_minimal_archive(); + let index = + ReplayArchiveIndex::load(&archive_root, &locks_path).expect("load replay index"); + + assert_eq!(index.capture_meta.capture_id, "capture-001"); + assert_eq!(index.rrdp_repos.len(), 1); + assert_eq!(index.locks.rrdp.len(), 2); + assert_eq!(index.rsync_modules.len(), 1); + + let repo = index.require_rrdp_repo(¬ify_uri).expect("rrdp repo"); + assert_eq!(repo.locked_serial, 42); + assert!( + repo.locked_notification_path + .ends_with("notification-42.xml") + ); + assert!( + repo.locked_snapshot_path + .ends_with("snapshot-42-deadbeef.xml") + ); + assert!(repo.available_delta_paths.is_empty()); + + let module = index + .resolve_rsync_module_for_base_uri("rsync://rsync.example.test/repo/sub/path") + .expect("resolve module from base"); + assert_eq!(module.module_uri, module_uri); + assert!( + module + .tree_dir + .join("rsync.example.test") + .join("repo") + .join("a.roa") + .is_file() + ); + } + + #[test] + fn replay_archive_index_rejects_rsync_transport_with_session_or_serial() { + let (_temp, archive_root, locks_path, _notify_uri, _module_uri) = build_minimal_archive(); + let bad = locks_path.with_file_name("bad-locks.json"); + std::fs::write( + &bad, + r#"{ + "version":1, + "capture":"capture-001", + "rrdp":{ + "https://rrdp-fallback.example.test/notification.xml":{"transport":"rsync","session":"oops","serial":1} + }, + "rsync":{} +}"#, + ) + .expect("write bad locks"); + + let err = ReplayArchiveIndex::load(&archive_root, &bad).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::InvalidRrdpLock { .. }), + "{err}" + ); + } + + #[test] + fn replay_archive_index_rejects_mismatched_rrdp_meta() { + let (_temp, archive_root, locks_path, notify_uri, _module_uri) = build_minimal_archive(); + let repo_hash = sha256_hex(notify_uri.as_bytes()); + let meta_path = archive_root + .join("v1/captures/capture-001/rrdp/repos") + .join(repo_hash) + .join("meta.json"); + std::fs::write( + &meta_path, + r#"{"version":1,"rpkiNotify":"https://other.example/notification.xml","createdAt":"2026-03-13T00:00:00Z","lastSeenAt":"2026-03-13T00:00:01Z"}"#, + ) + .expect("rewrite meta"); + + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::RrdpMetaMismatch { .. }), + "{err}" + ); + } + + #[test] + fn replay_archive_index_rejects_missing_snapshot() { + let (_temp, archive_root, locks_path, notify_uri, _module_uri) = build_minimal_archive(); + let repo_hash = sha256_hex(notify_uri.as_bytes()); + let session_dir = archive_root + .join("v1/captures/capture-001/rrdp/repos") + .join(repo_hash) + .join("11111111-1111-1111-1111-111111111111"); + std::fs::remove_file(session_dir.join("snapshot-42-deadbeef.xml")) + .expect("remove snapshot"); + + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::MissingLockedSnapshot { .. }), + "{err}" + ); + } + + #[test] + fn replay_archive_index_rejects_ambiguous_snapshot_candidates() { + let (_temp, archive_root, locks_path, notify_uri, _module_uri) = build_minimal_archive(); + let repo_hash = sha256_hex(notify_uri.as_bytes()); + let session_dir = archive_root + .join("v1/captures/capture-001/rrdp/repos") + .join(repo_hash) + .join("11111111-1111-1111-1111-111111111111"); + std::fs::write(session_dir.join("snapshot-42-another.xml"), b"") + .expect("write second snapshot"); + + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::AmbiguousLockedSnapshot { .. }), + "{err}" + ); + } + + #[test] + fn canonical_rsync_module_rejects_invalid_uris() { + let err = canonical_rsync_module("https://example.test/repo/").unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::InvalidRsyncUri { .. }), + "{err}" + ); + + let err = canonical_rsync_module("rsync://example.test").unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::InvalidRsyncUri { .. }), + "{err}" + ); + } + + #[test] + fn replay_archive_index_rejects_missing_capture_directory() { + let temp = tempfile::tempdir().expect("tempdir"); + let archive_root = temp.path().join("payload-archive"); + std::fs::create_dir_all(&archive_root).expect("mkdir archive root"); + let locks_path = temp.path().join("locks.json"); + std::fs::write( + &locks_path, + r#"{"version":1,"capture":"missing","rrdp":{},"rsync":{}}"#, + ) + .expect("write locks"); + + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::MissingCaptureDirectory(_)), + "{err}" + ); + } + + #[test] + fn replay_archive_index_rejects_missing_rrdp_bucket_and_session_and_notification() { + let (_temp, archive_root, locks_path, notify_uri, _module_uri) = build_minimal_archive(); + let repo_hash = sha256_hex(notify_uri.as_bytes()); + let repo_dir = archive_root + .join("v1/captures/capture-001/rrdp/repos") + .join(&repo_hash); + + std::fs::remove_dir_all(&repo_dir).expect("remove repo dir"); + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::MissingRrdpRepoBucket { .. }), + "{err}" + ); + + let (_temp, archive_root, locks_path, notify_uri, _module_uri) = build_minimal_archive(); + let repo_hash = sha256_hex(notify_uri.as_bytes()); + let session_dir = archive_root + .join("v1/captures/capture-001/rrdp/repos") + .join(&repo_hash) + .join("11111111-1111-1111-1111-111111111111"); + std::fs::remove_dir_all(&session_dir).expect("remove session dir"); + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::MissingRrdpSessionDir { .. }), + "{err}" + ); + + let (_temp, archive_root, locks_path, notify_uri, _module_uri) = build_minimal_archive(); + let repo_hash = sha256_hex(notify_uri.as_bytes()); + let session_dir = archive_root + .join("v1/captures/capture-001/rrdp/repos") + .join(&repo_hash) + .join("11111111-1111-1111-1111-111111111111"); + std::fs::remove_file(session_dir.join("notification-42.xml")).expect("remove notification"); + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::MissingLockedNotification { .. }), + "{err}" + ); + } + + #[test] + fn replay_archive_index_rejects_invalid_json_and_missing_rsync_bucket() { + let (_temp, archive_root, locks_path, _notify_uri, _module_uri) = build_minimal_archive(); + std::fs::write(&locks_path, b"not json").expect("corrupt locks"); + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!( + err, + ReplayArchiveError::ParseJson { + entity: "payload replay locks", + .. + } + ), + "{err}" + ); + + let (_temp, archive_root, locks_path, _notify_uri, module_uri) = build_minimal_archive(); + let mod_hash = sha256_hex(module_uri.as_bytes()); + let mod_dir = archive_root + .join("v1/captures/capture-001/rsync/modules") + .join(&mod_hash); + std::fs::remove_dir_all(&mod_dir).expect("remove module dir"); + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::MissingRsyncModuleBucket { .. }), + "{err}" + ); + } + #[test] + fn replay_archive_index_rejects_capture_id_mismatch() { + let (_temp, archive_root, locks_path, _notify_uri, _module_uri) = build_minimal_archive(); + let capture_json = archive_root.join("v1/captures/capture-001/capture.json"); + std::fs::write( + &capture_json, + r#"{"version":1,"captureId":"other-capture","createdAt":"2026-03-13T00:00:00Z","notes":""}"#, + ) + .expect("rewrite capture json"); + + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::CaptureIdMismatch { .. }), + "{err}" + ); + } + + #[test] + fn replay_archive_index_rejects_unsupported_locks_version() { + let (_temp, archive_root, locks_path, _notify_uri, _module_uri) = build_minimal_archive(); + std::fs::write( + &locks_path, + r#"{"version":2,"capture":"capture-001","rrdp":{},"rsync":{}}"#, + ) + .expect("rewrite locks version"); + + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!( + err, + ReplayArchiveError::UnsupportedVersion { + entity: "payload replay locks", + version: 2 + } + ), + "{err}" + ); + } + + #[test] + fn replay_archive_index_reports_missing_rrdp_lock_and_missing_rsync_lock() { + let (_temp, archive_root, locks_path, _notify_uri, _module_uri) = build_minimal_archive(); + let index = + ReplayArchiveIndex::load(&archive_root, &locks_path).expect("load replay index"); + + let err = index + .require_rrdp_repo("https://missing.example/notification.xml") + .unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::MissingRrdpLock(_)), + "{err}" + ); + + let err = index + .resolve_rsync_module_for_base_uri("rsync://missing.example/repo/path") + .unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::MissingRsyncLock(_)), + "{err}" + ); + } + + #[test] + fn replay_archive_index_rejects_rsync_meta_mismatch() { + let (_temp, archive_root, locks_path, _notify_uri, module_uri) = build_minimal_archive(); + let mod_hash = sha256_hex(module_uri.as_bytes()); + let meta_path = archive_root + .join("v1/captures/capture-001/rsync/modules") + .join(mod_hash) + .join("meta.json"); + std::fs::write( + &meta_path, + r#"{"version":1,"module":"rsync://other.example/repo/","createdAt":"2026-03-13T00:00:00Z","lastSeenAt":"2026-03-13T00:00:01Z"}"#, + ) + .expect("rewrite rsync meta"); + + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::RsyncMetaMismatch { .. }), + "{err}" + ); + } + #[test] + fn replay_archive_index_rejects_missing_rsync_module_tree() { + let (_temp, archive_root, locks_path, _notify_uri, module_uri) = build_minimal_archive(); + let mod_hash = sha256_hex(module_uri.as_bytes()); + let tree_dir = archive_root + .join("v1/captures/capture-001/rsync/modules") + .join(mod_hash) + .join("tree"); + std::fs::remove_dir_all(&tree_dir).expect("remove tree dir"); + + let err = ReplayArchiveIndex::load(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayArchiveError::MissingRsyncTree { .. }), + "{err}" + ); + } +} diff --git a/src/replay/fetch_http.rs b/src/replay/fetch_http.rs new file mode 100644 index 0000000..2ed6061 --- /dev/null +++ b/src/replay/fetch_http.rs @@ -0,0 +1,399 @@ +use std::collections::BTreeMap; +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use crate::replay::archive::{ReplayArchiveError, ReplayArchiveIndex, ReplayTransport}; +use crate::sync::rrdp::{Fetcher, parse_notification}; + +#[derive(Debug, thiserror::Error)] +pub enum ReplayHttpFetcherError { + #[error(transparent)] + Archive(#[from] ReplayArchiveError), + + #[error("read replay RRDP file failed: {path}: {detail}")] + ReadFile { path: String, detail: String }, + + #[error("parse locked notification failed for {notify_uri}: {detail}")] + ParseNotification { notify_uri: String, detail: String }, + + #[error( + "locked notification session/serial mismatch for {notify_uri}: expected session={expected_session} serial={expected_serial}, got session={actual_session} serial={actual_serial}" + )] + NotificationLockMismatch { + notify_uri: String, + expected_session: String, + expected_serial: u64, + actual_session: String, + actual_serial: u64, + }, + + #[error("locked delta file not found for {notify_uri} at serial {serial}: {path}")] + MissingLockedDelta { + notify_uri: String, + serial: u64, + path: String, + }, + + #[error("duplicate replay HTTP URI mapping for {uri}: {first_path} vs {second_path}")] + DuplicateUriMapping { + uri: String, + first_path: String, + second_path: String, + }, + + #[error("RRDP notification URI is locked to rsync transport in replay: {0}")] + LockedToRsync(String), + + #[error("replay HTTP URI not found in archive: {0}")] + MissingUri(String), +} + +#[derive(Clone, Debug)] +pub struct PayloadReplayHttpFetcher { + index: Arc, + routes: BTreeMap, +} + +impl PayloadReplayHttpFetcher { + pub fn new(index: Arc) -> Result { + let mut routes = BTreeMap::new(); + for (notify_uri, lock) in &index.locks.rrdp { + if lock.transport != ReplayTransport::Rrdp { + continue; + } + let repo = index.require_rrdp_repo(notify_uri)?; + insert_unique_route(&mut routes, notify_uri, &repo.locked_notification_path)?; + + let notification_xml = fs::read(&repo.locked_notification_path).map_err(|e| { + ReplayHttpFetcherError::ReadFile { + path: repo.locked_notification_path.display().to_string(), + detail: e.to_string(), + } + })?; + let notification = parse_notification(¬ification_xml).map_err(|e| { + ReplayHttpFetcherError::ParseNotification { + notify_uri: notify_uri.clone(), + detail: e.to_string(), + } + })?; + let actual_session = notification.session_id.to_string(); + if actual_session != repo.locked_session || notification.serial != repo.locked_serial { + return Err(ReplayHttpFetcherError::NotificationLockMismatch { + notify_uri: notify_uri.clone(), + expected_session: repo.locked_session.clone(), + expected_serial: repo.locked_serial, + actual_session, + actual_serial: notification.serial, + }); + } + + insert_unique_route( + &mut routes, + ¬ification.snapshot_uri, + &repo.locked_snapshot_path, + )?; + + for delta in notification.deltas { + let expected = repo.session_dir.join(format!( + "delta-{}-{}.xml", + delta.serial, + hex::encode(delta.hash_sha256) + )); + if expected.is_file() { + insert_unique_route(&mut routes, &delta.uri, &expected)?; + } + } + } + Ok(Self { index, routes }) + } + + pub fn from_paths( + archive_root: impl AsRef, + locks_path: impl AsRef, + ) -> Result { + let index = Arc::new(ReplayArchiveIndex::load(archive_root, locks_path)?); + Self::new(index) + } + + pub fn archive_index(&self) -> &ReplayArchiveIndex { + self.index.as_ref() + } + + pub fn mapped_route(&self, uri: &str) -> Option<&Path> { + self.routes.get(uri).map(PathBuf::as_path) + } +} + +impl Fetcher for PayloadReplayHttpFetcher { + fn fetch(&self, uri: &str) -> Result, String> { + if let Some(path) = self.routes.get(uri) { + return fs::read(path).map_err(|e| { + ReplayHttpFetcherError::ReadFile { + path: path.display().to_string(), + detail: e.to_string(), + } + .to_string() + }); + } + + if self + .index + .rrdp_lock(uri) + .is_some_and(|lock| lock.transport == ReplayTransport::Rsync) + { + return Err(ReplayHttpFetcherError::LockedToRsync(uri.to_string()).to_string()); + } + + Err(ReplayHttpFetcherError::MissingUri(uri.to_string()).to_string()) + } +} + +fn insert_unique_route( + routes: &mut BTreeMap, + uri: &str, + path: &Path, +) -> Result<(), ReplayHttpFetcherError> { + if let Some(existing) = routes.get(uri) { + if existing != path { + return Err(ReplayHttpFetcherError::DuplicateUriMapping { + uri: uri.to_string(), + first_path: existing.display().to_string(), + second_path: path.display().to_string(), + }); + } + return Ok(()); + } + routes.insert(uri.to_string(), path.to_path_buf()); + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::replay::archive::sha256_hex; + + fn build_archive_with_rrdp_and_rsync( + with_delta: bool, + ) -> ( + tempfile::TempDir, + PathBuf, + PathBuf, + String, + String, + String, + Option, + ) { + let temp = tempfile::tempdir().expect("tempdir"); + let archive_root = temp.path().join("payload-archive"); + let capture = "capture-http"; + let capture_root = archive_root.join("v1").join("captures").join(capture); + std::fs::create_dir_all(&capture_root).expect("mkdir capture root"); + std::fs::write( + capture_root.join("capture.json"), + format!( + r#"{{"version":1,"captureId":"{capture}","createdAt":"2026-03-13T00:00:00Z","notes":""}}"# + ), + ) + .expect("write capture meta"); + + let notify_uri = "https://rrdp.example.test/notification.xml".to_string(); + let snapshot_uri = "https://rrdp.example.test/snapshot.xml".to_string(); + let delta_uri = "https://rrdp.example.test/delta.xml".to_string(); + let rsync_locked_notify = "https://rrdp-fallback.example.test/notification.xml".to_string(); + let session = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa".to_string(); + let serial = 99u64; + let bucket_hash = sha256_hex(notify_uri.as_bytes()); + let session_dir = capture_root + .join("rrdp/repos") + .join(bucket_hash) + .join(&session); + std::fs::create_dir_all(&session_dir).expect("mkdir session dir"); + std::fs::write( + session_dir.parent().unwrap().join("meta.json"), + format!( + r#"{{"version":1,"rpkiNotify":"{notify_uri}","createdAt":"2026-03-13T00:00:00Z","lastSeenAt":"2026-03-13T00:00:01Z"}}"# + ), + ) + .expect("write repo meta"); + + let snapshot_hash = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"; + let delta_hash = "ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100"; + let notification_xml = if with_delta { + format!( + r#" + + + +"# + ) + } else { + format!( + r#" + + +"# + ) + }; + std::fs::write(session_dir.join("notification-99.xml"), notification_xml) + .expect("write notification"); + std::fs::write( + session_dir.join(format!("snapshot-99-{snapshot_hash}.xml")), + b"", + ) + .expect("write snapshot"); + if with_delta { + std::fs::write( + session_dir.join(format!("delta-99-{delta_hash}.xml")), + b"", + ) + .expect("write delta"); + } + + let module_uri = "rsync://rsync.example.test/repo/".to_string(); + let module_hash = sha256_hex(module_uri.as_bytes()); + let tree_root = capture_root + .join("rsync/modules") + .join(&module_hash) + .join("tree") + .join("rsync.example.test") + .join("repo"); + std::fs::create_dir_all(&tree_root).expect("mkdir tree root"); + let module_bucket_dir = capture_root.join("rsync/modules").join(&module_hash); + std::fs::write( + module_bucket_dir.join("meta.json"), + format!( + r#"{{"version":1,"module":"{module_uri}","createdAt":"2026-03-13T00:00:00Z","lastSeenAt":"2026-03-13T00:00:01Z"}}"# + ), + ) + .expect("write rsync meta"); + std::fs::write(tree_root.join("x.cer"), b"cer").expect("write tree file"); + + let locks_path = temp.path().join("locks.json"); + std::fs::write( + &locks_path, + format!( + r#"{{ + "version":1, + "capture":"{capture}", + "rrdp":{{ + "{notify_uri}":{{"transport":"rrdp","session":"{session}","serial":{serial}}}, + "{rsync_locked_notify}":{{"transport":"rsync","session":null,"serial":null}} + }}, + "rsync":{{ + "{module_uri}":{{"transport":"rsync"}} + }} +}}"# + ), + ) + .expect("write locks"); + + ( + temp, + archive_root, + locks_path, + notify_uri, + snapshot_uri, + rsync_locked_notify, + with_delta.then_some(delta_uri), + ) + } + + #[test] + fn payload_replay_http_fetcher_reads_locked_notification_and_snapshot_and_delta() { + let (_temp, archive_root, locks_path, notify_uri, snapshot_uri, _rsync_locked, delta_uri) = + build_archive_with_rrdp_and_rsync(true); + let fetcher = PayloadReplayHttpFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay http fetcher"); + + let notification = fetcher.fetch(¬ify_uri).expect("fetch notification"); + assert!( + std::str::from_utf8(¬ification) + .expect("utf8") + .contains(""); + + let delta = fetcher + .fetch(delta_uri.as_deref().expect("delta uri")) + .expect("fetch delta"); + assert_eq!(delta, b""); + } + + #[test] + fn payload_replay_http_fetcher_rejects_notification_locked_to_rsync() { + let (_temp, archive_root, locks_path, _notify_uri, _snapshot_uri, rsync_locked, _delta_uri) = + build_archive_with_rrdp_and_rsync(false); + let fetcher = PayloadReplayHttpFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay http fetcher"); + let err = fetcher.fetch(&rsync_locked).unwrap_err(); + assert!(err.contains("locked to rsync transport"), "{err}"); + } + + #[test] + fn payload_replay_http_fetcher_rejects_unknown_uri() { + let ( + _temp, + archive_root, + locks_path, + _notify_uri, + _snapshot_uri, + _rsync_locked, + _delta_uri, + ) = build_archive_with_rrdp_and_rsync(false); + let fetcher = PayloadReplayHttpFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay http fetcher"); + let err = fetcher + .fetch("https://unknown.example/test.xml") + .unwrap_err(); + assert!(err.contains("not found in archive"), "{err}"); + } + + #[test] + fn payload_replay_http_fetcher_rejects_notification_lock_mismatch() { + let (_temp, archive_root, locks_path, notify_uri, _snapshot_uri, _rsync_locked, _delta_uri) = + build_archive_with_rrdp_and_rsync(false); + let bucket_hash = sha256_hex(notify_uri.as_bytes()); + let notification = archive_root + .join("v1/captures/capture-http/rrdp/repos") + .join(bucket_hash) + .join("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa") + .join("notification-99.xml"); + std::fs::write( + ¬ification, + r#" + + +"#, + ) + .expect("rewrite notification"); + + let err = PayloadReplayHttpFetcher::from_paths(&archive_root, &locks_path).unwrap_err(); + assert!( + matches!(err, ReplayHttpFetcherError::NotificationLockMismatch { .. }), + "{err}" + ); + } + + #[test] + fn payload_replay_http_fetcher_skips_missing_locked_delta_route() { + let (_temp, archive_root, locks_path, notify_uri, _snapshot_uri, _rsync_locked, _delta_uri) = + build_archive_with_rrdp_and_rsync(true); + let bucket_hash = sha256_hex(notify_uri.as_bytes()); + let delta = archive_root + .join("v1/captures/capture-http/rrdp/repos") + .join(bucket_hash) + .join("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa") + .join("delta-99-ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100.xml"); + std::fs::remove_file(delta).expect("remove delta"); + + let fetcher = PayloadReplayHttpFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay http fetcher without delta route"); + let err = fetcher + .fetch("https://rrdp.example.test/delta.xml") + .unwrap_err(); + assert!(err.contains("not found in archive"), "{err}"); + } +} diff --git a/src/replay/fetch_rsync.rs b/src/replay/fetch_rsync.rs new file mode 100644 index 0000000..df8fb91 --- /dev/null +++ b/src/replay/fetch_rsync.rs @@ -0,0 +1,253 @@ +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use crate::fetch::rsync::{RsyncFetchError, RsyncFetchResult, RsyncFetcher}; +use crate::replay::archive::{ReplayArchiveIndex, canonical_rsync_module}; + +#[derive(Clone, Debug)] +pub struct PayloadReplayRsyncFetcher { + index: Arc, +} + +impl PayloadReplayRsyncFetcher { + pub fn new(index: Arc) -> Self { + Self { index } + } + + pub fn from_paths( + archive_root: impl AsRef, + locks_path: impl AsRef, + ) -> Result { + let index = Arc::new( + ReplayArchiveIndex::load(archive_root, locks_path).map_err(|e| e.to_string())?, + ); + Ok(Self::new(index)) + } + + pub fn archive_index(&self) -> &ReplayArchiveIndex { + self.index.as_ref() + } +} + +impl RsyncFetcher for PayloadReplayRsyncFetcher { + fn fetch_objects(&self, rsync_base_uri: &str) -> RsyncFetchResult)>> { + let (module_uri, relative_path) = split_rsync_base_uri(rsync_base_uri) + .map_err(|e| RsyncFetchError::Fetch(e.to_string()))?; + let module = self + .index + .resolve_rsync_module_for_base_uri(rsync_base_uri) + .map_err(|e| RsyncFetchError::Fetch(e.to_string()))?; + let module_root = module_tree_root(&module_uri, &module.tree_dir) + .map_err(|e| RsyncFetchError::Fetch(e.to_string()))?; + let start = if relative_path.as_os_str().is_empty() { + module_root + } else { + module_root.join(&relative_path) + }; + if !start.is_dir() { + return Err(RsyncFetchError::Fetch(format!( + "replay rsync subtree not found: {}", + start.display() + ))); + } + let mut out = Vec::new(); + walk_dir_collect(&start, &start, rsync_base_uri, &mut out) + .map_err(RsyncFetchError::Fetch)?; + Ok(out) + } +} + +fn split_rsync_base_uri(rsync_base_uri: &str) -> Result<(String, PathBuf), String> { + let module_uri = canonical_rsync_module(rsync_base_uri).map_err(|e| e.to_string())?; + let rest = rsync_base_uri + .strip_prefix(&module_uri) + .unwrap_or_default() + .trim_matches('/'); + let relative_path = if rest.is_empty() { + PathBuf::new() + } else { + let mut path = PathBuf::new(); + for segment in rest.split('/') { + if !segment.is_empty() { + path.push(segment); + } + } + path + }; + Ok((module_uri, relative_path)) +} + +fn module_tree_root(module_uri: &str, tree_dir: &Path) -> Result { + let rest = module_uri + .strip_prefix("rsync://") + .ok_or_else(|| format!("invalid rsync module URI: {module_uri}"))?; + let mut parts = rest.trim_end_matches('/').split('/'); + let authority = parts + .next() + .ok_or_else(|| format!("invalid rsync module URI: {module_uri}"))?; + let module = parts + .next() + .ok_or_else(|| format!("invalid rsync module URI: {module_uri}"))?; + Ok(tree_dir.join(authority).join(module)) +} + +fn walk_dir_collect( + root: &Path, + current: &Path, + rsync_base_uri: &str, + out: &mut Vec<(String, Vec)>, +) -> Result<(), String> { + let rd = fs::read_dir(current).map_err(|e| e.to_string())?; + for entry in rd { + let entry = entry.map_err(|e| e.to_string())?; + let path = entry.path(); + let meta = entry.metadata().map_err(|e| e.to_string())?; + if meta.is_dir() { + walk_dir_collect(root, &path, rsync_base_uri, out)?; + continue; + } + if !meta.is_file() { + continue; + } + let rel = path + .strip_prefix(root) + .map_err(|e| e.to_string())? + .to_string_lossy() + .replace('\\', "/"); + let base = if rsync_base_uri.ends_with('/') { + rsync_base_uri.to_string() + } else { + format!("{rsync_base_uri}/") + }; + let uri = format!("{base}{rel}"); + let bytes = fs::read(&path).map_err(|e| e.to_string())?; + out.push((uri, bytes)); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn build_rsync_archive() -> (tempfile::TempDir, PathBuf, PathBuf) { + let temp = tempfile::tempdir().expect("tempdir"); + let archive_root = temp.path().join("payload-archive"); + let capture = "capture-rsync"; + let capture_root = archive_root.join("v1").join("captures").join(capture); + std::fs::create_dir_all(&capture_root).expect("mkdir capture root"); + std::fs::write( + capture_root.join("capture.json"), + format!( + r#"{{"version":1,"captureId":"{capture}","createdAt":"2026-03-13T00:00:00Z","notes":""}}"# + ), + ) + .expect("write capture meta"); + + let module_uri = "rsync://rsync.example.test/repo/"; + let mod_hash = crate::replay::archive::sha256_hex(module_uri.as_bytes()); + let module_root = capture_root + .join("rsync/modules") + .join(&mod_hash) + .join("tree") + .join("rsync.example.test") + .join("repo"); + std::fs::create_dir_all(module_root.join("child")).expect("mkdir module tree"); + let module_bucket_dir = capture_root.join("rsync/modules").join(&mod_hash); + std::fs::write( + module_bucket_dir.join("meta.json"), + format!( + r#"{{"version":1,"module":"{module_uri}","createdAt":"2026-03-13T00:00:00Z","lastSeenAt":"2026-03-13T00:00:01Z"}}"# + ), + ) + .expect("write module meta"); + std::fs::write(module_root.join("a.roa"), b"a").expect("write a.roa"); + std::fs::write(module_root.join("child").join("b.cer"), b"b").expect("write b.cer"); + + let locks_path = temp.path().join("locks.json"); + std::fs::write( + &locks_path, + format!( + r#"{{"version":1,"capture":"{capture}","rrdp":{{}},"rsync":{{"{module_uri}":{{"transport":"rsync"}}}}}}"# + ), + ) + .expect("write locks"); + + (temp, archive_root, locks_path) + } + + #[test] + fn payload_replay_rsync_fetcher_reads_module_root() { + let (_temp, archive_root, locks_path) = build_rsync_archive(); + let fetcher = PayloadReplayRsyncFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay rsync fetcher"); + let mut objects = fetcher + .fetch_objects("rsync://rsync.example.test/repo/") + .expect("fetch root objects"); + objects.sort_by(|a, b| a.0.cmp(&b.0)); + + assert_eq!(objects.len(), 2); + assert_eq!(objects[0].0, "rsync://rsync.example.test/repo/a.roa"); + assert_eq!(objects[0].1, b"a"); + assert_eq!(objects[1].0, "rsync://rsync.example.test/repo/child/b.cer"); + assert_eq!(objects[1].1, b"b"); + } + + #[test] + fn payload_replay_rsync_fetcher_reads_subtree_only() { + let (_temp, archive_root, locks_path) = build_rsync_archive(); + let fetcher = PayloadReplayRsyncFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay rsync fetcher"); + let objects = fetcher + .fetch_objects("rsync://rsync.example.test/repo/child/") + .expect("fetch child subtree"); + + assert_eq!(objects.len(), 1); + assert_eq!(objects[0].0, "rsync://rsync.example.test/repo/child/b.cer"); + assert_eq!(objects[0].1, b"b"); + } + + #[test] + fn payload_replay_rsync_fetcher_rejects_missing_module_and_subtree() { + let (_temp, archive_root, locks_path) = build_rsync_archive(); + let fetcher = PayloadReplayRsyncFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay rsync fetcher"); + + let err = fetcher + .fetch_objects("rsync://missing.example/repo/") + .unwrap_err(); + match err { + RsyncFetchError::Fetch(msg) => assert!( + msg.contains("no replay lock found for rsync module"), + "{msg}" + ), + } + + let err = fetcher + .fetch_objects("rsync://rsync.example.test/repo/missing/") + .unwrap_err(); + match err { + RsyncFetchError::Fetch(msg) => { + assert!(msg.contains("replay rsync subtree not found"), "{msg}") + } + } + } + + #[test] + fn payload_replay_rsync_fetcher_rejects_invalid_uri_and_exposes_index() { + let (_temp, archive_root, locks_path) = build_rsync_archive(); + let fetcher = PayloadReplayRsyncFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay rsync fetcher"); + assert_eq!(fetcher.archive_index().rsync_modules.len(), 1); + + let err = fetcher + .fetch_objects("https://not-rsync.example/repo/") + .unwrap_err(); + match err { + RsyncFetchError::Fetch(msg) => { + assert!(msg.contains("URI must start with rsync://"), "{msg}") + } + } + } +} diff --git a/src/replay/mod.rs b/src/replay/mod.rs new file mode 100644 index 0000000..26d5add --- /dev/null +++ b/src/replay/mod.rs @@ -0,0 +1,3 @@ +pub mod archive; +pub mod fetch_http; +pub mod fetch_rsync; diff --git a/src/sync/repo.rs b/src/sync/repo.rs index 2078cb8..62be036 100644 --- a/src/sync/repo.rs +++ b/src/sync/repo.rs @@ -3,6 +3,7 @@ use crate::audit::AuditDownloadKind; use crate::audit_downloads::DownloadLogHandle; use crate::fetch::rsync::{RsyncFetchError, RsyncFetcher}; use crate::policy::{Policy, SyncPreference}; +use crate::replay::archive::{ReplayArchiveIndex, ReplayTransport}; use crate::report::{RfcRef, Warning}; use crate::storage::RocksStore; use crate::sync::rrdp::sync_from_notification_with_timing_and_download_log; @@ -43,6 +44,9 @@ pub enum RepoSyncError { #[error("rsync fallback failed: {0}")] Rsync(#[from] RsyncFetchError), + #[error("replay sync error: {0}")] + Replay(String), + #[error("storage error: {0}")] Storage(String), } @@ -133,6 +137,85 @@ pub fn sync_publication_point( } } +pub fn sync_publication_point_replay( + store: &RocksStore, + replay_index: &ReplayArchiveIndex, + rrdp_notification_uri: Option<&str>, + rsync_base_uri: &str, + http_fetcher: &dyn HttpFetcher, + rsync_fetcher: &dyn RsyncFetcher, + timing: Option<&TimingHandle>, + download_log: Option<&DownloadLogHandle>, +) -> Result { + match resolve_replay_transport(replay_index, rrdp_notification_uri, rsync_base_uri)? { + ReplayResolvedTransport::Rrdp(notification_uri) => { + let written = try_rrdp_sync_with_retry( + store, + notification_uri, + http_fetcher, + timing, + download_log, + )?; + if let Some(t) = timing.as_ref() { + t.record_count("repo_sync_rrdp_ok_total", 1); + t.record_count("repo_sync_rrdp_objects_written_total", written as u64); + } + Ok(RepoSyncResult { + source: RepoSyncSource::Rrdp, + objects_written: written, + warnings: Vec::new(), + }) + } + ReplayResolvedTransport::Rsync => { + let written = rsync_sync_into_raw_objects( + store, + rsync_base_uri, + rsync_fetcher, + timing, + download_log, + )?; + if let Some(t) = timing.as_ref() { + t.record_count("repo_sync_rsync_direct_total", 1); + t.record_count("repo_sync_rsync_objects_written_total", written as u64); + } + Ok(RepoSyncResult { + source: RepoSyncSource::Rsync, + objects_written: written, + warnings: Vec::new(), + }) + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum ReplayResolvedTransport<'a> { + Rrdp(&'a str), + Rsync, +} + +fn resolve_replay_transport<'a>( + replay_index: &'a ReplayArchiveIndex, + rrdp_notification_uri: Option<&'a str>, + rsync_base_uri: &str, +) -> Result, RepoSyncError> { + if let Some(notification_uri) = rrdp_notification_uri { + let lock = replay_index.rrdp_lock(notification_uri).ok_or_else(|| { + RepoSyncError::Replay(format!( + "replay RRDP lock missing for notification URI: {notification_uri}" + )) + })?; + return Ok(match lock.transport { + ReplayTransport::Rrdp => ReplayResolvedTransport::Rrdp(notification_uri), + ReplayTransport::Rsync => ReplayResolvedTransport::Rsync, + }); + } + + replay_index + .resolve_rsync_module_for_base_uri(rsync_base_uri) + .map_err(|e| RepoSyncError::Replay(e.to_string()))?; + Ok(ReplayResolvedTransport::Rsync) +} + fn try_rrdp_sync( store: &RocksStore, notification_uri: &str, @@ -354,6 +437,9 @@ mod tests { use super::*; use crate::analysis::timing::{TimingHandle, TimingMeta}; use crate::fetch::rsync::LocalDirRsyncFetcher; + use crate::replay::archive::{ReplayArchiveIndex, sha256_hex}; + use crate::replay::fetch_http::PayloadReplayHttpFetcher; + use crate::replay::fetch_rsync::PayloadReplayRsyncFetcher; use crate::sync::rrdp::Fetcher as HttpFetcher; use crate::sync::rrdp::RrdpState; use base64::Engine; @@ -416,6 +502,107 @@ mod tests { out.into_bytes() } + fn build_replay_archive_fixture() -> ( + tempfile::TempDir, + std::path::PathBuf, + std::path::PathBuf, + String, + String, + String, + String, + ) { + let temp = tempfile::tempdir().expect("tempdir"); + let archive_root = temp.path().join("payload-archive"); + let capture = "repo-replay"; + let capture_root = archive_root.join("v1").join("captures").join(capture); + std::fs::create_dir_all(&capture_root).expect("mkdir capture root"); + std::fs::write( + capture_root.join("capture.json"), + format!( + r#"{{"version":1,"captureId":"{capture}","createdAt":"2026-03-13T00:00:00Z","notes":""}}"# + ), + ) + .expect("write capture json"); + + let notify_uri = "https://rrdp.example.test/notification.xml".to_string(); + let snapshot_uri = "https://rrdp.example.test/snapshot.xml".to_string(); + let session = "00000000-0000-0000-0000-000000000001".to_string(); + let serial = 7u64; + let published_uri = "rsync://example.test/repo/a.mft".to_string(); + let published_bytes = b"mft"; + let snapshot = snapshot_xml(&session, serial, &[(&published_uri, published_bytes)]); + let snapshot_hash = hex::encode(sha2::Sha256::digest(&snapshot)); + let notification = notification_xml(&session, serial, &snapshot_uri, &snapshot_hash); + let repo_hash = sha256_hex(notify_uri.as_bytes()); + let session_dir = capture_root + .join("rrdp/repos") + .join(&repo_hash) + .join(&session); + std::fs::create_dir_all(&session_dir).expect("mkdir session dir"); + std::fs::write( + session_dir.parent().unwrap().join("meta.json"), + format!( + r#"{{"version":1,"rpkiNotify":"{notify_uri}","createdAt":"2026-03-13T00:00:00Z","lastSeenAt":"2026-03-13T00:00:01Z"}}"# + ), + ) + .expect("write repo meta"); + std::fs::write(session_dir.join("notification-7.xml"), notification) + .expect("write notification"); + std::fs::write( + session_dir.join(format!("snapshot-7-{snapshot_hash}.xml")), + &snapshot, + ) + .expect("write snapshot"); + + let rsync_base_uri = "rsync://rsync.example.test/repo/".to_string(); + let rsync_locked_notify = "https://rrdp-fallback.example.test/notification.xml".to_string(); + let mod_hash = sha256_hex(rsync_base_uri.as_bytes()); + let module_bucket_dir = capture_root.join("rsync/modules").join(&mod_hash); + let module_root = module_bucket_dir + .join("tree") + .join("rsync.example.test") + .join("repo"); + std::fs::create_dir_all(module_root.join("sub")).expect("mkdir module tree"); + std::fs::write( + module_bucket_dir.join("meta.json"), + format!( + r#"{{"version":1,"module":"{rsync_base_uri}","createdAt":"2026-03-13T00:00:00Z","lastSeenAt":"2026-03-13T00:00:01Z"}}"# + ), + ) + .expect("write rsync meta"); + std::fs::write(module_root.join("sub").join("fallback.cer"), b"cer") + .expect("write rsync object"); + + let locks_path = temp.path().join("locks.json"); + std::fs::write( + &locks_path, + format!( + r#"{{ + "version":1, + "capture":"{capture}", + "rrdp":{{ + "{notify_uri}":{{"transport":"rrdp","session":"{session}","serial":{serial}}}, + "{rsync_locked_notify}":{{"transport":"rsync","session":null,"serial":null}} + }}, + "rsync":{{ + "{rsync_base_uri}":{{"transport":"rsync"}} + }} +}}"# + ), + ) + .expect("write locks"); + + ( + temp, + archive_root, + locks_path, + notify_uri, + rsync_locked_notify, + rsync_base_uri, + published_uri, + ) + } + fn timing_to_json(temp_dir: &std::path::Path, timing: &TimingHandle) -> serde_json::Value { let timing_path = temp_dir.join("timing_retry.json"); timing.write_json(&timing_path, 50).expect("write json"); @@ -952,4 +1139,127 @@ mod tests { ); assert!(events.iter().all(|e| e.success)); } + + #[test] + fn replay_sync_uses_rrdp_when_locked_to_rrdp() { + let temp = tempfile::tempdir().expect("tempdir"); + let store_dir = temp.path().join("db"); + let store = RocksStore::open(&store_dir).expect("open rocksdb"); + + let ( + _archive_temp, + archive_root, + locks_path, + notify_uri, + _rsync_locked_notify, + _rsync_base_uri, + published_uri, + ) = build_replay_archive_fixture(); + let replay_index = + ReplayArchiveIndex::load(&archive_root, &locks_path).expect("load replay index"); + let http = PayloadReplayHttpFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay http fetcher"); + let rsync = PayloadReplayRsyncFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay rsync fetcher"); + + let out = sync_publication_point_replay( + &store, + &replay_index, + Some(¬ify_uri), + "rsync://example.test/repo/", + &http, + &rsync, + None, + None, + ) + .expect("replay sync ok"); + + assert_eq!(out.source, RepoSyncSource::Rrdp); + assert_eq!(out.objects_written, 1); + assert_eq!( + store.get_raw(&published_uri).unwrap(), + Some(b"mft".to_vec()) + ); + } + + #[test] + fn replay_sync_uses_rsync_when_notification_is_locked_to_rsync() { + let temp = tempfile::tempdir().expect("tempdir"); + let store_dir = temp.path().join("db"); + let store = RocksStore::open(&store_dir).expect("open rocksdb"); + + let ( + _archive_temp, + archive_root, + locks_path, + _notify_uri, + rsync_locked_notify, + rsync_base_uri, + _published_uri, + ) = build_replay_archive_fixture(); + let replay_index = + ReplayArchiveIndex::load(&archive_root, &locks_path).expect("load replay index"); + let http = PayloadReplayHttpFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay http fetcher"); + let rsync = PayloadReplayRsyncFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay rsync fetcher"); + + let out = sync_publication_point_replay( + &store, + &replay_index, + Some(&rsync_locked_notify), + &rsync_base_uri, + &http, + &rsync, + None, + None, + ) + .expect("replay rsync sync ok"); + + assert_eq!(out.source, RepoSyncSource::Rsync); + assert_eq!(out.objects_written, 1); + assert_eq!(out.warnings.len(), 0); + assert_eq!( + store + .get_raw("rsync://rsync.example.test/repo/sub/fallback.cer") + .unwrap(), + Some(b"cer".to_vec()) + ); + } + + #[test] + fn replay_sync_errors_when_lock_is_missing() { + let temp = tempfile::tempdir().expect("tempdir"); + let store_dir = temp.path().join("db"); + let store = RocksStore::open(&store_dir).expect("open rocksdb"); + + let ( + _archive_temp, + archive_root, + locks_path, + _notify_uri, + _rsync_locked_notify, + _rsync_base_uri, + _published_uri, + ) = build_replay_archive_fixture(); + let replay_index = + ReplayArchiveIndex::load(&archive_root, &locks_path).expect("load replay index"); + let http = PayloadReplayHttpFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay http fetcher"); + let rsync = PayloadReplayRsyncFetcher::from_paths(&archive_root, &locks_path) + .expect("build replay rsync fetcher"); + + let err = sync_publication_point_replay( + &store, + &replay_index, + Some("https://missing.example/notification.xml"), + "rsync://missing.example/repo/", + &http, + &rsync, + None, + None, + ) + .unwrap_err(); + assert!(matches!(err, RepoSyncError::Replay(_)), "{err}"); + } } diff --git a/src/validation/run.rs b/src/validation/run.rs index 29cb4bb..8db1ec9 100644 --- a/src/validation/run.rs +++ b/src/validation/run.rs @@ -67,6 +67,7 @@ pub fn run_publication_point_once( validation_time, timing: None, download_log: None, + replay_archive_index: None, rrdp_dedup: false, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: false, diff --git a/src/validation/run_tree_from_tal.rs b/src/validation/run_tree_from_tal.rs index 03e463f..029fe21 100644 --- a/src/validation/run_tree_from_tal.rs +++ b/src/validation/run_tree_from_tal.rs @@ -4,6 +4,9 @@ use crate::analysis::timing::TimingHandle; use crate::audit::PublicationPointAudit; use crate::audit_downloads::DownloadLogHandle; use crate::data_model::ta::TrustAnchor; +use crate::replay::archive::ReplayArchiveIndex; +use crate::replay::fetch_http::PayloadReplayHttpFetcher; +use crate::replay::fetch_rsync::PayloadReplayRsyncFetcher; use crate::sync::rrdp::Fetcher; use crate::validation::from_tal::{ DiscoveredRootCaInstance, FromTalError, discover_root_ca_instance_from_tal_and_ta_der, @@ -15,7 +18,7 @@ use crate::validation::tree::{ }; use crate::validation::tree_runner::Rpkiv1PublicationPointRunner; use std::collections::HashMap; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; fn tal_id_from_url_like(s: &str) -> Option { let url = Url::parse(s).ok()?; @@ -75,6 +78,9 @@ pub enum RunTreeFromTalError { #[error("{0}")] FromTal(#[from] FromTalError), + #[error("payload replay setup failed: {0}")] + Replay(String), + #[error("{0}")] Tree(#[from] TreeRunError), } @@ -120,6 +126,7 @@ pub fn run_tree_from_tal_url_serial( validation_time, timing: None, download_log: None, + replay_archive_index: None, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: true, @@ -157,6 +164,7 @@ pub fn run_tree_from_tal_url_serial_audit( validation_time, timing: None, download_log: Some(download_log.clone()), + replay_archive_index: None, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: true, @@ -208,6 +216,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing( validation_time, timing: Some(timing.clone()), download_log: Some(download_log.clone()), + replay_archive_index: None, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: true, @@ -259,6 +268,7 @@ pub fn run_tree_from_tal_and_ta_der_serial( validation_time, timing: None, download_log: None, + replay_archive_index: None, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: true, @@ -299,6 +309,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit( validation_time, timing: None, download_log: Some(download_log.clone()), + replay_archive_index: None, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: true, @@ -353,6 +364,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing( validation_time, timing: Some(timing.clone()), download_log: Some(download_log.clone()), + replay_archive_index: None, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: true, @@ -381,3 +393,325 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing( download_stats, }) } + +pub fn run_tree_from_tal_and_ta_der_payload_replay_serial( + store: &crate::storage::RocksStore, + policy: &crate::policy::Policy, + tal_bytes: &[u8], + ta_der: &[u8], + resolved_ta_uri: Option<&Url>, + payload_archive_root: &std::path::Path, + payload_locks_path: &std::path::Path, + validation_time: time::OffsetDateTime, + config: &TreeRunConfig, +) -> Result { + let discovery = + discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, resolved_ta_uri)?; + let replay_index = Arc::new( + ReplayArchiveIndex::load(payload_archive_root, payload_locks_path) + .map_err(|e| RunTreeFromTalError::Replay(e.to_string()))?, + ); + let http_fetcher = PayloadReplayHttpFetcher::new(replay_index.clone()) + .map_err(|e| RunTreeFromTalError::Replay(e.to_string()))?; + let rsync_fetcher = PayloadReplayRsyncFetcher::new(replay_index.clone()); + + let runner = Rpkiv1PublicationPointRunner { + store, + policy, + http_fetcher: &http_fetcher, + rsync_fetcher: &rsync_fetcher, + validation_time, + timing: None, + download_log: None, + replay_archive_index: Some(replay_index), + rrdp_dedup: true, + rrdp_repo_cache: Mutex::new(HashMap::new()), + rsync_dedup: true, + rsync_repo_cache: Mutex::new(HashMap::new()), + }; + + let root = root_handle_from_trust_anchor( + &discovery.trust_anchor, + derive_tal_id(&discovery), + None, + &discovery.ca_instance, + ); + let tree = run_tree_serial(root, &runner, config)?; + + Ok(RunTreeFromTalOutput { discovery, tree }) +} + +pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit( + store: &crate::storage::RocksStore, + policy: &crate::policy::Policy, + tal_bytes: &[u8], + ta_der: &[u8], + resolved_ta_uri: Option<&Url>, + payload_archive_root: &std::path::Path, + payload_locks_path: &std::path::Path, + validation_time: time::OffsetDateTime, + config: &TreeRunConfig, +) -> Result { + let discovery = + discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, resolved_ta_uri)?; + let replay_index = Arc::new( + ReplayArchiveIndex::load(payload_archive_root, payload_locks_path) + .map_err(|e| RunTreeFromTalError::Replay(e.to_string()))?, + ); + let http_fetcher = PayloadReplayHttpFetcher::new(replay_index.clone()) + .map_err(|e| RunTreeFromTalError::Replay(e.to_string()))?; + let rsync_fetcher = PayloadReplayRsyncFetcher::new(replay_index.clone()); + let download_log = DownloadLogHandle::new(); + + let runner = Rpkiv1PublicationPointRunner { + store, + policy, + http_fetcher: &http_fetcher, + rsync_fetcher: &rsync_fetcher, + validation_time, + timing: None, + download_log: Some(download_log.clone()), + replay_archive_index: Some(replay_index), + rrdp_dedup: true, + rrdp_repo_cache: Mutex::new(HashMap::new()), + rsync_dedup: true, + rsync_repo_cache: Mutex::new(HashMap::new()), + }; + + let root = root_handle_from_trust_anchor( + &discovery.trust_anchor, + derive_tal_id(&discovery), + None, + &discovery.ca_instance, + ); + let TreeRunAuditOutput { + tree, + publication_points, + } = run_tree_serial_audit(root, &runner, config)?; + + let downloads = download_log.snapshot_events(); + let download_stats = DownloadLogHandle::stats_from_events(&downloads); + Ok(RunTreeFromTalAuditOutput { + discovery, + tree, + publication_points, + downloads, + download_stats, + }) +} + +pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing( + store: &crate::storage::RocksStore, + policy: &crate::policy::Policy, + tal_bytes: &[u8], + ta_der: &[u8], + resolved_ta_uri: Option<&Url>, + payload_archive_root: &std::path::Path, + payload_locks_path: &std::path::Path, + validation_time: time::OffsetDateTime, + config: &TreeRunConfig, + timing: &TimingHandle, +) -> Result { + let _tal = timing.span_phase("tal_bootstrap"); + let discovery = + discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, resolved_ta_uri)?; + drop(_tal); + let replay_index = Arc::new( + ReplayArchiveIndex::load(payload_archive_root, payload_locks_path) + .map_err(|e| RunTreeFromTalError::Replay(e.to_string()))?, + ); + let http_fetcher = PayloadReplayHttpFetcher::new(replay_index.clone()) + .map_err(|e| RunTreeFromTalError::Replay(e.to_string()))?; + let rsync_fetcher = PayloadReplayRsyncFetcher::new(replay_index.clone()); + let download_log = DownloadLogHandle::new(); + + let runner = Rpkiv1PublicationPointRunner { + store, + policy, + http_fetcher: &http_fetcher, + rsync_fetcher: &rsync_fetcher, + validation_time, + timing: Some(timing.clone()), + download_log: Some(download_log.clone()), + replay_archive_index: Some(replay_index), + rrdp_dedup: true, + rrdp_repo_cache: Mutex::new(HashMap::new()), + rsync_dedup: true, + rsync_repo_cache: Mutex::new(HashMap::new()), + }; + + let root = root_handle_from_trust_anchor( + &discovery.trust_anchor, + derive_tal_id(&discovery), + None, + &discovery.ca_instance, + ); + let _tree = timing.span_phase("tree_run_total"); + let TreeRunAuditOutput { + tree, + publication_points, + } = run_tree_serial_audit(root, &runner, config)?; + + let downloads = download_log.snapshot_events(); + let download_stats = DownloadLogHandle::stats_from_events(&downloads); + Ok(RunTreeFromTalAuditOutput { + discovery, + tree, + publication_points, + downloads, + download_stats, + }) +} + +#[cfg(test)] +mod replay_api_tests { + use super::*; + use crate::analysis::timing::{TimingHandle, TimingMeta}; + use time::format_description::well_known::Rfc3339; + + fn apnic_replay_inputs() -> ( + Vec, + Vec, + std::path::PathBuf, + std::path::PathBuf, + time::OffsetDateTime, + ) { + let tal_bytes = std::fs::read("tests/fixtures/tal/apnic-rfc7730-https.tal") + .expect("read apnic tal fixture"); + let ta_der = + std::fs::read("tests/fixtures/ta/apnic-ta.cer").expect("read apnic ta fixture"); + let archive_root = std::path::PathBuf::from("target/live/payload_replay/payload-archive"); + let locks_path = std::path::PathBuf::from("target/live/payload_replay/locks.json"); + let validation_time = time::OffsetDateTime::parse("2026-03-13T02:30:00Z", &Rfc3339) + .expect("parse validation time"); + (tal_bytes, ta_der, archive_root, locks_path, validation_time) + } + + #[test] + fn payload_replay_api_reports_setup_error_for_missing_archive() { + let temp = tempfile::tempdir().expect("tempdir"); + let store = crate::storage::RocksStore::open(&temp.path().join("db")).expect("open db"); + let tal_bytes = std::fs::read("tests/fixtures/tal/apnic-rfc7730-https.tal") + .expect("read apnic tal fixture"); + let ta_der = + std::fs::read("tests/fixtures/ta/apnic-ta.cer").expect("read apnic ta fixture"); + let err = run_tree_from_tal_and_ta_der_payload_replay_serial_audit( + &store, + &crate::policy::Policy::default(), + &tal_bytes, + &ta_der, + None, + std::path::Path::new("tests/fixtures/missing-payload-archive"), + std::path::Path::new("tests/fixtures/missing-locks.json"), + time::OffsetDateTime::now_utc(), + &TreeRunConfig { + max_depth: Some(0), + max_instances: Some(1), + }, + ) + .unwrap_err(); + assert!(matches!(err, RunTreeFromTalError::Replay(_)), "{err}"); + } + + #[test] + fn payload_replay_api_root_only_apnic_archive_runs() { + let temp = tempfile::tempdir().expect("tempdir"); + let store = crate::storage::RocksStore::open(&temp.path().join("db")).expect("open db"); + let (tal_bytes, ta_der, archive_root, locks_path, validation_time) = apnic_replay_inputs(); + assert!( + archive_root.is_dir(), + "payload replay archive missing: {}", + archive_root.display() + ); + assert!( + locks_path.is_file(), + "payload replay locks missing: {}", + locks_path.display() + ); + + let out = run_tree_from_tal_and_ta_der_payload_replay_serial_audit( + &store, + &crate::policy::Policy::default(), + &tal_bytes, + &ta_der, + None, + &archive_root, + &locks_path, + validation_time, + &TreeRunConfig { + max_depth: Some(0), + max_instances: Some(1), + }, + ) + .expect("run replay root-only audit"); + + assert_eq!(out.tree.instances_processed, 1); + assert_eq!(out.tree.instances_failed, 0); + assert_eq!(out.publication_points.len(), 1); + assert_eq!(out.discovery.trust_anchor.resolved_ta_uri, None); + assert!(!out.downloads.is_empty()); + assert!( + out.downloads.iter().all(|d| d.success), + "expected successful replay downloads" + ); + } + + #[test] + fn payload_replay_api_root_only_apnic_archive_runs_with_timing() { + let temp = tempfile::tempdir().expect("tempdir"); + let db_path = temp.path().join("db"); + let store = crate::storage::RocksStore::open(&db_path).expect("open db"); + let (tal_bytes, ta_der, archive_root, locks_path, validation_time) = apnic_replay_inputs(); + assert!( + archive_root.is_dir(), + "payload replay archive missing: {}", + archive_root.display() + ); + assert!( + locks_path.is_file(), + "payload replay locks missing: {}", + locks_path.display() + ); + + let timing = TimingHandle::new(TimingMeta { + recorded_at_utc_rfc3339: "2026-03-13T03:00:00Z".to_string(), + validation_time_utc_rfc3339: "2026-03-13T02:30:00Z".to_string(), + tal_url: None, + db_path: Some(db_path.to_string_lossy().into_owned()), + }); + + let out = run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing( + &store, + &crate::policy::Policy::default(), + &tal_bytes, + &ta_der, + None, + &archive_root, + &locks_path, + validation_time, + &TreeRunConfig { + max_depth: Some(0), + max_instances: Some(1), + }, + &timing, + ) + .expect("run replay root-only audit with timing"); + + assert_eq!(out.tree.instances_processed, 1); + let timing_json = temp.path().join("timing_replay.json"); + timing + .write_json(&timing_json, 20) + .expect("write timing json"); + let json: serde_json::Value = + serde_json::from_slice(&std::fs::read(&timing_json).expect("read timing json")) + .expect("parse timing json"); + let counts = json.get("counts").expect("counts"); + assert!( + counts + .get("repo_sync_rrdp_ok_total") + .and_then(|v| v.as_u64()) + .unwrap_or(0) + >= 1 + ); + } +} diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index b70f569..d8d89b6 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -11,6 +11,7 @@ use crate::data_model::rc::ResourceCertificate; use crate::data_model::roa::{RoaAfi, RoaObject}; use crate::fetch::rsync::RsyncFetcher; use crate::policy::Policy; +use crate::replay::archive::ReplayArchiveIndex; use crate::report::{RfcRef, Warning}; use crate::storage::{ AuditRuleIndexEntry, AuditRuleKind, PackFile, PackTime, RawByHashEntry, RocksStore, @@ -18,7 +19,7 @@ use crate::storage::{ VcirAuditSummary, VcirChildEntry, VcirInstanceGate, VcirLocalOutput, VcirOutputType, VcirRelatedArtifact, VcirSummary, }; -use crate::sync::repo::sync_publication_point; +use crate::sync::repo::{sync_publication_point, sync_publication_point_replay}; use crate::sync::rrdp::Fetcher; use crate::validation::ca_instance::ca_instance_uris_from_ca_certificate; use crate::validation::ca_path::{ @@ -34,7 +35,7 @@ use crate::validation::tree::{ CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult, PublicationPointRunner, }; use std::collections::{HashMap, HashSet}; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use serde::Deserialize; use serde_json::json; @@ -49,6 +50,7 @@ pub struct Rpkiv1PublicationPointRunner<'a> { pub validation_time: time::OffsetDateTime, pub timing: Option, pub download_log: Option, + pub replay_archive_index: Option>, /// In-run RRDP dedup: when RRDP is enabled, only sync each `rrdp_notification_uri` once per run. /// /// - If RRDP succeeded for a repo, later publication points referencing that same RRDP repo @@ -151,16 +153,29 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { .map(|t| t.span_phase("repo_sync_total")); let _repo_span = self.timing.as_ref().map(|t| t.span_rrdp_repo(repo_key)); - match sync_publication_point( - self.store, - self.policy, - effective_notification_uri, - &ca.rsync_base_uri, - self.http_fetcher, - self.rsync_fetcher, - self.timing.as_ref(), - self.download_log.as_ref(), - ) { + match if let Some(replay_index) = self.replay_archive_index.as_ref() { + sync_publication_point_replay( + self.store, + replay_index, + effective_notification_uri, + &ca.rsync_base_uri, + self.http_fetcher, + self.rsync_fetcher, + self.timing.as_ref(), + self.download_log.as_ref(), + ) + } else { + sync_publication_point( + self.store, + self.policy, + effective_notification_uri, + &ca.rsync_base_uri, + self.http_fetcher, + self.rsync_fetcher, + self.timing.as_ref(), + self.download_log.as_ref(), + ) + } { Ok(res) => { if self.rsync_dedup && res.source == crate::sync::repo::RepoSyncSource::Rsync { let base = normalize_rsync_base_uri(&ca.rsync_base_uri); @@ -2857,6 +2872,7 @@ authorityKeyIdentifier = keyid:always validation_time, timing: None, download_log: None, + replay_archive_index: None, rrdp_dedup: false, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: false, @@ -3023,6 +3039,7 @@ authorityKeyIdentifier = keyid:always validation_time, timing: None, download_log: None, + replay_archive_index: None, rrdp_dedup: false, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: true, @@ -3100,6 +3117,7 @@ authorityKeyIdentifier = keyid:always validation_time, timing: None, download_log: None, + replay_archive_index: None, rrdp_dedup: false, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: false, @@ -3123,6 +3141,7 @@ authorityKeyIdentifier = keyid:always validation_time, timing: None, download_log: None, + replay_archive_index: None, rrdp_dedup: false, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: false, @@ -4107,6 +4126,7 @@ authorityKeyIdentifier = keyid:always validation_time, timing: Some(timing.clone()), download_log: None, + replay_archive_index: None, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: false, @@ -4133,6 +4153,7 @@ authorityKeyIdentifier = keyid:always validation_time, timing: Some(timing), download_log: None, + replay_archive_index: None, rrdp_dedup: false, rrdp_repo_cache: Mutex::new(HashMap::new()), rsync_dedup: true, diff --git a/tests/test_apnic_stats_live_stage2.rs b/tests/test_apnic_stats_live_stage2.rs index 9b0e241..89cbc62 100644 --- a/tests/test_apnic_stats_live_stage2.rs +++ b/tests/test_apnic_stats_live_stage2.rs @@ -170,6 +170,7 @@ fn apnic_tree_full_stats_serial() { validation_time, timing: None, download_log: None, + replay_archive_index: None, rrdp_dedup: true, rrdp_repo_cache: std::sync::Mutex::new(std::collections::HashMap::new()), rsync_dedup: true, diff --git a/tests/test_cli_payload_replay_smoke.rs b/tests/test_cli_payload_replay_smoke.rs new file mode 100644 index 0000000..c4c929f --- /dev/null +++ b/tests/test_cli_payload_replay_smoke.rs @@ -0,0 +1,69 @@ +use std::process::Command; + +#[test] +fn cli_payload_replay_root_only_smoke_writes_report_json() { + let bin = env!("CARGO_BIN_EXE_rpki"); + let db_dir = tempfile::tempdir().expect("db tempdir"); + let out_dir = tempfile::tempdir().expect("out tempdir"); + let report_path = out_dir.path().join("report.json"); + + let tal_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/fixtures/tal/apnic-rfc7730-https.tal"); + let ta_path = + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/ta/apnic-ta.cer"); + let archive_root = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("target/live/payload_replay/payload-archive"); + let locks_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("target/live/payload_replay/locks.json"); + + assert!( + archive_root.is_dir(), + "payload replay archive missing: {}", + archive_root.display() + ); + assert!( + locks_path.is_file(), + "payload replay locks missing: {}", + locks_path.display() + ); + + let out = Command::new(bin) + .args([ + "--db", + db_dir.path().to_string_lossy().as_ref(), + "--tal-path", + tal_path.to_string_lossy().as_ref(), + "--ta-path", + ta_path.to_string_lossy().as_ref(), + "--payload-replay-archive", + archive_root.to_string_lossy().as_ref(), + "--payload-replay-locks", + locks_path.to_string_lossy().as_ref(), + "--validation-time", + "2026-03-13T02:30:00Z", + "--max-depth", + "0", + "--max-instances", + "1", + "--report-json", + report_path.to_string_lossy().as_ref(), + ]) + .output() + .expect("run replay cli"); + + assert!( + out.status.success(), + "cli failed: status={}\nstdout={}\nstderr={}", + out.status, + String::from_utf8_lossy(&out.stdout), + String::from_utf8_lossy(&out.stderr) + ); + + let bytes = std::fs::read(&report_path).expect("read report json"); + let v: serde_json::Value = serde_json::from_slice(&bytes).expect("parse report json"); + assert_eq!(v["format_version"], 2); + assert!(v["tree"]["instances_processed"].as_u64().unwrap_or(0) >= 1); + assert!(v.get("publication_points").is_some()); + assert!(v.get("downloads").is_some()); + assert!(v.get("download_stats").is_some()); +} diff --git a/tests/test_payload_replay_tools.rs b/tests/test_payload_replay_tools.rs new file mode 100644 index 0000000..3dbb388 --- /dev/null +++ b/tests/test_payload_replay_tools.rs @@ -0,0 +1,124 @@ +use std::process::Command; + +#[test] +fn report_to_routinator_csv_exports_sorted_rows() { + let dir = tempfile::tempdir().expect("tempdir"); + let report = dir.path().join("report.json"); + let out_csv = dir.path().join("vrps.csv"); + std::fs::write( + &report, + r#"{ + "format_version": 2, + "meta": {"validation_time_rfc3339_utc": "2026-03-13T02:30:00Z"}, + "policy": {}, + "tree": {"instances_processed": 0, "instances_failed": 0, "warnings": []}, + "publication_points": [], + "vrps": [ + {"asn": 64497, "prefix": "203.0.113.0/24", "max_length": 24}, + {"asn": 64496, "prefix": "192.0.2.0/24", "max_length": 24} + ], + "aspas": [], + "downloads": [], + "download_stats": {"events_total": 0, "by_kind": {}} +}"#, + ) + .expect("write report"); + + let script = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("scripts/payload_replay/report_to_routinator_csv.py"); + let out = Command::new("python3") + .arg(&script) + .args([ + "--report", + report.to_string_lossy().as_ref(), + "--out", + out_csv.to_string_lossy().as_ref(), + "--trust-anchor", + "apnic", + ]) + .output() + .expect("run export script"); + + assert!( + out.status.success(), + "script failed: status={}\nstdout={}\nstderr={}", + out.status, + String::from_utf8_lossy(&out.stdout), + String::from_utf8_lossy(&out.stderr) + ); + + let csv = std::fs::read_to_string(&out_csv).expect("read csv"); + let lines: Vec<_> = csv.lines().collect(); + assert_eq!(lines[0], "ASN,IP Prefix,Max Length,Trust Anchor"); + assert_eq!(lines[1], "AS64496,192.0.2.0/24,24,apnic"); + assert_eq!(lines[2], "AS64497,203.0.113.0/24,24,apnic"); +} + +#[test] +fn compare_with_routinator_record_reports_diff_counts() { + let dir = tempfile::tempdir().expect("tempdir"); + let ours = dir.path().join("ours.csv"); + let record = dir.path().join("record.csv"); + let summary = dir.path().join("summary.md"); + let only_ours = dir.path().join("only_ours.csv"); + let only_record = dir.path().join("only_record.csv"); + + std::fs::write( + &ours, + "ASN,IP Prefix,Max Length,Trust Anchor\nAS64496,192.0.2.0/24,24,apnic\nAS64497,198.51.100.0/24,24,apnic\n", + ) + .expect("write ours csv"); + std::fs::write( + &record, + "ASN,IP Prefix,Max Length,Trust Anchor\nAS64496,192.0.2.0/24,24,apnic\nAS64498,203.0.113.0/24,24,apnic\n", + ) + .expect("write record csv"); + + let script = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("scripts/payload_replay/compare_with_routinator_record.sh"); + let out = Command::new(&script) + .args([ + ours.to_string_lossy().as_ref(), + record.to_string_lossy().as_ref(), + summary.to_string_lossy().as_ref(), + only_ours.to_string_lossy().as_ref(), + only_record.to_string_lossy().as_ref(), + ]) + .output() + .expect("run compare script"); + + assert!( + out.status.success(), + "script failed: status={}\nstdout={}\nstderr={}", + out.status, + String::from_utf8_lossy(&out.stdout), + String::from_utf8_lossy(&out.stderr) + ); + + let summary_text = std::fs::read_to_string(&summary).expect("read summary"); + assert!( + summary_text.contains("| ours_total | 2 |"), + "{summary_text}" + ); + assert!( + summary_text.contains("| record_total | 2 |"), + "{summary_text}" + ); + assert!( + summary_text.contains("| intersection | 1 |"), + "{summary_text}" + ); + assert!( + summary_text.contains("| only_in_ours | 1 |"), + "{summary_text}" + ); + assert!( + summary_text.contains("| only_in_record | 1 |"), + "{summary_text}" + ); + + let only_ours_text = std::fs::read_to_string(&only_ours).expect("read only ours csv"); + assert!(only_ours_text.contains("AS64497,198.51.100.0/24,24,apnic")); + let only_record_text = std::fs::read_to_string(&only_record).expect("read only record csv"); + assert!(only_record_text.contains("AS64498,203.0.113.0/24,24,apnic")); +}