20260626 优化PP缓存索引与证书缓存长尾

This commit is contained in:
yuyr 2026-06-26 07:19:11 +08:00
parent af25316d68
commit 9e339e63e7
24 changed files with 3158 additions and 329 deletions

View File

@ -785,6 +785,71 @@
], ],
"title": "State DB File Count Over Time", "title": "State DB File Count Over Time",
"type": "timeseries" "type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "Prometheus"
},
"fieldConfig": {
"defaults": {
"unit": "none",
"decimals": 0,
"min": 0
},
"overrides": []
},
"gridPos": {
"x": 0,
"y": 48,
"w": 24,
"h": 8
},
"id": 18,
"options": {
"legend": {
"calcs": [
"lastNotNull",
"max"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "none"
}
},
"targets": [
{
"expr": "sum by (job, instance, exported_instance) (ours_rp_repo_terminal_state_count{terminal_state=\"fresh\"})",
"legendFormat": "fresh pp",
"refId": "A"
},
{
"expr": "sum by (job, instance, exported_instance) (ours_rp_cir_objects_by_source_type{exported_source=\"fresh\",object_type=\"roa\"})",
"legendFormat": "fresh roa",
"refId": "B"
},
{
"expr": "sum by (job, instance, exported_instance) (ours_rp_cir_objects_by_source_type{exported_source=\"fresh\",object_type=\"manifest\"})",
"legendFormat": "fresh mft",
"refId": "C"
},
{
"expr": "sum by (job, instance, exported_instance) (ours_rp_cir_objects_by_source_type{exported_source=\"fresh\",object_type=\"certificate\"})",
"legendFormat": "fresh crt",
"refId": "D"
},
{
"expr": "sum by (job, instance, exported_instance) (ours_rp_cir_objects_by_source_type{exported_source=\"fresh\",object_type=\"crl\"})",
"legendFormat": "fresh crl",
"refId": "E"
}
],
"title": "Fresh PP / Object Counts by Run",
"type": "timeseries"
} }
], ],
"refresh": "5s", "refresh": "5s",

View File

@ -71,6 +71,10 @@ RPKI_PROGRESS_PP_CONTROL_SLOW_MS=100
# 是否在运行前尝试禁用 rpki-client timer 并杀掉竞争 RP 进程。 # 是否在运行前尝试禁用 rpki-client timer 并杀掉竞争 RP 进程。
DISABLE_COMPETING_RPS=1 DISABLE_COMPETING_RPS=1
# 是否启用实验性 child CRT 验证缓存。独立于 ROA cache / PP cache默认关闭。
# 开启后会额外传入 --enable-child-certificate-validation-cache。
ENABLE_CHILD_CERTIFICATE_VALIDATION_CACHE=0
# 传给 rpki 子进程的额外参数。多个参数用空格分隔。 # 传给 rpki 子进程的额外参数。多个参数用空格分隔。
# 示例RPKI_EXTRA_ARGS="--enable-roa-validation-cache" # 示例RPKI_EXTRA_ARGS="--enable-roa-validation-cache"
# 实验性 transport 预热RPKI_EXTRA_ARGS="--enable-transport-request-prefetch --enable-roa-validation-cache" # 实验性 transport 预热RPKI_EXTRA_ARGS="--enable-transport-request-prefetch --enable-roa-validation-cache"

View File

@ -30,6 +30,7 @@ RPKI_PROGRESS_PP_CONTROL_SLOW_MS="${RPKI_PROGRESS_PP_CONTROL_SLOW_MS:-100}"
RPKI_PROGRESS_PP_CACHE_SLOW_MS="${RPKI_PROGRESS_PP_CACHE_SLOW_MS:-50}" RPKI_PROGRESS_PP_CACHE_SLOW_MS="${RPKI_PROGRESS_PP_CACHE_SLOW_MS:-50}"
RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS="${RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS:-1000}" RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS="${RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS:-1000}"
DISABLE_COMPETING_RPS="${DISABLE_COMPETING_RPS:-1}" DISABLE_COMPETING_RPS="${DISABLE_COMPETING_RPS:-1}"
ENABLE_CHILD_CERTIFICATE_VALIDATION_CACHE="${ENABLE_CHILD_CERTIFICATE_VALIDATION_CACHE:-0}"
RPKI_EXTRA_ARGS="${RPKI_EXTRA_ARGS:-}" RPKI_EXTRA_ARGS="${RPKI_EXTRA_ARGS:-}"
RPKI_ANALYZE="${RPKI_ANALYZE:-0}" RPKI_ANALYZE="${RPKI_ANALYZE:-0}"
@ -571,6 +572,9 @@ build_child_args() {
--vaps-csv-out "{run_out}/vaps.csv" --vaps-csv-out "{run_out}/vaps.csv"
--compare-view-trust-anchor "$(compare_view_trust_anchor)" --compare-view-trust-anchor "$(compare_view_trust_anchor)"
) )
if is_true "$ENABLE_CHILD_CERTIFICATE_VALIDATION_CACHE"; then
CHILD_ARGS+=(--enable-child-certificate-validation-cache)
fi
if [[ -n "$RPKI_EXTRA_ARGS" ]]; then if [[ -n "$RPKI_EXTRA_ARGS" ]]; then
# shellcheck disable=SC2206 # shellcheck disable=SC2206
local extra_args=( $RPKI_EXTRA_ARGS ) local extra_args=( $RPKI_EXTRA_ARGS )

View File

