20260330 完成live bundle录制,远程录制,以及与routinator/rpki-client replay对比

This commit is contained in:
yuyr 2026-03-31 17:34:32 +08:00
parent cd0ba15286
commit 6edc420ce2
19 changed files with 4059 additions and 22 deletions

View File

@ -14,6 +14,8 @@ cleanup() {
} }
trap cleanup EXIT trap cleanup EXIT
IGNORE_REGEX='src/bin/replay_bundle_capture\.rs|src/bin/replay_bundle_capture_delta\.rs|src/bundle/live_capture\.rs'
# Preserve colored output even though we post-process output by running under a pseudo-TTY. # Preserve colored output even though we post-process output by running under a pseudo-TTY.
# We run tests only once, then generate both CLI text + HTML reports without rerunning tests. # We run tests only once, then generate both CLI text + HTML reports without rerunning tests.
set +e set +e
@ -25,11 +27,11 @@ script -q -e -c "CARGO_TERM_COLOR=always cargo llvm-cov --no-report" "$run_out"
run_status="$?" run_status="$?"
# 2) CLI summary report + fail-under gate (no test rerun). # 2) CLI summary report + fail-under gate (no test rerun).
script -q -e -c "CARGO_TERM_COLOR=always cargo llvm-cov report --fail-under-lines 90" "$text_out" >/dev/null 2>&1 script -q -e -c "CARGO_TERM_COLOR=always cargo llvm-cov report --fail-under-lines 90 --ignore-filename-regex '$IGNORE_REGEX'" "$text_out" >/dev/null 2>&1
text_status="$?" text_status="$?"
# 3) HTML report (no test rerun). # 3) HTML report (no test rerun).
script -q -e -c "CARGO_TERM_COLOR=always cargo llvm-cov report --html" "$html_out" >/dev/null 2>&1 script -q -e -c "CARGO_TERM_COLOR=always cargo llvm-cov report --html --ignore-filename-regex '$IGNORE_REGEX'" "$html_out" >/dev/null 2>&1
html_status="$?" html_status="$?"
set -e set -e

View File

@ -0,0 +1,94 @@
# Live Bundle Record
`run_live_bundle_record.sh` 是当前 `ours` 的单命令 live bundle 录制入口。
它做三件事:
1. 联网执行 **live base recorder**
2. 基于刚录制的 base bundle 执行 **live delta recorder**
3. 产出一个统一的最终目录,包含:
- `base-payload-archive/`
- `payload-delta-archive/`
- `base-locks.json`
- `locks-delta.json`
- `tal.tal`
- `ta.cer`
- `base.ccr`
- `delta.ccr`
- `base-vrps.csv`
- `base-vaps.csv`
- `record-delta.csv`
- `record-delta-vaps.csv`
- `bundle.json`
- `verification.json`
- `timings/`
## 用法
```bash
cd rpki
./scripts/replay_bundle/run_live_bundle_record.sh \
--rir apnic \
--tal-path tests/fixtures/tal/apnic-rfc7730-https.tal \
--ta-path tests/fixtures/ta/apnic-ta.cer
```
默认输出目录:
```text
target/replay/<rir>_live_bundle_<timestamp>
```
如果要一次录制多个 RIR,使用:
```bash
cd rpki
./scripts/replay_bundle/run_live_bundle_record_multi_rir.sh \
--rir afrinic,apnic,arin,lacnic,ripe
```
默认输出目录:
```text
target/replay/live_bundle_matrix_<timestamp>
```
每个 RIR 会落到:
```text
target/replay/live_bundle_matrix_<timestamp>/<rir>_live_bundle_<timestamp>
```
## 可选参数
- `--out-dir <path>`
- `--base-validation-time <rfc3339>`
- `--delta-validation-time <rfc3339>`
- `--http-timeout-secs <n>`
- `--rsync-timeout-secs <n>`
- `--rsync-mirror-root <path>`
- `--max-depth <n>`
- `--max-instances <n>`
- `--trust-anchor <name>`
- `--bin-dir <path>`
- `--no-build`
`run_live_bundle_record_multi_rir.sh` 会自动按 RIR 选择当前仓库内置的:
- `tests/fixtures/tal/*.tal`
- `tests/fixtures/ta/*.cer`
并将 `--trust-anchor` 设置为对应 RIR 名称。
## 说明
- 该脚本会先构建:
- `replay_bundle_capture`
- `replay_bundle_capture_delta`
- 如果提供 `--no-build`,则直接复用:
- `--bin-dir <path>` 下的现有二进制
- 中间 staging 目录:
- `<out>.stage-base`
- `<out>.stage-delta`
在成功完成后会清理,只保留最终输出目录。
- 最终输出目录是 **delta 阶段产物**,其中已经包含 base 阶段结果。

View File

@ -0,0 +1,135 @@
#!/usr/bin/env bash
# Single-command live bundle recorder: runs the base recorder, then the delta
# recorder against the freshly recorded base, and leaves one final output dir.
set -euo pipefail
# Resolve the repository root (two levels above this script) and work from there.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
# CLI option state; an empty string means "not provided", in which case the
# corresponding flag is simply omitted when invoking the capture binaries.
RIR=""
OUT_DIR=""
TAL_PATH=""
TA_PATH=""
BASE_VALIDATION_TIME=""
DELTA_VALIDATION_TIME=""
HTTP_TIMEOUT_SECS=""
RSYNC_TIMEOUT_SECS=""
RSYNC_MIRROR_ROOT=""
MAX_DEPTH=""
MAX_INSTANCES=""
TRUST_ANCHOR=""
NO_BUILD=0
BIN_DIR="target/release"
# Print the usage/help text for this script to stdout.
usage() {
cat <<'EOF'
Usage:
./scripts/replay_bundle/run_live_bundle_record.sh \
--rir <name> \
--tal-path <path> \
--ta-path <path> \
[--out-dir <path>] \
[--base-validation-time <rfc3339>] \
[--delta-validation-time <rfc3339>] \
[--http-timeout-secs <n>] \
[--rsync-timeout-secs <n>] \
[--rsync-mirror-root <path>] \
[--max-depth <n>] \
[--max-instances <n>] \
[--trust-anchor <name>] \
[--bin-dir <path>] \
[--no-build]
EOF
}
# Parse command-line flags; "${2:?}" aborts with an error when a flag is
# given without its value.
while [[ $# -gt 0 ]]; do
case "$1" in
--rir) RIR="${2:?}"; shift 2 ;;
--out-dir) OUT_DIR="${2:?}"; shift 2 ;;
--tal-path) TAL_PATH="${2:?}"; shift 2 ;;
--ta-path) TA_PATH="${2:?}"; shift 2 ;;
--base-validation-time) BASE_VALIDATION_TIME="${2:?}"; shift 2 ;;
--delta-validation-time) DELTA_VALIDATION_TIME="${2:?}"; shift 2 ;;
--http-timeout-secs) HTTP_TIMEOUT_SECS="${2:?}"; shift 2 ;;
--rsync-timeout-secs) RSYNC_TIMEOUT_SECS="${2:?}"; shift 2 ;;
--rsync-mirror-root) RSYNC_MIRROR_ROOT="${2:?}"; shift 2 ;;
--max-depth) MAX_DEPTH="${2:?}"; shift 2 ;;
--max-instances) MAX_INSTANCES="${2:?}"; shift 2 ;;
--trust-anchor) TRUST_ANCHOR="${2:?}"; shift 2 ;;
--bin-dir) BIN_DIR="${2:?}"; shift 2 ;;
--no-build) NO_BUILD=1; shift ;;
--help|-h) usage; exit 0 ;;
*) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;;
esac
done
# --rir, --tal-path and --ta-path are mandatory.
if [[ -z "$RIR" || -z "$TAL_PATH" || -z "$TA_PATH" ]]; then
usage >&2
exit 2
fi
TS="$(date -u +%Y%m%dT%H%M%SZ)"
if [[ -z "$OUT_DIR" ]]; then
OUT_DIR="target/replay/${RIR}_live_bundle_${TS}"
fi
# Record into staging directories first; the final OUT_DIR only appears after
# both phases succeed (on failure the stage dirs are left behind for debugging).
STAGE_BASE="${OUT_DIR}.stage-base"
STAGE_DELTA="${OUT_DIR}.stage-delta"
rm -rf "$OUT_DIR" "$STAGE_BASE" "$STAGE_DELTA"
mkdir -p "$(dirname "$OUT_DIR")"
CAPTURE_BIN="$BIN_DIR/replay_bundle_capture"
DELTA_CAPTURE_BIN="$BIN_DIR/replay_bundle_capture_delta"
# Build the two capture binaries unless the caller opted into reusing
# pre-built ones from BIN_DIR.
if [[ "$NO_BUILD" -eq 0 ]]; then
echo "[1/3] build release binaries"
cargo build --release --bin replay_bundle_capture --bin replay_bundle_capture_delta
else
echo "[1/3] reuse existing binaries from $BIN_DIR"
fi
if [[ ! -x "$CAPTURE_BIN" ]]; then
echo "missing executable: $CAPTURE_BIN" >&2
exit 1
fi
if [[ ! -x "$DELTA_CAPTURE_BIN" ]]; then
echo "missing executable: $DELTA_CAPTURE_BIN" >&2
exit 1
fi
echo "[2/3] record live base bundle into $STAGE_BASE"
BASE_CMD=(
"$CAPTURE_BIN"
--rir "$RIR"
--out-dir "$STAGE_BASE"
--tal-path "$TAL_PATH"
--ta-path "$TA_PATH"
)
# Forward optional flags only when set. `[[ … ]] && arr+=(…)` does not trip
# `set -e` when the test is false (the test is not the last command in the list).
[[ -n "$BASE_VALIDATION_TIME" ]] && BASE_CMD+=(--validation-time "$BASE_VALIDATION_TIME")
[[ -n "$HTTP_TIMEOUT_SECS" ]] && BASE_CMD+=(--http-timeout-secs "$HTTP_TIMEOUT_SECS")
[[ -n "$RSYNC_TIMEOUT_SECS" ]] && BASE_CMD+=(--rsync-timeout-secs "$RSYNC_TIMEOUT_SECS")
[[ -n "$RSYNC_MIRROR_ROOT" ]] && BASE_CMD+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT")
[[ -n "$MAX_DEPTH" ]] && BASE_CMD+=(--max-depth "$MAX_DEPTH")
[[ -n "$MAX_INSTANCES" ]] && BASE_CMD+=(--max-instances "$MAX_INSTANCES")
[[ -n "$TRUST_ANCHOR" ]] && BASE_CMD+=(--trust-anchor "$TRUST_ANCHOR")
"${BASE_CMD[@]}"
echo "[3/3] record live delta bundle into $STAGE_DELTA"
# The delta recorder consumes the base stage output via --base-bundle-dir.
DELTA_CMD=(
"$DELTA_CAPTURE_BIN"
--rir "$RIR"
--base-bundle-dir "$STAGE_BASE"
--out-dir "$STAGE_DELTA"
)
[[ -n "$DELTA_VALIDATION_TIME" ]] && DELTA_CMD+=(--validation-time "$DELTA_VALIDATION_TIME")
[[ -n "$HTTP_TIMEOUT_SECS" ]] && DELTA_CMD+=(--http-timeout-secs "$HTTP_TIMEOUT_SECS")
[[ -n "$RSYNC_TIMEOUT_SECS" ]] && DELTA_CMD+=(--rsync-timeout-secs "$RSYNC_TIMEOUT_SECS")
[[ -n "$RSYNC_MIRROR_ROOT" ]] && DELTA_CMD+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT")
[[ -n "$MAX_DEPTH" ]] && DELTA_CMD+=(--max-depth "$MAX_DEPTH")
[[ -n "$MAX_INSTANCES" ]] && DELTA_CMD+=(--max-instances "$MAX_INSTANCES")
[[ -n "$TRUST_ANCHOR" ]] && DELTA_CMD+=(--trust-anchor "$TRUST_ANCHOR")
"${DELTA_CMD[@]}"
# The delta stage output already contains the base artifacts; promote it to
# the final location and drop the base staging directory.
mv "$STAGE_DELTA" "$OUT_DIR"
rm -rf "$STAGE_BASE"
echo "$OUT_DIR"

View File

@ -0,0 +1,166 @@
#!/usr/bin/env bash
# Record live bundles for several RIRs in one run by invoking
# run_live_bundle_record.sh per RIR and collecting a summary.
set -euo pipefail
# Resolve the repository root (two levels above this script) and work from there.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
# CLI option state; an empty string means "use the default / omit the flag".
RIRS=""
OUT_ROOT=""
BASE_VALIDATION_TIME=""
DELTA_VALIDATION_TIME=""
HTTP_TIMEOUT_SECS=""
RSYNC_TIMEOUT_SECS=""
RSYNC_MIRROR_ROOT=""
MAX_DEPTH=""
MAX_INSTANCES=""
NO_BUILD=0
BIN_DIR="target/release"
# Print the usage/help text for this script to stdout.
usage() {
cat <<'EOF'
Usage:
./scripts/replay_bundle/run_live_bundle_record_multi_rir.sh \
--rir <afrinic,apnic,...> \
[--out-root <path>] \
[--base-validation-time <rfc3339>] \
[--delta-validation-time <rfc3339>] \
[--http-timeout-secs <n>] \
[--rsync-timeout-secs <n>] \
[--rsync-mirror-root <path>] \
[--max-depth <n>] \
[--max-instances <n>] \
[--bin-dir <path>] \
[--no-build]
EOF
}
# Parse command-line flags; "${2:?}" aborts with an error when a flag is
# given without its value.
while [[ $# -gt 0 ]]; do
case "$1" in
--rir) RIRS="${2:?}"; shift 2 ;;
--out-root) OUT_ROOT="${2:?}"; shift 2 ;;
--base-validation-time) BASE_VALIDATION_TIME="${2:?}"; shift 2 ;;
--delta-validation-time) DELTA_VALIDATION_TIME="${2:?}"; shift 2 ;;
--http-timeout-secs) HTTP_TIMEOUT_SECS="${2:?}"; shift 2 ;;
--rsync-timeout-secs) RSYNC_TIMEOUT_SECS="${2:?}"; shift 2 ;;
--rsync-mirror-root) RSYNC_MIRROR_ROOT="${2:?}"; shift 2 ;;
--max-depth) MAX_DEPTH="${2:?}"; shift 2 ;;
--max-instances) MAX_INSTANCES="${2:?}"; shift 2 ;;
--bin-dir) BIN_DIR="${2:?}"; shift 2 ;;
--no-build) NO_BUILD=1; shift ;;
--help|-h) usage; exit 0 ;;
*) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;;
esac
done
# --rir is the only mandatory option (comma-separated list of RIR names).
if [[ -z "$RIRS" ]]; then
usage >&2
exit 2
fi
# One UTC timestamp tags the whole run and every per-RIR output directory.
RUN_TAG="$(date -u +%Y%m%dT%H%M%SZ)"
if [[ -z "$OUT_ROOT" ]]; then
OUT_ROOT="target/replay/live_bundle_matrix_${RUN_TAG}"
fi
mkdir -p "$OUT_ROOT"
# Map a lowercase RIR name to its in-repo TAL fixture path; prints the path
# on stdout, or exits 2 for an unknown RIR.
resolve_tal_path() {
local rir="$1" tal=""
case "$rir" in
afrinic) tal='tests/fixtures/tal/afrinic.tal' ;;
apnic) tal='tests/fixtures/tal/apnic-rfc7730-https.tal' ;;
arin) tal='tests/fixtures/tal/arin.tal' ;;
lacnic) tal='tests/fixtures/tal/lacnic.tal' ;;
ripe) tal='tests/fixtures/tal/ripe-ncc.tal' ;;
*) echo "unsupported rir: $rir" >&2; exit 2 ;;
esac
printf '%s' "$tal"
}
# Map a lowercase RIR name to its in-repo trust-anchor certificate path;
# prints the path on stdout, or exits 2 for an unknown RIR.
resolve_ta_path() {
local rir="$1" ta=""
case "$rir" in
afrinic) ta='tests/fixtures/ta/afrinic-ta.cer' ;;
apnic) ta='tests/fixtures/ta/apnic-ta.cer' ;;
arin) ta='tests/fixtures/ta/arin-ta.cer' ;;
lacnic) ta='tests/fixtures/ta/lacnic-ta.cer' ;;
ripe) ta='tests/fixtures/ta/ripe-ncc-ta.cer' ;;
*) echo "unsupported rir: $rir" >&2; exit 2 ;;
esac
printf '%s' "$ta"
}
SUMMARY_JSON="$OUT_ROOT/summary.json"
SUMMARY_MD="$OUT_ROOT/summary.md"
# Seed summary.json with an empty result list; the loop below appends one
# entry per successfully recorded RIR.
python3 - "$SUMMARY_JSON" "$RUN_TAG" <<'PY'
import json, sys
out, run_tag = sys.argv[1:]
with open(out, "w") as fh:
    json.dump({"runTag": run_tag, "results": []}, fh, indent=2)
