diff --git a/scripts/compare/run_perf_compare_quick_remote.sh b/scripts/compare/run_perf_compare_quick_remote.sh index 1a7a985..bed0e4d 100755 --- a/scripts/compare/run_perf_compare_quick_remote.sh +++ b/scripts/compare/run_perf_compare_quick_remote.sh @@ -7,6 +7,7 @@ Usage: ./scripts/compare/run_perf_compare_quick_remote.sh \ --run-root \ --remote-root \ + [--rir-set ] \ [--ssh-target ] \ [--rpki-client-bin ] \ [--libtls-path ] \ @@ -23,6 +24,7 @@ SSH_TARGET="${SSH_TARGET:-root@47.251.56.108}" RPKI_CLIENT_BIN="${RPKI_CLIENT_BIN:-/home/yuyr/dev/rpki-client-9.7/build-m5/src/rpki-client}" LIBTLS_PATH="${LIBTLS_PATH:-/home/yuyr/dev/rpki-client-9.7/.deps/libtls/root/usr/lib/x86_64-linux-gnu/libtls.so.28.0.0}" RP_RUN_MODE="${RP_RUN_MODE:-serial}" +RIR_SET="${RIR_SET:-mixed2}" OURS_EXTRA_ARGS="${OURS_EXTRA_ARGS:-}" DRY_RUN=0 @@ -30,6 +32,7 @@ while [[ $# -gt 0 ]]; do case "$1" in --run-root) RUN_ROOT="$2"; shift 2 ;; --remote-root) REMOTE_ROOT="$2"; shift 2 ;; + --rir-set) RIR_SET="$2"; shift 2 ;; --ssh-target) SSH_TARGET="$2"; shift 2 ;; --rpki-client-bin) RPKI_CLIENT_BIN="$2"; shift 2 ;; --libtls-path) LIBTLS_PATH="$2"; shift 2 ;; @@ -43,6 +46,7 @@ done [[ -n "$RUN_ROOT" && -n "$REMOTE_ROOT" ]] || { usage >&2; exit 2; } [[ "$RP_RUN_MODE" == "serial" || "$RP_RUN_MODE" == "parallel" ]] || { echo "invalid --rp-run-mode: $RP_RUN_MODE" >&2; usage; exit 2; } +[[ "$RIR_SET" == "mixed2" || "$RIR_SET" == "all5" ]] || { echo "invalid --rir-set: $RIR_SET" >&2; usage; exit 2; } [[ "$DRY_RUN" -eq 1 || -x "$RPKI_CLIENT_BIN" ]] || { echo "rpki-client binary not executable: $RPKI_CLIENT_BIN" >&2; exit 2; } [[ "$DRY_RUN" -eq 1 || -f "$LIBTLS_PATH" ]] || { echo "libtls not found: $LIBTLS_PATH" >&2; exit 2; } @@ -56,15 +60,50 @@ PY mkdir -p "$RUN_ROOT/steps/step-001/ours" "$RUN_ROOT/steps/step-001/rpki-client" "$RUN_ROOT/steps/step-001/compare" mkdir -p "$RUN_ROOT/steps/step-002/ours" "$RUN_ROOT/steps/step-002/rpki-client" "$RUN_ROOT/steps/step-002/compare" -APNIC_TAL="$ROOT_DIR/tests/fixtures/tal/apnic-rfc7730-https.tal" -APNIC_TA="$ROOT_DIR/tests/fixtures/ta/apnic-ta.cer" -ARIN_TAL="$ROOT_DIR/tests/fixtures/tal/arin.tal" -ARIN_TA="$ROOT_DIR/tests/fixtures/ta/arin-ta.cer" +tal_path_for_rir() { + case "$1" in + afrinic) printf '%s' "$ROOT_DIR/tests/fixtures/tal/afrinic.tal" ;; + apnic) printf '%s' "$ROOT_DIR/tests/fixtures/tal/apnic-rfc7730-https.tal" ;; + arin) printf '%s' "$ROOT_DIR/tests/fixtures/tal/arin.tal" ;; + lacnic) printf '%s' "$ROOT_DIR/tests/fixtures/tal/lacnic.tal" ;; + ripe) printf '%s' "$ROOT_DIR/tests/fixtures/tal/ripe-ncc.tal" ;; + *) echo "unknown rir: $1" >&2; exit 2 ;; + esac +} + +ta_path_for_rir() { + case "$1" in + afrinic) printf '%s' "$ROOT_DIR/tests/fixtures/ta/afrinic-ta.cer" ;; + apnic) printf '%s' "$ROOT_DIR/tests/fixtures/ta/apnic-ta.cer" ;; + arin) printf '%s' "$ROOT_DIR/tests/fixtures/ta/arin-ta.cer" ;; + lacnic) printf '%s' "$ROOT_DIR/tests/fixtures/ta/lacnic-ta.cer" ;; + ripe) printf '%s' "$ROOT_DIR/tests/fixtures/ta/ripe-ncc-ta.cer" ;; + *) echo "unknown rir: $1" >&2; exit 2 ;; + esac +} + +case "$RIR_SET" in + mixed2) + RIRS=(apnic arin) + SCOPE_LABEL="APNIC+ARIN mixed release two-step synchronized compare" + ;; + all5) + RIRS=(afrinic apnic arin lacnic ripe) + SCOPE_LABEL="all-five-RIR mixed release two-step synchronized compare" + ;; +esac + +COPY_FILES=() +for rir in "${RIRS[@]}"; do + COPY_FILES+=("$(tal_path_for_rir "$rir")" "$(ta_path_for_rir "$rir")") +done if [[ "$DRY_RUN" -eq 1 ]]; then cat </dev/null 2>&1 || useradd -r -M -s /usr/sbin/nologin _rpki-client || true; rm -rf '$REMOTE_ROOT'; mkdir -p '$REMOTE_ROOT/bin' '$REMOTE_ROOT/lib' '$REMOTE_ROOT/state/ours' '$REMOTE_ROOT/state/rpki-client' '$REMOTE_ROOT/steps/step-001/ours' '$REMOTE_ROOT/steps/step-001/rpki-client' '$REMOTE_ROOT/steps/step-002/ours' '$REMOTE_ROOT/steps/step-002/rpki-client'" -scp "$ROOT_DIR/target/release/rpki" "$APNIC_TAL" "$APNIC_TA" "$ARIN_TAL" "$ARIN_TA" "$SSH_TARGET:$REMOTE_ROOT/" +scp "$ROOT_DIR/target/release/rpki" "${COPY_FILES[@]}" "$SSH_TARGET:$REMOTE_ROOT/" scp "$RPKI_CLIENT_BIN" "$SSH_TARGET:$REMOTE_ROOT/bin/rpki-client" scp "$LIBTLS_PATH" "$SSH_TARGET:$REMOTE_ROOT/lib/libtls.so.28" +printf '%s' "$OURS_EXTRA_ARGS" | ssh "$SSH_TARGET" "cat > '$REMOTE_ROOT/ours-extra-args.txt'" +printf '%s' "$RP_RUN_MODE" | ssh "$SSH_TARGET" "cat > '$REMOTE_ROOT/rp-run-mode.txt'" +printf '%s' "$RIR_SET" | ssh "$SSH_TARGET" "cat > '$REMOTE_ROOT/rir-set.txt'" run_step() { local step_id="$1" local kind="$2" local local_step="$RUN_ROOT/steps/$step_id" - ssh "$SSH_TARGET" bash -s -- "$REMOTE_ROOT" "$step_id" "$kind" "$OURS_EXTRA_ARGS" "$RP_RUN_MODE" <<'EOS' + ssh "$SSH_TARGET" bash -s -- "$REMOTE_ROOT" "$step_id" "$kind" <<'EOS' set -euo pipefail REMOTE_ROOT="$1" STEP_ID="$2" KIND="$3" -OURS_EXTRA_ARGS="$4" -RP_RUN_MODE="$5" cd "$REMOTE_ROOT" mkdir -p "steps/$STEP_ID/ours" "steps/$STEP_ID/rpki-client" +OURS_EXTRA_ARGS="$(cat ours-extra-args.txt)" +RP_RUN_MODE="$(cat rp-run-mode.txt)" +RIR_SET="$(cat rir-set.txt)" OURS_EXTRA_ARGV=() if [[ -n "$OURS_EXTRA_ARGS" ]]; then # shellcheck disable=SC2206 OURS_EXTRA_ARGV=($OURS_EXTRA_ARGS) fi +case "$RIR_SET" in + mixed2) RIRS=(apnic arin) ;; + all5) RIRS=(afrinic apnic arin lacnic ripe) ;; + *) echo "invalid rir set: $RIR_SET" >&2; exit 2 ;; +esac + +tal_file_for_rir() { + case "$1" in + afrinic) printf '%s' "afrinic.tal" ;; + apnic) printf '%s' "apnic-rfc7730-https.tal" ;; + arin) printf '%s' "arin.tal" ;; + lacnic) printf '%s' "lacnic.tal" ;; + ripe) printf '%s' "ripe-ncc.tal" ;; + *) echo "unknown rir: $1" >&2; exit 2 ;; + esac +} + +ta_file_for_rir() { + case "$1" in + afrinic) printf '%s' "afrinic-ta.cer" ;; + apnic) printf '%s' "apnic-ta.cer" ;; + arin) printf '%s' "arin-ta.cer" ;; + lacnic) printf '%s' "lacnic-ta.cer" ;; + ripe) printf '%s' "ripe-ncc-ta.cer" ;; + *) echo "unknown rir: $1" >&2; exit 2 ;; + esac +} + +OURS_TAL_ARGS=() +CLIENT_TAL_ARGS=() +for rir in "${RIRS[@]}"; do + tal_file="$(tal_file_for_rir "$rir")" + ta_file="$(ta_file_for_rir "$rir")" + OURS_TAL_ARGS+=(--tal-path "$tal_file" --ta-path "$ta_file") + CLIENT_TAL_ARGS+=(-t "../../$tal_file") +done + if [[ "$KIND" == "snapshot" ]]; then rm -rf state/ours/work-db state/ours/raw-store.db state/rpki-client/cache state/rpki-client/out state/rpki-client/ta state/rpki-client/.ta fi @@ -142,8 +222,7 @@ PY env RPKI_PROGRESS_LOG=1 RPKI_PROGRESS_SLOW_SECS=0 ./rpki \ --db state/ours/work-db \ --raw-store-db state/ours/raw-store.db \ - --tal-path apnic-rfc7730-https.tal --ta-path apnic-ta.cer \ - --tal-path arin.tal --ta-path arin-ta.cer \ + "${OURS_TAL_ARGS[@]}" \ --parallel-phase1 \ "${OURS_EXTRA_ARGV[@]}" \ --ccr-out "steps/$STEP_ID/ours/result.ccr" \ @@ -189,8 +268,7 @@ PY set +e LD_LIBRARY_PATH="$REMOTE_ROOT/lib${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}" "$REMOTE_ROOT/bin/rpki-client" \ -vv \ - -t ../../apnic-rfc7730-https.tal \ - -t ../../arin.tal \ + "${CLIENT_TAL_ARGS[@]}" \ -d cache out \ > "$REMOTE_ROOT/steps/$STEP_ID/rpki-client/run.log" 2>&1 exit_code=$? @@ -286,14 +364,16 @@ PY run_step step-001 snapshot run_step step-002 delta -python3 - <<'PY' "$RUN_ROOT/steps/step-001/step-summary.json" "$RUN_ROOT/steps/step-002/step-summary.json" "$RUN_ROOT/summary.json" "$RP_RUN_MODE" "$OURS_EXTRA_ARGS" +python3 - <<'PY' "$RUN_ROOT/steps/step-001/step-summary.json" "$RUN_ROOT/steps/step-002/step-summary.json" "$RUN_ROOT/summary.json" "$RP_RUN_MODE" "$OURS_EXTRA_ARGS" "$RIR_SET" "$SCOPE_LABEL" "${RIRS[@]}" import json, sys steps = [json.load(open(p)) for p in sys.argv[1:3]] summary = { "workflowName": "性能对比测试快速版", - "scope": "APNIC+ARIN mixed release two-step synchronized compare", + "scope": sys.argv[7], "rpRunMode": sys.argv[4], "oursExtraArgs": sys.argv[5], + "rirSet": sys.argv[6], + "rirs": sys.argv[8:], "steps": steps, } json.dump(summary, open(sys.argv[3], "w"), indent=2, ensure_ascii=False) diff --git a/src/cli.rs b/src/cli.rs index 2a76947..3e0a55c 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1143,6 +1143,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { let total_started = std::time::Instant::now(); let validation_started = std::time::Instant::now(); + let collect_current_repo_objects = args.cir_enabled; let out = if delta_replay_mode { let tal_path = args .tal_path @@ -1279,6 +1280,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase2_config .clone() .expect("phase2 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } else { @@ -1293,6 +1295,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase1_config .clone() .expect("phase1 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } @@ -1319,6 +1322,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase2_config .clone() .expect("phase2 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } else { @@ -1333,6 +1337,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase1_config .clone() .expect("phase1 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } @@ -1384,6 +1389,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase2_config .clone() .expect("phase2 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } else { @@ -1400,6 +1406,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase1_config .clone() .expect("phase1 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } @@ -1497,6 +1504,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase2_config .clone() .expect("phase2 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } else { @@ -1511,6 +1519,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase1_config .clone() .expect("phase1 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } @@ -1537,6 +1546,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase2_config .clone() .expect("phase2 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } else { @@ -1551,6 +1561,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase1_config .clone() .expect("phase1 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } @@ -1602,6 +1613,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase2_config .clone() .expect("phase2 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } else { @@ -1618,6 +1630,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { args.parallel_phase1_config .clone() .expect("phase1 config present"), + collect_current_repo_objects, ) .map_err(|e| e.to_string())? } diff --git a/src/current_repo_index.rs b/src/current_repo_index.rs index 88d1b33..4214845 100644 --- a/src/current_repo_index.rs +++ b/src/current_repo_index.rs @@ -23,7 +23,6 @@ pub struct CurrentRepoObject { #[derive(Default, Debug)] pub struct CurrentRepoIndex { by_uri: HashMap, - by_scope: HashMap>, } pub type CurrentRepoIndexHandle = Arc>; @@ -43,10 +42,12 @@ impl CurrentRepoIndex { pub fn list_scope_uris(&self, repository_source: &str) -> Vec { let mut out = self - .by_scope - .get(repository_source) - .map(|set| set.iter().cloned().collect::>()) - .unwrap_or_default(); + .by_uri + .iter() + .filter_map(|(rsync_uri, entry)| { + (entry.repository_source == repository_source).then(|| rsync_uri.clone()) + }) + .collect::>(); out.sort(); out } @@ -56,7 +57,11 @@ impl CurrentRepoIndex { } pub fn scope_count(&self) -> usize { - self.by_scope.len() + self.by_uri + .values() + .map(|entry| entry.repository_source.as_str()) + .collect::>() + .len() } pub fn snapshot_objects(&self) -> Vec { @@ -76,7 +81,6 @@ impl CurrentRepoIndex { pub fn clear(&mut self) { self.by_uri.clear(); - self.by_scope.clear(); } pub fn apply_repository_view_entries( @@ -92,11 +96,6 @@ impl CurrentRepoIndex { fn apply_repository_view_entry(&mut self, entry: &RepositoryViewEntry) -> Result<(), String> { entry.validate_internal().map_err(|e| e.to_string())?; - let old_scope = self - .by_uri - .get(&entry.rsync_uri) - .map(|existing| existing.repository_source.clone()); - match entry.state { RepositoryViewState::Present | RepositoryViewState::Replaced => { let repository_source = entry.repository_source.clone().ok_or_else(|| { @@ -112,17 +111,6 @@ impl CurrentRepoIndex { ) })?; let current_hash = decode_sha256_hex_32(¤t_hash_hex)?; - - if let Some(old_scope) = old_scope.as_ref() { - if old_scope != &repository_source { - self.remove_uri_from_scope(old_scope, &entry.rsync_uri); - } - } - - self.by_scope - .entry(repository_source.clone()) - .or_default() - .insert(entry.rsync_uri.clone()); self.by_uri.insert( entry.rsync_uri.clone(), CurrentRepoEntry { @@ -135,27 +123,12 @@ impl CurrentRepoIndex { ); } RepositoryViewState::Withdrawn => { - if let Some(scope) = entry.repository_source.as_ref().or(old_scope.as_ref()) { - self.remove_uri_from_scope(scope, &entry.rsync_uri); - } self.by_uri.remove(&entry.rsync_uri); } } Ok(()) } - - fn remove_uri_from_scope(&mut self, scope: &str, rsync_uri: &str) { - let empty = if let Some(entries) = self.by_scope.get_mut(scope) { - entries.remove(rsync_uri); - entries.is_empty() - } else { - false - }; - if empty { - self.by_scope.remove(scope); - } - } } fn decode_sha256_hex_32(value: &str) -> Result<[u8; 32], String> { @@ -281,7 +254,7 @@ mod tests { .expect_err("invalid hash should fail"); assert!(err.contains("invalid"), "{err}"); - let err = index + index .apply_repository_view_entries(&[RepositoryViewEntry { rsync_uri: "rsync://example.test/repo/b.roa".to_string(), current_hash: Some("22".repeat(32)), diff --git a/src/validation/run_tree_from_tal.rs b/src/validation/run_tree_from_tal.rs index 7e0c1e3..d8cc1e4 100644 --- a/src/validation/run_tree_from_tal.rs +++ b/src/validation/run_tree_from_tal.rs @@ -103,7 +103,11 @@ pub struct TalRootDiscovery { fn snapshot_current_repo_objects( current_repo_index: Option<&CurrentRepoIndexHandle>, + collect: bool, ) -> Vec { + if !collect { + return Vec::new(); + } current_repo_index .and_then(|handle| handle.lock().ok().map(|idx| idx.snapshot_objects())) .unwrap_or_default() @@ -426,6 +430,7 @@ fn run_single_root_parallel_audit_inner( config: &TreeRunConfig, parallel_config: ParallelPhase1Config, phase2_config: Option, + collect_current_repo_objects: bool, ) -> Result where H: Fetcher + Clone + 'static, @@ -481,7 +486,10 @@ where publication_points, downloads, download_stats, - current_repo_objects: snapshot_current_repo_objects(Some(¤t_repo_index_for_output)), + current_repo_objects: snapshot_current_repo_objects( + Some(¤t_repo_index_for_output), + collect_current_repo_objects, + ), ccr_accumulator: phase2_enabled .then(|| runner.ccr_accumulator_snapshot()) .flatten(), @@ -498,6 +506,7 @@ fn run_multi_root_parallel_audit_inner( config: &TreeRunConfig, parallel_config: ParallelPhase1Config, phase2_config: Option, + collect_current_repo_objects: bool, ) -> Result where H: Fetcher + Clone + 'static, @@ -572,7 +581,10 @@ where publication_points, downloads, download_stats, - current_repo_objects: snapshot_current_repo_objects(Some(¤t_repo_index_for_output)), + current_repo_objects: snapshot_current_repo_objects( + Some(¤t_repo_index_for_output), + collect_current_repo_objects, + ), ccr_accumulator: phase2_enabled .then(|| runner.ccr_accumulator_snapshot()) .flatten(), @@ -588,6 +600,7 @@ pub fn run_tree_from_tal_url_parallel_phase1_audit( validation_time: time::OffsetDateTime, config: &TreeRunConfig, parallel_config: ParallelPhase1Config, + collect_current_repo_objects: bool, ) -> Result where H: Fetcher + Clone + 'static, @@ -605,6 +618,7 @@ where config, parallel_config, None, + collect_current_repo_objects, ) } @@ -619,6 +633,7 @@ pub fn run_tree_from_tal_and_ta_der_parallel_phase1_audit( validation_time: time::OffsetDateTime, config: &TreeRunConfig, parallel_config: ParallelPhase1Config, + collect_current_repo_objects: bool, ) -> Result where H: Fetcher + Clone + 'static, @@ -650,6 +665,7 @@ where config, parallel_config, None, + collect_current_repo_objects, ) } @@ -662,6 +678,7 @@ pub fn run_tree_from_multiple_tals_parallel_phase1_audit( validation_time: time::OffsetDateTime, config: &TreeRunConfig, parallel_config: ParallelPhase1Config, + collect_current_repo_objects: bool, ) -> Result where H: Fetcher + Clone + 'static, @@ -677,6 +694,7 @@ where config, parallel_config, None, + collect_current_repo_objects, ) } @@ -690,6 +708,7 @@ pub fn run_tree_from_tal_url_parallel_phase2_audit( config: &TreeRunConfig, parallel_config: ParallelPhase1Config, phase2_config: ParallelPhase2Config, + collect_current_repo_objects: bool, ) -> Result where H: Fetcher + Clone + 'static, @@ -707,6 +726,7 @@ where config, parallel_config, Some(phase2_config), + collect_current_repo_objects, ) } @@ -722,6 +742,7 @@ pub fn run_tree_from_tal_and_ta_der_parallel_phase2_audit( config: &TreeRunConfig, parallel_config: ParallelPhase1Config, phase2_config: ParallelPhase2Config, + collect_current_repo_objects: bool, ) -> Result where H: Fetcher + Clone + 'static, @@ -753,6 +774,7 @@ where config, parallel_config, Some(phase2_config), + collect_current_repo_objects, ) } @@ -766,6 +788,7 @@ pub fn run_tree_from_multiple_tals_parallel_phase2_audit( config: &TreeRunConfig, parallel_config: ParallelPhase1Config, phase2_config: ParallelPhase2Config, + collect_current_repo_objects: bool, ) -> Result where H: Fetcher + Clone + 'static, @@ -781,6 +804,7 @@ where config, parallel_config, Some(phase2_config), + collect_current_repo_objects, ) } @@ -1681,6 +1705,8 @@ pub fn run_tree_from_tal_and_ta_der_payload_delta_replay_step_serial_audit( #[cfg(test)] mod multi_tal_tests { use super::*; + use crate::current_repo_index::CurrentRepoIndex; + use crate::storage::{RepositoryViewEntry, RepositoryViewState}; struct RejectingHttpFetcher; @@ -1707,6 +1733,31 @@ mod multi_tal_tests { } } + #[test] + fn snapshot_current_repo_objects_is_on_demand() { + let handle = CurrentRepoIndex::shared(); + handle + .lock() + .expect("lock index") + .apply_repository_view_entries(&[RepositoryViewEntry { + rsync_uri: "rsync://example.test/repo/a.roa".to_string(), + current_hash: Some("11".repeat(32)), + repository_source: Some("rsync://example.test/repo/".to_string()), + object_type: Some("roa".to_string()), + state: RepositoryViewState::Present, + }]) + .expect("apply present entry"); + + assert!( + snapshot_current_repo_objects(Some(&handle), false).is_empty(), + "collection should be skipped when disabled" + ); + + let collected = snapshot_current_repo_objects(Some(&handle), true); + assert_eq!(collected.len(), 1); + assert_eq!(collected[0].rsync_uri, "rsync://example.test/repo/a.roa"); + } + #[test] fn discover_multiple_roots_from_tal_inputs_builds_multiple_root_handles() { let apnic_tal = diff --git a/src/validation/tree_parallel.rs b/src/validation/tree_parallel.rs index c9be989..f50532e 100644 --- a/src/validation/tree_parallel.rs +++ b/src/validation/tree_parallel.rs @@ -49,6 +49,21 @@ struct FinishedPublicationPoint { result: Result, } +fn compact_phase2_finished_result( + mut result: PublicationPointRunResult, +) -> PublicationPointRunResult { + // Phase2 only needs warnings, objects, audit, and traversal metadata after finalize. + // Dropping the snapshot here avoids retaining manifest/files/raw-byte caches until run end. + result.snapshot = None; + result +} + +fn compact_phase2_finished_result_result( + result: Result, +) -> Result { + result.map(compact_phase2_finished_result) +} + pub fn run_tree_parallel_phase2_audit_multi_root( roots: Vec, runner: &Rpkiv1PublicationPointRunner<'_>, @@ -214,7 +229,7 @@ fn stage_ready_publication_point( } finished.push(FinishedPublicationPoint { node: ready.node, - result: fallback, + result: compact_phase2_finished_result_result(fallback), }); return; } @@ -367,7 +382,10 @@ fn finalize_ready_objects( repo_outcome.repo_sync_err.as_deref(), ) .map(|out| out.result); - finished.push(FinishedPublicationPoint { node, result }); + finished.push(FinishedPublicationPoint { + node, + result: compact_phase2_finished_result_result(result), + }); } fn flush_pending_roa_dispatch( @@ -461,7 +479,7 @@ fn drain_object_results( ); finished.push(FinishedPublicationPoint { node: state.node, - result, + result: compact_phase2_finished_result_result(result), }); } Err(err) => finished.push(FinishedPublicationPoint { @@ -596,3 +614,68 @@ pub fn run_tree_parallel_phase2_audit( ) -> Result { run_tree_parallel_phase2_audit_multi_root(vec![root], runner, config) } + +#[cfg(test)] +mod tests { + use super::{compact_phase2_finished_result, compact_phase2_finished_result_result}; + use crate::audit::PublicationPointAudit; + use crate::storage::PackTime; + use crate::validation::manifest::PublicationPointSource; + use crate::validation::objects::{ObjectsOutput, ObjectsStats}; + use crate::validation::publication_point::PublicationPointSnapshot; + use crate::validation::tree::PublicationPointRunResult; + + fn sample_snapshot() -> PublicationPointSnapshot { + PublicationPointSnapshot { + format_version: PublicationPointSnapshot::FORMAT_VERSION_V1, + manifest_rsync_uri: "rsync://example.test/repo/example.mft".to_string(), + publication_point_rsync_uri: "rsync://example.test/repo/".to_string(), + manifest_number_be: vec![1], + this_update: PackTime { + rfc3339_utc: "2026-04-21T00:00:00Z".to_string(), + }, + next_update: PackTime { + rfc3339_utc: "2026-04-22T00:00:00Z".to_string(), + }, + verified_at: PackTime { + rfc3339_utc: "2026-04-21T00:00:01Z".to_string(), + }, + manifest_bytes: vec![1, 2, 3], + files: Vec::new(), + } + } + + fn sample_result() -> PublicationPointRunResult { + PublicationPointRunResult { + source: PublicationPointSource::Fresh, + snapshot: Some(sample_snapshot()), + warnings: Vec::new(), + objects: ObjectsOutput { + vrps: Vec::new(), + aspas: Vec::new(), + router_keys: Vec::new(), + local_outputs_cache: Vec::new(), + warnings: Vec::new(), + stats: ObjectsStats::default(), + audit: Vec::new(), + }, + audit: PublicationPointAudit::default(), + discovered_children: Vec::new(), + } + } + + #[test] + fn compact_phase2_finished_result_drops_snapshot() { + let result = compact_phase2_finished_result(sample_result()); + assert!(result.snapshot.is_none()); + assert_eq!(result.source, PublicationPointSource::Fresh); + assert!(result.discovered_children.is_empty()); + } + + #[test] + fn compact_phase2_finished_result_result_preserves_err() { + let err = compact_phase2_finished_result_result(Err("boom".to_string())) + .expect_err("error should be preserved"); + assert_eq!(err, "boom"); + } +} diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index 354db13..8735610 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -252,7 +252,7 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { ca: &CaInstanceHandle, fresh_point: &FreshValidatedPublicationPoint, warnings: Vec, - objects: crate::validation::objects::ObjectsOutput, + mut objects: crate::validation::objects::ObjectsOutput, child_audits: Vec, discovered_children: Vec, repo_sync_source: Option<&str>, @@ -278,6 +278,10 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { .map_err(|e| format!("persist VCIR failed: {e}"))?; let persist_vcir_ms = persist_vcir_started.elapsed().as_millis() as u64; + // local_outputs_cache only exists to build/persist VCIR. Release it before the + // publication point result is retained for the rest of the run. + let _released_local_outputs = std::mem::take(&mut objects.local_outputs_cache); + if self.ccr_accumulator.is_some() { let child_entries = build_vcir_child_entries(&discovered_children, self.validation_time)?; @@ -4101,6 +4105,115 @@ authorityKeyIdentifier = keyid:always .all(|output| output.output_type == VcirOutputType::Vrp)); } + #[test] + fn finalize_fresh_publication_point_releases_local_outputs_cache_after_persist() { + let (pack, issuer_ca_der, validation_time) = + cernet_publication_point_snapshot_for_vcir_tests(); + let issuer_ca = ResourceCertificate::decode_der(&issuer_ca_der).expect("decode issuer ca"); + let mut objects = crate::validation::objects::process_publication_point_snapshot_for_issuer( + &pack, + &Policy::default(), + issuer_ca_der.as_slice(), + Some( + "rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer", + ), + issuer_ca.tbs.extensions.ip_resources.as_ref(), + issuer_ca.tbs.extensions.as_resources.as_ref(), + validation_time, + None, + ); + assert!( + !objects.local_outputs_cache.is_empty(), + "expected local outputs from signed objects" + ); + + let store_dir = tempfile::tempdir().expect("store dir"); + let store = RocksStore::open(store_dir.path()).expect("open rocksdb"); + let policy = Policy::default(); + let runner = Rpkiv1PublicationPointRunner { + store: &store, + policy: &policy, + http_fetcher: &NeverHttpFetcher, + rsync_fetcher: &FailingRsyncFetcher, + validation_time, + timing: None, + download_log: None, + replay_archive_index: None, + replay_delta_index: None, + rrdp_dedup: false, + rrdp_repo_cache: Mutex::new(HashMap::new()), + rsync_dedup: false, + rsync_repo_cache: Mutex::new(HashMap::new()), + current_repo_index: None, + repo_sync_runtime: None, + parallel_phase2_config: None, + parallel_roa_worker_pool: None, + ccr_accumulator: None, + }; + let ca = CaInstanceHandle { + depth: 0, + tal_id: "test-tal".to_string(), + parent_manifest_rsync_uri: None, + ca_certificate_der: issuer_ca_der.clone(), + ca_certificate_rsync_uri: Some( + "rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer".to_string(), + ), + effective_ip_resources: issuer_ca.tbs.extensions.ip_resources.clone(), + effective_as_resources: issuer_ca.tbs.extensions.as_resources.clone(), + rsync_base_uri: pack.publication_point_rsync_uri.clone(), + manifest_rsync_uri: pack.manifest_rsync_uri.clone(), + publication_point_rsync_uri: pack.publication_point_rsync_uri.clone(), + rrdp_notification_uri: None, + }; + let fresh_point = FreshValidatedPublicationPoint { + manifest_rsync_uri: pack.manifest_rsync_uri.clone(), + publication_point_rsync_uri: pack.publication_point_rsync_uri.clone(), + manifest_number_be: pack.manifest_number_be.clone(), + this_update: pack.this_update.clone(), + next_update: pack.next_update.clone(), + verified_at: pack.verified_at.clone(), + manifest_bytes: pack.manifest_bytes.clone(), + files: pack.files.clone(), + }; + + objects.local_outputs_cache.shrink_to_fit(); + let original_cache_capacity = objects.local_outputs_cache.capacity(); + let finalized = runner + .finalize_fresh_publication_point_from_reducer( + &ca, + &fresh_point, + Vec::new(), + objects, + Vec::new(), + Vec::new(), + None, + None, + 0, + None, + ) + .expect("finalize fresh publication point"); + + assert!( + finalized.result.objects.local_outputs_cache.is_empty(), + "local outputs cache should be released after VCIR persistence" + ); + assert_eq!( + finalized.result.objects.local_outputs_cache.capacity(), + 0, + "released cache should not keep its backing allocation" + ); + assert!(original_cache_capacity > 0); + + let persisted = store + .get_vcir(&pack.manifest_rsync_uri) + .expect("load persisted vcir") + .expect("persisted vcir"); + assert!( + !persisted.local_outputs.is_empty(), + "VCIR should still persist local outputs before cache release" + ); + } + #[test] fn persist_vcir_for_fresh_result_stores_vcir_and_audit_indexes_for_real_snapshot() { let (pack, issuer_ca_der, validation_time) = diff --git a/tests/fixtures/ta/ripe-ncc-ta.cer b/tests/fixtures/ta/ripe-ncc-ta.cer index ad42dfd..1364de7 100644 Binary files a/tests/fixtures/ta/ripe-ncc-ta.cer and b/tests/fixtures/ta/ripe-ncc-ta.cer differ