rpki/tests/test_session.rs
2026-03-25 10:08:40 +08:00

1783 lines
61 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

mod common;
use std::collections::VecDeque;
use std::fs::File;
use std::io::BufReader;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use rustls::{ClientConfig, RootCertStore};
use rustls_pki_types::{CertificateDer, PrivateKeyDer, ServerName};
use serde_json::json;
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, watch};
use tokio::task::JoinHandle;
use tokio::time::{timeout, Duration};
use tokio_rustls::{TlsAcceptor, TlsConnector};
use common::test_helper::{
dump_cache_reset, dump_cache_response, dump_eod_v1, dump_ipv4_prefix, dump_ipv6_prefix,
RtrDebugDumper,
};
use rpki::data_model::resources::ip_resources::{IPAddress, IPAddressPrefix};
use rpki::data_model::resources::as_resources::Asn;
use rpki::rtr::cache::{Delta, RtrCacheBuilder, SessionIds, SharedRtrCache, Snapshot};
use rpki::rtr::error_type::ErrorCode;
use rpki::rtr::payload::{Aspa, Payload, RouteOrigin, RouterKey, Ski, Timing};
use rpki::rtr::pdu::{
Aspa as AspaPdu, CacheReset, CacheResponse, EndOfDataV1, ErrorReport, Header, IPv4Prefix,
IPv6Prefix, ResetQuery, RouterKey as RouterKeyPdu, SerialNotify, SerialQuery,
};
use rpki::rtr::server::connection::handle_tls_connection;
use rpki::rtr::server::tls::load_rustls_server_config_with_options;
use rpki::rtr::session::RtrSession;
/// Wrap a plain `RtrCache` in the `Arc<RwLock<_>>` handle the server code expects.
fn shared_cache(cache: rpki::rtr::cache::RtrCache) -> SharedRtrCache {
    let locked = RwLock::new(cache);
    Arc::new(locked)
}
/// Spawn a one-shot RTR session server on an ephemeral localhost port.
///
/// Returns the bound address, the notify broadcast sender, the shutdown
/// watch sender, and the join handle of the spawned task. The task accepts
/// a single connection, runs one `RtrSession` over it, and then exits.
async fn start_session_server(
    cache: SharedRtrCache,
) -> (
    SocketAddr,
    broadcast::Sender<()>,
    watch::Sender<bool>,
    JoinHandle<()>,
) {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound_addr = listener.local_addr().unwrap();
    let (notify_tx, notify_rx) = broadcast::channel(16);
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let task = tokio::spawn(async move {
        // Accept exactly one connection; an accept error just ends the task.
        match listener.accept().await {
            Ok((stream, _)) => {
                let session = RtrSession::new(cache, stream, notify_rx, shutdown_rx);
                let _ = session.run().await;
            }
            Err(_) => {}
        }
    });
    (bound_addr, notify_tx, shutdown_tx, task)
}
/// Like `start_session_server`, but configures the session's transport
/// timeout before running it.
async fn start_session_server_with_transport_timeout(
    cache: SharedRtrCache,
    transport_timeout: Duration,
) -> (
    SocketAddr,
    broadcast::Sender<()>,
    watch::Sender<bool>,
    JoinHandle<()>,
) {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound_addr = listener.local_addr().unwrap();
    let (notify_tx, notify_rx) = broadcast::channel(16);
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let task = tokio::spawn(async move {
        // Serve a single connection; accept failure simply ends the task.
        let accepted = listener.accept().await;
        if let Ok((stream, _)) = accepted {
            let session = RtrSession::new(cache, stream, notify_rx, shutdown_rx)
                .with_transport_timeout(transport_timeout);
            let _ = session.run().await;
        }
    });
    (bound_addr, notify_tx, shutdown_tx, task)
}
/// Spawn a one-shot session server whose task returns the session result,
/// so tests can assert on how `RtrSession::run` terminated. The notify
/// sender is dropped when this function returns.
async fn start_session_server_returning_result(
    cache: SharedRtrCache,
) -> (
    SocketAddr,
    watch::Sender<bool>,
    JoinHandle<anyhow::Result<()>>,
) {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound_addr = listener.local_addr().unwrap();
    let (_notify_tx, notify_rx) = broadcast::channel(16);
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let task = tokio::spawn(async move {
        let (stream, _peer) = listener.accept().await.unwrap();
        RtrSession::new(cache, stream, notify_rx, shutdown_rx)
            .run()
            .await
    });
    (bound_addr, shutdown_tx, task)
}
/// Spawn a TLS session server using the default server certificate pair
/// (`server.crt` / `server.key`).
async fn start_tls_session_server(
    cache: SharedRtrCache,
) -> (
    SocketAddr,
    watch::Sender<bool>,
    JoinHandle<()>,
) {
    let (cert, key) = ("server.crt", "server.key");
    start_tls_session_server_with_cert(cache, cert, key).await
}
/// Spawn a one-shot TLS RTR session server on an ephemeral localhost port,
/// using the named certificate/key fixtures and `client-ca.crt` as the
/// client-certificate trust root.
async fn start_tls_session_server_with_cert(
    cache: SharedRtrCache,
    cert_name: &str,
    key_name: &str,
) -> (
    SocketAddr,
    watch::Sender<bool>,
    JoinHandle<()>,
) {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    let (_notify_tx, notify_rx) = broadcast::channel(16);
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    // NOTE(review): the trailing `false` option's meaning is defined by
    // `load_rustls_server_config_with_options` and is not visible here —
    // confirm against the server TLS module before relying on it.
    let tls_config = Arc::new(
        load_rustls_server_config_with_options(
            fixture_path(cert_name),
            fixture_path(key_name),
            fixture_path("client-ca.crt"),
            false,
        )
        .unwrap(),
    );
    let acceptor = TlsAcceptor::from(tls_config);
    let handle = tokio::spawn(async move {
        // Accept exactly one connection and hand it to the TLS connection
        // handler; accept or handler errors simply end the task.
        let Ok((stream, peer_addr)) = listener.accept().await else {
            return;
        };
        let _ = handle_tls_connection(cache, stream, peer_addr, acceptor, notify_rx, shutdown_rx).await;
    });
    (addr, shutdown_tx, handle)
}
/// Convenience wrapper around `shutdown_io` for plain-TCP test clients.
async fn shutdown_server(
    client: TcpStream,
    shutdown_tx: watch::Sender<bool>,
    server_handle: JoinHandle<()>,
) {
    let mut io = client;
    shutdown_io(&mut io, shutdown_tx, server_handle).await;
}
/// Cleanly tear down a test: shut down the client's write half, signal the
/// shutdown watch channel, then wait up to one second for the server task
/// to exit, panicking if it does not. Join errors are ignored.
async fn shutdown_io<S>(
    io: &mut S,
    shutdown_tx: watch::Sender<bool>,
    server_handle: JoinHandle<()>,
) where
    S: AsyncWrite + Unpin,
{
    let _ = io.shutdown().await;
    let _ = shutdown_tx.send(true);
    if timeout(Duration::from_secs(1), server_handle).await.is_err() {
        panic!("server task did not exit within timeout");
    }
}
fn fixture_path(name: &str) -> PathBuf {
Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join("fixtures")
.join("tls")
.join(name)
}
/// Read every certificate from a PEM file, panicking on I/O or parse errors.
fn load_pem_certs(path: &Path) -> Vec<CertificateDer<'static>> {
    let mut reader = BufReader::new(File::open(path).unwrap());
    let parsed: Result<Vec<_>, _> = rustls_pemfile::certs(&mut reader).collect();
    parsed.unwrap()
}
/// Read the first private key from a PEM file, panicking if the file cannot
/// be opened, cannot be parsed, or contains no key.
fn load_pem_key(path: &Path) -> PrivateKeyDer<'static> {
    let mut reader = BufReader::new(File::open(path).unwrap());
    rustls_pemfile::private_key(&mut reader)
        .unwrap()
        .expect("missing PEM private key")
}
/// Connect a mutually-authenticated TLS client, using the server's IP
/// address as the expected TLS server name.
async fn connect_tls_client(
    addr: SocketAddr,
    cert_name: &str,
    key_name: &str,
) -> tokio_rustls::client::TlsStream<TcpStream> {
    let server_name = ServerName::IpAddress(addr.ip().into());
    connect_tls_client_with_server_name(addr, cert_name, key_name, server_name).await
}
/// Connect a TLS client that presents the named certificate/key fixture
/// pair (mutual TLS) and verifies the server under `server_name`.
async fn connect_tls_client_with_server_name(
    addr: SocketAddr,
    cert_name: &str,
    key_name: &str,
    server_name: ServerName<'static>,
) -> tokio_rustls::client::TlsStream<TcpStream> {
    // Trust anchors for verifying the server certificate. Note the client
    // reuses `client-ca.crt` as its server trust root here.
    let mut roots = RootCertStore::empty();
    for cert in load_pem_certs(&fixture_path("client-ca.crt")) {
        roots.add(cert).unwrap();
    }
    // Client certificate chain and private key presented to the server.
    let certs = load_pem_certs(&fixture_path(cert_name));
    let key = load_pem_key(&fixture_path(key_name));
    let client_config = ClientConfig::builder()
        .with_root_certificates(roots)
        .with_client_auth_cert(certs, key)
        .unwrap();
    let connector = TlsConnector::from(Arc::new(client_config));
    let tcp = TcpStream::connect(addr).await.unwrap();
    connector.connect(server_name, tcp).await.unwrap()
}
/// Dump a Serial Notify PDU as JSON, keeping the output style consistent
/// with the other `dump_*` helpers from `common::test_helper`.
fn dump_serial_notify(notify: &SerialNotify) -> serde_json::Value {
    json!({
        "version": notify.version(),
        "pdu": notify.pdu(),
        "pdu_name": "Serial Notify",
        "session_id": notify.session_id(),
        "serial_number": notify.serial_number(),
    })
}
/// Assert that an Error Report PDU carries the expected protocol version,
/// error code, and an embedded copy of the offending PDU bytes.
fn assert_error_report_matches(
    report: &ErrorReport,
    version: u8,
    code: ErrorCode,
    offending_pdu: &[u8],
) {
    assert_eq!(report.version(), version);
    assert_eq!(report.error_code(), Ok(code));
    assert_eq!(report.erroneous_pdu(), offending_pdu);
}
/// Test: a Reset Query returns the complete snapshot and the response is
/// terminated by an End of Data PDU.
#[tokio::test]
async fn reset_query_returns_snapshot_and_end_of_data() {
    // Cache content: one announced IPv4 route origin, serial 100, session 42.
    let prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(192, 0, 2, 0)),
        prefix_length: 24,
    };
    let origin = RouteOrigin::new(prefix, 24, 64496u32.into());
    let snapshot = Snapshot::from_payloads(vec![Payload::RouteOrigin(origin)]);
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .snapshot(snapshot)
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // Version-1 Reset Query: expect Cache Response (PDU 3), the snapshot
    // payload, then End of Data (PDU 7).
    ResetQuery::new(1).write(&mut client).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    assert_eq!(response.pdu(), 3);
    assert_eq!(response.version(), 1);
    assert_eq!(response.session_id(), 42);
    // The single announced IPv4 prefix from the snapshot (shadows `prefix`).
    let prefix = IPv4Prefix::read(&mut client).await.unwrap();
    dump.push_value(prefix.pdu(), dump_ipv4_prefix(&prefix));
    assert_eq!(prefix.pdu(), 4);
    assert_eq!(prefix.version(), 1);
    assert!(prefix.flag().is_announce());
    assert_eq!(prefix.prefix_len(), 24);
    assert_eq!(prefix.max_len(), 24);
    assert_eq!(prefix.prefix(), Ipv4Addr::new(192, 0, 2, 0));
    assert_eq!(prefix.asn(), 64496u32.into());
    // End of Data carries the cache serial and the configured timing values.
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    assert_eq!(eod.pdu(), 7);
    assert_eq!(eod.version(), 1);
    assert_eq!(eod.session_id(), 42);
    assert_eq!(eod.serial_number(), 100);
    let timing = eod.timing();
    assert_eq!(timing.refresh, 600);
    assert_eq!(timing.retry, 600);
    assert_eq!(timing.expire, 7200);
    dump.print_pretty("reset_query_returns_snapshot_and_end_of_data");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: a Reset Query negotiated at protocol version 2 is answered with the
