//! rpki/tests/test_apnic_rrdp_delta_live_20260226.rs
//!
//! Live integration tests for APNIC RRDP synchronization: snapshot bootstrap
//! into a persistent RocksDB, delta-only sync after a serial advance, and
//! reuse of the current-instance VCIR when a repository fetch fails.

use std::cell::RefCell;
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use rpki::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use rpki::fetch::rsync::{RsyncFetchError, RsyncFetcher};
use rpki::policy::{CaFailedFetchPolicy, Policy, SyncPreference};
use rpki::storage::RocksStore;
use rpki::sync::repo::{RepoSyncSource, sync_publication_point};
use rpki::sync::rrdp::{Fetcher, parse_notification, sync_from_notification};
use rpki::validation::from_tal::discover_root_ca_instance_from_tal_url;
use rpki::validation::manifest::{PublicationPointSource, process_manifest_publication_point};
/// HTTPS URL of the APNIC RFC 7730 trust anchor locator (TAL) used by every
/// live test in this file to discover the root CA instance.
const APNIC_TAL_URL: &str = "https://tal.apnic.net/tal-archive/apnic-rfc7730-https.tal";
/// Directory of the persistent RocksDB shared by the live tests.
///
/// Honors the `RPKI_LIVE_DB_DIR` environment variable when set; otherwise
/// falls back to a fixed path under `target/`.
fn persistent_db_dir() -> PathBuf {
    std::env::var("RPKI_LIVE_DB_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|_| PathBuf::from("target/live/apnic_rrdp_db"))
}
/// Builds the blocking HTTP fetcher used by the live tests.
///
/// The request timeout defaults to 15 minutes and can be overridden via the
/// `RPKI_LIVE_HTTP_TIMEOUT_SECS` environment variable (whole seconds).
fn live_http_fetcher() -> BlockingHttpFetcher {
    let timeout_secs = std::env::var("RPKI_LIVE_HTTP_TIMEOUT_SECS")
        .ok()
        .and_then(|s| s.parse::<u64>().ok())
        .unwrap_or(15 * 60);
    let config = HttpFetcherConfig {
        timeout: Duration::from_secs(timeout_secs),
        user_agent: "rpki-dev/0.1 (stage2 live rrdp delta test)".to_string(),
    };
    BlockingHttpFetcher::new(config).expect("http fetcher")
}
/// Rsync fetcher stub that rejects every request, guaranteeing that the
/// tests in this file can only succeed through the RRDP code path.
struct AlwaysFailRsyncFetcher;

impl RsyncFetcher for AlwaysFailRsyncFetcher {
    /// Always fails with a fixed fetch error; never touches the network.
    fn fetch_objects(
        &self,
        _rsync_base_uri: &str,
    ) -> Result<Vec<(String, Vec<u8>)>, RsyncFetchError> {
        let reason = "rsync disabled for this test".to_string();
        Err(RsyncFetchError::Fetch(reason))
    }
}
/// HTTP fetcher wrapper that counts every requested URI and refuses to serve
/// one specific "denied" URI, letting tests prove a code path was not taken.
#[derive(Clone)]
struct CountingDenyUriFetcher {
    inner: BlockingHttpFetcher,
    deny_uri: String,
    counts: std::rc::Rc<RefCell<HashMap<String, u64>>>,
}

impl CountingDenyUriFetcher {
    /// Wraps `inner`, denying all fetches of `deny_uri`.
    fn new(inner: BlockingHttpFetcher, deny_uri: String) -> Self {
        let counts = std::rc::Rc::new(RefCell::new(HashMap::new()));
        CountingDenyUriFetcher {
            inner,
            deny_uri,
            counts,
        }
    }

    /// Number of times `uri` has been requested through this fetcher so far.
    fn count(&self, uri: &str) -> u64 {
        self.counts.borrow().get(uri).copied().unwrap_or(0)
    }
}
impl Fetcher for CountingDenyUriFetcher {
    /// Records the request, then either denies it (for the configured URI)
    /// or delegates to the wrapped HTTP fetcher.
    fn fetch(&self, uri: &str) -> Result<Vec<u8>, String> {
        {
            // Scope the RefCell borrow so it is released before delegating.
            let mut counts = self.counts.borrow_mut();
            *counts.entry(uri.to_string()).or_default() += 1;
        }
        if uri == self.deny_uri {
            Err(format!("snapshot fetch denied: {uri}"))
        } else {
            self.inner.fetch(uri)
        }
    }
}
/// Policy used by the live tests: prefer RRDP over rsync, and fall back to
/// the latest validated current-instance VCIR when a CA's repository fetch
/// fails (RFC 9286 §6.6 behavior).
fn live_policy() -> Policy {
    let mut policy = Policy::default();
    policy.sync_preference = SyncPreference::RrdpThenRsync;
    policy.ca_failed_fetch_policy = CaFailedFetchPolicy::ReuseCurrentInstanceVcir;
    policy
}
// Phase 1 of the live test pair: bootstrap the persistent RocksDB from the
// current APNIC RRDP snapshot, then validate the root publication point and
// persist its VCIR so the delta-only and failed-fetch tests have a baseline.
#[test]
#[ignore = "live network: APNIC RRDP snapshot bootstrap into persistent RocksDB"]
fn apnic_live_bootstrap_snapshot_and_persist_root_vcir_to_persistent_db() {
let http = live_http_fetcher();
// rsync always fails, so the sync below can only succeed via RRDP.
let rsync = AlwaysFailRsyncFetcher;
let db_dir = persistent_db_dir();
std::fs::create_dir_all(&db_dir).expect("create db dir");
let store = RocksStore::open(&db_dir).expect("open rocksdb");
let policy = live_policy();
let validation_time = time::OffsetDateTime::now_utc();
// Discover the root CA instance (certificate + repository URIs) from the TAL.
let discovery = discover_root_ca_instance_from_tal_url(&http, APNIC_TAL_URL)
.expect("discover root CA instance from APNIC TAL");
let ca_instance = discovery.ca_instance;
let rrdp_notification_uri = ca_instance
.rrdp_notification_uri
.as_deref()
.expect("APNIC root must have rrdpNotification");
// First-ever sync against an empty store: expected to go through the RRDP
// snapshot path and record the RRDP state (session/serial) for later runs.
let sync = sync_publication_point(
&store,
&policy,
Some(rrdp_notification_uri),
&ca_instance.rsync_base_uri,
&http,
&rsync,
None,
None,
)
.expect("repo sync");
assert_eq!(sync.source, RepoSyncSource::Rrdp);
// Build the root publication point and persist the latest VCIR so later runs can
// validate current-instance failed-fetch reuse behavior (RFC 9286 §6.6).
let ta_der = discovery.trust_anchor.ta_certificate.raw_der;
let pp = process_manifest_publication_point(
&store,
&policy,
&ca_instance.manifest_rsync_uri,
&ca_instance.publication_point_rsync_uri,
&ta_der,
None,
validation_time,
)
.expect("process manifest publication point");
// A freshly-synced repository must validate from live objects, not a cache.
assert_eq!(pp.source, PublicationPointSource::Fresh);
let cached = store
.get_vcir(&ca_instance.manifest_rsync_uri)
.expect("get vcir");
assert!(cached.is_some(), "expected VCIR to be stored");
eprintln!(
"OK: bootstrap complete; persistent db at: {}",
db_dir.display()
);
eprintln!(
"Next: run `cargo test --release -q --test test_apnic_rrdp_delta_live_20260226 -- --ignored` later to exercise delta sync."
);
}
// Phase 2 of the live test pair: poll the APNIC notification until its serial
// advances past the bootstrapped baseline, then sync using deltas only. The
// snapshot URI is explicitly denied, so any fallback to a snapshot re-download
// fails the test rather than silently masking a broken delta path.
#[test]
#[ignore = "live network: waits for APNIC RRDP serial advance, then sync via deltas only (no snapshot) using persistent RocksDB"]
fn apnic_live_delta_only_from_persistent_db() {
let http = live_http_fetcher();
let db_dir = persistent_db_dir();
let store = RocksStore::open(&db_dir).expect("open rocksdb (must have been bootstrapped)");
let policy = live_policy();
let discovery = discover_root_ca_instance_from_tal_url(&http, APNIC_TAL_URL)
.expect("discover root CA instance from APNIC TAL");
let ca_instance = discovery.ca_instance;
let rrdp_notification_uri = ca_instance
.rrdp_notification_uri
.as_deref()
.expect("APNIC root must have rrdpNotification");
// Baseline RRDP state recorded by the bootstrap test; without it there is
// no serial to advance from, so bail out pointing at the bootstrap test.
let state_bytes = store
.get_rrdp_state(rrdp_notification_uri)
.expect("get rrdp_state")
.unwrap_or_else(|| {
panic!(
"missing rrdp_state for APNIC notification URI; run bootstrap test first. db_dir={}",
db_dir.display()
)
});
let state = rpki::sync::rrdp::RrdpState::decode(&state_bytes).expect("decode rrdp_state");
let old_serial = state.serial;
let old_session = state.session_id;
// Overall wait budget and polling interval, both overridable via env vars.
let max_wait_secs: u64 = std::env::var("RPKI_LIVE_MAX_WAIT_SECS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(30 * 60);
let poll_secs: u64 = std::env::var("RPKI_LIVE_POLL_SECS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(60);
let start = Instant::now();
loop {
// Give up once the wait budget is exhausted; the serial simply may not
// advance within the window.
if start.elapsed() > Duration::from_secs(max_wait_secs) {
panic!(
"timed out waiting for APNIC RRDP serial to advance for delta sync; old_session={} old_serial={} waited={}s",
old_session, old_serial, max_wait_secs
);
}
let notif_xml = http
.fetch(rrdp_notification_uri)
.unwrap_or_else(|e| panic!("fetch notification failed: {e}"));
let notif = parse_notification(&notif_xml).expect("parse notification");
// A session change invalidates the delta chain (RFC 8182); only a fresh
// snapshot could recover, which this delta-only test deliberately forbids.
if notif.session_id.to_string() != old_session {
panic!(
"RRDP session_id changed; this delta-only test assumes same snapshot baseline. old_session={} new_session={}",
old_session, notif.session_id
);
}
// Serial has not advanced yet: keep polling.
if notif.serial <= old_serial {
eprintln!(
"waiting for serial advance: session={} old_serial={} current_serial={}",
old_session, old_serial, notif.serial
);
std::thread::sleep(Duration::from_secs(poll_secs));
continue;
}
// Delta-only sync needs every serial from old_serial+1 up to the current
// serial to be listed; otherwise the baseline snapshot must be refreshed.
// NOTE(review): `.first()` assumes the notification lists deltas in
// ascending serial order — confirm against parse_notification.
let want_first = old_serial + 1;
let min_delta = notif.deltas.first().map(|d| d.serial).unwrap_or(u64::MAX);
if notif.deltas.is_empty() || min_delta > want_first {
panic!(
"notification deltas do not cover required serial gap for delta-only sync; old_serial={} want_first={} min_delta={} current_serial={}. rerun bootstrap to refresh snapshot baseline.",
old_serial, want_first, min_delta, notif.serial
);
}
// Deny snapshot fetch to ensure we truly test the delta path and keep the stored snapshot
// baseline unchanged.
let deny = notif.snapshot_uri.clone();
let fetcher = CountingDenyUriFetcher::new(http.clone(), deny.clone());
match sync_from_notification(&store, rrdp_notification_uri, &notif_xml, &fetcher) {
Ok(written) => {
// A serial advance with zero written objects would mean the deltas
// were not actually applied.
assert!(
written > 0,
"expected delta sync to apply changes (written={written})"
);
// The denied snapshot URI must never have been requested at all.
assert_eq!(
fetcher.count(&deny),
0,
"delta sync should not fetch snapshot"
);
eprintln!(
"OK: delta sync applied: written={} old_serial={} new_serial={}",
written, old_serial, notif.serial
);
break;
}
Err(e) => {
// Transient failure (e.g. partially published deltas): retry after
// the polling interval until the wait budget runs out.
eprintln!("delta sync attempt failed (will retry): {e}");
std::thread::sleep(Duration::from_secs(poll_secs));
}
}
}
// Keep policy variable used, to avoid warnings if this test evolves.
let _ = policy;
}
// Phase 3: with the VCIR persisted by the bootstrap test, simulate a failed
// repository sync and assert that manifest processing falls back to the latest
// validated result for the current CA instance instead of failing outright
// (RFC 9286 §6.6). Only TAL discovery touches the network here.
#[test]
#[ignore = "offline/synthetic: after bootstrap, force repo sync failure and assert current-instance VCIR is reused (RFC 9286 §6.6)"]
fn apnic_root_repo_sync_failure_reuses_current_instance_vcir() {
let http = live_http_fetcher();
let db_dir = persistent_db_dir();
let store = RocksStore::open(&db_dir).expect("open rocksdb (must have been bootstrapped)");
// Restate the two policy knobs explicitly (already live_policy()'s defaults)
// so the test remains self-describing if live_policy() ever changes.
let mut policy = live_policy();
policy.sync_preference = SyncPreference::RrdpThenRsync;
policy.ca_failed_fetch_policy = CaFailedFetchPolicy::ReuseCurrentInstanceVcir;
let validation_time = time::OffsetDateTime::now_utc();
let discovery = discover_root_ca_instance_from_tal_url(&http, APNIC_TAL_URL)
.expect("discover root CA instance from APNIC TAL");
let ca_instance = discovery.ca_instance;
// Ensure current-instance VCIR exists (created by bootstrap).
let cached = store
.get_vcir(&ca_instance.manifest_rsync_uri)
.expect("get vcir");
assert!(
cached.is_some(),
"missing VCIR; run bootstrap test first. db_dir={}",
db_dir.display()
);
// Simulate repo sync failure: skip calling sync_publication_point and directly drive manifest
// processing with repo_sync_ok=false.
let ta_der = discovery.trust_anchor.ta_certificate.raw_der;
let pp = rpki::validation::manifest::process_manifest_publication_point_after_repo_sync(
&store,
&policy,
&ca_instance.manifest_rsync_uri,
&ca_instance.publication_point_rsync_uri,
&ta_der,
None,
validation_time,
false,
Some("synthetic repo sync failure"),
)
.expect("must reuse current-instance VCIR");
// The result must come from the cached current-instance VCIR, and the
// fallback must be surfaced to the caller as a warning.
assert_eq!(pp.source, PublicationPointSource::VcirCurrentInstance);
assert!(
pp.warnings.iter().any(|w| w
.message
.contains("using latest validated result for current CA instance")),
"expected current-instance VCIR reuse warning"
);
}