全量同步测试增加download过程审计输出

This commit is contained in:
yuyr 2026-03-06 11:52:59 +08:00
parent 6276d13814
commit afc50364f8
18 changed files with 669 additions and 45 deletions

View File

@ -21,8 +21,8 @@ They are meant for **hands-on validation / acceptance runs**, not for CI.
- Writes:
- run log
- audit report JSON
- run meta JSON (includes durations)
- short summary Markdown (includes durations)
- run meta JSON (includes durations + download_stats)
- short summary Markdown (includes durations + download_stats)
- RocksDB key statistics (`db_stats --exact`)
- RRDP repo state dump (`rrdp_state_dump`)
@ -35,6 +35,24 @@ They are meant for **hands-on validation / acceptance runs**, not for CI.
- base vs delta report JSON
- base vs delta `rrdp_state_dump` TSV
- and includes a **duration comparison** (base vs delta) if the base meta JSON is available
- the delta meta JSON includes `download_stats` copied from the delta report JSON
## Audit report fields (report.json)
The `rpki` binary writes an audit report JSON with:
- `format_version: 2`
- `downloads`: per-download RRDP/rsync events (URI, timestamps, duration, ok/fail, error, bytes, objects stats)
- `download_stats`: aggregate counters (by kind)
These are useful for diagnosing why a run is slow (e.g. RRDP snapshot vs delta vs rsync fallback).
## Meta fields (meta.json)
The scripts generate `*_meta.json` next to `*_report.json` and include:
- `durations_secs`: wall-clock duration breakdown for the script steps
- `download_stats`: copied from `report_json.download_stats`
## Usage

View File

