diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..6eef9bf
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,10 @@
+.git
+.gitignore
+.idea
+target
+tmp_slurm_output.json
+rtr-db
+tests
+specs
+scripts
+README.md
diff --git a/.gitignore b/.gitignore
index 6e61639..1d9b68b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
target/
Cargo.lock
-rtr-db/
\ No newline at end of file
+rtr-db/
+.idea/
diff --git a/Cargo.toml b/Cargo.toml
index 85d3734..f706324 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ rand = "0.10.0"
rocksdb = { version = "0.21.0", default-features = false }
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
+base64 = "0.22"
anyhow = "1"
tracing = "0.1.44"
sha2 = "0.10"
diff --git a/data/20260324T000037Z-sng1.ccr b/data/20260324T000037Z-sng1.ccr
new file mode 100644
index 0000000..128af6d
Binary files /dev/null and b/data/20260324T000037Z-sng1.ccr differ
diff --git a/data/20260324T000138Z-zur1.ccr b/data/20260324T000138Z-zur1.ccr
new file mode 100644
index 0000000..3040cc1
Binary files /dev/null and b/data/20260324T000138Z-zur1.ccr differ
diff --git a/data/example.slurm b/data/example.slurm
new file mode 100644
index 0000000..0a712ee
--- /dev/null
+++ b/data/example.slurm
@@ -0,0 +1,23 @@
+{
+ "slurmVersion": 2,
+ "validationOutputFilters": {
+ "prefixFilters": [
+ {
+ "prefix": "24.0.0.0/8",
+ "comment": "Filter many VRPs in current CCR sample"
+ }
+ ],
+ "bgpsecFilters": [],
+ "aspaFilters": [
+ {
+ "customerAsn": 80,
+ "comment": "Filter one ASPA known to exist in current CCR sample"
+ }
+ ]
+ },
+ "locallyAddedAssertions": {
+ "prefixAssertions": [],
+ "bgpsecAssertions": [],
+ "aspaAssertions": []
+ }
+}
diff --git a/deploy/DEPLOYMENT.md b/deploy/DEPLOYMENT.md
new file mode 100644
index 0000000..a5bb9cb
--- /dev/null
+++ b/deploy/DEPLOYMENT.md
@@ -0,0 +1,40 @@
+# Deployment (Supervisor + Docker Compose)
+
+This project runs `src/main.rs` as a long-running server that:
+
+1. loads latest `.ccr` from a configured directory,
+2. applies optional SLURM filtering,
+3. starts RTR server.
+
+`supervisord` is used as PID 1 in container to keep the process managed and auto-restarted.
+
+## Files
+
+- `deploy/Dockerfile`
+- `deploy/supervisord.conf`
+- `deploy/docker-compose.yml`
+
+## Runtime Paths in Container
+
+- CCR directory: `/app/data`
+- RocksDB directory: `/app/rtr-db`
+- SLURM directory: `/app/slurm`
+- TLS cert directory (optional): `/app/certs`
+
+## Start
+
+```bash
+docker compose -f deploy/docker-compose.yml up -d --build
+```
+
+## Stop
+
+```bash
+docker compose -f deploy/docker-compose.yml down
+```
+
+## Logs
+
+```bash
+docker compose -f deploy/docker-compose.yml logs -f rpki-rtr
+```
diff --git a/deploy/Dockerfile b/deploy/Dockerfile
new file mode 100644
index 0000000..fd19891
--- /dev/null
+++ b/deploy/Dockerfile
@@ -0,0 +1,34 @@
+FROM rust:1.86-bookworm AS builder
+
+WORKDIR /build
+
+COPY Cargo.toml Cargo.lock ./
+COPY src ./src
+
+RUN cargo build --release --bin rpki
+
+FROM debian:bookworm-slim AS runtime
+
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends ca-certificates supervisor \
+ && rm -rf /var/lib/apt/lists/*
+
+WORKDIR /app
+
+COPY --from=builder /build/target/release/rpki /usr/local/bin/rpki
+COPY deploy/supervisord.conf /etc/supervisor/conf.d/rpki-rtr.conf
+
+RUN mkdir -p /app/data /app/rtr-db /app/certs /app/slurm /var/log/supervisor
+
+ENV RPKI_RTR_ENABLE_TLS=false \
+ RPKI_RTR_TCP_ADDR=0.0.0.0:323 \
+ RPKI_RTR_TLS_ADDR=0.0.0.0:324 \
+ RPKI_RTR_DB_PATH=/app/rtr-db \
+ RPKI_RTR_CCR_DIR=/app/data \
+ RPKI_RTR_SLURM_DIR=/app/slurm \
+ RPKI_RTR_REFRESH_INTERVAL_SECS=300 \
+ RPKI_RTR_STRICT_CCR_VALIDATION=false
+
+EXPOSE 323 324
+
+CMD ["supervisord", "-n", "-c", "/etc/supervisor/conf.d/rpki-rtr.conf"]
diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml
new file mode 100644
index 0000000..0dcf0e0
--- /dev/null
+++ b/deploy/docker-compose.yml
@@ -0,0 +1,28 @@
+version: "3.9"
+
+services:
+ rpki-rtr:
+ build:
+ context: ..
+ dockerfile: deploy/Dockerfile
+ image: rpki-rtr:latest
+ container_name: rpki-rtr
+ restart: unless-stopped
+ ports:
+ - "323:323"
+ - "324:324"
+ environment:
+ RPKI_RTR_ENABLE_TLS: "false"
+ RPKI_RTR_TCP_ADDR: "0.0.0.0:323"
+ RPKI_RTR_TLS_ADDR: "0.0.0.0:324"
+ RPKI_RTR_DB_PATH: "/app/rtr-db"
+ RPKI_RTR_CCR_DIR: "/app/data"
+ RPKI_RTR_SLURM_DIR: "/app/slurm"
+ RPKI_RTR_STRICT_CCR_VALIDATION: "false"
+ RPKI_RTR_REFRESH_INTERVAL_SECS: "300"
+ volumes:
+ - ../data:/app/data:ro
+ - ../rtr-db:/app/rtr-db
+ - ../data:/app/slurm:ro
+ # TLS mode example:
+ # - ../certs:/app/certs:ro
diff --git a/deploy/supervisord.conf b/deploy/supervisord.conf
new file mode 100644
index 0000000..f35bc5b
--- /dev/null
+++ b/deploy/supervisord.conf
@@ -0,0 +1,18 @@
+[supervisord]
+nodaemon=true
+logfile=/dev/null
+pidfile=/tmp/supervisord.pid
+
+[program:rpki-rtr]
+command=/usr/local/bin/rpki
+autostart=true
+autorestart=true
+startsecs=2
+startretries=3
+stopsignal=TERM
+stopasgroup=true
+killasgroup=true
+stdout_logfile=/dev/fd/1
+stdout_logfile_maxbytes=0
+stderr_logfile=/dev/fd/2
+stderr_logfile_maxbytes=0
diff --git a/specs/10_slurm.md b/specs/10_slurm.md
index 698ce2a..8a2edab 100644
--- a/specs/10_slurm.md
+++ b/specs/10_slurm.md
@@ -1,34 +1,23 @@
-# 10. SLURM(Simplified Local Internet Number Resource Management with the RPKI)
+# 10. SLURM(Simplified Local Internet Number Resource Management with the RPKI)
-## 10.1 对象定位
+## 10.1 目标与范围
-SLURM是一个JSON文件,允许 RPKI 依赖方在本地“覆盖/修正/忽略”来自上游RPKI数据的内容,而不需要修改或伪造原始RPKI对象。
+SLURM 用于让 RP(Relying Party)在本地对上游 RPKI 验证结果做“过滤”和“补充断言”,而不修改上游发布对象。
-## 10.2 数据格式 (RFC 8416 §3)
+本文档基于:
+- RFC 8416(SLURM v1,ROA/BGPsec)
+- draft-ietf-sidrops-aspa-slurm-04(SLURM v2,新增 ASPA)
-### SLURM
+## 10.2 版本与顶层结构
-SLURM是一个只包含一个JSON对象的文件。格式要求如下(RFC 8416 §3.2):
+### 10.2.1 SLURM v1(RFC 8416)
-```text
-A SLURM file consists of a single JSON object containing the
-following members:
- o A "slurmVersion" member that MUST be set to 1, encoded as a number
- o A "validationOutputFilters" member (Section 3.3), whose value is
- an object. The object MUST contain exactly two members:
- * A "prefixFilters" member, whose value is described in
- Section 3.3.1.
- * A "bgpsecFilters" member, whose value is described in
- Section 3.3.2.
- o A "locallyAddedAssertions" member (Section 3.4), whose value is an
- object. The object MUST contain exactly two members:
- * A "prefixAssertions" member, whose value is described in
- Section 3.4.1.
- * A "bgpsecAssertions" member, whose value is described in
- Section 3.4.2.
-```
+`slurmVersion` 必须为 `1`,且顶层 JSON 对象必须包含且仅包含以下成员:
+- `slurmVersion`
+- `validationOutputFilters`(必须包含 `prefixFilters`、`bgpsecFilters`)
+- `locallyAddedAssertions`(必须包含 `prefixAssertions`、`bgpsecAssertions`)
-一个空的SLURM json结构体如下:
+空策略示例:
```json
{
@@ -44,193 +33,154 @@ following members:
}
```
-### prefixFilters
-其中`prefixFilters`格式要求如下(RFC 8416 §3.3.1):
+### 10.2.2 SLURM v2(draft-04)
+
+`slurmVersion` 必须为 `2`,在 v1 基础上扩展 ASPA 两类成员:
+- `validationOutputFilters.aspaFilters`
+- `locallyAddedAssertions.aspaAssertions`
+
+空策略示例:
-```text
-The above is expressed as a value of the "prefixFilters" member, as
-an array of zero or more objects. Each object MUST contain either 1)
-one of the following members or 2) one of each of the following
-members.
- o A "prefix" member, whose value is a string representing either an
- IPv4 prefix (see Section 3.1 of [RFC4632]) or an IPv6 prefix (see
- [RFC5952]).
- o An "asn" member, whose value is a number.
- In addition, each object MAY contain one optional "comment" member,
- whose value is a string.
-```
-示例:
```json
-"prefixFilters": [
- {
- "prefix": "192.0.2.0/24",
- "comment": "All VRPs encompassed by prefix"
- },
- {
- "asn": 64496,
- "comment": "All VRPs matching ASN"
- },
- {
- "prefix": "198.51.100.0/24",
- "asn": 64497,
- "comment": "All VRPs encompassed by prefix, matching ASN"
+{
+ "slurmVersion": 2,
+ "validationOutputFilters": {
+ "prefixFilters": [],
+ "bgpsecFilters": [],
+ "aspaFilters": []
+ },
+ "locallyAddedAssertions": {
+ "prefixAssertions": [],
+ "bgpsecAssertions": [],
+ "aspaAssertions": []
}
-]
+}
```
-### bgpsecFilters
-`bgpsecFilters`格式要求如下(RFC 8416 §3.3.2)
+## 10.3 字段规范(RFC 8416)
-```text
-The above is expressed as a value of the "bgpsecFilters" member, as
-an array of zero or more objects. Each object MUST contain one of
-either, or one each of both following members:
- o An "asn" member, whose value is a number
- o An "SKI" member, whose value is the Base64 encoding without
- trailing ’=’ (Section 5 of [RFC4648]) of the certificate’s Subject
- Key Identifier as described in Section 4.8.2 of [RFC6487]. (This
- is the value of the ASN.1 OCTET STRING without the ASN.1 tag or
- length fields.)
-In addition, each object MAY contain one optional "comment" member,
-whose value is a string.
-```
+### 10.3.1 `prefixFilters`
+
+数组元素每项:
+- 必须至少包含一个:`prefix` 或 `asn`
+- 可选:`comment`
+
+匹配规则:
+- 若配置了 `prefix`:匹配“被该前缀覆盖(encompassed)”的 VRP 前缀
+- 若配置了 `asn`:匹配该 ASN
+- 同时配置时:两者都要匹配
+
+### 10.3.2 `bgpsecFilters`
+
+数组元素每项:
+- 必须至少包含一个:`asn` 或 `SKI`
+- 可选:`comment`
+
+匹配规则:
+- 按 `asn`/`SKI` 单独或联合匹配 Router Key(BGPsec)
+
+### 10.3.3 `prefixAssertions`
+
+数组元素每项:
+- 必须:`prefix`、`asn`
+- 可选:`maxPrefixLength`、`comment`
+
+约束:
+- 若给出 `maxPrefixLength`,应满足 `prefix 长度 <= maxPrefixLength <= 地址位宽(IPv4=32, IPv6=128)`
+
+### 10.3.4 `bgpsecAssertions`
+
+数组元素每项:
+- 必须:`asn`、`SKI`、`routerPublicKey`
+- 可选:`comment`
+
+## 10.4 ASPA 扩展(draft-ietf-sidrops-aspa-slurm-04)
+
+### 10.4.1 `aspaFilters`
+
+数组元素每项:
+- 必须:`customerAsn`
+- 可选:`comment`
+
+匹配规则:
+- 当 VAP(Validated ASPA Payload)的 `customerAsn` 等于过滤器 `customerAsn` 时命中并移除。
+
+### 10.4.2 `aspaAssertions`
+
+数组元素每项:
+- 必须:`customerAsn`
+- 必须:`providerAsns`(ASN 数组)
+- 可选:`comment`
+
+关键约束(draft-04):
+- `customerAsn` 不得出现在 `providerAsns` 中
+- `providerAsns` 必须按升序排列
+- `providerAsns` 里的 ASN 必须唯一(无重复)
+
+语义补充(draft-04):
+- `aspaAssertions` 仅用于“新增断言”,不构成隐式过滤(不会自动替代 `aspaFilters`)。
+- 在 RTRv2 输出阶段,新增的 ASPA 断言应加入 ASPA PDU 集合,并做去重。
+
+## 10.5 应用语义(RFC 8416 Section 4)
+
+### 10.5.1 原子性
+
+SLURM 应用必须是原子的:
+- 要么完全不生效(等同未使用 SLURM)
+- 要么完整按当前 SLURM 配置生效
+
+### 10.5.2 处理顺序
+
+在同一次计算中:
+1. 先执行 `validationOutputFilters`(移除匹配验证结果)
+2. 再追加 `locallyAddedAssertions`
+
+### 10.5.3 多文件
+
+实现可以支持多个 SLURM 文件并行使用(取并集),但在启用前应检查断言重叠冲突;若存在冲突,整组文件应被拒绝。
+
+## 10.6 最小可用示例(SLURM v2)
-示例:
```json
-"bgpsecFilters": [
- {
- "asn": 64496,
- "comment": "All keys for ASN"
- },
- {
- "SKI": "",
- "comment": "Key matching Router SKI"
- },
- {
- "asn": 64497,
- "SKI": "",
- "comment": "Key for ASN 64497 matching Router SKI"
- }
-]
+{
+ "slurmVersion": 2,
+ "validationOutputFilters": {
+ "prefixFilters": [
+ {
+ "prefix": "203.0.113.0/24",
+ "comment": "Filter a broken VRP from upstream"
+ }
+ ],
+ "bgpsecFilters": [],
+ "aspaFilters": [
+ {
+ "customerAsn": 64496,
+ "comment": "Filter one customer ASPA"
+ }
+ ]
+ },
+ "locallyAddedAssertions": {
+ "prefixAssertions": [
+ {
+ "asn": 64496,
+ "prefix": "203.0.113.0/24",
+ "maxPrefixLength": 24,
+ "comment": "Local business exception"
+ }
+ ],
+ "bgpsecAssertions": [],
+ "aspaAssertions": [
+ {
+ "customerAsn": 64496,
+ "providerAsns": [64497, 64498],
+ "comment": "Local ASPA assertion"
+ }
+ ]
+ }
+}
```
-### prefixAssertions
-`prefixAssertions`格式要求如下(RFC 8416 §3.4.1)
-```text
-The above is expressed as a value of the "prefixAssertions" member,
-as an array of zero or more objects. Each object MUST contain one of
-each of the following members:
- o A "prefix" member, whose value is a string representing either an
- IPv4 prefix (see Section 3.1 of [RFC4632]) or an IPv6 prefix (see
- [RFC5952]).
- o An "asn" member, whose value is a number.
-In addition, each object MAY contain one of each of the following
-members:
- o A "maxPrefixLength" member, whose value is a number.
- o A "comment" member, whose value is a string.
-```
-
-示例:
-```json
-"prefixAssertions": [
- {
- "asn": 64496,
- "prefix": "198.51.100.0/24",
- "comment": "My other important route"
- },
- {
- "asn": 64496,
- "prefix": "2001:DB8::/32",
- "maxPrefixLength": 48,
- "comment": "My other important de-aggregated routes"
- }
-]
-```
-
-### bgpsecAssertions
-`bgpsecAssertions`格式要求如下(RFC 8416 §3.4.2)
-```text
-The above is expressed as a value of the "bgpsecAssertions" member,
-as an array of zero or more objects. Each object MUST contain one
-each of all of the following members:
- o An "asn" member, whose value is a number.
- o An "SKI" member, whose value is the Base64 encoding without
- trailing ’=’ (Section 5 of [RFC4648]) of the certificate’s Subject
- Key Identifier as described in Section 4.8.2 of [RFC6487] (This is
- the value of the ASN.1 OCTET STRING without the ASN.1 tag or
- length fields.)
- o A "routerPublicKey" member, whose value is the Base64 encoding
- without trailing ’=’ (Section 5 of [RFC4648]) of the equivalent to
- the subjectPublicKeyInfo value of the router certificate’s public
- key, as described in [RFC8208]. This is the full ASN.1 DER
- encoding of the subjectPublicKeyInfo, including the ASN.1 tag and
- length values of the subjectPublicKeyInfo SEQUENCE.
-```
-示例:
-```json
-"bgpsecAssertions": [
- {
- "asn": 64496,
- "SKI": "",
- "routerPublicKey": "",
- "comment": "My known key for my important ASN"
- }
-]
-```
-
-## 10.3 抽象数据结构
-
-### SLURM
-| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
-|---------------------------|------------------------|---------|---------|---------------|
-| slurm_version | number | SLURM版本 | 版本必须为1 | RFC 8416 §3.2 |
-| validation_output_filters | ValidationOutputFilter | 过滤条件 | | |
-| locally_added_assertions | LocallyAddedAssertions | 本地添加断言 | | |
-
-### ValidationOutputFilter
-| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
-|----------------|-------------------|-----------|---------|---------------|
-| prefix_filters | Vec | 前缀过滤 | 可以为空数组 | RFC 8416 §3.3 |
-| bgpsec_filters | Vec | BGPsec过滤 | 可以为空数组 | RFC 8416 §3.3 |
-
-### LocallyAddedAssertions
-| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
-|-------------------|----------------------|-----------|---------|---------------|
-| prefix_assertions | Vec | 前缀断言 | 可以为空数组 | RFC 8416 §3.4 |
-| bgpsec_assertions | Vec | BGPsec断言 | 可以为空数组 | RFC 8416 §3.4 |
-
-### PrefixFilter
-| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
-|---------|--------|------|--------------------------------|-----------------|
-| prefix | string | 前缀 | IPv4前缀或IPv6前缀,prefix和asn至少存在一个 | RFC 8416 §3.3.1 |
-| asn | number | ASN | prefix和asn至少存在一个 | RFC 8416 §3.3.1 |
-| comment | string | 备注说明 | 可选字段 | RFC 8416 §3.3.1 |
-
-### BgpsecFilter
-| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
-|---------|--------|------|------------------|------------------|
-| asn | number | ASN | prefix和asn至少存在一个 | RFC 8416 §3.3.1 |
-| ski | u8 | | 证书的SKI | RFC 8416 §3.3.1 |
-| comment | string | 备注说明 | 可选字段 | RFC 8416 §3.3.1 |
-
-### PrefixAssertion
-| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
-|-------------------|--------|--------|---------------|-----------------|
-| prefix | string | 前缀 | IPv4前缀或IPv6前缀 | RFC 8416 §3.4.1 |
-| asn | number | ASN | | RFC 8416 §3.4.1 |
-| max_prefix_length | number | 最大前缀长度 | 可选字段 | RFC 8416 §3.4.1 |
-| comment | string | 备注说明 | 可选字段 | RFC 8416 §3.4.1 |
-
-
-### BgpsecAssertion
-| 字段 | 类型 | 语义 | 约束/解析规则 | RFC 引用 |
-|-------------------|--------|--------|------------------|-----------------|
-| asn | number | ASN | prefix和asn至少存在一个 | RFC 8416 §3.4.2 |
-| ski | u8 | | 证书的SKI | RFC 8416 §3.4.2 |
-| router_public_key | u8 | 证书的SKI | | RFC 8416 §3.4.2 |
-| comment | string | 备注说明 | 可选字段 | RFC 8416 §3.4.2 |
-
-> 注:BGPsec部分可以在第一版考虑先留空
-
-## 10.4 规则
+## 10.7 参考文献
+- RFC 8416: https://www.rfc-editor.org/rfc/rfc8416.html
+- draft-ietf-sidrops-aspa-slurm-04: https://www.ietf.org/archive/id/draft-ietf-sidrops-aspa-slurm-04.html
diff --git a/specs/11_rtr.md b/specs/11_rtr.md
index 8a6051a..9bb3eff 100644
--- a/specs/11_rtr.md
+++ b/specs/11_rtr.md
@@ -1,65 +1,158 @@
-# 11. RTR (The Resource Public Key Infrastructure (RPKI) to Router Protocol)
+# 11. RTR(RPKI to Router Protocol)
-## 11.1 Cache Server
+## 11.1 目标与文档范围
-### 11.1.1 功能需求
+RTR 用于把 RP/Cache 已完成密码学验证的 RPKI 数据下发给路由器。
-- 支持Full Sync(Reset Query)
-- 支持Incremental Sync(Serial Query)
-- 支持多客户端并发
-- 支持Serial递增
-- 保留一定数量的delta
-- 支持原子更新
+本文按以下规范整理:
+- RFC 6810(RTR v0)
+- RFC 8210(RTR v1,更新 RFC 6810)
+- draft-ietf-sidrops-8210bis-25(RTR v2,草案)
-### 11.1.2 架构设计
-采用一级缓存+二级缓存并存的方式。
+## 11.2 协议演进
-
+### 11.2.1 RFC 6810(v0)
-其中,一级缓存为运行时缓存,主要职责:
-- 存储当前完整的snapshot
-- 历史Delta队列管理
-- Serial管理
-- RTR查询响应
+- 只定义 Prefix Origin 相关 payload(IPv4/IPv6 Prefix PDU)。
+- 主要 PDU:Serial Notify / Serial Query / Reset Query / Cache Response / Prefix / End of Data / Cache Reset / Error Report。
-二级缓存为持久化缓存,主要职责:
-- snapshot持久化
-- 缓存重启后的快速恢复(snapshot和serial)
-- 不参与实时查询
-- 异步写入
+### 11.2.2 RFC 8210(v1)
-### 11.1.3 核心数据结构设计
+在 v0 基础上新增/强化:
+- 新增 `Router Key PDU`(PDU Type 9,v1 可用,v0 保留)。
+- 强化协议版本协商与降级行为。
+- `End of Data` 在 v1 中携带 `Refresh/Retry/Expire` 三个计时参数。
-#### 11.1.3.1 总cache
-```rust
-struct RtrCache {
- serial: AtomicU32,
- snapshot: ArcSwap,
- deltas: RwLock>>,
- max_delta: usize,
-}
-```
+### 11.2.3 Version 2(草案)
-#### 11.1.3.2 Snapshot
-```rust
-struct Snapshot {
- origins: Vec,
- router_keys: Vec,
- aspas: Vec,
- created_at: Instant,
-}
-```
+在 v1 基础上新增/强化:
+- 新增 `ASPA PDU`(PDU Type 11,仅 v2)。
+- 新增 “Races, Ordering, and Transactions” 章节,要求缓存按规定顺序输出 payload 以降低路由器短暂误判。
+- 协议版本提升到 `2`。
+- 明确 PDU 最大长度上限为 64k(65535)。
-#### 11.1.3.3 Delta
-```rust
-struct Delta {
- serial: u32,
- announced: Vec,
- withdrawn: Vec,
-}
-```
+## 11.3 PDU 与版本矩阵
+PDU 类型(按规范注册表):
-## 11.2 Transport
+| PDU Type | 名称 | v0 (RFC6810) | v1 (RFC8210) | v2 (8210bis-25) |
+|---|---|---|---|---|
+| 0 | Serial Notify | 支持 | 支持 | 支持 |
+| 1 | Serial Query | 支持 | 支持 | 支持 |
+| 2 | Reset Query | 支持 | 支持 | 支持 |
+| 3 | Cache Response | 支持 | 支持 | 支持 |
+| 4 | IPv4 Prefix | 支持 | 支持 | 支持 |
+| 6 | IPv6 Prefix | 支持 | 支持 | 支持 |
+| 7 | End of Data | 支持 | 支持(含计时参数) | 支持 |
+| 8 | Cache Reset | 支持 | 支持 | 支持 |
+| 9 | Router Key | 保留 | 支持 | 支持 |
+| 10 | Error Report | 支持 | 支持 | 支持 |
+| 11 | ASPA | 保留 | 保留 | 支持 |
-初版实现RTR over TLS(可外网)和RTR over TCP(内网)两种方式。
\ No newline at end of file
+通用字段约束:
+- `Protocol Version`:8-bit。
+- `PDU Type`:8-bit。
+- `Session ID`:16-bit。
+- `Length`:32-bit。
+- 保留位(zero/reserved)发送必须为 0,接收时按规范处理。
+
+## 11.4 关键 PDU 语义
+
+### 11.4.1 Serial Notify(Type 0)
+
+- 由 Cache 主动发送,提示有新序列可拉取。
+- 是少数可不由 Router 请求触发的消息。
+
+### 11.4.2 Reset Query(Type 2)与 Cache Response(Type 3)
+
+- Router 启动或失配时发 `Reset Query` 请求全量。
+- Cache 回复 `Cache Response`,随后发送全量 payload,最后 `End of Data`。
+
+### 11.4.3 Serial Query(Type 1)
+
+- Router 持有上次 `Session ID + Serial` 时请求增量。
+- Cache 若可提供增量:返回变化集。
+- Cache 若无法从该 serial 补增量:返回 `Cache Reset`,要求 Router 走全量。
+
+### 11.4.4 Prefix / Router Key / ASPA payload
+
+- `IPv4 Prefix`(Type 4)/ `IPv6 Prefix`(Type 6):表示 VRP 的 announce/withdraw。
+- `Router Key`(Type 9,v1+):表示 BGPsec Router Key 的 announce/withdraw。
+- `ASPA`(Type 11,v2 草案):表示 ASPA 数据单元的 announce/withdraw。
+
+语义要点(v1 / v2 草案):
+- 对同一 payload 键(如 Prefix 四元组、Router Key 三元组、ASPA customer 键)应维护清晰的替换/撤销关系。
+- Cache 负责把历史变化“合并简化”后再发给 Router,避免无意义抖动。
+
+### 11.4.5 End of Data(Type 7)
+
+- 标识一次响应结束,并给出当前 serial。
+- v0:不含定时器字段。
+- v1/v2:携带 `Refresh Interval`、`Retry Interval`、`Expire Interval`。
+
+## 11.5 协议时序
+
+### 11.5.1 初始同步(Full Sync)
+
+1. Router 建连后发 `Reset Query`(带支持的协议版本)。
+2. Cache 回 `Cache Response`。
+3. Cache 按规范发送 payload 集合。
+4. Cache 发 `End of Data` 收尾。
+
+### 11.5.2 增量同步(Incremental Sync)
+
+1. Router 发 `Serial Query(session_id, serial)`。
+2. Cache 若可增量,返回变化并以 `End of Data` 收尾。
+3. 若不可增量,返回 `Cache Reset`;Router 退回 Full Sync。
+
+## 11.6 版本协商与降级
+
+- Router 每次新连接必须由 `Reset Query` 或 `Serial Query` 启动,携带其协议版本。
+- 双方在协商完成后,本连接内版本固定。
+- 遇到不支持版本时,可按规范降级(例如 v1 对 v0、v2 对 v1/v0)或返回 `Unsupported Protocol Version` 后断开。
+- 协商期若收到 `Serial Notify`,Router 应按规范兼容处理(通常忽略,待协商完成)。
+
+## 11.7 计时器与失效(v1/v2)
+
+`End of Data` 下发三个参数:
+- `Refresh Interval`:多久后主动刷新。
+- `Retry Interval`:失败后重试间隔。
+- `Expire Interval`:本地数据最长可保留时长。
+
+规范边界(RFC 8210):
+- Refresh: 1 .. 86400(推荐 3600)
+- Retry: 1 .. 7200(推荐 600)
+- Expire: 600 .. 172800(推荐 7200)
+- 且 `Expire` 必须大于 `Refresh` 和 `Retry`。
+
+## 11.8 Version 2(草案)新增关注点
+
+### 11.8.1 ASPA PDU
+
+- 新增 ASPA 传输能力(Type 11)。
+- 针对同一 customer ASN,Cache 需向 Router 提供一致且可替换的 ASPA 视图。
+
+### 11.8.2 排序与事务
+
+- 草案新增 race 条件说明(如前缀替换、撤销先后导致短暂误判)。
+- 对 Cache 输出 payload 的顺序提出约束。
+- 建议 Router 使用“事务式应用”(例如接收到完整响应后再切换生效)降低中间态影响。
+
+## 11.9 传输与安全
+
+规范定义可承载于多种传输:
+- SSH
+- TLS
+- TCP MD5
+- TCP-AO
+
+安全原则:
+- Router 与 Cache 之间必须建立可信关系。
+- 需要完整性/机密性时优先使用具备认证与加密能力的传输。
+- 若使用普通 TCP,部署上应限制在可信受控网络中。
+
+## 11.10 参考文献
+
+- RFC 6810: https://www.rfc-editor.org/rfc/rfc6810.html
+- RFC 8210: https://www.rfc-editor.org/rfc/rfc8210.html
+- draft-ietf-sidrops-8210bis-25: https://www.ietf.org/archive/id/draft-ietf-sidrops-8210bis-25.html
diff --git a/src/bin/rtr_debug_client/main.rs b/src/bin/rtr_debug_client/main.rs
index d7f03ce..dffc315 100644
--- a/src/bin/rtr_debug_client/main.rs
+++ b/src/bin/rtr_debug_client/main.rs
@@ -7,18 +7,16 @@ use rustls::{ClientConfig as RustlsClientConfig, RootCertStore};
use rustls_pki_types::{CertificateDer, PrivateKeyDer, ServerName};
use tokio::io::{self as tokio_io, AsyncBufReadExt, AsyncRead, AsyncWrite, BufReader, WriteHalf};
use tokio::net::TcpStream;
-use tokio::time::{timeout, Duration, Instant};
+use tokio::time::{Duration, Instant, timeout};
use tokio_rustls::TlsConnector;
-mod wire;
mod pretty;
mod protocol;
+mod wire;
-use crate::wire::{read_pdu, send_reset_query, send_serial_query};
-use crate::pretty::{
- parse_end_of_data_info, parse_serial_notify_serial, print_pdu, print_raw_pdu,
-};
+use crate::pretty::{parse_end_of_data_info, parse_serial_notify_serial, print_pdu, print_raw_pdu};
use crate::protocol::{PduHeader, PduType, QueryMode};
+use crate::wire::{read_pdu, send_reset_query, send_serial_query};
const DEFAULT_READ_TIMEOUT_SECS: u64 = 30;
const DEFAULT_POLL_INTERVAL_SECS: u64 = 600;
@@ -38,7 +36,10 @@ async fn main() -> io::Result<()> {
println!("transport: {}", config.transport.describe());
println!("version : {}", config.version);
println!("timeout : {}s", config.read_timeout_secs);
- println!("poll : {}s (default before EndOfData refresh is known)", config.default_poll_secs);
+ println!(
+ "poll : {}s (default before EndOfData refresh is known)",
+ config.default_poll_secs
+ );
println!("keep-after-error: {}", config.keep_after_error);
match &config.mode {
QueryMode::Reset => {
@@ -72,11 +73,7 @@ async fn main() -> io::Result<()> {
}
Err(err) => {
let delay = state.reconnect_delay_secs();
- eprintln!(
- "connect failed: {}. retry after {}s",
- err,
- delay
- );
+ eprintln!("connect failed: {}. retry after {}s", err, delay);
tokio::time::sleep(Duration::from_secs(delay)).await;
}
}
@@ -171,10 +168,7 @@ async fn main() -> io::Result<()> {
if reconnect {
let delay = state.reconnect_delay_secs();
state.current_session_id = None;
- println!(
- "[reconnect] transport disconnected, retry after {}s",
- delay
- );
+ println!("[reconnect] transport disconnected, retry after {}s", delay);
tokio::time::sleep(Duration::from_secs(delay)).await;
}
}
@@ -189,8 +183,7 @@ async fn send_resume_query(
(Some(session_id), Some(serial)) => {
println!(
"reconnected, send Serial Query with session_id={}, serial={}",
- session_id,
- serial
+ session_id, serial
);
send_serial_query(writer, state.version, session_id, serial).await?;
}
@@ -294,26 +287,20 @@ async fn handle_incoming_pdu(
println!();
println!(
"[notify] received Serial Notify: session_id={}, notify_serial={:?}",
- notify_session_id,
- notify_serial
+ notify_session_id, notify_serial
);
match (state.session_id, state.serial, notify_serial) {
(Some(current_session_id), Some(current_serial), Some(_new_serial))
- if current_session_id == notify_session_id =>
- {
- println!(
- "received Serial Notify for current session {}, send Serial Query with serial {}",
- current_session_id, current_serial
- );
- send_serial_query(
- writer,
- state.version,
- current_session_id,
- current_serial,
- )
- .await?;
- }
+ if current_session_id == notify_session_id =>
+ {
+ println!(
+ "received Serial Notify for current session {}, send Serial Query with serial {}",
+ current_session_id, current_serial
+ );
+ send_serial_query(writer, state.version, current_session_id, current_serial)
+ .await?;
+ }
_ => {
println!(
@@ -366,10 +353,7 @@ async fn handle_incoming_pdu(
Ok(())
}
-async fn handle_poll_tick(
- writer: &mut ClientWriter,
- state: &mut ClientState,
-) -> io::Result<()> {
+async fn handle_poll_tick(writer: &mut ClientWriter, state: &mut ClientState) -> io::Result<()> {
println!();
println!(
"[auto-poll] timer fired (interval={}s)",
@@ -422,23 +406,21 @@ async fn handle_console_command(
state.schedule_next_poll();
}
- ["serial"] => {
- match (state.session_id, state.serial) {
- (Some(session_id), Some(serial)) => {
- println!(
- "manual command: send Serial Query with current state: session_id={}, serial={}",
- session_id, serial
- );
- send_serial_query(writer, state.version, session_id, serial).await?;
- state.schedule_next_poll();
- }
- _ => {
- println!(
- "manual command failed: current session_id/serial not available, use `reset` or `serial `"
- );
- }
+ ["serial"] => match (state.session_id, state.serial) {
+ (Some(session_id), Some(serial)) => {
+ println!(
+ "manual command: send Serial Query with current state: session_id={}, serial={}",
+ session_id, serial
+ );
+ send_serial_query(writer, state.version, session_id, serial).await?;
+ state.schedule_next_poll();
}
- }
+ _ => {
+ println!(
+ "manual command failed: current session_id/serial not available, use `reset` or `serial `"
+ );
+ }
+ },
["serial", session_id, serial] => {
let session_id = match session_id.parse::() {
@@ -493,7 +475,10 @@ async fn handle_console_command(
"current effective poll interval: {}s",
state.effective_poll_secs()
);
- println!("poll interval source : {}", state.poll_interval_source());
+ println!(
+ "poll interval source : {}",
+ state.poll_interval_source()
+ );
println!("stored refresh hint : {:?}", state.refresh);
println!("default poll interval : {}s", state.default_poll_secs);
println!("last_error_code : {:?}", state.last_error_code);
@@ -626,17 +611,20 @@ impl ClientState {
fn effective_poll_secs(&self) -> u64 {
if self.should_prefer_retry_poll() {
- self.retry
- .map(|v| v as u64)
- .unwrap_or_else(|| self.refresh.map(|v| v as u64).unwrap_or(self.default_poll_secs))
+ self.retry.map(|v| v as u64).unwrap_or_else(|| {
+ self.refresh
+ .map(|v| v as u64)
+ .unwrap_or(self.default_poll_secs)
+ })
} else {
- self.refresh.map(|v| v as u64).unwrap_or(self.default_poll_secs)
+ self.refresh
+ .map(|v| v as u64)
+ .unwrap_or(self.default_poll_secs)
}
}
fn schedule_next_poll(&mut self) {
- self.next_poll_deadline =
- Instant::now() + Duration::from_secs(self.effective_poll_secs());
+ self.next_poll_deadline = Instant::now() + Duration::from_secs(self.effective_poll_secs());
}
fn pause_auto_poll(&mut self) {
@@ -728,7 +716,10 @@ impl Config {
}
"--server-name" => {
let name = args.next().ok_or_else(|| {
- io::Error::new(io::ErrorKind::InvalidInput, "--server-name requires a value")
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "--server-name requires a value",
+ )
})?;
ensure_tls_config(&mut transport)?.server_name = Some(name);
}
@@ -805,10 +796,7 @@ impl Config {
let serial = positional
.next()
.ok_or_else(|| {
- io::Error::new(
- io::ErrorKind::InvalidInput,
- "serial mode requires serial",
- )
+ io::Error::new(io::ErrorKind::InvalidInput, "serial mode requires serial")
})?
.parse::()
.map_err(|e| {
@@ -949,9 +937,15 @@ async fn connect_tls_stream(addr: &str, tls: &TlsConfig) -> io::Result io::Result {
if added == 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
- format!("no valid CA certificates found in {}", ca_cert_path.display()),
+ format!(
+ "no valid CA certificates found in {}",
+ ca_cert_path.display()
+ ),
));
}
diff --git a/src/bin/rtr_debug_client/pretty.rs b/src/bin/rtr_debug_client/pretty.rs
index 401caab..6610721 100644
--- a/src/bin/rtr_debug_client/pretty.rs
+++ b/src/bin/rtr_debug_client/pretty.rs
@@ -1,9 +1,8 @@
use std::net::{Ipv4Addr, Ipv6Addr};
use crate::protocol::{
- flag_meaning, hex_bytes, PduHeader, PduType, ASPA_FIXED_BODY_LEN,
- END_OF_DATA_V0_BODY_LEN, END_OF_DATA_V1_BODY_LEN, IPV4_PREFIX_BODY_LEN,
- IPV6_PREFIX_BODY_LEN, ROUTER_KEY_FIXED_BODY_LEN,
+ ASPA_FIXED_BODY_LEN, END_OF_DATA_V0_BODY_LEN, END_OF_DATA_V1_BODY_LEN, IPV4_PREFIX_BODY_LEN,
+ IPV6_PREFIX_BODY_LEN, PduHeader, PduType, ROUTER_KEY_FIXED_BODY_LEN, flag_meaning, hex_bytes,
};
pub fn print_pdu(header: &PduHeader, body: &[u8]) {
@@ -143,8 +142,7 @@ fn print_error_report(header: &PduHeader, body: &[u8]) {
return;
}
- let encapsulated_len =
- u32::from_be_bytes([body[0], body[1], body[2], body[3]]) as usize;
+ let encapsulated_len = u32::from_be_bytes([body[0], body[1], body[2], body[3]]) as usize;
if body.len() < 4 + encapsulated_len + 4 {
println!("invalid ErrorReport: truncated encapsulated PDU");
diff --git a/src/bin/rtr_debug_client/wire.rs b/src/bin/rtr_debug_client/wire.rs
index 346cb01..48a6766 100644
--- a/src/bin/rtr_debug_client/wire.rs
+++ b/src/bin/rtr_debug_client/wire.rs
@@ -2,9 +2,7 @@ use std::io;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-use crate::protocol::{
- PduHeader, PduType, RawPdu, HEADER_LEN, MAX_PDU_LEN, SERIAL_QUERY_LEN,
-};
+use crate::protocol::{HEADER_LEN, MAX_PDU_LEN, PduHeader, PduType, RawPdu, SERIAL_QUERY_LEN};
pub async fn send_reset_query(stream: &mut S, version: u8) -> io::Result<()>
where
@@ -56,10 +54,7 @@ where
if header.length < HEADER_LEN as u32 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
- format!(
- "invalid PDU length {} < {}",
- header.length, HEADER_LEN
- ),
+ format!("invalid PDU length {} < {}", header.length, HEADER_LEN),
));
}
@@ -78,4 +73,4 @@ where
stream.read_exact(&mut body).await?;
Ok(RawPdu { header, body })
-}
\ No newline at end of file
+}
diff --git a/src/bin/slurm_apply_client.rs b/src/bin/slurm_apply_client.rs
new file mode 100644
index 0000000..eabc9c6
--- /dev/null
+++ b/src/bin/slurm_apply_client.rs
@@ -0,0 +1,168 @@
+use std::env;
+use std::fs::File;
+use std::path::PathBuf;
+
+use anyhow::{Context, Result, anyhow};
+use serde::Serialize;
+
+use rpki::rtr::payload::Payload;
+use rpki::slurm::file::SlurmFile;
+use rpki::source::ccr::{
+ find_latest_ccr_file, load_ccr_snapshot_from_file, snapshot_to_payloads_with_options,
+};
+
+#[derive(Debug)]
+struct Cli {
+ ccr_path: PathBuf,
+ slurm_path: PathBuf,
+ strict_ccr: bool,
+ dump_payloads: bool,
+}
+
+#[derive(Debug, Serialize)]
+struct Output {
+ ccr_path: String,
+ slurm_path: String,
+ produced_at: Option,
+ slurm_version: u32,
+ input_payload_count: usize,
+ input_vrp_count: usize,
+ input_vap_count: usize,
+ output_payload_count: usize,
+ output_vrp_count: usize,
+ output_vap_count: usize,
+ invalid_vrps: Vec,
+ invalid_vaps: Vec,
+ sample_output_aspa_customers: Vec,
+ payloads: Option>,
+}
+
+fn main() -> Result<()> {
+ let cli = parse_args(env::args().skip(1))?;
+
+ let snapshot = load_ccr_snapshot_from_file(&cli.ccr_path)
+ .with_context(|| format!("failed to load CCR snapshot: {}", cli.ccr_path.display()))?;
+ let slurm = load_slurm(&cli.slurm_path)?;
+
+ let conversion = snapshot_to_payloads_with_options(&snapshot, cli.strict_ccr)?;
+ let payloads = slurm.apply(&conversion.payloads);
+ let (input_vrp_count, input_vap_count) = count_vrps_and_vaps(&conversion.payloads);
+ let (output_vrp_count, output_vap_count) = count_vrps_and_vaps(&payloads);
+
+ let output = Output {
+ ccr_path: cli.ccr_path.display().to_string(),
+ slurm_path: cli.slurm_path.display().to_string(),
+ produced_at: snapshot.produced_at.clone(),
+ slurm_version: slurm.version().as_u32(),
+ input_payload_count: conversion.payloads.len(),
+ input_vrp_count,
+ input_vap_count,
+ output_payload_count: payloads.len(),
+ output_vrp_count,
+ output_vap_count,
+ invalid_vrps: conversion.invalid_vrps,
+ invalid_vaps: conversion.invalid_vaps,
+ sample_output_aspa_customers: sample_aspa_customers(&payloads, 8),
+ payloads: cli.dump_payloads.then_some(payloads),
+ };
+
+ println!("{}", serde_json::to_string_pretty(&output)?);
+ Ok(())
+}
+
+fn load_slurm(path: &PathBuf) -> Result {
+ let file = File::open(path)
+ .with_context(|| format!("failed to open SLURM file: {}", path.display()))?;
+ SlurmFile::from_reader(file)
+ .with_context(|| format!("failed to parse SLURM file: {}", path.display()))
+}
+
+fn parse_args(args: impl Iterator- ) -> Result {
+ let mut strict_ccr = false;
+ let mut dump_payloads = false;
+ let mut positionals = Vec::new();
+
+ for arg in args {
+ match arg.as_str() {
+ "--strict-ccr" => strict_ccr = true,
+ "--dump-payloads" => dump_payloads = true,
+ "-h" | "--help" => {
+ print_help();
+ std::process::exit(0);
+ }
+ _ if arg.starts_with('-') => {
+ return Err(anyhow!("unknown option: {}", arg));
+ }
+ _ => positionals.push(arg),
+ }
+ }
+
+ if positionals.is_empty() {
+ return Ok(Cli {
+ ccr_path: find_latest_ccr_file("data")
+ .context("failed to find latest .ccr in ./data for default run")?,
+ slurm_path: PathBuf::from("data/example.slurm"),
+ strict_ccr,
+ dump_payloads,
+ });
+ }
+
+ if positionals.len() != 2 {
+ print_help();
+ return Err(anyhow!(
+ "expected: slurm_apply_client "
+ ));
+ }
+
+ Ok(Cli {
+ ccr_path: PathBuf::from(&positionals[0]),
+ slurm_path: PathBuf::from(&positionals[1]),
+ strict_ccr,
+ dump_payloads,
+ })
+}
+
+fn print_help() {
+ eprintln!(
+ "Usage: cargo run --bin slurm_apply_client -- [--strict-ccr] [--dump-payloads] "
+ );
+ eprintln!();
+ eprintln!("Reads a CCR snapshot, converts it into payloads, applies SLURM, and prints JSON.");
+ eprintln!(
+ "When no arguments are provided, it defaults to the latest .ccr under ./data and ./data/example.slurm."
+ );
+ eprintln!("Use --dump-payloads to include the full payload list in the JSON output.");
+}
+
+fn count_vrps_and_vaps(payloads: &[Payload]) -> (usize, usize) {
+ let mut vrps = 0;
+ let mut vaps = 0;
+
+ for payload in payloads {
+ match payload {
+ Payload::RouteOrigin(_) => vrps += 1,
+ Payload::Aspa(_) => vaps += 1,
+ Payload::RouterKey(_) => {}
+ }
+ }
+
+ (vrps, vaps)
+}
+
+fn sample_aspa_customers(payloads: &[Payload], limit: usize) -> Vec {
+ let mut customers = Vec::new();
+
+ for payload in payloads {
+ if let Payload::Aspa(aspa) = payload {
+ let customer = aspa.customer_asn().into_u32();
+ if !customers.contains(&customer) {
+ customers.push(customer);
+ if customers.len() == limit {
+ break;
+ }
+ }
+ }
+ }
+
+ customers
+}
diff --git a/src/lib.rs b/src/lib.rs
index 5ef2acb..88e8e53 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,4 @@
pub mod data_model;
-mod slurm;
+pub mod slurm;
pub mod rtr;
+pub mod source;
diff --git a/src/main.rs b/src/main.rs
index 19c8073..41af49e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,15 +3,15 @@ use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::time::Duration;
-use anyhow::{anyhow, Result};
+use anyhow::{Result, anyhow};
use tokio::task::JoinHandle;
use tracing::{info, warn};
-use rpki::rtr::ccr::{find_latest_ccr_file, load_ccr_payloads_from_file_with_options, load_ccr_snapshot_from_file};
use rpki::rtr::cache::{RtrCache, SharedRtrCache};
use rpki::rtr::payload::Timing;
use rpki::rtr::server::{RtrNotifier, RtrService, RtrServiceConfig, RunningRtrService};
use rpki::rtr::store::RtrStore;
+use rpki::source::pipeline::{PayloadLoadConfig, load_payloads_from_latest_sources};
#[derive(Debug, Clone)]
struct AppConfig {
@@ -21,6 +21,7 @@ struct AppConfig {
db_path: String,
ccr_dir: String,
+ slurm_dir: Option,
tls_cert_path: String,
tls_key_path: String,
tls_client_ca_path: String,
@@ -42,6 +43,7 @@ impl Default for AppConfig {
db_path: "./rtr-db".to_string(),
ccr_dir: "./data".to_string(),
+ slurm_dir: None,
tls_cert_path: "./certs/server.crt".to_string(),
tls_key_path: "./certs/server.key".to_string(),
tls_client_ca_path: "./certs/client-ca.crt".to_string(),
@@ -85,6 +87,14 @@ impl AppConfig {
if let Some(value) = env_var("RPKI_RTR_CCR_DIR")? {
config.ccr_dir = value;
}
+ if let Some(value) = env_var("RPKI_RTR_SLURM_DIR")? {
+ let value = value.trim();
+ config.slurm_dir = if value.is_empty() {
+ None
+ } else {
+ Some(value.to_string())
+ };
+ }
if let Some(value) = env_var("RPKI_RTR_TLS_CERT_PATH")? {
config.tls_cert_path = value;
}
@@ -104,8 +114,7 @@ impl AppConfig {
parse_bool(&value, "RPKI_RTR_PRUNE_DELTA_BY_SNAPSHOT_SIZE")?;
}
if let Some(value) = env_var("RPKI_RTR_STRICT_CCR_VALIDATION")? {
- config.strict_ccr_validation =
- parse_bool(&value, "RPKI_RTR_STRICT_CCR_VALIDATION")?;
+ config.strict_ccr_validation = parse_bool(&value, "RPKI_RTR_STRICT_CCR_VALIDATION")?;
}
if let Some(value) = env_var("RPKI_RTR_REFRESH_INTERVAL_SECS")? {
let secs: u64 = value.parse().map_err(|err| {
@@ -118,9 +127,9 @@ impl AppConfig {
config.refresh_interval = Duration::from_secs(secs);
}
if let Some(value) = env_var("RPKI_RTR_MAX_CONNECTIONS")? {
- config.service_config.max_connections = value.parse().map_err(|err| {
- anyhow!("invalid RPKI_RTR_MAX_CONNECTIONS '{}': {}", value, err)
- })?;
+ config.service_config.max_connections = value
+ .parse()
+ .map_err(|err| anyhow!("invalid RPKI_RTR_MAX_CONNECTIONS '{}': {}", value, err))?;
}
if let Some(value) = env_var("RPKI_RTR_NOTIFY_QUEUE_SIZE")? {
config.service_config.notify_queue_size = value.parse().map_err(|err| {
@@ -184,12 +193,17 @@ fn open_store(config: &AppConfig) -> Result {
}
fn init_shared_cache(config: &AppConfig, store: &RtrStore) -> Result {
+ let payload_load_config = PayloadLoadConfig {
+ ccr_dir: config.ccr_dir.clone(),
+ slurm_dir: config.slurm_dir.clone(),
+ strict_ccr_validation: config.strict_ccr_validation,
+ };
let initial_cache = RtrCache::default().init(
store,
config.max_delta,
config.prune_delta_by_snapshot_size,
Timing::default(),
- || load_payloads_from_latest_ccr(&config.ccr_dir, config.strict_ccr_validation),
+ || load_payloads_from_latest_sources(&payload_load_config),
)?;
let shared_cache: SharedRtrCache = Arc::new(RwLock::new(initial_cache));
@@ -232,8 +246,11 @@ fn spawn_refresh_task(
notifier: RtrNotifier,
) -> JoinHandle<()> {
let refresh_interval = config.refresh_interval;
- let ccr_dir = config.ccr_dir.clone();
- let strict_ccr_validation = config.strict_ccr_validation;
+ let payload_load_config = PayloadLoadConfig {
+ ccr_dir: config.ccr_dir.clone(),
+ slurm_dir: config.slurm_dir.clone(),
+ strict_ccr_validation: config.strict_ccr_validation,
+ };
tokio::spawn(async move {
let mut interval = tokio::time::interval(refresh_interval);
@@ -241,7 +258,7 @@ fn spawn_refresh_task(
loop {
interval.tick().await;
- match load_payloads_from_latest_ccr(&ccr_dir, strict_ccr_validation) {
+ match load_payloads_from_latest_sources(&payload_load_config) {
Ok(payloads) => {
let payload_count = payloads.len();
let updated = {
@@ -261,7 +278,7 @@ fn spawn_refresh_task(
if new_serial != old_serial {
info!(
"RTR cache refresh applied: ccr_dir={}, payload_count={}, old_serial={}, new_serial={}",
- ccr_dir,
+ payload_load_config.ccr_dir,
payload_count,
old_serial,
new_serial
@@ -270,9 +287,7 @@ fn spawn_refresh_task(
} else {
info!(
"RTR cache refresh found no change: ccr_dir={}, payload_count={}, serial={}",
- ccr_dir,
- payload_count,
- old_serial
+ payload_load_config.ccr_dir, payload_count, old_serial
);
false
}
@@ -290,7 +305,10 @@ fn spawn_refresh_task(
}
}
Err(err) => {
- warn!("failed to reload CCR payloads from {}: {:?}", ccr_dir, err);
+ warn!(
+ "failed to reload CCR/SLURM payloads from {}: {:?}",
+ payload_load_config.ccr_dir, err
+ );
}
}
}
@@ -317,16 +335,17 @@ fn log_startup_config(config: &AppConfig) {
}
info!("ccr_dir={}", config.ccr_dir);
+ info!(
+ "slurm_dir={}",
+ config.slurm_dir.as_deref().unwrap_or("disabled")
+ );
info!("max_delta={}", config.max_delta);
info!("strict_ccr_validation={}", config.strict_ccr_validation);
info!(
"refresh_interval_secs={}",
config.refresh_interval.as_secs()
);
- info!(
- "max_connections={}",
- config.service_config.max_connections
- );
+ info!("max_connections={}", config.service_config.max_connections);
info!(
"notify_queue_size={}",
config.service_config.notify_queue_size
@@ -372,50 +391,3 @@ fn parse_bool(value: &str, name: &str) -> Result {
_ => Err(anyhow!("invalid {} '{}': expected boolean", name, value)),
}
}
-
-fn load_payloads_from_latest_ccr(
- ccr_dir: &str,
- strict_ccr_validation: bool,
-) -> Result> {
- let latest = find_latest_ccr_file(ccr_dir)?;
- let snapshot = load_ccr_snapshot_from_file(&latest)?;
- let vrp_count = snapshot.vrps.len();
- let vap_count = snapshot.vaps.len();
- let produced_at = snapshot.produced_at.clone();
- let conversion = load_ccr_payloads_from_file_with_options(&latest, strict_ccr_validation)?;
- let payloads = conversion.payloads;
-
- if !conversion.invalid_vrps.is_empty() {
- warn!(
- "CCR load skipped invalid VRPs: file={}, skipped={}, samples={:?}",
- latest.display(),
- conversion.invalid_vrps.len(),
- sample_messages(&conversion.invalid_vrps)
- );
- }
-
- if !conversion.invalid_vaps.is_empty() {
- warn!(
- "CCR load skipped invalid VAPs/ASPAs: file={}, skipped={}, samples={:?}",
- latest.display(),
- conversion.invalid_vaps.len(),
- sample_messages(&conversion.invalid_vaps)
- );
- }
-
- info!(
- "loaded latest CCR snapshot: file={}, produced_at={:?}, vrp_count={}, vap_count={}, payload_count={}, strict_ccr_validation={}",
- latest.display(),
- produced_at,
- vrp_count,
- vap_count,
- payloads.len(),
- strict_ccr_validation
- );
-
- Ok(payloads)
-}
-
-fn sample_messages(messages: &[String]) -> Vec<&str> {
- messages.iter().take(3).map(String::as_str).collect()
-}
diff --git a/src/rtr/cache/core.rs b/src/rtr/cache/core.rs
index 817c0c0..1059453 100644
--- a/src/rtr/cache/core.rs
+++ b/src/rtr/cache/core.rs
@@ -1,14 +1,14 @@
-use std::collections::{BTreeMap, VecDeque};
-use std::cmp::Ordering;
-use std::sync::Arc;
use anyhow::Result;
use serde::{Deserialize, Serialize};
+use std::cmp::Ordering;
+use std::collections::{BTreeMap, VecDeque};
+use std::sync::Arc;
use tracing::{debug, info, warn};
use crate::rtr::payload::{Payload, Timing};
use super::model::{Delta, DualTime, Snapshot};
-use super::ordering::{change_key, ChangeKey};
+use super::ordering::{ChangeKey, change_key};
const SERIAL_HALF_RANGE: u32 = 1 << 31;
@@ -166,9 +166,7 @@ impl RtrCacheBuilder {
let serial = self.serial.unwrap_or(0);
let created_at = self.created_at.unwrap_or_else(|| now.clone());
let availability = self.availability.unwrap_or(CacheAvailability::Ready);
- let session_ids = self
- .session_ids
- .unwrap_or_else(SessionIds::random_distinct);
+ let session_ids = self.session_ids.unwrap_or_else(SessionIds::random_distinct);
RtrCache {
availability,
@@ -235,8 +233,7 @@ impl RtrCache {
self.serial = self.serial.wrapping_add(1);
debug!(
"RTR cache advanced serial: old_serial={}, new_serial={}",
- old,
- self.serial
+ old, self.serial
);
self.serial
}
@@ -251,9 +248,7 @@ impl RtrCache {
let snapshot_wire_size = estimate_snapshot_payload_wire_size(&self.snapshot);
let mut cumulative_delta_wire_size =
estimate_delta_window_payload_wire_size(&self.deltas);
- while !self.deltas.is_empty()
- && cumulative_delta_wire_size >= snapshot_wire_size
- {
+ while !self.deltas.is_empty() && cumulative_delta_wire_size >= snapshot_wire_size {
if let Some(oldest) = self.deltas.pop_front() {
dropped_serials.push(oldest.serial());
cumulative_delta_wire_size =
@@ -262,9 +257,7 @@ impl RtrCache {
}
debug!(
"RTR cache delta-size pruning evaluated: snapshot_wire_size={}, cumulative_delta_wire_size={}, dropped_serials={:?}",
- snapshot_wire_size,
- cumulative_delta_wire_size,
- dropped_serials
+ snapshot_wire_size, cumulative_delta_wire_size, dropped_serials
);
}
debug!(
@@ -292,7 +285,10 @@ impl RtrCache {
}
}
- pub(super) fn apply_update(&mut self, new_payloads: Vec) -> Result