From f1a73bd2d1891ee91b7dc02d937bc15410860975 Mon Sep 17 00:00:00 2001 From: yuyr Date: Fri, 26 Jun 2026 18:52:27 +0800 Subject: [PATCH] =?UTF-8?q?20260626=5F2=20=E4=BC=98=E5=8C=96child=E8=AF=81?= =?UTF-8?q?=E4=B9=A6=E7=BC=93=E5=AD=98=E5=B9=B6=E4=BF=AE=E5=A4=8D=E8=BF=9C?= =?UTF-8?q?=E7=AB=AF=E5=8F=91=E5=B8=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/soak/publish_remote231_full.sh | 146 +++++++- src/parallel/repo_runtime.rs | 1 + src/validation/tree.rs | 7 + src/validation/tree_parallel.rs | 1 + src/validation/tree_runner.rs | 51 ++- src/validation/tree_runner/tests.rs | 486 ++++++++++++++++++++++++- tests/test_tree_failure_handling.rs | 1 + tests/test_tree_traversal_m14.rs | 1 + 8 files changed, 669 insertions(+), 25 deletions(-) diff --git a/scripts/soak/publish_remote231_full.sh b/scripts/soak/publish_remote231_full.sh index 054389c..a098760 100755 --- a/scripts/soak/publish_remote231_full.sh +++ b/scripts/soak/publish_remote231_full.sh @@ -21,9 +21,12 @@ START_MONITOR_STACK="${START_MONITOR_STACK:-1}" START_RPKI_SOAK="${START_RPKI_SOAK:-1}" RESTART_FIXED_PHASE_LOOP="${RESTART_FIXED_PHASE_LOOP:-1}" START_ROUTINATOR_SYNC="${START_ROUTINATOR_SYNC:-1}" +START_ROUTINATOR_LOCAL_SERVICES="${START_ROUTINATOR_LOCAL_SERVICES:-1}" VERIFY_INTER_RP_DASHBOARD="${VERIFY_INTER_RP_DASHBOARD:-1}" WAIT_FIRST_RUN="${WAIT_FIRST_RUN:-1}" FIRST_RUN_TIMEOUT_SECS="${FIRST_RUN_TIMEOUT_SECS:-7200}" +SNAPSHOT_STOP_SIDE_SERVICES="${SNAPSHOT_STOP_SIDE_SERVICES:-1}" +ROUTINATOR_READY_TIMEOUT_SECS="${ROUTINATOR_READY_TIMEOUT_SECS:-300}" usage() { cat <<'USAGE' @@ -37,6 +40,7 @@ ours RP soak loop, and starts/verifies sidecars: - rpki_artifact_metrics (:9556) - rpki_query_service (:9560, optional) - ours-rp vs Routinator inter-RP exporter (:9557) + - local Routinator server/loop used by the inter-RP dashboard - local Routinator artifact sync helper - Prometheus/Grafana monitor stack @@ -54,8 +58,11 @@ Environment overrides: RESTART_QUERY_SERVICE=1 START_QUERY_SERVICE=1 START_INTER_RP=1 + START_ROUTINATOR_LOCAL_SERVICES=1 + SNAPSHOT_STOP_SIDE_SERVICES=1 START_MONITOR_STACK=1 WAIT_FIRST_RUN=1 + ROUTINATOR_READY_TIMEOUT_SECS=300 USAGE } @@ -105,6 +112,12 @@ while [[ $# -gt 0 ]]; do --no-inter-rp) START_INTER_RP=0 ;; + --no-routinator-local-services) + START_ROUTINATOR_LOCAL_SERVICES=0 + ;; + --no-snapshot-stop-side-services) + SNAPSHOT_STOP_SIDE_SERVICES=0 + ;; --no-wait-first-run) WAIT_FIRST_RUN=0 ;; @@ -182,10 +195,14 @@ if [[ -n "$(git -C "$REPO_ROOT" status --short 2>/dev/null || true)" ]]; then fi if [[ "$PUBLISH_MODE" == "snapshot" ]]; then - ssh "$REMOTE_HOST" "bash -s -- '$REMOTE_ROOT' '$MODE'" <<'REMOTE' + ssh "$REMOTE_HOST" "bash -s -- '$REMOTE_ROOT' '$MODE' '$SNAPSHOT_STOP_SIDE_SERVICES'" <<'REMOTE' set -euo pipefail remote_root="$1" mode="$2" +snapshot_stop_side_services="$3" +is_true() { + case "${1:-0}" in 1|true|TRUE|yes|YES|on|ON) return 0 ;; *) return 1 ;; esac +} matching_pids() { local pattern="$1" pgrep -af "$pattern" 2>/dev/null | while IFS= read -r line; do @@ -197,14 +214,58 @@ matching_pids() { printf '%s\n' "$pid" done | sort -u } -if [[ "$mode" == "execute" ]]; then - mapfile -t pids < <(matching_pids "fixed_phase_loop.sh --name ours-rp") +terminate_matching() { + local signal="$1" + local pattern="$2" + local label="$3" + local -a pids=() + mapfile -t pids < <(matching_pids "$pattern") if (( ${#pids[@]} > 0 )); then - kill -TERM "${pids[@]}" >/dev/null 2>&1 || true - sleep 2 + if [[ "$mode" == "execute" ]]; then + printf '[snapshot-prestop] %s %s pids=%s\n' "$signal" "$label" "${pids[*]}" + kill "$signal" "${pids[@]}" >/dev/null 2>&1 || true + else + printf '[dry-run] stop %s pids=%s pattern=%s\n' "$label" "${pids[*]}" "$pattern" + fi + else + printf '[snapshot-prestop] no %s processes\n' "$label" + fi +} +if [[ "$mode" == "execute" ]]; then + terminate_matching -TERM "fixed_phase_loop.sh --name ours-rp" "ours fixed phase loop" + if is_true "$snapshot_stop_side_services"; then + terminate_matching -TERM "$remote_root/bin/rpki_query_service" "query service" + terminate_matching -TERM "$remote_root/bin/rpki_artifact_metrics" "artifact metrics" + terminate_matching -TERM "inter_rp_ours_routinator_exporter.py" "inter-rp exporter" + terminate_matching -TERM "rpki_inter_rp_metrics" "inter-rp exporter" + terminate_matching -TERM "sync_local_routinator_peer.sh" "local routinator sync" + terminate_matching -TERM "fixed_phase_loop.sh --name routinator" "routinator fixed phase loop" + terminate_matching -TERM "/root/inter-rp-runners/scripts/run_single_rp_with_rss.sh --rp routinator" "routinator one-shot wrapper" + terminate_matching -TERM "/root/inter-rp-runners/bin/routinator" "routinator process" + fi + sleep 3 + if is_true "$snapshot_stop_side_services"; then + terminate_matching -KILL "$remote_root/bin/rpki_query_service" "query service" + terminate_matching -KILL "$remote_root/bin/rpki_artifact_metrics" "artifact metrics" + terminate_matching -KILL "inter_rp_ours_routinator_exporter.py" "inter-rp exporter" + terminate_matching -KILL "rpki_inter_rp_metrics" "inter-rp exporter" + terminate_matching -KILL "sync_local_routinator_peer.sh" "local routinator sync" + terminate_matching -KILL "fixed_phase_loop.sh --name routinator" "routinator fixed phase loop" + terminate_matching -KILL "/root/inter-rp-runners/scripts/run_single_rp_with_rss.sh --rp routinator" "routinator one-shot wrapper" + terminate_matching -KILL "/root/inter-rp-runners/bin/routinator" "routinator process" fi else echo "[dry-run] stop existing fixed_phase_loop.sh --name ours-rp before snapshot publish" + if is_true "$snapshot_stop_side_services"; then + terminate_matching -TERM "$remote_root/bin/rpki_query_service" "query service" + terminate_matching -TERM "$remote_root/bin/rpki_artifact_metrics" "artifact metrics" + terminate_matching -TERM "inter_rp_ours_routinator_exporter.py" "inter-rp exporter" + terminate_matching -TERM "rpki_inter_rp_metrics" "inter-rp exporter" + terminate_matching -TERM "sync_local_routinator_peer.sh" "local routinator sync" + terminate_matching -TERM "fixed_phase_loop.sh --name routinator" "routinator fixed phase loop" + terminate_matching -TERM "/root/inter-rp-runners/scripts/run_single_rp_with_rss.sh --rp routinator" "routinator one-shot wrapper" + terminate_matching -TERM "/root/inter-rp-runners/bin/routinator" "routinator process" + fi fi REMOTE publish_args=( @@ -347,7 +408,7 @@ fi REMOTE fi -ssh "$REMOTE_HOST" "bash -s -- '$REMOTE_ROOT' '$MODE' '$START_RPKI_SOAK' '$START_ARTIFACT_METRICS' '$START_QUERY_SERVICE' '$START_INTER_RP' '$START_ROUTINATOR_SYNC' '$START_MONITOR_STACK' '$VERIFY_INTER_RP_DASHBOARD' '$WAIT_FIRST_RUN' '$FIRST_RUN_TIMEOUT_SECS' '$RESTART_FIXED_PHASE_LOOP'" <<'REMOTE' +ssh "$REMOTE_HOST" "bash -s -- '$REMOTE_ROOT' '$MODE' '$START_RPKI_SOAK' '$START_ARTIFACT_METRICS' '$START_QUERY_SERVICE' '$START_INTER_RP' '$START_ROUTINATOR_SYNC' '$START_ROUTINATOR_LOCAL_SERVICES' '$START_MONITOR_STACK' '$VERIFY_INTER_RP_DASHBOARD' '$WAIT_FIRST_RUN' '$FIRST_RUN_TIMEOUT_SECS' '$RESTART_FIXED_PHASE_LOOP' '$ROUTINATOR_READY_TIMEOUT_SECS'" <<'REMOTE' set -euo pipefail remote_root="$1" mode="$2" @@ -356,11 +417,13 @@ start_artifact_metrics="$4" start_query_service="$5" start_inter_rp="$6" start_routinator_sync="$7" -start_monitor_stack="$8" -verify_inter_rp_dashboard="$9" -wait_first_run="${10}" -first_run_timeout_secs="${11}" -restart_fixed_phase_loop="${12}" +start_routinator_local_services="$8" +start_monitor_stack="$9" +verify_inter_rp_dashboard="${10}" +wait_first_run="${11}" +first_run_timeout_secs="${12}" +restart_fixed_phase_loop="${13}" +routinator_ready_timeout_secs="${14}" log() { printf '[remote231-full] %s\n' "$*"; } is_true() { @@ -460,6 +523,62 @@ ensure_inter_rp() { log "would start inter-rp ours+routinator exporter on :9557" fi } +ensure_routinator_local_services() { + is_true "$start_routinator_local_services" || return 0 + local routinator_bin="/root/inter-rp-runners/bin/routinator" + local routinator_root="/var/lib/inter-rp-runners" + if [[ -x "$routinator_bin" ]]; then + if pgrep -af "$routinator_bin .* server .*127.0.0.1:9558" >/dev/null 2>&1; then + log "local routinator server already running" + elif [[ "$mode" == "execute" ]]; then + mkdir -p "$routinator_root/routinator-server/repository" + nohup "$routinator_bin" \ + --repository-dir "$routinator_root/routinator-server/repository" \ + --no-rir-tals \ + --extra-tals-dir "$routinator_root/fixtures/tal" \ + --enable-aspa \ + server \ + --http 127.0.0.1:9558 \ + --rtr 127.0.0.1:0 \ + --refresh 86400 \ + >"$remote_root/logs/routinator-server.full-publish.log" 2>&1 & + log "started local routinator server on 127.0.0.1:9558" + else + log "would start local routinator server on 127.0.0.1:9558" + fi + else + log "warning: missing $routinator_bin; cannot start local routinator server" + fi + + if pgrep -af "fixed_phase_loop.sh --name routinator" >/dev/null 2>&1; then + log "routinator fixed phase loop already running" + return 0 + fi + if [[ -x "$remote_root/scripts/soak/fixed_phase_loop.sh" \ + && -x /root/inter-rp-runners/scripts/run_single_rp_with_rss.sh \ + && -x /root/inter-rp-runners/scripts/run_routinator_once.sh ]]; then + if [[ "$mode" == "execute" ]]; then + nohup bash "$remote_root/scripts/soak/fixed_phase_loop.sh" \ + --name routinator \ + --cycle-secs 900 \ + --offset-secs 450 \ + --lock-file /var/lock/rpki-heavy-run.lock \ + --lock-wait-secs 60 \ + -- /root/inter-rp-runners/scripts/run_single_rp_with_rss.sh \ + --rp routinator \ + --root /var/lib/inter-rp-runners/routinator \ + --command /root/inter-rp-runners/scripts/run_routinator_once.sh \ + --retain-runs 30 \ + --sample-ms 500 \ + >"$remote_root/logs/fixed-phase-routinator.full-publish.log" 2>&1 & + log "started routinator fixed phase loop" + else + log "would start routinator fixed phase loop" + fi + else + log "warning: missing routinator loop scripts; cannot start routinator fixed phase loop" + fi +} ensure_routinator_sync() { is_true "$start_routinator_sync" || return 0 if pgrep -af "sync_local_routinator_peer.sh" >/dev/null 2>&1; then @@ -515,7 +634,8 @@ ensure_fixed_phase_soak() { wait_url() { local name="$1" local url="$2" - local deadline=$((SECONDS + 60)) + local timeout_secs="${3:-60}" + local deadline=$((SECONDS + timeout_secs)) while (( SECONDS < deadline )); do if curl -fsS --max-time 5 "$url" >/dev/null 2>&1; then log "$name up" @@ -603,6 +723,7 @@ PY ensure_artifact_metrics ensure_query_service ensure_inter_rp +ensure_routinator_local_services ensure_routinator_sync ensure_monitor_stack ensure_fixed_phase_soak @@ -611,6 +732,7 @@ if [[ "$mode" == "execute" ]]; then sleep 5 wait_url "artifact metrics" http://127.0.0.1:9556/metrics if is_true "$start_query_service"; then wait_url "query service" http://127.0.0.1:9560/api/v1; fi + if is_true "$start_routinator_local_services"; then wait_url "routinator server" http://127.0.0.1:9558/metrics "$routinator_ready_timeout_secs"; fi if is_true "$start_inter_rp"; then wait_url "inter-rp metrics" http://127.0.0.1:9557/metrics; fi if is_true "$start_monitor_stack"; then wait_url "prometheus" http://127.0.0.1:9090/-/ready; wait_url "grafana" http://127.0.0.1:3000/api/health; fi if is_true "$start_inter_rp"; then diff --git a/src/parallel/repo_runtime.rs b/src/parallel/repo_runtime.rs index fef41b0..f1a61ca 100644 --- a/src/parallel/repo_runtime.rs +++ b/src/parallel/repo_runtime.rs @@ -1464,6 +1464,7 @@ mod tests { child_ca_certificate_rsync_uri: "rsync://example.test/repo/child.cer".to_string(), child_ca_certificate_sha256_hex: "00".repeat(32), }, + child_entry_projection: None, }; runtime diff --git a/src/validation/tree.rs b/src/validation/tree.rs index bcefa94..6bff590 100644 --- a/src/validation/tree.rs +++ b/src/validation/tree.rs @@ -196,10 +196,16 @@ pub struct PublicationPointRunResult { pub discovered_children: Vec, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct DiscoveredChildEntryProjection { + pub child_ski: String, +} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct DiscoveredChildCaInstance { pub handle: CaInstanceHandle, pub discovered_from: DiscoveredFrom, + pub child_entry_projection: Option, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -495,6 +501,7 @@ mod tests { .to_string(), child_ca_certificate_sha256_hex: "00".repeat(32), }, + child_entry_projection: None, }] } else { Vec::new() diff --git a/src/validation/tree_parallel.rs b/src/validation/tree_parallel.rs index 0458e54..de036a9 100644 --- a/src/validation/tree_parallel.rs +++ b/src/validation/tree_parallel.rs @@ -2444,6 +2444,7 @@ mod tests { .to_string(), child_ca_certificate_sha256_hex: "55".repeat(32), }, + child_entry_projection: None, }); let output = FreshPublicationPointFinalizeOutput { result, diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index afb210f..997910e 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -57,8 +57,8 @@ use crate::validation::objects::{ }; use crate::validation::publication_point::PublicationPointSnapshot; use crate::validation::tree::{ - CaCertificateRef, CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult, - PublicationPointRunner, + CaCertificateRef, CaInstanceHandle, DiscoveredChildCaInstance, DiscoveredChildEntryProjection, + PublicationPointRunResult, PublicationPointRunner, }; use sha2::Digest; use std::collections::{HashMap, HashSet}; @@ -2689,6 +2689,7 @@ fn discover_children_from_fresh_snapshot_with_audit_cached_with_issuer_der< match &projection.payload { ChildCertificateCachePayload::ChildCa { child_manifest_rsync_uri, + child_ski, child_rsync_base_uri, child_publication_point_rsync_uri, child_rrdp_notification_uri, @@ -2735,6 +2736,11 @@ fn discover_children_from_fresh_snapshot_with_audit_cached_with_issuer_der< child_ca_certificate_sha256_hex: child_cert_sha256_hex.clone(), }, + child_entry_projection: Some( + DiscoveredChildEntryProjection { + child_ski: child_ski.clone(), + }, + ), }); ca_ok = ca_ok.saturating_add(1); child_cert_cache_hit = @@ -3145,6 +3151,15 @@ fn discover_children_from_fresh_snapshot_with_audit_cached_with_issuer_der< child_ca_certificate_rsync_uri: f.rsync_uri.clone(), child_ca_certificate_sha256_hex: child_cert_sha256_hex.clone(), }, + child_entry_projection: validated + .child_ca + .tbs + .extensions + .subject_key_identifier + .as_ref() + .map(|child_ski| DiscoveredChildEntryProjection { + child_ski: hex::encode(child_ski), + }), }); enqueue_nanos = enqueue_nanos.saturating_add(t3.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64); @@ -4857,6 +4872,9 @@ fn restore_children_from_vcir( child_ca_certificate_rsync_uri: child.child_cert_rsync_uri.clone(), child_ca_certificate_sha256_hex: child.child_cert_hash.clone(), }, + child_entry_projection: Some(DiscoveredChildEntryProjection { + child_ski: child.child_ski.clone(), + }), }); audits.push(ObjectAuditEntry { rsync_uri: child.child_cert_rsync_uri.clone(), @@ -5073,6 +5091,9 @@ fn publication_point_cache_discovered_child( child_ca_certificate_rsync_uri: child.child_cert_rsync_uri.clone(), child_ca_certificate_sha256_hex: child.child_cert_hash.clone(), }, + child_entry_projection: Some(DiscoveredChildEntryProjection { + child_ski: child.child_ski.clone(), + }), } } @@ -5713,15 +5734,21 @@ fn build_vcir_child_entries( ) -> Result, String> { let mut out = Vec::with_capacity(discovered_children.len()); for child in discovered_children { - let child_der = child.handle.ca_certificate_der(store)?; - let child_cert = ResourceCertificate::decode_der(child_der.as_ref()) - .map_err(|e| format!("decode child certificate for VCIR failed: {e}"))?; - let child_ski = child_cert - .tbs - .extensions - .subject_key_identifier - .as_ref() - .ok_or_else(|| "child certificate missing SubjectKeyIdentifier".to_string())?; + let child_ski = match child.child_entry_projection.as_ref() { + Some(projection) => projection.child_ski.clone(), + None => { + let child_der = child.handle.ca_certificate_der(store)?; + let child_cert = ResourceCertificate::decode_der(child_der.as_ref()) + .map_err(|e| format!("decode child certificate for VCIR failed: {e}"))?; + let child_ski = child_cert + .tbs + .extensions + .subject_key_identifier + .as_ref() + .ok_or_else(|| "child certificate missing SubjectKeyIdentifier".to_string())?; + hex::encode(child_ski) + } + }; out.push(VcirChildEntry { child_manifest_rsync_uri: child.handle.manifest_rsync_uri.clone(), child_cert_rsync_uri: child.discovered_from.child_ca_certificate_rsync_uri.clone(), @@ -5729,7 +5756,7 @@ fn build_vcir_child_entries( .discovered_from .child_ca_certificate_sha256_hex .clone(), - child_ski: hex::encode(child_ski), + child_ski, child_rsync_base_uri: child.handle.rsync_base_uri.clone(), child_publication_point_rsync_uri: child.handle.publication_point_rsync_uri.clone(), child_rrdp_notification_uri: child.handle.rrdp_notification_uri.clone(), diff --git a/src/validation/tree_runner/tests.rs b/src/validation/tree_runner/tests.rs index c426240..5124531 100644 --- a/src/validation/tree_runner/tests.rs +++ b/src/validation/tree_runner/tests.rs @@ -13,7 +13,7 @@ use crate::storage::{ }; use crate::sync::rrdp::Fetcher; use crate::validation::publication_point::PublicationPointSnapshot; -use crate::validation::tree::PublicationPointRunner; +use crate::validation::tree::{DiscoveredChildEntryProjection, PublicationPointRunner}; use std::process::Command; use std::sync::Arc; @@ -1210,6 +1210,45 @@ fn build_vcir_ccr_manifest_projection_from_fresh_real_snapshot_matches_manifest_ assert_eq!(projection.subordinate_skis, expected_subordinate_skis); } +#[test] +fn build_vcir_child_entries_uses_projection_without_repo_bytes() { + let store_dir = tempfile::tempdir().expect("store dir"); + let store = RocksStore::open(store_dir.path()).expect("open rocksdb"); + let validation_time = time::OffsetDateTime::parse( + "2026-06-26T00:00:00Z", + &time::format_description::well_known::Rfc3339, + ) + .expect("parse time"); + let child = DiscoveredChildCaInstance { + handle: CaInstanceHandle { + depth: 1, + tal_id: "test-tal".to_string(), + parent_manifest_rsync_uri: Some("rsync://example.test/repo/root.mft".to_string()), + ca_certificate: CaCertificateRef::repo_bytes("aa".repeat(32)), + ca_certificate_rsync_uri: Some("rsync://example.test/repo/child.cer".to_string()), + effective_ip_resources: None, + effective_as_resources: None, + rsync_base_uri: "rsync://example.test/repo/child/".to_string(), + manifest_rsync_uri: "rsync://example.test/repo/child/child.mft".to_string(), + publication_point_rsync_uri: "rsync://example.test/repo/child/".to_string(), + rrdp_notification_uri: Some("https://example.test/notify.xml".to_string()), + }, + discovered_from: crate::audit::DiscoveredFrom { + parent_manifest_rsync_uri: "rsync://example.test/repo/root.mft".to_string(), + child_ca_certificate_rsync_uri: "rsync://example.test/repo/child.cer".to_string(), + child_ca_certificate_sha256_hex: "aa".repeat(32), + }, + child_entry_projection: Some(DiscoveredChildEntryProjection { + child_ski: "11".repeat(20), + }), + }; + + let entries = build_vcir_child_entries(&store, &[child], validation_time) + .expect("projection should avoid repo-bytes load"); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].child_ski, "11".repeat(20)); +} + #[test] fn build_vcir_related_artifacts_classifies_snapshot_files_and_audit_statuses() { let manifest_bytes = std::fs::read( @@ -5036,3 +5075,448 @@ fn runner_dedup_paths_execute_with_timing_enabled() { "rsync://example.test/repo/" ); } + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct RipeRootFinalizeFixtureEvent { + event_type: String, + validation_time: String, + pp_manifest_uri: Option, + object_uri: Option, + sha256: Option, + object_type: Option, + result: Option, + reason: Option, +} + +#[derive(Debug)] +struct RipeRootFinalizeObject { + uri: String, + sha256_hex: String, + object_type: String, + result: String, + reason: Option, +} + +struct RipeRootFinalizeFixture { + manifest_uri: String, + publication_point_uri: String, + validation_time: time::OffsetDateTime, + manifest_sha256_hex: String, + objects: Vec, +} + +fn load_ripe_root_finalize_fixture( + fixture_root: &std::path::Path, + run_id: &str, +) -> RipeRootFinalizeFixture { + let events_path = fixture_root + .join(format!("run_{run_id}")) + .join("ripe-root-events.jsonl"); + let file = std::fs::File::open(&events_path) + .unwrap_or_else(|e| panic!("open fixture events {} failed: {e}", events_path.display())); + let reader = std::io::BufReader::new(file); + let mut manifest_uri = None; + let mut validation_time = None; + let mut manifest_sha256_hex = None; + let mut objects = Vec::new(); + + for line in std::io::BufRead::lines(reader) { + let line = line.expect("read fixture event line"); + let event: RipeRootFinalizeFixtureEvent = + serde_json::from_str(&line).expect("decode fixture event"); + if event.event_type == "publication_point" { + manifest_uri = event.pp_manifest_uri; + validation_time = Some( + time::OffsetDateTime::parse( + &event.validation_time, + &time::format_description::well_known::Rfc3339, + ) + .expect("parse validation_time"), + ); + continue; + } + if event.event_type != "object" { + continue; + } + let uri = event.object_uri.expect("fixture object uri"); + let sha256_hex = event.sha256.expect("fixture object sha256"); + let object_type = event.object_type.expect("fixture object type"); + let result = event.result.expect("fixture object result"); + if object_type == "manifest" { + manifest_sha256_hex = Some(sha256_hex.clone()); + } + objects.push(RipeRootFinalizeObject { + uri, + sha256_hex, + object_type, + result, + reason: event.reason, + }); + } + + let manifest_uri = manifest_uri.expect("fixture publication_point event"); + let publication_point_uri = manifest_uri + .rsplit_once('/') + .map(|(parent, _)| format!("{parent}/")) + .expect("manifest uri parent"); + RipeRootFinalizeFixture { + manifest_uri, + publication_point_uri, + validation_time: validation_time.expect("fixture validation_time"), + manifest_sha256_hex: manifest_sha256_hex.expect("fixture manifest object"), + objects, + } +} + +fn pack_file_from_fixture_object( + object: &RipeRootFinalizeObject, + repo_bytes: &Arc, +) -> PackFile { + PackFile::from_lazy_repo_bytes( + object.uri.clone(), + object.sha256_hex.clone(), + sha256_hex_to_32(&object.sha256_hex), + repo_bytes.clone(), + ) +} + +fn child_audit_from_fixture_object(object: &RipeRootFinalizeObject) -> ObjectAuditEntry { + ObjectAuditEntry { + rsync_uri: object.uri.clone(), + sha256_hex: object.sha256_hex.clone(), + kind: AuditObjectKind::Certificate, + result: match object.result.as_str() { + "ok" => AuditObjectResult::Ok, + "skipped" => AuditObjectResult::Skipped, + _ => AuditObjectResult::Error, + }, + detail: object.reason.clone(), + } +} + +fn discovered_child_from_fixture_object( + issuer: &CaInstanceHandle, + object: &RipeRootFinalizeObject, + child_entry_projection: Option, +) -> DiscoveredChildCaInstance { + let stem = object + .uri + .rsplit_once('/') + .map(|(_, file)| file.trim_end_matches(".cer")) + .unwrap_or("child"); + let child_publication_point = format!("{}synthetic-child-{stem}/", issuer.rsync_base_uri); + let child_manifest = format!("{child_publication_point}child.mft"); + DiscoveredChildCaInstance { + handle: CaInstanceHandle { + depth: issuer.depth + 1, + tal_id: issuer.tal_id.clone(), + parent_manifest_rsync_uri: Some(issuer.manifest_rsync_uri.clone()), + ca_certificate: CaCertificateRef::repo_bytes(object.sha256_hex.clone()), + ca_certificate_rsync_uri: Some(object.uri.clone()), + effective_ip_resources: None, + effective_as_resources: None, + rsync_base_uri: child_publication_point.clone(), + manifest_rsync_uri: child_manifest, + publication_point_rsync_uri: child_publication_point, + rrdp_notification_uri: issuer.rrdp_notification_uri.clone(), + }, + discovered_from: crate::audit::DiscoveredFrom { + parent_manifest_rsync_uri: issuer.manifest_rsync_uri.clone(), + child_ca_certificate_rsync_uri: object.uri.clone(), + child_ca_certificate_sha256_hex: object.sha256_hex.clone(), + }, + child_entry_projection, + } +} + +fn child_entry_projection_from_fixture_object( + store: &RocksStore, + object: &RipeRootFinalizeObject, +) -> DiscoveredChildEntryProjection { + let child_der = store + .get_blob_bytes(&object.sha256_hex) + .expect("load child certificate bytes for projection") + .expect("child certificate bytes exist for projection"); + let child_cert = + ResourceCertificate::decode_der(&child_der).expect("decode child certificate projection"); + let child_ski = child_cert + .tbs + .extensions + .subject_key_identifier + .as_ref() + .expect("child certificate projection SKI"); + DiscoveredChildEntryProjection { + child_ski: hex::encode(child_ski), + } +} + +struct RipeRootChildEntryProfile { + count: usize, + load_der_nanos: u128, + decode_cert_nanos: u128, + build_entry_nanos: u128, +} + +fn profile_ripe_root_child_entry_build( + store: &RocksStore, + discovered_children: &[DiscoveredChildCaInstance], + validation_time: time::OffsetDateTime, +) -> Result { + let mut out = Vec::with_capacity(discovered_children.len()); + let mut load_der_nanos = 0; + let mut decode_cert_nanos = 0; + let mut build_entry_nanos = 0; + for child in discovered_children { + let load_started = std::time::Instant::now(); + let child_der = child.handle.ca_certificate_der(store)?; + load_der_nanos += load_started.elapsed().as_nanos(); + + let decode_started = std::time::Instant::now(); + let child_cert = ResourceCertificate::decode_der(child_der.as_ref()) + .map_err(|e| format!("decode child certificate for VCIR failed: {e}"))?; + decode_cert_nanos += decode_started.elapsed().as_nanos(); + + let build_started = std::time::Instant::now(); + let child_ski = child_cert + .tbs + .extensions + .subject_key_identifier + .as_ref() + .ok_or_else(|| "child certificate missing SubjectKeyIdentifier".to_string())?; + out.push(VcirChildEntry { + child_manifest_rsync_uri: child.handle.manifest_rsync_uri.clone(), + child_cert_rsync_uri: child.discovered_from.child_ca_certificate_rsync_uri.clone(), + child_cert_hash: child + .discovered_from + .child_ca_certificate_sha256_hex + .clone(), + child_ski: hex::encode(child_ski), + child_rsync_base_uri: child.handle.rsync_base_uri.clone(), + child_publication_point_rsync_uri: child.handle.publication_point_rsync_uri.clone(), + child_rrdp_notification_uri: child.handle.rrdp_notification_uri.clone(), + child_effective_ip_resources: child.handle.effective_ip_resources.clone(), + child_effective_as_resources: child.handle.effective_as_resources.clone(), + accepted_at_validation_time: PackTime::from_utc_offset_datetime(validation_time), + }); + build_entry_nanos += build_started.elapsed().as_nanos(); + } + Ok(RipeRootChildEntryProfile { + count: out.len(), + load_der_nanos, + decode_cert_nanos, + build_entry_nanos, + }) +} + +#[test] +#[ignore = "manual performance repro: requires target/ripe-root-finalize-repro repo-bytes fixture"] +fn ripe_root_finalize_repro_from_remote_fixture() { + let fixture_root = std::env::var("RPKI_RIPE_ROOT_FINALIZE_FIXTURE") + .map(std::path::PathBuf::from) + .unwrap_or_else(|_| { + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("target/ripe-root-finalize-repro") + }); + let run_id = std::env::var("RPKI_RIPE_ROOT_FINALIZE_RUN").unwrap_or_else(|_| "0262".into()); + assert!( + fixture_root.exists(), + "fixture root missing: {}; expected copied remote fixture", + fixture_root.display() + ); + + let fixture = load_ripe_root_finalize_fixture(&fixture_root, &run_id); + let repo_bytes_db = fixture_root.join("db/repo-bytes.db"); + assert!( + repo_bytes_db.exists(), + "repo-bytes fixture missing: {}", + repo_bytes_db.display() + ); + + let store_dir = tempfile::tempdir().expect("store dir"); + let work_db = store_dir.path().join("work-db"); + let store = RocksStore::open_with_external_repo_bytes(&work_db, &repo_bytes_db) + .expect("open work-db with external repo-bytes"); + let repo_bytes = Arc::new( + store + .external_repo_bytes_ref() + .expect("external repo bytes") + .clone(), + ); + + let manifest_bytes = store + .get_blob_bytes(&fixture.manifest_sha256_hex) + .expect("load fixture manifest bytes") + .expect("fixture manifest bytes exist"); + let manifest = ManifestObject::decode_der(&manifest_bytes).expect("decode fixture manifest"); + let mut files = Vec::with_capacity(fixture.objects.len().saturating_sub(1)); + let mut child_audits = Vec::new(); + let current_ca_hash = "007ad0c291b01ede4bd60e1204074ce3f7192186a022c9577cef5d8e91d5171a"; + let current_ca_uri = "rsync://rpki.ripe.net/repository/aca/KpSo3VVK5wEHIJnHC2QHVV3d5mk.cer"; + let parent_manifest_uri = + "rsync://rpki.ripe.net/repository/aca/7DNNDzoYvgAht7joQih2Qayxcxo.mft"; + let ca = CaInstanceHandle { + depth: 1, + tal_id: "ripe-ncc".to_string(), + parent_manifest_rsync_uri: Some(parent_manifest_uri.to_string()), + ca_certificate: CaCertificateRef::repo_bytes(current_ca_hash.to_string()), + ca_certificate_rsync_uri: Some(current_ca_uri.to_string()), + effective_ip_resources: None, + effective_as_resources: None, + rsync_base_uri: fixture.publication_point_uri.clone(), + manifest_rsync_uri: fixture.manifest_uri.clone(), + publication_point_rsync_uri: fixture.publication_point_uri.clone(), + rrdp_notification_uri: Some("https://rrdp.ripe.net/notification.xml".to_string()), + }; + let mut discovered_children = Vec::new(); + let use_child_projection = std::env::var("RPKI_RIPE_ROOT_FINALIZE_USE_CHILD_PROJECTION") + .map(|value| value != "0" && value.to_ascii_lowercase() != "false") + .unwrap_or(false); + for object in &fixture.objects { + if object.uri == fixture.manifest_uri { + continue; + } + files.push(pack_file_from_fixture_object(object, &repo_bytes)); + if object.object_type == "certificate" { + child_audits.push(child_audit_from_fixture_object(object)); + if object.result == "ok" { + let child_entry_projection = use_child_projection + .then(|| child_entry_projection_from_fixture_object(&store, object)); + discovered_children.push(discovered_child_from_fixture_object( + &ca, + object, + child_entry_projection, + )); + } + } + } + if std::env::var("RPKI_RIPE_ROOT_FINALIZE_PROFILE_CHILD").is_ok() { + let profile_children = fixture + .objects + .iter() + .filter(|object| object.object_type == "certificate" && object.result == "ok") + .map(|object| discovered_child_from_fixture_object(&ca, object, None)) + .collect::>(); + let profile_started = std::time::Instant::now(); + let profile = + profile_ripe_root_child_entry_build(&store, &profile_children, fixture.validation_time) + .expect("profile child entry build"); + eprintln!( + "ripe root child-entry profile: run={} count={} total_ms={} load_der_ms={:.3} decode_cert_ms={:.3} build_entry_ms={:.3}", + run_id, + profile.count, + profile_started.elapsed().as_millis(), + profile.load_der_nanos as f64 / 1_000_000.0, + profile.decode_cert_nanos as f64 / 1_000_000.0, + profile.build_entry_nanos as f64 / 1_000_000.0, + ); + if std::env::var("RPKI_RIPE_ROOT_FINALIZE_ONLY_CHILD_PROFILE").is_ok() { + assert!(profile.count > 22_000); + return; + } + } + + let fresh_point = FreshValidatedPublicationPoint { + manifest_rsync_uri: fixture.manifest_uri.clone(), + publication_point_rsync_uri: fixture.publication_point_uri.clone(), + manifest_number_be: manifest.manifest.manifest_number.bytes_be.clone(), + this_update: PackTime::from_utc_offset_datetime(manifest.manifest.this_update), + next_update: PackTime::from_utc_offset_datetime(manifest.manifest.next_update), + verified_at: PackTime::from_utc_offset_datetime(fixture.validation_time), + manifest_bytes, + files, + }; + let policy = Policy::default(); + let enable_ccr_accumulator = std::env::var("RPKI_RIPE_ROOT_FINALIZE_CCR") + .map(|value| value != "0" && value.to_ascii_lowercase() != "false") + .unwrap_or(true); + let runner = Rpkiv1PublicationPointRunner { + store: &store, + policy: &policy, + http_fetcher: &NeverHttpFetcher, + rsync_fetcher: &FailingRsyncFetcher, + validation_time: fixture.validation_time, + timing: None, + download_log: None, + replay_archive_index: None, + replay_delta_index: None, + rrdp_dedup: false, + rrdp_repo_cache: Mutex::new(HashMap::new()), + rsync_dedup: false, + rsync_repo_cache: Mutex::new(HashMap::new()), + current_repo_index: None, + repo_sync_runtime: None, + parallel_phase2_config: None, + parallel_roa_worker_pool: None, + ccr_accumulator: enable_ccr_accumulator + .then(|| Mutex::new(CcrAccumulator::new(Vec::new()))), + persist_vcir: true, + enable_roa_validation_cache: false, + enable_child_certificate_validation_cache: true, + publication_point_cache_observe_only: false, + enable_publication_point_validation_cache: true, + }; + + eprintln!( + "ripe root finalize repro setup: run={} ccr_accumulator={} child_projection={} objects={} files={} child_audits={} discovered_children={}", + run_id, + enable_ccr_accumulator, + use_child_projection, + fixture.objects.len(), + fresh_point.files.len(), + child_audits.len(), + discovered_children.len() + ); + let started = std::time::Instant::now(); + let output = runner + .finalize_fresh_publication_point_from_reducer( + &ca, + &fresh_point, + Vec::new(), + empty_objects_output(), + child_audits, + discovered_children, + Some("rrdp"), + Some("rrdp_ok"), + 0, + None, + ) + .expect("finalize fixture publication point"); + let finalize_ms = started.elapsed().as_millis(); + eprintln!( + "ripe root finalize repro timing: run={} finalize_ms={} snapshot_pack_ms={} persist_vcir_ms={} build_vcir_ms={} child_entries_ms={} related_artifacts_ms={} replace_vcir_ms={} replace_vcir_encode_ms={} replace_vcir_write_batch_ms={} ccr_projection_build_ms={} audit_build_ms={}", + run_id, + finalize_ms, + output.snapshot_pack_ms, + output.persist_vcir_ms, + output.persist_vcir_timing.build_vcir_ms, + output.persist_vcir_timing.build_vcir.child_entries_ms, + output.persist_vcir_timing.build_vcir.related_artifacts_ms, + output.persist_vcir_timing.replace_vcir_ms, + output.persist_vcir_timing.replace_vcir.vcir_encode_ms, + output.persist_vcir_timing.replace_vcir.write_batch_ms, + output.ccr_projection_build_ms, + output.audit_build_ms, + ); + eprintln!( + "ripe root finalize repro result: audit_objects={} discovered_children={} ccr_manifest_count={}", + output.result.audit.objects.len(), + output.result.discovered_children.len(), + runner + .ccr_accumulator_snapshot() + .map(|snapshot| snapshot.manifest_count()) + .unwrap_or(0), + ); + + assert!(fresh_point.files.len() > 22_000); + assert!(output.result.discovered_children.len() > 22_000); + if enable_ccr_accumulator { + assert_eq!( + runner + .ccr_accumulator_snapshot() + .expect("ccr snapshot") + .manifest_count(), + 1 + ); + } +} diff --git a/tests/test_tree_failure_handling.rs b/tests/test_tree_failure_handling.rs index 2765701..f5eb4bb 100644 --- a/tests/test_tree_failure_handling.rs +++ b/tests/test_tree_failure_handling.rs @@ -63,6 +63,7 @@ fn discovered_child( child_ca_certificate_rsync_uri: format!("rsync://example.test/repo/{name}.cer"), child_ca_certificate_sha256_hex: "00".repeat(32), }, + child_entry_projection: None, } } diff --git a/tests/test_tree_traversal_m14.rs b/tests/test_tree_traversal_m14.rs index 7a37464..37de233 100644 --- a/tests/test_tree_traversal_m14.rs +++ b/tests/test_tree_traversal_m14.rs @@ -96,6 +96,7 @@ fn discovered_child( child_ca_certificate_rsync_uri: format!("rsync://example.test/repo/{name}.cer"), child_ca_certificate_sha256_hex: "00".repeat(32), }, + child_entry_projection: None, } }