diff --git a/deploy/server/.env b/deploy/server/.env index a3db73e..4c7e533 100644 --- a/deploy/server/.env +++ b/deploy/server/.env @@ -18,7 +18,8 @@ RPKI_RTR_REPORT_DIR=/app/report # Core runtime knobs. RPKI_RTR_STRICT_CCR_VALIDATION=false RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS=300 -RPKI_RTR_REPORT_INTERVAL_SECS=60 +RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS=300 +RPKI_RTR_REPORT_HISTORY_LIMIT=10 RPKI_RTR_TIMEZONE=Asia/Shanghai RPKI_RTR_MAX_DELTA=10 RPKI_RTR_MAX_CONNECTIONS=100000 diff --git a/deploy/server/DEPLOYMENT.md b/deploy/server/DEPLOYMENT.md index deedf6c..c025549 100644 --- a/deploy/server/DEPLOYMENT.md +++ b/deploy/server/DEPLOYMENT.md @@ -35,7 +35,7 @@ The container runs `rpki` directly as PID 1. ## Runtime Configuration via `.env` -- Core: `RPKI_RTR_STRICT_CCR_VALIDATION`, `RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS`, `RPKI_RTR_MAX_DELTA`, `RPKI_RTR_MAX_CONCURRENT_HANDSHAKES`, `RPKI_RTR_REPORT_INTERVAL_SECS`, `RPKI_RTR_TIMEZONE`, `RUST_LOG` +- Core: `RPKI_RTR_STRICT_CCR_VALIDATION`, `RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS`, `RPKI_RTR_MAX_DELTA`, `RPKI_RTR_MAX_CONCURRENT_HANDSHAKES`, `RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS`, `RPKI_RTR_REPORT_HISTORY_LIMIT`, `RPKI_RTR_TIMEZONE`, `RUST_LOG` - TCP mode: `RPKI_RTR_MAX_CONNECTIONS` - TLS mode: `RPKI_RTR_ENFORCE_TLS_CLIENT_SAN_IP_MATCH`, `RPKI_RTR_TLS_CERT_PATH`, `RPKI_RTR_TLS_KEY_PATH`, `RPKI_RTR_TLS_CLIENT_CA_PATH`, `RPKI_RTR_TLS_CERTS_HOST_DIR` - SSH mode: `RPKI_RTR_SSH_HOST_PORT`, `RPKI_RTR_SSH_CONTAINER_PORT`, `RPKI_RTR_SSH_AUTH_MODE`, `RPKI_RTR_SSH_USERNAME`, `RPKI_RTR_SSH_SUBSYSTEM_NAME`, `RPKI_RTR_SSH_HOST_KEY_PATH`, `RPKI_RTR_SSH_AUTHORIZED_KEYS_PATH`, `RPKI_RTR_SSH_KEYS_VOLUME`, `RPKI_RTR_SSH_CERTS_HOST_DIR` @@ -60,19 +60,21 @@ docker compose -f deploy/server/docker-compose.yml logs -f rpki-rtr ## Runtime Report -The server creates `report/rtr-server.json` on startup and replaces it after -cache refreshes and at the configured interval. The default interval is 60 -seconds. 
The file contains service connection counts, process RSS, cache -timestamps, and per-protocol-version snapshot and delta counts. Schema version -2 also includes: +The server writes split JSON reports. Each report file uses a local-time +timestamp suffix and each category keeps `RPKI_RTR_REPORT_HISTORY_LIMIT` files, +defaulting to 10. -- CCR and SLURM source file metadata -- latest refresh status, duration, failure count, and error -- CCR validation and SLURM before/after payload counts -- service start time and uptime -- non-sensitive runtime configuration +- `rtr-source-*.json`: CCR and SLURM source metadata, latest refresh status, + data quality counts, cache snapshot counts, and delta counts. Written on + startup and source refresh events. +- `rtr-clients-*.json`: active client connection counts and counts by transport + (`tcp`, `tls`, `ssh`). Written on startup and whenever the active connection + count changes. +- `rtr-runtime-*.json`: service start time, uptime, process RSS, and + non-sensitive runtime configuration. Written on startup and every + `RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS`, defaulting to 300 seconds. -Timestamps in logs and `rtr-server.json` use `RPKI_RTR_TIMEZONE`, which +Timestamps in logs and report JSON files use `RPKI_RTR_TIMEZONE`, which defaults to `Asia/Shanghai`. Use IANA timezone names such as `Asia/Shanghai`, `Europe/London`, `America/New_York`, or `UTC`; `Shanghai` is accepted as a convenience alias for `Asia/Shanghai`. 
diff --git a/deploy/server/Dockerfile b/deploy/server/Dockerfile index 1f52afd..8f1b23b 100644 --- a/deploy/server/Dockerfile +++ b/deploy/server/Dockerfile @@ -72,7 +72,8 @@ ENV RPKI_RTR_ENABLE_TLS=false \ RPKI_RTR_CCR_DIR=/app/data \ RPKI_RTR_SLURM_DIR=/app/slurm \ RPKI_RTR_REPORT_DIR=/app/report \ - RPKI_RTR_REPORT_INTERVAL_SECS=60 \ + RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS=300 \ + RPKI_RTR_REPORT_HISTORY_LIMIT=10 \ RPKI_RTR_REFRESH_INTERVAL_SECS=300 \ RPKI_RTR_STRICT_CCR_VALIDATION=false diff --git a/deploy/server/docker-compose.ssh.yml b/deploy/server/docker-compose.ssh.yml index 721c6dc..989466b 100644 --- a/deploy/server/docker-compose.ssh.yml +++ b/deploy/server/docker-compose.ssh.yml @@ -28,7 +28,8 @@ services: RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "${RPKI_RTR_SLURM_DIR:-/app/slurm}" RPKI_RTR_REPORT_DIR: "${RPKI_RTR_REPORT_DIR:-/app/report}" - RPKI_RTR_REPORT_INTERVAL_SECS: "${RPKI_RTR_REPORT_INTERVAL_SECS:-60}" + RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS: "${RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS:-300}" + RPKI_RTR_REPORT_HISTORY_LIMIT: "${RPKI_RTR_REPORT_HISTORY_LIMIT:-10}" RPKI_RTR_TIMEZONE: "${RPKI_RTR_TIMEZONE:-Asia/Shanghai}" RPKI_RTR_STRICT_CCR_VALIDATION: "${RPKI_RTR_STRICT_CCR_VALIDATION:-false}" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "${RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS:-300}" diff --git a/deploy/server/docker-compose.tcp.yml b/deploy/server/docker-compose.tcp.yml index 869d7c5..a4dd888 100644 --- a/deploy/server/docker-compose.tcp.yml +++ b/deploy/server/docker-compose.tcp.yml @@ -18,7 +18,8 @@ services: RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "${RPKI_RTR_SLURM_DIR:-/app/slurm}" RPKI_RTR_REPORT_DIR: "${RPKI_RTR_REPORT_DIR:-/app/report}" - RPKI_RTR_REPORT_INTERVAL_SECS: "${RPKI_RTR_REPORT_INTERVAL_SECS:-60}" + RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS: "${RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS:-300}" + RPKI_RTR_REPORT_HISTORY_LIMIT: "${RPKI_RTR_REPORT_HISTORY_LIMIT:-10}" RPKI_RTR_TIMEZONE: 
"${RPKI_RTR_TIMEZONE:-Asia/Shanghai}" RPKI_RTR_STRICT_CCR_VALIDATION: "${RPKI_RTR_STRICT_CCR_VALIDATION:-false}" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "${RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS:-60}" diff --git a/deploy/server/docker-compose.tls.yml b/deploy/server/docker-compose.tls.yml index 89f2421..36aa2e8 100644 --- a/deploy/server/docker-compose.tls.yml +++ b/deploy/server/docker-compose.tls.yml @@ -22,7 +22,8 @@ services: RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "${RPKI_RTR_SLURM_DIR:-/app/slurm}" RPKI_RTR_REPORT_DIR: "${RPKI_RTR_REPORT_DIR:-/app/report}" - RPKI_RTR_REPORT_INTERVAL_SECS: "${RPKI_RTR_REPORT_INTERVAL_SECS:-60}" + RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS: "${RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS:-300}" + RPKI_RTR_REPORT_HISTORY_LIMIT: "${RPKI_RTR_REPORT_HISTORY_LIMIT:-10}" RPKI_RTR_TIMEZONE: "${RPKI_RTR_TIMEZONE:-Asia/Shanghai}" RPKI_RTR_STRICT_CCR_VALIDATION: "${RPKI_RTR_STRICT_CCR_VALIDATION:-false}" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "${RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS:-300}" diff --git a/deploy/server/docker-compose.yml b/deploy/server/docker-compose.yml index d607c78..3502e08 100644 --- a/deploy/server/docker-compose.yml +++ b/deploy/server/docker-compose.yml @@ -21,7 +21,8 @@ services: RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "${RPKI_RTR_SLURM_DIR:-/app/slurm}" RPKI_RTR_REPORT_DIR: "${RPKI_RTR_REPORT_DIR:-/app/report}" - RPKI_RTR_REPORT_INTERVAL_SECS: "${RPKI_RTR_REPORT_INTERVAL_SECS:-60}" + RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS: "${RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS:-300}" + RPKI_RTR_REPORT_HISTORY_LIMIT: "${RPKI_RTR_REPORT_HISTORY_LIMIT:-10}" RPKI_RTR_TIMEZONE: "${RPKI_RTR_TIMEZONE:-Asia/Shanghai}" RPKI_RTR_STRICT_CCR_VALIDATION: "${RPKI_RTR_STRICT_CCR_VALIDATION:-false}" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "${RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS:-300}" diff --git a/src/main_rtr.rs b/src/main_rtr.rs index 53c27a0..ad05a7d 100644 --- a/src/main_rtr.rs +++ 
b/src/main_rtr.rs @@ -26,7 +26,8 @@ async fn main() -> Result<()> { let report_context = ReportContext::new(ReportConfiguration::new( config.source_refresh_interval.as_secs(), - config.report_interval.as_secs(), + config.runtime_report_interval.as_secs(), + config.report_history_limit, config.max_delta, config.prune_delta_by_snapshot_size, config.strict_ccr_validation, @@ -177,7 +178,7 @@ fn spawn_refresh_task( report_context: ReportContext, ) -> JoinHandle<()> { let refresh_interval = config.source_refresh_interval; - let report_interval = config.report_interval; + let runtime_report_interval = config.runtime_report_interval; let report_dir = PathBuf::from(&config.report_dir); let payload_load_config = PayloadLoadConfig { ccr_dir: config.ccr_dir.clone(), @@ -188,24 +189,50 @@ fn spawn_refresh_task( tokio::spawn(async move { let mut interval = tokio::time::interval(refresh_interval); let mut last_fingerprint: Option<SourceFingerprint> = None; - report_context.write_or_warn( + report_context.write_source_or_warn( &report_dir, "startup", &shared_cache, &notifier, &service_stats, ); - let mut stats_interval = tokio::time::interval_at( - tokio::time::Instant::now() + report_interval, - report_interval, + report_context.write_clients_or_warn( + &report_dir, + "startup", + &shared_cache, + &notifier, + &service_stats, ); - stats_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + report_context.write_runtime_or_warn( + &report_dir, + "startup", + &shared_cache, + &notifier, + &service_stats, + ); + let mut runtime_interval = tokio::time::interval_at( + tokio::time::Instant::now() + runtime_report_interval, + runtime_report_interval, + ); + runtime_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut client_change_rx = service_stats.subscribe_connection_changes(); loop { tokio::select! 
{ - _ = stats_interval.tick() => { + changed = client_change_rx.changed() => { + match changed { + Ok(()) => { + report_context.write_clients_or_warn(&report_dir, "clients_changed", &shared_cache, &notifier, &service_stats); + } + Err(_) => { + warn!("RTR client connection change channel closed"); + } + } + continue; + } + _ = runtime_interval.tick() => { log_cache_memory_stats("periodic_observe", &shared_cache, &notifier); - report_context.write_or_warn(&report_dir, "periodic", &shared_cache, &notifier, &service_stats); + report_context.write_runtime_or_warn(&report_dir, "runtime_periodic", &shared_cache, &notifier, &service_stats); continue; } _ = interval.tick() => {} @@ -227,7 +254,7 @@ fn spawn_refresh_task( err, source_to_delta_started.elapsed().as_millis() ); - report_context.write_or_warn( + report_context.write_source_or_warn( &report_dir, "refresh_failed", &shared_cache, @@ -237,6 +264,7 @@ fn spawn_refresh_task( continue; } }; + report_context.record_source_fingerprint(current_fingerprint.clone()); if last_fingerprint.as_ref() == Some(&current_fingerprint) { report_context.record_refresh_unchanged( @@ -250,7 +278,7 @@ fn spawn_refresh_task( source_to_delta_started.elapsed().as_millis() ); log_cache_memory_stats("refresh_skipped_unchanged", &shared_cache, &notifier); - report_context.write_or_warn( + report_context.write_source_or_warn( &report_dir, "refresh_skipped_unchanged", &shared_cache, @@ -301,7 +329,7 @@ fn spawn_refresh_task( &err, ); warn!("RTR cache update failed: {:?}", err); - report_context.write_or_warn( + report_context.write_source_or_warn( &report_dir, "refresh_failed", &shared_cache, @@ -336,7 +364,7 @@ fn spawn_refresh_task( ); } log_cache_memory_stats("refresh_complete", &shared_cache, &notifier); - report_context.write_or_warn( + report_context.write_source_or_warn( &report_dir, "refresh_complete", &shared_cache, @@ -357,7 +385,7 @@ fn spawn_refresh_task( err, source_to_delta_started.elapsed().as_millis() ); - report_context.write_or_warn( + 
report_context.write_source_or_warn( &report_dir, "refresh_failed", &shared_cache, diff --git a/src/rtr/cache/core.rs b/src/rtr/cache/core.rs index 5b6b549..5de14a4 100644 --- a/src/rtr/cache/core.rs +++ b/src/rtr/cache/core.rs @@ -654,7 +654,7 @@ pub enum SerialResult { ResetRequired, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Serialize)] pub struct CacheMemoryStats { pub serials: [u32; VERSION_COUNT], pub snapshot_payload_counts: [usize; VERSION_COUNT], diff --git a/src/rtr/config.rs b/src/rtr/config.rs index d463b1e..b9182e9 100644 --- a/src/rtr/config.rs +++ b/src/rtr/config.rs @@ -36,7 +36,8 @@ pub struct AppConfig { pub strict_ccr_validation: bool, pub source_refresh_interval: Duration, pub report_dir: String, - pub report_interval: Duration, + pub runtime_report_interval: Duration, + pub report_history_limit: usize, pub timezone: Tz, pub timing: Timing, @@ -70,7 +71,8 @@ impl Default for AppConfig { strict_ccr_validation: false, source_refresh_interval: Duration::from_secs(300), report_dir: "./report".to_string(), - report_interval: Duration::from_secs(60), + runtime_report_interval: Duration::from_secs(300), + report_history_limit: 10, timezone: default_timezone(), timing: Timing::default(), @@ -188,9 +190,13 @@ impl AppConfig { if let Some(value) = env_var("RPKI_RTR_REPORT_DIR")? { config.report_dir = value; } - if let Some(value) = env_var("RPKI_RTR_REPORT_INTERVAL_SECS")? { - let secs = parse_positive_u64(&value, "RPKI_RTR_REPORT_INTERVAL_SECS")?; - config.report_interval = Duration::from_secs(secs); + if let Some(value) = env_var("RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS")? { + let secs = parse_positive_u64(&value, "RPKI_RTR_RUNTIME_REPORT_INTERVAL_SECS")?; + config.runtime_report_interval = Duration::from_secs(secs); + } + if let Some(value) = env_var("RPKI_RTR_REPORT_HISTORY_LIMIT")? { + config.report_history_limit = + parse_positive_usize(&value, "RPKI_RTR_REPORT_HISTORY_LIMIT")?; } if let Some(value) = env_var("RPKI_RTR_TIMEZONE")? 
{ config.timezone = parse_timezone(&value, "RPKI_RTR_TIMEZONE")?; @@ -341,7 +347,11 @@ pub fn log_startup_config(config: &AppConfig) { config.source_refresh_interval.as_secs() ); info!("report_dir={}", config.report_dir); - info!("report_interval_secs={}", config.report_interval.as_secs()); + info!( + "runtime_report_interval_secs={}", + config.runtime_report_interval.as_secs() + ); + info!("report_history_limit={}", config.report_history_limit); info!("timezone={}", format_timezone(config.timezone)); info!("rtr_timing_refresh_secs={}", config.timing.refresh); info!("rtr_timing_retry_secs={}", config.timing.retry); @@ -411,6 +421,16 @@ fn parse_positive_u64(value: &str, name: &str) -> Result<u64> { Ok(parsed) } +fn parse_positive_usize(value: &str, name: &str) -> Result<usize> { + let parsed = value + .parse::<usize>() + .map_err(|err| anyhow!("invalid {} '{}': {}", name, value, err))?; + if parsed == 0 { + return Err(anyhow!("invalid {} '{}': must be >= 1", name, value)); + } + Ok(parsed) +} + fn parse_positive_u32(value: &str, name: &str) -> Result<u32> { let parsed = value .parse::<u32>() diff --git a/src/rtr/report.rs b/src/rtr/report.rs index ba2c026..dac021e 100644 --- a/src/rtr/report.rs +++ b/src/rtr/report.rs @@ -1,5 +1,5 @@ use std::fs; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use std::time::Instant; @@ -9,15 +9,18 @@ use chrono_tz::Tz; use serde::Serialize; use tracing::warn; -use crate::rtr::cache::{CacheAvailability, SharedRtrCache, VersionReportStats}; +use crate::rtr::cache::{CacheAvailability, CacheMemoryStats, SharedRtrCache, VersionReportStats}; use crate::rtr::config::format_timezone; use crate::rtr::server::{RtrNotifier, RtrServiceStats, RtrTransportConnectionCounts}; -use crate::source::pipeline::{DataQualityReport, SourceLoadReport}; +use crate::source::pipeline::{ + DataQualityReport, FileFingerprint, SourceFingerprint, SourceLoadReport, +}; #[derive(Clone, Serialize)] pub struct ReportConfiguration { 
source_refresh_interval_seconds: u64, - report_interval_seconds: u64, + runtime_report_interval_seconds: u64, + report_history_limit: usize, max_delta: u8, prune_delta_by_snapshot_size: bool, strict_ccr_validation: bool, @@ -35,7 +38,8 @@ struct TimingReport { impl ReportConfiguration { pub fn new( source_refresh_interval_seconds: u64, - report_interval_seconds: u64, + runtime_report_interval_seconds: u64, + report_history_limit: usize, max_delta: u8, prune_delta_by_snapshot_size: bool, strict_ccr_validation: bool, @@ -44,7 +48,8 @@ impl ReportConfiguration { ) -> Self { Self { source_refresh_interval_seconds, - report_interval_seconds, + runtime_report_interval_seconds, + report_history_limit, max_delta, prune_delta_by_snapshot_size, strict_ccr_validation, @@ -76,6 +81,7 @@ pub struct ReportContext { #[derive(Debug, Clone, Default)] struct RuntimeReportState { source: Option, + source_fingerprint: Option, data_quality: Option, refresh: RefreshReport, } @@ -178,6 +184,14 @@ impl ReportContext { state.refresh.last_error = None; } + pub fn record_source_fingerprint(&self, fingerprint: SourceFingerprint) { + let mut state = self + .runtime + .write() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state.source_fingerprint = Some(fingerprint); + } + pub fn record_refresh_unchanged(&self, attempted_at: DateTime, duration_ms: u128) { let mut state = self .runtime @@ -227,6 +241,63 @@ impl ReportContext { } } + pub fn write_source_or_warn( + &self, + report_dir: &Path, + phase: &str, + shared_cache: &SharedRtrCache, + notifier: &RtrNotifier, + service_stats: &RtrServiceStats, + ) { + if let Err(err) = + self.write_source(report_dir, phase, shared_cache, notifier, service_stats) + { + warn!( + "failed to write RTR source report to {}: {:?}", + report_dir.display(), + err + ); + } + } + + pub fn write_clients_or_warn( + &self, + report_dir: &Path, + phase: &str, + shared_cache: &SharedRtrCache, + notifier: &RtrNotifier, + service_stats: &RtrServiceStats, + ) { + if 
let Err(err) = + self.write_clients(report_dir, phase, shared_cache, notifier, service_stats) + { + warn!( + "failed to write RTR clients report to {}: {:?}", + report_dir.display(), + err + ); + } + } + + pub fn write_runtime_or_warn( + &self, + report_dir: &Path, + phase: &str, + shared_cache: &SharedRtrCache, + notifier: &RtrNotifier, + service_stats: &RtrServiceStats, + ) { + if let Err(err) = + self.write_runtime(report_dir, phase, shared_cache, notifier, service_stats) + { + warn!( + "failed to write RTR runtime report to {}: {:?}", + report_dir.display(), + err + ); + } + } + fn write( &self, report_dir: &Path, @@ -235,6 +306,64 @@ impl ReportContext { notifier: &RtrNotifier, service_stats: &RtrServiceStats, ) -> Result<()> { + let parts = self.build_reports(phase, shared_cache, notifier, service_stats); + fs::create_dir_all(report_dir) + .with_context(|| format!("create report directory {}", report_dir.display()))?; + self.write_source_report(report_dir, &parts.suffix, &parts.source)?; + self.write_clients_report(report_dir, &parts.suffix, &parts.clients)?; + self.write_runtime_report(report_dir, &parts.suffix, &parts.runtime)?; + Ok(()) + } + + fn write_source( + &self, + report_dir: &Path, + phase: &str, + shared_cache: &SharedRtrCache, + notifier: &RtrNotifier, + service_stats: &RtrServiceStats, + ) -> Result<()> { + let parts = self.build_reports(phase, shared_cache, notifier, service_stats); + fs::create_dir_all(report_dir) + .with_context(|| format!("create report directory {}", report_dir.display()))?; + self.write_source_report(report_dir, &parts.suffix, &parts.source) + } + + fn write_clients( + &self, + report_dir: &Path, + phase: &str, + shared_cache: &SharedRtrCache, + notifier: &RtrNotifier, + service_stats: &RtrServiceStats, + ) -> Result<()> { + let parts = self.build_reports(phase, shared_cache, notifier, service_stats); + fs::create_dir_all(report_dir) + .with_context(|| format!("create report directory {}", report_dir.display()))?; + 
self.write_clients_report(report_dir, &parts.suffix, &parts.clients) + } + + fn write_runtime( + &self, + report_dir: &Path, + phase: &str, + shared_cache: &SharedRtrCache, + notifier: &RtrNotifier, + service_stats: &RtrServiceStats, + ) -> Result<()> { + let parts = self.build_reports(phase, shared_cache, notifier, service_stats); + fs::create_dir_all(report_dir) + .with_context(|| format!("create report directory {}", report_dir.display()))?; + self.write_runtime_report(report_dir, &parts.suffix, &parts.runtime) + } + + fn build_reports( + &self, + phase: &str, + shared_cache: &SharedRtrCache, + notifier: &RtrNotifier, + service_stats: &RtrServiceStats, + ) -> ReportParts { let cache = shared_cache.load_full(); let runtime = self .runtime @@ -248,63 +377,124 @@ impl ReportContext { let active_connections = service_stats.active_connections(); let connections_by_transport = service_stats.transport_connections(); let max_connections = service_stats.max_connections(); - let report = RtrReport { + let generated_at = self.report_now(); + let metadata = ReportMetadata { schema_version: 2, - generated_at: self.report_now(), + generated_at, phase: phase.to_string(), - service: ServiceReport { - started_at: self.to_report_time(self.started_at), - uptime_seconds: self.started_instant.elapsed().as_secs(), - active_connections, - connections_by_transport: TransportConnectionReport::from(connections_by_transport), - session_listeners: notifier.listener_count(), - max_connections, - connection_utilization: active_connections as f64 / max_connections as f64, - }, - process: ProcessReport { - rss_mib: current_rss_mib(), - }, - source: runtime - .source - .map(|source| SourceLoadReportView::from_report(source, self.timezone)), - refresh: RefreshReportView::from_report(runtime.refresh, self.timezone), - data_quality: runtime.data_quality, - configuration: self.configuration.clone(), - cache: CacheReport { - availability, - created_at: self.to_report_time(cache.created_at().utc()), 
- last_update_begin: self.to_report_time(cache.last_update_begin().utc()), - last_update_end: self.to_report_time(cache.last_update_end().utc()), - versions: cache.version_report_stats(), - }, + }; + let service = ServiceReport { + started_at: self.to_report_time(self.started_at), + uptime_seconds: self.started_instant.elapsed().as_secs(), + active_connections, + connections_by_transport: TransportConnectionReport::from(connections_by_transport), + session_listeners: notifier.listener_count(), + max_connections, + connection_utilization: active_connections as f64 / max_connections as f64, + }; + let process = ProcessReport { + rss_mib: current_rss_mib(), + }; + let source = runtime + .source + .map(|source| SourceLoadReportView::from_report(source, self.timezone)); + let refresh = RefreshReportView::from_report(runtime.refresh, self.timezone); + let cache = CacheReport { + availability, + created_at: self.to_report_time(cache.created_at().utc()), + last_update_begin: self.to_report_time(cache.last_update_begin().utc()), + last_update_end: self.to_report_time(cache.last_update_end().utc()), + memory: cache.memory_stats(), + versions: cache.version_report_stats(), }; - fs::create_dir_all(report_dir) - .with_context(|| format!("create report directory {}", report_dir.display()))?; - let target = report_dir.join("rtr-server.json"); - let temporary = report_dir.join(".rtr-server.json.tmp"); - let json = serde_json::to_vec_pretty(&report).context("serialize RTR report")?; - fs::write(&temporary, json) - .with_context(|| format!("write temporary report {}", temporary.display()))?; + let source_report = SourceReport { + schema_version: metadata.schema_version, + generated_at: metadata.generated_at, + phase: metadata.phase.clone(), + source, + source_fingerprint: runtime.source_fingerprint.map(|fingerprint| { + SourceFingerprintReport::from_fingerprint(fingerprint, self.timezone) + }), + refresh, + data_quality: runtime.data_quality, + cache, + }; + let clients_report = 
ClientsReport { + schema_version: metadata.schema_version, + generated_at: metadata.generated_at, + phase: metadata.phase.clone(), + service: ServiceReport { + started_at: service.started_at, + uptime_seconds: service.uptime_seconds, + active_connections: service.active_connections, + connections_by_transport: service.connections_by_transport, + session_listeners: service.session_listeners, + max_connections: service.max_connections, + connection_utilization: service.connection_utilization, + }, + }; + let runtime_report = RuntimeReport { + schema_version: metadata.schema_version, + generated_at: metadata.generated_at, + phase: metadata.phase, + service, + process, + configuration: self.configuration.clone(), + }; + let suffix = report_file_suffix(generated_at); - if let Err(err) = fs::rename(&temporary, &target) { - if target.exists() { - fs::remove_file(&target) - .with_context(|| format!("replace existing report {}", target.display()))?; - fs::rename(&temporary, &target) - .with_context(|| format!("move report into {}", target.display()))?; - } else { - return Err(err).with_context(|| { - format!( - "move temporary report {} into {}", - temporary.display(), - target.display() - ) - }); - } + ReportParts { + suffix, + source: source_report, + clients: clients_report, + runtime: runtime_report, } + } - Ok(()) + fn write_source_report( + &self, + report_dir: &Path, + suffix: &str, + report: &SourceReport, + ) -> Result<()> { + write_rolling_report( + report_dir, + "rtr-source", + suffix, + self.configuration.report_history_limit, + report, + ) + } + + fn write_clients_report( + &self, + report_dir: &Path, + suffix: &str, + report: &ClientsReport, + ) -> Result<()> { + write_rolling_report( + report_dir, + "rtr-clients", + suffix, + self.configuration.report_history_limit, + report, + ) + } + + fn write_runtime_report( + &self, + report_dir: &Path, + suffix: &str, + report: &RuntimeReport, + ) -> Result<()> { + write_rolling_report( + report_dir, + "rtr-runtime", + 
suffix, + self.configuration.report_history_limit, + report, + ) } fn report_now(&self) -> DateTime { @@ -316,21 +506,51 @@ impl ReportContext { } } +#[derive(Serialize, Clone)] +struct ReportMetadata { + schema_version: u16, + generated_at: DateTime, + phase: String, +} + +struct ReportParts { + suffix: String, + source: SourceReport, + clients: ClientsReport, + runtime: RuntimeReport, +} + #[derive(Serialize)] -struct RtrReport { +struct SourceReport { + schema_version: u16, + generated_at: DateTime, + phase: String, + source: Option, + source_fingerprint: Option, + refresh: RefreshReportView, + data_quality: Option, + cache: CacheReport, +} + +#[derive(Serialize)] +struct ClientsReport { + schema_version: u16, + generated_at: DateTime, + phase: String, + service: ServiceReport, +} + +#[derive(Serialize)] +struct RuntimeReport { schema_version: u16, generated_at: DateTime, phase: String, service: ServiceReport, process: ProcessReport, - source: Option, - refresh: RefreshReportView, - data_quality: Option, configuration: ReportConfiguration, - cache: CacheReport, } -#[derive(Serialize)] +#[derive(Clone, Copy, Serialize)] struct ServiceReport { started_at: DateTime, uptime_seconds: u64, @@ -353,6 +573,45 @@ struct SourceLoadReportView { slurm_version: Option, } +#[derive(Serialize)] +struct SourceFingerprintReport { + ccr: FileFingerprintReport, + slurm_files: Vec, +} + +impl SourceFingerprintReport { + fn from_fingerprint(fingerprint: SourceFingerprint, timezone: Tz) -> Self { + Self { + ccr: FileFingerprintReport::from_fingerprint(fingerprint.ccr, timezone), + slurm_files: fingerprint + .slurm_files + .into_iter() + .map(|fingerprint| FileFingerprintReport::from_fingerprint(fingerprint, timezone)) + .collect(), + } + } +} + +#[derive(Serialize)] +struct FileFingerprintReport { + path: String, + len: u64, + modified_unix_secs: u64, + modified_at: Option>, +} + +impl FileFingerprintReport { + fn from_fingerprint(fingerprint: FileFingerprint, timezone: Tz) -> Self { 
+ Self { + path: fingerprint.path, + len: fingerprint.len, + modified_unix_secs: fingerprint.modified_unix_secs, + modified_at: DateTime::from_timestamp(fingerprint.modified_unix_secs as i64, 0) + .map(|time| to_report_time(time, timezone)), + } + } +} + impl SourceLoadReportView { fn from_report(report: SourceLoadReport, timezone: Tz) -> Self { Self { @@ -370,7 +629,7 @@ impl SourceLoadReportView { } } -#[derive(Serialize)] +#[derive(Clone, Copy, Serialize)] struct TransportConnectionReport { tcp: usize, tls: usize, @@ -398,6 +657,7 @@ struct CacheReport { created_at: DateTime, last_update_begin: DateTime, last_update_end: DateTime, + memory: CacheMemoryStats, versions: [VersionReportStats; 3], } @@ -415,6 +675,77 @@ pub fn current_rss_mib() -> Option { Some(kb / 1024) } +fn report_file_suffix(time: DateTime) -> String { + time.format("%Y%m%d%H%M%S%9f").to_string() +} + +fn write_rolling_report( + report_dir: &Path, + prefix: &str, + suffix: &str, + keep: usize, + report: &T, +) -> Result<()> { + let file_name = format!("{prefix}-{suffix}.json"); + let target = report_dir.join(&file_name); + let temporary = report_dir.join(format!(".{file_name}.tmp")); + let json = serde_json::to_vec_pretty(report) + .with_context(|| format!("serialize {prefix} RTR report"))?; + fs::write(&temporary, json) + .with_context(|| format!("write temporary report {}", temporary.display()))?; + replace_file(&temporary, &target)?; + prune_rolling_reports(report_dir, prefix, keep)?; + Ok(()) +} + +fn replace_file(temporary: &Path, target: &Path) -> Result<()> { + if let Err(err) = fs::rename(temporary, target) { + if target.exists() { + fs::remove_file(target) + .with_context(|| format!("replace existing report {}", target.display()))?; + fs::rename(temporary, target) + .with_context(|| format!("move report into {}", target.display()))?; + } else { + return Err(err).with_context(|| { + format!( + "move temporary report {} into {}", + temporary.display(), + target.display() + ) + }); + } + } 
+ Ok(()) +} + +fn prune_rolling_reports(report_dir: &Path, prefix: &str, keep: usize) -> Result<()> { + let start = format!("{prefix}-"); + let mut files = Vec::<PathBuf>::new(); + for entry in fs::read_dir(report_dir) + .with_context(|| format!("read report directory {}", report_dir.display()))? + { + let entry = + entry.with_context(|| format!("iterate report directory {}", report_dir.display()))?; + let path = entry.path(); + if !path.is_file() { + continue; + } + let Some(name) = path.file_name().and_then(|name| name.to_str()) else { + continue; + }; + if name.starts_with(&start) && name.ends_with(".json") { + files.push(path); + } + } + files.sort(); + let remove_count = files.len().saturating_sub(keep); + for path in files.into_iter().take(remove_count) { + fs::remove_file(&path) + .with_context(|| format!("remove old rolling report {}", path.display()))?; + } + Ok(()) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -432,7 +763,8 @@ mod tests { fn test_configuration() -> ReportConfiguration { ReportConfiguration::new( 300, - 60, + 300, + 10, 100, false, false, @@ -460,29 +792,53 @@ mod tests { ) .unwrap(); - let report: Value = - serde_json::from_slice(&fs::read(report_dir.join("rtr-server.json")).unwrap()).unwrap(); - assert_eq!(report["schema_version"], 2); - assert_eq!(report["phase"], "test"); - assert_report_time_offset(&report["generated_at"]); - assert_report_time_offset(&report["service"]["started_at"]); - assert_report_time_offset(&report["cache"]["created_at"]); - assert_eq!(report["cache"]["availability"], "ready"); - assert_eq!(report["refresh"]["status"], "not_attempted"); - assert!(report["source"].is_null()); - assert!(report["data_quality"].is_null()); + let source_report = read_single_report(&report_dir, "rtr-source"); + assert_eq!(source_report["schema_version"], 2); + assert_eq!(source_report["phase"], "test"); + assert_report_time_offset(&source_report["generated_at"]); + assert_report_time_offset(&source_report["cache"]["created_at"]); + 
assert_eq!(source_report["cache"]["availability"], "ready"); + assert_eq!(source_report["refresh"]["status"], "not_attempted"); + assert!(source_report["source"].is_null()); + assert!(source_report["data_quality"].is_null()); + + let runtime_report = read_single_report(&report_dir, "rtr-runtime"); + assert_report_time_offset(&runtime_report["service"]["started_at"]); assert_eq!( - report["configuration"]["source_refresh_interval_seconds"], + runtime_report["configuration"]["source_refresh_interval_seconds"], 300 ); - assert_eq!(report["service"]["max_connections"], 1024); - assert_eq!(report["service"]["active_connections"], 0); - assert_eq!(report["service"]["connections_by_transport"]["tcp"], 0); - assert_eq!(report["service"]["connections_by_transport"]["tls"], 0); - assert_eq!(report["service"]["connections_by_transport"]["ssh"], 0); - assert_eq!(report["cache"]["versions"].as_array().unwrap().len(), 3); - assert_eq!(report["cache"]["versions"][2]["snapshot"]["total"], 0); - assert!(!report_dir.join(".rtr-server.json.tmp").exists()); + assert_eq!(runtime_report["configuration"]["report_history_limit"], 10); + + let clients_report = read_single_report(&report_dir, "rtr-clients"); + assert_eq!(clients_report["service"]["max_connections"], 1024); + assert_eq!(clients_report["service"]["active_connections"], 0); + assert_eq!( + clients_report["service"]["connections_by_transport"]["tcp"], + 0 + ); + assert_eq!( + clients_report["service"]["connections_by_transport"]["tls"], + 0 + ); + assert_eq!( + clients_report["service"]["connections_by_transport"]["ssh"], + 0 + ); + assert_eq!( + source_report["cache"]["versions"].as_array().unwrap().len(), + 3 + ); + assert_eq!( + source_report["cache"]["versions"][2]["snapshot"]["total"], + 0 + ); + assert_eq!( + source_report["cache"]["memory"]["delta_payload_counts"][2], + 0 + ); + assert!(source_report["source_fingerprint"].is_null()); + assert_no_temporary_reports(&report_dir); } #[test] @@ -530,6 +886,18 @@ mod tests { 
}, slurm_assertions: SlurmRuleCounts::default(), }; + context.record_source_fingerprint(SourceFingerprint { + ccr: FileFingerprint { + path: "data/example.ccr".to_string(), + len: 123, + modified_unix_secs: 1_781_404_800, + }, + slurm_files: vec![FileFingerprint { + path: "policy.slurm".to_string(), + len: 42, + modified_unix_secs: 1_781_408_400, + }], + }); context.record_refresh_success(Utc::now(), 12, true, source, quality); context.record_refresh_failure(Utc::now(), 5, &anyhow::anyhow!("source unavailable")); @@ -543,9 +911,18 @@ mod tests { ) .unwrap(); - let report: Value = - serde_json::from_slice(&fs::read(report_dir.join("rtr-server.json")).unwrap()).unwrap(); + let report = read_single_report(&report_dir, "rtr-source"); assert_eq!(report["source"]["ccr_file"], "data/example.ccr"); + assert_eq!( + report["source_fingerprint"]["ccr"]["path"], + "data/example.ccr" + ); + assert_eq!(report["source_fingerprint"]["ccr"]["len"], 123); + assert_report_time_offset(&report["source_fingerprint"]["ccr"]["modified_at"]); + assert_eq!( + report["source_fingerprint"]["slurm_files"][0]["path"], + "policy.slurm" + ); assert_report_time_offset(&report["source"]["ccr_modified_at"]); assert_eq!(report["data_quality"]["after_slurm"]["total"], 10); assert_eq!(report["refresh"]["status"], "failed"); @@ -555,6 +932,84 @@ mod tests { assert_report_time_offset(&report["refresh"]["last_success_at"]); } + #[test] + fn rolling_reports_keep_latest_files_per_category() { + let temp = tempfile::tempdir().unwrap(); + let report_dir = temp.path().join("report"); + let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default())); + let service = RtrService::new(shared_cache.clone()); + let notifier = service.notifier(); + let mut configuration = test_configuration(); + configuration.report_history_limit = 2; + let context = ReportContext::new(configuration); + + for phase in ["one", "two", "three"] { + context + .write( + &report_dir, + phase, + &shared_cache, + ¬ifier, + 
&service.stats(), + ) + .unwrap(); + } + + assert_eq!(report_files(&report_dir, "rtr-source").len(), 2); + assert_eq!(report_files(&report_dir, "rtr-clients").len(), 2); + assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 2); + assert_no_temporary_reports(&report_dir); + } + + #[test] + fn category_writes_only_create_requested_report_type() { + let temp = tempfile::tempdir().unwrap(); + let report_dir = temp.path().join("report"); + let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default())); + let service = RtrService::new(shared_cache.clone()); + let notifier = service.notifier(); + let context = ReportContext::new(test_configuration()); + + context + .write_source( + &report_dir, + "source_only", + &shared_cache, + ¬ifier, + &service.stats(), + ) + .unwrap(); + assert_eq!(report_files(&report_dir, "rtr-source").len(), 1); + assert_eq!(report_files(&report_dir, "rtr-clients").len(), 0); + assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 0); + + context + .write_clients( + &report_dir, + "clients_only", + &shared_cache, + ¬ifier, + &service.stats(), + ) + .unwrap(); + assert_eq!(report_files(&report_dir, "rtr-source").len(), 1); + assert_eq!(report_files(&report_dir, "rtr-clients").len(), 1); + assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 0); + + context + .write_runtime( + &report_dir, + "runtime_only", + &shared_cache, + ¬ifier, + &service.stats(), + ) + .unwrap(); + assert_eq!(report_files(&report_dir, "rtr-source").len(), 1); + assert_eq!(report_files(&report_dir, "rtr-clients").len(), 1); + assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 1); + } + fn assert_report_time_offset(value: &Value) { let value = value.as_str().expect("report time should be a string"); assert!( @@ -562,4 +1017,36 @@ mod tests { "report time should use +08:00 offset, got {value}" ); } + + fn read_single_report(report_dir: &Path, prefix: &str) -> Value { + let files = report_files(report_dir, prefix); + assert_eq!(files.len(), 1, 
"expected one {prefix} report"); + serde_json::from_slice(&fs::read(&files[0]).unwrap()).unwrap() + } + + fn report_files(report_dir: &Path, prefix: &str) -> Vec { + let start = format!("{prefix}-"); + let mut files = fs::read_dir(report_dir) + .unwrap() + .map(|entry| entry.unwrap().path()) + .filter(|path| { + path.file_name() + .and_then(|name| name.to_str()) + .is_some_and(|name| name.starts_with(&start) && name.ends_with(".json")) + }) + .collect::>(); + files.sort(); + files + } + + fn assert_no_temporary_reports(report_dir: &Path) { + let has_temporary = fs::read_dir(report_dir).unwrap().any(|entry| { + entry + .unwrap() + .file_name() + .to_str() + .is_some_and(|name| name.ends_with(".tmp")) + }); + assert!(!has_temporary); + } } diff --git a/src/rtr/server/connection.rs b/src/rtr/server/connection.rs index 8673e18..55effbf 100644 --- a/src/rtr/server/connection.rs +++ b/src/rtr/server/connection.rs @@ -1,7 +1,7 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::{ Arc, - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, }; use anyhow::{Context, Result, anyhow}; @@ -66,6 +66,8 @@ impl RtrTransportConnectionCounters { pub struct ConnectionGuard { active_connections: Arc, transport_connections: Arc, + connection_change_tx: watch::Sender, + connection_change_seq: Arc, transport: RtrTransportKind, _permit: OwnedSemaphorePermit, } @@ -74,14 +76,19 @@ impl ConnectionGuard { pub fn new( active_connections: Arc, transport_connections: Arc, + connection_change_tx: watch::Sender, + connection_change_seq: Arc, transport: RtrTransportKind, permit: OwnedSemaphorePermit, ) -> Self { active_connections.fetch_add(1, Ordering::Relaxed); transport_connections.increment(transport); + notify_connection_change(&connection_change_tx, &connection_change_seq); Self { active_connections, transport_connections, + connection_change_tx, + connection_change_seq, transport, _permit: permit, } @@ -96,9 +103,15 @@ impl Drop for ConnectionGuard { fn drop(&mut 
self) { self.active_connections.fetch_sub(1, Ordering::Relaxed); self.transport_connections.decrement(self.transport); + notify_connection_change(&self.connection_change_tx, &self.connection_change_seq); } } +fn notify_connection_change(tx: &watch::Sender, seq: &AtomicU64) { + let next = seq.fetch_add(1, Ordering::Relaxed).wrapping_add(1); + let _ = tx.send_replace(next); +} + pub async fn handle_tcp_connection( cache: SharedRtrCache, stream: TcpStream, diff --git a/src/rtr/server/listener.rs b/src/rtr/server/listener.rs index c6e1964..b3d4209 100644 --- a/src/rtr/server/listener.rs +++ b/src/rtr/server/listener.rs @@ -3,7 +3,10 @@ use std::future::Future; use std::net::SocketAddr; use std::path::Path; use std::pin::Pin; -use std::sync::{Arc, atomic::AtomicUsize}; +use std::sync::{ + Arc, + atomic::{AtomicU64, AtomicUsize}, +}; use std::time::Duration; use anyhow::{Context, Result, anyhow}; @@ -199,6 +202,8 @@ pub struct RtrServer { handshake_limiter: Arc, active_connections: Arc, transport_connections: Arc, + connection_change_tx: watch::Sender, + connection_change_seq: Arc, config: RtrServiceConfig, } @@ -212,6 +217,8 @@ impl RtrServer { handshake_limiter: Arc, active_connections: Arc, transport_connections: Arc, + connection_change_tx: watch::Sender, + connection_change_seq: Arc, config: RtrServiceConfig, ) -> Self { Self { @@ -223,6 +230,8 @@ impl RtrServer { handshake_limiter, active_connections, transport_connections, + connection_change_tx, + connection_change_seq, config, } } @@ -373,6 +382,8 @@ impl RtrServer { let shutdown_tx = self.shutdown_tx.clone(); let active_connections = self.active_connections.clone(); let transport_connections = self.transport_connections.clone(); + let connection_change_tx = self.connection_change_tx.clone(); + let connection_change_seq = self.connection_change_seq.clone(); let transport_instance = transport.clone(); let transport_kind = transport_instance.kind(); @@ -387,6 +398,8 @@ impl RtrServer { let guard = 
ConnectionGuard::new( active_connections, transport_connections, + connection_change_tx, + connection_change_seq, transport_kind, permit, ); diff --git a/src/rtr/server/service.rs b/src/rtr/server/service.rs index e43192e..734afce 100644 --- a/src/rtr/server/service.rs +++ b/src/rtr/server/service.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; use std::path::Path; use std::sync::{ Arc, - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, }; use tokio::sync::{Semaphore, broadcast, watch}; @@ -26,6 +26,8 @@ pub struct RtrService { handshake_limiter: Arc, active_connections: Arc, transport_connections: Arc, + connection_change_tx: watch::Sender, + connection_change_seq: Arc, config: RtrServiceConfig, } @@ -33,6 +35,7 @@ pub struct RtrService { pub struct RtrServiceStats { active_connections: Arc, transport_connections: Arc, + connection_change_tx: watch::Sender, max_connections: usize, } @@ -48,6 +51,10 @@ impl RtrServiceStats { pub fn max_connections(&self) -> usize { self.max_connections } + + pub fn subscribe_connection_changes(&self) -> watch::Receiver { + self.connection_change_tx.subscribe() + } } impl RtrService { @@ -58,6 +65,7 @@ impl RtrService { pub fn with_config(cache: SharedRtrCache, config: RtrServiceConfig) -> Self { let (notify_tx, _) = broadcast::channel(config.notify_queue_size); let (shutdown_tx, _) = watch::channel(false); + let (connection_change_tx, _) = watch::channel(0); Self { cache, @@ -67,6 +75,8 @@ impl RtrService { handshake_limiter: Arc::new(Semaphore::new(config.max_concurrent_handshakes)), active_connections: Arc::new(AtomicUsize::new(0)), transport_connections: Arc::new(RtrTransportConnectionCounters::default()), + connection_change_tx, + connection_change_seq: Arc::new(AtomicU64::new(0)), config, } } @@ -99,6 +109,7 @@ impl RtrService { RtrServiceStats { active_connections: self.active_connections.clone(), transport_connections: self.transport_connections.clone(), + connection_change_tx: 
self.connection_change_tx.clone(), max_connections: self.config.max_connections, } } @@ -113,6 +124,8 @@ impl RtrService { self.handshake_limiter.clone(), self.active_connections.clone(), self.transport_connections.clone(), + self.connection_change_tx.clone(), + self.connection_change_seq.clone(), self.config.clone(), ) } @@ -127,6 +140,8 @@ impl RtrService { self.handshake_limiter.clone(), self.active_connections.clone(), self.transport_connections.clone(), + self.connection_change_tx.clone(), + self.connection_change_seq.clone(), self.config.clone(), ) } @@ -141,6 +156,8 @@ impl RtrService { self.handshake_limiter.clone(), self.active_connections.clone(), self.transport_connections.clone(), + self.connection_change_tx.clone(), + self.connection_change_seq.clone(), self.config.clone(), ) } diff --git a/src/rtr/store.rs b/src/rtr/store.rs index d9e8819..75cef81 100644 --- a/src/rtr/store.rs +++ b/src/rtr/store.rs @@ -1,5 +1,5 @@ -use anyhow::{Result, anyhow}; -use rocksdb::{ColumnFamilyDescriptor, DB, IteratorMode, Options, WriteBatch}; +use anyhow::{anyhow, Result}; +use rocksdb::{ColumnFamilyDescriptor, IteratorMode, Options, WriteBatch, DB}; use serde::de::DeserializeOwned; use std::borrow::Borrow; use std::path::Path; @@ -18,6 +18,7 @@ const META_SERIAL_PREFIX: &str = "serial_v"; const META_DELTA_MIN_PREFIX: &str = "delta_min_v"; const META_DELTA_MAX_PREFIX: &str = "delta_max_v"; const SNAPSHOT_CURRENT_PREFIX: &str = "current_v"; +const CANONICAL_SNAPSHOT_VERSION: u8 = 2; const DELTA_KEY_V2_PREFIX: u8 = b'D'; @@ -123,7 +124,18 @@ impl RtrStore { pub fn get_snapshot_for_version(&self, version: u8) -> Result> { let key = snapshot_key(version); - self.get_cf(CF_SNAPSHOT, &key) + if let Some(snapshot) = self.get_cf(CF_SNAPSHOT, &key)? { + return Ok(Some(snapshot)); + } + + if version == CANONICAL_SNAPSHOT_VERSION { + return Ok(None); + } + + let canonical_key = snapshot_key(CANONICAL_SNAPSHOT_VERSION); + Ok(self + .get_cf::(CF_SNAPSHOT, &canonical_key)? 
+ .map(|snapshot| snapshot.project_for_version(version))) } pub fn get_delta_for_version(&self, version: u8, serial: u32) -> Result> { @@ -242,14 +254,17 @@ impl RtrStore { META_AVAILABILITY, serde_json::to_vec(&availability)?, ); + batch.put_cf( + snapshot_cf, + snapshot_key(CANONICAL_SNAPSHOT_VERSION), + serde_json::to_vec(snapshots[usize::from(CANONICAL_SNAPSHOT_VERSION)].borrow())?, + ); + for legacy_version in 0u8..CANONICAL_SNAPSHOT_VERSION { + batch.delete_cf(snapshot_cf, snapshot_key(legacy_version)); + } for version in 0u8..=2 { let idx = version as usize; - batch.put_cf( - snapshot_cf, - snapshot_key(version), - serde_json::to_vec(snapshots[idx].borrow())?, - ); batch.put_cf( meta_cf, meta_key(META_SESSION_ID_PREFIX, version), @@ -304,9 +319,9 @@ impl RtrStore { ); } } else { - for key in self - .list_delta_keys_outside_window_for_version(version, min_serial, max_serial)? - { + for key in self.list_delta_keys_outside_window_for_version( + version, min_serial, max_serial, + )? { batch.delete_cf(delta_cf, key); } } @@ -382,3 +397,82 @@ fn validate_delta_window(deltas: &[Delta], min_serial: u32, max_serial: u32) -> Ok(()) } + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + + use crate::data_model::resources::as_resources::Asn; + use crate::data_model::resources::ip_resources::{IPAddress, IPAddressPrefix}; + use crate::rtr::cache::Snapshot; + use crate::rtr::payload::{Aspa, Payload, RouteOrigin, RouterKey, Ski}; + + use super::*; + + fn mixed_snapshot() -> Snapshot { + Snapshot::from_payloads(vec![ + Payload::RouteOrigin(RouteOrigin::new( + IPAddressPrefix::new(IPAddress::from_ipv4(Ipv4Addr::new(192, 0, 2, 0)), 24), + 24, + Asn::from(64496), + )), + Payload::RouterKey(RouterKey::new( + Ski::default(), + Asn::from(64497), + vec![1, 2, 3], + )), + Payload::Aspa(Aspa::new(Asn::from(64498), vec![Asn::from(64499)])), + ]) + } + + #[test] + fn save_cache_state_persists_single_canonical_snapshot() { + let temp = tempfile::tempdir().unwrap(); + let store = 
RtrStore::open(temp.path()).unwrap(); + let source = mixed_snapshot(); + let snapshots = std::array::from_fn(|version| source.project_for_version(version as u8)); + let deltas: [Option<&Delta>; 3] = [None, None, None]; + let windows: [Option<(u32, u32)>; 3] = [None, None, None]; + let clear = [true, true, true]; + + store + .save_cache_state_versioned( + CacheAvailability::Ready, + &snapshots, + &[1, 2, 3], + &[10, 11, 12], + &deltas, + &windows, + &clear, + ) + .unwrap(); + + assert!(store + .get_cf::(CF_SNAPSHOT, &snapshot_key(0)) + .unwrap() + .is_none()); + assert!(store + .get_cf::(CF_SNAPSHOT, &snapshot_key(1)) + .unwrap() + .is_none()); + assert!(store + .get_cf::(CF_SNAPSHOT, &snapshot_key(2)) + .unwrap() + .is_some()); + + let restored_v0 = store.get_snapshot_for_version(0).unwrap().unwrap(); + assert_eq!(restored_v0.origins().len(), 1); + assert_eq!(restored_v0.router_keys().len(), 0); + assert_eq!(restored_v0.aspas().len(), 0); + + let restored_v1 = store.get_snapshot_for_version(1).unwrap().unwrap(); + assert_eq!(restored_v1.origins().len(), 1); + assert_eq!(restored_v1.router_keys().len(), 1); + assert_eq!(restored_v1.aspas().len(), 0); + + let restored_v2 = store.get_snapshot_for_version(2).unwrap().unwrap(); + assert_eq!(restored_v2.origins().len(), 1); + assert_eq!(restored_v2.router_keys().len(), 1); + assert_eq!(restored_v2.aspas().len(), 1); + } +} diff --git a/tests/common/test_helper.rs b/tests/common/test_helper.rs index de1467e..7b8d136 100644 --- a/tests/common/test_helper.rs +++ b/tests/common/test_helper.rs @@ -1,7 +1,7 @@ use std::fmt::Write; use std::net::{Ipv4Addr, Ipv6Addr}; -use serde_json::{Value, json}; +use serde_json::{json, Value}; use rpki::data_model::resources::ip_resources::{IPAddress, IPAddressPrefix}; use rpki::rtr::cache::SerialResult; diff --git a/tests/test_store_db.rs b/tests/test_store_db.rs index 348f82e..d3601f4 100644 --- a/tests/test_store_db.rs +++ b/tests/test_store_db.rs @@ -13,25 +13,18 @@ fn 
store_db_versioned_state_persists_and_restores_all_versions() { let dir = tempfile::tempdir().unwrap(); let store = RtrStore::open(dir.path()).unwrap(); - let snapshots = [ - Snapshot::from_payloads(vec![Payload::RouteOrigin(v4_origin( - 192, 0, 2, 0, 24, 24, 64496, - ))]), - Snapshot::from_payloads(vec![ - Payload::RouteOrigin(v4_origin(192, 0, 2, 0, 24, 24, 64496)), - Payload::RouteOrigin(v4_origin(198, 51, 100, 0, 24, 24, 64497)), - ]), - Snapshot::from_payloads(vec![ - Payload::RouteOrigin(v4_origin(192, 0, 2, 0, 24, 24, 64496)), - Payload::RouteOrigin(v4_origin(198, 51, 100, 0, 24, 24, 64497)), - Payload::RouteOrigin(v6_origin( - Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 0), - 32, - 48, - 64498, - )), - ]), - ]; + let source_snapshot = Snapshot::from_payloads(vec![ + Payload::RouteOrigin(v4_origin(192, 0, 2, 0, 24, 24, 64496)), + Payload::RouteOrigin(v4_origin(198, 51, 100, 0, 24, 24, 64497)), + Payload::RouteOrigin(v6_origin( + Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 0), + 32, + 48, + 64498, + )), + ]); + let snapshots = + std::array::from_fn(|version| source_snapshot.project_for_version(version as u8)); let session_ids = [410u16, 411u16, 412u16]; let serials = [100u32, 200u32, 300u32]; @@ -63,7 +56,10 @@ fn store_db_versioned_state_persists_and_restores_all_versions() { ) .unwrap(); - assert_eq!(store.get_availability().unwrap(), Some(CacheAvailability::Ready)); + assert_eq!( + store.get_availability().unwrap(), + Some(CacheAvailability::Ready) + ); for version in 0u8..=2 { let idx = version as usize; @@ -85,16 +81,28 @@ fn store_db_versioned_state_persists_and_restores_all_versions() { assert_eq!(loaded_serial, serials[idx]); } - assert_eq!(store.get_delta_window_for_version(0).unwrap(), Some((100, 100))); - assert_eq!(store.get_delta_window_for_version(1).unwrap(), None); - assert_eq!(store.get_delta_window_for_version(2).unwrap(), Some((300, 300))); assert_eq!( - store.get_delta_for_version(0, 100).unwrap().map(|d| d.serial()), + 
store.get_delta_window_for_version(0).unwrap(), + Some((100, 100)) + ); + assert_eq!(store.get_delta_window_for_version(1).unwrap(), None); + assert_eq!( + store.get_delta_window_for_version(2).unwrap(), + Some((300, 300)) + ); + assert_eq!( + store + .get_delta_for_version(0, 100) + .unwrap() + .map(|d| d.serial()), Some(100) ); assert!(store.get_delta_for_version(1, 200).unwrap().is_none()); assert_eq!( - store.get_delta_for_version(2, 300).unwrap().map(|d| d.serial()), + store + .get_delta_for_version(2, 300) + .unwrap() + .map(|d| d.serial()), Some(300) ); } @@ -114,12 +122,16 @@ fn store_db_versioned_delta_window_wraparound_is_isolated_by_version() { ); let d_zero = Delta::new( 0, - vec![Payload::RouteOrigin(v4_origin(198, 51, 100, 0, 24, 24, 64497))], + vec![Payload::RouteOrigin(v4_origin( + 198, 51, 100, 0, 24, 24, 64497, + ))], vec![], ); let d_one = Delta::new( 1, - vec![Payload::RouteOrigin(v4_origin(203, 0, 113, 0, 24, 24, 64498))], + vec![Payload::RouteOrigin(v4_origin( + 203, 0, 113, 0, 24, 24, 64498, + ))], vec![], ); let d_v1_only = Delta::new( @@ -180,7 +192,10 @@ fn store_db_versioned_delta_window_wraparound_is_isolated_by_version() { ); let v1_loaded = store.load_delta_window_for_version(1, 0, 0).unwrap(); - assert_eq!(v1_loaded.iter().map(Delta::serial).collect::>(), vec![0]); + assert_eq!( + v1_loaded.iter().map(Delta::serial).collect::>(), + vec![0] + ); assert_eq!(v1_loaded[0].announced().len(), 1); } @@ -209,7 +224,9 @@ fn store_db_versioned_clear_window_affects_only_target_version() { ); let d2 = Delta::new( 30, - vec![Payload::RouteOrigin(v4_origin(203, 0, 113, 0, 24, 24, 64498))], + vec![Payload::RouteOrigin(v4_origin( + 203, 0, 113, 0, 24, 24, 64498, + ))], vec![], ); @@ -240,9 +257,15 @@ fn store_db_versioned_clear_window_affects_only_target_version() { assert_eq!(store.get_delta_window_for_version(0).unwrap(), None); assert!(store.get_delta_for_version(0, 10).unwrap().is_none()); - 
assert_eq!(store.get_delta_window_for_version(2).unwrap(), Some((30, 30))); assert_eq!( - store.get_delta_for_version(2, 30).unwrap().map(|d| d.serial()), + store.get_delta_window_for_version(2).unwrap(), + Some((30, 30)) + ); + assert_eq!( + store + .get_delta_for_version(2, 30) + .unwrap() + .map(|d| d.serial()), Some(30) ); } @@ -312,7 +335,10 @@ fn store_db_versioned_prunes_outside_window() { assert!(store.get_delta_for_version(0, 100).unwrap().is_none()); assert!(store.get_delta_for_version(0, 101).unwrap().is_none()); assert_eq!( - store.get_delta_for_version(0, 102).unwrap().map(|d| d.serial()), + store + .get_delta_for_version(0, 102) + .unwrap() + .map(|d| d.serial()), Some(102) ); } @@ -327,7 +353,9 @@ fn store_db_versioned_load_delta_window_requires_complete_range() { let d11 = Delta::new( 11, - vec![Payload::RouteOrigin(v4_origin(198, 51, 100, 0, 24, 24, 64497))], + vec![Payload::RouteOrigin(v4_origin( + 198, 51, 100, 0, 24, 24, 64497, + ))], vec![], ); store