use std::fs; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use std::time::Instant; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use chrono_tz::Tz; use serde::Serialize; use tracing::warn; use crate::rtr::cache::{CacheAvailability, CacheMemoryStats, SharedRtrCache, VersionReportStats}; use crate::rtr::config::{RuntimeConfig, format_timezone}; use crate::rtr::server::{RtrNotifier, RtrServiceStats, RtrTransportConnectionCounts}; use crate::source::pipeline::{ DataQualityReport, FileFingerprint, SourceFingerprint, SourceLoadReport, }; #[derive(Clone, Serialize)] pub struct ReportConfiguration { source_refresh_interval_seconds: u64, runtime_report_interval_seconds: u64, report_history_limit: usize, max_delta: u8, prune_delta_by_snapshot_size: bool, strict_ccr_validation: bool, timezone: String, timing: TimingReport, } #[derive(Clone, Copy, Serialize)] struct TimingReport { refresh: u32, retry: u32, expire: u32, } impl ReportConfiguration { pub fn new( source_refresh_interval_seconds: u64, runtime_report_interval_seconds: u64, report_history_limit: usize, max_delta: u8, prune_delta_by_snapshot_size: bool, strict_ccr_validation: bool, timezone: Tz, timing: (u32, u32, u32), ) -> Self { Self { source_refresh_interval_seconds, runtime_report_interval_seconds, report_history_limit, max_delta, prune_delta_by_snapshot_size, strict_ccr_validation, timezone: format_timezone(timezone), timing: TimingReport { refresh: timing.0, retry: timing.1, expire: timing.2, }, } } pub fn timezone(&self) -> Tz { self.timezone .parse::() .expect("serialized timezone should be a valid IANA timezone") } } #[derive(Clone)] pub struct ReportContext { started_at: DateTime, started_instant: Instant, configuration: Arc>, runtime: Arc>, } #[derive(Debug, Clone, Default)] struct RuntimeReportState { source: Option, source_fingerprint: Option, data_quality: Option, refresh: RefreshReport, } #[derive(Debug, Clone, Serialize)] struct RefreshReport { last_attempt_at: Option>, last_success_at: Option>, last_changed_at: Option>, status: &'static str, changed: Option, duration_ms: Option, consecutive_failures: u64, last_error: Option, } #[derive(Serialize)] struct RefreshReportView { last_attempt_at: Option>, last_success_at: Option>, last_changed_at: Option>, status: &'static str, changed: Option, duration_ms: Option, consecutive_failures: u64, last_error: Option, } impl RefreshReportView { fn from_report(report: RefreshReport, timezone: Tz) -> Self { Self { last_attempt_at: report .last_attempt_at .map(|time| to_report_time(time, timezone)), last_success_at: report .last_success_at .map(|time| to_report_time(time, timezone)), last_changed_at: report .last_changed_at .map(|time| to_report_time(time, timezone)), status: report.status, changed: report.changed, duration_ms: report.duration_ms, consecutive_failures: report.consecutive_failures, last_error: report.last_error, } } } impl Default for RefreshReport { fn default() -> Self { Self { last_attempt_at: None, last_success_at: None, last_changed_at: None, status: "not_attempted", changed: None, duration_ms: None, consecutive_failures: 0, last_error: None, } } } impl ReportContext { pub fn new(configuration: ReportConfiguration) -> Self { Self { started_at: Utc::now(), started_instant: Instant::now(), configuration: Arc::new(RwLock::new(configuration)), runtime: Arc::new(RwLock::new(RuntimeReportState::default())), } } pub fn update_runtime_config(&self, config: &RuntimeConfig) { let timezone = config .timezone() .expect("runtime config timezone should be validated before report update"); let mut configuration = self .configuration .write() .unwrap_or_else(|poisoned| poisoned.into_inner()); *configuration = ReportConfiguration::new( config.source_refresh_interval_seconds, config.runtime_report_interval_seconds, config.report_history_limit, config.max_delta, config.prune_delta_by_snapshot_size, config.strict_ccr_validation, timezone, ( config.timing.refresh, config.timing.retry, config.timing.expire, ), ); } pub fn record_refresh_success( &self, attempted_at: DateTime, duration_ms: u128, changed: bool, source: SourceLoadReport, data_quality: DataQualityReport, ) { let mut state = self .runtime .write() .unwrap_or_else(|poisoned| poisoned.into_inner()); state.source = Some(source); state.data_quality = Some(data_quality); state.refresh.last_attempt_at = Some(attempted_at); state.refresh.last_success_at = Some(Utc::now()); if changed { state.refresh.last_changed_at = Some(Utc::now()); } state.refresh.status = "success"; state.refresh.changed = Some(changed); state.refresh.duration_ms = Some(duration_ms); state.refresh.consecutive_failures = 0; state.refresh.last_error = None; } pub fn record_source_fingerprint(&self, fingerprint: SourceFingerprint) { let mut state = self .runtime .write() .unwrap_or_else(|poisoned| poisoned.into_inner()); state.source_fingerprint = Some(fingerprint); } pub fn record_refresh_unchanged(&self, attempted_at: DateTime, duration_ms: u128) { let mut state = self .runtime .write() .unwrap_or_else(|poisoned| poisoned.into_inner()); state.refresh.last_attempt_at = Some(attempted_at); state.refresh.last_success_at = Some(Utc::now()); state.refresh.status = "success"; state.refresh.changed = Some(false); state.refresh.duration_ms = Some(duration_ms); state.refresh.consecutive_failures = 0; state.refresh.last_error = None; } pub fn record_refresh_failure( &self, attempted_at: DateTime, duration_ms: u128, error: &anyhow::Error, ) { let mut state = self .runtime .write() .unwrap_or_else(|poisoned| poisoned.into_inner()); state.refresh.last_attempt_at = Some(attempted_at); state.refresh.status = "failed"; state.refresh.changed = None; state.refresh.duration_ms = Some(duration_ms); state.refresh.consecutive_failures = state.refresh.consecutive_failures.saturating_add(1); state.refresh.last_error = Some(error.to_string()); } pub fn write_or_warn( &self, report_dir: &Path, phase: &str, shared_cache: &SharedRtrCache, notifier: &RtrNotifier, service_stats: &RtrServiceStats, ) { if let Err(err) = self.write(report_dir, phase, shared_cache, notifier, service_stats) { warn!( "failed to write RTR report to {}: {:?}", report_dir.display(), err ); } } pub fn write_source_or_warn( &self, report_dir: &Path, phase: &str, shared_cache: &SharedRtrCache, notifier: &RtrNotifier, service_stats: &RtrServiceStats, ) { if let Err(err) = self.write_source(report_dir, phase, shared_cache, notifier, service_stats) { warn!( "failed to write RTR source report to {}: {:?}", report_dir.display(), err ); } } pub fn write_clients_or_warn( &self, report_dir: &Path, phase: &str, shared_cache: &SharedRtrCache, notifier: &RtrNotifier, service_stats: &RtrServiceStats, ) { if let Err(err) = self.write_clients(report_dir, phase, shared_cache, notifier, service_stats) { warn!( "failed to write RTR clients report to {}: {:?}", report_dir.display(), err ); } } pub fn write_runtime_or_warn( &self, report_dir: &Path, phase: &str, shared_cache: &SharedRtrCache, notifier: &RtrNotifier, service_stats: &RtrServiceStats, ) { if let Err(err) = self.write_runtime(report_dir, phase, shared_cache, notifier, service_stats) { warn!( "failed to write RTR runtime report to {}: {:?}", report_dir.display(), err ); } } fn write( &self, report_dir: &Path, phase: &str, shared_cache: &SharedRtrCache, notifier: &RtrNotifier, service_stats: &RtrServiceStats, ) -> Result<()> { let parts = self.build_reports(phase, shared_cache, notifier, service_stats); fs::create_dir_all(report_dir) .with_context(|| format!("create report directory {}", report_dir.display()))?; self.write_source_report(report_dir, &parts.suffix, &parts.source)?; self.write_clients_report(report_dir, &parts.suffix, &parts.clients)?; self.write_runtime_report(report_dir, &parts.suffix, &parts.runtime)?; Ok(()) } fn write_source( &self, report_dir: &Path, phase: &str, shared_cache: &SharedRtrCache, notifier: &RtrNotifier, service_stats: &RtrServiceStats, ) -> Result<()> { let parts = self.build_reports(phase, shared_cache, notifier, service_stats); fs::create_dir_all(report_dir) .with_context(|| format!("create report directory {}", report_dir.display()))?; self.write_source_report(report_dir, &parts.suffix, &parts.source) } fn write_clients( &self, report_dir: &Path, phase: &str, shared_cache: &SharedRtrCache, notifier: &RtrNotifier, service_stats: &RtrServiceStats, ) -> Result<()> { let parts = self.build_reports(phase, shared_cache, notifier, service_stats); fs::create_dir_all(report_dir) .with_context(|| format!("create report directory {}", report_dir.display()))?; self.write_clients_report(report_dir, &parts.suffix, &parts.clients) } fn write_runtime( &self, report_dir: &Path, phase: &str, shared_cache: &SharedRtrCache, notifier: &RtrNotifier, service_stats: &RtrServiceStats, ) -> Result<()> { let parts = self.build_reports(phase, shared_cache, notifier, service_stats); fs::create_dir_all(report_dir) .with_context(|| format!("create report directory {}", report_dir.display()))?; self.write_runtime_report(report_dir, &parts.suffix, &parts.runtime) } fn build_reports( &self, phase: &str, shared_cache: &SharedRtrCache, notifier: &RtrNotifier, service_stats: &RtrServiceStats, ) -> ReportParts { let cache = shared_cache.load_full(); let configuration = self .configuration .read() .unwrap_or_else(|poisoned| poisoned.into_inner()) .clone(); let timezone = configuration.timezone(); let runtime = self .runtime .read() .unwrap_or_else(|poisoned| poisoned.into_inner()) .clone(); let availability = match cache.availability() { CacheAvailability::Ready => "ready", CacheAvailability::NoDataAvailable => "no_data_available", }; let active_connections = service_stats.active_connections(); let connections_by_transport = service_stats.transport_connections(); let max_connections = service_stats.max_connections(); let generated_at = self.report_now(timezone); let metadata = ReportMetadata { schema_version: 2, generated_at, phase: phase.to_string(), }; let service = ServiceReport { started_at: self.to_report_time(self.started_at, timezone), uptime_seconds: self.started_instant.elapsed().as_secs(), active_connections, connections_by_transport: TransportConnectionReport::from(connections_by_transport), session_listeners: notifier.listener_count(), max_connections, connection_utilization: active_connections as f64 / max_connections as f64, }; let process = ProcessReport { rss_mib: current_rss_mib(), }; let source = runtime .source .map(|source| SourceLoadReportView::from_report(source, timezone)); let refresh = RefreshReportView::from_report(runtime.refresh, timezone); let cache = CacheReport { availability, created_at: self.to_report_time(cache.created_at().utc(), timezone), last_update_begin: self.to_report_time(cache.last_update_begin().utc(), timezone), last_update_end: self.to_report_time(cache.last_update_end().utc(), timezone), memory: cache.memory_stats(), versions: cache.version_report_stats(), }; let source_report = SourceReport { schema_version: metadata.schema_version, generated_at: metadata.generated_at, phase: metadata.phase.clone(), source, source_fingerprint: runtime.source_fingerprint.map(|fingerprint| { SourceFingerprintReport::from_fingerprint(fingerprint, timezone) }), refresh, data_quality: runtime.data_quality, cache, }; let clients_report = ClientsReport { schema_version: metadata.schema_version, generated_at: metadata.generated_at, phase: metadata.phase.clone(), service: ServiceReport { started_at: service.started_at, uptime_seconds: service.uptime_seconds, active_connections: service.active_connections, connections_by_transport: service.connections_by_transport, session_listeners: service.session_listeners, max_connections: service.max_connections, connection_utilization: service.connection_utilization, }, }; let runtime_report = RuntimeReport { schema_version: metadata.schema_version, generated_at: metadata.generated_at, phase: metadata.phase, service, process, configuration, }; let suffix = report_file_suffix(generated_at); ReportParts { suffix, source: source_report, clients: clients_report, runtime: runtime_report, } } fn write_source_report( &self, report_dir: &Path, suffix: &str, report: &SourceReport, ) -> Result<()> { write_rolling_report( report_dir, "rtr-source", suffix, self.report_history_limit(), report, ) } fn write_clients_report( &self, report_dir: &Path, suffix: &str, report: &ClientsReport, ) -> Result<()> { write_rolling_report( report_dir, "rtr-clients", suffix, self.report_history_limit(), report, ) } fn write_runtime_report( &self, report_dir: &Path, suffix: &str, report: &RuntimeReport, ) -> Result<()> { write_rolling_report( report_dir, "rtr-runtime", suffix, self.report_history_limit(), report, ) } fn report_history_limit(&self) -> usize { self.configuration .read() .unwrap_or_else(|poisoned| poisoned.into_inner()) .report_history_limit } fn report_now(&self, timezone: Tz) -> DateTime { self.to_report_time(Utc::now(), timezone) } fn to_report_time(&self, time: DateTime, timezone: Tz) -> DateTime { time.with_timezone(&timezone) } } #[derive(Serialize, Clone)] struct ReportMetadata { schema_version: u16, generated_at: DateTime, phase: String, } struct ReportParts { suffix: String, source: SourceReport, clients: ClientsReport, runtime: RuntimeReport, } #[derive(Serialize)] struct SourceReport { schema_version: u16, generated_at: DateTime, phase: String, source: Option, source_fingerprint: Option, refresh: RefreshReportView, data_quality: Option, cache: CacheReport, } #[derive(Serialize)] struct ClientsReport { schema_version: u16, generated_at: DateTime, phase: String, service: ServiceReport, } #[derive(Serialize)] struct RuntimeReport { schema_version: u16, generated_at: DateTime, phase: String, service: ServiceReport, process: ProcessReport, configuration: ReportConfiguration, } #[derive(Clone, Copy, Serialize)] struct ServiceReport { started_at: DateTime, uptime_seconds: u64, active_connections: usize, connections_by_transport: TransportConnectionReport, session_listeners: usize, max_connections: usize, connection_utilization: f64, } #[derive(Serialize)] struct SourceLoadReportView { ccr_file: String, ccr_file_size_bytes: u64, ccr_modified_at: Option>, ccr_produced_at: Option, slurm_enabled: bool, slurm_file_count: usize, slurm_files: Vec, slurm_version: Option, } #[derive(Serialize)] struct SourceFingerprintReport { ccr: FileFingerprintReport, slurm_files: Vec, } impl SourceFingerprintReport { fn from_fingerprint(fingerprint: SourceFingerprint, timezone: Tz) -> Self { Self { ccr: FileFingerprintReport::from_fingerprint(fingerprint.ccr, timezone), slurm_files: fingerprint .slurm_files .into_iter() .map(|fingerprint| FileFingerprintReport::from_fingerprint(fingerprint, timezone)) .collect(), } } } #[derive(Serialize)] struct FileFingerprintReport { path: String, len: u64, modified_unix_secs: u64, modified_at: Option>, } impl FileFingerprintReport { fn from_fingerprint(fingerprint: FileFingerprint, timezone: Tz) -> Self { Self { path: fingerprint.path, len: fingerprint.len, modified_unix_secs: fingerprint.modified_unix_secs, modified_at: DateTime::from_timestamp(fingerprint.modified_unix_secs as i64, 0) .map(|time| to_report_time(time, timezone)), } } } impl SourceLoadReportView { fn from_report(report: SourceLoadReport, timezone: Tz) -> Self { Self { ccr_file: report.ccr_file, ccr_file_size_bytes: report.ccr_file_size_bytes, ccr_modified_at: report .ccr_modified_at .map(|time| to_report_time(time, timezone)), ccr_produced_at: report.ccr_produced_at, slurm_enabled: report.slurm_enabled, slurm_file_count: report.slurm_file_count, slurm_files: report.slurm_files, slurm_version: report.slurm_version, } } } #[derive(Clone, Copy, Serialize)] struct TransportConnectionReport { tcp: usize, tls: usize, ssh: usize, } impl From for TransportConnectionReport { fn from(counts: RtrTransportConnectionCounts) -> Self { Self { tcp: counts.tcp, tls: counts.tls, ssh: counts.ssh, } } } #[derive(Serialize)] struct ProcessReport { rss_mib: Option, } #[derive(Serialize)] struct CacheReport { availability: &'static str, created_at: DateTime, last_update_begin: DateTime, last_update_end: DateTime, memory: CacheMemoryStats, versions: [VersionReportStats; 3], } pub fn to_report_time(time: DateTime, timezone: Tz) -> DateTime { time.with_timezone(&timezone) } pub fn current_rss_mib() -> Option { let status = fs::read_to_string("/proc/self/status").ok()?; let vmrss_line = status.lines().find(|line| line.starts_with("VmRSS:"))?; let kb = vmrss_line .split_whitespace() .nth(1) .and_then(|value| value.parse::().ok())?; Some(kb / 1024) } fn report_file_suffix(time: DateTime) -> String { time.format("%Y%m%d%H%M%S%9f").to_string() } fn write_rolling_report( report_dir: &Path, prefix: &str, suffix: &str, keep: usize, report: &T, ) -> Result<()> { let file_name = format!("{prefix}-{suffix}.json"); let target = report_dir.join(&file_name); let temporary = report_dir.join(format!(".{file_name}.tmp")); let json = serde_json::to_vec_pretty(report) .with_context(|| format!("serialize {prefix} RTR report"))?; fs::write(&temporary, json) .with_context(|| format!("write temporary report {}", temporary.display()))?; replace_file(&temporary, &target)?; prune_rolling_reports(report_dir, prefix, keep)?; Ok(()) } fn replace_file(temporary: &Path, target: &Path) -> Result<()> { if let Err(err) = fs::rename(temporary, target) { if target.exists() { fs::remove_file(target) .with_context(|| format!("replace existing report {}", target.display()))?; fs::rename(temporary, target) .with_context(|| format!("move report into {}", target.display()))?; } else { return Err(err).with_context(|| { format!( "move temporary report {} into {}", temporary.display(), target.display() ) }); } } Ok(()) } fn prune_rolling_reports(report_dir: &Path, prefix: &str, keep: usize) -> Result<()> { let start = format!("{prefix}-"); let mut files = Vec::::new(); for entry in fs::read_dir(report_dir) .with_context(|| format!("read report directory {}", report_dir.display()))? { let entry = entry.with_context(|| format!("iterate report directory {}", report_dir.display()))?; let path = entry.path(); if !path.is_file() { continue; } let Some(name) = path.file_name().and_then(|name| name.to_str()) else { continue; }; if name.starts_with(&start) && name.ends_with(".json") { files.push(path); } } files.sort(); let remove_count = files.len().saturating_sub(keep); for path in files.into_iter().take(remove_count) { fs::remove_file(&path) .with_context(|| format!("remove old rolling report {}", path.display()))?; } Ok(()) }