20260506 清理废弃bundle代码并收敛ccr compare view

This commit is contained in:
yuyr 2026-05-06 16:49:23 +08:00
parent f843eedda9
commit 51663a9410
26 changed files with 16 additions and 7245 deletions

View File

@ -27,7 +27,7 @@ cleanup() {
} }
trap cleanup EXIT trap cleanup EXIT
IGNORE_REGEX='src/bin/replay_bundle_capture\.rs|src/bin/replay_bundle_capture_delta\.rs|src/bin/replay_bundle_capture_sequence\.rs|src/bin/replay_bundle_record\.rs|src/bin/replay_bundle_refresh_sequence_outputs\.rs|src/bin/measure_sequence_replay\.rs|src/bin/repository_view_stats\.rs|src/bin/trace_arin_missing_vrps\.rs|src/bin/db_stats\.rs|src/bin/rrdp_state_dump\.rs|src/bin/ccr_dump\.rs|src/bin/ccr_verify\.rs|src/bin/ccr_to_routinator_csv\.rs|src/bin/ccr_to_compare_views\.rs|src/bin/cir_materialize\.rs|src/bin/cir_extract_inputs\.rs|src/bin/cir_drop_report\.rs|src/bin/cir_ta_only_fixture\.rs|src/bundle/live_capture\.rs|src/bundle/record_io\.rs|src/bundle/compare_view\.rs|src/progress_log\.rs|src/cli\.rs|src/validation/run_tree_from_tal\.rs|src/validation/tree_parallel\.rs|src/validation/from_tal\.rs|src/sync/store_projection\.rs|src/cir/materialize\.rs' IGNORE_REGEX='src/bin/repository_view_stats\.rs|src/bin/trace_arin_missing_vrps\.rs|src/bin/db_stats\.rs|src/bin/rrdp_state_dump\.rs|src/bin/ccr_dump\.rs|src/bin/ccr_verify\.rs|src/bin/ccr_to_routinator_csv\.rs|src/bin/ccr_to_compare_views\.rs|src/bin/cir_materialize\.rs|src/bin/cir_extract_inputs\.rs|src/bin/cir_drop_report\.rs|src/bin/cir_ta_only_fixture\.rs|src/ccr/compare_view\.rs|src/progress_log\.rs|src/cli\.rs|src/validation/run_tree_from_tal\.rs|src/validation/tree_parallel\.rs|src/validation/from_tal\.rs|src/sync/store_projection\.rs|src/cir/materialize\.rs'
# Preserve colored output even though we post-process output by running under a pseudo-TTY. # Preserve colored output even though we post-process output by running under a pseudo-TTY.
# We run tests only once, then generate both CLI text + HTML reports without rerunning tests. # We run tests only once, then generate both CLI text + HTML reports without rerunning tests.

View File

@ -1,129 +0,0 @@
# Live Bundle Record
`run_live_bundle_record.sh` 是当前 `ours` 的单命令 live bundle 录制入口。
它做三件事:
1. 联网执行 **live base recorder**
2. 基于刚录制的 base bundle 执行 **live delta recorder**
3. 产出一个统一的最终目录,包含:
- `base-payload-archive/`
- `payload-delta-archive/`
- `base-locks.json`
- `locks-delta.json`
- `tal.tal`
- `ta.cer`
- `base.ccr`
- `delta.ccr`
- `base-vrps.csv`
- `base-vaps.csv`
- `record-delta.csv`
- `record-delta-vaps.csv`
- `bundle.json`
- `verification.json`
- `timings/`
## 用法
```bash
cd rpki
./scripts/replay_bundle/run_live_bundle_record.sh \
--rir apnic \
--tal-path tests/fixtures/tal/apnic-rfc7730-https.tal \
--ta-path tests/fixtures/ta/apnic-ta.cer
```
默认输出目录:
```text
target/replay/<rir>_live_bundle_<timestamp>
```
如果要一次录制多个 RIR,使用:
```bash
cd rpki
./scripts/replay_bundle/run_live_bundle_record_multi_rir.sh \
--rir afrinic,apnic,arin,lacnic,ripe
```
默认输出目录:
```text
target/replay/live_bundle_matrix_<timestamp>
```
每个 RIR 会落到:
```text
target/replay/live_bundle_matrix_<timestamp>/<rir>_live_bundle_<timestamp>
```
如果要录制单个 RIR 的 `1 base + N delta` 序列,使用:
```bash
cd rpki
./scripts/replay_bundle/run_live_bundle_record_sequence.sh \
--rir apnic \
--tal-path tests/fixtures/tal/apnic-rfc7730-https.tal \
--ta-path tests/fixtures/ta/apnic-ta.cer \
--delta-count 2 \
--delta-interval-secs 0
```
默认输出目录:
```text
target/replay/<rir>_live_bundle_sequence_<timestamp>
```
如果要一次录制多个 RIR 的 `1 base + N delta` 序列,使用:
```bash
cd rpki
./scripts/replay_bundle/run_live_bundle_record_multi_rir_sequence.sh \
--rir afrinic,apnic,arin,lacnic,ripe
```
默认输出目录:
```text
target/replay/live_bundle_sequence_matrix_<timestamp>
```
## 可选参数
- `--out-dir <path>`
- `--base-validation-time <rfc3339>`
- `--delta-validation-time <rfc3339>`
- `--http-timeout-secs <n>`
- `--rsync-timeout-secs <n>`
- `--rsync-mirror-root <path>`
- `--max-depth <n>`
- `--max-instances <n>`
- `--trust-anchor <name>`
- `--bin-dir <path>`
- `--no-build`
- `--delta-count <n>`(sequence 入口)
- `--delta-interval-secs <n>`(sequence 入口)
- `--keep-db`(sequence 入口)
`run_live_bundle_record_multi_rir.sh` 会自动按 RIR 选择当前仓库内置的:
- `tests/fixtures/tal/*.tal`
- `tests/fixtures/ta/*.cer`
并将 `--trust-anchor` 设置为对应 RIR 名称。
## 说明
- 该脚本会先构建:
- `replay_bundle_capture`
- `replay_bundle_capture_delta`
- 如果提供 `--no-build`,则直接复用:
- `--bin-dir <path>` 下的现有二进制
- 中间 staging 目录:
- `<out>.stage-base`
- `<out>.stage-delta`
在成功完成后会清理,只保留最终输出目录。
- 最终输出目录是 **delta 阶段产物**,其中已经包含 base 阶段结果。

View File

@ -1,135 +0,0 @@
#!/usr/bin/env bash
# Single-command live bundle recorder for one RIR:
#   [1/3] build (or reuse) the base + delta capture binaries
#   [2/3] record a live base bundle into a staging directory
#   [3/3] record a live delta bundle on top of it, then promote the delta
#         stage (which already contains the base artifacts) as the final dir.
set -euo pipefail
# Always operate from the repository root (two levels above this script).
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
# CLI option state; an empty string means "use the recorder binary's default".
RIR=""
OUT_DIR=""
TAL_PATH=""
TA_PATH=""
BASE_VALIDATION_TIME=""
DELTA_VALIDATION_TIME=""
HTTP_TIMEOUT_SECS=""
RSYNC_TIMEOUT_SECS=""
RSYNC_MIRROR_ROOT=""
MAX_DEPTH=""
MAX_INSTANCES=""
TRUST_ANCHOR=""
NO_BUILD=0
BIN_DIR="target/release"
# Print CLI usage.
usage() {
cat <<'EOF'
Usage:
./scripts/replay_bundle/run_live_bundle_record.sh \
--rir <name> \
--tal-path <path> \
--ta-path <path> \
[--out-dir <path>] \
[--base-validation-time <rfc3339>] \
[--delta-validation-time <rfc3339>] \
[--http-timeout-secs <n>] \
[--rsync-timeout-secs <n>] \
[--rsync-mirror-root <path>] \
[--max-depth <n>] \
[--max-instances <n>] \
[--trust-anchor <name>] \
[--bin-dir <path>] \
[--no-build]
EOF
}
# Parse flags; ${2:?} aborts the script when a flag's value is missing.
while [[ $# -gt 0 ]]; do
case "$1" in
  --rir) RIR="${2:?}"; shift 2 ;;
  --out-dir) OUT_DIR="${2:?}"; shift 2 ;;
  --tal-path) TAL_PATH="${2:?}"; shift 2 ;;
  --ta-path) TA_PATH="${2:?}"; shift 2 ;;
  --base-validation-time) BASE_VALIDATION_TIME="${2:?}"; shift 2 ;;
  --delta-validation-time) DELTA_VALIDATION_TIME="${2:?}"; shift 2 ;;
  --http-timeout-secs) HTTP_TIMEOUT_SECS="${2:?}"; shift 2 ;;
  --rsync-timeout-secs) RSYNC_TIMEOUT_SECS="${2:?}"; shift 2 ;;
  --rsync-mirror-root) RSYNC_MIRROR_ROOT="${2:?}"; shift 2 ;;
  --max-depth) MAX_DEPTH="${2:?}"; shift 2 ;;
  --max-instances) MAX_INSTANCES="${2:?}"; shift 2 ;;
  --trust-anchor) TRUST_ANCHOR="${2:?}"; shift 2 ;;
  --bin-dir) BIN_DIR="${2:?}"; shift 2 ;;
  --no-build) NO_BUILD=1; shift ;;
  --help|-h) usage; exit 0 ;;
  *) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;;
esac
done
# --rir, --tal-path and --ta-path are mandatory.
if [[ -z "$RIR" || -z "$TAL_PATH" || -z "$TA_PATH" ]]; then
usage >&2
exit 2
fi
# Default output directory is timestamped under target/replay/.
TS="$(date -u +%Y%m%dT%H%M%SZ)"
if [[ -z "$OUT_DIR" ]]; then
OUT_DIR="target/replay/${RIR}_live_bundle_${TS}"
fi
# Staging directories; only the delta stage survives (renamed to OUT_DIR).
STAGE_BASE="${OUT_DIR}.stage-base"
STAGE_DELTA="${OUT_DIR}.stage-delta"
rm -rf "$OUT_DIR" "$STAGE_BASE" "$STAGE_DELTA"
mkdir -p "$(dirname "$OUT_DIR")"
CAPTURE_BIN="$BIN_DIR/replay_bundle_capture"
DELTA_CAPTURE_BIN="$BIN_DIR/replay_bundle_capture_delta"
if [[ "$NO_BUILD" -eq 0 ]]; then
echo "[1/3] build release binaries"
cargo build --release --bin replay_bundle_capture --bin replay_bundle_capture_delta
else
echo "[1/3] reuse existing binaries from $BIN_DIR"
fi
# Fail fast if either recorder binary is missing or not executable.
if [[ ! -x "$CAPTURE_BIN" ]]; then
echo "missing executable: $CAPTURE_BIN" >&2
exit 1
fi
if [[ ! -x "$DELTA_CAPTURE_BIN" ]]; then
echo "missing executable: $DELTA_CAPTURE_BIN" >&2
exit 1
fi
echo "[2/3] record live base bundle into $STAGE_BASE"
BASE_CMD=(
"$CAPTURE_BIN"
--rir "$RIR"
--out-dir "$STAGE_BASE"
--tal-path "$TAL_PATH"
--ta-path "$TA_PATH"
)
# Forward only the optional flags that were actually provided.
[[ -n "$BASE_VALIDATION_TIME" ]] && BASE_CMD+=(--validation-time "$BASE_VALIDATION_TIME")
[[ -n "$HTTP_TIMEOUT_SECS" ]] && BASE_CMD+=(--http-timeout-secs "$HTTP_TIMEOUT_SECS")
[[ -n "$RSYNC_TIMEOUT_SECS" ]] && BASE_CMD+=(--rsync-timeout-secs "$RSYNC_TIMEOUT_SECS")
[[ -n "$RSYNC_MIRROR_ROOT" ]] && BASE_CMD+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT")
[[ -n "$MAX_DEPTH" ]] && BASE_CMD+=(--max-depth "$MAX_DEPTH")
[[ -n "$MAX_INSTANCES" ]] && BASE_CMD+=(--max-instances "$MAX_INSTANCES")
[[ -n "$TRUST_ANCHOR" ]] && BASE_CMD+=(--trust-anchor "$TRUST_ANCHOR")
"${BASE_CMD[@]}"
echo "[3/3] record live delta bundle into $STAGE_DELTA"
DELTA_CMD=(
"$DELTA_CAPTURE_BIN"
--rir "$RIR"
--base-bundle-dir "$STAGE_BASE"
--out-dir "$STAGE_DELTA"
)
[[ -n "$DELTA_VALIDATION_TIME" ]] && DELTA_CMD+=(--validation-time "$DELTA_VALIDATION_TIME")
[[ -n "$HTTP_TIMEOUT_SECS" ]] && DELTA_CMD+=(--http-timeout-secs "$HTTP_TIMEOUT_SECS")
[[ -n "$RSYNC_TIMEOUT_SECS" ]] && DELTA_CMD+=(--rsync-timeout-secs "$RSYNC_TIMEOUT_SECS")
[[ -n "$RSYNC_MIRROR_ROOT" ]] && DELTA_CMD+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT")
[[ -n "$MAX_DEPTH" ]] && DELTA_CMD+=(--max-depth "$MAX_DEPTH")
[[ -n "$MAX_INSTANCES" ]] && DELTA_CMD+=(--max-instances "$MAX_INSTANCES")
[[ -n "$TRUST_ANCHOR" ]] && DELTA_CMD+=(--trust-anchor "$TRUST_ANCHOR")
"${DELTA_CMD[@]}"
# The delta stage contains the merged final bundle; promote it and clean up.
mv "$STAGE_DELTA" "$OUT_DIR"
rm -rf "$STAGE_BASE"
echo "$OUT_DIR"

View File

@ -1,166 +0,0 @@
#!/usr/bin/env bash
# Record live bundles for several RIRs in one run by invoking
# run_live_bundle_record.sh once per RIR, then aggregate per-RIR
# bundle.json / verification.json into summary.json and summary.md.
set -euo pipefail
# Always operate from the repository root (two levels above this script).
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
# CLI option state; an empty string means "use the downstream default".
RIRS=""
OUT_ROOT=""
BASE_VALIDATION_TIME=""
DELTA_VALIDATION_TIME=""
HTTP_TIMEOUT_SECS=""
RSYNC_TIMEOUT_SECS=""
RSYNC_MIRROR_ROOT=""
MAX_DEPTH=""
MAX_INSTANCES=""
NO_BUILD=0
BIN_DIR="target/release"
# Print CLI usage.
usage() {
cat <<'EOF'
Usage:
./scripts/replay_bundle/run_live_bundle_record_multi_rir.sh \
--rir <afrinic,apnic,...> \
[--out-root <path>] \
[--base-validation-time <rfc3339>] \
[--delta-validation-time <rfc3339>] \
[--http-timeout-secs <n>] \
[--rsync-timeout-secs <n>] \
[--rsync-mirror-root <path>] \
[--max-depth <n>] \
[--max-instances <n>] \
[--bin-dir <path>] \
[--no-build]
EOF
}
# Parse flags; ${2:?} aborts the script when a flag's value is missing.
while [[ $# -gt 0 ]]; do
case "$1" in
  --rir) RIRS="${2:?}"; shift 2 ;;
  --out-root) OUT_ROOT="${2:?}"; shift 2 ;;
  --base-validation-time) BASE_VALIDATION_TIME="${2:?}"; shift 2 ;;
  --delta-validation-time) DELTA_VALIDATION_TIME="${2:?}"; shift 2 ;;
  --http-timeout-secs) HTTP_TIMEOUT_SECS="${2:?}"; shift 2 ;;
  --rsync-timeout-secs) RSYNC_TIMEOUT_SECS="${2:?}"; shift 2 ;;
  --rsync-mirror-root) RSYNC_MIRROR_ROOT="${2:?}"; shift 2 ;;
  --max-depth) MAX_DEPTH="${2:?}"; shift 2 ;;
  --max-instances) MAX_INSTANCES="${2:?}"; shift 2 ;;
  --bin-dir) BIN_DIR="${2:?}"; shift 2 ;;
  --no-build) NO_BUILD=1; shift ;;
  --help|-h) usage; exit 0 ;;
  *) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;;
esac
done
# --rir (comma-separated list) is mandatory.
if [[ -z "$RIRS" ]]; then
usage >&2
exit 2
fi
# One shared timestamp tags the whole matrix run and every per-RIR dir.
RUN_TAG="$(date -u +%Y%m%dT%H%M%SZ)"
if [[ -z "$OUT_ROOT" ]]; then
OUT_ROOT="target/replay/live_bundle_matrix_${RUN_TAG}"
fi
mkdir -p "$OUT_ROOT"
# Map an RIR name to its in-repo TAL fixture path.
resolve_tal_path() {
case "$1" in
  afrinic) printf 'tests/fixtures/tal/afrinic.tal' ;;
  apnic) printf 'tests/fixtures/tal/apnic-rfc7730-https.tal' ;;
  arin) printf 'tests/fixtures/tal/arin.tal' ;;
  lacnic) printf 'tests/fixtures/tal/lacnic.tal' ;;
  ripe) printf 'tests/fixtures/tal/ripe-ncc.tal' ;;
  *) echo "unsupported rir: $1" >&2; exit 2 ;;
esac
}
# Map an RIR name to its in-repo trust-anchor certificate fixture path.
resolve_ta_path() {
case "$1" in
  afrinic) printf 'tests/fixtures/ta/afrinic-ta.cer' ;;
  apnic) printf 'tests/fixtures/ta/apnic-ta.cer' ;;
  arin) printf 'tests/fixtures/ta/arin-ta.cer' ;;
  lacnic) printf 'tests/fixtures/ta/lacnic-ta.cer' ;;
  ripe) printf 'tests/fixtures/ta/ripe-ncc-ta.cer' ;;
  *) echo "unsupported rir: $1" >&2; exit 2 ;;
esac
}
SUMMARY_JSON="$OUT_ROOT/summary.json"
SUMMARY_MD="$OUT_ROOT/summary.md"
# Seed an empty summary document before the per-RIR loop appends results.
python3 - "$SUMMARY_JSON" "$RUN_TAG" <<'PY'
import json, sys
out, run_tag = sys.argv[1:]
with open(out, "w") as fh:
    json.dump({"runTag": run_tag, "results": []}, fh, indent=2)
PY
IFS=',' read -r -a RIR_LIST <<< "$RIRS"
for raw_rir in "${RIR_LIST[@]}"; do
# Normalize the token (lowercase, trim whitespace); skip empty entries.
rir="$(printf '%s' "$raw_rir" | tr '[:upper:]' '[:lower:]' | xargs)"
[[ -n "$rir" ]] || continue
tal_path="$(resolve_tal_path "$rir")"
ta_path="$(resolve_ta_path "$rir")"
out_dir="$OUT_ROOT/${rir}_live_bundle_${RUN_TAG}"
cmd=(
./scripts/replay_bundle/run_live_bundle_record.sh
--rir "$rir"
--out-dir "$out_dir"
--tal-path "$tal_path"
--ta-path "$ta_path"
--trust-anchor "$rir"
--bin-dir "$BIN_DIR"
)
# Forward only the optional flags that were actually provided.
[[ -n "$BASE_VALIDATION_TIME" ]] && cmd+=(--base-validation-time "$BASE_VALIDATION_TIME")
[[ -n "$DELTA_VALIDATION_TIME" ]] && cmd+=(--delta-validation-time "$DELTA_VALIDATION_TIME")
[[ -n "$HTTP_TIMEOUT_SECS" ]] && cmd+=(--http-timeout-secs "$HTTP_TIMEOUT_SECS")
[[ -n "$RSYNC_TIMEOUT_SECS" ]] && cmd+=(--rsync-timeout-secs "$RSYNC_TIMEOUT_SECS")
[[ -n "$RSYNC_MIRROR_ROOT" ]] && cmd+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT")
[[ -n "$MAX_DEPTH" ]] && cmd+=(--max-depth "$MAX_DEPTH")
[[ -n "$MAX_INSTANCES" ]] && cmd+=(--max-instances "$MAX_INSTANCES")
[[ "$NO_BUILD" -eq 1 ]] && cmd+=(--no-build)
"${cmd[@]}"
# Append this RIR's counts / self-replay results to summary.json.
python3 - "$SUMMARY_JSON" "$rir" "$out_dir" <<'PY'
import json, pathlib, sys
summary_path, rir, out_dir = sys.argv[1:]
summary = json.loads(pathlib.Path(summary_path).read_text())
bundle = json.loads(pathlib.Path(out_dir, rir, "bundle.json").read_text())
verification = json.loads(pathlib.Path(out_dir, rir, "verification.json").read_text())
summary["results"].append({
    "rir": rir,
    "outDir": out_dir,
    "baseVrpCount": bundle["baseVrpCount"],
    "deltaVrpCount": bundle["deltaVrpCount"],
    "baseVapCount": bundle["baseVapCount"],
    "deltaVapCount": bundle["deltaVapCount"],
    "baseSelfReplayOk": verification["base"]["capture"]["selfReplayOk"],
    "deltaSelfReplayOk": verification["delta"]["capture"]["selfReplayOk"],
})
pathlib.Path(summary_path).write_text(json.dumps(summary, indent=2))
PY
done
# Render summary.json as a human-readable markdown table.
python3 - "$SUMMARY_JSON" "$SUMMARY_MD" <<'PY'
import json, pathlib, sys
summary = json.loads(pathlib.Path(sys.argv[1]).read_text())
out = pathlib.Path(sys.argv[2])
lines = [
    "# Multi-RIR Live Bundle Record Summary",
    "",
    f"- runTag: `{summary['runTag']}`",
    "",
    "| rir | base_vrps | delta_vrps | base_vaps | delta_vaps | base_self_replay | delta_self_replay | out_dir |",
    "|---|---:|---:|---:|---:|---|---|---|",
]
for item in summary["results"]:
    lines.append(
        f"| {item['rir']} | {item['baseVrpCount']} | {item['deltaVrpCount']} | "
        f"{item['baseVapCount']} | {item['deltaVapCount']} | "
        f"{str(item['baseSelfReplayOk']).lower()} | {str(item['deltaSelfReplayOk']).lower()} | "
        f"`{item['outDir']}` |"
    )