/// session id configured for that version (here the third entry, 42, of the
/// `SessionIds` array [40, 41, 42] — presumably indexed by version; confirm
/// against `SessionIds::from_array`).
#[tokio::test]
async fn reset_query_uses_version_specific_session_id() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([40, 41, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // Negotiate protocol version 2.
    ResetQuery::new(2).write(&mut client).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    assert_eq!(response.version(), 2);
    assert_eq!(response.session_id(), 42);
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    assert_eq!(eod.version(), 2);
    assert_eq!(eod.session_id(), 42);
    assert_eq!(eod.serial_number(), 100);
    dump.print_pretty("reset_query_uses_version_specific_session_id");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: when a Serial Query's session_id and serial both match the current
/// cache state, the server replies with just an End of Data PDU.
#[tokio::test]
async fn serial_query_returns_end_of_data_when_up_to_date() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing {
            refresh: 600,
            retry: 600,
            expire: 7200,
        })
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // Client is already at the cache's serial (100) with the right session.
    SerialQuery::new(1, 42, 100).write(&mut client).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // No Cache Response, no payload — only End of Data.
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    assert_eq!(eod.pdu(), 7);
    assert_eq!(eod.version(), 1);
    assert_eq!(eod.session_id(), 42);
    assert_eq!(eod.serial_number(), 100);
    let timing = eod.timing();
    assert_eq!(timing.refresh, 600);
    assert_eq!(timing.retry, 600);
    assert_eq!(timing.expire, 7200);
    dump.print_pretty("serial_query_returns_end_of_data_when_up_to_date");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: when a Serial Query carries a session_id that does not match the
/// cache's, the server returns a CorruptData Error Report and closes the
/// connection.
#[tokio::test]
async fn serial_query_returns_corrupt_data_when_session_id_mismatch() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing {
            refresh: 600,
            retry: 600,
            expire: 7200,
        })
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // 999 does not match the cache session id 42.
    SerialQuery::new(1, 999, 100).write(&mut client).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    let report = ErrorReport::read(&mut client).await.unwrap();
    // The report must embed the offending Serial Query bytes.
    assert_error_report_matches(
        &report,
        1,
        ErrorCode::CorruptData,
        SerialQuery::new(1, 999, 100).as_ref(),
    );
    // The connection must be closed after the error report: a further
    // header read fails.
    let read_res = Header::read(&mut client).await;
    assert!(read_res.is_err());
    dump.push_value(
        0,
        json!({
            "event": "connection_closed_after_corrupt_session_id",
            "result": "header_read_failed_as_expected"
        }),
    );
    dump.print_pretty("serial_query_returns_corrupt_data_when_session_id_mismatch");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: when an incremental update is available, a Serial Query returns
