增加bird部署;

优化内存占用
This commit is contained in:
xiuting.xu 2026-05-06 09:49:41 +08:00
parent 88c11e7a97
commit 6e4b59a208
12 changed files with 459 additions and 249 deletions

View File

@ -0,0 +1,39 @@
-----BEGIN RSA PRIVATE KEY-----
MIIG4wIBAAKCAYEAvA91otbecwZFtDiZcV2m/A8RP9C+r1DmaDi/1tBaqleNRt3W
UCdMPEF5NGCBWq/52tWwL3rBXEEp0gcEHYoTZ5djAlAjLaQphbFGQU/h3WWDEVOl
GFnGCc4DP26SRp/oC01QpOlgT9Z/m2xkMZGhZo9oR2v7+ARmF9Qrxlj9HjuDWmZI
Onl5DGXmbI0sBOs1PMtOvmU2aJNKVU3hD335Z0J9c5NubuQQtS/vl0XJQYgbCMh9
4BlbEBppJ6LyzcILbUAfayl2U9sQGErf9CjkwOuITpRBO+f10BQSpGjznRYCX4Yf
71lhH1ri6lOKhupOW1wgC6kkQTcvI9hub1ePLtmOiWUCfnrc2eZljNssqKWYC9+5
v9Ya6+aikSlOheUoMJP1C50PTMnMqRfNGLn3yFKTT/CaQBw1TMO/+Rfsb80Oj9O3
yF9zCHTFrUsYn9T2tupRi/aYWfLe3ckqTBLLhSWLfiY9S49U/fdP5Sk8l4JsAyxE
9oH6RokJIrZxR09pAgMBAAECggGABSIjOvtEWLLwjFC+3YGUg+JcZ+0d22FpB1TE
kHZLrJqAXybB3sUSghLjyP/OYB2YGMwG8mYr04XC1g++IhlJWiVKJ6oXf7uGAkx6
Z2DTwJbwsDR5N/QxYUX36uTCMOkqAhmi7HwjVnfmL/r83tPZFO3ase0QacQi+PO1
cIVa1BcEnknW9GOpn81DRYNyqBK3N2OG+3r99HZQITUjl32VOnyYjzhUcMn1wSBf
w6BnyTZleISn159m5TEMsu8yKKwrLDNe5pyXjLk1HpU8fDo/CoDq+i4KJ2V2Ns4j
+zg2EMqW1EQNI/BYDtOYlL+m5dGEST50Oqq7OiopvchKjIkvgFr+08vlyCyjbryv
DF5B6VEkk1iEacDKK/NyHdjXPmGBvlEeBixOOpS5dBb6m0KPx9C9jTi/tTRBnH4t
7qAko/LoYfBgjyRqDyqeUKrvVp/3NOmkQefaHttr9abhotjmtGqAvJfN1VfFY5e1
s6Ga7yJno5tyS7RArw3OA6t2n2S1AoHBAN8mMuuZQL5xNY2bpn5O9u19QaoG3rRR
Ma6nEO2QPxM+hB8QR2fYAZwgHr38B+keodf/Xe02Xkf+urd2t+eHpoMbzZeEopaM
hNDEo1mYaIl0lgQLy1RzpvfV0B+gy808FOut6YDESyJ4sxGuP3E7CIn+QfFh9Uht
6I/ZyDfM87Q2LlP8ES/EecHfVO97l1gOJ5T9j1bnTTVgM4zJqAWiqlLyUcS56sos
7NPZbYF81ViR4BI/3rthtoUUY8hoSmfXFQKBwQDXvt8oC3Inmr729nrsZCb72fbG
8T9DieUW8MCAwvv4c1DEA2aATgLpidH+NJJyCsR7Ph05AvcSpiUgyF25TeTliBZu
YUaUDE8JuTFTEcjkqpPj62MX8C8J3DjN0TG0fI1ltousef+Ind64Fdh2kPKl2YfE
F/95pc+/y0VbXl8rcQyYCQuo+4ooBIbq1BhprzKC6Tdg2HsvCYrz3EzOJHHD/iLZ
jlHjXAeKo0MjVyDRDy3ollxDg7a1ESGwewVGrAUCgcEAkyeLmNY9XdkmKbYlO1Iz
gDWDoe5Z2qtYigZeIWtoTPaDBkEj9ZT6qsx7uWmEhfTPYbNja1TZI70VwHqnmCVa
Z0dkcrDiz0jnJQ0nc6QP++VIMG7erVh/GRyE3Paar0MZwLm1LFdF/Pt/iv9PkwoM
/YMQVW/14sen/4Tshe/AHm1Ea9nkM43rhTATwMHN8iGTdKspZXOu9K32ELfC98Rn
cb+esI6yCJVLsADIOAXsYzX/f8lixksPo/7oNnaf6o5pAoHAcxBooknge0DsAnkY
vET6Ca6JEDeQfyvnU+HZOZNEoZCXDcOmgl2Y7gdESSiuxkX53qUVHtf3ACo+eQqD
+hWSM9zt4bbd1o3uBYiQxvYgR9y1/YQIGLdPzxl17kdZFCKtowbF3Zo/cBSKuXFl
Pm82CsBGyY3nAnEtqOP3vLBLX7bueZOxdVGasJchgdLWpl77OJi7oVoz8LVNN5xS
FkuXpWlAGvntsK3fk9BZOVr5tKY83OR8lsGDy3Q0nJQwDy4lAoHAOqqDBGbV9oMZ
kBMnntl9FNE8h/UoprFEQKEAsU5Vbx7hrPkLGmgO6pWQceFPYXTdu2u8ictWCBoP
aogdoEsD59A6XHQ28OqMuf41njNc6E6kVBoFq9j9+for8EQsx5iH6Azd+rV67hxW
esLGn7H+P55t5s7/KgPOn2fZI4cHScvEJxNZMqqoUsq488by0Obf0qYmwpKh3HE3
1wybZj9ZTTFTDwqpyrWBUF2BLf/LGlalcrbbiYIHa1L8i5Gy6u6y
-----END RSA PRIVATE KEY-----

Binary file not shown.

2
certs/ssh_known_hosts Normal file
View File

