diff --git a/monitor/README.md b/monitor/README.md index 36d83b1..aac40c5 100644 --- a/monitor/README.md +++ b/monitor/README.md @@ -49,6 +49,40 @@ docker compose up -d PROMETHEUS_PORT=19090 GRAFANA_PORT=13000 docker compose up -d ``` +Prometheus 默认保留 7 天数据;可通过 `PROMETHEUS_RETENTION` 覆盖: + +```bash +PROMETHEUS_RETENTION=7d docker compose up -d +``` + +## 长期稳定性测试 + +portable soak package 内置 `run_24h_soak_with_metrics.sh`,用于连续运行 ours RP、启动 metrics sidecar、启动本监控栈,并每小时生成报告: + +```bash +cd /path/to/portable-soak +SOAK_DURATION_SECS=0 \ +HOURLY_REPORT_INTERVAL_SECS=3600 \ +SOAK_RETAIN_RUNS=100 \ +CLEAN_TMP_AFTER_RUN=1 \ +PROMETHEUS_RETENTION=7d \ +STOP_MONITOR_STACK_ON_EXIT=0 \ +FEISHU_WEBHOOK_SCRIPT=/home/yuyr/.codex/skills/user/feishu-webhook/scripts/send_feishu_text.py \ +./run_24h_soak_with_metrics.sh +``` + +`SOAK_DURATION_SECS=0` 表示持续运行不自动停止;如需 24 小时自然停止,可设置为 `86400`,脚本会等当前 run 完成后退出,不会直接 kill 半轮验证。 + +关键产物: + +- `runs/run_xxxx/`:最近 100 个 run 原始产物; +- `hourly_reports/hour_*.md`:小时级报告; +- `hourly_reports/hourly_summary.jsonl`:小时级结构化汇总; +- `incident_runs/run_xxxx/`:异常 run 固化副本; +- `logs/metrics.*`、`logs/24h-soak.*`、`logs/hourly-reporter.*`:运行日志。 + +短周期联调可把 `SOAK_DURATION_SECS` 和 `HOURLY_REPORT_INTERVAL_SECS` 调小,并设置 `FEISHU_DRY_RUN=1` 避免真实飞书通知。 + ## 停止 ```bash @@ -122,10 +156,65 @@ Grafana dashboard: - `ours_rp_run_completed_total` - `ours_rp_run_duration_seconds` - `ours_rp_run_max_rss_bytes` -- `ours_rp_vrps` +- `ours_rp_vrps{kind="total|unique"}`:`total` 为去重前 VRP 条目数,`unique` 按 `(ASN, IP Prefix, Max Length)` 去重。 - `ours_rp_vaps` - `ours_rp_publication_points` - `ours_rp_repo_sync_phase_count` - `ours_rp_large_publication_points{object_count_gt="10|50|100|..."}` - `ours_rp_cir_objects` - `ours_rp_ccr_state_items` + +## Inter-RP 持续对比监控 + +`rpki_inter_rp_metrics` 用于汇总三方 RP 的最新产物: + +- ours RP:读取当前 portable soak 的 `runs/run_xxxx/run-summary.json`、`result.ccr`、CSV 产物; +- Routinator:读取远端200同步来的 `routinator/latest/run-meta.json`、`vrps.csv`、`vaps.csv`; +- rpki-client 
9.8:读取远端200同步来的 `rpki-client/latest/run-meta.json`、`vrps.csv`、`vaps.csv`、`result.ccr`。 + +远端231 启动 sidecar 示例: + +```bash +rpki_inter_rp_metrics \ + --ours-run-root /root/rpki_20260608_2_feature062_24h_20260608T075547Z/portable-soak \ + --peer-root /root/inter-rp-aggregator/synced-from-200 \ + --listen 0.0.0.0:9557 \ + --poll-secs 30 \ + --instance remote231-inter-rp +``` + +Prometheus 已新增 `ours-rp-inter-rp-metrics` scrape job,默认访问 `host.docker.internal:9557`。 + +远端200 runner 与远端231同步脚本位于: + +```text +scripts/inter_rp/run_remote200_rp_loops.sh +scripts/inter_rp/run_single_rp_with_rss.sh +scripts/inter_rp/sync_remote200_to_231.sh +scripts/inter_rp/run_inter_rp_metrics_sidecar.sh +scripts/inter_rp/inter-rp.env.example +``` + +如需从本机独立开关远端200上的 Routinator 或 rpki-client,使用: + +```bash +scripts/inter_rp/control_remote200_rp.sh status all +scripts/inter_rp/control_remote200_rp.sh stop routinator +scripts/inter_rp/control_remote200_rp.sh start routinator +scripts/inter_rp/control_remote200_rp.sh restart rpki-client +``` + +默认远端为 `root@43.110.128.200`,可通过 `REMOTE_HOST=...` 覆盖;脚本只管理指定 RP 的 loop 和当前子进程,不会自动影响另一个 RP。 + +关键指标: + +- `inter_rp_run_wall_seconds{rp="ours-rp|routinator|rpki-client"}` +- `inter_rp_run_max_rss_bytes{rp="...",kind="aggregate_peak"}` +- `inter_rp_vrps{rp="..."}`:按 `(ASN, IP Prefix, Max Length)` 去重。 +- `inter_rp_vaps{rp="..."}`:按 `(Customer ASN, Providers)` 去重,Routinator 使用 `--enable-aspa` JSON 输出转换,rpki-client 使用 `-j` JSON 输出转换。 +- `inter_rp_ccr_digest_match{left="ours-rp",right="rpki-client",state="overall|mfts|vrps|vaps|tas|rks"}` +- `inter_rp_sync_age_seconds` + +Grafana dashboard: + +- diff --git a/monitor/grafana/dashboards/ours-rp-inter-rp.json b/monitor/grafana/dashboards/ours-rp-inter-rp.json new file mode 100644 index 0000000..58dc268 --- /dev/null +++ b/monitor/grafana/dashboards/ours-rp-inter-rp.json @@ -0,0 +1,505 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + 
"links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "none", + "decimals": 0 + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "inter_rp_service_last_reload_success", + "legendFormat": "reload", + "refId": "A" + } + ], + "title": "Metrics Reload OK", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0 + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 6, + "y": 0 + }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "inter_rp_sync_age_seconds", + "legendFormat": "sync age", + "refId": "A" + } + ], + "title": "Remote200 Sync Age", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short", + "decimals": 0 + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 12, + "y": 0 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "inter_rp_parse_errors", + 
"legendFormat": "errors", + "refId": "A" + } + ], + "title": "Parse Errors", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "none", + "decimals": 0 + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 18, + "y": 0 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "inter_rp_ccr_digest_match{state=\"overall\"}", + "legendFormat": "overall", + "refId": "A" + } + ], + "title": "Ours vs rpki-client CCR Match", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "min": 0 + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 4 + }, + "id": 5, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "expr": "inter_rp_run_wall_seconds", + "legendFormat": "{{rp}}", + "refId": "A" + } + ], + "title": "Wall Time by RP", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "bytes", + "min": 0 + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 4 + }, + "id": 6, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "expr": "inter_rp_run_max_rss_bytes{kind=\"aggregate_peak\"}", + "legendFormat": "{{rp}}", + "refId": "A" + } + ], + 
"title": "Max RSS Aggregate Peak by RP", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "none", + "decimals": 0, + "min": 0 + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 12 + }, + "id": 7, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "expr": "inter_rp_vrps", + "legendFormat": "{{rp}}", + "refId": "A" + } + ], + "title": "VRPs by RP (unique ASN/Prefix/MaxLen)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "none", + "decimals": 0, + "min": 0 + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 12 + }, + "id": 8, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "expr": "inter_rp_vaps", + "legendFormat": "{{rp}}", + "refId": "A" + } + ], + "title": "VAPs / ASPAs by RP (unique Customer/Providers)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "none", + "decimals": 0 + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 20 + }, + "id": 9, + "options": { + "showHeader": true, + "sortBy": [] + }, + "targets": [ + { + "expr": "inter_rp_ccr_digest_match{left=\"ours-rp\",right=\"rpki-client\"}", + "format": "table", + "instant": true, + "legendFormat": "{{state}}", + "refId": "A" + } + ], + "title": "CCR Digest Match States", + "type": "table" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + 
"defaults": { + "unit": "none", + "decimals": 0 + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 20 + }, + "id": 10, + "options": { + "showHeader": true, + "sortBy": [] + }, + "targets": [ + { + "expr": "inter_rp_vrps_diff", + "format": "table", + "instant": true, + "legendFormat": "vrps {{left}}-{{right}}", + "refId": "A" + }, + { + "expr": "inter_rp_vaps_diff", + "format": "table", + "instant": true, + "legendFormat": "vaps {{left}}-{{right}}", + "refId": "B" + } + ], + "title": "Output Count Diffs (VRP/VAP unique)", + "type": "table" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "min": 0 + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 28 + }, + "id": 11, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "expr": "inter_rp_artifact_age_seconds", + "legendFormat": "{{rp}}", + "refId": "A" + } + ], + "title": "Artifact Age by RP", + "type": "timeseries" + } + ], + "preload": false, + "refresh": "10s", + "schemaVersion": 40, + "tags": [ + "rpki", + "inter-rp" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-6h", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "Ours RP Inter-RP", + "uid": "ours-rp-inter-rp", + "version": 1 +} diff --git a/monitor/grafana/dashboards/ours-rp-soak-overview.json b/monitor/grafana/dashboards/ours-rp-soak-overview.json index e18238a..ebe15e0 100644 --- a/monitor/grafana/dashboards/ours-rp-soak-overview.json +++ b/monitor/grafana/dashboards/ours-rp-soak-overview.json @@ -15,7 +15,8 @@ }, "fieldConfig": { "defaults": { - "unit": "short" + "decimals": 0, + "unit": "none" }, "overrides": [] }, @@ -147,7 +148,8 @@ }, "fieldConfig": { "defaults": { - "unit": "short" + "decimals": 0, + 
"unit": "none" }, "overrides": [] }, @@ -238,7 +240,8 @@ }, "fieldConfig": { "defaults": { - "unit": "short" + "decimals": 0, + "unit": "none" }, "overrides": [] }, @@ -265,10 +268,15 @@ }, "targets": [ { - "expr": "ours_rp_vrps", - "legendFormat": "VRPs", + "expr": "ours_rp_vrps{kind=\"total\"}", + "legendFormat": "VRPs raw", "refId": "A" }, + { + "expr": "ours_rp_vrps{kind=\"unique\"}", + "legendFormat": "VRPs unique", + "refId": "D" + }, { "expr": "ours_rp_vaps", "legendFormat": "VAPs", @@ -290,7 +298,8 @@ }, "fieldConfig": { "defaults": { - "unit": "short" + "decimals": 0, + "unit": "none" }, "overrides": [] }, @@ -332,7 +341,8 @@ }, "fieldConfig": { "defaults": { - "unit": "short" + "decimals": 0, + "unit": "none" }, "overrides": [] }, @@ -376,7 +386,8 @@ }, "fieldConfig": { "defaults": { - "unit": "short" + "decimals": 0, + "unit": "none" }, "overrides": [] }, @@ -420,7 +431,8 @@ }, "fieldConfig": { "defaults": { - "unit": "short" + "unit": "none", + "decimals": 0 }, "overrides": [] }, @@ -449,8 +461,8 @@ "pluginVersion": "11.3.1", "targets": [ { - "expr": "ours_rp_vrps", - "legendFormat": "VRPs", + "expr": "ours_rp_vrps{kind=\"total\"}", + "legendFormat": "VRPs raw", "refId": "A" } ], @@ -464,7 +476,8 @@ }, "fieldConfig": { "defaults": { - "unit": "short" + "unit": "none", + "decimals": 0 }, "overrides": [] }, @@ -557,6 +570,50 @@ ], "title": "Output Stage Durations", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "bytes", + "decimals": 2 + }, + "overrides": [] + }, + "gridPos": { + "x": 0, + "y": 24, + "w": 24, + "h": 8 + }, + "id": 14, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "expr": "ours_rp_run_max_rss_bytes", + "legendFormat": "Max RSS", + "refId": "A" + } + ], + "title": 
"Max RSS Over Time", + "type": "timeseries" } ], "refresh": "5s", diff --git a/monitor/prometheus/prometheus.yml b/monitor/prometheus/prometheus.yml index a7a48fa..7621242 100644 --- a/monitor/prometheus/prometheus.yml +++ b/monitor/prometheus/prometheus.yml @@ -11,3 +11,10 @@ scrape_configs: labels: rp: ours-rp source: artifact-sidecar + - job_name: ours-rp-inter-rp-metrics + metrics_path: /metrics + static_configs: + - targets: + - host.docker.internal:9557 + labels: + source: inter-rp-sidecar diff --git a/scripts/coverage.sh b/scripts/coverage.sh index 51d0db3..e9ab1b3 100755 --- a/scripts/coverage.sh +++ b/scripts/coverage.sh @@ -27,7 +27,7 @@ cleanup() { } trap cleanup EXIT -IGNORE_REGEX='repository_view_stats\.rs|db_stats\.rs|rrdp_state_dump\.rs|ccr_dump\.rs|ccr_verify\.rs|ccr_to_routinator_csv\.rs|ccr_to_compare_views\.rs|cir_materialize\.rs|cir_extract_inputs\.rs|cir_drop_report\.rs|cir_ta_only_fixture\.rs|cir_dump_reject_list\.rs|rpki_object_parse\.rs|triage_ccr_cir_pair\.rs|rpki_artifact_metrics|rpki_daemon\.rs|sequence_triage_ccr_cir|ccr_state_compare\.rs|cir_state_compare\.rs|cir_probe_rpki_client_cache\.rs|ccr/compare_view\.rs|progress_log\.rs|cli\.rs|validation/run_tree_from_tal\.rs|validation/tree_parallel\.rs|validation/tree_runner|validation/from_tal\.rs|sync/store_projection\.rs|sync/repo\.rs|sync/rrdp|(^|/)storage(/|\.rs$)|cir/materialize\.rs' 
+IGNORE_REGEX='repository_view_stats\.rs|db_stats\.rs|rrdp_state_dump\.rs|ccr_dump\.rs|ccr_verify\.rs|ccr_to_routinator_csv\.rs|ccr_to_compare_views\.rs|cir_materialize\.rs|cir_extract_inputs\.rs|cir_drop_report\.rs|cir_ta_only_fixture\.rs|cir_dump_reject_list\.rs|rpki_object_parse\.rs|triage_ccr_cir_pair\.rs|rpki_artifact_metrics|rpki_inter_rp_metrics|rpki_daemon\.rs|sequence_triage_ccr_cir|ccr_state_compare\.rs|cir_state_compare\.rs|cir_probe_rpki_client_cache\.rs|ccr/compare_view\.rs|progress_log\.rs|cli\.rs|validation/run_tree_from_tal\.rs|validation/tree_parallel\.rs|validation/tree_runner|validation/from_tal\.rs|sync/store_projection\.rs|sync/repo\.rs|sync/rrdp|(^|/)storage(/|\.rs$)|cir/materialize\.rs' # Preserve colored output even though we post-process output by running under a pseudo-TTY. # We run tests only once, then generate both CLI text + HTML reports without rerunning tests. diff --git a/scripts/inter_rp/control_remote200_rp.sh b/scripts/inter_rp/control_remote200_rp.sh new file mode 100755 index 0000000..c62953d --- /dev/null +++ b/scripts/inter_rp/control_remote200_rp.sh @@ -0,0 +1,265 @@ +#!/usr/bin/env bash +set -euo pipefail + +REMOTE_HOST="${REMOTE_HOST:-root@43.110.128.200}" +REMOTE_CONFIG="${REMOTE_CONFIG:-/root/inter-rp-runners/inter-rp.env}" +REMOTE_SCRIPTS_DIR="${REMOTE_SCRIPTS_DIR:-/root/inter-rp-runners/scripts}" +DEFAULT_RETAIN_RUNS="${RETAIN_RUNS:-20}" +DEFAULT_RSS_SAMPLE_MS="${RSS_SAMPLE_MS:-500}" + +usage() { + cat <<'USAGE' +Usage: + control_remote200_rp.sh status [rpki-client|routinator|all] + control_remote200_rp.sh start <rpki-client|routinator> + control_remote200_rp.sh stop <rpki-client|routinator|all> + control_remote200_rp.sh restart <rpki-client|routinator> + +Environment overrides: + REMOTE_HOST=root@43.110.128.200 + REMOTE_CONFIG=/root/inter-rp-runners/inter-rp.env + REMOTE_SCRIPTS_DIR=/root/inter-rp-runners/scripts + RETAIN_RUNS=20 + RSS_SAMPLE_MS=500 + +Notes: + - start uses the remote run_single_rp_with_rss.sh loop and runs until stopped.
+ - stop only kills the selected RP loop branch and its current child processes. + - rpki-client and routinator are managed independently. +USAGE +} + +ACTION="${1:-status}" +RP="${2:-all}" + +case "$ACTION" in + status|start|stop|restart) ;; + -h|--help|help) + usage + exit 0 + ;; + *) + echo "unknown action: $ACTION" >&2 + usage >&2 + exit 2 + ;; +esac + +case "$RP" in + rpki-client|routinator|all) ;; + *) + echo "unknown RP: $RP" >&2 + usage >&2 + exit 2 + ;; +esac + +if [[ "$ACTION" == "start" || "$ACTION" == "restart" ]] && [[ "$RP" == "all" ]]; then + echo "start/restart requires one RP: rpki-client or routinator" >&2 + exit 2 +fi + +ssh "$REMOTE_HOST" \ + "$(printf 'REMOTE_CONFIG=%q REMOTE_SCRIPTS_DIR=%q RETAIN_RUNS=%q RSS_SAMPLE_MS=%q ACTION=%q RP=%q bash -s' "$REMOTE_CONFIG" "$REMOTE_SCRIPTS_DIR" "$DEFAULT_RETAIN_RUNS" "$DEFAULT_RSS_SAMPLE_MS" "$ACTION" "$RP")" <<'REMOTE' # each value is %q-quoted so quotes or spaces in overrides cannot break or inject into the remote command line +set -euo pipefail + +load_config() { + if [[ -f "$REMOTE_CONFIG" ]]; then + # shellcheck disable=SC1090 + source "$REMOTE_CONFIG" + fi + INTER_RP_ROOT="${INTER_RP_ROOT:-/var/lib/inter-rp-runners}" + RETAIN_RUNS="${RETAIN_RUNS:-20}" + RSS_SAMPLE_MS="${RSS_SAMPLE_MS:-500}" + ROUTINATOR_RUN_COMMAND="${ROUTINATOR_RUN_COMMAND:-$REMOTE_SCRIPTS_DIR/run_routinator_once.sh}" # default run commands follow REMOTE_SCRIPTS_DIR instead of a hard-coded scripts path + RPKI_CLIENT_RUN_COMMAND="${RPKI_CLIENT_RUN_COMMAND:-$REMOTE_SCRIPTS_DIR/run_rpki_client_official_98_once.sh}" +} + +rp_root_name() { + case "$1" in + routinator) echo "routinator" ;; + rpki-client) echo "rpki-client" ;; + esac +} + +rp_pid_file() { + echo "$INTER_RP_ROOT/$(rp_root_name "$1").loop.pid" +} + +rp_log_file() { + echo "$INTER_RP_ROOT/$(rp_root_name "$1").loop.log" +} + +rp_command() { + case "$1" in + routinator) echo "$ROUTINATOR_RUN_COMMAND" ;; + rpki-client) echo "$RPKI_CLIENT_RUN_COMMAND" ;; + esac +} + +rp_binary_pattern() { + case "$1" in + routinator) echo "/root/inter-rp-runners/bin/routinator|[[:space:]]routinator[[:space:]]" ;; + rpki-client) echo "rpki-client-official-9\\.8|[[:space:]]rpki-client[[:space:]]"
;; + esac +} + +loop_pattern() { + local rp="$1" + echo "run_single_rp_with_rss\\.sh --rp $rp " +} + +is_running_pid() { + local pid="${1:-}" + [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null +} + +current_loop_pids() { + local rp="$1" + pgrep -af "$(loop_pattern "$rp")" 2>/dev/null | awk '{print $1}' || true +} + +status_one() { + local rp="$1" + local pid_file + pid_file="$(rp_pid_file "$rp")" + local pid="" + [[ -f "$pid_file" ]] && pid="$(cat "$pid_file" 2>/dev/null || true)" + echo "== $rp ==" + if is_running_pid "$pid"; then + echo "loop: running pid=$pid" + else + local detected + detected="$(current_loop_pids "$rp" | paste -sd, -)" + if [[ -n "$detected" ]]; then + echo "loop: running detected_pids=$detected" + else + echo "loop: stopped" + fi + fi + ps -eo pid,ppid,stat,etime,%cpu,%mem,rss,cmd --sort=-%cpu \ + | grep -E "$(loop_pattern "$rp")|$(rp_binary_pattern "$rp")" \ + | grep -v grep \ + | head -20 || true + local latest="$INTER_RP_ROOT/$(rp_root_name "$rp")/latest/run-meta.json" + if [[ -f "$latest" ]]; then + python3 - "$latest" <<'PY' +import json, sys +path = sys.argv[1] +with open(path, "r", encoding="utf-8") as handle: + meta = json.load(handle) +print("latest:", "runSeq=", meta.get("runSeq"), "success=", meta.get("success"), "wallMs=", meta.get("wallMs"), "vrps=", meta.get("counts", {}).get("vrps"), "vaps=", meta.get("counts", {}).get("vaps"), "rssKb=", meta.get("maxRssKb", {}).get("aggregatePeak")) +PY + else + echo "latest: none" + fi +} + +start_one() { + local rp="$1" + local pid_file log_file root_name run_command + pid_file="$(rp_pid_file "$rp")" + log_file="$(rp_log_file "$rp")" + root_name="$(rp_root_name "$rp")" + run_command="$(rp_command "$rp")" + mkdir -p "$INTER_RP_ROOT/$root_name" "$INTER_RP_ROOT" + local existing="" + if [[ -f "$pid_file" ]]; then + existing="$(cat "$pid_file" 2>/dev/null || true)" + fi + if is_running_pid "$existing"; then + echo "$rp already running pid=$existing" + return 0 + fi + local detected + 
detected="$(current_loop_pids "$rp" | head -1)" + if [[ -n "$detected" ]] && is_running_pid "$detected"; then + echo "$detected" >"$pid_file" + echo "$rp already running detected pid=$detected" + return 0 + fi + nohup bash -lc ' + set -euo pipefail + while true; do + "'"$REMOTE_SCRIPTS_DIR"'/run_single_rp_with_rss.sh" \ + --rp "'"$rp"'" \ + --root "'"$INTER_RP_ROOT/$root_name"'" \ + --command "'"$run_command"'" \ + --retain-runs "'"$RETAIN_RUNS"'" \ + --sample-ms "'"$RSS_SAMPLE_MS"'" || true + done + ' >"$log_file" 2>&1 & + local pid="$!" + echo "$pid" >"$pid_file" + echo "$rp started pid=$pid log=$log_file" +} + +stop_one() { + local rp="$1" + local candidates=() + local pid_file pid + pid_file="$(rp_pid_file "$rp")" + if [[ -f "$pid_file" ]]; then + pid="$(cat "$pid_file" 2>/dev/null || true)" + [[ -n "$pid" ]] && candidates+=("$pid") + fi + while read -r pid; do + [[ -n "$pid" ]] && candidates+=("$pid") + done < <(current_loop_pids "$rp") + while read -r pid; do + [[ -n "$pid" ]] && candidates+=("$pid") + done < <(pgrep -af "$(rp_binary_pattern "$rp")" 2>/dev/null | awk '{print $1}' || true) + + if ((${#candidates[@]} == 0)); then + echo "$rp already stopped" + rm -f "$pid_file" + return 0 + fi + + local unique_pids + unique_pids="$(printf '%s\n' "${candidates[@]}" | awk '!seen[$0]++' | tr '\n' ' ')" + echo "stopping $rp pids=$unique_pids" + for pid in $unique_pids; do + kill -TERM "$pid" 2>/dev/null || true + done + sleep 3 + for pid in $unique_pids; do + kill -KILL "$pid" 2>/dev/null || true + done + rm -f "$pid_file" + echo "$rp stopped" +} + +load_config + +case "$ACTION:$RP" in + status:all) + date -Is + uptime + status_one routinator + status_one rpki-client + ;; + status:*) + date -Is + uptime + status_one "$RP" + ;; + start:*) + start_one "$RP" + status_one "$RP" + ;; + stop:all) + stop_one routinator + stop_one rpki-client + ;; + stop:*) + stop_one "$RP" + status_one "$RP" + ;; + restart:*) + stop_one "$RP" + start_one "$RP" + status_one "$RP" + ;; 
+esac +REMOTE diff --git a/scripts/inter_rp/inter-rp.env.example b/scripts/inter_rp/inter-rp.env.example new file mode 100644 index 0000000..efcb9a4 --- /dev/null +++ b/scripts/inter_rp/inter-rp.env.example @@ -0,0 +1,26 @@ +# 远端200 runner 配置示例。 +# 该脚本只负责任务调度、RSS采样、run目录和run-meta.json;具体RP命令由下面两个变量提供。 + +# 远端200上的运行根目录。 +# 建议把运行态数据放到 /var/lib,避免 rpki-client 降权后无法读写 /root。 +INTER_RP_ROOT=/var/lib/inter-rp-runners + +# -1 表示持续运行;正整数表示每个RP运行多少轮。 +MAX_RUNS=-1 + +# 每轮结束后的等待秒数。 +RUN_INTERVAL_SECS=0 + +# 每个RP保留最近多少轮。 +RETAIN_RUNS=20 + +# RSS采样间隔,毫秒。 +RSS_SAMPLE_MS=500 + +# Routinator 命令模板。命令运行时会自动导出 RUN_DIR/RP_ROOT/RUN_SEQ/RUN_ID/RP_NAME。 +# 命令必须把 vrps.csv/vaps.csv 写入 $RUN_DIR;Routinator 使用 --enable-aspa + JSON 输出转换出 VAP CSV。 +ROUTINATOR_RUN_COMMAND=/root/inter-rp-runners/scripts/run_routinator_once.sh + +# 官方 rpki-client 9.8 命令模板。 +# rpki-client 会降权运行,cache 和 output 必须预先放在可写目录;命令需复制 output/csv、output/rpki.ccr,并从 output/json 转换出 vaps.csv。 +RPKI_CLIENT_RUN_COMMAND=/root/inter-rp-runners/scripts/run_rpki_client_official_98_once.sh diff --git a/scripts/inter_rp/run_inter_rp_metrics_sidecar.sh b/scripts/inter_rp/run_inter_rp_metrics_sidecar.sh new file mode 100755 index 0000000..baab5e3 --- /dev/null +++ b/scripts/inter_rp/run_inter_rp_metrics_sidecar.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +set -euo pipefail + +CONFIG_FILE="${INTER_RP_METRICS_CONFIG:-./inter-rp-metrics.env}" +if [[ -f "$CONFIG_FILE" ]]; then + # shellcheck disable=SC1090 + source "$CONFIG_FILE" +fi + +RPKI_INTER_RP_METRICS_BIN="${RPKI_INTER_RP_METRICS_BIN:-./bin/rpki_inter_rp_metrics}" +OURS_RUN_ROOT="${OURS_RUN_ROOT:?OURS_RUN_ROOT is required}" +PEER_ROOT="${PEER_ROOT:-/root/inter-rp-aggregator/synced-from-200}" +LISTEN="${INTER_RP_METRICS_LISTEN:-0.0.0.0:9557}" +POLL_SECS="${INTER_RP_METRICS_POLL_SECS:-30}" +INSTANCE="${INTER_RP_METRICS_INSTANCE:-remote231-inter-rp}" +LOG_DIR="${INTER_RP_METRICS_LOG_DIR:-./logs}" + +mkdir -p "$LOG_DIR" +exec "$RPKI_INTER_RP_METRICS_BIN" \ + --ours-run-root 
"$OURS_RUN_ROOT" \ + --peer-root "$PEER_ROOT" \ + --listen "$LISTEN" \ + --poll-secs "$POLL_SECS" \ + --instance "$INSTANCE" \ + >>"$LOG_DIR/inter-rp-metrics.log" 2>&1 diff --git a/scripts/inter_rp/run_remote200_rp_loops.sh b/scripts/inter_rp/run_remote200_rp_loops.sh new file mode 100755 index 0000000..6020d83 --- /dev/null +++ b/scripts/inter_rp/run_remote200_rp_loops.sh @@ -0,0 +1,62 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +CONFIG_FILE="${INTER_RP_CONFIG:-./inter-rp.env}" +if [[ -f "$CONFIG_FILE" ]]; then + # shellcheck disable=SC1090 + source "$CONFIG_FILE" +fi + +INTER_RP_ROOT="${INTER_RP_ROOT:-/root/inter-rp-runners}" +MAX_RUNS="${MAX_RUNS:--1}" +RUN_INTERVAL_SECS="${RUN_INTERVAL_SECS:-0}" +RETAIN_RUNS="${RETAIN_RUNS:-20}" +RSS_SAMPLE_MS="${RSS_SAMPLE_MS:-500}" +ROUTINATOR_RUN_COMMAND="${ROUTINATOR_RUN_COMMAND:-}" +RPKI_CLIENT_RUN_COMMAND="${RPKI_CLIENT_RUN_COMMAND:-}" + +if [[ -z "$ROUTINATOR_RUN_COMMAND" || -z "$RPKI_CLIENT_RUN_COMMAND" ]]; then + cat >&2 <<'MSG' +ROUTINATOR_RUN_COMMAND and RPKI_CLIENT_RUN_COMMAND are required. +Each command runs with RUN_DIR, RP_ROOT, RUN_SEQ, RUN_ID and RP_NAME exported. +MSG + exit 2 +fi + +mkdir -p "$INTER_RP_ROOT" + +run_loop() { + local rp_name="$1" + local run_command="$2" + local rp_root="$INTER_RP_ROOT/$rp_name" + local completed="0" + mkdir -p "$rp_root" + while true; do + if [[ "$MAX_RUNS" =~ ^[0-9]+$ ]] && (( completed >= MAX_RUNS )); then + break + fi + "$SCRIPT_DIR/run_single_rp_with_rss.sh" \ + --rp "$rp_name" \ + --root "$rp_root" \ + --command "$run_command" \ + --retain-runs "$RETAIN_RUNS" \ + --sample-ms "$RSS_SAMPLE_MS" || true + completed=$((completed + 1)) + if (( RUN_INTERVAL_SECS > 0 )); then + sleep "$RUN_INTERVAL_SECS" + fi + done +} + +run_loop routinator "$ROUTINATOR_RUN_COMMAND" >"$INTER_RP_ROOT/routinator.loop.log" 2>&1 & +ROUTINATOR_LOOP_PID=$! 
+run_loop rpki-client "$RPKI_CLIENT_RUN_COMMAND" >"$INTER_RP_ROOT/rpki-client.loop.log" 2>&1 & +RPKI_CLIENT_LOOP_PID=$! + +cat >"$INTER_RP_ROOT/runner-pids.env" < --root --command [--retain-runs ] [--sample-ms ] + +The command runs with RUN_DIR, RP_ROOT, RUN_SEQ, RUN_ID, and RP_NAME exported. +It must write artifacts into RUN_DIR. The wrapper writes run-meta.json and atomically updates latest. +USAGE +} + +RP_NAME="" +RP_ROOT="" +RUN_COMMAND="" +RETAIN_RUNS="${RETAIN_RUNS:-20}" +SAMPLE_MS="${RSS_SAMPLE_MS:-500}" + +while [[ $# -gt 0 ]]; do + case "$1" in + --rp) + RP_NAME="$2" + shift 2 + ;; + --root) + RP_ROOT="$2" + shift 2 + ;; + --command) + RUN_COMMAND="$2" + shift 2 + ;; + --retain-runs) + RETAIN_RUNS="$2" + shift 2 + ;; + --sample-ms) + SAMPLE_MS="$2" + shift 2 + ;; + -h|--help) + usage + exit 0 + ;; + *) + echo "unknown argument: $1" >&2 + usage >&2 + exit 2 + ;; + esac +done + +if [[ -z "$RP_NAME" || -z "$RP_ROOT" || -z "$RUN_COMMAND" ]]; then + usage >&2 + exit 2 +fi + +mkdir -p "$RP_ROOT/runs" "$RP_ROOT/logs" + +next_seq() { + local max_seq="0" + local run_path + shopt -s nullglob + for run_path in "$RP_ROOT"/runs/run_*; do + local run_name="${run_path##*/}" + local seq="${run_name#run_}" + if [[ "$seq" =~ ^[0-9]+$ ]] && (( 10#$seq > max_seq )); then + max_seq=$((10#$seq)) + fi + done + shopt -u nullglob + printf '%06d' $((max_seq + 1)) +} + +rss_kb_for_pid() { + local pid="$1" + awk '/^VmRSS:/ {print $2; found=1} END {if (!found) print 0}' "/proc/$pid/status" 2>/dev/null || echo 0 +} + +collect_related_pids() { + local root_pid="$1" + local process_group="$2" + local queue=("$root_pid") + local pid + declare -A seen=() + + if [[ -n "$process_group" ]]; then + while read -r pid; do + [[ -n "$pid" ]] && queue+=("$pid") + done < <(pgrep -g "$process_group" 2>/dev/null || true) + fi + + while ((${#queue[@]} > 0)); do + pid="${queue[0]}" + queue=("${queue[@]:1}") + [[ -z "$pid" || -n "${seen[$pid]:-}" ]] && continue + seen[$pid]=1 + echo "$pid" + while read 
-r child_pid; do + [[ -n "$child_pid" ]] && queue+=("$child_pid") + done < <(pgrep -P "$pid" 2>/dev/null || true) + done +} + +sum_related_rss() { + local process_group="$1" + local parent_pid="$2" + local total_rss="0" + local child_max_rss="0" + local pid + while read -r pid; do + [[ -z "$pid" ]] && continue + local rss + rss="$(rss_kb_for_pid "$pid")" + total_rss=$((total_rss + rss)) + if [[ "$pid" != "$parent_pid" ]] && (( rss > child_max_rss )); then + child_max_rss="$rss" + fi + done < <(collect_related_pids "$parent_pid" "$process_group") + printf '%s %s +' "$total_rss" "$child_max_rss" +} + +count_csv_rows() { + local path="$1" + if [[ ! -f "$path" ]]; then + echo 0 + return + fi + awk 'BEGIN {count=0} /^[[:space:]]*$/ {next} /^#/ {next} NR==1 {next} {count++} END {print count}' "$path" +} + +RUN_SEQ="$(next_seq)" +RUN_ID="run_${RUN_SEQ}" +RUN_DIR="$RP_ROOT/runs/$RUN_ID" +mkdir -p "$RUN_DIR" + +STARTED_AT="$(date -u +%Y-%m-%dT%H:%M:%SZ)" +START_EPOCH_MS="$(python3 - <<'PY' +import time +print(int(time.time() * 1000)) +PY +)" + +export RP_NAME RP_ROOT RUN_SEQ RUN_ID RUN_DIR RUN_COMMAND +set +e +setsid bash -lc "$RUN_COMMAND" >"$RUN_DIR/stdout.log" 2>"$RUN_DIR/stderr.log" & +CHILD_PID=$! 
+set -e +sleep 0.05 +PROCESS_GROUP="$(ps -o pgid= -p "$CHILD_PID" 2>/dev/null | tr -d '[:space:]' || true)" +if [[ -z "$PROCESS_GROUP" ]]; then + PROCESS_GROUP="$CHILD_PID" +fi + +PARENT_MAX_RSS_KB="0" +CHILD_MAX_RSS_KB="0" +AGGREGATE_PEAK_RSS_KB="0" +SAMPLE_INTERVAL_SECONDS="$(python3 - </dev/null; do + parent_rss="$(rss_kb_for_pid "$CHILD_PID")" + read -r aggregate_rss child_rss < <(sum_related_rss "$PROCESS_GROUP" "$CHILD_PID") + if (( parent_rss > PARENT_MAX_RSS_KB )); then + PARENT_MAX_RSS_KB="$parent_rss" + fi + if (( child_rss > CHILD_MAX_RSS_KB )); then + CHILD_MAX_RSS_KB="$child_rss" + fi + if (( aggregate_rss > AGGREGATE_PEAK_RSS_KB )); then + AGGREGATE_PEAK_RSS_KB="$aggregate_rss" + fi + sleep "$SAMPLE_INTERVAL_SECONDS" +done + +set +e +wait "$CHILD_PID" +EXIT_CODE=$? +set -e + +FINISHED_AT="$(date -u +%Y-%m-%dT%H:%M:%SZ)" +END_EPOCH_MS="$(python3 - <<'PY' +import time +print(int(time.time() * 1000)) +PY +)" +WALL_MS=$((END_EPOCH_MS - START_EPOCH_MS)) +SUCCESS=false +if [[ "$EXIT_CODE" == "0" ]]; then + SUCCESS=true +fi + +VRPS_COUNT="$(count_csv_rows "$RUN_DIR/vrps.csv")" +VAPS_COUNT="$(count_csv_rows "$RUN_DIR/vaps.csv")" + +export STARTED_AT FINISHED_AT WALL_MS EXIT_CODE SUCCESS +export PARENT_MAX_RSS_KB CHILD_MAX_RSS_KB AGGREGATE_PEAK_RSS_KB SAMPLE_MS +export VRPS_COUNT VAPS_COUNT +export CCR_ARTIFACT_PATH="" +export VRPS_ARTIFACT_PATH="" +export VAPS_ARTIFACT_PATH="" +if [[ -f "$RUN_DIR/result.ccr" ]]; then + CCR_ARTIFACT_PATH="result.ccr" +fi +if [[ -f "$RUN_DIR/vrps.csv" ]]; then + VRPS_ARTIFACT_PATH="vrps.csv" +fi +if [[ -f "$RUN_DIR/vaps.csv" ]]; then + VAPS_ARTIFACT_PATH="vaps.csv" +fi + +python3 - <<'PY' >"$RUN_DIR/run-meta.json" +import os +import json, socket +def optional(name): + value = os.environ.get(name, "") + return value if value else None +meta = { + "schemaVersion": 1, + "rp": os.environ["RP_NAME"], + "runSeq": int(os.environ["RUN_SEQ"]), + "runId": os.environ["RUN_ID"], + "host": socket.gethostname(), + "command": 
os.environ["RUN_COMMAND"], + "startedAtRfc3339Utc": os.environ["STARTED_AT"], + "finishedAtRfc3339Utc": os.environ["FINISHED_AT"], + "wallMs": int(os.environ["WALL_MS"]), + "exitCode": int(os.environ["EXIT_CODE"]), + "success": os.environ["SUCCESS"] == "true", + "maxRssKb": { + "parent": int(os.environ["PARENT_MAX_RSS_KB"]), + "childMax": int(os.environ["CHILD_MAX_RSS_KB"]), + "aggregatePeak": int(os.environ["AGGREGATE_PEAK_RSS_KB"]), + "sampleIntervalMs": int(os.environ["SAMPLE_MS"]), + }, + "artifacts": { + "vrpsCsv": optional("VRPS_ARTIFACT_PATH"), + "vapsCsv": optional("VAPS_ARTIFACT_PATH"), + "ccr": optional("CCR_ARTIFACT_PATH"), + "stdout": "stdout.log", + "stderr": "stderr.log", + }, + "counts": { + "vrps": int(os.environ["VRPS_COUNT"]), + "vaps": int(os.environ["VAPS_COUNT"]), + }, +} +print(json.dumps(meta, indent=2, ensure_ascii=False)) +PY + +ln -sfn "runs/$RUN_ID" "$RP_ROOT/latest.tmp" +mv -Tf "$RP_ROOT/latest.tmp" "$RP_ROOT/latest" + +echo "$RUN_ID $RP_NAME exit=$EXIT_CODE wall_ms=$WALL_MS vrps=$VRPS_COUNT vaps=$VAPS_COUNT rss_kb=$AGGREGATE_PEAK_RSS_KB" + +if [[ "$RETAIN_RUNS" =~ ^[0-9]+$ ]] && (( RETAIN_RUNS > 0 )); then + mapfile -t old_runs < <(find "$RP_ROOT/runs" -maxdepth 1 -type d -name 'run_*' -printf '%f\n' | sort | head -n -"$RETAIN_RUNS" || true) + for old_run in "${old_runs[@]}"; do + rm -rf "$RP_ROOT/runs/$old_run" + done +fi + +exit "$EXIT_CODE" diff --git a/scripts/inter_rp/sync_remote200_to_231.sh b/scripts/inter_rp/sync_remote200_to_231.sh new file mode 100755 index 0000000..e958170 --- /dev/null +++ b/scripts/inter_rp/sync_remote200_to_231.sh @@ -0,0 +1,90 @@ +#!/usr/bin/env bash +set -euo pipefail + +CONFIG_FILE="${INTER_RP_SYNC_CONFIG:-./inter-rp-sync.env}" +if [[ -f "$CONFIG_FILE" ]]; then + # shellcheck disable=SC1090 + source "$CONFIG_FILE" +fi + +REMOTE200="${REMOTE200:-root@43.110.128.200}" +REMOTE200_ROOT="${REMOTE200_ROOT:-/root/inter-rp-runners}" +PEER_ROOT="${PEER_ROOT:-/root/inter-rp-aggregator/synced-from-200}" 
+SYNC_INTERVAL_SECS="${SYNC_INTERVAL_SECS:-60}" +MAX_SYNCS="${MAX_SYNCS:--1}" +RPS="${RPS:-routinator rpki-client}" + +mkdir -p "$PEER_ROOT" + +write_status() { + local success="$1" + local message="$2" + export SYNC_SUCCESS="$success" + export SYNC_MESSAGE="$message" + export SYNC_REMOTE200="$REMOTE200" + export SYNC_REMOTE200_ROOT="$REMOTE200_ROOT" + python3 - <<'PY' >"$PEER_ROOT/sync-status.json" +import json, socket, datetime +print(json.dumps({ + "schemaVersion": 1, + "success": __import__("os").environ["SYNC_SUCCESS"] == "true", + "lastSyncAtRfc3339Utc": datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).isoformat().replace('+00:00', 'Z'), + "remoteHost": __import__("os").environ["SYNC_REMOTE200"], + "remoteRoot": __import__("os").environ["SYNC_REMOTE200_ROOT"], + "localHost": socket.gethostname(), + "message": __import__("os").environ["SYNC_MESSAGE"], +}, indent=2)) +PY +} + +sync_once() { + local rp_name + local temp_root="$PEER_ROOT/.sync-tmp-$$" + rm -rf "$temp_root" + mkdir -p "$temp_root" + for rp_name in $RPS; do + mkdir -p "$temp_root/$rp_name" + if ! rsync -aL --delete \ + --include='run-meta.json' \ + --include='result.ccr' \ + --include='vrps.csv' \ + --include='vaps.csv' \ + --include='stdout.log' \ + --include='stderr.log' \ + --exclude='*' \ + "$REMOTE200:$REMOTE200_ROOT/$rp_name/latest/" "$temp_root/$rp_name/latest/"; then + echo "sync failed for $rp_name; clearing stale local latest" >&2 + rm -rf "$PEER_ROOT/$rp_name/latest" "$PEER_ROOT/$rp_name/latest.next" "$PEER_ROOT/$rp_name/latest.prev" + return 1 + fi + if [[ !
-d "$temp_root/$rp_name/latest" ]]; then + echo "missing synced latest directory for $rp_name" >&2 + rm -rf "$PEER_ROOT/$rp_name/latest" "$PEER_ROOT/$rp_name/latest.next" "$PEER_ROOT/$rp_name/latest.prev" + return 1 + fi + rm -rf "$PEER_ROOT/$rp_name/latest.next" "$PEER_ROOT/$rp_name/latest.prev" + mkdir -p "$PEER_ROOT/$rp_name" + mv "$temp_root/$rp_name/latest" "$PEER_ROOT/$rp_name/latest.next" + if [[ -e "$PEER_ROOT/$rp_name/latest" ]]; then + mv "$PEER_ROOT/$rp_name/latest" "$PEER_ROOT/$rp_name/latest.prev" + fi + mv "$PEER_ROOT/$rp_name/latest.next" "$PEER_ROOT/$rp_name/latest" + rm -rf "$PEER_ROOT/$rp_name/latest.prev" + done + rm -rf "$temp_root" +} + +completed="0" +while true; do + if sync_once; then + write_status true "ok" + else + rm -rf "$PEER_ROOT/.sync-tmp-$$" + write_status false "rsync failed" + fi + completed=$((completed + 1)) + if [[ "$MAX_SYNCS" =~ ^[0-9]+$ ]] && (( completed >= MAX_SYNCS )); then + break + fi + sleep "$SYNC_INTERVAL_SECS" +done diff --git a/scripts/soak/build_portable_soak_package.sh b/scripts/soak/build_portable_soak_package.sh index 941d205..c8b9a85 100755 --- a/scripts/soak/build_portable_soak_package.sh +++ b/scripts/soak/build_portable_soak_package.sh @@ -53,7 +53,7 @@ else TARGET_BIN_DIR="$REPO_ROOT/target/$PROFILE" fi -REQUIRED_BINS=(rpki rpki_daemon db_stats) +REQUIRED_BINS=(rpki rpki_daemon db_stats rpki_artifact_metrics) OPTIONAL_BINS=( ccr_dump ccr_state_compare @@ -82,10 +82,12 @@ STAGE_DIR="$BUILD_ROOT/$PACKAGE_DIR_NAME" ARCHIVE_PATH="$OUT_DIR/$PACKAGE_NAME.tar.gz" rm -rf "$STAGE_DIR" "$ARCHIVE_PATH" -mkdir -p "$STAGE_DIR/bin" "$STAGE_DIR/fixtures" "$STAGE_DIR/scripts" \ +mkdir -p "$STAGE_DIR/bin" "$STAGE_DIR/fixtures" "$STAGE_DIR/scripts" "$STAGE_DIR/scripts/soak" \ "$STAGE_DIR/runs" "$STAGE_DIR/state" "$STAGE_DIR/logs" "$STAGE_DIR/tmp" install -m 0755 "$SCRIPT_DIR/run_soak.sh" "$STAGE_DIR/run_soak.sh" +install -m 0755 "$SCRIPT_DIR/run_24h_soak_with_metrics.sh" "$STAGE_DIR/run_24h_soak_with_metrics.sh" +install -m 
0755 "$SCRIPT_DIR/hourly_soak_report.py" "$STAGE_DIR/scripts/soak/hourly_soak_report.py" install -m 0644 "$SCRIPT_DIR/portable-soak.env.example" "$STAGE_DIR/.env" install -m 0644 "$SCRIPT_DIR/portable-soak.env.example" "$STAGE_DIR/portable-soak.env.example" @@ -112,6 +114,7 @@ cp -a "$REPO_ROOT/tests/fixtures/tal" "$STAGE_DIR/fixtures/" cp -a "$REPO_ROOT/tests/fixtures/ta" "$STAGE_DIR/fixtures/" cp -a "$REPO_ROOT/scripts/periodic" "$STAGE_DIR/scripts/" cp -a "$REPO_ROOT/scripts/cir" "$STAGE_DIR/scripts/" +cp -a "$REPO_ROOT/monitor" "$STAGE_DIR/" find "$STAGE_DIR/scripts" -type d -name __pycache__ -prune -exec rm -rf {} + (cd "$STAGE_DIR" && find fixtures -type f | sort > fixtures.txt) diff --git a/scripts/soak/hourly_soak_report.py b/scripts/soak/hourly_soak_report.py new file mode 100755 index 0000000..81e2e90 --- /dev/null +++ b/scripts/soak/hourly_soak_report.py @@ -0,0 +1,552 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import os +import shutil +import subprocess +import sys +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from pathlib import Path +from statistics import median +from typing import Any +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen + + +def parse_rfc3339(value: str) -> datetime: + normalized = value.strip() + if normalized.endswith("Z"): + normalized = normalized[:-1] + "+00:00" + parsed = datetime.fromisoformat(normalized) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed.astimezone(timezone.utc) + + +def format_rfc3339(value: datetime) -> str: + return value.astimezone(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def default_window() -> tuple[datetime, datetime]: + window_end = datetime.now(timezone.utc) + return window_end - timedelta(hours=1), window_end + + +def read_json(path: Path) -> dict[str, Any] | None: + try: + return 
json.loads(path.read_text(encoding="utf-8")) + except Exception: + return None + + +def nested_get(data: dict[str, Any] | None, keys: list[str], default: Any = None) -> Any: + current: Any = data + for key in keys: + if not isinstance(current, dict) or key not in current: + return default + current = current[key] + return current + + +def as_int(value: Any, default: int = 0) -> int: + if value is None: + return default + try: + return int(value) + except (TypeError, ValueError): + return default + + +def as_float(value: Any, default: float = 0.0) -> float: + if value is None: + return default + try: + return float(value) + except (TypeError, ValueError): + return default + + +def bytes_to_mib(value: int | None) -> str: + if value is None: + return "-" + return f"{value / 1024 / 1024:.1f}" + + +def millis_to_seconds_text(value: int | None) -> str: + if value is None: + return "-" + return f"{value / 1000:.3f}" + + +def seconds_text(value: float | None) -> str: + if value is None: + return "-" + return f"{value:.3f}" + + +def percent_text(numerator: int, denominator: int) -> str: + if denominator <= 0: + return "-" + return f"{(numerator / denominator) * 100:.1f}%" + + +@dataclass +class RunRecord: + run_id: str + run_index: int + run_dir: Path + status: str + sync_mode: str + started_at: datetime | None + completed_at: datetime | None + wall_ms: int | None + validation_ms: int | None + repo_sync_ms_total: int | None + rrdp_download_ms_total: int | None + rsync_download_ms_total: int | None + download_bytes_total: int | None + max_rss_bytes: int | None + vrps: int + vaps: int + publication_points: int + warnings: int + repo_sync_stats: dict[str, Any] + artifact_sizes: dict[str, int] + parse_errors: list[str] + incident_reasons: list[str] + + +def load_run(run_dir: Path, window_start: datetime, window_end: datetime, args: argparse.Namespace) -> RunRecord | None: + meta = read_json(run_dir / "run-meta.json") + summary = read_json(run_dir / "run-summary.json") + if meta 
is None: + return None + + completed_raw = meta.get("completed_at_rfc3339_utc") or nested_get(summary, ["finishedAtRfc3339Utc"]) + if not completed_raw: + return None + try: + completed_at = parse_rfc3339(str(completed_raw)) + except ValueError: + return None + if completed_at < window_start or completed_at >= window_end: + return None + + started_at = None + started_raw = meta.get("started_at_rfc3339_utc") or nested_get(summary, ["startedAtRfc3339Utc"]) + if started_raw: + try: + started_at = parse_rfc3339(str(started_raw)) + except ValueError: + started_at = None + + status = str(meta.get("status") or nested_get(summary, ["status"], "unknown")) + sync_mode = str(meta.get("sync_mode") or "unknown") + run_index = as_int(meta.get("run_index"), default=0) + run_id = str(meta.get("run_id") or run_dir.name) + stage = nested_get(summary, ["stageTiming"], {}) or {} + process_metrics = nested_get(summary, ["processMetrics"], {}) or {} + report_counts = nested_get(summary, ["reportCounts"], {}) or {} + repo_sync_stats = nested_get(summary, ["repoSyncStats"], {}) or {} + wall_ms = nested_get(summary, ["wallMs"]) + max_rss_kb = nested_get(process_metrics, ["maxRssKb"]) + artifact_sizes = {} + for artifact in nested_get(summary, ["artifacts"], []) or []: + if isinstance(artifact, dict): + artifact_path = str(artifact.get("path", "")) + artifact_sizes[Path(artifact_path).name] = as_int(artifact.get("sizeBytes")) + + parse_errors = [] + if summary is None: + parse_errors.append("missing_or_invalid_run_summary") + if not (run_dir / "stage-timing.json").exists(): + parse_errors.append("missing_stage_timing") + if not (run_dir / "process-time.txt").exists(): + parse_errors.append("missing_process_time") + + record = RunRecord( + run_id=run_id, + run_index=run_index, + run_dir=run_dir, + status=status, + sync_mode=sync_mode, + started_at=started_at, + completed_at=completed_at, + wall_ms=as_int(wall_ms) if wall_ms is not None else None, + 
validation_ms=as_int(stage.get("validation_ms")) if stage else None, + repo_sync_ms_total=as_int(stage.get("repo_sync_ms_total")) if stage else None, + rrdp_download_ms_total=as_int(stage.get("rrdp_download_ms_total")) if stage else None, + rsync_download_ms_total=as_int(stage.get("rsync_download_ms_total")) if stage else None, + download_bytes_total=as_int(stage.get("download_bytes_total")) if stage else None, + max_rss_bytes=as_int(max_rss_kb) * 1024 if max_rss_kb is not None else None, + vrps=as_int(report_counts.get("vrps")), + vaps=as_int(report_counts.get("aspas")), + publication_points=as_int(report_counts.get("publicationPoints")), + warnings=as_int(report_counts.get("warnings")), + repo_sync_stats=repo_sync_stats if isinstance(repo_sync_stats, dict) else {}, + artifact_sizes=artifact_sizes, + parse_errors=parse_errors, + incident_reasons=[], + ) + record.incident_reasons = classify_incident(record, args) + return record + + +def classify_incident(record: RunRecord, args: argparse.Namespace) -> list[str]: + reasons = [] + if record.status != "success": + reasons.append(f"status={record.status}") + if record.wall_ms is not None and record.wall_ms > args.wall_warn_secs * 1000: + reasons.append(f"wall>{args.wall_warn_secs}s") + if record.vrps < args.vrp_min: + reasons.append(f"vrps<{args.vrp_min}") + if record.vaps < args.vaps_min: + reasons.append(f"vaps<{args.vaps_min}") + if record.publication_points < args.pp_min: + reasons.append(f"pp<{args.pp_min}") + if args.warning_max >= 0 and record.warnings > args.warning_max: + reasons.append(f"warnings>{args.warning_max}") + reasons.extend(record.parse_errors) + return reasons + + +def discover_runs(run_root: Path, window_start: datetime, window_end: datetime, args: argparse.Namespace) -> list[RunRecord]: + runs_root = run_root / "runs" + records = [] + for run_dir in sorted(runs_root.glob("run_[0-9][0-9][0-9][0-9]")): + if not run_dir.is_dir(): + continue + record = load_run(run_dir, window_start, window_end, 
args) + if record is not None: + records.append(record) + return records + + +def percentile(values: list[float], target_percentile: float) -> float | None: + if not values: + return None + ordered = sorted(values) + index = int(round((target_percentile / 100.0) * (len(ordered) - 1))) + return ordered[index] + + +def aggregate_count_duration(records: list[RunRecord], group_key: str) -> dict[str, dict[str, int]]: + aggregate: dict[str, dict[str, int]] = {} + for record in records: + group = record.repo_sync_stats.get(group_key, {}) + if not isinstance(group, dict): + continue + for name, value in group.items(): + if not isinstance(value, dict): + continue + bucket = aggregate.setdefault(str(name), {"count": 0, "duration_ms_total": 0}) + bucket["count"] += as_int(value.get("count")) + bucket["duration_ms_total"] += as_int(value.get("duration_ms_total")) + return aggregate + + +def copy_tree_preserve(source_dir: Path, target_dir: Path) -> None: + if target_dir.exists(): + return + target_dir.parent.mkdir(parents=True, exist_ok=True) + try: + shutil.copytree(source_dir, target_dir, copy_function=os.link) + except OSError: + if target_dir.exists(): + shutil.rmtree(target_dir) + shutil.copytree(source_dir, target_dir) + + +def preserve_incidents(records: list[RunRecord], incident_dir: Path) -> list[dict[str, Any]]: + preserved = [] + for record in records: + if not record.incident_reasons: + continue + target_dir = incident_dir / record.run_id + copy_tree_preserve(record.run_dir, target_dir) + preserved.append( + { + "runId": record.run_id, + "sourceDir": str(record.run_dir), + "incidentDir": str(target_dir), + "reasons": record.incident_reasons, + } + ) + return preserved + + +def disk_snapshot(path: Path) -> dict[str, Any]: + usage = shutil.disk_usage(path) + return { + "path": str(path), + "totalBytes": usage.total, + "usedBytes": usage.used, + "freeBytes": usage.free, + "usedPercent": (usage.used / usage.total) * 100 if usage.total else 0.0, + } + + +def 
build_summary( + run_root: Path, + window_start: datetime, + window_end: datetime, + records: list[RunRecord], + preserved: list[dict[str, Any]], +) -> dict[str, Any]: + wall_seconds = [record.wall_ms / 1000 for record in records if record.wall_ms is not None] + validation_seconds = [ + record.validation_ms / 1000 for record in records if record.validation_ms is not None + ] + max_rss_values = [record.max_rss_bytes for record in records if record.max_rss_bytes is not None] + success_count = sum(1 for record in records if record.status == "success") + failed_count = sum(1 for record in records if record.status != "success") + snapshot_count = sum(1 for record in records if record.sync_mode == "snapshot") + delta_count = sum(1 for record in records if record.sync_mode == "delta") + incident_count = sum(1 for record in records if record.incident_reasons) + + def range_for(values: list[int]) -> dict[str, int | None]: + return {"min": min(values) if values else None, "max": max(values) if values else None} + + return { + "schemaVersion": 1, + "generatedAtUtc": format_rfc3339(datetime.now(timezone.utc)), + "windowStartUtc": format_rfc3339(window_start), + "windowEndUtc": format_rfc3339(window_end), + "runRoot": str(run_root), + "runs": { + "total": len(records), + "success": success_count, + "failed": failed_count, + "snapshot": snapshot_count, + "delta": delta_count, + "incidents": incident_count, + }, + "wallSeconds": { + "min": min(wall_seconds) if wall_seconds else None, + "median": median(wall_seconds) if wall_seconds else None, + "p95": percentile(wall_seconds, 95), + "max": max(wall_seconds) if wall_seconds else None, + }, + "validationSeconds": { + "min": min(validation_seconds) if validation_seconds else None, + "median": median(validation_seconds) if validation_seconds else None, + "max": max(validation_seconds) if validation_seconds else None, + }, + "maxRssBytes": { + "max": max(max_rss_values) if max_rss_values else None, + }, + "outputs": { + "vrps": 
range_for([record.vrps for record in records]), + "vaps": range_for([record.vaps for record in records]), + "publicationPoints": range_for([record.publication_points for record in records]), + "warnings": range_for([record.warnings for record in records]), + }, + "repoSyncByPhase": aggregate_count_duration(records, "by_phase"), + "repoSyncByTerminalState": aggregate_count_duration(records, "by_terminal_state"), + "downloadBytesTotal": sum(record.download_bytes_total or 0 for record in records), + "preservedIncidents": preserved, + "disk": disk_snapshot(run_root), + } + + +def render_markdown(summary: dict[str, Any], records: list[RunRecord]) -> str: + lines = [ + "# Ours RP 24h Soak 小时级报告", + "", + f"- Window: `{summary['windowStartUtc']}` → `{summary['windowEndUtc']}`", + f"- Generated: `{summary['generatedAtUtc']}`", + f"- Run root: `{summary['runRoot']}`", + "", + "## 汇总", + "", + "| 指标 | 值 |", + "|---|---:|", + f"| Runs | {summary['runs']['total']} |", + f"| Success / Failed | {summary['runs']['success']} / {summary['runs']['failed']} |", + f"| Snapshot / Delta | {summary['runs']['snapshot']} / {summary['runs']['delta']} |", + f"| Incidents | {summary['runs']['incidents']} |", + f"| Wall min / median / p95 / max (s) | {seconds_text(summary['wallSeconds']['min'])} / {seconds_text(summary['wallSeconds']['median'])} / {seconds_text(summary['wallSeconds']['p95'])} / {seconds_text(summary['wallSeconds']['max'])} |", + f"| Validation min / median / max (s) | {seconds_text(summary['validationSeconds']['min'])} / {seconds_text(summary['validationSeconds']['median'])} / {seconds_text(summary['validationSeconds']['max'])} |", + f"| Max RSS max (MiB) | {bytes_to_mib(summary['maxRssBytes']['max'])} |", + f"| VRPs range | {summary['outputs']['vrps']['min']} - {summary['outputs']['vrps']['max']} |", + f"| VAPs range | {summary['outputs']['vaps']['min']} - {summary['outputs']['vaps']['max']} |", + f"| PP range | {summary['outputs']['publicationPoints']['min']} - 
{summary['outputs']['publicationPoints']['max']} |", + f"| Disk used | {summary['disk']['usedPercent']:.1f}% |", + "", + "## 每轮明细", + "", + "| Run | Mode | Status | Wall(s) | Validation(s) | RRDP(s) | Rsync(s) | Max RSS(MiB) | VRPs | VAPs | PP | Warnings | Incident |", + "|---|---|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|---|", + ] + for record in records: + incident = ", ".join(record.incident_reasons) if record.incident_reasons else "" + lines.append( + "| " + + " | ".join( + [ + record.run_id, + record.sync_mode, + record.status, + millis_to_seconds_text(record.wall_ms), + millis_to_seconds_text(record.validation_ms), + millis_to_seconds_text(record.rrdp_download_ms_total), + millis_to_seconds_text(record.rsync_download_ms_total), + bytes_to_mib(record.max_rss_bytes), + str(record.vrps), + str(record.vaps), + str(record.publication_points), + str(record.warnings), + incident, + ] + ) + + " |" + ) + + lines.extend(["", "## Repo Sync 聚合", "", "### By Phase", "", "| Phase | Count | Duration(s) |", "|---|---:|---:|"]) + for phase, value in sorted(summary["repoSyncByPhase"].items()): + lines.append(f"| {phase} | {value['count']} | {value['duration_ms_total'] / 1000:.3f} |") + lines.extend(["", "### By Terminal State", "", "| State | Count | Duration(s) |", "|---|---:|---:|"]) + for state, value in sorted(summary["repoSyncByTerminalState"].items()): + lines.append(f"| {state} | {value['count']} | {value['duration_ms_total'] / 1000:.3f} |") + + lines.extend(["", "## Incident 固化", "", "| Run | Incident Dir | Reasons |", "|---|---|---|"]) + for incident in summary["preservedIncidents"]: + lines.append( + f"| {incident['runId']} | `{incident['incidentDir']}` | {', '.join(incident['reasons'])} |" + ) + if not summary["preservedIncidents"]: + lines.append("| - | - | - |") + lines.append("") + return "\n".join(lines) + + +def append_jsonl(path: Path, summary: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("a", 
encoding="utf-8") as handle: + handle.write(json.dumps(summary, ensure_ascii=False, sort_keys=True) + "\n") + + +def send_feishu_with_script(script_path: Path, message: str, dry_run: bool) -> int: + command = [sys.executable, str(script_path), "--stdin"] + if dry_run: + command.append("--dry-run") + completed = subprocess.run(command, input=message, text=True, check=False) + return completed.returncode + + +def send_feishu_with_webhook(message: str, timeout_seconds: float = 10.0) -> int: + webhook_url = os.environ.get("FEISHU_WEBHOOK_URL", "").strip() + if not webhook_url: + return 2 + final_text = message if message.lower().startswith("from codex:") else f"From codex: {message}" + payload = json.dumps( + {"msg_type": "text", "content": {"text": final_text}}, + ensure_ascii=False, + ).encode("utf-8") + request = Request( + webhook_url, + data=payload, + method="POST", + headers={"Content-Type": "application/json"}, + ) + try: + with urlopen(request, timeout=timeout_seconds) as response: + body = response.read().decode("utf-8", errors="replace") + if getattr(response, "status", 0) != 200: + print(body, file=sys.stderr) + return 1 + parsed = json.loads(body) + if parsed.get("code") == 0 or parsed.get("StatusCode") == 0: + return 0 + print(body, file=sys.stderr) + return 1 + except (HTTPError, URLError, OSError, json.JSONDecodeError) as error: + print(f"feishu send failed: {error}", file=sys.stderr) + return 1 + + +def build_feishu_message(summary: dict[str, Any], report_path: Path) -> str: + return ( + "#062 24h soak hourly " + f"{summary['windowStartUtc']}..{summary['windowEndUtc']} " + f"runs={summary['runs']['total']} ok={summary['runs']['success']} fail={summary['runs']['failed']} " + f"incidents={summary['runs']['incidents']} " + f"wall_s={seconds_text(summary['wallSeconds']['median'])}/{seconds_text(summary['wallSeconds']['max'])} " + f"vrps={summary['outputs']['vrps']['min']}-{summary['outputs']['vrps']['max']} " +
f"vaps={summary['outputs']['vaps']['min']}-{summary['outputs']['vaps']['max']} " + f"report={report_path}" + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Generate hourly reports for portable ours RP soak runs.") + parser.add_argument("--run-root", required=True, type=Path) + parser.add_argument("--reports-dir", type=Path) + parser.add_argument("--incident-dir", type=Path) + parser.add_argument("--window-start", help="RFC3339 UTC timestamp. Defaults to now-1h.") + parser.add_argument("--window-end", help="RFC3339 UTC timestamp. Defaults to now.") + parser.add_argument("--report-name", help="Override report filename.") + parser.add_argument("--wall-warn-secs", type=int, default=140) + parser.add_argument("--vrp-min", type=int, default=900_000) + parser.add_argument("--vaps-min", type=int, default=1_000) + parser.add_argument("--pp-min", type=int, default=50_000) + parser.add_argument( + "--warning-max", + type=int, + default=-1, + help="Incident threshold for warnings; negative disables warning-based incidents.", + ) + parser.add_argument("--send-feishu", action="store_true") + parser.add_argument("--dry-run-feishu", action="store_true") + parser.add_argument("--feishu-script", type=Path) + return parser.parse_args() + + +def main() -> int: + args = parse_args() + if args.window_start and args.window_end: + window_start = parse_rfc3339(args.window_start) + window_end = parse_rfc3339(args.window_end) + else: + window_start, window_end = default_window() + if window_end <= window_start: + raise SystemExit("--window-end must be later than --window-start") + + run_root = args.run_root.resolve() + reports_dir = (args.reports_dir or run_root / "hourly_reports").resolve() + incident_dir = (args.incident_dir or run_root / "incident_runs").resolve() + reports_dir.mkdir(parents=True, exist_ok=True) + records = discover_runs(run_root, window_start, window_end, args) + preserved = preserve_incidents(records, incident_dir) + 
summary = build_summary(run_root, window_start, window_end, records, preserved) + + report_name = args.report_name or f"hour_{format_rfc3339(window_start).replace(':', '').replace('-', '')}.md" + report_path = reports_dir / report_name + report_path.write_text(render_markdown(summary, records), encoding="utf-8") + append_jsonl(reports_dir / "hourly_summary.jsonl", summary) + + if args.send_feishu: + message = build_feishu_message(summary, report_path) + feishu_script = args.feishu_script + if feishu_script is None: + env_script = os.environ.get("FEISHU_WEBHOOK_SCRIPT", "").strip() + feishu_script = Path(env_script) if env_script else None + if feishu_script and feishu_script.exists(): + return send_feishu_with_script(feishu_script, message, args.dry_run_feishu) + if args.dry_run_feishu: + print(f"DRY RUN FEISHU: {message}") + else: + result = send_feishu_with_webhook(message) + if result != 0: + print("Feishu not sent: configure --feishu-script or FEISHU_WEBHOOK_URL", file=sys.stderr) + return result + + print(report_path) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/soak/portable-soak.env.example b/scripts/soak/portable-soak.env.example index f6133f9..aa888a1 100644 --- a/scripts/soak/portable-soak.env.example +++ b/scripts/soak/portable-soak.env.example @@ -8,6 +8,10 @@ MAX_RUNS=3 # 两轮之间等待秒数。做连续无等待验收时设置为 0。 INTERVAL_SECS=0 +# 连续运行的时间预算,单位秒。0 表示不启用;启用后会在每轮结束时检查并自然停止。 +# 24 小时稳定性测试设置为 86400。 +STOP_AFTER_SECS=0 + # 要运行的 RIR 列表,逗号分隔。 # 合法值只有:afrinic, apnic, arin, lacnic, ripe。 # 示例:RIRS=apnic,arin 或 RIRS=afrinic,apnic,arin,lacnic,ripe @@ -16,9 +20,12 @@ RIRS=afrinic,apnic,arin,lacnic,ripe # 运行根目录。默认使用 package 根目录;如需把产物写到独立数据盘,可改成绝对路径。 RUN_ROOT="${PACKAGE_ROOT}" -# 保留最近多少轮 run 目录。持续运行模式建议设置为 5 或按磁盘容量评估。 +# 保留最近多少轮 run 目录。24 小时稳定性测试默认使用最近 100 轮。 RETAIN_RUNS=10 +# 每轮完成并复制出 run 产物后,是否清理 tmp/daemon-run_xxxx,避免 tmp 与 runs 双份占用。 +CLEAN_TMP_AFTER_RUN=0 + # 是否输出 compact report JSON。1 表示启用,0 表示关闭。 OUTPUT_COMPACT_REPORT=1 @@ -62,6 
+69,54 @@ RPKI_EXTRA_ARGS="" # 性能 profile 或打点验证时设置为 1;普通 soak 建议保持 0,避免额外开销。 RPKI_ANALYZE=0 +# 长期稳定性测试 runner 配置。运行命令:./run_24h_soak_with_metrics.sh +# 总运行时长,单位秒;0 表示持续运行不自动停止;如需 24 小时自然停止可设置为 86400。 +SOAK_DURATION_SECS=0 + +# 小时报告间隔,单位秒。正式运行使用 3600;短周期验证可设置为 60。 +HOURLY_REPORT_INTERVAL_SECS=3600 + +# 24 小时 runner 覆盖 run 保留数量;按 #062 要求默认保留最近 100 个 run。 +SOAK_RETAIN_RUNS=100 + +# 是否启动 rpki_artifact_metrics sidecar。 +START_METRICS_SERVICE=1 + +# 24 小时 runner 退出时是否停止 metrics sidecar。默认保留,方便 Prometheus/Grafana 继续展示最后状态。 +STOP_METRICS_SERVICE_ON_EXIT=0 + +# metrics sidecar 监听地址。Prometheus 容器通过 host.docker.internal 访问。 +METRICS_LISTEN=0.0.0.0:9556 +METRICS_POLL_SECS=5 +METRICS_INSTANCE=remote231-24h + +# 是否启动 package 内置 Prometheus/Grafana monitor stack。 +START_MONITOR_STACK=1 + +# 24 小时 runner 退出时是否停止 monitor stack。默认保留,方便继续查看最后状态;数据 volume 会保留。 +STOP_MONITOR_STACK_ON_EXIT=0 + +# Prometheus/Grafana 监控数据保留周期。 +PROMETHEUS_RETENTION=7d + +# 是否发送每小时 Feishu 摘要。需要配置 FEISHU_WEBHOOK_SCRIPT 或 FEISHU_WEBHOOK_URL。 +SEND_FEISHU=1 + +# 只打印 Feishu 消息,不真实发送。联调时可设置为 1。 +FEISHU_DRY_RUN=0 + +# 可选:复用 feishu-webhook skill 的发送脚本路径。 +# 示例:FEISHU_WEBHOOK_SCRIPT=/home/yuyr/.codex/skills/user/feishu-webhook/scripts/send_feishu_text.py +FEISHU_WEBHOOK_SCRIPT= + +# 小时报告异常判定阈值。 +WALL_WARN_SECS=140 +VRP_MIN=900000 +VAPS_MIN=1000 +PP_MIN=50000 +# warning 只汇总展示;默认不作为 incident 条件。设置为非负数才启用阈值。 +WARNING_MAX=-1 + # 可选覆盖路径;默认由 package 自动推导。 # BIN_DIR="${PACKAGE_ROOT}/bin" # FIXTURE_DIR="${PACKAGE_ROOT}/fixtures" diff --git a/scripts/soak/run_24h_soak_with_metrics.sh b/scripts/soak/run_24h_soak_with_metrics.sh new file mode 100755 index 0000000..6db8fc0 --- /dev/null +++ b/scripts/soak/run_24h_soak_with_metrics.sh @@ -0,0 +1,226 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PACKAGE_ROOT="${PACKAGE_ROOT:-$SCRIPT_DIR}" +ENV_FILE="${ENV_FILE:-$PACKAGE_ROOT/.env}" + +if [[ -f "$ENV_FILE" ]]; then + # shellcheck disable=SC1090 + source "$ENV_FILE" +fi + 
+RUN_ROOT="${RUN_ROOT:-$PACKAGE_ROOT}" +BIN_DIR="${BIN_DIR:-$PACKAGE_ROOT/bin}" +LOG_ROOT="${LOG_ROOT:-$RUN_ROOT/logs}" +REPORTS_DIR="${HOURLY_REPORTS_DIR:-$RUN_ROOT/hourly_reports}" +INCIDENT_DIR="${INCIDENT_DIR:-$RUN_ROOT/incident_runs}" +MONITOR_DIR="${MONITOR_DIR:-$PACKAGE_ROOT/monitor}" +SOAK_SCRIPT="${SOAK_SCRIPT:-$PACKAGE_ROOT/run_soak.sh}" +HOURLY_REPORT_SCRIPT="${HOURLY_REPORT_SCRIPT:-$PACKAGE_ROOT/scripts/soak/hourly_soak_report.py}" + +SOAK_DURATION_SECS="${SOAK_DURATION_SECS:-0}" +HOURLY_REPORT_INTERVAL_SECS="${HOURLY_REPORT_INTERVAL_SECS:-3600}" +SOAK_RETAIN_RUNS="${SOAK_RETAIN_RUNS:-100}" +CLEAN_TMP_AFTER_RUN="${CLEAN_TMP_AFTER_RUN:-1}" +START_MONITOR_STACK="${START_MONITOR_STACK:-1}" +STOP_MONITOR_STACK_ON_EXIT="${STOP_MONITOR_STACK_ON_EXIT:-0}" +START_METRICS_SERVICE="${START_METRICS_SERVICE:-1}" +STOP_METRICS_SERVICE_ON_EXIT="${STOP_METRICS_SERVICE_ON_EXIT:-0}" +METRICS_LISTEN="${METRICS_LISTEN:-0.0.0.0:9556}" +METRICS_POLL_SECS="${METRICS_POLL_SECS:-5}" +METRICS_INSTANCE="${METRICS_INSTANCE:-remote231-24h}" +PROMETHEUS_RETENTION="${PROMETHEUS_RETENTION:-7d}" +SEND_FEISHU="${SEND_FEISHU:-1}" +FEISHU_DRY_RUN="${FEISHU_DRY_RUN:-0}" +FEISHU_WEBHOOK_SCRIPT="${FEISHU_WEBHOOK_SCRIPT:-}" +FEISHU_WEBHOOK_URL="${FEISHU_WEBHOOK_URL:-}" +export FEISHU_WEBHOOK_URL + +WALL_WARN_SECS="${WALL_WARN_SECS:-140}" +VRP_MIN="${VRP_MIN:-900000}" +VAPS_MIN="${VAPS_MIN:-1000}" +PP_MIN="${PP_MIN:-50000}" +WARNING_MAX="${WARNING_MAX:--1}" + +SOAK_PID="" +METRICS_PID="" +REPORTER_STOP=0 + +die() { + echo "error: $*" >&2 + exit 2 +} + +is_true() { + case "${1:-}" in + 1|true|TRUE|yes|YES|on|ON) return 0 ;; + *) return 1 ;; + esac +} + +validate_non_negative_int() { + local name="$1" + local value="$2" + [[ "$value" =~ ^[0-9]+$ ]] || die "$name must be a non-negative integer: $value" +} + +cleanup() { + REPORTER_STOP=1 + if [[ -n "$SOAK_PID" ]] && kill -0 "$SOAK_PID" >/dev/null 2>&1; then + kill "$SOAK_PID" >/dev/null 2>&1 || true + wait "$SOAK_PID" >/dev/null 2>&1 || true + 
fi + if is_true "$STOP_METRICS_SERVICE_ON_EXIT" && [[ -n "$METRICS_PID" ]] && kill -0 "$METRICS_PID" >/dev/null 2>&1; then + kill "$METRICS_PID" >/dev/null 2>&1 || true + wait "$METRICS_PID" >/dev/null 2>&1 || true + fi + if is_true "$START_MONITOR_STACK" && is_true "$STOP_MONITOR_STACK_ON_EXIT" && [[ -f "$MONITOR_DIR/docker-compose.yml" ]]; then + (cd "$MONITOR_DIR" && PROMETHEUS_RETENTION="$PROMETHEUS_RETENTION" docker compose down) >/dev/null 2>&1 || true + fi +} + +trap cleanup EXIT INT TERM + +run_hourly_report() { + local window_start="$1" + local window_end="$2" + local feishu_args=() + if is_true "$SEND_FEISHU"; then + feishu_args+=(--send-feishu) + if is_true "$FEISHU_DRY_RUN"; then + feishu_args+=(--dry-run-feishu) + fi + if [[ -n "$FEISHU_WEBHOOK_SCRIPT" ]]; then + feishu_args+=(--feishu-script "$FEISHU_WEBHOOK_SCRIPT") + fi + fi + python3 "$HOURLY_REPORT_SCRIPT" \ + --run-root "$RUN_ROOT" \ + --reports-dir "$REPORTS_DIR" \ + --incident-dir "$INCIDENT_DIR" \ + --window-start "$window_start" \ + --window-end "$window_end" \ + --wall-warn-secs "$WALL_WARN_SECS" \ + --vrp-min "$VRP_MIN" \ + --vaps-min "$VAPS_MIN" \ + --pp-min "$PP_MIN" \ + --warning-max "$WARNING_MAX" \ + "${feishu_args[@]}" \ + >> "$LOG_ROOT/hourly-reporter.stdout" 2>> "$LOG_ROOT/hourly-reporter.stderr" || true +} + +format_epoch_rfc3339() { + date -u -d "@$1" +%Y-%m-%dT%H:%M:%SZ +} + +main() { + validate_non_negative_int "SOAK_DURATION_SECS" "$SOAK_DURATION_SECS" + validate_non_negative_int "HOURLY_REPORT_INTERVAL_SECS" "$HOURLY_REPORT_INTERVAL_SECS" + [[ "$HOURLY_REPORT_INTERVAL_SECS" != "0" ]] || die "HOURLY_REPORT_INTERVAL_SECS must be > 0" + [[ -x "$SOAK_SCRIPT" ]] || die "missing executable: $SOAK_SCRIPT" + [[ -x "$BIN_DIR/rpki_artifact_metrics" ]] || die "missing executable: $BIN_DIR/rpki_artifact_metrics" + [[ -f "$HOURLY_REPORT_SCRIPT" ]] || die "missing hourly report script: $HOURLY_REPORT_SCRIPT" + + mkdir -p "$LOG_ROOT" "$REPORTS_DIR" "$INCIDENT_DIR" + df -h > 
"$LOG_ROOT/24h-df-before.txt" 2>&1 || true + free -h > "$LOG_ROOT/24h-free-before.txt" 2>&1 || true + + if is_true "$START_METRICS_SERVICE"; then + "$BIN_DIR/rpki_artifact_metrics" \ + --run-root "$RUN_ROOT" \ + --listen "$METRICS_LISTEN" \ + --poll-secs "$METRICS_POLL_SECS" \ + --instance "$METRICS_INSTANCE" \ + > "$LOG_ROOT/metrics.stdout" 2> "$LOG_ROOT/metrics.stderr" & + METRICS_PID="$!" + echo "$METRICS_PID" > "$LOG_ROOT/metrics.pid" + fi + + if is_true "$START_MONITOR_STACK"; then + if [[ ! -f "$MONITOR_DIR/docker-compose.yml" ]]; then + die "missing monitor compose: $MONITOR_DIR/docker-compose.yml" + fi + (cd "$MONITOR_DIR" && PROMETHEUS_RETENTION="$PROMETHEUS_RETENTION" docker compose up -d) \ + > "$LOG_ROOT/monitor-start.stdout" 2> "$LOG_ROOT/monitor-start.stderr" + fi + + local start_epoch + local deadline_epoch + local next_report_epoch + local window_start_epoch + local now_epoch + local window_start + local window_end + local run_soak_env + start_epoch="$(date +%s)" + if (( SOAK_DURATION_SECS > 0 )); then + deadline_epoch=$((start_epoch + SOAK_DURATION_SECS)) + else + deadline_epoch=0 + fi + next_report_epoch=$((start_epoch + HOURLY_REPORT_INTERVAL_SECS)) + window_start_epoch="$start_epoch" + + run_soak_env="$LOG_ROOT/24h-run-soak.env" + { + if [[ -f "$ENV_FILE" ]]; then + cat "$ENV_FILE" + fi + printf '\n# Generated by run_24h_soak_with_metrics.sh\n' + printf 'MAX_RUNS=-1\n' + printf 'INTERVAL_SECS=0\n' + if (( SOAK_DURATION_SECS > 0 )); then + printf 'STOP_AFTER_SECS=%q\n' "$SOAK_DURATION_SECS" + else + printf 'STOP_AFTER_SECS=0\n' + fi + printf 'RETAIN_RUNS=%q\n' "$SOAK_RETAIN_RUNS" + printf 'CLEAN_TMP_AFTER_RUN=%q\n' "$CLEAN_TMP_AFTER_RUN" + } > "$run_soak_env" + + env \ + ENV_FILE="$run_soak_env" \ + "$SOAK_SCRIPT" \ + > "$LOG_ROOT/24h-soak.stdout" 2> "$LOG_ROOT/24h-soak.stderr" & + SOAK_PID="$!" 
+ echo "$SOAK_PID" > "$LOG_ROOT/24h-soak.pid" + + while kill -0 "$SOAK_PID" >/dev/null 2>&1; do + if (( REPORTER_STOP == 1 )); then + break + fi + now_epoch="$(date +%s)" + if (( now_epoch >= next_report_epoch )); then + window_start="$(format_epoch_rfc3339 "$window_start_epoch")" + window_end="$(format_epoch_rfc3339 "$now_epoch")" + run_hourly_report "$window_start" "$window_end" + window_start_epoch="$now_epoch" + next_report_epoch=$((now_epoch + HOURLY_REPORT_INTERVAL_SECS)) + fi + if (( deadline_epoch > 0 && now_epoch > deadline_epoch + HOURLY_REPORT_INTERVAL_SECS + 300 )); then + echo "deadline grace exceeded; waiting for soak process pid=$SOAK_PID" >&2 + fi + sleep 5 + done + + local soak_exit_code + set +e + wait "$SOAK_PID" + soak_exit_code=$? + set -e + SOAK_PID="" + + now_epoch="$(date +%s)" + if (( now_epoch > window_start_epoch )); then + window_start="$(format_epoch_rfc3339 "$window_start_epoch")" + window_end="$(format_epoch_rfc3339 "$now_epoch")" + run_hourly_report "$window_start" "$window_end" + fi + + df -h > "$LOG_ROOT/24h-df-after.txt" 2>&1 || true + free -h > "$LOG_ROOT/24h-free-after.txt" 2>&1 || true + return "$soak_exit_code" +} + +main "$@" diff --git a/scripts/soak/run_soak.sh b/scripts/soak/run_soak.sh index 03467d7..10b0cde 100755 --- a/scripts/soak/run_soak.sh +++ b/scripts/soak/run_soak.sh @@ -12,9 +12,11 @@ fi MAX_RUNS="${MAX_RUNS:-3}" INTERVAL_SECS="${INTERVAL_SECS:-0}" +STOP_AFTER_SECS="${STOP_AFTER_SECS:-0}" RIRS="${RIRS:-afrinic,apnic,arin,lacnic,ripe}" RUN_ROOT="${RUN_ROOT:-$PACKAGE_ROOT}" RETAIN_RUNS="${RETAIN_RUNS:-10}" +CLEAN_TMP_AFTER_RUN="${CLEAN_TMP_AFTER_RUN:-0}" OUTPUT_COMPACT_REPORT="${OUTPUT_COMPACT_REPORT:-1}" ALLOW_RSYNC_MIRROR_REUSE="${ALLOW_RSYNC_MIRROR_REUSE:-1}" RSYNC_SCOPE="${RSYNC_SCOPE:-module-root}" @@ -591,6 +593,9 @@ run_one_round() { "$snapshot_reason" "$previous_run_id" "$previous_success_value" "$started_at" "$completed_at" \ "$INVALID_DB_PATH" "$INVALID_STATE_PATH" "$INVALID_TMP_PATH" "$daemon_exit_code" 
"$PACKAGE_ROOT" "$ENV_FILE" printf '%s\n' "$run_id" > "$META_DIR/last-run-id" + if is_true "$CLEAN_TMP_AFTER_RUN"; then + rm -rf "$daemon_state_root" + fi apply_outer_retention [[ "$final_status" == "success" ]] } @@ -605,6 +610,7 @@ main() { require_command find validate_max_runs validate_non_negative_int "INTERVAL_SECS" "$INTERVAL_SECS" + validate_non_negative_int "STOP_AFTER_SECS" "$STOP_AFTER_SECS" validate_positive_int "RETAIN_RUNS" "$RETAIN_RUNS" validate_rsync_scope if [[ -n "${DB_STATS_EXACT_EVERY:-}" && "$DB_STATS_EXACT_EVERY" != "0" ]]; then @@ -631,6 +637,9 @@ main() { local next_index local run_forever=0 local stop_index=0 + local started_epoch + local elapsed_secs + started_epoch="$(date +%s)" max_index="$(max_existing_run_index)" next_index=$((max_index + 1)) if (( MAX_RUNS < 0 )); then @@ -687,6 +696,13 @@ main() { echo "completed run $(printf 'run_%04d' "$next_index") status=failed" >&2 any_failed=1 fi + if (( STOP_AFTER_SECS > 0 )); then + elapsed_secs=$(( $(date +%s) - started_epoch )) + if (( elapsed_secs >= STOP_AFTER_SECS )); then + echo "run_soak stop_after_secs reached elapsed_secs=$elapsed_secs stop_after_secs=$STOP_AFTER_SECS" + break + fi + fi if (( (run_forever == 1 || next_index < stop_index) && INTERVAL_SECS > 0 )); then sleep "$INTERVAL_SECS" fi diff --git a/src/bin/rpki_inter_rp_metrics.rs b/src/bin/rpki_inter_rp_metrics.rs new file mode 100644 index 0000000..a3ce4e7 --- /dev/null +++ b/src/bin/rpki_inter_rp_metrics.rs @@ -0,0 +1,6 @@ +fn main() { + if let Err(err) = rpki::tools::rpki_inter_rp_metrics::main_entry() { + eprintln!("{err}"); + std::process::exit(1); + } +} diff --git a/src/tools/mod.rs b/src/tools/mod.rs index c68c8fc..816e4bd 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -1,3 +1,4 @@ pub mod rpki_artifact_metrics; pub mod rpki_daemon; +pub mod rpki_inter_rp_metrics; pub mod sequence_triage_ccr_cir; diff --git a/src/tools/rpki_artifact_metrics.rs b/src/tools/rpki_artifact_metrics.rs index 5ed9b26..e8ccc21 
100644 --- a/src/tools/rpki_artifact_metrics.rs +++ b/src/tools/rpki_artifact_metrics.rs @@ -60,19 +60,21 @@ fn real_main() -> Result<(), String> { thread::spawn(move || { loop { thread::sleep(Duration::from_secs(poll_secs)); - let next = match scan_run_root(&run_root, &instance) { - Ok(snapshot) => snapshot, - Err(err) => { - let mut previous = scanner.write().expect("metrics lock poisoned"); - previous - .service - .parse_errors - .push(format!("scan failed: {err}")); - previous.service.last_reload_success = false; - previous.service.last_scan_timestamp_seconds = unix_now_seconds(); - continue; - } - }; + let previous_snapshot = scanner.read().expect("metrics lock poisoned").clone(); + let next = + match scan_run_root_incremental(&run_root, &instance, Some(&previous_snapshot)) { + Ok(snapshot) => snapshot, + Err(err) => { + let mut previous = scanner.write().expect("metrics lock poisoned"); + previous + .service + .parse_errors + .push(format!("scan failed: {err}")); + previous.service.last_reload_success = false; + previous.service.last_scan_timestamp_seconds = unix_now_seconds(); + continue; + } + }; *scanner.write().expect("metrics lock poisoned") = next; } }); @@ -211,6 +213,7 @@ struct LatestRunMetrics { max_rss_bytes: Option, exit_code: Option, vrps: u64, + vrps_unique: Option, vaps: u64, publication_points: u64, warnings: u64, @@ -374,6 +377,14 @@ struct RunRecord { } fn scan_run_root(input_root: &Path, instance: &str) -> Result { + scan_run_root_incremental(input_root, instance, None) +} + +fn scan_run_root_incremental( + input_root: &Path, + instance: &str, + previous: Option<&MetricsSnapshot>, +) -> Result { let started = Instant::now(); let runs_root = resolve_runs_root(input_root); let mut snapshot = MetricsSnapshot { @@ -418,7 +429,12 @@ fn scan_run_root(input_root: &Path, instance: &str) -> Result Result) -> bool { + let Some(previous) = previous else { + return false; + }; + let Some(previous_latest) = previous.latest_run.as_ref() else { + return 
false; + }; + if previous_latest.run_dir != record.path.display().to_string() { + return false; + } + if previous_latest.status != record.status { + return false; + } + let summary = record.summary.as_ref(); + let meta = record.meta.as_ref(); + let finished_at = summary + .and_then(|v| json_str(v, &["finishedAtRfc3339Utc"])) + .or_else(|| meta.and_then(|v| json_str(v, &["completed_at_rfc3339_utc"]))); + let wall_seconds = summary.and_then(|v| json_u64(v, &["wallMs"])).unwrap_or(0) as f64 / 1000.0; + previous_latest.finished_at.as_deref() == finished_at + && (previous_latest.wall_seconds - wall_seconds).abs() < f64::EPSILON +} + +fn reuse_latest_artifact_metrics(previous: &MetricsSnapshot, snapshot: &mut MetricsSnapshot) { + snapshot.latest_run = previous.latest_run.clone(); + snapshot.repo_stats = previous.repo_stats.clone(); + snapshot.object_counts = previous.object_counts.clone(); + snapshot.large_pp_counts = previous.large_pp_counts.clone(); + snapshot.pp_sync_histograms = previous.pp_sync_histograms.clone(); + snapshot.top_repos_by_sync_duration = previous.top_repos_by_sync_duration.clone(); + snapshot.top_pp_by_object_count = previous.top_pp_by_object_count.clone(); + snapshot.top_pp_by_sync_duration = previous.top_pp_by_sync_duration.clone(); + snapshot.cir = previous.cir.clone(); + snapshot.ccr = previous.ccr.clone(); +} + fn resolve_runs_root(input_root: &Path) -> PathBuf { let runs = input_root.join("runs"); if runs.is_dir() { @@ -598,11 +650,65 @@ fn build_latest_metrics(record: &RunRecord, snapshot: &mut MetricsSnapshot) { } parse_report(&record.path.join("report.json"), snapshot, &mut latest); + latest.vrps_unique = count_vrp_csv_unique_keys_opt( + &record.path.join("vrps.csv"), + &mut snapshot.service.parse_errors, + ); parse_cir(&record.path.join("input.cir"), snapshot); parse_ccr(&record.path.join("result.ccr"), snapshot); snapshot.latest_run = Some(latest); } +fn count_vrp_csv_unique_keys_opt(path: &Path, errors: &mut Vec) -> Option { + if 
!path.exists() { + return None; + } + match count_vrp_csv_unique_keys(path) { + Ok(count) => Some(count), + Err(err) => { + errors.push(err); + None + } + } +} + +fn count_vrp_csv_unique_keys(path: &Path) -> Result { + let content = fs::read_to_string(path) + .map_err(|e| format!("read VRP CSV failed: {}: {e}", path.display()))?; + let mut unique = BTreeSet::new(); + for (index, line) in data_csv_lines(&content).enumerate() { + if index == 0 { + continue; + } + let columns = split_csv_simple(line); + if columns.len() < 3 { + return Err(format!( + "invalid VRP CSV row in {}: expected at least 3 columns, got {}", + path.display(), + columns.len() + )); + } + unique.insert(( + columns[0].to_string(), + columns[1].to_string(), + columns[2].to_string(), + )); + } + Ok(unique.len() as u64) +} + +fn data_csv_lines(content: &str) -> impl Iterator { + content + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()) + .filter(|line| !line.starts_with('#')) +} + +fn split_csv_simple(line: &str) -> Vec<&str> { + line.split(',').map(str::trim).collect() +} + fn parse_report(path: &Path, snapshot: &mut MetricsSnapshot, latest: &mut LatestRunMetrics) { if !path.exists() { return; @@ -1164,6 +1270,14 @@ fn render_latest_metrics(writer: &mut PromWriter<'_>, instance: &str, latest: &L &[label("instance", instance), label("kind", "total")], latest.vrps as f64, ); + if let Some(value) = latest.vrps_unique { + writer.gauge( + "ours_rp_vrps", + "Latest run VRP count", + &[label("instance", instance), label("kind", "unique")], + value as f64, + ); + } writer.gauge( "ours_rp_vaps", "Latest run VAP/ASPA count", @@ -2013,14 +2127,22 @@ mod tests { .expect("meta"); fs::write( run.join("run-summary.json"), - 
r#"{"runSeq":1,"runId":"run_0001","runDir":"RUN","startedAtRfc3339Utc":"2026-05-25T00:00:00Z","finishedAtRfc3339Utc":"2026-05-25T00:00:10Z","wallMs":10000,"status":"success","exitCode":0,"processMetrics":{"userSeconds":2.5,"systemSeconds":1.5,"cpuPercent":40,"maxRssKb":1000},"stageTiming":{"validation_ms":7000,"total_ms":9000,"download_event_count":2,"download_bytes_total":1234},"reportCounts":{"vrps":2,"aspas":1,"publicationPoints":2,"warnings":0},"repoSyncStats":{"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total":3000}}},"pathStats":[{"label":"work-db","totalSizeBytes":99,"fileCount":2,"dirCount":1}],"artifacts":[{"path":"report.json","sizeBytes":10}]}"#, + r#"{"runSeq":1,"runId":"run_0001","runDir":"RUN","startedAtRfc3339Utc":"2026-05-25T00:00:00Z","finishedAtRfc3339Utc":"2026-05-25T00:00:10Z","wallMs":10000,"status":"success","exitCode":0,"processMetrics":{"userSeconds":2.5,"systemSeconds":1.5,"cpuPercent":40,"maxRssKb":1000},"stageTiming":{"validation_ms":7000,"total_ms":9000,"download_event_count":2,"download_bytes_total":1234},"reportCounts":{"vrps":3,"aspas":1,"publicationPoints":2,"warnings":0},"repoSyncStats":{"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total":3000}}},"pathStats":[{"label":"work-db","totalSizeBytes":99,"fileCount":2,"dirCount":1}],"artifacts":[{"path":"report.json","sizeBytes":10}]}"#, ) .expect("summary"); fs::write(run.join("process-time.txt"), "time").expect("time"); fs::write(run.join("stage-timing.json"), "{}").expect("stage"); + fs::write( + run.join("vrps.csv"), + "ASN,IP Prefix,Max Length,Trust Anchor,Expires\n\ + AS64496,192.0.2.0/24,24,ta,2026-05-25T01:00:00Z\n\ + AS64496,192.0.2.0/24,24,ta,2026-05-25T01:00:00Z\n\ + AS64497,2001:db8::/32,48,ta,2026-05-25T01:00:00Z\n", + ) + .expect("vrps"); fs::write( run.join("report.json"), - 
r#"{"tree":{"instances_processed":2,"instances_failed":0,"warnings":[]},"vrps":[{},{}],"aspas":[{}],"downloads":[{"kind":"rrdp_notification","uri":"https://repo.example/notify.xml","success":true,"duration_ms":100,"bytes":111},{"kind":"rrdp_delta","uri":"https://repo.example/session/1/delta.xml","success":true,"duration_ms":200,"bytes":222}],"publication_points":[{"rsync_base_uri":"rsync://repo.example/a/","manifest_rsync_uri":"rsync://repo.example/a/a.mft","publication_point_rsync_uri":"rsync://repo.example/a/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":1000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"},{"kind":"manifest","result":"ok"}]},{"rsync_base_uri":"rsync://repo.example/b/","manifest_rsync_uri":"rsync://repo.example/b/b.mft","publication_point_rsync_uri":"rsync://repo.example/b/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":2000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"}]}],"repo_sync_stats":{"publication_points_total":2,"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total":3000}}}}"#, + 
r#"{"tree":{"instances_processed":2,"instances_failed":0,"warnings":[]},"vrps":[{},{},{}],"aspas":[{}],"downloads":[{"kind":"rrdp_notification","uri":"https://repo.example/notify.xml","success":true,"duration_ms":100,"bytes":111},{"kind":"rrdp_delta","uri":"https://repo.example/session/1/delta.xml","success":true,"duration_ms":200,"bytes":222}],"publication_points":[{"rsync_base_uri":"rsync://repo.example/a/","manifest_rsync_uri":"rsync://repo.example/a/a.mft","publication_point_rsync_uri":"rsync://repo.example/a/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":1000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"},{"kind":"manifest","result":"ok"}]},{"rsync_base_uri":"rsync://repo.example/b/","manifest_rsync_uri":"rsync://repo.example/b/b.mft","publication_point_rsync_uri":"rsync://repo.example/b/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":2000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"}]}],"repo_sync_stats":{"publication_points_total":2,"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total":3000}}}}"#, ) .expect("report"); fs::write(run.join("input.cir"), sample_cir()).expect("cir"); @@ -2041,8 +2163,11 @@ mod tests { assert!(metrics.contains("ours_rp_large_publication_points")); assert!(metrics.contains("ours_rp_cir_objects")); assert!(metrics.contains("ours_rp_ccr_state_items")); + assert!(metrics.contains(r#"ours_rp_vrps{instance="test",kind="total"} 3"#)); + assert!(metrics.contains(r#"ours_rp_vrps{instance="test",kind="unique"} 2"#)); let status = render_status_json(&snapshot).expect("status"); assert!(status.contains("topPublicationPointsByObjectCount")); + assert!(status.contains(r#""vrpsUnique": 2"#)); } #[test] diff --git 
a/src/tools/rpki_inter_rp_metrics.rs b/src/tools/rpki_inter_rp_metrics.rs new file mode 100644 index 0000000..c5c5b89 --- /dev/null +++ b/src/tools/rpki_inter_rp_metrics.rs @@ -0,0 +1,1381 @@ +use std::collections::{BTreeMap, BTreeSet}; +use std::fs; +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +use crate::ccr::{CcrStateDigestComparison, compare_state_digests, decode_state_digest_summary}; +use serde::Serialize; +use serde_json::{Value, json}; + +const RPS: [&str; 3] = ["ours-rp", "routinator", "rpki-client"]; +const CCR_STATES: [&str; 5] = ["mfts", "vrps", "vaps", "tas", "rks"]; + +#[derive(Clone, Debug, PartialEq, Eq)] +struct Args { + ours_run_root: PathBuf, + peer_root: PathBuf, + listen: String, + poll_secs: u64, + instance: String, + once: bool, + out_metrics: Option, + out_status: Option, +} + +fn usage() -> &'static str { + "Usage: rpki_inter_rp_metrics --ours-run-root --peer-root [--listen ] [--poll-secs ] [--instance ] [--once] [--out-metrics ] [--out-status ]" +} + +pub fn main_entry() -> Result<(), String> { + real_main() +} + +fn real_main() -> Result<(), String> { + let args = parse_args(&std::env::args().collect::>())?; + if args.once { + let snapshot = scan(&args)?; + let metrics = render_metrics(&snapshot); + let status = render_status_json(&snapshot)?; + if let Some(path) = args.out_metrics.as_ref() { + write_file(path, metrics.as_bytes())?; + } else { + print!("{metrics}"); + } + if let Some(path) = args.out_status.as_ref() { + write_file(path, status.as_bytes())?; + } + return Ok(()); + } + + let shared = Arc::new(RwLock::new(scan(&args)?)); + let scanner = Arc::clone(&shared); + let scan_args = args.clone(); + thread::spawn(move || { + let poll_secs = scan_args.poll_secs.max(1); + loop { + thread::sleep(Duration::from_secs(poll_secs)); + match scan(&scan_args) { + Ok(next) => 
*scanner.write().expect("metrics lock poisoned") = next, + Err(err) => { + let mut previous = scanner.write().expect("metrics lock poisoned"); + previous.service.last_reload_success = false; + previous.service.last_scan_timestamp_seconds = unix_now_seconds(); + previous + .service + .parse_errors + .push(format!("scan failed: {err}")); + } + } + } + }); + + serve_http(&args.listen, shared) +} + +fn parse_args(argv: &[String]) -> Result { + let mut ours_run_root = None; + let mut peer_root = None; + let mut listen = "127.0.0.1:9557".to_string(); + let mut poll_secs = 30u64; + let mut instance = "inter-rp".to_string(); + let mut once = false; + let mut out_metrics = None; + let mut out_status = None; + let mut index = 1usize; + while index < argv.len() { + match argv[index].as_str() { + "--ours-run-root" => { + index += 1; + ours_run_root = Some(PathBuf::from(value_at(argv, index, "--ours-run-root")?)); + } + "--peer-root" => { + index += 1; + peer_root = Some(PathBuf::from(value_at(argv, index, "--peer-root")?)); + } + "--listen" => { + index += 1; + listen = value_at(argv, index, "--listen")?.to_string(); + } + "--poll-secs" => { + index += 1; + let value = value_at(argv, index, "--poll-secs")?; + poll_secs = value + .parse::() + .map_err(|_| format!("invalid --poll-secs: {value}"))?; + } + "--instance" => { + index += 1; + instance = value_at(argv, index, "--instance")?.to_string(); + } + "--once" => once = true, + "--out-metrics" => { + index += 1; + out_metrics = Some(PathBuf::from(value_at(argv, index, "--out-metrics")?)); + } + "--out-status" => { + index += 1; + out_status = Some(PathBuf::from(value_at(argv, index, "--out-status")?)); + } + "-h" | "--help" => return Err(usage().to_string()), + other => return Err(format!("unknown argument: {other}\n{}", usage())), + } + index += 1; + } + Ok(Args { + ours_run_root: ours_run_root + .ok_or_else(|| format!("--ours-run-root is required\n{}", usage()))?, + peer_root: peer_root.ok_or_else(|| format!("--peer-root is 
required\n{}", usage()))?, + listen, + poll_secs, + instance, + once, + out_metrics, + out_status, + }) +} + +fn value_at<'a>(argv: &'a [String], index: usize, flag: &str) -> Result<&'a str, String> { + argv.get(index) + .map(|s| s.as_str()) + .ok_or_else(|| format!("{flag} requires a value")) +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct Snapshot { + instance: String, + service: ServiceMetrics, + sync: SyncMetrics, + samples: BTreeMap, + ccr_compare: Option, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct ServiceMetrics { + last_scan_timestamp_seconds: f64, + last_scan_duration_seconds: f64, + last_reload_success: bool, + parse_errors: Vec, + ours_run_root: String, + peer_root: String, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct SyncMetrics { + present: bool, + success: bool, + last_sync_timestamp_seconds: Option, + age_seconds: Option, + remote_host: Option, + message: Option, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct RpSample { + rp: String, + present: bool, + source_dir: Option, + run_id: Option, + run_seq: Option, + success: bool, + wall_seconds: Option, + finish_timestamp_seconds: Option, + artifact_age_seconds: Option, + max_rss_bytes: BTreeMap, + vrps: Option, + vaps: Option, + ccr_path: Option, + ccr: Option, + parse_errors: Vec, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct CcrSampleMetrics { + version: u32, + hash_alg_oid: String, + state_present: BTreeMap, + state_hashes: BTreeMap, +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct CcrCompareMetrics { + available: bool, + match_all: bool, + states: BTreeMap, + error: Option, +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct CcrStateMetrics { + left_present: bool, + right_present: bool, + matches: bool, + 
left_hash_hex: Option, + right_hash_hex: Option, +} + +fn scan(args: &Args) -> Result { + let start = Instant::now(); + let now = unix_now_seconds(); + let mut parse_errors = Vec::new(); + let mut samples = BTreeMap::new(); + + let ours = scan_ours(&args.ours_run_root, now); + parse_errors.extend( + ours.parse_errors + .iter() + .map(|err| format!("ours-rp: {err}")), + ); + samples.insert("ours-rp".to_string(), ours); + + for rp in ["routinator", "rpki-client"] { + let peer = scan_peer(&args.peer_root, rp, now); + parse_errors.extend(peer.parse_errors.iter().map(|err| format!("{rp}: {err}"))); + samples.insert(rp.to_string(), peer); + } + + let sync = scan_sync_status(&args.peer_root, now); + if let Some(message) = sync.message.as_ref().filter(|_| !sync.success) { + parse_errors.push(format!("sync: {message}")); + } + + let ccr_compare = build_ccr_compare(samples.get("ours-rp"), samples.get("rpki-client")); + if let Some(compare) = ccr_compare + .as_ref() + .and_then(|compare| compare.error.as_ref()) + { + parse_errors.push(format!("ccr compare: {compare}")); + } + + Ok(Snapshot { + instance: args.instance.clone(), + service: ServiceMetrics { + last_scan_timestamp_seconds: now, + last_scan_duration_seconds: start.elapsed().as_secs_f64(), + last_reload_success: parse_errors.is_empty(), + parse_errors, + ours_run_root: args.ours_run_root.display().to_string(), + peer_root: args.peer_root.display().to_string(), + }, + sync, + samples, + ccr_compare, + }) +} + +fn scan_ours(run_root: &Path, now: f64) -> RpSample { + let mut sample = RpSample { + rp: "ours-rp".to_string(), + ..RpSample::default() + }; + let runs_root = if run_root.join("runs").is_dir() { + run_root.join("runs") + } else { + run_root.to_path_buf() + }; + let Some(run_dir) = latest_run_with_summary(&runs_root) else { + sample.parse_errors.push(format!( + "no run-summary.json found under {}", + runs_root.display() + )); + return sample; + }; + sample.present = true; + sample.source_dir = 
Some(run_dir.display().to_string()); + + let summary_path = run_dir.join("run-summary.json"); + let value = match read_json(&summary_path) { + Ok(value) => value, + Err(err) => { + sample.parse_errors.push(err); + return sample; + } + }; + sample.run_id = json_str(&value, &["runId"]) + .map(str::to_string) + .or_else(|| { + run_dir + .file_name() + .and_then(|v| v.to_str()) + .map(str::to_string) + }); + sample.run_seq = json_u64(&value, &["runSeq"]).or_else(|| run_index_from_path(&run_dir)); + let status = json_str(&value, &["status"]).unwrap_or("unknown"); + sample.success = status == "success" && json_i64(&value, &["exitCode"]).unwrap_or(0) == 0; + sample.wall_seconds = json_f64(&value, &["wallMs"]).map(|v| v / 1000.0); + sample.finish_timestamp_seconds = + json_str(&value, &["finishedAtRfc3339Utc"]).and_then(parse_rfc3339_to_unix); + sample.artifact_age_seconds = sample + .finish_timestamp_seconds + .map(|finished| (now - finished).max(0.0)); + if let Some(max_rss_kb) = json_u64(&value, &["processMetrics", "maxRssKb"]) { + sample + .max_rss_bytes + .insert("parent".to_string(), max_rss_kb.saturating_mul(1024)); + sample.max_rss_bytes.insert( + "aggregate_peak".to_string(), + max_rss_kb.saturating_mul(1024), + ); + } + sample.vrps = + count_vrp_csv_unique_keys_opt(&run_dir.join("vrps.csv"), &mut sample.parse_errors) + .or_else(|| json_u64(&value, &["reportCounts", "vrps"])); + sample.vaps = + count_vap_csv_unique_keys_opt(&run_dir.join("vaps.csv"), &mut sample.parse_errors) + .or_else(|| json_u64(&value, &["reportCounts", "aspas"])); + + let ccr_path = run_dir.join("result.ccr"); + load_ccr_sample(&ccr_path, &mut sample); + sample +} + +fn scan_peer(peer_root: &Path, rp: &str, now: f64) -> RpSample { + let mut sample = RpSample { + rp: rp.to_string(), + ..RpSample::default() + }; + let rp_root = peer_root.join(rp); + let latest = rp_root.join("latest"); + if !latest.exists() { + sample + .parse_errors + .push(format!("missing latest directory: {}", 
latest.display())); + return sample; + } + sample.present = true; + sample.source_dir = Some(latest.display().to_string()); + + let meta_path = ["run-meta.json", "meta.json"] + .into_iter() + .map(|name| latest.join(name)) + .find(|path| path.exists()); + let mut meta = Value::Null; + if let Some(path) = meta_path.as_ref() { + match read_json(path) { + Ok(value) => meta = value, + Err(err) => sample.parse_errors.push(err), + } + } else { + sample + .parse_errors + .push(format!("missing run-meta.json under {}", latest.display())); + } + + sample.run_id = json_str(&meta, &["runId"]) + .map(str::to_string) + .or_else(|| json_str(&meta, &["run_id"]).map(str::to_string)); + sample.run_seq = json_u64(&meta, &["runSeq"]).or_else(|| json_u64(&meta, &["run_seq"])); + sample.success = json_bool(&meta, &["success"]).unwrap_or(false); + sample.wall_seconds = json_f64(&meta, &["wallSeconds"]) + .or_else(|| json_f64(&meta, &["wall_seconds"])) + .or_else(|| json_f64(&meta, &["wallMs"]).map(|v| v / 1000.0)) + .or_else(|| json_f64(&meta, &["wall_ms"]).map(|v| v / 1000.0)); + sample.finish_timestamp_seconds = json_str(&meta, &["finishedAtRfc3339Utc"]) + .or_else(|| json_str(&meta, &["finished_at_rfc3339_utc"])) + .and_then(parse_rfc3339_to_unix); + sample.artifact_age_seconds = sample + .finish_timestamp_seconds + .map(|finished| (now - finished).max(0.0)); + read_rss_metric(&meta, &mut sample); + + let vrps_path = artifact_path(&latest, &meta, "vrpsCsv", "vrps.csv"); + let vaps_path = artifact_path(&latest, &meta, "vapsCsv", "vaps.csv"); + let ccr_path = artifact_path(&latest, &meta, "ccr", "result.ccr"); + load_ccr_sample(&ccr_path, &mut sample); + + sample.vrps = count_vrp_csv_unique_keys_opt(&vrps_path, &mut sample.parse_errors) + .or_else(|| json_u64(&meta, &["counts", "vrps"])); + sample.vaps = count_vap_csv_unique_keys_opt(&vaps_path, &mut sample.parse_errors) + .or_else(|| json_u64(&meta, &["counts", "vaps"])) + .or_else(|| json_u64(&meta, &["counts", "aspas"])); + + sample 
+} + +fn read_rss_metric(meta: &Value, sample: &mut RpSample) { + let fields = [ + ("parent", &["maxRssKb", "parent"][..]), + ("child_max", &["maxRssKb", "childMax"][..]), + ("aggregate_peak", &["maxRssKb", "aggregatePeak"][..]), + ]; + for (name, path) in fields { + if let Some(value) = json_u64(meta, path) { + sample + .max_rss_bytes + .insert(name.to_string(), value.saturating_mul(1024)); + } + } + if sample.max_rss_bytes.is_empty() { + if let Some(value) = json_u64(meta, &["maxRssKb"]) { + sample + .max_rss_bytes + .insert("parent".to_string(), value.saturating_mul(1024)); + sample + .max_rss_bytes + .insert("aggregate_peak".to_string(), value.saturating_mul(1024)); + } + } +} + +fn artifact_path(run_dir: &Path, meta: &Value, key: &str, default_name: &str) -> PathBuf { + json_str(meta, &["artifacts", key]) + .map(PathBuf::from) + .filter(|path| !path.as_os_str().is_empty()) + .map(|path| { + if path.is_absolute() { + path + } else { + run_dir.join(path) + } + }) + .unwrap_or_else(|| run_dir.join(default_name)) +} + +fn load_ccr_sample(ccr_path: &Path, sample: &mut RpSample) { + if !ccr_path.exists() { + return; + } + sample.ccr_path = Some(ccr_path.display().to_string()); + let der = match fs::read(ccr_path) { + Ok(der) => der, + Err(err) => { + sample + .parse_errors + .push(format!("read CCR failed: {}: {err}", ccr_path.display())); + return; + } + }; + match decode_state_digest_summary(&der) { + Ok(summary) => sample.ccr = Some(ccr_summary_to_metrics(summary)), + Err(err) => sample.parse_errors.push(format!( + "decode CCR digest failed: {}: {err}", + ccr_path.display() + )), + } +} + +fn ccr_summary_to_metrics(summary: crate::ccr::CcrStateDigestSummary) -> CcrSampleMetrics { + let mut state_present = BTreeMap::new(); + let mut state_hashes = BTreeMap::new(); + for (name, hash) in [ + ("mfts", summary.mfts), + ("vrps", summary.vrps), + ("vaps", summary.vaps), + ("tas", summary.tas), + ("rks", summary.rks), + ] { + state_present.insert(name.to_string(), 
hash.is_some()); + if let Some(hash) = hash { + state_hashes.insert(name.to_string(), hex::encode(hash)); + } + } + CcrSampleMetrics { + version: summary.version, + hash_alg_oid: summary.hash_alg_oid, + state_present, + state_hashes, + } +} + +fn scan_sync_status(peer_root: &Path, now: f64) -> SyncMetrics { + let path = peer_root.join("sync-status.json"); + if !path.exists() { + return SyncMetrics { + present: false, + success: false, + message: Some(format!("missing {}", path.display())), + ..SyncMetrics::default() + }; + } + let value = match read_json(&path) { + Ok(value) => value, + Err(err) => { + return SyncMetrics { + present: true, + success: false, + message: Some(err), + ..SyncMetrics::default() + }; + } + }; + let timestamp = json_str(&value, &["lastSyncAtRfc3339Utc"]) + .or_else(|| json_str(&value, &["lastSyncTimestampRfc3339Utc"])) + .or_else(|| json_str(&value, &["updatedAtRfc3339Utc"])) + .and_then(parse_rfc3339_to_unix); + SyncMetrics { + present: true, + success: json_bool(&value, &["success"]) + .or_else(|| json_bool(&value, &["lastSyncSuccess"])) + .unwrap_or(false), + last_sync_timestamp_seconds: timestamp, + age_seconds: timestamp.map(|ts| (now - ts).max(0.0)), + remote_host: json_str(&value, &["remoteHost"]).map(str::to_string), + message: json_str(&value, &["message"]) + .or_else(|| json_str(&value, &["error"])) + .map(str::to_string), + } +} + +fn build_ccr_compare( + ours: Option<&RpSample>, + rpki_client: Option<&RpSample>, +) -> Option { + let left = ours?.ccr_path.as_ref()?; + let right = rpki_client?.ccr_path.as_ref()?; + let left = match fs::read(left) { + Ok(value) => value, + Err(err) => { + return Some(CcrCompareMetrics { + available: false, + match_all: false, + states: BTreeMap::new(), + error: Some(format!("read ours CCR failed: {err}")), + }); + } + }; + let right = match fs::read(right) { + Ok(value) => value, + Err(err) => { + return Some(CcrCompareMetrics { + available: false, + match_all: false, + states: BTreeMap::new(), + 
/// Counts the distinct VRPs in a CSV export.
///
/// A VRP is identified by its first three columns; any further columns are ignored, so the
/// same VRP listed with different trailing values is counted once.
///
/// # Errors
/// Returns a message when the file cannot be read or a data row has fewer than 3 columns.
fn count_vrp_csv_unique_keys(path: &Path) -> Result<u64, String> {
    count_csv_unique_keys(path, "VRP", 3)
}

/// Counts the distinct VAPs in a CSV export.
///
/// A VAP is identified by its first two columns; any further columns are ignored.
///
/// # Errors
/// Returns a message when the file cannot be read or a data row has fewer than 2 columns.
fn count_vap_csv_unique_keys(path: &Path) -> Result<u64, String> {
    count_csv_unique_keys(path, "VAP", 2)
}

/// Shared implementation: counts distinct rows of `path` keyed by the first `key_columns`
/// columns. `kind` only appears in error messages.
fn count_csv_unique_keys(path: &Path, kind: &str, key_columns: usize) -> Result<u64, String> {
    let content = fs::read_to_string(path)
        .map_err(|e| format!("read {kind} CSV failed: {}: {e}", path.display()))?;
    // Keys borrow from `content`, so no per-column `String` is allocated.
    let mut unique = BTreeSet::new();
    // The first non-comment, non-blank line is the header row.
    for line in data_csv_lines(&content).skip(1) {
        let mut columns = split_csv_simple(line);
        if columns.len() < key_columns {
            return Err(format!(
                "invalid {kind} CSV row in {}: expected at least {key_columns} columns, got {}",
                path.display(),
                columns.len()
            ));
        }
        columns.truncate(key_columns);
        unique.insert(columns);
    }
    Ok(unique.len() as u64)
}

/// Counts data rows (header excluded, duplicates included); test-only helper.
#[cfg(test)]
fn count_csv_rows(path: &Path) -> Result<u64, String> {
    let content = fs::read_to_string(path)
        .map_err(|e| format!("read CSV failed: {}: {e}", path.display()))?;
    let count = data_csv_lines(&content).skip(1).count();
    Ok(count as u64)
}

/// Yields the trimmed lines of `content`, dropping blank lines and lines starting with `#`.
fn data_csv_lines(content: &str) -> impl Iterator<Item = &str> {
    content
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .filter(|line| !line.starts_with('#'))
}

/// Splits on every comma and trims each field. Quoted fields are not interpreted, so a comma
/// inside quotes still separates columns.
fn split_csv_simple(line: &str) -> Vec<&str> {
    line.split(',').map(str::trim).collect()
}
snapshot.service.last_scan_timestamp_seconds, + ); + writer.gauge( + "inter_rp_service_last_scan_duration_seconds", + "Last inter-RP scan duration", + &[label("instance", instance)], + snapshot.service.last_scan_duration_seconds, + ); + writer.gauge( + "inter_rp_service_last_reload_success", + "Last inter-RP reload success", + &[label("instance", instance)], + bool_value(snapshot.service.last_reload_success), + ); + writer.gauge( + "inter_rp_parse_errors", + "Current inter-RP parse error count", + &[label("instance", instance)], + snapshot.service.parse_errors.len() as f64, + ); + writer.gauge( + "inter_rp_sync_last_success", + "Last remote200 artifact sync success", + &[label("instance", instance)], + bool_value(snapshot.sync.success), + ); + if let Some(ts) = snapshot.sync.last_sync_timestamp_seconds { + writer.gauge( + "inter_rp_sync_last_timestamp_seconds", + "Last remote200 artifact sync timestamp", + &[label("instance", instance)], + ts, + ); + } + if let Some(age) = snapshot.sync.age_seconds { + writer.gauge( + "inter_rp_sync_age_seconds", + "Age of latest remote200 artifact sync", + &[label("instance", instance)], + age, + ); + } + + for rp in RPS { + let sample = snapshot.samples.get(rp); + render_sample_metrics(&mut writer, instance, rp, sample); + } + render_count_diffs(&mut writer, instance, &snapshot.samples); + render_ccr_compare_metrics(&mut writer, instance, snapshot.ccr_compare.as_ref()); + out +} + +fn render_sample_metrics( + writer: &mut PromWriter<'_>, + instance: &str, + rp: &str, + sample: Option<&RpSample>, +) { + let present = sample.map(|sample| sample.present).unwrap_or(false); + let success = sample.map(|sample| sample.success).unwrap_or(false); + writer.gauge( + "inter_rp_run_present", + "Whether latest RP run artifact is present", + &[label("instance", instance), label("rp", rp)], + bool_value(present), + ); + writer.gauge( + "inter_rp_run_success", + "Whether latest RP run succeeded", + &[label("instance", instance), label("rp", rp)], 
+ bool_value(success), + ); + if let Some(sample) = sample { + if let Some(seq) = sample.run_seq { + writer.gauge( + "inter_rp_run_seq", + "Latest RP run sequence", + &[label("instance", instance), label("rp", rp)], + seq as f64, + ); + } + if let Some(wall) = sample.wall_seconds { + writer.gauge( + "inter_rp_run_wall_seconds", + "Latest RP run wall clock seconds", + &[label("instance", instance), label("rp", rp)], + wall, + ); + } + if let Some(finish) = sample.finish_timestamp_seconds { + writer.gauge( + "inter_rp_run_finish_timestamp_seconds", + "Latest RP run finish timestamp", + &[label("instance", instance), label("rp", rp)], + finish, + ); + } + if let Some(age) = sample.artifact_age_seconds { + writer.gauge( + "inter_rp_artifact_age_seconds", + "Latest RP artifact age seconds", + &[label("instance", instance), label("rp", rp)], + age, + ); + } + for (kind, bytes) in &sample.max_rss_bytes { + writer.gauge( + "inter_rp_run_max_rss_bytes", + "Latest RP run max RSS bytes", + &[ + label("instance", instance), + label("rp", rp), + label("kind", kind), + ], + *bytes as f64, + ); + } + if let Some(vrps) = sample.vrps { + writer.gauge( + "inter_rp_vrps", + "Latest RP VRP count", + &[label("instance", instance), label("rp", rp)], + vrps as f64, + ); + } + if let Some(vaps) = sample.vaps { + writer.gauge( + "inter_rp_vaps", + "Latest RP VAP/ASPA count", + &[label("instance", instance), label("rp", rp)], + vaps as f64, + ); + } + writer.gauge( + "inter_rp_sample_parse_errors", + "Current parse error count for an RP sample", + &[label("instance", instance), label("rp", rp)], + sample.parse_errors.len() as f64, + ); + if let Some(ccr) = sample.ccr.as_ref() { + writer.gauge( + "inter_rp_ccr_version", + "CCR version", + &[label("instance", instance), label("rp", rp)], + ccr.version as f64, + ); + for state in CCR_STATES { + writer.gauge( + "inter_rp_ccr_digest_present", + "CCR state digest presence", + &[ + label("instance", instance), + label("rp", rp), + label("state", 
state), + ], + bool_value(ccr.state_present.get(state).copied().unwrap_or(false)), + ); + } + } + } +} + +fn render_count_diffs( + writer: &mut PromWriter<'_>, + instance: &str, + samples: &BTreeMap, +) { + for (left_index, left) in RPS.iter().enumerate() { + for right in RPS.iter().skip(left_index + 1) { + let left_sample = samples.get(*left); + let right_sample = samples.get(*right); + if let (Some(left_vrps), Some(right_vrps)) = ( + left_sample.and_then(|sample| sample.vrps), + right_sample.and_then(|sample| sample.vrps), + ) { + writer.gauge( + "inter_rp_vrps_diff", + "Latest VRP count difference between two RPs; value is left minus right", + &[ + label("instance", instance), + label("left", left), + label("right", right), + ], + left_vrps as f64 - right_vrps as f64, + ); + } + if let (Some(left_vaps), Some(right_vaps)) = ( + left_sample.and_then(|sample| sample.vaps), + right_sample.and_then(|sample| sample.vaps), + ) { + writer.gauge( + "inter_rp_vaps_diff", + "Latest VAP/ASPA count difference between two RPs; value is left minus right", + &[ + label("instance", instance), + label("left", left), + label("right", right), + ], + left_vaps as f64 - right_vaps as f64, + ); + } + } + } +} + +fn render_ccr_compare_metrics( + writer: &mut PromWriter<'_>, + instance: &str, + compare: Option<&CcrCompareMetrics>, +) { + let available = compare.map(|compare| compare.available).unwrap_or(false); + let match_all = compare.map(|compare| compare.match_all).unwrap_or(false); + writer.gauge( + "inter_rp_ccr_digest_compare_available", + "Whether ours-rp and rpki-client CCR digest comparison is available", + &[ + label("instance", instance), + label("left", "ours-rp"), + label("right", "rpki-client"), + ], + bool_value(available), + ); + writer.gauge( + "inter_rp_ccr_digest_match", + "Whether CCR state digest matches between ours-rp and rpki-client", + &[ + label("instance", instance), + label("left", "ours-rp"), + label("right", "rpki-client"), + label("state", "overall"), + 
], + bool_value(match_all), + ); + if let Some(compare) = compare { + for state in CCR_STATES { + let state_metrics = compare.states.get(state); + writer.gauge( + "inter_rp_ccr_digest_match", + "Whether CCR state digest matches between ours-rp and rpki-client", + &[ + label("instance", instance), + label("left", "ours-rp"), + label("right", "rpki-client"), + label("state", state), + ], + bool_value(state_metrics.map(|state| state.matches).unwrap_or(false)), + ); + } + } +} + +fn render_status_json(snapshot: &Snapshot) -> Result { + serde_json::to_string_pretty(&json!({ + "schemaVersion": 1, + "generatedBy": "rpki_inter_rp_metrics", + "instance": snapshot.instance, + "service": snapshot.service, + "sync": snapshot.sync, + "samples": snapshot.samples, + "ccrCompare": snapshot.ccr_compare, + })) + .map_err(|e| e.to_string()) +} + +#[derive(Clone, Debug)] +struct Label<'a> { + key: &'a str, + value: &'a str, +} + +fn label<'a>(key: &'a str, value: &'a str) -> Label<'a> { + Label { key, value } +} + +struct PromWriter<'a> { + out: &'a mut String, + emitted_headers: BTreeSet, +} + +impl<'a> PromWriter<'a> { + fn new(out: &'a mut String) -> Self { + Self { + out, + emitted_headers: BTreeSet::new(), + } + } + + fn gauge(&mut self, name: &str, help: &str, labels: &[Label<'_>], value: f64) { + self.header(name, help, "gauge"); + self.out.push_str(name); + write_labels(self.out, labels); + self.out.push(' '); + self.out.push_str(&format_prom_value(value)); + self.out.push('\n'); + } + + fn header(&mut self, name: &str, help: &str, metric_type: &str) { + if self.emitted_headers.insert(name.to_string()) { + self.out.push_str("# HELP "); + self.out.push_str(name); + self.out.push(' '); + self.out.push_str(&escape_help(help)); + self.out.push('\n'); + self.out.push_str("# TYPE "); + self.out.push_str(name); + self.out.push(' '); + self.out.push_str(metric_type); + self.out.push('\n'); + } + } +} + +fn write_labels(out: &mut String, labels: &[Label<'_>]) { + if labels.is_empty() { 
+ return; + } + out.push('{'); + for (index, label) in labels.iter().enumerate() { + if index > 0 { + out.push(','); + } + out.push_str(label.key); + out.push_str("=\""); + out.push_str(&escape_label(label.value)); + out.push('"'); + } + out.push('}'); +} + +fn serve_http(listen: &str, shared: Arc>) -> Result<(), String> { + let listener = TcpListener::bind(listen).map_err(|e| format!("bind failed: {listen}: {e}"))?; + for stream in listener.incoming() { + match stream { + Ok(mut stream) => { + let snapshot = shared.read().expect("metrics lock poisoned").clone(); + if let Err(err) = handle_http_stream(&mut stream, &snapshot) { + eprintln!("http request failed: {err}"); + } + } + Err(err) => eprintln!("accept failed: {err}"), + } + } + Ok(()) +} + +fn handle_http_stream(stream: &mut TcpStream, snapshot: &Snapshot) -> Result<(), String> { + let mut buf = [0u8; 4096]; + let len = stream.read(&mut buf).map_err(|e| e.to_string())?; + let req = String::from_utf8_lossy(&buf[..len]); + let path = req + .lines() + .next() + .and_then(|line| line.split_whitespace().nth(1)) + .unwrap_or("/"); + match path { + "/metrics" => write_http_response( + stream, + "200 OK", + "text/plain; version=0.0.4", + &render_metrics(snapshot), + ), + "/status" => write_http_response( + stream, + "200 OK", + "application/json", + &render_status_json(snapshot)?, + ), + "/healthz" => write_http_response(stream, "200 OK", "text/plain", "ok\n"), + _ => write_http_response(stream, "404 Not Found", "text/plain", "not found\n"), + } +} + +fn write_http_response( + stream: &mut TcpStream, + status: &str, + content_type: &str, + body: &str, +) -> Result<(), String> { + let header = format!( + "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + body.len() + ); + stream + .write_all(header.as_bytes()) + .and_then(|_| stream.write_all(body.as_bytes())) + .map_err(|e| e.to_string()) +} + +fn read_json(path: &Path) -> Result { + let bytes = 
fs::read(path).map_err(|e| format!("read json failed: {}: {e}", path.display()))?; + serde_json::from_slice(&bytes) + .map_err(|e| format!("parse json failed: {}: {e}", path.display())) +} + +fn write_file(path: &Path, bytes: &[u8]) -> Result<(), String> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent) + .map_err(|e| format!("create parent dirs failed: {}: {e}", parent.display()))?; + } + fs::write(path, bytes).map_err(|e| format!("write file failed: {}: {e}", path.display())) +} + +fn run_index_from_path(path: &Path) -> Option { + path.file_name() + .and_then(|name| name.to_str()) + .and_then(|name| name.strip_prefix("run_")) + .and_then(|value| value.parse::().ok()) +} + +fn json_str<'a>(value: &'a Value, path: &[&str]) -> Option<&'a str> { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current.as_str() +} + +fn json_u64(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current.as_u64() +} + +fn json_i64(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current.as_i64() +} + +fn json_f64(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current + .as_f64() + .or_else(|| current.as_i64().map(|value| value as f64)) + .or_else(|| current.as_u64().map(|value| value as f64)) +} + +fn json_bool(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current.as_bool() +} + +fn parse_rfc3339_to_unix(value: &str) -> Option { + time::OffsetDateTime::parse(value, &time::format_description::well_known::Rfc3339) + .ok() + .map(|dt| dt.unix_timestamp() as f64) +} + +fn unix_now_seconds() -> f64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0) +} + +fn bool_value(value: 
/// Renders a sample value for the Prometheus text exposition format.
///
/// * Non-finite values use the spellings named by the format: `NaN`, `+Inf`, `-Inf`.
/// * Whole numbers are printed without a fractional part.
/// * Other values are rounded to six decimal places, with trailing zeros removed.
/// * Negative zero, and negatives that round to zero, are printed as `0` rather than `-0`.
fn format_prom_value(value: f64) -> String {
    if value.is_nan() {
        return "NaN".to_string();
    }
    if value.is_infinite() {
        let text = if value.is_sign_positive() { "+Inf" } else { "-Inf" };
        return text.to_string();
    }
    let text = if value.fract() == 0.0 {
        format!("{value:.0}")
    } else {
        format!("{value:.6}")
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    };
    if text == "-0" { "0".to_string() } else { text }
}

/// Escapes a label value: backslash, line feed and double quote become `\\`, `\n` and `\"`.
fn escape_label(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '"' => escaped.push_str("\\\""),
            other => escaped.push(other),
        }
    }
    escaped
}

/// Escapes `# HELP` text: backslash and line feed become `\\` and `\n`; double quotes are
/// left as they are.
fn escape_help(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
AS2,198.51.100.0/24,24,ta-a,100\n", + ) + .expect("write"); + assert_eq!(count_csv_rows(&path).expect("raw rows"), 4); + assert_eq!(count_vrp_csv_unique_keys(&path).expect("unique vrps"), 2); + } + + #[test] + fn count_vap_csv_uses_unique_vap_keys() { + let temp = TempDir::new().expect("temp"); + let path = temp.path().join("vaps.csv"); + fs::write( + &path, + "Customer ASN,Providers,Trust Anchor\n\ + AS64496,AS64497;AS64498,arin\n\ + AS64496,AS64497;AS64498,ripe\n\ + AS64496,AS64497,arin\n", + ) + .expect("write"); + assert_eq!(count_csv_rows(&path).expect("raw rows"), 3); + assert_eq!(count_vap_csv_unique_keys(&path).expect("unique vaps"), 2); + } + + #[test] + fn scan_fixture_renders_core_metrics_and_ccr_match() { + let temp = TempDir::new().expect("temp"); + let ours_run = temp.path().join("ours/runs/run_0001"); + fs::create_dir_all(&ours_run).expect("mkdir"); + fs::write( + ours_run.join("run-summary.json"), + r#"{ + "runId":"run_0001", + "runSeq":1, + "status":"success", + "exitCode":0, + "wallMs":10000, + "finishedAtRfc3339Utc":"2026-06-09T00:00:00Z", + "processMetrics":{"maxRssKb":1000}, + "reportCounts":{"vrps":1,"aspas":1,"publicationPoints":1} + }"#, + ) + .expect("write summary"); + let ccr = sample_ccr(64496); + fs::write(ours_run.join("result.ccr"), &ccr).expect("write ccr"); + + let peer_root = temp.path().join("peers"); + let routinator = peer_root.join("routinator/latest"); + fs::create_dir_all(&routinator).expect("mkdir"); + fs::write( + routinator.join("run-meta.json"), + r#"{"runId":"run_0001","runSeq":1,"success":true,"wallMs":9000,"finishedAtRfc3339Utc":"2026-06-09T00:00:01Z","maxRssKb":{"aggregatePeak":900}}"#, + ) + .expect("write meta"); + fs::write( + routinator.join("vrps.csv"), + "ASN,IP Prefix,Max Length,Trust Anchor\nAS64496,192.0.2.0/24,24,ta\n", + ) + .expect("write vrps"); + fs::write( + routinator.join("vaps.csv"), + "Customer ASN,Providers,Trust Anchor\nAS64496,AS64497,ta\n", + ) + .expect("write vaps"); + + let rpki_client = 
peer_root.join("rpki-client/latest"); + fs::create_dir_all(&rpki_client).expect("mkdir"); + fs::write( + rpki_client.join("run-meta.json"), + r#"{"runId":"run_0001","runSeq":1,"success":true,"wallMs":8000,"finishedAtRfc3339Utc":"2026-06-09T00:00:02Z","maxRssKb":{"aggregatePeak":800},"artifacts":{"ccr":"result.ccr"},"counts":{"vrps":1,"vaps":1}}"#, + ) + .expect("write meta"); + fs::write(rpki_client.join("result.ccr"), &ccr).expect("write ccr"); + fs::write( + peer_root.join("sync-status.json"), + r#"{"success":true,"lastSyncAtRfc3339Utc":"2026-06-09T00:00:03Z","remoteHost":"remote200"}"#, + ) + .expect("write sync"); + + let args = Args { + ours_run_root: temp.path().join("ours"), + peer_root, + listen: "127.0.0.1:0".to_string(), + poll_secs: 1, + instance: "test".to_string(), + once: true, + out_metrics: None, + out_status: None, + }; + let snapshot = scan(&args).expect("scan"); + let metrics = render_metrics(&snapshot); + assert!(metrics.contains("inter_rp_vrps{instance=\"test\",rp=\"ours-rp\"} 1")); + assert!(metrics.contains("inter_rp_vaps{instance=\"test\",rp=\"rpki-client\"} 1")); + assert!(metrics.contains( + "inter_rp_ccr_digest_match{instance=\"test\",left=\"ours-rp\",right=\"rpki-client\",state=\"overall\"} 1" + )); + assert!(metrics.contains( + "inter_rp_vrps_diff{instance=\"test\",left=\"routinator\",right=\"rpki-client\"} 0" + )); + } + + fn sample_ccr(customer_asn: u32) -> Vec { + let vrps = build_roa_payload_state(&[Vrp { + asn: customer_asn, + prefix: IpPrefix { + afi: RoaAfi::Ipv4, + prefix_len: 24, + addr: [192, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + }, + max_length: 24, + }]) + .expect("build vrps"); + let vaps = build_aspa_payload_state(&[AspaAttestation { + customer_as_id: customer_asn, + provider_as_ids: vec![64497], + }]) + .expect("build vaps"); + let content = CcrContentInfo::new(RpkiCanonicalCacheRepresentation { + version: 0, + hash_alg: CcrDigestAlgorithm::Sha256, + produced_at: time::OffsetDateTime::UNIX_EPOCH, + mfts: None, + 
vrps: Some(vrps), + vaps: Some(vaps), + tas: None, + rks: None, + }); + encode_content_info(&content).expect("encode") + } +}