20260612 优化CIR在线累计恢复性能

This commit is contained in:
yuyr 2026-06-12 14:26:49 +08:00
parent 4047214ddf
commit 4e6bd687db
13 changed files with 715 additions and 63 deletions

View File

@ -87,10 +87,6 @@ pub struct PublicationPointAudit {
pub warnings: Vec<AuditWarning>, pub warnings: Vec<AuditWarning>,
pub objects: Vec<ObjectAuditEntry>, pub objects: Vec<ObjectAuditEntry>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub cir_fresh_objects: Vec<ObjectAuditEntry>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub cir_cached_objects: Vec<ObjectAuditEntry>,
} }
#[derive(Clone, Debug, PartialEq, Eq, Serialize)] #[derive(Clone, Debug, PartialEq, Eq, Serialize)]
@ -163,6 +159,30 @@ pub struct AuditDownloadStats {
pub by_kind: std::collections::BTreeMap<String, AuditDownloadKindStats>, pub by_kind: std::collections::BTreeMap<String, AuditDownloadKindStats>,
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn publication_point_audit_serializes_object_audit_only() {
let audit = PublicationPointAudit {
objects: vec![ObjectAuditEntry {
rsync_uri: "rsync://example.test/repo/fresh.roa".to_string(),
sha256_hex: "11".repeat(32),
kind: AuditObjectKind::Roa,
result: AuditObjectResult::Ok,
detail: None,
}],
..PublicationPointAudit::default()
};
let value = serde_json::to_value(&audit).expect("serialize audit");
assert!(value.get("objects").is_some());
assert!(value.get("cir_fresh_objects").is_none());
assert!(value.get("cir_cached_objects").is_none());
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct AuditRepoSyncStateStat { pub struct AuditRepoSyncStateStat {
pub count: u64, pub count: u64,

283
src/cir/accumulator.rs Normal file
View File

@ -0,0 +1,283 @@
use std::collections::HashMap;
use crate::audit::{AuditObjectResult, ObjectAuditEntry};
use crate::cir::export::CirExportError;
use crate::cir::model::{CirObject, CirRejectedObject};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CirInputSection {
Fresh,
Cached,
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct CirInputSnapshot {
pub fresh_validated_objects: Vec<CirObject>,
pub cached_validated_objects: Vec<CirObject>,
pub fresh_rejected_objects: Vec<CirRejectedObject>,
pub cached_rejected_objects: Vec<CirRejectedObject>,
}
#[derive(Clone, Debug, Default)]
pub struct CirInputAccumulator {
fresh_objects: HashMap<String, [u8; 32]>,
cached_objects: HashMap<String, [u8; 32]>,
fresh_rejects: HashMap<String, Option<String>>,
cached_rejects: HashMap<String, Option<String>>,
}
impl CirInputAccumulator {
pub fn submit_audit_entries(
&mut self,
section: CirInputSection,
entries: &[ObjectAuditEntry],
) -> Result<(), CirExportError> {
for entry in entries {
if !matches!(
entry.result,
AuditObjectResult::Ok | AuditObjectResult::Error
) {
continue;
}
self.insert_object(section, &entry.rsync_uri, &entry.sha256_hex)?;
if entry.result == AuditObjectResult::Error {
self.insert_reject(section, &entry.rsync_uri, entry.detail.clone());
}
}
Ok(())
}
pub fn submit_snapshot(&mut self, snapshot: CirInputSnapshot) -> Result<(), CirExportError> {
for object in snapshot.fresh_validated_objects {
self.insert_object_bytes(CirInputSection::Fresh, &object.rsync_uri, object.sha256)?;
}
for object in snapshot.cached_validated_objects {
self.insert_object_bytes(CirInputSection::Cached, &object.rsync_uri, object.sha256)?;
}
for rejected in snapshot.fresh_rejected_objects {
self.insert_reject(
CirInputSection::Fresh,
&rejected.object_uri,
rejected.reason,
);
}
for rejected in snapshot.cached_rejected_objects {
self.insert_reject(
CirInputSection::Cached,
&rejected.object_uri,
rejected.reason,
);
}
Ok(())
}
pub fn insert_object(
&mut self,
section: CirInputSection,
rsync_uri: &str,
sha256_hex: &str,
) -> Result<(), CirExportError> {
if !rsync_uri.starts_with("rsync://") || !is_sha256_hex(sha256_hex) {
return Ok(());
}
let mut digest = [0u8; 32];
hex::decode_to_slice(sha256_hex, &mut digest).expect("validated sha256 hex");
self.insert_object_digest(section, rsync_uri, digest)
}
fn insert_object_bytes(
&mut self,
section: CirInputSection,
rsync_uri: &str,
sha256: Vec<u8>,
) -> Result<(), CirExportError> {
if !rsync_uri.starts_with("rsync://") {
return Ok(());
}
let Ok(digest) = <[u8; 32]>::try_from(sha256) else {
return Ok(());
};
self.insert_object_digest(section, rsync_uri, digest)
}
fn insert_object_digest(
&mut self,
section: CirInputSection,
rsync_uri: &str,
digest: [u8; 32],
) -> Result<(), CirExportError> {
let objects = match section {
CirInputSection::Fresh => &mut self.fresh_objects,
CirInputSection::Cached => &mut self.cached_objects,
};
if let Some(existing) = objects.get(rsync_uri) {
if existing != &digest {
return Err(CirExportError::ConflictingObjectHash {
rsync_uri: rsync_uri.to_string(),
first: hex::encode(existing),
second: hex::encode(digest),
});
}
return Ok(());
}
objects.insert(rsync_uri.to_string(), digest);
Ok(())
}
pub fn insert_reject(
&mut self,
section: CirInputSection,
rsync_uri: &str,
reason: Option<String>,
) {
if !rsync_uri.starts_with("rsync://") {
return;
}
let rejects = match section {
CirInputSection::Fresh => &mut self.fresh_rejects,
CirInputSection::Cached => &mut self.cached_rejects,
};
rejects.entry(rsync_uri.to_string()).or_insert(reason);
}
pub fn finalize(self) -> CirInputSnapshot {
CirInputSnapshot {
fresh_validated_objects: finalize_objects(self.fresh_objects),
cached_validated_objects: finalize_objects(self.cached_objects),
fresh_rejected_objects: finalize_rejects(self.fresh_rejects),
cached_rejected_objects: finalize_rejects(self.cached_rejects),
}
}
}
fn finalize_objects(objects: HashMap<String, [u8; 32]>) -> Vec<CirObject> {
let mut objects = objects
.into_iter()
.map(|(rsync_uri, sha256)| CirObject {
rsync_uri,
sha256: sha256.to_vec(),
})
.collect::<Vec<_>>();
objects.sort_by(|a, b| a.rsync_uri.cmp(&b.rsync_uri));
objects
}
fn finalize_rejects(rejects: HashMap<String, Option<String>>) -> Vec<CirRejectedObject> {
let mut rejects = rejects
.into_iter()
.map(|(object_uri, reason)| CirRejectedObject { object_uri, reason })
.collect::<Vec<_>>();
rejects.sort_by(|a, b| a.object_uri.cmp(&b.object_uri));
rejects
}
fn is_sha256_hex(value: &str) -> bool {
value.len() == 64 && value.as_bytes().iter().all(u8::is_ascii_hexdigit)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::audit::{AuditObjectKind, AuditObjectResult};
fn entry(uri: &str, hash_byte: u8, result: AuditObjectResult) -> ObjectAuditEntry {
let is_error = result == AuditObjectResult::Error;
ObjectAuditEntry {
rsync_uri: uri.to_string(),
sha256_hex: format!("{}", hex::encode([hash_byte; 32])),
kind: AuditObjectKind::Roa,
result,
detail: is_error.then(|| "invalid".to_string()),
}
}
#[test]
fn accumulator_finalizes_sorted_fresh_and_cached_sections() {
let mut acc = CirInputAccumulator::default();
acc.submit_audit_entries(
CirInputSection::Fresh,
&[
entry("rsync://example.net/z.roa", 0x22, AuditObjectResult::Ok),
entry("rsync://example.net/a.roa", 0x11, AuditObjectResult::Error),
],
)
.unwrap();
acc.submit_audit_entries(
CirInputSection::Cached,
&[entry(
"rsync://example.net/c.roa",
0x33,
AuditObjectResult::Ok,
)],
)
.unwrap();
let snapshot = acc.finalize();
assert_eq!(
snapshot.fresh_validated_objects[0].rsync_uri,
"rsync://example.net/a.roa"
);
assert_eq!(
snapshot.fresh_validated_objects[1].rsync_uri,
"rsync://example.net/z.roa"
);
assert_eq!(
snapshot.cached_validated_objects[0].rsync_uri,
"rsync://example.net/c.roa"
);
assert_eq!(
snapshot.fresh_rejected_objects[0].object_uri,
"rsync://example.net/a.roa"
);
assert!(snapshot.cached_rejected_objects.is_empty());
}
#[test]
fn accumulator_rejects_conflicting_hashes_within_same_section() {
let mut acc = CirInputAccumulator::default();
acc.submit_audit_entries(
CirInputSection::Fresh,
&[entry(
"rsync://example.net/a.roa",
0x11,
AuditObjectResult::Ok,
)],
)
.unwrap();
let err = acc
.submit_audit_entries(
CirInputSection::Fresh,
&[entry(
"rsync://example.net/a.roa",
0x22,
AuditObjectResult::Ok,
)],
)
.unwrap_err();
assert!(matches!(err, CirExportError::ConflictingObjectHash { .. }));
}
#[test]
fn accumulator_merges_finalized_snapshots() {
let snapshot = CirInputSnapshot {
fresh_validated_objects: vec![CirObject {
rsync_uri: "rsync://example.net/fresh.roa".to_string(),
sha256: vec![0x11; 32],
}],
cached_validated_objects: vec![CirObject {
rsync_uri: "rsync://example.net/cached.roa".to_string(),
sha256: vec![0x22; 32],
}],
fresh_rejected_objects: vec![CirRejectedObject {
object_uri: "rsync://example.net/fresh.roa".to_string(),
reason: Some("fresh invalid".to_string()),
}],
cached_rejected_objects: Vec::new(),
};
let mut acc = CirInputAccumulator::default();
acc.submit_snapshot(snapshot).unwrap();
let merged = acc.finalize();
assert_eq!(merged.fresh_validated_objects.len(), 1);
assert_eq!(merged.cached_validated_objects.len(), 1);
assert_eq!(merged.fresh_rejected_objects.len(), 1);
}
}

View File

@ -3,6 +3,7 @@ use std::collections::BTreeSet;
use std::path::Path; use std::path::Path;
use crate::audit::{AuditObjectResult, ObjectAuditEntry, PublicationPointAudit}; use crate::audit::{AuditObjectResult, ObjectAuditEntry, PublicationPointAudit};
use crate::cir::accumulator::CirInputSnapshot;
use crate::cir::encode::{CirEncodeError, encode_cir}; use crate::cir::encode::{CirEncodeError, encode_cir};
use crate::cir::model::{ use crate::cir::model::{
CanonicalInputRepresentation, CirObject, CirRejectedObject, CirTrustAnchor, CanonicalInputRepresentation, CirObject, CirRejectedObject, CirTrustAnchor,
@ -104,10 +105,6 @@ enum CirObjectSection {
Cached, Cached,
} }
fn publication_point_uses_explicit_cir_sections(pp: &PublicationPointAudit) -> bool {
!pp.cir_fresh_objects.is_empty() || !pp.cir_cached_objects.is_empty()
}
fn publication_point_is_cached_fallback(pp: &PublicationPointAudit) -> bool { fn publication_point_is_cached_fallback(pp: &PublicationPointAudit) -> bool {
pp.source == "vcir_current_instance" || pp.repo_terminal_state == "fallback_current_instance" pp.source == "vcir_current_instance" || pp.repo_terminal_state == "fallback_current_instance"
} }
@ -116,13 +113,6 @@ fn publication_point_cir_entries<'a>(
pp: &'a PublicationPointAudit, pp: &'a PublicationPointAudit,
section: CirObjectSection, section: CirObjectSection,
) -> &'a [ObjectAuditEntry] { ) -> &'a [ObjectAuditEntry] {
if publication_point_uses_explicit_cir_sections(pp) {
return match section {
CirObjectSection::Fresh => &pp.cir_fresh_objects,
CirObjectSection::Cached => &pp.cir_cached_objects,
};
}
match (section, publication_point_is_cached_fallback(pp)) { match (section, publication_point_is_cached_fallback(pp)) {
(CirObjectSection::Fresh, false) => &pp.objects, (CirObjectSection::Fresh, false) => &pp.objects,
(CirObjectSection::Cached, true) => &pp.objects, (CirObjectSection::Cached, true) => &pp.objects,
@ -191,6 +181,31 @@ fn canonical_ta_rsync_uri(trust_anchor: &TrustAnchor) -> Result<String, CirExpor
.ok_or(CirExportError::MissingTaRsyncUri) .ok_or(CirExportError::MissingTaRsyncUri)
} }
fn build_cir_trust_anchors(
tal_bindings: &[CirTrustAnchorBinding<'_>],
) -> Result<Vec<CirTrustAnchor>, CirExportError> {
for binding in tal_bindings {
if !(binding.tal_uri.starts_with("https://") || binding.tal_uri.starts_with("http://")) {
return Err(CirExportError::InvalidTalUri(binding.tal_uri.to_string()));
}
}
let mut trust_anchors = Vec::with_capacity(tal_bindings.len());
for binding in tal_bindings {
let ta_rsync_uri = canonical_ta_rsync_uri(binding.trust_anchor)?;
let ta_certificate_der = binding.trust_anchor.ta_certificate.raw_der.clone();
trust_anchors.push(CirTrustAnchor {
ta_rsync_uri,
tal_uri: binding.tal_uri.to_string(),
tal_bytes: binding.trust_anchor.tal.raw.clone(),
ta_certificate_sha256: crate::cir::model::sha256(&ta_certificate_der),
ta_certificate_der,
});
}
trust_anchors.sort_by(|a, b| a.ta_rsync_uri.cmp(&b.ta_rsync_uri));
Ok(trust_anchors)
}
pub fn build_cir_from_run( pub fn build_cir_from_run(
store: &RocksStore, store: &RocksStore,
trust_anchor: &TrustAnchor, trust_anchor: &TrustAnchor,
@ -217,30 +232,12 @@ pub fn build_cir_from_run_multi(
publication_points: &[PublicationPointAudit], publication_points: &[PublicationPointAudit],
_current_repo_objects: Option<&[CurrentRepoObject]>, _current_repo_objects: Option<&[CurrentRepoObject]>,
) -> Result<CanonicalInputRepresentation, CirExportError> { ) -> Result<CanonicalInputRepresentation, CirExportError> {
for binding in tal_bindings {
if !(binding.tal_uri.starts_with("https://") || binding.tal_uri.starts_with("http://")) {
return Err(CirExportError::InvalidTalUri(binding.tal_uri.to_string()));
}
}
let fresh_objects = let fresh_objects =
collect_cir_objects_from_validation_audit(publication_points, CirObjectSection::Fresh)?; collect_cir_objects_from_validation_audit(publication_points, CirObjectSection::Fresh)?;
let cached_objects = let cached_objects =
collect_cir_objects_from_validation_audit(publication_points, CirObjectSection::Cached)?; collect_cir_objects_from_validation_audit(publication_points, CirObjectSection::Cached)?;
let mut trust_anchors = Vec::with_capacity(tal_bindings.len()); let trust_anchors = build_cir_trust_anchors(tal_bindings)?;
for binding in tal_bindings {
let ta_rsync_uri = canonical_ta_rsync_uri(binding.trust_anchor)?;
let ta_certificate_der = binding.trust_anchor.ta_certificate.raw_der.clone();
trust_anchors.push(CirTrustAnchor {
ta_rsync_uri,
tal_uri: binding.tal_uri.to_string(),
tal_bytes: binding.trust_anchor.tal.raw.clone(),
ta_certificate_sha256: crate::cir::model::sha256(&ta_certificate_der),
ta_certificate_der,
});
}
trust_anchors.sort_by(|a, b| a.ta_rsync_uri.cmp(&b.ta_rsync_uri));
let fresh_rejected_objects = let fresh_rejected_objects =
collect_rejected_objects_from_validation_audit(publication_points, CirObjectSection::Fresh); collect_rejected_objects_from_validation_audit(publication_points, CirObjectSection::Fresh);
@ -261,6 +258,22 @@ pub fn build_cir_from_run_multi(
Ok(cir) Ok(cir)
} }
pub fn build_cir_from_input_snapshot_multi(
tal_bindings: &[CirTrustAnchorBinding<'_>],
validation_time: time::OffsetDateTime,
input: CirInputSnapshot,
) -> Result<CanonicalInputRepresentation, CirExportError> {
let trust_anchors = build_cir_trust_anchors(tal_bindings)?;
Ok(CanonicalInputRepresentation::new_v4(
validation_time,
input.fresh_validated_objects,
input.cached_validated_objects,
trust_anchors,
input.fresh_rejected_objects,
input.cached_rejected_objects,
))
}
pub fn write_cir_file( pub fn write_cir_file(
path: &Path, path: &Path,
cir: &CanonicalInputRepresentation, cir: &CanonicalInputRepresentation,
@ -389,6 +402,33 @@ pub fn export_cir_from_run_multi(
}) })
} }
pub fn export_cir_from_input_snapshot_multi(
tal_bindings: &[CirTrustAnchorBinding<'_>],
validation_time: time::OffsetDateTime,
input: CirInputSnapshot,
cir_out: &Path,
) -> Result<CirExportSummary, CirExportError> {
let total_started = std::time::Instant::now();
let started = std::time::Instant::now();
let cir = build_cir_from_input_snapshot_multi(tal_bindings, validation_time, input)?;
let build_cir_ms = started.elapsed().as_millis() as u64;
let started = std::time::Instant::now();
write_cir_file(cir_out, &cir)?;
let write_cir_ms = started.elapsed().as_millis() as u64;
Ok(CirExportSummary {
object_count: cir.validated_object_count(),
trust_anchor_count: cir.trust_anchors.len(),
timing: CirExportTiming {
build_cir_ms,
write_cir_ms,
total_ms: total_started.elapsed().as_millis() as u64,
},
})
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -1036,6 +1076,43 @@ mod tests {
assert!(matches!(err, CirExportError::MissingTaRsyncUri), "{err}"); assert!(matches!(err, CirExportError::MissingTaRsyncUri), "{err}");
} }
#[test]
fn build_cir_from_input_snapshot_multi_preserves_online_sections() {
let ta = sample_trust_anchor();
let cir = build_cir_from_input_snapshot_multi(
&[CirTrustAnchorBinding {
trust_anchor: &ta,
tal_uri: "https://example.test/root.tal",
}],
sample_time(),
CirInputSnapshot {
fresh_validated_objects: vec![CirObject {
rsync_uri: "rsync://example.test/repo/fresh.roa".to_string(),
sha256: vec![0x11; 32],
}],
cached_validated_objects: vec![CirObject {
rsync_uri: "rsync://example.test/repo/cached.roa".to_string(),
sha256: vec![0x22; 32],
}],
fresh_rejected_objects: vec![CirRejectedObject {
object_uri: "rsync://example.test/repo/fresh.roa".to_string(),
reason: Some("fresh rejected".to_string()),
}],
cached_rejected_objects: vec![CirRejectedObject {
object_uri: "rsync://example.test/repo/cached.roa".to_string(),
reason: None,
}],
},
)
.expect("build CIR from online input");
cir.validate().expect("online CIR snapshot is valid");
assert_eq!(cir.fresh_validated_objects.len(), 1);
assert_eq!(cir.cached_validated_objects.len(), 1);
assert_eq!(cir.fresh_rejected_objects.len(), 1);
assert_eq!(cir.cached_rejected_objects.len(), 1);
}
#[test] #[test]
fn export_cir_static_pool_writes_repository_objects_only() { fn export_cir_static_pool_writes_repository_objects_only() {
let td = tempfile::tempdir().unwrap(); let td = tempfile::tempdir().unwrap();

View File

@ -1,3 +1,4 @@
pub mod accumulator;
pub mod decode; pub mod decode;
pub mod encode; pub mod encode;
#[cfg(feature = "full")] #[cfg(feature = "full")]
@ -9,12 +10,14 @@ pub mod sequence;
#[cfg(feature = "full")] #[cfg(feature = "full")]
pub mod static_pool; pub mod static_pool;
pub use accumulator::{CirInputAccumulator, CirInputSection, CirInputSnapshot};
pub use decode::{CirDecodeError, decode_cir}; pub use decode::{CirDecodeError, decode_cir};
pub use encode::{CirEncodeError, encode_cir}; pub use encode::{CirEncodeError, encode_cir};
#[cfg(feature = "full")] #[cfg(feature = "full")]
pub use export::{ pub use export::{
CirExportError, CirExportSummary, CirTrustAnchorBinding, build_cir_from_run, CirExportError, CirExportSummary, CirTrustAnchorBinding, build_cir_from_input_snapshot_multi,
build_cir_from_run_multi, export_cir_from_run, export_cir_from_run_multi, write_cir_file, build_cir_from_run, build_cir_from_run_multi, export_cir_from_input_snapshot_multi,
export_cir_from_run, export_cir_from_run_multi, write_cir_file,
}; };
#[cfg(feature = "full")] #[cfg(feature = "full")]
pub use materialize::{ pub use materialize::{

View File

@ -3,7 +3,7 @@ mod output;
use crate::ccr::{ use crate::ccr::{
CcrAccumulator, CcrBuildBreakdown, build_ccr_from_run_with_breakdown, write_ccr_file, CcrAccumulator, CcrBuildBreakdown, build_ccr_from_run_with_breakdown, write_ccr_file,
}; };
use crate::cir::{CirTrustAnchorBinding, export_cir_from_run_multi}; use crate::cir::{CirTrustAnchorBinding, export_cir_from_input_snapshot_multi};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use crate::analysis::timing::{TimingHandle, TimingMeta, TimingMetaUpdate}; use crate::analysis::timing::{TimingHandle, TimingMeta, TimingMetaUpdate};
@ -1043,6 +1043,7 @@ struct PostValidationShared {
download_stats: crate::audit::AuditDownloadStats, download_stats: crate::audit::AuditDownloadStats,
current_repo_objects: Arc<[crate::current_repo_index::CurrentRepoObject]>, current_repo_objects: Arc<[crate::current_repo_index::CurrentRepoObject]>,
ccr_accumulator: Option<CcrAccumulator>, ccr_accumulator: Option<CcrAccumulator>,
cir_input: crate::cir::CirInputSnapshot,
} }
impl PostValidationShared { impl PostValidationShared {
@ -1058,6 +1059,7 @@ impl PostValidationShared {
download_stats, download_stats,
current_repo_objects, current_repo_objects,
ccr_accumulator, ccr_accumulator,
cir_input,
} = out; } = out;
let crate::validation::tree::TreeRunOutput { let crate::validation::tree::TreeRunOutput {
instances_processed, instances_processed,
@ -1084,6 +1086,7 @@ impl PostValidationShared {
download_stats, download_stats,
current_repo_objects: current_repo_objects.into(), current_repo_objects: current_repo_objects.into(),
ccr_accumulator, ccr_accumulator,
cir_input,
} }
} }
@ -2203,7 +2206,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
}; };
let validation_ms = validation_started.elapsed().as_millis() as u64; let validation_ms = validation_started.elapsed().as_millis() as u64;
let shared = PostValidationShared::from_run_output(out); let mut shared = PostValidationShared::from_run_output(out);
let vcir_storage_summary_enabled = vcir_storage_summary_enabled(); let vcir_storage_summary_enabled = vcir_storage_summary_enabled();
let vcir_storage_summary_started = std::time::Instant::now(); let vcir_storage_summary_started = std::time::Instant::now();
let vcir_storage = if config.persist_vcir && vcir_storage_summary_enabled { let vcir_storage = if config.persist_vcir && vcir_storage_summary_enabled {
@ -2398,14 +2401,11 @@ pub fn run(argv: &[String]) -> Result<(), String> {
tal_uri: tal_uri.as_str(), tal_uri: tal_uri.as_str(),
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let summary = export_cir_from_run_multi( let summary = export_cir_from_input_snapshot_multi(
store.as_ref(),
&tal_bindings, &tal_bindings,
validation_time, validation_time,
shared.publication_points.as_ref(), std::mem::take(&mut shared.cir_input),
cir_out_path, cir_out_path,
time::OffsetDateTime::now_utc().date(),
None,
) )
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
cir_build_cir_ms = Some(summary.timing.build_cir_ms); cir_build_cir_ms = Some(summary.timing.build_cir_ms);

View File

@ -1470,6 +1470,7 @@ fn synthetic_post_validation_shared() -> PostValidationShared {
download_stats: crate::audit::AuditDownloadStats::default(), download_stats: crate::audit::AuditDownloadStats::default(),
current_repo_objects: Vec::new(), current_repo_objects: Vec::new(),
ccr_accumulator: None, ccr_accumulator: None,
cir_input: crate::cir::CirInputSnapshot::default(),
}; };
PostValidationShared::from_run_output(out) PostValidationShared::from_run_output(out)
} }

View File

@ -96,6 +96,7 @@ pub struct RunTreeFromTalAuditOutput {
pub tree: TreeRunOutput, pub tree: TreeRunOutput,
pub publication_points: Vec<PublicationPointAudit>, pub publication_points: Vec<PublicationPointAudit>,
pub roa_cache_stats: crate::validation::objects::RoaValidationCacheStats, pub roa_cache_stats: crate::validation::objects::RoaValidationCacheStats,
pub cir_input: crate::cir::CirInputSnapshot,
pub downloads: Vec<crate::audit::AuditDownloadEvent>, pub downloads: Vec<crate::audit::AuditDownloadEvent>,
pub download_stats: crate::audit::AuditDownloadStats, pub download_stats: crate::audit::AuditDownloadStats,
pub current_repo_objects: Vec<CurrentRepoObject>, pub current_repo_objects: Vec<CurrentRepoObject>,
@ -608,6 +609,7 @@ pub fn run_tree_from_tal_url_serial_audit(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &runner, config)?; } = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
@ -619,6 +621,7 @@ pub fn run_tree_from_tal_url_serial_audit(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: Vec::new(), current_repo_objects: Vec::new(),
@ -669,6 +672,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &runner, config)?; } = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
@ -680,6 +684,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: Vec::new(), current_repo_objects: Vec::new(),
@ -754,6 +759,7 @@ where
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = if phase2_enabled { } = if phase2_enabled {
run_tree_parallel_phase2_audit(root, &runner, config)? run_tree_parallel_phase2_audit(root, &runner, config)?
} else { } else {
@ -769,6 +775,7 @@ where
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: snapshot_current_repo_objects( current_repo_objects: snapshot_current_repo_objects(
@ -872,6 +879,7 @@ where
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = if phase2_enabled { } = if phase2_enabled {
run_tree_parallel_phase2_audit_multi_root(root_handles, &runner, config)? run_tree_parallel_phase2_audit_multi_root(root_handles, &runner, config)?
} else { } else {
@ -887,6 +895,7 @@ where
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: snapshot_current_repo_objects( current_repo_objects: snapshot_current_repo_objects(
@ -1355,6 +1364,7 @@ pub fn run_tree_from_tal_bytes_serial_audit(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &runner, config)?; } = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
@ -1366,6 +1376,7 @@ pub fn run_tree_from_tal_bytes_serial_audit(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: Vec::new(), current_repo_objects: Vec::new(),
@ -1429,6 +1440,7 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &runner, config)?; } = run_tree_serial_audit(root, &runner, config)?;
drop(_tree); drop(_tree);
@ -1441,6 +1453,7 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: Vec::new(), current_repo_objects: Vec::new(),
@ -1500,6 +1513,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &runner, config)?; } = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
@ -1511,6 +1525,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: Vec::new(), current_repo_objects: Vec::new(),
@ -1574,6 +1589,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &runner, config)?; } = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
@ -1585,6 +1601,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: Vec::new(), current_repo_objects: Vec::new(),
@ -1716,6 +1733,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &runner, config)?; } = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
@ -1727,6 +1745,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: Vec::new(), current_repo_objects: Vec::new(),
@ -1800,6 +1819,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &runner, config)?; } = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
@ -1811,6 +1831,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: Vec::new(), current_repo_objects: Vec::new(),
@ -1994,7 +2015,7 @@ fn run_payload_delta_replay_audit_inner(
.map_err(|e| RunTreeFromTalError::Replay(e.to_string()))?; .map_err(|e| RunTreeFromTalError::Replay(e.to_string()))?;
let delta_rsync_fetcher = PayloadDeltaReplayRsyncFetcher::new(base_index, delta_index.clone()); let delta_rsync_fetcher = PayloadDeltaReplayRsyncFetcher::new(base_index, delta_index.clone());
let download_log = DownloadLogHandle::new(); let download_log = DownloadLogHandle::new();
let (tree, publication_points, roa_cache_stats) = if let Some(t) = timing.as_ref() { let (tree, publication_points, roa_cache_stats, cir_input) = if let Some(t) = timing.as_ref() {
let _phase = t.span_phase("payload_delta_replay_target_total"); let _phase = t.span_phase("payload_delta_replay_target_total");
let delta_runner = build_payload_delta_replay_runner( let delta_runner = build_payload_delta_replay_runner(
store, store,
@ -2011,8 +2032,9 @@ fn run_payload_delta_replay_audit_inner(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &delta_runner, config)?; } = run_tree_serial_audit(root, &delta_runner, config)?;
(tree, publication_points, roa_cache_stats) (tree, publication_points, roa_cache_stats, cir_input)
} else { } else {
let delta_runner = build_payload_delta_replay_runner( let delta_runner = build_payload_delta_replay_runner(
store, store,
@ -2029,8 +2051,9 @@ fn run_payload_delta_replay_audit_inner(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &delta_runner, config)?; } = run_tree_serial_audit(root, &delta_runner, config)?;
(tree, publication_points, roa_cache_stats) (tree, publication_points, roa_cache_stats, cir_input)
}; };
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
@ -2041,6 +2064,7 @@ fn run_payload_delta_replay_audit_inner(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: Vec::new(), current_repo_objects: Vec::new(),
@ -2153,7 +2177,7 @@ fn run_payload_delta_replay_step_audit_inner(
PayloadDeltaReplayCurrentStoreRsyncFetcher::new(store, delta_index.clone()); PayloadDeltaReplayCurrentStoreRsyncFetcher::new(store, delta_index.clone());
let download_log = DownloadLogHandle::new(); let download_log = DownloadLogHandle::new();
let (tree, publication_points, roa_cache_stats) = if let Some(t) = timing.as_ref() { let (tree, publication_points, roa_cache_stats, cir_input) = if let Some(t) = timing.as_ref() {
let _phase = t.span_phase("payload_delta_replay_step_total"); let _phase = t.span_phase("payload_delta_replay_step_total");
let delta_runner = build_payload_delta_replay_current_store_runner( let delta_runner = build_payload_delta_replay_current_store_runner(
store, store,
@ -2170,8 +2194,9 @@ fn run_payload_delta_replay_step_audit_inner(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &delta_runner, config)?; } = run_tree_serial_audit(root, &delta_runner, config)?;
(tree, publication_points, roa_cache_stats) (tree, publication_points, roa_cache_stats, cir_input)
} else { } else {
let delta_runner = build_payload_delta_replay_current_store_runner( let delta_runner = build_payload_delta_replay_current_store_runner(
store, store,
@ -2188,8 +2213,9 @@ fn run_payload_delta_replay_step_audit_inner(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
} = run_tree_serial_audit(root, &delta_runner, config)?; } = run_tree_serial_audit(root, &delta_runner, config)?;
(tree, publication_points, roa_cache_stats) (tree, publication_points, roa_cache_stats, cir_input)
}; };
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
@ -2200,6 +2226,7 @@ fn run_payload_delta_replay_step_audit_inner(
tree, tree,
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input,
downloads, downloads,
download_stats, download_stats,
current_repo_objects: Vec::new(), current_repo_objects: Vec::new(),

View File

@ -1,5 +1,5 @@
use crate::audit::DiscoveredFrom; use crate::audit::{DiscoveredFrom, ObjectAuditEntry, PublicationPointAudit};
use crate::audit::PublicationPointAudit; use crate::cir::{CirInputAccumulator, CirInputSnapshot};
use crate::data_model::rc::{AsResourceSet, IpResourceSet}; use crate::data_model::rc::{AsResourceSet, IpResourceSet};
use crate::report::Warning; use crate::report::Warning;
use crate::validation::manifest::PublicationPointSource; use crate::validation::manifest::PublicationPointSource;
@ -77,6 +77,8 @@ pub struct PublicationPointRunResult {
pub warnings: Vec<Warning>, pub warnings: Vec<Warning>,
pub objects: ObjectsOutput, pub objects: ObjectsOutput,
pub audit: PublicationPointAudit, pub audit: PublicationPointAudit,
pub cir_fresh_objects: Vec<ObjectAuditEntry>,
pub cir_cached_objects: Vec<ObjectAuditEntry>,
/// Candidate child CA instances to enqueue after this publication point completes. /// Candidate child CA instances to enqueue after this publication point completes.
/// ///
/// - For `Fresh`, these are discovered from the current validated publication point. /// - For `Fresh`, these are discovered from the current validated publication point.
@ -126,6 +128,42 @@ pub struct TreeRunAuditOutput {
pub tree: TreeRunOutput, pub tree: TreeRunOutput,
pub publication_points: Vec<PublicationPointAudit>, pub publication_points: Vec<PublicationPointAudit>,
pub roa_cache_stats: RoaValidationCacheStats, pub roa_cache_stats: RoaValidationCacheStats,
pub cir_input: CirInputSnapshot,
}
pub(crate) fn submit_publication_point_cir_input(
cir_input: &mut CirInputAccumulator,
source: PublicationPointSource,
audit: &PublicationPointAudit,
cir_fresh_objects: &[ObjectAuditEntry],
cir_cached_objects: &[ObjectAuditEntry],
) -> Result<(), TreeRunError> {
match source {
PublicationPointSource::Fresh => {
cir_input
.submit_audit_entries(crate::cir::CirInputSection::Fresh, &audit.objects)
.map_err(|e| TreeRunError::Runner(e.to_string()))?;
}
PublicationPointSource::FailedFetchNoCache => {
let entries = if cir_fresh_objects.is_empty() {
&audit.objects
} else {
cir_fresh_objects
};
cir_input
.submit_audit_entries(crate::cir::CirInputSection::Fresh, entries)
.map_err(|e| TreeRunError::Runner(e.to_string()))?;
}
PublicationPointSource::VcirCurrentInstance => {
cir_input
.submit_audit_entries(crate::cir::CirInputSection::Fresh, cir_fresh_objects)
.map_err(|e| TreeRunError::Runner(e.to_string()))?;
cir_input
.submit_audit_entries(crate::cir::CirInputSection::Cached, cir_cached_objects)
.map_err(|e| TreeRunError::Runner(e.to_string()))?;
}
}
Ok(())
} }
pub fn run_tree_serial( pub fn run_tree_serial(
@ -179,6 +217,7 @@ pub fn run_tree_serial_audit_multi_root(
let mut router_keys: Vec<RouterKeyPayload> = Vec::new(); let mut router_keys: Vec<RouterKeyPayload> = Vec::new();
let mut publication_points: Vec<PublicationPointAudit> = Vec::new(); let mut publication_points: Vec<PublicationPointAudit> = Vec::new();
let mut roa_cache_stats = RoaValidationCacheStats::default(); let mut roa_cache_stats = RoaValidationCacheStats::default();
let mut cir_input = CirInputAccumulator::default();
while let Some(node) = queue.pop_front() { while let Some(node) = queue.pop_front() {
let ca = &node.handle; let ca = &node.handle;
@ -218,10 +257,20 @@ pub fn run_tree_serial_audit_multi_root(
aspas.extend(res.objects.aspas); aspas.extend(res.objects.aspas);
router_keys.extend(res.objects.router_keys); router_keys.extend(res.objects.router_keys);
let source = res.source;
let cir_fresh_objects = res.cir_fresh_objects;
let cir_cached_objects = res.cir_cached_objects;
let mut audit = res.audit; let mut audit = res.audit;
audit.node_id = Some(node.id); audit.node_id = Some(node.id);
audit.parent_node_id = node.parent_id; audit.parent_node_id = node.parent_id;
audit.discovered_from = node.discovered_from.clone(); audit.discovered_from = node.discovered_from.clone();
submit_publication_point_cir_input(
&mut cir_input,
source,
&audit,
&cir_fresh_objects,
&cir_cached_objects,
)?;
if config.compact_audit { if config.compact_audit {
audit.objects.clear(); audit.objects.clear();
audit.warnings.clear(); audit.warnings.clear();
@ -267,6 +316,7 @@ pub fn run_tree_serial_audit_multi_root(
}, },
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input: cir_input.finalize(),
}) })
} }
@ -274,7 +324,9 @@ pub fn run_tree_serial_audit_multi_root(
mod tests { mod tests {
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use crate::audit::{DiscoveredFrom, PublicationPointAudit}; use crate::audit::{
AuditObjectKind, AuditObjectResult, DiscoveredFrom, ObjectAuditEntry, PublicationPointAudit,
};
use crate::validation::objects::{ObjectsOutput, ObjectsStats}; use crate::validation::objects::{ObjectsOutput, ObjectsStats};
use super::{ use super::{
@ -353,6 +405,8 @@ mod tests {
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(), roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: children, discovered_children: children,
}) })
} }
@ -395,4 +449,114 @@ mod tests {
assert_eq!(out.tree.instances_processed, 2); assert_eq!(out.tree.instances_processed, 2);
assert_eq!(out.publication_points.len(), 2); assert_eq!(out.publication_points.len(), 2);
} }
#[test]
fn run_tree_serial_audit_accumulates_fresh_and_cached_cir_input() {
struct CirRunner;
impl PublicationPointRunner for CirRunner {
fn run_publication_point(
&self,
ca: &CaInstanceHandle,
) -> Result<PublicationPointRunResult, String> {
if ca.manifest_rsync_uri.ends_with("cached.mft") {
let fresh_reject = ObjectAuditEntry {
rsync_uri: "rsync://example.test/repo/cached/current.mft".to_string(),
sha256_hex: "11".repeat(32),
kind: AuditObjectKind::Manifest,
result: AuditObjectResult::Error,
detail: Some("fresh manifest rejected".to_string()),
};
let cached = ObjectAuditEntry {
rsync_uri: "rsync://example.test/repo/cached/old.roa".to_string(),
sha256_hex: "22".repeat(32),
kind: AuditObjectKind::Roa,
result: AuditObjectResult::Ok,
detail: None,
};
return Ok(PublicationPointRunResult {
source:
crate::validation::manifest::PublicationPointSource::VcirCurrentInstance,
snapshot: None,
warnings: Vec::new(),
objects: ObjectsOutput {
vrps: Vec::new(),
aspas: Vec::new(),
router_keys: Vec::new(),
local_outputs_cache: Vec::new(),
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats:
crate::validation::objects::RoaValidationCacheStats::default(),
},
audit: PublicationPointAudit {
objects: vec![fresh_reject.clone(), cached.clone()],
..PublicationPointAudit::default()
},
cir_fresh_objects: vec![fresh_reject],
cir_cached_objects: vec![cached],
discovered_children: Vec::new(),
});
}
let fresh = ObjectAuditEntry {
rsync_uri: "rsync://example.test/repo/fresh/a.roa".to_string(),
sha256_hex: "33".repeat(32),
kind: AuditObjectKind::Roa,
result: AuditObjectResult::Ok,
detail: None,
};
Ok(PublicationPointRunResult {
source: crate::validation::manifest::PublicationPointSource::Fresh,
snapshot: None,
warnings: Vec::new(),
objects: ObjectsOutput {
vrps: Vec::new(),
aspas: Vec::new(),
router_keys: Vec::new(),
local_outputs_cache: Vec::new(),
warnings: Vec::new(),
stats: ObjectsStats::default(),
audit: Vec::new(),
roa_cache_stats:
crate::validation::objects::RoaValidationCacheStats::default(),
},
audit: PublicationPointAudit {
objects: vec![fresh],
..PublicationPointAudit::default()
},
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: Vec::new(),
})
}
}
let out = run_tree_serial_audit_multi_root(
vec![
sample_handle("rsync://example.test/repo/fresh/root.mft"),
sample_handle("rsync://example.test/repo/cached/cached.mft"),
],
&CirRunner,
&TreeRunConfig::default(),
)
.expect("tree run");
assert_eq!(out.cir_input.fresh_validated_objects.len(), 2);
assert_eq!(out.cir_input.cached_validated_objects.len(), 1);
assert_eq!(out.cir_input.fresh_rejected_objects.len(), 1);
assert!(
out.cir_input
.fresh_validated_objects
.iter()
.any(|item| item.rsync_uri == "rsync://example.test/repo/fresh/a.roa")
);
assert!(
out.cir_input
.cached_validated_objects
.iter()
.any(|item| item.rsync_uri == "rsync://example.test/repo/cached/old.roa")
);
}
} }

View File

@ -3,6 +3,7 @@ use std::sync::mpsc::{self, Receiver, SyncSender, TryRecvError, TrySendError};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use crate::audit::{DiscoveredFrom, PublicationPointAudit}; use crate::audit::{DiscoveredFrom, PublicationPointAudit};
use crate::cir::CirInputAccumulator;
use crate::parallel::object_worker::ObjectWorkerSubmitError; use crate::parallel::object_worker::ObjectWorkerSubmitError;
use crate::parallel::repo_runtime::{RepoSyncRequestStatus, RepoSyncRuntimeOutcome}; use crate::parallel::repo_runtime::{RepoSyncRequestStatus, RepoSyncRuntimeOutcome};
use crate::parallel::types::RepoIdentity; use crate::parallel::types::RepoIdentity;
@ -10,6 +11,7 @@ use crate::policy::SignedObjectFailurePolicy;
use crate::report::Warning; use crate::report::Warning;
use crate::storage::VcirReplaceTimingBreakdown; use crate::storage::VcirReplaceTimingBreakdown;
use crate::validation::manifest::PublicationPointData; use crate::validation::manifest::PublicationPointData;
use crate::validation::manifest::PublicationPointSource;
use crate::validation::objects::{ use crate::validation::objects::{
ObjectsOutput, OwnedRoaTask, ParallelObjectsPrepare, ParallelObjectsStage, ObjectsOutput, OwnedRoaTask, ParallelObjectsPrepare, ParallelObjectsStage,
RoaValidationCacheInput, prepare_publication_point_for_parallel_roa_with_cache, RoaValidationCacheInput, prepare_publication_point_for_parallel_roa_with_cache,
@ -88,9 +90,12 @@ impl FinishedPublicationPointNode {
#[derive(Debug)] #[derive(Debug)]
enum FinishedPublicationPointResult { enum FinishedPublicationPointResult {
Ok { Ok {
source: PublicationPointSource,
warnings: Vec<Warning>, warnings: Vec<Warning>,
objects: ObjectsOutput, objects: ObjectsOutput,
audit: PublicationPointAudit, audit: PublicationPointAudit,
cir_fresh_objects: Vec<crate::audit::ObjectAuditEntry>,
cir_cached_objects: Vec<crate::audit::ObjectAuditEntry>,
}, },
Err(String), Err(String),
} }
@ -410,14 +415,22 @@ fn compact_phase2_finished_result(
) -> FinishedPublicationPointResult { ) -> FinishedPublicationPointResult {
result.objects.audit.clear(); result.objects.audit.clear();
result.objects.local_outputs_cache.clear(); result.objects.local_outputs_cache.clear();
let cir_fresh_objects = if compact_audit && result.source == PublicationPointSource::Fresh {
result.audit.objects.clone()
} else {
result.cir_fresh_objects
};
if compact_audit { if compact_audit {
result.audit.objects.clear(); result.audit.objects.clear();
result.audit.warnings.clear(); result.audit.warnings.clear();
} }
FinishedPublicationPointResult::Ok { FinishedPublicationPointResult::Ok {
source: result.source,
warnings: result.warnings, warnings: result.warnings,
objects: result.objects, objects: result.objects,
audit: result.audit, audit: result.audit,
cir_fresh_objects,
cir_cached_objects: result.cir_cached_objects,
} }
} }
@ -2025,13 +2038,17 @@ fn build_tree_output(mut finished: Vec<FinishedPublicationPoint>) -> TreeRunAudi
let mut router_keys = Vec::new(); let mut router_keys = Vec::new();
let mut publication_points = Vec::new(); let mut publication_points = Vec::new();
let mut roa_cache_stats = crate::validation::objects::RoaValidationCacheStats::default(); let mut roa_cache_stats = crate::validation::objects::RoaValidationCacheStats::default();
let mut cir_input = CirInputAccumulator::default();
for item in finished { for item in finished {
match item.result { match item.result {
FinishedPublicationPointResult::Ok { FinishedPublicationPointResult::Ok {
source,
warnings: result_warnings, warnings: result_warnings,
objects, objects,
audit, audit,
cir_fresh_objects,
cir_cached_objects,
} => { } => {
instances_processed += 1; instances_processed += 1;
warnings.extend(result_warnings); warnings.extend(result_warnings);
@ -2045,6 +2062,14 @@ fn build_tree_output(mut finished: Vec<FinishedPublicationPoint>) -> TreeRunAudi
audit.node_id = Some(item.node.id); audit.node_id = Some(item.node.id);
audit.parent_node_id = item.node.parent_id; audit.parent_node_id = item.node.parent_id;
audit.discovered_from = item.node.discovered_from; audit.discovered_from = item.node.discovered_from;
crate::validation::tree::submit_publication_point_cir_input(
&mut cir_input,
source,
&audit,
&cir_fresh_objects,
&cir_cached_objects,
)
.expect("CIR input collection from validated audit must not fail");
publication_points.push(audit); publication_points.push(audit);
} }
FinishedPublicationPointResult::Err(err) => { FinishedPublicationPointResult::Err(err) => {
@ -2068,6 +2093,7 @@ fn build_tree_output(mut finished: Vec<FinishedPublicationPoint>) -> TreeRunAudi
}, },
publication_points, publication_points,
roa_cache_stats, roa_cache_stats,
cir_input: cir_input.finalize(),
} }
} }
@ -2133,6 +2159,8 @@ mod tests {
roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(), roa_cache_stats: crate::validation::objects::RoaValidationCacheStats::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: Vec::new(), discovered_children: Vec::new(),
} }
} }
@ -2172,10 +2200,16 @@ mod tests {
}); });
let result = compact_phase2_finished_result(sample, true); let result = compact_phase2_finished_result(sample, true);
match result { match result {
FinishedPublicationPointResult::Ok { objects, audit, .. } => { FinishedPublicationPointResult::Ok {
objects,
audit,
cir_fresh_objects,
..
} => {
assert!(audit.objects.is_empty()); assert!(audit.objects.is_empty());
assert!(audit.warnings.is_empty()); assert!(audit.warnings.is_empty());
assert!(objects.audit.is_empty()); assert!(objects.audit.is_empty());
assert_eq!(cir_fresh_objects.len(), 1);
} }
FinishedPublicationPointResult::Err(err) => panic!("unexpected error: {err}"), FinishedPublicationPointResult::Err(err) => panic!("unexpected error: {err}"),
} }

View File

@ -440,6 +440,8 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
warnings, warnings,
objects, objects,
audit, audit,
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children, discovered_children,
}, },
snapshot_pack_ms, snapshot_pack_ms,
@ -996,12 +998,33 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
&fresh_failure_audits, &fresh_failure_audits,
); );
let audit_build_ms = audit_build_started.elapsed().as_millis() as u64; let audit_build_ms = audit_build_started.elapsed().as_millis() as u64;
let cir_cached_objects =
if projection.source == PublicationPointSource::VcirCurrentInstance {
audit
.objects
.iter()
.filter(|entry| {
!fresh_failure_audits.iter().any(|fresh| fresh == *entry)
})
.cloned()
.collect()
} else {
Vec::new()
};
let result = PublicationPointRunResult { let result = PublicationPointRunResult {
source: projection.source, source: projection.source,
snapshot: projection.snapshot, snapshot: projection.snapshot,
warnings, warnings,
objects: projection.objects, objects: projection.objects,
audit, audit,
cir_fresh_objects: if projection.source
== PublicationPointSource::VcirCurrentInstance
{
fresh_failure_audits
} else {
Vec::new()
},
cir_cached_objects,
discovered_children: projection.discovered_children, discovered_children: projection.discovered_children,
}; };
let total_duration_ms = let total_duration_ms =
@ -1930,9 +1953,7 @@ fn build_publication_point_audit_from_snapshot(
next_update_rfc3339_utc: pack.next_update.rfc3339_utc.clone(), next_update_rfc3339_utc: pack.next_update.rfc3339_utc.clone(),
verified_at_rfc3339_utc: pack.verified_at.rfc3339_utc.clone(), verified_at_rfc3339_utc: pack.verified_at.rfc3339_utc.clone(),
warnings, warnings,
objects: objects_out.clone(), objects: objects_out,
cir_fresh_objects: objects_out,
cir_cached_objects: Vec::new(),
} }
} }
@ -1989,8 +2010,6 @@ fn build_publication_point_audit_from_vcir(
verified_at_rfc3339_utc: String::new(), verified_at_rfc3339_utc: String::new(),
warnings, warnings,
objects: fresh_failure_audits.to_vec(), objects: fresh_failure_audits.to_vec(),
cir_fresh_objects: fresh_failure_audits.to_vec(),
cir_cached_objects: Vec::new(),
}; };
}; };
@ -2027,9 +2046,7 @@ fn build_publication_point_audit_from_vcir(
.clone(), .clone(),
verified_at_rfc3339_utc: vcir.last_successful_validation_time.rfc3339_utc.clone(), verified_at_rfc3339_utc: vcir.last_successful_validation_time.rfc3339_utc.clone(),
warnings, warnings,
objects: objects_out.clone(), objects: objects_out,
cir_fresh_objects: objects_out,
cir_cached_objects: Vec::new(),
}; };
} }
@ -2124,8 +2141,6 @@ fn build_publication_point_audit_from_vcir(
verified_at_rfc3339_utc: vcir.last_successful_validation_time.rfc3339_utc.clone(), verified_at_rfc3339_utc: vcir.last_successful_validation_time.rfc3339_utc.clone(),
warnings, warnings,
objects: audit_objects, objects: audit_objects,
cir_fresh_objects: fresh_failure_audits.to_vec(),
cir_cached_objects: objects_out,
} }
} }

