1053 lines
33 KiB
Rust
1053 lines
33 KiB
Rust
use std::fs;
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::{Arc, RwLock};
|
|
use std::time::Instant;
|
|
|
|
use anyhow::{Context, Result};
|
|
use chrono::{DateTime, Utc};
|
|
use chrono_tz::Tz;
|
|
use serde::Serialize;
|
|
use tracing::warn;
|
|
|
|
use crate::rtr::cache::{CacheAvailability, CacheMemoryStats, SharedRtrCache, VersionReportStats};
|
|
use crate::rtr::config::format_timezone;
|
|
use crate::rtr::server::{RtrNotifier, RtrServiceStats, RtrTransportConnectionCounts};
|
|
use crate::source::pipeline::{
|
|
DataQualityReport, FileFingerprint, SourceFingerprint, SourceLoadReport,
|
|
};
|
|
|
|
#[derive(Clone, Serialize)]
|
|
pub struct ReportConfiguration {
|
|
source_refresh_interval_seconds: u64,
|
|
runtime_report_interval_seconds: u64,
|
|
report_history_limit: usize,
|
|
max_delta: u8,
|
|
prune_delta_by_snapshot_size: bool,
|
|
strict_ccr_validation: bool,
|
|
timezone: String,
|
|
timing: TimingReport,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Serialize)]
|
|
struct TimingReport {
|
|
refresh: u32,
|
|
retry: u32,
|
|
expire: u32,
|
|
}
|
|
|
|
impl ReportConfiguration {
|
|
pub fn new(
|
|
source_refresh_interval_seconds: u64,
|
|
runtime_report_interval_seconds: u64,
|
|
report_history_limit: usize,
|
|
max_delta: u8,
|
|
prune_delta_by_snapshot_size: bool,
|
|
strict_ccr_validation: bool,
|
|
timezone: Tz,
|
|
timing: (u32, u32, u32),
|
|
) -> Self {
|
|
Self {
|
|
source_refresh_interval_seconds,
|
|
runtime_report_interval_seconds,
|
|
report_history_limit,
|
|
max_delta,
|
|
prune_delta_by_snapshot_size,
|
|
strict_ccr_validation,
|
|
timezone: format_timezone(timezone),
|
|
timing: TimingReport {
|
|
refresh: timing.0,
|
|
retry: timing.1,
|
|
expire: timing.2,
|
|
},
|
|
}
|
|
}
|
|
|
|
pub fn timezone(&self) -> Tz {
|
|
self.timezone
|
|
.parse::<Tz>()
|
|
.expect("serialized timezone should be a valid IANA timezone")
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct ReportContext {
|
|
started_at: DateTime<Utc>,
|
|
started_instant: Instant,
|
|
timezone: Tz,
|
|
configuration: ReportConfiguration,
|
|
runtime: Arc<RwLock<RuntimeReportState>>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Default)]
|
|
struct RuntimeReportState {
|
|
source: Option<SourceLoadReport>,
|
|
source_fingerprint: Option<SourceFingerprint>,
|
|
data_quality: Option<DataQualityReport>,
|
|
refresh: RefreshReport,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize)]
|
|
struct RefreshReport {
|
|
last_attempt_at: Option<DateTime<Utc>>,
|
|
last_success_at: Option<DateTime<Utc>>,
|
|
last_changed_at: Option<DateTime<Utc>>,
|
|
status: &'static str,
|
|
changed: Option<bool>,
|
|
duration_ms: Option<u128>,
|
|
consecutive_failures: u64,
|
|
last_error: Option<String>,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct RefreshReportView {
|
|
last_attempt_at: Option<DateTime<Tz>>,
|
|
last_success_at: Option<DateTime<Tz>>,
|
|
last_changed_at: Option<DateTime<Tz>>,
|
|
status: &'static str,
|
|
changed: Option<bool>,
|
|
duration_ms: Option<u128>,
|
|
consecutive_failures: u64,
|
|
last_error: Option<String>,
|
|
}
|
|
|
|
impl RefreshReportView {
|
|
fn from_report(report: RefreshReport, timezone: Tz) -> Self {
|
|
Self {
|
|
last_attempt_at: report
|
|
.last_attempt_at
|
|
.map(|time| to_report_time(time, timezone)),
|
|
last_success_at: report
|
|
.last_success_at
|
|
.map(|time| to_report_time(time, timezone)),
|
|
last_changed_at: report
|
|
.last_changed_at
|
|
.map(|time| to_report_time(time, timezone)),
|
|
status: report.status,
|
|
changed: report.changed,
|
|
duration_ms: report.duration_ms,
|
|
consecutive_failures: report.consecutive_failures,
|
|
last_error: report.last_error,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Default for RefreshReport {
|
|
fn default() -> Self {
|
|
Self {
|
|
last_attempt_at: None,
|
|
last_success_at: None,
|
|
last_changed_at: None,
|
|
status: "not_attempted",
|
|
changed: None,
|
|
duration_ms: None,
|
|
consecutive_failures: 0,
|
|
last_error: None,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl ReportContext {
|
|
pub fn new(configuration: ReportConfiguration) -> Self {
|
|
let timezone = configuration.timezone();
|
|
Self {
|
|
started_at: Utc::now(),
|
|
started_instant: Instant::now(),
|
|
timezone,
|
|
configuration,
|
|
runtime: Arc::new(RwLock::new(RuntimeReportState::default())),
|
|
}
|
|
}
|
|
|
|
pub fn record_refresh_success(
|
|
&self,
|
|
attempted_at: DateTime<Utc>,
|
|
duration_ms: u128,
|
|
changed: bool,
|
|
source: SourceLoadReport,
|
|
data_quality: DataQualityReport,
|
|
) {
|
|
let mut state = self
|
|
.runtime
|
|
.write()
|
|
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
|
state.source = Some(source);
|
|
state.data_quality = Some(data_quality);
|
|
state.refresh.last_attempt_at = Some(attempted_at);
|
|
state.refresh.last_success_at = Some(Utc::now());
|
|
if changed {
|
|
state.refresh.last_changed_at = Some(Utc::now());
|
|
}
|
|
state.refresh.status = "success";
|
|
state.refresh.changed = Some(changed);
|
|
state.refresh.duration_ms = Some(duration_ms);
|
|
state.refresh.consecutive_failures = 0;
|
|
state.refresh.last_error = None;
|
|
}
|
|
|
|
pub fn record_source_fingerprint(&self, fingerprint: SourceFingerprint) {
|
|
let mut state = self
|
|
.runtime
|
|
.write()
|
|
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
|
state.source_fingerprint = Some(fingerprint);
|
|
}
|
|
|
|
pub fn record_refresh_unchanged(&self, attempted_at: DateTime<Utc>, duration_ms: u128) {
|
|
let mut state = self
|
|
.runtime
|
|
.write()
|
|
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
|
state.refresh.last_attempt_at = Some(attempted_at);
|
|
state.refresh.last_success_at = Some(Utc::now());
|
|
state.refresh.status = "success";
|
|
state.refresh.changed = Some(false);
|
|
state.refresh.duration_ms = Some(duration_ms);
|
|
state.refresh.consecutive_failures = 0;
|
|
state.refresh.last_error = None;
|
|
}
|
|
|
|
pub fn record_refresh_failure(
|
|
&self,
|
|
attempted_at: DateTime<Utc>,
|
|
duration_ms: u128,
|
|
error: &anyhow::Error,
|
|
) {
|
|
let mut state = self
|
|
.runtime
|
|
.write()
|
|
.unwrap_or_else(|poisoned| poisoned.into_inner());
|
|
state.refresh.last_attempt_at = Some(attempted_at);
|
|
state.refresh.status = "failed";
|
|
state.refresh.changed = None;
|
|
state.refresh.duration_ms = Some(duration_ms);
|
|
state.refresh.consecutive_failures = state.refresh.consecutive_failures.saturating_add(1);
|
|
state.refresh.last_error = Some(error.to_string());
|
|
}
|
|
|
|
pub fn write_or_warn(
|
|
&self,
|
|
report_dir: &Path,
|
|
phase: &str,
|
|
shared_cache: &SharedRtrCache,
|
|
notifier: &RtrNotifier,
|
|
service_stats: &RtrServiceStats,
|
|
) {
|
|
if let Err(err) = self.write(report_dir, phase, shared_cache, notifier, service_stats) {
|
|
warn!(
|
|
"failed to write RTR report to {}: {:?}",
|
|
report_dir.display(),
|
|
err
|
|
);
|
|
}
|
|
}
|
|
|
|
pub fn write_source_or_warn(
|
|
&self,
|
|
report_dir: &Path,
|
|
phase: &str,
|
|
shared_cache: &SharedRtrCache,
|
|
notifier: &RtrNotifier,
|
|
service_stats: &RtrServiceStats,
|
|
) {
|
|
if let Err(err) =
|
|
self.write_source(report_dir, phase, shared_cache, notifier, service_stats)
|
|
{
|
|
warn!(
|
|
"failed to write RTR source report to {}: {:?}",
|
|
report_dir.display(),
|
|
err
|
|
);
|
|
}
|
|
}
|
|
|
|
pub fn write_clients_or_warn(
|
|
&self,
|
|
report_dir: &Path,
|
|
phase: &str,
|
|
shared_cache: &SharedRtrCache,
|
|
notifier: &RtrNotifier,
|
|
service_stats: &RtrServiceStats,
|
|
) {
|
|
if let Err(err) =
|
|
self.write_clients(report_dir, phase, shared_cache, notifier, service_stats)
|
|
{
|
|
warn!(
|
|
"failed to write RTR clients report to {}: {:?}",
|
|
report_dir.display(),
|
|
err
|
|
);
|
|
}
|
|
}
|
|
|
|
pub fn write_runtime_or_warn(
|
|
&self,
|
|
report_dir: &Path,
|
|
phase: &str,
|
|
shared_cache: &SharedRtrCache,
|
|
notifier: &RtrNotifier,
|
|
service_stats: &RtrServiceStats,
|
|
) {
|
|
if let Err(err) =
|
|
self.write_runtime(report_dir, phase, shared_cache, notifier, service_stats)
|
|
{
|
|
warn!(
|
|
"failed to write RTR runtime report to {}: {:?}",
|
|
report_dir.display(),
|
|
err
|
|
);
|
|
}
|
|
}
|
|
|
|
fn write(
|
|
&self,
|
|
report_dir: &Path,
|
|
phase: &str,
|
|
shared_cache: &SharedRtrCache,
|
|
notifier: &RtrNotifier,
|
|
service_stats: &RtrServiceStats,
|
|
) -> Result<()> {
|
|
let parts = self.build_reports(phase, shared_cache, notifier, service_stats);
|
|
fs::create_dir_all(report_dir)
|
|
.with_context(|| format!("create report directory {}", report_dir.display()))?;
|
|
self.write_source_report(report_dir, &parts.suffix, &parts.source)?;
|
|
self.write_clients_report(report_dir, &parts.suffix, &parts.clients)?;
|
|
self.write_runtime_report(report_dir, &parts.suffix, &parts.runtime)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn write_source(
|
|
&self,
|
|
report_dir: &Path,
|
|
phase: &str,
|
|
shared_cache: &SharedRtrCache,
|
|
notifier: &RtrNotifier,
|
|
service_stats: &RtrServiceStats,
|
|
) -> Result<()> {
|
|
let parts = self.build_reports(phase, shared_cache, notifier, service_stats);
|
|
fs::create_dir_all(report_dir)
|
|
.with_context(|| format!("create report directory {}", report_dir.display()))?;
|
|
self.write_source_report(report_dir, &parts.suffix, &parts.source)
|
|
}
|
|
|
|
fn write_clients(
|
|
&self,
|
|
report_dir: &Path,
|
|
phase: &str,
|
|
shared_cache: &SharedRtrCache,
|
|
notifier: &RtrNotifier,
|
|
service_stats: &RtrServiceStats,
|
|
) -> Result<()> {
|
|
let parts = self.build_reports(phase, shared_cache, notifier, service_stats);
|
|
fs::create_dir_all(report_dir)
|
|
.with_context(|| format!("create report directory {}", report_dir.display()))?;
|
|
self.write_clients_report(report_dir, &parts.suffix, &parts.clients)
|
|
}
|
|
|
|
fn write_runtime(
|
|
&self,
|
|
report_dir: &Path,
|
|
phase: &str,
|
|
shared_cache: &SharedRtrCache,
|
|
notifier: &RtrNotifier,
|
|
service_stats: &RtrServiceStats,
|
|
) -> Result<()> {
|
|
let parts = self.build_reports(phase, shared_cache, notifier, service_stats);
|
|
fs::create_dir_all(report_dir)
|
|
.with_context(|| format!("create report directory {}", report_dir.display()))?;
|
|
self.write_runtime_report(report_dir, &parts.suffix, &parts.runtime)
|
|
}
|
|
|
|
fn build_reports(
|
|
&self,
|
|
phase: &str,
|
|
shared_cache: &SharedRtrCache,
|
|
notifier: &RtrNotifier,
|
|
service_stats: &RtrServiceStats,
|
|
) -> ReportParts {
|
|
let cache = shared_cache.load_full();
|
|
let runtime = self
|
|
.runtime
|
|
.read()
|
|
.unwrap_or_else(|poisoned| poisoned.into_inner())
|
|
.clone();
|
|
let availability = match cache.availability() {
|
|
CacheAvailability::Ready => "ready",
|
|
CacheAvailability::NoDataAvailable => "no_data_available",
|
|
};
|
|
let active_connections = service_stats.active_connections();
|
|
let connections_by_transport = service_stats.transport_connections();
|
|
let max_connections = service_stats.max_connections();
|
|
let generated_at = self.report_now();
|
|
let metadata = ReportMetadata {
|
|
schema_version: 2,
|
|
generated_at,
|
|
phase: phase.to_string(),
|
|
};
|
|
let service = ServiceReport {
|
|
started_at: self.to_report_time(self.started_at),
|
|
uptime_seconds: self.started_instant.elapsed().as_secs(),
|
|
active_connections,
|
|
connections_by_transport: TransportConnectionReport::from(connections_by_transport),
|
|
session_listeners: notifier.listener_count(),
|
|
max_connections,
|
|
connection_utilization: active_connections as f64 / max_connections as f64,
|
|
};
|
|
let process = ProcessReport {
|
|
rss_mib: current_rss_mib(),
|
|
};
|
|
let source = runtime
|
|
.source
|
|
.map(|source| SourceLoadReportView::from_report(source, self.timezone));
|
|
let refresh = RefreshReportView::from_report(runtime.refresh, self.timezone);
|
|
let cache = CacheReport {
|
|
availability,
|
|
created_at: self.to_report_time(cache.created_at().utc()),
|
|
last_update_begin: self.to_report_time(cache.last_update_begin().utc()),
|
|
last_update_end: self.to_report_time(cache.last_update_end().utc()),
|
|
memory: cache.memory_stats(),
|
|
versions: cache.version_report_stats(),
|
|
};
|
|
|
|
let source_report = SourceReport {
|
|
schema_version: metadata.schema_version,
|
|
generated_at: metadata.generated_at,
|
|
phase: metadata.phase.clone(),
|
|
source,
|
|
source_fingerprint: runtime.source_fingerprint.map(|fingerprint| {
|
|
SourceFingerprintReport::from_fingerprint(fingerprint, self.timezone)
|
|
}),
|
|
refresh,
|
|
data_quality: runtime.data_quality,
|
|
cache,
|
|
};
|
|
let clients_report = ClientsReport {
|
|
schema_version: metadata.schema_version,
|
|
generated_at: metadata.generated_at,
|
|
phase: metadata.phase.clone(),
|
|
service: ServiceReport {
|
|
started_at: service.started_at,
|
|
uptime_seconds: service.uptime_seconds,
|
|
active_connections: service.active_connections,
|
|
connections_by_transport: service.connections_by_transport,
|
|
session_listeners: service.session_listeners,
|
|
max_connections: service.max_connections,
|
|
connection_utilization: service.connection_utilization,
|
|
},
|
|
};
|
|
let runtime_report = RuntimeReport {
|
|
schema_version: metadata.schema_version,
|
|
generated_at: metadata.generated_at,
|
|
phase: metadata.phase,
|
|
service,
|
|
process,
|
|
configuration: self.configuration.clone(),
|
|
};
|
|
let suffix = report_file_suffix(generated_at);
|
|
|
|
ReportParts {
|
|
suffix,
|
|
source: source_report,
|
|
clients: clients_report,
|
|
runtime: runtime_report,
|
|
}
|
|
}
|
|
|
|
fn write_source_report(
|
|
&self,
|
|
report_dir: &Path,
|
|
suffix: &str,
|
|
report: &SourceReport,
|
|
) -> Result<()> {
|
|
write_rolling_report(
|
|
report_dir,
|
|
"rtr-source",
|
|
suffix,
|
|
self.configuration.report_history_limit,
|
|
report,
|
|
)
|
|
}
|
|
|
|
fn write_clients_report(
|
|
&self,
|
|
report_dir: &Path,
|
|
suffix: &str,
|
|
report: &ClientsReport,
|
|
) -> Result<()> {
|
|
write_rolling_report(
|
|
report_dir,
|
|
"rtr-clients",
|
|
suffix,
|
|
self.configuration.report_history_limit,
|
|
report,
|
|
)
|
|
}
|
|
|
|
fn write_runtime_report(
|
|
&self,
|
|
report_dir: &Path,
|
|
suffix: &str,
|
|
report: &RuntimeReport,
|
|
) -> Result<()> {
|
|
write_rolling_report(
|
|
report_dir,
|
|
"rtr-runtime",
|
|
suffix,
|
|
self.configuration.report_history_limit,
|
|
report,
|
|
)
|
|
}
|
|
|
|
fn report_now(&self) -> DateTime<Tz> {
|
|
self.to_report_time(Utc::now())
|
|
}
|
|
|
|
fn to_report_time(&self, time: DateTime<Utc>) -> DateTime<Tz> {
|
|
time.with_timezone(&self.timezone)
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize, Clone)]
|
|
struct ReportMetadata {
|
|
schema_version: u16,
|
|
generated_at: DateTime<Tz>,
|
|
phase: String,
|
|
}
|
|
|
|
struct ReportParts {
|
|
suffix: String,
|
|
source: SourceReport,
|
|
clients: ClientsReport,
|
|
runtime: RuntimeReport,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct SourceReport {
|
|
schema_version: u16,
|
|
generated_at: DateTime<Tz>,
|
|
phase: String,
|
|
source: Option<SourceLoadReportView>,
|
|
source_fingerprint: Option<SourceFingerprintReport>,
|
|
refresh: RefreshReportView,
|
|
data_quality: Option<DataQualityReport>,
|
|
cache: CacheReport,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct ClientsReport {
|
|
schema_version: u16,
|
|
generated_at: DateTime<Tz>,
|
|
phase: String,
|
|
service: ServiceReport,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct RuntimeReport {
|
|
schema_version: u16,
|
|
generated_at: DateTime<Tz>,
|
|
phase: String,
|
|
service: ServiceReport,
|
|
process: ProcessReport,
|
|
configuration: ReportConfiguration,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Serialize)]
|
|
struct ServiceReport {
|
|
started_at: DateTime<Tz>,
|
|
uptime_seconds: u64,
|
|
active_connections: usize,
|
|
connections_by_transport: TransportConnectionReport,
|
|
session_listeners: usize,
|
|
max_connections: usize,
|
|
connection_utilization: f64,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct SourceLoadReportView {
|
|
ccr_file: String,
|
|
ccr_file_size_bytes: u64,
|
|
ccr_modified_at: Option<DateTime<Tz>>,
|
|
ccr_produced_at: Option<String>,
|
|
slurm_enabled: bool,
|
|
slurm_file_count: usize,
|
|
slurm_files: Vec<String>,
|
|
slurm_version: Option<u32>,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct SourceFingerprintReport {
|
|
ccr: FileFingerprintReport,
|
|
slurm_files: Vec<FileFingerprintReport>,
|
|
}
|
|
|
|
impl SourceFingerprintReport {
|
|
fn from_fingerprint(fingerprint: SourceFingerprint, timezone: Tz) -> Self {
|
|
Self {
|
|
ccr: FileFingerprintReport::from_fingerprint(fingerprint.ccr, timezone),
|
|
slurm_files: fingerprint
|
|
.slurm_files
|
|
.into_iter()
|
|
.map(|fingerprint| FileFingerprintReport::from_fingerprint(fingerprint, timezone))
|
|
.collect(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct FileFingerprintReport {
|
|
path: String,
|
|
len: u64,
|
|
modified_unix_secs: u64,
|
|
modified_at: Option<DateTime<Tz>>,
|
|
}
|
|
|
|
impl FileFingerprintReport {
|
|
fn from_fingerprint(fingerprint: FileFingerprint, timezone: Tz) -> Self {
|
|
Self {
|
|
path: fingerprint.path,
|
|
len: fingerprint.len,
|
|
modified_unix_secs: fingerprint.modified_unix_secs,
|
|
modified_at: DateTime::from_timestamp(fingerprint.modified_unix_secs as i64, 0)
|
|
.map(|time| to_report_time(time, timezone)),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl SourceLoadReportView {
|
|
fn from_report(report: SourceLoadReport, timezone: Tz) -> Self {
|
|
Self {
|
|
ccr_file: report.ccr_file,
|
|
ccr_file_size_bytes: report.ccr_file_size_bytes,
|
|
ccr_modified_at: report
|
|
.ccr_modified_at
|
|
.map(|time| to_report_time(time, timezone)),
|
|
ccr_produced_at: report.ccr_produced_at,
|
|
slurm_enabled: report.slurm_enabled,
|
|
slurm_file_count: report.slurm_file_count,
|
|
slurm_files: report.slurm_files,
|
|
slurm_version: report.slurm_version,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Copy, Serialize)]
|
|
struct TransportConnectionReport {
|
|
tcp: usize,
|
|
tls: usize,
|
|
ssh: usize,
|
|
}
|
|
|
|
impl From<RtrTransportConnectionCounts> for TransportConnectionReport {
|
|
fn from(counts: RtrTransportConnectionCounts) -> Self {
|
|
Self {
|
|
tcp: counts.tcp,
|
|
tls: counts.tls,
|
|
ssh: counts.ssh,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct ProcessReport {
|
|
rss_mib: Option<u64>,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct CacheReport {
|
|
availability: &'static str,
|
|
created_at: DateTime<Tz>,
|
|
last_update_begin: DateTime<Tz>,
|
|
last_update_end: DateTime<Tz>,
|
|
memory: CacheMemoryStats,
|
|
versions: [VersionReportStats; 3],
|
|
}
|
|
|
|
pub fn to_report_time(time: DateTime<Utc>, timezone: Tz) -> DateTime<Tz> {
|
|
time.with_timezone(&timezone)
|
|
}
|
|
|
|
/// Resident set size of the current process in MiB, rounded down.
///
/// The value is read from the `VmRSS:` line of `/proc/self/status` and taken
/// to be in KiB. Returns `None` when the file cannot be read, the line is
/// missing, or its second field is not an integer (for example on platforms
/// without procfs).
pub fn current_rss_mib() -> Option<u64> {
    let status = match fs::read_to_string("/proc/self/status") {
        Ok(text) => text,
        Err(_) => return None,
    };
    let kib = status
        .lines()
        .find(|entry| entry.starts_with("VmRSS:"))
        .and_then(|entry| entry.split_whitespace().nth(1))
        .and_then(|field| field.parse::<u64>().ok())?;
    Some(kib / 1024)
}
|
|
|
|
/// Builds the timestamp part of a report file name: `YYYYMMDDhhmmss`
/// followed by nine nanosecond digits, in the timezone carried by `time`.
///
/// NOTE(review): `prune_rolling_reports` orders files by name, so this relies
/// on local time never going backwards. In a timezone with DST, the repeated
/// hour after a fall-back transition can sort newer files before older ones.
/// Consider UTC, with a migration plan for existing files, if that matters.
fn report_file_suffix(time: DateTime<Tz>) -> String {
    const SUFFIX_FORMAT: &str = "%Y%m%d%H%M%S%9f";
    let formatted = time.format(SUFFIX_FORMAT);
    formatted.to_string()
}
|
|
|
|
fn write_rolling_report<T: Serialize>(
|
|
report_dir: &Path,
|
|
prefix: &str,
|
|
suffix: &str,
|
|
keep: usize,
|
|
report: &T,
|
|
) -> Result<()> {
|
|
let file_name = format!("{prefix}-{suffix}.json");
|
|
let target = report_dir.join(&file_name);
|
|
let temporary = report_dir.join(format!(".{file_name}.tmp"));
|
|
let json = serde_json::to_vec_pretty(report)
|
|
.with_context(|| format!("serialize {prefix} RTR report"))?;
|
|
fs::write(&temporary, json)
|
|
.with_context(|| format!("write temporary report {}", temporary.display()))?;
|
|
replace_file(&temporary, &target)?;
|
|
prune_rolling_reports(report_dir, prefix, keep)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn replace_file(temporary: &Path, target: &Path) -> Result<()> {
|
|
if let Err(err) = fs::rename(temporary, target) {
|
|
if target.exists() {
|
|
fs::remove_file(target)
|
|
.with_context(|| format!("replace existing report {}", target.display()))?;
|
|
fs::rename(temporary, target)
|
|
.with_context(|| format!("move report into {}", target.display()))?;
|
|
} else {
|
|
return Err(err).with_context(|| {
|
|
format!(
|
|
"move temporary report {} into {}",
|
|
temporary.display(),
|
|
target.display()
|
|
)
|
|
});
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn prune_rolling_reports(report_dir: &Path, prefix: &str, keep: usize) -> Result<()> {
|
|
let start = format!("{prefix}-");
|
|
let mut files = Vec::<PathBuf>::new();
|
|
for entry in fs::read_dir(report_dir)
|
|
.with_context(|| format!("read report directory {}", report_dir.display()))?
|
|
{
|
|
let entry =
|
|
entry.with_context(|| format!("iterate report directory {}", report_dir.display()))?;
|
|
let path = entry.path();
|
|
if !path.is_file() {
|
|
continue;
|
|
}
|
|
let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
|
|
continue;
|
|
};
|
|
if name.starts_with(&start) && name.ends_with(".json") {
|
|
files.push(path);
|
|
}
|
|
}
|
|
files.sort();
|
|
let remove_count = files.len().saturating_sub(keep);
|
|
for path in files.into_iter().take(remove_count) {
|
|
fs::remove_file(&path)
|
|
.with_context(|| format!("remove old rolling report {}", path.display()))?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
// Unit tests for report generation and rolling-file management.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::rtr::cache::RtrCache;
    use crate::rtr::server::RtrService;
    use crate::source::pipeline::{
        DataQualityReport, PayloadTypeCounts, SlurmRuleCounts, SourceLoadReport,
    };
    use arc_swap::ArcSwap;
    use serde_json::Value;

    use super::*;

    /// Configuration used by every test: Asia/Shanghai (+08:00), history limit 10.
    fn test_configuration() -> ReportConfiguration {
        ReportConfiguration::new(
            300,
            300,
            10,
            100,
            false,
            false,
            chrono_tz::Asia::Shanghai,
            (3600, 600, 7200),
        )
    }

    #[test]
    fn write_report_creates_parseable_json() {
        let temp = tempfile::tempdir().unwrap();
        let report_dir = temp.path().join("report");
        let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default()));
        let service = RtrService::new(shared_cache.clone());
        let notifier = service.notifier();
        let context = ReportContext::new(test_configuration());

        // `&notifier` was previously mis-encoded as the HTML entity `¬ifier`.
        context
            .write(
                &report_dir,
                "test",
                &shared_cache,
                &notifier,
                &service.stats(),
            )
            .unwrap();

        let source_report = read_single_report(&report_dir, "rtr-source");
        assert_eq!(source_report["schema_version"], 2);
        assert_eq!(source_report["phase"], "test");
        assert_report_time_offset(&source_report["generated_at"]);
        assert_report_time_offset(&source_report["cache"]["created_at"]);
        assert_eq!(source_report["cache"]["availability"], "ready");
        assert_eq!(source_report["refresh"]["status"], "not_attempted");
        assert!(source_report["source"].is_null());
        assert!(source_report["data_quality"].is_null());

        let runtime_report = read_single_report(&report_dir, "rtr-runtime");
        assert_report_time_offset(&runtime_report["service"]["started_at"]);
        assert_eq!(
            runtime_report["configuration"]["source_refresh_interval_seconds"],
            300
        );
        assert_eq!(runtime_report["configuration"]["report_history_limit"], 10);

        let clients_report = read_single_report(&report_dir, "rtr-clients");
        assert_eq!(clients_report["service"]["max_connections"], 1024);
        assert_eq!(clients_report["service"]["active_connections"], 0);
        assert_eq!(
            clients_report["service"]["connections_by_transport"]["tcp"],
            0
        );
        assert_eq!(
            clients_report["service"]["connections_by_transport"]["tls"],
            0
        );
        assert_eq!(
            clients_report["service"]["connections_by_transport"]["ssh"],
            0
        );
        assert_eq!(
            source_report["cache"]["versions"].as_array().unwrap().len(),
            3
        );
        assert_eq!(
            source_report["cache"]["versions"][2]["snapshot"]["total"],
            0
        );
        assert_eq!(
            source_report["cache"]["memory"]["delta_payload_counts"][2],
            0
        );
        assert!(source_report["source_fingerprint"].is_null());
        assert_no_temporary_reports(&report_dir);
    }

    #[test]
    fn refresh_failure_preserves_last_successful_source_data() {
        let temp = tempfile::tempdir().unwrap();
        let report_dir = temp.path().join("report");
        let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default()));
        let service = RtrService::new(shared_cache.clone());
        let notifier = service.notifier();
        let context = ReportContext::new(test_configuration());
        let source = SourceLoadReport {
            ccr_file: "data/example.ccr".to_string(),
            ccr_file_size_bytes: 123,
            ccr_modified_at: Some(Utc::now()),
            ccr_produced_at: Some("20260615000000Z".to_string()),
            slurm_enabled: true,
            slurm_file_count: 1,
            slurm_files: vec!["policy.slurm".to_string()],
            slurm_version: Some(2),
        };
        let quality = DataQualityReport {
            ccr_input: PayloadTypeCounts {
                total: 11,
                vrp: 10,
                router_key: 0,
                aspa: 1,
            },
            invalid: PayloadTypeCounts::default(),
            before_slurm: PayloadTypeCounts {
                total: 11,
                vrp: 10,
                router_key: 0,
                aspa: 1,
            },
            after_slurm: PayloadTypeCounts {
                total: 10,
                vrp: 9,
                router_key: 0,
                aspa: 1,
            },
            slurm_filters: SlurmRuleCounts {
                prefix: 1,
                router_key: 0,
                aspa: 0,
            },
            slurm_assertions: SlurmRuleCounts::default(),
        };
        context.record_source_fingerprint(SourceFingerprint {
            ccr: FileFingerprint {
                path: "data/example.ccr".to_string(),
                len: 123,
                modified_unix_secs: 1_781_404_800,
            },
            slurm_files: vec![FileFingerprint {
                path: "policy.slurm".to_string(),
                len: 42,
                modified_unix_secs: 1_781_408_400,
            }],
        });

        context.record_refresh_success(Utc::now(), 12, true, source, quality);
        context.record_refresh_failure(Utc::now(), 5, &anyhow::anyhow!("source unavailable"));
        context
            .write(
                &report_dir,
                "refresh_failed",
                &shared_cache,
                &notifier,
                &service.stats(),
            )
            .unwrap();

        let report = read_single_report(&report_dir, "rtr-source");
        assert_eq!(report["source"]["ccr_file"], "data/example.ccr");
        assert_eq!(
            report["source_fingerprint"]["ccr"]["path"],
            "data/example.ccr"
        );
        assert_eq!(report["source_fingerprint"]["ccr"]["len"], 123);
        assert_report_time_offset(&report["source_fingerprint"]["ccr"]["modified_at"]);
        assert_eq!(
            report["source_fingerprint"]["slurm_files"][0]["path"],
            "policy.slurm"
        );
        assert_report_time_offset(&report["source"]["ccr_modified_at"]);
        assert_eq!(report["data_quality"]["after_slurm"]["total"], 10);
        assert_eq!(report["refresh"]["status"], "failed");
        assert_eq!(report["refresh"]["consecutive_failures"], 1);
        assert_eq!(report["refresh"]["last_error"], "source unavailable");
        assert!(!report["refresh"]["last_success_at"].is_null());
        assert_report_time_offset(&report["refresh"]["last_success_at"]);
    }

    #[test]
    fn rolling_reports_keep_latest_files_per_category() {
        let temp = tempfile::tempdir().unwrap();
        let report_dir = temp.path().join("report");
        let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default()));
        let service = RtrService::new(shared_cache.clone());
        let notifier = service.notifier();
        let mut configuration = test_configuration();
        configuration.report_history_limit = 2;
        let context = ReportContext::new(configuration);

        for phase in ["one", "two", "three"] {
            context
                .write(
                    &report_dir,
                    phase,
                    &shared_cache,
                    &notifier,
                    &service.stats(),
                )
                .unwrap();
        }

        assert_eq!(report_files(&report_dir, "rtr-source").len(), 2);
        assert_eq!(report_files(&report_dir, "rtr-clients").len(), 2);
        assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 2);
        assert_no_temporary_reports(&report_dir);
    }

    #[test]
    fn category_writes_only_create_requested_report_type() {
        let temp = tempfile::tempdir().unwrap();
        let report_dir = temp.path().join("report");
        let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default()));
        let service = RtrService::new(shared_cache.clone());
        let notifier = service.notifier();
        let context = ReportContext::new(test_configuration());

        context
            .write_source(
                &report_dir,
                "source_only",
                &shared_cache,
                &notifier,
                &service.stats(),
            )
            .unwrap();
        assert_eq!(report_files(&report_dir, "rtr-source").len(), 1);
        assert_eq!(report_files(&report_dir, "rtr-clients").len(), 0);
        assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 0);

        context
            .write_clients(
                &report_dir,
                "clients_only",
                &shared_cache,
                &notifier,
                &service.stats(),
            )
            .unwrap();
        assert_eq!(report_files(&report_dir, "rtr-source").len(), 1);
        assert_eq!(report_files(&report_dir, "rtr-clients").len(), 1);
        assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 0);

        context
            .write_runtime(
                &report_dir,
                "runtime_only",
                &shared_cache,
                &notifier,
                &service.stats(),
            )
            .unwrap();
        assert_eq!(report_files(&report_dir, "rtr-source").len(), 1);
        assert_eq!(report_files(&report_dir, "rtr-clients").len(), 1);
        assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 1);
    }

    /// Asserts that a serialized timestamp carries the Asia/Shanghai offset.
    fn assert_report_time_offset(value: &Value) {
        let value = value.as_str().expect("report time should be a string");
        assert!(
            value.ends_with("+08:00"),
            "report time should use +08:00 offset, got {value}"
        );
    }

    /// Parses the only `{prefix}-*.json` file in `report_dir`.
    fn read_single_report(report_dir: &Path, prefix: &str) -> Value {
        let files = report_files(report_dir, prefix);
        assert_eq!(files.len(), 1, "expected one {prefix} report");
        serde_json::from_slice(&fs::read(&files[0]).unwrap()).unwrap()
    }

    /// Lists `{prefix}-*.json` files in `report_dir`, sorted by path.
    fn report_files(report_dir: &Path, prefix: &str) -> Vec<PathBuf> {
        let start = format!("{prefix}-");
        let mut files = fs::read_dir(report_dir)
            .unwrap()
            .map(|entry| entry.unwrap().path())
            .filter(|path| {
                path.file_name()
                    .and_then(|name| name.to_str())
                    .is_some_and(|name| name.starts_with(&start) && name.ends_with(".json"))
            })
            .collect::<Vec<_>>();
        files.sort();
        files
    }

    /// Asserts that no `.tmp` staging files were left behind.
    fn assert_no_temporary_reports(report_dir: &Path) {
        let has_temporary = fs::read_dir(report_dir).unwrap().any(|entry| {
            entry
                .unwrap()
                .file_name()
                .to_str()
                .is_some_and(|name| name.ends_with(".tmp"))
        });
        assert!(!has_temporary);
    }
}
|