add delta sync and fail fetch process

This commit is contained in:
yuyr 2026-02-27 18:02:01 +08:00
parent 68cbd3c500
commit 13516c4f73
24 changed files with 4815 additions and 89 deletions

View File

@ -0,0 +1,70 @@
# RPKI Benchmarks (Stage2, selected_der_v2)
This directory contains a reproducible, one-click benchmark to measure **decode + profile validate**
performance for all supported object types and compare **OURS** against the **Routinator baseline**
(`rpki` crate `=0.19.1` with `repository` feature).
## What it measures
Dataset:
- Fixtures: `rpki/tests/benchmark/selected_der_v2/`
- Objects: `cer`, `crl`, `manifest` (`.mft`), `roa`, `aspa` (`.asa`)
- Samples: 10 quantiles per type (`min/p01/p10/p25/p50/p75/p90/p95/p99/max`) → 50 files total
Metrics:
- **decode+validate**: `decode_der` (parse + profile validate) for each object file
- **landing** (OURS only): `PackFile::from_bytes_compute_sha256` + CBOR encode + `RocksDB put_raw`
- **compare**: ratio `ours_ns/op ÷ rout_ns/op` for decode+validate
## Default benchmark settings
Both OURS and Routinator baseline use the same run settings:
- warmup: `10` iterations
- rounds: `3`
- adaptive loop target: `min_round_ms=200` (with an internal max of `1_000_000` iters)
- strict DER: `true` (baseline)
- cert inspect: `false` (baseline)
You can override the settings via environment variables in the runner script:
- `BENCH_WARMUP_ITERS` (default `10`)
- `BENCH_ROUNDS` (default `3`)
- `BENCH_MIN_ROUND_MS` (default `200`)
## One-click run (OURS + Routinator compare)
From the `rpki/` crate directory:
```bash
./scripts/benchmark/run_stage2_selected_der_v2_release.sh
```
Outputs are written under:
- `rpki/target/bench/`
- OURS decode+validate: `stage2_selected_der_v2_decode_release_<TS>.{md,csv}`
- OURS landing: `stage2_selected_der_v2_landing_release_<TS>.{md,csv}`
- Routinator: `stage2_selected_der_v2_routinator_decode_release_<TS>.{md,csv}`
- Compare: `stage2_selected_der_v2_compare_ours_vs_routinator_decode_release_<TS>.{md,csv}`
- Summary: `stage2_selected_der_v2_compare_summary_<TS>.md`
### Why decode and landing are separated
The underlying benchmark can run in `BENCH_MODE=both`, but the **landing** part writes to RocksDB
and may trigger background work (e.g., compactions) that can **skew subsequent decode timings**.
For a fair OURS-vs-Routinator comparison, the runner script:
- runs `BENCH_MODE=decode_validate` for comparison, and
- runs `BENCH_MODE=landing` separately for landing-only numbers.
## Notes
- The Routinator baseline benchmark is implemented in-repo under:
- `rpki/benchmark/routinator_object_bench/`
- It pins `rpki = "=0.19.1"` in its `Cargo.toml`.
- This benchmark is implemented as an `#[ignore]` integration test:
- `rpki/tests/bench_stage2_decode_profile_selected_der_v2.rs`
- The runner script invokes it with `cargo test --release ... -- --ignored --nocapture`.

View File

@ -0,0 +1,123 @@
#!/usr/bin/env bash
set -euo pipefail
# Stage2 (selected_der_v2) decode+profile validate benchmark.
# Runs:
# 1) OURS decode+validate benchmark and writes MD/CSV.
# 2) OURS landing benchmark and writes MD/CSV.
# 3) Routinator baseline decode benchmark (rpki crate =0.19.1).
# 4) Produces a joined compare CSV/MD and a short geomean summary.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
OUT_DIR="$ROOT_DIR/target/bench"
mkdir -p "$OUT_DIR"
TS="$(date -u +%Y%m%dT%H%M%SZ)"
WARMUP_ITERS="${BENCH_WARMUP_ITERS:-10}"
ROUNDS="${BENCH_ROUNDS:-3}"
MIN_ROUND_MS="${BENCH_MIN_ROUND_MS:-200}"
OURS_MD="$OUT_DIR/stage2_selected_der_v2_decode_release_${TS}.md"
OURS_CSV="$OUT_DIR/stage2_selected_der_v2_decode_release_${TS}.csv"
OURS_LANDING_MD="$OUT_DIR/stage2_selected_der_v2_landing_release_${TS}.md"
OURS_LANDING_CSV="$OUT_DIR/stage2_selected_der_v2_landing_release_${TS}.csv"
ROUT_MD="$OUT_DIR/stage2_selected_der_v2_routinator_decode_release_${TS}.md"
ROUT_CSV="$OUT_DIR/stage2_selected_der_v2_routinator_decode_release_${TS}.csv"
COMPARE_MD="$OUT_DIR/stage2_selected_der_v2_compare_ours_vs_routinator_decode_release_${TS}.md"
COMPARE_CSV="$OUT_DIR/stage2_selected_der_v2_compare_ours_vs_routinator_decode_release_${TS}.csv"
SUMMARY_MD="$OUT_DIR/stage2_selected_der_v2_compare_summary_${TS}.md"
echo "[1/4] OURS: decode+validate benchmark (release)..." >&2
BENCH_MODE="decode_validate" \
BENCH_WARMUP_ITERS="$WARMUP_ITERS" \
BENCH_ROUNDS="$ROUNDS" \
BENCH_MIN_ROUND_MS="$MIN_ROUND_MS" \
BENCH_OUT_MD="$OURS_MD" \
BENCH_OUT_CSV="$OURS_CSV" \
cargo test --release --test bench_stage2_decode_profile_selected_der_v2 -- --ignored --nocapture >/dev/null
echo "[2/4] OURS: landing benchmark (release)..." >&2
BENCH_MODE="landing" \
BENCH_WARMUP_ITERS="$WARMUP_ITERS" \
BENCH_ROUNDS="$ROUNDS" \
BENCH_MIN_ROUND_MS="$MIN_ROUND_MS" \
BENCH_OUT_MD_LANDING="$OURS_LANDING_MD" \
BENCH_OUT_CSV_LANDING="$OURS_LANDING_CSV" \
cargo test --release --test bench_stage2_decode_profile_selected_der_v2 -- --ignored --nocapture >/dev/null
echo "[3/4] Routinator baseline + compare join..." >&2
OURS_CSV="$OURS_CSV" \
ROUT_CSV="$ROUT_CSV" \
ROUT_MD="$ROUT_MD" \
COMPARE_CSV="$COMPARE_CSV" \
COMPARE_MD="$COMPARE_MD" \
WARMUP_ITERS="$WARMUP_ITERS" \
ROUNDS="$ROUNDS" \
MIN_ROUND_MS="$MIN_ROUND_MS" \
scripts/stage2_perf_compare_m4.sh >/dev/null
echo "[4/4] Summary (geomean ratios)..." >&2
python3 - "$COMPARE_CSV" "$SUMMARY_MD" <<'PY'
import csv
import math
import sys
from pathlib import Path
from datetime import datetime, timezone

# Summarize the OURS-vs-Routinator compare CSV into a per-type (plus overall)
# markdown table: n, min, median, geometric mean, max, and the number of
# samples that regressed (ratio > 1).
#
# argv[1]: compare CSV (columns include `type` and `ratio_ours_over_rout`)
# argv[2]: output markdown path
in_csv = Path(sys.argv[1])
out_md = Path(sys.argv[2])

rows = list(csv.DictReader(in_csv.open(newline="")))
if not rows:
    # Fail with a clear message instead of a ZeroDivisionError in geomean().
    raise SystemExit(f"no rows in compare csv: {in_csv}")

# Parse each ratio exactly once; bucket by object type and keep a flat list
# for the "all" row.
ratios = {}
all_vals = []
for r in rows:
    val = float(r["ratio_ours_over_rout"])
    ratios.setdefault(r["type"], []).append(val)
    all_vals.append(val)


def geomean(vals):
    # Geometric mean; `vals` is non-empty by construction (guard above).
    return math.exp(sum(math.log(v) for v in vals) / len(vals))


