271 lines
7.7 KiB
Rust
271 lines
7.7 KiB
Rust
use std::collections::HashMap;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use rpki::fetch::rsync::{LocalDirRsyncFetcher, RsyncFetchError, RsyncFetcher};
|
|
use rpki::policy::{Policy, SyncPreference};
|
|
use rpki::storage::RocksStore;
|
|
use rpki::sync::repo::{RepoSyncSource, sync_publication_point};
|
|
use rpki::sync::rrdp::Fetcher;
|
|
|
|
struct MapFetcher {
|
|
by_uri: HashMap<String, Vec<u8>>,
|
|
}
|
|
|
|
impl Fetcher for MapFetcher {
|
|
fn fetch(&self, uri: &str) -> Result<Vec<u8>, String> {
|
|
self.by_uri
|
|
.get(uri)
|
|
.cloned()
|
|
.ok_or_else(|| format!("not found: {uri}"))
|
|
}
|
|
}
|
|
|
|
/// Wrapper that pairs an inner fetcher with a shared call counter.
struct CountingRsyncFetcher<F> {
    /// The wrapped fetcher.
    inner: F,
    /// Counter shared with the handle returned from `new`.
    calls: Arc<Mutex<usize>>,
}

impl<F> CountingRsyncFetcher<F> {
    /// Wraps `inner` and returns the wrapper together with a second handle to
    /// the same counter, which starts at zero.
    fn new(inner: F) -> (Self, Arc<Mutex<usize>>) {
        let counter = Arc::new(Mutex::new(0_usize));
        let wrapper = Self {
            inner,
            calls: Arc::clone(&counter),
        };
        (wrapper, counter)
    }
}
|
|
|
|
impl<F: RsyncFetcher> RsyncFetcher for CountingRsyncFetcher<F> {
|
|
fn fetch_objects(
|
|
&self,
|
|
rsync_base_uri: &str,
|
|
) -> Result<Vec<(String, Vec<u8>)>, RsyncFetchError> {
|
|
*self.calls.lock().unwrap() += 1;
|
|
self.inner.fetch_objects(rsync_base_uri)
|
|
}
|
|
}
|
|
|
|
/// With the default policy and both the RRDP notification and snapshot
/// available over HTTP, the sync is expected to be served by RRDP.
///
/// Asserts that the reported source is `RepoSyncSource::Rrdp`, that two
/// objects were written, that the counting rsync fetcher was never invoked,
/// and that `obj1.cer` is stored under its rsync URI with the bytes `abc`.
#[test]
fn repo_sync_uses_rrdp_when_available() {
    const NOTIFICATION_URI: &str = "https://example.net/rrdp/notification.xml";
    const SNAPSHOT_URI: &str = "https://example.net/rrdp/snapshot.xml";
    const RSYNC_BASE_URI: &str = "rsync://example.net/repo/";

    // Fixtures are loaded notification first, then snapshot.
    let notification_bytes =
        std::fs::read("tests/fixtures/rrdp/notification.xml").expect("read notification");
    let snapshot_bytes = std::fs::read("tests/fixtures/rrdp/snapshot.xml").expect("read snapshot");

    let mut by_uri = HashMap::new();
    by_uri.insert(NOTIFICATION_URI.to_string(), notification_bytes);
    by_uri.insert(SNAPSHOT_URI.to_string(), snapshot_bytes);
    let http_fetcher = MapFetcher { by_uri };

    let store_dir = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(store_dir.path()).expect("open rocksdb");

    // The rsync side is backed by a local directory holding one file; the
    // counter reveals whether the rsync fetcher was used at all.
    let rsync_dir = tempfile::tempdir().expect("local repo dir");
    std::fs::write(rsync_dir.path().join("x.cer"), b"x").unwrap();
    let local_fetcher = LocalDirRsyncFetcher::new(rsync_dir.path());
    let (rsync_fetcher, calls) = CountingRsyncFetcher::new(local_fetcher);

    let policy = Policy::default();
    let outcome = sync_publication_point(
        &store,
        &policy,
        Some(NOTIFICATION_URI),
        RSYNC_BASE_URI,
        &http_fetcher,
        &rsync_fetcher,
    );
    let out = outcome.expect("sync");

    assert_eq!(out.source, RepoSyncSource::Rrdp);
    assert_eq!(out.objects_written, 2);

    let rsync_calls = *calls.lock().unwrap();
    assert_eq!(rsync_calls, 0);

    let stored = store
        .get_raw("rsync://example.net/repo/obj1.cer")
        .unwrap()
        .unwrap();
    assert_eq!(stored, b"abc");
}
|
|
|
|
/// Runs the same RRDP sync twice against one store.
///
/// The first run must report RRDP as its source and write two objects. The
/// second run, with identical inputs, must still report RRDP but write zero
/// objects. The rsync fetcher must not be called in either run, and
/// `obj1.cer` must still be stored with the bytes `abc` afterwards.
#[test]
fn repo_sync_skips_snapshot_when_state_unchanged() {
    const NOTIFICATION_URI: &str = "https://example.net/rrdp/notification.xml";
    const SNAPSHOT_URI: &str = "https://example.net/rrdp/snapshot.xml";
    const RSYNC_BASE_URI: &str = "rsync://example.net/repo/";

    let notification_bytes =
        std::fs::read("tests/fixtures/rrdp/notification.xml").expect("read notification");
    let snapshot_bytes = std::fs::read("tests/fixtures/rrdp/snapshot.xml").expect("read snapshot");

    let mut by_uri = HashMap::new();
    by_uri.insert(NOTIFICATION_URI.to_string(), notification_bytes);
    by_uri.insert(SNAPSHOT_URI.to_string(), snapshot_bytes);
    let http_fetcher = MapFetcher { by_uri };

    let store_dir = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(store_dir.path()).expect("open rocksdb");

    let rsync_dir = tempfile::tempdir().expect("local repo dir");
    std::fs::write(rsync_dir.path().join("x.cer"), b"x").unwrap();
    let local_fetcher = LocalDirRsyncFetcher::new(rsync_dir.path());
    let (rsync_fetcher, calls) = CountingRsyncFetcher::new(local_fetcher);

    let policy = Policy::default();

    // Both runs use exactly the same arguments, so share one invocation.
    let run_sync = || {
        sync_publication_point(
            &store,
            &policy,
            Some(NOTIFICATION_URI),
            RSYNC_BASE_URI,
            &http_fetcher,
            &rsync_fetcher,
        )
    };

    let first = run_sync().expect("sync 1");
    assert_eq!(first.source, RepoSyncSource::Rrdp);
    assert_eq!(first.objects_written, 2);

    let second = run_sync().expect("sync 2");
    assert_eq!(second.source, RepoSyncSource::Rrdp);
    assert_eq!(
        second.objects_written, 0,
        "expected to skip snapshot apply when state unchanged"
    );

    let rsync_calls = *calls.lock().unwrap();
    assert_eq!(rsync_calls, 0, "expected no rsync fallback calls");

    let stored = store
        .get_raw("rsync://example.net/repo/obj1.cer")
        .unwrap()
        .unwrap();
    assert_eq!(stored, b"abc");
}
|
|
|
|
/// The HTTP fetcher serves the notification file but has no entry for the
/// snapshot, and the sync is expected to complete via rsync instead.
///
/// Asserts that the reported source is `RepoSyncSource::Rsync`, that one
/// object was written, that the rsync fetcher was called exactly once, that
/// the first warning cites `RFC 8182 §3.4.5`, and that `sub/obj.cer` is
/// stored under its rsync URI with the bytes `hello`.
#[test]
fn repo_sync_falls_back_to_rsync_on_rrdp_failure() {
    const NOTIFICATION_URI: &str = "https://example.net/rrdp/notification.xml";
    const RSYNC_BASE_URI: &str = "rsync://example.net/repo/";

    // Provide notification, but omit snapshot, causing RRDP fetch failure.
    let notification_bytes =
        std::fs::read("tests/fixtures/rrdp/notification.xml").expect("read notification");
    let mut by_uri = HashMap::new();
    by_uri.insert(NOTIFICATION_URI.to_string(), notification_bytes);
    let http_fetcher = MapFetcher { by_uri };

    let store_dir = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(store_dir.path()).expect("open rocksdb");

    // Local directory served by the rsync fetcher: a single nested object.
    let rsync_dir = tempfile::tempdir().expect("local repo dir");
    let sub_dir = rsync_dir.path().join("sub");
    std::fs::create_dir_all(sub_dir).unwrap();
    std::fs::write(rsync_dir.path().join("sub/obj.cer"), b"hello").unwrap();

    let local_fetcher = LocalDirRsyncFetcher::new(rsync_dir.path());
    let (rsync_fetcher, calls) = CountingRsyncFetcher::new(local_fetcher);

    let policy = Policy::default();
    let outcome = sync_publication_point(
        &store,
        &policy,
        Some(NOTIFICATION_URI),
        RSYNC_BASE_URI,
        &http_fetcher,
        &rsync_fetcher,
    );
    let out = outcome.expect("fallback sync");

    assert_eq!(out.source, RepoSyncSource::Rsync);
    assert_eq!(out.objects_written, 1);

    let rsync_calls = *calls.lock().unwrap();
    assert_eq!(rsync_calls, 1);

    // The first warning must carry the expected RFC reference.
    assert!(!out.warnings.is_empty());
    let first_warning = &out.warnings[0];
    let cites_rfc = first_warning
        .rfc_refs
        .iter()
        .any(|rfc| rfc.0 == "RFC 8182 §3.4.5");
    assert!(cites_rfc);

    let stored = store
        .get_raw("rsync://example.net/repo/sub/obj.cer")
        .unwrap()
        .unwrap();
    assert_eq!(stored, b"hello");
}
|
|
|
|
/// With `SyncPreference::RsyncOnly`, no notification URI and an empty HTTP
/// fetcher, the sync is expected to read the objects from the local rsync
/// directory.
///
/// Asserts that the reported source is `RepoSyncSource::Rsync`, that two
/// objects were written, and that both files (one of them in a nested
/// directory) are stored under their rsync URIs with their original bytes.
#[test]
fn repo_sync_rsync_populates_raw_objects() {
    let store_dir = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(store_dir.path()).expect("open rocksdb");

    // Lay out a two-level tree: a/one.cer and a/b/two.crl.
    let rsync_dir = tempfile::tempdir().expect("local repo dir");
    let repo_root = rsync_dir.path();
    std::fs::create_dir_all(repo_root.join("a/b")).unwrap();
    std::fs::write(repo_root.join("a/one.cer"), b"1").unwrap();
    std::fs::write(repo_root.join("a/b/two.crl"), b"2").unwrap();

    let empty_map = HashMap::new();
    let http_fetcher = MapFetcher { by_uri: empty_map };
    let rsync_fetcher = LocalDirRsyncFetcher::new(rsync_dir.path());

    let mut policy = Policy::default();
    policy.sync_preference = SyncPreference::RsyncOnly;

    let outcome = sync_publication_point(
        &store,
        &policy,
        None,
        "rsync://example.net/repo/",
        &http_fetcher,
        &rsync_fetcher,
    );
    let out = outcome.expect("rsync-only sync");

    assert_eq!(out.source, RepoSyncSource::Rsync);
    assert_eq!(out.objects_written, 2);

    let stored_one = store
        .get_raw("rsync://example.net/repo/a/one.cer")
        .unwrap()
        .unwrap();
    assert_eq!(stored_one, b"1");

    let stored_two = store
        .get_raw("rsync://example.net/repo/a/b/two.crl")
        .unwrap()
        .unwrap();
    assert_eq!(stored_two, b"2");
}
|