20260605 ROA增量验证和rsync host失败短路

This commit is contained in:
yuyr 2026-06-06 15:16:38 +08:00
parent e597c7c124
commit 4045f9a3a5
27 changed files with 2850 additions and 221 deletions

View File

@ -27,7 +27,7 @@ cleanup() {
}
trap cleanup EXIT
IGNORE_REGEX='src/bin/repository_view_stats\.rs|src/bin/db_stats\.rs|src/bin/rrdp_state_dump\.rs|src/bin/ccr_dump\.rs|src/bin/ccr_verify\.rs|src/bin/ccr_to_routinator_csv\.rs|src/bin/ccr_to_compare_views\.rs|src/bin/cir_materialize\.rs|src/bin/cir_extract_inputs\.rs|src/bin/cir_drop_report\.rs|src/bin/cir_ta_only_fixture\.rs|src/bin/cir_dump_reject_list\.rs|src/bin/rpki_object_parse\.rs|src/bin/triage_ccr_cir_pair\.rs|src/bin/rpki_artifact_metrics\.rs|src/bin/rpki_daemon\.rs|src/bin/sequence_triage_ccr_cir\.rs|src/tools/rpki_artifact_metrics\.rs|src/ccr/compare_view\.rs|src/progress_log\.rs|src/cli\.rs|src/validation/run_tree_from_tal\.rs|src/validation/tree_parallel\.rs|src/validation/tree_runner\.rs|src/validation/from_tal\.rs|src/sync/store_projection\.rs|src/sync/repo\.rs|src/sync/rrdp\.rs|src/storage\.rs|src/cir/materialize\.rs'
IGNORE_REGEX='repository_view_stats\.rs|db_stats\.rs|rrdp_state_dump\.rs|ccr_dump\.rs|ccr_verify\.rs|ccr_to_routinator_csv\.rs|ccr_to_compare_views\.rs|cir_materialize\.rs|cir_extract_inputs\.rs|cir_drop_report\.rs|cir_ta_only_fixture\.rs|cir_dump_reject_list\.rs|rpki_object_parse\.rs|triage_ccr_cir_pair\.rs|rpki_artifact_metrics|rpki_daemon\.rs|sequence_triage_ccr_cir|ccr_state_compare\.rs|cir_state_compare\.rs|cir_probe_rpki_client_cache\.rs|ccr/compare_view\.rs|progress_log\.rs|cli\.rs|validation/run_tree_from_tal\.rs|validation/tree_parallel\.rs|validation/tree_runner|validation/from_tal\.rs|sync/store_projection\.rs|sync/repo\.rs|sync/rrdp|(^|/)storage(/|\.rs$)|cir/materialize\.rs'
# Preserve colored output even though we post-process output by running under a pseudo-TTY.
# We run tests only once, then generate both CLI text + HTML reports without rerunning tests.

View File

@ -25,9 +25,11 @@ OUTPUT_COMPACT_REPORT=1
# 是否复用持久 rsync mirror。1 表示跨 run 复用;失败隔离数据库时也不会清理 mirror。
ALLOW_RSYNC_MIRROR_REUSE=1
# rsync 同步 scope。
# publication-point 表示默认只同步当前发布点module-root 表示扩大到 rsync module 根目录。
RSYNC_SCOPE=publication-point
# rsync 同步/去重 scope。
# host 表示按 rsync host 做失败短路,但实际拉取仍限定当前发布点,避免同一不可达 host 重复等待超时;
# publication-point 表示只按当前发布点去重;
# module-root 表示扩大实际拉取到 rsync module 根目录。
RSYNC_SCOPE=host
# 前一轮失败或不完整时,是否隔离旧数据库和运行态目录后强制下一轮 snapshot。
# 建议保持 1设置为 0 时,检测到前一轮失败会直接停止。
@ -45,6 +47,14 @@ RPKI_PROGRESS_SLOW_SECS=10
# 是否在运行前尝试禁用 rpki-client timer 并杀掉竞争 RP 进程。
DISABLE_COMPETING_RPS=1
# 传给 rpki 子进程的额外参数。多个参数用空格分隔。
# 示例RPKI_EXTRA_ARGS="--enable-roa-validation-cache"
RPKI_EXTRA_ARGS=""
# 是否为每轮输出 timing profile 到 runs/run_xxxx/analyze/timing.json。
# 性能 profile 或打点验证时设置为 1普通 soak 建议保持 0避免额外开销。
RPKI_ANALYZE=0
# 可选覆盖路径;默认由 package 自动推导。
# BIN_DIR="${PACKAGE_ROOT}/bin"
# FIXTURE_DIR="${PACKAGE_ROOT}/fixtures"

View File

