20260428 daemon运行化和delta复用warning修复

This commit is contained in:
yuyr 2026-04-29 08:13:10 +08:00
parent 3b2a160c5c
commit ad61caf271
6 changed files with 2035 additions and 87 deletions

1783
src/bin/rpki_daemon.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@ -214,6 +214,17 @@ pub enum ManifestFreshError {
HashMismatch { rsync_uri: String }, HashMismatch { rsync_uri: String },
} }
impl ManifestFreshError {
pub(crate) fn should_warn_when_current_instance_reused(&self) -> bool {
!matches!(
self,
ManifestFreshError::RepoSyncFailed { .. }
| ManifestFreshError::MissingManifest { .. }
| ManifestFreshError::MissingFile { .. }
)
}
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum ManifestReuseError { pub enum ManifestReuseError {
#[error("latest current-instance VCIR missing: {0} (RFC 9286 §6.6)")] #[error("latest current-instance VCIR missing: {0} (RFC 9286 §6.6)")]
@ -398,12 +409,6 @@ pub fn process_manifest_publication_point_after_repo_sync(
Err(ManifestProcessError::StopAllOutput(fresh_err)) Err(ManifestProcessError::StopAllOutput(fresh_err))
} }
CaFailedFetchPolicy::ReuseCurrentInstanceVcir => { CaFailedFetchPolicy::ReuseCurrentInstanceVcir => {
let mut warnings = vec![
Warning::new(format!("manifest failed fetch: {fresh_err}"))
.with_rfc_refs(&[RfcRef("RFC 9286 §6.6")])
.with_context(manifest_rsync_uri),
];
match load_current_instance_vcir_publication_point( match load_current_instance_vcir_publication_point(
store, store,
manifest_rsync_uri, manifest_rsync_uri,
@ -411,11 +416,14 @@ pub fn process_manifest_publication_point_after_repo_sync(
validation_time, validation_time,
) { ) {
Ok(snapshot) => { Ok(snapshot) => {
let mut warnings = Vec::new();
if fresh_err.should_warn_when_current_instance_reused() {
warnings.push( warnings.push(
Warning::new("using latest validated result for current CA instance") Warning::new(format!("manifest failed fetch: {fresh_err}"))
.with_rfc_refs(&[RfcRef("RFC 9286 §6.6")]) .with_rfc_refs(&[RfcRef("RFC 9286 §6.6")])
.with_context(manifest_rsync_uri), .with_context(manifest_rsync_uri),
); );
}
Ok(PublicationPointResult { Ok(PublicationPointResult {
source: PublicationPointSource::VcirCurrentInstance, source: PublicationPointSource::VcirCurrentInstance,
snapshot, snapshot,

View File

@ -1960,16 +1960,17 @@ fn project_current_instance_vcir_on_failed_fetch(
fresh_err: &ManifestFreshError, fresh_err: &ManifestFreshError,
validation_time: time::OffsetDateTime, validation_time: time::OffsetDateTime,
) -> Result<VcirReuseProjection, String> { ) -> Result<VcirReuseProjection, String> {
let mut warnings = vec![ let mut warnings = Vec::new();
Warning::new(format!("manifest failed fetch: {fresh_err}"))
.with_rfc_refs(&[RfcRef("RFC 9286 §6.6")])
.with_context(&ca.manifest_rsync_uri),
];
let Some(vcir) = store let Some(vcir) = store
.get_vcir(&ca.manifest_rsync_uri) .get_vcir(&ca.manifest_rsync_uri)
.map_err(|e| format!("load VCIR failed: {e}"))? .map_err(|e| format!("load VCIR failed: {e}"))?
else { else {
warnings.push(
Warning::new(format!("manifest failed fetch: {fresh_err}"))
.with_rfc_refs(&[RfcRef("RFC 9286 §6.6")])
.with_context(&ca.manifest_rsync_uri),
);
warnings.push( warnings.push(
Warning::new( Warning::new(
"no latest validated result for current CA instance; no cached output reused", "no latest validated result for current CA instance; no cached output reused",
@ -1990,6 +1991,11 @@ fn project_current_instance_vcir_on_failed_fetch(
}; };
if !vcir.audit_summary.failed_fetch_eligible { if !vcir.audit_summary.failed_fetch_eligible {
warnings.push(
Warning::new(format!("manifest failed fetch: {fresh_err}"))
.with_rfc_refs(&[RfcRef("RFC 9286 §6.6")])
.with_context(&ca.manifest_rsync_uri),
);
warnings.push( warnings.push(
Warning::new( Warning::new(
"latest VCIR is not marked failed-fetch eligible; no cached output reused", "latest VCIR is not marked failed-fetch eligible; no cached output reused",
@ -2012,6 +2018,11 @@ fn project_current_instance_vcir_on_failed_fetch(
let instance_effective_until = let instance_effective_until =
parse_snapshot_time_value(&vcir.instance_gate.instance_effective_until)?; parse_snapshot_time_value(&vcir.instance_gate.instance_effective_until)?;
if validation_time > instance_effective_until { if validation_time > instance_effective_until {
warnings.push(
Warning::new(format!("manifest failed fetch: {fresh_err}"))
.with_rfc_refs(&[RfcRef("RFC 9286 §6.6")])
.with_context(&ca.manifest_rsync_uri),
);
warnings.push( warnings.push(
Warning::new( Warning::new(
"latest VCIR instance_gate expired; current instance contributes no cached output", "latest VCIR instance_gate expired; current instance contributes no cached output",
@ -2031,14 +2042,17 @@ fn project_current_instance_vcir_on_failed_fetch(
}); });
} }
let ccr_manifest_projection = reuse_ccr_manifest_projection_from_vcir(ca, &vcir)?;
if fresh_err.should_warn_when_current_instance_reused() {
warnings.push( warnings.push(
Warning::new("using latest validated result for current CA instance") Warning::new(format!("manifest failed fetch: {fresh_err}"))
.with_rfc_refs(&[RfcRef("RFC 9286 §6.6")]) .with_rfc_refs(&[RfcRef("RFC 9286 §6.6")])
.with_context(&ca.manifest_rsync_uri), .with_context(&ca.manifest_rsync_uri),
); );
}
let ccr_manifest_projection = reuse_ccr_manifest_projection_from_vcir(ca, &vcir)?; // Current-instance reuse is fully described by VCIR projections; rebuilding a
let snapshot = reconstruct_snapshot_from_vcir(store, ca, &vcir, &mut warnings); // byte-backed snapshot here only duplicates repo-byte I/O and creates warning noise.
let snapshot = None;
let objects = build_objects_output_from_vcir(&vcir, validation_time, &mut warnings); let objects = build_objects_output_from_vcir(&vcir, validation_time, &mut warnings);
let (discovered_children, child_audits) = let (discovered_children, child_audits) =
restore_children_from_vcir(store, ca, &vcir, &mut warnings); restore_children_from_vcir(store, ca, &vcir, &mut warnings);
@ -2055,6 +2069,7 @@ fn project_current_instance_vcir_on_failed_fetch(
}) })
} }
#[cfg(test)]
fn reconstruct_snapshot_from_vcir( fn reconstruct_snapshot_from_vcir(
store: &RocksStore, store: &RocksStore,
ca: &CaInstanceHandle, ca: &CaInstanceHandle,
@ -2065,8 +2080,8 @@ fn reconstruct_snapshot_from_vcir(
artifact.artifact_role == VcirArtifactRole::Manifest artifact.artifact_role == VcirArtifactRole::Manifest
&& artifact.uri.as_deref() == Some(ca.manifest_rsync_uri.as_str()) && artifact.uri.as_deref() == Some(ca.manifest_rsync_uri.as_str())
})?; })?;
let manifest_entry = match store.get_raw_by_hash_entry(&manifest_artifact.sha256) { let manifest_bytes = match store.get_blob_bytes(&manifest_artifact.sha256) {
Ok(Some(entry)) => entry, Ok(Some(bytes)) => bytes,
Ok(None) => { Ok(None) => {
warnings.push( warnings.push(
Warning::new("manifest raw bytes missing for VCIR audit reconstruction") Warning::new("manifest raw bytes missing for VCIR audit reconstruction")
@ -2101,8 +2116,8 @@ fn reconstruct_snapshot_from_vcir(
if !seen.insert(uri.clone()) { if !seen.insert(uri.clone()) {
continue; continue;
} }
match store.get_raw_by_hash_entry(&artifact.sha256) { match store.get_blob_bytes(&artifact.sha256) {
Ok(Some(entry)) => files.push(PackFile::from_bytes_compute_sha256(uri, entry.bytes)), Ok(Some(bytes)) => files.push(PackFile::from_bytes_compute_sha256(uri, bytes)),
Ok(None) => warnings.push( Ok(None) => warnings.push(
Warning::new("related artifact raw bytes missing for VCIR audit reconstruction") Warning::new("related artifact raw bytes missing for VCIR audit reconstruction")
.with_context(uri), .with_context(uri),
@ -2133,7 +2148,7 @@ fn reconstruct_snapshot_from_vcir(
.validated_manifest_next_update .validated_manifest_next_update
.clone(), .clone(),
verified_at: vcir.last_successful_validation_time.clone(), verified_at: vcir.last_successful_validation_time.clone(),
manifest_bytes: manifest_entry.bytes, manifest_bytes,
files, files,
}) })
} }
@ -2399,14 +2414,14 @@ fn restore_children_from_vcir(
let mut children = Vec::new(); let mut children = Vec::new();
let mut audits = Vec::new(); let mut audits = Vec::new();
for child in &vcir.child_entries { for child in &vcir.child_entries {
match store.get_raw_by_hash_entry(&child.child_cert_hash) { match store.get_blob_bytes(&child.child_cert_hash) {
Ok(Some(entry)) => { Ok(Some(bytes)) => {
children.push(DiscoveredChildCaInstance { children.push(DiscoveredChildCaInstance {
handle: CaInstanceHandle { handle: CaInstanceHandle {
depth: 0, depth: 0,
tal_id: ca.tal_id.clone(), tal_id: ca.tal_id.clone(),
parent_manifest_rsync_uri: Some(ca.manifest_rsync_uri.clone()), parent_manifest_rsync_uri: Some(ca.manifest_rsync_uri.clone()),
ca_certificate_der: entry.bytes, ca_certificate_der: bytes,
ca_certificate_rsync_uri: Some(child.child_cert_rsync_uri.clone()), ca_certificate_rsync_uri: Some(child.child_cert_rsync_uri.clone()),
effective_ip_resources: child.child_effective_ip_resources.clone(), effective_ip_resources: child.child_effective_ip_resources.clone(),
effective_as_resources: child.child_effective_as_resources.clone(), effective_as_resources: child.child_effective_as_resources.clone(),
@ -2433,7 +2448,7 @@ fn restore_children_from_vcir(
} }
Ok(None) => { Ok(None) => {
warnings.push( warnings.push(
Warning::new("child certificate raw bytes missing for VCIR child restoration") Warning::new("child certificate bytes missing for VCIR child restoration")
.with_context(&child.child_cert_rsync_uri), .with_context(&child.child_cert_rsync_uri),
); );
audits.push(ObjectAuditEntry { audits.push(ObjectAuditEntry {
@ -2442,15 +2457,14 @@ fn restore_children_from_vcir(
kind: AuditObjectKind::Certificate, kind: AuditObjectKind::Certificate,
result: AuditObjectResult::Error, result: AuditObjectResult::Error,
detail: Some( detail: Some(
"child certificate raw bytes missing for VCIR child restoration" "child certificate bytes missing for VCIR child restoration".to_string(),
.to_string(),
), ),
}); });
} }
Err(e) => { Err(e) => {
warnings.push( warnings.push(
Warning::new(format!( Warning::new(format!(
"child certificate raw bytes load failed for VCIR child restoration: {e}" "child certificate bytes load failed for VCIR child restoration: {e}"
)) ))
.with_context(&child.child_cert_rsync_uri), .with_context(&child.child_cert_rsync_uri),
); );
@ -2460,7 +2474,7 @@ fn restore_children_from_vcir(
kind: AuditObjectKind::Certificate, kind: AuditObjectKind::Certificate,
result: AuditObjectResult::Error, result: AuditObjectResult::Error,
detail: Some(format!( detail: Some(format!(
"child certificate raw bytes load failed for VCIR child restoration: {e}" "child certificate bytes load failed for VCIR child restoration: {e}"
)), )),
}); });
} }
@ -5699,45 +5713,21 @@ authorityKeyIdentifier = keyid:always
let vcir = sample_vcir_for_projection(now, &child_cert_hash); let vcir = sample_vcir_for_projection(now, &child_cert_hash);
let store_dir = tempfile::tempdir().expect("store dir"); let store_dir = tempfile::tempdir().expect("store dir");
let store = RocksStore::open(store_dir.path()).expect("open rocksdb"); let main_db = store_dir.path().join("work-db");
let repo_bytes_db = store_dir.path().join("repo-bytes.db");
let store = RocksStore::open_with_external_repo_bytes(&main_db, &repo_bytes_db)
.expect("open rocksdb with external repo bytes");
store.put_vcir(&vcir).expect("put vcir"); store.put_vcir(&vcir).expect("put vcir");
for (bytes, uri, object_type) in [
(
b"manifest-bytes".to_vec(),
Some(vcir.current_manifest_rsync_uri.clone()),
Some("mft".to_string()),
),
(
b"current-crl-bytes".to_vec(),
Some(vcir.current_crl_rsync_uri.clone()),
Some("crl".to_string()),
),
(
g.child_ca_der.clone(),
Some(vcir.child_entries[0].child_cert_rsync_uri.clone()),
Some("cer".to_string()),
),
(
b"roa-bytes".to_vec(),
Some("rsync://example.test/repo/issuer/a.roa".to_string()),
Some("roa".to_string()),
),
(
b"aspa-bytes".to_vec(),
Some("rsync://example.test/repo/issuer/a.asa".to_string()),
Some("aspa".to_string()),
),
] {
let mut entry = RawByHashEntry::from_bytes(sha256_hex(&bytes), bytes);
if let Some(uri) = uri {
entry.origin_uris.push(uri);
}
entry.object_type = object_type;
entry.encoding = Some("der".to_string());
store store
.put_raw_by_hash_entry(&entry) .put_blob_bytes_batch(&[(child_cert_hash.clone(), g.child_ca_der.clone())])
.expect("put raw_by_hash"); .expect("put child cert repo bytes");
} assert!(
store
.get_raw_by_hash_entry(&child_cert_hash)
.expect("lookup child raw_by_hash")
.is_none(),
"child cert restoration should not require raw_by_hash entries"
);
let ca = CaInstanceHandle { let ca = CaInstanceHandle {
depth: 0, depth: 0,
@ -5780,8 +5770,36 @@ authorityKeyIdentifier = keyid:always
Some(&vcir.ccr_manifest_projection) Some(&vcir.ccr_manifest_projection)
); );
assert!( assert!(
projection.snapshot.is_some(), projection.snapshot.is_none(),
"expected reconstructed snapshot" "current-instance reuse should not reconstruct a byte-backed snapshot"
);
assert!(
!projection
.warnings
.iter()
.any(|warning| warning.message.contains("manifest failed fetch")),
"successful current-instance reuse should not duplicate the fresh fetch error"
);
assert!(
!projection
.warnings
.iter()
.any(|warning| warning.message.contains("using latest validated result")),
"successful current-instance reuse should be tracked by source, not warning"
);
assert!(
!projection
.warnings
.iter()
.any(|warning| warning.message.contains("manifest raw bytes missing")),
"successful current-instance reuse should not load repo bytes for audit reconstruction"
);
assert!(
!projection
.warnings
.iter()
.any(|warning| warning.message.contains("child certificate bytes missing")),
"child discovery restoration should read child certs from repo bytes"
); );
} }
@ -5836,6 +5854,65 @@ authorityKeyIdentifier = keyid:always
assert!(projection.discovered_children.is_empty()); assert!(projection.discovered_children.is_empty());
} }
#[test]
fn project_current_instance_vcir_keeps_real_fresh_validation_warning() {
let now = time::OffsetDateTime::now_utc();
let child_cert_hash = sha256_hex(b"child-cert");
let vcir = sample_vcir_for_projection(now, &child_cert_hash);
let store_dir = tempfile::tempdir().expect("store dir");
let main_db = store_dir.path().join("work-db");
let repo_bytes_db = store_dir.path().join("repo-bytes.db");
let store = RocksStore::open_with_external_repo_bytes(&main_db, &repo_bytes_db)
.expect("open rocksdb with external repo bytes");
store.put_vcir(&vcir).expect("put vcir");
store
.put_blob_bytes_batch(&[(child_cert_hash, b"child-cert".to_vec())])
.expect("put child cert repo bytes");
let ca = CaInstanceHandle {
depth: 0,
tal_id: "test-tal".to_string(),
parent_manifest_rsync_uri: None,
ca_certificate_der: Vec::new(),
ca_certificate_rsync_uri: None,
effective_ip_resources: None,
effective_as_resources: None,
rsync_base_uri: "rsync://example.test/repo/issuer/".to_string(),
manifest_rsync_uri: vcir.manifest_rsync_uri.clone(),
publication_point_rsync_uri: "rsync://example.test/repo/issuer/".to_string(),
rrdp_notification_uri: None,
};
let projection = project_current_instance_vcir_on_failed_fetch(
&store,
&ca,
&ManifestFreshError::HashMismatch {
rsync_uri: "rsync://example.test/repo/issuer/a.roa".to_string(),
},
now,
)
.expect("project vcir");
assert_eq!(
projection.source,
PublicationPointSource::VcirCurrentInstance
);
assert!(
projection
.warnings
.iter()
.any(|warning| { warning.message.contains("manifest file hash mismatch") })
);
assert!(
!projection
.warnings
.iter()
.any(|warning| warning.message.contains("using latest validated result")),
"successful current-instance reuse should not emit bookkeeping warnings"
);
}
#[test] #[test]
fn project_current_instance_vcir_returns_no_output_when_latest_result_missing() { fn project_current_instance_vcir_returns_no_output_when_latest_result_missing() {
let now = time::OffsetDateTime::now_utc(); let now = time::OffsetDateTime::now_utc();
@ -6509,6 +6586,74 @@ authorityKeyIdentifier = keyid:always
})); }));
} }
#[test]
fn reconstruct_snapshot_from_vcir_reads_repo_bytes_without_raw_entries() {
let now = time::OffsetDateTime::now_utc();
let child_cert_hash = sha256_hex(b"child-cert");
let vcir = sample_vcir_for_projection(now, &child_cert_hash);
let ca = CaInstanceHandle {
depth: 0,
tal_id: "test-tal".to_string(),
parent_manifest_rsync_uri: None,
ca_certificate_der: Vec::new(),
ca_certificate_rsync_uri: None,
effective_ip_resources: None,
effective_as_resources: None,
rsync_base_uri: "rsync://example.test/repo/issuer/".to_string(),
manifest_rsync_uri: vcir.manifest_rsync_uri.clone(),
publication_point_rsync_uri: "rsync://example.test/repo/issuer/".to_string(),
rrdp_notification_uri: None,
};
let store_dir = tempfile::tempdir().expect("store dir");
let main_db = store_dir.path().join("work-db");
let repo_bytes_db = store_dir.path().join("repo-bytes.db");
let store = RocksStore::open_with_external_repo_bytes(&main_db, &repo_bytes_db)
.expect("open rocksdb with external repo bytes");
let repo_blobs = [
b"manifest-bytes".to_vec(),
b"current-crl-bytes".to_vec(),
b"child-cert".to_vec(),
b"roa-bytes".to_vec(),
b"aspa-bytes".to_vec(),
]
.into_iter()
.map(|bytes| (sha256_hex(&bytes), bytes))
.collect::<Vec<_>>();
store
.put_blob_bytes_batch(&repo_blobs)
.expect("put external repo bytes");
let manifest_hash = vcir
.related_artifacts
.iter()
.find(|artifact| artifact.artifact_role == VcirArtifactRole::Manifest)
.expect("manifest artifact")
.sha256
.clone();
assert!(
store
.get_raw_by_hash_entry(&manifest_hash)
.expect("raw manifest lookup")
.is_none(),
"repo object bytes must not require raw_by_hash entries"
);
let mut warnings = Vec::new();
let pack = reconstruct_snapshot_from_vcir(&store, &ca, &vcir, &mut warnings)
.expect("reconstruct pack from external repo bytes");
assert_eq!(pack.manifest_bytes, b"manifest-bytes".to_vec());
assert_eq!(pack.files.len(), 4, "crl + child cert + roa + aspa");
assert!(
warnings.iter().all(|warning| {
!warning
.message
.contains("raw bytes missing for VCIR audit reconstruction")
}),
"external repo bytes should satisfy VCIR audit reconstruction without raw warnings"
);
}
#[test] #[test]
fn runner_dedup_paths_execute_with_timing_enabled() { fn runner_dedup_paths_execute_with_timing_enabled() {
let fixture_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) let fixture_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))

View File

@ -318,9 +318,11 @@ fn apnic_root_repo_sync_failure_reuses_current_instance_vcir() {
assert_eq!(pp.source, PublicationPointSource::VcirCurrentInstance); assert_eq!(pp.source, PublicationPointSource::VcirCurrentInstance);
assert!( assert!(
pp.warnings.iter().any(|w| w pp.warnings.iter().all(|w| {
.message !w.message
.contains("using latest validated result for current CA instance")), .contains("using latest validated result for current CA instance")
"expected current-instance VCIR reuse warning" && !w.message.contains("manifest failed fetch")
}),
"successful current-instance VCIR reuse should not emit fallback bookkeeping warnings"
); );
} }

View File

@ -298,10 +298,18 @@ fn manifest_hash_mismatch_reuses_current_instance_vcir_when_enabled() {
.expect("second run reuses current-instance VCIR"); .expect("second run reuses current-instance VCIR");
assert_eq!(second.source, PublicationPointSource::VcirCurrentInstance); assert_eq!(second.source, PublicationPointSource::VcirCurrentInstance);
assert!( assert!(
second.warnings.iter().any(|w| w second
.message .warnings
.contains("using latest validated result for current CA instance")), .iter()
"expected current-instance VCIR reuse warning" .any(|w| w.message.contains("manifest file hash mismatch")),
"expected warning for real fresh manifest validation failure"
);
assert!(
second.warnings.iter().all(|w| {
!w.message
.contains("using latest validated result for current CA instance")
}),
"successful current-instance VCIR reuse should not emit fallback bookkeeping warning"
); );
} }

View File

@ -247,10 +247,12 @@ fn repo_sync_failed_can_reuse_current_instance_vcir_when_present() {
.expect("repo sync failure should reuse current-instance VCIR"); .expect("repo sync failure should reuse current-instance VCIR");
assert_eq!(second.source, PublicationPointSource::VcirCurrentInstance); assert_eq!(second.source, PublicationPointSource::VcirCurrentInstance);
assert!( assert!(
second.warnings.iter().any(|w| w second.warnings.iter().all(|w| {
.message !w.message
.contains("using latest validated result for current CA instance")), .contains("using latest validated result for current CA instance")
"expected current-instance VCIR reuse warning" && !w.message.contains("manifest failed fetch")
}),
"successful current-instance VCIR reuse should not emit fallback bookkeeping warnings"
); );
} }