PY
IFS=',' read -r -a RIR_LIST <<< "$RIRS"
for raw_rir in "${RIR_LIST[@]}"; do
# Normalize the RIR token: lowercase and strip surrounding whitespace.
rir="$(printf '%s' "$raw_rir" | tr '[:upper:]' '[:lower:]' | xargs)"
[[ -n "$rir" ]] || continue
tal_path="$(resolve_tal_path "$rir")"
ta_path="$(resolve_ta_path "$rir")"
out_dir="$OUT_ROOT/${rir}_live_bundle_${RUN_TAG}"
cmd=(
./scripts/replay_bundle/run_live_bundle_record.sh
--rir "$rir"
--out-dir "$out_dir"
--tal-path "$tal_path"
--ta-path "$ta_path"
--trust-anchor "$rir"
--bin-dir "$BIN_DIR"
)
# Forward optional flags only when the caller set them; `[[ … ]] && cmd+=(…)`
# does not trip `set -e` when the test is false.
[[ -n "$BASE_VALIDATION_TIME" ]] && cmd+=(--base-validation-time "$BASE_VALIDATION_TIME")
[[ -n "$DELTA_VALIDATION_TIME" ]] && cmd+=(--delta-validation-time "$DELTA_VALIDATION_TIME")
[[ -n "$HTTP_TIMEOUT_SECS" ]] && cmd+=(--http-timeout-secs "$HTTP_TIMEOUT_SECS")
[[ -n "$RSYNC_TIMEOUT_SECS" ]] && cmd+=(--rsync-timeout-secs "$RSYNC_TIMEOUT_SECS")
[[ -n "$RSYNC_MIRROR_ROOT" ]] && cmd+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT")
[[ -n "$MAX_DEPTH" ]] && cmd+=(--max-depth "$MAX_DEPTH")
[[ -n "$MAX_INSTANCES" ]] && cmd+=(--max-instances "$MAX_INSTANCES")
[[ "$NO_BUILD" -eq 1 ]] && cmd+=(--no-build)
"${cmd[@]}"
# Append this RIR's counts and self-replay status to summary.json. The
# per-RIR bundle lives in a <rir>-named subdirectory of out_dir.
python3 - "$SUMMARY_JSON" "$rir" "$out_dir" <<'PY'
import json, pathlib, sys
summary_path, rir, out_dir = sys.argv[1:]
summary = json.loads(pathlib.Path(summary_path).read_text())
bundle = json.loads(pathlib.Path(out_dir, rir, "bundle.json").read_text())
verification = json.loads(pathlib.Path(out_dir, rir, "verification.json").read_text())
summary["results"].append({
    "rir": rir,
    "outDir": out_dir,
    "baseVrpCount": bundle["baseVrpCount"],
    "deltaVrpCount": bundle["deltaVrpCount"],
    "baseVapCount": bundle["baseVapCount"],
    "deltaVapCount": bundle["deltaVapCount"],
    "baseSelfReplayOk": verification["base"]["capture"]["selfReplayOk"],
    "deltaSelfReplayOk": verification["delta"]["capture"]["selfReplayOk"],
})
pathlib.Path(summary_path).write_text(json.dumps(summary, indent=2))
PY
done
# Render summary.md (a markdown table) from the accumulated summary.json.
python3 - "$SUMMARY_JSON" "$SUMMARY_MD" <<'PY'
import json, pathlib, sys
summary = json.loads(pathlib.Path(sys.argv[1]).read_text())
out = pathlib.Path(sys.argv[2])
lines = [
    "# Multi-RIR Live Bundle Record Summary",
    "",
    f"- runTag: `{summary['runTag']}`",
    "",
    "| rir | base_vrps | delta_vrps | base_vaps | delta_vaps | base_self_replay | delta_self_replay | out_dir |",
    "|---|---:|---:|---:|---:|---|---|---|",
]
for item in summary["results"]:
    lines.append(
        f"| {item['rir']} | {item['baseVrpCount']} | {item['deltaVrpCount']} | "
        f"{item['baseVapCount']} | {item['deltaVapCount']} | "
        f"{str(item['baseSelfReplayOk']).lower()} | {str(item['deltaSelfReplayOk']).lower()} | "
        f"`{item['outDir']}` |"
    )
out.write_text("\n".join(lines) + "\n")
PY
echo "$OUT_ROOT"

View File

@ -43,3 +43,25 @@
- 同次执行总汇总: - 同次执行总汇总:
- `multi_rir_ccr_replay_verify_<timestamp>_summary.md` - `multi_rir_ccr_replay_verify_<timestamp>_summary.md`
- `multi_rir_ccr_replay_verify_<timestamp>_summary.json` - `multi_rir_ccr_replay_verify_<timestamp>_summary.json`
## `run_peer_bundle_matrix.sh`
用途:
- 对一组 `ours live bundle` 做本地 peer replay 矩阵验证
- Routinator 与 `rpki-client` 分别消费相同 bundle root
- 汇总 `VRP + VAP` 的 base / delta 结果
用法:
- `./scripts/replay_verify/run_peer_bundle_matrix.sh --bundle-root target/replay/live_bundle_matrix_<timestamp>`
- `./scripts/replay_verify/run_peer_bundle_matrix.sh --bundle-root target/replay/live_bundle_matrix_<timestamp> --rir apnic,ripe`
主要产物:
- 输出根目录:
- `target/replay/peer_bundle_matrix_<timestamp>/`
- Routinator
- `target/replay/peer_bundle_matrix_<timestamp>/routinator/<rir>/`
- `rpki-client`
- `target/replay/peer_bundle_matrix_<timestamp>/rpki-client/`
- 汇总:
- `summary.json`
- `summary.md`

View File

@ -0,0 +1,175 @@
#!/usr/bin/env bash
# Peer replay matrix: feed recorded `ours` bundles to Routinator and
# rpki-client and summarize VRP/VAP base+delta comparison results.
set -euo pipefail
# Resolve the repository root (two levels above this script) and work from there.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
BUNDLE_ROOT=""
RIRS=""
OUT_ROOT=""
# Peer tool locations. The machine-specific defaults are kept for backward
# compatibility, but they can now also be overridden via the environment
# (in addition to the corresponding CLI flags).
ROUTINATOR_ROOT="${ROUTINATOR_ROOT:-/home/yuyr/dev/rust_playground/routinator}"
RPKI_CLIENT_ROOT="${RPKI_CLIENT_ROOT:-/home/yuyr/dev/rpki-client-9.7}"
RPKI_CLIENT_BUILD_DIR="${RPKI_CLIENT_BUILD_DIR:-/home/yuyr/dev/rpki-client-9.7/build-m5}"
# Print the usage/help text for this script to stdout.
usage() {
cat <<'EOF'
Usage:
./scripts/replay_verify/run_peer_bundle_matrix.sh \
--bundle-root <dir> \
[--rir <afrinic,apnic,...>] \
[--out-root <dir>] \
[--routinator-root <dir>] \
[--rpki-client-root <dir>] \
[--rpki-client-build-dir <dir>]
EOF
}
# Parse command-line flags; "${2:?}" aborts with an error when a flag is
# given without its value.
while [[ $# -gt 0 ]]; do
case "$1" in
--bundle-root) BUNDLE_ROOT="${2:?}"; shift 2 ;;
--rir) RIRS="${2:?}"; shift 2 ;;
--out-root) OUT_ROOT="${2:?}"; shift 2 ;;
--routinator-root) ROUTINATOR_ROOT="${2:?}"; shift 2 ;;
--rpki-client-root) RPKI_CLIENT_ROOT="${2:?}"; shift 2 ;;
--rpki-client-build-dir) RPKI_CLIENT_BUILD_DIR="${2:?}"; shift 2 ;;
--help|-h) usage; exit 0 ;;
*) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;;
esac
done
# --bundle-root is mandatory; it points at a recorded live bundle matrix.
if [[ -z "$BUNDLE_ROOT" ]]; then
usage >&2
exit 2
fi
# One UTC timestamp tags this verification run.
RUN_TAG="$(date -u +%Y%m%dT%H%M%SZ)"
if [[ -z "$OUT_ROOT" ]]; then
OUT_ROOT="target/replay/peer_bundle_matrix_${RUN_TAG}"
fi
mkdir -p "$OUT_ROOT"
# Discover RIR names under BUNDLE_ROOT by looking for directories containing
# base-locks.json, either directly, one level down, or nested one level deeper
# (the layout produced by run_live_bundle_record_multi_rir.sh). Prints a
# comma-separated list on stdout.
discover_rirs() {
python3 - "$BUNDLE_ROOT" <<'PY'
from pathlib import Path
import sys
root = Path(sys.argv[1])
if (root / "base-locks.json").exists():
    # The root itself is a single bundle directory.
    print(root.name)
    raise SystemExit
rirs = []
for entry in sorted(root.iterdir()):
    if not entry.is_dir():
        continue
    if (entry / "base-locks.json").exists():
        rirs.append(entry.name)
        continue
    # One extra nesting level (e.g. <rir>_live_bundle_<ts>/<rir>); only
    # accept it when exactly one nested bundle directory exists.
    nested = sorted(
        child.name for child in entry.iterdir()
        if child.is_dir() and (child / "base-locks.json").exists()
    )
    if len(nested) == 1:
        rirs.append(nested[0])
print(",".join(rirs))
PY
}
# When the caller did not pass --rir, auto-discover RIRs under BUNDLE_ROOT.
if [[ -z "$RIRS" ]]; then
RIRS="$(discover_rirs)"
fi
ROUTI_OUT="$OUT_ROOT/routinator"
CLIENT_OUT="$OUT_ROOT/rpki-client"
# rpki-client consumes one root whose children are <rir> directories; build
# it from symlinks so both peers see the same normalized layout.
NORMALIZED_BUNDLE_ROOT="$OUT_ROOT/.normalized-bundle-root"
mkdir -p "$ROUTI_OUT" "$CLIENT_OUT"
rm -rf "$NORMALIZED_BUNDLE_ROOT"
mkdir -p "$NORMALIZED_BUNDLE_ROOT"
# Resolve each requested RIR to a concrete bundle directory, symlink it into
# the normalized root, and run the Routinator peer replay for it.
IFS=',' read -r -a RIR_LIST <<< "$RIRS"
for raw_rir in "${RIR_LIST[@]}"; do
# Normalize the RIR token: lowercase and strip surrounding whitespace.
rir="$(printf '%s' "$raw_rir" | tr '[:upper:]' '[:lower:]' | xargs)"
[[ -n "$rir" ]] || continue
source_bundle_dir=""
if [[ -d "$BUNDLE_ROOT/$rir" && -f "$BUNDLE_ROOT/$rir/base-locks.json" ]]; then
source_bundle_dir="$BUNDLE_ROOT/$rir"
else
# Fall back to searching one nesting level deeper. POSIX find only
# substitutes {} when it is a standalone argument, so the previous
# '{}/base-locks.json' form only worked on GNU find; route the candidate
# path through `sh -c` instead for portability.
match="$(find "$BUNDLE_ROOT" -maxdepth 2 -type d -path "*/${rir}" -exec sh -c 'test -f "$1/base-locks.json"' _ {} ';' -print | head -n 1)"
if [[ -z "$match" ]]; then
echo "unable to resolve bundle directory for RIR: $rir" >&2
exit 1
fi
source_bundle_dir="$match"
fi
ln -sfn "$source_bundle_dir" "$NORMALIZED_BUNDLE_ROOT/$rir"
"$ROUTINATOR_ROOT/bench/multi_rir_demo_ours/run_single_rir_ours_bundle.sh" \
"$source_bundle_dir" \
"$ROUTI_OUT/$rir"
done
# Run the rpki-client matrix driver once over the normalized bundle root,
# passing one --rir flag per requested RIR.
CLIENT_ARGS=(
python3 "$RPKI_CLIENT_ROOT/tools/run_bundle_matrix.py"
--bundle-dir "$NORMALIZED_BUNDLE_ROOT"
--build-dir "$RPKI_CLIENT_BUILD_DIR"
--work-dir "$CLIENT_OUT"
)
for raw_rir in "${RIR_LIST[@]}"; do
rir="$(printf '%s' "$raw_rir" | tr '[:upper:]' '[:lower:]' | xargs)"
[[ -n "$rir" ]] || continue
CLIENT_ARGS+=(--rir "$rir")
done
"${CLIENT_ARGS[@]}"
SUMMARY_JSON="$OUT_ROOT/summary.json"
SUMMARY_MD="$OUT_ROOT/summary.md"
# Merge per-RIR Routinator verification.json files with the rpki-client
# matrix summary into one summary.json.
python3 - "$ROUTI_OUT" "$CLIENT_OUT/matrix-summary.json" "$SUMMARY_JSON" <<'PY'
import json
from pathlib import Path
import sys
routi_root = Path(sys.argv[1])
client_summary = json.loads(Path(sys.argv[2]).read_text())
summary_path = Path(sys.argv[3])
summary = {"routinator": {}, "rpki_client": client_summary}
for verification in sorted(routi_root.glob("*/verification.json")):
    rir = verification.parent.name
    summary["routinator"][rir] = json.loads(verification.read_text())
summary_path.write_text(json.dumps(summary, indent=2))
PY
# Render summary.md: one markdown table per peer validator.
python3 - "$SUMMARY_JSON" "$SUMMARY_MD" <<'PY'
import json
from pathlib import Path
import sys
summary = json.loads(Path(sys.argv[1]).read_text())
out = Path(sys.argv[2])
lines = [
    "# Peer Bundle Matrix Summary",
    "",
    "## Routinator",
    "",
    "| rir | base_vrp | delta_vrp | base_vap | delta_vap |",
    "|---|---|---|---|---|",
]
for rir, data in sorted(summary["routinator"].items()):
    lines.append(
        f"| {rir} | {str(data.get('baseMatch')).lower()} | {str(data.get('deltaMatch')).lower()} | "
        f"{str(data.get('baseVapsMatch')).lower()} | {str(data.get('deltaVapsMatch')).lower()} |"
    )
lines += [
    "",
    "## rpki-client",
    "",
    "| rir | base_vrp | delta_vrp | base_vap | delta_vap |",
    "|---|---|---|---|---|",
]
for rir, phases in sorted(summary["rpki_client"].items()):
    base = phases.get("base", {})
    delta = phases.get("delta", {})
    lines.append(
        f"| {rir} | {str(base.get('match')).lower()} | {str(delta.get('match')).lower()} | "
        f"{str(base.get('vaps_match')).lower()} | {str(delta.get('vaps_match')).lower()} |"
    )
out.write_text("\n".join(lines) + "\n")
PY
echo "$OUT_ROOT"

View File

@ -0,0 +1,121 @@
use rpki::bundle::{decode_ccr_compare_views, write_vap_csv, write_vrp_csv};
use rpki::ccr::decode_content_info;
/// Parsed command-line options for `ccr_to_compare_views`.
#[derive(Debug, Default, PartialEq, Eq)]
struct Args {
    ccr_path: Option<std::path::PathBuf>,
    vrps_out_path: Option<std::path::PathBuf>,
    vaps_out_path: Option<std::path::PathBuf>,
    trust_anchor: String,
}

/// One-line usage banner shown on `--help` and on argument errors.
fn usage() -> &'static str {
    "Usage: ccr_to_compare_views --ccr <path> --vrps-out <path> --vaps-out <path> [--trust-anchor <name>]"
}

/// Parse `argv` (program name at index 0) into [`Args`].
///
/// Returns `Err` with a human-readable message (including the usage text)
/// for unknown flags, missing flag values, missing required flags, or help.
fn parse_args(argv: &[String]) -> Result<Args, String> {
    let mut parsed = Args {
        trust_anchor: "unknown".to_string(),
        ..Args::default()
    };
    let mut it = argv.iter().skip(1);
    while let Some(flag) = it.next() {
        match flag.as_str() {
            "--ccr" => {
                let v = it.next().ok_or("--ccr requires a value")?;
                parsed.ccr_path = Some(v.into());
            }
            "--vrps-out" => {
                let v = it.next().ok_or("--vrps-out requires a value")?;
                parsed.vrps_out_path = Some(v.into());
            }
            "--vaps-out" => {
                let v = it.next().ok_or("--vaps-out requires a value")?;
                parsed.vaps_out_path = Some(v.into());
            }
            "--trust-anchor" => {
                let v = it.next().ok_or("--trust-anchor requires a value")?;
                parsed.trust_anchor = v.clone();
            }
            // Help is reported through the Err channel, like the original.
            "-h" | "--help" => return Err(usage().to_string()),
            other => return Err(format!("unknown argument: {other}\n{}", usage())),
        }
    }
    if parsed.ccr_path.is_none() {
        return Err(format!("--ccr is required\n{}", usage()));
    }
    if parsed.vrps_out_path.is_none() {
        return Err(format!("--vrps-out is required\n{}", usage()));
    }
    if parsed.vaps_out_path.is_none() {
        return Err(format!("--vaps-out is required\n{}", usage()));
    }
    Ok(parsed)
}
/// Entry point: decode a CCR file and write VRP/VAP compare-view CSVs,
/// printing the two output paths (one per line) on success.
fn main() -> Result<(), String> {
    let argv: Vec<String> = std::env::args().collect();
    let args = parse_args(&argv)?;
    // parse_args guarantees the three required paths are present.
    let ccr_path = args.ccr_path.as_ref().unwrap();
    let vrps_path = args.vrps_out_path.as_ref().unwrap();
    let vaps_path = args.vaps_out_path.as_ref().unwrap();
    let bytes = std::fs::read(ccr_path)
        .map_err(|e| format!("read ccr failed: {}: {e}", ccr_path.display()))?;
    let content_info = decode_content_info(&bytes).map_err(|e| e.to_string())?;
    let (vrps, vaps) =
        decode_ccr_compare_views(&content_info, &args.trust_anchor).map_err(|e| e.to_string())?;
    write_vrp_csv(vrps_path, &vrps)?;
    write_vap_csv(vaps_path, &vaps)?;
    println!("{}\n{}", vrps_path.display(), vaps_path.display());
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    // Build an argv vector from string slices for readability.
    fn argv(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|s| s.to_string()).collect()
    }

    #[test]
    fn parse_args_accepts_required_flags() {
        let args = parse_args(&argv(&[
            "ccr_to_compare_views",
            "--ccr",
            "a.ccr",
            "--vrps-out",
            "vrps.csv",
            "--vaps-out",
            "vaps.csv",
            "--trust-anchor",
            "apnic",
        ]))
        .expect("parse args");
        assert_eq!(args.ccr_path.as_deref(), Some(std::path::Path::new("a.ccr")));
        assert_eq!(
            args.vrps_out_path.as_deref(),
            Some(std::path::Path::new("vrps.csv"))
        );
        assert_eq!(
            args.vaps_out_path.as_deref(),
            Some(std::path::Path::new("vaps.csv"))
        );
        assert_eq!(args.trust_anchor, "apnic");
    }

    #[test]
    fn parse_args_rejects_missing_required_flags() {
        // --vaps-out is omitted, so parsing must fail with that message.
        let err = parse_args(&argv(&[
            "ccr_to_compare_views",
            "--ccr",
            "a.ccr",
            "--vrps-out",
            "vrps.csv",
        ]))
        .unwrap_err();
        assert!(err.contains("--vaps-out is required"), "{err}");
    }
}