@ -52,6 +52,7 @@ use std::sync::Arc;
struct RunStageTiming { struct RunStageTiming {
validation_ms: u64, validation_ms: u64,
enable_roa_validation_cache: bool, enable_roa_validation_cache: bool,
enable_child_certificate_validation_cache: bool,
publication_point_cache_observe_only: bool, publication_point_cache_observe_only: bool,
enable_publication_point_validation_cache: bool, enable_publication_point_validation_cache: bool,
enable_transport_request_prefetch: bool, enable_transport_request_prefetch: bool,
@ -78,6 +79,7 @@ struct RunStageTiming {
analysis_phases: HashMap<String, DurationStats>, analysis_phases: HashMap<String, DurationStats>,
analysis_top_publication_points: Vec<TopDurationEntry>, analysis_top_publication_points: Vec<TopDurationEntry>,
analysis_top_publication_point_steps: Vec<TopDurationEntry>, analysis_top_publication_point_steps: Vec<TopDurationEntry>,
analysis_top_publication_point_cache_steps: Vec<TopDurationEntry>,
vcir_storage_summary_ms: Option<u64>, vcir_storage_summary_ms: Option<u64>,
vcir_storage: Option<VcirStorageSummary>, vcir_storage: Option<VcirStorageSummary>,
publication_point_cache_index_load: Option<crate::storage::PpCacheIndexLoadStats>, publication_point_cache_index_load: Option<crate::storage::PpCacheIndexLoadStats>,
@ -133,6 +135,7 @@ pub struct CliArgs {
pub skip_report_build: bool, pub skip_report_build: bool,
pub skip_vcir_persist: bool, pub skip_vcir_persist: bool,
pub enable_roa_validation_cache: bool, pub enable_roa_validation_cache: bool,
pub enable_child_certificate_validation_cache: bool,
pub publication_point_cache_observe_only: bool, pub publication_point_cache_observe_only: bool,
pub enable_publication_point_validation_cache: bool, pub enable_publication_point_validation_cache: bool,
pub enable_transport_request_prefetch: bool, pub enable_transport_request_prefetch: bool,
@ -192,6 +195,8 @@ Options:
--skip-vcir-persist Skip VCIR persistence/projection building for compare-only runs --skip-vcir-persist Skip VCIR persistence/projection building for compare-only runs
--enable-roa-validation-cache --enable-roa-validation-cache
Reuse accepted ROA validation outputs from previous VCIR records (default: off) Reuse accepted ROA validation outputs from previous VCIR records (default: off)
--enable-child-certificate-validation-cache
Experimental: reuse validated child certificate discovery results
--publication-point-cache-observe-only --publication-point-cache-observe-only
Evaluate publication-point cache eligibility without changing results Evaluate publication-point cache eligibility without changing results
--enable-publication-point-validation-cache --enable-publication-point-validation-cache
@ -278,6 +283,7 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let mut skip_report_build: bool = false; let mut skip_report_build: bool = false;
let mut skip_vcir_persist: bool = false; let mut skip_vcir_persist: bool = false;
let mut enable_roa_validation_cache: bool = false; let mut enable_roa_validation_cache: bool = false;
let mut enable_child_certificate_validation_cache: bool = false;
let mut publication_point_cache_observe_only: bool = false; let mut publication_point_cache_observe_only: bool = false;
let mut enable_publication_point_validation_cache: bool = false; let mut enable_publication_point_validation_cache: bool = false;
let mut enable_transport_request_prefetch: bool = false; let mut enable_transport_request_prefetch: bool = false;
@ -486,6 +492,9 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
"--enable-roa-validation-cache" => { "--enable-roa-validation-cache" => {
enable_roa_validation_cache = true; enable_roa_validation_cache = true;
} }
"--enable-child-certificate-validation-cache" => {
enable_child_certificate_validation_cache = true;
}
"--publication-point-cache-observe-only" => { "--publication-point-cache-observe-only" => {
publication_point_cache_observe_only = true; publication_point_cache_observe_only = true;
} }
@ -925,6 +934,7 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
skip_report_build, skip_report_build,
skip_vcir_persist, skip_vcir_persist,
enable_roa_validation_cache, enable_roa_validation_cache,
enable_child_certificate_validation_cache,
publication_point_cache_observe_only, publication_point_cache_observe_only,
enable_publication_point_validation_cache, enable_publication_point_validation_cache,
enable_transport_request_prefetch, enable_transport_request_prefetch,
@ -1992,6 +2002,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
persist_vcir: !args.skip_vcir_persist, persist_vcir: !args.skip_vcir_persist,
build_ccr_accumulator: args.ccr_out_path.is_some(), build_ccr_accumulator: args.ccr_out_path.is_some(),
enable_roa_validation_cache: args.enable_roa_validation_cache, enable_roa_validation_cache: args.enable_roa_validation_cache,
enable_child_certificate_validation_cache: args.enable_child_certificate_validation_cache,
publication_point_cache_observe_only: args.publication_point_cache_observe_only, publication_point_cache_observe_only: args.publication_point_cache_observe_only,
enable_publication_point_validation_cache: args.enable_publication_point_validation_cache, enable_publication_point_validation_cache: args.enable_publication_point_validation_cache,
enable_transport_request_prefetch: args.enable_transport_request_prefetch, enable_transport_request_prefetch: args.enable_transport_request_prefetch,
@ -2487,6 +2498,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
let stage_timing = RunStageTiming { let stage_timing = RunStageTiming {
validation_ms, validation_ms,
enable_roa_validation_cache: args.enable_roa_validation_cache, enable_roa_validation_cache: args.enable_roa_validation_cache,
enable_child_certificate_validation_cache: args.enable_child_certificate_validation_cache,
publication_point_cache_observe_only: args.publication_point_cache_observe_only, publication_point_cache_observe_only: args.publication_point_cache_observe_only,
enable_publication_point_validation_cache: args.enable_publication_point_validation_cache, enable_publication_point_validation_cache: args.enable_publication_point_validation_cache,
enable_transport_request_prefetch: args.enable_transport_request_prefetch, enable_transport_request_prefetch: args.enable_transport_request_prefetch,
@ -2525,6 +2537,17 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.as_ref() .as_ref()
.map(|report| report.top_publication_point_steps.clone()) .map(|report| report.top_publication_point_steps.clone())
.unwrap_or_default(), .unwrap_or_default(),
analysis_top_publication_point_cache_steps: timing_report_snapshot
.as_ref()
.map(|report| {
report
.top_publication_point_steps
.iter()
.filter(|entry| entry.key.contains("::publication_point_cache_"))
.cloned()
.collect()
})
.unwrap_or_default(),
vcir_storage_summary_ms, vcir_storage_summary_ms,
vcir_storage, vcir_storage,
publication_point_cache_index_load, publication_point_cache_index_load,

View File

@ -147,6 +147,22 @@ fn parse_accepts_enable_roa_validation_cache() {
assert!(args.enable_roa_validation_cache); assert!(args.enable_roa_validation_cache);
} }
#[test]
fn parse_accepts_enable_child_certificate_validation_cache() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"x.tal".to_string(),
"--ta-path".to_string(),
"x.cer".to_string(),
"--enable-child-certificate-validation-cache".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert!(args.enable_child_certificate_validation_cache);
}
#[test] #[test]
fn parse_accepts_publication_point_cache_flags() { fn parse_accepts_publication_point_cache_flags() {
let argv = vec![ let argv = vec![
@ -194,6 +210,7 @@ fn parse_disables_roa_validation_cache_by_default() {
]; ];
let args = parse_args(&argv).expect("parse args"); let args = parse_args(&argv).expect("parse args");
assert!(!args.enable_roa_validation_cache); assert!(!args.enable_roa_validation_cache);
assert!(!args.enable_child_certificate_validation_cache);
assert!(!args.publication_point_cache_observe_only); assert!(!args.publication_point_cache_observe_only);
assert!(!args.enable_publication_point_validation_cache); assert!(!args.enable_publication_point_validation_cache);
assert!(!args.enable_transport_request_prefetch); assert!(!args.enable_transport_request_prefetch);
@ -1602,6 +1619,7 @@ fn run_report_task_and_stage_timing_work() {
let stage_timing = RunStageTiming { let stage_timing = RunStageTiming {
validation_ms: 1, validation_ms: 1,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -1628,6 +1646,7 @@ fn run_report_task_and_stage_timing_work() {
analysis_phases: std::collections::HashMap::new(), analysis_phases: std::collections::HashMap::new(),
analysis_top_publication_points: Vec::new(), analysis_top_publication_points: Vec::new(),
analysis_top_publication_point_steps: Vec::new(), analysis_top_publication_point_steps: Vec::new(),
analysis_top_publication_point_cache_steps: Vec::new(),
vcir_storage_summary_ms: Some(16), vcir_storage_summary_ms: Some(16),
vcir_storage: Some(VcirStorageSummary { vcir_storage: Some(VcirStorageSummary {
entry_count: 2, entry_count: 2,
@ -1706,6 +1725,7 @@ fn stage_timing_serializes_memory_telemetry() {
let stage_timing = RunStageTiming { let stage_timing = RunStageTiming {
validation_ms: 1, validation_ms: 1,
enable_roa_validation_cache: true, enable_roa_validation_cache: true,
enable_child_certificate_validation_cache: true,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: true, enable_transport_request_prefetch: true,
@ -1738,6 +1758,7 @@ fn stage_timing_serializes_memory_telemetry() {
analysis_phases: std::collections::HashMap::new(), analysis_phases: std::collections::HashMap::new(),
analysis_top_publication_points: Vec::new(), analysis_top_publication_points: Vec::new(),
analysis_top_publication_point_steps: Vec::new(), analysis_top_publication_point_steps: Vec::new(),
analysis_top_publication_point_cache_steps: Vec::new(),
vcir_storage_summary_ms: None, vcir_storage_summary_ms: None,
vcir_storage: None, vcir_storage: None,
publication_point_cache_index_load: None, publication_point_cache_index_load: None,

View File

@ -710,14 +710,14 @@ mod tests {
}; };
use crate::policy::SyncPreference; use crate::policy::SyncPreference;
use crate::report::Warning; use crate::report::Warning;
use crate::validation::tree::{CaInstanceHandle, DiscoveredChildCaInstance}; use crate::validation::tree::{CaCertificateRef, CaInstanceHandle, DiscoveredChildCaInstance};
fn sample_ca(manifest: &str) -> CaInstanceHandle { fn sample_ca(manifest: &str) -> CaInstanceHandle {
CaInstanceHandle { CaInstanceHandle {
depth: 0, depth: 0,
tal_id: "arin".to_string(), tal_id: "arin".to_string(),
parent_manifest_rsync_uri: None, parent_manifest_rsync_uri: None,
ca_certificate_der: vec![1, 2, 3], ca_certificate: CaCertificateRef::inline_der(vec![1, 2, 3]),
ca_certificate_rsync_uri: None, ca_certificate_rsync_uri: None,
effective_ip_resources: None, effective_ip_resources: None,
effective_as_resources: None, effective_as_resources: None,

View File

@ -16,17 +16,18 @@ use crate::data_model::rc::{AsResourceSet, IpResourceSet};
use config::*; use config::*;
pub use config::{ pub use config::{
ALL_COLUMN_FAMILY_NAMES, CF_MANIFEST_REPLAY_META, CF_PUBLICATION_POINT_CACHE_PROJECTION, ALL_COLUMN_FAMILY_NAMES, CF_CHILD_CERTIFICATE_CACHE_PROJECTION, CF_MANIFEST_REPLAY_META,
CF_RAW_BY_HASH, CF_REPOSITORY_VIEW, CF_ROA_CACHE_PROJECTION, CF_RRDP_SOURCE, CF_PUBLICATION_POINT_CACHE_PROJECTION, CF_RAW_BY_HASH, CF_REPOSITORY_VIEW,
CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, CF_TRANSPORT_PREFETCH, CF_VCIR, CF_ROA_CACHE_PROJECTION, CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER,
column_family_descriptors, CF_TRANSPORT_PREFETCH, CF_VCIR, column_family_descriptors,
}; };
use keys::*; use keys::*;
use pack::compute_sha256_32; use pack::compute_sha256_32;
pub use pack::{PackBytes, PackFile, PackTime}; pub use pack::{PackBytes, PackFile, PackTime};
pub use pp_cache_index::{PpCacheIndexLoadStats, PpCacheIndexRefreshStats}; pub use pp_cache_index::{PpCacheIndexLoadStats, PpCacheIndexRefreshStats};
use pp_cache_index::{ use pp_cache_index::{
PpCacheMmapIndexSet, default_pp_cache_index_dir, load_pp_cache_mmap_index_set, PpCacheIndexLookup, PpCacheMmapIndexSet, compact_pp_cache_index, default_pp_cache_index_dir,
load_pp_cache_mmap_index, load_pp_cache_mmap_index_set, pp_cache_index_directory_stats,
write_pp_cache_index_atomic, write_pp_cache_index_segment, write_pp_cache_index_atomic, write_pp_cache_index_segment,
}; };
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -52,11 +53,20 @@ pub enum StorageError {
pub type StorageResult<T> = Result<T, StorageError>; pub type StorageResult<T> = Result<T, StorageError>;
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct ChildCertificateCacheMmapLookup {
pub projections: Vec<Option<ChildCertificateCacheProjection>>,
pub hits: usize,
pub misses: usize,
pub file_bytes: u64,
}
pub struct RocksStore { pub struct RocksStore {
db: DB, db: DB,
external_raw_store: Option<ExternalRawStoreDb>, external_raw_store: Option<ExternalRawStoreDb>,
external_repo_bytes: Option<ExternalRepoBytesDb>, external_repo_bytes: Option<ExternalRepoBytesDb>,
publication_point_cache_index_dir: PathBuf, publication_point_cache_index_dir: PathBuf,
child_certificate_cache_index_dir: PathBuf,
publication_point_cache_projection_index: Mutex<PublicationPointCacheProjectionIndexState>, publication_point_cache_projection_index: Mutex<PublicationPointCacheProjectionIndexState>,
} }
@ -95,6 +105,21 @@ fn process_vm_rss_kb() -> Option<u64> {
}) })
} }
fn default_child_certificate_cache_index_dir(db_path: &Path) -> PathBuf {
let file_name = db_path
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("work-db");
db_path.with_file_name(format!("{file_name}.child-cert-cache-index"))
}
fn child_certificate_cache_segment_file_name(manifest_rsync_uri: &str) -> String {
format!(
"{}.idx",
hex::encode(compute_sha256_32(manifest_rsync_uri.as_bytes()))
)
}
const ROCKSDB_MEMORY_PROPERTY_NAMES: &[(&str, &str)] = &[ const ROCKSDB_MEMORY_PROPERTY_NAMES: &[(&str, &str)] = &[
("cur_size_all_mem_tables", "rocksdb.cur-size-all-mem-tables"), ("cur_size_all_mem_tables", "rocksdb.cur-size-all-mem-tables"),
("size_all_mem_tables", "rocksdb.size-all-mem-tables"), ("size_all_mem_tables", "rocksdb.size-all-mem-tables"),
@ -116,6 +141,8 @@ const PP_CACHE_RAW_INDEX_ENV: &str = "RPKI_PP_CACHE_RAW_INDEX";
const PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_ENV: &str = const PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_ENV: &str =
"RPKI_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES"; "RPKI_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES";
const DEFAULT_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES: usize = 32 * 1024 * 1024; const DEFAULT_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES: usize = 32 * 1024 * 1024;
const PP_CACHE_INDEX_COMPACTION_SEGMENT_THRESHOLD: usize = 16;
const PP_CACHE_INDEX_COMPACTION_BYTES_THRESHOLD: u64 = 1_610_612_736;
fn pp_cache_raw_index_enabled() -> bool { fn pp_cache_raw_index_enabled() -> bool {
match std::env::var(PP_CACHE_RAW_INDEX_ENV) { match std::env::var(PP_CACHE_RAW_INDEX_ENV) {
@ -1179,6 +1206,216 @@ pub struct RoaCacheProjection {
pub entries: Vec<RoaCacheObjectProjection>, pub entries: Vec<RoaCacheObjectProjection>,
} }
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChildCertificateCacheRouterKeyProjection {
#[serde(rename = "a")]
pub as_id: u32,
#[serde(rename = "s")]
#[serde(with = "serde_byte_vec")]
pub ski: Vec<u8>,
#[serde(rename = "p")]
#[serde(with = "serde_byte_vec")]
pub spki_der: Vec<u8>,
#[serde(rename = "e")]
pub item_effective_until: PackTime,
}
impl ChildCertificateCacheRouterKeyProjection {
pub fn validate_internal(&self) -> StorageResult<()> {
if self.ski.is_empty() {
return Err(StorageError::InvalidData {
entity: "child_certificate_cache_projection.router_keys[].ski",
detail: "must not be empty".to_string(),
});
}
if self.spki_der.is_empty() {
return Err(StorageError::InvalidData {
entity: "child_certificate_cache_projection.router_keys[].spki_der",
detail: "must not be empty".to_string(),
});
}
parse_time(
"child_certificate_cache_projection.router_keys[].item_effective_until",
&self.item_effective_until,
)?;
Ok(())
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ChildCertificateCachePayload {
ChildCa {
#[serde(rename = "cm")]
child_manifest_rsync_uri: String,
#[serde(rename = "ski")]
child_ski: String,
#[serde(rename = "rb")]
child_rsync_base_uri: String,
#[serde(rename = "pp")]
child_publication_point_rsync_uri: String,
#[serde(rename = "rn")]
child_rrdp_notification_uri: Option<String>,
#[serde(rename = "ip")]
child_effective_ip_resources: Option<IpResourceSet>,
#[serde(rename = "as")]
child_effective_as_resources: Option<AsResourceSet>,
},
Router {
#[serde(rename = "r")]
router_keys: Vec<ChildCertificateCacheRouterKeyProjection>,
},
}
impl ChildCertificateCachePayload {
pub fn validate_internal(&self) -> StorageResult<()> {
match self {
Self::ChildCa {
child_manifest_rsync_uri,
child_ski,
child_rsync_base_uri,
child_publication_point_rsync_uri,
child_rrdp_notification_uri,
..
} => {
validate_non_empty(
"child_certificate_cache_projection.child_manifest_rsync_uri",
child_manifest_rsync_uri,
)?;
validate_non_empty("child_certificate_cache_projection.child_ski", child_ski)?;
validate_non_empty(
"child_certificate_cache_projection.child_rsync_base_uri",
child_rsync_base_uri,
)?;
validate_non_empty(
"child_certificate_cache_projection.child_publication_point_rsync_uri",
child_publication_point_rsync_uri,
)?;
if let Some(uri) = child_rrdp_notification_uri {
validate_non_empty(
"child_certificate_cache_projection.child_rrdp_notification_uri",
uri,
)?;
}
Ok(())
}
Self::Router { router_keys } => {
if router_keys.is_empty() {
return Err(StorageError::InvalidData {
entity: "child_certificate_cache_projection.router_keys",
detail: "must not be empty".to_string(),
});
}
for router_key in router_keys {
router_key.validate_internal()?;
}
Ok(())
}
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChildCertificateCacheProjection {
#[serde(rename = "sv")]
pub schema_version: u32,
#[serde(rename = "av")]
pub algorithm_version: u32,
#[serde(rename = "k")]
pub cache_key_sha256_hex: String,
#[serde(rename = "cu")]
pub child_cert_uri: String,
#[serde(rename = "ch")]
pub child_cert_sha256_hex: String,
#[serde(rename = "cs")]
#[serde(with = "serde_byte_vec")]
pub child_cert_serial: Vec<u8>,
#[serde(rename = "ih")]
pub issuer_ca_sha256_hex: String,
#[serde(rename = "cru")]
pub issuer_crl_uri: String,
#[serde(rename = "crh")]
pub issuer_crl_sha256_hex: String,
#[serde(rename = "pc")]
#[serde(with = "serde_bytes_32")]
pub parent_context_digest: [u8; 32],
#[serde(rename = "pf")]
#[serde(with = "serde_bytes_32")]
pub validation_policy_fingerprint: [u8; 32],
#[serde(rename = "nb")]
pub effective_not_before: PackTime,
#[serde(rename = "nu")]
pub effective_until: PackTime,
#[serde(rename = "p")]
pub payload: ChildCertificateCachePayload,
}
pub const CHILD_CERTIFICATE_CACHE_SCHEMA_VERSION: u32 = 2;
pub const CHILD_CERTIFICATE_CACHE_ALGORITHM_VERSION: u32 = 3;
impl ChildCertificateCacheProjection {
pub fn validate_internal(&self) -> StorageResult<()> {
if self.schema_version != CHILD_CERTIFICATE_CACHE_SCHEMA_VERSION {
return Err(StorageError::InvalidData {
entity: "child_certificate_cache_projection.schema_version",
detail: format!("unsupported schema_version {}", self.schema_version),
});
}
if self.algorithm_version != CHILD_CERTIFICATE_CACHE_ALGORITHM_VERSION {
return Err(StorageError::InvalidData {
entity: "child_certificate_cache_projection.algorithm_version",
detail: format!("unsupported algorithm_version {}", self.algorithm_version),
});
}
validate_sha256_hex(
"child_certificate_cache_projection.cache_key_sha256_hex",
&self.cache_key_sha256_hex,
)?;
validate_non_empty(
"child_certificate_cache_projection.child_cert_uri",
&self.child_cert_uri,
)?;
validate_sha256_hex(
"child_certificate_cache_projection.child_cert_sha256_hex",
&self.child_cert_sha256_hex,
)?;
if self.child_cert_serial.is_empty() {
return Err(StorageError::InvalidData {
entity: "child_certificate_cache_projection.child_cert_serial",
detail: "must not be empty".to_string(),
});
}
validate_sha256_hex(
"child_certificate_cache_projection.issuer_ca_sha256_hex",
&self.issuer_ca_sha256_hex,
)?;
validate_non_empty(
"child_certificate_cache_projection.issuer_crl_uri",
&self.issuer_crl_uri,
)?;
validate_sha256_hex(
"child_certificate_cache_projection.issuer_crl_sha256_hex",
&self.issuer_crl_sha256_hex,
)?;
let effective_not_before = parse_time(
"child_certificate_cache_projection.effective_not_before",
&self.effective_not_before,
)?;
let effective_until = parse_time(
"child_certificate_cache_projection.effective_until",
&self.effective_until,
)?;
if effective_not_before >= effective_until {
return Err(StorageError::InvalidData {
entity: "child_certificate_cache_projection.effective_window",
detail: "effective_not_before must be before effective_until".to_string(),
});
}
self.payload.validate_internal()?;
Ok(())
}
}
pub const PUBLICATION_POINT_CACHE_SCHEMA_VERSION: u32 = 1; pub const PUBLICATION_POINT_CACHE_SCHEMA_VERSION: u32 = 1;
pub const PUBLICATION_POINT_CACHE_ALGORITHM_VERSION: u32 = 1; pub const PUBLICATION_POINT_CACHE_ALGORITHM_VERSION: u32 = 1;
@ -2584,6 +2821,7 @@ impl RocksStore {
external_raw_store: None, external_raw_store: None,
external_repo_bytes: None, external_repo_bytes: None,
publication_point_cache_index_dir: default_pp_cache_index_dir(path), publication_point_cache_index_dir: default_pp_cache_index_dir(path),
child_certificate_cache_index_dir: default_child_certificate_cache_index_dir(path),
publication_point_cache_projection_index: Mutex::new( publication_point_cache_projection_index: Mutex::new(
PublicationPointCacheProjectionIndexState::Uninitialized, PublicationPointCacheProjectionIndexState::Uninitialized,
), ),
@ -2658,6 +2896,59 @@ impl RocksStore {
} }
} }
fn try_load_publication_point_cache_mmap_index_for_update(
&self,
reason: &'static str,
) -> StorageResult<()> {
if !pp_cache_raw_index_enabled() {
return Ok(());
}
let mut guard = self
.publication_point_cache_projection_index
.lock()
.map_err(|e| {
StorageError::RocksDb(format!("publication point cache index lock poisoned: {e}"))
})?;
if !matches!(
*guard,
PublicationPointCacheProjectionIndexState::Uninitialized
) {
return Ok(());
}
match load_pp_cache_mmap_index_set(&self.publication_point_cache_index_dir) {
Ok((mmap, stats)) => {
crate::progress_log::emit(
"publication_point_cache_mmap_index_load",
serde_json::json!({
"state": "loaded",
"reason": reason,
"entries": stats.entries,
"bytes": stats.bytes,
"file_bytes": stats.file_bytes,
"load_ms": stats.load_ms,
}),
);
*guard = PublicationPointCacheProjectionIndexState::LoadedMmap {
mmap,
dirty: HashMap::new(),
dirty_bytes: 0,
load_stats: stats,
};
}
Err(e) => {
crate::progress_log::emit(
"publication_point_cache_mmap_index_load",
serde_json::json!({
"state": "deferred_fallback_scan",
"reason": reason,
"error": e.to_string(),
}),
);
}
}
Ok(())
}
fn cf(&self, name: &'static str) -> StorageResult<&ColumnFamily> { fn cf(&self, name: &'static str) -> StorageResult<&ColumnFamily> {
self.db self.db
.cf_handle(name) .cf_handle(name)
@ -3210,6 +3501,226 @@ impl RocksStore {
Ok(Some(projection)) Ok(Some(projection))
} }
pub fn put_child_certificate_cache_projection(
&self,
projection: &ChildCertificateCacheProjection,
) -> StorageResult<()> {
projection.validate_internal()?;
let cf = self.cf(CF_CHILD_CERTIFICATE_CACHE_PROJECTION)?;
let key = child_certificate_cache_projection_key(&projection.cache_key_sha256_hex);
let value = encode_cbor(projection, "child_certificate_cache_projection")?;
self.db
.put_cf(cf, key.as_bytes(), value)
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
}
pub fn get_child_certificate_cache_projection(
&self,
cache_key_sha256_hex: &str,
) -> StorageResult<Option<ChildCertificateCacheProjection>> {
validate_sha256_hex(
"child_certificate_cache_projection.cache_key_sha256_hex",
cache_key_sha256_hex,
)?;
let cf = self.cf(CF_CHILD_CERTIFICATE_CACHE_PROJECTION)?;
let key = child_certificate_cache_projection_key(cache_key_sha256_hex);
let Some(bytes) = self
.db
.get_cf(cf, key.as_bytes())
.map_err(|e| StorageError::RocksDb(e.to_string()))?
else {
return Ok(None);
};
let projection = decode_cbor::<ChildCertificateCacheProjection>(
&bytes,
"child_certificate_cache_projection",
)?;
projection.validate_internal()?;
Ok(Some(projection))
}
pub fn get_child_certificate_cache_projections_batch(
&self,
cache_key_sha256_hexes: &[String],
) -> StorageResult<Vec<Option<ChildCertificateCacheProjection>>> {
if cache_key_sha256_hexes.is_empty() {
return Ok(Vec::new());
}
for cache_key_sha256_hex in cache_key_sha256_hexes {
validate_sha256_hex(
"child_certificate_cache_projection.cache_key_sha256_hex",
cache_key_sha256_hex,
)?;
}
let cf = self.cf(CF_CHILD_CERTIFICATE_CACHE_PROJECTION)?;
let keys: Vec<String> = cache_key_sha256_hexes
.iter()
.map(|key| child_certificate_cache_projection_key(key))
.collect();
self.db
.multi_get_cf(keys.iter().map(|key| (cf, key.as_bytes())))
.into_iter()
.map(|res| {
let Some(bytes) = res.map_err(|e| StorageError::RocksDb(e.to_string()))? else {
return Ok(None);
};
let projection = decode_cbor::<ChildCertificateCacheProjection>(
&bytes,
"child_certificate_cache_projection",
)?;
projection.validate_internal()?;
Ok(Some(projection))
})
.collect()
}
fn child_certificate_cache_segment_path(&self, manifest_rsync_uri: &str) -> PathBuf {
self.child_certificate_cache_index_dir
.join(child_certificate_cache_segment_file_name(
manifest_rsync_uri,
))
}
pub fn get_child_certificate_cache_projections_mmap_segment(
&self,
manifest_rsync_uri: &str,
cache_key_sha256_hexes: &[String],
) -> StorageResult<Option<ChildCertificateCacheMmapLookup>> {
if cache_key_sha256_hexes.is_empty() {
return Ok(Some(ChildCertificateCacheMmapLookup::default()));
}
validate_non_empty(
"child_certificate_cache_mmap_segment.manifest_rsync_uri",
manifest_rsync_uri,
)?;
for cache_key_sha256_hex in cache_key_sha256_hexes {
validate_sha256_hex(
"child_certificate_cache_projection.cache_key_sha256_hex",
cache_key_sha256_hex,
)?;
}
let path = self.child_certificate_cache_segment_path(manifest_rsync_uri);
if !path.exists() {
return Ok(None);
}
let (index, _) = load_pp_cache_mmap_index(&path)?;
let file_bytes = index.file_bytes();
let mut hits = 0usize;
let mut misses = 0usize;
let mut projections = Vec::with_capacity(cache_key_sha256_hexes.len());
for cache_key_sha256_hex in cache_key_sha256_hexes {
match index.lookup(cache_key_sha256_hex) {
Some(PpCacheIndexLookup::Hit(bytes)) => {
let projection = decode_cbor::<ChildCertificateCacheProjection>(
bytes,
"child_certificate_cache_projection_mmap_segment",
)?;
projection.validate_internal()?;
hits = hits.saturating_add(1);
projections.push(Some(projection));
}
Some(PpCacheIndexLookup::Deleted) | None => {
misses = misses.saturating_add(1);
projections.push(None);
}
}
}
Ok(Some(ChildCertificateCacheMmapLookup {
projections,
hits,
misses,
file_bytes,
}))
}
pub fn write_child_certificate_cache_mmap_segment(
&self,
manifest_rsync_uri: &str,
projections: &[ChildCertificateCacheProjection],
) -> StorageResult<PpCacheIndexRefreshStats> {
validate_non_empty(
"child_certificate_cache_mmap_segment.manifest_rsync_uri",
manifest_rsync_uri,
)?;
let mut entries = Vec::with_capacity(projections.len());
for projection in projections {
projection.validate_internal()?;
entries.push((
projection.cache_key_sha256_hex.clone(),
encode_cbor(
projection,
"child_certificate_cache_projection_mmap_segment",
)?,
));
}
let path = self.child_certificate_cache_segment_path(manifest_rsync_uri);
write_pp_cache_index_atomic(&path, entries)
}
pub fn write_child_certificate_cache_mmap_segment_overlay(
&self,
manifest_rsync_uri: &str,
cache_key_sha256_hexes: &[String],
projections: &[ChildCertificateCacheProjection],
) -> StorageResult<PpCacheIndexRefreshStats> {
validate_non_empty(
"child_certificate_cache_mmap_segment.manifest_rsync_uri",
manifest_rsync_uri,
)?;
let mut dirty = HashMap::<String, Vec<u8>>::with_capacity(projections.len());
for projection in projections {
projection.validate_internal()?;
dirty.insert(
projection.cache_key_sha256_hex.clone(),
encode_cbor(
projection,
"child_certificate_cache_projection_mmap_segment",
)?,
);
}
let path = self.child_certificate_cache_segment_path(manifest_rsync_uri);
let existing = if path.exists() {
Some(load_pp_cache_mmap_index(&path)?.0)
} else {
None
};
let mut entries = Vec::with_capacity(cache_key_sha256_hexes.len());
let mut emitted = HashSet::<String>::new();
for cache_key_sha256_hex in cache_key_sha256_hexes {
validate_sha256_hex(
"child_certificate_cache_projection.cache_key_sha256_hex",
cache_key_sha256_hex,
)?;
if !emitted.insert(cache_key_sha256_hex.clone()) {
continue;
}
if let Some(value) = dirty.remove(cache_key_sha256_hex) {
entries.push((cache_key_sha256_hex.clone(), value));
continue;
}
if let Some(existing) = existing.as_ref() {
if let Some(PpCacheIndexLookup::Hit(bytes)) = existing.lookup(cache_key_sha256_hex)
{
entries.push((cache_key_sha256_hex.clone(), bytes.to_vec()));
}
}
}
for (key, value) in dirty {
if emitted.insert(key.clone()) {
entries.push((key, value));
}
}
write_pp_cache_index_atomic(&path, entries)
}
pub fn get_publication_point_cache_projection( pub fn get_publication_point_cache_projection(
&self, &self,
manifest_rsync_uri: &str, manifest_rsync_uri: &str,
@ -3322,14 +3833,22 @@ impl RocksStore {
} }
PublicationPointCacheProjectionIndexState::LoadedMmap { mmap, dirty, .. } => { PublicationPointCacheProjectionIndexState::LoadedMmap { mmap, dirty, .. } => {
if let Some(bytes) = dirty.get(manifest_rsync_uri).cloned() { if let Some(bytes) = dirty.get(manifest_rsync_uri).cloned() {
if bytes.is_empty() {
return Ok(None);
}
owned_bytes = Some(bytes); owned_bytes = Some(bytes);
} else if let Some(bytes) = mmap.get(manifest_rsync_uri) { } else if let Some(lookup) = mmap.lookup(manifest_rsync_uri) {
let projection = decode_cbor::<PublicationPointCacheProjection>( match lookup {
bytes, PpCacheIndexLookup::Hit(bytes) => {
"publication_point_cache_projection", let projection = decode_cbor::<PublicationPointCacheProjection>(
)?; bytes,
projection.validate_internal()?; "publication_point_cache_projection",
return Ok(Some(projection)); )?;
projection.validate_internal()?;
return Ok(Some(projection));
}
PpCacheIndexLookup::Deleted => return Ok(None),
}
} }
} }
PublicationPointCacheProjectionIndexState::Disabled PublicationPointCacheProjectionIndexState::Disabled
@ -3426,6 +3945,7 @@ impl RocksStore {
&self, &self,
projection: &PublicationPointCacheProjection, projection: &PublicationPointCacheProjection,
) -> StorageResult<()> { ) -> StorageResult<()> {
self.try_load_publication_point_cache_mmap_index_for_update("write")?;
let mut guard = self let mut guard = self
.publication_point_cache_projection_index .publication_point_cache_projection_index
.lock() .lock()
@ -3485,76 +4005,42 @@ impl RocksStore {
&self, &self,
manifest_rsync_uri: &str, manifest_rsync_uri: &str,
) -> StorageResult<()> { ) -> StorageResult<()> {
let mut invalidate_mmap_files = false; self.try_load_publication_point_cache_mmap_index_for_update("delete")?;
{ let mut guard = self
let mut guard = self .publication_point_cache_projection_index
.publication_point_cache_projection_index .lock()
.lock() .map_err(|e| {
.map_err(|e| { StorageError::RocksDb(format!("publication point cache index lock poisoned: {e}"))
StorageError::RocksDb(format!( })?;
"publication point cache index lock poisoned: {e}" match &mut *guard {
)) PublicationPointCacheProjectionIndexState::Loaded {
})?; index,
match &mut *guard { bytes: total_bytes,
PublicationPointCacheProjectionIndexState::Loaded { } => {
index, if let Some(previous) = index.remove(manifest_rsync_uri) {
bytes: total_bytes, *total_bytes = total_bytes.saturating_sub(previous.len());
} => {
if let Some(previous) = index.remove(manifest_rsync_uri) {
*total_bytes = total_bytes.saturating_sub(previous.len());
}
} }
PublicationPointCacheProjectionIndexState::BuildingFromEmpty { }
index, PublicationPointCacheProjectionIndexState::BuildingFromEmpty {
bytes: total_bytes, index,
.. bytes: total_bytes,
} => { ..
if let Some(previous) = index.remove(manifest_rsync_uri) { } => {
*total_bytes = total_bytes.saturating_sub(previous.len()); if let Some(previous) = index.remove(manifest_rsync_uri) {
} *total_bytes = total_bytes.saturating_sub(previous.len());
} }
PublicationPointCacheProjectionIndexState::LoadedMmap { .. } => { }
*guard = PublicationPointCacheProjectionIndexState::Disabled; PublicationPointCacheProjectionIndexState::LoadedMmap {
invalidate_mmap_files = true; dirty, dirty_bytes, ..
} => {
if let Some(previous) =
dirty.insert(manifest_rsync_uri.to_string(), Arc::<[u8]>::from([]))
{
*dirty_bytes = dirty_bytes.saturating_sub(previous.len());
} }
PublicationPointCacheProjectionIndexState::Uninitialized
| PublicationPointCacheProjectionIndexState::Disabled => {}
}
}
if invalidate_mmap_files {
self.remove_publication_point_cache_mmap_index_files()?;
let mut guard = self
.publication_point_cache_projection_index
.lock()
.map_err(|e| {
StorageError::RocksDb(format!(
"publication point cache index lock poisoned: {e}"
))
})?;
if matches!(*guard, PublicationPointCacheProjectionIndexState::Disabled) {
*guard = PublicationPointCacheProjectionIndexState::Uninitialized;
}
}
Ok(())
}
fn remove_publication_point_cache_mmap_index_files(&self) -> StorageResult<()> {
let dir = &self.publication_point_cache_index_dir;
if !dir.exists() {
return Ok(());
}
for entry in std::fs::read_dir(dir).map_err(|e| StorageError::RocksDb(e.to_string()))? {
let entry = entry.map_err(|e| StorageError::RocksDb(e.to_string()))?;
let path = entry.path();
let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
continue;
};
if file_name == "current.idx"
|| (file_name.starts_with("segment-") && file_name.ends_with(".idx"))
{
std::fs::remove_file(&path).map_err(|e| StorageError::RocksDb(e.to_string()))?;
} }
PublicationPointCacheProjectionIndexState::Uninitialized
| PublicationPointCacheProjectionIndexState::Disabled => {}
} }
Ok(()) Ok(())
} }
@ -3565,6 +4051,7 @@ impl RocksStore {
if !pp_cache_raw_index_enabled() { if !pp_cache_raw_index_enabled() {
return Ok(None); return Ok(None);
} }
self.try_load_publication_point_cache_mmap_index_for_update("refresh")?;
enum RefreshAction { enum RefreshAction {
Entries { Entries {
entries: Vec<(String, Vec<u8>)>, entries: Vec<(String, Vec<u8>)>,
@ -3647,6 +4134,81 @@ impl RocksStore {
"write_ms": stats.write_ms, "write_ms": stats.write_ms,
}), }),
); );
let directory_stats =
pp_cache_index_directory_stats(&self.publication_point_cache_index_dir)?;
let compaction_reason = if directory_stats.segment_count
>= PP_CACHE_INDEX_COMPACTION_SEGMENT_THRESHOLD
{
Some(format!(
"segment_count>={PP_CACHE_INDEX_COMPACTION_SEGMENT_THRESHOLD}"
))
} else if directory_stats.total_file_bytes >= PP_CACHE_INDEX_COMPACTION_BYTES_THRESHOLD {
Some(format!(
"total_file_bytes>={PP_CACHE_INDEX_COMPACTION_BYTES_THRESHOLD}"
))
} else {
None
};
if let Some(reason) = compaction_reason {
crate::progress_log::emit(
"publication_point_cache_mmap_index_compaction",
serde_json::json!({
"state": "started",
"reason": reason,
"segment_count": directory_stats.segment_count,
"total_file_bytes": directory_stats.total_file_bytes,
}),
);
match compact_pp_cache_index(&self.publication_point_cache_index_dir) {
Ok(mut compact_stats) => {
compact_stats.compaction_reason = Some(reason);
compact_stats.old_entries = stats.old_entries;
compact_stats.dirty_entries = stats.dirty_entries;
crate::progress_log::emit(
"publication_point_cache_mmap_index_compaction",
serde_json::json!({
"state": "completed",
"reason": compact_stats.compaction_reason,
"segments_before": compact_stats.compaction_segments_before,
"total_file_bytes_before": compact_stats.compaction_total_file_bytes_before,
"live_entries": compact_stats.compaction_live_entries,
"file_bytes": compact_stats.compaction_file_bytes,
"reclaimed_bytes": compact_stats.compaction_reclaimed_bytes,
"deleted_segments": compact_stats.compaction_deleted_segments,
"compaction_ms": compact_stats.compaction_ms,
}),
);
stats.compaction_triggered = compact_stats.compaction_triggered;
stats.compaction_reason = compact_stats.compaction_reason;
stats.compaction_segments_before = compact_stats.compaction_segments_before;
stats.compaction_total_file_bytes_before =
compact_stats.compaction_total_file_bytes_before;
stats.compaction_live_entries = compact_stats.compaction_live_entries;
stats.compaction_file_bytes = compact_stats.compaction_file_bytes;
stats.compaction_reclaimed_bytes = compact_stats.compaction_reclaimed_bytes;
stats.compaction_ms = compact_stats.compaction_ms;
stats.compaction_deleted_segments = compact_stats.compaction_deleted_segments;
}
Err(e) => {
let error = e.to_string();
crate::progress_log::emit(
"publication_point_cache_mmap_index_compaction",
serde_json::json!({
"state": "failed",
"reason": reason,
"segment_count": directory_stats.segment_count,
"total_file_bytes": directory_stats.total_file_bytes,
"error": error,
}),
);
stats.compaction_triggered = true;
stats.compaction_reason = Some(reason);
stats.compaction_segments_before = directory_stats.segment_count;
stats.compaction_total_file_bytes_before = directory_stats.total_file_bytes;
stats.compaction_error = Some(error);
}
}
}
let mut guard = self let mut guard = self
.publication_point_cache_projection_index .publication_point_cache_projection_index
.lock() .lock()

View File

@ -7,6 +7,7 @@ pub const CF_VCIR: &str = "vcir";
pub const CF_MANIFEST_REPLAY_META: &str = "manifest_replay_meta"; pub const CF_MANIFEST_REPLAY_META: &str = "manifest_replay_meta";
pub const CF_ROA_CACHE_PROJECTION: &str = "roa_cache_projection"; pub const CF_ROA_CACHE_PROJECTION: &str = "roa_cache_projection";
pub const CF_PUBLICATION_POINT_CACHE_PROJECTION: &str = "publication_point_cache_projection"; pub const CF_PUBLICATION_POINT_CACHE_PROJECTION: &str = "publication_point_cache_projection";
pub const CF_CHILD_CERTIFICATE_CACHE_PROJECTION: &str = "child_certificate_cache_projection";
pub const CF_RRDP_SOURCE: &str = "rrdp_source"; pub const CF_RRDP_SOURCE: &str = "rrdp_source";
pub const CF_RRDP_SOURCE_MEMBER: &str = "rrdp_source_member"; pub const CF_RRDP_SOURCE_MEMBER: &str = "rrdp_source_member";
pub const CF_RRDP_URI_OWNER: &str = "rrdp_uri_owner"; pub const CF_RRDP_URI_OWNER: &str = "rrdp_uri_owner";
@ -20,6 +21,7 @@ pub const ALL_COLUMN_FAMILY_NAMES: &[&str] = &[
CF_MANIFEST_REPLAY_META, CF_MANIFEST_REPLAY_META,
CF_ROA_CACHE_PROJECTION, CF_ROA_CACHE_PROJECTION,
CF_PUBLICATION_POINT_CACHE_PROJECTION, CF_PUBLICATION_POINT_CACHE_PROJECTION,
CF_CHILD_CERTIFICATE_CACHE_PROJECTION,
CF_RRDP_SOURCE, CF_RRDP_SOURCE,
CF_RRDP_SOURCE_MEMBER, CF_RRDP_SOURCE_MEMBER,
CF_RRDP_URI_OWNER, CF_RRDP_URI_OWNER,
@ -34,6 +36,8 @@ pub(super) const MANIFEST_REPLAY_META_KEY_PREFIX: &str = "manifest_replay_meta:"
pub(super) const ROA_CACHE_PROJECTION_KEY_PREFIX: &str = "roa_cache_projection:"; pub(super) const ROA_CACHE_PROJECTION_KEY_PREFIX: &str = "roa_cache_projection:";
pub(super) const PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX: &str = pub(super) const PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX: &str =
"publication_point_cache_projection:"; "publication_point_cache_projection:";
pub(super) const CHILD_CERTIFICATE_CACHE_PROJECTION_KEY_PREFIX: &str =
"child_certificate_cache_projection:";
pub(super) const RRDP_SOURCE_KEY_PREFIX: &str = "rrdp_source:"; pub(super) const RRDP_SOURCE_KEY_PREFIX: &str = "rrdp_source:";
pub(super) const RRDP_SOURCE_MEMBER_KEY_PREFIX: &str = "rrdp_source_member:"; pub(super) const RRDP_SOURCE_MEMBER_KEY_PREFIX: &str = "rrdp_source_member:";
pub(super) const RRDP_URI_OWNER_KEY_PREFIX: &str = "rrdp_uri_owner:"; pub(super) const RRDP_URI_OWNER_KEY_PREFIX: &str = "rrdp_uri_owner:";

View File

@ -38,6 +38,10 @@ pub(super) fn publication_point_cache_projection_key(manifest_rsync_uri: &str) -
format!("{PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX}{manifest_rsync_uri}") format!("{PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX}{manifest_rsync_uri}")
} }
pub(super) fn child_certificate_cache_projection_key(cache_key_sha256_hex: &str) -> String {
format!("{CHILD_CERTIFICATE_CACHE_PROJECTION_KEY_PREFIX}{cache_key_sha256_hex}")
}
pub(super) fn publication_point_cache_projection_key_manifest_uri(key: &[u8]) -> Option<String> { pub(super) fn publication_point_cache_projection_key_manifest_uri(key: &[u8]) -> Option<String> {
let key = std::str::from_utf8(key).ok()?; let key = std::str::from_utf8(key).ok()?;
key.strip_prefix(PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX) key.strip_prefix(PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX)

View File

@ -32,6 +32,22 @@ pub struct PpCacheIndexRefreshStats {
pub new_entries: usize, pub new_entries: usize,
pub file_bytes: u64, pub file_bytes: u64,
pub write_ms: u64, pub write_ms: u64,
pub compaction_triggered: bool,
pub compaction_reason: Option<String>,
pub compaction_segments_before: usize,
pub compaction_total_file_bytes_before: u64,
pub compaction_live_entries: usize,
pub compaction_file_bytes: u64,
pub compaction_reclaimed_bytes: u64,
pub compaction_ms: u64,
pub compaction_deleted_segments: usize,
pub compaction_error: Option<String>,
}
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct PpCacheIndexDirectoryStats {
pub segment_count: usize,
pub total_file_bytes: u64,
} }
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
@ -40,6 +56,12 @@ struct PpCacheIndexEntry {
value_len: usize, value_len: usize,
} }
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PpCacheIndexLookup<'a> {
Hit(&'a [u8]),
Deleted,
}
#[derive(Debug)] #[derive(Debug)]
pub struct PpCacheMmapIndex { pub struct PpCacheMmapIndex {
mmap: Arc<Mmap>, mmap: Arc<Mmap>,
@ -126,12 +148,23 @@ impl PpCacheMmapIndex {
}) })
} }
pub fn get(&self, manifest_rsync_uri: &str) -> Option<&[u8]> { pub fn lookup(&self, manifest_rsync_uri: &str) -> Option<PpCacheIndexLookup<'_>> {
let entry = self.entries.get(manifest_rsync_uri)?; let entry = self.entries.get(manifest_rsync_uri)?;
if entry.value_len == 0 {
return Some(PpCacheIndexLookup::Deleted);
}
let blob_offset = read_u64(self.mmap.as_ref(), 40).ok()? as usize; let blob_offset = read_u64(self.mmap.as_ref(), 40).ok()? as usize;
let start = blob_offset + entry.value_offset; let start = blob_offset + entry.value_offset;
let end = start + entry.value_len; let end = start + entry.value_len;
Some(&self.mmap[start..end]) Some(PpCacheIndexLookup::Hit(&self.mmap[start..end]))
}
#[cfg(test)]
pub fn get(&self, manifest_rsync_uri: &str) -> Option<&[u8]> {
match self.lookup(manifest_rsync_uri)? {
PpCacheIndexLookup::Hit(bytes) => Some(bytes),
PpCacheIndexLookup::Deleted => None,
}
} }
pub fn entries(&self) -> usize { pub fn entries(&self) -> usize {
@ -156,10 +189,18 @@ pub struct PpCacheMmapIndexSet {
} }
impl PpCacheMmapIndexSet { impl PpCacheMmapIndexSet {
pub fn get(&self, manifest_rsync_uri: &str) -> Option<&[u8]> { pub fn lookup(&self, manifest_rsync_uri: &str) -> Option<PpCacheIndexLookup<'_>> {
self.indexes self.indexes
.iter() .iter()
.find_map(|index| index.get(manifest_rsync_uri)) .find_map(|index| index.lookup(manifest_rsync_uri))
}
#[cfg(test)]
pub fn get(&self, manifest_rsync_uri: &str) -> Option<&[u8]> {
match self.lookup(manifest_rsync_uri)? {
PpCacheIndexLookup::Hit(bytes) => Some(bytes),
PpCacheIndexLookup::Deleted => None,
}
} }
pub fn entries(&self) -> usize { pub fn entries(&self) -> usize {
@ -259,6 +300,84 @@ pub fn load_pp_cache_mmap_index_set(
Ok((set, stats)) Ok((set, stats))
} }
pub fn pp_cache_index_directory_stats(dir: &Path) -> StorageResult<PpCacheIndexDirectoryStats> {
let mut stats = PpCacheIndexDirectoryStats::default();
if !dir.exists() {
return Ok(stats);
}
for entry in fs::read_dir(dir).map_err(|e| StorageError::RocksDb(e.to_string()))? {
let path = entry
.map_err(|e| StorageError::RocksDb(e.to_string()))?
.path();
let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
continue;
};
let is_index =
name == "current.idx" || (name.starts_with("segment-") && name.ends_with(".idx"));
if !is_index {
continue;
}
let len = fs::metadata(&path)
.map_err(|e| StorageError::RocksDb(e.to_string()))?
.len();
stats.total_file_bytes = stats.total_file_bytes.saturating_add(len);
if name.starts_with("segment-") && name.ends_with(".idx") {
stats.segment_count += 1;
}
}
Ok(stats)
}
pub fn compact_pp_cache_index(dir: &Path) -> StorageResult<PpCacheIndexRefreshStats> {
let started = Instant::now();
fs::create_dir_all(dir).map_err(|e| StorageError::RocksDb(e.to_string()))?;
let before = pp_cache_index_directory_stats(dir)?;
let (index_set, _) = load_pp_cache_mmap_index_set(dir)?;
let mut compact_entries = Vec::with_capacity(index_set.entries());
let mut seen = HashMap::<String, ()>::with_capacity(index_set.entries());
for index in &index_set.indexes {
for (key, entry) in &index.entries {
if seen.insert(key.clone(), ()).is_some() {
continue;
}
if entry.value_len == 0 {
continue;
}
let blob_offset = read_u64(index.mmap.as_ref(), 40)? as usize;
let start = blob_offset + entry.value_offset;
let end = start + entry.value_len;
compact_entries.push((key.clone(), index.mmap[start..end].to_vec()));
}
}
let current = dir.join("current.idx");
let mut stats = write_pp_cache_index_atomic(&current, compact_entries)?;
stats.state = "compacted".to_string();
stats.compaction_triggered = true;
stats.compaction_segments_before = before.segment_count;
stats.compaction_total_file_bytes_before = before.total_file_bytes;
stats.compaction_live_entries = stats.new_entries;
stats.compaction_file_bytes = stats.file_bytes;
stats.compaction_reclaimed_bytes = before.total_file_bytes.saturating_sub(stats.file_bytes);
let mut deleted_segments = 0usize;
for entry in fs::read_dir(dir).map_err(|e| StorageError::RocksDb(e.to_string()))? {
let path = entry
.map_err(|e| StorageError::RocksDb(e.to_string()))?
.path();
let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
continue;
};
if name.starts_with("segment-") && name.ends_with(".idx") {
fs::remove_file(&path).map_err(|e| StorageError::RocksDb(e.to_string()))?;
deleted_segments += 1;
}
}
stats.compaction_deleted_segments = deleted_segments;
stats.compaction_ms = started.elapsed().as_millis() as u64;
Ok(stats)
}
pub fn write_pp_cache_index_segment<I>( pub fn write_pp_cache_index_segment<I>(
dir: &Path, dir: &Path,
entries: I, entries: I,
@ -316,6 +435,7 @@ where
new_entries: ordered.len(), new_entries: ordered.len(),
file_bytes, file_bytes,
write_ms: started.elapsed().as_millis() as u64, write_ms: started.elapsed().as_millis() as u64,
..PpCacheIndexRefreshStats::default()
}) })
} }
@ -465,4 +585,87 @@ mod tests {
assert_eq!(stats.entries, 2); assert_eq!(stats.entries, 2);
assert_eq!(set.get("rsync://example.test/a.mft"), Some(&b"new"[..])); assert_eq!(set.get("rsync://example.test/a.mft"), Some(&b"new"[..]));
} }
#[test]
fn pp_cache_index_set_tombstone_shadows_older_entry() {
let dir = tempfile::tempdir().expect("tempdir");
let current = dir.path().join("current.idx");
write_pp_cache_index_atomic(
&current,
vec![("rsync://example.test/a.mft".to_string(), b"old".to_vec())],
)
.expect("write current index");
write_pp_cache_index_segment(
dir.path(),
vec![("rsync://example.test/a.mft".to_string(), Vec::new())],
)
.expect("write tombstone segment index");
let (set, stats) = load_pp_cache_mmap_index_set(dir.path()).expect("load index set");
assert_eq!(stats.entries, 2);
assert_eq!(
set.lookup("rsync://example.test/a.mft"),
Some(PpCacheIndexLookup::Deleted)
);
assert_eq!(set.get("rsync://example.test/a.mft"), None);
}
#[test]
fn pp_cache_index_compaction_keeps_latest_values_and_removes_segments() {
let dir = tempfile::tempdir().expect("tempdir");
let current = dir.path().join("current.idx");
write_pp_cache_index_atomic(
&current,
vec![
("rsync://example.test/a.mft".to_string(), b"old-a".to_vec()),
("rsync://example.test/b.mft".to_string(), b"old-b".to_vec()),
("rsync://example.test/c.mft".to_string(), b"old-c".to_vec()),
],
)
.expect("write current index");
write_pp_cache_index_segment(
dir.path(),
vec![
("rsync://example.test/a.mft".to_string(), b"new-a".to_vec()),
("rsync://example.test/b.mft".to_string(), Vec::new()),
],
)
.expect("write first segment");
write_pp_cache_index_segment(
dir.path(),
vec![("rsync://example.test/d.mft".to_string(), b"new-d".to_vec())],
)
.expect("write second segment");
let before = pp_cache_index_directory_stats(dir.path()).expect("stats before");
assert_eq!(before.segment_count, 2);
let stats = compact_pp_cache_index(dir.path()).expect("compact");
assert!(stats.compaction_triggered);
assert_eq!(stats.compaction_segments_before, 2);
assert_eq!(stats.compaction_live_entries, 3);
assert_eq!(stats.compaction_deleted_segments, 2);
assert!(stats.compaction_reclaimed_bytes > 0);
let after = pp_cache_index_directory_stats(dir.path()).expect("stats after");
assert_eq!(after.segment_count, 0);
let segments = fs::read_dir(dir.path())
.expect("read dir")
.filter_map(Result::ok)
.filter(|entry| {
entry
.file_name()
.to_str()
.is_some_and(|name| name.starts_with("segment-"))
})
.count();
assert_eq!(segments, 0);
let (index, _) = load_pp_cache_mmap_index(&current).expect("load compacted current");
assert_eq!(index.entries(), 3);
assert_eq!(index.get("rsync://example.test/a.mft"), Some(&b"new-a"[..]));
assert_eq!(index.get("rsync://example.test/b.mft"), None);
assert_eq!(index.get("rsync://example.test/c.mft"), Some(&b"old-c"[..]));
assert_eq!(index.get("rsync://example.test/d.mft"), Some(&b"new-d"[..]));
}
} }

View File

@ -14,6 +14,37 @@ fn sha256_32(input: &[u8]) -> [u8; 32] {
compute_sha256_32(input) compute_sha256_32(input)
} }
fn sample_child_certificate_cache_projection(
cache_key_sha256_hex: String,
child_cert_uri: &str,
child_cert_sha256_hex: &str,
) -> ChildCertificateCacheProjection {
ChildCertificateCacheProjection {
schema_version: CHILD_CERTIFICATE_CACHE_SCHEMA_VERSION,
algorithm_version: CHILD_CERTIFICATE_CACHE_ALGORITHM_VERSION,
cache_key_sha256_hex,
child_cert_uri: child_cert_uri.to_string(),
child_cert_sha256_hex: child_cert_sha256_hex.to_string(),
child_cert_serial: vec![1],
issuer_ca_sha256_hex: sha256_hex(b"issuer-ca"),
issuer_crl_uri: "rsync://example.test/repo/issuer.crl".to_string(),
issuer_crl_sha256_hex: sha256_hex(b"issuer-crl"),
parent_context_digest: sha256_32(b"parent-context"),
validation_policy_fingerprint: sha256_32(b"policy"),
effective_not_before: pack_time(0),
effective_until: pack_time(24),
payload: ChildCertificateCachePayload::ChildCa {
child_manifest_rsync_uri: format!("{child_cert_uri}.mft"),
child_ski: "11".repeat(20),
child_rsync_base_uri: "rsync://example.test/repo/child/".to_string(),
child_publication_point_rsync_uri: "rsync://example.test/repo/child/".to_string(),
child_rrdp_notification_uri: Some("https://example.test/notify.xml".to_string()),
child_effective_ip_resources: None,
child_effective_as_resources: None,
},
}
}
#[test] #[test]
fn parse_work_db_blob_mode_accepts_supported_values() { fn parse_work_db_blob_mode_accepts_supported_values() {
assert_eq!(default_work_db_blob_mode(), WorkDbBlobMode::Disabled); assert_eq!(default_work_db_blob_mode(), WorkDbBlobMode::Disabled);
@ -565,6 +596,128 @@ fn publication_point_cache_mmap_index_dirty_overlay_wins_and_refreshes_segment()
assert_eq!(got.manifest_sha256, sha256_32(b"manifest-new")); assert_eq!(got.manifest_sha256, sha256_32(b"manifest-new"));
} }
#[test]
fn publication_point_cache_mmap_index_write_before_read_refreshes_segment() {
let td = tempfile::tempdir().expect("tempdir");
let db_path = td.path().join("work-db");
let vcir = sample_vcir("rsync://example.test/repo/current.mft");
let mut projection = PublicationPointCacheProjection::from_vcir_with_context(
&vcir,
"rsync://example.test/repo/".to_string(),
Some("rsync://example.test/repo/ca.cer".to_string()),
sha256_32(b"ca-cert"),
sha256_32(b"manifest-old"),
sha256_32(b"ta-context"),
sha256_32(b"parent-context"),
sha256_32(b"policy"),
)
.expect("build publication point projection");
{
let store = RocksStore::open(&db_path).expect("open rocksdb");
store
.put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection))
.expect("put old projection");
let stats = store
.refresh_publication_point_cache_mmap_index()
.expect("refresh base mmap index")
.expect("refresh stats");
assert_eq!(stats.state, "written");
}
{
let store = RocksStore::open(&db_path).expect("reopen rocksdb");
projection.manifest_sha256 = sha256_32(b"manifest-new");
store
.put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection))
.expect("put new projection before any cached read");
let stats = store
.refresh_publication_point_cache_mmap_index()
.expect("refresh dirty mmap segment")
.expect("refresh stats");
assert_eq!(stats.state, "segment_written");
assert_eq!(stats.dirty_entries, 1);
assert_eq!(stats.new_entries, 1);
}
let store = RocksStore::open(&db_path).expect("reopen rocksdb after segment");
let got = store
.get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri)
.expect("get refreshed projection")
.expect("projection exists");
assert_eq!(got.manifest_sha256, sha256_32(b"manifest-new"));
}
#[test]
fn publication_point_cache_mmap_index_delete_before_read_refreshes_tombstone_segment() {
let td = tempfile::tempdir().expect("tempdir");
let db_path = td.path().join("work-db");
let vcir = sample_vcir("rsync://example.test/repo/current.mft");
let projection = PublicationPointCacheProjection::from_vcir_with_context(
&vcir,
"rsync://example.test/repo/".to_string(),
Some("rsync://example.test/repo/ca.cer".to_string()),
sha256_32(b"ca-cert"),
sha256_32(b"manifest"),
sha256_32(b"ta-context"),
sha256_32(b"parent-context"),
sha256_32(b"policy"),
)
.expect("build publication point projection");
{
let store = RocksStore::open(&db_path).expect("open rocksdb");
store
.put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection))
.expect("put projection");
let stats = store
.refresh_publication_point_cache_mmap_index()
.expect("refresh base mmap index")
.expect("refresh stats");
assert_eq!(stats.state, "written");
}
{
let store = RocksStore::open(&db_path).expect("reopen rocksdb");
store
.replace_vcir_manifest_replay_meta_and_projection_action(
&vcir,
None,
PublicationPointCacheProjectionWriteAction::Delete {
manifest_rsync_uri: &vcir.manifest_rsync_uri,
},
)
.expect("delete projection before any cached read");
assert!(
store
.get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri)
.expect("dirty tombstone lookup")
.is_none()
);
let stats = store
.refresh_publication_point_cache_mmap_index()
.expect("refresh tombstone mmap segment")
.expect("refresh stats");
assert_eq!(stats.state, "segment_written");
assert_eq!(stats.dirty_entries, 1);
assert_eq!(stats.new_entries, 1);
}
let store = RocksStore::open(&db_path).expect("reopen rocksdb after tombstone segment");
assert!(
store
.get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri)
.expect("tombstone shadows old current index")
.is_none()
);
assert!(
store
.get_publication_point_cache_projection(&vcir.manifest_rsync_uri)
.expect("direct db lookup")
.is_none()
);
}
#[test] #[test]
fn publication_point_cache_projection_rejects_version_mismatch() { fn publication_point_cache_projection_rejects_version_mismatch() {
let vcir = sample_vcir("rsync://example.test/repo/current.mft"); let vcir = sample_vcir("rsync://example.test/repo/current.mft");
@ -1364,6 +1517,196 @@ fn replace_vcir_and_manifest_replay_meta_replaces_current_entry() {
); );
} }
#[test]
fn get_child_certificate_cache_projections_batch_preserves_order_and_misses() {
let td = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(td.path()).expect("open rocksdb");
let first_key = sha256_hex(b"child-cache-key-first");
let missing_key = sha256_hex(b"child-cache-key-missing");
let second_key = sha256_hex(b"child-cache-key-second");
let first_hash = sha256_hex(b"first-child-cert");
let second_hash = sha256_hex(b"second-child-cert");
let first = sample_child_certificate_cache_projection(
first_key.clone(),
"rsync://example.test/repo/first.cer",
&first_hash,
);
let second = sample_child_certificate_cache_projection(
second_key.clone(),
"rsync://example.test/repo/second.cer",
&second_hash,
);
store
.put_child_certificate_cache_projection(&first)
.expect("put first projection");
store
.put_child_certificate_cache_projection(&second)
.expect("put second projection");
let got = store
.get_child_certificate_cache_projections_batch(&[
second_key.clone(),
missing_key,
first_key.clone(),
])
.expect("batch get child projections");
assert_eq!(got.len(), 3);
assert_eq!(
got[0]
.as_ref()
.expect("second projection")
.child_cert_sha256_hex,
second_hash
);
assert!(got[1].is_none());
assert_eq!(
got[2]
.as_ref()
.expect("first projection")
.child_cert_sha256_hex,
first_hash
);
}
#[test]
fn child_certificate_cache_mmap_segment_preserves_order_and_misses() {
let td = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(td.path()).expect("open rocksdb");
let manifest_uri = "rsync://example.test/repo/parent.mft";
let first_key = sha256_hex(b"child-cache-mmap-key-first");
let missing_key = sha256_hex(b"child-cache-mmap-key-missing");
let second_key = sha256_hex(b"child-cache-mmap-key-second");
let first_hash = sha256_hex(b"first-child-cert-mmap");
let second_hash = sha256_hex(b"second-child-cert-mmap");
let first = sample_child_certificate_cache_projection(
first_key.clone(),
"rsync://example.test/repo/first.cer",
&first_hash,
);
let second = sample_child_certificate_cache_projection(
second_key.clone(),
"rsync://example.test/repo/second.cer",
&second_hash,
);
assert!(
store
.get_child_certificate_cache_projections_mmap_segment(
manifest_uri,
&[first_key.clone()]
)
.expect("missing segment lookup")
.is_none()
);
let write_stats = store
.write_child_certificate_cache_mmap_segment(manifest_uri, &[first.clone(), second.clone()])
.expect("write child projection segment");
assert_eq!(write_stats.new_entries, 2);
assert!(write_stats.file_bytes > 0);
let got = store
.get_child_certificate_cache_projections_mmap_segment(
manifest_uri,
&[second_key.clone(), missing_key, first_key.clone()],
)
.expect("lookup child projection segment")
.expect("segment exists");
assert_eq!(got.hits, 2);
assert_eq!(got.misses, 1);
assert!(got.file_bytes > 0);
assert_eq!(got.projections.len(), 3);
assert_eq!(
got.projections[0]
.as_ref()
.expect("second projection")
.child_cert_sha256_hex,
second_hash
);
assert!(got.projections[1].is_none());
assert_eq!(
got.projections[2]
.as_ref()
.expect("first projection")
.child_cert_sha256_hex,
first_hash
);
}
#[test]
fn child_certificate_cache_mmap_segment_overlay_preserves_existing_values() {
let td = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(td.path()).expect("open rocksdb");
let manifest_uri = "rsync://example.test/repo/parent-overlay.mft";
let first_key = sha256_hex(b"child-cache-overlay-key-first");
let second_key = sha256_hex(b"child-cache-overlay-key-second");
let missing_key = sha256_hex(b"child-cache-overlay-key-missing");
let first_hash = sha256_hex(b"first-child-cert-overlay");
let old_second_hash = sha256_hex(b"old-second-child-cert-overlay");
let new_second_hash = sha256_hex(b"new-second-child-cert-overlay");
let first = sample_child_certificate_cache_projection(
first_key.clone(),
"rsync://example.test/repo/first-overlay.cer",
&first_hash,
);
let old_second = sample_child_certificate_cache_projection(
second_key.clone(),
"rsync://example.test/repo/second-overlay.cer",
&old_second_hash,
);
let new_second = sample_child_certificate_cache_projection(
second_key.clone(),
"rsync://example.test/repo/second-overlay.cer",
&new_second_hash,
);
store
.write_child_certificate_cache_mmap_segment(
manifest_uri,
&[first.clone(), old_second.clone()],
)
.expect("write initial segment");
let stats = store
.write_child_certificate_cache_mmap_segment_overlay(
manifest_uri,
&[first_key.clone(), second_key.clone(), missing_key.clone()],
std::slice::from_ref(&new_second),
)
.expect("write segment overlay");
assert_eq!(stats.new_entries, 2);
let got = store
.get_child_certificate_cache_projections_mmap_segment(
manifest_uri,
&[first_key.clone(), second_key.clone(), missing_key],
)
.expect("lookup segment")
.expect("segment exists");
assert_eq!(got.hits, 2);
assert_eq!(got.misses, 1);
assert_eq!(
got.projections[0]
.as_ref()
.expect("first projection")
.child_cert_sha256_hex,
first_hash
);
assert_eq!(
got.projections[1]
.as_ref()
.expect("updated second projection")
.child_cert_sha256_hex,
new_second_hash
);
assert!(got.projections[2].is_none());
}
#[test] #[test]
fn storage_helpers_cover_optional_validation_paths() { fn storage_helpers_cover_optional_validation_paths() {
let withdrawn = RepositoryViewEntry { let withdrawn = RepositoryViewEntry {

View File

@ -213,6 +213,9 @@ pub enum ManifestFreshError {
#[error("manifest file hash mismatch: {rsync_uri} (RFC 9286 §6.5; RFC 9286 §6.6)")] #[error("manifest file hash mismatch: {rsync_uri} (RFC 9286 §6.5; RFC 9286 §6.6)")]
HashMismatch { rsync_uri: String }, HashMismatch { rsync_uri: String },
#[error("issuer CA certificate bytes unavailable: {detail} (RFC 6487 §4; RFC 9286 §6.2)")]
IssuerCaLoadFailed { detail: String },
} }
impl ManifestFreshError { impl ManifestFreshError {

View File

@ -6,7 +6,7 @@ use crate::storage::RocksStore;
use crate::sync::rrdp::Fetcher as HttpFetcher; use crate::sync::rrdp::Fetcher as HttpFetcher;
use crate::validation::manifest::PublicationPointSource; use crate::validation::manifest::PublicationPointSource;
use crate::validation::objects::ObjectsOutput; use crate::validation::objects::ObjectsOutput;
use crate::validation::tree::{CaInstanceHandle, PublicationPointRunner}; use crate::validation::tree::{CaCertificateRef, CaInstanceHandle, PublicationPointRunner};
use crate::validation::tree_runner::Rpkiv1PublicationPointRunner; use crate::validation::tree_runner::Rpkiv1PublicationPointRunner;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Mutex; use std::sync::Mutex;
@ -49,7 +49,7 @@ pub fn run_publication_point_once(
depth: 0, depth: 0,
tal_id: "single-publication-point".to_string(), tal_id: "single-publication-point".to_string(),
parent_manifest_rsync_uri: None, parent_manifest_rsync_uri: None,
ca_certificate_der: issuer_ca_der.to_vec(), ca_certificate: CaCertificateRef::inline_der(issuer_ca_der.to_vec()),
ca_certificate_rsync_uri: issuer_ca_rsync_uri.map(str::to_string), ca_certificate_rsync_uri: issuer_ca_rsync_uri.map(str::to_string),
effective_ip_resources: issuer_effective_ip.cloned(), effective_ip_resources: issuer_effective_ip.cloned(),
effective_as_resources: issuer_effective_as.cloned(), effective_as_resources: issuer_effective_as.cloned(),
@ -80,6 +80,7 @@ pub fn run_publication_point_once(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
}; };

View File

@ -33,6 +33,7 @@ use crate::validation::from_tal::{
discover_root_ca_instance_from_tal_with_fetchers_strict_name, discover_root_ca_instance_from_tal_with_fetchers_strict_name,
}; };
use crate::validation::objects::ParallelRoaWorkerPool; use crate::validation::objects::ParallelRoaWorkerPool;
use crate::validation::tree::CaCertificateRef;
use crate::validation::tree::{ use crate::validation::tree::{
CaInstanceHandle, TreeRunAuditOutput, TreeRunConfig, TreeRunError, TreeRunOutput, CaInstanceHandle, TreeRunAuditOutput, TreeRunConfig, TreeRunError, TreeRunOutput,
run_tree_serial, run_tree_serial_audit, run_tree_serial_audit_multi_root, run_tree_serial, run_tree_serial_audit, run_tree_serial_audit_multi_root,
@ -136,6 +137,7 @@ fn make_live_runner<'a>(
ccr_accumulator: Option<CcrAccumulator>, ccr_accumulator: Option<CcrAccumulator>,
persist_vcir: bool, persist_vcir: bool,
enable_roa_validation_cache: bool, enable_roa_validation_cache: bool,
enable_child_certificate_validation_cache: bool,
publication_point_cache_observe_only: bool, publication_point_cache_observe_only: bool,
enable_publication_point_validation_cache: bool, enable_publication_point_validation_cache: bool,
) -> Rpkiv1PublicationPointRunner<'a> { ) -> Rpkiv1PublicationPointRunner<'a> {
@ -163,6 +165,7 @@ fn make_live_runner<'a>(
ccr_accumulator: ccr_accumulator.map(Mutex::new), ccr_accumulator: ccr_accumulator.map(Mutex::new),
persist_vcir, persist_vcir,
enable_roa_validation_cache, enable_roa_validation_cache,
enable_child_certificate_validation_cache,
publication_point_cache_observe_only, publication_point_cache_observe_only,
enable_publication_point_validation_cache, enable_publication_point_validation_cache,
} }
@ -537,7 +540,7 @@ pub fn root_handle_from_trust_anchor(
depth: 0, depth: 0,
tal_id, tal_id,
parent_manifest_rsync_uri: None, parent_manifest_rsync_uri: None,
ca_certificate_der: trust_anchor.ta_certificate.raw_der.clone(), ca_certificate: CaCertificateRef::inline_der(trust_anchor.ta_certificate.raw_der.clone()),
ca_certificate_rsync_uri, ca_certificate_rsync_uri,
effective_ip_resources: ta_rc.tbs.extensions.ip_resources.clone(), effective_ip_resources: ta_rc.tbs.extensions.ip_resources.clone(),
effective_as_resources: ta_rc.tbs.extensions.as_resources.clone(), effective_as_resources: ta_rc.tbs.extensions.as_resources.clone(),
@ -578,6 +581,7 @@ pub fn run_tree_from_tal_url_serial(
None, None,
config.persist_vcir, config.persist_vcir,
config.enable_roa_validation_cache, config.enable_roa_validation_cache,
config.enable_child_certificate_validation_cache,
config.publication_point_cache_observe_only, config.publication_point_cache_observe_only,
config.enable_publication_point_validation_cache, config.enable_publication_point_validation_cache,
); );
@ -624,6 +628,7 @@ pub fn run_tree_from_tal_url_serial_audit(
None, None,
config.persist_vcir, config.persist_vcir,
config.enable_roa_validation_cache, config.enable_roa_validation_cache,
config.enable_child_certificate_validation_cache,
config.publication_point_cache_observe_only, config.publication_point_cache_observe_only,
config.enable_publication_point_validation_cache, config.enable_publication_point_validation_cache,
); );
@ -692,6 +697,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
None, None,
config.persist_vcir, config.persist_vcir,
config.enable_roa_validation_cache, config.enable_roa_validation_cache,
config.enable_child_certificate_validation_cache,
config.publication_point_cache_observe_only, config.publication_point_cache_observe_only,
config.enable_publication_point_validation_cache, config.enable_publication_point_validation_cache,
); );
@ -782,6 +788,7 @@ where
.then(|| CcrAccumulator::new(vec![discovery.trust_anchor.clone()])), .then(|| CcrAccumulator::new(vec![discovery.trust_anchor.clone()])),
config.persist_vcir, config.persist_vcir,
config.enable_roa_validation_cache, config.enable_roa_validation_cache,
config.enable_child_certificate_validation_cache,
config.publication_point_cache_observe_only, config.publication_point_cache_observe_only,
config.enable_publication_point_validation_cache, config.enable_publication_point_validation_cache,
); );
@ -910,6 +917,7 @@ where
}), }),
config.persist_vcir, config.persist_vcir,
config.enable_roa_validation_cache, config.enable_roa_validation_cache,
config.enable_child_certificate_validation_cache,
config.publication_point_cache_observe_only, config.publication_point_cache_observe_only,
config.enable_publication_point_validation_cache, config.enable_publication_point_validation_cache,
); );
@ -1350,6 +1358,7 @@ pub fn run_tree_from_tal_and_ta_der_serial(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
enable_child_certificate_validation_cache: config.enable_child_certificate_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only, publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache, enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
@ -1405,6 +1414,7 @@ pub fn run_tree_from_tal_bytes_serial_audit(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
enable_child_certificate_validation_cache: config.enable_child_certificate_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only, publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache, enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
@ -1482,6 +1492,7 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
enable_child_certificate_validation_cache: config.enable_child_certificate_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only, publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache, enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
@ -1558,6 +1569,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
enable_child_certificate_validation_cache: config.enable_child_certificate_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only, publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache, enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
@ -1635,6 +1647,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
enable_child_certificate_validation_cache: config.enable_child_certificate_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only, publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache, enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
@ -1719,6 +1732,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
enable_child_certificate_validation_cache: config.enable_child_certificate_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only, publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache, enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
@ -1784,6 +1798,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
enable_child_certificate_validation_cache: config.enable_child_certificate_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only, publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache, enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
@ -1871,6 +1886,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
enable_child_certificate_validation_cache: config.enable_child_certificate_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only, publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache, enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
@ -1938,6 +1954,7 @@ fn build_payload_replay_runner<'a>(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache, enable_roa_validation_cache,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
} }
@ -1975,6 +1992,7 @@ fn build_payload_delta_replay_runner<'a>(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache, enable_roa_validation_cache,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
} }
@ -2012,6 +2030,7 @@ fn build_payload_delta_replay_current_store_runner<'a>(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache, enable_roa_validation_cache,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
} }
@ -2577,6 +2596,7 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -2616,6 +2636,7 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -2665,6 +2686,7 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -2715,6 +2737,7 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -2775,6 +2798,7 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -2811,6 +2835,7 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -2867,6 +2892,7 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -2932,6 +2958,7 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,