out.write_text("\n".join(lines) + "\n")
PY
echo "$OUT_ROOT"

View File

@ -1,173 +0,0 @@
#!/usr/bin/env bash
# Record `1 base + N delta` live bundle sequences for several RIRs in one run
# by invoking run_live_bundle_record_sequence.sh once per RIR, then aggregate
# per-RIR bundle.json / verification.json into summary.json and summary.md.
set -euo pipefail
# Always operate from the repository root (two levels above this script).
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
# CLI option state; an empty string means "use the downstream default".
RIRS=""
OUT_ROOT=""
BASE_VALIDATION_TIME=""
DELTA_COUNT=""
DELTA_INTERVAL_SECS=""
HTTP_TIMEOUT_SECS=""
RSYNC_TIMEOUT_SECS=""
RSYNC_MIRROR_ROOT=""
MAX_DEPTH=""
MAX_INSTANCES=""
NO_BUILD=0
KEEP_DB=0
CAPTURE_INPUTS_ONLY=0
BIN_DIR="target/release"
# Print CLI usage.
usage() {
cat <<'EOF'
Usage:
./scripts/replay_bundle/run_live_bundle_record_multi_rir_sequence.sh \
--rir <afrinic,apnic,...> \
[--out-root <path>] \
[--base-validation-time <rfc3339>] \
[--delta-count <n>] \
[--delta-interval-secs <n>] \
[--http-timeout-secs <n>] \
[--rsync-timeout-secs <n>] \
[--rsync-mirror-root <path>] \
[--max-depth <n>] \
[--max-instances <n>] \
[--bin-dir <path>] \
[--no-build] \
[--keep-db] \
[--capture-inputs-only]
EOF
}
# Parse flags; ${2:?} aborts the script when a flag's value is missing.
while [[ $# -gt 0 ]]; do
case "$1" in
  --rir) RIRS="${2:?}"; shift 2 ;;
  --out-root) OUT_ROOT="${2:?}"; shift 2 ;;
  --base-validation-time) BASE_VALIDATION_TIME="${2:?}"; shift 2 ;;
  --delta-count) DELTA_COUNT="${2:?}"; shift 2 ;;
  --delta-interval-secs) DELTA_INTERVAL_SECS="${2:?}"; shift 2 ;;
  --http-timeout-secs) HTTP_TIMEOUT_SECS="${2:?}"; shift 2 ;;
  --rsync-timeout-secs) RSYNC_TIMEOUT_SECS="${2:?}"; shift 2 ;;
  --rsync-mirror-root) RSYNC_MIRROR_ROOT="${2:?}"; shift 2 ;;
  --max-depth) MAX_DEPTH="${2:?}"; shift 2 ;;
  --max-instances) MAX_INSTANCES="${2:?}"; shift 2 ;;
  --bin-dir) BIN_DIR="${2:?}"; shift 2 ;;
  --no-build) NO_BUILD=1; shift ;;
  --keep-db) KEEP_DB=1; shift ;;
  --capture-inputs-only) CAPTURE_INPUTS_ONLY=1; shift ;;
  --help|-h) usage; exit 0 ;;
  *) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;;
esac
done
# --rir (comma-separated list) is mandatory.
if [[ -z "$RIRS" ]]; then
usage >&2
exit 2
fi
# One shared timestamp tags the whole matrix run and every per-RIR dir.
RUN_TAG="$(date -u +%Y%m%dT%H%M%SZ)"
if [[ -z "$OUT_ROOT" ]]; then
OUT_ROOT="target/replay/live_bundle_sequence_matrix_${RUN_TAG}"
fi
mkdir -p "$OUT_ROOT"
# Map an RIR name to its in-repo TAL fixture path.
resolve_tal_path() {
case "$1" in
  afrinic) printf 'tests/fixtures/tal/afrinic.tal' ;;
  apnic) printf 'tests/fixtures/tal/apnic-rfc7730-https.tal' ;;
  arin) printf 'tests/fixtures/tal/arin.tal' ;;
  lacnic) printf 'tests/fixtures/tal/lacnic.tal' ;;
  ripe) printf 'tests/fixtures/tal/ripe-ncc.tal' ;;
  *) echo "unsupported rir: $1" >&2; exit 2 ;;
esac
}
# Map an RIR name to its in-repo trust-anchor certificate fixture path.
resolve_ta_path() {
case "$1" in
  afrinic) printf 'tests/fixtures/ta/afrinic-ta.cer' ;;
  apnic) printf 'tests/fixtures/ta/apnic-ta.cer' ;;
  arin) printf 'tests/fixtures/ta/arin-ta.cer' ;;
  lacnic) printf 'tests/fixtures/ta/lacnic-ta.cer' ;;
  ripe) printf 'tests/fixtures/ta/ripe-ncc-ta.cer' ;;
  *) echo "unsupported rir: $1" >&2; exit 2 ;;
esac
}
SUMMARY_JSON="$OUT_ROOT/summary.json"
SUMMARY_MD="$OUT_ROOT/summary.md"
# Seed an empty summary document before the per-RIR loop appends results.
python3 - "$SUMMARY_JSON" "$RUN_TAG" <<'PY'
import json, sys
path, run_tag = sys.argv[1:]
with open(path, "w") as fh:
    json.dump({"runTag": run_tag, "results": []}, fh, indent=2)
PY
IFS=',' read -r -a RIR_LIST <<< "$RIRS"
for raw_rir in "${RIR_LIST[@]}"; do
# Normalize the token (lowercase, trim whitespace); skip empty entries.
rir="$(printf '%s' "$raw_rir" | tr '[:upper:]' '[:lower:]' | xargs)"
[[ -n "$rir" ]] || continue
tal_path="$(resolve_tal_path "$rir")"
ta_path="$(resolve_ta_path "$rir")"
out_dir="$OUT_ROOT/${rir}_live_bundle_sequence_${RUN_TAG}"
cmd=(
./scripts/replay_bundle/run_live_bundle_record_sequence.sh
--rir "$rir"
--out-dir "$out_dir"
--tal-path "$tal_path"
--ta-path "$ta_path"
--trust-anchor "$rir"
--bin-dir "$BIN_DIR"
)
# Forward only the optional flags that were actually provided.
[[ -n "$BASE_VALIDATION_TIME" ]] && cmd+=(--base-validation-time "$BASE_VALIDATION_TIME")
[[ -n "$DELTA_COUNT" ]] && cmd+=(--delta-count "$DELTA_COUNT")
[[ -n "$DELTA_INTERVAL_SECS" ]] && cmd+=(--delta-interval-secs "$DELTA_INTERVAL_SECS")
[[ -n "$HTTP_TIMEOUT_SECS" ]] && cmd+=(--http-timeout-secs "$HTTP_TIMEOUT_SECS")
[[ -n "$RSYNC_TIMEOUT_SECS" ]] && cmd+=(--rsync-timeout-secs "$RSYNC_TIMEOUT_SECS")
[[ -n "$RSYNC_MIRROR_ROOT" ]] && cmd+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT")
[[ -n "$MAX_DEPTH" ]] && cmd+=(--max-depth "$MAX_DEPTH")
[[ -n "$MAX_INSTANCES" ]] && cmd+=(--max-instances "$MAX_INSTANCES")
[[ "$NO_BUILD" -eq 1 ]] && cmd+=(--no-build)
[[ "$KEEP_DB" -eq 1 ]] && cmd+=(--keep-db)
[[ "$CAPTURE_INPUTS_ONLY" -eq 1 ]] && cmd+=(--capture-inputs-only)
"${cmd[@]}"
# Append this RIR's sequence stats / self-replay verdict to summary.json.
python3 - "$SUMMARY_JSON" "$rir" "$out_dir" <<'PY'
import json, pathlib, sys
summary_path, rir, out_dir = sys.argv[1:]
summary = json.loads(pathlib.Path(summary_path).read_text())
bundle = json.loads(pathlib.Path(out_dir, rir, "bundle.json").read_text())
verification = json.loads(pathlib.Path(out_dir, rir, "verification.json").read_text())
summary["results"].append({
    "rir": rir,
    "outDir": out_dir,
    "stepCount": len(bundle["deltaSequence"]["steps"]),
    "baseVrpCount": bundle["base"]["vrpCount"],
    "baseVapCount": bundle["base"]["vapCount"],
    "allStepsSelfReplayOk": verification["summary"]["allStepsSelfReplayOk"],
})
pathlib.Path(summary_path).write_text(json.dumps(summary, indent=2))
PY
done
# Render summary.json as a human-readable markdown table.
python3 - "$SUMMARY_JSON" "$SUMMARY_MD" <<'PY'
import json, pathlib, sys
summary = json.loads(pathlib.Path(sys.argv[1]).read_text())
out = pathlib.Path(sys.argv[2])
lines = [
    "# Multi-RIR Live Bundle Sequence Summary",
    "",
    f"- runTag: `{summary['runTag']}`",
    "",
    "| rir | step_count | base_vrps | base_vaps | all_steps_self_replay | out_dir |",
    "|---|---:|---:|---:|---|---|",
]
for item in summary["results"]:
    lines.append(
        f"| {item['rir']} | {item['stepCount']} | {item['baseVrpCount']} | {item['baseVapCount']} | "
        f"{str(item['allStepsSelfReplayOk']).lower()} | `{item['outDir']}` |"
    )
out.write_text("\n".join(lines) + "\n")
PY
echo "$OUT_ROOT"

View File

@ -1,119 +0,0 @@
#!/usr/bin/env bash
# Record a `1 base + N delta` live bundle sequence for a single RIR by
# building (or reusing) replay_bundle_capture_sequence and forwarding the
# provided options to it.
set -euo pipefail
# Always operate from the repository root (two levels above this script).
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
# CLI option state; an empty string means "use the recorder binary's default".
RIR=""
OUT_DIR=""
TAL_PATH=""
TA_PATH=""
BASE_VALIDATION_TIME=""
DELTA_COUNT=""
DELTA_INTERVAL_SECS=""
HTTP_TIMEOUT_SECS=""
RSYNC_TIMEOUT_SECS=""
RSYNC_MIRROR_ROOT=""
MAX_DEPTH=""
MAX_INSTANCES=""
TRUST_ANCHOR=""
NO_BUILD=0
KEEP_DB=0
CAPTURE_INPUTS_ONLY=0
BIN_DIR="target/release"
# Progress-log knobs, overridable from the environment.
PROGRESS_LOG="${RPKI_PROGRESS_LOG:-1}"
PROGRESS_SLOW_SECS="${RPKI_PROGRESS_SLOW_SECS:-30}"
# Print CLI usage.
usage() {
cat <<'EOF'
Usage:
./scripts/replay_bundle/run_live_bundle_record_sequence.sh \
--rir <name> \
--tal-path <path> \
--ta-path <path> \
[--out-dir <path>] \
[--base-validation-time <rfc3339>] \
[--delta-count <n>] \
[--delta-interval-secs <n>] \
[--http-timeout-secs <n>] \
[--rsync-timeout-secs <n>] \
[--rsync-mirror-root <path>] \
[--max-depth <n>] \
[--max-instances <n>] \
[--trust-anchor <name>] \
[--bin-dir <path>] \
[--no-build] \
[--keep-db] \
[--capture-inputs-only]
EOF
}
# Parse flags; ${2:?} aborts the script when a flag's value is missing.
while [[ $# -gt 0 ]]; do
case "$1" in
  --rir) RIR="${2:?}"; shift 2 ;;
  --out-dir) OUT_DIR="${2:?}"; shift 2 ;;
  --tal-path) TAL_PATH="${2:?}"; shift 2 ;;
  --ta-path) TA_PATH="${2:?}"; shift 2 ;;
  --base-validation-time) BASE_VALIDATION_TIME="${2:?}"; shift 2 ;;
  --delta-count) DELTA_COUNT="${2:?}"; shift 2 ;;
  --delta-interval-secs) DELTA_INTERVAL_SECS="${2:?}"; shift 2 ;;
  --http-timeout-secs) HTTP_TIMEOUT_SECS="${2:?}"; shift 2 ;;
  --rsync-timeout-secs) RSYNC_TIMEOUT_SECS="${2:?}"; shift 2 ;;
  --rsync-mirror-root) RSYNC_MIRROR_ROOT="${2:?}"; shift 2 ;;
  --max-depth) MAX_DEPTH="${2:?}"; shift 2 ;;
  --max-instances) MAX_INSTANCES="${2:?}"; shift 2 ;;
  --trust-anchor) TRUST_ANCHOR="${2:?}"; shift 2 ;;
  --bin-dir) BIN_DIR="${2:?}"; shift 2 ;;
  --no-build) NO_BUILD=1; shift ;;
  --keep-db) KEEP_DB=1; shift ;;
  --capture-inputs-only) CAPTURE_INPUTS_ONLY=1; shift ;;
  --help|-h) usage; exit 0 ;;
  *) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;;
esac
done
# --rir, --tal-path and --ta-path are mandatory.
if [[ -z "$RIR" || -z "$TAL_PATH" || -z "$TA_PATH" ]]; then
usage >&2
exit 2
fi
# Default output directory is timestamped under target/replay/.
TS="$(date -u +%Y%m%dT%H%M%SZ)"
if [[ -z "$OUT_DIR" ]]; then
OUT_DIR="target/replay/${RIR}_live_bundle_sequence_${TS}"
fi
SEQUENCE_BIN="$BIN_DIR/replay_bundle_capture_sequence"
if [[ "$NO_BUILD" -eq 0 ]]; then
echo "[1/1] build release binary"
cargo build --release --bin replay_bundle_capture_sequence
else
echo "[1/1] reuse existing binary from $BIN_DIR"
fi
# Fail fast if the sequence recorder binary is missing or not executable.
if [[ ! -x "$SEQUENCE_BIN" ]]; then
echo "missing executable: $SEQUENCE_BIN" >&2
exit 1
fi
cmd=(
"$SEQUENCE_BIN"
--rir "$RIR"
--out-dir "$OUT_DIR"
--tal-path "$TAL_PATH"
--ta-path "$TA_PATH"
)
# Forward only the optional flags that were actually provided.
[[ -n "$BASE_VALIDATION_TIME" ]] && cmd+=(--base-validation-time "$BASE_VALIDATION_TIME")
[[ -n "$DELTA_COUNT" ]] && cmd+=(--delta-count "$DELTA_COUNT")
[[ -n "$DELTA_INTERVAL_SECS" ]] && cmd+=(--delta-interval-secs "$DELTA_INTERVAL_SECS")
[[ -n "$HTTP_TIMEOUT_SECS" ]] && cmd+=(--http-timeout-secs "$HTTP_TIMEOUT_SECS")
[[ -n "$RSYNC_TIMEOUT_SECS" ]] && cmd+=(--rsync-timeout-secs "$RSYNC_TIMEOUT_SECS")
[[ -n "$RSYNC_MIRROR_ROOT" ]] && cmd+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT")
[[ -n "$MAX_DEPTH" ]] && cmd+=(--max-depth "$MAX_DEPTH")
[[ -n "$MAX_INSTANCES" ]] && cmd+=(--max-instances "$MAX_INSTANCES")
[[ -n "$TRUST_ANCHOR" ]] && cmd+=(--trust-anchor "$TRUST_ANCHOR")
[[ "$KEEP_DB" -eq 1 ]] && cmd+=(--keep-db)
[[ "$CAPTURE_INPUTS_ONLY" -eq 1 ]] && cmd+=(--capture-inputs-only)
# Run the recorder with the progress-log environment applied.
RPKI_PROGRESS_LOG="$PROGRESS_LOG" \
RPKI_PROGRESS_SLOW_SECS="$PROGRESS_SLOW_SECS" \
"${cmd[@]}"

View File

@ -12,7 +12,7 @@
- `<rir>_ccr_replay_<timestamp>` - `<rir>_ccr_replay_<timestamp>`
默认输入: 默认输入:
- bundle root: `/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3` - 历史 replay fixture root: `/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3`
- 每个 RIR 的 TAL / TA / validation time / record CSV 由 `scripts/payload_replay/multi_rir_case_info.py` 解析 - 每个 RIR 的 TAL / TA / validation time / record CSV 由 `scripts/payload_replay/multi_rir_case_info.py` 解析
用法: 用法:
@ -43,25 +43,3 @@
- 同次执行总汇总: - 同次执行总汇总:
- `multi_rir_ccr_replay_verify_<timestamp>_summary.md` - `multi_rir_ccr_replay_verify_<timestamp>_summary.md`
- `multi_rir_ccr_replay_verify_<timestamp>_summary.json` - `multi_rir_ccr_replay_verify_<timestamp>_summary.json`
## `run_peer_bundle_matrix.sh`
用途:
- 对一组 `ours live bundle` 做本地 peer replay 矩阵验证
- Routinator 与 `rpki-client` 分别消费相同 bundle root
- 汇总 `VRP + VAP` 的 base / delta 结果
用法:
- `./scripts/replay_verify/run_peer_bundle_matrix.sh --bundle-root target/replay/live_bundle_matrix_<timestamp>`
- `./scripts/replay_verify/run_peer_bundle_matrix.sh --bundle-root target/replay/live_bundle_matrix_<timestamp> --rir apnic,ripe`
主要产物:
- 输出根目录:
- `target/replay/peer_bundle_matrix_<timestamp>/`
- Routinator
- `target/replay/peer_bundle_matrix_<timestamp>/routinator/<rir>/`
- `rpki-client`
- `target/replay/peer_bundle_matrix_<timestamp>/rpki-client/`
- 汇总:
- `summary.json`
- `summary.md`

View File

@ -1,210 +0,0 @@
#!/usr/bin/env bash
# Peer replay matrix: feed the same live bundle root to both Routinator and
# rpki-client, then aggregate their per-RIR verification results into
# summary.json and summary.md.
set -euo pipefail
# Always operate from the repository root (two levels above this script).
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
# CLI option state; peer tool locations default to the author's checkouts
# and can be overridden via flags.
BUNDLE_ROOT=""
RIRS=""
OUT_ROOT=""
ROUTINATOR_ROOT="/home/yuyr/dev/rust_playground/routinator"
RPKI_CLIENT_ROOT="/home/yuyr/dev/rpki-client-9.7"
RPKI_CLIENT_BUILD_DIR="/home/yuyr/dev/rpki-client-9.7/build-m5"
KEEP_DB=0
# Print CLI usage.
usage() {
cat <<'EOF'
Usage:
./scripts/replay_verify/run_peer_bundle_matrix.sh \
--bundle-root <dir> \
[--rir <afrinic,apnic,...>] \
[--out-root <dir>] \
[--routinator-root <dir>] \
[--rpki-client-root <dir>] \
[--rpki-client-build-dir <dir>] \
[--keep-db]
EOF
}
# Parse flags; ${2:?} aborts the script when a flag's value is missing.
while [[ $# -gt 0 ]]; do
case "$1" in
  --bundle-root) BUNDLE_ROOT="${2:?}"; shift 2 ;;
  --rir) RIRS="${2:?}"; shift 2 ;;
  --out-root) OUT_ROOT="${2:?}"; shift 2 ;;
  --routinator-root) ROUTINATOR_ROOT="${2:?}"; shift 2 ;;
  --rpki-client-root) RPKI_CLIENT_ROOT="${2:?}"; shift 2 ;;
  --rpki-client-build-dir) RPKI_CLIENT_BUILD_DIR="${2:?}"; shift 2 ;;
  --keep-db) KEEP_DB=1; shift ;;
  --help|-h) usage; exit 0 ;;
  *) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;;