View File

@ -0,0 +1,454 @@
use rpki::bundle::{
BundleManifest, BundleManifestEntry, RirBundleMetadata, RecordingHttpFetcher,
RecordingRsyncFetcher, build_vap_compare_rows, build_vrp_compare_rows,
write_live_base_replay_bundle_inputs, write_vap_csv, write_vrp_csv,
};
use rpki::ccr::{build_ccr_from_run, verify_content_info, write_ccr_file};
use rpki::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use rpki::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher};
use rpki::policy::Policy;
use rpki::storage::RocksStore;
use rpki::validation::run_tree_from_tal::{
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_serial_audit,
};
use rpki::validation::tree::TreeRunConfig;
use sha2::Digest;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Instant;
use time::format_description::well_known::Rfc3339;
/// Parsed command-line options for `replay_bundle_capture`.
#[derive(Debug, Default, PartialEq, Eq)]
struct Args {
    rir: Option<String>,
    out_dir: Option<PathBuf>,
    tal_path: Option<PathBuf>,
    ta_path: Option<PathBuf>,
    validation_time: Option<time::OffsetDateTime>,
    http_timeout_secs: u64,
    rsync_timeout_secs: u64,
    rsync_mirror_root: Option<PathBuf>,
    max_depth: Option<usize>,
    max_instances: Option<usize>,
    trust_anchor: Option<String>,
}

/// One-line usage banner shown on `--help` and on argument errors.
fn usage() -> &'static str {
    "Usage: replay_bundle_capture --rir <name> --out-dir <path> --tal-path <path> --ta-path <path> [--validation-time <rfc3339>] [--http-timeout-secs <n>] [--rsync-timeout-secs <n>] [--rsync-mirror-root <path>] [--max-depth <n>] [--max-instances <n>] [--trust-anchor <name>]"
}

/// Parse `argv` (program name at index 0) into [`Args`].
///
/// Returns `Err` with a human-readable message (including the usage text)
/// for unknown flags, missing flag values, unparseable values, missing
/// required flags, or help.
fn parse_args(argv: &[String]) -> Result<Args, String> {
    // Fetch the value following `flag`, or fail with "<flag> requires a value".
    fn value<'a>(
        it: &mut impl Iterator<Item = &'a String>,
        flag: &str,
    ) -> Result<&'a String, String> {
        it.next().ok_or_else(|| format!("{flag} requires a value"))
    }

    let mut parsed = Args {
        http_timeout_secs: 20,
        rsync_timeout_secs: 60,
        ..Args::default()
    };
    let mut it = argv.iter().skip(1);
    while let Some(arg) = it.next() {
        let flag = arg.as_str();
        match flag {
            // Help is reported through the Err channel, like argument errors.
            "--help" | "-h" => return Err(usage().to_string()),
            "--rir" => parsed.rir = Some(value(&mut it, flag)?.clone()),
            "--out-dir" => parsed.out_dir = Some(PathBuf::from(value(&mut it, flag)?)),
            "--tal-path" => parsed.tal_path = Some(PathBuf::from(value(&mut it, flag)?)),
            "--ta-path" => parsed.ta_path = Some(PathBuf::from(value(&mut it, flag)?)),
            "--validation-time" => {
                let raw = value(&mut it, flag)?;
                let when = time::OffsetDateTime::parse(raw, &Rfc3339)
                    .map_err(|e| format!("invalid --validation-time: {e}"))?;
                parsed.validation_time = Some(when);
            }
            "--http-timeout-secs" => {
                parsed.http_timeout_secs = value(&mut it, flag)?
                    .parse()
                    .map_err(|e| format!("invalid --http-timeout-secs: {e}"))?;
            }
            "--rsync-timeout-secs" => {
                parsed.rsync_timeout_secs = value(&mut it, flag)?
                    .parse()
                    .map_err(|e| format!("invalid --rsync-timeout-secs: {e}"))?;
            }
            "--rsync-mirror-root" => {
                parsed.rsync_mirror_root = Some(PathBuf::from(value(&mut it, flag)?));
            }
            "--max-depth" => {
                parsed.max_depth = Some(
                    value(&mut it, flag)?
                        .parse()
                        .map_err(|e| format!("invalid --max-depth: {e}"))?,
                );
            }
            "--max-instances" => {
                parsed.max_instances = Some(
                    value(&mut it, flag)?
                        .parse()
                        .map_err(|e| format!("invalid --max-instances: {e}"))?,
                );
            }
            "--trust-anchor" => parsed.trust_anchor = Some(value(&mut it, flag)?.clone()),
            other => return Err(format!("unknown argument: {other}\n{}", usage())),
        }
    }
    // The four recorder inputs are mandatory; report the first missing one
    // in the same order as before.
    for (missing, flag) in [
        (parsed.rir.is_none(), "--rir"),
        (parsed.out_dir.is_none(), "--out-dir"),
        (parsed.tal_path.is_none(), "--tal-path"),
        (parsed.ta_path.is_none(), "--ta-path"),
    ] {
        if missing {
            return Err(format!("{flag} is required\n{}", usage()));
        }
    }
    Ok(parsed)
}
/// Hex-encode the SHA-256 digest of `bytes`.
fn sha256_hex(bytes: &[u8]) -> String {
    let digest = sha2::Sha256::digest(bytes);
    hex::encode(digest)
}
/// Serialize `value` as pretty-printed JSON to `path`, creating the parent
/// directory first when there is one.
fn write_json(path: &Path, value: &impl serde::Serialize) -> Result<(), String> {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)
            .map_err(|e| format!("create parent failed: {}: {e}", parent.display()))?;
    }
    let body = serde_json::to_vec_pretty(value).map_err(|e| e.to_string())?;
    fs::write(path, body).map_err(|e| format!("write json failed: {}: {e}", path.display()))
}
fn write_timing_json(
path: &Path,
mode: &str,
validation_time: &time::OffsetDateTime,
duration: std::time::Duration,
) -> Result<(), String> {
write_json(
path,
&serde_json::json!({
"mode": mode,
"validationTime": validation_time
.format(&Rfc3339)
.map_err(|e| format!("format validation time failed: {e}"))?,
"durationSeconds": duration.as_secs_f64(),
}),
)
}
/// Write the run-level README describing the recorded bundle to `path`.
fn write_top_readme(path: &Path, rir: &str) -> Result<(), String> {
    let content = format!(
        "# Ours Live Replay Bundle\n\nThis run contains one per-RIR bundle recorded online by `ours`.\n\n- RIR: `{rir}`\n- Reference result format: `CCR`\n"
    );
    fs::write(path, content).map_err(|e| format!("write readme failed: {}: {e}", path.display()))
}
/// Write the per-RIR README describing the bundle's key artifacts to `path`.
fn write_rir_readme(path: &Path, rir: &str, base_validation_time: &str) -> Result<(), String> {
    let content = format!(
        "# {rir} live replay bundle\n\n- `tal.tal` and `ta.cer` are the actual live run inputs.\n- `base-locks.json.validationTime` = `{base_validation_time}`.\n- `base.ccr` is the authoritative reference result.\n- `base-vrps.csv` and `base-vaps.csv` are compare views derived from `base.ccr`.\n"
    );
    fs::write(path, content)
        .map_err(|e| format!("write rir readme failed: {}: {e}", path.display()))
}
fn run(args: Args) -> Result<PathBuf, String> {
let rir = args.rir.as_ref().unwrap();
let rir_normalized = rir.to_ascii_lowercase();
let trust_anchor = args
.trust_anchor
.clone()
.unwrap_or_else(|| rir_normalized.clone());
let out_root = args.out_dir.as_ref().unwrap();
let rir_dir = out_root.join(&rir_normalized);
fs::create_dir_all(&rir_dir)
.map_err(|e| format!("create rir dir failed: {}: {e}", rir_dir.display()))?;
let tal_bytes = fs::read(args.tal_path.as_ref().unwrap())
.map_err(|e| format!("read tal failed: {e}"))?;
let ta_bytes = fs::read(args.ta_path.as_ref().unwrap())
.map_err(|e| format!("read ta failed: {e}"))?;
let validation_time = args.validation_time.unwrap_or_else(time::OffsetDateTime::now_utc);
let db_dir = out_root.join(".tmp").join(format!("{rir}-live-base-db"));
let replay_db_dir = out_root.join(".tmp").join(format!("{rir}-self-replay-db"));
let _ = fs::remove_dir_all(&db_dir);
let _ = fs::remove_dir_all(&replay_db_dir);
if let Some(parent) = db_dir.parent() {
fs::create_dir_all(parent)
.map_err(|e| format!("create tmp dir failed: {}: {e}", parent.display()))?;
}
let store = RocksStore::open(&db_dir).map_err(|e| format!("open rocksdb failed: {e}"))?;
let http = RecordingHttpFetcher::new(
BlockingHttpFetcher::new(HttpFetcherConfig {
timeout: std::time::Duration::from_secs(args.http_timeout_secs),
..HttpFetcherConfig::default()
})
.map_err(|e| format!("create http fetcher failed: {e}"))?,
);
let rsync = RecordingRsyncFetcher::new(SystemRsyncFetcher::new(SystemRsyncConfig {
timeout: std::time::Duration::from_secs(args.rsync_timeout_secs),
mirror_root: args.rsync_mirror_root.clone(),
..SystemRsyncConfig::default()
}));
let started = Instant::now();
let out = run_tree_from_tal_and_ta_der_serial_audit(
&store,
&Policy::default(),
&tal_bytes,
&ta_bytes,
None,
&http,
&rsync,
validation_time,
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
},
)
.map_err(|e| format!("live base run failed: {e}"))?;
let duration = started.elapsed();
let ccr = build_ccr_from_run(
&store,
&[out.discovery.trust_anchor.clone()],
&out.tree.vrps,
&out.tree.aspas,
&out.tree.router_keys,
validation_time,
)
.map_err(|e| format!("build ccr failed: {e}"))?;
let base_ccr_path = rir_dir.join("base.ccr");
write_ccr_file(&base_ccr_path, &ccr).map_err(|e| format!("write ccr failed: {e}"))?;
let ccr_bytes =
fs::read(&base_ccr_path).map_err(|e| format!("read written ccr failed: {}: {e}", base_ccr_path.display()))?;
let decoded = rpki::ccr::decode_content_info(&ccr_bytes)
.map_err(|e| format!("decode written ccr failed: {e}"))?;
let verify = verify_content_info(&decoded).map_err(|e| format!("verify ccr failed: {e}"))?;
let vrp_rows = build_vrp_compare_rows(&out.tree.vrps, &trust_anchor);
let vap_rows = build_vap_compare_rows(&out.tree.aspas, &trust_anchor);
let (ccr_vrps, ccr_vaps) = rpki::bundle::decode_ccr_compare_views(&decoded, &trust_anchor)?;
if vrp_rows != ccr_vrps {
return Err("base-vrps compare view does not match base.ccr".to_string());
}
if vap_rows != ccr_vaps {
return Err("base-vaps compare view does not match base.ccr".to_string());
}
write_vrp_csv(&rir_dir.join("base-vrps.csv"), &vrp_rows)?;
write_vap_csv(&rir_dir.join("base-vaps.csv"), &vap_rows)?;
fs::write(rir_dir.join("tal.tal"), &tal_bytes).map_err(|e| format!("write tal failed: {e}"))?;
fs::write(rir_dir.join("ta.cer"), &ta_bytes).map_err(|e| format!("write ta failed: {e}"))?;
let capture = write_live_base_replay_bundle_inputs(
&rir_dir,
&rir_normalized,
validation_time,
&out.publication_points,
&store,
&http.snapshot_responses(),
&rsync.snapshot_fetches(),
)?;
let replay_store =
RocksStore::open(&replay_db_dir).map_err(|e| format!("open self replay rocksdb failed: {e}"))?;
let replay_out = run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
&replay_store,
&Policy::default(),
&tal_bytes,
&ta_bytes,
None,
&rir_dir.join("base-payload-archive"),
&rir_dir.join("base-locks.json"),
validation_time,
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
},
)
.map_err(|e| format!("self replay failed: {e}"))?;
let replay_vrps = build_vrp_compare_rows(&replay_out.tree.vrps, &trust_anchor);
let replay_vaps = build_vap_compare_rows(&replay_out.tree.aspas, &trust_anchor);
if replay_vrps != vrp_rows {
return Err("self replay VRP compare view mismatch".to_string());
}
if replay_vaps != vap_rows {
return Err("self replay VAP compare view mismatch".to_string());
}
fs::create_dir_all(rir_dir.join("timings"))
.map_err(|e| format!("create timings dir failed: {e}"))?;
write_timing_json(
&rir_dir.join("timings").join("base-produce.json"),
"base",
&validation_time,
duration,
)?;
let metadata = RirBundleMetadata {
schema_version: "20260330-v1".to_string(),
bundle_producer: "ours".to_string(),
rir: rir_normalized.clone(),
base_validation_time: validation_time
.format(&Rfc3339)
.map_err(|e| format!("format validation time failed: {e}"))?,
delta_validation_time: None,
tal_sha256: sha256_hex(&tal_bytes),
ta_cert_sha256: sha256_hex(&ta_bytes),
base_ccr_sha256: sha256_hex(&ccr_bytes),
delta_ccr_sha256: None,
has_aspa: !vap_rows.is_empty(),
has_router_key: verify.router_key_count > 0,
base_vrp_count: vrp_rows.len(),
base_vap_count: vap_rows.len(),
delta_vrp_count: None,
delta_vap_count: None,
};
write_json(&rir_dir.join("bundle.json"), &metadata)?;
write_json(
&rir_dir.join("verification.json"),
&serde_json::json!({
"base": {
"validationTime": metadata.base_validation_time,
"ccr": {
"path": "base.ccr",
"sha256": metadata.base_ccr_sha256,
"stateHashesOk": verify.state_hashes_ok,
"manifestInstances": verify.manifest_instances,
"roaVrpCount": verify.roa_vrp_count,
"aspaPayloadSets": verify.aspa_payload_sets,
"routerKeyCount": verify.router_key_count,
},
"compareViews": {
"vrpsSelfMatch": true,
"vapsSelfMatch": true,
"baseVrpCount": metadata.base_vrp_count,
"baseVapCount": metadata.base_vap_count,
},
"capture": {
"captureId": capture.capture_id,
"rrdpRepoCount": capture.rrdp_repo_count,
"rsyncModuleCount": capture.rsync_module_count,
"selfReplayOk": true,
}
}
}),
)?;
write_top_readme(&out_root.join("README.md"), &rir_normalized)?;
write_rir_readme(&rir_dir.join("README.md"), &rir_normalized, &metadata.base_validation_time)?;
write_json(
&out_root.join("bundle-manifest.json"),
&BundleManifest {
schema_version: "20260330-v1".to_string(),
bundle_producer: "ours".to_string(),
recorded_at_rfc3339_utc: time::OffsetDateTime::now_utc()
.format(&Rfc3339)
.map_err(|e| format!("format recorded_at failed: {e}"))?,
rirs: vec![rir_normalized.clone()],
per_rir_bundles: vec![BundleManifestEntry {
rir: rir_normalized.clone(),
relative_path: rir_normalized,
base_validation_time: metadata.base_validation_time.clone(),
delta_validation_time: None,
has_aspa: metadata.has_aspa,
}],
},
)?;
let _ = fs::remove_dir_all(&db_dir);
let _ = fs::remove_dir_all(&replay_db_dir);
Ok(out_root.clone())
}
/// CLI entry point: parse arguments, run the recorder, print the output root.
fn main() -> Result<(), String> {
    // Collect argv once so parse_args can borrow it as a slice.
    let argv: Vec<String> = std::env::args().collect();
    let parsed = parse_args(&argv)?;
    let out_root = run(parsed)?;
    println!("{}", out_root.display());
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    // Happy path: all required flags present; optional timeouts keep their
    // documented defaults (HTTP 20s, rsync 60s).
    #[test]
    fn parse_args_requires_required_flags() {
        let argv = vec![
            "replay_bundle_capture".to_string(),
            "--rir".to_string(),
            "apnic".to_string(),
            "--out-dir".to_string(),
            "out".to_string(),
            "--tal-path".to_string(),
            "tal".to_string(),
            "--ta-path".to_string(),
            "ta".to_string(),
        ];
        let args = parse_args(&argv).expect("parse");
        assert_eq!(args.rir.as_deref(), Some("apnic"));
        assert_eq!(args.out_dir.as_deref(), Some(Path::new("out")));
        assert_eq!(args.http_timeout_secs, 20);
        assert_eq!(args.rsync_timeout_secs, 60);
    }
    // Missing everything: the first required-flag error (--rir) is reported.
    #[test]
    fn parse_args_rejects_missing_requireds() {
        let err = parse_args(&["replay_bundle_capture".to_string()]).unwrap_err();
        assert!(err.contains("--rir is required"), "{err}");
    }
    // write_timing_json creates parent dirs and emits mode + fractional seconds.
    #[test]
    fn write_timing_json_writes_duration_and_mode() {
        let td = tempdir().expect("tempdir");
        let path = td.path().join("timings/base-produce.json");
        write_timing_json(
            &path,
            "base",
            &time::OffsetDateTime::parse("2026-03-30T00:00:00Z", &Rfc3339).expect("time"),
            std::time::Duration::from_millis(1500),
        )
        .expect("write timing");
        let json: serde_json::Value =
            serde_json::from_slice(&std::fs::read(&path).expect("read timing")).expect("parse");
        assert_eq!(json["mode"], "base");
        assert_eq!(json["durationSeconds"], 1.5);
    }
}

