//! Shared data types for the parallel RPKI repository-sync pipeline
//! (`rpki/src/parallel/types.rs`).

use std::path::{Path, PathBuf};
use crate::policy::SyncPreference;
use crate::report::Warning;
/// Where the bytes of a Trust Anchor Locator (and optionally its TA
/// certificate) come from.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TalSource {
/// Fetch the TAL from a remote URL.
Url(String),
/// TAL and TA certificate already held in memory as raw bytes.
DerBytes {
/// URL the TAL is associated with; used to derive the TAL id.
tal_url: String,
/// Raw bytes of the TAL file itself.
tal_bytes: Vec<u8>,
/// DER-encoded trust-anchor certificate.
ta_der: Vec<u8>,
},
/// Read the TAL from a local file path.
FilePath(PathBuf),
/// Local TAL file plus a pre-fetched TA certificate file.
FilePathWithTa {
tal_path: PathBuf,
ta_path: PathBuf,
},
}
/// A single TAL input to a validation run: derived identifiers plus the
/// concrete source of the TAL bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TalInputSpec {
/// Identifier derived from the TAL's file stem or URL (e.g. "apnic").
pub tal_id: String,
/// RIR identifier; the constructors initialize it equal to `tal_id`.
pub rir_id: String,
/// Where the TAL bytes come from.
pub source: TalSource,
}
impl TalInputSpec {
pub fn from_url(url: impl Into<String>) -> Self {
let url = url.into();
let tal_id = derive_tal_id_from_url_like(&url);
Self {
rir_id: tal_id.clone(),
tal_id,
source: TalSource::Url(url),
}
}
pub fn from_file_path(path: impl Into<PathBuf>) -> Self {
let path = path.into();
let tal_id = derive_tal_id_from_path(&path);
Self {
rir_id: tal_id.clone(),
tal_id,
source: TalSource::FilePath(path),
}
}
pub fn from_file_path_with_ta(
tal_path: impl Into<PathBuf>,
ta_path: impl Into<PathBuf>,
) -> Self {
let tal_path = tal_path.into();
let ta_path = ta_path.into();
let tal_id = derive_tal_id_from_path(&tal_path);
Self {
rir_id: tal_id.clone(),
tal_id,
source: TalSource::FilePathWithTa { tal_path, ta_path },
}
}
pub fn from_ta_der(tal_url: impl Into<String>, tal_bytes: Vec<u8>, ta_der: Vec<u8>) -> Self {
let tal_url = tal_url.into();
let tal_id = derive_tal_id_from_url_like(&tal_url);
Self {
rir_id: tal_id.clone(),
tal_id,
source: TalSource::DerBytes {
tal_url,
tal_bytes,
ta_der,
},
}
}
}
/// Identity of a publication repository: an optional RRDP notification
/// URI plus the rsync base URI.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RepoIdentity {
    /// RRDP notification file URI, when the repository advertises RRDP.
    pub notification_uri: Option<String>,
    /// rsync base URI of the repository.
    pub rsync_base_uri: String,
}

