20260501 修复CurrentRepoIndex完整性并优化replay guard

This commit is contained in:
yuyr 2026-05-03 08:26:18 +08:00
parent ad61caf271
commit b3b44d50c6
10 changed files with 1230 additions and 61 deletions

View File

@@ -4,8 +4,9 @@ use std::path::{Path, PathBuf};
use rocksdb::{DB, IteratorMode, Options};
use rpki::storage::{
ALL_COLUMN_FAMILY_NAMES, CF_AUDIT_RULE_INDEX, CF_RAW_BY_HASH, CF_REPOSITORY_VIEW,
CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, CF_VCIR, column_family_descriptors,
ALL_COLUMN_FAMILY_NAMES, CF_AUDIT_RULE_INDEX, CF_MANIFEST_REPLAY_META, CF_RAW_BY_HASH,
CF_REPOSITORY_VIEW, CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, CF_VCIR,
column_family_descriptors,
};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -182,7 +183,7 @@ fn collect_db_file_stats(db_path: &Path) -> Result<DbFileStats, Box<dyn std::err
fn cf_group(cf_name: &str) -> CfGroup {
match cf_name {
CF_REPOSITORY_VIEW | CF_RAW_BY_HASH => CfGroup::CurrentRepositoryView,
CF_VCIR | CF_AUDIT_RULE_INDEX => CfGroup::CurrentValidationState,
CF_VCIR | CF_MANIFEST_REPLAY_META | CF_AUDIT_RULE_INDEX => CfGroup::CurrentValidationState,
CF_RRDP_SOURCE | CF_RRDP_SOURCE_MEMBER | CF_RRDP_URI_OWNER => CfGroup::CurrentRrdpState,
_ => CfGroup::LegacyCompatibility,
}
@@ -370,6 +371,10 @@ mod tests {
assert_eq!(cf_group(CF_REPOSITORY_VIEW), CfGroup::CurrentRepositoryView);
assert_eq!(cf_group(CF_RAW_BY_HASH), CfGroup::CurrentRepositoryView);
assert_eq!(cf_group(CF_VCIR), CfGroup::CurrentValidationState);
assert_eq!(
cf_group(CF_MANIFEST_REPLAY_META),
CfGroup::CurrentValidationState
);
assert_eq!(
cf_group(CF_AUDIT_RULE_INDEX),
CfGroup::CurrentValidationState

View File

@@ -178,6 +178,7 @@ pub fn build_cir_from_run_multi(
tal_bytes: binding.trust_anchor.tal.raw.clone(),
});
}
tals.sort_by(|a, b| a.tal_uri.cmp(&b.tal_uri));
let cir = CanonicalInputRepresentation {
version: CIR_VERSION_V1,
@@ -700,6 +701,43 @@ mod tests {
);
}
#[test]
fn build_cir_from_run_multi_sorts_tals_by_tal_uri() {
let td = tempfile::tempdir().unwrap();
let store = RocksStore::open(td.path()).unwrap();
let apnic = sample_trust_anchor();
let arin = sample_arin_trust_anchor();
let cir = build_cir_from_run_multi(
&store,
&[
CirTalBinding {
trust_anchor: &apnic,
tal_uri: "https://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer",
},
CirTalBinding {
trust_anchor: &arin,
tal_uri: "https://rrdp.arin.net/arin-rpki-ta.cer",
},
],
sample_time(),
&[],
Some(&[]),
)
.expect("build cir with unsorted input bindings");
assert_eq!(
cir.tals
.iter()
.map(|tal| tal.tal_uri.as_str())
.collect::<Vec<_>>(),
vec![
"https://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer",
"https://rrdp.arin.net/arin-rpki-ta.cer",
]
);
}
#[test]
fn build_cir_from_run_multi_rejects_invalid_tal_uri_and_missing_rsync_ta_uri() {
let td = tempfile::tempdir().unwrap();

View File

@@ -154,6 +154,8 @@ Options:
Phase 2 object worker count (default: 8)
--parallel-phase2-worker-queue-capacity <n>
Phase 2 per-worker object queue capacity (default: 256)
--parallel-phase2-ready-batch-size <n>
Phase 2 ready publication points processed per scheduler turn (default: 256)
--rsync-local-dir <path> Use LocalDirRsyncFetcher rooted at this directory (offline tests)
--disable-rrdp Disable RRDP and synchronize only via rsync
@@ -282,6 +284,15 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-worker-queue-capacity: {v}"))?;
}
"--parallel-phase2-ready-batch-size" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-ready-batch-size requires a value")?;
parallel_phase2_cfg.ready_batch_size = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-ready-batch-size: {v}"))?;
}
"--db" => {
i += 1;
let v = argv.get(i).ok_or("--db requires a value")?;
@@ -496,6 +507,12 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
usage()
));
}
if parallel_phase2_cfg.ready_batch_size == 0 {
return Err(format!(
"--parallel-phase2-ready-batch-size must be > 0\n\n{}",
usage()
));
}
if !tal_urls.is_empty() && !ta_paths.is_empty() {
return Err(format!(
"--ta-path cannot be used with --tal-url mode\n\n{}",
@@ -2172,13 +2189,31 @@ mod tests {
"3".to_string(),
"--parallel-phase2-worker-queue-capacity".to_string(),
"17".to_string(),
"--parallel-phase2-ready-batch-size".to_string(),
"31".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert_eq!(args.parallel_phase2_config.object_workers, 3);
assert_eq!(args.parallel_phase2_config.worker_queue_capacity, 17);
assert_eq!(args.parallel_phase2_config.ready_batch_size, 31);
assert_eq!(args.parallel_phase1_config, ParallelPhase1Config::default());
}
#[test]
fn parse_rejects_zero_phase2_ready_batch_size() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-url".to_string(),
"https://example.test/root.tal".to_string(),
"--parallel-phase2-ready-batch-size".to_string(),
"0".to_string(),
];
let err = parse_args(&argv).expect_err("zero ready batch must fail");
assert!(err.contains("--parallel-phase2-ready-batch-size"), "{err}");
}
#[test]
fn parse_rejects_removed_parallel_enable_flags() {
let argv = vec![

View File

@@ -9,6 +9,7 @@ pub struct ParallelPhase1Config {
pub struct ParallelPhase2Config {
pub object_workers: usize,
pub worker_queue_capacity: usize,
pub ready_batch_size: usize,
}
impl Default for ParallelPhase2Config {
@@ -16,6 +17,7 @@ impl Default for ParallelPhase2Config {
Self {
object_workers: 8,
worker_queue_capacity: 256,
ready_batch_size: 256,
}
}
}
@@ -47,5 +49,6 @@ mod tests {
let cfg = ParallelPhase2Config::default();
assert!(cfg.object_workers > 0);
assert!(cfg.worker_queue_capacity > 0);
assert!(cfg.ready_batch_size > 0);
}
}

View File

@@ -16,6 +16,13 @@ pub fn slow_threshold_secs() -> f64 {
.unwrap_or(30.0)
}
pub fn stage_fresh_slow_threshold_ms() -> u64 {
std::env::var("RPKI_PROGRESS_STAGE_FRESH_SLOW_MS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(1_000)
}
pub fn emit(kind: &str, payload: Value) {
if !progress_enabled() {
return;

View File

@@ -16,6 +16,7 @@ pub const CF_REPOSITORY_VIEW: &str = "repository_view";
pub const CF_RAW_BY_HASH: &str = "raw_by_hash";
pub const CF_RAW_BLOB: &str = "raw_blob";
pub const CF_VCIR: &str = "vcir";
pub const CF_MANIFEST_REPLAY_META: &str = "manifest_replay_meta";
pub const CF_AUDIT_RULE_INDEX: &str = "audit_rule_index";
pub const CF_RRDP_SOURCE: &str = "rrdp_source";
pub const CF_RRDP_SOURCE_MEMBER: &str = "rrdp_source_member";
@@ -26,6 +27,7 @@ pub const ALL_COLUMN_FAMILY_NAMES: &[&str] = &[
CF_RAW_BY_HASH,
CF_RAW_BLOB,
CF_VCIR,
CF_MANIFEST_REPLAY_META,
CF_AUDIT_RULE_INDEX,
CF_RRDP_SOURCE,
CF_RRDP_SOURCE_MEMBER,
@@ -36,6 +38,7 @@ const REPOSITORY_VIEW_KEY_PREFIX: &str = "repo_view:";
const RAW_BY_HASH_KEY_PREFIX: &str = "rawbyhash:";
const RAW_BLOB_KEY_PREFIX: &str = "rawblob:";
const VCIR_KEY_PREFIX: &str = "vcir:";
const MANIFEST_REPLAY_META_KEY_PREFIX: &str = "manifest_replay_meta:";
const AUDIT_ROA_RULE_KEY_PREFIX: &str = "audit:roa_rule:";
const AUDIT_ASPA_RULE_KEY_PREFIX: &str = "audit:aspa_rule:";
const AUDIT_ROUTER_KEY_RULE_KEY_PREFIX: &str = "audit:router_key_rule:";
@@ -290,6 +293,57 @@ impl ValidatedManifestMeta {
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestReplayMeta {
pub manifest_rsync_uri: String,
pub manifest_number_be: Vec<u8>,
pub manifest_this_update: PackTime,
pub manifest_sha256: Vec<u8>,
pub updated_at_validation_time: PackTime,
}
impl ManifestReplayMeta {
pub fn from_vcir(vcir: &ValidatedCaInstanceResult) -> Self {
Self {
manifest_rsync_uri: vcir.manifest_rsync_uri.clone(),
manifest_number_be: vcir
.validated_manifest_meta
.validated_manifest_number
.clone(),
manifest_this_update: vcir
.validated_manifest_meta
.validated_manifest_this_update
.clone(),
manifest_sha256: vcir.ccr_manifest_projection.manifest_sha256.clone(),
updated_at_validation_time: vcir.last_successful_validation_time.clone(),
}
}
pub fn validate_internal(&self) -> StorageResult<()> {
validate_non_empty(
"manifest_replay_meta.manifest_rsync_uri",
&self.manifest_rsync_uri,
)?;
validate_manifest_number_be(
"manifest_replay_meta.manifest_number_be",
&self.manifest_number_be,
)?;
parse_time(
"manifest_replay_meta.manifest_this_update",
&self.manifest_this_update,
)?;
validate_sha256_digest_bytes(
"manifest_replay_meta.manifest_sha256",
&self.manifest_sha256,
)?;
parse_time(
"manifest_replay_meta.updated_at_validation_time",
&self.updated_at_validation_time,
)?;
Ok(())
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct VcirCcrManifestProjection {
pub manifest_rsync_uri: String,
@@ -1270,13 +1324,18 @@ impl RocksStore {
pub fn put_vcir(&self, vcir: &ValidatedCaInstanceResult) -> StorageResult<()> {
vcir.validate_internal()?;
let cf = self.cf(CF_VCIR)?;
let vcir_cf = self.cf(CF_VCIR)?;
let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?;
let replay_meta = ManifestReplayMeta::from_vcir(vcir);
replay_meta.validate_internal()?;
let mut batch = WriteBatch::default();
let key = vcir_key(&vcir.manifest_rsync_uri);
let value = encode_cbor(vcir, "vcir")?;
self.db
.put_cf(cf, key.as_bytes(), value)
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
batch.put_cf(vcir_cf, key.as_bytes(), value);
let replay_key = manifest_replay_meta_key(&replay_meta.manifest_rsync_uri);
let replay_value = encode_cbor(&replay_meta, "manifest_replay_meta")?;
batch.put_cf(replay_cf, replay_key.as_bytes(), replay_value);
self.write_batch(batch)
}
pub fn replace_vcir_and_audit_rule_indexes(
@@ -1286,12 +1345,18 @@
) -> StorageResult<()> {
vcir.validate_internal()?;
let vcir_cf = self.cf(CF_VCIR)?;
let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?;
let audit_cf = self.cf(CF_AUDIT_RULE_INDEX)?;
let mut batch = WriteBatch::default();
let vcir_key = vcir_key(&vcir.manifest_rsync_uri);
let vcir_value = encode_cbor(vcir, "vcir")?;
batch.put_cf(vcir_cf, vcir_key.as_bytes(), vcir_value);
let replay_meta = ManifestReplayMeta::from_vcir(vcir);
replay_meta.validate_internal()?;
let replay_key = manifest_replay_meta_key(&replay_meta.manifest_rsync_uri);
let replay_value = encode_cbor(&replay_meta, "manifest_replay_meta")?;
batch.put_cf(replay_cf, replay_key.as_bytes(), replay_value);
if let Some(previous) = previous {
for output in &previous.local_outputs {
@@ -1343,6 +1408,24 @@
Ok(Some(vcir))
}
pub fn get_manifest_replay_meta(
&self,
manifest_rsync_uri: &str,
) -> StorageResult<Option<ManifestReplayMeta>> {
let cf = self.cf(CF_MANIFEST_REPLAY_META)?;
let key = manifest_replay_meta_key(manifest_rsync_uri);
let Some(bytes) = self
.db
.get_cf(cf, key.as_bytes())
.map_err(|e| StorageError::RocksDb(e.to_string()))?
else {
return Ok(None);
};
let meta = decode_cbor::<ManifestReplayMeta>(&bytes, "manifest_replay_meta")?;
meta.validate_internal()?;
Ok(Some(meta))
}
pub fn list_vcirs(&self) -> StorageResult<Vec<ValidatedCaInstanceResult>> {
let cf = self.cf(CF_VCIR)?;
let mode = IteratorMode::Start;
@@ -1357,12 +1440,14 @@
}
pub fn delete_vcir(&self, manifest_rsync_uri: &str) -> StorageResult<()> {
let cf = self.cf(CF_VCIR)?;
let vcir_cf = self.cf(CF_VCIR)?;
let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?;
let mut batch = WriteBatch::default();
let key = vcir_key(manifest_rsync_uri);
self.db
.delete_cf(cf, key.as_bytes())
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
batch.delete_cf(vcir_cf, key.as_bytes());
let replay_key = manifest_replay_meta_key(manifest_rsync_uri);
batch.delete_cf(replay_cf, replay_key.as_bytes());
self.write_batch(batch)
}
pub fn put_audit_rule_index_entry(&self, entry: &AuditRuleIndexEntry) -> StorageResult<()> {
@@ -1622,6 +1707,10 @@ fn vcir_key(manifest_rsync_uri: &str) -> String {
format!("{VCIR_KEY_PREFIX}{manifest_rsync_uri}")
}
fn manifest_replay_meta_key(manifest_rsync_uri: &str) -> String {
format!("{MANIFEST_REPLAY_META_KEY_PREFIX}{manifest_rsync_uri}")
}
fn audit_rule_kind_for_output_type(output_type: VcirOutputType) -> Option<AuditRuleKind> {
match output_type {
VcirOutputType::Vrp => Some(AuditRuleKind::Roa),
@@ -2704,6 +2793,26 @@ mod tests {
.expect("get vcir")
.expect("vcir exists");
assert_eq!(got, vcir);
let replay_meta = store
.get_manifest_replay_meta(&vcir.manifest_rsync_uri)
.expect("get manifest replay meta")
.expect("manifest replay meta exists");
assert_eq!(
replay_meta,
ManifestReplayMeta {
manifest_rsync_uri: vcir.manifest_rsync_uri.clone(),
manifest_number_be: vcir
.validated_manifest_meta
.validated_manifest_number
.clone(),
manifest_this_update: vcir
.validated_manifest_meta
.validated_manifest_this_update
.clone(),
manifest_sha256: vcir.ccr_manifest_projection.manifest_sha256.clone(),
updated_at_validation_time: vcir.last_successful_validation_time.clone(),
}
);
let mut invalid = sample_vcir("rsync://example.test/repo/invalid.mft");
invalid.summary.local_vrp_count = 9;
@@ -2728,6 +2837,37 @@
.expect("get deleted vcir")
.is_none()
);
assert!(
store
.get_manifest_replay_meta(&vcir.manifest_rsync_uri)
.expect("get deleted manifest replay meta")
.is_none()
);
}
#[test]
fn manifest_replay_meta_validation_reports_invalid_fields() {
let mut meta = ManifestReplayMeta {
manifest_rsync_uri: "rsync://example.test/repo/current.mft".to_string(),
manifest_number_be: vec![3],
manifest_this_update: pack_time(0),
manifest_sha256: vec![0x11; 32],
updated_at_validation_time: pack_time(1),
};
meta.validate_internal().expect("valid replay meta");
meta.manifest_sha256 = vec![0x11; 31];
let err = meta
.validate_internal()
.expect_err("short manifest sha must fail");
assert!(err.to_string().contains("must be 32 bytes"));
meta.manifest_sha256 = vec![0x11; 32];
meta.manifest_number_be = vec![0, 3];
let err = meta
.validate_internal()
.expect_err("non-minimal manifest number must fail");
assert!(err.to_string().contains("minimal big-endian"));
}
#[test]
@@ -2855,6 +2995,18 @@ mod tests {
.expect("get replaced vcir")
.expect("vcir exists");
assert_eq!(got, current);
let replay_meta = store
.get_manifest_replay_meta(&current.manifest_rsync_uri)
.expect("get replaced replay meta")
.expect("replay meta exists");
assert_eq!(
replay_meta.manifest_number_be,
current.validated_manifest_meta.validated_manifest_number
);
assert_eq!(
replay_meta.manifest_sha256,
current.ccr_manifest_projection.manifest_sha256
);
assert!(
store
.get_audit_rule_index_entry(
@@ -2877,9 +3029,6 @@
#[test]
fn storage_helpers_cover_optional_validation_paths() {
let td = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(td.path()).expect("open rocksdb");
let withdrawn = RepositoryViewEntry {
rsync_uri: "rsync://example.test/repo/withdrawn.cer".to_string(),
current_hash: Some(sha256_hex(b"withdrawn")),

View File

@@ -2,7 +2,9 @@ use crate::analysis::timing::TimingHandle;
use crate::audit::AuditDownloadKind;
use crate::audit_downloads::DownloadLogHandle;
use crate::current_repo_index::CurrentRepoIndexHandle;
use crate::storage::{RocksStore, RrdpDeltaOp, RrdpSourceSyncState};
use crate::storage::{
RepositoryViewEntry, RepositoryViewState, RocksStore, RrdpDeltaOp, RrdpSourceSyncState,
};
use crate::sync::store_projection::{
build_repository_view_present_entry, build_repository_view_withdrawn_entry,
build_rrdp_source_member_present_record, build_rrdp_source_member_withdrawn_record,
@@ -735,6 +737,22 @@ fn sync_from_notification_inner(
if let Some(s) = same_session_state {
if s.serial == notif.serial {
let _hydrate_step = timing
.as_ref()
.map(|t| t.span_rrdp_repo_step(notification_uri, "hydrate_current_index"));
let _hydrate_total = timing
.as_ref()
.map(|t| t.span_phase("rrdp_hydrate_current_index_total"));
let hydrated = hydrate_current_repo_index_from_rrdp_members(
store,
notification_uri,
current_repo_index,
)?;
drop(_hydrate_step);
drop(_hydrate_total);
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_current_index_hydrated_objects_total", hydrated as u64);
}
return Ok(0);
}
if s.serial > notif.serial {
@@ -865,6 +883,25 @@ fn sync_from_notification_inner(
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_delta_ops_applied_total", applied_total as u64);
}
let _hydrate_step = timing.as_ref().map(|t| {
t.span_rrdp_repo_step(notification_uri, "hydrate_current_index")
});
let _hydrate_total = timing
.as_ref()
.map(|t| t.span_phase("rrdp_hydrate_current_index_total"));
let hydrated = hydrate_current_repo_index_from_rrdp_members(
store,
notification_uri,
current_repo_index,
)?;
drop(_hydrate_step);
drop(_hydrate_total);
if let Some(t) = timing.as_ref() {
t.record_count(
"rrdp_current_index_hydrated_objects_total",
hydrated as u64,
);
}
return Ok(applied_total);
}
}
@@ -960,6 +997,47 @@ fn sync_from_notification_inner(
Ok(published)
}
fn hydrate_current_repo_index_from_rrdp_members(
store: &RocksStore,
notification_uri: &str,
current_repo_index: Option<&CurrentRepoIndexHandle>,
) -> Result<usize, RrdpSyncError> {
let Some(index) = current_repo_index else {
return Ok(0);
};
let members = store
.list_current_rrdp_source_members(notification_uri)
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?;
if members.is_empty() {
return Ok(0);
}
let mut entries = Vec::with_capacity(members.len());
for member in members {
let current_hash = member.current_hash.ok_or_else(|| {
RrdpSyncError::Storage(format!(
"rrdp source member missing current_hash for current object {}",
member.rsync_uri
))
})?;
entries.push(RepositoryViewEntry {
rsync_uri: member.rsync_uri,
current_hash: Some(current_hash),
repository_source: Some(notification_uri.to_string()),
object_type: member.object_type,
state: RepositoryViewState::Present,
});
}
index
.lock()
.map_err(|_| RrdpSyncError::Storage("current repo index lock poisoned".to_string()))?
.apply_repository_view_entries(&entries)
.map_err(RrdpSyncError::Storage)?;
Ok(entries.len())
}
fn apply_delta(
store: &RocksStore,
notification_uri: &str,
@@ -1704,6 +1782,7 @@ fn strip_all_ascii_whitespace(s: &str) -> String {
mod tests {
use super::*;
use crate::analysis::timing::{TimingHandle, TimingMeta};
use crate::current_repo_index::CurrentRepoIndex;
use crate::storage::RocksStore;
use std::collections::HashMap;
use std::time::Duration;
@@ -2625,6 +2704,118 @@
assert_eq!(state.serial, 3);
}
#[test]
fn sync_from_notification_same_serial_hydrates_current_repo_index() {
let tmp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(tmp.path()).expect("open rocksdb");
let sid = "550e8400-e29b-41d4-a716-446655440000";
let notif_uri = "https://example.net/notification.xml";
let snapshot_uri = "https://example.net/snapshot.xml";
let uri_a = "rsync://example.net/repo/a.mft";
let uri_b = "rsync://example.net/repo/b.roa";
let snapshot = snapshot_xml(sid, 1, &[(uri_a, b"a1"), (uri_b, b"b1")]);
let snapshot_hash = hex::encode(sha2::Sha256::digest(&snapshot));
let notif = notification_xml(sid, 1, snapshot_uri, &snapshot_hash);
let fetcher_1 = MapFetcher {
map: HashMap::from([(snapshot_uri.to_string(), snapshot)]),
};
sync_from_notification_snapshot(&store, notif_uri, &notif, &fetcher_1).expect("seed");
let index = CurrentRepoIndex::shared();
let no_fetcher = MapFetcher {
map: HashMap::new(),
};
let applied = sync_from_notification_with_timing_and_download_log(
&store,
notif_uri,
Some(&index),
&notif,
&no_fetcher,
None,
None,
)
.expect("same serial no-op");
assert_eq!(applied, 0);
let index = index.lock().expect("lock index");
assert_eq!(index.active_uri_count(), 2);
assert!(index.get_by_uri(uri_a).is_some());
assert!(index.get_by_uri(uri_b).is_some());
}
#[test]
fn sync_from_notification_delta_hydrates_unchanged_current_repo_entries() {
let tmp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(tmp.path()).expect("open rocksdb");
let sid = "550e8400-e29b-41d4-a716-446655440000";
let notif_uri = "https://example.net/notification.xml";
let snapshot_uri_1 = "https://example.net/snapshot-1.xml";
let uri_a = "rsync://example.net/repo/a.mft";
let uri_b = "rsync://example.net/repo/b.roa";
let uri_c = "rsync://example.net/repo/c.crl";
let snapshot_1 = snapshot_xml(sid, 1, &[(uri_a, b"a1"), (uri_b, b"b1")]);
let snapshot_hash_1 = hex::encode(sha2::Sha256::digest(&snapshot_1));
let notif_1 = notification_xml(sid, 1, snapshot_uri_1, &snapshot_hash_1);
let fetcher_1 = MapFetcher {
map: HashMap::from([(snapshot_uri_1.to_string(), snapshot_1)]),
};
sync_from_notification_snapshot(&store, notif_uri, &notif_1, &fetcher_1).expect("seed");
let publish_c_b64 = base64::engine::general_purpose::STANDARD.encode(b"c2");
let delta_2 = delta_xml(
sid,
2,
&[&format!(
r#"<publish uri="{uri_c}">{publish_c_b64}</publish>"#
)],
);
let delta_2_hash_hex = hex::encode(sha2::Sha256::digest(&delta_2));
let notif_2 = notification_xml_with_deltas(
sid,
2,
"https://example.net/snapshot-2.xml",
&"00".repeat(32),
&[(
"d2",
2,
"https://example.net/delta-2.xml",
&delta_2_hash_hex,
)],
);
let fetcher_2 = MapFetcher {
map: HashMap::from([("https://example.net/delta-2.xml".to_string(), delta_2)]),
};
let index = CurrentRepoIndex::shared();
let applied = sync_from_notification_with_timing_and_download_log(
&store,
notif_uri,
Some(&index),
&notif_2,
&fetcher_2,
None,
None,
)
.expect("delta sync");
assert_eq!(applied, 1);
let index = index.lock().expect("lock index");
assert_eq!(index.active_uri_count(), 3);
assert!(
index.get_by_uri(uri_a).is_some(),
"unchanged object from the previous serial must be visible"
);
assert!(
index.get_by_uri(uri_b).is_some(),
"unchanged object from the previous serial must be visible"
);
assert!(index.get_by_uri(uri_c).is_some(), "delta publish visible");
}
#[test]
fn load_rrdp_local_state_uses_source_record_only() {
let tmp = tempfile::tempdir().expect("tempdir");

View File

@@ -327,12 +327,20 @@ pub fn process_manifest_publication_point_fresh_after_repo_sync(
#[derive(Clone, Debug, Default)]
pub struct FreshPublicationPointTimingBreakdown {
pub current_index_lock_ms: u64,
pub manifest_load_ms: u64,
pub manifest_index_lookup_ms: u64,
pub manifest_blob_load_ms: u64,
pub manifest_decode_ms: u64,
pub replay_guard_ms: u64,
pub replay_meta_hit: bool,
pub replay_meta_miss: bool,
pub manifest_entries_ms: u64,
pub pack_files_ms: u64,
pub pack_files_index_lookup_ms: u64,
pub pack_files_blob_load_ms: u64,
pub ee_path_validate_ms: u64,
pub manifest_file_count: usize,
}
pub fn process_manifest_publication_point_fresh_after_repo_sync_with_timing(
@@ -620,7 +628,9 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
ManifestFreshError,
> {
let mut timing = FreshPublicationPointTimingBreakdown::default();
let current_index_lock_started = std::time::Instant::now();
let current_index_guard = current_repo_index.and_then(|handle| handle.lock().ok());
timing.current_index_lock_ms = current_index_lock_started.elapsed().as_millis() as u64;
if !rsync_uri_is_under_publication_point(manifest_rsync_uri, publication_point_rsync_uri) {
return Err(ManifestFreshError::ManifestOutsidePublicationPoint {
@@ -631,11 +641,14 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
let manifest_load_started = std::time::Instant::now();
let manifest_bytes = if let Some(index) = current_index_guard.as_ref() {
let manifest_lookup_started = std::time::Instant::now();
let current = index.get_by_uri(manifest_rsync_uri).ok_or_else(|| {
ManifestFreshError::MissingManifest {
manifest_rsync_uri: manifest_rsync_uri.to_string(),
}
})?;
timing.manifest_index_lookup_ms = manifest_lookup_started.elapsed().as_millis() as u64;
let manifest_blob_load_started = std::time::Instant::now();
store
.get_blob_bytes(&current.current_hash_hex)
.map_err(|e| ManifestFreshError::MissingManifest {
@@ -643,8 +656,13 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
})?
.ok_or_else(|| ManifestFreshError::MissingManifest {
manifest_rsync_uri: manifest_rsync_uri.to_string(),
})
.inspect(|_| {
timing.manifest_blob_load_ms =
manifest_blob_load_started.elapsed().as_millis() as u64;
})?
} else {
let manifest_blob_load_started = std::time::Instant::now();
store
.load_current_object_bytes_by_uri(manifest_rsync_uri)
.map_err(|e| ManifestFreshError::MissingManifest {
@@ -652,6 +670,10 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
})?
.ok_or_else(|| ManifestFreshError::MissingManifest {
manifest_rsync_uri: manifest_rsync_uri.to_string(),
})
.inspect(|_| {
timing.manifest_blob_load_ms =
manifest_blob_load_started.elapsed().as_millis() as u64;
})?
};
timing.manifest_load_ms = manifest_load_started.elapsed().as_millis() as u64;
@@ -684,20 +706,21 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
// - If manifestNumber is higher, require thisUpdate to be more recent than the previously
// validated thisUpdate.
let replay_guard_started = std::time::Instant::now();
if let Some(old_vcir) = store.get_vcir(manifest_rsync_uri).ok().flatten() {
if old_vcir.manifest_rsync_uri == manifest_rsync_uri {
if let Some(old_meta) = store
.get_manifest_replay_meta(manifest_rsync_uri)
.ok()
.flatten()
{
timing.replay_meta_hit = true;
if old_meta.manifest_rsync_uri == manifest_rsync_uri {
let new_num = manifest.manifest.manifest_number.bytes_be.as_slice();
let old_num = old_vcir
.validated_manifest_meta
.validated_manifest_number
.as_slice();
let old_num = old_meta.manifest_number_be.as_slice();
match cmp_minimal_be_unsigned(new_num, old_num) {
Ordering::Greater => {
let old_this_update = old_vcir
.validated_manifest_meta
.validated_manifest_this_update
let old_this_update = old_meta
.manifest_this_update
.parse()
.expect("vcir internal validation ensures thisUpdate parses");
.expect("manifest replay meta validation ensures thisUpdate parses");
if this_update <= old_this_update {
use time::format_description::well_known::Rfc3339;
return Err(ManifestFreshError::ThisUpdateNotIncreasing {
@@ -712,14 +735,8 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
}
}
Ordering::Equal => {
let old_manifest_hash =
old_vcir.related_artifacts.iter().find_map(|artifact| {
(artifact.artifact_role == VcirArtifactRole::Manifest
&& artifact.uri.as_deref() == Some(manifest_rsync_uri))
.then_some(artifact.sha256.as_str())
});
let new_manifest_hash = hex::encode(sha2::Sha256::digest(&manifest_bytes));
if old_manifest_hash != Some(new_manifest_hash.as_str()) {
let new_manifest_hash = sha2::Sha256::digest(&manifest_bytes);
if old_meta.manifest_sha256.as_slice() != new_manifest_hash.as_slice() {
return Err(ManifestFreshError::ManifestNumberNotIncreasing {
old_hex: hex::encode_upper(old_num),
new_hex: hex::encode_upper(new_num),
@@ -734,6 +751,8 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
}
}
}
} else {
timing.replay_meta_miss = true;
}
timing.replay_guard_ms = replay_guard_started.elapsed().as_millis() as u64;
@@ -743,6 +762,7 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
.parse_files()
.map_err(ManifestDecodeError::Validate)?;
timing.manifest_entries_ms = manifest_entries_started.elapsed().as_millis() as u64;
timing.manifest_file_count = entries.len();
let mut files = Vec::with_capacity(manifest.manifest.file_count());
let pack_files_started = std::time::Instant::now();
let external_raw_store = store
@@ -753,22 +773,27 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
.external_repo_bytes_ref()
.cloned()
.map(std::sync::Arc::new);
let mut pack_files_index_lookup_duration = std::time::Duration::ZERO;
let mut pack_files_blob_load_duration = std::time::Duration::ZERO;
for entry in &entries {
let rsync_uri =
join_rsync_dir_and_file(publication_point_rsync_uri, entry.file_name.as_str());
let current_object = if let Some(index) = current_index_guard.as_ref() {
let index_lookup_started = std::time::Instant::now();
let current =
index
.get_by_uri(&rsync_uri)
.ok_or_else(|| ManifestFreshError::MissingFile {
rsync_uri: rsync_uri.clone(),
})?;
pack_files_index_lookup_duration += index_lookup_started.elapsed();
crate::storage::CurrentObjectWithHash {
current_hash_hex: current.current_hash_hex.clone(),
current_hash: current.current_hash,
bytes: Vec::new(),
}
} else {
let blob_load_started = std::time::Instant::now();
store
.load_current_object_with_hash_by_uri(&rsync_uri)
.map_err(|_e| ManifestFreshError::MissingFile {
@@ -776,6 +801,9 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
})?
.ok_or_else(|| ManifestFreshError::MissingFile {
rsync_uri: rsync_uri.clone(),
})
.inspect(|_| {
pack_files_blob_load_duration += blob_load_started.elapsed();
})?
};
@@ -803,6 +831,7 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
));
} else {
let bytes = if current_object.bytes.is_empty() {
let blob_load_started = std::time::Instant::now();
store
.get_blob_bytes(&current_object.current_hash_hex)
.map_err(|_e| ManifestFreshError::MissingFile {
@@ -810,6 +839,9 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
})?
.ok_or_else(|| ManifestFreshError::MissingFile {
rsync_uri: rsync_uri.clone(),
})
.inspect(|_| {
pack_files_blob_load_duration += blob_load_started.elapsed();
})?
} else {
current_object.bytes
@@ -821,6 +853,8 @@ pub(crate) fn try_build_fresh_publication_point_with_timing(
));
}
}
timing.pack_files_index_lookup_ms = pack_files_index_lookup_duration.as_millis() as u64;
timing.pack_files_blob_load_ms = pack_files_blob_load_duration.as_millis() as u64;
timing.pack_files_ms = pack_files_started.elapsed().as_millis() as u64;
// RFC 6488 §3: manifest (signed object) validity includes a valid EE cert path.
@@ -1044,12 +1078,67 @@ mod tests {
.expect("put repository view entry");
}
fn put_complete_publication_point_current_objects(
store: &RocksStore,
manifest: &ManifestObject,
manifest_rsync_uri: &str,
manifest_bytes: Vec<u8>,
publication_point_rsync_uri: &str,
) {
put_current_object(store, manifest_rsync_uri, manifest_bytes, "mft");
for entry in manifest.manifest.parse_files().expect("parse files") {
let file_path = manifest_fixture_path()
.parent()
.unwrap()
.join(entry.file_name.as_str());
let bytes = std::fs::read(&file_path).expect("read fixture file");
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
let object_type = rsync_uri.rsplit('.').next().unwrap_or("bin");
put_current_object(store, &rsync_uri, bytes, object_type);
}
}
fn put_raw_only(store: &RocksStore, rsync_uri: &str, bytes: Vec<u8>, object_type: &str) {
store
.put_raw_by_hash_entry(&raw_by_hash_entry(rsync_uri, bytes, object_type))
.expect("put raw_by_hash entry");
}
fn sample_vcir_for_manifest_replay_meta(
manifest: &ManifestObject,
manifest_rsync_uri: &str,
publication_point_rsync_uri: &str,
manifest_bytes: &[u8],
validation_time: time::OffsetDateTime,
) -> ValidatedCaInstanceResult {
let manifest_hash = hex::encode(sha2::Sha256::digest(manifest_bytes));
let this_update = manifest
.manifest
.this_update
.to_offset(time::UtcOffset::UTC);
let mut vcir = sample_current_instance_vcir(
manifest_rsync_uri,
publication_point_rsync_uri,
&manifest_hash,
"rsync://example.test/repo/object.roa",
&hex::encode(sha2::Sha256::digest(b"object")),
validation_time,
true,
);
vcir.validated_manifest_meta.validated_manifest_number =
manifest.manifest.manifest_number.bytes_be.clone();
vcir.validated_manifest_meta.validated_manifest_this_update =
PackTime::from_utc_offset_datetime(this_update);
vcir.ccr_manifest_projection.manifest_number_be =
manifest.manifest.manifest_number.bytes_be.clone();
vcir.ccr_manifest_projection.manifest_this_update =
PackTime::from_utc_offset_datetime(this_update);
vcir.ccr_manifest_projection.manifest_sha256 =
hex::decode(manifest_hash).expect("decode manifest hash");
vcir.ccr_manifest_projection.manifest_size = manifest_bytes.len() as u64;
vcir
}
fn sample_current_instance_vcir(
manifest_rsync_uri: &str,
publication_point_rsync_uri: &str,
@@ -1371,6 +1460,82 @@ mod tests {
assert_eq!(fresh.files.len(), manifest.manifest.file_count());
}
#[test]
fn try_build_fresh_publication_point_records_replay_meta_miss() {
    // With no previously persisted VCIR, the replay guard must report a miss.
    let tempdir = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(tempdir.path()).expect("open rocksdb");
    let (
        manifest,
        manifest_bytes,
        manifest_rsync_uri,
        publication_point_rsync_uri,
        validation_time,
    ) = load_manifest_fixture();
    put_complete_publication_point_current_objects(
        &store,
        &manifest,
        &manifest_rsync_uri,
        manifest_bytes,
        &publication_point_rsync_uri,
    );
    let (_fresh, timing) = try_build_fresh_publication_point_with_timing(
        &store,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        None,
        &issuer_ca_fixture_der(),
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .expect("fresh publication point without replay meta");
    assert!(timing.replay_meta_miss && !timing.replay_meta_hit);
}
#[test]
fn try_build_fresh_publication_point_uses_replay_meta_hit_for_same_manifest() {
    // When a persisted VCIR carries replay meta matching the current manifest,
    // the replay guard must report a hit instead of a miss.
    let tempdir = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(tempdir.path()).expect("open rocksdb");
    let (
        manifest,
        manifest_bytes,
        manifest_rsync_uri,
        publication_point_rsync_uri,
        validation_time,
    ) = load_manifest_fixture();
    store
        .put_vcir(&sample_vcir_for_manifest_replay_meta(
            &manifest,
            &manifest_rsync_uri,
            &publication_point_rsync_uri,
            &manifest_bytes,
            validation_time,
        ))
        .expect("put previous vcir");
    put_complete_publication_point_current_objects(
        &store,
        &manifest,
        &manifest_rsync_uri,
        manifest_bytes,
        &publication_point_rsync_uri,
    );
    let (_fresh, timing) = try_build_fresh_publication_point_with_timing(
        &store,
        &manifest_rsync_uri,
        &publication_point_rsync_uri,
        None,
        &issuer_ca_fixture_der(),
        Some(issuer_ca_rsync_uri()),
        validation_time,
    )
    .expect("fresh publication point with matching replay meta");
    assert!(timing.replay_meta_hit && !timing.replay_meta_miss);
}
#[test]
fn validate_manifest_embedded_ee_cert_path_rejects_missing_crl_files() {
let (manifest, _, _, publication_point_rsync_uri, validation_time) =

View File

@ -19,7 +19,7 @@ use crate::validation::cert_path::{CertPathError, validate_signed_object_ee_cert
use crate::validation::manifest::PublicationPointData;
use crate::validation::publication_point::PublicationPointSnapshot;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, Instant};
use x509_parser::prelude::FromDer;
use x509_parser::x509::SubjectPublicKeyInfo;
@ -118,6 +118,9 @@ pub(crate) struct RoaTaskOk {
pub(crate) struct RoaTaskResult {
pub(crate) publication_point_id: u64,
pub(crate) index: usize,
pub(crate) worker_index: usize,
pub(crate) queue_wait_ms: u64,
pub(crate) worker_ms: u64,
pub(crate) rsync_uri: String,
pub(crate) sha256_hex: String,
pub(crate) outcome: Result<RoaTaskOk, ObjectValidateError>,
@ -736,14 +739,15 @@ pub(crate) struct OwnedRoaTask {
issuer_effective_as: Option<crate::data_model::rc::AsResourceSet>,
validation_time: time::OffsetDateTime,
collect_vcir_local_outputs: bool,
pub(crate) submitted_at: Option<Instant>,
}
/// Zero-sized, cloneable executor handed to the object worker pool; it only
/// forwards each `OwnedRoaTask` to `validate_owned_roa_task`.
#[derive(Clone)]
struct RoaTaskExecutor;
impl ObjectTaskExecutor<OwnedRoaTask, RoaTaskResult> for RoaTaskExecutor {
fn execute(&self, _worker_index: usize, task: OwnedRoaTask) -> RoaTaskResult {
validate_owned_roa_task(task)
fn execute(&self, worker_index: usize, task: OwnedRoaTask) -> RoaTaskResult {
validate_owned_roa_task(worker_index, task)
}
}
@ -787,7 +791,13 @@ impl ParallelRoaWorkerPool {
}
}
fn validate_owned_roa_task(task: OwnedRoaTask) -> RoaTaskResult {
fn validate_owned_roa_task(worker_index: usize, task: OwnedRoaTask) -> RoaTaskResult {
let worker_started = Instant::now();
let queue_wait_ms = task
.submitted_at
.map(|submitted_at| worker_started.saturating_duration_since(submitted_at))
.map(|duration| duration.as_millis() as u64)
.unwrap_or(0);
let sha256_hex = sha256_hex_from_32(&task.file.sha256);
let issuer_spki = match SubjectPublicKeyInfo::from_der(task.issuer_spki_der.as_ref()) {
Ok((rem, spki)) if rem.is_empty() => spki,
@ -795,6 +805,9 @@ fn validate_owned_roa_task(task: OwnedRoaTask) -> RoaTaskResult {
return RoaTaskResult {
publication_point_id: task.publication_point_id,
index: task.index,
worker_index,
queue_wait_ms,
worker_ms: worker_started.elapsed().as_millis() as u64,
rsync_uri: task.file.rsync_uri,
sha256_hex,
outcome: Err(ObjectValidateError::CertPath(
@ -806,6 +819,9 @@ fn validate_owned_roa_task(task: OwnedRoaTask) -> RoaTaskResult {
return RoaTaskResult {
publication_point_id: task.publication_point_id,
index: task.index,
worker_index,
queue_wait_ms,
worker_ms: worker_started.elapsed().as_millis() as u64,
rsync_uri: task.file.rsync_uri,
sha256_hex,
outcome: Err(ObjectValidateError::CertPath(
@ -837,6 +853,9 @@ fn validate_owned_roa_task(task: OwnedRoaTask) -> RoaTaskResult {
RoaTaskResult {
publication_point_id: task.publication_point_id,
index: task.index,
worker_index,
queue_wait_ms,
worker_ms: worker_started.elapsed().as_millis() as u64,
rsync_uri: task.file.rsync_uri,
sha256_hex,
outcome,
@ -888,6 +907,7 @@ impl ParallelObjectsStage {
issuer_effective_as: self.issuer_effective_as.clone(),
validation_time: self.validation_time,
collect_vcir_local_outputs: self.collect_vcir_local_outputs,
submitted_at: None,
})
.collect()
}
@ -895,6 +915,14 @@ impl ParallelObjectsStage {
pub(crate) fn roa_task_count(&self) -> usize {
self.stats.roa_total
}
pub(crate) fn aspa_task_count(&self) -> usize {
self.stats.aspa_total
}
pub(crate) fn locked_file_count(&self) -> usize {
self.locked_files.len()
}
}
pub(crate) fn prepare_publication_point_for_parallel_roa<P: PublicationPointData>(
@ -1408,6 +1436,9 @@ pub(crate) fn validate_roa_task_serial(
RoaTaskResult {
publication_point_id: 0,
index: task.index,
worker_index: 0,
queue_wait_ms: 0,
worker_ms: 0,
rsync_uri: task.file.rsync_uri.clone(),
sha256_hex,
outcome,

View File

@ -41,6 +41,15 @@ struct InflightPublicationPoint {
started_at: Instant,
objects_started_at: Instant,
task_count: usize,
tasks_submitted: usize,
first_task_submitted_at: Option<Instant>,
last_task_submitted_at: Option<Instant>,
first_result_at: Option<Instant>,
last_result_at: Option<Instant>,
worker_ms_total: u64,
worker_ms_max: u64,
queue_wait_ms_total: u64,
queue_wait_ms_max: u64,
results: Vec<crate::validation::objects::RoaTaskResult>,
}
@ -49,6 +58,220 @@ struct FinishedPublicationPoint {
result: Result<PublicationPointRunResult, String>,
}
/// Timing and outcome metrics collected while staging a single "ready"
/// publication point. All `*_ms` fields are wall-clock milliseconds; the
/// outcome counters are 0 or 1 for one staged point, so they can be summed
/// into `ReadyStageBatchMetrics` via `record`.
#[derive(Default)]
struct ReadyStageMetrics {
    // Identity of the staged point (used to report the slowest point per batch).
    manifest_rsync_uri: Option<String>,
    publication_point_rsync_uri: Option<String>,
    // Outcome counters: which path this point took during staging.
    ready_count: usize,
    fallback_count: usize,
    complete_count: usize,
    staged_count: usize,
    zero_task_count: usize,
    error_count: usize,
    // Work discovered/produced while staging this point.
    discovered_children: usize,
    locked_files: usize,
    roa_tasks: usize,
    aspa_objects: usize,
    // Overall fresh-stage duration, then its snapshot-preparation breakdown.
    stage_fresh_ms: u64,
    snapshot_prepare_ms: u64,
    snapshot_current_index_lock_ms: u64,
    snapshot_manifest_load_ms: u64,
    snapshot_manifest_index_lookup_ms: u64,
    snapshot_manifest_blob_load_ms: u64,
    snapshot_manifest_decode_ms: u64,
    snapshot_replay_guard_ms: u64,
    // Replay-guard outcome for this point (0 or 1 each).
    replay_meta_hit_count: usize,
    replay_meta_miss_count: usize,
    snapshot_manifest_entries_ms: u64,
    snapshot_pack_files_ms: u64,
    snapshot_pack_files_index_lookup_ms: u64,
    snapshot_pack_files_blob_load_ms: u64,
    snapshot_ee_path_validate_ms: u64,
    snapshot_manifest_file_count: usize,
    // Post-snapshot staging steps.
    child_discovery_ms: u64,
    child_enqueue_ms: u64,
    prepare_ms: u64,
    build_roa_tasks_ms: u64,
    // End-to-end duration for staging this point.
    total_ms: u64,
}
/// Aggregate of `ReadyStageMetrics` over one ready-queue batch, folded in via
/// `record`. Counters are summed; each timing appears as a `_total` (sum) and
/// `_max` (largest single point) pair, all in milliseconds.
#[derive(Default)]
struct ReadyStageBatchMetrics {
    // Summed outcome counters across the batch.
    ready_count: usize,
    fallback_count: usize,
    complete_count: usize,
    staged_count: usize,
    zero_task_count: usize,
    error_count: usize,
    discovered_children: usize,
    locked_files: usize,
    roa_tasks: usize,
    aspa_objects: usize,
    stage_fresh_ms_total: u64,
    stage_fresh_ms_max: u64,
    // Identity of the point with the largest stage_fresh_ms seen so far.
    stage_fresh_ms_max_manifest_rsync_uri: Option<String>,
    stage_fresh_ms_max_publication_point_rsync_uri: Option<String>,
    snapshot_prepare_ms_total: u64,
    snapshot_prepare_ms_max: u64,
    snapshot_current_index_lock_ms_total: u64,
    snapshot_current_index_lock_ms_max: u64,
    snapshot_manifest_load_ms_total: u64,
    snapshot_manifest_load_ms_max: u64,
    snapshot_manifest_index_lookup_ms_total: u64,
    snapshot_manifest_index_lookup_ms_max: u64,
    snapshot_manifest_blob_load_ms_total: u64,
    snapshot_manifest_blob_load_ms_max: u64,
    snapshot_manifest_decode_ms_total: u64,
    snapshot_manifest_decode_ms_max: u64,
    snapshot_replay_guard_ms_total: u64,
    snapshot_replay_guard_ms_max: u64,
    // Summed replay-guard outcomes across the batch.
    replay_meta_hit_count: usize,
    replay_meta_miss_count: usize,
    snapshot_manifest_entries_ms_total: u64,
    snapshot_manifest_entries_ms_max: u64,
    snapshot_pack_files_ms_total: u64,
    snapshot_pack_files_ms_max: u64,
    snapshot_pack_files_index_lookup_ms_total: u64,
    snapshot_pack_files_index_lookup_ms_max: u64,
    snapshot_pack_files_blob_load_ms_total: u64,
    snapshot_pack_files_blob_load_ms_max: u64,
    snapshot_ee_path_validate_ms_total: u64,
    snapshot_ee_path_validate_ms_max: u64,
    snapshot_manifest_file_count_total: usize,
    snapshot_manifest_file_count_max: usize,
    child_discovery_ms_total: u64,
    child_discovery_ms_max: u64,
    child_enqueue_ms_total: u64,
    child_enqueue_ms_max: u64,
    prepare_ms_total: u64,
    prepare_ms_max: u64,
    build_roa_tasks_ms_total: u64,
    build_roa_tasks_ms_max: u64,
    // Sum of per-point total_ms (not the wall-clock batch duration).
    total_ms: u64,
}
impl ReadyStageBatchMetrics {
    /// Adds `value` into a running `total` and raises `max_seen` when the new
    /// value exceeds it. Generic so the same helper serves both the `u64`
    /// millisecond pairs and the `usize` count pairs.
    fn add_total_max<T>(total: &mut T, max_seen: &mut T, value: T)
    where
        T: Copy + Ord + std::ops::AddAssign,
    {
        *total += value;
        *max_seen = (*max_seen).max(value);
    }

    /// Folds one publication point's `ReadyStageMetrics` into this batch
    /// aggregate: outcome counters are summed, and every `_total`/`_max`
    /// timing pair is updated through `add_total_max`.
    fn record(&mut self, metrics: ReadyStageMetrics) {
        self.ready_count += metrics.ready_count;
        self.fallback_count += metrics.fallback_count;
        self.complete_count += metrics.complete_count;
        self.staged_count += metrics.staged_count;
        self.zero_task_count += metrics.zero_task_count;
        self.error_count += metrics.error_count;
        self.discovered_children += metrics.discovered_children;
        self.locked_files += metrics.locked_files;
        self.roa_tasks += metrics.roa_tasks;
        self.aspa_objects += metrics.aspa_objects;
        // `>=` (not `>`) so the slowest point's identity is captured even when
        // the first recorded duration ties the default max of 0; must run
        // before the max itself is raised below.
        if metrics.stage_fresh_ms >= self.stage_fresh_ms_max {
            self.stage_fresh_ms_max_manifest_rsync_uri = metrics.manifest_rsync_uri.clone();
            self.stage_fresh_ms_max_publication_point_rsync_uri =
                metrics.publication_point_rsync_uri.clone();
        }
        Self::add_total_max(
            &mut self.stage_fresh_ms_total,
            &mut self.stage_fresh_ms_max,
            metrics.stage_fresh_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_prepare_ms_total,
            &mut self.snapshot_prepare_ms_max,
            metrics.snapshot_prepare_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_current_index_lock_ms_total,
            &mut self.snapshot_current_index_lock_ms_max,
            metrics.snapshot_current_index_lock_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_manifest_load_ms_total,
            &mut self.snapshot_manifest_load_ms_max,
            metrics.snapshot_manifest_load_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_manifest_index_lookup_ms_total,
            &mut self.snapshot_manifest_index_lookup_ms_max,
            metrics.snapshot_manifest_index_lookup_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_manifest_blob_load_ms_total,
            &mut self.snapshot_manifest_blob_load_ms_max,
            metrics.snapshot_manifest_blob_load_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_manifest_decode_ms_total,
            &mut self.snapshot_manifest_decode_ms_max,
            metrics.snapshot_manifest_decode_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_replay_guard_ms_total,
            &mut self.snapshot_replay_guard_ms_max,
            metrics.snapshot_replay_guard_ms,
        );
        self.replay_meta_hit_count += metrics.replay_meta_hit_count;
        self.replay_meta_miss_count += metrics.replay_meta_miss_count;
        Self::add_total_max(
            &mut self.snapshot_manifest_entries_ms_total,
            &mut self.snapshot_manifest_entries_ms_max,
            metrics.snapshot_manifest_entries_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_pack_files_ms_total,
            &mut self.snapshot_pack_files_ms_max,
            metrics.snapshot_pack_files_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_pack_files_index_lookup_ms_total,
            &mut self.snapshot_pack_files_index_lookup_ms_max,
            metrics.snapshot_pack_files_index_lookup_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_pack_files_blob_load_ms_total,
            &mut self.snapshot_pack_files_blob_load_ms_max,
            metrics.snapshot_pack_files_blob_load_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_ee_path_validate_ms_total,
            &mut self.snapshot_ee_path_validate_ms_max,
            metrics.snapshot_ee_path_validate_ms,
        );
        Self::add_total_max(
            &mut self.snapshot_manifest_file_count_total,
            &mut self.snapshot_manifest_file_count_max,
            metrics.snapshot_manifest_file_count,
        );
        Self::add_total_max(
            &mut self.child_discovery_ms_total,
            &mut self.child_discovery_ms_max,
            metrics.child_discovery_ms,
        );
        Self::add_total_max(
            &mut self.child_enqueue_ms_total,
            &mut self.child_enqueue_ms_max,
            metrics.child_enqueue_ms,
        );
        Self::add_total_max(
            &mut self.prepare_ms_total,
            &mut self.prepare_ms_max,
            metrics.prepare_ms,
        );
        Self::add_total_max(
            &mut self.build_roa_tasks_ms_total,
            &mut self.build_roa_tasks_ms_max,
            metrics.build_roa_tasks_ms,
        );
        self.total_ms += metrics.total_ms;
    }
}
/// Outcome of one `flush_pending_roa_dispatch` pass over the pending queue.
#[derive(Default)]
struct RoaDispatchMetrics {
    // Tasks popped from the pending queue during this pass.
    attempted: usize,
    // Tasks accepted by the worker pool.
    submitted: usize,
    // True when a submit failed with QueueFull and the pass stopped early.
    queue_full: bool,
    // Tasks still pending after the pass.
    pending_remaining: usize,
    // Wall-clock duration of the pass in milliseconds.
    duration_ms: u64,
}
/// Metrics for one `drain_object_results` pass: how many worker results were
/// drained, how many publication points were reduced/finalized, and the
/// sum/max breakdown of the per-result timings (milliseconds).
#[derive(Default)]
struct ObjectDrainMetrics {
    results_drained: usize,
    publication_points_finalized: usize,
    // Time spent in reduce_parallel_roa_stage across finalized points.
    reduce_ms_total: u64,
    reduce_ms_max: u64,
    // Time spent finalizing reduced points.
    finalize_ms_total: u64,
    finalize_ms_max: u64,
    // Per-task worker execution time reported by drained results.
    worker_ms_total: u64,
    worker_ms_max: u64,
    // Per-task time spent queued before a worker picked it up.
    queue_wait_ms_total: u64,
    queue_wait_ms_max: u64,
    // Wall-clock duration of the whole drain pass.
    duration_ms: u64,
}
/// Metrics for one `drain_repo_events` pass (repository-sync event loop).
#[derive(Default)]
struct RepoDrainMetrics {
    // Repo events processed during the pass.
    event_count: usize,
    // Events counted as completions — presumably finished repo syncs;
    // NOTE(review): confirm against drain_repo_events (body not in view).
    completions: usize,
    // CA instances moved onto the ready queue as a result of the pass.
    ready_enqueued: usize,
    // Wall-clock duration of the pass in milliseconds.
    duration_ms: u64,
}
/// Milliseconds elapsed since `started`.
///
/// `Duration::as_millis` returns a `u128`; converting with a saturating
/// `try_from` instead of an `as` cast avoids silent truncation (the cast
/// only differs after ~585 million years, but the checked form is free and
/// clippy-clean).
fn elapsed_ms(started: Instant) -> u64 {
    u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX)
}
fn compact_phase2_finished_result(
mut result: PublicationPointRunResult,
compact_audit: bool,
@ -109,6 +332,12 @@ pub fn run_tree_parallel_phase2_audit_multi_root(
let mut pending_roa_dispatch: VecDeque<OwnedRoaTask> = VecDeque::new();
let mut finished: Vec<FinishedPublicationPoint> = Vec::new();
let mut instances_started = 0usize;
let ready_batch_size = runner
.parallel_phase2_config
.as_ref()
.map(|cfg| cfg.ready_batch_size)
.unwrap_or(256)
.max(1);
loop {
while can_start_more(instances_started, config) {
@ -149,8 +378,13 @@ pub fn run_tree_parallel_phase2_audit_multi_root(
}
}
while let Some(ready) = ready_queue.pop_front() {
stage_ready_publication_point(
let ready_batch_started = Instant::now();
let mut ready_batch_metrics = ReadyStageBatchMetrics::default();
while ready_batch_metrics.ready_count < ready_batch_size {
let Some(ready) = ready_queue.pop_front() else {
break;
};
let metrics = stage_ready_publication_point(
runner,
&mut next_id,
&mut ca_queue,
@ -160,15 +394,129 @@ pub fn run_tree_parallel_phase2_audit_multi_root(
ready,
config.compact_audit,
);
ready_batch_metrics.record(metrics);
}
if ready_batch_metrics.ready_count > 0 {
ready_batch_metrics.total_ms = elapsed_ms(ready_batch_started);
crate::progress_log::emit(
"phase2_ready_queue_batch",
serde_json::json!({
"ready_count": ready_batch_metrics.ready_count,
"fallback_count": ready_batch_metrics.fallback_count,
"complete_count": ready_batch_metrics.complete_count,
"staged_count": ready_batch_metrics.staged_count,
"zero_task_count": ready_batch_metrics.zero_task_count,
"error_count": ready_batch_metrics.error_count,
"discovered_children": ready_batch_metrics.discovered_children,
"locked_files": ready_batch_metrics.locked_files,
"roa_tasks": ready_batch_metrics.roa_tasks,
"aspa_objects": ready_batch_metrics.aspa_objects,
"stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total,
"stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max,
"stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri,
"stage_fresh_ms_max_publication_point_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_publication_point_rsync_uri,
"child_enqueue_ms_total": ready_batch_metrics.child_enqueue_ms_total,
"child_enqueue_ms_max": ready_batch_metrics.child_enqueue_ms_max,
"prepare_ms_total": ready_batch_metrics.prepare_ms_total,
"prepare_ms_max": ready_batch_metrics.prepare_ms_max,
"build_roa_tasks_ms_total": ready_batch_metrics.build_roa_tasks_ms_total,
"build_roa_tasks_ms_max": ready_batch_metrics.build_roa_tasks_ms_max,
"batch_duration_ms": ready_batch_metrics.total_ms,
"ready_batch_size": ready_batch_size,
"ready_queue_len_after_batch": ready_queue.len(),
"ready_queue_budget_exhausted": !ready_queue.is_empty(),
"ca_queue_len_after_batch": ca_queue.len(),
"pending_roa_dispatch_len_after_batch": pending_roa_dispatch.len(),
"inflight_publication_points_after_batch": inflight_publication_points.len(),
}),
);
crate::progress_log::emit(
"phase2_ready_queue_stage_fresh_breakdown",
serde_json::json!({
"ready_count": ready_batch_metrics.ready_count,
"stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total,
"stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max,
"stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri,
"stage_fresh_ms_max_publication_point_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_publication_point_rsync_uri,
"snapshot_prepare_ms_total": ready_batch_metrics.snapshot_prepare_ms_total,
"snapshot_prepare_ms_max": ready_batch_metrics.snapshot_prepare_ms_max,
"snapshot_current_index_lock_ms_total": ready_batch_metrics.snapshot_current_index_lock_ms_total,
"snapshot_current_index_lock_ms_max": ready_batch_metrics.snapshot_current_index_lock_ms_max,
"snapshot_manifest_load_ms_total": ready_batch_metrics.snapshot_manifest_load_ms_total,
"snapshot_manifest_load_ms_max": ready_batch_metrics.snapshot_manifest_load_ms_max,
"snapshot_manifest_index_lookup_ms_total": ready_batch_metrics.snapshot_manifest_index_lookup_ms_total,
"snapshot_manifest_index_lookup_ms_max": ready_batch_metrics.snapshot_manifest_index_lookup_ms_max,
"snapshot_manifest_blob_load_ms_total": ready_batch_metrics.snapshot_manifest_blob_load_ms_total,
"snapshot_manifest_blob_load_ms_max": ready_batch_metrics.snapshot_manifest_blob_load_ms_max,
"snapshot_manifest_decode_ms_total": ready_batch_metrics.snapshot_manifest_decode_ms_total,
"snapshot_manifest_decode_ms_max": ready_batch_metrics.snapshot_manifest_decode_ms_max,
"snapshot_replay_guard_ms_total": ready_batch_metrics.snapshot_replay_guard_ms_total,
"snapshot_replay_guard_ms_max": ready_batch_metrics.snapshot_replay_guard_ms_max,
"replay_meta_hit_count": ready_batch_metrics.replay_meta_hit_count,
"replay_meta_miss_count": ready_batch_metrics.replay_meta_miss_count,
"snapshot_manifest_entries_ms_total": ready_batch_metrics.snapshot_manifest_entries_ms_total,
"snapshot_manifest_entries_ms_max": ready_batch_metrics.snapshot_manifest_entries_ms_max,
"snapshot_pack_files_ms_total": ready_batch_metrics.snapshot_pack_files_ms_total,
"snapshot_pack_files_ms_max": ready_batch_metrics.snapshot_pack_files_ms_max,
"snapshot_pack_files_index_lookup_ms_total": ready_batch_metrics.snapshot_pack_files_index_lookup_ms_total,
"snapshot_pack_files_index_lookup_ms_max": ready_batch_metrics.snapshot_pack_files_index_lookup_ms_max,
"snapshot_pack_files_blob_load_ms_total": ready_batch_metrics.snapshot_pack_files_blob_load_ms_total,
"snapshot_pack_files_blob_load_ms_max": ready_batch_metrics.snapshot_pack_files_blob_load_ms_max,
"snapshot_ee_path_validate_ms_total": ready_batch_metrics.snapshot_ee_path_validate_ms_total,
"snapshot_ee_path_validate_ms_max": ready_batch_metrics.snapshot_ee_path_validate_ms_max,
"snapshot_manifest_file_count_total": ready_batch_metrics.snapshot_manifest_file_count_total,
"snapshot_manifest_file_count_max": ready_batch_metrics.snapshot_manifest_file_count_max,
"child_discovery_ms_total": ready_batch_metrics.child_discovery_ms_total,
"child_discovery_ms_max": ready_batch_metrics.child_discovery_ms_max,
"batch_duration_ms": ready_batch_metrics.total_ms,
}),
);
}
flush_pending_roa_dispatch(runner, &mut pending_roa_dispatch)?;
drain_object_results(
let dispatch_metrics = flush_pending_roa_dispatch(
runner,
&mut pending_roa_dispatch,
&mut inflight_publication_points,
)?;
if dispatch_metrics.attempted > 0 || dispatch_metrics.queue_full {
crate::progress_log::emit(
"phase2_roa_dispatch_batch",
serde_json::json!({
"attempted": dispatch_metrics.attempted,
"submitted": dispatch_metrics.submitted,
"queue_full": dispatch_metrics.queue_full,
"pending_remaining": dispatch_metrics.pending_remaining,
"duration_ms": dispatch_metrics.duration_ms,
"inflight_publication_points": inflight_publication_points.len(),
}),
);
}
let drain_metrics = drain_object_results(
runner,
&mut inflight_publication_points,
&mut finished,
config.compact_audit,
)?;
if drain_metrics.results_drained > 0 {
crate::progress_log::emit(
"phase2_object_results_drain",
serde_json::json!({
"results_drained": drain_metrics.results_drained,
"publication_points_finalized": drain_metrics.publication_points_finalized,
"reduce_ms_total": drain_metrics.reduce_ms_total,
"reduce_ms_max": drain_metrics.reduce_ms_max,
"finalize_ms_total": drain_metrics.finalize_ms_total,
"finalize_ms_max": drain_metrics.finalize_ms_max,
"worker_ms_total": drain_metrics.worker_ms_total,
"worker_ms_max": drain_metrics.worker_ms_max,
"queue_wait_ms_total": drain_metrics.queue_wait_ms_total,
"queue_wait_ms_max": drain_metrics.queue_wait_ms_max,
"duration_ms": drain_metrics.duration_ms,
"pending_roa_dispatch_len": pending_roa_dispatch.len(),
"inflight_publication_points": inflight_publication_points.len(),
}),
);
}
let repo_poll_timeout = event_poll_timeout(
&ca_queue,
&ready_queue,
@ -177,12 +525,25 @@ pub fn run_tree_parallel_phase2_audit_multi_root(
instances_started,
config,
);
drain_repo_events(
let repo_metrics = drain_repo_events(
repo_runtime.as_ref(),
&mut ca_waiting_repo_by_identity,
&mut ready_queue,
repo_poll_timeout,
)?;
if repo_metrics.event_count > 0 {
crate::progress_log::emit(
"phase2_repo_events_drain",
serde_json::json!({
"event_count": repo_metrics.event_count,
"completions": repo_metrics.completions,
"ready_enqueued": repo_metrics.ready_enqueued,
"duration_ms": repo_metrics.duration_ms,
"ready_queue_len": ready_queue.len(),
"ca_waiting_repo_identities": ca_waiting_repo_by_identity.len(),
}),
);
}
if is_complete(
&ca_queue,
@ -219,21 +580,48 @@ fn stage_ready_publication_point(
finished: &mut Vec<FinishedPublicationPoint>,
ready: ReadyCaInstance,
compact_audit: bool,
) {
) -> ReadyStageMetrics {
let publication_point_started = Instant::now();
let mut metrics = ReadyStageMetrics {
ready_count: 1,
manifest_rsync_uri: Some(ready.node.handle.manifest_rsync_uri.clone()),
publication_point_rsync_uri: Some(ready.node.handle.publication_point_rsync_uri.clone()),
..ReadyStageMetrics::default()
};
let mut warnings = ready.repo_outcome.warnings.clone();
let repo_outcome = ready.repo_outcome.clone();
let stage_fresh_started = Instant::now();
let stage = runner.stage_fresh_publication_point_after_repo_ready(
&ready.node.handle,
repo_outcome.repo_sync_ok,
repo_outcome.repo_sync_err.as_deref(),
);
metrics.stage_fresh_ms = elapsed_ms(stage_fresh_started);
let fresh_stage = match stage {
Ok(stage) => stage,
Err(_) => {
Err(err) => {
if metrics.stage_fresh_ms >= crate::progress_log::stage_fresh_slow_threshold_ms() {
crate::progress_log::emit(
"phase2_stage_fresh_slow",
serde_json::json!({
"manifest_rsync_uri": ready.node.handle.manifest_rsync_uri.as_str(),
"publication_point_rsync_uri": ready.node.handle.publication_point_rsync_uri.as_str(),
"status": "error",
"error": err.error.to_string(),
"stage_fresh_ms": metrics.stage_fresh_ms,
"snapshot_prepare_ms": err.snapshot_prepare_ms,
"repo_sync_source": repo_outcome.repo_sync_source.as_deref(),
"repo_sync_phase": repo_outcome.repo_sync_phase.as_deref(),
"repo_sync_duration_ms": repo_outcome.repo_sync_duration_ms,
}),
);
}
metrics.fallback_count = 1;
let fallback = runner.run_publication_point(&ready.node.handle);
if let Ok(result) = fallback.as_ref() {
metrics.discovered_children = result.discovered_children.len();
let child_enqueue_started = Instant::now();
enqueue_discovered_children(
runner,
next_id,
@ -241,16 +629,73 @@ fn stage_ready_publication_point(
&ready.node,
result.discovered_children.clone(),
);
metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started);
}
finished.push(FinishedPublicationPoint {
node: ready.node,
result: compact_phase2_finished_result_result(fallback, compact_audit),
});
return;
metrics.total_ms = elapsed_ms(publication_point_started);
return metrics;
}
};
metrics.snapshot_prepare_ms = fresh_stage.snapshot_prepare_ms;
metrics.snapshot_current_index_lock_ms =
fresh_stage.snapshot_prepare_timing.current_index_lock_ms;
metrics.snapshot_manifest_load_ms = fresh_stage.snapshot_prepare_timing.manifest_load_ms;
metrics.snapshot_manifest_index_lookup_ms =
fresh_stage.snapshot_prepare_timing.manifest_index_lookup_ms;
metrics.snapshot_manifest_blob_load_ms =
fresh_stage.snapshot_prepare_timing.manifest_blob_load_ms;
metrics.snapshot_manifest_decode_ms = fresh_stage.snapshot_prepare_timing.manifest_decode_ms;
metrics.snapshot_replay_guard_ms = fresh_stage.snapshot_prepare_timing.replay_guard_ms;
metrics.replay_meta_hit_count = fresh_stage.snapshot_prepare_timing.replay_meta_hit as usize;
metrics.replay_meta_miss_count = fresh_stage.snapshot_prepare_timing.replay_meta_miss as usize;
metrics.snapshot_manifest_entries_ms = fresh_stage.snapshot_prepare_timing.manifest_entries_ms;
metrics.snapshot_pack_files_ms = fresh_stage.snapshot_prepare_timing.pack_files_ms;
metrics.snapshot_pack_files_index_lookup_ms = fresh_stage
.snapshot_prepare_timing
.pack_files_index_lookup_ms;
metrics.snapshot_pack_files_blob_load_ms =
fresh_stage.snapshot_prepare_timing.pack_files_blob_load_ms;
metrics.snapshot_ee_path_validate_ms = fresh_stage.snapshot_prepare_timing.ee_path_validate_ms;
metrics.snapshot_manifest_file_count = fresh_stage.snapshot_prepare_timing.manifest_file_count;
metrics.child_discovery_ms = fresh_stage.child_discovery_ms;
if metrics.stage_fresh_ms >= crate::progress_log::stage_fresh_slow_threshold_ms() {
crate::progress_log::emit(
"phase2_stage_fresh_slow",
serde_json::json!({
"manifest_rsync_uri": ready.node.handle.manifest_rsync_uri.as_str(),
"publication_point_rsync_uri": ready.node.handle.publication_point_rsync_uri.as_str(),
"status": "ok",
"stage_fresh_ms": metrics.stage_fresh_ms,
"snapshot_prepare_ms": fresh_stage.snapshot_prepare_ms,
"snapshot_current_index_lock_ms": fresh_stage.snapshot_prepare_timing.current_index_lock_ms,
"snapshot_manifest_load_ms": fresh_stage.snapshot_prepare_timing.manifest_load_ms,
"snapshot_manifest_index_lookup_ms": fresh_stage.snapshot_prepare_timing.manifest_index_lookup_ms,
"snapshot_manifest_blob_load_ms": fresh_stage.snapshot_prepare_timing.manifest_blob_load_ms,
"snapshot_manifest_decode_ms": fresh_stage.snapshot_prepare_timing.manifest_decode_ms,
"snapshot_replay_guard_ms": fresh_stage.snapshot_prepare_timing.replay_guard_ms,
"replay_meta_hit": fresh_stage.snapshot_prepare_timing.replay_meta_hit,
"replay_meta_miss": fresh_stage.snapshot_prepare_timing.replay_meta_miss,
"snapshot_manifest_entries_ms": fresh_stage.snapshot_prepare_timing.manifest_entries_ms,
"snapshot_pack_files_ms": fresh_stage.snapshot_prepare_timing.pack_files_ms,
"snapshot_pack_files_index_lookup_ms": fresh_stage.snapshot_prepare_timing.pack_files_index_lookup_ms,
"snapshot_pack_files_blob_load_ms": fresh_stage.snapshot_prepare_timing.pack_files_blob_load_ms,
"snapshot_ee_path_validate_ms": fresh_stage.snapshot_prepare_timing.ee_path_validate_ms,
"snapshot_manifest_file_count": fresh_stage.snapshot_prepare_timing.manifest_file_count,
"child_discovery_ms": fresh_stage.child_discovery_ms,
"child_count": fresh_stage.discovered_children.len(),
"repo_sync_source": repo_outcome.repo_sync_source.as_deref(),
"repo_sync_phase": repo_outcome.repo_sync_phase.as_deref(),
"repo_sync_duration_ms": repo_outcome.repo_sync_duration_ms,
}),
);
}
warnings.extend(fresh_stage.warnings.clone());
metrics.discovered_children = fresh_stage.discovered_children.len();
let child_enqueue_started = Instant::now();
enqueue_discovered_children(
runner,
next_id,
@ -258,7 +703,9 @@ fn stage_ready_publication_point(
&ready.node,
fresh_stage.discovered_children.clone(),
);
metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started);
let prepare_started = Instant::now();
match prepare_publication_point_for_parallel_roa(
ready.node.id,
&fresh_stage.fresh_point,
@ -270,6 +717,10 @@ fn stage_ready_publication_point(
runner.persist_vcir,
) {
ParallelObjectsPrepare::Complete(mut objects) => {
metrics.prepare_ms = elapsed_ms(prepare_started);
metrics.complete_count = 1;
metrics.roa_tasks = objects.stats.roa_total;
metrics.aspa_objects = objects.stats.aspa_total;
objects
.router_keys
.extend(fresh_stage.discovered_router_keys.clone());
@ -291,12 +742,20 @@ fn stage_ready_publication_point(
);
}
ParallelObjectsPrepare::Staged(objects_stage) => {
metrics.prepare_ms = elapsed_ms(prepare_started);
metrics.staged_count = 1;
metrics.locked_files = objects_stage.locked_file_count();
metrics.aspa_objects = objects_stage.aspa_task_count();
let build_tasks_started = Instant::now();
let tasks = objects_stage.build_roa_tasks();
metrics.build_roa_tasks_ms = elapsed_ms(build_tasks_started);
let task_count = objects_stage.roa_task_count();
metrics.roa_tasks = task_count;
for task in tasks {
pending_roa_dispatch.push_back(task);
}
if task_count == 0 {
metrics.zero_task_count = 1;
match reduce_parallel_roa_stage(objects_stage, Vec::new(), runner.timing.as_ref()) {
Ok(mut objects) => {
objects
@ -336,12 +795,23 @@ fn stage_ready_publication_point(
started_at: publication_point_started,
objects_started_at: Instant::now(),
task_count,
tasks_submitted: 0,
first_task_submitted_at: None,
last_task_submitted_at: None,
first_result_at: None,
last_result_at: None,
worker_ms_total: 0,
worker_ms_max: 0,
queue_wait_ms_total: 0,
queue_wait_ms_max: 0,
results: Vec::with_capacity(task_count),
},
);
}
}
}
metrics.total_ms = elapsed_ms(publication_point_started);
metrics
}
fn enqueue_discovered_children(
@ -410,15 +880,32 @@ fn finalize_ready_objects(
fn flush_pending_roa_dispatch(
runner: &Rpkiv1PublicationPointRunner<'_>,
pending_roa_dispatch: &mut VecDeque<OwnedRoaTask>,
) -> Result<(), TreeRunError> {
inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
) -> Result<RoaDispatchMetrics, TreeRunError> {
let started = Instant::now();
let mut metrics = RoaDispatchMetrics::default();
let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else {
return Ok(());
return Ok(metrics);
};
while let Some(task) = pending_roa_dispatch.pop_front() {
while let Some(mut task) = pending_roa_dispatch.pop_front() {
metrics.attempted += 1;
let pp_id = task.publication_point_id;
task.submitted_at = Some(Instant::now());
match pool.try_submit_round_robin(task) {
Ok(_) => {}
Ok(_) => {
metrics.submitted += 1;
if let Some(state) = inflight_publication_points.get_mut(&pp_id) {
let now = Instant::now();
state.tasks_submitted += 1;
if state.first_task_submitted_at.is_none() {
state.first_task_submitted_at = Some(now);
}
state.last_task_submitted_at = Some(now);
}
}
Err(ObjectWorkerSubmitError::QueueFull { task, .. }) => {
pending_roa_dispatch.push_front(task);
metrics.queue_full = true;
break;
}
Err(ObjectWorkerSubmitError::Disconnected { .. }) => {
@ -428,7 +915,9 @@ fn flush_pending_roa_dispatch(
}
}
}
Ok(())
metrics.pending_remaining = pending_roa_dispatch.len();
metrics.duration_ms = elapsed_ms(started);
Ok(metrics)
}
fn drain_object_results(
@ -436,9 +925,11 @@ fn drain_object_results(
inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
finished: &mut Vec<FinishedPublicationPoint>,
compact_audit: bool,
) -> Result<(), TreeRunError> {
) -> Result<ObjectDrainMetrics, TreeRunError> {
let started = Instant::now();
let mut metrics = ObjectDrainMetrics::default();
let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else {
return Ok(());
return Ok(metrics);
};
loop {
let Some(result) = pool
@ -447,8 +938,23 @@ fn drain_object_results(
else {
break;
};
metrics.results_drained += 1;
let pp_id = result.publication_point_id;
let _worker_index = result.worker_index;
metrics.worker_ms_total += result.worker_ms;
metrics.worker_ms_max = metrics.worker_ms_max.max(result.worker_ms);
metrics.queue_wait_ms_total += result.queue_wait_ms;
metrics.queue_wait_ms_max = metrics.queue_wait_ms_max.max(result.queue_wait_ms);
let should_finalize = if let Some(state) = inflight_publication_points.get_mut(&pp_id) {
let now = Instant::now();
if state.first_result_at.is_none() {
state.first_result_at = Some(now);
}
state.last_result_at = Some(now);
state.worker_ms_total += result.worker_ms;
state.worker_ms_max = state.worker_ms_max.max(result.worker_ms);
state.queue_wait_ms_total += result.queue_wait_ms;
state.queue_wait_ms_max = state.queue_wait_ms_max.max(result.queue_wait_ms);
state.results.push(result);
state.results.len() == state.task_count
} else {
@ -459,12 +965,19 @@ fn drain_object_results(
.remove(&pp_id)
.expect("inflight publication point must exist");
let objects_processing_ms = state.objects_started_at.elapsed().as_millis() as u64;
match reduce_parallel_roa_stage(
let reduce_started = Instant::now();
let reduce_result = reduce_parallel_roa_stage(
state.objects_stage,
state.results,
runner.timing.as_ref(),
) {
);
let reduce_ms = elapsed_ms(reduce_started);
metrics.reduce_ms_total += reduce_ms;
metrics.reduce_ms_max = metrics.reduce_ms_max.max(reduce_ms);
metrics.publication_points_finalized += 1;
match reduce_result {
Ok(mut objects) => {
let finalize_started = Instant::now();
objects
.router_keys
.extend(state.fresh_stage.discovered_router_keys.clone());
@ -488,12 +1001,37 @@ fn drain_object_results(
state.repo_outcome.repo_sync_err.as_deref(),
)
.map(|out| out.result);
let finalize_ms = elapsed_ms(finalize_started);
metrics.finalize_ms_total += finalize_ms;
metrics.finalize_ms_max = metrics.finalize_ms_max.max(finalize_ms);
crate::progress_log::emit(
"phase2_publication_point_reduced",
serde_json::json!({
"manifest_rsync_uri": state.node.handle.manifest_rsync_uri,
"publication_point_rsync_uri": state.node.handle.publication_point_rsync_uri,
"manifest_rsync_uri": state.node.handle.manifest_rsync_uri.as_str(),
"publication_point_rsync_uri": state.node.handle.publication_point_rsync_uri.as_str(),
"objects_processing_ms": objects_processing_ms,
"task_count": state.task_count,
"tasks_submitted": state.tasks_submitted,
"first_task_submitted_ms": state.first_task_submitted_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64),
"last_task_submitted_ms": state.last_task_submitted_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64),
"task_submit_span_ms": match (state.first_task_submitted_at, state.last_task_submitted_at) {
(Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64),
_ => None,
},
"first_result_ms": state.first_result_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64),
"last_result_ms": state.last_result_at.map(|t| t.saturating_duration_since(state.objects_started_at).as_millis() as u64),
"result_span_ms": match (state.first_result_at, state.last_result_at) {
(Some(first), Some(last)) => Some(last.saturating_duration_since(first).as_millis() as u64),
_ => None,
},
"worker_ms_total": state.worker_ms_total,
"worker_ms_max": state.worker_ms_max,
"worker_ms_avg": if state.task_count > 0 { state.worker_ms_total / state.task_count as u64 } else { 0 },
"queue_wait_ms_total": state.queue_wait_ms_total,
"queue_wait_ms_max": state.queue_wait_ms_max,
"queue_wait_ms_avg": if state.task_count > 0 { state.queue_wait_ms_total / state.task_count as u64 } else { 0 },
"reduce_ms": reduce_ms,
"finalize_ms": finalize_ms,
"total_duration_ms": state.started_at.elapsed().as_millis() as u64,
}),
);
@ -509,7 +1047,8 @@ fn drain_object_results(
}
}
}
Ok(())
metrics.duration_ms = elapsed_ms(started);
Ok(metrics)
}
fn drain_repo_events(
@ -517,11 +1056,15 @@ fn drain_repo_events(
ca_waiting_repo_by_identity: &mut HashMap<RepoIdentity, Vec<QueuedCaInstance>>,
ready_queue: &mut VecDeque<ReadyCaInstance>,
timeout: Duration,
) -> Result<(), TreeRunError> {
) -> Result<RepoDrainMetrics, TreeRunError> {
let started = Instant::now();
let mut metrics = RepoDrainMetrics::default();
if let Some(event) = repo_runtime
.recv_repo_result_timeout(timeout)
.map_err(TreeRunError::Runner)?
{
metrics.event_count += 1;
metrics.completions += event.completions.len();
for completion in event.completions {
let mut outcome = completion.outcome;
if completion.identity != event.transport_identity {
@ -530,6 +1073,7 @@ fn drain_repo_events(
outcome.repo_sync_duration_ms = 0;
}
if let Some(waiters) = ca_waiting_repo_by_identity.remove(&completion.identity) {
metrics.ready_enqueued += waiters.len();
for node in waiters {
ready_queue.push_back(ReadyCaInstance {
node,
@ -539,7 +1083,8 @@ fn drain_repo_events(
}
}
}
Ok(())
metrics.duration_ms = elapsed_ms(started);
Ok(metrics)
}
fn event_poll_timeout(