@ -0,0 +1,2 @@
host.docker.internal ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDTymirIP0Z4Hth01pumXpwV9/fIe7v8Tq6y9b38FjTkeNhUNZJQIG0ssj7VnbDTT/wG44oQJnwTFJ8xbIwdYNBZzbBcvxlhk9CPOo4JZc2w230NcyKfsMZLcuDgMKyyXebdwwxGRBxoqUZt04swlrK3cAEd/6Wi9ErwP1Tp4YAKxfj4XvOhJvmWE1q4isffb2zEqrkcXycqBAQKK9zjO+xQF0VfFJqTec1GYD5JznwjAF/yWK/57FZgdQKuH9jdDuzykTl7G891uSkRKTbN7uwhqtnkTergVhk/vDoxz8En0FXP5iaQhVwO2bTDAYwUvnpt/HaXzsaBlGYeNZX7q+dcLqKQfcokj7IKqGIvWtShpWBYKkexSwXpkeVk/0ajRAfdkCjSkMaerEkk9lrbNQ5oXrHEXchQ61n+gxkY2pf5a6Jcq6pMwtwvs1xP5ZBdiTXNFwdb0InxeYKXjFLKYQDuQjs5sbAnVV+L+hwhSOC0zF1g5ZoZgC4WnRpjj+e/e0=
[host.docker.internal]:22 ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDTymirIP0Z4Hth01pumXpwV9/fIe7v8Tq6y9b38FjTkeNhUNZJQIG0ssj7VnbDTT/wG44oQJnwTFJ8xbIwdYNBZzbBcvxlhk9CPOo4JZc2w230NcyKfsMZLcuDgMKyyXebdwwxGRBxoqUZt04swlrK3cAEd/6Wi9ErwP1Tp4YAKxfj4XvOhJvmWE1q4isffb2zEqrkcXycqBAQKK9zjO+xQF0VfFJqTec1GYD5JznwjAF/yWK/57FZgdQKuH9jdDuzykTl7G891uSkRKTbN7uwhqtnkTergVhk/vDoxz8En0FXP5iaQhVwO2bTDAYwUvnpt/HaXzsaBlGYeNZX7q+dcLqKQfcokj7IKqGIvWtShpWBYKkexSwXpkeVk/0ajRAfdkCjSkMaerEkk9lrbNQ5oXrHEXchQ61n+gxkY2pf5a6Jcq6pMwtwvs1xP5ZBdiTXNFwdb0InxeYKXjFLKYQDuQjs5sbAnVV+L+hwhSOC0zF1g5ZoZgC4WnRpjj+e/e0=

Binary file not shown.

View File

@ -16,8 +16,8 @@ Server defaults in this repo:
- `docker-compose.yml`: one-click local TCP test client. - `docker-compose.yml`: one-click local TCP test client.
- `docker-compose.ssh.yml`: compose override for SSH transport. - `docker-compose.ssh.yml`: compose override for SSH transport.
By default, the container prints periodic RPKI protocol snapshots to logs By default, the container uses event-driven observation and prints snapshots
every 30 seconds. only when BIRD reports RPKI-related changes.
## Docker quick start ## Docker quick start
@ -77,7 +77,10 @@ docker logs -f bird-rpki-client
- `network_mode: host` expects your RTR server to be reachable at - `network_mode: host` expects your RTR server to be reachable at
`host.docker.internal:323` from the container. `host.docker.internal:323` from the container.
- Observation is controlled by env vars: - Observation is controlled by env vars:
`OBSERVE_INTERVAL` (seconds, default `30`) and `OBSERVE_PROTO`. `OBSERVE_MODE` (`event` by default, `interval` as fallback),
`OBSERVE_DEBOUNCE_SECS` (default `1`),
`OBSERVE_INTERVAL` (seconds, used when `OBSERVE_MODE=interval`),
and `OBSERVE_PROTO`.
- SSH mode mounts `../../certs` into `/config/ssh` and expects: - SSH mode mounts `../../certs` into `/config/ssh` and expects:
`bird-rtr-client.pem` and `ssh_host_rsa_key.pub`. `bird-rtr-client.pem` and `ssh_host_rsa_key.pub`.
- Entrypoint auto-generates `/run/bird/known_hosts` from - Entrypoint auto-generates `/run/bird/known_hosts` from

View File

@ -15,6 +15,8 @@ services:
RPKI_PORT: "323" RPKI_PORT: "323"
OBSERVE_PROTO: "rpki_tcp" OBSERVE_PROTO: "rpki_tcp"
OBSERVE_MODE: "event"
OBSERVE_DEBOUNCE_SECS: "1"
OBSERVE_INTERVAL: "30" OBSERVE_INTERVAL: "30"
OBSERVE_ASPA_TABLE: "rtr_aspa" OBSERVE_ASPA_TABLE: "rtr_aspa"

View File

