From ff711a2fe18fcb026ae9251b44e86ad43a4cd5a5 Mon Sep 17 00:00:00 2001 From: "xiuting.xu" Date: Tue, 12 May 2026 16:45:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=86=85=E5=AD=98=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deploy/client/.env | 4 + deploy/client/docker-compose.clients.yml | 10 +- deploy/client/docker-compose.ssh.password.yml | 2 +- deploy/client/docker-compose.ssh.yml | 2 +- deploy/client/docker-compose.tcp.yml | 2 +- deploy/client/docker-compose.tls.yml | 2 +- deploy/client/docker-compose.yml | 2 +- deploy/server/.env | 8 + deploy/server/docker-compose.ssh.yml | 5 +- deploy/server/docker-compose.tcp.yml | 5 +- deploy/server/docker-compose.tls.yml | 5 +- deploy/server/docker-compose.yml | 5 +- src/rtr/cache/core.rs | 28 +--- src/rtr/cache/model.rs | 157 +++++++++++++----- src/rtr/cache/store.rs | 11 +- 15 files changed, 152 insertions(+), 96 deletions(-) create mode 100644 deploy/server/.env diff --git a/deploy/client/.env b/deploy/client/.env index a333784..b6e9ce8 100644 --- a/deploy/client/.env +++ b/deploy/client/.env @@ -4,6 +4,10 @@ # SSH example: 10.0.0.12:22 RPKI_RTR_SERVER_ADDR=rpki-rtr-tcp:323 + +# RTR protocol version used as client command second argument (supported: 0,1,2) +RPKI_RTR_PROTOCOL_VERSION=2 + # TLS server name used by --server-name in TLS mode # Must match server certificate SAN dNSName. RPKI_RTR_TLS_SERVER_NAME=localhost diff --git a/deploy/client/docker-compose.clients.yml b/deploy/client/docker-compose.clients.yml index 8c4b1ff..c5c9a26 100644 --- a/deploy/client/docker-compose.clients.yml +++ b/deploy/client/docker-compose.clients.yml @@ -3,7 +3,7 @@ version: "3.9" services: rtr-client-1: image: rpki-rtr-debug-client:latest - command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"] + command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"] volumes: - ../../logs/client:/app/logs restart: no @@ -12,7 +12,7 @@ services: rtr-client-2: image: rpki-rtr-debug-client:latest - command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"] + command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"] volumes: - ../../logs/client:/app/logs restart: no @@ -21,7 +21,7 @@ services: rtr-client-3: image: rpki-rtr-debug-client:latest - command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"] + command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"] volumes: - ../../logs/client:/app/logs restart: no @@ -30,7 +30,7 @@ services: rtr-client-4: image: rpki-rtr-debug-client:latest - command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"] + command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"] volumes: - ../../logs/client:/app/logs restart: no @@ -39,7 +39,7 @@ services: rtr-client-5: image: rpki-rtr-debug-client:latest - command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"] + command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"] volumes: - ../../logs/client:/app/logs restart: no diff --git a/deploy/client/docker-compose.ssh.password.yml b/deploy/client/docker-compose.ssh.password.yml index e7d0786..4e28c34 100644 --- a/deploy/client/docker-compose.ssh.password.yml +++ b/deploy/client/docker-compose.ssh.password.yml @@ -9,7 +9,7 @@ services: command: [ "${RPKI_RTR_SERVER_ADDR:-rpki-rtr-ssh:22}", - "2", + "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--ssh", "--ssh-user", diff --git a/deploy/client/docker-compose.ssh.yml b/deploy/client/docker-compose.ssh.yml index 8755ca1..73755ae 100644 --- a/deploy/client/docker-compose.ssh.yml +++ b/deploy/client/docker-compose.ssh.yml @@ -9,7 +9,7 @@ services: command: [ "${RPKI_RTR_SERVER_ADDR:-rpki-rtr-ssh:22}", - "2", + "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--ssh", "--ssh-user", diff --git a/deploy/client/docker-compose.tcp.yml b/deploy/client/docker-compose.tcp.yml index fe712ba..b7e8519 100644 --- a/deploy/client/docker-compose.tcp.yml +++ b/deploy/client/docker-compose.tcp.yml @@ -4,7 +4,7 @@ services: context: ../.. dockerfile: deploy/client/Dockerfile image: rpki-rtr-debug-client:latest - command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"] + command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"] volumes: - ../../logs/client:/app/logs restart: no diff --git a/deploy/client/docker-compose.tls.yml b/deploy/client/docker-compose.tls.yml index a0ca781..0de5bf5 100644 --- a/deploy/client/docker-compose.tls.yml +++ b/deploy/client/docker-compose.tls.yml @@ -9,7 +9,7 @@ services: command: [ "${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tls:324}", - "2", + "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--tls", "--ca-cert", diff --git a/deploy/client/docker-compose.yml b/deploy/client/docker-compose.yml index fe712ba..b7e8519 100644 --- a/deploy/client/docker-compose.yml +++ b/deploy/client/docker-compose.yml @@ -4,7 +4,7 @@ services: context: ../.. dockerfile: deploy/client/Dockerfile image: rpki-rtr-debug-client:latest - command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"] + command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"] volumes: - ../../logs/client:/app/logs restart: no diff --git a/deploy/server/.env b/deploy/server/.env new file mode 100644 index 0000000..dc4fb98 --- /dev/null +++ b/deploy/server/.env @@ -0,0 +1,8 @@ +# Host directory containing CCR files to mount into the server container. +RPKI_RTR_CCR_HOST_DIR=../../data + +# In-container directory used by rpki_rtr as CCR input directory. +RPKI_RTR_CCR_DIR=/app/data + +# Max retained delta count in RTR cache. +RPKI_RTR_MAX_DELTA=10 diff --git a/deploy/server/docker-compose.ssh.yml b/deploy/server/docker-compose.ssh.yml index 708609f..87f7672 100644 --- a/deploy/server/docker-compose.ssh.yml +++ b/deploy/server/docker-compose.ssh.yml @@ -25,14 +25,15 @@ services: # Optional: enable password authentication in addition to publickey # RPKI_RTR_SSH_PASSWORD: "test-password" RPKI_RTR_DB_PATH: "/app/rtr-db" - RPKI_RTR_CCR_DIR: "/app/data" + RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "/app/slurm" RPKI_RTR_STRICT_CCR_VALIDATION: "false" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300" + RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}" RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128" RUST_LOG: "info" volumes: - - ../../data:/app/data:ro + - ${RPKI_RTR_CCR_HOST_DIR:-../../data}:${RPKI_RTR_CCR_DIR:-/app/data}:ro - ../../rtr-db:/app/rtr-db - ../../data:/app/slurm:ro - ${RPKI_RTR_SSH_KEYS_VOLUME:-/etc/ssh:/host-ssh:ro} diff --git a/deploy/server/docker-compose.tcp.yml b/deploy/server/docker-compose.tcp.yml index 657b1db..15018bd 100644 --- a/deploy/server/docker-compose.tcp.yml +++ b/deploy/server/docker-compose.tcp.yml @@ -15,15 +15,16 @@ services: RPKI_RTR_ENABLE_SSH: "false" RPKI_RTR_TCP_ADDR: "0.0.0.0:323" RPKI_RTR_DB_PATH: "/app/rtr-db" - RPKI_RTR_CCR_DIR: "/app/data" + RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "/app/slurm" RPKI_RTR_STRICT_CCR_VALIDATION: "false" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "60" + RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}" RPKI_RTR_MAX_CONNECTIONS: "100000" RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128" RUST_LOG: "info" volumes: - - ../../data:/app/data:ro + - ${RPKI_RTR_CCR_HOST_DIR:-../../data}:${RPKI_RTR_CCR_DIR:-/app/data}:ro - ../../rtr-db:/app/rtr-db - ../../data:/app/slurm:ro - ../../logs/server:/app/logs diff --git a/deploy/server/docker-compose.tls.yml b/deploy/server/docker-compose.tls.yml index f7bed57..985234b 100644 --- a/deploy/server/docker-compose.tls.yml +++ b/deploy/server/docker-compose.tls.yml @@ -21,14 +21,15 @@ services: RPKI_RTR_TLS_CLIENT_CA_PATH: "/app/certs/client-ca.crt" RPKI_RTR_ENFORCE_TLS_CLIENT_SAN_IP_MATCH: "false" RPKI_RTR_DB_PATH: "/app/rtr-db" - RPKI_RTR_CCR_DIR: "/app/data" + RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "/app/slurm" RPKI_RTR_STRICT_CCR_VALIDATION: "false" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300" + RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}" RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128" RUST_LOG: "info" volumes: - - ../../data:/app/data:ro + - ${RPKI_RTR_CCR_HOST_DIR:-../../data}:${RPKI_RTR_CCR_DIR:-/app/data}:ro - ../../rtr-db:/app/rtr-db - ../../data:/app/slurm:ro - ../../tests/fixtures/tls:/app/certs:ro diff --git a/deploy/server/docker-compose.yml b/deploy/server/docker-compose.yml index 2816232..351f6ef 100644 --- a/deploy/server/docker-compose.yml +++ b/deploy/server/docker-compose.yml @@ -18,10 +18,11 @@ services: RPKI_RTR_TCP_ADDR: "0.0.0.0:323" RPKI_RTR_TLS_ADDR: "0.0.0.0:324" RPKI_RTR_DB_PATH: "/app/rtr-db" - RPKI_RTR_CCR_DIR: "/app/data" + RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}" RPKI_RTR_SLURM_DIR: "/app/slurm" RPKI_RTR_STRICT_CCR_VALIDATION: "false" RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300" + RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}" RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128" RUST_LOG: "info" # SSH mode example: @@ -35,7 +36,7 @@ services: # Optional: enable password auth in addition to publickey # RPKI_RTR_SSH_PASSWORD: "test-password" volumes: - - ../../data:/app/data:ro + - ${RPKI_RTR_CCR_HOST_DIR:-../../data}:${RPKI_RTR_CCR_DIR:-/app/data}:ro - ../../rtr-db:/app/rtr-db - ../../data:/app/slurm:ro - ../../logs/server:/app/logs diff --git a/src/rtr/cache/core.rs b/src/rtr/cache/core.rs index fa14aff..0101bac 100644 --- a/src/rtr/cache/core.rs +++ b/src/rtr/cache/core.rs @@ -578,33 +578,7 @@ fn merge_deltas_minimally(current_serial: u32, deltas: &[Arc]) -> Delta { } fn project_snapshot_for_version(snapshot: &Snapshot, version: u8) -> Snapshot { - let mut payloads = Vec::new(); - for payload in snapshot.payloads() { - if let Some(projected) = project_payload_for_version(&payload, version) { - payloads.push(projected); - } - } - Snapshot::from_payloads(payloads) -} - -fn project_payload_for_version(payload: &Payload, version: u8) -> Option { - match payload { - Payload::RouteOrigin(origin) => Some(Payload::RouteOrigin(origin.clone())), - Payload::RouterKey(key) => { - if version >= 1 { - Some(Payload::RouterKey(key.clone())) - } else { - None - } - } - Payload::Aspa(aspa) => { - if version >= 2 { - Some(Payload::Aspa(aspa.clone())) - } else { - None - } - } - } + snapshot.project_for_version(version) } fn estimate_snapshot_payload_wire_size(snapshot: &Snapshot) -> usize { diff --git a/src/rtr/cache/model.rs b/src/rtr/cache/model.rs index 0f5134b..6aecc1d 100644 --- a/src/rtr/cache/model.rs +++ b/src/rtr/cache/model.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::sync::{Arc, OnceLock}; use std::time::{Duration, Instant}; @@ -69,9 +69,9 @@ impl<'de> Deserialize<'de> for DualTime { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Snapshot { - origins: BTreeSet, - router_keys: BTreeSet, - aspas: BTreeSet, + origins: Arc>, + router_keys: Arc>, + aspas: Arc>, created_at: DualTime, origins_hash: [u8; 32], router_keys_hash: [u8; 32], @@ -83,14 +83,26 @@ pub struct Snapshot { impl Snapshot { pub fn new( - origins: BTreeSet, - router_keys: BTreeSet, - aspas: BTreeSet, + origins: Vec, + router_keys: Vec, + aspas: Vec, + ) -> Self { + Self::from_shared_parts( + Arc::new(sorted_dedup(origins)), + Arc::new(sorted_dedup(router_keys)), + Arc::new(normalize_aspas(aspas)), + ) + } + + fn from_shared_parts( + origins: Arc>, + router_keys: Arc>, + aspas: Arc>, ) -> Self { let mut snapshot = Snapshot { origins, router_keys, - aspas: normalize_aspas(aspas), + aspas, created_at: DualTime::now(), origins_hash: [0u8; 32], router_keys_hash: [0u8; 32], @@ -105,21 +117,21 @@ impl Snapshot { } pub fn empty() -> Self { - Self::new(BTreeSet::new(), BTreeSet::new(), BTreeSet::new()) + Self::new(Vec::new(), Vec::new(), Vec::new()) } pub fn from_payloads(payloads: Vec) -> Self { - let mut origins = BTreeSet::new(); - let mut router_keys = BTreeSet::new(); + let mut origins = Vec::new(); + let mut router_keys = Vec::new(); let mut aspas = Vec::new(); for p in payloads { match p { Payload::RouteOrigin(o) => { - origins.insert(o); + origins.push(o); } Payload::RouterKey(k) => { - router_keys.insert(k); + router_keys.push(k); } Payload::Aspa(a) => { aspas.push(a); @@ -130,6 +142,21 @@ impl Snapshot { Snapshot::new(origins, router_keys, normalize_aspas(aspas)) } + pub fn project_for_version(&self, version: u8) -> Self { + let router_keys = if version >= 1 { + self.router_keys.clone() + } else { + Arc::new(Vec::new()) + }; + let aspas = if version >= 2 { + self.aspas.clone() + } else { + Arc::new(Vec::new()) + }; + + Self::from_shared_parts(self.origins.clone(), router_keys, aspas) + } + pub fn recompute_hashes(&mut self) { self.origins_hash = self.compute_origins_hash(); self.router_keys_hash = self.compute_router_keys_hash(); @@ -182,21 +209,23 @@ impl Snapshot { let mut withdrawn = Vec::new(); if !self.same_origins(new_snapshot) { - for origin in new_snapshot.origins.difference(&self.origins) { - announced.push(Payload::RouteOrigin(origin.clone())); - } - for origin in self.origins.difference(&new_snapshot.origins) { - withdrawn.push(Payload::RouteOrigin(origin.clone())); - } + diff_sorted( + self.origins.as_slice(), + new_snapshot.origins.as_slice(), + &mut announced, + &mut withdrawn, + Payload::RouteOrigin, + ); } if !self.same_router_keys(new_snapshot) { - for key in new_snapshot.router_keys.difference(&self.router_keys) { - announced.push(Payload::RouterKey(key.clone())); - } - for key in self.router_keys.difference(&new_snapshot.router_keys) { - withdrawn.push(Payload::RouterKey(key.clone())); - } + diff_sorted( + self.router_keys.as_slice(), + new_snapshot.router_keys.as_slice(), + &mut announced, + &mut withdrawn, + Payload::RouterKey, + ); } if !self.same_aspas(new_snapshot) { @@ -268,16 +297,16 @@ impl Snapshot { self.snapshot_hash == other.snapshot_hash } - pub fn origins(&self) -> &BTreeSet { - &self.origins + pub fn origins(&self) -> &[RouteOrigin] { + self.origins.as_ref() } - pub fn router_keys(&self) -> &BTreeSet { - &self.router_keys + pub fn router_keys(&self) -> &[RouterKey] { + self.router_keys.as_ref() } - pub fn aspas(&self) -> &BTreeSet { - &self.aspas + pub fn aspas(&self) -> &[Aspa] { + self.aspas.as_ref() } pub fn is_empty(&self) -> bool { @@ -369,15 +398,15 @@ fn build_snapshot_payloads_for_rtr(snapshot: &Snapshot) -> Vec { } fn dedup_payloads(payloads: &mut Vec) { - let mut seen = BTreeSet::new(); - payloads.retain(|p| seen.insert(p.clone())); + payloads.sort(); + payloads.dedup(); } -fn normalize_aspas(aspas: I) -> BTreeSet +fn normalize_aspas(aspas: I) -> Vec where I: IntoIterator, { - let mut by_customer = BTreeMap::>::new(); + let mut by_customer = BTreeMap::>::new(); for aspa in aspas { let providers = by_customer @@ -386,17 +415,19 @@ where providers.extend(aspa.provider_asns().iter().copied()); } - by_customer + let mut normalized = by_customer .into_iter() .map(|(customer_asn, providers)| { - Aspa::new(customer_asn.into(), providers.into_iter().collect()) + Aspa::new(customer_asn.into(), providers) }) - .collect() + .collect::>(); + normalized.sort(); + normalized } fn diff_aspas( - current: &BTreeSet, - next: &BTreeSet, + current: &[Aspa], + next: &[Aspa], announced: &mut Vec, withdrawn: &mut Vec, ) { @@ -413,7 +444,7 @@ fn diff_aspas( .keys() .chain(next.keys()) .copied() - .collect::>(); + .collect::>(); for customer in customers { match (current.get(&customer), next.get(&customer)) { @@ -426,3 +457,47 @@ fn diff_aspas( } } } + +fn sorted_dedup(mut items: Vec) -> Vec { + items.sort(); + items.dedup(); + items +} + +fn diff_sorted( + current: &[T], + next: &[T], + announced: &mut Vec, + withdrawn: &mut Vec, + wrap: F, +) where + T: Ord + Clone, + F: Fn(T) -> Payload, +{ + let mut i = 0usize; + let mut j = 0usize; + while i < current.len() && j < next.len() { + match current[i].cmp(&next[j]) { + std::cmp::Ordering::Less => { + withdrawn.push(wrap(current[i].clone())); + i += 1; + } + std::cmp::Ordering::Greater => { + announced.push(wrap(next[j].clone())); + j += 1; + } + std::cmp::Ordering::Equal => { + i += 1; + j += 1; + } + } + } + while i < current.len() { + withdrawn.push(wrap(current[i].clone())); + i += 1; + } + while j < next.len() { + announced.push(wrap(next[j].clone())); + j += 1; + } +} diff --git a/src/rtr/cache/store.rs b/src/rtr/cache/store.rs index 210fbcb..6d4d3f9 100644 --- a/src/rtr/cache/store.rs +++ b/src/rtr/cache/store.rs @@ -213,14 +213,5 @@ fn persist_update_job(job: StoreSyncJob) { } fn project_snapshot_for_version(snapshot: &Snapshot, version: u8) -> Snapshot { - let mut payloads = Vec::new(); - for payload in snapshot.payloads() { - match payload { - Payload::RouteOrigin(_) => payloads.push(payload), - Payload::RouterKey(_) if version >= 1 => payloads.push(payload), - Payload::Aspa(_) if version >= 2 => payloads.push(payload), - _ => {} - } - } - Snapshot::from_payloads(payloads) + snapshot.project_for_version(version) }