From 8d6173f6054385b1bd36948d17f59be14f515335 Mon Sep 17 00:00:00 2001 From: "xiuting.xu" Date: Mon, 30 Mar 2026 10:03:43 +0800 Subject: [PATCH] =?UTF-8?q?rtr=E7=9A=84=E8=BE=93=E5=85=A5=E6=8D=A2?= =?UTF-8?q?=E6=88=90ccr=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 97 ++++---- scripts/start-rtr-server-tcp.sh | 5 +- scripts/start-rtr-server-tls.sh | 5 +- scripts/start-rtr-server.sh | 5 +- src/bin/rtr_debug_client/README.md | 4 +- src/bin/rtr_debug_client/main.rs | 2 +- src/main.rs | 107 ++++++--- src/rtr/cache/core.rs | 95 ++++++-- src/rtr/cache/store.rs | 17 +- src/rtr/ccr.rs | 361 +++++++++++++++++++++++++++++ src/rtr/mod.rs | 1 + src/rtr/server/connection.rs | 30 ++- src/rtr/server/listener.rs | 49 ++-- tests/test_cache.rs | 47 +++- tests/test_ccr.rs | 71 ++++++ 15 files changed, 759 insertions(+), 137 deletions(-) create mode 100644 src/rtr/ccr.rs create mode 100644 tests/test_ccr.rs diff --git a/README.md b/README.md index 46e3139..94137f5 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,18 @@ # RPKI RTR Server -默认运行目标平台:Ubuntu/Linux。 +默认运行平台:Ubuntu/Linux。 ## RTR Server -RTR Server 的运行配置通过环境变量读取。如果某个环境变量没有设置,则使用 -[`src/main.rs`](src/main.rs) 中的内置默认值。 +RTR Server 运行时从 `CCR` 目录中扫描最新的 `.ccr` 文件作为输入源。当前 `main` 路径不再读取 `vrps.txt` / `aspas.txt` / `router-keys.txt`,而是统一从 CCR 快照加载: + +- `VRP` +- `VAP / ASPA` + +相关实现位置: + +- [`src/main.rs`](src/main.rs) +- [`src/rtr/ccr.rs`](src/rtr/ccr.rs) ### 环境变量 @@ -14,27 +21,30 @@ RTR Server 的运行配置通过环境变量读取。如果某个环境变量没 | `RPKI_RTR_ENABLE_TLS` | 是否额外启用 TLS 监听。支持 `true/false`、`1/0`、`yes/no`、`on/off`。 | `true` | | `RPKI_RTR_TCP_ADDR` | TCP 监听地址。 | `0.0.0.0:323` | | `RPKI_RTR_TLS_ADDR` | TLS 监听地址。 | `0.0.0.0:324` | -| `RPKI_RTR_DB_PATH` | RTR 使用的 RocksDB 路径。 | `./rtr-db` | -| `RPKI_RTR_VRP_FILE` | 输入 VRP 文件路径。 | `./data/vrps.txt` | -| `RPKI_RTR_ASPA_FILE` | 输入 ASPA 文件路径。 | `./data/aspas.txt` | -| `RPKI_RTR_ROUTER_KEY_FILE` | 输入 Router Key 文件路径。 | `./data/router-keys.txt` | -| `RPKI_RTR_TLS_CERT_PATH` | TLS 服务端证书路径。 | `./certs/server.crt` | -| `RPKI_RTR_TLS_KEY_PATH` | TLS 服务端私钥路径。 | `./certs/server.key` | +| `RPKI_RTR_DB_PATH` | RocksDB 路径。 | `./rtr-db` | +| `RPKI_RTR_CCR_DIR` | CCR 目录路径;程序会扫描其中最新的 `.ccr` 文件。 | `./data` | +| `RPKI_RTR_TLS_CERT_PATH` | TLS 服务端证书路径。 | `./certs/server-dns.crt` | +| `RPKI_RTR_TLS_KEY_PATH` | TLS 服务端私钥路径。 | `./certs/server-dns.key` | | `RPKI_RTR_TLS_CLIENT_CA_PATH` | 用于校验 router 客户端证书的 CA 证书路径。 | `./certs/client-ca.crt` | -| `RPKI_RTR_MAX_DELTA` | 保留的最大 delta 条数。 | `100` | -| `RPKI_RTR_REFRESH_INTERVAL_SECS` | 重新加载 VRP 文件的时间间隔,单位秒。 | `300` | -| `RPKI_RTR_MAX_CONNECTIONS` | 最大并发 RTR 连接数。 | `512` | +| `RPKI_RTR_MAX_DELTA` | 最多保留多少条 delta。 | `100` | +| `RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE` | 是否启用“累计 delta 估算 wire size 不小于 snapshot 时,继续裁剪最老 delta”的策略。 | `false` | +| `RPKI_RTR_STRICT_CCR_VALIDATION` | 是否对 CCR 中的非法 VRP / VAP 采用严格模式;`true` 表示整份 CCR 拒绝,`false` 表示跳过非法项并告警。 | `false` | +| `RPKI_RTR_REFRESH_INTERVAL_SECS` | 刷新 CCR 目录并重新加载最新 `.ccr` 的间隔,单位秒。 | `300` | +| `RPKI_RTR_MAX_CONNECTIONS` | 最大并发 RTR 客户端连接数。 | `512` | | `RPKI_RTR_NOTIFY_QUEUE_SIZE` | Serial Notify 广播队列大小。 | `1024` | -| `RPKI_RTR_TCP_KEEPALIVE_SECS` | TCP keepalive 时间,单位秒。设为 `0` 表示禁用。 | `60` | -| `RPKI_RTR_WARN_INSECURE_TCP` | 纯 TCP 模式下是否输出不安全告警。支持布尔值。 | `true` | -| `RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN` | 严格模式:如果 TLS 服务端证书不包含 `subjectAltName dNSName`,则拒绝启动。支持布尔值。 | `false` | +| `RPKI_RTR_TCP_KEEPALIVE_SECS` | TCP keepalive 时间,单位秒;设为 `0` 表示禁用。 | `60` | +| `RPKI_RTR_WARN_INSECURE_TCP` | 纯 TCP 模式下是否输出不安全警告。 | `true` | +| `RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN` | 严格模式:TLS 服务端证书不包含 `subjectAltName dNSName` 时拒绝启动。 | `false` | ### 说明 -- 纯 TCP 模式只应部署在受信任、可控的网络环境中。 +- 纯 TCP 模式只应部署在受信任、可控网络中。 - TLS 模式要求客户端证书认证。 -- 开启严格 TLS 服务端证书模式后,如果服务端证书缺少 `subjectAltName dNSName`,启动时会被拒绝。 -- `RPKI_RTR_TCP_KEEPALIVE_SECS=0` 表示关闭 TCP keepalive;非零值表示在连接整个生命周期内启用 keepalive。 +- 当前输入源是 `CCR` 目录,不再是单独的文本 VRP / ASPA / Router Key 文件。 +- 刷新时会重新扫描 `RPKI_RTR_CCR_DIR`,并选取文件名排序最新的 `.ccr` 文件。 +- `RPKI_RTR_STRICT_CCR_VALIDATION=false` 时,CCR 中的非法 VRP / VAP 会被跳过并输出 warning;`true` 时整份 CCR 更新失败。 +- `RPKI_RTR_TCP_KEEPALIVE_SECS=0` 表示关闭 keepalive;非零值表示在整个连接生命周期内启用 keepalive。 +- `RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE=true` 时,除了 `RPKI_RTR_MAX_DELTA` 的固定条数裁剪外,还会在累计 delta 估算 wire size 不小于 snapshot 时继续删除最老 delta。 ## 启动示例 @@ -52,7 +62,7 @@ TLS / mutual TLS 模式: sh ./scripts/start-rtr-server-tls.sh ``` -如果你想手动设置环境变量,也可以直接这样启动。 +### 手动启动 #### 纯 TCP @@ -60,13 +70,13 @@ sh ./scripts/start-rtr-server-tls.sh export RPKI_RTR_ENABLE_TLS=false export RPKI_RTR_TCP_ADDR=0.0.0.0:323 export RPKI_RTR_DB_PATH=./rtr-db -export RPKI_RTR_VRP_FILE=./data/vrps.txt -export RPKI_RTR_ASPA_FILE=./data/aspas.txt -export RPKI_RTR_ROUTER_KEY_FILE=./data/router-keys.txt +export RPKI_RTR_CCR_DIR=./data +export RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE=false +export RPKI_RTR_STRICT_CCR_VALIDATION=false export RPKI_RTR_TCP_KEEPALIVE_SECS=60 export RPKI_RTR_WARN_INSECURE_TCP=true -cargo run +cargo run --bin rpki ``` #### TLS / mutual TLS @@ -76,9 +86,9 @@ export RPKI_RTR_ENABLE_TLS=true export RPKI_RTR_TCP_ADDR=0.0.0.0:323 export RPKI_RTR_TLS_ADDR=0.0.0.0:324 export RPKI_RTR_DB_PATH=./rtr-db -export RPKI_RTR_VRP_FILE=./data/vrps.txt -export RPKI_RTR_ASPA_FILE=./data/aspas.txt -export RPKI_RTR_ROUTER_KEY_FILE=./data/router-keys.txt +export RPKI_RTR_CCR_DIR=./data +export RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE=false +export RPKI_RTR_STRICT_CCR_VALIDATION=false export RPKI_RTR_TLS_CERT_PATH=./certs/server-dns.crt export RPKI_RTR_TLS_KEY_PATH=./certs/server-dns.key export RPKI_RTR_TLS_CLIENT_CA_PATH=./certs/client-ca.crt @@ -86,40 +96,35 @@ export RPKI_RTR_TCP_KEEPALIVE_SECS=60 export RPKI_RTR_WARN_INSECURE_TCP=true export RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN=true -cargo run +cargo run --bin rpki ``` -可直接修改的示例脚本见: +示例脚本: + - [`scripts/start-rtr-server-tcp.sh`](scripts/start-rtr-server-tcp.sh) - [`scripts/start-rtr-server-tls.sh`](scripts/start-rtr-server-tls.sh) - [`scripts/start-rtr-server.sh`](scripts/start-rtr-server.sh) -### ASPA 文件格式 +## CCR 输入说明 -`RPKI_RTR_ASPA_FILE` 当前使用简单文本格式: +当前会从 `RPKI_RTR_CCR_DIR` 指向的目录中扫描最新 `.ccr` 文件,并从中提取: -```text -# customer_asn,provider_asn [provider_asn ...] -64496,64497 64498 -64497,64500 -``` +- `VRP` +- `VAP / ASPA` -### Router Key 文件格式 +测试样例可参考: -`RPKI_RTR_ROUTER_KEY_FILE` 当前使用简单文本格式: +- [`data/20260324T000037Z-sng1.ccr`](data/20260324T000037Z-sng1.ccr) +- [`data/20260324T000138Z-zur1.ccr`](data/20260324T000138Z-zur1.ccr) -```text -# ski_hex,asn,spki_hex -00112233445566778899aabbccddeeff00112233,64496,3013300d06092a864886f70d010101050003020000 -8899aabbccddeeff00112233445566778899aabb,64497,cafebabe -``` - -## RTR Client +## RTR Debug Client 调试用 RTR client 位于: + - [`src/bin/rtr_debug_client/main.rs`](src/bin/rtr_debug_client/main.rs) -它的说明文档位于: +说明文档位于: + - [`src/bin/rtr_debug_client/README.md`](src/bin/rtr_debug_client/README.md) ### Client 启动示例 @@ -142,7 +147,7 @@ cargo run --bin rtr_debug_client -- \ --client-key ./certs/client-good.key ``` -如果要持续观察错误后的行为,可以加: +如果要在收到错误后继续自动轮询,可以加: ```sh --keep-after-error diff --git a/scripts/start-rtr-server-tcp.sh b/scripts/start-rtr-server-tcp.sh index f430cb2..7a72fd7 100644 --- a/scripts/start-rtr-server-tcp.sh +++ b/scripts/start-rtr-server-tcp.sh @@ -5,11 +5,10 @@ export RPKI_RTR_ENABLE_TLS=false export RPKI_RTR_TCP_ADDR=0.0.0.0:323 export RPKI_RTR_DB_PATH=./rtr-db -export RPKI_RTR_VRP_FILE=./data/vrps.txt -export RPKI_RTR_ASPA_FILE=./data/aspas.txt -export RPKI_RTR_ROUTER_KEY_FILE=./data/router-keys.txt +export RPKI_RTR_CCR_DIR=./data export RPKI_RTR_MAX_DELTA=100 +export RPKI_RTR_STRICT_CCR_VALIDATION=false export RPKI_RTR_REFRESH_INTERVAL_SECS=300 export RPKI_RTR_MAX_CONNECTIONS=512 export RPKI_RTR_NOTIFY_QUEUE_SIZE=1024 diff --git a/scripts/start-rtr-server-tls.sh b/scripts/start-rtr-server-tls.sh index af1b2aa..bbf761c 100644 --- a/scripts/start-rtr-server-tls.sh +++ b/scripts/start-rtr-server-tls.sh @@ -6,15 +6,14 @@ export RPKI_RTR_TCP_ADDR=0.0.0.0:323 export RPKI_RTR_TLS_ADDR=0.0.0.0:324 export RPKI_RTR_DB_PATH=./rtr-db -export RPKI_RTR_VRP_FILE=./data/vrps.txt -export RPKI_RTR_ASPA_FILE=./data/aspas.txt -export RPKI_RTR_ROUTER_KEY_FILE=./data/router-keys.txt +export RPKI_RTR_CCR_DIR=./data export RPKI_RTR_TLS_CERT_PATH=./certs/server-dns.crt export RPKI_RTR_TLS_KEY_PATH=./certs/server-dns.key export RPKI_RTR_TLS_CLIENT_CA_PATH=./certs/client-ca.crt export RPKI_RTR_MAX_DELTA=100 +export RPKI_RTR_STRICT_CCR_VALIDATION=false export RPKI_RTR_REFRESH_INTERVAL_SECS=300 export RPKI_RTR_MAX_CONNECTIONS=512 export RPKI_RTR_NOTIFY_QUEUE_SIZE=1024 diff --git a/scripts/start-rtr-server.sh b/scripts/start-rtr-server.sh index af1b2aa..64a5502 100644 --- a/scripts/start-rtr-server.sh +++ b/scripts/start-rtr-server.sh @@ -6,9 +6,8 @@ export RPKI_RTR_TCP_ADDR=0.0.0.0:323 export RPKI_RTR_TLS_ADDR=0.0.0.0:324 export RPKI_RTR_DB_PATH=./rtr-db -export RPKI_RTR_VRP_FILE=./data/vrps.txt -export RPKI_RTR_ASPA_FILE=./data/aspas.txt -export RPKI_RTR_ROUTER_KEY_FILE=./data/router-keys.txt +export RPKI_RTR_CCR_DIR=./data +export RPKI_RTR_STRICT_CCR_VALIDATION=false export RPKI_RTR_TLS_CERT_PATH=./certs/server-dns.crt export RPKI_RTR_TLS_KEY_PATH=./certs/server-dns.key diff --git a/src/bin/rtr_debug_client/README.md b/src/bin/rtr_debug_client/README.md index 9b744c7..3cdd2fb 100644 --- a/src/bin/rtr_debug_client/README.md +++ b/src/bin/rtr_debug_client/README.md @@ -63,7 +63,7 @@ cargo run --bin rtr_debug_client -- [reset|serial - `version`: `1` - `mode`: `reset` - `timeout`: `30` -- `poll`: `60` +- `poll`: `600` ## TCP 示例 @@ -148,7 +148,7 @@ cargo run --bin rtr_debug_client -- \ 等待下一个 PDU 的读取超时时间,单位秒。 - `--poll ` - 在尚未拿到 `EndOfData` timing hint 前,默认使用的自动轮询间隔。 + 在尚未拿到 `EndOfData` timing hint 前,默认使用的自动轮询间隔。默认值为 `600` 秒,对齐 draft 第 6 节的默认 Retry Interval。 - `--keep-after-error` 收到 `ErrorReport` 后不暂停自动轮询。 diff --git a/src/bin/rtr_debug_client/main.rs b/src/bin/rtr_debug_client/main.rs index c59eb0e..d7f03ce 100644 --- a/src/bin/rtr_debug_client/main.rs +++ b/src/bin/rtr_debug_client/main.rs @@ -21,7 +21,7 @@ use crate::pretty::{ use crate::protocol::{PduHeader, PduType, QueryMode}; const DEFAULT_READ_TIMEOUT_SECS: u64 = 30; -const DEFAULT_POLL_INTERVAL_SECS: u64 = 60; +const DEFAULT_POLL_INTERVAL_SECS: u64 = 600; trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send {} impl AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send {} diff --git a/src/main.rs b/src/main.rs index 47b0478..19c8073 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,8 +7,8 @@ use anyhow::{anyhow, Result}; use tokio::task::JoinHandle; use tracing::{info, warn}; +use rpki::rtr::ccr::{find_latest_ccr_file, load_ccr_payloads_from_file_with_options, load_ccr_snapshot_from_file}; use rpki::rtr::cache::{RtrCache, SharedRtrCache}; -use rpki::rtr::loader::{load_aspas_from_file, load_router_keys_from_file, load_vrps_from_file}; use rpki::rtr::payload::Timing; use rpki::rtr::server::{RtrNotifier, RtrService, RtrServiceConfig, RunningRtrService}; use rpki::rtr::store::RtrStore; @@ -20,14 +20,14 @@ struct AppConfig { tls_addr: SocketAddr, db_path: String, - vrp_file: String, - aspa_file: String, - router_key_file: String, + ccr_dir: String, tls_cert_path: String, tls_key_path: String, tls_client_ca_path: String, max_delta: u8, + prune_delta_by_snapshot_size: bool, + strict_ccr_validation: bool, refresh_interval: Duration, service_config: RtrServiceConfig, @@ -41,14 +41,14 @@ impl Default for AppConfig { tls_addr: "0.0.0.0:324".parse().expect("invalid default tls_addr"), db_path: "./rtr-db".to_string(), - vrp_file: r"C:\Users\xuxiu\git_code\rpki\data\vrps.txt".to_string(), - aspa_file: "./data/aspas.txt".to_string(), - router_key_file: "./data/router-keys.txt".to_string(), + ccr_dir: "./data".to_string(), tls_cert_path: "./certs/server.crt".to_string(), tls_key_path: "./certs/server.key".to_string(), tls_client_ca_path: "./certs/client-ca.crt".to_string(), max_delta: 100, + prune_delta_by_snapshot_size: false, + strict_ccr_validation: false, refresh_interval: Duration::from_secs(300), service_config: RtrServiceConfig { @@ -82,14 +82,8 @@ impl AppConfig { if let Some(value) = env_var("RPKI_RTR_DB_PATH")? { config.db_path = value; } - if let Some(value) = env_var("RPKI_RTR_VRP_FILE")? { - config.vrp_file = value; - } - if let Some(value) = env_var("RPKI_RTR_ASPA_FILE")? { - config.aspa_file = value; - } - if let Some(value) = env_var("RPKI_RTR_ROUTER_KEY_FILE")? { - config.router_key_file = value; + if let Some(value) = env_var("RPKI_RTR_CCR_DIR")? { + config.ccr_dir = value; } if let Some(value) = env_var("RPKI_RTR_TLS_CERT_PATH")? { config.tls_cert_path = value; @@ -105,6 +99,14 @@ impl AppConfig { .parse() .map_err(|err| anyhow!("invalid RPKI_RTR_MAX_DELTA '{}': {}", value, err))?; } + if let Some(value) = env_var("RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE")? { + config.prune_delta_by_snapshot_size = + parse_bool(&value, "RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE")?; + } + if let Some(value) = env_var("RPKI_RTR_STRICT_CCR_VALIDATION")? { + config.strict_ccr_validation = + parse_bool(&value, "RPKI_RTR_STRICT_CCR_VALIDATION")?; + } if let Some(value) = env_var("RPKI_RTR_REFRESH_INTERVAL_SECS")? { let secs: u64 = value.parse().map_err(|err| { anyhow!( @@ -185,8 +187,9 @@ fn init_shared_cache(config: &AppConfig, store: &RtrStore) -> Result JoinHandle<()> { let refresh_interval = config.refresh_interval; - let vrp_file = config.vrp_file.clone(); - let aspa_file = config.aspa_file.clone(); - let router_key_file = config.router_key_file.clone(); + let ccr_dir = config.ccr_dir.clone(); + let strict_ccr_validation = config.strict_ccr_validation; tokio::spawn(async move { let mut interval = tokio::time::interval(refresh_interval); @@ -239,7 +241,7 @@ fn spawn_refresh_task( loop { interval.tick().await; - match load_payloads_from_files(&vrp_file, &aspa_file, &router_key_file) { + match load_payloads_from_latest_ccr(&ccr_dir, strict_ccr_validation) { Ok(payloads) => { let payload_count = payloads.len(); let updated = { @@ -258,8 +260,8 @@ fn spawn_refresh_task( let new_serial = cache.serial(); if new_serial != old_serial { info!( - "RTR cache refresh applied: vrp_file={}, payload_count={}, old_serial={}, new_serial={}", - vrp_file, + "RTR cache refresh applied: ccr_dir={}, payload_count={}, old_serial={}, new_serial={}", + ccr_dir, payload_count, old_serial, new_serial @@ -267,8 +269,8 @@ fn spawn_refresh_task( true } else { info!( - "RTR cache refresh found no change: vrp_file={}, payload_count={}, serial={}", - vrp_file, + "RTR cache refresh found no change: ccr_dir={}, payload_count={}, serial={}", + ccr_dir, payload_count, old_serial ); @@ -288,7 +290,7 @@ fn spawn_refresh_task( } } Err(err) => { - warn!("failed to reload VRPs from file {}: {:?}", vrp_file, err); + warn!("failed to reload CCR payloads from {}: {:?}", ccr_dir, err); } } } @@ -314,10 +316,9 @@ fn log_startup_config(config: &AppConfig) { info!("tls_client_ca_path={}", config.tls_client_ca_path); } - info!("vrp_file={}", config.vrp_file); - info!("aspa_file={}", config.aspa_file); - info!("router_key_file={}", config.router_key_file); + info!("ccr_dir={}", config.ccr_dir); info!("max_delta={}", config.max_delta); + info!("strict_ccr_validation={}", config.strict_ccr_validation); info!( "refresh_interval_secs={}", config.refresh_interval.as_secs() @@ -372,13 +373,49 @@ fn parse_bool(value: &str, name: &str) -> Result { } } -fn load_payloads_from_files( - vrp_file: &str, - aspa_file: &str, - router_key_file: &str, +fn load_payloads_from_latest_ccr( + ccr_dir: &str, + strict_ccr_validation: bool, ) -> Result> { - let mut payloads = load_vrps_from_file(vrp_file)?; - payloads.extend(load_aspas_from_file(aspa_file)?); - payloads.extend(load_router_keys_from_file(router_key_file)?); + let latest = find_latest_ccr_file(ccr_dir)?; + let snapshot = load_ccr_snapshot_from_file(&latest)?; + let vrp_count = snapshot.vrps.len(); + let vap_count = snapshot.vaps.len(); + let produced_at = snapshot.produced_at.clone(); + let conversion = load_ccr_payloads_from_file_with_options(&latest, strict_ccr_validation)?; + let payloads = conversion.payloads; + + if !conversion.invalid_vrps.is_empty() { + warn!( + "CCR load skipped invalid VRPs: file={}, skipped={}, samples={:?}", + latest.display(), + conversion.invalid_vrps.len(), + sample_messages(&conversion.invalid_vrps) + ); + } + + if !conversion.invalid_vaps.is_empty() { + warn!( + "CCR load skipped invalid VAPs/ASPAs: file={}, skipped={}, samples={:?}", + latest.display(), + conversion.invalid_vaps.len(), + sample_messages(&conversion.invalid_vaps) + ); + } + + info!( + "loaded latest CCR snapshot: file={}, produced_at={:?}, vrp_count={}, vap_count={}, payload_count={}, strict_ccr_validation={}", + latest.display(), + produced_at, + vrp_count, + vap_count, + payloads.len(), + strict_ccr_validation + ); + Ok(payloads) } + +fn sample_messages(messages: &[String]) -> Vec<&str> { + messages.iter().take(3).map(String::as_str).collect() +} diff --git a/src/rtr/cache/core.rs b/src/rtr/cache/core.rs index b056ab0..817c0c0 100644 --- a/src/rtr/cache/core.rs +++ b/src/rtr/cache/core.rs @@ -55,6 +55,7 @@ pub struct RtrCache { snapshot: Snapshot, deltas: VecDeque>, max_delta: u8, + prune_delta_by_snapshot_size: bool, timing: Timing, last_update_begin: DualTime, last_update_end: DualTime, @@ -71,6 +72,7 @@ impl Default for RtrCache { snapshot: Snapshot::empty(), deltas: VecDeque::with_capacity(100), max_delta: 100, + prune_delta_by_snapshot_size: false, timing: Timing::default(), last_update_begin: now.clone(), last_update_end: now.clone(), @@ -83,6 +85,7 @@ pub struct RtrCacheBuilder { availability: Option, session_ids: Option, max_delta: Option, + prune_delta_by_snapshot_size: Option, timing: Option, serial: Option, snapshot: Option, @@ -96,6 +99,7 @@ impl RtrCacheBuilder { availability: None, session_ids: None, max_delta: None, + prune_delta_by_snapshot_size: None, timing: None, serial: None, snapshot: None, @@ -119,6 +123,11 @@ impl RtrCacheBuilder { self } + pub fn prune_delta_by_snapshot_size(mut self, v: bool) -> Self { + self.prune_delta_by_snapshot_size = Some(v); + self + } + pub fn timing(mut self, v: Timing) -> Self { self.timing = Some(v); self @@ -147,6 +156,7 @@ impl RtrCacheBuilder { pub fn build(self) -> RtrCache { let now = DualTime::now(); let max_delta = self.max_delta.unwrap_or(100); + let prune_delta_by_snapshot_size = self.prune_delta_by_snapshot_size.unwrap_or(false); let timing = self.timing.unwrap_or_default(); let snapshot = self.snapshot.unwrap_or_else(Snapshot::empty); let deltas = self @@ -167,6 +177,7 @@ impl RtrCacheBuilder { snapshot, deltas, max_delta, + prune_delta_by_snapshot_size, timing, last_update_begin: now.clone(), last_update_end: now, @@ -231,24 +242,41 @@ impl RtrCache { } fn push_delta(&mut self, delta: Arc) { - let dropped_serial = if self.deltas.len() >= self.max_delta as usize { - self.deltas.front().map(|oldest| oldest.serial()) - } else { - None - }; if self.deltas.len() >= self.max_delta as usize { self.deltas.pop_front(); } - debug!( - "RTR cache pushing delta into window: delta_serial={}, announced={}, withdrawn={}, dropped_oldest_serial={:?}, window_size_before={}, max_delta={}", - delta.serial(), - delta.announced().len(), - delta.withdrawn().len(), - dropped_serial, - self.deltas.len(), - self.max_delta - ); self.deltas.push_back(delta); + let mut dropped_serials = Vec::new(); + if self.prune_delta_by_snapshot_size { + let snapshot_wire_size = estimate_snapshot_payload_wire_size(&self.snapshot); + let mut cumulative_delta_wire_size = + estimate_delta_window_payload_wire_size(&self.deltas); + while !self.deltas.is_empty() + && cumulative_delta_wire_size >= snapshot_wire_size + { + if let Some(oldest) = self.deltas.pop_front() { + dropped_serials.push(oldest.serial()); + cumulative_delta_wire_size = + estimate_delta_window_payload_wire_size(&self.deltas); + } + } + debug!( + "RTR cache delta-size pruning evaluated: snapshot_wire_size={}, cumulative_delta_wire_size={}, dropped_serials={:?}", + snapshot_wire_size, + cumulative_delta_wire_size, + dropped_serials + ); + } + debug!( + "RTR cache pushing delta into window: delta_serial={}, announced={}, withdrawn={}, dropped_oldest_serials={:?}, window_size_after={}, max_delta={}, prune_delta_by_snapshot_size={}", + self.deltas.back().map(|d| d.serial()).unwrap_or(0), + self.deltas.back().map(|d| d.announced().len()).unwrap_or(0), + self.deltas.back().map(|d| d.withdrawn().len()).unwrap_or(0), + dropped_serials, + self.deltas.len(), + self.max_delta, + self.prune_delta_by_snapshot_size + ); } fn replace_snapshot(&mut self, snapshot: Snapshot) { @@ -353,8 +381,8 @@ impl RtrCache { return Ok(None); } - self.push_delta(delta.clone()); self.replace_snapshot(new_snapshot.clone()); + self.push_delta(delta.clone()); self.last_update_end = DualTime::now(); let delta_window = self.delta_window(); info!( @@ -574,6 +602,43 @@ impl RtrCache { } } +fn estimate_snapshot_payload_wire_size(snapshot: &Snapshot) -> usize { + snapshot + .payloads_for_rtr() + .iter() + .map(|payload| estimate_payload_wire_size(payload, true)) + .sum() +} + +fn estimate_delta_window_payload_wire_size(deltas: &VecDeque>) -> usize { + deltas + .iter() + .map(|delta| estimate_delta_wire_size(delta)) + .sum() +} + +fn estimate_delta_wire_size(delta: &Delta) -> usize { + delta + .payloads_for_rtr() + .iter() + .map(|(announce, payload)| estimate_payload_wire_size(payload, *announce)) + .sum() +} + +fn estimate_payload_wire_size(payload: &Payload, announce: bool) -> usize { + match payload { + Payload::RouteOrigin(origin) => match origin.prefix().address { + crate::data_model::resources::ip_resources::IPAddress::V4(_) => 20, + crate::data_model::resources::ip_resources::IPAddress::V6(_) => 32, + }, + Payload::RouterKey(key) => 8 + 20 + 4 + key.spki().len(), + Payload::Aspa(aspa) => { + let providers = if announce { aspa.provider_asns().len() } else { 0 }; + 8 + 4 + providers * 4 + } + } +} + #[derive(Debug, Clone, Default)] struct LogicalState { before: Option, diff --git a/src/rtr/cache/store.rs b/src/rtr/cache/store.rs index 4efc655..47396a4 100644 --- a/src/rtr/cache/store.rs +++ b/src/rtr/cache/store.rs @@ -14,10 +14,16 @@ impl RtrCache { self, store: &RtrStore, max_delta: u8, + prune_delta_by_snapshot_size: bool, timing: Timing, file_loader: impl Fn() -> Result>, ) -> Result { - if let Some(cache) = try_restore_from_store(store, max_delta, timing)? { + if let Some(cache) = try_restore_from_store( + store, + max_delta, + prune_delta_by_snapshot_size, + timing, + )? { tracing::info!( "RTR cache restored from store: availability={:?}, session_ids={:?}, serial={}, snapshot(route_origins={}, router_keys={}, aspas={})", cache.availability(), @@ -80,6 +86,7 @@ impl RtrCache { .availability(availability) .session_ids(session_ids) .max_delta(max_delta) + .prune_delta_by_snapshot_size(prune_delta_by_snapshot_size) .timing(timing) .serial(serial) .snapshot(snapshot) @@ -95,7 +102,12 @@ impl RtrCache { } } -fn try_restore_from_store(store: &RtrStore, max_delta: u8, timing: Timing) -> Result> { +fn try_restore_from_store( + store: &RtrStore, + max_delta: u8, + prune_delta_by_snapshot_size: bool, + timing: Timing, +) -> Result> { let snapshot = store.get_snapshot()?; let session_ids = store.get_session_ids()?; let serial = store.get_serial()?; @@ -143,6 +155,7 @@ fn try_restore_from_store(store: &RtrStore, max_delta: u8, timing: Timing) -> Re .availability(availability) .session_ids(session_ids) .max_delta(max_delta) + .prune_delta_by_snapshot_size(prune_delta_by_snapshot_size) .timing(timing) .serial(serial) .snapshot(snapshot) diff --git a/src/rtr/ccr.rs b/src/rtr/ccr.rs new file mode 100644 index 0000000..1cc30fb --- /dev/null +++ b/src/rtr/ccr.rs @@ -0,0 +1,361 @@ +use std::fs; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Context, Result}; +use der_parser::ber::{BerObject, BerObjectContent}; +use der_parser::der::parse_der; + +use crate::rtr::loader::{ParsedAspa, ParsedVrp, build_aspa, build_route_origin}; +use crate::rtr::payload::Payload; + +const VRPS_INDEX: usize = 3; +const VAPS_INDEX: usize = 4; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ParsedCcrSnapshot { + pub content_type_oid: String, + pub produced_at: Option, + pub vrps: Vec, + pub vaps: Vec, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct CcrPayloadConversion { + pub payloads: Vec, + pub invalid_vrps: Vec, + pub invalid_vaps: Vec, +} + +pub fn load_ccr_snapshot_from_file(path: impl AsRef) -> Result { + let path = path.as_ref(); + let bytes = fs::read(path) + .with_context(|| format!("failed to read CCR file: {}", path.display()))?; + parse_ccr_bytes(&bytes).with_context(|| format!("failed to parse CCR file: {}", path.display())) +} + +pub fn load_ccr_payloads_from_file(path: impl AsRef) -> Result> { + let snapshot = load_ccr_snapshot_from_file(path)?; + snapshot_to_payloads(&snapshot) +} + +pub fn load_ccr_payloads_from_file_with_options( + path: impl AsRef, + strict: bool, +) -> Result { + let snapshot = load_ccr_snapshot_from_file(path)?; + snapshot_to_payloads_with_options(&snapshot, strict) +} + +pub fn find_latest_ccr_file(dir: impl AsRef) -> Result { + let dir = dir.as_ref(); + let mut latest: Option = None; + + for entry in fs::read_dir(dir) + .with_context(|| format!("failed to read CCR directory: {}", dir.display()))? + { + let entry = + entry.with_context(|| format!("failed to iterate CCR directory: {}", dir.display()))?; + let path = entry.path(); + if !path.is_file() { + continue; + } + if path.extension().and_then(|ext| ext.to_str()) != Some("ccr") { + continue; + } + + if latest.as_ref().is_none_or(|current| { + file_name_key(&path) > file_name_key(current) + }) { + latest = Some(path); + } + } + + latest.ok_or_else(|| anyhow!("no .ccr files found in {}", dir.display())) +} + +pub fn snapshot_to_payloads(snapshot: &ParsedCcrSnapshot) -> Result> { + Ok(snapshot_to_payloads_with_options(snapshot, true)?.payloads) +} + +pub fn snapshot_to_payloads_with_options( + snapshot: &ParsedCcrSnapshot, + strict: bool, +) -> Result { + let mut payloads = Vec::with_capacity(snapshot.vrps.len() + snapshot.vaps.len()); + let mut invalid_vrps = Vec::new(); + let mut invalid_vaps = Vec::new(); + + for vrp in &snapshot.vrps { + match build_route_origin(vrp.clone()) { + Ok(origin) => payloads.push(Payload::RouteOrigin(origin)), + Err(err) => { + let msg = format!("invalid CCR VRP: {:?}: {}", vrp, err); + if strict { + return Err(anyhow!(msg)); + } + invalid_vrps.push(msg); + } + } + } + + for vap in &snapshot.vaps { + match build_aspa(vap.clone()) { + Ok(aspa) => payloads.push(Payload::Aspa(aspa)), + Err(err) => { + let msg = format!("invalid CCR VAP/ASPA: {:?}: {}", vap, err); + if strict { + return Err(anyhow!(msg)); + } + invalid_vaps.push(msg); + } + } + } + + Ok(CcrPayloadConversion { + payloads, + invalid_vrps, + invalid_vaps, + }) +} + +pub fn parse_ccr_bytes(bytes: &[u8]) -> Result { + let (rem, root) = parse_der(bytes).map_err(|err| anyhow!("failed to parse CCR DER: {err}"))?; + if !rem.is_empty() { + return Err(anyhow!("CCR DER has {} trailing bytes", rem.len())); + } + + let root_items = sequence_items(&root)?; + if root_items.len() != 2 { + return Err(anyhow!("CCR root must contain exactly 2 items")); + } + + let content_type_oid = match &root_items[0].content { + BerObjectContent::OID(oid) => oid.to_string(), + other => { + return Err(anyhow!( + "CCR root first element must be content type OID, got {other:?}" + )); + } + }; + + let payload = decode_context_wrapped_sequence(&root_items[1])?; + let payload_items = sequence_items(&payload)?; + + let produced_at = payload_items.get(1).and_then(|obj| match &obj.content { + BerObjectContent::GeneralizedTime(t) => Some(t.to_string()), + _ => None, + }); + + let vrps = if let Some(vrps_field) = payload_items.get(VRPS_INDEX) { + parse_vrps(vrps_field)? + } else { + Vec::new() + }; + + let vaps = if let Some(vaps_field) = payload_items.get(VAPS_INDEX) { + parse_vaps(vaps_field)? + } else { + Vec::new() + }; + + Ok(ParsedCcrSnapshot { + content_type_oid, + produced_at, + vrps, + vaps, + }) +} + +fn parse_vrps(field: &BerObject<'_>) -> Result> { + let vrp_state = decode_context_wrapped_sequence(field)?; + let vrp_state_items = sequence_items(&vrp_state)?; + let roa_payload_sets = vrp_state_items + .first() + .ok_or_else(|| anyhow!("ROA payload state missing payload set list"))?; + let roa_payload_sets = sequence_items(roa_payload_sets)?; + + let mut vrps = Vec::new(); + for payload_set in roa_payload_sets { + let payload_set_items = sequence_items(payload_set)?; + if payload_set_items.len() != 2 { + return Err(anyhow!( + "ROAPayloadSet must contain 2 items, got {}", + payload_set_items.len() + )); + } + + let asn = as_u32(&payload_set_items[0], "ROAPayloadSet.asID")?; + let families = sequence_items(&payload_set_items[1])?; + + for family in families { + let family_items = sequence_items(family)?; + if family_items.len() != 2 { + return Err(anyhow!( + "ROAIPAddressFamily must contain 2 items, got {}", + family_items.len() + )); + } + let address_family = as_octets(&family_items[0], "ROAIPAddressFamily.addressFamily")?; + let addresses = sequence_items(&family_items[1])?; + + for address in addresses { + let address_items = sequence_items(address)?; + let (prefix_addr, prefix_len, max_len) = + parse_roa_address(address_family, address_items)?; + vrps.push(ParsedVrp { + prefix_addr, + prefix_len, + max_len, + asn, + }); + } + } + } + + Ok(vrps) +} + +fn parse_vaps(field: &BerObject<'_>) -> Result> { + let vap_state = decode_context_wrapped_sequence(field)?; + let vap_state_items = sequence_items(&vap_state)?; + let aspa_payload_sets = vap_state_items + .first() + .ok_or_else(|| anyhow!("ASPA payload state missing payload set list"))?; + let aspa_payload_sets = sequence_items(aspa_payload_sets)?; + + let mut vaps = Vec::new(); + for payload_set in aspa_payload_sets { + let payload_set_items = sequence_items(payload_set)?; + if payload_set_items.len() != 2 { + return Err(anyhow!( + "ASPAPayloadSet must contain 2 items, got {}", + payload_set_items.len() + )); + } + + let customer_asn = as_u32(&payload_set_items[0], "ASPAPayloadSet.customerASID")?; + let provider_set = sequence_items(&payload_set_items[1])?; + let mut provider_asns = Vec::with_capacity(provider_set.len()); + for provider in provider_set { + provider_asns.push(as_u32(provider, "ASPAPayloadSet.providerASID")?); + } + + vaps.push(ParsedAspa { + customer_asn, + provider_asns, + }); + } + + Ok(vaps) +} + +fn parse_roa_address( + address_family: &[u8], + items: &[BerObject<'_>], +) -> Result<(IpAddr, u8, u8)> { + let address = items + .first() + .ok_or_else(|| anyhow!("ROAIPAddress missing address field"))?; + let (unused_bits, bit_string) = match &address.content { + BerObjectContent::BitString(unused_bits, bit_string) => (*unused_bits, bit_string), + other => { + return Err(anyhow!( + "ROAIPAddress.address must be BIT STRING, got {other:?}" + )); + } + }; + + let prefix_len = (bit_string.data.len() * 8) + .checked_sub(usize::from(unused_bits)) + .ok_or_else(|| anyhow!("invalid ROAIPAddress BIT STRING length"))?; + let prefix_len = u8::try_from(prefix_len) + .map_err(|_| anyhow!("prefix length {prefix_len} does not fit in u8"))?; + + let max_len = match items.get(1) { + Some(value) => { + let max_len = as_u32(value, "ROAIPAddress.maxLength")?; + u8::try_from(max_len) + .map_err(|_| anyhow!("maxLength {max_len} does not fit in u8"))? + } + None => prefix_len, + }; + + let prefix_addr = match address_family { + [0, 1] | [0, 1, ..] => { + let mut octets = [0u8; 4]; + if bit_string.data.len() > octets.len() { + return Err(anyhow!( + "IPv4 ROAIPAddress too long: {} bytes", + bit_string.data.len() + )); + } + octets[..bit_string.data.len()].copy_from_slice(bit_string.data); + IpAddr::V4(Ipv4Addr::from(octets)) + } + [0, 2] | [0, 2, ..] => { + let mut octets = [0u8; 16]; + if bit_string.data.len() > octets.len() { + return Err(anyhow!( + "IPv6 ROAIPAddress too long: {} bytes", + bit_string.data.len() + )); + } + octets[..bit_string.data.len()].copy_from_slice(bit_string.data); + IpAddr::V6(Ipv6Addr::from(octets)) + } + _ => { + return Err(anyhow!( + "unsupported ROA address family octets: {:?}", + address_family + )); + } + }; + + Ok((prefix_addr, prefix_len, max_len)) +} + +fn sequence_items<'a>(obj: &'a BerObject<'a>) -> Result<&'a [BerObject<'a>]> { + match &obj.content { + BerObjectContent::Sequence(items) => Ok(items), + other => Err(anyhow!("expected SEQUENCE, got {other:?}")), + } +} + +fn decode_context_wrapped_sequence<'a>(obj: &'a BerObject<'a>) -> Result> { + match &obj.content { + BerObjectContent::Unknown(any) => { + let (rem, inner) = parse_der(any.data) + .map_err(|err| anyhow!("failed to parse encapsulated DER: {err}"))?; + if !rem.is_empty() { + return Err(anyhow!( + "encapsulated DER has {} trailing bytes", + rem.len() + )); + } + Ok(inner) + } + other => Err(anyhow!( + "expected context-specific wrapped field, got {other:?}" + )), + } +} + +fn as_u32(obj: &BerObject<'_>, field_name: &str) -> Result { + obj.as_u32() + .map_err(|err| anyhow!("{field_name} must be INTEGER fitting in u32: {err}")) +} + +fn as_octets<'a>(obj: &'a BerObject<'a>, field_name: &str) -> Result<&'a [u8]> { + match &obj.content { + BerObjectContent::OctetString(bytes) => Ok(bytes), + other => Err(anyhow!("{field_name} must be OCTET STRING, got {other:?}")), + } +} + +fn file_name_key(path: &Path) -> String { + path.file_name() + .and_then(|name| name.to_str()) + .map(|name| name.to_ascii_lowercase()) + .unwrap_or_default() +} diff --git a/src/rtr/mod.rs b/src/rtr/mod.rs index a085a4a..1a86bc3 100644 --- a/src/rtr/mod.rs +++ b/src/rtr/mod.rs @@ -7,3 +7,4 @@ pub mod error_type; pub mod state; pub mod server; pub mod loader; +pub mod ccr; diff --git a/src/rtr/server/connection.rs b/src/rtr/server/connection.rs index d8e63d5..088899d 100644 --- a/src/rtr/server/connection.rs +++ b/src/rtr/server/connection.rs @@ -7,7 +7,7 @@ use std::sync::{ use anyhow::{Context, Result, anyhow}; use tokio::net::TcpStream; use tokio::sync::{broadcast, watch, OwnedSemaphorePermit}; -use tracing::{error, info, warn}; +use tracing::{info, warn}; use x509_parser::extensions::GeneralName; use x509_parser::prelude::{FromDer, X509Certificate}; @@ -52,11 +52,7 @@ pub async fn handle_tcp_connection( shutdown_rx: watch::Receiver, ) -> Result<()> { let session = RtrSession::new(cache, stream, notify_rx, shutdown_rx); - - if let Err(err) = session.run().await { - error!("RTR TCP session run failed for {}: {:?}", peer_addr, err); - return Err(err); - } + session.run().await?; info!("RTR TCP session completed normally for {}", peer_addr); Ok(()) @@ -81,16 +77,28 @@ pub async fn handle_tls_connection( info!("RTR TLS client certificate validated for {}", peer_addr); let session = RtrSession::new(cache, tls_stream, notify_rx, shutdown_rx); - - if let Err(err) = session.run().await { - error!("RTR TLS session run failed for {}: {:?}", peer_addr, err); - return Err(err); - } + session.run().await?; info!("RTR TLS session completed normally for {}", peer_addr); Ok(()) } +pub fn is_expected_disconnect(err: &anyhow::Error) -> bool { + err.chain().any(|cause| { + if let Some(io_err) = cause.downcast_ref::() { + return matches!( + io_err.kind(), + std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::UnexpectedEof + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::NotConnected + ) || io_err.raw_os_error() == Some(10054); + } + false + }) +} + fn verify_peer_certificate_ip( tls_stream: &tokio_rustls::server::TlsStream, peer_ip: IpAddr, diff --git a/src/rtr/server/listener.rs b/src/rtr/server/listener.rs index db46fa5..700e746 100644 --- a/src/rtr/server/listener.rs +++ b/src/rtr/server/listener.rs @@ -16,7 +16,12 @@ use rustls::ServerConfig; use tokio_rustls::TlsAcceptor; use crate::rtr::cache::SharedRtrCache; -use crate::rtr::server::connection::{ConnectionGuard, handle_tcp_connection, handle_tls_connection}; +use crate::rtr::server::connection::{ + ConnectionGuard, + handle_tcp_connection, + handle_tls_connection, + is_expected_disconnect, +}; use crate::rtr::server::config::RtrServiceConfig; use crate::rtr::server::tls::load_rustls_server_config_with_options; @@ -136,12 +141,21 @@ impl RtrServer { if let Err(err) = handle_tcp_connection(cache, stream, peer_addr, notify_rx, shutdown_rx).await { - warn!( - "RTR TCP session closed with error: peer_addr={}, active_connections={}, err={:?}", - peer_addr, - guard.active_count(), - err - ); + if is_expected_disconnect(&err) { + info!( + "RTR TCP session closed by peer: peer_addr={}, active_connections={}, err={}", + peer_addr, + guard.active_count(), + err + ); + } else { + warn!( + "RTR TCP session closed with error: peer_addr={}, active_connections={}, err={}", + peer_addr, + guard.active_count(), + err + ); + } } else { info!( "RTR TCP session closed cleanly: peer_addr={}, active_connections={}", @@ -250,12 +264,21 @@ impl RtrServer { notify_rx, shutdown_rx, ).await { - warn!( - "RTR TLS session closed with error: peer_addr={}, active_connections={}, err={:?}", - peer_addr, - guard.active_count(), - err - ); + if is_expected_disconnect(&err) { + info!( + "RTR TLS session closed by peer: peer_addr={}, active_connections={}, err={}", + peer_addr, + guard.active_count(), + err + ); + } else { + warn!( + "RTR TLS session closed with error: peer_addr={}, active_connections={}, err={}", + peer_addr, + guard.active_count(), + err + ); + } } else { info!( "RTR TLS session closed cleanly: peer_addr={}, active_connections={}", diff --git a/tests/test_cache.rs b/tests/test_cache.rs index 46de18a..c82897d 100644 --- a/tests/test_cache.rs +++ b/tests/test_cache.rs @@ -15,7 +15,7 @@ use rpki::rtr::cache::{ validate_payload_updates_for_rtr, validate_payloads_for_rtr, }; -use rpki::rtr::payload::{Aspa, Payload, Timing}; +use rpki::rtr::payload::{Aspa, Payload, RouterKey, Ski, Timing}; use rpki::rtr::store::RtrStore; fn delta_to_string(delta: &Delta) -> String { @@ -118,7 +118,7 @@ async fn init_keeps_cache_running_when_file_loader_returns_no_data() { let store = RtrStore::open(dir.path()).unwrap(); let cache = rpki::rtr::cache::RtrCache::default() - .init(&store, 16, Timing::new(600, 600, 7200), || Ok(vec![])) + .init(&store, 16, false, Timing::new(600, 600, 7200), || Ok(vec![])) .unwrap(); assert!(!cache.is_data_available()); @@ -188,7 +188,7 @@ async fn init_restores_wraparound_delta_window_from_store() { .unwrap(); let cache = rpki::rtr::cache::RtrCache::default() - .init(&store, 16, Timing::new(600, 600, 7200), || Ok(Vec::new())) + .init(&store, 16, false, Timing::new(600, 600, 7200), || Ok(Vec::new())) .unwrap(); match cache.get_deltas_since(u32::MAX.wrapping_sub(1)) { @@ -200,6 +200,47 @@ async fn init_restores_wraparound_delta_window_from_store() { } } +#[tokio::test] +async fn update_prunes_delta_window_when_cumulative_delta_size_reaches_snapshot_size() { + let dir = tempfile::tempdir().unwrap(); + let store = RtrStore::open(dir.path()).unwrap(); + let valid_spki = vec![ + 0x30, 0x13, 0x30, 0x0d, 0x06, 0x09, 0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x01, + 0x01, 0x01, 0x05, 0x00, 0x03, 0x02, 0x00, 0x00, + ]; + + let initial_snapshot = Snapshot::from_payloads(vec![Payload::RouterKey(RouterKey::new( + Ski::default(), + Asn::from(64496u32), + valid_spki.clone(), + ))]); + + let mut cache = RtrCacheBuilder::new() + .availability(CacheAvailability::Ready) + .session_ids(SessionIds::from_array([42, 43, 44])) + .serial(1) + .snapshot(initial_snapshot) + .max_delta(16) + .prune_delta_by_snapshot_size(true) + .timing(Timing::new(600, 600, 7200)) + .build(); + + cache + .update(vec![Payload::RouterKey(RouterKey::new( + Ski::from_bytes([1u8; 20]), + Asn::from(64496u32), + valid_spki, + ))], &store) + .unwrap(); + + match cache.get_deltas_since(1) { + SerialResult::ResetRequired => {} + _ => panic!( + "expected delta window to be pruned when cumulative delta size exceeds snapshot size" + ), + } +} + /// Snapshot::diff() ? /// old_snapshot ?new_snapshot announced?withdrawn? #[test] diff --git a/tests/test_ccr.rs b/tests/test_ccr.rs new file mode 100644 index 0000000..10463e6 --- /dev/null +++ b/tests/test_ccr.rs @@ -0,0 +1,71 @@ +use std::fs; +use std::path::PathBuf; + +use rpki::rtr::ccr::{ + ParsedCcrSnapshot, + find_latest_ccr_file, + load_ccr_snapshot_from_file, + snapshot_to_payloads_with_options, +}; +use rpki::rtr::loader::{ParsedAspa, ParsedVrp}; +use tempfile::tempdir; + +fn fixture_path(name: &str) -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("data").join(name) +} + +#[test] +#[ignore = "manual CCR loader smoke test for local samples"] +fn ccr_loader_smoke() { + let snapshot = load_ccr_snapshot_from_file(fixture_path("20260324T000037Z-sng1.ccr")) + .expect("load CCR snapshot"); + + println!("content_type_oid: {}", snapshot.content_type_oid); + println!("produced_at : {:?}", snapshot.produced_at); + println!("vrp_count : {}", snapshot.vrps.len()); + println!("vap_count : {}", snapshot.vaps.len()); + println!("first_vrp : {:?}", snapshot.vrps.first()); + println!("first_vap : {:?}", snapshot.vaps.first()); +} + +#[test] +fn find_latest_ccr_file_picks_latest_name() { + let dir = tempdir().expect("create temp dir"); + let older = dir.path().join("20260324T000037Z-sng1.ccr"); + let newer = dir.path().join("20260324T000138Z-zur1.ccr"); + let ignored = dir.path().join("notes.txt"); + + fs::write(&older, b"older").expect("write older ccr"); + fs::write(&newer, b"newer").expect("write newer ccr"); + fs::write(&ignored, b"ignored").expect("write ignored file"); + + let latest = find_latest_ccr_file(dir.path()).expect("find latest ccr"); + + assert_eq!(latest, newer); +} + +#[test] +fn snapshot_to_payloads_with_options_skips_invalid_aspa_when_not_strict() { + let snapshot = ParsedCcrSnapshot { + content_type_oid: "1.2.840.113549.1.9.16.1.54".to_string(), + produced_at: Some("20260324000037Z".to_string()), + vrps: vec![ParsedVrp { + prefix_addr: "192.0.2.0".parse().expect("parse IPv4"), + prefix_len: 24, + max_len: 24, + asn: 64496, + }], + vaps: vec![ParsedAspa { + customer_asn: 174, + provider_asns: vec![0], + }], + }; + + let conversion = + snapshot_to_payloads_with_options(&snapshot, false).expect("non-strict conversion"); + + assert_eq!(conversion.payloads.len(), 1); + assert!(conversion.invalid_vrps.is_empty()); + assert_eq!(conversion.invalid_vaps.len(), 1); + assert!(conversion.invalid_vaps[0].contains("provider list must not contain AS0")); +}