From 3b2a160c5cd41ed7e67236f3bd7db0a74ef6dc81 Mon Sep 17 00:00:00 2001 From: yuyr Date: Tue, 28 Apr 2026 09:58:43 +0800 Subject: [PATCH] =?UTF-8?q?20260428=20=E9=99=8D=E4=BD=8Eall5=20CIR=20repla?= =?UTF-8?q?y=E5=86=85=E5=AD=98=E5=B3=B0=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/cir/run_cir_replay_ours.sh | 139 ++++- src/bin/measure_sequence_replay.rs | 6 + src/bin/replay_bundle_capture.rs | 6 + src/bin/replay_bundle_capture_delta.rs | 9 + src/bin/replay_bundle_capture_sequence.rs | 15 + src/bin/replay_bundle_record.rs | 6 + .../replay_bundle_refresh_sequence_outputs.rs | 6 + src/bundle/compare_view.rs | 4 +- src/bundle/mod.rs | 2 +- src/cli.rs | 496 ++++++++++++++++-- src/sync/repo.rs | 4 +- src/sync/store_projection.rs | 31 +- src/validation/objects.rs | 139 ++++- src/validation/run.rs | 1 + src/validation/run_tree_from_tal.rs | 55 +- src/validation/tree.rs | 25 +- src/validation/tree_parallel.rs | 72 ++- src/validation/tree_runner.rs | 54 +- tests/test_apnic_stats_live_stage2.rs | 4 + tests/test_apnic_tree_live_m15.rs | 6 + tests/test_deterministic_semantics_m4.rs | 3 + tests/test_run_tree_from_tal_offline_m17.rs | 18 + tests/test_tree_traversal_m14.rs | 6 + 23 files changed, 965 insertions(+), 142 deletions(-) diff --git a/scripts/cir/run_cir_replay_ours.sh b/scripts/cir/run_cir_replay_ours.sh index 8e8eb21..3747a9e 100755 --- a/scripts/cir/run_cir_replay_ours.sh +++ b/scripts/cir/run_cir_replay_ours.sh @@ -10,7 +10,11 @@ Usage: --out-dir \ --reference-ccr \ [--keep-db] \ + [--write-actual-ccr] \ + [--write-report-json] \ [--report-json-compact] \ + [--phase2-object-workers ] \ + [--phase2-worker-queue-capacity ] \ [--rpki-bin ] \ [--real-rsync-bin ] EOF @@ -23,7 +27,11 @@ REPO_BYTES_DB="" OUT_DIR="" REFERENCE_CCR="" KEEP_DB=0 +WRITE_ACTUAL_CCR=0 +WRITE_REPORT_JSON=0 REPORT_JSON_COMPACT=0 +PHASE2_OBJECT_WORKERS="${CIR_REPLAY_PHASE2_OBJECT_WORKERS:-4}" +PHASE2_WORKER_QUEUE_CAPACITY="${CIR_REPLAY_PHASE2_WORKER_QUEUE_CAPACITY:-64}" RPKI_BIN="${RPKI_BIN:-$ROOT_DIR/target/release/rpki}" CIR_MATERIALIZE_BIN="${CIR_MATERIALIZE_BIN:-$ROOT_DIR/target/release/cir_materialize}" CIR_EXTRACT_INPUTS_BIN="${CIR_EXTRACT_INPUTS_BIN:-$ROOT_DIR/target/release/cir_extract_inputs}" @@ -38,7 +46,11 @@ while [[ $# -gt 0 ]]; do --out-dir) OUT_DIR="$2"; shift 2 ;; --reference-ccr) REFERENCE_CCR="$2"; shift 2 ;; --keep-db) KEEP_DB=1; shift ;; - --report-json-compact) REPORT_JSON_COMPACT=1; shift ;; + --write-actual-ccr) WRITE_ACTUAL_CCR=1; shift ;; + --write-report-json) WRITE_REPORT_JSON=1; shift ;; + --report-json-compact) WRITE_REPORT_JSON=1; REPORT_JSON_COMPACT=1; shift ;; + --phase2-object-workers) PHASE2_OBJECT_WORKERS="$2"; shift 2 ;; + --phase2-worker-queue-capacity) PHASE2_WORKER_QUEUE_CAPACITY="$2"; shift 2 ;; --rpki-bin) RPKI_BIN="$2"; shift 2 ;; --real-rsync-bin) REAL_RSYNC_BIN="$2"; shift 2 ;; -h|--help) usage; exit 0 ;; @@ -53,7 +65,13 @@ done mkdir -p "$OUT_DIR" +needs_build=0 if [[ ! -x "$RPKI_BIN" || ! -x "$CIR_MATERIALIZE_BIN" || ! -x "$CIR_EXTRACT_INPUTS_BIN" || ! -x "$CCR_TO_COMPARE_VIEWS_BIN" ]]; then + needs_build=1 +elif [[ "$RPKI_BIN" == "$ROOT_DIR/target/release/rpki" ]] && find "$ROOT_DIR/src" "$ROOT_DIR/Cargo.toml" -newer "$RPKI_BIN" -print -quit | grep -q .; then + needs_build=1 +fi +if [[ "$needs_build" -eq 1 ]]; then ( cd "$ROOT_DIR" cargo build --release --bin rpki --bin cir_materialize --bin cir_extract_inputs --bin ccr_to_compare_views @@ -65,6 +83,8 @@ TALS_DIR="$TMP_ROOT/tals" META_JSON="$TMP_ROOT/meta.json" MIRROR_ROOT="$TMP_ROOT/mirror" DB_DIR="$TMP_ROOT/work-db" +REPLAY_RAW_STORE_DB="$TMP_ROOT/replay-raw-store.db" +REPLAY_REPO_BYTES_DB="$TMP_ROOT/replay-repo-bytes.db" ACTUAL_CCR="$OUT_DIR/actual.ccr" ACTUAL_REPORT="$OUT_DIR/report.json" ACTUAL_VRPS="$OUT_DIR/actual-vrps.csv" @@ -109,52 +129,117 @@ PY export REAL_RSYNC_BIN="$REAL_RSYNC_BIN" export CIR_LOCAL_LINK_MODE=1 -REPORT_JSON_ARGS=(--report-json "$ACTUAL_REPORT") -if [[ "$REPORT_JSON_COMPACT" -eq 1 ]]; then - REPORT_JSON_ARGS+=(--report-json-compact) +REPORT_JSON_ARGS=(--skip-report-build) +VCIR_ARGS=(--skip-vcir-persist) +if [[ "$WRITE_REPORT_JSON" -eq 1 ]]; then + REPORT_JSON_ARGS=(--report-json "$ACTUAL_REPORT") + if [[ "$REPORT_JSON_COMPACT" -eq 1 ]]; then + REPORT_JSON_ARGS+=(--report-json-compact) + fi +fi +CCR_ARGS=() +if [[ "$WRITE_ACTUAL_CCR" -eq 1 ]]; then + CCR_ARGS=(--ccr-out "$ACTUAL_CCR") fi "$RPKI_BIN" \ --db "$DB_DIR" \ + --raw-store-db "$REPLAY_RAW_STORE_DB" \ + --repo-bytes-db "$REPLAY_REPO_BYTES_DB" \ "${TAL_ARGS[@]}" \ + --parallel-phase2-object-workers "$PHASE2_OBJECT_WORKERS" \ + --parallel-phase2-worker-queue-capacity "$PHASE2_WORKER_QUEUE_CAPACITY" \ --disable-rrdp \ --rsync-command "$WRAPPER" \ --validation-time "$VALIDATION_TIME" \ - --ccr-out "$ACTUAL_CCR" \ + "${CCR_ARGS[@]}" \ + --vrps-csv-out "$ACTUAL_VRPS" \ + --vaps-csv-out "$ACTUAL_VAPS" \ + --compare-view-trust-anchor unknown \ + "${VCIR_ARGS[@]}" \ "${REPORT_JSON_ARGS[@]}" \ >"$RUN_LOG" 2>&1 -"$CCR_TO_COMPARE_VIEWS_BIN" --ccr "$ACTUAL_CCR" --vrps-out "$ACTUAL_VRPS" --vaps-out "$ACTUAL_VAPS" --trust-anchor unknown +sort_compare_csv() { + local path="$1" + local tmp="${path}.sorted.tmp" + { + head -n 1 "$path" + tail -n +2 "$path" | LC_ALL=C sort -u + } >"$tmp" + mv "$tmp" "$path" +} + +sort_compare_csv "$ACTUAL_VRPS" +sort_compare_csv "$ACTUAL_VAPS" "$CCR_TO_COMPARE_VIEWS_BIN" --ccr "$REFERENCE_CCR" --vrps-out "$REF_VRPS" --vaps-out "$REF_VAPS" --trust-anchor unknown -python3 - <<'PY' "$ACTUAL_VRPS" "$REF_VRPS" "$ACTUAL_VAPS" "$REF_VAPS" "$COMPARE_JSON" "$META_JSON" +python3 - <<'PY' "$ACTUAL_VRPS" "$REF_VRPS" "$ACTUAL_VAPS" "$REF_VAPS" "$COMPARE_JSON" "$META_JSON" "$WRITE_REPORT_JSON" "$WRITE_ACTUAL_CCR" "$PHASE2_OBJECT_WORKERS" "$PHASE2_WORKER_QUEUE_CAPACITY" import csv, json, sys -def rows(path): - with open(path, newline="") as f: - return list(csv.reader(f))[1:] -actual_vrps = {tuple(r) for r in rows(sys.argv[1])} -ref_vrps = {tuple(r) for r in rows(sys.argv[2])} -actual_vaps = {tuple(r) for r in rows(sys.argv[3])} -ref_vaps = {tuple(r) for r in rows(sys.argv[4])} + +def next_row(reader): + try: + return tuple(next(reader)) + except StopIteration: + return None + +def compare_sorted_csv(actual_path, ref_path): + actual_count = 0 + ref_count = 0 + only_actual_count = 0 + only_ref_count = 0 + only_actual_sample = [] + only_ref_sample = [] + with open(actual_path, newline="") as actual_file, open(ref_path, newline="") as ref_file: + actual_reader = csv.reader(actual_file) + ref_reader = csv.reader(ref_file) + next(actual_reader, None) + next(ref_reader, None) + actual = next_row(actual_reader) + ref = next_row(ref_reader) + while actual is not None or ref is not None: + if ref is None or (actual is not None and actual < ref): + actual_count += 1 + only_actual_count += 1 + if len(only_actual_sample) < 20: + only_actual_sample.append(list(actual)) + actual = next_row(actual_reader) + elif actual is None or ref < actual: + ref_count += 1 + only_ref_count += 1 + if len(only_ref_sample) < 20: + only_ref_sample.append(list(ref)) + ref = next_row(ref_reader) + else: + actual_count += 1 + ref_count += 1 + actual = next_row(actual_reader) + ref = next_row(ref_reader) + return { + "actual": actual_count, + "reference": ref_count, + "only_in_actual": only_actual_sample, + "only_in_reference": only_ref_sample, + "only_in_actual_count": only_actual_count, + "only_in_reference_count": only_ref_count, + "match": only_actual_count == 0 and only_ref_count == 0, + } + +vrps = compare_sorted_csv(sys.argv[1], sys.argv[2]) +vaps = compare_sorted_csv(sys.argv[3], sys.argv[4]) meta = json.load(open(sys.argv[6], encoding="utf-8")) summary = { "compareMode": "trust-anchor-agnostic", "talCount": len(meta["talFiles"]), "talPaths": [item["path"] for item in meta["talFiles"]], - "vrps": { - "actual": len(actual_vrps), - "reference": len(ref_vrps), - "only_in_actual": sorted(actual_vrps - ref_vrps)[:20], - "only_in_reference": sorted(ref_vrps - actual_vrps)[:20], - "match": actual_vrps == ref_vrps, + "actualCcrWritten": sys.argv[8] == "1", + "reportJsonWritten": sys.argv[7] == "1", + "replayParallelism": { + "phase2ObjectWorkers": int(sys.argv[9]), + "phase2WorkerQueueCapacity": int(sys.argv[10]), }, - "vaps": { - "actual": len(actual_vaps), - "reference": len(ref_vaps), - "only_in_actual": sorted(actual_vaps - ref_vaps)[:20], - "only_in_reference": sorted(ref_vaps - actual_vaps)[:20], - "match": actual_vaps == ref_vaps, - } + "vrps": vrps, + "vaps": vaps, } with open(sys.argv[5], "w") as f: json.dump(summary, f, indent=2) diff --git a/src/bin/measure_sequence_replay.rs b/src/bin/measure_sequence_replay.rs index 539a025..8bc6efc 100644 --- a/src/bin/measure_sequence_replay.rs +++ b/src/bin/measure_sequence_replay.rs @@ -170,6 +170,9 @@ fn real_main() -> Result<(), String> { &TreeRunConfig { max_depth: None, max_instances: None, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("base replay failed for {rir}: {e}"))?; @@ -221,6 +224,9 @@ fn real_main() -> Result<(), String> { &TreeRunConfig { max_depth: None, max_instances: None, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("delta step replay failed for {rir}/{step_id}: {e}"))?; diff --git a/src/bin/replay_bundle_capture.rs b/src/bin/replay_bundle_capture.rs index 4aa936f..7b68f6a 100644 --- a/src/bin/replay_bundle_capture.rs +++ b/src/bin/replay_bundle_capture.rs @@ -202,6 +202,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("live base run failed: {e}"))?; @@ -262,6 +265,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("self replay failed: {e}"))?; diff --git a/src/bin/replay_bundle_capture_delta.rs b/src/bin/replay_bundle_capture_delta.rs index eda5a71..6ee21de 100644 --- a/src/bin/replay_bundle_capture_delta.rs +++ b/src/bin/replay_bundle_capture_delta.rs @@ -249,6 +249,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("base bootstrap replay failed: {e}"))?; @@ -279,6 +282,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("live target run failed: {e}"))?; @@ -344,6 +350,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("self delta replay failed: {e}"))?; diff --git a/src/bin/replay_bundle_capture_sequence.rs b/src/bin/replay_bundle_capture_sequence.rs index b74cab4..91544ca 100644 --- a/src/bin/replay_bundle_capture_sequence.rs +++ b/src/bin/replay_bundle_capture_sequence.rs @@ -368,6 +368,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("live base run failed: {e}"))?; @@ -462,6 +465,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("base self replay failed: {e}"))?; @@ -517,6 +523,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("sequence base self replay failed: {e}"))?; @@ -567,6 +576,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("live delta step {step_id} failed: {e}"))?; @@ -666,6 +678,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("sequence self replay failed for {step_id}: {e}"))?; diff --git a/src/bin/replay_bundle_record.rs b/src/bin/replay_bundle_record.rs index 3d1b5e7..0e8d329 100644 --- a/src/bin/replay_bundle_record.rs +++ b/src/bin/replay_bundle_record.rs @@ -369,6 +369,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("base replay failed: {e}"))?; @@ -549,6 +552,9 @@ fn run(args: Args) -> Result { &TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("delta replay failed: {e}"))?; diff --git a/src/bin/replay_bundle_refresh_sequence_outputs.rs b/src/bin/replay_bundle_refresh_sequence_outputs.rs index 98636a4..ff7ff85 100644 --- a/src/bin/replay_bundle_refresh_sequence_outputs.rs +++ b/src/bin/replay_bundle_refresh_sequence_outputs.rs @@ -848,6 +848,9 @@ fn real_main() -> Result<(), String> { &TreeRunConfig { max_depth: None, max_instances: None, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("base replay failed: {e}"))?; @@ -926,6 +929,9 @@ fn real_main() -> Result<(), String> { &TreeRunConfig { max_depth: None, max_instances: None, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .map_err(|e| format!("delta step {} replay failed: {e}", step.id))?; diff --git a/src/bundle/compare_view.rs b/src/bundle/compare_view.rs index 3a85687..c0ca73a 100644 --- a/src/bundle/compare_view.rs +++ b/src/bundle/compare_view.rs @@ -24,7 +24,7 @@ fn normalize_asn(asn: u32) -> String { format!("AS{asn}") } -fn canonical_prefix(prefix: &crate::data_model::roa::IpPrefix) -> String { +pub fn canonical_vrp_prefix(prefix: &crate::data_model::roa::IpPrefix) -> String { let mut addr = prefix.addr_bytes().to_vec(); let total_bits = match prefix.afi { crate::data_model::roa::RoaAfi::Ipv4 => 32usize, @@ -54,7 +54,7 @@ pub fn build_vrp_compare_rows(vrps: &[Vrp], trust_anchor: &str) -> BTreeSet, ccr_build_breakdown: Option, ccr_write_ms: Option, + compare_view_build_ms: Option, + compare_view_write_ms: Option, cir_build_cir_ms: Option, cir_write_cir_ms: Option, cir_total_ms: Option, @@ -69,7 +72,12 @@ pub struct CliArgs { pub policy_path: Option, pub report_json_path: Option, pub report_json_compact: bool, + pub skip_report_build: bool, + pub skip_vcir_persist: bool, pub ccr_out_path: Option, + pub vrps_csv_out_path: Option, + pub vaps_csv_out_path: Option, + pub compare_view_trust_anchor: Option, pub cir_enabled: bool, pub cir_out_path: Option, pub cir_static_root: Option, @@ -114,7 +122,13 @@ Options: --policy Policy TOML path (optional) --report-json Write full audit report as JSON (optional) --report-json-compact Write report JSON without pretty-printing (requires --report-json) + --skip-report-build Skip full audit report construction when --report-json is not requested + --skip-vcir-persist Skip VCIR persistence/projection building for compare-only runs --ccr-out Write CCR DER ContentInfo to this path (optional) + --vrps-csv-out Write VRP compare-view CSV directly from validation output (optional; requires --vaps-csv-out) + --vaps-csv-out Write VAP compare-view CSV directly from validation output (optional; requires --vrps-csv-out) + --compare-view-trust-anchor + Trust-anchor label used by direct compare-view CSV output (default: unknown) --cir-enable Export CIR after the run completes --cir-out Write CIR DER to this path (requires --cir-enable) --cir-static-root Deprecated; CIR export no longer exports object pools @@ -171,7 +185,12 @@ pub fn parse_args(argv: &[String]) -> Result { let mut policy_path: Option = None; let mut report_json_path: Option = None; let mut report_json_compact: bool = false; + let mut skip_report_build: bool = false; + let mut skip_vcir_persist: bool = false; let mut ccr_out_path: Option = None; + let mut vrps_csv_out_path: Option = None; + let mut vaps_csv_out_path: Option = None; + let mut compare_view_trust_anchor: Option = None; let mut cir_enabled: bool = false; let mut cir_out_path: Option = None; let mut cir_static_root: Option = None; @@ -291,11 +310,34 @@ pub fn parse_args(argv: &[String]) -> Result { "--report-json-compact" => { report_json_compact = true; } + "--skip-report-build" => { + skip_report_build = true; + } + "--skip-vcir-persist" => { + skip_vcir_persist = true; + } "--ccr-out" => { i += 1; let v = argv.get(i).ok_or("--ccr-out requires a value")?; ccr_out_path = Some(PathBuf::from(v)); } + "--vrps-csv-out" => { + i += 1; + let v = argv.get(i).ok_or("--vrps-csv-out requires a value")?; + vrps_csv_out_path = Some(PathBuf::from(v)); + } + "--vaps-csv-out" => { + i += 1; + let v = argv.get(i).ok_or("--vaps-csv-out requires a value")?; + vaps_csv_out_path = Some(PathBuf::from(v)); + } + "--compare-view-trust-anchor" => { + i += 1; + let v = argv + .get(i) + .ok_or("--compare-view-trust-anchor requires a value")?; + compare_view_trust_anchor = Some(v.clone()); + } "--cir-enable" => { cir_enabled = true; } @@ -487,6 +529,24 @@ pub fn parse_args(argv: &[String]) -> Result { usage() )); } + if skip_report_build && report_json_path.is_some() { + return Err(format!( + "--skip-report-build cannot be combined with --report-json\n\n{}", + usage() + )); + } + if vrps_csv_out_path.is_some() != vaps_csv_out_path.is_some() { + return Err(format!( + "--vrps-csv-out and --vaps-csv-out must be provided together\n\n{}", + usage() + )); + } + if compare_view_trust_anchor.is_some() && vrps_csv_out_path.is_none() { + return Err(format!( + "--compare-view-trust-anchor requires --vrps-csv-out/--vaps-csv-out\n\n{}", + usage() + )); + } if cir_static_root.is_some() { return Err(format!( "--cir-static-root is no longer supported; CIR export now writes only .cir files\n\n{}", @@ -632,7 +692,12 @@ pub fn parse_args(argv: &[String]) -> Result { policy_path, report_json_path, report_json_compact, + skip_report_build, + skip_vcir_persist, ccr_out_path, + vrps_csv_out_path, + vaps_csv_out_path, + compare_view_trust_anchor, cir_enabled, cir_out_path, cir_static_root, @@ -688,10 +753,12 @@ fn write_json(path: &Path, report: &AuditReportV2, format: ReportJsonFormat) -> Ok(()) } -fn unique_rrdp_repos(report: &AuditReportV2) -> usize { +fn unique_rrdp_repos_from_publication_points( + publication_points: &[crate::audit::PublicationPointAudit], +) -> usize { use std::collections::HashSet; let mut set: HashSet<&str> = HashSet::new(); - for pp in &report.publication_points { + for pp in publication_points { if let Some(u) = pp.rrdp_notification_uri.as_deref() { set.insert(u); } @@ -699,6 +766,10 @@ fn unique_rrdp_repos(report: &AuditReportV2) -> usize { set.len() } +fn unique_rrdp_repos(report: &AuditReportV2) -> usize { + unique_rrdp_repos_from_publication_points(&report.publication_points) +} + fn print_summary(report: &AuditReportV2) { let rrdp_repos = unique_rrdp_repos(report); println!("RPKI stage2 serial run summary"); @@ -728,6 +799,37 @@ fn print_summary(report: &AuditReportV2) { ); } +fn print_summary_from_shared(validation_time: time::OffsetDateTime, shared: &PostValidationShared) { + use time::format_description::well_known::Rfc3339; + let validation_time_rfc3339_utc = validation_time + .to_offset(time::UtcOffset::UTC) + .format(&Rfc3339) + .expect("format validation_time"); + let rrdp_repos = unique_rrdp_repos_from_publication_points(shared.publication_points.as_ref()); + println!("RPKI stage2 serial run summary"); + println!("validation_time={validation_time_rfc3339_utc}"); + println!( + "publication_points_processed={} publication_points_failed={}", + shared.instances_processed, shared.instances_failed + ); + println!("rrdp_repos_unique={rrdp_repos}"); + println!("vrps={}", shared.vrps.len()); + println!("aspas={}", shared.aspas.len()); + println!( + "audit_publication_points={}", + shared.publication_points.len() + ); + println!( + "warnings_total={}", + shared.tree_warnings.len() + + shared + .publication_points + .iter() + .map(|pp| pp.warnings.len()) + .sum::() + ); +} + #[derive(Clone, Debug, PartialEq, Eq)] struct PostValidationShared { discovery: crate::validation::from_tal::DiscoveredRootCaInstance, @@ -861,11 +963,21 @@ fn build_report( #[derive(Clone, Debug, PartialEq, Eq)] struct ReportTaskOutput { - report: AuditReportV2, + report: Option, report_build_ms: u64, report_write_ms: Option, } +impl ReportTaskOutput { + fn skipped() -> Self { + Self { + report: None, + report_build_ms: 0, + report_write_ms: None, + } + } +} + fn run_report_task( policy: &Policy, validation_time: time::OffsetDateTime, @@ -886,7 +998,7 @@ fn run_report_task( }; Ok(ReportTaskOutput { - report, + report: Some(report), report_build_ms, report_write_ms, }) @@ -950,6 +1062,98 @@ fn run_ccr_task( }) } +#[derive(Clone, Debug, PartialEq, Eq)] +struct CompareViewTaskOutput { + build_ms: Option, + write_ms: Option, +} + +fn run_compare_view_task( + shared: &PostValidationShared, + vrps_csv_out_path: Option<&Path>, + vaps_csv_out_path: Option<&Path>, + trust_anchor: &str, +) -> Result { + let mut build_ms = None; + let mut write_ms = None; + if let (Some(vrps_path), Some(vaps_path)) = (vrps_csv_out_path, vaps_csv_out_path) { + let started = std::time::Instant::now(); + build_ms = Some(0); + write_direct_vrp_csv(vrps_path, shared.vrps.as_ref(), trust_anchor)?; + write_direct_vap_csv(vaps_path, shared.aspas.as_ref(), trust_anchor)?; + write_ms = Some(started.elapsed().as_millis() as u64); + eprintln!( + "wrote compare views: vrps={} vaps={}", + vrps_path.display(), + vaps_path.display() + ); + } + Ok(CompareViewTaskOutput { build_ms, write_ms }) +} + +fn write_direct_vrp_csv( + path: &Path, + vrps: &[crate::validation::objects::Vrp], + trust_anchor: &str, +) -> Result<(), String> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| format!("create parent dirs failed: {}: {e}", parent.display()))?; + } + let file = std::fs::File::create(path) + .map_err(|e| format!("create file failed: {}: {e}", path.display()))?; + let mut writer = BufWriter::new(file); + use std::io::Write; + let trust_anchor = trust_anchor.to_ascii_lowercase(); + writeln!(writer, "ASN,IP Prefix,Max Length,Trust Anchor").map_err(|e| e.to_string())?; + for vrp in vrps { + writeln!( + writer, + "AS{},{},{},{}", + vrp.asn, + canonical_vrp_prefix(&vrp.prefix), + vrp.max_length, + trust_anchor + ) + .map_err(|e| e.to_string())?; + } + Ok(()) +} + +fn write_direct_vap_csv( + path: &Path, + aspas: &[crate::validation::objects::AspaAttestation], + trust_anchor: &str, +) -> Result<(), String> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| format!("create parent dirs failed: {}: {e}", parent.display()))?; + } + let file = std::fs::File::create(path) + .map_err(|e| format!("create file failed: {}: {e}", path.display()))?; + let mut writer = BufWriter::new(file); + use std::io::Write; + let trust_anchor = trust_anchor.to_ascii_lowercase(); + writeln!(writer, "Customer ASN,Providers,Trust Anchor").map_err(|e| e.to_string())?; + for aspa in aspas { + let mut providers = aspa.provider_as_ids.clone(); + providers.sort_unstable(); + providers.dedup(); + let providers = providers + .into_iter() + .map(|asn| format!("AS{asn}")) + .collect::>() + .join(";"); + writeln!( + writer, + "AS{},{},{}", + aspa.customer_as_id, providers, trust_anchor + ) + .map_err(|e| e.to_string())?; + } + Ok(()) +} + fn write_stage_timing( report_json_path: Option<&Path>, stage_timing: &RunStageTiming, @@ -1140,6 +1344,11 @@ pub fn run(argv: &[String]) -> Result<(), String> { let config = TreeRunConfig { max_depth: args.max_depth, max_instances: args.max_instances, + compact_audit: args.skip_report_build + && args.report_json_path.is_none() + && !args.cir_enabled, + persist_vcir: !args.skip_vcir_persist, + build_ccr_accumulator: args.ccr_out_path.is_some(), }; let replay_mode = args.payload_replay_archive.is_some(); let delta_replay_mode = args.payload_base_archive.is_some(); @@ -1424,34 +1633,46 @@ pub fn run(argv: &[String]) -> Result<(), String> { ReportJsonFormat::Pretty }; let ccr_produced_at = time::OffsetDateTime::now_utc(); - let (report_result, ccr_result) = std::thread::scope(|scope| { - let report_handle = scope.spawn(|| { - run_report_task( - &policy, - validation_time, - &shared, - args.report_json_path.as_deref(), - report_json_format, - ) - }); - let ccr_handle = scope.spawn(|| { + let (report_result, ccr_result) = if args.skip_report_build { + ( + Ok(ReportTaskOutput::skipped()), run_ccr_task( store.as_ref(), &shared, args.ccr_out_path.as_deref(), ccr_produced_at, - ) - }); - let report_result = report_handle - .join() - .map_err(|_| "report task panicked".to_string()) - .and_then(|result| result); - let ccr_result = ccr_handle - .join() - .map_err(|_| "ccr task panicked".to_string()) - .and_then(|result| result); - (report_result, ccr_result) - }); + ), + ) + } else { + std::thread::scope(|scope| { + let report_handle = scope.spawn(|| { + run_report_task( + &policy, + validation_time, + &shared, + args.report_json_path.as_deref(), + report_json_format, + ) + }); + let ccr_handle = scope.spawn(|| { + run_ccr_task( + store.as_ref(), + &shared, + args.ccr_out_path.as_deref(), + ccr_produced_at, + ) + }); + let report_result = report_handle + .join() + .map_err(|_| "report task panicked".to_string()) + .and_then(|result| result); + let ccr_result = ccr_handle + .join() + .map_err(|_| "ccr task panicked".to_string()) + .and_then(|result| result); + (report_result, ccr_result) + }) + }; let report_output = report_result?; let ccr_output = ccr_result?; let report = report_output.report; @@ -1460,6 +1681,18 @@ pub fn run(argv: &[String]) -> Result<(), String> { let ccr_build_ms = ccr_output.ccr_build_ms; let ccr_build_breakdown = ccr_output.ccr_build_breakdown; let ccr_write_ms = ccr_output.ccr_write_ms; + let compare_view_trust_anchor = args + .compare_view_trust_anchor + .as_deref() + .unwrap_or("unknown"); + let compare_view_output = run_compare_view_task( + &shared, + args.vrps_csv_out_path.as_deref(), + args.vaps_csv_out_path.as_deref(), + compare_view_trust_anchor, + )?; + let compare_view_build_ms = compare_view_output.build_ms; + let compare_view_write_ms = compare_view_output.write_ms; let mut cir_build_cir_ms = None; let mut cir_write_cir_ms = None; @@ -1516,6 +1749,8 @@ pub fn run(argv: &[String]) -> Result<(), String> { ccr_build_ms, ccr_build_breakdown, ccr_write_ms, + compare_view_build_ms, + compare_view_write_ms, cir_build_cir_ms, cir_write_cir_ms, cir_total_ms, @@ -1528,14 +1763,19 @@ pub fn run(argv: &[String]) -> Result<(), String> { rsync_download_ms_total, download_bytes_total, }; - write_stage_timing(args.report_json_path.as_deref(), &stage_timing)?; + let stage_timing_anchor_path = args + .report_json_path + .as_deref() + .or(args.ccr_out_path.as_deref()) + .or(args.vrps_csv_out_path.as_deref()); + write_stage_timing(stage_timing_anchor_path, &stage_timing)?; if let Some((out_dir, t)) = timing.as_ref() { - t.record_count("vrps", report.vrps.len() as u64); - t.record_count("aspas", report.aspas.len() as u64); + t.record_count("vrps", shared.vrps.len() as u64); + t.record_count("aspas", shared.aspas.len() as u64); t.record_count( "audit_publication_points", - report.publication_points.len() as u64, + shared.publication_points.len() as u64, ); let timing_json_path = out_dir.join("timing.json"); t.write_json(&timing_json_path, 20)?; @@ -1575,7 +1815,11 @@ pub fn run(argv: &[String]) -> Result<(), String> { eprintln!("analysis: wrote {}", pb_path.display()); } - print_summary(&report); + if let Some(report) = report.as_ref() { + print_summary(report); + } else { + print_summary_from_shared(validation_time, &shared); + } Ok(()) } @@ -1701,6 +1945,104 @@ mod tests { ); } + #[test] + fn parse_accepts_skip_report_build_without_report_json() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/x.tal".to_string(), + "--ccr-out".to_string(), + "out/result.ccr".to_string(), + "--skip-report-build".to_string(), + ]; + let args = parse_args(&argv).expect("parse args"); + assert!(args.skip_report_build); + assert_eq!( + args.ccr_out_path.as_deref(), + Some(std::path::Path::new("out/result.ccr")) + ); + } + + #[test] + fn parse_accepts_skip_vcir_persist() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/x.tal".to_string(), + "--skip-vcir-persist".to_string(), + ]; + let args = parse_args(&argv).expect("parse args"); + assert!(args.skip_vcir_persist); + } + + #[test] + fn parse_rejects_skip_report_build_with_report_json() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/x.tal".to_string(), + "--report-json".to_string(), + "out/report.json".to_string(), + "--skip-report-build".to_string(), + ]; + let err = parse_args(&argv).expect_err("skip report build with report path should fail"); + assert!( + err.contains("--skip-report-build cannot be combined with --report-json"), + "{err}" + ); + } + + #[test] + fn parse_accepts_direct_compare_view_csv_outputs() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/x.tal".to_string(), + "--vrps-csv-out".to_string(), + "out/vrps.csv".to_string(), + "--vaps-csv-out".to_string(), + "out/vaps.csv".to_string(), + "--compare-view-trust-anchor".to_string(), + "unknown".to_string(), + ]; + let args = parse_args(&argv).expect("parse args"); + assert_eq!( + args.vrps_csv_out_path.as_deref(), + Some(std::path::Path::new("out/vrps.csv")) + ); + assert_eq!( + args.vaps_csv_out_path.as_deref(), + Some(std::path::Path::new("out/vaps.csv")) + ); + assert_eq!(args.compare_view_trust_anchor.as_deref(), Some("unknown")); + } + + #[test] + fn parse_rejects_partial_direct_compare_view_csv_outputs() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/x.tal".to_string(), + "--vrps-csv-out".to_string(), + "out/vrps.csv".to_string(), + ]; + let err = parse_args(&argv).expect_err("partial direct compare view output should fail"); + assert!( + err.contains("--vrps-csv-out and --vaps-csv-out must be provided together"), + "{err}" + ); + } + #[test] fn parse_accepts_external_raw_store_db() { let argv = vec![ @@ -2478,15 +2820,26 @@ mod tests { .with_rfc_refs(&[crate::report::RfcRef("RFC 6487 ยง4.8.8.1")]) .with_context("rsync://example.test/repo/pp/"), ], - vrps: vec![crate::validation::objects::Vrp { - asn: 64496, - prefix: crate::data_model::roa::IpPrefix { - afi: crate::data_model::roa::RoaAfi::Ipv4, - prefix_len: 24, - addr: [192, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + vrps: vec![ + crate::validation::objects::Vrp { + asn: 64496, + prefix: crate::data_model::roa::IpPrefix { + afi: crate::data_model::roa::RoaAfi::Ipv4, + prefix_len: 24, + addr: [192, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + }, + max_length: 24, }, - max_length: 24, - }], + crate::validation::objects::Vrp { + asn: 64497, + prefix: crate::data_model::roa::IpPrefix { + afi: crate::data_model::roa::RoaAfi::Ipv6, + prefix_len: 48, + addr: [0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + }, + max_length: 64, + }, + ], aspas: vec![crate::validation::objects::AspaAttestation { customer_as_id: 64496, provider_as_ids: vec![64497, 64498], @@ -2562,7 +2915,7 @@ mod tests { let report = build_report(&policy, validation_time, &shared); assert_eq!(unique_rrdp_repos(&report), 2); - assert_eq!(report.vrps.len(), 1); + assert_eq!(report.vrps.len(), 2); assert_eq!(report.aspas.len(), 1); print_summary(&report); @@ -2584,8 +2937,9 @@ mod tests { ) .expect("run report task"); - assert_eq!(report_output.report.vrps.len(), 1); - assert_eq!(report_output.report.aspas.len(), 1); + let report = report_output.report.as_ref().expect("report built"); + assert_eq!(report.vrps.len(), 2); + assert_eq!(report.aspas.len(), 1); assert!(report_output.report_write_ms.is_some()); let report_json = std::fs::read_to_string(&report_path).expect("read report json"); @@ -2598,23 +2952,59 @@ mod tests { ccr_build_ms: Some(2), ccr_build_breakdown: None, ccr_write_ms: Some(3), - cir_build_cir_ms: Some(4), - cir_write_cir_ms: Some(5), - cir_total_ms: Some(6), - total_ms: 7, + compare_view_build_ms: Some(4), + compare_view_write_ms: Some(5), + cir_build_cir_ms: Some(6), + cir_write_cir_ms: Some(7), + cir_total_ms: Some(8), + total_ms: 9, publication_points: shared.publication_points.len(), - repo_sync_ms_total: 8, - publication_point_repo_sync_ms_total: 9, - download_event_count: 10, - rrdp_download_ms_total: 11, - rsync_download_ms_total: 12, - download_bytes_total: 13, + repo_sync_ms_total: 10, + publication_point_repo_sync_ms_total: 11, + download_event_count: 12, + rrdp_download_ms_total: 13, + rsync_download_ms_total: 14, + download_bytes_total: 15, }; write_stage_timing(Some(&report_path), &stage_timing).expect("write stage timing"); let stage_timing_json = std::fs::read_to_string(dir.path().join("stage-timing.json")).expect("read timing"); assert!(stage_timing_json.contains("\"validation_ms\"")); assert!(stage_timing_json.contains("\"ccr_build_ms\"")); + + let ccr_path = dir.path().join("result.ccr"); + write_stage_timing(Some(&ccr_path), &stage_timing) + .expect("write stage timing via ccr path"); + assert!( + dir.path().join("stage-timing.json").exists(), + "stage timing should use parent directory of the anchor path" + ); + + let skipped = ReportTaskOutput::skipped(); + assert!(skipped.report.is_none()); + assert_eq!(skipped.report_build_ms, 0); + assert!(skipped.report_write_ms.is_none()); + } + + #[test] + fn run_compare_view_task_writes_csv_from_shared_output() { + let shared = synthetic_post_validation_shared(); + let dir = tempfile::tempdir().expect("tmpdir"); + let vrps_path = dir.path().join("vrps.csv"); + let vaps_path = dir.path().join("vaps.csv"); + + let output = run_compare_view_task(&shared, Some(&vrps_path), Some(&vaps_path), "unknown") + .expect("write direct compare views"); + + assert!(output.build_ms.is_some()); + assert!(output.write_ms.is_some()); + let vrps_csv = std::fs::read_to_string(vrps_path).expect("read vrps csv"); + let vaps_csv = std::fs::read_to_string(vaps_path).expect("read vaps csv"); + assert!(vrps_csv.contains("ASN,IP Prefix,Max Length,Trust Anchor")); + assert!(vrps_csv.contains("AS64496,192.0.2.0/24,24,unknown")); + assert!(vrps_csv.contains("AS64497,2001:db8::/48,64,unknown")); + assert!(vaps_csv.contains("Customer ASN,Providers,Trust Anchor")); + assert!(vaps_csv.contains("AS64496,AS64497;AS64498,unknown")); } #[test] diff --git a/src/sync/repo.rs b/src/sync/repo.rs index 0766b33..657d389 100644 --- a/src/sync/repo.rs +++ b/src/sync/repo.rs @@ -12,7 +12,7 @@ use crate::sync::rrdp::sync_from_notification_with_timing_and_download_log; use crate::sync::rrdp::{Fetcher as HttpFetcher, RrdpSyncError, load_rrdp_local_state}; use crate::sync::store_projection::{ build_repository_view_present_entry, build_repository_view_withdrawn_entry, - prepare_repo_bytes_batch, + prepare_repo_bytes_batch_owned, }; use std::collections::HashSet; @@ -637,7 +637,7 @@ fn rsync_sync_into_current_store( .as_ref() .map(|t| t.span_phase("rsync_write_current_store_total")); let prepared_bytes = - prepare_repo_bytes_batch(&fetched_objects).map_err(RepoSyncError::Storage)?; + prepare_repo_bytes_batch_owned(fetched_objects).map_err(RepoSyncError::Storage)?; let mut repository_view_entries = Vec::new(); for entry in existing_view { if !new_set.contains(&entry.rsync_uri) { diff --git a/src/sync/store_projection.rs b/src/sync/store_projection.rs index ff145ef..c3140b5 100644 --- a/src/sync/store_projection.rs +++ b/src/sync/store_projection.rs @@ -20,6 +20,12 @@ pub struct PreparedRepoBytesBatch { pub fn prepare_repo_bytes_batch( objects: &[(String, Vec)], +) -> Result { + prepare_repo_bytes_batch_owned(objects.to_vec()) +} + +pub fn prepare_repo_bytes_batch_owned( + objects: Vec<(String, Vec)>, ) -> Result { let mut uri_to_hash: BTreeMap = BTreeMap::new(); let mut pending: BTreeMap> = BTreeMap::new(); @@ -28,14 +34,14 @@ pub fn prepare_repo_bytes_batch( if bytes.is_empty() { return Err(format!("repo bytes for {uri} must not be empty")); } - let sha256_hex = compute_sha256_hex(bytes); + let sha256_hex = compute_sha256_hex(&bytes); uri_to_hash.insert(uri.clone(), sha256_hex.clone()); match pending.entry(sha256_hex) { std::collections::btree_map::Entry::Vacant(slot) => { - slot.insert(bytes.clone()); + slot.insert(bytes); } std::collections::btree_map::Entry::Occupied(existing) => { - if existing.get() != bytes { + if existing.get() != &bytes { return Err(format!( "repo bytes collision for {uri}: same sha256 maps to different bytes" )); @@ -455,7 +461,7 @@ pub fn now_pack_time() -> PackTime { #[cfg(test)] mod tests { - use super::prepare_repo_bytes_batch; + use super::{prepare_repo_bytes_batch, prepare_repo_bytes_batch_owned}; use std::collections::BTreeSet; #[test] @@ -484,4 +490,21 @@ mod tests { .collect::>(); assert_eq!(unique_hashes.len(), 2); } + + #[test] + fn prepare_repo_bytes_batch_owned_deduplicates_without_borrowed_input() { + let objects = vec![ + ( + "rsync://example.test/repo/a.roa".to_string(), + b"same".to_vec(), + ), + ( + "rsync://example.test/repo/b.roa".to_string(), + b"same".to_vec(), + ), + ]; + let prepared = prepare_repo_bytes_batch_owned(objects).expect("prepare repo bytes"); + assert_eq!(prepared.uri_to_hash.len(), 2); + assert_eq!(prepared.blobs_to_write.len(), 1); + } } diff --git a/src/validation/objects.rs b/src/validation/objects.rs index b0a3cd1..a47b21d 100644 --- a/src/validation/objects.rs +++ b/src/validation/objects.rs @@ -134,6 +134,30 @@ pub fn process_publication_point_for_issuer( issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, validation_time: time::OffsetDateTime, timing: Option<&TimingHandle>, +) -> ObjectsOutput { + process_publication_point_for_issuer_with_options( + publication_point, + policy, + issuer_ca_der, + issuer_ca_rsync_uri, + issuer_effective_ip, + issuer_effective_as, + validation_time, + timing, + true, + ) +} + +pub fn process_publication_point_for_issuer_with_options( + publication_point: &P, + policy: &Policy, + issuer_ca_der: &[u8], + issuer_ca_rsync_uri: Option<&str>, + issuer_effective_ip: Option<&crate::data_model::rc::IpResourceSet>, + issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, + validation_time: time::OffsetDateTime, + timing: Option<&TimingHandle>, + collect_vcir_local_outputs: bool, ) -> ObjectsOutput { let manifest_rsync_uri = publication_point.manifest_rsync_uri(); let manifest_bytes = publication_point.manifest_bytes(); @@ -321,12 +345,15 @@ pub fn process_publication_point_for_issuer( issuer_effective_as, validation_time, timing, + collect_vcir_local_outputs, ); match result.outcome { Ok(mut ok) => { stats.roa_ok += 1; vrps.append(&mut ok.vrps); - local_outputs_cache.extend(ok.local_outputs); + if collect_vcir_local_outputs { + local_outputs_cache.extend(ok.local_outputs); + } audit.push(ObjectAuditEntry { rsync_uri: result.rsync_uri, sha256_hex: result.sha256_hex, @@ -426,11 +453,14 @@ pub fn process_publication_point_for_issuer( issuer_effective_as, validation_time, timing, + collect_vcir_local_outputs, ) { Ok((att, local_output)) => { stats.aspa_ok += 1; aspas.push(att); - local_outputs_cache.push(local_output); + if let Some(local_output) = local_output { + local_outputs_cache.push(local_output); + } audit.push(ObjectAuditEntry { rsync_uri: file.rsync_uri.clone(), sha256_hex: sha256_hex_from_32(&file.sha256), @@ -536,11 +566,37 @@ pub fn process_publication_point_for_issuer_parallel_roa, config: &ParallelPhase2Config, +) -> ObjectsOutput { + process_publication_point_for_issuer_parallel_roa_with_options( + publication_point, + policy, + issuer_ca_der, + issuer_ca_rsync_uri, + issuer_effective_ip, + issuer_effective_as, + validation_time, + timing, + config, + true, + ) +} + +pub fn process_publication_point_for_issuer_parallel_roa_with_options( + publication_point: &P, + policy: &Policy, + issuer_ca_der: &[u8], + issuer_ca_rsync_uri: Option<&str>, + issuer_effective_ip: Option<&crate::data_model::rc::IpResourceSet>, + issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, + validation_time: time::OffsetDateTime, + timing: Option<&TimingHandle>, + config: &ParallelPhase2Config, + collect_vcir_local_outputs: bool, ) -> ObjectsOutput { if config.object_workers <= 1 || policy.signed_object_failure_policy == SignedObjectFailurePolicy::DropPublicationPoint { - return process_publication_point_for_issuer( + return process_publication_point_for_issuer_with_options( publication_point, policy, issuer_ca_der, @@ -549,13 +605,14 @@ pub fn process_publication_point_for_issuer_parallel_roa pool, Err(_) => { - return process_publication_point_for_issuer( + return process_publication_point_for_issuer_with_options( publication_point, policy, issuer_ca_der, @@ -564,11 +621,12 @@ pub fn process_publication_point_for_issuer_parallel_roa, pool: &ParallelRoaWorkerPool, +) -> ObjectsOutput { + process_publication_point_for_issuer_parallel_roa_with_pool_options( + publication_point, + policy, + issuer_ca_der, + issuer_ca_rsync_uri, + issuer_effective_ip, + issuer_effective_as, + validation_time, + timing, + pool, + true, + ) +} + +pub fn process_publication_point_for_issuer_parallel_roa_with_pool_options< + P: PublicationPointData, +>( + publication_point: &P, + policy: &Policy, + issuer_ca_der: &[u8], + issuer_ca_rsync_uri: Option<&str>, + issuer_effective_ip: Option<&crate::data_model::rc::IpResourceSet>, + issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, + validation_time: time::OffsetDateTime, + timing: Option<&TimingHandle>, + pool: &ParallelRoaWorkerPool, + collect_vcir_local_outputs: bool, ) -> ObjectsOutput { if policy.signed_object_failure_policy == SignedObjectFailurePolicy::DropPublicationPoint { - return process_publication_point_for_issuer( + return process_publication_point_for_issuer_with_options( publication_point, policy, issuer_ca_der, @@ -602,6 +689,7 @@ pub fn process_publication_point_for_issuer_parallel_roa_with_pool, issuer_effective_as: Option, validation_time: time::OffsetDateTime, + collect_vcir_local_outputs: bool, } #[derive(Clone)] @@ -736,6 +827,7 @@ fn validate_owned_roa_task(task: OwnedRoaTask) -> RoaTaskResult { task.issuer_effective_as.as_ref(), task.validation_time, None, + task.collect_vcir_local_outputs, ) .map(|(vrps, local_outputs)| RoaTaskOk { vrps, @@ -769,6 +861,7 @@ pub(crate) struct ParallelObjectsStage { issuer_effective_ip: Option, issuer_effective_as: Option, validation_time: time::OffsetDateTime, + collect_vcir_local_outputs: bool, warnings: Vec, stats: ObjectsStats, audit: Vec, @@ -794,6 +887,7 @@ impl ParallelObjectsStage { issuer_effective_ip: self.issuer_effective_ip.clone(), issuer_effective_as: self.issuer_effective_as.clone(), validation_time: self.validation_time, + collect_vcir_local_outputs: self.collect_vcir_local_outputs, }) .collect() } @@ -811,6 +905,7 @@ pub(crate) fn prepare_publication_point_for_parallel_roa, issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, validation_time: time::OffsetDateTime, + collect_vcir_local_outputs: bool, ) -> ParallelObjectsPrepare { let manifest_rsync_uri = publication_point.manifest_rsync_uri(); let manifest_bytes = publication_point.manifest_bytes(); @@ -985,6 +1080,7 @@ pub(crate) fn prepare_publication_point_for_parallel_roa { stats.roa_ok += 1; vrps.append(&mut ok.vrps); - local_outputs_cache.extend(ok.local_outputs); + if stage.collect_vcir_local_outputs { + local_outputs_cache.extend(ok.local_outputs); + } audit.push(ObjectAuditEntry { rsync_uri: result.rsync_uri, sha256_hex: result.sha256_hex, @@ -1066,11 +1164,14 @@ pub(crate) fn reduce_parallel_roa_stage( stage.issuer_effective_as.as_ref(), stage.validation_time, timing, + stage.collect_vcir_local_outputs, ) { Ok((att, local_output)) => { stats.aspa_ok += 1; aspas.push(att); - local_outputs_cache.push(local_output); + if let Some(local_output) = local_output { + local_outputs_cache.push(local_output); + } audit.push(ObjectAuditEntry { rsync_uri: file.rsync_uri.clone(), sha256_hex: sha256_hex_from_32(&file.sha256), @@ -1120,6 +1221,7 @@ fn process_publication_point_for_issuer_parallel_roa_inner, pool: &ParallelRoaWorkerPool, + collect_vcir_local_outputs: bool, ) -> Result { let stage = match prepare_publication_point_for_parallel_roa( 0, @@ -1129,6 +1231,7 @@ fn process_publication_point_for_issuer_parallel_roa_inner return Ok(out), ParallelObjectsPrepare::Staged(stage) => stage, @@ -1279,6 +1382,7 @@ pub(crate) fn validate_roa_task_serial( issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, validation_time: time::OffsetDateTime, timing: Option<&TimingHandle>, + collect_vcir_local_outputs: bool, ) -> RoaTaskResult { let sha256_hex = sha256_hex_from_32(&task.file.sha256); let outcome = process_roa_with_issuer( @@ -1294,6 +1398,7 @@ pub(crate) fn validate_roa_task_serial( issuer_effective_as, validation_time, timing, + collect_vcir_local_outputs, ) .map(|(vrps, local_outputs)| RoaTaskOk { vrps, @@ -1322,6 +1427,7 @@ fn process_roa_with_issuer( issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, validation_time: time::OffsetDateTime, timing: Option<&TimingHandle>, + collect_vcir_local_outputs: bool, ) -> Result<(Vec, Vec), ObjectValidateError> { let _decode = timing .as_ref() @@ -1378,6 +1484,9 @@ fn process_roa_with_issuer( drop(_subset); let vrps = roa_to_vrps(&roa); + if !collect_vcir_local_outputs { + return Ok((vrps, Vec::new())); + } let source_object_hash = sha256_hex_from_32(&file.sha256); let source_ee_cert_hash = crate::audit::sha256_hex(ee.raw_der.as_slice()); let item_effective_until = @@ -1434,6 +1543,7 @@ fn process_roa_with_issuer_parallel_cached( issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, validation_time: time::OffsetDateTime, timing: Option<&TimingHandle>, + collect_vcir_local_outputs: bool, ) -> Result<(Vec, Vec), ObjectValidateError> { let _decode = timing .as_ref() @@ -1496,6 +1606,9 @@ fn process_roa_with_issuer_parallel_cached( drop(_subset); let vrps = roa_to_vrps(&roa); + if !collect_vcir_local_outputs { + return Ok((vrps, Vec::new())); + } let source_object_hash = sha256_hex_from_32(&file.sha256); let source_ee_cert_hash = crate::audit::sha256_hex(ee.raw_der.as_slice()); let item_effective_until = @@ -1552,7 +1665,8 @@ fn process_aspa_with_issuer( issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, validation_time: time::OffsetDateTime, timing: Option<&TimingHandle>, -) -> Result<(AspaAttestation, VcirLocalOutput), ObjectValidateError> { + collect_vcir_local_outputs: bool, +) -> Result<(AspaAttestation, Option), ObjectValidateError> { let _decode = timing .as_ref() .map(|t| t.span_phase("objects_aspa_decode_and_validate_total")); @@ -1611,6 +1725,9 @@ fn process_aspa_with_issuer( customer_as_id: aspa.aspa.customer_as_id, provider_as_ids: aspa.aspa.provider_as_ids.clone(), }; + if !collect_vcir_local_outputs { + return Ok((attestation, None)); + } let source_object_hash = sha256_hex_from_32(&file.sha256); let source_ee_cert_hash = crate::audit::sha256_hex(ee.raw_der.as_slice()); let item_effective_until = @@ -1649,7 +1766,7 @@ fn process_aspa_with_issuer( ], }; - Ok((attestation, local_output)) + Ok((attestation, Some(local_output))) } fn vrp_prefix_to_string(vrp: &Vrp) -> String { diff --git a/src/validation/run.rs b/src/validation/run.rs index bee838b..62804cb 100644 --- a/src/validation/run.rs +++ b/src/validation/run.rs @@ -78,6 +78,7 @@ pub fn run_publication_point_once( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let result = runner diff --git a/src/validation/run_tree_from_tal.rs b/src/validation/run_tree_from_tal.rs index eeeb3c0..7a67a8e 100644 --- a/src/validation/run_tree_from_tal.rs +++ b/src/validation/run_tree_from_tal.rs @@ -125,6 +125,7 @@ fn make_live_runner<'a>( repo_sync_runtime: Option>, parallel_phase2_config: Option, ccr_accumulator: Option, + persist_vcir: bool, ) -> Rpkiv1PublicationPointRunner<'a> { let parallel_roa_worker_pool = parallel_phase2_config .as_ref() @@ -148,6 +149,7 @@ fn make_live_runner<'a>( parallel_phase2_config, parallel_roa_worker_pool, ccr_accumulator: ccr_accumulator.map(Mutex::new), + persist_vcir, } } @@ -300,6 +302,7 @@ pub fn run_tree_from_tal_url_serial( None, None, None, + config.persist_vcir, ); let root = root_handle_from_trust_anchor( @@ -337,6 +340,7 @@ pub fn run_tree_from_tal_url_serial_audit( None, None, None, + config.persist_vcir, ); let root = root_handle_from_trust_anchor( @@ -391,6 +395,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing( None, None, None, + config.persist_vcir, ); let root = root_handle_from_trust_anchor( @@ -460,7 +465,9 @@ where Some(current_repo_index), Some(runtime), phase2_config, - phase2_enabled.then(|| CcrAccumulator::new(vec![discovery.trust_anchor.clone()])), + (phase2_enabled && config.build_ccr_accumulator) + .then(|| CcrAccumulator::new(vec![discovery.trust_anchor.clone()])), + config.persist_vcir, ); let root = root_handle_from_trust_anchor( @@ -490,9 +497,7 @@ where Some(¤t_repo_index_for_output), collect_current_repo_objects, ), - ccr_accumulator: phase2_enabled - .then(|| runner.ccr_accumulator_snapshot()) - .flatten(), + ccr_accumulator: runner.ccr_accumulator_snapshot(), }) } @@ -554,7 +559,7 @@ where Some(current_repo_index), Some(runtime), phase2_config, - phase2_enabled.then(|| { + (phase2_enabled && config.build_ccr_accumulator).then(|| { CcrAccumulator::new( discoveries .iter() @@ -562,6 +567,7 @@ where .collect::>(), ) }), + config.persist_vcir, ); let TreeRunAuditOutput { @@ -585,9 +591,7 @@ where Some(¤t_repo_index_for_output), collect_current_repo_objects, ), - ccr_accumulator: phase2_enabled - .then(|| runner.ccr_accumulator_snapshot()) - .flatten(), + ccr_accumulator: runner.ccr_accumulator_snapshot(), }) } @@ -841,6 +845,7 @@ pub fn run_tree_from_tal_and_ta_der_serial( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let root = root_handle_from_trust_anchor( @@ -892,6 +897,7 @@ pub fn run_tree_from_tal_bytes_serial_audit( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let root = root_handle_from_trust_anchor( @@ -960,6 +966,7 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let root = root_handle_from_trust_anchor( @@ -1023,6 +1030,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let root = root_handle_from_trust_anchor( @@ -1087,6 +1095,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let root = root_handle_from_trust_anchor( @@ -1158,6 +1167,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let root = root_handle_from_trust_anchor( @@ -1215,6 +1225,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let root = root_handle_from_trust_anchor( @@ -1289,6 +1300,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let root = root_handle_from_trust_anchor( @@ -1346,6 +1358,7 @@ fn build_payload_replay_runner<'a>( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, } } @@ -1378,6 +1391,7 @@ fn build_payload_delta_replay_runner<'a>( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, } } @@ -1410,6 +1424,7 @@ fn build_payload_delta_replay_current_store_runner<'a>( parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, } } @@ -1888,6 +1903,9 @@ mod replay_api_tests { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .unwrap_err(); @@ -1920,6 +1938,9 @@ mod replay_api_tests { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run replay root-only audit"); @@ -1962,6 +1983,9 @@ mod replay_api_tests { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run replay root-only audit"); @@ -2005,6 +2029,9 @@ mod replay_api_tests { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, &timing, ) @@ -2058,6 +2085,9 @@ mod replay_api_tests { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .unwrap_err(); @@ -2087,6 +2117,9 @@ mod replay_api_tests { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .unwrap_err(); @@ -2136,6 +2169,9 @@ mod replay_api_tests { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run delta replay root-only audit"); @@ -2194,6 +2230,9 @@ mod replay_api_tests { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, &timing, ) diff --git a/src/validation/tree.rs b/src/validation/tree.rs index 98cacab..d66beea 100644 --- a/src/validation/tree.rs +++ b/src/validation/tree.rs @@ -12,6 +12,12 @@ pub struct TreeRunConfig { pub max_depth: Option, /// Max number of CA instances to process. pub max_instances: Option, + /// Drop per-object audit payload when the caller only needs validation outputs. + pub compact_audit: bool, + /// Persist per-CA VCIR state for future reuse/delta runs. + pub persist_vcir: bool, + /// Build online CCR manifest projections during phase2 validation. + pub build_ccr_accumulator: bool, } impl Default for TreeRunConfig { @@ -19,6 +25,9 @@ impl Default for TreeRunConfig { Self { max_depth: None, max_instances: None, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, } } } @@ -192,16 +201,20 @@ pub fn run_tree_serial_audit_multi_root( }; instances_processed += 1; - warnings.extend(res.warnings.clone()); - warnings.extend(res.objects.warnings.clone()); - vrps.extend(res.objects.vrps.clone()); - aspas.extend(res.objects.aspas.clone()); - router_keys.extend(res.objects.router_keys.clone()); + warnings.extend(res.warnings); + warnings.extend(res.objects.warnings); + vrps.extend(res.objects.vrps); + aspas.extend(res.objects.aspas); + router_keys.extend(res.objects.router_keys); - let mut audit = res.audit.clone(); + let mut audit = res.audit; audit.node_id = Some(node.id); audit.parent_node_id = node.parent_id; audit.discovered_from = node.discovered_from.clone(); + if config.compact_audit { + audit.objects.clear(); + audit.warnings.clear(); + } publication_points.push(audit); let mut children = res.discovered_children; diff --git a/src/validation/tree_parallel.rs b/src/validation/tree_parallel.rs index 26c2b30..0439840 100644 --- a/src/validation/tree_parallel.rs +++ b/src/validation/tree_parallel.rs @@ -51,17 +51,25 @@ struct FinishedPublicationPoint { fn compact_phase2_finished_result( mut result: PublicationPointRunResult, + compact_audit: bool, ) -> PublicationPointRunResult { // Phase2 only needs warnings, objects, audit, and traversal metadata after finalize. // Dropping the snapshot here avoids retaining manifest/files/raw-byte caches until run end. result.snapshot = None; + if compact_audit { + result.audit.objects.clear(); + result.audit.warnings.clear(); + result.objects.audit.clear(); + result.discovered_children.clear(); + } result } fn compact_phase2_finished_result_result( result: Result, + compact_audit: bool, ) -> Result { - result.map(compact_phase2_finished_result) + result.map(|result| compact_phase2_finished_result(result, compact_audit)) } pub fn run_tree_parallel_phase2_audit_multi_root( @@ -150,11 +158,17 @@ pub fn run_tree_parallel_phase2_audit_multi_root( &mut inflight_publication_points, &mut finished, ready, + config.compact_audit, ); } flush_pending_roa_dispatch(runner, &mut pending_roa_dispatch)?; - drain_object_results(runner, &mut inflight_publication_points, &mut finished)?; + drain_object_results( + runner, + &mut inflight_publication_points, + &mut finished, + config.compact_audit, + )?; let repo_poll_timeout = event_poll_timeout( &ca_queue, &ready_queue, @@ -204,6 +218,7 @@ fn stage_ready_publication_point( inflight_publication_points: &mut HashMap, finished: &mut Vec, ready: ReadyCaInstance, + compact_audit: bool, ) { let publication_point_started = Instant::now(); let mut warnings = ready.repo_outcome.warnings.clone(); @@ -229,7 +244,7 @@ fn stage_ready_publication_point( } finished.push(FinishedPublicationPoint { node: ready.node, - result: compact_phase2_finished_result_result(fallback), + result: compact_phase2_finished_result_result(fallback, compact_audit), }); return; } @@ -252,6 +267,7 @@ fn stage_ready_publication_point( ready.node.handle.effective_ip_resources.as_ref(), ready.node.handle.effective_as_resources.as_ref(), runner.validation_time, + runner.persist_vcir, ) { ParallelObjectsPrepare::Complete(mut objects) => { objects @@ -271,6 +287,7 @@ fn stage_ready_publication_point( objects, repo_outcome, finished, + compact_audit, ); } ParallelObjectsPrepare::Staged(objects_stage) => { @@ -299,6 +316,7 @@ fn stage_ready_publication_point( objects, repo_outcome, finished, + compact_audit, ); } Err(err) => finished.push(FinishedPublicationPoint { @@ -367,6 +385,7 @@ fn finalize_ready_objects( objects: ObjectsOutput, repo_outcome: RepoSyncRuntimeOutcome, finished: &mut Vec, + compact_audit: bool, ) { let result = runner .finalize_fresh_publication_point_from_reducer( @@ -384,7 +403,7 @@ fn finalize_ready_objects( .map(|out| out.result); finished.push(FinishedPublicationPoint { node, - result: compact_phase2_finished_result_result(result), + result: compact_phase2_finished_result_result(result, compact_audit), }); } @@ -416,6 +435,7 @@ fn drain_object_results( runner: &Rpkiv1PublicationPointRunner<'_>, inflight_publication_points: &mut HashMap, finished: &mut Vec, + compact_audit: bool, ) -> Result<(), TreeRunError> { let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else { return Ok(()); @@ -479,7 +499,7 @@ fn drain_object_results( ); finished.push(FinishedPublicationPoint { node: state.node, - result: compact_phase2_finished_result_result(result), + result: compact_phase2_finished_result_result(result, compact_audit), }); } Err(err) => finished.push(FinishedPublicationPoint { @@ -572,11 +592,11 @@ fn build_tree_output(mut finished: Vec) -> TreeRunAudi match item.result { Ok(result) => { instances_processed += 1; - warnings.extend(result.warnings.clone()); - warnings.extend(result.objects.warnings.clone()); - vrps.extend(result.objects.vrps.clone()); - aspas.extend(result.objects.aspas.clone()); - router_keys.extend(result.objects.router_keys.clone()); + warnings.extend(result.warnings); + warnings.extend(result.objects.warnings); + vrps.extend(result.objects.vrps); + aspas.extend(result.objects.aspas); + router_keys.extend(result.objects.router_keys); let mut audit: PublicationPointAudit = result.audit; audit.node_id = Some(item.node.id); @@ -666,15 +686,43 @@ mod tests { #[test] fn compact_phase2_finished_result_drops_snapshot() { - let result = compact_phase2_finished_result(sample_result()); + let result = compact_phase2_finished_result(sample_result(), false); assert!(result.snapshot.is_none()); assert_eq!(result.source, PublicationPointSource::Fresh); assert!(result.discovered_children.is_empty()); } + #[test] + fn compact_phase2_finished_result_can_drop_audit_payload() { + let mut sample = sample_result(); + sample.audit.objects.push(crate::audit::ObjectAuditEntry { + rsync_uri: "rsync://example.test/repo/a.roa".to_string(), + sha256_hex: "11".repeat(32), + kind: crate::audit::AuditObjectKind::Roa, + result: crate::audit::AuditObjectResult::Ok, + detail: None, + }); + sample.audit.warnings.push(crate::audit::AuditWarning { + message: "warning".to_string(), + rfc_refs: Vec::new(), + context: None, + }); + sample.objects.audit.push(crate::audit::ObjectAuditEntry { + rsync_uri: "rsync://example.test/repo/b.roa".to_string(), + sha256_hex: "22".repeat(32), + kind: crate::audit::AuditObjectKind::Roa, + result: crate::audit::AuditObjectResult::Ok, + detail: None, + }); + let result = compact_phase2_finished_result(sample, true); + assert!(result.audit.objects.is_empty()); + assert!(result.audit.warnings.is_empty()); + assert!(result.objects.audit.is_empty()); + } + #[test] fn compact_phase2_finished_result_result_preserves_err() { - let err = compact_phase2_finished_result_result(Err("boom".to_string())) + let err = compact_phase2_finished_result_result(Err("boom".to_string()), false) .expect_err("error should be preserved"); assert_eq!(err, "boom"); } diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index bdd3a95..d62869d 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -44,8 +44,8 @@ use crate::validation::manifest::{ }; use crate::validation::objects::{ AspaAttestation, ParallelRoaWorkerPool, RouterKeyPayload, Vrp, - process_publication_point_for_issuer, process_publication_point_for_issuer_parallel_roa, - process_publication_point_for_issuer_parallel_roa_with_pool, + process_publication_point_for_issuer_parallel_roa_with_options, + process_publication_point_for_issuer_parallel_roa_with_pool_options, }; use crate::validation::publication_point::PublicationPointSnapshot; use crate::validation::tree::{ @@ -138,6 +138,11 @@ pub struct Rpkiv1PublicationPointRunner<'a> { pub parallel_phase2_config: Option, pub parallel_roa_worker_pool: Option, pub ccr_accumulator: Option>, + /// When false, skip VCIR persistence and per-output VCIR projection building. + /// + /// This is intended for replay/compare-only runs where the caller does not need + /// the resulting DB to be reused by a later delta run. + pub persist_vcir: bool, } impl<'a> Rpkiv1PublicationPointRunner<'a> { @@ -267,17 +272,21 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { let snapshot_pack_ms = snapshot_pack_started.elapsed().as_millis() as u64; let persist_vcir_started = std::time::Instant::now(); - let persist_vcir_timing = persist_vcir_for_fresh_result_with_timing( - self.store, - ca, - &pack, - &objects, - &warnings, - &child_audits, - &discovered_children, - self.validation_time, - ) - .map_err(|e| format!("persist VCIR failed: {e}"))?; + let persist_vcir_timing = if self.persist_vcir { + persist_vcir_for_fresh_result_with_timing( + self.store, + ca, + &pack, + &objects, + &warnings, + &child_audits, + &discovered_children, + self.validation_time, + ) + .map_err(|e| format!("persist VCIR failed: {e}"))? + } else { + PersistVcirTimingBreakdown::default() + }; let persist_vcir_ms = persist_vcir_started.elapsed().as_millis() as u64; // local_outputs_cache only exists to build/persist VCIR. Release it before the @@ -607,7 +616,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { .as_ref() .map(|t| t.span_phase("objects_processing_total")); if let Some(phase2_pool) = self.parallel_roa_worker_pool.as_ref() { - process_publication_point_for_issuer_parallel_roa_with_pool( + process_publication_point_for_issuer_parallel_roa_with_pool_options( &fresh_point, self.policy, &ca.ca_certificate_der, @@ -617,9 +626,10 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { self.validation_time, self.timing.as_ref(), phase2_pool, + self.persist_vcir, ) } else if let Some(phase2_config) = self.parallel_phase2_config.as_ref() { - process_publication_point_for_issuer_parallel_roa( + process_publication_point_for_issuer_parallel_roa_with_options( &fresh_point, self.policy, &ca.ca_certificate_der, @@ -629,9 +639,10 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { self.validation_time, self.timing.as_ref(), phase2_config, + self.persist_vcir, ) } else { - process_publication_point_for_issuer( + crate::validation::objects::process_publication_point_for_issuer_with_options( &fresh_point, self.policy, &ca.ca_certificate_der, @@ -640,6 +651,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { ca.effective_as_resources.as_ref(), self.validation_time, self.timing.as_ref(), + self.persist_vcir, ) } }; @@ -3283,6 +3295,7 @@ mod tests { parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: Some(Mutex::new(CcrAccumulator::new(Vec::new()))), + persist_vcir: true, } } @@ -4159,6 +4172,7 @@ authorityKeyIdentifier = keyid:always parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let ca = CaInstanceHandle { depth: 0, @@ -4711,6 +4725,7 @@ authorityKeyIdentifier = keyid:always parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; // For this fixture-driven smoke, we provide the correct issuer CA certificate (the CA for @@ -4873,6 +4888,7 @@ authorityKeyIdentifier = keyid:always parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let first = runner.run_publication_point(&handle).expect("first run ok"); @@ -4985,6 +5001,7 @@ authorityKeyIdentifier = keyid:always parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let first = runner.run_publication_point(&handle).expect("first run ok"); @@ -5100,6 +5117,7 @@ authorityKeyIdentifier = keyid:always parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let first = runner.run_publication_point(&handle).expect("first run ok"); @@ -5187,6 +5205,7 @@ authorityKeyIdentifier = keyid:always parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let first = ok_runner .run_publication_point(&handle) @@ -5217,6 +5236,7 @@ authorityKeyIdentifier = keyid:always parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let second = bad_runner .run_publication_point(&handle) @@ -6551,6 +6571,7 @@ authorityKeyIdentifier = keyid:always parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let first = runner_rrdp .run_publication_point(&handle) @@ -6584,6 +6605,7 @@ authorityKeyIdentifier = keyid:always parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let third = runner_rsync .run_publication_point(&handle) diff --git a/tests/test_apnic_stats_live_stage2.rs b/tests/test_apnic_stats_live_stage2.rs index 161a690..c0e004f 100644 --- a/tests/test_apnic_stats_live_stage2.rs +++ b/tests/test_apnic_stats_live_stage2.rs @@ -185,6 +185,7 @@ fn apnic_tree_full_stats_serial() { parallel_phase2_config: None, parallel_roa_worker_pool: None, ccr_accumulator: None, + persist_vcir: true, }; let stats = RefCell::new(LiveStats::default()); @@ -213,6 +214,9 @@ fn apnic_tree_full_stats_serial() { &TreeRunConfig { max_depth, max_instances, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run tree"); diff --git a/tests/test_apnic_tree_live_m15.rs b/tests/test_apnic_tree_live_m15.rs index 36fa14e..565fce2 100644 --- a/tests/test_apnic_tree_live_m15.rs +++ b/tests/test_apnic_tree_live_m15.rs @@ -36,6 +36,9 @@ fn apnic_tree_depth1_processes_more_than_root() { &TreeRunConfig { max_depth: Some(1), max_instances: Some(2), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run tree from tal"); @@ -74,6 +77,9 @@ fn apnic_tree_root_only_processes_root_with_long_timeouts() { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run APNIC root-only"); diff --git a/tests/test_deterministic_semantics_m4.rs b/tests/test_deterministic_semantics_m4.rs index 0bda41e..b056052 100644 --- a/tests/test_deterministic_semantics_m4.rs +++ b/tests/test_deterministic_semantics_m4.rs @@ -108,6 +108,9 @@ fn crl_mismatch_drops_publication_point_and_cites_rfc_sections() { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run tree audit"); diff --git a/tests/test_run_tree_from_tal_offline_m17.rs b/tests/test_run_tree_from_tal_offline_m17.rs index 464aec3..dad9f9c 100644 --- a/tests/test_run_tree_from_tal_offline_m17.rs +++ b/tests/test_run_tree_from_tal_offline_m17.rs @@ -114,6 +114,9 @@ fn run_tree_from_tal_url_entry_executes_and_records_failure_when_repo_empty() { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run tree"); @@ -158,6 +161,9 @@ fn run_tree_from_tal_and_ta_der_entry_executes_and_records_failure_when_repo_emp &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run tree"); @@ -210,6 +216,9 @@ fn run_tree_from_tal_url_audit_entry_collects_no_publication_points_when_repo_em &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run tree audit"); @@ -250,6 +259,9 @@ fn run_tree_from_tal_and_ta_der_audit_entry_collects_no_publication_points_when_ &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run tree audit"); @@ -298,6 +310,9 @@ fn run_tree_from_tal_url_audit_with_timing_records_phases_when_repo_empty() { &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, &timing, ) @@ -345,6 +360,9 @@ fn run_tree_from_tal_and_ta_der_audit_with_timing_records_phases_when_repo_empty &TreeRunConfig { max_depth: Some(0), max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, &timing, ) diff --git a/tests/test_tree_traversal_m14.rs b/tests/test_tree_traversal_m14.rs index ffab86b..3f68d3d 100644 --- a/tests/test_tree_traversal_m14.rs +++ b/tests/test_tree_traversal_m14.rs @@ -286,6 +286,9 @@ fn tree_respects_max_depth_and_max_instances() { &TreeRunConfig { max_depth: Some(0), max_instances: None, + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run tree depth-limited"); @@ -298,6 +301,9 @@ fn tree_respects_max_depth_and_max_instances() { &TreeRunConfig { max_depth: None, max_instances: Some(1), + compact_audit: false, + persist_vcir: true, + build_ccr_accumulator: true, }, ) .expect("run tree instance-limited");