@ -6,6 +6,8 @@ mkdir -p /run/bird
SOCK_PATH="/run/bird/bird.ctl" SOCK_PATH="/run/bird/bird.ctl"
PROTO="${OBSERVE_PROTO:-rpki_tcp}" PROTO="${OBSERVE_PROTO:-rpki_tcp}"
INTERVAL="${OBSERVE_INTERVAL:-30}" INTERVAL="${OBSERVE_INTERVAL:-30}"
MODE="${OBSERVE_MODE:-event}"
DEBOUNCE_SECS="${OBSERVE_DEBOUNCE_SECS:-1}"
RPKI_HOST="${RPKI_HOST:-host.docker.internal}" RPKI_HOST="${RPKI_HOST:-host.docker.internal}"
RPKI_PORT="${RPKI_PORT:-323}" RPKI_PORT="${RPKI_PORT:-323}"
@ -101,10 +103,37 @@ print_first_n_objects() {
' || true ' || true
} }
print_snapshot() {
echo "==== $(date -u +"%Y-%m-%dT%H:%M:%SZ") RPKI snapshot ($PROTO) ===="
birdc -s "$SOCK_PATH" show protocols all "$PROTO" || true
if [ "$SHOW_ASPA" = "1" ]; then
echo "---- ASPA table ($ASPA_TABLE, first ${ASPA_COUNT} objects) ----"
print_first_n_objects "$ASPA_TABLE" "$ASPA_COUNT"
fi
if [ "$SHOW_ROA4" = "1" ]; then
echo "---- ROA4 table ($ROA4_TABLE, first ${ROA4_COUNT} objects) ----"
print_first_n_objects "$ROA4_TABLE" "$ROA4_COUNT"
fi
if [ "$SHOW_ROA6" = "1" ]; then
echo "---- ROA6 table ($ROA6_TABLE, first ${ROA6_COUNT} objects) ----"
print_first_n_objects "$ROA6_TABLE" "$ROA6_COUNT"
fi
}
is_rpki_related_event() {
line="$1"
echo "$line" | grep -Eiq "$PROTO|$ASPA_TABLE|$ROA4_TABLE|$ROA6_TABLE|rpki|rtr"
}
echo "[entrypoint] starting bird" echo "[entrypoint] starting bird"
echo "[entrypoint] config : $BIRD_CONFIG_PATH" echo "[entrypoint] config : $BIRD_CONFIG_PATH"
echo "[entrypoint] observe proto : $PROTO" echo "[entrypoint] observe proto : $PROTO"
echo "[entrypoint] observe interval : $INTERVAL" echo "[entrypoint] observe mode : $MODE"
echo "[entrypoint] observe interval : $INTERVAL (used when mode=interval)"
echo "[entrypoint] debounce secs : $DEBOUNCE_SECS (used when mode=event)"
echo "[entrypoint] target : $RPKI_HOST:$RPKI_PORT" echo "[entrypoint] target : $RPKI_HOST:$RPKI_PORT"
echo "[entrypoint] show aspa : $SHOW_ASPA ($ASPA_TABLE, first $ASPA_COUNT objects)" echo "[entrypoint] show aspa : $SHOW_ASPA ($ASPA_TABLE, first $ASPA_COUNT objects)"
echo "[entrypoint] show roa4 : $SHOW_ROA4 ($ROA4_TABLE, first $ROA4_COUNT objects)" echo "[entrypoint] show roa4 : $SHOW_ROA4 ($ROA4_TABLE, first $ROA4_COUNT objects)"
@ -131,26 +160,41 @@ case "$INTERVAL" in
;; ;;
esac esac
if [ "$INTERVAL" -gt 0 ]; then case "$MODE" in
event)
case "$DEBOUNCE_SECS" in
''|*[!0-9]*)
DEBOUNCE_SECS=1
;;
esac
echo "[entrypoint] waiting for BIRD monitor events and only printing on RPKI-related changes"
LAST_PRINT_TS=0
if birdc -s "$SOCK_PATH" monitor all 2>/dev/null | while IFS= read -r line; do
[ -n "$line" ] || continue
if ! is_rpki_related_event "$line"; then
continue
fi
now_ts="$(date +%s)"
if [ $((now_ts - LAST_PRINT_TS)) -lt "$DEBOUNCE_SECS" ]; then
continue
fi
LAST_PRINT_TS="$now_ts"
echo "[monitor] $line"
print_snapshot
done; then
:
else
echo "[entrypoint] WARNING: birdc monitor failed, fallback to interval polling"
MODE="interval"
fi
;;
esac
if [ "$MODE" = "interval" ] && [ "$INTERVAL" -gt 0 ]; then
while kill -0 "$BIRD_PID" 2>/dev/null; do while kill -0 "$BIRD_PID" 2>/dev/null; do
echo "==== $(date -u +"%Y-%m-%dT%H:%M:%SZ") RPKI snapshot ($PROTO) ====" print_snapshot
birdc -s "$SOCK_PATH" show protocols all "$PROTO" || true
if [ "$SHOW_ASPA" = "1" ]; then
echo "---- ASPA table ($ASPA_TABLE, first ${ASPA_COUNT} objects) ----"
print_first_n_objects "$ASPA_TABLE" "$ASPA_COUNT"
fi
if [ "$SHOW_ROA4" = "1" ]; then
echo "---- ROA4 table ($ROA4_TABLE, first ${ROA4_COUNT} objects) ----"
print_first_n_objects "$ROA4_TABLE" "$ROA4_COUNT"
fi
if [ "$SHOW_ROA6" = "1" ]; then
echo "---- ROA6 table ($ROA6_TABLE, first ${ROA6_COUNT} objects) ----"
print_first_n_objects "$ROA6_TABLE" "$ROA6_COUNT"
fi
sleep "$INTERVAL" sleep "$INTERVAL"
done done
fi fi

View File