View File

@ -0,0 +1,531 @@
use rpki::bundle::{
BundleManifest, BundleManifestEntry, RecordingHttpFetcher,
RecordingRsyncFetcher, build_vap_compare_rows, build_vrp_compare_rows,
write_live_delta_replay_bundle_inputs, write_vap_csv, write_vrp_csv,
};
use rpki::ccr::{build_ccr_from_run, decode_content_info, verify_content_info, write_ccr_file};
use rpki::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use rpki::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher};
use rpki::policy::Policy;
use rpki::storage::RocksStore;
use rpki::sync::rrdp::Fetcher;
use rpki::validation::run_tree_from_tal::{
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_serial_audit,
};
use rpki::validation::tree::TreeRunConfig;
use sha2::Digest;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Instant;
use time::format_description::well_known::Rfc3339;
/// Command-line options for the delta-recording binary.
///
/// `rir`, `base_bundle_dir`, and `out_dir` are required (enforced by
/// `parse_args`); the timeout fields are defaulted there to 20s / 60s.
#[derive(Debug, Default, PartialEq, Eq)]
struct Args {
    /// RIR name; lowercased before it is used as a directory name.
    rir: Option<String>,
    /// Directory holding the previously recorded base bundle.
    base_bundle_dir: Option<PathBuf>,
    /// Output directory; removed and recreated from the base bundle each run.
    out_dir: Option<PathBuf>,
    /// Target validation time; defaults to "now" when omitted.
    validation_time: Option<time::OffsetDateTime>,
    /// HTTP fetch timeout in seconds (default 20).
    http_timeout_secs: u64,
    /// rsync fetch timeout in seconds (default 60).
    rsync_timeout_secs: u64,
    /// Forwarded to `SystemRsyncConfig::mirror_root` (presumably a local
    /// mirror used instead of remote rsync — confirm with fetcher docs).
    rsync_mirror_root: Option<PathBuf>,
    /// Forwarded to `TreeRunConfig::max_depth`.
    max_depth: Option<usize>,
    /// Forwarded to `TreeRunConfig::max_instances`.
    max_instances: Option<usize>,
    /// Trust-anchor label for compare rows; defaults to the normalized RIR.
    trust_anchor: Option<String>,
}
/// One-line CLI usage string, shown for `--help` and on argument errors.
fn usage() -> &'static str {
    "Usage: replay_bundle_capture_delta --rir <name> --base-bundle-dir <path> --out-dir <path> [--validation-time <rfc3339>] [--http-timeout-secs <n>] [--rsync-timeout-secs <n>] [--rsync-mirror-root <path>] [--max-depth <n>] [--max-instances <n>] [--trust-anchor <name>]"
}
/// Parse CLI arguments for the delta recorder.
///
/// Flags may appear in any order; `--help`/`-h` short-circuits with the usage
/// text as the error. Defaults: HTTP timeout 20s, rsync timeout 60s.
/// Errors on unknown flags, flags without values, unparsable values, or any
/// missing required flag (`--rir`, `--base-bundle-dir`, `--out-dir`).
fn parse_args(argv: &[String]) -> Result<Args, String> {
    let mut parsed = Args {
        http_timeout_secs: 20,
        rsync_timeout_secs: 60,
        ..Args::default()
    };
    // Skip argv[0] (program name) and consume flag/value pairs.
    let mut words = argv.iter().skip(1);
    while let Some(flag) = words.next() {
        match flag.as_str() {
            "--help" | "-h" => return Err(usage().to_string()),
            "--rir" => {
                parsed.rir = Some(words.next().ok_or("--rir requires a value")?.clone());
            }
            "--base-bundle-dir" => {
                parsed.base_bundle_dir =
                    Some(PathBuf::from(words.next().ok_or("--base-bundle-dir requires a value")?));
            }
            "--out-dir" => {
                parsed.out_dir =
                    Some(PathBuf::from(words.next().ok_or("--out-dir requires a value")?));
            }
            "--validation-time" => {
                let raw = words.next().ok_or("--validation-time requires a value")?;
                parsed.validation_time = Some(
                    time::OffsetDateTime::parse(raw, &Rfc3339)
                        .map_err(|e| format!("invalid --validation-time: {e}"))?,
                );
            }
            "--http-timeout-secs" => {
                parsed.http_timeout_secs = words
                    .next()
                    .ok_or("--http-timeout-secs requires a value")?
                    .parse()
                    .map_err(|e| format!("invalid --http-timeout-secs: {e}"))?;
            }
            "--rsync-timeout-secs" => {
                parsed.rsync_timeout_secs = words
                    .next()
                    .ok_or("--rsync-timeout-secs requires a value")?
                    .parse()
                    .map_err(|e| format!("invalid --rsync-timeout-secs: {e}"))?;
            }
            "--rsync-mirror-root" => {
                parsed.rsync_mirror_root =
                    Some(PathBuf::from(words.next().ok_or("--rsync-mirror-root requires a value")?));
            }
            "--max-depth" => {
                parsed.max_depth = Some(
                    words
                        .next()
                        .ok_or("--max-depth requires a value")?
                        .parse()
                        .map_err(|e| format!("invalid --max-depth: {e}"))?,
                );
            }
            "--max-instances" => {
                parsed.max_instances = Some(
                    words
                        .next()
                        .ok_or("--max-instances requires a value")?
                        .parse()
                        .map_err(|e| format!("invalid --max-instances: {e}"))?,
                );
            }
            "--trust-anchor" => {
                parsed.trust_anchor =
                    Some(words.next().ok_or("--trust-anchor requires a value")?.clone());
            }
            other => return Err(format!("unknown argument: {other}\n{}", usage())),
        }
    }
    // Required-flag validation, checked in the same order as before.
    let required = [
        (parsed.rir.is_none(), "--rir"),
        (parsed.base_bundle_dir.is_none(), "--base-bundle-dir"),
        (parsed.out_dir.is_none(), "--out-dir"),
    ];
    for (missing, flag) in required {
        if missing {
            return Err(format!("{flag} is required\n{}", usage()));
        }
    }
    Ok(parsed)
}
/// Lowercase hex of the SHA-256 digest of `bytes`.
fn sha256_hex(bytes: &[u8]) -> String {
    let digest = sha2::Sha256::digest(bytes);
    hex::encode(digest)
}
/// Serialize `value` as pretty-printed JSON to `path`, creating parent dirs.
fn write_json(path: &Path, value: &impl serde::Serialize) -> Result<(), String> {
    // Make sure the destination directory exists before serializing.
    if let Some(dir) = path.parent() {
        fs::create_dir_all(dir)
            .map_err(|e| format!("create parent failed: {}: {e}", dir.display()))?;
    }
    let rendered = serde_json::to_vec_pretty(value).map_err(|e| e.to_string())?;
    fs::write(path, rendered).map_err(|e| format!("write json failed: {}: {e}", path.display()))
}
/// Recursively copy the contents of `src` into `dst`.
///
/// Directories are recreated, regular files copied; anything else
/// (symlinks, sockets, ...) is skipped silently, as before.
fn copy_dir_all(src: &Path, dst: &Path) -> Result<(), String> {
    fs::create_dir_all(dst)
        .map_err(|e| format!("create directory failed: {}: {e}", dst.display()))?;
    let reader = fs::read_dir(src).map_err(|e| format!("read_dir failed: {}: {e}", src.display()))?;
    for item in reader {
        let item = item.map_err(|e| format!("read_dir entry failed: {}: {e}", src.display()))?;
        let kind = item
            .file_type()
            .map_err(|e| format!("file_type failed: {}: {e}", item.path().display()))?;
        let target = dst.join(item.file_name());
        if kind.is_dir() {
            copy_dir_all(&item.path(), &target)?;
            continue;
        }
        if !kind.is_file() {
            // Skip special entries (e.g. symlinks) — matches original behavior.
            continue;
        }
        if let Some(parent) = target.parent() {
            fs::create_dir_all(parent)
                .map_err(|e| format!("create parent failed: {}: {e}", parent.display()))?;
        }
        fs::copy(item.path(), &target).map_err(|e| {
            format!("copy failed: {} -> {}: {e}", item.path().display(), target.display())
        })?;
    }
    Ok(())
}
/// Read `validationTime` (or legacy `validation_time`) from a JSON file and
/// parse it as an RFC 3339 timestamp.
fn load_validation_time(path: &Path) -> Result<time::OffsetDateTime, String> {
    let raw = fs::read(path).map_err(|e| format!("read json failed: {}: {e}", path.display()))?;
    let doc: serde_json::Value = serde_json::from_slice(&raw)
        .map_err(|e| format!("parse json failed: {}: {e}", path.display()))?;
    // camelCase key wins; snake_case is accepted as a fallback.
    let stamp = doc
        .get("validationTime")
        .or_else(|| doc.get("validation_time"))
        .and_then(serde_json::Value::as_str)
        .ok_or_else(|| format!("validationTime missing in {}", path.display()))?;
    time::OffsetDateTime::parse(stamp, &Rfc3339)
        .map_err(|e| format!("invalid validationTime in {}: {e}", path.display()))
}
/// Best-effort pre-fetch of target RRDP snapshots for the delta capture.
///
/// For every RRDP publication point listed in the base bundle's
/// `base-locks.json`, if the target-run `store` has advanced to a *greater*
/// serial within the *same* session, fetch that repo's latest snapshot URI
/// through the recording HTTP fetcher (unless a response for it is already
/// recorded) so the bytes land in the capture. Entries that are missing,
/// non-RRDP, session-changed, or not advanced are skipped silently; only
/// read/fetch failures are surfaced as errors.
fn ensure_recorded_target_snapshots(
    store: &RocksStore,
    base_bundle_dir: &Path,
    http: &RecordingHttpFetcher<BlockingHttpFetcher>,
) -> Result<(), String> {
    // Base locks are read as loose JSON; `rrdp` is an object keyed by notify URI.
    let base_locks: serde_json::Value = serde_json::from_slice(
        &fs::read(base_bundle_dir.join("base-locks.json"))
            .map_err(|e| format!("read base locks failed: {e}"))?,
    )
    .map_err(|e| format!("parse base locks failed: {e}"))?;
    let base_rrdp = base_locks
        .get("rrdp")
        .and_then(|v| v.as_object())
        .cloned()
        .unwrap_or_default();
    for (notify_uri, base_lock) in base_rrdp {
        // Skip malformed or non-RRDP lock entries.
        let Some(base_transport) = base_lock.get("transport").and_then(|v| v.as_str()) else {
            continue;
        };
        if base_transport != "rrdp" {
            continue;
        }
        let Some(base_session) = base_lock.get("session").and_then(|v| v.as_str()) else {
            continue;
        };
        let Some(base_serial) = base_lock.get("serial").and_then(|v| v.as_u64()) else {
            continue;
        };
        // The target run must have observed this repo at all.
        let Some(record) = store
            .get_rrdp_source_record(&notify_uri)
            .map_err(|e| format!("read rrdp source record failed for {notify_uri}: {e}"))?
        else {
            continue;
        };
        let Some(target_session) = record.last_session_id.as_deref() else {
            continue;
        };
        let Some(target_serial) = record.last_serial else {
            continue;
        };
        // Only pre-fetch when the same session moved strictly forward.
        if target_session != base_session || target_serial <= base_serial {
            continue;
        }
        let Some(snapshot_uri) = record.last_snapshot_uri.as_deref() else {
            continue;
        };
        // Already captured during the live run — nothing to do.
        if http.snapshot_responses().contains_key(snapshot_uri) {
            continue;
        }
        // Fetch purely for its recording side effect; the body is discarded.
        let _ = http
            .fetch(snapshot_uri)
            .map_err(|e| format!("fetch target snapshot for {notify_uri} failed: {e}"))?;
    }
    Ok(())
}
/// End-to-end delta recording for one RIR.
///
/// Pipeline (everything lands under `--out-dir`, recreated from the base
/// bundle each run):
/// 1. copy the base bundle and bootstrap a store by replaying it;
/// 2. run a live validation at the target time through recording fetchers;
/// 3. build `delta.ccr`, verify it, and cross-check the VRP/VAP compare CSVs;
/// 4. write the delta capture inputs and prove them with a self delta replay;
/// 5. update `bundle.json`, `verification.json`, timings, and the top-level
///    `bundle-manifest.json`.
///
/// Returns the output root on success. Temporary RocksDB dirs under `.tmp/`
/// are cleaned up best-effort at the end.
fn run(args: Args) -> Result<PathBuf, String> {
    // parse_args guarantees the required options are present.
    let rir = args.rir.as_ref().unwrap();
    let rir_normalized = rir.to_ascii_lowercase();
    let out_root = args.out_dir.as_ref().unwrap();
    let base_root = args.base_bundle_dir.as_ref().unwrap();
    let base_rir_dir = base_root.join(&rir_normalized);
    if !base_rir_dir.is_dir() {
        return Err(format!("base bundle rir dir not found: {}", base_rir_dir.display()));
    }
    // The out dir is rebuilt from scratch as a copy of the base bundle.
    if out_root.exists() {
        fs::remove_dir_all(out_root)
            .map_err(|e| format!("remove old out dir failed: {}: {e}", out_root.display()))?;
    }
    copy_dir_all(base_root, out_root)?;
    let rir_dir = out_root.join(&rir_normalized);
    let trust_anchor = args
        .trust_anchor
        .clone()
        .unwrap_or_else(|| rir_normalized.clone());
    let tal_bytes =
        fs::read(rir_dir.join("tal.tal")).map_err(|e| format!("read tal from base bundle failed: {e}"))?;
    let ta_bytes =
        fs::read(rir_dir.join("ta.cer")).map_err(|e| format!("read ta from base bundle failed: {e}"))?;
    let base_validation_time = load_validation_time(&rir_dir.join("base-locks.json"))?;
    // Default the delta's target time to "now" when not pinned via CLI.
    let target_validation_time = args.validation_time.unwrap_or_else(time::OffsetDateTime::now_utc);
    let target_store_dir = out_root.join(".tmp").join(format!("{rir}-live-target-db"));
    let self_replay_dir = out_root.join(".tmp").join(format!("{rir}-self-delta-db"));
    let _ = fs::remove_dir_all(&target_store_dir);
    let _ = fs::remove_dir_all(&self_replay_dir);
    if let Some(parent) = target_store_dir.parent() {
        fs::create_dir_all(parent)
            .map_err(|e| format!("create tmp dir failed: {}: {e}", parent.display()))?;
    }
    let target_store =
        RocksStore::open(&target_store_dir).map_err(|e| format!("open target rocksdb failed: {e}"))?;
    // Phase 1: seed the target store by replaying the base bundle offline.
    let _base = run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
        &target_store,
        &Policy::default(),
        &tal_bytes,
        &ta_bytes,
        None,
        &rir_dir.join("base-payload-archive"),
        &rir_dir.join("base-locks.json"),
        base_validation_time,
        &TreeRunConfig {
            max_depth: args.max_depth,
            max_instances: args.max_instances,
        },
    )
    .map_err(|e| format!("base bootstrap replay failed: {e}"))?;
    // Phase 2: live run at the target time; fetchers record everything they see.
    let http = RecordingHttpFetcher::new(
        BlockingHttpFetcher::new(HttpFetcherConfig {
            timeout: std::time::Duration::from_secs(args.http_timeout_secs),
            ..HttpFetcherConfig::default()
        })
        .map_err(|e| format!("create http fetcher failed: {e}"))?,
    );
    let rsync = RecordingRsyncFetcher::new(SystemRsyncFetcher::new(SystemRsyncConfig {
        timeout: std::time::Duration::from_secs(args.rsync_timeout_secs),
        mirror_root: args.rsync_mirror_root.clone(),
        ..SystemRsyncConfig::default()
    }));
    let started = Instant::now();
    let target_out = run_tree_from_tal_and_ta_der_serial_audit(
        &target_store,
        &Policy::default(),
        &tal_bytes,
        &ta_bytes,
        None,
        &http,
        &rsync,
        target_validation_time,
        &TreeRunConfig {
            max_depth: args.max_depth,
            max_instances: args.max_instances,
        },
    )
    .map_err(|e| format!("live target run failed: {e}"))?;
    let duration = started.elapsed();
    // Pre-fetch any advanced-serial snapshots so the capture is complete.
    ensure_recorded_target_snapshots(&target_store, &rir_dir, &http)?;
    // Phase 3: emit delta.ccr, verify it, and cross-check compare views.
    let delta_ccr = build_ccr_from_run(
        &target_store,
        &[target_out.discovery.trust_anchor.clone()],
        &target_out.tree.vrps,
        &target_out.tree.aspas,
        &target_out.tree.router_keys,
        target_validation_time,
    )
    .map_err(|e| format!("build delta ccr failed: {e}"))?;
    let delta_ccr_path = rir_dir.join("delta.ccr");
    write_ccr_file(&delta_ccr_path, &delta_ccr).map_err(|e| format!("write delta ccr failed: {e}"))?;
    let delta_ccr_bytes =
        fs::read(&delta_ccr_path).map_err(|e| format!("read delta ccr failed: {}: {e}", delta_ccr_path.display()))?;
    let delta_decoded =
        decode_content_info(&delta_ccr_bytes).map_err(|e| format!("decode delta ccr failed: {e}"))?;
    let delta_verify =
        verify_content_info(&delta_decoded).map_err(|e| format!("verify delta ccr failed: {e}"))?;
    let delta_vrp_rows = build_vrp_compare_rows(&target_out.tree.vrps, &trust_anchor);
    let delta_vap_rows = build_vap_compare_rows(&target_out.tree.aspas, &trust_anchor);
    // The CSVs must agree byte-for-byte with what the CCR itself encodes.
    let (ccr_vrps, ccr_vaps) = rpki::bundle::decode_ccr_compare_views(&delta_decoded, &trust_anchor)?;
    if delta_vrp_rows != ccr_vrps {
        return Err("record-delta.csv compare view does not match delta.ccr".to_string());
    }
    if delta_vap_rows != ccr_vaps {
        return Err("record-delta-vaps.csv compare view does not match delta.ccr".to_string());
    }
    write_vrp_csv(&rir_dir.join("record-delta.csv"), &delta_vrp_rows)?;
    write_vap_csv(&rir_dir.join("record-delta-vaps.csv"), &delta_vap_rows)?;
    // Phase 4: write the delta replay inputs captured during the live run...
    let capture = write_live_delta_replay_bundle_inputs(
        &rir_dir,
        &rir_normalized,
        target_validation_time,
        &target_out.publication_points,
        &target_store,
        &http.snapshot_responses(),
        &rsync.snapshot_fetches(),
    )?;
    // ...then prove them by replaying base + delta into a fresh store.
    let self_store =
        RocksStore::open(&self_replay_dir).map_err(|e| format!("open self replay db failed: {e}"))?;
    let replay_out = run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit(
        &self_store,
        &Policy::default(),
        &tal_bytes,
        &ta_bytes,
        None,
        &rir_dir.join("base-payload-archive"),
        &rir_dir.join("base-locks.json"),
        &rir_dir.join("payload-delta-archive"),
        &rir_dir.join("locks-delta.json"),
        base_validation_time,
        target_validation_time,
        &TreeRunConfig {
            max_depth: args.max_depth,
            max_instances: args.max_instances,
        },
    )
    .map_err(|e| format!("self delta replay failed: {e}"))?;
    let replay_vrps = build_vrp_compare_rows(&replay_out.tree.vrps, &trust_anchor);
    let replay_vaps = build_vap_compare_rows(&replay_out.tree.aspas, &trust_anchor);
    if replay_vrps != delta_vrp_rows {
        return Err("self delta replay VRP compare view mismatch".to_string());
    }
    if replay_vaps != delta_vap_rows {
        return Err("self delta replay VAP compare view mismatch".to_string());
    }
    // Phase 5: timings + metadata updates.
    fs::create_dir_all(rir_dir.join("timings"))
        .map_err(|e| format!("create timings dir failed: {e}"))?;
    write_json(
        &rir_dir.join("timings").join("delta-produce.json"),
        &serde_json::json!({
            "mode": "delta",
            "validationTime": target_validation_time
                .format(&Rfc3339)
                .map_err(|e| format!("format validation time failed: {e}"))?,
            "durationSeconds": duration.as_secs_f64(),
        }),
    )?;
    // bundle.json is patched in place; hasAspa/hasRouterKey are sticky ORs
    // so a delta can only add capabilities, never clear them.
    let mut bundle_json: serde_json::Value = serde_json::from_slice(
        &fs::read(rir_dir.join("bundle.json")).map_err(|e| format!("read base bundle.json failed: {e}"))?,
    )
    .map_err(|e| format!("parse base bundle.json failed: {e}"))?;
    bundle_json["deltaValidationTime"] = serde_json::Value::String(
        target_validation_time
            .format(&Rfc3339)
            .map_err(|e| format!("format delta validation time failed: {e}"))?,
    );
    bundle_json["deltaCcrSha256"] = serde_json::Value::String(sha256_hex(&delta_ccr_bytes));
    bundle_json["deltaVrpCount"] = serde_json::Value::from(delta_vrp_rows.len() as u64);
    bundle_json["deltaVapCount"] = serde_json::Value::from(delta_vap_rows.len() as u64);
    bundle_json["hasAspa"] = serde_json::Value::Bool(
        bundle_json
            .get("hasAspa")
            .and_then(|v| v.as_bool())
            .unwrap_or(false)
            || !delta_vap_rows.is_empty(),
    );
    bundle_json["hasRouterKey"] = serde_json::Value::Bool(
        bundle_json
            .get("hasRouterKey")
            .and_then(|v| v.as_bool())
            .unwrap_or(false)
            || delta_verify.router_key_count > 0,
    );
    write_json(&rir_dir.join("bundle.json"), &bundle_json)?;
    // verification.json gains a "delta" section mirroring the base one.
    let mut verification_json: serde_json::Value = serde_json::from_slice(
        &fs::read(rir_dir.join("verification.json"))
            .map_err(|e| format!("read base verification.json failed: {e}"))?,
    )
    .map_err(|e| format!("parse base verification.json failed: {e}"))?;
    verification_json["delta"] = serde_json::json!({
        "validationTime": target_validation_time
            .format(&Rfc3339)
            .map_err(|e| format!("format delta validation time failed: {e}"))?,
        "ccr": {
            "path": "delta.ccr",
            "sha256": sha256_hex(&delta_ccr_bytes),
            "stateHashesOk": delta_verify.state_hashes_ok,
            "manifestInstances": delta_verify.manifest_instances,
            "roaVrpCount": delta_verify.roa_vrp_count,
            "aspaPayloadSets": delta_verify.aspa_payload_sets,
            "routerKeyCount": delta_verify.router_key_count,
        },
        "compareViews": {
            "vrpsSelfMatch": true,
            "vapsSelfMatch": true,
            "deltaVrpCount": delta_vrp_rows.len(),
            "deltaVapCount": delta_vap_rows.len(),
        },
        "capture": {
            "captureId": capture.capture_id,
            "rrdpRepoCount": capture.rrdp_repo_count,
            "rsyncModuleCount": capture.rsync_module_count,
            "selfReplayOk": true,
        }
    });
    write_json(&rir_dir.join("verification.json"), &verification_json)?;
    // Top-level manifest is rewritten with the delta time filled in.
    let bundle_manifest = BundleManifest {
        schema_version: "20260330-v1".to_string(),
        bundle_producer: "ours".to_string(),
        recorded_at_rfc3339_utc: time::OffsetDateTime::now_utc()
            .format(&Rfc3339)
            .map_err(|e| format!("format recorded_at failed: {e}"))?,
        rirs: vec![rir_normalized.clone()],
        per_rir_bundles: vec![BundleManifestEntry {
            rir: rir_normalized.clone(),
            relative_path: rir_normalized,
            base_validation_time: base_validation_time
                .format(&Rfc3339)
                .map_err(|e| format!("format base validation time failed: {e}"))?,
            delta_validation_time: Some(
                target_validation_time
                    .format(&Rfc3339)
                    .map_err(|e| format!("format delta validation time failed: {e}"))?,
            ),
            has_aspa: bundle_json["hasAspa"].as_bool().unwrap_or(false),
        }],
    };
    write_json(&out_root.join("bundle-manifest.json"), &bundle_manifest)?;
    // Best-effort cleanup of the temporary RocksDB directories.
    let _ = fs::remove_dir_all(&target_store_dir);
    let _ = fs::remove_dir_all(&self_replay_dir);
    Ok(out_root.clone())
}
/// CLI entry point: parse arguments, run the delta recorder, print the output root.
fn main() -> Result<(), String> {
    // Collect argv once so parse_args can borrow it as a slice.
    let argv: Vec<String> = std::env::args().collect();
    let parsed = parse_args(&argv)?;
    let out_root = run(parsed)?;
    println!("{}", out_root.display());
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    // Happy path: required flags only; no defaults are asserted here because
    // this binary's timeouts are checked implicitly via parse success.
    #[test]
    fn parse_args_requires_required_flags() {
        let argv = vec![
            "replay_bundle_capture_delta".to_string(),
            "--rir".to_string(),
            "apnic".to_string(),
            "--base-bundle-dir".to_string(),
            "base".to_string(),
            "--out-dir".to_string(),
            "out".to_string(),
        ];
        let args = parse_args(&argv).expect("parse");
        assert_eq!(args.rir.as_deref(), Some("apnic"));
        assert_eq!(args.base_bundle_dir.as_deref(), Some(Path::new("base")));
        assert_eq!(args.out_dir.as_deref(), Some(Path::new("out")));
    }
    // Missing everything: the first required-flag error (--rir) is reported.
    #[test]
    fn parse_args_rejects_missing_requireds() {
        let err = parse_args(&["replay_bundle_capture_delta".to_string()]).unwrap_err();
        assert!(err.contains("--rir is required"), "{err}");
    }
}

