From ad76745619a0125b5ed25fac3cf0868c47fd19ef Mon Sep 17 00:00:00 2001 From: "xiuting.xu" Date: Wed, 13 May 2026 14:57:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=86=85=E5=AD=98=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main_rtr.rs | 36 ++++++++++++++++++-- src/rtr/server/notifier.rs | 4 +-- src/source/pipeline.rs | 67 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 5 deletions(-) diff --git a/src/main_rtr.rs b/src/main_rtr.rs index 07fc8ad..f6631af 100644 --- a/src/main_rtr.rs +++ b/src/main_rtr.rs @@ -14,7 +14,9 @@ use rpki::rtr::payload::Timing; use rpki::rtr::server::ssh::SshAuthMode; use rpki::rtr::server::{RtrNotifier, RtrService, RtrServiceConfig, RunningRtrService}; use rpki::rtr::store::RtrStore; -use rpki::source::pipeline::{PayloadLoadConfig, load_payloads_from_latest_sources}; +use rpki::source::pipeline::{ + PayloadLoadConfig, SourceFingerprint, latest_sources_fingerprint, load_payloads_from_latest_sources, +}; #[derive(Debug, Clone)] struct AppConfig { @@ -424,11 +426,35 @@ fn spawn_refresh_task( tokio::spawn(async move { let mut interval = tokio::time::interval(refresh_interval); + let mut last_fingerprint: Option = None; loop { interval.tick().await; let source_to_delta_started = Instant::now(); + let current_fingerprint = match latest_sources_fingerprint(&payload_load_config) { + Ok(fp) => fp, + Err(err) => { + warn!( + "failed to fingerprint CCR/SLURM sources from {}: {:?} (source_to_delta_elapsed_ms={})", + payload_load_config.ccr_dir, + err, + source_to_delta_started.elapsed().as_millis() + ); + continue; + } + }; + + if last_fingerprint.as_ref() == Some(¤t_fingerprint) { + info!( + "RTR source refresh skipped: source files unchanged (ccr_path={}, slurm_file_count={}, elapsed_ms={})", + current_fingerprint.ccr.path, + current_fingerprint.slurm_files.len(), + source_to_delta_started.elapsed().as_millis() + ); + continue; + } + match load_payloads_from_latest_sources(&payload_load_config) { Ok(payloads) => { let (payload_count, updated) = { @@ -475,9 +501,13 @@ fn spawn_refresh_task( ); if updated { - notifier.notify_cache_updated(); - info!("RTR cache updated, notify signal emitted (session may skip SerialNotify due to rate limit)"); + let listener_count = notifier.notify_cache_updated(); + info!( + "RTR cache updated, notify signal emitted to session listeners: listener_count={}", + listener_count + ); } + last_fingerprint = Some(current_fingerprint); } Err(err) => { warn!( diff --git a/src/rtr/server/notifier.rs b/src/rtr/server/notifier.rs index 075626a..b3bbfb4 100644 --- a/src/rtr/server/notifier.rs +++ b/src/rtr/server/notifier.rs @@ -10,7 +10,7 @@ impl RtrNotifier { Self { tx } } - pub fn notify_cache_updated(&self) { - let _ = self.tx.send(()); + pub fn notify_cache_updated(&self) -> usize { + self.tx.send(()).unwrap_or_default() } } diff --git a/src/source/pipeline.rs b/src/source/pipeline.rs index d043e07..e0e23b0 100644 --- a/src/source/pipeline.rs +++ b/src/source/pipeline.rs @@ -15,6 +15,19 @@ pub struct PayloadLoadConfig { pub strict_ccr_validation: bool, } +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct SourceFingerprint { + pub ccr: FileFingerprint, + pub slurm_files: Vec, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct FileFingerprint { + pub path: String, + pub len: u64, + pub modified_unix_secs: u64, +} + pub fn load_payloads_from_latest_sources(config: &PayloadLoadConfig) -> Result> { let payloads = load_payloads_from_latest_ccr(&config.ccr_dir, config.strict_ccr_validation)?; @@ -24,6 +37,41 @@ pub fn load_payloads_from_latest_sources(config: &PayloadLoadConfig) -> Result Result { + let latest_ccr = find_latest_ccr_file(&config.ccr_dir)?; + let ccr = fingerprint_of_path(&latest_ccr)?; + + let slurm_files = match config.slurm_dir.as_deref() { + Some(dir) => { + let mut paths = Vec::::new(); + for entry in std::fs::read_dir(dir) + .map_err(|err| anyhow!("failed to read SLURM directory '{}': {}", dir, err))? + { + let entry = entry + .map_err(|err| anyhow!("failed to enumerate SLURM directory '{}': {}", dir, err))?; + let path = entry.path(); + if path.is_file() && path.extension().and_then(|ext| ext.to_str()) == Some("slurm") { + paths.push(path); + } + } + paths.sort_by_key(|path| { + path.file_name() + .and_then(|name| name.to_str()) + .map(|name| name.to_ascii_lowercase()) + .unwrap_or_default() + }); + let mut fps = Vec::with_capacity(paths.len()); + for path in paths { + fps.push(fingerprint_of_path(path)?); + } + fps + } + None => Vec::new(), + }; + + Ok(SourceFingerprint { ccr, slurm_files }) +} + fn load_payloads_from_latest_ccr( ccr_dir: &str, strict_ccr_validation: bool, @@ -145,3 +193,22 @@ fn read_slurm_files(slurm_dir: &str) -> Result> { fn sample_messages(messages: &[String]) -> Vec<&str> { messages.iter().take(3).map(String::as_str).collect() } + +fn fingerprint_of_path(path: impl Into) -> Result { + let path = path.into(); + let meta = std::fs::metadata(&path) + .map_err(|err| anyhow!("failed to stat '{}': {}", path.display(), err))?; + let modified = meta + .modified() + .map_err(|err| anyhow!("failed to read mtime '{}': {}", path.display(), err))?; + let modified_unix_secs = modified + .duration_since(std::time::UNIX_EPOCH) + .map_err(|err| anyhow!("mtime before epoch '{}': {}", path.display(), err))? + .as_secs(); + + Ok(FileFingerprint { + path: path.to_string_lossy().to_string(), + len: meta.len(), + modified_unix_secs, + }) +}