esac
done
# --bundle-root is mandatory.
if [[ -z "$BUNDLE_ROOT" ]]; then
usage >&2
exit 2
fi
# Resolve the bundle root to an absolute path (peer tools cd elsewhere).
BUNDLE_ROOT="$(python3 - "$BUNDLE_ROOT" <<'PY'
from pathlib import Path
import sys
print(Path(sys.argv[1]).resolve())
PY
)"
RUN_TAG="$(date -u +%Y%m%dT%H%M%SZ)"
if [[ -z "$OUT_ROOT" ]]; then
OUT_ROOT="target/replay/peer_bundle_matrix_${RUN_TAG}"
fi
mkdir -p "$OUT_ROOT"
# Resolve the output root to an absolute path as well.
OUT_ROOT="$(python3 - "$OUT_ROOT" <<'PY'
from pathlib import Path
import sys
print(Path(sys.argv[1]).resolve())
PY
)"
# Auto-discover RIR bundle directories under BUNDLE_ROOT: a directory counts
# when it (or exactly one nested child) contains base-locks.json.
discover_rirs() {
python3 - "$BUNDLE_ROOT" <<'PY'
from pathlib import Path
import sys
root = Path(sys.argv[1])
if (root / "base-locks.json").exists():
    print(root.name)
    raise SystemExit
rirs = []
for entry in sorted(root.iterdir()):
    if not entry.is_dir():
        continue
    if (entry / "base-locks.json").exists():
        rirs.append(entry.name)
        continue
    nested = sorted(
        child.name for child in entry.iterdir()
        if child.is_dir() and (child / "base-locks.json").exists()
    )
    if len(nested) == 1:
        rirs.append(nested[0])
print(",".join(rirs))
PY
}
if [[ -z "$RIRS" ]]; then
RIRS="$(discover_rirs)"
fi
ROUTI_OUT="$OUT_ROOT/routinator"
CLIENT_OUT="$OUT_ROOT/rpki-client"
# Symlink farm mapping <rir> -> its bundle dir, so rpki-client's matrix tool
# sees a flat layout regardless of how BUNDLE_ROOT is nested.
NORMALIZED_BUNDLE_ROOT="$OUT_ROOT/.normalized-bundle-root"
mkdir -p "$ROUTI_OUT" "$CLIENT_OUT"
rm -rf "$NORMALIZED_BUNDLE_ROOT"
mkdir -p "$NORMALIZED_BUNDLE_ROOT"
IFS=',' read -r -a RIR_LIST <<< "$RIRS"
for raw_rir in "${RIR_LIST[@]}"; do
# Normalize the token (lowercase, trim whitespace); skip empty entries.
rir="$(printf '%s' "$raw_rir" | tr '[:upper:]' '[:lower:]' | xargs)"
[[ -n "$rir" ]] || continue
# Locate the actual bundle dir for this RIR (direct child or one level deep).
source_bundle_dir=""
if [[ -d "$BUNDLE_ROOT/$rir" && -f "$BUNDLE_ROOT/$rir/base-locks.json" ]]; then
source_bundle_dir="$BUNDLE_ROOT/$rir"
else
match="$(find "$BUNDLE_ROOT" -maxdepth 2 -type d -path "*/${rir}" -exec test -f '{}/base-locks.json' ';' -print | head -n 1)"
if [[ -z "$match" ]]; then
echo "unable to resolve bundle directory for RIR: $rir" >&2
exit 1
fi
source_bundle_dir="$match"
fi
ln -sfn "$source_bundle_dir" "$NORMALIZED_BUNDLE_ROOT/$rir"
# Run Routinator's per-RIR bundle replay.
ROUTI_CMD=(
"$ROUTINATOR_ROOT/bench/multi_rir_demo_ours/run_single_rir_ours_bundle.sh"
"$source_bundle_dir"
"$ROUTI_OUT/$rir"
)
[[ "$KEEP_DB" -eq 1 ]] && ROUTI_CMD=( "$ROUTINATOR_ROOT/bench/multi_rir_demo_ours/run_single_rir_ours_bundle.sh" --keep-db "$source_bundle_dir" "$ROUTI_OUT/$rir" )
"${ROUTI_CMD[@]}"
done
# Run rpki-client's matrix tool once over the normalized bundle root.
CLIENT_ARGS=(
python3 "$RPKI_CLIENT_ROOT/tools/run_bundle_matrix.py"
--bundle-dir "$NORMALIZED_BUNDLE_ROOT"
--build-dir "$RPKI_CLIENT_BUILD_DIR"
--work-dir "$CLIENT_OUT"
)
[[ "$KEEP_DB" -eq 1 ]] && CLIENT_ARGS+=(--keep-db)
for raw_rir in "${RIR_LIST[@]}"; do
rir="$(printf '%s' "$raw_rir" | tr '[:upper:]' '[:lower:]' | xargs)"
[[ -n "$rir" ]] || continue
CLIENT_ARGS+=(--rir "$rir")
done
"${CLIENT_ARGS[@]}"
SUMMARY_JSON="$OUT_ROOT/summary.json"
SUMMARY_MD="$OUT_ROOT/summary.md"
# Merge Routinator's per-RIR verification.json files with rpki-client's
# matrix summary into one JSON document.
python3 - "$ROUTI_OUT" "$CLIENT_OUT/matrix-summary.json" "$SUMMARY_JSON" <<'PY'
import json
from pathlib import Path
import sys
routi_root = Path(sys.argv[1])
client_summary = json.loads(Path(sys.argv[2]).read_text())
summary_path = Path(sys.argv[3])
summary = {"routinator": {}, "rpki_client": client_summary}
for verification in sorted(routi_root.glob("*/verification.json")):
    rir = verification.parent.name
    summary["routinator"][rir] = json.loads(verification.read_text())
summary_path.write_text(json.dumps(summary, indent=2))
PY
# Render summary.json as markdown tables (one per peer tool).
python3 - "$SUMMARY_JSON" "$SUMMARY_MD" <<'PY'
import json
from pathlib import Path
import sys
summary = json.loads(Path(sys.argv[1]).read_text())
out = Path(sys.argv[2])
lines = [
    "# Peer Bundle Matrix Summary",
    "",
    "## Routinator",
    "",
    "| rir | base_vrp | base_vap | sequence_vrp | sequence_vap |",
    "|---|---|---|---|---|",
]
for rir, data in sorted(summary["routinator"].items()):
    if "steps" in data:
        lines.append(
            f"| {rir} | {str(data.get('baseMatch')).lower()} | {str(data.get('baseVapsMatch')).lower()} | "
            f"{str(data.get('summary', {}).get('allStepsMatch')).lower()} | "
            f"{str(data.get('summary', {}).get('allStepsVapsMatch')).lower()} |"
        )
    else:
        lines.append(
            f"| {rir} | {str(data.get('baseMatch')).lower()} | {str(data.get('baseVapsMatch')).lower()} | "
            f"{str(data.get('deltaMatch')).lower()} | {str(data.get('deltaVapsMatch')).lower()} |"
        )
lines += [
    "",
    "## rpki-client",
    "",
    "| rir | base_vrp | base_vap | sequence_vrp | sequence_vap |",
    "|---|---|---|---|---|",
]
for rir, phases in sorted(summary["rpki_client"].items()):
    base = phases.get("base", {})
    step_items = [
        value for key, value in phases.items()
        if key not in ("base", "delta") and isinstance(value, dict)
    ]
    if "delta" in phases:
        step_items.append(phases["delta"])
    all_step_match = all(item.get("match") for item in step_items) if step_items else None
    all_step_vap_match = all(item.get("vaps_match") for item in step_items) if step_items else None
    lines.append(
        f"| {rir} | {str(base.get('match')).lower()} | {str(base.get('vaps_match')).lower()} | "
        f"{str(all_step_match).lower()} | {str(all_step_vap_match).lower()} |"
    )
out.write_text("\n".join(lines) + "\n")
PY
echo "$OUT_ROOT"

View File

@ -1,5 +1,5 @@
use rpki::bundle::{decode_ccr_compare_views, write_vap_csv, write_vrp_csv};
use rpki::ccr::decode_content_info; use rpki::ccr::decode_content_info;
use rpki::ccr::{decode_ccr_compare_views, write_vap_csv, write_vrp_csv};
#[derive(Debug, Default, PartialEq, Eq)] #[derive(Debug, Default, PartialEq, Eq)]
struct Args { struct Args {

View File

@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf; use std::path::PathBuf;
use rpki::blob_store::ExternalRepoBytesDb; use rpki::blob_store::ExternalRepoBytesDb;
use rpki::bundle::decode_ccr_compare_views; use rpki::ccr::decode_ccr_compare_views;
use rpki::ccr::decode_content_info; use rpki::ccr::decode_content_info;
use rpki::cir::decode_cir; use rpki::cir::decode_cir;
use rpki::data_model::roa::RoaObject; use rpki::data_model::roa::RoaObject;

View File

@ -1,263 +0,0 @@
use rpki::bundle::record_io::load_validation_time;
use rpki::storage::RocksStore;
use rpki::validation::run_tree_from_tal::{
run_tree_from_tal_and_ta_der_payload_delta_replay_step_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
};
use rpki::validation::tree::TreeRunConfig;
use serde::Serialize;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Instant;
/// One-line CLI usage string shown on `--help` and on argument errors.
fn usage() -> &'static str {
    // Kept as a single static literal so it can be embedded in error messages
    // without allocation.
    "Usage: measure_sequence_replay --bundle-root <dir> [--rir <rir[,rir...]>] --out <path> [--keep-db]"
}
/// Parsed command-line arguments for the sequence-replay measurement tool.
#[derive(Default)]
struct Args {
    // Root directory containing per-RIR bundle directories (required).
    bundle_root: Option<PathBuf>,
    // Explicit lowercase RIR names; `None` means auto-discover under the root.
    rirs: Option<Vec<String>>,
    // Output path for the timing report (required).
    out: Option<PathBuf>,
    // Set by --keep-db; presumably retains the temporary store after a run —
    // TODO confirm against real_main (not fully visible here).
    keep_db: bool,
}
/// Parse process arguments into `Args`.
///
/// Errors carry a human-readable message (usage text is appended for unknown
/// flags, `--help`, and missing required options) and are printed by `main`.
fn parse_args() -> Result<Args, String> {
    let mut parsed = Args::default();
    // Skip argv[0] (the binary name) and walk the remaining tokens.
    let mut tokens = std::env::args().skip(1);
    while let Some(flag) = tokens.next() {
        match flag.as_str() {
            "--bundle-root" => {
                let value = tokens.next().ok_or("--bundle-root requires a value")?;
                parsed.bundle_root = Some(PathBuf::from(value));
            }
            "--rir" => {
                // Comma-separated list; entries are trimmed, lowercased, and
                // empty segments are dropped.
                let value = tokens.next().ok_or("--rir requires a value")?;
                let list: Vec<String> = value
                    .split(',')
                    .map(|part| part.trim().to_lowercase())
                    .filter(|part| !part.is_empty())
                    .collect();
                parsed.rirs = Some(list);
            }
            "--out" => {
                let value = tokens.next().ok_or("--out requires a value")?;
                parsed.out = Some(PathBuf::from(value));
            }
            "--keep-db" => parsed.keep_db = true,
            // --help is reported through the Err channel on purpose: main
            // prints the message and exits non-zero.
            "--help" | "-h" => return Err(usage().to_string()),
            other => return Err(format!("unknown argument: {other}\n{}", usage())),
        }
    }
    if parsed.bundle_root.is_none() || parsed.out.is_none() {
        return Err(format!("--bundle-root and --out are required\n{}", usage()));
    }
    Ok(parsed)
}
/// Timing and payload counts for one replay phase (base or a single delta step).
#[derive(Serialize)]
struct PhaseTiming {
    // Wall-clock duration of the replay call, in seconds.
    duration_seconds: f64,
    // Number of entries in the replayed tree's `vrps` list.
    vrp_count: usize,
    // Number of entries in the replayed tree's `aspas` list.
    vap_count: usize,
}
/// Per-RIR measurement result: base replay timing plus one entry per delta step.
#[derive(Serialize)]
struct RirTiming {
    // RIR directory name the timings belong to.
    rir: String,
    // Timing of the base snapshot replay.
    base: PhaseTiming,
    // (step id, timing) pairs in `deltaSequence.steps` order.
    steps: Vec<(String, PhaseTiming)>,
}
/// Lists the RIR bundle directories directly under `bundle_root`.
///
/// A sub-directory qualifies when it contains both `bundle.json` and
/// `tal.tal`. The returned names are sorted lexicographically. Errors on
/// unreadable directories or non-UTF-8 directory names.
fn discover_rirs(bundle_root: &Path) -> Result<Vec<String>, String> {
    let entries = fs::read_dir(bundle_root)
        .map_err(|e| format!("read_dir failed: {}: {e}", bundle_root.display()))?;
    let mut names = Vec::new();
    for entry in entries {
        let path = entry
            .map_err(|e| format!("read_dir entry failed: {e}"))?
            .path();
        let is_bundle_dir =
            path.is_dir() && path.join("bundle.json").exists() && path.join("tal.tal").exists();
        if !is_bundle_dir {
            continue;
        }
        let name = path
            .file_name()
            .and_then(|s| s.to_str())
            .ok_or_else(|| format!("invalid rir dir name: {}", path.display()))?;
        names.push(name.to_string());
    }
    names.sort();
    Ok(names)
}
/// Joins a bundle-relative path component onto `root`.
///
/// Generalized from `&str` to any path-like value (`&str`, `String`,
/// `&Path`, `PathBuf`, …); existing `&str` call sites are unaffected.
fn path_join(root: &Path, relative: impl AsRef<Path>) -> PathBuf {
    root.join(relative)
}
/// Entry point: runs the measurement and exits non-zero on failure.
fn main() {
    match real_main() {
        Ok(()) => {}
        Err(err) => {
            eprintln!("{err}");
            std::process::exit(1);
        }
    }
}
/// Drives the measurement: for each selected RIR bundle, replays the base
/// snapshot and then every delta step in sequence against one scratch
/// RocksDB, timing each phase, and finally writes the collected
/// `RirTiming` records as pretty-printed JSON to `--out`.
fn real_main() -> Result<(), String> {
    let args = parse_args()?;
    // Safe: parse_args() rejects missing --bundle-root / --out.
    let bundle_root = args.bundle_root.unwrap();
    let out_path = args.out.unwrap();
    let rirs = match args.rirs {
        Some(v) => v,
        None => discover_rirs(&bundle_root)?,
    };
    let mut results = Vec::new();
    // Scratch RocksDB directories live next to the output file.
    let tmp_root = out_path
        .parent()
        .unwrap_or_else(|| Path::new("."))
        .join(".tmp-sequence-replay");
    fs::create_dir_all(&tmp_root)
        .map_err(|e| format!("create tmp root failed: {}: {e}", tmp_root.display()))?;
    for rir in rirs {
        let rir_dir = bundle_root.join(&rir);
        // bundle.json describes the base archive/locks and the ordered delta steps.
        let bundle: serde_json::Value = serde_json::from_slice(
            &fs::read(rir_dir.join("bundle.json"))
                .map_err(|e| format!("read bundle failed: {}: {e}", rir_dir.display()))?,
        )
        .map_err(|e| format!("parse bundle failed for {}: {e}", rir_dir.display()))?;
        let tal_bytes = fs::read(rir_dir.join("tal.tal"))
            .map_err(|e| format!("read tal.tal failed for {}: {e}", rir_dir.display()))?;
        let ta_bytes = fs::read(rir_dir.join("ta.cer"))
            .map_err(|e| format!("read ta.cer failed for {}: {e}", rir_dir.display()))?;
        // Fresh database per RIR so timings are not skewed by stale state.
        let db_dir = tmp_root.join(format!("{rir}-db"));
        if db_dir.exists() {
            fs::remove_dir_all(&db_dir)
                .map_err(|e| format!("remove old db failed: {}: {e}", db_dir.display()))?;
        }
        let store =
            RocksStore::open(&db_dir).map_err(|e| format!("open rocksdb failed for {rir}: {e}"))?;
        let base_archive = path_join(
            &rir_dir,
            bundle["base"]["relativeArchivePath"]
                .as_str()
                .ok_or("bundle missing base.relativeArchivePath")?,
        );
        let base_locks = path_join(
            &rir_dir,
            bundle["base"]["relativeLocksPath"]
                .as_str()
                .ok_or("bundle missing base.relativeLocksPath")?,
        );
        let base_validation_time = load_validation_time(&base_locks)
            .map_err(|e| format!("load base validation time failed for {rir}: {e}"))?;
        // Phase 1: timed replay of the base snapshot.
        let start = Instant::now();
        let base_out = run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
            &store,
            &rpki::policy::Policy::default(),
            &tal_bytes,
            &ta_bytes,
            None,
            &base_archive,
            &base_locks,
            base_validation_time,
            &TreeRunConfig {
                max_depth: None,
                max_instances: None,
                compact_audit: false,
                persist_vcir: true,
                build_ccr_accumulator: true,
            },
        )
        .map_err(|e| format!("base replay failed for {rir}: {e}"))?;
        let base_timing = PhaseTiming {
            duration_seconds: start.elapsed().as_secs_f64(),
            vrp_count: base_out.tree.vrps.len(),
            vap_count: base_out.tree.aspas.len(),
        };
        // Phase 2: timed replay of each delta step. Locks are chained — every
        // step transitions from the previous step's target locks.
        let mut previous_locks = base_locks.clone();
        let mut step_timings = Vec::new();
        for step in bundle["deltaSequence"]["steps"]
            .as_array()
            .ok_or("bundle missing deltaSequence.steps")?
        {
            let step_id = step["id"].as_str().ok_or("step missing id")?.to_string();
            let step_dir = path_join(
                &rir_dir,
                step["relativePath"]
                    .as_str()
                    .ok_or("step missing relativePath")?,
            );
            let delta_archive = path_join(
                &rir_dir,
                step["relativeArchivePath"]
                    .as_str()
                    .ok_or("step missing relativeArchivePath")?,
            );
            let delta_locks = path_join(
                &rir_dir,
                step["relativeTransitionLocksPath"]
                    .as_str()
                    .ok_or("step missing relativeTransitionLocksPath")?,
            );
            let validation_time = load_validation_time(&delta_locks).map_err(|e| {
                format!("load step validation time failed for {rir}/{step_id}: {e}")
            })?;
            let start = Instant::now();
            let step_out = run_tree_from_tal_and_ta_der_payload_delta_replay_step_serial_audit(
                &store,
                &rpki::policy::Policy::default(),
                &tal_bytes,
                &ta_bytes,
                None,
                &delta_archive,
                &previous_locks,
                &delta_locks,
                validation_time,
                &TreeRunConfig {
                    max_depth: None,
                    max_instances: None,
                    compact_audit: false,
                    persist_vcir: true,
                    build_ccr_accumulator: true,
                },
            )
            .map_err(|e| format!("delta step replay failed for {rir}/{step_id}: {e}"))?;
            step_timings.push((
                step_id.clone(),
                PhaseTiming {
                    duration_seconds: start.elapsed().as_secs_f64(),
                    vrp_count: step_out.tree.vrps.len(),
                    vap_count: step_out.tree.aspas.len(),
                },
            ));
            // Next step transitions from this step's target locks.
            previous_locks = step_dir.join("target-locks.json");
        }
        results.push(RirTiming {
            rir,
            base: base_timing,
            steps: step_timings,
        });
        // Scratch DB is deleted unless --keep-db asked to preserve it.
        if !args.keep_db && db_dir.exists() {
            fs::remove_dir_all(&db_dir)
                .map_err(|e| format!("remove db failed: {}: {e}", db_dir.display()))?;
        }
    }
    fs::write(
        &out_path,
        serde_json::to_vec_pretty(&results).map_err(|e| format!("encode json failed: {e}"))?,
    )
    .map_err(|e| format!("write out failed: {}: {e}", out_path.display()))?;
    // Print the report path so callers can pick it up from stdout.
    println!("{}", out_path.display());
    Ok(())
}

View File

