// rpki/src/query_db.rs (1993 lines, 66 KiB, Rust)

use std::collections::{BTreeMap, BTreeSet};
use std::fs;
use std::path::{Path, PathBuf};
use rocksdb::{ColumnFamilyDescriptor, DB, IteratorMode, Options, WriteBatch};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sha2::{Digest, Sha256};
use crate::query::artifact_manifest::build_artifact_manifest;
use crate::query::report_stream::{self, ObjectLookup, ObjectScope, ReportSummary};
use crate::blob_store::ExternalRepoBytesDb;
/// Schema version written to the meta column family on open and stored in run records.
pub const QUERY_DB_SCHEMA_VERSION: u32 = 1;
// Column family names. Keys inside each family are prefixed strings (see the `*_key` helpers).
/// Database-level metadata (schema version, latest ready run pointer).
pub const CF_META: &str = "meta";
/// Run records, keyed `run/{run_id}`.
pub const CF_RUNS: &str = "runs";
/// Run ids keyed by zero-padded sequence number, `seq/{seq}`.
pub const CF_RUNS_BY_SEQ: &str = "runs_by_seq";
/// Repository records, keyed `repo/{run_id}/{repo_id}`.
pub const CF_REPOS: &str = "repos";
/// Publication point records, keyed `pp/{run_id}/{pp_id}`.
pub const CF_PUBLICATION_POINTS: &str = "publication_points";
/// Lazily cached object instance records, keyed under `objinst/{run_id}/`.
pub const CF_OBJECT_INSTANCES: &str = "object_instances";
/// Lazily cached URI -> object index records, keyed under `objuri/{run_id}/`.
pub const CF_OBJECTS_BY_URI: &str = "objects_by_uri";
/// Object projections keyed by content hash.
pub const CF_OBJECTS_BY_HASH: &str = "objects_by_hash";
/// Cached validation explanations, keyed under `explain/{run_id}/`.
pub const CF_VALIDATION_EXPLAIN_CACHE: &str = "validation_explain_cache";
/// Export job records, keyed `export/{run_id}/{job_id}`.
pub const CF_EXPORT_JOBS: &str = "export_jobs";
/// Statistics records, keyed `stats/{run_id}/{scope}/{name}`.
pub const CF_STATS: &str = "stats";
/// Per-run reason index, keyed under `reason/{run_id}/`.
pub const CF_REASON_INDEX: &str = "reason_index";
/// Every column family the query database opens; used to build the descriptors on open.
pub const QUERY_DB_COLUMN_FAMILIES: &[&str] = &[
CF_META,
CF_RUNS,
CF_RUNS_BY_SEQ,
CF_REPOS,
CF_PUBLICATION_POINTS,
CF_OBJECT_INSTANCES,
CF_OBJECTS_BY_URI,
CF_OBJECTS_BY_HASH,
CF_VALIDATION_EXPLAIN_CACHE,
CF_EXPORT_JOBS,
CF_STATS,
CF_REASON_INDEX,
];
// Keys stored directly in `CF_META`.
const KEY_SCHEMA_VERSION: &[u8] = b"schema_version";
const KEY_LATEST_READY_RUN: &[u8] = b"latest_ready_run";
/// Errors produced by the query database and the artifact indexer.
///
/// Underlying library errors are stored as their rendered message strings
/// (see the `From` impls in this file).
#[derive(Debug, thiserror::Error)]
pub enum QueryDbError {
/// A RocksDB operation failed.
#[error("rocksdb error: {0}")]
RocksDb(String),
/// A filesystem operation failed.
#[error("io error: {0}")]
Io(String),
/// JSON serialization or deserialization failed.
#[error("json error: {0}")]
Json(String),
/// A column family expected by this module is not present in the open database.
#[error("missing column family: {0}")]
MissingColumnFamily(&'static str),
/// A run directory or indexer configuration is unusable (e.g. missing `report.json`).
#[error("invalid run artifact: {0}")]
InvalidArtifact(String),
/// CIR decoding failed. Not constructed in the visible part of this file.
#[error("CIR decode error: {0}")]
CirDecode(String),
}
/// Result alias used throughout the query database module.
pub type QueryDbResult<T> = Result<T, QueryDbError>;
impl From<rocksdb::Error> for QueryDbError {
fn from(value: rocksdb::Error) -> Self {
Self::RocksDb(value.to_string())
}
}
impl From<std::io::Error> for QueryDbError {
fn from(value: std::io::Error) -> Self {
Self::Io(value.to_string())
}
}
impl From<serde_json::Error> for QueryDbError {
fn from(value: serde_json::Error) -> Self {
Self::Json(value.to_string())
}
}
/// Aggregate outcome of one indexing pass over one or more run directories.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueryIndexSummary {
/// Number of run directories that were (re)indexed in this pass.
pub runs_indexed: u64,
/// Number of runs removed by retention enforcement.
pub runs_deleted: u64,
/// Number of ready runs remaining after retention ran (left at 0 when retention did not run).
pub retained_runs: u64,
pub repos_indexed: u64,
pub publication_points_indexed: u64,
pub object_instances_indexed: u64,
pub object_projections_indexed: u64,
pub stats_indexed: u64,
/// Latest ready run as reported by the last successfully processed run directory.
pub latest_ready_run: Option<String>,
/// One `"{run_dir}: {error}"` entry per run directory that failed to index.
pub errors: Vec<String>,
}
/// One page of results from a cursor-paginated listing.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueryPage<T> {
/// Items on this page, at most `limit` of them.
pub data: Vec<T>,
/// Cursor to pass back to fetch the next page; `None` when there are no more items.
pub next_cursor: Option<String>,
/// Effective page size after clamping the requested limit.
pub limit: usize,
}
/// Per-run counters stored inside a `RunRecord`.
///
/// NOTE(review): `build_run_record` in this file fills the fresh/cached and
/// `trust_anchors` counters with 0; confirm whether another writer populates them.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RunCounts {
pub publication_points: u64,
pub objects: u64,
pub fresh_objects: u64,
pub cached_objects: u64,
pub rejected_objects: u64,
pub fresh_rejected_objects: u64,
pub cached_rejected_objects: u64,
pub trust_anchors: u64,
pub vrps: u64,
pub aspas: u64,
pub warnings: u64,
}
/// Index entry describing one validation run, stored in `CF_RUNS` under `run/{run_id}`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RunRecord {
/// Schema version the record was written with (`QUERY_DB_SCHEMA_VERSION`).
pub schema_version: u32,
pub run_id: String,
/// Sequence number used for ordering, retention and the "latest" pointer; may be absent.
pub run_seq: Option<u64>,
/// Run directory path as a display string; `report.json` is resolved relative to it.
pub run_dir: String,
pub validation_time: Option<String>,
pub sync_mode: Option<String>,
pub started_at: Option<String>,
pub finished_at: Option<String>,
pub wall_ms: Option<u64>,
pub artifact_paths: BTreeMap<String, String>,
pub counts: RunCounts,
/// Indexing state; this file writes `"building"`, `"failed"` and `"ready"`.
pub index_status: String,
/// Error message recorded when indexing failed.
pub index_error: Option<String>,
}
/// Per-run repository summary, read from `CF_REPOS` under `repo/{run_id}/{repo_id}`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RepositoryRecord {
pub schema_version: u32,
pub run_id: String,
pub repo_id: String,
pub uri: String,
pub host: String,
pub transport: String,
pub publication_points: u64,
pub objects: u64,
pub rejected_objects: u64,
pub download_bytes: Option<u64>,
pub sync_duration_ms_total: u64,
// NOTE(review): presumably counts keyed by sync phase / terminal state name; confirm with the producer.
pub phases: BTreeMap<String, u64>,
pub terminal_states: BTreeMap<String, u64>,
}
/// Per-run publication point summary, read from `CF_PUBLICATION_POINTS` under
/// `pp/{run_id}/{pp_id}`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PublicationPointRecord {
pub schema_version: u32,
pub run_id: String,
pub pp_id: String,
/// Repository this publication point belongs to; used to filter listings by repo.
pub repo_id: String,
pub node_id: Option<u64>,
pub parent_node_id: Option<u64>,
pub rsync_base_uri: Option<String>,
pub manifest_rsync_uri: Option<String>,
pub publication_point_rsync_uri: Option<String>,
pub rrdp_notification_uri: Option<String>,
pub source: Option<String>,
pub repo_sync_source: Option<String>,
pub repo_sync_phase: Option<String>,
pub repo_sync_duration_ms: Option<u64>,
pub repo_sync_error: Option<String>,
pub repo_terminal_state: Option<String>,
pub this_update: Option<String>,
pub next_update: Option<String>,
pub verified_at: Option<String>,
pub objects: u64,
/// Summed across publication points to produce the run-level rejected object count.
pub rejected_objects: u64,
pub warnings: u64,
}
/// One object as observed in a specific run.
///
/// Records are cached lazily in `CF_OBJECT_INSTANCES` under
/// `objinst/{run_id}/{uri_hash}/{sha256}/{object_instance_id}`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ObjectInstanceRecord {
pub schema_version: u32,
pub run_id: String,
pub object_instance_id: String,
pub uri: String,
pub uri_hash: String,
pub sha256: String,
pub object_type: String,
pub result: String,
pub detail_summary: Option<String>,
pub repo_id: String,
pub pp_id: String,
pub source_section: String,
pub rejected: bool,
pub reject_reason: Option<String>,
}
/// URI lookup entry derived from an `ObjectInstanceRecord`, cached in `CF_OBJECTS_BY_URI`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ObjectUriIndexRecord {
pub run_id: String,
pub uri: String,
pub sha256: String,
pub object_instance_id: String,
pub repo_id: String,
pub pp_id: String,
}
/// A named statistic for a run, stored in `CF_STATS` under `stats/{run_id}/{scope}/{name}`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StatsRecord {
pub schema_version: u32,
pub run_id: String,
pub scope: String,
pub name: String,
/// Arbitrary JSON payload of the statistic.
pub value: Value,
}
/// State of an export job, stored in `CF_EXPORT_JOBS` keyed by run id and job id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExportJobRecord {
pub schema_version: u32,
pub job_id: String,
pub run_id: String,
pub scope: String,
// NOTE(review): presumably set only when `scope` narrows the export to a repo / publication point.
pub repo_id: Option<String>,
pub pp_id: Option<String>,
pub status: String,
pub created_at: String,
pub finished_at: Option<String>,
pub output_path: Option<String>,
pub object_count: u64,
pub bytes_written: u64,
pub error: Option<String>,
}
/// One edge between two object URIs, carried in `ValidationExplainRecord::chain_edges`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ChainEdgeRecord {
pub relation: String,
pub from_uri: String,
pub to_uri: String,
pub to_object_instance_id: Option<String>,
pub to_sha256: Option<String>,
pub status: String,
/// Free-form JSON evidence attached to the edge.
pub evidence: Value,
}
/// Cached validation explanation for one object instance.
///
/// Stored in `CF_VALIDATION_EXPLAIN_CACHE`, keyed by run id, object instance id and
/// `explain_version`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ValidationExplainRecord {
pub schema_version: u32,
/// Part of the cache key, so different explain versions are cached side by side.
pub explain_version: u32,
pub run_id: String,
pub object_instance_id: String,
pub uri: String,
pub sha256: String,
pub object_type: String,
pub final_status: String,
pub audit_result: String,
pub detail_summary: Option<String>,
pub authoritative: bool,
pub explain_mode: String,
pub generated_at: String,
pub parsevalidate: Value,
pub chainvalidate: Value,
pub chain_edges: Vec<ChainEdgeRecord>,
}
/// Handle to the RocksDB-backed query index.
pub struct QueryDb {
// Underlying RocksDB instance with all `QUERY_DB_COLUMN_FAMILIES` opened.
db: DB,
// True when opened via `open_secondary`; cache-style writers become no-ops in that mode.
secondary: bool,
}
impl QueryDb {
/// Opens the primary (writable) query database at `path`, creating the database and
/// any missing column families.
///
/// The current `QUERY_DB_SCHEMA_VERSION` is written to the meta column family on
/// every open.
pub fn open(path: impl AsRef<Path>) -> QueryDbResult<Self> {
    let mut db_options = Options::default();
    db_options.create_if_missing(true);
    db_options.create_missing_column_families(true);
    db_options.set_compression_type(rocksdb::DBCompressionType::Lz4);

    let mut cf_descriptors = Vec::with_capacity(QUERY_DB_COLUMN_FAMILIES.len());
    for cf_name in QUERY_DB_COLUMN_FAMILIES {
        cf_descriptors.push(ColumnFamilyDescriptor::new(*cf_name, cf_options()));
    }

    let handle = Self {
        db: DB::open_cf_descriptors(&db_options, path, cf_descriptors)?,
        secondary: false,
    };
    handle.put_json_cf(CF_META, KEY_SCHEMA_VERSION, &QUERY_DB_SCHEMA_VERSION)?;
    Ok(handle)
}
/// Opens a RocksDB secondary instance that follows the primary at `primary_path`,
/// using `secondary_path` for the secondary's own files.
///
/// Nothing is created: the primary database and all column families must already exist.
pub fn open_secondary(
    primary_path: impl AsRef<Path>,
    secondary_path: impl AsRef<Path>,
) -> QueryDbResult<Self> {
    let mut db_options = Options::default();
    db_options.create_if_missing(false);
    db_options.create_missing_column_families(false);
    db_options.set_compression_type(rocksdb::DBCompressionType::Lz4);

    let mut cf_descriptors = Vec::with_capacity(QUERY_DB_COLUMN_FAMILIES.len());
    for cf_name in QUERY_DB_COLUMN_FAMILIES {
        cf_descriptors.push(ColumnFamilyDescriptor::new(*cf_name, cf_options()));
    }

    let follower = DB::open_cf_descriptors_as_secondary(
        &db_options,
        primary_path.as_ref(),
        secondary_path.as_ref(),
        cf_descriptors,
    )?;
    Ok(Self {
        db: follower,
        secondary: true,
    })
}
pub fn try_catch_up_with_primary(&self) -> QueryDbResult<()> {
if self.secondary {
self.db.try_catch_up_with_primary()?;
}
Ok(())
}
pub fn is_secondary(&self) -> bool {
self.secondary
}
/// Returns the run id stored under the "latest ready run" meta key, if any.
pub fn latest_ready_run(&self) -> QueryDbResult<Option<String>> {
    match self.get_cf(CF_META, KEY_LATEST_READY_RUN)? {
        Some(raw) => Ok(serde_json::from_slice(&raw)?),
        None => Ok(None),
    }
}
/// Fetches the run record for `run_id`, or `None` when the run is not indexed.
pub fn get_run(&self, run_id: &str) -> QueryDbResult<Option<RunRecord>> {
    let key = run_key(run_id);
    self.get_json_cf(CF_RUNS, key.as_bytes())
}
/// Resolves the aliases `"latest"` / `"latest_run"` to the latest ready run id.
///
/// Any other value is returned unchanged, without checking that the run exists.
pub fn resolve_run_id(&self, run_id: &str) -> QueryDbResult<Option<String>> {
    match run_id {
        "latest" | "latest_run" => self.latest_ready_run(),
        explicit => Ok(Some(explicit.to_string())),
    }
}
/// Lists run records in key order, one page at a time.
pub fn list_runs(
    &self,
    limit: usize,
    cursor: Option<&str>,
) -> QueryDbResult<QueryPage<RunRecord>> {
    let prefix = "run/";
    self.list_json_by_prefix(CF_RUNS, prefix, limit, cursor)
}
/// Lists the repository records of `run_id`, one page at a time.
pub fn list_repos(
    &self,
    run_id: &str,
    limit: usize,
    cursor: Option<&str>,
) -> QueryDbResult<QueryPage<RepositoryRecord>> {
    let prefix = format!("repo/{run_id}/");
    self.list_json_by_prefix(CF_REPOS, prefix.as_str(), limit, cursor)
}
/// Fetches one repository record of a run, or `None` when it is not indexed.
pub fn get_repo(&self, run_id: &str, repo_id: &str) -> QueryDbResult<Option<RepositoryRecord>> {
    let key = repo_key(run_id, repo_id);
    self.get_json_cf(CF_REPOS, key.as_bytes())
}
/// Lists the publication point records of `run_id`, one page at a time.
pub fn list_publication_points(
    &self,
    run_id: &str,
    limit: usize,
    cursor: Option<&str>,
) -> QueryDbResult<QueryPage<PublicationPointRecord>> {
    let prefix = format!("pp/{run_id}/");
    self.list_json_by_prefix(CF_PUBLICATION_POINTS, prefix.as_str(), limit, cursor)
}
/// Lists the publication points of `run_id` that belong to `repo_id`.
///
/// All publication points of the run are scanned and filtered on their `repo_id` field.
pub fn list_publication_points_for_repo(
    &self,
    run_id: &str,
    repo_id: &str,
    limit: usize,
    cursor: Option<&str>,
) -> QueryDbResult<QueryPage<PublicationPointRecord>> {
    let prefix = format!("pp/{run_id}/");
    let belongs_to_repo = |pp: &PublicationPointRecord| pp.repo_id == repo_id;
    self.list_json_by_prefix_filtered(
        CF_PUBLICATION_POINTS,
        prefix.as_str(),
        limit,
        cursor,
        belongs_to_repo,
    )
}
/// Fetches one publication point record of a run, or `None` when it is not indexed.
pub fn get_publication_point(
    &self,
    run_id: &str,
    pp_id: &str,
) -> QueryDbResult<Option<PublicationPointRecord>> {
    let key = pp_key(run_id, pp_id);
    self.get_json_cf(CF_PUBLICATION_POINTS, key.as_bytes())
}
/// Lists all objects of a run by streaming the run's `report.json`.
///
/// Returns an empty page (with the clamped limit) when the run is not indexed.
pub fn list_objects(
    &self,
    run_id: &str,
    limit: usize,
    cursor: Option<&str>,
) -> QueryDbResult<QueryPage<ObjectInstanceRecord>> {
    match self.report_path_for_run(run_id)? {
        Some(report_path) => report_stream::list_report_objects(
            &report_path,
            run_id,
            ObjectScope::All,
            limit,
            cursor,
        ),
        None => Ok(QueryPage {
            data: Vec::new(),
            next_cursor: None,
            limit: limit.clamp(1, 1000),
        }),
    }
}
/// Lists the objects of one publication point by streaming the run's `report.json`.
///
/// Returns an empty page (with the clamped limit) when the run is not indexed.
pub fn list_objects_for_pp(
    &self,
    run_id: &str,
    pp_id: &str,
    limit: usize,
    cursor: Option<&str>,
) -> QueryDbResult<QueryPage<ObjectInstanceRecord>> {
    match self.report_path_for_run(run_id)? {
        Some(report_path) => {
            let scope = ObjectScope::PublicationPoint(pp_id.to_string());
            report_stream::list_report_objects(&report_path, run_id, scope, limit, cursor)
        }
        None => Ok(QueryPage {
            data: Vec::new(),
            next_cursor: None,
            limit: limit.clamp(1, 1000),
        }),
    }
}
/// Lists the objects of one repository by streaming the run's `report.json`.
///
/// Returns an empty page (with the clamped limit) when the run is not indexed.
pub fn list_objects_for_repo(
    &self,
    run_id: &str,
    repo_id: &str,
    limit: usize,
    cursor: Option<&str>,
) -> QueryDbResult<QueryPage<ObjectInstanceRecord>> {
    match self.report_path_for_run(run_id)? {
        Some(report_path) => {
            let scope = ObjectScope::Repo(repo_id.to_string());
            report_stream::list_report_objects(&report_path, run_id, scope, limit, cursor)
        }
        None => Ok(QueryPage {
            data: Vec::new(),
            next_cursor: None,
            limit: limit.clamp(1, 1000),
        }),
    }
}
/// Looks up an object of a run by its instance id.
///
/// The lazily populated cache is consulted first; on a miss the run's `report.json`
/// is searched and a hit is written back to the cache before being returned.
pub fn get_object_by_instance_id(
    &self,
    run_id: &str,
    object_instance_id: &str,
) -> QueryDbResult<Option<ObjectInstanceRecord>> {
    let cached = self.get_cached_object_by_instance_id(run_id, object_instance_id)?;
    if cached.is_some() {
        return Ok(cached);
    }
    let report_path = match self.report_path_for_run(run_id)? {
        Some(path) => path,
        None => return Ok(None),
    };
    let lookup = ObjectLookup::InstanceId(object_instance_id.to_string());
    match report_stream::lookup_report_object(&report_path, run_id, ObjectScope::All, lookup)? {
        Some(hit) => {
            self.put_lazy_object(&hit.object)?;
            Ok(Some(hit.object))
        }
        None => Ok(None),
    }
}
/// Looks up the URI index entry for `uri` within a run.
///
/// The cached index is consulted first; on a miss the run's `report.json` is searched
/// and a hit is written back to the cache before being returned.
pub fn get_object_by_uri(
    &self,
    run_id: &str,
    uri: &str,
) -> QueryDbResult<Option<ObjectUriIndexRecord>> {
    let cache_key = object_uri_key(run_id, uri);
    let cached: Option<ObjectUriIndexRecord> =
        self.get_json_cf(CF_OBJECTS_BY_URI, cache_key.as_bytes())?;
    if cached.is_some() {
        return Ok(cached);
    }
    let report_path = match self.report_path_for_run(run_id)? {
        Some(path) => path,
        None => return Ok(None),
    };
    let lookup = ObjectLookup::Uri(uri.to_string());
    match report_stream::lookup_report_object(&report_path, run_id, ObjectScope::All, lookup)? {
        Some(hit) => {
            let index_entry = object_uri_index_from_object(&hit.object);
            self.put_lazy_object(&hit.object)?;
            Ok(Some(index_entry))
        }
        None => Ok(None),
    }
}
/// Looks up an object of a run by content hash by searching the run's `report.json`.
///
/// No cache is consulted for this lookup, but a hit is written to the lazy object cache.
pub fn get_object_by_sha256(
    &self,
    run_id: &str,
    sha256: &str,
) -> QueryDbResult<Option<ObjectInstanceRecord>> {
    let report_path = match self.report_path_for_run(run_id)? {
        Some(path) => path,
        None => return Ok(None),
    };
    let lookup = ObjectLookup::Sha256(sha256.to_string());
    match report_stream::lookup_report_object(&report_path, run_id, ObjectScope::All, lookup)? {
        Some(hit) => {
            self.put_lazy_object(&hit.object)?;
            Ok(Some(hit.object))
        }
        None => Ok(None),
    }
}
fn get_cached_object_by_instance_id(
&self,
run_id: &str,
object_instance_id: &str,
) -> QueryDbResult<Option<ObjectInstanceRecord>> {
let mut cursor = None;
loop {
let page: QueryPage<ObjectInstanceRecord> = self.list_json_by_prefix(
CF_OBJECT_INSTANCES,
&format!("objinst/{run_id}/"),
1000,
cursor.as_deref(),
)?;
if let Some(found) = page
.data
.into_iter()
.find(|item| item.object_instance_id == object_instance_id)
{
return Ok(Some(found));
}
let Some(next_cursor) = page.next_cursor else {
return Ok(None);
};
cursor = Some(next_cursor);
}
}
/// Caches an object instance together with its URI index entry in one write batch.
///
/// A no-op on secondary handles.
pub fn put_lazy_object(&self, object: &ObjectInstanceRecord) -> QueryDbResult<()> {
    if self.secondary {
        return Ok(());
    }
    let mut batch = WriteBatch::default();

    let instance_key = object_instance_key(
        &object.run_id,
        &object.uri_hash,
        &object.sha256,
        &object.object_instance_id,
    );
    put_json_batch(
        &mut batch,
        self,
        CF_OBJECT_INSTANCES,
        instance_key.as_bytes(),
        object,
    )?;

    let uri_key = object_uri_key(&object.run_id, &object.uri);
    let uri_entry = object_uri_index_from_object(object);
    put_json_batch(
        &mut batch,
        self,
        CF_OBJECTS_BY_URI,
        uri_key.as_bytes(),
        &uri_entry,
    )?;

    self.write_batch(batch)
}
/// Stores an object projection keyed by its content hash.
///
/// A no-op on secondary handles.
pub fn put_object_projection(
    &self,
    projection: &crate::object_projection::ObjectProjectionRecord,
) -> QueryDbResult<()> {
    if self.secondary {
        return Ok(());
    }
    let key = object_hash_key(&projection.sha256);
    self.put_json_cf(CF_OBJECTS_BY_HASH, key.as_bytes(), projection)
}
fn report_path_for_run(&self, run_id: &str) -> QueryDbResult<Option<PathBuf>> {
Ok(self
.get_run(run_id)?
.map(|run| Path::new(&run.run_dir).join("report.json")))
}
/// Fetches the object projection stored for `sha256`, if any.
pub fn get_object_projection(
    &self,
    sha256: &str,
) -> QueryDbResult<Option<crate::object_projection::ObjectProjectionRecord>> {
    let key = object_hash_key(sha256);
    self.get_json_cf(CF_OBJECTS_BY_HASH, key.as_bytes())
}
/// Fetches the statistic `name` in `scope` for a run, if present.
pub fn get_stat(
    &self,
    run_id: &str,
    scope: &str,
    name: &str,
) -> QueryDbResult<Option<StatsRecord>> {
    let key = stats_key(run_id, scope, name);
    self.get_json_cf(CF_STATS, key.as_bytes())
}
/// Stores (or overwrites) an export job record.
///
/// Unlike the cache writers, this does not short-circuit on secondary handles; the
/// write is always attempted.
pub fn put_export_job(&self, job: &ExportJobRecord) -> QueryDbResult<()> {
    let key = export_job_key(&job.run_id, &job.job_id);
    self.put_json_cf(CF_EXPORT_JOBS, key.as_bytes(), job)
}
/// Fetches an export job record of a run, if present.
pub fn get_export_job(
    &self,
    run_id: &str,
    job_id: &str,
) -> QueryDbResult<Option<ExportJobRecord>> {
    let key = export_job_key(run_id, job_id);
    self.get_json_cf(CF_EXPORT_JOBS, key.as_bytes())
}
/// Caches a validation explanation under its run id, object instance id and
/// explain version.
///
/// A no-op on secondary handles.
pub fn put_validation_explain(&self, explain: &ValidationExplainRecord) -> QueryDbResult<()> {
    if self.secondary {
        return Ok(());
    }
    let key = validation_explain_key(
        &explain.run_id,
        &explain.object_instance_id,
        explain.explain_version,
    );
    self.put_json_cf(CF_VALIDATION_EXPLAIN_CACHE, key.as_bytes(), explain)
}
/// Fetches a cached validation explanation for the given explain version, if present.
pub fn get_validation_explain(
    &self,
    run_id: &str,
    object_instance_id: &str,
    explain_version: u32,
) -> QueryDbResult<Option<ValidationExplainRecord>> {
    let key = validation_explain_key(run_id, object_instance_id, explain_version);
    self.get_json_cf(CF_VALIDATION_EXPLAIN_CACHE, key.as_bytes())
}
/// Reports whether a projection is stored for `sha256`, without decoding it.
pub fn has_object_projection(&self, sha256: &str) -> QueryDbResult<bool> {
    let key = object_hash_key(sha256);
    let raw = self.get_cf(CF_OBJECTS_BY_HASH, key.as_bytes())?;
    Ok(raw.is_some())
}
/// Counts the entries of a column family by iterating over all of it.
pub fn count_cf(&self, cf_name: &'static str) -> QueryDbResult<u64> {
    let handle = self.cf(cf_name)?;
    self.db
        .iterator_cf(handle, IteratorMode::Start)
        .try_fold(0u64, |total, entry| -> QueryDbResult<u64> {
            entry?;
            Ok(total + 1)
        })
}
/// Returns every run whose index status is `"ready"`, ordered oldest first.
///
/// Runs are gathered from the sequence index first and then from a full scan of the
/// run records, so ready runs without a sequence entry are included too. Runs without
/// a sequence number sort before all others (by run id); the rest sort by sequence.
pub fn list_ready_runs_by_seq(&self) -> QueryDbResult<Vec<RunRecord>> {
    let mut visited = BTreeSet::new();
    let mut ready = Vec::new();

    // Pass 1: follow the sequence index.
    let seq_handle = self.cf(CF_RUNS_BY_SEQ)?;
    for entry in self.db.iterator_cf(seq_handle, IteratorMode::Start) {
        let (_, raw_id) = entry?;
        let run_id: String = serde_json::from_slice(&raw_id)?;
        if !visited.insert(run_id.clone()) {
            continue;
        }
        let Some(run) = self.get_run(&run_id)? else {
            continue;
        };
        if run.index_status == "ready" {
            ready.push(run);
        }
    }

    // Pass 2: pick up ready runs the sequence index did not mention.
    let runs_handle = self.cf(CF_RUNS)?;
    for entry in self.db.iterator_cf(runs_handle, IteratorMode::Start) {
        let (_, raw_run) = entry?;
        let run: RunRecord = serde_json::from_slice(&raw_run)?;
        if run.index_status != "ready" {
            continue;
        }
        if visited.insert(run.run_id.clone()) {
            ready.push(run);
        }
    }

    // `Option` ordering already puts `None` before `Some`; run ids only break ties
    // between two runs that both lack a sequence number.
    ready.sort_by(|left, right| match left.run_seq.cmp(&right.run_seq) {
        std::cmp::Ordering::Equal if left.run_seq.is_none() => left.run_id.cmp(&right.run_id),
        ordering => ordering,
    });
    Ok(ready)
}
/// Deletes the oldest ready runs so that at most `retain_ready_runs` remain.
///
/// A value of 0 disables retention. Returns the number of runs deleted.
pub fn enforce_run_retention(&self, retain_ready_runs: usize) -> QueryDbResult<u64> {
    if retain_ready_runs == 0 {
        return Ok(0);
    }
    let ready = self.list_ready_runs_by_seq()?;
    let surplus = ready.len().saturating_sub(retain_ready_runs);
    let mut removed = 0u64;
    for stale in &ready[..surplus] {
        self.delete_run_index(stale)?;
        removed += 1;
    }
    Ok(removed)
}
/// Removes a run record, its sequence entry and all run-scoped index data in one
/// write batch.
///
/// Object projections keyed by content hash are not run-scoped and are left alone.
pub fn delete_run_index(&self, run: &RunRecord) -> QueryDbResult<()> {
    // Column families whose keys start with `{tag}/{run_id}/`.
    const RUN_SCOPED: [(&str, &str); 8] = [
        (CF_REPOS, "repo"),
        (CF_PUBLICATION_POINTS, "pp"),
        (CF_OBJECT_INSTANCES, "objinst"),
        (CF_OBJECTS_BY_URI, "objuri"),
        (CF_VALIDATION_EXPLAIN_CACHE, "explain"),
        (CF_EXPORT_JOBS, "export"),
        (CF_STATS, "stats"),
        (CF_REASON_INDEX, "reason"),
    ];

    let mut batch = WriteBatch::default();
    batch.delete_cf(self.cf(CF_RUNS)?, run_key(&run.run_id).as_bytes());
    if let Some(seq) = run.run_seq {
        batch.delete_cf(self.cf(CF_RUNS_BY_SEQ)?, seq_key(seq).as_bytes());
    }
    for (cf_name, tag) in RUN_SCOPED {
        let prefix = format!("{tag}/{}/", run.run_id);
        self.delete_prefix_range(&mut batch, cf_name, &prefix)?;
    }
    self.write_batch(batch)
}
/// Resolves a column family handle by name, failing with `MissingColumnFamily`.
fn cf(&self, name: &'static str) -> QueryDbResult<&rocksdb::ColumnFamily> {
    match self.db.cf_handle(name) {
        Some(handle) => Ok(handle),
        None => Err(QueryDbError::MissingColumnFamily(name)),
    }
}
fn put_json_cf<T: Serialize>(
&self,
cf_name: &'static str,
key: &[u8],
value: &T,
) -> QueryDbResult<()> {
let cf = self.cf(cf_name)?;
let bytes = serde_json::to_vec(value)?;
self.db.put_cf(cf, key, bytes)?;
Ok(())
}
fn get_cf(&self, cf_name: &'static str, key: &[u8]) -> QueryDbResult<Option<Vec<u8>>> {
let cf = self.cf(cf_name)?;
self.db.get_cf(cf, key).map_err(QueryDbError::from)
}
/// Reads the value under `key` and decodes it from JSON; `None` when the key is absent.
fn get_json_cf<T: for<'de> Deserialize<'de>>(
    &self,
    cf_name: &'static str,
    key: &[u8],
) -> QueryDbResult<Option<T>> {
    match self.get_cf(cf_name, key)? {
        None => Ok(None),
        Some(raw) => Ok(Some(serde_json::from_slice(&raw)?)),
    }
}
/// Returns one page of JSON-decoded values whose keys start with `prefix`.
///
/// Iteration starts at `cursor` when given, otherwise at `prefix`, and stops at the
/// first key outside the prefix. `raw_limit` is clamped to `1..=1000`. When more
/// items remain, `next_cursor` is the key of the first item that was not returned.
fn list_json_by_prefix<T: for<'de> Deserialize<'de>>(
    &self,
    cf_name: &'static str,
    prefix: &str,
    raw_limit: usize,
    cursor: Option<&str>,
) -> QueryDbResult<QueryPage<T>> {
    let limit = raw_limit.clamp(1, 1000);
    let handle = self.cf(cf_name)?;
    let seek_from = match cursor {
        Some(resume_key) => resume_key,
        None => prefix,
    };
    let mut page = QueryPage {
        data: Vec::new(),
        next_cursor: None,
        limit,
    };
    let entries = self.db.iterator_cf(
        handle,
        IteratorMode::From(seek_from.as_bytes(), rocksdb::Direction::Forward),
    );
    for entry in entries {
        let (raw_key, raw_value) = entry?;
        let key_text = String::from_utf8_lossy(&raw_key);
        if !key_text.starts_with(prefix) {
            break;
        }
        if page.data.len() >= limit {
            // Page is full: remember where the next page starts.
            page.next_cursor = Some(key_text.to_string());
            break;
        }
        page.data.push(serde_json::from_slice(&raw_value)?);
    }
    Ok(page)
}
/// Like `list_json_by_prefix`, but only values accepted by `predicate` count towards
/// the page.
///
/// Every value under the prefix is decoded before filtering. When more matching items
/// remain, `next_cursor` is the key of the first matching item that was not returned.
fn list_json_by_prefix_filtered<T, F>(
    &self,
    cf_name: &'static str,
    prefix: &str,
    raw_limit: usize,
    cursor: Option<&str>,
    mut predicate: F,
) -> QueryDbResult<QueryPage<T>>
where
    T: for<'de> Deserialize<'de>,
    F: FnMut(&T) -> bool,
{
    let limit = raw_limit.clamp(1, 1000);
    let handle = self.cf(cf_name)?;
    let seek_from = match cursor {
        Some(resume_key) => resume_key,
        None => prefix,
    };
    let mut page = QueryPage {
        data: Vec::new(),
        next_cursor: None,
        limit,
    };
    let entries = self.db.iterator_cf(
        handle,
        IteratorMode::From(seek_from.as_bytes(), rocksdb::Direction::Forward),
    );
    for entry in entries {
        let (raw_key, raw_value) = entry?;
        let key_text = String::from_utf8_lossy(&raw_key);
        if !key_text.starts_with(prefix) {
            break;
        }
        let candidate: T = serde_json::from_slice(&raw_value)?;
        if predicate(&candidate) {
            if page.data.len() >= limit {
                // Page is full: the next page resumes at this matching key.
                page.next_cursor = Some(key_text.to_string());
                break;
            }
            page.data.push(candidate);
        }
    }
    Ok(page)
}
fn write_batch(&self, batch: WriteBatch) -> QueryDbResult<()> {
self.db.write(batch)?;
Ok(())
}
/// Queues a range delete covering every key that starts with `prefix`.
///
/// Nothing is queued when `prefix_range_end` yields no upper bound for the prefix.
fn delete_prefix_range(
    &self,
    batch: &mut WriteBatch,
    cf_name: &'static str,
    prefix: &str,
) -> QueryDbResult<()> {
    let handle = self.cf(cf_name)?;
    let range_start = prefix.as_bytes();
    if let Some(range_end) = prefix_range_end(range_start) {
        batch.delete_range_cf(handle, range_start, range_end.as_slice());
    }
    Ok(())
}
}
/// Builds the options shared by every column family: defaults plus LZ4 compression.
fn cf_options() -> Options {
    let mut options = Options::default();
    options.set_compression_type(rocksdb::DBCompressionType::Lz4);
    options
}
/// Configuration for one artifact indexing pass.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ArtifactIndexerConfig {
/// Location of the query database to open or create.
pub query_db_path: PathBuf,
/// Root to scan for `run_*` directories (its `runs/` subdirectory is used when present).
pub run_root: Option<PathBuf>,
/// Single run directory to index; takes precedence over `run_root`.
pub run_dir: Option<PathBuf>,
/// Optional external repo-bytes database, opened when set.
pub repo_bytes_db_path: Option<PathBuf>,
// NOTE(review): passed to `index_run_dir`, which ignores it in the visible code.
pub projection_entry_limit: usize,
/// When scanning `run_root`, skip run directories whose numeric suffix is below this.
pub min_run_seq: Option<u64>,
/// When set, retention keeps at most this many ready runs after indexing (0 disables).
pub retain_indexed_runs: Option<usize>,
}
/// Opens the query database (and the optional repo-bytes database) named in `config`
/// and runs one indexing pass.
///
/// A failure to open the repo-bytes database is reported as `QueryDbError::RocksDb`.
pub fn index_artifacts(config: &ArtifactIndexerConfig) -> QueryDbResult<QueryIndexSummary> {
    let db = QueryDb::open(&config.query_db_path)?;
    let repo_bytes = config
        .repo_bytes_db_path
        .as_ref()
        .map(|path| {
            ExternalRepoBytesDb::open(path).map_err(|err| QueryDbError::RocksDb(err.to_string()))
        })
        .transpose()?;
    index_artifacts_with_open_db(&db, repo_bytes.as_ref(), config)
}
/// Indexes every selected run directory into an already open database.
///
/// Per-run failures are collected in the summary's `errors` instead of aborting the
/// pass. Retention runs only when at least one run was indexed and
/// `retain_indexed_runs` is set.
pub fn index_artifacts_with_open_db(
    db: &QueryDb,
    repo_bytes: Option<&ExternalRepoBytesDb>,
    config: &ArtifactIndexerConfig,
) -> QueryDbResult<QueryIndexSummary> {
    let mut totals = QueryIndexSummary::default();
    for run_dir in collect_index_run_dirs(config)? {
        let outcome = index_run_dir(db, repo_bytes, config.projection_entry_limit, &run_dir);
        let run_summary = match outcome {
            Ok(run_summary) => run_summary,
            Err(err) => {
                totals
                    .errors
                    .push(format!("{}: {err}", run_dir.display()));
                continue;
            }
        };
        totals.runs_indexed += u64::from(run_summary.indexed);
        totals.repos_indexed += run_summary.repos_indexed;
        totals.publication_points_indexed += run_summary.publication_points_indexed;
        totals.object_instances_indexed += run_summary.object_instances_indexed;
        totals.object_projections_indexed += run_summary.object_projections_indexed;
        totals.stats_indexed += run_summary.stats_indexed;
        totals.latest_ready_run = run_summary.latest_ready_run;
    }

    if totals.runs_indexed == 0 {
        return Ok(totals);
    }
    if let Some(keep) = config.retain_indexed_runs {
        totals.runs_deleted = db.enforce_run_retention(keep)?;
        totals.retained_runs = db.list_ready_runs_by_seq()?.len() as u64;
    }
    Ok(totals)
}
/// Determines which run directories an indexing pass should process.
///
/// An explicit `run_dir` is returned as-is. Otherwise `run_root` (or its `runs/`
/// subdirectory when present) is scanned for `run_*` directories that contain
/// `report.json`, satisfy `min_run_seq` when set, and whose run summary does not
/// report a non-success status. The result is sorted by path, which is a
/// lexicographic order (so `run_10` comes before `run_2`).
fn collect_index_run_dirs(config: &ArtifactIndexerConfig) -> QueryDbResult<Vec<PathBuf>> {
    if let Some(single) = config.run_dir.as_ref() {
        return Ok(vec![single.clone()]);
    }
    let Some(root) = config.run_root.as_ref() else {
        return Err(QueryDbError::InvalidArtifact(
            "either run_root or run_dir is required".into(),
        ));
    };
    let nested = root.join("runs");
    let runs_root = if nested.is_dir() { nested } else { root.clone() };

    let mut selected = Vec::new();
    for entry in fs::read_dir(&runs_root)? {
        let candidate = entry?.path();
        if !candidate.is_dir() {
            continue;
        }
        let Some(dir_name) = candidate.file_name().and_then(|name| name.to_str()) else {
            continue;
        };
        if !(dir_name.starts_with("run_") && candidate.join("report.json").exists()) {
            continue;
        }
        if let Some(min_run_seq) = config.min_run_seq {
            let recent_enough =
                matches!(run_index_from_path(&candidate), Some(seq) if seq >= min_run_seq);
            if !recent_enough {
                continue;
            }
        }
        if run_status_is_success_or_unknown(&candidate)? {
            selected.push(candidate);
        }
    }
    selected.sort();
    Ok(selected)
}
/// Returns `true` unless `run-summary.json` exists and carries a string `status`
/// other than `"success"`.
///
/// A missing summary file or a missing `status` field counts as "unknown" and is accepted.
fn run_status_is_success_or_unknown(run_dir: &Path) -> QueryDbResult<bool> {
    let summary_path = run_dir.join("run-summary.json");
    if !summary_path.exists() {
        return Ok(true);
    }
    let summary = read_json_file(&summary_path)?;
    let status = json_str(&summary, &["status"]);
    Ok(status.map_or(true, |value| value == "success"))
}
/// Outcome of indexing a single run directory.
#[derive(Default)]
struct SingleRunIndexSummary {
// False when the run was already indexed and up to date, so nothing was written.
indexed: bool,
repos_indexed: u64,
publication_points_indexed: u64,
object_instances_indexed: u64,
object_projections_indexed: u64,
stats_indexed: u64,
// Latest ready run after this run directory was processed.
latest_ready_run: Option<String>,
}
/// Indexes one run directory into the query database.
///
/// Requires `report.json` in `run_dir`; `run-summary.json`, `run-meta.json` and
/// `stage-timing.json` are optional. A run that is already indexed as ready with the
/// same schema version and run directory is skipped. Otherwise the run record goes
/// through `"building"` and ends as either `"failed"` (with the error recorded) or
/// `"ready"`.
///
/// `_repo_bytes` and `_projection_entry_limit` are currently unused.
fn index_run_dir(
db: &QueryDb,
_repo_bytes: Option<&ExternalRepoBytesDb>,
_projection_entry_limit: usize,
run_dir: &Path,
) -> QueryDbResult<SingleRunIndexSummary> {
let report_path = run_dir.join("report.json");
if !report_path.exists() {
return Err(QueryDbError::InvalidArtifact(format!(
"missing report.json under {}",
run_dir.display()
)));
}
let summary_file = read_json_file_optional(&run_dir.join("run-summary.json"))?;
let meta = read_json_file_optional(&run_dir.join("run-meta.json"))?;
let stage_timing = read_json_file_optional(&run_dir.join("stage-timing.json"))?;
let run_id = run_id_for(run_dir, summary_file.as_ref(), meta.as_ref());
// Fast path: nothing to do when this run is already indexed and current.
if ready_run_is_current(db, &run_id, run_dir)? {
return Ok(SingleRunIndexSummary {
latest_ready_run: db.latest_ready_run()?,
..SingleRunIndexSummary::default()
});
}
let report_summary = report_stream::summarize_report(&report_path, &run_id)?;
let artifact_manifest = build_artifact_manifest(run_dir, report_summary.query_audit.as_ref())?;
let mut run_record = build_run_record(
run_dir,
&run_id,
&report_summary,
summary_file.as_ref(),
meta.as_ref(),
stage_timing.as_ref(),
&artifact_manifest,
);
// Persist the "building" marker before writing any index data for the run.
run_record.index_status = "building".to_string();
run_record.index_error = None;
db.put_json_cf(CF_RUNS, run_key(&run_id).as_bytes(), &run_record)?;
let indexed =
match write_summary_index_records(db, &run_id, &report_summary, &artifact_manifest) {
Ok(indexed) => indexed,
Err(err) => {
// Record the failure on the run; if this write fails too, its error is returned instead.
run_record.index_status = "failed".to_string();
run_record.index_error = Some(err.to_string());
db.put_json_cf(CF_RUNS, run_key(&run_id).as_bytes(), &run_record)?;
return Err(err);
}
};
run_record.index_status = "ready".to_string();
let should_update_latest = should_update_latest_ready_run(db, &run_record)?;
// Flip to "ready", add the sequence entry and move the latest pointer in one batch.
let mut final_batch = WriteBatch::default();
put_json_batch(
&mut final_batch,
db,
CF_RUNS,
run_key(&run_id).as_bytes(),
&run_record,
)?;
if let Some(seq) = run_record.run_seq {
put_json_batch(
&mut final_batch,
db,
CF_RUNS_BY_SEQ,
seq_key(seq).as_bytes(),
&run_id,
)?;
}
if should_update_latest {
put_json_batch(&mut final_batch, db, CF_META, KEY_LATEST_READY_RUN, &run_id)?;
}
db.write_batch(final_batch)?;
Ok(SingleRunIndexSummary {
indexed: true,
repos_indexed: indexed.repos_indexed,
publication_points_indexed: indexed.publication_points_indexed,
object_instances_indexed: indexed.object_instances_indexed,
object_projections_indexed: indexed.object_projections_indexed,
stats_indexed: indexed.stats_indexed,
latest_ready_run: if should_update_latest {
Some(run_id)
} else {
db.latest_ready_run()?
},
})
}
/// Reports whether `run_id` is already indexed as ready, with the current schema
/// version and the same run directory.
fn ready_run_is_current(db: &QueryDb, run_id: &str, run_dir: &Path) -> QueryDbResult<bool> {
    match db.get_run(run_id)? {
        None => Ok(false),
        Some(existing) => {
            let same_schema = existing.schema_version == QUERY_DB_SCHEMA_VERSION;
            let is_ready = existing.index_status == "ready";
            let same_dir = existing.run_dir == run_dir.display().to_string();
            Ok(same_schema && is_ready && same_dir)
        }
    }
}
/// Decides whether `candidate` should become the latest ready run.
///
/// It does when there is no current pointer, when the pointed-to run record is
/// missing, or when the candidate is not older than the current run: by sequence
/// number (a missing sequence ranks lowest), or by run id when neither has one.
fn should_update_latest_ready_run(db: &QueryDb, candidate: &RunRecord) -> QueryDbResult<bool> {
    let current = match db.latest_ready_run()? {
        Some(current_run_id) => db.get_run(&current_run_id)?,
        None => None,
    };
    let Some(current) = current else {
        return Ok(true);
    };
    let candidate_wins = match (candidate.run_seq, current.run_seq) {
        (None, None) => candidate.run_id >= current.run_id,
        // `Option` ordering ranks `None` below any `Some`.
        (candidate_seq, current_seq) => candidate_seq >= current_seq,
    };
    Ok(candidate_wins)
}
/// Counters returned by `write_summary_index_records`.
#[derive(Default)]
struct IndexWriteSummary {
repos_indexed: u64,
publication_points_indexed: u64,
// Not set by `write_summary_index_records`; stays at the default of 0 there.
object_instances_indexed: u64,
object_projections_indexed: u64,
stats_indexed: u64,
}
/// Writes the publication point, repository and stats records of a run.
///
/// Records are accumulated in a write batch that is flushed every `BATCH_LIMIT`
/// entries and once more at the end. Returns how many records of each kind were queued.
fn write_summary_index_records(
db: &QueryDb,
run_id: &str,
report_summary: &ReportSummary,
artifact_manifest: &crate::query::artifact_manifest::ArtifactManifestSummary,
) -> QueryDbResult<IndexWriteSummary> {
const BATCH_LIMIT: usize = 5000;
let mut batch = WriteBatch::default();
let mut pending = 0usize;
let mut summary = IndexWriteSummary::default();
// Flushes the accumulated batch once it holds BATCH_LIMIT entries, leaving an empty one behind.
macro_rules! flush_if_needed {
() => {
if pending >= BATCH_LIMIT {
let to_write = std::mem::take(&mut batch);
db.write_batch(to_write)?;
pending = 0;
}
};
}
for pp_record in &report_summary.publication_points {
put_json_batch(
&mut batch,
db,
CF_PUBLICATION_POINTS,
pp_key(run_id, &pp_record.pp_id).as_bytes(),
pp_record,
)?;
pending += 1;
summary.publication_points_indexed += 1;
flush_if_needed!();
}
for repo_record in &report_summary.repos {
put_json_batch(
&mut batch,
db,
CF_REPOS,
repo_key(run_id, &repo_record.repo_id).as_bytes(),
repo_record,
)?;
pending += 1;
summary.repos_indexed += 1;
flush_if_needed!();
}
// The artifact manifest is serialized to JSON and handed to the stats builder.
let artifacts_value = serde_json::to_value(artifact_manifest)?;
let stats = report_stream::stats_records_from_summary(run_id, report_summary, artifacts_value);
for record in &stats {
put_json_batch(
&mut batch,
db,
CF_STATS,
stats_key(&record.run_id, &record.scope, &record.name).as_bytes(),
record,
)?;
pending += 1;
flush_if_needed!();
}
// Write whatever is left over from the last partial batch.
if pending > 0 {
db.write_batch(batch)?;
}
summary.stats_indexed = stats.len() as u64;
Ok(summary)
}
fn object_uri_index_from_object(object: &ObjectInstanceRecord) -> ObjectUriIndexRecord {
ObjectUriIndexRecord {
run_id: object.run_id.clone(),
uri: object.uri.clone(),
sha256: object.sha256.clone(),
object_instance_id: object.object_instance_id.clone(),
repo_id: object.repo_id.clone(),
pp_id: object.pp_id.clone(),
}
}
/// Serializes `value` as JSON and queues a put for it in `batch`.
///
/// Nothing is written to the database until the batch itself is written.
fn put_json_batch<T: Serialize>(
    batch: &mut WriteBatch,
    db: &QueryDb,
    cf_name: &'static str,
    key: &[u8],
    value: &T,
) -> QueryDbResult<()> {
    let handle = db.cf(cf_name)?;
    let encoded = serde_json::to_vec(value)?;
    batch.put_cf(handle, key, encoded);
    Ok(())
}
/// Assembles the `RunRecord` for a run from the report summary and the optional
/// `run-summary.json`, `run-meta.json` and `stage-timing.json` documents.
///
/// Values from the run summary take precedence; run meta, stage timing, the report
/// summary or the directory name serve as fallbacks as noted below. The record is
/// returned with `index_status` set to `"building"`.
fn build_run_record(
run_dir: &Path,
run_id: &str,
report_summary: &ReportSummary,
summary: Option<&Value>,
meta: Option<&Value>,
stage_timing: Option<&Value>,
artifact_manifest: &crate::query::artifact_manifest::ArtifactManifestSummary,
) -> RunRecord {
let artifact_paths = artifact_manifest.artifact_paths();
// Counts prefer `reportCounts.*` from the run summary and fall back to the report summary.
let counts = RunCounts {
publication_points: summary
.and_then(|v| json_u64(v, &["reportCounts", "publicationPoints"]))
.unwrap_or(report_summary.publication_points.len() as u64),
objects: report_summary.objects_count,
fresh_objects: 0,
cached_objects: 0,
// Rejected objects are summed over the publication points of the report.
rejected_objects: report_summary
.publication_points
.iter()
.map(|pp| pp.rejected_objects)
.sum(),
fresh_rejected_objects: 0,
cached_rejected_objects: 0,
trust_anchors: 0,
vrps: summary
.and_then(|v| json_u64(v, &["reportCounts", "vrps"]))
.unwrap_or(report_summary.vrps_count),
aspas: summary
.and_then(|v| json_u64(v, &["reportCounts", "aspas"]))
.unwrap_or(report_summary.aspas_count),
warnings: summary
.and_then(|v| json_u64(v, &["reportCounts", "warnings"]))
.unwrap_or(report_summary.warnings_count),
};
RunRecord {
schema_version: QUERY_DB_SCHEMA_VERSION,
run_id: run_id.to_string(),
// Sequence: summary `runSeq`, then meta `run_index`, then the `run_<n>` directory suffix.
run_seq: summary
.and_then(|v| json_u64(v, &["runSeq"]))
.or_else(|| meta.and_then(|v| json_u64(v, &["run_index"])))
.or_else(|| run_index_from_path(run_dir)),
run_dir: run_dir.display().to_string(),
validation_time: report_summary.validation_time.clone(),
sync_mode: meta
.and_then(|v| json_str(v, &["sync_mode"]))
.map(str::to_string),
started_at: summary
.and_then(|v| json_str(v, &["startedAtRfc3339Utc"]))
.or_else(|| meta.and_then(|v| json_str(v, &["started_at_rfc3339_utc"])))
.map(str::to_string),
finished_at: summary
.and_then(|v| json_str(v, &["finishedAtRfc3339Utc"]))
.or_else(|| meta.and_then(|v| json_str(v, &["completed_at_rfc3339_utc"])))
.map(str::to_string),
// Wall time: summary `wallMs`, then stage timing `total_ms`.
wall_ms: summary
.and_then(|v| json_u64(v, &["wallMs"]))
.or_else(|| stage_timing.and_then(|v| json_u64(v, &["total_ms"]))),
artifact_paths,
counts,
index_status: "building".to_string(),
index_error: None,
}
}
/// Loads the file at `path` into memory and parses its contents as JSON.
///
/// # Errors
///
/// Fails when the file cannot be read or when its bytes are not valid JSON.
fn read_json_file(path: &Path) -> QueryDbResult<Value> {
    let raw = fs::read(path)?;
    let parsed: Value = serde_json::from_slice(&raw)?;
    Ok(parsed)
}
fn read_json_file_optional(path: &Path) -> QueryDbResult<Option<Value>> {
if path.exists() {
read_json_file(path).map(Some)
} else {
Ok(None)
}
}
/// Picks the identifier for a run, trying sources in a fixed order.
///
/// 1. The `runId` string of `summary`, when present.
/// 2. The `run_id` string of `meta`, when present.
/// 3. The final component of `run_dir`, when it is valid UTF-8.
/// 4. The literal `run_unknown`.
fn run_id_for(run_dir: &Path, summary: Option<&Value>, meta: Option<&Value>) -> String {
    if let Some(id) = summary.and_then(|doc| json_str(doc, &["runId"])) {
        return id.to_string();
    }
    if let Some(id) = meta.and_then(|doc| json_str(doc, &["run_id"])) {
        return id.to_string();
    }
    match run_dir.file_name().and_then(|name| name.to_str()) {
        Some(dir_name) => dir_name.to_string(),
        None => "run_unknown".to_string(),
    }
}
/// Follows `path` key by key into `value` and returns the string found at the end.
///
/// Yields `None` as soon as a key cannot be resolved, or when the value reached is
/// not a JSON string. An empty `path` inspects `value` itself.
fn json_str<'a>(value: &'a Value, path: &[&str]) -> Option<&'a str> {
    path.iter()
        .try_fold(value, |node, key| node.get(*key))
        .and_then(|leaf| leaf.as_str())
}
/// Follows `path` key by key into `value` and returns the unsigned integer found
/// at the end.
///
/// Yields `None` as soon as a key cannot be resolved, or when the value reached is
/// not representable as a `u64`. An empty `path` inspects `value` itself.
fn json_u64(value: &Value, path: &[&str]) -> Option<u64> {
    path.iter()
        .try_fold(value, |node, key| node.get(*key))
        .and_then(|leaf| leaf.as_u64())
}
/// Extracts the numeric run index from a directory named `run_<number>`.
///
/// Only the final path component is inspected. Returns `None` when there is no
/// final component, when it is not valid UTF-8, when it lacks the `run_` prefix,
/// or when the remainder does not parse as a `u64`.
fn run_index_from_path(path: &Path) -> Option<u64> {
    let dir_name = path.file_name()?.to_str()?;
    let digits = dir_name.strip_prefix("run_")?;
    digits.parse::<u64>().ok()
}
/// Derives a deterministic identifier from `value`.
///
/// The identifier is the hex encoding of the first 12 bytes of the SHA-256 digest
/// of the UTF-8 bytes of `value`.
fn stable_id(value: &str) -> String {
    let mut hasher = Sha256::new();
    hasher.update(value.as_bytes());
    let digest = hasher.finalize();
    hex::encode(&digest[..12])
}
/// Builds the key `run/<run_id>`.
fn run_key(run_id: &str) -> String {
    ["run", run_id].join("/")
}
/// Builds the key `seq/<seq>` with the number zero-padded to 20 digits.
///
/// Twenty digits is the width of `u64::MAX`, so every key has the same length and
/// the keys sort as strings in the same order as the numbers they encode.
fn seq_key(seq: u64) -> String {
    format!("seq/{:020}", seq)
}
/// Builds the key `repo/<run_id>/<repo_id>`.
fn repo_key(run_id: &str, repo_id: &str) -> String {
    ["repo", run_id, repo_id].join("/")
}
/// Builds the key `pp/<run_id>/<pp_id>`.
fn pp_key(run_id: &str, pp_id: &str) -> String {
    ["pp", run_id, pp_id].join("/")
}
/// Builds the key `objinst/<run_id>/<uri_hash>/<sha256>/<object_instance_id>`.
fn object_instance_key(
    run_id: &str,
    uri_hash: &str,
    sha256: &str,
    object_instance_id: &str,
) -> String {
    ["objinst", run_id, uri_hash, sha256, object_instance_id].join("/")
}
/// Builds the key `objuri/<run_id>/<id>`, where `<id>` is `stable_id(uri)`.
///
/// The URI itself never appears in the key; only its derived identifier does.
fn object_uri_key(run_id: &str, uri: &str) -> String {
    let uri_id = stable_id(uri);
    format!("objuri/{}/{}", run_id, uri_id)
}
/// Builds the key `stats/<run_id>/<scope>/<name>`.
fn stats_key(run_id: &str, scope: &str, name: &str) -> String {
    ["stats", run_id, scope, name].join("/")
}
/// Builds the key `export/<run_id>/<job_id>`.
fn export_job_key(run_id: &str, job_id: &str) -> String {
    ["export", run_id, job_id].join("/")
}
/// Builds the key `explain/<run_id>/<object_instance_id>/<explain_version>`.
///
/// The version is rendered as a plain decimal number without padding.
fn validation_explain_key(run_id: &str, object_instance_id: &str, explain_version: u32) -> String {
    let version = explain_version.to_string();
    ["explain", run_id, object_instance_id, version.as_str()].join("/")
}
/// Builds the key `objhash/<sha256>`.
fn object_hash_key(sha256: &str) -> String {
    ["objhash", sha256].join("/")
}
/// Computes the exclusive upper bound of the byte-string range that shares `prefix`.
///
/// The bound is `prefix` cut off after its last byte that is not `0xFF`, with that
/// byte incremented by one. Returns `None` when no such byte exists, which is the
/// case for an empty prefix and for a prefix made up solely of `0xFF` bytes.
fn prefix_range_end(prefix: &[u8]) -> Option<Vec<u8>> {
    // Trailing 0xFF bytes cannot be incremented, so they are dropped from the bound.
    let bump_at = prefix.iter().rposition(|&byte| byte != u8::MAX)?;
    let mut upper = prefix[..=bump_at].to_vec();
    upper[bump_at] += 1;
    Some(upper)
}
#[cfg(test)]
mod tests {
use serde_json::json;
use sha2::{Digest, Sha256};
use super::*;
// Indexes a single successful sample run and checks that it is recorded as the
// latest ready run, with its run record marked "ready".
#[test]
fn latest_ready_run_updates_after_index_success() {
let temp = tempfile::tempdir().expect("tempdir");
let run_dir = temp.path().join("runs/run_0001");
fs::create_dir_all(&run_dir).expect("run dir");
write_sample_run(&run_dir, "run_0001", 1);
let query_db_path = temp.path().join("query-db");
let summary = index_artifacts(&ArtifactIndexerConfig {
query_db_path: query_db_path.clone(),
run_root: Some(temp.path().to_path_buf()),
run_dir: None,
repo_bytes_db_path: None,
projection_entry_limit: 50,
min_run_seq: None,
retain_indexed_runs: None,
})
.expect("index");
assert_eq!(summary.runs_indexed, 1);
assert_eq!(summary.latest_ready_run.as_deref(), Some("run_0001"));
// Reopen the database to check what was actually persisted.
let db = QueryDb::open(&query_db_path).expect("open query db");
assert_eq!(db.latest_ready_run().unwrap().as_deref(), Some("run_0001"));
let run = db.get_run("run_0001").unwrap().expect("run");
assert_eq!(run.index_status, "ready");
assert_eq!(run.counts.publication_points, 1);
assert_eq!(run.counts.objects, 2);
assert_eq!(db.count_cf(CF_REPOS).unwrap(), 1);
assert_eq!(db.count_cf(CF_PUBLICATION_POINTS).unwrap(), 1);
// The run counts two objects, yet no rows are expected in CF_OBJECT_INSTANCES.
assert_eq!(db.count_cf(CF_OBJECT_INSTANCES).unwrap(), 0);
}
// A run whose summary reports status "failed" must not be indexed and must not
// displace the previously indexed run as the latest ready run.
#[test]
fn failed_run_does_not_replace_previous_latest() {
let temp = tempfile::tempdir().expect("tempdir");
let run1 = temp.path().join("runs/run_0001");
let run2 = temp.path().join("runs/run_0002");
fs::create_dir_all(&run1).expect("run1");
fs::create_dir_all(&run2).expect("run2");
write_sample_run(&run1, "run_0001", 1);
write_sample_run(&run2, "run_0002", 2);
// Overwrite the second run's summary so that it reports a failed run.
fs::write(
run2.join("run-summary.json"),
r#"{"status":"failed","runId":"run_0002","runSeq":2}"#,
)
.expect("failed summary");
let query_db_path = temp.path().join("query-db");
let summary = index_artifacts(&ArtifactIndexerConfig {
query_db_path: query_db_path.clone(),
run_root: Some(temp.path().to_path_buf()),
run_dir: None,
repo_bytes_db_path: None,
projection_entry_limit: 50,
min_run_seq: None,
retain_indexed_runs: None,
})
.expect("index");
// Only run_0001 counts as indexed.
assert_eq!(summary.runs_indexed, 1);
let db = QueryDb::open(&query_db_path).expect("open query db");
assert_eq!(db.latest_ready_run().unwrap().as_deref(), Some("run_0001"));
assert!(db.get_run("run_0002").unwrap().is_none());
}
// With `min_run_seq` set to 2, the run with sequence 1 is left out of the index
// and only the run with sequence 2 is stored.
#[test]
fn min_run_seq_filters_old_history() {
let temp = tempfile::tempdir().expect("tempdir");
let run1 = temp.path().join("runs/run_0001");
let run2 = temp.path().join("runs/run_0002");
fs::create_dir_all(&run1).expect("run1");
fs::create_dir_all(&run2).expect("run2");
write_sample_run(&run1, "run_0001", 1);
write_sample_run(&run2, "run_0002", 2);
let query_db_path = temp.path().join("query-db");
let summary = index_artifacts(&ArtifactIndexerConfig {
query_db_path: query_db_path.clone(),
run_root: Some(temp.path().to_path_buf()),
run_dir: None,
repo_bytes_db_path: None,
projection_entry_limit: 50,
min_run_seq: Some(2),
retain_indexed_runs: None,
})
.expect("index");
assert_eq!(summary.runs_indexed, 1);
assert_eq!(summary.latest_ready_run.as_deref(), Some("run_0002"));
let db = QueryDb::open(&query_db_path).expect("query db");
assert!(db.get_run("run_0001").unwrap().is_none());
assert!(db.get_run("run_0002").unwrap().is_some());
}
// Indexes three runs while retaining only two, then checks that the oldest run
// and the records keyed by it (repos, publication points, objects, stats) are gone.
#[test]
fn retention_deletes_old_run_scoped_indexes() {
let temp = tempfile::tempdir().expect("tempdir");
for seq in 1..=3 {
let run_dir = temp.path().join(format!("runs/run_{seq:04}"));
fs::create_dir_all(&run_dir).expect("run dir");
write_sample_run(&run_dir, &format!("run_{seq:04}"), seq);
}
let query_db_path = temp.path().join("query-db");
let summary = index_artifacts(&ArtifactIndexerConfig {
query_db_path: query_db_path.clone(),
run_root: Some(temp.path().to_path_buf()),
run_dir: None,
repo_bytes_db_path: None,
projection_entry_limit: 50,
min_run_seq: None,
retain_indexed_runs: Some(2),
})
.expect("index");
// All three runs are indexed; one is then deleted by retention.
assert_eq!(summary.runs_indexed, 3);
assert_eq!(summary.runs_deleted, 1);
assert_eq!(summary.retained_runs, 2);
assert_eq!(summary.latest_ready_run.as_deref(), Some("run_0003"));
let db = QueryDb::open(&query_db_path).expect("query db");
assert!(db.get_run("run_0001").unwrap().is_none());
assert!(db.get_run("run_0002").unwrap().is_some());
assert!(db.get_run("run_0003").unwrap().is_some());
assert_eq!(db.latest_ready_run().unwrap().as_deref(), Some("run_0003"));
// Nothing scoped to the deleted run may remain queryable.
assert_eq!(db.list_repos("run_0001", 10, None).unwrap().data.len(), 0);
assert_eq!(
db.list_publication_points("run_0001", 10, None)
.unwrap()
.data
.len(),
0
);
assert_eq!(db.list_objects("run_0001", 10, None).unwrap().data.len(), 0);
assert!(
db.get_stat("run_0001", "overview", "counts")
.unwrap()
.is_none()
);
assert_eq!(db.list_ready_runs_by_seq().unwrap().len(), 2);
}
// Even when a repo-bytes database holding the ROA's bytes is supplied, indexing
// is expected to write no object projections and leave CF_OBJECTS_BY_HASH empty.
#[test]
fn repo_bytes_projection_is_not_written_during_summary_only_index() {
let temp = tempfile::tempdir().expect("tempdir");
let run_dir = temp.path().join("runs/run_0001");
fs::create_dir_all(&run_dir).expect("run dir");
// Relative fixture path: the test only works when the working directory contains
// `tests/fixtures` (presumably the crate root under `cargo test` — confirm).
let roa_bytes =
fs::read("tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/AS4538.roa")
.expect("fixture roa");
let roa_sha = hex::encode(Sha256::digest(&roa_bytes));
// Make the sample report's ROA entry refer to the real fixture's hash.
write_sample_run_with_object_hash(&run_dir, "run_0001", 1, &roa_sha);
let repo_bytes_path = temp.path().join("repo-bytes.db");
let repo_bytes = ExternalRepoBytesDb::open(&repo_bytes_path).expect("repo bytes");
repo_bytes
.put_blob_bytes_batch(&[(roa_sha.clone(), roa_bytes)])
.expect("put repo bytes");
// Release the handle before the indexer is given the same path.
drop(repo_bytes);
let query_db_path = temp.path().join("query-db");
let summary = index_artifacts(&ArtifactIndexerConfig {
query_db_path: query_db_path.clone(),
run_root: Some(temp.path().to_path_buf()),
run_dir: None,
repo_bytes_db_path: Some(repo_bytes_path),
projection_entry_limit: 5,
min_run_seq: None,
retain_indexed_runs: None,
})
.expect("index");
assert_eq!(summary.object_projections_indexed, 0);
let db = QueryDb::open(&query_db_path).expect("open query db");
assert_eq!(db.count_cf(CF_OBJECTS_BY_HASH).unwrap(), 0);
}
// Writes a `validation-events.jsonl` sidecar, references it from the report's
// `queryAudit` section, and checks the artifact path and the `validation_events`
// stats that indexing produces.
#[test]
fn validation_events_sidecar_is_indexed() {
let temp = tempfile::tempdir().expect("tempdir");
let run_dir = temp.path().join("runs/run_0001");
fs::create_dir_all(&run_dir).expect("run dir");
write_sample_run(&run_dir, "run_0001", 1);
let events = [
json!({"schemaVersion":1,"seq":1,"eventType":"run_summary","validationTime":"2026-06-15T00:00:00Z","counts":{"objects":2,"warnings":0,"vrps":1,"aspas":0}}),
json!({"schemaVersion":1,"seq":2,"eventType":"object","validationTime":"2026-06-15T00:00:00Z","objectUri":"rsync://repo.example/rpki/a.roa","sha256":"22","objectType":"roa","result":"error","reason":"bad roa"}),
];
// Serialize as JSON Lines: one event per line, each terminated by a newline.
let mut events_bytes = Vec::new();
for event in events {
events_bytes.extend_from_slice(&serde_json::to_vec(&event).unwrap());
events_bytes.push(b'\n');
}
// The digest recorded in the report is computed over the exact bytes written.
let events_sha256 = hex::encode(Sha256::digest(&events_bytes));
fs::write(run_dir.join("validation-events.jsonl"), &events_bytes).expect("events");
let mut report = read_json_file(&run_dir.join("report.json")).expect("read report");
report["queryAudit"] = json!({
"schemaVersion": 1,
"status": "complete",
"eventsPath": "validation-events.jsonl",
"eventsCount": 2,
"eventsSha256": events_sha256,
"writerVersion": 1
});
fs::write(
run_dir.join("report.json"),
serde_json::to_vec(&report).unwrap(),
)
.expect("report");
let query_db_path = temp.path().join("query-db");
let summary = index_artifacts(&ArtifactIndexerConfig {
query_db_path: query_db_path.clone(),
run_root: Some(temp.path().to_path_buf()),
run_dir: None,
repo_bytes_db_path: None,
projection_entry_limit: 50,
min_run_seq: None,
retain_indexed_runs: None,
})
.expect("index");
assert_eq!(summary.stats_indexed, 11);
let db = QueryDb::open(&query_db_path).expect("open query db");
let run = db.get_run("run_0001").unwrap().expect("run");
assert!(run.artifact_paths.contains_key("validationEvents"));
let manifest = db
.get_stat("run_0001", "validation_events", "manifest")
.unwrap()
.expect("event manifest");
assert_eq!(manifest.value["eventsCount"].as_u64(), Some(2));
// NOTE(review): `by_type` and `reasons` are expected to be empty objects even
// though the sidecar holds typed events with a reason — presumably the events
// are not aggregated per event in this indexing mode; confirm.
let by_type = db
.get_stat("run_0001", "validation_events", "by_type")
.unwrap()
.expect("event type stats");
assert_eq!(by_type.value.as_object().map(|items| items.len()), Some(0));
let reasons = db
.get_stat("run_0001", "validation_events", "reasons")
.unwrap()
.expect("event reasons");
assert_eq!(reasons.value.as_object().map(|items| items.len()), Some(0));
}
// End-to-end walk through the read API after indexing two sample runs: run
// resolution and paging, repos, publication points, objects, stats, and a
// put/get round trip for export jobs and validation explain records.
#[test]
fn query_db_lists_records_indexes_and_cached_results() {
let temp = tempfile::tempdir().expect("tempdir");
let run1 = temp.path().join("runs/run_0001");
let run2 = temp.path().join("runs/run_0002");
fs::create_dir_all(&run1).expect("run1");
fs::create_dir_all(&run2).expect("run2");
write_sample_run(&run1, "run_0001", 1);
write_sample_run(&run2, "run_0002", 2);
let query_db_path = temp.path().join("query-db");
let summary = index_artifacts(&ArtifactIndexerConfig {
query_db_path: query_db_path.clone(),
run_root: Some(temp.path().to_path_buf()),
run_dir: None,
repo_bytes_db_path: None,
projection_entry_limit: 50,
min_run_seq: None,
retain_indexed_runs: None,
})
.expect("index");
assert_eq!(summary.runs_indexed, 2);
assert_eq!(summary.latest_ready_run.as_deref(), Some("run_0002"));
let db = QueryDb::open(&query_db_path).expect("open query db");
// Run resolution: "latest" maps to the newest run, an explicit id maps to itself.
assert_eq!(
db.resolve_run_id("latest").unwrap().as_deref(),
Some("run_0002")
);
assert_eq!(
db.resolve_run_id("run_0001").unwrap().as_deref(),
Some("run_0001")
);
// Paging with a page size of one: the cursor from page one leads to page two.
let first_page = db.list_runs(1, None).expect("runs");
assert_eq!(first_page.data.len(), 1);
assert_eq!(first_page.data[0].run_id, "run_0001");
let second_page = db
.list_runs(1, first_page.next_cursor.as_deref())
.expect("second runs");
assert_eq!(second_page.data.len(), 1);
assert_eq!(second_page.data[0].run_id, "run_0002");
// Repositories of the second run.
let repos = db.list_repos("run_0002", 10, None).expect("repos");
assert_eq!(repos.data.len(), 1);
let repo = repos.data[0].clone();
assert_eq!(repo.host, "repo.example");
assert_eq!(repo.transport, "rrdp");
assert_eq!(repo.publication_points, 1);
assert_eq!(
db.get_repo("run_0002", &repo.repo_id)
.unwrap()
.expect("repo")
.uri,
repo.uri
);
// Publication points belonging to that repository.
let pps = db
.list_publication_points_for_repo("run_0002", &repo.repo_id, 10, None)
.expect("pps");
assert_eq!(pps.data.len(), 1);
let pp = pps.data[0].clone();
assert_eq!(pp.repo_sync_phase.as_deref(), Some("rrdp_delta"));
assert_eq!(
db.get_publication_point("run_0002", &pp.pp_id)
.unwrap()
.expect("pp")
.objects,
2
);
// Objects, listed for the whole run, per repository and per publication point.
let objects = db.list_objects("run_0002", 10, None).expect("objects");
assert_eq!(objects.data.len(), 2);
assert_eq!(
db.list_objects_for_repo("run_0002", &repo.repo_id, 10, None)
.expect("repo objects")
.data
.len(),
2
);
assert_eq!(
db.list_objects_for_pp("run_0002", &pp.pp_id, 10, None)
.expect("pp objects")
.data
.len(),
2
);
// The sample ROA has result "error", so it must be flagged as rejected.
let roa = objects
.data
.iter()
.find(|object| object.object_type == "roa")
.expect("roa")
.clone();
assert!(roa.rejected);
// The same object must be reachable by instance id and by URI.
assert_eq!(
db.get_object_by_instance_id("run_0002", &roa.object_instance_id)
.unwrap()
.expect("object")
.uri,
roa.uri
);
assert_eq!(
db.get_object_by_uri("run_0002", &roa.uri)
.unwrap()
.expect("uri index")
.object_instance_id,
roa.object_instance_id
);
// Aggregated statistics for the run.
let overview = db
.get_stat("run_0002", "overview", "counts")
.unwrap()
.expect("overview");
assert_eq!(overview.value["objects"].as_u64(), Some(2));
let object_types = db
.get_stat("run_0002", "objects", "by_type")
.unwrap()
.expect("types");
assert_eq!(object_types.value["manifest"].as_u64(), Some(1));
assert_eq!(object_types.value["roa"].as_u64(), Some(1));
let by_result = db
.get_stat("run_0002", "validation", "by_result")
.unwrap()
.expect("result");
assert_eq!(by_result.value["ok"].as_u64(), Some(1));
assert_eq!(by_result.value["error"].as_u64(), Some(1));
// Export job record: store it, then read it back by run id and job id.
let job = ExportJobRecord {
schema_version: QUERY_DB_SCHEMA_VERSION,
job_id: "job-1".to_string(),
run_id: "run_0002".to_string(),
scope: "object_set".to_string(),
repo_id: None,
pp_id: None,
status: "complete".to_string(),
created_at: "2026-06-15T00:00:00Z".to_string(),
finished_at: Some("2026-06-15T00:00:01Z".to_string()),
output_path: Some("/tmp/export.tar".to_string()),
object_count: 2,
bytes_written: 512,
error: None,
};
db.put_export_job(&job).expect("put job");
assert_eq!(
db.get_export_job("run_0002", "job-1")
.unwrap()
.expect("job")
.bytes_written,
512
);
// Validation explain record: store it, then read it back by run, object
// instance and explain version.
let explain = ValidationExplainRecord {
schema_version: QUERY_DB_SCHEMA_VERSION,
explain_version: 1,
run_id: "run_0002".to_string(),
object_instance_id: roa.object_instance_id.clone(),
uri: roa.uri.clone(),
sha256: roa.sha256.clone(),
object_type: roa.object_type.clone(),
final_status: "invalid".to_string(),
audit_result: "error".to_string(),
detail_summary: Some("bad roa".to_string()),
authoritative: false,
explain_mode: "test".to_string(),
generated_at: "2026-06-15T00:00:02Z".to_string(),
parsevalidate: json!({"status":"invalid"}),
chainvalidate: json!({"status":"invalid"}),
chain_edges: vec![ChainEdgeRecord {
relation: "test".to_string(),
from_uri: roa.uri.clone(),
to_uri: "rsync://repo.example/rpki/m.mft".to_string(),
to_object_instance_id: None,
to_sha256: None,
status: "missing".to_string(),
evidence: json!({}),
}],
};
db.put_validation_explain(&explain).expect("put explain");
assert_eq!(
db.get_validation_explain("run_0002", &roa.object_instance_id, 1)
.unwrap()
.expect("explain")
.final_status,
"invalid"
);
}
// Runs the indexer twice against the same open database: the second pass must
// index nothing new and must leave the latest ready run unchanged.
#[test]
fn repeated_indexing_does_not_move_latest_backwards() {
let temp = tempfile::tempdir().expect("tempdir");
let run1 = temp.path().join("runs/run_0001");
let run2 = temp.path().join("runs/run_0002");
fs::create_dir_all(&run1).expect("run1");
fs::create_dir_all(&run2).expect("run2");
write_sample_run(&run1, "run_0001", 1);
write_sample_run(&run2, "run_0002", 2);
let query_db_path = temp.path().join("query-db");
// The database is opened once and shared by both indexing passes.
let db = QueryDb::open(&query_db_path).expect("open query db");
let config = ArtifactIndexerConfig {
query_db_path,
run_root: Some(temp.path().to_path_buf()),
run_dir: None,
repo_bytes_db_path: None,
projection_entry_limit: 50,
min_run_seq: None,
retain_indexed_runs: None,
};
let first = index_artifacts_with_open_db(&db, None, &config).expect("first index");
assert_eq!(first.latest_ready_run.as_deref(), Some("run_0002"));
assert_eq!(first.runs_indexed, 2);
assert_eq!(db.latest_ready_run().unwrap().as_deref(), Some("run_0002"));
// Second pass over unchanged input.
let second = index_artifacts_with_open_db(&db, None, &config).expect("second index");
assert_eq!(second.runs_indexed, 0);
assert_eq!(second.latest_ready_run.as_deref(), Some("run_0002"));
assert_eq!(db.latest_ready_run().unwrap().as_deref(), Some("run_0002"));
// The older run stays marked ready as well.
assert_eq!(
db.get_run("run_0001")
.unwrap()
.expect("run_0001")
.index_status,
"ready"
);
}
// Declares two events in `queryAudit` while the sidecar contains a single event
// line, then checks that the run is still indexed and remains the latest ready run.
//
// NOTE(review): the test name says the mismatch "is reported", but the assertions
// require `summary.errors` to be empty — confirm which behaviour is intended.
#[test]
fn validation_events_mismatch_is_reported_without_switching_latest() {
let temp = tempfile::tempdir().expect("tempdir");
let run_dir = temp.path().join("runs/run_0001");
fs::create_dir_all(&run_dir).expect("run dir");
write_sample_run(&run_dir, "run_0001", 1);
// A single event without a trailing newline.
let events_bytes = br#"{"schemaVersion":1,"eventType":"run_summary"}"#;
fs::write(run_dir.join("validation-events.jsonl"), events_bytes).expect("events");
let mut report = read_json_file(&run_dir.join("report.json")).expect("read report");
// The digest matches the bytes on disk; only the declared count (2) disagrees.
report["queryAudit"] = json!({
"schemaVersion": 1,
"status": "complete",
"eventsPath": "validation-events.jsonl",
"eventsCount": 2,
"eventsSha256": hex::encode(Sha256::digest(events_bytes)),
"writerVersion": 1
});
fs::write(
run_dir.join("report.json"),
serde_json::to_vec(&report).unwrap(),
)
.expect("report");
let query_db_path = temp.path().join("query-db");
let summary = index_artifacts(&ArtifactIndexerConfig {
query_db_path: query_db_path.clone(),
run_root: Some(temp.path().to_path_buf()),
run_dir: None,
repo_bytes_db_path: None,
projection_entry_limit: 50,
min_run_seq: None,
retain_indexed_runs: None,
})
.expect("index summary");
assert_eq!(summary.runs_indexed, 1);
assert_eq!(summary.errors.len(), 0);
let db = QueryDb::open(&query_db_path).expect("db");
assert_eq!(db.latest_ready_run().unwrap().as_deref(), Some("run_0001"));
}
// Writes the standard sample run into `run_dir`, then rewrites `report.json` so
// that the second object of the first publication point (the ROA in the sample
// report) carries `roa_sha` as its `sha256_hex`.
fn write_sample_run_with_object_hash(
run_dir: &Path,
run_id: &str,
run_seq: u64,
roa_sha: &str,
) {
write_sample_run(run_dir, run_id, run_seq);
let mut report = read_json_file(&run_dir.join("report.json")).expect("read report");
// Index 1 is the ROA entry; index 0 is the manifest.
report["publication_points"][0]["objects"][1]["sha256_hex"] = json!(roa_sha);
fs::write(
run_dir.join("report.json"),
serde_json::to_vec(&report).unwrap(),
)
.expect("report");
}
// Writes a minimal set of run artifacts into `run_dir`, which must already exist:
// `report.json`, `run-summary.json` and `stage-timing.json`.
//
// The report holds one publication point with two objects (a manifest with
// result "ok" and a ROA with result "error") and one VRP. The summary reports
// status "success" and carries `run_id` and `run_seq`.
fn write_sample_run(run_dir: &Path, run_id: &str, run_seq: u64) {
let report = json!({
"format_version": 2,
"meta": {"validation_time_rfc3339_utc": "2026-06-15T00:00:00Z"},
"tree": {"warnings": []},
"publication_points": [
{
"node_id": 10,
"rsync_base_uri": "rsync://repo.example/rpki/",
"manifest_rsync_uri": "rsync://repo.example/rpki/m.mft",
"publication_point_rsync_uri": "rsync://repo.example/rpki/",
"rrdp_notification_uri": "https://repo.example/rrdp/notification.xml",
"source": "rrdp",
"repo_sync_source": "rrdp",
"repo_sync_phase": "rrdp_delta",
"repo_sync_duration_ms": 123,
"repo_terminal_state": "fresh",
"warnings": [],
"objects": [
{"rsync_uri":"rsync://repo.example/rpki/m.mft","sha256_hex":"11","kind":"manifest","result":"ok"},
{"rsync_uri":"rsync://repo.example/rpki/a.roa","sha256_hex":"22","kind":"roa","result":"error","detail":"bad roa"}
]
}
],
"vrps": [{"asn": 64496, "prefix": "192.0.2.0/24", "max_length": 24}],
"aspas": [],
"downloads": [],
"download_stats": {},
"repo_sync_stats": {}
});
fs::write(
run_dir.join("report.json"),
serde_json::to_vec(&report).unwrap(),
)
.expect("report");
let summary = json!({
"status": "success",
"runId": run_id,
"runSeq": run_seq,
"startedAtRfc3339Utc": "2026-06-15T00:00:00Z",
"finishedAtRfc3339Utc": "2026-06-15T00:01:00Z",
"wallMs": 60000,
"reportCounts": {"vrps": 1, "aspas": 0, "publicationPoints": 1, "warnings": 0}
});
fs::write(
run_dir.join("run-summary.json"),
serde_json::to_vec(&summary).unwrap(),
)
.expect("summary");
// Stage timing is written as an empty JSON object.
fs::write(run_dir.join("stage-timing.json"), b"{}").expect("stage");
}
}