@ -17,12 +17,14 @@ RUN_ROOT="${RUN_ROOT:-$PACKAGE_ROOT}"
RETAIN_RUNS="${RETAIN_RUNS:-10}"
OUTPUT_COMPACT_REPORT="${OUTPUT_COMPACT_REPORT:-1}"
ALLOW_RSYNC_MIRROR_REUSE="${ALLOW_RSYNC_MIRROR_REUSE:-1}"
RSYNC_SCOPE="${RSYNC_SCOPE:-publication-point}"
RSYNC_SCOPE="${RSYNC_SCOPE:-host}"
FAILURE_SNAPSHOT_RESET="${FAILURE_SNAPSHOT_RESET:-1}"
DB_STATS_EXACT_EVERY="${DB_STATS_EXACT_EVERY:-3}"
RPKI_PROGRESS_LOG="${RPKI_PROGRESS_LOG:-1}"
RPKI_PROGRESS_SLOW_SECS="${RPKI_PROGRESS_SLOW_SECS:-10}"
DISABLE_COMPETING_RPS="${DISABLE_COMPETING_RPS:-1}"
RPKI_EXTRA_ARGS="${RPKI_EXTRA_ARGS:-}"
RPKI_ANALYZE="${RPKI_ANALYZE:-0}"
BIN_DIR="${BIN_DIR:-$PACKAGE_ROOT/bin}"
FIXTURE_DIR="${FIXTURE_DIR:-$PACKAGE_ROOT/fixtures}"
@ -84,10 +86,10 @@ validate_max_runs() {
validate_rsync_scope() {
case "$RSYNC_SCOPE" in
publication-point|module-root)
host|publication-point|module-root)
;;
*)
die "RSYNC_SCOPE must be publication-point or module-root: $RSYNC_SCOPE"
die "RSYNC_SCOPE must be host, publication-point, or module-root: $RSYNC_SCOPE"
;;
esac
}
@ -394,6 +396,11 @@ build_child_args() {
--vaps-csv-out "{run_out}/vaps.csv"
--compare-view-trust-anchor "$(compare_view_trust_anchor)"
)
if [[ -n "$RPKI_EXTRA_ARGS" ]]; then
# shellcheck disable=SC2206
local extra_args=( $RPKI_EXTRA_ARGS )
CHILD_ARGS+=("${extra_args[@]}")
fi
}
copy_inner_run_outputs() {
@ -541,6 +548,9 @@ run_one_round() {
"$INVALID_DB_PATH" "$INVALID_STATE_PATH" "$INVALID_TMP_PATH" "" "$PACKAGE_ROOT" "$ENV_FILE"
build_child_args
if is_true "$RPKI_ANALYZE"; then
CHILD_ARGS+=(--analysis-out "$run_dir/analyze")
fi
local daemon_args=(
--state-root "$daemon_state_root"
--rpki-bin "$RPKI_BIN"

View File

@ -71,6 +71,14 @@ impl TimingHandle {
.or_insert(inc);
}
pub fn counts_snapshot(&self) -> HashMap<String, u64> {
let g = self.inner.lock().expect("timing lock");
g.counts
.iter()
.map(|(key, value)| ((*key).to_string(), *value))
.collect()
}
/// Record a phase duration directly in nanoseconds.
///
/// This is useful when aggregating sub-phase timings locally (to reduce lock contention)

View File

@ -26,12 +26,15 @@ use crate::policy::{Policy, StrictPolicy};
use crate::storage::{RocksStore, VcirStorageSummary};
use crate::validation::run_tree_from_tal::{
RunTreeFromTalAuditOutput, run_tree_from_multiple_tals_parallel_phase2_audit,
run_tree_from_multiple_tals_parallel_phase2_audit_with_timing,
run_tree_from_tal_and_ta_der_parallel_phase2_audit,
run_tree_from_tal_and_ta_der_parallel_phase2_audit_with_timing,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing,
run_tree_from_tal_url_parallel_phase2_audit,
run_tree_from_tal_url_parallel_phase2_audit_with_timing,
};
use crate::validation::tree::TreeRunConfig;
#[cfg(test)]
@ -40,11 +43,13 @@ use output::{
ReportJsonFormat, run_compare_view_task, write_report_json_from_shared, write_stage_timing,
};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
struct RunStageTiming {
validation_ms: u64,
enable_roa_validation_cache: bool,
report_build_ms: u64,
report_write_ms: Option<u64>,
ccr_build_ms: Option<u64>,
@ -63,6 +68,8 @@ struct RunStageTiming {
rrdp_download_ms_total: u64,
rsync_download_ms_total: u64,
download_bytes_total: u64,
roa_validation_cache: crate::validation::objects::RoaValidationCacheStats,
analysis_counts: HashMap<String, u64>,
vcir_storage_summary_ms: Option<u64>,
vcir_storage: Option<VcirStorageSummary>,
memory_telemetry: Option<MemoryTelemetrySummary>,
@ -115,6 +122,7 @@ pub struct CliArgs {
pub report_json_compact: bool,
pub skip_report_build: bool,
pub skip_vcir_persist: bool,
pub enable_roa_validation_cache: bool,
pub ccr_out_path: Option<PathBuf>,
pub vrps_csv_out_path: Option<PathBuf>,
pub vaps_csv_out_path: Option<PathBuf>,
@ -147,6 +155,7 @@ pub struct CliArgs {
pub validation_time: Option<time::OffsetDateTime>,
pub analyze: bool,
pub analysis_out_path: Option<PathBuf>,
pub profile_cpu: bool,
}
@ -168,6 +177,8 @@ Options:
--report-json-compact Write report JSON without pretty-printing (requires --report-json)
--skip-report-build Skip full audit report construction when --report-json is not requested
--skip-vcir-persist Skip VCIR persistence/projection building for compare-only runs
--enable-roa-validation-cache
Reuse accepted ROA validation outputs from previous VCIR records (default: off)
--ccr-out <path> Write CCR DER ContentInfo to this path (optional)
--vrps-csv-out <path> Write VRP compare-view CSV directly from validation output (optional; requires --vaps-csv-out)
--vaps-csv-out <path> Write VAP compare-view CSV directly from validation output (optional; requires --vrps-csv-out)
@ -218,11 +229,12 @@ Options:
--http-timeout-secs <n> HTTP fetch timeout seconds (default: 20)
--rsync-timeout-secs <n> rsync I/O timeout seconds (default: 60)
--rsync-mirror-root <path> Persist rsync mirrors under this directory (default: disabled)
--rsync-scope <policy> rsync scope policy: publication-point or module-root (default: publication-point)
--rsync-scope <policy> rsync scope policy: host, publication-point, or module-root (default: host)
--max-depth <n> Max CA instance depth (0 = root only)
--max-instances <n> Max number of CA instances to process
--validation-time <rfc3339> Validation time in RFC3339 (default: now UTC)
--analyze Write timing analysis JSON under target/live/analyze/<timestamp>/
--analysis-out <path> Write timing analysis JSON under this directory (implies --analyze)
--profile-cpu (Requires build feature 'profile') Write CPU flamegraph under analyze dir
--help Show this help
@ -246,6 +258,7 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let mut report_json_compact: bool = false;
let mut skip_report_build: bool = false;
let mut skip_vcir_persist: bool = false;
let mut enable_roa_validation_cache: bool = false;
let mut ccr_out_path: Option<PathBuf> = None;
let mut vrps_csv_out_path: Option<PathBuf> = None;
let mut vaps_csv_out_path: Option<PathBuf> = None;
@ -275,6 +288,7 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let mut max_instances: Option<usize> = None;
let mut validation_time: Option<time::OffsetDateTime> = None;
let mut analyze: bool = false;
let mut analysis_out_path: Option<PathBuf> = None;
let mut profile_cpu: bool = false;
let mut i = 1usize;
@ -447,6 +461,9 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
"--skip-vcir-persist" => {
skip_vcir_persist = true;
}
"--enable-roa-validation-cache" => {
enable_roa_validation_cache = true;
}
"--ccr-out" => {
i += 1;
let v = argv.get(i).ok_or("--ccr-out requires a value")?;
@ -606,6 +623,12 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
"--analyze" => {
analyze = true;
}
"--analysis-out" => {
i += 1;
let v = argv.get(i).ok_or("--analysis-out requires a value")?;
analyze = true;
analysis_out_path = Some(PathBuf::from(v));
}
"--profile-cpu" => {
profile_cpu = true;
}
@ -870,6 +893,7 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
report_json_compact,
skip_report_build,
skip_vcir_persist,
enable_roa_validation_cache,
ccr_out_path,
vrps_csv_out_path,
vaps_csv_out_path,
@ -898,6 +922,7 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
max_instances,
validation_time,
analyze,
analysis_out_path,
profile_cpu,
})
}
@ -1004,6 +1029,7 @@ struct PostValidationShared {
aspas: Arc<[crate::validation::objects::AspaAttestation]>,
router_keys: Arc<[crate::validation::objects::RouterKeyPayload]>,
publication_points: Arc<[crate::audit::PublicationPointAudit]>,
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats,
downloads: Arc<[crate::audit::AuditDownloadEvent]>,
download_stats: crate::audit::AuditDownloadStats,
current_repo_objects: Arc<[crate::current_repo_index::CurrentRepoObject]>,
@ -1018,6 +1044,7 @@ impl PostValidationShared {
successful_tal_inputs,
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects,
@ -1043,6 +1070,7 @@ impl PostValidationShared {
aspas: aspas.into(),
router_keys: router_keys.into(),
publication_points: publication_points.into(),
roa_cache_stats,
downloads: downloads.into(),
download_stats,
current_repo_objects: current_repo_objects.into(),
@ -1752,50 +1780,25 @@ where
R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
if args.tal_inputs.len() > 1 {
return run_tree_from_multiple_tals_parallel_phase2_audit(
store,
policy,
args.tal_inputs.clone(),
http,
rsync,
validation_time,
config,
args.parallel_phase1_config.clone(),
args.parallel_phase2_config.clone(),
collect_current_repo_objects,
)
.map_err(|e| e.to_string());
}
match (
args.tal_url.as_ref(),
args.tal_path.as_ref(),
args.ta_path.as_ref(),
) {
(Some(url), _, _) => run_tree_from_tal_url_parallel_phase2_audit(
store,
policy,
url,
http,
rsync,
validation_time,
config,
args.parallel_phase1_config.clone(),
args.parallel_phase2_config.clone(),
collect_current_repo_objects,
)
.map_err(|e| e.to_string()),
(None, Some(tal_path), Some(ta_path)) => {
let tal_bytes = std::fs::read(tal_path)
.map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
let ta_der = std::fs::read(ta_path)
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
run_tree_from_tal_and_ta_der_parallel_phase2_audit(
return if let Some(t) = timing {
run_tree_from_multiple_tals_parallel_phase2_audit_with_timing(
store,
policy,
&tal_bytes,
&ta_der,
None,
args.tal_inputs.clone(),
http,
rsync,
validation_time,
config,
args.parallel_phase1_config.clone(),
args.parallel_phase2_config.clone(),
collect_current_repo_objects,
t,
)
} else {
run_tree_from_multiple_tals_parallel_phase2_audit(
store,
policy,
args.tal_inputs.clone(),
http,
rsync,
validation_time,
@ -1804,6 +1807,81 @@ where
args.parallel_phase2_config.clone(),
collect_current_repo_objects,
)
}
.map_err(|e| e.to_string());
}
match (
args.tal_url.as_ref(),
args.tal_path.as_ref(),
args.ta_path.as_ref(),
) {
(Some(url), _, _) => if let Some(t) = timing {
run_tree_from_tal_url_parallel_phase2_audit_with_timing(
store,
policy,
url,
http,
rsync,
validation_time,
config,
args.parallel_phase1_config.clone(),
args.parallel_phase2_config.clone(),
collect_current_repo_objects,
t,
)
} else {
run_tree_from_tal_url_parallel_phase2_audit(
store,
policy,
url,
http,
rsync,
validation_time,
config,
args.parallel_phase1_config.clone(),
args.parallel_phase2_config.clone(),
collect_current_repo_objects,
)
}
.map_err(|e| e.to_string()),
(None, Some(tal_path), Some(ta_path)) => {
let tal_bytes = std::fs::read(tal_path)
.map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
let ta_der = std::fs::read(ta_path)
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
if let Some(t) = timing {
run_tree_from_tal_and_ta_der_parallel_phase2_audit_with_timing(
store,
policy,
&tal_bytes,
&ta_der,
None,
http,
rsync,
validation_time,
config,
args.parallel_phase1_config.clone(),
args.parallel_phase2_config.clone(),
collect_current_repo_objects,
t,
)
} else {
run_tree_from_tal_and_ta_der_parallel_phase2_audit(
store,
policy,
&tal_bytes,
&ta_der,
None,
http,
rsync,
validation_time,
config,
args.parallel_phase1_config.clone(),
args.parallel_phase2_config.clone(),
collect_current_repo_objects,
)
}
.map_err(|e| e.to_string())
}
(None, Some(tal_path), None) => {
@ -1875,6 +1953,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
&& !args.cir_enabled,
persist_vcir: !args.skip_vcir_persist,
build_ccr_accumulator: args.ccr_out_path.is_some(),
enable_roa_validation_cache: args.enable_roa_validation_cache,
};
let replay_mode = args.payload_replay_archive.is_some();
let delta_replay_mode = args.payload_base_archive.is_some();
@ -1899,11 +1978,13 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| format!("format timestamp failed: {e}"))?
};
let out_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("target")
.join("live")
.join("analyze")
.join(ts_compact);
let out_dir = args.analysis_out_path.clone().unwrap_or_else(|| {
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("target")
.join("live")
.join("analyze")
.join(ts_compact)
});
std::fs::create_dir_all(&out_dir)
.map_err(|e| format!("create analyze out dir failed: {}: {e}", out_dir.display()))?;
@ -2344,6 +2425,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
);
let stage_timing = RunStageTiming {
validation_ms,
enable_roa_validation_cache: args.enable_roa_validation_cache,
report_build_ms,
report_write_ms,
ccr_build_ms,
@ -2362,6 +2444,11 @@ pub fn run(argv: &[String]) -> Result<(), String> {
rrdp_download_ms_total,
rsync_download_ms_total,
download_bytes_total,
roa_validation_cache: shared.roa_cache_stats.clone(),
analysis_counts: timing
.as_ref()
.map(|(_, handle)| handle.counts_snapshot())
.unwrap_or_default(),
vcir_storage_summary_ms,
vcir_storage,
memory_telemetry: Some(MemoryTelemetrySummary {

View File

@ -18,6 +18,7 @@ fn parse_help_returns_usage() {
assert!(err.contains("--rsync-scope"), "{err}");
assert!(err.contains("--parallel-phase2-object-workers"), "{err}");
assert!(err.contains("--memory-trim-after-validation"), "{err}");
assert!(err.contains("--enable-roa-validation-cache"), "{err}");
assert!(!err.contains("--parallel-phase1"), "{err}");
assert!(!err.contains("--parallel-phase2 "), "{err}");
}
@ -122,6 +123,58 @@ fn parse_disables_memory_trim_after_validation_by_default() {
assert!(!args.memory_trim_after_validation);
}
#[test]
fn parse_accepts_enable_roa_validation_cache() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"x.tal".to_string(),
"--ta-path".to_string(),
"x.cer".to_string(),
"--enable-roa-validation-cache".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert!(args.enable_roa_validation_cache);
}
#[test]
fn parse_disables_roa_validation_cache_by_default() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"x.tal".to_string(),
"--ta-path".to_string(),
"x.cer".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert!(!args.enable_roa_validation_cache);
}
#[test]
fn parse_accepts_analysis_out_and_implies_analyze() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"x.tal".to_string(),
"--ta-path".to_string(),
"x.cer".to_string(),
"--analysis-out".to_string(),
"run/analyze".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert!(args.analyze);
assert_eq!(
args.analysis_out_path.as_deref(),
Some(std::path::Path::new("run/analyze"))
);
}
#[test]
fn parse_accepts_report_json_compact_when_report_json_is_set() {
let argv = vec![
@ -157,6 +210,21 @@ fn parse_accepts_rsync_scope_policy() {
assert_eq!(args.rsync_scope_policy, RsyncScopePolicy::ModuleRoot);
}
#[test]
fn parse_accepts_host_rsync_scope_policy() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/x.tal".to_string(),
"--rsync-scope".to_string(),
"host".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert_eq!(args.rsync_scope_policy, RsyncScopePolicy::Host);
}
#[test]
fn parse_rejects_invalid_rsync_scope_policy() {
let argv = vec![
@ -1380,6 +1448,7 @@ fn synthetic_post_validation_shared() -> PostValidationShared {
successful_tal_inputs: Vec::new(),
tree,
publication_points: vec![pp1, pp2, pp3],
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
downloads: Vec::new(),
download_stats: crate::audit::AuditDownloadStats::default(),
current_repo_objects: Vec::new(),
@ -1465,6 +1534,7 @@ fn run_report_task_and_stage_timing_work() {
let stage_timing = RunStageTiming {
validation_ms: 1,
enable_roa_validation_cache: false,
report_build_ms: report_output.report_build_ms,
report_write_ms: report_output.report_write_ms,
ccr_build_ms: Some(2),
@ -1483,6 +1553,8 @@ fn run_report_task_and_stage_timing_work() {
rrdp_download_ms_total: 13,
rsync_download_ms_total: 14,
download_bytes_total: 15,
roa_validation_cache: crate::validation::objects::RoaValidationCacheStats::default(),
analysis_counts: std::collections::HashMap::new(),
vcir_storage_summary_ms: Some(16),
vcir_storage: Some(VcirStorageSummary {
entry_count: 2,
@ -1558,6 +1630,7 @@ fn stage_timing_serializes_memory_telemetry() {
let report_path = dir.path().join("report.json");
let stage_timing = RunStageTiming {
validation_ms: 1,
enable_roa_validation_cache: true,
report_build_ms: 2,
report_write_ms: None,
ccr_build_ms: None,
@ -1576,6 +1649,14 @@ fn stage_timing_serializes_memory_telemetry() {
rrdp_download_ms_total: 8,
rsync_download_ms_total: 9,
download_bytes_total: 10,
roa_validation_cache: crate::validation::objects::RoaValidationCacheStats {
hit_roas: 2,
..crate::validation::objects::RoaValidationCacheStats::default()
},
analysis_counts: std::collections::HashMap::from([(
"roa_validation_cache_hit_roas".to_string(),
2,
)]),
vcir_storage_summary_ms: None,
vcir_storage: None,
memory_telemetry: Some(MemoryTelemetrySummary {
@ -1626,6 +1707,8 @@ fn stage_timing_serializes_memory_telemetry() {
checkpoint["rocksdb"]["totals"]["cur_size_all_mem_tables"],
16
);
assert_eq!(value["analysis_counts"]["roa_validation_cache_hit_roas"], 2);
assert_eq!(value["roa_validation_cache"]["hit_roas"], 2);
assert!(
value["memory_telemetry"]
.as_object()

View File

@ -50,6 +50,16 @@ pub trait RsyncFetcher: Send + Sync {
fn dedup_key(&self, rsync_base_uri: &str) -> String {
normalize_rsync_base_uri(rsync_base_uri)
}
/// Return an optional failure-only deduplication key.
///
/// This key is only used to short-circuit repeated failed rsync fallbacks. It must
/// not be used to reuse successful fetch results unless it is identical to
/// `dedup_key`, because a successful fetch for one publication point does not imply
/// that another publication point under the same host has been fetched.
fn failure_dedup_key(&self, _rsync_base_uri: &str) -> Option<String> {
None
}
}
/// A simple "rsync" implementation backed by a local directory.

View File

@ -15,23 +15,25 @@ use crate::fetch::rsync::{
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RsyncScopePolicy {
Host,
PublicationPoint,
ModuleRoot,
}
impl Default for RsyncScopePolicy {
fn default() -> Self {
Self::PublicationPoint
Self::Host
}
}
impl RsyncScopePolicy {
pub fn parse_cli_value(value: &str) -> Result<Self, String> {
match value {
"host" => Ok(Self::Host),
"publication-point" => Ok(Self::PublicationPoint),
"module-root" => Ok(Self::ModuleRoot),
_ => Err(format!(
"invalid --rsync-scope: {value}; expected publication-point or module-root"
"invalid --rsync-scope: {value}; expected host, publication-point, or module-root"
)),
}
}
@ -296,7 +298,9 @@ impl SystemRsyncFetcher {
fn scope_fetch_uri(&self, rsync_base_uri: &str) -> String {
match self.config.scope_policy {
RsyncScopePolicy::PublicationPoint => normalize_rsync_base_uri(rsync_base_uri),
RsyncScopePolicy::Host | RsyncScopePolicy::PublicationPoint => {
normalize_rsync_base_uri(rsync_base_uri)
}
RsyncScopePolicy::ModuleRoot => rsync_module_root_uri(rsync_base_uri)
.unwrap_or_else(|| normalize_rsync_base_uri(rsync_base_uri)),
}
@ -348,6 +352,21 @@ impl RsyncFetcher for SystemRsyncFetcher {
fn dedup_key(&self, rsync_base_uri: &str) -> String {
self.scope_fetch_uri(rsync_base_uri)
}
fn failure_dedup_key(&self, rsync_base_uri: &str) -> Option<String> {
match self.config.scope_policy {
RsyncScopePolicy::Host => rsync_host_scope_uri(rsync_base_uri),
RsyncScopePolicy::PublicationPoint | RsyncScopePolicy::ModuleRoot => None,
}
}
}
fn rsync_host_scope_uri(rsync_base_uri: &str) -> Option<String> {
let parsed = url::Url::parse(rsync_base_uri).ok()?;
if parsed.scheme() != "rsync" {
return None;
}
Some(format!("rsync://{}/", parsed.host_str()?))
}
struct TempDir {
@ -535,12 +554,57 @@ mod tests {
}
#[test]
fn system_rsync_dedup_key_uses_publication_point_scope_by_default() {
fn rsync_host_scope_uri_returns_host_only() {
assert_eq!(
rsync_host_scope_uri("rsync://example.net/repo/ta/ca/publication-point/"),
Some("rsync://example.net/".to_string())
);
assert_eq!(rsync_host_scope_uri("https://example.net/repo"), None);
}
#[test]
fn system_rsync_dedup_key_uses_host_scope_by_default() {
let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig::default());
assert_eq!(
fetcher.dedup_key("rsync://example.net/repo/ta/ca/publication-point/"),
"rsync://example.net/repo/ta/ca/publication-point/"
);
assert_eq!(
fetcher.failure_dedup_key("rsync://example.net/repo/ta/ca/publication-point/"),
Some("rsync://example.net/".to_string())
);
}
#[test]
fn system_rsync_host_scope_does_not_widen_success_fetch_scope() {
let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig {
scope_policy: RsyncScopePolicy::Host,
..SystemRsyncConfig::default()
});
assert_eq!(
fetcher.dedup_key("rsync://example.net/repo/ta/ca/publication-point/"),
"rsync://example.net/repo/ta/ca/publication-point/"
);
assert_eq!(
fetcher.scope_fetch_uri("rsync://example.net/repo/ta/ca/publication-point/"),
"rsync://example.net/repo/ta/ca/publication-point/"
);
assert_eq!(
fetcher.failure_dedup_key("rsync://example.net/repo/ta/ca/publication-point/"),
Some("rsync://example.net/".to_string())
);
}
#[test]
fn system_rsync_dedup_key_uses_publication_point_scope_when_configured() {
let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig {
scope_policy: RsyncScopePolicy::PublicationPoint,
..SystemRsyncConfig::default()
});
assert_eq!(
fetcher.dedup_key("rsync://example.net/repo/ta/ca/publication-point/"),
"rsync://example.net/repo/ta/ca/publication-point/"
);
}
#[test]

View File

@ -76,6 +76,7 @@ pub struct Phase1RepoSyncRuntime<E: RepoTransportExecutor> {
coordinator: Mutex<GlobalRunCoordinator>,
worker_pool: Mutex<RepoTransportWorkerPool<E>>,
rsync_scope_resolver: Arc<dyn Fn(&str) -> String + Send + Sync>,
rsync_failure_scope_resolver: Arc<dyn Fn(&str) -> Option<String> + Send + Sync>,
sync_preference: SyncPreference,
}
@ -85,11 +86,28 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
worker_pool: RepoTransportWorkerPool<E>,
rsync_scope_resolver: Arc<dyn Fn(&str) -> String + Send + Sync>,
sync_preference: SyncPreference,
) -> Self {
Self::new_with_failure_scope(
coordinator,
worker_pool,
rsync_scope_resolver,
Arc::new(|_base: &str| None),
sync_preference,
)
}
pub fn new_with_failure_scope(
coordinator: GlobalRunCoordinator,
worker_pool: RepoTransportWorkerPool<E>,
rsync_scope_resolver: Arc<dyn Fn(&str) -> String + Send + Sync>,
rsync_failure_scope_resolver: Arc<dyn Fn(&str) -> Option<String> + Send + Sync>,
sync_preference: SyncPreference,
) -> Self {
Self {
coordinator: Mutex::new(coordinator),
worker_pool: Mutex::new(worker_pool),
rsync_scope_resolver,
rsync_failure_scope_resolver,
sync_preference,
}
}
@ -117,6 +135,7 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
let identity = Self::build_identity(ca);
let requester = Self::build_requester(ca);
let rsync_scope_uri = (self.rsync_scope_resolver)(&identity.rsync_base_uri);
let rsync_failure_scope_uri = (self.rsync_failure_scope_resolver)(&identity.rsync_base_uri);
let action = {
let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
coordinator.register_transport_request(
@ -125,6 +144,7 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
time::OffsetDateTime::now_utc(),
priority,
rsync_scope_uri,
rsync_failure_scope_uri,
self.sync_preference,
)
};
@ -137,6 +157,7 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
"manifest_rsync_uri": ca.manifest_rsync_uri,
"publication_point_rsync_uri": ca.publication_point_rsync_uri,
"repo_key_rsync_base_uri": task.repo_identity.rsync_base_uri,
"rsync_failure_scope_uri": task.rsync_failure_scope_uri,
"repo_key_notification_uri": task.repo_identity.notification_uri,
"priority": priority,
"transport_mode": match task.mode {
@ -160,6 +181,7 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
"manifest_rsync_uri": ca.manifest_rsync_uri,
"publication_point_rsync_uri": ca.publication_point_rsync_uri,
"repo_key_rsync_base_uri": identity.rsync_base_uri,
"rsync_failure_scope_uri": (self.rsync_failure_scope_resolver)(&identity.rsync_base_uri),
"repo_key_notification_uri": identity.notification_uri,
"priority": priority,
"runtime_state": format!("{state:?}"),
@ -175,6 +197,7 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
"manifest_rsync_uri": ca.manifest_rsync_uri,
"publication_point_rsync_uri": ca.publication_point_rsync_uri,
"repo_key_rsync_base_uri": identity.rsync_base_uri,
"rsync_failure_scope_uri": result.rsync_failure_scope_uri,
"repo_key_notification_uri": identity.notification_uri,
"priority": priority,
"transport_mode": match result.mode {
@ -213,6 +236,7 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
"phase1_repo_task_dispatched",
serde_json::json!({
"repo_key_rsync_base_uri": task.repo_identity.rsync_base_uri,
"rsync_failure_scope_uri": task.rsync_failure_scope_uri,
"repo_key_notification_uri": task.repo_identity.notification_uri,
"requester_count": task.requesters.len(),
"priority": task.priority,
@ -240,11 +264,12 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
return Ok(None);
};
let transport_identity = envelope.repo_identity.clone();
let completed_dedup_key = envelope.dedup_key.clone();
let completed_envelope = envelope.clone();
crate::progress_log::emit(
"phase1_repo_task_result",
serde_json::json!({
"repo_key_rsync_base_uri": envelope.repo_identity.rsync_base_uri,
"rsync_failure_scope_uri": envelope.rsync_failure_scope_uri,
"repo_key_notification_uri": envelope.repo_identity.notification_uri,
"timing_ms": envelope.timing_ms,
"transport_mode": match envelope.mode {
@ -283,7 +308,7 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
let completions = {
let coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
coordinator
.finalized_runtime_records_for_transport(&completed_dedup_key)
.finalized_runtime_records_for_transport_result(&completed_envelope)
.into_iter()
.filter_map(|record| {
let outcome = match record.state {
@ -506,6 +531,7 @@ mod tests {
fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope {
RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(),
repo_identity: task.repo_identity,
mode: task.mode,
tal_id: task.tal_id,
@ -531,6 +557,7 @@ mod tests {
self.count.fetch_add(1, Ordering::SeqCst);
RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(),
repo_identity: task.repo_identity,
mode: task.mode,
tal_id: task.tal_id,
@ -559,6 +586,7 @@ mod tests {
self.rrdp_count.fetch_add(1, Ordering::SeqCst);
RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(),
repo_identity: task.repo_identity,
mode: RepoTransportMode::Rrdp,
tal_id: task.tal_id,
@ -574,6 +602,7 @@ mod tests {
self.rsync_count.fetch_add(1, Ordering::SeqCst);
RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(),
repo_identity: task.repo_identity,
mode: RepoTransportMode::Rsync,
tal_id: task.tal_id,
@ -601,6 +630,7 @@ mod tests {
self.rrdp_count.fetch_add(1, Ordering::SeqCst);
RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(),
repo_identity: task.repo_identity,
mode: RepoTransportMode::Rrdp,
tal_id: task.tal_id,
@ -616,6 +646,7 @@ mod tests {
self.rsync_count.fetch_add(1, Ordering::SeqCst);
RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(),
repo_identity: task.repo_identity,
mode: RepoTransportMode::Rsync,
tal_id: task.tal_id,

File diff suppressed because it is too large Load Diff

View File

@ -85,6 +85,7 @@ impl<H: Fetcher + 'static> RepoTransportExecutor for LiveRrdpTransportExecutor<H
) {
Ok(_) => RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri,
repo_identity: task.repo_identity,
mode: RepoTransportMode::Rrdp,
tal_id: task.tal_id,
@ -97,6 +98,7 @@ impl<H: Fetcher + 'static> RepoTransportExecutor for LiveRrdpTransportExecutor<H
},
Err(err) => RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri,
repo_identity: task.repo_identity,
mode: RepoTransportMode::Rrdp,
tal_id: task.tal_id,
@ -151,6 +153,7 @@ impl<R: RsyncFetcher + 'static> RepoTransportExecutor for LiveRsyncTransportExec
) {
Ok(_) => RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri,
repo_identity: task.repo_identity,
mode: RepoTransportMode::Rsync,
tal_id: task.tal_id,
@ -163,6 +166,7 @@ impl<R: RsyncFetcher + 'static> RepoTransportExecutor for LiveRsyncTransportExec
},
Err(err) => RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri,
repo_identity: task.repo_identity,
mode: RepoTransportMode::Rsync,
tal_id: task.tal_id,
@ -660,6 +664,7 @@ mod tests {
dedup_key: RepoDedupKey::RrdpNotify {
notification_uri: notification_uri.to_string(),
},
rsync_failure_scope_uri: None,
repo_identity: RepoIdentity::new(Some(notification_uri.to_string()), rsync_base_uri),
mode: RepoTransportMode::Rrdp,
tal_id: "arin".to_string(),
@ -684,6 +689,7 @@ mod tests {
dedup_key: RepoDedupKey::RsyncScope {
rsync_scope_uri: rsync_scope_uri.to_string(),
},
rsync_failure_scope_uri: None,
repo_identity: RepoIdentity::new(None, rsync_base_uri),
mode: RepoTransportMode::Rsync,
tal_id: "arin".to_string(),
@ -873,6 +879,7 @@ mod tests {
fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope {
RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri,
repo_identity: task.repo_identity,
mode: task.mode,
tal_id: task.tal_id,

View File

@ -123,6 +123,7 @@ impl GlobalRunCoordinator {
validation_time: time::OffsetDateTime,
priority: u8,
rsync_scope_uri: String,
rsync_failure_scope_uri: Option<String>,
sync_preference: SyncPreference,
) -> TransportRequestAction {
let action = self.transport_tables.register_transport_request(
@ -131,6 +132,7 @@ impl GlobalRunCoordinator {
validation_time,
priority,
rsync_scope_uri,
rsync_failure_scope_uri,
sync_preference,
);
match &action {
@ -205,6 +207,14 @@ impl GlobalRunCoordinator {
.finalized_runtime_records_for_transport(dedup_key)
}
pub fn finalized_runtime_records_for_transport_result(
&self,
result: &RepoTransportResultEnvelope,
) -> Vec<crate::parallel::repo_scheduler::RepoRuntimeRecord> {
self.transport_tables
.finalized_runtime_records_for_transport_result(result)
}
pub fn reset_run_state(&mut self) {
self.in_flight_repos.reset_run_state();
self.transport_tables.reset_run_state();
@ -296,6 +306,7 @@ mod tests {
time::OffsetDateTime::UNIX_EPOCH,
0,
"rsync://example.test/repo/".to_string(),
None,
SyncPreference::RrdpThenRsync,
);
assert!(matches!(

View File

@ -105,6 +105,7 @@ pub enum RepoTransportMode {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoTransportTask {
pub dedup_key: RepoDedupKey,
pub rsync_failure_scope_uri: Option<String>,
pub repo_identity: RepoIdentity,
pub mode: RepoTransportMode,
pub tal_id: String,
@ -129,6 +130,7 @@ pub enum RepoTransportResultKind {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoTransportResultEnvelope {
pub dedup_key: RepoDedupKey,
pub rsync_failure_scope_uri: Option<String>,
pub repo_identity: RepoIdentity,
pub mode: RepoTransportMode,
pub tal_id: String,
@ -218,6 +220,7 @@ impl RepoSyncTask {
) -> RepoTransportTask {
RepoTransportTask {
dedup_key,
rsync_failure_scope_uri: None,
repo_identity: self.repo_key.as_identity(),
mode,
tal_id: self.tal_id.clone(),
@ -450,6 +453,7 @@ mod tests {
dedup_key: RepoDedupKey::RsyncScope {
rsync_scope_uri: "rsync://example.test/module/".to_string(),
},
rsync_failure_scope_uri: None,
repo_identity: identity.clone(),
mode: RepoTransportMode::Rsync,
tal_id: "arin".to_string(),
@ -464,6 +468,7 @@ mod tests {
dedup_key: RepoDedupKey::RsyncScope {
rsync_scope_uri: "rsync://example.test/module/".to_string(),
},
rsync_failure_scope_uri: None,
repo_identity: identity,
mode: RepoTransportMode::Rsync,
tal_id: "arin".to_string(),

View File

@ -1191,8 +1191,7 @@ impl VcirStorageEntrySummary {
child_resources: VcirChildResourceSizeBreakdown::from_vcir(vcir),
local_output_old_projection_bytes: field_sizes.local_output_old_projection_bytes(),
local_output_typed_projection_bytes: field_sizes.local_output_typed_projection_bytes(),
local_output_projection_saved_bytes: field_sizes
.local_output_projection_saved_bytes(),
local_output_projection_saved_bytes: field_sizes.local_output_projection_saved_bytes(),
field_sizes,
}
}
@ -2113,10 +2112,11 @@ impl RocksStore {
summary
.child_resources
.add_assign(&entry_summary.child_resources);
summary
.field_sizes
.add_assign(&entry_summary.field_sizes);
push_top_vcir_storage_entry(&mut summary.top_entries_by_vcir_value_bytes, entry_summary);
summary.field_sizes.add_assign(&entry_summary.field_sizes);
push_top_vcir_storage_entry(
&mut summary.top_entries_by_vcir_value_bytes,
entry_summary,
);
}
summary.local_output_old_projection_bytes =
summary.field_sizes.local_output_old_projection_bytes();

File diff suppressed because it is too large Load Diff

View File

@ -79,6 +79,7 @@ pub fn run_publication_point_once(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
let result = runner

View File

@ -94,6 +94,7 @@ pub struct RunTreeFromTalAuditOutput {
pub successful_tal_inputs: Vec<TalInputSpec>,
pub tree: TreeRunOutput,
pub publication_points: Vec<PublicationPointAudit>,
pub roa_cache_stats: crate::validation::objects::RoaValidationCacheStats,
pub downloads: Vec<crate::audit::AuditDownloadEvent>,
pub download_stats: crate::audit::AuditDownloadStats,
pub current_repo_objects: Vec<CurrentRepoObject>,
@ -132,6 +133,7 @@ fn make_live_runner<'a>(
parallel_phase2_config: Option<ParallelPhase2Config>,
ccr_accumulator: Option<CcrAccumulator>,
persist_vcir: bool,
enable_roa_validation_cache: bool,
) -> Rpkiv1PublicationPointRunner<'a> {
let parallel_roa_worker_pool = parallel_phase2_config
.as_ref()
@ -156,6 +158,7 @@ fn make_live_runner<'a>(
parallel_roa_worker_pool,
ccr_accumulator: ccr_accumulator.map(Mutex::new),
persist_vcir,
enable_roa_validation_cache,
}
}
@ -186,13 +189,18 @@ where
);
let pool = RepoTransportWorkerPool::new(RepoWorkerPoolConfig::from(&parallel_config), executor)
.map_err(RunTreeFromTalError::Replay)?;
let resolver_rsync_fetcher_arc = Arc::clone(&rsync_fetcher_arc);
let resolver: Arc<dyn Fn(&str) -> String + Send + Sync> =
Arc::new(move |base: &str| rsync_fetcher_arc.dedup_key(base));
Arc::new(move |base: &str| resolver_rsync_fetcher_arc.dedup_key(base));
let failure_rsync_fetcher_arc = Arc::clone(&rsync_fetcher_arc);
let failure_resolver: Arc<dyn Fn(&str) -> Option<String> + Send + Sync> =
Arc::new(move |base: &str| failure_rsync_fetcher_arc.failure_dedup_key(base));
let _ = policy; // policy reserved for later runtime-level decisions
let runtime = Arc::new(Phase1RepoSyncRuntime::new(
let runtime = Arc::new(Phase1RepoSyncRuntime::new_with_failure_scope(
coordinator,
pool,
resolver,
failure_resolver,
policy.sync_preference,
));
Ok((runtime, current_repo_index))
@ -419,6 +427,7 @@ pub fn run_tree_from_tal_url_serial(
None,
None,
config.persist_vcir,
config.enable_roa_validation_cache,
);
let root = root_handle_from_trust_anchor(
@ -458,6 +467,7 @@ pub fn run_tree_from_tal_url_serial_audit(
None,
None,
config.persist_vcir,
config.enable_roa_validation_cache,
);
let root = root_handle_from_trust_anchor(
@ -469,6 +479,7 @@ pub fn run_tree_from_tal_url_serial_audit(
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
@ -479,6 +490,7 @@ pub fn run_tree_from_tal_url_serial_audit(
successful_tal_inputs: Vec::new(),
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: Vec::new(),
@ -515,6 +527,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
None,
None,
config.persist_vcir,
config.enable_roa_validation_cache,
);
let root = root_handle_from_trust_anchor(
@ -527,6 +540,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
@ -537,6 +551,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
successful_tal_inputs: Vec::new(),
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: Vec::new(),
@ -556,6 +571,7 @@ fn run_single_root_parallel_audit_inner<H, R>(
parallel_config: ParallelPhase1Config,
phase2_config: Option<ParallelPhase2Config>,
collect_current_repo_objects: bool,
timing: Option<TimingHandle>,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
H: Fetcher + Clone + 'static,
@ -569,7 +585,7 @@ where
http_fetcher,
rsync_fetcher,
parallel_config,
None,
timing.clone(),
Some(download_log.clone()),
tal_inputs,
)?;
@ -580,7 +596,7 @@ where
http_fetcher,
rsync_fetcher,
validation_time,
None,
timing,
Some(download_log.clone()),
Some(current_repo_index),
Some(runtime),
@ -588,6 +604,7 @@ where
(phase2_enabled && config.build_ccr_accumulator)
.then(|| CcrAccumulator::new(vec![discovery.trust_anchor.clone()])),
config.persist_vcir,
config.enable_roa_validation_cache,
);
let root = root_handle_from_trust_anchor(
@ -599,6 +616,7 @@ where
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = if phase2_enabled {
run_tree_parallel_phase2_audit(root, &runner, config)?
} else {
@ -612,6 +630,7 @@ where
successful_tal_inputs: Vec::new(),
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: snapshot_current_repo_objects(
@ -633,6 +652,7 @@ fn run_multi_root_parallel_audit_inner<H, R>(
parallel_config: ParallelPhase1Config,
phase2_config: Option<ParallelPhase2Config>,
collect_current_repo_objects: bool,
timing: Option<TimingHandle>,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
H: Fetcher + Clone + 'static,
@ -673,7 +693,7 @@ where
http_fetcher,
rsync_fetcher,
parallel_config,
None,
timing.clone(),
Some(download_log.clone()),
successful_tal_inputs.clone(),
)?;
@ -684,7 +704,7 @@ where
http_fetcher,
rsync_fetcher,
validation_time,
None,
timing,
Some(download_log.clone()),
Some(current_repo_index),
Some(runtime),
@ -698,11 +718,13 @@ where
)
}),
config.persist_vcir,
config.enable_roa_validation_cache,
);
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = if phase2_enabled {
run_tree_parallel_phase2_audit_multi_root(root_handles, &runner, config)?
} else {
@ -716,6 +738,7 @@ where
successful_tal_inputs,
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: snapshot_current_repo_objects(
@ -755,6 +778,7 @@ where
parallel_config,
None,
collect_current_repo_objects,
None,
)
}
@ -806,6 +830,7 @@ where
parallel_config,
None,
collect_current_repo_objects,
None,
)
}
@ -835,6 +860,7 @@ where
parallel_config,
None,
collect_current_repo_objects,
None,
)
}
@ -868,6 +894,42 @@ where
parallel_config,
Some(phase2_config),
collect_current_repo_objects,
None,
)
}
pub fn run_tree_from_tal_url_parallel_phase2_audit_with_timing<H, R>(
store: Arc<crate::storage::RocksStore>,
policy: &crate::policy::Policy,
tal_url: &str,
http_fetcher: &H,
rsync_fetcher: &R,
validation_time: time::OffsetDateTime,
config: &TreeRunConfig,
parallel_config: ParallelPhase1Config,
phase2_config: ParallelPhase2Config,
collect_current_repo_objects: bool,
timing: &TimingHandle,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
H: Fetcher + Clone + 'static,
R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
let discovery =
discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?;
run_single_root_parallel_audit_inner(
store,
policy,
discovery,
vec![TalInputSpec::from_url(tal_url.to_string())],
http_fetcher,
rsync_fetcher,
validation_time,
config,
parallel_config,
Some(phase2_config),
collect_current_repo_objects,
Some(timing.clone()),
)
}
@ -920,6 +982,61 @@ where
parallel_config,
Some(phase2_config),
collect_current_repo_objects,
None,
)
}
pub fn run_tree_from_tal_and_ta_der_parallel_phase2_audit_with_timing<H, R>(
store: Arc<crate::storage::RocksStore>,
policy: &crate::policy::Policy,
tal_bytes: &[u8],
ta_der: &[u8],
resolved_ta_uri: Option<&url::Url>,
http_fetcher: &H,
rsync_fetcher: &R,
validation_time: time::OffsetDateTime,
config: &TreeRunConfig,
parallel_config: ParallelPhase1Config,
phase2_config: ParallelPhase2Config,
collect_current_repo_objects: bool,
timing: &TimingHandle,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
H: Fetcher + Clone + 'static,
R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
let discovery = discover_root_ca_instance_from_tal_and_ta_der_with_policy(
policy,
tal_bytes,
ta_der,
resolved_ta_uri,
)?;
let derived_tal_id = derive_tal_id(&discovery);
let tal_inputs = vec![TalInputSpec {
tal_id: derived_tal_id.clone(),
rir_id: derived_tal_id,
source: TalSource::DerBytes {
tal_url: discovery
.tal_url
.clone()
.unwrap_or_else(|| "embedded-tal".to_string()),
tal_bytes: tal_bytes.to_vec(),
ta_der: ta_der.to_vec(),
},
}];
run_single_root_parallel_audit_inner(
store,
policy,
discovery,
tal_inputs,
http_fetcher,
rsync_fetcher,
validation_time,
config,
parallel_config,
Some(phase2_config),
collect_current_repo_objects,
Some(timing.clone()),
)
}
@ -950,6 +1067,39 @@ where
parallel_config,
Some(phase2_config),
collect_current_repo_objects,
None,
)
}
pub fn run_tree_from_multiple_tals_parallel_phase2_audit_with_timing<H, R>(
store: Arc<crate::storage::RocksStore>,
policy: &crate::policy::Policy,
tal_inputs: Vec<TalInputSpec>,
http_fetcher: &H,
rsync_fetcher: &R,
validation_time: time::OffsetDateTime,
config: &TreeRunConfig,
parallel_config: ParallelPhase1Config,
phase2_config: ParallelPhase2Config,
collect_current_repo_objects: bool,
timing: &TimingHandle,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
H: Fetcher + Clone + 'static,
R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
run_multi_root_parallel_audit_inner(
store,
policy,
tal_inputs,
http_fetcher,
rsync_fetcher,
validation_time,
config,
parallel_config,
Some(phase2_config),
collect_current_repo_objects,
Some(timing.clone()),
)
}
@ -991,6 +1141,7 @@ pub fn run_tree_from_tal_and_ta_der_serial(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache,
};
let root = root_handle_from_trust_anchor(
@ -1043,6 +1194,7 @@ pub fn run_tree_from_tal_bytes_serial_audit(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache,
};
let root = root_handle_from_trust_anchor(
@ -1054,6 +1206,7 @@ pub fn run_tree_from_tal_bytes_serial_audit(
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
@ -1064,6 +1217,7 @@ pub fn run_tree_from_tal_bytes_serial_audit(
successful_tal_inputs: Vec::new(),
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: Vec::new(),
@ -1113,6 +1267,7 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache,
};
let root = root_handle_from_trust_anchor(
@ -1125,6 +1280,7 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing(
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &runner, config)?;
drop(_tree);
@ -1136,6 +1292,7 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing(
successful_tal_inputs: Vec::new(),
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: Vec::new(),
@ -1182,6 +1339,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache,
};
let root = root_handle_from_trust_anchor(
@ -1193,6 +1351,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
@ -1203,6 +1362,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
successful_tal_inputs: Vec::new(),
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: Vec::new(),
@ -1252,6 +1412,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache,
};
let root = root_handle_from_trust_anchor(
@ -1264,6 +1425,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
@ -1274,6 +1436,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
successful_tal_inputs: Vec::new(),
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: Vec::new(),
@ -1329,6 +1492,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache,
};
let root = root_handle_from_trust_anchor(
@ -1391,6 +1555,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache,
};
let root = root_handle_from_trust_anchor(
@ -1402,6 +1567,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
@ -1412,6 +1578,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
successful_tal_inputs: Vec::new(),
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: Vec::new(),
@ -1471,6 +1638,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache,
};
let root = root_handle_from_trust_anchor(
@ -1483,6 +1651,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
@ -1493,6 +1662,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
successful_tal_inputs: Vec::new(),
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: Vec::new(),
@ -1509,6 +1679,7 @@ fn build_payload_replay_runner<'a>(
validation_time: time::OffsetDateTime,
timing: Option<TimingHandle>,
download_log: Option<DownloadLogHandle>,
enable_roa_validation_cache: bool,
) -> Rpkiv1PublicationPointRunner<'a> {
Rpkiv1PublicationPointRunner {
store,
@ -1530,6 +1701,7 @@ fn build_payload_replay_runner<'a>(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache,
}
}
@ -1542,6 +1714,7 @@ fn build_payload_delta_replay_runner<'a>(
validation_time: time::OffsetDateTime,
timing: Option<TimingHandle>,
download_log: Option<DownloadLogHandle>,
enable_roa_validation_cache: bool,
) -> Rpkiv1PublicationPointRunner<'a> {
Rpkiv1PublicationPointRunner {
store,
@ -1563,6 +1736,7 @@ fn build_payload_delta_replay_runner<'a>(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache,
}
}
@ -1575,6 +1749,7 @@ fn build_payload_delta_replay_current_store_runner<'a>(
validation_time: time::OffsetDateTime,
timing: Option<TimingHandle>,
download_log: Option<DownloadLogHandle>,
enable_roa_validation_cache: bool,
) -> Rpkiv1PublicationPointRunner<'a> {
Rpkiv1PublicationPointRunner {
store,
@ -1596,6 +1771,7 @@ fn build_payload_delta_replay_current_store_runner<'a>(
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache,
}
}
@ -1648,6 +1824,7 @@ fn run_payload_delta_replay_audit_inner(
base_validation_time,
Some(t.clone()),
None,
config.enable_roa_validation_cache,
);
let _base = run_tree_serial(root.clone(), &base_runner, config)?;
} else {
@ -1660,6 +1837,7 @@ fn run_payload_delta_replay_audit_inner(
base_validation_time,
None,
None,
config.enable_roa_validation_cache,
);
let _base = run_tree_serial(root.clone(), &base_runner, config)?;
}
@ -1668,7 +1846,7 @@ fn run_payload_delta_replay_audit_inner(
.map_err(|e| RunTreeFromTalError::Replay(e.to_string()))?;
let delta_rsync_fetcher = PayloadDeltaReplayRsyncFetcher::new(base_index, delta_index.clone());
let download_log = DownloadLogHandle::new();
let (tree, publication_points) = if let Some(t) = timing.as_ref() {
let (tree, publication_points, roa_cache_stats) = if let Some(t) = timing.as_ref() {
let _phase = t.span_phase("payload_delta_replay_target_total");
let delta_runner = build_payload_delta_replay_runner(
store,
@ -1679,12 +1857,14 @@ fn run_payload_delta_replay_audit_inner(
base_validation_time,
Some(t.clone()),
Some(download_log.clone()),
config.enable_roa_validation_cache,
);
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &delta_runner, config)?;
(tree, publication_points)
(tree, publication_points, roa_cache_stats)
} else {
let delta_runner = build_payload_delta_replay_runner(
store,
@ -1695,12 +1875,14 @@ fn run_payload_delta_replay_audit_inner(
validation_time,
None,
Some(download_log.clone()),
config.enable_roa_validation_cache,
);
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &delta_runner, config)?;
(tree, publication_points)
(tree, publication_points, roa_cache_stats)
};
let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads);
@ -1710,6 +1892,7 @@ fn run_payload_delta_replay_audit_inner(
successful_tal_inputs: Vec::new(),
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: Vec::new(),
@ -1822,7 +2005,7 @@ fn run_payload_delta_replay_step_audit_inner(
PayloadDeltaReplayCurrentStoreRsyncFetcher::new(store, delta_index.clone());
let download_log = DownloadLogHandle::new();
let (tree, publication_points) = if let Some(t) = timing.as_ref() {
let (tree, publication_points, roa_cache_stats) = if let Some(t) = timing.as_ref() {
let _phase = t.span_phase("payload_delta_replay_step_total");
let delta_runner = build_payload_delta_replay_current_store_runner(
store,
@ -1833,12 +2016,14 @@ fn run_payload_delta_replay_step_audit_inner(
validation_time,
Some(t.clone()),
Some(download_log.clone()),
config.enable_roa_validation_cache,
);
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &delta_runner, config)?;
(tree, publication_points)
(tree, publication_points, roa_cache_stats)
} else {
let delta_runner = build_payload_delta_replay_current_store_runner(
store,
@ -1849,12 +2034,14 @@ fn run_payload_delta_replay_step_audit_inner(
validation_time,
None,
Some(download_log.clone()),
config.enable_roa_validation_cache,
);
let TreeRunAuditOutput {
tree,
publication_points,
roa_cache_stats,
} = run_tree_serial_audit(root, &delta_runner, config)?;
(tree, publication_points)
(tree, publication_points, roa_cache_stats)
};
let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads);
@ -1864,6 +2051,7 @@ fn run_payload_delta_replay_step_audit_inner(
successful_tal_inputs: Vec::new(),
tree,
publication_points,
roa_cache_stats,
downloads,
download_stats,
current_repo_objects: Vec::new(),
@ -2140,6 +2328,7 @@ mod replay_api_tests {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.unwrap_err();
@ -2175,6 +2364,7 @@ mod replay_api_tests {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run replay root-only audit");
@ -2220,6 +2410,7 @@ mod replay_api_tests {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run replay root-only audit");
@ -2266,6 +2457,7 @@ mod replay_api_tests {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
&timing,
)
@ -2322,6 +2514,7 @@ mod replay_api_tests {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.unwrap_err();
@ -2354,6 +2547,7 @@ mod replay_api_tests {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.unwrap_err();
@ -2406,6 +2600,7 @@ mod replay_api_tests {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run delta replay root-only audit");
@ -2467,6 +2662,7 @@ mod replay_api_tests {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
&timing,
)

View File

@ -3,7 +3,9 @@ use crate::audit::PublicationPointAudit;
use crate::data_model::rc::{AsResourceSet, IpResourceSet};
use crate::report::Warning;
use crate::validation::manifest::PublicationPointSource;
use crate::validation::objects::{AspaAttestation, ObjectsOutput, RouterKeyPayload, Vrp};
use crate::validation::objects::{
AspaAttestation, ObjectsOutput, RoaValidationCacheStats, RouterKeyPayload, Vrp,
};
use crate::validation::publication_point::PublicationPointSnapshot;
#[derive(Clone, Debug, PartialEq, Eq)]
@ -18,6 +20,8 @@ pub struct TreeRunConfig {
pub persist_vcir: bool,
/// Build online CCR manifest projections during phase2 validation.
pub build_ccr_accumulator: bool,
/// Reuse accepted ROA validation outputs from previous VCIR when explicitly enabled.
pub enable_roa_validation_cache: bool,
}
impl Default for TreeRunConfig {
@ -28,6 +32,7 @@ impl Default for TreeRunConfig {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
}
}
}
@ -117,6 +122,7 @@ pub trait PublicationPointRunner {
pub struct TreeRunAuditOutput {
pub tree: TreeRunOutput,
pub publication_points: Vec<PublicationPointAudit>,
pub roa_cache_stats: RoaValidationCacheStats,
}
pub fn run_tree_serial(
@ -169,6 +175,7 @@ pub fn run_tree_serial_audit_multi_root(
let mut aspas: Vec<AspaAttestation> = Vec::new();
let mut router_keys: Vec<RouterKeyPayload> = Vec::new();
let mut publication_points: Vec<PublicationPointAudit> = Vec::new();
let mut roa_cache_stats = RoaValidationCacheStats::default();
while let Some(node) = queue.pop_front() {
let ca = &node.handle;
@ -203,6 +210,7 @@ pub fn run_tree_serial_audit_multi_root(
instances_processed += 1;
warnings.extend(res.warnings);
warnings.extend(res.objects.warnings);
roa_cache_stats.add_assign(&res.objects.roa_cache_stats);
vrps.extend(res.objects.vrps);
aspas.extend(res.objects.aspas);
router_keys.extend(res.objects.router_keys);
@ -255,6 +263,7 @@ pub fn run_tree_serial_audit_multi_root(
router_keys,
},
publication_points,
roa_cache_stats,
})
}
@ -338,6 +347,7 @@ mod tests {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: children,

View File

@ -8,9 +8,11 @@ use crate::parallel::repo_runtime::{RepoSyncRequestStatus, RepoSyncRuntimeOutcom
use crate::parallel::types::RepoIdentity;
use crate::policy::SignedObjectFailurePolicy;
use crate::report::Warning;
use crate::validation::manifest::PublicationPointData;
use crate::validation::objects::{
ObjectsOutput, OwnedRoaTask, ParallelObjectsPrepare, ParallelObjectsStage,
prepare_publication_point_for_parallel_roa, reduce_parallel_roa_stage,
RoaValidationCacheInput, prepare_publication_point_for_parallel_roa_with_cache,
reduce_parallel_roa_stage,
};
use crate::validation::tree::{
CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult, PublicationPointRunner,
@ -888,7 +890,32 @@ fn stage_ready_publication_point(
metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started);
let prepare_started = Instant::now();
match prepare_publication_point_for_parallel_roa(
let has_roa = fresh_stage
.fresh_point
.files()
.iter()
.any(|file| file.rsync_uri.ends_with(".roa"));
if runner.enable_roa_validation_cache {
if let Some(timing) = runner.timing.as_ref() {
if has_roa {
timing.record_count("roa_validation_cache_roa_candidate_publication_points", 1);
} else {
timing.record_count("roa_validation_cache_skipped_no_roa_publication_points", 1);
}
}
}
let roa_cache_view = if has_roa {
runner
.roa_validation_cache_view_for_fresh_point(&fresh_stage.fresh_point.manifest_rsync_uri)
} else {
None
};
let roa_cache = if runner.enable_roa_validation_cache && has_roa {
RoaValidationCacheInput::enabled(roa_cache_view.as_ref())
} else {
RoaValidationCacheInput::disabled()
};
match prepare_publication_point_for_parallel_roa_with_cache(
ready.node.id,
&fresh_stage.fresh_point,
runner.policy,
@ -898,6 +925,7 @@ fn stage_ready_publication_point(
ready.node.handle.effective_as_resources.as_ref(),
runner.validation_time,
runner.persist_vcir,
roa_cache,
) {
ParallelObjectsPrepare::Complete(mut objects) => {
metrics.prepare_ms = elapsed_ms(prepare_started);
@ -1571,6 +1599,7 @@ fn build_tree_output(mut finished: Vec<FinishedPublicationPoint>) -> TreeRunAudi
let mut aspas = Vec::new();
let mut router_keys = Vec::new();
let mut publication_points = Vec::new();
let mut roa_cache_stats = crate::validation::objects::RoaValidationCacheStats::default();
for item in finished {
match item.result {
@ -1582,6 +1611,7 @@ fn build_tree_output(mut finished: Vec<FinishedPublicationPoint>) -> TreeRunAudi
instances_processed += 1;
warnings.extend(result_warnings);
warnings.extend(objects.warnings);
roa_cache_stats.add_assign(&objects.roa_cache_stats);
vrps.extend(objects.vrps);
aspas.extend(objects.aspas);
router_keys.extend(objects.router_keys);
@ -1612,6 +1642,7 @@ fn build_tree_output(mut finished: Vec<FinishedPublicationPoint>) -> TreeRunAudi
router_keys,
},
publication_points,
roa_cache_stats,
}
}
@ -1669,6 +1700,7 @@ mod tests {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: Vec::new(),

View File

@ -45,9 +45,9 @@ use crate::validation::manifest::{
process_manifest_publication_point_fresh_after_repo_sync_with_timing,
};
use crate::validation::objects::{
AspaAttestation, ParallelRoaWorkerPool, RouterKeyPayload, Vrp,
process_publication_point_for_issuer_parallel_roa_with_options,
process_publication_point_for_issuer_parallel_roa_with_pool_options,
AspaAttestation, ParallelRoaWorkerPool, RoaValidationCacheInput, RoaValidationCacheView,
RouterKeyPayload, Vrp, process_publication_point_for_issuer_parallel_roa_with_cache_options,
process_publication_point_for_issuer_parallel_roa_with_pool_cache_options,
};
use crate::validation::publication_point::PublicationPointSnapshot;
use crate::validation::tree::{
@ -151,9 +151,59 @@ pub struct Rpkiv1PublicationPointRunner<'a> {
/// This is intended for replay/compare-only runs where the caller does not need
/// the resulting DB to be reused by a later delta run.
pub persist_vcir: bool,
pub enable_roa_validation_cache: bool,
}
impl<'a> Rpkiv1PublicationPointRunner<'a> {
pub(crate) fn roa_validation_cache_view_for_fresh_point(
&self,
manifest_rsync_uri: &str,
) -> Option<RoaValidationCacheView> {
if !self.enable_roa_validation_cache {
return None;
}
let load_started = std::time::Instant::now();
let loaded_vcir = self.store.get_vcir(manifest_rsync_uri);
if let Some(timing) = self.timing.as_ref() {
timing.record_phase_nanos(
"roa_validation_cache_vcir_load_total",
load_started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64,
);
}
match loaded_vcir {
Ok(Some(vcir)) => {
let view_started = std::time::Instant::now();
let view = RoaValidationCacheView::from_vcir(&vcir, self.validation_time);
if let Some(timing) = self.timing.as_ref() {
timing.record_phase_nanos(
"roa_validation_cache_view_build_total",
view_started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64,
);
}
Some(view)
}
Ok(None) => {
if let Some(timing) = self.timing.as_ref() {
timing.record_count("roa_validation_cache_vcir_missing_publication_points", 1);
}
None
}
Err(err) => {
if let Some(timing) = self.timing.as_ref() {
timing.record_count("roa_validation_cache_vcir_load_errors", 1);
}
crate::progress_log::emit(
"roa_validation_cache_vcir_load_error",
serde_json::json!({
"manifest_rsync_uri": manifest_rsync_uri,
"error": err.to_string(),
}),
);
None
}
}
}
pub(crate) fn ccr_accumulator_snapshot(&self) -> Option<CcrAccumulator> {
self.ccr_accumulator
.as_ref()
@ -653,6 +703,35 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
} = stage;
warnings.extend(stage_warnings);
let has_roa = fresh_point
.files()
.iter()
.any(|file| file.rsync_uri.ends_with(".roa"));
if self.enable_roa_validation_cache {
if let Some(timing) = self.timing.as_ref() {
if has_roa {
timing.record_count(
"roa_validation_cache_roa_candidate_publication_points",
1,
);
} else {
timing.record_count(
"roa_validation_cache_skipped_no_roa_publication_points",
1,
);
}
}
}
let roa_cache_view = if has_roa {
self.roa_validation_cache_view_for_fresh_point(fresh_point.manifest_rsync_uri())
} else {
None
};
let roa_cache = if self.enable_roa_validation_cache && has_roa {
RoaValidationCacheInput::enabled(roa_cache_view.as_ref())
} else {
RoaValidationCacheInput::disabled()
};
let objects_processing_started = std::time::Instant::now();
let mut objects = {
let _objects_total = self
@ -660,7 +739,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
.as_ref()
.map(|t| t.span_phase("objects_processing_total"));
if let Some(phase2_pool) = self.parallel_roa_worker_pool.as_ref() {
process_publication_point_for_issuer_parallel_roa_with_pool_options(
process_publication_point_for_issuer_parallel_roa_with_pool_cache_options(
&fresh_point,
self.policy,
&ca.ca_certificate_der,
@ -671,9 +750,10 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
self.timing.as_ref(),
phase2_pool,
false,
roa_cache,
)
} else if let Some(phase2_config) = self.parallel_phase2_config.as_ref() {
process_publication_point_for_issuer_parallel_roa_with_options(
process_publication_point_for_issuer_parallel_roa_with_cache_options(
&fresh_point,
self.policy,
&ca.ca_certificate_der,
@ -684,9 +764,10 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
self.timing.as_ref(),
phase2_config,
false,
roa_cache,
)
} else {
crate::validation::objects::process_publication_point_for_issuer_with_options(
crate::validation::objects::process_publication_point_for_issuer_with_cache_options(
&fresh_point,
self.policy,
&ca.ca_certificate_der,
@ -696,6 +777,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
self.validation_time,
self.timing.as_ref(),
false,
roa_cache,
)
}
};
@ -2017,6 +2099,7 @@ fn empty_objects_output() -> crate::validation::objects::ObjectsOutput {
warnings: Vec::new(),
stats: crate::validation::objects::ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
}
}
@ -2549,29 +2632,6 @@ fn restore_children_from_vcir(
(children, audits)
}
fn persist_vcir_for_fresh_result(
store: &RocksStore,
ca: &CaInstanceHandle,
pack: &PublicationPointSnapshot,
objects: &mut crate::validation::objects::ObjectsOutput,
warnings: &[Warning],
child_audits: &[ObjectAuditEntry],
discovered_children: &[DiscoveredChildCaInstance],
validation_time: time::OffsetDateTime,
) -> Result<(), String> {
persist_vcir_for_fresh_result_with_timing(
store,
ca,
pack,
objects,
warnings,
child_audits,
discovered_children,
validation_time,
)
.map(|_timing| ())
}
fn persist_vcir_for_fresh_result_with_timing(
store: &RocksStore,
ca: &CaInstanceHandle,
@ -2616,27 +2676,6 @@ fn persist_vcir_for_fresh_result_with_timing(
Ok(timing)
}
fn build_vcir_from_fresh_result(
ca: &CaInstanceHandle,
pack: &PublicationPointSnapshot,
objects: &mut crate::validation::objects::ObjectsOutput,
warnings: &[Warning],
child_audits: &[ObjectAuditEntry],
discovered_children: &[DiscoveredChildCaInstance],
validation_time: time::OffsetDateTime,
) -> Result<ValidatedCaInstanceResult, String> {
build_vcir_from_fresh_result_with_timing(
ca,
pack,
objects,
warnings,
child_audits,
discovered_children,
validation_time,
)
.map(|(vcir, _timing)| vcir)
}
fn build_vcir_from_fresh_result_with_timing(
ca: &CaInstanceHandle,
pack: &PublicationPointSnapshot,

View File

@ -68,6 +68,7 @@ fn sample_runner_with_ccr_accumulator<'a>(
parallel_roa_worker_pool: None,
ccr_accumulator: Some(Mutex::new(CcrAccumulator::new(Vec::new()))),
persist_vcir: true,
enable_roa_validation_cache: false,
}
}
@ -762,6 +763,7 @@ fn build_vcir_local_outputs_prefers_cached_outputs() {
warnings: Vec::new(),
stats: crate::validation::objects::ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
},
)
.expect("reuse cached outputs");
@ -957,6 +959,7 @@ fn finalize_fresh_publication_point_releases_local_outputs_cache_after_persist()
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
let ca = CaInstanceHandle {
depth: 0,
@ -1057,7 +1060,7 @@ fn persist_vcir_for_fresh_result_stores_vcir_and_replay_meta_for_real_snapshot()
};
let mut objects = objects;
persist_vcir_for_fresh_result(
persist_vcir_for_fresh_result_with_timing(
&store,
&ca,
&pack,
@ -1067,6 +1070,7 @@ fn persist_vcir_for_fresh_result_stores_vcir_and_replay_meta_for_real_snapshot()
&[],
validation_time,
)
.map(|_timing| ())
.expect("persist vcir for fresh result");
let vcir = store
@ -1265,6 +1269,7 @@ fn build_vcir_related_artifacts_classifies_snapshot_files_and_audit_statuses() {
detail: Some("skipped aspa".to_string()),
},
],
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
};
let artifacts = build_vcir_related_artifacts(
&ca,
@ -1516,6 +1521,7 @@ fn runner_offline_rsync_fixture_produces_pack_and_warnings() {
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
// For this fixture-driven smoke, we provide the correct issuer CA certificate (the CA for
@ -1577,10 +1583,123 @@ fn runner_offline_rsync_fixture_produces_pack_and_warnings() {
.get_manifest_replay_meta(&manifest_rsync_uri)
.expect("get replay meta")
.expect("replay meta exists");
assert_eq!(replay_meta.manifest_rsync_uri, manifest_rsync_uri);
}
#[test]
fn runner_roa_validation_cache_reuses_vcir_outputs_on_second_fixture_run() {
let fixture_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests/fixtures/repository/rpki.cernet.net/repo/cernet/0");
assert!(fixture_dir.is_dir(), "fixture directory must exist");
let rsync_base_uri = "rsync://rpki.cernet.net/repo/cernet/0/".to_string();
let manifest_file = "05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft";
let manifest_rsync_uri = format!("{rsync_base_uri}{manifest_file}");
let fixture_manifest_bytes =
std::fs::read(fixture_dir.join(manifest_file)).expect("read manifest fixture");
let fixture_manifest =
crate::data_model::manifest::ManifestObject::decode_der(&fixture_manifest_bytes)
.expect("decode manifest fixture");
let validation_time = fixture_manifest.manifest.this_update + time::Duration::seconds(60);
let store_dir = tempfile::tempdir().expect("store dir");
let store = RocksStore::open(store_dir.path()).expect("open rocksdb");
let policy = Policy {
sync_preference: crate::policy::SyncPreference::RsyncOnly,
..Policy::default()
};
let issuer_ca_der = std::fs::read(
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(
"tests/fixtures/repository/rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer",
),
)
.expect("read issuer ca fixture");
let issuer_ca = ResourceCertificate::decode_der(&issuer_ca_der).expect("decode issuer ca");
let handle = CaInstanceHandle {
depth: 0,
tal_id: "test-tal".to_string(),
parent_manifest_rsync_uri: None,
ca_certificate_der: issuer_ca_der,
ca_certificate_rsync_uri: Some("rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer".to_string()),
effective_ip_resources: issuer_ca.tbs.extensions.ip_resources.clone(),
effective_as_resources: issuer_ca.tbs.extensions.as_resources.clone(),
rsync_base_uri: rsync_base_uri.clone(),
manifest_rsync_uri: manifest_rsync_uri.clone(),
publication_point_rsync_uri: rsync_base_uri,
rrdp_notification_uri: None,
};
let first_runner = Rpkiv1PublicationPointRunner {
store: &store,
policy: &policy,
http_fetcher: &NeverHttpFetcher,
rsync_fetcher: &LocalDirRsyncFetcher::new(&fixture_dir),
validation_time,
timing: None,
download_log: None,
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None,
repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
let first = first_runner
.run_publication_point(&handle)
.expect("first fresh run");
assert!(first.objects.vrps.len() > 1);
assert_eq!(first.objects.roa_cache_stats.hit_roas, 0);
let second_runner = Rpkiv1PublicationPointRunner {
store: &store,
policy: &policy,
http_fetcher: &NeverHttpFetcher,
rsync_fetcher: &LocalDirRsyncFetcher::new(&fixture_dir),
validation_time,
timing: None,
download_log: None,
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None,
repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: true,
};
let second = second_runner
.run_publication_point(&handle)
.expect("second cache-enabled run");
assert_eq!(second.objects.vrps, first.objects.vrps);
assert_eq!(second.objects.roa_cache_stats.enabled_publication_points, 1);
assert_eq!(
replay_meta.manifest_rsync_uri,
manifest_rsync_uri
second.objects.roa_cache_stats.vcir_hit_publication_points,
1
);
assert_eq!(
second.objects.roa_cache_stats.vcir_miss_publication_points,
0
);
assert!(second.objects.roa_cache_stats.hit_roas > 1);
assert_eq!(second.objects.roa_cache_stats.miss_roas, 0);
assert_eq!(second.objects.roa_cache_stats.blocked_roas, 0);
assert_eq!(second.objects.roa_cache_stats.fresh_roas, 0);
}
#[test]
@ -1669,6 +1788,7 @@ fn runner_rsync_dedup_skips_second_sync_for_same_base() {
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
let first = runner.run_publication_point(&handle).expect("first run ok");
@ -1782,6 +1902,7 @@ fn runner_rsync_dedup_skips_second_sync_for_same_module_scope() {
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
let first = runner.run_publication_point(&handle).expect("first run ok");
@ -1898,6 +2019,7 @@ fn runner_rsync_dedup_works_in_rsync_only_mode_even_when_rrdp_notify_exists() {
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
let first = runner.run_publication_point(&handle).expect("first run ok");
@ -1985,6 +2107,7 @@ fn runner_when_repo_sync_fails_uses_current_instance_vcir_and_keeps_children_emp
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
let first = ok_runner
.run_publication_point(&handle)
@ -2016,6 +2139,7 @@ fn runner_when_repo_sync_fails_uses_current_instance_vcir_and_keeps_children_emp
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
let second = bad_runner
.run_publication_point(&handle)
@ -2063,6 +2187,7 @@ fn build_publication_point_audit_emits_no_audit_entry_for_duplicate_pack_uri() {
warnings: Vec::new(),
stats: crate::validation::objects::ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
};
let audit = build_publication_point_audit_from_snapshot(
@ -2131,6 +2256,7 @@ fn build_publication_point_audit_marks_invalid_crl_as_error_and_overlays_roa_aud
result: AuditObjectResult::Ok,
detail: None,
}],
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
};
let audit = build_publication_point_audit_from_snapshot(
@ -2822,7 +2948,7 @@ fn fresh_and_reuse_paths_produce_equivalent_ccr_manifest_projection() {
discover_children_from_fresh_snapshot_with_audit(&ca, &pack, validation_time, None)
.expect("discover children");
let mut objects = empty_objects_output();
let fresh_vcir = build_vcir_from_fresh_result(
let (fresh_vcir, _timing) = build_vcir_from_fresh_result_with_timing(
&ca,
&pack,
&mut objects,
@ -3097,6 +3223,7 @@ fn build_publication_point_audit_from_vcir_uses_vcir_metadata_and_overlays_child
result: AuditObjectResult::Error,
detail: Some("overridden from object audit".to_string()),
}],
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
};
let child_audits = vec![ObjectAuditEntry {
rsync_uri: vcir.child_entries[0].child_cert_rsync_uri.clone(),
@ -3192,6 +3319,7 @@ fn build_publication_point_audit_from_vcir_failed_no_cache_keeps_current_reject_
result: AuditObjectResult::Error,
detail: Some("manifest is not valid at validation_time".to_string()),
}],
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
};
let audit = build_publication_point_audit_from_vcir(
@ -3327,6 +3455,7 @@ fn build_publication_point_audit_from_vcir_without_cached_inputs_returns_empty_l
warnings: vec![Warning::new("object warning")],
stats: crate::validation::objects::ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
},
&[],
);
@ -3609,6 +3738,7 @@ fn runner_dedup_paths_execute_with_timing_enabled() {
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
let first = runner_rrdp
.run_publication_point(&handle)
@ -3643,6 +3773,7 @@ fn runner_dedup_paths_execute_with_timing_enabled() {
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
let third = runner_rsync
.run_publication_point(&handle)

View File

@ -186,6 +186,7 @@ fn apnic_tree_full_stats_serial() {
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
};
let stats = RefCell::new(LiveStats::default());
@ -217,6 +218,7 @@ fn apnic_tree_full_stats_serial() {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run tree");

View File

@ -39,6 +39,7 @@ fn apnic_tree_depth1_processes_more_than_root() {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run tree from tal");
@ -80,6 +81,7 @@ fn apnic_tree_root_only_processes_root_with_long_timeouts() {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run APNIC root-only");

View File

@ -111,6 +111,7 @@ fn crl_mismatch_drops_publication_point_and_cites_rfc_sections() {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run tree audit");

View File

@ -117,6 +117,7 @@ fn run_tree_from_tal_url_entry_executes_and_records_failure_when_repo_empty() {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run tree");
@ -164,6 +165,7 @@ fn run_tree_from_tal_and_ta_der_entry_executes_and_records_failure_when_repo_emp
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run tree");
@ -219,6 +221,7 @@ fn run_tree_from_tal_url_audit_entry_collects_no_publication_points_when_repo_em
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run tree audit");
@ -262,6 +265,7 @@ fn run_tree_from_tal_and_ta_der_audit_entry_collects_no_publication_points_when_
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run tree audit");
@ -313,6 +317,7 @@ fn run_tree_from_tal_url_audit_with_timing_records_phases_when_repo_empty() {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
&timing,
)
@ -363,6 +368,7 @@ fn run_tree_from_tal_and_ta_der_audit_with_timing_records_phases_when_repo_empty
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
&timing,
)

View File

@ -117,6 +117,7 @@ fn tree_continues_when_a_publication_point_fails() {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: vec![
@ -143,6 +144,7 @@ fn tree_continues_when_a_publication_point_fails() {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: Vec::new(),

View File

@ -127,6 +127,7 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: root_children,
@ -149,6 +150,7 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: child1_children,
@ -171,6 +173,7 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: Vec::new(),
@ -193,6 +196,7 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: Vec::new(),
@ -252,6 +256,7 @@ fn tree_respects_max_depth_and_max_instances() {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: vec![discovered_child(root_manifest, child_manifest)],
@ -274,6 +279,7 @@ fn tree_respects_max_depth_and_max_instances() {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: Vec::new(),
@ -289,6 +295,7 @@ fn tree_respects_max_depth_and_max_instances() {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run tree depth-limited");
@ -304,6 +311,7 @@ fn tree_respects_max_depth_and_max_instances() {
compact_audit: false,
persist_vcir: true,
build_ccr_accumulator: true,
enable_roa_validation_cache: false,
},
)
.expect("run tree instance-limited");
@ -331,6 +339,7 @@ fn tree_audit_includes_parent_and_discovered_from_for_non_root_nodes() {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: vec![discovered_child(root_manifest, child_manifest)],
@ -353,6 +362,7 @@ fn tree_audit_includes_parent_and_discovered_from_for_non_root_nodes() {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: Vec::new(),
@ -421,6 +431,7 @@ fn tree_aggregates_router_keys_from_publication_point_results() {
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: Vec::new(),
@ -462,6 +473,7 @@ fn tree_prefers_lexicographically_first_discovery_when_duplicate_manifest_is_que
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: vec![first, second],
@ -484,6 +496,7 @@ fn tree_prefers_lexicographically_first_discovery_when_duplicate_manifest_is_que
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats: Default::default(),
},
audit: PublicationPointAudit::default(),
discovered_children: Vec::new(),