@ -1,419 +0,0 @@
use rpki::bundle::{
RecordingHttpFetcher, RecordingRsyncFetcher, RirBundleMetadata,
build_single_rir_bundle_manifest, build_vap_compare_rows, build_vrp_compare_rows, sha256_hex,
write_json, write_live_base_replay_bundle_inputs, write_live_bundle_rir_readme,
write_live_bundle_top_readme, write_timing_json, write_vap_csv, write_vrp_csv,
};
use rpki::ccr::{build_ccr_from_run, verify_content_info, write_ccr_file};
use rpki::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use rpki::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher};
use rpki::policy::Policy;
use rpki::storage::RocksStore;
use rpki::validation::run_tree_from_tal::{
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_serial_audit,
};
use rpki::validation::tree::TreeRunConfig;
use std::fs;
use std::path::PathBuf;
use std::time::Instant;
use time::format_description::well_known::Rfc3339;
/// Parsed command-line arguments for `replay_bundle_capture`.
#[derive(Debug, Default, PartialEq, Eq)]
struct Args {
    // RIR name (required); lowercased in run() before use.
    rir: Option<String>,
    // Output bundle root directory (required).
    out_dir: Option<PathBuf>,
    // Path to the TAL file (required).
    tal_path: Option<PathBuf>,
    // Path to the trust-anchor certificate (required).
    ta_path: Option<PathBuf>,
    // Validation time override; run() defaults to "now" when absent.
    validation_time: Option<time::OffsetDateTime>,
    // HTTP fetch timeout in seconds (parse_args defaults this to 20).
    http_timeout_secs: u64,
    // rsync fetch timeout in seconds (parse_args defaults this to 60).
    rsync_timeout_secs: u64,
    // Optional local rsync mirror root forwarded to the rsync fetcher config.
    rsync_mirror_root: Option<PathBuf>,
    // Optional limit forwarded to TreeRunConfig::max_depth.
    max_depth: Option<usize>,
    // Optional limit forwarded to TreeRunConfig::max_instances.
    max_instances: Option<usize>,
    // Trust-anchor label for compare-view rows; defaults to the RIR name.
    trust_anchor: Option<String>,
}
/// One-line CLI usage text returned for `--help`/`-h` and argument errors.
fn usage() -> &'static str {
    const USAGE_TEXT: &str = "Usage: replay_bundle_capture --rir <name> --out-dir <path> --tal-path <path> --ta-path <path> [--validation-time <rfc3339>] [--http-timeout-secs <n>] [--rsync-timeout-secs <n>] [--rsync-mirror-root <path>] [--max-depth <n>] [--max-instances <n>] [--trust-anchor <name>]";
    USAGE_TEXT
}
/// Parses `argv` (including the program name at index 0) into `Args`.
///
/// Defaults: `http_timeout_secs = 20`, `rsync_timeout_secs = 60`. Returns
/// `Err` carrying the usage text on `--help`/`-h`, unknown flags, flags
/// missing their value, unparsable numeric or RFC 3339 time values, and
/// missing required flags (`--rir`, `--out-dir`, `--tal-path`, `--ta-path`).
fn parse_args(argv: &[String]) -> Result<Args, String> {
    let mut args = Args {
        http_timeout_secs: 20,
        rsync_timeout_secs: 60,
        ..Args::default()
    };
    // Index 0 is the program name; flag scanning starts at 1.
    let mut i = 1usize;
    while i < argv.len() {
        match argv[i].as_str() {
            "--help" | "-h" => return Err(usage().to_string()),
            "--rir" => {
                i += 1;
                args.rir = Some(argv.get(i).ok_or("--rir requires a value")?.clone());
            }
            "--out-dir" => {
                i += 1;
                args.out_dir = Some(PathBuf::from(
                    argv.get(i).ok_or("--out-dir requires a value")?,
                ));
            }
            "--tal-path" => {
                i += 1;
                args.tal_path = Some(PathBuf::from(
                    argv.get(i).ok_or("--tal-path requires a value")?,
                ));
            }
            "--ta-path" => {
                i += 1;
                args.ta_path = Some(PathBuf::from(
                    argv.get(i).ok_or("--ta-path requires a value")?,
                ));
            }
            "--validation-time" => {
                i += 1;
                let value = argv.get(i).ok_or("--validation-time requires a value")?;
                args.validation_time = Some(
                    time::OffsetDateTime::parse(value, &Rfc3339)
                        .map_err(|e| format!("invalid --validation-time: {e}"))?,
                );
            }
            "--http-timeout-secs" => {
                i += 1;
                args.http_timeout_secs = argv
                    .get(i)
                    .ok_or("--http-timeout-secs requires a value")?
                    .parse()
                    .map_err(|e| format!("invalid --http-timeout-secs: {e}"))?;
            }
            "--rsync-timeout-secs" => {
                i += 1;
                args.rsync_timeout_secs = argv
                    .get(i)
                    .ok_or("--rsync-timeout-secs requires a value")?
                    .parse()
                    .map_err(|e| format!("invalid --rsync-timeout-secs: {e}"))?;
            }
            "--rsync-mirror-root" => {
                i += 1;
                args.rsync_mirror_root = Some(PathBuf::from(
                    argv.get(i).ok_or("--rsync-mirror-root requires a value")?,
                ));
            }
            "--max-depth" => {
                i += 1;
                args.max_depth = Some(
                    argv.get(i)
                        .ok_or("--max-depth requires a value")?
                        .parse()
                        .map_err(|e| format!("invalid --max-depth: {e}"))?,
                );
            }
            "--max-instances" => {
                i += 1;
                args.max_instances = Some(
                    argv.get(i)
                        .ok_or("--max-instances requires a value")?
                        .parse()
                        .map_err(|e| format!("invalid --max-instances: {e}"))?,
                );
            }
            "--trust-anchor" => {
                i += 1;
                args.trust_anchor = Some(
                    argv.get(i)
                        .ok_or("--trust-anchor requires a value")?
                        .clone(),
                );
            }
            other => return Err(format!("unknown argument: {other}\n{}", usage())),
        }
        i += 1;
    }
    // Required-flag validation happens after the scan so --help wins first.
    if args.rir.is_none() {
        return Err(format!("--rir is required\n{}", usage()));
    }
    if args.out_dir.is_none() {
        return Err(format!("--out-dir is required\n{}", usage()));
    }
    if args.tal_path.is_none() {
        return Err(format!("--tal-path is required\n{}", usage()));
    }
    if args.ta_path.is_none() {
        return Err(format!("--ta-path is required\n{}", usage()));
    }
    Ok(args)
}
/// Captures a live base bundle for one RIR: runs a live validation while
/// recording HTTP/rsync fetches, writes `base.ccr` plus compare-view CSVs
/// and replay inputs, self-replays from those recorded inputs to confirm
/// the capture is deterministic, then writes bundle metadata, verification
/// report, READMEs, and the bundle manifest. Returns the bundle root dir.
fn run(args: Args) -> Result<PathBuf, String> {
    // Safe: parse_args() guarantees the required flags are present.
    let rir = args.rir.as_ref().unwrap();
    let rir_normalized = rir.to_ascii_lowercase();
    let trust_anchor = args
        .trust_anchor
        .clone()
        .unwrap_or_else(|| rir_normalized.clone());
    let out_root = args.out_dir.as_ref().unwrap();
    let rir_dir = out_root.join(&rir_normalized);
    fs::create_dir_all(&rir_dir)
        .map_err(|e| format!("create rir dir failed: {}: {e}", rir_dir.display()))?;
    let tal_bytes =
        fs::read(args.tal_path.as_ref().unwrap()).map_err(|e| format!("read tal failed: {e}"))?;
    let ta_bytes =
        fs::read(args.ta_path.as_ref().unwrap()).map_err(|e| format!("read ta failed: {e}"))?;
    let validation_time = args
        .validation_time
        .unwrap_or_else(time::OffsetDateTime::now_utc);
    // Two throwaway databases: one for the live run, one for the self-replay.
    let db_dir = out_root.join(".tmp").join(format!("{rir}-live-base-db"));
    let replay_db_dir = out_root.join(".tmp").join(format!("{rir}-self-replay-db"));
    let _ = fs::remove_dir_all(&db_dir);
    let _ = fs::remove_dir_all(&replay_db_dir);
    if let Some(parent) = db_dir.parent() {
        fs::create_dir_all(parent)
            .map_err(|e| format!("create tmp dir failed: {}: {e}", parent.display()))?;
    }
    let store = RocksStore::open(&db_dir).map_err(|e| format!("open rocksdb failed: {e}"))?;
    // Recording wrappers capture every fetch so the run can be replayed offline.
    let http = RecordingHttpFetcher::new(
        BlockingHttpFetcher::new(HttpFetcherConfig {
            timeout: std::time::Duration::from_secs(args.http_timeout_secs),
            ..HttpFetcherConfig::default()
        })
        .map_err(|e| format!("create http fetcher failed: {e}"))?,
    );
    let rsync = RecordingRsyncFetcher::new(SystemRsyncFetcher::new(SystemRsyncConfig {
        timeout: std::time::Duration::from_secs(args.rsync_timeout_secs),
        mirror_root: args.rsync_mirror_root.clone(),
        ..SystemRsyncConfig::default()
    }));
    // Live base validation run, timed for the produce-timing report.
    let started = Instant::now();
    let out = run_tree_from_tal_and_ta_der_serial_audit(
        &store,
        &Policy::default(),
        &tal_bytes,
        &ta_bytes,
        None,
        &http,
        &rsync,
        validation_time,
        &TreeRunConfig {
            max_depth: args.max_depth,
            max_instances: args.max_instances,
            compact_audit: false,
            persist_vcir: true,
            build_ccr_accumulator: true,
        },
    )
    .map_err(|e| format!("live base run failed: {e}"))?;
    let duration = started.elapsed();
    // Build, write, re-read, decode, and verify base.ccr round-trip.
    let ccr = build_ccr_from_run(
        &store,
        &[out.discovery.trust_anchor.clone()],
        &out.tree.vrps,
        &out.tree.aspas,
        &out.tree.router_keys,
        validation_time,
    )
    .map_err(|e| format!("build ccr failed: {e}"))?;
    let base_ccr_path = rir_dir.join("base.ccr");
    write_ccr_file(&base_ccr_path, &ccr).map_err(|e| format!("write ccr failed: {e}"))?;
    let ccr_bytes = fs::read(&base_ccr_path)
        .map_err(|e| format!("read written ccr failed: {}: {e}", base_ccr_path.display()))?;
    let decoded = rpki::ccr::decode_content_info(&ccr_bytes)
        .map_err(|e| format!("decode written ccr failed: {e}"))?;
    let verify = verify_content_info(&decoded).map_err(|e| format!("verify ccr failed: {e}"))?;
    // Compare views derived from the run must equal those decoded from the CCR.
    let vrp_rows = build_vrp_compare_rows(&out.tree.vrps, &trust_anchor);
    let vap_rows = build_vap_compare_rows(&out.tree.aspas, &trust_anchor);
    let (ccr_vrps, ccr_vaps) = rpki::bundle::decode_ccr_compare_views(&decoded, &trust_anchor)?;
    if vrp_rows != ccr_vrps {
        return Err("base-vrps compare view does not match base.ccr".to_string());
    }
    if vap_rows != ccr_vaps {
        return Err("base-vaps compare view does not match base.ccr".to_string());
    }
    write_vrp_csv(&rir_dir.join("base-vrps.csv"), &vrp_rows)?;
    write_vap_csv(&rir_dir.join("base-vaps.csv"), &vap_rows)?;
    fs::write(rir_dir.join("tal.tal"), &tal_bytes).map_err(|e| format!("write tal failed: {e}"))?;
    fs::write(rir_dir.join("ta.cer"), &ta_bytes).map_err(|e| format!("write ta failed: {e}"))?;
    // Persist the recorded fetches as offline replay inputs.
    let capture = write_live_base_replay_bundle_inputs(
        &rir_dir,
        &rir_normalized,
        validation_time,
        &out.publication_points,
        &store,
        &http.snapshot_responses(),
        &rsync.snapshot_fetches(),
    )?;
    // Self-replay from the just-written inputs; views must match the live run.
    let replay_store = RocksStore::open(&replay_db_dir)
        .map_err(|e| format!("open self replay rocksdb failed: {e}"))?;
    let replay_out = run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
        &replay_store,
        &Policy::default(),
        &tal_bytes,
        &ta_bytes,
        None,
        &rir_dir.join("base-payload-archive"),
        &rir_dir.join("base-locks.json"),
        validation_time,
        &TreeRunConfig {
            max_depth: args.max_depth,
            max_instances: args.max_instances,
            compact_audit: false,
            persist_vcir: true,
            build_ccr_accumulator: true,
        },
    )
    .map_err(|e| format!("self replay failed: {e}"))?;
    let replay_vrps = build_vrp_compare_rows(&replay_out.tree.vrps, &trust_anchor);
    let replay_vaps = build_vap_compare_rows(&replay_out.tree.aspas, &trust_anchor);
    if replay_vrps != vrp_rows {
        return Err("self replay VRP compare view mismatch".to_string());
    }
    if replay_vaps != vap_rows {
        return Err("self replay VAP compare view mismatch".to_string());
    }
    fs::create_dir_all(rir_dir.join("timings"))
        .map_err(|e| format!("create timings dir failed: {e}"))?;
    write_timing_json(
        &rir_dir.join("timings").join("base-produce.json"),
        "base",
        &validation_time,
        duration,
    )?;
    // Bundle metadata + verification report + READMEs + top-level manifest.
    let metadata = RirBundleMetadata {
        schema_version: "20260330-v1".to_string(),
        bundle_producer: "ours".to_string(),
        rir: rir_normalized.clone(),
        base_validation_time: validation_time
            .format(&Rfc3339)
            .map_err(|e| format!("format validation time failed: {e}"))?,
        delta_validation_time: None,
        tal_sha256: sha256_hex(&tal_bytes),
        ta_cert_sha256: sha256_hex(&ta_bytes),
        base_ccr_sha256: sha256_hex(&ccr_bytes),
        delta_ccr_sha256: None,
        has_aspa: !vap_rows.is_empty(),
        has_router_key: verify.router_key_count > 0,
        base_vrp_count: vrp_rows.len(),
        base_vap_count: vap_rows.len(),
        delta_vrp_count: None,
        delta_vap_count: None,
    };
    write_json(&rir_dir.join("bundle.json"), &metadata)?;
    write_json(
        &rir_dir.join("verification.json"),
        &serde_json::json!({
            "base": {
                "validationTime": metadata.base_validation_time,
                "ccr": {
                    "path": "base.ccr",
                    "sha256": metadata.base_ccr_sha256,
                    "stateHashesOk": verify.state_hashes_ok,
                    "manifestInstances": verify.manifest_instances,
                    "roaVrpCount": verify.roa_vrp_count,
                    "aspaPayloadSets": verify.aspa_payload_sets,
                    "routerKeyCount": verify.router_key_count,
                },
                "compareViews": {
                    "vrpsSelfMatch": true,
                    "vapsSelfMatch": true,
                    "baseVrpCount": metadata.base_vrp_count,
                    "baseVapCount": metadata.base_vap_count,
                },
                "capture": {
                    "captureId": capture.capture_id,
                    "rrdpRepoCount": capture.rrdp_repo_count,
                    "rsyncModuleCount": capture.rsync_module_count,
                    "selfReplayOk": true,
                }
            }
        }),
    )?;
    write_live_bundle_top_readme(&out_root.join("README.md"), &rir_normalized)?;
    write_live_bundle_rir_readme(
        &rir_dir.join("README.md"),
        &rir_normalized,
        &metadata.base_validation_time,
    )?;
    write_json(
        &out_root.join("bundle-manifest.json"),
        &build_single_rir_bundle_manifest(
            "20260330-v1",
            "ours",
            &rir_normalized,
            &validation_time,
            None,
            metadata.has_aspa,
        )?,
    )?;
    // Best-effort cleanup of the scratch databases.
    let _ = fs::remove_dir_all(&db_dir);
    let _ = fs::remove_dir_all(&replay_db_dir);
    Ok(out_root.clone())
}
/// Entry point: parses argv, runs the capture, prints the bundle root path.
fn main() -> Result<(), String> {
    let argv: Vec<String> = std::env::args().collect();
    let out_root = run(parse_args(&argv)?)?;
    println!("{}", out_root.display());
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    // Parsing succeeds with only the four required flags and keeps the
    // documented timeout defaults (20s http, 60s rsync).
    #[test]
    fn parse_args_requires_required_flags() {
        let argv = vec![
            "replay_bundle_capture".to_string(),
            "--rir".to_string(),
            "apnic".to_string(),
            "--out-dir".to_string(),
            "out".to_string(),
            "--tal-path".to_string(),
            "tal".to_string(),
            "--ta-path".to_string(),
            "ta".to_string(),
        ];
        let args = parse_args(&argv).expect("parse");
        assert_eq!(args.rir.as_deref(), Some("apnic"));
        assert_eq!(args.out_dir.as_deref(), Some(std::path::Path::new("out")));
        assert_eq!(args.http_timeout_secs, 20);
        assert_eq!(args.rsync_timeout_secs, 60);
    }
    // With no flags at all, the first missing-required error (--rir) is reported.
    #[test]
    fn parse_args_rejects_missing_requireds() {
        let err = parse_args(&["replay_bundle_capture".to_string()]).unwrap_err();
        assert!(err.contains("--rir is required"), "{err}");
    }
    // write_timing_json round-trip: mode and duration (seconds) land in the JSON.
    #[test]
    fn write_timing_json_writes_duration_and_mode() {
        let td = tempdir().expect("tempdir");
        let path = td.path().join("timings/base-produce.json");
        write_timing_json(
            &path,
            "base",
            &time::OffsetDateTime::parse("2026-03-30T00:00:00Z", &Rfc3339).expect("time"),
            std::time::Duration::from_millis(1500),
        )
        .expect("write timing");
        let json: serde_json::Value =
            serde_json::from_slice(&std::fs::read(&path).expect("read timing")).expect("parse");
        assert_eq!(json["mode"], "base");
        assert_eq!(json["durationSeconds"], 1.5);
    }
}

View File