impl RepoIdentity {
    /// Construct an identity from its two components, converting the
    /// rsync URI into an owned `String`.
    pub fn new(notification_uri: Option<String>, rsync_base_uri: impl Into<String>) -> Self {
        let rsync_base_uri = rsync_base_uri.into();
        RepoIdentity {
            notification_uri,
            rsync_base_uri,
        }
    }
}
/// Key used to deduplicate fetches of the same repository.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum RepoDedupKey {
/// Deduplicate by RRDP notification URI.
RrdpNotify { notification_uri: String },
/// Deduplicate by rsync scope URI.
RsyncScope { rsync_scope_uri: String },
}
/// Transport protocol used for a single repository fetch attempt.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RepoTransportMode {
Rrdp,
Rsync,
}
/// A concrete fetch attempt against one repository over one transport,
/// carrying the requesters waiting on its outcome.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoTransportTask {
/// Key under which concurrent fetches of this repo are coalesced.
pub dedup_key: RepoDedupKey,
pub repo_identity: RepoIdentity,
/// Which transport this attempt uses (RRDP or rsync).
pub mode: RepoTransportMode,
pub tal_id: String,
pub rir_id: String,
pub validation_time: time::OffsetDateTime,
/// Scheduling priority; semantics (higher vs. lower first) are defined
/// by the scheduler, not visible here — confirm against the consumer.
pub priority: u8,
/// CAs waiting for this repository's content.
pub requesters: Vec<RepoRequester>,
}
/// Outcome of a single transport attempt; both variants carry warnings
/// collected during the attempt.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RepoTransportResultKind {
Success {
/// Human-readable label of the source that succeeded (e.g. "rsync").
source: String,
warnings: Vec<Warning>,
},
Failed {
/// Description of why the attempt failed.
detail: String,
warnings: Vec<Warning>,
},
}
/// Result of a transport attempt, paired with the identifying context of
/// the task that produced it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoTransportResultEnvelope {
pub dedup_key: RepoDedupKey,
pub repo_identity: RepoIdentity,
pub mode: RepoTransportMode,
pub tal_id: String,
pub rir_id: String,
/// Wall-clock duration of the attempt, in milliseconds.
pub timing_ms: u64,
pub result: RepoTransportResultKind,
}
/// Per-repository runtime state. Variant names suggest an RRDP-first,
/// rsync-fallback progression (NOTE(review): the actual transition logic
/// lives elsewhere — confirm against the scheduler).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RepoRuntimeState {
Init,
WaitingRrdp,
RrdpOk,
/// RRDP failed; an rsync attempt is still possible.
RrdpFailedPendingRsync,
WaitingRsync,
RsyncOk,
/// Both transports exhausted; no further attempts.
FailedTerminal,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RepoKey {
pub rsync_base_uri: String,
pub notification_uri: Option<String>,
}
impl RepoKey {
pub fn new(rsync_base_uri: impl Into<String>, notification_uri: Option<String>) -> Self {
Self {
rsync_base_uri: rsync_base_uri.into(),
notification_uri,
}
}
pub fn as_identity(&self) -> RepoIdentity {
RepoIdentity {
notification_uri: self.notification_uri.clone(),
rsync_base_uri: self.rsync_base_uri.clone(),
}
}
}
/// A CA publication point waiting on a repository fetch, with enough
/// context to resume validation once the fetch completes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoRequester {
    pub tal_id: String,
    pub rir_id: String,
    /// Id of the parent node in the validation tree, when known.
    pub parent_node_id: Option<u64>,
    pub ca_instance_handle_id: String,
    pub publication_point_rsync_uri: String,
    pub manifest_rsync_uri: String,
}

