内存优化

This commit is contained in:
xiuting.xu 2026-05-13 14:57:43 +08:00
parent 9dc69e5f31
commit ad76745619
3 changed files with 102 additions and 5 deletions

View File

@ -14,7 +14,9 @@ use rpki::rtr::payload::Timing;
use rpki::rtr::server::ssh::SshAuthMode; use rpki::rtr::server::ssh::SshAuthMode;
use rpki::rtr::server::{RtrNotifier, RtrService, RtrServiceConfig, RunningRtrService}; use rpki::rtr::server::{RtrNotifier, RtrService, RtrServiceConfig, RunningRtrService};
use rpki::rtr::store::RtrStore; use rpki::rtr::store::RtrStore;
use rpki::source::pipeline::{PayloadLoadConfig, load_payloads_from_latest_sources}; use rpki::source::pipeline::{
PayloadLoadConfig, SourceFingerprint, latest_sources_fingerprint, load_payloads_from_latest_sources,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct AppConfig { struct AppConfig {
@ -424,11 +426,35 @@ fn spawn_refresh_task(
tokio::spawn(async move { tokio::spawn(async move {
let mut interval = tokio::time::interval(refresh_interval); let mut interval = tokio::time::interval(refresh_interval);
let mut last_fingerprint: Option<SourceFingerprint> = None;
loop { loop {
interval.tick().await; interval.tick().await;
let source_to_delta_started = Instant::now(); let source_to_delta_started = Instant::now();
let current_fingerprint = match latest_sources_fingerprint(&payload_load_config) {
Ok(fp) => fp,
Err(err) => {
warn!(
"failed to fingerprint CCR/SLURM sources from {}: {:?} (source_to_delta_elapsed_ms={})",
payload_load_config.ccr_dir,
err,
source_to_delta_started.elapsed().as_millis()
);
continue;
}
};
if last_fingerprint.as_ref() == Some(&current_fingerprint) {
info!(
"RTR source refresh skipped: source files unchanged (ccr_path={}, slurm_file_count={}, elapsed_ms={})",
current_fingerprint.ccr.path,
current_fingerprint.slurm_files.len(),
source_to_delta_started.elapsed().as_millis()
);
continue;
}
match load_payloads_from_latest_sources(&payload_load_config) { match load_payloads_from_latest_sources(&payload_load_config) {
Ok(payloads) => { Ok(payloads) => {
let (payload_count, updated) = { let (payload_count, updated) = {
@ -475,9 +501,13 @@ fn spawn_refresh_task(
); );
if updated { if updated {
notifier.notify_cache_updated(); let listener_count = notifier.notify_cache_updated();
info!("RTR cache updated, notify signal emitted (session may skip SerialNotify due to rate limit)"); info!(
"RTR cache updated, notify signal emitted to session listeners: listener_count={}",
listener_count
);
} }
last_fingerprint = Some(current_fingerprint);
} }
Err(err) => { Err(err) => {
warn!( warn!(

View File

@ -10,7 +10,7 @@ impl RtrNotifier {
Self { tx } Self { tx }
} }
pub fn notify_cache_updated(&self) { pub fn notify_cache_updated(&self) -> usize {
let _ = self.tx.send(()); self.tx.send(()).unwrap_or_default()
} }
} }

View File

@ -15,6 +15,19 @@ pub struct PayloadLoadConfig {
pub strict_ccr_validation: bool, pub strict_ccr_validation: bool,
} }
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SourceFingerprint {
pub ccr: FileFingerprint,
pub slurm_files: Vec<FileFingerprint>,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct FileFingerprint {
pub path: String,
pub len: u64,
pub modified_unix_secs: u64,
}
pub fn load_payloads_from_latest_sources(config: &PayloadLoadConfig) -> Result<Vec<Payload>> { pub fn load_payloads_from_latest_sources(config: &PayloadLoadConfig) -> Result<Vec<Payload>> {
let payloads = load_payloads_from_latest_ccr(&config.ccr_dir, config.strict_ccr_validation)?; let payloads = load_payloads_from_latest_ccr(&config.ccr_dir, config.strict_ccr_validation)?;
@ -24,6 +37,41 @@ pub fn load_payloads_from_latest_sources(config: &PayloadLoadConfig) -> Result<V
} }
} }
pub fn latest_sources_fingerprint(config: &PayloadLoadConfig) -> Result<SourceFingerprint> {
let latest_ccr = find_latest_ccr_file(&config.ccr_dir)?;
let ccr = fingerprint_of_path(&latest_ccr)?;
let slurm_files = match config.slurm_dir.as_deref() {
Some(dir) => {
let mut paths = Vec::<PathBuf>::new();
for entry in std::fs::read_dir(dir)
.map_err(|err| anyhow!("failed to read SLURM directory '{}': {}", dir, err))?
{
let entry = entry
.map_err(|err| anyhow!("failed to enumerate SLURM directory '{}': {}", dir, err))?;
let path = entry.path();
if path.is_file() && path.extension().and_then(|ext| ext.to_str()) == Some("slurm") {
paths.push(path);
}
}
paths.sort_by_key(|path| {
path.file_name()
.and_then(|name| name.to_str())
.map(|name| name.to_ascii_lowercase())
.unwrap_or_default()
});
let mut fps = Vec::with_capacity(paths.len());
for path in paths {
fps.push(fingerprint_of_path(path)?);
}
fps
}
None => Vec::new(),
};
Ok(SourceFingerprint { ccr, slurm_files })
}
fn load_payloads_from_latest_ccr( fn load_payloads_from_latest_ccr(
ccr_dir: &str, ccr_dir: &str,
strict_ccr_validation: bool, strict_ccr_validation: bool,
@ -145,3 +193,22 @@ fn read_slurm_files(slurm_dir: &str) -> Result<Vec<(String, SlurmFile)>> {
fn sample_messages(messages: &[String]) -> Vec<&str> { fn sample_messages(messages: &[String]) -> Vec<&str> {
messages.iter().take(3).map(String::as_str).collect() messages.iter().take(3).map(String::as_str).collect()
} }
fn fingerprint_of_path(path: impl Into<PathBuf>) -> Result<FileFingerprint> {
let path = path.into();
let meta = std::fs::metadata(&path)
.map_err(|err| anyhow!("failed to stat '{}': {}", path.display(), err))?;
let modified = meta
.modified()
.map_err(|err| anyhow!("failed to read mtime '{}': {}", path.display(), err))?;
let modified_unix_secs = modified
.duration_since(std::time::UNIX_EPOCH)
.map_err(|err| anyhow!("mtime before epoch '{}': {}", path.display(), err))?
.as_secs();
Ok(FileFingerprint {
path: path.to_string_lossy().to_string(),
len: meta.len(),
modified_unix_secs,
})
}