rpki/src/rtr/cache/store.rs
2026-03-30 10:03:43 +08:00

209 lines
6.7 KiB
Rust

use std::collections::VecDeque;
use std::sync::Arc;
use anyhow::Result;
use crate::rtr::payload::{Payload, Timing};
use crate::rtr::store::RtrStore;
use super::core::{AppliedUpdate, CacheAvailability, RtrCache, RtrCacheBuilder, SessionIds};
use super::model::Snapshot;
impl RtrCache {
    /// Builds a cache, preferring state persisted in `store` and falling back
    /// to `file_loader` when the store cannot supply a usable cache.
    ///
    /// `self` is consumed but never read; the returned cache is always built
    /// from scratch (either by the restore helper or by `RtrCacheBuilder`).
    ///
    /// On the fallback path the cache gets fresh session ids, and:
    /// * an empty snapshot yields `CacheAvailability::NoDataAvailable` with serial `0`;
    /// * a non-empty snapshot yields `CacheAvailability::Ready` with serial `1`.
    ///
    /// The freshly built state is handed to a spawned Tokio task for
    /// persistence. A persistence failure is only logged; it does not affect
    /// the returned cache.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `try_restore_from_store` or by
    /// `file_loader`.
    pub fn init(
        self,
        store: &RtrStore,
        max_delta: u8,
        prune_delta_by_snapshot_size: bool,
        timing: Timing,
        file_loader: impl Fn() -> Result<Vec<Payload>>,
    ) -> Result<Self> {
        // First choice: whatever was persisted by a previous run.
        let restored =
            try_restore_from_store(store, max_delta, prune_delta_by_snapshot_size, timing)?;
        if let Some(cache) = restored {
            tracing::info!(
                "RTR cache restored from store: availability={:?}, session_ids={:?}, serial={}, snapshot(route_origins={}, router_keys={}, aspas={})",
                cache.availability(),
                cache.session_ids(),
                cache.serial(),
                cache.snapshot().origins().len(),
                cache.snapshot().router_keys().len(),
                cache.snapshot().aspas().len()
            );
            return Ok(cache);
        }

        // Second choice: rebuild everything from the file loader.
        tracing::warn!("RTR cache store unavailable or invalid, fallback to file loader");
        let payloads = file_loader()?;
        let session_ids = SessionIds::random_distinct();
        let snapshot = Snapshot::from_payloads(payloads);

        // Availability and the starting serial are both decided by whether
        // the loader produced any data at all.
        let has_data = !snapshot.is_empty();
        let (availability, serial) = if has_data {
            (CacheAvailability::Ready, 1)
        } else {
            (CacheAvailability::NoDataAvailable, 0)
        };

        if has_data {
            tracing::info!(
                "RTR cache initialized from file loader: session_ids={:?}, serial={}",
                session_ids,
                serial
            );
        } else {
            tracing::warn!(
                "RTR cache initialized without usable data: session_ids={:?}, serial={}",
                session_ids,
                serial
            );
        }

        // The background task needs owned copies, because the originals are
        // moved into the builder below.
        let persisted_snapshot = snapshot.clone();
        let persisted_session_ids = session_ids.clone();
        let persisted_store = store.clone();
        // NOTE(review): `save_cache_state` is invoked synchronously inside the
        // async task. If it performs blocking I/O, `spawn_blocking` may be a
        // better fit — confirm against the store implementation.
        tokio::spawn(async move {
            let outcome = persisted_store.save_cache_state(
                availability,
                &persisted_snapshot,
                &persisted_session_ids,
                serial,
                None,
                None,
                true,
            );
            if let Err(e) = outcome {
                tracing::error!("persist cache state failed: {:?}", e);
            }
        });

        let cache = RtrCacheBuilder::new()
            .availability(availability)
            .session_ids(session_ids)
            .max_delta(max_delta)
            .prune_delta_by_snapshot_size(prune_delta_by_snapshot_size)
            .timing(timing)
            .serial(serial)
            .snapshot(snapshot)
            .build();
        Ok(cache)
    }

