diff --git a/src/bin/rpki_daemon.rs b/src/bin/rpki_daemon.rs new file mode 100644 index 0000000..0bdc3a4 --- /dev/null +++ b/src/bin/rpki_daemon.rs @@ -0,0 +1,1783 @@ +use serde::Serialize; +use std::collections::BTreeMap; +use std::fs::{self, File, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; +use std::time::Duration; + +#[derive(Clone, Debug, PartialEq, Eq)] +struct Args { + state_root: PathBuf, + rpki_bin: PathBuf, + interval_secs: u64, + max_runs: Option, + retain_runs: usize, + status_json: Option, + summary_jsonl: Option, + work_db: PathBuf, + repo_bytes_db: Option, + raw_store_db: Option, + db_stats_bin: Option, + db_stats_exact_every: Option, + time_bin: Option, + child_args: Vec, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +struct RunContext { + seq: u64, + run_id: String, + run_dir: PathBuf, +} + +#[derive(Clone, Debug, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +enum DaemonState { + Starting, + Idle, + Running, + Collecting, + Sleeping, + Exited, +} + +#[derive(Clone, Debug, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +struct DaemonStatus { + state: DaemonState, + updated_at_rfc3339_utc: String, + runs_completed: u64, + max_runs: Option, + current_run_seq: Option, + current_run_id: Option, + last_run_id: Option, +} + +#[derive(Clone, Debug, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +enum RunStatus { + Success, + Failed, + SpawnFailed, +} + +#[derive(Clone, Debug, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +struct ArtifactInfo { + path: String, + size_bytes: u64, +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct ProcessMetrics { + time_wrapper_used: bool, + time_output_path: Option, + user_seconds: Option, + system_seconds: Option, + cpu_percent: Option, + elapsed_raw: Option, + max_rss_kb: Option, + exit_status_from_time: Option, + parse_error: Option, +} + +#[derive(Clone, Debug, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +struct ReportCounts { + vrps: usize, + aspas: usize, + publication_points: usize, + rrdp_repos_unique: Option, + tree_instances_processed: Option, + tree_instances_failed: Option, + warnings: usize, +} + +#[derive(Clone, Debug, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +struct PathFileStats { + label: String, + path: String, + exists: bool, + is_dir: bool, + total_size_bytes: u64, + file_count: u64, + dir_count: u64, +} + +#[derive(Clone, Debug, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +struct DbStatsSummary { + mode: String, + db_path: String, + output_path: Option, + stderr_path: Option, + status: String, + exit_code: Option, + error: Option, + metrics: BTreeMap, +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct RunSummary { + run_seq: u64, + run_id: String, + run_dir: String, + started_at_rfc3339_utc: String, + finished_at_rfc3339_utc: String, + wall_ms: u64, + status: RunStatus, + exit_code: Option, + exit_status: Option, + error: Option, + rpki_bin: String, + child_args: Vec, + stdout_path: String, + stderr_path: String, + process_metrics: Option, + stage_timing: Option, + report_counts: Option, + repo_sync_stats: Option, + path_stats: Vec, + db_stats: Vec, + retention_deleted_runs: Vec, + artifacts: Vec, +} + +fn usage() -> String { + let bin = "rpki_daemon"; + format!( + "\ +Usage: + {bin} --state-root --rpki-bin [options] -- + +Options: + --state-root Persistent daemon root containing state/, runs/, status, and JSONL summary + --rpki-bin rpki child binary to execute for each run + --interval-secs Sleep seconds between runs (default: 60) + --max-runs Stop after n runs (default: run forever) + --retain-runs Keep only the latest n run directories (default: 10) + --status-json Override status JSON path (default: /daemon-status.json) + --summary-jsonl Override summary JSONL path (default: /daemon-runs.jsonl) + --work-db Work DB path for metrics (default: /state/work-db) + --repo-bytes-db Repo bytes DB path for file metrics (default: /state/repo-bytes.db) + --raw-store-db Raw store DB path for file metrics (optional) + --db-stats-bin db_stats binary path (default: sibling db_stats next to this executable when present) + --db-stats-exact-every + Run db_stats --exact every n runs (default: disabled) + --time-bin GNU time binary for child process metrics (default: /usr/bin/time when present) + --no-time-wrapper Disable GNU time wrapper + --help Show this help + +Child argument placeholders: + {{state_root}} Daemon state root + {{run_out}} Current run output directory + {{run_id}} Current run id, e.g. 000001-20260428T090000Z + {{run_seq}} Current run sequence number +" + ) +} + +fn default_time_bin() -> Option { + let path = PathBuf::from("/usr/bin/time"); + if path.is_file() { Some(path) } else { None } +} + +fn default_db_stats_bin() -> Option { + let mut path = std::env::current_exe().ok()?; + path.set_file_name("db_stats"); + if path.is_file() { Some(path) } else { None } +} + +fn parse_args(argv: &[String]) -> Result { + if argv.iter().any(|arg| arg == "--help" || arg == "-h") { + return Err(usage()); + } + + let mut state_root: Option = None; + let mut rpki_bin: Option = None; + let mut interval_secs = 60u64; + let mut max_runs = None; + let mut retain_runs = 10usize; + let mut status_json: Option = None; + let mut summary_jsonl: Option = None; + let mut work_db: Option = None; + let mut repo_bytes_db: Option = None; + let mut raw_store_db: Option = None; + let mut db_stats_bin: Option = None; + let mut db_stats_exact_every = None; + let mut time_bin = default_time_bin(); + let mut no_time_wrapper = false; + + let mut i = 1usize; + while i < argv.len() { + match argv[i].as_str() { + "--" => { + let child_args = argv[i + 1..].to_vec(); + let state_root = + state_root.ok_or_else(|| format!("--state-root is required\n\n{}", usage()))?; + let work_db = work_db.unwrap_or_else(|| state_root.join("state").join("work-db")); + let repo_bytes_db = + repo_bytes_db.or_else(|| Some(state_root.join("state").join("repo-bytes.db"))); + if no_time_wrapper { + time_bin = None; + } + let args = Args { + state_root, + rpki_bin: rpki_bin + .ok_or_else(|| format!("--rpki-bin is required\n\n{}", usage()))?, + interval_secs, + max_runs, + retain_runs, + status_json, + summary_jsonl, + work_db, + repo_bytes_db, + raw_store_db, + db_stats_bin, + db_stats_exact_every, + time_bin, + child_args, + }; + return validate_args(args); + } + "--state-root" => { + i += 1; + state_root = Some(PathBuf::from(value_at(argv, i, "--state-root")?)); + } + "--rpki-bin" => { + i += 1; + rpki_bin = Some(PathBuf::from(value_at(argv, i, "--rpki-bin")?)); + } + "--interval-secs" => { + i += 1; + interval_secs = + parse_u64(value_at(argv, i, "--interval-secs")?, "--interval-secs")?; + } + "--max-runs" => { + i += 1; + let parsed = parse_u64(value_at(argv, i, "--max-runs")?, "--max-runs")?; + if parsed == 0 { + return Err("--max-runs must be > 0".to_string()); + } + max_runs = Some(parsed); + } + "--retain-runs" => { + i += 1; + let parsed = parse_usize(value_at(argv, i, "--retain-runs")?, "--retain-runs")?; + if parsed == 0 { + return Err("--retain-runs must be > 0".to_string()); + } + retain_runs = parsed; + } + "--status-json" => { + i += 1; + status_json = Some(PathBuf::from(value_at(argv, i, "--status-json")?)); + } + "--summary-jsonl" => { + i += 1; + summary_jsonl = Some(PathBuf::from(value_at(argv, i, "--summary-jsonl")?)); + } + "--work-db" => { + i += 1; + work_db = Some(PathBuf::from(value_at(argv, i, "--work-db")?)); + } + "--repo-bytes-db" => { + i += 1; + repo_bytes_db = Some(PathBuf::from(value_at(argv, i, "--repo-bytes-db")?)); + } + "--raw-store-db" => { + i += 1; + raw_store_db = Some(PathBuf::from(value_at(argv, i, "--raw-store-db")?)); + } + "--db-stats-bin" => { + i += 1; + db_stats_bin = Some(PathBuf::from(value_at(argv, i, "--db-stats-bin")?)); + } + "--db-stats-exact-every" => { + i += 1; + let parsed = parse_u64( + value_at(argv, i, "--db-stats-exact-every")?, + "--db-stats-exact-every", + )?; + if parsed == 0 { + return Err("--db-stats-exact-every must be > 0".to_string()); + } + db_stats_exact_every = Some(parsed); + } + "--time-bin" => { + i += 1; + time_bin = Some(PathBuf::from(value_at(argv, i, "--time-bin")?)); + } + "--no-time-wrapper" => { + no_time_wrapper = true; + } + other => return Err(format!("unknown argument: {other}\n\n{}", usage())), + } + i += 1; + } + + Err(format!("missing -- before child rpki args\n\n{}", usage())) +} + +fn validate_args(args: Args) -> Result { + if args.child_args.is_empty() { + return Err(format!( + "child rpki args are required after --\n\n{}", + usage() + )); + } + Ok(args) +} + +fn value_at<'a>(argv: &'a [String], index: usize, flag: &str) -> Result<&'a str, String> { + argv.get(index) + .map(String::as_str) + .ok_or_else(|| format!("{flag} requires a value")) +} + +fn parse_u64(raw: &str, flag: &str) -> Result { + raw.parse::() + .map_err(|_| format!("invalid {flag}: {raw}")) +} + +fn parse_usize(raw: &str, flag: &str) -> Result { + raw.parse::() + .map_err(|_| format!("invalid {flag}: {raw}")) +} + +fn status_path(args: &Args) -> PathBuf { + args.status_json + .clone() + .unwrap_or_else(|| args.state_root.join("daemon-status.json")) +} + +fn summary_jsonl_path(args: &Args) -> PathBuf { + args.summary_jsonl + .clone() + .unwrap_or_else(|| args.state_root.join("daemon-runs.jsonl")) +} + +fn utc_now() -> time::OffsetDateTime { + time::OffsetDateTime::now_utc().to_offset(time::UtcOffset::UTC) +} + +fn format_rfc3339(t: time::OffsetDateTime) -> Result { + t.format(&time::format_description::well_known::Rfc3339) + .map_err(|e| format!("format RFC3339 failed: {e}")) +} + +fn format_compact_utc(t: time::OffsetDateTime) -> String { + format!( + "{:04}{:02}{:02}T{:02}{:02}{:02}Z", + t.year(), + u8::from(t.month()), + t.day(), + t.hour(), + t.minute(), + t.second() + ) +} + +fn write_json_pretty(path: &Path, value: &T) -> Result<(), String> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent) + .map_err(|e| format!("create parent dir failed: {}: {e}", parent.display()))?; + } + let file = + File::create(path).map_err(|e| format!("create json failed: {}: {e}", path.display()))?; + serde_json::to_writer_pretty(file, value) + .map_err(|e| format!("write json failed: {}: {e}", path.display())) +} + +fn append_json_line(path: &Path, value: &T) -> Result<(), String> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent) + .map_err(|e| format!("create parent dir failed: {}: {e}", parent.display()))?; + } + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(path) + .map_err(|e| format!("open jsonl failed: {}: {e}", path.display()))?; + serde_json::to_writer(&mut file, value) + .map_err(|e| format!("write jsonl failed: {}: {e}", path.display()))?; + file.write_all(b"\n") + .map_err(|e| format!("flush jsonl failed: {}: {e}", path.display())) +} + +fn write_status( + args: &Args, + state: DaemonState, + runs_completed: u64, + current: Option<&RunContext>, + last_run_id: Option, +) -> Result<(), String> { + let updated_at_rfc3339_utc = format_rfc3339(utc_now())?; + let status = DaemonStatus { + state, + updated_at_rfc3339_utc, + runs_completed, + max_runs: args.max_runs, + current_run_seq: current.map(|ctx| ctx.seq), + current_run_id: current.map(|ctx| ctx.run_id.clone()), + last_run_id, + }; + write_json_pretty(&status_path(args), &status) +} + +fn render_child_args(args: &[String], daemon_args: &Args, ctx: &RunContext) -> Vec { + args.iter() + .map(|arg| { + arg.replace("{state_root}", &path_string(&daemon_args.state_root)) + .replace("{run_out}", &path_string(&ctx.run_dir)) + .replace("{run_id}", &ctx.run_id) + .replace("{run_seq}", &ctx.seq.to_string()) + }) + .collect() +} + +fn render_path_template(path: &Path, daemon_args: &Args, ctx: &RunContext) -> PathBuf { + PathBuf::from( + path_string(path) + .replace("{state_root}", &path_string(&daemon_args.state_root)) + .replace("{run_out}", &path_string(&ctx.run_dir)) + .replace("{run_id}", &ctx.run_id) + .replace("{run_seq}", &ctx.seq.to_string()), + ) +} + +fn path_string(path: &Path) -> String { + path.to_string_lossy().into_owned() +} + +fn make_run_context(args: &Args, seq: u64, now: time::OffsetDateTime) -> RunContext { + let run_id = format!("{seq:06}-{}", format_compact_utc(now)); + let run_dir = args.state_root.join("runs").join(&run_id); + RunContext { + seq, + run_id, + run_dir, + } +} + +fn collect_artifacts(run_dir: &Path) -> Result, String> { + let mut artifacts = Vec::new(); + for entry in fs::read_dir(run_dir) + .map_err(|e| format!("read run dir failed: {}: {e}", run_dir.display()))? + { + let entry = entry.map_err(|e| format!("read run dir entry failed: {e}"))?; + if !entry + .file_type() + .map_err(|e| format!("read file type failed: {}: {e}", entry.path().display()))? + .is_file() + { + continue; + } + let metadata = entry + .metadata() + .map_err(|e| format!("read metadata failed: {}: {e}", entry.path().display()))?; + artifacts.push(ArtifactInfo { + path: path_string(&entry.path()), + size_bytes: metadata.len(), + }); + } + artifacts.sort_by(|a, b| a.path.cmp(&b.path)); + Ok(artifacts) +} + +fn collect_process_metrics(time_wrapper_used: bool, time_output_path: &Path) -> ProcessMetrics { + if !time_wrapper_used { + return ProcessMetrics { + time_wrapper_used, + time_output_path: None, + user_seconds: None, + system_seconds: None, + cpu_percent: None, + elapsed_raw: None, + max_rss_kb: None, + exit_status_from_time: None, + parse_error: None, + }; + } + + let mut metrics = ProcessMetrics { + time_wrapper_used, + time_output_path: Some(path_string(time_output_path)), + user_seconds: None, + system_seconds: None, + cpu_percent: None, + elapsed_raw: None, + max_rss_kb: None, + exit_status_from_time: None, + parse_error: None, + }; + + let text = match fs::read_to_string(time_output_path) { + Ok(text) => text, + Err(err) => { + metrics.parse_error = Some(format!( + "read process time output failed: {}: {err}", + time_output_path.display() + )); + return metrics; + } + }; + + for line in text.lines() { + let line = line.trim(); + if let Some(value) = line.strip_prefix("User time (seconds):") { + metrics.user_seconds = value.trim().parse::().ok(); + } else if let Some(value) = line.strip_prefix("System time (seconds):") { + metrics.system_seconds = value.trim().parse::().ok(); + } else if let Some(value) = line.strip_prefix("Percent of CPU this job got:") { + metrics.cpu_percent = value.trim().trim_end_matches('%').parse::().ok(); + } else if let Some(value) = + line.strip_prefix("Elapsed (wall clock) time (h:mm:ss or m:ss):") + { + metrics.elapsed_raw = Some(value.trim().to_string()); + } else if let Some(value) = line.strip_prefix("Maximum resident set size (kbytes):") { + metrics.max_rss_kb = value.trim().parse::().ok(); + } else if let Some(value) = line.strip_prefix("Exit status:") { + metrics.exit_status_from_time = value.trim().parse::().ok(); + } + } + metrics +} + +fn run_child_once(args: &Args, ctx: &RunContext) -> Result { + fs::create_dir_all(&ctx.run_dir) + .map_err(|e| format!("create run dir failed: {}: {e}", ctx.run_dir.display()))?; + + let started_at = utc_now(); + let started_at_rfc3339_utc = format_rfc3339(started_at)?; + let stdout_path = ctx.run_dir.join("stdout.log"); + let stderr_path = ctx.run_dir.join("stderr.log"); + let stdout = File::create(&stdout_path) + .map_err(|e| format!("create stdout log failed: {}: {e}", stdout_path.display()))?; + let stderr = File::create(&stderr_path) + .map_err(|e| format!("create stderr log failed: {}: {e}", stderr_path.display()))?; + let child_args = render_child_args(&args.child_args, args, ctx); + let time_output_path = ctx.run_dir.join("process-time.txt"); + + let mut command = if let Some(time_bin) = args.time_bin.as_ref() { + let mut command = Command::new(time_bin); + command + .arg("-v") + .arg("-o") + .arg(&time_output_path) + .arg("--") + .arg(&args.rpki_bin) + .args(&child_args); + command + } else { + let mut command = Command::new(&args.rpki_bin); + command.args(&child_args); + command + }; + command + .stdout(Stdio::from(stdout)) + .stderr(Stdio::from(stderr)); + + let (status, exit_code, exit_status, error) = match command.status() { + Ok(status) if status.success() => ( + RunStatus::Success, + status.code(), + Some(status.to_string()), + None, + ), + Ok(status) => ( + RunStatus::Failed, + status.code(), + Some(status.to_string()), + None, + ), + Err(err) => ( + RunStatus::SpawnFailed, + None, + None, + Some(format!("spawn child failed: {err}")), + ), + }; + + let finished_at = utc_now(); + let finished_at_rfc3339_utc = format_rfc3339(finished_at)?; + let wall_ms = (finished_at - started_at).whole_milliseconds().max(0) as u64; + let process_metrics = Some(collect_process_metrics( + args.time_bin.is_some(), + &time_output_path, + )); + let artifacts = collect_artifacts(&ctx.run_dir)?; + let summary = RunSummary { + run_seq: ctx.seq, + run_id: ctx.run_id.clone(), + run_dir: path_string(&ctx.run_dir), + started_at_rfc3339_utc, + finished_at_rfc3339_utc, + wall_ms, + status, + exit_code, + exit_status, + error, + rpki_bin: path_string(&args.rpki_bin), + child_args, + stdout_path: path_string(&stdout_path), + stderr_path: path_string(&stderr_path), + process_metrics, + stage_timing: None, + report_counts: None, + repo_sync_stats: None, + path_stats: Vec::new(), + db_stats: Vec::new(), + retention_deleted_runs: Vec::new(), + artifacts, + }; + Ok(summary) +} + +fn apply_retention(runs_root: &Path, retain_runs: usize) -> Result, String> { + if !runs_root.exists() { + return Ok(Vec::new()); + } + let mut dirs = Vec::new(); + for entry in fs::read_dir(runs_root) + .map_err(|e| format!("read runs dir failed: {}: {e}", runs_root.display()))? + { + let entry = entry.map_err(|e| format!("read runs dir entry failed: {e}"))?; + if entry + .file_type() + .map_err(|e| format!("read file type failed: {}: {e}", entry.path().display()))? + .is_dir() + { + dirs.push(entry.path()); + } + } + dirs.sort(); + let remove_count = dirs.len().saturating_sub(retain_runs); + let mut removed = Vec::new(); + for dir in dirs.into_iter().take(remove_count) { + fs::remove_dir_all(&dir) + .map_err(|e| format!("remove old run dir failed: {}: {e}", dir.display()))?; + removed.push(dir); + } + Ok(removed) +} + +fn find_named_file(root: &Path, name: &str) -> Option { + let mut stack = vec![root.to_path_buf()]; + while let Some(dir) = stack.pop() { + let entries = fs::read_dir(&dir).ok()?; + for entry in entries.flatten() { + let path = entry.path(); + let file_type = entry.file_type().ok()?; + if file_type.is_file() && entry.file_name().to_string_lossy() == name { + return Some(path); + } + if file_type.is_dir() { + stack.push(path); + } + } + } + None +} + +fn read_json_value_if_exists(path: &Path) -> Option { + let bytes = fs::read(path).ok()?; + serde_json::from_slice(&bytes).ok() +} + +fn json_array_len(value: &serde_json::Value, key: &str) -> usize { + value + .get(key) + .and_then(serde_json::Value::as_array) + .map(Vec::len) + .unwrap_or(0) +} + +fn parse_stdout_summary(run_dir: &Path) -> Option { + let stdout_path = run_dir.join("stdout.log"); + let text = fs::read_to_string(stdout_path).ok()?; + let mut vrps = None; + let mut aspas = None; + let mut publication_points = None; + let mut rrdp_repos_unique = None; + let mut tree_instances_processed = None; + let mut tree_instances_failed = None; + let mut warnings = None; + + for line in text.lines() { + if let Some(value) = line.strip_prefix("vrps=") { + vrps = value.trim().parse::().ok(); + } else if let Some(value) = line.strip_prefix("aspas=") { + aspas = value.trim().parse::().ok(); + } else if let Some(value) = line.strip_prefix("audit_publication_points=") { + publication_points = value.trim().parse::().ok(); + } else if let Some(value) = line.strip_prefix("rrdp_repos_unique=") { + rrdp_repos_unique = value.trim().parse::().ok(); + } else if let Some(value) = line.strip_prefix("warnings_total=") { + warnings = value.trim().parse::().ok(); + } else if let Some(rest) = line.strip_prefix("publication_points_processed=") { + for token in rest.split_whitespace() { + if let Some(value) = token.strip_prefix("publication_points_failed=") { + tree_instances_failed = value.parse::().ok(); + } else if tree_instances_processed.is_none() { + tree_instances_processed = token.parse::().ok(); + } + } + } + } + + Some(ReportCounts { + vrps: vrps?, + aspas: aspas?, + publication_points: publication_points?, + rrdp_repos_unique, + tree_instances_processed, + tree_instances_failed, + warnings: warnings.unwrap_or(0), + }) +} + +fn parse_report_counts_fallback(report: &serde_json::Value) -> ReportCounts { + let tree = report.get("tree"); + let tree_warnings = tree + .and_then(|tree| tree.get("warnings")) + .and_then(serde_json::Value::as_array) + .map(Vec::len) + .unwrap_or(0); + let pp_warnings = report + .get("publication_points") + .and_then(serde_json::Value::as_array) + .map(|items| { + items + .iter() + .map(|pp| { + pp.get("warnings") + .and_then(serde_json::Value::as_array) + .map(Vec::len) + .unwrap_or(0) + }) + .sum() + }) + .unwrap_or(0); + ReportCounts { + vrps: json_array_len(report, "vrps"), + aspas: json_array_len(report, "aspas"), + publication_points: json_array_len(report, "publication_points"), + rrdp_repos_unique: None, + tree_instances_processed: tree + .and_then(|tree| tree.get("instances_processed")) + .and_then(serde_json::Value::as_u64), + tree_instances_failed: tree + .and_then(|tree| tree.get("instances_failed")) + .and_then(serde_json::Value::as_u64), + warnings: tree_warnings + pp_warnings, + } +} + +fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option { + haystack + .windows(needle.len()) + .position(|window| window == needle) +} + +fn extract_json_object_field(path: &Path, field_name: &str) -> Option { + let bytes = fs::read(path).ok()?; + let needle = format!("\"{field_name}\":"); + let pos = find_subslice(&bytes, needle.as_bytes())?; + let mut i = pos + needle.len(); + while i < bytes.len() && bytes[i].is_ascii_whitespace() { + i += 1; + } + if bytes.get(i).copied()? != b'{' { + return None; + } + let start = i; + let mut depth = 0u32; + let mut in_string = false; + let mut escaped = false; + for (offset, &b) in bytes[start..].iter().enumerate() { + if in_string { + if escaped { + escaped = false; + } else if b == b'\\' { + escaped = true; + } else if b == b'"' { + in_string = false; + } + continue; + } + match b { + b'"' => in_string = true, + b'{' => depth = depth.saturating_add(1), + b'}' => { + depth = depth.saturating_sub(1); + if depth == 0 { + let end = start + offset + 1; + return serde_json::from_slice(&bytes[start..end]).ok(); + } + } + _ => {} + } + } + None +} + +fn parse_report_metadata(run_dir: &Path) -> (Option, Option) { + let Some(report_path) = find_named_file(run_dir, "report.json") else { + return (parse_stdout_summary(run_dir), None); + }; + let counts = parse_stdout_summary(run_dir).or_else(|| { + read_json_value_if_exists(&report_path).map(|report| parse_report_counts_fallback(&report)) + }); + let repo_sync_stats = extract_json_object_field(&report_path, "repo_sync_stats"); + (counts, repo_sync_stats) +} + +fn collect_path_file_stats(label: &str, path: &Path) -> PathFileStats { + let mut stats = PathFileStats { + label: label.to_string(), + path: path_string(path), + exists: path.exists(), + is_dir: path.is_dir(), + total_size_bytes: 0, + file_count: 0, + dir_count: 0, + }; + if !stats.exists { + return stats; + } + if path.is_file() { + if let Ok(metadata) = path.metadata() { + stats.total_size_bytes = metadata.len(); + stats.file_count = 1; + } + return stats; + } + + let mut stack = vec![path.to_path_buf()]; + while let Some(dir) = stack.pop() { + let Ok(entries) = fs::read_dir(&dir) else { + continue; + }; + for entry in entries.flatten() { + let Ok(file_type) = entry.file_type() else { + continue; + }; + if file_type.is_dir() { + stats.dir_count = stats.dir_count.saturating_add(1); + stack.push(entry.path()); + } else if file_type.is_file() { + stats.file_count = stats.file_count.saturating_add(1); + if let Ok(metadata) = entry.metadata() { + stats.total_size_bytes = stats.total_size_bytes.saturating_add(metadata.len()); + } + } + } + } + stats +} + +fn collect_state_path_stats(args: &Args, ctx: &RunContext) -> Vec { + let mut stats = Vec::new(); + stats.push(collect_path_file_stats( + "work_db", + &render_path_template(&args.work_db, args, ctx), + )); + if let Some(path) = args.repo_bytes_db.as_ref() { + stats.push(collect_path_file_stats( + "repo_bytes_db", + &render_path_template(path, args, ctx), + )); + } + if let Some(path) = args.raw_store_db.as_ref() { + stats.push(collect_path_file_stats( + "raw_store_db", + &render_path_template(path, args, ctx), + )); + } + stats +} + +fn parse_key_value_metrics(text: &str) -> BTreeMap { + let mut metrics = BTreeMap::new(); + for line in text.lines() { + let Some((key, value)) = line.split_once('=') else { + continue; + }; + metrics.insert(key.trim().to_string(), value.trim().to_string()); + } + metrics +} + +fn run_db_stats_command( + db_stats_bin: &Path, + db_path: &Path, + run_dir: &Path, + mode: &str, +) -> DbStatsSummary { + let output_path = run_dir.join(format!("db-stats-{mode}.txt")); + let stderr_path = run_dir.join(format!("db-stats-{mode}.stderr.txt")); + let mut summary = DbStatsSummary { + mode: mode.to_string(), + db_path: path_string(db_path), + output_path: Some(path_string(&output_path)), + stderr_path: None, + status: "success".to_string(), + exit_code: None, + error: None, + metrics: BTreeMap::new(), + }; + + if !db_path.exists() { + summary.status = "skipped".to_string(); + summary.error = Some(format!("db path does not exist: {}", db_path.display())); + summary.output_path = None; + return summary; + } + + let mut command = Command::new(db_stats_bin); + command.arg("--db").arg(db_path); + if mode == "exact" { + command.arg("--exact"); + } + match command.output() { + Ok(output) => { + summary.exit_code = output.status.code(); + if !output.status.success() { + summary.status = "failed".to_string(); + } + let stdout_text = String::from_utf8_lossy(&output.stdout).into_owned(); + if let Err(err) = fs::write(&output_path, stdout_text.as_bytes()) { + summary.status = "failed".to_string(); + summary.error = Some(format!( + "write db_stats output failed: {}: {err}", + output_path.display() + )); + } + summary.metrics = parse_key_value_metrics(&stdout_text); + if !output.stderr.is_empty() { + if fs::write(&stderr_path, &output.stderr).is_ok() { + summary.stderr_path = Some(path_string(&stderr_path)); + } + } + if !output.status.success() && summary.error.is_none() { + summary.error = Some(String::from_utf8_lossy(&output.stderr).into_owned()); + } + } + Err(err) => { + summary.status = "spawn_failed".to_string(); + summary.exit_code = None; + summary.output_path = None; + summary.error = Some(format!("spawn db_stats failed: {err}")); + } + } + summary +} + +fn collect_db_stats(args: &Args, ctx: &RunContext) -> Vec { + let work_db = render_path_template(&args.work_db, args, ctx); + let Some(db_stats_bin) = args + .db_stats_bin + .as_ref() + .cloned() + .or_else(default_db_stats_bin) + else { + return vec![DbStatsSummary { + mode: "estimate".to_string(), + db_path: path_string(&work_db), + output_path: None, + stderr_path: None, + status: "skipped".to_string(), + exit_code: None, + error: Some( + "db_stats binary not configured and sibling db_stats was not found".to_string(), + ), + metrics: BTreeMap::new(), + }]; + }; + + let mut stats = Vec::new(); + stats.push(run_db_stats_command( + &db_stats_bin, + &work_db, + &ctx.run_dir, + "estimate", + )); + if args + .db_stats_exact_every + .is_some_and(|every| ctx.seq % every == 0) + { + stats.push(run_db_stats_command( + &db_stats_bin, + &work_db, + &ctx.run_dir, + "exact", + )); + } + stats +} + +fn collect_post_run_metrics(args: &Args, ctx: &RunContext, summary: &mut RunSummary) { + if let Some(path) = find_named_file(&ctx.run_dir, "stage-timing.json") { + summary.stage_timing = read_json_value_if_exists(&path); + } + let (report_counts, repo_sync_stats) = parse_report_metadata(&ctx.run_dir); + summary.report_counts = report_counts; + summary.repo_sync_stats = repo_sync_stats; + summary.path_stats = collect_state_path_stats(args, ctx); + summary.db_stats = collect_db_stats(args, ctx); + summary.artifacts = collect_artifacts(&ctx.run_dir).unwrap_or_default(); +} + +fn run_daemon(args: &Args) -> Result<(), String> { + fs::create_dir_all(args.state_root.join("state")).map_err(|e| { + format!( + "create daemon state dir failed: {}: {e}", + args.state_root.join("state").display() + ) + })?; + fs::create_dir_all(args.state_root.join("runs")).map_err(|e| { + format!( + "create daemon runs dir failed: {}: {e}", + args.state_root.join("runs").display() + ) + })?; + + let mut runs_completed = 0u64; + let mut next_seq = 1u64; + let mut last_run_id = None; + write_status( + args, + DaemonState::Starting, + runs_completed, + None, + last_run_id.clone(), + )?; + + loop { + if args.max_runs.is_some_and(|max| runs_completed >= max) { + break; + } + + write_status( + args, + DaemonState::Idle, + runs_completed, + None, + last_run_id.clone(), + )?; + let ctx = make_run_context(args, next_seq, utc_now()); + write_status( + args, + DaemonState::Running, + runs_completed, + Some(&ctx), + last_run_id.clone(), + )?; + let mut summary = run_child_once(args, &ctx)?; + write_status( + args, + DaemonState::Collecting, + runs_completed, + Some(&ctx), + last_run_id.clone(), + )?; + collect_post_run_metrics(args, &ctx, &mut summary); + let removed = apply_retention(&args.state_root.join("runs"), args.retain_runs)?; + summary.retention_deleted_runs = removed.iter().map(|p| path_string(p)).collect(); + summary.artifacts = collect_artifacts(&ctx.run_dir).unwrap_or_default(); + write_json_pretty(&ctx.run_dir.join("run-summary.json"), &summary)?; + append_json_line(&summary_jsonl_path(args), &summary)?; + runs_completed += 1; + next_seq += 1; + last_run_id = Some(ctx.run_id); + + if args.max_runs.is_some_and(|max| runs_completed >= max) { + break; + } + write_status( + args, + DaemonState::Sleeping, + runs_completed, + None, + last_run_id.clone(), + )?; + if args.interval_secs > 0 { + std::thread::sleep(Duration::from_secs(args.interval_secs)); + } + } + + write_status(args, DaemonState::Exited, runs_completed, None, last_run_id) +} + +fn main() { + let argv: Vec = std::env::args().collect(); + match parse_args(&argv) { + Ok(args) => { + if let Err(err) = run_daemon(&args) { + eprintln!("{err}"); + std::process::exit(2); + } + } + Err(err) => { + if argv.iter().any(|a| a == "--help" || a == "-h") { + println!("{err}"); + return; + } + eprintln!("{err}"); + std::process::exit(2); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_args(state_root: PathBuf) -> Args { + Args { + work_db: state_root.join("state/work-db"), + repo_bytes_db: Some(state_root.join("state/repo-bytes.db")), + state_root, + rpki_bin: PathBuf::from("/bin/true"), + interval_secs: 0, + max_runs: Some(1), + retain_runs: 10, + status_json: None, + summary_jsonl: None, + raw_store_db: None, + db_stats_bin: None, + db_stats_exact_every: None, + time_bin: None, + child_args: vec!["--version".to_string()], + } + } + + #[test] + fn parse_args_accepts_required_flags_and_child_args() { + let argv = vec![ + "rpki_daemon".to_string(), + "--state-root".to_string(), + "/tmp/daemon".to_string(), + "--rpki-bin".to_string(), + "/bin/echo".to_string(), + "--interval-secs".to_string(), + "0".to_string(), + "--max-runs".to_string(), + "2".to_string(), + "--retain-runs".to_string(), + "3".to_string(), + "--".to_string(), + "--db".to_string(), + "{state_root}/state/work-db".to_string(), + "--report-json".to_string(), + "{run_out}/report.json".to_string(), + ]; + + let args = parse_args(&argv).expect("parse args"); + assert_eq!(args.state_root, PathBuf::from("/tmp/daemon")); + assert_eq!(args.rpki_bin, PathBuf::from("/bin/echo")); + assert_eq!(args.interval_secs, 0); + assert_eq!(args.max_runs, Some(2)); + assert_eq!(args.retain_runs, 3); + assert_eq!(args.work_db, PathBuf::from("/tmp/daemon/state/work-db")); + assert_eq!( + args.repo_bytes_db, + Some(PathBuf::from("/tmp/daemon/state/repo-bytes.db")) + ); + assert_eq!( + args.child_args, + vec![ + "--db", + "{state_root}/state/work-db", + "--report-json", + "{run_out}/report.json" + ] + ); + } + + #[test] + fn usage_and_parse_args_cover_optional_flags_and_errors() { + let help = usage(); + assert!(help.contains("--db-stats-exact-every")); + assert!(help.contains("{run_seq}")); + + let argv = vec![ + "rpki_daemon".to_string(), + "--state-root".to_string(), + "/tmp/daemon".to_string(), + "--rpki-bin".to_string(), + "/bin/echo".to_string(), + "--status-json".to_string(), + "/tmp/status.json".to_string(), + "--summary-jsonl".to_string(), + "/tmp/runs.jsonl".to_string(), + "--work-db".to_string(), + "{state_root}/work".to_string(), + "--repo-bytes-db".to_string(), + "{state_root}/repo-bytes".to_string(), + "--raw-store-db".to_string(), + "{state_root}/raw".to_string(), + "--db-stats-bin".to_string(), + "/bin/echo".to_string(), + "--db-stats-exact-every".to_string(), + "2".to_string(), + "--time-bin".to_string(), + "/usr/bin/time".to_string(), + "--no-time-wrapper".to_string(), + "--".to_string(), + "child".to_string(), + ]; + let args = parse_args(&argv).expect("optional args"); + assert_eq!(args.status_json, Some(PathBuf::from("/tmp/status.json"))); + assert_eq!(args.summary_jsonl, Some(PathBuf::from("/tmp/runs.jsonl"))); + assert_eq!(args.work_db, PathBuf::from("{state_root}/work")); + assert_eq!( + args.repo_bytes_db, + Some(PathBuf::from("{state_root}/repo-bytes")) + ); + assert_eq!(args.raw_store_db, Some(PathBuf::from("{state_root}/raw"))); + assert_eq!(args.db_stats_bin, Some(PathBuf::from("/bin/echo"))); + assert_eq!(args.db_stats_exact_every, Some(2)); + assert_eq!(args.time_bin, None); + + for (argv, expected) in [ + (vec!["rpki_daemon", "--help"], "Usage:"), + ( + vec![ + "rpki_daemon", + "--state-root", + "/tmp/x", + "--rpki-bin", + "/bin/echo", + "--max-runs", + "0", + "--", + "child", + ], + "--max-runs must be > 0", + ), + ( + vec![ + "rpki_daemon", + "--state-root", + "/tmp/x", + "--rpki-bin", + "/bin/echo", + "--retain-runs", + "0", + "--", + "child", + ], + "--retain-runs must be > 0", + ), + ( + vec![ + "rpki_daemon", + "--state-root", + "/tmp/x", + "--rpki-bin", + "/bin/echo", + "--db-stats-exact-every", + "0", + "--", + "child", + ], + "--db-stats-exact-every must be > 0", + ), + (vec!["rpki_daemon", "--unknown"], "unknown argument"), + ( + vec!["rpki_daemon", "--state-root"], + "--state-root requires a value", + ), + (vec!["rpki_daemon"], "missing -- before child rpki args"), + ( + vec![ + "rpki_daemon", + "--state-root", + "/tmp/x", + "--rpki-bin", + "/bin/echo", + "--", + ], + "child rpki args are required", + ), + ] { + let owned: Vec = argv.into_iter().map(str::to_string).collect(); + let err = parse_args(&owned).expect_err("parse should fail"); + assert!(err.contains(expected), "{err}"); + } + } + + #[test] + fn render_child_args_replaces_placeholders() { + let args = Args { + state_root: PathBuf::from("/tmp/root"), + rpki_bin: PathBuf::from("/bin/echo"), + interval_secs: 0, + max_runs: Some(1), + retain_runs: 10, + status_json: None, + summary_jsonl: None, + work_db: PathBuf::from("/tmp/root/state/work-db"), + repo_bytes_db: Some(PathBuf::from("/tmp/root/state/repo-bytes.db")), + raw_store_db: None, + db_stats_bin: None, + db_stats_exact_every: None, + time_bin: None, + child_args: vec![ + "{state_root}/state/work-db".to_string(), + "{run_out}/result.ccr".to_string(), + "{run_id}".to_string(), + "{run_seq}".to_string(), + ], + }; + let ctx = RunContext { + seq: 7, + run_id: "000007-20260428T090000Z".to_string(), + run_dir: PathBuf::from("/tmp/root/runs/000007-20260428T090000Z"), + }; + + assert_eq!( + render_child_args(&args.child_args, &args, &ctx), + vec![ + "/tmp/root/state/work-db", + "/tmp/root/runs/000007-20260428T090000Z/result.ccr", + "000007-20260428T090000Z", + "7", + ] + ); + } + + #[test] + fn path_json_and_report_helpers_cover_fallbacks_and_nested_stats() { + let td = tempfile::tempdir().expect("tempdir"); + let state_root = td.path().join("daemon"); + let mut args = test_args(state_root.clone()); + args.work_db = PathBuf::from("{state_root}/state/work-db"); + args.repo_bytes_db = Some(PathBuf::from("{state_root}/state/repo-bytes.db")); + args.raw_store_db = Some(PathBuf::from("{state_root}/state/raw-store.db")); + + let now = time::Date::from_calendar_date(2026, time::Month::April, 28) + .expect("date") + .with_hms(9, 0, 0) + .expect("time") + .assume_utc(); + let ctx = make_run_context(&args, 42, now); + assert_eq!(ctx.run_id, "000042-20260428T090000Z"); + + let rendered = render_path_template(Path::new("{run_out}/{run_id}/{run_seq}"), &args, &ctx); + assert!(rendered.ends_with("000042-20260428T090000Z/000042-20260428T090000Z/42")); + + let nested_json = td.path().join("nested/out/status.json"); + write_json_pretty(&nested_json, &serde_json::json!({"ok": true})).expect("write json"); + append_json_line( + &td.path().join("nested/out/runs.jsonl"), + &serde_json::json!({"n": 1}), + ) + .expect("append jsonl"); + + fs::create_dir_all(ctx.run_dir.join("subdir")).expect("subdir"); + fs::write(ctx.run_dir.join("a.txt"), "aaa").expect("file"); + fs::write(ctx.run_dir.join("subdir/ignored.txt"), "bbb").expect("nested file"); + let artifacts = collect_artifacts(&ctx.run_dir).expect("artifacts"); + assert_eq!(artifacts.len(), 1); + assert!(artifacts[0].path.ends_with("a.txt")); + + fs::create_dir_all(state_root.join("state/work-db/nested")).expect("work db"); + fs::write(state_root.join("state/work-db/file.sst"), "abc").expect("sst"); + fs::write(state_root.join("state/work-db/nested/inner.sst"), "def").expect("inner"); + fs::write(state_root.join("state/raw-store.db"), "raw").expect("raw file"); + let file_stats = + collect_path_file_stats("raw_store_db", &state_root.join("state/raw-store.db")); + assert!(file_stats.exists); + assert!(!file_stats.is_dir); + assert_eq!(file_stats.file_count, 1); + let missing_stats = collect_path_file_stats("missing", &state_root.join("missing")); + assert!(!missing_stats.exists); + let state_stats = collect_state_path_stats(&args, &ctx); + assert!( + state_stats + .iter() + .any(|s| s.label == "raw_store_db" && s.exists) + ); + assert!( + state_stats + .iter() + .any(|s| s.label == "work_db" && s.file_count == 2) + ); + + let report_path = td.path().join("report.json"); + fs::write( + &report_path, + r#"{"repo_sync_stats": { "nested": {"text": "a\"b"} }, "after": 1}"#, + ) + .expect("report"); + assert_eq!( + extract_json_object_field(&report_path, "repo_sync_stats").expect("repo stats")["nested"] + ["text"], + "a\"b" + ); + fs::write(&report_path, r#"{"repo_sync_stats": []}"#).expect("report"); + assert!(extract_json_object_field(&report_path, "repo_sync_stats").is_none()); + assert!(extract_json_object_field(&report_path, "missing").is_none()); + + let counts = parse_report_counts_fallback(&serde_json::json!({ + "vrps": [{}, {}], + "aspas": [{}], + "publication_points": [{"warnings": [{}, {}]}, {"warnings": [{}]}], + "tree": {"instances_processed": 2, "instances_failed": 1, "warnings": [{}]} + })); + assert_eq!(counts.vrps, 2); + assert_eq!(counts.aspas, 1); + assert_eq!(counts.publication_points, 2); + assert_eq!(counts.warnings, 4); + assert_eq!(counts.tree_instances_processed, Some(2)); + assert_eq!(counts.tree_instances_failed, Some(1)); + } + + #[test] + fn retention_removes_oldest_run_directories() { + let td = tempfile::tempdir().expect("tempdir"); + let runs = td.path().join("runs"); + fs::create_dir_all(&runs).expect("runs dir"); + for name in [ + "000001-20260428T000001Z", + "000002-20260428T000002Z", + "000003-20260428T000003Z", + ] { + fs::create_dir_all(runs.join(name)).expect("run dir"); + } + + let removed = apply_retention(&runs, 2).expect("retention"); + assert_eq!(removed.len(), 1); + assert!(!runs.join("000001-20260428T000001Z").exists()); + assert!(runs.join("000002-20260428T000002Z").exists()); + assert!(runs.join("000003-20260428T000003Z").exists()); + } + + #[test] + fn retention_empty_root_and_parse_metrics_error_paths_are_reported() { + let td = tempfile::tempdir().expect("tempdir"); + let missing_runs = td.path().join("missing-runs"); + assert!( + apply_retention(&missing_runs, 2) + .expect("empty retention") + .is_empty() + ); + + let disabled = collect_process_metrics(false, &td.path().join("missing-time.txt")); + assert!(!disabled.time_wrapper_used); + assert!(disabled.time_output_path.is_none()); + + let missing = collect_process_metrics(true, &td.path().join("missing-time.txt")); + assert!(missing.time_wrapper_used); + assert!( + missing + .parse_error + .expect("parse error") + .contains("read process time output failed") + ); + } + + #[test] + fn process_metrics_parses_gnu_time_elapsed_line() { + let td = tempfile::tempdir().expect("tempdir"); + let path = td.path().join("time.txt"); + fs::write( + &path, + "User time (seconds): 1.25\nSystem time (seconds): 0.50\nPercent of CPU this job got: 175%\nElapsed (wall clock) time (h:mm:ss or m:ss): 0:01.00\nMaximum resident set size (kbytes): 12345\nExit status: 0\n", + ) + .expect("write time"); + + let metrics = collect_process_metrics(true, &path); + assert_eq!(metrics.user_seconds, Some(1.25)); + assert_eq!(metrics.system_seconds, Some(0.50)); + assert_eq!(metrics.cpu_percent, Some(175.0)); + assert_eq!(metrics.elapsed_raw.as_deref(), Some("0:01.00")); + assert_eq!(metrics.max_rss_kb, Some(12345)); + assert_eq!(metrics.exit_status_from_time, Some(0)); + } + + #[test] + #[cfg(unix)] + fn child_and_db_stats_error_paths_are_reported() { + use std::os::unix::fs::PermissionsExt; + + let td = tempfile::tempdir().expect("tempdir"); + let run_dir = td.path().join("run"); + fs::create_dir_all(&run_dir).expect("run dir"); + let missing_db = td.path().join("missing-db"); + let skipped = + run_db_stats_command(Path::new("/bin/echo"), &missing_db, &run_dir, "estimate"); + assert_eq!(skipped.status, "skipped"); + assert!(skipped.output_path.is_none()); + + let db_path = td.path().join("db"); + fs::create_dir_all(&db_path).expect("db"); + let failing_bin = td.path().join("failing_db_stats.sh"); + fs::write( + &failing_bin, + "#!/bin/sh\necho partial=1\necho db-stats failed >&2\nexit 7\n", + ) + .expect("script"); + let mut permissions = fs::metadata(&failing_bin).expect("metadata").permissions(); + permissions.set_mode(0o755); + fs::set_permissions(&failing_bin, permissions).expect("chmod"); + let failed = run_db_stats_command(&failing_bin, &db_path, &run_dir, "exact"); + assert_eq!(failed.status, "failed"); + assert_eq!(failed.exit_code, Some(7)); + assert_eq!(failed.metrics.get("partial").map(String::as_str), Some("1")); + assert!(failed.stderr_path.is_some()); + assert!(failed.error.expect("stderr").contains("db-stats failed")); + + let spawn_failed = run_db_stats_command( + Path::new("/definitely/not/db_stats"), + &db_path, + &run_dir, + "estimate", + ); + assert_eq!(spawn_failed.status, "spawn_failed"); + assert!( + spawn_failed + .error + .expect("spawn err") + .contains("spawn db_stats failed") + ); + + let metrics = parse_key_value_metrics("alpha=1\nnot-a-pair\nbeta = two\n"); + assert_eq!(metrics.get("alpha").map(String::as_str), Some("1")); + assert_eq!(metrics.get("beta").map(String::as_str), Some("two")); + + let mut args = test_args(td.path().join("daemon")); + args.rpki_bin = PathBuf::from("/bin/sh"); + args.time_bin = Some(PathBuf::from("/usr/bin/time")); + args.child_args = vec![ + "-c".to_string(), + "echo child-out; echo child-err >&2; exit 7".to_string(), + ]; + let ctx = RunContext { + seq: 1, + run_id: "000001-20260428T000000Z".to_string(), + run_dir: args.state_root.join("runs/000001-20260428T000000Z"), + }; + let summary = run_child_once(&args, &ctx).expect("run failing child"); + assert_eq!(summary.status, RunStatus::Failed); + assert_eq!(summary.exit_code, Some(7)); + let process_metrics = summary.process_metrics.expect("process metrics"); + assert!(process_metrics.time_wrapper_used); + assert_eq!(process_metrics.exit_status_from_time, Some(7)); + + let mut spawn_args = test_args(td.path().join("spawn")); + spawn_args.rpki_bin = PathBuf::from("/definitely/not/rpki"); + spawn_args.child_args = vec!["--help".to_string()]; + let spawn_ctx = RunContext { + seq: 1, + run_id: "000001-20260428T000001Z".to_string(), + run_dir: spawn_args.state_root.join("runs/000001-20260428T000001Z"), + }; + let spawn_summary = run_child_once(&spawn_args, &spawn_ctx).expect("spawn summary"); + assert_eq!(spawn_summary.status, RunStatus::SpawnFailed); + assert!( + spawn_summary + .error + .expect("spawn error") + .contains("spawn child failed") + ); + } + + #[test] + fn report_metadata_prefers_stdout_summary_and_extracts_repo_sync_stats() { + let td = tempfile::tempdir().expect("tempdir"); + fs::write( + td.path().join("stdout.log"), + "RPKI stage2 serial run summary\npublication_points_processed=7 publication_points_failed=1\nrrdp_repos_unique=3\nvrps=11\naspas=2\naudit_publication_points=7\nwarnings_total=5\n", + ) + .expect("stdout"); + fs::write( + td.path().join("report.json"), + "{\"large\":[{\"ignored\":\"{}\"}],\"repo_sync_stats\":{\"by_phase\":{\"rrdp_ok\":{\"count\":7}},\"by_terminal_state\":{},\"publication_points_total\":7}}", + ) + .expect("report"); + + let (counts, repo_sync_stats) = parse_report_metadata(td.path()); + let counts = counts.expect("counts"); + assert_eq!(counts.vrps, 11); + assert_eq!(counts.aspas, 2); + assert_eq!(counts.publication_points, 7); + assert_eq!(counts.rrdp_repos_unique, Some(3)); + assert_eq!(counts.tree_instances_processed, Some(7)); + assert_eq!(counts.tree_instances_failed, Some(1)); + assert_eq!(counts.warnings, 5); + assert_eq!( + repo_sync_stats.expect("repo sync")["by_phase"]["rrdp_ok"]["count"].as_u64(), + Some(7) + ); + } + + #[test] + fn daemon_exits_immediately_when_max_runs_already_reached() { + let td = tempfile::tempdir().expect("tempdir"); + let mut args = test_args(td.path().join("daemon")); + args.max_runs = Some(0); + + run_daemon(&args).expect("run daemon"); + + let status_text = + fs::read_to_string(args.state_root.join("daemon-status.json")).expect("status"); + assert!(status_text.contains("\"state\": \"exited\"")); + assert!(status_text.contains("\"runsCompleted\": 0")); + assert!(args.state_root.join("runs").exists()); + } + + #[test] + fn daemon_runs_fake_child_twice_and_writes_summaries() { + let td = tempfile::tempdir().expect("tempdir"); + let args = Args { + state_root: td.path().join("daemon"), + rpki_bin: PathBuf::from("/bin/sh"), + interval_secs: 0, + max_runs: Some(2), + retain_runs: 10, + status_json: None, + summary_jsonl: None, + work_db: td.path().join("daemon/state/work-db"), + repo_bytes_db: Some(td.path().join("daemon/state/repo-bytes.db")), + raw_store_db: None, + db_stats_bin: None, + db_stats_exact_every: None, + time_bin: None, + child_args: vec![ + "-c".to_string(), + "echo stdout-{run_seq}; echo stderr-{run_seq} >&2; echo marker > {run_out}/marker.txt".to_string(), + ], + }; + + run_daemon(&args).expect("run daemon"); + + let status_text = + fs::read_to_string(args.state_root.join("daemon-status.json")).expect("status"); + assert!(status_text.contains("\"state\": \"exited\"")); + assert!(status_text.contains("\"runsCompleted\": 2")); + + let jsonl = + fs::read_to_string(args.state_root.join("daemon-runs.jsonl")).expect("summary jsonl"); + assert_eq!(jsonl.lines().count(), 2); + + let run_dirs = fs::read_dir(args.state_root.join("runs")) + .expect("runs dir") + .collect::, _>>() + .expect("entries"); + assert_eq!(run_dirs.len(), 2); + for entry in run_dirs { + assert!(entry.path().join("run-summary.json").exists()); + assert!(entry.path().join("marker.txt").exists()); + } + } + + #[test] + fn daemon_sleep_and_retention_deletion_are_recorded() { + let td = tempfile::tempdir().expect("tempdir"); + let mut args = test_args(td.path().join("daemon")); + args.rpki_bin = PathBuf::from("/bin/sh"); + args.interval_secs = 1; + args.max_runs = Some(2); + args.retain_runs = 1; + args.child_args = vec![ + "-c".to_string(), + "echo run-{run_seq}; printf marker > {run_out}/marker.txt".to_string(), + ]; + + run_daemon(&args).expect("run daemon"); + + let jsonl = fs::read_to_string(args.state_root.join("daemon-runs.jsonl")).expect("jsonl"); + assert_eq!(jsonl.lines().count(), 2); + let last: serde_json::Value = + serde_json::from_str(jsonl.lines().last().expect("last line")).expect("summary"); + assert_eq!( + last["retentionDeletedRuns"] + .as_array() + .expect("deleted") + .len(), + 1 + ); + let run_dirs = fs::read_dir(args.state_root.join("runs")) + .expect("runs") + .collect::, _>>() + .expect("entries"); + assert_eq!(run_dirs.len(), 1); + } + + #[test] + #[cfg(unix)] + fn daemon_collects_stage_report_db_and_file_metrics() { + use std::os::unix::fs::PermissionsExt; + + let td = tempfile::tempdir().expect("tempdir"); + let db_stats_bin = td.path().join("fake_db_stats.sh"); + fs::write( + &db_stats_bin, + "#!/bin/sh\nif [ \"$3\" = \"--exact\" ]; then echo mode=exact; else echo mode=estimate; fi\necho total=42\necho db.files.total_size_bytes=123\n", + ) + .expect("fake db_stats"); + let mut permissions = fs::metadata(&db_stats_bin).expect("metadata").permissions(); + permissions.set_mode(0o755); + fs::set_permissions(&db_stats_bin, permissions).expect("chmod"); + + let args = Args { + state_root: td.path().join("daemon"), + rpki_bin: PathBuf::from("/bin/sh"), + interval_secs: 0, + max_runs: Some(1), + retain_runs: 10, + status_json: None, + summary_jsonl: None, + work_db: PathBuf::from("{state_root}/state/work-db"), + repo_bytes_db: Some(PathBuf::from("{state_root}/state/repo-bytes.db")), + raw_store_db: None, + db_stats_bin: Some(db_stats_bin), + db_stats_exact_every: Some(1), + time_bin: None, + child_args: vec![ + "-c".to_string(), + "mkdir -p {state_root}/state/work-db {state_root}/state/repo-bytes.db; \ + printf x > {state_root}/state/work-db/000001.sst; \ + printf '{\"validation_ms\":7,\"download_event_count\":2}' > {run_out}/stage-timing.json; \ + printf '{\"tree\":{\"instances_processed\":3,\"instances_failed\":1,\"warnings\":[{}]},\"publication_points\":[{},{}],\"vrps\":[{},{}],\"aspas\":[{}],\"repo_sync_stats\":{\"by_phase\":{\"snapshot\":{\"count\":1}}}}' > {run_out}/report.json" + .to_string(), + ], + }; + + run_daemon(&args).expect("run daemon"); + + let run_summary = find_named_file(&args.state_root.join("runs"), "run-summary.json") + .expect("run summary"); + let summary: serde_json::Value = + serde_json::from_slice(&fs::read(run_summary).expect("read run summary")) + .expect("parse run summary"); + + assert_eq!(summary["stageTiming"]["validation_ms"].as_u64(), Some(7)); + assert_eq!(summary["reportCounts"]["vrps"].as_u64(), Some(2)); + assert_eq!(summary["reportCounts"]["aspas"].as_u64(), Some(1)); + assert_eq!( + summary["reportCounts"]["publicationPoints"].as_u64(), + Some(2) + ); + assert_eq!( + summary["repoSyncStats"]["by_phase"]["snapshot"]["count"].as_u64(), + Some(1) + ); + assert_eq!(summary["dbStats"].as_array().expect("db stats").len(), 2); + assert!( + summary["pathStats"] + .as_array() + .expect("path stats") + .iter() + .any(|item| item["label"] == "work_db" && item["exists"] == true) + ); + } +} diff --git a/src/validation/manifest.rs b/src/validation/manifest.rs index 326d4ed..667b9c2 100644 --- a/src/validation/manifest.rs +++ b/src/validation/manifest.rs @@ -214,6 +214,17 @@ pub enum ManifestFreshError { HashMismatch { rsync_uri: String }, } +impl ManifestFreshError { + pub(crate) fn should_warn_when_current_instance_reused(&self) -> bool { + !matches!( + self, + ManifestFreshError::RepoSyncFailed { .. } + | ManifestFreshError::MissingManifest { .. } + | ManifestFreshError::MissingFile { .. } + ) + } +} + #[derive(Debug, thiserror::Error)] pub enum ManifestReuseError { #[error("latest current-instance VCIR missing: {0} (RFC 9286 §6.6)")] @@ -398,12 +409,6 @@ pub fn process_manifest_publication_point_after_repo_sync( Err(ManifestProcessError::StopAllOutput(fresh_err)) } CaFailedFetchPolicy::ReuseCurrentInstanceVcir => { - let mut warnings = vec![ - Warning::new(format!("manifest failed fetch: {fresh_err}")) - .with_rfc_refs(&[RfcRef("RFC 9286 §6.6")]) - .with_context(manifest_rsync_uri), - ]; - match load_current_instance_vcir_publication_point( store, manifest_rsync_uri, @@ -411,11 +416,14 @@ pub fn process_manifest_publication_point_after_repo_sync( validation_time, ) { Ok(snapshot) => { - warnings.push( - Warning::new("using latest validated result for current CA instance") - .with_rfc_refs(&[RfcRef("RFC 9286 §6.6")]) - .with_context(manifest_rsync_uri), - ); + let mut warnings = Vec::new(); + if fresh_err.should_warn_when_current_instance_reused() { + warnings.push( + Warning::new(format!("manifest failed fetch: {fresh_err}")) + .with_rfc_refs(&[RfcRef("RFC 9286 §6.6")]) + .with_context(manifest_rsync_uri), + ); + } Ok(PublicationPointResult { source: PublicationPointSource::VcirCurrentInstance, snapshot, diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index d62869d..2064dd5 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -1960,16 +1960,17 @@ fn project_current_instance_vcir_on_failed_fetch( fresh_err: &ManifestFreshError, validation_time: time::OffsetDateTime, ) -> Result { - let mut warnings = vec![ - Warning::new(format!("manifest failed fetch: {fresh_err}")) - .with_rfc_refs(&[RfcRef("RFC 9286 §6.6")]) - .with_context(&ca.manifest_rsync_uri), - ]; + let mut warnings = Vec::new(); let Some(vcir) = store .get_vcir(&ca.manifest_rsync_uri) .map_err(|e| format!("load VCIR failed: {e}"))? else { + warnings.push( + Warning::new(format!("manifest failed fetch: {fresh_err}")) + .with_rfc_refs(&[RfcRef("RFC 9286 §6.6")]) + .with_context(&ca.manifest_rsync_uri), + ); warnings.push( Warning::new( "no latest validated result for current CA instance; no cached output reused", @@ -1990,6 +1991,11 @@ fn project_current_instance_vcir_on_failed_fetch( }; if !vcir.audit_summary.failed_fetch_eligible { + warnings.push( + Warning::new(format!("manifest failed fetch: {fresh_err}")) + .with_rfc_refs(&[RfcRef("RFC 9286 §6.6")]) + .with_context(&ca.manifest_rsync_uri), + ); warnings.push( Warning::new( "latest VCIR is not marked failed-fetch eligible; no cached output reused", @@ -2012,6 +2018,11 @@ fn project_current_instance_vcir_on_failed_fetch( let instance_effective_until = parse_snapshot_time_value(&vcir.instance_gate.instance_effective_until)?; if validation_time > instance_effective_until { + warnings.push( + Warning::new(format!("manifest failed fetch: {fresh_err}")) + .with_rfc_refs(&[RfcRef("RFC 9286 §6.6")]) + .with_context(&ca.manifest_rsync_uri), + ); warnings.push( Warning::new( "latest VCIR instance_gate expired; current instance contributes no cached output", @@ -2031,14 +2042,17 @@ fn project_current_instance_vcir_on_failed_fetch( }); } - warnings.push( - Warning::new("using latest validated result for current CA instance") - .with_rfc_refs(&[RfcRef("RFC 9286 §6.6")]) - .with_context(&ca.manifest_rsync_uri), - ); - let ccr_manifest_projection = reuse_ccr_manifest_projection_from_vcir(ca, &vcir)?; - let snapshot = reconstruct_snapshot_from_vcir(store, ca, &vcir, &mut warnings); + if fresh_err.should_warn_when_current_instance_reused() { + warnings.push( + Warning::new(format!("manifest failed fetch: {fresh_err}")) + .with_rfc_refs(&[RfcRef("RFC 9286 §6.6")]) + .with_context(&ca.manifest_rsync_uri), + ); + } + // Current-instance reuse is fully described by VCIR projections; rebuilding a + // byte-backed snapshot here only duplicates repo-byte I/O and creates warning noise. + let snapshot = None; let objects = build_objects_output_from_vcir(&vcir, validation_time, &mut warnings); let (discovered_children, child_audits) = restore_children_from_vcir(store, ca, &vcir, &mut warnings); @@ -2055,6 +2069,7 @@ fn project_current_instance_vcir_on_failed_fetch( }) } +#[cfg(test)] fn reconstruct_snapshot_from_vcir( store: &RocksStore, ca: &CaInstanceHandle, @@ -2065,8 +2080,8 @@ fn reconstruct_snapshot_from_vcir( artifact.artifact_role == VcirArtifactRole::Manifest && artifact.uri.as_deref() == Some(ca.manifest_rsync_uri.as_str()) })?; - let manifest_entry = match store.get_raw_by_hash_entry(&manifest_artifact.sha256) { - Ok(Some(entry)) => entry, + let manifest_bytes = match store.get_blob_bytes(&manifest_artifact.sha256) { + Ok(Some(bytes)) => bytes, Ok(None) => { warnings.push( Warning::new("manifest raw bytes missing for VCIR audit reconstruction") @@ -2101,8 +2116,8 @@ fn reconstruct_snapshot_from_vcir( if !seen.insert(uri.clone()) { continue; } - match store.get_raw_by_hash_entry(&artifact.sha256) { - Ok(Some(entry)) => files.push(PackFile::from_bytes_compute_sha256(uri, entry.bytes)), + match store.get_blob_bytes(&artifact.sha256) { + Ok(Some(bytes)) => files.push(PackFile::from_bytes_compute_sha256(uri, bytes)), Ok(None) => warnings.push( Warning::new("related artifact raw bytes missing for VCIR audit reconstruction") .with_context(uri), @@ -2133,7 +2148,7 @@ fn reconstruct_snapshot_from_vcir( .validated_manifest_next_update .clone(), verified_at: vcir.last_successful_validation_time.clone(), - manifest_bytes: manifest_entry.bytes, + manifest_bytes, files, }) } @@ -2399,14 +2414,14 @@ fn restore_children_from_vcir( let mut children = Vec::new(); let mut audits = Vec::new(); for child in &vcir.child_entries { - match store.get_raw_by_hash_entry(&child.child_cert_hash) { - Ok(Some(entry)) => { + match store.get_blob_bytes(&child.child_cert_hash) { + Ok(Some(bytes)) => { children.push(DiscoveredChildCaInstance { handle: CaInstanceHandle { depth: 0, tal_id: ca.tal_id.clone(), parent_manifest_rsync_uri: Some(ca.manifest_rsync_uri.clone()), - ca_certificate_der: entry.bytes, + ca_certificate_der: bytes, ca_certificate_rsync_uri: Some(child.child_cert_rsync_uri.clone()), effective_ip_resources: child.child_effective_ip_resources.clone(), effective_as_resources: child.child_effective_as_resources.clone(), @@ -2433,7 +2448,7 @@ fn restore_children_from_vcir( } Ok(None) => { warnings.push( - Warning::new("child certificate raw bytes missing for VCIR child restoration") + Warning::new("child certificate bytes missing for VCIR child restoration") .with_context(&child.child_cert_rsync_uri), ); audits.push(ObjectAuditEntry { @@ -2442,15 +2457,14 @@ fn restore_children_from_vcir( kind: AuditObjectKind::Certificate, result: AuditObjectResult::Error, detail: Some( - "child certificate raw bytes missing for VCIR child restoration" - .to_string(), + "child certificate bytes missing for VCIR child restoration".to_string(), ), }); } Err(e) => { warnings.push( Warning::new(format!( - "child certificate raw bytes load failed for VCIR child restoration: {e}" + "child certificate bytes load failed for VCIR child restoration: {e}" )) .with_context(&child.child_cert_rsync_uri), ); @@ -2460,7 +2474,7 @@ fn restore_children_from_vcir( kind: AuditObjectKind::Certificate, result: AuditObjectResult::Error, detail: Some(format!( - "child certificate raw bytes load failed for VCIR child restoration: {e}" + "child certificate bytes load failed for VCIR child restoration: {e}" )), }); } @@ -5699,45 +5713,21 @@ authorityKeyIdentifier = keyid:always let vcir = sample_vcir_for_projection(now, &child_cert_hash); let store_dir = tempfile::tempdir().expect("store dir"); - let store = RocksStore::open(store_dir.path()).expect("open rocksdb"); + let main_db = store_dir.path().join("work-db"); + let repo_bytes_db = store_dir.path().join("repo-bytes.db"); + let store = RocksStore::open_with_external_repo_bytes(&main_db, &repo_bytes_db) + .expect("open rocksdb with external repo bytes"); store.put_vcir(&vcir).expect("put vcir"); - for (bytes, uri, object_type) in [ - ( - b"manifest-bytes".to_vec(), - Some(vcir.current_manifest_rsync_uri.clone()), - Some("mft".to_string()), - ), - ( - b"current-crl-bytes".to_vec(), - Some(vcir.current_crl_rsync_uri.clone()), - Some("crl".to_string()), - ), - ( - g.child_ca_der.clone(), - Some(vcir.child_entries[0].child_cert_rsync_uri.clone()), - Some("cer".to_string()), - ), - ( - b"roa-bytes".to_vec(), - Some("rsync://example.test/repo/issuer/a.roa".to_string()), - Some("roa".to_string()), - ), - ( - b"aspa-bytes".to_vec(), - Some("rsync://example.test/repo/issuer/a.asa".to_string()), - Some("aspa".to_string()), - ), - ] { - let mut entry = RawByHashEntry::from_bytes(sha256_hex(&bytes), bytes); - if let Some(uri) = uri { - entry.origin_uris.push(uri); - } - entry.object_type = object_type; - entry.encoding = Some("der".to_string()); + store + .put_blob_bytes_batch(&[(child_cert_hash.clone(), g.child_ca_der.clone())]) + .expect("put child cert repo bytes"); + assert!( store - .put_raw_by_hash_entry(&entry) - .expect("put raw_by_hash"); - } + .get_raw_by_hash_entry(&child_cert_hash) + .expect("lookup child raw_by_hash") + .is_none(), + "child cert restoration should not require raw_by_hash entries" + ); let ca = CaInstanceHandle { depth: 0, @@ -5780,8 +5770,36 @@ authorityKeyIdentifier = keyid:always Some(&vcir.ccr_manifest_projection) ); assert!( - projection.snapshot.is_some(), - "expected reconstructed snapshot" + projection.snapshot.is_none(), + "current-instance reuse should not reconstruct a byte-backed snapshot" + ); + assert!( + !projection + .warnings + .iter() + .any(|warning| warning.message.contains("manifest failed fetch")), + "successful current-instance reuse should not duplicate the fresh fetch error" + ); + assert!( + !projection + .warnings + .iter() + .any(|warning| warning.message.contains("using latest validated result")), + "successful current-instance reuse should be tracked by source, not warning" + ); + assert!( + !projection + .warnings + .iter() + .any(|warning| warning.message.contains("manifest raw bytes missing")), + "successful current-instance reuse should not load repo bytes for audit reconstruction" + ); + assert!( + !projection + .warnings + .iter() + .any(|warning| warning.message.contains("child certificate bytes missing")), + "child discovery restoration should read child certs from repo bytes" ); } @@ -5836,6 +5854,65 @@ authorityKeyIdentifier = keyid:always assert!(projection.discovered_children.is_empty()); } + #[test] + fn project_current_instance_vcir_keeps_real_fresh_validation_warning() { + let now = time::OffsetDateTime::now_utc(); + let child_cert_hash = sha256_hex(b"child-cert"); + let vcir = sample_vcir_for_projection(now, &child_cert_hash); + + let store_dir = tempfile::tempdir().expect("store dir"); + let main_db = store_dir.path().join("work-db"); + let repo_bytes_db = store_dir.path().join("repo-bytes.db"); + let store = RocksStore::open_with_external_repo_bytes(&main_db, &repo_bytes_db) + .expect("open rocksdb with external repo bytes"); + store.put_vcir(&vcir).expect("put vcir"); + store + .put_blob_bytes_batch(&[(child_cert_hash, b"child-cert".to_vec())]) + .expect("put child cert repo bytes"); + + let ca = CaInstanceHandle { + depth: 0, + tal_id: "test-tal".to_string(), + parent_manifest_rsync_uri: None, + ca_certificate_der: Vec::new(), + ca_certificate_rsync_uri: None, + effective_ip_resources: None, + effective_as_resources: None, + rsync_base_uri: "rsync://example.test/repo/issuer/".to_string(), + manifest_rsync_uri: vcir.manifest_rsync_uri.clone(), + publication_point_rsync_uri: "rsync://example.test/repo/issuer/".to_string(), + rrdp_notification_uri: None, + }; + + let projection = project_current_instance_vcir_on_failed_fetch( + &store, + &ca, + &ManifestFreshError::HashMismatch { + rsync_uri: "rsync://example.test/repo/issuer/a.roa".to_string(), + }, + now, + ) + .expect("project vcir"); + + assert_eq!( + projection.source, + PublicationPointSource::VcirCurrentInstance + ); + assert!( + projection + .warnings + .iter() + .any(|warning| { warning.message.contains("manifest file hash mismatch") }) + ); + assert!( + !projection + .warnings + .iter() + .any(|warning| warning.message.contains("using latest validated result")), + "successful current-instance reuse should not emit bookkeeping warnings" + ); + } + #[test] fn project_current_instance_vcir_returns_no_output_when_latest_result_missing() { let now = time::OffsetDateTime::now_utc(); @@ -6509,6 +6586,74 @@ authorityKeyIdentifier = keyid:always })); } + #[test] + fn reconstruct_snapshot_from_vcir_reads_repo_bytes_without_raw_entries() { + let now = time::OffsetDateTime::now_utc(); + let child_cert_hash = sha256_hex(b"child-cert"); + let vcir = sample_vcir_for_projection(now, &child_cert_hash); + let ca = CaInstanceHandle { + depth: 0, + tal_id: "test-tal".to_string(), + parent_manifest_rsync_uri: None, + ca_certificate_der: Vec::new(), + ca_certificate_rsync_uri: None, + effective_ip_resources: None, + effective_as_resources: None, + rsync_base_uri: "rsync://example.test/repo/issuer/".to_string(), + manifest_rsync_uri: vcir.manifest_rsync_uri.clone(), + publication_point_rsync_uri: "rsync://example.test/repo/issuer/".to_string(), + rrdp_notification_uri: None, + }; + + let store_dir = tempfile::tempdir().expect("store dir"); + let main_db = store_dir.path().join("work-db"); + let repo_bytes_db = store_dir.path().join("repo-bytes.db"); + let store = RocksStore::open_with_external_repo_bytes(&main_db, &repo_bytes_db) + .expect("open rocksdb with external repo bytes"); + let repo_blobs = [ + b"manifest-bytes".to_vec(), + b"current-crl-bytes".to_vec(), + b"child-cert".to_vec(), + b"roa-bytes".to_vec(), + b"aspa-bytes".to_vec(), + ] + .into_iter() + .map(|bytes| (sha256_hex(&bytes), bytes)) + .collect::>(); + store + .put_blob_bytes_batch(&repo_blobs) + .expect("put external repo bytes"); + + let manifest_hash = vcir + .related_artifacts + .iter() + .find(|artifact| artifact.artifact_role == VcirArtifactRole::Manifest) + .expect("manifest artifact") + .sha256 + .clone(); + assert!( + store + .get_raw_by_hash_entry(&manifest_hash) + .expect("raw manifest lookup") + .is_none(), + "repo object bytes must not require raw_by_hash entries" + ); + + let mut warnings = Vec::new(); + let pack = reconstruct_snapshot_from_vcir(&store, &ca, &vcir, &mut warnings) + .expect("reconstruct pack from external repo bytes"); + assert_eq!(pack.manifest_bytes, b"manifest-bytes".to_vec()); + assert_eq!(pack.files.len(), 4, "crl + child cert + roa + aspa"); + assert!( + warnings.iter().all(|warning| { + !warning + .message + .contains("raw bytes missing for VCIR audit reconstruction") + }), + "external repo bytes should satisfy VCIR audit reconstruction without raw warnings" + ); + } + #[test] fn runner_dedup_paths_execute_with_timing_enabled() { let fixture_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) diff --git a/tests/test_apnic_rrdp_delta_live_20260226.rs b/tests/test_apnic_rrdp_delta_live_20260226.rs index 6f53096..2be02fd 100644 --- a/tests/test_apnic_rrdp_delta_live_20260226.rs +++ b/tests/test_apnic_rrdp_delta_live_20260226.rs @@ -318,9 +318,11 @@ fn apnic_root_repo_sync_failure_reuses_current_instance_vcir() { assert_eq!(pp.source, PublicationPointSource::VcirCurrentInstance); assert!( - pp.warnings.iter().any(|w| w - .message - .contains("using latest validated result for current CA instance")), - "expected current-instance VCIR reuse warning" + pp.warnings.iter().all(|w| { + !w.message + .contains("using latest validated result for current CA instance") + && !w.message.contains("manifest failed fetch") + }), + "successful current-instance VCIR reuse should not emit fallback bookkeeping warnings" ); } diff --git a/tests/test_manifest_processor_m4.rs b/tests/test_manifest_processor_m4.rs index 11e907d..4f9c945 100644 --- a/tests/test_manifest_processor_m4.rs +++ b/tests/test_manifest_processor_m4.rs @@ -298,10 +298,18 @@ fn manifest_hash_mismatch_reuses_current_instance_vcir_when_enabled() { .expect("second run reuses current-instance VCIR"); assert_eq!(second.source, PublicationPointSource::VcirCurrentInstance); assert!( - second.warnings.iter().any(|w| w - .message - .contains("using latest validated result for current CA instance")), - "expected current-instance VCIR reuse warning" + second + .warnings + .iter() + .any(|w| w.message.contains("manifest file hash mismatch")), + "expected warning for real fresh manifest validation failure" + ); + assert!( + second.warnings.iter().all(|w| { + !w.message + .contains("using latest validated result for current CA instance") + }), + "successful current-instance VCIR reuse should not emit fallback bookkeeping warning" ); } diff --git a/tests/test_manifest_processor_repo_sync_and_cached_snapshot_cov.rs b/tests/test_manifest_processor_repo_sync_and_cached_snapshot_cov.rs index 43edf13..6d46403 100644 --- a/tests/test_manifest_processor_repo_sync_and_cached_snapshot_cov.rs +++ b/tests/test_manifest_processor_repo_sync_and_cached_snapshot_cov.rs @@ -247,10 +247,12 @@ fn repo_sync_failed_can_reuse_current_instance_vcir_when_present() { .expect("repo sync failure should reuse current-instance VCIR"); assert_eq!(second.source, PublicationPointSource::VcirCurrentInstance); assert!( - second.warnings.iter().any(|w| w - .message - .contains("using latest validated result for current CA instance")), - "expected current-instance VCIR reuse warning" + second.warnings.iter().all(|w| { + !w.message + .contains("using latest validated result for current CA instance") + && !w.message.contains("manifest failed fetch") + }), + "successful current-instance VCIR reuse should not emit fallback bookkeeping warnings" ); }