diff --git a/certs/bird-rtr-client-rsa.pem b/certs/bird-rtr-client-rsa.pem new file mode 100644 index 0000000..c190cd3 --- /dev/null +++ b/certs/bird-rtr-client-rsa.pem @@ -0,0 +1,39 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIG4wIBAAKCAYEAvA91otbecwZFtDiZcV2m/A8RP9C+r1DmaDi/1tBaqleNRt3W +UCdMPEF5NGCBWq/52tWwL3rBXEEp0gcEHYoTZ5djAlAjLaQphbFGQU/h3WWDEVOl +GFnGCc4DP26SRp/oC01QpOlgT9Z/m2xkMZGhZo9oR2v7+ARmF9Qrxlj9HjuDWmZI +Onl5DGXmbI0sBOs1PMtOvmU2aJNKVU3hD335Z0J9c5NubuQQtS/vl0XJQYgbCMh9 +4BlbEBppJ6LyzcILbUAfayl2U9sQGErf9CjkwOuITpRBO+f10BQSpGjznRYCX4Yf +71lhH1ri6lOKhupOW1wgC6kkQTcvI9hub1ePLtmOiWUCfnrc2eZljNssqKWYC9+5 +v9Ya6+aikSlOheUoMJP1C50PTMnMqRfNGLn3yFKTT/CaQBw1TMO/+Rfsb80Oj9O3 +yF9zCHTFrUsYn9T2tupRi/aYWfLe3ckqTBLLhSWLfiY9S49U/fdP5Sk8l4JsAyxE +9oH6RokJIrZxR09pAgMBAAECggGABSIjOvtEWLLwjFC+3YGUg+JcZ+0d22FpB1TE +kHZLrJqAXybB3sUSghLjyP/OYB2YGMwG8mYr04XC1g++IhlJWiVKJ6oXf7uGAkx6 +Z2DTwJbwsDR5N/QxYUX36uTCMOkqAhmi7HwjVnfmL/r83tPZFO3ase0QacQi+PO1 +cIVa1BcEnknW9GOpn81DRYNyqBK3N2OG+3r99HZQITUjl32VOnyYjzhUcMn1wSBf +w6BnyTZleISn159m5TEMsu8yKKwrLDNe5pyXjLk1HpU8fDo/CoDq+i4KJ2V2Ns4j ++zg2EMqW1EQNI/BYDtOYlL+m5dGEST50Oqq7OiopvchKjIkvgFr+08vlyCyjbryv +DF5B6VEkk1iEacDKK/NyHdjXPmGBvlEeBixOOpS5dBb6m0KPx9C9jTi/tTRBnH4t +7qAko/LoYfBgjyRqDyqeUKrvVp/3NOmkQefaHttr9abhotjmtGqAvJfN1VfFY5e1 +s6Ga7yJno5tyS7RArw3OA6t2n2S1AoHBAN8mMuuZQL5xNY2bpn5O9u19QaoG3rRR +Ma6nEO2QPxM+hB8QR2fYAZwgHr38B+keodf/Xe02Xkf+urd2t+eHpoMbzZeEopaM +hNDEo1mYaIl0lgQLy1RzpvfV0B+gy808FOut6YDESyJ4sxGuP3E7CIn+QfFh9Uht +6I/ZyDfM87Q2LlP8ES/EecHfVO97l1gOJ5T9j1bnTTVgM4zJqAWiqlLyUcS56sos +7NPZbYF81ViR4BI/3rthtoUUY8hoSmfXFQKBwQDXvt8oC3Inmr729nrsZCb72fbG +8T9DieUW8MCAwvv4c1DEA2aATgLpidH+NJJyCsR7Ph05AvcSpiUgyF25TeTliBZu +YUaUDE8JuTFTEcjkqpPj62MX8C8J3DjN0TG0fI1ltousef+Ind64Fdh2kPKl2YfE +F/95pc+/y0VbXl8rcQyYCQuo+4ooBIbq1BhprzKC6Tdg2HsvCYrz3EzOJHHD/iLZ +jlHjXAeKo0MjVyDRDy3ollxDg7a1ESGwewVGrAUCgcEAkyeLmNY9XdkmKbYlO1Iz +gDWDoe5Z2qtYigZeIWtoTPaDBkEj9ZT6qsx7uWmEhfTPYbNja1TZI70VwHqnmCVa +Z0dkcrDiz0jnJQ0nc6QP++VIMG7erVh/GRyE3Paar0MZwLm1LFdF/Pt/iv9PkwoM +/YMQVW/14sen/4Tshe/AHm1Ea9nkM43rhTATwMHN8iGTdKspZXOu9K32ELfC98Rn +cb+esI6yCJVLsADIOAXsYzX/f8lixksPo/7oNnaf6o5pAoHAcxBooknge0DsAnkY +vET6Ca6JEDeQfyvnU+HZOZNEoZCXDcOmgl2Y7gdESSiuxkX53qUVHtf3ACo+eQqD ++hWSM9zt4bbd1o3uBYiQxvYgR9y1/YQIGLdPzxl17kdZFCKtowbF3Zo/cBSKuXFl +Pm82CsBGyY3nAnEtqOP3vLBLX7bueZOxdVGasJchgdLWpl77OJi7oVoz8LVNN5xS +FkuXpWlAGvntsK3fk9BZOVr5tKY83OR8lsGDy3Q0nJQwDy4lAoHAOqqDBGbV9oMZ +kBMnntl9FNE8h/UoprFEQKEAsU5Vbx7hrPkLGmgO6pWQceFPYXTdu2u8ictWCBoP +aogdoEsD59A6XHQ28OqMuf41njNc6E6kVBoFq9j9+for8EQsx5iH6Azd+rV67hxW +esLGn7H+P55t5s7/KgPOn2fZI4cHScvEJxNZMqqoUsq488by0Obf0qYmwpKh3HE3 +1wybZj9ZTTFTDwqpyrWBUF2BLf/LGlalcrbbiYIHa1L8i5Gy6u6y +-----END RSA PRIVATE KEY----- diff --git a/certs/bird-rtr-client-rsa.pem.pub b/certs/bird-rtr-client-rsa.pem.pub new file mode 100644 index 0000000..cff53f4 Binary files /dev/null and b/certs/bird-rtr-client-rsa.pem.pub differ diff --git a/certs/ssh_known_hosts b/certs/ssh_known_hosts new file mode 100644 index 0000000..2bf2602 --- /dev/null +++ b/certs/ssh_known_hosts @@ -0,0 +1,2 @@ +host.docker.internal ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDTymirIP0Z4Hth01pumXpwV9/fIe7v8Tq6y9b38FjTkeNhUNZJQIG0ssj7VnbDTT/wG44oQJnwTFJ8xbIwdYNBZzbBcvxlhk9CPOo4JZc2w230NcyKfsMZLcuDgMKyyXebdwwxGRBxoqUZt04swlrK3cAEd/6Wi9ErwP1Tp4YAKxfj4XvOhJvmWE1q4isffb2zEqrkcXycqBAQKK9zjO+xQF0VfFJqTec1GYD5JznwjAF/yWK/57FZgdQKuH9jdDuzykTl7G891uSkRKTbN7uwhqtnkTergVhk/vDoxz8En0FXP5iaQhVwO2bTDAYwUvnpt/HaXzsaBlGYeNZX7q+dcLqKQfcokj7IKqGIvWtShpWBYKkexSwXpkeVk/0ajRAfdkCjSkMaerEkk9lrbNQ5oXrHEXchQ61n+gxkY2pf5a6Jcq6pMwtwvs1xP5ZBdiTXNFwdb0InxeYKXjFLKYQDuQjs5sbAnVV+L+hwhSOC0zF1g5ZoZgC4WnRpjj+e/e0= +[host.docker.internal]:22 ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDTymirIP0Z4Hth01pumXpwV9/fIe7v8Tq6y9b38FjTkeNhUNZJQIG0ssj7VnbDTT/wG44oQJnwTFJ8xbIwdYNBZzbBcvxlhk9CPOo4JZc2w230NcyKfsMZLcuDgMKyyXebdwwxGRBxoqUZt04swlrK3cAEd/6Wi9ErwP1Tp4YAKxfj4XvOhJvmWE1q4isffb2zEqrkcXycqBAQKK9zjO+xQF0VfFJqTec1GYD5JznwjAF/yWK/57FZgdQKuH9jdDuzykTl7G891uSkRKTbN7uwhqtnkTergVhk/vDoxz8En0FXP5iaQhVwO2bTDAYwUvnpt/HaXzsaBlGYeNZX7q+dcLqKQfcokj7IKqGIvWtShpWBYKkexSwXpkeVk/0ajRAfdkCjSkMaerEkk9lrbNQ5oXrHEXchQ61n+gxkY2pf5a6Jcq6pMwtwvs1xP5ZBdiTXNFwdb0InxeYKXjFLKYQDuQjs5sbAnVV+L+hwhSOC0zF1g5ZoZgC4WnRpjj+e/e0= diff --git a/data/20260324T091640Z-yyz1.ccr b/data/20260324T091640Z-yyz1.ccr deleted file mode 100644 index 7853517..0000000 Binary files a/data/20260324T091640Z-yyz1.ccr and /dev/null differ diff --git a/deploy/bird/README.md b/deploy/bird/README.md index ba4b570..b872dbc 100644 --- a/deploy/bird/README.md +++ b/deploy/bird/README.md @@ -16,8 +16,8 @@ Server defaults in this repo: - `docker-compose.yml`: one-click local TCP test client. - `docker-compose.ssh.yml`: compose override for SSH transport. -By default, the container prints periodic RPKI protocol snapshots to logs -every 30 seconds. +By default, the container uses event-driven observation and prints snapshots +only when BIRD reports RPKI-related changes. ## Docker quick start @@ -77,7 +77,10 @@ docker logs -f bird-rpki-client - `network_mode: host` expects your RTR server to be reachable at `host.docker.internal:323` from the container. - Observation is controlled by env vars: - `OBSERVE_INTERVAL` (seconds, default `30`) and `OBSERVE_PROTO`. + `OBSERVE_MODE` (`event` by default, `interval` as fallback), + `OBSERVE_DEBOUNCE_SECS` (default `1`), + `OBSERVE_INTERVAL` (seconds, used when `OBSERVE_MODE=interval`), + and `OBSERVE_PROTO`. - SSH mode mounts `../../certs` into `/config/ssh` and expects: `bird-rtr-client.pem` and `ssh_host_rsa_key.pub`. - Entrypoint auto-generates `/run/bird/known_hosts` from diff --git a/deploy/bird/docker-compose.yml b/deploy/bird/docker-compose.yml index 7f0a92b..ae8b3eb 100644 --- a/deploy/bird/docker-compose.yml +++ b/deploy/bird/docker-compose.yml @@ -15,6 +15,8 @@ services: RPKI_PORT: "323" OBSERVE_PROTO: "rpki_tcp" + OBSERVE_MODE: "event" + OBSERVE_DEBOUNCE_SECS: "1" OBSERVE_INTERVAL: "30" OBSERVE_ASPA_TABLE: "rtr_aspa" @@ -29,4 +31,4 @@ services: SHOW_ROA4: "1" SHOW_ROA6: "1" volumes: - - ./bird.conf:/config/bird.conf:ro \ No newline at end of file + - ./bird.conf:/config/bird.conf:ro diff --git a/deploy/bird/entrypoint.sh b/deploy/bird/entrypoint.sh index aa2ee0e..4bfa720 100644 --- a/deploy/bird/entrypoint.sh +++ b/deploy/bird/entrypoint.sh @@ -6,6 +6,8 @@ mkdir -p /run/bird SOCK_PATH="/run/bird/bird.ctl" PROTO="${OBSERVE_PROTO:-rpki_tcp}" INTERVAL="${OBSERVE_INTERVAL:-30}" +MODE="${OBSERVE_MODE:-event}" +DEBOUNCE_SECS="${OBSERVE_DEBOUNCE_SECS:-1}" RPKI_HOST="${RPKI_HOST:-host.docker.internal}" RPKI_PORT="${RPKI_PORT:-323}" @@ -101,10 +103,37 @@ print_first_n_objects() { ' || true } +print_snapshot() { + echo "==== $(date -u +"%Y-%m-%dT%H:%M:%SZ") RPKI snapshot ($PROTO) ====" + birdc -s "$SOCK_PATH" show protocols all "$PROTO" || true + + if [ "$SHOW_ASPA" = "1" ]; then + echo "---- ASPA table ($ASPA_TABLE, first ${ASPA_COUNT} objects) ----" + print_first_n_objects "$ASPA_TABLE" "$ASPA_COUNT" + fi + + if [ "$SHOW_ROA4" = "1" ]; then + echo "---- ROA4 table ($ROA4_TABLE, first ${ROA4_COUNT} objects) ----" + print_first_n_objects "$ROA4_TABLE" "$ROA4_COUNT" + fi + + if [ "$SHOW_ROA6" = "1" ]; then + echo "---- ROA6 table ($ROA6_TABLE, first ${ROA6_COUNT} objects) ----" + print_first_n_objects "$ROA6_TABLE" "$ROA6_COUNT" + fi +} + +is_rpki_related_event() { + line="$1" + echo "$line" | grep -Eiq "$PROTO|$ASPA_TABLE|$ROA4_TABLE|$ROA6_TABLE|rpki|rtr" +} + echo "[entrypoint] starting bird" echo "[entrypoint] config : $BIRD_CONFIG_PATH" echo "[entrypoint] observe proto : $PROTO" -echo "[entrypoint] observe interval : $INTERVAL" +echo "[entrypoint] observe mode : $MODE" +echo "[entrypoint] observe interval : $INTERVAL (used when mode=interval)" +echo "[entrypoint] debounce secs : $DEBOUNCE_SECS (used when mode=event)" echo "[entrypoint] target : $RPKI_HOST:$RPKI_PORT" echo "[entrypoint] show aspa : $SHOW_ASPA ($ASPA_TABLE, first $ASPA_COUNT objects)" echo "[entrypoint] show roa4 : $SHOW_ROA4 ($ROA4_TABLE, first $ROA4_COUNT objects)" @@ -131,26 +160,41 @@ case "$INTERVAL" in ;; esac -if [ "$INTERVAL" -gt 0 ]; then +case "$MODE" in + event) + case "$DEBOUNCE_SECS" in + ''|*[!0-9]*) + DEBOUNCE_SECS=1 + ;; + esac + + echo "[entrypoint] waiting for BIRD monitor events and only printing on RPKI-related changes" + LAST_PRINT_TS=0 + + if birdc -s "$SOCK_PATH" monitor all 2>/dev/null | while IFS= read -r line; do + [ -n "$line" ] || continue + if ! is_rpki_related_event "$line"; then + continue + fi + now_ts="$(date +%s)" + if [ $((now_ts - LAST_PRINT_TS)) -lt "$DEBOUNCE_SECS" ]; then + continue + fi + LAST_PRINT_TS="$now_ts" + echo "[monitor] $line" + print_snapshot + done; then + : + else + echo "[entrypoint] WARNING: birdc monitor failed, fallback to interval polling" + MODE="interval" + fi + ;; +esac + +if [ "$MODE" = "interval" ] && [ "$INTERVAL" -gt 0 ]; then while kill -0 "$BIRD_PID" 2>/dev/null; do - echo "==== $(date -u +"%Y-%m-%dT%H:%M:%SZ") RPKI snapshot ($PROTO) ====" - birdc -s "$SOCK_PATH" show protocols all "$PROTO" || true - - if [ "$SHOW_ASPA" = "1" ]; then - echo "---- ASPA table ($ASPA_TABLE, first ${ASPA_COUNT} objects) ----" - print_first_n_objects "$ASPA_TABLE" "$ASPA_COUNT" - fi - - if [ "$SHOW_ROA4" = "1" ]; then - echo "---- ROA4 table ($ROA4_TABLE, first ${ROA4_COUNT} objects) ----" - print_first_n_objects "$ROA4_TABLE" "$ROA4_COUNT" - fi - - if [ "$SHOW_ROA6" = "1" ]; then - echo "---- ROA6 table ($ROA6_TABLE, first ${ROA6_COUNT} objects) ----" - print_first_n_objects "$ROA6_TABLE" "$ROA6_COUNT" - fi - + print_snapshot sleep "$INTERVAL" done fi diff --git a/src/main.rs b/src/main.rs index 90ee0e2..78507fa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ use std::env; use std::net::SocketAddr; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::{Result, anyhow}; +use arc_swap::ArcSwap; use chrono::{FixedOffset, Utc}; use tokio::task::JoinHandle; use tracing::{info, warn}; @@ -308,19 +309,13 @@ fn init_shared_cache(config: &AppConfig, store: &RtrStore) -> Result { let payload_count = payloads.len(); let updated = { - let mut cache = match shared_cache.write() { - Ok(guard) => guard, - Err(_) => { - warn!("cache write lock poisoned during refresh"); - continue; - } - }; + let old_cache = shared_cache.load_full(); + let old_serial = old_cache.serial_for_version(2); + let mut next_cache = old_cache.as_ref().clone(); - let old_serial = cache.serial_for_version(2); - - match cache.update(payloads, &store) { + match next_cache.update(payloads, &store) { Ok(()) => { - let new_serial = cache.serial_for_version(2); + let new_serial = next_cache.serial_for_version(2); + shared_cache.store(std::sync::Arc::new(next_cache)); if new_serial != old_serial { info!( "RTR cache refresh applied: ccr_dir={}, payload_count={}, old_serial={}, new_serial={}", diff --git a/src/rtr/cache/core.rs b/src/rtr/cache/core.rs index e679aa0..345b86f 100644 --- a/src/rtr/cache/core.rs +++ b/src/rtr/cache/core.rs @@ -58,21 +58,25 @@ pub struct VersionState { serial: u32, snapshot: Snapshot, #[serde(skip)] + rtr_payloads: Arc>, + #[serde(skip)] deltas: VecDeque>, } impl VersionState { fn new(session_id: u16, serial: u32, snapshot: Snapshot, max_delta: u8) -> Self { + let rtr_payloads = Arc::new(snapshot.payloads_for_rtr()); Self { session_id, serial, snapshot, + rtr_payloads, deltas: VecDeque::with_capacity(max_delta as usize), } } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RtrCache { availability: CacheAvailability, versions: [VersionState; VERSION_COUNT], @@ -190,11 +194,15 @@ impl RtrCacheBuilder { std::array::from_fn(|_| VecDeque::with_capacity(max_delta as usize)) }); - let versions = std::array::from_fn(|idx| VersionState { - session_id: session_ids.as_array()[idx], - serial: serials[idx], - snapshot: snapshots[idx].clone(), - deltas: deltas[idx].clone(), + let versions = std::array::from_fn(|idx| { + let snapshot = snapshots[idx].clone(); + VersionState { + session_id: session_ids.as_array()[idx], + serial: serials[idx], + rtr_payloads: Arc::new(snapshot.payloads_for_rtr()), + snapshot, + deltas: deltas[idx].clone(), + } }); let created_at = self.created_at.unwrap_or_else(|| now.clone()); @@ -222,6 +230,7 @@ impl RtrCache { self.availability = CacheAvailability::NoDataAvailable; for version_state in &mut self.versions { version_state.snapshot = Snapshot::empty(); + version_state.rtr_payloads = Arc::new(Vec::new()); version_state.deltas.clear(); } } @@ -238,6 +247,7 @@ impl RtrCache { state.session_id = new_session_ids.get(v); state.serial = 1; state.snapshot = project_snapshot_for_version(source_snapshot, v); + state.rtr_payloads = Arc::new(state.snapshot.payloads_for_rtr()); state.deltas.clear(); } self.last_update_end = DualTime::now(); @@ -352,6 +362,7 @@ impl RtrCache { } state.snapshot = projected; + state.rtr_payloads = Arc::new(state.snapshot.payloads_for_rtr()); Self::push_delta( state, self.max_delta, @@ -427,6 +438,10 @@ impl RtrCache { self.versions[version_index(version)].snapshot.clone() } + pub fn rtr_payloads_for_version(&self, version: u8) -> Arc> { + self.versions[version_index(version)].rtr_payloads.clone() + } + pub fn serial_for_version(&self, version: u8) -> u32 { self.versions[version_index(version)].serial } diff --git a/src/rtr/cache/mod.rs b/src/rtr/cache/mod.rs index 1a38470..9affc59 100644 --- a/src/rtr/cache/mod.rs +++ b/src/rtr/cache/mod.rs @@ -9,6 +9,8 @@ pub use ordering::{ OrderingViolation, validate_payload_updates_for_rtr, validate_payloads_for_rtr, }; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; -pub type SharedRtrCache = Arc>; +use arc_swap::ArcSwap; + +pub type SharedRtrCache = Arc>; diff --git a/src/rtr/cache/model.rs b/src/rtr/cache/model.rs index 5cee060..93ded34 100644 --- a/src/rtr/cache/model.rs +++ b/src/rtr/cache/model.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeMap, BTreeSet}; +use std::sync::OnceLock; use std::time::{Duration, Instant}; use chrono::{DateTime, NaiveDateTime, Utc}; @@ -282,6 +283,8 @@ pub struct Delta { announced: Vec, withdrawn: Vec, created_at: DualTime, + #[serde(skip)] + updates_for_rtr: OnceLock>, } impl Delta { @@ -291,12 +294,16 @@ impl Delta { sort_payloads_for_rtr(&mut announced, true); sort_payloads_for_rtr(&mut withdrawn, false); + let cached_updates = build_payload_updates_for_rtr(&announced, &withdrawn); + let updates_for_rtr = OnceLock::new(); + let _ = updates_for_rtr.set(cached_updates); Delta { serial, announced, withdrawn, created_at: DualTime::now(), + updates_for_rtr, } } @@ -320,18 +327,31 @@ impl Delta { self.announced.is_empty() && self.withdrawn.is_empty() } - pub fn payloads_for_rtr(&self) -> Vec<(bool, Payload)> { - let mut updates = Vec::with_capacity(self.announced.len() + self.withdrawn.len()); - - updates.extend(self.announced.iter().cloned().map(|p| (true, p))); - updates.extend(self.withdrawn.iter().cloned().map(|p| (false, p))); - - updates.sort_by(|(a_upd, a_payload), (b_upd, b_payload)| { - compare_payload_update_for_rtr(a_payload, *a_upd, b_payload, *b_upd) - }); - - updates + pub fn payload_updates_for_rtr(&self) -> &[(bool, Payload)] { + self.updates_for_rtr + .get_or_init(|| build_payload_updates_for_rtr(&self.announced, &self.withdrawn)) + .as_slice() } + + pub fn payloads_for_rtr(&self) -> Vec<(bool, Payload)> { + self.payload_updates_for_rtr().to_vec() + } +} + +fn build_payload_updates_for_rtr( + announced: &[Payload], + withdrawn: &[Payload], +) -> Vec<(bool, Payload)> { + let mut updates = Vec::with_capacity(announced.len() + withdrawn.len()); + + updates.extend(announced.iter().cloned().map(|p| (true, p))); + updates.extend(withdrawn.iter().cloned().map(|p| (false, p))); + + updates.sort_by(|(a_upd, a_payload), (b_upd, b_payload)| { + compare_payload_update_for_rtr(a_payload, *a_upd, b_payload, *b_upd) + }); + + updates } fn dedup_payloads(payloads: &mut Vec) { diff --git a/src/rtr/session.rs b/src/rtr/session.rs index b30d796..b5208d9 100644 --- a/src/rtr/session.rs +++ b/src/rtr/session.rs @@ -1,4 +1,6 @@ +use std::collections::HashMap; use std::sync::Arc; +use std::sync::{Mutex, OnceLock}; use std::time::{Duration, Instant}; use anyhow::{Result, anyhow, bail}; @@ -14,7 +16,7 @@ use crate::rtr::cache::{ validate_payloads_for_rtr, }; use crate::rtr::error_type::ErrorCode; -use crate::rtr::payload::{Aspa, Payload, RouteOrigin, RouterKey}; +use crate::rtr::payload::{Aspa, Payload, RouteOrigin, RouterKey, Timing}; use crate::rtr::pdu::{ Aspa as AspaPdu, CacheReset, CacheResponse, EndOfData, ErrorReport, Flags, HEADER_LEN, Header, IPv4Prefix, IPv6Prefix, ResetQuery, RouterKey as RouterKeyPdu, SerialNotify, SerialQuery, @@ -28,6 +30,83 @@ const WITHDRAW_FLAG: u8 = 0; /// Per-session notify rate limit: no more than once per minute. const NOTIFY_MIN_INTERVAL: Duration = Duration::from_secs(60); +/// Limit captured offending PDU bytes in memory for error reporting. +const MAX_OFFENDING_PDU_CAPTURE: usize = 4096; +/// Log rate limiting window and burst for repeated high-frequency events. +const LOG_RATE_LIMIT_WINDOW: Duration = Duration::from_secs(30); +const LOG_RATE_LIMIT_BURST: u32 = 10; + +static SESSION_LOG_LIMITERS: OnceLock>> = OnceLock::new(); + +#[derive(Debug, Clone, Copy)] +struct LogBucket { + window_start: Instant, + emitted: u32, + suppressed: u32, +} + +fn allow_rate_limited_log(event: &'static str) -> Option { + let now = Instant::now(); + let lock = SESSION_LOG_LIMITERS.get_or_init(|| Mutex::new(HashMap::new())); + let mut map = match lock.lock() { + Ok(guard) => guard, + Err(poisoned) => poisoned.into_inner(), + }; + + let bucket = map.entry(event).or_insert_with(|| LogBucket { + window_start: now, + emitted: 0, + suppressed: 0, + }); + + if now.duration_since(bucket.window_start) >= LOG_RATE_LIMIT_WINDOW { + let suppressed = bucket.suppressed; + bucket.window_start = now; + bucket.emitted = 1; + bucket.suppressed = 0; + return Some(suppressed); + } + + if bucket.emitted < LOG_RATE_LIMIT_BURST { + bucket.emitted += 1; + let suppressed = bucket.suppressed; + bucket.suppressed = 0; + Some(suppressed) + } else { + bucket.suppressed += 1; + None + } +} + +fn warn_rate_limited(event: &'static str, build: F) +where + F: FnOnce() -> String, +{ + if let Some(suppressed) = allow_rate_limited_log(event) { + if suppressed > 0 { + warn!( + "RTR session log rate-limited: event={}, suppressed_count={}", + event, suppressed + ); + } + warn!("{}", build()); + } +} + +fn error_rate_limited(event: &'static str, build: F) +where + F: FnOnce() -> String, +{ + if let Some(suppressed) = allow_rate_limited_log(event) { + if suppressed > 0 { + warn!( + "RTR session log rate-limited: event={}, suppressed_count={}", + event, suppressed + ); + } + error!("{}", build()); + } +} #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum SessionState { @@ -126,16 +205,25 @@ where return Ok(()); } Err(err) if err.kind() == io::ErrorKind::TimedOut => { - warn!("RTR session transport timeout while waiting for header: {}", self.session_summary()); + warn_rate_limited("session_header_timeout_waiting", || { + format!( + "RTR session transport timeout while waiting for header: state={}, negotiated_version={:?}", + self.state_name(), + self.version + ) + }); self.handle_transport_timeout(&[]).await?; return Ok(()); } Err(err) => { - warn!( - "RTR session failed to read header: err={}, {}", - err, - self.session_summary() - ); + warn_rate_limited("session_header_read_failed", || { + format!( + "RTR session failed to read header: err={}, state={}, negotiated_version={:?}", + err, + self.state_name(), + self.version + ) + }); self.state = SessionState::Closed; return Ok(()); } @@ -143,11 +231,14 @@ where let header = match Header::from_raw(raw_header) { Ok(h) => h, Err(err) => { - warn!( - "RTR session received invalid header: err={}, {}", - err, - self.session_summary() - ); + warn_rate_limited("session_header_invalid", || { + format!( + "RTR session received invalid header: err={}, state={}, negotiated_version={:?}", + err, + self.state_name(), + self.version + ) + }); self.handle_header_read_error(raw_header, err).await?; return Ok(()); } @@ -244,11 +335,14 @@ where ) .await?; - warn!( - "RTR session received unsupported PDU type {}, closing session: {}", - header.pdu(), - self.session_summary() - ); + warn_rate_limited("session_unsupported_pdu", || { + format!( + "RTR session received unsupported PDU type {}, closing session: state={}, negotiated_version={:?}", + header.pdu(), + self.state_name(), + self.version + ) + }); self.state = SessionState::Closed; return Ok(()); } @@ -259,15 +353,29 @@ where if self.state == SessionState::Established && self.version.is_some() => { match notify_res { Ok(()) => { - debug!("RTR session handling cache update notify: {}", self.session_summary()); + debug!( + "RTR session handling cache update notify: state={}, negotiated_version={:?}", + self.state_name(), + self.version + ); self.handle_notify().await?; } Err(broadcast::error::RecvError::Lagged(_)) => { - warn!("RTR session lagged on notify channel, forcing notify handling: {}", self.session_summary()); + warn_rate_limited("session_notify_lagged", || { + format!( + "RTR session lagged on notify channel, forcing notify handling: state={}, negotiated_version={:?}", + self.state_name(), + self.version + ) + }); self.handle_notify().await?; } Err(broadcast::error::RecvError::Closed) => { - debug!("RTR session notify channel closed, keeping session alive: {}", self.session_summary()); + debug!( + "RTR session notify channel closed, keeping session alive: state={}, negotiated_version={:?}", + self.state_name(), + self.version + ); // keep session alive } } @@ -334,12 +442,14 @@ where return Ok(()); } - warn!( - "RTR session received unexpected protocol version in established session: established_version={}, received_version={}, pdu={}", - established, - header.version(), - header.pdu() - ); + warn_rate_limited("session_unexpected_protocol_version", || { + format!( + "RTR session received unexpected protocol version in established session: established_version={}, received_version={}, pdu={}", + established, + header.version(), + header.pdu() + ) + }); if header.pdu() != ErrorReport::PDU { let offending = self.read_full_pdu_bytes(header).await?; @@ -575,39 +685,37 @@ where self.version, offending_pdu.len() ); - let (data_available, payloads, session_id, serial) = { + let (data_available, payloads, session_id, serial, timing) = { let version = self.version()?; - let cache = self - .cache - .read() - .map_err(|_| anyhow!("cache read lock poisoned"))?; + let cache = self.cache.load_full(); let data_available = cache.is_data_available(); - let snapshot = cache.snapshot_for_version(version); - let payloads = snapshot.payloads_for_rtr(); + let payloads = cache.rtr_payloads_for_version(version); let session_id = cache.session_id_for_version(version); let serial = cache.serial_for_version(version); - (data_available, payloads, session_id, serial) + let timing = cache.timing(); + (data_available, payloads, session_id, serial, timing) }; if !data_available { self.send_no_data_available(offending_pdu, "cache data is not currently available") .await?; debug!( - "RTR session replied No Data Available to Reset Query: {}", - self.session_summary() + "RTR session replied No Data Available to Reset Query: negotiated_version={:?}", + self.version ); return Ok(()); } self.write_cache_response(session_id).await?; - self.send_payloads(&payloads, true).await?; - self.write_end_of_data(session_id, serial).await?; + self.send_payloads(payloads.as_slice(), true).await?; + self.write_end_of_data(session_id, serial, timing).await?; debug!( - "RTR session completed Reset Query: response_session_id={}, response_serial={}, payload_count={}, {}", + "RTR session completed Reset Query: response_session_id={}, response_serial={}, payload_count={}, state={}, negotiated_version={:?}", session_id, serial, payloads.len(), - self.session_summary() + self.state_name(), + self.version ); Ok(()) @@ -627,25 +735,19 @@ where client_serial, offending_pdu.len() ); - let (data_available, current_session) = { - let cache = self - .cache - .read() - .map_err(|_| anyhow!("cache read lock poisoned"))?; - ( - cache.is_data_available(), - cache.session_id_for_version(version), - ) - }; + let cache = self.cache.load_full(); + let data_available = cache.is_data_available(); + let current_session = cache.session_id_for_version(version); if !data_available { self.send_no_data_available(offending_pdu, "cache data is not currently available") .await?; debug!( - "RTR session replied No Data Available to Serial Query: client_session_id={}, client_serial={}, {}", + "RTR session replied No Data Available to Serial Query: client_session_id={}, client_serial={}, state={}, negotiated_version={:?}", client_session, client_serial, - self.session_summary() + self.state_name(), + self.version ); return Ok(()); } @@ -663,75 +765,53 @@ where ); } - let serial_result = { - let cache = self - .cache - .read() - .map_err(|_| anyhow!("cache read lock poisoned"))?; - cache.get_deltas_since_for_version(version, client_serial) - }; + let serial_result = cache.get_deltas_since_for_version(version, client_serial); + let current_session = cache.session_id_for_version(version); + let current_serial = cache.serial_for_version(version); + let timing = cache.timing(); match serial_result { SerialResult::ResetRequired => { self.write_cache_reset().await?; debug!( - "RTR session replied Cache Reset to Serial Query: client_session_id={}, client_serial={}, {}", + "RTR session replied Cache Reset to Serial Query: client_session_id={}, client_serial={}, state={}, negotiated_version={:?}", client_session, client_serial, - self.session_summary() + self.state_name(), + self.version ); return Ok(()); } SerialResult::UpToDate => { - let (current_session, current_serial) = { - let cache = self - .cache - .read() - .map_err(|_| anyhow!("cache read lock poisoned"))?; - ( - cache.session_id_for_version(version), - cache.serial_for_version(version), - ) - }; - self.write_cache_response(current_session).await?; - self.write_end_of_data(current_session, current_serial) + self.write_end_of_data(current_session, current_serial, timing) .await?; debug!( - "RTR session replied CacheResponse+EndOfData (up-to-date) to Serial Query: client_session_id={}, client_serial={}, response_session_id={}, response_serial={}, {}", + "RTR session replied CacheResponse+EndOfData (up-to-date) to Serial Query: client_session_id={}, client_serial={}, response_session_id={}, response_serial={}, state={}, negotiated_version={:?}", client_session, client_serial, current_session, current_serial, - self.session_summary() + self.state_name(), + self.version ); return Ok(()); } SerialResult::Delta(delta) => { - let (current_session, current_serial) = { - let cache = self - .cache - .read() - .map_err(|_| anyhow!("cache read lock poisoned"))?; - ( - cache.session_id_for_version(version), - cache.serial_for_version(version), - ) - }; - self.write_cache_response(current_session).await?; self.send_delta(&delta).await?; - self.write_end_of_data(current_session, current_serial) + self.write_end_of_data(current_session, current_serial, timing) .await?; debug!( - "RTR session replied delta to Serial Query: client_session_id={}, client_serial={}, response_session_id={}, response_serial={}, {}", + "RTR session replied delta to Serial Query: client_session_id={}, client_serial={}, response_session_id={}, response_serial={}, state={}, negotiated_version={:?}", client_session, client_serial, current_session, current_serial, - self.session_summary() + self.state_name(), + self.version ); } } @@ -753,18 +833,16 @@ where if let Some(last) = self.last_notify_at { if now.duration_since(last) < NOTIFY_MIN_INTERVAL { debug!( - "RTR session notify skipped due to rate limit: {}", - self.session_summary() + "RTR session notify skipped due to rate limit: state={}, negotiated_version={:?}", + self.state_name(), + self.version ); return Ok(()); } } let (session_id, serial) = { - let cache = self - .cache - .read() - .map_err(|_| anyhow!("cache read lock poisoned"))?; + let cache = self.cache.load_full(); ( cache.session_id_for_version(version), cache.serial_for_version(version), @@ -772,56 +850,50 @@ where }; debug!( - "RTR session sending SerialNotify: notify_session_id={}, notify_serial={}, {}", + "RTR session sending SerialNotify: notify_session_id={}, notify_serial={}, state={}, negotiated_version={:?}", session_id, serial, - self.session_summary() + self.state_name(), + self.version ); if let Err(err) = SerialNotify::new(version, session_id, serial) .write(&mut self.stream) .await { - error!( - "RTR session failed to send SerialNotify: err={}, notify_session_id={}, notify_serial={}, {}", - err, - session_id, - serial, - self.session_summary() - ); + error_rate_limited("session_serial_notify_send_failed", || { + format!( + "RTR session failed to send SerialNotify: err={}, notify_session_id={}, notify_serial={}, state={}, negotiated_version={:?}", + err, + session_id, + serial, + self.state_name(), + self.version + ) + }); return Err(err.into()); } self.last_notify_at = Some(now); info!( - "RTR session sent SerialNotify: notify_session_id={}, notify_serial={}, {}", + "RTR session sent SerialNotify: notify_session_id={}, notify_serial={}, state={}, negotiated_version={:?}", session_id, serial, - self.session_summary() + self.state_name(), + self.version ); Ok(()) } fn session_summary(&self) -> String { - let serial = self - .cache - .read() - .ok() - .and_then(|cache| { - self.version - .map(|version| cache.serial_for_version(version)) - }) - .map(|serial| serial.to_string()) - .unwrap_or_else(|| "".to_string()); - let session_id = self - .version - .and_then(|version| { - self.cache - .read() - .ok() - .map(|cache| cache.session_id_for_version(version)) - }) - .map(|id| id.to_string()) - .unwrap_or_else(|| "".to_string()); + let (session_id, serial) = if let Some(version) = self.version { + let cache = self.cache.load_full(); + ( + cache.session_id_for_version(version).to_string(), + cache.serial_for_version(version).to_string(), + ) + } else { + ("".to_string(), "".to_string()) + }; format!( "state={}, negotiated_version={:?}, cache_session_id={}, cache_serial={}", self.state_name(), @@ -849,12 +921,7 @@ where return timeout; } - let retry = self - .cache - .read() - .ok() - .map(|cache| cache.timing().retry()) - .unwrap_or_else(|| Duration::from_secs(600)); + let retry = self.cache.load_full().timing().retry(); retry.checked_mul(3).unwrap_or(retry) } @@ -878,15 +945,8 @@ where Ok(()) } - async fn write_end_of_data(&mut self, session_id: u16, serial: u32) -> Result<()> { + async fn write_end_of_data(&mut self, session_id: u16, serial: u32, timing: Timing) -> Result<()> { let version = self.version()?; - let timing = { - let cache = self - .cache - .read() - .map_err(|_| anyhow!("cache read lock poisoned"))?; - cache.timing() - }; let end = EndOfData::new(version, session_id, serial, timing)?; debug!( @@ -932,7 +992,7 @@ where } async fn send_delta(&mut self, delta: &Delta) -> Result<()> { - let updates = delta.payloads_for_rtr(); + let updates = delta.payload_updates_for_rtr(); // draft-ietf-sidrops-8210bis-25 Section 11.4 / 12 define Ordering Error // for the party receiving out-of-order PDUs. A validator failure here // means we are about to send an invalid sequence, so abort locally @@ -953,7 +1013,7 @@ where aspas ); for (announce, payload) in updates { - self.send_payload(&payload, announce).await?; + self.send_payload(payload, *announce).await?; } Ok(()) } @@ -1065,26 +1125,30 @@ where text: &[u8], ) -> io::Result<()> { let text_preview = String::from_utf8_lossy(text); - warn!( - "RTR session sending ErrorReport: version={}, error_code={}({}), offending_pdu_len={}, text={}", - version, - code.as_u16(), - code.description(), - offending_pdu.len(), - text_preview - ); + warn_rate_limited("session_error_report_sent", || { + format!( + "RTR session sending ErrorReport: version={}, error_code={}({}), offending_pdu_len={}, text={}", + version, + code.as_u16(), + code.description(), + offending_pdu.len(), + text_preview + ) + }); ErrorReport::new(version, code.as_u16(), offending_pdu, text) .write(&mut self.stream) .await } async fn handle_pdu_read_error(&mut self, header: Header, err: io::Error) -> Result<()> { - warn!( - "RTR session failed to read established-session PDU payload: pdu={}, version={}, err={}", - header.pdu(), - header.version(), - err - ); + warn_rate_limited("session_established_pdu_read_failed", || { + format!( + "RTR session failed to read established-session PDU payload: pdu={}, version={}, err={}", + header.pdu(), + header.version(), + err + ) + }); if err.kind() == io::ErrorKind::InvalidData { let offending = self.read_full_pdu_bytes(header).await?; let version = self.version()?; @@ -1099,12 +1163,14 @@ where } async fn handle_first_pdu_read_error(&mut self, header: Header, err: io::Error) -> Result<()> { - warn!( - "RTR session failed to read first PDU payload: pdu={}, version={}, err={}", - header.pdu(), - header.version(), - err - ); + warn_rate_limited("session_first_pdu_read_failed", || { + format!( + "RTR session failed to read first PDU payload: pdu={}, version={}, err={}", + header.pdu(), + header.version(), + err + ) + }); if err.kind() == io::ErrorKind::InvalidData { let offending = self.read_full_pdu_bytes(header).await?; let err_version = @@ -1129,10 +1195,12 @@ where raw_header: [u8; HEADER_LEN], err: io::Error, ) -> Result<()> { - warn!( - "RTR session handling invalid header bytes: raw_header={:02X?}, err={}", - raw_header, err - ); + warn_rate_limited("session_invalid_header_bytes", || { + format!( + "RTR session handling invalid header bytes: raw_header={:02X?}, err={}", + raw_header, err + ) + }); if err.kind() == io::ErrorKind::InvalidData { let version = match self.version { Some(version) => version, @@ -1158,12 +1226,14 @@ where let version = self.version.unwrap_or(SUPPORTED_MAX_VERSION); let timeout = self.transport_timeout(); let detail = format!("transport stalled for longer than {:?}", timeout); - warn!( - "RTR session transport timeout: version={}, offending_pdu_len={}, timeout={:?}", - version, - offending_pdu.len(), - timeout - ); + warn_rate_limited("session_transport_timeout", || { + format!( + "RTR session transport timeout: version={}, offending_pdu_len={}, timeout={:?}", + version, + offending_pdu.len(), + timeout + ) + }); let _ = self .send_error( @@ -1187,16 +1257,39 @@ where )); }; - let mut bytes = Vec::with_capacity(total_len); + let capture_len = total_len.min(MAX_OFFENDING_PDU_CAPTURE); + let capture_payload_len = capture_len.saturating_sub(HEADER_LEN); + let mut bytes = Vec::with_capacity(capture_len); bytes.extend_from_slice(header.as_ref()); - bytes.resize(total_len, 0); - timeout( - self.transport_timeout(), - self.stream - .read_exact(&mut bytes[HEADER_LEN..HEADER_LEN + payload_len]), - ) - .await - .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "transport read timed out"))??; + bytes.resize(capture_len, 0); + if capture_payload_len > 0 { + timeout( + self.transport_timeout(), + self.stream + .read_exact(&mut bytes[HEADER_LEN..HEADER_LEN + capture_payload_len]), + ) + .await + .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "transport read timed out"))??; + } + + let mut remaining = payload_len.saturating_sub(capture_payload_len); + let mut discard_buf = [0u8; 1024]; + while remaining > 0 { + let read_len = remaining.min(discard_buf.len()); + let read = timeout( + self.transport_timeout(), + self.stream.read(&mut discard_buf[..read_len]), + ) + .await + .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "transport read timed out"))??; + if read == 0 { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "unexpected EOF while draining oversized PDU payload", + )); + } + remaining -= read; + } Ok(bytes) } }