内存优化

This commit is contained in:
xiuting.xu 2026-05-12 16:45:56 +08:00
parent 4c6b441753
commit ff711a2fe1
15 changed files with 152 additions and 96 deletions

View File

@ -4,6 +4,10 @@
# SSH example: 10.0.0.12:22
RPKI_RTR_SERVER_ADDR=rpki-rtr-tcp:323
# RTR protocol version used as client command second argument (supported: 0,1,2)
RPKI_RTR_PROTOCOL_VERSION=2
# TLS server name used by --server-name in TLS mode
# Must match server certificate SAN dNSName.
RPKI_RTR_TLS_SERVER_NAME=localhost

View File

@ -3,7 +3,7 @@ version: "3.9"
services:
rtr-client-1:
image: rpki-rtr-debug-client:latest
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"]
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"]
volumes:
- ../../logs/client:/app/logs
restart: no
@ -12,7 +12,7 @@ services:
rtr-client-2:
image: rpki-rtr-debug-client:latest
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"]
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"]
volumes:
- ../../logs/client:/app/logs
restart: no
@ -21,7 +21,7 @@ services:
rtr-client-3:
image: rpki-rtr-debug-client:latest
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"]
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"]
volumes:
- ../../logs/client:/app/logs
restart: no
@ -30,7 +30,7 @@ services:
rtr-client-4:
image: rpki-rtr-debug-client:latest
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"]
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"]
volumes:
- ../../logs/client:/app/logs
restart: no
@ -39,7 +39,7 @@ services:
rtr-client-5:
image: rpki-rtr-debug-client:latest
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"]
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"]
volumes:
- ../../logs/client:/app/logs
restart: no

View File

@ -9,7 +9,7 @@ services:
command:
[
"${RPKI_RTR_SERVER_ADDR:-rpki-rtr-ssh:22}",
"2",
"${RPKI_RTR_PROTOCOL_VERSION:-2}",
"reset",
"--ssh",
"--ssh-user",

View File

@ -9,7 +9,7 @@ services:
command:
[
"${RPKI_RTR_SERVER_ADDR:-rpki-rtr-ssh:22}",
"2",
"${RPKI_RTR_PROTOCOL_VERSION:-2}",
"reset",
"--ssh",
"--ssh-user",

View File

@ -4,7 +4,7 @@ services:
context: ../..
dockerfile: deploy/client/Dockerfile
image: rpki-rtr-debug-client:latest
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"]
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"]
volumes:
- ../../logs/client:/app/logs
restart: no

View File

@ -9,7 +9,7 @@ services:
command:
[
"${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tls:324}",
"2",
"${RPKI_RTR_PROTOCOL_VERSION:-2}",
"reset",
"--tls",
"--ca-cert",

View File

@ -4,7 +4,7 @@ services:
context: ../..
dockerfile: deploy/client/Dockerfile
image: rpki-rtr-debug-client:latest
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "2", "reset", "--keep-after-error", "--summary-only"]
command: ["${RPKI_RTR_SERVER_ADDR:-rpki-rtr-tcp:323}", "${RPKI_RTR_PROTOCOL_VERSION:-2}", "reset", "--keep-after-error", "--summary-only"]
volumes:
- ../../logs/client:/app/logs
restart: no

8
deploy/server/.env Normal file
View File

@ -0,0 +1,8 @@
# Host directory containing CCR files to mount into the server container.
RPKI_RTR_CCR_HOST_DIR=../../data
# In-container directory used by rpki_rtr as CCR input directory.
RPKI_RTR_CCR_DIR=/app/data
# Max retained delta count in RTR cache.
RPKI_RTR_MAX_DELTA=10

View File

@ -25,14 +25,15 @@ services:
# Optional: enable password authentication in addition to publickey
# RPKI_RTR_SSH_PASSWORD: "test-password"
RPKI_RTR_DB_PATH: "/app/rtr-db"
RPKI_RTR_CCR_DIR: "/app/data"
RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}"
RPKI_RTR_SLURM_DIR: "/app/slurm"
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300"
RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}"
RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128"
RUST_LOG: "info"
volumes:
- ../../data:/app/data:ro
- ${RPKI_RTR_CCR_HOST_DIR:-../../data}:${RPKI_RTR_CCR_DIR:-/app/data}:ro
- ../../rtr-db:/app/rtr-db
- ../../data:/app/slurm:ro
- ${RPKI_RTR_SSH_KEYS_VOLUME:-/etc/ssh:/host-ssh:ro}