impl RepoRequester {
    /// Build a requester with `parent_node_id` left unset.
    pub fn with_tal_rir(
        tal_id: impl Into<String>,
        rir_id: impl Into<String>,
        manifest_rsync_uri: impl Into<String>,
        publication_point_rsync_uri: impl Into<String>,
        ca_instance_handle_id: impl Into<String>,
    ) -> Self {
        RepoRequester {
            tal_id: tal_id.into(),
            rir_id: rir_id.into(),
            manifest_rsync_uri: manifest_rsync_uri.into(),
            publication_point_rsync_uri: publication_point_rsync_uri.into(),
            ca_instance_handle_id: ca_instance_handle_id.into(),
            parent_node_id: None,
        }
    }
}
/// A transport-agnostic request to synchronize one repository, before it
/// is projected into a concrete [`RepoTransportTask`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoSyncTask {
pub repo_key: RepoKey,
pub validation_time: time::OffsetDateTime,
/// Caller's preference for which transport(s) to try.
pub sync_preference: SyncPreference,
pub tal_id: String,
pub rir_id: String,
pub priority: u8,
/// CAs waiting for this repository's content.
pub requesters: Vec<RepoRequester>,
}
impl RepoSyncTask {
    /// Project this sync task into a concrete transport attempt,
    /// cloning the shared context into the new task.
    pub fn as_transport_task(
        &self,
        dedup_key: RepoDedupKey,
        mode: RepoTransportMode,
    ) -> RepoTransportTask {
        let repo_identity = self.repo_key.as_identity();
        RepoTransportTask {
            mode,
            dedup_key,
            repo_identity,
            tal_id: self.tal_id.clone(),
            rir_id: self.rir_id.clone(),
            requesters: self.requesters.clone(),
            validation_time: self.validation_time,
            priority: self.priority,
        }
    }
}
/// Lifecycle state of a queued repository task.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RepoTaskState {
Pending,
Running,
Succeeded,
Failed,
/// Satisfied by reusing the result of an earlier identical task.
Reused,
}
/// Lightweight reference to a completed sync: which repository, and a
/// label of the source that produced the data.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoSyncResultRef {
pub repo_key: RepoKey,
/// Label of the source that succeeded (e.g. "rrdp" or "rsync" —
/// NOTE(review): exact values are set by the producer; confirm there).
pub source: String,
}
impl RepoSyncResultRef {
// Convenience: delegate to the key's identity view.
pub fn as_identity(&self) -> RepoIdentity {
self.repo_key.as_identity()
}
}
/// Bookkeeping entry for one repository currently tracked by the
/// scheduler: its state, the pending task, accumulated waiters, and the
/// most recent result/error with timing markers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InFlightRepoEntry {
pub state: RepoTaskState,
/// The task to run (or being run), when one exists.
pub task_ref: Option<RepoSyncTask>,
/// Requesters that arrived while the task was already in flight.
pub waiting_requesters: Vec<RepoRequester>,
pub result_ref: Option<RepoSyncResultRef>,
pub last_result: Option<RepoSyncResultEnvelope>,
pub last_error: Option<String>,
pub started_at: Option<time::OffsetDateTime>,
pub finished_at: Option<time::OffsetDateTime>,
}
/// Final result of a repository sync, with context, timing, and warnings.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoSyncResultEnvelope {
pub repo_key: RepoKey,
pub tal_id: String,
pub rir_id: String,
pub result: RepoSyncResultKind,
/// Optional label of the phase the result was produced in.
pub phase: Option<String>,
/// Wall-clock duration of the sync, in milliseconds.
pub timing_ms: u64,
pub warnings: Vec<Warning>,
}
/// Outcome of a repository sync.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RepoSyncResultKind {
/// Fresh data was fetched.
Success(RepoSyncResultRef),
Failed { detail: String },
/// An earlier result for the same repository was reused.
Reused(RepoSyncResultRef),
}
/// Derive a short TAL identifier from a URL-like string.
///
/// Uses the stem (name without extension) of the last non-empty path
/// segment; falls back to the URL host, and finally to `"unknown-tal"`
/// when the string does not parse as a URL or yields nothing usable.
fn derive_tal_id_from_url_like(s: &str) -> String {
    let parsed = match url::Url::parse(s) {
        Ok(parsed) => parsed,
        Err(_) => return "unknown-tal".to_string(),
    };
    let last_segment = parsed
        .path_segments()
        .and_then(|segments| segments.filter(|seg| !seg.is_empty()).next_back());
    if let Some(segment) = last_segment {
        // Drop everything after the final '.' (the extension), if any.
        let stem = match segment.rsplit_once('.') {
            Some((stem, _ext)) => stem,
            None => segment,
        };
        let stem = stem.trim();
        if !stem.is_empty() {
            return stem.to_string();
        }
    }
    match parsed.host_str() {
        Some(host) => host.to_string(),
        None => "unknown-tal".to_string(),
    }
}
/// Derive a short TAL identifier from a filesystem path: the trimmed
/// UTF-8 file stem, or `"unknown-tal"` when the stem is missing,
/// non-UTF-8, or blank after trimming.
fn derive_tal_id_from_path(path: &Path) -> String {
    match path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) if !stem.trim().is_empty() => stem.trim().to_string(),
        _ => "unknown-tal".to_string(),
    }
}
// Unit tests for the parallel-sync data types: id derivation, key
// equality, and the sync-task → transport-task projection.
#[cfg(test)]
mod tests {
use std::path::Path;
use crate::policy::SyncPreference;
use crate::report::Warning;
use super::{
RepoDedupKey, RepoIdentity, RepoKey, RepoRequester, RepoRuntimeState, RepoSyncTask,
RepoTaskState, RepoTransportMode, RepoTransportResultEnvelope, RepoTransportResultKind,
TalInputSpec, TalSource, derive_tal_id_from_path, derive_tal_id_from_url_like,
};
// The URL constructor derives both ids from the URL's file stem.
#[test]
fn tal_input_spec_from_url_derives_tal_and_rir_ids() {
let spec = TalInputSpec::from_url("https://example.test/tals/apnic.tal");
assert_eq!(spec.tal_id, "apnic");
assert_eq!(spec.rir_id, "apnic");
assert_eq!(
spec.source,
TalSource::Url("https://example.test/tals/apnic.tal".to_string())
);
}
// The file-path constructor derives the id from the file stem.
#[test]
fn tal_input_spec_from_file_path_derives_file_stem() {
let spec = TalInputSpec::from_file_path("local/arin.tal");
assert_eq!(spec.tal_id, "arin");
assert_eq!(spec.rir_id, "arin");
}
// In-memory construction keeps the TAL and TA byte payloads intact.
#[test]
fn tal_input_spec_from_ta_der_preserves_payload() {
let spec = TalInputSpec::from_ta_der(
"https://example.test/ripe.tal",
vec![4, 5, 6],
vec![1, 2, 3],
);
assert_eq!(spec.tal_id, "ripe");
assert_eq!(spec.rir_id, "ripe");
assert_eq!(
spec.source,
TalSource::DerBytes {
tal_url: "https://example.test/ripe.tal".to_string(),
tal_bytes: vec![4, 5, 6],
ta_der: vec![1, 2, 3],
}
);
}
// RepoKey equality must consider both fields, so RRDP-aware and
// rsync-only keys for the same base URI stay distinct.
#[test]
fn repo_key_equality_uses_rsync_base_and_notification() {
let a = RepoKey::new(
"rsync://example.test/repo/",
Some("https://example.test/notify.xml".to_string()),
);
let b = RepoKey::new(
"rsync://example.test/repo/",
Some("https://example.test/notify.xml".to_string()),
);
let c = RepoKey::new("rsync://example.test/repo/", None);
assert_eq!(a, b);
assert_ne!(a, c);
}
// Sanity check on derived PartialEq for the task-state enum.
#[test]
fn repo_task_state_variants_are_distinct() {
assert_ne!(RepoTaskState::Pending, RepoTaskState::Running);
assert_ne!(RepoTaskState::Succeeded, RepoTaskState::Failed);
assert_ne!(RepoTaskState::Failed, RepoTaskState::Reused);
}
// RepoIdentity::new stores inputs without normalization.
#[test]
fn repo_identity_preserves_raw_inputs() {
let ident = RepoIdentity::new(
Some("https://example.test/notify.xml".to_string()),
"rsync://example.test/repo/",
);
assert_eq!(
ident.notification_uri.as_deref(),
Some("https://example.test/notify.xml")
);
assert_eq!(ident.rsync_base_uri, "rsync://example.test/repo/");
}
// RepoKey::as_identity must carry both fields across unchanged.
#[test]
fn repo_key_can_be_viewed_as_repo_identity() {
let key = RepoKey::new(
"rsync://example.test/repo/",
Some("https://example.test/notify.xml".to_string()),
);
let ident = key.as_identity();
assert_eq!(ident.rsync_base_uri, "rsync://example.test/repo/");
assert_eq!(
ident.notification_uri.as_deref(),
Some("https://example.test/notify.xml")
);
}
// Projecting a sync task into an RRDP transport task preserves ids,
// requesters, and the repo identity.
#[test]
fn repo_sync_task_maps_to_rrdp_transport_task() {
let task = RepoSyncTask {
repo_key: RepoKey::new(
"rsync://example.test/repo/",
Some("https://example.test/notify.xml".to_string()),
),
validation_time: time::OffsetDateTime::UNIX_EPOCH,
sync_preference: SyncPreference::RrdpThenRsync,
tal_id: "apnic".to_string(),
rir_id: "apnic".to_string(),
priority: 1,
requesters: vec![RepoRequester::with_tal_rir(
"apnic",
"apnic",
"rsync://example.test/repo/root.mft",
"rsync://example.test/repo/",
"node:1",
)],
};
let transport = task.as_transport_task(
RepoDedupKey::RrdpNotify {
notification_uri: "https://example.test/notify.xml".to_string(),
},
RepoTransportMode::Rrdp,
);
assert_eq!(transport.mode, RepoTransportMode::Rrdp);
assert_eq!(transport.tal_id, "apnic");
assert_eq!(transport.rir_id, "apnic");
assert_eq!(transport.requesters.len(), 1);
assert_eq!(
transport.repo_identity.notification_uri.as_deref(),
Some("https://example.test/notify.xml")
);
}
// The envelope type can represent both success and failure payloads.
#[test]
fn repo_transport_result_envelope_supports_success_and_failure_shapes() {
let identity = RepoIdentity::new(None, "rsync://example.test/repo/");
let ok = RepoTransportResultEnvelope {
dedup_key: RepoDedupKey::RsyncScope {
rsync_scope_uri: "rsync://example.test/module/".to_string(),
},
repo_identity: identity.clone(),
mode: RepoTransportMode::Rsync,
tal_id: "arin".to_string(),
rir_id: "arin".to_string(),
timing_ms: 12,
result: RepoTransportResultKind::Success {
source: "rsync".to_string(),
warnings: vec![Warning::new("ok")],
},
};
let fail = RepoTransportResultEnvelope {
dedup_key: RepoDedupKey::RsyncScope {
rsync_scope_uri: "rsync://example.test/module/".to_string(),
},
repo_identity: identity,
mode: RepoTransportMode::Rsync,
tal_id: "arin".to_string(),
rir_id: "arin".to_string(),
timing_ms: 30,
result: RepoTransportResultKind::Failed {
detail: "timeout".to_string(),
warnings: vec![Warning::new("timeout")],
},
};
assert!(matches!(ok.result, RepoTransportResultKind::Success { .. }));
assert!(matches!(
fail.result,
RepoTransportResultKind::Failed { .. }
));
}
// Sanity check on derived PartialEq for the runtime-state enum.
#[test]
fn repo_runtime_state_variants_are_distinct() {
assert_ne!(RepoRuntimeState::Init, RepoRuntimeState::WaitingRrdp);
assert_ne!(RepoRuntimeState::RrdpOk, RepoRuntimeState::RsyncOk);
assert_ne!(
RepoRuntimeState::RrdpFailedPendingRsync,
RepoRuntimeState::FailedTerminal
);
}
// Both id-derivation helpers handle ordinary inputs correctly.
#[test]
fn derive_tal_id_helpers_fall_back_safely() {
assert_eq!(
derive_tal_id_from_url_like("https://example.test/path/afrinic.tal"),
"afrinic"
);
assert_eq!(
derive_tal_id_from_path(Path::new("foo/lacnic.tal")),
"lacnic"
);
}
}