增加观测日志

This commit is contained in:
xiuting.xu 2026-05-13 16:49:10 +08:00
parent ad76745619
commit a11f2bc864
4 changed files with 86 additions and 2 deletions

View File

@ -1,4 +1,5 @@
use std::env;
use std::fs;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -427,9 +428,17 @@ fn spawn_refresh_task(
tokio::spawn(async move {
let mut interval = tokio::time::interval(refresh_interval);
let mut last_fingerprint: Option<SourceFingerprint> = None;
let mut stats_interval = tokio::time::interval(Duration::from_secs(60));
stats_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
tokio::select! {
_ = stats_interval.tick() => {
log_cache_memory_stats("periodic_observe", &shared_cache, &notifier);
continue;
}
_ = interval.tick() => {}
}
let source_to_delta_started = Instant::now();
let current_fingerprint = match latest_sources_fingerprint(&payload_load_config) {
@ -452,6 +461,7 @@ fn spawn_refresh_task(
current_fingerprint.slurm_files.len(),
source_to_delta_started.elapsed().as_millis()
);
log_cache_memory_stats("refresh_skipped_unchanged", &shared_cache, &notifier);
continue;
}
@ -507,6 +517,7 @@ fn spawn_refresh_task(
listener_count
);
}
log_cache_memory_stats("refresh_complete", &shared_cache, &notifier);
last_fingerprint = Some(current_fingerprint);
}
Err(err) => {
@ -522,6 +533,35 @@ fn spawn_refresh_task(
})
}
fn log_cache_memory_stats(phase: &str, shared_cache: &SharedRtrCache, notifier: &RtrNotifier) {
let cache = shared_cache.load_full();
let stats = cache.memory_stats();
let rss_mib = current_rss_mib();
info!(
"RTR memory observe: phase={}, listener_count={}, serials={:?}, snapshot_payload_counts={:?}, delta_lengths={:?}, delta_payload_counts={:?}, snapshot_arc_strong_counts={:?}, rtr_payloads_arc_strong_counts={:?}, rss_mib={:?}",
phase,
notifier.listener_count(),
stats.serials,
stats.snapshot_payload_counts,
stats.delta_lengths,
stats.delta_payload_counts,
stats.snapshot_arc_strong_counts,
stats.rtr_payloads_arc_strong_counts,
rss_mib
);
}
fn current_rss_mib() -> Option<u64> {
let status = fs::read_to_string("/proc/self/status").ok()?;
let vmrss_line = status.lines().find(|line| line.starts_with("VmRSS:"))?;
let kb = vmrss_line
.split_whitespace()
.nth(1)
.and_then(|v| v.parse::<u64>().ok())?;
Some(kb / 1024)
}
async fn wait_for_shutdown() -> Result<()> {
tokio::signal::ctrl_c().await?;
info!("shutdown signal received");

38
src/rtr/cache/core.rs vendored
View File

@ -651,6 +651,44 @@ pub enum SerialResult {
ResetRequired,
}
#[derive(Debug, Clone, Copy)]
pub struct CacheMemoryStats {
pub serials: [u32; VERSION_COUNT],
pub snapshot_payload_counts: [usize; VERSION_COUNT],
pub delta_lengths: [usize; VERSION_COUNT],
pub delta_payload_counts: [usize; VERSION_COUNT],
pub snapshot_arc_strong_counts: [usize; VERSION_COUNT],
pub rtr_payloads_arc_strong_counts: [usize; VERSION_COUNT],
}
impl RtrCache {
pub fn memory_stats(&self) -> CacheMemoryStats {
let snapshot_payload_counts =
std::array::from_fn(|idx| self.versions[idx].rtr_payloads.len());
let delta_lengths = std::array::from_fn(|idx| self.versions[idx].deltas.len());
let delta_payload_counts = std::array::from_fn(|idx| {
self.versions[idx]
.deltas
.iter()
.map(|delta| delta.announced().len() + delta.withdrawn().len())
.sum()
});
let snapshot_arc_strong_counts =
std::array::from_fn(|idx| Arc::strong_count(&self.versions[idx].snapshot));
let rtr_payloads_arc_strong_counts =
std::array::from_fn(|idx| Arc::strong_count(&self.versions[idx].rtr_payloads));
CacheMemoryStats {
serials: self.serials(),
snapshot_payload_counts,
delta_lengths,
delta_payload_counts,
snapshot_arc_strong_counts,
rtr_payloads_arc_strong_counts,
}
}
}
#[derive(Clone)]
pub(super) struct AppliedUpdate {
pub(super) availability: CacheAvailability,

View File

@ -3,7 +3,9 @@ mod model;
mod ordering;
mod store;
pub use core::{CacheAvailability, RtrCache, RtrCacheBuilder, SerialResult, SessionIds};
pub use core::{
CacheAvailability, CacheMemoryStats, RtrCache, RtrCacheBuilder, SerialResult, SessionIds,
};
pub use model::{Delta, DualTime, Snapshot};
pub use ordering::{
OrderingViolation, validate_payload_update_refs_for_rtr, validate_payload_updates_for_rtr,

View File

@ -13,4 +13,8 @@ impl RtrNotifier {
pub fn notify_cache_updated(&self) -> usize {
self.tx.send(()).unwrap_or_default()
}
pub fn listener_count(&self) -> usize {
self.tx.receiver_count()
}
}