// Integration tests that run an `RtrSession` against a loopback TCP socket and
// check, PDU by PDU, what the session sends back for reset and serial queries.

mod common;

use std::collections::VecDeque;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::{Arc, RwLock};

use tokio::net::TcpListener;
use tokio::sync::{broadcast, watch};

use common::test_helper::{
    dump_cache_reset, dump_cache_response, dump_eod_v1, dump_ipv4_prefix, dump_ipv6_prefix,
    RtrDebugDumper,
};

use rpki::data_model::resources::ip_resources::{IPAddress, IPAddressPrefix};
use rpki::rtr::cache::{Delta, SharedRtrCache, RtrCacheBuilder, Snapshot};
use rpki::rtr::payload::{Payload, RouteOrigin, Timing};
use rpki::rtr::pdu::{
    CacheResponse, CacheReset, EndOfDataV1, IPv4Prefix, IPv6Prefix, ResetQuery, SerialQuery,
};
use rpki::rtr::session::RtrSession;

/// Wraps an owned cache in `Arc<RwLock<_>>` so it can be handed to a session.
fn shared_cache(cache: rpki::rtr::cache::RtrCache) -> SharedRtrCache {
    Arc::new(RwLock::new(cache))
}

/// A reset query against a cache holding one IPv4 route origin must be answered
/// with a Cache Response (PDU 3), one announced IPv4 Prefix (PDU 4) and an
/// End of Data (PDU 7) that carries the cache's session id, serial and timing.
#[tokio::test]
async fn reset_query_returns_snapshot_and_end_of_data() {
    let roa_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(192, 0, 2, 0)),
        prefix_length: 24,
    };
    let route_origin = RouteOrigin::new(roa_prefix, 24, 64496u32.into());
    let initial_snapshot = Snapshot::from_payloads(vec![Payload::RouteOrigin(route_origin)]);

    let rtr_cache = RtrCacheBuilder::new()
        .session_id(42)
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .snapshot(initial_snapshot)
        .build();

    // The sender halves are bound to named variables so they stay alive for the
    // whole test instead of being dropped immediately.
    let (_notify_sender, notify_receiver) = broadcast::channel(16);
    let (_shutdown_sender, shutdown_receiver) = watch::channel(false);

    // Port 0 lets the OS pick a free port; the chosen address is read back below.
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let server_addr = listener.local_addr().unwrap();
    let cache_handle = shared_cache(rtr_cache);

    // Server side: accept exactly one connection and run a session on it.
    tokio::spawn(async move {
        let (socket, _) = listener.accept().await.unwrap();
        let rtr_session =
            RtrSession::new(cache_handle, socket, notify_receiver, shutdown_receiver);
        rtr_session.run().await.unwrap();
    });

    // Client side. NOTE(review): the `1` is presumably the protocol version —
    // the responses below are all asserted to report version 1.
    let mut router = tokio::net::TcpStream::connect(server_addr).await.unwrap();
    ResetQuery::new(1).write(&mut router).await.unwrap();

    let mut dumper = RtrDebugDumper::new();

    let cache_response = CacheResponse::read(&mut router).await.unwrap();
    dumper.push_value(cache_response.pdu(), dump_cache_response(&cache_response));
    assert_eq!(cache_response.pdu(), 3);
    assert_eq!(cache_response.version(), 1);
    assert_eq!(cache_response.session_id(), 42);

    let prefix_pdu = IPv4Prefix::read(&mut router).await.unwrap();
    dumper.push_value(prefix_pdu.pdu(), dump_ipv4_prefix(&prefix_pdu));
    assert_eq!(prefix_pdu.pdu(), 4);
    assert_eq!(prefix_pdu.version(), 1);
    assert!(prefix_pdu.flag().is_announce());
    assert_eq!(prefix_pdu.prefix_len(), 24);
    assert_eq!(prefix_pdu.max_len(), 24);
    assert_eq!(prefix_pdu.prefix(), Ipv4Addr::new(192, 0, 2, 0));
    assert_eq!(prefix_pdu.asn(), 64496u32.into());

    let end_of_data = EndOfDataV1::read(&mut router).await.unwrap();
    dumper.push_value(end_of_data.pdu(), dump_eod_v1(&end_of_data));
    assert_eq!(end_of_data.pdu(), 7);
    assert_eq!(end_of_data.version(), 1);
    assert_eq!(end_of_data.session_id(), 42);
    assert_eq!(end_of_data.serial_number(), 100);

    let intervals = end_of_data.timing();
    assert_eq!(intervals.refresh, 600);
    assert_eq!(intervals.retry, 600);
    assert_eq!(intervals.expire, 7200);

    dumper.print_pretty("reset_query_returns_snapshot_and_end_of_data");
}

