rpki/src/sync/repo.rs

2072 lines
78 KiB
Rust

use crate::analysis::timing::TimingHandle;
use crate::audit::AuditDownloadKind;
use crate::audit_downloads::DownloadLogHandle;
use crate::fetch::rsync::{RsyncFetchError, RsyncFetcher};
use crate::policy::{Policy, SyncPreference};
use crate::replay::archive::{ReplayArchiveIndex, ReplayTransport};
use crate::replay::delta_archive::{ReplayDeltaArchiveIndex, ReplayDeltaRrdpKind};
use crate::report::{RfcRef, Warning};
use crate::storage::{RawByHashEntry, RocksStore};
use crate::sync::rrdp::sync_from_notification_with_timing_and_download_log;
use crate::sync::rrdp::{Fetcher as HttpFetcher, RrdpSyncError, load_rrdp_local_state};
use crate::sync::store_projection::{
build_repository_view_present_entry,
build_repository_view_withdrawn_entry,
compute_sha256_hex, infer_object_type_from_uri,
};
use std::collections::{BTreeMap, HashSet};
use std::thread;
use std::time::Duration;
#[cfg(test)]
use crate::storage::RrdpSourceSyncState;
#[cfg(test)]
use crate::sync::rrdp::persist_rrdp_local_state;
const RRDP_RETRY_BACKOFFS_PROD: [Duration; 3] = [
Duration::from_millis(200),
Duration::from_millis(500),
Duration::from_secs(1),
];
const RRDP_RETRY_BACKOFFS_TEST: [Duration; 2] =
[Duration::from_millis(0), Duration::from_millis(0)];
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RepoSyncSource {
Rrdp,
Rsync,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoSyncResult {
pub source: RepoSyncSource,
pub objects_written: usize,
pub warnings: Vec<Warning>,
}
#[derive(Debug, thiserror::Error)]
pub enum RepoSyncError {
#[error("RRDP sync failed: {0}")]
Rrdp(#[from] RrdpSyncError),
#[error("rsync fallback failed: {0}")]
Rsync(#[from] RsyncFetchError),
#[error("replay sync error: {0}")]
Replay(String),
#[error("storage error: {0}")]
Storage(String),
}
/// Sync a publication point into the current repository view.
///
/// v1 behavior:
/// - If `rrdp_notification_uri` is present and `policy.sync_preference` is `rrdp_then_rsync`,
/// try RRDP snapshot sync first (RFC 8182 §3.4.1-§3.4.3).
/// - On RRDP failure, fall back to rsync (RFC 8182 §3.4.5).
/// - If `sync_preference` is `rsync_only` or there is no RRDP URI, use rsync.
pub fn sync_publication_point(
store: &RocksStore,
policy: &Policy,
rrdp_notification_uri: Option<&str>,
rsync_base_uri: &str,
http_fetcher: &dyn HttpFetcher,
rsync_fetcher: &dyn RsyncFetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> Result<RepoSyncResult, RepoSyncError> {
match (policy.sync_preference, rrdp_notification_uri) {
(SyncPreference::RrdpThenRsync, Some(notification_uri)) => {
match try_rrdp_sync_with_retry(
store,
notification_uri,
http_fetcher,
timing,
download_log,
) {
Ok(written) => {
if let Some(t) = timing.as_ref() {
t.record_count("repo_sync_rrdp_ok_total", 1);
t.record_count("repo_sync_rrdp_objects_written_total", written as u64);
}
crate::progress_log::emit(
"repo_sync_rrdp_ok",
serde_json::json!({
"notify_uri": notification_uri,
"objects_written": written,
}),
);
Ok(RepoSyncResult {
source: RepoSyncSource::Rrdp,
objects_written: written,
warnings: Vec::new(),
})
}
Err(err) => {
if let Some(t) = timing.as_ref() {
t.record_count("repo_sync_rrdp_failed_total", 1);
}
crate::progress_log::emit(
"rrdp_fallback_rsync",
serde_json::json!({
"notify_uri": notification_uri,
"rsync_base_uri": rsync_base_uri,
"rrdp_error": err.to_string(),
}),
);
let warnings = vec![
Warning::new(format!("RRDP failed; falling back to rsync: {err}"))
.with_rfc_refs(&[RfcRef("RFC 8182 §3.4.5")])
.with_context(notification_uri),
];
let written = rsync_sync_into_current_store(
store,
rsync_base_uri,
rsync_fetcher,
timing,
download_log,
)?;
if let Some(t) = timing.as_ref() {
t.record_count("repo_sync_rsync_fallback_ok_total", 1);
t.record_count("repo_sync_rsync_objects_written_total", written as u64);
}
Ok(RepoSyncResult {
source: RepoSyncSource::Rsync,
objects_written: written,
warnings,
})
}
}
}
_ => {
let written = rsync_sync_into_current_store(
store,
rsync_base_uri,
rsync_fetcher,
timing,
download_log,
)?;
crate::progress_log::emit(
"repo_sync_rsync_direct",
serde_json::json!({
"rsync_base_uri": rsync_base_uri,
"objects_written": written,
}),
);
if let Some(t) = timing.as_ref() {
t.record_count("repo_sync_rsync_direct_total", 1);
t.record_count("repo_sync_rsync_objects_written_total", written as u64);
}
Ok(RepoSyncResult {
source: RepoSyncSource::Rsync,
objects_written: written,
warnings: Vec::new(),
})
}
}
}
pub fn sync_publication_point_replay(
store: &RocksStore,
replay_index: &ReplayArchiveIndex,
rrdp_notification_uri: Option<&str>,
rsync_base_uri: &str,
http_fetcher: &dyn HttpFetcher,
rsync_fetcher: &dyn RsyncFetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> Result<RepoSyncResult, RepoSyncError> {
match resolve_replay_transport(replay_index, rrdp_notification_uri, rsync_base_uri)? {
ReplayResolvedTransport::Rrdp(notification_uri) => {
let written = try_rrdp_sync_with_retry(
store,
notification_uri,
http_fetcher,
timing,
download_log,
)?;
if let Some(t) = timing.as_ref() {
t.record_count("repo_sync_rrdp_ok_total", 1);
t.record_count("repo_sync_rrdp_objects_written_total", written as u64);
}
Ok(RepoSyncResult {
source: RepoSyncSource::Rrdp,
objects_written: written,
warnings: Vec::new(),
})
}
ReplayResolvedTransport::Rsync => {
let written = rsync_sync_into_current_store(
store,
rsync_base_uri,
rsync_fetcher,
timing,
download_log,
)?;
if let Some(t) = timing.as_ref() {
t.record_count("repo_sync_rsync_direct_total", 1);
t.record_count("repo_sync_rsync_objects_written_total", written as u64);
}
Ok(RepoSyncResult {
source: RepoSyncSource::Rsync,
objects_written: written,
warnings: Vec::new(),
})
}
}
}
pub fn sync_publication_point_replay_delta(
store: &RocksStore,
delta_index: &ReplayDeltaArchiveIndex,
rrdp_notification_uri: Option<&str>,
rsync_base_uri: &str,
http_fetcher: &dyn HttpFetcher,
rsync_fetcher: &dyn RsyncFetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> Result<RepoSyncResult, RepoSyncError> {
match resolve_replay_delta_transport(store, delta_index, rrdp_notification_uri, rsync_base_uri)?
{
ReplayDeltaResolvedTransport::Rrdp(notification_uri) => {
let written = try_rrdp_sync_with_retry(
store,
notification_uri,
http_fetcher,
timing,
download_log,
)?;
if let Some(t) = timing.as_ref() {
t.record_count("repo_sync_rrdp_ok_total", 1);
t.record_count("repo_sync_rrdp_objects_written_total", written as u64);
}
Ok(RepoSyncResult {
source: RepoSyncSource::Rrdp,
objects_written: written,
warnings: Vec::new(),
})
}
ReplayDeltaResolvedTransport::Rsync => {
let written = rsync_sync_into_current_store(
store,
rsync_base_uri,
rsync_fetcher,
timing,
download_log,
)?;
if let Some(t) = timing.as_ref() {
t.record_count("repo_sync_rsync_direct_total", 1);
t.record_count("repo_sync_rsync_objects_written_total", written as u64);
}
Ok(RepoSyncResult {
source: RepoSyncSource::Rsync,
objects_written: written,
warnings: Vec::new(),
})
}
ReplayDeltaResolvedTransport::Noop(source) => Ok(RepoSyncResult {
source,
objects_written: 0,
warnings: Vec::new(),
}),
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ReplayResolvedTransport<'a> {
Rrdp(&'a str),
Rsync,
}
enum ReplayDeltaResolvedTransport<'a> {
Rrdp(&'a str),
Rsync,
Noop(RepoSyncSource),
}
fn resolve_replay_transport<'a>(
replay_index: &'a ReplayArchiveIndex,
rrdp_notification_uri: Option<&'a str>,
rsync_base_uri: &str,
) -> Result<ReplayResolvedTransport<'a>, RepoSyncError> {
if let Some(notification_uri) = rrdp_notification_uri {
let lock = replay_index.rrdp_lock(notification_uri).ok_or_else(|| {
RepoSyncError::Replay(format!(
"replay RRDP lock missing for notification URI: {notification_uri}"
))
})?;
return Ok(match lock.transport {
ReplayTransport::Rrdp => ReplayResolvedTransport::Rrdp(notification_uri),
ReplayTransport::Rsync => ReplayResolvedTransport::Rsync,
});
}
replay_index
.resolve_rsync_module_for_base_uri(rsync_base_uri)
.map_err(|e| RepoSyncError::Replay(e.to_string()))?;
Ok(ReplayResolvedTransport::Rsync)
}
fn resolve_replay_delta_transport<'a>(
store: &RocksStore,
delta_index: &'a ReplayDeltaArchiveIndex,
rrdp_notification_uri: Option<&'a str>,
rsync_base_uri: &str,
) -> Result<ReplayDeltaResolvedTransport<'a>, RepoSyncError> {
if let Some(notification_uri) = rrdp_notification_uri {
let repo = delta_index.rrdp_repo(notification_uri).ok_or_else(|| {
RepoSyncError::Replay(format!(
"delta replay RRDP entry missing for notification URI: {notification_uri}"
))
})?;
validate_delta_replay_base_state_for_repo(store, notification_uri, &repo.transition.base)?;
return match repo.transition.kind {
ReplayDeltaRrdpKind::Delta => Ok(ReplayDeltaResolvedTransport::Rrdp(notification_uri)),
ReplayDeltaRrdpKind::Unchanged => Ok(ReplayDeltaResolvedTransport::Noop(
match repo.transition.target.transport {
ReplayTransport::Rrdp => RepoSyncSource::Rrdp,
ReplayTransport::Rsync => RepoSyncSource::Rsync,
},
)),
ReplayDeltaRrdpKind::FallbackRsync => Ok(ReplayDeltaResolvedTransport::Rsync),
ReplayDeltaRrdpKind::SessionReset => Err(RepoSyncError::Replay(format!(
"delta replay kind session-reset requires fresh full replay for {notification_uri}"
))),
ReplayDeltaRrdpKind::Gap => Err(RepoSyncError::Replay(format!(
"delta replay kind gap requires fresh full replay for {notification_uri}"
))),
};
}
delta_index
.resolve_rsync_module_for_base_uri(rsync_base_uri)
.map_err(|e| RepoSyncError::Replay(e.to_string()))?;
Ok(ReplayDeltaResolvedTransport::Rsync)
}
fn validate_delta_replay_base_state_for_repo(
store: &RocksStore,
notification_uri: &str,
base: &crate::replay::delta_archive::ReplayDeltaRrdpState,
) -> Result<(), RepoSyncError> {
match base.transport {
ReplayTransport::Rrdp => {
let state = load_rrdp_local_state(store, notification_uri)
.map_err(RepoSyncError::Storage)?
.ok_or_else(|| {
RepoSyncError::Replay(format!(
"delta replay base state missing for {notification_uri}: expected RRDP session={} serial={}",
base.session.as_deref().unwrap_or("<none>"),
base.serial
.map(|v| v.to_string())
.unwrap_or_else(|| "<none>".to_string())
))
})?;
let expected_session = base.session.as_deref().unwrap_or("");
let expected_serial = base.serial.unwrap_or_default();
if state.session_id != expected_session || state.serial != expected_serial {
return Err(RepoSyncError::Replay(format!(
"delta replay base state mismatch for {notification_uri}: expected session={} serial={}, actual session={} serial={}",
expected_session, expected_serial, state.session_id, state.serial
)));
}
}
ReplayTransport::Rsync => {}
}
Ok(())
}
fn try_rrdp_sync(
store: &RocksStore,
notification_uri: &str,
http_fetcher: &dyn HttpFetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> Result<usize, RrdpSyncError> {
let notification_xml = {
let _step = timing
.as_ref()
.map(|t| t.span_rrdp_repo_step(notification_uri, "fetch_notification"));
let _total = timing
.as_ref()
.map(|t| t.span_phase("rrdp_fetch_notification_total"));
let mut dl_span = download_log
.map(|dl| dl.span_download(AuditDownloadKind::RrdpNotification, notification_uri));
match http_fetcher.fetch(notification_uri) {
Ok(v) => {
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_notification_fetch_ok_total", 1);
}
if let Some(s) = dl_span.as_mut() {
s.set_bytes(v.len() as u64);
s.set_ok();
}
v
}
Err(e) => {
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_notification_fetch_fail_total", 1);
}
if let Some(s) = dl_span.as_mut() {
s.set_err(e.clone());
}
return Err(RrdpSyncError::Fetch(e));
}
}
};
if let Some(t) = timing.as_ref() {
t.record_count(
"rrdp_notification_bytes_total",
notification_xml.len() as u64,
);
}
sync_from_notification_with_timing_and_download_log(
store,
notification_uri,
&notification_xml,
http_fetcher,
timing,
download_log,
)
}
fn is_retryable_http_fetch_error(msg: &str) -> bool {
if msg.contains("http request failed:") || msg.contains("http read body failed:") {
return true;
}
let Some(rest) = msg.strip_prefix("http status ") else {
return false;
};
let code = rest
.trim()
.split_whitespace()
.next()
.and_then(|s| s.parse::<u16>().ok())
.unwrap_or(0);
code == 408 || code == 429 || (500..600).contains(&code)
}
fn rrdp_retry_backoffs() -> &'static [Duration] {
if cfg!(test) {
&RRDP_RETRY_BACKOFFS_TEST
} else {
&RRDP_RETRY_BACKOFFS_PROD
}
}
fn try_rrdp_sync_with_retry(
store: &RocksStore,
notification_uri: &str,
http_fetcher: &dyn HttpFetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> Result<usize, RrdpSyncError> {
let backoffs = rrdp_retry_backoffs();
let max_attempts = backoffs.len().saturating_add(1).max(1);
let mut attempt: usize = 0;
loop {
attempt += 1;
crate::progress_log::emit(
"rrdp_sync_attempt",
serde_json::json!({
"notify_uri": notification_uri,
"attempt": attempt,
}),
);
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_retry_attempt_total", 1);
}
match try_rrdp_sync(store, notification_uri, http_fetcher, timing, download_log) {
Ok(written) => {
crate::progress_log::emit(
"rrdp_sync_success",
serde_json::json!({
"notify_uri": notification_uri,
"attempt": attempt,
"objects_written": written,
}),
);
if attempt > 1 {
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_retry_success_total", 1);
}
}
return Ok(written);
}
Err(err) => {
let retryable = match &err {
RrdpSyncError::Fetch(msg) => is_retryable_http_fetch_error(msg),
_ => false,
};
if retryable && attempt < max_attempts {
crate::progress_log::emit(
"rrdp_sync_retry",
serde_json::json!({
"notify_uri": notification_uri,
"attempt": attempt,
"error": err.to_string(),
}),
);
if let Some(t) = timing.as_ref() {
t.record_count("rrdp_retry_sleep_total", 1);
}
let backoff = backoffs
.get(attempt.saturating_sub(1))
.copied()
.unwrap_or_else(|| Duration::from_secs(0));
if !backoff.is_zero() {
thread::sleep(backoff);
}
continue;
}
crate::progress_log::emit(
"rrdp_sync_failed",
serde_json::json!({
"notify_uri": notification_uri,
"attempt": attempt,
"retryable": retryable,
"error": err.to_string(),
}),
);
if let Some(t) = timing.as_ref() {
match &err {
RrdpSyncError::Fetch(_) => t.record_count("rrdp_failed_fetch_total", 1),
RrdpSyncError::Rrdp(_) => t.record_count("rrdp_failed_protocol_total", 1),
RrdpSyncError::Storage(_) => t.record_count("rrdp_failed_storage_total", 1),
}
if retryable && attempt >= max_attempts && attempt > 1 {
t.record_count("rrdp_retry_exhausted_total", 1);
}
}
return Err(err);
}
}
}
}
fn rsync_sync_into_current_store(
store: &RocksStore,
rsync_base_uri: &str,
rsync_fetcher: &dyn RsyncFetcher,
timing: Option<&TimingHandle>,
download_log: Option<&DownloadLogHandle>,
) -> Result<usize, RepoSyncError> {
let started = std::time::Instant::now();
let sync_scope_uri = rsync_fetcher.dedup_key(rsync_base_uri);
crate::progress_log::emit(
"rsync_sync_start",
serde_json::json!({
"rsync_base_uri": rsync_base_uri,
"sync_scope_uri": &sync_scope_uri,
}),
);
let _s = timing
.as_ref()
.map(|t| t.span_rrdp_repo_step(rsync_base_uri, "rsync_fetch_objects"));
let _p = timing.as_ref().map(|t| t.span_phase("rsync_fetch_total"));
let mut dl_span =
download_log.map(|dl| dl.span_download(AuditDownloadKind::Rsync, rsync_base_uri));
let mut new_set: HashSet<String> = HashSet::new();
let mut uri_to_hash: BTreeMap<String, String> = BTreeMap::new();
let mut pending_raw: BTreeMap<String, RawByHashEntry> = BTreeMap::new();
let (object_count, bytes_total) = match rsync_fetcher.visit_objects(rsync_base_uri, &mut |uri, bytes| {
let sha256_hex = compute_sha256_hex(&bytes);
new_set.insert(uri.clone());
uri_to_hash.insert(uri.clone(), sha256_hex.clone());
let entry = pending_raw
.entry(sha256_hex.clone())
.or_insert_with(|| RawByHashEntry::from_bytes(sha256_hex.clone(), bytes.clone()));
if entry.bytes != bytes {
return Err(format!(
"raw_by_hash collision for {uri}: same sha256 maps to different bytes"
));
}
if !entry.origin_uris.iter().any(|existing| existing == &uri) {
entry.origin_uris.push(uri.clone());
}
if entry.object_type.is_none() {
entry.object_type = infer_object_type_from_uri(&uri);
}
Ok(())
}) {
Ok(v) => {
if let Some(s) = dl_span.as_mut() {
s.set_objects(v.0 as u64, v.1);
s.set_bytes(v.1);
s.set_ok();
}
v
}
Err(e) => {
if let Some(s) = dl_span.as_mut() {
s.set_err(e.to_string());
}
return Err(e.into());
}
};
crate::progress_log::emit(
"rsync_sync_fetch_done",
serde_json::json!({
"rsync_base_uri": rsync_base_uri,
"sync_scope_uri": &sync_scope_uri,
"object_count": object_count,
"bytes_total": bytes_total,
"duration_ms": started.elapsed().as_millis() as u64,
}),
);
if let Some(t) = timing.as_ref() {
t.record_count("rsync_objects_fetched_total", object_count as u64);
t.record_count("rsync_objects_bytes_total", bytes_total);
}
drop(_p);
let existing_view = store
.list_repository_view_entries_with_prefix(&sync_scope_uri)
.map_err(|e| RepoSyncError::Storage(e.to_string()))?;
let _proj = timing
.as_ref()
.map(|t| t.span_phase("rsync_write_current_store_total"));
let hashes: Vec<String> = pending_raw.keys().cloned().collect();
let existing_entries = store
.get_raw_by_hash_entries_batch(&hashes)
.map_err(|e| RepoSyncError::Storage(e.to_string()))?;
let mut entries_to_write = Vec::new();
for (hash, existing_opt) in hashes.into_iter().zip(existing_entries.into_iter()) {
let mut pending_entry = pending_raw
.remove(&hash)
.ok_or_else(|| RepoSyncError::Storage(format!("missing pending raw entry for {hash}")))?;
match existing_opt {
Some(mut existing) => {
if existing.bytes != pending_entry.bytes {
return Err(RepoSyncError::Storage(format!(
"raw_by_hash collision for hash {hash}: same sha256 maps to different bytes"
)));
}
let mut changed = false;
for uri in pending_entry.origin_uris.drain(..) {
if !existing.origin_uris.iter().any(|existing_uri| existing_uri == &uri) {
existing.origin_uris.push(uri);
changed = true;
}
}
if existing.object_type.is_none() && pending_entry.object_type.is_some() {
existing.object_type = pending_entry.object_type;
changed = true;
}
if changed {
entries_to_write.push(existing);
}
}
None => entries_to_write.push(pending_entry),
}
}
let mut repository_view_entries = Vec::new();
for entry in existing_view {
if !new_set.contains(&entry.rsync_uri) {
repository_view_entries.push(build_repository_view_withdrawn_entry(
&sync_scope_uri,
&entry.rsync_uri,
entry.current_hash,
));
}
}
for uri in &new_set {
let current_hash = uri_to_hash
.get(uri)
.cloned()
.ok_or_else(|| RepoSyncError::Storage(format!("missing raw_by_hash mapping for {uri}")))?;
repository_view_entries.push(build_repository_view_present_entry(
&sync_scope_uri,
uri,
&current_hash,
));
}
store
.put_raw_by_hash_entries_batch_unchecked(&entries_to_write)
.map_err(|e| RepoSyncError::Storage(e.to_string()))?;
store
.put_projection_batch(&repository_view_entries, &[], &[])
.map_err(|e| RepoSyncError::Storage(e.to_string()))?;
let total_duration_ms = started.elapsed().as_millis() as u64;
crate::progress_log::emit(
"rsync_sync_done",
serde_json::json!({
"rsync_base_uri": rsync_base_uri,
"sync_scope_uri": &sync_scope_uri,
"object_count": object_count,
"bytes_total": bytes_total,
"duration_ms": total_duration_ms,
}),
);
if (total_duration_ms as f64) / 1000.0 >= crate::progress_log::slow_threshold_secs() {
crate::progress_log::emit(
"rsync_sync_slow",
serde_json::json!({
"rsync_base_uri": rsync_base_uri,
"sync_scope_uri": &sync_scope_uri,
"object_count": object_count,
"bytes_total": bytes_total,
"duration_ms": total_duration_ms,
}),
);
}
Ok(object_count)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::analysis::timing::{TimingHandle, TimingMeta};
use crate::fetch::rsync::LocalDirRsyncFetcher;
use crate::replay::archive::{ReplayArchiveIndex, sha256_hex};
use crate::replay::delta_archive::ReplayDeltaArchiveIndex;
use crate::replay::delta_fetch_http::PayloadDeltaReplayHttpFetcher;
use crate::replay::delta_fetch_rsync::PayloadDeltaReplayRsyncFetcher;
use crate::replay::fetch_http::PayloadReplayHttpFetcher;
use crate::replay::fetch_rsync::PayloadReplayRsyncFetcher;
use crate::storage::RepositoryViewState;
use crate::sync::store_projection::build_repository_view_present_entry;
use crate::sync::rrdp::Fetcher as HttpFetcher;
use crate::sync::rrdp::RrdpState;
use base64::Engine;
use sha2::Digest;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
struct DummyHttpFetcher;
impl HttpFetcher for DummyHttpFetcher {
fn fetch(&self, _url: &str) -> Result<Vec<u8>, String> {
panic!("http fetcher must not be used in rsync-only mode")
}
}
struct PanicRsyncFetcher;
impl RsyncFetcher for PanicRsyncFetcher {
fn fetch_objects(
&self,
_rsync_base_uri: &str,
) -> Result<Vec<(String, Vec<u8>)>, RsyncFetchError> {
panic!("rsync must not be used in this test")
}
}
struct MapFetcher {
map: HashMap<String, Vec<u8>>,
}
impl HttpFetcher for MapFetcher {
fn fetch(&self, uri: &str) -> Result<Vec<u8>, String> {
self.map
.get(uri)
.cloned()
.ok_or_else(|| format!("not found: {uri}"))
}
}
fn assert_current_object(store: &RocksStore, uri: &str, expected: &[u8]) {
assert_eq!(
store
.load_current_object_bytes_by_uri(uri)
.expect("load current object"),
Some(expected.to_vec())
);
}
#[test]
fn rsync_sync_uses_fetcher_dedup_scope_for_repository_view_projection() {
struct ScopeFetcher;
impl RsyncFetcher for ScopeFetcher {
fn fetch_objects(
&self,
_rsync_base_uri: &str,
) -> Result<Vec<(String, Vec<u8>)>, RsyncFetchError> {
Ok(vec![(
"rsync://example.net/repo/child/a.mft".to_string(),
b"manifest".to_vec(),
)])
}
fn dedup_key(&self, _rsync_base_uri: &str) -> String {
"rsync://example.net/repo/".to_string()
}
}
let td = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(td.path()).expect("open rocksdb");
let seeded = build_repository_view_present_entry(
"rsync://example.net/repo/",
"rsync://example.net/repo/sibling/old.roa",
&compute_sha256_hex(b"old"),
);
store
.put_projection_batch(&[seeded], &[], &[])
.expect("seed repository view");
let fetcher = ScopeFetcher;
let written = rsync_sync_into_current_store(
&store,
"rsync://example.net/repo/child/",
&fetcher,
None,
None,
)
.expect("sync ok");
assert_eq!(written, 1);
let entries = store
.list_repository_view_entries_with_prefix("rsync://example.net/repo/")
.expect("list repository view");
let sibling = entries
.iter()
.find(|entry| entry.rsync_uri == "rsync://example.net/repo/sibling/old.roa")
.expect("sibling entry exists");
assert_eq!(sibling.state, RepositoryViewState::Withdrawn);
let child = entries
.iter()
.find(|entry| entry.rsync_uri == "rsync://example.net/repo/child/a.mft")
.expect("child entry exists");
assert_eq!(child.state, RepositoryViewState::Present);
}
fn notification_xml(
session_id: &str,
serial: u64,
snapshot_uri: &str,
snapshot_hash: &str,
) -> Vec<u8> {
format!(
r#"<notification xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{session_id}" serial="{serial}"><snapshot uri="{snapshot_uri}" hash="{snapshot_hash}"/></notification>"#
)
.into_bytes()
}
fn snapshot_xml(session_id: &str, serial: u64, published: &[(&str, &[u8])]) -> Vec<u8> {
let mut out = format!(
r#"<snapshot xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{session_id}" serial="{serial}">"#
);
for (uri, bytes) in published {
let b64 = base64::engine::general_purpose::STANDARD.encode(bytes);
out.push_str(&format!(r#"<publish uri="{uri}">{b64}</publish>"#));
}
out.push_str("</snapshot>");
out.into_bytes()
}
fn build_replay_archive_fixture() -> (
tempfile::TempDir,
std::path::PathBuf,
std::path::PathBuf,
String,
String,
String,
String,
) {
let temp = tempfile::tempdir().expect("tempdir");
let archive_root = temp.path().join("payload-archive");
let capture = "repo-replay";
let capture_root = archive_root.join("v1").join("captures").join(capture);
std::fs::create_dir_all(&capture_root).expect("mkdir capture root");
std::fs::write(
capture_root.join("capture.json"),
format!(
r#"{{"version":1,"captureId":"{capture}","createdAt":"2026-03-13T00:00:00Z","notes":""}}"#
),
)
.expect("write capture json");
let notify_uri = "https://rrdp.example.test/notification.xml".to_string();
let snapshot_uri = "https://rrdp.example.test/snapshot.xml".to_string();
let session = "00000000-0000-0000-0000-000000000001".to_string();
let serial = 7u64;
let published_uri = "rsync://example.test/repo/a.mft".to_string();
let published_bytes = b"mft";
let snapshot = snapshot_xml(&session, serial, &[(&published_uri, published_bytes)]);
let snapshot_hash = hex::encode(sha2::Sha256::digest(&snapshot));
let notification = notification_xml(&session, serial, &snapshot_uri, &snapshot_hash);
let repo_hash = sha256_hex(notify_uri.as_bytes());
let session_dir = capture_root
.join("rrdp/repos")
.join(&repo_hash)
.join(&session);
std::fs::create_dir_all(&session_dir).expect("mkdir session dir");
std::fs::write(
session_dir.parent().unwrap().join("meta.json"),
format!(
r#"{{"version":1,"rpkiNotify":"{notify_uri}","createdAt":"2026-03-13T00:00:00Z","lastSeenAt":"2026-03-13T00:00:01Z"}}"#
),
)
.expect("write repo meta");
std::fs::write(session_dir.join("notification-7.xml"), notification)
.expect("write notification");
std::fs::write(
session_dir.join(format!("snapshot-7-{snapshot_hash}.xml")),
&snapshot,
)
.expect("write snapshot");
let rsync_base_uri = "rsync://rsync.example.test/repo/".to_string();
let rsync_locked_notify = "https://rrdp-fallback.example.test/notification.xml".to_string();
let mod_hash = sha256_hex(rsync_base_uri.as_bytes());
let module_bucket_dir = capture_root.join("rsync/modules").join(&mod_hash);
let module_root = module_bucket_dir
.join("tree")
.join("rsync.example.test")
.join("repo");
std::fs::create_dir_all(module_root.join("sub")).expect("mkdir module tree");
std::fs::write(
module_bucket_dir.join("meta.json"),
format!(
r#"{{"version":1,"module":"{rsync_base_uri}","createdAt":"2026-03-13T00:00:00Z","lastSeenAt":"2026-03-13T00:00:01Z"}}"#
),
)
.expect("write rsync meta");
std::fs::write(module_root.join("sub").join("fallback.cer"), b"cer")
.expect("write rsync object");
let locks_path = temp.path().join("locks.json");
std::fs::write(
&locks_path,
format!(
r#"{{
"version":1,
"capture":"{capture}",
"rrdp":{{
"{notify_uri}":{{"transport":"rrdp","session":"{session}","serial":{serial}}},
"{rsync_locked_notify}":{{"transport":"rsync","session":null,"serial":null}}
}},
"rsync":{{
"{rsync_base_uri}":{{"transport":"rsync"}}
}}
}}"#
),
)
.expect("write locks");
(
temp,
archive_root,
locks_path,
notify_uri,
rsync_locked_notify,
rsync_base_uri,
published_uri,
)
}
fn build_delta_replay_fixture() -> (
tempfile::TempDir,
std::path::PathBuf,
std::path::PathBuf,
std::path::PathBuf,
std::path::PathBuf,
String,
String,
String,
) {
let temp = tempfile::tempdir().expect("tempdir");
let base_archive = temp.path().join("payload-archive");
let base_capture_root = base_archive.join("v1/captures/base-cap");
std::fs::create_dir_all(&base_capture_root).expect("mkdir base capture");
std::fs::write(
base_capture_root.join("capture.json"),
r#"{"version":1,"captureId":"base-cap","createdAt":"2026-03-16T00:00:00Z","notes":""}"#,
)
.expect("write base capture meta");
let notify_uri = "https://rrdp.example.test/notification.xml".to_string();
let snapshot_uri = "https://rrdp.example.test/snapshot.xml".to_string();
let session = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa".to_string();
let base_serial = 10u64;
let delta1_uri = "https://rrdp.example.test/d1.xml".to_string();
let delta2_uri = "https://rrdp.example.test/d2.xml".to_string();
let repo_hash = sha256_hex(notify_uri.as_bytes());
let base_session_dir = base_capture_root
.join("rrdp/repos")
.join(&repo_hash)
.join(&session);
std::fs::create_dir_all(&base_session_dir).expect("mkdir base session dir");
std::fs::write(
base_session_dir.parent().unwrap().join("meta.json"),
format!(r#"{{"version":1,"rpkiNotify":"{notify_uri}","createdAt":"2026-03-16T00:00:00Z","lastSeenAt":"2026-03-16T00:00:01Z"}}"#),
)
.expect("write base rrdp meta");
let base_snapshot = snapshot_xml(
&session,
base_serial,
&[("rsync://example.test/repo/a.mft", b"base")],
);
let base_snapshot_hash = hex::encode(sha2::Sha256::digest(&base_snapshot));
let base_notification =
notification_xml(&session, base_serial, &snapshot_uri, &base_snapshot_hash);
std::fs::write(
base_session_dir.join("notification-10.xml"),
base_notification,
)
.expect("write base notif");
std::fs::write(
base_session_dir.join(format!("snapshot-10-{base_snapshot_hash}.xml")),
base_snapshot,
)
.expect("write base snapshot");
let module_uri = "rsync://rsync.example.test/repo/".to_string();
let module_hash = sha256_hex(module_uri.as_bytes());
let base_module_bucket = base_capture_root.join("rsync/modules").join(&module_hash);
let base_module_tree = base_module_bucket.join("tree/rsync.example.test/repo");
std::fs::create_dir_all(base_module_tree.join("sub")).expect("mkdir base rsync tree");
std::fs::write(
base_module_bucket.join("meta.json"),
format!(r#"{{"version":1,"module":"{module_uri}","createdAt":"2026-03-16T00:00:00Z","lastSeenAt":"2026-03-16T00:00:01Z"}}"#),
)
.expect("write base module meta");
std::fs::write(base_module_tree.join("a.mft"), b"base").expect("write base a.mft");
std::fs::write(base_module_tree.join("sub").join("x.cer"), b"base-cer")
.expect("write base x.cer");
let base_locks = temp.path().join("base-locks.json");
let fallback_notify = "https://rrdp-fallback.example.test/notification.xml".to_string();
let base_locks_body = format!(
r#"{{"version":1,"capture":"base-cap","rrdp":{{"{notify_uri}":{{"transport":"rrdp","session":"{session}","serial":10}},"{fallback_notify}":{{"transport":"rsync","session":null,"serial":null}}}},"rsync":{{"{module_uri}":{{"transport":"rsync"}}}}}}"#
);
std::fs::write(&base_locks, &base_locks_body).expect("write base locks");
let base_locks_sha = sha256_hex(base_locks_body.as_bytes());
let delta_archive = temp.path().join("payload-delta-archive");
let delta_capture_root = delta_archive.join("v1/captures/delta-cap");
std::fs::create_dir_all(&delta_capture_root).expect("mkdir delta capture");
std::fs::write(
delta_capture_root.join("capture.json"),
r#"{"version":1,"captureId":"delta-cap","createdAt":"2026-03-16T00:00:00Z","notes":""}"#,
)
.expect("write delta capture meta");
std::fs::write(
delta_capture_root.join("base.json"),
format!(r#"{{"version":1,"baseCapture":"base-cap","baseLocksSha256":"{base_locks_sha}","createdAt":"2026-03-16T00:00:00Z"}}"#),
)
.expect("write delta base meta");
let delta_session_dir = delta_capture_root
.join("rrdp/repos")
.join(&repo_hash)
.join(&session);
let delta_deltas_dir = delta_session_dir.join("deltas");
std::fs::create_dir_all(&delta_deltas_dir).expect("mkdir delta deltas");
std::fs::write(
delta_session_dir.parent().unwrap().join("meta.json"),
format!(r#"{{"version":1,"rpkiNotify":"{notify_uri}","createdAt":"2026-03-16T00:00:00Z","lastSeenAt":"2026-03-16T00:00:01Z"}}"#),
)
.expect("write delta meta");
std::fs::write(
delta_session_dir.parent().unwrap().join("transition.json"),
format!(r#"{{"kind":"delta","base":{{"transport":"rrdp","session":"{session}","serial":10}},"target":{{"transport":"rrdp","session":"{session}","serial":12}},"delta_count":2,"deltas":[11,12]}}"#),
)
.expect("write delta transition");
let delta1 = format!(
r#"<delta xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{session}" serial="11"><publish uri="rsync://example.test/repo/a.mft">{}</publish></delta>"#,
base64::engine::general_purpose::STANDARD.encode(b"delta-a")
);
let delta2 = format!(
r#"<delta xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{session}" serial="12"><publish uri="rsync://example.test/repo/sub/b.roa">{}</publish></delta>"#,
base64::engine::general_purpose::STANDARD.encode(b"delta-b")
);
let delta1_hash = hex::encode(sha2::Sha256::digest(delta1.as_bytes()));
let delta2_hash = hex::encode(sha2::Sha256::digest(delta2.as_bytes()));
let target_notification = format!(
r#"<?xml version="1.0" encoding="UTF-8"?>
<notification xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{session}" serial="12">
<snapshot uri="{snapshot_uri}" hash="00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff" />
<delta serial="11" uri="{delta1_uri}" hash="{delta1_hash}" />
<delta serial="12" uri="{delta2_uri}" hash="{delta2_hash}" />
</notification>"#
);
std::fs::write(
delta_session_dir.join("notification-target-12.xml"),
target_notification,
)
.expect("write target notification");
std::fs::write(delta_deltas_dir.join("delta-11-aaaa.xml"), delta1).expect("write delta11");
std::fs::write(delta_deltas_dir.join("delta-12-bbbb.xml"), delta2).expect("write delta12");
let delta_module_bucket = delta_capture_root.join("rsync/modules").join(&module_hash);
let delta_module_tree = delta_module_bucket.join("tree/rsync.example.test/repo");
std::fs::create_dir_all(delta_module_tree.join("sub")).expect("mkdir delta rsync tree");
std::fs::write(
delta_module_bucket.join("meta.json"),
format!(r#"{{"version":1,"module":"{module_uri}","createdAt":"2026-03-16T00:00:00Z","lastSeenAt":"2026-03-16T00:00:01Z"}}"#),
)
.expect("write delta rsync meta");
std::fs::write(
delta_module_bucket.join("files.json"),
format!(r#"{{"version":1,"module":"{module_uri}","fileCount":1,"files":["{module_uri}sub/x.cer"]}}"#),
)
.expect("write delta files");
std::fs::write(delta_module_tree.join("sub").join("x.cer"), b"overlay-cer")
.expect("write overlay file");
let fallback_hash = sha256_hex(fallback_notify.as_bytes());
let fallback_repo_dir = delta_capture_root.join("rrdp/repos").join(&fallback_hash);
std::fs::create_dir_all(&fallback_repo_dir).expect("mkdir fallback repo dir");
std::fs::write(
fallback_repo_dir.join("meta.json"),
format!(r#"{{"version":1,"rpkiNotify":"{fallback_notify}","createdAt":"2026-03-16T00:00:00Z","lastSeenAt":"2026-03-16T00:00:01Z"}}"#),
)
.expect("write fallback meta");
std::fs::write(
fallback_repo_dir.join("transition.json"),
r#"{"kind":"fallback-rsync","base":{"transport":"rsync","session":null,"serial":null},"target":{"transport":"rsync","session":null,"serial":null},"delta_count":0,"deltas":[]}"#,
)
.expect("write fallback transition");
let delta_locks = temp.path().join("locks-delta.json");
std::fs::write(
&delta_locks,
format!(r#"{{"version":1,"capture":"delta-cap","baseCapture":"base-cap","baseLocksSha256":"{base_locks_sha}","rrdp":{{"{notify_uri}":{{"kind":"delta","base":{{"transport":"rrdp","session":"{session}","serial":10}},"target":{{"transport":"rrdp","session":"{session}","serial":12}},"delta_count":2,"deltas":[11,12]}},"{fallback_notify}":{{"kind":"fallback-rsync","base":{{"transport":"rsync","session":null,"serial":null}},"target":{{"transport":"rsync","session":null,"serial":null}},"delta_count":0,"deltas":[]}}}},"rsync":{{"{module_uri}":{{"file_count":1,"overlay_only":false}}}}}}"#),
)
.expect("write delta locks");
(
temp,
base_archive,
base_locks,
delta_archive,
delta_locks,
notify_uri,
fallback_notify,
module_uri,
)
}
fn timing_to_json(temp_dir: &std::path::Path, timing: &TimingHandle) -> serde_json::Value {
let timing_path = temp_dir.join("timing_retry.json");
timing.write_json(&timing_path, 50).expect("write json");
serde_json::from_slice(&std::fs::read(&timing_path).expect("read json"))
.expect("parse json")
}
#[test]
fn rsync_sync_writes_current_store_and_records_counts() {
let temp = tempfile::tempdir().expect("tempdir");
let repo_dir = temp.path().join("repo");
std::fs::create_dir_all(repo_dir.join("sub")).expect("mkdir");
std::fs::write(repo_dir.join("a.mft"), b"mft").expect("write");
std::fs::write(repo_dir.join("sub").join("b.roa"), b"roa").expect("write");
std::fs::write(repo_dir.join("sub").join("c.cer"), b"cer").expect("write");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let timing = TimingHandle::new(TimingMeta {
recorded_at_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(),
validation_time_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(),
tal_url: None,
db_path: Some(store_dir.to_string_lossy().into_owned()),
});
let policy = Policy {
sync_preference: SyncPreference::RsyncOnly,
..Policy::default()
};
let http = DummyHttpFetcher;
let rsync = LocalDirRsyncFetcher::new(&repo_dir);
let download_log = DownloadLogHandle::new();
let out = sync_publication_point(
&store,
&policy,
None,
"rsync://example.test/repo/",
&http,
&rsync,
Some(&timing),
Some(&download_log),
)
.expect("sync ok");
assert_eq!(out.source, RepoSyncSource::Rsync);
assert_eq!(out.objects_written, 3);
let events = download_log.snapshot_events();
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, AuditDownloadKind::Rsync);
assert!(events[0].success);
assert_eq!(events[0].bytes, Some(9));
let objects = events[0].objects.as_ref().expect("objects stat");
assert_eq!(objects.objects_count, 3);
assert_eq!(objects.objects_bytes_total, 9);
assert_current_object(&store, "rsync://example.test/repo/a.mft", b"mft");
assert_current_object(&store, "rsync://example.test/repo/sub/b.roa", b"roa");
assert_current_object(&store, "rsync://example.test/repo/sub/c.cer", b"cer");
let view = store
.get_repository_view_entry("rsync://example.test/repo/a.mft")
.expect("get repository view")
.expect("repository view entry present");
assert_eq!(
view.current_hash.as_deref(),
Some(hex::encode(sha2::Sha256::digest(b"mft")).as_str())
);
assert_eq!(
view.repository_source.as_deref(),
Some("rsync://example.test/repo/")
);
let raw = store
.get_raw_by_hash_entry(hex::encode(sha2::Sha256::digest(b"roa")).as_str())
.expect("get raw_by_hash")
.expect("raw_by_hash entry present");
assert!(
raw.origin_uris
.iter()
.any(|uri| uri == "rsync://example.test/repo/sub/b.roa")
);
let timing_path = temp.path().join("timing.json");
timing.write_json(&timing_path, 5).expect("write json");
let v: serde_json::Value =
serde_json::from_slice(&std::fs::read(&timing_path).expect("read json"))
.expect("parse json");
let counts = v.get("counts").expect("counts");
assert_eq!(
counts
.get("rsync_objects_fetched_total")
.and_then(|v| v.as_u64()),
Some(3)
);
assert_eq!(
counts
.get("rsync_objects_bytes_total")
.and_then(|v| v.as_u64()),
Some(3 * 3)
);
}
#[test]
fn rsync_second_sync_marks_missing_repository_view_entries_withdrawn() {
let temp = tempfile::tempdir().expect("tempdir");
let repo_dir = temp.path().join("repo");
std::fs::create_dir_all(repo_dir.join("sub")).expect("mkdir");
std::fs::write(repo_dir.join("a.mft"), b"mft-v1").expect("write a");
std::fs::write(repo_dir.join("sub").join("b.roa"), b"roa-v1").expect("write b");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let policy = Policy {
sync_preference: SyncPreference::RsyncOnly,
..Policy::default()
};
let http = DummyHttpFetcher;
let rsync = LocalDirRsyncFetcher::new(&repo_dir);
sync_publication_point(
&store,
&policy,
None,
"rsync://example.test/repo/",
&http,
&rsync,
None,
None,
)
.expect("first sync ok");
std::fs::remove_file(repo_dir.join("sub").join("b.roa")).expect("remove b");
std::fs::write(repo_dir.join("c.crl"), b"crl-v2").expect("write c");
sync_publication_point(
&store,
&policy,
None,
"rsync://example.test/repo/",
&http,
&rsync,
None,
None,
)
.expect("second sync ok");
let withdrawn = store
.get_repository_view_entry("rsync://example.test/repo/sub/b.roa")
.expect("get withdrawn repo view")
.expect("withdrawn entry exists");
assert_eq!(
withdrawn.state,
crate::storage::RepositoryViewState::Withdrawn
);
assert_eq!(
withdrawn.repository_source.as_deref(),
Some("rsync://example.test/repo/")
);
let added = store
.get_repository_view_entry("rsync://example.test/repo/c.crl")
.expect("get added repo view")
.expect("added entry exists");
assert_eq!(added.state, crate::storage::RepositoryViewState::Present);
}
#[test]
fn rrdp_retry_succeeds_without_rsync_when_notification_fetch_is_transient() {
let temp = tempfile::tempdir().expect("tempdir");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let timing = TimingHandle::new(TimingMeta {
recorded_at_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(),
validation_time_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(),
tal_url: None,
db_path: Some(store_dir.to_string_lossy().into_owned()),
});
let notification_uri = "https://example.test/notification.xml";
let snapshot_uri = "https://example.test/snapshot.xml";
let published_uri = "rsync://example.test/repo/a.mft";
let published_bytes = b"x";
let snapshot = snapshot_xml(
"9df4b597-af9e-4dca-bdda-719cce2c4e28",
1,
&[(published_uri, published_bytes)],
);
let snapshot_hash = hex::encode(sha2::Sha256::digest(&snapshot));
let notif = notification_xml(
"9df4b597-af9e-4dca-bdda-719cce2c4e28",
1,
snapshot_uri,
&snapshot_hash,
);
let mut map = HashMap::new();
map.insert(notification_uri.to_string(), notif);
map.insert(snapshot_uri.to_string(), snapshot);
struct RetryThenMap {
inner: MapFetcher,
notification_uri: String,
fail_times: usize,
notification_calls: AtomicUsize,
}
impl HttpFetcher for RetryThenMap {
fn fetch(&self, uri: &str) -> Result<Vec<u8>, String> {
if uri == self.notification_uri {
let n = self.notification_calls.fetch_add(1, Ordering::SeqCst);
if n < self.fail_times {
return Err("http request failed: simulated transient".to_string());
}
}
self.inner.fetch(uri)
}
}
let http = RetryThenMap {
inner: MapFetcher { map },
notification_uri: notification_uri.to_string(),
fail_times: 2,
notification_calls: AtomicUsize::new(0),
};
let policy = Policy {
sync_preference: SyncPreference::RrdpThenRsync,
..Policy::default()
};
let download_log = DownloadLogHandle::new();
let out = sync_publication_point(
&store,
&policy,
Some(notification_uri),
"rsync://example.test/repo/",
&http,
&PanicRsyncFetcher,
Some(&timing),
Some(&download_log),
)
.expect("sync ok");
assert_eq!(out.source, RepoSyncSource::Rrdp);
assert_current_object(&store, published_uri, published_bytes);
let events = download_log.snapshot_events();
assert_eq!(events.len(), 4, "expected 3x notification + 1x snapshot");
assert_eq!(
events
.iter()
.filter(|e| e.kind == AuditDownloadKind::RrdpNotification)
.count(),
3
);
assert_eq!(
events
.iter()
.filter(|e| e.kind == AuditDownloadKind::RrdpSnapshot)
.count(),
1
);
assert_eq!(
events
.iter()
.filter(|e| e.kind == AuditDownloadKind::RrdpNotification && !e.success)
.count(),
2
);
let v = timing_to_json(temp.path(), &timing);
let counts = v.get("counts").expect("counts");
assert_eq!(
counts
.get("rrdp_retry_attempt_total")
.and_then(|v| v.as_u64()),
Some(3)
);
assert_eq!(
counts
.get("rrdp_retry_success_total")
.and_then(|v| v.as_u64()),
Some(1)
);
assert_eq!(
counts
.get("repo_sync_rrdp_ok_total")
.and_then(|v| v.as_u64()),
Some(1)
);
}
#[test]
fn rrdp_protocol_error_does_not_retry_and_falls_back_to_rsync() {
let temp = tempfile::tempdir().expect("tempdir");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let timing = TimingHandle::new(TimingMeta {
recorded_at_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(),
validation_time_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(),
tal_url: None,
db_path: Some(store_dir.to_string_lossy().into_owned()),
});
let notification_uri = "https://example.test/notification.xml";
let snapshot_uri = "https://example.test/snapshot.xml";
let published_uri = "rsync://example.test/repo/a.mft";
let published_bytes = b"x";
let snapshot = snapshot_xml(
"9df4b597-af9e-4dca-bdda-719cce2c4e28",
1,
&[(published_uri, published_bytes)],
);
// Intentionally wrong hash to trigger protocol error (SnapshotHashMismatch).
let wrong_hash = "00".repeat(32);
let notif = notification_xml(
"9df4b597-af9e-4dca-bdda-719cce2c4e28",
1,
snapshot_uri,
&wrong_hash,
);
let mut map = HashMap::new();
map.insert(notification_uri.to_string(), notif);
map.insert(snapshot_uri.to_string(), snapshot);
let http = MapFetcher { map };
struct EmptyRsyncFetcher;
impl RsyncFetcher for EmptyRsyncFetcher {
fn fetch_objects(
&self,
_rsync_base_uri: &str,
) -> Result<Vec<(String, Vec<u8>)>, RsyncFetchError> {
Ok(Vec::new())
}
}
let policy = Policy {
sync_preference: SyncPreference::RrdpThenRsync,
..Policy::default()
};
let download_log = DownloadLogHandle::new();
let out = sync_publication_point(
&store,
&policy,
Some(notification_uri),
"rsync://example.test/repo/",
&http,
&EmptyRsyncFetcher,
Some(&timing),
Some(&download_log),
)
.expect("sync ok");
assert_eq!(out.source, RepoSyncSource::Rsync);
assert!(
out.warnings
.iter()
.any(|w| w.message.contains("RRDP failed; falling back to rsync")),
"expected RRDP fallback warning"
);
let events = download_log.snapshot_events();
assert_eq!(
events.len(),
3,
"expected notification + snapshot + rsync fallback"
);
assert_eq!(events[0].kind, AuditDownloadKind::RrdpNotification);
assert!(events[0].success);
assert_eq!(events[1].kind, AuditDownloadKind::RrdpSnapshot);
assert!(events[1].success);
assert_eq!(events[2].kind, AuditDownloadKind::Rsync);
assert!(events[2].success);
let v = timing_to_json(temp.path(), &timing);
let counts = v.get("counts").expect("counts");
assert_eq!(
counts
.get("rrdp_retry_attempt_total")
.and_then(|v| v.as_u64()),
Some(1)
);
assert_eq!(
counts
.get("rrdp_failed_protocol_total")
.and_then(|v| v.as_u64()),
Some(1)
);
assert_eq!(
counts
.get("repo_sync_rrdp_failed_total")
.and_then(|v| v.as_u64()),
Some(1)
);
assert_eq!(
counts
.get("repo_sync_rsync_fallback_ok_total")
.and_then(|v| v.as_u64()),
Some(1)
);
}
#[test]
fn rrdp_delta_fetches_are_logged_even_if_snapshot_fallback_is_used() {
let temp = tempfile::tempdir().expect("tempdir");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let timing = TimingHandle::new(TimingMeta {
recorded_at_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(),
validation_time_utc_rfc3339: "2026-02-28T00:00:00Z".to_string(),
tal_url: None,
db_path: Some(store_dir.to_string_lossy().into_owned()),
});
let notification_uri = "https://example.test/notification.xml";
let snapshot_uri = "https://example.test/snapshot.xml";
let delta_2_uri = "https://example.test/delta_2.xml";
let delta_3_uri = "https://example.test/delta_3.xml";
let published_uri = "rsync://example.test/repo/a.mft";
let published_bytes = b"x";
let sid = "9df4b597-af9e-4dca-bdda-719cce2c4e28";
// Seed old RRDP state so sync_from_notification tries deltas (RFC 8182 §3.4.1).
let state = RrdpState {
session_id: sid.to_string(),
serial: 1,
};
persist_rrdp_local_state(
&store,
notification_uri,
&state,
RrdpSourceSyncState::DeltaReady,
Some(snapshot_uri),
None,
)
.expect("seed state");
let delta_2 = format!(
r#"<delta xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{sid}" serial="2"></delta>"#
)
.into_bytes();
let delta_3 = format!(
r#"<delta xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{sid}" serial="3"></delta>"#
)
.into_bytes();
let delta_2_hash = hex::encode(sha2::Sha256::digest(&delta_2));
let delta_3_hash = hex::encode(sha2::Sha256::digest(&delta_3));
let snapshot = snapshot_xml(sid, 3, &[(published_uri, published_bytes)]);
let snapshot_hash = hex::encode(sha2::Sha256::digest(&snapshot));
let notif = format!(
r#"<notification xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{sid}" serial="3"><snapshot uri="{snapshot_uri}" hash="{snapshot_hash}"/><delta serial="2" uri="{delta_2_uri}" hash="{delta_2_hash}"/><delta serial="3" uri="{delta_3_uri}" hash="{delta_3_hash}"/></notification>"#
)
.into_bytes();
let mut map = HashMap::new();
map.insert(notification_uri.to_string(), notif);
map.insert(snapshot_uri.to_string(), snapshot);
map.insert(delta_2_uri.to_string(), delta_2);
map.insert(delta_3_uri.to_string(), delta_3);
let http = MapFetcher { map };
let policy = Policy {
sync_preference: SyncPreference::RrdpThenRsync,
..Policy::default()
};
let download_log = DownloadLogHandle::new();
let out = sync_publication_point(
&store,
&policy,
Some(notification_uri),
"rsync://example.test/repo/",
&http,
&PanicRsyncFetcher,
Some(&timing),
Some(&download_log),
)
.expect("sync ok");
assert_eq!(out.source, RepoSyncSource::Rrdp);
assert_eq!(out.objects_written, 1);
assert_current_object(&store, published_uri, published_bytes);
let events = download_log.snapshot_events();
assert_eq!(events.len(), 4);
assert_eq!(
events
.iter()
.filter(|e| e.kind == AuditDownloadKind::RrdpNotification)
.count(),
1
);
assert_eq!(
events
.iter()
.filter(|e| e.kind == AuditDownloadKind::RrdpDelta)
.count(),
2
);
assert_eq!(
events
.iter()
.filter(|e| e.kind == AuditDownloadKind::RrdpSnapshot)
.count(),
1
);
assert!(events.iter().all(|e| e.success));
}
#[test]
fn replay_sync_uses_rrdp_when_locked_to_rrdp() {
let temp = tempfile::tempdir().expect("tempdir");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let (
_archive_temp,
archive_root,
locks_path,
notify_uri,
_rsync_locked_notify,
_rsync_base_uri,
published_uri,
) = build_replay_archive_fixture();
let replay_index =
ReplayArchiveIndex::load(&archive_root, &locks_path).expect("load replay index");
let http = PayloadReplayHttpFetcher::from_paths(&archive_root, &locks_path)
.expect("build replay http fetcher");
let rsync = PayloadReplayRsyncFetcher::from_paths(&archive_root, &locks_path)
.expect("build replay rsync fetcher");
let out = sync_publication_point_replay(
&store,
&replay_index,
Some(&notify_uri),
"rsync://example.test/repo/",
&http,
&rsync,
None,
None,
)
.expect("replay sync ok");
assert_eq!(out.source, RepoSyncSource::Rrdp);
assert_eq!(out.objects_written, 1);
assert_current_object(&store, &published_uri, b"mft");
}
#[test]
fn replay_sync_uses_rsync_when_notification_is_locked_to_rsync() {
let temp = tempfile::tempdir().expect("tempdir");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let (
_archive_temp,
archive_root,
locks_path,
_notify_uri,
rsync_locked_notify,
rsync_base_uri,
_published_uri,
) = build_replay_archive_fixture();
let replay_index =
ReplayArchiveIndex::load(&archive_root, &locks_path).expect("load replay index");
let http = PayloadReplayHttpFetcher::from_paths(&archive_root, &locks_path)
.expect("build replay http fetcher");
let rsync = PayloadReplayRsyncFetcher::from_paths(&archive_root, &locks_path)
.expect("build replay rsync fetcher");
let out = sync_publication_point_replay(
&store,
&replay_index,
Some(&rsync_locked_notify),
&rsync_base_uri,
&http,
&rsync,
None,
None,
)
.expect("replay rsync sync ok");
assert_eq!(out.source, RepoSyncSource::Rsync);
assert_eq!(out.objects_written, 1);
assert_eq!(out.warnings.len(), 0);
assert_current_object(&store, "rsync://rsync.example.test/repo/sub/fallback.cer", b"cer");
}
#[test]
fn replay_sync_errors_when_lock_is_missing() {
let temp = tempfile::tempdir().expect("tempdir");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let (
_archive_temp,
archive_root,
locks_path,
_notify_uri,
_rsync_locked_notify,
_rsync_base_uri,
_published_uri,
) = build_replay_archive_fixture();
let replay_index =
ReplayArchiveIndex::load(&archive_root, &locks_path).expect("load replay index");
let http = PayloadReplayHttpFetcher::from_paths(&archive_root, &locks_path)
.expect("build replay http fetcher");
let rsync = PayloadReplayRsyncFetcher::from_paths(&archive_root, &locks_path)
.expect("build replay rsync fetcher");
let err = sync_publication_point_replay(
&store,
&replay_index,
Some("https://missing.example/notification.xml"),
"rsync://missing.example/repo/",
&http,
&rsync,
None,
None,
)
.unwrap_err();
assert!(matches!(err, RepoSyncError::Replay(_)), "{err}");
}
#[test]
fn delta_replay_sync_applies_rrdp_deltas_when_base_state_matches() {
let temp = tempfile::tempdir().expect("tempdir");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let (
_fixture,
base_archive,
base_locks,
delta_archive,
delta_locks,
notify_uri,
_fallback_notify,
module_uri,
) = build_delta_replay_fixture();
let base_index = Arc::new(
ReplayArchiveIndex::load(&base_archive, &base_locks).expect("load base index"),
);
let delta_index = Arc::new(
ReplayDeltaArchiveIndex::load(&delta_archive, &delta_locks).expect("load delta index"),
);
let http = PayloadDeltaReplayHttpFetcher::from_index(delta_index.clone())
.expect("build delta http fetcher");
let rsync = PayloadDeltaReplayRsyncFetcher::new(base_index, delta_index.clone());
let state = RrdpState {
session_id: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa".to_string(),
serial: 10,
};
persist_rrdp_local_state(
&store,
&notify_uri,
&state,
RrdpSourceSyncState::DeltaReady,
None,
None,
)
.expect("seed base state");
let out = sync_publication_point_replay_delta(
&store,
&delta_index,
Some(&notify_uri),
&module_uri,
&http,
&rsync,
None,
None,
)
.expect("delta sync ok");
assert_eq!(out.source, RepoSyncSource::Rrdp);
assert_eq!(out.objects_written, 2);
assert_current_object(&store, "rsync://example.test/repo/a.mft", b"delta-a");
assert_current_object(&store, "rsync://example.test/repo/sub/b.roa", b"delta-b");
let new_state = load_rrdp_local_state(&store, &notify_uri)
.expect("load current state")
.expect("rrdp state present");
assert_eq!(new_state.serial, 12);
}
#[test]
fn delta_replay_sync_rejects_base_state_mismatch() {
let temp = tempfile::tempdir().expect("tempdir");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let (
_fixture,
base_archive,
base_locks,
delta_archive,
delta_locks,
notify_uri,
_fallback_notify,
module_uri,
) = build_delta_replay_fixture();
let base_index = Arc::new(
ReplayArchiveIndex::load(&base_archive, &base_locks).expect("load base index"),
);
let delta_index = Arc::new(
ReplayDeltaArchiveIndex::load(&delta_archive, &delta_locks).expect("load delta index"),
);
let http = PayloadDeltaReplayHttpFetcher::from_index(delta_index.clone())
.expect("build delta http fetcher");
let rsync = PayloadDeltaReplayRsyncFetcher::new(base_index, delta_index.clone());
let err = sync_publication_point_replay_delta(
&store,
&delta_index,
Some(&notify_uri),
&module_uri,
&http,
&rsync,
None,
None,
)
.unwrap_err();
assert!(matches!(err, RepoSyncError::Replay(_)), "{err}");
}
#[test]
fn delta_replay_sync_noops_unchanged_rrdp_repo() {
let temp = tempfile::tempdir().expect("tempdir");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let (
_fixture,
base_archive,
base_locks,
delta_archive,
delta_locks,
notify_uri,
_fallback_notify,
module_uri,
) = build_delta_replay_fixture();
let state = RrdpState {
session_id: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa".to_string(),
serial: 10,
};
persist_rrdp_local_state(
&store,
&notify_uri,
&state,
RrdpSourceSyncState::DeltaReady,
None,
None,
)
.expect("seed base state");
let base_locks_body = std::fs::read_to_string(&base_locks).expect("read base locks");
let base_locks_sha = sha256_hex(base_locks_body.as_bytes());
std::fs::write(
&delta_locks,
format!(r#"{{"version":1,"capture":"delta-cap","baseCapture":"base-cap","baseLocksSha256":"{base_locks_sha}","rrdp":{{"{notify_uri}":{{"kind":"unchanged","base":{{"transport":"rrdp","session":"aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa","serial":10}},"target":{{"transport":"rrdp","session":"aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa","serial":10}},"delta_count":0,"deltas":[]}},"https://rrdp-fallback.example.test/notification.xml":{{"kind":"fallback-rsync","base":{{"transport":"rsync","session":null,"serial":null}},"target":{{"transport":"rsync","session":null,"serial":null}},"delta_count":0,"deltas":[]}}}},"rsync":{{"rsync://rsync.example.test/repo/":{{"file_count":1,"overlay_only":true}}}}}}"#),
)
.expect("rewrite delta locks");
let repo_hash = sha256_hex(notify_uri.as_bytes());
let repo_dir = delta_archive
.join("v1/captures/delta-cap/rrdp/repos")
.join(&repo_hash);
std::fs::write(
repo_dir.join("transition.json"),
r#"{"kind":"unchanged","base":{"transport":"rrdp","session":"aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa","serial":10},"target":{"transport":"rrdp","session":"aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa","serial":10},"delta_count":0,"deltas":[]}"#,
).expect("rewrite transition");
let delta_index =
ReplayDeltaArchiveIndex::load(&delta_archive, &delta_locks).expect("load delta index");
let http = PayloadDeltaReplayHttpFetcher::from_index(Arc::new(delta_index.clone()))
.expect("build delta http fetcher");
let base_index = Arc::new(
ReplayArchiveIndex::load(&base_archive, &base_locks).expect("load base index"),
);
let rsync = PayloadDeltaReplayRsyncFetcher::new(base_index, Arc::new(delta_index.clone()));
let out = sync_publication_point_replay_delta(
&store,
&delta_index,
Some(&notify_uri),
&module_uri,
&http,
&rsync,
None,
None,
)
.expect("unchanged delta sync ok");
assert_eq!(out.source, RepoSyncSource::Rrdp);
assert_eq!(out.objects_written, 0);
}
#[test]
fn delta_replay_sync_uses_rsync_overlay_for_fallback_rsync_kind() {
let temp = tempfile::tempdir().expect("tempdir");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let (
_fixture,
base_archive,
base_locks,
delta_archive,
delta_locks,
_notify_uri,
fallback_notify,
module_uri,
) = build_delta_replay_fixture();
let base_index = Arc::new(
ReplayArchiveIndex::load(&base_archive, &base_locks).expect("load base index"),
);
let delta_index = Arc::new(
ReplayDeltaArchiveIndex::load(&delta_archive, &delta_locks).expect("load delta index"),
);
let http = PayloadDeltaReplayHttpFetcher::from_index(delta_index.clone())
.expect("build delta http fetcher");
let rsync = PayloadDeltaReplayRsyncFetcher::new(base_index, delta_index.clone());
let out = sync_publication_point_replay_delta(
&store,
&delta_index,
Some(&fallback_notify),
&module_uri,
&http,
&rsync,
None,
None,
)
.expect("fallback-rsync delta sync ok");
assert_eq!(out.source, RepoSyncSource::Rsync);
assert_eq!(out.objects_written, 2);
assert_current_object(&store, "rsync://rsync.example.test/repo/a.mft", b"base");
assert_current_object(&store, "rsync://rsync.example.test/repo/sub/x.cer", b"overlay-cer");
}
#[test]
fn delta_replay_sync_rejects_session_reset_and_gap() {
for kind in ["session-reset", "gap"] {
let temp = tempfile::tempdir().expect("tempdir");
let store_dir = temp.path().join("db");
let store = RocksStore::open(&store_dir).expect("open rocksdb");
let (
_fixture,
base_archive,
base_locks,
delta_archive,
delta_locks,
notify_uri,
_fallback_notify,
module_uri,
) = build_delta_replay_fixture();
let base_index = Arc::new(
ReplayArchiveIndex::load(&base_archive, &base_locks).expect("load base index"),
);
let state = RrdpState {
session_id: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa".to_string(),
serial: 10,
};
persist_rrdp_local_state(
&store,
&notify_uri,
&state,
RrdpSourceSyncState::DeltaReady,
None,
None,
)
.expect("seed base state");
let locks_body = std::fs::read_to_string(&delta_locks).expect("read delta locks");
let rewritten =
locks_body.replace("\"kind\":\"delta\"", &format!("\"kind\":\"{}\"", kind));
std::fs::write(&delta_locks, rewritten).expect("rewrite locks kind");
let repo_hash = sha256_hex(notify_uri.as_bytes());
let repo_dir = delta_archive
.join("v1/captures/delta-cap/rrdp/repos")
.join(&repo_hash);
std::fs::write(
repo_dir.join("transition.json"),
format!(
r#"{{"kind":"{kind}","base":{{"transport":"rrdp","session":"aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa","serial":10}},"target":{{"transport":"rrdp","session":"aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa","serial":12}},"delta_count":2,"deltas":[11,12]}}"#,
),
)
.expect("rewrite transition kind");
let delta_index = Arc::new(
ReplayDeltaArchiveIndex::load(&delta_archive, &delta_locks)
.expect("load delta index"),
);
let http = PayloadDeltaReplayHttpFetcher::from_index(delta_index.clone())
.expect("build delta http fetcher");
let rsync =
PayloadDeltaReplayRsyncFetcher::new(base_index.clone(), delta_index.clone());
let err = sync_publication_point_replay_delta(
&store,
&delta_index,
Some(&notify_uri),
&module_uri,
&http,
&rsync,
None,
None,
)
.unwrap_err();
assert!(matches!(err, RepoSyncError::Replay(_)), "{err}");
}
}
}