20260428 降低all5 CIR replay内存峰值

This commit is contained in:
yuyr 2026-04-28 09:58:43 +08:00
parent 0295fd3262
commit 3b2a160c5c
23 changed files with 965 additions and 142 deletions

View File

@ -10,7 +10,11 @@ Usage:
--out-dir <path> \
--reference-ccr <path> \
[--keep-db] \
[--write-actual-ccr] \
[--write-report-json] \
[--report-json-compact] \
[--phase2-object-workers <n>] \
[--phase2-worker-queue-capacity <n>] \
[--rpki-bin <path>] \
[--real-rsync-bin <path>]
EOF
@ -23,7 +27,11 @@ REPO_BYTES_DB=""
OUT_DIR=""
REFERENCE_CCR=""
KEEP_DB=0
WRITE_ACTUAL_CCR=0
WRITE_REPORT_JSON=0
REPORT_JSON_COMPACT=0
PHASE2_OBJECT_WORKERS="${CIR_REPLAY_PHASE2_OBJECT_WORKERS:-4}"
PHASE2_WORKER_QUEUE_CAPACITY="${CIR_REPLAY_PHASE2_WORKER_QUEUE_CAPACITY:-64}"
RPKI_BIN="${RPKI_BIN:-$ROOT_DIR/target/release/rpki}"
CIR_MATERIALIZE_BIN="${CIR_MATERIALIZE_BIN:-$ROOT_DIR/target/release/cir_materialize}"
CIR_EXTRACT_INPUTS_BIN="${CIR_EXTRACT_INPUTS_BIN:-$ROOT_DIR/target/release/cir_extract_inputs}"
@ -38,7 +46,11 @@ while [[ $# -gt 0 ]]; do
--out-dir) OUT_DIR="$2"; shift 2 ;;
--reference-ccr) REFERENCE_CCR="$2"; shift 2 ;;
--keep-db) KEEP_DB=1; shift ;;
--report-json-compact) REPORT_JSON_COMPACT=1; shift ;;
--write-actual-ccr) WRITE_ACTUAL_CCR=1; shift ;;
--write-report-json) WRITE_REPORT_JSON=1; shift ;;
--report-json-compact) WRITE_REPORT_JSON=1; REPORT_JSON_COMPACT=1; shift ;;
--phase2-object-workers) PHASE2_OBJECT_WORKERS="$2"; shift 2 ;;
--phase2-worker-queue-capacity) PHASE2_WORKER_QUEUE_CAPACITY="$2"; shift 2 ;;
--rpki-bin) RPKI_BIN="$2"; shift 2 ;;
--real-rsync-bin) REAL_RSYNC_BIN="$2"; shift 2 ;;
-h|--help) usage; exit 0 ;;
@ -53,7 +65,13 @@ done
mkdir -p "$OUT_DIR"
needs_build=0
if [[ ! -x "$RPKI_BIN" || ! -x "$CIR_MATERIALIZE_BIN" || ! -x "$CIR_EXTRACT_INPUTS_BIN" || ! -x "$CCR_TO_COMPARE_VIEWS_BIN" ]]; then
needs_build=1
elif [[ "$RPKI_BIN" == "$ROOT_DIR/target/release/rpki" ]] && find "$ROOT_DIR/src" "$ROOT_DIR/Cargo.toml" -newer "$RPKI_BIN" -print -quit | grep -q .; then
needs_build=1
fi
if [[ "$needs_build" -eq 1 ]]; then
(
cd "$ROOT_DIR"
cargo build --release --bin rpki --bin cir_materialize --bin cir_extract_inputs --bin ccr_to_compare_views
@ -65,6 +83,8 @@ TALS_DIR="$TMP_ROOT/tals"
META_JSON="$TMP_ROOT/meta.json"
MIRROR_ROOT="$TMP_ROOT/mirror"
DB_DIR="$TMP_ROOT/work-db"
REPLAY_RAW_STORE_DB="$TMP_ROOT/replay-raw-store.db"
REPLAY_REPO_BYTES_DB="$TMP_ROOT/replay-repo-bytes.db"
ACTUAL_CCR="$OUT_DIR/actual.ccr"
ACTUAL_REPORT="$OUT_DIR/report.json"
ACTUAL_VRPS="$OUT_DIR/actual-vrps.csv"
@ -109,52 +129,117 @@ PY
export REAL_RSYNC_BIN="$REAL_RSYNC_BIN"
export CIR_LOCAL_LINK_MODE=1
REPORT_JSON_ARGS=(--report-json "$ACTUAL_REPORT")
if [[ "$REPORT_JSON_COMPACT" -eq 1 ]]; then
REPORT_JSON_ARGS=(--skip-report-build)
VCIR_ARGS=(--skip-vcir-persist)
if [[ "$WRITE_REPORT_JSON" -eq 1 ]]; then
REPORT_JSON_ARGS=(--report-json "$ACTUAL_REPORT")
if [[ "$REPORT_JSON_COMPACT" -eq 1 ]]; then
REPORT_JSON_ARGS+=(--report-json-compact)
fi
fi
CCR_ARGS=()
if [[ "$WRITE_ACTUAL_CCR" -eq 1 ]]; then
CCR_ARGS=(--ccr-out "$ACTUAL_CCR")
fi
"$RPKI_BIN" \
--db "$DB_DIR" \
--raw-store-db "$REPLAY_RAW_STORE_DB" \
--repo-bytes-db "$REPLAY_REPO_BYTES_DB" \
"${TAL_ARGS[@]}" \
--parallel-phase2-object-workers "$PHASE2_OBJECT_WORKERS" \
--parallel-phase2-worker-queue-capacity "$PHASE2_WORKER_QUEUE_CAPACITY" \
--disable-rrdp \
--rsync-command "$WRAPPER" \
--validation-time "$VALIDATION_TIME" \
--ccr-out "$ACTUAL_CCR" \
"${CCR_ARGS[@]}" \
--vrps-csv-out "$ACTUAL_VRPS" \
--vaps-csv-out "$ACTUAL_VAPS" \
--compare-view-trust-anchor unknown \
"${VCIR_ARGS[@]}" \
"${REPORT_JSON_ARGS[@]}" \
>"$RUN_LOG" 2>&1
"$CCR_TO_COMPARE_VIEWS_BIN" --ccr "$ACTUAL_CCR" --vrps-out "$ACTUAL_VRPS" --vaps-out "$ACTUAL_VAPS" --trust-anchor unknown
# Sort a compare-view CSV in place: the header row stays first, the data
# rows below it are deduplicated and sorted in byte order (LC_ALL=C) so
# that later file-level comparisons are stable across locales.
sort_compare_csv() {
  local csv_path="$1"
  local sorted_tmp="${csv_path}.sorted.tmp"
  head -n 1 "$csv_path" >"$sorted_tmp"
  tail -n +2 "$csv_path" | LC_ALL=C sort -u >>"$sorted_tmp"
  mv "$sorted_tmp" "$csv_path"
}
sort_compare_csv "$ACTUAL_VRPS"
sort_compare_csv "$ACTUAL_VAPS"
"$CCR_TO_COMPARE_VIEWS_BIN" --ccr "$REFERENCE_CCR" --vrps-out "$REF_VRPS" --vaps-out "$REF_VAPS" --trust-anchor unknown
python3 - <<'PY' "$ACTUAL_VRPS" "$REF_VRPS" "$ACTUAL_VAPS" "$REF_VAPS" "$COMPARE_JSON" "$META_JSON"
python3 - <<'PY' "$ACTUAL_VRPS" "$REF_VRPS" "$ACTUAL_VAPS" "$REF_VAPS" "$COMPARE_JSON" "$META_JSON" "$WRITE_REPORT_JSON" "$WRITE_ACTUAL_CCR" "$PHASE2_OBJECT_WORKERS" "$PHASE2_WORKER_QUEUE_CAPACITY"
import csv, json, sys
def rows(path):
with open(path, newline="") as f:
return list(csv.reader(f))[1:]
actual_vrps = {tuple(r) for r in rows(sys.argv[1])}
ref_vrps = {tuple(r) for r in rows(sys.argv[2])}
actual_vaps = {tuple(r) for r in rows(sys.argv[3])}
ref_vaps = {tuple(r) for r in rows(sys.argv[4])}
def next_row(reader):
    """Return the next row of `reader` as a tuple, or None at end of input."""
    for row in reader:
        return tuple(row)
    return None


def compare_sorted_csv(actual_path, ref_path):
    """Stream-merge two sorted compare-view CSVs and summarize differences.

    Both files carry one header row followed by rows already deduplicated
    and sorted with ``LC_ALL=C sort -u``; the lockstep merge therefore keeps
    memory flat regardless of row count.  Returns per-side totals, counts of
    rows unique to each side, up to 20 sample rows per side, and a ``match``
    flag that is True only when the data rows are identical.
    """
    sample_cap = 20
    n_actual = n_ref = only_a = only_r = 0
    sample_a, sample_r = [], []
    with open(actual_path, newline="") as fa, open(ref_path, newline="") as fr:
        reader_a = csv.reader(fa)
        reader_r = csv.reader(fr)
        next(reader_a, None)  # skip header rows
        next(reader_r, None)
        cur_a = next_row(reader_a)
        cur_r = next_row(reader_r)
        while cur_a is not None or cur_r is not None:
            if cur_r is None or (cur_a is not None and cur_a < cur_r):
                # Row present only on the "actual" side.
                n_actual += 1
                only_a += 1
                if len(sample_a) < sample_cap:
                    sample_a.append(list(cur_a))
                cur_a = next_row(reader_a)
            elif cur_a is None or cur_r < cur_a:
                # Row present only on the "reference" side.
                n_ref += 1
                only_r += 1
                if len(sample_r) < sample_cap:
                    sample_r.append(list(cur_r))
                cur_r = next_row(reader_r)
            else:
                # Identical row on both sides: count it and advance both.
                n_actual += 1
                n_ref += 1
                cur_a = next_row(reader_a)
                cur_r = next_row(reader_r)
    return {
        "actual": n_actual,
        "reference": n_ref,
        "only_in_actual": sample_a,
        "only_in_reference": sample_r,
        "only_in_actual_count": only_a,
        "only_in_reference_count": only_r,
        "match": only_a == 0 and only_r == 0,
    }
vrps = compare_sorted_csv(sys.argv[1], sys.argv[2])
vaps = compare_sorted_csv(sys.argv[3], sys.argv[4])
meta = json.load(open(sys.argv[6], encoding="utf-8"))
summary = {
"compareMode": "trust-anchor-agnostic",
"talCount": len(meta["talFiles"]),
"talPaths": [item["path"] for item in meta["talFiles"]],
"vrps": {
"actual": len(actual_vrps),
"reference": len(ref_vrps),
"only_in_actual": sorted(actual_vrps - ref_vrps)[:20],
"only_in_reference": sorted(ref_vrps - actual_vrps)[:20],
"match": actual_vrps == ref_vrps,
"actualCcrWritten": sys.argv[8] == "1",
"reportJsonWritten": sys.argv[7] == "1",
"replayParallelism": {
"phase2ObjectWorkers": int(sys.argv[9]),
"phase2WorkerQueueCapacity": int(sys.argv[10]),
},
"vaps": {
"actual": len(actual_vaps),
"reference": len(ref_vaps),
"only_in_actual": sorted(actual_vaps - ref_vaps)[:20],
"only_in_reference": sorted(ref_vaps - actual_vaps)[:20],
"match": actual_vaps == ref_vaps,
}
"vrps": vrps,
"vaps": vaps,
}
with open(sys.argv[5], "w") as f:
json.dump(summary, f, indent=2)

View File

@ -170,6 +170,9 @@ fn real_main() -> Result<(), String> {
&TreeRunConfig {
max_depth: None,
max_instances: None,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("base replay failed for {rir}: {e}"))?;
@ -221,6 +224,9 @@ fn real_main() -> Result<(), String> {
&TreeRunConfig {
max_depth: None,
max_instances: None,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("delta step replay failed for {rir}/{step_id}: {e}"))?;

View File

@ -202,6 +202,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("live base run failed: {e}"))?;
@ -262,6 +265,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("self replay failed: {e}"))?;

View File

@ -249,6 +249,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("base bootstrap replay failed: {e}"))?;
@ -279,6 +282,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("live target run failed: {e}"))?;
@ -344,6 +350,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("self delta replay failed: {e}"))?;

