diff --git a/monitor/grafana/dashboards/ours-rp-repo-sync.json b/monitor/grafana/dashboards/ours-rp-repo-sync.json index 5cc9f1a..d3ece25 100644 --- a/monitor/grafana/dashboards/ours-rp-repo-sync.json +++ b/monitor/grafana/dashboards/ours-rp-repo-sync.json @@ -461,6 +461,80 @@ } ] }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "x": 12, + "y": 29, + "w": 12, + "h": 9 + }, + "id": 11, + "options": { + "showHeader": true, + "cellHeight": "sm", + "footer": { + "show": false, + "reducer": [ + "sum" + ], + "countRows": false, + "fields": "" + } + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "topk(20, ours_rp_top_repository_sync_duration_seconds)", + "format": "table", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "Top 20 Repositories by Sync Duration", + "type": "table", + "transformations": [ + { + "id": "organize", + "options": { + "excludeByName": { + "job": true, + "terminal_state": true, + "__name__": true, + "publication_points": true, + "instance": true, + "repo_id": true, + "phase": true, + "pp_id": true, + "exported_instance": true, + "rp": true, + "source": true + }, + "indexByName": { + "Time": 0, + "host": 1, + "rank": 2, + "transport": 3, + "uri": 4, + "Value": 5 + }, + "renameByName": { + "Value": "value" + } + } + } + ] + }, { "datasource": { "type": "prometheus", @@ -538,74 +612,129 @@ "type": "prometheus", "uid": "Prometheus" }, + "description": "Per-repository sync success in the latest successful run; 1 means successful, 0 means failed or failed_no_cache.", "fieldConfig": { "defaults": { - "unit": "none" + "unit": "bool" }, "overrides": [] }, "gridPos": { - "x": 12, - "y": 29, - "w": 12, - "h": 9 + "h": 8, + "w": 24, + "x": 0, + "y": 47 }, - "id": 11, + "id": 12, "options": { - "showHeader": true, - "cellHeight": "sm", - "footer": { - "show": false, - "reducer": [ - "sum" + "legend": { + "calcs": [ + "lastNotNull" ], - "countRows": false, - "fields": "" + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" } }, - "pluginVersion": "11.3.1", "targets": [ { - "expr": "topk(20, ours_rp_top_repository_sync_duration_seconds)", - "format": "table", - "instant": true, - "legendFormat": "", + "expr": "ours_rp_repository_sync_success", + "legendFormat": "{{host}} {{repo_id}}", "refId": "A" } ], - "title": "Top 20 Repositories by Sync Duration", - "type": "table", - "transformations": [ - { - "id": "organize", - "options": { - "excludeByName": { - "job": true, - "terminal_state": true, - "__name__": true, - "publication_points": true, - "instance": true, - "repo_id": true, - "phase": true, - "pp_id": true, - "exported_instance": true, - "rp": true, - "source": true - }, - "indexByName": { - "Time": 0, - "host": 1, - "rank": 2, - "transport": 3, - "uri": 4, - "Value": 5 - }, - "renameByName": { - "Value": "value" - } - } + "title": "Repository Sync Success by Repo", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "description": "Per-repository total sync duration aggregated from publication point repo_sync_duration_ms.", + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 55 + }, + "id": 13, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" } - ] + }, + "targets": [ + { + "expr": "ours_rp_repository_sync_duration_seconds{stat=\"sum\"}", + "legendFormat": "{{host}} {{repo_id}}", + "refId": "A" + } + ], + "title": "Repository Sync Duration by Repo", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "description": "Per-repository downloaded bytes attributed from report.json downloads events.", + "fieldConfig": { + "defaults": { + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 63 + }, + "id": 14, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "expr": "ours_rp_repository_download_bytes", + "legendFormat": "{{host}} {{repo_id}}", + "refId": "A" + } + ], + "title": "Repository Download Bytes by Repo", + "type": "timeseries" } ], "refresh": "5s", diff --git a/src/bin/rpki_artifact_metrics.rs b/src/bin/rpki_artifact_metrics.rs index ed51248..974c376 100644 --- a/src/bin/rpki_artifact_metrics.rs +++ b/src/bin/rpki_artifact_metrics.rs @@ -251,6 +251,8 @@ struct RepoMetrics { host: String, transport: String, publication_points: u64, + sync_success: bool, + download_bytes: u64, duration_seconds_sum: f64, duration_seconds_max: f64, duration_seconds_avg: f64, @@ -659,11 +661,15 @@ fn parse_report(path: &Path, snapshot: &mut MetricsSnapshot, latest: &mut Latest } if let Some(pps) = report.get("publication_points").and_then(|v| v.as_array()) { latest.publication_points = pps.len() as u64; - extract_publication_point_metrics(pps, snapshot); + extract_publication_point_metrics(pps, report.get("downloads"), snapshot); } } -fn extract_publication_point_metrics(pps: &[Value], snapshot: &mut MetricsSnapshot) { +fn extract_publication_point_metrics( + pps: &[Value], + downloads: Option<&Value>, + snapshot: &mut MetricsSnapshot, +) { let mut repos: BTreeMap = BTreeMap::new(); let mut pp_by_object_count = Vec::::new(); let mut pp_by_sync_duration = Vec::::new(); @@ -706,11 +712,15 @@ fn extract_publication_point_metrics(pps: &[Value], snapshot: &mut MetricsSnapsh uri: repo_uri.to_string(), host: host.clone(), transport: transport.clone(), + sync_success: true, ..RepoMetrics::default() }); repo.publication_points += 1; repo.duration_seconds_sum += duration_seconds; repo.duration_seconds_max = repo.duration_seconds_max.max(duration_seconds); + if !is_success_terminal_state(&terminal_state) { + repo.sync_success = false; + } *repo.phase_counts.entry(phase.clone()).or_default() += 1; *repo .terminal_state_counts @@ -755,6 +765,7 @@ fn extract_publication_point_metrics(pps: &[Value], snapshot: &mut MetricsSnapsh } let mut repo_stats = repos.into_values().collect::>(); + assign_download_bytes_to_repos(&mut repo_stats, downloads); for repo in &mut repo_stats { if repo.publication_points > 0 { repo.duration_seconds_avg = repo.duration_seconds_sum / repo.publication_points as f64; @@ -799,6 +810,56 @@ fn extract_publication_point_metrics(pps: &[Value], snapshot: &mut MetricsSnapsh snapshot.top_pp_by_sync_duration = pp_by_sync_duration; } +fn assign_download_bytes_to_repos(repos: &mut [RepoMetrics], downloads: Option<&Value>) { + let Some(downloads) = downloads.and_then(|v| v.as_array()) else { + return; + }; + for download in downloads { + let Some(uri) = json_str(download, &["uri"]) else { + continue; + }; + let bytes = json_u64(download, &["bytes"]).unwrap_or(0); + if bytes == 0 { + continue; + } + if let Some(index) = find_repo_for_download(repos, uri) { + repos[index].download_bytes = repos[index].download_bytes.saturating_add(bytes); + } + } +} + +fn find_repo_for_download(repos: &[RepoMetrics], uri: &str) -> Option { + if let Some(index) = repos.iter().position(|repo| repo.uri == uri) { + return Some(index); + } + if uri.starts_with("rsync://") { + return repos + .iter() + .enumerate() + .filter(|(_, repo)| uri.starts_with(&repo.uri)) + .max_by_key(|(_, repo)| repo.uri.len()) + .map(|(index, _)| index); + } + + let uri_host = uri_host(uri); + let mut candidates = repos + .iter() + .enumerate() + .filter(|(_, repo)| repo.host == uri_host && repo.uri.starts_with("http")) + .collect::>(); + if candidates.len() == 1 { + return Some(candidates[0].0); + } + candidates.sort_by_key(|(_, repo)| common_prefix_len(&repo.uri, uri)); + candidates + .last() + .and_then(|(index, repo)| (common_prefix_len(&repo.uri, uri) > 0).then_some(*index)) +} + +fn is_success_terminal_state(state: &str) -> bool { + matches!(state, "fresh" | "cached" | "reused" | "valid") +} + fn parse_cir(path: &Path, snapshot: &mut MetricsSnapshot) { if !path.exists() { return; @@ -998,11 +1059,7 @@ fn render_metrics(snapshot: &MetricsSnapshot) -> String { render_top_repo_metrics(&mut writer, instance, &snapshot.top_repos_by_sync_duration); render_object_metrics(&mut writer, instance, &snapshot.object_counts); render_large_pp_metrics(&mut writer, instance, &snapshot.large_pp_counts); - render_top_publication_point_metrics( - &mut writer, - instance, - &snapshot.top_pp_by_object_count, - ); + render_top_publication_point_metrics(&mut writer, instance, &snapshot.top_pp_by_object_count); for (transport, histogram) in &snapshot.pp_sync_histograms { writer.histogram( "ours_rp_publication_point_sync_duration_seconds", @@ -1236,6 +1293,18 @@ fn render_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[Rep &base, repo.publication_points as f64, ); + writer.gauge( + "ours_rp_repository_sync_success", + "Whether repository sync is successful in the latest run", + &base, + bool_value(repo.sync_success), + ); + writer.gauge( + "ours_rp_repository_download_bytes", + "Repository download bytes attributed from latest run download events", + &base, + repo.download_bytes as f64, + ); for (stat, value) in [ ("sum", repo.duration_seconds_sum), ("max", repo.duration_seconds_max), @@ -1288,10 +1357,7 @@ fn render_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[Rep fn render_failed_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[RepoMetrics]) { for repo in repos { - if repo - .phase_counts - .contains_key("rrdp_failed_rsync_failed") - { + if repo.phase_counts.contains_key("rrdp_failed_rsync_failed") { writer.gauge( "ours_rp_rrdp_rsync_failed_repository_duration_seconds", "Repositories whose RRDP and rsync sync both failed; value is max sync duration when available", @@ -1875,6 +1941,13 @@ fn short_sha256(value: &str) -> String { hex::encode(&digest[..6]) } +fn common_prefix_len(left: &str, right: &str) -> usize { + left.bytes() + .zip(right.bytes()) + .take_while(|(l, r)| l == r) + .count() +} + fn format_prom_value(value: f64) -> String { if value.is_infinite() && value.is_sign_positive() { "+Inf".to_string() @@ -1950,7 +2023,7 @@ mod tests { fs::write(run.join("stage-timing.json"), "{}").expect("stage"); fs::write( run.join("report.json"), - r#"{"tree":{"instances_processed":2,"instances_failed":0,"warnings":[]},"vrps":[{},{}],"aspas":[{}],"publication_points":[{"rsync_base_uri":"rsync://repo.example/a/","manifest_rsync_uri":"rsync://repo.example/a/a.mft","publication_point_rsync_uri":"rsync://repo.example/a/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":1000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"},{"kind":"manifest","result":"ok"}]},{"rsync_base_uri":"rsync://repo.example/b/","manifest_rsync_uri":"rsync://repo.example/b/b.mft","publication_point_rsync_uri":"rsync://repo.example/b/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":2000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"}]}],"repo_sync_stats":{"publication_points_total":2,"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total":3000}}}}"#, + r#"{"tree":{"instances_processed":2,"instances_failed":0,"warnings":[]},"vrps":[{},{}],"aspas":[{}],"downloads":[{"kind":"rrdp_notification","uri":"https://repo.example/notify.xml","success":true,"duration_ms":100,"bytes":111},{"kind":"rrdp_delta","uri":"https://repo.example/session/1/delta.xml","success":true,"duration_ms":200,"bytes":222}],"publication_points":[{"rsync_base_uri":"rsync://repo.example/a/","manifest_rsync_uri":"rsync://repo.example/a/a.mft","publication_point_rsync_uri":"rsync://repo.example/a/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":1000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"},{"kind":"manifest","result":"ok"}]},{"rsync_base_uri":"rsync://repo.example/b/","manifest_rsync_uri":"rsync://repo.example/b/b.mft","publication_point_rsync_uri":"rsync://repo.example/b/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":2000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"}]}],"repo_sync_stats":{"publication_points_total":2,"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total":3000}}}}"#, ) .expect("report"); fs::write(run.join("input.cir"), sample_cir()).expect("cir"); @@ -1959,11 +2032,15 @@ mod tests { let snapshot = scan_run_root(td.path(), "test").expect("scan"); assert_eq!(snapshot.runs.success, 1); assert_eq!(snapshot.repo_stats.len(), 1); + assert!(snapshot.repo_stats[0].sync_success); + assert_eq!(snapshot.repo_stats[0].download_bytes, 333); assert_eq!(snapshot.top_pp_by_object_count[0].object_count, 2); assert_eq!(snapshot.cir.as_ref().unwrap().objects, 1); assert_eq!(snapshot.ccr.as_ref().unwrap().state_items["tas"], 1); let metrics = render_metrics(&snapshot); assert!(metrics.contains("ours_rp_repository_info")); + assert!(metrics.contains("ours_rp_repository_sync_success")); + assert!(metrics.contains("ours_rp_repository_download_bytes")); assert!(metrics.contains("ours_rp_large_publication_points")); assert!(metrics.contains("ours_rp_cir_objects")); assert!(metrics.contains("ours_rp_ccr_state_items"));