@ -266,6 +266,7 @@ base_state = parse_rrdp_state_tsv(base_state_path)
delta_state = parse_rrdp_state_tsv(delta_state_path)
base_meta = load_optional_json(base_meta_path_s)
download_stats = delta.get("download_stats") or {}
delta_meta = {
"recorded_at_utc": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"tal_url": os.environ["TAL_URL"],
@ -283,6 +284,7 @@ delta_meta = {
"rrdp_state_dump": int(os.environ["STATE_DURATION_S"]),
"total_script": int(os.environ["TOTAL_DURATION_S"]),
},
"download_stats": download_stats,
}
delta_meta_path.write_text(json.dumps(delta_meta, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")

View File

@ -106,6 +106,7 @@ summary_path = Path(sys.argv[3])
rep = json.loads(report_path.read_text(encoding="utf-8"))
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
download_stats = rep.get("download_stats") or {}
meta = {
"recorded_at_utc": now,
"tal_url": os.environ["TAL_URL"],
@ -129,6 +130,7 @@ meta = {
"aspas": len(rep["aspas"]),
"audit_publication_points": len(rep["publication_points"]),
},
"download_stats": download_stats,
}
meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
@ -153,6 +155,25 @@ for k,v in meta["durations_secs"].items():
lines.append(f"| {k} | {v} |\\n")
lines.append("\\n")
lines.append("## Download Stats\\n\\n")
lines.append("- raw events: `report_json.downloads`\\n")
lines.append("- aggregated: `report_json.download_stats` (copied into meta.json)\\n\\n")
def fmt_u(v):
    """Format an optional numeric stat for a Markdown table cell.

    Returns the empty string for missing (None) values so the cell stays
    blank instead of rendering the literal string "None".
    """
    return "" if v is None else str(v)
by_kind = download_stats.get("by_kind") or {}
lines.append("| kind | ok | fail | duration_ms_total | bytes_total | objects_count_total | objects_bytes_total |\\n")
lines.append("|---|---:|---:|---:|---:|---:|---:|\\n")
for kind in sorted(by_kind.keys()):
st = by_kind[kind] or {}
lines.append(
f"| {kind} | {st.get('ok_total',0)} | {st.get('fail_total',0)} | {st.get('duration_ms_total',0)} | {fmt_u(st.get('bytes_total'))} | {fmt_u(st.get('objects_count_total'))} | {fmt_u(st.get('objects_bytes_total'))} |\\n"
)
lines.append("\\n")
summary_path.write_text("".join(lines), encoding="utf-8")
print(summary_path)
PY

View File

@ -98,6 +98,57 @@ pub struct AuditRunMeta {
pub validation_time_rfc3339_utc: String,
}
/// Classification of a single download performed during an audit run.
///
/// Serialized with `snake_case` names (e.g. `"rrdp_snapshot"`) in the
/// report JSON.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditDownloadKind {
    /// Fetch of an RRDP notification document.
    RrdpNotification,
    /// Fetch of an RRDP snapshot.
    RrdpSnapshot,
    /// Fetch of an RRDP delta.
    RrdpDelta,
    /// rsync repository fetch.
    Rsync,
}
/// Object-level statistics for a download that returned multiple objects
/// (populated for rsync fetches, which return a set of repository objects).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct AuditDownloadObjectsStat {
    /// Number of objects returned by the fetch.
    pub objects_count: u64,
    /// Summed byte size of all returned objects.
    pub objects_bytes_total: u64,
}
/// One completed (or failed) download attempt, as recorded in the audit
/// report's `downloads` array.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct AuditDownloadEvent {
    /// What was downloaded (notification / snapshot / delta / rsync).
    pub kind: AuditDownloadKind,
    /// The URI that was fetched.
    pub uri: String,
    /// Fetch start time, RFC 3339, normalized to UTC.
    pub started_at_rfc3339_utc: String,
    /// Fetch end time, RFC 3339, normalized to UTC.
    pub finished_at_rfc3339_utc: String,
    /// Wall-clock duration of the fetch in milliseconds.
    pub duration_ms: u64,
    /// Whether the fetch succeeded.
    pub success: bool,
    /// Error message; only present for failed fetches.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Payload size in bytes, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub bytes: Option<u64>,
    /// Per-object statistics, when the fetch returned multiple objects
    /// (e.g. rsync).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub objects: Option<AuditDownloadObjectsStat>,
}
/// Aggregated counters for all download events of one kind.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct AuditDownloadKindStats {
    /// Number of successful downloads of this kind.
    pub ok_total: u64,
    /// Number of failed downloads of this kind.
    pub fail_total: u64,
    /// Sum of download durations in milliseconds (saturating).
    pub duration_ms_total: u64,
    /// Sum of payload bytes; `None` when no event of this kind reported bytes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub bytes_total: Option<u64>,
    /// Sum of object counts; `None` when no event carried object stats.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub objects_count_total: Option<u64>,
    /// Sum of object byte sizes; `None` when no event carried object stats.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub objects_bytes_total: Option<u64>,
}
/// Aggregate download statistics for a whole audit run (the report's
/// `download_stats` field).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct AuditDownloadStats {
    /// Total number of recorded download events, across all kinds.
    pub events_total: u64,
    /// Statistics keyed by serialized `AuditDownloadKind` string (e.g. "rrdp_snapshot").
    pub by_kind: std::collections::BTreeMap<String, AuditDownloadKindStats>,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct AuditReportV1 {
pub format_version: u32,
@ -110,6 +161,21 @@ pub struct AuditReportV1 {
pub aspas: Vec<AspaOutput>,
}
/// Audit report JSON, `format_version: 2`.
///
/// Extends the V1 layout with download auditing: raw per-download events
/// (`downloads`) and aggregated counters (`download_stats`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct AuditReportV2 {
    /// Always 2 for this struct.
    pub format_version: u32,
    pub meta: AuditRunMeta,
    pub policy: Policy,
    pub tree: TreeSummary,
    pub publication_points: Vec<PublicationPointAudit>,
    pub vrps: Vec<VrpOutput>,
    pub aspas: Vec<AspaOutput>,
    /// Raw per-download RRDP/rsync events, in recording order.
    pub downloads: Vec<AuditDownloadEvent>,
    /// Aggregated per-kind counters derived from `downloads`.
    pub download_stats: AuditDownloadStats,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct VrpOutput {
pub asn: u32,

168
src/audit_downloads.rs Normal file
View File

@ -0,0 +1,168 @@
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crate::audit::{
AuditDownloadEvent, AuditDownloadKind, AuditDownloadKindStats, AuditDownloadObjectsStat,
AuditDownloadStats,
};
/// Cheaply clonable, thread-safe collector of download audit events.
///
/// Clones share the same underlying event list via `Arc<Mutex<_>>`, so a
/// handle can be passed to multiple sync paths and aggregated afterwards.
#[derive(Clone, Debug, Default)]
pub struct DownloadLogHandle {
    // Shared event list; the mutex lets any clone record concurrently.
    inner: Arc<Mutex<Vec<AuditDownloadEvent>>>,
}
impl DownloadLogHandle {
    /// Create an empty, shareable download log.
    pub fn new() -> Self {
        Self::default()
    }

    /// Append one finished download event to the shared log.
    pub fn record_event(&self, event: AuditDownloadEvent) {
        self.inner.lock().expect("download log lock").push(event);
    }

    /// Return a point-in-time copy of all recorded events.
    pub fn snapshot_events(&self) -> Vec<AuditDownloadEvent> {
        self.inner.lock().expect("download log lock").clone()
    }

    /// Aggregate per-kind counters from a slice of events.
    ///
    /// Map keys mirror the serde `snake_case` serialization of
    /// `AuditDownloadKind` (e.g. "rrdp_snapshot") so JSON consumers see
    /// consistent identifiers. All sums saturate instead of wrapping.
    pub fn stats_from_events(events: &[AuditDownloadEvent]) -> AuditDownloadStats {
        let mut out = AuditDownloadStats {
            events_total: events.len() as u64,
            by_kind: BTreeMap::new(),
        };
        for e in events {
            let kind_key = match e.kind {
                AuditDownloadKind::RrdpNotification => "rrdp_notification",
                AuditDownloadKind::RrdpSnapshot => "rrdp_snapshot",
                AuditDownloadKind::RrdpDelta => "rrdp_delta",
                AuditDownloadKind::Rsync => "rsync",
            };
            // `AuditDownloadKindStats` derives `Default` (zero counters,
            // `None` optionals), so `or_default()` replaces the manual
            // zero-valued struct literal.
            let st = out.by_kind.entry(kind_key.to_string()).or_default();
            if e.success {
                st.ok_total = st.ok_total.saturating_add(1);
            } else {
                st.fail_total = st.fail_total.saturating_add(1);
            }
            st.duration_ms_total = st.duration_ms_total.saturating_add(e.duration_ms);
            // Optional totals stay `None` until at least one event of this
            // kind carries the corresponding value.
            if let Some(b) = e.bytes {
                st.bytes_total = Some(st.bytes_total.unwrap_or(0).saturating_add(b));
            }
            if let Some(objects) = &e.objects {
                st.objects_count_total = Some(
                    st.objects_count_total
                        .unwrap_or(0)
                        .saturating_add(objects.objects_count),
                );
                st.objects_bytes_total = Some(
                    st.objects_bytes_total
                        .unwrap_or(0)
                        .saturating_add(objects.objects_bytes_total),
                );
            }
        }
        out
    }

    /// Aggregate counters for everything recorded so far.
    pub fn stats(&self) -> AuditDownloadStats {
        Self::stats_from_events(&self.snapshot_events())
    }

    /// Start timing one download.
    ///
    /// The returned guard records an `AuditDownloadEvent` into this log when
    /// it is dropped; callers set the outcome via the guard's `set_*` methods.
    pub fn span_download<'a>(
        &'a self,
        kind: AuditDownloadKind,
        uri: &'a str,
    ) -> DownloadSpanGuard<'a> {
        DownloadSpanGuard {
            handle: self,
            kind,
            uri,
            start_instant: Instant::now(),
            started_at: time::OffsetDateTime::now_utc(),
            bytes: None,
            objects: None,
            error: None,
            success: None,
        }
    }
}
/// RAII-style recorder for one in-flight download.
///
/// Created by `DownloadLogHandle::span_download`; the event is pushed to the
/// log when the guard is dropped. Callers report outcome details via the
/// `set_*` methods before the guard goes out of scope; a guard dropped
/// without an explicit outcome is recorded as a failure.
pub struct DownloadSpanGuard<'a> {
    // Log that receives the event on drop.
    handle: &'a DownloadLogHandle,
    // Kind of download being timed.
    kind: AuditDownloadKind,
    // URI being fetched.
    uri: &'a str,
    // Monotonic start time used for the duration measurement.
    start_instant: Instant,
    // Wall-clock start time, recorded for the report timestamp.
    started_at: time::OffsetDateTime,
    // Payload size, if reported via `set_bytes`.
    bytes: Option<u64>,
    // Object statistics, if reported via `set_objects`.
    objects: Option<AuditDownloadObjectsStat>,
    // Error message, if reported via `set_err`.
    error: Option<String>,
    // `Some(true)`/`Some(false)` when set explicitly; `None` (treated as
    // failure on drop) when never set.
    success: Option<bool>,
}
impl DownloadSpanGuard<'_> {
    /// Record the number of payload bytes transferred.
    pub fn set_bytes(&mut self, bytes: u64) {
        self.bytes = Some(bytes);
    }

    /// Record object-level totals: how many objects and their summed size.
    pub fn set_objects(&mut self, objects_count: u64, objects_bytes_total: u64) {
        let stat = AuditDownloadObjectsStat {
            objects_count,
            objects_bytes_total,
        };
        self.objects = Some(stat);
    }

    /// Mark this download as successful.
    pub fn set_ok(&mut self) {
        self.success = Some(true);
    }

    /// Mark this download as failed, keeping `msg` as the error text.
    pub fn set_err(&mut self, msg: impl Into<String>) {
        self.error = Some(msg.into());
        self.success = Some(false);
    }
}
impl Drop for DownloadSpanGuard<'_> {
    /// Finalize the span and record the event into the owning log.
    ///
    /// A guard dropped without `set_ok()`/`set_err()` is recorded as a
    /// failure (`success: false`) so aborted downloads are never silently
    /// counted as successful.
    fn drop(&mut self) {
        use time::format_description::well_known::Rfc3339;

        // Shared RFC 3339 formatter: normalizes to UTC and substitutes a
        // sentinel instead of panicking on a formatting error.
        let fmt_utc = |t: time::OffsetDateTime| {
            t.to_offset(time::UtcOffset::UTC)
                .format(&Rfc3339)
                .unwrap_or_else(|_| "<format-error>".to_string())
        };

        let finished_at = time::OffsetDateTime::now_utc();
        let duration_ms = duration_to_ms(self.start_instant.elapsed());
        let success = self.success.unwrap_or(false);
        let event = AuditDownloadEvent {
            kind: self.kind.clone(),
            uri: self.uri.to_string(),
            started_at_rfc3339_utc: fmt_utc(self.started_at),
            finished_at_rfc3339_utc: fmt_utc(finished_at),
            duration_ms,
            success,
            // Errors are only meaningful for failed downloads. The guard is
            // being destroyed, so `take()` avoids a needless clone.
            error: if success { None } else { self.error.take() },
            bytes: self.bytes,
            objects: self.objects.take(),
        };
        self.handle.record_event(event);
    }
}
/// Convert a `Duration` to whole milliseconds, saturating at `u64::MAX`.
///
/// `Duration::as_millis` returns `u128`; `try_from` + `unwrap_or` expresses
/// the clamp more directly than `min` followed by an `as` cast.
fn duration_to_ms(d: Duration) -> u64 {
    u64::try_from(d.as_millis()).unwrap_or(u64::MAX)
}

