181 lines
5.8 KiB
Rust

use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;
use crate::ccr::{decode_ccr_compare_views, decode_content_info};
use crate::cir::decode_cir;
use super::io::{object_hash_key, parse_rfc3339, read_file, resolve_path};
use super::model::{SequenceItemRaw, SequenceMeta, SequenceSample, Side};
pub(super) fn load_sequence_meta(path: &Path, side: Side) -> Result<Vec<SequenceMeta>, String> {
let base_dir = path.parent().unwrap_or_else(|| Path::new("."));
let text = std::fs::read_to_string(path)
.map_err(|e| format!("read sequence failed: {}: {e}", path.display()))?;
let mut samples = Vec::new();
let mut seen_seq = BTreeSet::new();
for (line_index, line) in text.lines().enumerate() {
let line = line.trim();
if line.is_empty() {
continue;
}
let raw: SequenceItemRaw = serde_json::from_str(line).map_err(|e| {
format!(
"parse sequence JSONL failed: {}:{}: {e}",
path.display(),
line_index + 1
)
})?;
validate_raw(path, line_index, &raw, side, &mut seen_seq)?;
let cir_path = resolve_path(base_dir, &raw.cir_path);
let ccr_path = resolve_path(base_dir, &raw.ccr_path);
let cir = decode_cir(&read_file(&cir_path)?).map_err(|e| {
format!(
"decode CIR metadata failed for sample {} ({}): {e}",
raw.run_id,
cir_path.display()
)
})?;
let _sequence_validation_time = raw
.validation_time
.as_deref()
.map(parse_rfc3339)
.transpose()?;
let validation_time = cir.validation_time;
samples.push(SequenceMeta {
cir_object_count: raw
.cir_object_count
.or(Some(cir.validated_object_count() as u64)),
cir_reject_count: raw
.cir_reject_count
.or(Some(cir.rejected_object_count() as u64)),
cir_trust_anchor_count: raw
.cir_trust_anchor_count
.or(Some(cir.trust_anchors.len() as u64)),
raw,
validation_time,
ccr_path,
cir_path,
});
}
samples.sort_by(|left, right| {
left.validation_time
.cmp(&right.validation_time)
.then_with(|| left.raw.seq.cmp(&right.raw.seq))
.then_with(|| left.raw.run_id.cmp(&right.raw.run_id))
});
Ok(samples)
}
/// Builds a [`SequenceSample`] for `meta` from its CIR file only.
///
/// Delegates to `load_sample_parts` with the CIR part enabled and the CCR part disabled, so
/// the CCR-derived `vrps` and `vaps` sets of the result are empty.
///
/// # Errors
///
/// Propagates any error message returned by `load_sample_parts`.
pub(super) fn load_sample_cir_from_meta(meta: &SequenceMeta) -> Result<SequenceSample, String> {
    const INCLUDE_CIR: bool = true;
    const INCLUDE_CCR: bool = false;
    load_sample_parts(meta, INCLUDE_CIR, INCLUDE_CCR)
}
/// Builds a [`SequenceSample`] for `meta` from its CCR file only.
///
/// Delegates to `load_sample_parts` with the CCR part enabled and the CIR part disabled, so
/// the CIR-derived `objects`, `object_hashes` and `rejects` collections of the result are
/// empty.
///
/// # Errors
///
/// Propagates any error message returned by `load_sample_parts`.
pub(super) fn load_sample_ccr_from_meta(meta: &SequenceMeta) -> Result<SequenceSample, String> {
    const INCLUDE_CIR: bool = false;
    const INCLUDE_CCR: bool = true;
    load_sample_parts(meta, INCLUDE_CIR, INCLUDE_CCR)
}
/// Builds a [`SequenceSample`] for `meta`, decoding only the requested artifacts.
///
/// * `include_cir` — read and decode `meta.cir_path`, filling:
///   - `objects`: validated object `rsync_uri` → hex-encoded `sha256`,
///   - `object_hashes`: one `object_hash_key(uri, hash)` entry per `objects` pair,
///   - `rejects`: the `object_uri` of every rejected object.
/// * `include_ccr` — read and decode `meta.ccr_path`, filling:
///   - `vrps`: rows rendered as `asn|ip_prefix|max_length`,
///   - `vaps`: rows rendered as `customer_asn|providers`.
///
/// Collections belonging to a part that was not requested are left empty. `raw` is cloned
/// from `meta` and `validation_time` is copied from it.
///
/// # Errors
///
/// Returns a message string when a requested file cannot be read or decoded.
fn load_sample_parts(
    meta: &SequenceMeta,
    include_cir: bool,
    include_ccr: bool,
) -> Result<SequenceSample, String> {
    // The CIR is handled before the CCR so that, when both are requested and both are
    // broken, the CIR error is the one reported.
    let (objects, object_hashes, rejects) = if include_cir {
        let cir_bytes = read_file(&meta.cir_path)?;
        let cir = decode_cir(&cir_bytes).map_err(|e| {
            format!(
                "decode CIR failed for sample {} ({}): {e}",
                meta.raw.run_id,
                meta.cir_path.display()
            )
        })?;

        let by_uri: BTreeMap<_, _> = cir
            .validated_objects()
            .map(|item| (item.rsync_uri.clone(), hex::encode(&item.sha256)))
            .collect();
        // Derived from the map (not the raw object list), so it holds one key per
        // distinct URI.
        let hash_keys: BTreeSet<_> = by_uri
            .iter()
            .map(|(uri, hash)| object_hash_key(uri, hash))
            .collect();
        let rejected: BTreeSet<_> = cir
            .rejected_objects()
            .map(|item| item.object_uri.clone())
            .collect();

        (by_uri, hash_keys, rejected)
    } else {
        (BTreeMap::new(), BTreeSet::new(), BTreeSet::new())
    };

    let (vrps, vaps) = if include_ccr {
        let ccr_bytes = read_file(&meta.ccr_path)?;
        let ccr = decode_content_info(&ccr_bytes).map_err(|e| {
            format!(
                "decode CCR failed for sample {} ({}): {e}",
                meta.raw.run_id,
                meta.ccr_path.display()
            )
        })?;
        let (vrp_rows, vap_rows) = decode_ccr_compare_views(&ccr).map_err(|e| {
            format!(
                "decode CCR compare views failed for sample {} ({}): {e}",
                meta.raw.run_id,
                meta.ccr_path.display()
            )
        })?;

        let vrp_keys: BTreeSet<_> = vrp_rows
            .into_iter()
            .map(|row| format!("{}|{}|{}", row.asn, row.ip_prefix, row.max_length))
            .collect();
        let vap_keys: BTreeSet<_> = vap_rows
            .into_iter()
            .map(|row| format!("{}|{}", row.customer_asn, row.providers))
            .collect();

        (vrp_keys, vap_keys)
    } else {
        (BTreeSet::new(), BTreeSet::new())
    };

    Ok(SequenceSample {
        raw: meta.raw.clone(),
        validation_time: meta.validation_time,
        objects,
        object_hashes,
        rejects,
        vrps,
        vaps,
    })
}
fn validate_raw(
path: &Path,
line_index: usize,
raw: &SequenceItemRaw,
side: Side,
seen_seq: &mut BTreeSet<u32>,
) -> Result<(), String> {
if raw.schema_version.unwrap_or(1) != 1 {
return Err(format!(
"unsupported sequence item schemaVersion in {}:{}",
path.display(),
line_index + 1
));
}
if !seen_seq.insert(raw.seq) {
return Err(format!("duplicate seq {} in {}", raw.seq, path.display()));
}
if let Some(status) = &raw.status
&& status != "success"
{
return Err(format!(
"sequence sample {} has non-success status: {status}",
raw.run_id
));
}
if raw
.side
.as_deref()
.is_some_and(|item| item != side.as_str())
{
return Err(format!(
"sequence side field does not match expected side in {}:{}",
path.display(),
line_index + 1
));
}
Ok(())
}