/// Cache Response + the delta payload + End of Data.
#[tokio::test]
async fn serial_query_returns_deltas_when_incremental_update_available() {
    // One delta carrying serial 101 with a single announced IPv4 origin.
    let prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(192, 0, 2, 0)),
        prefix_length: 24,
    };
    let origin = RouteOrigin::new(prefix, 24, 64496u32.into());
    let delta = Arc::new(Delta::new(101, vec![Payload::RouteOrigin(origin)], vec![]));
    let mut deltas = VecDeque::new();
    deltas.push_back(delta);
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(101)
        .timing(Timing {
            refresh: 600,
            retry: 600,
            expire: 7200,
        })
        .deltas(deltas)
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // Client is at serial 100; the cache can serve the 100 -> 101 delta.
    SerialQuery::new(1, 42, 100).write(&mut client).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    assert_eq!(response.pdu(), 3);
    assert_eq!(response.version(), 1);
    assert_eq!(response.session_id(), 42);
    // The delta's announced prefix (shadows `prefix`).
    let prefix = IPv4Prefix::read(&mut client).await.unwrap();
    dump.push_value(prefix.pdu(), dump_ipv4_prefix(&prefix));
    assert_eq!(prefix.pdu(), 4);
    assert_eq!(prefix.version(), 1);
    assert!(prefix.flag().is_announce());
    assert_eq!(prefix.prefix_len(), 24);
    assert_eq!(prefix.max_len(), 24);
    assert_eq!(prefix.prefix(), Ipv4Addr::new(192, 0, 2, 0));
    assert_eq!(prefix.asn(), 64496u32.into());
    // End of Data advances the client to the new serial 101.
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    assert_eq!(eod.pdu(), 7);
    assert_eq!(eod.version(), 1);
    assert_eq!(eod.session_id(), 42);
    assert_eq!(eod.serial_number(), 101);
    let timing = eod.timing();
    assert_eq!(timing.refresh, 600);
    assert_eq!(timing.retry, 600);
    assert_eq!(timing.expire, 7200);
    dump.print_pretty("serial_query_returns_deltas_when_incremental_update_available");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: delta serving works across the u32 serial wraparound. The cache is
/// at serial 0 with deltas for serials u32::MAX and 0; a client at
/// u32::MAX - 1 receives both deltas' payloads and ends at serial 0.
#[tokio::test]
async fn serial_query_returns_deltas_across_serial_wraparound() {
    let first_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(192, 0, 2, 0)),
        prefix_length: 24,
    };
    let first_origin = RouteOrigin::new(first_prefix, 24, 64496u32.into());
    let second_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(198, 51, 100, 0)),
        prefix_length: 24,
    };
    let second_origin = RouteOrigin::new(second_prefix, 24, 64497u32.into());
    // Delta at the wrap point (serial u32::MAX) and the one just after (0).
    let d_max = Arc::new(Delta::new(
        u32::MAX,
        vec![Payload::RouteOrigin(first_origin.clone())],
        vec![],
    ));
    let d_zero = Arc::new(Delta::new(
        0,
        vec![Payload::RouteOrigin(second_origin.clone())],
        vec![],
    ));
    let mut deltas = VecDeque::new();
    deltas.push_back(d_max);
    deltas.push_back(d_zero);
    let snapshot = Snapshot::from_payloads(vec![
        Payload::RouteOrigin(first_origin),
        Payload::RouteOrigin(second_origin),
    ]);
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(0)
        .timing(Timing::new(600, 600, 7200))
        .snapshot(snapshot)
        .deltas(deltas)
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // Client serial is u32::MAX - 1, i.e. two deltas behind across the wrap.
    SerialQuery::new(1, 42, u32::MAX.wrapping_sub(1))
        .write(&mut client)
        .await
        .unwrap();
    let mut dump = RtrDebugDumper::new();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    assert_eq!(response.version(), 1);
    assert_eq!(response.session_id(), 42);
    // Per the assertions, the implementation emits the delta-0 payload
    // (198.51.100.0) before the delta-MAX payload (192.0.2.0).
    let first = IPv4Prefix::read(&mut client).await.unwrap();
    dump.push_value(first.pdu(), dump_ipv4_prefix(&first));
    assert!(first.flag().is_announce());
    assert_eq!(first.prefix(), Ipv4Addr::new(198, 51, 100, 0));
    let second = IPv4Prefix::read(&mut client).await.unwrap();
    dump.push_value(second.pdu(), dump_ipv4_prefix(&second));
    assert!(second.flag().is_announce());
    assert_eq!(second.prefix(), Ipv4Addr::new(192, 0, 2, 0));
    // Client ends at the post-wrap serial 0.
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    assert_eq!(eod.session_id(), 42);
    assert_eq!(eod.serial_number(), 0);
    dump.print_pretty("serial_query_returns_deltas_across_serial_wraparound");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: a Serial Query whose serial (0) lies ahead of the cache serial
/// (u32::MAX) under wraparound serial arithmetic is answered with a
/// Cache Reset, forcing the client to start over.
#[tokio::test]
async fn serial_query_returns_cache_reset_for_future_serial_across_wraparound() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(u32::MAX)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // Serial 0 is "in the future" relative to u32::MAX after wrap.
    SerialQuery::new(1, 42, 0).write(&mut client).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    let reset = CacheReset::read(&mut client).await.unwrap();
    dump.push_value(reset.pdu(), dump_cache_reset(reset.version(), reset.pdu()));
    assert_eq!(reset.pdu(), CacheReset::PDU);
    assert_eq!(reset.version(), 1);
    dump.print_pretty("serial_query_returns_cache_reset_for_future_serial_across_wraparound");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: the payload order of a Reset Query response follows the current