@ -1,492 +0,0 @@
use rpki::bundle::{
RecordingHttpFetcher, RecordingRsyncFetcher, build_single_rir_bundle_manifest,
build_vap_compare_rows, build_vrp_compare_rows, copy_dir_all, load_validation_time, sha256_hex,
write_json, write_live_delta_replay_bundle_inputs, write_vap_csv, write_vrp_csv,
};
use rpki::ccr::{build_ccr_from_run, decode_content_info, verify_content_info, write_ccr_file};
use rpki::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use rpki::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher};
use rpki::policy::Policy;
use rpki::storage::RocksStore;
use rpki::sync::rrdp::Fetcher;
use rpki::validation::run_tree_from_tal::{
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_serial_audit,
};
use rpki::validation::tree::TreeRunConfig;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Instant;
use time::format_description::well_known::Rfc3339;
/// Parsed command-line arguments for `replay_bundle_capture_delta`.
#[derive(Debug, Default, PartialEq, Eq)]
struct Args {
    // RIR name (required); lowercased in run() before use.
    rir: Option<String>,
    // Existing base bundle root to extend with a delta (required).
    base_bundle_dir: Option<PathBuf>,
    // Output bundle root directory (required); replaced if it already exists.
    out_dir: Option<PathBuf>,
    // Target validation time override; run() defaults to "now" when absent.
    validation_time: Option<time::OffsetDateTime>,
    // HTTP fetch timeout in seconds (parse_args defaults this to 20).
    http_timeout_secs: u64,
    // rsync fetch timeout in seconds (parse_args defaults this to 60).
    rsync_timeout_secs: u64,
    // Optional local rsync mirror root forwarded to the rsync fetcher config.
    rsync_mirror_root: Option<PathBuf>,
    // Optional limit forwarded to TreeRunConfig::max_depth.
    max_depth: Option<usize>,
    // Optional limit forwarded to TreeRunConfig::max_instances.
    max_instances: Option<usize>,
    // Trust-anchor label for compare-view rows; defaults to the RIR name.
    trust_anchor: Option<String>,
}
/// One-line CLI usage text returned for `--help`/`-h` and argument errors.
fn usage() -> &'static str {
    const USAGE_TEXT: &str = "Usage: replay_bundle_capture_delta --rir <name> --base-bundle-dir <path> --out-dir <path> [--validation-time <rfc3339>] [--http-timeout-secs <n>] [--rsync-timeout-secs <n>] [--rsync-mirror-root <path>] [--max-depth <n>] [--max-instances <n>] [--trust-anchor <name>]";
    USAGE_TEXT
}
/// Parses `argv` (including the program name at index 0) into `Args`.
///
/// Defaults: `http_timeout_secs = 20`, `rsync_timeout_secs = 60`. Returns
/// `Err` carrying the usage text on `--help`/`-h`, unknown flags, flags
/// missing their value, unparsable numeric or RFC 3339 time values, and
/// missing required flags (`--rir`, `--base-bundle-dir`, `--out-dir`).
fn parse_args(argv: &[String]) -> Result<Args, String> {
    let mut args = Args {
        http_timeout_secs: 20,
        rsync_timeout_secs: 60,
        ..Args::default()
    };
    // Index 0 is the program name; flag scanning starts at 1.
    let mut i = 1usize;
    while i < argv.len() {
        match argv[i].as_str() {
            "--help" | "-h" => return Err(usage().to_string()),
            "--rir" => {
                i += 1;
                args.rir = Some(argv.get(i).ok_or("--rir requires a value")?.clone());
            }
            "--base-bundle-dir" => {
                i += 1;
                args.base_bundle_dir = Some(PathBuf::from(
                    argv.get(i).ok_or("--base-bundle-dir requires a value")?,
                ));
            }
            "--out-dir" => {
                i += 1;
                args.out_dir = Some(PathBuf::from(
                    argv.get(i).ok_or("--out-dir requires a value")?,
                ));
            }
            "--validation-time" => {
                i += 1;
                let value = argv.get(i).ok_or("--validation-time requires a value")?;
                args.validation_time = Some(
                    time::OffsetDateTime::parse(value, &Rfc3339)
                        .map_err(|e| format!("invalid --validation-time: {e}"))?,
                );
            }
            "--http-timeout-secs" => {
                i += 1;
                args.http_timeout_secs = argv
                    .get(i)
                    .ok_or("--http-timeout-secs requires a value")?
                    .parse()
                    .map_err(|e| format!("invalid --http-timeout-secs: {e}"))?;
            }
            "--rsync-timeout-secs" => {
                i += 1;
                args.rsync_timeout_secs = argv
                    .get(i)
                    .ok_or("--rsync-timeout-secs requires a value")?
                    .parse()
                    .map_err(|e| format!("invalid --rsync-timeout-secs: {e}"))?;
            }
            "--rsync-mirror-root" => {
                i += 1;
                args.rsync_mirror_root = Some(PathBuf::from(
                    argv.get(i).ok_or("--rsync-mirror-root requires a value")?,
                ));
            }
            "--max-depth" => {
                i += 1;
                args.max_depth = Some(
                    argv.get(i)
                        .ok_or("--max-depth requires a value")?
                        .parse()
                        .map_err(|e| format!("invalid --max-depth: {e}"))?,
                );
            }
            "--max-instances" => {
                i += 1;
                args.max_instances = Some(
                    argv.get(i)
                        .ok_or("--max-instances requires a value")?
                        .parse()
                        .map_err(|e| format!("invalid --max-instances: {e}"))?,
                );
            }
            "--trust-anchor" => {
                i += 1;
                args.trust_anchor = Some(
                    argv.get(i)
                        .ok_or("--trust-anchor requires a value")?
                        .clone(),
                );
            }
            other => return Err(format!("unknown argument: {other}\n{}", usage())),
        }
        i += 1;
    }
    // Required-flag validation happens after the scan so --help wins first.
    if args.rir.is_none() {
        return Err(format!("--rir is required\n{}", usage()));
    }
    if args.base_bundle_dir.is_none() {
        return Err(format!("--base-bundle-dir is required\n{}", usage()));
    }
    if args.out_dir.is_none() {
        return Err(format!("--out-dir is required\n{}", usage()));
    }
    Ok(args)
}
/// Ensures the recording HTTP fetcher has a snapshot response for every RRDP
/// repository that advanced since the base bundle: for each entry in the base
/// bundle's `base-locks.json` `rrdp` map whose store record shows the same
/// session at a strictly higher serial, the last snapshot URI is fetched
/// (and thereby recorded) unless a response for it was already captured.
/// Entries with missing/odd fields are silently skipped; only store-read and
/// fetch failures are reported as errors.
fn ensure_recorded_target_snapshots(
    store: &RocksStore,
    base_bundle_dir: &Path,
    http: &RecordingHttpFetcher<BlockingHttpFetcher>,
) -> Result<(), String> {
    let base_locks: serde_json::Value = serde_json::from_slice(
        &fs::read(base_bundle_dir.join("base-locks.json"))
            .map_err(|e| format!("read base locks failed: {e}"))?,
    )
    .map_err(|e| format!("parse base locks failed: {e}"))?;
    // Missing or non-object "rrdp" key degrades to an empty map (no-op).
    let base_rrdp = base_locks
        .get("rrdp")
        .and_then(|v| v.as_object())
        .cloned()
        .unwrap_or_default();
    for (notify_uri, base_lock) in base_rrdp {
        // Only rrdp-transport locks are considered.
        let Some(base_transport) = base_lock.get("transport").and_then(|v| v.as_str()) else {
            continue;
        };
        if base_transport != "rrdp" {
            continue;
        }
        let Some(base_session) = base_lock.get("session").and_then(|v| v.as_str()) else {
            continue;
        };
        let Some(base_serial) = base_lock.get("serial").and_then(|v| v.as_u64()) else {
            continue;
        };
        let Some(record) = store
            .get_rrdp_source_record(&notify_uri)
            .map_err(|e| format!("read rrdp source record failed for {notify_uri}: {e}"))?
        else {
            continue;
        };
        let Some(target_session) = record.last_session_id.as_deref() else {
            continue;
        };
        let Some(target_serial) = record.last_serial else {
            continue;
        };
        // Only refetch when the repo moved forward within the same session.
        if target_session != base_session || target_serial <= base_serial {
            continue;
        }
        let Some(snapshot_uri) = record.last_snapshot_uri.as_deref() else {
            continue;
        };
        // Skip if a response for this snapshot was already recorded.
        if http.snapshot_responses().contains_key(snapshot_uri) {
            continue;
        }
        // The fetch is performed for its recording side effect; the body is
        // discarded, but a fetch error still aborts with context.
        let _ = http
            .fetch(snapshot_uri)
            .map_err(|e| format!("fetch target snapshot for {notify_uri} failed: {e}"))?;
    }
    Ok(())
}
/// Produces a delta bundle on top of an existing base bundle: copies the base
/// bundle to `--out-dir`, bootstraps state by replaying the base inputs, runs
/// a live target validation while recording fetches, writes `delta.ccr` plus
/// delta compare-view CSVs and delta replay inputs, self-replays the delta to
/// confirm determinism, then updates `bundle.json`, `verification.json`, and
/// the bundle manifest. Returns the output bundle root.
fn run(args: Args) -> Result<PathBuf, String> {
    // Safe: parse_args() guarantees the required flags are present.
    let rir = args.rir.as_ref().unwrap();
    let rir_normalized = rir.to_ascii_lowercase();
    let out_root = args.out_dir.as_ref().unwrap();
    let base_root = args.base_bundle_dir.as_ref().unwrap();
    let base_rir_dir = base_root.join(&rir_normalized);
    if !base_rir_dir.is_dir() {
        return Err(format!(
            "base bundle rir dir not found: {}",
            base_rir_dir.display()
        ));
    }
    // The output is a full copy of the base bundle, replaced if it exists.
    if out_root.exists() {
        fs::remove_dir_all(out_root)
            .map_err(|e| format!("remove old out dir failed: {}: {e}", out_root.display()))?;
    }
    copy_dir_all(base_root, out_root)?;
    let rir_dir = out_root.join(&rir_normalized);
    let trust_anchor = args
        .trust_anchor
        .clone()
        .unwrap_or_else(|| rir_normalized.clone());
    let tal_bytes = fs::read(rir_dir.join("tal.tal"))
        .map_err(|e| format!("read tal from base bundle failed: {e}"))?;
    let ta_bytes = fs::read(rir_dir.join("ta.cer"))
        .map_err(|e| format!("read ta from base bundle failed: {e}"))?;
    let base_validation_time = load_validation_time(&rir_dir.join("base-locks.json"))?;
    let target_validation_time = args
        .validation_time
        .unwrap_or_else(time::OffsetDateTime::now_utc);
    // Two throwaway databases: live target run and delta self-replay.
    let target_store_dir = out_root.join(".tmp").join(format!("{rir}-live-target-db"));
    let self_replay_dir = out_root.join(".tmp").join(format!("{rir}-self-delta-db"));
    let _ = fs::remove_dir_all(&target_store_dir);
    let _ = fs::remove_dir_all(&self_replay_dir);
    if let Some(parent) = target_store_dir.parent() {
        fs::create_dir_all(parent)
            .map_err(|e| format!("create tmp dir failed: {}: {e}", parent.display()))?;
    }
    let target_store = RocksStore::open(&target_store_dir)
        .map_err(|e| format!("open target rocksdb failed: {e}"))?;
    // Bootstrap the store to the base state by replaying the base inputs;
    // the output itself is not needed, only the resulting store state.
    let _base = run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
        &target_store,
        &Policy::default(),
        &tal_bytes,
        &ta_bytes,
        None,
        &rir_dir.join("base-payload-archive"),
        &rir_dir.join("base-locks.json"),
        base_validation_time,
        &TreeRunConfig {
            max_depth: args.max_depth,
            max_instances: args.max_instances,
            compact_audit: false,
            persist_vcir: true,
            build_ccr_accumulator: true,
        },
    )
    .map_err(|e| format!("base bootstrap replay failed: {e}"))?;
    // Recording wrappers capture every fetch so the delta can be replayed offline.
    let http = RecordingHttpFetcher::new(
        BlockingHttpFetcher::new(HttpFetcherConfig {
            timeout: std::time::Duration::from_secs(args.http_timeout_secs),
            ..HttpFetcherConfig::default()
        })
        .map_err(|e| format!("create http fetcher failed: {e}"))?,
    );
    let rsync = RecordingRsyncFetcher::new(SystemRsyncFetcher::new(SystemRsyncConfig {
        timeout: std::time::Duration::from_secs(args.rsync_timeout_secs),
        mirror_root: args.rsync_mirror_root.clone(),
        ..SystemRsyncConfig::default()
    }));
    // Live target validation run, timed for the produce-timing report.
    let started = Instant::now();
    let target_out = run_tree_from_tal_and_ta_der_serial_audit(
        &target_store,
        &Policy::default(),
        &tal_bytes,
        &ta_bytes,
        None,
        &http,
        &rsync,
        target_validation_time,
        &TreeRunConfig {
            max_depth: args.max_depth,
            max_instances: args.max_instances,
            compact_audit: false,
            persist_vcir: true,
            build_ccr_accumulator: true,
        },
    )
    .map_err(|e| format!("live target run failed: {e}"))?;
    let duration = started.elapsed();
    // Make sure snapshots for repos that advanced are in the recording.
    ensure_recorded_target_snapshots(&target_store, &rir_dir, &http)?;
    // Build, write, re-read, decode, and verify delta.ccr round-trip.
    let delta_ccr = build_ccr_from_run(
        &target_store,
        &[target_out.discovery.trust_anchor.clone()],
        &target_out.tree.vrps,
        &target_out.tree.aspas,
        &target_out.tree.router_keys,
        target_validation_time,
    )
    .map_err(|e| format!("build delta ccr failed: {e}"))?;
    let delta_ccr_path = rir_dir.join("delta.ccr");
    write_ccr_file(&delta_ccr_path, &delta_ccr)
        .map_err(|e| format!("write delta ccr failed: {e}"))?;
    let delta_ccr_bytes = fs::read(&delta_ccr_path)
        .map_err(|e| format!("read delta ccr failed: {}: {e}", delta_ccr_path.display()))?;
    let delta_decoded = decode_content_info(&delta_ccr_bytes)
        .map_err(|e| format!("decode delta ccr failed: {e}"))?;
    let delta_verify =
        verify_content_info(&delta_decoded).map_err(|e| format!("verify delta ccr failed: {e}"))?;
    // Compare views derived from the run must equal those decoded from the CCR.
    let delta_vrp_rows = build_vrp_compare_rows(&target_out.tree.vrps, &trust_anchor);
    let delta_vap_rows = build_vap_compare_rows(&target_out.tree.aspas, &trust_anchor);
    let (ccr_vrps, ccr_vaps) =
        rpki::bundle::decode_ccr_compare_views(&delta_decoded, &trust_anchor)?;
    if delta_vrp_rows != ccr_vrps {
        return Err("record-delta.csv compare view does not match delta.ccr".to_string());
    }
    if delta_vap_rows != ccr_vaps {
        return Err("record-delta-vaps.csv compare view does not match delta.ccr".to_string());
    }
    write_vrp_csv(&rir_dir.join("record-delta.csv"), &delta_vrp_rows)?;
    write_vap_csv(&rir_dir.join("record-delta-vaps.csv"), &delta_vap_rows)?;
    // Persist the recorded fetches as offline delta replay inputs.
    let capture = write_live_delta_replay_bundle_inputs(
        &rir_dir,
        &rir_normalized,
        target_validation_time,
        &target_out.publication_points,
        &target_store,
        &http.snapshot_responses(),
        &rsync.snapshot_fetches(),
    )?;
    // Self-replay base + delta from the written inputs; views must match.
    let self_store = RocksStore::open(&self_replay_dir)
        .map_err(|e| format!("open self replay db failed: {e}"))?;
    let replay_out = run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit(
        &self_store,
        &Policy::default(),
        &tal_bytes,
        &ta_bytes,
        None,
        &rir_dir.join("base-payload-archive"),
        &rir_dir.join("base-locks.json"),
        &rir_dir.join("payload-delta-archive"),
        &rir_dir.join("locks-delta.json"),
        base_validation_time,
        target_validation_time,
        &TreeRunConfig {
            max_depth: args.max_depth,
            max_instances: args.max_instances,
            compact_audit: false,
            persist_vcir: true,
            build_ccr_accumulator: true,
        },
    )
    .map_err(|e| format!("self delta replay failed: {e}"))?;
    let replay_vrps = build_vrp_compare_rows(&replay_out.tree.vrps, &trust_anchor);
    let replay_vaps = build_vap_compare_rows(&replay_out.tree.aspas, &trust_anchor);
    if replay_vrps != delta_vrp_rows {
        return Err("self delta replay VRP compare view mismatch".to_string());
    }
    if replay_vaps != delta_vap_rows {
        return Err("self delta replay VAP compare view mismatch".to_string());
    }
    fs::create_dir_all(rir_dir.join("timings"))
        .map_err(|e| format!("create timings dir failed: {e}"))?;
    write_json(
        &rir_dir.join("timings").join("delta-produce.json"),
        &serde_json::json!({
            "mode": "delta",
            "validationTime": target_validation_time
                .format(&Rfc3339)
                .map_err(|e| format!("format validation time failed: {e}"))?,
            "durationSeconds": duration.as_secs_f64(),
        }),
    )?;
    // Patch the copied base bundle.json with the delta fields in place.
    let mut bundle_json: serde_json::Value = serde_json::from_slice(
        &fs::read(rir_dir.join("bundle.json"))
            .map_err(|e| format!("read base bundle.json failed: {e}"))?,
    )
    .map_err(|e| format!("parse base bundle.json failed: {e}"))?;
    bundle_json["deltaValidationTime"] = serde_json::Value::String(
        target_validation_time
            .format(&Rfc3339)
            .map_err(|e| format!("format delta validation time failed: {e}"))?,
    );
    bundle_json["deltaCcrSha256"] = serde_json::Value::String(sha256_hex(&delta_ccr_bytes));
    bundle_json["deltaVrpCount"] = serde_json::Value::from(delta_vrp_rows.len() as u64);
    bundle_json["deltaVapCount"] = serde_json::Value::from(delta_vap_rows.len() as u64);
    // hasAspa / hasRouterKey are sticky: true if the base already had them
    // or the delta introduces them.
    bundle_json["hasAspa"] = serde_json::Value::Bool(
        bundle_json
            .get("hasAspa")
            .and_then(|v| v.as_bool())
            .unwrap_or(false)
            || !delta_vap_rows.is_empty(),
    );
    bundle_json["hasRouterKey"] = serde_json::Value::Bool(
        bundle_json
            .get("hasRouterKey")
            .and_then(|v| v.as_bool())
            .unwrap_or(false)
            || delta_verify.router_key_count > 0,
    );
    write_json(&rir_dir.join("bundle.json"), &bundle_json)?;
    // Append the delta section to the copied verification.json.
    let mut verification_json: serde_json::Value = serde_json::from_slice(
        &fs::read(rir_dir.join("verification.json"))
            .map_err(|e| format!("read base verification.json failed: {e}"))?,
    )
    .map_err(|e| format!("parse base verification.json failed: {e}"))?;
    verification_json["delta"] = serde_json::json!({
        "validationTime": target_validation_time
            .format(&Rfc3339)
            .map_err(|e| format!("format delta validation time failed: {e}"))?,
        "ccr": {
            "path": "delta.ccr",
            "sha256": sha256_hex(&delta_ccr_bytes),
            "stateHashesOk": delta_verify.state_hashes_ok,
            "manifestInstances": delta_verify.manifest_instances,
            "roaVrpCount": delta_verify.roa_vrp_count,
            "aspaPayloadSets": delta_verify.aspa_payload_sets,
            "routerKeyCount": delta_verify.router_key_count,
        },
        "compareViews": {
            "vrpsSelfMatch": true,
            "vapsSelfMatch": true,
            "deltaVrpCount": delta_vrp_rows.len(),
            "deltaVapCount": delta_vap_rows.len(),
        },
        "capture": {
            "captureId": capture.capture_id,
            "rrdpRepoCount": capture.rrdp_repo_count,
            "rsyncModuleCount": capture.rsync_module_count,
            "selfReplayOk": true,
        }
    });
    write_json(&rir_dir.join("verification.json"), &verification_json)?;
    let bundle_manifest = build_single_rir_bundle_manifest(
        "20260330-v1",
        "ours",
        &rir_normalized,
        &base_validation_time,
        Some(&target_validation_time),
        bundle_json["hasAspa"].as_bool().unwrap_or(false),
    )?;
    write_json(&out_root.join("bundle-manifest.json"), &bundle_manifest)?;
    // Best-effort cleanup of the scratch databases.
    let _ = fs::remove_dir_all(&target_store_dir);
    let _ = fs::remove_dir_all(&self_replay_dir);
    Ok(out_root.clone())
}
/// Entry point: parses argv, produces the delta bundle, prints its root path.
fn main() -> Result<(), String> {
    let argv: Vec<String> = std::env::args().collect();
    let bundle_root = run(parse_args(&argv)?)?;
    println!("{}", bundle_root.display());
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    // Happy path: all three required flags are present and each value lands
    // in the corresponding `Args` field.
    #[test]
    fn parse_args_requires_required_flags() {
        let argv = vec![
            "replay_bundle_capture_delta".to_string(),
            "--rir".to_string(),
            "apnic".to_string(),
            "--base-bundle-dir".to_string(),
            "base".to_string(),
            "--out-dir".to_string(),
            "out".to_string(),
        ];
        let args = parse_args(&argv).expect("parse");
        assert_eq!(args.rir.as_deref(), Some("apnic"));
        assert_eq!(args.base_bundle_dir.as_deref(), Some(Path::new("base")));
        assert_eq!(args.out_dir.as_deref(), Some(Path::new("out")));
    }
    // With no flags at all, the error must name the first missing required
    // flag (`--rir`) so the operator knows what to add.
    #[test]
    fn parse_args_rejects_missing_requireds() {
        let err = parse_args(&["replay_bundle_capture_delta".to_string()]).unwrap_err();
        assert!(err.contains("--rir is required"), "{err}");
    }
}

File diff suppressed because it is too large Load Diff

View File

