20260618 优化PP cache hit child恢复

This commit is contained in:
yuyr 2026-06-18 18:05:27 +08:00
parent d4d227ce60
commit bd266ef2a5
2 changed files with 350 additions and 87 deletions

View File

@ -25,11 +25,11 @@ use crate::replay::archive::ReplayArchiveIndex;
use crate::replay::delta_archive::ReplayDeltaArchiveIndex;
use crate::report::{RfcRef, Warning};
use crate::storage::{
PackFile, PackTime, PublicationPointCacheProjection, RawByHashEntry, RocksStore,
ValidatedCaInstanceResult, VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus,
VcirAuditSummary, VcirCcrManifestProjection, VcirChildEntry, VcirInstanceGate, VcirLocalOutput,
VcirLocalOutputPayload, VcirOutputType, VcirRelatedArtifact, VcirReplaceTimingBreakdown,
VcirSourceObjectType, VcirSummary,
PackFile, PackTime, PublicationPointCacheChild, PublicationPointCacheProjection,
RawByHashEntry, RocksStore, ValidatedCaInstanceResult, VcirArtifactKind, VcirArtifactRole,
VcirArtifactValidationStatus, VcirAuditSummary, VcirCcrManifestProjection, VcirChildEntry,
VcirInstanceGate, VcirLocalOutput, VcirLocalOutputPayload, VcirOutputType, VcirRelatedArtifact,
VcirReplaceTimingBreakdown, VcirSourceObjectType, VcirSummary,
};
use crate::sync::repo::{
sync_publication_point, sync_publication_point_replay, sync_publication_point_replay_delta,
@ -64,6 +64,9 @@ use x509_parser::x509::SubjectPublicKeyInfo;
use vcir_der::encode_access_description_der_for_vcir_ccr_projection;
const PUBLICATION_POINT_CACHE_CHILD_RESTORE_PARALLEL_MIN_CHILDREN: usize = 256;
const PUBLICATION_POINT_CACHE_CHILD_RESTORE_MAX_WORKERS: usize = 16;
fn sha256_hex_to_32(hex_value: &str) -> [u8; 32] {
let mut out = [0u8; 32];
hex::decode_to_slice(hex_value, &mut out).expect("internal sha256 hex should decode");
@ -538,12 +541,15 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
build_objects_started,
);
let restore_children_started = std::time::Instant::now();
let child_restore_workers = self.publication_point_cache_child_restore_worker_count();
let (discovered_children, child_audits) = restore_children_from_publication_point_cache(
self.store,
ca,
&projection,
self.validation_time,
&mut warnings,
child_restore_workers,
self.timing.as_ref(),
);
let restore_children_ms = self.record_publication_point_cache_phase_ms(
"publication_point_cache_restore_children_total",
@ -611,6 +617,7 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
"to_vcir_ms": to_vcir_ms,
"build_objects_ms": build_objects_ms,
"restore_children_ms": restore_children_ms,
"restore_children_workers": child_restore_workers,
"ccr_append_ms": ccr_append_ms,
"audit_build_ms": audit_build_ms,
"total_ms": total_ms,
@ -642,6 +649,14 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
nanos / 1_000_000
}
fn publication_point_cache_child_restore_worker_count(&self) -> usize {
self.parallel_phase2_config
.as_ref()
.map(|config| config.object_workers)
.unwrap_or(1)
.clamp(1, PUBLICATION_POINT_CACHE_CHILD_RESTORE_MAX_WORKERS)
}
pub(crate) fn ccr_accumulator_snapshot(&self) -> Option<CcrAccumulator> {
self.ccr_accumulator
.as_ref()
@ -3282,48 +3297,242 @@ fn restore_children_from_publication_point_cache(
projection: &PublicationPointCacheProjection,
validation_time: time::OffsetDateTime,
warnings: &mut Vec<Warning>,
worker_count: usize,
timing: Option<&TimingHandle>,
) -> (Vec<DiscoveredChildCaInstance>, Vec<ObjectAuditEntry>) {
let mut children = Vec::new();
let mut audits = Vec::new();
for child in &projection.children {
let worker_count = worker_count
.clamp(1, PUBLICATION_POINT_CACHE_CHILD_RESTORE_MAX_WORKERS)
.min(projection.children.len().max(1));
let outcomes = if worker_count > 1
&& projection.children.len() >= PUBLICATION_POINT_CACHE_CHILD_RESTORE_PARALLEL_MIN_CHILDREN
{
if let Some(timing) = timing {
timing.record_count(
"publication_point_cache_restore_children_parallel_publication_points",
1,
);
timing.record_count(
"publication_point_cache_restore_children_parallel_children",
projection.children.len() as u64,
);
timing.record_count(
"publication_point_cache_restore_children_workers_total",
worker_count as u64,
);
}
restore_publication_point_cache_children_parallel(
store,
ca,
&projection.children,
validation_time,
worker_count,
)
} else {
if let Some(timing) = timing {
timing.record_count(
"publication_point_cache_restore_children_batch_publication_points",
1,
);
timing.record_count(
"publication_point_cache_restore_children_batch_children",
projection.children.len() as u64,
);
}
restore_publication_point_cache_children_chunk(
store,
ca,
&projection.children,
validation_time,
)
};
collect_publication_point_cache_child_restore_outcomes(outcomes, warnings)
}
fn restore_publication_point_cache_children_parallel(
store: &RocksStore,
ca: &CaInstanceHandle,
children: &[PublicationPointCacheChild],
validation_time: time::OffsetDateTime,
worker_count: usize,
) -> Vec<PublicationPointCacheChildRestoreOutcome> {
let chunk_size = children.len().div_ceil(worker_count).max(1);
let mut chunk_results = Vec::new();
std::thread::scope(|scope| {
let mut handles = Vec::new();
for chunk in children.chunks(chunk_size) {
handles.push(scope.spawn(move || {
restore_publication_point_cache_children_chunk(store, ca, chunk, validation_time)
}));
}
for handle in handles {
chunk_results.extend(
handle
.join()
.expect("publication-point cache child restore worker panicked"),
);
}
});
chunk_results
}
fn restore_publication_point_cache_children_chunk(
store: &RocksStore,
ca: &CaInstanceHandle,
children: &[PublicationPointCacheChild],
validation_time: time::OffsetDateTime,
) -> Vec<PublicationPointCacheChildRestoreOutcome> {
let mut outcomes = vec![None; children.len()];
let mut valid_positions = Vec::new();
let mut hashes = Vec::new();
for (position, child) in children.iter().enumerate() {
let effective_not_before =
match parse_snapshot_time_value(&child.child_effective_not_before) {
Ok(value) => value,
Err(e) => {
warnings.push(
outcomes[position] = Some(PublicationPointCacheChildRestoreOutcome {
child: None,
audit: None,
warning: Some(
Warning::new(format!(
"publication-point cache child has invalid effective notBefore: {e}"
))
.with_context(&child.child_cert_rsync_uri),
);
),
});
continue;
}
};
let effective_until = match parse_snapshot_time_value(&child.child_effective_until) {
Ok(value) => value,
Err(e) => {
warnings.push(
outcomes[position] = Some(PublicationPointCacheChildRestoreOutcome {
child: None,
audit: None,
warning: Some(
Warning::new(format!(
"publication-point cache child has invalid effective until: {e}"
))
.with_context(&child.child_cert_rsync_uri),
);
),
});
continue;
}
};
if validation_time < effective_not_before || validation_time > effective_until {
audits.push(ObjectAuditEntry {
rsync_uri: child.child_cert_rsync_uri.clone(),
sha256_hex: child.child_cert_hash.clone(),
kind: AuditObjectKind::Certificate,
result: AuditObjectResult::Skipped,
detail: Some("skipped: publication-point cache child expired".to_string()),
outcomes[position] = Some(PublicationPointCacheChildRestoreOutcome {
child: None,
warning: None,
audit: Some(publication_point_cache_child_audit(
child,
AuditObjectResult::Skipped,
Some("skipped: publication-point cache child expired".to_string()),
)),
});
continue;
}
match store.get_blob_bytes(&child.child_cert_hash) {
Ok(Some(bytes)) => {
children.push(DiscoveredChildCaInstance {
valid_positions.push(position);
hashes.push(child.child_cert_hash.clone());
}
match store.get_blob_bytes_batch(&hashes) {
Ok(bytes_by_position) => {
for (position, maybe_bytes) in valid_positions.into_iter().zip(bytes_by_position) {
let child = &children[position];
outcomes[position] = Some(match maybe_bytes {
Some(bytes) => PublicationPointCacheChildRestoreOutcome {
child: Some(publication_point_cache_discovered_child(ca, child, bytes)),
warning: None,
audit: Some(publication_point_cache_child_audit(
child,
AuditObjectResult::Ok,
Some(
"restored child CA instance from publication-point cache"
.to_string(),
),
)),
},
None => PublicationPointCacheChildRestoreOutcome {
child: None,
warning: Some(
Warning::new(
"child certificate bytes missing for publication-point cache restoration",
)
.with_context(&child.child_cert_rsync_uri),
),
audit: Some(publication_point_cache_child_audit(
child,
AuditObjectResult::Error,
Some(
"child certificate bytes missing for publication-point cache restoration"
.to_string(),
),
)),
},
});
}
}
Err(e) => {
for position in valid_positions {
let child = &children[position];
outcomes[position] = Some(PublicationPointCacheChildRestoreOutcome {
child: None,
warning: Some(
Warning::new(format!(
"child certificate bytes load failed for publication-point cache restoration: {e}"
))
.with_context(&child.child_cert_rsync_uri),
),
audit: Some(publication_point_cache_child_audit(
child,
AuditObjectResult::Error,
Some(format!(
"child certificate bytes load failed for publication-point cache restoration: {e}"
)),
)),
});
}
}
}
outcomes
.into_iter()
.flatten()
.collect::<Vec<PublicationPointCacheChildRestoreOutcome>>()
}
fn collect_publication_point_cache_child_restore_outcomes(
outcomes: Vec<PublicationPointCacheChildRestoreOutcome>,
warnings: &mut Vec<Warning>,
) -> (Vec<DiscoveredChildCaInstance>, Vec<ObjectAuditEntry>) {
let mut children = Vec::new();
let mut audits = Vec::new();
for outcome in outcomes {
if let Some(warning) = outcome.warning {
warnings.push(warning);
}
if let Some(audit) = outcome.audit {
audits.push(audit);
}
if let Some(child) = outcome.child {
children.push(child);
}
}
(children, audits)
}
#[derive(Clone)]
struct PublicationPointCacheChildRestoreOutcome {
child: Option<DiscoveredChildCaInstance>,
audit: Option<ObjectAuditEntry>,
warning: Option<Warning>,
}
fn publication_point_cache_discovered_child(
ca: &CaInstanceHandle,
child: &PublicationPointCacheChild,
bytes: Vec<u8>,
) -> DiscoveredChildCaInstance {
DiscoveredChildCaInstance {
handle: CaInstanceHandle {
depth: 0,
tal_id: ca.tal_id.clone(),
@ -3334,9 +3543,7 @@ fn restore_children_from_publication_point_cache(
effective_as_resources: child.child_effective_as_resources.clone(),
rsync_base_uri: child.child_rsync_base_uri.clone(),
manifest_rsync_uri: child.child_manifest_rsync_uri.clone(),
publication_point_rsync_uri: child
.child_publication_point_rsync_uri
.clone(),
publication_point_rsync_uri: child.child_publication_point_rsync_uri.clone(),
rrdp_notification_uri: child.child_rrdp_notification_uri.clone(),
},
discovered_from: crate::audit::DiscoveredFrom {
@ -3344,55 +3551,21 @@ fn restore_children_from_publication_point_cache(
child_ca_certificate_rsync_uri: child.child_cert_rsync_uri.clone(),
child_ca_certificate_sha256_hex: child.child_cert_hash.clone(),
},
});
audits.push(ObjectAuditEntry {
}
}
fn publication_point_cache_child_audit(
child: &PublicationPointCacheChild,
result: AuditObjectResult,
detail: Option<String>,
) -> ObjectAuditEntry {
ObjectAuditEntry {
rsync_uri: child.child_cert_rsync_uri.clone(),
sha256_hex: child.child_cert_hash.clone(),
kind: AuditObjectKind::Certificate,
result: AuditObjectResult::Ok,
detail: Some(
"restored child CA instance from publication-point cache".to_string(),
),
});
result,
detail,
}
Ok(None) => {
warnings.push(
Warning::new(
"child certificate bytes missing for publication-point cache restoration",
)
.with_context(&child.child_cert_rsync_uri),
);
audits.push(ObjectAuditEntry {
rsync_uri: child.child_cert_rsync_uri.clone(),
sha256_hex: child.child_cert_hash.clone(),
kind: AuditObjectKind::Certificate,
result: AuditObjectResult::Error,
detail: Some(
"child certificate bytes missing for publication-point cache restoration"
.to_string(),
),
});
}
Err(e) => {
warnings.push(
Warning::new(format!(
"child certificate bytes load failed for publication-point cache restoration: {e}"
))
.with_context(&child.child_cert_rsync_uri),
);
audits.push(ObjectAuditEntry {
rsync_uri: child.child_cert_rsync_uri.clone(),
sha256_hex: child.child_cert_hash.clone(),
kind: AuditObjectKind::Certificate,
result: AuditObjectResult::Error,
detail: Some(format!(
"child certificate bytes load failed for publication-point cache restoration: {e}"
)),
});
}
}
}
(children, audits)
}
fn persist_vcir_for_fresh_result_with_timing(

View File

@ -2037,6 +2037,96 @@ fn runner_publication_point_cache_reuses_projection_outputs_children_and_ccr() {
assert!(phase_keys.contains("publication_point_cache_build_objects_total"));
}
#[test]
fn publication_point_cache_restore_children_parallel_keeps_order_and_audit() {
let store_dir = tempfile::tempdir().expect("store dir");
let store = RocksStore::open(store_dir.path()).expect("open rocksdb");
let policy = Policy::default();
let validation_time = time::OffsetDateTime::UNIX_EPOCH + time::Duration::minutes(1);
let ca = publication_point_cache_fixture_ca();
seed_publication_point_cache_projection(&store, &policy, &ca, validation_time);
let mut projection = store
.get_publication_point_cache_projection(&ca.manifest_rsync_uri)
.expect("load projection")
.expect("projection");
let template = projection.children[0].clone();
let mut blobs = Vec::new();
let mut children = Vec::new();
for index in 0..300 {
let bytes = format!("child-cert-{index}").into_bytes();
let child_hash = sha256_hex(&bytes);
let mut child = template.clone();
child.child_cert_hash = child_hash.clone();
child.child_cert_rsync_uri = format!("rsync://example.test/repo/issuer/child-{index}.cer");
child.child_manifest_rsync_uri =
format!("rsync://example.test/repo/child-{index}/child.mft");
child.child_publication_point_rsync_uri =
format!("rsync://example.test/repo/child-{index}/");
child.child_rsync_base_uri = child.child_publication_point_rsync_uri.clone();
blobs.push((child_hash, bytes));
children.push(child);
}
projection.children = children;
store.put_blob_bytes_batch(&blobs).expect("put child blobs");
let timing = crate::analysis::timing::TimingHandle::new(crate::analysis::timing::TimingMeta {
recorded_at_utc_rfc3339: "2026-01-01T00:00:00Z".to_string(),
validation_time_utc_rfc3339: "2026-01-01T00:00:00Z".to_string(),
tal_url: None,
db_path: None,
});
let mut warnings = Vec::new();
let (restored_children, audits) = restore_children_from_publication_point_cache(
&store,
&ca,
&projection,
validation_time,
&mut warnings,
4,
Some(&timing),
);
assert!(warnings.is_empty());
assert_eq!(restored_children.len(), 300);
assert_eq!(audits.len(), 300);
assert_eq!(
restored_children[0].handle.ca_certificate_der,
b"child-cert-0"
);
assert_eq!(
restored_children[299].handle.ca_certificate_der,
b"child-cert-299"
);
assert_eq!(
restored_children[0].handle.manifest_rsync_uri,
"rsync://example.test/repo/child-0/child.mft"
);
assert!(
audits
.iter()
.all(|audit| audit.result == AuditObjectResult::Ok)
);
let counts = timing.counts_snapshot();
assert_eq!(
counts
.get("publication_point_cache_restore_children_parallel_publication_points")
.copied(),
Some(1)
);
assert_eq!(
counts
.get("publication_point_cache_restore_children_parallel_children")
.copied(),
Some(300)
);
assert_eq!(
counts
.get("publication_point_cache_restore_children_workers_total")
.copied(),
Some(4)
);
}
#[test]
fn runner_publication_point_cache_blocks_parent_policy_and_output_time_mismatch() {
let store_dir = tempfile::tempdir().expect("store dir");