@ -1,9 +1,10 @@
use std::env; use std::env;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use anyhow::{Result, anyhow}; use anyhow::{Result, anyhow};
use arc_swap::ArcSwap;
use chrono::{FixedOffset, Utc}; use chrono::{FixedOffset, Utc};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{info, warn}; use tracing::{info, warn};
@ -308,19 +309,13 @@ fn init_shared_cache(config: &AppConfig, store: &RtrStore) -> Result<SharedRtrCa
source_to_delta_started.elapsed().as_millis() source_to_delta_started.elapsed().as_millis()
); );
let shared_cache: SharedRtrCache = Arc::new(RwLock::new(initial_cache)); let shared_cache: SharedRtrCache = Arc::new(ArcSwap::from_pointee(initial_cache));
let cache = shared_cache.load_full();
{
let cache = shared_cache
.read()
.map_err(|_| anyhow!("cache read lock poisoned during startup"))?;
info!( info!(
"cache initialized: session_ids={:?}, serials={:?}", "cache initialized: session_ids={:?}, serials={:?}",
cache.session_ids(), cache.session_ids(),
cache.serials() cache.serials()
); );
}
Ok(shared_cache) Ok(shared_cache)
} }
@ -391,19 +386,14 @@ fn spawn_refresh_task(
Ok(payloads) => { Ok(payloads) => {
let payload_count = payloads.len(); let payload_count = payloads.len();
let updated = { let updated = {
let mut cache = match shared_cache.write() { let old_cache = shared_cache.load_full();
Ok(guard) => guard, let old_serial = old_cache.serial_for_version(2);
Err(_) => { let mut next_cache = old_cache.as_ref().clone();
warn!("cache write lock poisoned during refresh");
continue;
}
};
let old_serial = cache.serial_for_version(2); match next_cache.update(payloads, &store) {
match cache.update(payloads, &store) {
Ok(()) => { Ok(()) => {
let new_serial = cache.serial_for_version(2); let new_serial = next_cache.serial_for_version(2);
shared_cache.store(std::sync::Arc::new(next_cache));
if new_serial != old_serial { if new_serial != old_serial {
info!( info!(
"RTR cache refresh applied: ccr_dir={}, payload_count={}, old_serial={}, new_serial={}", "RTR cache refresh applied: ccr_dir={}, payload_count={}, old_serial={}, new_serial={}",

21
src/rtr/cache/core.rs vendored
View File

@ -58,21 +58,25 @@ pub struct VersionState {
serial: u32, serial: u32,
snapshot: Snapshot, snapshot: Snapshot,
#[serde(skip)] #[serde(skip)]
rtr_payloads: Arc<Vec<Payload>>,
#[serde(skip)]
deltas: VecDeque<Arc<Delta>>, deltas: VecDeque<Arc<Delta>>,
} }
impl VersionState { impl VersionState {
fn new(session_id: u16, serial: u32, snapshot: Snapshot, max_delta: u8) -> Self { fn new(session_id: u16, serial: u32, snapshot: Snapshot, max_delta: u8) -> Self {
let rtr_payloads = Arc::new(snapshot.payloads_for_rtr());
Self { Self {
session_id, session_id,
serial, serial,
snapshot, snapshot,
rtr_payloads,
deltas: VecDeque::with_capacity(max_delta as usize), deltas: VecDeque::with_capacity(max_delta as usize),
} }
} }
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct RtrCache { pub struct RtrCache {
availability: CacheAvailability, availability: CacheAvailability,
versions: [VersionState; VERSION_COUNT], versions: [VersionState; VERSION_COUNT],
@ -190,11 +194,15 @@ impl RtrCacheBuilder {
std::array::from_fn(|_| VecDeque::with_capacity(max_delta as usize)) std::array::from_fn(|_| VecDeque::with_capacity(max_delta as usize))
}); });
let versions = std::array::from_fn(|idx| VersionState { let versions = std::array::from_fn(|idx| {
let snapshot = snapshots[idx].clone();
VersionState {
session_id: session_ids.as_array()[idx], session_id: session_ids.as_array()[idx],
serial: serials[idx], serial: serials[idx],
snapshot: snapshots[idx].clone(), rtr_payloads: Arc::new(snapshot.payloads_for_rtr()),
snapshot,
deltas: deltas[idx].clone(), deltas: deltas[idx].clone(),
}
}); });
let created_at = self.created_at.unwrap_or_else(|| now.clone()); let created_at = self.created_at.unwrap_or_else(|| now.clone());
@ -222,6 +230,7 @@ impl RtrCache {
self.availability = CacheAvailability::NoDataAvailable; self.availability = CacheAvailability::NoDataAvailable;
for version_state in &mut self.versions { for version_state in &mut self.versions {
version_state.snapshot = Snapshot::empty(); version_state.snapshot = Snapshot::empty();
version_state.rtr_payloads = Arc::new(Vec::new());
version_state.deltas.clear(); version_state.deltas.clear();
} }
} }
@ -238,6 +247,7 @@ impl RtrCache {
state.session_id = new_session_ids.get(v); state.session_id = new_session_ids.get(v);
state.serial = 1; state.serial = 1;
state.snapshot = project_snapshot_for_version(source_snapshot, v); state.snapshot = project_snapshot_for_version(source_snapshot, v);
state.rtr_payloads = Arc::new(state.snapshot.payloads_for_rtr());
state.deltas.clear(); state.deltas.clear();
} }
self.last_update_end = DualTime::now(); self.last_update_end = DualTime::now();
@ -352,6 +362,7 @@ impl RtrCache {
} }
state.snapshot = projected; state.snapshot = projected;
state.rtr_payloads = Arc::new(state.snapshot.payloads_for_rtr());
Self::push_delta( Self::push_delta(
state, state,
self.max_delta, self.max_delta,
@ -427,6 +438,10 @@ impl RtrCache {
self.versions[version_index(version)].snapshot.clone() self.versions[version_index(version)].snapshot.clone()
} }
pub fn rtr_payloads_for_version(&self, version: u8) -> Arc<Vec<Payload>> {
self.versions[version_index(version)].rtr_payloads.clone()
}
pub fn serial_for_version(&self, version: u8) -> u32 { pub fn serial_for_version(&self, version: u8) -> u32 {
self.versions[version_index(version)].serial self.versions[version_index(version)].serial
} }

View File

@ -9,6 +9,8 @@ pub use ordering::{
OrderingViolation, validate_payload_updates_for_rtr, validate_payloads_for_rtr, OrderingViolation, validate_payload_updates_for_rtr, validate_payloads_for_rtr,
}; };
use std::sync::{Arc, RwLock}; use std::sync::Arc;
pub type SharedRtrCache = Arc<RwLock<RtrCache>>; use arc_swap::ArcSwap;
pub type SharedRtrCache = Arc<ArcSwap<RtrCache>>;

View File

@ -1,4 +1,5 @@
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::sync::OnceLock;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use chrono::{DateTime, NaiveDateTime, Utc}; use chrono::{DateTime, NaiveDateTime, Utc};
@ -282,6 +283,8 @@ pub struct Delta {
announced: Vec<Payload>, announced: Vec<Payload>,
withdrawn: Vec<Payload>, withdrawn: Vec<Payload>,
created_at: DualTime, created_at: DualTime,
#[serde(skip)]
updates_for_rtr: OnceLock<Vec<(bool, Payload)>>,
} }
impl Delta { impl Delta {
@ -291,12 +294,16 @@ impl Delta {
sort_payloads_for_rtr(&mut announced, true); sort_payloads_for_rtr(&mut announced, true);
sort_payloads_for_rtr(&mut withdrawn, false); sort_payloads_for_rtr(&mut withdrawn, false);
let cached_updates = build_payload_updates_for_rtr(&announced, &withdrawn);
let updates_for_rtr = OnceLock::new();
let _ = updates_for_rtr.set(cached_updates);
Delta { Delta {
serial, serial,
announced, announced,
withdrawn, withdrawn,
created_at: DualTime::now(), created_at: DualTime::now(),
updates_for_rtr,
} }
} }
@ -320,18 +327,31 @@ impl Delta {
self.announced.is_empty() && self.withdrawn.is_empty() self.announced.is_empty() && self.withdrawn.is_empty()
} }
pub fn payloads_for_rtr(&self) -> Vec<(bool, Payload)> { pub fn payload_updates_for_rtr(&self) -> &[(bool, Payload)] {
let mut updates = Vec::with_capacity(self.announced.len() + self.withdrawn.len()); self.updates_for_rtr
.get_or_init(|| build_payload_updates_for_rtr(&self.announced, &self.withdrawn))
.as_slice()
}
updates.extend(self.announced.iter().cloned().map(|p| (true, p))); pub fn payloads_for_rtr(&self) -> Vec<(bool, Payload)> {
updates.extend(self.withdrawn.iter().cloned().map(|p| (false, p))); self.payload_updates_for_rtr().to_vec()
}
}
fn build_payload_updates_for_rtr(
announced: &[Payload],
withdrawn: &[Payload],
) -> Vec<(bool, Payload)> {
let mut updates = Vec::with_capacity(announced.len() + withdrawn.len());
updates.extend(announced.iter().cloned().map(|p| (true, p)));
updates.extend(withdrawn.iter().cloned().map(|p| (false, p)));
updates.sort_by(|(a_upd, a_payload), (b_upd, b_payload)| { updates.sort_by(|(a_upd, a_payload), (b_upd, b_payload)| {
compare_payload_update_for_rtr(a_payload, *a_upd, b_payload, *b_upd) compare_payload_update_for_rtr(a_payload, *a_upd, b_payload, *b_upd)
}); });
updates updates
}
} }
fn dedup_payloads(payloads: &mut Vec<Payload>) { fn dedup_payloads(payloads: &mut Vec<Payload>) {

View File

@ -1,4 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use anyhow::{Result, anyhow, bail}; use anyhow::{Result, anyhow, bail};
@ -14,7 +16,7 @@ use crate::rtr::cache::{
validate_payloads_for_rtr, validate_payloads_for_rtr,
}; };
use crate::rtr::error_type::ErrorCode; use crate::rtr::error_type::ErrorCode;
use crate::rtr::payload::{Aspa, Payload, RouteOrigin, RouterKey}; use crate::rtr::payload::{Aspa, Payload, RouteOrigin, RouterKey, Timing};
use crate::rtr::pdu::{ use crate::rtr::pdu::{
Aspa as AspaPdu, CacheReset, CacheResponse, EndOfData, ErrorReport, Flags, HEADER_LEN, Header, Aspa as AspaPdu, CacheReset, CacheResponse, EndOfData, ErrorReport, Flags, HEADER_LEN, Header,
IPv4Prefix, IPv6Prefix, ResetQuery, RouterKey as RouterKeyPdu, SerialNotify, SerialQuery, IPv4Prefix, IPv6Prefix, ResetQuery, RouterKey as RouterKeyPdu, SerialNotify, SerialQuery,
@ -28,6 +30,83 @@ const WITHDRAW_FLAG: u8 = 0;
/// Per-session notify rate limit: no more than once per minute. /// Per-session notify rate limit: no more than once per minute.
const NOTIFY_MIN_INTERVAL: Duration = Duration::from_secs(60); const NOTIFY_MIN_INTERVAL: Duration = Duration::from_secs(60);
/// Limit captured offending PDU bytes in memory for error reporting.
const MAX_OFFENDING_PDU_CAPTURE: usize = 4096;
/// Log rate limiting window and burst for repeated high-frequency events.
const LOG_RATE_LIMIT_WINDOW: Duration = Duration::from_secs(30);
const LOG_RATE_LIMIT_BURST: u32 = 10;
static SESSION_LOG_LIMITERS: OnceLock<Mutex<HashMap<&'static str, LogBucket>>> = OnceLock::new();
#[derive(Debug, Clone, Copy)]
struct LogBucket {
window_start: Instant,
emitted: u32,
suppressed: u32,
}
fn allow_rate_limited_log(event: &'static str) -> Option<u32> {
let now = Instant::now();
let lock = SESSION_LOG_LIMITERS.get_or_init(|| Mutex::new(HashMap::new()));
let mut map = match lock.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
let bucket = map.entry(event).or_insert_with(|| LogBucket {
window_start: now,
emitted: 0,
suppressed: 0,
});
if now.duration_since(bucket.window_start) >= LOG_RATE_LIMIT_WINDOW {
let suppressed = bucket.suppressed;
bucket.window_start = now;
bucket.emitted = 1;
bucket.suppressed = 0;
return Some(suppressed);
}
if bucket.emitted < LOG_RATE_LIMIT_BURST {
bucket.emitted += 1;
let suppressed = bucket.suppressed;
bucket.suppressed = 0;
Some(suppressed)
} else {
bucket.suppressed += 1;
None
}
}
fn warn_rate_limited<F>(event: &'static str, build: F)
where
F: FnOnce() -> String,
{
if let Some(suppressed) = allow_rate_limited_log(event) {
if suppressed > 0 {
warn!(
"RTR session log rate-limited: event={}, suppressed_count={}",
event, suppressed
);
}
warn!("{}", build());
}
}
fn error_rate_limited<F>(event: &'static str, build: F)
where
F: FnOnce() -> String,
{
if let Some(suppressed) = allow_rate_limited_log(event) {
if suppressed > 0 {
warn!(
"RTR session log rate-limited: event={}, suppressed_count={}",
event, suppressed
);
}
error!("{}", build());
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SessionState { enum SessionState {
@ -126,16 +205,25 @@ where
return Ok(()); return Ok(());
} }
Err(err) if err.kind() == io::ErrorKind::TimedOut => { Err(err) if err.kind() == io::ErrorKind::TimedOut => {
warn!("RTR session transport timeout while waiting for header: {}", self.session_summary()); warn_rate_limited("session_header_timeout_waiting", || {
format!(
"RTR session transport timeout while waiting for header: state={}, negotiated_version={:?}",
self.state_name(),
self.version
)
});
self.handle_transport_timeout(&[]).await?; self.handle_transport_timeout(&[]).await?;
return Ok(()); return Ok(());
} }
Err(err) => { Err(err) => {
warn!( warn_rate_limited("session_header_read_failed", || {
"RTR session failed to read header: err={}, {}", format!(
"RTR session failed to read header: err={}, state={}, negotiated_version={:?}",
err, err,
self.session_summary() self.state_name(),
); self.version
)
});
self.state = SessionState::Closed; self.state = SessionState::Closed;
return Ok(()); return Ok(());
} }
@ -143,11 +231,14 @@ where
let header = match Header::from_raw(raw_header) { let header = match Header::from_raw(raw_header) {
Ok(h) => h, Ok(h) => h,
Err(err) => { Err(err) => {
warn!( warn_rate_limited("session_header_invalid", || {
"RTR session received invalid header: err={}, {}", format!(
"RTR session received invalid header: err={}, state={}, negotiated_version={:?}",
err, err,
self.session_summary() self.state_name(),
); self.version
)
});
self.handle_header_read_error(raw_header, err).await?; self.handle_header_read_error(raw_header, err).await?;
return Ok(()); return Ok(());
} }
@ -244,11 +335,14 @@ where
) )
.await?; .await?;
warn!( warn_rate_limited("session_unsupported_pdu", || {
"RTR session received unsupported PDU type {}, closing session: {}", format!(
"RTR session received unsupported PDU type {}, closing session: state={}, negotiated_version={:?}",
header.pdu(), header.pdu(),
self.session_summary() self.state_name(),
); self.version
)
});
self.state = SessionState::Closed; self.state = SessionState::Closed;
return Ok(()); return Ok(());
} }
@ -259,15 +353,29 @@ where
if self.state == SessionState::Established && self.version.is_some() => { if self.state == SessionState::Established && self.version.is_some() => {
match notify_res { match notify_res {
Ok(()) => { Ok(()) => {
debug!("RTR session handling cache update notify: {}", self.session_summary()); debug!(
"RTR session handling cache update notify: state={}, negotiated_version={:?}",
self.state_name(),
self.version
);
self.handle_notify().await?; self.handle_notify().await?;
} }
Err(broadcast::error::RecvError::Lagged(_)) => { Err(broadcast::error::RecvError::Lagged(_)) => {
warn!("RTR session lagged on notify channel, forcing notify handling: {}", self.session_summary()); warn_rate_limited("session_notify_lagged", || {
format!(
"RTR session lagged on notify channel, forcing notify handling: state={}, negotiated_version={:?}",
self.state_name(),
self.version
)
});
self.handle_notify().await?; self.handle_notify().await?;
} }
Err(broadcast::error::RecvError::Closed) => { Err(broadcast::error::RecvError::Closed) => {
debug!("RTR session notify channel closed, keeping session alive: {}", self.session_summary()); debug!(
"RTR session notify channel closed, keeping session alive: state={}, negotiated_version={:?}",
self.state_name(),
self.version
);
// keep session alive // keep session alive
} }
} }
@ -334,12 +442,14 @@ where
return Ok(()); return Ok(());
} }
warn!( warn_rate_limited("session_unexpected_protocol_version", || {
format!(
"RTR session received unexpected protocol version in established session: established_version={}, received_version={}, pdu={}", "RTR session received unexpected protocol version in established session: established_version={}, received_version={}, pdu={}",
established, established,
header.version(), header.version(),
header.pdu() header.pdu()
); )
});
if header.pdu() != ErrorReport::PDU { if header.pdu() != ErrorReport::PDU {
let offending = self.read_full_pdu_bytes(header).await?; let offending = self.read_full_pdu_bytes(header).await?;
@ -575,39 +685,37 @@ where
self.version, self.version,
offending_pdu.len() offending_pdu.len()
); );
let (data_available, payloads, session_id, serial) = { let (data_available, payloads, session_id, serial, timing) = {
let version = self.version()?; let version = self.version()?;
let cache = self let cache = self.cache.load_full();
.cache
.read()
.map_err(|_| anyhow!("cache read lock poisoned"))?;
let data_available = cache.is_data_available(); let data_available = cache.is_data_available();
let snapshot = cache.snapshot_for_version(version); let payloads = cache.rtr_payloads_for_version(version);
let payloads = snapshot.payloads_for_rtr();
let session_id = cache.session_id_for_version(version); let session_id = cache.session_id_for_version(version);
let serial = cache.serial_for_version(version); let serial = cache.serial_for_version(version);
(data_available, payloads, session_id, serial) let timing = cache.timing();
(data_available, payloads, session_id, serial, timing)
}; };
if !data_available { if !data_available {
self.send_no_data_available(offending_pdu, "cache data is not currently available") self.send_no_data_available(offending_pdu, "cache data is not currently available")
.await?; .await?;
debug!( debug!(
"RTR session replied No Data Available to Reset Query: {}", "RTR session replied No Data Available to Reset Query: negotiated_version={:?}",
self.session_summary() self.version
); );
return Ok(()); return Ok(());
} }
self.write_cache_response(session_id).await?; self.write_cache_response(session_id).await?;
self.send_payloads(&payloads, true).await?; self.send_payloads(payloads.as_slice(), true).await?;
self.write_end_of_data(session_id, serial).await?; self.write_end_of_data(session_id, serial, timing).await?;
debug!( debug!(
"RTR session completed Reset Query: response_session_id={}, response_serial={}, payload_count={}, {}", "RTR session completed Reset Query: response_session_id={}, response_serial={}, payload_count={}, state={}, negotiated_version={:?}",
session_id, session_id,
serial, serial,
payloads.len(), payloads.len(),
self.session_summary() self.state_name(),
self.version
); );
Ok(()) Ok(())
@ -627,25 +735,19 @@ where
client_serial, client_serial,
offending_pdu.len() offending_pdu.len()
); );
let (data_available, current_session) = { let cache = self.cache.load_full();
let cache = self let data_available = cache.is_data_available();
.cache let current_session = cache.session_id_for_version(version);
.read()
.map_err(|_| anyhow!("cache read lock poisoned"))?;
(
cache.is_data_available(),
cache.session_id_for_version(version),
)
};
if !data_available { if !data_available {
self.send_no_data_available(offending_pdu, "cache data is not currently available") self.send_no_data_available(offending_pdu, "cache data is not currently available")
.await?; .await?;
debug!( debug!(
"RTR session replied No Data Available to Serial Query: client_session_id={}, client_serial={}, {}", "RTR session replied No Data Available to Serial Query: client_session_id={}, client_serial={}, state={}, negotiated_version={:?}",
client_session, client_session,
client_serial, client_serial,
self.session_summary() self.state_name(),
self.version
); );
return Ok(()); return Ok(());
} }
@ -663,75 +765,53 @@ where
); );
} }
let serial_result = { let serial_result = cache.get_deltas_since_for_version(version, client_serial);
let cache = self let current_session = cache.session_id_for_version(version);
.cache let current_serial = cache.serial_for_version(version);
.read() let timing = cache.timing();
.map_err(|_| anyhow!("cache read lock poisoned"))?;
cache.get_deltas_since_for_version(version, client_serial)
};
match serial_result { match serial_result {
SerialResult::ResetRequired => { SerialResult::ResetRequired => {
self.write_cache_reset().await?; self.write_cache_reset().await?;
debug!( debug!(
"RTR session replied Cache Reset to Serial Query: client_session_id={}, client_serial={}, {}", "RTR session replied Cache Reset to Serial Query: client_session_id={}, client_serial={}, state={}, negotiated_version={:?}",
client_session, client_session,
client_serial, client_serial,
self.session_summary() self.state_name(),
self.version
); );
return Ok(()); return Ok(());
} }
SerialResult::UpToDate => { SerialResult::UpToDate => {
let (current_session, current_serial) = {
let cache = self
.cache
.read()
.map_err(|_| anyhow!("cache read lock poisoned"))?;
(
cache.session_id_for_version(version),
cache.serial_for_version(version),
)
};
self.write_cache_response(current_session).await?; self.write_cache_response(current_session).await?;
self.write_end_of_data(current_session, current_serial) self.write_end_of_data(current_session, current_serial, timing)
.await?; .await?;
debug!( debug!(
"RTR session replied CacheResponse+EndOfData (up-to-date) to Serial Query: client_session_id={}, client_serial={}, response_session_id={}, response_serial={}, {}", "RTR session replied CacheResponse+EndOfData (up-to-date) to Serial Query: client_session_id={}, client_serial={}, response_session_id={}, response_serial={}, state={}, negotiated_version={:?}",
client_session, client_session,
client_serial, client_serial,
current_session, current_session,
current_serial, current_serial,
self.session_summary() self.state_name(),
self.version
); );
return Ok(()); return Ok(());
} }
SerialResult::Delta(delta) => { SerialResult::Delta(delta) => {
let (current_session, current_serial) = {
let cache = self
.cache
.read()
.map_err(|_| anyhow!("cache read lock poisoned"))?;
(
cache.session_id_for_version(version),
cache.serial_for_version(version),
)
};
self.write_cache_response(current_session).await?; self.write_cache_response(current_session).await?;
self.send_delta(&delta).await?; self.send_delta(&delta).await?;
self.write_end_of_data(current_session, current_serial) self.write_end_of_data(current_session, current_serial, timing)
.await?; .await?;
debug!( debug!(
"RTR session replied delta to Serial Query: client_session_id={}, client_serial={}, response_session_id={}, response_serial={}, {}", "RTR session replied delta to Serial Query: client_session_id={}, client_serial={}, response_session_id={}, response_serial={}, state={}, negotiated_version={:?}",
client_session, client_session,
client_serial, client_serial,
current_session, current_session,
current_serial, current_serial,
self.session_summary() self.state_name(),
self.version
); );
} }
} }
@ -753,18 +833,16 @@ where
if let Some(last) = self.last_notify_at { if let Some(last) = self.last_notify_at {
if now.duration_since(last) < NOTIFY_MIN_INTERVAL { if now.duration_since(last) < NOTIFY_MIN_INTERVAL {
debug!( debug!(
"RTR session notify skipped due to rate limit: {}", "RTR session notify skipped due to rate limit: state={}, negotiated_version={:?}",
self.session_summary() self.state_name(),
self.version
); );
return Ok(()); return Ok(());
} }
} }
let (session_id, serial) = { let (session_id, serial) = {
let cache = self let cache = self.cache.load_full();
.cache
.read()
.map_err(|_| anyhow!("cache read lock poisoned"))?;
( (
cache.session_id_for_version(version), cache.session_id_for_version(version),
cache.serial_for_version(version), cache.serial_for_version(version),
@ -772,56 +850,50 @@ where
}; };
debug!( debug!(
"RTR session sending SerialNotify: notify_session_id={}, notify_serial={}, {}", "RTR session sending SerialNotify: notify_session_id={}, notify_serial={}, state={}, negotiated_version={:?}",
session_id, session_id,
serial, serial,
self.session_summary() self.state_name(),
self.version
); );
if let Err(err) = SerialNotify::new(version, session_id, serial) if let Err(err) = SerialNotify::new(version, session_id, serial)
.write(&mut self.stream) .write(&mut self.stream)
.await .await
{ {
error!( error_rate_limited("session_serial_notify_send_failed", || {
"RTR session failed to send SerialNotify: err={}, notify_session_id={}, notify_serial={}, {}", format!(
"RTR session failed to send SerialNotify: err={}, notify_session_id={}, notify_serial={}, state={}, negotiated_version={:?}",
err, err,
session_id, session_id,
serial, serial,
self.session_summary() self.state_name(),
); self.version
)
});
return Err(err.into()); return Err(err.into());
} }
self.last_notify_at = Some(now); self.last_notify_at = Some(now);
info!( info!(
"RTR session sent SerialNotify: notify_session_id={}, notify_serial={}, {}", "RTR session sent SerialNotify: notify_session_id={}, notify_serial={}, state={}, negotiated_version={:?}",
session_id, session_id,
serial, serial,
self.session_summary() self.state_name(),
self.version
); );
Ok(()) Ok(())
} }
fn session_summary(&self) -> String { fn session_summary(&self) -> String {
let serial = self let (session_id, serial) = if let Some(version) = self.version {
.cache let cache = self.cache.load_full();
.read() (
.ok() cache.session_id_for_version(version).to_string(),
.and_then(|cache| { cache.serial_for_version(version).to_string(),
self.version )
.map(|version| cache.serial_for_version(version)) } else {
}) ("<unknown>".to_string(), "<unavailable>".to_string())
.map(|serial| serial.to_string()) };
.unwrap_or_else(|| "<unavailable>".to_string());
let session_id = self
.version
.and_then(|version| {
self.cache
.read()
.ok()
.map(|cache| cache.session_id_for_version(version))
})
.map(|id| id.to_string())
.unwrap_or_else(|| "<unknown>".to_string());
format!( format!(
"state={}, negotiated_version={:?}, cache_session_id={}, cache_serial={}", "state={}, negotiated_version={:?}, cache_session_id={}, cache_serial={}",
self.state_name(), self.state_name(),
@ -849,12 +921,7 @@ where
return timeout; return timeout;
} }
let retry = self let retry = self.cache.load_full().timing().retry();
.cache
.read()
.ok()
.map(|cache| cache.timing().retry())
.unwrap_or_else(|| Duration::from_secs(600));
retry.checked_mul(3).unwrap_or(retry) retry.checked_mul(3).unwrap_or(retry)
} }
@ -878,15 +945,8 @@ where
Ok(()) Ok(())
} }
async fn write_end_of_data(&mut self, session_id: u16, serial: u32) -> Result<()> { async fn write_end_of_data(&mut self, session_id: u16, serial: u32, timing: Timing) -> Result<()> {
let version = self.version()?; let version = self.version()?;
let timing = {
let cache = self
.cache
.read()
.map_err(|_| anyhow!("cache read lock poisoned"))?;
cache.timing()
};
let end = EndOfData::new(version, session_id, serial, timing)?; let end = EndOfData::new(version, session_id, serial, timing)?;
debug!( debug!(
@ -932,7 +992,7 @@ where
} }
async fn send_delta(&mut self, delta: &Delta) -> Result<()> { async fn send_delta(&mut self, delta: &Delta) -> Result<()> {
let updates = delta.payloads_for_rtr(); let updates = delta.payload_updates_for_rtr();
// draft-ietf-sidrops-8210bis-25 Section 11.4 / 12 define Ordering Error // draft-ietf-sidrops-8210bis-25 Section 11.4 / 12 define Ordering Error
// for the party receiving out-of-order PDUs. A validator failure here // for the party receiving out-of-order PDUs. A validator failure here
// means we are about to send an invalid sequence, so abort locally // means we are about to send an invalid sequence, so abort locally
@ -953,7 +1013,7 @@ where
aspas aspas
); );
for (announce, payload) in updates { for (announce, payload) in updates {
self.send_payload(&payload, announce).await?; self.send_payload(payload, *announce).await?;
} }
Ok(()) Ok(())
} }
@ -1065,26 +1125,30 @@ where
text: &[u8], text: &[u8],
) -> io::Result<()> { ) -> io::Result<()> {
let text_preview = String::from_utf8_lossy(text); let text_preview = String::from_utf8_lossy(text);
warn!( warn_rate_limited("session_error_report_sent", || {
format!(
"RTR session sending ErrorReport: version={}, error_code={}({}), offending_pdu_len={}, text={}", "RTR session sending ErrorReport: version={}, error_code={}({}), offending_pdu_len={}, text={}",
version, version,
code.as_u16(), code.as_u16(),
code.description(), code.description(),
offending_pdu.len(), offending_pdu.len(),
text_preview text_preview
); )
});
ErrorReport::new(version, code.as_u16(), offending_pdu, text) ErrorReport::new(version, code.as_u16(), offending_pdu, text)
.write(&mut self.stream) .write(&mut self.stream)
.await .await
} }
async fn handle_pdu_read_error(&mut self, header: Header, err: io::Error) -> Result<()> { async fn handle_pdu_read_error(&mut self, header: Header, err: io::Error) -> Result<()> {
warn!( warn_rate_limited("session_established_pdu_read_failed", || {
format!(
"RTR session failed to read established-session PDU payload: pdu={}, version={}, err={}", "RTR session failed to read established-session PDU payload: pdu={}, version={}, err={}",
header.pdu(), header.pdu(),
header.version(), header.version(),
err err
); )
});
if err.kind() == io::ErrorKind::InvalidData { if err.kind() == io::ErrorKind::InvalidData {
let offending = self.read_full_pdu_bytes(header).await?; let offending = self.read_full_pdu_bytes(header).await?;
let version = self.version()?; let version = self.version()?;
@ -1099,12 +1163,14 @@ where
} }
async fn handle_first_pdu_read_error(&mut self, header: Header, err: io::Error) -> Result<()> { async fn handle_first_pdu_read_error(&mut self, header: Header, err: io::Error) -> Result<()> {
warn!( warn_rate_limited("session_first_pdu_read_failed", || {
format!(
"RTR session failed to read first PDU payload: pdu={}, version={}, err={}", "RTR session failed to read first PDU payload: pdu={}, version={}, err={}",
header.pdu(), header.pdu(),
header.version(), header.version(),
err err
); )
});
if err.kind() == io::ErrorKind::InvalidData { if err.kind() == io::ErrorKind::InvalidData {
let offending = self.read_full_pdu_bytes(header).await?; let offending = self.read_full_pdu_bytes(header).await?;
let err_version = let err_version =
@ -1129,10 +1195,12 @@ where
raw_header: [u8; HEADER_LEN], raw_header: [u8; HEADER_LEN],
err: io::Error, err: io::Error,
) -> Result<()> { ) -> Result<()> {
warn!( warn_rate_limited("session_invalid_header_bytes", || {
format!(
"RTR session handling invalid header bytes: raw_header={:02X?}, err={}", "RTR session handling invalid header bytes: raw_header={:02X?}, err={}",
raw_header, err raw_header, err
); )
});
if err.kind() == io::ErrorKind::InvalidData { if err.kind() == io::ErrorKind::InvalidData {
let version = match self.version { let version = match self.version {
Some(version) => version, Some(version) => version,
@ -1158,12 +1226,14 @@ where
let version = self.version.unwrap_or(SUPPORTED_MAX_VERSION); let version = self.version.unwrap_or(SUPPORTED_MAX_VERSION);
let timeout = self.transport_timeout(); let timeout = self.transport_timeout();
let detail = format!("transport stalled for longer than {:?}", timeout); let detail = format!("transport stalled for longer than {:?}", timeout);
warn!( warn_rate_limited("session_transport_timeout", || {
format!(
"RTR session transport timeout: version={}, offending_pdu_len={}, timeout={:?}", "RTR session transport timeout: version={}, offending_pdu_len={}, timeout={:?}",
version, version,
offending_pdu.len(), offending_pdu.len(),
timeout timeout
); )
});
let _ = self let _ = self
.send_error( .send_error(
@ -1187,16 +1257,39 @@ where
)); ));
}; };
let mut bytes = Vec::with_capacity(total_len); let capture_len = total_len.min(MAX_OFFENDING_PDU_CAPTURE);
let capture_payload_len = capture_len.saturating_sub(HEADER_LEN);
let mut bytes = Vec::with_capacity(capture_len);
bytes.extend_from_slice(header.as_ref()); bytes.extend_from_slice(header.as_ref());
bytes.resize(total_len, 0); bytes.resize(capture_len, 0);
if capture_payload_len > 0 {
timeout( timeout(
self.transport_timeout(), self.transport_timeout(),
self.stream self.stream
.read_exact(&mut bytes[HEADER_LEN..HEADER_LEN + payload_len]), .read_exact(&mut bytes[HEADER_LEN..HEADER_LEN + capture_payload_len]),
) )
.await .await
.map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "transport read timed out"))??; .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "transport read timed out"))??;
}
let mut remaining = payload_len.saturating_sub(capture_payload_len);
let mut discard_buf = [0u8; 1024];
while remaining > 0 {
let read_len = remaining.min(discard_buf.len());
let read = timeout(
self.transport_timeout(),
self.stream.read(&mut discard_buf[..read_len]),
)
.await
.map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "transport read timed out"))??;
if read == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected EOF while draining oversized PDU payload",
));
}
remaining -= read;
}
Ok(bytes) Ok(bytes)
} }
} }