/// A serial query whose session id (42) and serial (100) equal the cache's own
/// must be answered with an End of Data (PDU 7) as the first PDU read back.
#[tokio::test]
async fn serial_query_returns_end_of_data_when_up_to_date() {
    let rtr_cache = RtrCacheBuilder::new()
        .session_id(42)
        .serial(100)
        .timing(Timing {
            refresh: 600,
            retry: 600,
            expire: 7200,
        })
        .build();

    // Senders are kept bound so they live until the end of the test.
    let (_notify_sender, notify_receiver) = broadcast::channel(16);
    let (_shutdown_sender, shutdown_receiver) = watch::channel(false);

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let server_addr = listener.local_addr().unwrap();
    let cache_handle = shared_cache(rtr_cache);

    // Server side: accept exactly one connection and run a session on it.
    tokio::spawn(async move {
        let (socket, _) = listener.accept().await.unwrap();
        let rtr_session =
            RtrSession::new(cache_handle, socket, notify_receiver, shutdown_receiver);
        rtr_session.run().await.unwrap();
    });

    // NOTE(review): arguments are presumably (version, session id, serial) —
    // confirm against `SerialQuery::new`.
    let mut router = tokio::net::TcpStream::connect(server_addr).await.unwrap();
    SerialQuery::new(1, 42, 100).write(&mut router).await.unwrap();

    let mut dumper = RtrDebugDumper::new();

    let end_of_data = EndOfDataV1::read(&mut router).await.unwrap();
    dumper.push_value(end_of_data.pdu(), dump_eod_v1(&end_of_data));
    assert_eq!(end_of_data.pdu(), 7);
    assert_eq!(end_of_data.version(), 1);
    assert_eq!(end_of_data.session_id(), 42);
    assert_eq!(end_of_data.serial_number(), 100);

    let intervals = end_of_data.timing();
    assert_eq!(intervals.refresh, 600);
    assert_eq!(intervals.retry, 600);
    assert_eq!(intervals.expire, 7200);

    dumper.print_pretty("serial_query_returns_end_of_data_when_up_to_date");
}

/// A serial query carrying session id 999 against a cache whose session id is
/// 42 must be answered with a Cache Reset (PDU 8).
#[tokio::test]
async fn serial_query_returns_cache_reset_when_session_id_mismatch() {
    let rtr_cache = RtrCacheBuilder::new()
        .session_id(42)
        .serial(100)
        .timing(Timing {
            refresh: 600,
            retry: 600,
            expire: 7200,
        })
        .build();

    // Senders are kept bound so they live until the end of the test.
    let (_notify_sender, notify_receiver) = broadcast::channel(16);
    let (_shutdown_sender, shutdown_receiver) = watch::channel(false);

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let server_addr = listener.local_addr().unwrap();
    let cache_handle = shared_cache(rtr_cache);

    // Server side: accept exactly one connection and run a session on it.
    tokio::spawn(async move {
        let (socket, _) = listener.accept().await.unwrap();
        let rtr_session =
            RtrSession::new(cache_handle, socket, notify_receiver, shutdown_receiver);
        rtr_session.run().await.unwrap();
    });

    // 999 deliberately differs from the cache's session id of 42.
    let mut router = tokio::net::TcpStream::connect(server_addr).await.unwrap();
    SerialQuery::new(1, 999, 100).write(&mut router).await.unwrap();

    let mut dumper = RtrDebugDumper::new();

    let cache_reset = CacheReset::read(&mut router).await.unwrap();
    dumper.push_value(
        cache_reset.pdu(),
        dump_cache_reset(cache_reset.version(), cache_reset.pdu()),
    );
    assert_eq!(cache_reset.pdu(), 8);
    assert_eq!(cache_reset.version(), 1);

    dumper.print_pretty("serial_query_returns_cache_reset_when_session_id_mismatch");
}

