20260527 增加repo同步监控面板指标

This commit is contained in:
yuyr 2026-05-27 15:47:09 +08:00
parent 7e1c24fcc3
commit 57c23f19aa
2 changed files with 269 additions and 63 deletions

View File

@ -461,6 +461,80 @@
}
]
},
{
"datasource": {
"type": "prometheus",
"uid": "Prometheus"
},
"fieldConfig": {
"defaults": {
"unit": "none"
},
"overrides": []
},
"gridPos": {
"x": 12,
"y": 29,
"w": 12,
"h": 9
},
"id": 11,
"options": {
"showHeader": true,
"cellHeight": "sm",
"footer": {
"show": false,
"reducer": [
"sum"
],
"countRows": false,
"fields": ""
}
},
"pluginVersion": "11.3.1",
"targets": [
{
"expr": "topk(20, ours_rp_top_repository_sync_duration_seconds)",
"format": "table",
"instant": true,
"legendFormat": "",
"refId": "A"
}
],
"title": "Top 20 Repositories by Sync Duration",
"type": "table",
"transformations": [
{
"id": "organize",
"options": {
"excludeByName": {
"job": true,
"terminal_state": true,
"__name__": true,
"publication_points": true,
"instance": true,
"repo_id": true,
"phase": true,
"pp_id": true,
"exported_instance": true,
"rp": true,
"source": true
},
"indexByName": {
"Time": 0,
"host": 1,
"rank": 2,
"transport": 3,
"uri": 4,
"Value": 5
},
"renameByName": {
"Value": "value"
}
}
}
]
},
{
"datasource": {
"type": "prometheus",
@ -538,74 +612,129 @@
"type": "prometheus",
"uid": "Prometheus"
},
"description": "Per-repository sync success in the latest successful run; 1 means successful, 0 means failed or failed_no_cache.",
"fieldConfig": {
"defaults": {
"unit": "none"
"unit": "bool"
},
"overrides": []
},
"gridPos": {
"x": 12,
"y": 29,
"w": 12,
"h": 9
"h": 8,
"w": 24,
"x": 0,
"y": 47
},
"id": 11,
"id": 12,
"options": {
"showHeader": true,
"cellHeight": "sm",
"footer": {
"show": false,
"reducer": [
"sum"
"legend": {
"calcs": [
"lastNotNull"
],
"countRows": false,
"fields": ""
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"pluginVersion": "11.3.1",
"targets": [
{
"expr": "topk(20, ours_rp_top_repository_sync_duration_seconds)",
"format": "table",
"instant": true,
"legendFormat": "",
"expr": "ours_rp_repository_sync_success",
"legendFormat": "{{host}} {{repo_id}}",
"refId": "A"
}
],
"title": "Top 20 Repositories by Sync Duration",
"type": "table",
"transformations": [
{
"id": "organize",
"options": {
"excludeByName": {
"job": true,
"terminal_state": true,
"__name__": true,
"publication_points": true,
"instance": true,
"repo_id": true,
"phase": true,
"pp_id": true,
"exported_instance": true,
"rp": true,
"source": true
},
"indexByName": {
"Time": 0,
"host": 1,
"rank": 2,
"transport": 3,
"uri": 4,
"Value": 5
},
"renameByName": {
"Value": "value"
}
}
"title": "Repository Sync Success by Repo",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "Prometheus"
},
"description": "Per-repository total sync duration aggregated from publication point repo_sync_duration_ms.",
"fieldConfig": {
"defaults": {
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 55
},
"id": 13,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
]
},
"targets": [
{
"expr": "ours_rp_repository_sync_duration_seconds{stat=\"sum\"}",
"legendFormat": "{{host}} {{repo_id}}",
"refId": "A"
}
],
"title": "Repository Sync Duration by Repo",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "Prometheus"
},
"description": "Per-repository downloaded bytes attributed from report.json downloads events.",
"fieldConfig": {
"defaults": {
"unit": "bytes"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 63
},
"id": 14,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"targets": [
{
"expr": "ours_rp_repository_download_bytes",
"legendFormat": "{{host}} {{repo_id}}",
"refId": "A"
}
],
"title": "Repository Download Bytes by Repo",
"type": "timeseries"
}
],
"refresh": "5s",

View File

@ -251,6 +251,8 @@ struct RepoMetrics {
host: String,
transport: String,
publication_points: u64,
sync_success: bool,
download_bytes: u64,
duration_seconds_sum: f64,
duration_seconds_max: f64,
duration_seconds_avg: f64,
@ -659,11 +661,15 @@ fn parse_report(path: &Path, snapshot: &mut MetricsSnapshot, latest: &mut Latest
}
if let Some(pps) = report.get("publication_points").and_then(|v| v.as_array()) {
latest.publication_points = pps.len() as u64;
extract_publication_point_metrics(pps, snapshot);
extract_publication_point_metrics(pps, report.get("downloads"), snapshot);
}
}
fn extract_publication_point_metrics(pps: &[Value], snapshot: &mut MetricsSnapshot) {
fn extract_publication_point_metrics(
pps: &[Value],
downloads: Option<&Value>,
snapshot: &mut MetricsSnapshot,
) {
let mut repos: BTreeMap<String, RepoMetrics> = BTreeMap::new();
let mut pp_by_object_count = Vec::<TopPublicationPoint>::new();
let mut pp_by_sync_duration = Vec::<TopPublicationPoint>::new();
@ -706,11 +712,15 @@ fn extract_publication_point_metrics(pps: &[Value], snapshot: &mut MetricsSnapsh
uri: repo_uri.to_string(),
host: host.clone(),
transport: transport.clone(),
sync_success: true,
..RepoMetrics::default()
});
repo.publication_points += 1;
repo.duration_seconds_sum += duration_seconds;
repo.duration_seconds_max = repo.duration_seconds_max.max(duration_seconds);
if !is_success_terminal_state(&terminal_state) {
repo.sync_success = false;
}
*repo.phase_counts.entry(phase.clone()).or_default() += 1;
*repo
.terminal_state_counts
@ -755,6 +765,7 @@ fn extract_publication_point_metrics(pps: &[Value], snapshot: &mut MetricsSnapsh
}
let mut repo_stats = repos.into_values().collect::<Vec<_>>();
assign_download_bytes_to_repos(&mut repo_stats, downloads);
for repo in &mut repo_stats {
if repo.publication_points > 0 {
repo.duration_seconds_avg = repo.duration_seconds_sum / repo.publication_points as f64;
@ -799,6 +810,56 @@ fn extract_publication_point_metrics(pps: &[Value], snapshot: &mut MetricsSnapsh
snapshot.top_pp_by_sync_duration = pp_by_sync_duration;
}
fn assign_download_bytes_to_repos(repos: &mut [RepoMetrics], downloads: Option<&Value>) {
let Some(downloads) = downloads.and_then(|v| v.as_array()) else {
return;
};
for download in downloads {
let Some(uri) = json_str(download, &["uri"]) else {
continue;
};
let bytes = json_u64(download, &["bytes"]).unwrap_or(0);
if bytes == 0 {
continue;
}
if let Some(index) = find_repo_for_download(repos, uri) {
repos[index].download_bytes = repos[index].download_bytes.saturating_add(bytes);
}
}
}
fn find_repo_for_download(repos: &[RepoMetrics], uri: &str) -> Option<usize> {
if let Some(index) = repos.iter().position(|repo| repo.uri == uri) {
return Some(index);
}
if uri.starts_with("rsync://") {
return repos
.iter()
.enumerate()
.filter(|(_, repo)| uri.starts_with(&repo.uri))
.max_by_key(|(_, repo)| repo.uri.len())
.map(|(index, _)| index);
}
let uri_host = uri_host(uri);
let mut candidates = repos
.iter()
.enumerate()
.filter(|(_, repo)| repo.host == uri_host && repo.uri.starts_with("http"))
.collect::<Vec<_>>();
if candidates.len() == 1 {
return Some(candidates[0].0);
}
candidates.sort_by_key(|(_, repo)| common_prefix_len(&repo.uri, uri));
candidates
.last()
.and_then(|(index, repo)| (common_prefix_len(&repo.uri, uri) > 0).then_some(*index))
}
fn is_success_terminal_state(state: &str) -> bool {
matches!(state, "fresh" | "cached" | "reused" | "valid")
}
fn parse_cir(path: &Path, snapshot: &mut MetricsSnapshot) {
if !path.exists() {
return;
@ -998,11 +1059,7 @@ fn render_metrics(snapshot: &MetricsSnapshot) -> String {
render_top_repo_metrics(&mut writer, instance, &snapshot.top_repos_by_sync_duration);
render_object_metrics(&mut writer, instance, &snapshot.object_counts);
render_large_pp_metrics(&mut writer, instance, &snapshot.large_pp_counts);
render_top_publication_point_metrics(
&mut writer,
instance,
&snapshot.top_pp_by_object_count,
);
render_top_publication_point_metrics(&mut writer, instance, &snapshot.top_pp_by_object_count);
for (transport, histogram) in &snapshot.pp_sync_histograms {
writer.histogram(
"ours_rp_publication_point_sync_duration_seconds",
@ -1236,6 +1293,18 @@ fn render_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[Rep
&base,
repo.publication_points as f64,
);
writer.gauge(
"ours_rp_repository_sync_success",
"Whether repository sync is successful in the latest run",
&base,
bool_value(repo.sync_success),
);
writer.gauge(
"ours_rp_repository_download_bytes",
"Repository download bytes attributed from latest run download events",
&base,
repo.download_bytes as f64,
);
for (stat, value) in [
("sum", repo.duration_seconds_sum),
("max", repo.duration_seconds_max),
@ -1288,10 +1357,7 @@ fn render_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[Rep
fn render_failed_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[RepoMetrics]) {
for repo in repos {
if repo
.phase_counts
.contains_key("rrdp_failed_rsync_failed")
{
if repo.phase_counts.contains_key("rrdp_failed_rsync_failed") {
writer.gauge(
"ours_rp_rrdp_rsync_failed_repository_duration_seconds",
"Repositories whose RRDP and rsync sync both failed; value is max sync duration when available",
@ -1875,6 +1941,13 @@ fn short_sha256(value: &str) -> String {
hex::encode(&digest[..6])
}
fn common_prefix_len(left: &str, right: &str) -> usize {
left.bytes()
.zip(right.bytes())
.take_while(|(l, r)| l == r)
.count()
}
fn format_prom_value(value: f64) -> String {
if value.is_infinite() && value.is_sign_positive() {
"+Inf".to_string()
@ -1950,7 +2023,7 @@ mod tests {
fs::write(run.join("stage-timing.json"), "{}").expect("stage");
fs::write(
run.join("report.json"),
r#"{"tree":{"instances_processed":2,"instances_failed":0,"warnings":[]},"vrps":[{},{}],"aspas":[{}],"publication_points":[{"rsync_base_uri":"rsync://repo.example/a/","manifest_rsync_uri":"rsync://repo.example/a/a.mft","publication_point_rsync_uri":"rsync://repo.example/a/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":1000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"},{"kind":"manifest","result":"ok"}]},{"rsync_base_uri":"rsync://repo.example/b/","manifest_rsync_uri":"rsync://repo.example/b/b.mft","publication_point_rsync_uri":"rsync://repo.example/b/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":2000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"}]}],"repo_sync_stats":{"publication_points_total":2,"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total":3000}}}}"#,
r#"{"tree":{"instances_processed":2,"instances_failed":0,"warnings":[]},"vrps":[{},{}],"aspas":[{}],"downloads":[{"kind":"rrdp_notification","uri":"https://repo.example/notify.xml","success":true,"duration_ms":100,"bytes":111},{"kind":"rrdp_delta","uri":"https://repo.example/session/1/delta.xml","success":true,"duration_ms":200,"bytes":222}],"publication_points":[{"rsync_base_uri":"rsync://repo.example/a/","manifest_rsync_uri":"rsync://repo.example/a/a.mft","publication_point_rsync_uri":"rsync://repo.example/a/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":1000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"},{"kind":"manifest","result":"ok"}]},{"rsync_base_uri":"rsync://repo.example/b/","manifest_rsync_uri":"rsync://repo.example/b/b.mft","publication_point_rsync_uri":"rsync://repo.example/b/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":2000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"}]}],"repo_sync_stats":{"publication_points_total":2,"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total":3000}}}}"#,
)
.expect("report");
fs::write(run.join("input.cir"), sample_cir()).expect("cir");
@ -1959,11 +2032,15 @@ mod tests {
let snapshot = scan_run_root(td.path(), "test").expect("scan");
assert_eq!(snapshot.runs.success, 1);
assert_eq!(snapshot.repo_stats.len(), 1);
assert!(snapshot.repo_stats[0].sync_success);
assert_eq!(snapshot.repo_stats[0].download_bytes, 333);
assert_eq!(snapshot.top_pp_by_object_count[0].object_count, 2);
assert_eq!(snapshot.cir.as_ref().unwrap().objects, 1);
assert_eq!(snapshot.ccr.as_ref().unwrap().state_items["tas"], 1);
let metrics = render_metrics(&snapshot);
assert!(metrics.contains("ours_rp_repository_info"));
assert!(metrics.contains("ours_rp_repository_sync_success"));
assert!(metrics.contains("ours_rp_repository_download_bytes"));
assert!(metrics.contains("ours_rp_large_publication_points"));
assert!(metrics.contains("ours_rp_cir_objects"));
assert!(metrics.contains("ours_rp_ccr_state_items"));