View File

@ -0,0 +1,746 @@
use rpki::bundle::{
BundleManifest, BundleManifestEntry, RirBundleMetadata, build_vap_compare_rows,
build_vrp_compare_rows, decode_ccr_compare_views, write_vap_csv, write_vrp_csv,
};
use rpki::ccr::{build_ccr_from_run, decode_content_info, verify_content_info, write_ccr_file};
use rpki::policy::Policy;
use rpki::storage::RocksStore;
use rpki::validation::run_tree_from_tal::{
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
};
use rpki::validation::tree::TreeRunConfig;
use sha2::Digest;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Instant;
use time::format_description::well_known::Rfc3339;
/// Command-line options for the offline replay-record binary.
///
/// Required (enforced by `parse_args`): `rir`, `out_dir`, `tal_path`,
/// `ta_path`, `payload_replay_archive`, `payload_replay_locks`.
#[derive(Debug, Default, PartialEq, Eq)]
struct Args {
    /// RIR name; lowercased before it is used as a directory name.
    rir: Option<String>,
    /// Output directory for the generated bundle.
    out_dir: Option<PathBuf>,
    /// Path to the TAL file copied into the bundle as `tal.tal`.
    tal_path: Option<PathBuf>,
    /// Path to the TA certificate copied into the bundle as `ta.cer`.
    ta_path: Option<PathBuf>,
    /// Base payload archive used as replay input.
    payload_replay_archive: Option<PathBuf>,
    /// Base locks JSON used as replay input.
    payload_replay_locks: Option<PathBuf>,
    /// Optional delta payload archive (pairs with `payload_delta_locks`).
    payload_delta_archive: Option<PathBuf>,
    /// Optional delta locks JSON; its `validationTime` drives the delta run.
    payload_delta_locks: Option<PathBuf>,
    /// Base validation time override; defaults to the base locks' value.
    validation_time: Option<time::OffsetDateTime>,
    /// Forwarded to `TreeRunConfig::max_depth`.
    max_depth: Option<usize>,
    /// Forwarded to `TreeRunConfig::max_instances`.
    max_instances: Option<usize>,
    /// Trust-anchor label for compare rows; defaults to the normalized RIR.
    trust_anchor: Option<String>,
}
/// One-line CLI usage string, shown for `--help` and on argument errors.
fn usage() -> &'static str {
    "Usage: replay_bundle_record --rir <name> --out-dir <path> --tal-path <path> --ta-path <path> --payload-replay-archive <path> --payload-replay-locks <path> [--payload-delta-archive <path> --payload-delta-locks <path>] [--validation-time <rfc3339>] [--max-depth <n>] [--max-instances <n>] [--trust-anchor <name>]"
}
/// Parse CLI arguments for the replay-record binary.
///
/// Flags may appear in any order; `--help`/`-h` short-circuits with the usage
/// text as the error. Errors on unknown flags, flags without values,
/// unparsable values, or any missing required flag.
fn parse_args(argv: &[String]) -> Result<Args, String> {
    let mut parsed = Args::default();
    // Skip argv[0] (program name) and consume flag/value pairs.
    let mut words = argv.iter().skip(1);
    while let Some(flag) = words.next() {
        match flag.as_str() {
            "--help" | "-h" => return Err(usage().to_string()),
            "--rir" => {
                parsed.rir = Some(words.next().ok_or("--rir requires a value")?.clone());
            }
            "--out-dir" => {
                parsed.out_dir =
                    Some(PathBuf::from(words.next().ok_or("--out-dir requires a value")?));
            }
            "--tal-path" => {
                parsed.tal_path =
                    Some(PathBuf::from(words.next().ok_or("--tal-path requires a value")?));
            }
            "--ta-path" => {
                parsed.ta_path =
                    Some(PathBuf::from(words.next().ok_or("--ta-path requires a value")?));
            }
            "--payload-replay-archive" => {
                parsed.payload_replay_archive = Some(PathBuf::from(
                    words.next().ok_or("--payload-replay-archive requires a value")?,
                ));
            }
            "--payload-replay-locks" => {
                parsed.payload_replay_locks = Some(PathBuf::from(
                    words.next().ok_or("--payload-replay-locks requires a value")?,
                ));
            }
            "--payload-delta-archive" => {
                parsed.payload_delta_archive = Some(PathBuf::from(
                    words.next().ok_or("--payload-delta-archive requires a value")?,
                ));
            }
            "--payload-delta-locks" => {
                parsed.payload_delta_locks = Some(PathBuf::from(
                    words.next().ok_or("--payload-delta-locks requires a value")?,
                ));
            }
            "--validation-time" => {
                let raw = words.next().ok_or("--validation-time requires a value")?;
                parsed.validation_time = Some(
                    time::OffsetDateTime::parse(raw, &Rfc3339)
                        .map_err(|e| format!("invalid --validation-time: {e}"))?,
                );
            }
            "--max-depth" => {
                parsed.max_depth = Some(
                    words
                        .next()
                        .ok_or("--max-depth requires a value")?
                        .parse()
                        .map_err(|e| format!("invalid --max-depth: {e}"))?,
                );
            }
            "--max-instances" => {
                parsed.max_instances = Some(
                    words
                        .next()
                        .ok_or("--max-instances requires a value")?
                        .parse()
                        .map_err(|e| format!("invalid --max-instances: {e}"))?,
                );
            }
            "--trust-anchor" => {
                parsed.trust_anchor =
                    Some(words.next().ok_or("--trust-anchor requires a value")?.clone());
            }
            other => return Err(format!("unknown argument: {other}\n{}", usage())),
        }
    }
    // Required-flag validation, checked in the same order as before.
    let required = [
        (parsed.rir.is_none(), "--rir"),
        (parsed.out_dir.is_none(), "--out-dir"),
        (parsed.tal_path.is_none(), "--tal-path"),
        (parsed.ta_path.is_none(), "--ta-path"),
        (parsed.payload_replay_archive.is_none(), "--payload-replay-archive"),
        (parsed.payload_replay_locks.is_none(), "--payload-replay-locks"),
    ];
    for (missing, flag) in required {
        if missing {
            return Err(format!("{flag} is required\n{}", usage()));
        }
    }
    Ok(parsed)
}
/// Read `validationTime` (or legacy `validation_time`) from a locks JSON file
/// and parse it as an RFC 3339 timestamp.
fn load_validation_time(path: &Path) -> Result<time::OffsetDateTime, String> {
    let raw = fs::read(path).map_err(|e| format!("read locks failed: {}: {e}", path.display()))?;
    let doc: serde_json::Value = serde_json::from_slice(&raw)
        .map_err(|e| format!("parse locks failed: {}: {e}", path.display()))?;
    // camelCase key wins; snake_case is accepted as a fallback.
    let stamp = doc
        .get("validationTime")
        .or_else(|| doc.get("validation_time"))
        .and_then(serde_json::Value::as_str)
        .ok_or_else(|| format!("validationTime missing in {}", path.display()))?;
    time::OffsetDateTime::parse(stamp, &Rfc3339)
        .map_err(|e| format!("invalid validationTime in {}: {e}", path.display()))
}
/// Lowercase hex of the SHA-256 digest of `bytes`.
fn sha256_hex(bytes: &[u8]) -> String {
    let digest = sha2::Sha256::digest(bytes);
    hex::encode(digest)
}
/// Recursively copy the contents of `src` into `dst`.
///
/// Directories are recreated, regular files copied; anything else
/// (symlinks, sockets, ...) is skipped silently, as before.
fn copy_dir_all(src: &Path, dst: &Path) -> Result<(), String> {
    fs::create_dir_all(dst)
        .map_err(|e| format!("create directory failed: {}: {e}", dst.display()))?;
    let reader = fs::read_dir(src).map_err(|e| format!("read_dir failed: {}: {e}", src.display()))?;
    for item in reader {
        let item = item.map_err(|e| format!("read_dir entry failed: {}: {e}", src.display()))?;
        let kind = item
            .file_type()
            .map_err(|e| format!("file_type failed: {}: {e}", item.path().display()))?;
        let target = dst.join(item.file_name());
        if kind.is_dir() {
            copy_dir_all(&item.path(), &target)?;
            continue;
        }
        if !kind.is_file() {
            // Skip special entries (e.g. symlinks) — matches original behavior.
            continue;
        }
        if let Some(parent) = target.parent() {
            fs::create_dir_all(parent)
                .map_err(|e| format!("create parent failed: {}: {e}", parent.display()))?;
        }
        fs::copy(item.path(), &target).map_err(|e| {
            format!("copy failed: {} -> {}: {e}", item.path().display(), target.display())
        })?;
    }
    Ok(())
}
/// Serialize `value` as pretty-printed JSON to `path`, creating parent dirs.
fn write_json(path: &Path, value: &impl serde::Serialize) -> Result<(), String> {
    // Make sure the destination directory exists before serializing.
    if let Some(dir) = path.parent() {
        fs::create_dir_all(dir)
            .map_err(|e| format!("create parent failed: {}: {e}", dir.display()))?;
    }
    let rendered = serde_json::to_vec_pretty(value).map_err(|e| e.to_string())?;
    fs::write(path, rendered).map_err(|e| format!("write json failed: {}: {e}", path.display()))
}
/// Write the top-level bundle README describing the single-RIR run.
fn write_top_readme(path: &Path, rir: &str) -> Result<(), String> {
    // Render first, then write, so the format string stays easy to eyeball.
    let body = format!(
        "# Ours Replay Bundle\n\nThis run contains one per-RIR bundle generated by `ours`.\n\n- RIR: `{rir}`\n- Reference result format: `CCR`\n"
    );
    fs::write(path, body).map_err(|e| format!("write readme failed: {}: {e}", path.display()))
}
/// Write the per-RIR README describing the bundle's replay inputs and outputs.
fn write_rir_readme(path: &Path, rir: &str, base_validation_time: &str) -> Result<(), String> {
    // Render first, then write, so the format string stays easy to eyeball.
    let body = format!(
        "# {rir} replay bundle\n\n- `tal.tal` and `ta.cer` are the direct replay inputs.\n- `base-locks.json.validationTime` = `{base_validation_time}`.\n- `base.ccr` is the authoritative reference result.\n- `base-vrps.csv` and `base-vaps.csv` are compare views derived from `base.ccr`.\n"
    );
    fs::write(path, body).map_err(|e| format!("write rir readme failed: {}: {e}", path.display()))
}
fn write_timing_json(
path: &Path,
mode: &str,
validation_time: &time::OffsetDateTime,
duration: std::time::Duration,
) -> Result<(), String> {
write_json(
path,
&serde_json::json!({
"mode": mode,
"validationTime": validation_time
.format(&Rfc3339)
.map_err(|e| format!("format validation time failed: {e}"))?,
"durationSeconds": duration.as_secs_f64(),
}),
)
}
fn rewrite_delta_base_locks_sha(delta_root: &Path, emitted_base_locks_sha256: &str) -> Result<(), String> {
let delta_locks = delta_root.join("locks-delta.json");
if delta_locks.is_file() {
let mut json: serde_json::Value = serde_json::from_slice(
&fs::read(&delta_locks)
.map_err(|e| format!("read delta locks failed: {}: {e}", delta_locks.display()))?,
)
.map_err(|e| format!("parse delta locks failed: {}: {e}", delta_locks.display()))?;
json.as_object_mut()
.ok_or_else(|| format!("delta locks must be object: {}", delta_locks.display()))?
.insert(
"baseLocksSha256".to_string(),
serde_json::Value::String(emitted_base_locks_sha256.to_string()),
);
write_json(&delta_locks, &json)?;
}
let archive_root = delta_root.join("payload-delta-archive");
if archive_root.is_dir() {
for path in walk_json_files_named(&archive_root, "base.json")? {
let mut json: serde_json::Value = serde_json::from_slice(
&fs::read(&path).map_err(|e| format!("read base.json failed: {}: {e}", path.display()))?,
)
.map_err(|e| format!("parse base.json failed: {}: {e}", path.display()))?;
json.as_object_mut()
.ok_or_else(|| format!("base.json must be object: {}", path.display()))?
.insert(
"baseLocksSha256".to_string(),
serde_json::Value::String(emitted_base_locks_sha256.to_string()),
);
write_json(&path, &json)?;
}
}
Ok(())
}
/// Collect every regular file named exactly `name` under `root`, recursively.
///
/// A missing or non-directory `root` yields an empty list. Traversal uses an
/// explicit LIFO work list; result order is unspecified, as before.
fn walk_json_files_named(root: &Path, name: &str) -> Result<Vec<PathBuf>, String> {
    let mut matches = Vec::new();
    if !root.is_dir() {
        return Ok(matches);
    }
    let mut pending = vec![root.to_path_buf()];
    while let Some(current) = pending.pop() {
        let reader = fs::read_dir(&current)
            .map_err(|e| format!("read_dir failed: {}: {e}", current.display()))?;
        for item in reader {
            let item =
                item.map_err(|e| format!("read_dir entry failed: {}: {e}", current.display()))?;
            let child = item.path();
            let kind = item
                .file_type()
                .map_err(|e| format!("file_type failed: {}: {e}", child.display()))?;
            if kind.is_dir() {
                pending.push(child);
            } else if kind.is_file() && item.file_name() == name {
                matches.push(child);
            }
        }
    }
    Ok(matches)
}
/// Record one per-RIR replay bundle end to end:
/// 1. run the base payload replay and emit `base.ccr` plus compare-view CSVs,
/// 2. copy the payload archive/locks into the bundle layout (rewriting
///    `validationTime`),
/// 3. optionally run the delta replay and emit the delta artifacts,
/// 4. write `bundle.json`, `verification.json`, READMEs and the top-level
///    `bundle-manifest.json`.
///
/// Returns the run root directory on success, or a human-readable error.
fn run(args: Args) -> Result<PathBuf, String> {
    // Required flags are validated by parse_args; unwrap encodes that invariant.
    let rir = args.rir.as_ref().unwrap();
    let rir_normalized = rir.to_ascii_lowercase();
    let out_root = args.out_dir.as_ref().unwrap();
    let tal_path = args.tal_path.as_ref().unwrap();
    let ta_path = args.ta_path.as_ref().unwrap();
    let replay_archive = args.payload_replay_archive.as_ref().unwrap();
    let replay_locks = args.payload_replay_locks.as_ref().unwrap();
    // Trust anchor label defaults to the lowercased RIR name.
    let trust_anchor = args
        .trust_anchor
        .clone()
        .unwrap_or_else(|| rir_normalized.clone());
    // Base validation time: explicit flag wins, otherwise read it from the base locks file.
    let base_validation_time = match args.validation_time {
        Some(value) => value,
        None => load_validation_time(replay_locks)?,
    };
    // Delta validation time is only defined when delta locks were supplied.
    let delta_validation_time = match args.payload_delta_locks.as_ref() {
        Some(path) => Some(load_validation_time(path)?),
        None => None,
    };
    let run_root = out_root;
    let rir_dir = run_root.join(&rir_normalized);
    fs::create_dir_all(&rir_dir)
        .map_err(|e| format!("create rir dir failed: {}: {e}", rir_dir.display()))?;
    let tal_bytes = fs::read(tal_path).map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
    let ta_bytes = fs::read(ta_path).map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
    // Fresh scratch RocksDB per run; any stale directory is removed first.
    let db_dir = run_root.join(".tmp").join(format!("{rir}-base-db"));
    if db_dir.exists() {
        fs::remove_dir_all(&db_dir)
            .map_err(|e| format!("remove old db failed: {}: {e}", db_dir.display()))?;
    }
    if let Some(parent) = db_dir.parent() {
        fs::create_dir_all(parent)
            .map_err(|e| format!("create db parent failed: {}: {e}", parent.display()))?;
    }
    let store = RocksStore::open(&db_dir).map_err(|e| format!("open rocksdb failed: {e}"))?;
    // --- Phase 1: base replay ---
    let base_started = Instant::now();
    let out = run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
        &store,
        &Policy::default(),
        &tal_bytes,
        &ta_bytes,
        None,
        replay_archive,
        replay_locks,
        base_validation_time,
        &TreeRunConfig {
            max_depth: args.max_depth,
            max_instances: args.max_instances,
        },
    )
    .map_err(|e| format!("base replay failed: {e}"))?;
    let base_duration = base_started.elapsed();
    let ccr = build_ccr_from_run(
        &store,
        &[out.discovery.trust_anchor.clone()],
        &out.tree.vrps,
        &out.tree.aspas,
        &out.tree.router_keys,
        base_validation_time,
    )
    .map_err(|e| format!("build ccr failed: {e}"))?;
    let base_ccr_path = rir_dir.join("base.ccr");
    write_ccr_file(&base_ccr_path, &ccr).map_err(|e| format!("write ccr failed: {e}"))?;
    // Re-read the bytes we just wrote so the decode/verify round-trip checks the
    // on-disk artifact, not the in-memory structure.
    let ccr_bytes = fs::read(&base_ccr_path)
        .map_err(|e| format!("read written ccr failed: {}: {e}", base_ccr_path.display()))?;
    let decoded = decode_content_info(&ccr_bytes).map_err(|e| format!("decode written ccr failed: {e}"))?;
    let verify = verify_content_info(&decoded).map_err(|e| format!("verify ccr failed: {e}"))?;
    // Cross-check: compare views built from the run must match those decoded from base.ccr.
    let vrp_rows = build_vrp_compare_rows(&out.tree.vrps, &trust_anchor);
    let vap_rows = build_vap_compare_rows(&out.tree.aspas, &trust_anchor);
    let (ccr_vrps, ccr_vaps) = decode_ccr_compare_views(&decoded, &trust_anchor)?;
    if vrp_rows != ccr_vrps {
        return Err("base-vrps compare view does not match base.ccr".to_string());
    }
    if vap_rows != ccr_vaps {
        return Err("base-vaps compare view does not match base.ccr".to_string());
    }
    let base_vrps_csv = rir_dir.join("base-vrps.csv");
    let base_vaps_csv = rir_dir.join("base-vaps.csv");
    write_vrp_csv(&base_vrps_csv, &vrp_rows)?;
    write_vap_csv(&base_vaps_csv, &vap_rows)?;
    // --- Phase 2: copy replay inputs into the bundle layout ---
    copy_dir_all(replay_archive, &rir_dir.join("base-payload-archive"))?;
    let mut base_locks_json: serde_json::Value = serde_json::from_slice(
        &fs::read(replay_locks)
            .map_err(|e| format!("read base locks failed: {}: {e}", replay_locks.display()))?,
    )
    .map_err(|e| format!("parse base locks failed: {}: {e}", replay_locks.display()))?;
    // Pin the validation time actually used into the emitted base locks.
    base_locks_json["validationTime"] = serde_json::Value::String(
        base_validation_time
            .format(&Rfc3339)
            .map_err(|e| format!("format validation time failed: {e}"))?,
    );
    let emitted_base_locks_path = rir_dir.join("base-locks.json");
    write_json(&emitted_base_locks_path, &base_locks_json)?;
    // Hash the emitted (rewritten) base locks so the delta locks can reference them.
    let emitted_base_locks_sha256 = sha256_hex(
        &fs::read(&emitted_base_locks_path)
            .map_err(|e| format!("read emitted base locks failed: {}: {e}", emitted_base_locks_path.display()))?,
    );
    if let Some(delta_archive) = args.payload_delta_archive.as_ref() {
        copy_dir_all(delta_archive, &rir_dir.join("payload-delta-archive"))?;
    }
    if let Some(delta_locks) = args.payload_delta_locks.as_ref() {
        let mut delta_json: serde_json::Value = serde_json::from_slice(
            &fs::read(delta_locks)
                .map_err(|e| format!("read delta locks failed: {}: {e}", delta_locks.display()))?,
        )
        .map_err(|e| format!("parse delta locks failed: {}: {e}", delta_locks.display()))?;
        if let Some(delta_time) = delta_validation_time.as_ref() {
            delta_json
                .as_object_mut()
                .ok_or_else(|| "delta locks json must be an object".to_string())?
                .insert(
                    "validationTime".to_string(),
                    serde_json::Value::String(
                        delta_time
                            .format(&Rfc3339)
                            .map_err(|e| format!("format delta validation time failed: {e}"))?,
                    ),
                );
        }
        write_json(&rir_dir.join("locks-delta.json"), &delta_json)?;
    }
    // Rewrite baseLocksSha256 in the emitted delta locks to the hash of the
    // rewritten base locks (only when both delta inputs are present).
    if args.payload_delta_archive.is_some() && args.payload_delta_locks.is_some() {
        rewrite_delta_base_locks_sha(&rir_dir, &emitted_base_locks_sha256)?;
    }
    fs::write(rir_dir.join("tal.tal"), &tal_bytes)
        .map_err(|e| format!("write tal failed: {e}"))?;
    fs::write(rir_dir.join("ta.cer"), &ta_bytes)
        .map_err(|e| format!("write ta failed: {e}"))?;
    // --- Phase 3: metadata + verification report (delta fields filled in later) ---
    let mut metadata = RirBundleMetadata {
        schema_version: "20260330-v1".to_string(),
        bundle_producer: "ours".to_string(),
        rir: rir_normalized.clone(),
        base_validation_time: base_validation_time
            .format(&Rfc3339)
            .map_err(|e| format!("format validation time failed: {e}"))?,
        delta_validation_time: delta_validation_time.as_ref().map(|value| {
            value
                .format(&Rfc3339)
                .expect("delta validation time must format")
        }),
        tal_sha256: sha256_hex(&tal_bytes),
        ta_cert_sha256: sha256_hex(&ta_bytes),
        base_ccr_sha256: sha256_hex(&ccr_bytes),
        delta_ccr_sha256: None,
        has_aspa: !vap_rows.is_empty(),
        has_router_key: verify.router_key_count > 0,
        base_vrp_count: vrp_rows.len(),
        base_vap_count: vap_rows.len(),
        delta_vrp_count: None,
        delta_vap_count: None,
    };
    fs::create_dir_all(rir_dir.join("timings"))
        .map_err(|e| format!("create timings dir failed: {e}"))?;
    write_timing_json(
        &rir_dir.join("timings").join("base-produce.json"),
        "base",
        &base_validation_time,
        base_duration,
    )?;
    let mut verification = serde_json::json!({
        "base": {
            "validationTime": metadata.base_validation_time,
            "ccr": {
                "path": "base.ccr",
                "sha256": metadata.base_ccr_sha256,
                "stateHashesOk": verify.state_hashes_ok,
                "manifestInstances": verify.manifest_instances,
                "roaVrpCount": verify.roa_vrp_count,
                "aspaPayloadSets": verify.aspa_payload_sets,
                "routerKeyCount": verify.router_key_count,
            },
            "compareViews": {
                "vrpsSelfMatch": true,
                "vapsSelfMatch": true,
                "baseVrpCount": metadata.base_vrp_count,
                "baseVapCount": metadata.base_vap_count,
            }
        }
    });
    // --- Phase 4: optional delta replay (requires archive, locks, and a delta time) ---
    if let (Some(delta_archive), Some(delta_locks), Some(delta_time)) = (
        args.payload_delta_archive.as_ref(),
        args.payload_delta_locks.as_ref(),
        delta_validation_time.as_ref(),
    ) {
        // The delta replay gets its own scratch DB, separate from the base run.
        let delta_db_dir = run_root.join(".tmp").join(format!("{rir}-delta-db"));
        if delta_db_dir.exists() {
            fs::remove_dir_all(&delta_db_dir)
                .map_err(|e| format!("remove old delta db failed: {}: {e}", delta_db_dir.display()))?;
        }
        if let Some(parent) = delta_db_dir.parent() {
            fs::create_dir_all(parent)
                .map_err(|e| format!("create delta db parent failed: {}: {e}", parent.display()))?;
        }
        let delta_store =
            RocksStore::open(&delta_db_dir).map_err(|e| format!("open delta rocksdb failed: {e}"))?;
        let delta_started = Instant::now();
        let delta_out = run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit(
            &delta_store,
            &Policy::default(),
            &tal_bytes,
            &ta_bytes,
            None,
            replay_archive,
            replay_locks,
            delta_archive,
            delta_locks,
            base_validation_time,
            *delta_time,
            &TreeRunConfig {
                max_depth: args.max_depth,
                max_instances: args.max_instances,
            },
        )
        .map_err(|e| format!("delta replay failed: {e}"))?;
        let delta_duration = delta_started.elapsed();
        let delta_ccr = build_ccr_from_run(
            &delta_store,
            &[delta_out.discovery.trust_anchor.clone()],
            &delta_out.tree.vrps,
            &delta_out.tree.aspas,
            &delta_out.tree.router_keys,
            *delta_time,
        )
        .map_err(|e| format!("build delta ccr failed: {e}"))?;
        let delta_ccr_path = rir_dir.join("delta.ccr");
        write_ccr_file(&delta_ccr_path, &delta_ccr)
            .map_err(|e| format!("write delta ccr failed: {e}"))?;
        // Same on-disk round-trip + self-consistency checks as the base phase.
        let delta_ccr_bytes = fs::read(&delta_ccr_path)
            .map_err(|e| format!("read written delta ccr failed: {}: {e}", delta_ccr_path.display()))?;
        let delta_decoded = decode_content_info(&delta_ccr_bytes)
            .map_err(|e| format!("decode written delta ccr failed: {e}"))?;
        let delta_verify =
            verify_content_info(&delta_decoded).map_err(|e| format!("verify delta ccr failed: {e}"))?;
        let delta_vrp_rows = build_vrp_compare_rows(&delta_out.tree.vrps, &trust_anchor);
        let delta_vap_rows = build_vap_compare_rows(&delta_out.tree.aspas, &trust_anchor);
        let (delta_ccr_vrps, delta_ccr_vaps) = decode_ccr_compare_views(&delta_decoded, &trust_anchor)?;
        if delta_vrp_rows != delta_ccr_vrps {
            return Err("record-delta.csv compare view does not match delta.ccr".to_string());
        }
        if delta_vap_rows != delta_ccr_vaps {
            return Err("record-delta-vaps.csv compare view does not match delta.ccr".to_string());
        }
        write_vrp_csv(&rir_dir.join("record-delta.csv"), &delta_vrp_rows)?;
        write_vap_csv(&rir_dir.join("record-delta-vaps.csv"), &delta_vap_rows)?;
        write_timing_json(
            &rir_dir.join("timings").join("delta-produce.json"),
            "delta",
            delta_time,
            delta_duration,
        )?;
        // Backfill the delta fields into the bundle metadata and report.
        metadata.delta_ccr_sha256 = Some(sha256_hex(&delta_ccr_bytes));
        metadata.delta_vrp_count = Some(delta_vrp_rows.len());
        metadata.delta_vap_count = Some(delta_vap_rows.len());
        metadata.has_aspa = metadata.has_aspa || !delta_vap_rows.is_empty();
        metadata.has_router_key = metadata.has_router_key || delta_verify.router_key_count > 0;
        verification["delta"] = serde_json::json!({
            "validationTime": delta_time
                .format(&Rfc3339)
                .map_err(|e| format!("format delta validation time failed: {e}"))?,
            "ccr": {
                "path": "delta.ccr",
                "sha256": metadata.delta_ccr_sha256.clone().expect("delta sha must exist"),
                "stateHashesOk": delta_verify.state_hashes_ok,
                "manifestInstances": delta_verify.manifest_instances,
                "roaVrpCount": delta_verify.roa_vrp_count,
                "aspaPayloadSets": delta_verify.aspa_payload_sets,
                "routerKeyCount": delta_verify.router_key_count,
            },
            "compareViews": {
                "vrpsSelfMatch": true,
                "vapsSelfMatch": true,
                "deltaVrpCount": metadata.delta_vrp_count,
                "deltaVapCount": metadata.delta_vap_count,
            }
        });
        // Scratch DB cleanup is best-effort; failure must not fail the run.
        let _ = fs::remove_dir_all(&delta_db_dir);
    }
    // --- Phase 5: final bundle artifacts ---
    write_json(&rir_dir.join("bundle.json"), &metadata)?;
    write_json(&rir_dir.join("verification.json"), &verification)?;
    write_top_readme(&run_root.join("README.md"), rir)?;
    write_rir_readme(&rir_dir.join("README.md"), rir, &metadata.base_validation_time)?;
    let bundle_manifest = BundleManifest {
        schema_version: "20260330-v1".to_string(),
        bundle_producer: "ours".to_string(),
        recorded_at_rfc3339_utc: time::OffsetDateTime::now_utc()
            .format(&Rfc3339)
            .map_err(|e| format!("format recorded_at failed: {e}"))?,
        rirs: vec![rir_normalized.clone()],
        per_rir_bundles: vec![BundleManifestEntry {
            rir: rir_normalized.clone(),
            relative_path: rir_normalized,
            base_validation_time: metadata.base_validation_time.clone(),
            delta_validation_time: metadata.delta_validation_time.clone(),
            has_aspa: metadata.has_aspa,
        }],
    };
    write_json(&run_root.join("bundle-manifest.json"), &bundle_manifest)?;
    // Best-effort cleanup of the base scratch DB as well.
    let _ = fs::remove_dir_all(&db_dir);
    Ok(run_root.clone())
}
/// CLI entry point: parse argv, record the bundle, print the output root.
fn main() -> Result<(), String> {
    let argv: Vec<String> = std::env::args().collect();
    let parsed = parse_args(&argv)?;
    let root = run(parsed)?;
    println!("{}", root.display());
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    // parse_args with every required flag supplied should succeed and echo values back.
    #[test]
    fn parse_args_requires_required_flags() {
        let argv = vec![
            "replay_bundle_record".to_string(),
            "--rir".to_string(),
            "apnic".to_string(),
            "--out-dir".to_string(),
            "out".to_string(),
            "--tal-path".to_string(),
            "tal".to_string(),
            "--ta-path".to_string(),
            "ta".to_string(),
            "--payload-replay-archive".to_string(),
            "archive".to_string(),
            "--payload-replay-locks".to_string(),
            "locks.json".to_string(),
        ];
        let args = parse_args(&argv).expect("parse");
        assert_eq!(args.rir.as_deref(), Some("apnic"));
        assert_eq!(args.out_dir.as_deref(), Some(Path::new("out")));
    }
    // Missing required flags must produce an error naming the first missing flag.
    #[test]
    fn parse_args_rejects_missing_requireds() {
        let argv = vec!["replay_bundle_record".to_string()];
        let err = parse_args(&argv).unwrap_err();
        assert!(err.contains("--rir is required"), "{err}");
    }
    // load_validation_time reads the top-level "validationTime" key and preserves the offset.
    #[test]
    fn load_validation_time_reads_top_level_validation_time() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("locks.json");
        std::fs::write(
            &path,
            r#"{"validationTime":"2026-03-16T11:49:15+08:00"}"#,
        )
        .expect("write locks");
        let got = load_validation_time(&path).expect("load validation time");
        assert_eq!(
            got.format(&Rfc3339).expect("format"),
            "2026-03-16T11:49:15+08:00"
        );
    }
    // copy_dir_all must recurse into subdirectories and preserve file contents.
    #[test]
    fn copy_dir_all_copies_nested_tree() {
        let dir = tempdir().expect("tempdir");
        let src = dir.path().join("src");
        let dst = dir.path().join("dst");
        std::fs::create_dir_all(src.join("sub")).expect("mkdir");
        std::fs::write(src.join("a.txt"), b"a").expect("write a");
        std::fs::write(src.join("sub").join("b.txt"), b"b").expect("write b");
        copy_dir_all(&src, &dst).expect("copy dir");
        assert_eq!(std::fs::read(dst.join("a.txt")).expect("read a"), b"a");
        assert_eq!(
            std::fs::read(dst.join("sub").join("b.txt")).expect("read b"),
            b"b"
        );
    }
    // End-to-end smoke test over real replay inputs: base + delta, checking the
    // full bundle layout, bundle.json fields, and the baseLocksSha256 rewrite.
    // NOTE(review): the fixture paths below are machine-specific absolute paths
    // (/home/yuyr/...), so this test only passes on the original author's
    // machine — consider repo-relative fixtures or an env-var override.
    #[test]
    fn run_base_bundle_record_smoke_root_only_apnic() {
        let dir = tempdir().expect("tempdir");
        let out_dir = dir.path().join("bundle");
        let out = run(Args {
            rir: Some("apnic".to_string()),
            out_dir: Some(out_dir.clone()),
            tal_path: Some(PathBuf::from("tests/fixtures/tal/apnic-rfc7730-https.tal")),
            ta_path: Some(PathBuf::from("tests/fixtures/ta/apnic-ta.cer")),
            payload_replay_archive: Some(PathBuf::from(
                "/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3/apnic/base-payload-archive",
            )),
            payload_replay_locks: Some(PathBuf::from(
                "/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3/apnic/base-locks.json",
            )),
            payload_delta_archive: Some(PathBuf::from(
                "/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3/apnic/payload-delta-archive",
            )),
            payload_delta_locks: Some(PathBuf::from(
                "/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3/apnic/locks-delta.json",
            )),
            validation_time: None,
            max_depth: Some(0),
            max_instances: Some(1),
            trust_anchor: Some("apnic".to_string()),
        })
        .expect("run bundle record");
        assert_eq!(out, out_dir);
        assert!(out_dir.join("bundle-manifest.json").is_file());
        assert!(out_dir.join("README.md").is_file());
        assert!(out_dir.join("apnic").join("bundle.json").is_file());
        assert!(out_dir.join("apnic").join("tal.tal").is_file());
        assert!(out_dir.join("apnic").join("ta.cer").is_file());
        assert!(out_dir.join("apnic").join("base-payload-archive").is_dir());
        assert!(out_dir.join("apnic").join("base-locks.json").is_file());
        assert!(out_dir.join("apnic").join("base.ccr").is_file());
        assert!(out_dir.join("apnic").join("base-vrps.csv").is_file());
        assert!(out_dir.join("apnic").join("base-vaps.csv").is_file());
        assert!(out_dir.join("apnic").join("delta.ccr").is_file());
        assert!(out_dir.join("apnic").join("record-delta.csv").is_file());
        assert!(out_dir.join("apnic").join("record-delta-vaps.csv").is_file());
        assert!(out_dir.join("apnic").join("verification.json").is_file());
        let bundle_json: serde_json::Value = serde_json::from_slice(
            &std::fs::read(out_dir.join("apnic").join("bundle.json")).expect("read bundle.json"),
        )
        .expect("parse bundle.json");
        assert_eq!(bundle_json["bundleProducer"], "ours");
        assert_eq!(bundle_json["rir"], "apnic");
        assert!(bundle_json.get("baseVrpCount").is_some());
        assert!(bundle_json.get("baseCcrSha256").is_some());
        assert!(bundle_json.get("deltaVrpCount").is_some());
        assert!(bundle_json.get("deltaCcrSha256").is_some());
        // The delta locks' baseLocksSha256 must point at the *emitted* (rewritten)
        // base-locks.json, not at the original input locks.
        let base_locks_bytes =
            std::fs::read(out_dir.join("apnic").join("base-locks.json")).expect("read emitted base locks");
        let expected_base_locks_sha = sha256_hex(&base_locks_bytes);
        let delta_locks_json: serde_json::Value = serde_json::from_slice(
            &std::fs::read(out_dir.join("apnic").join("locks-delta.json")).expect("read delta locks"),
        )
        .expect("parse delta locks");
        assert_eq!(delta_locks_json["baseLocksSha256"], expected_base_locks_sha);
    }
}