    /// Applies `new_payloads` to the cache and, when `apply_update` reports an
    /// applied update, schedules that update for persistence in `store`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `apply_update`. Persistence failures
    /// are not reported here; they are only logged by the background task.
    pub fn update(&mut self, new_payloads: Vec<Payload>, store: &RtrStore) -> Result<()> {
        let Some(applied) = self.apply_update(new_payloads)? else {
            // Nothing to persist.
            return Ok(());
        };
        spawn_store_sync(store, applied);
        Ok(())
    }
}
/// Attempts to rebuild an `RtrCache` from the state persisted in `store`.
///
/// Returns `Ok(None)` when the store cannot supply a usable cache:
/// * the snapshot, session ids or serial is missing, or
/// * a delta window is recorded but loading its deltas fails.
///
/// Missing availability metadata is not fatal and defaults to
/// `CacheAvailability::Ready`. When the stored availability is
/// `NoDataAvailable`, or no delta window is recorded, the cache is restored
/// with an empty delta queue.
///
/// # Errors
///
/// Propagates errors from reading the snapshot, session ids, serial,
/// availability and delta window. Only a failure of `load_delta_window` is
/// downgraded to `Ok(None)`.
fn try_restore_from_store(
    store: &RtrStore,
    max_delta: u8,
    prune_delta_by_snapshot_size: bool,
    timing: Timing,
) -> Result<Option<RtrCache>> {
    // Read every piece up front, so a store error surfaces before any
    // completeness decision is made.
    let stored_snapshot = store.get_snapshot()?;
    let stored_session_ids = store.get_session_ids()?;
    let stored_serial = store.get_serial()?;
    let stored_availability = store.get_availability()?;

    let (Some(snapshot), Some(session_ids), Some(serial)) =
        (stored_snapshot, stored_session_ids, stored_serial)
    else {
        tracing::warn!("RTR cache store incomplete: snapshot/session_ids/serial missing");
        return Ok(None);
    };

    let availability = match stored_availability {
        Some(value) => value,
        None => {
            tracing::warn!("RTR cache store missing availability metadata, defaulting to Ready");
            CacheAvailability::Ready
        }
    };

    let deltas: VecDeque<_> = if availability == CacheAvailability::NoDataAvailable {
        tracing::warn!("RTR cache store restored in no-data-available state");
        VecDeque::with_capacity(usize::from(max_delta))
    } else if let Some((min_serial, max_serial)) = store.get_delta_window()? {
        let loaded = match store.load_delta_window(min_serial, max_serial) {
            Ok(loaded) => loaded,
            Err(err) => {
                // A recorded window that cannot be loaded makes the whole
                // store untrustworthy; the caller falls back to another source.
                tracing::warn!(
                    "RTR cache store delta recovery failed, treat store as unusable: {:?}",
                    err
                );
                return Ok(None);
            }
        };
        loaded.into_iter().map(Arc::new).collect()
    } else {
        tracing::info!("RTR cache store has no delta window, restore snapshot only");
        VecDeque::with_capacity(usize::from(max_delta))
    };

    let cache = RtrCacheBuilder::new()
        .availability(availability)
        .session_ids(session_ids)
        .max_delta(max_delta)
        .prune_delta_by_snapshot_size(prune_delta_by_snapshot_size)
        .timing(timing)
        .serial(serial)
        .snapshot(snapshot)
        .deltas(deltas)
        .build();
    Ok(Some(cache))
}
/// Persists an applied cache update to `store` from a spawned Tokio task.
///
/// The store handle is cloned and `update` is moved into the task, so the
/// caller does not wait for the write. The outcome is only logged: an error
/// is reported at `error` level, a success at `debug` level.
///
/// NOTE(review): `save_cache_state` is called synchronously inside the async
/// task. If it performs blocking I/O, `spawn_blocking` may be a better fit —
/// confirm against the store implementation.
fn spawn_store_sync(store: &RtrStore, update: AppliedUpdate) {
    let store = store.clone();
    tokio::spawn(async move {
        let AppliedUpdate {
            availability,
            snapshot,
            serial,
            session_ids,
            delta,
            delta_window,
            clear_delta_window,
        } = update;

        tracing::debug!(
            "persisting RTR cache state: availability={:?}, serial={}, session_ids={:?}, delta_present={}, delta_window={:?}, clear_delta_window={}, snapshot(route_origins={}, router_keys={}, aspas={})",
            availability,
            serial,
            session_ids,
            delta.is_some(),
            delta_window,
            clear_delta_window,
            snapshot.origins().len(),
            snapshot.router_keys().len(),
            snapshot.aspas().len()
        );

        let outcome = store.save_cache_state(
            availability,
            &snapshot,
            &session_ids,
            serial,
            delta.as_deref(),
            delta_window,
            clear_delta_window,
        );
        match outcome {
            Ok(_) => {
                tracing::debug!("persist RTR cache state completed: serial={}", serial);
            }
            Err(e) => {
                tracing::error!("persist cache state failed: {:?}", e);
            }
        }
    });
}