/// A serial query at serial 100 against a cache at serial 101 that holds one
/// delta (serial 101, one announced route origin) must be answered with a
/// Cache Response, that announced IPv4 Prefix, and an End of Data at serial 101.
#[tokio::test]
async fn serial_query_returns_deltas_when_incremental_update_available() {
    let roa_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(192, 0, 2, 0)),
        prefix_length: 24,
    };
    let route_origin = RouteOrigin::new(roa_prefix, 24, 64496u32.into());

    // One delta: the second argument lists the announced payloads and the third
    // (empty here) is checked as withdrawals in the ordering test further down.
    let update = Arc::new(Delta::new(
        101,
        vec![Payload::RouteOrigin(route_origin)],
        vec![],
    ));
    let delta_queue = VecDeque::from(vec![update]);

    let rtr_cache = RtrCacheBuilder::new()
        .session_id(42)
        .serial(101)
        .timing(Timing {
            refresh: 600,
            retry: 600,
            expire: 7200,
        })
        .deltas(delta_queue)
        .build();

    // Senders are kept bound so they live until the end of the test.
    let (_notify_sender, notify_receiver) = broadcast::channel(16);
    let (_shutdown_sender, shutdown_receiver) = watch::channel(false);

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let server_addr = listener.local_addr().unwrap();
    let cache_handle = shared_cache(rtr_cache);

    // Server side: accept exactly one connection and run a session on it.
    tokio::spawn(async move {
        let (socket, _) = listener.accept().await.unwrap();
        let rtr_session =
            RtrSession::new(cache_handle, socket, notify_receiver, shutdown_receiver);
        rtr_session.run().await.unwrap();
    });

    // The client claims serial 100, one behind the cache's 101.
    let mut router = tokio::net::TcpStream::connect(server_addr).await.unwrap();
    SerialQuery::new(1, 42, 100).write(&mut router).await.unwrap();

    let mut dumper = RtrDebugDumper::new();

    let cache_response = CacheResponse::read(&mut router).await.unwrap();
    dumper.push_value(cache_response.pdu(), dump_cache_response(&cache_response));
    assert_eq!(cache_response.pdu(), 3);
    assert_eq!(cache_response.version(), 1);
    assert_eq!(cache_response.session_id(), 42);

    let prefix_pdu = IPv4Prefix::read(&mut router).await.unwrap();
    dumper.push_value(prefix_pdu.pdu(), dump_ipv4_prefix(&prefix_pdu));
    assert_eq!(prefix_pdu.pdu(), 4);
    assert_eq!(prefix_pdu.version(), 1);
    assert!(prefix_pdu.flag().is_announce());
    assert_eq!(prefix_pdu.prefix_len(), 24);
    assert_eq!(prefix_pdu.max_len(), 24);
    assert_eq!(prefix_pdu.prefix(), Ipv4Addr::new(192, 0, 2, 0));
    assert_eq!(prefix_pdu.asn(), 64496u32.into());

    let end_of_data = EndOfDataV1::read(&mut router).await.unwrap();
    dumper.push_value(end_of_data.pdu(), dump_eod_v1(&end_of_data));
    assert_eq!(end_of_data.pdu(), 7);
    assert_eq!(end_of_data.version(), 1);
    assert_eq!(end_of_data.session_id(), 42);
    assert_eq!(end_of_data.serial_number(), 101);

    let intervals = end_of_data.timing();
    assert_eq!(intervals.refresh, 600);
    assert_eq!(intervals.retry, 600);
    assert_eq!(intervals.expire, 7200);

    dumper.print_pretty("serial_query_returns_deltas_when_incremental_update_available");
}