@ -1,833 +0,0 @@
use rpki::bundle::{
BundleManifest, BundleManifestEntry, RirBundleMetadata, build_vap_compare_rows,
build_vrp_compare_rows, decode_ccr_compare_views, write_vap_csv, write_vrp_csv,
};
use rpki::ccr::{build_ccr_from_run, decode_content_info, verify_content_info, write_ccr_file};
use rpki::policy::Policy;
use rpki::storage::RocksStore;
use rpki::validation::run_tree_from_tal::{
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
};
use rpki::validation::tree::TreeRunConfig;
use sha2::Digest;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Instant;
use time::format_description::well_known::Rfc3339;
/// Parsed command-line options for `replay_bundle_record`.
///
/// Every field is an `Option` because flags may appear in any order;
/// which flags are mandatory is enforced after parsing (see `parse_args`).
#[derive(Debug, Default, PartialEq, Eq)]
struct Args {
    // RIR name, e.g. "apnic" (required; lowercased by `run`).
    rir: Option<String>,
    // Root directory the bundle is written into (required).
    out_dir: Option<PathBuf>,
    // TAL file used as the replay input (required).
    tal_path: Option<PathBuf>,
    // Trust-anchor certificate (DER) used as the replay input (required).
    ta_path: Option<PathBuf>,
    // Base payload replay archive directory (required).
    payload_replay_archive: Option<PathBuf>,
    // Base replay locks JSON (required); supplies validationTime if
    // `--validation-time` is not given.
    payload_replay_locks: Option<PathBuf>,
    // Optional delta replay archive; delta processing runs only when both
    // delta archive and delta locks are provided.
    payload_delta_archive: Option<PathBuf>,
    // Optional delta locks JSON; supplies the delta validation time.
    payload_delta_locks: Option<PathBuf>,
    // Explicit base validation time override (RFC 3339).
    validation_time: Option<time::OffsetDateTime>,
    // Optional cap on validation tree depth.
    max_depth: Option<usize>,
    // Optional cap on manifest instances processed.
    max_instances: Option<usize>,
    // Trust-anchor label for compare views; defaults to the lowercased RIR.
    trust_anchor: Option<String>,
}
/// One-line usage text shown for `--help` and on argument errors.
fn usage() -> &'static str {
    concat!(
        "Usage: replay_bundle_record --rir <name> --out-dir <path> ",
        "--tal-path <path> --ta-path <path> ",
        "--payload-replay-archive <path> --payload-replay-locks <path> ",
        "[--payload-delta-archive <path> --payload-delta-locks <path>] ",
        "[--validation-time <rfc3339>] [--max-depth <n>] ",
        "[--max-instances <n>] [--trust-anchor <name>]"
    )
}
/// Parse `argv` (program name at index 0) into `Args`.
///
/// Returns `Err` carrying the usage text for `--help`, unknown flags,
/// flags missing their value, unparsable values, and missing required
/// flags (reported in a fixed order so error text is stable).
fn parse_args(argv: &[String]) -> Result<Args, String> {
    // Advance the cursor and return the value following `flag`.
    fn value<'a>(argv: &'a [String], i: &mut usize, flag: &str) -> Result<&'a str, String> {
        *i += 1;
        argv.get(*i)
            .map(String::as_str)
            .ok_or_else(|| format!("{flag} requires a value"))
    }
    let mut out = Args::default();
    let mut i = 1usize;
    while i < argv.len() {
        let flag = argv[i].as_str();
        match flag {
            "--help" | "-h" => return Err(usage().to_string()),
            "--rir" => out.rir = Some(value(argv, &mut i, flag)?.to_string()),
            "--out-dir" => out.out_dir = Some(PathBuf::from(value(argv, &mut i, flag)?)),
            "--tal-path" => out.tal_path = Some(PathBuf::from(value(argv, &mut i, flag)?)),
            "--ta-path" => out.ta_path = Some(PathBuf::from(value(argv, &mut i, flag)?)),
            "--payload-replay-archive" => {
                out.payload_replay_archive = Some(PathBuf::from(value(argv, &mut i, flag)?))
            }
            "--payload-replay-locks" => {
                out.payload_replay_locks = Some(PathBuf::from(value(argv, &mut i, flag)?))
            }
            "--payload-delta-archive" => {
                out.payload_delta_archive = Some(PathBuf::from(value(argv, &mut i, flag)?))
            }
            "--payload-delta-locks" => {
                out.payload_delta_locks = Some(PathBuf::from(value(argv, &mut i, flag)?))
            }
            "--validation-time" => {
                let raw = value(argv, &mut i, flag)?;
                out.validation_time = Some(
                    time::OffsetDateTime::parse(raw, &Rfc3339)
                        .map_err(|e| format!("invalid --validation-time: {e}"))?,
                );
            }
            "--max-depth" => {
                out.max_depth = Some(
                    value(argv, &mut i, flag)?
                        .parse()
                        .map_err(|e| format!("invalid --max-depth: {e}"))?,
                );
            }
            "--max-instances" => {
                out.max_instances = Some(
                    value(argv, &mut i, flag)?
                        .parse()
                        .map_err(|e| format!("invalid --max-instances: {e}"))?,
                );
            }
            "--trust-anchor" => {
                out.trust_anchor = Some(value(argv, &mut i, flag)?.to_string())
            }
            other => return Err(format!("unknown argument: {other}\n{}", usage())),
        }
        i += 1;
    }
    // Required flags, checked in the original declaration order.
    for (missing, flag) in [
        (out.rir.is_none(), "--rir"),
        (out.out_dir.is_none(), "--out-dir"),
        (out.tal_path.is_none(), "--tal-path"),
        (out.ta_path.is_none(), "--ta-path"),
        (out.payload_replay_archive.is_none(), "--payload-replay-archive"),
        (out.payload_replay_locks.is_none(), "--payload-replay-locks"),
    ] {
        if missing {
            return Err(format!("{flag} is required\n{}", usage()));
        }
    }
    Ok(out)
}
/// Read `validationTime` (falling back to snake_case `validation_time`)
/// from the locks JSON at `path` and parse it as RFC 3339.
fn load_validation_time(path: &Path) -> Result<time::OffsetDateTime, String> {
    let bytes =
        fs::read(path).map_err(|e| format!("read locks failed: {}: {e}", path.display()))?;
    let json: serde_json::Value = serde_json::from_slice(&bytes)
        .map_err(|e| format!("parse locks failed: {}: {e}", path.display()))?;
    let raw = ["validationTime", "validation_time"]
        .iter()
        .find_map(|key| json.get(key).and_then(|v| v.as_str()))
        .ok_or_else(|| format!("validationTime missing in {}", path.display()))?;
    time::OffsetDateTime::parse(raw, &Rfc3339)
        .map_err(|e| format!("invalid validationTime in {}: {e}", path.display()))
}
/// Lowercase hex encoding of the SHA-256 digest of `bytes`.
fn sha256_hex(bytes: &[u8]) -> String {
    let digest = sha2::Sha256::digest(bytes);
    hex::encode(digest)
}
/// Recursively copy the directory tree at `src` into `dst`, creating
/// directories as needed. Entries that are neither directories nor
/// regular files (e.g. symlinks) are skipped.
fn copy_dir_all(src: &Path, dst: &Path) -> Result<(), String> {
    fs::create_dir_all(dst)
        .map_err(|e| format!("create directory failed: {}: {e}", dst.display()))?;
    let entries =
        fs::read_dir(src).map_err(|e| format!("read_dir failed: {}: {e}", src.display()))?;
    for maybe_entry in entries {
        let entry =
            maybe_entry.map_err(|e| format!("read_dir entry failed: {}: {e}", src.display()))?;
        let kind = entry
            .file_type()
            .map_err(|e| format!("file_type failed: {}: {e}", entry.path().display()))?;
        let target = dst.join(entry.file_name());
        if kind.is_dir() {
            copy_dir_all(&entry.path(), &target)?;
            continue;
        }
        if !kind.is_file() {
            continue;
        }
        if let Some(parent) = target.parent() {
            fs::create_dir_all(parent)
                .map_err(|e| format!("create parent failed: {}: {e}", parent.display()))?;
        }
        fs::copy(entry.path(), &target).map_err(|e| {
            format!(
                "copy failed: {} -> {}: {e}",
                entry.path().display(),
                target.display()
            )
        })?;
    }
    Ok(())
}
/// Serialize `value` as pretty-printed JSON into `path`, creating any
/// missing parent directories first.
fn write_json(path: &Path, value: &impl serde::Serialize) -> Result<(), String> {
    match path.parent() {
        Some(parent) => fs::create_dir_all(parent)
            .map_err(|e| format!("create parent failed: {}: {e}", parent.display()))?,
        None => {}
    }
    let rendered = serde_json::to_vec_pretty(value).map_err(|e| e.to_string())?;
    fs::write(path, rendered)
        .map_err(|e| format!("write json failed: {}: {e}", path.display()))
}
/// Write the run-level README describing the single-RIR replay bundle.
fn write_top_readme(path: &Path, rir: &str) -> Result<(), String> {
    let body = format!(
        "# Ours Replay Bundle\n\nThis run contains one per-RIR bundle generated by `ours`.\n\n- RIR: `{rir}`\n- Reference result format: `CCR`\n"
    );
    fs::write(path, body).map_err(|e| format!("write readme failed: {}: {e}", path.display()))
}
/// Write the per-RIR README documenting the replay inputs and the
/// authoritative base CCR / compare-view files.
fn write_rir_readme(path: &Path, rir: &str, base_validation_time: &str) -> Result<(), String> {
    let body = format!(
        "# {rir} replay bundle\n\n- `tal.tal` and `ta.cer` are the direct replay inputs.\n- `base-locks.json.validationTime` = `{base_validation_time}`.\n- `base.ccr` is the authoritative reference result.\n- `base-vrps.csv` and `base-vaps.csv` are compare views derived from `base.ccr`.\n"
    );
    fs::write(path, body)
        .map_err(|e| format!("write rir readme failed: {}: {e}", path.display()))
}
fn write_timing_json(
path: &Path,
mode: &str,
validation_time: &time::OffsetDateTime,
duration: std::time::Duration,
) -> Result<(), String> {
write_json(
path,
&serde_json::json!({
"mode": mode,
"validationTime": validation_time
.format(&Rfc3339)
.map_err(|e| format!("format validation time failed: {e}"))?,
"durationSeconds": duration.as_secs_f64(),
}),
)
}
fn rewrite_delta_base_locks_sha(
delta_root: &Path,
emitted_base_locks_sha256: &str,
) -> Result<(), String> {
let delta_locks = delta_root.join("locks-delta.json");
if delta_locks.is_file() {
let mut json: serde_json::Value = serde_json::from_slice(
&fs::read(&delta_locks)
.map_err(|e| format!("read delta locks failed: {}: {e}", delta_locks.display()))?,
)
.map_err(|e| format!("parse delta locks failed: {}: {e}", delta_locks.display()))?;
json.as_object_mut()
.ok_or_else(|| format!("delta locks must be object: {}", delta_locks.display()))?
.insert(
"baseLocksSha256".to_string(),
serde_json::Value::String(emitted_base_locks_sha256.to_string()),
);
write_json(&delta_locks, &json)?;
}
let archive_root = delta_root.join("payload-delta-archive");
if archive_root.is_dir() {
for path in walk_json_files_named(&archive_root, "base.json")? {
let mut json: serde_json::Value = serde_json::from_slice(
&fs::read(&path)
.map_err(|e| format!("read base.json failed: {}: {e}", path.display()))?,
)
.map_err(|e| format!("parse base.json failed: {}: {e}", path.display()))?;
json.as_object_mut()
.ok_or_else(|| format!("base.json must be object: {}", path.display()))?
.insert(
"baseLocksSha256".to_string(),
serde_json::Value::String(emitted_base_locks_sha256.to_string()),
);
write_json(&path, &json)?;
}
}
Ok(())
}
/// Collect every file named exactly `name` under `root`, recursively.
/// A missing or non-directory `root` yields an empty list, not an error.
fn walk_json_files_named(root: &Path, name: &str) -> Result<Vec<PathBuf>, String> {
    let mut found = Vec::new();
    if !root.is_dir() {
        return Ok(found);
    }
    // Explicit stack instead of recursion; traversal order is unspecified.
    let mut pending = vec![root.to_path_buf()];
    while let Some(dir) = pending.pop() {
        let entries =
            fs::read_dir(&dir).map_err(|e| format!("read_dir failed: {}: {e}", dir.display()))?;
        for maybe_entry in entries {
            let entry = maybe_entry
                .map_err(|e| format!("read_dir entry failed: {}: {e}", dir.display()))?;
            let path = entry.path();
            let kind = entry
                .file_type()
                .map_err(|e| format!("file_type failed: {}: {e}", path.display()))?;
            if kind.is_dir() {
                pending.push(path);
            } else if kind.is_file() && entry.file_name() == name {
                found.push(path);
            }
        }
    }
    Ok(found)
}
/// End-to-end bundle production for one RIR: run the base payload replay,
/// emit `base.ccr` plus compare-view CSVs, copy the replay inputs into the
/// bundle, optionally run the delta replay, then write metadata,
/// verification, READMEs, and the bundle manifest.
///
/// Returns the run root (`args.out_dir`) on success. Required `Args`
/// fields are `unwrap()`ed here on the assumption that `parse_args`
/// already enforced them.
fn run(args: Args) -> Result<PathBuf, String> {
    // --- Resolve inputs and derived names. ---
    let rir = args.rir.as_ref().unwrap();
    let rir_normalized = rir.to_ascii_lowercase();
    let out_root = args.out_dir.as_ref().unwrap();
    let tal_path = args.tal_path.as_ref().unwrap();
    let ta_path = args.ta_path.as_ref().unwrap();
    let replay_archive = args.payload_replay_archive.as_ref().unwrap();
    let replay_locks = args.payload_replay_locks.as_ref().unwrap();
    let trust_anchor = args
        .trust_anchor
        .clone()
        .unwrap_or_else(|| rir_normalized.clone());
    // Explicit --validation-time wins; otherwise it comes from the locks file.
    let base_validation_time = match args.validation_time {
        Some(value) => value,
        None => load_validation_time(replay_locks)?,
    };
    let delta_validation_time = match args.payload_delta_locks.as_ref() {
        Some(path) => Some(load_validation_time(path)?),
        None => None,
    };
    let run_root = out_root;
    let rir_dir = run_root.join(&rir_normalized);
    fs::create_dir_all(&rir_dir)
        .map_err(|e| format!("create rir dir failed: {}: {e}", rir_dir.display()))?;
    let tal_bytes =
        fs::read(tal_path).map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
    let ta_bytes =
        fs::read(ta_path).map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
    // --- Base replay into a fresh per-run RocksDB (removed if stale). ---
    let db_dir = run_root.join(".tmp").join(format!("{rir}-base-db"));
    if db_dir.exists() {
        fs::remove_dir_all(&db_dir)
            .map_err(|e| format!("remove old db failed: {}: {e}", db_dir.display()))?;
    }
    if let Some(parent) = db_dir.parent() {
        fs::create_dir_all(parent)
            .map_err(|e| format!("create db parent failed: {}: {e}", parent.display()))?;
    }
    let store = RocksStore::open(&db_dir).map_err(|e| format!("open rocksdb failed: {e}"))?;
    let base_started = Instant::now();
    let out = run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
        &store,
        &Policy::default(),
        &tal_bytes,
        &ta_bytes,
        None,
        replay_archive,
        replay_locks,
        base_validation_time,
        &TreeRunConfig {
            max_depth: args.max_depth,
            max_instances: args.max_instances,
            compact_audit: false,
            persist_vcir: true,
            build_ccr_accumulator: true,
        },
    )
    .map_err(|e| format!("base replay failed: {e}"))?;
    let base_duration = base_started.elapsed();
    // --- Build, write, and re-verify base.ccr from the run outputs. ---
    let ccr = build_ccr_from_run(
        &store,
        &[out.discovery.trust_anchor.clone()],
        &out.tree.vrps,
        &out.tree.aspas,
        &out.tree.router_keys,
        base_validation_time,
    )
    .map_err(|e| format!("build ccr failed: {e}"))?;
    let base_ccr_path = rir_dir.join("base.ccr");
    write_ccr_file(&base_ccr_path, &ccr).map_err(|e| format!("write ccr failed: {e}"))?;
    // Re-read the file that was actually written so the verification and
    // sha256 below cover the on-disk artifact, not the in-memory value.
    let ccr_bytes = fs::read(&base_ccr_path)
        .map_err(|e| format!("read written ccr failed: {}: {e}", base_ccr_path.display()))?;
    let decoded =
        decode_content_info(&ccr_bytes).map_err(|e| format!("decode written ccr failed: {e}"))?;
    let verify = verify_content_info(&decoded).map_err(|e| format!("verify ccr failed: {e}"))?;
    // --- Compare views must round-trip through the CCR exactly. ---
    let vrp_rows = build_vrp_compare_rows(&out.tree.vrps, &trust_anchor);
    let vap_rows = build_vap_compare_rows(&out.tree.aspas, &trust_anchor);
    let (ccr_vrps, ccr_vaps) = decode_ccr_compare_views(&decoded, &trust_anchor)?;
    if vrp_rows != ccr_vrps {
        return Err("base-vrps compare view does not match base.ccr".to_string());
    }
    if vap_rows != ccr_vaps {
        return Err("base-vaps compare view does not match base.ccr".to_string());
    }
    let base_vrps_csv = rir_dir.join("base-vrps.csv");
    let base_vaps_csv = rir_dir.join("base-vaps.csv");
    write_vrp_csv(&base_vrps_csv, &vrp_rows)?;
    write_vap_csv(&base_vaps_csv, &vap_rows)?;
    // --- Copy replay inputs into the bundle; stamp validationTime into the
    // emitted base locks so the bundle is self-describing. ---
    copy_dir_all(replay_archive, &rir_dir.join("base-payload-archive"))?;
    let mut base_locks_json: serde_json::Value = serde_json::from_slice(
        &fs::read(replay_locks)
            .map_err(|e| format!("read base locks failed: {}: {e}", replay_locks.display()))?,
    )
    .map_err(|e| format!("parse base locks failed: {}: {e}", replay_locks.display()))?;
    base_locks_json["validationTime"] = serde_json::Value::String(
        base_validation_time
            .format(&Rfc3339)
            .map_err(|e| format!("format validation time failed: {e}"))?,
    );
    let emitted_base_locks_path = rir_dir.join("base-locks.json");
    write_json(&emitted_base_locks_path, &base_locks_json)?;
    // Hash the emitted (rewritten) file, not the input locks.
    let emitted_base_locks_sha256 =
        sha256_hex(&fs::read(&emitted_base_locks_path).map_err(|e| {
            format!(
                "read emitted base locks failed: {}: {e}",
                emitted_base_locks_path.display()
            )
        })?);
    // --- Optional delta inputs: copy archive, stamp delta locks time. ---
    if let Some(delta_archive) = args.payload_delta_archive.as_ref() {
        copy_dir_all(delta_archive, &rir_dir.join("payload-delta-archive"))?;
    }
    if let Some(delta_locks) = args.payload_delta_locks.as_ref() {
        let mut delta_json: serde_json::Value = serde_json::from_slice(
            &fs::read(delta_locks)
                .map_err(|e| format!("read delta locks failed: {}: {e}", delta_locks.display()))?,
        )
        .map_err(|e| format!("parse delta locks failed: {}: {e}", delta_locks.display()))?;
        if let Some(delta_time) = delta_validation_time.as_ref() {
            delta_json
                .as_object_mut()
                .ok_or_else(|| "delta locks json must be an object".to_string())?
                .insert(
                    "validationTime".to_string(),
                    serde_json::Value::String(
                        delta_time
                            .format(&Rfc3339)
                            .map_err(|e| format!("format delta validation time failed: {e}"))?,
                    ),
                );
        }
        write_json(&rir_dir.join("locks-delta.json"), &delta_json)?;
    }
    // Rewrite only after both delta pieces were copied into the bundle.
    if args.payload_delta_archive.is_some() && args.payload_delta_locks.is_some() {
        rewrite_delta_base_locks_sha(&rir_dir, &emitted_base_locks_sha256)?;
    }
    fs::write(rir_dir.join("tal.tal"), &tal_bytes).map_err(|e| format!("write tal failed: {e}"))?;
    fs::write(rir_dir.join("ta.cer"), &ta_bytes).map_err(|e| format!("write ta failed: {e}"))?;
    // --- Bundle metadata; delta fields are filled in below if delta ran. ---
    let mut metadata = RirBundleMetadata {
        schema_version: "20260330-v1".to_string(),
        bundle_producer: "ours".to_string(),
        rir: rir_normalized.clone(),
        base_validation_time: base_validation_time
            .format(&Rfc3339)
            .map_err(|e| format!("format validation time failed: {e}"))?,
        delta_validation_time: delta_validation_time.as_ref().map(|value| {
            value
                .format(&Rfc3339)
                .expect("delta validation time must format")
        }),
        tal_sha256: sha256_hex(&tal_bytes),
        ta_cert_sha256: sha256_hex(&ta_bytes),
        base_ccr_sha256: sha256_hex(&ccr_bytes),
        delta_ccr_sha256: None,
        has_aspa: !vap_rows.is_empty(),
        has_router_key: verify.router_key_count > 0,
        base_vrp_count: vrp_rows.len(),
        base_vap_count: vap_rows.len(),
        delta_vrp_count: None,
        delta_vap_count: None,
    };
    fs::create_dir_all(rir_dir.join("timings"))
        .map_err(|e| format!("create timings dir failed: {e}"))?;
    write_timing_json(
        &rir_dir.join("timings").join("base-produce.json"),
        "base",
        &base_validation_time,
        base_duration,
    )?;
    let mut verification = serde_json::json!({
        "base": {
            "validationTime": metadata.base_validation_time,
            "ccr": {
                "path": "base.ccr",
                "sha256": metadata.base_ccr_sha256,
                "stateHashesOk": verify.state_hashes_ok,
                "manifestInstances": verify.manifest_instances,
                "roaVrpCount": verify.roa_vrp_count,
                "aspaPayloadSets": verify.aspa_payload_sets,
                "routerKeyCount": verify.router_key_count,
            },
            "compareViews": {
                "vrpsSelfMatch": true,
                "vapsSelfMatch": true,
                "baseVrpCount": metadata.base_vrp_count,
                "baseVapCount": metadata.base_vap_count,
            }
        }
    });
    // --- Delta replay: runs only when archive, locks, AND time are present. ---
    if let (Some(delta_archive), Some(delta_locks), Some(delta_time)) = (
        args.payload_delta_archive.as_ref(),
        args.payload_delta_locks.as_ref(),
        delta_validation_time.as_ref(),
    ) {
        let delta_db_dir = run_root.join(".tmp").join(format!("{rir}-delta-db"));
        if delta_db_dir.exists() {
            fs::remove_dir_all(&delta_db_dir).map_err(|e| {
                format!(
                    "remove old delta db failed: {}: {e}",
                    delta_db_dir.display()
                )
            })?;
        }
        if let Some(parent) = delta_db_dir.parent() {
            fs::create_dir_all(parent)
                .map_err(|e| format!("create delta db parent failed: {}: {e}", parent.display()))?;
        }
        let delta_store = RocksStore::open(&delta_db_dir)
            .map_err(|e| format!("open delta rocksdb failed: {e}"))?;
        let delta_started = Instant::now();
        let delta_out = run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit(
            &delta_store,
            &Policy::default(),
            &tal_bytes,
            &ta_bytes,
            None,
            replay_archive,
            replay_locks,
            delta_archive,
            delta_locks,
            base_validation_time,
            *delta_time,
            &TreeRunConfig {
                max_depth: args.max_depth,
                max_instances: args.max_instances,
                compact_audit: false,
                persist_vcir: true,
                build_ccr_accumulator: true,
            },
        )
        .map_err(|e| format!("delta replay failed: {e}"))?;
        let delta_duration = delta_started.elapsed();
        let delta_ccr = build_ccr_from_run(
            &delta_store,
            &[delta_out.discovery.trust_anchor.clone()],
            &delta_out.tree.vrps,
            &delta_out.tree.aspas,
            &delta_out.tree.router_keys,
            *delta_time,
        )
        .map_err(|e| format!("build delta ccr failed: {e}"))?;
        let delta_ccr_path = rir_dir.join("delta.ccr");
        write_ccr_file(&delta_ccr_path, &delta_ccr)
            .map_err(|e| format!("write delta ccr failed: {e}"))?;
        // Same pattern as base: verify the file actually on disk.
        let delta_ccr_bytes = fs::read(&delta_ccr_path).map_err(|e| {
            format!(
                "read written delta ccr failed: {}: {e}",
                delta_ccr_path.display()
            )
        })?;
        let delta_decoded = decode_content_info(&delta_ccr_bytes)
            .map_err(|e| format!("decode written delta ccr failed: {e}"))?;
        let delta_verify = verify_content_info(&delta_decoded)
            .map_err(|e| format!("verify delta ccr failed: {e}"))?;
        let delta_vrp_rows = build_vrp_compare_rows(&delta_out.tree.vrps, &trust_anchor);
        let delta_vap_rows = build_vap_compare_rows(&delta_out.tree.aspas, &trust_anchor);
        let (delta_ccr_vrps, delta_ccr_vaps) =
            decode_ccr_compare_views(&delta_decoded, &trust_anchor)?;
        if delta_vrp_rows != delta_ccr_vrps {
            return Err("record-delta.csv compare view does not match delta.ccr".to_string());
        }
        if delta_vap_rows != delta_ccr_vaps {
            return Err("record-delta-vaps.csv compare view does not match delta.ccr".to_string());
        }
        write_vrp_csv(&rir_dir.join("record-delta.csv"), &delta_vrp_rows)?;
        write_vap_csv(&rir_dir.join("record-delta-vaps.csv"), &delta_vap_rows)?;
        write_timing_json(
            &rir_dir.join("timings").join("delta-produce.json"),
            "delta",
            delta_time,
            delta_duration,
        )?;
        // Delta results widen the metadata flags (never narrow them).
        metadata.delta_ccr_sha256 = Some(sha256_hex(&delta_ccr_bytes));
        metadata.delta_vrp_count = Some(delta_vrp_rows.len());
        metadata.delta_vap_count = Some(delta_vap_rows.len());
        metadata.has_aspa = metadata.has_aspa || !delta_vap_rows.is_empty();
        metadata.has_router_key = metadata.has_router_key || delta_verify.router_key_count > 0;
        verification["delta"] = serde_json::json!({
            "validationTime": delta_time
                .format(&Rfc3339)
                .map_err(|e| format!("format delta validation time failed: {e}"))?,
            "ccr": {
                "path": "delta.ccr",
                "sha256": metadata.delta_ccr_sha256.clone().expect("delta sha must exist"),
                "stateHashesOk": delta_verify.state_hashes_ok,
                "manifestInstances": delta_verify.manifest_instances,
                "roaVrpCount": delta_verify.roa_vrp_count,
                "aspaPayloadSets": delta_verify.aspa_payload_sets,
                "routerKeyCount": delta_verify.router_key_count,
            },
            "compareViews": {
                "vrpsSelfMatch": true,
                "vapsSelfMatch": true,
                "deltaVrpCount": metadata.delta_vrp_count,
                "deltaVapCount": metadata.delta_vap_count,
            }
        });
        let _ = fs::remove_dir_all(&delta_db_dir);
    }
    // --- Final artifacts: metadata, verification, READMEs, manifest. ---
    write_json(&rir_dir.join("bundle.json"), &metadata)?;
    write_json(&rir_dir.join("verification.json"), &verification)?;
    write_top_readme(&run_root.join("README.md"), rir)?;
    write_rir_readme(
        &rir_dir.join("README.md"),
        rir,
        &metadata.base_validation_time,
    )?;
    let bundle_manifest = BundleManifest {
        schema_version: "20260330-v1".to_string(),
        bundle_producer: "ours".to_string(),
        recorded_at_rfc3339_utc: time::OffsetDateTime::now_utc()
            .format(&Rfc3339)
            .map_err(|e| format!("format recorded_at failed: {e}"))?,
        rirs: vec![rir_normalized.clone()],
        per_rir_bundles: vec![BundleManifestEntry {
            rir: rir_normalized.clone(),
            relative_path: rir_normalized,
            base_validation_time: metadata.base_validation_time.clone(),
            delta_validation_time: metadata.delta_validation_time.clone(),
            has_aspa: metadata.has_aspa,
        }],
    };
    write_json(&run_root.join("bundle-manifest.json"), &bundle_manifest)?;
    // Best-effort cleanup of the scratch DB; failure is non-fatal.
    let _ = fs::remove_dir_all(&db_dir);
    Ok(run_root.clone())
}
/// CLI entry point: parse arguments, produce the bundle, and print the
/// bundle root directory on success.
fn main() -> Result<(), String> {
    let argv: Vec<String> = std::env::args().collect();
    let parsed = parse_args(&argv)?;
    let bundle_root = run(parsed)?;
    println!("{}", bundle_root.display());
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    // Opt-out switch for the fixture-heavy end-to-end test below.
    fn skip_heavy_blackbox_test() -> bool {
        std::env::var_os("RPKI_SKIP_HEAVY_BLACKBOX_TESTS").is_some()
    }
    // Happy path: all required flags accepted and mapped to `Args` fields.
    #[test]
    fn parse_args_requires_required_flags() {
        let argv = vec![
            "replay_bundle_record".to_string(),
            "--rir".to_string(),
            "apnic".to_string(),
            "--out-dir".to_string(),
            "out".to_string(),
            "--tal-path".to_string(),
            "tal".to_string(),
            "--ta-path".to_string(),
            "ta".to_string(),
            "--payload-replay-archive".to_string(),
            "archive".to_string(),
            "--payload-replay-locks".to_string(),
            "locks.json".to_string(),
        ];
        let args = parse_args(&argv).expect("parse");
        assert_eq!(args.rir.as_deref(), Some("apnic"));
        assert_eq!(args.out_dir.as_deref(), Some(Path::new("out")));
    }
    // Missing flags produce an error naming the first required flag.
    #[test]
    fn parse_args_rejects_missing_requireds() {
        let argv = vec!["replay_bundle_record".to_string()];
        let err = parse_args(&argv).unwrap_err();
        assert!(err.contains("--rir is required"), "{err}");
    }
    // The camelCase `validationTime` key is read from a locks file and the
    // offset is preserved on round-trip formatting.
    #[test]
    fn load_validation_time_reads_top_level_validation_time() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("locks.json");
        std::fs::write(&path, r#"{"validationTime":"2026-03-16T11:49:15+08:00"}"#)
            .expect("write locks");
        let got = load_validation_time(&path).expect("load validation time");
        assert_eq!(
            got.format(&Rfc3339).expect("format"),
            "2026-03-16T11:49:15+08:00"
        );
    }
    // copy_dir_all must recreate nested directories and file contents.
    #[test]
    fn copy_dir_all_copies_nested_tree() {
        let dir = tempdir().expect("tempdir");
        let src = dir.path().join("src");
        let dst = dir.path().join("dst");
        std::fs::create_dir_all(src.join("sub")).expect("mkdir");
        std::fs::write(src.join("a.txt"), b"a").expect("write a");
        std::fs::write(src.join("sub").join("b.txt"), b"b").expect("write b");
        copy_dir_all(&src, &dst).expect("copy dir");
        assert_eq!(std::fs::read(dst.join("a.txt")).expect("read a"), b"a");
        assert_eq!(
            std::fs::read(dst.join("sub").join("b.txt")).expect("read b"),
            b"b"
        );
    }
    // Fixture-driven smoke test for `run`: exercises the full base+delta
    // pipeline against recorded APNIC data, but self-skips when the opt-out
    // env var is set or when any local fixture path is absent (the absolute
    // paths below only exist on the recording machine).
    #[test]
    fn run_base_bundle_record_smoke_root_only_apnic() {
        if skip_heavy_blackbox_test() {
            return;
        }
        let tal_path = PathBuf::from("tests/fixtures/tal/apnic-rfc7730-https.tal");
        let ta_path = PathBuf::from("tests/fixtures/ta/apnic-ta.cer");
        let replay_archive = PathBuf::from(
            "/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3/apnic/base-payload-archive",
        );
        let replay_locks = PathBuf::from(
            "/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3/apnic/base-locks.json",
        );
        let delta_archive = PathBuf::from(
            "/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3/apnic/payload-delta-archive",
        );
        let delta_locks = PathBuf::from(
            "/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3/apnic/locks-delta.json",
        );
        let required = [
            tal_path.as_path(),
            ta_path.as_path(),
            replay_archive.as_path(),
            replay_locks.as_path(),
            delta_archive.as_path(),
            delta_locks.as_path(),
        ];
        if let Some(missing) = required.iter().find(|path| !path.exists()) {
            eprintln!(
                "skipping replay_bundle_record smoke test; fixture missing: {}",
                missing.display()
            );
            return;
        }
        let dir = tempdir().expect("tempdir");
        let out_dir = dir.path().join("bundle");
        // max_depth=0 / max_instances=1 keep the run to the TA root only.
        let out = run(Args {
            rir: Some("apnic".to_string()),
            out_dir: Some(out_dir.clone()),
            tal_path: Some(tal_path),
            ta_path: Some(ta_path),
            payload_replay_archive: Some(replay_archive),
            payload_replay_locks: Some(replay_locks),
            payload_delta_archive: Some(delta_archive),
            payload_delta_locks: Some(delta_locks),
            validation_time: None,
            max_depth: Some(0),
            max_instances: Some(1),
            trust_anchor: Some("apnic".to_string()),
        })
        .expect("run bundle record");
        assert_eq!(out, out_dir);
        // Every artifact the bundle contract promises must exist.
        assert!(out_dir.join("bundle-manifest.json").is_file());
        assert!(out_dir.join("README.md").is_file());
        assert!(out_dir.join("apnic").join("bundle.json").is_file());
        assert!(out_dir.join("apnic").join("tal.tal").is_file());
        assert!(out_dir.join("apnic").join("ta.cer").is_file());
        assert!(out_dir.join("apnic").join("base-payload-archive").is_dir());
        assert!(out_dir.join("apnic").join("base-locks.json").is_file());
        assert!(out_dir.join("apnic").join("base.ccr").is_file());
        assert!(out_dir.join("apnic").join("base-vrps.csv").is_file());
        assert!(out_dir.join("apnic").join("base-vaps.csv").is_file());
        assert!(out_dir.join("apnic").join("delta.ccr").is_file());
        assert!(out_dir.join("apnic").join("record-delta.csv").is_file());
        assert!(
            out_dir
                .join("apnic")
                .join("record-delta-vaps.csv")
                .is_file()
        );
        assert!(out_dir.join("apnic").join("verification.json").is_file());
        let bundle_json: serde_json::Value = serde_json::from_slice(
            &std::fs::read(out_dir.join("apnic").join("bundle.json")).expect("read bundle.json"),
        )
        .expect("parse bundle.json");
        assert_eq!(bundle_json["bundleProducer"], "ours");
        assert_eq!(bundle_json["rir"], "apnic");
        assert!(bundle_json.get("baseVrpCount").is_some());
        assert!(bundle_json.get("baseCcrSha256").is_some());
        assert!(bundle_json.get("deltaVrpCount").is_some());
        assert!(bundle_json.get("deltaCcrSha256").is_some());
        // The delta locks must reference the sha of the *emitted* base
        // locks file (rewritten by rewrite_delta_base_locks_sha).
        let base_locks_bytes = std::fs::read(out_dir.join("apnic").join("base-locks.json"))
            .expect("read emitted base locks");
        let expected_base_locks_sha = sha256_hex(&base_locks_bytes);
        let delta_locks_json: serde_json::Value = serde_json::from_slice(
            &std::fs::read(out_dir.join("apnic").join("locks-delta.json"))
                .expect("read delta locks"),
        )
        .expect("parse delta locks");
        assert_eq!(delta_locks_json["baseLocksSha256"], expected_base_locks_sha);
    }
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,24 +0,0 @@
// Bundle support: compare-view builders, live capture recording, bundle
// file I/O helpers, and the on-disk metadata schema types.
pub mod compare_view;
pub mod live_capture;
pub mod record_io;
pub mod spec;
// Flat re-exports so callers can write `bundle::X` without naming the
// submodule each item lives in.
pub use compare_view::{
    VapCompareRow, VrpCompareRow, build_vap_compare_rows, build_vrp_compare_rows,
    canonical_vrp_prefix, decode_ccr_compare_views, write_vap_csv, write_vrp_csv,
};
pub use live_capture::{
    LiveBaseCaptureSummary, LiveDeltaCaptureSummary, RecordedHttpResponse, RecordedRsyncFetch,
    RecordingHttpFetcher, RecordingRsyncFetcher, write_current_replay_state_locks,
    write_live_base_replay_bundle_inputs, write_live_delta_replay_bundle_inputs,
    write_live_delta_replay_step_inputs,
};
pub use record_io::{
    build_single_rir_bundle_manifest, copy_dir_all, load_validation_time, sha256_hex, write_bytes,
    write_json, write_live_bundle_rir_readme, write_live_bundle_top_readme, write_timing_json,
};
// V2 schema types (base/delta-sequence bundles), exported alongside the
// original V1 manifest/metadata types below.
pub use spec::{
    BaseBundleStateMetadataV2, BundleManifestEntryV2, BundleManifestV2, DeltaSequenceMetadataV2,
    DeltaStepMetadataV2, RirBundleMetadataV2,
};
pub use spec::{BundleManifest, BundleManifestEntry, RirBundleMetadata};

View File

@ -1,274 +0,0 @@
use std::fs;
use std::path::Path;
use serde::Serialize;
use sha2::Digest;
use time::format_description::well_known::Rfc3339;
use super::{BundleManifest, BundleManifestEntry};
/// Lowercase hex encoding of the SHA-256 digest of `bytes`.
pub fn sha256_hex(bytes: &[u8]) -> String {
    let digest = sha2::Sha256::digest(bytes);
    hex::encode(digest)
}
/// Serialize `value` as pretty-printed JSON into `path`, creating any
/// missing parent directories first.
pub fn write_json(path: &Path, value: &impl Serialize) -> Result<(), String> {
    match path.parent() {
        Some(parent) => fs::create_dir_all(parent)
            .map_err(|e| format!("create parent failed: {}: {e}", parent.display()))?,
        None => {}
    }
    let rendered = serde_json::to_vec_pretty(value).map_err(|e| e.to_string())?;
    fs::write(path, rendered)
        .map_err(|e| format!("write json failed: {}: {e}", path.display()))
}
pub fn write_timing_json(
path: &Path,
mode: &str,
validation_time: &time::OffsetDateTime,
duration: std::time::Duration,
) -> Result<(), String> {
write_json(
path,
&serde_json::json!({
"mode": mode,
"validationTime": validation_time
.format(&Rfc3339)
.map_err(|e| format!("format validation time failed: {e}"))?,
"durationSeconds": duration.as_secs_f64(),
}),
)
}
/// Write the run-level README for a live-recorded bundle, creating parent
/// directories as needed.
pub fn write_live_bundle_top_readme(path: &Path, rir: &str) -> Result<(), String> {
    match path.parent() {
        Some(parent) => fs::create_dir_all(parent)
            .map_err(|e| format!("create parent failed: {}: {e}", parent.display()))?,
        None => {}
    }
    let body = format!(
        "# Ours Live Replay Bundle\n\nThis run contains one per-RIR bundle recorded online by `ours`.\n\n- RIR: `{rir}`\n- Reference result format: `CCR`\n"
    );
    fs::write(path, body).map_err(|e| format!("write readme failed: {}: {e}", path.display()))
}
/// Write the per-RIR README for a live-recorded bundle, creating parent
/// directories as needed.
pub fn write_live_bundle_rir_readme(
    path: &Path,
    rir: &str,
    base_validation_time: &str,
) -> Result<(), String> {
    match path.parent() {
        Some(parent) => fs::create_dir_all(parent)
            .map_err(|e| format!("create parent failed: {}: {e}", parent.display()))?,
        None => {}
    }
    let body = format!(
        "# {rir} live replay bundle\n\n- `tal.tal` and `ta.cer` are the actual live run inputs.\n- `base-locks.json.validationTime` = `{base_validation_time}`.\n- `base.ccr` is the authoritative reference result.\n- `base-vrps.csv` and `base-vaps.csv` are compare views derived from `base.ccr`.\n"
    );
    fs::write(path, body)
        .map_err(|e| format!("write rir readme failed: {}: {e}", path.display()))
}
/// Write `bytes` to `path`, creating any missing parent directories first.
pub fn write_bytes(path: &Path, bytes: &[u8]) -> Result<(), String> {
    match path.parent() {
        Some(parent) => fs::create_dir_all(parent)
            .map_err(|e| format!("create parent failed: {}: {e}", parent.display()))?,
        None => {}
    }
    fs::write(path, bytes)
        .map_err(|e| format!("write file failed: {}: {e}", path.display()))
}
/// Recursively mirror the directory tree `src` into `dst`.
///
/// Directories are recreated, regular files are copied, and any other
/// entry kind (e.g. symlinks) is skipped.
pub fn copy_dir_all(src: &Path, dst: &Path) -> Result<(), String> {
    fs::create_dir_all(dst)
        .map_err(|e| format!("create directory failed: {}: {e}", dst.display()))?;
    let listing =
        fs::read_dir(src).map_err(|e| format!("read_dir failed: {}: {e}", src.display()))?;
    for item in listing {
        let item = item.map_err(|e| format!("read_dir entry failed: {}: {e}", src.display()))?;
        let file_type = item
            .file_type()
            .map_err(|e| format!("file_type failed: {}: {e}", item.path().display()))?;
        let dest = dst.join(item.file_name());
        if file_type.is_dir() {
            copy_dir_all(&item.path(), &dest)?;
        } else if file_type.is_file() {
            if let Some(parent) = dest.parent() {
                fs::create_dir_all(parent)
                    .map_err(|e| format!("create parent failed: {}: {e}", parent.display()))?;
            }
            fs::copy(item.path(), &dest).map_err(|e| {
                format!(
                    "copy failed: {} -> {}: {e}",
                    item.path().display(),
                    dest.display()
                )
            })?;
        }
    }
    Ok(())
}
/// Read `validationTime` (falling back to snake_case `validation_time`)
/// from the JSON file at `path` and parse it as RFC 3339.
pub fn load_validation_time(path: &Path) -> Result<time::OffsetDateTime, String> {
    let bytes =
        fs::read(path).map_err(|e| format!("read json failed: {}: {e}", path.display()))?;
    let json: serde_json::Value = serde_json::from_slice(&bytes)
        .map_err(|e| format!("parse json failed: {}: {e}", path.display()))?;
    let raw = ["validationTime", "validation_time"]
        .iter()
        .find_map(|key| json.get(key).and_then(|v| v.as_str()))
        .ok_or_else(|| format!("validationTime missing in {}", path.display()))?;
    time::OffsetDateTime::parse(raw, &Rfc3339)
        .map_err(|e| format!("invalid validationTime in {}: {e}", path.display()))
}
/// Assembles a [`BundleManifest`] describing a bundle containing exactly one
/// RIR.
///
/// All timestamps are rendered as RFC 3339 strings; `recorded_at` is taken
/// from the current UTC clock, so the result is time-dependent. The delta
/// validation time is optional and only formatted when present. Formatting
/// failures are reported as error strings.
pub fn build_single_rir_bundle_manifest(
    schema_version: &str,
    bundle_producer: &str,
    rir: &str,
    base_validation_time: &time::OffsetDateTime,
    delta_validation_time: Option<&time::OffsetDateTime>,
    has_aspa: bool,
) -> Result<BundleManifest, String> {
    let recorded_at = time::OffsetDateTime::now_utc()
        .format(&Rfc3339)
        .map_err(|e| format!("format recorded_at failed: {e}"))?;
    let base = base_validation_time
        .format(&Rfc3339)
        .map_err(|e| format!("format base validation time failed: {e}"))?;
    // Format the optional delta time, propagating any formatting error.
    let delta = delta_validation_time
        .map(|value| {
            value
                .format(&Rfc3339)
                .map_err(|e| format!("format delta validation time failed: {e}"))
        })
        .transpose()?;
    let entry = BundleManifestEntry {
        rir: rir.to_string(),
        relative_path: rir.to_string(),
        base_validation_time: base,
        delta_validation_time: delta,
        has_aspa,
    };
    Ok(BundleManifest {
        schema_version: schema_version.to_string(),
        bundle_producer: bundle_producer.to_string(),
        recorded_at_rfc3339_utc: recorded_at,
        rirs: vec![rir.to_string()],
        per_rir_bundles: vec![entry],
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A camelCase `validationTime` field round-trips through the loader.
    #[test]
    fn load_validation_time_reads_validation_time_field() {
        let tmp = tempdir().expect("tempdir");
        let locks = tmp.path().join("locks.json");
        fs::write(&locks, r#"{"validationTime":"2026-04-01T00:00:00Z"}"#).expect("write");
        let loaded = load_validation_time(&locks).expect("load");
        assert_eq!(
            loaded.format(&Rfc3339).expect("format"),
            "2026-04-01T00:00:00Z"
        );
    }

    /// Copying preserves both top-level and nested files.
    #[test]
    fn copy_dir_all_copies_nested_files() {
        let tmp = tempdir().expect("tempdir");
        let from = tmp.path().join("src");
        let into = tmp.path().join("dst");
        fs::create_dir_all(from.join("nested")).expect("mkdir");
        fs::write(from.join("root.txt"), b"root").expect("write root");
        fs::write(from.join("nested/child.txt"), b"child").expect("write child");
        copy_dir_all(&from, &into).expect("copy");
        assert_eq!(fs::read(into.join("root.txt")).expect("read root"), b"root");
        assert_eq!(
            fs::read(into.join("nested/child.txt")).expect("read child"),
            b"child"
        );
    }

    /// Base and delta validation times are both rendered as RFC 3339.
    #[test]
    fn build_single_rir_bundle_manifest_formats_times() {
        let base = time::OffsetDateTime::parse("2026-04-01T00:00:00Z", &Rfc3339).expect("base");
        let delta = time::OffsetDateTime::parse("2026-04-01T00:10:00Z", &Rfc3339).expect("delta");
        let manifest = build_single_rir_bundle_manifest(
            "20260330-v1",
            "ours",
            "apnic",
            &base,
            Some(&delta),
            true,
        )
        .expect("manifest");
        assert_eq!(manifest.schema_version, "20260330-v1");
        assert_eq!(manifest.rirs, vec!["apnic".to_string()]);
        assert_eq!(
            manifest.per_rir_bundles[0].base_validation_time,
            "2026-04-01T00:00:00Z"
        );
        assert_eq!(
            manifest.per_rir_bundles[0].delta_validation_time.as_deref(),
            Some("2026-04-01T00:10:00Z")
        );
    }

    /// Both writers create intermediate directories on demand.
    #[test]
    fn write_json_and_write_bytes_create_parent_directories() {
        let tmp = tempdir().expect("tempdir");
        let json_target = tmp.path().join("nested/meta/data.json");
        write_json(&json_target, &serde_json::json!({"ok": true})).expect("write json");
        let decoded: serde_json::Value =
            serde_json::from_slice(&fs::read(&json_target).expect("read json")).expect("parse");
        assert_eq!(decoded["ok"], true);
        let bytes_target = tmp.path().join("nested/raw/file.bin");
        write_bytes(&bytes_target, b"payload").expect("write bytes");
        assert_eq!(fs::read(&bytes_target).expect("read bytes"), b"payload");
    }

    /// The timing JSON and both README writers emit the expected fields/text.
    #[test]
    fn write_timing_and_readmes_emit_expected_text() {
        let tmp = tempdir().expect("tempdir");
        let timing_target = tmp.path().join("timings/base-produce.json");
        let when =
            time::OffsetDateTime::parse("2026-04-01T00:00:00Z", &Rfc3339).expect("time");
        write_timing_json(
            &timing_target,
            "base",
            &when,
            std::time::Duration::from_secs_f64(1.25),
        )
        .expect("write timing");
        let timing: serde_json::Value =
            serde_json::from_slice(&fs::read(&timing_target).expect("read timing")).expect("parse");
        assert_eq!(timing["mode"], "base");
        assert_eq!(timing["validationTime"], "2026-04-01T00:00:00Z");
        assert_eq!(timing["durationSeconds"], 1.25);
        let top = tmp.path().join("README.md");
        write_live_bundle_top_readme(&top, "apnic").expect("write top readme");
        let top_text = fs::read_to_string(&top).expect("read top readme");
        assert!(top_text.contains("RIR: `apnic`"));
        assert!(top_text.contains("Reference result format: `CCR`"));
        let per_rir = tmp.path().join("apnic/README.md");
        write_live_bundle_rir_readme(&per_rir, "apnic", "2026-04-01T00:00:00Z")
            .expect("write rir readme");
        let rir_text = fs::read_to_string(&per_rir).expect("read rir readme");
        assert!(rir_text.contains("base-locks.json.validationTime"));
        assert!(rir_text.contains("base-vrps.csv"));
        assert!(rir_text.contains("base-vaps.csv"));
    }

    /// A missing delta leaves the manifest entry's optional fields unset.
    #[test]
    fn build_single_rir_bundle_manifest_supports_none_delta_time() {
        let base = time::OffsetDateTime::parse("2026-04-01T00:00:00Z", &Rfc3339).expect("base");
        let manifest =
            build_single_rir_bundle_manifest("20260330-v1", "ours", "afrinic", &base, None, false)
                .expect("manifest");
        assert_eq!(manifest.per_rir_bundles[0].delta_validation_time, None);
        assert!(!manifest.per_rir_bundles[0].has_aspa);
    }
}

View File

@ -1,185 +0,0 @@
use serde::Serialize;
/// Top-level (v1) manifest describing a recorded bundle that may span
/// several RIRs. Serialized with camelCase keys for external consumers.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct BundleManifest {
    /// Version tag of the manifest schema.
    #[serde(rename = "schemaVersion")]
    pub schema_version: String,
    /// Identifier of the tool that produced the bundle.
    #[serde(rename = "bundleProducer")]
    pub bundle_producer: String,
    /// When the bundle was recorded, as an RFC 3339 UTC string.
    #[serde(rename = "recordedAt")]
    pub recorded_at_rfc3339_utc: String,
    /// Names of all RIRs covered by this bundle.
    pub rirs: Vec<String>,
    /// One entry per RIR, describing that RIR's sub-bundle.
    #[serde(rename = "perRirBundles")]
    pub per_rir_bundles: Vec<BundleManifestEntry>,
}
/// Per-RIR entry inside a v1 [`BundleManifest`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct BundleManifestEntry {
    /// RIR name (e.g. "apnic").
    pub rir: String,
    /// Path of this RIR's sub-bundle, relative to the bundle root.
    // NOTE(review): unlike the sibling fields this one carries no
    // `#[serde(rename = "relativePath")]`, so it serializes as
    // `relative_path` — confirm the snake_case key is intentional.
    pub relative_path: String,
    /// Validation time of the base state, as an RFC 3339 string.
    #[serde(rename = "baseValidationTime")]
    pub base_validation_time: String,
    /// Validation time of the delta state, if a delta was recorded.
    #[serde(
        rename = "deltaValidationTime",
        skip_serializing_if = "Option::is_none"
    )]
    pub delta_validation_time: Option<String>,
    /// Whether the bundle contains ASPA objects.
    #[serde(rename = "hasAspa")]
    pub has_aspa: bool,
}
/// Detailed (v1) metadata for a single RIR's bundle: input digests,
/// reference-result digests, and compare-view row counts.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct RirBundleMetadata {
    /// Version tag of the metadata schema.
    #[serde(rename = "schemaVersion")]
    pub schema_version: String,
    /// Identifier of the tool that produced the bundle.
    #[serde(rename = "bundleProducer")]
    pub bundle_producer: String,
    /// RIR name this metadata belongs to.
    pub rir: String,
    /// Validation time of the base state, as an RFC 3339 string.
    #[serde(rename = "baseValidationTime")]
    pub base_validation_time: String,
    /// Validation time of the delta state, if a delta was recorded.
    #[serde(
        rename = "deltaValidationTime",
        skip_serializing_if = "Option::is_none"
    )]
    pub delta_validation_time: Option<String>,
    /// SHA-256 digest (hex) of the TAL input file.
    #[serde(rename = "talSha256")]
    pub tal_sha256: String,
    /// SHA-256 digest (hex) of the trust-anchor certificate.
    #[serde(rename = "taCertSha256")]
    pub ta_cert_sha256: String,
    /// SHA-256 digest (hex) of the base CCR reference result.
    #[serde(rename = "baseCcrSha256")]
    pub base_ccr_sha256: String,
    /// SHA-256 digest (hex) of the delta CCR, when present.
    #[serde(rename = "deltaCcrSha256", skip_serializing_if = "Option::is_none")]
    pub delta_ccr_sha256: Option<String>,
    /// Whether the bundle contains ASPA objects.
    #[serde(rename = "hasAspa")]
    pub has_aspa: bool,
    /// Whether the bundle contains BGPsec router keys.
    #[serde(rename = "hasRouterKey")]
    pub has_router_key: bool,
    /// Number of VRP rows in the base compare view.
    #[serde(rename = "baseVrpCount")]
    pub base_vrp_count: usize,
    /// Number of VAP rows in the base compare view.
    #[serde(rename = "baseVapCount")]
    pub base_vap_count: usize,
    /// Number of VRP rows in the delta compare view, when present.
    #[serde(rename = "deltaVrpCount", skip_serializing_if = "Option::is_none")]
    pub delta_vrp_count: Option<usize>,
    /// Number of VAP rows in the delta compare view, when present.
    #[serde(rename = "deltaVapCount", skip_serializing_if = "Option::is_none")]
    pub delta_vap_count: Option<usize>,
}
/// Top-level (v2) manifest; the v2 format replaces the single optional delta
/// of v1 with a sequence of delta steps per RIR.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct BundleManifestV2 {
    /// Version tag of the manifest schema.
    #[serde(rename = "schemaVersion")]
    pub schema_version: String,
    /// Identifier of the tool that produced the bundle.
    #[serde(rename = "bundleProducer")]
    pub bundle_producer: String,
    /// When the bundle was recorded, as an RFC 3339 UTC string.
    #[serde(rename = "recordedAt")]
    pub recorded_at_rfc3339_utc: String,
    /// Names of all RIRs covered by this bundle.
    pub rirs: Vec<String>,
    /// One entry per RIR, describing that RIR's sub-bundle.
    #[serde(rename = "perRirBundles")]
    pub per_rir_bundles: Vec<BundleManifestEntryV2>,
}
/// Per-RIR entry inside a v2 [`BundleManifestV2`], summarizing the base
/// state plus the recorded delta-step sequence.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct BundleManifestEntryV2 {
    /// RIR name (e.g. "apnic").
    pub rir: String,
    /// Path of this RIR's sub-bundle, relative to the bundle root.
    // NOTE(review): no `#[serde(rename = "relativePath")]` here, unlike the
    // other multi-word fields — confirm the snake_case key is intentional.
    pub relative_path: String,
    /// Validation time of the base state, as an RFC 3339 string.
    #[serde(rename = "baseValidationTime")]
    pub base_validation_time: String,
    /// Number of delta steps recorded for this RIR.
    #[serde(rename = "stepCount")]
    pub step_count: usize,
    /// Validation time of the first delta step, if any steps exist.
    #[serde(
        rename = "firstDeltaValidationTime",
        skip_serializing_if = "Option::is_none"
    )]
    pub first_delta_validation_time: Option<String>,
    /// Validation time of the last delta step, if any steps exist.
    #[serde(
        rename = "lastDeltaValidationTime",
        skip_serializing_if = "Option::is_none"
    )]
    pub last_delta_validation_time: Option<String>,
    /// Whether the bundle contains ASPA objects.
    #[serde(rename = "hasAspa")]
    pub has_aspa: bool,
}
/// Metadata (v2) for the base state of a RIR bundle: its reference-result
/// digest, compare-view counts, and the relative paths of its artifacts.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct BaseBundleStateMetadataV2 {
    /// Validation time of the base state, as an RFC 3339 string.
    #[serde(rename = "validationTime")]
    pub validation_time: String,
    /// SHA-256 digest (hex) of the base CCR reference result.
    #[serde(rename = "ccrSha256")]
    pub ccr_sha256: String,
    /// Number of VRP rows in the base compare view.
    #[serde(rename = "vrpCount")]
    pub vrp_count: usize,
    /// Number of VAP rows in the base compare view.
    #[serde(rename = "vapCount")]
    pub vap_count: usize,
    /// Relative path of the repository archive for this state.
    #[serde(rename = "relativeArchivePath")]
    pub relative_archive_path: String,
    /// Relative path of the locks JSON for this state.
    #[serde(rename = "relativeLocksPath")]
    pub relative_locks_path: String,
    /// Relative path of the CCR file for this state.
    #[serde(rename = "relativeCcrPath")]
    pub relative_ccr_path: String,
    /// Relative path of the VRP compare-view CSV.
    #[serde(rename = "relativeVrpsPath")]
    pub relative_vrps_path: String,
    /// Relative path of the VAP compare-view CSV.
    #[serde(rename = "relativeVapsPath")]
    pub relative_vaps_path: String,
}
/// Metadata (v2) for one delta step in a RIR bundle's delta sequence.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct DeltaStepMetadataV2 {
    /// Zero-based position of this step within the sequence.
    pub index: usize,
    /// Identifier of this delta step.
    pub id: String,
    /// Path of this step's directory, relative to the RIR sub-bundle root.
    #[serde(rename = "relativePath")]
    pub relative_path: String,
    /// Identifier of the state this delta applies on top of.
    #[serde(rename = "baseRef")]
    pub base_ref: String,
    /// Validation time of this step, as an RFC 3339 string.
    #[serde(rename = "validationTime")]
    pub validation_time: String,
    /// SHA-256 digest (hex) of this step's delta CCR.
    #[serde(rename = "deltaCcrSha256")]
    pub delta_ccr_sha256: String,
    /// Number of VRP rows in this step's compare view.
    #[serde(rename = "vrpCount")]
    pub vrp_count: usize,
    /// Number of VAP rows in this step's compare view.
    #[serde(rename = "vapCount")]
    pub vap_count: usize,
    /// Relative path of the repository archive for this step.
    #[serde(rename = "relativeArchivePath")]
    pub relative_archive_path: String,
    /// Relative path of the transition locks JSON for this step.
    #[serde(rename = "relativeTransitionLocksPath")]
    pub relative_transition_locks_path: String,
    /// Relative path of the target locks JSON for this step.
    #[serde(rename = "relativeTargetLocksPath")]
    pub relative_target_locks_path: String,
    /// Relative path of this step's CCR file.
    #[serde(rename = "relativeCcrPath")]
    pub relative_ccr_path: String,
    /// Relative path of this step's VRP compare-view CSV.
    #[serde(rename = "relativeVrpsPath")]
    pub relative_vrps_path: String,
    /// Relative path of this step's VAP compare-view CSV.
    #[serde(rename = "relativeVapsPath")]
    pub relative_vaps_path: String,
    /// Whether this step's state contains ASPA objects.
    #[serde(rename = "hasAspa")]
    pub has_aspa: bool,
    /// Whether this step's state contains BGPsec router keys.
    #[serde(rename = "hasRouterKey")]
    pub has_router_key: bool,
}
/// The configured and recorded delta-step sequence (v2) for one RIR bundle.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct DeltaSequenceMetadataV2 {
    /// Number of delta steps the recording run was configured to capture.
    // NOTE(review): this is the configured target; `steps.len()` is the
    // actual count and may differ — confirm against the producer.
    #[serde(rename = "configuredDeltaCount")]
    pub configured_delta_count: usize,
    /// Configured interval between consecutive delta captures, in seconds.
    #[serde(rename = "configuredIntervalSeconds")]
    pub configured_interval_seconds: u64,
    /// The recorded delta steps, in sequence order.
    pub steps: Vec<DeltaStepMetadataV2>,
}
/// Detailed (v2) metadata for a single RIR's bundle: input digests, the base
/// state, and the delta-step sequence.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct RirBundleMetadataV2 {
    /// Version tag of the metadata schema.
    #[serde(rename = "schemaVersion")]
    pub schema_version: String,
    /// Identifier of the tool that produced the bundle.
    #[serde(rename = "bundleProducer")]
    pub bundle_producer: String,
    /// RIR name this metadata belongs to.
    pub rir: String,
    /// SHA-256 digest (hex) of the TAL input file.
    #[serde(rename = "talSha256")]
    pub tal_sha256: String,
    /// SHA-256 digest (hex) of the trust-anchor certificate.
    #[serde(rename = "taCertSha256")]
    pub ta_cert_sha256: String,
    /// Whether any state (base or delta) contains ASPA objects.
    #[serde(rename = "hasAnyAspa")]
    pub has_any_aspa: bool,
    /// Whether any state (base or delta) contains BGPsec router keys.
    #[serde(rename = "hasAnyRouterKey")]
    pub has_any_router_key: bool,
    /// Metadata for the base state.
    pub base: BaseBundleStateMetadataV2,
    /// Metadata for the recorded delta-step sequence.
    #[serde(rename = "deltaSequence")]
    pub delta_sequence: DeltaSequenceMetadataV2,
}

