diff --git a/scripts/payload_replay/README.md b/scripts/payload_replay/README.md index 5c71c78..2116172 100644 --- a/scripts/payload_replay/README.md +++ b/scripts/payload_replay/README.md @@ -53,7 +53,7 @@ python3 scripts/payload_replay/multi_rir_case_info.py \ - 从 multi-RIR bundle 中选择指定 RIR 的 snapshot/base 与 delta 输入 - 读取该 RIR 的 Routinator `base-replay` / `delta-replay` timing 基线 -- 使用该 RIR `timings/base-replay.json` 与 `timings/delta-replay.json` 的 `startedAt` 作为 replay `--validation-time` +- 优先使用 `base-locks.json.validationTime` 与 `locks-delta.json.validationTime` 作为 replay `--validation-time`;若缺失才回退到 `timings/base-replay.json` 与 `timings/delta-replay.json` 的 `startedAt` - 在 `target/live/multi_rir_replay_runs//` 下生成: - snapshot replay 产物 - delta replay 产物 @@ -65,6 +65,40 @@ python3 scripts/payload_replay/multi_rir_case_info.py \ 也可以通过 `BUNDLE_ROOT` 覆盖。 +## `run_apnic_snapshot_replay_profile.sh` + +基于 multi-RIR bundle 中的 APNIC snapshot 输入,使用当前 replay 主流程执行一次带 `--analyze` 和 `--profile-cpu` 的离线 profile。 + +```bash +./scripts/payload_replay/run_apnic_snapshot_replay_profile.sh +``` + +作用: + +- 使用 `APNIC` 的 snapshot/base replay 输入 +- 自动开启: + - `--analyze` + - `--profile-cpu` +- 自动记录: + - replay wall-clock 时长 + - Routinator baseline (`base-replay`) + - analyze 目录路径 +- 生成: + - `report.json` + - `run.log` + - `meta.json` + - `summary.md` + - 以及 `target/live/analyze//` 下的: + - `timing.json` + - `flamegraph.svg` + - `pprof.pb.gz` + +支持: + +- `DRY_RUN=1`:只打印命令,不真正执行 +- `MAX_DEPTH` / `MAX_INSTANCES`:用于限定 replay 范围 +- `PROFILE_RUN_ROOT`:覆盖 wrapper 产物输出目录 + ## `run_apnic_replay.sh` 默认使用: diff --git a/scripts/payload_replay/multi_rir_case_info.py b/scripts/payload_replay/multi_rir_case_info.py index 2cd57a6..31801ab 100755 --- a/scripts/payload_replay/multi_rir_case_info.py +++ b/scripts/payload_replay/multi_rir_case_info.py @@ -58,6 +58,14 @@ def load_timing_summary(bundle_root: Path) -> dict: return json.loads(timing_path.read_text(encoding="utf-8")) +def load_json(path: Path) -> dict: + return json.loads(require_path(path, "file").read_text(encoding="utf-8")) + + +def lock_validation_time(lock_obj: dict, fallback_started_at: str) -> str: + return lock_obj.get("validationTime") or lock_obj.get("validation_time") or fallback_started_at + + def build_case(bundle_root: Path, repo_root: Path, rir: str) -> dict: if rir not in RIR_CONFIG: raise SystemExit( @@ -76,6 +84,8 @@ def build_case(bundle_root: Path, repo_root: Path, rir: str) -> dict: delta_timing = require_path(rir_root / "timings" / "delta-replay.json", "file") base_timing_obj = json.loads(base_timing.read_text(encoding="utf-8")) delta_timing_obj = json.loads(delta_timing.read_text(encoding="utf-8")) + base_locks_obj = load_json(rir_root / "base-locks.json") + delta_locks_obj = load_json(rir_root / "locks-delta.json") case = { "bundle_root": str(bundle_root), @@ -98,8 +108,12 @@ def build_case(bundle_root: Path, repo_root: Path, rir: str) -> dict: "tal_path": str(require_path(repo_root / cfg["tal"], "file")), "ta_path": str(require_path(repo_root / cfg["ta"], "file")), "validation_times": { - "snapshot": base_timing_obj["startedAt"], - "delta": delta_timing_obj["startedAt"], + "snapshot": lock_validation_time(base_locks_obj, base_timing_obj["startedAt"]), + "delta": lock_validation_time(delta_locks_obj, delta_timing_obj["startedAt"]), + }, + "timing_started_at": { + "snapshot_replay": base_timing_obj["startedAt"], + "delta_replay": delta_timing_obj["startedAt"], }, "routinator_timings": { "base_replay_seconds": float(durations["base-replay"]), diff --git a/scripts/payload_replay/run_apnic_delta_replay.sh b/scripts/payload_replay/run_apnic_delta_replay.sh index 3671c63..5dcf138 100755 --- a/scripts/payload_replay/run_apnic_delta_replay.sh +++ b/scripts/payload_replay/run_apnic_delta_replay.sh @@ -11,7 +11,7 @@ PAYLOAD_BASE_ARCHIVE="${PAYLOAD_BASE_ARCHIVE:-$DELTA_ROOT/base-payload-archive}" PAYLOAD_BASE_LOCKS="${PAYLOAD_BASE_LOCKS:-$DELTA_ROOT/base-locks.json}" PAYLOAD_DELTA_ARCHIVE="${PAYLOAD_DELTA_ARCHIVE:-$DELTA_ROOT/payload-delta-archive}" PAYLOAD_DELTA_LOCKS="${PAYLOAD_DELTA_LOCKS:-$DELTA_ROOT/locks-delta.json}" -VALIDATION_TIME="${VALIDATION_TIME:-2026-03-15T10:00:00Z}" +VALIDATION_TIME="${VALIDATION_TIME:-}" PAYLOAD_BASE_VALIDATION_TIME="${PAYLOAD_BASE_VALIDATION_TIME:-}" TRUST_ANCHOR="${TRUST_ANCHOR:-apnic}" ROUTINATOR_RECORD_CSV="${ROUTINATOR_RECORD_CSV:-$DELTA_ROOT/record-delta.csv}" @@ -20,6 +20,28 @@ MAX_INSTANCES="${MAX_INSTANCES:-}" OUT_DIR="${OUT_DIR:-$ROOT_DIR/target/live/payload_delta_replay_runs}" mkdir -p "$OUT_DIR" +if [[ -z "$PAYLOAD_BASE_VALIDATION_TIME" ]]; then + PAYLOAD_BASE_VALIDATION_TIME="$(python3 - "$PAYLOAD_BASE_LOCKS" <<'LOCKPY' +import json, sys +from pathlib import Path +path = Path(sys.argv[1]) +data = json.loads(path.read_text(encoding='utf-8')) +print(data.get('validationTime') or data.get('validation_time') or '') +LOCKPY +)" +fi + +if [[ -z "$VALIDATION_TIME" ]]; then + VALIDATION_TIME="$(python3 - "$PAYLOAD_DELTA_LOCKS" <<'LOCKPY' +import json, sys +from pathlib import Path +path = Path(sys.argv[1]) +data = json.loads(path.read_text(encoding='utf-8')) +print(data.get('validationTime') or data.get('validation_time') or '2026-03-15T10:00:00Z') +LOCKPY +)" +fi + TS="$(date -u +%Y%m%dT%H%M%SZ)" RUN_NAME="${RUN_NAME:-apnic_delta_replay_${TS}}" DB_DIR="${DB_DIR:-$OUT_DIR/${RUN_NAME}_db}" diff --git a/scripts/payload_replay/run_apnic_replay.sh b/scripts/payload_replay/run_apnic_replay.sh index fa618df..a7f883f 100755 --- a/scripts/payload_replay/run_apnic_replay.sh +++ b/scripts/payload_replay/run_apnic_replay.sh @@ -8,7 +8,7 @@ TAL_PATH="${TAL_PATH:-$ROOT_DIR/tests/fixtures/tal/apnic-rfc7730-https.tal}" TA_PATH="${TA_PATH:-$ROOT_DIR/tests/fixtures/ta/apnic-ta.cer}" PAYLOAD_REPLAY_ARCHIVE="${PAYLOAD_REPLAY_ARCHIVE:-$ROOT_DIR/target/live/payload_replay/payload-archive}" PAYLOAD_REPLAY_LOCKS="${PAYLOAD_REPLAY_LOCKS:-$ROOT_DIR/target/live/payload_replay/locks.json}" -VALIDATION_TIME="${VALIDATION_TIME:-2026-03-13T02:30:00Z}" +VALIDATION_TIME="${VALIDATION_TIME:-}" TRUST_ANCHOR="${TRUST_ANCHOR:-apnic}" ROUTINATOR_RECORD_CSV="${ROUTINATOR_RECORD_CSV:-$ROOT_DIR/target/live/payload_replay/record.csv}" MAX_DEPTH="${MAX_DEPTH:-}" @@ -16,6 +16,17 @@ MAX_INSTANCES="${MAX_INSTANCES:-}" OUT_DIR="${OUT_DIR:-$ROOT_DIR/target/live/payload_replay_runs}" mkdir -p "$OUT_DIR" +if [[ -z "$VALIDATION_TIME" ]]; then + VALIDATION_TIME="$(python3 - "$PAYLOAD_REPLAY_LOCKS" <<'LOCKPY' +import json, sys +from pathlib import Path +path = Path(sys.argv[1]) +data = json.loads(path.read_text(encoding='utf-8')) +print(data.get('validationTime') or data.get('validation_time') or '2026-03-13T02:30:00Z') +LOCKPY +)" +fi + TS="$(date -u +%Y%m%dT%H%M%SZ)" RUN_NAME="${RUN_NAME:-apnic_replay_${TS}}" DB_DIR="${DB_DIR:-$OUT_DIR/${RUN_NAME}_db}" diff --git a/scripts/payload_replay/run_apnic_snapshot_replay_profile.sh b/scripts/payload_replay/run_apnic_snapshot_replay_profile.sh new file mode 100755 index 0000000..77ff820 --- /dev/null +++ b/scripts/payload_replay/run_apnic_snapshot_replay_profile.sh @@ -0,0 +1,168 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +cd "$ROOT_DIR" + +BUNDLE_ROOT="${BUNDLE_ROOT:-$ROOT_DIR/../../rpki/target/live/20260316-112341-multi-final3}" +CASE_INFO_SCRIPT="$ROOT_DIR/scripts/payload_replay/multi_rir_case_info.py" +PROFILE_RUN_ROOT="${PROFILE_RUN_ROOT:-$ROOT_DIR/target/live/analyze_runs}" +mkdir -p "$PROFILE_RUN_ROOT" + +TS="$(date -u +%Y%m%dT%H%M%SZ)" +RUN_NAME="${RUN_NAME:-apnic_snapshot_profile_${TS}}" +RUN_DIR="$PROFILE_RUN_ROOT/$RUN_NAME" +mkdir -p "$RUN_DIR" + +ANALYZE_ROOT="$ROOT_DIR/target/live/analyze" +mkdir -p "$ANALYZE_ROOT" +mapfile -t ANALYZE_BEFORE < <(find "$ANALYZE_ROOT" -mindepth 1 -maxdepth 1 -type d 2>/dev/null | sort) + +eval "$(python3 "$CASE_INFO_SCRIPT" --bundle-root "$BUNDLE_ROOT" --rir apnic --format env)" + +DB_DIR="${DB_DIR:-$RUN_DIR/db}" +REPORT_JSON="${REPORT_JSON:-$RUN_DIR/report.json}" +RUN_LOG="${RUN_LOG:-$RUN_DIR/run.log}" +META_JSON="${META_JSON:-$RUN_DIR/meta.json}" +SUMMARY_MD="${SUMMARY_MD:-$RUN_DIR/summary.md}" + +rm -rf "$DB_DIR" + +cmd=(cargo run --release --features profile --bin rpki -- + --db "$DB_DIR" + --tal-path "$TAL_PATH" + --ta-path "$TA_PATH" + --payload-replay-archive "$PAYLOAD_REPLAY_ARCHIVE" + --payload-replay-locks "$PAYLOAD_REPLAY_LOCKS" + --validation-time "$SNAPSHOT_VALIDATION_TIME" + --analyze + --profile-cpu + --report-json "$REPORT_JSON") + +if [[ -n "${MAX_DEPTH:-}" ]]; then + cmd+=(--max-depth "$MAX_DEPTH") +fi +if [[ -n "${MAX_INSTANCES:-}" ]]; then + cmd+=(--max-instances "$MAX_INSTANCES") +fi + +if [[ "${DRY_RUN:-0}" == "1" ]]; then + printf '%q ' "${cmd[@]}" + echo + exit 0 +fi + +run_start_s="$(date +%s)" +( + echo '# command:' + printf '%q ' "${cmd[@]}" + echo + echo + "${cmd[@]}" +) 2>&1 | tee "$RUN_LOG" >/dev/null +run_end_s="$(date +%s)" +run_duration_s="$((run_end_s - run_start_s))" + +mapfile -t ANALYZE_AFTER < <(find "$ANALYZE_ROOT" -mindepth 1 -maxdepth 1 -type d 2>/dev/null | sort) +ANALYZE_DIR="" +for candidate in "${ANALYZE_AFTER[@]}"; do + seen=0 + for old in "${ANALYZE_BEFORE[@]}"; do + if [[ "$candidate" == "$old" ]]; then + seen=1 + break + fi + done + if [[ "$seen" == "0" ]]; then + ANALYZE_DIR="$candidate" + fi +done +if [[ -z "$ANALYZE_DIR" ]]; then + ANALYZE_DIR="$(find "$ANALYZE_ROOT" -mindepth 1 -maxdepth 1 -type d 2>/dev/null | sort | tail -n 1)" +fi + +BUNDLE_ROOT="$BUNDLE_ROOT" \ +TRUST_ANCHOR="$TRUST_ANCHOR" \ +TAL_PATH="$TAL_PATH" \ +TA_PATH="$TA_PATH" \ +PAYLOAD_REPLAY_ARCHIVE="$PAYLOAD_REPLAY_ARCHIVE" \ +PAYLOAD_REPLAY_LOCKS="$PAYLOAD_REPLAY_LOCKS" \ +SNAPSHOT_VALIDATION_TIME="$SNAPSHOT_VALIDATION_TIME" \ +ROUTINATOR_BASE_REPLAY_SECONDS="$ROUTINATOR_BASE_REPLAY_SECONDS" \ +DB_DIR="$DB_DIR" \ +REPORT_JSON="$REPORT_JSON" \ +RUN_LOG="$RUN_LOG" \ +ANALYZE_DIR="$ANALYZE_DIR" \ +RUN_DURATION_S="$run_duration_s" \ +python3 - "$META_JSON" "$SUMMARY_MD" <<'PY' +import json +import os +import sys +from datetime import datetime, timezone +from pathlib import Path + +meta_path = Path(sys.argv[1]) +summary_path = Path(sys.argv[2]) +report_path = Path(os.environ['REPORT_JSON']) +report = json.loads(report_path.read_text(encoding='utf-8')) +recorded = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') +meta = { + 'recorded_at_utc': recorded, + 'bundle_root': os.environ['BUNDLE_ROOT'], + 'trust_anchor': os.environ['TRUST_ANCHOR'], + 'tal_path': os.environ['TAL_PATH'], + 'ta_path': os.environ['TA_PATH'], + 'payload_replay_archive': os.environ['PAYLOAD_REPLAY_ARCHIVE'], + 'payload_replay_locks': os.environ['PAYLOAD_REPLAY_LOCKS'], + 'validation_time_arg': os.environ['SNAPSHOT_VALIDATION_TIME'], + 'routinator_base_replay_seconds': float(os.environ['ROUTINATOR_BASE_REPLAY_SECONDS']), + 'db_dir': os.environ['DB_DIR'], + 'report_json': os.environ['REPORT_JSON'], + 'run_log': os.environ['RUN_LOG'], + 'analyze_dir': os.environ.get('ANALYZE_DIR') or '', + 'durations_secs': { + 'rpki_run_wall': int(os.environ['RUN_DURATION_S']), + }, + 'counts': { + 'publication_points_processed': report['tree']['instances_processed'], + 'publication_points_failed': report['tree']['instances_failed'], + 'vrps': len(report['vrps']), + 'aspas': len(report['aspas']), + 'audit_publication_points': len(report['publication_points']), + }, +} +meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2) + '\n', encoding='utf-8') +ratio = meta['durations_secs']['rpki_run_wall'] / meta['routinator_base_replay_seconds'] if meta['routinator_base_replay_seconds'] else None +lines = [] +lines.append('# APNIC Snapshot Replay Profile Summary\n\n') +lines.append(f"- recorded_at_utc: `{recorded}`\n") +lines.append(f"- bundle_root: `{meta['bundle_root']}`\n") +lines.append(f"- tal_path: `{meta['tal_path']}`\n") +lines.append(f"- ta_path: `{meta['ta_path']}`\n") +lines.append(f"- payload_replay_archive: `{meta['payload_replay_archive']}`\n") +lines.append(f"- payload_replay_locks: `{meta['payload_replay_locks']}`\n") +lines.append(f"- validation_time_arg: `{meta['validation_time_arg']}`\n") +lines.append(f"- db_dir: `{meta['db_dir']}`\n") +lines.append(f"- report_json: `{meta['report_json']}`\n") +lines.append(f"- run_log: `{meta['run_log']}`\n") +lines.append(f"- analyze_dir: `{meta['analyze_dir']}`\n\n") +lines.append('## Timing\n\n') +lines.append('| metric | value |\n') +lines.append('|---|---:|\n') +lines.append(f"| ours_snapshot_replay_wall_s | {meta['durations_secs']['rpki_run_wall']} |\n") +lines.append(f"| routinator_base_replay_s | {meta['routinator_base_replay_seconds']:.3f} |\n") +if ratio is not None: + lines.append(f"| ratio_ours_over_routinator | {ratio:.3f} |\n") +lines.append('\n## Counts\n\n') +for key, value in meta['counts'].items(): + lines.append(f"- {key}: `{value}`\n") +summary_path.write_text(''.join(lines), encoding='utf-8') +PY + +echo "== APNIC snapshot replay profiling complete ==" >&2 +echo "- run_dir: $RUN_DIR" >&2 +echo "- analyze_dir: $ANALYZE_DIR" >&2 +echo "- report_json: $REPORT_JSON" >&2 +echo "- run_log: $RUN_LOG" >&2 +echo "- meta_json: $META_JSON" >&2 +echo "- summary_md: $SUMMARY_MD" >&2 diff --git a/src/cli.rs b/src/cli.rs index 479e177..dcc1fb2 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -844,6 +844,18 @@ pub fn run(argv: &[String]) -> Result<(), String> { t.record_count("instances_failed", out.tree.instances_failed as u64); } + #[cfg(feature = "profile")] + let profiler_report = if let Some(guard) = profiler_guard.take() { + Some( + guard + .report() + .build() + .map_err(|e| format!("pprof report build failed: {e}"))?, + ) + } else { + None + }; + let report = build_report(&policy, validation_time, out); if let Some(p) = args.report_json_path.as_deref() { @@ -863,12 +875,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { } #[cfg(feature = "profile")] - if let (Some((out_dir, _)), Some(guard)) = (timing.as_ref(), profiler_guard.take()) { - let report = guard - .report() - .build() - .map_err(|e| format!("pprof report build failed: {e}"))?; - + if let (Some((out_dir, _)), Some(report)) = (timing.as_ref(), profiler_report) { let svg_path = out_dir.join("flamegraph.svg"); let svg_file = std::fs::File::create(&svg_path) .map_err(|e| format!("create flamegraph failed: {}: {e}", svg_path.display()))?; @@ -877,7 +884,6 @@ pub fn run(argv: &[String]) -> Result<(), String> { .map_err(|e| format!("write flamegraph failed: {e}"))?; eprintln!("analysis: wrote {}", svg_path.display()); - // Best-effort: write pprof protobuf as gzipped bytes. let pb_path = out_dir.join("pprof.pb.gz"); let pprof_profile = report .pprof() diff --git a/src/data_model/signed_object.rs b/src/data_model/signed_object.rs index 4ebfeb3..4261db3 100644 --- a/src/data_model/signed_object.rs +++ b/src/data_model/signed_object.rs @@ -20,6 +20,8 @@ pub struct ResourceEeCertificate { pub raw_der: Vec, pub subject_key_identifier: Vec, pub spki_der: Vec, + pub rsa_public_modulus: Vec, + pub rsa_public_exponent: Vec, pub sia_signed_object_uris: Vec, pub resource_cert: ResourceCertificate, } @@ -324,7 +326,10 @@ impl RpkiSignedObject { /// Verify the CMS signature using the embedded EE certificate public key. pub fn verify_signature(&self) -> Result<(), SignedObjectVerifyError> { let ee = &self.signed_data.certificates[0]; - self.verify_signature_with_ee_spki_der(&ee.spki_der) + self.verify_signature_with_rsa_components( + &ee.rsa_public_modulus, + &ee.rsa_public_exponent, + ) } /// Verify the CMS signature using a DER-encoded SubjectPublicKeyInfo. @@ -360,10 +365,21 @@ impl RpkiSignedObject { _ => return Err(SignedObjectVerifyError::UnsupportedEePublicKeyAlgorithm), }; + self.verify_signature_with_rsa_components(n.as_slice(), e.as_slice()) + } + + fn verify_signature_with_rsa_components( + &self, + modulus: &[u8], + exponent: &[u8], + ) -> Result<(), SignedObjectVerifyError> { let signer = &self.signed_data.signer_infos[0]; let msg = &signer.signed_attrs_der_for_signature; - let pk = ring::signature::RsaPublicKeyComponents { n, e }; + let pk = ring::signature::RsaPublicKeyComponents { + n: modulus, + e: exponent, + }; pk.verify( &ring::signature::RSA_PKCS1_2048_8192_SHA256, msg, @@ -736,6 +752,35 @@ fn validate_ee_certificate(der: &[u8]) -> Result { + let modulus = strip_leading_zeros(rsa.modulus).to_vec(); + let exponent = strip_leading_zeros(rsa.exponent).to_vec(); + let _ = rsa + .try_exponent() + .map_err(|_e| SignedObjectValidateError::EeCertificateParse( + "invalid EE RSA exponent".to_string(), + ))?; + (modulus, exponent) + } + _ => { + return Err(SignedObjectValidateError::EeCertificateParse( + "unsupported EE public key algorithm".to_string(), + )); + } + }; let sia = rc .tbs @@ -758,6 +803,8 @@ fn validate_ee_certificate(der: &[u8]) -> Result StorageResult<()> { + if repository_view_entries.is_empty() && member_records.is_empty() && owner_records.is_empty() { + return Ok(()); + } + + let repo_cf = self.cf(CF_REPOSITORY_VIEW)?; + let member_cf = self.cf(CF_RRDP_SOURCE_MEMBER)?; + let owner_cf = self.cf(CF_RRDP_URI_OWNER)?; + let mut batch = WriteBatch::default(); + + for entry in repository_view_entries { + entry.validate_internal()?; + let key = repository_view_key(&entry.rsync_uri); + let value = encode_cbor(entry, "repository_view")?; + batch.put_cf(repo_cf, key.as_bytes(), value); + } + for record in member_records { + record.validate_internal()?; + let key = rrdp_source_member_key(&record.notify_uri, &record.rsync_uri); + let value = encode_cbor(record, "rrdp_source_member")?; + batch.put_cf(member_cf, key.as_bytes(), value); + } + for record in owner_records { + record.validate_internal()?; + let key = rrdp_uri_owner_key(&record.rsync_uri); + let value = encode_cbor(record, "rrdp_uri_owner")?; + batch.put_cf(owner_cf, key.as_bytes(), value); + } + + self.write_batch(batch) + } + pub fn list_repository_view_entries_with_prefix( &self, rsync_uri_prefix: &str, @@ -1058,6 +1095,40 @@ impl RocksStore { Ok(()) } + pub fn put_raw_by_hash_entries_batch(&self, entries: &[RawByHashEntry]) -> StorageResult<()> { + if entries.is_empty() { + return Ok(()); + } + + let cf = self.cf(CF_RAW_BY_HASH)?; + let mut batch = WriteBatch::default(); + for entry in entries { + entry.validate_internal()?; + let key = raw_by_hash_key(&entry.sha256_hex); + let value = encode_cbor(entry, "raw_by_hash")?; + batch.put_cf(cf, key.as_bytes(), value); + } + self.write_batch(batch) + } + + pub fn put_raw_by_hash_entries_batch_unchecked( + &self, + entries: &[RawByHashEntry], + ) -> StorageResult<()> { + if entries.is_empty() { + return Ok(()); + } + + let cf = self.cf(CF_RAW_BY_HASH)?; + let mut batch = WriteBatch::default(); + for entry in entries { + let key = raw_by_hash_key(&entry.sha256_hex); + let value = encode_cbor(entry, "raw_by_hash")?; + batch.put_cf(cf, key.as_bytes(), value); + } + self.write_batch(batch) + } + pub fn get_raw_by_hash_entry(&self, sha256_hex: &str) -> StorageResult> { let cf = self.cf(CF_RAW_BY_HASH)?; let key = raw_by_hash_key(sha256_hex); @@ -1073,6 +1144,33 @@ impl RocksStore { Ok(Some(entry)) } + pub fn get_raw_by_hash_entries_batch( + &self, + sha256_hexes: &[String], + ) -> StorageResult>> { + if sha256_hexes.is_empty() { + return Ok(Vec::new()); + } + + let cf = self.cf(CF_RAW_BY_HASH)?; + let keys: Vec = sha256_hexes.iter().map(|hash| raw_by_hash_key(hash)).collect(); + self.db + .multi_get_cf(keys.iter().map(|key| (cf, key.as_bytes()))) + .into_iter() + .map(|res| { + let maybe = res.map_err(|e| StorageError::RocksDb(e.to_string()))?; + match maybe { + Some(bytes) => { + let entry = decode_cbor::(&bytes, "raw_by_hash")?; + entry.validate_internal()?; + Ok(Some(entry)) + } + None => Ok(None), + } + }) + .collect() + } + pub fn put_vcir(&self, vcir: &ValidatedCaInstanceResult) -> StorageResult<()> { vcir.validate_internal()?; let cf = self.cf(CF_VCIR)?; @@ -1084,6 +1182,52 @@ impl RocksStore { Ok(()) } + pub fn replace_vcir_and_audit_rule_indexes( + &self, + previous: Option<&ValidatedCaInstanceResult>, + vcir: &ValidatedCaInstanceResult, + ) -> StorageResult<()> { + vcir.validate_internal()?; + let vcir_cf = self.cf(CF_VCIR)?; + let audit_cf = self.cf(CF_AUDIT_RULE_INDEX)?; + let mut batch = WriteBatch::default(); + + let vcir_key = vcir_key(&vcir.manifest_rsync_uri); + let vcir_value = encode_cbor(vcir, "vcir")?; + batch.put_cf(vcir_cf, vcir_key.as_bytes(), vcir_value); + + if let Some(previous) = previous { + for output in &previous.local_outputs { + let Some(kind) = audit_rule_kind_for_output_type(output.output_type) else { + continue; + }; + let key = audit_rule_key(kind, &output.rule_hash); + batch.delete_cf(audit_cf, key.as_bytes()); + } + } + + for output in &vcir.local_outputs { + let Some(kind) = audit_rule_kind_for_output_type(output.output_type) else { + continue; + }; + let entry = AuditRuleIndexEntry { + kind, + rule_hash: output.rule_hash.clone(), + manifest_rsync_uri: vcir.manifest_rsync_uri.clone(), + source_object_uri: output.source_object_uri.clone(), + source_object_hash: output.source_object_hash.clone(), + output_id: output.output_id.clone(), + item_effective_until: output.item_effective_until.clone(), + }; + entry.validate_internal()?; + let key = audit_rule_key(kind, &entry.rule_hash); + let value = encode_cbor(&entry, "audit_rule_index")?; + batch.put_cf(audit_cf, key.as_bytes(), value); + } + + self.write_batch(batch) + } + pub fn get_vcir( &self, manifest_rsync_uri: &str, @@ -1328,6 +1472,13 @@ fn vcir_key(manifest_rsync_uri: &str) -> String { format!("{VCIR_KEY_PREFIX}{manifest_rsync_uri}") } +fn audit_rule_kind_for_output_type(output_type: VcirOutputType) -> Option { + match output_type { + VcirOutputType::Vrp => Some(AuditRuleKind::Roa), + VcirOutputType::Aspa => Some(AuditRuleKind::Aspa), + } +} + fn audit_rule_key(kind: AuditRuleKind, rule_hash: &str) -> String { format!("{}{rule_hash}", kind.key_prefix()) } @@ -1952,6 +2103,68 @@ mod tests { assert!(err.to_string().contains("64-character")); } + #[test] + fn replace_vcir_and_audit_rule_indexes_replaces_previous_entries_in_one_step() { + let td = tempfile::tempdir().expect("tempdir"); + let store = RocksStore::open(td.path()).expect("open rocksdb"); + + let mut previous = sample_vcir("rsync://example.test/repo/current.mft"); + previous.local_outputs = vec![VcirLocalOutput { + output_id: "old-output".to_string(), + output_type: VcirOutputType::Vrp, + item_effective_until: pack_time(10), + source_object_uri: "rsync://example.test/repo/old.roa".to_string(), + source_object_type: "roa".to_string(), + source_object_hash: sha256_hex(b"old-roa"), + source_ee_cert_hash: sha256_hex(b"old-ee"), + payload_json: "{}".to_string(), + rule_hash: sha256_hex(b"old-rule"), + validation_path_hint: vec![previous.manifest_rsync_uri.clone()], + }]; + previous.summary.local_vrp_count = 1; + previous.summary.local_aspa_count = 0; + store + .replace_vcir_and_audit_rule_indexes(None, &previous) + .expect("store previous vcir"); + assert!(store + .get_audit_rule_index_entry(AuditRuleKind::Roa, &previous.local_outputs[0].rule_hash) + .expect("get old audit entry") + .is_some()); + + let mut current = sample_vcir("rsync://example.test/repo/current.mft"); + current.local_outputs = vec![VcirLocalOutput { + output_id: "new-output".to_string(), + output_type: VcirOutputType::Aspa, + item_effective_until: pack_time(11), + source_object_uri: "rsync://example.test/repo/new.asa".to_string(), + source_object_type: "aspa".to_string(), + source_object_hash: sha256_hex(b"new-aspa"), + source_ee_cert_hash: sha256_hex(b"new-ee"), + payload_json: "{}".to_string(), + rule_hash: sha256_hex(b"new-rule"), + validation_path_hint: vec![current.manifest_rsync_uri.clone()], + }]; + current.summary.local_vrp_count = 0; + current.summary.local_aspa_count = 1; + store + .replace_vcir_and_audit_rule_indexes(Some(&previous), ¤t) + .expect("replace vcir and audit indexes"); + + let got = store + .get_vcir(¤t.manifest_rsync_uri) + .expect("get replaced vcir") + .expect("vcir exists"); + assert_eq!(got, current); + assert!(store + .get_audit_rule_index_entry(AuditRuleKind::Roa, &previous.local_outputs[0].rule_hash) + .expect("get deleted old audit entry") + .is_none()); + assert!(store + .get_audit_rule_index_entry(AuditRuleKind::Aspa, ¤t.local_outputs[0].rule_hash) + .expect("get new audit entry") + .is_some()); + } + #[test] fn storage_helpers_cover_optional_validation_paths() { let td = tempfile::tempdir().expect("tempdir"); @@ -2162,4 +2375,63 @@ mod tests { .expect_err("present member without hash must fail"); assert!(err.to_string().contains("current_hash is required")); } + #[test] + fn projection_batch_roundtrip_writes_repository_view_member_and_owner_records() { + let dir = tempfile::tempdir().expect("tempdir"); + let store = RocksStore::open(dir.path()).expect("open store"); + + let view = RepositoryViewEntry { + rsync_uri: "rsync://example.test/repo/a.roa".to_string(), + current_hash: Some(hex::encode([1u8; 32])), + repository_source: Some("https://example.test/notify.xml".to_string()), + object_type: Some("roa".to_string()), + state: RepositoryViewState::Present, + }; + let member = RrdpSourceMemberRecord { + notify_uri: "https://example.test/notify.xml".to_string(), + rsync_uri: "rsync://example.test/repo/a.roa".to_string(), + current_hash: Some(hex::encode([1u8; 32])), + object_type: Some("roa".to_string()), + present: true, + last_confirmed_session_id: "session-1".to_string(), + last_confirmed_serial: 7, + last_changed_at: pack_time(1), + }; + let owner = RrdpUriOwnerRecord { + rsync_uri: "rsync://example.test/repo/a.roa".to_string(), + notify_uri: "https://example.test/notify.xml".to_string(), + current_hash: Some(hex::encode([1u8; 32])), + last_confirmed_session_id: "session-1".to_string(), + last_confirmed_serial: 7, + last_changed_at: pack_time(1), + owner_state: RrdpUriOwnerState::Active, + }; + + store + .put_projection_batch(&[view.clone()], &[member.clone()], &[owner.clone()]) + .expect("write projection batch"); + + assert_eq!( + store + .get_repository_view_entry(&view.rsync_uri) + .expect("get view") + .expect("present view"), + view + ); + assert_eq!( + store + .get_rrdp_source_member_record(&member.notify_uri, &member.rsync_uri) + .expect("get member") + .expect("present member"), + member + ); + assert_eq!( + store + .get_rrdp_uri_owner_record(&owner.rsync_uri) + .expect("get owner") + .expect("present owner"), + owner + ); + } + } diff --git a/src/sync/repo.rs b/src/sync/repo.rs index 9e8b056..bd9de51 100644 --- a/src/sync/repo.rs +++ b/src/sync/repo.rs @@ -10,7 +10,9 @@ use crate::storage::RocksStore; use crate::sync::rrdp::sync_from_notification_with_timing_and_download_log; use crate::sync::rrdp::{Fetcher as HttpFetcher, RrdpState, RrdpSyncError}; use crate::sync::store_projection::{ - put_repository_view_present, put_repository_view_withdrawn, upsert_raw_by_hash_evidence, + build_repository_view_present_entry, + build_repository_view_withdrawn_entry, + prepare_raw_by_hash_evidence_batch, }; use std::collections::HashSet; use std::thread; @@ -547,25 +549,39 @@ fn rsync_sync_into_raw_objects( let _proj = timing .as_ref() .map(|t| t.span_phase("rsync_write_repository_view_total")); + let prepared_raw = prepare_raw_by_hash_evidence_batch(store, &objects) + .map_err(RepoSyncError::Storage)?; + let mut repository_view_entries = Vec::new(); for entry in existing_view { if !new_set.contains(entry.rsync_uri.as_str()) { - put_repository_view_withdrawn( - store, + repository_view_entries.push(build_repository_view_withdrawn_entry( rsync_base_uri, &entry.rsync_uri, entry.current_hash, - ) - .map_err(RepoSyncError::Storage)?; + )); } } - for (uri, bytes) in &objects { - let current_hash = - upsert_raw_by_hash_evidence(store, uri, bytes).map_err(RepoSyncError::Storage)?; - put_repository_view_present(store, rsync_base_uri, uri, ¤t_hash) - .map_err(RepoSyncError::Storage)?; + for (uri, _bytes) in &objects { + let current_hash = prepared_raw + .uri_to_hash + .get(uri) + .cloned() + .ok_or_else(|| RepoSyncError::Storage(format!("missing raw_by_hash mapping for {uri}")))?; + repository_view_entries.push(build_repository_view_present_entry( + rsync_base_uri, + uri, + ¤t_hash, + )); } + store + .put_raw_by_hash_entries_batch_unchecked(&prepared_raw.entries_to_write) + .map_err(|e| RepoSyncError::Storage(e.to_string()))?; + store + .put_projection_batch(&repository_view_entries, &[], &[]) + .map_err(|e| RepoSyncError::Storage(e.to_string()))?; + Ok(objects.len()) } diff --git a/src/sync/rrdp.rs b/src/sync/rrdp.rs index 5628267..dbb9880 100644 --- a/src/sync/rrdp.rs +++ b/src/sync/rrdp.rs @@ -3,9 +3,14 @@ use crate::audit::AuditDownloadKind; use crate::audit_downloads::DownloadLogHandle; use crate::storage::{RocksStore, RrdpDeltaOp, RrdpSourceSyncState}; use crate::sync::store_projection::{ + build_repository_view_present_entry, build_repository_view_withdrawn_entry, + build_rrdp_source_member_present_record, build_rrdp_source_member_withdrawn_record, + build_rrdp_uri_owner_active_record, build_rrdp_uri_owner_withdrawn_record, compute_sha256_hex, current_rrdp_owner_is, ensure_rrdp_uri_can_be_owned_by, - put_repository_view_present, put_repository_view_withdrawn, put_rrdp_source_member_present, - put_rrdp_source_member_withdrawn, put_rrdp_uri_owner_active, put_rrdp_uri_owner_withdrawn, + prepare_raw_by_hash_evidence_batch, + put_repository_view_present, put_repository_view_withdrawn, + put_rrdp_source_member_present, put_rrdp_source_member_withdrawn, + put_rrdp_uri_owner_active, put_rrdp_uri_owner_withdrawn, update_rrdp_source_record_on_success, upsert_raw_by_hash_evidence, }; use base64::Engine; @@ -1212,56 +1217,72 @@ fn apply_snapshot( .map_err(|e| RrdpSyncError::Storage(e.to_string()))?; let session_id = expected_session_id.to_string(); - for (uri, bytes) in &published { - let current_hash = - upsert_raw_by_hash_evidence(store, uri, bytes).map_err(RrdpSyncError::Storage)?; - put_repository_view_present(store, notification_uri, uri, ¤t_hash) - .map_err(RrdpSyncError::Storage)?; - put_rrdp_source_member_present( - store, + let prepared_raw = prepare_raw_by_hash_evidence_batch(store, &published) + .map_err(RrdpSyncError::Storage)?; + let mut repository_view_entries = Vec::with_capacity(published.len() + withdrawn.len()); + let mut member_records = Vec::with_capacity(published.len() + withdrawn.len()); + let mut owner_records = Vec::with_capacity(published.len() + withdrawn.len()); + + for (uri, _bytes) in &published { + let current_hash = prepared_raw + .uri_to_hash + .get(uri) + .cloned() + .ok_or_else(|| { + RrdpSyncError::Storage(format!("missing raw_by_hash mapping for {uri}")) + })?; + repository_view_entries.push(build_repository_view_present_entry( + notification_uri, + uri, + ¤t_hash, + )); + member_records.push(build_rrdp_source_member_present_record( notification_uri, &session_id, expected_serial, uri, ¤t_hash, - ) - .map_err(RrdpSyncError::Storage)?; - put_rrdp_uri_owner_active( - store, + )); + owner_records.push(build_rrdp_uri_owner_active_record( notification_uri, &session_id, expected_serial, uri, ¤t_hash, - ) - .map_err(RrdpSyncError::Storage)?; + )); } for (uri, previous_hash) in withdrawn { - put_rrdp_source_member_withdrawn( - store, + member_records.push(build_rrdp_source_member_withdrawn_record( notification_uri, &session_id, expected_serial, &uri, previous_hash.clone(), - ) - .map_err(RrdpSyncError::Storage)?; + )); if current_rrdp_owner_is(store, notification_uri, &uri).map_err(RrdpSyncError::Storage)? { - put_repository_view_withdrawn(store, notification_uri, &uri, previous_hash.clone()) - .map_err(RrdpSyncError::Storage)?; - put_rrdp_uri_owner_withdrawn( - store, + repository_view_entries.push(build_repository_view_withdrawn_entry( + notification_uri, + &uri, + previous_hash.clone(), + )); + owner_records.push(build_rrdp_uri_owner_withdrawn_record( notification_uri, &session_id, expected_serial, &uri, previous_hash, - ) - .map_err(RrdpSyncError::Storage)?; + )); } } + store + .put_raw_by_hash_entries_batch_unchecked(&prepared_raw.entries_to_write) + .map_err(|e| RrdpSyncError::Storage(e.to_string()))?; + store + .put_projection_batch(&repository_view_entries, &member_records, &owner_records) + .map_err(|e| RrdpSyncError::Storage(e.to_string()))?; + Ok(published.len()) } diff --git a/src/sync/store_projection.rs b/src/sync/store_projection.rs index bb898a8..622f983 100644 --- a/src/sync/store_projection.rs +++ b/src/sync/store_projection.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeSet; +use std::collections::BTreeMap; use crate::storage::{ PackTime, RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore, @@ -7,6 +7,78 @@ use crate::storage::{ }; use sha2::Digest; +pub struct PreparedRawByHashBatch { + pub uri_to_hash: BTreeMap, + pub entries_to_write: Vec, +} + +pub fn prepare_raw_by_hash_evidence_batch( + store: &RocksStore, + objects: &[(String, Vec)], +) -> Result { + let mut pending: BTreeMap = BTreeMap::new(); + let mut uri_to_hash: BTreeMap = BTreeMap::new(); + + for (uri, bytes) in objects { + let sha256_hex = compute_sha256_hex(bytes); + uri_to_hash.insert(uri.clone(), sha256_hex.clone()); + let entry = pending + .entry(sha256_hex.clone()) + .or_insert_with(|| RawByHashEntry::from_bytes(sha256_hex.clone(), bytes.clone())); + + if entry.bytes != *bytes { + return Err(format!( + "raw_by_hash collision for {uri}: same sha256 maps to different bytes" + )); + } + if !entry.origin_uris.iter().any(|existing| existing == uri) { + entry.origin_uris.push(uri.clone()); + } + if entry.object_type.is_none() { + entry.object_type = infer_object_type_from_uri(uri); + } + } + + let hashes: Vec = pending.keys().cloned().collect(); + let existing_entries = store + .get_raw_by_hash_entries_batch(&hashes) + .map_err(|e| e.to_string())?; + + let mut entries_to_write = Vec::new(); + for (hash, existing_opt) in hashes.into_iter().zip(existing_entries.into_iter()) { + let mut pending_entry = pending.remove(&hash).expect("pending raw_by_hash entry"); + match existing_opt { + Some(mut existing) => { + if existing.bytes != pending_entry.bytes { + return Err(format!( + "raw_by_hash collision for hash {hash}: same sha256 maps to different bytes" + )); + } + let mut changed = false; + for uri in pending_entry.origin_uris.drain(..) { + if !existing.origin_uris.iter().any(|existing_uri| existing_uri == &uri) { + existing.origin_uris.push(uri); + changed = true; + } + } + if existing.object_type.is_none() && pending_entry.object_type.is_some() { + existing.object_type = pending_entry.object_type; + changed = true; + } + if changed { + entries_to_write.push(existing); + } + } + None => entries_to_write.push(pending_entry), + } + } + + Ok(PreparedRawByHashBatch { + uri_to_hash, + entries_to_write, + }) +} + pub fn infer_object_type_from_uri(uri: &str) -> Option { let ext = uri.rsplit('.').next()?; let ext = ext.to_ascii_lowercase(); @@ -16,36 +88,125 @@ pub fn infer_object_type_from_uri(uri: &str) -> Option { } } +pub fn build_repository_view_present_entry( + repository_source: &str, + rsync_uri: &str, + current_hash: &str, +) -> RepositoryViewEntry { + RepositoryViewEntry { + rsync_uri: rsync_uri.to_string(), + current_hash: Some(current_hash.to_string()), + repository_source: Some(repository_source.to_string()), + object_type: infer_object_type_from_uri(rsync_uri), + state: RepositoryViewState::Present, + } +} + +pub fn build_repository_view_withdrawn_entry( + repository_source: &str, + rsync_uri: &str, + current_hash: Option, +) -> RepositoryViewEntry { + RepositoryViewEntry { + rsync_uri: rsync_uri.to_string(), + current_hash, + repository_source: Some(repository_source.to_string()), + object_type: infer_object_type_from_uri(rsync_uri), + state: RepositoryViewState::Withdrawn, + } +} + +pub fn build_rrdp_source_member_present_record( + notification_uri: &str, + session_id: &str, + serial: u64, + rsync_uri: &str, + current_hash: &str, +) -> RrdpSourceMemberRecord { + RrdpSourceMemberRecord { + notify_uri: notification_uri.to_string(), + rsync_uri: rsync_uri.to_string(), + current_hash: Some(current_hash.to_string()), + object_type: infer_object_type_from_uri(rsync_uri), + present: true, + last_confirmed_session_id: session_id.to_string(), + last_confirmed_serial: serial, + last_changed_at: now_pack_time(), + } +} + +pub fn build_rrdp_source_member_withdrawn_record( + notification_uri: &str, + session_id: &str, + serial: u64, + rsync_uri: &str, + current_hash: Option, +) -> RrdpSourceMemberRecord { + RrdpSourceMemberRecord { + notify_uri: notification_uri.to_string(), + rsync_uri: rsync_uri.to_string(), + current_hash, + object_type: infer_object_type_from_uri(rsync_uri), + present: false, + last_confirmed_session_id: session_id.to_string(), + last_confirmed_serial: serial, + last_changed_at: now_pack_time(), + } +} + +pub fn build_rrdp_uri_owner_active_record( + notification_uri: &str, + session_id: &str, + serial: u64, + rsync_uri: &str, + current_hash: &str, +) -> RrdpUriOwnerRecord { + RrdpUriOwnerRecord { + rsync_uri: rsync_uri.to_string(), + notify_uri: notification_uri.to_string(), + current_hash: Some(current_hash.to_string()), + last_confirmed_session_id: session_id.to_string(), + last_confirmed_serial: serial, + last_changed_at: now_pack_time(), + owner_state: RrdpUriOwnerState::Active, + } +} + +pub fn build_rrdp_uri_owner_withdrawn_record( + notification_uri: &str, + session_id: &str, + serial: u64, + rsync_uri: &str, + current_hash: Option, +) -> RrdpUriOwnerRecord { + RrdpUriOwnerRecord { + rsync_uri: rsync_uri.to_string(), + notify_uri: notification_uri.to_string(), + current_hash, + last_confirmed_session_id: session_id.to_string(), + last_confirmed_serial: serial, + last_changed_at: now_pack_time(), + owner_state: RrdpUriOwnerState::Withdrawn, + } +} + pub fn upsert_raw_by_hash_evidence( store: &RocksStore, rsync_uri: &str, bytes: &[u8], ) -> Result { - let sha256_hex = compute_sha256_hex(bytes); - let mut entry = match store - .get_raw_by_hash_entry(&sha256_hex) - .map_err(|e| e.to_string())? - { - Some(existing) => existing, - None => RawByHashEntry::from_bytes(sha256_hex.clone(), bytes.to_vec()), - }; - - if entry.bytes != bytes { - return Err(format!( - "raw_by_hash collision for {rsync_uri}: same sha256 maps to different bytes" - )); - } - - let mut origins: BTreeSet = entry.origin_uris.into_iter().collect(); - origins.insert(rsync_uri.to_string()); - entry.origin_uris = origins.into_iter().collect(); - if entry.object_type.is_none() { - entry.object_type = infer_object_type_from_uri(rsync_uri); - } - + let prepared = prepare_raw_by_hash_evidence_batch( + store, + &[(rsync_uri.to_string(), bytes.to_vec())], + )?; store - .put_raw_by_hash_entry(&entry) + .put_raw_by_hash_entries_batch_unchecked(&prepared.entries_to_write) .map_err(|e| e.to_string())?; + let sha256_hex = prepared + .uri_to_hash + .get(rsync_uri) + .cloned() + .expect("raw_by_hash mapping for upsert input"); Ok(sha256_hex) } @@ -55,13 +216,7 @@ pub fn put_repository_view_present( rsync_uri: &str, current_hash: &str, ) -> Result<(), String> { - let entry = RepositoryViewEntry { - rsync_uri: rsync_uri.to_string(), - current_hash: Some(current_hash.to_string()), - repository_source: Some(repository_source.to_string()), - object_type: infer_object_type_from_uri(rsync_uri), - state: RepositoryViewState::Present, - }; + let entry = build_repository_view_present_entry(repository_source, rsync_uri, current_hash); store .put_repository_view_entry(&entry) .map_err(|e| e.to_string()) @@ -73,13 +228,7 @@ pub fn put_repository_view_withdrawn( rsync_uri: &str, current_hash: Option, ) -> Result<(), String> { - let entry = RepositoryViewEntry { - rsync_uri: rsync_uri.to_string(), - current_hash, - repository_source: Some(repository_source.to_string()), - object_type: infer_object_type_from_uri(rsync_uri), - state: RepositoryViewState::Withdrawn, - }; + let entry = build_repository_view_withdrawn_entry(repository_source, rsync_uri, current_hash); store .put_repository_view_entry(&entry) .map_err(|e| e.to_string()) @@ -128,16 +277,7 @@ pub fn put_rrdp_source_member_present( rsync_uri: &str, current_hash: &str, ) -> Result<(), String> { - let record = RrdpSourceMemberRecord { - notify_uri: notification_uri.to_string(), - rsync_uri: rsync_uri.to_string(), - current_hash: Some(current_hash.to_string()), - object_type: infer_object_type_from_uri(rsync_uri), - present: true, - last_confirmed_session_id: session_id.to_string(), - last_confirmed_serial: serial, - last_changed_at: now_pack_time(), - }; + let record = build_rrdp_source_member_present_record(notification_uri, session_id, serial, rsync_uri, current_hash); store .put_rrdp_source_member_record(&record) .map_err(|e| e.to_string()) @@ -151,16 +291,7 @@ pub fn put_rrdp_source_member_withdrawn( rsync_uri: &str, current_hash: Option, ) -> Result<(), String> { - let record = RrdpSourceMemberRecord { - notify_uri: notification_uri.to_string(), - rsync_uri: rsync_uri.to_string(), - current_hash, - object_type: infer_object_type_from_uri(rsync_uri), - present: false, - last_confirmed_session_id: session_id.to_string(), - last_confirmed_serial: serial, - last_changed_at: now_pack_time(), - }; + let record = build_rrdp_source_member_withdrawn_record(notification_uri, session_id, serial, rsync_uri, current_hash); store .put_rrdp_source_member_record(&record) .map_err(|e| e.to_string()) @@ -174,15 +305,7 @@ pub fn put_rrdp_uri_owner_active( rsync_uri: &str, current_hash: &str, ) -> Result<(), String> { - let record = RrdpUriOwnerRecord { - rsync_uri: rsync_uri.to_string(), - notify_uri: notification_uri.to_string(), - current_hash: Some(current_hash.to_string()), - last_confirmed_session_id: session_id.to_string(), - last_confirmed_serial: serial, - last_changed_at: now_pack_time(), - owner_state: RrdpUriOwnerState::Active, - }; + let record = build_rrdp_uri_owner_active_record(notification_uri, session_id, serial, rsync_uri, current_hash); store .put_rrdp_uri_owner_record(&record) .map_err(|e| e.to_string()) @@ -196,15 +319,7 @@ pub fn put_rrdp_uri_owner_withdrawn( rsync_uri: &str, current_hash: Option, ) -> Result<(), String> { - let record = RrdpUriOwnerRecord { - rsync_uri: rsync_uri.to_string(), - notify_uri: notification_uri.to_string(), - current_hash, - last_confirmed_session_id: session_id.to_string(), - last_confirmed_serial: serial, - last_changed_at: now_pack_time(), - owner_state: RrdpUriOwnerState::Withdrawn, - }; + let record = build_rrdp_uri_owner_withdrawn_record(notification_uri, session_id, serial, rsync_uri, current_hash); store .put_rrdp_uri_owner_record(&record) .map_err(|e| e.to_string()) diff --git a/src/validation/ca_path.rs b/src/validation/ca_path.rs index 618495b..f8ef202 100644 --- a/src/validation/ca_path.rs +++ b/src/validation/ca_path.rs @@ -8,7 +8,7 @@ use crate::data_model::rc::{ use x509_parser::prelude::{FromDer, X509Certificate}; use crate::validation::x509_name::x509_names_equivalent; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashMap, HashSet}; use x509_parser::x509::SubjectPublicKeyInfo; #[derive(Clone, Debug, PartialEq, Eq)] @@ -27,6 +27,44 @@ pub struct ValidatedSubordinateCaLite { pub effective_as_resources: Option, } +#[derive(Clone, Debug, Default)] +pub struct IssuerEffectiveResourcesIndex { + parent_ip_by_afi_items: Option>>, + parent_ip_merged_intervals: HashMap, Vec)>>, + parent_asnum_intervals: Option>, + parent_rdi_intervals: Option>, +} + +impl IssuerEffectiveResourcesIndex { + pub fn from_effective_resources( + issuer_effective_ip: Option<&IpResourceSet>, + issuer_effective_as: Option<&AsResourceSet>, + ) -> Result { + let parent_ip_by_afi_items = issuer_effective_ip.map(ip_resources_by_afi_items).transpose()?; + + let parent_ip_merged_intervals = issuer_effective_ip + .map(ip_resources_to_merged_intervals_by_afi) + .unwrap_or_default(); + + let parent_asnum_intervals = issuer_effective_as.and_then(|resources| { + resources + .asnum + .as_ref() + .map(as_choice_to_merged_intervals) + }); + let parent_rdi_intervals = issuer_effective_as.and_then(|resources| { + resources.rdi.as_ref().map(as_choice_to_merged_intervals) + }); + + Ok(Self { + parent_ip_by_afi_items, + parent_ip_merged_intervals, + parent_asnum_intervals, + parent_rdi_intervals, + }) + } +} + #[derive(Debug, thiserror::Error)] pub enum CaPathError { #[error("child CA certificate decode failed: {0} (RFC 6487 §4; RFC 5280 §4.1)")] @@ -231,6 +269,40 @@ pub fn validate_subordinate_ca_cert_with_prevalidated_issuer( issuer_effective_ip: Option<&IpResourceSet>, issuer_effective_as: Option<&AsResourceSet>, validation_time: time::OffsetDateTime, +) -> Result { + let issuer_resources_index = IssuerEffectiveResourcesIndex::from_effective_resources( + issuer_effective_ip, + issuer_effective_as, + )?; + validate_subordinate_ca_cert_with_prevalidated_issuer_and_resources( + child_ca_der, + child_ca, + issuer_ca, + issuer_spki, + issuer_crl, + issuer_crl_revoked_serials, + issuer_ca_rsync_uri, + issuer_crl_rsync_uri, + issuer_effective_ip, + issuer_effective_as, + &issuer_resources_index, + validation_time, + ) +} + +pub fn validate_subordinate_ca_cert_with_prevalidated_issuer_and_resources( + child_ca_der: &[u8], + child_ca: ResourceCertificate, + issuer_ca: &ResourceCertificate, + issuer_spki: &SubjectPublicKeyInfo<'_>, + issuer_crl: &RpkixCrl, + issuer_crl_revoked_serials: &HashSet>, + issuer_ca_rsync_uri: Option<&str>, + issuer_crl_rsync_uri: &str, + issuer_effective_ip: Option<&IpResourceSet>, + issuer_effective_as: Option<&AsResourceSet>, + issuer_resources_index: &IssuerEffectiveResourcesIndex, + validation_time: time::OffsetDateTime, ) -> Result { if child_ca.kind != ResourceCertKind::Ca { return Err(CaPathError::ChildNotCa); @@ -277,13 +349,17 @@ pub fn validate_subordinate_ca_cert_with_prevalidated_issuer( return Err(CaPathError::ChildRevoked); } - let effective_ip_resources = resolve_child_ip_resources( + let effective_ip_resources = resolve_child_ip_resources_indexed( child_ca.tbs.extensions.ip_resources.as_ref(), issuer_effective_ip, + issuer_resources_index.parent_ip_by_afi_items.as_ref(), + &issuer_resources_index.parent_ip_merged_intervals, )?; - let effective_as_resources = resolve_child_as_resources( + let effective_as_resources = resolve_child_as_resources_indexed( child_ca.tbs.extensions.as_resources.as_ref(), issuer_effective_as, + issuer_resources_index.parent_asnum_intervals.as_deref(), + issuer_resources_index.parent_rdi_intervals.as_deref(), )?; if effective_ip_resources.is_none() && effective_as_resources.is_none() { return Err(CaPathError::ResourcesMissing); @@ -435,12 +511,30 @@ fn is_serial_revoked_by_crl(cert: &ResourceCertificate, crl: &RpkixCrl) -> bool fn resolve_child_ip_resources( child_ip: Option<&IpResourceSet>, issuer_effective: Option<&IpResourceSet>, +) -> Result, CaPathError> { + let precomputed_parent_by_afi = issuer_effective.map(ip_resources_by_afi_items).transpose()?; + let precomputed_parent_intervals = issuer_effective + .map(ip_resources_to_merged_intervals_by_afi) + .unwrap_or_default(); + resolve_child_ip_resources_indexed( + child_ip, + issuer_effective, + precomputed_parent_by_afi.as_ref(), + &precomputed_parent_intervals, + ) +} + +fn resolve_child_ip_resources_indexed( + child_ip: Option<&IpResourceSet>, + issuer_effective: Option<&IpResourceSet>, + parent_by_afi: Option<&BTreeMap>>, + parent_intervals_by_afi: &HashMap, Vec)>>, ) -> Result, CaPathError> { let Some(child_ip) = child_ip else { return Ok(None); }; - let Some(parent) = issuer_effective else { + let Some(_parent) = issuer_effective else { if child_ip.has_any_inherit() { return Err(CaPathError::InheritWithoutParentResources); } @@ -449,7 +543,7 @@ fn resolve_child_ip_resources( }; // Resolve per-AFI inherit, producing an effective set with no inherit. - let parent_by_afi = ip_resources_by_afi_items(parent)?; + let parent_by_afi = parent_by_afi.ok_or(CaPathError::InheritWithoutParentResources)?; let mut out_families: Vec = Vec::new(); for fam in &child_ip.families { @@ -465,9 +559,11 @@ fn resolve_child_ip_resources( } IpAddressChoice::AddressesOrRanges(items) => { // Subset check against parent union for that AFI. - let parent_set = - ip_resources_single_afi(parent, fam.afi, parent_by_afi.get(&fam.afi)); - if !ip_family_items_subset(items, &parent_set) { + let parent_intervals = parent_intervals_by_afi + .get(&fam.afi) + .map(Vec::as_slice) + .unwrap_or(&[]); + if !ip_family_items_subset_with_parent_intervals(items, parent_intervals) { return Err(CaPathError::ResourcesNotSubset); } out_families.push(crate::data_model::rc::IpAddressFamily { @@ -486,6 +582,29 @@ fn resolve_child_ip_resources( fn resolve_child_as_resources( child_as: Option<&AsResourceSet>, issuer_effective: Option<&AsResourceSet>, +) -> Result, CaPathError> { + let precomputed_asnum = issuer_effective.and_then(|resources| { + resources + .asnum + .as_ref() + .map(as_choice_to_merged_intervals) + }); + let precomputed_rdi = issuer_effective.and_then(|resources| { + resources.rdi.as_ref().map(as_choice_to_merged_intervals) + }); + resolve_child_as_resources_indexed( + child_as, + issuer_effective, + precomputed_asnum.as_deref(), + precomputed_rdi.as_deref(), + ) +} + +fn resolve_child_as_resources_indexed( + child_as: Option<&AsResourceSet>, + issuer_effective: Option<&AsResourceSet>, + parent_asnum_intervals: Option<&[(u32, u32)]>, + parent_rdi_intervals: Option<&[(u32, u32)]>, ) -> Result, CaPathError> { let Some(child_as) = child_as else { return Ok(None); @@ -507,7 +626,11 @@ fn resolve_child_as_resources( .ok_or(CaPathError::InheritWithoutParentResources) .map(Some)?, Some(_) => { - if !as_choice_subset(child_as.asnum.as_ref(), parent.asnum.as_ref()) { + if !as_choice_subset_with_parent_intervals( + child_as.asnum.as_ref(), + parent.asnum.as_ref(), + parent_asnum_intervals, + ) { return Err(CaPathError::ResourcesNotSubset); } child_as.asnum.clone() @@ -522,7 +645,11 @@ fn resolve_child_as_resources( .ok_or(CaPathError::InheritWithoutParentResources) .map(Some)?, Some(_) => { - if !as_choice_subset(child_as.rdi.as_ref(), parent.rdi.as_ref()) { + if !as_choice_subset_with_parent_intervals( + child_as.rdi.as_ref(), + parent.rdi.as_ref(), + parent_rdi_intervals, + ) { return Err(CaPathError::ResourcesNotSubset); } child_as.rdi.clone() @@ -535,6 +662,14 @@ fn resolve_child_as_resources( fn as_choice_subset( child: Option<&AsIdentifierChoice>, parent: Option<&AsIdentifierChoice>, +) -> bool { + as_choice_subset_with_parent_intervals(child, parent, None) +} + +fn as_choice_subset_with_parent_intervals( + child: Option<&AsIdentifierChoice>, + parent: Option<&AsIdentifierChoice>, + parent_intervals_hint: Option<&[(u32, u32)]>, ) -> bool { let Some(child) = child else { return true; @@ -552,9 +687,16 @@ fn as_choice_subset( } let child_intervals = as_choice_to_merged_intervals(child); - let parent_intervals = as_choice_to_merged_intervals(parent); + let owned_parent_intervals; + let parent_intervals = match parent_intervals_hint { + Some(intervals) => intervals, + None => { + owned_parent_intervals = as_choice_to_merged_intervals(parent); + owned_parent_intervals.as_slice() + } + }; for (cmin, cmax) in &child_intervals { - if !as_interval_is_covered(&parent_intervals, *cmin, *cmax) { + if !as_interval_is_covered(parent_intervals, *cmin, *cmax) { return false; } } @@ -692,21 +834,33 @@ fn ip_family_items_subset( fn ip_resources_to_merged_intervals( set: &IpResourceSet, ) -> std::collections::HashMap, Vec)>> { - let mut m: std::collections::HashMap, Vec)>> = - std::collections::HashMap::new(); + let m = ip_resources_to_merged_intervals_by_afi(set); + m.into_iter() + .map(|(afi, ranges)| { + ( + match afi { + crate::data_model::rc::Afi::Ipv4 => AfiKey::V4, + crate::data_model::rc::Afi::Ipv6 => AfiKey::V6, + }, + ranges, + ) + }) + .collect() +} + +fn ip_resources_to_merged_intervals_by_afi( + set: &IpResourceSet, +) -> HashMap, Vec)>> { + let mut m: HashMap, Vec)>> = HashMap::new(); for fam in &set.families { - let afi = match fam.afi { - crate::data_model::rc::Afi::Ipv4 => AfiKey::V4, - crate::data_model::rc::Afi::Ipv6 => AfiKey::V6, - }; match &fam.choice { IpAddressChoice::Inherit => { // When used in subset checks, treat inherit as "all" by leaving it absent. // Resolution should have happened earlier. } IpAddressChoice::AddressesOrRanges(items) => { - let ent = m.entry(afi).or_default(); + let ent = m.entry(fam.afi).or_default(); for item in items { match item { crate::data_model::rc::IpAddressOrRange::Prefix(p) => { @@ -730,6 +884,36 @@ fn ip_resources_to_merged_intervals( m } +fn ip_family_items_subset_with_parent_intervals( + child_items: &[crate::data_model::rc::IpAddressOrRange], + parent_intervals: &[(Vec, Vec)], +) -> bool { + if parent_intervals.is_empty() { + return false; + } + + let mut child_intervals: Vec<(Vec, Vec)> = Vec::new(); + for item in child_items { + match item { + crate::data_model::rc::IpAddressOrRange::Prefix(p) => { + child_intervals.push(prefix_to_range(p)) + } + crate::data_model::rc::IpAddressOrRange::Range(r) => { + child_intervals.push((r.min.clone(), r.max.clone())) + } + } + } + child_intervals.sort_by(|(a, _), (b, _)| a.cmp(b)); + let child_intervals = merge_ip_intervals(&child_intervals); + + for (cmin, cmax) in &child_intervals { + if !interval_is_covered(parent_intervals, cmin, cmax) { + return false; + } + } + true +} + fn merge_ip_intervals(v: &[(Vec, Vec)]) -> Vec<(Vec, Vec)> { let mut out: Vec<(Vec, Vec)> = Vec::new(); for (min, max) in v { @@ -790,7 +974,8 @@ mod tests { use super::*; use crate::data_model::common::X509NameDer; use crate::data_model::rc::{ - Afi, AsIdentifierChoice, AsResourceSet, IpAddressChoice, IpAddressFamily, IpResourceSet, + Afi, AsIdentifierChoice, AsResourceSet, IpAddressChoice, IpAddressFamily, + IpAddressOrRange, IpResourceSet, }; use crate::data_model::rc::{ RcExtensions, ResourceCertKind, ResourceCertificate, RpkixTbsCertificate, @@ -1268,6 +1453,105 @@ mod tests { ); } + #[test] + fn issuer_effective_resources_index_and_indexed_resolvers_cover_success_and_failure_paths() { + use crate::data_model::rc::{AsIdOrRange, IpPrefix}; + + let parent_ip = IpResourceSet { + families: vec![IpAddressFamily { + afi: Afi::Ipv4, + choice: IpAddressChoice::AddressesOrRanges(vec![IpAddressOrRange::Prefix( + IpPrefix { + afi: Afi::Ipv4, + prefix_len: 8, + addr: vec![10, 0, 0, 0], + }, + )]), + }], + }; + let parent_as = AsResourceSet { + asnum: Some(AsIdentifierChoice::AsIdsOrRanges(vec![AsIdOrRange::Range { + min: 64500, + max: 64599, + }])), + rdi: Some(AsIdentifierChoice::AsIdsOrRanges(vec![AsIdOrRange::Id(65000)])), + }; + let idx = IssuerEffectiveResourcesIndex::from_effective_resources(Some(&parent_ip), Some(&parent_as)) + .expect("index builds"); + assert_eq!(idx.parent_ip_by_afi_items.as_ref().map(|v| v.len()), Some(1)); + assert_eq!(idx.parent_ip_merged_intervals.len(), 1); + assert_eq!(idx.parent_asnum_intervals.as_ref().map(|v| v.len()), Some(1)); + assert_eq!(idx.parent_rdi_intervals.as_ref().map(|v| v.len()), Some(1)); + + let child_ip_subset = IpResourceSet { + families: vec![IpAddressFamily { + afi: Afi::Ipv4, + choice: IpAddressChoice::AddressesOrRanges(vec![IpAddressOrRange::Prefix( + IpPrefix { + afi: Afi::Ipv4, + prefix_len: 16, + addr: vec![10, 1, 0, 0], + }, + )]), + }], + }; + assert!(resolve_child_ip_resources_indexed( + Some(&child_ip_subset), + Some(&parent_ip), + idx.parent_ip_by_afi_items.as_ref(), + &idx.parent_ip_merged_intervals, + ) + .expect("subset should resolve") + .is_some()); + + let child_ip_bad = IpResourceSet { + families: vec![IpAddressFamily { + afi: Afi::Ipv4, + choice: IpAddressChoice::AddressesOrRanges(vec![IpAddressOrRange::Prefix( + IpPrefix { + afi: Afi::Ipv4, + prefix_len: 16, + addr: vec![11, 0, 0, 0], + }, + )]), + }], + }; + let err = resolve_child_ip_resources_indexed( + Some(&child_ip_bad), + Some(&parent_ip), + idx.parent_ip_by_afi_items.as_ref(), + &idx.parent_ip_merged_intervals, + ) + .unwrap_err(); + assert!(matches!(err, CaPathError::ResourcesNotSubset)); + + let child_as_subset = AsResourceSet { + asnum: Some(AsIdentifierChoice::AsIdsOrRanges(vec![AsIdOrRange::Id(64542)])), + rdi: Some(AsIdentifierChoice::AsIdsOrRanges(vec![AsIdOrRange::Id(65000)])), + }; + assert!(resolve_child_as_resources_indexed( + Some(&child_as_subset), + Some(&parent_as), + idx.parent_asnum_intervals.as_deref(), + idx.parent_rdi_intervals.as_deref(), + ) + .expect("subset as resolves") + .is_some()); + + let child_as_bad = AsResourceSet { + asnum: Some(AsIdentifierChoice::AsIdsOrRanges(vec![AsIdOrRange::Id(65123)])), + rdi: None, + }; + let err = resolve_child_as_resources_indexed( + Some(&child_as_bad), + Some(&parent_as), + idx.parent_asnum_intervals.as_deref(), + idx.parent_rdi_intervals.as_deref(), + ) + .unwrap_err(); + assert!(matches!(err, CaPathError::ResourcesNotSubset)); + } + #[test] fn resolve_child_ip_and_as_resources_success_paths() { use crate::data_model::rc::{AsIdOrRange, IpAddressOrRange, IpPrefix}; diff --git a/src/validation/cert_path.rs b/src/validation/cert_path.rs index a9a742f..6d7dbfc 100644 --- a/src/validation/cert_path.rs +++ b/src/validation/cert_path.rs @@ -193,6 +193,59 @@ pub fn validate_ee_cert_path_with_prevalidated_issuer( validation_time: time::OffsetDateTime, ) -> Result { let ee = ResourceCertificate::decode_der(ee_cert_der)?; + validate_ee_cert_path_components( + &ee, + ee_cert_der, + issuer_ca, + issuer_spki, + issuer_crl, + issuer_crl_revoked_serials, + issuer_ca_rsync_uri, + issuer_crl_rsync_uri, + validation_time, + )?; + Ok(ee) +} + +/// Validate the EE certificate path using a pre-decoded EE certificate and a pre-validated issuer. +/// +/// This avoids re-decoding the embedded EE certificate when the caller already parsed it while +/// decoding a signed object (e.g. ROA / ASPA). +pub fn validate_ee_cert_path_with_predecoded_ee( + ee: &ResourceCertificate, + ee_cert_der: &[u8], + issuer_ca: &ResourceCertificate, + issuer_spki: &SubjectPublicKeyInfo<'_>, + issuer_crl: &RpkixCrl, + issuer_crl_revoked_serials: &HashSet>, + issuer_ca_rsync_uri: Option<&str>, + issuer_crl_rsync_uri: Option<&str>, + validation_time: time::OffsetDateTime, +) -> Result<(), CertPathError> { + validate_ee_cert_path_components( + ee, + ee_cert_der, + issuer_ca, + issuer_spki, + issuer_crl, + issuer_crl_revoked_serials, + issuer_ca_rsync_uri, + issuer_crl_rsync_uri, + validation_time, + ) +} + +fn validate_ee_cert_path_components( + ee: &ResourceCertificate, + ee_cert_der: &[u8], + issuer_ca: &ResourceCertificate, + issuer_spki: &SubjectPublicKeyInfo<'_>, + issuer_crl: &RpkixCrl, + issuer_crl_revoked_serials: &HashSet>, + issuer_ca_rsync_uri: Option<&str>, + issuer_crl_rsync_uri: Option<&str>, + validation_time: time::OffsetDateTime, +) -> Result<(), CertPathError> { if ee.kind != ResourceCertKind::Ee { return Err(CertPathError::EeNotEe); } @@ -207,12 +260,12 @@ pub fn validate_ee_cert_path_with_prevalidated_issuer( }); } - validate_ee_aki_matches_issuer_ski(&ee, issuer_ca)?; + validate_ee_aki_matches_issuer_ski(ee, issuer_ca)?; if let Some(expected_issuer_uri) = issuer_ca_rsync_uri { - validate_ee_aia_points_to_issuer_uri(&ee, expected_issuer_uri)?; + validate_ee_aia_points_to_issuer_uri(ee, expected_issuer_uri)?; } if let Some(expected_crl_uri) = issuer_crl_rsync_uri { - validate_ee_crldp_contains_issuer_crl_uri(&ee, expected_crl_uri)?; + validate_ee_crldp_contains_issuer_crl_uri(ee, expected_crl_uri)?; } if !time_within_validity( @@ -240,7 +293,7 @@ pub fn validate_ee_cert_path_with_prevalidated_issuer( return Err(CertPathError::EeRevoked); } - Ok(ee) + Ok(()) } fn parse_subject_pki_from_der(der: &[u8]) -> Result, CertPathError> { diff --git a/src/validation/objects.rs b/src/validation/objects.rs index 226e0d1..68a94da 100644 --- a/src/validation/objects.rs +++ b/src/validation/objects.rs @@ -10,8 +10,10 @@ use crate::data_model::roa::{IpPrefix, RoaAfi, RoaDecodeError, RoaObject, RoaVal use crate::data_model::signed_object::SignedObjectVerifyError; use crate::policy::{Policy, SignedObjectFailurePolicy}; use crate::report::{RfcRef, Warning}; -use crate::storage::PackFile; -use crate::validation::cert_path::{CertPathError, validate_ee_cert_path_with_prevalidated_issuer}; +use crate::storage::{PackFile, PackTime, VcirLocalOutput, VcirOutputType}; +use crate::validation::cert_path::{ + CertPathError, validate_ee_cert_path_with_predecoded_ee, +}; use crate::validation::manifest::PublicationPointData; use crate::validation::publication_point::PublicationPointSnapshot; use x509_parser::prelude::FromDer; @@ -67,6 +69,7 @@ pub struct AspaAttestation { pub struct ObjectsOutput { pub vrps: Vec, pub aspas: Vec, + pub local_outputs_cache: Vec, pub warnings: Vec, pub stats: ObjectsStats, pub audit: Vec, @@ -148,6 +151,7 @@ pub fn process_publication_point_for_issuer( return ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings, stats, audit, @@ -171,6 +175,7 @@ pub fn process_publication_point_for_issuer( return ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings, stats, audit, @@ -188,6 +193,7 @@ pub fn process_publication_point_for_issuer( return ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings, stats, audit, @@ -246,6 +252,7 @@ pub fn process_publication_point_for_issuer( return ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings, stats, audit, @@ -254,12 +261,14 @@ pub fn process_publication_point_for_issuer( let mut vrps: Vec = Vec::new(); let mut aspas: Vec = Vec::new(); + let mut local_outputs_cache: Vec = Vec::new(); for (idx, file) in locked_files.iter().enumerate() { if file.rsync_uri.ends_with(".roa") { let _t = timing.as_ref().map(|t| t.span_phase("objects_roa_total")); match process_roa_with_issuer( file, + manifest_rsync_uri, issuer_ca_der, &issuer_ca, &issuer_spki, @@ -271,9 +280,10 @@ pub fn process_publication_point_for_issuer( validation_time, timing, ) { - Ok(mut out) => { + Ok((mut out, local_outputs)) => { stats.roa_ok += 1; vrps.append(&mut out); + local_outputs_cache.extend(local_outputs); audit.push(ObjectAuditEntry { rsync_uri: file.rsync_uri.clone(), sha256_hex: sha256_hex_from_32(&file.sha256), @@ -346,6 +356,7 @@ pub fn process_publication_point_for_issuer( return ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings, stats, audit, @@ -357,6 +368,7 @@ pub fn process_publication_point_for_issuer( let _t = timing.as_ref().map(|t| t.span_phase("objects_aspa_total")); match process_aspa_with_issuer( file, + manifest_rsync_uri, issuer_ca_der, &issuer_ca, &issuer_spki, @@ -368,9 +380,10 @@ pub fn process_publication_point_for_issuer( validation_time, timing, ) { - Ok(att) => { + Ok((att, local_output)) => { stats.aspa_ok += 1; aspas.push(att); + local_outputs_cache.push(local_output); audit.push(ObjectAuditEntry { rsync_uri: file.rsync_uri.clone(), sha256_hex: sha256_hex_from_32(&file.sha256), @@ -443,6 +456,7 @@ pub fn process_publication_point_for_issuer( return ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings, stats, audit, @@ -456,6 +470,7 @@ pub fn process_publication_point_for_issuer( ObjectsOutput { vrps, aspas, + local_outputs_cache, warnings, stats, audit, @@ -537,6 +552,7 @@ enum ObjectValidateError { fn process_roa_with_issuer( file: &PackFile, + manifest_rsync_uri: &str, issuer_ca_der: &[u8], issuer_ca: &ResourceCertificate, issuer_spki: &SubjectPublicKeyInfo<'_>, @@ -547,7 +563,7 @@ fn process_roa_with_issuer( issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, validation_time: time::OffsetDateTime, timing: Option<&TimingHandle>, -) -> Result, ObjectValidateError> { +) -> Result<(Vec, Vec), ObjectValidateError> { let _decode = timing .as_ref() .map(|t| t.span_phase("objects_roa_decode_and_validate_total")); @@ -566,9 +582,9 @@ fn process_roa_with_issuer( roa.signed_object.verify()?; drop(_verify); + let ee = &roa.signed_object.signed_data.certificates[0].resource_cert; let ee_der = &roa.signed_object.signed_data.certificates[0].raw_der; - let ee_crldp_uris = roa.signed_object.signed_data.certificates[0] - .resource_cert + let ee_crldp_uris = ee .tbs .extensions .crl_distribution_points_uris @@ -579,7 +595,8 @@ fn process_roa_with_issuer( let _cert_path = timing .as_ref() .map(|t| t.span_phase("objects_roa_validate_ee_cert_path_total")); - let ee = validate_ee_cert_path_with_prevalidated_issuer( + validate_ee_cert_path_with_predecoded_ee( + ee, ee_der, issuer_ca, issuer_spki, @@ -602,11 +619,52 @@ fn process_roa_with_issuer( )?; drop(_subset); - Ok(roa_to_vrps(&roa)) + let vrps = roa_to_vrps(&roa); + let source_object_hash = sha256_hex_from_32(&file.sha256); + let source_ee_cert_hash = crate::audit::sha256_hex(ee.raw_der.as_slice()); + let item_effective_until = PackTime::from_utc_offset_datetime(ee.tbs.validity_not_after); + let local_outputs = vrps + .iter() + .map(|vrp| { + let prefix = vrp_prefix_to_string(vrp); + let payload_json = serde_json::json!({ + "asn": vrp.asn, + "prefix": prefix, + "max_length": vrp.max_length, + }) + .to_string(); + let rule_hash = crate::audit::sha256_hex( + format!( + "roa-rule:{}:{}:{}:{}", + source_object_hash, vrp.asn, prefix, vrp.max_length + ) + .as_bytes(), + ); + VcirLocalOutput { + output_id: rule_hash.clone(), + output_type: VcirOutputType::Vrp, + item_effective_until: item_effective_until.clone(), + source_object_uri: file.rsync_uri.clone(), + source_object_type: "roa".to_string(), + source_object_hash: source_object_hash.clone(), + source_ee_cert_hash: source_ee_cert_hash.clone(), + payload_json, + rule_hash, + validation_path_hint: vec![ + manifest_rsync_uri.to_string(), + file.rsync_uri.clone(), + source_object_hash.clone(), + ], + } + }) + .collect(); + + Ok((vrps, local_outputs)) } fn process_aspa_with_issuer( file: &PackFile, + manifest_rsync_uri: &str, issuer_ca_der: &[u8], issuer_ca: &ResourceCertificate, issuer_spki: &SubjectPublicKeyInfo<'_>, @@ -617,7 +675,7 @@ fn process_aspa_with_issuer( issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, validation_time: time::OffsetDateTime, timing: Option<&TimingHandle>, -) -> Result { +) -> Result<(AspaAttestation, VcirLocalOutput), ObjectValidateError> { let _decode = timing .as_ref() .map(|t| t.span_phase("objects_aspa_decode_and_validate_total")); @@ -636,9 +694,9 @@ fn process_aspa_with_issuer( aspa.signed_object.verify()?; drop(_verify); + let ee = &aspa.signed_object.signed_data.certificates[0].resource_cert; let ee_der = &aspa.signed_object.signed_data.certificates[0].raw_der; - let ee_crldp_uris = aspa.signed_object.signed_data.certificates[0] - .resource_cert + let ee_crldp_uris = ee .tbs .extensions .crl_distribution_points_uris @@ -649,7 +707,8 @@ fn process_aspa_with_issuer( let _cert_path = timing .as_ref() .map(|t| t.span_phase("objects_aspa_validate_ee_cert_path_total")); - let ee = validate_ee_cert_path_with_prevalidated_issuer( + validate_ee_cert_path_with_predecoded_ee( + ee, ee_der, issuer_ca, issuer_spki, @@ -672,10 +731,70 @@ fn process_aspa_with_issuer( )?; drop(_subset); - Ok(AspaAttestation { + let attestation = AspaAttestation { customer_as_id: aspa.aspa.customer_as_id, provider_as_ids: aspa.aspa.provider_as_ids.clone(), - }) + }; + let source_object_hash = sha256_hex_from_32(&file.sha256); + let source_ee_cert_hash = crate::audit::sha256_hex(ee.raw_der.as_slice()); + let item_effective_until = PackTime::from_utc_offset_datetime(ee.tbs.validity_not_after); + let providers = attestation + .provider_as_ids + .iter() + .map(u32::to_string) + .collect::>() + .join(","); + let rule_hash = crate::audit::sha256_hex( + format!( + "aspa-rule:{}:{}:{}", + source_object_hash, attestation.customer_as_id, providers + ) + .as_bytes(), + ); + let local_output = VcirLocalOutput { + output_id: rule_hash.clone(), + output_type: VcirOutputType::Aspa, + item_effective_until, + source_object_uri: file.rsync_uri.clone(), + source_object_type: "aspa".to_string(), + source_object_hash: source_object_hash.clone(), + source_ee_cert_hash, + payload_json: serde_json::json!({ + "customer_as_id": attestation.customer_as_id, + "provider_as_ids": attestation.provider_as_ids, + }) + .to_string(), + rule_hash, + validation_path_hint: vec![ + manifest_rsync_uri.to_string(), + file.rsync_uri.clone(), + source_object_hash, + ], + }; + + Ok((attestation, local_output)) +} + + +fn vrp_prefix_to_string(vrp: &Vrp) -> String { + let prefix = &vrp.prefix; + match prefix.afi { + RoaAfi::Ipv4 => { + let addr = std::net::Ipv4Addr::new( + prefix.addr[0], + prefix.addr[1], + prefix.addr[2], + prefix.addr[3], + ); + format!("{addr}/{}", prefix.prefix_len) + } + RoaAfi::Ipv6 => { + let mut octets = [0u8; 16]; + octets.copy_from_slice(&prefix.addr[..16]); + let addr = std::net::Ipv6Addr::from(octets); + format!("{addr}/{}", prefix.prefix_len) + } + } } fn choose_crl_uri_for_certificate<'a>( diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index b795aba..65ef955 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -15,7 +15,7 @@ use crate::replay::archive::ReplayArchiveIndex; use crate::replay::delta_archive::ReplayDeltaArchiveIndex; use crate::report::{RfcRef, Warning}; use crate::storage::{ - AuditRuleIndexEntry, AuditRuleKind, PackFile, PackTime, RawByHashEntry, RocksStore, + PackFile, PackTime, RawByHashEntry, RocksStore, ValidatedCaInstanceResult, VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus, VcirAuditSummary, VcirChildEntry, VcirInstanceGate, VcirLocalOutput, VcirOutputType, VcirRelatedArtifact, VcirSummary, @@ -26,7 +26,8 @@ use crate::sync::repo::{ use crate::sync::rrdp::Fetcher; use crate::validation::ca_instance::ca_instance_uris_from_ca_certificate; use crate::validation::ca_path::{ - CaPathError, ValidatedSubordinateCaLite, validate_subordinate_ca_cert_with_prevalidated_issuer, + CaPathError, IssuerEffectiveResourcesIndex, ValidatedSubordinateCaLite, + validate_subordinate_ca_cert_with_prevalidated_issuer_and_resources, }; use crate::validation::manifest::{ ManifestFreshError, PublicationPointData, PublicationPointSource, @@ -483,6 +484,11 @@ fn discover_children_from_fresh_snapshot_with_audit( let mut out: Vec = Vec::new(); let mut audits: Vec = Vec::new(); + let issuer_resources_index = IssuerEffectiveResourcesIndex::from_effective_resources( + issuer.effective_ip_resources.as_ref(), + issuer.effective_as_resources.as_ref(), + ) + .map_err(|e| format!("build issuer effective resources index failed: {e}"))?; let mut cer_seen: u64 = 0; let mut ca_skipped_not_ca: u64 = 0; @@ -628,6 +634,7 @@ fn discover_children_from_fresh_snapshot_with_audit( issuer.ca_certificate_rsync_uri.as_deref(), issuer.effective_ip_resources.as_ref(), issuer.effective_as_resources.as_ref(), + &issuer_resources_index, validation_time, ) { Ok(v) => v, @@ -839,11 +846,12 @@ fn validate_subordinate_ca_cert_with_cached_issuer( issuer_ca_rsync_uri: Option<&str>, issuer_effective_ip: Option<&crate::data_model::rc::IpResourceSet>, issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>, + issuer_resources_index: &IssuerEffectiveResourcesIndex, validation_time: time::OffsetDateTime, ) -> Result { let verified_crl = ensure_issuer_crl_verified(issuer_crl_rsync_uri, crl_cache, issuer_ca_der)?; - validate_subordinate_ca_cert_with_prevalidated_issuer( + validate_subordinate_ca_cert_with_prevalidated_issuer_and_resources( child_ca_der, child_ca, issuer_ca, @@ -854,6 +862,7 @@ fn validate_subordinate_ca_cert_with_cached_issuer( issuer_crl_rsync_uri, issuer_effective_ip, issuer_effective_as, + issuer_resources_index, validation_time, ) } @@ -1172,6 +1181,7 @@ fn empty_objects_output() -> crate::validation::objects::ObjectsOutput { crate::validation::objects::ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: crate::validation::objects::ObjectsStats::default(), audit: Vec::new(), @@ -1669,36 +1679,8 @@ fn persist_vcir_for_fresh_result( .map_err(|e| format!("load existing VCIR failed: {e}"))?; store - .put_vcir(&vcir) - .map_err(|e| format!("store VCIR failed: {e}"))?; - - if let Some(previous) = previous.as_ref() { - for output in &previous.local_outputs { - if let Some(kind) = audit_rule_kind_for_output(output.output_type) { - store - .delete_audit_rule_index_entry(kind, &output.rule_hash) - .map_err(|e| format!("delete stale audit rule index failed: {e}"))?; - } - } - } - - for output in &vcir.local_outputs { - let Some(kind) = audit_rule_kind_for_output(output.output_type) else { - continue; - }; - let entry = AuditRuleIndexEntry { - kind, - rule_hash: output.rule_hash.clone(), - manifest_rsync_uri: vcir.manifest_rsync_uri.clone(), - source_object_uri: output.source_object_uri.clone(), - source_object_hash: output.source_object_hash.clone(), - output_id: output.output_id.clone(), - item_effective_until: output.item_effective_until.clone(), - }; - store - .put_audit_rule_index_entry(&entry) - .map_err(|e| format!("store audit rule index failed: {e}"))?; - } + .replace_vcir_and_audit_rule_indexes(previous.as_ref(), &vcir) + .map_err(|e| format!("store VCIR and audit rule index failed: {e}"))?; Ok(()) } @@ -1847,6 +1829,10 @@ fn build_vcir_local_outputs( pack: &PublicationPointSnapshot, objects: &crate::validation::objects::ObjectsOutput, ) -> Result, String> { + if !objects.local_outputs_cache.is_empty() { + return Ok(objects.local_outputs_cache.clone()); + } + let accepted_roa_uris: HashSet<&str> = objects .audit .iter() @@ -2250,12 +2236,6 @@ fn audit_result_to_vcir_status(result: &AuditObjectResult) -> VcirArtifactValida } } -fn audit_rule_kind_for_output(output_type: VcirOutputType) -> Option { - match output_type { - VcirOutputType::Vrp => Some(AuditRuleKind::Roa), - VcirOutputType::Aspa => Some(AuditRuleKind::Aspa), - } -} fn roa_to_vrps_for_vcir(roa: &RoaObject) -> Vec { let asn = roa.roa.as_id; @@ -2298,7 +2278,7 @@ mod tests { use crate::fetch::rsync::LocalDirRsyncFetcher; use crate::fetch::rsync::{RsyncFetchError, RsyncFetcher}; use crate::storage::{ - PackFile, PackTime, RawByHashEntry, ValidatedCaInstanceResult, ValidatedManifestMeta, + PackFile, PackTime, RawByHashEntry, RocksStore, ValidatedCaInstanceResult, ValidatedManifestMeta, VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus, VcirAuditSummary, VcirChildEntry, VcirInstanceGate, VcirLocalOutput, VcirOutputType, VcirRelatedArtifact, VcirSummary, @@ -2529,6 +2509,64 @@ authorityKeyIdentifier = keyid:always } } + fn cernet_publication_point_snapshot_for_vcir_tests( + ) -> (PublicationPointSnapshot, Vec, time::OffsetDateTime) { + let dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/fixtures/repository/rpki.cernet.net/repo/cernet/0"); + let rsync_base_uri = "rsync://rpki.cernet.net/repo/cernet/0/"; + let manifest_file = "05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft"; + let manifest_rsync_uri = format!("{rsync_base_uri}{manifest_file}"); + let manifest_bytes = std::fs::read(dir.join(manifest_file)).expect("read manifest fixture"); + let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode manifest fixture"); + let candidate = manifest.manifest.this_update + time::Duration::seconds(60); + let validation_time = if candidate < manifest.manifest.next_update { + candidate + } else { + manifest.manifest.this_update + }; + + let issuer_ca_der = std::fs::read( + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join( + "tests/fixtures/repository/rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer", + ), + ) + .expect("read issuer ca fixture"); + + let store_dir = tempfile::tempdir().expect("store dir"); + let store = RocksStore::open(store_dir.path()).expect("open rocksdb"); + let policy = Policy { + sync_preference: crate::policy::SyncPreference::RsyncOnly, + ..Policy::default() + }; + + sync_publication_point( + &store, + &policy, + None, + rsync_base_uri, + &NeverHttpFetcher, + &LocalDirRsyncFetcher::new(&dir), + None, + None, + ) + .expect("sync cernet fixture"); + + let pp = crate::validation::manifest::process_manifest_publication_point( + &store, + &policy, + &manifest_rsync_uri, + rsync_base_uri, + issuer_ca_der.as_slice(), + Some( + "rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer", + ), + validation_time, + ) + .expect("process manifest publication point"); + + (pp.snapshot, issuer_ca_der, validation_time) + } + fn sample_vcir_for_projection( now: time::OffsetDateTime, child_cert_hash: &str, @@ -2693,6 +2731,327 @@ authorityKeyIdentifier = keyid:always ); } + #[test] + fn build_vcir_local_outputs_prefers_cached_outputs() { + let pack = dummy_pack_with_files(vec![]); + let ca = CaInstanceHandle { + depth: 0, + tal_id: "test-tal".to_string(), + parent_manifest_rsync_uri: None, + ca_certificate_der: vec![1], + ca_certificate_rsync_uri: None, + effective_ip_resources: None, + effective_as_resources: None, + rsync_base_uri: pack.publication_point_rsync_uri.clone(), + manifest_rsync_uri: pack.manifest_rsync_uri.clone(), + publication_point_rsync_uri: pack.publication_point_rsync_uri.clone(), + rrdp_notification_uri: None, + }; + let cached = vec![VcirLocalOutput { + output_id: "cached-output".to_string(), + output_type: VcirOutputType::Vrp, + item_effective_until: pack.next_update.clone(), + source_object_uri: "rsync://example.test/repo/issuer/a.roa".to_string(), + source_object_type: "roa".to_string(), + source_object_hash: sha256_hex(b"cached-roa"), + source_ee_cert_hash: sha256_hex(b"cached-ee"), + payload_json: "{\"asn\":64500}".to_string(), + rule_hash: sha256_hex(b"cached-rule"), + validation_path_hint: vec![pack.manifest_rsync_uri.clone()], + }]; + let outputs = build_vcir_local_outputs( + &ca, + &pack, + &crate::validation::objects::ObjectsOutput { + vrps: Vec::new(), + aspas: Vec::new(), + local_outputs_cache: cached.clone(), + warnings: Vec::new(), + stats: crate::validation::objects::ObjectsStats::default(), + audit: Vec::new(), + }, + ) + .expect("reuse cached outputs"); + assert_eq!(outputs, cached); + } + + #[test] + fn collect_and_persist_vcir_embedded_evidence_for_real_signed_objects() { + let (pack, issuer_ca_der, validation_time) = cernet_publication_point_snapshot_for_vcir_tests(); + let issuer_ca = ResourceCertificate::decode_der(&issuer_ca_der).expect("decode issuer ca"); + let objects = crate::validation::objects::process_publication_point_snapshot_for_issuer( + &pack, + &Policy::default(), + issuer_ca_der.as_slice(), + Some("rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer"), + issuer_ca.tbs.extensions.ip_resources.as_ref(), + issuer_ca.tbs.extensions.as_resources.as_ref(), + validation_time, + None, + ); + assert!(!objects.local_outputs_cache.is_empty(), "expected local outputs from signed objects"); + + let evidence = collect_vcir_embedded_evidence(&pack, &objects).expect("collect embedded evidence"); + assert!(evidence.len() >= 2, "expected manifest EE and signed-object EE evidence"); + + let store_dir = tempfile::tempdir().expect("store dir"); + let store = RocksStore::open(store_dir.path()).expect("open rocksdb"); + let ca = CaInstanceHandle { + depth: 0, + tal_id: "test-tal".to_string(), + parent_manifest_rsync_uri: None, + ca_certificate_der: issuer_ca_der.clone(), + ca_certificate_rsync_uri: Some( + "rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer".to_string(), + ), + effective_ip_resources: issuer_ca.tbs.extensions.ip_resources.clone(), + effective_as_resources: issuer_ca.tbs.extensions.as_resources.clone(), + rsync_base_uri: pack.publication_point_rsync_uri.clone(), + manifest_rsync_uri: pack.manifest_rsync_uri.clone(), + publication_point_rsync_uri: pack.publication_point_rsync_uri.clone(), + rrdp_notification_uri: None, + }; + persist_vcir_non_repository_evidence(&store, &ca, &evidence) + .expect("persist embedded evidence"); + + let issuer_hash = sha256_hex(&issuer_ca_der); + let issuer_entry = store + .get_raw_by_hash_entry(&issuer_hash) + .expect("load issuer raw entry") + .expect("issuer raw entry present"); + assert!(issuer_entry + .origin_uris + .iter() + .any(|uri| uri.ends_with("BfycW4hQb3wNP4YsiJW-1n6fjro.cer"))); + for entry in &evidence { + assert!(store + .get_raw_by_hash_entry(&entry.raw_entry.sha256_hex) + .expect("load evidence raw entry") + .is_some()); + } + } + + #[test] + fn build_vcir_local_outputs_falls_back_to_decoding_accepted_objects_when_cache_is_empty() { + let (pack, issuer_ca_der, validation_time) = cernet_publication_point_snapshot_for_vcir_tests(); + let issuer_ca = ResourceCertificate::decode_der(&issuer_ca_der).expect("decode issuer ca"); + let objects = crate::validation::objects::process_publication_point_snapshot_for_issuer( + &pack, + &Policy::default(), + issuer_ca_der.as_slice(), + Some("rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer"), + issuer_ca.tbs.extensions.ip_resources.as_ref(), + issuer_ca.tbs.extensions.as_resources.as_ref(), + validation_time, + None, + ); + let mut objects_without_cache = objects.clone(); + objects_without_cache.local_outputs_cache.clear(); + let ca = CaInstanceHandle { + depth: 0, + tal_id: "test-tal".to_string(), + parent_manifest_rsync_uri: None, + ca_certificate_der: issuer_ca_der, + ca_certificate_rsync_uri: Some( + "rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer".to_string(), + ), + effective_ip_resources: issuer_ca.tbs.extensions.ip_resources.clone(), + effective_as_resources: issuer_ca.tbs.extensions.as_resources.clone(), + rsync_base_uri: pack.publication_point_rsync_uri.clone(), + manifest_rsync_uri: pack.manifest_rsync_uri.clone(), + publication_point_rsync_uri: pack.publication_point_rsync_uri.clone(), + rrdp_notification_uri: None, + }; + let local_outputs = build_vcir_local_outputs(&ca, &pack, &objects_without_cache) + .expect("rebuild vcir local outputs"); + assert!(!local_outputs.is_empty()); + assert_eq!(local_outputs.len(), objects.vrps.len()); + assert!(local_outputs + .iter() + .all(|output| output.output_type == VcirOutputType::Vrp)); + } + + #[test] + fn persist_vcir_for_fresh_result_stores_vcir_and_audit_indexes_for_real_snapshot() { + let (pack, issuer_ca_der, validation_time) = cernet_publication_point_snapshot_for_vcir_tests(); + let issuer_ca = ResourceCertificate::decode_der(&issuer_ca_der).expect("decode issuer ca"); + let objects = crate::validation::objects::process_publication_point_snapshot_for_issuer( + &pack, + &Policy::default(), + issuer_ca_der.as_slice(), + Some("rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer"), + issuer_ca.tbs.extensions.ip_resources.as_ref(), + issuer_ca.tbs.extensions.as_resources.as_ref(), + validation_time, + None, + ); + let store_dir = tempfile::tempdir().expect("store dir"); + let store = RocksStore::open(store_dir.path()).expect("open rocksdb"); + let ca = CaInstanceHandle { + depth: 0, + tal_id: "test-tal".to_string(), + parent_manifest_rsync_uri: None, + ca_certificate_der: issuer_ca_der.clone(), + ca_certificate_rsync_uri: Some( + "rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer".to_string(), + ), + effective_ip_resources: issuer_ca.tbs.extensions.ip_resources.clone(), + effective_as_resources: issuer_ca.tbs.extensions.as_resources.clone(), + rsync_base_uri: pack.publication_point_rsync_uri.clone(), + manifest_rsync_uri: pack.manifest_rsync_uri.clone(), + publication_point_rsync_uri: pack.publication_point_rsync_uri.clone(), + rrdp_notification_uri: None, + }; + + persist_vcir_for_fresh_result( + &store, + &ca, + &pack, + &objects, + &[], + &[], + &[], + validation_time, + ) + .expect("persist vcir for fresh result"); + + let vcir = store + .get_vcir(&pack.manifest_rsync_uri) + .expect("get vcir") + .expect("vcir exists"); + assert_eq!(vcir.manifest_rsync_uri, pack.manifest_rsync_uri); + assert_eq!(vcir.summary.local_vrp_count as usize, objects.vrps.len()); + let first_output = vcir.local_outputs.first().expect("local outputs stored"); + assert!(store + .get_audit_rule_index_entry(crate::storage::AuditRuleKind::Roa, &first_output.rule_hash) + .expect("get audit rule index entry") + .is_some()); + } + + #[test] + fn build_vcir_related_artifacts_classifies_snapshot_files_and_audit_statuses() { + let manifest_bytes = std::fs::read( + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join( + "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft", + ), + ) + .expect("read manifest fixture"); + let crl_bytes = std::fs::read( + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join( + "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.crl", + ), + ) + .expect("read crl fixture"); + let pack = PublicationPointSnapshot { + format_version: PublicationPointSnapshot::FORMAT_VERSION_V1, + manifest_rsync_uri: "rsync://example.test/repo/issuer/issuer.mft".to_string(), + publication_point_rsync_uri: "rsync://example.test/repo/issuer/".to_string(), + manifest_number_be: vec![1], + this_update: PackTime::from_utc_offset_datetime(time::OffsetDateTime::now_utc()), + next_update: PackTime::from_utc_offset_datetime( + time::OffsetDateTime::now_utc() + time::Duration::hours(1), + ), + verified_at: PackTime::from_utc_offset_datetime(time::OffsetDateTime::now_utc()), + manifest_bytes, + files: vec![ + PackFile::from_bytes_compute_sha256( + "rsync://example.test/repo/issuer/issuer.crl", + crl_bytes, + ), + PackFile::from_bytes_compute_sha256( + "rsync://example.test/repo/issuer/child.cer", + vec![1u8, 2], + ), + PackFile::from_bytes_compute_sha256( + "rsync://example.test/repo/issuer/a.roa", + vec![3u8, 4], + ), + PackFile::from_bytes_compute_sha256( + "rsync://example.test/repo/issuer/a.asa", + vec![5u8, 6], + ), + PackFile::from_bytes_compute_sha256( + "rsync://example.test/repo/issuer/a.gbr", + vec![7u8, 8], + ), + PackFile::from_bytes_compute_sha256( + "rsync://example.test/repo/issuer/extra.bin", + vec![9u8], + ), + ], + }; + let ca = CaInstanceHandle { + depth: 0, + tal_id: "test-tal".to_string(), + parent_manifest_rsync_uri: None, + ca_certificate_der: vec![0x11, 0x22], + ca_certificate_rsync_uri: Some("rsync://example.test/repo/issuer/issuer.cer".to_string()), + effective_ip_resources: None, + effective_as_resources: None, + rsync_base_uri: pack.publication_point_rsync_uri.clone(), + manifest_rsync_uri: pack.manifest_rsync_uri.clone(), + publication_point_rsync_uri: pack.publication_point_rsync_uri.clone(), + rrdp_notification_uri: None, + }; + let objects = crate::validation::objects::ObjectsOutput { + vrps: Vec::new(), + aspas: Vec::new(), + local_outputs_cache: Vec::new(), + warnings: Vec::new(), + stats: crate::validation::objects::ObjectsStats::default(), + audit: vec![ + ObjectAuditEntry { + rsync_uri: "rsync://example.test/repo/issuer/a.roa".to_string(), + sha256_hex: sha256_hex_from_32(&pack.files[2].sha256), + kind: AuditObjectKind::Roa, + result: AuditObjectResult::Error, + detail: Some("bad roa".to_string()), + }, + ObjectAuditEntry { + rsync_uri: "rsync://example.test/repo/issuer/a.asa".to_string(), + sha256_hex: sha256_hex_from_32(&pack.files[3].sha256), + kind: AuditObjectKind::Aspa, + result: AuditObjectResult::Skipped, + detail: Some("skipped aspa".to_string()), + }, + ], + }; + let embedded = vec![VcirEmbeddedEvidence { + artifact: VcirRelatedArtifact { + artifact_role: VcirArtifactRole::EeCert, + artifact_kind: VcirArtifactKind::Cer, + uri: None, + sha256: sha256_hex(b"embedded-ee"), + object_type: Some("cer".to_string()), + validation_status: VcirArtifactValidationStatus::Accepted, + }, + raw_entry: embedded_raw_entry(sha256_hex(b"embedded-ee"), vec![1u8, 2, 3]), + }]; + let artifacts = build_vcir_related_artifacts( + &ca, + &pack, + "rsync://example.test/repo/issuer/issuer.crl", + &objects, + &[], + &embedded, + ); + assert!(artifacts.iter().any(|artifact| artifact.artifact_role == VcirArtifactRole::Manifest)); + assert!(artifacts.iter().any(|artifact| artifact.artifact_role == VcirArtifactRole::TrustAnchorCert)); + assert!(artifacts.iter().any(|artifact| artifact.uri.as_deref() == Some("rsync://example.test/repo/issuer/issuer.crl") + && artifact.artifact_role == VcirArtifactRole::CurrentCrl)); + assert!(artifacts.iter().any(|artifact| artifact.uri.as_deref() == Some("rsync://example.test/repo/issuer/child.cer") + && artifact.artifact_role == VcirArtifactRole::ChildCaCert)); + assert!(artifacts.iter().any(|artifact| artifact.uri.as_deref() == Some("rsync://example.test/repo/issuer/a.roa") + && artifact.validation_status == VcirArtifactValidationStatus::Rejected)); + assert!(artifacts.iter().any(|artifact| artifact.uri.as_deref() == Some("rsync://example.test/repo/issuer/a.asa") + && artifact.validation_status == VcirArtifactValidationStatus::WarningOnly)); + assert!(artifacts.iter().any(|artifact| artifact.uri.as_deref() == Some("rsync://example.test/repo/issuer/a.gbr") + && artifact.artifact_kind == VcirArtifactKind::Gbr)); + assert!(artifacts.iter().any(|artifact| artifact.uri.as_deref() == Some("rsync://example.test/repo/issuer/extra.bin") + && artifact.artifact_kind == VcirArtifactKind::Other)); + assert!(artifacts.iter().any(|artifact| artifact.uri.is_none() && artifact.sha256 == sha256_hex(b"embedded-ee"))); + } + #[test] fn select_issuer_crl_from_snapshot_reports_missing_crldp_for_self_signed_cert() { let ta_der = std::fs::read( @@ -3207,6 +3566,7 @@ authorityKeyIdentifier = keyid:always let objects = crate::validation::objects::ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: crate::validation::objects::ObjectsStats::default(), audit: Vec::new(), @@ -3267,6 +3627,7 @@ authorityKeyIdentifier = keyid:always let objects = crate::validation::objects::ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: crate::validation::objects::ObjectsStats::default(), audit: vec![ObjectAuditEntry { @@ -3872,6 +4233,7 @@ authorityKeyIdentifier = keyid:always let objects = crate::validation::objects::ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: vec![Warning::new("objects warning")], stats: crate::validation::objects::ObjectsStats::default(), audit: vec![ObjectAuditEntry { @@ -3956,6 +4318,7 @@ authorityKeyIdentifier = keyid:always &crate::validation::objects::ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: vec![Warning::new("object warning")], stats: crate::validation::objects::ObjectsStats::default(), audit: Vec::new(), diff --git a/tests/test_cert_path_key_usage.rs b/tests/test_cert_path_key_usage.rs index da643e6..bf9e5b0 100644 --- a/tests/test_cert_path_key_usage.rs +++ b/tests/test_cert_path_key_usage.rs @@ -1,6 +1,8 @@ use std::process::Command; -use rpki::validation::cert_path::{CertPathError, validate_ee_cert_path}; +use rpki::validation::cert_path::{ + CertPathError, validate_ee_cert_path, validate_ee_cert_path_with_predecoded_ee, +}; fn openssl_available() -> bool { Command::new("openssl") @@ -295,6 +297,57 @@ fn validate_ee_cert_path_with_prevalidated_issuer_covers_success_and_error_paths assert!(matches!(err, CertPathError::EeRevoked), "{err}"); } +#[test] +fn validate_ee_cert_path_with_predecoded_ee_matches_prevalidated_path_rules() { + use rpki::data_model::common::BigUnsigned; + use rpki::data_model::crl::RpkixCrl; + use rpki::data_model::rc::ResourceCertificate; + use std::collections::HashSet; + use x509_parser::prelude::FromDer; + use x509_parser::x509::SubjectPublicKeyInfo; + + let g = generate_issuer_ca_ee_and_crl( + "keyUsage = critical, digitalSignature +", + ); + let issuer = ResourceCertificate::decode_der(&g.issuer_ca_der).expect("decode issuer"); + let ee = ResourceCertificate::decode_der(&g.ee_der).expect("decode ee"); + let issuer_crl = RpkixCrl::decode_der(&g.issuer_crl_der).expect("decode crl"); + let (rem, issuer_spki) = SubjectPublicKeyInfo::from_der(&issuer.tbs.subject_public_key_info) + .expect("parse issuer spki"); + assert!(rem.is_empty()); + let now = time::OffsetDateTime::now_utc(); + + validate_ee_cert_path_with_predecoded_ee( + &ee, + &g.ee_der, + &issuer, + &issuer_spki, + &issuer_crl, + &HashSet::new(), + Some("rsync://example.test/repo/issuer/issuer.cer"), + Some("rsync://example.test/repo/issuer/issuer.crl"), + now, + ) + .expect("predecoded ee path ok"); + + let mut revoked = HashSet::new(); + revoked.insert(BigUnsigned::from_biguint(&ee.tbs.serial_number).bytes_be); + let err = validate_ee_cert_path_with_predecoded_ee( + &ee, + &g.ee_der, + &issuer, + &issuer_spki, + &issuer_crl, + &revoked, + Some("rsync://example.test/repo/issuer/issuer.cer"), + Some("rsync://example.test/repo/issuer/issuer.crl"), + now, + ) + .unwrap_err(); + assert!(matches!(err, CertPathError::EeRevoked), "{err}"); +} + #[test] fn validate_ee_cert_path_with_prevalidated_issuer_rejects_non_ee_and_non_ca_issuer() { use rpki::data_model::crl::RpkixCrl; diff --git a/tests/test_multi_rir_case_info.rs b/tests/test_multi_rir_case_info.rs index 8b40f5e..4cda44e 100644 --- a/tests/test_multi_rir_case_info.rs +++ b/tests/test_multi_rir_case_info.rs @@ -1,3 +1,4 @@ +use std::fs; use std::process::Command; fn multi_rir_bundle_root() -> std::path::PathBuf { @@ -71,6 +72,94 @@ fn multi_rir_case_info_resolves_all_five_rirs_and_timings() { } } +#[test] +fn multi_rir_case_info_prefers_lock_validation_time_over_replay_started_at() { + let td = tempfile::tempdir().expect("tempdir"); + let bundle_root = td.path(); + let rir_root = bundle_root.join("apnic"); + fs::create_dir_all(rir_root.join("base-payload-archive")).expect("base archive dir"); + fs::create_dir_all(rir_root.join("payload-delta-archive")).expect("delta archive dir"); + fs::create_dir_all(rir_root.join("timings")).expect("timings dir"); + + fs::write( + bundle_root.join("timing-summary.json"), + r#"{"apnic":{"durations":{"base-replay":1.5,"delta-replay":2.5}}}"#, + ) + .expect("write timing summary"); + fs::write( + rir_root.join("base-locks.json"), + r#"{"version":1,"capture":"base-cap","validationTime":"2026-03-16T11:49:15+08:00","rrdp":{},"rsync":{}}"#, + ) + .expect("write base locks"); + fs::write( + rir_root.join("locks-delta.json"), + r#"{"version":1,"capture":"delta-cap","baseCapture":"base-cap","baseLocksSha256":"deadbeef","validationTime":"2026-03-16T12:14:10+08:00","rrdp":{},"rsync":{}}"#, + ) + .expect("write delta locks"); + fs::write( + rir_root.join("timings/base-replay.json"), + r#"{"startedAt":"2099-01-01T00:00:00Z","durationSeconds":1.5}"#, + ) + .expect("write base timing"); + fs::write( + rir_root.join("timings/delta-replay.json"), + r#"{"startedAt":"2099-01-02T00:00:00Z","durationSeconds":2.5}"#, + ) + .expect("write delta timing"); + for rel in [ + "base-vrps.csv", + "record-delta.csv", + "replay-delta.csv", + "verification.json", + "README.md", + ] { + fs::write(rir_root.join(rel), "placeholder +").expect("write required file"); + } + + let repo_root = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let out = Command::new("python3") + .arg(helper_script()) + .args([ + "--bundle-root", + bundle_root.to_string_lossy().as_ref(), + "--repo-root", + repo_root.to_string_lossy().as_ref(), + "--rir", + "apnic", + ]) + .output() + .expect("run helper script"); + + assert!( + out.status.success(), + "helper failed: status={} +stdout={} +stderr={}", + out.status, + String::from_utf8_lossy(&out.stdout), + String::from_utf8_lossy(&out.stderr) + ); + + let json: serde_json::Value = serde_json::from_slice(&out.stdout).expect("parse helper json"); + assert_eq!( + json["validation_times"]["snapshot"].as_str(), + Some("2026-03-16T11:49:15+08:00") + ); + assert_eq!( + json["validation_times"]["delta"].as_str(), + Some("2026-03-16T12:14:10+08:00") + ); + assert_eq!( + json["timing_started_at"]["snapshot_replay"].as_str(), + Some("2099-01-01T00:00:00Z") + ); + assert_eq!( + json["timing_started_at"]["delta_replay"].as_str(), + Some("2099-01-02T00:00:00Z") + ); +} + #[test] fn multi_rir_wrapper_describe_mode_works_for_ripe() { let bundle_root = multi_rir_bundle_root(); diff --git a/tests/test_objects_process_publication_point_snapshot.rs b/tests/test_objects_process_publication_point_snapshot.rs index 4d1241d..c093add 100644 --- a/tests/test_objects_process_publication_point_snapshot.rs +++ b/tests/test_objects_process_publication_point_snapshot.rs @@ -1,6 +1,6 @@ use rpki::fetch::rsync::LocalDirRsyncFetcher; use rpki::policy::{Policy, SignedObjectFailurePolicy, SyncPreference}; -use rpki::storage::{PackFile, PackTime, RocksStore}; +use rpki::storage::{PackFile, PackTime, RocksStore, VcirOutputType}; use rpki::sync::repo::sync_publication_point; use rpki::sync::rrdp::Fetcher; use rpki::validation::manifest::process_manifest_publication_point; @@ -403,4 +403,45 @@ fn process_snapshot_for_issuer_drop_publication_point_on_invalid_aspa_bytes() { assert!(!out.warnings.is_empty()); } + +#[test] +fn process_snapshot_for_issuer_populates_local_outputs_cache_from_real_cernet_fixture() { + let (dir, rsync_base_uri, manifest_file) = cernet_fixture(); + let manifest_rsync_uri = format!("{rsync_base_uri}{manifest_file}"); + let validation_time = validation_time_from_manifest_fixture(&dir, &manifest_file); + + let pack = build_publication_point_snapshot_from_local_rsync_fixture( + &dir, + &rsync_base_uri, + &manifest_rsync_uri, + validation_time, + ); + + let issuer_ca_der = issuer_ca_fixture(); + let issuer_ca = rpki::data_model::rc::ResourceCertificate::decode_der(&issuer_ca_der) + .expect("decode issuer ca"); + + let out = process_publication_point_snapshot_for_issuer( + &pack, + &Policy::default(), + &issuer_ca_der, + Some(issuer_ca_rsync_uri()), + issuer_ca.tbs.extensions.ip_resources.as_ref(), + issuer_ca.tbs.extensions.as_resources.as_ref(), + validation_time, + None, + ); + + assert!(!out.local_outputs_cache.is_empty(), "expected cached VCIR local outputs"); + assert_eq!(out.local_outputs_cache.len(), out.vrps.len()); + assert!(out + .local_outputs_cache + .iter() + .all(|entry| entry.output_type == VcirOutputType::Vrp)); + assert!(out + .local_outputs_cache + .iter() + .all(|entry| entry.source_object_type == "roa")); +} + // NOTE: DN-based issuer resolution and pack-local CA indexing have been removed for determinism. diff --git a/tests/test_payload_replay_tools.rs b/tests/test_payload_replay_tools.rs index 880a454..aa4a301 100644 --- a/tests/test_payload_replay_tools.rs +++ b/tests/test_payload_replay_tools.rs @@ -284,3 +284,28 @@ fn write_multi_rir_summary_aggregates_case_reports() { assert!(md.contains("Multi-RIR Replay Summary"), "{md}"); assert!(md.contains("| afrinic | true | 10.000 | 5.000 | 2.000 | true | 12.000 | 6.000 | 3.000 |"), "{md}"); } + +#[test] +fn apnic_snapshot_profile_script_dry_run_builds_command() { + let script = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("scripts/payload_replay/run_apnic_snapshot_replay_profile.sh"); + let out = Command::new("bash") + .env("DRY_RUN", "1") + .arg(&script) + .output() + .expect("run profile script dry-run"); + + assert!( + out.status.success(), + "script failed: status={}\nstdout={}\nstderr={}", + out.status, + String::from_utf8_lossy(&out.stdout), + String::from_utf8_lossy(&out.stderr) + ); + + let stdout = String::from_utf8_lossy(&out.stdout); + assert!(stdout.contains("--payload-replay-archive"), "{stdout}"); + assert!(stdout.contains("--payload-replay-locks"), "{stdout}"); + assert!(stdout.contains("--analyze"), "{stdout}"); + assert!(stdout.contains("--profile-cpu"), "{stdout}"); +} diff --git a/tests/test_tree_failure_handling.rs b/tests/test_tree_failure_handling.rs index b48af6d..483895a 100644 --- a/tests/test_tree_failure_handling.rs +++ b/tests/test_tree_failure_handling.rs @@ -112,6 +112,7 @@ fn tree_continues_when_a_publication_point_fails() { objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), @@ -136,6 +137,7 @@ fn tree_continues_when_a_publication_point_fails() { objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), diff --git a/tests/test_tree_traversal_m14.rs b/tests/test_tree_traversal_m14.rs index c39bc28..c2329a9 100644 --- a/tests/test_tree_traversal_m14.rs +++ b/tests/test_tree_traversal_m14.rs @@ -122,6 +122,7 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() { objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), @@ -142,6 +143,7 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() { objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), @@ -162,6 +164,7 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() { objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), @@ -182,6 +185,7 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() { objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), @@ -239,6 +243,7 @@ fn tree_respects_max_depth_and_max_instances() { objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), @@ -259,6 +264,7 @@ fn tree_respects_max_depth_and_max_instances() { objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), @@ -308,6 +314,7 @@ fn tree_audit_includes_parent_and_discovered_from_for_non_root_nodes() { objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), @@ -328,6 +335,7 @@ fn tree_audit_includes_parent_and_discovered_from_for_non_root_nodes() { objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), @@ -380,6 +388,7 @@ fn tree_prefers_lexicographically_first_discovery_when_duplicate_manifest_is_que objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(), @@ -400,6 +409,7 @@ fn tree_prefers_lexicographically_first_discovery_when_duplicate_manifest_is_que objects: ObjectsOutput { vrps: Vec::new(), aspas: Vec::new(), + local_outputs_cache: Vec::new(), warnings: Vec::new(), stats: ObjectsStats::default(), audit: Vec::new(),