View File

@ -2,7 +2,7 @@ use std::path::{Path, PathBuf};
use crate::analysis::timing::{TimingHandle, TimingMeta, TimingMetaUpdate};
use crate::audit::{
AspaOutput, AuditReportV1, AuditRunMeta, AuditWarning, TreeSummary, VrpOutput,
AspaOutput, AuditReportV2, AuditRunMeta, AuditWarning, TreeSummary, VrpOutput,
format_roa_ip_prefix,
};
use crate::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
@ -239,7 +239,7 @@ fn read_policy(path: Option<&Path>) -> Result<Policy, String> {
}
}
fn write_json(path: &Path, report: &AuditReportV1) -> Result<(), String> {
fn write_json(path: &Path, report: &AuditReportV2) -> Result<(), String> {
let f = std::fs::File::create(path)
.map_err(|e| format!("create report file failed: {}: {e}", path.display()))?;
serde_json::to_writer_pretty(f, report)
@ -247,7 +247,7 @@ fn write_json(path: &Path, report: &AuditReportV1) -> Result<(), String> {
Ok(())
}
fn unique_rrdp_repos(report: &AuditReportV1) -> usize {
fn unique_rrdp_repos(report: &AuditReportV2) -> usize {
use std::collections::HashSet;
let mut set: HashSet<&str> = HashSet::new();
for pp in &report.publication_points {
@ -258,7 +258,7 @@ fn unique_rrdp_repos(report: &AuditReportV1) -> usize {
set.len()
}
fn print_summary(report: &AuditReportV1) {
fn print_summary(report: &AuditReportV2) {
let rrdp_repos = unique_rrdp_repos(report);
println!("RPKI stage2 serial run summary");
println!(
@ -291,7 +291,7 @@ fn build_report(
policy: &Policy,
validation_time: time::OffsetDateTime,
out: RunTreeFromTalAuditOutput,
) -> AuditReportV1 {
) -> AuditReportV2 {
use time::format_description::well_known::Rfc3339;
let validation_time_rfc3339_utc = validation_time
.to_offset(time::UtcOffset::UTC)
@ -319,8 +319,8 @@ fn build_report(
})
.collect::<Vec<_>>();
AuditReportV1 {
format_version: 1,
AuditReportV2 {
format_version: 2,
meta: AuditRunMeta {
validation_time_rfc3339_utc,
},
@ -333,6 +333,8 @@ fn build_report(
publication_points: out.publication_points,
vrps,
aspas,
downloads: out.downloads,
download_stats: out.download_stats,
}
}
@ -879,6 +881,8 @@ mod tests {
discovery,
tree,
publication_points: vec![pp1, pp2, pp3],
downloads: Vec::new(),
download_stats: crate::audit::AuditDownloadStats::default(),
};
let policy = Policy::default();
@ -894,8 +898,8 @@ mod tests {
#[test]
fn write_json_writes_report() {
let report = AuditReportV1 {
format_version: 1,
let report = AuditReportV2 {
format_version: 2,
meta: AuditRunMeta {
validation_time_rfc3339_utc: "2026-01-01T00:00:00Z".to_string(),
},
@ -908,6 +912,8 @@ mod tests {
publication_points: Vec::new(),
vrps: Vec::new(),
aspas: Vec::new(),
downloads: Vec::new(),
download_stats: crate::audit::AuditDownloadStats::default(),
};
let dir = tempfile::tempdir().expect("tmpdir");

View File

@ -5,6 +5,8 @@ pub mod analysis;
#[cfg(feature = "full")]
pub mod audit;
#[cfg(feature = "full")]
pub mod audit_downloads;
#[cfg(feature = "full")]
pub mod cli;
#[cfg(feature = "full")]
pub mod fetch;

View File

@ -1,9 +1,11 @@
use crate::analysis::timing::TimingHandle;
use crate::audit_downloads::DownloadLogHandle;
use crate::audit::{AuditDownloadKind};
use crate::fetch::rsync::{RsyncFetchError, RsyncFetcher};
use crate::policy::{Policy, SyncPreference};
use crate::report::{RfcRef, Warning};
use crate::storage::RocksStore;
use crate::sync::rrdp::sync_from_notification_with_timing;
use crate::sync::rrdp::sync_from_notification_with_timing_and_download_log;
use crate::sync::rrdp::{Fetcher as HttpFetcher, RrdpSyncError};
use std::thread;
use std::time::Duration;
@ -56,10 +58,17 @@ pub fn sync_publication_point(
http_fetcher: &dyn HttpFetcher,
rsync_fetcher: &dyn RsyncFetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> Result<RepoSyncResult, RepoSyncError> {
match (policy.sync_preference, rrdp_notification_uri) {
(SyncPreference::RrdpThenRsync, Some(notification_uri)) => {
match try_rrdp_sync_with_retry(store, notification_uri, http_fetcher, timing) {
match try_rrdp_sync_with_retry(
store,
notification_uri,
http_fetcher,
timing,
download_log,
) {
Ok(written) => {
if let Some(t) = timing.as_ref() {
t.record_count("repo_sync_rrdp_ok_total", 1);
@ -80,8 +89,13 @@ pub fn sync_publication_point(
.with_rfc_refs(&[RfcRef("RFC 8182 §3.4.5")])
.with_context(notification_uri),
];
let written =
rsync_sync_into_raw_objects(store, rsync_base_uri, rsync_fetcher, timing)?;
let written = rsync_sync_into_raw_objects(
store,
rsync_base_uri,
rsync_fetcher,
timing,
download_log,
)?;
if let Some(t) = timing.as_ref() {
t.record_count("repo_sync_rsync_fallback_ok_total", 1);
t.record_count("repo_sync_rsync_objects_written_total", written as u64);
@ -95,8 +109,13 @@ pub fn sync_publication_point(
}
}
_ => {
let written =
rsync_sync_into_raw_objects(store, rsync_base_uri, rsync_fetcher, timing)?;
let written = rsync_sync_into_raw_objects(
store,
rsync_base_uri,
rsync_fetcher,
timing,
download_log,
)?;
if let Some(t) = timing.as_ref() {
t.record_count("repo_sync_rsync_direct_total", 1);
t.record_count("repo_sync_rsync_objects_written_total", written as u64);
@ -115,6 +134,7 @@ fn try_rrdp_sync(
notification_uri: &str,
http_fetcher: &dyn HttpFetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> Result<usize, RrdpSyncError> {
let notification_xml = {
let _step = timing
@ -123,17 +143,27 @@ fn try_rrdp_sync(
let _total = timing
.as_ref()
.map(|t| t.span_phase("rrdp_fetch_notification_total"));
let mut dl_span = download_log.map(|dl| {
dl.span_download(AuditDownloadKind::RrdpNotification, notification_uri)
});
match http_fetcher.fetch(notification_uri) {
Ok(v) => {
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_notification_fetch_ok_total", 1);
}
if let Some(s) = dl_span.as_mut() {
s.set_bytes(v.len() as u64);
s.set_ok();
}
v
}
Err(e) => {
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_notification_fetch_fail_total", 1);
}
if let Some(s) = dl_span.as_mut() {
s.set_err(e.clone());
}
return Err(RrdpSyncError::Fetch(e));
}
}
@ -145,12 +175,13 @@ fn try_rrdp_sync(
);
}
sync_from_notification_with_timing(
sync_from_notification_with_timing_and_download_log(
store,
notification_uri,
&notification_xml,
http_fetcher,
timing,
download_log,
)
}
@ -183,6 +214,7 @@ fn try_rrdp_sync_with_retry(
notification_uri: &str,
http_fetcher: &dyn HttpFetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> Result<usize, RrdpSyncError> {
let backoffs = rrdp_retry_backoffs();
let max_attempts = backoffs.len().saturating_add(1).max(1);
@ -194,7 +226,7 @@ fn try_rrdp_sync_with_retry(
t.record_count("rrdp_retry_attempt_total", 1);
}
match try_rrdp_sync(store, notification_uri, http_fetcher, timing) {
match try_rrdp_sync(store, notification_uri, http_fetcher, timing, download_log) {
Ok(written) => {
if attempt > 1 {
if let Some(t) = timing.as_ref() {
@ -244,12 +276,31 @@ fn rsync_sync_into_raw_objects(
rsync_base_uri: &str,
rsync_fetcher: &dyn RsyncFetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> Result<usize, RepoSyncError> {
let _s = timing
.as_ref()
.map(|t| t.span_rrdp_repo_step(rsync_base_uri, "rsync_fetch_objects"));
let _p = timing.as_ref().map(|t| t.span_phase("rsync_fetch_total"));
let objects = rsync_fetcher.fetch_objects(rsync_base_uri)?;
let mut dl_span =
download_log.map(|dl| dl.span_download(AuditDownloadKind::Rsync, rsync_base_uri));
let objects = match rsync_fetcher.fetch_objects(rsync_base_uri) {
Ok(v) => {
let bytes_total: u64 = v.iter().map(|(_u, b)| b.len() as u64).sum::<u64>();
if let Some(s) = dl_span.as_mut() {
s.set_objects(v.len() as u64, bytes_total);
s.set_bytes(bytes_total);
s.set_ok();
}
v
}
Err(e) => {
if let Some(s) = dl_span.as_mut() {
s.set_err(e.to_string());
}
return Err(e.into());
}
};
if let Some(t) = timing.as_ref() {
t.record_count("rsync_objects_fetched_total", objects.len() as u64);
let bytes_total: u64 = objects.iter().map(|(_u, b)| b.len() as u64).sum::<u64>();
@ -270,6 +321,7 @@ mod tests {
use crate::analysis::timing::{TimingHandle, TimingMeta};
use crate::fetch::rsync::LocalDirRsyncFetcher;
use crate::sync::rrdp::Fetcher as HttpFetcher;
use crate::sync::rrdp::RrdpState;
use base64::Engine;
use sha2::Digest;
use std::collections::HashMap;
@ -364,6 +416,7 @@ mod tests {
let http = DummyHttpFetcher;
let rsync = LocalDirRsyncFetcher::new(&repo_dir);
let download_log = DownloadLogHandle::new();
let out = sync_publication_point(
&store,
&policy,
@ -372,12 +425,22 @@ mod tests {
&http,
&rsync,
Some(&timing),
Some(&download_log),
)
.expect("sync ok");
assert_eq!(out.source, RepoSyncSource::Rsync);
assert_eq!(out.objects_written, 3);
let events = download_log.snapshot_events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, AuditDownloadKind::Rsync);
assert!(events[0].success);
assert_eq!(events[0].bytes, Some(9));
let objects = events[0].objects.as_ref().expect("objects stat");
assert_eq!(objects.objects_count, 3);
assert_eq!(objects.objects_bytes_total, 9);
assert_eq!(
store.get_raw("rsync://example.test/repo/a.mft").unwrap(),
Some(b"mft".to_vec())
@ -481,6 +544,7 @@ mod tests {
..Policy::default()
};
let download_log = DownloadLogHandle::new();
let out = sync_publication_point(
&store,
&policy,
@ -489,6 +553,7 @@ mod tests {
&http,
&PanicRsyncFetcher,
Some(&timing),
Some(&download_log),
)
.expect("sync ok");
@ -498,6 +563,30 @@ mod tests {
Some(published_bytes.to_vec())
);
let events = download_log.snapshot_events();
assert_eq!(events.len(), 4, "expected 3x notification + 1x snapshot");
assert_eq!(
events
.iter()
.filter(|e| e.kind == AuditDownloadKind::RrdpNotification)
.count(),
3
);
assert_eq!(
events
.iter()
.filter(|e| e.kind == AuditDownloadKind::RrdpSnapshot)
.count(),
1
);
assert_eq!(
events
.iter()
.filter(|e| e.kind == AuditDownloadKind::RrdpNotification && !e.success)
.count(),
2
);
let v = timing_to_json(temp.path(), &timing);
let counts = v.get("counts").expect("counts");
assert_eq!(
@ -542,11 +631,12 @@ mod tests {
&[(published_uri, published_bytes)],
);
// Intentionally wrong hash to trigger protocol error (SnapshotHashMismatch).
let wrong_hash = "00".repeat(32);
let notif = notification_xml(
"9df4b597-af9e-4dca-bdda-719cce2c4e28",
1,
snapshot_uri,
"00",
&wrong_hash,
);
let mut map = HashMap::new();
@ -569,6 +659,7 @@ mod tests {
..Policy::default()
};
let download_log = DownloadLogHandle::new();
let out = sync_publication_point(
&store,
&policy,
@ -577,6 +668,7 @@ mod tests {
&http,
&EmptyRsyncFetcher,
Some(&timing),
Some(&download_log),
)
.expect("sync ok");
@ -588,6 +680,15 @@ mod tests {
"expected RRDP fallback warning"
);
let events = download_log.snapshot_events();
assert_eq!(events.len(), 3, "expected notification + snapshot + rsync fallback");
assert_eq!(events[0].kind, AuditDownloadKind::RrdpNotification);
assert!(events[0].success);
assert_eq!(events[1].kind, AuditDownloadKind::RrdpSnapshot);
assert!(events[1].success);
assert_eq!(events[2].kind, AuditDownloadKind::Rsync);
assert!(events[2].success);
let v = timing_to_json(temp.path(), &timing);
let counts = v.get("counts").expect("counts");
assert_eq!(
@ -615,4 +716,112 @@ mod tests {
Some(1)
);
}
// Verifies that delta fetches show up in the download audit log even when
// the RRDP sync ultimately uses the snapshot as well (see test name).
#[test]
fn rrdp_delta_fetches_are_logged_even_if_snapshot_fallback_is_used() {
    let temp = tempfile::tempdir().expect("tempdir");
    let store_dir = temp.path().join("db");
    let store = RocksStore::open(&store_dir).expect("open rocksdb");
    let timing = TimingHandle::new(TimingMeta {
        recorded_at_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(),
        validation_time_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(),
        tal_url: None,
        db_path: Some(store_dir.to_string_lossy().into_owned()),
    });
    let notification_uri = "https://example.test/notification.xml";
    let snapshot_uri = "https://example.test/snapshot.xml";
    let delta_2_uri = "https://example.test/delta_2.xml";
    let delta_3_uri = "https://example.test/delta_3.xml";
    let published_uri = "rsync://example.test/repo/a.mft";
    let published_bytes = b"x";
    let sid = "9df4b597-af9e-4dca-bdda-719cce2c4e28";
    // Seed old RRDP state so sync_from_notification tries deltas (RFC 8182 §3.4.1).
    let state = RrdpState {
        session_id: sid.to_string(),
        serial: 1,
    };
    let state_bytes = state.encode().expect("encode state");
    store
        .put_rrdp_state(notification_uri, &state_bytes)
        .expect("seed state");
    // Two empty deltas (serials 2 and 3) plus a snapshot at serial 3, all
    // with correct SHA-256 hashes in the notification.
    let delta_2 = format!(
        r#"<delta xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{sid}" serial="2"></delta>"#
    )
    .into_bytes();
    let delta_3 = format!(
        r#"<delta xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{sid}" serial="3"></delta>"#
    )
    .into_bytes();
    let delta_2_hash = hex::encode(sha2::Sha256::digest(&delta_2));
    let delta_3_hash = hex::encode(sha2::Sha256::digest(&delta_3));
    let snapshot = snapshot_xml(sid, 3, &[(published_uri, published_bytes)]);
    let snapshot_hash = hex::encode(sha2::Sha256::digest(&snapshot));
    let notif = format!(
        r#"<notification xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{sid}" serial="3"><snapshot uri="{snapshot_uri}" hash="{snapshot_hash}"/><delta serial="2" uri="{delta_2_uri}" hash="{delta_2_hash}"/><delta serial="3" uri="{delta_3_uri}" hash="{delta_3_hash}"/></notification>"#
    )
    .into_bytes();
    let mut map = HashMap::new();
    map.insert(notification_uri.to_string(), notif);
    map.insert(snapshot_uri.to_string(), snapshot);
    map.insert(delta_2_uri.to_string(), delta_2);
    map.insert(delta_3_uri.to_string(), delta_3);
    let http = MapFetcher { map };
    let policy = Policy {
        sync_preference: SyncPreference::RrdpThenRsync,
        ..Policy::default()
    };
    let download_log = DownloadLogHandle::new();
    // PanicRsyncFetcher guarantees the rsync path is never taken.
    let out = sync_publication_point(
        &store,
        &policy,
        Some(notification_uri),
        "rsync://example.test/repo/",
        &http,
        &PanicRsyncFetcher,
        Some(&timing),
        Some(&download_log),
    )
    .expect("sync ok");
    assert_eq!(out.source, RepoSyncSource::Rrdp);
    assert_eq!(out.objects_written, 1);
    assert_eq!(
        store.get_raw(published_uri).unwrap(),
        Some(published_bytes.to_vec())
    );
    // Expect 1 notification + 2 deltas + 1 snapshot, all logged as successful.
    let events = download_log.snapshot_events();
    assert_eq!(events.len(), 4);
    assert_eq!(
        events
            .iter()
            .filter(|e| e.kind == AuditDownloadKind::RrdpNotification)
            .count(),
        1
    );
    assert_eq!(
        events
            .iter()
            .filter(|e| e.kind == AuditDownloadKind::RrdpDelta)
            .count(),
        2
    );
    assert_eq!(
        events
            .iter()
            .filter(|e| e.kind == AuditDownloadKind::RrdpSnapshot)
            .count(),
        1
    );
    assert!(events.iter().all(|e| e.success));
}
}

View File

@ -1,4 +1,6 @@
use crate::analysis::timing::TimingHandle;
use crate::audit::AuditDownloadKind;
use crate::audit_downloads::DownloadLogHandle;
use crate::storage::RocksStore;
use crate::storage::RrdpDeltaOp;
use base64::Engine;
@ -432,7 +434,14 @@ pub fn sync_from_notification_snapshot(
notification_xml: &[u8],
fetcher: &dyn Fetcher,
) -> RrdpSyncResult<usize> {
sync_from_notification_snapshot_inner(store, notification_uri, notification_xml, fetcher, None)
sync_from_notification_snapshot_inner(
store,
notification_uri,
notification_xml,
fetcher,
None,
None,
)
}
pub fn sync_from_notification_snapshot_with_timing(
@ -448,6 +457,25 @@ pub fn sync_from_notification_snapshot_with_timing(
notification_xml,
fetcher,
timing,
None,
)
}
pub fn sync_from_notification_snapshot_with_timing_and_download_log(
store: &RocksStore,
notification_uri: &str,
notification_xml: &[u8],
fetcher: &dyn Fetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> RrdpSyncResult<usize> {
sync_from_notification_snapshot_inner(
store,
notification_uri,
notification_xml,
fetcher,
timing,
download_log,
)
}
@ -457,6 +485,7 @@ fn sync_from_notification_snapshot_inner(
notification_xml: &[u8],
fetcher: &dyn Fetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> RrdpSyncResult<usize> {
let _parse_step = timing
.as_ref()
@ -470,16 +499,30 @@ fn sync_from_notification_snapshot_inner(
.as_ref()
.map(|t| t.span_rrdp_repo_step(notification_uri, "fetch_snapshot"));
let _fetch_total = timing.as_ref().map(|t| t.span_phase("rrdp_fetch_snapshot_total"));
let snapshot_xml = fetcher.fetch(&notif.snapshot_uri).map_err(|e| {
let mut dl_span = download_log
.map(|dl| dl.span_download(AuditDownloadKind::RrdpSnapshot, &notif.snapshot_uri));
let snapshot_xml = match fetcher.fetch(&notif.snapshot_uri) {
Ok(v) => {
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_snapshot_fetch_ok_total", 1);
t.record_count("rrdp_snapshot_bytes_total", v.len() as u64);
}
if let Some(s) = dl_span.as_mut() {
s.set_bytes(v.len() as u64);
s.set_ok();
}
v
}
Err(e) => {
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_snapshot_fetch_fail_total", 1);
}
RrdpSyncError::Fetch(e)
})?;
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_snapshot_fetch_ok_total", 1);
t.record_count("rrdp_snapshot_bytes_total", snapshot_xml.len() as u64);
if let Some(s) = dl_span.as_mut() {
s.set_err(e.clone());
}
return Err(RrdpSyncError::Fetch(e));
}
};
drop(_fetch_step);
drop(_fetch_total);
@ -535,7 +578,7 @@ pub fn sync_from_notification(
notification_xml: &[u8],
fetcher: &dyn Fetcher,
) -> RrdpSyncResult<usize> {
sync_from_notification_inner(store, notification_uri, notification_xml, fetcher, None)
sync_from_notification_inner(store, notification_uri, notification_xml, fetcher, None, None)
}
pub fn sync_from_notification_with_timing(
@ -545,7 +588,25 @@ pub fn sync_from_notification_with_timing(
fetcher: &dyn Fetcher,
timing: Option<&TimingHandle>,
) -> RrdpSyncResult<usize> {
sync_from_notification_inner(store, notification_uri, notification_xml, fetcher, timing)
sync_from_notification_inner(store, notification_uri, notification_xml, fetcher, timing, None)
}
pub fn sync_from_notification_with_timing_and_download_log(
store: &RocksStore,
notification_uri: &str,
notification_xml: &[u8],
fetcher: &dyn Fetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> RrdpSyncResult<usize> {
sync_from_notification_inner(
store,
notification_uri,
notification_xml,
fetcher,
timing,
download_log,
)
}
fn sync_from_notification_inner(
@ -554,6 +615,7 @@ fn sync_from_notification_inner(
notification_xml: &[u8],
fetcher: &dyn Fetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> RrdpSyncResult<usize> {
let _parse_step = timing
.as_ref()
@ -632,18 +694,27 @@ fn sync_from_notification_inner(
}
};
let mut dl_span = download_log
.map(|dl| dl.span_download(AuditDownloadKind::RrdpDelta, &dref.uri));
match fetcher.fetch(&dref.uri) {
Ok(bytes) => {
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_delta_fetch_ok_total", 1);
t.record_count("rrdp_delta_bytes_total", bytes.len() as u64);
}
if let Some(s) = dl_span.as_mut() {
s.set_bytes(bytes.len() as u64);
s.set_ok();
}
fetched.push((serial, dref.hash_sha256, bytes))
}
Err(_) => {
Err(e) => {
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_delta_fetch_fail_total", 1);
}
if let Some(s) = dl_span.as_mut() {
s.set_err(e.clone());
}
fetch_ok = false;
break;
}
@ -710,16 +781,30 @@ fn sync_from_notification_inner(
.as_ref()
.map(|t| t.span_rrdp_repo_step(notification_uri, "fetch_snapshot"));
let _fetch_total = timing.as_ref().map(|t| t.span_phase("rrdp_fetch_snapshot_total"));
let snapshot_xml = fetcher.fetch(&notif.snapshot_uri).map_err(|e| {
let mut dl_span = download_log
.map(|dl| dl.span_download(AuditDownloadKind::RrdpSnapshot, &notif.snapshot_uri));
let snapshot_xml = match fetcher.fetch(&notif.snapshot_uri) {
Ok(v) => {
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_snapshot_fetch_ok_total", 1);
t.record_count("rrdp_snapshot_bytes_total", v.len() as u64);
}
if let Some(s) = dl_span.as_mut() {
s.set_bytes(v.len() as u64);
s.set_ok();
}
v
}
Err(e) => {
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_snapshot_fetch_fail_total", 1);
}
RrdpSyncError::Fetch(e)
})?;
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_snapshot_fetch_ok_total", 1);
t.record_count("rrdp_snapshot_bytes_total", snapshot_xml.len() as u64);
if let Some(s) = dl_span.as_mut() {
s.set_err(e.clone());
}
return Err(RrdpSyncError::Fetch(e));
}
};
drop(_fetch_step);
drop(_fetch_total);

View File

@ -52,6 +52,7 @@ pub fn run_publication_point_once(
http_fetcher,
rsync_fetcher,
None,
None,
)?;
let publication_point = process_manifest_publication_point(

View File

@ -2,6 +2,7 @@ use url::Url;
use crate::analysis::timing::TimingHandle;
use crate::audit::PublicationPointAudit;
use crate::audit_downloads::DownloadLogHandle;
use crate::data_model::ta::TrustAnchor;
use crate::sync::rrdp::Fetcher;
use crate::validation::from_tal::{
@ -27,6 +28,8 @@ pub struct RunTreeFromTalAuditOutput {
pub discovery: DiscoveredRootCaInstance,
pub tree: TreeRunOutput,
pub publication_points: Vec<PublicationPointAudit>,
pub downloads: Vec<crate::audit::AuditDownloadEvent>,
pub download_stats: crate::audit::AuditDownloadStats,
}
#[derive(Debug, thiserror::Error)]
@ -75,6 +78,7 @@ pub fn run_tree_from_tal_url_serial(
rsync_fetcher,
validation_time,
timing: None,
download_log: None,
revalidate_only: config.revalidate_only,
rrdp_dedup: true,
rrdp_repo_cache: Mutex::new(HashMap::new()),
@ -99,6 +103,7 @@ pub fn run_tree_from_tal_url_serial_audit(
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError> {
let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?;
let download_log = DownloadLogHandle::new();
let runner = Rpkiv1PublicationPointRunner {
store,
policy,
@ -106,6 +111,7 @@ pub fn run_tree_from_tal_url_serial_audit(
rsync_fetcher,
validation_time,
timing: None,
download_log: Some(download_log.clone()),
revalidate_only: config.revalidate_only,
rrdp_dedup: true,
rrdp_repo_cache: Mutex::new(HashMap::new()),
@ -119,10 +125,14 @@ pub fn run_tree_from_tal_url_serial_audit(
publication_points,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput {
discovery,
tree,
publication_points,
downloads,
download_stats,
})
}
@ -140,6 +150,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?;
drop(_tal);
let download_log = DownloadLogHandle::new();
let runner = Rpkiv1PublicationPointRunner {
store,
policy,
@ -147,6 +158,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
rsync_fetcher,
validation_time,
timing: Some(timing.clone()),
download_log: Some(download_log.clone()),
revalidate_only: config.revalidate_only,
rrdp_dedup: true,
rrdp_repo_cache: Mutex::new(HashMap::new()),
@ -161,10 +173,14 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
publication_points,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput {
discovery,
tree,
publication_points,
downloads,
download_stats,
})
}
@ -189,6 +205,7 @@ pub fn run_tree_from_tal_and_ta_der_serial(
rsync_fetcher,
validation_time,
timing: None,
download_log: None,
revalidate_only: config.revalidate_only,
rrdp_dedup: true,
rrdp_repo_cache: Mutex::new(HashMap::new()),
@ -216,6 +233,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
let discovery =
discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, resolved_ta_uri)?;
let download_log = DownloadLogHandle::new();
let runner = Rpkiv1PublicationPointRunner {
store,
policy,
@ -223,6 +241,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
rsync_fetcher,
validation_time,
timing: None,
download_log: Some(download_log.clone()),
revalidate_only: config.revalidate_only,
rrdp_dedup: true,
rrdp_repo_cache: Mutex::new(HashMap::new()),
@ -236,10 +255,14 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
publication_points,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput {
discovery,
tree,
publication_points,
downloads,
download_stats,
})
}
@ -260,6 +283,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, resolved_ta_uri)?;
drop(_tal);
let download_log = DownloadLogHandle::new();
let runner = Rpkiv1PublicationPointRunner {
store,
policy,
@ -267,6 +291,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
rsync_fetcher,
validation_time,
timing: Some(timing.clone()),
download_log: Some(download_log.clone()),
revalidate_only: config.revalidate_only,
rrdp_dedup: true,
rrdp_repo_cache: Mutex::new(HashMap::new()),
@ -281,9 +306,13 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
publication_points,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput {
discovery,
tree,
publication_points,
downloads,
download_stats,
})
}

View File

@ -3,6 +3,7 @@ use crate::audit::{
AuditObjectKind, AuditObjectResult, AuditWarning, ObjectAuditEntry, PublicationPointAudit,
sha256_hex, sha256_hex_from_32,
};
use crate::audit_downloads::DownloadLogHandle;
use crate::fetch::rsync::RsyncFetcher;
use crate::policy::Policy;
use crate::report::{RfcRef, Warning};
@ -33,6 +34,7 @@ pub struct Rpkiv1PublicationPointRunner<'a> {
pub rsync_fetcher: &'a dyn RsyncFetcher,
pub validation_time: time::OffsetDateTime,
pub timing: Option<TimingHandle>,
pub download_log: Option<DownloadLogHandle>,
pub revalidate_only: bool,
/// In-run RRDP dedup: when RRDP is enabled, only sync each `rrdp_notification_uri` once per run.
///
@ -157,6 +159,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
self.http_fetcher,
self.rsync_fetcher,
self.timing.as_ref(),
self.download_log.as_ref(),
) {
Ok(res) => {
// Populate rsync dedup cache when we actually used rsync.
@ -1390,6 +1393,7 @@ authorityKeyIdentifier = keyid:always
rsync_fetcher: &LocalDirRsyncFetcher::new(&fixture_dir),
validation_time,
timing: None,
download_log: None,
revalidate_only: false,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
@ -1502,6 +1506,7 @@ authorityKeyIdentifier = keyid:always
rsync_fetcher: &rsync,
validation_time,
timing: None,
download_log: None,
revalidate_only: false,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
@ -1576,6 +1581,7 @@ authorityKeyIdentifier = keyid:always
rsync_fetcher: &LocalDirRsyncFetcher::new(&fixture_dir),
validation_time,
timing: None,
download_log: None,
revalidate_only: false,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),
@ -1599,6 +1605,7 @@ authorityKeyIdentifier = keyid:always
rsync_fetcher: &FailingRsyncFetcher,
validation_time,
timing: None,
download_log: None,
revalidate_only: false,
rrdp_dedup: false,
rrdp_repo_cache: Mutex::new(HashMap::new()),

View File

@ -114,6 +114,7 @@ fn apnic_live_bootstrap_snapshot_and_fetch_cache_pp_pack_to_persistent_db() {
&http,
&rsync,
None,
None,
)
.expect("repo sync");

View File

@ -167,6 +167,7 @@ fn apnic_tree_full_stats_serial() {
rsync_fetcher: &rsync,
validation_time,
timing: None,
download_log: None,
revalidate_only: false,
rrdp_dedup: true,
rrdp_repo_cache: std::sync::Mutex::new(std::collections::HashMap::new()),

View File

@ -37,5 +37,5 @@ fn cli_run_offline_mode_executes_and_writes_json() {
let bytes = std::fs::read(&report_path).expect("read report json");
let v: serde_json::Value = serde_json::from_slice(&bytes).expect("parse report json");
assert_eq!(v["format_version"], 1);
assert_eq!(v["format_version"], 2);
}

View File

@ -44,10 +44,12 @@ fn cli_offline_smoke_writes_report_json() {
let bytes = std::fs::read(&report_path).expect("read report json");
let v: serde_json::Value = serde_json::from_slice(&bytes).expect("parse report json");
assert_eq!(v["format_version"], 1);
assert_eq!(v["format_version"], 2);
assert!(v.get("policy").is_some());
assert!(v.get("tree").is_some());
assert!(v.get("publication_points").is_some());
assert!(v.get("vrps").is_some());
assert!(v.get("aspas").is_some());
assert!(v.get("downloads").is_some());
assert!(v.get("download_stats").is_some());
}

View File

@ -92,6 +92,7 @@ fn build_fetch_cache_pp_from_local_rsync_fixture(
&NoopHttpFetcher,
&LocalDirRsyncFetcher::new(dir),
None,
None,
)
.expect("sync into raw_objects");

View File

@ -84,6 +84,7 @@ fn repo_sync_uses_rrdp_when_available() {
&http_fetcher,
&rsync_fetcher,
None,
None,
)
.expect("sync");
@ -137,6 +138,7 @@ fn repo_sync_skips_snapshot_when_state_unchanged() {
&http_fetcher,
&rsync_fetcher,
None,
None,
)
.expect("sync 1");
assert_eq!(out1.source, RepoSyncSource::Rrdp);
@ -150,6 +152,7 @@ fn repo_sync_skips_snapshot_when_state_unchanged() {
&http_fetcher,
&rsync_fetcher,
None,
None,
)
.expect("sync 2");
assert_eq!(out2.source, RepoSyncSource::Rrdp);
@ -203,6 +206,7 @@ fn repo_sync_falls_back_to_rsync_on_rrdp_failure() {
&http_fetcher,
&rsync_fetcher,
None,
None,
)
.expect("fallback sync");
@ -252,6 +256,7 @@ fn repo_sync_rsync_populates_raw_objects() {
&http_fetcher,
&rsync_fetcher,
None,
None,
)
.expect("rsync-only sync");