20260617 发布点级验证缓存复用

This commit is contained in:
yuyr 2026-06-17 23:49:53 +08:00
parent 68006467f7
commit ad2a25aede
18 changed files with 1841 additions and 14 deletions

View File

@ -50,6 +50,8 @@ use std::sync::Arc;
struct RunStageTiming { struct RunStageTiming {
validation_ms: u64, validation_ms: u64,
enable_roa_validation_cache: bool, enable_roa_validation_cache: bool,
publication_point_cache_observe_only: bool,
enable_publication_point_validation_cache: bool,
enable_transport_request_prefetch: bool, enable_transport_request_prefetch: bool,
report_build_ms: u64, report_build_ms: u64,
report_write_ms: Option<u64>, report_write_ms: Option<u64>,
@ -124,6 +126,8 @@ pub struct CliArgs {
pub skip_report_build: bool, pub skip_report_build: bool,
pub skip_vcir_persist: bool, pub skip_vcir_persist: bool,
pub enable_roa_validation_cache: bool, pub enable_roa_validation_cache: bool,
pub publication_point_cache_observe_only: bool,
pub enable_publication_point_validation_cache: bool,
pub enable_transport_request_prefetch: bool, pub enable_transport_request_prefetch: bool,
pub ccr_out_path: Option<PathBuf>, pub ccr_out_path: Option<PathBuf>,
pub vrps_csv_out_path: Option<PathBuf>, pub vrps_csv_out_path: Option<PathBuf>,
@ -181,6 +185,10 @@ Options:
--skip-vcir-persist Skip VCIR persistence/projection building for compare-only runs --skip-vcir-persist Skip VCIR persistence/projection building for compare-only runs
--enable-roa-validation-cache --enable-roa-validation-cache
Reuse accepted ROA validation outputs from previous VCIR records (default: off) Reuse accepted ROA validation outputs from previous VCIR records (default: off)
--publication-point-cache-observe-only
Evaluate publication-point cache eligibility without changing results
--enable-publication-point-validation-cache
Experimental: reuse complete publication-point validation projections
--enable-transport-request-prefetch --enable-transport-request-prefetch
Experimental: prefetch previous run transport repo requests before tree traversal Experimental: prefetch previous run transport repo requests before tree traversal
--ccr-out <path> Write CCR DER ContentInfo to this path (optional) --ccr-out <path> Write CCR DER ContentInfo to this path (optional)
@ -263,6 +271,8 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let mut skip_report_build: bool = false; let mut skip_report_build: bool = false;
let mut skip_vcir_persist: bool = false; let mut skip_vcir_persist: bool = false;
let mut enable_roa_validation_cache: bool = false; let mut enable_roa_validation_cache: bool = false;
let mut publication_point_cache_observe_only: bool = false;
let mut enable_publication_point_validation_cache: bool = false;
let mut enable_transport_request_prefetch: bool = false; let mut enable_transport_request_prefetch: bool = false;
let mut ccr_out_path: Option<PathBuf> = None; let mut ccr_out_path: Option<PathBuf> = None;
let mut vrps_csv_out_path: Option<PathBuf> = None; let mut vrps_csv_out_path: Option<PathBuf> = None;
@ -469,6 +479,12 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
"--enable-roa-validation-cache" => { "--enable-roa-validation-cache" => {
enable_roa_validation_cache = true; enable_roa_validation_cache = true;
} }
"--publication-point-cache-observe-only" => {
publication_point_cache_observe_only = true;
}
"--enable-publication-point-validation-cache" => {
enable_publication_point_validation_cache = true;
}
"--enable-transport-request-prefetch" => { "--enable-transport-request-prefetch" => {
enable_transport_request_prefetch = true; enable_transport_request_prefetch = true;
} }
@ -902,6 +918,8 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
skip_report_build, skip_report_build,
skip_vcir_persist, skip_vcir_persist,
enable_roa_validation_cache, enable_roa_validation_cache,
publication_point_cache_observe_only,
enable_publication_point_validation_cache,
enable_transport_request_prefetch, enable_transport_request_prefetch,
ccr_out_path, ccr_out_path,
vrps_csv_out_path, vrps_csv_out_path,
@ -1967,6 +1985,8 @@ pub fn run(argv: &[String]) -> Result<(), String> {
persist_vcir: !args.skip_vcir_persist, persist_vcir: !args.skip_vcir_persist,
build_ccr_accumulator: args.ccr_out_path.is_some(), build_ccr_accumulator: args.ccr_out_path.is_some(),
enable_roa_validation_cache: args.enable_roa_validation_cache, enable_roa_validation_cache: args.enable_roa_validation_cache,
publication_point_cache_observe_only: args.publication_point_cache_observe_only,
enable_publication_point_validation_cache: args.enable_publication_point_validation_cache,
enable_transport_request_prefetch: args.enable_transport_request_prefetch, enable_transport_request_prefetch: args.enable_transport_request_prefetch,
}; };
let replay_mode = args.payload_replay_archive.is_some(); let replay_mode = args.payload_replay_archive.is_some();
@ -2437,6 +2457,8 @@ pub fn run(argv: &[String]) -> Result<(), String> {
let stage_timing = RunStageTiming { let stage_timing = RunStageTiming {
validation_ms, validation_ms,
enable_roa_validation_cache: args.enable_roa_validation_cache, enable_roa_validation_cache: args.enable_roa_validation_cache,
publication_point_cache_observe_only: args.publication_point_cache_observe_only,
enable_publication_point_validation_cache: args.enable_publication_point_validation_cache,
enable_transport_request_prefetch: args.enable_transport_request_prefetch, enable_transport_request_prefetch: args.enable_transport_request_prefetch,
report_build_ms, report_build_ms,
report_write_ms, report_write_ms,

View File

@ -19,6 +19,14 @@ fn parse_help_returns_usage() {
assert!(err.contains("--parallel-phase2-object-workers"), "{err}"); assert!(err.contains("--parallel-phase2-object-workers"), "{err}");
assert!(err.contains("--memory-trim-after-validation"), "{err}"); assert!(err.contains("--memory-trim-after-validation"), "{err}");
assert!(err.contains("--enable-roa-validation-cache"), "{err}"); assert!(err.contains("--enable-roa-validation-cache"), "{err}");
assert!(
err.contains("--publication-point-cache-observe-only"),
"{err}"
);
assert!(
err.contains("--enable-publication-point-validation-cache"),
"{err}"
);
assert!(!err.contains("--parallel-phase1"), "{err}"); assert!(!err.contains("--parallel-phase1"), "{err}");
assert!(!err.contains("--parallel-phase2 "), "{err}"); assert!(!err.contains("--parallel-phase2 "), "{err}");
} }
@ -139,6 +147,24 @@ fn parse_accepts_enable_roa_validation_cache() {
assert!(args.enable_roa_validation_cache); assert!(args.enable_roa_validation_cache);
} }
#[test]
fn parse_accepts_publication_point_cache_flags() {
let argv = vec![
"rpki".to_string(),
"--db".to_string(),
"db".to_string(),
"--tal-path".to_string(),
"x.tal".to_string(),
"--ta-path".to_string(),
"x.cer".to_string(),
"--publication-point-cache-observe-only".to_string(),
"--enable-publication-point-validation-cache".to_string(),
];
let args = parse_args(&argv).expect("parse args");
assert!(args.publication_point_cache_observe_only);
assert!(args.enable_publication_point_validation_cache);
}
#[test] #[test]
fn parse_accepts_enable_transport_request_prefetch() { fn parse_accepts_enable_transport_request_prefetch() {
let argv = vec![ let argv = vec![
@ -168,6 +194,8 @@ fn parse_disables_roa_validation_cache_by_default() {
]; ];
let args = parse_args(&argv).expect("parse args"); let args = parse_args(&argv).expect("parse args");
assert!(!args.enable_roa_validation_cache); assert!(!args.enable_roa_validation_cache);
assert!(!args.publication_point_cache_observe_only);
assert!(!args.enable_publication_point_validation_cache);
assert!(!args.enable_transport_request_prefetch); assert!(!args.enable_transport_request_prefetch);
} }
@ -1574,6 +1602,8 @@ fn run_report_task_and_stage_timing_work() {
let stage_timing = RunStageTiming { let stage_timing = RunStageTiming {
validation_ms: 1, validation_ms: 1,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
report_build_ms: report_output.report_build_ms, report_build_ms: report_output.report_build_ms,
report_write_ms: report_output.report_write_ms, report_write_ms: report_output.report_write_ms,
@ -1671,6 +1701,8 @@ fn stage_timing_serializes_memory_telemetry() {
let stage_timing = RunStageTiming { let stage_timing = RunStageTiming {
validation_ms: 1, validation_ms: 1,
enable_roa_validation_cache: true, enable_roa_validation_cache: true,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: true, enable_transport_request_prefetch: true,
report_build_ms: 2, report_build_ms: 2,
report_write_ms: None, report_write_ms: None,
@ -1750,6 +1782,8 @@ fn stage_timing_serializes_memory_telemetry() {
); );
assert_eq!(value["analysis_counts"]["roa_validation_cache_hit_roas"], 2); assert_eq!(value["analysis_counts"]["roa_validation_cache_hit_roas"], 2);
assert_eq!(value["roa_validation_cache"]["hit_roas"], 2); assert_eq!(value["roa_validation_cache"]["hit_roas"], 2);
assert_eq!(value["publication_point_cache_observe_only"], false);
assert_eq!(value["enable_publication_point_validation_cache"], false);
assert!( assert!(
value["memory_telemetry"] value["memory_telemetry"]
.as_object() .as_object()

View File

@ -14,9 +14,10 @@ use crate::data_model::rc::{AsResourceSet, IpResourceSet};
use config::*; use config::*;
pub use config::{ pub use config::{
ALL_COLUMN_FAMILY_NAMES, CF_MANIFEST_REPLAY_META, CF_RAW_BY_HASH, CF_REPOSITORY_VIEW, ALL_COLUMN_FAMILY_NAMES, CF_MANIFEST_REPLAY_META, CF_PUBLICATION_POINT_CACHE_PROJECTION,
CF_ROA_CACHE_PROJECTION, CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, CF_RAW_BY_HASH, CF_REPOSITORY_VIEW, CF_ROA_CACHE_PROJECTION, CF_RRDP_SOURCE,
CF_TRANSPORT_PREFETCH, CF_VCIR, column_family_descriptors, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, CF_TRANSPORT_PREFETCH, CF_VCIR,
column_family_descriptors,
}; };
use keys::*; use keys::*;
use pack::compute_sha256_32; use pack::compute_sha256_32;
@ -970,6 +971,448 @@ pub struct RoaCacheProjection {
pub entries: Vec<RoaCacheObjectProjection>, pub entries: Vec<RoaCacheObjectProjection>,
} }
pub const PUBLICATION_POINT_CACHE_SCHEMA_VERSION: u32 = 1;
pub const PUBLICATION_POINT_CACHE_ALGORITHM_VERSION: u32 = 1;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PublicationPointCacheOutput {
#[serde(rename = "t")]
pub output_type: VcirOutputType,
#[serde(rename = "nb")]
pub item_effective_not_before: PackTime,
#[serde(rename = "nu")]
pub item_effective_until: PackTime,
#[serde(rename = "u")]
pub source_object_uri: String,
#[serde(rename = "k")]
pub source_object_type: VcirSourceObjectType,
#[serde(rename = "h")]
#[serde(with = "serde_bytes_32")]
pub source_object_hash: [u8; 32],
#[serde(rename = "c")]
#[serde(with = "serde_bytes_32")]
pub source_ee_cert_hash: [u8; 32],
#[serde(rename = "p")]
pub payload: VcirLocalOutputPayload,
#[serde(rename = "r")]
#[serde(with = "serde_bytes_32")]
pub rule_hash: [u8; 32],
}
impl PublicationPointCacheOutput {
fn from_local_output(output: &VcirLocalOutput, default_not_before: &PackTime) -> Self {
Self {
output_type: output.output_type,
item_effective_not_before: default_not_before.clone(),
item_effective_until: output.item_effective_until.clone(),
source_object_uri: output.source_object_uri.clone(),
source_object_type: output.source_object_type,
source_object_hash: output.source_object_hash,
source_ee_cert_hash: output.source_ee_cert_hash,
payload: output.payload.clone(),
rule_hash: output.rule_hash,
}
}
pub fn to_local_output(&self) -> VcirLocalOutput {
VcirLocalOutput {
output_type: self.output_type,
item_effective_until: self.item_effective_until.clone(),
source_object_uri: self.source_object_uri.clone(),
source_object_type: self.source_object_type,
source_object_hash: self.source_object_hash,
source_ee_cert_hash: self.source_ee_cert_hash,
payload: self.payload.clone(),
rule_hash: self.rule_hash,
}
}
pub fn validate_internal(&self) -> StorageResult<()> {
parse_time(
"publication_point_cache_projection.outputs[].item_effective_not_before",
&self.item_effective_not_before,
)?;
parse_time(
"publication_point_cache_projection.outputs[].item_effective_until",
&self.item_effective_until,
)?;
validate_non_empty(
"publication_point_cache_projection.outputs[].source_object_uri",
&self.source_object_uri,
)?;
VcirLocalOutput {
output_type: self.output_type,
item_effective_until: self.item_effective_until.clone(),
source_object_uri: self.source_object_uri.clone(),
source_object_type: self.source_object_type,
source_object_hash: self.source_object_hash,
source_ee_cert_hash: self.source_ee_cert_hash,
payload: self.payload.clone(),
rule_hash: self.rule_hash,
}
.validate_internal()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PublicationPointCacheChild {
#[serde(rename = "cm")]
pub child_manifest_rsync_uri: String,
#[serde(rename = "cu")]
pub child_cert_rsync_uri: String,
#[serde(rename = "ch")]
pub child_cert_hash: String,
#[serde(rename = "ski")]
pub child_ski: String,
#[serde(rename = "rb")]
pub child_rsync_base_uri: String,
#[serde(rename = "pp")]
pub child_publication_point_rsync_uri: String,
#[serde(rename = "rn")]
pub child_rrdp_notification_uri: Option<String>,
#[serde(rename = "ip")]
pub child_effective_ip_resources: Option<IpResourceSet>,
#[serde(rename = "as")]
pub child_effective_as_resources: Option<AsResourceSet>,
#[serde(rename = "nb")]
pub child_effective_not_before: PackTime,
#[serde(rename = "nu")]
pub child_effective_until: PackTime,
}
impl PublicationPointCacheChild {
fn from_child_entry(
entry: &VcirChildEntry,
default_not_before: &PackTime,
default_until: &PackTime,
) -> Self {
Self {
child_manifest_rsync_uri: entry.child_manifest_rsync_uri.clone(),
child_cert_rsync_uri: entry.child_cert_rsync_uri.clone(),
child_cert_hash: entry.child_cert_hash.clone(),
child_ski: entry.child_ski.clone(),
child_rsync_base_uri: entry.child_rsync_base_uri.clone(),
child_publication_point_rsync_uri: entry.child_publication_point_rsync_uri.clone(),
child_rrdp_notification_uri: entry.child_rrdp_notification_uri.clone(),
child_effective_ip_resources: entry.child_effective_ip_resources.clone(),
child_effective_as_resources: entry.child_effective_as_resources.clone(),
child_effective_not_before: default_not_before.clone(),
child_effective_until: default_until.clone(),
}
}
pub fn to_child_entry(&self, accepted_at_validation_time: PackTime) -> VcirChildEntry {
VcirChildEntry {
child_manifest_rsync_uri: self.child_manifest_rsync_uri.clone(),
child_cert_rsync_uri: self.child_cert_rsync_uri.clone(),
child_cert_hash: self.child_cert_hash.clone(),
child_ski: self.child_ski.clone(),
child_rsync_base_uri: self.child_rsync_base_uri.clone(),
child_publication_point_rsync_uri: self.child_publication_point_rsync_uri.clone(),
child_rrdp_notification_uri: self.child_rrdp_notification_uri.clone(),
child_effective_ip_resources: self.child_effective_ip_resources.clone(),
child_effective_as_resources: self.child_effective_as_resources.clone(),
accepted_at_validation_time,
}
}
pub fn validate_internal(&self) -> StorageResult<()> {
self.to_child_entry(self.child_effective_not_before.clone())
.validate_internal()?;
parse_time(
"publication_point_cache_projection.children[].child_effective_not_before",
&self.child_effective_not_before,
)?;
parse_time(
"publication_point_cache_projection.children[].child_effective_until",
&self.child_effective_until,
)?;
Ok(())
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PublicationPointCacheObject {
#[serde(rename = "r")]
pub artifact_role: VcirArtifactRole,
#[serde(rename = "k")]
pub artifact_kind: VcirArtifactKind,
#[serde(rename = "u")]
pub uri: Option<String>,
#[serde(rename = "h")]
pub sha256: String,
#[serde(rename = "t")]
pub object_type: Option<String>,
#[serde(rename = "s")]
pub validation_status: VcirArtifactValidationStatus,
}
impl PublicationPointCacheObject {
fn from_related_artifact(artifact: &VcirRelatedArtifact) -> Self {
Self {
artifact_role: artifact.artifact_role,
artifact_kind: artifact.artifact_kind,
uri: artifact.uri.clone(),
sha256: artifact.sha256.clone(),
object_type: artifact.object_type.clone(),
validation_status: artifact.validation_status,
}
}
pub fn to_related_artifact(&self) -> VcirRelatedArtifact {
VcirRelatedArtifact {
artifact_role: self.artifact_role,
artifact_kind: self.artifact_kind,
uri: self.uri.clone(),
sha256: self.sha256.clone(),
object_type: self.object_type.clone(),
validation_status: self.validation_status,
}
}
pub fn validate_internal(&self) -> StorageResult<()> {
self.to_related_artifact().validate_internal()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PublicationPointCacheProjection {
#[serde(rename = "sv")]
pub schema_version: u32,
#[serde(rename = "av")]
pub algorithm_version: u32,
#[serde(rename = "m")]
pub manifest_rsync_uri: String,
#[serde(rename = "pp")]
pub publication_point_rsync_uri: String,
#[serde(rename = "cu")]
pub ca_cert_uri: Option<String>,
#[serde(rename = "ch")]
#[serde(with = "serde_bytes_32")]
pub ca_cert_sha256: [u8; 32],
#[serde(rename = "mh")]
#[serde(with = "serde_bytes_32")]
pub manifest_sha256: [u8; 32],
#[serde(rename = "tal")]
pub tal_id: String,
#[serde(rename = "ta")]
#[serde(with = "serde_bytes_32")]
pub ta_context_digest: [u8; 32],
#[serde(rename = "pc")]
#[serde(with = "serde_bytes_32")]
pub parent_context_digest: [u8; 32],
#[serde(rename = "pf")]
#[serde(with = "serde_bytes_32")]
pub validation_policy_fingerprint: [u8; 32],
#[serde(rename = "nb")]
pub instance_effective_not_before: PackTime,
#[serde(rename = "nu")]
pub instance_effective_until: PackTime,
#[serde(rename = "mt")]
pub manifest_this_update: PackTime,
#[serde(rename = "mn")]
pub manifest_next_update: PackTime,
#[serde(rename = "cn")]
pub current_crl_next_update: PackTime,
#[serde(rename = "ca")]
pub self_ca_not_after: PackTime,
#[serde(rename = "ccr")]
pub ccr_manifest_projection: VcirCcrManifestProjection,
#[serde(rename = "o")]
pub outputs: Vec<PublicationPointCacheOutput>,
#[serde(rename = "c")]
pub children: Vec<PublicationPointCacheChild>,
#[serde(rename = "ra")]
pub related_objects: Vec<PublicationPointCacheObject>,
#[serde(rename = "s")]
pub summary: VcirSummary,
}
impl PublicationPointCacheProjection {
pub fn from_vcir_with_context(
vcir: &ValidatedCaInstanceResult,
publication_point_rsync_uri: String,
ca_cert_uri: Option<String>,
ca_cert_sha256: [u8; 32],
manifest_sha256: [u8; 32],
ta_context_digest: [u8; 32],
parent_context_digest: [u8; 32],
validation_policy_fingerprint: [u8; 32],
) -> StorageResult<Self> {
let projection = Self {
schema_version: PUBLICATION_POINT_CACHE_SCHEMA_VERSION,
algorithm_version: PUBLICATION_POINT_CACHE_ALGORITHM_VERSION,
manifest_rsync_uri: vcir.manifest_rsync_uri.clone(),
publication_point_rsync_uri,
ca_cert_uri,
ca_cert_sha256,
manifest_sha256,
tal_id: vcir.tal_id.clone(),
ta_context_digest,
parent_context_digest,
validation_policy_fingerprint,
instance_effective_not_before: vcir.last_successful_validation_time.clone(),
instance_effective_until: vcir.instance_gate.instance_effective_until.clone(),
manifest_this_update: vcir
.validated_manifest_meta
.validated_manifest_this_update
.clone(),
manifest_next_update: vcir.instance_gate.manifest_next_update.clone(),
current_crl_next_update: vcir.instance_gate.current_crl_next_update.clone(),
self_ca_not_after: vcir.instance_gate.self_ca_not_after.clone(),
ccr_manifest_projection: vcir.ccr_manifest_projection.clone(),
outputs: vcir
.local_outputs
.iter()
.map(|output| {
PublicationPointCacheOutput::from_local_output(
output,
&vcir.last_successful_validation_time,
)
})
.collect(),
children: vcir
.child_entries
.iter()
.map(|child| {
PublicationPointCacheChild::from_child_entry(
child,
&vcir.last_successful_validation_time,
&vcir.instance_gate.instance_effective_until,
)
})
.collect(),
related_objects: vcir
.related_artifacts
.iter()
.map(PublicationPointCacheObject::from_related_artifact)
.collect(),
summary: vcir.summary.clone(),
};
projection.validate_internal()?;
Ok(projection)
}
pub fn to_vcir_for_reuse(
&self,
validation_time: time::OffsetDateTime,
) -> ValidatedCaInstanceResult {
let validation_time = PackTime::from_utc_offset_datetime(validation_time);
ValidatedCaInstanceResult {
manifest_rsync_uri: self.manifest_rsync_uri.clone(),
parent_manifest_rsync_uri: None,
tal_id: self.tal_id.clone(),
ca_subject_name: String::from("publication-point-cache"),
ca_ski: hex::encode(self.ca_cert_sha256),
issuer_ski: hex::encode(self.parent_context_digest),
last_successful_validation_time: validation_time.clone(),
current_manifest_rsync_uri: self.manifest_rsync_uri.clone(),
current_crl_rsync_uri: self
.related_objects
.iter()
.find(|object| object.artifact_role == VcirArtifactRole::CurrentCrl)
.and_then(|object| object.uri.clone())
.unwrap_or_default(),
validated_manifest_meta: ValidatedManifestMeta {
validated_manifest_number: self.ccr_manifest_projection.manifest_number_be.clone(),
validated_manifest_this_update: self.manifest_this_update.clone(),
validated_manifest_next_update: self.manifest_next_update.clone(),
},
ccr_manifest_projection: self.ccr_manifest_projection.clone(),
instance_gate: VcirInstanceGate {
manifest_next_update: self.manifest_next_update.clone(),
current_crl_next_update: self.current_crl_next_update.clone(),
self_ca_not_after: self.self_ca_not_after.clone(),
instance_effective_until: self.instance_effective_until.clone(),
},
child_entries: self
.children
.iter()
.map(|child| child.to_child_entry(validation_time.clone()))
.collect(),
local_outputs: self
.outputs
.iter()
.map(PublicationPointCacheOutput::to_local_output)
.collect(),
related_artifacts: self
.related_objects
.iter()
.map(PublicationPointCacheObject::to_related_artifact)
.collect(),
summary: self.summary.clone(),
audit_summary: VcirAuditSummary {
failed_fetch_eligible: false,
last_failed_fetch_reason: None,
warning_count: 0,
audit_flags: vec!["publication_point_cache_projection".to_string()],
},
}
}
pub fn validate_internal(&self) -> StorageResult<()> {
if self.schema_version != PUBLICATION_POINT_CACHE_SCHEMA_VERSION {
return Err(StorageError::InvalidData {
entity: "publication_point_cache_projection.schema_version",
detail: format!("unsupported schema_version {}", self.schema_version),
});
}
if self.algorithm_version != PUBLICATION_POINT_CACHE_ALGORITHM_VERSION {
return Err(StorageError::InvalidData {
entity: "publication_point_cache_projection.algorithm_version",
detail: format!("unsupported algorithm_version {}", self.algorithm_version),
});
}
validate_non_empty(
"publication_point_cache_projection.manifest_rsync_uri",
&self.manifest_rsync_uri,
)?;
validate_non_empty(
"publication_point_cache_projection.publication_point_rsync_uri",
&self.publication_point_rsync_uri,
)?;
if let Some(uri) = &self.ca_cert_uri {
validate_non_empty("publication_point_cache_projection.ca_cert_uri", uri)?;
}
validate_non_empty("publication_point_cache_projection.tal_id", &self.tal_id)?;
parse_time(
"publication_point_cache_projection.instance_effective_not_before",
&self.instance_effective_not_before,
)?;
parse_time(
"publication_point_cache_projection.instance_effective_until",
&self.instance_effective_until,
)?;
parse_time(
"publication_point_cache_projection.manifest_this_update",
&self.manifest_this_update,
)?;
parse_time(
"publication_point_cache_projection.manifest_next_update",
&self.manifest_next_update,
)?;
parse_time(
"publication_point_cache_projection.current_crl_next_update",
&self.current_crl_next_update,
)?;
parse_time(
"publication_point_cache_projection.self_ca_not_after",
&self.self_ca_not_after,
)?;
self.ccr_manifest_projection.validate_internal()?;
for output in &self.outputs {
output.validate_internal()?;
}
for child in &self.children {
child.validate_internal()?;
}
for object in &self.related_objects {
object.validate_internal()?;
}
Ok(())
}
}
impl RoaCacheProjection { impl RoaCacheProjection {
pub fn from_vcir(vcir: &ValidatedCaInstanceResult) -> StorageResult<Option<Self>> { pub fn from_vcir(vcir: &ValidatedCaInstanceResult) -> StorageResult<Option<Self>> {
let mut issuer_ca_sha256_hex = None; let mut issuer_ca_sha256_hex = None;
@ -1347,6 +1790,8 @@ pub struct VcirReplaceTimingBreakdown {
pub replay_meta_value_bytes: u64, pub replay_meta_value_bytes: u64,
pub roa_cache_projection_encode_ms: u64, pub roa_cache_projection_encode_ms: u64,
pub roa_cache_projection_value_bytes: u64, pub roa_cache_projection_value_bytes: u64,
pub publication_point_cache_projection_encode_ms: u64,
pub publication_point_cache_projection_value_bytes: u64,
pub batch_build_ms: u64, pub batch_build_ms: u64,
pub write_batch_ms: u64, pub write_batch_ms: u64,
pub total_encoded_bytes: u64, pub total_encoded_bytes: u64,
@ -1356,6 +1801,7 @@ pub struct VcirReplaceTimingBreakdown {
pub rss_after_vcir_encode_kb: Option<u64>, pub rss_after_vcir_encode_kb: Option<u64>,
pub rss_after_replay_meta_encode_kb: Option<u64>, pub rss_after_replay_meta_encode_kb: Option<u64>,
pub rss_after_roa_cache_projection_encode_kb: Option<u64>, pub rss_after_roa_cache_projection_encode_kb: Option<u64>,
pub rss_after_publication_point_cache_projection_encode_kb: Option<u64>,
pub rss_after_write_batch_kb: Option<u64>, pub rss_after_write_batch_kb: Option<u64>,
} }
@ -1828,6 +2274,25 @@ fn write_roa_cache_projection_to_batch(
Ok(()) Ok(())
} }
fn write_publication_point_cache_projection_to_batch(
projection_cf: &ColumnFamily,
batch: &mut WriteBatch,
projection: Option<&PublicationPointCacheProjection>,
timing: Option<&mut VcirReplaceTimingBreakdown>,
) -> StorageResult<()> {
let Some(projection) = projection else {
return Ok(());
};
projection.validate_internal()?;
let key = publication_point_cache_projection_key(&projection.manifest_rsync_uri);
let value = encode_cbor(projection, "publication_point_cache_projection")?;
if let Some(timing) = timing {
timing.publication_point_cache_projection_value_bytes = value.len() as u64;
}
batch.put_cf(projection_cf, key.as_bytes(), value);
Ok(())
}
impl RocksStore { impl RocksStore {
pub fn open(path: &Path) -> StorageResult<Self> { pub fn open(path: &Path) -> StorageResult<Self> {
let mut base_opts = Options::default(); let mut base_opts = Options::default();
@ -2221,10 +2686,19 @@ impl RocksStore {
} }
pub fn put_vcir(&self, vcir: &ValidatedCaInstanceResult) -> StorageResult<()> { pub fn put_vcir(&self, vcir: &ValidatedCaInstanceResult) -> StorageResult<()> {
self.put_vcir_with_publication_point_cache_projection(vcir, None)
}
pub fn put_vcir_with_publication_point_cache_projection(
&self,
vcir: &ValidatedCaInstanceResult,
publication_point_projection: Option<&PublicationPointCacheProjection>,
) -> StorageResult<()> {
vcir.validate_internal()?; vcir.validate_internal()?;
let vcir_cf = self.cf(CF_VCIR)?; let vcir_cf = self.cf(CF_VCIR)?;
let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?; let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?;
let projection_cf = self.cf(CF_ROA_CACHE_PROJECTION)?; let projection_cf = self.cf(CF_ROA_CACHE_PROJECTION)?;
let pp_projection_cf = self.cf(CF_PUBLICATION_POINT_CACHE_PROJECTION)?;
let replay_meta = ManifestReplayMeta::from_vcir(vcir); let replay_meta = ManifestReplayMeta::from_vcir(vcir);
replay_meta.validate_internal()?; replay_meta.validate_internal()?;
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
@ -2235,12 +2709,26 @@ impl RocksStore {
let replay_value = encode_cbor(&replay_meta, "manifest_replay_meta")?; let replay_value = encode_cbor(&replay_meta, "manifest_replay_meta")?;
batch.put_cf(replay_cf, replay_key.as_bytes(), replay_value); batch.put_cf(replay_cf, replay_key.as_bytes(), replay_value);
write_roa_cache_projection_to_batch(projection_cf, &mut batch, vcir, None)?; write_roa_cache_projection_to_batch(projection_cf, &mut batch, vcir, None)?;
write_publication_point_cache_projection_to_batch(
pp_projection_cf,
&mut batch,
publication_point_projection,
None,
)?;
self.write_batch(batch) self.write_batch(batch)
} }
pub fn replace_vcir_and_manifest_replay_meta( pub fn replace_vcir_and_manifest_replay_meta(
&self, &self,
vcir: &ValidatedCaInstanceResult, vcir: &ValidatedCaInstanceResult,
) -> StorageResult<VcirReplaceTimingBreakdown> {
self.replace_vcir_manifest_replay_meta_and_publication_point_cache_projection(vcir, None)
}
pub fn replace_vcir_manifest_replay_meta_and_publication_point_cache_projection(
&self,
vcir: &ValidatedCaInstanceResult,
publication_point_projection: Option<&PublicationPointCacheProjection>,
) -> StorageResult<VcirReplaceTimingBreakdown> { ) -> StorageResult<VcirReplaceTimingBreakdown> {
let mut timing = VcirReplaceTimingBreakdown { let mut timing = VcirReplaceTimingBreakdown {
rss_before_kb: process_vm_rss_kb(), rss_before_kb: process_vm_rss_kb(),
@ -2257,6 +2745,7 @@ impl RocksStore {
let vcir_cf = self.cf(CF_VCIR)?; let vcir_cf = self.cf(CF_VCIR)?;
let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?; let replay_cf = self.cf(CF_MANIFEST_REPLAY_META)?;
let projection_cf = self.cf(CF_ROA_CACHE_PROJECTION)?; let projection_cf = self.cf(CF_ROA_CACHE_PROJECTION)?;
let pp_projection_cf = self.cf(CF_PUBLICATION_POINT_CACHE_PROJECTION)?;
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
let vcir_key = vcir_key(&vcir.manifest_rsync_uri); let vcir_key = vcir_key(&vcir.manifest_rsync_uri);
@ -2283,9 +2772,21 @@ impl RocksStore {
projection_encode_started.elapsed().as_millis() as u64; projection_encode_started.elapsed().as_millis() as u64;
timing.rss_after_roa_cache_projection_encode_kb = process_vm_rss_kb(); timing.rss_after_roa_cache_projection_encode_kb = process_vm_rss_kb();
let pp_projection_encode_started = std::time::Instant::now();
write_publication_point_cache_projection_to_batch(
pp_projection_cf,
&mut batch,
publication_point_projection,
Some(&mut timing),
)?;
timing.publication_point_cache_projection_encode_ms =
pp_projection_encode_started.elapsed().as_millis() as u64;
timing.rss_after_publication_point_cache_projection_encode_kb = process_vm_rss_kb();
timing.total_encoded_bytes = timing.vcir_value_bytes timing.total_encoded_bytes = timing.vcir_value_bytes
+ timing.replay_meta_value_bytes + timing.replay_meta_value_bytes
+ timing.roa_cache_projection_value_bytes; + timing.roa_cache_projection_value_bytes
+ timing.publication_point_cache_projection_value_bytes;
timing.batch_build_ms = batch_build_started.elapsed().as_millis() as u64; timing.batch_build_ms = batch_build_started.elapsed().as_millis() as u64;
let write_batch_started = std::time::Instant::now(); let write_batch_started = std::time::Instant::now();
@ -2349,6 +2850,27 @@ impl RocksStore {
Ok(Some(projection)) Ok(Some(projection))
} }
pub fn get_publication_point_cache_projection(
&self,
manifest_rsync_uri: &str,
) -> StorageResult<Option<PublicationPointCacheProjection>> {
let cf = self.cf(CF_PUBLICATION_POINT_CACHE_PROJECTION)?;
let key = publication_point_cache_projection_key(manifest_rsync_uri);
let Some(bytes) = self
.db
.get_cf(cf, key.as_bytes())
.map_err(|e| StorageError::RocksDb(e.to_string()))?
else {
return Ok(None);
};
let projection = decode_cbor::<PublicationPointCacheProjection>(
&bytes,
"publication_point_cache_projection",
)?;
projection.validate_internal()?;
Ok(Some(projection))
}
pub fn put_transport_prefetch_snapshot( pub fn put_transport_prefetch_snapshot(
&self, &self,
snapshot: &crate::parallel::transport_prefetch::TransportPrefetchSnapshot, snapshot: &crate::parallel::transport_prefetch::TransportPrefetchSnapshot,

View File

@ -6,6 +6,7 @@ pub const CF_RAW_BLOB: &str = "raw_blob";
pub const CF_VCIR: &str = "vcir"; pub const CF_VCIR: &str = "vcir";
pub const CF_MANIFEST_REPLAY_META: &str = "manifest_replay_meta"; pub const CF_MANIFEST_REPLAY_META: &str = "manifest_replay_meta";
pub const CF_ROA_CACHE_PROJECTION: &str = "roa_cache_projection"; pub const CF_ROA_CACHE_PROJECTION: &str = "roa_cache_projection";
pub const CF_PUBLICATION_POINT_CACHE_PROJECTION: &str = "publication_point_cache_projection";
pub const CF_RRDP_SOURCE: &str = "rrdp_source"; pub const CF_RRDP_SOURCE: &str = "rrdp_source";
pub const CF_RRDP_SOURCE_MEMBER: &str = "rrdp_source_member"; pub const CF_RRDP_SOURCE_MEMBER: &str = "rrdp_source_member";
pub const CF_RRDP_URI_OWNER: &str = "rrdp_uri_owner"; pub const CF_RRDP_URI_OWNER: &str = "rrdp_uri_owner";
@ -18,6 +19,7 @@ pub const ALL_COLUMN_FAMILY_NAMES: &[&str] = &[
CF_VCIR, CF_VCIR,
CF_MANIFEST_REPLAY_META, CF_MANIFEST_REPLAY_META,
CF_ROA_CACHE_PROJECTION, CF_ROA_CACHE_PROJECTION,
CF_PUBLICATION_POINT_CACHE_PROJECTION,
CF_RRDP_SOURCE, CF_RRDP_SOURCE,
CF_RRDP_SOURCE_MEMBER, CF_RRDP_SOURCE_MEMBER,
CF_RRDP_URI_OWNER, CF_RRDP_URI_OWNER,
@ -30,6 +32,8 @@ pub(super) const RAW_BLOB_KEY_PREFIX: &str = "rawblob:";
pub(super) const VCIR_KEY_PREFIX: &str = "vcir:"; pub(super) const VCIR_KEY_PREFIX: &str = "vcir:";
pub(super) const MANIFEST_REPLAY_META_KEY_PREFIX: &str = "manifest_replay_meta:"; pub(super) const MANIFEST_REPLAY_META_KEY_PREFIX: &str = "manifest_replay_meta:";
pub(super) const ROA_CACHE_PROJECTION_KEY_PREFIX: &str = "roa_cache_projection:"; pub(super) const ROA_CACHE_PROJECTION_KEY_PREFIX: &str = "roa_cache_projection:";
pub(super) const PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX: &str =
"publication_point_cache_projection:";
pub(super) const RRDP_SOURCE_KEY_PREFIX: &str = "rrdp_source:"; pub(super) const RRDP_SOURCE_KEY_PREFIX: &str = "rrdp_source:";
pub(super) const RRDP_SOURCE_MEMBER_KEY_PREFIX: &str = "rrdp_source_member:"; pub(super) const RRDP_SOURCE_MEMBER_KEY_PREFIX: &str = "rrdp_source_member:";
pub(super) const RRDP_URI_OWNER_KEY_PREFIX: &str = "rrdp_uri_owner:"; pub(super) const RRDP_URI_OWNER_KEY_PREFIX: &str = "rrdp_uri_owner:";

View File

@ -34,6 +34,10 @@ pub(super) fn roa_cache_projection_key(manifest_rsync_uri: &str) -> String {
format!("{ROA_CACHE_PROJECTION_KEY_PREFIX}{manifest_rsync_uri}") format!("{ROA_CACHE_PROJECTION_KEY_PREFIX}{manifest_rsync_uri}")
} }
pub(super) fn publication_point_cache_projection_key(manifest_rsync_uri: &str) -> String {
format!("{PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX}{manifest_rsync_uri}")
}
pub(super) fn rrdp_source_key(notify_uri: &str) -> String { pub(super) fn rrdp_source_key(notify_uri: &str) -> String {
format!("{RRDP_SOURCE_KEY_PREFIX}{notify_uri}") format!("{RRDP_SOURCE_KEY_PREFIX}{notify_uri}")
} }

View File

@ -339,6 +339,63 @@ fn roa_cache_projection_groups_multiple_outputs_by_roa_uri() {
assert_eq!(projection.entries[0].outputs.len(), 2); assert_eq!(projection.entries[0].outputs.len(), 2);
} }
#[test]
fn publication_point_cache_projection_roundtrips_with_vcir() {
let td = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(td.path()).expect("open rocksdb");
let vcir = sample_vcir("rsync://example.test/repo/current.mft");
let projection = PublicationPointCacheProjection::from_vcir_with_context(
&vcir,
"rsync://example.test/repo/".to_string(),
Some("rsync://example.test/repo/ca.cer".to_string()),
sha256_32(b"ca-cert"),
sha256_32(b"manifest"),
sha256_32(b"ta-context"),
sha256_32(b"parent-context"),
sha256_32(b"policy"),
)
.expect("build publication point projection");
store
.put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection))
.expect("put vcir with publication point projection");
let got_vcir = store
.get_vcir(&vcir.manifest_rsync_uri)
.expect("get vcir")
.expect("vcir exists");
assert_eq!(got_vcir, vcir);
let got_projection = store
.get_publication_point_cache_projection(&vcir.manifest_rsync_uri)
.expect("get publication point projection")
.expect("projection exists");
assert_eq!(got_projection, projection);
assert_eq!(got_projection.outputs.len(), 2);
assert_eq!(got_projection.children.len(), 1);
assert_eq!(got_projection.related_objects.len(), 2);
}
#[test]
fn publication_point_cache_projection_rejects_version_mismatch() {
let vcir = sample_vcir("rsync://example.test/repo/current.mft");
let mut projection = PublicationPointCacheProjection::from_vcir_with_context(
&vcir,
"rsync://example.test/repo/".to_string(),
Some("rsync://example.test/repo/ca.cer".to_string()),
sha256_32(b"ca-cert"),
sha256_32(b"manifest"),
sha256_32(b"ta-context"),
sha256_32(b"parent-context"),
sha256_32(b"policy"),
)
.expect("build publication point projection");
projection.schema_version = PUBLICATION_POINT_CACHE_SCHEMA_VERSION + 1;
assert!(matches!(
projection.validate_internal(),
Err(StorageError::InvalidData { .. })
));
}
#[test] #[test]
fn roa_cache_projection_rejects_duplicate_uri_with_different_hash() { fn roa_cache_projection_rejects_duplicate_uri_with_different_hash() {
let mut vcir = sample_vcir("rsync://example.test/repo/current.mft"); let mut vcir = sample_vcir("rsync://example.test/repo/current.mft");

View File

@ -14,6 +14,7 @@ use x509_parser::prelude::FromDer;
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PublicationPointSource { pub enum PublicationPointSource {
Fresh, Fresh,
PublicationPointCache,
VcirCurrentInstance, VcirCurrentInstance,
FailedFetchNoCache, FailedFetchNoCache,
} }

View File

@ -80,6 +80,8 @@ pub fn run_publication_point_once(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let result = runner let result = runner

View File

@ -136,6 +136,8 @@ fn make_live_runner<'a>(
ccr_accumulator: Option<CcrAccumulator>, ccr_accumulator: Option<CcrAccumulator>,
persist_vcir: bool, persist_vcir: bool,
enable_roa_validation_cache: bool, enable_roa_validation_cache: bool,
publication_point_cache_observe_only: bool,
enable_publication_point_validation_cache: bool,
) -> Rpkiv1PublicationPointRunner<'a> { ) -> Rpkiv1PublicationPointRunner<'a> {
let parallel_roa_worker_pool = parallel_phase2_config let parallel_roa_worker_pool = parallel_phase2_config
.as_ref() .as_ref()
@ -161,6 +163,8 @@ fn make_live_runner<'a>(
ccr_accumulator: ccr_accumulator.map(Mutex::new), ccr_accumulator: ccr_accumulator.map(Mutex::new),
persist_vcir, persist_vcir,
enable_roa_validation_cache, enable_roa_validation_cache,
publication_point_cache_observe_only,
enable_publication_point_validation_cache,
} }
} }
@ -557,6 +561,8 @@ pub fn run_tree_from_tal_url_serial(
None, None,
config.persist_vcir, config.persist_vcir,
config.enable_roa_validation_cache, config.enable_roa_validation_cache,
config.publication_point_cache_observe_only,
config.enable_publication_point_validation_cache,
); );
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -597,6 +603,8 @@ pub fn run_tree_from_tal_url_serial_audit(
None, None,
config.persist_vcir, config.persist_vcir,
config.enable_roa_validation_cache, config.enable_roa_validation_cache,
config.publication_point_cache_observe_only,
config.enable_publication_point_validation_cache,
); );
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -659,6 +667,8 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
None, None,
config.persist_vcir, config.persist_vcir,
config.enable_roa_validation_cache, config.enable_roa_validation_cache,
config.publication_point_cache_observe_only,
config.enable_publication_point_validation_cache,
); );
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -747,6 +757,8 @@ where
.then(|| CcrAccumulator::new(vec![discovery.trust_anchor.clone()])), .then(|| CcrAccumulator::new(vec![discovery.trust_anchor.clone()])),
config.persist_vcir, config.persist_vcir,
config.enable_roa_validation_cache, config.enable_roa_validation_cache,
config.publication_point_cache_observe_only,
config.enable_publication_point_validation_cache,
); );
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -873,6 +885,8 @@ where
}), }),
config.persist_vcir, config.persist_vcir,
config.enable_roa_validation_cache, config.enable_roa_validation_cache,
config.publication_point_cache_observe_only,
config.enable_publication_point_validation_cache,
); );
let TreeRunAuditOutput { let TreeRunAuditOutput {
@ -1299,6 +1313,8 @@ pub fn run_tree_from_tal_and_ta_der_serial(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -1352,6 +1368,8 @@ pub fn run_tree_from_tal_bytes_serial_audit(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -1427,6 +1445,8 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -1501,6 +1521,8 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -1576,6 +1598,8 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -1658,6 +1682,8 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -1721,6 +1747,8 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -1806,6 +1834,8 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: config.enable_roa_validation_cache, enable_roa_validation_cache: config.enable_roa_validation_cache,
publication_point_cache_observe_only: config.publication_point_cache_observe_only,
enable_publication_point_validation_cache: config.enable_publication_point_validation_cache,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -1871,6 +1901,8 @@ fn build_payload_replay_runner<'a>(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache, enable_roa_validation_cache,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
} }
} }
@ -1906,6 +1938,8 @@ fn build_payload_delta_replay_runner<'a>(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache, enable_roa_validation_cache,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
} }
} }
@ -1941,6 +1975,8 @@ fn build_payload_delta_replay_current_store_runner<'a>(
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache, enable_roa_validation_cache,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
} }
} }
@ -2504,6 +2540,8 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -2541,6 +2579,8 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -2588,6 +2628,8 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -2636,6 +2678,8 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
&timing, &timing,
@ -2694,6 +2738,8 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -2728,6 +2774,8 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -2782,6 +2830,8 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -2845,6 +2895,8 @@ mod replay_api_tests {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
&timing, &timing,

View File

@ -22,6 +22,10 @@ pub struct TreeRunConfig {
pub build_ccr_accumulator: bool, pub build_ccr_accumulator: bool,
/// Reuse accepted ROA validation outputs from previous VCIR when explicitly enabled. /// Reuse accepted ROA validation outputs from previous VCIR when explicitly enabled.
pub enable_roa_validation_cache: bool, pub enable_roa_validation_cache: bool,
/// Evaluate publication-point cache eligibility without changing validation results.
pub publication_point_cache_observe_only: bool,
/// Reuse complete publication-point validation projections when explicitly enabled.
pub enable_publication_point_validation_cache: bool,
/// Prefetch transport requests from the previous run before normal tree traversal. /// Prefetch transport requests from the previous run before normal tree traversal.
pub enable_transport_request_prefetch: bool, pub enable_transport_request_prefetch: bool,
} }
@ -35,6 +39,8 @@ impl Default for TreeRunConfig {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
} }
} }
@ -154,7 +160,8 @@ pub(crate) fn submit_publication_point_cir_input(
.submit_audit_entries(crate::cir::CirInputSection::Fresh, entries) .submit_audit_entries(crate::cir::CirInputSection::Fresh, entries)
.map_err(|e| TreeRunError::Runner(e.to_string()))?; .map_err(|e| TreeRunError::Runner(e.to_string()))?;
} }
PublicationPointSource::VcirCurrentInstance => { PublicationPointSource::VcirCurrentInstance
| PublicationPointSource::PublicationPointCache => {
cir_input cir_input
.submit_audit_entries(crate::cir::CirInputSection::Fresh, cir_fresh_objects) .submit_audit_entries(crate::cir::CirInputSection::Fresh, cir_fresh_objects)
.map_err(|e| TreeRunError::Runner(e.to_string()))?; .map_err(|e| TreeRunError::Runner(e.to_string()))?;

View File

@ -909,6 +909,33 @@ fn stage_ready_publication_point(
}; };
let mut warnings = ready.repo_outcome.warnings.clone(); let mut warnings = ready.repo_outcome.warnings.clone();
let repo_outcome = ready.repo_outcome.clone(); let repo_outcome = ready.repo_outcome.clone();
if let Some(result) = runner.observe_or_reuse_publication_point_cache(
&ready.node.handle,
repo_outcome.repo_sync_source.as_deref(),
repo_outcome.repo_sync_phase.as_deref(),
repo_outcome.repo_sync_duration_ms,
repo_outcome.repo_sync_err.as_deref(),
&warnings,
) {
metrics.complete_count = 1;
metrics.discovered_children = result.discovered_children.len();
let child_enqueue_started = Instant::now();
enqueue_discovered_children(
runner,
next_id,
ca_queue,
&ready.node,
result.discovered_children.clone(),
);
metrics.child_enqueue_ms = elapsed_ms(child_enqueue_started);
finished.push(FinishedPublicationPoint {
node: FinishedPublicationPointNode::from_queued(ready.node),
result: compact_phase2_finished_result(result, compact_audit),
});
metrics.total_ms = elapsed_ms(publication_point_started);
return metrics;
}
let stage_fresh_started = Instant::now(); let stage_fresh_started = Instant::now();
let stage = runner.stage_fresh_publication_point_after_repo_ready( let stage = runner.stage_fresh_publication_point_after_repo_ready(
&ready.node.handle, &ready.node.handle,

View File

@ -25,10 +25,11 @@ use crate::replay::archive::ReplayArchiveIndex;
use crate::replay::delta_archive::ReplayDeltaArchiveIndex; use crate::replay::delta_archive::ReplayDeltaArchiveIndex;
use crate::report::{RfcRef, Warning}; use crate::report::{RfcRef, Warning};
use crate::storage::{ use crate::storage::{
PackFile, PackTime, RawByHashEntry, RocksStore, ValidatedCaInstanceResult, VcirArtifactKind, PackFile, PackTime, PublicationPointCacheProjection, RawByHashEntry, RocksStore,
VcirArtifactRole, VcirArtifactValidationStatus, VcirAuditSummary, VcirCcrManifestProjection, ValidatedCaInstanceResult, VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus,
VcirChildEntry, VcirInstanceGate, VcirLocalOutput, VcirLocalOutputPayload, VcirOutputType, VcirAuditSummary, VcirCcrManifestProjection, VcirChildEntry, VcirInstanceGate, VcirLocalOutput,
VcirRelatedArtifact, VcirReplaceTimingBreakdown, VcirSourceObjectType, VcirSummary, VcirLocalOutputPayload, VcirOutputType, VcirRelatedArtifact, VcirReplaceTimingBreakdown,
VcirSourceObjectType, VcirSummary,
}; };
use crate::sync::repo::{ use crate::sync::repo::{
sync_publication_point, sync_publication_point_replay, sync_publication_point_replay_delta, sync_publication_point, sync_publication_point_replay, sync_publication_point_replay_delta,
@ -154,6 +155,8 @@ pub struct Rpkiv1PublicationPointRunner<'a> {
/// the resulting DB to be reused by a later delta run. /// the resulting DB to be reused by a later delta run.
pub persist_vcir: bool, pub persist_vcir: bool,
pub enable_roa_validation_cache: bool, pub enable_roa_validation_cache: bool,
pub publication_point_cache_observe_only: bool,
pub enable_publication_point_validation_cache: bool,
} }
impl<'a> Rpkiv1PublicationPointRunner<'a> { impl<'a> Rpkiv1PublicationPointRunner<'a> {
@ -214,6 +217,284 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
} }
} }
pub(crate) fn observe_or_reuse_publication_point_cache(
&self,
ca: &CaInstanceHandle,
repo_sync_source: Option<&str>,
repo_sync_phase: Option<&str>,
repo_sync_duration_ms: u64,
repo_sync_err: Option<&str>,
warnings: &[Warning],
) -> Option<PublicationPointRunResult> {
if !self.publication_point_cache_observe_only
&& !self.enable_publication_point_validation_cache
{
return None;
}
let lookup_started = std::time::Instant::now();
if let Some(timing) = self.timing.as_ref() {
timing.record_count("publication_point_cache_lookup_total", 1);
}
let projection = match self
.store
.get_publication_point_cache_projection(&ca.manifest_rsync_uri)
{
Ok(Some(projection)) => projection,
Ok(None) => {
self.record_publication_point_cache_miss("missing_projection");
return None;
}
Err(e) => {
self.record_publication_point_cache_miss("projection_load_error");
crate::progress_log::emit(
"publication_point_cache_lookup_error",
serde_json::json!({
"manifest_rsync_uri": ca.manifest_rsync_uri,
"error": e.to_string(),
}),
);
return None;
}
};
let current_identity = match self.current_publication_point_cache_identity(ca) {
Ok(identity) => identity,
Err(reason) => {
self.record_publication_point_cache_miss(reason.as_str());
return None;
}
};
if projection.ca_cert_uri != ca.ca_certificate_rsync_uri {
self.record_publication_point_cache_miss("ca_uri_mismatch");
return None;
}
if projection.ca_cert_sha256 != current_identity.ca_cert_sha256 {
self.record_publication_point_cache_miss("ca_hash_mismatch");
return None;
}
if projection.manifest_sha256 != current_identity.manifest_sha256 {
self.record_publication_point_cache_miss("manifest_hash_mismatch");
return None;
}
if projection.tal_id != ca.tal_id {
self.record_publication_point_cache_miss("tal_mismatch");
return None;
}
if projection.ta_context_digest != current_identity.ta_context_digest {
self.record_publication_point_cache_miss("ta_context_mismatch");
return None;
}
if projection.parent_context_digest != current_identity.parent_context_digest {
self.record_publication_point_cache_miss("parent_context_mismatch");
return None;
}
if projection.validation_policy_fingerprint != current_identity.policy_fingerprint {
self.record_publication_point_cache_miss("policy_mismatch");
return None;
}
let instance_not_before =
match parse_snapshot_time_value(&projection.instance_effective_not_before) {
Ok(value) => value,
Err(_) => {
self.record_publication_point_cache_miss("instance_not_before_invalid");
return None;
}
};
let instance_until = match parse_snapshot_time_value(&projection.instance_effective_until) {
Ok(value) => value,
Err(_) => {
self.record_publication_point_cache_miss("instance_until_invalid");
return None;
}
};
if self.validation_time < instance_not_before || self.validation_time >= instance_until {
self.record_publication_point_cache_miss("instance_time_gate_miss");
return None;
}
if let Err(reason) =
publication_point_cache_projection_items_valid(&projection, self.validation_time)
{
self.record_publication_point_cache_miss(reason);
return None;
}
if let Some(timing) = self.timing.as_ref() {
timing.record_count("publication_point_cache_theoretical_hits", 1);
timing.record_phase_nanos(
"publication_point_cache_lookup_total",
lookup_started
.elapsed()
.as_nanos()
.min(u128::from(u64::MAX)) as u64,
);
}
if self.publication_point_cache_observe_only {
return None;
}
match self.build_publication_point_cache_result(
ca,
projection,
repo_sync_source,
repo_sync_phase,
repo_sync_duration_ms,
repo_sync_err,
warnings,
) {
Ok(result) => {
if let Some(timing) = self.timing.as_ref() {
timing.record_count("publication_point_cache_reuse_hits", 1);
}
Some(result)
}
Err(e) => {
self.record_publication_point_cache_miss("reuse_build_error");
crate::progress_log::emit(
"publication_point_cache_reuse_error",
serde_json::json!({
"manifest_rsync_uri": ca.manifest_rsync_uri,
"error": e,
}),
);
None
}
}
}
fn record_publication_point_cache_miss(&self, reason: &str) {
if let Some(timing) = self.timing.as_ref() {
timing.record_count("publication_point_cache_miss_total", 1);
match reason {
"missing_projection" => {
timing.record_count("publication_point_cache_miss_missing_projection", 1)
}
"current_manifest_missing" => {
timing.record_count("publication_point_cache_miss_current_manifest_missing", 1)
}
"ca_hash_mismatch" => {
timing.record_count("publication_point_cache_miss_ca_hash_mismatch", 1)
}
"manifest_hash_mismatch" => {
timing.record_count("publication_point_cache_miss_manifest_hash_mismatch", 1)
}
"parent_context_mismatch" => {
timing.record_count("publication_point_cache_miss_parent_context_mismatch", 1)
}
"policy_mismatch" => {
timing.record_count("publication_point_cache_miss_policy_mismatch", 1)
}
"instance_time_gate_miss" => {
timing.record_count("publication_point_cache_miss_instance_time_gate", 1)
}
"output_time_gate_miss" => {
timing.record_count("publication_point_cache_miss_output_time_gate", 1)
}
"child_time_gate_miss" => {
timing.record_count("publication_point_cache_miss_child_time_gate", 1)
}
_ => timing.record_count("publication_point_cache_miss_other", 1),
}
}
}
fn current_publication_point_cache_identity(
&self,
ca: &CaInstanceHandle,
) -> Result<PublicationPointCacheIdentity, String> {
let ca_cert_sha256 = match ca.ca_certificate_rsync_uri.as_deref() {
Some(uri) => self
.current_hash_for_uri(uri)
.unwrap_or_else(|| sha256_digest_32(&ca.ca_certificate_der)),
None => sha256_digest_32(&ca.ca_certificate_der),
};
let manifest_sha256 = self
.current_hash_for_uri(&ca.manifest_rsync_uri)
.ok_or_else(|| "current_manifest_missing".to_string())?;
Ok(PublicationPointCacheIdentity {
ca_cert_sha256,
manifest_sha256,
ta_context_digest: ta_context_digest_for_ca(ca),
parent_context_digest: parent_context_digest_for_ca(ca),
policy_fingerprint: publication_point_cache_policy_fingerprint(self.policy),
})
}
fn current_hash_for_uri(&self, uri: &str) -> Option<[u8; 32]> {
if let Some(index) = self.current_repo_index.as_ref() {
if let Ok(index) = index.lock() {
if let Some(entry) = index.get_by_uri(uri) {
return Some(entry.current_hash);
}
}
}
self.store
.load_current_object_with_hash_by_uri(uri)
.ok()
.flatten()
.map(|entry| entry.current_hash)
}
fn build_publication_point_cache_result(
&self,
ca: &CaInstanceHandle,
projection: PublicationPointCacheProjection,
repo_sync_source: Option<&str>,
repo_sync_phase: Option<&str>,
repo_sync_duration_ms: u64,
repo_sync_err: Option<&str>,
warnings: &[Warning],
) -> Result<PublicationPointRunResult, String> {
let mut warnings = warnings.to_vec();
let output_reuse_count = projection.outputs.len() as u64;
let child_reuse_count = projection.children.len() as u64;
let mut vcir = projection.to_vcir_for_reuse(self.validation_time);
vcir.parent_manifest_rsync_uri = ca.parent_manifest_rsync_uri.clone();
let mut objects =
build_objects_output_from_vcir(&vcir, self.validation_time, &mut warnings);
let (discovered_children, child_audits) = restore_children_from_publication_point_cache(
self.store,
ca,
&projection,
self.validation_time,
&mut warnings,
);
let ccr_projection = projection.ccr_manifest_projection.clone();
self.append_ccr_manifest_projection(&ccr_projection)?;
let audit = build_publication_point_audit_from_vcir(
ca,
PublicationPointSource::PublicationPointCache,
repo_sync_source,
repo_sync_phase,
Some(repo_sync_duration_ms),
repo_sync_err,
Some(&vcir),
None,
&warnings,
&objects,
&child_audits,
&[],
);
let cir_cached_objects = audit.objects.clone();
objects.local_outputs_cache.clear();
if let Some(timing) = self.timing.as_ref() {
timing.record_count("publication_point_cache_outputs_reused", output_reuse_count);
timing.record_count("publication_point_cache_children_reused", child_reuse_count);
}
Ok(PublicationPointRunResult {
source: PublicationPointSource::PublicationPointCache,
snapshot: None,
warnings,
objects,
audit,
cir_fresh_objects: Vec::new(),
cir_cached_objects,
discovered_children,
})
}
pub(crate) fn ccr_accumulator_snapshot(&self) -> Option<CcrAccumulator> { pub(crate) fn ccr_accumulator_snapshot(&self) -> Option<CcrAccumulator> {
self.ccr_accumulator self.ccr_accumulator
.as_ref() .as_ref()
@ -242,6 +523,11 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
"invalid reuse projection source: fresh does not belong to failed-fetch reuse" "invalid reuse projection source: fresh does not belong to failed-fetch reuse"
.to_string(), .to_string(),
), ),
PublicationPointSource::PublicationPointCache => self.append_ccr_manifest_projection(
projection.ccr_manifest_projection.as_ref().ok_or_else(|| {
"publication-point cache reuse is missing CCR manifest projection".to_string()
})?,
),
PublicationPointSource::VcirCurrentInstance => self.append_ccr_manifest_projection( PublicationPointSource::VcirCurrentInstance => self.append_ccr_manifest_projection(
projection.ccr_manifest_projection.as_ref().ok_or_else(|| { projection.ccr_manifest_projection.as_ref().ok_or_else(|| {
"vcir current-instance reuse is missing CCR manifest projection".to_string() "vcir current-instance reuse is missing CCR manifest projection".to_string()
@ -386,6 +672,7 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
let persist_vcir_timing = if self.persist_vcir { let persist_vcir_timing = if self.persist_vcir {
persist_vcir_for_fresh_result_with_timing( persist_vcir_for_fresh_result_with_timing(
self.store, self.store,
self.policy,
ca, ca,
&pack, &pack,
&mut objects, &mut objects,
@ -393,6 +680,8 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
&child_audits, &child_audits,
&discovered_children, &discovered_children,
self.validation_time, self.validation_time,
self.publication_point_cache_observe_only
|| self.enable_publication_point_validation_cache,
) )
.map_err(|e| format!("persist VCIR failed: {e}"))? .map_err(|e| format!("persist VCIR failed: {e}"))?
} else { } else {
@ -716,6 +1005,36 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
}), }),
); );
if let Some(result) = self.observe_or_reuse_publication_point_cache(
ca,
repo_sync_source.as_deref(),
repo_sync_phase.as_deref(),
repo_sync_duration_ms,
repo_sync_err.as_deref(),
&warnings,
) {
let total_duration_ms = publication_point_started.elapsed().as_millis() as u64;
crate::progress_log::emit(
"publication_point_finish",
serde_json::json!({
"manifest_rsync_uri": ca.manifest_rsync_uri,
"publication_point_rsync_uri": ca.publication_point_rsync_uri,
"source": source_label(result.source),
"repo_sync_source": repo_sync_source,
"repo_sync_phase": repo_sync_phase,
"repo_sync_duration_ms": repo_sync_duration_ms,
"total_duration_ms": total_duration_ms,
"post_repo_duration_ms": total_duration_ms.saturating_sub(repo_sync_duration_ms),
"warning_count": result.warnings.len(),
"vrp_count": result.objects.vrps.len(),
"vap_count": result.objects.aspas.len(),
"router_key_count": result.objects.router_keys.len(),
"child_count": result.discovered_children.len(),
}),
);
return Ok(result);
}
let fresh_stage = self.stage_fresh_publication_point_after_repo_ready( let fresh_stage = self.stage_fresh_publication_point_after_repo_ready(
ca, ca,
repo_sync_ok, repo_sync_ok,
@ -1094,6 +1413,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
); );
} }
PublicationPointSource::Fresh => {} PublicationPointSource::Fresh => {}
PublicationPointSource::PublicationPointCache => {}
PublicationPointSource::VcirCurrentInstance => {} PublicationPointSource::VcirCurrentInstance => {}
} }
if (total_duration_ms as f64) / 1000.0 if (total_duration_ms as f64) / 1000.0
@ -1141,6 +1461,119 @@ enum CachedIssuerCrl {
Pending(Vec<u8>), Pending(Vec<u8>),
Ok(VerifiedIssuerCrl), Ok(VerifiedIssuerCrl),
} }
struct PublicationPointCacheIdentity {
ca_cert_sha256: [u8; 32],
manifest_sha256: [u8; 32],
ta_context_digest: [u8; 32],
parent_context_digest: [u8; 32],
policy_fingerprint: [u8; 32],
}
fn sha256_digest_32(bytes: impl AsRef<[u8]>) -> [u8; 32] {
let digest = sha2::Sha256::digest(bytes.as_ref());
let mut out = [0u8; 32];
out.copy_from_slice(&digest);
out
}
fn hash_serialized_parts(parts: &[(&str, Vec<u8>)]) -> [u8; 32] {
let mut hasher = sha2::Sha256::new();
for (label, value) in parts {
hasher.update((label.len() as u64).to_be_bytes());
hasher.update(label.as_bytes());
hasher.update((value.len() as u64).to_be_bytes());
hasher.update(value);
}
let digest = hasher.finalize();
let mut out = [0u8; 32];
out.copy_from_slice(&digest);
out
}
fn cbor_or_debug_bytes<T: serde::Serialize + std::fmt::Debug>(value: &T) -> Vec<u8> {
serde_cbor::to_vec(value).unwrap_or_else(|_| format!("{value:?}").into_bytes())
}
fn ta_context_digest_for_ca(ca: &CaInstanceHandle) -> [u8; 32] {
hash_serialized_parts(&[
("version", b"publication-point-cache-ta-v1".to_vec()),
("tal_id", ca.tal_id.as_bytes().to_vec()),
])
}
fn parent_context_digest_for_ca(ca: &CaInstanceHandle) -> [u8; 32] {
hash_serialized_parts(&[
(
"version",
b"publication-point-cache-parent-context-v1".to_vec(),
),
("tal_id", ca.tal_id.as_bytes().to_vec()),
(
"parent_manifest",
ca.parent_manifest_rsync_uri
.as_deref()
.unwrap_or("")
.as_bytes()
.to_vec(),
),
(
"effective_ip",
cbor_or_debug_bytes(&ca.effective_ip_resources),
),
(
"effective_as",
cbor_or_debug_bytes(&ca.effective_as_resources),
),
])
}
fn publication_point_cache_policy_fingerprint(policy: &Policy) -> [u8; 32] {
hash_serialized_parts(&[
("version", b"publication-point-cache-policy-v1".to_vec()),
("policy", cbor_or_debug_bytes(policy)),
])
}
fn publication_point_cache_projection_items_valid(
projection: &PublicationPointCacheProjection,
validation_time: time::OffsetDateTime,
) -> Result<(), &'static str> {
for output in &projection.outputs {
if !pack_time_window_contains(
&output.item_effective_not_before,
&output.item_effective_until,
validation_time,
) {
return Err("output_time_gate_miss");
}
}
for child in &projection.children {
if !pack_time_window_contains(
&child.child_effective_not_before,
&child.child_effective_until,
validation_time,
) {
return Err("child_time_gate_miss");
}
}
Ok(())
}
fn pack_time_window_contains(
not_before: &PackTime,
until: &PackTime,
validation_time: time::OffsetDateTime,
) -> bool {
let Ok(not_before) = parse_snapshot_time_value(not_before) else {
return false;
};
let Ok(until) = parse_snapshot_time_value(until) else {
return false;
};
validation_time >= not_before && validation_time < until
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct VcirReuseProjection { struct VcirReuseProjection {
source: PublicationPointSource, source: PublicationPointSource,
@ -1759,6 +2192,7 @@ fn kind_from_rsync_uri(uri: &str) -> AuditObjectKind {
fn source_label(source: PublicationPointSource) -> String { fn source_label(source: PublicationPointSource) -> String {
match source { match source {
PublicationPointSource::Fresh => "fresh".to_string(), PublicationPointSource::Fresh => "fresh".to_string(),
PublicationPointSource::PublicationPointCache => "publication_point_cache".to_string(),
PublicationPointSource::VcirCurrentInstance => "vcir_current_instance".to_string(), PublicationPointSource::VcirCurrentInstance => "vcir_current_instance".to_string(),
PublicationPointSource::FailedFetchNoCache => "failed_fetch_no_cache".to_string(), PublicationPointSource::FailedFetchNoCache => "failed_fetch_no_cache".to_string(),
} }
@ -1797,6 +2231,7 @@ fn repo_sync_failure_phase_label(
fn terminal_state_label(source: PublicationPointSource) -> &'static str { fn terminal_state_label(source: PublicationPointSource) -> &'static str {
match source { match source {
PublicationPointSource::Fresh => "fresh", PublicationPointSource::Fresh => "fresh",
PublicationPointSource::PublicationPointCache => "publication_point_cache",
PublicationPointSource::VcirCurrentInstance => "fallback_current_instance", PublicationPointSource::VcirCurrentInstance => "fallback_current_instance",
PublicationPointSource::FailedFetchNoCache => "failed_no_cache", PublicationPointSource::FailedFetchNoCache => "failed_no_cache",
} }
@ -2694,8 +3129,128 @@ fn restore_children_from_vcir(
(children, audits) (children, audits)
} }
fn restore_children_from_publication_point_cache(
store: &RocksStore,
ca: &CaInstanceHandle,
projection: &PublicationPointCacheProjection,
validation_time: time::OffsetDateTime,
warnings: &mut Vec<Warning>,
) -> (Vec<DiscoveredChildCaInstance>, Vec<ObjectAuditEntry>) {
let mut children = Vec::new();
let mut audits = Vec::new();
for child in &projection.children {
let effective_not_before =
match parse_snapshot_time_value(&child.child_effective_not_before) {
Ok(value) => value,
Err(e) => {
warnings.push(
Warning::new(format!(
"publication-point cache child has invalid effective notBefore: {e}"
))
.with_context(&child.child_cert_rsync_uri),
);
continue;
}
};
let effective_until = match parse_snapshot_time_value(&child.child_effective_until) {
Ok(value) => value,
Err(e) => {
warnings.push(
Warning::new(format!(
"publication-point cache child has invalid effective until: {e}"
))
.with_context(&child.child_cert_rsync_uri),
);
continue;
}
};
if validation_time < effective_not_before || validation_time > effective_until {
audits.push(ObjectAuditEntry {
rsync_uri: child.child_cert_rsync_uri.clone(),
sha256_hex: child.child_cert_hash.clone(),
kind: AuditObjectKind::Certificate,
result: AuditObjectResult::Skipped,
detail: Some("skipped: publication-point cache child expired".to_string()),
});
continue;
}
match store.get_blob_bytes(&child.child_cert_hash) {
Ok(Some(bytes)) => {
children.push(DiscoveredChildCaInstance {
handle: CaInstanceHandle {
depth: 0,
tal_id: ca.tal_id.clone(),
parent_manifest_rsync_uri: Some(ca.manifest_rsync_uri.clone()),
ca_certificate_der: bytes,
ca_certificate_rsync_uri: Some(child.child_cert_rsync_uri.clone()),
effective_ip_resources: child.child_effective_ip_resources.clone(),
effective_as_resources: child.child_effective_as_resources.clone(),
rsync_base_uri: child.child_rsync_base_uri.clone(),
manifest_rsync_uri: child.child_manifest_rsync_uri.clone(),
publication_point_rsync_uri: child
.child_publication_point_rsync_uri
.clone(),
rrdp_notification_uri: child.child_rrdp_notification_uri.clone(),
},
discovered_from: crate::audit::DiscoveredFrom {
parent_manifest_rsync_uri: ca.manifest_rsync_uri.clone(),
child_ca_certificate_rsync_uri: child.child_cert_rsync_uri.clone(),
child_ca_certificate_sha256_hex: child.child_cert_hash.clone(),
},
});
audits.push(ObjectAuditEntry {
rsync_uri: child.child_cert_rsync_uri.clone(),
sha256_hex: child.child_cert_hash.clone(),
kind: AuditObjectKind::Certificate,
result: AuditObjectResult::Ok,
detail: Some(
"restored child CA instance from publication-point cache".to_string(),
),
});
}
Ok(None) => {
warnings.push(
Warning::new(
"child certificate bytes missing for publication-point cache restoration",
)
.with_context(&child.child_cert_rsync_uri),
);
audits.push(ObjectAuditEntry {
rsync_uri: child.child_cert_rsync_uri.clone(),
sha256_hex: child.child_cert_hash.clone(),
kind: AuditObjectKind::Certificate,
result: AuditObjectResult::Error,
detail: Some(
"child certificate bytes missing for publication-point cache restoration"
.to_string(),
),
});
}
Err(e) => {
warnings.push(
Warning::new(format!(
"child certificate bytes load failed for publication-point cache restoration: {e}"
))
.with_context(&child.child_cert_rsync_uri),
);
audits.push(ObjectAuditEntry {
rsync_uri: child.child_cert_rsync_uri.clone(),
sha256_hex: child.child_cert_hash.clone(),
kind: AuditObjectKind::Certificate,
result: AuditObjectResult::Error,
detail: Some(format!(
"child certificate bytes load failed for publication-point cache restoration: {e}"
)),
});
}
}
}
(children, audits)
}
fn persist_vcir_for_fresh_result_with_timing( fn persist_vcir_for_fresh_result_with_timing(
store: &RocksStore, store: &RocksStore,
policy: &Policy,
ca: &CaInstanceHandle, ca: &CaInstanceHandle,
pack: &PublicationPointSnapshot, pack: &PublicationPointSnapshot,
objects: &mut crate::validation::objects::ObjectsOutput, objects: &mut crate::validation::objects::ObjectsOutput,
@ -2703,6 +3258,7 @@ fn persist_vcir_for_fresh_result_with_timing(
child_audits: &[ObjectAuditEntry], child_audits: &[ObjectAuditEntry],
discovered_children: &[DiscoveredChildCaInstance], discovered_children: &[DiscoveredChildCaInstance],
validation_time: time::OffsetDateTime, validation_time: time::OffsetDateTime,
write_publication_point_cache_projection: bool,
) -> Result<PersistVcirTimingBreakdown, String> { ) -> Result<PersistVcirTimingBreakdown, String> {
let mut timing = PersistVcirTimingBreakdown::default(); let mut timing = PersistVcirTimingBreakdown::default();
@ -2729,8 +3285,18 @@ fn persist_vcir_for_fresh_result_with_timing(
timing.build_vcir = build_vcir_timing; timing.build_vcir = build_vcir_timing;
let replace_vcir_started = std::time::Instant::now(); let replace_vcir_started = std::time::Instant::now();
let publication_point_cache_projection = if write_publication_point_cache_projection {
Some(build_publication_point_cache_projection_from_fresh(
policy, ca, pack, &vcir,
)?)
} else {
None
};
let replace_timing = store let replace_timing = store
.replace_vcir_and_manifest_replay_meta(&vcir) .replace_vcir_manifest_replay_meta_and_publication_point_cache_projection(
&vcir,
publication_point_cache_projection.as_ref(),
)
.map_err(|e| format!("store VCIR and manifest replay meta failed: {e}"))?; .map_err(|e| format!("store VCIR and manifest replay meta failed: {e}"))?;
timing.replace_vcir_ms = replace_vcir_started.elapsed().as_millis() as u64; timing.replace_vcir_ms = replace_vcir_started.elapsed().as_millis() as u64;
timing.replace_vcir = replace_timing; timing.replace_vcir = replace_timing;
@ -2738,6 +3304,27 @@ fn persist_vcir_for_fresh_result_with_timing(
Ok(timing) Ok(timing)
} }
fn build_publication_point_cache_projection_from_fresh(
policy: &Policy,
ca: &CaInstanceHandle,
pack: &PublicationPointSnapshot,
vcir: &ValidatedCaInstanceResult,
) -> Result<PublicationPointCacheProjection, String> {
let ca_cert_sha256 = sha256_digest_32(&ca.ca_certificate_der);
let manifest_sha256 = sha256_digest_32(&pack.manifest_bytes);
PublicationPointCacheProjection::from_vcir_with_context(
vcir,
pack.publication_point_rsync_uri.clone(),
ca.ca_certificate_rsync_uri.clone(),
ca_cert_sha256,
manifest_sha256,
ta_context_digest_for_ca(ca),
parent_context_digest_for_ca(ca),
publication_point_cache_policy_fingerprint(policy),
)
.map_err(|e| e.to_string())
}
fn build_vcir_from_fresh_result_with_timing( fn build_vcir_from_fresh_result_with_timing(
ca: &CaInstanceHandle, ca: &CaInstanceHandle,
pack: &PublicationPointSnapshot, pack: &PublicationPointSnapshot,

View File

@ -4,10 +4,11 @@ use crate::data_model::roa::RoaAfi;
use crate::fetch::rsync::LocalDirRsyncFetcher; use crate::fetch::rsync::LocalDirRsyncFetcher;
use crate::fetch::rsync::{RsyncFetchError, RsyncFetcher}; use crate::fetch::rsync::{RsyncFetchError, RsyncFetcher};
use crate::storage::{ use crate::storage::{
PackFile, PackTime, RawByHashEntry, RocksStore, ValidatedCaInstanceResult, PackFile, PackTime, PublicationPointCacheProjection, RawByHashEntry, RepositoryViewEntry,
ValidatedManifestMeta, VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus, RepositoryViewState, RocksStore, ValidatedCaInstanceResult, ValidatedManifestMeta,
VcirAuditSummary, VcirChildEntry, VcirInstanceGate, VcirLocalOutput, VcirLocalOutputPayload, VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus, VcirAuditSummary,
VcirOutputType, VcirRelatedArtifact, VcirSourceObjectType, VcirSummary, VcirChildEntry, VcirInstanceGate, VcirLocalOutput, VcirLocalOutputPayload, VcirOutputType,
VcirRelatedArtifact, VcirSourceObjectType, VcirSummary,
}; };
use crate::sync::rrdp::Fetcher; use crate::sync::rrdp::Fetcher;
use crate::validation::publication_point::PublicationPointSnapshot; use crate::validation::publication_point::PublicationPointSnapshot;
@ -69,6 +70,8 @@ fn sample_runner_with_ccr_accumulator<'a>(
ccr_accumulator: Some(Mutex::new(CcrAccumulator::new(Vec::new()))), ccr_accumulator: Some(Mutex::new(CcrAccumulator::new(Vec::new()))),
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
} }
} }
@ -960,6 +963,8 @@ fn finalize_fresh_publication_point_releases_local_outputs_cache_after_persist()
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let ca = CaInstanceHandle { let ca = CaInstanceHandle {
depth: 0, depth: 0,
@ -1062,6 +1067,7 @@ fn persist_vcir_for_fresh_result_stores_vcir_and_replay_meta_for_real_snapshot()
let mut objects = objects; let mut objects = objects;
persist_vcir_for_fresh_result_with_timing( persist_vcir_for_fresh_result_with_timing(
&store, &store,
&Policy::default(),
&ca, &ca,
&pack, &pack,
&mut objects, &mut objects,
@ -1069,6 +1075,7 @@ fn persist_vcir_for_fresh_result_stores_vcir_and_replay_meta_for_real_snapshot()
&[], &[],
&[], &[],
validation_time, validation_time,
false,
) )
.map(|_timing| ()) .map(|_timing| ())
.expect("persist vcir for fresh result"); .expect("persist vcir for fresh result");
@ -1522,6 +1529,8 @@ fn runner_offline_rsync_fixture_produces_pack_and_warnings() {
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
// For this fixture-driven smoke, we provide the correct issuer CA certificate (the CA for // For this fixture-driven smoke, we provide the correct issuer CA certificate (the CA for
@ -1653,6 +1662,8 @@ fn runner_roa_validation_cache_reuses_vcir_outputs_on_second_fixture_run() {
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let first = first_runner let first = first_runner
.run_publication_point(&handle) .run_publication_point(&handle)
@ -1681,6 +1692,8 @@ fn runner_roa_validation_cache_reuses_vcir_outputs_on_second_fixture_run() {
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: true, enable_roa_validation_cache: true,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let second = second_runner let second = second_runner
.run_publication_point(&handle) .run_publication_point(&handle)
@ -1702,6 +1715,456 @@ fn runner_roa_validation_cache_reuses_vcir_outputs_on_second_fixture_run() {
assert_eq!(second.objects.roa_cache_stats.fresh_roas, 0); assert_eq!(second.objects.roa_cache_stats.fresh_roas, 0);
} }
#[test]
fn runner_publication_point_cache_observe_and_reuse_path() {
let fixture_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests/fixtures/repository/rpki.cernet.net/repo/cernet/0");
assert!(fixture_dir.is_dir(), "fixture directory must exist");
let rsync_base_uri = "rsync://rpki.cernet.net/repo/cernet/0/".to_string();
let manifest_file = "05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft";
let manifest_rsync_uri = format!("{rsync_base_uri}{manifest_file}");
let fixture_manifest_bytes =
std::fs::read(fixture_dir.join(manifest_file)).expect("read manifest fixture");
let fixture_manifest =
crate::data_model::manifest::ManifestObject::decode_der(&fixture_manifest_bytes)
.expect("decode manifest fixture");
let validation_time = fixture_manifest.manifest.this_update + time::Duration::seconds(60);
let store_dir = tempfile::tempdir().expect("store dir");
let store = RocksStore::open(store_dir.path()).expect("open rocksdb");
let policy = Policy {
sync_preference: crate::policy::SyncPreference::RsyncOnly,
..Policy::default()
};
let issuer_ca_der = std::fs::read(
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(
"tests/fixtures/repository/rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer",
),
)
.expect("read issuer ca fixture");
let issuer_ca = ResourceCertificate::decode_der(&issuer_ca_der).expect("decode issuer ca");
let handle = CaInstanceHandle {
depth: 0,
tal_id: "test-tal".to_string(),
parent_manifest_rsync_uri: None,
ca_certificate_der: issuer_ca_der,
ca_certificate_rsync_uri: Some("rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer".to_string()),
effective_ip_resources: issuer_ca.tbs.extensions.ip_resources.clone(),
effective_as_resources: issuer_ca.tbs.extensions.as_resources.clone(),
rsync_base_uri: rsync_base_uri.clone(),
manifest_rsync_uri: manifest_rsync_uri.clone(),
publication_point_rsync_uri: rsync_base_uri,
rrdp_notification_uri: None,
};
let first_runner = Rpkiv1PublicationPointRunner {
store: &store,
policy: &policy,
http_fetcher: &NeverHttpFetcher,
rsync_fetcher: &LocalDirRsyncFetcher::new(&fixture_dir),
validation_time,
timing: None,
download_log: None,
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None,
repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
publication_point_cache_observe_only: true,
enable_publication_point_validation_cache: false,
};
let first = first_runner
.run_publication_point(&handle)
.expect("first fresh run");
assert_eq!(first.source, PublicationPointSource::Fresh);
assert!(
store
.get_publication_point_cache_projection(&manifest_rsync_uri)
.expect("load publication-point projection")
.is_some()
);
let timing = crate::analysis::timing::TimingHandle::new(crate::analysis::timing::TimingMeta {
recorded_at_utc_rfc3339: "2026-01-01T00:00:00Z".to_string(),
validation_time_utc_rfc3339: "2026-01-01T00:00:00Z".to_string(),
tal_url: None,
db_path: None,
});
let observe_runner = Rpkiv1PublicationPointRunner {
store: &store,
policy: &policy,
http_fetcher: &NeverHttpFetcher,
rsync_fetcher: &LocalDirRsyncFetcher::new(&fixture_dir),
validation_time,
timing: Some(timing.clone()),
download_log: None,
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None,
repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
publication_point_cache_observe_only: true,
enable_publication_point_validation_cache: false,
};
let observed = observe_runner
.run_publication_point(&handle)
.expect("observe-only run");
assert_eq!(observed.source, PublicationPointSource::Fresh);
assert_eq!(observed.objects.vrps, first.objects.vrps);
assert_eq!(
timing
.counts_snapshot()
.get("publication_point_cache_theoretical_hits")
.copied(),
Some(1)
);
let cache_runner = Rpkiv1PublicationPointRunner {
store: &store,
policy: &policy,
http_fetcher: &NeverHttpFetcher,
rsync_fetcher: &LocalDirRsyncFetcher::new(&fixture_dir),
validation_time,
timing: None,
download_log: None,
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None,
repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: true,
};
let cached = cache_runner
.run_publication_point(&handle)
.expect("publication-point cache run");
assert_eq!(cached.source, PublicationPointSource::PublicationPointCache);
assert_eq!(cached.objects.vrps, first.objects.vrps);
assert_eq!(cached.objects.aspas, first.objects.aspas);
assert_eq!(
cached.discovered_children.len(),
first.discovered_children.len()
);
assert!(!cached.cir_cached_objects.is_empty());
}
fn seed_publication_point_cache_projection(
store: &RocksStore,
policy: &Policy,
ca: &CaInstanceHandle,
validation_time: time::OffsetDateTime,
) -> ValidatedCaInstanceResult {
let child_bytes = b"child-cert".to_vec();
let child_hash = sha256_hex(&child_bytes);
let vcir = sample_vcir_for_projection(validation_time, &child_hash);
store
.put_blob_bytes_batch(&[
(child_hash, child_bytes),
(sha256_hex(b"manifest-bytes"), b"manifest-bytes".to_vec()),
])
.expect("put cache bytes");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: ca.manifest_rsync_uri.clone(),
current_hash: Some(sha256_hex(b"manifest-bytes")),
repository_source: Some(ca.publication_point_rsync_uri.clone()),
object_type: Some("mft".to_string()),
state: RepositoryViewState::Present,
})
.expect("put manifest current view");
let projection = PublicationPointCacheProjection::from_vcir_with_context(
&vcir,
ca.publication_point_rsync_uri.clone(),
ca.ca_certificate_rsync_uri.clone(),
sha256_32(&ca.ca_certificate_der),
sha256_32(b"manifest-bytes"),
ta_context_digest_for_ca(ca),
parent_context_digest_for_ca(ca),
publication_point_cache_policy_fingerprint(policy),
)
.expect("build publication point projection");
store
.put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection))
.expect("put publication point projection");
vcir
}
fn publication_point_cache_fixture_ca() -> CaInstanceHandle {
CaInstanceHandle {
depth: 1,
tal_id: "test-tal".to_string(),
parent_manifest_rsync_uri: None,
ca_certificate_der: b"ca-cert".to_vec(),
ca_certificate_rsync_uri: Some("rsync://example.test/repo/issuer/issuer.cer".to_string()),
effective_ip_resources: None,
effective_as_resources: None,
rsync_base_uri: "rsync://example.test/repo/issuer/".to_string(),
manifest_rsync_uri: "rsync://example.test/repo/issuer/issuer.mft".to_string(),
publication_point_rsync_uri: "rsync://example.test/repo/issuer/".to_string(),
rrdp_notification_uri: Some("https://example.test/notification.xml".to_string()),
}
}
#[test]
fn runner_publication_point_cache_reuses_projection_outputs_children_and_ccr() {
let store_dir = tempfile::tempdir().expect("store dir");
let store = RocksStore::open(store_dir.path()).expect("open rocksdb");
let policy = Policy::default();
let validation_time = time::OffsetDateTime::UNIX_EPOCH + time::Duration::minutes(1);
let ca = publication_point_cache_fixture_ca();
let vcir = seed_publication_point_cache_projection(&store, &policy, &ca, validation_time);
let timing = crate::analysis::timing::TimingHandle::new(crate::analysis::timing::TimingMeta {
recorded_at_utc_rfc3339: "2026-01-01T00:00:00Z".to_string(),
validation_time_utc_rfc3339: "2026-01-01T00:00:00Z".to_string(),
tal_url: None,
db_path: None,
});
let runner = Rpkiv1PublicationPointRunner {
store: &store,
policy: &policy,
http_fetcher: &NeverHttpFetcher,
rsync_fetcher: &FailingRsyncFetcher,
validation_time,
timing: Some(timing.clone()),
download_log: None,
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None,
repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: Some(Mutex::new(CcrAccumulator::new(Vec::new()))),
persist_vcir: true,
enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: true,
};
let result = runner
.observe_or_reuse_publication_point_cache(&ca, Some("rrdp"), Some("delta"), 7, None, &[])
.expect("cache result");
assert_eq!(result.source, PublicationPointSource::PublicationPointCache);
assert_eq!(result.objects.vrps.len(), 1);
assert_eq!(result.objects.aspas.len(), 1);
assert_eq!(result.objects.router_keys.len(), 1);
assert_eq!(result.discovered_children.len(), 1);
assert_eq!(
result.discovered_children[0].handle.manifest_rsync_uri,
vcir.child_entries[0].child_manifest_rsync_uri
);
assert!(result.cir_fresh_objects.is_empty());
assert!(!result.cir_cached_objects.is_empty());
assert_eq!(
runner
.ccr_accumulator_snapshot()
.expect("ccr accumulator")
.manifest_count(),
1
);
let counts = timing.counts_snapshot();
assert_eq!(
counts.get("publication_point_cache_reuse_hits").copied(),
Some(1)
);
assert_eq!(
counts
.get("publication_point_cache_outputs_reused")
.copied(),
Some(3)
);
assert_eq!(
counts
.get("publication_point_cache_children_reused")
.copied(),
Some(1)
);
}
#[test]
fn runner_publication_point_cache_blocks_parent_policy_and_output_time_mismatch() {
let store_dir = tempfile::tempdir().expect("store dir");
let store = RocksStore::open(store_dir.path()).expect("open rocksdb");
let policy = Policy::default();
let validation_time = time::OffsetDateTime::UNIX_EPOCH + time::Duration::minutes(1);
let ca = publication_point_cache_fixture_ca();
seed_publication_point_cache_projection(&store, &policy, &ca, validation_time);
let timing = crate::analysis::timing::TimingHandle::new(crate::analysis::timing::TimingMeta {
recorded_at_utc_rfc3339: "2026-01-01T00:00:00Z".to_string(),
validation_time_utc_rfc3339: "2026-01-01T00:00:00Z".to_string(),
tal_url: None,
db_path: None,
});
let mut parent_changed_ca = ca.clone();
parent_changed_ca.parent_manifest_rsync_uri =
Some("rsync://example.test/repo/other-parent.mft".to_string());
let parent_changed_runner = Rpkiv1PublicationPointRunner {
store: &store,
policy: &policy,
http_fetcher: &NeverHttpFetcher,
rsync_fetcher: &FailingRsyncFetcher,
validation_time,
timing: Some(timing.clone()),
download_log: None,
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None,
repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: true,
};
assert!(
parent_changed_runner
.observe_or_reuse_publication_point_cache(
&parent_changed_ca,
Some("rrdp"),
Some("delta"),
7,
None,
&[]
)
.is_none()
);
assert_eq!(
timing
.counts_snapshot()
.get("publication_point_cache_miss_parent_context_mismatch")
.copied(),
Some(1)
);
let strict_policy = Policy {
strict: crate::policy::StrictPolicy::all(),
..Policy::default()
};
let policy_changed_runner = Rpkiv1PublicationPointRunner {
store: &store,
policy: &strict_policy,
http_fetcher: &NeverHttpFetcher,
rsync_fetcher: &FailingRsyncFetcher,
validation_time,
timing: Some(timing.clone()),
download_log: None,
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None,
repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: true,
};
assert!(
policy_changed_runner
.observe_or_reuse_publication_point_cache(
&ca,
Some("rrdp"),
Some("delta"),
7,
None,
&[]
)
.is_none()
);
assert_eq!(
timing
.counts_snapshot()
.get("publication_point_cache_miss_policy_mismatch")
.copied(),
Some(1)
);
let output_expired_runner = Rpkiv1PublicationPointRunner {
store: &store,
policy: &policy,
http_fetcher: &NeverHttpFetcher,
rsync_fetcher: &FailingRsyncFetcher,
validation_time: validation_time + time::Duration::minutes(40),
timing: Some(timing.clone()),
download_log: None,
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None,
repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
ccr_accumulator: None,
persist_vcir: true,
enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: true,
};
assert!(
output_expired_runner
.observe_or_reuse_publication_point_cache(
&ca,
Some("rrdp"),
Some("delta"),
7,
None,
&[]
)
.is_none()
);
assert_eq!(
timing
.counts_snapshot()
.get("publication_point_cache_miss_output_time_gate")
.copied(),
Some(1)
);
}
#[test] #[test]
fn runner_rsync_dedup_skips_second_sync_for_same_base() { fn runner_rsync_dedup_skips_second_sync_for_same_base() {
let fixture_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) let fixture_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
@ -1789,6 +2252,8 @@ fn runner_rsync_dedup_skips_second_sync_for_same_base() {
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let first = runner.run_publication_point(&handle).expect("first run ok"); let first = runner.run_publication_point(&handle).expect("first run ok");
@ -1903,6 +2368,8 @@ fn runner_rsync_dedup_skips_second_sync_for_same_module_scope() {
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let first = runner.run_publication_point(&handle).expect("first run ok"); let first = runner.run_publication_point(&handle).expect("first run ok");
@ -2020,6 +2487,8 @@ fn runner_rsync_dedup_works_in_rsync_only_mode_even_when_rrdp_notify_exists() {
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let first = runner.run_publication_point(&handle).expect("first run ok"); let first = runner.run_publication_point(&handle).expect("first run ok");
@ -2108,6 +2577,8 @@ fn runner_when_repo_sync_fails_uses_current_instance_vcir_and_keeps_children_emp
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let first = ok_runner let first = ok_runner
.run_publication_point(&handle) .run_publication_point(&handle)
@ -2140,6 +2611,8 @@ fn runner_when_repo_sync_fails_uses_current_instance_vcir_and_keeps_children_emp
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let second = bad_runner let second = bad_runner
.run_publication_point(&handle) .run_publication_point(&handle)
@ -3089,6 +3562,8 @@ fn runner_roa_validation_cache_uses_projection_not_full_vcir_fallback() {
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: true, enable_roa_validation_cache: true,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
assert!( assert!(
@ -3821,6 +4296,8 @@ fn runner_dedup_paths_execute_with_timing_enabled() {
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let first = runner_rrdp let first = runner_rrdp
.run_publication_point(&handle) .run_publication_point(&handle)
@ -3856,6 +4333,8 @@ fn runner_dedup_paths_execute_with_timing_enabled() {
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let third = runner_rsync let third = runner_rsync
.run_publication_point(&handle) .run_publication_point(&handle)

View File

@ -66,6 +66,9 @@ impl LiveStats {
rpki::validation::manifest::PublicationPointSource::VcirCurrentInstance => { rpki::validation::manifest::PublicationPointSource::VcirCurrentInstance => {
self.publication_points_cached += 1 self.publication_points_cached += 1
} }
rpki::validation::manifest::PublicationPointSource::PublicationPointCache => {
self.publication_points_cached += 1
}
rpki::validation::manifest::PublicationPointSource::FailedFetchNoCache => {} rpki::validation::manifest::PublicationPointSource::FailedFetchNoCache => {}
} }
@ -187,6 +190,8 @@ fn apnic_tree_full_stats_serial() {
ccr_accumulator: None, ccr_accumulator: None,
persist_vcir: true, persist_vcir: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
}; };
let stats = RefCell::new(LiveStats::default()); let stats = RefCell::new(LiveStats::default());
@ -219,6 +224,8 @@ fn apnic_tree_full_stats_serial() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )

View File

@ -40,6 +40,8 @@ fn apnic_tree_depth1_processes_more_than_root() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -83,6 +85,8 @@ fn apnic_tree_root_only_processes_root_with_long_timeouts() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )

View File

@ -114,6 +114,8 @@ fn crl_mismatch_drops_publication_point_and_cites_rfc_sections() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )

View File

@ -118,6 +118,8 @@ fn run_tree_from_tal_url_entry_executes_and_records_failure_when_repo_empty() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -167,6 +169,8 @@ fn run_tree_from_tal_and_ta_der_entry_executes_and_records_failure_when_repo_emp
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -224,6 +228,8 @@ fn run_tree_from_tal_url_audit_entry_collects_no_publication_points_when_repo_em
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -269,6 +275,8 @@ fn run_tree_from_tal_and_ta_der_audit_entry_collects_no_publication_points_when_
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -322,6 +330,8 @@ fn run_tree_from_tal_url_audit_with_timing_records_phases_when_repo_empty() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
&timing, &timing,
@ -374,6 +384,8 @@ fn run_tree_from_tal_and_ta_der_audit_with_timing_records_phases_when_repo_empty
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
&timing, &timing,

View File

@ -308,6 +308,8 @@ fn tree_respects_max_depth_and_max_instances() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )
@ -325,6 +327,8 @@ fn tree_respects_max_depth_and_max_instances() {
persist_vcir: true, persist_vcir: true,
build_ccr_accumulator: true, build_ccr_accumulator: true,
enable_roa_validation_cache: false, enable_roa_validation_cache: false,
publication_point_cache_observe_only: false,
enable_publication_point_validation_cache: false,
enable_transport_request_prefetch: false, enable_transport_request_prefetch: false,
}, },
) )