/// implementation's RTR ordering rules. Per the assertions below: IPv4
/// prefixes before IPv6, and within IPv4 the numerically higher prefix
/// (198.51.100.0) before the lower one (192.0.2.0).
#[tokio::test]
async fn reset_query_returns_payloads_in_rtr_order() {
    let v4_low_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(192, 0, 2, 0)),
        prefix_length: 24,
    };
    let v4_low_origin = RouteOrigin::new(v4_low_prefix, 24, 64496u32.into());
    let v4_high_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(198, 51, 100, 0)),
        prefix_length: 24,
    };
    let v4_high_origin = RouteOrigin::new(v4_high_prefix, 24, 64497u32.into());
    let v6_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 0)),
        prefix_length: 32,
    };
    let v6_origin = RouteOrigin::new(v6_prefix, 48, 64498u32.into());
    // Deliberately insert in a different order than the expected output
    // order to prove the server sorts.
    let snapshot = Snapshot::from_payloads(vec![
        Payload::RouteOrigin(v6_origin),
        Payload::RouteOrigin(v4_low_origin),
        Payload::RouteOrigin(v4_high_origin),
    ]);
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .snapshot(snapshot)
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    ResetQuery::new(1).write(&mut client).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    assert_eq!(response.pdu(), 3);
    assert_eq!(response.version(), 1);
    assert_eq!(response.session_id(), 42);
    // First payload: the higher IPv4 prefix.
    let first = IPv4Prefix::read(&mut client).await.unwrap();
    dump.push_value(first.pdu(), dump_ipv4_prefix(&first));
    assert_eq!(first.pdu(), 4);
    assert_eq!(first.version(), 1);
    assert!(first.flag().is_announce());
    assert_eq!(first.prefix(), Ipv4Addr::new(198, 51, 100, 0));
    assert_eq!(first.prefix_len(), 24);
    assert_eq!(first.max_len(), 24);
    assert_eq!(first.asn(), 64497u32.into());
    // Second payload: the lower IPv4 prefix.
    let second = IPv4Prefix::read(&mut client).await.unwrap();
    dump.push_value(second.pdu(), dump_ipv4_prefix(&second));
    assert_eq!(second.pdu(), 4);
    assert_eq!(second.version(), 1);
    assert!(second.flag().is_announce());
    assert_eq!(second.prefix(), Ipv4Addr::new(192, 0, 2, 0));
    assert_eq!(second.prefix_len(), 24);
    assert_eq!(second.max_len(), 24);
    assert_eq!(second.asn(), 64496u32.into());
    assert!(u32::from(first.prefix()) > u32::from(second.prefix()));
    // Third payload: the IPv6 prefix comes after all IPv4 prefixes.
    let third = IPv6Prefix::read(&mut client).await.unwrap();
    dump.push_value(third.pdu(), dump_ipv6_prefix(&third));
    assert_eq!(third.pdu(), 6);
    assert_eq!(third.version(), 1);
    assert!(third.flag().is_announce());
    assert_eq!(third.prefix(), Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 0));
    assert_eq!(third.prefix_len(), 32);
    assert_eq!(third.max_len(), 48);
    assert_eq!(third.asn(), 64498u32.into());
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    assert_eq!(eod.pdu(), 7);
    assert_eq!(eod.version(), 1);
    assert_eq!(eod.session_id(), 42);
    assert_eq!(eod.serial_number(), 100);
    let timing = eod.timing();
    assert_eq!(timing.refresh, 600);
    assert_eq!(timing.retry, 600);
    assert_eq!(timing.expire, 7200);
    dump.print_pretty("reset_query_returns_payloads_in_rtr_order");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: in a Serial Query delta response, announcements come before
/// withdrawals, and each group's internal order matches the current
/// implementation (per the assertions: announcements high-prefix-first,
/// withdrawals low-prefix-first).
#[tokio::test]
async fn serial_query_returns_announcements_before_withdrawals() {
    let announced_low_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(192, 0, 2, 0)),
        prefix_length: 24,
    };
    let announced_low_origin = RouteOrigin::new(announced_low_prefix, 24, 64496u32.into());
    let announced_high_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(198, 51, 100, 0)),
        prefix_length: 24,
    };
    let announced_high_origin = RouteOrigin::new(announced_high_prefix, 24, 64497u32.into());
    let withdrawn_low_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(10, 0, 0, 0)),
        prefix_length: 24,
    };
    let withdrawn_low_origin = RouteOrigin::new(withdrawn_low_prefix, 24, 64500u32.into());
    let withdrawn_high_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(203, 0, 113, 0)),
        prefix_length: 24,
    };
    let withdrawn_high_origin = RouteOrigin::new(withdrawn_high_prefix, 24, 64501u32.into());
    // One delta with two announcements and two withdrawals, inserted in an
    // order differing from the expected wire order to prove sorting.
    let delta = Arc::new(Delta::new(
        101,
        vec![
            Payload::RouteOrigin(announced_low_origin),
            Payload::RouteOrigin(announced_high_origin),
        ],
        vec![
            Payload::RouteOrigin(withdrawn_high_origin),
            Payload::RouteOrigin(withdrawn_low_origin),
        ],
    ));
    let mut deltas = VecDeque::new();
    deltas.push_back(delta);
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(101)
        .timing(Timing {
            refresh: 600,
            retry: 600,
            expire: 7200,
        })
        .deltas(deltas)
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    SerialQuery::new(1, 42, 100).write(&mut client).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    assert_eq!(response.pdu(), 3);
    assert_eq!(response.version(), 1);
    assert_eq!(response.session_id(), 42);
    // Announcements first: higher prefix, then lower prefix.
    let first = IPv4Prefix::read(&mut client).await.unwrap();
    dump.push_value(first.pdu(), dump_ipv4_prefix(&first));
    assert_eq!(first.pdu(), 4);
    assert_eq!(first.version(), 1);
    assert!(first.flag().is_announce());
    assert_eq!(first.prefix(), Ipv4Addr::new(198, 51, 100, 0));
    assert_eq!(first.prefix_len(), 24);
    assert_eq!(first.max_len(), 24);
    assert_eq!(first.asn(), 64497u32.into());
    let second = IPv4Prefix::read(&mut client).await.unwrap();
    dump.push_value(second.pdu(), dump_ipv4_prefix(&second));
    assert_eq!(second.pdu(), 4);
    assert_eq!(second.version(), 1);
    assert!(second.flag().is_announce());
    assert_eq!(second.prefix(), Ipv4Addr::new(192, 0, 2, 0));
    assert_eq!(second.prefix_len(), 24);
    assert_eq!(second.max_len(), 24);
    assert_eq!(second.asn(), 64496u32.into());
    assert!(u32::from(first.prefix()) > u32::from(second.prefix()));
    // Withdrawals after: lower prefix, then higher prefix.
    let third = IPv4Prefix::read(&mut client).await.unwrap();
    dump.push_value(third.pdu(), dump_ipv4_prefix(&third));
    assert_eq!(third.pdu(), 4);
    assert_eq!(third.version(), 1);
    assert!(!third.flag().is_announce());
    assert_eq!(third.prefix(), Ipv4Addr::new(10, 0, 0, 0));
    assert_eq!(third.prefix_len(), 24);
    assert_eq!(third.max_len(), 24);
    assert_eq!(third.asn(), 64500u32.into());
    let fourth = IPv4Prefix::read(&mut client).await.unwrap();
    dump.push_value(fourth.pdu(), dump_ipv4_prefix(&fourth));
    assert_eq!(fourth.pdu(), 4);
    assert_eq!(fourth.version(), 1);
    assert!(!fourth.flag().is_announce());
    assert_eq!(fourth.prefix(), Ipv4Addr::new(203, 0, 113, 0));
    assert_eq!(fourth.prefix_len(), 24);
    assert_eq!(fourth.max_len(), 24);
    assert_eq!(fourth.asn(), 64501u32.into());
    assert!(u32::from(third.prefix()) < u32::from(fourth.prefix()));
    // Grouping check: announce flags strictly precede withdraw flags.
    assert!(first.flag().is_announce());
    assert!(second.flag().is_announce());
    assert!(!third.flag().is_announce());
    assert!(!fourth.flag().is_announce());
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    assert_eq!(eod.pdu(), 7);
    assert_eq!(eod.version(), 1);
    assert_eq!(eod.session_id(), 42);
    assert_eq!(eod.serial_number(), 101);
    let timing = eod.timing();
    assert_eq!(timing.refresh, 600);
    assert_eq!(timing.retry, 600);
    assert_eq!(timing.expire, 7200);
    dump.print_pretty("serial_query_returns_announcements_before_withdrawals");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: after a session is established, a notify broadcast makes the