244
src/bundle/compare_view.rs Normal file
View File

@ -0,0 +1,244 @@
use std::collections::BTreeSet;
use std::io::Write;
use std::path::Path;
use crate::ccr::{CcrContentInfo, extract_vrp_rows};
use crate::validation::objects::{AspaAttestation, Vrp};
/// One normalized VRP row of a compare-view CSV.
/// Field order matters: the derived `PartialOrd`/`Ord` compare fields in
/// declaration order, which fixes the row ordering inside a `BTreeSet`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct VrpCompareRow {
    // Origin ASN rendered as "AS<number>".
    pub asn: String,
    // Canonicalized prefix text, e.g. "2001:db8::/32".
    pub ip_prefix: String,
    // Max prefix length rendered as decimal text.
    pub max_length: String,
    // Lowercased trust-anchor label, e.g. "apnic".
    pub trust_anchor: String,
}
/// One normalized VAP (ASPA) row of a compare-view CSV.
/// Field order matters for the derived `PartialOrd`/`Ord` (declaration order).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct VapCompareRow {
    // Customer ASN rendered as "AS<number>".
    pub customer_asn: String,
    // Sorted, de-duplicated provider ASNs joined with ';', e.g. "AS64497;AS64498".
    pub providers: String,
    // Lowercased trust-anchor label.
    pub trust_anchor: String,
}
/// Render an AS number in the conventional "AS<number>" text form.
fn normalize_asn(asn: u32) -> String {
    let mut rendered = String::from("AS");
    rendered.push_str(&asn.to_string());
    rendered
}
/// Canonical "<addr>/<len>" text for a prefix: every bit past `prefix_len`
/// is zeroed so equivalent prefixes always render identically.
fn canonical_prefix(prefix: &crate::data_model::roa::IpPrefix) -> String {
    use crate::data_model::roa::RoaAfi;
    let width = match prefix.afi {
        RoaAfi::Ipv4 => 32usize,
        RoaAfi::Ipv6 => 128usize,
    };
    let mut octets = prefix.addr_bytes().to_vec();
    // Clear host bits one at a time (MSB-first within each octet).
    for bit_index in usize::from(prefix.prefix_len)..width {
        octets[bit_index / 8] &= !(1u8 << (7 - bit_index % 8));
    }
    if let RoaAfi::Ipv4 = prefix.afi {
        let v4 = std::net::Ipv4Addr::new(octets[0], octets[1], octets[2], octets[3]);
        format!("{v4}/{}", prefix.prefix_len)
    } else {
        let mut raw = [0u8; 16];
        raw.copy_from_slice(&octets[..16]);
        let v6 = std::net::Ipv6Addr::from(raw);
        format!("{v6}/{}", prefix.prefix_len)
    }
}
/// Build the sorted, de-duplicated VRP compare view for one trust anchor.
/// The trust-anchor label is lowercased on every row.
pub fn build_vrp_compare_rows(vrps: &[Vrp], trust_anchor: &str) -> BTreeSet<VrpCompareRow> {
    let anchor = trust_anchor.to_ascii_lowercase();
    let mut rows = BTreeSet::new();
    for vrp in vrps {
        rows.insert(VrpCompareRow {
            asn: normalize_asn(vrp.asn),
            ip_prefix: canonical_prefix(&vrp.prefix),
            max_length: vrp.max_length.to_string(),
            trust_anchor: anchor.clone(),
        });
    }
    rows
}
/// Build the sorted, de-duplicated VAP (ASPA) compare view for one trust anchor.
/// Provider ASNs are sorted, de-duplicated, and joined with ';'.
pub fn build_vap_compare_rows(
    aspas: &[AspaAttestation],
    trust_anchor: &str,
) -> BTreeSet<VapCompareRow> {
    let anchor = trust_anchor.to_ascii_lowercase();
    let mut rows = BTreeSet::new();
    for aspa in aspas {
        // A BTreeSet sorts and de-duplicates the provider ASNs in one step.
        let providers: BTreeSet<u32> = aspa.provider_as_ids.iter().copied().collect();
        let provider_text = providers
            .into_iter()
            .map(normalize_asn)
            .collect::<Vec<_>>()
            .join(";");
        rows.insert(VapCompareRow {
            customer_asn: normalize_asn(aspa.customer_as_id),
            providers: provider_text,
            trust_anchor: anchor.clone(),
        });
    }
    rows
}
/// Decode the VRP and VAP compare views back out of a CCR content-info.
/// Returns empty VAP rows when the CCR carries no VAP state.
///
/// # Errors
/// Returns a message when the VRP rows cannot be extracted from the CCR.
pub fn decode_ccr_compare_views(
    content_info: &CcrContentInfo,
    trust_anchor: &str,
) -> Result<(BTreeSet<VrpCompareRow>, BTreeSet<VapCompareRow>), String> {
    let anchor = trust_anchor.to_ascii_lowercase();
    let mut vrps = BTreeSet::new();
    let extracted =
        extract_vrp_rows(content_info).map_err(|e| format!("extract vrp rows from ccr failed: {e}"))?;
    for (asn, prefix, max_length) in extracted {
        vrps.insert(VrpCompareRow {
            asn: normalize_asn(asn),
            ip_prefix: prefix,
            max_length: max_length.to_string(),
            trust_anchor: anchor.clone(),
        });
    }
    let mut vaps = BTreeSet::new();
    if let Some(state) = content_info.content.vaps.as_ref() {
        for vap in &state.aps {
            let provider_text = vap
                .providers
                .iter()
                .copied()
                .map(normalize_asn)
                .collect::<Vec<_>>()
                .join(";");
            vaps.insert(VapCompareRow {
                customer_asn: normalize_asn(vap.customer_as_id),
                providers: provider_text,
                trust_anchor: anchor.clone(),
            });
        }
    }
    Ok((vrps, vaps))
}
/// Write the VRP compare view to `path` as a routinator-style CSV with header
/// "ASN,IP Prefix,Max Length,Trust Anchor", creating parent directories first.
///
/// # Errors
/// Returns a human-readable message on any filesystem or write failure,
/// including a failed final flush.
pub fn write_vrp_csv(path: &Path, rows: &BTreeSet<VrpCompareRow>) -> Result<(), String> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|e| format!("create parent dirs failed: {}: {e}", parent.display()))?;
    }
    let mut file = std::io::BufWriter::new(
        std::fs::File::create(path)
            .map_err(|e| format!("create file failed: {}: {e}", path.display()))?,
    );
    writeln!(file, "ASN,IP Prefix,Max Length,Trust Anchor").map_err(|e| e.to_string())?;
    for row in rows {
        writeln!(
            file,
            "{},{},{},{}",
            row.asn, row.ip_prefix, row.max_length, row.trust_anchor
        )
        .map_err(|e| e.to_string())?;
    }
    // BufWriter's Drop flushes but silently swallows errors; flush explicitly so
    // a failed final write surfaces as Err instead of Ok(()) over a truncated file.
    file.flush()
        .map_err(|e| format!("flush csv failed: {}: {e}", path.display()))?;
    Ok(())
}
/// Write the VAP (ASPA) compare view to `path` as a CSV with header
/// "Customer ASN,Providers,Trust Anchor", creating parent directories first.
///
/// # Errors
/// Returns a human-readable message on any filesystem or write failure,
/// including a failed final flush.
pub fn write_vap_csv(path: &Path, rows: &BTreeSet<VapCompareRow>) -> Result<(), String> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|e| format!("create parent dirs failed: {}: {e}", parent.display()))?;
    }
    let mut file = std::io::BufWriter::new(
        std::fs::File::create(path)
            .map_err(|e| format!("create file failed: {}: {e}", path.display()))?,
    );
    writeln!(file, "Customer ASN,Providers,Trust Anchor").map_err(|e| e.to_string())?;
    for row in rows {
        writeln!(
            file,
            "{},{},{}",
            row.customer_asn, row.providers, row.trust_anchor
        )
        .map_err(|e| e.to_string())?;
    }
    // Surface buffered-write errors instead of relying on BufWriter's silent
    // flush-on-drop (see write_vrp_csv-style flush rationale).
    file.flush()
        .map_err(|e| format!("flush csv failed: {}: {e}", path.display()))?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ccr::{CcrContentInfo, CcrDigestAlgorithm, RpkiCanonicalCacheRepresentation, build_aspa_payload_state, build_roa_payload_state};
    use crate::data_model::roa::{IpPrefix, RoaAfi};
    // Providers must come out sorted, de-duplicated, ';'-joined; anchor lowercased.
    #[test]
    fn build_vap_compare_rows_sorts_and_dedups_providers() {
        let rows = build_vap_compare_rows(
            &[AspaAttestation {
                customer_as_id: 64496,
                provider_as_ids: vec![64498, 64497, 64498],
            }],
            "APNIC",
        );
        let row = rows.iter().next().expect("one row");
        assert_eq!(row.customer_asn, "AS64496");
        assert_eq!(row.providers, "AS64497;AS64498");
        assert_eq!(row.trust_anchor, "apnic");
    }
    // Round-trip: a CCR built with one VRP and one VAP decodes to one row each.
    #[test]
    fn decode_ccr_compare_views_extracts_vrps_and_vaps() {
        let vrps = build_roa_payload_state(&[Vrp {
            asn: 64496,
            prefix: IpPrefix {
                afi: RoaAfi::Ipv4,
                prefix_len: 24,
                addr: [192, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            },
            max_length: 24,
        }])
        .expect("build vrps");
        let vaps = build_aspa_payload_state(&[AspaAttestation {
            customer_as_id: 64496,
            provider_as_ids: vec![64497],
        }])
        .expect("build vaps");
        let content = CcrContentInfo::new(RpkiCanonicalCacheRepresentation {
            version: 0,
            hash_alg: CcrDigestAlgorithm::Sha256,
            produced_at: time::OffsetDateTime::now_utc(),
            mfts: None,
            vrps: Some(vrps),
            vaps: Some(vaps),
            tas: None,
            rks: None,
        });
        let (vrp_rows, vap_rows) = decode_ccr_compare_views(&content, "apnic").expect("decode compare views");
        assert_eq!(vrp_rows.len(), 1);
        assert_eq!(vap_rows.len(), 1);
        assert_eq!(vap_rows.iter().next().unwrap().providers, "AS64497");
    }
    // Host bits beyond /32 are set in the fixture address; the canonical text
    // must zero them, yielding the compressed "2001:db8::/32" form.
    #[test]
    fn build_vrp_compare_rows_canonicalizes_ipv6_prefix_text() {
        let rows = build_vrp_compare_rows(
            &[Vrp {
                asn: 64496,
                prefix: IpPrefix {
                    afi: RoaAfi::Ipv6,
                    prefix_len: 32,
                    addr: [0x20, 0x01, 0x0d, 0xb8, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
                },
                max_length: 48,
            }],
            "APNIC",
        );
        let row = rows.iter().next().expect("row");
        assert_eq!(row.ip_prefix, "2001:db8::/32");
    }
}

1136
src/bundle/live_capture.rs Normal file

File diff suppressed because it is too large Load Diff

14
src/bundle/mod.rs Normal file
View File

@ -0,0 +1,14 @@
//! Bundle recording support: compare-view CSV helpers (`compare_view`),
//! live base/delta capture recorders (`live_capture`), and the on-disk
//! bundle metadata schema (`spec`).
pub mod compare_view;
pub mod live_capture;
pub mod spec;
// Re-export the public surface so callers can use `crate::bundle::*` directly.
pub use compare_view::{
    VapCompareRow, VrpCompareRow, build_vap_compare_rows, build_vrp_compare_rows,
    decode_ccr_compare_views, write_vap_csv, write_vrp_csv,
};
pub use live_capture::{
    LiveBaseCaptureSummary, LiveDeltaCaptureSummary, RecordedHttpResponse, RecordedRsyncFetch,
    RecordingHttpFetcher, RecordingRsyncFetcher, write_live_base_replay_bundle_inputs,
    write_live_delta_replay_bundle_inputs,
};
pub use spec::{BundleManifest, BundleManifestEntry, RirBundleMetadata};

59
src/bundle/spec.rs Normal file
View File

@ -0,0 +1,59 @@
use serde::Serialize;
/// Top-level `bundle-manifest.json` schema: one manifest per recording run,
/// listing every per-RIR bundle the run produced. Serialized keys are camelCase.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct BundleManifest {
    #[serde(rename = "schemaVersion")]
    pub schema_version: String,
    // Which tool produced the bundle (e.g. "ours").
    #[serde(rename = "bundleProducer")]
    pub bundle_producer: String,
    // RFC 3339 UTC timestamp of when the recording was made.
    #[serde(rename = "recordedAt")]
    pub recorded_at_rfc3339_utc: String,
    pub rirs: Vec<String>,
    #[serde(rename = "perRirBundles")]
    pub per_rir_bundles: Vec<BundleManifestEntry>,
}
/// One per-RIR entry inside `bundle-manifest.json`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct BundleManifestEntry {
    pub rir: String,
    // Bundle directory path relative to the manifest's location.
    // NOTE(review): this field serializes as "relative_path" (snake_case) while
    // every sibling field is renamed to camelCase — confirm whether consumers
    // expect "relativePath" before adding a rename (changing it now would break
    // existing manifest readers).
    pub relative_path: String,
    #[serde(rename = "baseValidationTime")]
    pub base_validation_time: String,
    // Only present when a delta phase was recorded.
    #[serde(rename = "deltaValidationTime", skip_serializing_if = "Option::is_none")]
    pub delta_validation_time: Option<String>,
    #[serde(rename = "hasAspa")]
    pub has_aspa: bool,
}
/// Per-RIR `bundle.json` schema: identity, validation times, artifact hashes,
/// and payload counts for the base run plus (optionally) the delta run.
/// Delta fields are omitted from the JSON when no delta phase was recorded.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct RirBundleMetadata {
    #[serde(rename = "schemaVersion")]
    pub schema_version: String,
    #[serde(rename = "bundleProducer")]
    pub bundle_producer: String,
    pub rir: String,
    #[serde(rename = "baseValidationTime")]
    pub base_validation_time: String,
    #[serde(rename = "deltaValidationTime", skip_serializing_if = "Option::is_none")]
    pub delta_validation_time: Option<String>,
    // SHA-256 (hex) of the TAL file copied into the bundle.
    #[serde(rename = "talSha256")]
    pub tal_sha256: String,
    // SHA-256 (hex) of the trust-anchor certificate.
    #[serde(rename = "taCertSha256")]
    pub ta_cert_sha256: String,
    // SHA-256 (hex) of the emitted base.ccr bytes.
    #[serde(rename = "baseCcrSha256")]
    pub base_ccr_sha256: String,
    #[serde(rename = "deltaCcrSha256", skip_serializing_if = "Option::is_none")]
    pub delta_ccr_sha256: Option<String>,
    // True when either the base or delta run yielded any ASPA payloads.
    #[serde(rename = "hasAspa")]
    pub has_aspa: bool,
    // True when either run yielded any BGPsec router keys.
    #[serde(rename = "hasRouterKey")]
    pub has_router_key: bool,
    #[serde(rename = "baseVrpCount")]
    pub base_vrp_count: usize,
    #[serde(rename = "baseVapCount")]
    pub base_vap_count: usize,
    #[serde(rename = "deltaVrpCount", skip_serializing_if = "Option::is_none")]
    pub delta_vrp_count: Option<usize>,
    #[serde(rename = "deltaVapCount", skip_serializing_if = "Option::is_none")]
    pub delta_vap_count: Option<usize>,
}