View File

@ -2,6 +2,8 @@
pub mod accumulator; pub mod accumulator;
#[cfg(feature = "full")] #[cfg(feature = "full")]
pub mod build; pub mod build;
#[cfg(feature = "full")]
pub mod compare_view;
pub mod decode; pub mod decode;
pub mod dump; pub mod dump;
pub mod encode; pub mod encode;
@ -20,6 +22,11 @@ pub use build::{
build_manifest_state_from_vcirs, build_manifest_state_from_vcirs_with_breakdown, build_manifest_state_from_vcirs, build_manifest_state_from_vcirs_with_breakdown,
build_roa_payload_state, build_router_key_state_from_runtime, build_trust_anchor_state, build_roa_payload_state, build_router_key_state_from_runtime, build_trust_anchor_state,
}; };
#[cfg(feature = "full")]
pub use compare_view::{
VapCompareRow, VrpCompareRow, build_vap_compare_rows, build_vrp_compare_rows,
canonical_vrp_prefix, decode_ccr_compare_views, write_vap_csv, write_vrp_csv,
};
pub use decode::{CcrDecodeError, decode_content_info}; pub use decode::{CcrDecodeError, decode_content_info};
pub use dump::{CcrDumpError, dump_content_info_json, dump_content_info_json_value}; pub use dump::{CcrDumpError, dump_content_info_json, dump_content_info_json_value};
pub use encode::{CcrEncodeError, encode_content_info}; pub use encode::{CcrEncodeError, encode_content_info};