/// server send a Serial Notify PDU to the client.
#[tokio::test]
async fn established_session_sends_serial_notify() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    let (addr, notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // Establish the session with a full exchange first.
    ResetQuery::new(1).write(&mut client).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    assert_eq!(response.version(), 1);
    assert_eq!(response.session_id(), 42);
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    assert_eq!(eod.version(), 1);
    assert_eq!(eod.session_id(), 42);
    assert_eq!(eod.serial_number(), 100);
    // Trigger the notify broadcast; the session must emit a Serial Notify
    // carrying the negotiated version, session id, and current serial.
    notify_tx.send(()).unwrap();
    let notify = SerialNotify::read(&mut client).await.unwrap();
    dump.push_value(notify.pdu(), dump_serial_notify(&notify));
    assert_eq!(notify.pdu(), SerialNotify::PDU);
    assert_eq!(notify.version(), 1);
    assert_eq!(notify.session_id(), 42);
    assert_eq!(notify.serial_number(), 100);
    dump.print_pretty("established_session_sends_serial_notify");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: when the first PDU's protocol version is too high, the server
/// replies with an UnsupportedProtocolVersion Error Report and closes the
/// connection. Per the assertion, the report itself is sent at version 2
/// (apparently the server's highest supported version — confirm against
/// the session implementation).
#[tokio::test]
async fn first_pdu_with_too_high_version_returns_unsupported_version_error() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // Version 3 is above the supported range.
    ResetQuery::new(3).write(&mut client).await.unwrap();
    let report = ErrorReport::read(&mut client).await.unwrap();
    assert_error_report_matches(
        &report,
        2,
        ErrorCode::UnsupportedProtocolVersion,
        ResetQuery::new(3).as_ref(),
    );
    // The connection must be closed: a further header read fails.
    let read_res = Header::read(&mut client).await;
    assert!(read_res.is_err());
    dump.push_value(
        0,
        json!({
            "event": "connection_closed_after_unsupported_protocol_version",
            "result": "header_read_failed_as_expected"
        }),
    );
    dump.print_pretty("first_pdu_with_too_high_version_returns_unsupported_version_error");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: after version negotiation completes, a later request switching the
/// protocol version gets an UnexpectedProtocolVersion Error Report (at the
/// originally negotiated version) and the connection is closed.
#[tokio::test]
async fn session_rejects_version_change_after_negotiation() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // Negotiate version 1 with a full exchange.
    ResetQuery::new(1).write(&mut client).await.unwrap();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    assert_eq!(response.version(), 1);
    assert_eq!(response.session_id(), 42);
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    assert_eq!(eod.version(), 1);
    assert_eq!(eod.session_id(), 42);
    assert_eq!(eod.serial_number(), 100);
    // Now attempt to switch to version 2 on the same connection.
    ResetQuery::new(2).write(&mut client).await.unwrap();
    let report = ErrorReport::read(&mut client).await.unwrap();
    assert_error_report_matches(
        &report,
        1,
        ErrorCode::UnexpectedProtocolVersion,
        ResetQuery::new(2).as_ref(),
    );
    // The connection must be closed: a further header read fails.
    let read_res = Header::read(&mut client).await;
    assert!(read_res.is_err());
    dump.push_value(
        0,
        json!({
            "event": "connection_closed_after_unexpected_protocol_version",
            "result": "header_read_failed_as_expected"
        }),
    );
    dump.print_pretty("session_rejects_version_change_after_negotiation");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: before version negotiation completes, a notify broadcast must NOT
/// cause the server to send a Serial Notify.
#[tokio::test]
async fn notify_is_not_sent_before_version_negotiation() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    let (addr, notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // Broadcast notify without ever sending a query on the connection.
    notify_tx.send(()).unwrap();
    // Absence is checked via a short read timeout: no Serial Notify should
    // arrive within 100 ms.
    let res = timeout(Duration::from_millis(100), SerialNotify::read(&mut client)).await;
    assert!(
        res.is_err(),
        "serial notify should not be sent before version negotiation"
    );
    dump.push_value(
        0,
        json!({
            "event": "notify_before_version_negotiation",
            "result": "no_serial_notify_received_within_timeout",
            "timeout_ms": 100
        }),
    );
    dump.print_pretty("notify_is_not_sent_before_version_negotiation");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// Test: consecutive notify broadcasts within the one-minute rate-limit
/// window produce only a single Serial Notify on the same session.
#[tokio::test]
async fn serial_notify_is_rate_limited_to_once_per_minute() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    let (addr, notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // Establish the session first.
    ResetQuery::new(1).write(&mut client).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    assert_eq!(response.pdu(), 3);
    assert_eq!(response.version(), 1);
    assert_eq!(response.session_id(), 42);
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    assert_eq!(eod.pdu(), 7);
    assert_eq!(eod.version(), 1);
    assert_eq!(eod.session_id(), 42);
    assert_eq!(eod.serial_number(), 100);
    // First broadcast: a Serial Notify is delivered.
    notify_tx.send(()).unwrap();
    let first_notify = SerialNotify::read(&mut client).await.unwrap();
    dump.push_value(first_notify.pdu(), dump_serial_notify(&first_notify));
    assert_eq!(first_notify.pdu(), SerialNotify::PDU);
    assert_eq!(first_notify.version(), 1);
    assert_eq!(first_notify.session_id(), 42);
    assert_eq!(first_notify.serial_number(), 100);
    // Second broadcast right away: suppressed by the rate limit, checked
    // via a short read timeout.
    notify_tx.send(()).unwrap();
    let second_res = timeout(Duration::from_millis(100), SerialNotify::read(&mut client)).await;
    assert!(
        second_res.is_err(),
        "second serial notify should be rate-limited within one minute"
    );
    dump.push_value(
        0,
        json!({
            "event": "second_notify_within_rate_limit_window",
            "result": "no_second_serial_notify_received_within_timeout",
            "timeout_ms": 100
        }),
    );
    dump.print_pretty("serial_notify_is_rate_limited_to_once_per_minute");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// When the cache is flagged as unavailable, every Reset Query must be
/// answered with a No Data Available error report instead of a Cache
/// Response — consistently, on repeated attempts.
#[tokio::test]
async fn reset_query_returns_no_data_available_when_cache_is_unavailable() {
    let unavailable = RtrCacheBuilder::new()
        .availability(rpki::rtr::cache::CacheAvailability::NoDataAvailable)
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let (addr, _notify_tx, shutdown_tx, server_handle) =
        start_session_server(shared_cache(unavailable)).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // Two identical query/report rounds: the error must be stable per attempt.
    for _ in 0..2 {
        ResetQuery::new(1).write(&mut client).await.unwrap();
        let report = ErrorReport::read(&mut client).await.unwrap();
        assert_error_report_matches(
            &report,
            1,
            ErrorCode::NoDataAvailable,
            ResetQuery::new(1).as_ref(),
        );
    }
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// When the cache is flagged as unavailable, every Serial Query must be
/// answered with a No Data Available error report — consistently, on
/// repeated attempts.
#[tokio::test]
async fn serial_query_returns_no_data_available_when_cache_is_unavailable() {
    let unavailable = RtrCacheBuilder::new()
        .availability(rpki::rtr::cache::CacheAvailability::NoDataAvailable)
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let (addr, _notify_tx, shutdown_tx, server_handle) =
        start_session_server(shared_cache(unavailable)).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // Two identical query/report rounds: the error must be stable per attempt.
    for _ in 0..2 {
        SerialQuery::new(1, 42, 100).write(&mut client).await.unwrap();
        let report = ErrorReport::read(&mut client).await.unwrap();
        assert_error_report_matches(
            &report,
            1,
            ErrorCode::NoDataAvailable,
            SerialQuery::new(1, 42, 100).as_ref(),
        );
    }
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// A very first PDU whose header declares an invalid length must trigger a
/// Corrupt Data error report and cause the server to close the connection.
#[tokio::test]
async fn first_pdu_with_invalid_length_returns_corrupt_data() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, _shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // Header claims a total length of 9 bytes (a Reset Query is fixed-size);
    // append one filler byte so the wire bytes match the claimed length.
    let header = Header::new(1, ResetQuery::PDU, 0, 9);
    let mut request = Vec::from(header.as_ref());
    request.push(0);
    timeout(Duration::from_secs(1), client.write_all(&request))
        .await
        .expect("write_all timed out")
        .unwrap();
    dump.push_value(
        ResetQuery::PDU,
        json!({
            "event": "invalid_first_pdu_sent",
            "raw_hex": common::test_helper::bytes_to_hex(&request),
            "length": request.len(),
        }),
    );
    let report = timeout(Duration::from_secs(1), ErrorReport::read(&mut client))
        .await
        .expect("timed out waiting for ErrorReport")
        .unwrap();
    dump.push_value(
        ErrorReport::PDU,
        json!({
            "version": report.version(),
            "pdu": ErrorReport::PDU,
            "pdu_name": "Error Report",
            "error_code": report.error_code().map(|code| code.as_u16()).unwrap_or_else(|code| code),
            "erroneous_pdu_len": report.erroneous_pdu().len(),
            "erroneous_pdu_hex": common::test_helper::bytes_to_hex(report.erroneous_pdu()),
            "text": String::from_utf8_lossy(report.text()),
        }),
    );
    // The report must echo the full offending request and mention the length.
    assert_error_report_matches(&report, 1, ErrorCode::CorruptData, &request);
    assert!(std::str::from_utf8(report.text()).unwrap().contains("invalid length"));
    // After the error report the server closes; a further header read fails.
    let read_res = timeout(Duration::from_secs(1), Header::read(&mut client))
        .await
        .expect("timed out waiting for connection close");
    assert!(read_res.is_err());
    dump.push_value(
        0,
        json!({
            "event": "connection_closed_after_invalid_first_pdu",
            "result": "header_read_failed_as_expected"
        }),
    );
    dump.print_pretty("first_pdu_with_invalid_length_returns_corrupt_data");
    // No shutdown signal was kept for this server; abort the task directly.
    drop(client);
    server_handle.abort();
    let _ = server_handle.await;
}
/// After a session is established, an Error Report received from the peer
/// must cause the server to close the connection without replying.
#[tokio::test]
async fn established_session_closes_after_receiving_error_report() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache)
;
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // Establish the session with a full Reset Query exchange.
    ResetQuery::new(1).write(&mut client).await.unwrap();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    // Send a peer-originated Error Report (Internal Error, no erroneous PDU).
    let report = ErrorReport::new(1, ErrorCode::InternalError.as_u16(), [], b"peer error");
    client.write_all(report.as_ref()).await.unwrap();
    dump.push_value(
        ErrorReport::PDU,
        json!({
            "event": "peer_error_report_sent",
            "version": report.version(),
            "error_code": report.error_code().map(|code| code.as_u16()).unwrap_or_else(|code| code),
            "text": String::from_utf8_lossy(report.text()),
        }),
    );
    // The server must close without sending anything back.
    let read_res = timeout(Duration::from_secs(1), Header::read(&mut client))
        .await
        .unwrap();
    assert!(read_res.is_err());
    dump.push_value(
        0,
        json!({
            "event": "connection_closed_after_receiving_peer_error_report",
            "result": "header_read_failed_as_expected"
        }),
    );
    dump.print_pretty("established_session_closes_after_receiving_error_report");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// After establishment, a header with an invalid PDU length (7, below the
/// 8-byte header minimum) must be answered with Corrupt Data and the
/// connection closed.
#[tokio::test]
async fn established_session_invalid_header_returns_corrupt_data() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // Establish the session with a full Reset Query exchange.
    ResetQuery::new(1).write(&mut client).await.unwrap();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    // Serial Query header claiming 7 total bytes — shorter than a header.
    let invalid_header = Header::new(1, SerialQuery::PDU, 42, 7);
    client.write_all(invalid_header.as_ref()).await.unwrap();
    dump.push_value(
        SerialQuery::PDU,
        json!({
            "event": "invalid_header_sent_after_establishment",
            "raw_hex": common::test_helper::bytes_to_hex(invalid_header.as_ref()),
            "length": invalid_header.length(),
        }),
    );
    let report = ErrorReport::read(&mut client).await.unwrap();
    dump.push_value(
        ErrorReport::PDU,
        json!({
            "version": report.version(),
            "pdu": ErrorReport::PDU,
            "pdu_name": "Error Report",
            "error_code": report.error_code().map(|code| code.as_u16()).unwrap_or_else(|code| code),
            "erroneous_pdu_len": report.erroneous_pdu().len(),
            "erroneous_pdu_hex": common::test_helper::bytes_to_hex(report.erroneous_pdu()),
            "text": String::from_utf8_lossy(report.text()),
        }),
    );
    // The report must echo the offending header and mention the PDU length.
    assert_error_report_matches(&report, 1, ErrorCode::CorruptData, invalid_header.as_ref());
    assert!(std::str::from_utf8(report.text())
        .unwrap()
        .contains("invalid PDU length"));
    // The server closes the connection after reporting the error.
    let read_res = Header::read(&mut client).await;
    assert!(read_res.is_err());
    dump.push_value(
        0,
        json!({
            "event": "connection_closed_after_invalid_established_header",
            "result": "header_read_failed_as_expected"
        }),
    );
    dump.print_pretty("established_session_invalid_header_returns_corrupt_data");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// After establishment, a PDU with an unknown type code (12) must be answered
/// with an Unsupported PDU Type error report and the connection closed.
#[tokio::test]
async fn established_session_unknown_pdu_returns_unsupported_pdu_type() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // Establish the session with a full Reset Query exchange.
    ResetQuery::new(1).write(&mut client).await.unwrap();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    // PDU type 12 is not defined by the protocol; length 8 (header only).
    let unknown_pdu = Header::new(1, 12, 0, 8);
    client.write_all(unknown_pdu.as_ref()).await.unwrap();
    dump.push_value(
        12,
        json!({
            "event": "unknown_pdu_sent_after_establishment",
            "raw_hex": common::test_helper::bytes_to_hex(unknown_pdu.as_ref()),
            "length": unknown_pdu.length(),
        }),
    );
    let report = ErrorReport::read(&mut client).await.unwrap();
    dump.push_value(
        ErrorReport::PDU,
        json!({
            "version": report.version(),
            "pdu": ErrorReport::PDU,
            "pdu_name": "Error Report",
            "error_code": report.error_code().map(|code| code.as_u16()).unwrap_or_else(|code| code),
            "erroneous_pdu_len": report.erroneous_pdu().len(),
            "erroneous_pdu_hex": common::test_helper::bytes_to_hex(report.erroneous_pdu()),
            "text": String::from_utf8_lossy(report.text()),
        }),
    );
    // The report must echo the offending PDU bytes verbatim.
    assert_error_report_matches(&report, 1, ErrorCode::UnsupportedPduType, unknown_pdu.as_ref());
    // The server closes the connection after reporting the error.
    let read_res = Header::read(&mut client).await;
    assert!(read_res.is_err());
    dump.push_value(
        0,
        json!({
            "event": "connection_closed_after_unknown_pdu",
            "result": "header_read_failed_as_expected"
        }),
    );
    dump.print_pretty("established_session_unknown_pdu_returns_unsupported_pdu_type");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// A version-0 session must not receive RouterKey or ASPA PDUs even when the
/// snapshot contains them: the response is Cache Response + End of Data only.
#[tokio::test]
async fn version_zero_does_not_send_router_key_or_aspa() {
    // Snapshot contains exactly the two payload kinds version 0 cannot carry.
    let router_key = RouterKey::new(Ski::default(), Asn::from(64496u32), vec![1u8; 32]);
    let aspa = Aspa::new(Asn::from(64496u32), vec![Asn::from(64497u32), Asn::from(64498u32)]);
    let snapshot = Snapshot::from_payloads(vec![
        Payload::RouterKey(router_key),
        Payload::Aspa(aspa),
    ]);
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .snapshot(snapshot)
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // Negotiate protocol version 0.
    ResetQuery::new(0).write(&mut client).await.unwrap();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    // Version 0 ends with the shorter EndOfDataV0 (no timing fields).
    let eod = rpki::rtr::pdu::EndOfDataV0::read(&mut client).await.unwrap();
    dump.push_value(
        eod.pdu(),
        json!({
            "version": eod.version(),
            "pdu": eod.pdu(),
            "pdu_name": "End of Data",
            "session_id": eod.session_id(),
            "serial_number": eod.serial_number(),
        }),
    );
    // Nothing else may follow within the timeout: no RouterKey, no ASPA.
    let res = timeout(Duration::from_millis(100), Header::read(&mut client)).await;
    assert!(res.is_err(), "version 0 response should not contain RouterKey or ASPA PDUs");
    dump.print_pretty("version_zero_does_not_send_router_key_or_aspa");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// In a version-2 delta response, an ASPA withdrawal must be encoded with an
/// empty provider list: PDU length 12 = 8-byte header + 4-byte customer ASN.
#[tokio::test]
async fn version_two_aspa_withdraw_has_empty_provider_list() {
    // Delta 100 -> 101 withdraws one ASPA record for customer ASN 64496.
    let aspa = Aspa::new(Asn::from(64496u32), vec![Asn::from(64497u32), Asn::from(64498u32)]);
    let delta = Arc::new(Delta::new(101, vec![], vec![Payload::Aspa(aspa)]));
    let mut deltas = VecDeque::new();
    deltas.push_back(delta);
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(101)
        .timing(Timing::new(600, 600, 7200))
        .deltas(deltas)
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // Serial Query at serial 100 so the server replies with the delta to 101.
    SerialQuery::new(2, 42, 100).write(&mut client).await.unwrap();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    // Read the ASPA PDU header and its 4-byte body (customer ASN) manually.
    let header = Header::read(&mut client).await.unwrap();
    assert_eq!(header.pdu(), AspaPdu::PDU);
    assert_eq!(header.length(), 12);
    let mut body = [0u8; 4];
    client.read_exact(&mut body).await.unwrap();
    // Customer ASN is big-endian on the wire.
    assert_eq!(u32::from_be_bytes(body), 64496);
    dump.push_value(
        AspaPdu::PDU,
        json!({
            "version": header.version(),
            "pdu": header.pdu(),
            "length": header.length(),
            "customer_asn": u32::from_be_bytes(body),
            "withdraw": true,
        }),
    );
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    dump.print_pretty("version_two_aspa_withdraw_has_empty_provider_list");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// A version-1 session receives RouterKey PDUs but never ASPA PDUs, even
/// when the snapshot contains both.
#[tokio::test]
async fn version_one_sends_router_key_but_not_aspa() {
    let router_key = RouterKey::new(Ski::default(), Asn::from(64496u32), vec![1u8; 32]);
    let aspa = Aspa::new(Asn::from(64496u32), vec![Asn::from(64497u32)]);
    let snapshot = Snapshot::from_payloads(vec![
        Payload::RouterKey(router_key),
        Payload::Aspa(aspa),
    ]);
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .snapshot(snapshot)
        .build();
    let server_cache = shared_cache(cache);
    let (addr, _notify_tx, shutdown_tx, server_handle) = start_session_server(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // Negotiate protocol version 1.
    ResetQuery::new(1).write(&mut client).await.unwrap();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    // The first data PDU must be the RouterKey.
    let header = Header::read(&mut client).await.unwrap();
    assert_eq!(header.pdu(), RouterKeyPdu::PDU);
    dump.push_value(
        RouterKeyPdu::PDU,
        json!({
            "version": header.version(),
            "pdu": header.pdu(),
            "length": header.length(),
        }),
    );
    // Consume the RouterKey body (declared length minus the 8-byte header).
    let payload_len = header.length() as usize - 8;
    let mut payload = vec![0u8; payload_len];
    client.read_exact(&mut payload).await.unwrap();
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    // Nothing else may follow within the timeout: the ASPA is suppressed.
    let res = timeout(Duration::from_millis(100), Header::read(&mut client)).await;
    assert!(res.is_err(), "version 1 response should not contain ASPA PDUs");
    dump.print_pretty("version_one_sends_router_key_but_not_aspa");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// With a 100 ms transport timeout, an idle established session must receive
/// a Transport Failed error report (empty erroneous PDU, "transport stalled"
/// text) and then be closed by the server.
#[tokio::test]
async fn established_session_idle_timeout_returns_transport_failed() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    // Server variant with a very short transport timeout to trigger the stall.
    let (addr, _notify_tx, shutdown_tx, server_handle) =
        start_session_server_with_transport_timeout(server_cache, Duration::from_millis(100)).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    let mut dump = RtrDebugDumper::new();
    // Establish the session, then go idle.
    ResetQuery::new(1).write(&mut client).await.unwrap();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    let report = timeout(Duration::from_secs(1), ErrorReport::read(&mut client))
        .await
        .expect("timed out waiting for transport failure ErrorReport")
        .unwrap();
    dump.push_value(
        ErrorReport::PDU,
        json!({
            "version": report.version(),
            "pdu": ErrorReport::PDU,
            "pdu_name": "Error Report",
            "error_code": report.error_code().map(|code| code.as_u16()).unwrap_or_else(|code| code),
            "erroneous_pdu_len": report.erroneous_pdu().len(),
            "erroneous_pdu_hex": common::test_helper::bytes_to_hex(report.erroneous_pdu()),
            "text": String::from_utf8_lossy(report.text()),
        }),
    );
    assert_eq!(report.version(), 1);
    assert_eq!(report.error_code(), Ok(ErrorCode::TransportFailed));
    // A transport failure carries no erroneous PDU, only explanatory text.
    assert!(report.erroneous_pdu().is_empty());
    assert!(std::str::from_utf8(report.text()).unwrap().contains("transport stalled"));
    // The server closes the connection after reporting the failure.
    let read_res = Header::read(&mut client).await;
    assert!(read_res.is_err());
    dump.push_value(
        0,
        json!({
            "event": "connection_closed_after_transport_timeout",
            "result": "header_read_failed_as_expected"
        }),
    );
    dump.print_pretty("established_session_idle_timeout_returns_transport_failed");
    shutdown_server(client, shutdown_tx, server_handle).await;
}
/// A TLS client presenting a certificate with a matching SAN IP must be able
/// to complete a full Reset Query / Cache Response / End of Data exchange.
#[tokio::test]
async fn tls_client_with_matching_san_ip_is_accepted() {
    let server_cache = shared_cache(
        RtrCacheBuilder::new()
            .session_ids(SessionIds::from_array([42, 42, 42]))
            .serial(100)
            .timing(Timing::new(600, 600, 7200))
            .build(),
    );
    let (addr, shutdown_tx, server_handle) = start_tls_session_server(server_cache).await;
    let mut tls_client = connect_tls_client(addr, "client-good.crt", "client-good.key").await;
    let mut dump = RtrDebugDumper::new();
    ResetQuery::new(1).write(&mut tls_client).await.unwrap();
    // The handshake succeeded, so the normal RTR exchange must follow.
    let cache_response = CacheResponse::read(&mut tls_client).await.unwrap();
    dump.push_value(cache_response.pdu(), dump_cache_response(&cache_response));
    assert_eq!(cache_response.version(), 1);
    assert_eq!(cache_response.session_id(), 42);
    let end_of_data = EndOfDataV1::read(&mut tls_client).await.unwrap();
    dump.push_value(end_of_data.pdu(), dump_eod_v1(&end_of_data));
    assert_eq!(end_of_data.version(), 1);
    assert_eq!(end_of_data.session_id(), 42);
    assert_eq!(end_of_data.serial_number(), 100);
    dump.print_pretty("tls_client_with_matching_san_ip_is_accepted");
    shutdown_io(&mut tls_client, shutdown_tx, server_handle).await;
}
/// A client connecting with server name "localhost" must accept a server
/// certificate carrying a DNS subjectAltName and complete the RTR exchange.
#[tokio::test]
async fn tls_client_accepts_server_certificate_with_dns_san() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    // Server uses a certificate whose SAN is a dNSName rather than an IP.
    let (addr, shutdown_tx, server_handle) =
        start_tls_session_server_with_cert(server_cache, "server-dns.crt", "server-dns.key").await;
    // The client must verify against the DNS name, not the socket address.
    let mut client = connect_tls_client_with_server_name(
        addr,
        "client-good.crt",
        "client-good.key",
        ServerName::try_from("localhost").unwrap(),
    )
    .await;
    let mut dump = RtrDebugDumper::new();
    ResetQuery::new(1).write(&mut client).await.unwrap();
    let response = CacheResponse::read(&mut client).await.unwrap();
    dump.push_value(response.pdu(), dump_cache_response(&response));
    assert_eq!(response.version(), 1);
    assert_eq!(response.session_id(), 42);
    let eod = EndOfDataV1::read(&mut client).await.unwrap();
    dump.push_value(eod.pdu(), dump_eod_v1(&eod));
    assert_eq!(eod.version(), 1);
    assert_eq!(eod.session_id(), 42);
    assert_eq!(eod.serial_number(), 100);
    dump.print_pretty("tls_client_accepts_server_certificate_with_dns_san");
    shutdown_io(&mut client, shutdown_tx, server_handle).await;
}
/// In strict mode the server config loader must refuse a certificate whose
/// subjectAltName carries no dNSName entry (`server.crt` is IP-only).
///
/// Fix: removed an `RtrCache` that was built and immediately discarded via
/// `let _ = cache;` — no server is started here, so the cache was dead setup.
#[tokio::test]
async fn tls_server_dns_name_san_strict_mode_rejects_ip_only_certificate() {
    let err = load_rustls_server_config_with_options(
        fixture_path("server.crt"),
        fixture_path("server.key"),
        fixture_path("client-ca.crt"),
        true, // strict mode: require a dNSName SAN entry
    )
    .unwrap_err();
    assert!(err
        .to_string()
        .contains("does not contain a subjectAltName dNSName entry"));
}
/// A TLS client whose certificate SAN IP does not match must have its
/// session closed by the server instead of receiving any RTR response.
#[tokio::test]
async fn tls_client_with_mismatched_san_ip_is_rejected() {
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .build();
    let server_cache = shared_cache(cache);
    let (addr, shutdown_tx, server_handle) = start_tls_session_server(server_cache).await;
    // Connect with the deliberately mismatching client certificate.
    let mut client = connect_tls_client(addr, "client-bad.crt", "client-bad.key").await;
    let mut dump = RtrDebugDumper::new();
    // The write itself may succeed; rejection surfaces on the next read.
    ResetQuery::new(1).write(&mut client).await.unwrap();
    dump.push_value(
        ResetQuery::PDU,
        json!({
            "event": "tls_reset_query_sent_with_bad_client_cert",
            "version": 1,
        }),
    );
    let read_res = timeout(Duration::from_secs(1), Header::read(&mut client))
        .await
        .expect("timed out waiting for TLS session close");
    assert!(read_res.is_err(), "server should close TLS session when client SAN IP mismatches");
    dump.push_value(
        0,
        json!({
            "event": "tls_session_closed_after_san_ip_mismatch",
            "result": "header_read_failed_as_expected"
        }),
    );
    dump.print_pretty("tls_client_with_mismatched_san_ip_is_rejected");
    shutdown_io(&mut client, shutdown_tx, server_handle).await;
}
/// A cache configured with an out-of-bounds retry interval (8000) must make
/// the server close the connection rather than send an invalid End of Data,
/// and the session task must finish with an error mentioning the interval.
#[tokio::test]
async fn invalid_timing_prevents_end_of_data_response() {
    let server_cache = shared_cache(
        RtrCacheBuilder::new()
            .session_ids(SessionIds::from_array([42, 42, 42]))
            .serial(100)
            .timing(Timing::new(600, 8000, 7200))
            .build(),
    );
    let (addr, shutdown_tx, server_handle) =
        start_session_server_returning_result(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    ResetQuery::new(1).write(&mut client).await.unwrap();
    // The Cache Response still arrives; the failure happens at End of Data.
    let cache_response = CacheResponse::read(&mut client).await.unwrap();
    assert_eq!(cache_response.version(), 1);
    assert_eq!(cache_response.session_id(), 42);
    let follow_up = timeout(Duration::from_secs(1), Header::read(&mut client))
        .await
        .expect("timed out waiting for server close");
    assert!(follow_up.is_err(), "server should close instead of sending invalid EndOfData");
    let _ = shutdown_tx.send(true);
    // The session task itself must surface the invalid-timing error.
    let session_result = timeout(Duration::from_secs(1), server_handle)
        .await
        .expect("server task did not exit within timeout")
        .unwrap();
    let err = session_result.expect_err("session should fail on invalid timing");
    assert!(err.to_string().contains("retry interval"));
}
/// A snapshot containing an ASPA announcement with an empty provider list is
/// invalid: the server must close instead of serializing it, and the session
/// task must finish with an error mentioning the ASPA announcement.
#[tokio::test]
async fn invalid_aspa_prevents_snapshot_response() {
    // Empty provider vec — not encodable as a v2 ASPA announcement.
    let snapshot = Snapshot::from_payloads(vec![Payload::Aspa(Aspa::new(
        Asn::from(64496u32),
        vec![],
    ))]);
    let cache = RtrCacheBuilder::new()
        .session_ids(SessionIds::from_array([42, 42, 42]))
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .snapshot(snapshot)
        .build();
    let server_cache = shared_cache(cache);
    let (addr, shutdown_tx, server_handle) = start_session_server_returning_result(server_cache).await;
    let mut client = TcpStream::connect(addr).await.unwrap();
    // Version 2 is required for ASPA to be part of the response at all.
    ResetQuery::new(2).write(&mut client).await.unwrap();
    // The Cache Response still arrives; the failure happens at the ASPA PDU.
    let response = CacheResponse::read(&mut client).await.unwrap();
    assert_eq!(response.version(), 2);
    assert_eq!(response.session_id(), 42);
    let read_res = timeout(Duration::from_secs(1), Header::read(&mut client))
        .await
        .expect("timed out waiting for server close");
    assert!(read_res.is_err(), "server should close instead of sending invalid ASPA");
    let _ = shutdown_tx.send(true);
    // The session task itself must surface the invalid-ASPA error.
    let join = timeout(Duration::from_secs(1), server_handle)
        .await
        .expect("server task did not exit within timeout")
        .unwrap();
    let err = join.expect_err("session should fail on invalid ASPA");
    assert!(err.to_string().contains("ASPA announcement"));
}