rpki/src/tools/rpki_artifact_metrics.rs

2237 lines
77 KiB
Rust

use std::collections::{BTreeMap, BTreeSet};
use std::fs;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use crate::ccr::decode_content_info;
use crate::cir::decode_cir;
use serde::Serialize;
use serde_json::{Value, json};
use sha2::{Digest, Sha256};
/// Object-count thresholds for the "large publication point" counters: a
/// publication point adds one to every threshold its object count strictly exceeds.
const LARGE_PP_OBJECT_THRESHOLDS: &[u64] = &[10, 50, 100, 500, 1000, 5000, 10000, 50000];
/// Upper bounds (in seconds) of the buckets used by the per-transport
/// publication point sync-duration histograms.
const PP_SYNC_SECONDS_BUCKETS: &[f64] = &[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0];
/// Parsed command-line options of the artifact metrics tool.
#[derive(Clone, Debug, PartialEq, Eq)]
struct Args {
/// Directory to scan (`--run-root`, required).
run_root: PathBuf,
/// Address the HTTP endpoint binds to (`--listen`, default `127.0.0.1:9556`).
listen: String,
/// Seconds between background rescans (`--poll-secs`, default 10; values below 1 are raised to 1 by the caller).
poll_secs: u64,
/// Value of the `instance` label attached to the metrics (`--instance`, default `ours-rp`).
instance: String,
/// Scan once, emit the output and exit instead of serving HTTP (`--once`).
once: bool,
/// In `--once` mode, file to write the metrics text to instead of stdout (`--out-metrics`).
out_metrics: Option<PathBuf>,
/// In `--once` mode, file to write the status JSON to (`--out-status`).
out_status: Option<PathBuf>,
}
/// Returns the one-line command-line synopsis that is shown for `--help`
/// and appended to argument errors.
fn usage() -> &'static str {
    const USAGE: &str = "Usage: rpki_artifact_metrics --run-root <path> [--listen <addr:port>] [--poll-secs <n>] [--instance <name>] [--once] [--out-metrics <path>] [--out-status <path>]";
    USAGE
}
/// Public entry point of the tool; delegates to `real_main` and returns its result.
///
/// # Errors
/// Returns whatever error message `real_main` produces.
pub fn main_entry() -> Result<(), String> {
real_main()
}
/// Runs the tool: parses the process arguments, then either performs a single
/// scan (`--once`) or starts a background rescan thread and serves the shared
/// snapshot through `serve_http`.
///
/// # Errors
/// Returns an error when argument parsing, the initial scan, rendering the
/// status JSON, writing an output file, or `serve_http` fails.
fn real_main() -> Result<(), String> {
let args = parse_args(&std::env::args().collect::<Vec<_>>())?;
if args.once {
// One-shot mode: scan, render both outputs, write/print them and exit.
let snapshot = scan_run_root(&args.run_root, &args.instance)?;
let metrics = render_metrics(&snapshot);
let status = render_status_json(&snapshot)?;
if let Some(path) = args.out_metrics.as_ref() {
write_file(path, metrics.as_bytes())?;
} else {
// Without --out-metrics the metrics text goes to stdout.
print!("{metrics}");
}
if let Some(path) = args.out_status.as_ref() {
write_file(path, status.as_bytes())?;
}
return Ok(());
}
// Service mode: the initial scan must succeed before anything is served.
let shared = Arc::new(RwLock::new(scan_run_root(&args.run_root, &args.instance)?));
let scanner = Arc::clone(&shared);
let run_root = args.run_root.clone();
let instance = args.instance.clone();
// Guard against a zero poll interval.
let poll_secs = args.poll_secs.max(1);
thread::spawn(move || {
loop {
thread::sleep(Duration::from_secs(poll_secs));
// Clone under the read lock so the lock is not held for the duration of the scan.
let previous_snapshot = scanner.read().expect("metrics lock poisoned").clone();
let next =
match scan_run_root_incremental(&run_root, &instance, Some(&previous_snapshot)) {
Ok(snapshot) => snapshot,
Err(err) => {
// Keep serving the previous snapshot, but mark the reload as failed.
// NOTE(review): each failed scan appends another entry here, so the list
// keeps growing until a scan succeeds and replaces the snapshot.
let mut previous = scanner.write().expect("metrics lock poisoned");
previous
.service
.parse_errors
.push(format!("scan failed: {err}"));
previous.service.last_reload_success = false;
previous.service.last_scan_timestamp_seconds = unix_now_seconds();
continue;
}
};
*scanner.write().expect("metrics lock poisoned") = next;
}
});
serve_http(&args.listen, shared)
}
/// Parses the command line into [`Args`].
///
/// `argv[0]` is treated as the program name and skipped. Flags that take a
/// value consume the following argument verbatim.
///
/// # Errors
/// Returns a message (usually followed by the usage line) when a flag is
/// unknown, a value is missing, `--poll-secs` is not a `u64`, or `--run-root`
/// is absent. `-h` / `--help` is reported as an error carrying the usage text.
fn parse_args(argv: &[String]) -> Result<Args, String> {
    /// Moves `cursor` onto the slot following a flag and fetches that value.
    fn following<'a>(
        argv: &'a [String],
        cursor: &mut usize,
        flag: &str,
    ) -> Result<&'a str, String> {
        *cursor += 1;
        value_at(argv, *cursor, flag)
    }

    let mut run_root: Option<PathBuf> = None;
    let mut listen = String::from("127.0.0.1:9556");
    let mut poll_secs: u64 = 10;
    let mut instance = String::from("ours-rp");
    let mut once = false;
    let mut out_metrics: Option<PathBuf> = None;
    let mut out_status: Option<PathBuf> = None;

    let mut cursor = 1usize;
    while let Some(flag) = argv.get(cursor) {
        match flag.as_str() {
            "--run-root" => {
                run_root = Some(PathBuf::from(following(argv, &mut cursor, "--run-root")?));
            }
            "--listen" => {
                listen = following(argv, &mut cursor, "--listen")?.to_string();
            }
            "--poll-secs" => {
                let value = following(argv, &mut cursor, "--poll-secs")?;
                poll_secs = match value.parse::<u64>() {
                    Ok(parsed) => parsed,
                    Err(_) => return Err(format!("invalid --poll-secs: {value}")),
                };
            }
            "--instance" => {
                instance = following(argv, &mut cursor, "--instance")?.to_string();
            }
            "--once" => once = true,
            "--out-metrics" => {
                out_metrics = Some(PathBuf::from(following(
                    argv,
                    &mut cursor,
                    "--out-metrics",
                )?));
            }
            "--out-status" => {
                out_status = Some(PathBuf::from(following(argv, &mut cursor, "--out-status")?));
            }
            "-h" | "--help" => return Err(usage().to_string()),
            unknown => return Err(format!("unknown argument: {unknown}\n{}", usage())),
        }
        cursor += 1;
    }

    let Some(run_root) = run_root else {
        return Err(format!("--run-root is required\n{}", usage()));
    };
    Ok(Args {
        run_root,
        listen,
        poll_secs,
        instance,
        once,
        out_metrics,
        out_status,
    })
}
/// Returns the argument stored at `index`, or an error naming `flag` when the
/// command line ends before that position.
fn value_at<'a>(argv: &'a [String], index: usize, flag: &str) -> Result<&'a str, String> {
    match argv.get(index) {
        Some(value) => Ok(value.as_str()),
        None => Err(format!("{flag} requires a value")),
    }
}
/// Complete result of one scan of the run root; this is what the renderers consume.
///
/// The per-artifact sections (`latest_run` through `ccr`) describe the most
/// recent *successful* run and may be copied from the previous snapshot when
/// that run has not changed.
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct MetricsSnapshot {
/// Instance name supplied on the command line.
instance: String,
/// Health information about the scanner itself.
service: ServiceMetrics,
/// Counts of run directories by status.
runs: RunScanSummary,
/// Metrics of the latest successful run, if any.
latest_run: Option<LatestRunMetrics>,
/// Totals accumulated over all runs found on disk.
cumulative: CumulativeMetrics,
/// Per-repository aggregates derived from the report's publication points.
repo_stats: Vec<RepoMetrics>,
/// Object counts keyed by `(kind, result)`.
/// NOTE(review): serde_json cannot serialize tuple map keys; presumably the status
/// JSON is not produced by serializing this struct directly — confirm.
object_counts: BTreeMap<(String, String), u64>,
/// Number of publication points whose object count exceeds each threshold (key = threshold).
large_pp_counts: BTreeMap<u64, u64>,
/// Publication point sync-duration histograms keyed by transport.
pp_sync_histograms: BTreeMap<String, Histogram>,
/// Up to 20 repositories with the largest maximum sync duration.
top_repos_by_sync_duration: Vec<TopRepo>,
/// Up to 20 publication points with the most objects.
top_pp_by_object_count: Vec<TopPublicationPoint>,
/// Up to 20 publication points with the longest sync duration.
top_pp_by_sync_duration: Vec<TopPublicationPoint>,
/// Summary of the run's `input.cir`, when present and decodable.
cir: Option<CirMetrics>,
/// Summary of the run's `result.ccr`, when present and decodable.
ccr: Option<CcrMetrics>,
}
/// Health and bookkeeping data about the scanner itself.
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct ServiceMetrics {
/// Unix timestamp (seconds) at which the last scan attempt finished.
last_scan_timestamp_seconds: f64,
/// Wall-clock duration of the last completed scan, in seconds.
last_scan_duration_seconds: f64,
/// `true` when the last scan completed without recording any parse error.
last_reload_success: bool,
/// Human-readable errors collected while reading artifacts.
parse_errors: Vec<String>,
/// The run root exactly as supplied by the user.
run_root: String,
/// The directory actually scanned for `run_*` sub-directories.
runs_root: String,
}
/// Counts of the run directories found during a scan, grouped by status.
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct RunScanSummary {
/// Total number of `run_*` directories found.
known: u64,
/// Runs classified as `success`.
success: u64,
/// Runs classified as failed.
failed: u64,
/// Runs with any other status (including runs without metadata).
partial: u64,
/// Number of most-recent runs, in sorted order, that are not `success`.
consecutive_failures: u64,
}
/// Totals computed over all run directories currently on disk.
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct CumulativeMetrics {
/// Number of successful runs.
completed_success_total: u64,
/// Number of failed runs.
completed_failed_total: u64,
/// Sum of the wall durations (seconds) of successful runs that have a summary.
observed_duration_seconds_sum: f64,
/// Number of successful runs that contributed to the duration sum.
observed_duration_seconds_count: u64,
/// Sum of the download byte totals reported by successful runs (saturating).
observed_download_bytes_total: u64,
}
/// Metrics describing the latest successful run, assembled from its
/// run summary, run metadata, `report.json` and `vrps.csv`.
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct LatestRunMetrics {
/// Sequence number of the run (from the summary, the metadata, or the directory name).
run_seq: u64,
/// Run identifier; falls back to the directory name.
run_id: String,
/// Path of the run directory.
run_dir: String,
/// Classified status of the run.
status: String,
/// Sync mode from the run metadata, or `unknown`.
sync_mode: String,
/// Snapshot reason from the run metadata, when present.
snapshot_reason: Option<String>,
/// Start time as the RFC 3339 string found in the artifacts.
started_at: Option<String>,
/// Finish time as the RFC 3339 string found in the artifacts.
finished_at: Option<String>,
/// `started_at` converted to a Unix timestamp, when it parses.
start_timestamp_seconds: Option<f64>,
/// `finished_at` converted to a Unix timestamp, when it parses.
finish_timestamp_seconds: Option<f64>,
/// Wall duration in seconds (summary `wallMs` / 1000; 0 when missing).
wall_seconds: f64,
/// User CPU seconds from the summary's process metrics.
user_cpu_seconds: Option<f64>,
/// System CPU seconds from the summary's process metrics.
system_cpu_seconds: Option<f64>,
/// CPU percentage from the summary's process metrics.
cpu_percent: Option<f64>,
/// Maximum resident set size in bytes (summary `maxRssKb` * 1024, saturating).
max_rss_bytes: Option<u64>,
/// Exit code recorded in the summary.
exit_code: Option<i64>,
/// Number of VRPs reported by the run.
vrps: u64,
/// Number of distinct keys (first three columns) in `vrps.csv`, when the file exists and parses.
vrps_unique: Option<u64>,
/// Number of ASPA entries reported by the run.
vaps: u64,
/// Number of publication points.
publication_points: u64,
/// Number of warnings.
warnings: u64,
/// Tree instances processed, when reported.
tree_instances_processed: Option<u64>,
/// Tree instances failed, when reported.
tree_instances_failed: Option<u64>,
/// Stage timings extracted from the summary's `stageTiming` section.
stage_seconds: BTreeMap<String, f64>,
/// Repo sync statistics grouped by phase.
repo_sync_phase: BTreeMap<String, CountDuration>,
/// Repo sync statistics grouped by terminal state.
repo_terminal_state: BTreeMap<String, CountDuration>,
/// Number of download events, when reported.
download_events: Option<u64>,
/// Total downloaded bytes, when reported.
download_bytes: Option<u64>,
/// Artifact sizes extracted from the summary's `artifacts` section.
artifact_sizes: BTreeMap<String, u64>,
/// Path statistics extracted from the summary's `pathStats` section.
state_path_sizes: BTreeMap<String, PathSize>,
}
/// A count paired with a total duration; used for the repo sync statistics
/// grouped by phase and by terminal state.
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct CountDuration {
/// Number of occurrences.
count: u64,
/// Summed duration of those occurrences, in seconds.
duration_seconds_total: f64,
}
/// Size statistics for one entry of the run summary's `pathStats` section.
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct PathSize {
/// Total size in bytes.
total_size_bytes: u64,
/// Number of files.
file_count: u64,
/// Number of directories.
dir_count: u64,
}
/// Aggregated metrics for one repository, built from the publication points
/// in `report.json` that share the same repository URI.
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct RepoMetrics {
/// Short SHA-256 based identifier derived from `uri`.
repo_id: String,
/// Repository URI (RRDP notification URI, or an rsync URI as fallback).
uri: String,
/// Host part of `uri`.
host: String,
/// Transport taken from the first publication point seen for this repository.
transport: String,
/// Number of publication points attributed to this repository.
publication_points: u64,
/// `false` as soon as one publication point ends in a non-success terminal state.
sync_success: bool,
/// Bytes downloaded, attributed from the report's download entries (saturating).
download_bytes: u64,
/// Sum of the publication point sync durations, in seconds.
duration_seconds_sum: f64,
/// Largest publication point sync duration, in seconds.
duration_seconds_max: f64,
/// `duration_seconds_sum` divided by `publication_points`.
duration_seconds_avg: f64,
/// Publication point counts by sync phase.
phase_counts: BTreeMap<String, u64>,
/// Publication point counts by terminal state.
terminal_state_counts: BTreeMap<String, u64>,
}
/// One entry of the "top repositories by sync duration" list.
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct TopRepo {
/// 1-based position in the list.
rank: usize,
/// Identifier of the repository.
repo_id: String,
/// Repository URI.
uri: String,
/// Host part of the repository URI.
host: String,
/// Transport of the repository.
transport: String,
/// Largest publication point sync duration, in milliseconds (rounded).
duration_ms_max: u64,
/// Summed publication point sync duration, in milliseconds (rounded).
duration_ms_sum: u64,
/// Number of publication points in the repository.
publication_points: u64,
}
/// One entry of the "top publication points" lists (by object count or by sync duration).
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct TopPublicationPoint {
/// 1-based position in the list.
rank: usize,
/// Short SHA-256 based identifier derived from `uri`.
pp_id: String,
/// Identifier of the repository the publication point belongs to.
repo_id: String,
/// URI of the publication point.
uri: String,
/// URI of the owning repository.
repo_uri: String,
/// Host part of the repository URI.
host: String,
/// Transport used for the sync.
transport: String,
/// Number of entries in the publication point's `objects` array.
object_count: u64,
/// Sync duration in milliseconds.
sync_duration_ms: u64,
/// Terminal state of the repo sync, or `unknown`.
terminal_state: String,
/// Phase of the repo sync, or `unknown`.
phase: String,
}
/// Fixed-bucket histogram of observed values.
///
/// `counts` holds one non-cumulative slot per entry of `buckets` plus a
/// trailing overflow slot for values above the largest bound.
///
/// `Default` is derived: it yields the same all-empty value the previous
/// hand-written implementation produced (no buckets, no counts, zero sum and
/// count), and matches how the other structs in this file obtain `Default`.
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct Histogram {
    /// Inclusive upper bounds of the buckets, in ascending order.
    buckets: Vec<f64>,
    /// Per-bucket (non-cumulative) observation counts; last slot is the overflow bucket.
    counts: Vec<u64>,
    /// Sum of all observed values.
    sum: f64,
    /// Number of observations.
    count: u64,
}
impl Histogram {
    /// Creates an empty histogram with the given inclusive upper bounds plus
    /// one implicit overflow bucket.
    fn new(buckets: &[f64]) -> Self {
        Self {
            buckets: buckets.to_vec(),
            counts: vec![0; buckets.len() + 1],
            sum: 0.0,
            count: 0,
        }
    }