View File

@ -7,6 +7,8 @@ use crate::validation::objects::{
AspaAttestation, ObjectsOutput, RoaValidationCacheStats, RouterKeyPayload, Vrp, AspaAttestation, ObjectsOutput, RoaValidationCacheStats, RouterKeyPayload, Vrp,
}; };
use crate::validation::publication_point::PublicationPointSnapshot; use crate::validation::publication_point::PublicationPointSnapshot;
use std::borrow::Cow;
use std::sync::{Arc, OnceLock};
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct TreeRunConfig { pub struct TreeRunConfig {
@ -22,6 +24,8 @@ pub struct TreeRunConfig {
pub build_ccr_accumulator: bool, pub build_ccr_accumulator: bool,
/// Reuse accepted ROA validation outputs from previous VCIR when explicitly enabled. /// Reuse accepted ROA validation outputs from previous VCIR when explicitly enabled.
pub enable_roa_validation_cache: bool, pub enable_roa_validation_cache: bool,
/// Reuse validated child certificate discovery results when explicitly enabled.
pub enable_child_certificate_validation_cache: bool,
/// Evaluate publication-point cache eligibility without changing validation results. /// Evaluate publication-point cache eligibility without changing validation results.
pub publication_point_cache_observe_only: bool, pub publication_point_cache_observe_only: bool,
/// Reuse complete publication-point validation projections when explicitly enabled. /// Reuse complete publication-point validation projections when explicitly enabled.
@ -39,6 +43,7 @@ impl Default for TreeRunConfig {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -46,13 +51,85 @@ impl Default for TreeRunConfig {
} }
} }
#[derive(Clone, Debug)]
pub enum CaCertificateRef {
InlineDer(Vec<u8>),
RepoBytes {
sha256_hex: String,
cached_der: Arc<OnceLock<Arc<[u8]>>>,
},
}
impl PartialEq for CaCertificateRef {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::InlineDer(left), Self::InlineDer(right)) => left == right,
(
Self::RepoBytes {
sha256_hex: left, ..
},
Self::RepoBytes {
sha256_hex: right, ..
},
) => left == right,
_ => false,
}
}
}
impl Eq for CaCertificateRef {}
impl CaCertificateRef {
pub fn inline_der(bytes: Vec<u8>) -> Self {
Self::InlineDer(bytes)
}
pub fn repo_bytes(sha256_hex: String) -> Self {
Self::RepoBytes {
sha256_hex,
cached_der: Arc::new(OnceLock::new()),
}
}
pub fn sha256_hex(&self) -> Option<&str> {
match self {
Self::InlineDer(_) => None,
Self::RepoBytes { sha256_hex, .. } => Some(sha256_hex.as_str()),
}
}
pub fn der<'a>(&'a self, store: &crate::storage::RocksStore) -> Result<Cow<'a, [u8]>, String> {
match self {
Self::InlineDer(bytes) => Ok(Cow::Borrowed(bytes.as_slice())),
Self::RepoBytes {
sha256_hex,
cached_der,
} => {
if cached_der.get().is_none() {
let bytes = store
.get_blob_bytes(sha256_hex)
.map_err(|e| format!("load CA certificate bytes failed: {e}"))?
.ok_or_else(|| {
format!("missing CA certificate repo bytes for sha256={sha256_hex}")
})?;
let _ = cached_der.set(Arc::from(bytes));
}
let bytes = cached_der.get().ok_or_else(|| {
format!("missing cached CA certificate bytes for sha256={sha256_hex}")
})?;
Ok(Cow::Borrowed(bytes.as_ref()))
}
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct CaInstanceHandle { pub struct CaInstanceHandle {
pub depth: usize, pub depth: usize,
pub tal_id: String, pub tal_id: String,
pub parent_manifest_rsync_uri: Option<String>, pub parent_manifest_rsync_uri: Option<String>,
/// DER bytes of the CA certificate for this CA instance. /// CA certificate bytes or a lazy repo-bytes reference for this CA instance.
pub ca_certificate_der: Vec<u8>, pub ca_certificate: CaCertificateRef,
/// rsync URI of this CA certificate object (where it is published). /// rsync URI of this CA certificate object (where it is published).
/// ///
/// This is used for strict AIA binding checks (RFC 6487 §4.8.7) when validating /// This is used for strict AIA binding checks (RFC 6487 §4.8.7) when validating
@ -74,6 +151,32 @@ impl CaInstanceHandle {
self.depth = depth; self.depth = depth;
self self
} }
pub fn ca_certificate_der<'a>(
&'a self,
store: &crate::storage::RocksStore,
) -> Result<Cow<'a, [u8]>, String> {
self.ca_certificate.der(store)
}
pub fn ca_certificate_sha256_hex(&self) -> Option<&str> {
self.ca_certificate.sha256_hex()
}
pub fn ca_certificate_sha256_32(&self) -> Option<[u8; 32]> {
match &self.ca_certificate {
CaCertificateRef::InlineDer(bytes) => {
use sha2::Digest as _;
let digest = sha2::Sha256::digest(bytes);
Some(digest.into())
}
CaCertificateRef::RepoBytes { sha256_hex, .. } => {
let mut out = [0u8; 32];
hex::decode_to_slice(sha256_hex, &mut out).ok()?;
Some(out)
}
}
}
} }
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
@ -337,7 +440,7 @@ mod tests {
use crate::validation::objects::{ObjectsOutput, ObjectsStats}; use crate::validation::objects::{ObjectsOutput, ObjectsStats};
use super::{ use super::{
CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult, CaCertificateRef, CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult,
PublicationPointRunner, TreeRunConfig, run_tree_serial_audit, PublicationPointRunner, TreeRunConfig, run_tree_serial_audit,
run_tree_serial_audit_multi_root, run_tree_serial_audit_multi_root,
}; };
@ -347,7 +450,7 @@ mod tests {
depth: 0, depth: 0,
tal_id: "arin".to_string(), tal_id: "arin".to_string(),
parent_manifest_rsync_uri: None, parent_manifest_rsync_uri: None,
ca_certificate_der: vec![1, 2, 3], ca_certificate: CaCertificateRef::inline_der(vec![1, 2, 3]),
ca_certificate_rsync_uri: None, ca_certificate_rsync_uri: None,
effective_ip_resources: None, effective_ip_resources: None,
effective_as_resources: None, effective_as_resources: None,

View File

@ -994,6 +994,11 @@ fn stage_ready_publication_point(
result.discovered_children.clone(), result.discovered_children.clone(),
); );
metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started); metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started);
runner.record_publication_point_step_ms(
metrics.manifest_rsync_uri.as_deref().unwrap_or_default(),
"publication_point_cache_child_enqueue",
metrics.child_enqueue_ms,
);
finished.push(FinishedPublicationPoint { finished.push(FinishedPublicationPoint {
node: FinishedPublicationPointNode::from_queued(ready.node), node: FinishedPublicationPointNode::from_queued(ready.node),
result: compact_phase2_finished_result(result, compact_audit), result: compact_phase2_finished_result(result, compact_audit),
@ -1182,7 +1187,7 @@ fn stage_ready_publication_point(
ready.node.id, ready.node.id,
&fresh_stage.fresh_point, &fresh_stage.fresh_point,
runner.policy, runner.policy,
&ready.node.handle.ca_certificate_der, fresh_stage.issuer_ca_der.as_ref(),
ready.node.handle.ca_certificate_rsync_uri.as_deref(), ready.node.handle.ca_certificate_rsync_uri.as_deref(),
ready.node.handle.effective_ip_resources.as_ref(), ready.node.handle.effective_ip_resources.as_ref(),
ready.node.handle.effective_as_resources.as_ref(), ready.node.handle.effective_as_resources.as_ref(),
@ -2420,7 +2425,7 @@ mod tests {
.push(crate::validation::tree::DiscoveredChildCaInstance { .push(crate::validation::tree::DiscoveredChildCaInstance {
handle: crate::validation::tree::CaInstanceHandle { handle: crate::validation::tree::CaInstanceHandle {
tal_id: "test".to_string(), tal_id: "test".to_string(),
ca_certificate_der: vec![1], ca_certificate: crate::validation::tree::CaCertificateRef::inline_der(vec![1]),
ca_certificate_rsync_uri: Some( ca_certificate_rsync_uri: Some(
"rsync://example.test/repo/child.cer".to_string(), "rsync://example.test/repo/child.cer".to_string(),
), ),

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -190,6 +190,7 @@ fn apnic_tree_full_stats_serial() {
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
}; };
@ -224,6 +225,7 @@ fn apnic_tree_full_stats_serial() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,

View File

@ -40,6 +40,7 @@ fn apnic_tree_depth1_processes_more_than_root() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -85,6 +86,7 @@ fn apnic_tree_root_only_processes_root_with_long_timeouts() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,

View File

@ -5,8 +5,8 @@ use rpki::validation::manifest::PublicationPointSource;
use rpki::validation::objects::process_publication_point_snapshot_for_issuer; use rpki::validation::objects::process_publication_point_snapshot_for_issuer;
use rpki::validation::publication_point::PublicationPointSnapshot; use rpki::validation::publication_point::PublicationPointSnapshot;
use rpki::validation::tree::{ use rpki::validation::tree::{
CaInstanceHandle, PublicationPointRunResult, PublicationPointRunner, TreeRunConfig, CaCertificateRef, CaInstanceHandle, PublicationPointRunResult, PublicationPointRunner,
run_tree_serial_audit, TreeRunConfig, run_tree_serial_audit,
}; };
fn fixture_bytes(path: &str) -> Vec<u8> { fn fixture_bytes(path: &str) -> Vec<u8> {
@ -46,7 +46,10 @@ impl PublicationPointRunner for SinglePackRunner {
let objects = process_publication_point_snapshot_for_issuer( let objects = process_publication_point_snapshot_for_issuer(
&self.snapshot, &self.snapshot,
&self.policy, &self.policy,
&ca.ca_certificate_der, match &ca.ca_certificate {
CaCertificateRef::InlineDer(bytes) => bytes.as_slice(),
other => panic!("test runner expects inline CA DER, got {other:?}"),
},
ca.ca_certificate_rsync_uri.as_deref(), ca.ca_certificate_rsync_uri.as_deref(),
ca.effective_ip_resources.as_ref(), ca.effective_ip_resources.as_ref(),
ca.effective_as_resources.as_ref(), ca.effective_as_resources.as_ref(),
@ -92,7 +95,9 @@ fn crl_mismatch_drops_publication_point_and_cites_rfc_sections() {
parent_manifest_rsync_uri: None, parent_manifest_rsync_uri: None,
// Use a real, parseable CA certificate DER so objects processing can reach CRL selection. // Use a real, parseable CA certificate DER so objects processing can reach CRL selection.
// The test only asserts CRLDP/locked-pack error handling, not signature chaining. // The test only asserts CRLDP/locked-pack error handling, not signature chaining.
ca_certificate_der: fixture_bytes("tests/fixtures/ta/apnic-ta.cer"), ca_certificate: CaCertificateRef::inline_der(fixture_bytes(
"tests/fixtures/ta/apnic-ta.cer",
)),
ca_certificate_rsync_uri: None, ca_certificate_rsync_uri: None,
effective_ip_resources: None, effective_ip_resources: None,
effective_as_resources: None, effective_as_resources: None,
@ -114,6 +119,7 @@ fn crl_mismatch_drops_publication_point_and_cites_rfc_sections() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,

View File

@ -6,7 +6,7 @@ use rpki::validation::run_tree_from_tal::{
run_tree_from_tal_and_ta_der_serial_audit_with_timing, run_tree_from_tal_url_serial, run_tree_from_tal_and_ta_der_serial_audit_with_timing, run_tree_from_tal_url_serial,
run_tree_from_tal_url_serial_audit, run_tree_from_tal_url_serial_audit_with_timing, run_tree_from_tal_url_serial_audit, run_tree_from_tal_url_serial_audit_with_timing,
}; };
use rpki::validation::tree::TreeRunConfig; use rpki::validation::tree::{CaCertificateRef, TreeRunConfig};
use std::collections::HashMap; use std::collections::HashMap;
@ -71,10 +71,12 @@ fn root_handle_is_constructible_from_fixture_tal_and_ta() {
discovery.ca_instance.manifest_rsync_uri discovery.ca_instance.manifest_rsync_uri
); );
assert_eq!(root.rsync_base_uri, discovery.ca_instance.rsync_base_uri); assert_eq!(root.rsync_base_uri, discovery.ca_instance.rsync_base_uri);
assert!( match &root.ca_certificate {
root.ca_certificate_der.len() > 100, CaCertificateRef::InlineDer(bytes) => {
"TA der should be non-empty" assert!(bytes.len() > 100, "TA der should be non-empty");
); }
other => panic!("TA root certificate should be inline DER, got {other:?}"),
}
} }
#[test] #[test]
@ -118,6 +120,7 @@ fn run_tree_from_tal_url_entry_executes_and_records_failure_when_repo_empty() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -169,6 +172,7 @@ fn run_tree_from_tal_and_ta_der_entry_executes_and_records_failure_when_repo_emp
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -228,6 +232,7 @@ fn run_tree_from_tal_url_audit_entry_collects_no_publication_points_when_repo_em
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -275,6 +280,7 @@ fn run_tree_from_tal_and_ta_der_audit_entry_collects_no_publication_points_when_
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -330,6 +336,7 @@ fn run_tree_from_tal_url_audit_with_timing_records_phases_when_repo_empty() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -384,6 +391,7 @@ fn run_tree_from_tal_and_ta_der_audit_with_timing_records_phases_when_repo_empty
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,

View File

@ -7,8 +7,8 @@ use rpki::validation::manifest::PublicationPointSource;
use rpki::validation::objects::{ObjectsOutput, ObjectsStats}; use rpki::validation::objects::{ObjectsOutput, ObjectsStats};
use rpki::validation::publication_point::PublicationPointSnapshot; use rpki::validation::publication_point::PublicationPointSnapshot;
use rpki::validation::tree::{ use rpki::validation::tree::{
CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult, PublicationPointRunner, CaCertificateRef, CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult,
TreeRunConfig, run_tree_serial, PublicationPointRunner, TreeRunConfig, run_tree_serial,
}; };
fn empty_snapshot(manifest_uri: &str, pp_uri: &str) -> PublicationPointSnapshot { fn empty_snapshot(manifest_uri: &str, pp_uri: &str) -> PublicationPointSnapshot {
@ -36,7 +36,7 @@ fn ca_handle(manifest_uri: &str) -> CaInstanceHandle {
depth: 0, depth: 0,
tal_id: "test-tal".to_string(), tal_id: "test-tal".to_string(),
parent_manifest_rsync_uri: None, parent_manifest_rsync_uri: None,
ca_certificate_der: Vec::new(), ca_certificate: CaCertificateRef::inline_der(Vec::new()),
ca_certificate_rsync_uri: None, ca_certificate_rsync_uri: None,
effective_ip_resources: None, effective_ip_resources: None,
effective_as_resources: None, effective_as_resources: None,

View File

@ -7,8 +7,8 @@ use rpki::validation::manifest::PublicationPointSource;
use rpki::validation::objects::{ObjectsOutput, ObjectsStats, RouterKeyPayload}; use rpki::validation::objects::{ObjectsOutput, ObjectsStats, RouterKeyPayload};
use rpki::validation::publication_point::PublicationPointSnapshot; use rpki::validation::publication_point::PublicationPointSnapshot;
use rpki::validation::tree::{ use rpki::validation::tree::{
CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult, PublicationPointRunner, CaCertificateRef, CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult,
TreeRunConfig, run_tree_serial, run_tree_serial_audit, PublicationPointRunner, TreeRunConfig, run_tree_serial, run_tree_serial_audit,
}; };
#[derive(Default)] #[derive(Default)]
@ -69,7 +69,7 @@ fn ca_handle(manifest_uri: &str) -> CaInstanceHandle {
depth: 0, depth: 0,
tal_id: "test-tal".to_string(), tal_id: "test-tal".to_string(),
parent_manifest_rsync_uri: None, parent_manifest_rsync_uri: None,
ca_certificate_der: Vec::new(), ca_certificate: CaCertificateRef::inline_der(Vec::new()),
ca_certificate_rsync_uri: None, ca_certificate_rsync_uri: None,
effective_ip_resources: None, effective_ip_resources: None,
effective_as_resources: None, effective_as_resources: None,
@ -314,6 +314,7 @@ fn tree_respects_max_depth_and_max_instances() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
@ -333,6 +334,7 @@ fn tree_respects_max_depth_and_max_instances() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
enable_child_certificate_validation_cache: false,
publication_point_cache_observe_only: false, publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false, enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,