diff --git a/Cargo.toml b/Cargo.toml index 65d879d..b487b65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,5 +31,6 @@ rustls = "0.23" rustls-pemfile = "2" rustls-pki-types = "1.14.0" socket2 = "0.5" +russh = { version = "0.60.0", default-features = false, features = ["ring", "rsa"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } -rpki_rs = { package = "rpki", version = "0.18", features = ["rtr", "crypto"] } +rpki_rs = { package = "rpki", version = "0.19.2", features = ["rtr", "crypto"] } diff --git a/README.md b/README.md index 566ab2a..92de252 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,14 @@ RTR Server 运行时从 `CCR` 目录中扫描最新的 `.ccr` 文件作为输入 | `RPKI_RTR_TLS_CERT_PATH` | TLS 服务端证书路径。 | `./certs/server.crt` | `./certs/server-dns.crt` | | `RPKI_RTR_TLS_KEY_PATH` | TLS 服务端私钥路径。 | `./certs/server.key` | `./certs/server-dns.key` | | `RPKI_RTR_TLS_CLIENT_CA_PATH` | 用于校验 router 客户端证书的 CA 证书路径。 | `./certs/client-ca.crt` | `./certs/client-ca.crt` | +| `RPKI_RTR_ENABLE_SSH` | 是否额外启用进程内原生 SSH 监听。支持 `true/false`、`1/0`、`yes/no`、`on/off`。 | `false` | `true` | +| `RPKI_RTR_SSH_ADDR` | SSH 监听地址。 | `0.0.0.0:22` | `0.0.0.0:22` | +| `RPKI_RTR_SSH_PORT` | SSH 监听端口(仅覆盖 `RPKI_RTR_SSH_ADDR` 中的端口)。 | `22` | `2022` | +| `RPKI_RTR_SSH_HOST_KEY_PATH` | OpenSSH host 私钥路径。 | `./certs/ssh_host_ed25519_key` | `./certs/ssh_host_ed25519_key` | +| `RPKI_RTR_SSH_AUTHORIZED_KEYS_PATH` | 允许接入的 router 公钥列表(authorized_keys)。 | `./certs/rtr-authorized_keys` | `./certs/rtr-authorized_keys` | +| `RPKI_RTR_SSH_USERNAME` | SSH 用户名白名单。 | `rpki-rtr` | `rpki-rtr` | +| `RPKI_RTR_SSH_SUBSYSTEM_NAME` | SSH 子系统名称。 | `rpki-rtr` | `rpki-rtr` | +| `RPKI_RTR_SSH_PASSWORD` | 可选的 SSH password 认证口令。未设置时仅允许 publickey;设置后同时允许 publickey 与 password。 | `未设置` | `test-password` | | `RPKI_RTR_MAX_DELTA` | 最多保留多少条 delta。 | `100` | `100` | | `RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE` | 是否启用“累计 delta 估算 wire size 不小于 snapshot 时,继续裁剪最老 delta”的策略。 | `false` | `false` | | `RPKI_RTR_STRICT_CCR_VALIDATION` | 是否对 CCR 中的非法 VRP / VAP 采用严格模式;`true` 表示整份 CCR 拒绝,`false` 表示跳过非法项并告警。 | `false` | `false` | @@ -113,36 +121,24 @@ docker compose -f deploy/server/docker-compose.yml down ### 本地运行(推荐先用脚本) -```sh -sh ./scripts/start-rtr-server-tcp.sh -sh ./scripts/start-rtr-server-tls.sh +```bash +docker compose -f deploy/server/docker-compose.yml up -d --build +docker compose -f deploy/server/docker-compose.yml logs -f rpki-rtr +docker compose -f deploy/server/docker-compose.yml down ``` -脚本入口: - -- [`scripts/start-rtr-server-tcp.sh`](scripts/start-rtr-server-tcp.sh) -- [`scripts/start-rtr-server-tls.sh`](scripts/start-rtr-server-tls.sh) -- [`scripts/start-rtr-server.sh`](scripts/start-rtr-server.sh) - ### 本地手动运行(最小示例) 纯 TCP: -```sh -export RPKI_RTR_ENABLE_TLS=false -export RPKI_RTR_CCR_DIR=./data -cargo run --bin rpki +```bash +docker compose -f deploy/server/docker-compose.yml up -d --build ``` TLS / mutual TLS: -```sh -export RPKI_RTR_ENABLE_TLS=true -export RPKI_RTR_CCR_DIR=./data -export RPKI_RTR_TLS_CERT_PATH=./certs/server-dns.crt -export RPKI_RTR_TLS_KEY_PATH=./certs/server-dns.key -export RPKI_RTR_TLS_CLIENT_CA_PATH=./certs/client-ca.crt -cargo run --bin rpki +```bash +docker compose -f deploy/server/docker-compose.yml -f deploy/server/docker-compose.tls.yml up -d --build ``` ## CCR 输入说明 @@ -174,16 +170,6 @@ cargo run --bin rpki 2. 再用 `rpki-rs-test-client` 做可重复的自动化步骤校验。 3. 最后用 `FRR` 做黑盒互通验证,确认真实客户端接入行为。 -### rtr_debug_client(本地) - -```sh -cargo run --bin rtr_debug_client -- 127.0.0.1:323 1 reset -``` - -说明: -- 适合手工排查:你可以快速切换 TCP/TLS、版本号、请求类型来观察响应差异。 -- 适合问题定位:当服务端日志出现异常时,可用最小参数复现问题流量。 - ### rtr_debug_client(Docker) ```bash @@ -372,3 +358,100 @@ docker exec -it frr-rpki-client vtysh -c "show rpki prefix-table" - `deploy/server/DEPLOYMENT.md` - `deploy/frr/README.md` - `deploy/frr/README.zh.md` + +## 传输模式说明(TCP / TLS / 原生 SSH) + +当前 `rpki` 进程内支持三种传输: + +- TCP(默认开启) +- TLS(可选开启,mTLS) +- 原生 SSH(可选开启,进程内实现,不再使用外部 `sshd Subsystem` 桥接) + +### TCP 模式 + +默认监听: +- `RPKI_RTR_TCP_ADDR=0.0.0.0:323` + +最小启动(仅 TCP): + +```sh +export RPKI_RTR_ENABLE_TLS=false +export RPKI_RTR_ENABLE_SSH=false +cargo run --bin rpki +``` + +说明: +- 仅建议部署在受信任、可控网络中。 + +### TLS 模式(mTLS) + +相关环境变量: +- `RPKI_RTR_ENABLE_TLS=true` +- `RPKI_RTR_TLS_ADDR`(默认 `0.0.0.0:324`) +- `RPKI_RTR_TLS_CERT_PATH` +- `RPKI_RTR_TLS_KEY_PATH` +- `RPKI_RTR_TLS_CLIENT_CA_PATH` + +最小启动(TCP + TLS): + +```sh +export RPKI_RTR_ENABLE_TLS=true +export RPKI_RTR_ENABLE_SSH=false +export RPKI_RTR_TLS_CERT_PATH=./certs/server-dns.crt +export RPKI_RTR_TLS_KEY_PATH=./certs/server-dns.key +export RPKI_RTR_TLS_CLIENT_CA_PATH=./certs/client-ca.crt +cargo run --bin rpki +``` + +### 原生 SSH 模式(进程内) + +与 `draft-ietf-sidrops-8210bis-25` 对齐要点: +- 使用 SSHv2 +- 使用 subsystem(默认 `rpki-rtr`) +- 使用 public key 认证 +- 服务端拒绝 `none` +- 服务端默认不启用 `password`,配置 `RPKI_RTR_SSH_PASSWORD` 后可选启用(draft 中为 MAY) + +相关环境变量: +- `RPKI_RTR_ENABLE_SSH=true` +- `RPKI_RTR_SSH_ADDR`(默认 `0.0.0.0:22`) +- `RPKI_RTR_SSH_PORT`(默认 `22`,设置后会覆盖 `RPKI_RTR_SSH_ADDR` 的端口) +- `RPKI_RTR_SSH_HOST_KEY_PATH`(OpenSSH host 私钥) +- `RPKI_RTR_SSH_AUTHORIZED_KEYS_PATH`(允许接入的 router 公钥列表) +- `RPKI_RTR_SSH_USERNAME`(默认 `rpki-rtr`) +- `RPKI_RTR_SSH_SUBSYSTEM_NAME`(默认 `rpki-rtr`) +- `RPKI_RTR_SSH_PASSWORD`(可选;设置后启用 password 认证) + +密钥准备示例: + +```sh +ssh-keygen -t ed25519 -N '' -f ./certs/ssh_host_ed25519_key +ssh-keygen -t ed25519 -N '' -f ./certs/rtr_client_ed25519_key +cp ./certs/rtr_client_ed25519_key.pub ./certs/rtr-authorized_keys +``` + +最小启动(TCP + SSH): + +```sh +export RPKI_RTR_ENABLE_TLS=false +export RPKI_RTR_ENABLE_SSH=true +export RPKI_RTR_SSH_ADDR=0.0.0.0:22 +# or only override the port: +# export RPKI_RTR_SSH_PORT=2022 +export RPKI_RTR_SSH_HOST_KEY_PATH=./certs/ssh_host_ed25519_key +export RPKI_RTR_SSH_AUTHORIZED_KEYS_PATH=./certs/rtr-authorized_keys +export RPKI_RTR_SSH_USERNAME=rpki-rtr +export RPKI_RTR_SSH_SUBSYSTEM_NAME=rpki-rtr +# 可选:启用 password 认证(同时仍支持 publickey) +# export RPKI_RTR_SSH_PASSWORD=test-password +cargo run --bin rpki +``` + +连通性检查(OpenSSH): + +```sh +ssh -i ./certs/rtr_client_ed25519_key -p 22 -s rpki-rtr@127.0.0.1 rpki-rtr +``` + +说明: +- 该命令主要用于验证 SSH 子系统通道可建立,不等价于完整 RTR 协议回归测试。 diff --git a/deploy/README.md b/deploy/README.md index 8673706..093207c 100644 --- a/deploy/README.md +++ b/deploy/README.md @@ -166,3 +166,43 @@ docker compose -f deploy/frr/docker-compose.yml down ```bash docker compose -f deploy/frr/docker-compose.yml logs -f frr-rpki-client ``` + +--- + +## 5) BIRD Client + +路径: +- `deploy/bird/Dockerfile` +- `deploy/bird/docker-compose.yml` +- `deploy/bird/docker-compose.tls.yml` +- `deploy/bird/bird.conf.example` +- `deploy/bird/bird.conf.tls.example` +- `deploy/bird/README.md` +- `deploy/bird/README.zh.md` + +启动: +```bash +docker compose -f deploy/bird/docker-compose.yml up --build +``` + +观察活动: + +```bash +docker logs -f bird-rpki-client +``` + +停止: +```bash +docker compose -f deploy/bird/docker-compose.yml down +``` + +日志: +```bash +docker compose -f deploy/bird/docker-compose.yml logs -f bird-rpki-client +``` + +TLS/mTLS: +```bash +docker compose -f deploy/bird/docker-compose.yml -f deploy/bird/docker-compose.tls.yml up --build +docker logs -f bird-rpki-client +``` diff --git a/deploy/bird/Dockerfile b/deploy/bird/Dockerfile new file mode 100644 index 0000000..0ff4b4d --- /dev/null +++ b/deploy/bird/Dockerfile @@ -0,0 +1,10 @@ +FROM debian:bookworm-slim + +RUN apt-get update \ + && apt-get install -y --no-install-recommends bird2 ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +COPY entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh + +ENTRYPOINT ["/entrypoint.sh"] diff --git a/deploy/bird/README.md b/deploy/bird/README.md new file mode 100644 index 0000000..b80c356 --- /dev/null +++ b/deploy/bird/README.md @@ -0,0 +1,75 @@ +# BIRD Minimal RTR Client Config + +This folder provides a minimal BIRD setup for black-box interop testing +against this repository's RTR server defaults. + +Server defaults in this repo: +- TCP: `0.0.0.0:323` +- TLS: `0.0.0.0:324` + +## Files + +- `Dockerfile`: builds a minimal BIRD2 runtime image. +- `bird.conf.example`: sample `/etc/bird/bird.conf`. +- `bird.conf.tls.example`: sample TLS/mTLS `/etc/bird/bird.conf`. +- `entrypoint.sh`: starts BIRD in foreground mode. +- `docker-compose.yml`: one-click local TCP test client. +- `docker-compose.tls.yml`: compose override for TLS/mTLS. + +By default, the container prints periodic RPKI protocol snapshots to logs +every 5 seconds. + +## Docker quick start + +From repository root: + +```bash +docker compose -f deploy/bird/docker-compose.yml up --build +``` + +Use another terminal to inspect: + +```bash +docker logs -f bird-rpki-client +``` + +If protocol state is `up`, the RTR client path is working. + +Detached mode: + +```bash +docker compose -f deploy/bird/docker-compose.yml up -d --build +docker logs -f bird-rpki-client +``` + +Stop: + +```bash +docker compose -f deploy/bird/docker-compose.yml down +``` + +## TLS/mTLS quick start + +```bash +docker compose \ + -f deploy/bird/docker-compose.yml \ + -f deploy/bird/docker-compose.tls.yml \ + up --build +``` + +In detached mode, observe with: + +```bash +docker logs -f bird-rpki-client +``` + +## Notes + +- This setup targets RTR over TCP (`remote "127.0.0.1" port 323`). +- `network_mode: host` expects your RTR server to be reachable at + `127.0.0.1:323` from the Docker host. +- TLS override mounts `../../certs` into `/etc/bird/certs`. +- Observation is controlled by env vars: + `OBSERVE_INTERVAL` (seconds, default `5`) and `OBSERVE_PROTO`. +- If your environment does not support Docker host networking, switch to a + bridge network and replace `remote` addresses accordingly. diff --git a/deploy/bird/README.zh.md b/deploy/bird/README.zh.md new file mode 100644 index 0000000..7c9ba0a --- /dev/null +++ b/deploy/bird/README.zh.md @@ -0,0 +1,63 @@ +# BIRD 最小化 RTR 客户端配置 + +本目录提供一个最小化 BIRD 配置,用于和本仓库 RTR Server 做黑盒互通测试。 + +本仓库默认 RTR 监听地址: +- TCP: `0.0.0.0:323` +- TLS: `0.0.0.0:324` + +## 文件说明 + +- `Dockerfile`: 构建最小 BIRD2 运行镜像。 +- `bird.conf.example`: `/etc/bird/bird.conf` 的 TCP 示例。 +- `bird.conf.tls.example`: `/etc/bird/bird.conf` 的 TLS/mTLS 示例。 +- `entrypoint.sh`: 前台启动 BIRD。 +- `docker-compose.yml`: TCP 一键启动。 +- `docker-compose.tls.yml`: TLS/mTLS 覆盖文件。 + +容器默认每 5 秒向日志输出一次 RPKI 协议状态快照。 + +## Docker 快速启动(TCP) + +在仓库根目录执行: + +```bash +docker compose -f deploy/bird/docker-compose.yml up --build +``` + +另开一个终端查看日志: + +```bash +docker logs -f bird-rpki-client +``` + +如果协议状态显示 `up`,说明 RTR 客户端链路正常。 + +后台模式: + +```bash +docker compose -f deploy/bird/docker-compose.yml up -d --build +docker logs -f bird-rpki-client +``` + +停止: + +```bash +docker compose -f deploy/bird/docker-compose.yml down +``` + +## TLS/mTLS 快速启动 + +```bash +docker compose \ + -f deploy/bird/docker-compose.yml \ + -f deploy/bird/docker-compose.tls.yml \ + up --build +``` + +## 说明 + +- 当前 compose 使用 `network_mode: host`,要求容器可通过 `127.0.0.1` 访问宿主机 RTR Server。 +- TLS 覆盖文件会把 `../../certs` 挂载到容器内 `/etc/bird/certs`。 +- 观测频率由环境变量控制:`OBSERVE_INTERVAL`(秒,默认 `5`)和 `OBSERVE_PROTO`。 +- 若你运行在 Docker Desktop(非 Linux 原生 host network 场景),建议改为自定义 bridge 网络并把 `remote` 地址改成可达的 server 容器名或宿主地址。 diff --git a/deploy/bird/bird.conf.example b/deploy/bird/bird.conf.example new file mode 100644 index 0000000..5e75a5e --- /dev/null +++ b/deploy/bird/bird.conf.example @@ -0,0 +1,15 @@ +log stderr all; +router id 192.0.2.2; + +roa4 table rtr_roa_v4; +roa6 table rtr_roa_v6; + +protocol device { +} + +protocol rpki rpki_tcp { + roa4 { table rtr_roa_v4; }; + roa6 { table rtr_roa_v6; }; + + remote "127.0.0.1" port 323; +} diff --git a/deploy/bird/bird.conf.tls.example b/deploy/bird/bird.conf.tls.example new file mode 100644 index 0000000..c1ee97e --- /dev/null +++ b/deploy/bird/bird.conf.tls.example @@ -0,0 +1,21 @@ +log stderr all; +router id 192.0.2.2; + +roa4 table rtr_roa_v4; +roa6 table rtr_roa_v6; + +protocol device { +} + +protocol rpki rpki_tls { + roa4 { table rtr_roa_v4; }; + roa6 { table rtr_roa_v6; }; + + remote "127.0.0.1" port 324; + + transport tls { + ca file "/etc/bird/certs/client-ca.crt"; + cert file "/etc/bird/certs/client-good.crt"; + key file "/etc/bird/certs/client-good.key"; + }; +} diff --git a/deploy/bird/docker-compose.tls.yml b/deploy/bird/docker-compose.tls.yml new file mode 100644 index 0000000..4a14666 --- /dev/null +++ b/deploy/bird/docker-compose.tls.yml @@ -0,0 +1,7 @@ +services: + bird-rpki-client: + environment: + OBSERVE_PROTO: rpki_tls + volumes: + - ./bird.conf.tls.example:/etc/bird/bird.conf:ro + - ../../certs:/etc/bird/certs:ro diff --git a/deploy/bird/docker-compose.yml b/deploy/bird/docker-compose.yml new file mode 100644 index 0000000..897a4f5 --- /dev/null +++ b/deploy/bird/docker-compose.yml @@ -0,0 +1,13 @@ +services: + bird-rpki-client: + build: + context: . + dockerfile: Dockerfile + container_name: bird-rpki-client + restart: unless-stopped + network_mode: host + environment: + OBSERVE_INTERVAL: "5" + OBSERVE_PROTO: rpki_tcp + volumes: + - ./bird.conf.example:/etc/bird/bird.conf:ro diff --git a/deploy/bird/entrypoint.sh b/deploy/bird/entrypoint.sh new file mode 100644 index 0000000..08d5bbe --- /dev/null +++ b/deploy/bird/entrypoint.sh @@ -0,0 +1,30 @@ +#!/bin/sh +set -eu + +mkdir -p /run/bird + +SOCK_PATH="/run/bird/bird.ctl" +PROTO="${OBSERVE_PROTO:-rpki_tcp}" +INTERVAL="${OBSERVE_INTERVAL:-5}" + +bird -f -c /etc/bird/bird.conf -s "$SOCK_PATH" & +BIRD_PID="$!" + +sleep 1 + +case "$INTERVAL" in + ''|*[!0-9]*) + INTERVAL=0 + ;; +esac + +if [ "$INTERVAL" -gt 0 ]; then + while kill -0 "$BIRD_PID" 2>/dev/null; do + echo "==== $(date -u +"%Y-%m-%dT%H:%M:%SZ") RPKI snapshot ($PROTO) ====" + birdc -s "$SOCK_PATH" show protocols all "$PROTO" || true + birdc -s "$SOCK_PATH" show roa count || true + sleep "$INTERVAL" + done +fi + +wait "$BIRD_PID" diff --git a/deploy/client/Dockerfile b/deploy/client/Dockerfile index 8aff6d3..dac92f2 100644 --- a/deploy/client/Dockerfile +++ b/deploy/client/Dockerfile @@ -20,5 +20,8 @@ RUN apt-get update \ WORKDIR /app COPY --from=builder /build/target/release/rtr_debug_client /usr/local/bin/rtr_debug_client +COPY deploy/client/entrypoint.sh /usr/local/bin/rtr-debug-client-entrypoint.sh -ENTRYPOINT ["/usr/local/bin/rtr_debug_client"] +RUN chmod +x /usr/local/bin/rtr-debug-client-entrypoint.sh + +ENTRYPOINT ["/usr/local/bin/rtr-debug-client-entrypoint.sh"] diff --git a/deploy/client/docker-compose.clients.yml b/deploy/client/docker-compose.clients.yml index 4c1ad96..afb6d62 100644 --- a/deploy/client/docker-compose.clients.yml +++ b/deploy/client/docker-compose.clients.yml @@ -5,28 +5,38 @@ services: image: rpki-rtr-debug-client:latest network_mode: host command: ["127.0.0.1:323", "2", "reset", "--keep-after-error", "--summary-only"] + volumes: + - ../../logs/client:/app/logs restart: unless-stopped rtr-client-2: image: rpki-rtr-debug-client:latest network_mode: host command: ["127.0.0.1:323", "2", "reset", "--keep-after-error", "--summary-only"] + volumes: + - ../../logs/client:/app/logs restart: unless-stopped rtr-client-3: image: rpki-rtr-debug-client:latest network_mode: host command: ["127.0.0.1:323", "2", "reset", "--keep-after-error", "--summary-only"] + volumes: + - ../../logs/client:/app/logs restart: unless-stopped rtr-client-4: image: rpki-rtr-debug-client:latest network_mode: host command: ["127.0.0.1:323", "2", "reset", "--keep-after-error", "--summary-only"] + volumes: + - ../../logs/client:/app/logs restart: unless-stopped rtr-client-5: image: rpki-rtr-debug-client:latest network_mode: host command: ["127.0.0.1:323", "2", "reset", "--keep-after-error", "--summary-only"] + volumes: + - ../../logs/client:/app/logs restart: unless-stopped diff --git a/deploy/client/docker-compose.ssh.yml b/deploy/client/docker-compose.ssh.yml new file mode 100644 index 0000000..3b6c569 --- /dev/null +++ b/deploy/client/docker-compose.ssh.yml @@ -0,0 +1,30 @@ +version: "3.9" + +services: + rtr-debug-client: + build: + context: ../.. + dockerfile: deploy/client/Dockerfile + image: rpki-rtr-debug-client:latest + network_mode: host + command: + [ + "127.0.0.1:${RPKI_RTR_SSH_PORT:-22}", + "2", + "reset", + "--ssh", + "--ssh-user", + "rpki-rtr", + "--ssh-key", + "/app/certs/rtr-client.key", + "--ssh-server-key", + "/app/certs/ssh_host_rsa_key.pub", + "--keep-after-error", + "--summary-only" + ] + volumes: + - ../../certs:/app/certs:ro + - ../../logs/client:/app/logs + restart: unless-stopped + stdin_open: true + tty: true diff --git a/deploy/client/docker-compose.tcp.yml b/deploy/client/docker-compose.tcp.yml new file mode 100644 index 0000000..16fa4ca --- /dev/null +++ b/deploy/client/docker-compose.tcp.yml @@ -0,0 +1,13 @@ +services: + rtr-debug-client: + build: + context: ../.. + dockerfile: deploy/client/Dockerfile + image: rpki-rtr-debug-client:latest + network_mode: host + command: ["127.0.0.1:323", "2", "reset", "--keep-after-error", "--summary-only"] + volumes: + - ../../logs/client:/app/logs + restart: no + stdin_open: true + tty: true diff --git a/deploy/client/docker-compose.tls.yml b/deploy/client/docker-compose.tls.yml new file mode 100644 index 0000000..de8204f --- /dev/null +++ b/deploy/client/docker-compose.tls.yml @@ -0,0 +1,32 @@ +version: "3.9" + +services: + rtr-debug-client: + build: + context: ../.. + dockerfile: deploy/client/Dockerfile + image: rpki-rtr-debug-client:latest + network_mode: host + command: + [ + "127.0.0.1:324", + "2", + "reset", + "--tls", + "--ca-cert", + "/app/certs/client-ca.crt", + "--server-name", + "localhost", + "--client-cert", + "/app/certs/client-good.crt", + "--client-key", + "/app/certs/client-good.key", + "--keep-after-error", + "--summary-only" + ] + volumes: + - ../../tests/fixtures/tls:/app/certs:ro + - ../../logs/client:/app/logs + restart: unless-stopped + stdin_open: true + tty: true diff --git a/deploy/client/docker-compose.yml b/deploy/client/docker-compose.yml index ef110b3..470d103 100644 --- a/deploy/client/docker-compose.yml +++ b/deploy/client/docker-compose.yml @@ -6,6 +6,8 @@ services: image: rpki-rtr-debug-client:latest network_mode: host command: ["127.0.0.1:323", "2", "reset", "--keep-after-error", "--summary-only"] + volumes: + - ../../logs/client:/app/logs restart: unless-stopped stdin_open: true tty: true diff --git a/deploy/client/entrypoint.sh b/deploy/client/entrypoint.sh new file mode 100644 index 0000000..de5f2e8 --- /dev/null +++ b/deploy/client/entrypoint.sh @@ -0,0 +1,10 @@ +#!/bin/sh +set -eu + +mkdir -p /app/logs + +log_name="${HOSTNAME:-rtr-debug-client}" +stdout_log="/app/logs/${log_name}.stdout.log" +stderr_log="/app/logs/${log_name}.stderr.log" + +exec /usr/local/bin/rtr_debug_client "$@" >>"$stdout_log" 2>>"$stderr_log" diff --git a/deploy/server/Dockerfile b/deploy/server/Dockerfile index c7c5f14..88b518c 100644 --- a/deploy/server/Dockerfile +++ b/deploy/server/Dockerfile @@ -2,8 +2,25 @@ FROM rust:1.89-bookworm AS builder WORKDIR /build +RUN set -eux; \ + cat > /etc/apt/sources.list.d/debian.sources <<'EOF' +Types: deb +URIs: http://mirrors.tuna.tsinghua.edu.cn/debian +Suites: bookworm bookworm-updates +Components: main +Signed-By: /usr/share/keyrings/debian-archive-keyring.gpg + +Types: deb +URIs: http://mirrors.tuna.tsinghua.edu.cn/debian-security +Suites: bookworm-security +Components: main +Signed-By: /usr/share/keyrings/debian-archive-keyring.gpg +EOF + RUN apt-get update \ - && apt-get install -y --no-install-recommends \ + && apt-get install -y --fix-missing --no-install-recommends \ + -o Acquire::Retries=10 \ + -o Acquire::http::Timeout=60 \ build-essential \ cmake \ pkg-config \ @@ -19,8 +36,27 @@ RUN cargo build --release --bin rpki FROM debian:bookworm-slim AS runtime +RUN set -eux; \ + cat > /etc/apt/sources.list.d/debian.sources <<'EOF' +Types: deb +URIs: http://mirrors.tuna.tsinghua.edu.cn/debian +Suites: bookworm bookworm-updates +Components: main +Signed-By: /usr/share/keyrings/debian-archive-keyring.gpg + +Types: deb +URIs: http://mirrors.tuna.tsinghua.edu.cn/debian-security +Suites: bookworm-security +Components: main +Signed-By: /usr/share/keyrings/debian-archive-keyring.gpg +EOF + RUN apt-get update \ - && apt-get install -y --no-install-recommends ca-certificates supervisor \ + && apt-get install -y --fix-missing --no-install-recommends \ + -o Acquire::Retries=10 \ + -o Acquire::http::Timeout=60 \ + ca-certificates \ + supervisor \ && rm -rf /var/lib/apt/lists/* WORKDIR /app @@ -28,7 +64,7 @@ WORKDIR /app COPY --from=builder /build/target/release/rpki /usr/local/bin/rpki COPY deploy/server/supervisord.conf /etc/supervisor/conf.d/rpki-rtr.conf -RUN mkdir -p /app/data /app/rtr-db /app/certs /app/slurm /var/log/supervisor +RUN mkdir -p /app/data /app/rtr-db /app/certs /app/slurm /app/logs /var/log/supervisor ENV RPKI_RTR_ENABLE_TLS=false \ RPKI_RTR_TCP_ADDR=0.0.0.0:323 \ @@ -41,4 +77,4 @@ ENV RPKI_RTR_ENABLE_TLS=false \ EXPOSE 323 324 -CMD ["supervisord", "-n", "-c", "/etc/supervisor/conf.d/rpki-rtr.conf"] +CMD ["supervisord", "-n", "-c", "/etc/supervisor/conf.d/rpki-rtr.conf"] \ No newline at end of file diff --git a/deploy/server/docker-compose.ssh.yml b/deploy/server/docker-compose.ssh.yml new file mode 100644 index 0000000..5f7608e --- /dev/null +++ b/deploy/server/docker-compose.ssh.yml @@ -0,0 +1,35 @@ +version: "3.9" + +services: + rpki-rtr: + build: + context: ../.. + dockerfile: deploy/server/Dockerfile + image: rpki-rtr:latest + container_name: rpki-rtr-ssh + restart: unless-stopped + ports: + - "323:323" + - "${RPKI_RTR_SSH_PORT:-22}:${RPKI_RTR_SSH_PORT:-22}" + environment: + RPKI_RTR_ENABLE_TLS: "false" + RPKI_RTR_ENABLE_SSH: "true" + RPKI_RTR_TCP_ADDR: "0.0.0.0:323" + RPKI_RTR_SSH_ADDR: "0.0.0.0:${RPKI_RTR_SSH_PORT:-22}" + RPKI_RTR_SSH_HOST_KEY_PATH: "/app/certs/ssh_host_rsa_key" + RPKI_RTR_SSH_AUTHORIZED_KEYS_PATH: "/app/certs/rtr-authorized_keys" + RPKI_RTR_SSH_USERNAME: "rpki-rtr" + RPKI_RTR_SSH_SUBSYSTEM_NAME: "rpki-rtr" + # Optional: enable password authentication in addition to publickey + # RPKI_RTR_SSH_PASSWORD: "test-password" + RPKI_RTR_DB_PATH: "/app/rtr-db" + RPKI_RTR_CCR_DIR: "/app/data" + RPKI_RTR_SLURM_DIR: "/app/slurm" + RPKI_RTR_STRICT_CCR_VALIDATION: "false" + RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300" + volumes: + - ../../data:/app/data:ro + - ../../rtr-db:/app/rtr-db + - ../../data:/app/slurm:ro + - ../../certs:/app/certs:ro + - ../../logs/server:/app/logs diff --git a/deploy/server/docker-compose.tcp.yml b/deploy/server/docker-compose.tcp.yml new file mode 100644 index 0000000..8d9e71b --- /dev/null +++ b/deploy/server/docker-compose.tcp.yml @@ -0,0 +1,27 @@ +version: "3.9" + +services: + rpki-rtr: + build: + context: ../.. + dockerfile: deploy/server/Dockerfile + image: rpki-rtr:latest + container_name: rpki-rtr-tcp + restart: unless-stopped + ports: + - "323:323" + environment: + RPKI_RTR_ENABLE_TLS: "false" + RPKI_RTR_ENABLE_SSH: "false" + RPKI_RTR_TCP_ADDR: "0.0.0.0:323" + RPKI_RTR_DB_PATH: "/app/rtr-db" + RPKI_RTR_CCR_DIR: "/app/data" + RPKI_RTR_SLURM_DIR: "/app/slurm" + RPKI_RTR_STRICT_CCR_VALIDATION: "false" + RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300" + RPKI_RTR_MAX_CONNECTIONS: "100000" + volumes: + - ../../data:/app/data:ro + - ../../rtr-db:/app/rtr-db + - ../../data:/app/slurm:ro + - ../../logs/server:/app/logs diff --git a/deploy/server/docker-compose.tls.yml b/deploy/server/docker-compose.tls.yml new file mode 100644 index 0000000..831335a --- /dev/null +++ b/deploy/server/docker-compose.tls.yml @@ -0,0 +1,32 @@ +version: "3.9" + +services: + rpki-rtr: + build: + context: ../.. + dockerfile: deploy/server/Dockerfile + image: rpki-rtr:latest + container_name: rpki-rtr-tls + restart: unless-stopped + ports: + - "323:323" + - "324:324" + environment: + RPKI_RTR_ENABLE_TLS: "true" + RPKI_RTR_ENABLE_SSH: "false" + RPKI_RTR_TCP_ADDR: "0.0.0.0:323" + RPKI_RTR_TLS_ADDR: "0.0.0.0:324" + RPKI_RTR_TLS_CERT_PATH: "/app/certs/server-dns.crt" + RPKI_RTR_TLS_KEY_PATH: "/app/certs/server-dns.key" + RPKI_RTR_TLS_CLIENT_CA_PATH: "/app/certs/client-ca.crt" + RPKI_RTR_DB_PATH: "/app/rtr-db" + RPKI_RTR_CCR_DIR: "/app/data" + RPKI_RTR_SLURM_DIR: "/app/slurm" + RPKI_RTR_STRICT_CCR_VALIDATION: "false" + RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300" + volumes: + - ../../data:/app/data:ro + - ../../rtr-db:/app/rtr-db + - ../../data:/app/slurm:ro + - ../../tests/fixtures/tls:/app/certs:ro + - ../../logs/server:/app/logs diff --git a/deploy/server/docker-compose.yml b/deploy/server/docker-compose.yml index 53d72f0..7d55e77 100644 --- a/deploy/server/docker-compose.yml +++ b/deploy/server/docker-compose.yml @@ -11,6 +11,8 @@ services: ports: - "323:323" - "324:324" + # SSH mode example: + # - "22:22" environment: RPKI_RTR_ENABLE_TLS: "false" RPKI_RTR_TCP_ADDR: "0.0.0.0:323" @@ -20,9 +22,21 @@ services: RPKI_RTR_SLURM_DIR: "/app/slurm" RPKI_RTR_STRICT_CCR_VALIDATION: "false" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300" + RUST_LOG: "info" + # SSH mode example: + # RPKI_RTR_ENABLE_SSH: "true" + # RPKI_RTR_SSH_ADDR: "0.0.0.0:22" + # RPKI_RTR_SSH_PORT: "22" + # RPKI_RTR_SSH_HOST_KEY_PATH: "/app/certs/ssh_host_ed25519_key" + # RPKI_RTR_SSH_AUTHORIZED_KEYS_PATH: "/app/certs/rtr-authorized_keys" + # RPKI_RTR_SSH_USERNAME: "rpki-rtr" + # RPKI_RTR_SSH_SUBSYSTEM_NAME: "rpki-rtr" + # Optional: enable password auth in addition to publickey + # RPKI_RTR_SSH_PASSWORD: "test-password" volumes: - ../../data:/app/data:ro - ../../rtr-db:/app/rtr-db - ../../data:/app/slurm:ro + - ../../logs/server:/app/logs # TLS mode example: # - ../../certs:/app/certs:ro diff --git a/deploy/server/supervisord.conf b/deploy/server/supervisord.conf index f35bc5b..2355ce7 100644 --- a/deploy/server/supervisord.conf +++ b/deploy/server/supervisord.conf @@ -12,7 +12,9 @@ startretries=3 stopsignal=TERM stopasgroup=true killasgroup=true -stdout_logfile=/dev/fd/1 -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/fd/2 -stderr_logfile_maxbytes=0 +stdout_logfile=/app/logs/rpki-rtr.stdout.log +stdout_logfile_maxbytes=50MB +stdout_logfile_backups=10 +stderr_logfile=/app/logs/rpki-rtr.stderr.log +stderr_logfile_maxbytes=50MB +stderr_logfile_backups=10 diff --git a/src/bin/rpki_rs_test_client/main.rs b/src/bin/rpki_rs_test_client/main.rs index 3c9a061..c4ab1dc 100644 --- a/src/bin/rpki_rs_test_client/main.rs +++ b/src/bin/rpki_rs_test_client/main.rs @@ -1,42 +1,41 @@ +use std::collections::BTreeSet; use std::env; use std::io; +use std::net::IpAddr; use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::Instant; use rustls::{ClientConfig as RustlsClientConfig, RootCertStore}; use rustls_pki_types::{CertificateDer, PrivateKeyDer, ServerName}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; +use tokio::time::{timeout, Duration}; use tokio_rustls::TlsConnector; use rpki_rs::rtr::client::{Client, PayloadError, PayloadTarget}; use rpki_rs::rtr::payload::{Action, Payload, Timing}; -const DEFAULT_TIMEOUT_SECS: u64 = 10; const DEFAULT_STEPS: usize = 1; +const DEFAULT_STEP_TIMEOUT_SECS: u64 = 300; +const DEFAULT_PROGRESS_EVERY: u64 = 10_000; trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send {} impl AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send {} + type DynStream = Box; #[derive(Debug, Clone)] struct Config { addr: String, - version: u8, - mode: QueryMode, steps: usize, follow: bool, transport: TransportConfig, assert_substr: Vec, assert_min_records: Option, print_records: bool, -} - -#[derive(Debug, Clone, Copy)] -enum QueryMode { - Reset, - SerialAuto, - Serial { session_id: u16, serial: u32 }, + step_timeout_secs: u64, + progress_every: u64, } impl Config { @@ -50,54 +49,74 @@ impl Config { let mut assert_substr = Vec::new(); let mut assert_min_records = None; let mut print_records = false; + let mut step_timeout_secs = DEFAULT_STEP_TIMEOUT_SECS; + let mut progress_every = DEFAULT_PROGRESS_EVERY; while let Some(arg) = args.next() { match arg.as_str() { - "--version" => { - let _ = args.next().ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "--version requires value") - })?; - // rpki-rs v0.18 client only exposes Client::new without - // initial-version override. Keep this option as reserved. + "-h" | "--help" => { + print_usage(); + std::process::exit(0); } + "--steps" => { let v = args.next().ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidInput, "--steps requires value") })?; steps = parse_usize_arg(&v, "--steps")?; + if steps == 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "--steps must be >= 1", + )); + } } + "--follow" => { follow = true; } + "--tls" => { - if matches!(transport, TransportConfig::Tcp) { - transport = TransportConfig::Tls(TlsConfig::default()); - } + ensure_tls(&mut transport)?; } + "--ca-cert" => { let v = args.next().ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidInput, "--ca-cert requires path") })?; ensure_tls(&mut transport)?.ca_cert = Some(PathBuf::from(v)); } + "--server-name" => { let v = args.next().ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "--server-name requires value") + io::Error::new( + io::ErrorKind::InvalidInput, + "--server-name requires value", + ) })?; ensure_tls(&mut transport)?.server_name = Some(v); } + "--client-cert" => { let v = args.next().ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "--client-cert requires path") + io::Error::new( + io::ErrorKind::InvalidInput, + "--client-cert requires path", + ) })?; ensure_tls(&mut transport)?.client_cert = Some(PathBuf::from(v)); } + "--client-key" => { let v = args.next().ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "--client-key requires path") + io::Error::new( + io::ErrorKind::InvalidInput, + "--client-key requires path", + ) })?; ensure_tls(&mut transport)?.client_key = Some(PathBuf::from(v)); } + "--assert-substr" => { let v = args.next().ok_or_else(|| { io::Error::new( @@ -107,6 +126,7 @@ impl Config { })?; assert_substr.push(v); } + "--assert-min-records" => { let v = args.next().ok_or_else(|| { io::Error::new( @@ -116,21 +136,71 @@ impl Config { })?; assert_min_records = Some(parse_usize_arg(&v, "--assert-min-records")?); } + "--print-records" => { print_records = true; } + + "--step-timeout-secs" => { + let v = args.next().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "--step-timeout-secs requires value", + ) + })?; + step_timeout_secs = parse_u64_arg(&v, "--step-timeout-secs")?; + if step_timeout_secs == 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "--step-timeout-secs must be >= 1", + )); + } + } + + "--progress-every" => { + let v = args.next().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "--progress-every requires value", + ) + })?; + progress_every = parse_u64_arg(&v, "--progress-every")?; + if progress_every == 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "--progress-every must be >= 1", + )); + } + } + + // 明确拒绝当前 wrapper 不支持的能力 + "--version" => { + let _ = args.next().ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "--version requires value") + })?; + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "--version is not exposed by this rpki client wrapper", + )); + } + "--timeout" => { let _ = args.next().ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidInput, "--timeout requires value") })?; - // This binary relies on rpki-rs client's built-in IO timeout. + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "--timeout is not exposed by this rpki client wrapper; use --step-timeout-secs instead", + )); } + _ if arg.starts_with("--") => { return Err(io::Error::new( io::ErrorKind::InvalidInput, format!("unknown option '{}'", arg), )); } + _ => positional.push(arg), } } @@ -139,56 +209,72 @@ impl Config { let addr = positional .next() .unwrap_or_else(|| "127.0.0.1:323".to_string()); - let version = positional - .next() - .map(|v| parse_u8_arg(&v, "version")) - .transpose()? - .unwrap_or(2); - let mode = match positional.next().as_deref() { - None | Some("reset") => QueryMode::Reset, - Some("serial") if positional.clone().next().is_none() => QueryMode::SerialAuto, - Some("serial") => { - let session_id = parse_u16_arg( - &positional.next().ok_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "serial mode requires session_id and serial", - ) - })?, - "session_id", - )?; - let serial = parse_u32_arg( - &positional.next().ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "serial mode requires serial") - })?, - "serial", - )?; - QueryMode::Serial { session_id, serial } - } - Some(other) => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - format!("invalid mode '{}', expected 'reset' or 'serial'", other), - )); - } - }; + + if let Some(extra) = positional.next() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "unexpected positional argument '{}'; only optional [addr] is supported", + extra + ), + )); + } let transport = finalize_transport(transport, &addr)?; Ok(Self { addr, - version, - mode, steps, follow, transport, assert_substr, assert_min_records, print_records, + step_timeout_secs, + progress_every, }) } } +fn print_usage() { + eprintln!( + "\ +rpki-rs-test-client + +Usage: + rpki-rs-test-client [OPTIONS] [ADDR] + +Examples: + rpki-rs-test-client + rpki-rs-test-client 127.0.0.1:323 --steps 1 + rpki-rs-test-client 127.0.0.1:323 --steps 1 --step-timeout-secs 600 + rpki-rs-test-client 127.0.0.1:323 --follow + rpki-rs-test-client 127.0.0.1:323 --assert-min-records 1 --assert-substr 192.0.2. + rpki-rs-test-client 127.0.0.1:3324 --tls --ca-cert certs/ca.pem --server-name localhost + +Options: + --steps Number of client.step() calls to perform (default: 1) + --follow Keep calling step() forever + --tls Enable TLS + --ca-cert CA certificate PEM file (required in TLS mode) + --server-name TLS server name; required when ADDR host is an IP + --client-cert Client certificate PEM file (optional, with --client-key) + --client-key Client private key PEM file (optional, with --client-cert) + --assert-substr Assert final stable record dump contains substring + --assert-min-records Assert final record count >= N + --print-records Print records after each successful step + --step-timeout-secs Timeout for each step() call in seconds (default: 300) + --progress-every Print apply progress every N updates (default: 10000) + -h, --help Show this help + +Not supported by this wrapper: + --version + --timeout + explicit serial bootstrap via session_id/serial +" + ); +} + #[derive(Debug, Clone)] enum TransportConfig { Tcp, @@ -207,6 +293,7 @@ fn ensure_tls(transport: &mut TransportConfig) -> io::Result<&mut TlsConfig> { if matches!(transport, TransportConfig::Tcp) { *transport = TransportConfig::Tls(TlsConfig::default()); } + match transport { TransportConfig::Tls(cfg) => Ok(cfg), TransportConfig::Tcp => Err(io::Error::other("tls configuration unavailable")), @@ -234,16 +321,26 @@ fn finalize_transport(transport: TransportConfig, addr: &str) -> io::Result name, + None => { + let host = parse_host_from_addr(addr).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "failed to parse host from address", + ) + })?; + + if host.parse::().is_ok() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "TLS with IP address requires explicit --server-name", + )); + } + + host + } + }; Ok(TransportConfig::Tls(TlsConfig { server_name: Some(server_name), @@ -255,169 +352,231 @@ fn finalize_transport(transport: TransportConfig, addr: &str) -> io::Result, - timing: Option, - announced: u64, - withdrawn: u64, +#[derive(Debug, Default, Clone)] +struct SharedTarget { + inner: Arc>, } -impl InMemoryTarget { +#[derive(Debug)] +struct TargetState { + records: BTreeSet, + timing: Option, + announced_seen: u64, + withdrawn_seen: u64, + updates_applied_total: u64, + progress_every: u64, + apply_batches: u64, + last_apply_started_at: Option, +} + +impl Default for TargetState { + fn default() -> Self { + Self { + records: BTreeSet::new(), + timing: None, + announced_seen: 0, + withdrawn_seen: 0, + updates_applied_total: 0, + progress_every: DEFAULT_PROGRESS_EVERY, + apply_batches: 0, + last_apply_started_at: None, + } + } +} + +#[derive(Debug, Clone)] +struct TargetSnapshot { + records: Vec, + timing: Option, + announced_seen: u64, + withdrawn_seen: u64, + updates_applied_total: u64, + apply_batches: u64, +} + +impl SharedTarget { + fn new(progress_every: u64) -> Self { + let state = TargetState { + progress_every, + ..TargetState::default() + }; + + Self { + inner: Arc::new(Mutex::new(state)), + } + } + + fn snapshot(&self) -> TargetSnapshot { + let guard = self.inner.lock().expect("target mutex poisoned"); + TargetSnapshot { + records: guard.records.iter().cloned().collect(), + timing: guard.timing, + announced_seen: guard.announced_seen, + withdrawn_seen: guard.withdrawn_seen, + updates_applied_total: guard.updates_applied_total, + apply_batches: guard.apply_batches, + } + } + + fn payload_to_stable_text(payload: &Payload) -> String { + format!("{:?}", payload) + } +} + +impl TargetSnapshot { fn dump_text(&self) -> String { self.records .iter() - .map(|p| format!("{:?}", p)) + .map(SharedTarget::payload_to_stable_text) .collect::>() .join("\n") } } -impl PayloadTarget for InMemoryTarget { +impl PayloadTarget for SharedTarget { type Update = Vec<(Action, Payload)>; fn start(&mut self, reset: bool) -> Self::Update { if reset { - self.records.clear(); + let mut guard = self.inner.lock().expect("target mutex poisoned"); + println!( + "[target] start reset=true | clearing existing records={}", + guard.records.len() + ); + guard.records.clear(); + } else { + let guard = self.inner.lock().expect("target mutex poisoned"); + println!( + "[target] start reset=false | current records={}", + guard.records.len() + ); } + Vec::new() } fn apply(&mut self, update: Self::Update, timing: Timing) -> Result<(), PayloadError> { + let total = update.len() as u64; + + { + let mut guard = self.inner.lock().expect("target mutex poisoned"); + guard.apply_batches += 1; + guard.last_apply_started_at = Some(Instant::now()); + println!( + "[target] apply batch #{} started | updates={} | current records={}", + guard.apply_batches, + total, + guard.records.len() + ); + } + + let started = Instant::now(); + let mut local_processed: u64 = 0; + + let progress_every = { + let guard = self.inner.lock().expect("target mutex poisoned"); + guard.progress_every + }; + + let mut guard = self.inner.lock().expect("target mutex poisoned"); + for (action, payload) in update { match action { Action::Announce => { - self.announced += 1; - if self.records.iter().any(|p| p == &payload) { + guard.announced_seen += 1; + if !guard.records.insert(payload) { return Err(PayloadError::DuplicateAnnounce); } - self.records.push(payload); } Action::Withdraw => { - self.withdrawn += 1; - if let Some(pos) = self.records.iter().position(|p| p == &payload) { - self.records.swap_remove(pos); - } else { + guard.withdrawn_seen += 1; + if !guard.records.remove(&payload) { return Err(PayloadError::UnknownWithdraw); } } } + + local_processed += 1; + guard.updates_applied_total += 1; + + if local_processed % progress_every == 0 || local_processed == total { + println!( + "[target] apply progress | batch_processed={}/{} | total_updates_seen={} | records={} | elapsed={:.2?}", + local_processed, + total, + guard.updates_applied_total, + guard.records.len(), + started.elapsed(), + ); + } } - self.timing = Some(timing); + + guard.timing = Some(timing); + + println!( + "[target] apply batch complete | updates={} | records={} | announced_seen={} | withdrawn_seen={} | elapsed={:.2?}", + total, + guard.records.len(), + guard.announced_seen, + guard.withdrawn_seen, + started.elapsed(), + ); + Ok(()) } } -#[tokio::main] -async fn main() -> io::Result<()> { - let config = Config::from_args()?; - println!("== rpki_rs_test_client =="); - println!("target : {}", config.addr); - println!("version : {}", config.version); +fn print_step_summary( + step_no: usize, + before: &TargetSnapshot, + after: &TargetSnapshot, + print_records: bool, +) { println!( - "mode : {}", - match config.mode { - QueryMode::Reset => "reset".to_string(), - QueryMode::SerialAuto => "serial(auto)".to_string(), - QueryMode::Serial { session_id, serial } => { - format!("serial sid={} serial={}", session_id, serial) + "[step] {} ok | records: {} -> {} | delta announce={} withdraw={} | delta updates={} | apply_batches={}", + step_no, + before.records.len(), + after.records.len(), + after.announced_seen.saturating_sub(before.announced_seen), + after.withdrawn_seen.saturating_sub(before.withdrawn_seen), + after + .updates_applied_total + .saturating_sub(before.updates_applied_total), + after.apply_batches, + ); + + if let Some(timing) = after.timing { + println!( + "[step] {} timing | refresh={} retry={} expire={}", + step_no, timing.refresh, timing.retry, timing.expire + ); + } + + if print_records { + println!("-- records after step {} --", step_no); + if after.records.is_empty() { + println!("(empty)"); + } else { + for rec in &after.records { + println!("{}", SharedTarget::payload_to_stable_text(rec)); } } - ); - println!("steps : {}", config.steps); - println!("follow : {}", config.follow); - println!( - "timeout : {}s (from rpki-rs client IO timeout)", - DEFAULT_TIMEOUT_SECS - ); - - if config.version != 2 { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "rpki-rs v0.18 client API does not expose initial-version override; please use version 2", - )); } - if matches!(config.mode, QueryMode::Serial { .. }) { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "rpki-rs v0.18 Client::new cannot bootstrap explicit serial state; use reset mode", - )); - } - - let stream = connect_stream(&config).await?; - let target = InMemoryTarget::default(); - let mut client = Client::new(stream, target, None); - - let bootstrap_steps = match config.mode { - QueryMode::SerialAuto if config.steps < 2 => 2, - _ => config.steps, - }; - if bootstrap_steps != config.steps { - println!( - "steps adjusted : {} -> {} (serial(auto) needs at least 2 steps)", - config.steps, bootstrap_steps - ); - } - - for idx in 0..bootstrap_steps { - client.step().await.map_err(|err| { - io::Error::new(err.kind(), format!("step {} failed: {}", idx + 1, err)) - })?; - println!("[step] bootstrap {} ok", idx + 1); - } - - if config.follow { - println!("[follow] enabled, entering continuous step loop"); - let mut step_index = bootstrap_steps; - loop { - step_index += 1; - client.step().await.map_err(|err| { - io::Error::new(err.kind(), format!("step {} failed: {}", step_index, err)) - })?; - println!("[step] follow {} ok", step_index); - } - } - - let negotiated_state = client.state(); - let target = client.into_target(); - println!("state : {:?}", negotiated_state); - if let Some(timing) = target.timing { - println!( - "timing : refresh={} retry={} expire={}", - timing.refresh, timing.retry, timing.expire - ); - } - println!("records : {}", target.records.len()); - println!( - "updates : announce={} withdraw={}", - target.announced, target.withdrawn - ); - - if config.print_records { - println!("-- records --"); - for rec in &target.records { - println!("{:?}", rec); - } - } - - run_assertions(&config, &target)?; - println!("[assert] passed"); - Ok(()) } -fn run_assertions(config: &Config, target: &InMemoryTarget) -> io::Result<()> { +fn run_assertions(config: &Config, snapshot: &TargetSnapshot) -> io::Result<()> { if let Some(min) = config.assert_min_records - && target.records.len() < min + && snapshot.records.len() < min { return Err(io::Error::other(format!( "assertion failed: records {} < {}", - target.records.len(), + snapshot.records.len(), min ))); } if !config.assert_substr.is_empty() { - let dump = target.dump_text(); + let dump = snapshot.dump_text(); for needle in &config.assert_substr { if !dump.contains(needle) { return Err(io::Error::other(format!( @@ -427,6 +586,136 @@ fn run_assertions(config: &Config, target: &InMemoryTarget) -> io::Result<()> { } } } + + Ok(()) +} + +#[tokio::main] +async fn main() -> io::Result<()> { + let config = Config::from_args()?; + + println!("== rpki-rs-test-client =="); + println!("target : {}", config.addr); + println!("steps : {}", config.steps); + println!("follow : {}", config.follow); + println!("step_timeout_secs : {}", config.step_timeout_secs); + println!("progress_every : {}", config.progress_every); + + match &config.transport { + TransportConfig::Tcp => { + println!("transport : tcp"); + } + TransportConfig::Tls(tls) => { + println!("transport : tls"); + println!( + "server_name : {}", + tls.server_name.as_deref().unwrap_or("") + ); + println!( + "ca_cert : {}", + tls.ca_cert + .as_ref() + .map(|p| p.display().to_string()) + .unwrap_or_else(|| "".to_string()) + ); + } + } + + let stream = connect_stream(&config).await?; + let target = SharedTarget::new(config.progress_every); + let inspect = target.clone(); + + let mut client = Client::new(stream, target, None); + + for idx in 0..config.steps { + let step_no = idx + 1; + let before = inspect.snapshot(); + + println!("[step] {} begin", step_no); + let step_started = Instant::now(); + + timeout(Duration::from_secs(config.step_timeout_secs), client.step()) + .await + .map_err(|_| { + io::Error::new( + io::ErrorKind::TimedOut, + format!( + "step {} timed out after {}s", + step_no, config.step_timeout_secs + ), + ) + })? + .map_err(|err| io::Error::new(err.kind(), format!("step {} failed: {}", step_no, err)))?; + + let after = inspect.snapshot(); + print_step_summary(step_no, &before, &after, config.print_records); + println!( + "[step] {} finished in {:.2?}", + step_no, + step_started.elapsed() + ); + } + + if config.follow { + let mut step_index = config.steps; + println!("[follow] enabled, entering continuous step loop"); + + loop { + step_index += 1; + let before = inspect.snapshot(); + + println!("[step] {} begin", step_index); + let step_started = Instant::now(); + + timeout(Duration::from_secs(config.step_timeout_secs), client.step()) + .await + .map_err(|_| { + io::Error::new( + io::ErrorKind::TimedOut, + format!( + "step {} timed out after {}s", + step_index, config.step_timeout_secs + ), + ) + })? + .map_err(|err| { + io::Error::new(err.kind(), format!("step {} failed: {}", step_index, err)) + })?; + + let after = inspect.snapshot(); + print_step_summary(step_index, &before, &after, config.print_records); + println!( + "[step] {} finished in {:.2?}", + step_index, + step_started.elapsed() + ); + } + } + + let state = client.state(); + let final_snapshot = inspect.snapshot(); + + println!("state : {:?}", state); + if let Some(timing) = final_snapshot.timing { + println!( + "timing : refresh={} retry={} expire={}", + timing.refresh, timing.retry, timing.expire + ); + } + println!("records : {}", final_snapshot.records.len()); + println!( + "updates_seen : announce={} withdraw={}", + final_snapshot.announced_seen, final_snapshot.withdrawn_seen + ); + println!( + "updates_applied : {}", + final_snapshot.updates_applied_total + ); + println!("apply_batches : {}", final_snapshot.apply_batches); + + run_assertions(&config, &final_snapshot)?; + println!("[assert] passed"); + Ok(()) } @@ -440,22 +729,26 @@ async fn connect_stream(config: &Config) -> io::Result { async fn connect_tls_stream(addr: &str, tls: &TlsConfig) -> io::Result { let stream = TcpStream::connect(addr).await?; let connector = build_tls_connector(tls)?; + let server_name_str = tls .server_name .as_ref() .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "missing TLS server name"))?; + let server_name = ServerName::try_from(server_name_str.clone()).map_err(|err| { io::Error::new( io::ErrorKind::InvalidInput, format!("invalid TLS server name '{}': {}", server_name_str, err), ) })?; + let tls_stream = connector.connect(server_name, stream).await.map_err(|err| { io::Error::new( io::ErrorKind::ConnectionAborted, format!("TLS handshake failed: {}", err), ) })?; + Ok(Box::new(tls_stream)) } @@ -464,10 +757,11 @@ fn build_tls_connector(tls: &TlsConfig) -> io::Result { .ca_cert .as_ref() .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "missing TLS CA cert"))?; + let ca_certs = load_certs(ca_cert_path)?; let mut roots = RootCertStore::empty(); - let (added, _) = roots.add_parsable_certificates(ca_certs); + let (added, _ignored) = roots.add_parsable_certificates(ca_certs); if added == 0 { return Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -476,6 +770,7 @@ fn build_tls_connector(tls: &TlsConfig) -> io::Result { } let builder = RustlsClientConfig::builder().with_root_certificates(roots); + let cfg = match (&tls.client_cert, &tls.client_key) { (Some(cert_path), Some(key_path)) => { let certs = load_certs(cert_path)?; @@ -490,6 +785,7 @@ fn build_tls_connector(tls: &TlsConfig) -> io::Result { (None, None) => builder.with_no_client_auth(), _ => unreachable!(), }; + Ok(TlsConnector::from(Arc::new(cfg))) } @@ -498,12 +794,14 @@ fn load_certs(path: &Path) -> io::Result>> { let certs = rustls_pemfile::certs(&mut reader) .collect::, _>>() .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; + if certs.is_empty() { return Err(io::Error::new( io::ErrorKind::InvalidData, format!("no certs found in {}", path.display()), )); } + Ok(certs) } @@ -519,40 +817,13 @@ fn load_private_key(path: &Path) -> io::Result> { }) } -fn default_server_name_for_addr(addr: &str) -> Option { +fn parse_host_from_addr(addr: &str) -> Option { if let Some(rest) = addr.strip_prefix('[') { return rest.split(']').next().map(str::to_string); } addr.rsplit_once(':').map(|(host, _)| host.to_string()) } -fn parse_u8_arg(value: &str, name: &str) -> io::Result { - value.parse::().map_err(|err| { - io::Error::new( - io::ErrorKind::InvalidInput, - format!("invalid {} '{}': {}", name, value, err), - ) - }) -} - -fn parse_u16_arg(value: &str, name: &str) -> io::Result { - value.parse::().map_err(|err| { - io::Error::new( - io::ErrorKind::InvalidInput, - format!("invalid {} '{}': {}", name, value, err), - ) - }) -} - -fn parse_u32_arg(value: &str, name: &str) -> io::Result { - value.parse::().map_err(|err| { - io::Error::new( - io::ErrorKind::InvalidInput, - format!("invalid {} '{}': {}", name, value, err), - ) - }) -} - fn parse_usize_arg(value: &str, name: &str) -> io::Result { value.parse::().map_err(|err| { io::Error::new( @@ -561,3 +832,12 @@ fn parse_usize_arg(value: &str, name: &str) -> io::Result { ) }) } + +fn parse_u64_arg(value: &str, name: &str) -> io::Result { + value.parse::().map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid {} '{}': {}", name, value, err), + ) + }) +} \ No newline at end of file diff --git a/src/bin/rtr_debug_client/README.md b/src/bin/rtr_debug_client/README.md index 3cdd2fb..b536cf2 100644 --- a/src/bin/rtr_debug_client/README.md +++ b/src/bin/rtr_debug_client/README.md @@ -1,48 +1,11 @@ # rtr_debug_client -`rtr_debug_client` 是一个轻量级的 RTR 调试客户端,用于手工联调和协议行为观察。 +`rtr_debug_client` 是用于 RTR 协议联调的命令行调试客户端,支持 `TCP`、`TLS`、`SSH` 三种传输。 -它适合以下场景: -- 在开发阶段验证 RTR server 的行为 -- 发送 `Reset Query` 和 `Serial Query` -- 观察服务端返回的各类 PDU -- 检查会话状态、`session_id`、`serial` 的变化 -- 排查 `ErrorReport`、`CacheReset`、`SerialNotify`、`RouterKey`、`ASPA` -- 联调纯 TCP 和 TLS 两种 RTR 传输方式 - -它不是生产级 router client,而是一个便于调试和观察协议细节的小工具。 - -## 当前支持的能力 - -当前版本支持: -- 纯 TCP 连接 -- TLS 连接 -- TLS 服务端证书校验 -- 可选的 TLS 客户端证书认证 -- 发送 `Reset Query` -- 发送 `Serial Query` -- 保持长连接持续接收服务端 PDU -- 格式化展示以下 PDU: - - `Serial Notify` - - `Serial Query` - - `Reset Query` - - `Cache Response` - - `IPv4 Prefix` - - `IPv6 Prefix` - - `Router Key` - - `ASPA` - - `End of Data` - - `Cache Reset` - - `Error Report` -- 结构化展示 `ErrorReport`: - - 错误码及语义名称 - - encapsulated PDU 的 header 摘要 - - encapsulated PDU 原始 hex - - arbitrary text 是否为 UTF-8 - - arbitrary text 内容 -- 根据 `EndOfData` 的 timing hint 自动轮询 -- 收到 `ErrorReport` 后默认暂停自动轮询 -- 通过 `--keep-after-error` 保持错误后的自动轮询 +它用于: +- 手动发送 `Reset Query`、`Serial Query` +- 持续接收并打印服务端 PDU +- 观察 `session_id`、`serial`、`EndOfData` timing hint、`ErrorReport` 等状态变化 ## 构建 @@ -52,8 +15,6 @@ cargo build --bin rtr_debug_client ## 基本用法 -基本形式: - ```sh cargo run --bin rtr_debug_client -- [reset|serial ] [options] ``` @@ -62,32 +23,22 @@ cargo run --bin rtr_debug_client -- [reset|serial - `addr`: `127.0.0.1:323` - `version`: `1` - `mode`: `reset` -- `timeout`: `30` -- `poll`: `600` +- `--timeout`: `30` +- `--poll`: `600` -## TCP 示例 - -发送 `Reset Query`: +## TCP 用法 ```sh cargo run --bin rtr_debug_client -- 127.0.0.1:323 1 reset ``` -发送 `Serial Query`: - ```sh cargo run --bin rtr_debug_client -- 127.0.0.1:323 1 serial 42 100 ``` -持续观察错误路径: +## TLS 用法 -```sh -cargo run --bin rtr_debug_client -- 127.0.0.1:323 1 reset --keep-after-error -``` - -## TLS 示例 - -只做服务端证书校验: +仅校验服务端证书: ```sh cargo run --bin rtr_debug_client -- \ @@ -97,7 +48,7 @@ cargo run --bin rtr_debug_client -- \ --server-name localhost ``` -双向 TLS 认证: +双向 TLS: ```sh cargo run --bin rtr_debug_client -- \ @@ -109,127 +60,91 @@ cargo run --bin rtr_debug_client -- \ --client-key tests/fixtures/tls/client-good.key ``` -双向 TLS + 错误后继续自动轮询: +## SSH 用法(按 draft-ietf-sidrops-8210bis-25) + +`rtr_debug_client --ssh` 采用以下流程: +- SSHv2 连接 +- `session` channel +- 请求 `subsystem`,默认 `rpki-rtr` +- 使用 `publickey` 认证 +- 强制服务端 host key 校验(`known_hosts` 或 pinned server key 二选一) + +### 1. 使用 known_hosts 校验服务端 ```sh cargo run --bin rtr_debug_client -- \ - 127.0.0.1:324 1 reset \ - --tls \ - --ca-cert tests/fixtures/tls/client-ca.crt \ - --server-name localhost \ - --client-cert tests/fixtures/tls/client-good.crt \ - --client-key tests/fixtures/tls/client-good.key \ - --keep-after-error + 127.0.0.1:22 1 reset \ + --ssh \ + --ssh-user rpki-rtr \ + --ssh-key certs/rtr-client.key \ + --ssh-known-hosts certs/known_hosts ``` -说明: -- 开启 `--tls` 时必须提供 `--ca-cert` -- 如果目标地址本身不适合直接作为 TLS 名称,显式提供 `--server-name` -- 客户端认证必须同时提供 `--client-cert` 和 `--client-key` +### 2. 使用固定服务端公钥校验 -## 命令行参数 +```sh +cargo run --bin rtr_debug_client -- \ + 127.0.0.1:22 1 reset \ + --ssh \ + --ssh-user rpki-rtr \ + --ssh-key certs/rtr-client.key \ + --ssh-server-key certs/ssh_host_ed25519_key.pub +``` +### 3. 自定义 subsystem 名称 + +```sh +cargo run --bin rtr_debug_client -- \ + 127.0.0.1:22 1 reset \ + --ssh \ + --ssh-user rpki-rtr \ + --ssh-key certs/rtr-client.key \ + --ssh-known-hosts certs/known_hosts \ + --ssh-subsystem rpki-rtr +``` + +## 参数说明 + +通用参数: +- `--timeout `:读取 PDU 超时时间(秒) +- `--poll `:默认自动轮询间隔(秒) +- `--keep-after-error`:收到 `ErrorReport` 后不暂停自动轮询 +- `--summary-only`:仅打印摘要,抑制 payload PDU 详细内容 + +TLS 参数: - `--tls` - 使用 TLS 而不是纯 TCP。 - - `--ca-cert ` - 用于校验服务端证书的 CA 证书文件,PEM 格式。 - - `--server-name ` - TLS 握手时用于校验证书的服务端名称。 - - `--client-cert ` - 双向 TLS 时使用的客户端证书,PEM 格式。 - - `--client-key ` - 与 `--client-cert` 配套的客户端私钥,PEM 格式。 -- `--timeout ` - 等待下一个 PDU 的读取超时时间,单位秒。 +SSH 参数: +- `--ssh` +- `--ssh-user ` +- `--ssh-key `(OpenSSH 私钥) +- `--ssh-subsystem `(默认 `rpki-rtr`) +- `--ssh-known-hosts ` 或 `--ssh-server-key `(二选一,必须提供) -- `--poll ` - 在尚未拿到 `EndOfData` timing hint 前,默认使用的自动轮询间隔。默认值为 `600` 秒,对齐 draft 第 6 节的默认 Retry Interval。 +## SSH 连通性测试建议 -- `--keep-after-error` - 收到 `ErrorReport` 后不暂停自动轮询。 +如果你已经在 Docker 中启动了支持 SSH 的 RTR server,可按以下方式验证: -## 运行中可用命令 +1. 先用 `ssh` 命令确认认证与 host key 配置正确(可连通)。 +2. 再用 `rtr_debug_client --ssh` 发起连接并发送 `reset`。 +3. 观察是否收到 `Cache Response` 和 `EndOfData`。 -程序启动后,可以在控制台输入以下命令: +如果 `rtr_debug_client` 报 `failed to request SSH subsystem 'rpki-rtr'`,通常表示服务端未开启对应 subsystem 名称,或名称不一致。 +## 运行时交互命令 + +客户端启动后可在标准输入中使用: - `help` - 显示帮助。 - - `state` - 打印当前客户端状态。 - +- `version` / `version ` - `reset` - 发送 `Reset Query`。 - -- `serial` - 使用当前 `session_id` 和 `serial` 发送 `Serial Query`。 - -- `serial ` - 使用显式参数发送 `Serial Query`。 - -- `timeout` - 查看当前读取超时设置。 - -- `timeout ` - 修改读取超时。 - -- `poll` - 查看当前自动轮询间隔、轮询来源以及暂停状态。 - -- `poll ` - 手工覆盖当前轮询间隔。 - -- `poll pause` - 暂停自动轮询。 - -- `poll resume` - 恢复自动轮询。 - +- `serial` / `serial ` +- `timeout` / `timeout ` +- `poll` / `poll ` / `poll pause` / `poll resume` - `keep-after-error` - 查看当前是否启用了错误后持续轮询。 - +- `output` / `output verbose` / `output summary` - `quit` - 退出客户端。 - -## 自动轮询行为 - -客户端会保持连接,并周期性地向服务端发起下一次查询。 - -选择下一次轮询间隔的优先级如下: -1. `retry`,当最近一次 `ErrorReport` 是 `No Data Available` 或 `Transport Failure` -2. `refresh`,如果已经从 `EndOfData` 中拿到 -3. 启动参数里的默认轮询间隔 - -收到 `ErrorReport` 后的默认行为: -- 默认暂停自动轮询 -- 连接保持不关,方便继续观察 -- 你可以手工输入 `reset`、`serial` 或 `poll resume` 继续 - -如果带了 `--keep-after-error`: -- 收到 `ErrorReport` 后不会暂停 -- 会继续按当前有效轮询间隔自动轮询 - -特殊情况: -- 当最近一次错误是 `No Data Available` 或 `Transport Failure` 时,恢复自动轮询后会优先参考 `retry`,而不是继续只看 `refresh` - -## ErrorReport 展示内容 - -`ErrorReport` 会展示以下内容: -- 错误码及其语义名称 -- encapsulated PDU 长度 -- encapsulated PDU 的 header 摘要 - - PDU 类型 - - version - - length - - field1(按类型解释为 `session_id` 或 `error_code`) -- encapsulated PDU 原始 hex -- arbitrary text 长度 -- arbitrary text 是否是 UTF-8 -- arbitrary text 内容 - -这样在排查协议问题时,不需要先手工拆原始 hex,就能快速知道是哪一个请求触发了错误。 diff --git a/src/bin/rtr_debug_client/main.rs b/src/bin/rtr_debug_client/main.rs index 9fa6c57..7b451c6 100644 --- a/src/bin/rtr_debug_client/main.rs +++ b/src/bin/rtr_debug_client/main.rs @@ -2,11 +2,20 @@ use std::env; use std::future::pending; use std::io; use std::path::{Path, PathBuf}; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; +use russh::client; +use russh::keys::{ + PrivateKeyWithHashAlg, check_known_hosts_path, load_public_key, load_secret_key, +}; +use russh::{ChannelStream, client::Msg as SshClientMsg}; use rustls::{ClientConfig as RustlsClientConfig, RootCertStore}; use rustls_pki_types::{CertificateDer, PrivateKeyDer, ServerName}; -use tokio::io::{self as tokio_io, AsyncBufReadExt, AsyncRead, AsyncWrite, BufReader, WriteHalf}; +use tokio::io::{ + self as tokio_io, AsyncBufReadExt, AsyncRead, AsyncWrite, BufReader, ReadBuf, WriteHalf, +}; use tokio::net::TcpStream; use tokio::time::{Duration, Instant, timeout}; use tokio_rustls::TlsConnector; @@ -19,8 +28,29 @@ use crate::pretty::{parse_end_of_data_info, parse_serial_notify_serial, print_pd use crate::protocol::{PduHeader, PduType, QueryMode}; use crate::wire::{read_pdu, send_reset_query, send_serial_query}; +macro_rules! println { + () => { + ::std::println!(); + }; + ($($arg:tt)*) => {{ + let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S%.3f%:z"); + ::std::println!("[{}] {}", ts, format_args!($($arg)*)); + }}; +} + +macro_rules! eprintln { + () => { + ::std::eprintln!(); + }; + ($($arg:tt)*) => {{ + let ts = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S%.3f%:z"); + ::std::eprintln!("[{}] {}", ts, format_args!($($arg)*)); + }}; +} + const DEFAULT_READ_TIMEOUT_SECS: u64 = 30; const DEFAULT_POLL_INTERVAL_SECS: u64 = 600; +const DEFAULT_SSH_SUBSYSTEM_NAME: &str = "rpki-rtr"; trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send {} impl AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send {} @@ -60,7 +90,9 @@ async fn main() -> io::Result<()> { } } println!(); - print_help(); + if config.output_mode == OutputMode::Verbose { + print_help(); + } let mut state = ClientState::new( config.version, @@ -78,7 +110,9 @@ async fn main() -> io::Result<()> { let stream = loop { match connect_stream(&config).await { Ok(stream) => { - println!("connected to {}", config.addr); + if state.output_mode == OutputMode::Verbose { + println!("connected to {}", config.addr); + } break stream; } Err(err) => { @@ -175,10 +209,12 @@ async fn main() -> io::Result<()> { return Err(err); } Err(_) => { - println!( - "[timeout] no PDU received in {}s, connection kept open.", - state.read_timeout_secs - ); + if state.output_mode == OutputMode::Verbose { + println!( + "[timeout] no PDU received in {}s, connection kept open.", + state.read_timeout_secs + ); + } } } } @@ -188,7 +224,9 @@ async fn main() -> io::Result<()> { if reconnect { let delay = state.reconnect_delay_secs(); state.current_session_id = None; - println!("[reconnect] transport disconnected, retry after {}s", delay); + if state.output_mode == OutputMode::Verbose { + println!("[reconnect] transport disconnected, retry after {}s", delay); + } let reconnect_sleep = tokio::time::sleep(Duration::from_secs(delay)); tokio::pin!(reconnect_sleep); @@ -235,7 +273,9 @@ async fn main() -> io::Result<()> { } if reconnect_now { - println!("[reconnect] user requested immediate reconnect"); + if state.output_mode == OutputMode::Verbose { + println!("[reconnect] user requested immediate reconnect"); + } } } } @@ -252,28 +292,36 @@ async fn send_resume_query( state.serial = None; state.current_session_id = None; send_reset_query(writer, state.version).await?; - println!("reconnected, send Reset Query (forced)"); + if state.output_mode == OutputMode::Verbose { + println!("reconnected, send Reset Query (forced)"); + } return Ok(()); } match (state.session_id, state.serial) { (Some(session_id), Some(serial)) => { - println!( - "reconnected, send Serial Query with session_id={}, serial={}", - session_id, serial - ); + if state.output_mode == OutputMode::Verbose { + println!( + "reconnected, send Serial Query with session_id={}, serial={}", + session_id, serial + ); + } send_serial_query(writer, state.version, session_id, serial).await?; } _ => match mode { QueryMode::Reset => { send_reset_query(writer, state.version).await?; - println!("sent Reset Query"); + if state.output_mode == OutputMode::Verbose { + println!("sent Reset Query"); + } } QueryMode::Serial { session_id, serial } => { state.session_id = Some(*session_id); state.serial = Some(*serial); send_serial_query(writer, state.version, *session_id, *serial).await?; - println!("sent Serial Query"); + if state.output_mode == OutputMode::Verbose { + println!("sent Serial Query"); + } } }, } @@ -317,8 +365,6 @@ async fn handle_incoming_pdu( state.session_id = Some(session_id); state.current_session_id = Some(session_id); - println!(); - if let Some(eod) = eod { state.serial = Some(eod.serial); state.refresh = eod.refresh; @@ -326,31 +372,44 @@ async fn handle_incoming_pdu( state.expire = eod.expire; state.last_error_code = None; - println!( - "updated client state: session_id={}, serial={}", - session_id, eod.serial - ); - - if let Some(refresh) = eod.refresh { - println!("refresh : {}", refresh); - } - if let Some(retry) = eod.retry { - println!("retry : {}", retry); - } - if let Some(expire) = eod.expire { - println!("expire : {}", expire); - } - state.schedule_next_poll(); - println!( - "next auto poll scheduled after {}s", - state.effective_poll_secs() - ); + if state.output_mode == OutputMode::Verbose { + println!(); + println!( + "updated client state: session_id={}, serial={}", + session_id, eod.serial + ); + if let Some(refresh) = eod.refresh { + println!("refresh : {}", refresh); + } + if let Some(retry) = eod.retry { + println!("retry : {}", retry); + } + if let Some(expire) = eod.expire { + println!("expire : {}", expire); + } + println!( + "next auto poll scheduled after {}s", + state.effective_poll_secs() + ); + } else { + println!( + "EndOfData: session_id={}, serial={}, next_poll={}s", + session_id, + eod.serial, + state.effective_poll_secs() + ); + } } else { - println!( - "updated client state: session_id={}, serial=", - session_id - ); + if state.output_mode == OutputMode::Verbose { + println!(); + println!( + "updated client state: session_id={}, serial=", + session_id + ); + } else { + println!("EndOfData: session_id={}, serial=", session_id); + } } if state.output_mode == OutputMode::SummaryOnly @@ -363,8 +422,10 @@ async fn handle_incoming_pdu( state.skipped_payload_pdu_count_in_round = 0; } - println!("received EndOfData, keep connection open."); - println!(); + if state.output_mode == OutputMode::Verbose { + println!("received EndOfData, keep connection open."); + println!(); + } } PduType::SerialNotify => { @@ -442,27 +503,35 @@ async fn handle_incoming_pdu( } async fn handle_poll_tick(writer: &mut ClientWriter, state: &mut ClientState) -> io::Result<()> { - println!(); - println!( - "[auto-poll] timer fired (interval={}s)", - state.effective_poll_secs() - ); + if state.output_mode == OutputMode::Verbose { + println!(); + println!( + "[auto-poll] timer fired (interval={}s)", + state.effective_poll_secs() + ); + } match (state.session_id, state.serial) { (Some(session_id), Some(serial)) => { - println!( - "[auto-poll] send Serial Query with session_id={}, serial={}", - session_id, serial - ); + if state.output_mode == OutputMode::Verbose { + println!( + "[auto-poll] send Serial Query with session_id={}, serial={}", + session_id, serial + ); + } send_serial_query(writer, state.version, session_id, serial).await?; } _ => { - println!("[auto-poll] local state incomplete, send Reset Query"); + if state.output_mode == OutputMode::Verbose { + println!("[auto-poll] local state incomplete, send Reset Query"); + } send_reset_query(writer, state.version).await?; } } - println!(); + if state.output_mode == OutputMode::Verbose { + println!(); + } Ok(()) } @@ -869,11 +938,30 @@ impl Config { while let Some(arg) = args.next() { match arg.as_str() { - "--tls" => { - if matches!(transport, TransportConfig::Tcp) { + "--tls" => match transport { + TransportConfig::Tcp => { transport = TransportConfig::Tls(TlsConfig::default()); } - } + TransportConfig::Tls(_) => {} + TransportConfig::Ssh(_) => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "--tls cannot be used together with --ssh", + )); + } + }, + "--ssh" => match transport { + TransportConfig::Tcp => { + transport = TransportConfig::Ssh(SshConfig::default()); + } + TransportConfig::Ssh(_) => {} + TransportConfig::Tls(_) => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "--ssh cannot be used together with --tls", + )); + } + }, "--ca-cert" => { let path = args.next().ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidInput, "--ca-cert requires a path") @@ -901,6 +989,45 @@ impl Config { })?; ensure_tls_config(&mut transport)?.server_name = Some(name); } + "--ssh-user" => { + let user = args.next().ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "--ssh-user requires a value") + })?; + ensure_ssh_config(&mut transport)?.user = Some(user); + } + "--ssh-key" => { + let path = args.next().ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "--ssh-key requires a path") + })?; + ensure_ssh_config(&mut transport)?.private_key = Some(PathBuf::from(path)); + } + "--ssh-subsystem" => { + let subsystem = args.next().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "--ssh-subsystem requires a value", + ) + })?; + ensure_ssh_config(&mut transport)?.subsystem = Some(subsystem); + } + "--ssh-known-hosts" => { + let path = args.next().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "--ssh-known-hosts requires a path", + ) + })?; + ensure_ssh_config(&mut transport)?.known_hosts = Some(PathBuf::from(path)); + } + "--ssh-server-key" => { + let path = args.next().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "--ssh-server-key requires a path", + ) + })?; + ensure_ssh_config(&mut transport)?.server_key = Some(PathBuf::from(path)); + } "--timeout" => { let secs = args.next().ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidInput, "--timeout requires seconds") @@ -1034,6 +1161,7 @@ fn should_print_pdu(output_mode: OutputMode, header: &PduHeader) -> bool { enum TransportConfig { Tcp, Tls(TlsConfig), + Ssh(SshConfig), } impl TransportConfig { @@ -1052,6 +1180,17 @@ impl TransportConfig { .map(|path| path.display().to_string()) .unwrap_or_else(|| "".to_string()) ), + Self::Ssh(cfg) => format!( + "ssh (user={}, subsystem={}, host_key_check={})", + cfg.user.as_deref().unwrap_or(""), + cfg.subsystem + .as_deref() + .unwrap_or(DEFAULT_SSH_SUBSYSTEM_NAME), + cfg.host_key_verification + .as_ref() + .map(HostKeyVerification::describe) + .unwrap_or("") + ), } } } @@ -1064,14 +1203,62 @@ struct TlsConfig { client_key: Option, } -fn ensure_tls_config(transport: &mut TransportConfig) -> io::Result<&mut TlsConfig> { - if matches!(transport, TransportConfig::Tcp) { - *transport = TransportConfig::Tls(TlsConfig::default()); - } +#[derive(Debug, Clone)] +enum HostKeyVerification { + KnownHosts(PathBuf), + PinnedServerKey(PathBuf), +} +impl HostKeyVerification { + fn describe(&self) -> &'static str { + match self { + Self::KnownHosts(_) => "known_hosts", + Self::PinnedServerKey(_) => "pinned_server_key", + } + } +} + +#[derive(Debug, Clone, Default)] +struct SshConfig { + user: Option, + private_key: Option, + subsystem: Option, + known_hosts: Option, + server_key: Option, + host_key_verification: Option, +} + +fn ensure_tls_config(transport: &mut TransportConfig) -> io::Result<&mut TlsConfig> { match transport { + TransportConfig::Tcp => { + *transport = TransportConfig::Tls(TlsConfig::default()); + match transport { + TransportConfig::Tls(cfg) => Ok(cfg), + _ => unreachable!(), + } + } TransportConfig::Tls(cfg) => Ok(cfg), - TransportConfig::Tcp => unreachable!(), + TransportConfig::Ssh(_) => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "TLS options cannot be used together with --ssh", + )), + } +} + +fn ensure_ssh_config(transport: &mut TransportConfig) -> io::Result<&mut SshConfig> { + match transport { + TransportConfig::Tcp => { + *transport = TransportConfig::Ssh(SshConfig::default()); + match transport { + TransportConfig::Ssh(cfg) => Ok(cfg), + _ => unreachable!(), + } + } + TransportConfig::Ssh(cfg) => Ok(cfg), + TransportConfig::Tls(_) => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "SSH options cannot be used together with --tls", + )), } } @@ -1114,6 +1301,68 @@ fn finalize_transport(transport: TransportConfig, addr: &str) -> io::Result { + let user = cfg.user.take().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "SSH mode requires --ssh-user ", + ) + })?; + if user.trim().is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "--ssh-user must not be empty", + )); + } + + let private_key = cfg.private_key.take().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "SSH mode requires --ssh-key ", + ) + })?; + + if cfg.known_hosts.is_some() && cfg.server_key.is_some() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "SSH host key verification must choose one: --ssh-known-hosts or --ssh-server-key", + )); + } + + let host_key_verification = if let Some(path) = cfg.known_hosts.take() { + HostKeyVerification::KnownHosts(path) + } else if let Some(path) = cfg.server_key.take() { + HostKeyVerification::PinnedServerKey(path) + } else { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "SSH mode requires host key verification: --ssh-known-hosts or --ssh-server-key ", + )); + }; + + let subsystem = cfg + .subsystem + .take() + .unwrap_or_else(|| DEFAULT_SSH_SUBSYSTEM_NAME.to_string()); + + if subsystem.trim().is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "--ssh-subsystem must not be empty", + )); + } + + let _ = parse_host_port(addr)?; + + Ok(TransportConfig::Ssh(SshConfig { + user: Some(user), + private_key: Some(private_key), + subsystem: Some(subsystem), + known_hosts: None, + server_key: None, + host_key_verification: Some(host_key_verification), + })) + } } } @@ -1121,9 +1370,165 @@ async fn connect_stream(config: &Config) -> io::Result { match &config.transport { TransportConfig::Tcp => Ok(Box::new(TcpStream::connect(&config.addr).await?)), TransportConfig::Tls(tls) => connect_tls_stream(&config.addr, tls).await, + TransportConfig::Ssh(ssh) => connect_ssh_stream(&config.addr, ssh).await, } } +#[derive(Debug, Clone)] +struct SshClientHandler { + host: String, + port: u16, + host_key_verification: HostKeyVerification, +} + +impl client::Handler for SshClientHandler { + type Error = russh::Error; + + async fn check_server_key( + &mut self, + server_public_key: &russh::keys::ssh_key::PublicKey, + ) -> Result { + match &self.host_key_verification { + HostKeyVerification::KnownHosts(path) => { + check_known_hosts_path(&self.host, self.port, server_public_key, path) + .map_err(Into::into) + } + HostKeyVerification::PinnedServerKey(path) => { + let expected_key = load_public_key(path)?; + Ok(expected_key == *server_public_key) + } + } + } +} + +struct SshSessionStream { + channel_stream: ChannelStream, + _session: client::Handle, +} + +impl AsyncRead for SshSessionStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.channel_stream).poll_read(cx, buf) + } +} + +impl AsyncWrite for SshSessionStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.channel_stream).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.channel_stream).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.channel_stream).poll_shutdown(cx) + } +} + +async fn connect_ssh_stream(addr: &str, ssh: &SshConfig) -> io::Result { + let user = ssh + .user + .as_deref() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "missing SSH user"))?; + let private_key_path = ssh + .private_key + .as_ref() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "missing SSH private key"))?; + let subsystem = ssh + .subsystem + .as_deref() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "missing SSH subsystem"))?; + let host_key_verification = ssh.host_key_verification.clone().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "missing SSH host key verification", + ) + })?; + + let (host, port) = parse_host_port(addr)?; + let handler = SshClientHandler { + host, + port, + host_key_verification, + }; + + let session_config = Arc::new(client::Config::default()); + let mut session = client::connect(session_config, addr, handler) + .await + .map_err(|err| { + io::Error::new( + io::ErrorKind::ConnectionAborted, + format!("SSH handshake failed: {}", err), + ) + })?; + + let private_key = load_secret_key(private_key_path, None).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "failed to load SSH private key {}: {}", + private_key_path.display(), + err + ), + ) + })?; + let rsa_hash = session.best_supported_rsa_hash().await.map_err(|err| { + io::Error::new( + io::ErrorKind::ConnectionAborted, + format!("failed to negotiate SSH RSA hash: {}", err), + ) + })?; + let auth_result = session + .authenticate_publickey( + user.to_string(), + PrivateKeyWithHashAlg::new(Arc::new(private_key), rsa_hash.flatten()), + ) + .await + .map_err(|err| { + io::Error::new( + io::ErrorKind::PermissionDenied, + format!("SSH publickey authentication failed: {}", err), + ) + })?; + if !auth_result.success() { + return Err(io::Error::new( + io::ErrorKind::PermissionDenied, + "SSH publickey authentication rejected by server", + )); + } + + let channel = session.channel_open_session().await.map_err(|err| { + io::Error::new( + io::ErrorKind::ConnectionAborted, + format!("failed to open SSH session channel: {}", err), + ) + })?; + channel + .request_subsystem(true, subsystem) + .await + .map_err(|err| { + io::Error::new( + io::ErrorKind::ConnectionAborted, + format!("failed to request SSH subsystem '{}': {}", subsystem, err), + ) + })?; + let channel_stream = channel.into_stream(); + + Ok(Box::new(SshSessionStream { + channel_stream, + _session: session, + })) +} + async fn connect_tls_stream(addr: &str, tls: &TlsConfig) -> io::Result { let stream = TcpStream::connect(addr).await?; let connector = build_tls_connector(tls)?; @@ -1219,6 +1624,44 @@ fn default_server_name_for_addr(addr: &str) -> Option { addr.rsplit_once(':').map(|(host, _port)| host.to_string()) } +fn parse_host_port(addr: &str) -> io::Result<(String, u16)> { + if let Some(rest) = addr.strip_prefix('[') { + let (host, port_part) = rest.split_once("]:").ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid address '{}', expected [host]:port", addr), + ) + })?; + let port = port_part.parse::().map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid port in address '{}': {}", addr, err), + ) + })?; + return Ok((host.to_string(), port)); + } + + let (host, port_part) = addr.rsplit_once(':').ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid address '{}', expected host:port", addr), + ) + })?; + if host.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid address '{}', host must not be empty", addr), + )); + } + let port = port_part.parse::().map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid port in address '{}': {}", addr, err), + ) + })?; + Ok((host.to_string(), port)) +} + fn parse_u64_arg(value: &str, name: &str) -> io::Result { let parsed = value.parse::().map_err(|err| { io::Error::new( diff --git a/src/main.rs b/src/main.rs index 8df60db..117d616 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use anyhow::{Result, anyhow}; +use chrono::{FixedOffset, Utc}; use tokio::task::JoinHandle; use tracing::{info, warn}; @@ -16,8 +17,10 @@ use rpki::source::pipeline::{PayloadLoadConfig, load_payloads_from_latest_source #[derive(Debug, Clone)] struct AppConfig { enable_tls: bool, + enable_ssh: bool, tcp_addr: SocketAddr, tls_addr: SocketAddr, + ssh_addr: SocketAddr, db_path: String, ccr_dir: String, @@ -25,6 +28,11 @@ struct AppConfig { tls_cert_path: String, tls_key_path: String, tls_client_ca_path: String, + ssh_host_key_path: String, + ssh_authorized_keys_path: String, + ssh_username: String, + ssh_subsystem_name: String, + ssh_password: Option, max_delta: u8, prune_delta_by_snapshot_size: bool, @@ -39,8 +47,10 @@ impl Default for AppConfig { fn default() -> Self { Self { enable_tls: false, + enable_ssh: false, tcp_addr: "0.0.0.0:323".parse().expect("invalid default tcp_addr"), tls_addr: "0.0.0.0:324".parse().expect("invalid default tls_addr"), + ssh_addr: "0.0.0.0:22".parse().expect("invalid default ssh_addr"), db_path: "./rtr-db".to_string(), ccr_dir: "./data".to_string(), @@ -48,6 +58,11 @@ impl Default for AppConfig { tls_cert_path: "./certs/server.crt".to_string(), tls_key_path: "./certs/server.key".to_string(), tls_client_ca_path: "./certs/client-ca.crt".to_string(), + ssh_host_key_path: "./certs/ssh_host_ed25519_key".to_string(), + ssh_authorized_keys_path: "./certs/rtr-authorized_keys".to_string(), + ssh_username: "rpki-rtr".to_string(), + ssh_subsystem_name: "rpki-rtr".to_string(), + ssh_password: None, max_delta: 100, prune_delta_by_snapshot_size: false, @@ -74,6 +89,9 @@ impl AppConfig { if let Some(value) = env_var("RPKI_RTR_ENABLE_TLS")? { config.enable_tls = parse_bool(&value, "RPKI_RTR_ENABLE_TLS")?; } + if let Some(value) = env_var("RPKI_RTR_ENABLE_SSH")? { + config.enable_ssh = parse_bool(&value, "RPKI_RTR_ENABLE_SSH")?; + } if let Some(value) = env_var("RPKI_RTR_TCP_ADDR")? { config.tcp_addr = value .parse() @@ -84,6 +102,17 @@ impl AppConfig { .parse() .map_err(|err| anyhow!("invalid RPKI_RTR_TLS_ADDR '{}': {}", value, err))?; } + if let Some(value) = env_var("RPKI_RTR_SSH_ADDR")? { + config.ssh_addr = value + .parse() + .map_err(|err| anyhow!("invalid RPKI_RTR_SSH_ADDR '{}': {}", value, err))?; + } + if let Some(value) = env_var("RPKI_RTR_SSH_PORT")? { + let port: u16 = value + .parse() + .map_err(|err| anyhow!("invalid RPKI_RTR_SSH_PORT '{}': {}", value, err))?; + config.ssh_addr.set_port(port); + } // data if let Some(value) = env_var("RPKI_RTR_DB_PATH")? { @@ -109,6 +138,22 @@ impl AppConfig { if let Some(value) = env_var("RPKI_RTR_TLS_CLIENT_CA_PATH")? { config.tls_client_ca_path = value; } + if let Some(value) = env_var("RPKI_RTR_SSH_HOST_KEY_PATH")? { + config.ssh_host_key_path = value; + } + if let Some(value) = env_var("RPKI_RTR_SSH_AUTHORIZED_KEYS_PATH")? { + config.ssh_authorized_keys_path = value; + } + if let Some(value) = env_var("RPKI_RTR_SSH_USERNAME")? { + config.ssh_username = value; + } + if let Some(value) = env_var("RPKI_RTR_SSH_SUBSYSTEM_NAME")? { + config.ssh_subsystem_name = value; + } + if let Some(value) = env_var("RPKI_RTR_SSH_PASSWORD")? { + let value = value.trim().to_string(); + config.ssh_password = if value.is_empty() { None } else { Some(value) }; + } if let Some(value) = env_var("RPKI_RTR_MAX_DELTA")? { let parsed: u8 = value .parse() @@ -135,20 +180,14 @@ impl AppConfig { source_refresh_interval_legacy.as_deref(), ) { (Some(new_value), Some(_)) => { - let secs = parse_positive_u64( - new_value, - "RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS", - )?; + let secs = parse_positive_u64(new_value, "RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS")?; config.source_refresh_interval = Duration::from_secs(secs); warn!( "both RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS and legacy RPKI_RTR_REFRESH_INTERVAL_SECS are set; using RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS" ); } (Some(new_value), None) => { - let secs = parse_positive_u64( - new_value, - "RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS", - )?; + let secs = parse_positive_u64(new_value, "RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS")?; config.source_refresh_interval = Duration::from_secs(secs); } (None, Some(legacy_value)) => { @@ -271,7 +310,22 @@ fn init_shared_cache(config: &AppConfig, store: &RtrStore) -> Result RunningRtrService { - if config.enable_tls { + if config.enable_tls && config.enable_ssh { + info!("starting TCP, TLS and SSH RTR servers"); + service.spawn_tcp_tls_and_ssh_from_pem_and_openssh( + config.tcp_addr, + config.tls_addr, + config.ssh_addr, + &config.tls_cert_path, + &config.tls_key_path, + &config.tls_client_ca_path, + &config.ssh_host_key_path, + &config.ssh_authorized_keys_path, + &config.ssh_username, + &config.ssh_subsystem_name, + config.ssh_password.as_deref(), + ) + } else if config.enable_tls { info!("starting TCP and TLS RTR servers"); service.spawn_tcp_and_tls_from_pem( config.tcp_addr, @@ -280,6 +334,17 @@ fn start_servers(config: &AppConfig, service: &RtrService) -> RunningRtrService &config.tls_key_path, &config.tls_client_ca_path, ) + } else if config.enable_ssh { + info!("starting TCP and SSH RTR servers"); + service.spawn_tcp_and_ssh_from_openssh( + config.tcp_addr, + config.ssh_addr, + &config.ssh_host_key_path, + &config.ssh_authorized_keys_path, + &config.ssh_username, + &config.ssh_subsystem_name, + config.ssh_password.as_deref(), + ) } else { info!("starting TCP RTR server"); service.spawn_tcp_only(config.tcp_addr) @@ -373,6 +438,7 @@ fn log_startup_config(config: &AppConfig) { info!("db_path={}", config.db_path); info!("tcp_addr={}", config.tcp_addr); info!("tls_enabled={}", config.enable_tls); + info!("ssh_enabled={}", config.enable_ssh); if config.enable_tls { info!("tls_addr={}", config.tls_addr); @@ -380,6 +446,17 @@ fn log_startup_config(config: &AppConfig) { info!("tls_key_path={}", config.tls_key_path); info!("tls_client_ca_path={}", config.tls_client_ca_path); } + if config.enable_ssh { + info!("ssh_addr={}", config.ssh_addr); + info!("ssh_host_key_path={}", config.ssh_host_key_path); + info!( + "ssh_authorized_keys_path={}", + config.ssh_authorized_keys_path + ); + info!("ssh_username={}", config.ssh_username); + info!("ssh_subsystem_name={}", config.ssh_subsystem_name); + info!("ssh_password_enabled={}", config.ssh_password.is_some()); + } info!("ccr_dir={}", config.ccr_dir); info!( @@ -419,11 +496,33 @@ fn log_startup_config(config: &AppConfig) { } fn init_tracing() { - let _ = tracing_subscriber::fmt() + let filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")); + + struct ShanghaiTimer; + + impl tracing_subscriber::fmt::time::FormatTime for ShanghaiTimer { + fn format_time( + &self, + w: &mut tracing_subscriber::fmt::format::Writer<'_>, + ) -> std::fmt::Result { + let shanghai_offset = FixedOffset::east_opt(8 * 60 * 60) + .expect("fixed +08:00 offset should always be valid"); + let now = Utc::now().with_timezone(&shanghai_offset); + write!(w, "{}", now.format("%Y-%m-%d %H:%M:%S%.3f %:z")) + } + } + + if let Err(err) = tracing_subscriber::fmt() + .with_timer(ShanghaiTimer) + .with_env_filter(filter) .with_target(true) .with_thread_ids(true) .with_level(true) - .try_init(); + .try_init() + { + eprintln!("failed to initialize tracing subscriber: {err}"); + } } fn env_var(name: &str) -> Result> { diff --git a/src/rtr/cache/store.rs b/src/rtr/cache/store.rs index d337f82..91f6561 100644 --- a/src/rtr/cache/store.rs +++ b/src/rtr/cache/store.rs @@ -48,7 +48,8 @@ impl RtrCache { project_snapshot_for_version(&source_snapshot, version as u8) }); let serials = [serial; VERSION_COUNT]; - let deltas = std::array::from_fn(|_| VecDeque::>::with_capacity(max_delta as usize)); + let deltas = + std::array::from_fn(|_| VecDeque::>::with_capacity(max_delta as usize)); tokio::spawn({ let store = store.clone(); @@ -103,7 +104,8 @@ fn try_restore_from_store( let mut snapshots = std::array::from_fn(|_| Snapshot::empty()); let mut session_ids = [0u16; VERSION_COUNT]; let mut serials = [0u32; VERSION_COUNT]; - let mut deltas = std::array::from_fn(|_| VecDeque::>::with_capacity(max_delta as usize)); + let mut deltas = + std::array::from_fn(|_| VecDeque::>::with_capacity(max_delta as usize)); for version in 0u8..=2 { let idx = version as usize; @@ -123,7 +125,8 @@ fn try_restore_from_store( } if let Some((min_serial, max_serial)) = store.get_delta_window_for_version(version)? { - let mut loaded = store.load_delta_window_for_version(version, min_serial, max_serial)?; + let mut loaded = + store.load_delta_window_for_version(version, min_serial, max_serial)?; let max_keep = usize::from(max_delta.max(1)); if loaded.len() > max_keep { let drop_count = loaded.len() - max_keep; diff --git a/src/rtr/server/listener.rs b/src/rtr/server/listener.rs index ed6fafa..b1f470d 100644 --- a/src/rtr/server/listener.rs +++ b/src/rtr/server/listener.rs @@ -1,23 +1,169 @@ +use std::collections::HashMap; +use std::future::Future; use std::net::SocketAddr; use std::path::Path; +use std::pin::Pin; use std::sync::{Arc, atomic::AtomicUsize}; use std::time::Duration; -use anyhow::{Context, Result}; +use anyhow::{Context, Result, anyhow}; +use russh::server::{self, Msg, Session}; +use russh::{Channel, ChannelId, Disconnect}; use socket2::{SockRef, TcpKeepalive}; -use tokio::net::TcpListener; +use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{Semaphore, broadcast, watch}; -use tracing::{info, warn}; +use tokio_rustls::TlsAcceptor; +use tracing::{debug, info, warn}; use rustls::ServerConfig; -use tokio_rustls::TlsAcceptor; use crate::rtr::cache::SharedRtrCache; use crate::rtr::server::config::RtrServiceConfig; use crate::rtr::server::connection::{ ConnectionGuard, handle_tcp_connection, handle_tls_connection, is_expected_disconnect, }; +use crate::rtr::server::ssh::RtrSshRuntimeConfig; use crate::rtr::server::tls::load_rustls_server_config_with_options; +use crate::rtr::session::RtrSession; + +type TransportFuture = Pin> + Send>>; + +pub trait TransportAcceptor: Clone + Send + Sync + 'static { + fn name(&self) -> &'static str; + + fn handle_connection( + &self, + cache: SharedRtrCache, + stream: TcpStream, + peer_addr: SocketAddr, + notify_tx: broadcast::Sender<()>, + shutdown_tx: watch::Sender, + ) -> TransportFuture; +} + +#[derive(Clone)] +struct TcpTransport; + +impl TransportAcceptor for TcpTransport { + fn name(&self) -> &'static str { + "TCP" + } + + fn handle_connection( + &self, + cache: SharedRtrCache, + stream: TcpStream, + peer_addr: SocketAddr, + notify_tx: broadcast::Sender<()>, + shutdown_tx: watch::Sender, + ) -> TransportFuture { + Box::pin(async move { + handle_tcp_connection( + cache, + stream, + peer_addr, + notify_tx.subscribe(), + shutdown_tx.subscribe(), + ) + .await + }) + } +} + +#[derive(Clone)] +struct TlsTransport { + acceptor: TlsAcceptor, +} + +impl TransportAcceptor for TlsTransport { + fn name(&self) -> &'static str { + "TLS" + } + + fn handle_connection( + &self, + cache: SharedRtrCache, + stream: TcpStream, + peer_addr: SocketAddr, + notify_tx: broadcast::Sender<()>, + shutdown_tx: watch::Sender, + ) -> TransportFuture { + let acceptor = self.acceptor.clone(); + Box::pin(async move { + handle_tls_connection( + cache, + stream, + peer_addr, + acceptor, + notify_tx.subscribe(), + shutdown_tx.subscribe(), + ) + .await + }) + } +} + +#[derive(Clone)] +struct SshTransport { + runtime: Arc, +} + +impl TransportAcceptor for SshTransport { + fn name(&self) -> &'static str { + "SSH" + } + + fn handle_connection( + &self, + cache: SharedRtrCache, + stream: TcpStream, + peer_addr: SocketAddr, + notify_tx: broadcast::Sender<()>, + shutdown_tx: watch::Sender, + ) -> TransportFuture { + let runtime = self.runtime.clone(); + Box::pin(async move { + let handler = RtrSshHandler::new( + cache, + notify_tx.subscribe(), + shutdown_tx.subscribe(), + peer_addr, + runtime.authorized_keys.clone(), + runtime.username.clone(), + runtime.subsystem_name.clone(), + runtime.password.clone(), + ); + + let running = server::run_stream(runtime.server_config.clone(), stream, handler) + .await + .with_context(|| format!("failed to start SSH session for {}", peer_addr))?; + + let handle = running.handle(); + let mut connection_shutdown_rx = shutdown_tx.subscribe(); + + tokio::select! { + session_res = running => { + session_res.map_err(|err| anyhow!(err)) + } + changed = connection_shutdown_rx.changed() => { + match changed { + Ok(()) if *connection_shutdown_rx.borrow() => { + let _ = handle + .disconnect( + Disconnect::ByApplication, + "service shutdown".to_string(), + "".to_string(), + ) + .await; + } + Ok(()) | Err(_) => {} + } + Ok(()) + } + } + }) + } +} pub struct RtrServer { bind_addr: SocketAddr, @@ -64,104 +210,7 @@ impl RtrServer { } pub async fn run_tcp(self) -> Result<()> { - let listener = TcpListener::bind(self.bind_addr) - .await - .with_context(|| format!("failed to bind TCP RTR server on {}", self.bind_addr))?; - - let mut shutdown_rx = self.shutdown_tx.subscribe(); - - info!("RTR TCP server listening on {}", self.bind_addr); - - loop { - tokio::select! { - changed = shutdown_rx.changed() => { - match changed { - Ok(()) => { - if *shutdown_rx.borrow() { - info!("RTR TCP listener {} shutting down", self.bind_addr); - return Ok(()); - } - } - Err(_) => { - info!("RTR TCP listener {} shutdown channel closed", self.bind_addr); - return Ok(()); - } - } - } - - accept_res = listener.accept() => { - let (stream, peer_addr) = match accept_res { - Ok(v) => v, - Err(err) => { - warn!("RTR TCP accept failed: {}", err); - continue; - } - }; - - if let Err(err) = apply_keepalive(&stream, self.config.tcp_keepalive) { - warn!("failed to configure TCP keepalive for {}: {}", peer_addr, err); - } - - let permit = match self.connection_limiter.clone().try_acquire_owned() { - Ok(permit) => permit, - Err(_) => { - warn!( - "RTR TCP connection rejected for {}: max connections reached ({})", - peer_addr, - self.config.max_connections - ); - drop(stream); - continue; - } - }; - - let cache = self.cache.clone(); - let notify_rx = self.notify_tx.subscribe(); - let shutdown_rx = self.shutdown_tx.subscribe(); - let active_connections = self.active_connections.clone(); - - info!( - "RTR TCP client connected: peer_addr={}, active_connections(before_spawn)={}", - peer_addr, - self.active_connections() - ); - - tokio::spawn(async move { - let guard = ConnectionGuard::new(active_connections, permit); - info!( - "RTR TCP connection established: peer_addr={}, active_connections={}", - peer_addr, - guard.active_count() - ); - if let Err(err) = - handle_tcp_connection(cache, stream, peer_addr, notify_rx, shutdown_rx).await - { - if is_expected_disconnect(&err) { - info!( - "RTR TCP session closed by peer: peer_addr={}, active_connections={}, err={}", - peer_addr, - guard.active_count(), - err - ); - } else { - warn!( - "RTR TCP session closed with error: peer_addr={}, active_connections={}, err={}", - peer_addr, - guard.active_count(), - err - ); - } - } else { - info!( - "RTR TCP session closed cleanly: peer_addr={}, active_connections={}", - peer_addr, - guard.active_count() - ); - } - }); - } - } - } + self.run_with_transport(TcpTransport).await } pub async fn run_tls_from_pem( @@ -180,14 +229,37 @@ impl RtrServer { } pub async fn run_tls(self, tls_config: Arc) -> Result<()> { - let listener = TcpListener::bind(self.bind_addr) - .await - .with_context(|| format!("failed to bind TLS RTR server on {}", self.bind_addr))?; + let transport = TlsTransport { + acceptor: TlsAcceptor::from(tls_config), + }; + self.run_with_transport(transport).await + } - let acceptor = TlsAcceptor::from(tls_config); + pub async fn run_ssh(self, runtime_config: Arc) -> Result<()> { + let transport = SshTransport { + runtime: runtime_config, + }; + self.run_with_transport(transport).await + } + + async fn run_with_transport(self, transport: T) -> Result<()> + where + T: TransportAcceptor, + { + let listener = TcpListener::bind(self.bind_addr).await.with_context(|| { + format!( + "failed to bind {} RTR server on {}", + transport.name(), + self.bind_addr + ) + })?; let mut shutdown_rx = self.shutdown_tx.subscribe(); - info!("RTR TLS server listening on {}", self.bind_addr); + info!( + "RTR {} server listening on {}", + transport.name(), + self.bind_addr + ); loop { tokio::select! { @@ -195,12 +267,20 @@ impl RtrServer { match changed { Ok(()) => { if *shutdown_rx.borrow() { - info!("RTR TLS listener {} shutting down", self.bind_addr); + info!( + "RTR {} listener {} shutting down", + transport.name(), + self.bind_addr + ); return Ok(()); } } Err(_) => { - info!("RTR TLS listener {} shutdown channel closed", self.bind_addr); + info!( + "RTR {} listener {} shutdown channel closed", + transport.name(), + self.bind_addr + ); return Ok(()); } } @@ -210,20 +290,26 @@ impl RtrServer { let (stream, peer_addr) = match accept_res { Ok(v) => v, Err(err) => { - warn!("RTR TLS accept failed: {}", err); + warn!("RTR {} accept failed: {}", transport.name(), err); continue; } }; if let Err(err) = apply_keepalive(&stream, self.config.tcp_keepalive) { - warn!("failed to configure TCP keepalive for {}: {}", peer_addr, err); + warn!( + "failed to configure TCP keepalive for {} peer {}: {}", + transport.name(), + peer_addr, + err + ); } let permit = match self.connection_limiter.clone().try_acquire_owned() { Ok(permit) => permit, Err(_) => { warn!( - "RTR TLS connection rejected for {}: max connections reached ({})", + "RTR {} connection rejected for {}: max connections reached ({})", + transport.name(), peer_addr, self.config.max_connections ); @@ -233,13 +319,14 @@ impl RtrServer { }; let cache = self.cache.clone(); - let acceptor = acceptor.clone(); - let notify_rx = self.notify_tx.subscribe(); - let shutdown_rx = self.shutdown_tx.subscribe(); + let notify_tx = self.notify_tx.clone(); + let shutdown_tx = self.shutdown_tx.clone(); let active_connections = self.active_connections.clone(); + let transport_instance = transport.clone(); - info!( - "RTR TLS client connected: peer_addr={}, active_connections(before_spawn)={}", + debug!( + "RTR {} client connected: peer_addr={}, active_connections(before_spawn)={}", + transport_instance.name(), peer_addr, self.active_connections() ); @@ -247,38 +334,41 @@ impl RtrServer { tokio::spawn(async move { let guard = ConnectionGuard::new(active_connections, permit); info!( - "RTR TLS connection established: peer_addr={}, active_connections={}", + "RTR {} connection established: peer_addr={}, active_connections={}", + transport_instance.name(), peer_addr, guard.active_count() ); - if let Err(err) = handle_tls_connection( - cache, - stream, - peer_addr, - acceptor, - notify_rx, - shutdown_rx, - ).await { + + if let Err(err) = transport_instance + .handle_connection(cache, stream, peer_addr, notify_tx, shutdown_tx) + .await + { + let active_after_close = guard.active_count().saturating_sub(1); if is_expected_disconnect(&err) { info!( - "RTR TLS session closed by peer: peer_addr={}, active_connections={}, err={}", + "RTR {} session closed by peer: peer_addr={}, active_connections={}, err={}", + transport_instance.name(), peer_addr, - guard.active_count(), + active_after_close, err ); } else { warn!( - "RTR TLS session closed with error: peer_addr={}, active_connections={}, err={}", + "RTR {} session closed with error: peer_addr={}, active_connections={}, err={}", + transport_instance.name(), peer_addr, - guard.active_count(), + active_after_close, err ); } } else { + let active_after_close = guard.active_count().saturating_sub(1); info!( - "RTR TLS session closed cleanly: peer_addr={}, active_connections={}", + "RTR {} session closed cleanly: peer_addr={}, active_connections={}", + transport_instance.name(), peer_addr, - guard.active_count() + active_after_close ); } }); @@ -288,6 +378,215 @@ impl RtrServer { } } +struct RtrSshHandler { + cache: SharedRtrCache, + notify_rx: broadcast::Receiver<()>, + shutdown_rx: watch::Receiver, + peer_addr: SocketAddr, + authorized_keys: Arc>, + username: Arc, + subsystem_name: Arc, + password: Option>, + channels: HashMap>, + subsystem_started: bool, +} + +impl RtrSshHandler { + fn new( + cache: SharedRtrCache, + notify_rx: broadcast::Receiver<()>, + shutdown_rx: watch::Receiver, + peer_addr: SocketAddr, + authorized_keys: Arc>, + username: Arc, + subsystem_name: Arc, + password: Option>, + ) -> Self { + Self { + cache, + notify_rx, + shutdown_rx, + peer_addr, + authorized_keys, + username, + subsystem_name, + password, + channels: HashMap::new(), + subsystem_started: false, + } + } + + fn is_authorized_key(&self, key: &russh::keys::ssh_key::PublicKey) -> bool { + self.authorized_keys + .iter() + .any(|allowed| allowed.key_data() == key.key_data()) + } + + fn is_expected_user(&self, user: &str) -> bool { + user == self.username.as_ref() + } +} + +impl server::Handler for RtrSshHandler { + type Error = anyhow::Error; + + async fn auth_none(&mut self, _user: &str) -> Result { + Ok(server::Auth::reject()) + } + + async fn auth_password( + &mut self, + user: &str, + password: &str, + ) -> Result { + let accepted = self.is_expected_user(user) + && self + .password + .as_deref() + .map(|expected| expected == password) + .unwrap_or(false); + if accepted { + info!( + "RTR SSH password auth accepted: peer_addr={}, user={}", + self.peer_addr, user + ); + Ok(server::Auth::Accept) + } else { + warn!( + "RTR SSH password auth rejected: peer_addr={}, user={}", + self.peer_addr, user + ); + Ok(server::Auth::reject()) + } + } + + async fn auth_publickey_offered( + &mut self, + user: &str, + public_key: &russh::keys::ssh_key::PublicKey, + ) -> Result { + if self.is_expected_user(user) && self.is_authorized_key(public_key) { + Ok(server::Auth::Accept) + } else { + Ok(server::Auth::reject()) + } + } + + async fn auth_publickey( + &mut self, + user: &str, + public_key: &russh::keys::ssh_key::PublicKey, + ) -> Result { + if self.is_expected_user(user) && self.is_authorized_key(public_key) { + info!( + "RTR SSH publickey auth accepted: peer_addr={}, user={}", + self.peer_addr, user + ); + Ok(server::Auth::Accept) + } else { + warn!( + "RTR SSH publickey auth rejected: peer_addr={}, user={}", + self.peer_addr, user + ); + Ok(server::Auth::reject()) + } + } + + async fn channel_open_session( + &mut self, + channel: Channel, + _session: &mut Session, + ) -> Result { + if self.subsystem_started { + return Ok(false); + } + self.channels.insert(channel.id(), channel); + Ok(true) + } + + async fn subsystem_request( + &mut self, + channel: ChannelId, + name: &str, + session: &mut Session, + ) -> Result<(), Self::Error> { + if name != self.subsystem_name.as_ref() { + let _ = session.channel_failure(channel); + warn!( + "RTR SSH subsystem rejected: peer_addr={}, requested={}, expected={}", + self.peer_addr, name, self.subsystem_name + ); + return Ok(()); + } + + let Some(channel) = self.channels.remove(&channel) else { + let _ = session.channel_failure(channel); + return Ok(()); + }; + + session.channel_success(channel.id())?; + self.subsystem_started = true; + + let cache = self.cache.clone(); + let notify_rx = self.notify_rx.resubscribe(); + let shutdown_rx = self.shutdown_rx.clone(); + let peer_addr = self.peer_addr; + let subsystem = self.subsystem_name.clone(); + + tokio::spawn(async move { + let stream = channel.into_stream(); + let session = RtrSession::new(cache, stream, notify_rx, shutdown_rx); + if let Err(err) = session.run().await { + warn!( + "RTR SSH subsystem session closed with error: peer_addr={}, subsystem={}, err={}", + peer_addr, subsystem, err + ); + } else { + info!( + "RTR SSH subsystem session completed: peer_addr={}, subsystem={}", + peer_addr, subsystem + ); + } + }); + + Ok(()) + } + + async fn shell_request( + &mut self, + channel: ChannelId, + session: &mut Session, + ) -> Result<(), Self::Error> { + let _ = session.channel_failure(channel); + Ok(()) + } + + async fn exec_request( + &mut self, + channel: ChannelId, + _data: &[u8], + session: &mut Session, + ) -> Result<(), Self::Error> { + let _ = session.channel_failure(channel); + Ok(()) + } + + async fn pty_request( + &mut self, + channel: ChannelId, + _term: &str, + _col_width: u32, + _row_height: u32, + _pix_width: u32, + _pix_height: u32, + _modes: &[(russh::Pty, u32)], + session: &mut Session, + ) -> Result<(), Self::Error> { + let _ = session.channel_failure(channel); + Ok(()) + } +} + fn apply_keepalive(stream: &tokio::net::TcpStream, keepalive: Option) -> Result<()> { let Some(keepalive) = keepalive else { return Ok(()); diff --git a/src/rtr/server/mod.rs b/src/rtr/server/mod.rs index 53ced47..0808ddb 100644 --- a/src/rtr/server/mod.rs +++ b/src/rtr/server/mod.rs @@ -3,6 +3,7 @@ pub mod connection; pub mod listener; pub mod notifier; pub mod service; +pub mod ssh; pub mod tls; pub use config::RtrServiceConfig; diff --git a/src/rtr/server/service.rs b/src/rtr/server/service.rs index b86a3a1..b4f248d 100644 --- a/src/rtr/server/service.rs +++ b/src/rtr/server/service.rs @@ -13,6 +13,7 @@ use crate::rtr::cache::SharedRtrCache; use crate::rtr::server::config::RtrServiceConfig; use crate::rtr::server::listener::RtrServer; use crate::rtr::server::notifier::RtrNotifier; +use crate::rtr::server::ssh::load_rtr_ssh_runtime_config; pub struct RtrService { cache: SharedRtrCache, @@ -86,6 +87,18 @@ impl RtrService { ) } + pub fn ssh_server(&self, bind_addr: SocketAddr) -> RtrServer { + RtrServer::new( + bind_addr, + self.cache.clone(), + self.notify_tx.clone(), + self.shutdown_tx.clone(), + self.connection_limiter.clone(), + self.active_connections.clone(), + self.config.clone(), + ) + } + pub fn spawn_tcp(&self, bind_addr: SocketAddr) -> JoinHandle<()> { if self.config.warn_insecure_tcp { warn!( @@ -141,6 +154,111 @@ impl RtrService { } } + #[allow(clippy::too_many_arguments)] + pub fn spawn_ssh_from_openssh( + &self, + bind_addr: SocketAddr, + host_key_path: impl AsRef, + authorized_keys_path: impl AsRef, + username: &str, + subsystem_name: &str, + password: Option<&str>, + ) -> JoinHandle<()> { + let host_key_path = host_key_path.as_ref().to_path_buf(); + let authorized_keys_path = authorized_keys_path.as_ref().to_path_buf(); + let username = username.to_string(); + let subsystem_name = subsystem_name.to_string(); + let password = password.map(ToString::to_string); + let inactivity_timeout = Some(std::time::Duration::from_secs(3600)); + let keepalive_interval = self.config.tcp_keepalive; + let server = self.ssh_server(bind_addr); + + tokio::spawn(async move { + let runtime_config = match load_rtr_ssh_runtime_config( + &host_key_path, + &authorized_keys_path, + &username, + &subsystem_name, + password.as_deref(), + inactivity_timeout, + keepalive_interval, + ) { + Ok(cfg) => Arc::new(cfg), + Err(err) => { + error!( + "RTR SSH server {} failed to load configuration: {:?}", + bind_addr, err + ); + return; + } + }; + + if let Err(err) = server.run_ssh(runtime_config).await { + error!("RTR SSH server {} exited with error: {:?}", bind_addr, err); + } + }) + } + + #[allow(clippy::too_many_arguments)] + pub fn spawn_tcp_and_ssh_from_openssh( + &self, + tcp_bind_addr: SocketAddr, + ssh_bind_addr: SocketAddr, + host_key_path: impl AsRef, + authorized_keys_path: impl AsRef, + username: &str, + subsystem_name: &str, + password: Option<&str>, + ) -> RunningRtrService { + let tcp_handle = self.spawn_tcp(tcp_bind_addr); + let ssh_handle = self.spawn_ssh_from_openssh( + ssh_bind_addr, + host_key_path, + authorized_keys_path, + username, + subsystem_name, + password, + ); + + RunningRtrService { + shutdown_tx: self.shutdown_tx.clone(), + handles: vec![tcp_handle, ssh_handle], + } + } + + #[allow(clippy::too_many_arguments)] + pub fn spawn_tcp_tls_and_ssh_from_pem_and_openssh( + &self, + tcp_bind_addr: SocketAddr, + tls_bind_addr: SocketAddr, + ssh_bind_addr: SocketAddr, + cert_path: impl AsRef, + key_path: impl AsRef, + client_ca_path: impl AsRef, + host_key_path: impl AsRef, + authorized_keys_path: impl AsRef, + username: &str, + subsystem_name: &str, + password: Option<&str>, + ) -> RunningRtrService { + let tcp_handle = self.spawn_tcp(tcp_bind_addr); + let tls_handle = + self.spawn_tls_from_pem(tls_bind_addr, cert_path, key_path, client_ca_path); + let ssh_handle = self.spawn_ssh_from_openssh( + ssh_bind_addr, + host_key_path, + authorized_keys_path, + username, + subsystem_name, + password, + ); + + RunningRtrService { + shutdown_tx: self.shutdown_tx.clone(), + handles: vec![tcp_handle, tls_handle, ssh_handle], + } + } + pub fn spawn_tcp_only(&self, tcp_bind_addr: SocketAddr) -> RunningRtrService { let tcp_handle = self.spawn_tcp(tcp_bind_addr); diff --git a/src/rtr/server/ssh.rs b/src/rtr/server/ssh.rs new file mode 100644 index 0000000..feecfc0 --- /dev/null +++ b/src/rtr/server/ssh.rs @@ -0,0 +1,97 @@ +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::{Context, Result, anyhow, bail}; +use russh::keys; +use russh::keys::PrivateKey; +use russh::keys::ssh_key::{self, AuthorizedKeys}; +use russh::server::Config as RusshServerConfig; +use russh::{MethodKind, MethodSet}; + +#[derive(Debug, Clone)] +pub struct RtrSshRuntimeConfig { + pub server_config: Arc, + pub authorized_keys: Arc>, + pub username: Arc, + pub subsystem_name: Arc, + pub password: Option>, +} + +pub fn load_rtr_ssh_runtime_config( + host_key_path: impl AsRef, + authorized_keys_path: impl AsRef, + username: &str, + subsystem_name: &str, + password: Option<&str>, + inactivity_timeout: Option, + keepalive_interval: Option, +) -> Result { + if username.trim().is_empty() { + bail!("SSH username must not be empty"); + } + if subsystem_name.trim().is_empty() { + bail!("SSH subsystem name must not be empty"); + } + + let host_key = load_host_key(host_key_path.as_ref())?; + let authorized_keys = load_authorized_keys(authorized_keys_path.as_ref())?; + let password = password.map(str::trim).filter(|value| !value.is_empty()); + + let mut methods = MethodSet::empty(); + methods.push(MethodKind::PublicKey); + if password.is_some() { + methods.push(MethodKind::Password); + } + + let server_config = RusshServerConfig { + methods, + keys: vec![host_key], + inactivity_timeout, + keepalive_interval, + keepalive_max: 3, + auth_rejection_time: Duration::from_secs(1), + auth_rejection_time_initial: Some(Duration::from_secs(0)), + ..Default::default() + }; + + Ok(RtrSshRuntimeConfig { + server_config: Arc::new(server_config), + authorized_keys: Arc::new(authorized_keys), + username: Arc::from(username.trim()), + subsystem_name: Arc::from(subsystem_name.trim()), + password: password.map(Arc::from), + }) +} + +fn load_host_key(path: &Path) -> Result { + keys::load_secret_key(path, None).with_context(|| { + format!( + "failed to load SSH host private key from {} (OpenSSH private key expected)", + path.display() + ) + }) +} + +fn load_authorized_keys(path: &Path) -> Result> { + let entries = AuthorizedKeys::read_file(path).with_context(|| { + format!( + "failed to read SSH authorized_keys file from {}", + path.display() + ) + })?; + + let mut keys = Vec::with_capacity(entries.len()); + for entry in entries { + keys.push(entry.public_key().clone()); + } + + if keys.is_empty() { + return Err(anyhow!( + "SSH authorized_keys file {} does not contain any usable keys", + path.display() + )); + } + + Ok(keys) +} diff --git a/src/rtr/session.rs b/src/rtr/session.rs index e578270..b30d796 100644 --- a/src/rtr/session.rs +++ b/src/rtr/session.rs @@ -102,19 +102,43 @@ where } } - header_res = timeout(transport_timeout, Header::read_raw(&mut self.stream)) => { + header_res = async { + // draft-ietf-sidrops-8210bis-25 Section 6 allows routers to wait up to + // Refresh Interval before polling again (recommended default: 3600s). + // In an established session, a long quiet period is therefore expected and + // must not be treated as a transport stall. + // + // We only enforce transport timeout before session establishment. Once the + // session is established, we wait indefinitely for the next query header. + if self.state == SessionState::Established { + Header::read_raw(&mut self.stream).await + } else { + timeout(transport_timeout, Header::read_raw(&mut self.stream)) + .await + .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "transport read timed out"))? + } + } => { let raw_header = match header_res { - Ok(Ok(raw)) => raw, - Ok(Err(_)) => { + Ok(raw) => raw, + Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => { info!("RTR session closed by peer before header read completed: {}", self.session_summary()); self.state = SessionState::Closed; return Ok(()); } - Err(_) => { + Err(err) if err.kind() == io::ErrorKind::TimedOut => { warn!("RTR session transport timeout while waiting for header: {}", self.session_summary()); self.handle_transport_timeout(&[]).await?; return Ok(()); } + Err(err) => { + warn!( + "RTR session failed to read header: err={}, {}", + err, + self.session_summary() + ); + self.state = SessionState::Closed; + return Ok(()); + } }; let header = match Header::from_raw(raw_header) { Ok(h) => h, @@ -546,7 +570,7 @@ where } async fn handle_reset_query(&mut self, offending_pdu: &[u8]) -> Result<()> { - info!( + debug!( "RTR session received Reset Query: negotiated_version={:?}, offending_pdu_len={}", self.version, offending_pdu.len() @@ -568,7 +592,7 @@ where if !data_available { self.send_no_data_available(offending_pdu, "cache data is not currently available") .await?; - info!( + debug!( "RTR session replied No Data Available to Reset Query: {}", self.session_summary() ); @@ -578,7 +602,7 @@ where self.write_cache_response(session_id).await?; self.send_payloads(&payloads, true).await?; self.write_end_of_data(session_id, serial).await?; - info!( + debug!( "RTR session completed Reset Query: response_session_id={}, response_serial={}, payload_count={}, {}", session_id, serial, @@ -596,7 +620,7 @@ where client_serial: u32, offending_pdu: &[u8], ) -> Result<()> { - info!( + debug!( "RTR session received Serial Query: negotiated_version={}, client_session_id={}, client_serial={}, offending_pdu_len={}", version, client_session, @@ -617,7 +641,7 @@ where if !data_available { self.send_no_data_available(offending_pdu, "cache data is not currently available") .await?; - info!( + debug!( "RTR session replied No Data Available to Serial Query: client_session_id={}, client_serial={}, {}", client_session, client_serial, @@ -650,7 +674,7 @@ where match serial_result { SerialResult::ResetRequired => { self.write_cache_reset().await?; - info!( + debug!( "RTR session replied Cache Reset to Serial Query: client_session_id={}, client_serial={}, {}", client_session, client_serial, @@ -668,13 +692,13 @@ where ( cache.session_id_for_version(version), cache.serial_for_version(version), - ) + ) }; self.write_cache_response(current_session).await?; self.write_end_of_data(current_session, current_serial) .await?; - info!( + debug!( "RTR session replied CacheResponse+EndOfData (up-to-date) to Serial Query: client_session_id={}, client_serial={}, response_session_id={}, response_serial={}, {}", client_session, client_serial, @@ -701,7 +725,7 @@ where self.send_delta(&delta).await?; self.write_end_of_data(current_session, current_serial) .await?; - info!( + debug!( "RTR session replied delta to Serial Query: client_session_id={}, client_serial={}, response_session_id={}, response_serial={}, {}", client_session, client_serial, @@ -782,7 +806,10 @@ where .cache .read() .ok() - .and_then(|cache| self.version.map(|version| cache.serial_for_version(version))) + .and_then(|cache| { + self.version + .map(|version| cache.serial_for_version(version)) + }) .map(|serial| serial.to_string()) .unwrap_or_else(|| "".to_string()); let session_id = self diff --git a/src/source/pipeline.rs b/src/source/pipeline.rs index ec1a5fd..a82ac41 100644 --- a/src/source/pipeline.rs +++ b/src/source/pipeline.rs @@ -102,8 +102,13 @@ fn read_slurm_files(slurm_dir: &str) -> Result> { for entry in std::fs::read_dir(slurm_dir) .map_err(|err| anyhow!("failed to read SLURM directory '{}': {}", slurm_dir, err))? { - let entry = entry - .map_err(|err| anyhow!("failed to enumerate SLURM directory '{}': {}", slurm_dir, err))?; + let entry = entry.map_err(|err| { + anyhow!( + "failed to enumerate SLURM directory '{}': {}", + slurm_dir, + err + ) + })?; let path = entry.path(); if path.is_file() && path.extension().and_then(|ext| ext.to_str()) == Some("slurm") { paths.push(path); diff --git a/tests/test_rtr_debug_client_ssh_cli.rs b/tests/test_rtr_debug_client_ssh_cli.rs new file mode 100644 index 0000000..c68881c --- /dev/null +++ b/tests/test_rtr_debug_client_ssh_cli.rs @@ -0,0 +1,80 @@ +use std::process::Command; + +fn run_client(args: &[&str]) -> std::process::Output { + Command::new(env!("CARGO_BIN_EXE_rtr_debug_client")) + .args(args) + .output() + .expect("failed to run rtr_debug_client") +} + +#[test] +fn ssh_requires_user() { + let output = run_client(&[ + "--ssh", + "--ssh-key", + "tests/fixtures/ssh/client.key", + "--ssh-server-key", + "tests/fixtures/ssh/server.pub", + ]); + assert!(!output.status.success()); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("SSH mode requires --ssh-user ")); +} + +#[test] +fn ssh_requires_key() { + let output = run_client(&[ + "--ssh", + "--ssh-user", + "rpki-rtr", + "--ssh-server-key", + "tests/fixtures/ssh/server.pub", + ]); + assert!(!output.status.success()); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("SSH mode requires --ssh-key ")); +} + +#[test] +fn ssh_requires_host_key_verification() { + let output = run_client(&[ + "--ssh", + "--ssh-user", + "rpki-rtr", + "--ssh-key", + "tests/fixtures/ssh/client.key", + ]); + assert!(!output.status.success()); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + stderr.contains("SSH mode requires host key verification") + && stderr.contains("--ssh-known-hosts") + && stderr.contains("--ssh-server-key") + ); +} + +#[test] +fn ssh_rejects_multiple_host_key_verification_sources() { + let output = run_client(&[ + "--ssh", + "--ssh-user", + "rpki-rtr", + "--ssh-key", + "tests/fixtures/ssh/client.key", + "--ssh-known-hosts", + "tests/fixtures/ssh/known_hosts", + "--ssh-server-key", + "tests/fixtures/ssh/server.pub", + ]); + assert!(!output.status.success()); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("must choose one")); +} + +#[test] +fn ssh_conflicts_with_tls() { + let output = run_client(&["--ssh", "--tls"]); + assert!(!output.status.success()); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("--tls cannot be used together with --ssh")); +} diff --git a/tests/test_server_transports.rs b/tests/test_server_transports.rs new file mode 100644 index 0000000..174f064 --- /dev/null +++ b/tests/test_server_transports.rs @@ -0,0 +1,344 @@ +use std::fs; +use std::io::BufReader; +use std::net::SocketAddr; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, RwLock}; +use std::time::Duration; + +use rustls::{ClientConfig, RootCertStore}; +use rustls_pki_types::{CertificateDer, PrivateKeyDer, ServerName}; +use tokio::io::AsyncBufReadExt; +use tokio::net::TcpStream; +use tokio::time::{Instant, sleep}; +use tokio_rustls::TlsConnector; + +use rpki::rtr::cache::{RtrCacheBuilder, SessionIds, SharedRtrCache}; +use rpki::rtr::payload::Timing; +use rpki::rtr::pdu::{CacheResponse, EndOfDataV1, ResetQuery}; +use rpki::rtr::server::RtrService; +use russh::client; +use russh::keys; +use russh::keys::ssh_key::LineEnding; + +fn fixture_path(name: &str) -> PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")) + .join("tests") + .join("fixtures") + .join("tls") + .join(name) +} + +fn load_pem_certs(path: &Path) -> Vec> { + let file = fs::File::open(path).expect("open cert file"); + let mut reader = BufReader::new(file); + rustls_pemfile::certs(&mut reader) + .collect::, _>>() + .expect("parse certs") +} + +fn load_pem_key(path: &Path) -> PrivateKeyDer<'static> { + let file = fs::File::open(path).expect("open key file"); + let mut reader = BufReader::new(file); + rustls_pemfile::private_key(&mut reader) + .expect("read private key") + .expect("missing private key") +} + +fn test_cache() -> SharedRtrCache { + Arc::new(RwLock::new( + RtrCacheBuilder::new() + .session_ids(SessionIds::from_array([42, 42, 42])) + .serials([100, 100, 100]) + .timing(Timing::new(600, 600, 7200)) + .build(), + )) +} + +fn reserve_local_addr() -> SocketAddr { + let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind temp listener"); + listener.local_addr().expect("local addr") +} + +async fn wait_for_port(addr: SocketAddr) { + let deadline = Instant::now() + Duration::from_secs(2); + loop { + if TcpStream::connect(addr).await.is_ok() { + return; + } + assert!( + Instant::now() < deadline, + "port {} did not open in time", + addr + ); + sleep(Duration::from_millis(20)).await; + } +} + +async fn connect_tls_client(addr: SocketAddr) -> tokio_rustls::client::TlsStream { + let mut roots = RootCertStore::empty(); + for cert in load_pem_certs(&fixture_path("client-ca.crt")) { + roots.add(cert).expect("add root cert"); + } + + let certs = load_pem_certs(&fixture_path("client-good.crt")); + let key = load_pem_key(&fixture_path("client-good.key")); + let cfg = ClientConfig::builder() + .with_root_certificates(roots) + .with_client_auth_cert(certs, key) + .expect("build tls client auth"); + let connector = TlsConnector::from(Arc::new(cfg)); + + let tcp = TcpStream::connect(addr).await.expect("connect tls tcp"); + connector + .connect(ServerName::IpAddress(addr.ip().into()), tcp) + .await + .expect("tls connect") +} + +struct TestSshClientHandler; + +impl client::Handler for TestSshClientHandler { + type Error = anyhow::Error; + + async fn check_server_key( + &mut self, + _server_public_key: &russh::keys::ssh_key::PublicKey, + ) -> Result { + Ok(true) + } +} + +#[tokio::test] +async fn unified_server_tcp_handles_reset_query() { + let service = RtrService::new(test_cache()); + let tcp_addr = reserve_local_addr(); + + let running = service.spawn_tcp_only(tcp_addr); + wait_for_port(tcp_addr).await; + + let mut client = TcpStream::connect(tcp_addr).await.expect("connect tcp"); + ResetQuery::new(1) + .write(&mut client) + .await + .expect("send reset"); + + let response = CacheResponse::read(&mut client) + .await + .expect("read cache response"); + assert_eq!(response.version(), 1); + assert_eq!(response.session_id(), 42); + + let eod = EndOfDataV1::read(&mut client).await.expect("read eod"); + assert_eq!(eod.version(), 1); + assert_eq!(eod.session_id(), 42); + assert_eq!(eod.serial_number(), 100); + + running.shutdown(); + running.wait().await; +} + +#[tokio::test] +async fn unified_server_tls_handles_reset_query() { + let service = RtrService::new(test_cache()); + let tcp_addr = reserve_local_addr(); + let tls_addr = reserve_local_addr(); + + let running = service.spawn_tcp_and_tls_from_pem( + tcp_addr, + tls_addr, + fixture_path("server.crt"), + fixture_path("server.key"), + fixture_path("client-ca.crt"), + ); + wait_for_port(tls_addr).await; + + let mut client = connect_tls_client(tls_addr).await; + ResetQuery::new(1) + .write(&mut client) + .await + .expect("send reset tls"); + + let response = CacheResponse::read(&mut client) + .await + .expect("read tls cache response"); + assert_eq!(response.version(), 1); + assert_eq!(response.session_id(), 42); + + let eod = EndOfDataV1::read(&mut client).await.expect("read tls eod"); + assert_eq!(eod.version(), 1); + assert_eq!(eod.session_id(), 42); + assert_eq!(eod.serial_number(), 100); + + running.shutdown(); + running.wait().await; +} + +#[tokio::test] +async fn unified_server_ssh_opens_listener_and_emits_banner() { + let service = RtrService::new(test_cache()); + let tcp_addr = reserve_local_addr(); + let ssh_addr = reserve_local_addr(); + + let tmp = tempfile::tempdir().expect("tempdir"); + let host_key_path = tmp.path().join("ssh_host_ed25519_key"); + let authorized_keys_path = tmp.path().join("authorized_keys"); + + let host_key = + keys::PrivateKey::random(&mut rand::rng(), keys::Algorithm::Ed25519).expect("gen host key"); + let host_key_pem = host_key + .to_openssh(LineEnding::LF) + .expect("encode host key"); + fs::write(&host_key_path, host_key_pem).expect("write host key"); + + let pubkey_line = host_key.public_key().to_openssh().expect("encode pubkey"); + fs::write(&authorized_keys_path, format!("{pubkey_line}\n")).expect("write authorized_keys"); + + let running = service.spawn_tcp_and_ssh_from_openssh( + tcp_addr, + ssh_addr, + &host_key_path, + &authorized_keys_path, + "rpki-rtr", + "rpki-rtr", + None, + ); + wait_for_port(ssh_addr).await; + + let stream = TcpStream::connect(ssh_addr).await.expect("connect ssh"); + let mut reader = tokio::io::BufReader::new(stream); + let mut banner = String::new(); + reader + .read_line(&mut banner) + .await + .expect("read ssh banner"); + assert!( + banner.starts_with("SSH-2.0-"), + "unexpected ssh banner: {}", + banner + ); + + running.shutdown(); + running.wait().await; +} + +#[tokio::test] +async fn unified_server_ssh_accepts_password_when_configured() { + let service = RtrService::new(test_cache()); + let tcp_addr = reserve_local_addr(); + let ssh_addr = reserve_local_addr(); + + let tmp = tempfile::tempdir().expect("tempdir"); + let host_key_path = tmp.path().join("ssh_host_ed25519_key"); + let authorized_keys_path = tmp.path().join("authorized_keys"); + + let host_key = + keys::PrivateKey::random(&mut rand::rng(), keys::Algorithm::Ed25519).expect("gen host key"); + let host_key_pem = host_key + .to_openssh(LineEnding::LF) + .expect("encode host key"); + fs::write(&host_key_path, host_key_pem).expect("write host key"); + + let pubkey_line = host_key.public_key().to_openssh().expect("encode pubkey"); + fs::write(&authorized_keys_path, format!("{pubkey_line}\n")).expect("write authorized_keys"); + + let running = service.spawn_tcp_and_ssh_from_openssh( + tcp_addr, + ssh_addr, + &host_key_path, + &authorized_keys_path, + "rpki-rtr", + "rpki-rtr", + Some("test-password"), + ); + wait_for_port(ssh_addr).await; + + let mut session = client::connect( + Arc::new(client::Config::default()), + ssh_addr, + TestSshClientHandler, + ) + .await + .expect("connect ssh client"); + let auth_result = session + .authenticate_password("rpki-rtr", "test-password") + .await + .expect("password auth result"); + assert!(auth_result.success(), "password auth should succeed"); + + let channel = session + .channel_open_session() + .await + .expect("open session channel"); + channel + .request_subsystem(true, "rpki-rtr") + .await + .expect("request subsystem"); + let mut stream = channel.into_stream(); + + ResetQuery::new(1) + .write(&mut stream) + .await + .expect("send reset over ssh subsystem"); + let response = CacheResponse::read(&mut stream) + .await + .expect("read cache response over ssh subsystem"); + assert_eq!(response.version(), 1); + assert_eq!(response.session_id(), 42); + let eod = EndOfDataV1::read(&mut stream) + .await + .expect("read eod over ssh subsystem"); + assert_eq!(eod.version(), 1); + assert_eq!(eod.session_id(), 42); + assert_eq!(eod.serial_number(), 100); + + running.shutdown(); + running.wait().await; +} + +#[tokio::test] +async fn unified_server_ssh_rejects_password_when_not_configured() { + let service = RtrService::new(test_cache()); + let tcp_addr = reserve_local_addr(); + let ssh_addr = reserve_local_addr(); + + let tmp = tempfile::tempdir().expect("tempdir"); + let host_key_path = tmp.path().join("ssh_host_ed25519_key"); + let authorized_keys_path = tmp.path().join("authorized_keys"); + + let host_key = + keys::PrivateKey::random(&mut rand::rng(), keys::Algorithm::Ed25519).expect("gen host key"); + let host_key_pem = host_key + .to_openssh(LineEnding::LF) + .expect("encode host key"); + fs::write(&host_key_path, host_key_pem).expect("write host key"); + + let pubkey_line = host_key.public_key().to_openssh().expect("encode pubkey"); + fs::write(&authorized_keys_path, format!("{pubkey_line}\n")).expect("write authorized_keys"); + + let running = service.spawn_tcp_and_ssh_from_openssh( + tcp_addr, + ssh_addr, + &host_key_path, + &authorized_keys_path, + "rpki-rtr", + "rpki-rtr", + None, + ); + wait_for_port(ssh_addr).await; + + let mut session = client::connect( + Arc::new(client::Config::default()), + ssh_addr, + TestSshClientHandler, + ) + .await + .expect("connect ssh client"); + let auth_result = session + .authenticate_password("rpki-rtr", "test-password") + .await + .expect("password auth result"); + assert!(!auth_result.success(), "password auth should be rejected"); + + running.shutdown(); + running.wait().await; +}