use std::fs; use std::path::{Path, PathBuf}; use std::sync::Arc; use arc_swap::ArcSwap; use chrono::Utc; use rpki::rtr::cache::RtrCache; use rpki::rtr::report::{ReportConfiguration, ReportContext}; use rpki::rtr::server::RtrService; use rpki::source::pipeline::{ DataQualityReport, FileFingerprint, PayloadTypeCounts, SlurmRuleCounts, SourceFingerprint, SourceLoadReport, }; use serde_json::Value; fn test_configuration(report_history_limit: usize) -> ReportConfiguration { ReportConfiguration::new( 300, 300, report_history_limit, 100, false, false, chrono_tz::Asia::Shanghai, (3600, 600, 7200), ) } #[test] fn write_report_creates_parseable_json() { let temp = tempfile::tempdir().unwrap(); let report_dir = temp.path().join("report"); let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default())); let service = RtrService::new(shared_cache.clone()); let notifier = service.notifier(); let context = ReportContext::new(test_configuration(10)); context.write_or_warn( &report_dir, "test", &shared_cache, ¬ifier, &service.stats(), ); let source_report = read_single_report(&report_dir, "rtr-source"); assert_eq!(source_report["schema_version"], 1); assert_eq!(source_report["phase"], "test"); assert_report_time_offset(&source_report["generated_at"]); assert_report_time_offset(&source_report["cache"]["created_at"]); assert_eq!(source_report["cache"]["availability"], "ready"); assert_eq!(source_report["refresh"]["status"], "not_attempted"); assert!(source_report["source"].is_null()); assert!(source_report["data_quality"].is_null()); let runtime_report = read_single_report(&report_dir, "rtr-runtime"); assert_report_time_offset(&runtime_report["service"]["started_at"]); assert_eq!( runtime_report["configuration"]["source_refresh_interval_seconds"], 300 ); assert_eq!(runtime_report["configuration"]["report_history_limit"], 10); let clients_report = read_single_report(&report_dir, "rtr-clients"); assert_eq!(clients_report["service"]["max_connections"], 1024); assert_eq!(clients_report["service"]["active_connections"], 0); assert_eq!( clients_report["service"]["connections_by_transport"]["tcp"], 0 ); assert_eq!( clients_report["service"]["connections_by_transport"]["tls"], 0 ); assert_eq!( clients_report["service"]["connections_by_transport"]["ssh"], 0 ); assert_eq!( source_report["cache"]["versions"].as_array().unwrap().len(), 3 ); assert_eq!( source_report["cache"]["versions"][2]["snapshot"]["total"], 0 ); assert_eq!( source_report["cache"]["memory"]["delta_payload_counts"][2], 0 ); assert!(source_report["source_fingerprint"].is_null()); assert_no_temporary_reports(&report_dir); } #[test] fn refresh_failure_preserves_last_successful_source_data() { let temp = tempfile::tempdir().unwrap(); let report_dir = temp.path().join("report"); let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default())); let service = RtrService::new(shared_cache.clone()); let notifier = service.notifier(); let context = ReportContext::new(test_configuration(10)); let source = SourceLoadReport { ccr_file: "data/example.ccr".to_string(), ccr_file_size_bytes: 123, ccr_modified_at: Some(Utc::now()), ccr_produced_at: Some("20260615000000Z".to_string()), slurm_enabled: true, slurm_file_count: 1, slurm_files: vec!["policy.slurm".to_string()], slurm_version: Some(2), }; let quality = DataQualityReport { ccr_input: PayloadTypeCounts { total: 11, vrp: 10, router_key: 0, aspa: 1, }, invalid: PayloadTypeCounts::default(), before_slurm: PayloadTypeCounts { total: 11, vrp: 10, router_key: 0, aspa: 1, }, after_slurm: PayloadTypeCounts { total: 10, vrp: 9, router_key: 0, aspa: 1, }, slurm_filters: SlurmRuleCounts { prefix: 1, router_key: 0, aspa: 0, }, slurm_assertions: SlurmRuleCounts::default(), }; context.record_source_fingerprint(SourceFingerprint { ccr: FileFingerprint { path: "data/example.ccr".to_string(), len: 123, modified_unix_secs: 1_781_404_800, }, slurm_files: vec![FileFingerprint { path: "policy.slurm".to_string(), len: 42, modified_unix_secs: 1_781_408_400, }], }); context.record_refresh_success(Utc::now(), 12, true, source, quality); context.record_refresh_failure(Utc::now(), 5, &anyhow::anyhow!("source unavailable")); context.write_or_warn( &report_dir, "refresh_failed", &shared_cache, ¬ifier, &service.stats(), ); let report = read_single_report(&report_dir, "rtr-source"); assert_eq!(report["source"]["ccr_file"], "data/example.ccr"); assert_eq!( report["source_fingerprint"]["ccr"]["path"], "data/example.ccr" ); assert_eq!(report["source_fingerprint"]["ccr"]["len"], 123); assert_report_time_offset(&report["source_fingerprint"]["ccr"]["modified_at"]); assert_eq!( report["source_fingerprint"]["slurm_files"][0]["path"], "policy.slurm" ); assert_report_time_offset(&report["source"]["ccr_modified_at"]); assert_eq!(report["data_quality"]["after_slurm"]["total"], 10); assert_eq!(report["refresh"]["status"], "failed"); assert_eq!(report["refresh"]["consecutive_failures"], 1); assert_eq!(report["refresh"]["last_error"], "source unavailable"); assert!(!report["refresh"]["last_success_at"].is_null()); assert_report_time_offset(&report["refresh"]["last_success_at"]); } #[test] fn rolling_reports_keep_latest_files_per_category() { let temp = tempfile::tempdir().unwrap(); let report_dir = temp.path().join("report"); let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default())); let service = RtrService::new(shared_cache.clone()); let notifier = service.notifier(); let context = ReportContext::new(test_configuration(2)); for phase in ["one", "two", "three"] { context.write_or_warn( &report_dir, phase, &shared_cache, ¬ifier, &service.stats(), ); } assert_eq!(report_files(&report_dir, "rtr-source").len(), 2); assert_eq!(report_files(&report_dir, "rtr-clients").len(), 2); assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 2); assert_no_temporary_reports(&report_dir); } #[test] fn category_writes_only_create_requested_report_type() { let temp = tempfile::tempdir().unwrap(); let report_dir = temp.path().join("report"); let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default())); let service = RtrService::new(shared_cache.clone()); let notifier = service.notifier(); let context = ReportContext::new(test_configuration(10)); context.write_source_or_warn( &report_dir, "source_only", &shared_cache, ¬ifier, &service.stats(), ); assert_eq!(report_files(&report_dir, "rtr-source").len(), 1); assert_eq!(report_files(&report_dir, "rtr-clients").len(), 0); assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 0); context.write_clients_or_warn( &report_dir, "clients_only", &shared_cache, ¬ifier, &service.stats(), ); assert_eq!(report_files(&report_dir, "rtr-source").len(), 1); assert_eq!(report_files(&report_dir, "rtr-clients").len(), 1); assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 0); context.write_runtime_or_warn( &report_dir, "runtime_only", &shared_cache, ¬ifier, &service.stats(), ); assert_eq!(report_files(&report_dir, "rtr-source").len(), 1); assert_eq!(report_files(&report_dir, "rtr-clients").len(), 1); assert_eq!(report_files(&report_dir, "rtr-runtime").len(), 1); } fn assert_report_time_offset(value: &Value) { let value = value.as_str().expect("report time should be a string"); assert!( value.ends_with("+08:00"), "report time should use +08:00 offset, got {value}" ); } fn read_single_report(report_dir: &Path, prefix: &str) -> Value { let files = report_files(report_dir, prefix); assert_eq!(files.len(), 1, "expected one {prefix} report"); serde_json::from_slice(&fs::read(&files[0]).unwrap()).unwrap() } fn report_files(report_dir: &Path, prefix: &str) -> Vec { let start = format!("{prefix}-"); let mut files = fs::read_dir(report_dir) .unwrap() .map(|entry| entry.unwrap().path()) .filter(|path| { path.file_name() .and_then(|name| name.to_str()) .is_some_and(|name| name.starts_with(&start) && name.ends_with(".json")) }) .collect::>(); files.sort(); files } fn assert_no_temporary_reports(report_dir: &Path) { let has_temporary = fs::read_dir(report_dir).unwrap().any(|entry| { entry .unwrap() .file_name() .to_str() .is_some_and(|name| name.ends_with(".tmp")) }); assert!(!has_temporary); }