diff --git a/.gitignore b/.gitignore index 647fb8c..b8549f6 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ target/ rtr-db/ .idea/ logs/ +report/ diff --git a/Cargo.lock b/Cargo.lock index a4f3c31..dfcf055 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -447,6 +447,16 @@ dependencies = [ "windows-link", ] +[[package]] +name = "chrono-tz" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3" +dependencies = [ + "chrono", + "phf", +] + [[package]] name = "cipher" version = "0.4.4" @@ -1776,6 +1786,24 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "phf" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06005508882fb681fd97892ecff4b7fd0fee13ef1aa569f8695dae7ab9099981" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2079,6 +2107,7 @@ dependencies = [ "base64", "bytes", "chrono", + "chrono-tz", "der-parser", "hex", "rand 0.10.0", @@ -2528,6 +2557,12 @@ dependencies = [ "rand_core 0.10.0", ] +[[package]] +name = "siphasher" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee5873ec9cce0195efcb7a4e9507a04cd49aec9c83d0389df45b1ef7ba2e649" + [[package]] name = "slab" version = "0.4.11" diff --git a/Cargo.toml b/Cargo.toml index d5dfe88..ea318bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ asn1-rs-derive = "0.6.0" asn1 = "0.23.0" arc-swap = "1.7.0" chrono = "0.4.44" +chrono-tz = "0.10" bytes = "1.11.1" tokio = { version = "1.49.0", features = ["full"] } rand = "0.10.0" diff --git a/deploy/server/.env b/deploy/server/.env index 069487e..a3db73e 100644 --- a/deploy/server/.env +++ b/deploy/server/.env @@ -9,13 +9,17 @@ RPKI_RTR_SLURM_DIR=/app/slurm # Persistent directories on host. RPKI_RTR_DB_HOST_DIR=../../rtr-db RPKI_RTR_LOG_HOST_DIR=../../logs/server +RPKI_RTR_REPORT_HOST_DIR=../../report # In-container runtime paths. RPKI_RTR_DB_PATH=/app/rtr-db +RPKI_RTR_REPORT_DIR=/app/report # Core runtime knobs. RPKI_RTR_STRICT_CCR_VALIDATION=false RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS=300 +RPKI_RTR_REPORT_INTERVAL_SECS=60 +RPKI_RTR_TIMEZONE=Asia/Shanghai RPKI_RTR_MAX_DELTA=10 RPKI_RTR_MAX_CONNECTIONS=100000 RPKI_RTR_MAX_CONCURRENT_HANDSHAKES=128 diff --git a/deploy/server/DEPLOYMENT.md b/deploy/server/DEPLOYMENT.md index 30ce369..deedf6c 100644 --- a/deploy/server/DEPLOYMENT.md +++ b/deploy/server/DEPLOYMENT.md @@ -18,6 +18,7 @@ The container runs `rpki` directly as PID 1. - CCR directory: `/app/data` - RocksDB directory: `/app/rtr-db` - SLURM directory: `/app/slurm` +- Report directory: `/app/report` - TLS cert directory (optional): `/app/certs` ## Path Configuration via `.env` @@ -28,11 +29,13 @@ The container runs `rpki` directly as PID 1. - `RPKI_RTR_SLURM_DIR`: in-container SLURM directory path - `RPKI_RTR_DB_HOST_DIR`: host RocksDB directory - `RPKI_RTR_LOG_HOST_DIR`: host log directory +- `RPKI_RTR_REPORT_HOST_DIR`: host directory receiving `rtr-server.json` - `RPKI_RTR_DB_PATH`: in-container RocksDB directory +- `RPKI_RTR_REPORT_DIR`: in-container report directory ## Runtime Configuration via `.env` -- Core: `RPKI_RTR_STRICT_CCR_VALIDATION`, `RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS`, `RPKI_RTR_MAX_DELTA`, `RPKI_RTR_MAX_CONCURRENT_HANDSHAKES`, `RUST_LOG` +- Core: `RPKI_RTR_STRICT_CCR_VALIDATION`, `RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS`, `RPKI_RTR_MAX_DELTA`, `RPKI_RTR_MAX_CONCURRENT_HANDSHAKES`, `RPKI_RTR_REPORT_INTERVAL_SECS`, `RPKI_RTR_TIMEZONE`, `RUST_LOG` - TCP mode: `RPKI_RTR_MAX_CONNECTIONS` - TLS mode: `RPKI_RTR_ENFORCE_TLS_CLIENT_SAN_IP_MATCH`, `RPKI_RTR_TLS_CERT_PATH`, `RPKI_RTR_TLS_KEY_PATH`, `RPKI_RTR_TLS_CLIENT_CA_PATH`, `RPKI_RTR_TLS_CERTS_HOST_DIR` - SSH mode: `RPKI_RTR_SSH_HOST_PORT`, `RPKI_RTR_SSH_CONTAINER_PORT`, `RPKI_RTR_SSH_AUTH_MODE`, `RPKI_RTR_SSH_USERNAME`, `RPKI_RTR_SSH_SUBSYSTEM_NAME`, `RPKI_RTR_SSH_HOST_KEY_PATH`, `RPKI_RTR_SSH_AUTHORIZED_KEYS_PATH`, `RPKI_RTR_SSH_KEYS_VOLUME`, `RPKI_RTR_SSH_CERTS_HOST_DIR` @@ -54,3 +57,22 @@ docker compose -f deploy/server/docker-compose.yml down ```bash docker compose -f deploy/server/docker-compose.yml logs -f rpki-rtr ``` + +## Runtime Report + +The server creates `report/rtr-server.json` on startup and replaces it after +cache refreshes and at the configured interval. The default interval is 60 +seconds. The file contains service connection counts, process RSS, cache +timestamps, and per-protocol-version snapshot and delta counts. Schema version +2 also includes: + +- CCR and SLURM source file metadata +- latest refresh status, duration, failure count, and error +- CCR validation and SLURM before/after payload counts +- service start time and uptime +- non-sensitive runtime configuration + +Timestamps in logs and `rtr-server.json` use `RPKI_RTR_TIMEZONE`, which +defaults to `Asia/Shanghai`. Use IANA timezone names such as `Asia/Shanghai`, +`Europe/London`, `America/New_York`, or `UTC`; `Shanghai` is accepted as a +convenience alias for `Asia/Shanghai`. diff --git a/deploy/server/Dockerfile b/deploy/server/Dockerfile index 0c10435..1f52afd 100644 --- a/deploy/server/Dockerfile +++ b/deploy/server/Dockerfile @@ -63,7 +63,7 @@ WORKDIR /app COPY --from=builder /build/target/release/rpki_rtr /usr/local/bin/rpki_rtr COPY --chmod=755 deploy/server/entrypoint.sh /usr/local/bin/rpki-rtr-entrypoint.sh -RUN mkdir -p /app/data /app/rtr-db /app/certs /app/slurm /app/logs +RUN mkdir -p /app/data /app/rtr-db /app/certs /app/slurm /app/logs /app/report ENV RPKI_RTR_ENABLE_TLS=false \ RPKI_RTR_TCP_ADDR=0.0.0.0:323 \ @@ -71,6 +71,8 @@ ENV RPKI_RTR_ENABLE_TLS=false \ RPKI_RTR_DB_PATH=/app/rtr-db \ RPKI_RTR_CCR_DIR=/app/data \ RPKI_RTR_SLURM_DIR=/app/slurm \ + RPKI_RTR_REPORT_DIR=/app/report \ + RPKI_RTR_REPORT_INTERVAL_SECS=60 \ RPKI_RTR_REFRESH_INTERVAL_SECS=300 \ RPKI_RTR_STRICT_CCR_VALIDATION=false diff --git a/deploy/server/docker-compose.ssh.yml b/deploy/server/docker-compose.ssh.yml index 2e5c0d4..721c6dc 100644 --- a/deploy/server/docker-compose.ssh.yml +++ b/deploy/server/docker-compose.ssh.yml @@ -27,6 +27,9 @@ services: RPKI_RTR_DB_PATH: "${RPKI_RTR_DB_PATH:-/app/rtr-db}" RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "${RPKI_RTR_SLURM_DIR:-/app/slurm}" + RPKI_RTR_REPORT_DIR: "${RPKI_RTR_REPORT_DIR:-/app/report}" + RPKI_RTR_REPORT_INTERVAL_SECS: "${RPKI_RTR_REPORT_INTERVAL_SECS:-60}" + RPKI_RTR_TIMEZONE: "${RPKI_RTR_TIMEZONE:-Asia/Shanghai}" RPKI_RTR_STRICT_CCR_VALIDATION: "${RPKI_RTR_STRICT_CCR_VALIDATION:-false}" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "${RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS:-300}" RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}" @@ -39,6 +42,7 @@ services: - ${RPKI_RTR_SSH_KEYS_VOLUME:-/etc/ssh:/host-ssh:ro} - ${RPKI_RTR_SSH_CERTS_HOST_DIR:-../../certs}:/app/certs:ro - ${RPKI_RTR_LOG_HOST_DIR:-../../logs/server}:/app/logs + - ${RPKI_RTR_REPORT_HOST_DIR:-../../report}:${RPKI_RTR_REPORT_DIR:-/app/report} networks: - rpki_net diff --git a/deploy/server/docker-compose.tcp.yml b/deploy/server/docker-compose.tcp.yml index 2345f97..869d7c5 100644 --- a/deploy/server/docker-compose.tcp.yml +++ b/deploy/server/docker-compose.tcp.yml @@ -17,6 +17,9 @@ services: RPKI_RTR_DB_PATH: "${RPKI_RTR_DB_PATH:-/app/rtr-db}" RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "${RPKI_RTR_SLURM_DIR:-/app/slurm}" + RPKI_RTR_REPORT_DIR: "${RPKI_RTR_REPORT_DIR:-/app/report}" + RPKI_RTR_REPORT_INTERVAL_SECS: "${RPKI_RTR_REPORT_INTERVAL_SECS:-60}" + RPKI_RTR_TIMEZONE: "${RPKI_RTR_TIMEZONE:-Asia/Shanghai}" RPKI_RTR_STRICT_CCR_VALIDATION: "${RPKI_RTR_STRICT_CCR_VALIDATION:-false}" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "${RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS:-60}" RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}" @@ -28,6 +31,7 @@ services: - ${RPKI_RTR_DB_HOST_DIR:-../../rtr-db}:${RPKI_RTR_DB_PATH:-/app/rtr-db} - ${RPKI_RTR_SLURM_HOST_DIR:-../../data}:${RPKI_RTR_SLURM_DIR:-/app/slurm}:ro - ${RPKI_RTR_LOG_HOST_DIR:-../../logs/server}:/app/logs + - ${RPKI_RTR_REPORT_HOST_DIR:-../../report}:${RPKI_RTR_REPORT_DIR:-/app/report} networks: - rpki_net diff --git a/deploy/server/docker-compose.tls.yml b/deploy/server/docker-compose.tls.yml index a299d7a..89f2421 100644 --- a/deploy/server/docker-compose.tls.yml +++ b/deploy/server/docker-compose.tls.yml @@ -21,6 +21,9 @@ services: RPKI_RTR_DB_PATH: "${RPKI_RTR_DB_PATH:-/app/rtr-db}" RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "${RPKI_RTR_SLURM_DIR:-/app/slurm}" + RPKI_RTR_REPORT_DIR: "${RPKI_RTR_REPORT_DIR:-/app/report}" + RPKI_RTR_REPORT_INTERVAL_SECS: "${RPKI_RTR_REPORT_INTERVAL_SECS:-60}" + RPKI_RTR_TIMEZONE: "${RPKI_RTR_TIMEZONE:-Asia/Shanghai}" RPKI_RTR_STRICT_CCR_VALIDATION: "${RPKI_RTR_STRICT_CCR_VALIDATION:-false}" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "${RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS:-300}" RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}" @@ -32,6 +35,7 @@ services: - ${RPKI_RTR_SLURM_HOST_DIR:-../../data}:${RPKI_RTR_SLURM_DIR:-/app/slurm}:ro - ${RPKI_RTR_TLS_CERTS_HOST_DIR:-../../tests/fixtures/tls}:/app/certs:ro - ${RPKI_RTR_LOG_HOST_DIR:-../../logs/server}:/app/logs + - ${RPKI_RTR_REPORT_HOST_DIR:-../../report}:${RPKI_RTR_REPORT_DIR:-/app/report} networks: - rpki_net diff --git a/deploy/server/docker-compose.yml b/deploy/server/docker-compose.yml index c7a2cd2..d607c78 100644 --- a/deploy/server/docker-compose.yml +++ b/deploy/server/docker-compose.yml @@ -20,6 +20,9 @@ services: RPKI_RTR_DB_PATH: "${RPKI_RTR_DB_PATH:-/app/rtr-db}" RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "${RPKI_RTR_SLURM_DIR:-/app/slurm}" + RPKI_RTR_REPORT_DIR: "${RPKI_RTR_REPORT_DIR:-/app/report}" + RPKI_RTR_REPORT_INTERVAL_SECS: "${RPKI_RTR_REPORT_INTERVAL_SECS:-60}" + RPKI_RTR_TIMEZONE: "${RPKI_RTR_TIMEZONE:-Asia/Shanghai}" RPKI_RTR_STRICT_CCR_VALIDATION: "${RPKI_RTR_STRICT_CCR_VALIDATION:-false}" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "${RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS:-300}" RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}" @@ -40,6 +43,7 @@ services: - ${RPKI_RTR_DB_HOST_DIR:-../../rtr-db}:${RPKI_RTR_DB_PATH:-/app/rtr-db} - ${RPKI_RTR_SLURM_HOST_DIR:-../../data}:${RPKI_RTR_SLURM_DIR:-/app/slurm}:ro - ${RPKI_RTR_LOG_HOST_DIR:-../../logs/server}:/app/logs + - ${RPKI_RTR_REPORT_HOST_DIR:-../../report}:${RPKI_RTR_REPORT_DIR:-/app/report} # TLS mode example: # - ../../certs:/app/certs:ro networks: diff --git a/src/main_rtr.rs b/src/main_rtr.rs index 1f23c85..53c27a0 100644 --- a/src/main_rtr.rs +++ b/src/main_rtr.rs @@ -1,314 +1,58 @@ -use std::env; -use std::fs; -use std::net::SocketAddr; +use std::path::PathBuf; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; -use anyhow::{Result, anyhow}; +use anyhow::Result; use arc_swap::ArcSwap; -use chrono::{FixedOffset, Utc}; +use chrono::Utc; use tokio::task::JoinHandle; use tracing::{info, warn}; use rpki::rtr::cache::{RtrCache, SharedRtrCache, Snapshot}; -use rpki::rtr::payload::Timing; -use rpki::rtr::server::ssh::SshAuthMode; -use rpki::rtr::server::{RtrNotifier, RtrService, RtrServiceConfig, RunningRtrService}; +use rpki::rtr::config::{AppConfig, log_startup_config}; +use rpki::rtr::report::{ReportConfiguration, ReportContext, current_rss_mib}; +use rpki::rtr::server::{RtrNotifier, RtrService, RtrServiceStats, RunningRtrService}; use rpki::rtr::store::RtrStore; use rpki::source::pipeline::{ - PayloadLoadConfig, SourceFingerprint, latest_sources_fingerprint, load_payloads_from_latest_sources, + PayloadLoadConfig, SourceFingerprint, latest_sources_fingerprint, + load_payloads_from_latest_sources_with_report, }; -#[derive(Debug, Clone)] -struct AppConfig { - enable_tls: bool, - enable_ssh: bool, - tcp_addr: SocketAddr, - tls_addr: SocketAddr, - ssh_addr: SocketAddr, - - db_path: String, - ccr_dir: String, - slurm_dir: Option, - tls_cert_path: String, - tls_key_path: String, - tls_client_ca_path: String, - ssh_host_key_path: String, - ssh_authorized_keys_path: String, - ssh_username: String, - ssh_subsystem_name: String, - ssh_auth_mode: SshAuthMode, - ssh_password: Option, - - max_delta: u8, - prune_delta_by_snapshot_size: bool, - strict_ccr_validation: bool, - source_refresh_interval: Duration, - timing: Timing, - - service_config: RtrServiceConfig, -} - -impl Default for AppConfig { - fn default() -> Self { - Self { - enable_tls: false, - enable_ssh: false, - tcp_addr: "0.0.0.0:323".parse().expect("invalid default tcp_addr"), - tls_addr: "0.0.0.0:324".parse().expect("invalid default tls_addr"), - ssh_addr: "0.0.0.0:22".parse().expect("invalid default ssh_addr"), - - db_path: "./rtr-db".to_string(), - ccr_dir: "./data".to_string(), - slurm_dir: None, - tls_cert_path: "./certs/server.crt".to_string(), - tls_key_path: "./certs/server.key".to_string(), - tls_client_ca_path: "./certs/client-ca.crt".to_string(), - ssh_host_key_path: "./certs/ssh_host_ed25519_key".to_string(), - ssh_authorized_keys_path: "./certs/rtr-authorized_keys".to_string(), - ssh_username: "rpki-rtr".to_string(), - ssh_subsystem_name: "rpki-rtr".to_string(), - ssh_auth_mode: SshAuthMode::Key, - ssh_password: None, - - max_delta: 100, - prune_delta_by_snapshot_size: false, - strict_ccr_validation: false, - source_refresh_interval: Duration::from_secs(300), - timing: Timing::default(), - - service_config: RtrServiceConfig { - max_connections: 512, - max_concurrent_handshakes: 128, - notify_queue_size: 1024, - tcp_keepalive: Some(Duration::from_secs(60)), - warn_insecure_tcp: true, - require_tls_server_dns_name_san: false, - enforce_tls_client_san_ip_match: true, - }, - } - } -} - -impl AppConfig { - fn from_env() -> Result { - let mut config = Self::default(); - - // TLS and TCP - if let Some(value) = env_var("RPKI_RTR_ENABLE_TLS")? { - config.enable_tls = parse_bool(&value, "RPKI_RTR_ENABLE_TLS")?; - } - if let Some(value) = env_var("RPKI_RTR_ENABLE_SSH")? { - config.enable_ssh = parse_bool(&value, "RPKI_RTR_ENABLE_SSH")?; - } - if let Some(value) = env_var("RPKI_RTR_TCP_ADDR")? { - config.tcp_addr = value - .parse() - .map_err(|err| anyhow!("invalid RPKI_RTR_TCP_ADDR '{}': {}", value, err))?; - } - if let Some(value) = env_var("RPKI_RTR_TLS_ADDR")? { - config.tls_addr = value - .parse() - .map_err(|err| anyhow!("invalid RPKI_RTR_TLS_ADDR '{}': {}", value, err))?; - } - if let Some(value) = env_var("RPKI_RTR_SSH_ADDR")? { - config.ssh_addr = value - .parse() - .map_err(|err| anyhow!("invalid RPKI_RTR_SSH_ADDR '{}': {}", value, err))?; - } - if let Some(value) = env_var("RPKI_RTR_SSH_PORT")? { - let port: u16 = value - .parse() - .map_err(|err| anyhow!("invalid RPKI_RTR_SSH_PORT '{}': {}", value, err))?; - config.ssh_addr.set_port(port); - } - - // data - if let Some(value) = env_var("RPKI_RTR_DB_PATH")? { - config.db_path = value; - } - if let Some(value) = env_var("RPKI_RTR_CCR_DIR")? { - config.ccr_dir = value; - } - if let Some(value) = env_var("RPKI_RTR_SLURM_DIR")? { - let value = value.trim(); - config.slurm_dir = if value.is_empty() { - None - } else { - Some(value.to_string()) - }; - } - if let Some(value) = env_var("RPKI_RTR_TLS_CERT_PATH")? { - config.tls_cert_path = value; - } - if let Some(value) = env_var("RPKI_RTR_TLS_KEY_PATH")? { - config.tls_key_path = value; - } - if let Some(value) = env_var("RPKI_RTR_TLS_CLIENT_CA_PATH")? { - config.tls_client_ca_path = value; - } - if let Some(value) = env_var("RPKI_RTR_SSH_HOST_KEY_PATH")? { - config.ssh_host_key_path = value; - } - if let Some(value) = env_var("RPKI_RTR_SSH_AUTHORIZED_KEYS_PATH")? { - config.ssh_authorized_keys_path = value; - } - if let Some(value) = env_var("RPKI_RTR_SSH_USERNAME")? { - config.ssh_username = value; - } - if let Some(value) = env_var("RPKI_RTR_SSH_SUBSYSTEM_NAME")? { - config.ssh_subsystem_name = value; - } - if let Some(value) = env_var("RPKI_RTR_SSH_AUTH_MODE")? { - config.ssh_auth_mode = SshAuthMode::parse(&value).ok_or_else(|| { - anyhow!( - "invalid RPKI_RTR_SSH_AUTH_MODE '{}': expected key|password|both", - value - ) - })?; - } - if let Some(value) = env_var("RPKI_RTR_SSH_PASSWORD")? { - let value = value.trim().to_string(); - config.ssh_password = if value.is_empty() { None } else { Some(value) }; - } - if let Some(value) = env_var("RPKI_RTR_MAX_DELTA")? { - let parsed: u8 = value - .parse() - .map_err(|err| anyhow!("invalid RPKI_RTR_MAX_DELTA '{}': {}", value, err))?; - if parsed == 0 { - return Err(anyhow!( - "invalid RPKI_RTR_MAX_DELTA '{}': must be >= 1", - value - )); - } - config.max_delta = parsed; - } - if let Some(value) = env_var("RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE")? { - config.prune_delta_by_snapshot_size = - parse_bool(&value, "RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE")?; - } - if let Some(value) = env_var("RPKI_RTR_STRICT_CCR_VALIDATION")? { - config.strict_ccr_validation = parse_bool(&value, "RPKI_RTR_STRICT_CCR_VALIDATION")?; - } - let source_refresh_interval_new = env_var("RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS")?; - let source_refresh_interval_legacy = env_var("RPKI_RTR_REFRESH_INTERVAL_SECS")?; - match ( - source_refresh_interval_new.as_deref(), - source_refresh_interval_legacy.as_deref(), - ) { - (Some(new_value), Some(_)) => { - let secs = parse_positive_u64(new_value, "RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS")?; - config.source_refresh_interval = Duration::from_secs(secs); - warn!( - "both RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS and legacy RPKI_RTR_REFRESH_INTERVAL_SECS are set; using RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS" - ); - } - (Some(new_value), None) => { - let secs = parse_positive_u64(new_value, "RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS")?; - config.source_refresh_interval = Duration::from_secs(secs); - } - (None, Some(legacy_value)) => { - let secs = parse_positive_u64(legacy_value, "RPKI_RTR_REFRESH_INTERVAL_SECS")?; - config.source_refresh_interval = Duration::from_secs(secs); - warn!( - "RPKI_RTR_REFRESH_INTERVAL_SECS is deprecated; use RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS" - ); - } - (None, None) => {} - } - if let Some(value) = env_var("RPKI_RTR_TIMING_REFRESH_SECS")? { - config.timing.refresh = parse_positive_u32(&value, "RPKI_RTR_TIMING_REFRESH_SECS")?; - } - if let Some(value) = env_var("RPKI_RTR_TIMING_RETRY_SECS")? { - config.timing.retry = parse_positive_u32(&value, "RPKI_RTR_TIMING_RETRY_SECS")?; - } - if let Some(value) = env_var("RPKI_RTR_TIMING_EXPIRE_SECS")? { - config.timing.expire = parse_positive_u32(&value, "RPKI_RTR_TIMING_EXPIRE_SECS")?; - } - config - .timing - .validate() - .map_err(|err| anyhow!("invalid RTR timing configuration: {}", err))?; - if let Some(value) = env_var("RPKI_RTR_MAX_CONNECTIONS")? { - config.service_config.max_connections = value - .parse() - .map_err(|err| anyhow!("invalid RPKI_RTR_MAX_CONNECTIONS '{}': {}", value, err))?; - } - if let Some(value) = env_var("RPKI_RTR_MAX_CONCURRENT_HANDSHAKES")? { - config.service_config.max_concurrent_handshakes = value.parse().map_err(|err| { - anyhow!( - "invalid RPKI_RTR_MAX_CONCURRENT_HANDSHAKES '{}': {}", - value, - err - ) - })?; - } - if let Some(value) = env_var("RPKI_RTR_NOTIFY_QUEUE_SIZE")? { - config.service_config.notify_queue_size = value.parse().map_err(|err| { - anyhow!("invalid RPKI_RTR_NOTIFY_QUEUE_SIZE '{}': {}", value, err) - })?; - } - if let Some(value) = env_var("RPKI_RTR_TCP_KEEPALIVE_SECS")? { - let secs: u64 = value.parse().map_err(|err| { - anyhow!("invalid RPKI_RTR_TCP_KEEPALIVE_SECS '{}': {}", value, err) - })?; - config.service_config.tcp_keepalive = if secs == 0 { - None - } else { - Some(Duration::from_secs(secs)) - }; - } - if let Some(value) = env_var("RPKI_RTR_WARN_INSECURE_TCP")? { - config.service_config.warn_insecure_tcp = - parse_bool(&value, "RPKI_RTR_WARN_INSECURE_TCP")?; - } - if let Some(value) = env_var("RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN")? { - config.service_config.require_tls_server_dns_name_san = - parse_bool(&value, "RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN")?; - } - if let Some(value) = env_var("RPKI_RTR_ENFORCE_TLS_CLIENT_SAN_IP_MATCH")? { - config.service_config.enforce_tls_client_san_ip_match = - parse_bool(&value, "RPKI_RTR_ENFORCE_TLS_CLIENT_SAN_IP_MATCH")?; - } - if config.service_config.max_connections == 0 { - return Err(anyhow!( - "invalid RPKI_RTR_MAX_CONNECTIONS '{}': must be >= 1", - config.service_config.max_connections - )); - } - if config.service_config.max_concurrent_handshakes == 0 { - return Err(anyhow!( - "invalid RPKI_RTR_MAX_CONCURRENT_HANDSHAKES '{}': must be >= 1", - config.service_config.max_concurrent_handshakes - )); - } - if config.service_config.max_concurrent_handshakes > config.service_config.max_connections { - return Err(anyhow!( - "invalid handshake/connection limits: RPKI_RTR_MAX_CONCURRENT_HANDSHAKES ({}) must be <= RPKI_RTR_MAX_CONNECTIONS ({})", - config.service_config.max_concurrent_handshakes, - config.service_config.max_connections - )); - } - - Ok(config) - } -} - #[tokio::main] async fn main() -> Result<()> { - init_tracing(); - let config = AppConfig::from_env()?; + init_tracing(config.timezone); log_startup_config(&config); + let report_context = ReportContext::new(ReportConfiguration::new( + config.source_refresh_interval.as_secs(), + config.report_interval.as_secs(), + config.max_delta, + config.prune_delta_by_snapshot_size, + config.strict_ccr_validation, + config.timezone, + ( + config.timing.refresh, + config.timing.retry, + config.timing.expire, + ), + )); let store = open_store(&config)?; - let shared_cache = init_shared_cache(&config, &store)?; + let shared_cache = init_shared_cache(&config, &store, &report_context)?; let service = RtrService::with_config(shared_cache.clone(), config.service_config.clone()); let notifier = service.notifier(); + let service_stats = service.stats(); let running = start_servers(&config, &service); - let refresh_task = spawn_refresh_task(&config, shared_cache.clone(), store.clone(), notifier); + let refresh_task = spawn_refresh_task( + &config, + shared_cache.clone(), + store.clone(), + notifier, + service_stats, + report_context, + ); wait_for_shutdown().await?; @@ -327,27 +71,39 @@ fn open_store(config: &AppConfig) -> Result { RtrStore::open(&config.db_path) } -fn init_shared_cache(config: &AppConfig, store: &RtrStore) -> Result { +fn init_shared_cache( + config: &AppConfig, + store: &RtrStore, + report_context: &ReportContext, +) -> Result { let payload_load_config = PayloadLoadConfig { ccr_dir: config.ccr_dir.clone(), slurm_dir: config.slurm_dir.clone(), strict_ccr_validation: config.strict_ccr_validation, }; let source_to_delta_started = Instant::now(); + let report_context_for_loader = report_context.clone(); let initial_cache = RtrCache::default().init( store, config.max_delta, config.prune_delta_by_snapshot_size, config.timing, || { - let payloads = load_payloads_from_latest_sources(&payload_load_config)?; + let load = load_payloads_from_latest_sources_with_report(&payload_load_config)?; info!( "RTR source-to-delta timing: phase=startup_load_complete, ccr_dir={}, payload_count={}, elapsed_ms={}", payload_load_config.ccr_dir, - payloads.len(), + load.payloads.len(), source_to_delta_started.elapsed().as_millis() ); - Ok(payloads) + report_context_for_loader.record_refresh_success( + Utc::now(), + source_to_delta_started.elapsed().as_millis(), + true, + load.source, + load.quality, + ); + Ok(load.payloads) }, )?; info!( @@ -417,8 +173,12 @@ fn spawn_refresh_task( shared_cache: SharedRtrCache, store: RtrStore, notifier: RtrNotifier, + service_stats: RtrServiceStats, + report_context: ReportContext, ) -> JoinHandle<()> { let refresh_interval = config.source_refresh_interval; + let report_interval = config.report_interval; + let report_dir = PathBuf::from(&config.report_dir); let payload_load_config = PayloadLoadConfig { ccr_dir: config.ccr_dir.clone(), slurm_dir: config.slurm_dir.clone(), @@ -428,33 +188,61 @@ fn spawn_refresh_task( tokio::spawn(async move { let mut interval = tokio::time::interval(refresh_interval); let mut last_fingerprint: Option = None; - let mut stats_interval = tokio::time::interval(Duration::from_secs(60)); + report_context.write_or_warn( + &report_dir, + "startup", + &shared_cache, + ¬ifier, + &service_stats, + ); + let mut stats_interval = tokio::time::interval_at( + tokio::time::Instant::now() + report_interval, + report_interval, + ); stats_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! { _ = stats_interval.tick() => { log_cache_memory_stats("periodic_observe", &shared_cache, ¬ifier); + report_context.write_or_warn(&report_dir, "periodic", &shared_cache, ¬ifier, &service_stats); continue; } _ = interval.tick() => {} } let source_to_delta_started = Instant::now(); + let attempted_at = Utc::now(); let current_fingerprint = match latest_sources_fingerprint(&payload_load_config) { Ok(fp) => fp, Err(err) => { + report_context.record_refresh_failure( + attempted_at, + source_to_delta_started.elapsed().as_millis(), + &err, + ); warn!( "failed to fingerprint CCR/SLURM sources from {}: {:?} (source_to_delta_elapsed_ms={})", payload_load_config.ccr_dir, err, source_to_delta_started.elapsed().as_millis() ); + report_context.write_or_warn( + &report_dir, + "refresh_failed", + &shared_cache, + ¬ifier, + &service_stats, + ); continue; } }; if last_fingerprint.as_ref() == Some(¤t_fingerprint) { + report_context.record_refresh_unchanged( + attempted_at, + source_to_delta_started.elapsed().as_millis(), + ); info!( "RTR source refresh skipped: source files unchanged (ccr_path={}, slurm_file_count={}, elapsed_ms={})", current_fingerprint.ccr.path, @@ -462,11 +250,21 @@ fn spawn_refresh_task( source_to_delta_started.elapsed().as_millis() ); log_cache_memory_stats("refresh_skipped_unchanged", &shared_cache, ¬ifier); + report_context.write_or_warn( + &report_dir, + "refresh_skipped_unchanged", + &shared_cache, + ¬ifier, + &service_stats, + ); continue; } - match load_payloads_from_latest_sources(&payload_load_config) { - Ok(payloads) => { + match load_payloads_from_latest_sources_with_report(&payload_load_config) { + Ok(load) => { + let source = load.source; + let quality = load.quality; + let payloads = load.payloads; let (payload_count, updated) = { let payload_count = payloads.len(); let source_snapshot = Snapshot::from_payloads(payloads); @@ -474,7 +272,8 @@ fn spawn_refresh_task( let old_serial = old_cache.serial_for_version(2); let mut next_cache = old_cache.as_ref().clone(); - let updated = match next_cache.update_with_snapshot(source_snapshot, &store) { + let updated = match next_cache.update_with_snapshot(source_snapshot, &store) + { Ok(()) => { let new_serial = next_cache.serial_for_version(2); shared_cache.store(std::sync::Arc::new(next_cache)); @@ -496,12 +295,31 @@ fn spawn_refresh_task( } } Err(err) => { + report_context.record_refresh_failure( + attempted_at, + source_to_delta_started.elapsed().as_millis(), + &err, + ); warn!("RTR cache update failed: {:?}", err); - false + report_context.write_or_warn( + &report_dir, + "refresh_failed", + &shared_cache, + ¬ifier, + &service_stats, + ); + continue; } }; (payload_count, updated) }; + report_context.record_refresh_success( + attempted_at, + source_to_delta_started.elapsed().as_millis(), + updated, + source, + quality, + ); info!( "RTR source-to-delta timing: phase=refresh_cache_update_complete, ccr_dir={}, payload_count={}, changed={}, elapsed_ms={}", payload_load_config.ccr_dir, @@ -518,15 +336,34 @@ fn spawn_refresh_task( ); } log_cache_memory_stats("refresh_complete", &shared_cache, ¬ifier); + report_context.write_or_warn( + &report_dir, + "refresh_complete", + &shared_cache, + ¬ifier, + &service_stats, + ); last_fingerprint = Some(current_fingerprint); } Err(err) => { + report_context.record_refresh_failure( + attempted_at, + source_to_delta_started.elapsed().as_millis(), + &err, + ); warn!( "failed to reload CCR/SLURM payloads from {}: {:?} (source_to_delta_elapsed_ms={})", payload_load_config.ccr_dir, err, source_to_delta_started.elapsed().as_millis() ); + report_context.write_or_warn( + &report_dir, + "refresh_failed", + &shared_cache, + ¬ifier, + &service_stats, + ); } } } @@ -552,113 +389,32 @@ fn log_cache_memory_stats(phase: &str, shared_cache: &SharedRtrCache, notifier: ); } -fn current_rss_mib() -> Option { - let status = fs::read_to_string("/proc/self/status").ok()?; - let vmrss_line = status.lines().find(|line| line.starts_with("VmRSS:"))?; - let kb = vmrss_line - .split_whitespace() - .nth(1) - .and_then(|v| v.parse::().ok())?; - Some(kb / 1024) -} - async fn wait_for_shutdown() -> Result<()> { tokio::signal::ctrl_c().await?; info!("shutdown signal received"); Ok(()) } -fn log_startup_config(config: &AppConfig) { - info!("starting RTR service"); - info!("db_path={}", config.db_path); - info!("tcp_addr={}", config.tcp_addr); - info!("tls_enabled={}", config.enable_tls); - info!("ssh_enabled={}", config.enable_ssh); - - if config.enable_tls { - info!("tls_addr={}", config.tls_addr); - info!("tls_cert_path={}", config.tls_cert_path); - info!("tls_key_path={}", config.tls_key_path); - info!("tls_client_ca_path={}", config.tls_client_ca_path); - } - if config.enable_ssh { - info!("ssh_addr={}", config.ssh_addr); - info!("ssh_host_key_path={}", config.ssh_host_key_path); - info!( - "ssh_authorized_keys_path={}", - config.ssh_authorized_keys_path - ); - info!("ssh_username={}", config.ssh_username); - info!("ssh_subsystem_name={}", config.ssh_subsystem_name); - info!("ssh_auth_mode={}", config.ssh_auth_mode.as_str()); - info!("ssh_password_enabled={}", config.ssh_password.is_some()); - } - - info!("ccr_dir={}", config.ccr_dir); - info!( - "slurm_dir={}", - config.slurm_dir.as_deref().unwrap_or("disabled") - ); - info!("max_delta={}", config.max_delta); - info!("strict_ccr_validation={}", config.strict_ccr_validation); - info!( - "source_refresh_interval_secs={}", - config.source_refresh_interval.as_secs() - ); - info!("rtr_timing_refresh_secs={}", config.timing.refresh); - info!("rtr_timing_retry_secs={}", config.timing.retry); - info!("rtr_timing_expire_secs={}", config.timing.expire); - info!("max_connections={}", config.service_config.max_connections); - info!( - "max_concurrent_handshakes={}", - config.service_config.max_concurrent_handshakes - ); - info!( - "notify_queue_size={}", - config.service_config.notify_queue_size - ); - info!( - "tcp_keepalive_secs={}", - config - .service_config - .tcp_keepalive - .map(|duration| duration.as_secs().to_string()) - .unwrap_or_else(|| "disabled".to_string()) - ); - info!( - "warn_insecure_tcp={}", - config.service_config.warn_insecure_tcp - ); - info!( - "require_tls_server_dns_name_san={}", - config.service_config.require_tls_server_dns_name_san - ); - info!( - "enforce_tls_client_san_ip_match={}", - config.service_config.enforce_tls_client_san_ip_match - ); -} - -fn init_tracing() { +fn init_tracing(timezone: chrono_tz::Tz) { let filter = tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")); - struct ShanghaiTimer; + struct LocalTimer { + timezone: chrono_tz::Tz, + } - impl tracing_subscriber::fmt::time::FormatTime for ShanghaiTimer { + impl tracing_subscriber::fmt::time::FormatTime for LocalTimer { fn format_time( &self, w: &mut tracing_subscriber::fmt::format::Writer<'_>, ) -> std::fmt::Result { - let shanghai_offset = FixedOffset::east_opt(8 * 60 * 60) - .expect("fixed +08:00 offset should always be valid"); - let now = Utc::now().with_timezone(&shanghai_offset); + let now = Utc::now().with_timezone(&self.timezone); write!(w, "{}", now.format("%Y-%m-%d %H:%M:%S%.3f %:z")) } } if let Err(err) = tracing_subscriber::fmt() - .with_timer(ShanghaiTimer) + .with_timer(LocalTimer { timezone }) .with_env_filter(filter) .with_target(true) .with_thread_ids(true) @@ -668,39 +424,3 @@ fn init_tracing() { eprintln!("failed to initialize tracing subscriber: {err}"); } } - -fn env_var(name: &str) -> Result> { - match env::var(name) { - Ok(value) => Ok(Some(value)), - Err(env::VarError::NotPresent) => Ok(None), - Err(err) => Err(anyhow!("failed to read {}: {}", name, err)), - } -} - -fn parse_bool(value: &str, name: &str) -> Result { - match value.trim().to_ascii_lowercase().as_str() { - "1" | "true" | "yes" | "on" => Ok(true), - "0" | "false" | "no" | "off" => Ok(false), - _ => Err(anyhow!("invalid {} '{}': expected boolean", name, value)), - } -} - -fn parse_positive_u64(value: &str, name: &str) -> Result { - let parsed = value - .parse::() - .map_err(|err| anyhow!("invalid {} '{}': {}", name, value, err))?; - if parsed == 0 { - return Err(anyhow!("invalid {} '{}': must be >= 1", name, value)); - } - Ok(parsed) -} - -fn parse_positive_u32(value: &str, name: &str) -> Result { - let parsed = value - .parse::() - .map_err(|err| anyhow!("invalid {} '{}': {}", name, value, err))?; - if parsed == 0 { - return Err(anyhow!("invalid {} '{}': must be >= 1", name, value)); - } - Ok(parsed) -} diff --git a/src/rtr/cache/core.rs b/src/rtr/cache/core.rs index a7885ae..5b6b549 100644 --- a/src/rtr/cache/core.rs +++ b/src/rtr/cache/core.rs @@ -443,7 +443,10 @@ impl RtrCache { } pub fn snapshot_for_version(&self, version: u8) -> Snapshot { - self.versions[version_index(version)].snapshot.as_ref().clone() + self.versions[version_index(version)] + .snapshot + .as_ref() + .clone() } pub fn rtr_payloads_for_version(&self, version: u8) -> Arc> { @@ -661,6 +664,66 @@ pub struct CacheMemoryStats { pub rtr_payloads_arc_strong_counts: [usize; VERSION_COUNT], } +#[derive(Debug, Clone, Copy, Default, Serialize)] +pub struct PayloadCounts { + pub total: usize, + pub vrp: usize, + pub router_key: usize, + pub aspa: usize, +} + +impl PayloadCounts { + fn add_payload(&mut self, payload: &Payload) { + self.total += 1; + match payload { + Payload::RouteOrigin(_) => self.vrp += 1, + Payload::RouterKey(_) => self.router_key += 1, + Payload::Aspa(_) => self.aspa += 1, + } + } + + fn add_counts(&mut self, other: Self) { + self.total += other.total; + self.vrp += other.vrp; + self.router_key += other.router_key; + self.aspa += other.aspa; + } + + fn from_payloads(payloads: &[Payload]) -> Self { + let mut counts = Self::default(); + for payload in payloads { + counts.add_payload(payload); + } + counts + } +} + +#[derive(Debug, Clone, Copy, Serialize)] +pub struct DeltaReportStats { + pub serial: u32, + pub announced: PayloadCounts, + pub withdrawn: PayloadCounts, +} + +#[derive(Debug, Clone, Copy, Serialize)] +pub struct DeltaWindowReportStats { + pub length: usize, + pub oldest_serial: Option, + pub newest_serial: Option, + pub announced: PayloadCounts, + pub withdrawn: PayloadCounts, +} + +#[derive(Debug, Clone, Copy, Serialize)] +pub struct VersionReportStats { + pub version: u8, + pub session_id: u16, + pub serial: u32, + pub snapshot: PayloadCounts, + pub latest_delta: Option, + pub delta_window: DeltaWindowReportStats, +} + impl RtrCache { pub fn memory_stats(&self) -> CacheMemoryStats { let snapshot_payload_counts = @@ -687,6 +750,44 @@ impl RtrCache { rtr_payloads_arc_strong_counts, } } + + pub fn version_report_stats(&self) -> [VersionReportStats; VERSION_COUNT] { + std::array::from_fn(|idx| { + let state = &self.versions[idx]; + let snapshot = PayloadCounts { + total: state.rtr_payloads.len(), + vrp: state.snapshot.origins().len(), + router_key: state.snapshot.router_keys().len(), + aspa: state.snapshot.aspas().len(), + }; + let latest_delta = state.deltas.back().map(|delta| DeltaReportStats { + serial: delta.serial(), + announced: PayloadCounts::from_payloads(delta.announced()), + withdrawn: PayloadCounts::from_payloads(delta.withdrawn()), + }); + let mut window_announced = PayloadCounts::default(); + let mut window_withdrawn = PayloadCounts::default(); + for delta in &state.deltas { + window_announced.add_counts(PayloadCounts::from_payloads(delta.announced())); + window_withdrawn.add_counts(PayloadCounts::from_payloads(delta.withdrawn())); + } + + VersionReportStats { + version: idx as u8, + session_id: state.session_id, + serial: state.serial, + snapshot, + latest_delta, + delta_window: DeltaWindowReportStats { + length: state.deltas.len(), + oldest_serial: state.deltas.front().map(|delta| delta.serial()), + newest_serial: state.deltas.back().map(|delta| delta.serial()), + announced: window_announced, + withdrawn: window_withdrawn, + }, + } + }) + } } #[derive(Clone)] diff --git a/src/rtr/cache/mod.rs b/src/rtr/cache/mod.rs index 57a9793..f8d031d 100644 --- a/src/rtr/cache/mod.rs +++ b/src/rtr/cache/mod.rs @@ -4,7 +4,8 @@ mod ordering; mod store; pub use core::{ - CacheAvailability, CacheMemoryStats, RtrCache, RtrCacheBuilder, SerialResult, SessionIds, + CacheAvailability, CacheMemoryStats, DeltaReportStats, DeltaWindowReportStats, PayloadCounts, + RtrCache, RtrCacheBuilder, SerialResult, SessionIds, VersionReportStats, }; pub use model::{Delta, DualTime, Snapshot}; pub use ordering::{ diff --git a/src/rtr/config.rs b/src/rtr/config.rs new file mode 100644 index 0000000..d463b1e --- /dev/null +++ b/src/rtr/config.rs @@ -0,0 +1,474 @@ +use std::env; +use std::net::SocketAddr; +use std::time::Duration; + +use anyhow::{Result, anyhow}; +use chrono_tz::Tz; +use tracing::{info, warn}; + +use crate::rtr::payload::Timing; +use crate::rtr::server::RtrServiceConfig; +use crate::rtr::server::ssh::SshAuthMode; + +#[derive(Debug, Clone)] +pub struct AppConfig { + pub enable_tls: bool, + pub enable_ssh: bool, + pub tcp_addr: SocketAddr, + pub tls_addr: SocketAddr, + pub ssh_addr: SocketAddr, + + pub db_path: String, + pub ccr_dir: String, + pub slurm_dir: Option, + pub tls_cert_path: String, + pub tls_key_path: String, + pub tls_client_ca_path: String, + pub ssh_host_key_path: String, + pub ssh_authorized_keys_path: String, + pub ssh_username: String, + pub ssh_subsystem_name: String, + pub ssh_auth_mode: SshAuthMode, + pub ssh_password: Option, + + pub max_delta: u8, + pub prune_delta_by_snapshot_size: bool, + pub strict_ccr_validation: bool, + pub source_refresh_interval: Duration, + pub report_dir: String, + pub report_interval: Duration, + pub timezone: Tz, + pub timing: Timing, + + pub service_config: RtrServiceConfig, +} + +impl Default for AppConfig { + fn default() -> Self { + Self { + enable_tls: false, + enable_ssh: false, + tcp_addr: "0.0.0.0:323".parse().expect("invalid default tcp_addr"), + tls_addr: "0.0.0.0:324".parse().expect("invalid default tls_addr"), + ssh_addr: "0.0.0.0:22".parse().expect("invalid default ssh_addr"), + + db_path: "./rtr-db".to_string(), + ccr_dir: "./data".to_string(), + slurm_dir: None, + tls_cert_path: "./certs/server.crt".to_string(), + tls_key_path: "./certs/server.key".to_string(), + tls_client_ca_path: "./certs/client-ca.crt".to_string(), + ssh_host_key_path: "./certs/ssh_host_ed25519_key".to_string(), + ssh_authorized_keys_path: "./certs/rtr-authorized_keys".to_string(), + ssh_username: "rpki-rtr".to_string(), + ssh_subsystem_name: "rpki-rtr".to_string(), + ssh_auth_mode: SshAuthMode::Key, + ssh_password: None, + + max_delta: 100, + prune_delta_by_snapshot_size: false, + strict_ccr_validation: false, + source_refresh_interval: Duration::from_secs(300), + report_dir: "./report".to_string(), + report_interval: Duration::from_secs(60), + timezone: default_timezone(), + timing: Timing::default(), + + service_config: RtrServiceConfig { + max_connections: 512, + max_concurrent_handshakes: 128, + notify_queue_size: 1024, + tcp_keepalive: Some(Duration::from_secs(60)), + warn_insecure_tcp: true, + require_tls_server_dns_name_san: false, + enforce_tls_client_san_ip_match: true, + }, + } + } +} + +impl AppConfig { + pub fn from_env() -> Result { + let mut config = Self::default(); + + if let Some(value) = env_var("RPKI_RTR_ENABLE_TLS")? { + config.enable_tls = parse_bool(&value, "RPKI_RTR_ENABLE_TLS")?; + } + if let Some(value) = env_var("RPKI_RTR_ENABLE_SSH")? { + config.enable_ssh = parse_bool(&value, "RPKI_RTR_ENABLE_SSH")?; + } + if let Some(value) = env_var("RPKI_RTR_TCP_ADDR")? { + config.tcp_addr = value + .parse() + .map_err(|err| anyhow!("invalid RPKI_RTR_TCP_ADDR '{}': {}", value, err))?; + } + if let Some(value) = env_var("RPKI_RTR_TLS_ADDR")? { + config.tls_addr = value + .parse() + .map_err(|err| anyhow!("invalid RPKI_RTR_TLS_ADDR '{}': {}", value, err))?; + } + if let Some(value) = env_var("RPKI_RTR_SSH_ADDR")? { + config.ssh_addr = value + .parse() + .map_err(|err| anyhow!("invalid RPKI_RTR_SSH_ADDR '{}': {}", value, err))?; + } + if let Some(value) = env_var("RPKI_RTR_SSH_PORT")? { + let port: u16 = value + .parse() + .map_err(|err| anyhow!("invalid RPKI_RTR_SSH_PORT '{}': {}", value, err))?; + config.ssh_addr.set_port(port); + } + + if let Some(value) = env_var("RPKI_RTR_DB_PATH")? { + config.db_path = value; + } + if let Some(value) = env_var("RPKI_RTR_CCR_DIR")? { + config.ccr_dir = value; + } + if let Some(value) = env_var("RPKI_RTR_SLURM_DIR")? { + let value = value.trim(); + config.slurm_dir = if value.is_empty() { + None + } else { + Some(value.to_string()) + }; + } + if let Some(value) = env_var("RPKI_RTR_TLS_CERT_PATH")? { + config.tls_cert_path = value; + } + if let Some(value) = env_var("RPKI_RTR_TLS_KEY_PATH")? { + config.tls_key_path = value; + } + if let Some(value) = env_var("RPKI_RTR_TLS_CLIENT_CA_PATH")? { + config.tls_client_ca_path = value; + } + if let Some(value) = env_var("RPKI_RTR_SSH_HOST_KEY_PATH")? { + config.ssh_host_key_path = value; + } + if let Some(value) = env_var("RPKI_RTR_SSH_AUTHORIZED_KEYS_PATH")? { + config.ssh_authorized_keys_path = value; + } + if let Some(value) = env_var("RPKI_RTR_SSH_USERNAME")? { + config.ssh_username = value; + } + if let Some(value) = env_var("RPKI_RTR_SSH_SUBSYSTEM_NAME")? { + config.ssh_subsystem_name = value; + } + if let Some(value) = env_var("RPKI_RTR_SSH_AUTH_MODE")? { + config.ssh_auth_mode = SshAuthMode::parse(&value).ok_or_else(|| { + anyhow!( + "invalid RPKI_RTR_SSH_AUTH_MODE '{}': expected key|password|both", + value + ) + })?; + } + if let Some(value) = env_var("RPKI_RTR_SSH_PASSWORD")? { + let value = value.trim().to_string(); + config.ssh_password = if value.is_empty() { None } else { Some(value) }; + } + if let Some(value) = env_var("RPKI_RTR_MAX_DELTA")? { + let parsed: u8 = value + .parse() + .map_err(|err| anyhow!("invalid RPKI_RTR_MAX_DELTA '{}': {}", value, err))?; + if parsed == 0 { + return Err(anyhow!( + "invalid RPKI_RTR_MAX_DELTA '{}': must be >= 1", + value + )); + } + config.max_delta = parsed; + } + if let Some(value) = env_var("RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE")? { + config.prune_delta_by_snapshot_size = + parse_bool(&value, "RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE")?; + } + if let Some(value) = env_var("RPKI_RTR_STRICT_CCR_VALIDATION")? { + config.strict_ccr_validation = parse_bool(&value, "RPKI_RTR_STRICT_CCR_VALIDATION")?; + } + if let Some(value) = env_var("RPKI_RTR_REPORT_DIR")? { + config.report_dir = value; + } + if let Some(value) = env_var("RPKI_RTR_REPORT_INTERVAL_SECS")? { + let secs = parse_positive_u64(&value, "RPKI_RTR_REPORT_INTERVAL_SECS")?; + config.report_interval = Duration::from_secs(secs); + } + if let Some(value) = env_var("RPKI_RTR_TIMEZONE")? { + config.timezone = parse_timezone(&value, "RPKI_RTR_TIMEZONE")?; + } + + let source_refresh_interval_new = env_var("RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS")?; + let source_refresh_interval_legacy = env_var("RPKI_RTR_REFRESH_INTERVAL_SECS")?; + match ( + source_refresh_interval_new.as_deref(), + source_refresh_interval_legacy.as_deref(), + ) { + (Some(new_value), Some(_)) => { + let secs = parse_positive_u64(new_value, "RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS")?; + config.source_refresh_interval = Duration::from_secs(secs); + warn!( + "both RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS and legacy RPKI_RTR_REFRESH_INTERVAL_SECS are set; using RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS" + ); + } + (Some(new_value), None) => { + let secs = parse_positive_u64(new_value, "RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS")?; + config.source_refresh_interval = Duration::from_secs(secs); + } + (None, Some(legacy_value)) => { + let secs = parse_positive_u64(legacy_value, "RPKI_RTR_REFRESH_INTERVAL_SECS")?; + config.source_refresh_interval = Duration::from_secs(secs); + warn!( + "RPKI_RTR_REFRESH_INTERVAL_SECS is deprecated; use RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS" + ); + } + (None, None) => {} + } + + if let Some(value) = env_var("RPKI_RTR_TIMING_REFRESH_SECS")? { + config.timing.refresh = parse_positive_u32(&value, "RPKI_RTR_TIMING_REFRESH_SECS")?; + } + if let Some(value) = env_var("RPKI_RTR_TIMING_RETRY_SECS")? { + config.timing.retry = parse_positive_u32(&value, "RPKI_RTR_TIMING_RETRY_SECS")?; + } + if let Some(value) = env_var("RPKI_RTR_TIMING_EXPIRE_SECS")? { + config.timing.expire = parse_positive_u32(&value, "RPKI_RTR_TIMING_EXPIRE_SECS")?; + } + config + .timing + .validate() + .map_err(|err| anyhow!("invalid RTR timing configuration: {}", err))?; + + if let Some(value) = env_var("RPKI_RTR_MAX_CONNECTIONS")? { + config.service_config.max_connections = value + .parse() + .map_err(|err| anyhow!("invalid RPKI_RTR_MAX_CONNECTIONS '{}': {}", value, err))?; + } + if let Some(value) = env_var("RPKI_RTR_MAX_CONCURRENT_HANDSHAKES")? { + config.service_config.max_concurrent_handshakes = value.parse().map_err(|err| { + anyhow!( + "invalid RPKI_RTR_MAX_CONCURRENT_HANDSHAKES '{}': {}", + value, + err + ) + })?; + } + if let Some(value) = env_var("RPKI_RTR_NOTIFY_QUEUE_SIZE")? { + config.service_config.notify_queue_size = value.parse().map_err(|err| { + anyhow!("invalid RPKI_RTR_NOTIFY_QUEUE_SIZE '{}': {}", value, err) + })?; + } + if let Some(value) = env_var("RPKI_RTR_TCP_KEEPALIVE_SECS")? { + let secs: u64 = value.parse().map_err(|err| { + anyhow!("invalid RPKI_RTR_TCP_KEEPALIVE_SECS '{}': {}", value, err) + })?; + config.service_config.tcp_keepalive = if secs == 0 { + None + } else { + Some(Duration::from_secs(secs)) + }; + } + if let Some(value) = env_var("RPKI_RTR_WARN_INSECURE_TCP")? { + config.service_config.warn_insecure_tcp = + parse_bool(&value, "RPKI_RTR_WARN_INSECURE_TCP")?; + } + if let Some(value) = env_var("RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN")? { + config.service_config.require_tls_server_dns_name_san = + parse_bool(&value, "RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN")?; + } + if let Some(value) = env_var("RPKI_RTR_ENFORCE_TLS_CLIENT_SAN_IP_MATCH")? { + config.service_config.enforce_tls_client_san_ip_match = + parse_bool(&value, "RPKI_RTR_ENFORCE_TLS_CLIENT_SAN_IP_MATCH")?; + } + + if config.service_config.max_connections == 0 { + return Err(anyhow!( + "invalid RPKI_RTR_MAX_CONNECTIONS '{}': must be >= 1", + config.service_config.max_connections + )); + } + if config.service_config.max_concurrent_handshakes == 0 { + return Err(anyhow!( + "invalid RPKI_RTR_MAX_CONCURRENT_HANDSHAKES '{}': must be >= 1", + config.service_config.max_concurrent_handshakes + )); + } + if config.service_config.max_concurrent_handshakes > config.service_config.max_connections { + return Err(anyhow!( + "invalid handshake/connection limits: RPKI_RTR_MAX_CONCURRENT_HANDSHAKES ({}) must be <= RPKI_RTR_MAX_CONNECTIONS ({})", + config.service_config.max_concurrent_handshakes, + config.service_config.max_connections + )); + } + + Ok(config) + } +} + +pub fn log_startup_config(config: &AppConfig) { + info!("starting RTR service"); + info!("db_path={}", config.db_path); + info!("tcp_addr={}", config.tcp_addr); + info!("tls_enabled={}", config.enable_tls); + info!("ssh_enabled={}", config.enable_ssh); + + if config.enable_tls { + info!("tls_addr={}", config.tls_addr); + info!("tls_cert_path={}", config.tls_cert_path); + info!("tls_key_path={}", config.tls_key_path); + info!("tls_client_ca_path={}", config.tls_client_ca_path); + } + if config.enable_ssh { + info!("ssh_addr={}", config.ssh_addr); + info!("ssh_host_key_path={}", config.ssh_host_key_path); + info!( + "ssh_authorized_keys_path={}", + config.ssh_authorized_keys_path + ); + info!("ssh_username={}", config.ssh_username); + info!("ssh_subsystem_name={}", config.ssh_subsystem_name); + info!("ssh_auth_mode={}", config.ssh_auth_mode.as_str()); + info!("ssh_password_enabled={}", config.ssh_password.is_some()); + } + + info!("ccr_dir={}", config.ccr_dir); + info!( + "slurm_dir={}", + config.slurm_dir.as_deref().unwrap_or("disabled") + ); + info!("max_delta={}", config.max_delta); + info!("strict_ccr_validation={}", config.strict_ccr_validation); + info!( + "source_refresh_interval_secs={}", + config.source_refresh_interval.as_secs() + ); + info!("report_dir={}", config.report_dir); + info!("report_interval_secs={}", config.report_interval.as_secs()); + info!("timezone={}", format_timezone(config.timezone)); + info!("rtr_timing_refresh_secs={}", config.timing.refresh); + info!("rtr_timing_retry_secs={}", config.timing.retry); + info!("rtr_timing_expire_secs={}", config.timing.expire); + info!("max_connections={}", config.service_config.max_connections); + info!( + "max_concurrent_handshakes={}", + config.service_config.max_concurrent_handshakes + ); + info!( + "notify_queue_size={}", + config.service_config.notify_queue_size + ); + info!( + "tcp_keepalive_secs={}", + config + .service_config + .tcp_keepalive + .map(|duration| duration.as_secs().to_string()) + .unwrap_or_else(|| "disabled".to_string()) + ); + info!( + "warn_insecure_tcp={}", + config.service_config.warn_insecure_tcp + ); + info!( + "require_tls_server_dns_name_san={}", + config.service_config.require_tls_server_dns_name_san + ); + info!( + "enforce_tls_client_san_ip_match={}", + config.service_config.enforce_tls_client_san_ip_match + ); +} + +pub fn default_timezone() -> Tz { + chrono_tz::Asia::Shanghai +} + +pub fn format_timezone(timezone: Tz) -> String { + timezone.name().to_string() +} + +fn env_var(name: &str) -> Result> { + match env::var(name) { + Ok(value) => Ok(Some(value)), + Err(env::VarError::NotPresent) => Ok(None), + Err(err) => Err(anyhow!("failed to read {}: {}", name, err)), + } +} + +fn parse_bool(value: &str, name: &str) -> Result { + match value.trim().to_ascii_lowercase().as_str() { + "1" | "true" | "yes" | "on" => Ok(true), + "0" | "false" | "no" | "off" => Ok(false), + _ => Err(anyhow!("invalid {} '{}': expected boolean", name, value)), + } +} + +fn parse_positive_u64(value: &str, name: &str) -> Result { + let parsed = value + .parse::() + .map_err(|err| anyhow!("invalid {} '{}': {}", name, value, err))?; + if parsed == 0 { + return Err(anyhow!("invalid {} '{}': must be >= 1", name, value)); + } + Ok(parsed) +} + +fn parse_positive_u32(value: &str, name: &str) -> Result { + let parsed = value + .parse::() + .map_err(|err| anyhow!("invalid {} '{}': {}", name, value, err))?; + if parsed == 0 { + return Err(anyhow!("invalid {} '{}': must be >= 1", name, value)); + } + Ok(parsed) +} + +fn parse_timezone(value: &str, name: &str) -> Result { + let value = value.trim(); + let normalized = match value.to_ascii_lowercase().as_str() { + "shanghai" | "beijing" | "peking" => "Asia/Shanghai", + "utc" | "z" => "UTC", + _ => value, + }; + + normalized.parse::().map_err(|err| { + anyhow!( + "invalid {} '{}': expected IANA timezone like Asia/Shanghai, Europe/London, America/New_York, or UTC: {}", + name, + value, + err + ) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_timezone_accepts_iana_names_and_common_aliases() { + assert_eq!( + parse_timezone("Asia/Shanghai", "TEST").unwrap(), + chrono_tz::Asia::Shanghai + ); + assert_eq!( + parse_timezone("shanghai", "TEST").unwrap(), + chrono_tz::Asia::Shanghai + ); + assert_eq!( + parse_timezone("Europe/London", "TEST").unwrap(), + chrono_tz::Europe::London + ); + assert_eq!( + parse_timezone("America/New_York", "TEST").unwrap(), + chrono_tz::America::New_York + ); + assert_eq!(parse_timezone("UTC", "TEST").unwrap(), chrono_tz::UTC); + assert_eq!(parse_timezone("Z", "TEST").unwrap(), chrono_tz::UTC); + } + + #[test] + fn parse_timezone_rejects_offset_and_invalid_values() { + assert!(parse_timezone("+08:00", "TEST").is_err()); + assert!(parse_timezone("+0800", "TEST").is_err()); + assert!(parse_timezone("Mars/Base", "TEST").is_err()); + } +} diff --git a/src/rtr/mod.rs b/src/rtr/mod.rs index 11ed2dd..01304a7 100644 --- a/src/rtr/mod.rs +++ b/src/rtr/mod.rs @@ -1,7 +1,9 @@ pub mod cache; +pub mod config; pub mod error_type; pub mod payload; pub mod pdu; +pub mod report; pub mod server; pub mod session; pub mod state; diff --git a/src/rtr/report.rs b/src/rtr/report.rs new file mode 100644 index 0000000..ba2c026 --- /dev/null +++ b/src/rtr/report.rs @@ -0,0 +1,565 @@ +use std::fs; +use std::path::Path; +use std::sync::{Arc, RwLock}; +use std::time::Instant; + +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; +use chrono_tz::Tz; +use serde::Serialize; +use tracing::warn; + +use crate::rtr::cache::{CacheAvailability, SharedRtrCache, VersionReportStats}; +use crate::rtr::config::format_timezone; +use crate::rtr::server::{RtrNotifier, RtrServiceStats, RtrTransportConnectionCounts}; +use crate::source::pipeline::{DataQualityReport, SourceLoadReport}; + +#[derive(Clone, Serialize)] +pub struct ReportConfiguration { + source_refresh_interval_seconds: u64, + report_interval_seconds: u64, + max_delta: u8, + prune_delta_by_snapshot_size: bool, + strict_ccr_validation: bool, + timezone: String, + timing: TimingReport, +} + +#[derive(Clone, Copy, Serialize)] +struct TimingReport { + refresh: u32, + retry: u32, + expire: u32, +} + +impl ReportConfiguration { + pub fn new( + source_refresh_interval_seconds: u64, + report_interval_seconds: u64, + max_delta: u8, + prune_delta_by_snapshot_size: bool, + strict_ccr_validation: bool, + timezone: Tz, + timing: (u32, u32, u32), + ) -> Self { + Self { + source_refresh_interval_seconds, + report_interval_seconds, + max_delta, + prune_delta_by_snapshot_size, + strict_ccr_validation, + timezone: format_timezone(timezone), + timing: TimingReport { + refresh: timing.0, + retry: timing.1, + expire: timing.2, + }, + } + } + + pub fn timezone(&self) -> Tz { + self.timezone + .parse::() + .expect("serialized timezone should be a valid IANA timezone") + } +} + +#[derive(Clone)] +pub struct ReportContext { + started_at: DateTime, + started_instant: Instant, + timezone: Tz, + configuration: ReportConfiguration, + runtime: Arc>, +} + +#[derive(Debug, Clone, Default)] +struct RuntimeReportState { + source: Option, + data_quality: Option, + refresh: RefreshReport, +} + +#[derive(Debug, Clone, Serialize)] +struct RefreshReport { + last_attempt_at: Option>, + last_success_at: Option>, + last_changed_at: Option>, + status: &'static str, + changed: Option, + duration_ms: Option, + consecutive_failures: u64, + last_error: Option, +} + +#[derive(Serialize)] +struct RefreshReportView { + last_attempt_at: Option>, + last_success_at: Option>, + last_changed_at: Option>, + status: &'static str, + changed: Option, + duration_ms: Option, + consecutive_failures: u64, + last_error: Option, +} + +impl RefreshReportView { + fn from_report(report: RefreshReport, timezone: Tz) -> Self { + Self { + last_attempt_at: report + .last_attempt_at + .map(|time| to_report_time(time, timezone)), + last_success_at: report + .last_success_at + .map(|time| to_report_time(time, timezone)), + last_changed_at: report + .last_changed_at + .map(|time| to_report_time(time, timezone)), + status: report.status, + changed: report.changed, + duration_ms: report.duration_ms, + consecutive_failures: report.consecutive_failures, + last_error: report.last_error, + } + } +} + +impl Default for RefreshReport { + fn default() -> Self { + Self { + last_attempt_at: None, + last_success_at: None, + last_changed_at: None, + status: "not_attempted", + changed: None, + duration_ms: None, + consecutive_failures: 0, + last_error: None, + } + } +} + +impl ReportContext { + pub fn new(configuration: ReportConfiguration) -> Self { + let timezone = configuration.timezone(); + Self { + started_at: Utc::now(), + started_instant: Instant::now(), + timezone, + configuration, + runtime: Arc::new(RwLock::new(RuntimeReportState::default())), + } + } + + pub fn record_refresh_success( + &self, + attempted_at: DateTime, + duration_ms: u128, + changed: bool, + source: SourceLoadReport, + data_quality: DataQualityReport, + ) { + let mut state = self + .runtime + .write() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state.source = Some(source); + state.data_quality = Some(data_quality); + state.refresh.last_attempt_at = Some(attempted_at); + state.refresh.last_success_at = Some(Utc::now()); + if changed { + state.refresh.last_changed_at = Some(Utc::now()); + } + state.refresh.status = "success"; + state.refresh.changed = Some(changed); + state.refresh.duration_ms = Some(duration_ms); + state.refresh.consecutive_failures = 0; + state.refresh.last_error = None; + } + + pub fn record_refresh_unchanged(&self, attempted_at: DateTime, duration_ms: u128) { + let mut state = self + .runtime + .write() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state.refresh.last_attempt_at = Some(attempted_at); + state.refresh.last_success_at = Some(Utc::now()); + state.refresh.status = "success"; + state.refresh.changed = Some(false); + state.refresh.duration_ms = Some(duration_ms); + state.refresh.consecutive_failures = 0; + state.refresh.last_error = None; + } + + pub fn record_refresh_failure( + &self, + attempted_at: DateTime, + duration_ms: u128, + error: &anyhow::Error, + ) { + let mut state = self + .runtime + .write() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state.refresh.last_attempt_at = Some(attempted_at); + state.refresh.status = "failed"; + state.refresh.changed = None; + state.refresh.duration_ms = Some(duration_ms); + state.refresh.consecutive_failures = state.refresh.consecutive_failures.saturating_add(1); + state.refresh.last_error = Some(error.to_string()); + } + + pub fn write_or_warn( + &self, + report_dir: &Path, + phase: &str, + shared_cache: &SharedRtrCache, + notifier: &RtrNotifier, + service_stats: &RtrServiceStats, + ) { + if let Err(err) = self.write(report_dir, phase, shared_cache, notifier, service_stats) { + warn!( + "failed to write RTR report to {}: {:?}", + report_dir.display(), + err + ); + } + } + + fn write( + &self, + report_dir: &Path, + phase: &str, + shared_cache: &SharedRtrCache, + notifier: &RtrNotifier, + service_stats: &RtrServiceStats, + ) -> Result<()> { + let cache = shared_cache.load_full(); + let runtime = self + .runtime + .read() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .clone(); + let availability = match cache.availability() { + CacheAvailability::Ready => "ready", + CacheAvailability::NoDataAvailable => "no_data_available", + }; + let active_connections = service_stats.active_connections(); + let connections_by_transport = service_stats.transport_connections(); + let max_connections = service_stats.max_connections(); + let report = RtrReport { + schema_version: 2, + generated_at: self.report_now(), + phase: phase.to_string(), + service: ServiceReport { + started_at: self.to_report_time(self.started_at), + uptime_seconds: self.started_instant.elapsed().as_secs(), + active_connections, + connections_by_transport: TransportConnectionReport::from(connections_by_transport), + session_listeners: notifier.listener_count(), + max_connections, + connection_utilization: active_connections as f64 / max_connections as f64, + }, + process: ProcessReport { + rss_mib: current_rss_mib(), + }, + source: runtime + .source + .map(|source| SourceLoadReportView::from_report(source, self.timezone)), + refresh: RefreshReportView::from_report(runtime.refresh, self.timezone), + data_quality: runtime.data_quality, + configuration: self.configuration.clone(), + cache: CacheReport { + availability, + created_at: self.to_report_time(cache.created_at().utc()), + last_update_begin: self.to_report_time(cache.last_update_begin().utc()), + last_update_end: self.to_report_time(cache.last_update_end().utc()), + versions: cache.version_report_stats(), + }, + }; + + fs::create_dir_all(report_dir) + .with_context(|| format!("create report directory {}", report_dir.display()))?; + let target = report_dir.join("rtr-server.json"); + let temporary = report_dir.join(".rtr-server.json.tmp"); + let json = serde_json::to_vec_pretty(&report).context("serialize RTR report")?; + fs::write(&temporary, json) + .with_context(|| format!("write temporary report {}", temporary.display()))?; + + if let Err(err) = fs::rename(&temporary, &target) { + if target.exists() { + fs::remove_file(&target) + .with_context(|| format!("replace existing report {}", target.display()))?; + fs::rename(&temporary, &target) + .with_context(|| format!("move report into {}", target.display()))?; + } else { + return Err(err).with_context(|| { + format!( + "move temporary report {} into {}", + temporary.display(), + target.display() + ) + }); + } + } + + Ok(()) + } + + fn report_now(&self) -> DateTime { + self.to_report_time(Utc::now()) + } + + fn to_report_time(&self, time: DateTime) -> DateTime { + time.with_timezone(&self.timezone) + } +} + +#[derive(Serialize)] +struct RtrReport { + schema_version: u16, + generated_at: DateTime, + phase: String, + service: ServiceReport, + process: ProcessReport, + source: Option, + refresh: RefreshReportView, + data_quality: Option, + configuration: ReportConfiguration, + cache: CacheReport, +} + +#[derive(Serialize)] +struct ServiceReport { + started_at: DateTime, + uptime_seconds: u64, + active_connections: usize, + connections_by_transport: TransportConnectionReport, + session_listeners: usize, + max_connections: usize, + connection_utilization: f64, +} + +#[derive(Serialize)] +struct SourceLoadReportView { + ccr_file: String, + ccr_file_size_bytes: u64, + ccr_modified_at: Option>, + ccr_produced_at: Option, + slurm_enabled: bool, + slurm_file_count: usize, + slurm_files: Vec, + slurm_version: Option, +} + +impl SourceLoadReportView { + fn from_report(report: SourceLoadReport, timezone: Tz) -> Self { + Self { + ccr_file: report.ccr_file, + ccr_file_size_bytes: report.ccr_file_size_bytes, + ccr_modified_at: report + .ccr_modified_at + .map(|time| to_report_time(time, timezone)), + ccr_produced_at: report.ccr_produced_at, + slurm_enabled: report.slurm_enabled, + slurm_file_count: report.slurm_file_count, + slurm_files: report.slurm_files, + slurm_version: report.slurm_version, + } + } +} + +#[derive(Serialize)] +struct TransportConnectionReport { + tcp: usize, + tls: usize, + ssh: usize, +} + +impl From for TransportConnectionReport { + fn from(counts: RtrTransportConnectionCounts) -> Self { + Self { + tcp: counts.tcp, + tls: counts.tls, + ssh: counts.ssh, + } + } +} + +#[derive(Serialize)] +struct ProcessReport { + rss_mib: Option, +} + +#[derive(Serialize)] +struct CacheReport { + availability: &'static str, + created_at: DateTime, + last_update_begin: DateTime, + last_update_end: DateTime, + versions: [VersionReportStats; 3], +} + +pub fn to_report_time(time: DateTime, timezone: Tz) -> DateTime { + time.with_timezone(&timezone) +} + +pub fn current_rss_mib() -> Option { + let status = fs::read_to_string("/proc/self/status").ok()?; + let vmrss_line = status.lines().find(|line| line.starts_with("VmRSS:"))?; + let kb = vmrss_line + .split_whitespace() + .nth(1) + .and_then(|value| value.parse::().ok())?; + Some(kb / 1024) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::rtr::cache::RtrCache; + use crate::rtr::server::RtrService; + use crate::source::pipeline::{ + DataQualityReport, PayloadTypeCounts, SlurmRuleCounts, SourceLoadReport, + }; + use arc_swap::ArcSwap; + use serde_json::Value; + + use super::*; + + fn test_configuration() -> ReportConfiguration { + ReportConfiguration::new( + 300, + 60, + 100, + false, + false, + chrono_tz::Asia::Shanghai, + (3600, 600, 7200), + ) + } + + #[test] + fn write_report_creates_parseable_json() { + let temp = tempfile::tempdir().unwrap(); + let report_dir = temp.path().join("report"); + let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default())); + let service = RtrService::new(shared_cache.clone()); + let notifier = service.notifier(); + let context = ReportContext::new(test_configuration()); + + context + .write( + &report_dir, + "test", + &shared_cache, + ¬ifier, + &service.stats(), + ) + .unwrap(); + + let report: Value = + serde_json::from_slice(&fs::read(report_dir.join("rtr-server.json")).unwrap()).unwrap(); + assert_eq!(report["schema_version"], 2); + assert_eq!(report["phase"], "test"); + assert_report_time_offset(&report["generated_at"]); + assert_report_time_offset(&report["service"]["started_at"]); + assert_report_time_offset(&report["cache"]["created_at"]); + assert_eq!(report["cache"]["availability"], "ready"); + assert_eq!(report["refresh"]["status"], "not_attempted"); + assert!(report["source"].is_null()); + assert!(report["data_quality"].is_null()); + assert_eq!( + report["configuration"]["source_refresh_interval_seconds"], + 300 + ); + assert_eq!(report["service"]["max_connections"], 1024); + assert_eq!(report["service"]["active_connections"], 0); + assert_eq!(report["service"]["connections_by_transport"]["tcp"], 0); + assert_eq!(report["service"]["connections_by_transport"]["tls"], 0); + assert_eq!(report["service"]["connections_by_transport"]["ssh"], 0); + assert_eq!(report["cache"]["versions"].as_array().unwrap().len(), 3); + assert_eq!(report["cache"]["versions"][2]["snapshot"]["total"], 0); + assert!(!report_dir.join(".rtr-server.json.tmp").exists()); + } + + #[test] + fn refresh_failure_preserves_last_successful_source_data() { + let temp = tempfile::tempdir().unwrap(); + let report_dir = temp.path().join("report"); + let shared_cache = Arc::new(ArcSwap::from_pointee(RtrCache::default())); + let service = RtrService::new(shared_cache.clone()); + let notifier = service.notifier(); + let context = ReportContext::new(test_configuration()); + let source = SourceLoadReport { + ccr_file: "data/example.ccr".to_string(), + ccr_file_size_bytes: 123, + ccr_modified_at: Some(Utc::now()), + ccr_produced_at: Some("20260615000000Z".to_string()), + slurm_enabled: true, + slurm_file_count: 1, + slurm_files: vec!["policy.slurm".to_string()], + slurm_version: Some(2), + }; + let quality = DataQualityReport { + ccr_input: PayloadTypeCounts { + total: 11, + vrp: 10, + router_key: 0, + aspa: 1, + }, + invalid: PayloadTypeCounts::default(), + before_slurm: PayloadTypeCounts { + total: 11, + vrp: 10, + router_key: 0, + aspa: 1, + }, + after_slurm: PayloadTypeCounts { + total: 10, + vrp: 9, + router_key: 0, + aspa: 1, + }, + slurm_filters: SlurmRuleCounts { + prefix: 1, + router_key: 0, + aspa: 0, + }, + slurm_assertions: SlurmRuleCounts::default(), + }; + + context.record_refresh_success(Utc::now(), 12, true, source, quality); + context.record_refresh_failure(Utc::now(), 5, &anyhow::anyhow!("source unavailable")); + context + .write( + &report_dir, + "refresh_failed", + &shared_cache, + ¬ifier, + &service.stats(), + ) + .unwrap(); + + let report: Value = + serde_json::from_slice(&fs::read(report_dir.join("rtr-server.json")).unwrap()).unwrap(); + assert_eq!(report["source"]["ccr_file"], "data/example.ccr"); + assert_report_time_offset(&report["source"]["ccr_modified_at"]); + assert_eq!(report["data_quality"]["after_slurm"]["total"], 10); + assert_eq!(report["refresh"]["status"], "failed"); + assert_eq!(report["refresh"]["consecutive_failures"], 1); + assert_eq!(report["refresh"]["last_error"], "source unavailable"); + assert!(!report["refresh"]["last_success_at"].is_null()); + assert_report_time_offset(&report["refresh"]["last_success_at"]); + } + + fn assert_report_time_offset(value: &Value) { + let value = value.as_str().expect("report time should be a string"); + assert!( + value.ends_with("+08:00"), + "report time should use +08:00 offset, got {value}" + ); + } +} diff --git a/src/rtr/server/connection.rs b/src/rtr/server/connection.rs index 9cdd115..8673e18 100644 --- a/src/rtr/server/connection.rs +++ b/src/rtr/server/connection.rs @@ -16,16 +16,73 @@ use tokio_rustls::TlsAcceptor; use crate::rtr::cache::SharedRtrCache; use crate::rtr::session::RtrSession; +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum RtrTransportKind { + Tcp, + Tls, + Ssh, +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct RtrTransportConnectionCounts { + pub tcp: usize, + pub tls: usize, + pub ssh: usize, +} + +#[derive(Default)] +pub struct RtrTransportConnectionCounters { + tcp: AtomicUsize, + tls: AtomicUsize, + ssh: AtomicUsize, +} + +impl RtrTransportConnectionCounters { + pub fn increment(&self, transport: RtrTransportKind) { + self.counter(transport).fetch_add(1, Ordering::Relaxed); + } + + pub fn decrement(&self, transport: RtrTransportKind) { + self.counter(transport).fetch_sub(1, Ordering::Relaxed); + } + + pub fn snapshot(&self) -> RtrTransportConnectionCounts { + RtrTransportConnectionCounts { + tcp: self.tcp.load(Ordering::Relaxed), + tls: self.tls.load(Ordering::Relaxed), + ssh: self.ssh.load(Ordering::Relaxed), + } + } + + fn counter(&self, transport: RtrTransportKind) -> &AtomicUsize { + match transport { + RtrTransportKind::Tcp => &self.tcp, + RtrTransportKind::Tls => &self.tls, + RtrTransportKind::Ssh => &self.ssh, + } + } +} + pub struct ConnectionGuard { active_connections: Arc, + transport_connections: Arc, + transport: RtrTransportKind, _permit: OwnedSemaphorePermit, } impl ConnectionGuard { - pub fn new(active_connections: Arc, permit: OwnedSemaphorePermit) -> Self { + pub fn new( + active_connections: Arc, + transport_connections: Arc, + transport: RtrTransportKind, + permit: OwnedSemaphorePermit, + ) -> Self { active_connections.fetch_add(1, Ordering::Relaxed); + transport_connections.increment(transport); Self { active_connections, + transport_connections, + transport, _permit: permit, } } @@ -38,6 +95,7 @@ impl ConnectionGuard { impl Drop for ConnectionGuard { fn drop(&mut self) { self.active_connections.fetch_sub(1, Ordering::Relaxed); + self.transport_connections.decrement(self.transport); } } @@ -72,7 +130,10 @@ pub async fn handle_tls_connection( .with_context(|| format!("TLS handshake failed for {}", peer_addr))?; info!("RTR TLS handshake completed for {}", peer_addr); match verify_peer_certificate_ip(&tls_stream, peer_addr.ip()) { - Ok(()) => info!("RTR TLS client certificate SAN IP validated for {}", peer_addr), + Ok(()) => info!( + "RTR TLS client certificate SAN IP validated for {}", + peer_addr + ), Err(err) => { if enforce_client_san_ip_match { return Err(err).with_context(|| { diff --git a/src/rtr/server/listener.rs b/src/rtr/server/listener.rs index ccfb808..c6e1964 100644 --- a/src/rtr/server/listener.rs +++ b/src/rtr/server/listener.rs @@ -20,7 +20,8 @@ use rustls::ServerConfig; use crate::rtr::cache::SharedRtrCache; use crate::rtr::server::config::RtrServiceConfig; use crate::rtr::server::connection::{ - ConnectionGuard, handle_tcp_connection, handle_tls_connection, is_expected_disconnect, + ConnectionGuard, RtrTransportConnectionCounters, RtrTransportKind, handle_tcp_connection, + handle_tls_connection, is_expected_disconnect, }; use crate::rtr::server::ssh::RtrSshRuntimeConfig; use crate::rtr::server::tls::load_rustls_server_config_with_options; @@ -30,6 +31,7 @@ type TransportFuture = Pin> + Send>>; pub trait TransportAcceptor: Clone + Send + Sync + 'static { fn name(&self) -> &'static str; + fn kind(&self) -> RtrTransportKind; fn requires_handshake_limit(&self) -> bool { false } @@ -52,6 +54,9 @@ impl TransportAcceptor for TcpTransport { fn name(&self) -> &'static str { "TCP" } + fn kind(&self) -> RtrTransportKind { + RtrTransportKind::Tcp + } fn handle_connection( &self, @@ -85,6 +90,9 @@ impl TransportAcceptor for TlsTransport { fn name(&self) -> &'static str { "TLS" } + fn kind(&self) -> RtrTransportKind { + RtrTransportKind::Tls + } fn requires_handshake_limit(&self) -> bool { true } @@ -125,6 +133,9 @@ impl TransportAcceptor for SshTransport { fn name(&self) -> &'static str { "SSH" } + fn kind(&self) -> RtrTransportKind { + RtrTransportKind::Ssh + } fn handle_connection( &self, @@ -187,6 +198,7 @@ pub struct RtrServer { connection_limiter: Arc, handshake_limiter: Arc, active_connections: Arc, + transport_connections: Arc, config: RtrServiceConfig, } @@ -199,6 +211,7 @@ impl RtrServer { connection_limiter: Arc, handshake_limiter: Arc, active_connections: Arc, + transport_connections: Arc, config: RtrServiceConfig, ) -> Self { Self { @@ -209,6 +222,7 @@ impl RtrServer { connection_limiter, handshake_limiter, active_connections, + transport_connections, config, } } @@ -358,7 +372,9 @@ impl RtrServer { let notify_tx = self.notify_tx.clone(); let shutdown_tx = self.shutdown_tx.clone(); let active_connections = self.active_connections.clone(); + let transport_connections = self.transport_connections.clone(); let transport_instance = transport.clone(); + let transport_kind = transport_instance.kind(); debug!( "RTR {} client connected: peer_addr={}, active_connections(before_spawn)={}", @@ -368,7 +384,12 @@ impl RtrServer { ); tokio::spawn(async move { - let guard = ConnectionGuard::new(active_connections, permit); + let guard = ConnectionGuard::new( + active_connections, + transport_connections, + transport_kind, + permit, + ); info!( "RTR {} connection established: peer_addr={}, active_connections={}", transport_instance.name(), diff --git a/src/rtr/server/mod.rs b/src/rtr/server/mod.rs index 0808ddb..d7a0133 100644 --- a/src/rtr/server/mod.rs +++ b/src/rtr/server/mod.rs @@ -7,7 +7,8 @@ pub mod ssh; pub mod tls; pub use config::RtrServiceConfig; +pub use connection::RtrTransportConnectionCounts; pub use listener::RtrServer; pub use notifier::RtrNotifier; -pub use service::{RtrService, RunningRtrService}; +pub use service::{RtrService, RtrServiceStats, RunningRtrService}; pub use tls::load_rustls_server_config; diff --git a/src/rtr/server/service.rs b/src/rtr/server/service.rs index 7a9f366..e43192e 100644 --- a/src/rtr/server/service.rs +++ b/src/rtr/server/service.rs @@ -11,6 +11,9 @@ use tracing::{error, warn}; use crate::rtr::cache::SharedRtrCache; use crate::rtr::server::config::RtrServiceConfig; +use crate::rtr::server::connection::{ + RtrTransportConnectionCounters, RtrTransportConnectionCounts, +}; use crate::rtr::server::listener::RtrServer; use crate::rtr::server::notifier::RtrNotifier; use crate::rtr::server::ssh::{SshAuthMode, load_rtr_ssh_runtime_config}; @@ -22,9 +25,31 @@ pub struct RtrService { connection_limiter: Arc, handshake_limiter: Arc, active_connections: Arc, + transport_connections: Arc, config: RtrServiceConfig, } +#[derive(Clone)] +pub struct RtrServiceStats { + active_connections: Arc, + transport_connections: Arc, + max_connections: usize, +} + +impl RtrServiceStats { + pub fn active_connections(&self) -> usize { + self.active_connections.load(Ordering::Relaxed) + } + + pub fn transport_connections(&self) -> RtrTransportConnectionCounts { + self.transport_connections.snapshot() + } + + pub fn max_connections(&self) -> usize { + self.max_connections + } +} + impl RtrService { pub fn new(cache: SharedRtrCache) -> Self { Self::with_config(cache, RtrServiceConfig::default()) @@ -41,6 +66,7 @@ impl RtrService { connection_limiter: Arc::new(Semaphore::new(config.max_connections)), handshake_limiter: Arc::new(Semaphore::new(config.max_concurrent_handshakes)), active_connections: Arc::new(AtomicUsize::new(0)), + transport_connections: Arc::new(RtrTransportConnectionCounters::default()), config, } } @@ -61,10 +87,22 @@ impl RtrService { self.active_connections.load(Ordering::Relaxed) } + pub fn transport_connections(&self) -> RtrTransportConnectionCounts { + self.transport_connections.snapshot() + } + pub fn max_connections(&self) -> usize { self.config.max_connections } + pub fn stats(&self) -> RtrServiceStats { + RtrServiceStats { + active_connections: self.active_connections.clone(), + transport_connections: self.transport_connections.clone(), + max_connections: self.config.max_connections, + } + } + pub fn tcp_server(&self, bind_addr: SocketAddr) -> RtrServer { RtrServer::new( bind_addr, @@ -74,6 +112,7 @@ impl RtrService { self.connection_limiter.clone(), self.handshake_limiter.clone(), self.active_connections.clone(), + self.transport_connections.clone(), self.config.clone(), ) } @@ -87,6 +126,7 @@ impl RtrService { self.connection_limiter.clone(), self.handshake_limiter.clone(), self.active_connections.clone(), + self.transport_connections.clone(), self.config.clone(), ) } @@ -100,6 +140,7 @@ impl RtrService { self.connection_limiter.clone(), self.handshake_limiter.clone(), self.active_connections.clone(), + self.transport_connections.clone(), self.config.clone(), ) } diff --git a/src/rtr/server/ssh.rs b/src/rtr/server/ssh.rs index 97cef64..67de191 100644 --- a/src/rtr/server/ssh.rs +++ b/src/rtr/server/ssh.rs @@ -69,7 +69,10 @@ pub fn load_rtr_ssh_runtime_config( }; let password = password.map(str::trim).filter(|value| !value.is_empty()); if matches!(auth_mode, SshAuthMode::Password | SshAuthMode::Both) && password.is_none() { - bail!("SSH auth mode '{}' requires non-empty password", auth_mode.as_str()); + bail!( + "SSH auth mode '{}' requires non-empty password", + auth_mode.as_str() + ); } let mut methods = MethodSet::empty(); diff --git a/src/source/pipeline.rs b/src/source/pipeline.rs index e0e23b0..79c79ef 100644 --- a/src/source/pipeline.rs +++ b/src/source/pipeline.rs @@ -1,11 +1,14 @@ -use anyhow::{Result, anyhow}; use std::path::PathBuf; + +use anyhow::{Result, anyhow}; +use chrono::{DateTime, Utc}; +use serde::Serialize; use tracing::{info, warn}; use crate::rtr::payload::Payload; use crate::slurm::file::SlurmFile; use crate::source::ccr::{ - find_latest_ccr_file, load_ccr_payloads_from_file_with_options, load_ccr_snapshot_from_file, + find_latest_ccr_file, load_ccr_snapshot_from_file, snapshot_to_payloads_with_options, }; #[derive(Debug, Clone)] @@ -28,61 +31,90 @@ pub struct FileFingerprint { pub modified_unix_secs: u64, } -pub fn load_payloads_from_latest_sources(config: &PayloadLoadConfig) -> Result> { - let payloads = load_payloads_from_latest_ccr(&config.ccr_dir, config.strict_ccr_validation)?; +#[derive(Debug, Clone)] +pub struct PayloadLoadResult { + pub payloads: Vec, + pub source: SourceLoadReport, + pub quality: DataQualityReport, +} - match config.slurm_dir.as_deref() { - Some(dir) => apply_slurm_to_payloads_from_dir(dir, payloads), - None => Ok(payloads), +#[derive(Debug, Clone, Serialize)] +pub struct SourceLoadReport { + pub ccr_file: String, + pub ccr_file_size_bytes: u64, + pub ccr_modified_at: Option>, + pub ccr_produced_at: Option, + pub slurm_enabled: bool, + pub slurm_file_count: usize, + pub slurm_files: Vec, + pub slurm_version: Option, +} + +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize)] +pub struct PayloadTypeCounts { + pub total: usize, + pub vrp: usize, + pub router_key: usize, + pub aspa: usize, +} + +impl PayloadTypeCounts { + fn from_payloads(payloads: &[Payload]) -> Self { + let mut counts = Self::default(); + for payload in payloads { + counts.total += 1; + match payload { + Payload::RouteOrigin(_) => counts.vrp += 1, + Payload::RouterKey(_) => counts.router_key += 1, + Payload::Aspa(_) => counts.aspa += 1, + } + } + counts } } -pub fn latest_sources_fingerprint(config: &PayloadLoadConfig) -> Result { - let latest_ccr = find_latest_ccr_file(&config.ccr_dir)?; - let ccr = fingerprint_of_path(&latest_ccr)?; - - let slurm_files = match config.slurm_dir.as_deref() { - Some(dir) => { - let mut paths = Vec::::new(); - for entry in std::fs::read_dir(dir) - .map_err(|err| anyhow!("failed to read SLURM directory '{}': {}", dir, err))? - { - let entry = entry - .map_err(|err| anyhow!("failed to enumerate SLURM directory '{}': {}", dir, err))?; - let path = entry.path(); - if path.is_file() && path.extension().and_then(|ext| ext.to_str()) == Some("slurm") { - paths.push(path); - } - } - paths.sort_by_key(|path| { - path.file_name() - .and_then(|name| name.to_str()) - .map(|name| name.to_ascii_lowercase()) - .unwrap_or_default() - }); - let mut fps = Vec::with_capacity(paths.len()); - for path in paths { - fps.push(fingerprint_of_path(path)?); - } - fps - } - None => Vec::new(), - }; - - Ok(SourceFingerprint { ccr, slurm_files }) +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize)] +pub struct SlurmRuleCounts { + pub prefix: usize, + pub router_key: usize, + pub aspa: usize, } -fn load_payloads_from_latest_ccr( - ccr_dir: &str, - strict_ccr_validation: bool, -) -> Result> { - let latest = find_latest_ccr_file(ccr_dir)?; +#[derive(Debug, Clone, Serialize)] +pub struct DataQualityReport { + pub ccr_input: PayloadTypeCounts, + pub invalid: PayloadTypeCounts, + pub before_slurm: PayloadTypeCounts, + pub after_slurm: PayloadTypeCounts, + pub slurm_filters: SlurmRuleCounts, + pub slurm_assertions: SlurmRuleCounts, +} + +pub fn load_payloads_from_latest_sources(config: &PayloadLoadConfig) -> Result> { + Ok(load_payloads_from_latest_sources_with_report(config)?.payloads) +} + +pub fn load_payloads_from_latest_sources_with_report( + config: &PayloadLoadConfig, +) -> Result { + let latest = find_latest_ccr_file(&config.ccr_dir)?; + let fingerprint = fingerprint_of_path(&latest)?; let snapshot = load_ccr_snapshot_from_file(&latest)?; - let vrp_count = snapshot.vrps.len(); - let vap_count = snapshot.vaps.len(); + let ccr_input = PayloadTypeCounts { + total: snapshot.vrps.len() + snapshot.vaps.len(), + vrp: snapshot.vrps.len(), + router_key: 0, + aspa: snapshot.vaps.len(), + }; let produced_at = snapshot.produced_at.clone(); - let conversion = load_ccr_payloads_from_file_with_options(&latest, strict_ccr_validation)?; - let payloads = conversion.payloads; + let conversion = snapshot_to_payloads_with_options(&snapshot, config.strict_ccr_validation)?; + let invalid = PayloadTypeCounts { + total: conversion.invalid_vrps.len() + conversion.invalid_vaps.len(), + vrp: conversion.invalid_vrps.len(), + router_key: 0, + aspa: conversion.invalid_vaps.len(), + }; + let before_slurm = PayloadTypeCounts::from_payloads(&conversion.payloads); if !conversion.invalid_vrps.is_empty() { warn!( @@ -92,7 +124,6 @@ fn load_payloads_from_latest_ccr( sample_messages(&conversion.invalid_vrps) ); } - if !conversion.invalid_vaps.is_empty() { warn!( "CCR load skipped invalid VAPs/ASPAs: file={}, skipped={}, samples={:?}", @@ -102,23 +133,76 @@ fn load_payloads_from_latest_ccr( ); } + let (payloads, slurm_files, slurm_version, slurm_filters, slurm_assertions) = + match config.slurm_dir.as_deref() { + Some(dir) => apply_slurm_to_payloads_from_dir(dir, conversion.payloads)?, + None => ( + conversion.payloads, + Vec::new(), + None, + SlurmRuleCounts::default(), + SlurmRuleCounts::default(), + ), + }; + let after_slurm = PayloadTypeCounts::from_payloads(&payloads); + info!( "loaded latest CCR snapshot: file={}, produced_at={:?}, vrp_count={}, vap_count={}, payload_count={}, strict_ccr_validation={}", latest.display(), produced_at, - vrp_count, - vap_count, - payloads.len(), - strict_ccr_validation + snapshot.vrps.len(), + snapshot.vaps.len(), + before_slurm.total, + config.strict_ccr_validation ); - Ok(payloads) + Ok(PayloadLoadResult { + payloads, + source: SourceLoadReport { + ccr_file: fingerprint.path, + ccr_file_size_bytes: fingerprint.len, + ccr_modified_at: DateTime::from_timestamp(fingerprint.modified_unix_secs as i64, 0), + ccr_produced_at: produced_at, + slurm_enabled: config.slurm_dir.is_some(), + slurm_file_count: slurm_files.len(), + slurm_files, + slurm_version, + }, + quality: DataQualityReport { + ccr_input, + invalid, + before_slurm, + after_slurm, + slurm_filters, + slurm_assertions, + }, + }) +} + +pub fn latest_sources_fingerprint(config: &PayloadLoadConfig) -> Result { + let latest_ccr = find_latest_ccr_file(&config.ccr_dir)?; + let ccr = fingerprint_of_path(&latest_ccr)?; + let slurm_files = match config.slurm_dir.as_deref() { + Some(dir) => slurm_paths(dir)? + .into_iter() + .map(fingerprint_of_path) + .collect::>>()?, + None => Vec::new(), + }; + + Ok(SourceFingerprint { ccr, slurm_files }) } fn apply_slurm_to_payloads_from_dir( slurm_dir: &str, payloads: Vec, -) -> Result> { +) -> Result<( + Vec, + Vec, + Option, + SlurmRuleCounts, + SlurmRuleCounts, +)> { let files = read_slurm_files(slurm_dir)?; let file_count = files.len(); let file_names = files @@ -127,6 +211,18 @@ fn apply_slurm_to_payloads_from_dir( .collect::>(); let slurm = SlurmFile::merge_named(files) .map_err(|err| anyhow!("failed to merge SLURM files from '{}': {}", slurm_dir, err))?; + let filters = slurm.validation_output_filters(); + let assertions = slurm.locally_added_assertions(); + let filter_counts = SlurmRuleCounts { + prefix: filters.prefix_filters.len(), + router_key: filters.bgpsec_filters.len(), + aspa: filters.aspa_filters.len(), + }; + let assertion_counts = SlurmRuleCounts { + prefix: assertions.prefix_assertions.len(), + router_key: assertions.bgpsec_assertions.len(), + aspa: assertions.aspa_assertions.len(), + }; let input_count = payloads.len(); let filtered = slurm.apply_owned(payloads); @@ -142,11 +238,31 @@ fn apply_slurm_to_payloads_from_dir( output_count ); - Ok(filtered) + Ok(( + filtered, + file_names, + Some(slurm.version().as_u32()), + filter_counts, + assertion_counts, + )) } fn read_slurm_files(slurm_dir: &str) -> Result> { - let mut paths = Vec::::new(); + slurm_paths(slurm_dir)? + .into_iter() + .map(|path| { + let name = path.to_string_lossy().to_string(); + let file = std::fs::File::open(&path) + .map_err(|err| anyhow!("failed to open SLURM file '{}': {}", name, err))?; + let slurm = SlurmFile::from_reader(file) + .map_err(|err| anyhow!("failed to parse SLURM file '{}': {}", name, err))?; + Ok((name, slurm)) + }) + .collect() +} + +fn slurm_paths(slurm_dir: &str) -> Result> { + let mut paths = Vec::new(); for entry in std::fs::read_dir(slurm_dir) .map_err(|err| anyhow!("failed to read SLURM directory '{}': {}", slurm_dir, err))? { @@ -169,25 +285,13 @@ fn read_slurm_files(slurm_dir: &str) -> Result> { .map(|name| name.to_ascii_lowercase()) .unwrap_or_default() }); - if paths.is_empty() { return Err(anyhow!( "SLURM directory '{}' does not contain .slurm files", slurm_dir )); } - - paths - .into_iter() - .map(|path| { - let name = path.to_string_lossy().to_string(); - let file = std::fs::File::open(&path) - .map_err(|err| anyhow!("failed to open SLURM file '{}': {}", name, err))?; - let slurm = SlurmFile::from_reader(file) - .map_err(|err| anyhow!("failed to parse SLURM file '{}': {}", name, err))?; - Ok((name, slurm)) - }) - .collect() + Ok(paths) } fn sample_messages(messages: &[String]) -> Vec<&str> { diff --git a/tests/test_cache.rs b/tests/test_cache.rs index 8aa5f11..e2995b4 100644 --- a/tests/test_cache.rs +++ b/tests/test_cache.rs @@ -71,6 +71,55 @@ fn deltas_all(deltas: VecDeque>) -> [VecDeque>; 3] { [deltas.clone(), deltas.clone(), deltas] } +#[test] +fn version_report_stats_separate_snapshot_and_delta_payload_types() { + let vrp = Payload::RouteOrigin(v4_origin(192, 0, 2, 0, 24, 24, 64496)); + let router_key = Payload::RouterKey(RouterKey::new( + Ski::default(), + Asn::from(64496u32), + vec![1, 2, 3], + )); + let aspa = Payload::Aspa(Aspa::new( + Asn::from(64496u32), + vec![Asn::from(64497u32)], + )); + let snapshot = Snapshot::from_payloads(vec![vrp.clone(), router_key.clone(), aspa.clone()]); + let mut deltas = VecDeque::new(); + deltas.push_back(Arc::new(Delta::new( + 101, + vec![vrp, aspa], + vec![router_key], + ))); + let cache = RtrCacheBuilder::new() + .session_ids(SessionIds::from_array([40, 41, 42])) + .serials([99, 100, 101]) + .snapshots(snapshots_all(snapshot)) + .deltas_by_version(deltas_all(deltas)) + .build(); + + let v2 = cache.version_report_stats()[2]; + + assert_eq!(v2.version, 2); + assert_eq!(v2.session_id, 42); + assert_eq!(v2.serial, 101); + assert_eq!(v2.snapshot.total, 3); + assert_eq!(v2.snapshot.vrp, 1); + assert_eq!(v2.snapshot.router_key, 1); + assert_eq!(v2.snapshot.aspa, 1); + + let latest = v2.latest_delta.unwrap(); + assert_eq!(latest.serial, 101); + assert_eq!(latest.announced.total, 2); + assert_eq!(latest.announced.vrp, 1); + assert_eq!(latest.announced.aspa, 1); + assert_eq!(latest.withdrawn.total, 1); + assert_eq!(latest.withdrawn.router_key, 1); + + assert_eq!(v2.delta_window.length, 1); + assert_eq!(v2.delta_window.oldest_serial, Some(101)); + assert_eq!(v2.delta_window.newest_serial, Some(101)); +} + /// Snapshot ?hash ? /// payload snapshot_hash / origins_hash ? #[test] diff --git a/tests/test_pipeline.rs b/tests/test_pipeline.rs index eb545f9..ea939b5 100644 --- a/tests/test_pipeline.rs +++ b/tests/test_pipeline.rs @@ -1,7 +1,10 @@ use std::fs; use std::path::PathBuf; -use rpki::source::pipeline::{PayloadLoadConfig, load_payloads_from_latest_sources}; +use rpki::source::pipeline::{ + PayloadLoadConfig, load_payloads_from_latest_sources, + load_payloads_from_latest_sources_with_report, +}; use tempfile::tempdir; fn data_dir() -> String { @@ -41,3 +44,56 @@ fn load_payloads_rejects_entire_slurm_set_when_any_file_is_invalid() { assert!(text.contains("failed to parse SLURM file")); assert!(text.contains("02-invalid.slurm")); } + +#[test] +fn load_report_contains_source_and_quality_details() { + let config = PayloadLoadConfig { + ccr_dir: data_dir(), + slurm_dir: None, + strict_ccr_validation: false, + }; + + let result = load_payloads_from_latest_sources_with_report(&config).unwrap(); + + assert!(result.source.ccr_file.ends_with(".ccr")); + assert!(result.source.ccr_file_size_bytes > 0); + assert!(!result.source.slurm_enabled); + assert_eq!(result.source.slurm_file_count, 0); + assert_eq!(result.quality.before_slurm, result.quality.after_slurm); + assert_eq!(result.quality.after_slurm.total, result.payloads.len()); + assert_eq!( + result.quality.ccr_input.total, + result.quality.ccr_input.vrp + result.quality.ccr_input.aspa + ); +} + +#[test] +fn load_report_contains_slurm_metadata_and_rule_counts() { + let slurm_dir = tempdir().expect("create temp slurm dir"); + let slurm = r#"{ + "slurmVersion": 1, + "validationOutputFilters": { + "prefixFilters": [], + "bgpsecFilters": [] + }, + "locallyAddedAssertions": { + "prefixAssertions": [], + "bgpsecAssertions": [] + } + }"#; + fs::write(slurm_dir.path().join("policy.slurm"), slurm).unwrap(); + let config = PayloadLoadConfig { + ccr_dir: data_dir(), + slurm_dir: Some(slurm_dir.path().to_string_lossy().to_string()), + strict_ccr_validation: false, + }; + + let result = load_payloads_from_latest_sources_with_report(&config).unwrap(); + + assert!(result.source.slurm_enabled); + assert_eq!(result.source.slurm_file_count, 1); + assert_eq!(result.source.slurm_version, Some(1)); + assert_eq!(result.quality.slurm_filters.prefix, 0); + assert_eq!(result.quality.slurm_assertions.prefix, 0); + assert_eq!(result.quality.before_slurm, result.quality.after_slurm); +}