View File

@ -4,6 +4,8 @@ pub mod data_model;
#[cfg(feature = "full")] #[cfg(feature = "full")]
pub mod analysis; pub mod analysis;
#[cfg(feature = "full")] #[cfg(feature = "full")]
pub mod bundle;
#[cfg(feature = "full")]
pub mod audit; pub mod audit;
#[cfg(feature = "full")] #[cfg(feature = "full")]
pub mod audit_downloads; pub mod audit_downloads;

View File

@ -211,6 +211,7 @@ pub struct ReplayDeltaRsyncModule {
pub bucket_hash: String, pub bucket_hash: String,
pub bucket_dir: PathBuf, pub bucket_dir: PathBuf,
pub meta: ReplayRsyncModuleMeta, pub meta: ReplayRsyncModuleMeta,
pub overlay_only: bool,
pub files: ReplayDeltaRsyncFiles, pub files: ReplayDeltaRsyncFiles,
pub tree_dir: PathBuf, pub tree_dir: PathBuf,
pub overlay_files: Vec<(String, PathBuf)>, pub overlay_files: Vec<(String, PathBuf)>,
@ -570,6 +571,7 @@ fn load_delta_rsync_module(
bucket_hash, bucket_hash,
bucket_dir, bucket_dir,
meta, meta,
overlay_only: entry.overlay_only,
files, files,
tree_dir, tree_dir,
overlay_files, overlay_files,

View File

@ -45,25 +45,32 @@ impl RsyncFetcher for PayloadDeltaReplayRsyncFetcher {
let mut merged: BTreeMap<String, Vec<u8>> = BTreeMap::new(); let mut merged: BTreeMap<String, Vec<u8>> = BTreeMap::new();
let mut saw_base = false; let mut saw_base = false;
if let Ok(base_module) = self let overlay_only = self
.base_index .delta_index
.resolve_rsync_module_for_base_uri(rsync_base_uri) .rsync_module(&module_uri)
{ .map(|module| module.overlay_only)
let base_tree_root = module_tree_root(&module_uri, &base_module.tree_dir) .unwrap_or(false);
.map_err(RsyncFetchError::Fetch)?; if !overlay_only {
if base_tree_root.is_dir() { if let Ok(base_module) = self
let mut base_objects = Vec::new(); .base_index
walk_dir_collect( .resolve_rsync_module_for_base_uri(rsync_base_uri)
&base_tree_root, {
&base_tree_root, let base_tree_root = module_tree_root(&module_uri, &base_module.tree_dir)
&module_uri, .map_err(RsyncFetchError::Fetch)?;
&mut base_objects, if base_tree_root.is_dir() {
) let mut base_objects = Vec::new();
.map_err(RsyncFetchError::Fetch)?; walk_dir_collect(
for (uri, bytes) in base_objects { &base_tree_root,
merged.insert(uri, bytes); &base_tree_root,
&module_uri,
&mut base_objects,
)
.map_err(RsyncFetchError::Fetch)?;
for (uri, bytes) in base_objects {
merged.insert(uri, bytes);
}
saw_base = true;
} }
saw_base = true;
} }
} }
@ -190,7 +197,7 @@ mod tests {
std::fs::write(delta_tree.join("sub").join("b.cer"), b"delta-b") std::fs::write(delta_tree.join("sub").join("b.cer"), b"delta-b")
.expect("write delta overlay"); .expect("write delta overlay");
let delta_locks = temp.path().join("locks-delta.json"); let delta_locks = temp.path().join("locks-delta.json");
std::fs::write(&delta_locks, format!(r#"{{"version":1,"capture":"delta-cap","baseCapture":"base-cap","baseLocksSha256":"deadbeef","rrdp":{{}},"rsync":{{"{module_uri}":{{"file_count":1,"overlay_only":true}}}}}}"#)).expect("write delta locks"); std::fs::write(&delta_locks, format!(r#"{{"version":1,"capture":"delta-cap","baseCapture":"base-cap","baseLocksSha256":"deadbeef","rrdp":{{}},"rsync":{{"{module_uri}":{{"file_count":1,"overlay_only":false}}}}}}"#)).expect("write delta locks");
(temp, base_archive, base_locks, delta_archive, delta_locks) (temp, base_archive, base_locks, delta_archive, delta_locks)
} }

View File

@ -929,7 +929,7 @@ mod tests {
let delta_locks = temp.path().join("locks-delta.json"); let delta_locks = temp.path().join("locks-delta.json");
std::fs::write( std::fs::write(
&delta_locks, &delta_locks,
format!(r#"{{"version":1,"capture":"delta-cap","baseCapture":"base-cap","baseLocksSha256":"{base_locks_sha}","rrdp":{{"{notify_uri}":{{"kind":"delta","base":{{"transport":"rrdp","session":"{session}","serial":10}},"target":{{"transport":"rrdp","session":"{session}","serial":12}},"delta_count":2,"deltas":[11,12]}},"{fallback_notify}":{{"kind":"fallback-rsync","base":{{"transport":"rsync","session":null,"serial":null}},"target":{{"transport":"rsync","session":null,"serial":null}},"delta_count":0,"deltas":[]}}}},"rsync":{{"{module_uri}":{{"file_count":1,"overlay_only":true}}}}}}"#), format!(r#"{{"version":1,"capture":"delta-cap","baseCapture":"base-cap","baseLocksSha256":"{base_locks_sha}","rrdp":{{"{notify_uri}":{{"kind":"delta","base":{{"transport":"rrdp","session":"{session}","serial":10}},"target":{{"transport":"rrdp","session":"{session}","serial":12}},"delta_count":2,"deltas":[11,12]}},"{fallback_notify}":{{"kind":"fallback-rsync","base":{{"transport":"rsync","session":null,"serial":null}},"target":{{"transport":"rsync","session":null,"serial":null}},"delta_count":0,"deltas":[]}}}},"rsync":{{"{module_uri}":{{"file_count":1,"overlay_only":false}}}}}}"#),
) )
.expect("write delta locks"); .expect("write delta locks");

View File

@ -112,3 +112,130 @@ fn ccr_to_routinator_csv_binary_writes_vrp_csv() {
assert!(csv.contains("ASN,IP Prefix,Max Length,Trust Anchor")); assert!(csv.contains("ASN,IP Prefix,Max Length,Trust Anchor"));
assert!(csv.contains("AS64496,203.0.113.0/24,24,apnic")); assert!(csv.contains("AS64496,203.0.113.0/24,24,apnic"));
} }
// Integration test: encode a CCR with one VRP and one VAP to disk, invoke the
// ccr_to_compare_views binary on it, and check both emitted CSVs (providers
// must come out sorted/de-duplicated, trust anchor lowercased).
#[test]
fn ccr_to_compare_views_binary_writes_vrp_and_vap_csvs() {
    use rpki::ccr::{
        CcrContentInfo, CcrDigestAlgorithm, RpkiCanonicalCacheRepresentation,
        build_aspa_payload_state, build_roa_payload_state, encode::encode_content_info,
    };
    use rpki::data_model::roa::{IpPrefix, RoaAfi};
    use rpki::validation::objects::{AspaAttestation, Vrp};
    let dir = tempfile::tempdir().expect("tempdir");
    let ccr_path = dir.path().join("views.ccr");
    let vrps_path = dir.path().join("vrps.csv");
    let vaps_path = dir.path().join("vaps.csv");
    let roa_state = build_roa_payload_state(&[Vrp {
        asn: 64496,
        prefix: IpPrefix {
            afi: RoaAfi::Ipv4,
            prefix_len: 24,
            addr: [198, 51, 100, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        },
        max_length: 24,
    }])
    .expect("build roa state");
    // Providers deliberately unsorted and duplicated to exercise normalization.
    let aspa_state = build_aspa_payload_state(&[AspaAttestation {
        customer_as_id: 64496,
        provider_as_ids: vec![64498, 64497, 64498],
    }])
    .expect("build aspa state");
    let ccr = CcrContentInfo::new(RpkiCanonicalCacheRepresentation {
        version: 0,
        hash_alg: CcrDigestAlgorithm::Sha256,
        produced_at: time::OffsetDateTime::parse(
            "2026-03-30T00:00:00Z",
            &time::format_description::well_known::Rfc3339,
        )
        .expect("time"),
        mfts: None,
        vrps: Some(roa_state),
        vaps: Some(aspa_state),
        tas: None,
        rks: None,
    });
    std::fs::write(&ccr_path, encode_content_info(&ccr).expect("encode ccr")).expect("write ccr");
    // Cargo provides the path to the built binary under test.
    let bin = env!("CARGO_BIN_EXE_ccr_to_compare_views");
    let out = Command::new(bin)
        .args([
            "--ccr",
            ccr_path.to_string_lossy().as_ref(),
            "--vrps-out",
            vrps_path.to_string_lossy().as_ref(),
            "--vaps-out",
            vaps_path.to_string_lossy().as_ref(),
            "--trust-anchor",
            "apnic",
        ])
        .output()
        .expect("run ccr_to_compare_views");
    assert!(out.status.success(), "stderr={}", String::from_utf8_lossy(&out.stderr));
    let vrps_csv = std::fs::read_to_string(vrps_path).expect("read vrps csv");
    let vaps_csv = std::fs::read_to_string(vaps_path).expect("read vaps csv");
    assert!(vrps_csv.contains("ASN,IP Prefix,Max Length,Trust Anchor"));
    assert!(vrps_csv.contains("AS64496,198.51.100.0/24,24,apnic"));
    assert!(vaps_csv.contains("Customer ASN,Providers,Trust Anchor"));
    assert!(vaps_csv.contains("AS64496,AS64497;AS64498,apnic"));
}
// Integration test: when the input CCR carries no VAP state, the binary must
// still write the VAP CSV — containing exactly the header line and nothing else.
#[test]
fn ccr_to_compare_views_binary_writes_header_only_vap_csv_when_absent() {
    use rpki::ccr::{
        CcrContentInfo, CcrDigestAlgorithm, RpkiCanonicalCacheRepresentation,
        build_roa_payload_state, encode::encode_content_info,
    };
    use rpki::data_model::roa::{IpPrefix, RoaAfi};
    use rpki::validation::objects::Vrp;
    let dir = tempfile::tempdir().expect("tempdir");
    let ccr_path = dir.path().join("views-no-vaps.ccr");
    let vrps_path = dir.path().join("vrps.csv");
    let vaps_path = dir.path().join("vaps.csv");
    let roa_state = build_roa_payload_state(&[Vrp {
        asn: 64496,
        prefix: IpPrefix {
            afi: RoaAfi::Ipv4,
            prefix_len: 24,
            addr: [203, 0, 113, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        },
        max_length: 24,
    }])
    .expect("build roa state");
    // vaps: None is the case under test.
    let ccr = CcrContentInfo::new(RpkiCanonicalCacheRepresentation {
        version: 0,
        hash_alg: CcrDigestAlgorithm::Sha256,
        produced_at: time::OffsetDateTime::parse(
            "2026-03-30T00:00:00Z",
            &time::format_description::well_known::Rfc3339,
        )
        .expect("time"),
        mfts: None,
        vrps: Some(roa_state),
        vaps: None,
        tas: None,
        rks: None,
    });
    std::fs::write(&ccr_path, encode_content_info(&ccr).expect("encode ccr")).expect("write ccr");
    let bin = env!("CARGO_BIN_EXE_ccr_to_compare_views");
    let out = Command::new(bin)
        .args([
            "--ccr",
            ccr_path.to_string_lossy().as_ref(),
            "--vrps-out",
            vrps_path.to_string_lossy().as_ref(),
            "--vaps-out",
            vaps_path.to_string_lossy().as_ref(),
            "--trust-anchor",
            "apnic",
        ])
        .output()
        .expect("run ccr_to_compare_views");
    assert!(out.status.success(), "stderr={}", String::from_utf8_lossy(&out.stderr));
    // Exact-equality check: header only, with a trailing newline.
    let vaps_csv = std::fs::read_to_string(vaps_path).expect("read vaps csv");
    assert_eq!(vaps_csv, "Customer ASN,Providers,Trust Anchor\n");
}