    /// Records one observation.
    ///
    /// The value is counted in the first bucket whose upper bound is greater
    /// than or equal to it, or in the overflow bucket when it exceeds every
    /// bound (this also covers NaN, which compares false against all bounds).
    fn observe(&mut self, value: f64) {
        // A histogram obtained through `Default` has no count slots at all;
        // make sure the overflow slot exists instead of underflowing
        // `counts.len() - 1` and panicking.
        let slots = self.buckets.len() + 1;
        if self.counts.len() < slots {
            self.counts.resize(slots, 0);
        }
        self.sum += value;
        self.count += 1;
        let slot = self
            .buckets
            .iter()
            .position(|upper| value <= *upper)
            .unwrap_or(slots - 1);
        self.counts[slot] += 1;
    }

    /// Returns the running totals of `counts`, i.e. the cumulative bucket
    /// counts; the last element equals the total number of counted observations.
    fn cumulative_counts(&self) -> Vec<u64> {
        let mut running = 0u64;
        self.counts
            .iter()
            .map(|count| {
                running += *count;
                running
            })
            .collect()
    }
}
/// Summary of a decoded `input.cir` file.
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct CirMetrics {
/// Version field of the decoded CIR.
version: u32,
/// Number of objects listed.
objects: u64,
/// Number of trust anchors listed.
trust_anchors: u64,
/// Number of rejected objects listed.
rejected_objects: u64,
/// Hex encoding of the CIR's reject-list SHA-256 field.
reject_list_sha256: String,
/// Object counts keyed by the type derived from each object's rsync URI.
objects_by_type: BTreeMap<String, u64>,
/// Rejected object counts keyed by the type derived from each object's URI.
rejected_objects_by_type: BTreeMap<String, u64>,
}
/// Summary of a decoded `result.ccr` file. All maps are keyed by the state
/// name (`mfts`, `vrps`, `vaps`, `tas`, `rks`).
#[derive(Clone, Debug, Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct CcrMetrics {
/// Version field of the decoded content.
version: u32,
/// Whether each state is present; always contains all five names.
state_present: BTreeMap<String, bool>,
/// Number of items in each state that is present.
state_items: BTreeMap<String, u64>,
/// Hex-encoded hash of each state that is present.
state_digests: BTreeMap<String, String>,
}
/// One `run_*` directory together with its parsed metadata files.
#[derive(Clone, Debug)]
struct RunRecord {
/// Path of the run directory.
path: PathBuf,
/// Classified status: `success`, `failed`, `partial` or `missing_metadata`.
status: String,
/// Parsed `run-summary.json`, when the file exists and is valid JSON.
summary: Option<Value>,
/// Parsed `run-meta.json`, when the file exists and is valid JSON.
meta: Option<Value>,
}
/// Performs a full scan of `input_root` without reusing any previous snapshot.
///
/// # Errors
/// Propagates the error of `scan_run_root_incremental`.
fn scan_run_root(input_root: &Path, instance: &str) -> Result<MetricsSnapshot, String> {
scan_run_root_incremental(input_root, instance, None)
}
/// Scans the run directories below `input_root` and builds a fresh snapshot.
///
/// Run counts and cumulative totals are always recomputed. The detailed
/// metrics of the latest successful run are copied from `previous` when that
/// run is unchanged, and rebuilt from the artifacts otherwise.
///
/// # Errors
/// Returns an error when the runs root cannot be listed. Problems with
/// individual artifacts are recorded in `service.parse_errors` instead.
fn scan_run_root_incremental(
input_root: &Path,
instance: &str,
previous: Option<&MetricsSnapshot>,
) -> Result<MetricsSnapshot, String> {
let started = Instant::now();
let runs_root = resolve_runs_root(input_root);
let mut snapshot = MetricsSnapshot {
instance: instance.to_string(),
service: ServiceMetrics {
run_root: input_root.display().to_string(),
runs_root: runs_root.display().to_string(),
..ServiceMetrics::default()
},
..MetricsSnapshot::default()
};
let records = collect_run_records(&runs_root, &mut snapshot.service.parse_errors)?;
snapshot.runs.known = records.len() as u64;
// Tally runs by status; anything that is neither success nor failed counts as partial.
for record in &records {
match record.status.as_str() {
"success" => snapshot.runs.success += 1,
"failed" | "spawn_failed" => snapshot.runs.failed += 1,
_ => snapshot.runs.partial += 1,
}
}
snapshot.runs.consecutive_failures = consecutive_failures(&records);
snapshot.cumulative.completed_success_total = snapshot.runs.success;
snapshot.cumulative.completed_failed_total = snapshot.runs.failed;
// Duration and download totals only consider successful runs that have a summary.
for record in records.iter().filter(|record| record.status == "success") {
if let Some(summary) = record.summary.as_ref() {
let wall_seconds = json_u64(summary, &["wallMs"]).unwrap_or(0) as f64 / 1000.0;
snapshot.cumulative.observed_duration_seconds_sum += wall_seconds;
snapshot.cumulative.observed_duration_seconds_count += 1;
if let Some(bytes) = json_u64(summary, &["stageTiming", "download_bytes_total"]) {
snapshot.cumulative.observed_download_bytes_total = snapshot
.cumulative
.observed_download_bytes_total
.saturating_add(bytes);
}
}
}
// Records are sorted by path, so the last successful one is treated as the latest run.
if let Some(latest) = records
.iter()
.rev()
.find(|record| record.status == "success")
{
if can_reuse_latest_metrics(latest, previous) {
reuse_latest_artifact_metrics(previous.expect("checked previous"), &mut snapshot);
} else {
build_latest_metrics(latest, &mut snapshot);
// NOTE(review): presumably asks the allocator to release memory after the
// artifacts were parsed; the result is intentionally ignored — confirm.
let _ = crate::memory_telemetry::malloc_trim_probe();
}
}
snapshot.service.last_scan_timestamp_seconds = unix_now_seconds();
snapshot.service.last_scan_duration_seconds = started.elapsed().as_secs_f64();
snapshot.service.last_reload_success = snapshot.service.parse_errors.is_empty();
Ok(snapshot)
}
/// Decides whether the artifact metrics cached in `previous` still describe `record`.
///
/// Reuse is allowed only when the previous snapshot has a latest run with the
/// same directory, status, finish timestamp and wall duration as `record`.
fn can_reuse_latest_metrics(record: &RunRecord, previous: Option<&MetricsSnapshot>) -> bool {
    let cached = match previous.and_then(|snapshot| snapshot.latest_run.as_ref()) {
        Some(cached) => cached,
        None => return false,
    };
    let same_dir = cached.run_dir == record.path.display().to_string();
    if !same_dir || cached.status != record.status {
        return false;
    }

    // Finish time: prefer the summary, fall back to the run metadata.
    let finished_now = match record
        .summary
        .as_ref()
        .and_then(|v| json_str(v, &["finishedAtRfc3339Utc"]))
    {
        Some(value) => Some(value),
        None => record
            .meta
            .as_ref()
            .and_then(|v| json_str(v, &["completed_at_rfc3339_utc"])),
    };
    if cached.finished_at.as_deref() != finished_now {
        return false;
    }

    let wall_ms = record
        .summary
        .as_ref()
        .and_then(|v| json_u64(v, &["wallMs"]))
        .unwrap_or(0);
    let wall_now = wall_ms as f64 / 1000.0;
    (cached.wall_seconds - wall_now).abs() < f64::EPSILON
}
/// Copies every artifact-derived section (latest run, repository statistics,
/// object counts, histograms, top lists, CIR and CCR summaries) from
/// `previous` into `snapshot`, leaving the freshly computed sections untouched.
fn reuse_latest_artifact_metrics(previous: &MetricsSnapshot, snapshot: &mut MetricsSnapshot) {
    snapshot.latest_run.clone_from(&previous.latest_run);
    snapshot.repo_stats.clone_from(&previous.repo_stats);
    snapshot.object_counts.clone_from(&previous.object_counts);
    snapshot.large_pp_counts.clone_from(&previous.large_pp_counts);
    snapshot
        .pp_sync_histograms
        .clone_from(&previous.pp_sync_histograms);
    snapshot
        .top_repos_by_sync_duration
        .clone_from(&previous.top_repos_by_sync_duration);
    snapshot
        .top_pp_by_object_count
        .clone_from(&previous.top_pp_by_object_count);
    snapshot
        .top_pp_by_sync_duration
        .clone_from(&previous.top_pp_by_sync_duration);
    snapshot.cir.clone_from(&previous.cir);
    snapshot.ccr.clone_from(&previous.ccr);
}
/// Picks the directory that holds the `run_*` sub-directories: `<input_root>/runs`
/// when that is a directory, otherwise `input_root` itself.
fn resolve_runs_root(input_root: &Path) -> PathBuf {
    Some(input_root.join("runs"))
        .filter(|candidate| candidate.is_dir())
        .unwrap_or_else(|| input_root.to_path_buf())
}
/// Lists the `run_*` directories directly below `runs_root`, loads their
/// `run-summary.json` / `run-meta.json` files and returns the records sorted
/// by path.
///
/// JSON files that cannot be read or parsed are reported through `errors`.
/// NOTE(review): the ordering is a plain path comparison, so it only matches
/// run order if directory names sort chronologically — confirm the naming scheme.
///
/// # Errors
/// Returns an error when `runs_root` is not a directory or cannot be listed.
fn collect_run_records(
    runs_root: &Path,
    errors: &mut Vec<String>,
) -> Result<Vec<RunRecord>, String> {
    if !runs_root.is_dir() {
        return Err(format!(
            "runs root is not a directory: {}",
            runs_root.display()
        ));
    }
    let listing = fs::read_dir(runs_root)
        .map_err(|e| format!("read runs root failed: {}: {e}", runs_root.display()))?;

    let mut found = Vec::new();
    for item in listing {
        let path = item
            .map_err(|e| format!("read runs entry failed: {e}"))?
            .path();
        // Only directories whose (UTF-8) name starts with "run_" are runs.
        let is_run_dir = path.is_dir()
            && path
                .file_name()
                .and_then(|name| name.to_str())
                .map_or(false, |name| name.starts_with("run_"));
        if !is_run_dir {
            continue;
        }
        let summary = read_json_optional(&path.join("run-summary.json"), errors);
        let meta = read_json_optional(&path.join("run-meta.json"), errors);
        let status = classify_run_status(&summary, &meta, &path);
        found.push(RunRecord {
            path,
            status,
            summary,
            meta,
        });
    }
    found.sort_by(|a, b| a.path.cmp(&b.path));
    Ok(found)
}
/// Derives the status of a run from the `status` fields of its summary and
/// metadata documents.
///
/// * `success` — both documents say `success`;
/// * `failed` — either document says `failed` or `spawn_failed`;
/// * `partial` — neither of the above, but at least one metadata file exists;
/// * `missing_metadata` — neither metadata file exists.
fn classify_run_status(summary: &Option<Value>, meta: &Option<Value>, path: &Path) -> String {
    let from_summary = summary.as_ref().and_then(|v| json_str(v, &["status"]));
    let from_meta = meta.as_ref().and_then(|v| json_str(v, &["status"]));
    let label = match (from_summary, from_meta) {
        (Some("success"), Some("success")) => "success",
        (Some("failed" | "spawn_failed"), _) | (_, Some("failed" | "spawn_failed")) => "failed",
        _ => {
            let has_metadata = ["run-summary.json", "run-meta.json"]
                .iter()
                .any(|file| path.join(file).exists());
            if has_metadata {
                "partial"
            } else {
                "missing_metadata"
            }
        }
    };
    label.to_string()
}
/// Counts how many records at the end of `records` are not `success`,
/// stopping at the most recent successful one.
fn consecutive_failures(records: &[RunRecord]) -> u64 {
    records
        .iter()
        .rev()
        .take_while(|record| record.status != "success")
        .fold(0u64, |streak, _| streak + 1)
}
/// Reads and parses the JSON document at `path`.
///
/// Returns `None` without recording an error when the file does not exist.
/// When the file exists but cannot be read or is not valid JSON, a message
/// including the underlying cause is appended to `errors` and `None` is
/// returned.
fn read_json_optional(path: &Path, errors: &mut Vec<String>) -> Option<Value> {
    if !path.exists() {
        return None;
    }
    let bytes = match fs::read(path) {
        Ok(bytes) => bytes,
        // The file vanished between the existence check and the read
        // (e.g. the run directory is being rotated): same as "absent".
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return None,
        Err(err) => {
            errors.push(format!("read json failed: {}: {err}", path.display()));
            return None;
        }
    };
    match serde_json::from_slice::<Value>(&bytes) {
        Ok(value) => Some(value),
        Err(err) => {
            errors.push(format!("parse json failed: {}: {err}", path.display()));
            None
        }
    }
}
/// Builds the detailed metrics for `record` (the latest successful run) and
/// stores them, together with the report/CIR/CCR derived sections, in `snapshot`.
///
/// Values are taken from the run summary first and fall back to the run
/// metadata or the directory name where noted. Problems reading the
/// additional artifacts are recorded in `snapshot.service.parse_errors`.
fn build_latest_metrics(record: &RunRecord, snapshot: &mut MetricsSnapshot) {
let summary = record.summary.as_ref();
let meta = record.meta.as_ref();
// Run sequence: summary, then metadata, then the directory name, then 0.
let run_seq = summary
.and_then(|v| json_u64(v, &["runSeq"]))
.or_else(|| meta.and_then(|v| json_u64(v, &["run_index"])))
.unwrap_or_else(|| run_index_from_path(&record.path).unwrap_or(0));
// Run id: summary, then metadata, then the directory name, then "unknown".
let run_id = summary
.and_then(|v| json_str(v, &["runId"]))
.or_else(|| meta.and_then(|v| json_str(v, &["run_id"])))
.unwrap_or_else(|| {
record
.path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown")
})
.to_string();
let sync_mode = meta
.and_then(|v| json_str(v, &["sync_mode"]))
.unwrap_or("unknown")
.to_string();
let snapshot_reason = meta
.and_then(|v| json_str(v, &["snapshot_reason"]))
.map(|s| s.to_string());
let started_at = summary
.and_then(|v| json_str(v, &["startedAtRfc3339Utc"]))
.or_else(|| meta.and_then(|v| json_str(v, &["started_at_rfc3339_utc"])))
.map(|s| s.to_string());
let finished_at = summary
.and_then(|v| json_str(v, &["finishedAtRfc3339Utc"]))
.or_else(|| meta.and_then(|v| json_str(v, &["completed_at_rfc3339_utc"])))
.map(|s| s.to_string());
// wallMs is in milliseconds; a missing value is treated as 0.
let wall_seconds = summary.and_then(|v| json_u64(v, &["wallMs"])).unwrap_or(0) as f64 / 1000.0;
let mut latest = LatestRunMetrics {
run_seq,
run_id,
run_dir: record.path.display().to_string(),
status: record.status.clone(),
sync_mode,
snapshot_reason,
started_at: started_at.clone(),
finished_at: finished_at.clone(),
start_timestamp_seconds: started_at.as_deref().and_then(parse_rfc3339_to_unix),
finish_timestamp_seconds: finished_at.as_deref().and_then(parse_rfc3339_to_unix),
wall_seconds,
user_cpu_seconds: summary.and_then(|v| json_f64(v, &["processMetrics", "userSeconds"])),
system_cpu_seconds: summary.and_then(|v| json_f64(v, &["processMetrics", "systemSeconds"])),
cpu_percent: summary.and_then(|v| json_f64(v, &["processMetrics", "cpuPercent"])),
// The summary reports kilobytes; convert to bytes without overflowing.
max_rss_bytes: summary
.and_then(|v| json_u64(v, &["processMetrics", "maxRssKb"]))
.map(|kb| kb.saturating_mul(1024)),
exit_code: summary.and_then(|v| json_i64(v, &["exitCode"])),
..LatestRunMetrics::default()
};
if let Some(summary) = summary {
latest.vrps = json_u64(summary, &["reportCounts", "vrps"]).unwrap_or(0);
latest.vaps = json_u64(summary, &["reportCounts", "aspas"]).unwrap_or(0);
latest.publication_points =
json_u64(summary, &["reportCounts", "publicationPoints"]).unwrap_or(0);
latest.warnings = json_u64(summary, &["reportCounts", "warnings"]).unwrap_or(0);
latest.tree_instances_processed =
json_u64(summary, &["reportCounts", "treeInstancesProcessed"]);
latest.tree_instances_failed = json_u64(summary, &["reportCounts", "treeInstancesFailed"]);
latest.stage_seconds = extract_stage_seconds(summary.get("stageTiming"));
latest.repo_sync_phase =
extract_count_duration_map(summary.pointer("/repoSyncStats/by_phase"));
latest.repo_terminal_state =
extract_count_duration_map(summary.pointer("/repoSyncStats/by_terminal_state"));
latest.download_events = json_u64(summary, &["stageTiming", "download_event_count"]);
latest.download_bytes = json_u64(summary, &["stageTiming", "download_bytes_total"]);
latest.artifact_sizes = extract_artifact_sizes(summary.get("artifacts"));
latest.state_path_sizes = extract_path_sizes(summary.get("pathStats"));
}
// report.json fills in values missing from the summary and produces the
// per-repository / per-publication-point sections of the snapshot.
parse_report(&record.path.join("report.json"), snapshot, &mut latest);
latest.vrps_unique = count_vrp_csv_unique_keys_opt(
&record.path.join("vrps.csv"),
&mut snapshot.service.parse_errors,
);
parse_cir(&record.path.join("input.cir"), snapshot);
parse_ccr(&record.path.join("result.ccr"), snapshot);
snapshot.latest_run = Some(latest);
}
/// Counts the unique VRP keys in the CSV at `path`.
///
/// Returns `None` when the file does not exist, and also when counting
/// fails — in that case the error message is appended to `errors`.
fn count_vrp_csv_unique_keys_opt(path: &Path, errors: &mut Vec<String>) -> Option<u64> {
    if !path.exists() {
        return None;
    }
    count_vrp_csv_unique_keys(path)
        .map_err(|err| errors.push(err))
        .ok()
}
/// Counts the distinct VRP rows in the CSV file at `path`, where a row is
/// identified by its first three (trimmed) columns.
///
/// Blank lines and `#` comment lines are ignored, and the first remaining
/// line is skipped as the header.
///
/// # Errors
/// Returns an error when the file cannot be read as UTF-8 text or when a data
/// row has fewer than three columns.
fn count_vrp_csv_unique_keys(path: &Path) -> Result<u64, String> {
    let content = fs::read_to_string(path)
        .map_err(|e| format!("read VRP CSV failed: {}: {e}", path.display()))?;
    // Keys borrow from `content`, so no per-row String allocations are needed;
    // the set ordering and therefore the resulting count are unchanged.
    let mut unique: BTreeSet<(&str, &str, &str)> = BTreeSet::new();
    for line in data_csv_lines(&content).skip(1) {
        let columns = split_csv_simple(line);
        if columns.len() < 3 {
            return Err(format!(
                "invalid VRP CSV row in {}: expected at least 3 columns, got {}",
                path.display(),
                columns.len()
            ));
        }
        unique.insert((columns[0], columns[1], columns[2]));
    }
    Ok(unique.len() as u64)
}
/// Iterates over the meaningful lines of `content`: every line is trimmed,
/// and lines that are then empty or start with `#` are dropped.
fn data_csv_lines(content: &str) -> impl Iterator<Item = &str> {
    content.lines().filter_map(|raw| {
        let line = raw.trim();
        if line.is_empty() || line.starts_with('#') {
            None
        } else {
            Some(line)
        }
    })
}
/// Splits `line` at every comma and trims each field. No quoting or escaping
/// is interpreted.
fn split_csv_simple(line: &str) -> Vec<&str> {
    let mut fields = Vec::new();
    for raw in line.split(',') {
        fields.push(raw.trim());
    }
    fields
}
/// Reads `report.json` at `path` and merges its data into `latest` and `snapshot`.
///
/// Does nothing when the file is absent. Read and parse failures are recorded
/// in `snapshot.service.parse_errors`. Counts already obtained from the run
/// summary are only replaced where noted below.
fn parse_report(path: &Path, snapshot: &mut MetricsSnapshot, latest: &mut LatestRunMetrics) {
if !path.exists() {
return;
}
let Ok(bytes) = fs::read(path) else {
snapshot
.service
.parse_errors
.push(format!("read report.json failed: {}", path.display()));
return;
};
let Ok(report) = serde_json::from_slice::<Value>(&bytes) else {
snapshot
.service
.parse_errors
.push(format!("parse report.json failed: {}", path.display()));
return;
};
// VRP/ASPA counts: only used when the summary provided none (or zero).
if latest.vrps == 0 {
latest.vrps = report
.get("vrps")
.and_then(|v| v.as_array())
.map(|a| a.len() as u64)
.unwrap_or(0);
}
if latest.vaps == 0 {
latest.vaps = report
.get("aspas")
.and_then(|v| v.as_array())
.map(|a| a.len() as u64)
.unwrap_or(0);
}
// Warnings: keep the larger of the summary count and the report's list length.
latest.warnings = latest.warnings.max(
report
.pointer("/tree/warnings")
.and_then(|v| v.as_array())
.map(|a| a.len() as u64)
.unwrap_or(0),
);
// Tree instance counters from the report override the summary when present.
if let Some(processed) = json_u64(&report, &["tree", "instances_processed"]) {
latest.tree_instances_processed = Some(processed);
}
if let Some(failed) = json_u64(&report, &["tree", "instances_failed"]) {
latest.tree_instances_failed = Some(failed);
}
// Repo sync statistics: only used when the summary had none.
if latest.repo_sync_phase.is_empty() {
latest.repo_sync_phase =
extract_count_duration_map(report.pointer("/repo_sync_stats/by_phase"));
}
if latest.repo_terminal_state.is_empty() {
latest.repo_terminal_state =
extract_count_duration_map(report.pointer("/repo_sync_stats/by_terminal_state"));
}
// The report's publication point list is authoritative for the count and
// feeds the per-repository / per-publication-point sections.
if let Some(pps) = report.get("publication_points").and_then(|v| v.as_array()) {
latest.publication_points = pps.len() as u64;
extract_publication_point_metrics(pps, report.get("downloads"), snapshot);
}
}
/// Aggregates the report's publication points into the per-repository
/// statistics, object counts, large-publication-point counters, sync-duration
/// histograms and top-20 lists of `snapshot`.
///
/// `pps` is the report's `publication_points` array; `downloads` is the
/// report's optional `downloads` value used to attribute downloaded bytes to
/// repositories. All affected sections of `snapshot` are replaced.
fn extract_publication_point_metrics(
pps: &[Value],
downloads: Option<&Value>,
snapshot: &mut MetricsSnapshot,
) {
let mut repos: BTreeMap<String, RepoMetrics> = BTreeMap::new();
let mut pp_by_object_count = Vec::<TopPublicationPoint>::new();
let mut pp_by_sync_duration = Vec::<TopPublicationPoint>::new();
let mut large_pp_counts = BTreeMap::<u64, u64>::new();
let mut pp_sync_histograms = BTreeMap::<String, Histogram>::new();
let mut object_counts = BTreeMap::<(String, String), u64>::new();
for pp in pps {
// Publication point URI: first available of the listed fields.
let pp_uri = json_str(pp, &["publication_point_rsync_uri"])
.or_else(|| json_str(pp, &["manifest_rsync_uri"]))
.or_else(|| json_str(pp, &["rsync_base_uri"]))
.unwrap_or("unknown");
// Repository URI: the RRDP notification URI is preferred over rsync URIs.
let repo_uri = json_str(pp, &["rrdp_notification_uri"])
.or_else(|| json_str(pp, &["rsync_base_uri"]))
.or_else(|| json_str(pp, &["publication_point_rsync_uri"]))
.unwrap_or(pp_uri);
let repo_id = short_sha256(repo_uri);
let pp_id = short_sha256(pp_uri);
let host = uri_host(repo_uri);
// Transport: explicit source field if present, otherwise inferred from the URI.
let transport = json_str(pp, &["repo_sync_source"])
.or_else(|| json_str(pp, &["source"]))
.map(normalize_transport)
.unwrap_or_else(|| infer_transport(repo_uri));
let duration_ms = json_u64(pp, &["repo_sync_duration_ms"]).unwrap_or(0);
let duration_seconds = duration_ms as f64 / 1000.0;
let phase = json_str(pp, &["repo_sync_phase"])
.unwrap_or("unknown")
.to_string();
let terminal_state = json_str(pp, &["repo_terminal_state"])
.unwrap_or("unknown")
.to_string();
let object_count = pp
.get("objects")
.and_then(|v| v.as_array())
.map(|a| a.len() as u64)
.unwrap_or(0);
// The first publication point of a repository fixes its uri/host/transport.
let repo = repos.entry(repo_id.clone()).or_insert_with(|| RepoMetrics {
repo_id: repo_id.clone(),
uri: repo_uri.to_string(),
host: host.clone(),
transport: transport.clone(),
sync_success: true,
..RepoMetrics::default()
});
repo.publication_points += 1;
repo.duration_seconds_sum += duration_seconds;
repo.duration_seconds_max = repo.duration_seconds_max.max(duration_seconds);
// A single non-success publication point marks the whole repository as failed.
if !is_success_terminal_state(&terminal_state) {
repo.sync_success = false;
}
*repo.phase_counts.entry(phase.clone()).or_default() += 1;
*repo
.terminal_state_counts
.entry(terminal_state.clone())
.or_default() += 1;
// Thresholds that no publication point exceeds get no map entry at all.
for threshold in LARGE_PP_OBJECT_THRESHOLDS {
if object_count > *threshold {
*large_pp_counts.entry(*threshold).or_default() += 1;
}
}
pp_sync_histograms
.entry(transport.clone())
.or_insert_with(|| Histogram::new(PP_SYNC_SECONDS_BUCKETS))
.observe(duration_seconds);
if let Some(objects) = pp.get("objects").and_then(|v| v.as_array()) {
for object in objects {
let kind = json_str(object, &["kind"]).unwrap_or("unknown").to_string();
let result = json_str(object, &["result"])
.unwrap_or("unknown")
.to_string();
*object_counts.entry((kind, result)).or_default() += 1;
}
}
// Ranks are assigned after sorting, below.
let top = TopPublicationPoint {
rank: 0,
pp_id,
repo_id,
uri: pp_uri.to_string(),
repo_uri: repo_uri.to_string(),
host,
transport,
object_count,
sync_duration_ms: duration_ms,
terminal_state,
phase,
};
pp_by_object_count.push(top.clone());
pp_by_sync_duration.push(top);
}
let mut repo_stats = repos.into_values().collect::<Vec<_>>();
assign_download_bytes_to_repos(&mut repo_stats, downloads);
for repo in &mut repo_stats {
if repo.publication_points > 0 {
repo.duration_seconds_avg = repo.duration_seconds_sum / repo.publication_points as f64;
}
}
let mut top_repos = repo_stats
.iter()
.map(|repo| TopRepo {
rank: 0,
repo_id: repo.repo_id.clone(),
uri: repo.uri.clone(),
host: repo.host.clone(),
transport: repo.transport.clone(),
duration_ms_max: (repo.duration_seconds_max * 1000.0).round() as u64,
duration_ms_sum: (repo.duration_seconds_sum * 1000.0).round() as u64,
publication_points: repo.publication_points,
})
.collect::<Vec<_>>();
// Each top list: sort descending (stable, so ties keep their input order),
// keep the first 20 entries and number them starting at 1.
top_repos.sort_by(|a, b| b.duration_ms_max.cmp(&a.duration_ms_max));
top_repos.truncate(20);
for (index, item) in top_repos.iter_mut().enumerate() {
item.rank = index + 1;
}
pp_by_object_count.sort_by(|a, b| b.object_count.cmp(&a.object_count));
pp_by_object_count.truncate(20);
for (index, item) in pp_by_object_count.iter_mut().enumerate() {
item.rank = index + 1;
}
pp_by_sync_duration.sort_by(|a, b| b.sync_duration_ms.cmp(&a.sync_duration_ms));
pp_by_sync_duration.truncate(20);
for (index, item) in pp_by_sync_duration.iter_mut().enumerate() {
item.rank = index + 1;
}
snapshot.repo_stats = repo_stats;
snapshot.object_counts = object_counts;
snapshot.large_pp_counts = large_pp_counts;
snapshot.pp_sync_histograms = pp_sync_histograms;
snapshot.top_repos_by_sync_duration = top_repos;
snapshot.top_pp_by_object_count = pp_by_object_count;
snapshot.top_pp_by_sync_duration = pp_by_sync_duration;
}
/// Adds the byte count of every entry of the report's `downloads` array to
/// the repository it is attributed to.
///
/// Entries without a `uri`, with a missing or zero `bytes` value, or that
/// match no repository are ignored. Does nothing when `downloads` is absent
/// or not an array.
fn assign_download_bytes_to_repos(repos: &mut [RepoMetrics], downloads: Option<&Value>) {
    let entries = match downloads.and_then(|v| v.as_array()) {
        Some(entries) => entries,
        None => return,
    };
    for entry in entries {
        let uri = match json_str(entry, &["uri"]) {
            Some(uri) => uri,
            None => continue,
        };
        let size = match json_u64(entry, &["bytes"]) {
            Some(size) if size != 0 => size,
            _ => continue,
        };
        let Some(target) = find_repo_for_download(repos, uri) else {
            continue;
        };
        let repo = &mut repos[target];
        repo.download_bytes = repo.download_bytes.saturating_add(size);
    }
}
/// Finds the index of the repository a downloaded `uri` belongs to.
///
/// Matching order:
/// 1. a repository whose URI equals `uri`;
/// 2. for `rsync://` downloads, the repository with the longest URI that is a
///    string prefix of `uri` (the last one on ties);
/// 3. otherwise, among repositories on the same host whose URI starts with
///    `http`: the only candidate if there is exactly one, else the candidate
///    sharing the longest common prefix with `uri` (the last one on ties),
///    provided that prefix is non-empty.
///
/// NOTE(review): the rsync prefix test is a plain string prefix without a
/// path-boundary check — confirm repository URIs always end in `/`.
fn find_repo_for_download(repos: &[RepoMetrics], uri: &str) -> Option<usize> {
    for (index, repo) in repos.iter().enumerate() {
        if repo.uri == uri {
            return Some(index);
        }
    }

    if uri.starts_with("rsync://") {
        // (index, uri length) of the best prefix match so far; `>=` keeps the
        // last of several equally long matches.
        let mut best: Option<(usize, usize)> = None;
        for (index, repo) in repos.iter().enumerate() {
            if !uri.starts_with(repo.uri.as_str()) {
                continue;
            }
            let length = repo.uri.len();
            if best.map_or(true, |(_, longest)| length >= longest) {
                best = Some((index, length));
            }
        }
        return best.map(|(index, _)| index);
    }

    let download_host = uri_host(uri);
    let same_host: Vec<(usize, &RepoMetrics)> = repos
        .iter()
        .enumerate()
        .filter(|(_, repo)| repo.host == download_host && repo.uri.starts_with("http"))
        .collect();
    if let [(only, _)] = same_host.as_slice() {
        return Some(*only);
    }
    same_host
        .into_iter()
        .map(|(index, repo)| (index, common_prefix_len(&repo.uri, uri)))
        .max_by_key(|(_, shared)| *shared)
        .filter(|(_, shared)| *shared > 0)
        .map(|(index, _)| index)
}
/// Returns `true` when `state` is one of the terminal states counted as a
/// successful sync: `fresh`, `cached`, `reused` or `valid`.
fn is_success_terminal_state(state: &str) -> bool {
    const SUCCESS_STATES: [&str; 4] = ["fresh", "cached", "reused", "valid"];
    SUCCESS_STATES.contains(&state)
}
/// Decodes the CIR file at `path` and stores its summary in `snapshot.cir`.
///
/// Does nothing when the file is absent. A read or decode failure is recorded
/// in `snapshot.service.parse_errors` and leaves `snapshot.cir` untouched.
fn parse_cir(path: &Path, snapshot: &mut MetricsSnapshot) {
    if !path.exists() {
        return;
    }
    let decoded = fs::read(path)
        .map_err(|e| e.to_string())
        .and_then(|bytes| decode_cir(&bytes).map_err(|e| e.to_string()));
    let cir = match decoded {
        Ok(cir) => cir,
        Err(err) => {
            snapshot
                .service
                .parse_errors
                .push(format!("decode CIR failed: {}: {err}", path.display()));
            return;
        }
    };

    // Tally accepted and rejected objects by the type derived from their URI.
    let mut objects_by_type = BTreeMap::new();
    for accepted in &cir.objects {
        let kind = object_type_from_uri(&accepted.rsync_uri);
        *objects_by_type.entry(kind).or_default() += 1;
    }
    let mut rejected_objects_by_type = BTreeMap::new();
    for rejected in &cir.rejected_objects {
        let kind = object_type_from_uri(&rejected.object_uri);
        *rejected_objects_by_type.entry(kind).or_default() += 1;
    }

    snapshot.cir = Some(CirMetrics {
        version: cir.version,
        objects: cir.objects.len() as u64,
        trust_anchors: cir.trust_anchors.len() as u64,
        rejected_objects: cir.rejected_objects.len() as u64,
        reject_list_sha256: hex::encode(&cir.reject_list_sha256),
        objects_by_type,
        rejected_objects_by_type,
    });
}
/// Decodes the CCR file at `path` and stores its summary in `snapshot.ccr`.
///
/// For each of the five states (`mfts`, `vrps`, `vaps`, `tas`, `rks`) the
/// summary records whether it is present and, if so, its item count and
/// hex-encoded hash. Does nothing when the file is absent; a read or decode
/// failure is recorded in `snapshot.service.parse_errors`.
fn parse_ccr(path: &Path, snapshot: &mut MetricsSnapshot) {
    if !path.exists() {
        return;
    }
    let decoded = fs::read(path)
        .map_err(|e| e.to_string())
        .and_then(|bytes| decode_content_info(&bytes).map_err(|e| e.to_string()));
    let content = match decoded {
        Ok(ccr) => ccr.content,
        Err(err) => {
            snapshot
                .service
                .parse_errors
                .push(format!("decode CCR failed: {}: {err}", path.display()));
            return;
        }
    };

    let mut state_present = BTreeMap::new();
    let mut state_items = BTreeMap::new();
    let mut state_digests = BTreeMap::new();
    // Records one state: `entry` carries (item count, hex digest) when present.
    let mut record = |name: &str, entry: Option<(u64, String)>| {
        state_present.insert(name.to_string(), entry.is_some());
        if let Some((items, digest)) = entry {
            state_items.insert(name.to_string(), items);
            state_digests.insert(name.to_string(), digest);
        }
    };
    record(
        "mfts",
        content
            .mfts
            .as_ref()
            .map(|state| (state.mis.len() as u64, hex::encode(&state.hash))),
    );
    record(
        "vrps",
        content
            .vrps
            .as_ref()
            .map(|state| (state.rps.len() as u64, hex::encode(&state.hash))),
    );
    record(
        "vaps",
        content
            .vaps
            .as_ref()
            .map(|state| (state.aps.len() as u64, hex::encode(&state.hash))),
    );
    record(
        "tas",
        content
            .tas
            .as_ref()
            .map(|state| (state.skis.len() as u64, hex::encode(&state.hash))),
    );
    record(
        "rks",
        content
            .rks
            .as_ref()
            .map(|state| (state.rksets.len() as u64, hex::encode(&state.hash))),
    );

    snapshot.ccr = Some(CcrMetrics {
        version: content.version,
        state_present,
        state_items,
        state_digests,
    });
}
/// Renders the complete snapshot as Prometheus text exposition output.
///
/// Service-level and cumulative samples are written first, followed by the
/// latest-run, repository, object, publication-point, histogram, CIR and CCR
/// sections. The CIR/CCR/latest-run sections are skipped when the snapshot
/// does not carry them.
fn render_metrics(snapshot: &MetricsSnapshot) -> String {
    let mut out = String::new();
    {
        let mut writer = PromWriter::new(&mut out);
        let instance = snapshot.instance.as_str();
        let service = &snapshot.service;
        let runs = &snapshot.runs;
        let cumulative = &snapshot.cumulative;
        let instance_only = [label("instance", instance)];

        // Scalar service gauges that carry only the `instance` label.
        let service_gauges = [
            (
                "ours_rp_metrics_service_up",
                "Artifact metrics service is up",
                1.0,
            ),
            (
                "ours_rp_metrics_service_last_scan_timestamp_seconds",
                "Unix timestamp of the last artifact scan",
                service.last_scan_timestamp_seconds,
            ),
            (
                "ours_rp_metrics_service_last_scan_duration_seconds",
                "Duration of the last artifact scan",
                service.last_scan_duration_seconds,
            ),
            (
                "ours_rp_metrics_service_last_reload_success",
                "Whether the last artifact reload had no parse errors",
                bool_value(service.last_reload_success),
            ),
            (
                "ours_rp_metrics_service_parse_errors",
                "Current parse error count",
                service.parse_errors.len() as f64,
            ),
        ];
        for (name, help, value) in service_gauges {
            writer.gauge(name, help, &instance_only, value);
        }

        // Run directory counts, one sample per status.
        let known_runs = [
            ("success", runs.success as f64),
            ("failed", runs.failed as f64),
            ("partial", runs.partial as f64),
        ];
        for (status, value) in known_runs {
            writer.gauge(
                "ours_rp_metrics_service_known_runs",
                "Known run directories by status",
                &[label("instance", instance), label("status", status)],
                value,
            );
        }

        // Completed-run counters, one sample per status.
        let completed = [
            ("success", cumulative.completed_success_total as f64),
            ("failed", cumulative.completed_failed_total as f64),
        ];
        for (status, value) in completed {
            writer.counter(
                "ours_rp_run_completed_total",
                "Completed runs observed by the artifact metrics service",
                &[label("instance", instance), label("status", status)],
                value,
            );
        }

        // Cumulative counters that carry only the `instance` label.
        let observed = [
            (
                "ours_rp_run_observed_duration_seconds_sum",
                "Observed wall duration sum for successful runs",
                cumulative.observed_duration_seconds_sum,
            ),
            (
                "ours_rp_run_observed_duration_seconds_count",
                "Observed wall duration count for successful runs",
                cumulative.observed_duration_seconds_count as f64,
            ),
            (
                "ours_rp_run_observed_download_bytes_total",
                "Observed download bytes across successful runs",
                cumulative.observed_download_bytes_total as f64,
            ),
        ];
        for (name, help, value) in observed {
            writer.counter(name, help, &instance_only, value);
        }

        writer.gauge(
            "ours_rp_run_consecutive_failures",
            "Consecutive non-success runs at the end of the run list",
            &instance_only,
            runs.consecutive_failures as f64,
        );

        if let Some(latest) = &snapshot.latest_run {
            render_latest_metrics(&mut writer, instance, latest);
        }
        render_repo_metrics(&mut writer, instance, &snapshot.repo_stats);
        render_failed_repo_metrics(&mut writer, instance, &snapshot.repo_stats);
        render_top_repo_metrics(&mut writer, instance, &snapshot.top_repos_by_sync_duration);
        render_object_metrics(&mut writer, instance, &snapshot.object_counts);
        render_large_pp_metrics(&mut writer, instance, &snapshot.large_pp_counts);
        render_top_publication_point_metrics(
            &mut writer,
            instance,
            &snapshot.top_pp_by_object_count,
        );

        // One histogram series per transport.
        for (transport, buckets) in &snapshot.pp_sync_histograms {
            writer.histogram(
                "ours_rp_publication_point_sync_duration_seconds",
                "Distribution of sync duration per publication point",
                &[label("instance", instance), label("transport", transport)],
                buckets,
            );
        }

        if let Some(cir) = &snapshot.cir {
            render_cir_metrics(&mut writer, instance, cir);
        }
        if let Some(ccr) = &snapshot.ccr {
            render_ccr_metrics(&mut writer, instance, ccr);
        }
    }
    out
}
fn render_latest_metrics(writer: &mut PromWriter<'_>, instance: &str, latest: &LatestRunMetrics) {
writer.gauge(
"ours_rp_run_sequence",
"Latest successful run sequence",
&[label("instance", instance)],
latest.run_seq as f64,
);
writer.gauge(
"ours_rp_run_success",
"Whether the latest selected run is successful",
&[label("instance", instance)],
bool_value(latest.status == "success"),
);
writer.gauge(
"ours_rp_run_sync_mode",
"Latest run sync mode state",
&[
label("instance", instance),
label("sync_mode", &latest.sync_mode),
],
1.0,
);
if let Some(ts) = latest.start_timestamp_seconds {
writer.gauge(
"ours_rp_run_start_timestamp_seconds",
"Latest run start timestamp",
&[label("instance", instance)],
ts,
);
}
if let Some(ts) = latest.finish_timestamp_seconds {
writer.gauge(
"ours_rp_run_finish_timestamp_seconds",
"Latest run finish timestamp",
&[label("instance", instance)],
ts,
);
}
writer.gauge(
"ours_rp_run_duration_seconds",
"Latest run wall duration",
&[label("instance", instance)],
latest.wall_seconds,
);
if let Some(value) = latest.user_cpu_seconds {
writer.gauge(
"ours_rp_run_user_cpu_seconds",
"Latest run user CPU seconds",
&[label("instance", instance)],
value,
);
}
if let Some(value) = latest.system_cpu_seconds {
writer.gauge(
"ours_rp_run_system_cpu_seconds",
"Latest run system CPU seconds",
&[label("instance", instance)],
value,
);
}
if let Some(value) = latest.cpu_percent {
writer.gauge(
"ours_rp_run_cpu_percent",
"Latest run CPU percent from GNU time",
&[label("instance", instance)],
value,
);
}
if let Some(value) = latest.max_rss_bytes {
writer.gauge(
"ours_rp_run_max_rss_bytes",
"Latest run maximum resident set size",
&[label("instance", instance)],
value as f64,
);
}
if let Some(value) = latest.exit_code {
writer.gauge(
"ours_rp_run_exit_code",
"Latest run exit code",
&[label("instance", instance)],
value as f64,
);
}
writer.gauge(
"ours_rp_vrps",
"Latest run VRP count",
&[label("instance", instance), label("kind", "total")],
latest.vrps as f64,
);
if let Some(value) = latest.vrps_unique {
writer.gauge(
"ours_rp_vrps",
"Latest run VRP count",
&[label("instance", instance), label("kind", "unique")],
value as f64,
);
}
writer.gauge(
"ours_rp_vaps",
"Latest run VAP/ASPA count",
&[label("instance", instance), label("kind", "total")],
latest.vaps as f64,
);
writer.gauge(
"ours_rp_publication_points",
"Latest run publication point count",
&[label("instance", instance)],
latest.publication_points as f64,
);
writer.gauge(
"ours_rp_warnings",
"Latest run warning count",
&[label("instance", instance)],
latest.warnings as f64,
);
if let Some(value) = latest.tree_instances_processed {
writer.gauge(
"ours_rp_tree_instances",
"Latest run tree instances by state",
&[label("instance", instance), label("state", "processed")],
value as f64,
);
}
if let Some(value) = latest.tree_instances_failed {
writer.gauge(
"ours_rp_tree_instances",
"Latest run tree instances by state",
&[label("instance", instance), label("state", "failed")],
value as f64,
);
}
for (stage, value) in &latest.stage_seconds {
writer.gauge(
"ours_rp_run_stage_duration_seconds",
"Latest run stage duration",
&[label("instance", instance), label("stage", stage)],
*value,
);
}
for (phase, stat) in &latest.repo_sync_phase {
writer.gauge(
"ours_rp_repo_sync_phase_count",
"Publication points by repo sync phase",
&[label("instance", instance), label("phase", phase)],
stat.count as f64,
);
writer.gauge(
"ours_rp_repo_sync_phase_duration_seconds_total",
"Repo sync phase cumulative duration in latest run",
&[label("instance", instance), label("phase", phase)],
stat.duration_seconds_total,
);
}
for (state, stat) in &latest.repo_terminal_state {
writer.gauge(
"ours_rp_repo_terminal_state_count",
"Publication points by terminal state",
&[label("instance", instance), label("terminal_state", state)],
stat.count as f64,
);
writer.gauge(
"ours_rp_repo_terminal_state_duration_seconds_total",
"Terminal state cumulative duration in latest run",
&[label("instance", instance), label("terminal_state", state)],
stat.duration_seconds_total,
);
}
if let Some(value) = latest.download_events {
writer.gauge(
"ours_rp_download_events",
"Latest run download event count",
&[label("instance", instance)],
value as f64,
);
}
if let Some(value) = latest.download_bytes {
writer.gauge(
"ours_rp_download_bytes",
"Latest run download bytes",
&[label("instance", instance)],
value as f64,
);
}
for (artifact, size) in &latest.artifact_sizes {
writer.gauge(
"ours_rp_artifact_size_bytes",
"Latest run artifact size",
&[label("instance", instance), label("artifact", artifact)],
*size as f64,
);
}
for (path, stat) in &latest.state_path_sizes {
writer.gauge(
"ours_rp_state_path_size_bytes",
"State path size",
&[label("instance", instance), label("path", path)],
stat.total_size_bytes as f64,
);
writer.gauge(
"ours_rp_state_path_files",
"State path file count",
&[label("instance", instance), label("path", path)],
stat.file_count as f64,
);
}
}
fn render_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[RepoMetrics]) {
for repo in repos {
let base = [
label("instance", instance),
label("repo_id", &repo.repo_id),
label("host", &repo.host),
label("uri", &repo.uri),
label("transport", &repo.transport),
];
writer.gauge("ours_rp_repository_info", "Repository metadata", &base, 1.0);
writer.gauge(
"ours_rp_repository_publication_points",
"Publication points per repository",
&base,
repo.publication_points as f64,
);
writer.gauge(
"ours_rp_repository_sync_success",
"Whether repository sync is successful in the latest run",
&base,
bool_value(repo.sync_success),
);
writer.gauge(
"ours_rp_repository_download_bytes",
"Repository download bytes attributed from latest run download events",
&base,
repo.download_bytes as f64,
);
for (stat, value) in [
("sum", repo.duration_seconds_sum),
("max", repo.duration_seconds_max),
("avg", repo.duration_seconds_avg),
] {
let labels = [
label("instance", instance),
label("repo_id", &repo.repo_id),
label("host", &repo.host),
label("transport", &repo.transport),
label("stat", stat),
];
writer.gauge(
"ours_rp_repository_sync_duration_seconds",
"Repository sync duration summary",
&labels,
value,
);
}
for (phase, count) in &repo.phase_counts {
let labels = [
label("instance", instance),
label("repo_id", &repo.repo_id),
label("host", &repo.host),
label("phase", phase),
];
writer.gauge(
"ours_rp_repository_sync_phase_publication_points",
"Repository publication points by sync phase",
&labels,
*count as f64,
);
}
for (state, count) in &repo.terminal_state_counts {
let labels = [
label("instance", instance),
label("repo_id", &repo.repo_id),
label("host", &repo.host),
label("terminal_state", state),
];
writer.gauge(
"ours_rp_repository_terminal_state_publication_points",
"Repository publication points by terminal state",
&labels,
*count as f64,
);
}
}
}
/// Emits one gauge per repository that has a `rrdp_failed_rsync_failed` phase
/// entry; the sample value is that repository's maximum sync duration.
fn render_failed_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[RepoMetrics]) {
    let both_failed = repos
        .iter()
        .filter(|candidate| candidate.phase_counts.contains_key("rrdp_failed_rsync_failed"));
    for failed in both_failed {
        let labels = [
            label("instance", instance),
            label("repo_id", &failed.repo_id),
            label("host", &failed.host),
            label("phase", "rrdp_failed_rsync_failed"),
            label("transport", &failed.transport),
            label("uri", &failed.uri),
        ];
        writer.gauge(
            "ours_rp_rrdp_rsync_failed_repository_duration_seconds",
            "Repositories whose RRDP and rsync sync both failed; value is max sync duration when available",
            &labels,
            failed.duration_seconds_max,
        );
    }
}
/// Emits one gauge per ranked repository; the value is the repository's
/// maximum sync duration converted from milliseconds to seconds.
fn render_top_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[TopRepo]) {
    for entry in repos {
        // Numeric fields are exported as label text, so stringify them up front.
        let rank_text = entry.rank.to_string();
        let pp_text = entry.publication_points.to_string();
        let seconds = entry.duration_ms_max as f64 / 1000.0;
        let labels = [
            label("instance", instance),
            label("rank", &rank_text),
            label("repo_id", &entry.repo_id),
            label("host", &entry.host),
            label("transport", &entry.transport),
            label("publication_points", &pp_text),
            label("uri", &entry.uri),
        ];
        writer.gauge(
            "ours_rp_top_repository_sync_duration_seconds",
            "Top repositories by max sync duration in latest run",
            &labels,
            seconds,
        );
    }
}
/// Emits one `ours_rp_objects` gauge per `(object_type, result)` key of `counts`.
fn render_object_metrics(
    writer: &mut PromWriter<'_>,
    instance: &str,
    counts: &BTreeMap<(String, String), u64>,
) {
    for (key, total) in counts.iter() {
        let (object_type, result) = key;
        let labels = [
            label("instance", instance),
            label("object_type", object_type),
            label("result", result),
        ];
        writer.gauge(
            "ours_rp_objects",
            "Latest run audited objects by type and result",
            &labels,
            *total as f64,
        );
    }
}
/// Emits one gauge per ranked publication point; the value is its object count.
fn render_top_publication_point_metrics(
    writer: &mut PromWriter<'_>,
    instance: &str,
    publication_points: &[TopPublicationPoint],
) {
    for pp in publication_points.iter() {
        // The rank is exported as a label, so it has to be rendered as text.
        let rank_text = pp.rank.to_string();
        let labels = [
            label("instance", instance),
            label("rank", &rank_text),
            label("pp_id", &pp.pp_id),
            label("repo_id", &pp.repo_id),
            label("host", &pp.host),
            label("transport", &pp.transport),
            label("terminal_state", &pp.terminal_state),
            label("phase", &pp.phase),
            label("uri", &pp.uri),
        ];
        writer.gauge(
            "ours_rp_top_publication_point_object_count",
            "Top publication points by object count in latest run",
            &labels,
            pp.object_count as f64,
        );
    }
}
/// Emits one gauge per entry of `LARGE_PP_OBJECT_THRESHOLDS`.
///
/// A threshold with no entry in `counts` is reported as 0, so every threshold
/// always has a sample.
fn render_large_pp_metrics(
    writer: &mut PromWriter<'_>,
    instance: &str,
    counts: &BTreeMap<u64, u64>,
) {
    for limit in LARGE_PP_OBJECT_THRESHOLDS.iter().copied() {
        let matching = match counts.get(&limit) {
            Some(found) => *found,
            None => 0,
        };
        let limit_text = limit.to_string();
        writer.gauge(
            "ours_rp_large_publication_points",
            "Publication points with object count greater than threshold",
            &[
                label("instance", instance),
                label("object_count_gt", &limit_text),
            ],
            matching as f64,
        );
    }
}
/// Emits the CIR gauges: scalar counts first, then the per-type breakdowns.
fn render_cir_metrics(writer: &mut PromWriter<'_>, instance: &str, cir: &CirMetrics) {
    let base = [label("instance", instance)];
    // The digest counts as present when the stored value is exactly 64 units long.
    let digest_flag = if cir.reject_list_sha256.len() == 64 {
        1.0
    } else {
        0.0
    };

    let scalars = [
        ("ours_rp_cir_version", "CIR version", cir.version as f64),
        ("ours_rp_cir_objects", "CIR object count", cir.objects as f64),
        (
            "ours_rp_cir_trust_anchors",
            "CIR trust anchor count",
            cir.trust_anchors as f64,
        ),
        (
            "ours_rp_cir_rejected_objects",
            "CIR rejected object count",
            cir.rejected_objects as f64,
        ),
        (
            "ours_rp_cir_reject_list_digest_present",
            "CIR reject list digest is present",
            digest_flag,
        ),
    ];
    for (name, help, value) in scalars {
        writer.gauge(name, help, &base, value);
    }

    for (kind, total) in cir.objects_by_type.iter() {
        writer.gauge(
            "ours_rp_cir_objects_by_type",
            "CIR object count by file type",
            &[label("instance", instance), label("object_type", kind)],
            *total as f64,
        );
    }
    for (kind, total) in cir.rejected_objects_by_type.iter() {
        writer.gauge(
            "ours_rp_cir_rejected_objects_by_type",
            "CIR rejected object count by file type",
            &[label("instance", instance), label("object_type", kind)],
            *total as f64,
        );
    }
}
/// Emits the CCR gauges: version, per-state presence, per-state item counts,
/// and a constant-1 marker for every state that has a digest entry.
fn render_ccr_metrics(writer: &mut PromWriter<'_>, instance: &str, ccr: &CcrMetrics) {
    let base = [label("instance", instance)];
    writer.gauge("ours_rp_ccr_version", "CCR version", &base, ccr.version as f64);

    for (name, is_present) in ccr.state_present.iter() {
        let labels = [label("instance", instance), label("state", name)];
        writer.gauge(
            "ours_rp_ccr_state_present",
            "CCR state presence",
            &labels,
            bool_value(*is_present),
        );
    }
    for (name, items) in ccr.state_items.iter() {
        let labels = [label("instance", instance), label("state", name)];
        writer.gauge(
            "ours_rp_ccr_state_items",
            "CCR state item count",
            &labels,
            *items as f64,
        );
    }
    // Only the key set matters here; the digest value itself is not exported.
    for (name, _digest) in ccr.state_digests.iter() {
        let labels = [label("instance", instance), label("state", name)];
        writer.gauge(
            "ours_rp_ccr_state_digest_present",
            "CCR state digest presence",
            &labels,
            1.0,
        );
    }
}
/// Serializes the status view of `snapshot` as pretty-printed JSON.
///
/// # Errors
/// Returns the serializer's error message when the document cannot be encoded.
fn render_status_json(snapshot: &MetricsSnapshot) -> Result<String, String> {
    let document = json!({
        "schemaVersion": 1,
        "generatedBy": "rpki_artifact_metrics",
        "instance": snapshot.instance,
        "service": snapshot.service,
        "runs": snapshot.runs,
        "latestRun": snapshot.latest_run,
        "cir": snapshot.cir,
        "ccr": snapshot.ccr,
        "topRepositoriesBySyncDuration": snapshot.top_repos_by_sync_duration,
        "topPublicationPointsByObjectCount": snapshot.top_pp_by_object_count,
        "topPublicationPointsBySyncDuration": snapshot.top_pp_by_sync_duration,
    });
    match serde_json::to_string_pretty(&document) {
        Ok(text) => Ok(text),
        Err(err) => Err(err.to_string()),
    }
}
/// Appends metric samples in text exposition form to a borrowed `String`,
/// writing the `# HELP` / `# TYPE` header of each metric name only once.
struct PromWriter<'a> {
// Destination buffer; every header and sample line is pushed onto it.
out: &'a mut String,
// Metric names whose header has already been written.
emitted_headers: BTreeSet<String>,
}
/// A borrowed `key="value"` label pair attached to a sample.
#[derive(Clone, Debug)]
struct Label<'a> {
// Label name; written to the output verbatim.
key: &'a str,
// Label value; escaped when it is written.
value: &'a str,
}
/// Shorthand constructor for a [`Label`] borrowing `key` and `value`.
fn label<'a>(key: &'a str, value: &'a str) -> Label<'a> {
Label { key, value }
}
impl<'a> PromWriter<'a> {
    /// Creates a writer that appends to `out` and has emitted no headers yet.
    fn new(out: &'a mut String) -> Self {
        let emitted_headers = BTreeSet::new();
        Self {
            out,
            emitted_headers,
        }
    }