/// A reset query against a snapshot built from (IPv6, low IPv4, high IPv4)
/// must return the payloads in the order this test pins down: the IPv4 prefix
/// with the numerically higher address, then the lower IPv4 prefix, then the
/// IPv6 prefix (PDU 6), followed by End of Data.
#[tokio::test]
async fn reset_query_returns_payloads_in_rtr_order() {
    let low_v4_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(192, 0, 2, 0)),
        prefix_length: 24,
    };
    let low_v4_route = RouteOrigin::new(low_v4_prefix, 24, 64496u32.into());

    let high_v4_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(198, 51, 100, 0)),
        prefix_length: 24,
    };
    let high_v4_route = RouteOrigin::new(high_v4_prefix, 24, 64497u32.into());

    let v6_net = IPAddressPrefix {
        address: IPAddress::from_ipv6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 0)),
        prefix_length: 32,
    };
    let v6_route = RouteOrigin::new(v6_net, 48, 64498u32.into());

    // Input order deliberately differs from the order asserted on the wire.
    let initial_snapshot = Snapshot::from_payloads(vec![
        Payload::RouteOrigin(v6_route),
        Payload::RouteOrigin(low_v4_route),
        Payload::RouteOrigin(high_v4_route),
    ]);

    let rtr_cache = RtrCacheBuilder::new()
        .session_id(42)
        .serial(100)
        .timing(Timing::new(600, 600, 7200))
        .snapshot(initial_snapshot)
        .build();

    // Senders are kept bound so they live until the end of the test.
    let (_notify_sender, notify_receiver) = broadcast::channel(16);
    let (_shutdown_sender, shutdown_receiver) = watch::channel(false);

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let server_addr = listener.local_addr().unwrap();
    let cache_handle = shared_cache(rtr_cache);

    // Server side: accept exactly one connection and run a session on it.
    tokio::spawn(async move {
        let (socket, _) = listener.accept().await.unwrap();
        let rtr_session =
            RtrSession::new(cache_handle, socket, notify_receiver, shutdown_receiver);
        rtr_session.run().await.unwrap();
    });

    let mut router = tokio::net::TcpStream::connect(server_addr).await.unwrap();
    ResetQuery::new(1).write(&mut router).await.unwrap();

    let mut dumper = RtrDebugDumper::new();

    let cache_response = CacheResponse::read(&mut router).await.unwrap();
    dumper.push_value(cache_response.pdu(), dump_cache_response(&cache_response));
    assert_eq!(cache_response.pdu(), 3);
    assert_eq!(cache_response.version(), 1);
    assert_eq!(cache_response.session_id(), 42);

    let leading_v4 = IPv4Prefix::read(&mut router).await.unwrap();
    dumper.push_value(leading_v4.pdu(), dump_ipv4_prefix(&leading_v4));
    assert_eq!(leading_v4.pdu(), 4);
    assert_eq!(leading_v4.version(), 1);
    assert!(leading_v4.flag().is_announce());
    assert_eq!(leading_v4.prefix(), Ipv4Addr::new(198, 51, 100, 0));
    assert_eq!(leading_v4.prefix_len(), 24);
    assert_eq!(leading_v4.max_len(), 24);
    assert_eq!(leading_v4.asn(), 64497u32.into());

    let trailing_v4 = IPv4Prefix::read(&mut router).await.unwrap();
    dumper.push_value(trailing_v4.pdu(), dump_ipv4_prefix(&trailing_v4));
    assert_eq!(trailing_v4.pdu(), 4);
    assert_eq!(trailing_v4.version(), 1);
    assert!(trailing_v4.flag().is_announce());
    assert_eq!(trailing_v4.prefix(), Ipv4Addr::new(192, 0, 2, 0));
    assert_eq!(trailing_v4.prefix_len(), 24);
    assert_eq!(trailing_v4.max_len(), 24);
    assert_eq!(trailing_v4.asn(), 64496u32.into());

    // Announced IPv4 prefixes arrive with the higher address first.
    assert!(u32::from(leading_v4.prefix()) > u32::from(trailing_v4.prefix()));

    let v6_pdu = IPv6Prefix::read(&mut router).await.unwrap();
    dumper.push_value(v6_pdu.pdu(), dump_ipv6_prefix(&v6_pdu));
    assert_eq!(v6_pdu.pdu(), 6);
    assert_eq!(v6_pdu.version(), 1);
    assert!(v6_pdu.flag().is_announce());
    assert_eq!(v6_pdu.prefix(), Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 0));
    assert_eq!(v6_pdu.prefix_len(), 32);
    assert_eq!(v6_pdu.max_len(), 48);
    assert_eq!(v6_pdu.asn(), 64498u32.into());

    let end_of_data = EndOfDataV1::read(&mut router).await.unwrap();
    dumper.push_value(end_of_data.pdu(), dump_eod_v1(&end_of_data));
    assert_eq!(end_of_data.pdu(), 7);
    assert_eq!(end_of_data.version(), 1);
    assert_eq!(end_of_data.session_id(), 42);
    assert_eq!(end_of_data.serial_number(), 100);

    let intervals = end_of_data.timing();
    assert_eq!(intervals.refresh, 600);
    assert_eq!(intervals.retry, 600);
    assert_eq!(intervals.expire, 7200);

    dumper.print_pretty("reset_query_returns_payloads_in_rtr_order");
}

