Memory optimization: fix the memory growth seen as updated ccr files accumulate

xiuting.xu 2026-05-13 14:08:07 +08:00
parent ff711a2fe1
commit 9dc69e5f31
10 changed files with 152 additions and 65 deletions

View File

@@ -9,7 +9,7 @@ use chrono::{FixedOffset, Utc};
 use tokio::task::JoinHandle;
 use tracing::{info, warn};
-use rpki::rtr::cache::{RtrCache, SharedRtrCache};
+use rpki::rtr::cache::{RtrCache, SharedRtrCache, Snapshot};
 use rpki::rtr::payload::Timing;
 use rpki::rtr::server::ssh::SshAuthMode;
 use rpki::rtr::server::{RtrNotifier, RtrService, RtrServiceConfig, RunningRtrService};
@@ -431,13 +431,14 @@ fn spawn_refresh_task(
 match load_payloads_from_latest_sources(&payload_load_config) {
 Ok(payloads) => {
-let (payload_count, updated) = {
 let payload_count = payloads.len();
-let updated = {
+let source_snapshot = Snapshot::from_payloads(payloads);
 let old_cache = shared_cache.load_full();
 let old_serial = old_cache.serial_for_version(2);
 let mut next_cache = old_cache.as_ref().clone();
-match next_cache.update(payloads, &store) {
+let updated = match next_cache.update_with_snapshot(source_snapshot, &store) {
 Ok(()) => {
 let new_serial = next_cache.serial_for_version(2);
 shared_cache.store(std::sync::Arc::new(next_cache));
@@ -462,7 +463,8 @@ fn spawn_refresh_task(
 warn!("RTR cache update failed: {:?}", err);
 false
 }
-}
-(payload_count, updated)
+};
 };
 info!(
 "RTR source-to-delta timing: phase=refresh_cache_update_complete, ccr_dir={}, payload_count={}, changed={}, elapsed_ms={}",
@@ -474,7 +476,7 @@ fn spawn_refresh_task(
 if updated {
 notifier.notify_cache_updated();
-info!("RTR cache updated, serial notify broadcast sent");
+info!("RTR cache updated, notify signal emitted (session may skip SerialNotify due to rate limit)");
 }
 }
 Err(err) => {

src/rtr/cache/core.rs
View File

@@ -314,16 +314,24 @@ impl RtrCache {
 pub(super) fn apply_update(
 &mut self,
 new_payloads: Vec<Payload>,
+) -> Result<Option<AppliedUpdate>> {
+let source_snapshot = Snapshot::from_payloads(new_payloads);
+self.apply_update_from_snapshot(source_snapshot)
+}
+pub(super) fn apply_update_from_snapshot(
+&mut self,
+source_snapshot: Snapshot,
 ) -> Result<Option<AppliedUpdate>> {
 self.last_update_begin = DualTime::now();
 info!(
-"RTR cache applying update: availability={:?}, current_serials={:?}, incoming_payloads={}",
+"RTR cache applying update: availability={:?}, current_serials={:?}, incoming_snapshot_sizes=(origins={}, router_keys={}, aspas={})",
 self.availability,
 self.serials(),
-new_payloads.len()
+source_snapshot.origins().len(),
+source_snapshot.router_keys().len(),
+source_snapshot.aspas().len()
 );
-let source_snapshot = Snapshot::from_payloads(new_payloads);
 if source_snapshot.is_empty() {
 let changed = self.availability != CacheAvailability::NoDataAvailable
 || self.versions.iter().any(|state| !state.snapshot.is_empty())
@@ -598,7 +606,7 @@ fn estimate_delta_window_payload_wire_size(deltas: &VecDeque<Arc<Delta>>) -> usi
 fn estimate_delta_wire_size(delta: &Delta) -> usize {
 delta
-.payloads_for_rtr()
+.payload_updates_for_rtr()
 .iter()
 .map(|(announce, payload)| estimate_payload_wire_size(payload, *announce))
 .sum()
@@ -643,6 +651,7 @@ pub enum SerialResult {
 ResetRequired,
 }
+#[derive(Clone)]
 pub(super) struct AppliedUpdate {
 pub(super) availability: CacheAvailability,
 pub(super) snapshots: [Arc<Snapshot>; VERSION_COUNT],

View File

@@ -6,7 +6,8 @@ mod store;
 pub use core::{CacheAvailability, RtrCache, RtrCacheBuilder, SerialResult, SessionIds};
 pub use model::{Delta, DualTime, Snapshot};
 pub use ordering::{
-OrderingViolation, validate_payload_updates_for_rtr, validate_payloads_for_rtr,
+OrderingViolation, validate_payload_update_refs_for_rtr, validate_payload_updates_for_rtr,
+validate_payloads_for_rtr,
 };
 use std::sync::Arc;

View File

@@ -121,25 +121,13 @@ impl Snapshot {
 }
 pub fn from_payloads(payloads: Vec<Payload>) -> Self {
-let mut origins = Vec::new();
-let mut router_keys = Vec::new();
-let mut aspas = Vec::new();
-for p in payloads {
-match p {
-Payload::RouteOrigin(o) => {
-origins.push(o);
-}
-Payload::RouterKey(k) => {
-router_keys.push(k);
-}
-Payload::Aspa(a) => {
-aspas.push(a);
-}
-}
-}
-Snapshot::new(origins, router_keys, normalize_aspas(aspas))
+let mut builder = SnapshotBuilder::new();
+builder.extend(payloads);
+builder.finish()
 }
+pub fn builder() -> SnapshotBuilder {
+SnapshotBuilder::new()
+}
 pub fn project_for_version(&self, version: u8) -> Self {
@@ -314,14 +302,46 @@ impl Snapshot {
 }
 }
+#[derive(Debug, Default)]
+pub struct SnapshotBuilder {
+origins: Vec<RouteOrigin>,
+router_keys: Vec<RouterKey>,
+aspas: Vec<Aspa>,
+}
+impl SnapshotBuilder {
+pub fn new() -> Self {
+Self::default()
+}
+pub fn push(&mut self, payload: Payload) {
+match payload {
+Payload::RouteOrigin(o) => self.origins.push(o),
+Payload::RouterKey(k) => self.router_keys.push(k),
+Payload::Aspa(a) => self.aspas.push(a),
+}
+}
+pub fn extend<I>(&mut self, payloads: I)
+where
+I: IntoIterator<Item = Payload>,
+{
+for payload in payloads {
+self.push(payload);
+}
+}
+pub fn finish(self) -> Snapshot {
+Snapshot::new(self.origins, self.router_keys, normalize_aspas(self.aspas))
+}
+}
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Delta {
 serial: u32,
 announced: Vec<Payload>,
 withdrawn: Vec<Payload>,
 created_at: DualTime,
-#[serde(skip)]
-updates_for_rtr: OnceLock<Vec<(bool, Payload)>>,
 }
 impl Delta {
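The builder added above lets a caller feed payloads into a snapshot incrementally instead of first collecting them into a flat Vec<Payload>. A minimal sketch against the new API; read_ccr_payloads is a hypothetical iterator standing in for whatever yields Payload values from a ccr file:

let mut builder = Snapshot::builder();
for payload in read_ccr_payloads(path)? {
    builder.push(payload); // hypothetical streaming source, consumed one payload at a time
}
// finish() groups origins, router keys and ASPAs and normalizes the ASPAs,
// which is exactly what Snapshot::from_payloads now does through the same builder.
let snapshot = builder.finish();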
@@ -331,16 +351,12 @@ impl Delta {
 sort_payloads_for_rtr(&mut announced, true);
 sort_payloads_for_rtr(&mut withdrawn, false);
-let cached_updates = build_payload_updates_for_rtr(&announced, &withdrawn);
-let updates_for_rtr = OnceLock::new();
-let _ = updates_for_rtr.set(cached_updates);
 Delta {
 serial,
 announced,
 withdrawn,
 created_at: DualTime::now(),
-updates_for_rtr,
 }
 }
@@ -364,15 +380,25 @@ impl Delta {
 self.announced.is_empty() && self.withdrawn.is_empty()
 }
-pub fn payload_updates_for_rtr(&self) -> &[(bool, Payload)] {
-self.updates_for_rtr
-.get_or_init(|| build_payload_updates_for_rtr(&self.announced, &self.withdrawn))
-.as_slice()
+pub fn payload_updates_for_rtr(&self) -> Vec<(bool, &Payload)> {
+build_payload_updates_for_rtr_refs(&self.announced, &self.withdrawn)
 }
-pub fn payloads_for_rtr(&self) -> Vec<(bool, Payload)> {
-self.payload_updates_for_rtr().to_vec()
-}
+}
+fn build_payload_updates_for_rtr_refs<'a>(
+announced: &'a [Payload],
+withdrawn: &'a [Payload],
+) -> Vec<(bool, &'a Payload)> {
+let mut updates = Vec::with_capacity(announced.len() + withdrawn.len());
+updates.extend(announced.iter().map(|p| (true, p)));
+updates.extend(withdrawn.iter().map(|p| (false, p)));
+updates.sort_by(|(a_upd, a_payload), (b_upd, b_payload)| {
+compare_payload_update_for_rtr(a_payload, *a_upd, b_payload, *b_upd)
+});
+updates
 }
 fn build_payload_updates_for_rtr(
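Before this change, every Delta kept in the delta window also carried, in a OnceLock, an owned Vec<(bool, Payload)> — a second copy of every announced and withdrawn payload — for as long as the delta stayed in the window. payload_updates_for_rtr now builds a list of borrowed pairs on demand, so the extra allocation only lives while a session is actually streaming the delta. A rough caller sketch; send_payload is a hypothetical stand-in for the session's send path:

let updates: Vec<(bool, &Payload)> = delta.payload_updates_for_rtr();
for (announce, payload) in &updates {
    send_payload(payload, *announce)?; // stream each update; only small (bool, &Payload) pairs were allocated
}
// dropping `updates` frees just the reference pairs; the payloads stay owned by the Delta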

View File

@@ -150,6 +150,24 @@ pub fn validate_payload_updates_for_rtr(
 Ok(())
 }
+pub fn validate_payload_update_refs_for_rtr(
+updates: &[(bool, &Payload)],
+) -> Result<(), OrderingViolation> {
+for (index, pair) in updates.windows(2).enumerate() {
+if compare_payload_update_for_rtr(pair[0].1, pair[0].0, pair[1].1, pair[1].0)
+== Ordering::Greater
+{
+return Err(OrderingViolation::new_update(
+index,
+(pair[0].0, pair[0].1),
+(pair[1].0, pair[1].1),
+));
+}
+}
+Ok(())
+}
 fn router_key_key(rk: &RouterKey) -> RouterKeyKey {
 RouterKeyKey::Key {
 ski: rk.ski(),
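The refs variant mirrors validate_payload_updates_for_rtr but works over the borrowed pairs that Delta::payload_updates_for_rtr now returns: windows(2) walks adjacent entries and flags the first pair that compares as Ordering::Greater. A sketch of how the two pieces are meant to fit together on the session side (error conversion elided):

let updates = delta.payload_updates_for_rtr();   // Vec<(bool, &Payload)>
validate_payload_update_refs_for_rtr(&updates)?; // rejects any adjacent pair that is out of RTR order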

View File

@@ -2,7 +2,7 @@ use std::collections::VecDeque;
 use std::sync::{Arc, OnceLock};
 use anyhow::Result;
-use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
+use tokio::sync::watch;
 use crate::rtr::payload::{Payload, Timing};
 use crate::rtr::store::RtrStore;
@@ -11,8 +11,9 @@ use super::core::{AppliedUpdate, CacheAvailability, RtrCache, RtrCacheBuilder, S
 use super::model::{Delta, Snapshot};
 const VERSION_COUNT: usize = 3;
-static STORE_SYNC_WORKER: OnceLock<UnboundedSender<StoreSyncJob>> = OnceLock::new();
+static STORE_SYNC_WORKER: OnceLock<watch::Sender<Option<StoreSyncJob>>> = OnceLock::new();
+#[derive(Clone)]
 struct StoreSyncJob {
 store: RtrStore,
 update: AppliedUpdate,
@@ -98,6 +99,13 @@ impl RtrCache {
 }
 Ok(())
 }
+pub fn update_with_snapshot(&mut self, snapshot: Snapshot, store: &RtrStore) -> Result<()> {
+if let Some(update) = self.apply_update_from_snapshot(snapshot)? {
+spawn_store_sync(store, update);
+}
+Ok(())
+}
 }
 fn try_restore_from_store(
@@ -172,28 +180,22 @@ fn try_restore_from_store(
 fn spawn_store_sync(store: &RtrStore, update: AppliedUpdate) {
 let tx = STORE_SYNC_WORKER.get_or_init(|| {
-let (tx, mut rx) = unbounded_channel::<StoreSyncJob>();
+let (tx, mut rx) = watch::channel::<Option<StoreSyncJob>>(None);
 tokio::spawn(async move {
-while let Some(mut job) = rx.recv().await {
-while let Ok(next) = rx.try_recv() {
-job = next;
-}
+while rx.changed().await.is_ok() {
+let Some(job) = rx.borrow().clone() else {
+continue;
+};
 persist_update_job(job);
 }
 });
 tx
 });
-if let Err(err) = tx.send(StoreSyncJob {
+let _ = tx.send_replace(Some(StoreSyncJob {
 store: store.clone(),
 update,
-}) {
-tracing::warn!(
-"store sync worker channel closed, falling back to inline persist: {:?}",
-err
-);
-persist_update_job(err.0);
-}
+}));
 }
 fn persist_update_job(job: StoreSyncJob) {
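The mpsc queue this replaces was unbounded: if persisting to the RtrStore fell behind the refresh loop, one StoreSyncJob per refresh piled up, and every queued job kept the snapshots of an older cache generation alive through its AppliedUpdate. A watch channel holds at most one pending value — send_replace overwrites whatever has not been persisted yet, and the worker clones only the latest job (which is why StoreSyncJob and AppliedUpdate now derive Clone). A self-contained sketch of the coalescing pattern, independent of the types above (Job is a made-up stand-in):

use std::time::Duration;
use tokio::sync::watch;

#[derive(Clone, Debug)]
struct Job(u64);

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel::<Option<Job>>(None);
    tokio::spawn(async move {
        // Consumer: wakes when the value changes and only ever sees the latest one.
        while rx.changed().await.is_ok() {
            let Some(job) = rx.borrow_and_update().clone() else { continue };
            println!("persisting {:?}", job);
        }
    });
    for i in 0..100u64 {
        // Producer: a burst of sends collapses; older unpersisted jobs are simply replaced.
        tx.send_replace(Some(Job(i)));
    }
    tokio::time::sleep(Duration::from_millis(50)).await; // give the consumer a chance to drain the last value
}

With the original unbounded queue the consumer would eventually have to persist all 100 jobs; here it typically persists only the last one, plus whichever intermediate values it happens to catch.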

View File

@@ -12,7 +12,7 @@ use tracing::{debug, error, info, warn};
 use crate::data_model::resources::ip_resources::IPAddress;
 use crate::rtr::cache::{
-Delta, SerialResult, SharedRtrCache, validate_payload_updates_for_rtr,
+Delta, SerialResult, SharedRtrCache, validate_payload_update_refs_for_rtr,
 validate_payloads_for_rtr,
 };
 use crate::rtr::error_type::ErrorCode;
@@ -834,8 +834,8 @@ where
 let now = Instant::now();
 if let Some(last) = self.last_notify_at {
 if now.duration_since(last) < NOTIFY_MIN_INTERVAL {
-debug!(
-"RTR session notify skipped due to rate limit: state={}, negotiated_version={:?}",
+info!(
+"RTR session skipped SerialNotify due to rate limit: state={}, negotiated_version={:?}",
 self.state_name(),
 self.version
 );
@@ -1005,7 +1005,7 @@ where
 // References:
 // https://datatracker.ietf.org/doc/html/draft-ietf-sidrops-8210bis-25#section-11.4
 // https://datatracker.ietf.org/doc/html/draft-ietf-sidrops-8210bis-25#section-12
-validate_payload_updates_for_rtr(&updates).map_err(|err| anyhow!(err.to_string()))?;
+validate_payload_update_refs_for_rtr(&updates).map_err(|err| anyhow!(err.to_string()))?;
 let version = self.version()?;
 let (announced, withdrawn, route_origins, router_keys, aspas) =
 count_payload_updates(&updates);
@@ -1019,7 +1019,7 @@ where
 aspas
 );
 let mut writer = BufWriter::new(&mut self.stream);
-for (announce, payload) in updates {
+for (announce, payload) in &updates {
 Self::send_payload_to(&mut writer, payload, *announce, version).await?;
 }
 writer.flush().await?;
@@ -1344,7 +1344,7 @@ fn count_payloads(payloads: &[Payload]) -> (usize, usize, usize) {
 (route_origins, router_keys, aspas)
 }
-fn count_payload_updates(updates: &[(bool, Payload)]) -> (usize, usize, usize, usize, usize) {
+fn count_payload_updates(updates: &[(bool, &Payload)]) -> (usize, usize, usize, usize, usize) {
 let mut announced = 0;
 let mut withdrawn = 0;
 let mut route_origins = 0;

View File

@@ -91,6 +91,29 @@ impl SlurmFile {
 result
 }
+pub fn apply_owned(&self, payloads: Vec<Payload>) -> Vec<Payload> {
+let mut seen = BTreeSet::new();
+let mut result = Vec::new();
+for payload in payloads {
+if self.validation_output_filters.matches(&payload) {
+continue;
+}
+if seen.insert(payload.clone()) {
+result.push(payload);
+}
+}
+for assertion in self.locally_added_assertions.to_payloads() {
+if seen.insert(assertion.clone()) {
+result.push(assertion);
+}
+}
+result
+}
 pub fn merge_named(files: Vec<(String, SlurmFile)>) -> Result<Self, SlurmError> {
 if files.is_empty() {
 return Err(SlurmError::Invalid(

View File

@@ -81,7 +81,7 @@ fn apply_slurm_to_payloads_from_dir(
 .map_err(|err| anyhow!("failed to merge SLURM files from '{}': {}", slurm_dir, err))?;
 let input_count = payloads.len();
-let filtered = slurm.apply(&payloads);
+let filtered = slurm.apply_owned(payloads);
 let output_count = filtered.len();
 info!(

View File

@@ -515,6 +515,12 @@ async fn serial_query_returns_end_of_data_when_up_to_date() {
 let mut dump = RtrDebugDumper::new();
+let response = CacheResponse::read(&mut client).await.unwrap();
+dump.push_value(response.pdu(), dump_cache_response(&response));
+assert_eq!(response.pdu(), 3);
+assert_eq!(response.version(), 1);
+assert_eq!(response.session_id(), 42);
 let eod = EndOfDataV1::read(&mut client).await.unwrap();
 dump.push_value(eod.pdu(), dump_eod_v1(&eod));
 assert_eq!(eod.pdu(), 7);