From d4d227ce60b68cdc371e5d44f8ce25550e5f32d1 Mon Sep 17 00:00:00 2001 From: yuyr Date: Thu, 18 Jun 2026 16:33:37 +0800 Subject: [PATCH] =?UTF-8?q?20260618=20=E5=A2=9E=E5=8A=A0PP=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E5=91=BD=E4=B8=AD=E8=B7=AF=E5=BE=84=E6=80=A7=E8=83=BD?= =?UTF-8?q?=E5=9F=8B=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/soak/run_soak.sh | 4 + src/progress_log.rs | 14 +++ src/validation/tree_parallel.rs | 75 +++++++++++ src/validation/tree_runner.rs | 185 +++++++++++++++++++++++++--- src/validation/tree_runner/tests.rs | 27 ++++ 5 files changed, 286 insertions(+), 19 deletions(-) diff --git a/scripts/soak/run_soak.sh b/scripts/soak/run_soak.sh index 10b0cde..aa6f065 100755 --- a/scripts/soak/run_soak.sh +++ b/scripts/soak/run_soak.sh @@ -26,6 +26,8 @@ RPKI_PROGRESS_LOG="${RPKI_PROGRESS_LOG:-1}" RPKI_PROGRESS_SLOW_SECS="${RPKI_PROGRESS_SLOW_SECS:-10}" RPKI_PROGRESS_STAGE_FRESH_SLOW_MS="${RPKI_PROGRESS_STAGE_FRESH_SLOW_MS:-1000}" RPKI_PROGRESS_PP_CONTROL_SLOW_MS="${RPKI_PROGRESS_PP_CONTROL_SLOW_MS:-100}" +RPKI_PROGRESS_PP_CACHE_SLOW_MS="${RPKI_PROGRESS_PP_CACHE_SLOW_MS:-50}" +RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS="${RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS:-1000}" DISABLE_COMPETING_RPS="${DISABLE_COMPETING_RPS:-1}" RPKI_EXTRA_ARGS="${RPKI_EXTRA_ARGS:-}" RPKI_ANALYZE="${RPKI_ANALYZE:-0}" @@ -577,6 +579,8 @@ run_one_round() { RPKI_PROGRESS_SLOW_SECS="$RPKI_PROGRESS_SLOW_SECS" \ RPKI_PROGRESS_STAGE_FRESH_SLOW_MS="$RPKI_PROGRESS_STAGE_FRESH_SLOW_MS" \ RPKI_PROGRESS_PP_CONTROL_SLOW_MS="$RPKI_PROGRESS_PP_CONTROL_SLOW_MS" \ + RPKI_PROGRESS_PP_CACHE_SLOW_MS="$RPKI_PROGRESS_PP_CACHE_SLOW_MS" \ + RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS="$RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS" \ "$RPKI_DAEMON_BIN" "${daemon_args[@]}" -- "${CHILD_ARGS[@]}" \ > "$run_dir/daemon-stdout.log" 2> "$run_dir/daemon-stderr.log" daemon_exit_code=$? diff --git a/src/progress_log.rs b/src/progress_log.rs index e8323b6..3612082 100644 --- a/src/progress_log.rs +++ b/src/progress_log.rs @@ -30,6 +30,20 @@ pub fn pp_control_slow_threshold_ms() -> u64 { .unwrap_or(100) } +pub fn pp_cache_slow_threshold_ms() -> u64 { + std::env::var("RPKI_PROGRESS_PP_CACHE_SLOW_MS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(50) +} + +pub fn control_loop_slow_threshold_ms() -> u64 { + std::env::var("RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(1_000) +} + pub fn emit(kind: &str, payload: Value) { if !progress_enabled() { return; diff --git a/src/validation/tree_parallel.rs b/src/validation/tree_parallel.rs index 1f301a5..b1d849d 100644 --- a/src/validation/tree_parallel.rs +++ b/src/validation/tree_parallel.rs @@ -409,6 +409,55 @@ fn elapsed_ms(started: Instant) -> u64 { started.elapsed().as_millis() as u64 } +fn emit_control_loop_slow( + duration_ms: u64, + repo_poll_timeout: Duration, + repo_metrics: &RepoDrainMetrics, + ready_batch_metrics: &ReadyStageBatchMetrics, + ca_queue_len: usize, + ready_queue_len: usize, + ca_waiting_repo_identities: usize, + pending_roa_dispatch_len: usize, + inflight_publication_points_len: usize, + pending_finalization_len: usize, + finalize_inflight: usize, +) { + let threshold_ms = crate::progress_log::control_loop_slow_threshold_ms(); + if duration_ms < threshold_ms { + return; + } + crate::progress_log::emit( + "phase2_control_loop_slow", + serde_json::json!({ + "duration_ms": duration_ms, + "slow_threshold_ms": threshold_ms, + "repo_poll_timeout_ms": repo_poll_timeout.as_millis() as u64, + "repo_event_count": repo_metrics.event_count, + "repo_completions": repo_metrics.completions, + "repo_ready_enqueued": repo_metrics.ready_enqueued, + "repo_drain_duration_ms": repo_metrics.duration_ms, + "ready_count": ready_batch_metrics.ready_count, + "ready_batch_duration_ms": ready_batch_metrics.total_ms, + "ready_batch_stage_fresh_ms_total": ready_batch_metrics.stage_fresh_ms_total, + "ready_batch_stage_fresh_ms_max": ready_batch_metrics.stage_fresh_ms_max, + "ready_batch_stage_fresh_ms_max_manifest_rsync_uri": ready_batch_metrics.stage_fresh_ms_max_manifest_rsync_uri, + "ready_batch_child_discovery_ms_total": ready_batch_metrics.child_discovery_ms_total, + "ready_batch_child_discovery_ms_max": ready_batch_metrics.child_discovery_ms_max, + "ready_batch_prepare_ms_total": ready_batch_metrics.prepare_ms_total, + "ready_batch_prepare_ms_max": ready_batch_metrics.prepare_ms_max, + "ready_batch_direct_finalize_ms_total": ready_batch_metrics.direct_finalize_ms_total, + "ready_batch_direct_finalize_ms_max": ready_batch_metrics.direct_finalize_ms_max, + "ca_queue_len": ca_queue_len, + "ready_queue_len": ready_queue_len, + "ca_waiting_repo_identities": ca_waiting_repo_identities, + "pending_roa_dispatch_len": pending_roa_dispatch_len, + "inflight_publication_points_len": inflight_publication_points_len, + "pending_finalization_len": pending_finalization_len, + "finalize_inflight": finalize_inflight, + }), + ); +} + fn compact_phase2_finished_result( mut result: PublicationPointRunResult, compact_audit: bool, @@ -518,6 +567,7 @@ pub fn run_tree_parallel_phase2_audit_multi_root( let run_result: Result<(), TreeRunError> = (|| { loop { + let control_loop_started = Instant::now(); drain_finalize_results_with_progress( &finalize_result_rx, &mut finished, @@ -711,6 +761,20 @@ pub fn run_tree_parallel_phase2_audit_multi_root( inflight_publication_points.len(), )?; + emit_control_loop_slow( + elapsed_ms(control_loop_started), + repo_poll_timeout, + &repo_metrics, + &ready_batch_metrics, + ca_queue.len(), + ready_queue.len(), + ca_waiting_repo_by_identity.len(), + pending_roa_dispatch.len(), + inflight_publication_points.len(), + pending_finalization.len(), + finalize_inflight, + ); + if is_complete( &ca_queue, &ready_queue, @@ -933,6 +997,17 @@ fn stage_ready_publication_point( result: compact_phase2_finished_result(result, compact_audit), }); metrics.total_ms = elapsed_ms(publication_point_started); + emit_ready_publication_point_control_slow( + metrics.manifest_rsync_uri.as_deref().unwrap_or_default(), + metrics + .publication_point_rsync_uri + .as_deref() + .unwrap_or_default(), + &repo_outcome, + &metrics, + "publication_point_cache", + false, + ); return metrics; } diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index 0c1eb94..a3c4338 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -242,11 +242,15 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { { Ok(Some(projection)) => projection, Ok(None) => { - self.record_publication_point_cache_miss("missing_projection"); + self.finish_publication_point_cache_miss(ca, "missing_projection", lookup_started); return None; } Err(e) => { - self.record_publication_point_cache_miss("projection_load_error"); + self.finish_publication_point_cache_miss( + ca, + "projection_load_error", + lookup_started, + ); crate::progress_log::emit( "publication_point_cache_lookup_error", serde_json::json!({ @@ -261,37 +265,37 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { let current_identity = match self.current_publication_point_cache_identity(ca) { Ok(identity) => identity, Err(reason) => { - self.record_publication_point_cache_miss(reason.as_str()); + self.finish_publication_point_cache_miss(ca, reason.as_str(), lookup_started); return None; } }; if projection.ca_cert_uri != ca.ca_certificate_rsync_uri { - self.record_publication_point_cache_miss("ca_uri_mismatch"); + self.finish_publication_point_cache_miss(ca, "ca_uri_mismatch", lookup_started); return None; } if projection.ca_cert_sha256 != current_identity.ca_cert_sha256 { - self.record_publication_point_cache_miss("ca_hash_mismatch"); + self.finish_publication_point_cache_miss(ca, "ca_hash_mismatch", lookup_started); return None; } if projection.manifest_sha256 != current_identity.manifest_sha256 { - self.record_publication_point_cache_miss("manifest_hash_mismatch"); + self.finish_publication_point_cache_miss(ca, "manifest_hash_mismatch", lookup_started); return None; } if projection.tal_id != ca.tal_id { - self.record_publication_point_cache_miss("tal_mismatch"); + self.finish_publication_point_cache_miss(ca, "tal_mismatch", lookup_started); return None; } if projection.ta_context_digest != current_identity.ta_context_digest { - self.record_publication_point_cache_miss("ta_context_mismatch"); + self.finish_publication_point_cache_miss(ca, "ta_context_mismatch", lookup_started); return None; } if projection.parent_context_digest != current_identity.parent_context_digest { - self.record_publication_point_cache_miss("parent_context_mismatch"); + self.finish_publication_point_cache_miss(ca, "parent_context_mismatch", lookup_started); return None; } if projection.validation_policy_fingerprint != current_identity.policy_fingerprint { - self.record_publication_point_cache_miss("policy_mismatch"); + self.finish_publication_point_cache_miss(ca, "policy_mismatch", lookup_started); return None; } @@ -299,36 +303,47 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { match parse_snapshot_time_value(&projection.instance_effective_not_before) { Ok(value) => value, Err(_) => { - self.record_publication_point_cache_miss("instance_not_before_invalid"); + self.finish_publication_point_cache_miss( + ca, + "instance_not_before_invalid", + lookup_started, + ); return None; } }; let instance_until = match parse_snapshot_time_value(&projection.instance_effective_until) { Ok(value) => value, Err(_) => { - self.record_publication_point_cache_miss("instance_until_invalid"); + self.finish_publication_point_cache_miss( + ca, + "instance_until_invalid", + lookup_started, + ); return None; } }; if self.validation_time < instance_not_before || self.validation_time >= instance_until { - self.record_publication_point_cache_miss("instance_time_gate_miss"); + self.finish_publication_point_cache_miss(ca, "instance_time_gate_miss", lookup_started); return None; } if let Err(reason) = publication_point_cache_projection_items_valid(&projection, self.validation_time) { - self.record_publication_point_cache_miss(reason); + self.finish_publication_point_cache_miss(ca, reason, lookup_started); return None; } if let Some(timing) = self.timing.as_ref() { + let lookup_nanos = lookup_started + .elapsed() + .as_nanos() + .min(u128::from(u64::MAX)) as u64; timing.record_count("publication_point_cache_theoretical_hits", 1); + timing.record_phase_nanos("publication_point_cache_lookup_total", lookup_nanos); + timing.record_phase_nanos("publication_point_cache_lookup_hit_total", lookup_nanos); timing.record_phase_nanos( - "publication_point_cache_lookup_total", - lookup_started - .elapsed() - .as_nanos() - .min(u128::from(u64::MAX)) as u64, + "publication_point_cache_lookup_duration_total", + lookup_nanos, ); } if self.publication_point_cache_observe_only { @@ -364,6 +379,40 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { } } + fn finish_publication_point_cache_miss( + &self, + ca: &CaInstanceHandle, + reason: &str, + lookup_started: std::time::Instant, + ) { + self.record_publication_point_cache_miss(reason); + let lookup_nanos = lookup_started + .elapsed() + .as_nanos() + .min(u128::from(u64::MAX)) as u64; + if let Some(timing) = self.timing.as_ref() { + timing.record_phase_nanos("publication_point_cache_lookup_miss_total", lookup_nanos); + timing.record_phase_nanos( + "publication_point_cache_lookup_duration_total", + lookup_nanos, + ); + } + let elapsed_ms = lookup_nanos / 1_000_000; + if elapsed_ms >= crate::progress_log::pp_cache_slow_threshold_ms() { + crate::progress_log::emit( + "publication_point_cache_miss_slow", + serde_json::json!({ + "manifest_rsync_uri": ca.manifest_rsync_uri.as_str(), + "publication_point_rsync_uri": ca.publication_point_rsync_uri.as_str(), + "ca_certificate_rsync_uri": ca.ca_certificate_rsync_uri.as_deref(), + "reason": reason, + "elapsed_ms": elapsed_ms, + "slow_threshold_ms": crate::progress_log::pp_cache_slow_threshold_ms(), + }), + ); + } + } + fn record_publication_point_cache_miss(&self, reason: &str) { if let Some(timing) = self.timing.as_ref() { timing.record_count("publication_point_cache_miss_total", 1); @@ -371,21 +420,40 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { "missing_projection" => { timing.record_count("publication_point_cache_miss_missing_projection", 1) } + "projection_load_error" => { + timing.record_count("publication_point_cache_miss_projection_load_error", 1) + } "current_manifest_missing" => { timing.record_count("publication_point_cache_miss_current_manifest_missing", 1) } + "ca_uri_mismatch" => { + timing.record_count("publication_point_cache_miss_ca_uri_mismatch", 1) + } "ca_hash_mismatch" => { timing.record_count("publication_point_cache_miss_ca_hash_mismatch", 1) } "manifest_hash_mismatch" => { timing.record_count("publication_point_cache_miss_manifest_hash_mismatch", 1) } + "tal_mismatch" => { + timing.record_count("publication_point_cache_miss_tal_mismatch", 1) + } + "ta_context_mismatch" => { + timing.record_count("publication_point_cache_miss_ta_context_mismatch", 1) + } "parent_context_mismatch" => { timing.record_count("publication_point_cache_miss_parent_context_mismatch", 1) } "policy_mismatch" => { timing.record_count("publication_point_cache_miss_policy_mismatch", 1) } + "instance_not_before_invalid" => timing.record_count( + "publication_point_cache_miss_instance_not_before_invalid", + 1, + ), + "instance_until_invalid" => { + timing.record_count("publication_point_cache_miss_instance_until_invalid", 1) + } "instance_time_gate_miss" => { timing.record_count("publication_point_cache_miss_instance_time_gate", 1) } @@ -395,6 +463,9 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { "child_time_gate_miss" => { timing.record_count("publication_point_cache_miss_child_time_gate", 1) } + "reuse_build_error" => { + timing.record_count("publication_point_cache_miss_reuse_build_error", 1) + } _ => timing.record_count("publication_point_cache_miss_other", 1), } } @@ -447,13 +518,26 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { repo_sync_err: Option<&str>, warnings: &[Warning], ) -> Result { + let build_started = std::time::Instant::now(); let mut warnings = warnings.to_vec(); let output_reuse_count = projection.outputs.len() as u64; let child_reuse_count = projection.children.len() as u64; + let related_object_reuse_count = projection.related_objects.len() as u64; + let to_vcir_started = std::time::Instant::now(); let mut vcir = projection.to_vcir_for_reuse(self.validation_time); + let to_vcir_ms = self.record_publication_point_cache_phase_ms( + "publication_point_cache_to_vcir_total", + to_vcir_started, + ); vcir.parent_manifest_rsync_uri = ca.parent_manifest_rsync_uri.clone(); + let build_objects_started = std::time::Instant::now(); let mut objects = build_objects_output_from_vcir(&vcir, self.validation_time, &mut warnings); + let build_objects_ms = self.record_publication_point_cache_phase_ms( + "publication_point_cache_build_objects_total", + build_objects_started, + ); + let restore_children_started = std::time::Instant::now(); let (discovered_children, child_audits) = restore_children_from_publication_point_cache( self.store, ca, @@ -461,8 +545,18 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { self.validation_time, &mut warnings, ); + let restore_children_ms = self.record_publication_point_cache_phase_ms( + "publication_point_cache_restore_children_total", + restore_children_started, + ); let ccr_projection = projection.ccr_manifest_projection.clone(); + let ccr_append_started = std::time::Instant::now(); self.append_ccr_manifest_projection(&ccr_projection)?; + let ccr_append_ms = self.record_publication_point_cache_phase_ms( + "publication_point_cache_ccr_append_total", + ccr_append_started, + ); + let audit_build_started = std::time::Instant::now(); let audit = build_publication_point_audit_from_vcir( ca, PublicationPointSource::PublicationPointCache, @@ -477,11 +571,52 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { &child_audits, &[], ); + let audit_build_ms = self.record_publication_point_cache_phase_ms( + "publication_point_cache_audit_build_total", + audit_build_started, + ); + let audit_object_count = audit.objects.len() as u64; let cir_cached_objects = audit.objects.clone(); objects.local_outputs_cache.clear(); + let total_ms = self.record_publication_point_cache_phase_ms( + "publication_point_cache_reuse_build_total", + build_started, + ); if let Some(timing) = self.timing.as_ref() { timing.record_count("publication_point_cache_outputs_reused", output_reuse_count); timing.record_count("publication_point_cache_children_reused", child_reuse_count); + timing.record_count( + "publication_point_cache_related_objects_reused", + related_object_reuse_count, + ); + timing.record_count( + "publication_point_cache_audit_objects_reused", + audit_object_count, + ); + } + if total_ms >= crate::progress_log::pp_cache_slow_threshold_ms() { + crate::progress_log::emit( + "publication_point_cache_reuse_slow", + serde_json::json!({ + "manifest_rsync_uri": ca.manifest_rsync_uri.as_str(), + "publication_point_rsync_uri": ca.publication_point_rsync_uri.as_str(), + "repo_sync_source": repo_sync_source, + "repo_sync_phase": repo_sync_phase, + "repo_sync_duration_ms": repo_sync_duration_ms, + "repo_sync_err": repo_sync_err, + "outputs_reused": output_reuse_count, + "children_reused": child_reuse_count, + "related_objects_reused": related_object_reuse_count, + "audit_objects_reused": audit_object_count, + "to_vcir_ms": to_vcir_ms, + "build_objects_ms": build_objects_ms, + "restore_children_ms": restore_children_ms, + "ccr_append_ms": ccr_append_ms, + "audit_build_ms": audit_build_ms, + "total_ms": total_ms, + "slow_threshold_ms": crate::progress_log::pp_cache_slow_threshold_ms(), + }), + ); } Ok(PublicationPointRunResult { source: PublicationPointSource::PublicationPointCache, @@ -495,6 +630,18 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { }) } + fn record_publication_point_cache_phase_ms( + &self, + phase: &'static str, + started: std::time::Instant, + ) -> u64 { + let nanos = started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64; + if let Some(timing) = self.timing.as_ref() { + timing.record_phase_nanos(phase, nanos); + } + nanos / 1_000_000 + } + pub(crate) fn ccr_accumulator_snapshot(&self) -> Option { self.ccr_accumulator .as_ref() diff --git a/src/validation/tree_runner/tests.rs b/src/validation/tree_runner/tests.rs index 67ad6c8..88688cf 100644 --- a/src/validation/tree_runner/tests.rs +++ b/src/validation/tree_runner/tests.rs @@ -2008,6 +2008,33 @@ fn runner_publication_point_cache_reuses_projection_outputs_children_and_ccr() { .copied(), Some(1) ); + assert_eq!( + counts + .get("publication_point_cache_related_objects_reused") + .copied(), + Some(vcir.related_artifacts.len() as u64) + ); + assert_eq!( + counts + .get("publication_point_cache_audit_objects_reused") + .copied(), + Some(result.cir_cached_objects.len() as u64) + ); + let timing_dir = tempfile::tempdir().expect("timing dir"); + let timing_path = timing_dir.path().join("timing.json"); + timing.write_json(&timing_path, 20).expect("write timing"); + let timing_json: serde_json::Value = + serde_json::from_slice(&std::fs::read(&timing_path).expect("read timing")) + .expect("parse timing"); + let phase_keys = timing_json["phases"] + .as_object() + .expect("phases") + .keys() + .map(|key| key.as_str()) + .collect::>(); + assert!(phase_keys.contains("publication_point_cache_lookup_hit_total")); + assert!(phase_keys.contains("publication_point_cache_reuse_build_total")); + assert!(phase_keys.contains("publication_point_cache_build_objects_total")); } #[test]