NOTE(review): this patch was recovered from a copy whose line breaks and some
tokens had been lost. `&notifier` and the `u64` type arguments in
`current_rss_mib` were restored by inference; the type argument of
`last_fingerprint: Option` in the second hunk's context could not be recovered
and is left as found. Context indentation was reconstructed. Doc comments were
added to the new items, so the blob hashes on the `index` lines predate them.
Confirm against the repository before applying.

diff --git a/src/main_rtr.rs b/src/main_rtr.rs
index f6631af..1f23c85 100644
--- a/src/main_rtr.rs
+++ b/src/main_rtr.rs
@@ -1,4 +1,5 @@
 use std::env;
+use std::fs;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -427,9 +428,17 @@ fn spawn_refresh_task(
     tokio::spawn(async move {
         let mut interval = tokio::time::interval(refresh_interval);
         let mut last_fingerprint: Option = None;
+        let mut stats_interval = tokio::time::interval(Duration::from_secs(60));
+        stats_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 
         loop {
-            interval.tick().await;
+            tokio::select! {
+                _ = stats_interval.tick() => {
+                    log_cache_memory_stats("periodic_observe", &shared_cache, &notifier);
+                    continue;
+                }
+                _ = interval.tick() => {}
+            }
             let source_to_delta_started = Instant::now();
 
             let current_fingerprint = match latest_sources_fingerprint(&payload_load_config) {
@@ -452,6 +461,7 @@ fn spawn_refresh_task(
                     current_fingerprint.slurm_files.len(),
                     source_to_delta_started.elapsed().as_millis()
                 );
+                log_cache_memory_stats("refresh_skipped_unchanged", &shared_cache, &notifier);
                 continue;
             }
 
@@ -507,6 +517,7 @@ fn spawn_refresh_task(
                             listener_count
                         );
                     }
+                    log_cache_memory_stats("refresh_complete", &shared_cache, &notifier);
                     last_fingerprint = Some(current_fingerprint);
                 }
                 Err(err) => {
@@ -522,6 +533,44 @@ fn spawn_refresh_task(
     })
 }
 
+/// Emits one `info!` line describing the cache for the given `phase` label.
+///
+/// The line carries the notifier's listener count, the per-version figures
+/// from `memory_stats()` of the loaded cache, and `current_rss_mib()`.
+fn log_cache_memory_stats(phase: &str, shared_cache: &SharedRtrCache, notifier: &RtrNotifier) {
+    let cache = shared_cache.load_full();
+    let stats = cache.memory_stats();
+    let rss_mib = current_rss_mib();
+
+    info!(
+        "RTR memory observe: phase={}, listener_count={}, serials={:?}, snapshot_payload_counts={:?}, delta_lengths={:?}, delta_payload_counts={:?}, snapshot_arc_strong_counts={:?}, rtr_payloads_arc_strong_counts={:?}, rss_mib={:?}",
+        phase,
+        notifier.listener_count(),
+        stats.serials,
+        stats.snapshot_payload_counts,
+        stats.delta_lengths,
+        stats.delta_payload_counts,
+        stats.snapshot_arc_strong_counts,
+        stats.rtr_payloads_arc_strong_counts,
+        rss_mib
+    );
+}
+
+/// Returns the process resident set size in whole MiB, if it can be read.
+///
+/// Takes the second whitespace-separated field of the `VmRSS:` line in
+/// `/proc/self/status` (reported by the kernel in kB) and divides it by 1024,
+/// truncating. Yields `None` if the file, the line, or a parsable number is missing.
+fn current_rss_mib() -> Option<u64> {
+    let status = fs::read_to_string("/proc/self/status").ok()?;
+    let vmrss_line = status.lines().find(|line| line.starts_with("VmRSS:"))?;
+    let kb = vmrss_line
+        .split_whitespace()
+        .nth(1)
+        .and_then(|v| v.parse::<u64>().ok())?;
+    Some(kb / 1024)
+}
+
 async fn wait_for_shutdown() -> Result<()> {
     tokio::signal::ctrl_c().await?;
     info!("shutdown signal received");
diff --git a/src/rtr/cache/core.rs b/src/rtr/cache/core.rs
index 4044356..a7885ae 100644
--- a/src/rtr/cache/core.rs
+++ b/src/rtr/cache/core.rs
@@ -651,6 +651,57 @@ pub enum SerialResult {
     ResetRequired,
 }
 
+/// Point-in-time size and reference-count figures for every cache version slot.
+///
+/// Each array holds one entry per index of `RtrCache::versions`.
+#[derive(Debug, Clone, Copy)]
+pub struct CacheMemoryStats {
+    /// Value returned by `RtrCache::serials()`.
+    pub serials: [u32; VERSION_COUNT],
+    /// `rtr_payloads.len()` of each version.
+    pub snapshot_payload_counts: [usize; VERSION_COUNT],
+    /// Number of deltas held by each version.
+    pub delta_lengths: [usize; VERSION_COUNT],
+    /// Sum of announced and withdrawn entries over all deltas of each version.
+    pub delta_payload_counts: [usize; VERSION_COUNT],
+    /// `Arc::strong_count` of each version's `snapshot`.
+    pub snapshot_arc_strong_counts: [usize; VERSION_COUNT],
+    /// `Arc::strong_count` of each version's `rtr_payloads`.
+    pub rtr_payloads_arc_strong_counts: [usize; VERSION_COUNT],
+}
+
+impl RtrCache {
+    /// Collects the per-version figures reported in [`CacheMemoryStats`].
+    ///
+    /// The strong counts are sampled at call time and may change immediately
+    /// afterwards if other holders clone or drop the `Arc`s.
+    pub fn memory_stats(&self) -> CacheMemoryStats {
+        let snapshot_payload_counts =
+            std::array::from_fn(|idx| self.versions[idx].rtr_payloads.len());
+        let delta_lengths = std::array::from_fn(|idx| self.versions[idx].deltas.len());
+        let delta_payload_counts = std::array::from_fn(|idx| {
+            self.versions[idx]
+                .deltas
+                .iter()
+                .map(|delta| delta.announced().len() + delta.withdrawn().len())
+                .sum()
+        });
+        let snapshot_arc_strong_counts =
+            std::array::from_fn(|idx| Arc::strong_count(&self.versions[idx].snapshot));
+        let rtr_payloads_arc_strong_counts =
+            std::array::from_fn(|idx| Arc::strong_count(&self.versions[idx].rtr_payloads));
+
+        CacheMemoryStats {
+            serials: self.serials(),
+            snapshot_payload_counts,
+            delta_lengths,
+            delta_payload_counts,
+            snapshot_arc_strong_counts,
+            rtr_payloads_arc_strong_counts,
+        }
+    }
+}
+
 #[derive(Clone)]
 pub(super) struct AppliedUpdate {
     pub(super) availability: CacheAvailability,
diff --git a/src/rtr/cache/mod.rs b/src/rtr/cache/mod.rs
index d83b99f..57a9793 100644
--- a/src/rtr/cache/mod.rs
+++ b/src/rtr/cache/mod.rs
@@ -3,7 +3,9 @@ mod model;
 mod ordering;
 mod store;
 
-pub use core::{CacheAvailability, RtrCache, RtrCacheBuilder, SerialResult, SessionIds};
+pub use core::{
+    CacheAvailability, CacheMemoryStats, RtrCache, RtrCacheBuilder, SerialResult, SessionIds,
+};
 pub use model::{Delta, DualTime, Snapshot};
 pub use ordering::{
     OrderingViolation, validate_payload_update_refs_for_rtr, validate_payload_updates_for_rtr,
diff --git a/src/rtr/server/notifier.rs b/src/rtr/server/notifier.rs
index b3bbfb4..9bab166 100644
--- a/src/rtr/server/notifier.rs
+++ b/src/rtr/server/notifier.rs
@@ -13,4 +13,9 @@ impl RtrNotifier {
     pub fn notify_cache_updated(&self) -> usize {
         self.tx.send(()).unwrap_or_default()
     }
+
+    /// Returns the receiver count currently reported by the notification channel.
+    pub fn listener_count(&self) -> usize {
+        self.tx.receiver_count()
+    }
 }