View File

@ -15,15 +15,16 @@ services:
RPKI_RTR_ENABLE_SSH: "false"
RPKI_RTR_TCP_ADDR: "0.0.0.0:323"
RPKI_RTR_DB_PATH: "/app/rtr-db"
RPKI_RTR_CCR_DIR: "/app/data"
RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}"
RPKI_RTR_SLURM_DIR: "/app/slurm"
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "60"
RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}"
RPKI_RTR_MAX_CONNECTIONS: "100000"
RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128"
RUST_LOG: "info"
volumes:
- ../../data:/app/data:ro
- ${RPKI_RTR_CCR_HOST_DIR:-../../data}:${RPKI_RTR_CCR_DIR:-/app/data}:ro
- ../../rtr-db:/app/rtr-db
- ../../data:/app/slurm:ro
- ../../logs/server:/app/logs

View File

@ -21,14 +21,15 @@ services:
RPKI_RTR_TLS_CLIENT_CA_PATH: "/app/certs/client-ca.crt"
RPKI_RTR_ENFORCE_TLS_CLIENT_SAN_IP_MATCH: "false"
RPKI_RTR_DB_PATH: "/app/rtr-db"
RPKI_RTR_CCR_DIR: "/app/data"
RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}"
RPKI_RTR_SLURM_DIR: "/app/slurm"
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300"
RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}"
RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128"
RUST_LOG: "info"
volumes:
- ../../data:/app/data:ro
- ${RPKI_RTR_CCR_HOST_DIR:-../../data}:${RPKI_RTR_CCR_DIR:-/app/data}:ro
- ../../rtr-db:/app/rtr-db
- ../../data:/app/slurm:ro
- ../../tests/fixtures/tls:/app/certs:ro

View File

@ -18,10 +18,11 @@ services:
RPKI_RTR_TCP_ADDR: "0.0.0.0:323"
RPKI_RTR_TLS_ADDR: "0.0.0.0:324"
RPKI_RTR_DB_PATH: "/app/rtr-db"
RPKI_RTR_CCR_DIR: "/app/data"
RPKI_RTR_CCR_DIR: "${RPKI_RTR_CCR_DIR:-/app/data}"
RPKI_RTR_SLURM_DIR: "/app/slurm"
RPKI_RTR_STRICT_CCR_VALIDATION: "false"
RPKI_RTR_SOURCE_REFRESH_INTERVAL_SECS: "300"
RPKI_RTR_MAX_DELTA: "${RPKI_RTR_MAX_DELTA:-10}"
RPKI_RTR_MAX_CONCURRENT_HANDSHAKES: "128"
RUST_LOG: "info"
# SSH mode example:
@ -35,7 +36,7 @@ services:
# Optional: enable password auth in addition to publickey
# RPKI_RTR_SSH_PASSWORD: "test-password"
volumes:
- ../../data:/app/data:ro
- ${RPKI_RTR_CCR_HOST_DIR:-../../data}:${RPKI_RTR_CCR_DIR:-/app/data}:ro
- ../../rtr-db:/app/rtr-db
- ../../data:/app/slurm:ro
- ../../logs/server:/app/logs

28
src/rtr/cache/core.rs vendored
View File