View File

@ -10,7 +10,7 @@ use crate::audit::{
AspaOutput, AuditRepoSyncStats, AuditReportV2, AuditRunMeta, AuditWarning, TreeSummary, AspaOutput, AuditRepoSyncStats, AuditReportV2, AuditRunMeta, AuditWarning, TreeSummary,
VrpOutput, format_roa_ip_prefix, VrpOutput, format_roa_ip_prefix,
}; };
use crate::bundle::canonical_vrp_prefix; use crate::ccr::canonical_vrp_prefix;
use crate::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig}; use crate::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use crate::fetch::rsync::LocalDirRsyncFetcher; use crate::fetch::rsync::LocalDirRsyncFetcher;
use crate::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher}; use crate::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher};

View File

@ -12,9 +12,6 @@ pub mod audit_downloads;
pub mod audit_trace; pub mod audit_trace;
#[cfg(feature = "full")] #[cfg(feature = "full")]
pub mod blob_store; pub mod blob_store;
#[cfg(feature = "full")]
pub mod bundle;
#[cfg(feature = "full")]
pub mod cli; pub mod cli;
#[cfg(feature = "full")] #[cfg(feature = "full")]
pub mod current_repo_index; pub mod current_repo_index;

View File

@ -1,7 +1,7 @@
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use rpki::bundle::{VapCompareRow, VrpCompareRow, decode_ccr_compare_views}; use rpki::ccr::{VapCompareRow, VrpCompareRow, decode_ccr_compare_views};
fn fixture(rel: &str) -> PathBuf { fn fixture(rel: &str) -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(rel) PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(rel)

View File

@ -64,10 +64,9 @@ fn offline_default_parallel_and_configured_phase2_match_compare_views() {
rpki::ccr::decode_content_info(&configured_ccr_bytes).expect("decode configured ccr"); rpki::ccr::decode_content_info(&configured_ccr_bytes).expect("decode configured ccr");
let (default_vrps, default_vaps) = let (default_vrps, default_vaps) =
rpki::bundle::decode_ccr_compare_views(&default_ccr, "apnic") rpki::ccr::decode_ccr_compare_views(&default_ccr, "apnic").expect("default compare view");
.expect("default compare view");
let (configured_vrps, configured_vaps) = let (configured_vrps, configured_vaps) =
rpki::bundle::decode_ccr_compare_views(&configured_ccr, "apnic") rpki::ccr::decode_ccr_compare_views(&configured_ccr, "apnic")
.expect("configured compare view"); .expect("configured compare view");
assert_eq!( assert_eq!(
@ -96,8 +95,7 @@ fn offline_default_parallel_and_configured_phase2_match_compare_views() {
fn offline_default_parallel_emits_online_ccr_accumulator_output() { fn offline_default_parallel_emits_online_ccr_accumulator_output() {
let (report, ccr_bytes) = run_offline_case(None); let (report, ccr_bytes) = run_offline_case(None);
let ccr = rpki::ccr::decode_content_info(&ccr_bytes).expect("decode ccr"); let ccr = rpki::ccr::decode_content_info(&ccr_bytes).expect("decode ccr");
let (_vrps, _vaps) = let (_vrps, _vaps) = rpki::ccr::decode_ccr_compare_views(&ccr, "apnic").expect("compare view");
rpki::bundle::decode_ccr_compare_views(&ccr, "apnic").expect("compare view");
assert!( assert!(
report["publication_points"] report["publication_points"]
.as_array() .as_array()