From 7432fbcac4df34644f3c8073a93d97ec29b51cd3 Mon Sep 17 00:00:00 2001 From: "xiuting.xu" Date: Thu, 26 Mar 2026 10:04:09 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84routerkey=E5=92=8Caspa?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 168 ++++++++++++++------ data/aspas.txt | 3 + data/router-keys.txt | 3 + data/vrps.txt | 9 +- scripts/start-rtr-server-tcp.sh | 20 +++ scripts/start-rtr-server-tls.sh | 26 ++++ scripts/start-rtr-server.sh | 26 ++++ src/bin/rtr_debug_client/README.md | 14 +- src/bin/rtr_debug_client/main.rs | 225 +++++++++++++++++++-------- src/bin/rtr_debug_client/pretty.rs | 84 +++++++--- src/bin/rtr_debug_client/protocol.rs | 14 +- src/main.rs | 37 ++++- src/rtr/loader.rs | 221 ++++++++++++++++++++------ src/rtr/payload.rs | 44 ++++++ src/rtr/pdu.rs | 218 ++++++++++++++++++++++++++ src/rtr/server/listener.rs | 6 - src/rtr/session.rs | 21 ++- tests/test_loader.rs | 123 +++++++++++++++ tests/test_pdu.rs | 60 +++++++ tests/test_session.rs | 38 +++++ 20 files changed, 1156 insertions(+), 204 deletions(-) create mode 100644 data/aspas.txt create mode 100644 data/router-keys.txt create mode 100644 scripts/start-rtr-server-tcp.sh create mode 100644 scripts/start-rtr-server-tls.sh create mode 100644 scripts/start-rtr-server.sh create mode 100644 tests/test_loader.rs diff --git a/README.md b/README.md index 853e4a5..46e3139 100644 --- a/README.md +++ b/README.md @@ -1,64 +1,84 @@ # RPKI RTR Server -Default runtime target: Ubuntu/Linux. Windows is only used during development. - -## Tests - -```bash -cargo test -``` - -To show test output: - -```bash -cargo test -- --nocapture -``` +默认运行目标平台:Ubuntu/Linux。 ## RTR Server -The RTR server binary reads its runtime configuration from environment variables. -If an environment variable is not set, the built-in default from `src/main.rs` -is used. +RTR Server 的运行配置通过环境变量读取。如果某个环境变量没有设置,则使用 +[`src/main.rs`](src/main.rs) 中的内置默认值。 -### Environment Variables +### 环境变量 -| Variable | Description | Example | +| 变量名 | 说明 | 示例 | | --- | --- | --- | -| `RPKI_RTR_ENABLE_TLS` | Enable TLS listener in addition to TCP. Accepts `true/false`, `1/0`, `yes/no`, `on/off`. | `true` | -| `RPKI_RTR_TCP_ADDR` | TCP bind address. | `0.0.0.0:3323` | -| `RPKI_RTR_TLS_ADDR` | TLS bind address. | `0.0.0.0:3324` | -| `RPKI_RTR_DB_PATH` | RTR RocksDB path. | `./rtr-db` | -| `RPKI_RTR_VRP_FILE` | Input VRP file path. | `./data/vrps.txt` | -| `RPKI_RTR_TLS_CERT_PATH` | TLS server certificate path. | `./certs/server.crt` | -| `RPKI_RTR_TLS_KEY_PATH` | TLS server private key path. | `./certs/server.key` | -| `RPKI_RTR_TLS_CLIENT_CA_PATH` | Client CA certificate path used to verify router certificates. | `./certs/client-ca.crt` | -| `RPKI_RTR_MAX_DELTA` | Maximum retained delta count. | `100` | -| `RPKI_RTR_REFRESH_INTERVAL_SECS` | VRP reload interval in seconds. | `300` | -| `RPKI_RTR_MAX_CONNECTIONS` | Maximum concurrent RTR connections. | `512` | -| `RPKI_RTR_NOTIFY_QUEUE_SIZE` | Broadcast queue size for serial notify events. | `1024` | -| `RPKI_RTR_TCP_KEEPALIVE_SECS` | TCP keepalive time in seconds. Set `0` to disable. | `60` | -| `RPKI_RTR_WARN_INSECURE_TCP` | Emit a warning when plain TCP is enabled. Accepts boolean values. | `true` | -| `RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN` | Strict mode: reject TLS server certificates that do not contain a `subjectAltName dNSName`. Accepts boolean values. | `false` | +| `RPKI_RTR_ENABLE_TLS` | 是否额外启用 TLS 监听。支持 `true/false`、`1/0`、`yes/no`、`on/off`。 | `true` | +| `RPKI_RTR_TCP_ADDR` | TCP 监听地址。 | `0.0.0.0:323` | +| `RPKI_RTR_TLS_ADDR` | TLS 监听地址。 | `0.0.0.0:324` | +| `RPKI_RTR_DB_PATH` | RTR 使用的 RocksDB 路径。 | `./rtr-db` | +| `RPKI_RTR_VRP_FILE` | 输入 VRP 文件路径。 | `./data/vrps.txt` | +| `RPKI_RTR_ASPA_FILE` | 输入 ASPA 文件路径。 | `./data/aspas.txt` | +| `RPKI_RTR_ROUTER_KEY_FILE` | 输入 Router Key 文件路径。 | `./data/router-keys.txt` | +| `RPKI_RTR_TLS_CERT_PATH` | TLS 服务端证书路径。 | `./certs/server.crt` | +| `RPKI_RTR_TLS_KEY_PATH` | TLS 服务端私钥路径。 | `./certs/server.key` | +| `RPKI_RTR_TLS_CLIENT_CA_PATH` | 用于校验 router 客户端证书的 CA 证书路径。 | `./certs/client-ca.crt` | +| `RPKI_RTR_MAX_DELTA` | 保留的最大 delta 条数。 | `100` | +| `RPKI_RTR_REFRESH_INTERVAL_SECS` | 重新加载 VRP 文件的时间间隔,单位秒。 | `300` | +| `RPKI_RTR_MAX_CONNECTIONS` | 最大并发 RTR 连接数。 | `512` | +| `RPKI_RTR_NOTIFY_QUEUE_SIZE` | Serial Notify 广播队列大小。 | `1024` | +| `RPKI_RTR_TCP_KEEPALIVE_SECS` | TCP keepalive 时间,单位秒。设为 `0` 表示禁用。 | `60` | +| `RPKI_RTR_WARN_INSECURE_TCP` | 纯 TCP 模式下是否输出不安全告警。支持布尔值。 | `true` | +| `RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN` | 严格模式:如果 TLS 服务端证书不包含 `subjectAltName dNSName`,则拒绝启动。支持布尔值。 | `false` | -### Notes +### 说明 -- Plain TCP should only be used on a trusted and controlled network. -- TLS mode requires client certificate authentication. -- In strict TLS server certificate mode, a server certificate without - `subjectAltName dNSName` will be rejected during startup. -- `RPKI_RTR_TCP_KEEPALIVE_SECS=0` disables TCP keepalive. Any non-zero value - enables keepalive for the lifetime of each accepted socket. +- 纯 TCP 模式只应部署在受信任、可控的网络环境中。 +- TLS 模式要求客户端证书认证。 +- 开启严格 TLS 服务端证书模式后,如果服务端证书缺少 `subjectAltName dNSName`,启动时会被拒绝。 +- `RPKI_RTR_TCP_KEEPALIVE_SECS=0` 表示关闭 TCP keepalive;非零值表示在连接整个生命周期内启用 keepalive。 -## Example Startup +## 启动示例 ### Bash +纯 TCP 模式: + ```sh -export RPKI_RTR_ENABLE_TLS=true -export RPKI_RTR_TCP_ADDR=0.0.0.0:3323 -export RPKI_RTR_TLS_ADDR=0.0.0.0:3324 +sh ./scripts/start-rtr-server-tcp.sh +``` + +TLS / mutual TLS 模式: + +```sh +sh ./scripts/start-rtr-server-tls.sh +``` + +如果你想手动设置环境变量,也可以直接这样启动。 + +#### 纯 TCP + +```sh +export RPKI_RTR_ENABLE_TLS=false +export RPKI_RTR_TCP_ADDR=0.0.0.0:323 export RPKI_RTR_DB_PATH=./rtr-db export RPKI_RTR_VRP_FILE=./data/vrps.txt +export RPKI_RTR_ASPA_FILE=./data/aspas.txt +export RPKI_RTR_ROUTER_KEY_FILE=./data/router-keys.txt +export RPKI_RTR_TCP_KEEPALIVE_SECS=60 +export RPKI_RTR_WARN_INSECURE_TCP=true + +cargo run +``` + +#### TLS / mutual TLS + +```sh +export RPKI_RTR_ENABLE_TLS=true +export RPKI_RTR_TCP_ADDR=0.0.0.0:323 +export RPKI_RTR_TLS_ADDR=0.0.0.0:324 +export RPKI_RTR_DB_PATH=./rtr-db +export RPKI_RTR_VRP_FILE=./data/vrps.txt +export RPKI_RTR_ASPA_FILE=./data/aspas.txt +export RPKI_RTR_ROUTER_KEY_FILE=./data/router-keys.txt export RPKI_RTR_TLS_CERT_PATH=./certs/server-dns.crt export RPKI_RTR_TLS_KEY_PATH=./certs/server-dns.key export RPKI_RTR_TLS_CLIENT_CA_PATH=./certs/client-ca.crt @@ -69,5 +89,61 @@ export RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN=true cargo run ``` -A ready-to-edit example script is provided at -[`scripts/start-rtr-server.sh`](/C:/Users/xuxiu/git_code/rpki/scripts/start-rtr-server.sh). +可直接修改的示例脚本见: +- [`scripts/start-rtr-server-tcp.sh`](scripts/start-rtr-server-tcp.sh) +- [`scripts/start-rtr-server-tls.sh`](scripts/start-rtr-server-tls.sh) +- [`scripts/start-rtr-server.sh`](scripts/start-rtr-server.sh) + +### ASPA 文件格式 + +`RPKI_RTR_ASPA_FILE` 当前使用简单文本格式: + +```text +# customer_asn,provider_asn [provider_asn ...] +64496,64497 64498 +64497,64500 +``` + +### Router Key 文件格式 + +`RPKI_RTR_ROUTER_KEY_FILE` 当前使用简单文本格式: + +```text +# ski_hex,asn,spki_hex +00112233445566778899aabbccddeeff00112233,64496,3013300d06092a864886f70d010101050003020000 +8899aabbccddeeff00112233445566778899aabb,64497,cafebabe +``` + +## RTR Client + +调试用 RTR client 位于: +- [`src/bin/rtr_debug_client/main.rs`](src/bin/rtr_debug_client/main.rs) + +它的说明文档位于: +- [`src/bin/rtr_debug_client/README.md`](src/bin/rtr_debug_client/README.md) + +### Client 启动示例 + +连接纯 TCP RTR server: + +```sh +cargo run --bin rtr_debug_client -- 127.0.0.1:323 1 reset +``` + +连接 TLS RTR server: + +```sh +cargo run --bin rtr_debug_client -- \ + 127.0.0.1:324 1 reset \ + --tls \ + --ca-cert ./certs/client-ca.crt \ + --server-name localhost \ + --client-cert ./certs/client-good.crt \ + --client-key ./certs/client-good.key +``` + +如果要持续观察错误后的行为,可以加: + +```sh +--keep-after-error +``` diff --git a/data/aspas.txt b/data/aspas.txt new file mode 100644 index 0000000..7ea98bc --- /dev/null +++ b/data/aspas.txt @@ -0,0 +1,3 @@ +# customer_asn,provider_asn [provider_asn ...] +64496,64497 64498 +64497,64500 diff --git a/data/router-keys.txt b/data/router-keys.txt new file mode 100644 index 0000000..f833e46 --- /dev/null +++ b/data/router-keys.txt @@ -0,0 +1,3 @@ +# ski_hex,asn,spki_hex +00112233445566778899aabbccddeeff00112233,64496,3013300d06092a864886f70d010101050003020000 +8899aabbccddeeff00112233445566778899aabb,64497,3013300d06092a864886f70d010101050003020000 diff --git a/data/vrps.txt b/data/vrps.txt index b479756..0e7078f 100644 --- a/data/vrps.txt +++ b/data/vrps.txt @@ -1,4 +1,7 @@ # prefix,max_len,asn -10.0.0.0/24,25,65001 -10.0.1.0/24,24,65022 -2001:db8::/32,64,65003 \ No newline at end of file +10.0.0.0/24,24,65001 +10.0.1.0/24,24,65002 +10.0.2.0/24,24,65004 +192.0.2.0/24,24,64496 +2001:db8::/32,48,65003 +2001:db8:1::/48,48,65007 \ No newline at end of file diff --git a/scripts/start-rtr-server-tcp.sh b/scripts/start-rtr-server-tcp.sh new file mode 100644 index 0000000..f430cb2 --- /dev/null +++ b/scripts/start-rtr-server-tcp.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env sh +set -eu + +export RPKI_RTR_ENABLE_TLS=false +export RPKI_RTR_TCP_ADDR=0.0.0.0:323 + +export RPKI_RTR_DB_PATH=./rtr-db +export RPKI_RTR_VRP_FILE=./data/vrps.txt +export RPKI_RTR_ASPA_FILE=./data/aspas.txt +export RPKI_RTR_ROUTER_KEY_FILE=./data/router-keys.txt + +export RPKI_RTR_MAX_DELTA=100 +export RPKI_RTR_REFRESH_INTERVAL_SECS=300 +export RPKI_RTR_MAX_CONNECTIONS=512 +export RPKI_RTR_NOTIFY_QUEUE_SIZE=1024 + +export RPKI_RTR_TCP_KEEPALIVE_SECS=60 +export RPKI_RTR_WARN_INSECURE_TCP=true + +cargo run diff --git a/scripts/start-rtr-server-tls.sh b/scripts/start-rtr-server-tls.sh new file mode 100644 index 0000000..af1b2aa --- /dev/null +++ b/scripts/start-rtr-server-tls.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env sh +set -eu + +export RPKI_RTR_ENABLE_TLS=true +export RPKI_RTR_TCP_ADDR=0.0.0.0:323 +export RPKI_RTR_TLS_ADDR=0.0.0.0:324 + +export RPKI_RTR_DB_PATH=./rtr-db +export RPKI_RTR_VRP_FILE=./data/vrps.txt +export RPKI_RTR_ASPA_FILE=./data/aspas.txt +export RPKI_RTR_ROUTER_KEY_FILE=./data/router-keys.txt + +export RPKI_RTR_TLS_CERT_PATH=./certs/server-dns.crt +export RPKI_RTR_TLS_KEY_PATH=./certs/server-dns.key +export RPKI_RTR_TLS_CLIENT_CA_PATH=./certs/client-ca.crt + +export RPKI_RTR_MAX_DELTA=100 +export RPKI_RTR_REFRESH_INTERVAL_SECS=300 +export RPKI_RTR_MAX_CONNECTIONS=512 +export RPKI_RTR_NOTIFY_QUEUE_SIZE=1024 + +export RPKI_RTR_TCP_KEEPALIVE_SECS=60 +export RPKI_RTR_WARN_INSECURE_TCP=true +export RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN=true + +cargo run diff --git a/scripts/start-rtr-server.sh b/scripts/start-rtr-server.sh new file mode 100644 index 0000000..af1b2aa --- /dev/null +++ b/scripts/start-rtr-server.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env sh +set -eu + +export RPKI_RTR_ENABLE_TLS=true +export RPKI_RTR_TCP_ADDR=0.0.0.0:323 +export RPKI_RTR_TLS_ADDR=0.0.0.0:324 + +export RPKI_RTR_DB_PATH=./rtr-db +export RPKI_RTR_VRP_FILE=./data/vrps.txt +export RPKI_RTR_ASPA_FILE=./data/aspas.txt +export RPKI_RTR_ROUTER_KEY_FILE=./data/router-keys.txt + +export RPKI_RTR_TLS_CERT_PATH=./certs/server-dns.crt +export RPKI_RTR_TLS_KEY_PATH=./certs/server-dns.key +export RPKI_RTR_TLS_CLIENT_CA_PATH=./certs/client-ca.crt + +export RPKI_RTR_MAX_DELTA=100 +export RPKI_RTR_REFRESH_INTERVAL_SECS=300 +export RPKI_RTR_MAX_CONNECTIONS=512 +export RPKI_RTR_NOTIFY_QUEUE_SIZE=1024 + +export RPKI_RTR_TCP_KEEPALIVE_SECS=60 +export RPKI_RTR_WARN_INSECURE_TCP=true +export RPKI_RTR_REQUIRE_TLS_SERVER_DNS_NAME_SAN=true + +cargo run diff --git a/src/bin/rtr_debug_client/README.md b/src/bin/rtr_debug_client/README.md index 8ad68a4..9b744c7 100644 --- a/src/bin/rtr_debug_client/README.md +++ b/src/bin/rtr_debug_client/README.md @@ -59,7 +59,7 @@ cargo run --bin rtr_debug_client -- [reset|serial ``` 默认值: -- `addr`: `127.0.0.1:3323` +- `addr`: `127.0.0.1:323` - `version`: `1` - `mode`: `reset` - `timeout`: `30` @@ -70,19 +70,19 @@ cargo run --bin rtr_debug_client -- [reset|serial 发送 `Reset Query`: ```sh -cargo run --bin rtr_debug_client -- 127.0.0.1:3323 1 reset +cargo run --bin rtr_debug_client -- 127.0.0.1:323 1 reset ``` 发送 `Serial Query`: ```sh -cargo run --bin rtr_debug_client -- 127.0.0.1:3323 1 serial 42 100 +cargo run --bin rtr_debug_client -- 127.0.0.1:323 1 serial 42 100 ``` 持续观察错误路径: ```sh -cargo run --bin rtr_debug_client -- 127.0.0.1:3323 1 reset --keep-after-error +cargo run --bin rtr_debug_client -- 127.0.0.1:323 1 reset --keep-after-error ``` ## TLS 示例 @@ -91,7 +91,7 @@ cargo run --bin rtr_debug_client -- 127.0.0.1:3323 1 reset --keep-after-error ```sh cargo run --bin rtr_debug_client -- \ - 127.0.0.1:3324 1 reset \ + 127.0.0.1:324 1 reset \ --tls \ --ca-cert tests/fixtures/tls/client-ca.crt \ --server-name localhost @@ -101,7 +101,7 @@ cargo run --bin rtr_debug_client -- \ ```sh cargo run --bin rtr_debug_client -- \ - 127.0.0.1:3324 1 reset \ + 127.0.0.1:324 1 reset \ --tls \ --ca-cert tests/fixtures/tls/client-ca.crt \ --server-name localhost \ @@ -113,7 +113,7 @@ cargo run --bin rtr_debug_client -- \ ```sh cargo run --bin rtr_debug_client -- \ - 127.0.0.1:3324 1 reset \ + 127.0.0.1:324 1 reset \ --tls \ --ca-cert tests/fixtures/tls/client-ca.crt \ --server-name localhost \ diff --git a/src/bin/rtr_debug_client/main.rs b/src/bin/rtr_debug_client/main.rs index 3fb6ae3..c59eb0e 100644 --- a/src/bin/rtr_debug_client/main.rs +++ b/src/bin/rtr_debug_client/main.rs @@ -16,7 +16,7 @@ mod protocol; use crate::wire::{read_pdu, send_reset_query, send_serial_query}; use crate::pretty::{ - parse_end_of_data_info, parse_serial_notify_serial, print_pdu, + parse_end_of_data_info, parse_serial_notify_serial, print_pdu, print_raw_pdu, }; use crate::protocol::{PduHeader, PduType, QueryMode}; @@ -53,10 +53,6 @@ async fn main() -> io::Result<()> { println!(); print_help(); - let stream = connect_stream(&config).await?; - println!("connected to {}", config.addr); - - let (mut reader, mut writer) = tokio_io::split(stream); let mut state = ClientState::new( config.version, config.read_timeout_secs, @@ -64,85 +60,168 @@ async fn main() -> io::Result<()> { config.keep_after_error, ); - match config.mode { - QueryMode::Reset => { - send_reset_query(&mut writer, config.version).await?; - println!("sent Reset Query"); - } - QueryMode::Serial { session_id, serial } => { - state.session_id = Some(session_id); - state.serial = Some(serial); - send_serial_query(&mut writer, config.version, session_id, serial).await?; - println!("sent Serial Query"); - } - } - - state.schedule_next_poll(); - println!(); - let stdin = tokio::io::stdin(); let mut stdin_lines = BufReader::new(stdin).lines(); loop { - let poll_sleep = tokio::time::sleep_until(state.poll_deadline()); - tokio::pin!(poll_sleep); + let stream = loop { + match connect_stream(&config).await { + Ok(stream) => { + println!("connected to {}", config.addr); + break stream; + } + Err(err) => { + let delay = state.reconnect_delay_secs(); + eprintln!( + "connect failed: {}. retry after {}s", + err, + delay + ); + tokio::time::sleep(Duration::from_secs(delay)).await; + } + } + }; - tokio::select! { - line = stdin_lines.next_line() => { - match line { - Ok(Some(line)) => { - let should_quit = handle_console_command( - &line, - &mut writer, - &mut state, - ).await?; + let (mut reader, mut writer) = tokio_io::split(stream); + send_resume_query(&mut writer, &mut state, &config.mode).await?; + state.schedule_next_poll(); + println!(); - if should_quit { - println!("quit requested, closing client."); - break; + let reconnect = loop { + let poll_sleep = tokio::time::sleep_until(state.poll_deadline()); + tokio::pin!(poll_sleep); + + tokio::select! { + line = stdin_lines.next_line() => { + match line { + Ok(Some(line)) => { + match handle_console_command( + &line, + &mut writer, + &mut state, + ).await { + Ok(should_quit) => { + if should_quit { + println!("quit requested, closing client."); + return Ok(()); + } + } + Err(err) if should_reconnect(&err) => { + eprintln!("command failed due to disconnected transport: {}", err); + break true; + } + Err(err) => return Err(err), + } + } + Ok(None) => { + println!("stdin closed, continue network loop."); + } + Err(err) => { + eprintln!("read stdin failed: {}", err); } } - Ok(None) => { - println!("stdin closed, continue network loop."); + } + + _ = &mut poll_sleep => { + match handle_poll_tick(&mut writer, &mut state).await { + Ok(()) => state.schedule_next_poll(), + Err(err) if should_reconnect(&err) => { + eprintln!("auto poll failed due to disconnected transport: {}", err); + break true; + } + Err(err) => return Err(err), } - Err(err) => { - eprintln!("read stdin failed: {}", err); + } + + read_result = timeout( + Duration::from_secs(state.read_timeout_secs), + read_pdu(&mut reader) + ) => { + match read_result { + Ok(Ok(pdu)) => { + print_raw_pdu(&pdu.header, &pdu.body); + print_pdu(&pdu.header, &pdu.body); + match handle_incoming_pdu(&mut writer, &mut state, &pdu.header, &pdu.body).await { + Ok(()) => {} + Err(err) if should_reconnect(&err) => { + eprintln!("connection dropped while handling incoming PDU: {}", err); + break true; + } + Err(err) => return Err(err), + } + } + Ok(Err(err)) => { + eprintln!("read PDU failed: {}", err); + if should_reconnect(&err) { + break true; + } + return Err(err); + } + Err(_) => { + println!( + "[timeout] no PDU received in {}s, connection kept open.", + state.read_timeout_secs + ); + } } } } + }; - _ = &mut poll_sleep => { - handle_poll_tick(&mut writer, &mut state).await?; - state.schedule_next_poll(); - } - - read_result = timeout( - Duration::from_secs(state.read_timeout_secs), - read_pdu(&mut reader) - ) => { - match read_result { - Ok(Ok(pdu)) => { - print_pdu(&pdu.header, &pdu.body); - handle_incoming_pdu(&mut writer, &mut state, &pdu.header, &pdu.body).await?; - } - Ok(Err(err)) => { - eprintln!("read PDU failed: {}", err); - return Err(err); - } - Err(_) => { - println!( - "[timeout] no PDU received in {}s, connection kept open.", - state.read_timeout_secs - ); - } - } - } + if reconnect { + let delay = state.reconnect_delay_secs(); + state.current_session_id = None; + println!( + "[reconnect] transport disconnected, retry after {}s", + delay + ); + tokio::time::sleep(Duration::from_secs(delay)).await; } } +} + +async fn send_resume_query( + writer: &mut ClientWriter, + state: &mut ClientState, + mode: &QueryMode, +) -> io::Result<()> { + match (state.session_id, state.serial) { + (Some(session_id), Some(serial)) => { + println!( + "reconnected, send Serial Query with session_id={}, serial={}", + session_id, + serial + ); + send_serial_query(writer, state.version, session_id, serial).await?; + } + _ => match mode { + QueryMode::Reset => { + send_reset_query(writer, state.version).await?; + println!("sent Reset Query"); + } + QueryMode::Serial { session_id, serial } => { + state.session_id = Some(*session_id); + state.serial = Some(*serial); + send_serial_query(writer, state.version, *session_id, *serial).await?; + println!("sent Serial Query"); + } + }, + } Ok(()) } +fn should_reconnect(err: &io::Error) -> bool { + matches!( + err.kind(), + io::ErrorKind::UnexpectedEof + | io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionReset + | io::ErrorKind::BrokenPipe + | io::ErrorKind::NotConnected + ) +} + async fn handle_incoming_pdu( writer: &mut ClientWriter, state: &mut ClientState, @@ -590,6 +669,16 @@ impl ClientState { "default" } } + + fn reconnect_delay_secs(&self) -> u64 { + if self.should_prefer_retry_poll() { + self.retry + .map(|v| v as u64) + .unwrap_or(self.default_poll_secs) + } else { + self.default_poll_secs + } + } } #[derive(Debug)] @@ -615,7 +704,9 @@ impl Config { while let Some(arg) = args.next() { match arg.as_str() { "--tls" => { - transport = TransportConfig::Tls(TlsConfig::default()); + if matches!(transport, TransportConfig::Tcp) { + transport = TransportConfig::Tls(TlsConfig::default()); + } } "--ca-cert" => { let path = args.next().ok_or_else(|| { @@ -670,7 +761,7 @@ impl Config { let addr = positional .next() - .unwrap_or_else(|| "127.0.0.1:3323".to_string()); + .unwrap_or_else(|| "127.0.0.1:323".to_string()); let version = positional .next() diff --git a/src/bin/rtr_debug_client/pretty.rs b/src/bin/rtr_debug_client/pretty.rs index b44ae49..401caab 100644 --- a/src/bin/rtr_debug_client/pretty.rs +++ b/src/bin/rtr_debug_client/pretty.rs @@ -220,20 +220,18 @@ fn print_serial_query(header: &PduHeader, body: &[u8]) { println!("serial : {}", serial); } -fn print_router_key(header: &PduHeader, body: &[u8]) { - println!("session_id : {}", header.session_id()); - +fn print_router_key(_header: &PduHeader, body: &[u8]) { if body.len() < ROUTER_KEY_FIXED_BODY_LEN { println!("invalid Router Key body length: {}", body.len()); println!("raw body : {}", hex_bytes(body)); return; } - let flags = body[0]; - let zero = body[1]; - let ski = &body[2..22]; - let asn = u32::from_be_bytes([body[22], body[23], body[24], body[25]]); - let spki = &body[26..]; + let flags = _header.flags(); + let zero = _header.zero(); + let ski = &body[0..20]; + let asn = u32::from_be_bytes([body[20], body[21], body[22], body[23]]); + let spki = &body[24..]; println!("flags : 0x{:02x} ({})", flags, flag_meaning(flags)); println!("zero : {}", zero); @@ -241,6 +239,32 @@ fn print_router_key(header: &PduHeader, body: &[u8]) { println!("asn : {}", asn); println!("spki_len : {}", spki.len()); println!("spki : {}", hex_bytes(spki)); + + if flags & !0x01 != 0 { + println!("warning : Router Key flags use reserved bits"); + } + if zero != 0 { + println!("warning : Router Key reserved zero octet is non-zero"); + } + if asn == 0 { + println!("warning : Router Key ASN is AS0"); + } + if spki.is_empty() { + println!("warning : Router Key SPKI is empty"); + } +} + +pub fn print_raw_pdu(header: &PduHeader, body: &[u8]) { + let mut raw = Vec::with_capacity(8 + body.len()); + raw.push(header.version); + raw.push(header.pdu_type_raw); + raw.extend_from_slice(&header.field1.to_be_bytes()); + raw.extend_from_slice(&header.length.to_be_bytes()); + raw.extend_from_slice(body); + + println!("--------------------------------------------------"); + println!("[raw] pdu_type : {}", header.pdu_type()); + println!("[raw] octets : {}", hex_bytes(&raw)); } fn error_code_name(code: u16) -> &'static str { @@ -271,26 +295,22 @@ fn parse_encapsulated_header(encapsulated: &[u8]) -> Option { Some(PduHeader::from_bytes(header)) } -fn print_aspa(header: &PduHeader, body: &[u8]) { - println!("session_id : {}", header.session_id()); - +fn print_aspa(_header: &PduHeader, body: &[u8]) { if body.len() < ASPA_FIXED_BODY_LEN { println!("invalid ASPA body length: {}", body.len()); println!("raw body : {}", hex_bytes(body)); return; } - let flags = body[0]; - let zero1 = body[1]; - let zero2 = body[2]; - let zero3 = body[3]; - let customer_asn = u32::from_be_bytes([body[4], body[5], body[6], body[7]]); + let flags = _header.flags(); + let zero = _header.zero(); + let customer_asn = u32::from_be_bytes([body[0], body[1], body[2], body[3]]); println!("flags : 0x{:02x} ({})", flags, flag_meaning(flags)); - println!("reserved : [{}, {}, {}]", zero1, zero2, zero3); + println!("reserved : {}", zero); println!("customer_as : {}", customer_asn); - let providers_raw = &body[8..]; + let providers_raw = &body[4..]; if providers_raw.len() % 4 != 0 { println!("invalid ASPA providers length: {}", providers_raw.len()); println!("providers : {}", hex_bytes(providers_raw)); @@ -303,6 +323,34 @@ fn print_aspa(header: &PduHeader, body: &[u8]) { } println!("providers : {:?}", providers); + + if flags & !0x01 != 0 { + println!("warning : ASPA flags use reserved bits"); + } + if zero != 0 { + println!("warning : ASPA reserved zero octet is non-zero"); + } + if customer_asn == 0 { + println!("warning : ASPA customer ASN is AS0"); + } + + let is_announcement = flags & 0x01 == 0x01; + if is_announcement { + if providers.is_empty() { + println!("warning : ASPA announcement has empty provider list"); + } + } else if !providers.is_empty() { + println!("warning : ASPA withdrawal must not contain providers"); + } + + if providers.iter().any(|asn| *asn == 0) { + println!("warning : ASPA provider list contains AS0"); + } + + let strictly_increasing = providers.windows(2).all(|pair| pair[0] < pair[1]); + if !strictly_increasing && providers.len() > 1 { + println!("warning : ASPA providers are not in strictly increasing order"); + } } pub fn parse_serial_notify_serial(body: &[u8]) -> Option { diff --git a/src/bin/rtr_debug_client/protocol.rs b/src/bin/rtr_debug_client/protocol.rs index eecd881..29217b6 100644 --- a/src/bin/rtr_debug_client/protocol.rs +++ b/src/bin/rtr_debug_client/protocol.rs @@ -2,14 +2,14 @@ use std::fmt; pub const HEADER_LEN: usize = 8; pub const SERIAL_QUERY_LEN: usize = 12; -pub const MAX_PDU_LEN: u32 = 1024 * 1024; // 1 MiB +pub const MAX_PDU_LEN: u32 = 65_535; pub const IPV4_PREFIX_BODY_LEN: usize = 12; pub const IPV6_PREFIX_BODY_LEN: usize = 24; pub const END_OF_DATA_V0_BODY_LEN: usize = 4; pub const END_OF_DATA_V1_BODY_LEN: usize = 16; -pub const ROUTER_KEY_FIXED_BODY_LEN: usize = 26; -pub const ASPA_FIXED_BODY_LEN: usize = 8; +pub const ROUTER_KEY_FIXED_BODY_LEN: usize = 24; +pub const ASPA_FIXED_BODY_LEN: usize = 4; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum QueryMode { @@ -126,6 +126,14 @@ impl PduHeader { pub fn error_code(&self) -> u16 { self.field1 } + + pub fn flags(&self) -> u8 { + self.field1.to_be_bytes()[0] + } + + pub fn zero(&self) -> u8 { + self.field1.to_be_bytes()[1] + } } #[derive(Debug, Clone)] diff --git a/src/main.rs b/src/main.rs index 1b9cf0d..47b0478 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use tokio::task::JoinHandle; use tracing::{info, warn}; use rpki::rtr::cache::{RtrCache, SharedRtrCache}; -use rpki::rtr::loader::load_vrps_from_file; +use rpki::rtr::loader::{load_aspas_from_file, load_router_keys_from_file, load_vrps_from_file}; use rpki::rtr::payload::Timing; use rpki::rtr::server::{RtrNotifier, RtrService, RtrServiceConfig, RunningRtrService}; use rpki::rtr::store::RtrStore; @@ -21,6 +21,8 @@ struct AppConfig { db_path: String, vrp_file: String, + aspa_file: String, + router_key_file: String, tls_cert_path: String, tls_key_path: String, tls_client_ca_path: String, @@ -35,17 +37,19 @@ impl Default for AppConfig { fn default() -> Self { Self { enable_tls: false, - tcp_addr: "0.0.0.0:3323".parse().expect("invalid default tcp_addr"), - tls_addr: "0.0.0.0:3324".parse().expect("invalid default tls_addr"), + tcp_addr: "0.0.0.0:323".parse().expect("invalid default tcp_addr"), + tls_addr: "0.0.0.0:324".parse().expect("invalid default tls_addr"), db_path: "./rtr-db".to_string(), vrp_file: r"C:\Users\xuxiu\git_code\rpki\data\vrps.txt".to_string(), + aspa_file: "./data/aspas.txt".to_string(), + router_key_file: "./data/router-keys.txt".to_string(), tls_cert_path: "./certs/server.crt".to_string(), tls_key_path: "./certs/server.key".to_string(), tls_client_ca_path: "./certs/client-ca.crt".to_string(), max_delta: 100, - refresh_interval: Duration::from_secs(10), + refresh_interval: Duration::from_secs(300), service_config: RtrServiceConfig { max_connections: 512, @@ -81,6 +85,12 @@ impl AppConfig { if let Some(value) = env_var("RPKI_RTR_VRP_FILE")? { config.vrp_file = value; } + if let Some(value) = env_var("RPKI_RTR_ASPA_FILE")? { + config.aspa_file = value; + } + if let Some(value) = env_var("RPKI_RTR_ROUTER_KEY_FILE")? { + config.router_key_file = value; + } if let Some(value) = env_var("RPKI_RTR_TLS_CERT_PATH")? { config.tls_cert_path = value; } @@ -176,7 +186,7 @@ fn init_shared_cache(config: &AppConfig, store: &RtrStore) -> Result JoinHandle<()> { let refresh_interval = config.refresh_interval; let vrp_file = config.vrp_file.clone(); + let aspa_file = config.aspa_file.clone(); + let router_key_file = config.router_key_file.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(refresh_interval); @@ -227,7 +239,7 @@ fn spawn_refresh_task( loop { interval.tick().await; - match load_vrps_from_file(&vrp_file) { + match load_payloads_from_files(&vrp_file, &aspa_file, &router_key_file) { Ok(payloads) => { let payload_count = payloads.len(); let updated = { @@ -303,6 +315,8 @@ fn log_startup_config(config: &AppConfig) { } info!("vrp_file={}", config.vrp_file); + info!("aspa_file={}", config.aspa_file); + info!("router_key_file={}", config.router_key_file); info!("max_delta={}", config.max_delta); info!( "refresh_interval_secs={}", @@ -357,3 +371,14 @@ fn parse_bool(value: &str, name: &str) -> Result { _ => Err(anyhow!("invalid {} '{}': expected boolean", name, value)), } } + +fn load_payloads_from_files( + vrp_file: &str, + aspa_file: &str, + router_key_file: &str, +) -> Result> { + let mut payloads = load_vrps_from_file(vrp_file)?; + payloads.extend(load_aspas_from_file(aspa_file)?); + payloads.extend(load_router_keys_from_file(router_key_file)?); + Ok(payloads) +} diff --git a/src/rtr/loader.rs b/src/rtr/loader.rs index 35633b4..ce37bdb 100644 --- a/src/rtr/loader.rs +++ b/src/rtr/loader.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, Context, Result}; use crate::data_model::resources::as_resources::Asn; use crate::data_model::resources::ip_resources::{IPAddress, IPAddressPrefix}; -use crate::rtr::payload::{Payload, RouteOrigin}; +use crate::rtr::payload::{Aspa, Payload, RouteOrigin, RouterKey, Ski}; #[derive(Debug, Clone, PartialEq, Eq)] pub struct ParsedVrp { @@ -17,6 +17,19 @@ pub struct ParsedVrp { pub asn: u32, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ParsedAspa { + pub customer_asn: u32, + pub provider_asns: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ParsedRouterKey { + pub ski: [u8; 20], + pub asn: u32, + pub spki: Vec, +} + /// 从文本文件中加载 VRP,并转换成 RTR Payload::RouteOrigin。 /// /// 文件格式: @@ -52,6 +65,56 @@ pub fn load_vrps_from_file(path: impl AsRef) -> Result> { Ok(payloads) } +pub fn load_aspas_from_file(path: impl AsRef) -> Result> { + let path = path.as_ref(); + + let content = fs::read_to_string(path) + .with_context(|| format!("failed to read ASPA file: {}", path.display()))?; + + let mut payloads = Vec::new(); + + for (idx, raw_line) in content.lines().enumerate() { + let line_no = idx + 1; + let line = raw_line.trim(); + + if line.is_empty() || line.starts_with('#') { + continue; + } + + let aspa = parse_aspa_line(line) + .with_context(|| format!("invalid ASPA line {}: {}", line_no, raw_line))?; + + payloads.push(Payload::Aspa(build_aspa(aspa)?)); + } + + Ok(payloads) +} + +pub fn load_router_keys_from_file(path: impl AsRef) -> Result> { + let path = path.as_ref(); + + let content = fs::read_to_string(path) + .with_context(|| format!("failed to read Router Key file: {}", path.display()))?; + + let mut payloads = Vec::new(); + + for (idx, raw_line) in content.lines().enumerate() { + let line_no = idx + 1; + let line = raw_line.trim(); + + if line.is_empty() || line.starts_with('#') { + continue; + } + + let router_key = parse_router_key_line(line) + .with_context(|| format!("invalid Router Key line {}: {}", line_no, raw_line))?; + + payloads.push(Payload::RouterKey(build_router_key(router_key)?)); + } + + Ok(payloads) +} + /// 解析单行 VRP。 /// /// 格式: @@ -93,6 +156,59 @@ pub fn parse_vrp_line(line: &str) -> Result { }) } +pub fn parse_aspa_line(line: &str) -> Result { + let parts: Vec<_> = line.split(',').map(|s| s.trim()).collect(); + if parts.len() != 2 { + return Err(anyhow!( + "expected format: , [provider_asn ...]" + )); + } + + let customer_asn = u32::from_str(parts[0]) + .with_context(|| format!("invalid customer_asn: {}", parts[0]))?; + + let provider_asns = parts[1] + .split_whitespace() + .map(|provider| { + u32::from_str(provider) + .with_context(|| format!("invalid provider_asn: {}", provider)) + }) + .collect::>>()?; + + validate_aspa(customer_asn, &provider_asns)?; + + Ok(ParsedAspa { + customer_asn, + provider_asns, + }) +} + +pub fn parse_router_key_line(line: &str) -> Result { + let parts: Vec<_> = line.split(',').map(|s| s.trim()).collect(); + if parts.len() != 3 { + return Err(anyhow!( + "expected format: ,," + )); + } + + let ski_vec = decode_hex(parts[0]) + .with_context(|| format!("invalid SKI hex: {}", parts[0]))?; + if ski_vec.len() != 20 { + return Err(anyhow!("SKI must be exactly 20 bytes")); + } + let mut ski = [0u8; 20]; + ski.copy_from_slice(&ski_vec); + + let asn = u32::from_str(parts[1]) + .with_context(|| format!("invalid asn: {}", parts[1]))?; + let spki = decode_hex(parts[2]) + .with_context(|| format!("invalid SPKI hex: {}", parts[2]))?; + + validate_router_key(asn, &spki)?; + + Ok(ParsedRouterKey { ski, asn, spki }) +} + fn validate_vrp(prefix_addr: IpAddr, prefix_len: u8, max_len: u8) -> Result<()> { match prefix_addr { IpAddr::V4(_) => { @@ -121,6 +237,33 @@ fn validate_vrp(prefix_addr: IpAddr, prefix_len: u8, max_len: u8) -> Result<()> Ok(()) } +fn validate_aspa(customer_asn: u32, provider_asns: &[u32]) -> Result<()> { + if customer_asn == 0 { + return Err(anyhow!("customer_asn must not be AS0")); + } + + if provider_asns.is_empty() { + return Err(anyhow!("provider list must not be empty")); + } + + if provider_asns.iter().any(|asn| *asn == 0) { + return Err(anyhow!("provider list must not contain AS0")); + } + + Ok(()) +} + +fn validate_router_key(asn: u32, spki: &[u8]) -> Result<()> { + crate::rtr::payload::RouterKey::new( + Ski::default(), + Asn::from(asn), + spki.to_vec(), + ) + .validate() + .map_err(|err| anyhow!(err.to_string()))?; + Ok(()) +} + pub fn build_route_origin(vrp: ParsedVrp) -> Result { let address = match vrp.prefix_addr { IpAddr::V4(addr) => IPAddress::from_ipv4(addr), @@ -133,53 +276,37 @@ pub fn build_route_origin(vrp: ParsedVrp) -> Result { Ok(RouteOrigin::new(prefix, vrp.max_len, asn)) } -#[cfg(test)] -mod tests { - use super::*; +pub fn build_aspa(aspa: ParsedAspa) -> Result { + let customer_asn = Asn::from(aspa.customer_asn); + let provider_asns = aspa + .provider_asns + .into_iter() + .map(Asn::from) + .collect::>(); + let aspa = Aspa::new(customer_asn, provider_asns); + aspa.validate_announcement()?; + Ok(aspa) +} - #[test] - fn parse_ipv4_vrp_line() { - let got = parse_vrp_line("10.0.0.0/24,24,65001").unwrap(); - assert_eq!( - got, - ParsedVrp { - prefix_addr: IpAddr::from_str("10.0.0.0").unwrap(), - prefix_len: 24, - max_len: 24, - asn: 65001, - } - ); +pub fn build_router_key(router_key: ParsedRouterKey) -> Result { + let asn = Asn::from(router_key.asn); + let ski = Ski::from_bytes(router_key.ski); + let router_key = RouterKey::new(ski, asn, router_key.spki); + Ok(router_key) +} + +fn decode_hex(input: &str) -> Result> { + let trimmed = input.trim(); + if trimmed.len() % 2 != 0 { + return Err(anyhow!("hex string must have even length")); } - #[test] - fn parse_ipv6_vrp_line() { - let got = parse_vrp_line("2001:db8::/32,48,65003").unwrap(); - assert_eq!( - got, - ParsedVrp { - prefix_addr: IpAddr::from_str("2001:db8::").unwrap(), - prefix_len: 32, - max_len: 48, - asn: 65003, - } - ); - } + (0..trimmed.len()) + .step_by(2) + .map(|idx| { + u8::from_str_radix(&trimmed[idx..idx + 2], 16) + .map_err(|err| anyhow!("invalid hex at byte {}: {}", idx / 2, err)) + }) + .collect() +} - #[test] - fn parse_rejects_invalid_max_len() { - let err = parse_vrp_line("10.0.0.0/24,16,65001").unwrap_err(); - assert!(err.to_string().contains("max_len")); - } - - #[test] - fn parse_rejects_invalid_ip() { - let err = parse_vrp_line("10.0.0.999/24,24,65001").unwrap_err(); - assert!(err.to_string().contains("invalid IP")); - } - - #[test] - fn parse_rejects_invalid_format() { - let err = parse_vrp_line("10.0.0.0/24,24").unwrap_err(); - assert!(err.to_string().contains("expected format")); - } -} \ No newline at end of file diff --git a/src/rtr/payload.rs b/src/rtr/payload.rs index c1f393e..826599e 100644 --- a/src/rtr/payload.rs +++ b/src/rtr/payload.rs @@ -4,6 +4,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use crate::data_model::resources::as_resources::Asn; use crate::data_model::resources::ip_resources::IPAddressPrefix; +use x509_parser::prelude::FromDer; +use x509_parser::x509::SubjectPublicKeyInfo; #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)] @@ -23,6 +25,12 @@ impl AsRef<[u8]> for Ski { } } +impl Ski { + pub fn from_bytes(bytes: [u8; 20]) -> Self { + Self(bytes) + } +} + #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] pub struct RouteOrigin { prefix: IPAddressPrefix, @@ -80,6 +88,42 @@ impl RouterKey { pub fn spki(&self) -> &[u8] { &self.subject_public_key_info } + + pub fn validate(&self) -> Result<(), io::Error> { + if self.asn.into_u32() == 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "RouterKey ASN must not be AS0", + )); + } + + if self.subject_public_key_info.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "RouterKey SPKI must not be empty", + )); + } + + let (rem, _) = SubjectPublicKeyInfo::from_der(&self.subject_public_key_info) + .map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("RouterKey SPKI is not valid DER: {err}"), + ) + })?; + + if !rem.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "RouterKey SPKI DER has trailing bytes: {}", + rem.len() + ), + )); + } + + Ok(()) + } } #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] diff --git a/src/rtr/pdu.rs b/src/rtr/pdu.rs index c7bdf20..998986c 100644 --- a/src/rtr/pdu.rs +++ b/src/rtr/pdu.rs @@ -333,6 +333,8 @@ impl HeaderWithFlags { pub fn flags(self) -> Flags{Flags(self.flags)} + pub fn zero(self) -> u8 { self.zero } + pub fn length(self) -> u32{u32::from_be(self.length)} } @@ -968,6 +970,58 @@ impl RouterKey { pub const PDU: u8 = 9; const BASE_LEN: usize = HEADER_LEN + 20 + 4; + pub async fn read( + sock: &mut Sock + ) -> Result { + let header = HeaderWithFlags::read(sock) + .await + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?; + if header.pdu() != Self::PDU { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "PDU type mismatch when expecting RouterKey", + )); + } + Self::read_payload(header, sock).await + } + + pub async fn read_payload( + header: HeaderWithFlags, + sock: &mut Sock, + ) -> Result { + let total_len = usize::try_from(header.length()).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidData, + "RouterKey PDU too large for this system to handle", + ) + })?; + if total_len < Self::BASE_LEN { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid length for RouterKey PDU", + )); + } + + let body_len = total_len - HEADER_LEN; + let mut body = vec![0u8; body_len]; + sock.read_exact(&mut body).await?; + + let mut ski = [0u8; 20]; + ski.copy_from_slice(&body[..20]); + let asn = Asn::from(u32::from_be_bytes(body[20..24].try_into().unwrap())); + let subject_public_key_info = Arc::<[u8]>::from(body[24..].to_vec()); + + let res = Self { + header, + flags: header.flags(), + ski: Ski::from_bytes(ski), + asn, + subject_public_key_info, + }; + res.validate()?; + Ok(res) + } + pub async fn write( &self, w: &mut A, @@ -1024,6 +1078,46 @@ impl RouterKey { pub fn spki(&self) -> &[u8] { &self.subject_public_key_info } + + fn validate(&self) -> Result<(), io::Error> { + if self.header.pdu() != Self::PDU { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "unexpected PDU type for RouterKey", + )); + } + if usize::try_from(self.header.length()).unwrap_or(0) < Self::BASE_LEN { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "RouterKey PDU shorter than fixed wire size", + )); + } + if self.header.zero() != 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "RouterKey reserved zero octet must be zero", + )); + } + if self.header.flags().into_u8() & !0x01 != 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "RouterKey flags use reserved bits", + )); + } + if self.asn.into_u32() == 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "RouterKey ASN must not be AS0", + )); + } + if self.subject_public_key_info.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "RouterKey SPKI must not be empty", + )); + } + Ok(()) + } } @@ -1041,6 +1135,63 @@ impl Aspa { pub const PDU: u8 = 11; const BASE_LEN: usize = HEADER_LEN + 4; + pub async fn read( + sock: &mut Sock + ) -> Result { + let header = HeaderWithFlags::read(sock) + .await + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?; + if header.pdu() != Self::PDU { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "PDU type mismatch when expecting ASPA", + )); + } + Self::read_payload(header, sock).await + } + + pub async fn read_payload( + header: HeaderWithFlags, + sock: &mut Sock, + ) -> Result { + let total_len = usize::try_from(header.length()).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidData, + "ASPA PDU too large for this system to handle", + ) + })?; + if total_len < Self::BASE_LEN { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid length for ASPA PDU", + )); + } + if (total_len - Self::BASE_LEN) % 4 != 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ASPA provider list length must be a multiple of four octets", + )); + } + + let body_len = total_len - HEADER_LEN; + let mut body = vec![0u8; body_len]; + sock.read_exact(&mut body).await?; + + let customer_asn = u32::from_be_bytes(body[..4].try_into().unwrap()); + let mut provider_asns = Vec::with_capacity((body.len() - 4) / 4); + for chunk in body[4..].chunks_exact(4) { + provider_asns.push(u32::from_be_bytes(chunk.try_into().unwrap())); + } + + let res = Self { + header, + customer_asn, + provider_asns, + }; + res.validate()?; + Ok(res) + } + pub async fn write( &self, w: &mut A, @@ -1086,6 +1237,73 @@ impl Aspa { } } + fn validate(&self) -> Result<(), io::Error> { + if self.header.pdu() != Self::PDU { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "unexpected PDU type for ASPA", + )); + } + let total_len = usize::try_from(self.header.length()).unwrap_or(0); + if total_len < Self::BASE_LEN { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ASPA PDU shorter than fixed wire size", + )); + } + if (total_len - Self::BASE_LEN) % 4 != 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ASPA provider list length must be a multiple of four octets", + )); + } + if self.header.zero() != 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ASPA reserved zero octet must be zero", + )); + } + if self.header.flags().into_u8() & !0x01 != 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ASPA flags use reserved bits", + )); + } + if self.customer_asn == 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ASPA customer ASN must not be AS0", + )); + } + + let is_announcement = self.header.flags().is_announce(); + if is_announcement && self.provider_asns.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ASPA announcement must contain at least one provider ASN", + )); + } + if !is_announcement && !self.provider_asns.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ASPA withdrawal must not contain provider ASNs", + )); + } + if self.provider_asns.iter().any(|asn| *asn == 0) { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ASPA provider list must not contain AS0", + )); + } + if self.provider_asns.windows(2).any(|pair| pair[0] >= pair[1]) { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "ASPA provider ASNs must be strictly increasing", + )); + } + + Ok(()) + } } diff --git a/src/rtr/server/listener.rs b/src/rtr/server/listener.rs index 28fe940..db46fa5 100644 --- a/src/rtr/server/listener.rs +++ b/src/rtr/server/listener.rs @@ -98,9 +98,6 @@ impl RtrServer { } }; - if let Err(err) = stream.set_nodelay(true) { - warn!("failed to enable TCP_NODELAY for {}: {}", peer_addr, err); - } if let Err(err) = apply_keepalive(&stream, self.config.tcp_keepalive) { warn!("failed to configure TCP keepalive for {}: {}", peer_addr, err); } @@ -209,9 +206,6 @@ impl RtrServer { } }; - if let Err(err) = stream.set_nodelay(true) { - warn!("failed to enable TCP_NODELAY for {}: {}", peer_addr, err); - } if let Err(err) = apply_keepalive(&stream, self.config.tcp_keepalive) { warn!("failed to configure TCP keepalive for {}: {}", peer_addr, err); } diff --git a/src/rtr/session.rs b/src/rtr/session.rs index ff56c31..f5e09c3 100644 --- a/src/rtr/session.rs +++ b/src/rtr/session.rs @@ -3,7 +3,7 @@ use std::time::{Duration, Instant}; use anyhow::{anyhow, bail, Result}; use tokio::io; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::sync::{broadcast, watch}; use tokio::time::timeout; use tracing::{debug, error, info, warn}; @@ -77,6 +77,12 @@ where } pub async fn run(mut self) -> Result<()> { + let result = self.run_inner().await; + self.close_transport().await; + result + } + + async fn run_inner(&mut self) -> Result<()> { info!( "RTR session started: {}", self.session_summary() @@ -251,6 +257,18 @@ where } } + async fn close_transport(&mut self) { + if let Err(err) = self.stream.shutdown().await { + debug!( + "RTR session transport shutdown returned error: err={}, {}", + err, + self.session_summary() + ); + } else { + debug!("RTR session transport shutdown completed: {}", self.session_summary()); + } + } + async fn negotiate_version(&mut self, router_version: u8) -> io::Result { if let Some(current) = self.version { if current == router_version { @@ -958,6 +976,7 @@ where async fn send_router_key(&mut self, key: &RouterKey, announce: bool) -> Result<()> { let version = self.version()?; + key.validate()?; let flags = Flags::new(if announce { ANNOUNCE_FLAG diff --git a/tests/test_loader.rs b/tests/test_loader.rs new file mode 100644 index 0000000..ebcd2f1 --- /dev/null +++ b/tests/test_loader.rs @@ -0,0 +1,123 @@ +use std::net::IpAddr; +use std::str::FromStr; + +use rpki::rtr::loader::{ + parse_aspa_line, parse_router_key_line, parse_vrp_line, ParsedAspa, ParsedVrp, +}; + +#[test] +fn parse_ipv4_vrp_line() { + let got = parse_vrp_line("10.0.0.0/24,24,65001").unwrap(); + assert_eq!( + got, + ParsedVrp { + prefix_addr: IpAddr::from_str("10.0.0.0").unwrap(), + prefix_len: 24, + max_len: 24, + asn: 65001, + } + ); +} + +#[test] +fn parse_ipv6_vrp_line() { + let got = parse_vrp_line("2001:db8::/32,48,65003").unwrap(); + assert_eq!( + got, + ParsedVrp { + prefix_addr: IpAddr::from_str("2001:db8::").unwrap(), + prefix_len: 32, + max_len: 48, + asn: 65003, + } + ); +} + +#[test] +fn parse_rejects_invalid_max_len() { + let err = parse_vrp_line("10.0.0.0/24,16,65001").unwrap_err(); + assert!(err.to_string().contains("max_len")); +} + +#[test] +fn parse_rejects_invalid_ip() { + let err = parse_vrp_line("10.0.0.999/24,24,65001").unwrap_err(); + assert!(err.to_string().contains("invalid IP")); +} + +#[test] +fn parse_rejects_invalid_format() { + let err = parse_vrp_line("10.0.0.0/24,24").unwrap_err(); + assert!(err.to_string().contains("expected format")); +} + +#[test] +fn parse_aspa_line_ok() { + let got = parse_aspa_line("64496,64497 64498").unwrap(); + assert_eq!( + got, + ParsedAspa { + customer_asn: 64496, + provider_asns: vec![64497, 64498], + } + ); +} + +#[test] +fn parse_aspa_rejects_empty_provider_list() { + let err = parse_aspa_line("64496,").unwrap_err(); + assert!(err.to_string().contains("provider list")); +} + +#[test] +fn parse_aspa_rejects_as0() { + let err = parse_aspa_line("0,64497").unwrap_err(); + assert!(err.to_string().contains("AS0")); + + let err = parse_aspa_line("64496,0").unwrap_err(); + assert!(err.to_string().contains("AS0")); +} + +#[test] +fn parse_router_key_line_ok() { + let got = parse_router_key_line( + "00112233445566778899aabbccddeeff00112233,64496,3013300d06092a864886f70d010101050003020000", + ) + .unwrap(); + assert_eq!(got.asn, 64496); + assert_eq!( + got.ski, + [ + 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, + 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33, + ] + ); + assert_eq!( + got.spki, + vec![ + 0x30, 0x13, 0x30, 0x0d, 0x06, 0x09, 0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x01, + 0x01, 0x01, 0x05, 0x00, 0x03, 0x02, 0x00, 0x00, + ] + ); +} + +#[test] +fn parse_router_key_rejects_invalid_ski_length() { + let err = parse_router_key_line("0011,64496,deadbeef").unwrap_err(); + assert!(err.to_string().contains("SKI")); +} + +#[test] +fn parse_router_key_rejects_empty_spki() { + let err = + parse_router_key_line("00112233445566778899aabbccddeeff00112233,64496,").unwrap_err(); + assert!(err.to_string().contains("SPKI")); +} + +#[test] +fn parse_router_key_rejects_invalid_spki_der() { + let err = + parse_router_key_line("00112233445566778899aabbccddeeff00112233,64496,deadbeef") + .unwrap_err(); + assert!(err.to_string().contains("valid DER")); +} diff --git a/tests/test_pdu.rs b/tests/test_pdu.rs index 06c2f87..36de498 100644 --- a/tests/test_pdu.rs +++ b/tests/test_pdu.rs @@ -113,6 +113,22 @@ async fn router_key_length_matches_wire_size() { assert_eq!(header.length(), 8 + 20 + 4 + 32); } +#[tokio::test] +async fn router_key_read_rejects_reserved_zero_octet() { + let (mut client, mut server) = duplex(1024); + let mut bytes = vec![1, RouterKey::PDU, 1, 1]; + bytes.extend_from_slice(&(8u32 + 20 + 4 + 4).to_be_bytes()); + bytes.extend_from_slice(&[0u8; 20]); + bytes.extend_from_slice(&64496u32.to_be_bytes()); + bytes.extend_from_slice(&[1, 2, 3, 4]); + + client.write_all(&bytes).await.unwrap(); + + let err = RouterKey::read(&mut server).await.unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); + assert!(err.to_string().contains("zero octet")); +} + #[tokio::test] async fn aspa_length_matches_wire_size() { let pdu = Aspa::new(2, Flags::new(1), 64496, vec![64497, 64498]); @@ -127,6 +143,37 @@ async fn aspa_length_matches_wire_size() { assert_eq!(header.length(), 8 + 4 + 8); } +#[tokio::test] +async fn aspa_read_rejects_unsorted_provider_list() { + let (mut client, mut server) = duplex(1024); + let mut bytes = vec![2, Aspa::PDU, 1, 0]; + bytes.extend_from_slice(&(20u32).to_be_bytes()); + bytes.extend_from_slice(&64496u32.to_be_bytes()); + bytes.extend_from_slice(&64498u32.to_be_bytes()); + bytes.extend_from_slice(&64497u32.to_be_bytes()); + + client.write_all(&bytes).await.unwrap(); + + let err = Aspa::read(&mut server).await.unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); + assert!(err.to_string().contains("strictly increasing")); +} + +#[tokio::test] +async fn aspa_read_rejects_withdraw_with_providers() { + let (mut client, mut server) = duplex(1024); + let mut bytes = vec![2, Aspa::PDU, 0, 0]; + bytes.extend_from_slice(&(16u32).to_be_bytes()); + bytes.extend_from_slice(&64496u32.to_be_bytes()); + bytes.extend_from_slice(&64497u32.to_be_bytes()); + + client.write_all(&bytes).await.unwrap(); + + let err = Aspa::read(&mut server).await.unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); + assert!(err.to_string().contains("withdrawal")); +} + #[test] fn aspa_announcement_rejects_empty_provider_list() { let err = PayloadAspa::new(Asn::from(64496u32), vec![]) @@ -136,6 +183,19 @@ fn aspa_announcement_rejects_empty_provider_list() { assert!(err.to_string().contains("at least one provider")); } +#[test] +fn router_key_payload_rejects_invalid_spki() { + let err = rpki::rtr::payload::RouterKey::new( + Ski::default(), + Asn::from(64496u32), + vec![0xde, 0xad, 0xbe, 0xef], + ) + .validate() + .unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); + assert!(err.to_string().contains("valid DER")); +} + #[test] fn aspa_announcement_rejects_as0() { let err = PayloadAspa::new(Asn::from(0u32), vec![Asn::from(64497u32)]) diff --git a/tests/test_session.rs b/tests/test_session.rs index fd2d719..08d1a3a 100644 --- a/tests/test_session.rs +++ b/tests/test_session.rs @@ -1780,3 +1780,41 @@ async fn invalid_aspa_prevents_snapshot_response() { let err = join.expect_err("session should fail on invalid ASPA"); assert!(err.to_string().contains("ASPA announcement")); } + +#[tokio::test] +async fn invalid_router_key_prevents_snapshot_response() { + let snapshot = Snapshot::from_payloads(vec![Payload::RouterKey(RouterKey::new( + Ski::default(), + Asn::from(64496u32), + vec![0xde, 0xad, 0xbe, 0xef], + ))]); + let cache = RtrCacheBuilder::new() + .session_ids(SessionIds::from_array([42, 42, 42])) + .serial(100) + .timing(Timing::new(600, 600, 7200)) + .snapshot(snapshot) + .build(); + + let server_cache = shared_cache(cache); + let (addr, shutdown_tx, server_handle) = start_session_server_returning_result(server_cache).await; + + let mut client = TcpStream::connect(addr).await.unwrap(); + ResetQuery::new(1).write(&mut client).await.unwrap(); + + let response = CacheResponse::read(&mut client).await.unwrap(); + assert_eq!(response.version(), 1); + assert_eq!(response.session_id(), 42); + + let read_res = timeout(Duration::from_secs(1), Header::read(&mut client)) + .await + .expect("timed out waiting for server close"); + assert!(read_res.is_err(), "server should close instead of sending invalid RouterKey"); + + let _ = shutdown_tx.send(true); + let join = timeout(Duration::from_secs(1), server_handle) + .await + .expect("server task did not exit within timeout") + .unwrap(); + let err = join.expect_err("session should fail on invalid RouterKey"); + assert!(err.to_string().contains("RouterKey SPKI")); +}