use std::collections::BTreeMap; use std::fs; use std::path::{Path, PathBuf}; use std::sync::Arc; use crate::replay::delta_archive::{ ReplayDeltaArchiveError, ReplayDeltaArchiveIndex, ReplayDeltaRrdpKind, }; use crate::sync::rrdp::{Fetcher, parse_notification}; #[derive(Debug, thiserror::Error)] pub enum PayloadDeltaReplayHttpFetcherError { #[error(transparent)] DeltaIndex(#[from] ReplayDeltaArchiveError), #[error("read delta replay RRDP file failed: {path}: {detail}")] ReadFile { path: String, detail: String }, #[error("parse target notification failed for {notify_uri}: {detail}")] ParseNotification { notify_uri: String, detail: String }, #[error( "target notification session/serial mismatch for {notify_uri}: expected session={expected_session} serial={expected_serial}, got session={actual_session} serial={actual_serial}" )] NotificationTargetMismatch { notify_uri: String, expected_session: String, expected_serial: u64, actual_session: String, actual_serial: u64, }, #[error( "delta serial list mismatch between target notification and transition for {notify_uri}" )] DeltaSerialMismatch { notify_uri: String }, #[error("duplicate delta replay HTTP URI mapping for {uri}: {first_path} vs {second_path}")] DuplicateUriMapping { uri: String, first_path: String, second_path: String, }, #[error( "delta replay notification URI is {kind} and should not be fetched as RRDP: {notify_uri}" )] NotificationKindNotFetchable { notify_uri: String, kind: String }, #[error("delta replay HTTP URI not found in archive: {0}")] MissingUri(String), } #[derive(Clone, Debug)] pub struct PayloadDeltaReplayHttpFetcher { index: Arc, routes: BTreeMap, repo_kinds: BTreeMap, } impl PayloadDeltaReplayHttpFetcher { pub fn new( index: Arc, ) -> Result { let mut routes = BTreeMap::new(); let mut repo_kinds = BTreeMap::new(); for (notify_uri, repo) in &index.rrdp_repos { repo_kinds.insert(notify_uri.clone(), repo.transition.kind); if repo.transition.kind != ReplayDeltaRrdpKind::Delta { continue; } let notification_path = repo .target_notification_path .as_ref() .expect("delta repo target notification indexed"); insert_unique_route(&mut routes, notify_uri, notification_path)?; let notification_xml = fs::read(notification_path).map_err(|e| { PayloadDeltaReplayHttpFetcherError::ReadFile { path: notification_path.display().to_string(), detail: e.to_string(), } })?; let notification = parse_notification(¬ification_xml).map_err(|e| { PayloadDeltaReplayHttpFetcherError::ParseNotification { notify_uri: notify_uri.clone(), detail: e.to_string(), } })?; let expected_session = repo.transition.target.session.as_deref().unwrap_or(""); let expected_serial = repo.transition.target.serial.unwrap_or_default(); let actual_session = notification.session_id.to_string(); if actual_session != expected_session || notification.serial != expected_serial { return Err( PayloadDeltaReplayHttpFetcherError::NotificationTargetMismatch { notify_uri: notify_uri.clone(), expected_session: expected_session.to_string(), expected_serial, actual_session, actual_serial: notification.serial, }, ); } let transition_serials = repo .delta_paths .iter() .map(|(serial, _)| *serial) .collect::>(); let mut notification_delta_map = BTreeMap::new(); for dref in notification.deltas { notification_delta_map.insert(dref.serial, dref.uri); } for serial in &transition_serials { if !notification_delta_map.contains_key(serial) { return Err(PayloadDeltaReplayHttpFetcherError::DeltaSerialMismatch { notify_uri: notify_uri.clone(), }); } } for (serial, path) in &repo.delta_paths { let uri = notification_delta_map .get(serial) .expect("delta uri present for transition serial"); insert_unique_route(&mut routes, uri, path)?; } } Ok(Self { index, routes, repo_kinds, }) } pub fn from_index( index: Arc, ) -> Result { Self::new(index) } pub fn archive_index(&self) -> &ReplayDeltaArchiveIndex { self.index.as_ref() } } impl Fetcher for PayloadDeltaReplayHttpFetcher { fn fetch(&self, uri: &str) -> Result, String> { if let Some(path) = self.routes.get(uri) { return fs::read(path).map_err(|e| { PayloadDeltaReplayHttpFetcherError::ReadFile { path: path.display().to_string(), detail: e.to_string(), } .to_string() }); } if let Some(kind) = self.repo_kinds.get(uri) { return Err( PayloadDeltaReplayHttpFetcherError::NotificationKindNotFetchable { notify_uri: uri.to_string(), kind: kind.as_str().to_string(), } .to_string(), ); } Err(PayloadDeltaReplayHttpFetcherError::MissingUri(uri.to_string()).to_string()) } } fn insert_unique_route( routes: &mut BTreeMap, uri: &str, path: &Path, ) -> Result<(), PayloadDeltaReplayHttpFetcherError> { if let Some(existing) = routes.get(uri) { if existing != path { return Err(PayloadDeltaReplayHttpFetcherError::DuplicateUriMapping { uri: uri.to_string(), first_path: existing.display().to_string(), second_path: path.display().to_string(), }); } return Ok(()); } routes.insert(uri.to_string(), path.to_path_buf()); Ok(()) } #[cfg(test)] mod tests { use super::*; use crate::replay::archive::sha256_hex; use crate::replay::delta_archive::ReplayDeltaArchiveIndex; fn build_delta_http_fixture( kind: ReplayDeltaRrdpKind, ) -> (tempfile::TempDir, PathBuf, PathBuf, String, String, String) { let temp = tempfile::tempdir().expect("tempdir"); let archive_root = temp.path().join("payload-delta-archive"); let capture = "delta-http"; let capture_root = archive_root.join("v1").join("captures").join(capture); std::fs::create_dir_all(&capture_root).expect("mkdir capture root"); std::fs::write( capture_root.join("capture.json"), format!(r#"{{"version":1,"captureId":"{capture}","createdAt":"2026-03-16T00:00:00Z","notes":""}}"#), ) .expect("write capture meta"); std::fs::write( capture_root.join("base.json"), r#"{"version":1,"baseCapture":"base-cap","baseLocksSha256":"deadbeef","createdAt":"2026-03-16T00:00:00Z"}"#, ) .expect("write base meta"); let notify_uri = "https://rrdp.example.test/notification.xml".to_string(); let snapshot_uri = "https://rrdp.example.test/snapshot.xml".to_string(); let delta1_uri = "https://rrdp.example.test/d1.xml".to_string(); let delta2_uri = "https://rrdp.example.test/d2.xml".to_string(); let session = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa".to_string(); let target_serial = 12u64; let repo_hash = sha256_hex(notify_uri.as_bytes()); let session_dir = capture_root .join("rrdp/repos") .join(&repo_hash) .join(&session); let deltas_dir = session_dir.join("deltas"); std::fs::create_dir_all(&deltas_dir).expect("mkdir deltas dir"); std::fs::write( session_dir.parent().unwrap().join("meta.json"), format!(r#"{{"version":1,"rpkiNotify":"{notify_uri}","createdAt":"2026-03-16T00:00:00Z","lastSeenAt":"2026-03-16T00:00:01Z"}}"#), ) .expect("write repo meta"); let notification_xml = format!( r#" "# ); std::fs::write( session_dir.join("notification-target-12.xml"), notification_xml, ) .expect("write target notification"); std::fs::write( deltas_dir.join("delta-11-aaaa.xml"), b"", ) .expect("write delta1"); std::fs::write( deltas_dir.join("delta-12-bbbb.xml"), b"", ) .expect("write delta2"); std::fs::write( session_dir.parent().unwrap().join("transition.json"), format!( r#"{{"kind":"{}","base":{{"transport":"rrdp","session":"{session}","serial":10}},"target":{{"transport":"rrdp","session":"{session}","serial":12}},"delta_count":2,"deltas":[11,12]}}"#, kind.as_str() ), ) .expect("write transition"); let locks_path = temp.path().join("locks-delta.json"); std::fs::write( &locks_path, format!( r#"{{"version":1,"capture":"{capture}","baseCapture":"base-cap","baseLocksSha256":"deadbeef","rrdp":{{"{notify_uri}":{{"kind":"{}","base":{{"transport":"rrdp","session":"{session}","serial":10}},"target":{{"transport":"rrdp","session":"{session}","serial":12}},"delta_count":2,"deltas":[11,12]}}}},"rsync":{{}}}}"#, kind.as_str() ), ) .expect("write locks"); ( temp, archive_root, locks_path, notify_uri, delta1_uri, delta2_uri, ) } #[test] fn delta_http_fetcher_rejects_session_reset_and_gap_notification_kind() { for kind in [ReplayDeltaRrdpKind::SessionReset, ReplayDeltaRrdpKind::Gap] { let (_temp, archive_root, locks_path, notify_uri, _delta1_uri, _delta2_uri) = build_delta_http_fixture(kind); let index = Arc::new( ReplayDeltaArchiveIndex::load(&archive_root, &locks_path) .expect("load delta index"), ); let fetcher = PayloadDeltaReplayHttpFetcher::from_index(index).expect("build fetcher"); let err = fetcher.fetch(¬ify_uri).unwrap_err(); assert!(err.contains(kind.as_str()), "{err}"); } } #[test] fn delta_http_fetcher_reads_target_notification_and_delta_files() { let (_temp, archive_root, locks_path, notify_uri, delta1_uri, delta2_uri) = build_delta_http_fixture(ReplayDeltaRrdpKind::Delta); let index = Arc::new( ReplayDeltaArchiveIndex::load(&archive_root, &locks_path).expect("load delta index"), ); let fetcher = PayloadDeltaReplayHttpFetcher::from_index(index).expect("build fetcher"); let notification = fetcher.fetch(¬ify_uri).expect("fetch notification"); assert!( std::str::from_utf8(¬ification) .unwrap() .contains("notification") ); assert_eq!( fetcher.fetch(&delta1_uri).expect("fetch delta1"), b"".to_vec() ); assert_eq!( fetcher.fetch(&delta2_uri).expect("fetch delta2"), b"".to_vec() ); } #[test] fn delta_http_fetcher_rejects_non_delta_notification_kinds_and_missing_uri() { let (_temp, archive_root, locks_path, notify_uri, _delta1_uri, _delta2_uri) = build_delta_http_fixture(ReplayDeltaRrdpKind::Unchanged); let index = Arc::new( ReplayDeltaArchiveIndex::load(&archive_root, &locks_path).expect("load delta index"), ); let fetcher = PayloadDeltaReplayHttpFetcher::from_index(index).expect("build fetcher"); let err = fetcher.fetch(¬ify_uri).unwrap_err(); assert!(err.contains("unchanged"), "{err}"); let err = fetcher .fetch("https://missing.example/test.xml") .unwrap_err(); assert!(err.contains("not found in archive"), "{err}"); } #[test] fn delta_http_fetcher_rejects_target_notification_mismatch() { let (_temp, archive_root, locks_path, notify_uri, _delta1_uri, _delta2_uri) = build_delta_http_fixture(ReplayDeltaRrdpKind::Delta); let repo_hash = sha256_hex(notify_uri.as_bytes()); let notification = archive_root .join("v1/captures/delta-http/rrdp/repos") .join(repo_hash) .join("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa") .join("notification-target-12.xml"); std::fs::write( ¬ification, r#" "#, ) .expect("rewrite notification"); let index = Arc::new( ReplayDeltaArchiveIndex::load(&archive_root, &locks_path).expect("load delta index"), ); let err = PayloadDeltaReplayHttpFetcher::from_index(index).unwrap_err(); assert!( matches!( err, PayloadDeltaReplayHttpFetcherError::NotificationTargetMismatch { .. } ), "{err}" ); } }