From 9dc69e5f310a91c8e73d56b6fdb51b08c3ee66b8 Mon Sep 17 00:00:00 2001 From: "xiuting.xu" Date: Wed, 13 May 2026 14:08:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=86=85=E5=AD=98=E4=BC=98=E5=8C=96=EF=BC=8C?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=9A=8F=E7=9D=80=E6=9B=B4=E6=96=B0ccr?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E5=A2=9E=E5=A4=9A=EF=BC=8C=E5=86=85=E5=AD=98?= =?UTF-8?q?=E5=A2=9E=E9=95=BF=E7=9A=84=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main_rtr.rs | 14 ++++--- src/rtr/cache/core.rs | 19 ++++++--- src/rtr/cache/mod.rs | 3 +- src/rtr/cache/model.rs | 88 +++++++++++++++++++++++++-------------- src/rtr/cache/ordering.rs | 18 ++++++++ src/rtr/cache/store.rs | 32 +++++++------- src/rtr/session.rs | 12 +++--- src/slurm/file.rs | 23 ++++++++++ src/source/pipeline.rs | 2 +- tests/test_session.rs | 6 +++ 10 files changed, 152 insertions(+), 65 deletions(-) diff --git a/src/main_rtr.rs b/src/main_rtr.rs index bf5f72a..07fc8ad 100644 --- a/src/main_rtr.rs +++ b/src/main_rtr.rs @@ -9,7 +9,7 @@ use chrono::{FixedOffset, Utc}; use tokio::task::JoinHandle; use tracing::{info, warn}; -use rpki::rtr::cache::{RtrCache, SharedRtrCache}; +use rpki::rtr::cache::{RtrCache, SharedRtrCache, Snapshot}; use rpki::rtr::payload::Timing; use rpki::rtr::server::ssh::SshAuthMode; use rpki::rtr::server::{RtrNotifier, RtrService, RtrServiceConfig, RunningRtrService}; @@ -431,13 +431,14 @@ fn spawn_refresh_task( match load_payloads_from_latest_sources(&payload_load_config) { Ok(payloads) => { - let payload_count = payloads.len(); - let updated = { + let (payload_count, updated) = { + let payload_count = payloads.len(); + let source_snapshot = Snapshot::from_payloads(payloads); let old_cache = shared_cache.load_full(); let old_serial = old_cache.serial_for_version(2); let mut next_cache = old_cache.as_ref().clone(); - match next_cache.update(payloads, &store) { + let updated = match next_cache.update_with_snapshot(source_snapshot, &store) { Ok(()) => { let new_serial = next_cache.serial_for_version(2); shared_cache.store(std::sync::Arc::new(next_cache)); @@ -462,7 +463,8 @@ fn spawn_refresh_task( warn!("RTR cache update failed: {:?}", err); false } - } + }; + (payload_count, updated) }; info!( "RTR source-to-delta timing: phase=refresh_cache_update_complete, ccr_dir={}, payload_count={}, changed={}, elapsed_ms={}", @@ -474,7 +476,7 @@ fn spawn_refresh_task( if updated { notifier.notify_cache_updated(); - info!("RTR cache updated, serial notify broadcast sent"); + info!("RTR cache updated, notify signal emitted (session may skip SerialNotify due to rate limit)"); } } Err(err) => { diff --git a/src/rtr/cache/core.rs b/src/rtr/cache/core.rs index 0101bac..4044356 100644 --- a/src/rtr/cache/core.rs +++ b/src/rtr/cache/core.rs @@ -314,16 +314,24 @@ impl RtrCache { pub(super) fn apply_update( &mut self, new_payloads: Vec, + ) -> Result> { + let source_snapshot = Snapshot::from_payloads(new_payloads); + self.apply_update_from_snapshot(source_snapshot) + } + + pub(super) fn apply_update_from_snapshot( + &mut self, + source_snapshot: Snapshot, ) -> Result> { self.last_update_begin = DualTime::now(); info!( - "RTR cache applying update: availability={:?}, current_serials={:?}, incoming_payloads={}", + "RTR cache applying update: availability={:?}, current_serials={:?}, incoming_snapshot_sizes=(origins={}, router_keys={}, aspas={})", self.availability, self.serials(), - new_payloads.len() + source_snapshot.origins().len(), + source_snapshot.router_keys().len(), + source_snapshot.aspas().len() ); - - let source_snapshot = Snapshot::from_payloads(new_payloads); if source_snapshot.is_empty() { let changed = self.availability != CacheAvailability::NoDataAvailable || self.versions.iter().any(|state| !state.snapshot.is_empty()) @@ -598,7 +606,7 @@ fn estimate_delta_window_payload_wire_size(deltas: &VecDeque>) -> usi fn estimate_delta_wire_size(delta: &Delta) -> usize { delta - .payloads_for_rtr() + .payload_updates_for_rtr() .iter() .map(|(announce, payload)| estimate_payload_wire_size(payload, *announce)) .sum() @@ -643,6 +651,7 @@ pub enum SerialResult { ResetRequired, } +#[derive(Clone)] pub(super) struct AppliedUpdate { pub(super) availability: CacheAvailability, pub(super) snapshots: [Arc; VERSION_COUNT], diff --git a/src/rtr/cache/mod.rs b/src/rtr/cache/mod.rs index 9affc59..d83b99f 100644 --- a/src/rtr/cache/mod.rs +++ b/src/rtr/cache/mod.rs @@ -6,7 +6,8 @@ mod store; pub use core::{CacheAvailability, RtrCache, RtrCacheBuilder, SerialResult, SessionIds}; pub use model::{Delta, DualTime, Snapshot}; pub use ordering::{ - OrderingViolation, validate_payload_updates_for_rtr, validate_payloads_for_rtr, + OrderingViolation, validate_payload_update_refs_for_rtr, validate_payload_updates_for_rtr, + validate_payloads_for_rtr, }; use std::sync::Arc; diff --git a/src/rtr/cache/model.rs b/src/rtr/cache/model.rs index 6aecc1d..620a686 100644 --- a/src/rtr/cache/model.rs +++ b/src/rtr/cache/model.rs @@ -121,25 +121,13 @@ impl Snapshot { } pub fn from_payloads(payloads: Vec) -> Self { - let mut origins = Vec::new(); - let mut router_keys = Vec::new(); - let mut aspas = Vec::new(); + let mut builder = SnapshotBuilder::new(); + builder.extend(payloads); + builder.finish() + } - for p in payloads { - match p { - Payload::RouteOrigin(o) => { - origins.push(o); - } - Payload::RouterKey(k) => { - router_keys.push(k); - } - Payload::Aspa(a) => { - aspas.push(a); - } - } - } - - Snapshot::new(origins, router_keys, normalize_aspas(aspas)) + pub fn builder() -> SnapshotBuilder { + SnapshotBuilder::new() } pub fn project_for_version(&self, version: u8) -> Self { @@ -314,14 +302,46 @@ impl Snapshot { } } +#[derive(Debug, Default)] +pub struct SnapshotBuilder { + origins: Vec, + router_keys: Vec, + aspas: Vec, +} + +impl SnapshotBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn push(&mut self, payload: Payload) { + match payload { + Payload::RouteOrigin(o) => self.origins.push(o), + Payload::RouterKey(k) => self.router_keys.push(k), + Payload::Aspa(a) => self.aspas.push(a), + } + } + + pub fn extend(&mut self, payloads: I) + where + I: IntoIterator, + { + for payload in payloads { + self.push(payload); + } + } + + pub fn finish(self) -> Snapshot { + Snapshot::new(self.origins, self.router_keys, normalize_aspas(self.aspas)) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct Delta { serial: u32, announced: Vec, withdrawn: Vec, created_at: DualTime, - #[serde(skip)] - updates_for_rtr: OnceLock>, } impl Delta { @@ -331,16 +351,12 @@ impl Delta { sort_payloads_for_rtr(&mut announced, true); sort_payloads_for_rtr(&mut withdrawn, false); - let cached_updates = build_payload_updates_for_rtr(&announced, &withdrawn); - let updates_for_rtr = OnceLock::new(); - let _ = updates_for_rtr.set(cached_updates); Delta { serial, announced, withdrawn, created_at: DualTime::now(), - updates_for_rtr, } } @@ -364,15 +380,25 @@ impl Delta { self.announced.is_empty() && self.withdrawn.is_empty() } - pub fn payload_updates_for_rtr(&self) -> &[(bool, Payload)] { - self.updates_for_rtr - .get_or_init(|| build_payload_updates_for_rtr(&self.announced, &self.withdrawn)) - .as_slice() + pub fn payload_updates_for_rtr(&self) -> Vec<(bool, &Payload)> { + build_payload_updates_for_rtr_refs(&self.announced, &self.withdrawn) } +} - pub fn payloads_for_rtr(&self) -> Vec<(bool, Payload)> { - self.payload_updates_for_rtr().to_vec() - } +fn build_payload_updates_for_rtr_refs<'a>( + announced: &'a [Payload], + withdrawn: &'a [Payload], +) -> Vec<(bool, &'a Payload)> { + let mut updates = Vec::with_capacity(announced.len() + withdrawn.len()); + + updates.extend(announced.iter().map(|p| (true, p))); + updates.extend(withdrawn.iter().map(|p| (false, p))); + + updates.sort_by(|(a_upd, a_payload), (b_upd, b_payload)| { + compare_payload_update_for_rtr(a_payload, *a_upd, b_payload, *b_upd) + }); + + updates } fn build_payload_updates_for_rtr( diff --git a/src/rtr/cache/ordering.rs b/src/rtr/cache/ordering.rs index 08d2879..c67675c 100644 --- a/src/rtr/cache/ordering.rs +++ b/src/rtr/cache/ordering.rs @@ -150,6 +150,24 @@ pub fn validate_payload_updates_for_rtr( Ok(()) } +pub fn validate_payload_update_refs_for_rtr( + updates: &[(bool, &Payload)], +) -> Result<(), OrderingViolation> { + for (index, pair) in updates.windows(2).enumerate() { + if compare_payload_update_for_rtr(pair[0].1, pair[0].0, pair[1].1, pair[1].0) + == Ordering::Greater + { + return Err(OrderingViolation::new_update( + index, + (pair[0].0, pair[0].1), + (pair[1].0, pair[1].1), + )); + } + } + + Ok(()) +} + fn router_key_key(rk: &RouterKey) -> RouterKeyKey { RouterKeyKey::Key { ski: rk.ski(), diff --git a/src/rtr/cache/store.rs b/src/rtr/cache/store.rs index 6d4d3f9..8ecbdfd 100644 --- a/src/rtr/cache/store.rs +++ b/src/rtr/cache/store.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use std::sync::{Arc, OnceLock}; use anyhow::Result; -use tokio::sync::mpsc::{UnboundedSender, unbounded_channel}; +use tokio::sync::watch; use crate::rtr::payload::{Payload, Timing}; use crate::rtr::store::RtrStore; @@ -11,8 +11,9 @@ use super::core::{AppliedUpdate, CacheAvailability, RtrCache, RtrCacheBuilder, S use super::model::{Delta, Snapshot}; const VERSION_COUNT: usize = 3; -static STORE_SYNC_WORKER: OnceLock> = OnceLock::new(); +static STORE_SYNC_WORKER: OnceLock>> = OnceLock::new(); +#[derive(Clone)] struct StoreSyncJob { store: RtrStore, update: AppliedUpdate, @@ -98,6 +99,13 @@ impl RtrCache { } Ok(()) } + + pub fn update_with_snapshot(&mut self, snapshot: Snapshot, store: &RtrStore) -> Result<()> { + if let Some(update) = self.apply_update_from_snapshot(snapshot)? { + spawn_store_sync(store, update); + } + Ok(()) + } } fn try_restore_from_store( @@ -172,28 +180,22 @@ fn try_restore_from_store( fn spawn_store_sync(store: &RtrStore, update: AppliedUpdate) { let tx = STORE_SYNC_WORKER.get_or_init(|| { - let (tx, mut rx) = unbounded_channel::(); + let (tx, mut rx) = watch::channel::>(None); tokio::spawn(async move { - while let Some(mut job) = rx.recv().await { - while let Ok(next) = rx.try_recv() { - job = next; - } + while rx.changed().await.is_ok() { + let Some(job) = rx.borrow().clone() else { + continue; + }; persist_update_job(job); } }); tx }); - if let Err(err) = tx.send(StoreSyncJob { + let _ = tx.send_replace(Some(StoreSyncJob { store: store.clone(), update, - }) { - tracing::warn!( - "store sync worker channel closed, falling back to inline persist: {:?}", - err - ); - persist_update_job(err.0); - } + })); } fn persist_update_job(job: StoreSyncJob) { diff --git a/src/rtr/session.rs b/src/rtr/session.rs index fc35648..a34d27e 100644 --- a/src/rtr/session.rs +++ b/src/rtr/session.rs @@ -12,7 +12,7 @@ use tracing::{debug, error, info, warn}; use crate::data_model::resources::ip_resources::IPAddress; use crate::rtr::cache::{ - Delta, SerialResult, SharedRtrCache, validate_payload_updates_for_rtr, + Delta, SerialResult, SharedRtrCache, validate_payload_update_refs_for_rtr, validate_payloads_for_rtr, }; use crate::rtr::error_type::ErrorCode; @@ -834,8 +834,8 @@ where let now = Instant::now(); if let Some(last) = self.last_notify_at { if now.duration_since(last) < NOTIFY_MIN_INTERVAL { - debug!( - "RTR session notify skipped due to rate limit: state={}, negotiated_version={:?}", + info!( + "RTR session skipped SerialNotify due to rate limit: state={}, negotiated_version={:?}", self.state_name(), self.version ); @@ -1005,7 +1005,7 @@ where // References: // https://datatracker.ietf.org/doc/html/draft-ietf-sidrops-8210bis-25#section-11.4 // https://datatracker.ietf.org/doc/html/draft-ietf-sidrops-8210bis-25#section-12 - validate_payload_updates_for_rtr(&updates).map_err(|err| anyhow!(err.to_string()))?; + validate_payload_update_refs_for_rtr(&updates).map_err(|err| anyhow!(err.to_string()))?; let version = self.version()?; let (announced, withdrawn, route_origins, router_keys, aspas) = count_payload_updates(&updates); @@ -1019,7 +1019,7 @@ where aspas ); let mut writer = BufWriter::new(&mut self.stream); - for (announce, payload) in updates { + for (announce, payload) in &updates { Self::send_payload_to(&mut writer, payload, *announce, version).await?; } writer.flush().await?; @@ -1344,7 +1344,7 @@ fn count_payloads(payloads: &[Payload]) -> (usize, usize, usize) { (route_origins, router_keys, aspas) } -fn count_payload_updates(updates: &[(bool, Payload)]) -> (usize, usize, usize, usize, usize) { +fn count_payload_updates(updates: &[(bool, &Payload)]) -> (usize, usize, usize, usize, usize) { let mut announced = 0; let mut withdrawn = 0; let mut route_origins = 0; diff --git a/src/slurm/file.rs b/src/slurm/file.rs index 335a01a..3c3c10b 100644 --- a/src/slurm/file.rs +++ b/src/slurm/file.rs @@ -91,6 +91,29 @@ impl SlurmFile { result } + pub fn apply_owned(&self, payloads: Vec) -> Vec { + let mut seen = BTreeSet::new(); + let mut result = Vec::new(); + + for payload in payloads { + if self.validation_output_filters.matches(&payload) { + continue; + } + + if seen.insert(payload.clone()) { + result.push(payload); + } + } + + for assertion in self.locally_added_assertions.to_payloads() { + if seen.insert(assertion.clone()) { + result.push(assertion); + } + } + + result + } + pub fn merge_named(files: Vec<(String, SlurmFile)>) -> Result { if files.is_empty() { return Err(SlurmError::Invalid( diff --git a/src/source/pipeline.rs b/src/source/pipeline.rs index a82ac41..d043e07 100644 --- a/src/source/pipeline.rs +++ b/src/source/pipeline.rs @@ -81,7 +81,7 @@ fn apply_slurm_to_payloads_from_dir( .map_err(|err| anyhow!("failed to merge SLURM files from '{}': {}", slurm_dir, err))?; let input_count = payloads.len(); - let filtered = slurm.apply(&payloads); + let filtered = slurm.apply_owned(payloads); let output_count = filtered.len(); info!( diff --git a/tests/test_session.rs b/tests/test_session.rs index 3729e0b..e1b1b85 100644 --- a/tests/test_session.rs +++ b/tests/test_session.rs @@ -515,6 +515,12 @@ async fn serial_query_returns_end_of_data_when_up_to_date() { let mut dump = RtrDebugDumper::new(); + let response = CacheResponse::read(&mut client).await.unwrap(); + dump.push_value(response.pdu(), dump_cache_response(&response)); + assert_eq!(response.pdu(), 3); + assert_eq!(response.version(), 1); + assert_eq!(response.session_id(), 42); + let eod = EndOfDataV1::read(&mut client).await.unwrap(); dump.push_value(eod.pdu(), dump_eod_v1(&eod)); assert_eq!(eod.pdu(), 7);