View File

@ -60,6 +60,8 @@ impl PublicationPointRunner for SinglePackRunner {
warnings: Vec::new(), warnings: Vec::new(),
objects, objects,
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: Vec::new(), discovered_children: Vec::new(),
}) })
} }

View File

@ -120,6 +120,8 @@ fn tree_continues_when_a_publication_point_fails() {
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: vec![ discovered_children: vec![
discovered_child(root_manifest, bad_child_manifest), discovered_child(root_manifest, bad_child_manifest),
discovered_child(root_manifest, ok_child_manifest), discovered_child(root_manifest, ok_child_manifest),
@ -147,6 +149,8 @@ fn tree_continues_when_a_publication_point_fails() {
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: Vec::new(), discovered_children: Vec::new(),
}, },
); );

View File

@ -130,6 +130,8 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() {
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: root_children, discovered_children: root_children,
}, },
) )
@ -153,6 +155,8 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() {
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: child1_children, discovered_children: child1_children,
}, },
) )
@ -176,6 +180,8 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() {
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: Vec::new(), discovered_children: Vec::new(),
}, },
) )
@ -199,6 +205,8 @@ fn tree_enqueues_children_for_fresh_and_current_instance_vcir_results() {
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: Vec::new(), discovered_children: Vec::new(),
}, },
); );
@ -259,6 +267,8 @@ fn tree_respects_max_depth_and_max_instances() {
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: vec![discovered_child(root_manifest, child_manifest)], discovered_children: vec![discovered_child(root_manifest, child_manifest)],
}, },
) )
@ -282,6 +292,8 @@ fn tree_respects_max_depth_and_max_instances() {
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: Vec::new(), discovered_children: Vec::new(),
}, },
); );
@ -344,6 +356,8 @@ fn tree_audit_includes_parent_and_discovered_from_for_non_root_nodes() {
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: vec![discovered_child(root_manifest, child_manifest)], discovered_children: vec![discovered_child(root_manifest, child_manifest)],
}, },
) )
@ -367,6 +381,8 @@ fn tree_audit_includes_parent_and_discovered_from_for_non_root_nodes() {
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: Vec::new(), discovered_children: Vec::new(),
}, },
); );
@ -436,6 +452,8 @@ fn tree_aggregates_router_keys_from_publication_point_results() {
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: Vec::new(), discovered_children: Vec::new(),
}, },
); );
@ -478,6 +496,8 @@ fn tree_prefers_lexicographically_first_discovery_when_duplicate_manifest_is_que
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: vec![first, second], discovered_children: vec![first, second],
}, },
) )
@ -501,6 +521,8 @@ fn tree_prefers_lexicographically_first_discovery_when_duplicate_manifest_is_que
roa_cache_stats: Default::default(), roa_cache_stats: Default::default(),
}, },
audit: PublicationPointAudit::default(), audit: PublicationPointAudit::default(),
cir_fresh_objects: Vec::new(),
cir_cached_objects: Vec::new(),
discovered_children: Vec::new(), discovered_children: Vec::new(),
}, },
); );