View File

@ -368,6 +368,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("live base run failed: {e}"))?;
@ -462,6 +465,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("base self replay failed: {e}"))?;
@ -517,6 +523,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("sequence base self replay failed: {e}"))?;
@ -567,6 +576,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("live delta step {step_id} failed: {e}"))?;
@ -666,6 +678,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("sequence self replay failed for {step_id}: {e}"))?;

View File

@ -369,6 +369,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("base replay failed: {e}"))?;
@ -549,6 +552,9 @@ fn run(args: Args) -> Result<PathBuf, String> {
&TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("delta replay failed: {e}"))?;

View File

@ -848,6 +848,9 @@ fn real_main() -> Result<(), String> {
&TreeRunConfig {
max_depth: None,
max_instances: None,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("base replay failed: {e}"))?;
@ -926,6 +929,9 @@ fn real_main() -> Result<(), String> {
&TreeRunConfig {
max_depth: None,
max_instances: None,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.map_err(|e| format!("delta step {} replay failed: {e}", step.id))?;

View File

@ -24,7 +24,7 @@ fn normalize_asn(asn: u32) -> String {
format!("AS{asn}")
}
fn canonical_prefix(prefix: &crate::data_model::roa::IpPrefix) -> String {
pub fn canonical_vrp_prefix(prefix: &crate::data_model::roa::IpPrefix) -> String {
let mut addr = prefix.addr_bytes().to_vec();
let total_bits = match prefix.afi {
crate::data_model::roa::RoaAfi::Ipv4 => 32usize,
@ -54,7 +54,7 @@ pub fn build_vrp_compare_rows(vrps: &[Vrp], trust_anchor: &str) -> BTreeSet<VrpC
vrps.iter()
.map(|vrp| VrpCompareRow {
asn: normalize_asn(vrp.asn),
ip_prefix: canonical_prefix(&vrp.prefix),
ip_prefix: canonical_vrp_prefix(&vrp.prefix),
max_length: vrp.max_length.to_string(),
trust_anchor: trust_anchor.to_ascii_lowercase(),
})

View File

@ -5,7 +5,7 @@ pub mod spec;
pub use compare_view::{
VapCompareRow, VrpCompareRow, build_vap_compare_rows, build_vrp_compare_rows,
decode_ccr_compare_views, write_vap_csv, write_vrp_csv,
canonical_vrp_prefix, decode_ccr_compare_views, write_vap_csv, write_vrp_csv,
};
pub use live_capture::{
LiveBaseCaptureSummary, LiveDeltaCaptureSummary, RecordedHttpResponse, RecordedRsyncFetch,

View File

@ -10,6 +10,7 @@ use crate::audit::{
AspaOutput, AuditRepoSyncStats, AuditReportV2, AuditRunMeta, AuditWarning, TreeSummary,
VrpOutput, format_roa_ip_prefix,
};
use crate::bundle::canonical_vrp_prefix;
use crate::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use crate::fetch::rsync::LocalDirRsyncFetcher;
use crate::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher};
@ -38,6 +39,8 @@ struct RunStageTiming {
ccr_build_ms: Option<u64>,
ccr_build_breakdown: Option<CcrBuildBreakdown>,
ccr_write_ms: Option<u64>,
compare_view_build_ms: Option<u64>,
compare_view_write_ms: Option<u64>,
cir_build_cir_ms: Option<u64>,
cir_write_cir_ms: Option<u64>,
cir_total_ms: Option<u64>,
@ -69,7 +72,12 @@ pub struct CliArgs {
pub policy_path: Option<PathBuf>,
pub report_json_path: Option<PathBuf>,
pub report_json_compact: bool,
pub skip_report_build: bool,
pub skip_vcir_persist: bool,
pub ccr_out_path: Option<PathBuf>,
pub vrps_csv_out_path: Option<PathBuf>,
pub vaps_csv_out_path: Option<PathBuf>,
pub compare_view_trust_anchor: Option<String>,
pub cir_enabled: bool,
pub cir_out_path: Option<PathBuf>,
pub cir_static_root: Option<PathBuf>,
@ -114,7 +122,13 @@ Options:
--policy <path> Policy TOML path (optional)
--report-json <path> Write full audit report as JSON (optional)
--report-json-compact Write report JSON without pretty-printing (requires --report-json)
--skip-report-build Skip full audit report construction when --report-json is not requested
--skip-vcir-persist Skip VCIR persistence/projection building for compare-only runs
--ccr-out <path> Write CCR DER ContentInfo to this path (optional)
--vrps-csv-out <path> Write VRP compare-view CSV directly from validation output (optional; requires --vaps-csv-out)
--vaps-csv-out <path> Write VAP compare-view CSV directly from validation output (optional; requires --vrps-csv-out)
--compare-view-trust-anchor <name>
Trust-anchor label used by direct compare-view CSV output (default: unknown)
--cir-enable Export CIR after the run completes
--cir-out <path> Write CIR DER to this path (requires --cir-enable)
--cir-static-root <path> Deprecated; CIR export no longer exports object pools
@ -171,7 +185,12 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let mut policy_path: Option<PathBuf> = None;
let mut report_json_path: Option<PathBuf> = None;
let mut report_json_compact: bool = false;
let mut skip_report_build: bool = false;
let mut skip_vcir_persist: bool = false;
let mut ccr_out_path: Option<PathBuf> = None;
let mut vrps_csv_out_path: Option<PathBuf> = None;
let mut vaps_csv_out_path: Option<PathBuf> = None;
let mut compare_view_trust_anchor: Option<String> = None;
let mut cir_enabled: bool = false;
let mut cir_out_path: Option<PathBuf> = None;
let mut cir_static_root: Option<PathBuf> = None;
@ -291,11 +310,34 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
"--report-json-compact" => {
report_json_compact = true;
}
"--skip-report-build" => {
skip_report_build = true;
}
"--skip-vcir-persist" => {
skip_vcir_persist = true;
}
"--ccr-out" => {
i += 1;
let v = argv.get(i).ok_or("--ccr-out requires a value")?;
ccr_out_path = Some(PathBuf::from(v));
}
"--vrps-csv-out" => {
i += 1;
let v = argv.get(i).ok_or("--vrps-csv-out requires a value")?;
vrps_csv_out_path = Some(PathBuf::from(v));
}
"--vaps-csv-out" => {
i += 1;
let v = argv.get(i).ok_or("--vaps-csv-out requires a value")?;
vaps_csv_out_path = Some(PathBuf::from(v));
}
"--compare-view-trust-anchor" => {
i += 1;
let v = argv
.get(i)
.ok_or("--compare-view-trust-anchor requires a value")?;
compare_view_trust_anchor = Some(v.clone());
}
"--cir-enable" => {
cir_enabled = true;
}
@ -487,6 +529,24 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
usage()
));
}
if skip_report_build && report_json_path.is_some() {
return Err(format!(
"--skip-report-build cannot be combined with --report-json\n\n{}",
usage()
));
}
if vrps_csv_out_path.is_some() != vaps_csv_out_path.is_some() {
return Err(format!(
"--vrps-csv-out and --vaps-csv-out must be provided together\n\n{}",
usage()
));
}
if compare_view_trust_anchor.is_some() && vrps_csv_out_path.is_none() {
return Err(format!(
"--compare-view-trust-anchor requires --vrps-csv-out/--vaps-csv-out\n\n{}",
usage()
));
}
if cir_static_root.is_some() {
return Err(format!(
"--cir-static-root is no longer supported; CIR export now writes only .cir files\n\n{}",
@ -632,7 +692,12 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
policy_path,
report_json_path,
report_json_compact,
skip_report_build,
skip_vcir_persist,
ccr_out_path,
vrps_csv_out_path,
vaps_csv_out_path,
compare_view_trust_anchor,
cir_enabled,
cir_out_path,
cir_static_root,
@ -688,10 +753,12 @@ fn write_json(path: &Path, report: &AuditReportV2, format: ReportJsonFormat) ->
Ok(())
}
fn unique_rrdp_repos(report: &AuditReportV2) -> usize {
fn unique_rrdp_repos_from_publication_points(
publication_points: &[crate::audit::PublicationPointAudit],
) -> usize {
use std::collections::HashSet;
let mut set: HashSet<&str> = HashSet::new();
for pp in &report.publication_points {
for pp in publication_points {
if let Some(u) = pp.rrdp_notification_uri.as_deref() {
set.insert(u);
}
@ -699,6 +766,10 @@ fn unique_rrdp_repos(report: &AuditReportV2) -> usize {
set.len()
}
/// Count the distinct RRDP notification URIs referenced by the report's
/// publication points; thin wrapper over the slice-based helper so callers
/// that only hold the slice (no full report) can share the logic.
fn unique_rrdp_repos(report: &AuditReportV2) -> usize {
    unique_rrdp_repos_from_publication_points(&report.publication_points)
}
fn print_summary(report: &AuditReportV2) {
let rrdp_repos = unique_rrdp_repos(report);
println!("RPKI stage2 serial run summary");
@ -728,6 +799,37 @@ fn print_summary(report: &AuditReportV2) {
);
}
/// Print the end-of-run console summary directly from the shared
/// post-validation output, for runs where the full audit report was not
/// built (the `--skip-report-build` path has no `AuditReportV2` to hand to
/// `print_summary`).
fn print_summary_from_shared(validation_time: time::OffsetDateTime, shared: &PostValidationShared) {
    use time::format_description::well_known::Rfc3339;
    let formatted_time = validation_time
        .to_offset(time::UtcOffset::UTC)
        .format(&Rfc3339)
        .expect("format validation_time");
    let unique_repos = unique_rrdp_repos_from_publication_points(shared.publication_points.as_ref());
    // Tree-level warnings plus every publication point's own warnings.
    let pp_warning_total: usize = shared
        .publication_points
        .iter()
        .map(|pp| pp.warnings.len())
        .sum();
    let warnings_total = shared.tree_warnings.len() + pp_warning_total;
    println!("RPKI stage2 serial run summary");
    println!("validation_time={formatted_time}");
    println!(
        "publication_points_processed={} publication_points_failed={}",
        shared.instances_processed, shared.instances_failed
    );
    println!("rrdp_repos_unique={unique_repos}");
    println!("vrps={}", shared.vrps.len());
    println!("aspas={}", shared.aspas.len());
    println!("audit_publication_points={}", shared.publication_points.len());
    println!("warnings_total={warnings_total}");
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct PostValidationShared {
discovery: crate::validation::from_tal::DiscoveredRootCaInstance,
@ -861,11 +963,21 @@ fn build_report(
#[derive(Clone, Debug, PartialEq, Eq)]
struct ReportTaskOutput {
report: AuditReportV2,
report: Option<AuditReportV2>,
report_build_ms: u64,
report_write_ms: Option<u64>,
}
impl ReportTaskOutput {
fn skipped() -> Self {
Self {
report: None,
report_build_ms: 0,
report_write_ms: None,
}
}
}
fn run_report_task(
policy: &Policy,
validation_time: time::OffsetDateTime,
@ -886,7 +998,7 @@ fn run_report_task(
};
Ok(ReportTaskOutput {
report,
report: Some(report),
report_build_ms,
report_write_ms,
})
@ -950,6 +1062,98 @@ fn run_ccr_task(
})
}
/// Timing captured by `run_compare_view_task`; both fields are `None` when
/// no compare-view CSV outputs were requested for this run.
#[derive(Clone, Debug, PartialEq, Eq)]
struct CompareViewTaskOutput {
    /// Always `Some(0)` when outputs were written: rows are formatted while
    /// streaming to disk, so there is no separate build phase to time.
    build_ms: Option<u64>,
    /// Wall-clock milliseconds covering both CSV writes.
    write_ms: Option<u64>,
}
/// Write the VRP/VAP compare-view CSVs straight from shared validation
/// output, bypassing the full audit report.
///
/// Runs only when both output paths are supplied (CLI parsing enforces that
/// they come together); otherwise returns `None` timings.  `build_ms` is
/// reported as 0 because rows are formatted while streaming to disk, and
/// `write_ms` covers both file writes.
fn run_compare_view_task(
    shared: &PostValidationShared,
    vrps_csv_out_path: Option<&Path>,
    vaps_csv_out_path: Option<&Path>,
    trust_anchor: &str,
) -> Result<CompareViewTaskOutput, String> {
    let (Some(vrps_path), Some(vaps_path)) = (vrps_csv_out_path, vaps_csv_out_path) else {
        return Ok(CompareViewTaskOutput {
            build_ms: None,
            write_ms: None,
        });
    };
    let started = std::time::Instant::now();
    write_direct_vrp_csv(vrps_path, shared.vrps.as_ref(), trust_anchor)?;
    write_direct_vap_csv(vaps_path, shared.aspas.as_ref(), trust_anchor)?;
    let elapsed_ms = started.elapsed().as_millis() as u64;
    eprintln!(
        "wrote compare views: vrps={} vaps={}",
        vrps_path.display(),
        vaps_path.display()
    );
    Ok(CompareViewTaskOutput {
        build_ms: Some(0),
        write_ms: Some(elapsed_ms),
    })
}
/// Write the VRP compare-view CSV: one row per VRP with normalized ASN,
/// canonical prefix text, max length, and the lowercased trust-anchor
/// label.  Creates parent directories as needed.
fn write_direct_vrp_csv(
    path: &Path,
    vrps: &[crate::validation::objects::Vrp],
    trust_anchor: &str,
) -> Result<(), String> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|e| format!("create parent dirs failed: {}: {e}", parent.display()))?;
    }
    let file = std::fs::File::create(path)
        .map_err(|e| format!("create file failed: {}: {e}", path.display()))?;
    let mut writer = BufWriter::new(file);
    use std::io::Write;
    let trust_anchor = trust_anchor.to_ascii_lowercase();
    writeln!(writer, "ASN,IP Prefix,Max Length,Trust Anchor").map_err(|e| e.to_string())?;
    for vrp in vrps {
        writeln!(
            writer,
            "AS{},{},{},{}",
            vrp.asn,
            canonical_vrp_prefix(&vrp.prefix),
            vrp.max_length,
            trust_anchor
        )
        .map_err(|e| e.to_string())?;
    }
    // Flush explicitly: BufWriter's Drop discards flush errors, which would
    // otherwise let a failed/short write produce a silently truncated CSV.
    writer.flush().map_err(|e| e.to_string())?;
    Ok(())
}
/// Write the VAP (ASPA) compare-view CSV: one row per attestation with the
/// provider ASNs sorted, deduplicated, and joined with `;`, plus the
/// lowercased trust-anchor label.  Creates parent directories as needed.
fn write_direct_vap_csv(
    path: &Path,
    aspas: &[crate::validation::objects::AspaAttestation],
    trust_anchor: &str,
) -> Result<(), String> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|e| format!("create parent dirs failed: {}: {e}", parent.display()))?;
    }
    let file = std::fs::File::create(path)
        .map_err(|e| format!("create file failed: {}: {e}", path.display()))?;
    let mut writer = BufWriter::new(file);
    use std::io::Write;
    let trust_anchor = trust_anchor.to_ascii_lowercase();
    writeln!(writer, "Customer ASN,Providers,Trust Anchor").map_err(|e| e.to_string())?;
    for aspa in aspas {
        // Canonicalize the provider set so byte-wise CSV comparison is stable.
        let mut providers = aspa.provider_as_ids.clone();
        providers.sort_unstable();
        providers.dedup();
        let providers = providers
            .into_iter()
            .map(|asn| format!("AS{asn}"))
            .collect::<Vec<_>>()
            .join(";");
        writeln!(
            writer,
            "AS{},{},{}",
            aspa.customer_as_id, providers, trust_anchor
        )
        .map_err(|e| e.to_string())?;
    }
    // Flush explicitly: BufWriter's Drop discards flush errors, which would
    // otherwise let a failed/short write produce a silently truncated CSV.
    writer.flush().map_err(|e| e.to_string())?;
    Ok(())
}
fn write_stage_timing(
report_json_path: Option<&Path>,
stage_timing: &RunStageTiming,
@ -1140,6 +1344,11 @@ pub fn run(argv: &[String]) -> Result<(), String> {
let config = TreeRunConfig {
max_depth: args.max_depth,
max_instances: args.max_instances,
compact_audit: args.skip_report_build
&& args.report_json_path.is_none()
&& !args.cir_enabled,
persist_vcir: !args.skip_vcir_persist,
build_ccr_accumulator: args.ccr_out_path.is_some(),
};
let replay_mode = args.payload_replay_archive.is_some();
let delta_replay_mode = args.payload_base_archive.is_some();
@ -1424,7 +1633,18 @@ pub fn run(argv: &[String]) -> Result<(), String> {
ReportJsonFormat::Pretty
};
let ccr_produced_at = time::OffsetDateTime::now_utc();
let (report_result, ccr_result) = std::thread::scope(|scope| {
let (report_result, ccr_result) = if args.skip_report_build {
(
Ok(ReportTaskOutput::skipped()),
run_ccr_task(
store.as_ref(),
&shared,
args.ccr_out_path.as_deref(),
ccr_produced_at,
),
)
} else {
std::thread::scope(|scope| {
let report_handle = scope.spawn(|| {
run_report_task(
&policy,
@ -1451,7 +1671,8 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|_| "ccr task panicked".to_string())
.and_then(|result| result);
(report_result, ccr_result)
});
})
};
let report_output = report_result?;
let ccr_output = ccr_result?;
let report = report_output.report;
@ -1460,6 +1681,18 @@ pub fn run(argv: &[String]) -> Result<(), String> {
let ccr_build_ms = ccr_output.ccr_build_ms;
let ccr_build_breakdown = ccr_output.ccr_build_breakdown;
let ccr_write_ms = ccr_output.ccr_write_ms;
let compare_view_trust_anchor = args
.compare_view_trust_anchor
.as_deref()
.unwrap_or("unknown");
let compare_view_output = run_compare_view_task(
&shared,
args.vrps_csv_out_path.as_deref(),
args.vaps_csv_out_path.as_deref(),
compare_view_trust_anchor,
)?;
let compare_view_build_ms = compare_view_output.build_ms;
let compare_view_write_ms = compare_view_output.write_ms;
let mut cir_build_cir_ms = None;
let mut cir_write_cir_ms = None;
@ -1516,6 +1749,8 @@ pub fn run(argv: &[String]) -> Result<(), String> {
ccr_build_ms,
ccr_build_breakdown,
ccr_write_ms,
compare_view_build_ms,
compare_view_write_ms,
cir_build_cir_ms,
cir_write_cir_ms,
cir_total_ms,
@ -1528,14 +1763,19 @@ pub fn run(argv: &[String]) -> Result<(), String> {
rsync_download_ms_total,
download_bytes_total,
};
write_stage_timing(args.report_json_path.as_deref(), &stage_timing)?;
let stage_timing_anchor_path = args
.report_json_path
.as_deref()
.or(args.ccr_out_path.as_deref())
.or(args.vrps_csv_out_path.as_deref());
write_stage_timing(stage_timing_anchor_path, &stage_timing)?;
if let Some((out_dir, t)) = timing.as_ref() {
t.record_count("vrps", report.vrps.len() as u64);
t.record_count("aspas", report.aspas.len() as u64);
t.record_count("vrps", shared.vrps.len() as u64);
t.record_count("aspas", shared.aspas.len() as u64);
t.record_count(
"audit_publication_points",
report.publication_points.len() as u64,
shared.publication_points.len() as u64,
);
let timing_json_path = out_dir.join("timing.json");
t.write_json(&timing_json_path, 20)?;
@ -1575,7 +1815,11 @@ pub fn run(argv: &[String]) -> Result<(), String> {
eprintln!("analysis: wrote {}", pb_path.display());
}
print_summary(&report);
if let Some(report) = report.as_ref() {
print_summary(report);
} else {
print_summary_from_shared(validation_time, &shared);
}
Ok(())
}
@ -1701,6 +1945,104 @@ mod tests {
);
}
#[test]
fn parse_accepts_skip_report_build_without_report_json() {
    // --skip-report-build on its own (no --report-json) must parse cleanly
    // and leave the --ccr-out path intact.
    let argv: Vec<String> = [
        "rpki",
        "--db",
        "db",
        "--tal-url",
        "https://example.test/x.tal",
        "--ccr-out",
        "out/result.ccr",
        "--skip-report-build",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect();
    let parsed = parse_args(&argv).expect("parse args");
    assert!(parsed.skip_report_build);
    assert_eq!(
        parsed.ccr_out_path.as_deref(),
        Some(std::path::Path::new("out/result.ccr"))
    );
}
#[test]
fn parse_accepts_skip_vcir_persist() {
    // --skip-vcir-persist is a free-standing flag; parsing just records it.
    let argv: Vec<String> = [
        "rpki",
        "--db",
        "db",
        "--tal-url",
        "https://example.test/x.tal",
        "--skip-vcir-persist",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect();
    let parsed = parse_args(&argv).expect("parse args");
    assert!(parsed.skip_vcir_persist);
}
#[test]
fn parse_rejects_skip_report_build_with_report_json() {
    // Combining --skip-report-build with --report-json is contradictory
    // (nothing would be available to write); parsing must fail with a
    // message naming both flags.
    let argv = vec![
        "rpki".to_string(),
        "--db".to_string(),
        "db".to_string(),
        "--tal-url".to_string(),
        "https://example.test/x.tal".to_string(),
        "--report-json".to_string(),
        "out/report.json".to_string(),
        "--skip-report-build".to_string(),
    ];
    let err = parse_args(&argv).expect_err("skip report build with report path should fail");
    assert!(
        err.contains("--skip-report-build cannot be combined with --report-json"),
        "{err}"
    );
}
#[test]
fn parse_accepts_direct_compare_view_csv_outputs() {
    // Supplying both CSV outputs together with the trust-anchor label is
    // the supported combination; all three must land in their CliArgs
    // fields unchanged.
    let argv = vec![
        "rpki".to_string(),
        "--db".to_string(),
        "db".to_string(),
        "--tal-url".to_string(),
        "https://example.test/x.tal".to_string(),
        "--vrps-csv-out".to_string(),
        "out/vrps.csv".to_string(),
        "--vaps-csv-out".to_string(),
        "out/vaps.csv".to_string(),
        "--compare-view-trust-anchor".to_string(),
        "unknown".to_string(),
    ];
    let args = parse_args(&argv).expect("parse args");
    assert_eq!(
        args.vrps_csv_out_path.as_deref(),
        Some(std::path::Path::new("out/vrps.csv"))
    );
    assert_eq!(
        args.vaps_csv_out_path.as_deref(),
        Some(std::path::Path::new("out/vaps.csv"))
    );
    assert_eq!(args.compare_view_trust_anchor.as_deref(), Some("unknown"));
}
#[test]
fn parse_rejects_partial_direct_compare_view_csv_outputs() {
    // --vrps-csv-out and --vaps-csv-out must be given together; supplying
    // only one side must be rejected with a message naming both flags.
    let argv = vec![
        "rpki".to_string(),
        "--db".to_string(),
        "db".to_string(),
        "--tal-url".to_string(),
        "https://example.test/x.tal".to_string(),
        "--vrps-csv-out".to_string(),
        "out/vrps.csv".to_string(),
    ];
    let err = parse_args(&argv).expect_err("partial direct compare view output should fail");
    assert!(
        err.contains("--vrps-csv-out and --vaps-csv-out must be provided together"),
        "{err}"
    );
}
#[test]
fn parse_accepts_external_raw_store_db() {
let argv = vec![
@ -2478,7 +2820,8 @@ mod tests {
.with_rfc_refs(&[crate::report::RfcRef("RFC 6487 §4.8.8.1")])
.with_context("rsync://example.test/repo/pp/"),
],
vrps: vec![crate::validation::objects::Vrp {
vrps: vec![
crate::validation::objects::Vrp {
asn: 64496,
prefix: crate::data_model::roa::IpPrefix {
afi: crate::data_model::roa::RoaAfi::Ipv4,
@ -2486,7 +2829,17 @@ mod tests {
addr: [192, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
},
max_length: 24,
}],
},
crate::validation::objects::Vrp {
asn: 64497,
prefix: crate::data_model::roa::IpPrefix {
afi: crate::data_model::roa::RoaAfi::Ipv6,
prefix_len: 48,
addr: [0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
},
max_length: 64,
},
],
aspas: vec![crate::validation::objects::AspaAttestation {
customer_as_id: 64496,
provider_as_ids: vec![64497, 64498],
@ -2562,7 +2915,7 @@ mod tests {
let report = build_report(&policy, validation_time, &shared);
assert_eq!(unique_rrdp_repos(&report), 2);
assert_eq!(report.vrps.len(), 1);
assert_eq!(report.vrps.len(), 2);
assert_eq!(report.aspas.len(), 1);
print_summary(&report);
@ -2584,8 +2937,9 @@ mod tests {
)
.expect("run report task");
assert_eq!(report_output.report.vrps.len(), 1);
assert_eq!(report_output.report.aspas.len(), 1);
let report = report_output.report.as_ref().expect("report built");
assert_eq!(report.vrps.len(), 2);
assert_eq!(report.aspas.len(), 1);
assert!(report_output.report_write_ms.is_some());
let report_json = std::fs::read_to_string(&report_path).expect("read report json");
@ -2598,23 +2952,59 @@ mod tests {
ccr_build_ms: Some(2),
ccr_build_breakdown: None,
ccr_write_ms: Some(3),
cir_build_cir_ms: Some(4),
cir_write_cir_ms: Some(5),
cir_total_ms: Some(6),
total_ms: 7,
compare_view_build_ms: Some(4),
compare_view_write_ms: Some(5),
cir_build_cir_ms: Some(6),
cir_write_cir_ms: Some(7),
cir_total_ms: Some(8),
total_ms: 9,
publication_points: shared.publication_points.len(),
repo_sync_ms_total: 8,
publication_point_repo_sync_ms_total: 9,
download_event_count: 10,
rrdp_download_ms_total: 11,
rsync_download_ms_total: 12,
download_bytes_total: 13,
repo_sync_ms_total: 10,
publication_point_repo_sync_ms_total: 11,
download_event_count: 12,
rrdp_download_ms_total: 13,
rsync_download_ms_total: 14,
download_bytes_total: 15,
};
write_stage_timing(Some(&report_path), &stage_timing).expect("write stage timing");
let stage_timing_json =
std::fs::read_to_string(dir.path().join("stage-timing.json")).expect("read timing");
assert!(stage_timing_json.contains("\"validation_ms\""));
assert!(stage_timing_json.contains("\"ccr_build_ms\""));
let ccr_path = dir.path().join("result.ccr");
write_stage_timing(Some(&ccr_path), &stage_timing)
.expect("write stage timing via ccr path");
assert!(
dir.path().join("stage-timing.json").exists(),
"stage timing should use parent directory of the anchor path"
);
let skipped = ReportTaskOutput::skipped();
assert!(skipped.report.is_none());
assert_eq!(skipped.report_build_ms, 0);
assert!(skipped.report_write_ms.is_none());
}
#[test]
fn run_compare_view_task_writes_csv_from_shared_output() {
    // End-to-end over a synthetic PostValidationShared: both CSVs must land
    // on disk with the expected headers, canonical prefix text (IPv4 and
    // IPv6), and sorted/deduplicated ';'-joined providers.
    let shared = synthetic_post_validation_shared();
    let dir = tempfile::tempdir().expect("tmpdir");
    let vrps_path = dir.path().join("vrps.csv");
    let vaps_path = dir.path().join("vaps.csv");
    let output = run_compare_view_task(&shared, Some(&vrps_path), Some(&vaps_path), "unknown")
        .expect("write direct compare views");
    // Timings are Some(...) whenever both output paths were supplied.
    assert!(output.build_ms.is_some());
    assert!(output.write_ms.is_some());
    let vrps_csv = std::fs::read_to_string(vrps_path).expect("read vrps csv");
    let vaps_csv = std::fs::read_to_string(vaps_path).expect("read vaps csv");
    assert!(vrps_csv.contains("ASN,IP Prefix,Max Length,Trust Anchor"));
    assert!(vrps_csv.contains("AS64496,192.0.2.0/24,24,unknown"));
    assert!(vrps_csv.contains("AS64497,2001:db8::/48,64,unknown"));
    assert!(vaps_csv.contains("Customer ASN,Providers,Trust Anchor"));
    assert!(vaps_csv.contains("AS64496,AS64497;AS64498,unknown"));
}
#[test]

View File

@ -12,7 +12,7 @@ use crate::sync::rrdp::sync_from_notification_with_timing_and_download_log;
use crate::sync::rrdp::{Fetcher as HttpFetcher, RrdpSyncError, load_rrdp_local_state};
use crate::sync::store_projection::{
build_repository_view_present_entry, build_repository_view_withdrawn_entry,
prepare_repo_bytes_batch,
prepare_repo_bytes_batch_owned,
};
use std::collections::HashSet;
@ -637,7 +637,7 @@ fn rsync_sync_into_current_store(
.as_ref()
.map(|t| t.span_phase("rsync_write_current_store_total"));
let prepared_bytes =
prepare_repo_bytes_batch(&fetched_objects).map_err(RepoSyncError::Storage)?;
prepare_repo_bytes_batch_owned(fetched_objects).map_err(RepoSyncError::Storage)?;
let mut repository_view_entries = Vec::new();
for entry in existing_view {
if !new_set.contains(&entry.rsync_uri) {

View File

@ -20,6 +20,12 @@ pub struct PreparedRepoBytesBatch {
/// Borrowed-slice convenience wrapper around [`prepare_repo_bytes_batch_owned`].
///
/// Clones `(uri, bytes)` pairs once up front; callers that already own their
/// buffers should call the owned variant directly to avoid the copy.
pub fn prepare_repo_bytes_batch(
    objects: &[(String, Vec<u8>)],
) -> Result<PreparedRepoBytesBatch, String> {
    let owned: Vec<(String, Vec<u8>)> = objects.to_vec();
    prepare_repo_bytes_batch_owned(owned)
}
pub fn prepare_repo_bytes_batch_owned(
objects: Vec<(String, Vec<u8>)>,
) -> Result<PreparedRepoBytesBatch, String> {
let mut uri_to_hash: BTreeMap<String, String> = BTreeMap::new();
let mut pending: BTreeMap<String, Vec<u8>> = BTreeMap::new();
@ -28,14 +34,14 @@ pub fn prepare_repo_bytes_batch(
if bytes.is_empty() {
return Err(format!("repo bytes for {uri} must not be empty"));
}
let sha256_hex = compute_sha256_hex(bytes);
let sha256_hex = compute_sha256_hex(&bytes);
uri_to_hash.insert(uri.clone(), sha256_hex.clone());
match pending.entry(sha256_hex) {
std::collections::btree_map::Entry::Vacant(slot) => {
slot.insert(bytes.clone());
slot.insert(bytes);
}
std::collections::btree_map::Entry::Occupied(existing) => {
if existing.get() != bytes {
if existing.get() != &bytes {
return Err(format!(
"repo bytes collision for {uri}: same sha256 maps to different bytes"
));
@ -455,7 +461,7 @@ pub fn now_pack_time() -> PackTime {
#[cfg(test)]
mod tests {
use super::prepare_repo_bytes_batch;
use super::{prepare_repo_bytes_batch, prepare_repo_bytes_batch_owned};
use std::collections::BTreeSet;
#[test]
@ -484,4 +490,21 @@ mod tests {
.collect::<BTreeSet<_>>();
assert_eq!(unique_hashes.len(), 2);
}
#[test]
fn prepare_repo_bytes_batch_owned_deduplicates_without_borrowed_input() {
    // Two distinct URIs carrying identical bytes: the URI map must keep both
    // entries while the shared blob is scheduled for writing only once.
    let uris = [
        "rsync://example.test/repo/a.roa",
        "rsync://example.test/repo/b.roa",
    ];
    let input: Vec<(String, Vec<u8>)> = uris
        .iter()
        .map(|uri| (uri.to_string(), b"same".to_vec()))
        .collect();
    let prepared = prepare_repo_bytes_batch_owned(input).expect("prepare repo bytes");
    assert_eq!(prepared.uri_to_hash.len(), 2);
    assert_eq!(prepared.blobs_to_write.len(), 1);
}
}

View File

@ -134,6 +134,30 @@ pub fn process_publication_point_for_issuer<P: PublicationPointData>(
issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
timing: Option<&TimingHandle>,
) -> ObjectsOutput {
process_publication_point_for_issuer_with_options(
publication_point,
policy,
issuer_ca_der,
issuer_ca_rsync_uri,
issuer_effective_ip,
issuer_effective_as,
validation_time,
timing,
true,
)
}
pub fn process_publication_point_for_issuer_with_options<P: PublicationPointData>(
publication_point: &P,
policy: &Policy,
issuer_ca_der: &[u8],
issuer_ca_rsync_uri: Option<&str>,
issuer_effective_ip: Option<&crate::data_model::rc::IpResourceSet>,
issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
timing: Option<&TimingHandle>,
collect_vcir_local_outputs: bool,
) -> ObjectsOutput {
let manifest_rsync_uri = publication_point.manifest_rsync_uri();
let manifest_bytes = publication_point.manifest_bytes();
@ -321,12 +345,15 @@ pub fn process_publication_point_for_issuer<P: PublicationPointData>(
issuer_effective_as,
validation_time,
timing,
collect_vcir_local_outputs,
);
match result.outcome {
Ok(mut ok) => {
stats.roa_ok += 1;
vrps.append(&mut ok.vrps);
if collect_vcir_local_outputs {
local_outputs_cache.extend(ok.local_outputs);
}
audit.push(ObjectAuditEntry {
rsync_uri: result.rsync_uri,
sha256_hex: result.sha256_hex,
@ -426,11 +453,14 @@ pub fn process_publication_point_for_issuer<P: PublicationPointData>(
issuer_effective_as,
validation_time,
timing,
collect_vcir_local_outputs,
) {
Ok((att, local_output)) => {
stats.aspa_ok += 1;
aspas.push(att);
if let Some(local_output) = local_output {
local_outputs_cache.push(local_output);
}
audit.push(ObjectAuditEntry {
rsync_uri: file.rsync_uri.clone(),
sha256_hex: sha256_hex_from_32(&file.sha256),
@ -537,10 +567,7 @@ pub fn process_publication_point_for_issuer_parallel_roa<P: PublicationPointData
timing: Option<&TimingHandle>,
config: &ParallelPhase2Config,
) -> ObjectsOutput {
if config.object_workers <= 1
|| policy.signed_object_failure_policy == SignedObjectFailurePolicy::DropPublicationPoint
{
return process_publication_point_for_issuer(
process_publication_point_for_issuer_parallel_roa_with_options(
publication_point,
policy,
issuer_ca_der,
@ -549,13 +576,43 @@ pub fn process_publication_point_for_issuer_parallel_roa<P: PublicationPointData
issuer_effective_as,
validation_time,
timing,
config,
true,
)
}
pub fn process_publication_point_for_issuer_parallel_roa_with_options<P: PublicationPointData>(
publication_point: &P,
policy: &Policy,
issuer_ca_der: &[u8],
issuer_ca_rsync_uri: Option<&str>,
issuer_effective_ip: Option<&crate::data_model::rc::IpResourceSet>,
issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
timing: Option<&TimingHandle>,
config: &ParallelPhase2Config,
collect_vcir_local_outputs: bool,
) -> ObjectsOutput {
if config.object_workers <= 1
|| policy.signed_object_failure_policy == SignedObjectFailurePolicy::DropPublicationPoint
{
return process_publication_point_for_issuer_with_options(
publication_point,
policy,
issuer_ca_der,
issuer_ca_rsync_uri,
issuer_effective_ip,
issuer_effective_as,
validation_time,
timing,
collect_vcir_local_outputs,
);
}
let pool = match ParallelRoaWorkerPool::new(config) {
Ok(pool) => pool,
Err(_) => {
return process_publication_point_for_issuer(
return process_publication_point_for_issuer_with_options(
publication_point,
policy,
issuer_ca_der,
@ -564,11 +621,12 @@ pub fn process_publication_point_for_issuer_parallel_roa<P: PublicationPointData
issuer_effective_as,
validation_time,
timing,
collect_vcir_local_outputs,
);
}
};
process_publication_point_for_issuer_parallel_roa_with_pool(
process_publication_point_for_issuer_parallel_roa_with_pool_options(
publication_point,
policy,
issuer_ca_der,
@ -578,6 +636,7 @@ pub fn process_publication_point_for_issuer_parallel_roa<P: PublicationPointData
validation_time,
timing,
&pool,
collect_vcir_local_outputs,
)
}
@ -592,8 +651,7 @@ pub fn process_publication_point_for_issuer_parallel_roa_with_pool<P: Publicatio
timing: Option<&TimingHandle>,
pool: &ParallelRoaWorkerPool,
) -> ObjectsOutput {
if policy.signed_object_failure_policy == SignedObjectFailurePolicy::DropPublicationPoint {
return process_publication_point_for_issuer(
process_publication_point_for_issuer_parallel_roa_with_pool_options(
publication_point,
policy,
issuer_ca_der,
@ -602,6 +660,36 @@ pub fn process_publication_point_for_issuer_parallel_roa_with_pool<P: Publicatio
issuer_effective_as,
validation_time,
timing,
pool,
true,
)
}
pub fn process_publication_point_for_issuer_parallel_roa_with_pool_options<
P: PublicationPointData,
>(
publication_point: &P,
policy: &Policy,
issuer_ca_der: &[u8],
issuer_ca_rsync_uri: Option<&str>,
issuer_effective_ip: Option<&crate::data_model::rc::IpResourceSet>,
issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
timing: Option<&TimingHandle>,
pool: &ParallelRoaWorkerPool,
collect_vcir_local_outputs: bool,
) -> ObjectsOutput {
if policy.signed_object_failure_policy == SignedObjectFailurePolicy::DropPublicationPoint {
return process_publication_point_for_issuer_with_options(
publication_point,
policy,
issuer_ca_der,
issuer_ca_rsync_uri,
issuer_effective_ip,
issuer_effective_as,
validation_time,
timing,
collect_vcir_local_outputs,
);
}
@ -615,9 +703,10 @@ pub fn process_publication_point_for_issuer_parallel_roa_with_pool<P: Publicatio
validation_time,
timing,
pool,
collect_vcir_local_outputs,
)
.unwrap_or_else(|_| {
process_publication_point_for_issuer(
process_publication_point_for_issuer_with_options(
publication_point,
policy,
issuer_ca_der,
@ -626,6 +715,7 @@ pub fn process_publication_point_for_issuer_parallel_roa_with_pool<P: Publicatio
issuer_effective_as,
validation_time,
timing,
collect_vcir_local_outputs,
)
})
}
@ -645,6 +735,7 @@ pub(crate) struct OwnedRoaTask {
issuer_effective_ip: Option<crate::data_model::rc::IpResourceSet>,
issuer_effective_as: Option<crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
collect_vcir_local_outputs: bool,
}
#[derive(Clone)]
@ -736,6 +827,7 @@ fn validate_owned_roa_task(task: OwnedRoaTask) -> RoaTaskResult {
task.issuer_effective_as.as_ref(),
task.validation_time,
None,
task.collect_vcir_local_outputs,
)
.map(|(vrps, local_outputs)| RoaTaskOk {
vrps,
@ -769,6 +861,7 @@ pub(crate) struct ParallelObjectsStage {
issuer_effective_ip: Option<crate::data_model::rc::IpResourceSet>,
issuer_effective_as: Option<crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
collect_vcir_local_outputs: bool,
warnings: Vec<Warning>,
stats: ObjectsStats,
audit: Vec<ObjectAuditEntry>,
@ -794,6 +887,7 @@ impl ParallelObjectsStage {
issuer_effective_ip: self.issuer_effective_ip.clone(),
issuer_effective_as: self.issuer_effective_as.clone(),
validation_time: self.validation_time,
collect_vcir_local_outputs: self.collect_vcir_local_outputs,
})
.collect()
}
@ -811,6 +905,7 @@ pub(crate) fn prepare_publication_point_for_parallel_roa<P: PublicationPointData
issuer_effective_ip: Option<&crate::data_model::rc::IpResourceSet>,
issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
collect_vcir_local_outputs: bool,
) -> ParallelObjectsPrepare {
let manifest_rsync_uri = publication_point.manifest_rsync_uri();
let manifest_bytes = publication_point.manifest_bytes();
@ -985,6 +1080,7 @@ pub(crate) fn prepare_publication_point_for_parallel_roa<P: PublicationPointData
issuer_effective_ip: issuer_effective_ip.cloned(),
issuer_effective_as: issuer_effective_as.cloned(),
validation_time,
collect_vcir_local_outputs,
warnings,
stats,
audit,
@ -1025,7 +1121,9 @@ pub(crate) fn reduce_parallel_roa_stage(
Ok(mut ok) => {
stats.roa_ok += 1;
vrps.append(&mut ok.vrps);
if stage.collect_vcir_local_outputs {
local_outputs_cache.extend(ok.local_outputs);
}
audit.push(ObjectAuditEntry {
rsync_uri: result.rsync_uri,
sha256_hex: result.sha256_hex,
@ -1066,11 +1164,14 @@ pub(crate) fn reduce_parallel_roa_stage(
stage.issuer_effective_as.as_ref(),
stage.validation_time,
timing,
stage.collect_vcir_local_outputs,
) {
Ok((att, local_output)) => {
stats.aspa_ok += 1;
aspas.push(att);
if let Some(local_output) = local_output {
local_outputs_cache.push(local_output);
}
audit.push(ObjectAuditEntry {
rsync_uri: file.rsync_uri.clone(),
sha256_hex: sha256_hex_from_32(&file.sha256),
@ -1120,6 +1221,7 @@ fn process_publication_point_for_issuer_parallel_roa_inner<P: PublicationPointDa
validation_time: time::OffsetDateTime,
timing: Option<&TimingHandle>,
pool: &ParallelRoaWorkerPool,
collect_vcir_local_outputs: bool,
) -> Result<ObjectsOutput, String> {
let stage = match prepare_publication_point_for_parallel_roa(
0,
@ -1129,6 +1231,7 @@ fn process_publication_point_for_issuer_parallel_roa_inner<P: PublicationPointDa
issuer_effective_ip,
issuer_effective_as,
validation_time,
collect_vcir_local_outputs,
) {
ParallelObjectsPrepare::Complete(out) => return Ok(out),
ParallelObjectsPrepare::Staged(stage) => stage,
@ -1279,6 +1382,7 @@ pub(crate) fn validate_roa_task_serial(
issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
timing: Option<&TimingHandle>,
collect_vcir_local_outputs: bool,
) -> RoaTaskResult {
let sha256_hex = sha256_hex_from_32(&task.file.sha256);
let outcome = process_roa_with_issuer(
@ -1294,6 +1398,7 @@ pub(crate) fn validate_roa_task_serial(
issuer_effective_as,
validation_time,
timing,
collect_vcir_local_outputs,
)
.map(|(vrps, local_outputs)| RoaTaskOk {
vrps,
@ -1322,6 +1427,7 @@ fn process_roa_with_issuer(
issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
timing: Option<&TimingHandle>,
collect_vcir_local_outputs: bool,
) -> Result<(Vec<Vrp>, Vec<VcirLocalOutput>), ObjectValidateError> {
let _decode = timing
.as_ref()
@ -1378,6 +1484,9 @@ fn process_roa_with_issuer(
drop(_subset);
let vrps = roa_to_vrps(&roa);
if !collect_vcir_local_outputs {
return Ok((vrps, Vec::new()));
}
let source_object_hash = sha256_hex_from_32(&file.sha256);
let source_ee_cert_hash = crate::audit::sha256_hex(ee.raw_der.as_slice());
let item_effective_until =
@ -1434,6 +1543,7 @@ fn process_roa_with_issuer_parallel_cached(
issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
timing: Option<&TimingHandle>,
collect_vcir_local_outputs: bool,
) -> Result<(Vec<Vrp>, Vec<VcirLocalOutput>), ObjectValidateError> {
let _decode = timing
.as_ref()
@ -1496,6 +1606,9 @@ fn process_roa_with_issuer_parallel_cached(
drop(_subset);
let vrps = roa_to_vrps(&roa);
if !collect_vcir_local_outputs {
return Ok((vrps, Vec::new()));
}
let source_object_hash = sha256_hex_from_32(&file.sha256);
let source_ee_cert_hash = crate::audit::sha256_hex(ee.raw_der.as_slice());
let item_effective_until =
@ -1552,7 +1665,8 @@ fn process_aspa_with_issuer(
issuer_effective_as: Option<&crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
timing: Option<&TimingHandle>,
) -> Result<(AspaAttestation, VcirLocalOutput), ObjectValidateError> {
collect_vcir_local_outputs: bool,
) -> Result<(AspaAttestation, Option<VcirLocalOutput>), ObjectValidateError> {
let _decode = timing
.as_ref()
.map(|t| t.span_phase("objects_aspa_decode_and_validate_total"));
@ -1611,6 +1725,9 @@ fn process_aspa_with_issuer(
customer_as_id: aspa.aspa.customer_as_id,
provider_as_ids: aspa.aspa.provider_as_ids.clone(),
};
if !collect_vcir_local_outputs {
return Ok((attestation, None));
}
let source_object_hash = sha256_hex_from_32(&file.sha256);
let source_ee_cert_hash = crate::audit::sha256_hex(ee.raw_der.as_slice());
let item_effective_until =
@ -1649,7 +1766,7 @@ fn process_aspa_with_issuer(
],
};
Ok((attestation, local_output))
Ok((attestation, Some(local_output)))
}
fn vrp_prefix_to_string(vrp: &Vrp) -> String {

View File

@ -78,6 +78,7 @@ pub fn run_publication_point_once(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let result = runner

View File

@ -125,6 +125,7 @@ fn make_live_runner<'a>(
repo_sync_runtime: Option<Arc<dyn RepoSyncRuntime>>,
parallel_phase2_config: Option<ParallelPhase2Config>,
ccr_accumulator: Option<CcrAccumulator>,
persist_vcir: bool,
) -> Rpkiv1PublicationPointRunner<'a> {
let parallel_roa_worker_pool = parallel_phase2_config
.as_ref()
@ -148,6 +149,7 @@ fn make_live_runner<'a>(
parallel_phase2_config,
parallel_roa_worker_pool,
ccr_accumulator: ccr_accumulator.map(Mutex::new),
persist_vcir,
}
}
@ -300,6 +302,7 @@ pub fn run_tree_from_tal_url_serial(
None,
None,
None,
config.persist_vcir,
);
let root = root_handle_from_trust_anchor(
@ -337,6 +340,7 @@ pub fn run_tree_from_tal_url_serial_audit(
None,
None,
None,
config.persist_vcir,
);
let root = root_handle_from_trust_anchor(
@ -391,6 +395,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
None,
None,
None,
config.persist_vcir,
);
let root = root_handle_from_trust_anchor(
@ -460,7 +465,9 @@ where
Some(current_repo_index),
Some(runtime),
phase2_config,
phase2_enabled.then(|| CcrAccumulator::new(vec![discovery.trust_anchor.clone()])),
(phase2_enabled && config.build_ccr_accumulator)
.then(|| CcrAccumulator::new(vec![discovery.trust_anchor.clone()])),
config.persist_vcir,
);
let root = root_handle_from_trust_anchor(
@ -490,9 +497,7 @@ where
Some(&current_repo_index_for_output),
collect_current_repo_objects,
),
ccr_accumulator: phase2_enabled
.then(|| runner.ccr_accumulator_snapshot())
.flatten(),
ccr_accumulator: runner.ccr_accumulator_snapshot(),
})
}
@ -554,7 +559,7 @@ where
Some(current_repo_index),
Some(runtime),
phase2_config,
phase2_enabled.then(|| {
(phase2_enabled && config.build_ccr_accumulator).then(|| {
CcrAccumulator::new(
discoveries
.iter()
@ -562,6 +567,7 @@ where
.collect::<Vec<_>>(),
)
}),
config.persist_vcir,
);
let TreeRunAuditOutput {
@ -585,9 +591,7 @@ where
Some(&current_repo_index_for_output),
collect_current_repo_objects,
),
ccr_accumulator: phase2_enabled
.then(|| runner.ccr_accumulator_snapshot())
.flatten(),
ccr_accumulator: runner.ccr_accumulator_snapshot(),
})
}
@ -841,6 +845,7 @@ pub fn run_tree_from_tal_and_ta_der_serial(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let root = root_handle_from_trust_anchor(
@ -892,6 +897,7 @@ pub fn run_tree_from_tal_bytes_serial_audit(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let root = root_handle_from_trust_anchor(
@ -960,6 +966,7 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let root = root_handle_from_trust_anchor(
@ -1023,6 +1030,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let root = root_handle_from_trust_anchor(
@ -1087,6 +1095,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let root = root_handle_from_trust_anchor(
@ -1158,6 +1167,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let root = root_handle_from_trust_anchor(
@ -1215,6 +1225,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let root = root_handle_from_trust_anchor(
@ -1289,6 +1300,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let root = root_handle_from_trust_anchor(
@ -1346,6 +1358,7 @@ fn build_payload_replay_runner<'a>(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
}
}
@ -1378,6 +1391,7 @@ fn build_payload_delta_replay_runner<'a>(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
}
}
@ -1410,6 +1424,7 @@ fn build_payload_delta_replay_current_store_runner<'a>(
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
}
}
@ -1888,6 +1903,9 @@ mod replay_api_tests {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.unwrap_err();
@ -1920,6 +1938,9 @@ mod replay_api_tests {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run replay root-only audit");
@ -1962,6 +1983,9 @@ mod replay_api_tests {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run replay root-only audit");
@ -2005,6 +2029,9 @@ mod replay_api_tests {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
&timing,
)
@ -2058,6 +2085,9 @@ mod replay_api_tests {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.unwrap_err();
@ -2087,6 +2117,9 @@ mod replay_api_tests {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.unwrap_err();
@ -2136,6 +2169,9 @@ mod replay_api_tests {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run delta replay root-only audit");
@ -2194,6 +2230,9 @@ mod replay_api_tests {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
&timing,
)

View File

@ -12,6 +12,12 @@ pub struct TreeRunConfig {
pub max_depth: Option<usize>,
/// Max number of CA instances to process.
pub max_instances: Option<usize>,
/// Drop per-object audit payload when the caller only needs validation outputs.
pub compact_audit: bool,
/// Persist per-CA VCIR state for future reuse/delta runs.
pub persist_vcir: bool,
/// Build online CCR manifest projections during phase2 validation.
pub build_ccr_accumulator: bool,
}
impl Default for TreeRunConfig {
@ -19,6 +25,9 @@ impl Default for TreeRunConfig {
Self {
max_depth: None,
max_instances: None,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
}
}
}
@ -192,16 +201,20 @@ pub fn run_tree_serial_audit_multi_root(
};
instances_processed += 1;
warnings.extend(res.warnings.clone());
warnings.extend(res.objects.warnings.clone());
vrps.extend(res.objects.vrps.clone());
aspas.extend(res.objects.aspas.clone());
router_keys.extend(res.objects.router_keys.clone());
warnings.extend(res.warnings);
warnings.extend(res.objects.warnings);
vrps.extend(res.objects.vrps);
aspas.extend(res.objects.aspas);
router_keys.extend(res.objects.router_keys);
let mut audit = res.audit.clone();
let mut audit = res.audit;
audit.node_id = Some(node.id);
audit.parent_node_id = node.parent_id;
audit.discovered_from = node.discovered_from.clone();
if config.compact_audit {
audit.objects.clear();
audit.warnings.clear();
}
publication_points.push(audit);
let mut children = res.discovered_children;

View File

@ -51,17 +51,25 @@ struct FinishedPublicationPoint {
fn compact_phase2_finished_result(
    mut result: PublicationPointRunResult,
    compact_audit: bool,
) -> PublicationPointRunResult {
    // After finalize, phase2 only consumes warnings, objects, audit, and
    // traversal metadata. Releasing the snapshot here stops manifest/files/
    // raw-byte caches from being retained until the end of the run.
    drop(result.snapshot.take());
    if !compact_audit {
        return result;
    }
    // Compact mode: the caller does not need per-object audit payloads or
    // the list of discovered children, so drop them eagerly as well.
    result.audit.objects.clear();
    result.audit.warnings.clear();
    result.objects.audit.clear();
    result.discovered_children.clear();
    result
}
/// `Result`-lifting variant: compacts the success value, passes errors through.
fn compact_phase2_finished_result_result(
    result: Result<PublicationPointRunResult, String>,
    compact_audit: bool,
) -> Result<PublicationPointRunResult, String> {
    match result {
        Ok(ok) => Ok(compact_phase2_finished_result(ok, compact_audit)),
        Err(err) => Err(err),
    }
}
pub fn run_tree_parallel_phase2_audit_multi_root(
@ -150,11 +158,17 @@ pub fn run_tree_parallel_phase2_audit_multi_root(
&mut inflight_publication_points,
&mut finished,
ready,
config.compact_audit,
);
}
flush_pending_roa_dispatch(runner, &mut pending_roa_dispatch)?;
drain_object_results(runner, &mut inflight_publication_points, &mut finished)?;
drain_object_results(
runner,
&mut inflight_publication_points,
&mut finished,
config.compact_audit,
)?;
let repo_poll_timeout = event_poll_timeout(
&ca_queue,
&ready_queue,
@ -204,6 +218,7 @@ fn stage_ready_publication_point(
inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
finished: &mut Vec<FinishedPublicationPoint>,
ready: ReadyCaInstance,
compact_audit: bool,
) {
let publication_point_started = Instant::now();
let mut warnings = ready.repo_outcome.warnings.clone();
@ -229,7 +244,7 @@ fn stage_ready_publication_point(
}
finished.push(FinishedPublicationPoint {
node: ready.node,
result: compact_phase2_finished_result_result(fallback),
result: compact_phase2_finished_result_result(fallback, compact_audit),
});
return;
}
@ -252,6 +267,7 @@ fn stage_ready_publication_point(
ready.node.handle.effective_ip_resources.as_ref(),
ready.node.handle.effective_as_resources.as_ref(),
runner.validation_time,
runner.persist_vcir,
) {
ParallelObjectsPrepare::Complete(mut objects) => {
objects
@ -271,6 +287,7 @@ fn stage_ready_publication_point(
objects,
repo_outcome,
finished,
compact_audit,
);
}
ParallelObjectsPrepare::Staged(objects_stage) => {
@ -299,6 +316,7 @@ fn stage_ready_publication_point(
objects,
repo_outcome,
finished,
compact_audit,
);
}
Err(err) => finished.push(FinishedPublicationPoint {
@ -367,6 +385,7 @@ fn finalize_ready_objects(
objects: ObjectsOutput,
repo_outcome: RepoSyncRuntimeOutcome,
finished: &mut Vec<FinishedPublicationPoint>,
compact_audit: bool,
) {
let result = runner
.finalize_fresh_publication_point_from_reducer(
@ -384,7 +403,7 @@ fn finalize_ready_objects(
.map(|out| out.result);
finished.push(FinishedPublicationPoint {
node,
result: compact_phase2_finished_result_result(result),
result: compact_phase2_finished_result_result(result, compact_audit),
});
}
@ -416,6 +435,7 @@ fn drain_object_results(
runner: &Rpkiv1PublicationPointRunner<'_>,
inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
finished: &mut Vec<FinishedPublicationPoint>,
compact_audit: bool,
) -> Result<(), TreeRunError> {
let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else {
return Ok(());
@ -479,7 +499,7 @@ fn drain_object_results(
);
finished.push(FinishedPublicationPoint {
node: state.node,
result: compact_phase2_finished_result_result(result),
result: compact_phase2_finished_result_result(result, compact_audit),
});
}
Err(err) => finished.push(FinishedPublicationPoint {
@ -572,11 +592,11 @@ fn build_tree_output(mut finished: Vec<FinishedPublicationPoint>) -> TreeRunAudi
match item.result {
Ok(result) => {
instances_processed += 1;
warnings.extend(result.warnings.clone());
warnings.extend(result.objects.warnings.clone());
vrps.extend(result.objects.vrps.clone());
aspas.extend(result.objects.aspas.clone());
router_keys.extend(result.objects.router_keys.clone());
warnings.extend(result.warnings);
warnings.extend(result.objects.warnings);
vrps.extend(result.objects.vrps);
aspas.extend(result.objects.aspas);
router_keys.extend(result.objects.router_keys);
let mut audit: PublicationPointAudit = result.audit;
audit.node_id = Some(item.node.id);
@ -666,15 +686,43 @@ mod tests {
#[test]
fn compact_phase2_finished_result_drops_snapshot() {
    // Fix: a stale pre-refactor call (single-argument form, without the
    // compact_audit flag) was left interleaved with the updated call, making
    // the test body syntactically invalid. Keep only the two-argument form.
    // With compact_audit = false only the snapshot is released.
    let result = compact_phase2_finished_result(sample_result(), false);
    assert!(result.snapshot.is_none());
    assert_eq!(result.source, PublicationPointSource::Fresh);
    assert!(result.discovered_children.is_empty());
}
#[test]
fn compact_phase2_finished_result_can_drop_audit_payload() {
    // Small fixture builder so the two ROA audit entries stay compact.
    let roa_entry = |uri: &str, hex_byte: &str| crate::audit::ObjectAuditEntry {
        rsync_uri: uri.to_string(),
        sha256_hex: hex_byte.repeat(32),
        kind: crate::audit::AuditObjectKind::Roa,
        result: crate::audit::AuditObjectResult::Ok,
        detail: None,
    };
    let mut sample = sample_result();
    sample
        .audit
        .objects
        .push(roa_entry("rsync://example.test/repo/a.roa", "11"));
    sample.audit.warnings.push(crate::audit::AuditWarning {
        message: "warning".to_string(),
        rfc_refs: Vec::new(),
        context: None,
    });
    sample
        .objects
        .audit
        .push(roa_entry("rsync://example.test/repo/b.roa", "22"));
    // compact_audit = true must strip every audit payload collection.
    let result = compact_phase2_finished_result(sample, true);
    assert!(result.audit.objects.is_empty());
    assert!(result.audit.warnings.is_empty());
    assert!(result.objects.audit.is_empty());
}
#[test]
fn compact_phase2_finished_result_result_preserves_err() {
    // Fix: the pre-refactor single-argument call was left in the source
    // alongside the updated two-argument call (two consecutive `let err =`
    // bindings, the first unterminated). Keep only the current form.
    // Errors must pass through compaction untouched.
    let err = compact_phase2_finished_result_result(Err("boom".to_string()), false)
        .expect_err("error should be preserved");
    assert_eq!(err, "boom");
}

View File

@ -44,8 +44,8 @@ use crate::validation::manifest::{
};
use crate::validation::objects::{
AspaAttestation, ParallelRoaWorkerPool, RouterKeyPayload, Vrp,
process_publication_point_for_issuer, process_publication_point_for_issuer_parallel_roa,
process_publication_point_for_issuer_parallel_roa_with_pool,
process_publication_point_for_issuer_parallel_roa_with_options,
process_publication_point_for_issuer_parallel_roa_with_pool_options,
};
use crate::validation::publication_point::PublicationPointSnapshot;
use crate::validation::tree::{
@ -138,6 +138,11 @@ pub struct Rpkiv1PublicationPointRunner<'a> {
pub parallel_phase2_config: Option<ParallelPhase2Config>,
pub parallel_roa_worker_pool: Option<ParallelRoaWorkerPool>,
pub ccr_accumulator: Option<Mutex<CcrAccumulator>>,
/// When false, skip VCIR persistence and per-output VCIR projection building.
///
/// This is intended for replay/compare-only runs where the caller does not need
/// the resulting DB to be reused by a later delta run.
pub persist_vcir: bool,
}
impl<'a> Rpkiv1PublicationPointRunner<'a> {
@ -267,7 +272,8 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
let snapshot_pack_ms = snapshot_pack_started.elapsed().as_millis() as u64;
let persist_vcir_started = std::time::Instant::now();
let persist_vcir_timing = persist_vcir_for_fresh_result_with_timing(
let persist_vcir_timing = if self.persist_vcir {
persist_vcir_for_fresh_result_with_timing(
self.store,
ca,
&pack,
@ -277,7 +283,10 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
&discovered_children,
self.validation_time,
)
.map_err(|e| format!("persist VCIR failed: {e}"))?;
.map_err(|e| format!("persist VCIR failed: {e}"))?
} else {
PersistVcirTimingBreakdown::default()
};
let persist_vcir_ms = persist_vcir_started.elapsed().as_millis() as u64;
// local_outputs_cache only exists to build/persist VCIR. Release it before the
@ -607,7 +616,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
.as_ref()
.map(|t| t.span_phase("objects_processing_total"));
if let Some(phase2_pool) = self.parallel_roa_worker_pool.as_ref() {
process_publication_point_for_issuer_parallel_roa_with_pool(
process_publication_point_for_issuer_parallel_roa_with_pool_options(
&fresh_point,
self.policy,
&ca.ca_certificate_der,
@ -617,9 +626,10 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
self.validation_time,
self.timing.as_ref(),
phase2_pool,
self.persist_vcir,
)
} else if let Some(phase2_config) = self.parallel_phase2_config.as_ref() {
process_publication_point_for_issuer_parallel_roa(
process_publication_point_for_issuer_parallel_roa_with_options(
&fresh_point,
self.policy,
&ca.ca_certificate_der,
@ -629,9 +639,10 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
self.validation_time,
self.timing.as_ref(),
phase2_config,
self.persist_vcir,
)
} else {
process_publication_point_for_issuer(
crate::validation::objects::process_publication_point_for_issuer_with_options(
&fresh_point,
self.policy,
&ca.ca_certificate_der,
@ -640,6 +651,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
ca.effective_as_resources.as_ref(),
self.validation_time,
self.timing.as_ref(),
self.persist_vcir,
)
}
};
@ -3283,6 +3295,7 @@ mod tests {
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: Some(Mutex::new(CcrAccumulator::new(Vec::new()))),
persist_vcir: true,
}
}
@ -4159,6 +4172,7 @@ authorityKeyIdentifier = keyid:always
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let ca = CaInstanceHandle {
depth: 0,
@ -4711,6 +4725,7 @@ authorityKeyIdentifier = keyid:always
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
// For this fixture-driven smoke, we provide the correct issuer CA certificate (the CA for
@ -4873,6 +4888,7 @@ authorityKeyIdentifier = keyid:always
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let first = runner.run_publication_point(&handle).expect("first run ok");
@ -4985,6 +5001,7 @@ authorityKeyIdentifier = keyid:always
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let first = runner.run_publication_point(&handle).expect("first run ok");
@ -5100,6 +5117,7 @@ authorityKeyIdentifier = keyid:always
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let first = runner.run_publication_point(&handle).expect("first run ok");
@ -5187,6 +5205,7 @@ authorityKeyIdentifier = keyid:always
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let first = ok_runner
.run_publication_point(&handle)
@ -5217,6 +5236,7 @@ authorityKeyIdentifier = keyid:always
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let second = bad_runner
.run_publication_point(&handle)
@ -6551,6 +6571,7 @@ authorityKeyIdentifier = keyid:always
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let first = runner_rrdp
.run_publication_point(&handle)
@ -6584,6 +6605,7 @@ authorityKeyIdentifier = keyid:always
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let third = runner_rsync
.run_publication_point(&handle)

View File

@ -185,6 +185,7 @@ fn apnic_tree_full_stats_serial() {
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
};
let stats = RefCell::new(LiveStats::default());
@ -213,6 +214,9 @@ fn apnic_tree_full_stats_serial() {
&TreeRunConfig {
max_depth,
max_instances,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run tree");

View File

@ -36,6 +36,9 @@ fn apnic_tree_depth1_processes_more_than_root() {
&TreeRunConfig {
max_depth: Some(1),
max_instances: Some(2),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run tree from tal");
@ -74,6 +77,9 @@ fn apnic_tree_root_only_processes_root_with_long_timeouts() {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run APNIC root-only");

View File

@ -108,6 +108,9 @@ fn crl_mismatch_drops_publication_point_and_cites_rfc_sections() {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run tree audit");

View File

@ -114,6 +114,9 @@ fn run_tree_from_tal_url_entry_executes_and_records_failure_when_repo_empty() {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run tree");
@ -158,6 +161,9 @@ fn run_tree_from_tal_and_ta_der_entry_executes_and_records_failure_when_repo_emp
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run tree");
@ -210,6 +216,9 @@ fn run_tree_from_tal_url_audit_entry_collects_no_publication_points_when_repo_em
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run tree audit");
@ -250,6 +259,9 @@ fn run_tree_from_tal_and_ta_der_audit_entry_collects_no_publication_points_when_
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run tree audit");
@ -298,6 +310,9 @@ fn run_tree_from_tal_url_audit_with_timing_records_phases_when_repo_empty() {
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
&timing,
)
@ -345,6 +360,9 @@ fn run_tree_from_tal_and_ta_der_audit_with_timing_records_phases_when_repo_empty
&TreeRunConfig {
max_depth: Some(0),
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
&timing,
)

View File

@ -286,6 +286,9 @@ fn tree_respects_max_depth_and_max_instances() {
&TreeRunConfig {
max_depth: Some(0),
max_instances: None,
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run tree depth-limited");
@ -298,6 +301,9 @@ fn tree_respects_max_depth_and_max_instances() {
&TreeRunConfig {
max_depth: None,
max_instances: Some(1),
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
},
)
.expect("run tree instance-limited");