rpki/tests/test_repo_sync_m6.rs
2026-02-09 19:35:54 +08:00

271 lines
7.7 KiB
Rust

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use rpki::fetch::rsync::{LocalDirRsyncFetcher, RsyncFetchError, RsyncFetcher};
use rpki::policy::{Policy, SyncPreference};
use rpki::storage::RocksStore;
use rpki::sync::repo::{RepoSyncSource, sync_publication_point};
use rpki::sync::rrdp::Fetcher;
struct MapFetcher {
by_uri: HashMap<String, Vec<u8>>,
}
impl Fetcher for MapFetcher {
fn fetch(&self, uri: &str) -> Result<Vec<u8>, String> {
self.by_uri
.get(uri)
.cloned()
.ok_or_else(|| format!("not found: {uri}"))
}
}
struct CountingRsyncFetcher<F> {
inner: F,
calls: Arc<Mutex<usize>>,
}
impl<F> CountingRsyncFetcher<F> {
fn new(inner: F) -> (Self, Arc<Mutex<usize>>) {
let calls = Arc::new(Mutex::new(0usize));
(
Self {
inner,
calls: calls.clone(),
},
calls,
)
}
}
impl<F: RsyncFetcher> RsyncFetcher for CountingRsyncFetcher<F> {
fn fetch_objects(
&self,
rsync_base_uri: &str,
) -> Result<Vec<(String, Vec<u8>)>, RsyncFetchError> {
*self.calls.lock().unwrap() += 1;
self.inner.fetch_objects(rsync_base_uri)
}
}
#[test]
fn repo_sync_uses_rrdp_when_available() {
let notification_xml =
std::fs::read("tests/fixtures/rrdp/notification.xml").expect("read notification");
let snapshot_xml = std::fs::read("tests/fixtures/rrdp/snapshot.xml").expect("read snapshot");
let http_fetcher = MapFetcher {
by_uri: HashMap::from([
(
"https://example.net/rrdp/notification.xml".to_string(),
notification_xml,
),
(
"https://example.net/rrdp/snapshot.xml".to_string(),
snapshot_xml,
),
]),
};
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
let local_repo = tempfile::tempdir().expect("local repo dir");
std::fs::write(local_repo.path().join("x.cer"), b"x").unwrap();
let (rsync_fetcher, calls) =
CountingRsyncFetcher::new(LocalDirRsyncFetcher::new(local_repo.path()));
let policy = Policy::default();
let out = sync_publication_point(
&store,
&policy,
Some("https://example.net/rrdp/notification.xml"),
"rsync://example.net/repo/",
&http_fetcher,
&rsync_fetcher,
)
.expect("sync");
assert_eq!(out.source, RepoSyncSource::Rrdp);
assert_eq!(out.objects_written, 2);
assert_eq!(*calls.lock().unwrap(), 0);
assert_eq!(
store
.get_raw("rsync://example.net/repo/obj1.cer")
.unwrap()
.unwrap(),
b"abc"
);
}
#[test]
fn repo_sync_skips_snapshot_when_state_unchanged() {
let notification_xml =
std::fs::read("tests/fixtures/rrdp/notification.xml").expect("read notification");
let snapshot_xml = std::fs::read("tests/fixtures/rrdp/snapshot.xml").expect("read snapshot");
let http_fetcher = MapFetcher {
by_uri: HashMap::from([
(
"https://example.net/rrdp/notification.xml".to_string(),
notification_xml,
),
(
"https://example.net/rrdp/snapshot.xml".to_string(),
snapshot_xml,
),
]),
};
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
let local_repo = tempfile::tempdir().expect("local repo dir");
std::fs::write(local_repo.path().join("x.cer"), b"x").unwrap();
let (rsync_fetcher, calls) =
CountingRsyncFetcher::new(LocalDirRsyncFetcher::new(local_repo.path()));
let policy = Policy::default();
let out1 = sync_publication_point(
&store,
&policy,
Some("https://example.net/rrdp/notification.xml"),
"rsync://example.net/repo/",
&http_fetcher,
&rsync_fetcher,
)
.expect("sync 1");
assert_eq!(out1.source, RepoSyncSource::Rrdp);
assert_eq!(out1.objects_written, 2);
let out2 = sync_publication_point(
&store,
&policy,
Some("https://example.net/rrdp/notification.xml"),
"rsync://example.net/repo/",
&http_fetcher,
&rsync_fetcher,
)
.expect("sync 2");
assert_eq!(out2.source, RepoSyncSource::Rrdp);
assert_eq!(
out2.objects_written, 0,
"expected to skip snapshot apply when state unchanged"
);
assert_eq!(
*calls.lock().unwrap(),
0,
"expected no rsync fallback calls"
);
assert_eq!(
store
.get_raw("rsync://example.net/repo/obj1.cer")
.unwrap()
.unwrap(),
b"abc"
);
}
#[test]
fn repo_sync_falls_back_to_rsync_on_rrdp_failure() {
// Provide notification, but omit snapshot, causing RRDP fetch failure.
let notification_xml =
std::fs::read("tests/fixtures/rrdp/notification.xml").expect("read notification");
let http_fetcher = MapFetcher {
by_uri: HashMap::from([(
"https://example.net/rrdp/notification.xml".to_string(),
notification_xml,
)]),
};
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
let local_repo = tempfile::tempdir().expect("local repo dir");
std::fs::create_dir_all(local_repo.path().join("sub")).unwrap();
std::fs::write(local_repo.path().join("sub/obj.cer"), b"hello").unwrap();
let (rsync_fetcher, calls) =
CountingRsyncFetcher::new(LocalDirRsyncFetcher::new(local_repo.path()));
let policy = Policy::default();
let out = sync_publication_point(
&store,
&policy,
Some("https://example.net/rrdp/notification.xml"),
"rsync://example.net/repo/",
&http_fetcher,
&rsync_fetcher,
)
.expect("fallback sync");
assert_eq!(out.source, RepoSyncSource::Rsync);
assert_eq!(out.objects_written, 1);
assert_eq!(*calls.lock().unwrap(), 1);
assert!(!out.warnings.is_empty());
assert!(
out.warnings[0]
.rfc_refs
.iter()
.any(|r| r.0 == "RFC 8182 §3.4.5")
);
assert_eq!(
store
.get_raw("rsync://example.net/repo/sub/obj.cer")
.unwrap()
.unwrap(),
b"hello"
);
}
#[test]
fn repo_sync_rsync_populates_raw_objects() {
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
let local_repo = tempfile::tempdir().expect("local repo dir");
std::fs::create_dir_all(local_repo.path().join("a/b")).unwrap();
std::fs::write(local_repo.path().join("a/one.cer"), b"1").unwrap();
std::fs::write(local_repo.path().join("a/b/two.crl"), b"2").unwrap();
let http_fetcher = MapFetcher {
by_uri: HashMap::new(),
};
let rsync_fetcher = LocalDirRsyncFetcher::new(local_repo.path());
let mut policy = Policy::default();
policy.sync_preference = SyncPreference::RsyncOnly;
let out = sync_publication_point(
&store,
&policy,
None,
"rsync://example.net/repo/",
&http_fetcher,
&rsync_fetcher,
)
.expect("rsync-only sync");
assert_eq!(out.source, RepoSyncSource::Rsync);
assert_eq!(out.objects_written, 2);
assert_eq!(
store
.get_raw("rsync://example.net/repo/a/one.cer")
.unwrap()
.unwrap(),
b"1"
);
assert_eq!(
store
.get_raw("rsync://example.net/repo/a/b/two.crl")
.unwrap()
.unwrap(),
b"2"
);
}