use std::collections::HashSet; use std::sync::{Arc, Mutex}; use std::time::Duration; use crate::parallel::repo_scheduler::TransportRequestAction; use crate::parallel::repo_worker::{RepoTransportExecutor, RepoTransportWorkerPool}; use crate::parallel::run_coordinator::GlobalRunCoordinator; use crate::parallel::transport_prefetch::{ TransportPrefetchDispatchStats, TransportPrefetchRecorder, TransportPrefetchSnapshot, }; use crate::parallel::types::{ RepoIdentity, RepoRequester, RepoRuntimeState, RepoTransportMode, RepoTransportResultEnvelope, RepoTransportResultKind, }; use crate::policy::SyncPreference; use crate::report::Warning; use crate::validation::tree::{CaInstanceHandle, DiscoveredChildCaInstance}; #[derive(Clone, Debug, PartialEq, Eq)] pub struct RepoSyncRuntimeOutcome { pub repo_sync_ok: bool, pub repo_sync_err: Option, pub repo_sync_source: Option, pub repo_sync_phase: Option, pub repo_sync_duration_ms: u64, pub warnings: Vec, } #[derive(Clone, Debug, PartialEq, Eq)] pub enum RepoSyncRequestStatus { Ready { identity: RepoIdentity, outcome: RepoSyncRuntimeOutcome, }, Pending { identity: RepoIdentity, state: RepoRuntimeState, }, } #[derive(Clone, Debug, PartialEq, Eq)] pub struct RepoSyncRuntimeCompletion { pub identity: RepoIdentity, pub state: RepoRuntimeState, pub outcome: RepoSyncRuntimeOutcome, } #[derive(Clone, Debug, PartialEq, Eq)] pub struct RepoSyncRuntimeEvent { pub transport_identity: RepoIdentity, pub completions: Vec, } pub trait RepoSyncRuntime: Send + Sync { fn sync_publication_point_repo( &self, ca: &CaInstanceHandle, ) -> Result; fn request_publication_point_repo( &self, ca: &CaInstanceHandle, priority: u8, ) -> Result; fn recv_repo_result_timeout( &self, timeout: Duration, ) -> Result, String>; fn drain_repo_results_timeout( &self, timeout: Duration, max_events: usize, ) -> Result, String> { let max_events = max_events.max(1); let mut events = Vec::new(); for index in 0..max_events { let poll_timeout = if index == 0 { timeout } else { Duration::from_millis(0) }; let Some(event) = self.recv_repo_result_timeout(poll_timeout)? else { break; }; events.push(event); } Ok(events) } fn reset_run_state(&self) -> Result<(), String>; fn prefetch_discovered_children( &self, children: &[DiscoveredChildCaInstance], ) -> Result<(), String>; fn prefetch_transport_requests( &self, snapshot: &TransportPrefetchSnapshot, validation_time: time::OffsetDateTime, ) -> Result; fn transport_prefetch_snapshot(&self) -> TransportPrefetchSnapshot; } pub struct Phase1RepoSyncRuntime { coordinator: Mutex, worker_pool: Mutex>, transport_prefetch_recorder: Option>, retry_short_rsync_scopes: Mutex>, rsync_scope_resolver: Arc String + Send + Sync>, rsync_failure_scope_resolver: Arc Option + Send + Sync>, sync_preference: SyncPreference, } impl Phase1RepoSyncRuntime { pub fn new( coordinator: GlobalRunCoordinator, worker_pool: RepoTransportWorkerPool, rsync_scope_resolver: Arc String + Send + Sync>, sync_preference: SyncPreference, ) -> Self { Self::new_with_failure_scope( coordinator, worker_pool, rsync_scope_resolver, Arc::new(|_base: &str| None), sync_preference, ) } pub fn new_with_failure_scope( coordinator: GlobalRunCoordinator, worker_pool: RepoTransportWorkerPool, rsync_scope_resolver: Arc String + Send + Sync>, rsync_failure_scope_resolver: Arc Option + Send + Sync>, sync_preference: SyncPreference, ) -> Self { Self { coordinator: Mutex::new(coordinator), worker_pool: Mutex::new(worker_pool), transport_prefetch_recorder: None, retry_short_rsync_scopes: Mutex::new(HashSet::new()), rsync_scope_resolver, rsync_failure_scope_resolver, sync_preference, } } pub fn new_with_failure_scope_and_prefetch_recording( coordinator: GlobalRunCoordinator, worker_pool: RepoTransportWorkerPool, rsync_scope_resolver: Arc String + Send + Sync>, rsync_failure_scope_resolver: Arc Option + Send + Sync>, sync_preference: SyncPreference, record_transport_prefetch_requests: bool, ) -> Self { Self { coordinator: Mutex::new(coordinator), worker_pool: Mutex::new(worker_pool), transport_prefetch_recorder: record_transport_prefetch_requests .then(|| Mutex::new(TransportPrefetchRecorder::default())), retry_short_rsync_scopes: Mutex::new(HashSet::new()), rsync_scope_resolver, rsync_failure_scope_resolver, sync_preference, } } fn build_requester(ca: &CaInstanceHandle) -> RepoRequester { RepoRequester { tal_id: ca.tal_id.clone(), rir_id: ca.tal_id.clone(), parent_node_id: None, ca_instance_handle_id: format!("{}:{}", ca.tal_id, ca.manifest_rsync_uri), publication_point_rsync_uri: ca.publication_point_rsync_uri.clone(), manifest_rsync_uri: ca.manifest_rsync_uri.clone(), } } fn build_identity(ca: &CaInstanceHandle) -> RepoIdentity { RepoIdentity::new(ca.rrdp_notification_uri.clone(), ca.rsync_base_uri.clone()) } fn request_transport_for_ca( &self, ca: &CaInstanceHandle, priority: u8, ) -> Result { let identity = Self::build_identity(ca); let requester = Self::build_requester(ca); let rsync_scope_uri = (self.rsync_scope_resolver)(&identity.rsync_base_uri); let rsync_failure_scope_uri = (self.rsync_failure_scope_resolver)(&identity.rsync_base_uri); if let Some(recorder) = self.transport_prefetch_recorder.as_ref() { let mut recorder = recorder .lock() .expect("transport prefetch recorder lock poisoned"); recorder.record_registered_request( &identity, &requester, priority, rsync_scope_uri.clone(), rsync_failure_scope_uri.clone(), self.sync_preference, ); } let action = { let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); coordinator.register_transport_request( identity.clone(), requester, time::OffsetDateTime::now_utc(), priority, rsync_scope_uri, rsync_failure_scope_uri, self.sync_preference, false, ) }; match action { TransportRequestAction::Enqueue(task) => { crate::progress_log::emit( "phase1_repo_task_enqueued", serde_json::json!({ "manifest_rsync_uri": ca.manifest_rsync_uri, "publication_point_rsync_uri": ca.publication_point_rsync_uri, "repo_key_rsync_base_uri": task.repo_identity.rsync_base_uri, "rsync_failure_scope_uri": task.rsync_failure_scope_uri, "repo_key_notification_uri": task.repo_identity.notification_uri, "priority": priority, "transport_mode": match task.mode { RepoTransportMode::Rrdp => "rrdp", RepoTransportMode::Rsync => "rsync", }, }), ); self.drain_pending_transport_tasks()?; Ok(RepoSyncRequestStatus::Pending { identity, state: self .runtime_state_for_identity(&task.repo_identity) .unwrap_or(RepoRuntimeState::WaitingRrdp), }) } TransportRequestAction::Waiting { state } => { crate::progress_log::emit( "phase1_repo_task_waiting", serde_json::json!({ "manifest_rsync_uri": ca.manifest_rsync_uri, "publication_point_rsync_uri": ca.publication_point_rsync_uri, "repo_key_rsync_base_uri": identity.rsync_base_uri, "rsync_failure_scope_uri": (self.rsync_failure_scope_resolver)(&identity.rsync_base_uri), "repo_key_notification_uri": identity.notification_uri, "priority": priority, "runtime_state": format!("{state:?}"), }), ); Ok(RepoSyncRequestStatus::Pending { identity, state }) } TransportRequestAction::ReusedSuccess(result) | TransportRequestAction::ReusedTerminalFailure(result) => { crate::progress_log::emit( "phase1_repo_task_reused", serde_json::json!({ "manifest_rsync_uri": ca.manifest_rsync_uri, "publication_point_rsync_uri": ca.publication_point_rsync_uri, "repo_key_rsync_base_uri": identity.rsync_base_uri, "rsync_failure_scope_uri": result.rsync_failure_scope_uri, "repo_key_notification_uri": identity.notification_uri, "priority": priority, "transport_mode": match result.mode { RepoTransportMode::Rrdp => "rrdp", RepoTransportMode::Rsync => "rsync", }, }), ); Ok(RepoSyncRequestStatus::Ready { outcome: outcome_from_transport_result( &result, self.runtime_state_for_identity(&identity) .unwrap_or(RepoRuntimeState::Init), ), identity, }) } } } fn drain_pending_transport_tasks(&self) -> Result<(), String> { loop { let maybe_task = { let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); coordinator.pop_next_transport_task() }; let Some(task) = maybe_task else { break; }; { let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); coordinator .mark_transport_running(&task.dedup_key, time::OffsetDateTime::now_utc())?; } crate::progress_log::emit( "phase1_repo_task_dispatched", serde_json::json!({ "repo_key_rsync_base_uri": task.repo_identity.rsync_base_uri, "rsync_failure_scope_uri": task.rsync_failure_scope_uri, "repo_key_notification_uri": task.repo_identity.notification_uri, "requester_count": task.requesters.len(), "priority": task.priority, "transport_mode": match task.mode { RepoTransportMode::Rrdp => "rrdp", RepoTransportMode::Rsync => "rsync", }, }), ); let pool = self.worker_pool.lock().expect("worker pool lock poisoned"); pool.submit(task)?; } Ok(()) } fn pump_one_transport_result( &self, timeout: Duration, ) -> Result, String> { let envelope = { let pool = self.worker_pool.lock().expect("worker pool lock poisoned"); pool.recv_result_timeout(timeout)? }; let Some(envelope) = envelope else { return Ok(None); }; let transport_identity = envelope.repo_identity.clone(); let completed_envelope = envelope.clone(); if let Some(recorder) = self.transport_prefetch_recorder.as_ref() { recorder .lock() .expect("transport prefetch recorder lock poisoned") .record_result(&envelope); } crate::progress_log::emit( "phase1_repo_task_result", serde_json::json!({ "repo_key_rsync_base_uri": envelope.repo_identity.rsync_base_uri, "rsync_failure_scope_uri": envelope.rsync_failure_scope_uri, "repo_key_notification_uri": envelope.repo_identity.notification_uri, "timing_ms": envelope.timing_ms, "transport_mode": match envelope.mode { RepoTransportMode::Rrdp => "rrdp", RepoTransportMode::Rsync => "rsync", }, "result": match &envelope.result { RepoTransportResultKind::Success { .. } => "success", RepoTransportResultKind::Failed { .. } => "failed", }, }), ); let finished_at = time::OffsetDateTime::now_utc(); let completion = { let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); coordinator.complete_transport_result(envelope, finished_at)? }; if !completion.follow_up_tasks.is_empty() { let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); for mut task in completion.follow_up_tasks { if let crate::parallel::types::RepoDedupKey::RsyncScope { rsync_scope_uri } = &task.dedup_key { if self .retry_short_rsync_scopes .lock() .expect("retry short rsync scopes lock poisoned") .contains(rsync_scope_uri) { task.retry_short_timeout = true; } } crate::progress_log::emit( "phase1_repo_task_enqueued", serde_json::json!({ "manifest_rsync_uri": serde_json::Value::Null, "publication_point_rsync_uri": task.requesters.first().map(|r| r.publication_point_rsync_uri.clone()), "repo_key_rsync_base_uri": task.repo_identity.rsync_base_uri, "repo_key_notification_uri": task.repo_identity.notification_uri, "priority": task.priority, "transport_mode": "rsync", }), ); coordinator.push_transport_task(task); } } self.drain_pending_transport_tasks()?; let completions = { let coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); coordinator .finalized_runtime_records_for_transport_result(&completed_envelope) .into_iter() .filter_map(|record| { let outcome = match record.state { RepoRuntimeState::RrdpOk | RepoRuntimeState::RsyncOk => record .last_success .as_ref() .map(|result| outcome_from_transport_result(result, record.state)), RepoRuntimeState::FailedTerminal => record .terminal_failure .as_ref() .map(|result| outcome_from_transport_result(result, record.state)), _ => None, }?; Some(RepoSyncRuntimeCompletion { identity: record.identity, state: record.state, outcome, }) }) .collect::>() }; if completions.is_empty() { return Ok(None); } Ok(Some(RepoSyncRuntimeEvent { transport_identity, completions, })) } fn pump_transport_results( &self, timeout: Duration, max_events: usize, ) -> Result, String> { let max_events = max_events.max(1); let mut events = Vec::new(); for index in 0..max_events { let poll_timeout = if index == 0 { timeout } else { Duration::from_millis(0) }; let Some(event) = self.pump_one_transport_result(poll_timeout)? else { break; }; events.push(event); } Ok(events) } fn runtime_state_for_identity(&self, identity: &RepoIdentity) -> Option { let coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); coordinator .runtime_record(identity) .map(|record| record.state) } fn resolved_outcome_for_identity( &self, identity: &RepoIdentity, ) -> Option { let coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); let record = coordinator.runtime_record(identity)?; match record.state { RepoRuntimeState::RrdpOk | RepoRuntimeState::RsyncOk => record .last_success .as_ref() .map(|result| outcome_from_transport_result(result, record.state)), RepoRuntimeState::FailedTerminal => record .terminal_failure .as_ref() .map(|result| outcome_from_transport_result(result, record.state)), _ => None, } } } impl RepoSyncRuntime for Phase1RepoSyncRuntime { fn sync_publication_point_repo( &self, ca: &CaInstanceHandle, ) -> Result { if let RepoSyncRequestStatus::Ready { outcome, .. } = self.request_publication_point_repo(ca, 0)? { return Ok(outcome); } let identity = Self::build_identity(ca); loop { if let Some(done) = self.resolved_outcome_for_identity(&identity) { return Ok(done); } let _ = self.recv_repo_result_timeout(Duration::from_millis(50))?; } } fn request_publication_point_repo( &self, ca: &CaInstanceHandle, priority: u8, ) -> Result { self.request_transport_for_ca(ca, priority) } fn recv_repo_result_timeout( &self, timeout: Duration, ) -> Result, String> { self.pump_one_transport_result(timeout) } fn drain_repo_results_timeout( &self, timeout: Duration, max_events: usize, ) -> Result, String> { self.pump_transport_results(timeout, max_events) } fn reset_run_state(&self) -> Result<(), String> { { let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); if coordinator.stats.repo_tasks_running != 0 { return Err(format!( "cannot reset repo runtime with {} repo task(s) still running", coordinator.stats.repo_tasks_running )); } coordinator.reset_run_state(); } loop { let maybe_result = { let pool = self.worker_pool.lock().expect("worker pool lock poisoned"); pool.recv_result_timeout(Duration::from_millis(0))? }; if maybe_result.is_none() { break; } } Ok(()) } fn prefetch_discovered_children( &self, children: &[DiscoveredChildCaInstance], ) -> Result<(), String> { for child in children { let _ = self.request_publication_point_repo(&child.handle, 1)?; } Ok(()) } fn prefetch_transport_requests( &self, snapshot: &TransportPrefetchSnapshot, validation_time: time::OffsetDateTime, ) -> Result { let mut stats = TransportPrefetchDispatchStats { loaded_requests: snapshot.requests.len() as u64, ..TransportPrefetchDispatchStats::default() }; for request in &snapshot.requests { let identity = request.to_identity(); let current_rsync_scope_uri = (self.rsync_scope_resolver)(&identity.rsync_base_uri); let current_rsync_failure_scope_uri = (self.rsync_failure_scope_resolver)(&identity.rsync_base_uri); if current_rsync_scope_uri != request.rsync_scope_uri || current_rsync_failure_scope_uri != request.rsync_failure_scope_uri { stats.skipped_incompatible += 1; continue; } if request.retry_short_rsync_timeout() { self.retry_short_rsync_scopes .lock() .expect("retry short rsync scopes lock poisoned") .insert(current_rsync_scope_uri.clone()); } let action = { let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); coordinator.register_transport_request( identity, request.to_requester(), validation_time, request.priority, current_rsync_scope_uri, current_rsync_failure_scope_uri, self.sync_preference, request.retry_short_timeout(), ) }; match action { TransportRequestAction::Enqueue(task) => { stats.enqueued_tasks += 1; crate::progress_log::emit( "phase1_repo_prefetch_enqueued", serde_json::json!({ "repo_key_rsync_base_uri": task.repo_identity.rsync_base_uri, "rsync_failure_scope_uri": task.rsync_failure_scope_uri, "repo_key_notification_uri": task.repo_identity.notification_uri, "priority": task.priority, "transport_mode": match task.mode { RepoTransportMode::Rrdp => "rrdp", RepoTransportMode::Rsync => "rsync", }, }), ); } TransportRequestAction::Waiting { .. } => { stats.waiting_requests += 1; } TransportRequestAction::ReusedSuccess(_) | TransportRequestAction::ReusedTerminalFailure(_) => { stats.reused_results += 1; } } } self.drain_pending_transport_tasks()?; Ok(stats) } fn transport_prefetch_snapshot(&self) -> TransportPrefetchSnapshot { self.transport_prefetch_recorder .as_ref() .map(|recorder| { recorder .lock() .expect("transport prefetch recorder lock poisoned") .snapshot(self.sync_preference) }) .unwrap_or_else(|| TransportPrefetchSnapshot::new(self.sync_preference, Vec::new())) } } fn outcome_from_transport_result( envelope: &RepoTransportResultEnvelope, state: RepoRuntimeState, ) -> RepoSyncRuntimeOutcome { match (&envelope.result, state) { (RepoTransportResultKind::Success { source, warnings }, RepoRuntimeState::RrdpOk) => { RepoSyncRuntimeOutcome { repo_sync_ok: true, repo_sync_err: None, repo_sync_source: Some(source.clone()), repo_sync_phase: Some("rrdp_ok".to_string()), repo_sync_duration_ms: envelope.timing_ms, warnings: warnings.clone(), } } (RepoTransportResultKind::Success { source, warnings }, RepoRuntimeState::RsyncOk) => { RepoSyncRuntimeOutcome { repo_sync_ok: true, repo_sync_err: None, repo_sync_source: Some(source.clone()), repo_sync_phase: Some(if envelope.repo_identity.notification_uri.is_some() { "rrdp_failed_rsync_ok".to_string() } else { "rsync_only_ok".to_string() }), repo_sync_duration_ms: envelope.timing_ms, warnings: warnings.clone(), } } ( RepoTransportResultKind::Failed { detail, warnings }, RepoRuntimeState::FailedTerminal, ) => RepoSyncRuntimeOutcome { repo_sync_ok: false, repo_sync_err: Some(detail.clone()), repo_sync_source: None, repo_sync_phase: Some(if envelope.repo_identity.notification_uri.is_some() { "rrdp_failed_rsync_failed".to_string() } else { "rsync_failed".to_string() }), repo_sync_duration_ms: envelope.timing_ms, warnings: warnings.clone(), }, _ => RepoSyncRuntimeOutcome { repo_sync_ok: false, repo_sync_err: Some("repo runtime state unresolved".to_string()), repo_sync_source: None, repo_sync_phase: Some("repo_runtime_unresolved".to_string()), repo_sync_duration_ms: envelope.timing_ms, warnings: Vec::new(), }, } } #[cfg(test)] mod tests { use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; use crate::parallel::config::ParallelPhase1Config; use crate::parallel::repo_runtime::{Phase1RepoSyncRuntime, RepoSyncRuntime}; use crate::parallel::repo_worker::{ RepoTransportExecutor, RepoTransportWorkerPool, RepoWorkerPoolConfig, }; use crate::parallel::run_coordinator::GlobalRunCoordinator; use crate::parallel::transport_prefetch::TransportPrefetchSnapshot; use crate::parallel::types::{ RepoRuntimeState, RepoTransportMode, RepoTransportResultEnvelope, RepoTransportResultKind, RepoTransportTask, TalInputSpec, }; use crate::policy::SyncPreference; use crate::report::Warning; use crate::validation::tree::{CaCertificateRef, CaInstanceHandle, DiscoveredChildCaInstance}; fn sample_ca(manifest: &str) -> CaInstanceHandle { CaInstanceHandle { depth: 0, tal_id: "arin".to_string(), parent_manifest_rsync_uri: None, ca_certificate: CaCertificateRef::inline_der(vec![1, 2, 3]), ca_certificate_rsync_uri: None, effective_ip_resources: None, effective_as_resources: None, rsync_base_uri: "rsync://example.test/repo/".to_string(), manifest_rsync_uri: manifest.to_string(), publication_point_rsync_uri: "rsync://example.test/repo/".to_string(), rrdp_notification_uri: Some("https://example.test/notify.xml".to_string()), } } struct SuccessTransportExecutor; impl RepoTransportExecutor for SuccessTransportExecutor { fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope { RepoTransportResultEnvelope { dedup_key: task.dedup_key, rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(), repo_identity: task.repo_identity, mode: task.mode, tal_id: task.tal_id, rir_id: task.rir_id, timing_ms: 7, result: RepoTransportResultKind::Success { source: match task.mode { RepoTransportMode::Rrdp => "rrdp".to_string(), RepoTransportMode::Rsync => "rsync".to_string(), }, warnings: vec![Warning::new("transport ok")], }, } } } struct CountingSuccessTransportExecutor { count: Arc, } impl RepoTransportExecutor for CountingSuccessTransportExecutor { fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope { self.count.fetch_add(1, Ordering::SeqCst); RepoTransportResultEnvelope { dedup_key: task.dedup_key, rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(), repo_identity: task.repo_identity, mode: task.mode, tal_id: task.tal_id, rir_id: task.rir_id, timing_ms: 7, result: RepoTransportResultKind::Success { source: match task.mode { RepoTransportMode::Rrdp => "rrdp".to_string(), RepoTransportMode::Rsync => "rsync".to_string(), }, warnings: vec![Warning::new("transport ok")], }, } } } struct FailRrdpThenSucceedRsyncExecutor { rrdp_count: Arc, rsync_count: Arc, } impl RepoTransportExecutor for FailRrdpThenSucceedRsyncExecutor { fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope { match task.mode { RepoTransportMode::Rrdp => { self.rrdp_count.fetch_add(1, Ordering::SeqCst); RepoTransportResultEnvelope { dedup_key: task.dedup_key, rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(), repo_identity: task.repo_identity, mode: RepoTransportMode::Rrdp, tal_id: task.tal_id, rir_id: task.rir_id, timing_ms: 10, result: RepoTransportResultKind::Failed { detail: "rrdp failed".to_string(), warnings: vec![Warning::new("rrdp failed")], }, } } RepoTransportMode::Rsync => { self.rsync_count.fetch_add(1, Ordering::SeqCst); RepoTransportResultEnvelope { dedup_key: task.dedup_key, rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(), repo_identity: task.repo_identity, mode: RepoTransportMode::Rsync, tal_id: task.tal_id, rir_id: task.rir_id, timing_ms: 12, result: RepoTransportResultKind::Success { source: "rsync".to_string(), warnings: vec![Warning::new("rsync ok")], }, } } } } } struct FailRrdpThenFailRsyncExecutor { rrdp_count: Arc, rsync_count: Arc, } impl RepoTransportExecutor for FailRrdpThenFailRsyncExecutor { fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope { match task.mode { RepoTransportMode::Rrdp => { self.rrdp_count.fetch_add(1, Ordering::SeqCst); RepoTransportResultEnvelope { dedup_key: task.dedup_key, rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(), repo_identity: task.repo_identity, mode: RepoTransportMode::Rrdp, tal_id: task.tal_id, rir_id: task.rir_id, timing_ms: 10, result: RepoTransportResultKind::Failed { detail: "rrdp failed".to_string(), warnings: vec![Warning::new("rrdp failed")], }, } } RepoTransportMode::Rsync => { self.rsync_count.fetch_add(1, Ordering::SeqCst); RepoTransportResultEnvelope { dedup_key: task.dedup_key, rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(), repo_identity: task.repo_identity, mode: RepoTransportMode::Rsync, tal_id: task.tal_id, rir_id: task.rir_id, timing_ms: 12, result: RepoTransportResultKind::Failed { detail: "rsync failed".to_string(), warnings: vec![Warning::new("rsync failed")], }, } } } } } #[test] fn phase1_runtime_waits_for_rrdp_transport_and_returns_rrdp_outcome() { let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, SuccessTransportExecutor, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new( coordinator, pool, Arc::new(|base: &str| base.to_string()), SyncPreference::RrdpThenRsync, ); let outcome = runtime .sync_publication_point_repo(&sample_ca("rsync://example.test/repo/root.mft")) .expect("sync repo"); assert!(outcome.repo_sync_ok); assert_eq!(outcome.repo_sync_source.as_deref(), Some("rrdp")); assert_eq!(outcome.repo_sync_phase.as_deref(), Some("rrdp_ok")); } #[test] fn phase1_runtime_request_repo_returns_pending_then_repo_ready_event() { let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, SuccessTransportExecutor, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new( coordinator, pool, Arc::new(|base: &str| base.to_string()), SyncPreference::RrdpThenRsync, ); let ca = sample_ca("rsync://example.test/repo/root.mft"); let status = runtime .request_publication_point_repo(&ca, 0) .expect("request repo"); let identity = match status { super::RepoSyncRequestStatus::Pending { identity, state } => { assert_eq!(state, RepoRuntimeState::WaitingRrdp); identity } other => panic!("expected pending, got {other:?}"), }; let event = runtime .recv_repo_result_timeout(Duration::from_secs(1)) .expect("repo event") .expect("event"); assert_eq!(event.transport_identity, identity); assert_eq!(event.completions.len(), 1); assert_eq!(event.completions[0].identity, identity); assert_eq!(event.completions[0].state, RepoRuntimeState::RrdpOk); assert!(event.completions[0].outcome.repo_sync_ok); assert_eq!( event.completions[0].outcome.repo_sync_source.as_deref(), Some("rrdp") ); } #[test] fn phase1_runtime_request_repo_reuses_ready_event_result() { let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, SuccessTransportExecutor, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new( coordinator, pool, Arc::new(|base: &str| base.to_string()), SyncPreference::RrdpThenRsync, ); let ca = sample_ca("rsync://example.test/repo/root.mft"); let first = runtime .request_publication_point_repo(&ca, 0) .expect("request repo"); assert!(matches!( first, super::RepoSyncRequestStatus::Pending { .. } )); let _ = runtime .recv_repo_result_timeout(Duration::from_secs(1)) .expect("repo event") .expect("event"); let second = runtime .request_publication_point_repo(&ca, 0) .expect("request repo reused"); match second { super::RepoSyncRequestStatus::Ready { outcome, .. } => { assert!(outcome.repo_sync_ok); assert_eq!(outcome.repo_sync_phase.as_deref(), Some("rrdp_ok")); } other => panic!("expected ready reuse, got {other:?}"), } } #[test] fn phase1_runtime_repo_event_reports_all_finalized_identities_for_shared_rrdp() { let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, SuccessTransportExecutor, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new( coordinator, pool, Arc::new(|base: &str| base.to_string()), SyncPreference::RrdpThenRsync, ); let ca1 = sample_ca("rsync://example.test/repo/root.mft"); let mut ca2 = sample_ca("rsync://example.test/other/root.mft"); ca2.rsync_base_uri = "rsync://example.test/other/".to_string(); ca2.publication_point_rsync_uri = "rsync://example.test/other/".to_string(); let id1 = match runtime .request_publication_point_repo(&ca1, 0) .expect("request first") { super::RepoSyncRequestStatus::Pending { identity, .. } => identity, other => panic!("expected first pending, got {other:?}"), }; let id2 = match runtime .request_publication_point_repo(&ca2, 0) .expect("request second") { super::RepoSyncRequestStatus::Pending { identity, .. } => identity, other => panic!("expected second pending, got {other:?}"), }; assert_ne!(id1, id2); let event = runtime .recv_repo_result_timeout(Duration::from_secs(1)) .expect("repo event") .expect("event"); let mut identities = event .completions .iter() .map(|completion| completion.identity.clone()) .collect::>(); identities.sort_by(|a, b| a.rsync_base_uri.cmp(&b.rsync_base_uri)); let mut expected = vec![id1, id2]; expected.sort_by(|a, b| a.rsync_base_uri.cmp(&b.rsync_base_uri)); assert_eq!(identities, expected); assert!( event .completions .iter() .all(|completion| completion.state == RepoRuntimeState::RrdpOk) ); } #[test] fn phase1_runtime_drains_multiple_ready_transport_events() { let count = Arc::new(AtomicUsize::new(0)); let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 2 }, CountingSuccessTransportExecutor { count: Arc::clone(&count), }, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new( coordinator, pool, Arc::new(|base: &str| base.to_string()), SyncPreference::RrdpThenRsync, ); let ca1 = sample_ca("rsync://example.test/repo/root.mft"); let mut ca2 = sample_ca("rsync://example.net/repo/root.mft"); ca2.rsync_base_uri = "rsync://example.net/repo/".to_string(); ca2.publication_point_rsync_uri = "rsync://example.net/repo/".to_string(); ca2.rrdp_notification_uri = Some("https://example.net/notify.xml".to_string()); assert!(matches!( runtime .request_publication_point_repo(&ca1, 0) .expect("request ca1"), super::RepoSyncRequestStatus::Pending { .. } )); assert!(matches!( runtime .request_publication_point_repo(&ca2, 0) .expect("request ca2"), super::RepoSyncRequestStatus::Pending { .. } )); let started = Instant::now(); while count.load(Ordering::SeqCst) < 2 && started.elapsed() < Duration::from_secs(1) { std::thread::sleep(Duration::from_millis(5)); } assert_eq!(count.load(Ordering::SeqCst), 2); let events = runtime .drain_repo_results_timeout(Duration::from_millis(0), 8) .expect("drain events"); assert_eq!(events.len(), 2); assert_eq!( events .iter() .map(|event| event.completions.len()) .sum::(), 2 ); } #[test] fn phase1_runtime_reset_run_state_clears_completed_transport_reuse() { let count = Arc::new(AtomicUsize::new(0)); let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, CountingSuccessTransportExecutor { count: Arc::clone(&count), }, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new( coordinator, pool, Arc::new(|base: &str| base.to_string()), SyncPreference::RrdpThenRsync, ); let ca = sample_ca("rsync://example.test/repo/root.mft"); assert!(matches!( runtime .request_publication_point_repo(&ca, 0) .expect("first request"), super::RepoSyncRequestStatus::Pending { .. } )); let _ = runtime .recv_repo_result_timeout(Duration::from_secs(1)) .expect("first event") .expect("event"); assert_eq!(count.load(Ordering::SeqCst), 1); assert!(matches!( runtime .request_publication_point_repo(&ca, 0) .expect("ready reuse before reset"), super::RepoSyncRequestStatus::Ready { .. } )); runtime.reset_run_state().expect("reset"); assert!(matches!( runtime .request_publication_point_repo(&ca, 0) .expect("second request after reset"), super::RepoSyncRequestStatus::Pending { .. } )); let _ = runtime .recv_repo_result_timeout(Duration::from_secs(1)) .expect("second event") .expect("event"); assert_eq!(count.load(Ordering::SeqCst), 2); } #[test] fn phase1_runtime_records_prefetch_snapshot_only_when_enabled() { let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, SuccessTransportExecutor, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new_with_failure_scope_and_prefetch_recording( coordinator, pool, Arc::new(|base: &str| base.to_string()), Arc::new(|_base: &str| Some("rsync://example.test/".to_string())), SyncPreference::RrdpThenRsync, true, ); let ca = sample_ca("rsync://example.test/repo/root.mft"); let _ = runtime .request_publication_point_repo(&ca, 0) .expect("request repo"); let snapshot = runtime.transport_prefetch_snapshot(); assert_eq!(snapshot.requests.len(), 1); assert_eq!( snapshot.requests[0].repo_identity.rsync_base_uri, "rsync://example.test/repo/" ); assert_eq!( snapshot.requests[0].rsync_failure_scope_uri.as_deref(), Some("rsync://example.test/") ); let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, SuccessTransportExecutor, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new( coordinator, pool, Arc::new(|base: &str| base.to_string()), SyncPreference::RrdpThenRsync, ); let _ = runtime .request_publication_point_repo(&ca, 0) .expect("request repo without recording"); assert!(runtime.transport_prefetch_snapshot().requests.is_empty()); } #[test] fn phase1_runtime_prefetch_dispatches_and_later_request_waits_on_same_task() { let count = Arc::new(AtomicUsize::new(0)); let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, CountingSuccessTransportExecutor { count: Arc::clone(&count), }, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new_with_failure_scope_and_prefetch_recording( coordinator, pool, Arc::new(|base: &str| base.to_string()), Arc::new(|_base: &str| Some("rsync://example.test/".to_string())), SyncPreference::RrdpThenRsync, true, ); let ca = sample_ca("rsync://example.test/repo/root.mft"); let mut recorder = crate::parallel::transport_prefetch::TransportPrefetchRecorder::default(); recorder.record_registered_request( &super::Phase1RepoSyncRuntime::::build_identity(&ca), &super::Phase1RepoSyncRuntime::::build_requester(&ca), 0, "rsync://example.test/repo/".to_string(), Some("rsync://example.test/".to_string()), SyncPreference::RrdpThenRsync, ); let snapshot = recorder.snapshot(SyncPreference::RrdpThenRsync); let stats = runtime .prefetch_transport_requests(&snapshot, time::OffsetDateTime::UNIX_EPOCH) .expect("prefetch transport requests"); assert_eq!(stats.loaded_requests, 1); assert_eq!(stats.enqueued_tasks, 1); let status = runtime .request_publication_point_repo(&ca, 0) .expect("request after prefetch"); assert!(matches!( status, super::RepoSyncRequestStatus::Pending { state: RepoRuntimeState::WaitingRrdp, .. } )); let _ = runtime .recv_repo_result_timeout(Duration::from_secs(1)) .expect("repo event") .expect("event"); assert_eq!(count.load(Ordering::SeqCst), 1); } #[test] fn phase1_runtime_does_not_persist_prefetch_only_requests() { let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, SuccessTransportExecutor, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new_with_failure_scope_and_prefetch_recording( coordinator, pool, Arc::new(|base: &str| base.to_string()), Arc::new(|_base: &str| Some("rsync://example.test/".to_string())), SyncPreference::RrdpThenRsync, true, ); let snapshot = TransportPrefetchSnapshot::new( SyncPreference::RrdpThenRsync, vec![crate::parallel::transport_prefetch::TransportPrefetchRequest::from_registered_request( &crate::parallel::types::RepoIdentity::new( Some("https://example.test/notify.xml".to_string()), "rsync://example.test/repo/", ), &crate::parallel::types::RepoRequester::with_tal_rir( "arin", "arin", "rsync://example.test/repo/root.mft", "rsync://example.test/repo/", "arin:root", ), 0, "rsync://example.test/repo/".to_string(), Some("rsync://example.test/".to_string()), SyncPreference::RrdpThenRsync, )], ); let stats = runtime .prefetch_transport_requests(&snapshot, time::OffsetDateTime::UNIX_EPOCH) .expect("prefetch transport requests"); assert_eq!(stats.enqueued_tasks, 1); assert!( runtime.transport_prefetch_snapshot().requests.is_empty(), "prefetch-only requests should not be carried forward forever" ); } #[test] fn phase1_runtime_prefetch_skips_requests_when_scope_resolver_differs() { let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, SuccessTransportExecutor, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new_with_failure_scope_and_prefetch_recording( coordinator, pool, Arc::new(|_base: &str| "rsync://example.test/different/".to_string()), Arc::new(|_base: &str| Some("rsync://example.test/".to_string())), SyncPreference::RrdpThenRsync, true, ); let snapshot = TransportPrefetchSnapshot::new( SyncPreference::RrdpThenRsync, vec![crate::parallel::transport_prefetch::TransportPrefetchRequest::from_registered_request( &crate::parallel::types::RepoIdentity::new( Some("https://example.test/notify.xml".to_string()), "rsync://example.test/repo/", ), &crate::parallel::types::RepoRequester::with_tal_rir( "arin", "arin", "rsync://example.test/repo/root.mft", "rsync://example.test/repo/", "arin:root", ), 0, "rsync://example.test/repo/".to_string(), Some("rsync://example.test/".to_string()), SyncPreference::RrdpThenRsync, )], ); let stats = runtime .prefetch_transport_requests(&snapshot, time::OffsetDateTime::UNIX_EPOCH) .expect("prefetch transport requests"); assert_eq!(stats.loaded_requests, 1); assert_eq!(stats.enqueued_tasks, 0); assert_eq!(stats.skipped_incompatible, 1); } #[test] fn phase1_runtime_transitions_rrdp_failure_to_rsync_success() { let rrdp_count = Arc::new(AtomicUsize::new(0)); let rsync_count = Arc::new(AtomicUsize::new(0)); let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, FailRrdpThenSucceedRsyncExecutor { rrdp_count: Arc::clone(&rrdp_count), rsync_count: Arc::clone(&rsync_count), }, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new( coordinator, pool, Arc::new(|_base: &str| "rsync://example.test/module/".to_string()), SyncPreference::RrdpThenRsync, ); let outcome = runtime .sync_publication_point_repo(&sample_ca("rsync://example.test/repo/root.mft")) .expect("sync repo"); assert!(outcome.repo_sync_ok); assert_eq!(outcome.repo_sync_source.as_deref(), Some("rsync")); assert_eq!( outcome.repo_sync_phase.as_deref(), Some("rrdp_failed_rsync_ok") ); assert_eq!(rrdp_count.load(Ordering::SeqCst), 1); assert_eq!(rsync_count.load(Ordering::SeqCst), 1); } #[test] fn phase1_runtime_terminal_failure_keeps_rsync_failure_duration() { let rrdp_count = Arc::new(AtomicUsize::new(0)); let rsync_count = Arc::new(AtomicUsize::new(0)); let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, FailRrdpThenFailRsyncExecutor { rrdp_count: Arc::clone(&rrdp_count), rsync_count: Arc::clone(&rsync_count), }, ) .expect("pool"); let runtime = Phase1RepoSyncRuntime::new( coordinator, pool, Arc::new(|_base: &str| "rsync://example.test/module/".to_string()), SyncPreference::RrdpThenRsync, ); let outcome = runtime .sync_publication_point_repo(&sample_ca("rsync://example.test/repo/root.mft")) .expect("sync repo"); assert!(!outcome.repo_sync_ok); assert_eq!( outcome.repo_sync_phase.as_deref(), Some("rrdp_failed_rsync_failed") ); assert_eq!(outcome.repo_sync_duration_ms, 12); assert_eq!(rrdp_count.load(Ordering::SeqCst), 1); assert_eq!(rsync_count.load(Ordering::SeqCst), 1); } #[test] fn phase1_runtime_prefetch_submits_transport_task_before_consumption() { let rrdp_count = Arc::new(AtomicUsize::new(0)); let rsync_count = Arc::new(AtomicUsize::new(0)); let coordinator = GlobalRunCoordinator::new( ParallelPhase1Config::default(), vec![TalInputSpec::from_url("https://example.test/arin.tal")], ); let pool = RepoTransportWorkerPool::new( RepoWorkerPoolConfig { max_workers: 1 }, FailRrdpThenSucceedRsyncExecutor { rrdp_count: Arc::clone(&rrdp_count), rsync_count: Arc::clone(&rsync_count), }, ) .expect("pool"); let runtime = Arc::new(Phase1RepoSyncRuntime::new( coordinator, pool, Arc::new(|_base: &str| "rsync://example.test/module/".to_string()), SyncPreference::RrdpThenRsync, )); let child = DiscoveredChildCaInstance { handle: sample_ca("rsync://example.test/repo/child.mft"), discovered_from: crate::audit::DiscoveredFrom { parent_manifest_rsync_uri: "rsync://example.test/repo/root.mft".to_string(), child_ca_certificate_rsync_uri: "rsync://example.test/repo/child.cer".to_string(), child_ca_certificate_sha256_hex: "00".repeat(32), }, child_entry_projection: None, }; runtime .prefetch_discovered_children(std::slice::from_ref(&child)) .expect("prefetch"); let started = Instant::now(); while rrdp_count.load(Ordering::SeqCst) == 0 && started.elapsed() < Duration::from_secs(1) { std::thread::sleep(Duration::from_millis(10)); } assert_eq!(rrdp_count.load(Ordering::SeqCst), 1); let outcome = runtime .sync_publication_point_repo(&child.handle) .expect("sync child repo"); assert!(outcome.repo_sync_ok); assert_eq!(rsync_count.load(Ordering::SeqCst), 1); } }