rpki/src/source/pipeline.rs
2026-04-01 16:24:01 +08:00

139 lines
4.3 KiB
Rust

use anyhow::{Result, anyhow};
use std::path::PathBuf;
use tracing::{info, warn};
use crate::rtr::payload::Payload;
use crate::slurm::file::SlurmFile;
use crate::source::ccr::{
find_latest_ccr_file, load_ccr_payloads_from_file_with_options, load_ccr_snapshot_from_file,
};
#[derive(Debug, Clone)]
pub struct PayloadLoadConfig {
pub ccr_dir: String,
pub slurm_dir: Option<String>,
pub strict_ccr_validation: bool,
}
pub fn load_payloads_from_latest_sources(config: &PayloadLoadConfig) -> Result<Vec<Payload>> {
let payloads = load_payloads_from_latest_ccr(&config.ccr_dir, config.strict_ccr_validation)?;
match config.slurm_dir.as_deref() {
Some(dir) => apply_slurm_to_payloads_from_dir(dir, payloads),
None => Ok(payloads),
}
}
fn load_payloads_from_latest_ccr(
ccr_dir: &str,
strict_ccr_validation: bool,
) -> Result<Vec<Payload>> {
let latest = find_latest_ccr_file(ccr_dir)?;
let snapshot = load_ccr_snapshot_from_file(&latest)?;
let vrp_count = snapshot.vrps.len();
let vap_count = snapshot.vaps.len();
let produced_at = snapshot.produced_at.clone();
let conversion = load_ccr_payloads_from_file_with_options(&latest, strict_ccr_validation)?;
let payloads = conversion.payloads;
if !conversion.invalid_vrps.is_empty() {
warn!(
"CCR load skipped invalid VRPs: file={}, skipped={}, samples={:?}",
latest.display(),
conversion.invalid_vrps.len(),
sample_messages(&conversion.invalid_vrps)
);
}
if !conversion.invalid_vaps.is_empty() {
warn!(
"CCR load skipped invalid VAPs/ASPAs: file={}, skipped={}, samples={:?}",
latest.display(),
conversion.invalid_vaps.len(),
sample_messages(&conversion.invalid_vaps)
);
}
info!(
"loaded latest CCR snapshot: file={}, produced_at={:?}, vrp_count={}, vap_count={}, payload_count={}, strict_ccr_validation={}",
latest.display(),
produced_at,
vrp_count,
vap_count,
payloads.len(),
strict_ccr_validation
);
Ok(payloads)
}
fn apply_slurm_to_payloads_from_dir(
slurm_dir: &str,
payloads: Vec<Payload>,
) -> Result<Vec<Payload>> {
let files = read_slurm_files(slurm_dir)?;
let file_count = files.len();
let file_names = files
.iter()
.map(|(name, _)| name.clone())
.collect::<Vec<_>>();
let slurm = SlurmFile::merge_named(files)
.map_err(|err| anyhow!("failed to merge SLURM files from '{}': {}", slurm_dir, err))?;
let input_count = payloads.len();
let filtered = slurm.apply(&payloads);
let output_count = filtered.len();
info!(
"applied SLURM policy set: slurm_dir={}, file_count={}, files={:?}, merged_slurm_version={}, input_payload_count={}, output_payload_count={}",
slurm_dir,
file_count,
file_names,
slurm.version().as_u32(),
input_count,
output_count
);
Ok(filtered)
}
fn read_slurm_files(slurm_dir: &str) -> Result<Vec<(String, SlurmFile)>> {
let mut paths = std::fs::read_dir(slurm_dir)
.map_err(|err| anyhow!("failed to read SLURM directory '{}': {}", slurm_dir, err))?
.filter_map(|entry| entry.ok())
.map(|entry| entry.path())
.filter(|path| path.is_file())
.filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("slurm"))
.collect::<Vec<PathBuf>>();
paths.sort_by_key(|path| {
path.file_name()
.and_then(|name| name.to_str())
.map(|name| name.to_ascii_lowercase())
.unwrap_or_default()
});
if paths.is_empty() {
return Err(anyhow!(
"SLURM directory '{}' does not contain .slurm files",
slurm_dir
));
}
paths
.into_iter()
.map(|path| {
let name = path.to_string_lossy().to_string();
let file = std::fs::File::open(&path)
.map_err(|err| anyhow!("failed to open SLURM file '{}': {}", name, err))?;
let slurm = SlurmFile::from_reader(file)
.map_err(|err| anyhow!("failed to parse SLURM file '{}': {}", name, err))?;
Ok((name, slurm))
})
.collect()
}
fn sample_messages(messages: &[String]) -> Vec<&str> {
messages.iter().take(3).map(String::as_str).collect()
}