    /// Writes one sample of a metric declared with type `gauge`.
    fn gauge(&mut self, name: &str, help: &str, labels: &[Label<'_>], value: f64) {
        self.metric("gauge", name, help, labels, value);
    }

    /// Writes one sample of a metric declared with type `counter`.
    fn counter(&mut self, name: &str, help: &str, labels: &[Label<'_>], value: f64) {
        self.metric("counter", name, help, labels, value);
    }

    /// Writes the header (if not yet written for `name`) followed by one sample line.
    fn metric(
        &mut self,
        metric_type: &str,
        name: &str,
        help: &str,
        labels: &[Label<'_>],
        value: f64,
    ) {
        self.header(name, help, metric_type);
        let rendered = format_prom_value(value);
        self.sample_line(name, "", labels, &rendered);
    }

    /// Writes a histogram: one `_bucket` line per cumulative count (the `le`
    /// label is the bucket bound, or `+Inf` once the bounds are exhausted),
    /// then the `_sum` and `_count` lines.
    fn histogram(
        &mut self,
        name: &str,
        help: &str,
        base_labels: &[Label<'_>],
        histogram: &Histogram,
    ) {
        self.header(name, help, "histogram");

        let running_totals = histogram.cumulative_counts();
        for (position, total) in running_totals.iter().enumerate() {
            let le = match histogram.buckets.get(position) {
                Some(bound) => format_prom_value(*bound),
                None => "+Inf".to_string(),
            };
            let mut labels = base_labels.to_vec();
            labels.push(label("le", &le));
            self.sample_line(name, "_bucket", &labels, &total.to_string());
        }

        let sum_text = format_prom_value(histogram.sum);
        self.sample_line(name, "_sum", base_labels, &sum_text);
        let count_text = histogram.count.to_string();
        self.sample_line(name, "_count", base_labels, &count_text);
    }

    /// Writes the `# HELP` and `# TYPE` lines for `name` the first time the
    /// name is seen; later calls for the same name write nothing.
    fn header(&mut self, name: &str, help: &str, metric_type: &str) {
        let first_time = self.emitted_headers.insert(name.to_string());
        if !first_time {
            return;
        }
        let escaped_help = escape_help(help);
        for piece in ["# HELP ", name, " ", escaped_help.as_str(), "\n"] {
            self.out.push_str(piece);
        }
        for piece in ["# TYPE ", name, " ", metric_type, "\n"] {
            self.out.push_str(piece);
        }
    }

    /// Appends `<name><suffix>{labels} <value>\n` to the output buffer.
    fn sample_line(&mut self, name: &str, suffix: &str, labels: &[Label<'_>], value: &str) {
        self.out.push_str(name);
        self.out.push_str(suffix);
        write_labels(self.out, labels);
        self.out.push(' ');
        self.out.push_str(value);
        self.out.push('\n');
    }
}
/// Appends `{k1="v1",k2="v2"}` for `labels` to `out`; writes nothing when
/// `labels` is empty. Values are escaped, keys are written as given.
fn write_labels(out: &mut String, labels: &[Label<'_>]) {
    let Some((first, rest)) = labels.split_first() else {
        return;
    };
    out.push('{');
    for (separator, item) in std::iter::once(("", first)).chain(rest.iter().map(|l| (",", l))) {
        out.push_str(separator);
        out.push_str(item.key);
        out.push_str("=\"");
        out.push_str(&escape_label(item.value));
        out.push('"');
    }
    out.push('}');
}
fn serve_http(listen: &str, shared: Arc<RwLock<MetricsSnapshot>>) -> Result<(), String> {
let listener = TcpListener::bind(listen).map_err(|e| format!("bind failed: {listen}: {e}"))?;
for stream in listener.incoming() {
match stream {
Ok(mut stream) => {
let snapshot = shared.read().expect("metrics lock poisoned").clone();
if let Err(err) = handle_http_stream(&mut stream, &snapshot) {
eprintln!("http request failed: {err}");
}
}
Err(err) => eprintln!("accept failed: {err}"),
}
}
Ok(())
}
/// Reads one request from `stream` and answers it from `snapshot`.
///
/// Only the request line is inspected, and the HTTP method is not checked.
/// Routing uses the path component of the request target: anything from the
/// first `?` onwards is ignored, so `/metrics?x=y` is served like `/metrics`
/// instead of falling through to the 404 branch.
///
/// Routes: `/metrics` (Prometheus text), `/status` (JSON), `/healthz` (`ok`);
/// everything else gets `404 Not Found`. A request without a parsable target
/// is treated as `/`.
///
/// # Errors
/// Returns the message of any socket read/write failure or status
/// serialization failure.
fn handle_http_stream(stream: &mut TcpStream, snapshot: &MetricsSnapshot) -> Result<(), String> {
    // A single read is enough here: only the first line of the request is used.
    let mut buf = [0u8; 4096];
    let len = stream.read(&mut buf).map_err(|e| e.to_string())?;
    let req = String::from_utf8_lossy(&buf[..len]);
    let target = req
        .lines()
        .next()
        .and_then(|line| line.split_whitespace().nth(1))
        .unwrap_or("/");
    // Route on the path only; scrapers may append query parameters.
    let path = target.split_once('?').map_or(target, |(path, _query)| path);
    match path {
        "/metrics" => write_http_response(
            stream,
            "200 OK",
            "text/plain; version=0.0.4",
            &render_metrics(snapshot),
        ),
        "/status" => write_http_response(
            stream,
            "200 OK",
            "application/json",
            &render_status_json(snapshot)?,
        ),
        "/healthz" => write_http_response(stream, "200 OK", "text/plain", "ok\n"),
        _ => write_http_response(stream, "404 Not Found", "text/plain", "not found\n"),
    }
}
/// Writes a complete `HTTP/1.1` response (status line, `Content-Type`,
/// `Content-Length`, `Connection: close`, then the body) to `stream`.
///
/// # Errors
/// Returns the I/O error message if writing the head or the body fails.
fn write_http_response(
    stream: &mut TcpStream,
    status: &str,
    content_type: &str,
    body: &str,
) -> Result<(), String> {
    let payload = body.as_bytes();
    let length = payload.len();
    let head = format!(
        "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
        length
    );
    // Head first, then body, each as its own write.
    for chunk in [head.as_bytes(), payload] {
        if let Err(err) = stream.write_all(chunk) {
            return Err(err.to_string());
        }
    }
    Ok(())
}
/// Writes `bytes` to `path`, creating the parent directories first.
///
/// # Errors
/// Returns a message naming the failing path when directory creation or the
/// write itself fails.
fn write_file(path: &Path, bytes: &[u8]) -> Result<(), String> {
    if let Some(dir) = path.parent() {
        if let Err(err) = fs::create_dir_all(dir) {
            return Err(format!("create parent failed: {}: {err}", dir.display()));
        }
    }
    match fs::write(path, bytes) {
        Ok(()) => Ok(()),
        Err(err) => Err(format!("write failed: {}: {err}", path.display())),
    }
}
/// Builds a stage-name → seconds map from a timing object.
///
/// Each known `*_ms` field that holds an unsigned integer is converted from
/// milliseconds to seconds and stored under its stage name; missing or
/// non-integer fields are skipped. `None` yields an empty map.
fn extract_stage_seconds(value: Option<&Value>) -> BTreeMap<String, f64> {
    // (JSON field, exported stage name)
    const FIELD_TO_STAGE: [(&str, &str); 14] = [
        ("validation_ms", "validation"),
        ("report_build_ms", "report_build"),
        ("report_write_ms", "report_write"),
        ("ccr_build_ms", "ccr_build"),
        ("ccr_write_ms", "ccr_write"),
        ("compare_view_build_ms", "compare_view_build"),
        ("compare_view_write_ms", "compare_view_write"),
        ("cir_build_cir_ms", "cir_build"),
        ("cir_write_cir_ms", "cir_write"),
        ("cir_total_ms", "cir_total"),
        ("total_ms", "total"),
        ("repo_sync_ms_total", "repo_sync_total"),
        ("rrdp_download_ms_total", "rrdp_download_total"),
        ("rsync_download_ms_total", "rsync_download_total"),
    ];

    let Some(timing) = value else {
        return BTreeMap::new();
    };
    FIELD_TO_STAGE
        .iter()
        .filter_map(|(field, stage)| {
            let millis = json_u64(timing, &[*field])?;
            Some((stage.to_string(), millis as f64 / 1000.0))
        })
        .collect()
}
fn extract_count_duration_map(value: Option<&Value>) -> BTreeMap<String, CountDuration> {
let mut out = BTreeMap::new();
let Some(object) = value.and_then(|v| v.as_object()) else {
return out;
};
for (key, value) in object {
out.insert(
key.clone(),
CountDuration {
count: json_u64(value, &["count"]).unwrap_or(0),
duration_seconds_total: json_u64(value, &["duration_ms_total"]).unwrap_or(0) as f64
/ 1000.0,
},
);
}
out
}
fn extract_artifact_sizes(value: Option<&Value>) -> BTreeMap<String, u64> {
let mut out = BTreeMap::new();
for item in value.and_then(|v| v.as_array()).into_iter().flatten() {
let artifact = json_str(item, &["type"])
.or_else(|| {
json_str(item, &["path"])
.and_then(|path| Path::new(path).file_name().and_then(|name| name.to_str()))
})
.unwrap_or("unknown");
let size = json_u64(item, &["sizeBytes"])
.or_else(|| json_u64(item, &["size"]))
.unwrap_or(0);
*out.entry(artifact.to_string()).or_default() += size;
}
out
}
fn extract_path_sizes(value: Option<&Value>) -> BTreeMap<String, PathSize> {
let mut out = BTreeMap::new();
for item in value.and_then(|v| v.as_array()).into_iter().flatten() {
let label = json_str(item, &["label"]).unwrap_or("unknown").to_string();
out.insert(
label,
PathSize {
total_size_bytes: json_u64(item, &["totalSizeBytes"]).unwrap_or(0),
file_count: json_u64(item, &["fileCount"]).unwrap_or(0),
dir_count: json_u64(item, &["dirCount"]).unwrap_or(0),
},
);
}
out
}
/// Parses the numeric suffix of a `run_<n>` directory name.
///
/// Returns `None` when the path has no UTF-8 file name, the name does not
/// start with `run_`, or the remainder does not parse as `u64`.
fn run_index_from_path(path: &Path) -> Option<u64> {
    let file_name = path.file_name()?.to_str()?;
    let suffix = file_name.strip_prefix("run_")?;
    suffix.parse::<u64>().ok()
}
/// Follows `path` key by key through `value` and returns the final node as a
/// string slice; `None` if any key is missing or the final node is not a string.
fn json_str<'a>(value: &'a Value, path: &[&str]) -> Option<&'a str> {
    let node = path.iter().try_fold(value, |node, key| node.get(*key))?;
    node.as_str()
}
/// Follows `path` key by key through `value` and returns the final node as a
/// `u64`; `None` if any key is missing or the final node is not a `u64`.
fn json_u64(value: &Value, path: &[&str]) -> Option<u64> {
    let node = path.iter().try_fold(value, |node, key| node.get(*key))?;
    node.as_u64()
}
/// Follows `path` key by key through `value` and returns the final node as an
/// `i64`; `None` if any key is missing or the final node is not an `i64`.
fn json_i64(value: &Value, path: &[&str]) -> Option<i64> {
    let node = path.iter().try_fold(value, |node, key| node.get(*key))?;
    node.as_i64()
}
/// Follows `path` key by key through `value` and returns the final node as an
/// `f64`, falling back to converting a `u64` when `as_f64` yields nothing.
/// `None` if any key is missing or the node is neither.
fn json_f64(value: &Value, path: &[&str]) -> Option<f64> {
    let node = path.iter().try_fold(value, |node, key| node.get(*key))?;
    match node.as_f64() {
        Some(number) => Some(number),
        None => node.as_u64().map(|whole| whole as f64),
    }
}
/// Parses an RFC 3339 timestamp and returns its Unix timestamp as `f64`;
/// `None` when the text does not parse.
fn parse_rfc3339_to_unix(value: &str) -> Option<f64> {
    let format = &time::format_description::well_known::Rfc3339;
    match time::OffsetDateTime::parse(value, format) {
        Ok(moment) => Some(moment.unix_timestamp() as f64),
        Err(_) => None,
    }
}
/// Current time as fractional seconds since the Unix epoch, or 0.0 when the
/// system clock reports a time before the epoch.
fn unix_now_seconds() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs_f64(),
        Err(_) => 0.0,
    }
}
/// Maps `true` to 1.0 and `false` to 0.0 for use as a sample value.
fn bool_value(value: bool) -> f64 {
    match value {
        true => 1.0,
        false => 0.0,
    }
}
/// Collapses a free-form transport description into `rrdp` or `rsync`.
///
/// Matching is case-insensitive on substrings: anything containing `rrdp` or
/// `https` becomes `rrdp` (checked first), anything containing `rsync` becomes
/// `rsync`; other input is returned lowercased.
fn normalize_transport(value: &str) -> String {
    let lowered = value.to_ascii_lowercase();
    let looks_like_rrdp = ["rrdp", "https"]
        .iter()
        .any(|needle| lowered.contains(*needle));
    if looks_like_rrdp {
        return "rrdp".to_string();
    }
    if lowered.contains("rsync") {
        return "rsync".to_string();
    }
    lowered
}
/// Derives the transport from a URI scheme: `http`/`https` → `rrdp`,
/// `rsync` → `rsync`, anything else → `unknown`.
fn infer_transport(uri: &str) -> String {
    let scheme = uri.split_once("://").map(|(scheme, _rest)| scheme);
    let transport = match scheme {
        Some("http" | "https") => "rrdp",
        Some("rsync") => "rsync",
        _ => "unknown",
    };
    transport.to_string()
}
/// Returns the text between the first `://` (if any) and the next `/`, or
/// `"unknown"` when that text is empty.
fn uri_host(uri: &str) -> String {
    let after_scheme = match uri.find("://") {
        Some(at) => &uri[at + "://".len()..],
        None => uri,
    };
    let authority = match after_scheme.find('/') {
        Some(end) => &after_scheme[..end],
        None => after_scheme,
    };
    if authority.is_empty() {
        "unknown".to_string()
    } else {
        authority.to_string()
    }
}
/// Classifies an object URI by its (case-insensitive) file extension;
/// unrecognised or missing extensions map to `"other"`.
fn object_type_from_uri(uri: &str) -> String {
    let extension = uri
        .rsplit_once('.')
        .map(|(_stem, ext)| ext.to_ascii_lowercase());
    let kind = match extension.as_deref() {
        Some("mft") => "manifest",
        Some("crl") => "crl",
        Some("cer") => "certificate",
        Some("roa") => "roa",
        Some("asa") => "aspa",
        Some("gbr") => "gbr",
        _ => "other",
    };
    kind.to_string()
}
/// Hex-encodes the first 6 bytes (12 hex characters) of the SHA-256 digest of `value`.
fn short_sha256(value: &str) -> String {
    let mut hasher = Sha256::new();
    hasher.update(value.as_bytes());
    let full_digest = hasher.finalize();
    let prefix = &full_digest[..6];
    hex::encode(prefix)
}
/// Number of leading bytes that `left` and `right` have in common.
///
/// The comparison is byte-wise, so the result is a byte count and is not
/// guaranteed to fall on a character boundary.
fn common_prefix_len(left: &str, right: &str) -> usize {
    let (a, b) = (left.as_bytes(), right.as_bytes());
    let shared_limit = a.len().min(b.len());
    (0..shared_limit)
        .find(|&index| a[index] != b[index])
        .unwrap_or(shared_limit)
}
/// Formats a sample value for the Prometheus text exposition format.
///
/// * NaN and the infinities use the canonical spellings `NaN`, `+Inf` and
///   `-Inf` (negative infinity previously came out as `-inf`).
/// * Values without a fractional part are printed without a decimal point.
/// * Other values are printed with at most six decimals, trailing zeros removed.
fn format_prom_value(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        let spelled = if value.is_sign_positive() {
            "+Inf"
        } else {
            "-Inf"
        };
        return spelled.to_string();
    }
    if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        // Fixed six decimals, then strip the padding zeros (and a bare trailing dot).
        format!("{value:.6}")
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    }
}
/// Escapes a label value: backslash → `\\`, newline → `\n`, double quote → `\"`.
fn escape_label(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '"' => escaped.push_str("\\\""),
            plain => escaped.push(plain),
        }
    }
    escaped
}
/// Escapes HELP text: backslash → `\\` and newline → `\n`; quotes are left as is.
fn escape_help(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            plain => escaped.push(plain),
        }
    }
    escaped
}
// Unit tests: argument parsing, an end-to-end scan of a synthetic run
// directory, and the handling of a run that has not finished.
#[cfg(test)]
mod tests {
use super::*;
use crate::ccr::model::CCR_VERSION_V0;
use crate::ccr::{
CcrContentInfo, CcrDigestAlgorithm, RpkiCanonicalCacheRepresentation, TrustAnchorState,
encode_content_info,
};
use crate::cir::{
CanonicalInputRepresentation, CirHashAlgorithm, CirObject, CirRejectedObject,
CirTrustAnchor, compute_reject_list_sha256, encode_cir, sha256,
};
use tempfile::TempDir;
// `--once` together with both output paths parses into the expected fields.
#[test]
fn parse_args_accepts_once_outputs() {
let args = parse_args(&[
"rpki_artifact_metrics".to_string(),
"--run-root".to_string(),
"root".to_string(),
"--once".to_string(),
"--out-metrics".to_string(),
"metrics.prom".to_string(),
"--out-status".to_string(),
"status.json".to_string(),
])
.expect("parse");
assert!(args.once);
assert_eq!(args.run_root, PathBuf::from("root"));
assert_eq!(args.out_metrics.as_deref(), Some(Path::new("metrics.prom")));
}
// Writes one successful run (meta, summary, VRP CSV, report, CIR, CCR) into a
// temp dir, scans it, and checks both the snapshot fields and the rendered
// metrics / status output.
#[test]
fn scan_fixture_exports_repo_pp_cir_and_ccr_metrics() {
let td = TempDir::new().expect("tempdir");
let run = td.path().join("runs/run_0001");
fs::create_dir_all(&run).expect("create run");
fs::write(
run.join("run-meta.json"),
r#"{"status":"success","run_index":1,"run_id":"run_0001","sync_mode":"snapshot","snapshot_reason":"first_run","started_at_rfc3339_utc":"2026-05-25T00:00:00Z","completed_at_rfc3339_utc":"2026-05-25T00:00:10Z"}"#,
)
.expect("meta");
fs::write(
run.join("run-summary.json"),
r#"{"runSeq":1,"runId":"run_0001","runDir":"RUN","startedAtRfc3339Utc":"2026-05-25T00:00:00Z","finishedAtRfc3339Utc":"2026-05-25T00:00:10Z","wallMs":10000,"status":"success","exitCode":0,"processMetrics":{"userSeconds":2.5,"systemSeconds":1.5,"cpuPercent":40,"maxRssKb":1000},"stageTiming":{"validation_ms":7000,"total_ms":9000,"download_event_count":2,"download_bytes_total":1234},"reportCounts":{"vrps":3,"aspas":1,"publicationPoints":2,"warnings":0},"repoSyncStats":{"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total":3000}}},"pathStats":[{"label":"work-db","totalSizeBytes":99,"fileCount":2,"dirCount":1}],"artifacts":[{"path":"report.json","sizeBytes":10}]}"#,
)
.expect("summary");
fs::write(run.join("process-time.txt"), "time").expect("time");
fs::write(run.join("stage-timing.json"), "{}").expect("stage");
// Three VRP rows, two of them identical: the total is 3 and the unique count is 2.
fs::write(
run.join("vrps.csv"),
"ASN,IP Prefix,Max Length,Trust Anchor,Expires\n\
AS64496,192.0.2.0/24,24,ta,2026-05-25T01:00:00Z\n\
AS64496,192.0.2.0/24,24,ta,2026-05-25T01:00:00Z\n\
AS64497,2001:db8::/32,48,ta,2026-05-25T01:00:00Z\n",
)
.expect("vrps");
// Two publication points behind the same RRDP notification URI, plus two
// download events of 111 and 222 bytes (333 in total).
fs::write(
run.join("report.json"),
r#"{"tree":{"instances_processed":2,"instances_failed":0,"warnings":[]},"vrps":[{},{},{}],"aspas":[{}],"downloads":[{"kind":"rrdp_notification","uri":"https://repo.example/notify.xml","success":true,"duration_ms":100,"bytes":111},{"kind":"rrdp_delta","uri":"https://repo.example/session/1/delta.xml","success":true,"duration_ms":200,"bytes":222}],"publication_points":[{"rsync_base_uri":"rsync://repo.example/a/","manifest_rsync_uri":"rsync://repo.example/a/a.mft","publication_point_rsync_uri":"rsync://repo.example/a/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":1000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"},{"kind":"manifest","result":"ok"}]},{"rsync_base_uri":"rsync://repo.example/b/","manifest_rsync_uri":"rsync://repo.example/b/b.mft","publication_point_rsync_uri":"rsync://repo.example/b/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":2000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"}]}],"repo_sync_stats":{"publication_points_total":2,"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total":3000}}}}"#,
)
.expect("report");
fs::write(run.join("input.cir"), sample_cir()).expect("cir");
fs::write(run.join("result.ccr"), sample_ccr()).expect("ccr");
let snapshot = scan_run_root(td.path(), "test").expect("scan");
assert_eq!(snapshot.runs.success, 1);
assert_eq!(snapshot.repo_stats.len(), 1);
assert!(snapshot.repo_stats[0].sync_success);
assert_eq!(snapshot.repo_stats[0].download_bytes, 333);
assert_eq!(snapshot.top_pp_by_object_count[0].object_count, 2);
assert_eq!(snapshot.cir.as_ref().unwrap().objects, 1);
assert_eq!(snapshot.ccr.as_ref().unwrap().state_items["tas"], 1);
let metrics = render_metrics(&snapshot);
assert!(metrics.contains("ours_rp_repository_info"));
assert!(metrics.contains("ours_rp_repository_sync_success"));
assert!(metrics.contains("ours_rp_repository_download_bytes"));
assert!(metrics.contains("ours_rp_large_publication_points"));
assert!(metrics.contains("ours_rp_cir_objects"));
assert!(metrics.contains("ours_rp_ccr_state_items"));
assert!(metrics.contains(r#"ours_rp_vrps{instance="test",kind="total"} 3"#));
assert!(metrics.contains(r#"ours_rp_vrps{instance="test",kind="unique"} 2"#));
let status = render_status_json(&snapshot).expect("status");
assert!(status.contains("topPublicationPointsByObjectCount"));
assert!(status.contains(r#""vrpsUnique": 2"#));
}
// A run whose meta file still says "running" is counted as partial and is
// not selected as the latest run.
#[test]
fn partial_run_does_not_become_latest_success() {
let td = TempDir::new().expect("tempdir");
let run = td.path().join("runs/run_0001");
fs::create_dir_all(&run).expect("create run");
fs::write(run.join("run-meta.json"), r#"{"status":"running"}"#).expect("meta");
let snapshot = scan_run_root(td.path(), "test").expect("scan");
assert_eq!(snapshot.runs.partial, 1);
assert!(snapshot.latest_run.is_none());
}
// Encoded CIR fixture: one object, one trust anchor and one rejected object.
fn sample_cir() -> Vec<u8> {
let rejected = vec![CirRejectedObject {
object_uri: "rsync://repo.example/a/bad.roa".to_string(),
reason: Some("bad".to_string()),
}];
let cir = CanonicalInputRepresentation {
version: crate::cir::CIR_VERSION_V3,
hash_alg: CirHashAlgorithm::Sha256,
validation_time: time::OffsetDateTime::parse(
"2026-05-25T00:00:00Z",
&time::format_description::well_known::Rfc3339,
)
.unwrap(),
objects: vec![CirObject {
rsync_uri: "rsync://repo.example/a/a.roa".to_string(),
sha256: vec![1; 32],
}],
trust_anchors: vec![CirTrustAnchor {
ta_rsync_uri: "rsync://repo.example/ta.cer".to_string(),
tal_uri: "https://tal.example/tal.tal".to_string(),
tal_bytes: b"rsync://repo.example/ta.cer\n\nAQID\n".to_vec(),
ta_certificate_der: b"ta".to_vec(),
ta_certificate_sha256: sha256(b"ta"),
}],
reject_list_sha256: compute_reject_list_sha256(
rejected.iter().map(|item| item.object_uri.as_str()),
),
rejected_objects: rejected,
};
encode_cir(&cir).expect("encode cir")
}
// Encoded CCR fixture in which only the trust-anchor state is present (one SKI).
fn sample_ccr() -> Vec<u8> {
let ci = CcrContentInfo::new(RpkiCanonicalCacheRepresentation {
version: CCR_VERSION_V0,
hash_alg: CcrDigestAlgorithm::Sha256,
produced_at: time::OffsetDateTime::parse(
"2026-05-25T00:00:00Z",
&time::format_description::well_known::Rfc3339,
)
.unwrap(),
mfts: None,
vrps: None,
vaps: None,
tas: Some(TrustAnchorState {
skis: vec![vec![1; 20]],
hash: vec![2; 32],
}),
rks: None,
});
encode_content_info(&ci).expect("encode ccr")
}
}