@ -578,33 +578,7 @@ fn merge_deltas_minimally(current_serial: u32, deltas: &[Arc<Delta>]) -> Delta {
}
fn project_snapshot_for_version(snapshot: &Snapshot, version: u8) -> Snapshot {
let mut payloads = Vec::new();
for payload in snapshot.payloads() {
if let Some(projected) = project_payload_for_version(&payload, version) {
payloads.push(projected);
}
}
Snapshot::from_payloads(payloads)
}
fn project_payload_for_version(payload: &Payload, version: u8) -> Option<Payload> {
match payload {
Payload::RouteOrigin(origin) => Some(Payload::RouteOrigin(origin.clone())),
Payload::RouterKey(key) => {
if version >= 1 {
Some(Payload::RouterKey(key.clone()))
} else {
None
}
}
Payload::Aspa(aspa) => {
if version >= 2 {
Some(Payload::Aspa(aspa.clone()))
} else {
None
}
}
}
snapshot.project_for_version(version)
}
fn estimate_snapshot_payload_wire_size(snapshot: &Snapshot) -> usize {

157
src/rtr/cache/model.rs vendored
View File

@ -1,4 +1,4 @@
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeMap;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
@ -69,9 +69,9 @@ impl<'de> Deserialize<'de> for DualTime {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
origins: BTreeSet<RouteOrigin>,
router_keys: BTreeSet<RouterKey>,
aspas: BTreeSet<Aspa>,
origins: Arc<Vec<RouteOrigin>>,
router_keys: Arc<Vec<RouterKey>>,
aspas: Arc<Vec<Aspa>>,
created_at: DualTime,
origins_hash: [u8; 32],
router_keys_hash: [u8; 32],
@ -83,14 +83,26 @@ pub struct Snapshot {
impl Snapshot {
pub fn new(
origins: BTreeSet<RouteOrigin>,
router_keys: BTreeSet<RouterKey>,
aspas: BTreeSet<Aspa>,
origins: Vec<RouteOrigin>,
router_keys: Vec<RouterKey>,
aspas: Vec<Aspa>,
) -> Self {
Self::from_shared_parts(
Arc::new(sorted_dedup(origins)),
Arc::new(sorted_dedup(router_keys)),
Arc::new(normalize_aspas(aspas)),
)
}
fn from_shared_parts(
origins: Arc<Vec<RouteOrigin>>,
router_keys: Arc<Vec<RouterKey>>,
aspas: Arc<Vec<Aspa>>,
) -> Self {
let mut snapshot = Snapshot {
origins,
router_keys,
aspas: normalize_aspas(aspas),
aspas,
created_at: DualTime::now(),
origins_hash: [0u8; 32],
router_keys_hash: [0u8; 32],
@ -105,21 +117,21 @@ impl Snapshot {
}
pub fn empty() -> Self {
Self::new(BTreeSet::new(), BTreeSet::new(), BTreeSet::new())
Self::new(Vec::new(), Vec::new(), Vec::new())
}
pub fn from_payloads(payloads: Vec<Payload>) -> Self {
let mut origins = BTreeSet::new();
let mut router_keys = BTreeSet::new();
let mut origins = Vec::new();
let mut router_keys = Vec::new();
let mut aspas = Vec::new();
for p in payloads {
match p {
Payload::RouteOrigin(o) => {
origins.insert(o);
origins.push(o);
}
Payload::RouterKey(k) => {
router_keys.insert(k);
router_keys.push(k);
}
Payload::Aspa(a) => {
aspas.push(a);
@ -130,6 +142,21 @@ impl Snapshot {
Snapshot::new(origins, router_keys, normalize_aspas(aspas))
}
pub fn project_for_version(&self, version: u8) -> Self {
let router_keys = if version >= 1 {
self.router_keys.clone()
} else {
Arc::new(Vec::new())
};
let aspas = if version >= 2 {
self.aspas.clone()
} else {
Arc::new(Vec::new())
};
Self::from_shared_parts(self.origins.clone(), router_keys, aspas)
}
pub fn recompute_hashes(&mut self) {
self.origins_hash = self.compute_origins_hash();
self.router_keys_hash = self.compute_router_keys_hash();
@ -182,21 +209,23 @@ impl Snapshot {
let mut withdrawn = Vec::new();
if !self.same_origins(new_snapshot) {
for origin in new_snapshot.origins.difference(&self.origins) {
announced.push(Payload::RouteOrigin(origin.clone()));
}
for origin in self.origins.difference(&new_snapshot.origins) {
withdrawn.push(Payload::RouteOrigin(origin.clone()));
}
diff_sorted(
self.origins.as_slice(),
new_snapshot.origins.as_slice(),
&mut announced,
&mut withdrawn,
Payload::RouteOrigin,
);
}
if !self.same_router_keys(new_snapshot) {
for key in new_snapshot.router_keys.difference(&self.router_keys) {
announced.push(Payload::RouterKey(key.clone()));
}
for key in self.router_keys.difference(&new_snapshot.router_keys) {
withdrawn.push(Payload::RouterKey(key.clone()));
}
diff_sorted(
self.router_keys.as_slice(),
new_snapshot.router_keys.as_slice(),
&mut announced,
&mut withdrawn,
Payload::RouterKey,
);
}
if !self.same_aspas(new_snapshot) {
@ -268,16 +297,16 @@ impl Snapshot {
self.snapshot_hash == other.snapshot_hash
}
pub fn origins(&self) -> &BTreeSet<RouteOrigin> {
&self.origins
pub fn origins(&self) -> &[RouteOrigin] {
self.origins.as_ref()
}
pub fn router_keys(&self) -> &BTreeSet<RouterKey> {
&self.router_keys
pub fn router_keys(&self) -> &[RouterKey] {
self.router_keys.as_ref()
}
pub fn aspas(&self) -> &BTreeSet<Aspa> {
&self.aspas
pub fn aspas(&self) -> &[Aspa] {
self.aspas.as_ref()
}
pub fn is_empty(&self) -> bool {
@ -369,15 +398,15 @@ fn build_snapshot_payloads_for_rtr(snapshot: &Snapshot) -> Vec<Payload> {
}
fn dedup_payloads(payloads: &mut Vec<Payload>) {
let mut seen = BTreeSet::new();
payloads.retain(|p| seen.insert(p.clone()));
payloads.sort();
payloads.dedup();
}
fn normalize_aspas<I>(aspas: I) -> BTreeSet<Aspa>
fn normalize_aspas<I>(aspas: I) -> Vec<Aspa>
where
I: IntoIterator<Item = Aspa>,
{
let mut by_customer = BTreeMap::<u32, BTreeSet<_>>::new();
let mut by_customer = BTreeMap::<u32, Vec<_>>::new();
for aspa in aspas {
let providers = by_customer
@ -386,17 +415,19 @@ where
providers.extend(aspa.provider_asns().iter().copied());
}
by_customer
let mut normalized = by_customer
.into_iter()
.map(|(customer_asn, providers)| {
Aspa::new(customer_asn.into(), providers.into_iter().collect())
Aspa::new(customer_asn.into(), providers)
})
.collect()
.collect::<Vec<_>>();
normalized.sort();
normalized
}
fn diff_aspas(
current: &BTreeSet<Aspa>,
next: &BTreeSet<Aspa>,
current: &[Aspa],
next: &[Aspa],
announced: &mut Vec<Payload>,
withdrawn: &mut Vec<Payload>,
) {
@ -413,7 +444,7 @@ fn diff_aspas(
.keys()
.chain(next.keys())
.copied()
.collect::<BTreeSet<_>>();
.collect::<std::collections::BTreeSet<_>>();
for customer in customers {
match (current.get(&customer), next.get(&customer)) {
@ -426,3 +457,47 @@ fn diff_aspas(
}
}
}
fn sorted_dedup<T: Ord>(mut items: Vec<T>) -> Vec<T> {
items.sort();
items.dedup();
items
}
fn diff_sorted<T, F>(
current: &[T],
next: &[T],
announced: &mut Vec<Payload>,
withdrawn: &mut Vec<Payload>,
wrap: F,
) where
T: Ord + Clone,
F: Fn(T) -> Payload,
{
let mut i = 0usize;
let mut j = 0usize;
while i < current.len() && j < next.len() {
match current[i].cmp(&next[j]) {
std::cmp::Ordering::Less => {
withdrawn.push(wrap(current[i].clone()));
i += 1;
}
std::cmp::Ordering::Greater => {
announced.push(wrap(next[j].clone()));
j += 1;
}
std::cmp::Ordering::Equal => {
i += 1;
j += 1;
}
}
}
while i < current.len() {
withdrawn.push(wrap(current[i].clone()));
i += 1;
}
while j < next.len() {
announced.push(wrap(next[j].clone()));
j += 1;
}
}

View File

@ -213,14 +213,5 @@ fn persist_update_job(job: StoreSyncJob) {
}
fn project_snapshot_for_version(snapshot: &Snapshot, version: u8) -> Snapshot {
let mut payloads = Vec::new();
for payload in snapshot.payloads() {
match payload {
Payload::RouteOrigin(_) => payloads.push(payload),
Payload::RouterKey(_) if version >= 1 => payloads.push(payload),
Payload::Aspa(_) if version >= 2 => payloads.push(payload),
_ => {}
}
}
Snapshot::from_payloads(payloads)
snapshot.project_for_version(version)
}