def p50(vals):
    # Median of a non-empty list (average of the middle two for even n).
    v = sorted(vals)
    n = len(v)
    if n % 2 == 1:
        return v[n // 2]
    return (v[n // 2 - 1] + v[n // 2]) / 2.0


types = ["all"] + sorted(ratios.keys())
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
lines = []
lines.append("# Stage2 selected_der_v2 compare summary (release)\n\n")
lines.append(f"- recorded_at_utc: `{now}`\n")
lines.append(f"- inputs_csv: `{in_csv}`\n\n")
lines.append("| type | n | min | p50 | geomean | max | >1 count |\n")
lines.append("|---|---:|---:|---:|---:|---:|---:|\n")
for t in types:
    vals_sorted = sorted(all_vals if t == "all" else ratios[t])
    lines.append(
        f"| {t} | {len(vals_sorted)} | {vals_sorted[0]:.4f} | {p50(vals_sorted):.4f} | "
        f"{geomean(vals_sorted):.4f} | {vals_sorted[-1]:.4f} | {sum(1 for v in vals_sorted if v>1.0)} |\n"
    )
out_md.write_text("".join(lines), encoding="utf-8")
print(out_md)
PY
echo "Done." >&2
echo "- OURS decode MD: $OURS_MD" >&2
echo "- OURS decode CSV: $OURS_CSV" >&2
echo "- OURS landing MD: $OURS_LANDING_MD" >&2
echo "- OURS landing CSV: $OURS_LANDING_CSV" >&2
echo "- Routinator: $ROUT_MD" >&2
echo "- Compare MD: $COMPARE_MD" >&2
echo "- Compare CSV: $COMPARE_CSV" >&2
echo "- Summary MD: $SUMMARY_MD" >&2

View File

@ -0,0 +1,75 @@
# Manual RRDP sync (APNIC-focused)
This directory contains **manual, command-line** scripts to reproduce the workflow described in:
- `specs/develop/20260226/apnic_rrdp_delta_analysis_after_manifest_revalidation_fix_20260227T022606Z.md`
They are meant for **hands-on validation / acceptance runs**, not for CI.
## Prerequisites
- Rust toolchain (`cargo`)
- `rsync` available on PATH (for rsync fallback/objects)
- Network access (RRDP over HTTPS)
## What the scripts do
### `full_sync.sh`
- Creates a fresh RocksDB directory
- Runs a **full serial** validation from a TAL URL (default: APNIC RFC7730 TAL)
- Writes:
- run log
- audit report JSON
- run meta JSON (includes durations)
- short summary Markdown (includes durations)
- RocksDB key statistics (`db_stats --exact`)
- RRDP repo state dump (`rrdp_state_dump`)
### `delta_sync.sh`
- Copies an existing “baseline snapshot DB” to a new DB directory (so the baseline is not modified)
- Runs another validation against the copied DB (RRDP will prefer **delta** when available)
- Produces the same artifacts as `full_sync.sh`
- Additionally generates a Markdown **delta analysis** report by comparing:
- base vs delta report JSON
- base vs delta `rrdp_state_dump` TSV
- and includes a **duration comparison** (base vs delta) if the base meta JSON is available
## Usage
Run from `rpki/`:
```bash
./scripts/manual_sync/full_sync.sh
```
After you have a baseline run, run delta against it:
```bash
./scripts/manual_sync/delta_sync.sh target/live/manual_sync/apnic_full_db_YYYYMMDDTHHMMSSZ \
target/live/manual_sync/apnic_full_report_YYYYMMDDTHHMMSSZ.json
```
If the baseline was produced by `full_sync.sh`, the delta script will auto-discover the base meta JSON
next to the base report (by replacing `_report.json` with `_meta.json`) and include base durations in
the delta analysis report.
## Configuration (env vars)
Both scripts accept overrides via env vars:
- `TAL_URL` (default: APNIC TAL URL)
- `HTTP_TIMEOUT_SECS` (default: 1800)
- `RSYNC_TIMEOUT_SECS` (default: 1800)
- `VALIDATION_TIME` (RFC3339; default: now UTC)
- `OUT_DIR` (default: `rpki/target/live/manual_sync`)
- `RUN_NAME` (default: auto timestamped)
Example:
```bash
TAL_URL="https://tal.apnic.net/tal-archive/apnic-rfc7730-https.tal" \
HTTP_TIMEOUT_SECS=1800 RSYNC_TIMEOUT_SECS=1800 \
./scripts/manual_sync/full_sync.sh
```

501
scripts/manual_sync/delta_sync.sh Executable file
View File

@ -0,0 +1,501 @@
#!/usr/bin/env bash
set -euo pipefail
# Delta sync + validation starting from a baseline snapshot DB.
#
# This script:
# 1) Copies BASE_DB_DIR -> DELTA_DB_DIR (so baseline is not modified)
# 2) Runs rpki validation again (RRDP will prefer delta if available)
# 3) Writes artifacts + a markdown delta analysis report
#
# Usage:
# ./scripts/manual_sync/delta_sync.sh <base_db_dir> <base_report_json>
#
# Outputs under OUT_DIR (default: target/live/manual_sync):
# - *_delta_db_* copied RocksDB directory
# - *_delta_report_*.json audit report
# - *_delta_run_*.log stdout/stderr log (includes summary)
# - *_delta_db_stats_*.txt db_stats --exact output
# - *_delta_rrdp_state_*.tsv rrdp_state_dump output
# - *_delta_analysis_*.md base vs delta comparison report
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
BASE_DB_DIR="${1:-}"
BASE_REPORT_JSON="${2:-}"
if [[ -z "${BASE_DB_DIR}" || -z "${BASE_REPORT_JSON}" ]]; then
echo "Usage: $0 <base_db_dir> <base_report_json>" >&2
exit 2
fi
if [[ ! -d "${BASE_DB_DIR}" ]]; then
echo "ERROR: base_db_dir is not a directory: ${BASE_DB_DIR}" >&2
exit 2
fi
if [[ ! -f "${BASE_REPORT_JSON}" ]]; then
echo "ERROR: base_report_json does not exist: ${BASE_REPORT_JSON}" >&2
exit 2
fi
TAL_URL="${TAL_URL:-https://tal.apnic.net/tal-archive/apnic-rfc7730-https.tal}"
HTTP_TIMEOUT_SECS="${HTTP_TIMEOUT_SECS:-1800}"
RSYNC_TIMEOUT_SECS="${RSYNC_TIMEOUT_SECS:-1800}"
VALIDATION_TIME="${VALIDATION_TIME:-}"
OUT_DIR="${OUT_DIR:-$ROOT_DIR/target/live/manual_sync}"
mkdir -p "$OUT_DIR"
TS="$(date -u +%Y%m%dT%H%M%SZ)"
RUN_NAME="${RUN_NAME:-apnic_delta_${TS}}"
DELTA_DB_DIR="${DELTA_DB_DIR:-$OUT_DIR/${RUN_NAME}_db}"
DELTA_REPORT_JSON="${DELTA_REPORT_JSON:-$OUT_DIR/${RUN_NAME}_report.json}"
DELTA_RUN_LOG="${DELTA_RUN_LOG:-$OUT_DIR/${RUN_NAME}_run.log}"
BASE_DB_STATS_TXT="${BASE_DB_STATS_TXT:-$OUT_DIR/${RUN_NAME}_base_db_stats.txt}"
DELTA_DB_STATS_TXT="${DELTA_DB_STATS_TXT:-$OUT_DIR/${RUN_NAME}_delta_db_stats.txt}"
BASE_RRDP_STATE_TSV="${BASE_RRDP_STATE_TSV:-$OUT_DIR/${RUN_NAME}_base_rrdp_state.tsv}"
DELTA_RRDP_STATE_TSV="${DELTA_RRDP_STATE_TSV:-$OUT_DIR/${RUN_NAME}_delta_rrdp_state.tsv}"
DELTA_ANALYSIS_MD="${DELTA_ANALYSIS_MD:-$OUT_DIR/${RUN_NAME}_delta_analysis.md}"
DELTA_META_JSON="${DELTA_META_JSON:-$OUT_DIR/${RUN_NAME}_meta.json}"
# Best-effort base meta discovery (produced by `full_sync.sh`).
# If the caller did not pass BASE_META_JSON, derive it from the base report
# path by swapping the `_report.json` suffix for `_meta.json` (the naming
# convention used by full_sync.sh). If the guessed file does not exist,
# BASE_META_JSON stays empty and the analysis step omits base durations.
BASE_META_JSON="${BASE_META_JSON:-}"
if [[ -z "${BASE_META_JSON}" ]]; then
  guess="${BASE_REPORT_JSON%_report.json}_meta.json"
  if [[ -f "${guess}" ]]; then
    BASE_META_JSON="${guess}"
  fi
fi
echo "== rpki manual delta sync ==" >&2
echo "tal_url=$TAL_URL" >&2
echo "base_db=$BASE_DB_DIR" >&2
echo "base_report=$BASE_REPORT_JSON" >&2
echo "delta_db=$DELTA_DB_DIR" >&2
echo "delta_report=$DELTA_REPORT_JSON" >&2
echo "== copying base DB (baseline is not modified) ==" >&2
cp -a "$BASE_DB_DIR" "$DELTA_DB_DIR"
script_start_s="$(date +%s)"
run_start_s="$(date +%s)"
cmd=(cargo run --release --bin rpki -- \
--db "$DELTA_DB_DIR" \
--tal-url "$TAL_URL" \
--http-timeout-secs "$HTTP_TIMEOUT_SECS" \
--rsync-timeout-secs "$RSYNC_TIMEOUT_SECS" \
--report-json "$DELTA_REPORT_JSON")
if [[ -n "${VALIDATION_TIME}" ]]; then
cmd+=(--validation-time "$VALIDATION_TIME")
fi
(
echo "# command:"
printf '%q ' "${cmd[@]}"
echo
echo
"${cmd[@]}"
) 2>&1 | tee "$DELTA_RUN_LOG" >/dev/null
run_end_s="$(date +%s)"
run_duration_s="$((run_end_s - run_start_s))"
echo "== db_stats (exact) ==" >&2
db_stats_start_s="$(date +%s)"
cargo run --release --bin db_stats -- --db "$BASE_DB_DIR" --exact 2>&1 | tee "$BASE_DB_STATS_TXT" >/dev/null
cargo run --release --bin db_stats -- --db "$DELTA_DB_DIR" --exact 2>&1 | tee "$DELTA_DB_STATS_TXT" >/dev/null
db_stats_end_s="$(date +%s)"
db_stats_duration_s="$((db_stats_end_s - db_stats_start_s))"
echo "== rrdp_state_dump ==" >&2
state_start_s="$(date +%s)"
cargo run --release --bin rrdp_state_dump -- --db "$BASE_DB_DIR" >"$BASE_RRDP_STATE_TSV"
cargo run --release --bin rrdp_state_dump -- --db "$DELTA_DB_DIR" >"$DELTA_RRDP_STATE_TSV"
state_end_s="$(date +%s)"
state_duration_s="$((state_end_s - state_start_s))"
script_end_s="$(date +%s)"
total_duration_s="$((script_end_s - script_start_s))"
echo "== delta analysis report ==" >&2
TAL_URL="$TAL_URL" \
BASE_DB_DIR="$BASE_DB_DIR" \
DELTA_DB_DIR="$DELTA_DB_DIR" \
DELTA_RUN_LOG="$DELTA_RUN_LOG" \
VALIDATION_TIME_ARG="$VALIDATION_TIME" \
HTTP_TIMEOUT_SECS="$HTTP_TIMEOUT_SECS" \
RSYNC_TIMEOUT_SECS="$RSYNC_TIMEOUT_SECS" \
RUN_DURATION_S="$run_duration_s" \
DB_STATS_DURATION_S="$db_stats_duration_s" \
STATE_DURATION_S="$state_duration_s" \
TOTAL_DURATION_S="$total_duration_s" \
python3 - "$BASE_REPORT_JSON" "$DELTA_REPORT_JSON" "$BASE_RRDP_STATE_TSV" "$DELTA_RRDP_STATE_TSV" \
"$BASE_DB_STATS_TXT" "$DELTA_DB_STATS_TXT" "$BASE_META_JSON" "$DELTA_META_JSON" "$DELTA_ANALYSIS_MD" <<'PY'
import json
import os
import sys
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
# Positional arguments, passed in order by the surrounding bash wrapper.
base_report_path = Path(sys.argv[1])     # base run audit report JSON
delta_report_path = Path(sys.argv[2])    # delta run audit report JSON
base_state_path = Path(sys.argv[3])      # base rrdp_state_dump TSV
delta_state_path = Path(sys.argv[4])     # delta rrdp_state_dump TSV
base_db_stats_path = Path(sys.argv[5])   # base `db_stats --exact` output
delta_db_stats_path = Path(sys.argv[6])  # delta `db_stats --exact` output
base_meta_path_s = sys.argv[7]           # base meta JSON path; may be "" (optional)
delta_meta_path = Path(sys.argv[8])      # where to write the delta meta JSON
out_md_path = Path(sys.argv[9])          # where to write the analysis markdown
def load_json(p: Path):
    """Parse the JSON document at `p` (UTF-8)."""
    text = p.read_text(encoding="utf-8")
    return json.loads(text)


def load_optional_json(path_s: str):
    """Parse JSON at `path_s`; return None for an empty path or a missing file."""
    if path_s and Path(path_s).exists():
        return json.loads(Path(path_s).read_text(encoding="utf-8"))
    return None
def parse_rrdp_state_tsv(p: Path):
    # rrdp_state_dump format: one "<notify_uri>\t<serial>\t<session_id>" per line.
    # Returns {notify_uri: (serial, session_id)}; blank lines are tolerated,
    # anything else malformed aborts the analysis.
    state = {}
    for raw in p.read_text(encoding="utf-8").splitlines():
        if not raw.strip():
            continue
        fields = raw.split("\t")
        if len(fields) != 3:
            raise SystemExit(f"invalid rrdp_state_dump line in {p}: {raw!r}")
        notify_uri, serial_s, session_id = fields
        state[notify_uri] = (int(serial_s), session_id)
    return state
def parse_db_stats(p: Path):
    # db_stats output format: "key=value" per line. Purely-numeric values are
    # converted to int; everything else kept as a string. Lines without "="
    # (headers, blanks) are skipped.
    stats = {}
    for raw in p.read_text(encoding="utf-8").splitlines():
        if "=" not in raw:
            continue
        key, _, val = raw.partition("=")
        key, val = key.strip(), val.strip()
        stats[key] = int(val) if val.isdigit() else val
    return stats
def warnings_total(rep: dict) -> int:
    # Total warnings: tree-level plus every publication point's.
    pp_warnings = sum(len(pp["warnings"]) for pp in rep["publication_points"])
    return len(rep["tree"]["warnings"]) + pp_warnings


def report_summary(rep: dict) -> dict:
    # Headline numbers from an audit report, used for the base/delta table.
    rrdp_uris = {
        pp.get("rrdp_notification_uri")
        for pp in rep["publication_points"]
        if pp.get("rrdp_notification_uri")
    }
    return {
        "validation_time": rep["meta"]["validation_time_rfc3339_utc"],
        "publication_points_processed": rep["tree"]["instances_processed"],
        "publication_points_failed": rep["tree"]["instances_failed"],
        "rrdp_repos_unique": len(rrdp_uris),
        "vrps": len(rep["vrps"]),
        "aspas": len(rep["aspas"]),
        "audit_publication_points": len(rep["publication_points"]),
        "warnings_total": warnings_total(rep),
    }
def count_repo_sync_failed(rep: dict) -> int:
    # Best-effort heuristic (no structured counter is exposed in the audit
    # report yet). The substring match is deliberately conservative to avoid
    # false positives.
    needles = ("repo sync failed", "rrdp fetch failed", "rsync fetch failed")

    def is_repo_sync_failed(msg: str) -> bool:
        lowered = msg.lower()
        return any(n in lowered for n in needles)

    tree_hits = sum(
        1 for w in rep["tree"]["warnings"] if is_repo_sync_failed(w.get("message", ""))
    )
    pp_hits = sum(
        1
        for pp in rep["publication_points"]
        for w in pp.get("warnings", [])
        if is_repo_sync_failed(w.get("message", ""))
    )
    return tree_hits + pp_hits
def pp_manifest_sha(pp: dict) -> str:
    # In our audit format the manifest appears as a synthetic object entry
    # whose sha256 covers manifest_bytes; return "" if no manifest entry.
    return next((o["sha256_hex"] for o in pp["objects"] if o["kind"] == "manifest"), "")


def pp_objects_by_uri(rep: dict):
    # Map rsync URI -> (sha256_hex, kind) across every publication point.
    return {
        o["rsync_uri"]: (o["sha256_hex"], o["kind"])
        for pp in rep["publication_points"]
        for o in pp["objects"]
    }
def vrp_set(rep: dict):
    # De-duplicated VRPs keyed by (asn, prefix, max_length).
    return {(v["asn"], v["prefix"], v["max_length"]) for v in rep["vrps"]}


def rfc_refs_str(w: dict) -> str:
    # Comma-join a warning's rfc_refs; "" when the field is absent or empty.
    return ", ".join(w.get("rfc_refs") or [])
base = load_json(base_report_path)
delta = load_json(delta_report_path)
base_sum = report_summary(base)
delta_sum = report_summary(delta)
base_db = parse_db_stats(base_db_stats_path)
delta_db = parse_db_stats(delta_db_stats_path)
base_state = parse_rrdp_state_tsv(base_state_path)
delta_state = parse_rrdp_state_tsv(delta_state_path)
base_meta = load_optional_json(base_meta_path_s)
# Delta-run metadata; mirrors the meta JSON written by full_sync.sh so the
# two runs can be compared and so this run can serve as the base of a later
# delta run.
delta_meta = {
    "recorded_at_utc": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    "tal_url": os.environ["TAL_URL"],
    "base_db_dir": os.environ["BASE_DB_DIR"],
    "delta_db_dir": os.environ["DELTA_DB_DIR"],
    "base_report_json": str(base_report_path),
    "delta_report_json": str(delta_report_path),
    "delta_run_log": os.environ["DELTA_RUN_LOG"],
    "validation_time_arg": os.environ.get("VALIDATION_TIME_ARG",""),
    "http_timeout_secs": int(os.environ["HTTP_TIMEOUT_SECS"]),
    "rsync_timeout_secs": int(os.environ["RSYNC_TIMEOUT_SECS"]),
    "durations_secs": {
        "rpki_run": int(os.environ["RUN_DURATION_S"]),
        "db_stats_exact": int(os.environ["DB_STATS_DURATION_S"]),
        "rrdp_state_dump": int(os.environ["STATE_DURATION_S"]),
        "total_script": int(os.environ["TOTAL_DURATION_S"]),
    },
}
# BUG FIX: this heredoc has a quoted delimiter (<<'PY'), so the previous
# "\\n" reached Python as a literal backslash + 'n' and was appended verbatim
# after the JSON document, making the meta file unparseable for any later
# json.loads() (e.g. when this run is used as a base). A real trailing
# newline is intended.
delta_meta_path.write_text(json.dumps(delta_meta, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# RRDP state changes
serial_changed = 0
session_changed = 0
serial_deltas = []
for uri, (old_serial, old_sess) in base_state.items():
if uri not in delta_state:
continue
new_serial, new_sess = delta_state[uri]
if new_serial != old_serial:
serial_changed += 1
serial_deltas.append((uri, old_serial, new_serial, new_serial - old_serial))
if new_sess != old_sess:
session_changed += 1
serial_deltas.sort(key=lambda x: x[3], reverse=True)
# Publication point diffs
base_pp = {pp["manifest_rsync_uri"]: pp for pp in base["publication_points"]}
delta_pp = {pp["manifest_rsync_uri"]: pp for pp in delta["publication_points"]}
base_keys = set(base_pp.keys())
delta_keys = set(delta_pp.keys())
new_pp = sorted(delta_keys - base_keys)
missing_pp = sorted(base_keys - delta_keys)
updated_pp = 0
unchanged_pp = 0
for k in sorted(base_keys & delta_keys):
if pp_manifest_sha(base_pp[k]) != pp_manifest_sha(delta_pp[k]):
updated_pp += 1
else:
unchanged_pp += 1
# Cache usage + repo sync failure hints
def source_counts(rep: dict) -> Counter:
c = Counter()
for pp in rep["publication_points"]:
c[pp.get("source","")] += 1
return c
base_sources = source_counts(base)
delta_sources = source_counts(delta)
base_repo_sync_failed = count_repo_sync_failed(base)
delta_repo_sync_failed = count_repo_sync_failed(delta)
def cache_reason_counts(rep: dict) -> Counter:
c = Counter()
for pp in rep.get("publication_points", []):
if pp.get("source") != "fetch_cache_pp":
continue
# Use warning messages as "reason". If missing, emit a fallback bucket.
ws = pp.get("warnings", [])
if not ws:
c["(no warnings recorded)"] += 1
continue
for w in ws:
msg = w.get("message", "").strip() or "(empty warning message)"
c[msg] += 1
return c
base_cache_reasons = cache_reason_counts(base)
delta_cache_reasons = cache_reason_counts(delta)
# Object change stats (by rsync URI, sha256)
base_obj = pp_objects_by_uri(base)
delta_obj = pp_objects_by_uri(delta)
kind_stats = {k: {"added": 0, "changed": 0, "removed": 0} for k in ["manifest","crl","certificate","roa","aspa","other"]}
all_uris = set(base_obj.keys()) | set(delta_obj.keys())
for uri in all_uris:
b = base_obj.get(uri)
d = delta_obj.get(uri)
if b is None and d is not None:
kind_stats[d[1]]["added"] += 1
elif b is not None and d is None:
kind_stats[b[1]]["removed"] += 1
else:
if b[0] != d[0]:
kind_stats[d[1]]["changed"] += 1
# VRP diff
base_v = vrp_set(base)
delta_v = vrp_set(delta)
added_v = delta_v - base_v
removed_v = base_v - delta_v
def fmt_db_stats(db: dict) -> str:
    # Render the interesting db_stats counters as markdown bullets, in a
    # fixed order; keys absent from `db` are simply skipped.
    wanted = ("raw_objects", "rrdp_object_index", "fetch_cache_pp", "rrdp_state", "total")
    bullets = [f"- `{k}={db[k]}`" for k in wanted if k in db]
    return "\n".join(bullets) if bullets else "_(missing db_stats keys)_"
lines = []
lines.append("# APNIC RRDP 增量同步验收manual_sync\n\n")
lines.append(f"时间戳:`{now}`UTC\n\n")
lines.append("## 复现信息\n\n")
lines.append(f"- base_report`{base_report_path}`\n")
lines.append(f"- delta_report`{delta_report_path}`\n")
lines.append(f"- base_db_stats`{base_db_stats_path}`\n")
lines.append(f"- delta_db_stats`{delta_db_stats_path}`\n")
lines.append(f"- base_rrdp_state`{base_state_path}`\n")
lines.append(f"- delta_rrdp_state`{delta_state_path}`\n\n")
lines.append("## 运行结果概览\n\n")
lines.append("| metric | base | delta |\n")
lines.append("|---|---:|---:|\n")
for k in [
"validation_time",
"publication_points_processed",
"publication_points_failed",
"rrdp_repos_unique",
"vrps",
"aspas",
"audit_publication_points",
"warnings_total",
]:
lines.append(f"| {k} | {base_sum[k]} | {delta_sum[k]} |\n")
lines.append("\n")
def dur(meta: dict | None, key: str):
if not meta:
return None
return (meta.get("durations_secs") or {}).get(key)
base_rpki_run = dur(base_meta, "rpki_run")
delta_rpki_run = delta_meta["durations_secs"]["rpki_run"]
base_total = dur(base_meta, "total_script")
delta_total = delta_meta["durations_secs"]["total_script"]
lines.append("## 持续时间seconds\n\n")
lines.append("| step | base | delta |\n")
lines.append("|---|---:|---:|\n")
lines.append(f"| rpki_run | {base_rpki_run if base_rpki_run is not None else 'unknown'} | {delta_rpki_run} |\n")
lines.append(f"| total_script | {base_total if base_total is not None else 'unknown'} | {delta_total} |\n")
lines.append("\n")
if base_meta is None and base_meta_path_s:
lines.append(f"> 注:未能读取 base meta`{base_meta_path_s}`(文件不存在或不可读)。建议用 `full_sync.sh` 生成 baseline 以获得 base 时长对比。\n\n")
lines.append("RocksDB KV`db_stats --exact`\n\n")
lines.append("### 基线base\n\n")
lines.append(fmt_db_stats(base_db) + "\n\n")
lines.append("### 增量delta\n\n")
lines.append(fmt_db_stats(delta_db) + "\n\n")
lines.append("## RRDP 增量是否发生(基于 `rrdp_state` 变化)\n\n")
lines.append(f"- repo_total(base)={len(base_state)}\n")
lines.append(f"- repo_total(delta)={len(delta_state)}\n")
lines.append(f"- serial_changed={serial_changed}\n")
lines.append(f"- session_changed={session_changed}\n\n")
if serial_deltas:
lines.append("serial 增长最大的 10 个 RRDP repoold -> new\n\n")
for uri, old, new, diff in serial_deltas[:10]:
lines.append(f"- `{uri}``{old} -> {new}`+{diff}\n")
lines.append("\n")
lines.append("## 发布点Publication Point变化统计\n\n")
lines.append("以 `manifest_rsync_uri` 作为发布点 key对比 base vs delta\n\n")
lines.append(f"- base PP`{len(base_keys)}`\n")
lines.append(f"- delta PP`{len(delta_keys)}`\n")
lines.append(f"- `new_pp={len(new_pp)}`\n")
lines.append(f"- `missing_pp={len(missing_pp)}`\n")
lines.append(f"- `updated_pp={updated_pp}`\n")
lines.append(f"- `unchanged_pp={unchanged_pp}`\n\n")
lines.append("> 注:`new_pp/missing_pp/updated_pp` 会混入“遍历范围变化”的影响(例如 validation_time 不同、或 base 中存在更多失败 PP。\n\n")
lines.append("## fail fetch / cache 使用情况\n\n")
lines.append(f"- repo sync failed启发式warning contains 'repo sync failed'/'rrdp fetch failed'/'rsync fetch failed'\n")
lines.append(f" - base`{base_repo_sync_failed}`\n")
lines.append(f" - delta`{delta_repo_sync_failed}`\n\n")
lines.append("- source 计数(按 `PublicationPointAudit.source`\n\n")
lines.append(f" - base`{dict(base_sources)}`\n")
lines.append(f" - delta`{dict(delta_sources)}`\n\n")
def render_cache_reasons(title: str, c: Counter) -> str:
if not c:
return f"{title}`0`(未使用 fetch_cache_pp\n\n"
lines = []
total = sum(c.values())
lines.append(f"{title}`{total}`\n\n")
lines.append("Top reasons按 warning message 聚合,可能一条 PP 有多条 warning\n\n")
for msg, n in c.most_common(10):
lines.append(f"- `{n}` × {msg}\n")
lines.append("\n")
return "".join(lines)
lines.append(render_cache_reasons("- base `source=fetch_cache_pp`", base_cache_reasons))
lines.append(render_cache_reasons("- delta `source=fetch_cache_pp`", delta_cache_reasons))
lines.append("## 文件变更统计(按对象类型)\n\n")
lines.append("按 `ObjectAuditEntry.sha256_hex` 对比(同一 rsync URI 前后 hash 变化记为 `~changed`\n\n")
lines.append("| kind | added | changed | removed |\n")
lines.append("|---|---:|---:|---:|\n")
for kind in ["manifest","crl","certificate","roa","aspa","other"]:
st = kind_stats[kind]
lines.append(f"| {kind} | {st['added']} | {st['changed']} | {st['removed']} |\n")
lines.append("\n")
lines.append("## VRP 影响(去重后集合 diff\n\n")
lines.append("以 `(asn, prefix, max_length)` 为 key\n\n")
lines.append(f"- base unique VRP`{len(base_v)}`\n")
lines.append(f"- delta unique VRP`{len(delta_v)}`\n")
lines.append(f"- `added={len(added_v)}`\n")
lines.append(f"- `removed={len(removed_v)}`\n")
lines.append(f"- `net={len(added_v) - len(removed_v)}`\n\n")
out_md_path.write_text("".join(lines), encoding="utf-8")
print(out_md_path)
PY
echo "== done ==" >&2
echo "artifacts:" >&2
echo "- delta db: $DELTA_DB_DIR" >&2
echo "- delta report: $DELTA_REPORT_JSON" >&2
echo "- delta run log: $DELTA_RUN_LOG" >&2
echo "- delta meta json: $DELTA_META_JSON" >&2
echo "- analysis md: $DELTA_ANALYSIS_MD" >&2
echo "- base state tsv: $BASE_RRDP_STATE_TSV" >&2
echo "- delta state tsv: $DELTA_RRDP_STATE_TSV" >&2

164
scripts/manual_sync/full_sync.sh Executable file
View File

@ -0,0 +1,164 @@
#!/usr/bin/env bash
set -euo pipefail
# Full sync + validation from a TAL URL (default: APNIC).
#
# Produces artifacts under OUT_DIR (default: target/live/manual_sync):
# - *_db_* RocksDB directory
# - *_report_*.json audit report
# - *_run_*.log stdout/stderr log (includes summary)
# - *_db_stats_*.txt db_stats --exact output
# - *_rrdp_state_*.tsv rrdp_state_dump output
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
TAL_URL="${TAL_URL:-https://tal.apnic.net/tal-archive/apnic-rfc7730-https.tal}"
HTTP_TIMEOUT_SECS="${HTTP_TIMEOUT_SECS:-1800}"
RSYNC_TIMEOUT_SECS="${RSYNC_TIMEOUT_SECS:-1800}"
VALIDATION_TIME="${VALIDATION_TIME:-}"
OUT_DIR="${OUT_DIR:-$ROOT_DIR/target/live/manual_sync}"
mkdir -p "$OUT_DIR"
TS="$(date -u +%Y%m%dT%H%M%SZ)"
RUN_NAME="${RUN_NAME:-apnic_full_${TS}}"
DB_DIR="${DB_DIR:-$OUT_DIR/${RUN_NAME}_db}"
REPORT_JSON="${REPORT_JSON:-$OUT_DIR/${RUN_NAME}_report.json}"
RUN_LOG="${RUN_LOG:-$OUT_DIR/${RUN_NAME}_run.log}"
DB_STATS_TXT="${DB_STATS_TXT:-$OUT_DIR/${RUN_NAME}_db_stats.txt}"
RRDP_STATE_TSV="${RRDP_STATE_TSV:-$OUT_DIR/${RUN_NAME}_rrdp_state.tsv}"
RUN_META_JSON="${RUN_META_JSON:-$OUT_DIR/${RUN_NAME}_meta.json}"
SUMMARY_MD="${SUMMARY_MD:-$OUT_DIR/${RUN_NAME}_summary.md}"
echo "== rpki manual full sync ==" >&2
echo "tal_url=$TAL_URL" >&2
echo "db=$DB_DIR" >&2
echo "report_json=$REPORT_JSON" >&2
echo "out_dir=$OUT_DIR" >&2
cmd=(cargo run --release --bin rpki -- \
--db "$DB_DIR" \
--tal-url "$TAL_URL" \
--http-timeout-secs "$HTTP_TIMEOUT_SECS" \
--rsync-timeout-secs "$RSYNC_TIMEOUT_SECS" \
--report-json "$REPORT_JSON")
if [[ -n "${VALIDATION_TIME}" ]]; then
cmd+=(--validation-time "$VALIDATION_TIME")
fi
script_start_s="$(date +%s)"
run_start_s="$(date +%s)"
(
echo "# command:"
printf '%q ' "${cmd[@]}"
echo
echo
"${cmd[@]}"
) 2>&1 | tee "$RUN_LOG" >/dev/null
run_end_s="$(date +%s)"
run_duration_s="$((run_end_s - run_start_s))"
echo "== db_stats (exact) ==" >&2
db_stats_start_s="$(date +%s)"
cargo run --release --bin db_stats -- --db "$DB_DIR" --exact 2>&1 | tee "$DB_STATS_TXT" >/dev/null
db_stats_end_s="$(date +%s)"
db_stats_duration_s="$((db_stats_end_s - db_stats_start_s))"
echo "== rrdp_state_dump ==" >&2
state_start_s="$(date +%s)"
cargo run --release --bin rrdp_state_dump -- --db "$DB_DIR" >"$RRDP_STATE_TSV"
state_end_s="$(date +%s)"
state_duration_s="$((state_end_s - state_start_s))"
script_end_s="$(date +%s)"
total_duration_s="$((script_end_s - script_start_s))"
echo "== write run meta + summary ==" >&2
TAL_URL="$TAL_URL" \
DB_DIR="$DB_DIR" \
REPORT_JSON="$REPORT_JSON" \
RUN_LOG="$RUN_LOG" \
HTTP_TIMEOUT_SECS="$HTTP_TIMEOUT_SECS" \
RSYNC_TIMEOUT_SECS="$RSYNC_TIMEOUT_SECS" \
VALIDATION_TIME_ARG="$VALIDATION_TIME" \
RUN_DURATION_S="$run_duration_s" \
DB_STATS_DURATION_S="$db_stats_duration_s" \
STATE_DURATION_S="$state_duration_s" \
TOTAL_DURATION_S="$total_duration_s" \
python3 - "$REPORT_JSON" "$RUN_META_JSON" "$SUMMARY_MD" <<'PY'
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path

# Write the run meta JSON (machine-readable; later consumed by delta_sync.sh
# as the "base meta") and a short human-readable summary markdown.
#
# argv[1]: audit report JSON produced by the rpki run
# argv[2]: output meta JSON path
# argv[3]: output summary markdown path
report_path = Path(sys.argv[1])
meta_path = Path(sys.argv[2])
summary_path = Path(sys.argv[3])

rep = json.loads(report_path.read_text(encoding="utf-8"))
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
meta = {
    "recorded_at_utc": now,
    "tal_url": os.environ["TAL_URL"],
    "db_dir": os.environ["DB_DIR"],
    "report_json": os.environ["REPORT_JSON"],
    "run_log": os.environ["RUN_LOG"],
    "validation_time_rfc3339_utc": rep["meta"]["validation_time_rfc3339_utc"],
    "http_timeout_secs": int(os.environ["HTTP_TIMEOUT_SECS"]),
    "rsync_timeout_secs": int(os.environ["RSYNC_TIMEOUT_SECS"]),
    "validation_time_arg": os.environ.get("VALIDATION_TIME_ARG",""),
    "durations_secs": {
        "rpki_run": int(os.environ["RUN_DURATION_S"]),
        "db_stats_exact": int(os.environ["DB_STATS_DURATION_S"]),
        "rrdp_state_dump": int(os.environ["STATE_DURATION_S"]),
        "total_script": int(os.environ["TOTAL_DURATION_S"]),
    },
    "counts": {
        "publication_points_processed": rep["tree"]["instances_processed"],
        "publication_points_failed": rep["tree"]["instances_failed"],
        "vrps": len(rep["vrps"]),
        "aspas": len(rep["aspas"]),
        "audit_publication_points": len(rep["publication_points"]),
    },
}
# BUG FIX: the surrounding heredoc has a quoted delimiter (<<'PY'), so bash
# performs no expansion and the previous "\\n" reached Python as a literal
# backslash + 'n'. That appended two junk characters after the JSON document,
# which makes json.loads() fail with "Extra data" when delta_sync.sh reads
# this file as the base meta. Use a real newline. (The sibling heredocs in
# delta_sync.sh / the benchmark runner already use "\n" correctly.)
meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")

# Same escaping bug applied to every summary line below: "\\n" wrote literal
# `\n` sequences into the markdown instead of line breaks.
lines = []
lines.append("# Manual full sync summary\n\n")
lines.append(f"- recorded_at_utc: `{now}`\n")
lines.append(f"- tal_url: `{meta['tal_url']}`\n")
lines.append(f"- db: `{meta['db_dir']}`\n")
lines.append(f"- report_json: `{meta['report_json']}`\n")
lines.append(f"- validation_time: `{meta['validation_time_rfc3339_utc']}`\n\n")
lines.append("## Results\n\n")
lines.append("| metric | value |\n")
lines.append("|---|---:|\n")
for k in ["publication_points_processed","publication_points_failed","vrps","aspas","audit_publication_points"]:
    lines.append(f"| {k} | {meta['counts'][k]} |\n")
lines.append("\n")
lines.append("## Durations (seconds)\n\n")
lines.append("| step | seconds |\n")
lines.append("|---|---:|\n")
for k, v in meta["durations_secs"].items():
    lines.append(f"| {k} | {v} |\n")
lines.append("\n")
summary_path.write_text("".join(lines), encoding="utf-8")
print(summary_path)
PY
echo "== done ==" >&2
echo "artifacts:" >&2
echo "- db: $DB_DIR" >&2
echo "- report: $REPORT_JSON" >&2
echo "- run log: $RUN_LOG" >&2
echo "- db stats: $DB_STATS_TXT" >&2
echo "- rrdp state: $RRDP_STATE_TSV" >&2
echo "- meta json: $RUN_META_JSON" >&2
echo "- summary md: $SUMMARY_MD" >&2

125
src/bin/db_stats.rs Normal file
View File

@ -0,0 +1,125 @@
use std::path::PathBuf;
use rocksdb::{ColumnFamilyDescriptor, DB, IteratorMode, Options};
const CF_RAW_OBJECTS: &str = "raw_objects";
const CF_FETCH_CACHE_PP: &str = "fetch_cache_pp";
const CF_RRDP_STATE: &str = "rrdp_state";
const CF_RRDP_OBJECT_INDEX: &str = "rrdp_object_index";
fn enable_blobdb_if_supported(opts: &mut Options) {
// Keep this in sync with `rpki::storage`:
// blob files are CF-level options; readers should open CFs with blob enabled too.
#[allow(dead_code)]
fn _set(opts: &mut Options) {
opts.set_enable_blob_files(true);
opts.set_min_blob_size(1024);
}
_set(opts);
}
/// Render the command-line help text for this binary.
///
/// # Returns
/// The full usage string; callers print it verbatim (no trailing formatting
/// is added here beyond what the literal contains).
fn usage() -> String {
    let bin = "db_stats";
    format!(
        "\
Usage:
{bin} --db <path> [--exact]
Options:
--db <path> RocksDB directory
--exact Iterate to count keys (slower; default uses RocksDB estimates)
--help Show this help
"
    )
}
/// Build descriptors for every column family this tool opens.
///
/// Each CF gets its own options value with blob files enabled, matching how
/// `rpki::storage` opens the database.
fn cf_descriptors() -> Vec<ColumnFamilyDescriptor> {
    [
        CF_RAW_OBJECTS,
        CF_FETCH_CACHE_PP,
        CF_RRDP_STATE,
        CF_RRDP_OBJECT_INDEX,
    ]
    .into_iter()
    .map(|name| {
        let mut opts = Options::default();
        enable_blobdb_if_supported(&mut opts);
        ColumnFamilyDescriptor::new(name, opts)
    })
    .collect()
}
/// Return RocksDB's estimated key count for `cf_name`
/// (property `rocksdb.estimate-num-keys`).
///
/// # Errors
/// Fails when the column family does not exist or the property read fails.
/// `Ok(None)` means the property is unavailable.
fn estimate_keys(db: &DB, cf_name: &str) -> Result<Option<u64>, Box<dyn std::error::Error>> {
    let Some(cf) = db.cf_handle(cf_name) else {
        return Err(format!("missing column family: {cf_name}").into());
    };
    let estimate = db.property_int_value_cf(cf, "rocksdb.estimate-num-keys")?;
    Ok(estimate)
}
/// Count keys in `cf_name` by iterating the whole column family.
///
/// Exact but slow (touches every key); the first iterator error aborts the
/// count.
fn exact_keys(db: &DB, cf_name: &str) -> Result<u64, Box<dyn std::error::Error>> {
    let Some(cf) = db.cf_handle(cf_name) else {
        return Err(format!("missing column family: {cf_name}").into());
    };
    let mut total: u64 = 0;
    for entry in db.iterator_cf(cf, IteratorMode::Start) {
        // Propagate iterator errors; the key/value pair itself is unused.
        let _ = entry?;
        total += 1;
    }
    Ok(total)
}
/// Entry point: print per-CF key counts for an existing RocksDB directory.
///
/// Flags: `--db <path>` (required), `--exact` (iterate instead of using
/// RocksDB estimates), `--help`/`-h`.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let argv: Vec<String> = std::env::args().collect();
    // `--help` anywhere on the command line wins, even among other flags.
    if argv.iter().any(|a| a == "--help" || a == "-h") {
        print!("{}", usage());
        return Ok(());
    }
    let mut db_path: Option<PathBuf> = None;
    let mut exact = false;
    // Manual index-based scan so `--db` can consume its following value.
    let mut i = 1usize;
    while i < argv.len() {
        match argv[i].as_str() {
            "--db" => {
                i += 1;
                let v = argv.get(i).ok_or("--db requires a value")?;
                db_path = Some(PathBuf::from(v));
            }
            "--exact" => exact = true,
            other => return Err(format!("unknown argument: {other}\n\n{}", usage()).into()),
        }
        i += 1;
    }
    let db_path = db_path.ok_or_else(|| format!("--db is required\n\n{}", usage()))?;
    // Open read-mostly: never create the DB or any missing CFs from here.
    let mut opts = Options::default();
    opts.create_if_missing(false);
    opts.create_missing_column_families(false);
    let db = DB::open_cf_descriptors(&opts, &db_path, cf_descriptors())?;
    let cf_names = [
        CF_RAW_OBJECTS,
        CF_FETCH_CACHE_PP,
        CF_RRDP_STATE,
        CF_RRDP_OBJECT_INDEX,
    ];
    println!("db={}", db_path.display());
    println!("mode={}", if exact { "exact" } else { "estimate" });
    let mut total: u64 = 0;
    for name in cf_names {
        let n = if exact {
            exact_keys(&db, name)?
        } else {
            // A missing property estimate is reported as 0 rather than an error.
            estimate_keys(&db, name)?.unwrap_or(0)
        };
        total = total.saturating_add(n);
        println!("{name}={n}");
    }
    println!("total={total}");
    // Also print # of SST files (useful sanity signal).
    let live = db.live_files()?;
    println!("sst_files={}", live.len());
    Ok(())
}

View File

@ -0,0 +1,85 @@
use std::path::PathBuf;
use rocksdb::{ColumnFamilyDescriptor, DB, IteratorMode, Options};
fn enable_blobdb_if_supported(opts: &mut Options) {
// Keep this in sync with `rpki::storage`:
// blob files are CF-level options; readers should open CFs with blob enabled too.
#[allow(dead_code)]
fn _set(opts: &mut Options) {
opts.set_enable_blob_files(true);
opts.set_min_blob_size(1024);
}
_set(opts);
}
/// Render the command-line help text for this binary.
///
/// # Returns
/// The full usage string; callers print it verbatim.
fn usage() -> String {
    let bin = "rrdp_state_dump";
    format!(
        "\
Usage:
{bin} --db <path>
Options:
--db <path> RocksDB directory
--help Show this help
"
    )
}
/// Entry point: dump every `rrdp_state` entry as TSV
/// (`notification_uri \t serial \t session_id`), sorted by key.
///
/// Flags: `--db <path>` (required), `--help`/`-h`.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let argv: Vec<String> = std::env::args().collect();
    // `--help` anywhere on the command line wins.
    if argv.iter().any(|a| a == "--help" || a == "-h") {
        print!("{}", usage());
        return Ok(());
    }
    let mut db_path: Option<PathBuf> = None;
    // Manual index-based scan so `--db` can consume its following value.
    let mut i = 1usize;
    while i < argv.len() {
        match argv[i].as_str() {
            "--db" => {
                i += 1;
                let v = argv.get(i).ok_or("--db requires a value")?;
                db_path = Some(PathBuf::from(v));
            }
            other => return Err(format!("unknown argument: {other}\n\n{}", usage()).into()),
        }
        i += 1;
    }
    let db_path = db_path.ok_or_else(|| format!("--db is required\n\n{}", usage()))?;
    // Open read-mostly: never create the DB or any missing CFs from here.
    let mut opts = Options::default();
    opts.create_if_missing(false);
    opts.create_missing_column_families(false);
    // Open only the column families we need.
    // NOTE(review): all four CFs are declared even though only `rrdp_state` is
    // read — presumably because RocksDB requires listing every existing CF on
    // open; confirm against the writer's CF set.
    let mut cf_opts = Options::default();
    enable_blobdb_if_supported(&mut cf_opts);
    let cfs = vec![
        ColumnFamilyDescriptor::new("raw_objects", cf_opts.clone()),
        ColumnFamilyDescriptor::new("fetch_cache_pp", cf_opts.clone()),
        ColumnFamilyDescriptor::new("rrdp_state", cf_opts.clone()),
        ColumnFamilyDescriptor::new("rrdp_object_index", cf_opts),
    ];
    let db = DB::open_cf_descriptors(&opts, &db_path, cfs)?;
    let cf = db
        .cf_handle("rrdp_state")
        .ok_or("missing column family: rrdp_state")?;
    // Collect first so output can be sorted by notification URI.
    let mut out: Vec<(String, u64, String)> = Vec::new();
    for res in db.iterator_cf(cf, IteratorMode::Start) {
        let (k, v) = res?;
        let k = String::from_utf8_lossy(&k).to_string();
        // A value that fails to decode aborts the whole dump with context.
        let st = rpki::sync::rrdp::RrdpState::decode(&v)
            .map_err(|e| format!("decode rrdp_state failed for {k}: {e}"))?;
        out.push((k, st.serial, st.session_id));
    }
    out.sort_by(|a, b| a.0.cmp(&b.0));
    for (k, serial, session) in out {
        println!("{k}\t{serial}\t{session}");
    }
    Ok(())
}

View File

@ -27,6 +27,9 @@ pub struct CliArgs {
pub rsync_local_dir: Option<PathBuf>,
pub http_timeout_secs: u64,
pub rsync_timeout_secs: u64,
pub max_depth: Option<usize>,
pub max_instances: Option<usize>,
pub validation_time: Option<time::OffsetDateTime>,
@ -50,6 +53,8 @@ Options:
--ta-path <path> TA certificate DER file path (offline-friendly)
--rsync-local-dir <path> Use LocalDirRsyncFetcher rooted at this directory (offline tests)
--http-timeout-secs <n> HTTP fetch timeout seconds (default: 20)
--rsync-timeout-secs <n> rsync I/O timeout seconds (default: 60)
--max-depth <n> Max CA instance depth (0 = root only)
--max-instances <n> Max number of CA instances to process
--validation-time <rfc3339> Validation time in RFC3339 (default: now UTC)
@ -69,6 +74,8 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let mut report_json_path: Option<PathBuf> = None;
let mut rsync_local_dir: Option<PathBuf> = None;
let mut http_timeout_secs: u64 = 20;
let mut rsync_timeout_secs: u64 = 60;
let mut max_depth: Option<usize> = None;
let mut max_instances: Option<usize> = None;
let mut validation_time: Option<time::OffsetDateTime> = None;
@ -113,6 +120,20 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let v = argv.get(i).ok_or("--rsync-local-dir requires a value")?;
rsync_local_dir = Some(PathBuf::from(v));
}
"--http-timeout-secs" => {
i += 1;
let v = argv.get(i).ok_or("--http-timeout-secs requires a value")?;
http_timeout_secs = v
.parse::<u64>()
.map_err(|_| format!("invalid --http-timeout-secs: {v}"))?;
}
"--rsync-timeout-secs" => {
i += 1;
let v = argv.get(i).ok_or("--rsync-timeout-secs requires a value")?;
rsync_timeout_secs = v
.parse::<u64>()
.map_err(|_| format!("invalid --rsync-timeout-secs: {v}"))?;
}
"--max-depth" => {
i += 1;
let v = argv.get(i).ok_or("--max-depth requires a value")?;
@ -166,6 +187,8 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
policy_path,
report_json_path,
rsync_local_dir,
http_timeout_secs,
rsync_timeout_secs,
max_depth,
max_instances,
validation_time,
@ -289,7 +312,11 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.unwrap_or_else(time::OffsetDateTime::now_utc);
let store = RocksStore::open(&args.db_path).map_err(|e| e.to_string())?;
let http = BlockingHttpFetcher::new(HttpFetcherConfig::default()).map_err(|e| e.to_string())?;
let http = BlockingHttpFetcher::new(HttpFetcherConfig {
timeout: std::time::Duration::from_secs(args.http_timeout_secs.max(1)),
..HttpFetcherConfig::default()
})
.map_err(|e| e.to_string())?;
let config = TreeRunConfig {
max_depth: args.max_depth,
@ -334,7 +361,10 @@ pub fn run(argv: &[String]) -> Result<(), String> {
_ => unreachable!("validated by parse_args"),
}
} else {
let rsync = SystemRsyncFetcher::new(SystemRsyncConfig::default());
let rsync = SystemRsyncFetcher::new(SystemRsyncConfig {
timeout: std::time::Duration::from_secs(args.rsync_timeout_secs.max(1)),
..SystemRsyncConfig::default()
});
match (
args.tal_url.as_ref(),
args.tal_path.as_ref(),

View File

@ -11,6 +11,9 @@ use std::collections::HashSet;
const CF_RAW_OBJECTS: &str = "raw_objects";
const CF_FETCH_CACHE_PP: &str = "fetch_cache_pp";
const CF_RRDP_STATE: &str = "rrdp_state";
const CF_RRDP_OBJECT_INDEX: &str = "rrdp_object_index";
const RRDP_OBJECT_INDEX_PREFIX: &[u8] = b"rrdp_obj:";
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FetchCachePpKey(String);
@ -47,6 +50,12 @@ pub mod pack {
pub use super::{FetchCachePpPack, PackDecodeError, PackFile, PackTime};
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RrdpDeltaOp {
Upsert { rsync_uri: String, bytes: Vec<u8> },
Delete { rsync_uri: String },
}
impl RocksStore {
pub fn open(path: &Path) -> StorageResult<Self> {
let mut base_opts = Options::default();
@ -56,13 +65,24 @@ impl RocksStore {
// Prefer conservative compression; may be overridden later.
base_opts.set_compression_type(DBCompressionType::Lz4);
// Best-effort BlobDB enablement (ignored if bindings don't support it).
// Blob files / BlobDB enablement.
//
// IMPORTANT: `enable_blob_files` is a *column family option* in RocksDB. Setting it only
// on the DB's base options is not sufficient; every CF that stores values must enable it.
enable_blobdb_if_supported(&mut base_opts);
fn cf_opts() -> Options {
let mut opts = Options::default();
opts.set_compression_type(DBCompressionType::Lz4);
enable_blobdb_if_supported(&mut opts);
opts
}
let cfs = vec![
ColumnFamilyDescriptor::new(CF_RAW_OBJECTS, Options::default()),
ColumnFamilyDescriptor::new(CF_FETCH_CACHE_PP, Options::default()),
ColumnFamilyDescriptor::new(CF_RRDP_STATE, Options::default()),
ColumnFamilyDescriptor::new(CF_RAW_OBJECTS, cf_opts()),
ColumnFamilyDescriptor::new(CF_FETCH_CACHE_PP, cf_opts()),
ColumnFamilyDescriptor::new(CF_RRDP_STATE, cf_opts()),
ColumnFamilyDescriptor::new(CF_RRDP_OBJECT_INDEX, cf_opts()),
];
let db = DB::open_cf_descriptors(&base_opts, path, cfs)
@ -148,6 +168,167 @@ impl RocksStore {
Ok(())
}
fn rrdp_object_index_key(notification_uri: &str, rsync_uri: &str) -> Vec<u8> {
let mut out = Vec::with_capacity(
RRDP_OBJECT_INDEX_PREFIX.len() + notification_uri.len() + 1 + rsync_uri.len(),
);
out.extend_from_slice(RRDP_OBJECT_INDEX_PREFIX);
out.extend_from_slice(notification_uri.as_bytes());
out.push(0);
out.extend_from_slice(rsync_uri.as_bytes());
out
}
fn rrdp_object_index_prefix(notification_uri: &str) -> Vec<u8> {
let mut out =
Vec::with_capacity(RRDP_OBJECT_INDEX_PREFIX.len() + notification_uri.len() + 1);
out.extend_from_slice(RRDP_OBJECT_INDEX_PREFIX);
out.extend_from_slice(notification_uri.as_bytes());
out.push(0);
out
}
/// Check whether `rsync_uri` is recorded as a member of the RRDP repository
/// identified by `notification_uri`.
///
/// # Errors
/// Fails if the index column family is missing or the point lookup fails.
#[allow(dead_code)]
pub fn rrdp_object_index_contains(
    &self,
    notification_uri: &str,
    rsync_uri: &str,
) -> StorageResult<bool> {
    let cf = self.cf(CF_RRDP_OBJECT_INDEX)?;
    let key = Self::rrdp_object_index_key(notification_uri, rsync_uri);
    // Membership is simply the presence of the composite key.
    let found = self
        .db
        .get_cf(cf, key)
        .map_err(|e| StorageError::RocksDb(e.to_string()))?;
    Ok(found.is_some())
}
/// Iterate all rsync URIs recorded for the RRDP repository identified by
/// `notification_uri`, in key order.
///
/// NOTE(review): iterator errors silently terminate the stream (`take_while`
/// returns `false` on `Err`, and `filter_map` drops the rest) — callers see a
/// truncated listing rather than an error. Confirm this best-effort behavior
/// is intended.
#[allow(dead_code)]
pub fn rrdp_object_index_iter(
    &self,
    notification_uri: &str,
) -> StorageResult<impl Iterator<Item = String> + '_> {
    let cf = self.cf(CF_RRDP_OBJECT_INDEX)?;
    let prefix = Self::rrdp_object_index_prefix(notification_uri);
    let prefix_len = prefix.len();
    // Seek to the first key >= prefix, then stop at the first non-matching key.
    let mode = IteratorMode::From(prefix.as_slice(), Direction::Forward);
    Ok(self
        .db
        .iterator_cf(cf, mode)
        .take_while(move |res| match res {
            Ok((k, _v)) => k.starts_with(prefix.as_slice()),
            Err(_) => false,
        })
        .filter_map(move |res| {
            let (k, _v) = res.ok()?;
            // Strip the prefix; skip entries whose suffix is not valid UTF-8.
            let rsync_part = k.get(prefix_len..)?;
            let s = std::str::from_utf8(rsync_part).ok()?;
            Some(s.to_string())
        }))
}
/// Delete every index entry belonging to the RRDP repository identified by
/// `notification_uri`.
///
/// # Returns
/// The number of index keys removed (0 when the repository has no entries).
#[allow(dead_code)]
pub fn rrdp_object_index_clear(&self, notification_uri: &str) -> StorageResult<usize> {
    let cf = self.cf(CF_RRDP_OBJECT_INDEX)?;
    let prefix = Self::rrdp_object_index_prefix(notification_uri);
    // Materialize matching keys first; the scan stops at the first key outside
    // the prefix range or at the first iterator error (best-effort, matching
    // the iterator helper's semantics).
    let mut keys: Vec<Box<[u8]>> = Vec::new();
    let mode = IteratorMode::From(prefix.as_slice(), Direction::Forward);
    for res in self.db.iterator_cf(cf, mode) {
        match res {
            Ok((k, _v)) if k.starts_with(prefix.as_slice()) => keys.push(k),
            _ => break,
        }
    }
    if keys.is_empty() {
        return Ok(0);
    }
    // Delete atomically in one batch.
    let mut batch = WriteBatch::default();
    for k in &keys {
        batch.delete_cf(cf, k);
    }
    self.write_batch(batch)?;
    Ok(keys.len())
}
/// Apply an RRDP snapshot as the complete repository state for this
/// `notification_uri`.
///
/// Updates, in one atomic batch:
/// - `raw_objects`: publishes every snapshot object and deletes objects that
///   were indexed for this repository but are absent from the snapshot;
/// - `rrdp_object_index`: membership, used to scope deletes to this RRDP
///   repository.
///
/// RFC 8182 §3.5.2.1: snapshots reflect the complete and current repository
/// contents.
///
/// # Returns
/// The number of published objects.
pub fn apply_rrdp_snapshot(
    &self,
    notification_uri: &str,
    published: &[(String, Vec<u8>)],
) -> StorageResult<usize> {
    let raw_cf = self.cf(CF_RAW_OBJECTS)?;
    let idx_cf = self.cf(CF_RRDP_OBJECT_INDEX)?;
    // URIs present in the incoming snapshot.
    let incoming: HashSet<&str> = published.iter().map(|(uri, _)| uri.as_str()).collect();
    // URIs recorded from the previous state of this repository.
    let previous: Vec<String> = self.rrdp_object_index_iter(notification_uri)?.collect();
    let mut batch = WriteBatch::default();
    // Drop everything that vanished from the repository.
    for stale in previous.iter().filter(|uri| !incoming.contains(uri.as_str())) {
        batch.delete_cf(raw_cf, stale.as_bytes());
        batch.delete_cf(idx_cf, Self::rrdp_object_index_key(notification_uri, stale));
    }
    // Publish (insert or overwrite) every snapshot object and its membership.
    for (uri, bytes) in published {
        batch.put_cf(raw_cf, uri.as_bytes(), bytes.as_slice());
        batch.put_cf(idx_cf, Self::rrdp_object_index_key(notification_uri, uri), b"");
    }
    self.write_batch(batch)?;
    Ok(published.len())
}
/// Apply an ordered list of RRDP delta operations for `notification_uri`.
///
/// Upserts write the object bytes and record index membership; deletes remove
/// both. All operations are committed in a single atomic write batch.
///
/// # Returns
/// The number of operations applied (0 for an empty list, without touching
/// the database).
pub fn apply_rrdp_delta(
    &self,
    notification_uri: &str,
    ops: &[RrdpDeltaOp],
) -> StorageResult<usize> {
    // Fast path: no ops means no CF lookups and no batch write.
    if ops.is_empty() {
        return Ok(0);
    }
    let raw_cf = self.cf(CF_RAW_OBJECTS)?;
    let idx_cf = self.cf(CF_RRDP_OBJECT_INDEX)?;
    let mut batch = WriteBatch::default();
    for op in ops {
        match op {
            RrdpDeltaOp::Upsert { rsync_uri, bytes } => {
                let idx_key = Self::rrdp_object_index_key(notification_uri, rsync_uri);
                batch.put_cf(raw_cf, rsync_uri.as_bytes(), bytes.as_slice());
                batch.put_cf(idx_cf, idx_key, b"");
            }
            RrdpDeltaOp::Delete { rsync_uri } => {
                let idx_key = Self::rrdp_object_index_key(notification_uri, rsync_uri);
                batch.delete_cf(raw_cf, rsync_uri.as_bytes());
                batch.delete_cf(idx_cf, idx_key);
            }
        }
    }
    self.write_batch(batch)?;
    Ok(ops.len())
}
#[allow(dead_code)]
pub fn raw_iter_prefix<'a>(
&'a self,
@ -419,3 +600,189 @@ fn compute_sha256_32(bytes: &[u8]) -> [u8; 32] {
out.copy_from_slice(&digest);
out
}
// Unit tests for the RRDP object index, snapshot/delta application, and
// fetch_cache_pp pack helpers. Each test opens a throwaway RocksDB in a
// tempdir.
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end: index membership, snapshot apply (publish + scoped delete),
    // delta apply, prefix iteration, and index clearing.
    #[test]
    fn rrdp_object_index_and_snapshot_delta_helpers_work_end_to_end() {
        let td = tempfile::tempdir().expect("tempdir");
        let store = RocksStore::open(td.path()).expect("open rocksdb");
        let notification_uri = "https://rrdp.example.test/notification.xml";
        let u1 = "rsync://rpki.example.test/repo/obj1.cer".to_string();
        let u2 = "rsync://rpki.example.test/repo/obj2.mft".to_string();
        // Empty clear is a fast no-op.
        assert_eq!(
            store
                .rrdp_object_index_clear(notification_uri)
                .expect("clear empty"),
            0
        );
        // Snapshot publishes two objects.
        let published_v1 = vec![
            (u1.clone(), vec![1u8, 2, 3]),
            (u2.clone(), vec![9u8, 8, 7]),
        ];
        let n = store
            .apply_rrdp_snapshot(notification_uri, &published_v1)
            .expect("apply snapshot v1");
        assert_eq!(n, 2);
        assert_eq!(
            store.get_raw(&u1).expect("get_raw u1"),
            Some(vec![1u8, 2, 3])
        );
        assert_eq!(
            store.get_raw(&u2).expect("get_raw u2"),
            Some(vec![9u8, 8, 7])
        );
        assert!(
            store
                .rrdp_object_index_contains(notification_uri, &u1)
                .expect("contains u1")
        );
        assert!(
            store
                .rrdp_object_index_contains(notification_uri, &u2)
                .expect("contains u2")
        );
        let mut listed: Vec<String> = store
            .rrdp_object_index_iter(notification_uri)
            .expect("iter")
            .collect();
        listed.sort();
        assert_eq!(listed, vec![u1.clone(), u2.clone()]);
        // Snapshot v2 removes u1 and updates u2.
        let published_v2 = vec![(u2.clone(), vec![0u8, 1, 2, 3])];
        store
            .apply_rrdp_snapshot(notification_uri, &published_v2)
            .expect("apply snapshot v2");
        assert_eq!(store.get_raw(&u1).expect("get_raw removed"), None);
        assert_eq!(
            store.get_raw(&u2).expect("get_raw updated"),
            Some(vec![0u8, 1, 2, 3])
        );
        // Delta can upsert and delete, and uses the index to scope membership.
        let u3 = "rsync://rpki.example.test/repo/obj3.crl".to_string();
        let ops = vec![
            RrdpDeltaOp::Upsert {
                rsync_uri: u3.clone(),
                bytes: vec![4u8, 5, 6],
            },
            RrdpDeltaOp::Delete { rsync_uri: u2.clone() },
        ];
        let applied = store
            .apply_rrdp_delta(notification_uri, &ops)
            .expect("apply delta");
        assert_eq!(applied, 2);
        assert_eq!(store.get_raw(&u2).expect("get_raw deleted"), None);
        assert_eq!(
            store.get_raw(&u3).expect("get_raw u3"),
            Some(vec![4u8, 5, 6])
        );
        // Prefix iterators yield only matching keys.
        let prefix = b"rsync://rpki.example.test/repo/";
        let mut got: Vec<String> = store
            .raw_iter_prefix(prefix)
            .expect("raw_iter_prefix")
            .map(|(k, _v)| String::from_utf8(k.to_vec()).expect("utf8 key"))
            .collect();
        got.sort();
        assert_eq!(got, vec![u3.clone()]);
        let all: Vec<String> = store
            .raw_iter_all()
            .expect("raw_iter_all")
            .map(|(k, _v)| String::from_utf8(k.to_vec()).expect("utf8 key"))
            .collect();
        assert!(all.contains(&u3));
        // Clearing removes all index entries for this RRDP repository.
        let cleared = store
            .rrdp_object_index_clear(notification_uri)
            .expect("clear");
        assert!(cleared >= 1);
        assert!(
            !store
                .rrdp_object_index_contains(notification_uri, &u3)
                .expect("contains after clear")
        );
    }

    // Smallest pack that passes `validate_internal`; tests mutate single
    // fields to probe individual validation failures.
    fn minimal_valid_pack() -> FetchCachePpPack {
        let now = time::OffsetDateTime::now_utc();
        FetchCachePpPack {
            format_version: FetchCachePpPack::FORMAT_VERSION_V1,
            manifest_rsync_uri: "rsync://example.test/repo/pp/manifest.mft".to_string(),
            publication_point_rsync_uri: "rsync://example.test/repo/pp/".to_string(),
            manifest_number_be: vec![1],
            this_update: PackTime::from_utc_offset_datetime(now),
            next_update: PackTime::from_utc_offset_datetime(now + time::Duration::hours(1)),
            verified_at: PackTime::from_utc_offset_datetime(now),
            manifest_bytes: vec![0x01],
            files: vec![PackFile::from_bytes_compute_sha256(
                "rsync://example.test/repo/pp/a.roa",
                vec![1u8, 2, 3],
            )],
        }
    }

    // An empty delta must not fail and must report zero applied operations.
    #[test]
    fn apply_rrdp_delta_empty_ops_is_noop() {
        let td = tempfile::tempdir().expect("tempdir");
        let store = RocksStore::open(td.path()).expect("open rocksdb");
        let n = store
            .apply_rrdp_delta("https://rrdp.example.test/notification.xml", &[])
            .expect("apply empty delta");
        assert_eq!(n, 0);
    }

    // Internal validation must reject unparseable time fields and duplicate
    // file URIs.
    #[test]
    fn fetch_cache_pp_pack_rejects_invalid_time_fields_and_duplicates() {
        // Invalid `next_update`.
        let mut p = minimal_valid_pack();
        p.next_update.rfc3339_utc = "not-a-time".to_string();
        let e = p.validate_internal().unwrap_err().to_string();
        assert!(e.contains("invalid time field next_update"), "{e}");
        // Invalid `verified_at`.
        let mut p = minimal_valid_pack();
        p.verified_at.rfc3339_utc = "also-not-a-time".to_string();
        let e = p.validate_internal().unwrap_err().to_string();
        assert!(e.contains("invalid time field verified_at"), "{e}");
        // Duplicate file rsync URI.
        let mut p = minimal_valid_pack();
        let f = p.files[0].clone();
        p.files.push(f);
        let e = p.validate_internal().unwrap_err().to_string();
        assert!(e.contains("duplicate file rsync uri"), "{e}");
    }

    // A stored pack must be visible through the full-CF iterator under its key.
    #[test]
    fn fetch_cache_pp_iter_all_lists_keys() {
        let td = tempfile::tempdir().expect("tempdir");
        let store = RocksStore::open(td.path()).expect("open rocksdb");
        let key = FetchCachePpKey::from_manifest_rsync_uri("rsync://example.test/repo/pp/manifest.mft");
        let bytes = minimal_valid_pack().encode().expect("encode pack");
        store
            .put_fetch_cache_pp(&key, &bytes)
            .expect("put fetch_cache_pp");
        let keys: Vec<String> = store
            .fetch_cache_pp_iter_all()
            .expect("iter all")
            .map(|(k, _v)| String::from_utf8(k.to_vec()).expect("utf8 key"))
            .collect();
        assert!(keys.iter().any(|k| k == key.as_str()), "missing key in iterator");
    }
}

View File

@ -3,8 +3,7 @@ use crate::policy::{Policy, SyncPreference};
use crate::report::{RfcRef, Warning};
use crate::storage::RocksStore;
use crate::sync::rrdp::{
Fetcher as HttpFetcher, RrdpState, RrdpSyncError, parse_notification_snapshot,
sync_from_notification_snapshot,
Fetcher as HttpFetcher, RrdpSyncError, sync_from_notification,
};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@ -91,24 +90,7 @@ fn try_rrdp_sync(
.fetch(notification_uri)
.map_err(RrdpSyncError::Fetch)?;
// Stage2 snapshot-only optimization: if the stored RRDP state matches the current notification's
// session_id+serial, skip snapshot fetch/apply. This avoids repeatedly downloading/applying the
// same snapshot when traversing many CA instances sharing an RRDP endpoint.
//
// RFC 8182 §3.4.1-§3.4.3: clients use notification to discover snapshot and can avoid fetching
// snapshot when serial hasn't advanced.
if let Ok(notif) = parse_notification_snapshot(&notification_xml) {
if let Ok(Some(state_bytes)) = store.get_rrdp_state(notification_uri) {
if let Ok(state) = RrdpState::decode(&state_bytes) {
if state.session_id == notif.session_id.to_string() && state.serial == notif.serial
{
return Ok(0);
}
}
}
}
sync_from_notification_snapshot(store, notification_uri, &notification_xml, http_fetcher)
sync_from_notification(store, notification_uri, &notification_xml, http_fetcher)
}
fn rsync_sync_into_raw_objects(

File diff suppressed because it is too large Load Diff

View File

@ -7,6 +7,8 @@ use crate::data_model::rc::{
};
use x509_parser::prelude::{FromDer, X509Certificate};
use crate::validation::x509_name::x509_names_equivalent;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ValidatedSubordinateCa {
pub child_ca: ResourceCertificate,
@ -135,7 +137,7 @@ pub fn validate_subordinate_ca_cert(
return Err(CaPathError::IssuerNotCa);
}
if child_ca.tbs.issuer_name != issuer_ca.tbs.subject_name {
if !x509_names_equivalent(&child_ca.tbs.issuer_name, &issuer_ca.tbs.subject_name) {
return Err(CaPathError::IssuerSubjectMismatch {
child_issuer_dn: child_ca.tbs.issuer_name.to_string(),
issuer_subject_dn: issuer_ca.tbs.subject_name.to_string(),
@ -697,6 +699,7 @@ mod tests {
};
use crate::data_model::common::X509NameDer;
use der_parser::num_bigint::BigUint;
use std::process::Command;
fn dummy_cert(
kind: ResourceCertKind,
subject_dn: &str,
@ -736,6 +739,158 @@ mod tests {
}
}
/// Probe for a working `openssl` binary on PATH by running `openssl version`.
///
/// Returns `false` when the binary is missing or exits non-zero; used to gate
/// tests that shell out to OpenSSL.
fn openssl_available() -> bool {
    match Command::new("openssl").arg("version").output() {
        Ok(out) => out.status.success(),
        Err(_) => false,
    }
}
// Generate a throwaway self-signed certificate via the `openssl` CLI,
// optionally adding one extension through `-addext`, and return its DER bytes.
// Panics (test helper) if openssl is unavailable or any step fails.
fn write_cert_der_with_addext(dir: &std::path::Path, addext: Option<&str>) -> Vec<u8> {
    assert!(openssl_available(), "openssl is required for this test");
    let key = dir.join("k.pem");
    let cert = dir.join("c.pem");
    let der = dir.join("c.der");
    // Step 1: one-shot self-signed cert (`openssl req -x509`) with a fresh
    // RSA-2048 key, CN=ku, valid for 1 day.
    let mut cmd = Command::new("openssl");
    cmd.arg("req")
        .arg("-x509")
        .arg("-newkey")
        .arg("rsa:2048")
        .arg("-nodes")
        .arg("-keyout")
        .arg(&key)
        .arg("-subj")
        .arg("/CN=ku")
        .arg("-days")
        .arg("1")
        .arg("-out")
        .arg(&cert);
    // The caller-supplied extension (e.g. a keyUsage line) is appended only
    // when requested, so "missing extension" cases can be generated too.
    if let Some(ext) = addext {
        cmd.arg("-addext").arg(ext);
    }
    let out = cmd.output().expect("openssl req");
    assert!(
        out.status.success(),
        "openssl req failed: {}",
        String::from_utf8_lossy(&out.stderr)
    );
    // Step 2: convert the PEM certificate to DER.
    let out = Command::new("openssl")
        .arg("x509")
        .arg("-in")
        .arg(&cert)
        .arg("-outform")
        .arg("DER")
        .arg("-out")
        .arg(&der)
        .output()
        .expect("openssl x509");
    assert!(
        out.status.success(),
        "openssl x509 failed: {}",
        String::from_utf8_lossy(&out.stderr)
    );
    std::fs::read(&der).expect("read der")
}
// Generate three DER certificates via the `openssl` CLI:
// - a self-signed issuer (CN=issuer),
// - a child signed by that issuer (CN=child),
// - an unrelated self-signed cert (CN=other) for negative signature tests.
// Returns (issuer_der, child_der, other_der). Panics on any failure.
fn gen_issuer_and_child_der(dir: &std::path::Path) -> (Vec<u8>, Vec<u8>, Vec<u8>) {
    assert!(openssl_available(), "openssl is required for this test");
    let issuer_key = dir.join("issuer.key");
    let issuer_csr = dir.join("issuer.csr");
    let issuer_pem = dir.join("issuer.pem");
    let issuer_der = dir.join("issuer.der");
    let child_key = dir.join("child.key");
    let child_csr = dir.join("child.csr");
    let child_pem = dir.join("child.pem");
    let child_der = dir.join("child.der");
    let other_key = dir.join("other.key");
    let other_csr = dir.join("other.csr");
    let other_pem = dir.join("other.pem");
    let other_der = dir.join("other.der");
    // Run a command and panic with its stderr on failure.
    let run = |cmd: &mut Command| {
        let out = cmd.output().expect("run openssl");
        assert!(
            out.status.success(),
            "command failed: {:?}\nstderr={}",
            cmd,
            String::from_utf8_lossy(&out.stderr)
        );
    };
    // Issuer self-signed.
    run(Command::new("openssl").args(["genrsa", "-out"]).arg(&issuer_key).arg("2048"));
    run(Command::new("openssl")
        .args(["req", "-new", "-key"])
        .arg(&issuer_key)
        .args(["-subj", "/CN=issuer", "-out"])
        .arg(&issuer_csr));
    run(Command::new("openssl")
        .args(["x509", "-req", "-in"])
        .arg(&issuer_csr)
        .args(["-signkey"])
        .arg(&issuer_key)
        .args(["-days", "1", "-out"])
        .arg(&issuer_pem));
    run(Command::new("openssl")
        .args(["x509", "-in"])
        .arg(&issuer_pem)
        .args(["-outform", "DER", "-out"])
        .arg(&issuer_der));
    // Child signed by issuer.
    run(Command::new("openssl").args(["genrsa", "-out"]).arg(&child_key).arg("2048"));
    run(Command::new("openssl")
        .args(["req", "-new", "-key"])
        .arg(&child_key)
        .args(["-subj", "/CN=child", "-out"])
        .arg(&child_csr));
    run(Command::new("openssl")
        .args(["x509", "-req", "-in"])
        .arg(&child_csr)
        .args(["-CA"])
        .arg(&issuer_pem)
        .args(["-CAkey"])
        .arg(&issuer_key)
        .args(["-CAcreateserial", "-days", "1", "-out"])
        .arg(&child_pem));
    run(Command::new("openssl")
        .args(["x509", "-in"])
        .arg(&child_pem)
        .args(["-outform", "DER", "-out"])
        .arg(&child_der));
    // Other self-signed issuer.
    run(Command::new("openssl").args(["genrsa", "-out"]).arg(&other_key).arg("2048"));
    run(Command::new("openssl")
        .args(["req", "-new", "-key"])
        .arg(&other_key)
        .args(["-subj", "/CN=other", "-out"])
        .arg(&other_csr));
    run(Command::new("openssl")
        .args(["x509", "-req", "-in"])
        .arg(&other_csr)
        .args(["-signkey"])
        .arg(&other_key)
        .args(["-days", "1", "-out"])
        .arg(&other_pem));
    run(Command::new("openssl")
        .args(["x509", "-in"])
        .arg(&other_pem)
        .args(["-outform", "DER", "-out"])
        .arg(&other_der));
    (
        std::fs::read(&issuer_der).expect("read issuer der"),
        std::fs::read(&child_der).expect("read child der"),
        std::fs::read(&other_der).expect("read other der"),
    )
}
#[test]
fn resolve_child_ip_resources_rejects_inherit_without_parent_effective_resources() {
let child = IpResourceSet {
@ -953,6 +1108,142 @@ mod tests {
validate_child_crldp_contains_issuer_crl_uri(&child, "rsync://example.test/issuer.crl")
.expect("crldp ok");
}
// Happy path: a critical keyUsage with exactly keyCertSign + cRLSign passes.
#[test]
fn validate_child_ca_key_usage_accepts_only_keycertsign_and_crlsign_critical() {
    let td = tempfile::tempdir().expect("tempdir");
    let der = write_cert_der_with_addext(
        td.path(),
        Some("keyUsage = critical, keyCertSign, cRLSign"),
    );
    validate_child_ca_key_usage(&der).expect("key usage ok");
}
// Negative paths: keyUsage absent, present-but-non-critical, and critical
// with an extra (disallowed) bit each map to their specific CaPathError.
#[test]
fn validate_child_ca_key_usage_rejects_missing_noncritical_and_invalid_bits() {
    let td = tempfile::tempdir().expect("tempdir");
    let missing = write_cert_der_with_addext(td.path(), None);
    let err = validate_child_ca_key_usage(&missing).unwrap_err();
    assert!(matches!(err, CaPathError::KeyUsageMissing), "{err}");
    let td = tempfile::tempdir().expect("tempdir");
    let noncritical =
        write_cert_der_with_addext(td.path(), Some("keyUsage = keyCertSign, cRLSign"));
    let err = validate_child_ca_key_usage(&noncritical).unwrap_err();
    assert!(matches!(err, CaPathError::KeyUsageNotCritical), "{err}");
    let td = tempfile::tempdir().expect("tempdir");
    let invalid = write_cert_der_with_addext(
        td.path(),
        Some("keyUsage = critical, keyCertSign, cRLSign, digitalSignature"),
    );
    let err = validate_child_ca_key_usage(&invalid).unwrap_err();
    assert!(matches!(err, CaPathError::KeyUsageInvalidBits), "{err}");
}
// Signature check: child verifies against its real issuer and fails against
// an unrelated certificate.
#[test]
fn verify_cert_signature_with_issuer_accepts_valid_chain_and_rejects_wrong_issuer() {
    let td = tempfile::tempdir().expect("tempdir");
    let (issuer, child, other) = gen_issuer_and_child_der(td.path());
    verify_cert_signature_with_issuer(&child, &issuer).expect("signature ok");
    let err = verify_cert_signature_with_issuer(&child, &other).unwrap_err();
    assert!(matches!(err, CaPathError::ChildSignatureInvalid(_)), "{err}");
}
// IP and AS resource resolution: inherit resolves to the parent's effective
// set, a strict subset passes, and a non-subset is rejected with
// ResourcesNotSubset — for both address families and AS numbers.
#[test]
fn resolve_child_ip_and_as_resources_success_paths() {
    use crate::data_model::rc::{AsIdOrRange, IpAddressOrRange, IpPrefix};
    // Parent holds 10.0.0.0/8.
    let parent_ip = IpResourceSet {
        families: vec![IpAddressFamily {
            afi: Afi::Ipv4,
            choice: IpAddressChoice::AddressesOrRanges(vec![IpAddressOrRange::Prefix(
                IpPrefix {
                    afi: Afi::Ipv4,
                    prefix_len: 8,
                    addr: vec![10, 0, 0, 0],
                },
            )]),
        }],
    };
    // Inherit must materialize into concrete ranges from the parent.
    let child_ip_inherit = IpResourceSet {
        families: vec![IpAddressFamily {
            afi: Afi::Ipv4,
            choice: IpAddressChoice::Inherit,
        }],
    };
    let eff = resolve_child_ip_resources(Some(&child_ip_inherit), Some(&parent_ip))
        .expect("inherit resolves")
        .expect("some ip");
    assert_eq!(eff.families.len(), 1);
    assert!(matches!(
        eff.families[0].choice,
        IpAddressChoice::AddressesOrRanges(_)
    ));
    // 10.1.0.0/16 is inside 10.0.0.0/8.
    let child_ip_subset = IpResourceSet {
        families: vec![IpAddressFamily {
            afi: Afi::Ipv4,
            choice: IpAddressChoice::AddressesOrRanges(vec![IpAddressOrRange::Prefix(
                IpPrefix {
                    afi: Afi::Ipv4,
                    prefix_len: 16,
                    addr: vec![10, 1, 0, 0],
                },
            )]),
        }],
    };
    resolve_child_ip_resources(Some(&child_ip_subset), Some(&parent_ip))
        .expect("subset ok")
        .expect("some");
    // 11.0.0.0/16 is outside the parent's 10.0.0.0/8.
    let child_ip_bad = IpResourceSet {
        families: vec![IpAddressFamily {
            afi: Afi::Ipv4,
            choice: IpAddressChoice::AddressesOrRanges(vec![IpAddressOrRange::Prefix(
                IpPrefix {
                    afi: Afi::Ipv4,
                    prefix_len: 16,
                    addr: vec![11, 0, 0, 0],
                },
            )]),
        }],
    };
    let err = resolve_child_ip_resources(Some(&child_ip_bad), Some(&parent_ip)).unwrap_err();
    assert!(matches!(err, CaPathError::ResourcesNotSubset), "{err}");
    // AS side: parent holds AS1-100.
    let parent_as = AsResourceSet {
        asnum: Some(AsIdentifierChoice::AsIdsOrRanges(vec![AsIdOrRange::Range {
            min: 1,
            max: 100,
        }])),
        rdi: None,
    };
    let child_as_inherit = AsResourceSet {
        asnum: Some(AsIdentifierChoice::Inherit),
        rdi: None,
    };
    let eff_as = resolve_child_as_resources(Some(&child_as_inherit), Some(&parent_as))
        .expect("inherit as")
        .expect("some");
    assert_eq!(eff_as.asnum, parent_as.asnum);
    // AS50 is within 1-100.
    let child_as_subset = AsResourceSet {
        asnum: Some(AsIdentifierChoice::AsIdsOrRanges(vec![AsIdOrRange::Id(50)])),
        rdi: None,
    };
    resolve_child_as_resources(Some(&child_as_subset), Some(&parent_as))
        .expect("subset as")
        .expect("some");
    // AS200 is outside 1-100.
    let child_as_bad = AsResourceSet {
        asnum: Some(AsIdentifierChoice::AsIdsOrRanges(vec![AsIdOrRange::Id(200)])),
        rdi: None,
    };
    let err = resolve_child_as_resources(Some(&child_as_bad), Some(&parent_as)).unwrap_err();
    assert!(matches!(err, CaPathError::ResourcesNotSubset), "{err}");
}
}
fn increment_bytes(v: &[u8]) -> Vec<u8> {

View File

@ -5,6 +5,8 @@ use crate::data_model::rc::{
};
use x509_parser::prelude::{FromDer, X509Certificate};
use crate::validation::x509_name::x509_names_equivalent;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ValidatedEeCertPath {
pub ee: ResourceCertificate,
@ -112,7 +114,7 @@ pub fn validate_ee_cert_path(
return Err(CertPathError::IssuerNotCa);
}
if ee.tbs.issuer_name != issuer_ca.tbs.subject_name {
if !x509_names_equivalent(&ee.tbs.issuer_name, &issuer_ca.tbs.subject_name) {
return Err(CertPathError::IssuerSubjectMismatch {
ee_issuer_dn: ee.tbs.issuer_name.to_string(),
issuer_subject_dn: issuer_ca.tbs.subject_name.to_string(),

View File

@ -23,6 +23,9 @@ pub struct PublicationPointResult {
#[derive(Debug, thiserror::Error)]
pub enum ManifestFreshError {
#[error("repo sync failed: {detail} (RFC 8182 §3.4.5; RFC 9286 §6.6)")]
RepoSyncFailed { detail: String },
#[error(
"manifest not found in raw_objects: {manifest_rsync_uri} (RFC 9286 §6.2; RFC 9286 §6.6)"
)]
@ -148,14 +151,44 @@ pub fn process_manifest_publication_point(
issuer_ca_rsync_uri: Option<&str>,
validation_time: time::OffsetDateTime,
) -> Result<PublicationPointResult, ManifestProcessError> {
let fresh = try_build_fresh_pack(
process_manifest_publication_point_after_repo_sync(
store,
policy,
manifest_rsync_uri,
publication_point_rsync_uri,
issuer_ca_der,
issuer_ca_rsync_uri,
validation_time,
true,
None,
)
}
pub fn process_manifest_publication_point_after_repo_sync(
store: &RocksStore,
policy: &Policy,
manifest_rsync_uri: &str,
publication_point_rsync_uri: &str,
issuer_ca_der: &[u8],
issuer_ca_rsync_uri: Option<&str>,
validation_time: time::OffsetDateTime,
repo_sync_ok: bool,
repo_sync_error: Option<&str>,
) -> Result<PublicationPointResult, ManifestProcessError> {
let fresh = if repo_sync_ok {
try_build_fresh_pack(
store,
manifest_rsync_uri,
publication_point_rsync_uri,
issuer_ca_der,
issuer_ca_rsync_uri,
validation_time,
);
)
} else {
Err(ManifestFreshError::RepoSyncFailed {
detail: repo_sync_error.unwrap_or("repo sync failed").to_string(),
})
};
match fresh {
Ok(pack) => {
@ -354,9 +387,16 @@ fn try_build_fresh_pack(
// RFC 9286 §4.2.1: replay/rollback detection for manifestNumber and thisUpdate.
//
// If a purported "new" manifest contains a manifestNumber equal to or lower than previously
// validated manifests, or a thisUpdate less recent than previously validated manifests,
// this is treated as a failed fetch and processing continues via the cached objects path (§6.6).
// Important nuance for revalidation across runs:
// - If the manifestNumber is equal to the previously validated manifestNumber *and* the
// manifest bytes are identical, then this is the same manifest being revalidated and MUST
// be accepted (otherwise, RPs would incorrectly treat stable repositories as "failed fetch"
// and fall back to fetch_cache_pp).
// - If manifestNumber is equal but the manifest bytes differ, treat this as invalid (a
// repository is not allowed to change the manifest while keeping the manifestNumber).
// - If manifestNumber is lower, treat as rollback and reject.
// - If manifestNumber is higher, require thisUpdate to be more recent than the previously
// validated thisUpdate.
let key = FetchCachePpKey::from_manifest_rsync_uri(manifest_rsync_uri);
if let Some(old_bytes) = store.get_fetch_cache_pp(&key).ok().flatten() {
if let Ok(old_pack) = FetchCachePpPack::decode(&old_bytes) {
@ -365,13 +405,8 @@ fn try_build_fresh_pack(
{
let new_num = manifest.manifest.manifest_number.bytes_be.as_slice();
let old_num = old_pack.manifest_number_be.as_slice();
if cmp_minimal_be_unsigned(new_num, old_num) != Ordering::Greater {
return Err(ManifestFreshError::ManifestNumberNotIncreasing {
old_hex: hex::encode_upper(old_num),
new_hex: hex::encode_upper(new_num),
});
}
match cmp_minimal_be_unsigned(new_num, old_num) {
Ordering::Greater => {
let old_this_update = old_pack
.this_update
.parse()
@ -389,6 +424,22 @@ fn try_build_fresh_pack(
});
}
}
Ordering::Equal => {
if old_pack.manifest_bytes != manifest_bytes {
return Err(ManifestFreshError::ManifestNumberNotIncreasing {
old_hex: hex::encode_upper(old_num),
new_hex: hex::encode_upper(new_num),
});
}
}
Ordering::Less => {
return Err(ManifestFreshError::ManifestNumberNotIncreasing {
old_hex: hex::encode_upper(old_num),
new_hex: hex::encode_upper(new_num),
});
}
}
}
}
}

View File

@ -8,3 +8,4 @@ pub mod run;
pub mod run_tree_from_tal;
pub mod tree;
pub mod tree_runner;
pub mod x509_name;

View File

@ -10,7 +10,9 @@ use crate::sync::repo::sync_publication_point;
use crate::sync::rrdp::Fetcher;
use crate::validation::ca_instance::ca_instance_uris_from_ca_certificate;
use crate::validation::ca_path::{CaPathError, validate_subordinate_ca_cert};
use crate::validation::manifest::{PublicationPointSource, process_manifest_publication_point};
use crate::validation::manifest::{
PublicationPointSource, process_manifest_publication_point_after_repo_sync,
};
use crate::validation::objects::process_fetch_cache_pp_pack_for_issuer;
use crate::validation::tree::{
CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult, PublicationPointRunner,
@ -31,7 +33,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
) -> Result<PublicationPointRunResult, String> {
let mut warnings: Vec<Warning> = Vec::new();
if let Err(e) = sync_publication_point(
let (repo_sync_ok, repo_sync_err): (bool, Option<String>) = match sync_publication_point(
self.store,
self.policy,
ca.rrdp_notification_uri.as_deref(),
@ -39,16 +41,23 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
self.http_fetcher,
self.rsync_fetcher,
) {
Ok(res) => {
warnings.extend(res.warnings);
(true, None)
}
Err(e) => {
warnings.push(
Warning::new(format!(
"repo sync failed (continuing with cached/raw data): {e}"
"repo sync failed (continuing with fetch_cache_pp only): {e}"
))
.with_rfc_refs(&[RfcRef("RFC 8182 §3.4.5"), RfcRef("RFC 9286 §6.6")])
.with_context(&ca.rsync_base_uri),
);
(false, Some(e.to_string()))
}
};
let pp = match process_manifest_publication_point(
let pp = match process_manifest_publication_point_after_repo_sync(
self.store,
self.policy,
&ca.manifest_rsync_uri,
@ -56,6 +65,8 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
&ca.ca_certificate_der,
ca.ca_certificate_rsync_uri.as_deref(),
self.validation_time,
repo_sync_ok,
repo_sync_err.as_deref(),
) {
Ok(v) => v,
Err(e) => return Err(format!("{e}")),
@ -376,6 +387,7 @@ mod tests {
use super::*;
use crate::data_model::rc::ResourceCertificate;
use crate::fetch::rsync::LocalDirRsyncFetcher;
use crate::fetch::rsync::{RsyncFetchError, RsyncFetcher};
use crate::storage::{FetchCachePpPack, PackFile, PackTime};
use crate::sync::rrdp::Fetcher;
use crate::validation::tree::PublicationPointRunner;
@ -389,6 +401,13 @@ mod tests {
}
}
/// Test double: an `RsyncFetcher` whose every fetch fails.
///
/// Used to simulate repository sync failure so the runner must fall back to
/// the persisted fetch_cache_pp pack (RFC 9286 §6.6 failed-fetch semantics).
struct FailingRsyncFetcher;

impl RsyncFetcher for FailingRsyncFetcher {
    fn fetch_objects(&self, _rsync_base_uri: &str) -> Result<Vec<(String, Vec<u8>)>, RsyncFetchError> {
        // Always fail regardless of URI; callers only inspect the resulting warning text.
        Err(RsyncFetchError::Fetch("rsync disabled in test".to_string()))
    }
}
fn openssl_available() -> bool {
Command::new("openssl")
.arg("version")
@ -590,6 +609,34 @@ authorityKeyIdentifier = keyid:always
}
}
#[test]
fn never_http_fetcher_returns_error() {
    // The test-only HTTP fetcher must refuse every request, and its error
    // text must make the refusal recognizable.
    let result = NeverHttpFetcher.fetch("https://example.test/");
    let err = result.unwrap_err();
    assert!(err.contains("disabled"), "{err}");
}
#[test]
fn kind_from_rsync_uri_classifies_known_extensions() {
    // Table-driven check: each known file extension maps to its audit object
    // kind; an unrecognized extension falls through to `Other`.
    let cases = [
        ("rsync://example.test/x.crl", AuditObjectKind::Crl),
        ("rsync://example.test/x.cer", AuditObjectKind::Certificate),
        ("rsync://example.test/x.roa", AuditObjectKind::Roa),
        ("rsync://example.test/x.asa", AuditObjectKind::Aspa),
        ("rsync://example.test/x.bin", AuditObjectKind::Other),
    ];
    for (uri, expected) in cases {
        assert_eq!(kind_from_rsync_uri(uri), expected);
    }
}
#[test]
fn select_issuer_crl_from_pack_reports_missing_crldp_for_self_signed_cert() {
    // A self-signed TA certificate carries no CRLDistributionPoints, so CRL
    // selection over an empty pack must report the missing extension.
    let fixture = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("tests/fixtures/ta/apnic-ta.cer");
    let ta_der = std::fs::read(fixture).expect("read TA fixture");
    let empty_pack = dummy_pack_with_files(vec![]);
    let err = select_issuer_crl_from_pack(&ta_der, &empty_pack).unwrap_err();
    assert!(err.contains("CRLDistributionPoints missing"), "{err}");
}
#[test]
fn select_issuer_crl_from_pack_finds_matching_crl() {
// Use real fixtures to ensure child cert has CRLDP rsync URI and CRL exists.
@ -801,6 +848,127 @@ authorityKeyIdentifier = keyid:always
);
}
#[test]
fn runner_when_repo_sync_fails_uses_fetch_cache_pp_and_skips_child_discovery() {
    // End-to-end runner test: a successful first run populates fetch_cache_pp,
    // then a second run with a failing rsync fetcher must fall back to the
    // cached pack (RFC 9286 §6.6) instead of rebuilding from raw objects.
    let fixture_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("tests/fixtures/repository/rpki.cernet.net/repo/cernet/0");
    assert!(fixture_dir.is_dir(), "fixture directory must exist");
    let rsync_base_uri = "rsync://rpki.cernet.net/repo/cernet/0/".to_string();
    let manifest_file = "05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft";
    let manifest_rsync_uri = format!("{rsync_base_uri}{manifest_file}");
    let fixture_manifest_bytes =
        std::fs::read(fixture_dir.join(manifest_file)).expect("read manifest fixture");
    let fixture_manifest =
        crate::data_model::manifest::ManifestObject::decode_der(&fixture_manifest_bytes)
            .expect("decode manifest fixture");
    // Pin validation_time just after the fixture manifest's thisUpdate so the
    // manifest is valid regardless of wall-clock time.
    let validation_time = fixture_manifest.manifest.this_update + time::Duration::seconds(60);
    let store_dir = tempfile::tempdir().expect("store dir");
    let store = RocksStore::open(store_dir.path()).expect("open rocksdb");
    // RsyncOnly forces the runner through the rsync fetcher we control below.
    let policy = Policy {
        sync_preference: crate::policy::SyncPreference::RsyncOnly,
        ..Policy::default()
    };
    let issuer_ca_der = std::fs::read(
        std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(
            "tests/fixtures/repository/rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer",
        ),
    )
    .expect("read issuer ca fixture");
    let issuer_ca = ResourceCertificate::decode_der(&issuer_ca_der).expect("decode issuer ca");
    let handle = CaInstanceHandle {
        depth: 0,
        ca_certificate_der: issuer_ca_der,
        ca_certificate_rsync_uri: Some("rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer".to_string()),
        effective_ip_resources: issuer_ca.tbs.extensions.ip_resources.clone(),
        effective_as_resources: issuer_ca.tbs.extensions.as_resources.clone(),
        rsync_base_uri: rsync_base_uri.clone(),
        manifest_rsync_uri: manifest_rsync_uri.clone(),
        publication_point_rsync_uri: rsync_base_uri.clone(),
        rrdp_notification_uri: None,
    };
    // First: successful repo sync to populate fetch_cache_pp.
    let ok_runner = Rpkiv1PublicationPointRunner {
        store: &store,
        policy: &policy,
        http_fetcher: &NeverHttpFetcher,
        rsync_fetcher: &LocalDirRsyncFetcher::new(&fixture_dir),
        validation_time,
    };
    let first = ok_runner
        .run_publication_point(&handle)
        .expect("first run ok");
    assert_eq!(first.source, PublicationPointSource::Fresh);
    assert!(first.discovered_children.is_empty(), "fixture has no child .cer");
    // Second: repo sync fails, but we can still use fetch_cache_pp.
    let bad_runner = Rpkiv1PublicationPointRunner {
        store: &store,
        policy: &policy,
        http_fetcher: &NeverHttpFetcher,
        rsync_fetcher: &FailingRsyncFetcher,
        validation_time,
    };
    let second = bad_runner
        .run_publication_point(&handle)
        .expect("should fall back to fetch_cache_pp");
    // Failed fetch must surface the cached source, skip child discovery, and
    // emit the repo-sync warning so operators can see what happened.
    assert_eq!(second.source, PublicationPointSource::FetchCachePp);
    assert!(second.discovered_children.is_empty());
    assert!(
        second
            .warnings
            .iter()
            .any(|w| w.message.contains("repo sync failed")),
        "expected warning about repo sync failure"
    );
}
#[test]
fn build_publication_point_audit_emits_no_audit_entry_for_duplicate_pack_uri() {
    // Two pack files share one rsync URI (different bytes); the audit builder
    // must not drop the duplicate silently — it emits a placeholder entry.
    let pack = dummy_pack_with_files(vec![
        PackFile::from_bytes_compute_sha256("rsync://example.test/repo/dup.roa", vec![1u8]),
        PackFile::from_bytes_compute_sha256("rsync://example.test/repo/dup.roa", vec![2u8]),
    ]);
    let pp = crate::validation::manifest::PublicationPointResult {
        source: crate::validation::manifest::PublicationPointSource::FetchCachePp,
        pack: pack.clone(),
        warnings: Vec::new(),
    };
    // Minimal CA handle whose URIs mirror the pack's own URIs.
    let ca = CaInstanceHandle {
        depth: 0,
        ca_certificate_der: vec![1],
        ca_certificate_rsync_uri: None,
        effective_ip_resources: None,
        effective_as_resources: None,
        rsync_base_uri: pack.publication_point_rsync_uri.clone(),
        manifest_rsync_uri: pack.manifest_rsync_uri.clone(),
        publication_point_rsync_uri: pack.publication_point_rsync_uri.clone(),
        rrdp_notification_uri: None,
    };
    // Empty objects output: no VRPs/ASPAs/warnings/per-object audit entries.
    let objects = crate::validation::objects::ObjectsOutput {
        vrps: Vec::new(),
        aspas: Vec::new(),
        warnings: Vec::new(),
        stats: crate::validation::objects::ObjectsStats::default(),
        audit: Vec::new(),
    };
    let audit = build_publication_point_audit(&ca, &pp, &[], &objects, &[]);
    assert_eq!(audit.source, "fetch_cache_pp");
    assert!(
        audit
            .objects
            .iter()
            .any(|e| e.detail.as_deref() == Some("skipped: no audit entry")),
        "expected a duplicate key to produce a 'no audit entry' placeholder"
    );
}
#[test]
fn build_publication_point_audit_marks_invalid_crl_as_error_and_overlays_roa_audit() {
let now = time::OffsetDateTime::now_utc();

View File

@ -0,0 +1,85 @@
use crate::data_model::common::X509NameDer;
use x509_parser::prelude::FromDer;
/// Parse a DER-encoded name and render x509-parser's canonical string form.
///
/// Returns `None` when the bytes do not parse as an X.509 Name or when
/// trailing bytes remain after the name, so callers can fall back to raw DER
/// comparison.
fn canonicalize(name: &X509NameDer) -> Option<String> {
    match x509_parser::x509::X509Name::from_der(name.as_raw()) {
        Ok((rest, parsed)) if rest.is_empty() => Some(parsed.to_string()),
        _ => None,
    }
}
/// Compare two X.509 distinguished names using a tolerant semantic comparison.
///
/// RPKI repositories in the wild sometimes encode the same name using different
/// ASN.1 string types (e.g., PrintableString vs UTF8String) while remaining
/// semantically equivalent. RFC 5280 path validation requires name matching, but
/// DER byte equality is too strict for interoperability.
pub fn x509_names_equivalent(a: &X509NameDer, b: &X509NameDer) -> bool {
    // Semantic comparison only when BOTH names canonicalize; if either side
    // fails to parse, fall back to strict DER byte equality.
    match (canonicalize(a), canonicalize(b)) {
        (Some(canon_a), Some(canon_b)) => canon_a == canon_b,
        _ => a == b,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    const APNIC_CER: &str = "tests/fixtures/repository/rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer";

    /// Parse the APNIC fixture certificate and return (subject DER, issuer DER).
    fn fixture_subject_and_issuer() -> (Vec<u8>, Vec<u8>) {
        let cert_der = std::fs::read(APNIC_CER).expect("read certificate fixture");
        let (_rem, cert) =
            x509_parser::parse_x509_certificate(&cert_der).expect("parse certificate fixture");
        (
            cert.tbs_certificate.subject.as_raw().to_vec(),
            cert.tbs_certificate.issuer.as_raw().to_vec(),
        )
    }

    #[test]
    fn x509_names_equivalent_falls_back_to_der_equality_when_parse_fails() {
        // Tag 0x01 is not a SEQUENCE, so x509-parser rejects it and the
        // comparison falls back to raw DER equality.
        let bogus = vec![0x01, 0x00];
        assert!(x509_names_equivalent(
            &X509NameDer(bogus.clone()),
            &X509NameDer(bogus)
        ));
    }

    #[test]
    fn x509_names_equivalent_compares_semantic_names_when_parse_succeeds() {
        let (subject_der, issuer_der) = fixture_subject_and_issuer();
        let subject = X509NameDer(subject_der);
        let issuer = X509NameDer(issuer_der);
        assert!(x509_names_equivalent(&subject, &subject));
        assert!(!x509_names_equivalent(&subject, &issuer));
    }

    #[test]
    fn x509_names_equivalent_falls_back_when_name_has_trailing_bytes() {
        // Append a trailing byte so parsing yields leftover input and
        // canonicalization is abandoned in favor of DER equality.
        let (mut name, _issuer) = fixture_subject_and_issuer();
        name.push(0x00);
        let a = X509NameDer(name.clone());
        let b = X509NameDer(name);
        assert!(x509_names_equivalent(&a, &b));
    }

    #[test]
    fn x509_names_equivalent_falls_back_when_one_side_fails_to_parse() {
        let (subject_der, _issuer) = fixture_subject_and_issuer();
        let good = X509NameDer(subject_der);
        let bad = X509NameDer(vec![0x01, 0x00]);
        assert!(!x509_names_equivalent(&good, &bad));
    }
}

View File

@ -0,0 +1,310 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use rpki::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use rpki::fetch::rsync::{RsyncFetchError, RsyncFetcher};
use rpki::policy::{CaFailedFetchPolicy, Policy, SyncPreference};
use rpki::storage::{FetchCachePpKey, RocksStore};
use rpki::sync::rrdp::{Fetcher, parse_notification, sync_from_notification};
use rpki::sync::repo::{RepoSyncSource, sync_publication_point};
use rpki::validation::from_tal::discover_root_ca_instance_from_tal_url;
use rpki::validation::manifest::{PublicationPointSource, process_manifest_publication_point};
const APNIC_TAL_URL: &str = "https://tal.apnic.net/tal-archive/apnic-rfc7730-https.tal";
/// Directory for the persistent RocksDB used across live-test runs.
///
/// Honors the `RPKI_LIVE_DB_DIR` environment variable; otherwise defaults to
/// `target/live/apnic_rrdp_db` relative to the crate root.
fn persistent_db_dir() -> PathBuf {
    std::env::var("RPKI_LIVE_DB_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|_| PathBuf::from("target/live/apnic_rrdp_db"))
}
/// Build the blocking HTTP fetcher used by the live tests.
///
/// The request timeout defaults to 15 minutes and can be overridden via the
/// `RPKI_LIVE_HTTP_TIMEOUT_SECS` environment variable.
fn live_http_fetcher() -> BlockingHttpFetcher {
    const DEFAULT_TIMEOUT_SECS: u64 = 15 * 60;
    let timeout_secs = std::env::var("RPKI_LIVE_HTTP_TIMEOUT_SECS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok())
        .unwrap_or(DEFAULT_TIMEOUT_SECS);
    let config = HttpFetcherConfig {
        timeout: Duration::from_secs(timeout_secs),
        user_agent: "rpki-dev/0.1 (stage2 live rrdp delta test)".to_string(),
    };
    BlockingHttpFetcher::new(config).expect("http fetcher")
}
/// Test double: an `RsyncFetcher` that rejects every fetch.
///
/// The live tests use it to guarantee that only the RRDP path is exercised.
struct AlwaysFailRsyncFetcher;

impl RsyncFetcher for AlwaysFailRsyncFetcher {
    fn fetch_objects(&self, _rsync_base_uri: &str) -> Result<Vec<(String, Vec<u8>)>, RsyncFetchError> {
        // Unconditional failure; the URI is deliberately ignored.
        Err(RsyncFetchError::Fetch("rsync disabled for this test".to_string()))
    }
}
/// HTTP fetcher wrapper that counts requests per URI and fails one specific
/// "denied" URI.
///
/// The delta-only live test uses it to prove the snapshot URI is never fetched
/// while still delegating all other requests to the real fetcher.
#[derive(Clone)]
struct CountingDenyUriFetcher {
    inner: BlockingHttpFetcher,
    deny_uri: String,
    counts: std::rc::Rc<RefCell<HashMap<String, u64>>>,
}

impl CountingDenyUriFetcher {
    fn new(inner: BlockingHttpFetcher, deny_uri: String) -> Self {
        let counts = std::rc::Rc::new(RefCell::new(HashMap::new()));
        Self { inner, deny_uri, counts }
    }

    /// Number of times `uri` has been requested through this fetcher.
    fn count(&self, uri: &str) -> u64 {
        self.counts.borrow().get(uri).copied().unwrap_or(0)
    }
}
impl Fetcher for CountingDenyUriFetcher {
    fn fetch(&self, uri: &str) -> Result<Vec<u8>, String> {
        // Record the request first so even denied URIs are counted.
        {
            let mut counts = self.counts.borrow_mut();
            *counts.entry(uri.to_string()).or_default() += 1;
        }
        if uri == self.deny_uri {
            Err(format!("snapshot fetch denied: {uri}"))
        } else {
            self.inner.fetch(uri)
        }
    }
}
fn live_policy() -> Policy {
let mut p = Policy::default();
p.sync_preference = SyncPreference::RrdpThenRsync;
p.ca_failed_fetch_policy = CaFailedFetchPolicy::UseFetchCachePp;
p
}
#[test]
#[ignore = "live network: APNIC RRDP snapshot bootstrap into persistent RocksDB"]
fn apnic_live_bootstrap_snapshot_and_fetch_cache_pp_pack_to_persistent_db() {
    // Phase 1 of the live delta-sync scenario: bootstrap the persistent DB
    // from the current APNIC RRDP snapshot and persist a fetch_cache_pp pack.
    // The companion delta-only test depends on the state written here.
    let http = live_http_fetcher();
    // Rsync is disabled so the sync is forced through RRDP.
    let rsync = AlwaysFailRsyncFetcher;
    let db_dir = persistent_db_dir();
    std::fs::create_dir_all(&db_dir).expect("create db dir");
    let store = RocksStore::open(&db_dir).expect("open rocksdb");
    let policy = live_policy();
    let validation_time = time::OffsetDateTime::now_utc();
    let discovery = discover_root_ca_instance_from_tal_url(&http, APNIC_TAL_URL)
        .expect("discover root CA instance from APNIC TAL");
    let ca_instance = discovery.ca_instance;
    let rrdp_notification_uri = ca_instance
        .rrdp_notification_uri
        .as_deref()
        .expect("APNIC root must have rrdpNotification");
    let sync = sync_publication_point(
        &store,
        &policy,
        Some(rrdp_notification_uri),
        &ca_instance.rsync_base_uri,
        &http,
        &rsync,
    )
    .expect("repo sync");
    // With rsync failing, a successful sync must have come from RRDP.
    assert_eq!(sync.source, RepoSyncSource::Rrdp);
    // Build + persist a fetch_cache_pp pack for the root publication point so later runs can
    // validate behavior under failed fetch conditions (RFC 9286 §6.6).
    let ta_der = discovery.trust_anchor.ta_certificate.raw_der;
    let pp = process_manifest_publication_point(
        &store,
        &policy,
        &ca_instance.manifest_rsync_uri,
        &ca_instance.publication_point_rsync_uri,
        &ta_der,
        None,
        validation_time,
    )
    .expect("process manifest publication point");
    assert_eq!(pp.source, PublicationPointSource::Fresh);
    let key = FetchCachePpKey::from_manifest_rsync_uri(&ca_instance.manifest_rsync_uri);
    let cached = store
        .get_fetch_cache_pp(&key)
        .expect("get fetch_cache_pp");
    assert!(cached.is_some(), "expected fetch_cache_pp to be stored");
    eprintln!("OK: bootstrap complete; persistent db at: {}", db_dir.display());
    eprintln!("Next: run `cargo test --release -q --test test_apnic_rrdp_delta_live_20260226 -- --ignored` later to exercise delta sync.");
}
#[test]
#[ignore = "live network: waits for APNIC RRDP serial advance, then sync via deltas only (no snapshot) using persistent RocksDB"]
fn apnic_live_delta_only_from_persistent_db() {
    // Phase 2 of the live scenario: starting from the bootstrapped RocksDB,
    // poll the APNIC notification until its serial advances past our stored
    // baseline, then sync using deltas only — the snapshot URI is actively
    // denied so any snapshot fetch fails the test.
    let http = live_http_fetcher();
    let db_dir = persistent_db_dir();
    let store = RocksStore::open(&db_dir).expect("open rocksdb (must have been bootstrapped)");
    let policy = live_policy();
    let discovery = discover_root_ca_instance_from_tal_url(&http, APNIC_TAL_URL)
        .expect("discover root CA instance from APNIC TAL");
    let ca_instance = discovery.ca_instance;
    let rrdp_notification_uri = ca_instance
        .rrdp_notification_uri
        .as_deref()
        .expect("APNIC root must have rrdpNotification");
    // Load the stored RRDP state; absence means bootstrap was never run.
    let state_bytes = store
        .get_rrdp_state(rrdp_notification_uri)
        .expect("get rrdp_state")
        .unwrap_or_else(|| {
            panic!(
                "missing rrdp_state for APNIC notification URI; run bootstrap test first. db_dir={}",
                db_dir.display()
            )
        });
    let state = rpki::sync::rrdp::RrdpState::decode(&state_bytes).expect("decode rrdp_state");
    let old_serial = state.serial;
    let old_session = state.session_id;
    // Polling knobs (overridable via env): total wait budget and poll interval.
    let max_wait_secs: u64 = std::env::var("RPKI_LIVE_MAX_WAIT_SECS")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(30 * 60);
    let poll_secs: u64 = std::env::var("RPKI_LIVE_POLL_SECS")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(60);
    let start = Instant::now();
    loop {
        if start.elapsed() > Duration::from_secs(max_wait_secs) {
            panic!(
                "timed out waiting for APNIC RRDP serial to advance for delta sync; old_session={} old_serial={} waited={}s",
                old_session,
                old_serial,
                max_wait_secs
            );
        }
        let notif_xml = http
            .fetch(rrdp_notification_uri)
            .unwrap_or_else(|e| panic!("fetch notification failed: {e}"));
        let notif = parse_notification(&notif_xml).expect("parse notification");
        // A session change invalidates the stored snapshot baseline; deltas
        // from a different session cannot be applied (RFC 8182).
        if notif.session_id.to_string() != old_session {
            panic!(
                "RRDP session_id changed; this delta-only test assumes same snapshot baseline. old_session={} new_session={}",
                old_session,
                notif.session_id
            );
        }
        // No new serial yet — keep polling.
        if notif.serial <= old_serial {
            eprintln!(
                "waiting for serial advance: session={} old_serial={} current_serial={}",
                old_session, old_serial, notif.serial
            );
            std::thread::sleep(Duration::from_secs(poll_secs));
            continue;
        }
        // Delta-only sync requires a contiguous delta chain starting right
        // after our stored serial; otherwise only a snapshot could catch up.
        let want_first = old_serial + 1;
        let min_delta = notif.deltas.first().map(|d| d.serial).unwrap_or(u64::MAX);
        if notif.deltas.is_empty() || min_delta > want_first {
            panic!(
                "notification deltas do not cover required serial gap for delta-only sync; old_serial={} want_first={} min_delta={} current_serial={}. rerun bootstrap to refresh snapshot baseline.",
                old_serial,
                want_first,
                min_delta,
                notif.serial
            );
        }
        // Deny snapshot fetch to ensure we truly test the delta path and keep the stored snapshot
        // baseline unchanged.
        let deny = notif.snapshot_uri.clone();
        let fetcher = CountingDenyUriFetcher::new(http.clone(), deny.clone());
        match sync_from_notification(&store, rrdp_notification_uri, &notif_xml, &fetcher) {
            Ok(written) => {
                assert!(
                    written > 0,
                    "expected delta sync to apply changes (written={written})"
                );
                // Zero requests to the snapshot URI proves the delta-only path.
                assert_eq!(
                    fetcher.count(&deny),
                    0,
                    "delta sync should not fetch snapshot"
                );
                eprintln!(
                    "OK: delta sync applied: written={} old_serial={} new_serial={}",
                    written, old_serial, notif.serial
                );
                break;
            }
            Err(e) => {
                // Transient live-network failures are retried until the wait
                // budget is exhausted.
                eprintln!("delta sync attempt failed (will retry): {e}");
                std::thread::sleep(Duration::from_secs(poll_secs));
            }
        }
    }
    // Keep policy variable used, to avoid warnings if this test evolves.
    let _ = policy;
}
#[test]
#[ignore = "offline/synthetic: after bootstrap, force repo sync failure and assert fetch_cache_pp is used (RFC 9286 §6.6)"]
fn apnic_root_repo_sync_failure_uses_fetch_cache_pp_pack() {
    let http = live_http_fetcher();
    let db_dir = persistent_db_dir();
    let store = RocksStore::open(&db_dir).expect("open rocksdb (must have been bootstrapped)");
    // NOTE(review): live_policy() already sets both fields to these exact
    // values; the two assignments below are redundant but harmless. They are
    // kept explicit so this test stays correct if live_policy() changes.
    let mut policy = live_policy();
    policy.sync_preference = SyncPreference::RrdpThenRsync;
    policy.ca_failed_fetch_policy = CaFailedFetchPolicy::UseFetchCachePp;
    let validation_time = time::OffsetDateTime::now_utc();
    let discovery = discover_root_ca_instance_from_tal_url(&http, APNIC_TAL_URL)
        .expect("discover root CA instance from APNIC TAL");
    let ca_instance = discovery.ca_instance;
    // Ensure cache exists (created by bootstrap).
    let key = FetchCachePpKey::from_manifest_rsync_uri(&ca_instance.manifest_rsync_uri);
    let cached = store
        .get_fetch_cache_pp(&key)
        .expect("get fetch_cache_pp");
    assert!(
        cached.is_some(),
        "missing fetch_cache_pp; run bootstrap test first. db_dir={}",
        db_dir.display()
    );
    // Simulate repo sync failure: skip calling sync_publication_point and directly drive manifest
    // processing with repo_sync_ok=false.
    let ta_der = discovery.trust_anchor.ta_certificate.raw_der;
    let pp = rpki::validation::manifest::process_manifest_publication_point_after_repo_sync(
        &store,
        &policy,
        &ca_instance.manifest_rsync_uri,
        &ca_instance.publication_point_rsync_uri,
        &ta_der,
        None,
        validation_time,
        false,
        Some("synthetic repo sync failure"),
    )
    .expect("must fall back to fetch_cache_pp");
    // Failed fetch with UseFetchCachePp must surface the cached source and a
    // warning explaining the fallback.
    assert_eq!(pp.source, PublicationPointSource::FetchCachePp);
    assert!(
        pp.warnings.iter().any(|w| w.message.contains("using fetch_cache_pp")),
        "expected cache-use warning"
    );
}

View File

@ -0,0 +1,125 @@
use rpki::data_model::oid::{OID_AD_CA_REPOSITORY, OID_AD_RPKI_MANIFEST, OID_AD_RPKI_NOTIFY};
use rpki::data_model::rc::{
AccessDescription, ResourceCertKind, ResourceCertificate, SubjectInfoAccess, SubjectInfoAccessCa,
};
use rpki::validation::ca_instance::{CaInstanceUrisError, ca_instance_uris_from_ca_certificate};
/// Load the DER bytes of the APNIC child-CA certificate fixture.
fn apnic_child_ca_fixture_der() -> Vec<u8> {
    let path = "tests/fixtures/repository/rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer";
    std::fs::read(path).expect("read apnic fixture ca")
}
/// Test helper: replace the certificate's SIA extension with a CA-variant SIA
/// containing exactly the given access descriptions.
fn set_sia_ca(cert: &mut ResourceCertificate, ads: Vec<AccessDescription>) {
    cert.tbs.extensions.subject_info_access =
        Some(SubjectInfoAccess::Ca(SubjectInfoAccessCa { access_descriptions: ads }));
}
#[test]
fn ca_instance_uris_success_and_error_branches() {
    // Branch-coverage test: start from a real, valid CA certificate, then
    // mutate one property at a time and assert each error variant of
    // ca_instance_uris_from_ca_certificate. Order matters — each section
    // restores or overwrites the state it needs.
    let mut cert = ResourceCertificate::decode_der(&apnic_child_ca_fixture_der())
        .expect("decode apnic fixture ca");
    // Success path.
    let uris = ca_instance_uris_from_ca_certificate(&cert).expect("uris");
    assert!(uris.rsync_base_uri.starts_with("rsync://"));
    assert!(uris.rsync_base_uri.ends_with('/'));
    assert!(uris.manifest_rsync_uri.starts_with("rsync://"));
    assert!(uris.manifest_rsync_uri.ends_with(".mft"));
    // NotCa.
    cert.kind = ResourceCertKind::Ee;
    let err = ca_instance_uris_from_ca_certificate(&cert).unwrap_err();
    assert!(matches!(err, CaInstanceUrisError::NotCa));
    cert.kind = ResourceCertKind::Ca;
    // MissingSia.
    cert.tbs.extensions.subject_info_access = None;
    let err = ca_instance_uris_from_ca_certificate(&cert).unwrap_err();
    assert!(matches!(err, CaInstanceUrisError::MissingSia));
    // MissingCaRepository / MissingRpkiManifest.
    set_sia_ca(
        &mut cert,
        vec![AccessDescription {
            access_method_oid: OID_AD_RPKI_MANIFEST.to_string(),
            access_location: "rsync://example.test/repo/x.mft".to_string(),
        }],
    );
    let err = ca_instance_uris_from_ca_certificate(&cert).unwrap_err();
    assert!(matches!(err, CaInstanceUrisError::MissingCaRepository), "{err}");
    set_sia_ca(
        &mut cert,
        vec![AccessDescription {
            access_method_oid: OID_AD_CA_REPOSITORY.to_string(),
            access_location: "rsync://example.test/repo/".to_string(),
        }],
    );
    let err = ca_instance_uris_from_ca_certificate(&cert).unwrap_err();
    assert!(matches!(err, CaInstanceUrisError::MissingRpkiManifest), "{err}");
    // Scheme validation branches.
    // caRepository must be rsync://.
    set_sia_ca(
        &mut cert,
        vec![AccessDescription {
            access_method_oid: OID_AD_CA_REPOSITORY.to_string(),
            access_location: "http://example.test/repo/".to_string(),
        }],
    );
    let err = ca_instance_uris_from_ca_certificate(&cert).unwrap_err();
    assert!(matches!(err, CaInstanceUrisError::CaRepositoryNotRsync(_)), "{err}");
    // rpkiManifest must be rsync://.
    set_sia_ca(
        &mut cert,
        vec![AccessDescription {
            access_method_oid: OID_AD_CA_REPOSITORY.to_string(),
            access_location: "rsync://example.test/repo/".to_string(),
        },
        AccessDescription {
            access_method_oid: OID_AD_RPKI_MANIFEST.to_string(),
            access_location: "http://example.test/repo/x.mft".to_string(),
        }],
    );
    let err = ca_instance_uris_from_ca_certificate(&cert).unwrap_err();
    assert!(matches!(err, CaInstanceUrisError::RpkiManifestNotRsync(_)), "{err}");
    // rpkiNotify must be https://.
    set_sia_ca(
        &mut cert,
        vec![
            AccessDescription {
                access_method_oid: OID_AD_CA_REPOSITORY.to_string(),
                access_location: "rsync://example.test/repo/".to_string(),
            },
            AccessDescription {
                access_method_oid: OID_AD_RPKI_MANIFEST.to_string(),
                access_location: "rsync://example.test/repo/x.mft".to_string(),
            },
            AccessDescription {
                access_method_oid: OID_AD_RPKI_NOTIFY.to_string(),
                access_location: "rsync://example.test/repo/notification.xml".to_string(),
            },
        ],
    );
    let err = ca_instance_uris_from_ca_certificate(&cert).unwrap_err();
    assert!(matches!(err, CaInstanceUrisError::RpkiNotifyNotHttps(_)), "{err}");
    // ManifestNotUnderPublicationPoint.
    set_sia_ca(
        &mut cert,
        vec![
            AccessDescription {
                access_method_oid: OID_AD_CA_REPOSITORY.to_string(),
                access_location: "rsync://example.test/repo/".to_string(),
            },
            AccessDescription {
                access_method_oid: OID_AD_RPKI_MANIFEST.to_string(),
                access_location: "rsync://other.test/repo/x.mft".to_string(),
            },
        ],
    );
    let err = ca_instance_uris_from_ca_certificate(&cert).unwrap_err();
    assert!(matches!(err, CaInstanceUrisError::ManifestNotUnderPublicationPoint { .. }), "{err}");
}

View File

@ -3,7 +3,10 @@ use std::path::Path;
use rpki::data_model::manifest::ManifestObject;
use rpki::policy::{CaFailedFetchPolicy, Policy};
use rpki::storage::{FetchCachePpKey, FetchCachePpPack, RocksStore};
use rpki::validation::manifest::process_manifest_publication_point;
use rpki::validation::manifest::{
PublicationPointSource, process_manifest_publication_point,
process_manifest_publication_point_after_repo_sync,
};
fn issuer_ca_fixture() -> Vec<u8> {
std::fs::read(
@ -213,3 +216,62 @@ fn cached_pack_revalidation_rejects_hash_mismatch_against_manifest_filelist() {
assert!(msg.contains("cached fetch_cache_pp file hash mismatch"), "{msg}");
assert!(msg.contains("RFC 9286 §6.5"), "{msg}");
}
#[test]
fn repo_sync_failure_forces_fetch_cache_pp_even_if_raw_objects_are_present() {
    // RFC 9286 §6.6: a failed fetch must route validation through the cached
    // publication-point pack — even when raw_objects could still build a
    // fresh pack. This test builds the cache first, then forces the failure.
    let (manifest_path, manifest_bytes, manifest) = load_cernet_manifest_fixture();
    let validation_time = manifest.manifest.this_update + time::Duration::seconds(1);
    let manifest_rsync_uri = fixture_to_rsync_uri(&manifest_path);
    let publication_point_rsync_uri = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    // Seed raw_objects with everything a fresh pack would need.
    store_raw_publication_point_files(
        &store,
        &manifest_path,
        &manifest_rsync_uri,
        &manifest_bytes,
        &manifest,
        &publication_point_rsync_uri,
    );
    let mut policy = Policy::default();
    policy.ca_failed_fetch_policy = CaFailedFetchPolicy::UseFetchCachePp;
    let issuer_ca_der = issuer_ca_fixture();
    // First run: fresh processing stores fetch_cache_pp.
    let _fresh = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .expect("fresh run stores fetch_cache_pp");
    // Second run: simulate repo sync failure. Even though raw_objects still contain everything
    // needed for a fresh pack, failed fetch semantics require using cached objects only.
    let res = process_manifest_publication_point_after_repo_sync(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
        false,
        Some("synthetic repo sync failure"),
    )
    .expect("must fall back to fetch_cache_pp");
    assert_eq!(res.source, PublicationPointSource::FetchCachePp);
    // The fallback must be announced via a warning, not silently applied.
    assert!(
        res.warnings
            .iter()
            .any(|w| w.message.contains("using fetch_cache_pp")),
        "expected fetch_cache_pp warning"
    );
}

View File

@ -0,0 +1,70 @@
use std::collections::HashMap;
use rpki::data_model::tal::Tal;
use rpki::sync::rrdp::Fetcher;
use rpki::validation::from_tal::{FromTalError, discover_root_ca_instance_from_tal_url};
/// In-memory `Fetcher` test double that maps each URI to a canned
/// `Ok`/`Err` response; unknown URIs produce a descriptive error.
#[derive(Default)]
struct MapFetcher {
    map: HashMap<String, Result<Vec<u8>, String>>,
}

impl Fetcher for MapFetcher {
    fn fetch(&self, uri: &str) -> Result<Vec<u8>, String> {
        match self.map.get(uri) {
            Some(response) => response.clone(),
            None => Err(format!("no fixture for uri: {uri}")),
        }
    }
}
#[test]
fn discover_root_ca_instance_from_tal_url_succeeds_with_apnic_fixtures() {
    // Serve the TAL and every TA candidate URI from canned fixtures so
    // discovery succeeds fully offline.
    let tal_url = "https://example.test/apnic.tal";
    let tal_bytes =
        std::fs::read("tests/fixtures/tal/apnic-rfc7730-https.tal").expect("read tal fixture");
    let tal = Tal::decode_bytes(&tal_bytes).expect("decode tal fixture");
    let ta_der = std::fs::read("tests/fixtures/ta/apnic-ta.cer").expect("read ta fixture");
    let mut fetcher = MapFetcher::default();
    fetcher.map.insert(tal_url.to_string(), Ok(tal_bytes.clone()));
    for ta_uri in &tal.ta_uris {
        fetcher.map.insert(ta_uri.as_str().to_string(), Ok(ta_der.clone()));
    }
    let out = discover_root_ca_instance_from_tal_url(&fetcher, tal_url).expect("discover root");
    // Discovery must echo the TAL URL and resolve every publication URI.
    assert_eq!(out.tal_url.as_deref(), Some(tal_url));
    assert!(!out.ca_instance.rsync_base_uri.is_empty());
    assert!(!out.ca_instance.manifest_rsync_uri.is_empty());
    assert!(!out.ca_instance.publication_point_rsync_uri.is_empty());
}
#[test]
fn discover_root_ca_instance_from_tal_url_returns_ta_fetch_error_when_all_candidates_fail() {
    // The TAL resolves, but every TA URI candidate fails; discovery must
    // surface FromTalError::TaFetch rather than any other variant.
    let tal_url = "https://example.test/apnic.tal";
    let tal_bytes =
        std::fs::read("tests/fixtures/tal/apnic-rfc7730-https.tal").expect("read tal fixture");
    let parsed_tal = Tal::decode_bytes(&tal_bytes).expect("decode tal fixture");

    let mut fetcher = MapFetcher::default();
    fetcher.map.insert(tal_url.to_string(), Ok(tal_bytes));
    for candidate in &parsed_tal.ta_uris {
        fetcher.map.insert(
            candidate.as_str().to_string(),
            Err("simulated TA fetch failure".to_string()),
        );
    }

    let err = discover_root_ca_instance_from_tal_url(&fetcher, tal_url).unwrap_err();
    match err {
        FromTalError::TaFetch(message) => assert!(message.contains("fetch"), "{message}"),
        other => panic!("unexpected error: {other}"),
    }
}

View File

@ -323,7 +323,7 @@ fn manifest_fallback_pack_is_revalidated_and_rejected_if_stale() {
}
#[test]
fn manifest_replay_is_treated_as_failed_fetch_and_uses_fetch_cache_pp() {
fn manifest_revalidation_with_unchanged_manifest_is_fresh() {
let manifest_path = Path::new(
"tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
);
@ -381,9 +381,100 @@ fn manifest_replay_is_treated_as_failed_fetch_and_uses_fetch_cache_pp() {
Some(issuer_ca_rsync_uri()),
t2,
)
.expect("second run should treat replay as failed fetch and use cache");
.expect("second run should accept revalidation of the same manifest");
assert_eq!(second.source, PublicationPointSource::Fresh);
assert!(second.warnings.is_empty());
assert_eq!(second.pack.manifest_bytes, first.pack.manifest_bytes);
assert_eq!(second.pack.manifest_number_be, first.pack.manifest_number_be);
assert_eq!(second.pack.files, first.pack.files);
}
#[test]
fn manifest_rollback_is_treated_as_failed_fetch_and_uses_fetch_cache_pp() {
let manifest_path = Path::new(
"tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
);
let manifest_bytes = std::fs::read(manifest_path).expect("read manifest fixture");
let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture");
let t1 = manifest.manifest.this_update + time::Duration::seconds(1);
let t2 = manifest.manifest.this_update + time::Duration::seconds(2);
let manifest_rsync_uri = fixture_to_rsync_uri(manifest_path);
let publication_point_rsync_uri = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
let entries = manifest
.manifest
.parse_files()
.expect("parse validated manifest fileList");
for entry in &entries {
let file_path = manifest_path
.parent()
.unwrap()
.join(entry.file_name.as_str());
let bytes = std::fs::read(&file_path)
.unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
}
let policy = Policy::default();
let issuer_ca_der = issuer_ca_fixture();
let first = process_manifest_publication_point(
&store,
&policy,
&manifest_rsync_uri,
&publication_point_rsync_uri,
&issuer_ca_der,
Some(issuer_ca_rsync_uri()),
t1,
)
.expect("first run builds and stores fetch_cache_pp pack");
assert_eq!(first.source, PublicationPointSource::Fresh);
// Simulate a previously validated manifest with a higher manifestNumber (rollback detection).
let key = FetchCachePpKey::from_manifest_rsync_uri(&manifest_rsync_uri);
let stored = store
.get_fetch_cache_pp(&key)
.expect("get fetch_cache_pp")
.expect("fetch_cache_pp pack exists");
let mut bumped = FetchCachePpPack::decode(&stored).expect("decode stored pack");
// Deterministically bump the cached manifestNumber to be strictly greater than the current one.
for i in (0..bumped.manifest_number_be.len()).rev() {
let (v, carry) = bumped.manifest_number_be[i].overflowing_add(1);
bumped.manifest_number_be[i] = v;
if !carry {
break;
}
if i == 0 {
bumped.manifest_number_be.insert(0, 1);
break;
}
}
let bumped_bytes = bumped.encode().expect("encode bumped pack");
store
.put_fetch_cache_pp(&key, &bumped_bytes)
.expect("store bumped pack");
let second = process_manifest_publication_point(
&store,
&policy,
&manifest_rsync_uri,
&publication_point_rsync_uri,
&issuer_ca_der,
Some(issuer_ca_rsync_uri()),
t2,
)
.expect("second run should treat rollback as failed fetch and use cache");
assert_eq!(second.source, PublicationPointSource::FetchCachePp);
assert_eq!(second.pack, first.pack);
assert_eq!(second.pack, bumped);
assert!(
second
.warnings

View File

@ -0,0 +1,159 @@
use std::path::Path;
use rpki::data_model::manifest::ManifestObject;
use rpki::policy::Policy;
use rpki::storage::RocksStore;
use rpki::validation::manifest::{ManifestProcessError, PublicationPointSource, process_manifest_publication_point};
/// Load the DER-encoded issuer CA certificate fixture used by these tests.
fn issuer_ca_fixture_der() -> Vec<u8> {
    let fixture_path = "tests/fixtures/repository/rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer";
    std::fs::read(fixture_path).expect("read issuer ca fixture")
}
/// rsync URI where the issuer CA fixture certificate is published.
fn issuer_ca_rsync_uri() -> &'static str {
    const ISSUER_CA_URI: &str = "rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer";
    ISSUER_CA_URI
}
/// Map a fixture path under `tests/fixtures/repository/<host>/…` to its
/// `rsync://<host>/…` URI.
///
/// Fix: the previous version stringified the remaining path with
/// `Path::to_string_lossy`, which uses the platform separator and would
/// produce backslash-separated URIs on Windows. We now join the components
/// with '/' explicitly, which is identical on Unix and correct everywhere.
///
/// Panics if `path` is not under the fixture root or has no host component
/// (acceptable in test helpers).
fn fixture_to_rsync_uri(path: &Path) -> String {
    let rel = path
        .strip_prefix("tests/fixtures/repository")
        .expect("path under tests/fixtures/repository");
    let mut components = rel.components().map(|c| c.as_os_str().to_string_lossy());
    // First component below the fixture root is the repository host.
    let host = components.next().expect("host component");
    let rest: Vec<_> = components.collect();
    format!("rsync://{host}/{}", rest.join("/"))
}
/// Like `fixture_to_rsync_uri`, but guarantees a trailing '/' so the result
/// can be used directly as a publication-point directory URI.
fn fixture_dir_to_rsync_uri(dir: &Path) -> String {
    let uri = fixture_to_rsync_uri(dir);
    if uri.ends_with('/') {
        uri
    } else {
        format!("{uri}/")
    }
}
#[test]
fn manifest_outside_publication_point_yields_no_usable_cache() {
    // A manifest URI outside the declared publication point must fail, and
    // with no cached pack for that (wrong) PP there is nothing to fall back to.
    let manifest_path = Path::new(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let manifest_bytes = std::fs::read(manifest_path).expect("read manifest fixture");
    let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture");
    let validation_time = manifest.manifest.this_update + time::Duration::seconds(1);
    let manifest_rsync_uri = fixture_to_rsync_uri(manifest_path);
    let publication_point_rsync_uri = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    // Store manifest and its locked files so Fresh would otherwise succeed.
    store
        .put_raw(&manifest_rsync_uri, &manifest_bytes)
        .expect("store manifest");
    let entries = manifest.manifest.parse_files().expect("parse fileList");
    for entry in &entries {
        let file_path = manifest_path
            .parent()
            .unwrap()
            .join(entry.file_name.as_str());
        let file_bytes = std::fs::read(&file_path).expect("read referenced file");
        let file_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
        store.put_raw(&file_uri, &file_bytes).expect("store file");
    }
    let policy = Policy::default();
    let issuer_ca_der = issuer_ca_fixture_der();
    // Pass a publication point URI that does not include the manifest URI.
    let wrong_pp = "rsync://example.test/not-the-pp/";
    let err = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        wrong_pp,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .unwrap_err();
    // With no cached pack available for this wrong publication point, we get NoUsableCache.
    assert!(
        matches!(err, ManifestProcessError::NoUsableCache { .. }),
        "{err}"
    );
}
#[test]
fn manifest_outside_publication_point_detects_cached_pack_pp_mismatch() {
    // Even when a cached pack exists, it must not be usable for a different
    // publication point: the pack's recorded PP URI has to match.
    let manifest_path = Path::new(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let manifest_bytes = std::fs::read(manifest_path).expect("read manifest fixture");
    let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture");
    let validation_time = manifest.manifest.this_update + time::Duration::seconds(1);
    let manifest_rsync_uri = fixture_to_rsync_uri(manifest_path);
    let publication_point_rsync_uri = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    store
        .put_raw(&manifest_rsync_uri, &manifest_bytes)
        .expect("store manifest");
    let entries = manifest.manifest.parse_files().expect("parse fileList");
    for entry in &entries {
        let file_path = manifest_path
            .parent()
            .unwrap()
            .join(entry.file_name.as_str());
        let file_bytes = std::fs::read(&file_path).expect("read referenced file");
        let file_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
        store.put_raw(&file_uri, &file_bytes).expect("store file");
    }
    let policy = Policy::default();
    let issuer_ca_der = issuer_ca_fixture_der();
    // First run creates and stores fetch_cache_pp pack (Fresh).
    let first = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .expect("first run ok");
    assert_eq!(first.source, PublicationPointSource::Fresh);
    // Second run with wrong publication point: fresh fails outside PP; cache load also fails
    // because the cached pack's publication_point_rsync_uri doesn't match the expected one.
    let wrong_pp = "rsync://example.test/not-the-pp/";
    let err = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        wrong_pp,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .unwrap_err();
    assert!(
        matches!(err, ManifestProcessError::NoUsableCache { .. }),
        "{err}"
    );
}

View File

@ -0,0 +1,603 @@
use std::path::Path;
use rpki::data_model::manifest::ManifestObject;
use rpki::policy::{CaFailedFetchPolicy, Policy};
use rpki::storage::{FetchCachePpKey, FetchCachePpPack, RocksStore};
use rpki::validation::manifest::{ManifestProcessError, PublicationPointSource, process_manifest_publication_point, process_manifest_publication_point_after_repo_sync};
/// Load the DER-encoded issuer CA certificate fixture used by these tests.
fn issuer_ca_fixture_der() -> Vec<u8> {
    let fixture_path = "tests/fixtures/repository/rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer";
    std::fs::read(fixture_path).expect("read issuer ca fixture")
}
/// rsync URI where the issuer CA fixture certificate is published.
fn issuer_ca_rsync_uri() -> &'static str {
    const ISSUER_CA_URI: &str = "rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer";
    ISSUER_CA_URI
}
/// Map a fixture path under `tests/fixtures/repository/<host>/…` to its
/// `rsync://<host>/…` URI.
///
/// Fix: the previous version stringified the remaining path with
/// `Path::to_string_lossy`, which uses the platform separator and would
/// produce backslash-separated URIs on Windows. We now join the components
/// with '/' explicitly, which is identical on Unix and correct everywhere.
///
/// Panics if `path` is not under the fixture root or has no host component
/// (acceptable in test helpers).
fn fixture_to_rsync_uri(path: &Path) -> String {
    let rel = path
        .strip_prefix("tests/fixtures/repository")
        .expect("path under tests/fixtures/repository");
    let mut components = rel.components().map(|c| c.as_os_str().to_string_lossy());
    // First component below the fixture root is the repository host.
    let host = components.next().expect("host component");
    let rest: Vec<_> = components.collect();
    format!("rsync://{host}/{}", rest.join("/"))
}
/// Like `fixture_to_rsync_uri`, but guarantees a trailing '/' so the result
/// can be used directly as a publication-point directory URI.
fn fixture_dir_to_rsync_uri(dir: &Path) -> String {
    let uri = fixture_to_rsync_uri(dir);
    if uri.ends_with('/') {
        uri
    } else {
        format!("{uri}/")
    }
}
/// Seed `store` with the manifest bytes plus every file the manifest's
/// fileList locks, reading each file from the fixture directory next to
/// `manifest_path` and keying it under the publication-point rsync URI.
fn store_manifest_and_locked_files(
    store: &RocksStore,
    manifest_path: &Path,
    manifest_rsync_uri: &str,
    publication_point_rsync_uri: &str,
    manifest: &ManifestObject,
    manifest_bytes: &[u8],
) {
    store
        .put_raw(manifest_rsync_uri, manifest_bytes)
        .expect("store manifest");
    let fixture_dir = manifest_path.parent().unwrap();
    let entries = manifest
        .manifest
        .parse_files()
        .expect("parse validated manifest fileList");
    for entry in &entries {
        let file_path = fixture_dir.join(entry.file_name.as_str());
        let bytes = std::fs::read(&file_path)
            .unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
        let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
        store.put_raw(&rsync_uri, &bytes).expect("store file");
    }
}
#[test]
fn repo_sync_failed_can_fall_back_to_fetch_cache_pp_when_present() {
    // A failed repo sync must not discard a previously validated publication
    // point: the cached pack is served instead, with warnings attached.
    let manifest_path = Path::new(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let manifest_bytes = std::fs::read(manifest_path).expect("read manifest fixture");
    let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture");
    let validation_time = manifest.manifest.this_update + time::Duration::seconds(1);
    let manifest_rsync_uri = fixture_to_rsync_uri(manifest_path);
    let publication_point_rsync_uri = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    store_manifest_and_locked_files(
        &store,
        manifest_path,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &manifest,
        &manifest_bytes,
    );
    let issuer_ca_der = issuer_ca_fixture_der();
    let policy = Policy::default();
    // First run: build and store a valid fetch_cache_pp pack (Fresh).
    let fresh = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .expect("first run ok");
    assert_eq!(fresh.source, PublicationPointSource::Fresh);
    // Second run: simulate RRDP/rsync repo sync failure and ensure we still accept the cached pack.
    let fallback = process_manifest_publication_point_after_repo_sync(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
        false,
        Some("repo sync failed in test"),
    )
    .expect("repo sync failure should fall back to fetch_cache_pp");
    assert_eq!(fallback.source, PublicationPointSource::FetchCachePp);
    assert_eq!(fallback.pack, fresh.pack);
    assert!(!fallback.warnings.is_empty());
}
// A cached fetch_cache_pp pack whose recorded manifest_rsync_uri disagrees
// with the key it was stored under must be rejected as invalid, yielding
// NoUsableCache instead of silently serving mismatched data.
#[test]
fn cached_pack_manifest_rsync_uri_mismatch_is_rejected_as_invalid_pack() {
    let manifest_path = Path::new(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let manifest_bytes = std::fs::read(manifest_path).expect("read manifest fixture");
    let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture");
    // One second past thisUpdate keeps the manifest inside its validity window.
    let validation_time = manifest.manifest.this_update + time::Duration::seconds(1);
    let manifest_rsync_uri = fixture_to_rsync_uri(manifest_path);
    let publication_point_rsync_uri = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    store_manifest_and_locked_files(
        &store,
        manifest_path,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &manifest,
        &manifest_bytes,
    );
    let issuer_ca_der = issuer_ca_fixture_der();
    let policy = Policy::default();
    // First run stores a valid pack.
    let _ = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .expect("first run stores pack");
    // Corrupt cached pack metadata: manifest_rsync_uri doesn't match the key.
    let key = FetchCachePpKey::from_manifest_rsync_uri(&manifest_rsync_uri);
    let cached_bytes = store
        .get_fetch_cache_pp(&key)
        .expect("get fetch_cache_pp")
        .expect("fetch_cache_pp exists");
    let mut pack = FetchCachePpPack::decode(&cached_bytes).expect("decode pack");
    pack.manifest_rsync_uri = "rsync://example.test/wrong.mft".to_string();
    store
        .put_fetch_cache_pp(&key, &pack.encode().expect("encode pack"))
        .expect("store corrupted pack");
    // Force fresh failure and trigger cache load.
    // (repo_sync_ok = false, so the implementation must go to the cache path.)
    let err = process_manifest_publication_point_after_repo_sync(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
        false,
        Some("repo sync failed in test"),
    )
    .unwrap_err();
    assert!(
        matches!(err, ManifestProcessError::NoUsableCache { .. }),
        "{err}"
    );
    // The error text pins the specific rejection reason, not just the variant.
    assert!(
        err.to_string()
            .contains("cached pack manifest_rsync_uri does not match key"),
        "unexpected error: {err}"
    );
}
#[test]
fn repo_sync_failed_stop_all_output_skips_cache() {
    // Under the StopAllOutput policy, a failed repo sync must error out
    // instead of falling back to any cached publication-point pack.
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    let issuer_ca_der = issuer_ca_fixture_der();
    let mut policy = Policy::default();
    policy.ca_failed_fetch_policy = CaFailedFetchPolicy::StopAllOutput;
    let err = process_manifest_publication_point_after_repo_sync(
        &store,
        &policy,
        "rsync://example.test/pp/manifest.mft",
        "rsync://example.test/pp/",
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        time::OffsetDateTime::now_utc(),
        false,
        Some("repo sync failed in test"),
    )
    .unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("repo sync failed"),
        "unexpected error: {err}"
    );
}
#[test]
fn manifest_missing_locked_file_is_treated_as_failed_fetch() {
    // Only the manifest is stored; every locked file is absent, so fresh
    // processing must fail, and with no cached pack there is nothing to serve.
    let manifest_path = Path::new(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let manifest_bytes = std::fs::read(manifest_path).expect("read manifest fixture");
    let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture");
    let validation_time = manifest.manifest.this_update + time::Duration::seconds(1);
    let manifest_rsync_uri = fixture_to_rsync_uri(manifest_path);
    let pp_with_slash = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
    let publication_point_rsync_uri = pp_with_slash.trim_end_matches('/'); // exercise join/pp normalization branches
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    store
        .put_raw(&manifest_rsync_uri, &manifest_bytes)
        .expect("store manifest only (no locked files)");
    let policy = Policy::default();
    let issuer_ca_der = issuer_ca_fixture_der();
    let err = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .unwrap_err();
    assert!(
        matches!(err, ManifestProcessError::NoUsableCache { .. }),
        "{err}"
    );
    let rendered = err.to_string();
    assert!(
        rendered.contains("file missing in raw_objects"),
        "unexpected error: {err}"
    );
}
// thisUpdate monotonicity: when the fresh manifest's manifestNumber is higher
// than the cached one but its thisUpdate is not strictly newer, the fresh
// manifest is rejected and the cached pack is served with a warning.
#[test]
fn manifest_number_increases_but_this_update_not_increasing_is_failed_fetch() {
    let manifest_path = Path::new(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let manifest_bytes = std::fs::read(manifest_path).expect("read manifest fixture");
    let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture");
    let validation_time = manifest.manifest.this_update + time::Duration::seconds(1);
    let manifest_rsync_uri = fixture_to_rsync_uri(manifest_path);
    let publication_point_rsync_uri = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    store_manifest_and_locked_files(
        &store,
        manifest_path,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &manifest,
        &manifest_bytes,
    );
    let issuer_ca_der = issuer_ca_fixture_der();
    let policy = Policy::default();
    // Build and store a valid pack first.
    let _ = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .expect("first run stores pack");
    // Replace the cached pack with an "older" manifestNumber but a newer thisUpdate to trigger
    // RFC 9286 §4.2.1 thisUpdate monotonicity failure on the fresh path.
    let key = FetchCachePpKey::from_manifest_rsync_uri(&manifest_rsync_uri);
    let cached_bytes = store
        .get_fetch_cache_pp(&key)
        .expect("get fetch_cache_pp")
        .expect("fetch_cache_pp exists");
    let mut old_pack = FetchCachePpPack::decode(&cached_bytes).expect("decode pack");
    // Single zero byte: the smallest possible big-endian manifestNumber.
    old_pack.manifest_number_be = vec![0];
    // Cached thisUpdate is 24h AFTER the fixture's, so the fresh manifest's
    // thisUpdate is NOT more recent than the cached one.
    old_pack.this_update = rpki::storage::PackTime::from_utc_offset_datetime(
        manifest.manifest.this_update + time::Duration::hours(24),
    );
    store
        .put_fetch_cache_pp(&key, &old_pack.encode().expect("encode pack"))
        .expect("store adjusted pack");
    let out = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .expect("should fall back to fetch_cache_pp");
    assert_eq!(out.source, PublicationPointSource::FetchCachePp);
    assert!(
        out.warnings
            .iter()
            .any(|w| w.message.contains("thisUpdate not more recent")),
        "expected warning mentioning thisUpdate monotonicity"
    );
}
// Same manifestNumber with different manifest bytes is not a valid update;
// under StopAllOutput this surfaces as a hard error rather than a fallback.
#[test]
fn manifest_number_equal_but_bytes_differ_is_rejected_without_cache() {
    let manifest_path = Path::new(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let manifest_bytes = std::fs::read(manifest_path).expect("read manifest fixture");
    let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture");
    let validation_time = manifest.manifest.this_update + time::Duration::seconds(1);
    let manifest_rsync_uri = fixture_to_rsync_uri(manifest_path);
    let publication_point_rsync_uri = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    store_manifest_and_locked_files(
        &store,
        manifest_path,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &manifest,
        &manifest_bytes,
    );
    let issuer_ca_der = issuer_ca_fixture_der();
    // StopAllOutput so the rejection escalates instead of using the cache.
    let mut policy = Policy::default();
    policy.ca_failed_fetch_policy = CaFailedFetchPolicy::StopAllOutput;
    // Store a cached pack that has the same manifestNumber but different manifest bytes.
    let _ = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .expect("first run stores pack");
    let key = FetchCachePpKey::from_manifest_rsync_uri(&manifest_rsync_uri);
    let cached_bytes = store
        .get_fetch_cache_pp(&key)
        .expect("get fetch_cache_pp")
        .expect("fetch_cache_pp exists");
    let mut pack = FetchCachePpPack::decode(&cached_bytes).expect("decode pack");
    // Flip one manifest byte; the manifestNumber field is untouched.
    pack.manifest_bytes[0] ^= 0xFF;
    store
        .put_fetch_cache_pp(&key, &pack.encode().expect("encode pack"))
        .expect("store adjusted pack");
    let err = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .unwrap_err();
    assert!(
        matches!(err, ManifestProcessError::StopAllOutput(_)),
        "{err}"
    );
    assert!(
        err.to_string().contains("manifestNumber not higher"),
        "unexpected error: {err}"
    );
}
#[test]
fn manifest_embedded_ee_cert_path_validation_fails_with_wrong_issuer_ca() {
    // The manifest's embedded EE certificate must chain to the supplied
    // issuer CA; a mismatched CA makes path validation fail.
    let manifest_path = Path::new(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let manifest_bytes = std::fs::read(manifest_path).expect("read manifest fixture");
    let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture");
    let validation_time = manifest.manifest.this_update + time::Duration::seconds(1);
    let manifest_rsync_uri = fixture_to_rsync_uri(manifest_path);
    let publication_point_rsync_uri = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    store_manifest_and_locked_files(
        &store,
        manifest_path,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &manifest,
        &manifest_bytes,
    );
    // Deliberately use a mismatched trust anchor as the "issuer CA" for this publication point.
    let wrong_issuer_ca_der =
        std::fs::read("tests/fixtures/ta/arin-ta.cer").expect("read wrong issuer fixture");
    let mut policy = Policy::default();
    policy.ca_failed_fetch_policy = CaFailedFetchPolicy::StopAllOutput;
    let err = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &wrong_issuer_ca_der,
        None,
        validation_time,
    )
    .unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("path validation failed"),
        "unexpected error: {err}"
    );
}
// A cached pack that no longer contains every manifest-locked file must fail
// revalidation and be rejected (NoUsableCache) rather than served partially.
#[test]
fn cached_pack_missing_file_is_rejected_during_revalidation() {
    let manifest_path = Path::new(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let manifest_bytes = std::fs::read(manifest_path).expect("read manifest fixture");
    let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture");
    let validation_time = manifest.manifest.this_update + time::Duration::seconds(1);
    let manifest_rsync_uri = fixture_to_rsync_uri(manifest_path);
    let publication_point_rsync_uri = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    store_manifest_and_locked_files(
        &store,
        manifest_path,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &manifest,
        &manifest_bytes,
    );
    let issuer_ca_der = issuer_ca_fixture_der();
    let policy = Policy::default();
    // Store a valid pack first.
    let _ = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .expect("first run stores pack");
    // Corrupt cached pack by removing one referenced file.
    let key = FetchCachePpKey::from_manifest_rsync_uri(&manifest_rsync_uri);
    let cached_bytes = store
        .get_fetch_cache_pp(&key)
        .expect("get fetch_cache_pp")
        .expect("fetch_cache_pp exists");
    let mut pack = FetchCachePpPack::decode(&cached_bytes).expect("decode pack");
    assert!(!pack.files.is_empty(), "fixture should lock some files");
    pack.files.pop();
    store
        .put_fetch_cache_pp(&key, &pack.encode().expect("encode pack"))
        .expect("store corrupted pack");
    // Force the fresh path to fail and trigger cache revalidation.
    // (repo_sync_ok = false routes processing to the cached pack.)
    let err = process_manifest_publication_point_after_repo_sync(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
        false,
        Some("repo sync failed in test"),
    )
    .unwrap_err();
    assert!(
        matches!(err, ManifestProcessError::NoUsableCache { .. }),
        "{err}"
    );
    // Pin the specific rejection reason in the error text.
    assert!(
        err.to_string().contains("cached fetch_cache_pp missing file"),
        "unexpected error: {err}"
    );
}
// A cached pack whose file content was altered (with its own sha256 field
// recomputed to match) must still fail the manifest fileList binding check
// during revalidation, because the manifest records the original hash.
#[test]
fn cached_pack_hash_mismatch_is_rejected_during_revalidation() {
    let manifest_path = Path::new(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let manifest_bytes = std::fs::read(manifest_path).expect("read manifest fixture");
    let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture");
    let validation_time = manifest.manifest.this_update + time::Duration::seconds(1);
    let manifest_rsync_uri = fixture_to_rsync_uri(manifest_path);
    let publication_point_rsync_uri = fixture_dir_to_rsync_uri(manifest_path.parent().unwrap());
    let temp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(temp.path()).expect("open rocksdb");
    store_manifest_and_locked_files(
        &store,
        manifest_path,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &manifest,
        &manifest_bytes,
    );
    let issuer_ca_der = issuer_ca_fixture_der();
    let policy = Policy::default();
    // Store a valid pack first.
    let _ = process_manifest_publication_point(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .expect("first run stores pack");
    // Corrupt cached pack by changing one file's bytes+sha256 so internal validation passes,
    // but the manifest fileList binding check fails (RFC 9286 §6.5).
    let key = FetchCachePpKey::from_manifest_rsync_uri(&manifest_rsync_uri);
    let cached_bytes = store
        .get_fetch_cache_pp(&key)
        .expect("get fetch_cache_pp")
        .expect("fetch_cache_pp exists");
    let mut pack = FetchCachePpPack::decode(&cached_bytes).expect("decode pack");
    let victim = pack.files.first_mut().expect("non-empty file list");
    // Flip one byte and recompute the pack-internal checksum so only the
    // manifest-recorded hash disagrees.
    victim.bytes[0] ^= 0xFF;
    victim.sha256 = victim.compute_sha256();
    store
        .put_fetch_cache_pp(&key, &pack.encode().expect("encode pack"))
        .expect("store corrupted pack");
    let err = process_manifest_publication_point_after_repo_sync(
        &store,
        &policy,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        validation_time,
        false,
        Some("repo sync failed in test"),
    )
    .unwrap_err();
    assert!(
        matches!(err, ManifestProcessError::NoUsableCache { .. }),
        "{err}"
    );
    assert!(
        err.to_string().contains("cached fetch_cache_pp file hash mismatch"),
        "unexpected error: {err}"
    );
}