diff --git a/scripts/manual_sync/README.md b/scripts/manual_sync/README.md index da97773..cceed1f 100644 --- a/scripts/manual_sync/README.md +++ b/scripts/manual_sync/README.md @@ -21,8 +21,8 @@ They are meant for **hands-on validation / acceptance runs**, not for CI. - Writes: - run log - audit report JSON - - run meta JSON (includes durations) - - short summary Markdown (includes durations) + - run meta JSON (includes durations + download_stats) + - short summary Markdown (includes durations + download_stats) - RocksDB key statistics (`db_stats --exact`) - RRDP repo state dump (`rrdp_state_dump`) @@ -35,6 +35,24 @@ They are meant for **hands-on validation / acceptance runs**, not for CI. - base vs delta report JSON - base vs delta `rrdp_state_dump` TSV - and includes a **duration comparison** (base vs delta) if the base meta JSON is available + - delta meta JSON includes download_stats copied from delta report JSON + +## Audit report fields (report.json) + +The `rpki` binary writes an audit report JSON with: + +- `format_version: 2` +- `downloads`: per-download RRDP/rsync events (URI, timestamps, duration, ok/fail, error, bytes, objects stats) +- `download_stats`: aggregate counters (by kind) + +These are useful for diagnosing why a run is slow (e.g. RRDP snapshot vs delta vs rsync fallback). + +## Meta fields (meta.json) + +The scripts generate `*_meta.json` next to `*_report.json` and include: + +- `durations_secs`: wall-clock duration breakdown for the script steps +- `download_stats`: copied from `report_json.download_stats` ## Usage diff --git a/scripts/manual_sync/delta_sync.sh b/scripts/manual_sync/delta_sync.sh index d870de3..4b76399 100755 --- a/scripts/manual_sync/delta_sync.sh +++ b/scripts/manual_sync/delta_sync.sh @@ -266,6 +266,7 @@ base_state = parse_rrdp_state_tsv(base_state_path) delta_state = parse_rrdp_state_tsv(delta_state_path) base_meta = load_optional_json(base_meta_path_s) +download_stats = delta.get("download_stats") or {} delta_meta = { "recorded_at_utc": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "tal_url": os.environ["TAL_URL"], @@ -283,6 +284,7 @@ delta_meta = { "rrdp_state_dump": int(os.environ["STATE_DURATION_S"]), "total_script": int(os.environ["TOTAL_DURATION_S"]), }, + "download_stats": download_stats, } delta_meta_path.write_text(json.dumps(delta_meta, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") diff --git a/scripts/manual_sync/full_sync.sh b/scripts/manual_sync/full_sync.sh index 196ac4c..a12c00b 100755 --- a/scripts/manual_sync/full_sync.sh +++ b/scripts/manual_sync/full_sync.sh @@ -106,6 +106,7 @@ summary_path = Path(sys.argv[3]) rep = json.loads(report_path.read_text(encoding="utf-8")) now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") +download_stats = rep.get("download_stats") or {} meta = { "recorded_at_utc": now, "tal_url": os.environ["TAL_URL"], @@ -129,6 +130,7 @@ meta = { "aspas": len(rep["aspas"]), "audit_publication_points": len(rep["publication_points"]), }, + "download_stats": download_stats, } meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") @@ -153,6 +155,25 @@ for k,v in meta["durations_secs"].items(): lines.append(f"| {k} | {v} |\\n") lines.append("\\n") +lines.append("## Download Stats\\n\\n") +lines.append("- raw events: `report_json.downloads`\\n") +lines.append("- aggregated: `report_json.download_stats` (copied into meta.json)\\n\\n") + +def fmt_u(v): + if v is None: + return "" + return str(v) + +by_kind = download_stats.get("by_kind") or {} +lines.append("| kind | ok | fail | duration_ms_total | bytes_total | objects_count_total | objects_bytes_total |\\n") +lines.append("|---|---:|---:|---:|---:|---:|---:|\\n") +for kind in sorted(by_kind.keys()): + st = by_kind[kind] or {} + lines.append( + f"| {kind} | {st.get('ok_total',0)} | {st.get('fail_total',0)} | {st.get('duration_ms_total',0)} | {fmt_u(st.get('bytes_total'))} | {fmt_u(st.get('objects_count_total'))} | {fmt_u(st.get('objects_bytes_total'))} |\\n" + ) +lines.append("\\n") + summary_path.write_text("".join(lines), encoding="utf-8") print(summary_path) PY diff --git a/src/audit.rs b/src/audit.rs index e041cf4..e161f69 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -98,6 +98,57 @@ pub struct AuditRunMeta { pub validation_time_rfc3339_utc: String, } +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum AuditDownloadKind { + RrdpNotification, + RrdpSnapshot, + RrdpDelta, + Rsync, +} + +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] +pub struct AuditDownloadObjectsStat { + pub objects_count: u64, + pub objects_bytes_total: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub struct AuditDownloadEvent { + pub kind: AuditDownloadKind, + pub uri: String, + pub started_at_rfc3339_utc: String, + pub finished_at_rfc3339_utc: String, + pub duration_ms: u64, + pub success: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub bytes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub objects: Option, +} + +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] +pub struct AuditDownloadKindStats { + pub ok_total: u64, + pub fail_total: u64, + pub duration_ms_total: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub bytes_total: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub objects_count_total: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub objects_bytes_total: Option, +} + +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] +pub struct AuditDownloadStats { + pub events_total: u64, + /// Statistics keyed by serialized `AuditDownloadKind` string (e.g. "rrdp_snapshot"). + pub by_kind: std::collections::BTreeMap, +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize)] pub struct AuditReportV1 { pub format_version: u32, @@ -110,6 +161,21 @@ pub struct AuditReportV1 { pub aspas: Vec, } +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub struct AuditReportV2 { + pub format_version: u32, + pub meta: AuditRunMeta, + pub policy: Policy, + pub tree: TreeSummary, + pub publication_points: Vec, + + pub vrps: Vec, + pub aspas: Vec, + + pub downloads: Vec, + pub download_stats: AuditDownloadStats, +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize)] pub struct VrpOutput { pub asn: u32, diff --git a/src/audit_downloads.rs b/src/audit_downloads.rs new file mode 100644 index 0000000..5860947 --- /dev/null +++ b/src/audit_downloads.rs @@ -0,0 +1,168 @@ +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use crate::audit::{ + AuditDownloadEvent, AuditDownloadKind, AuditDownloadKindStats, AuditDownloadObjectsStat, + AuditDownloadStats, +}; + +#[derive(Clone, Debug, Default)] +pub struct DownloadLogHandle { + inner: Arc>>, +} + +impl DownloadLogHandle { + pub fn new() -> Self { + Self::default() + } + + pub fn record_event(&self, event: AuditDownloadEvent) { + self.inner.lock().expect("download log lock").push(event); + } + + pub fn snapshot_events(&self) -> Vec { + self.inner.lock().expect("download log lock").clone() + } + + pub fn stats_from_events(events: &[AuditDownloadEvent]) -> AuditDownloadStats { + let mut out = AuditDownloadStats { + events_total: events.len() as u64, + by_kind: BTreeMap::new(), + }; + for e in events { + let kind_key = match e.kind { + AuditDownloadKind::RrdpNotification => "rrdp_notification", + AuditDownloadKind::RrdpSnapshot => "rrdp_snapshot", + AuditDownloadKind::RrdpDelta => "rrdp_delta", + AuditDownloadKind::Rsync => "rsync", + } + .to_string(); + + let st = out.by_kind.entry(kind_key).or_insert_with(|| AuditDownloadKindStats { + ok_total: 0, + fail_total: 0, + duration_ms_total: 0, + bytes_total: None, + objects_count_total: None, + objects_bytes_total: None, + }); + if e.success { + st.ok_total = st.ok_total.saturating_add(1); + } else { + st.fail_total = st.fail_total.saturating_add(1); + } + st.duration_ms_total = st.duration_ms_total.saturating_add(e.duration_ms); + if let Some(b) = e.bytes { + st.bytes_total = Some(st.bytes_total.unwrap_or(0).saturating_add(b)); + } + if let Some(objects) = &e.objects { + st.objects_count_total = Some( + st.objects_count_total + .unwrap_or(0) + .saturating_add(objects.objects_count), + ); + st.objects_bytes_total = Some( + st.objects_bytes_total + .unwrap_or(0) + .saturating_add(objects.objects_bytes_total), + ); + } + } + out + } + + pub fn stats(&self) -> AuditDownloadStats { + let events = self.snapshot_events(); + Self::stats_from_events(&events) + } + + pub fn span_download<'a>( + &'a self, + kind: AuditDownloadKind, + uri: &'a str, + ) -> DownloadSpanGuard<'a> { + DownloadSpanGuard { + handle: self, + kind, + uri, + start_instant: Instant::now(), + started_at: time::OffsetDateTime::now_utc(), + bytes: None, + objects: None, + error: None, + success: None, + } + } +} + +pub struct DownloadSpanGuard<'a> { + handle: &'a DownloadLogHandle, + kind: AuditDownloadKind, + uri: &'a str, + start_instant: Instant, + started_at: time::OffsetDateTime, + bytes: Option, + objects: Option, + error: Option, + success: Option, +} + +impl DownloadSpanGuard<'_> { + pub fn set_bytes(&mut self, bytes: u64) { + self.bytes = Some(bytes); + } + + pub fn set_objects(&mut self, objects_count: u64, objects_bytes_total: u64) { + self.objects = Some(AuditDownloadObjectsStat { + objects_count, + objects_bytes_total, + }); + } + + pub fn set_ok(&mut self) { + self.success = Some(true); + } + + pub fn set_err(&mut self, msg: impl Into) { + self.success = Some(false); + self.error = Some(msg.into()); + } +} + +impl Drop for DownloadSpanGuard<'_> { + fn drop(&mut self) { + use time::format_description::well_known::Rfc3339; + let finished_at = time::OffsetDateTime::now_utc(); + let dur = self.start_instant.elapsed(); + let duration_ms = duration_to_ms(dur); + let started_at_rfc3339_utc = self + .started_at + .to_offset(time::UtcOffset::UTC) + .format(&Rfc3339) + .unwrap_or_else(|_| "".to_string()); + let finished_at_rfc3339_utc = finished_at + .to_offset(time::UtcOffset::UTC) + .format(&Rfc3339) + .unwrap_or_else(|_| "".to_string()); + let success = self.success.unwrap_or(false); + let event = AuditDownloadEvent { + kind: self.kind.clone(), + uri: self.uri.to_string(), + started_at_rfc3339_utc, + finished_at_rfc3339_utc, + duration_ms, + success, + error: if success { None } else { self.error.clone() }, + bytes: self.bytes, + objects: self.objects.clone(), + }; + self.handle.record_event(event); + } +} + +fn duration_to_ms(d: Duration) -> u64 { + let ms = d.as_millis(); + ms.min(u128::from(u64::MAX)) as u64 +} + diff --git a/src/cli.rs b/src/cli.rs index 71e181d..a2d39f7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -2,7 +2,7 @@ use std::path::{Path, PathBuf}; use crate::analysis::timing::{TimingHandle, TimingMeta, TimingMetaUpdate}; use crate::audit::{ - AspaOutput, AuditReportV1, AuditRunMeta, AuditWarning, TreeSummary, VrpOutput, + AspaOutput, AuditReportV2, AuditRunMeta, AuditWarning, TreeSummary, VrpOutput, format_roa_ip_prefix, }; use crate::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig}; @@ -239,7 +239,7 @@ fn read_policy(path: Option<&Path>) -> Result { } } -fn write_json(path: &Path, report: &AuditReportV1) -> Result<(), String> { +fn write_json(path: &Path, report: &AuditReportV2) -> Result<(), String> { let f = std::fs::File::create(path) .map_err(|e| format!("create report file failed: {}: {e}", path.display()))?; serde_json::to_writer_pretty(f, report) @@ -247,7 +247,7 @@ fn write_json(path: &Path, report: &AuditReportV1) -> Result<(), String> { Ok(()) } -fn unique_rrdp_repos(report: &AuditReportV1) -> usize { +fn unique_rrdp_repos(report: &AuditReportV2) -> usize { use std::collections::HashSet; let mut set: HashSet<&str> = HashSet::new(); for pp in &report.publication_points { @@ -258,7 +258,7 @@ fn unique_rrdp_repos(report: &AuditReportV1) -> usize { set.len() } -fn print_summary(report: &AuditReportV1) { +fn print_summary(report: &AuditReportV2) { let rrdp_repos = unique_rrdp_repos(report); println!("RPKI stage2 serial run summary"); println!( @@ -291,7 +291,7 @@ fn build_report( policy: &Policy, validation_time: time::OffsetDateTime, out: RunTreeFromTalAuditOutput, -) -> AuditReportV1 { +) -> AuditReportV2 { use time::format_description::well_known::Rfc3339; let validation_time_rfc3339_utc = validation_time .to_offset(time::UtcOffset::UTC) @@ -319,8 +319,8 @@ fn build_report( }) .collect::>(); - AuditReportV1 { - format_version: 1, + AuditReportV2 { + format_version: 2, meta: AuditRunMeta { validation_time_rfc3339_utc, }, @@ -333,6 +333,8 @@ fn build_report( publication_points: out.publication_points, vrps, aspas, + downloads: out.downloads, + download_stats: out.download_stats, } } @@ -879,6 +881,8 @@ mod tests { discovery, tree, publication_points: vec![pp1, pp2, pp3], + downloads: Vec::new(), + download_stats: crate::audit::AuditDownloadStats::default(), }; let policy = Policy::default(); @@ -894,8 +898,8 @@ mod tests { #[test] fn write_json_writes_report() { - let report = AuditReportV1 { - format_version: 1, + let report = AuditReportV2 { + format_version: 2, meta: AuditRunMeta { validation_time_rfc3339_utc: "2026-01-01T00:00:00Z".to_string(), }, @@ -908,6 +912,8 @@ mod tests { publication_points: Vec::new(), vrps: Vec::new(), aspas: Vec::new(), + downloads: Vec::new(), + download_stats: crate::audit::AuditDownloadStats::default(), }; let dir = tempfile::tempdir().expect("tmpdir"); diff --git a/src/lib.rs b/src/lib.rs index fb52d91..bd6c599 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,8 @@ pub mod analysis; #[cfg(feature = "full")] pub mod audit; #[cfg(feature = "full")] +pub mod audit_downloads; +#[cfg(feature = "full")] pub mod cli; #[cfg(feature = "full")] pub mod fetch; diff --git a/src/sync/repo.rs b/src/sync/repo.rs index 8d3ce0c..c3f5c44 100644 --- a/src/sync/repo.rs +++ b/src/sync/repo.rs @@ -1,9 +1,11 @@ use crate::analysis::timing::TimingHandle; +use crate::audit_downloads::DownloadLogHandle; +use crate::audit::{AuditDownloadKind}; use crate::fetch::rsync::{RsyncFetchError, RsyncFetcher}; use crate::policy::{Policy, SyncPreference}; use crate::report::{RfcRef, Warning}; use crate::storage::RocksStore; -use crate::sync::rrdp::sync_from_notification_with_timing; +use crate::sync::rrdp::sync_from_notification_with_timing_and_download_log; use crate::sync::rrdp::{Fetcher as HttpFetcher, RrdpSyncError}; use std::thread; use std::time::Duration; @@ -56,10 +58,17 @@ pub fn sync_publication_point( http_fetcher: &dyn HttpFetcher, rsync_fetcher: &dyn RsyncFetcher, timing: Option<&TimingHandle>, + download_log: Option<&DownloadLogHandle>, ) -> Result { match (policy.sync_preference, rrdp_notification_uri) { (SyncPreference::RrdpThenRsync, Some(notification_uri)) => { - match try_rrdp_sync_with_retry(store, notification_uri, http_fetcher, timing) { + match try_rrdp_sync_with_retry( + store, + notification_uri, + http_fetcher, + timing, + download_log, + ) { Ok(written) => { if let Some(t) = timing.as_ref() { t.record_count("repo_sync_rrdp_ok_total", 1); @@ -80,8 +89,13 @@ pub fn sync_publication_point( .with_rfc_refs(&[RfcRef("RFC 8182 §3.4.5")]) .with_context(notification_uri), ]; - let written = - rsync_sync_into_raw_objects(store, rsync_base_uri, rsync_fetcher, timing)?; + let written = rsync_sync_into_raw_objects( + store, + rsync_base_uri, + rsync_fetcher, + timing, + download_log, + )?; if let Some(t) = timing.as_ref() { t.record_count("repo_sync_rsync_fallback_ok_total", 1); t.record_count("repo_sync_rsync_objects_written_total", written as u64); @@ -95,8 +109,13 @@ pub fn sync_publication_point( } } _ => { - let written = - rsync_sync_into_raw_objects(store, rsync_base_uri, rsync_fetcher, timing)?; + let written = rsync_sync_into_raw_objects( + store, + rsync_base_uri, + rsync_fetcher, + timing, + download_log, + )?; if let Some(t) = timing.as_ref() { t.record_count("repo_sync_rsync_direct_total", 1); t.record_count("repo_sync_rsync_objects_written_total", written as u64); @@ -115,6 +134,7 @@ fn try_rrdp_sync( notification_uri: &str, http_fetcher: &dyn HttpFetcher, timing: Option<&TimingHandle>, + download_log: Option<&DownloadLogHandle>, ) -> Result { let notification_xml = { let _step = timing @@ -123,17 +143,27 @@ fn try_rrdp_sync( let _total = timing .as_ref() .map(|t| t.span_phase("rrdp_fetch_notification_total")); + let mut dl_span = download_log.map(|dl| { + dl.span_download(AuditDownloadKind::RrdpNotification, notification_uri) + }); match http_fetcher.fetch(notification_uri) { Ok(v) => { if let Some(t) = timing.as_ref() { t.record_count("rrdp_notification_fetch_ok_total", 1); } + if let Some(s) = dl_span.as_mut() { + s.set_bytes(v.len() as u64); + s.set_ok(); + } v } Err(e) => { if let Some(t) = timing.as_ref() { t.record_count("rrdp_notification_fetch_fail_total", 1); } + if let Some(s) = dl_span.as_mut() { + s.set_err(e.clone()); + } return Err(RrdpSyncError::Fetch(e)); } } @@ -145,12 +175,13 @@ fn try_rrdp_sync( ); } - sync_from_notification_with_timing( + sync_from_notification_with_timing_and_download_log( store, notification_uri, ¬ification_xml, http_fetcher, timing, + download_log, ) } @@ -183,6 +214,7 @@ fn try_rrdp_sync_with_retry( notification_uri: &str, http_fetcher: &dyn HttpFetcher, timing: Option<&TimingHandle>, + download_log: Option<&DownloadLogHandle>, ) -> Result { let backoffs = rrdp_retry_backoffs(); let max_attempts = backoffs.len().saturating_add(1).max(1); @@ -194,7 +226,7 @@ fn try_rrdp_sync_with_retry( t.record_count("rrdp_retry_attempt_total", 1); } - match try_rrdp_sync(store, notification_uri, http_fetcher, timing) { + match try_rrdp_sync(store, notification_uri, http_fetcher, timing, download_log) { Ok(written) => { if attempt > 1 { if let Some(t) = timing.as_ref() { @@ -244,12 +276,31 @@ fn rsync_sync_into_raw_objects( rsync_base_uri: &str, rsync_fetcher: &dyn RsyncFetcher, timing: Option<&TimingHandle>, + download_log: Option<&DownloadLogHandle>, ) -> Result { let _s = timing .as_ref() .map(|t| t.span_rrdp_repo_step(rsync_base_uri, "rsync_fetch_objects")); let _p = timing.as_ref().map(|t| t.span_phase("rsync_fetch_total")); - let objects = rsync_fetcher.fetch_objects(rsync_base_uri)?; + let mut dl_span = + download_log.map(|dl| dl.span_download(AuditDownloadKind::Rsync, rsync_base_uri)); + let objects = match rsync_fetcher.fetch_objects(rsync_base_uri) { + Ok(v) => { + let bytes_total: u64 = v.iter().map(|(_u, b)| b.len() as u64).sum::(); + if let Some(s) = dl_span.as_mut() { + s.set_objects(v.len() as u64, bytes_total); + s.set_bytes(bytes_total); + s.set_ok(); + } + v + } + Err(e) => { + if let Some(s) = dl_span.as_mut() { + s.set_err(e.to_string()); + } + return Err(e.into()); + } + }; if let Some(t) = timing.as_ref() { t.record_count("rsync_objects_fetched_total", objects.len() as u64); let bytes_total: u64 = objects.iter().map(|(_u, b)| b.len() as u64).sum::(); @@ -270,6 +321,7 @@ mod tests { use crate::analysis::timing::{TimingHandle, TimingMeta}; use crate::fetch::rsync::LocalDirRsyncFetcher; use crate::sync::rrdp::Fetcher as HttpFetcher; + use crate::sync::rrdp::RrdpState; use base64::Engine; use sha2::Digest; use std::collections::HashMap; @@ -364,6 +416,7 @@ mod tests { let http = DummyHttpFetcher; let rsync = LocalDirRsyncFetcher::new(&repo_dir); + let download_log = DownloadLogHandle::new(); let out = sync_publication_point( &store, &policy, @@ -372,12 +425,22 @@ mod tests { &http, &rsync, Some(&timing), + Some(&download_log), ) .expect("sync ok"); assert_eq!(out.source, RepoSyncSource::Rsync); assert_eq!(out.objects_written, 3); + let events = download_log.snapshot_events(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].kind, AuditDownloadKind::Rsync); + assert!(events[0].success); + assert_eq!(events[0].bytes, Some(9)); + let objects = events[0].objects.as_ref().expect("objects stat"); + assert_eq!(objects.objects_count, 3); + assert_eq!(objects.objects_bytes_total, 9); + assert_eq!( store.get_raw("rsync://example.test/repo/a.mft").unwrap(), Some(b"mft".to_vec()) @@ -481,6 +544,7 @@ mod tests { ..Policy::default() }; + let download_log = DownloadLogHandle::new(); let out = sync_publication_point( &store, &policy, @@ -489,6 +553,7 @@ mod tests { &http, &PanicRsyncFetcher, Some(&timing), + Some(&download_log), ) .expect("sync ok"); @@ -498,6 +563,30 @@ mod tests { Some(published_bytes.to_vec()) ); + let events = download_log.snapshot_events(); + assert_eq!(events.len(), 4, "expected 3x notification + 1x snapshot"); + assert_eq!( + events + .iter() + .filter(|e| e.kind == AuditDownloadKind::RrdpNotification) + .count(), + 3 + ); + assert_eq!( + events + .iter() + .filter(|e| e.kind == AuditDownloadKind::RrdpSnapshot) + .count(), + 1 + ); + assert_eq!( + events + .iter() + .filter(|e| e.kind == AuditDownloadKind::RrdpNotification && !e.success) + .count(), + 2 + ); + let v = timing_to_json(temp.path(), &timing); let counts = v.get("counts").expect("counts"); assert_eq!( @@ -542,11 +631,12 @@ mod tests { &[(published_uri, published_bytes)], ); // Intentionally wrong hash to trigger protocol error (SnapshotHashMismatch). + let wrong_hash = "00".repeat(32); let notif = notification_xml( "9df4b597-af9e-4dca-bdda-719cce2c4e28", 1, snapshot_uri, - "00", + &wrong_hash, ); let mut map = HashMap::new(); @@ -569,6 +659,7 @@ mod tests { ..Policy::default() }; + let download_log = DownloadLogHandle::new(); let out = sync_publication_point( &store, &policy, @@ -577,6 +668,7 @@ mod tests { &http, &EmptyRsyncFetcher, Some(&timing), + Some(&download_log), ) .expect("sync ok"); @@ -588,6 +680,15 @@ mod tests { "expected RRDP fallback warning" ); + let events = download_log.snapshot_events(); + assert_eq!(events.len(), 3, "expected notification + snapshot + rsync fallback"); + assert_eq!(events[0].kind, AuditDownloadKind::RrdpNotification); + assert!(events[0].success); + assert_eq!(events[1].kind, AuditDownloadKind::RrdpSnapshot); + assert!(events[1].success); + assert_eq!(events[2].kind, AuditDownloadKind::Rsync); + assert!(events[2].success); + let v = timing_to_json(temp.path(), &timing); let counts = v.get("counts").expect("counts"); assert_eq!( @@ -615,4 +716,112 @@ mod tests { Some(1) ); } + + #[test] + fn rrdp_delta_fetches_are_logged_even_if_snapshot_fallback_is_used() { + let temp = tempfile::tempdir().expect("tempdir"); + let store_dir = temp.path().join("db"); + let store = RocksStore::open(&store_dir).expect("open rocksdb"); + + let timing = TimingHandle::new(TimingMeta { + recorded_at_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(), + validation_time_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(), + tal_url: None, + db_path: Some(store_dir.to_string_lossy().into_owned()), + }); + + let notification_uri = "https://example.test/notification.xml"; + let snapshot_uri = "https://example.test/snapshot.xml"; + let delta_2_uri = "https://example.test/delta_2.xml"; + let delta_3_uri = "https://example.test/delta_3.xml"; + let published_uri = "rsync://example.test/repo/a.mft"; + let published_bytes = b"x"; + + let sid = "9df4b597-af9e-4dca-bdda-719cce2c4e28"; + + // Seed old RRDP state so sync_from_notification tries deltas (RFC 8182 §3.4.1). + let state = RrdpState { + session_id: sid.to_string(), + serial: 1, + }; + let state_bytes = state.encode().expect("encode state"); + store + .put_rrdp_state(notification_uri, &state_bytes) + .expect("seed state"); + + let delta_2 = format!( + r#""# + ) + .into_bytes(); + let delta_3 = format!( + r#""# + ) + .into_bytes(); + let delta_2_hash = hex::encode(sha2::Sha256::digest(&delta_2)); + let delta_3_hash = hex::encode(sha2::Sha256::digest(&delta_3)); + + let snapshot = snapshot_xml(sid, 3, &[(published_uri, published_bytes)]); + let snapshot_hash = hex::encode(sha2::Sha256::digest(&snapshot)); + let notif = format!( + r#""# + ) + .into_bytes(); + + let mut map = HashMap::new(); + map.insert(notification_uri.to_string(), notif); + map.insert(snapshot_uri.to_string(), snapshot); + map.insert(delta_2_uri.to_string(), delta_2); + map.insert(delta_3_uri.to_string(), delta_3); + let http = MapFetcher { map }; + + let policy = Policy { + sync_preference: SyncPreference::RrdpThenRsync, + ..Policy::default() + }; + + let download_log = DownloadLogHandle::new(); + let out = sync_publication_point( + &store, + &policy, + Some(notification_uri), + "rsync://example.test/repo/", + &http, + &PanicRsyncFetcher, + Some(&timing), + Some(&download_log), + ) + .expect("sync ok"); + + assert_eq!(out.source, RepoSyncSource::Rrdp); + assert_eq!(out.objects_written, 1); + assert_eq!( + store.get_raw(published_uri).unwrap(), + Some(published_bytes.to_vec()) + ); + + let events = download_log.snapshot_events(); + assert_eq!(events.len(), 4); + assert_eq!( + events + .iter() + .filter(|e| e.kind == AuditDownloadKind::RrdpNotification) + .count(), + 1 + ); + assert_eq!( + events + .iter() + .filter(|e| e.kind == AuditDownloadKind::RrdpDelta) + .count(), + 2 + ); + assert_eq!( + events + .iter() + .filter(|e| e.kind == AuditDownloadKind::RrdpSnapshot) + .count(), + 1 + ); + assert!(events.iter().all(|e| e.success)); + } } diff --git a/src/sync/rrdp.rs b/src/sync/rrdp.rs index 956ac62..79e77b6 100644 --- a/src/sync/rrdp.rs +++ b/src/sync/rrdp.rs @@ -1,4 +1,6 @@ use crate::analysis::timing::TimingHandle; +use crate::audit::AuditDownloadKind; +use crate::audit_downloads::DownloadLogHandle; use crate::storage::RocksStore; use crate::storage::RrdpDeltaOp; use base64::Engine; @@ -432,7 +434,14 @@ pub fn sync_from_notification_snapshot( notification_xml: &[u8], fetcher: &dyn Fetcher, ) -> RrdpSyncResult { - sync_from_notification_snapshot_inner(store, notification_uri, notification_xml, fetcher, None) + sync_from_notification_snapshot_inner( + store, + notification_uri, + notification_xml, + fetcher, + None, + None, + ) } pub fn sync_from_notification_snapshot_with_timing( @@ -448,6 +457,25 @@ pub fn sync_from_notification_snapshot_with_timing( notification_xml, fetcher, timing, + None, + ) +} + +pub fn sync_from_notification_snapshot_with_timing_and_download_log( + store: &RocksStore, + notification_uri: &str, + notification_xml: &[u8], + fetcher: &dyn Fetcher, + timing: Option<&TimingHandle>, + download_log: Option<&DownloadLogHandle>, +) -> RrdpSyncResult { + sync_from_notification_snapshot_inner( + store, + notification_uri, + notification_xml, + fetcher, + timing, + download_log, ) } @@ -457,6 +485,7 @@ fn sync_from_notification_snapshot_inner( notification_xml: &[u8], fetcher: &dyn Fetcher, timing: Option<&TimingHandle>, + download_log: Option<&DownloadLogHandle>, ) -> RrdpSyncResult { let _parse_step = timing .as_ref() @@ -470,16 +499,30 @@ fn sync_from_notification_snapshot_inner( .as_ref() .map(|t| t.span_rrdp_repo_step(notification_uri, "fetch_snapshot")); let _fetch_total = timing.as_ref().map(|t| t.span_phase("rrdp_fetch_snapshot_total")); - let snapshot_xml = fetcher.fetch(¬if.snapshot_uri).map_err(|e| { - if let Some(t) = timing.as_ref() { - t.record_count("rrdp_snapshot_fetch_fail_total", 1); + let mut dl_span = download_log + .map(|dl| dl.span_download(AuditDownloadKind::RrdpSnapshot, ¬if.snapshot_uri)); + let snapshot_xml = match fetcher.fetch(¬if.snapshot_uri) { + Ok(v) => { + if let Some(t) = timing.as_ref() { + t.record_count("rrdp_snapshot_fetch_ok_total", 1); + t.record_count("rrdp_snapshot_bytes_total", v.len() as u64); + } + if let Some(s) = dl_span.as_mut() { + s.set_bytes(v.len() as u64); + s.set_ok(); + } + v } - RrdpSyncError::Fetch(e) - })?; - if let Some(t) = timing.as_ref() { - t.record_count("rrdp_snapshot_fetch_ok_total", 1); - t.record_count("rrdp_snapshot_bytes_total", snapshot_xml.len() as u64); - } + Err(e) => { + if let Some(t) = timing.as_ref() { + t.record_count("rrdp_snapshot_fetch_fail_total", 1); + } + if let Some(s) = dl_span.as_mut() { + s.set_err(e.clone()); + } + return Err(RrdpSyncError::Fetch(e)); + } + }; drop(_fetch_step); drop(_fetch_total); @@ -535,7 +578,7 @@ pub fn sync_from_notification( notification_xml: &[u8], fetcher: &dyn Fetcher, ) -> RrdpSyncResult { - sync_from_notification_inner(store, notification_uri, notification_xml, fetcher, None) + sync_from_notification_inner(store, notification_uri, notification_xml, fetcher, None, None) } pub fn sync_from_notification_with_timing( @@ -545,7 +588,25 @@ pub fn sync_from_notification_with_timing( fetcher: &dyn Fetcher, timing: Option<&TimingHandle>, ) -> RrdpSyncResult { - sync_from_notification_inner(store, notification_uri, notification_xml, fetcher, timing) + sync_from_notification_inner(store, notification_uri, notification_xml, fetcher, timing, None) +} + +pub fn sync_from_notification_with_timing_and_download_log( + store: &RocksStore, + notification_uri: &str, + notification_xml: &[u8], + fetcher: &dyn Fetcher, + timing: Option<&TimingHandle>, + download_log: Option<&DownloadLogHandle>, +) -> RrdpSyncResult { + sync_from_notification_inner( + store, + notification_uri, + notification_xml, + fetcher, + timing, + download_log, + ) } fn sync_from_notification_inner( @@ -554,6 +615,7 @@ fn sync_from_notification_inner( notification_xml: &[u8], fetcher: &dyn Fetcher, timing: Option<&TimingHandle>, + download_log: Option<&DownloadLogHandle>, ) -> RrdpSyncResult { let _parse_step = timing .as_ref() @@ -632,18 +694,27 @@ fn sync_from_notification_inner( } }; + let mut dl_span = download_log + .map(|dl| dl.span_download(AuditDownloadKind::RrdpDelta, &dref.uri)); match fetcher.fetch(&dref.uri) { Ok(bytes) => { if let Some(t) = timing.as_ref() { t.record_count("rrdp_delta_fetch_ok_total", 1); t.record_count("rrdp_delta_bytes_total", bytes.len() as u64); } + if let Some(s) = dl_span.as_mut() { + s.set_bytes(bytes.len() as u64); + s.set_ok(); + } fetched.push((serial, dref.hash_sha256, bytes)) } - Err(_) => { + Err(e) => { if let Some(t) = timing.as_ref() { t.record_count("rrdp_delta_fetch_fail_total", 1); } + if let Some(s) = dl_span.as_mut() { + s.set_err(e.clone()); + } fetch_ok = false; break; } @@ -710,16 +781,30 @@ fn sync_from_notification_inner( .as_ref() .map(|t| t.span_rrdp_repo_step(notification_uri, "fetch_snapshot")); let _fetch_total = timing.as_ref().map(|t| t.span_phase("rrdp_fetch_snapshot_total")); - let snapshot_xml = fetcher.fetch(¬if.snapshot_uri).map_err(|e| { - if let Some(t) = timing.as_ref() { - t.record_count("rrdp_snapshot_fetch_fail_total", 1); + let mut dl_span = download_log + .map(|dl| dl.span_download(AuditDownloadKind::RrdpSnapshot, ¬if.snapshot_uri)); + let snapshot_xml = match fetcher.fetch(¬if.snapshot_uri) { + Ok(v) => { + if let Some(t) = timing.as_ref() { + t.record_count("rrdp_snapshot_fetch_ok_total", 1); + t.record_count("rrdp_snapshot_bytes_total", v.len() as u64); + } + if let Some(s) = dl_span.as_mut() { + s.set_bytes(v.len() as u64); + s.set_ok(); + } + v } - RrdpSyncError::Fetch(e) - })?; - if let Some(t) = timing.as_ref() { - t.record_count("rrdp_snapshot_fetch_ok_total", 1); - t.record_count("rrdp_snapshot_bytes_total", snapshot_xml.len() as u64); - } + Err(e) => { + if let Some(t) = timing.as_ref() { + t.record_count("rrdp_snapshot_fetch_fail_total", 1); + } + if let Some(s) = dl_span.as_mut() { + s.set_err(e.clone()); + } + return Err(RrdpSyncError::Fetch(e)); + } + }; drop(_fetch_step); drop(_fetch_total); diff --git a/src/validation/run.rs b/src/validation/run.rs index 24cf084..a36e385 100644 --- a/src/validation/run.rs +++ b/src/validation/run.rs @@ -52,6 +52,7 @@ pub fn run_publication_point_once( http_fetcher, rsync_fetcher, None, + None, )?; let publication_point = process_manifest_publication_point( diff --git a/src/validation/run_tree_from_tal.rs b/src/validation/run_tree_from_tal.rs index b1e71c3..0e30929 100644 --- a/src/validation/run_tree_from_tal.rs +++ b/src/validation/run_tree_from_tal.rs @@ -2,6 +2,7 @@ use url::Url; use crate::analysis::timing::TimingHandle; use crate::audit::PublicationPointAudit; +use crate::audit_downloads::DownloadLogHandle; use crate::data_model::ta::TrustAnchor; use crate::sync::rrdp::Fetcher; use crate::validation::from_tal::{ @@ -27,6 +28,8 @@ pub struct RunTreeFromTalAuditOutput { pub discovery: DiscoveredRootCaInstance, pub tree: TreeRunOutput, pub publication_points: Vec, + pub downloads: Vec, + pub download_stats: crate::audit::AuditDownloadStats, } #[derive(Debug, thiserror::Error)] @@ -75,6 +78,7 @@ pub fn run_tree_from_tal_url_serial( rsync_fetcher, validation_time, timing: None, + download_log: None, revalidate_only: config.revalidate_only, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), @@ -99,6 +103,7 @@ pub fn run_tree_from_tal_url_serial_audit( ) -> Result { let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?; + let download_log = DownloadLogHandle::new(); let runner = Rpkiv1PublicationPointRunner { store, policy, @@ -106,6 +111,7 @@ pub fn run_tree_from_tal_url_serial_audit( rsync_fetcher, validation_time, timing: None, + download_log: Some(download_log.clone()), revalidate_only: config.revalidate_only, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), @@ -119,10 +125,14 @@ pub fn run_tree_from_tal_url_serial_audit( publication_points, } = run_tree_serial_audit(root, &runner, config)?; + let downloads = download_log.snapshot_events(); + let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { discovery, tree, publication_points, + downloads, + download_stats, }) } @@ -140,6 +150,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing( let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?; drop(_tal); + let download_log = DownloadLogHandle::new(); let runner = Rpkiv1PublicationPointRunner { store, policy, @@ -147,6 +158,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing( rsync_fetcher, validation_time, timing: Some(timing.clone()), + download_log: Some(download_log.clone()), revalidate_only: config.revalidate_only, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), @@ -161,10 +173,14 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing( publication_points, } = run_tree_serial_audit(root, &runner, config)?; + let downloads = download_log.snapshot_events(); + let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { discovery, tree, publication_points, + downloads, + download_stats, }) } @@ -189,6 +205,7 @@ pub fn run_tree_from_tal_and_ta_der_serial( rsync_fetcher, validation_time, timing: None, + download_log: None, revalidate_only: config.revalidate_only, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), @@ -216,6 +233,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit( let discovery = discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, resolved_ta_uri)?; + let download_log = DownloadLogHandle::new(); let runner = Rpkiv1PublicationPointRunner { store, policy, @@ -223,6 +241,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit( rsync_fetcher, validation_time, timing: None, + download_log: Some(download_log.clone()), revalidate_only: config.revalidate_only, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), @@ -236,10 +255,14 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit( publication_points, } = run_tree_serial_audit(root, &runner, config)?; + let downloads = download_log.snapshot_events(); + let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { discovery, tree, publication_points, + downloads, + download_stats, }) } @@ -260,6 +283,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing( discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, resolved_ta_uri)?; drop(_tal); + let download_log = DownloadLogHandle::new(); let runner = Rpkiv1PublicationPointRunner { store, policy, @@ -267,6 +291,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing( rsync_fetcher, validation_time, timing: Some(timing.clone()), + download_log: Some(download_log.clone()), revalidate_only: config.revalidate_only, rrdp_dedup: true, rrdp_repo_cache: Mutex::new(HashMap::new()), @@ -281,9 +306,13 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing( publication_points, } = run_tree_serial_audit(root, &runner, config)?; + let downloads = download_log.snapshot_events(); + let download_stats = DownloadLogHandle::stats_from_events(&downloads); Ok(RunTreeFromTalAuditOutput { discovery, tree, publication_points, + downloads, + download_stats, }) } diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index 8811716..9ccc94a 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -3,6 +3,7 @@ use crate::audit::{ AuditObjectKind, AuditObjectResult, AuditWarning, ObjectAuditEntry, PublicationPointAudit, sha256_hex, sha256_hex_from_32, }; +use crate::audit_downloads::DownloadLogHandle; use crate::fetch::rsync::RsyncFetcher; use crate::policy::Policy; use crate::report::{RfcRef, Warning}; @@ -33,6 +34,7 @@ pub struct Rpkiv1PublicationPointRunner<'a> { pub rsync_fetcher: &'a dyn RsyncFetcher, pub validation_time: time::OffsetDateTime, pub timing: Option, + pub download_log: Option, pub revalidate_only: bool, /// In-run RRDP dedup: when RRDP is enabled, only sync each `rrdp_notification_uri` once per run. /// @@ -157,6 +159,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { self.http_fetcher, self.rsync_fetcher, self.timing.as_ref(), + self.download_log.as_ref(), ) { Ok(res) => { // Populate rsync dedup cache when we actually used rsync. @@ -1390,6 +1393,7 @@ authorityKeyIdentifier = keyid:always rsync_fetcher: &LocalDirRsyncFetcher::new(&fixture_dir), validation_time, timing: None, + download_log: None, revalidate_only: false, rrdp_dedup: false, rrdp_repo_cache: Mutex::new(HashMap::new()), @@ -1502,6 +1506,7 @@ authorityKeyIdentifier = keyid:always rsync_fetcher: &rsync, validation_time, timing: None, + download_log: None, revalidate_only: false, rrdp_dedup: false, rrdp_repo_cache: Mutex::new(HashMap::new()), @@ -1576,6 +1581,7 @@ authorityKeyIdentifier = keyid:always rsync_fetcher: &LocalDirRsyncFetcher::new(&fixture_dir), validation_time, timing: None, + download_log: None, revalidate_only: false, rrdp_dedup: false, rrdp_repo_cache: Mutex::new(HashMap::new()), @@ -1599,6 +1605,7 @@ authorityKeyIdentifier = keyid:always rsync_fetcher: &FailingRsyncFetcher, validation_time, timing: None, + download_log: None, revalidate_only: false, rrdp_dedup: false, rrdp_repo_cache: Mutex::new(HashMap::new()), diff --git a/tests/test_apnic_rrdp_delta_live_20260226.rs b/tests/test_apnic_rrdp_delta_live_20260226.rs index a316e6c..e3ccbcc 100644 --- a/tests/test_apnic_rrdp_delta_live_20260226.rs +++ b/tests/test_apnic_rrdp_delta_live_20260226.rs @@ -114,6 +114,7 @@ fn apnic_live_bootstrap_snapshot_and_fetch_cache_pp_pack_to_persistent_db() { &http, &rsync, None, + None, ) .expect("repo sync"); diff --git a/tests/test_apnic_stats_live_stage2.rs b/tests/test_apnic_stats_live_stage2.rs index eabf12d..9cb2241 100644 --- a/tests/test_apnic_stats_live_stage2.rs +++ b/tests/test_apnic_stats_live_stage2.rs @@ -167,6 +167,7 @@ fn apnic_tree_full_stats_serial() { rsync_fetcher: &rsync, validation_time, timing: None, + download_log: None, revalidate_only: false, rrdp_dedup: true, rrdp_repo_cache: std::sync::Mutex::new(std::collections::HashMap::new()), diff --git a/tests/test_cli_run_offline_m18.rs b/tests/test_cli_run_offline_m18.rs index c60a333..922ff17 100644 --- a/tests/test_cli_run_offline_m18.rs +++ b/tests/test_cli_run_offline_m18.rs @@ -37,5 +37,5 @@ fn cli_run_offline_mode_executes_and_writes_json() { let bytes = std::fs::read(&report_path).expect("read report json"); let v: serde_json::Value = serde_json::from_slice(&bytes).expect("parse report json"); - assert_eq!(v["format_version"], 1); + assert_eq!(v["format_version"], 2); } diff --git a/tests/test_cli_smoke_m18.rs b/tests/test_cli_smoke_m18.rs index c1f6839..7c06d1c 100644 --- a/tests/test_cli_smoke_m18.rs +++ b/tests/test_cli_smoke_m18.rs @@ -44,10 +44,12 @@ fn cli_offline_smoke_writes_report_json() { let bytes = std::fs::read(&report_path).expect("read report json"); let v: serde_json::Value = serde_json::from_slice(&bytes).expect("parse report json"); - assert_eq!(v["format_version"], 1); + assert_eq!(v["format_version"], 2); assert!(v.get("policy").is_some()); assert!(v.get("tree").is_some()); assert!(v.get("publication_points").is_some()); assert!(v.get("vrps").is_some()); assert!(v.get("aspas").is_some()); + assert!(v.get("downloads").is_some()); + assert!(v.get("download_stats").is_some()); } diff --git a/tests/test_objects_process_pack_for_issuer.rs b/tests/test_objects_process_pack_for_issuer.rs index b3e249e..43e9dc5 100644 --- a/tests/test_objects_process_pack_for_issuer.rs +++ b/tests/test_objects_process_pack_for_issuer.rs @@ -92,6 +92,7 @@ fn build_fetch_cache_pp_from_local_rsync_fixture( &NoopHttpFetcher, &LocalDirRsyncFetcher::new(dir), None, + None, ) .expect("sync into raw_objects"); diff --git a/tests/test_repo_sync_m6.rs b/tests/test_repo_sync_m6.rs index 2116dae..5d6f97a 100644 --- a/tests/test_repo_sync_m6.rs +++ b/tests/test_repo_sync_m6.rs @@ -84,6 +84,7 @@ fn repo_sync_uses_rrdp_when_available() { &http_fetcher, &rsync_fetcher, None, + None, ) .expect("sync"); @@ -137,6 +138,7 @@ fn repo_sync_skips_snapshot_when_state_unchanged() { &http_fetcher, &rsync_fetcher, None, + None, ) .expect("sync 1"); assert_eq!(out1.source, RepoSyncSource::Rrdp); @@ -150,6 +152,7 @@ fn repo_sync_skips_snapshot_when_state_unchanged() { &http_fetcher, &rsync_fetcher, None, + None, ) .expect("sync 2"); assert_eq!(out2.source, RepoSyncSource::Rrdp); @@ -203,6 +206,7 @@ fn repo_sync_falls_back_to_rsync_on_rrdp_failure() { &http_fetcher, &rsync_fetcher, None, + None, ) .expect("fallback sync"); @@ -252,6 +256,7 @@ fn repo_sync_rsync_populates_raw_objects() { &http_fetcher, &rsync_fetcher, None, + None, ) .expect("rsync-only sync");