/// A serial query against a delta with two announcements and two withdrawals
/// must return both announcements first (higher address first), then both
/// withdrawals (lower address first), then End of Data at serial 101.
#[tokio::test]
async fn serial_query_returns_announcements_before_withdrawals() {
    let announce_low_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(192, 0, 2, 0)),
        prefix_length: 24,
    };
    let announce_low = RouteOrigin::new(announce_low_prefix, 24, 64496u32.into());

    let announce_high_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(198, 51, 100, 0)),
        prefix_length: 24,
    };
    let announce_high = RouteOrigin::new(announce_high_prefix, 24, 64497u32.into());

    let withdraw_low_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(10, 0, 0, 0)),
        prefix_length: 24,
    };
    let withdraw_low = RouteOrigin::new(withdraw_low_prefix, 24, 64500u32.into());

    let withdraw_high_prefix = IPAddressPrefix {
        address: IPAddress::from_ipv4(Ipv4Addr::new(203, 0, 113, 0)),
        prefix_length: 24,
    };
    let withdraw_high = RouteOrigin::new(withdraw_high_prefix, 24, 64501u32.into());

    // Both lists are supplied in the opposite order to the one asserted on the
    // wire, so the test fails if the session simply echoes insertion order.
    let update = Arc::new(Delta::new(
        101,
        vec![
            Payload::RouteOrigin(announce_low),
            Payload::RouteOrigin(announce_high),
        ],
        vec![
            Payload::RouteOrigin(withdraw_high),
            Payload::RouteOrigin(withdraw_low),
        ],
    ));
    let delta_queue = VecDeque::from(vec![update]);

    let rtr_cache = RtrCacheBuilder::new()
        .session_id(42)
        .serial(101)
        .timing(Timing {
            refresh: 600,
            retry: 600,
            expire: 7200,
        })
        .deltas(delta_queue)
        .build();

    // Senders are kept bound so they live until the end of the test.
    let (_notify_sender, notify_receiver) = broadcast::channel(16);
    let (_shutdown_sender, shutdown_receiver) = watch::channel(false);

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let server_addr = listener.local_addr().unwrap();
    let cache_handle = shared_cache(rtr_cache);

    // Server side: accept exactly one connection and run a session on it.
    tokio::spawn(async move {
        let (socket, _) = listener.accept().await.unwrap();
        let rtr_session =
            RtrSession::new(cache_handle, socket, notify_receiver, shutdown_receiver);
        rtr_session.run().await.unwrap();
    });

    let mut router = tokio::net::TcpStream::connect(server_addr).await.unwrap();
    SerialQuery::new(1, 42, 100).write(&mut router).await.unwrap();

    let mut dumper = RtrDebugDumper::new();

    let cache_response = CacheResponse::read(&mut router).await.unwrap();
    dumper.push_value(cache_response.pdu(), dump_cache_response(&cache_response));
    assert_eq!(cache_response.pdu(), 3);
    assert_eq!(cache_response.version(), 1);
    assert_eq!(cache_response.session_id(), 42);

    let announce_a = IPv4Prefix::read(&mut router).await.unwrap();
    dumper.push_value(announce_a.pdu(), dump_ipv4_prefix(&announce_a));
    assert_eq!(announce_a.pdu(), 4);
    assert_eq!(announce_a.version(), 1);
    assert!(announce_a.flag().is_announce());
    assert_eq!(announce_a.prefix(), Ipv4Addr::new(198, 51, 100, 0));
    assert_eq!(announce_a.prefix_len(), 24);
    assert_eq!(announce_a.max_len(), 24);
    assert_eq!(announce_a.asn(), 64497u32.into());

    let announce_b = IPv4Prefix::read(&mut router).await.unwrap();
    dumper.push_value(announce_b.pdu(), dump_ipv4_prefix(&announce_b));
    assert_eq!(announce_b.pdu(), 4);
    assert_eq!(announce_b.version(), 1);
    assert!(announce_b.flag().is_announce());
    assert_eq!(announce_b.prefix(), Ipv4Addr::new(192, 0, 2, 0));
    assert_eq!(announce_b.prefix_len(), 24);
    assert_eq!(announce_b.max_len(), 24);
    assert_eq!(announce_b.asn(), 64496u32.into());

    // Announcements: higher address first.
    assert!(u32::from(announce_a.prefix()) > u32::from(announce_b.prefix()));

    let withdraw_a = IPv4Prefix::read(&mut router).await.unwrap();
    dumper.push_value(withdraw_a.pdu(), dump_ipv4_prefix(&withdraw_a));
    assert_eq!(withdraw_a.pdu(), 4);
    assert_eq!(withdraw_a.version(), 1);
    assert!(!withdraw_a.flag().is_announce());
    assert_eq!(withdraw_a.prefix(), Ipv4Addr::new(10, 0, 0, 0));
    assert_eq!(withdraw_a.prefix_len(), 24);
    assert_eq!(withdraw_a.max_len(), 24);
    assert_eq!(withdraw_a.asn(), 64500u32.into());

    let withdraw_b = IPv4Prefix::read(&mut router).await.unwrap();
    dumper.push_value(withdraw_b.pdu(), dump_ipv4_prefix(&withdraw_b));
    assert_eq!(withdraw_b.pdu(), 4);
    assert_eq!(withdraw_b.version(), 1);
    assert!(!withdraw_b.flag().is_announce());
    assert_eq!(withdraw_b.prefix(), Ipv4Addr::new(203, 0, 113, 0));
    assert_eq!(withdraw_b.prefix_len(), 24);
    assert_eq!(withdraw_b.max_len(), 24);
    assert_eq!(withdraw_b.asn(), 64501u32.into());

    // Withdrawals: lower address first.
    assert!(u32::from(withdraw_a.prefix()) < u32::from(withdraw_b.prefix()));

    // Restates the announce/withdraw grouping in one place.
    assert!(announce_a.flag().is_announce());
    assert!(announce_b.flag().is_announce());
    assert!(!withdraw_a.flag().is_announce());
    assert!(!withdraw_b.flag().is_announce());

    let end_of_data = EndOfDataV1::read(&mut router).await.unwrap();
    dumper.push_value(end_of_data.pdu(), dump_eod_v1(&end_of_data));
    assert_eq!(end_of_data.pdu(), 7);
    assert_eq!(end_of_data.version(), 1);
    assert_eq!(end_of_data.session_id(), 42);
    assert_eq!(end_of_data.serial_number(), 101);

    let intervals = end_of_data.timing();
    assert_eq!(intervals.refresh, 600);
    assert_eq!(intervals.retry, 600);
    assert_eq!(intervals.expire, 7200);

    dumper.print_pretty("serial_query_returns_announcements_before_withdrawals");
}