From 7e1c24fcc3dcc6b767bf94399dcc67fedfb41cdb Mon Sep 17 00:00:00 2001 From: yuyr Date: Tue, 26 May 2026 18:02:40 +0800 Subject: [PATCH] =?UTF-8?q?20260526=20=E5=A2=9E=E5=8A=A0=E6=8C=81=E7=BB=AD?= =?UTF-8?q?soak=E7=9B=91=E6=8E=A7=E4=B8=8E=E6=9C=AC=E5=9C=B0=E5=9B=9E?= =?UTF-8?q?=E6=94=BE=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + monitor/README.md | 131 ++ monitor/docker-compose.yml | 38 + .../grafana/dashboards/ours-rp-repo-sync.json | 632 +++++ .../dashboards/ours-rp-soak-overview.json | 582 +++++ .../provisioning/dashboards/dashboard.yml | 12 + .../provisioning/datasources/prometheus.yml | 10 + monitor/prometheus/prometheus.yml | 13 + .../build_local_repo_replay_package.sh | 79 + .../compare_normalized_sets.py | 51 + .../local_repo_replay/normalize_rp_outputs.py | 123 + scripts/local_repo_replay/prepare_tals.py | 84 + .../run_dual_local_tree_replay.sh | 76 + .../run_routinator_from_local_tree.sh | 131 ++ .../run_rpki_client_from_local_tree.sh | 107 + scripts/local_repo_replay/summarize_replay.py | 47 + scripts/local_repo_replay/templates/README.md | 118 + .../templates/docs/input_tree_requirements.md | 39 + .../templates/docs/offline_replay_limits.md | 38 + .../templates/docs/output_files.md | 16 + .../examples/dual_compare_example.sh | 11 + .../templates/examples/routinator_example.sh | 9 + .../templates/examples/rpki_client_example.sh | 10 + scripts/soak/portable-soak.env.example | 3 +- scripts/soak/run_soak.sh | 42 +- src/bin/rpki_artifact_metrics.rs | 2037 +++++++++++++++++ src/cir/mod.rs | 2 + src/lib.rs | 1 + src/parallel/repo_runtime.rs | 78 + src/validation/tree_runner.rs | 32 +- 30 files changed, 4541 insertions(+), 12 deletions(-) create mode 100644 monitor/README.md create mode 100644 monitor/docker-compose.yml create mode 100644 monitor/grafana/dashboards/ours-rp-repo-sync.json create mode 100644 monitor/grafana/dashboards/ours-rp-soak-overview.json create mode 
100644 monitor/grafana/provisioning/dashboards/dashboard.yml create mode 100644 monitor/grafana/provisioning/datasources/prometheus.yml create mode 100644 monitor/prometheus/prometheus.yml create mode 100755 scripts/local_repo_replay/build_local_repo_replay_package.sh create mode 100755 scripts/local_repo_replay/compare_normalized_sets.py create mode 100755 scripts/local_repo_replay/normalize_rp_outputs.py create mode 100755 scripts/local_repo_replay/prepare_tals.py create mode 100755 scripts/local_repo_replay/run_dual_local_tree_replay.sh create mode 100755 scripts/local_repo_replay/run_routinator_from_local_tree.sh create mode 100755 scripts/local_repo_replay/run_rpki_client_from_local_tree.sh create mode 100755 scripts/local_repo_replay/summarize_replay.py create mode 100644 scripts/local_repo_replay/templates/README.md create mode 100644 scripts/local_repo_replay/templates/docs/input_tree_requirements.md create mode 100644 scripts/local_repo_replay/templates/docs/offline_replay_limits.md create mode 100644 scripts/local_repo_replay/templates/docs/output_files.md create mode 100755 scripts/local_repo_replay/templates/examples/dual_compare_example.sh create mode 100755 scripts/local_repo_replay/templates/examples/routinator_example.sh create mode 100755 scripts/local_repo_replay/templates/examples/rpki_client_example.sh create mode 100644 src/bin/rpki_artifact_metrics.rs diff --git a/.gitignore b/.gitignore index 3c96bf2..a252467 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ target/ Cargo.lock perf.* +specs/* copy.excalidraw diff --git a/monitor/README.md b/monitor/README.md new file mode 100644 index 0000000..36d83b1 --- /dev/null +++ b/monitor/README.md @@ -0,0 +1,131 @@ +# Ours RP Prometheus / Grafana Monitor + +本目录提供本地开发监控栈,用于采集 `rpki_artifact_metrics` 暴露的 ours RP soak 指标。 + +## 前置条件 + +1. Docker + Docker Compose v2; +2. 宿主机已启动 `rpki_artifact_metrics`,并监听 Docker 网桥可访问的地址,例如 `0.0.0.0:9556`; +3. 
Prometheus 容器通过 `host.docker.internal:9556` 访问宿主 sidecar。 + +Linux Docker 下 compose 已配置: + +```yaml +extra_hosts: + - host.docker.internal:host-gateway +``` + +## 启动 + +```bash +cd rpki_2/rpki/monitor +docker compose up -d +``` + +默认镜像使用官方 Docker Hub 镜像: + +```text +prom/prometheus:v2.55.1 +grafana/grafana:11.3.1 +``` + +如需切到其它镜像源: + +```bash +PROMETHEUS_IMAGE=registry.example.com/prom/prometheus:v2.55.1 \ +GRAFANA_IMAGE=registry.example.com/grafana/grafana:11.3.1 \ +docker compose up -d +``` + +默认端口: + +- Prometheus: http://localhost:9090 +- Grafana: http://localhost:3000 +- Grafana 默认账号密码:`admin` / `admin` + +如端口冲突: + +```bash +PROMETHEUS_PORT=19090 GRAFANA_PORT=13000 docker compose up -d +``` + +## 停止 + +```bash +cd rpki_2/rpki/monitor +docker compose down +``` + +保留数据 volume。若要清理数据: + +```bash +docker compose down -v +``` + +## 典型本地联调命令 + +先启动 APNIC soak 和 metrics sidecar,例如: + +```bash +# soak .env 关键配置 +MAX_RUNS=-1 +RIRS=apnic +RETAIN_RUNS=5 +INTERVAL_SECS=0 + +# metrics sidecar +rpki_artifact_metrics \ + --run-root /path/to/portable-soak \ + --listen 0.0.0.0:9556 \ + --poll-secs 5 \ + --instance local-apnic-continuous +``` + +再启动监控栈: + +```bash +cd rpki_2/rpki/monitor +docker compose up -d +``` + +## 验证 + +Prometheus target: + +```bash +curl -s 'http://localhost:9090/api/v1/targets' | python3 -m json.tool +``` + +Prometheus query: + +```bash +curl -G 'http://localhost:9090/api/v1/query' \ + --data-urlencode 'query=up{job="ours-rp-artifact-metrics"}' + +curl -G 'http://localhost:9090/api/v1/query' \ + --data-urlencode 'query=ours_rp_run_completed_total{status="success"}' +``` + +Grafana health: + +```bash +curl -s http://localhost:3000/api/health | python3 -m json.tool +``` + +Grafana dashboard: + +- 打开 http://localhost:3000/dashboards ,在 `Ours RP` 文件夹下查看 dashboard + +## 主要指标 + +- `ours_rp_metrics_service_up` +- `ours_rp_run_completed_total` +- `ours_rp_run_duration_seconds` +- `ours_rp_run_max_rss_bytes` +- `ours_rp_vrps` +- `ours_rp_vaps` +- `ours_rp_publication_points` +- `ours_rp_repo_sync_phase_count` +- `ours_rp_large_publication_points{object_count_gt="10|50|100|..."}` +- `ours_rp_cir_objects` +- 
`ours_rp_ccr_state_items` diff --git a/monitor/docker-compose.yml b/monitor/docker-compose.yml new file mode 100644 index 0000000..acc40be --- /dev/null +++ b/monitor/docker-compose.yml @@ -0,0 +1,38 @@ +services: + prometheus: + image: ${PROMETHEUS_IMAGE:-prom/prometheus:v2.55.1} + container_name: ours-rp-prometheus + command: + - --config.file=/etc/prometheus/prometheus.yml + - --storage.tsdb.path=/prometheus + - --storage.tsdb.retention.time=${PROMETHEUS_RETENTION:-7d} + - --web.enable-lifecycle + extra_hosts: + - host.docker.internal:host-gateway + ports: + - "${PROMETHEUS_PORT:-9090}:9090" + volumes: + - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + - prometheus-data:/prometheus + restart: unless-stopped + + grafana: + image: ${GRAFANA_IMAGE:-grafana/grafana:11.3.1} + container_name: ours-rp-grafana + depends_on: + - prometheus + ports: + - "${GRAFANA_PORT:-3000}:3000" + environment: + GF_SECURITY_ADMIN_USER: ${GRAFANA_ADMIN_USER:-admin} + GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_ADMIN_PASSWORD:-admin} + GF_USERS_ALLOW_SIGN_UP: "false" + volumes: + - grafana-data:/var/lib/grafana + - ./grafana/provisioning:/etc/grafana/provisioning:ro + - ./grafana/dashboards:/var/lib/grafana/dashboards:ro + restart: unless-stopped + +volumes: + prometheus-data: + grafana-data: diff --git a/monitor/grafana/dashboards/ours-rp-repo-sync.json b/monitor/grafana/dashboards/ours-rp-repo-sync.json new file mode 100644 index 0000000..5cc9f1a --- /dev/null +++ b/monitor/grafana/dashboards/ours-rp-repo-sync.json @@ -0,0 +1,632 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "colorMode": "value", + "graphMode": "area", + 
"justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_publication_points", + "legendFormat": "publication points", + "refId": "A" + } + ], + "title": "Publication Points", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 6, + "y": 0 + }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_repo_sync_phase_count{phase=\"rrdp_ok\"}", + "legendFormat": "rrdp ok", + "refId": "A" + } + ], + "title": "RRDP OK Points", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 12, + "y": 0 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_repo_sync_phase_count{phase=\"rrdp_failed_rsync_ok\"}", + "legendFormat": "fallback", + "refId": "A" + } + ], + "title": "Rsync Fallback Points", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 
18, + "y": 0 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_repo_terminal_state_count{terminal_state=\"failed_no_cache\"}", + "legendFormat": "failed no cache", + "refId": "A" + } + ], + "title": "Failed No Cache Points", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 4 + }, + "id": 5, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "expr": "ours_rp_run_stage_duration_seconds{stage=\"repo_sync_total\"}", + "legendFormat": "repo sync total", + "refId": "A" + }, + { + "expr": "ours_rp_run_stage_duration_seconds{stage=\"rrdp_download_total\"}", + "legendFormat": "rrdp download", + "refId": "B" + }, + { + "expr": "ours_rp_run_stage_duration_seconds{stage=\"rsync_download_total\"}", + "legendFormat": "rsync download", + "refId": "C" + } + ], + "title": "Repo Sync Download Durations", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "x": 0, + "y": 12, + "w": 12, + "h": 8 + }, + "id": 6, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "expr": "ours_rp_repo_sync_phase_count", + "legendFormat": "{{phase}}", + "refId": "A" + } + 
], + "title": "Repo Sync Phase Counts", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "x": 12, + "y": 12, + "w": 12, + "h": 8 + }, + "id": 7, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "expr": "ours_rp_repo_sync_phase_count{phase=\"rrdp_failed_rsync_ok\"}", + "legendFormat": "rrdp failed, rsync ok", + "refId": "A" + }, + { + "expr": "ours_rp_repo_sync_phase_count{phase=\"rrdp_failed_rsync_failed\"}", + "legendFormat": "rrdp failed, rsync failed", + "refId": "B" + }, + { + "expr": "ours_rp_repo_terminal_state_count{terminal_state=\"failed_no_cache\"}", + "legendFormat": "failed no cache", + "refId": "C" + }, + { + "expr": "ours_rp_tree_instances{state=\"failed\"}", + "legendFormat": "tree failed", + "refId": "D" + } + ], + "title": "Repo Failure / Fallback Counts", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "x": 0, + "y": 20, + "w": 12, + "h": 8 + }, + "id": 8, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "expr": "ours_rp_repo_sync_phase_duration_seconds_total{phase=\"rrdp_failed_rsync_ok\"}", + "legendFormat": "rsync fallback duration", + "refId": "A" + }, + { + "expr": "ours_rp_repo_sync_phase_duration_seconds_total{phase=\"rrdp_failed_rsync_failed\"}", + "legendFormat": "failed duration", + "refId": "B" + }, + { + "expr": "ours_rp_repo_terminal_state_duration_seconds_total{terminal_state=\"failed_no_cache\"}", + 
"legendFormat": "failed no cache duration", + "refId": "C" + } + ], + "title": "Repo Failure / Fallback Durations", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "x": 0, + "y": 29, + "w": 12, + "h": 9 + }, + "id": 9, + "options": { + "showHeader": true, + "cellHeight": "sm", + "footer": { + "show": false, + "reducer": [ + "sum" + ], + "countRows": false, + "fields": "" + } + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_rrdp_rsync_failed_repository_duration_seconds", + "format": "table", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "RRDP + Rsync Failed Repositories", + "type": "table", + "transformations": [ + { + "id": "organize", + "options": { + "excludeByName": { + "job": true, + "terminal_state": true, + "rank": true, + "transport": true, + "__name__": true, + "publication_points": true, + "instance": true, + "repo_id": true, + "pp_id": true, + "exported_instance": true, + "rp": true, + "source": true + }, + "indexByName": { + "Time": 0, + "host": 1, + "phase": 2, + "uri": 3, + "Value": 4 + }, + "renameByName": { + "Value": "duration" + } + } + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "x": 0, + "y": 38, + "w": 24, + "h": 9 + }, + "id": 10, + "options": { + "showHeader": true, + "cellHeight": "sm", + "footer": { + "show": false, + "reducer": [ + "sum" + ], + "countRows": false, + "fields": "" + } + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "topk(20, ours_rp_top_publication_point_object_count)", + "format": "table", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "Top Publication Points by Objects", + "type": "table", + "transformations": [ + { + "id": "organize", + 
"options": { + "excludeByName": { + "job": true, + "__name__": true, + "publication_points": true, + "instance": true, + "repo_id": true, + "phase": true, + "pp_id": true, + "exported_instance": true, + "rp": true, + "source": true + }, + "indexByName": { + "Time": 0, + "host": 1, + "rank": 2, + "terminal_state": 3, + "transport": 4, + "uri": 5, + "Value": 6 + }, + "renameByName": {} + } + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "x": 12, + "y": 29, + "w": 12, + "h": 9 + }, + "id": 11, + "options": { + "showHeader": true, + "cellHeight": "sm", + "footer": { + "show": false, + "reducer": [ + "sum" + ], + "countRows": false, + "fields": "" + } + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "topk(20, ours_rp_top_repository_sync_duration_seconds)", + "format": "table", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "title": "Top 20 Repositories by Sync Duration", + "type": "table", + "transformations": [ + { + "id": "organize", + "options": { + "excludeByName": { + "job": true, + "terminal_state": true, + "__name__": true, + "publication_points": true, + "instance": true, + "repo_id": true, + "phase": true, + "pp_id": true, + "exported_instance": true, + "rp": true, + "source": true + }, + "indexByName": { + "Time": 0, + "host": 1, + "rank": 2, + "transport": 3, + "uri": 4, + "Value": 5 + }, + "renameByName": { + "Value": "value" + } + } + } + ] + } + ], + "refresh": "5s", + "schemaVersion": 40, + "tags": [ + "ours-rp", + "rpki", + "soak", + "repo-sync" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-30m", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "Ours RP Repo Sync", + "uid": "ours-rp-repo-sync", + "version": 3, + "weekStart": "" +} diff --git a/monitor/grafana/dashboards/ours-rp-soak-overview.json 
b/monitor/grafana/dashboards/ours-rp-soak-overview.json new file mode 100644 index 0000000..e18238a --- /dev/null +++ b/monitor/grafana/dashboards/ours-rp-soak-overview.json @@ -0,0 +1,582 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "x": 0, + "y": 0, + "w": 6, + "h": 4 + }, + "id": 1, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_cir_trust_anchors", + "legendFormat": "RIRs", + "refId": "A" + } + ], + "title": "Current Run RIRs", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "x": 6, + "y": 0, + "w": 6, + "h": 4 + }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_run_duration_seconds", + "legendFormat": "wall", + "refId": "A" + } + ], + "title": "Latest Wall Time", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "x": 12, + "y": 0, + "w": 6, + "h": 4 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + 
"orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_run_max_rss_bytes", + "legendFormat": "rss", + "refId": "A" + } + ], + "title": "Latest Max RSS", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "x": 18, + "y": 0, + "w": 6, + "h": 4 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_publication_points", + "legendFormat": "publication points", + "refId": "A" + } + ], + "title": "Publication Points", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "x": 0, + "y": 8, + "w": 12, + "h": 8 + }, + "id": 5, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "expr": "ours_rp_run_duration_seconds", + "legendFormat": "wall", + "refId": "A" + }, + { + "expr": "ours_rp_stage_duration_seconds{stage=\"validation\"}", + "legendFormat": "validation", + "refId": "B" + } + ], + "title": "Run / Validation Duration", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "x": 12, + "y": 8, + "w": 12, + "h": 8 + }, + "id": 6, + "options": 
{ + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "expr": "ours_rp_vrps", + "legendFormat": "VRPs", + "refId": "A" + }, + { + "expr": "ours_rp_vaps", + "legendFormat": "VAPs", + "refId": "B" + }, + { + "expr": "ours_rp_cir_objects", + "legendFormat": "CIR objects", + "refId": "C" + } + ], + "title": "Output and Input Sizes", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "x": 0, + "y": 16, + "w": 12, + "h": 8 + }, + "id": 8, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "expr": "ours_rp_large_publication_points", + "legendFormat": "> {{object_count_gt}} objects", + "refId": "A" + } + ], + "title": "Large Publication Points by Object Count", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "x": 0, + "y": 4, + "w": 6, + "h": 4 + }, + "id": 9, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_run_sequence", + "legendFormat": "seq", + "refId": "A" + } + ], + "title": "Latest Run Sequence", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + 
"x": 6, + "y": 4, + "w": 6, + "h": 4 + }, + "id": 10, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_run_success", + "legendFormat": "success", + "refId": "A" + } + ], + "title": "Latest Run Success", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "x": 12, + "y": 4, + "w": 6, + "h": 4 + }, + "id": 11, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_vrps", + "legendFormat": "VRPs", + "refId": "A" + } + ], + "title": "VRPs", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "x": 18, + "y": 4, + "w": 6, + "h": 4 + }, + "id": 12, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "expr": "ours_rp_vaps", + "legendFormat": "VAPs", + "refId": "A" + } + ], + "title": "VAPs", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "x": 12, + "y": 16, + "w": 12, + "h": 8 + }, 
+ "id": 13, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "expr": "ours_rp_run_stage_duration_seconds{stage=\"validation\"}", + "legendFormat": "validation", + "refId": "A" + }, + { + "expr": "ours_rp_run_stage_duration_seconds{stage=\"report_write\"}", + "legendFormat": "report write", + "refId": "E" + }, + { + "expr": "ours_rp_run_stage_duration_seconds{stage=\"ccr_write\"}", + "legendFormat": "ccr write", + "refId": "F" + }, + { + "expr": "ours_rp_run_stage_duration_seconds{stage=\"cir_write\"}", + "legendFormat": "cir write", + "refId": "G" + } + ], + "title": "Output Stage Durations", + "type": "timeseries" + } + ], + "refresh": "5s", + "schemaVersion": 40, + "tags": [ + "ours-rp", + "rpki", + "soak" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-30m", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "Ours RP Soak Overview", + "uid": "ours-rp-soak-overview", + "version": 4, + "weekStart": "" +} diff --git a/monitor/grafana/provisioning/dashboards/dashboard.yml b/monitor/grafana/provisioning/dashboards/dashboard.yml new file mode 100644 index 0000000..39a681c --- /dev/null +++ b/monitor/grafana/provisioning/dashboards/dashboard.yml @@ -0,0 +1,12 @@ +apiVersion: 1 + +providers: + - name: ours-rp + orgId: 1 + folder: Ours RP + type: file + disableDeletion: false + updateIntervalSeconds: 10 + allowUiUpdates: true + options: + path: /var/lib/grafana/dashboards diff --git a/monitor/grafana/provisioning/datasources/prometheus.yml b/monitor/grafana/provisioning/datasources/prometheus.yml new file mode 100644 index 0000000..25e8d70 --- /dev/null +++ b/monitor/grafana/provisioning/datasources/prometheus.yml @@ -0,0 +1,10 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + uid: Prometheus + type: prometheus + access: proxy + url: 
http://prometheus:9090 + isDefault: true + editable: true diff --git a/monitor/prometheus/prometheus.yml b/monitor/prometheus/prometheus.yml new file mode 100644 index 0000000..a7a48fa --- /dev/null +++ b/monitor/prometheus/prometheus.yml @@ -0,0 +1,13 @@ +global: + scrape_interval: 5s + evaluation_interval: 5s + +scrape_configs: + - job_name: ours-rp-artifact-metrics + metrics_path: /metrics + static_configs: + - targets: + - host.docker.internal:9556 + labels: + rp: ours-rp + source: artifact-sidecar diff --git a/scripts/local_repo_replay/build_local_repo_replay_package.sh b/scripts/local_repo_replay/build_local_repo_replay_package.sh new file mode 100755 index 0000000..39a1d64 --- /dev/null +++ b/scripts/local_repo_replay/build_local_repo_replay_package.sh @@ -0,0 +1,79 @@ +#!/usr/bin/env bash +set -euo pipefail + +usage() { # Print the CLI synopsis to stdout; --out takes a value (directory, or tarball path with --tar-gz). + cat <<'EOF' +Usage: + build_local_repo_replay_package.sh --out <path> [--tar-gz] + +Build a standalone local repository tree replay package for Routinator and +rpki-client. The package does not include repository data and does not include +materialize tooling. +EOF +} + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." 
&& pwd)" +SRC_DIR="$ROOT_DIR/scripts/local_repo_replay" +OUT="" # destination: a directory, or the tarball path when --tar-gz is given +TAR_GZ=0 + +while [[ $# -gt 0 ]]; do + case "$1" in + --out) OUT="$2"; shift 2 ;; + --tar-gz) TAR_GZ=1; shift ;; + -h|--help) usage; exit 0 ;; + *) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;; + esac +done + +[[ -n "$OUT" ]] || { usage >&2; exit 2; } # --out is mandatory + +if [[ "$TAR_GZ" -eq 1 ]]; then + PACKAGE_DIR="$(mktemp -d)" # staging dir for the tarball; NOTE(review): no trap, so it is left behind if a later step fails + TARGET_DIR="$PACKAGE_DIR/local-repo-replay-package" +else + TARGET_DIR="$OUT" + rm -rf "$TARGET_DIR" # directory mode: any existing --out directory is replaced +fi + +mkdir -p "$TARGET_DIR/scripts" "$TARGET_DIR/docs" "$TARGET_DIR/examples" + +install -m 0755 "$SRC_DIR/run_routinator_from_local_tree.sh" "$TARGET_DIR/scripts/" +install -m 0755 "$SRC_DIR/run_rpki_client_from_local_tree.sh" "$TARGET_DIR/scripts/" +install -m 0755 "$SRC_DIR/run_dual_local_tree_replay.sh" "$TARGET_DIR/scripts/" +install -m 0755 "$SRC_DIR/prepare_tals.py" "$TARGET_DIR/scripts/" +install -m 0755 "$SRC_DIR/normalize_rp_outputs.py" "$TARGET_DIR/scripts/" +install -m 0755 "$SRC_DIR/compare_normalized_sets.py" "$TARGET_DIR/scripts/" +install -m 0755 "$SRC_DIR/summarize_replay.py" "$TARGET_DIR/scripts/" +install -m 0755 "$ROOT_DIR/scripts/cir/cir-rsync-wrapper" "$TARGET_DIR/scripts/" +install -m 0755 "$ROOT_DIR/scripts/cir/cir-local-link-sync.py" "$TARGET_DIR/scripts/" +install -m 0644 "$SRC_DIR/templates/README.md" "$TARGET_DIR/README.md" +install -m 0644 "$SRC_DIR/templates/docs/input_tree_requirements.md" "$TARGET_DIR/docs/" +install -m 0644 "$SRC_DIR/templates/docs/offline_replay_limits.md" "$TARGET_DIR/docs/" +install -m 0644 "$SRC_DIR/templates/docs/output_files.md" "$TARGET_DIR/docs/" +install -m 0755 "$SRC_DIR/templates/examples/routinator_example.sh" "$TARGET_DIR/examples/" +install -m 0755 "$SRC_DIR/templates/examples/rpki_client_example.sh" "$TARGET_DIR/examples/" +install -m 0755 "$SRC_DIR/templates/examples/dual_compare_example.sh" "$TARGET_DIR/examples/" +cat > "$TARGET_DIR/env.example" <<'EOF' +# 本地目录树 replay 示例配置。目录树由使用者提前准备,不包含在 package 中。 
+TAL_DIR=/data/replay/tals +MIRROR_ROOT=/data/replay/mirror +ROUTINATOR_BIN=/opt/routinator/target/release/routinator +RPKI_CLIENT_BIN=/opt/rpki-client/src/rpki-client +RPKI_CLIENT_CACHE_DIR=/data/replay/work/rpki-client-cache +VALIDATION_TIME=2026-05-23T00:00:00Z +EOF + +if grep -R -E 'cir_materialize|repo-bytes\.db|\.cir' "$TARGET_DIR/scripts" >/dev/null; then # '\.' is a literal dot; inside single quotes '\\.' reaches grep as backslash + any-char, so the dotted patterns could never match. NOTE(review): confirm the bundled cir-* helpers contain no ".cir" text + echo "package contains forbidden materialize/repo-bytes implementation references" >&2 + exit 1 +fi + +if [[ "$TAR_GZ" -eq 1 ]]; then + mkdir -p "$(dirname "$OUT")" + tar -C "$PACKAGE_DIR" -czf "$OUT" local-repo-replay-package + rm -rf "$PACKAGE_DIR" + echo "$OUT" +else + echo "$TARGET_DIR" +fi diff --git a/scripts/local_repo_replay/compare_normalized_sets.py b/scripts/local_repo_replay/compare_normalized_sets.py new file mode 100755 index 0000000..47f9e66 --- /dev/null +++ b/scripts/local_repo_replay/compare_normalized_sets.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +from pathlib import Path + + +def read_set(path: Path) -> set[str]:  # Stripped, non-empty lines of `path` as a set; a missing file yields an empty set. + if not path.is_file(): + return set() + return {line.strip() for line in path.read_text(encoding="utf-8", errors="replace").splitlines() if line.strip()} + + +def compare(left: set[str], right: set[str]) -> dict[str, object]:  # Set-overlap summary; jaccard is 1.0 when both sets are empty, samples are capped at 20. + union = left | right + inter = left & right + return { + "left": len(left), + "right": len(right), + "intersection": len(inter), + "onlyLeft": len(left - right), + "onlyRight": len(right - left), + "jaccard": round(len(inter) / len(union), 8) if union else 1.0, + "onlyLeftSamples": sorted(left - right)[:20], + "onlyRightSamples": sorted(right - left)[:20], + } + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--left", type=Path, required=True) + parser.add_argument("--right", type=Path, required=True) + parser.add_argument("--left-name", default="left") + parser.add_argument("--right-name", default="right") + parser.add_argument("--out", 
type=Path, required=True) + args = parser.parse_args() + result = { + "leftName": args.left_name, + "rightName": args.right_name, + "vrps": compare(read_set(args.left / "vrps.normalized.txt"), read_set(args.right / "vrps.normalized.txt")), + "vaps": compare(read_set(args.left / "vaps.normalized.txt"), read_set(args.right / "vaps.normalized.txt")), + } + args.out.parent.mkdir(parents=True, exist_ok=True) + args.out.write_text(json.dumps(result, indent=2, sort_keys=True) + "\n", encoding="utf-8") + print(args.out) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/local_repo_replay/normalize_rp_outputs.py b/scripts/local_repo_replay/normalize_rp_outputs.py new file mode 100755 index 0000000..1dbe6ce --- /dev/null +++ b/scripts/local_repo_replay/normalize_rp_outputs.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import csv +import json +from pathlib import Path +from typing import Any + + +def normalize_asn(value: Any) -> str: + text = str(value).strip().upper() + if text.startswith("AS"): + text = text[2:] + return f"AS{int(text)}" + + +def write_set(path: Path, rows: set[str]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("\n".join(sorted(rows)) + ("\n" if rows else ""), encoding="utf-8") + + +def load_json(path: Path) -> Any: + return json.loads(path.read_text(encoding="utf-8")) + + +def normalize_vrps_from_json(path: Path) -> set[str]: + data = load_json(path) + rows: set[str] = set() + objects: list[dict[str, Any]] = [] + if isinstance(data, dict): + for key in ("roas", "routeOrigins", "valid_roas"): + value = data.get(key) + if isinstance(value, list): + objects.extend(item for item in value if isinstance(item, dict)) + elif isinstance(data, list): + objects = [item for item in data if isinstance(item, dict)] + for item in objects: + asn = next((item[name] for name in ("asn", "asID", "origin") if item.get(name) not in (None, "")), None)  # explicit None/"" test: an "or" chain would drop integer AS0 because 0 is falsy + prefix = item.get("prefix") + max_length =
item.get("maxLength") or item.get("max_length") or item.get("maxlen") or item.get("maxLengthPrefix") + if asn is None or prefix is None or max_length is None: + continue + rows.add(f"{normalize_asn(asn)}|{prefix}|{int(max_length)}") + return rows + + +def normalize_vaps_from_json(path: Path) -> set[str]: + data = load_json(path) + rows: set[str] = set() + objects: list[dict[str, Any]] = [] + if isinstance(data, dict): + for key in ("aspas", "aspaAssertions", "vaps"): + value = data.get(key) + if isinstance(value, list): + objects.extend(item for item in value if isinstance(item, dict)) + elif isinstance(data, list): + objects = [item for item in data if isinstance(item, dict)] + for item in objects: + customer = ( + item.get("customer") + or item.get("customer_asid") + or item.get("customerASID") + or item.get("customerAsid") + or item.get("customerAsn") + ) + providers = item.get("providers") or item.get("provider_asns") or item.get("providerASNs") or [] + if customer is None: + continue + provider_asns = [normalize_asn(provider) for provider in providers] + rows.add(f"{normalize_asn(customer)}|{','.join(sorted(set(provider_asns), key=lambda value: int(value[2:])))}") + return rows + + +def normalize_vrps_from_csv(path: Path) -> set[str]: + rows: set[str] = set() + with path.open(newline="", encoding="utf-8", errors="replace") as handle: + for row in csv.DictReader(handle): + asn = row.get("ASN") or row.get("asn") or row.get("AS") + prefix = row.get("IP Prefix") or row.get("prefix") or row.get("Prefix") + max_length = row.get("Max Length") or row.get("maxLength") or row.get("max_length") + if asn and prefix and max_length: + rows.add(f"{normalize_asn(asn)}|{prefix}|{int(max_length)}") + return rows + + +def normalize_routinator(input_path: Path) -> tuple[set[str], set[str]]: + return normalize_vrps_from_json(input_path), normalize_vaps_from_json(input_path) + + +def normalize_rpki_client(input_path: Path) -> tuple[set[str], set[str]]: + json_path = input_path / 
"json" if input_path.is_dir() else input_path + csv_path = input_path / "csv" if input_path.is_dir() else input_path + vrps: set[str] = set() + vaps: set[str] = set() + if json_path.is_file(): + vrps |= normalize_vrps_from_json(json_path) + vaps |= normalize_vaps_from_json(json_path) + if csv_path.is_file(): + vrps |= normalize_vrps_from_csv(csv_path) + return vrps, vaps + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--kind", choices=["routinator", "rpki-client"], required=True) + parser.add_argument("--input", type=Path, required=True) + parser.add_argument("--vrps-out", type=Path, required=True) + parser.add_argument("--vaps-out", type=Path, required=True) + args = parser.parse_args() + + if args.kind == "routinator": + vrps, vaps = normalize_routinator(args.input) + else: + vrps, vaps = normalize_rpki_client(args.input) + write_set(args.vrps_out, vrps) + write_set(args.vaps_out, vaps) + print(json.dumps({"vrps": len(vrps), "vaps": len(vaps)}, sort_keys=True)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/local_repo_replay/prepare_tals.py b/scripts/local_repo_replay/prepare_tals.py new file mode 100755 index 0000000..a47ec9e --- /dev/null +++ b/scripts/local_repo_replay/prepare_tals.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +from pathlib import Path + + +def split_tal(text: str) -> tuple[list[str], list[str]]: + lines = text.splitlines() + for index, line in enumerate(lines): + if line.strip() == "": + return lines[:index], lines[index + 1 :] + return lines, [] + + +def rsync_only_tal(source: Path) -> tuple[str, dict[str, object]]: + text = source.read_text(encoding="utf-8", errors="replace") + uri_lines, key_lines = split_tal(text) + uris = [ + line.strip() + for line in uri_lines + if line.strip() and not line.lstrip().startswith("#") + ] + rsync_uris = [ + uri for uri in uris + if 
uri.lower().startswith("rsync://") + ] + if not rsync_uris: + return text if text.endswith("\n") else text + "\n", { + "file": source.name, + "mode": "unchanged_no_rsync_uri", + "uris": len(uris), + "rsyncUris": 0, + } + out = "\n".join(rsync_uris) + "\n\n" + "\n".join(key_lines).strip() + "\n" + return out, { + "file": source.name, + "mode": "rsync_only", + "uris": len(uris), + "rsyncUris": len(rsync_uris), + } + + +def collect_tals(tal_dir: Path | None, tals: list[Path]) -> list[Path]: + paths: list[Path] = [] + if tal_dir is not None: + paths.extend(sorted(tal_dir.glob("*.tal"))) + paths.extend(tals) + unique: dict[str, Path] = {} + for path in paths: + unique[path.name] = path + return [unique[name] for name in sorted(unique)] + + +def main() -> int: + parser = argparse.ArgumentParser(description="Prepare TALs for local rsync-tree replay.") + parser.add_argument("--tal-dir", type=Path) + parser.add_argument("--tal", type=Path, action="append", default=[]) + parser.add_argument("--out-dir", type=Path, required=True) + parser.add_argument("--summary", type=Path) + args = parser.parse_args() + + sources = collect_tals(args.tal_dir, args.tal) + if not sources: + raise SystemExit("no TAL files provided") + + args.out_dir.mkdir(parents=True, exist_ok=True) + rows = [] + for source in sources: + if not source.is_file(): + raise SystemExit(f"TAL file not found: {source}") + content, row = rsync_only_tal(source) + (args.out_dir / source.name).write_text(content, encoding="utf-8") + rows.append(row) + + if args.summary is not None: + args.summary.parent.mkdir(parents=True, exist_ok=True) + args.summary.write_text(json.dumps(rows, indent=2, sort_keys=True) + "\n", encoding="utf-8") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/local_repo_replay/run_dual_local_tree_replay.sh b/scripts/local_repo_replay/run_dual_local_tree_replay.sh new file mode 100755 index 0000000..c7fe73a --- /dev/null +++ 
b/scripts/local_repo_replay/run_dual_local_tree_replay.sh @@ -0,0 +1,76 @@ +#!/usr/bin/env bash +set -euo pipefail + +usage() { + cat <<'EOF' +Usage: + run_dual_local_tree_replay.sh \ + --routinator-bin --routinator-mirror-root \ + --rpki-client-bin --rpki-client-mirror-root \ + --tal-dir --out-dir [--validation-time ] +EOF +} + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROUTINATOR_BIN="" +ROUTINATOR_MIRROR_ROOT="" +RPKI_CLIENT_BIN="" +RPKI_CLIENT_MIRROR_ROOT="" +RPKI_CLIENT_CACHE_DIR="" +TAL_DIR="" +OUT_DIR="" +VALIDATION_TIME="" + +while [[ $# -gt 0 ]]; do + case "$1" in + --routinator-bin) ROUTINATOR_BIN="$2"; shift 2 ;; + --routinator-mirror-root) ROUTINATOR_MIRROR_ROOT="$2"; shift 2 ;; + --rpki-client-bin) RPKI_CLIENT_BIN="$2"; shift 2 ;; + --rpki-client-mirror-root) RPKI_CLIENT_MIRROR_ROOT="$2"; shift 2 ;; + --rpki-client-cache-dir) RPKI_CLIENT_CACHE_DIR="$2"; shift 2 ;; + --tal-dir) TAL_DIR="$2"; shift 2 ;; + --out-dir) OUT_DIR="$2"; shift 2 ;; + --validation-time) VALIDATION_TIME="$2"; shift 2 ;; + -h|--help) usage; exit 0 ;; + *) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;; + esac +done + +if [[ -z "$RPKI_CLIENT_MIRROR_ROOT" ]]; then + RPKI_CLIENT_MIRROR_ROOT="$ROUTINATOR_MIRROR_ROOT" +fi +[[ -n "$ROUTINATOR_BIN" && -n "$ROUTINATOR_MIRROR_ROOT" && -n "$RPKI_CLIENT_BIN" && -n "$RPKI_CLIENT_MIRROR_ROOT" && -n "$TAL_DIR" && -n "$OUT_DIR" ]] || { usage >&2; exit 2; } + +mkdir -p "$OUT_DIR" +TIME_ARGS=() +if [[ -n "$VALIDATION_TIME" ]]; then + TIME_ARGS=(--validation-time "$VALIDATION_TIME") +fi +CACHE_ARGS=() +if [[ -n "$RPKI_CLIENT_CACHE_DIR" ]]; then + CACHE_ARGS=(--cache-dir "$RPKI_CLIENT_CACHE_DIR") +fi + +"$SCRIPT_DIR/run_routinator_from_local_tree.sh" \ + --routinator-bin "$ROUTINATOR_BIN" \ + --mirror-root "$ROUTINATOR_MIRROR_ROOT" \ + --tal-dir "$TAL_DIR" \ + --out-dir "$OUT_DIR/routinator" \ + --enable-aspa \ + "${TIME_ARGS[@]}" + +"$SCRIPT_DIR/run_rpki_client_from_local_tree.sh" \ + --rpki-client-bin "$RPKI_CLIENT_BIN" \ + 
--mirror-root "$RPKI_CLIENT_MIRROR_ROOT" \ + --tal-dir "$TAL_DIR" \ + --out-dir "$OUT_DIR/rpki-client" \ + "${CACHE_ARGS[@]}" \ + "${TIME_ARGS[@]}" + +python3 "$SCRIPT_DIR/compare_normalized_sets.py" \ + --left "$OUT_DIR/routinator" \ + --right "$OUT_DIR/rpki-client" \ + --left-name routinator \ + --right-name rpki-client \ + --out "$OUT_DIR/compare-summary.json" +echo "done: $OUT_DIR" diff --git a/scripts/local_repo_replay/run_routinator_from_local_tree.sh b/scripts/local_repo_replay/run_routinator_from_local_tree.sh new file mode 100755 index 0000000..a1709fd --- /dev/null +++ b/scripts/local_repo_replay/run_routinator_from_local_tree.sh @@ -0,0 +1,131 @@ +#!/usr/bin/env bash +set -euo pipefail + +usage() { + cat <<'EOF' +Usage: + run_routinator_from_local_tree.sh \ + --routinator-bin \ + --mirror-root \ + --tal-dir | --tal ... \ + --out-dir \ + [--validation-time ] \ + [--real-rsync-bin ] \ + [--enable-aspa] + +The input mirror is prepared by the caller. This script does not generate it. +Pass --validation-time only if FAKETIME_LIB points to a working libfaketime +library; otherwise Routinator will validate at wall-clock time. 
+Example: + export FAKETIME_LIB=/usr/lib/x86_64-linux-gnu/faketime/libfaketime.so.1 +EOF +} + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROUTINATOR_BIN="" +MIRROR_ROOT="" +OUT_DIR="" +VALIDATION_TIME="" +REAL_RSYNC_BIN="${REAL_RSYNC_BIN:-/usr/bin/rsync}" +ENABLE_ASPA=0 +TAL_DIR="" +TALS=() + +while [[ $# -gt 0 ]]; do + case "$1" in + --routinator-bin) ROUTINATOR_BIN="$2"; shift 2 ;; + --mirror-root) MIRROR_ROOT="$2"; shift 2 ;; + --tal-dir) TAL_DIR="$2"; shift 2 ;; + --tal) TALS+=("$2"); shift 2 ;; + --out-dir) OUT_DIR="$2"; shift 2 ;; + --validation-time) VALIDATION_TIME="$2"; shift 2 ;; + --real-rsync-bin) REAL_RSYNC_BIN="$2"; shift 2 ;; + --enable-aspa) ENABLE_ASPA=1; shift ;; + -h|--help) usage; exit 0 ;; + *) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;; + esac +done + +[[ -n "$ROUTINATOR_BIN" && -n "$MIRROR_ROOT" && -n "$OUT_DIR" ]] || { usage >&2; exit 2; } +[[ -x "$ROUTINATOR_BIN" ]] || { echo "routinator binary not executable: $ROUTINATOR_BIN" >&2; exit 2; } +[[ -d "$MIRROR_ROOT" ]] || { echo "mirror root not found: $MIRROR_ROOT" >&2; exit 2; } +if [[ -z "$TAL_DIR" && "${#TALS[@]}" -eq 0 ]]; then + echo "provide --tal-dir or at least one --tal" >&2 + exit 2 +fi + +mkdir -p "$OUT_DIR" +WORK_DIR="$OUT_DIR/work" +REPO_DIR="$WORK_DIR/repository" +EXTRA_TALS="$WORK_DIR/tals" +CONFIG_FILE="$WORK_DIR/routinator.conf" +rm -rf "$WORK_DIR" +mkdir -p "$REPO_DIR" "$EXTRA_TALS" + +PREPARE_TAL_ARGS=(--out-dir "$EXTRA_TALS" --summary "$OUT_DIR/prepared-tals.json") +if [[ -n "$TAL_DIR" ]]; then + PREPARE_TAL_ARGS+=(--tal-dir "$TAL_DIR") +fi +for tal in "${TALS[@]}"; do + PREPARE_TAL_ARGS+=(--tal "$tal") +done +python3 "$SCRIPT_DIR/prepare_tals.py" "${PREPARE_TAL_ARGS[@]}" + +cat > "$CONFIG_FILE" <> "$CONFIG_FILE" + ARGS+=(--enable-aspa) +fi +if [[ -n "$VALIDATION_TIME" && -z "${FAKETIME_LIB:-}" ]]; then + echo "warning: --validation-time is ignored for Routinator because FAKETIME_LIB is not set" >&2 +fi +VRP_ARGS=(vrps -o "$OUT_DIR/vrps.csv") 
+JSON_ARGS=(vrps -f jsonext -o "$OUT_DIR/routinator.json") + +/usr/bin/time -v -o "$OUT_DIR/process-time.txt" bash -c ' + set -euo pipefail + if [[ -n "$LOCAL_REPO_REPLAY_VALIDATION_TIME" && -n "${FAKETIME_LIB:-}" ]]; then + faketime_value="$(python3 - <<'"'"'PY'"'"' "$LOCAL_REPO_REPLAY_VALIDATION_TIME" +from datetime import datetime, timezone +import sys +dt = datetime.fromisoformat(sys.argv[1].replace("Z", "+00:00")).astimezone(timezone.utc) +print("@" + dt.strftime("%Y-%m-%d %H:%M:%S")) +PY +)" + export LD_PRELOAD="$FAKETIME_LIB" + export TZ=UTC + unset FAKETIME_FMT + export FAKETIME="$faketime_value" + export FAKETIME_DONT_FAKE_MONOTONIC=1 + fi + "$@" update --complete >"$LOCAL_REPO_REPLAY_OUT_DIR/update.log" 2>&1 || true + "$@" vrps -o "$LOCAL_REPO_REPLAY_OUT_DIR/vrps.csv" >"$LOCAL_REPO_REPLAY_OUT_DIR/vrps.log" 2>&1 + "$@" vrps -f jsonext -o "$LOCAL_REPO_REPLAY_OUT_DIR/routinator.json" >"$LOCAL_REPO_REPLAY_OUT_DIR/json.log" 2>&1 +' local_repo_replay "${ARGS[@]}" +python3 "$SCRIPT_DIR/normalize_rp_outputs.py" --kind routinator --input "$OUT_DIR/routinator.json" --vrps-out "$OUT_DIR/vrps.normalized.txt" --vaps-out "$OUT_DIR/vaps.normalized.txt" +python3 "$SCRIPT_DIR/summarize_replay.py" --rp routinator --out-dir "$OUT_DIR" --summary "$OUT_DIR/summary.json" +echo "done: $OUT_DIR" diff --git a/scripts/local_repo_replay/run_rpki_client_from_local_tree.sh b/scripts/local_repo_replay/run_rpki_client_from_local_tree.sh new file mode 100755 index 0000000..57e8c93 --- /dev/null +++ b/scripts/local_repo_replay/run_rpki_client_from_local_tree.sh @@ -0,0 +1,107 @@ +#!/usr/bin/env bash +set -euo pipefail + +usage() { + cat <<'EOF' +Usage: + run_rpki_client_from_local_tree.sh \ + --rpki-client-bin \ + --mirror-root \ + --tal-dir | --tal ... \ + --out-dir \ + [--cache-dir ] \ + [--validation-time ] \ + [--real-rsync-bin ] \ + [--parser-workers ] + +The input mirror is prepared by the caller. 
This script disables RRDP, points +rpki-client's rsync command at a local URI mapper, and fetches only from the +local filesystem tree. +EOF +} + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +RPKI_CLIENT_BIN="" +CACHE_DIR="" +MIRROR_ROOT="" +OUT_DIR="" +TAL_DIR="" +VALIDATION_TIME="" +PARSER_WORKERS="${PARSER_WORKERS:-4}" +REAL_RSYNC_BIN="${REAL_RSYNC_BIN:-/usr/bin/rsync}" +TALS=() + +while [[ $# -gt 0 ]]; do + case "$1" in + --rpki-client-bin) RPKI_CLIENT_BIN="$2"; shift 2 ;; + --mirror-root) MIRROR_ROOT="$2"; shift 2 ;; + --cache-dir) CACHE_DIR="$2"; shift 2 ;; + --tal-dir) TAL_DIR="$2"; shift 2 ;; + --tal) TALS+=("$2"); shift 2 ;; + --out-dir) OUT_DIR="$2"; shift 2 ;; + --validation-time) VALIDATION_TIME="$2"; shift 2 ;; + --real-rsync-bin) REAL_RSYNC_BIN="$2"; shift 2 ;; + --parser-workers) PARSER_WORKERS="$2"; shift 2 ;; + -h|--help) usage; exit 0 ;; + *) echo "unknown argument: $1" >&2; usage >&2; exit 2 ;; + esac +done + +[[ -n "$RPKI_CLIENT_BIN" && -n "$MIRROR_ROOT" && -n "$OUT_DIR" ]] || { usage >&2; exit 2; } +[[ -x "$RPKI_CLIENT_BIN" ]] || { echo "rpki-client binary not executable: $RPKI_CLIENT_BIN" >&2; exit 2; } +[[ -d "$MIRROR_ROOT" ]] || { echo "mirror root not found: $MIRROR_ROOT" >&2; exit 2; } +if [[ -z "$TAL_DIR" && "${#TALS[@]}" -eq 0 ]]; then + echo "provide --tal-dir or at least one --tal" >&2 + exit 2 +fi + +mkdir -p "$OUT_DIR" +if [[ -z "$CACHE_DIR" ]]; then + CACHE_DIR="$OUT_DIR/cache" +fi +mkdir -p "$CACHE_DIR" "$OUT_DIR/out" +chmod a+rwx "$OUT_DIR" "$CACHE_DIR" "$OUT_DIR/out" +WORK_DIR="$OUT_DIR/work" +PREPARED_TALS="$WORK_DIR/tals" +rm -rf "$WORK_DIR" +mkdir -p "$PREPARED_TALS" +PREPARE_TAL_ARGS=(--out-dir "$PREPARED_TALS" --summary "$OUT_DIR/prepared-tals.json") +if [[ -n "$TAL_DIR" ]]; then + PREPARE_TAL_ARGS+=(--tal-dir "$TAL_DIR") +fi +for tal in "${TALS[@]}"; do + PREPARE_TAL_ARGS+=(--tal "$tal") +done +python3 "$SCRIPT_DIR/prepare_tals.py" "${PREPARE_TAL_ARGS[@]}" + +CLIENT_TAL_ARGS=() +while IFS= read -r tal; do + 
CLIENT_TAL_ARGS+=(-t "$tal") +done < <(find "$PREPARED_TALS" -maxdepth 1 -type f -name '*.tal' | sort) + +TIME_ARGS=() +if [[ -n "$VALIDATION_TIME" ]]; then + epoch="$(python3 - <<'PY' "$VALIDATION_TIME" +from datetime import datetime, timezone +import sys +print(int(datetime.fromisoformat(sys.argv[1].replace("Z", "+00:00")).astimezone(timezone.utc).timestamp())) +PY +)" + TIME_ARGS=(-P "$epoch") +fi + +export CIR_MIRROR_ROOT="$(cd "$MIRROR_ROOT" && pwd)" +export REAL_RSYNC_BIN="$REAL_RSYNC_BIN" +export CIR_LOCAL_LINK_MODE=1 + +/usr/bin/time -v -o "$OUT_DIR/process-time.txt" \ + "$RPKI_CLIENT_BIN" \ + -R -e "$SCRIPT_DIR/cir-rsync-wrapper" -j -c -p "$PARSER_WORKERS" \ + "${TIME_ARGS[@]}" \ + "${CLIENT_TAL_ARGS[@]}" \ + -d "$CACHE_DIR" \ + "$OUT_DIR/out" >"$OUT_DIR/stdout.log" 2>"$OUT_DIR/stderr.log" + +python3 "$SCRIPT_DIR/normalize_rp_outputs.py" --kind rpki-client --input "$OUT_DIR/out" --vrps-out "$OUT_DIR/vrps.normalized.txt" --vaps-out "$OUT_DIR/vaps.normalized.txt" +python3 "$SCRIPT_DIR/summarize_replay.py" --rp rpki-client --out-dir "$OUT_DIR" --summary "$OUT_DIR/summary.json" +echo "done: $OUT_DIR" diff --git a/scripts/local_repo_replay/summarize_replay.py b/scripts/local_repo_replay/summarize_replay.py new file mode 100755 index 0000000..b2c77a3 --- /dev/null +++ b/scripts/local_repo_replay/summarize_replay.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import re +from pathlib import Path + + +def count_lines(path: Path) -> int: + if not path.is_file(): + return 0 + return sum(1 for line in path.read_text(encoding="utf-8", errors="replace").splitlines() if line.strip()) + + +def parse_time(path: Path) -> dict[str, object]: + if not path.is_file(): + return {} + text = path.read_text(encoding="utf-8", errors="replace") + elapsed = re.search(r"Elapsed \(wall clock\) time .*: (.+)", text) + rss = re.search(r"Maximum resident set size \(kbytes\): (\d+)", text) + return { + "elapsed": 
elapsed.group(1).strip() if elapsed else "", + "maxRssKb": int(rss.group(1)) if rss else 0, + } + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--rp", required=True) + parser.add_argument("--out-dir", type=Path, required=True) + parser.add_argument("--summary", type=Path, required=True) + args = parser.parse_args() + summary = { + "rp": args.rp, + "vrps": count_lines(args.out_dir / "vrps.normalized.txt"), + "vaps": count_lines(args.out_dir / "vaps.normalized.txt"), + "time": parse_time(args.out_dir / "process-time.txt"), + "artifacts": sorted(path.name for path in args.out_dir.iterdir() if path.is_file()), + } + args.summary.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8") + print(args.summary) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/local_repo_replay/templates/README.md b/scripts/local_repo_replay/templates/README.md new file mode 100644 index 0000000..f332f69 --- /dev/null +++ b/scripts/local_repo_replay/templates/README.md @@ -0,0 +1,118 @@ +# Local Repository Tree Replay Package + +This package replays already prepared local RPKI repository trees with +Routinator and rpki-client. + +It is intentionally independent from CIR: + +- it does not read `.cir`; +- it does not read `repo-bytes.db`; +- it does not call `cir_materialize`; +- it does not generate a local repository tree. + +The caller must prepare the local repository/cache tree before running these +scripts. 
+ +## Contents + +```text +local-repo-replay-package/ + scripts/ + run_routinator_from_local_tree.sh + run_rpki_client_from_local_tree.sh + run_dual_local_tree_replay.sh + prepare_tals.py + cir-rsync-wrapper + cir-local-link-sync.py + normalize_rp_outputs.py + compare_normalized_sets.py + summarize_replay.py + docs/ + input_tree_requirements.md + offline_replay_limits.md + output_files.md + examples/ + routinator_example.sh + rpki_client_example.sh + dual_compare_example.sh + env.example +``` + +## Routinator replay + +```bash +./scripts/run_routinator_from_local_tree.sh \ + --routinator-bin /opt/routinator/target/release/routinator \ + --mirror-root /data/replay/mirror \ + --tal-dir /data/replay/tals \ + --out-dir /data/replay/out/routinator \ + --enable-aspa +``` + +The script uses `--disable-rrdp`, `--rsync-command ./scripts/cir-rsync-wrapper`, +and the local mirror root to satisfy rsync fetches from the local filesystem. +The wrapper name is historical; in this package it is only a generic +`rsync://` to local-path mapper. + +If `--validation-time` is needed for Routinator, set `FAKETIME_LIB` to a working +libfaketime shared library. Otherwise Routinator validates at wall-clock time. + +On Ubuntu, install and use faketime like this: + +```bash +sudo apt-get install -y libfaketime +export FAKETIME_LIB=/usr/lib/x86_64-linux-gnu/faketime/libfaketime.so.1 +./scripts/run_routinator_from_local_tree.sh \ + --routinator-bin /opt/routinator/target/release/routinator \ + --mirror-root /data/replay/mirror \ + --tal-dir /data/replay/tals \ + --out-dir /data/replay/out/routinator \ + --validation-time 2026-05-14T06:48:00Z \ + --enable-aspa +``` + +Without `FAKETIME_LIB`, old local trees can produce empty or smaller output +because Routinator validates manifests and CRLs against current wall-clock time. 
+ +## rpki-client replay + +```bash +./scripts/run_rpki_client_from_local_tree.sh \ + --rpki-client-bin /opt/rpki-client/src/rpki-client \ + --mirror-root /data/replay/mirror \ + --tal-dir /data/replay/tals \ + --out-dir /data/replay/out/rpki-client \ + --parser-workers 4 +``` + +The script uses `rpki-client -R -e ./scripts/cir-rsync-wrapper` so RRDP is +disabled and rsync fetches are served from the local mirror. `--cache-dir` is an +optional working cache directory used by rpki-client during this local replay. + +## Dual replay + +```bash +./scripts/run_dual_local_tree_replay.sh \ + --routinator-bin /opt/routinator/target/release/routinator \ + --routinator-mirror-root /data/replay/mirror \ + --rpki-client-bin /opt/rpki-client/src/rpki-client \ + --rpki-client-mirror-root /data/replay/mirror \ + --tal-dir /data/replay/tals \ + --out-dir /data/replay/out/dual +``` + +If `--validation-time` is passed to dual replay, remember to export +`FAKETIME_LIB` first so Routinator and rpki-client use the same logical +validation time. + +## Outputs + +Each run writes normalized output: + +- `vrps.normalized.txt` +- `vaps.normalized.txt` +- `summary.json` +- raw RP output and logs +- `process-time.txt` + +See `docs/output_files.md`. diff --git a/scripts/local_repo_replay/templates/docs/input_tree_requirements.md b/scripts/local_repo_replay/templates/docs/input_tree_requirements.md new file mode 100644 index 0000000..cb2a0ac --- /dev/null +++ b/scripts/local_repo_replay/templates/docs/input_tree_requirements.md @@ -0,0 +1,39 @@ +# Input Tree Requirements + +The input tree is not part of this package. The caller must prepare it before +running replay. + +## Mirror root + +The mirror root must map rsync URIs to local paths: + +```text +rsync://rpki.example.net/repo/a/b/c.roa +=> <mirror-root>/rpki.example.net/repo/a/b/c.roa +``` + +The tree must contain all objects needed by the selected TALs: TA certificates, +manifests, CRLs, ROAs, ASPAs, router certs, and child CA certificates.
+ +Both Routinator and rpki-client scripts consume this same mirror root through a +local rsync command wrapper. + +## rpki-client working cache + +For rpki-client replay, `--cache-dir` is only rpki-client's working cache +directory for this local run. It is not the input dataset. The authoritative +input is `--mirror-root`. + +## TALs + +Provide either `--tal-dir <dir>` or repeated `--tal <file>`. + +The scripts prepare a replay-local TAL copy that prefers `rsync://` TA +certificate URIs. This prevents a TAL with an HTTPS URI listed first from +escaping to the network during local replay. The TAL set should match the local +tree. Mixing a tree from one run with different TALs may produce meaningless +differences. + +## No generation + +This package does not generate the tree and does not repair missing objects. diff --git a/scripts/local_repo_replay/templates/docs/offline_replay_limits.md b/scripts/local_repo_replay/templates/docs/offline_replay_limits.md new file mode 100644 index 0000000..5065791 --- /dev/null +++ b/scripts/local_repo_replay/templates/docs/offline_replay_limits.md @@ -0,0 +1,38 @@ +# Offline Replay Limits + +## Routinator + +The Routinator script disables RRDP and uses an rsync command wrapper to map +rsync URLs to local paths. It still runs Routinator's normal validation logic. + +If the local mirror does not contain required objects, validation can fail or +produce fewer outputs. + +## rpki-client + +The rpki-client script uses `-R` to disable RRDP and `-e` to point rsync at the +local mapper. rpki-client still builds its normal working cache, but every +rsync source is rewritten to the local mirror. + +If the mirror was incomplete or produced by a different TAL set, replay results +may differ from the original run. + +## Validation time + +rpki-client supports `-P <epoch-seconds>`. Routinator does not expose the same +simple command-line evaluation-time option in the tested version; if `FAKETIME_LIB` +is configured, the script can run Routinator under faketime.
Without +`FAKETIME_LIB`, `--validation-time` is intentionally ignored for Routinator and +current wall-clock validation can reject stale manifests or CRLs. + +Ubuntu example: + +```bash +sudo apt-get install -y libfaketime +export FAKETIME_LIB=/usr/lib/x86_64-linux-gnu/faketime/libfaketime.so.1 +``` + +The script sets `TZ=UTC` and converts RFC3339 validation time to libfaketime +absolute UTC format, for example `2026-05-14T06:48:00Z` becomes +`@2026-05-14 06:48:00`. Setting `TZ=UTC` is required because libfaketime parses +absolute timestamps in the process timezone. diff --git a/scripts/local_repo_replay/templates/docs/output_files.md b/scripts/local_repo_replay/templates/docs/output_files.md new file mode 100644 index 0000000..f863059 --- /dev/null +++ b/scripts/local_repo_replay/templates/docs/output_files.md @@ -0,0 +1,16 @@ +# Output Files + +Each replay output directory can contain: + +- `vrps.normalized.txt`: one normalized VRP per line. +- `vaps.normalized.txt`: one normalized VAP/ASPA per line. +- `summary.json`: counts and resource summary. +- `process-time.txt`: `/usr/bin/time -v` output. +- RP-specific raw output: + - Routinator: `routinator.json`, `vrps.csv`. + - rpki-client: `out/json`, `out/csv`, and other native files. +- logs: + - `stdout.log` / `stderr.log` for rpki-client. + - `update.log`, `vrps.log`, `json.log` for Routinator. + +Dual replay additionally writes `compare-summary.json`. 
diff --git a/scripts/local_repo_replay/templates/examples/dual_compare_example.sh b/scripts/local_repo_replay/templates/examples/dual_compare_example.sh new file mode 100755 index 0000000..386b1b6 --- /dev/null +++ b/scripts/local_repo_replay/templates/examples/dual_compare_example.sh @@ -0,0 +1,11 @@ +#!/usr/bin/env bash +set -euo pipefail + +./scripts/run_dual_local_tree_replay.sh \ + --routinator-bin "${ROUTINATOR_BIN:-/opt/routinator/target/release/routinator}" \ + --routinator-mirror-root "${ROUTINATOR_MIRROR_ROOT:-/data/replay/mirror}" \ + --rpki-client-bin "${RPKI_CLIENT_BIN:-/opt/rpki-client/src/rpki-client}" \ + --rpki-client-mirror-root "${RPKI_CLIENT_MIRROR_ROOT:-/data/replay/mirror}" \ + --rpki-client-cache-dir "${RPKI_CLIENT_CACHE_DIR:-/data/replay/work/rpki-client-cache}" \ + --tal-dir "${TAL_DIR:-/data/replay/tals}" \ + --out-dir "${OUT_DIR:-/data/replay/out/dual}" diff --git a/scripts/local_repo_replay/templates/examples/routinator_example.sh b/scripts/local_repo_replay/templates/examples/routinator_example.sh new file mode 100755 index 0000000..b84620b --- /dev/null +++ b/scripts/local_repo_replay/templates/examples/routinator_example.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +./scripts/run_routinator_from_local_tree.sh \ + --routinator-bin "${ROUTINATOR_BIN:-/opt/routinator/target/release/routinator}" \ + --mirror-root "${MIRROR_ROOT:-/data/replay/mirror}" \ + --tal-dir "${TAL_DIR:-/data/replay/tals}" \ + --out-dir "${OUT_DIR:-/data/replay/out/routinator}" \ + --enable-aspa diff --git a/scripts/local_repo_replay/templates/examples/rpki_client_example.sh b/scripts/local_repo_replay/templates/examples/rpki_client_example.sh new file mode 100755 index 0000000..5cbac4a --- /dev/null +++ b/scripts/local_repo_replay/templates/examples/rpki_client_example.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash +set -euo pipefail + +./scripts/run_rpki_client_from_local_tree.sh \ + --rpki-client-bin 
"${RPKI_CLIENT_BIN:-/opt/rpki-client/src/rpki-client}" \ + --mirror-root "${MIRROR_ROOT:-/data/replay/mirror}" \ + --cache-dir "${RPKI_CLIENT_CACHE_DIR:-/data/replay/work/rpki-client-cache}" \ + --tal-dir "${TAL_DIR:-/data/replay/tals}" \ + --out-dir "${OUT_DIR:-/data/replay/out/rpki-client}" \ + --parser-workers "${PARSER_WORKERS:-4}" diff --git a/scripts/soak/portable-soak.env.example b/scripts/soak/portable-soak.env.example index 6748667..bc388f9 100644 --- a/scripts/soak/portable-soak.env.example +++ b/scripts/soak/portable-soak.env.example @@ -2,6 +2,7 @@ # 复制为 .env 后可以在远端直接调整;所有路径默认相对 package 根目录。 # 最大运行轮次。重复执行 run_soak.sh 时会从已有最后一轮之后继续编号。 +# 正整数表示固定运行轮次;负数(例如 -1)表示持续运行不自动停止;0 非法。 MAX_RUNS=3 # 两轮之间等待秒数。做连续无等待验收时设置为 0。 @@ -15,7 +16,7 @@ RIRS=afrinic,apnic,arin,lacnic,ripe # 运行根目录。默认使用 package 根目录;如需把产物写到独立数据盘,可改成绝对路径。 RUN_ROOT="${PACKAGE_ROOT}" -# 保留最近多少轮 run 目录。旧 run 会由 rpki_daemon 自身或后续脚本策略清理。 +# 保留最近多少轮 run 目录。持续运行模式建议设置为 5 或按磁盘容量评估。 RETAIN_RUNS=10 # 是否输出 compact report JSON。1 表示启用,0 表示关闭。 diff --git a/scripts/soak/run_soak.sh b/scripts/soak/run_soak.sh index fc564cb..df771bb 100755 --- a/scripts/soak/run_soak.sh +++ b/scripts/soak/run_soak.sh @@ -77,6 +77,11 @@ validate_non_negative_int() { [[ "$value" =~ ^[0-9]+$ ]] || die "$name must be an integer: $value" } +validate_max_runs() { + [[ "$MAX_RUNS" =~ ^-?[0-9]+$ ]] || die "MAX_RUNS must be an integer: $MAX_RUNS" + [[ "$MAX_RUNS" != "0" ]] || die "MAX_RUNS must be non-zero; use a positive value for fixed runs or -1 for continuous mode" +} + validate_rsync_scope() { case "$RSYNC_SCOPE" in publication-point|module-root) @@ -486,20 +491,30 @@ PY apply_outer_retention() { local dirs=() + local retain_limit="$RETAIN_RUNS" + local keep_run="${1:-}" local run_dir shopt -s nullglob for run_dir in "$RUNS_ROOT"/run_[0-9][0-9][0-9][0-9]; do [[ -d "$run_dir" ]] && dirs+=("$run_dir") done shopt -u nullglob - if (( ${#dirs[@]} <= RETAIN_RUNS )); then + if (( ${#dirs[@]} <= retain_limit )); then return 0 fi mapfile -t 
dirs < <(printf '%s\n' "${dirs[@]}" | sort) - local remove_count=$(( ${#dirs[@]} - RETAIN_RUNS )) - local index - for (( index = 0; index < remove_count; index++ )); do - rm -rf "${dirs[$index]}" + local remove_count=$(( ${#dirs[@]} - retain_limit )) + local removed_count=0 + local candidate + for candidate in "${dirs[@]}"; do + if [[ -n "$keep_run" && "$(basename "$candidate")" == "$keep_run" ]]; then + continue + fi + rm -rf "$candidate" + removed_count=$((removed_count + 1)) + if (( removed_count >= remove_count )); then + break + fi done } @@ -519,6 +534,7 @@ run_one_round() { local summary_state mkdir -p "$run_dir" "$daemon_state_root" + apply_outer_retention "$run_id" started_at="$(date -u +%Y-%m-%dT%H:%M:%SZ)" write_run_meta "$run_dir/run-meta.json" "running" "$run_index" "$run_id" "$sync_mode" \ "$snapshot_reason" "$previous_run_id" "$previous_success_value" "$started_at" "" \ @@ -573,7 +589,7 @@ main() { require_command python3 require_command date require_command find - validate_positive_int "MAX_RUNS" "$MAX_RUNS" + validate_max_runs validate_non_negative_int "INTERVAL_SECS" "$INTERVAL_SECS" validate_positive_int "RETAIN_RUNS" "$RETAIN_RUNS" validate_rsync_scope @@ -599,12 +615,20 @@ main() { local max_index local next_index + local run_forever=0 + local stop_index=0 max_index="$(max_existing_run_index)" next_index=$((max_index + 1)) - local stop_index=$((max_index + MAX_RUNS)) + if (( MAX_RUNS < 0 )); then + run_forever=1 + echo "run_soak mode=continuous max_existing_run_index=$max_index next_run=$(printf 'run_%04d' "$next_index")" + else + stop_index=$((max_index + MAX_RUNS)) + echo "run_soak mode=fixed max_existing_run_index=$max_index next_run=$(printf 'run_%04d' "$next_index") stop_run=$(printf 'run_%04d' "$stop_index")" + fi local any_failed=0 - while (( next_index <= stop_index )); do + while (( run_forever == 1 || next_index <= stop_index )); do INVALID_DB_PATH="" INVALID_STATE_PATH="" INVALID_TMP_PATH="" @@ -649,7 +673,7 @@ main() { echo 
"completed run $(printf 'run_%04d' "$next_index") status=failed" >&2 any_failed=1 fi - if (( next_index < stop_index && INTERVAL_SECS > 0 )); then + if (( (run_forever == 1 || next_index < stop_index) && INTERVAL_SECS > 0 )); then sleep "$INTERVAL_SECS" fi next_index=$((next_index + 1)) diff --git a/src/bin/rpki_artifact_metrics.rs b/src/bin/rpki_artifact_metrics.rs new file mode 100644 index 0000000..ed51248 --- /dev/null +++ b/src/bin/rpki_artifact_metrics.rs @@ -0,0 +1,2037 @@ +use std::collections::{BTreeMap, BTreeSet}; +use std::fs; +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +use rpki::ccr::decode_content_info; +use rpki::cir::decode_cir; +use serde::Serialize; +use serde_json::{Value, json}; +use sha2::{Digest, Sha256}; + +const LARGE_PP_OBJECT_THRESHOLDS: &[u64] = &[10, 50, 100, 500, 1000, 5000, 10000, 50000]; +const PP_SYNC_SECONDS_BUCKETS: &[f64] = &[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0]; + +#[derive(Clone, Debug, PartialEq, Eq)] +struct Args { + run_root: PathBuf, + listen: String, + poll_secs: u64, + instance: String, + once: bool, + out_metrics: Option, + out_status: Option, +} + +fn usage() -> &'static str { + "Usage: rpki_artifact_metrics --run-root [--listen ] [--poll-secs ] [--instance ] [--once] [--out-metrics ] [--out-status ]" +} + +fn main() { + if let Err(err) = real_main() { + eprintln!("{err}"); + std::process::exit(1); + } +} + +fn real_main() -> Result<(), String> { + let args = parse_args(&std::env::args().collect::>())?; + if args.once { + let snapshot = scan_run_root(&args.run_root, &args.instance)?; + let metrics = render_metrics(&snapshot); + let status = render_status_json(&snapshot)?; + if let Some(path) = args.out_metrics.as_ref() { + write_file(path, metrics.as_bytes())?; + } else { + print!("{metrics}"); + } + if let Some(path) = 
args.out_status.as_ref() { + write_file(path, status.as_bytes())?; + } + return Ok(()); + } + + let shared = Arc::new(RwLock::new(scan_run_root(&args.run_root, &args.instance)?)); + let scanner = Arc::clone(&shared); + let run_root = args.run_root.clone(); + let instance = args.instance.clone(); + let poll_secs = args.poll_secs.max(1); + thread::spawn(move || { + loop { + thread::sleep(Duration::from_secs(poll_secs)); + let next = match scan_run_root(&run_root, &instance) { + Ok(snapshot) => snapshot, + Err(err) => { + let mut previous = scanner.write().expect("metrics lock poisoned"); + previous + .service + .parse_errors + .push(format!("scan failed: {err}")); + previous.service.last_reload_success = false; + previous.service.last_scan_timestamp_seconds = unix_now_seconds(); + continue; + } + }; + *scanner.write().expect("metrics lock poisoned") = next; + } + }); + + serve_http(&args.listen, shared) +} + +fn parse_args(argv: &[String]) -> Result { + let mut run_root = None; + let mut listen = "127.0.0.1:9556".to_string(); + let mut poll_secs = 10u64; + let mut instance = "ours-rp".to_string(); + let mut once = false; + let mut out_metrics = None; + let mut out_status = None; + let mut index = 1usize; + while index < argv.len() { + match argv[index].as_str() { + "--run-root" => { + index += 1; + run_root = Some(PathBuf::from(value_at(argv, index, "--run-root")?)); + } + "--listen" => { + index += 1; + listen = value_at(argv, index, "--listen")?.to_string(); + } + "--poll-secs" => { + index += 1; + let value = value_at(argv, index, "--poll-secs")?; + poll_secs = value + .parse::() + .map_err(|_| format!("invalid --poll-secs: {value}"))?; + } + "--instance" => { + index += 1; + instance = value_at(argv, index, "--instance")?.to_string(); + } + "--once" => once = true, + "--out-metrics" => { + index += 1; + out_metrics = Some(PathBuf::from(value_at(argv, index, "--out-metrics")?)); + } + "--out-status" => { + index += 1; + out_status = 
/// Looks up the command-line value stored at `index` in `argv`.
///
/// # Errors
/// Returns `"<flag> requires a value"` when `argv` has no element at `index`.
fn value_at<'a>(argv: &'a [String], index: usize, flag: &str) -> Result<&'a str, String> {
    match argv.get(index) {
        Some(value) => Ok(value.as_str()),
        None => Err(format!("{flag} requires a value")),
    }
}
run_dir: String, + status: String, + sync_mode: String, + snapshot_reason: Option, + started_at: Option, + finished_at: Option, + start_timestamp_seconds: Option, + finish_timestamp_seconds: Option, + wall_seconds: f64, + user_cpu_seconds: Option, + system_cpu_seconds: Option, + cpu_percent: Option, + max_rss_bytes: Option, + exit_code: Option, + vrps: u64, + vaps: u64, + publication_points: u64, + warnings: u64, + tree_instances_processed: Option, + tree_instances_failed: Option, + stage_seconds: BTreeMap, + repo_sync_phase: BTreeMap, + repo_terminal_state: BTreeMap, + download_events: Option, + download_bytes: Option, + artifact_sizes: BTreeMap, + state_path_sizes: BTreeMap, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct CountDuration { + count: u64, + duration_seconds_total: f64, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct PathSize { + total_size_bytes: u64, + file_count: u64, + dir_count: u64, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct RepoMetrics { + repo_id: String, + uri: String, + host: String, + transport: String, + publication_points: u64, + duration_seconds_sum: f64, + duration_seconds_max: f64, + duration_seconds_avg: f64, + phase_counts: BTreeMap, + terminal_state_counts: BTreeMap, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct TopRepo { + rank: usize, + repo_id: String, + uri: String, + host: String, + transport: String, + duration_ms_max: u64, + duration_ms_sum: u64, + publication_points: u64, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct TopPublicationPoint { + rank: usize, + pp_id: String, + repo_id: String, + uri: String, + repo_uri: String, + host: String, + transport: String, + object_count: u64, + sync_duration_ms: u64, + terminal_state: String, + phase: String, +} + +#[derive(Clone, Debug, Serialize)] 
+#[serde(rename_all = "camelCase")] +struct Histogram { + buckets: Vec, + counts: Vec, + sum: f64, + count: u64, +} + +impl Default for Histogram { + fn default() -> Self { + Self { + buckets: Vec::new(), + counts: Vec::new(), + sum: 0.0, + count: 0, + } + } +} + +impl Histogram { + fn new(buckets: &[f64]) -> Self { + Self { + buckets: buckets.to_vec(), + counts: vec![0; buckets.len() + 1], + sum: 0.0, + count: 0, + } + } + + fn observe(&mut self, value: f64) { + self.sum += value; + self.count += 1; + let mut placed = false; + for (index, bucket) in self.buckets.iter().enumerate() { + if value <= *bucket { + self.counts[index] += 1; + placed = true; + break; + } + } + if !placed { + let last = self.counts.len() - 1; + self.counts[last] += 1; + } + } + + fn cumulative_counts(&self) -> Vec { + let mut out = Vec::with_capacity(self.counts.len()); + let mut running = 0u64; + for count in &self.counts { + running += *count; + out.push(running); + } + out + } +} + +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct CirMetrics { + version: u32, + objects: u64, + trust_anchors: u64, + rejected_objects: u64, + reject_list_sha256: String, + objects_by_type: BTreeMap, + rejected_objects_by_type: BTreeMap, +} + +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct CcrMetrics { + version: u32, + state_present: BTreeMap, + state_items: BTreeMap, + state_digests: BTreeMap, +} + +#[derive(Clone, Debug)] +struct RunRecord { + path: PathBuf, + status: String, + summary: Option, + meta: Option, +} + +fn scan_run_root(input_root: &Path, instance: &str) -> Result { + let started = Instant::now(); + let runs_root = resolve_runs_root(input_root); + let mut snapshot = MetricsSnapshot { + instance: instance.to_string(), + service: ServiceMetrics { + run_root: input_root.display().to_string(), + runs_root: runs_root.display().to_string(), + ..ServiceMetrics::default() + }, + ..MetricsSnapshot::default() + }; + + let 
records = collect_run_records(&runs_root, &mut snapshot.service.parse_errors)?; + snapshot.runs.known = records.len() as u64; + for record in &records { + match record.status.as_str() { + "success" => snapshot.runs.success += 1, + "failed" | "spawn_failed" => snapshot.runs.failed += 1, + _ => snapshot.runs.partial += 1, + } + } + snapshot.runs.consecutive_failures = consecutive_failures(&records); + snapshot.cumulative.completed_success_total = snapshot.runs.success; + snapshot.cumulative.completed_failed_total = snapshot.runs.failed; + + for record in records.iter().filter(|record| record.status == "success") { + if let Some(summary) = record.summary.as_ref() { + let wall_seconds = json_u64(summary, &["wallMs"]).unwrap_or(0) as f64 / 1000.0; + snapshot.cumulative.observed_duration_seconds_sum += wall_seconds; + snapshot.cumulative.observed_duration_seconds_count += 1; + if let Some(bytes) = json_u64(summary, &["stageTiming", "download_bytes_total"]) { + snapshot.cumulative.observed_download_bytes_total = snapshot + .cumulative + .observed_download_bytes_total + .saturating_add(bytes); + } + } + } + + if let Some(latest) = records + .iter() + .rev() + .find(|record| record.status == "success") + { + build_latest_metrics(latest, &mut snapshot); + } + + snapshot.service.last_scan_timestamp_seconds = unix_now_seconds(); + snapshot.service.last_scan_duration_seconds = started.elapsed().as_secs_f64(); + snapshot.service.last_reload_success = snapshot.service.parse_errors.is_empty(); + Ok(snapshot) +} + +fn resolve_runs_root(input_root: &Path) -> PathBuf { + let runs = input_root.join("runs"); + if runs.is_dir() { + runs + } else { + input_root.to_path_buf() + } +} + +fn collect_run_records( + runs_root: &Path, + errors: &mut Vec, +) -> Result, String> { + let mut records = Vec::new(); + if !runs_root.is_dir() { + return Err(format!( + "runs root is not a directory: {}", + runs_root.display() + )); + } + let entries = fs::read_dir(runs_root) + .map_err(|e| format!("read 
runs root failed: {}: {e}", runs_root.display()))?; + for entry in entries { + let entry = entry.map_err(|e| format!("read runs entry failed: {e}"))?; + let path = entry.path(); + if !path.is_dir() { + continue; + } + let Some(name) = path.file_name().and_then(|name| name.to_str()) else { + continue; + }; + if !name.starts_with("run_") { + continue; + } + let summary = read_json_optional(&path.join("run-summary.json"), errors); + let meta = read_json_optional(&path.join("run-meta.json"), errors); + let status = classify_run_status(&summary, &meta, &path); + records.push(RunRecord { + path, + status, + summary, + meta, + }); + } + records.sort_by(|left, right| left.path.cmp(&right.path)); + Ok(records) +} + +fn classify_run_status(summary: &Option, meta: &Option, path: &Path) -> String { + let summary_status = summary.as_ref().and_then(|v| json_str(v, &["status"])); + let meta_status = meta.as_ref().and_then(|v| json_str(v, &["status"])); + if summary_status == Some("success") && meta_status == Some("success") { + return "success".to_string(); + } + if matches!(summary_status, Some("failed" | "spawn_failed")) + || matches!(meta_status, Some("failed" | "spawn_failed")) + { + return "failed".to_string(); + } + if path.join("run-summary.json").exists() || path.join("run-meta.json").exists() { + "partial".to_string() + } else { + "missing_metadata".to_string() + } +} + +fn consecutive_failures(records: &[RunRecord]) -> u64 { + let mut count = 0u64; + for record in records.iter().rev() { + if record.status == "success" { + break; + } + count += 1; + } + count +} + +fn read_json_optional(path: &Path, errors: &mut Vec) -> Option { + if !path.exists() { + return None; + } + match fs::read(path) + .ok() + .and_then(|bytes| serde_json::from_slice::(&bytes).ok()) + { + Some(value) => Some(value), + None => { + errors.push(format!("parse json failed: {}", path.display())); + None + } + } +} + +fn build_latest_metrics(record: &RunRecord, snapshot: &mut MetricsSnapshot) { + let 
summary = record.summary.as_ref(); + let meta = record.meta.as_ref(); + let run_seq = summary + .and_then(|v| json_u64(v, &["runSeq"])) + .or_else(|| meta.and_then(|v| json_u64(v, &["run_index"]))) + .unwrap_or_else(|| run_index_from_path(&record.path).unwrap_or(0)); + let run_id = summary + .and_then(|v| json_str(v, &["runId"])) + .or_else(|| meta.and_then(|v| json_str(v, &["run_id"]))) + .unwrap_or_else(|| { + record + .path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown") + }) + .to_string(); + let sync_mode = meta + .and_then(|v| json_str(v, &["sync_mode"])) + .unwrap_or("unknown") + .to_string(); + let snapshot_reason = meta + .and_then(|v| json_str(v, &["snapshot_reason"])) + .map(|s| s.to_string()); + let started_at = summary + .and_then(|v| json_str(v, &["startedAtRfc3339Utc"])) + .or_else(|| meta.and_then(|v| json_str(v, &["started_at_rfc3339_utc"]))) + .map(|s| s.to_string()); + let finished_at = summary + .and_then(|v| json_str(v, &["finishedAtRfc3339Utc"])) + .or_else(|| meta.and_then(|v| json_str(v, &["completed_at_rfc3339_utc"]))) + .map(|s| s.to_string()); + let wall_seconds = summary.and_then(|v| json_u64(v, &["wallMs"])).unwrap_or(0) as f64 / 1000.0; + + let mut latest = LatestRunMetrics { + run_seq, + run_id, + run_dir: record.path.display().to_string(), + status: record.status.clone(), + sync_mode, + snapshot_reason, + started_at: started_at.clone(), + finished_at: finished_at.clone(), + start_timestamp_seconds: started_at.as_deref().and_then(parse_rfc3339_to_unix), + finish_timestamp_seconds: finished_at.as_deref().and_then(parse_rfc3339_to_unix), + wall_seconds, + user_cpu_seconds: summary.and_then(|v| json_f64(v, &["processMetrics", "userSeconds"])), + system_cpu_seconds: summary.and_then(|v| json_f64(v, &["processMetrics", "systemSeconds"])), + cpu_percent: summary.and_then(|v| json_f64(v, &["processMetrics", "cpuPercent"])), + max_rss_bytes: summary + .and_then(|v| json_u64(v, &["processMetrics", "maxRssKb"])) + .map(|kb| 
kb.saturating_mul(1024)), + exit_code: summary.and_then(|v| json_i64(v, &["exitCode"])), + ..LatestRunMetrics::default() + }; + + if let Some(summary) = summary { + latest.vrps = json_u64(summary, &["reportCounts", "vrps"]).unwrap_or(0); + latest.vaps = json_u64(summary, &["reportCounts", "aspas"]).unwrap_or(0); + latest.publication_points = + json_u64(summary, &["reportCounts", "publicationPoints"]).unwrap_or(0); + latest.warnings = json_u64(summary, &["reportCounts", "warnings"]).unwrap_or(0); + latest.tree_instances_processed = + json_u64(summary, &["reportCounts", "treeInstancesProcessed"]); + latest.tree_instances_failed = json_u64(summary, &["reportCounts", "treeInstancesFailed"]); + latest.stage_seconds = extract_stage_seconds(summary.get("stageTiming")); + latest.repo_sync_phase = + extract_count_duration_map(summary.pointer("/repoSyncStats/by_phase")); + latest.repo_terminal_state = + extract_count_duration_map(summary.pointer("/repoSyncStats/by_terminal_state")); + latest.download_events = json_u64(summary, &["stageTiming", "download_event_count"]); + latest.download_bytes = json_u64(summary, &["stageTiming", "download_bytes_total"]); + latest.artifact_sizes = extract_artifact_sizes(summary.get("artifacts")); + latest.state_path_sizes = extract_path_sizes(summary.get("pathStats")); + } + + parse_report(&record.path.join("report.json"), snapshot, &mut latest); + parse_cir(&record.path.join("input.cir"), snapshot); + parse_ccr(&record.path.join("result.ccr"), snapshot); + snapshot.latest_run = Some(latest); +} + +fn parse_report(path: &Path, snapshot: &mut MetricsSnapshot, latest: &mut LatestRunMetrics) { + if !path.exists() { + return; + } + let Ok(bytes) = fs::read(path) else { + snapshot + .service + .parse_errors + .push(format!("read report.json failed: {}", path.display())); + return; + }; + let Ok(report) = serde_json::from_slice::(&bytes) else { + snapshot + .service + .parse_errors + .push(format!("parse report.json failed: {}", path.display())); + 
return; + }; + if latest.vrps == 0 { + latest.vrps = report + .get("vrps") + .and_then(|v| v.as_array()) + .map(|a| a.len() as u64) + .unwrap_or(0); + } + if latest.vaps == 0 { + latest.vaps = report + .get("aspas") + .and_then(|v| v.as_array()) + .map(|a| a.len() as u64) + .unwrap_or(0); + } + latest.warnings = latest.warnings.max( + report + .pointer("/tree/warnings") + .and_then(|v| v.as_array()) + .map(|a| a.len() as u64) + .unwrap_or(0), + ); + if let Some(processed) = json_u64(&report, &["tree", "instances_processed"]) { + latest.tree_instances_processed = Some(processed); + } + if let Some(failed) = json_u64(&report, &["tree", "instances_failed"]) { + latest.tree_instances_failed = Some(failed); + } + if latest.repo_sync_phase.is_empty() { + latest.repo_sync_phase = + extract_count_duration_map(report.pointer("/repo_sync_stats/by_phase")); + } + if latest.repo_terminal_state.is_empty() { + latest.repo_terminal_state = + extract_count_duration_map(report.pointer("/repo_sync_stats/by_terminal_state")); + } + if let Some(pps) = report.get("publication_points").and_then(|v| v.as_array()) { + latest.publication_points = pps.len() as u64; + extract_publication_point_metrics(pps, snapshot); + } +} + +fn extract_publication_point_metrics(pps: &[Value], snapshot: &mut MetricsSnapshot) { + let mut repos: BTreeMap = BTreeMap::new(); + let mut pp_by_object_count = Vec::::new(); + let mut pp_by_sync_duration = Vec::::new(); + let mut large_pp_counts = BTreeMap::::new(); + let mut pp_sync_histograms = BTreeMap::::new(); + let mut object_counts = BTreeMap::<(String, String), u64>::new(); + + for pp in pps { + let pp_uri = json_str(pp, &["publication_point_rsync_uri"]) + .or_else(|| json_str(pp, &["manifest_rsync_uri"])) + .or_else(|| json_str(pp, &["rsync_base_uri"])) + .unwrap_or("unknown"); + let repo_uri = json_str(pp, &["rrdp_notification_uri"]) + .or_else(|| json_str(pp, &["rsync_base_uri"])) + .or_else(|| json_str(pp, &["publication_point_rsync_uri"])) + 
.unwrap_or(pp_uri); + let repo_id = short_sha256(repo_uri); + let pp_id = short_sha256(pp_uri); + let host = uri_host(repo_uri); + let transport = json_str(pp, &["repo_sync_source"]) + .or_else(|| json_str(pp, &["source"])) + .map(normalize_transport) + .unwrap_or_else(|| infer_transport(repo_uri)); + let duration_ms = json_u64(pp, &["repo_sync_duration_ms"]).unwrap_or(0); + let duration_seconds = duration_ms as f64 / 1000.0; + let phase = json_str(pp, &["repo_sync_phase"]) + .unwrap_or("unknown") + .to_string(); + let terminal_state = json_str(pp, &["repo_terminal_state"]) + .unwrap_or("unknown") + .to_string(); + let object_count = pp + .get("objects") + .and_then(|v| v.as_array()) + .map(|a| a.len() as u64) + .unwrap_or(0); + + let repo = repos.entry(repo_id.clone()).or_insert_with(|| RepoMetrics { + repo_id: repo_id.clone(), + uri: repo_uri.to_string(), + host: host.clone(), + transport: transport.clone(), + ..RepoMetrics::default() + }); + repo.publication_points += 1; + repo.duration_seconds_sum += duration_seconds; + repo.duration_seconds_max = repo.duration_seconds_max.max(duration_seconds); + *repo.phase_counts.entry(phase.clone()).or_default() += 1; + *repo + .terminal_state_counts + .entry(terminal_state.clone()) + .or_default() += 1; + + for threshold in LARGE_PP_OBJECT_THRESHOLDS { + if object_count > *threshold { + *large_pp_counts.entry(*threshold).or_default() += 1; + } + } + pp_sync_histograms + .entry(transport.clone()) + .or_insert_with(|| Histogram::new(PP_SYNC_SECONDS_BUCKETS)) + .observe(duration_seconds); + + if let Some(objects) = pp.get("objects").and_then(|v| v.as_array()) { + for object in objects { + let kind = json_str(object, &["kind"]).unwrap_or("unknown").to_string(); + let result = json_str(object, &["result"]) + .unwrap_or("unknown") + .to_string(); + *object_counts.entry((kind, result)).or_default() += 1; + } + } + + let top = TopPublicationPoint { + rank: 0, + pp_id, + repo_id, + uri: pp_uri.to_string(), + repo_uri: 
repo_uri.to_string(), + host, + transport, + object_count, + sync_duration_ms: duration_ms, + terminal_state, + phase, + }; + pp_by_object_count.push(top.clone()); + pp_by_sync_duration.push(top); + } + + let mut repo_stats = repos.into_values().collect::>(); + for repo in &mut repo_stats { + if repo.publication_points > 0 { + repo.duration_seconds_avg = repo.duration_seconds_sum / repo.publication_points as f64; + } + } + let mut top_repos = repo_stats + .iter() + .map(|repo| TopRepo { + rank: 0, + repo_id: repo.repo_id.clone(), + uri: repo.uri.clone(), + host: repo.host.clone(), + transport: repo.transport.clone(), + duration_ms_max: (repo.duration_seconds_max * 1000.0).round() as u64, + duration_ms_sum: (repo.duration_seconds_sum * 1000.0).round() as u64, + publication_points: repo.publication_points, + }) + .collect::>(); + top_repos.sort_by(|a, b| b.duration_ms_max.cmp(&a.duration_ms_max)); + top_repos.truncate(20); + for (index, item) in top_repos.iter_mut().enumerate() { + item.rank = index + 1; + } + + pp_by_object_count.sort_by(|a, b| b.object_count.cmp(&a.object_count)); + pp_by_object_count.truncate(20); + for (index, item) in pp_by_object_count.iter_mut().enumerate() { + item.rank = index + 1; + } + pp_by_sync_duration.sort_by(|a, b| b.sync_duration_ms.cmp(&a.sync_duration_ms)); + pp_by_sync_duration.truncate(20); + for (index, item) in pp_by_sync_duration.iter_mut().enumerate() { + item.rank = index + 1; + } + + snapshot.repo_stats = repo_stats; + snapshot.object_counts = object_counts; + snapshot.large_pp_counts = large_pp_counts; + snapshot.pp_sync_histograms = pp_sync_histograms; + snapshot.top_repos_by_sync_duration = top_repos; + snapshot.top_pp_by_object_count = pp_by_object_count; + snapshot.top_pp_by_sync_duration = pp_by_sync_duration; +} + +fn parse_cir(path: &Path, snapshot: &mut MetricsSnapshot) { + if !path.exists() { + return; + } + match fs::read(path) + .map_err(|e| e.to_string()) + .and_then(|bytes| decode_cir(&bytes).map_err(|e| 
e.to_string())) + { + Ok(cir) => { + let mut objects_by_type = BTreeMap::new(); + for object in &cir.objects { + *objects_by_type + .entry(object_type_from_uri(&object.rsync_uri)) + .or_default() += 1; + } + let mut rejected_objects_by_type = BTreeMap::new(); + for object in &cir.rejected_objects { + *rejected_objects_by_type + .entry(object_type_from_uri(&object.object_uri)) + .or_default() += 1; + } + snapshot.cir = Some(CirMetrics { + version: cir.version, + objects: cir.objects.len() as u64, + trust_anchors: cir.trust_anchors.len() as u64, + rejected_objects: cir.rejected_objects.len() as u64, + reject_list_sha256: hex::encode(&cir.reject_list_sha256), + objects_by_type, + rejected_objects_by_type, + }); + } + Err(err) => snapshot + .service + .parse_errors + .push(format!("decode CIR failed: {}: {err}", path.display())), + } +} + +fn parse_ccr(path: &Path, snapshot: &mut MetricsSnapshot) { + if !path.exists() { + return; + } + match fs::read(path) + .map_err(|e| e.to_string()) + .and_then(|bytes| decode_content_info(&bytes).map_err(|e| e.to_string())) + { + Ok(ccr) => { + let content = ccr.content; + let mut state_present = BTreeMap::new(); + let mut state_items = BTreeMap::new(); + let mut state_digests = BTreeMap::new(); + if let Some(state) = content.mfts.as_ref() { + state_present.insert("mfts".to_string(), true); + state_items.insert("mfts".to_string(), state.mis.len() as u64); + state_digests.insert("mfts".to_string(), hex::encode(&state.hash)); + } else { + state_present.insert("mfts".to_string(), false); + } + if let Some(state) = content.vrps.as_ref() { + state_present.insert("vrps".to_string(), true); + state_items.insert("vrps".to_string(), state.rps.len() as u64); + state_digests.insert("vrps".to_string(), hex::encode(&state.hash)); + } else { + state_present.insert("vrps".to_string(), false); + } + if let Some(state) = content.vaps.as_ref() { + state_present.insert("vaps".to_string(), true); + state_items.insert("vaps".to_string(), state.aps.len() 
as u64); + state_digests.insert("vaps".to_string(), hex::encode(&state.hash)); + } else { + state_present.insert("vaps".to_string(), false); + } + if let Some(state) = content.tas.as_ref() { + state_present.insert("tas".to_string(), true); + state_items.insert("tas".to_string(), state.skis.len() as u64); + state_digests.insert("tas".to_string(), hex::encode(&state.hash)); + } else { + state_present.insert("tas".to_string(), false); + } + if let Some(state) = content.rks.as_ref() { + state_present.insert("rks".to_string(), true); + state_items.insert("rks".to_string(), state.rksets.len() as u64); + state_digests.insert("rks".to_string(), hex::encode(&state.hash)); + } else { + state_present.insert("rks".to_string(), false); + } + snapshot.ccr = Some(CcrMetrics { + version: content.version, + state_present, + state_items, + state_digests, + }); + } + Err(err) => snapshot + .service + .parse_errors + .push(format!("decode CCR failed: {}: {err}", path.display())), + } +} + +fn render_metrics(snapshot: &MetricsSnapshot) -> String { + let mut out = String::new(); + let mut writer = PromWriter::new(&mut out); + let instance = snapshot.instance.as_str(); + + writer.gauge( + "ours_rp_metrics_service_up", + "Artifact metrics service is up", + &[label("instance", instance)], + 1.0, + ); + writer.gauge( + "ours_rp_metrics_service_last_scan_timestamp_seconds", + "Unix timestamp of the last artifact scan", + &[label("instance", instance)], + snapshot.service.last_scan_timestamp_seconds, + ); + writer.gauge( + "ours_rp_metrics_service_last_scan_duration_seconds", + "Duration of the last artifact scan", + &[label("instance", instance)], + snapshot.service.last_scan_duration_seconds, + ); + writer.gauge( + "ours_rp_metrics_service_last_reload_success", + "Whether the last artifact reload had no parse errors", + &[label("instance", instance)], + bool_value(snapshot.service.last_reload_success), + ); + writer.gauge( + "ours_rp_metrics_service_parse_errors", + "Current parse error 
count", + &[label("instance", instance)], + snapshot.service.parse_errors.len() as f64, + ); + writer.gauge( + "ours_rp_metrics_service_known_runs", + "Known run directories by status", + &[label("instance", instance), label("status", "success")], + snapshot.runs.success as f64, + ); + writer.gauge( + "ours_rp_metrics_service_known_runs", + "Known run directories by status", + &[label("instance", instance), label("status", "failed")], + snapshot.runs.failed as f64, + ); + writer.gauge( + "ours_rp_metrics_service_known_runs", + "Known run directories by status", + &[label("instance", instance), label("status", "partial")], + snapshot.runs.partial as f64, + ); + + writer.counter( + "ours_rp_run_completed_total", + "Completed runs observed by the artifact metrics service", + &[label("instance", instance), label("status", "success")], + snapshot.cumulative.completed_success_total as f64, + ); + writer.counter( + "ours_rp_run_completed_total", + "Completed runs observed by the artifact metrics service", + &[label("instance", instance), label("status", "failed")], + snapshot.cumulative.completed_failed_total as f64, + ); + writer.counter( + "ours_rp_run_observed_duration_seconds_sum", + "Observed wall duration sum for successful runs", + &[label("instance", instance)], + snapshot.cumulative.observed_duration_seconds_sum, + ); + writer.counter( + "ours_rp_run_observed_duration_seconds_count", + "Observed wall duration count for successful runs", + &[label("instance", instance)], + snapshot.cumulative.observed_duration_seconds_count as f64, + ); + writer.counter( + "ours_rp_run_observed_download_bytes_total", + "Observed download bytes across successful runs", + &[label("instance", instance)], + snapshot.cumulative.observed_download_bytes_total as f64, + ); + writer.gauge( + "ours_rp_run_consecutive_failures", + "Consecutive non-success runs at the end of the run list", + &[label("instance", instance)], + snapshot.runs.consecutive_failures as f64, + ); + + if let 
Some(latest) = snapshot.latest_run.as_ref() { + render_latest_metrics(&mut writer, instance, latest); + } + render_repo_metrics(&mut writer, instance, &snapshot.repo_stats); + render_failed_repo_metrics(&mut writer, instance, &snapshot.repo_stats); + render_top_repo_metrics(&mut writer, instance, &snapshot.top_repos_by_sync_duration); + render_object_metrics(&mut writer, instance, &snapshot.object_counts); + render_large_pp_metrics(&mut writer, instance, &snapshot.large_pp_counts); + render_top_publication_point_metrics( + &mut writer, + instance, + &snapshot.top_pp_by_object_count, + ); + for (transport, histogram) in &snapshot.pp_sync_histograms { + writer.histogram( + "ours_rp_publication_point_sync_duration_seconds", + "Distribution of sync duration per publication point", + &[label("instance", instance), label("transport", transport)], + histogram, + ); + } + if let Some(cir) = snapshot.cir.as_ref() { + render_cir_metrics(&mut writer, instance, cir); + } + if let Some(ccr) = snapshot.ccr.as_ref() { + render_ccr_metrics(&mut writer, instance, ccr); + } + out +} + +fn render_latest_metrics(writer: &mut PromWriter<'_>, instance: &str, latest: &LatestRunMetrics) { + writer.gauge( + "ours_rp_run_sequence", + "Latest successful run sequence", + &[label("instance", instance)], + latest.run_seq as f64, + ); + writer.gauge( + "ours_rp_run_success", + "Whether the latest selected run is successful", + &[label("instance", instance)], + bool_value(latest.status == "success"), + ); + writer.gauge( + "ours_rp_run_sync_mode", + "Latest run sync mode state", + &[ + label("instance", instance), + label("sync_mode", &latest.sync_mode), + ], + 1.0, + ); + if let Some(ts) = latest.start_timestamp_seconds { + writer.gauge( + "ours_rp_run_start_timestamp_seconds", + "Latest run start timestamp", + &[label("instance", instance)], + ts, + ); + } + if let Some(ts) = latest.finish_timestamp_seconds { + writer.gauge( + "ours_rp_run_finish_timestamp_seconds", + "Latest run finish 
timestamp", + &[label("instance", instance)], + ts, + ); + } + writer.gauge( + "ours_rp_run_duration_seconds", + "Latest run wall duration", + &[label("instance", instance)], + latest.wall_seconds, + ); + if let Some(value) = latest.user_cpu_seconds { + writer.gauge( + "ours_rp_run_user_cpu_seconds", + "Latest run user CPU seconds", + &[label("instance", instance)], + value, + ); + } + if let Some(value) = latest.system_cpu_seconds { + writer.gauge( + "ours_rp_run_system_cpu_seconds", + "Latest run system CPU seconds", + &[label("instance", instance)], + value, + ); + } + if let Some(value) = latest.cpu_percent { + writer.gauge( + "ours_rp_run_cpu_percent", + "Latest run CPU percent from GNU time", + &[label("instance", instance)], + value, + ); + } + if let Some(value) = latest.max_rss_bytes { + writer.gauge( + "ours_rp_run_max_rss_bytes", + "Latest run maximum resident set size", + &[label("instance", instance)], + value as f64, + ); + } + if let Some(value) = latest.exit_code { + writer.gauge( + "ours_rp_run_exit_code", + "Latest run exit code", + &[label("instance", instance)], + value as f64, + ); + } + writer.gauge( + "ours_rp_vrps", + "Latest run VRP count", + &[label("instance", instance), label("kind", "total")], + latest.vrps as f64, + ); + writer.gauge( + "ours_rp_vaps", + "Latest run VAP/ASPA count", + &[label("instance", instance), label("kind", "total")], + latest.vaps as f64, + ); + writer.gauge( + "ours_rp_publication_points", + "Latest run publication point count", + &[label("instance", instance)], + latest.publication_points as f64, + ); + writer.gauge( + "ours_rp_warnings", + "Latest run warning count", + &[label("instance", instance)], + latest.warnings as f64, + ); + if let Some(value) = latest.tree_instances_processed { + writer.gauge( + "ours_rp_tree_instances", + "Latest run tree instances by state", + &[label("instance", instance), label("state", "processed")], + value as f64, + ); + } + if let Some(value) = latest.tree_instances_failed { + 
writer.gauge( + "ours_rp_tree_instances", + "Latest run tree instances by state", + &[label("instance", instance), label("state", "failed")], + value as f64, + ); + } + for (stage, value) in &latest.stage_seconds { + writer.gauge( + "ours_rp_run_stage_duration_seconds", + "Latest run stage duration", + &[label("instance", instance), label("stage", stage)], + *value, + ); + } + for (phase, stat) in &latest.repo_sync_phase { + writer.gauge( + "ours_rp_repo_sync_phase_count", + "Publication points by repo sync phase", + &[label("instance", instance), label("phase", phase)], + stat.count as f64, + ); + writer.gauge( + "ours_rp_repo_sync_phase_duration_seconds_total", + "Repo sync phase cumulative duration in latest run", + &[label("instance", instance), label("phase", phase)], + stat.duration_seconds_total, + ); + } + for (state, stat) in &latest.repo_terminal_state { + writer.gauge( + "ours_rp_repo_terminal_state_count", + "Publication points by terminal state", + &[label("instance", instance), label("terminal_state", state)], + stat.count as f64, + ); + writer.gauge( + "ours_rp_repo_terminal_state_duration_seconds_total", + "Terminal state cumulative duration in latest run", + &[label("instance", instance), label("terminal_state", state)], + stat.duration_seconds_total, + ); + } + if let Some(value) = latest.download_events { + writer.gauge( + "ours_rp_download_events", + "Latest run download event count", + &[label("instance", instance)], + value as f64, + ); + } + if let Some(value) = latest.download_bytes { + writer.gauge( + "ours_rp_download_bytes", + "Latest run download bytes", + &[label("instance", instance)], + value as f64, + ); + } + for (artifact, size) in &latest.artifact_sizes { + writer.gauge( + "ours_rp_artifact_size_bytes", + "Latest run artifact size", + &[label("instance", instance), label("artifact", artifact)], + *size as f64, + ); + } + for (path, stat) in &latest.state_path_sizes { + writer.gauge( + "ours_rp_state_path_size_bytes", + "State path 
size", + &[label("instance", instance), label("path", path)], + stat.total_size_bytes as f64, + ); + writer.gauge( + "ours_rp_state_path_files", + "State path file count", + &[label("instance", instance), label("path", path)], + stat.file_count as f64, + ); + } +} + +fn render_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[RepoMetrics]) { + for repo in repos { + let base = [ + label("instance", instance), + label("repo_id", &repo.repo_id), + label("host", &repo.host), + label("uri", &repo.uri), + label("transport", &repo.transport), + ]; + writer.gauge("ours_rp_repository_info", "Repository metadata", &base, 1.0); + writer.gauge( + "ours_rp_repository_publication_points", + "Publication points per repository", + &base, + repo.publication_points as f64, + ); + for (stat, value) in [ + ("sum", repo.duration_seconds_sum), + ("max", repo.duration_seconds_max), + ("avg", repo.duration_seconds_avg), + ] { + let labels = [ + label("instance", instance), + label("repo_id", &repo.repo_id), + label("host", &repo.host), + label("transport", &repo.transport), + label("stat", stat), + ]; + writer.gauge( + "ours_rp_repository_sync_duration_seconds", + "Repository sync duration summary", + &labels, + value, + ); + } + for (phase, count) in &repo.phase_counts { + let labels = [ + label("instance", instance), + label("repo_id", &repo.repo_id), + label("host", &repo.host), + label("phase", phase), + ]; + writer.gauge( + "ours_rp_repository_sync_phase_publication_points", + "Repository publication points by sync phase", + &labels, + *count as f64, + ); + } + for (state, count) in &repo.terminal_state_counts { + let labels = [ + label("instance", instance), + label("repo_id", &repo.repo_id), + label("host", &repo.host), + label("terminal_state", state), + ]; + writer.gauge( + "ours_rp_repository_terminal_state_publication_points", + "Repository publication points by terminal state", + &labels, + *count as f64, + ); + } + } +} + +fn 
render_failed_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[RepoMetrics]) { + for repo in repos { + if repo + .phase_counts + .contains_key("rrdp_failed_rsync_failed") + { + writer.gauge( + "ours_rp_rrdp_rsync_failed_repository_duration_seconds", + "Repositories whose RRDP and rsync sync both failed; value is max sync duration when available", + &[ + label("instance", instance), + label("repo_id", &repo.repo_id), + label("host", &repo.host), + label("phase", "rrdp_failed_rsync_failed"), + label("transport", &repo.transport), + label("uri", &repo.uri), + ], + repo.duration_seconds_max, + ); + } + } +} + +fn render_top_repo_metrics(writer: &mut PromWriter<'_>, instance: &str, repos: &[TopRepo]) { + for repo in repos { + writer.gauge( + "ours_rp_top_repository_sync_duration_seconds", + "Top repositories by max sync duration in latest run", + &[ + label("instance", instance), + label("rank", &repo.rank.to_string()), + label("repo_id", &repo.repo_id), + label("host", &repo.host), + label("transport", &repo.transport), + label("publication_points", &repo.publication_points.to_string()), + label("uri", &repo.uri), + ], + repo.duration_ms_max as f64 / 1000.0, + ); + } +} + +fn render_object_metrics( + writer: &mut PromWriter<'_>, + instance: &str, + counts: &BTreeMap<(String, String), u64>, +) { + for ((object_type, result), count) in counts { + writer.gauge( + "ours_rp_objects", + "Latest run audited objects by type and result", + &[ + label("instance", instance), + label("object_type", object_type), + label("result", result), + ], + *count as f64, + ); + } +} + +fn render_top_publication_point_metrics( + writer: &mut PromWriter<'_>, + instance: &str, + publication_points: &[TopPublicationPoint], +) { + for publication_point in publication_points { + writer.gauge( + "ours_rp_top_publication_point_object_count", + "Top publication points by object count in latest run", + &[ + label("instance", instance), + label("rank", 
&publication_point.rank.to_string()), + label("pp_id", &publication_point.pp_id), + label("repo_id", &publication_point.repo_id), + label("host", &publication_point.host), + label("transport", &publication_point.transport), + label("terminal_state", &publication_point.terminal_state), + label("phase", &publication_point.phase), + label("uri", &publication_point.uri), + ], + publication_point.object_count as f64, + ); + } +} + +fn render_large_pp_metrics( + writer: &mut PromWriter<'_>, + instance: &str, + counts: &BTreeMap, +) { + for threshold in LARGE_PP_OBJECT_THRESHOLDS { + writer.gauge( + "ours_rp_large_publication_points", + "Publication points with object count greater than threshold", + &[ + label("instance", instance), + label("object_count_gt", &threshold.to_string()), + ], + counts.get(threshold).copied().unwrap_or(0) as f64, + ); + } +} + +fn render_cir_metrics(writer: &mut PromWriter<'_>, instance: &str, cir: &CirMetrics) { + writer.gauge( + "ours_rp_cir_version", + "CIR version", + &[label("instance", instance)], + cir.version as f64, + ); + writer.gauge( + "ours_rp_cir_objects", + "CIR object count", + &[label("instance", instance)], + cir.objects as f64, + ); + writer.gauge( + "ours_rp_cir_trust_anchors", + "CIR trust anchor count", + &[label("instance", instance)], + cir.trust_anchors as f64, + ); + writer.gauge( + "ours_rp_cir_rejected_objects", + "CIR rejected object count", + &[label("instance", instance)], + cir.rejected_objects as f64, + ); + writer.gauge( + "ours_rp_cir_reject_list_digest_present", + "CIR reject list digest is present", + &[label("instance", instance)], + if cir.reject_list_sha256.len() == 64 { + 1.0 + } else { + 0.0 + }, + ); + for (object_type, count) in &cir.objects_by_type { + writer.gauge( + "ours_rp_cir_objects_by_type", + "CIR object count by file type", + &[ + label("instance", instance), + label("object_type", object_type), + ], + *count as f64, + ); + } + for (object_type, count) in &cir.rejected_objects_by_type { + 
writer.gauge( + "ours_rp_cir_rejected_objects_by_type", + "CIR rejected object count by file type", + &[ + label("instance", instance), + label("object_type", object_type), + ], + *count as f64, + ); + } +} + +fn render_ccr_metrics(writer: &mut PromWriter<'_>, instance: &str, ccr: &CcrMetrics) { + writer.gauge( + "ours_rp_ccr_version", + "CCR version", + &[label("instance", instance)], + ccr.version as f64, + ); + for (state, present) in &ccr.state_present { + writer.gauge( + "ours_rp_ccr_state_present", + "CCR state presence", + &[label("instance", instance), label("state", state)], + bool_value(*present), + ); + } + for (state, count) in &ccr.state_items { + writer.gauge( + "ours_rp_ccr_state_items", + "CCR state item count", + &[label("instance", instance), label("state", state)], + *count as f64, + ); + } + for state in ccr.state_digests.keys() { + writer.gauge( + "ours_rp_ccr_state_digest_present", + "CCR state digest presence", + &[label("instance", instance), label("state", state)], + 1.0, + ); + } +} + +fn render_status_json(snapshot: &MetricsSnapshot) -> Result { + serde_json::to_string_pretty(&json!({ + "schemaVersion": 1, + "generatedBy": "rpki_artifact_metrics", + "instance": snapshot.instance, + "service": snapshot.service, + "runs": snapshot.runs, + "latestRun": snapshot.latest_run, + "cir": snapshot.cir, + "ccr": snapshot.ccr, + "topRepositoriesBySyncDuration": snapshot.top_repos_by_sync_duration, + "topPublicationPointsByObjectCount": snapshot.top_pp_by_object_count, + "topPublicationPointsBySyncDuration": snapshot.top_pp_by_sync_duration, + })) + .map_err(|e| e.to_string()) +} + +struct PromWriter<'a> { + out: &'a mut String, + emitted_headers: BTreeSet, +} + +#[derive(Clone, Debug)] +struct Label<'a> { + key: &'a str, + value: &'a str, +} + +fn label<'a>(key: &'a str, value: &'a str) -> Label<'a> { + Label { key, value } +} + +impl<'a> PromWriter<'a> { + fn new(out: &'a mut String) -> Self { + Self { + out, + emitted_headers: BTreeSet::new(), + } 
+ } + + fn gauge(&mut self, name: &str, help: &str, labels: &[Label<'_>], value: f64) { + self.metric("gauge", name, help, labels, value); + } + + fn counter(&mut self, name: &str, help: &str, labels: &[Label<'_>], value: f64) { + self.metric("counter", name, help, labels, value); + } + + fn metric( + &mut self, + metric_type: &str, + name: &str, + help: &str, + labels: &[Label<'_>], + value: f64, + ) { + self.header(name, help, metric_type); + self.out.push_str(name); + write_labels(self.out, labels); + self.out.push(' '); + self.out.push_str(&format_prom_value(value)); + self.out.push('\n'); + } + + fn histogram( + &mut self, + name: &str, + help: &str, + base_labels: &[Label<'_>], + histogram: &Histogram, + ) { + self.header(name, help, "histogram"); + let cumulative = histogram.cumulative_counts(); + for (index, count) in cumulative.iter().enumerate() { + let le = if index < histogram.buckets.len() { + format_prom_value(histogram.buckets[index]) + } else { + "+Inf".to_string() + }; + let mut labels = base_labels.to_vec(); + labels.push(label("le", &le)); + self.out.push_str(name); + self.out.push_str("_bucket"); + write_labels(self.out, &labels); + self.out.push(' '); + self.out.push_str(&count.to_string()); + self.out.push('\n'); + } + self.out.push_str(name); + self.out.push_str("_sum"); + write_labels(self.out, base_labels); + self.out.push(' '); + self.out.push_str(&format_prom_value(histogram.sum)); + self.out.push('\n'); + self.out.push_str(name); + self.out.push_str("_count"); + write_labels(self.out, base_labels); + self.out.push(' '); + self.out.push_str(&histogram.count.to_string()); + self.out.push('\n'); + } + + fn header(&mut self, name: &str, help: &str, metric_type: &str) { + if self.emitted_headers.insert(name.to_string()) { + self.out.push_str("# HELP "); + self.out.push_str(name); + self.out.push(' '); + self.out.push_str(&escape_help(help)); + self.out.push('\n'); + self.out.push_str("# TYPE "); + self.out.push_str(name); + self.out.push(' 
'); + self.out.push_str(metric_type); + self.out.push('\n'); + } + } +} + +fn write_labels(out: &mut String, labels: &[Label<'_>]) { + if labels.is_empty() { + return; + } + out.push('{'); + for (index, label) in labels.iter().enumerate() { + if index > 0 { + out.push(','); + } + out.push_str(label.key); + out.push_str("=\""); + out.push_str(&escape_label(label.value)); + out.push('"'); + } + out.push('}'); +} + +fn serve_http(listen: &str, shared: Arc>) -> Result<(), String> { + let listener = TcpListener::bind(listen).map_err(|e| format!("bind failed: {listen}: {e}"))?; + for stream in listener.incoming() { + match stream { + Ok(mut stream) => { + let snapshot = shared.read().expect("metrics lock poisoned").clone(); + if let Err(err) = handle_http_stream(&mut stream, &snapshot) { + eprintln!("http request failed: {err}"); + } + } + Err(err) => eprintln!("accept failed: {err}"), + } + } + Ok(()) +} + +fn handle_http_stream(stream: &mut TcpStream, snapshot: &MetricsSnapshot) -> Result<(), String> { + let mut buf = [0u8; 4096]; + let len = stream.read(&mut buf).map_err(|e| e.to_string())?; + let req = String::from_utf8_lossy(&buf[..len]); + let path = req + .lines() + .next() + .and_then(|line| line.split_whitespace().nth(1)) + .unwrap_or("/"); + match path { + "/metrics" => write_http_response( + stream, + "200 OK", + "text/plain; version=0.0.4", + &render_metrics(snapshot), + ), + "/status" => write_http_response( + stream, + "200 OK", + "application/json", + &render_status_json(snapshot)?, + ), + "/healthz" => write_http_response(stream, "200 OK", "text/plain", "ok\n"), + _ => write_http_response(stream, "404 Not Found", "text/plain", "not found\n"), + } +} + +fn write_http_response( + stream: &mut TcpStream, + status: &str, + content_type: &str, + body: &str, +) -> Result<(), String> { + let header = format!( + "HTTP/1.1 {status}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + body.as_bytes().len() + ); + stream + 
.write_all(header.as_bytes()) + .map_err(|e| e.to_string())?; + stream.write_all(body.as_bytes()).map_err(|e| e.to_string()) +} + +fn write_file(path: &Path, bytes: &[u8]) -> Result<(), String> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent) + .map_err(|e| format!("create parent failed: {}: {e}", parent.display()))?; + } + fs::write(path, bytes).map_err(|e| format!("write failed: {}: {e}", path.display())) +} + +fn extract_stage_seconds(value: Option<&Value>) -> BTreeMap { + let mut out = BTreeMap::new(); + let Some(value) = value else { + return out; + }; + let mapping = [ + ("validation_ms", "validation"), + ("report_build_ms", "report_build"), + ("report_write_ms", "report_write"), + ("ccr_build_ms", "ccr_build"), + ("ccr_write_ms", "ccr_write"), + ("compare_view_build_ms", "compare_view_build"), + ("compare_view_write_ms", "compare_view_write"), + ("cir_build_cir_ms", "cir_build"), + ("cir_write_cir_ms", "cir_write"), + ("cir_total_ms", "cir_total"), + ("total_ms", "total"), + ("repo_sync_ms_total", "repo_sync_total"), + ("rrdp_download_ms_total", "rrdp_download_total"), + ("rsync_download_ms_total", "rsync_download_total"), + ]; + for (field, stage) in mapping { + if let Some(ms) = json_u64(value, &[field]) { + out.insert(stage.to_string(), ms as f64 / 1000.0); + } + } + out +} + +fn extract_count_duration_map(value: Option<&Value>) -> BTreeMap { + let mut out = BTreeMap::new(); + let Some(object) = value.and_then(|v| v.as_object()) else { + return out; + }; + for (key, value) in object { + out.insert( + key.clone(), + CountDuration { + count: json_u64(value, &["count"]).unwrap_or(0), + duration_seconds_total: json_u64(value, &["duration_ms_total"]).unwrap_or(0) as f64 + / 1000.0, + }, + ); + } + out +} + +fn extract_artifact_sizes(value: Option<&Value>) -> BTreeMap { + let mut out = BTreeMap::new(); + for item in value.and_then(|v| v.as_array()).into_iter().flatten() { + let artifact = json_str(item, &["type"]) + .or_else(|| { + 
json_str(item, &["path"]) + .and_then(|path| Path::new(path).file_name().and_then(|name| name.to_str())) + }) + .unwrap_or("unknown"); + let size = json_u64(item, &["sizeBytes"]) + .or_else(|| json_u64(item, &["size"])) + .unwrap_or(0); + *out.entry(artifact.to_string()).or_default() += size; + } + out +} + +fn extract_path_sizes(value: Option<&Value>) -> BTreeMap { + let mut out = BTreeMap::new(); + for item in value.and_then(|v| v.as_array()).into_iter().flatten() { + let label = json_str(item, &["label"]).unwrap_or("unknown").to_string(); + out.insert( + label, + PathSize { + total_size_bytes: json_u64(item, &["totalSizeBytes"]).unwrap_or(0), + file_count: json_u64(item, &["fileCount"]).unwrap_or(0), + dir_count: json_u64(item, &["dirCount"]).unwrap_or(0), + }, + ); + } + out +} + +fn run_index_from_path(path: &Path) -> Option { + path.file_name() + .and_then(|name| name.to_str()) + .and_then(|name| name.strip_prefix("run_")) + .and_then(|value| value.parse::().ok()) +} + +fn json_str<'a>(value: &'a Value, path: &[&str]) -> Option<&'a str> { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current.as_str() +} + +fn json_u64(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current.as_u64() +} + +fn json_i64(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current.as_i64() +} + +fn json_f64(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current + .as_f64() + .or_else(|| current.as_u64().map(|v| v as f64)) +} + +fn parse_rfc3339_to_unix(value: &str) -> Option { + time::OffsetDateTime::parse(value, &time::format_description::well_known::Rfc3339) + .ok() + .map(|dt| dt.unix_timestamp() as f64) +} + +fn unix_now_seconds() -> f64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| 
d.as_secs_f64()) + .unwrap_or(0.0) +} + +fn bool_value(value: bool) -> f64 { + if value { 1.0 } else { 0.0 } +} + +fn normalize_transport(value: &str) -> String { + let lower = value.to_ascii_lowercase(); + if lower.contains("rrdp") || lower.contains("https") { + "rrdp".to_string() + } else if lower.contains("rsync") { + "rsync".to_string() + } else { + lower + } +} + +fn infer_transport(uri: &str) -> String { + if uri.starts_with("http://") || uri.starts_with("https://") { + "rrdp".to_string() + } else if uri.starts_with("rsync://") { + "rsync".to_string() + } else { + "unknown".to_string() + } +} + +fn uri_host(uri: &str) -> String { + let without_scheme = uri.split_once("://").map(|(_, rest)| rest).unwrap_or(uri); + without_scheme + .split('/') + .next() + .filter(|s| !s.is_empty()) + .unwrap_or("unknown") + .to_string() +} + +fn object_type_from_uri(uri: &str) -> String { + let lower = uri.to_ascii_lowercase(); + for (suffix, kind) in [ + (".mft", "manifest"), + (".crl", "crl"), + (".cer", "certificate"), + (".roa", "roa"), + (".asa", "aspa"), + (".gbr", "gbr"), + ] { + if lower.ends_with(suffix) { + return kind.to_string(); + } + } + "other".to_string() +} + +fn short_sha256(value: &str) -> String { + let digest = Sha256::digest(value.as_bytes()); + hex::encode(&digest[..6]) +} + +fn format_prom_value(value: f64) -> String { + if value.is_infinite() && value.is_sign_positive() { + "+Inf".to_string() + } else if value.fract() == 0.0 { + format!("{value:.0}") + } else { + format!("{value:.6}") + .trim_end_matches('0') + .trim_end_matches('.') + .to_string() + } +} + +fn escape_label(value: &str) -> String { + value + .replace('\\', "\\\\") + .replace('\n', "\\n") + .replace('"', "\\\"") +} + +fn escape_help(value: &str) -> String { + value.replace('\\', "\\\\").replace('\n', "\\n") +} + +#[cfg(test)] +mod tests { + use super::*; + use rpki::ccr::model::CCR_VERSION_V0; + use rpki::ccr::{ + CcrContentInfo, CcrDigestAlgorithm, RpkiCanonicalCacheRepresentation, 
TrustAnchorState, + encode_content_info, + }; + use rpki::cir::{ + CanonicalInputRepresentation, CirHashAlgorithm, CirObject, CirRejectedObject, + CirTrustAnchor, compute_reject_list_sha256, encode_cir, sha256, + }; + use tempfile::TempDir; + + #[test] + fn parse_args_accepts_once_outputs() { + let args = parse_args(&[ + "rpki_artifact_metrics".to_string(), + "--run-root".to_string(), + "root".to_string(), + "--once".to_string(), + "--out-metrics".to_string(), + "metrics.prom".to_string(), + "--out-status".to_string(), + "status.json".to_string(), + ]) + .expect("parse"); + assert!(args.once); + assert_eq!(args.run_root, PathBuf::from("root")); + assert_eq!(args.out_metrics.as_deref(), Some(Path::new("metrics.prom"))); + } + + #[test] + fn scan_fixture_exports_repo_pp_cir_and_ccr_metrics() { + let td = TempDir::new().expect("tempdir"); + let run = td.path().join("runs/run_0001"); + fs::create_dir_all(&run).expect("create run"); + fs::write( + run.join("run-meta.json"), + r#"{"status":"success","run_index":1,"run_id":"run_0001","sync_mode":"snapshot","snapshot_reason":"first_run","started_at_rfc3339_utc":"2026-05-25T00:00:00Z","completed_at_rfc3339_utc":"2026-05-25T00:00:10Z"}"#, + ) + .expect("meta"); + fs::write( + run.join("run-summary.json"), + 
r#"{"runSeq":1,"runId":"run_0001","runDir":"RUN","startedAtRfc3339Utc":"2026-05-25T00:00:00Z","finishedAtRfc3339Utc":"2026-05-25T00:00:10Z","wallMs":10000,"status":"success","exitCode":0,"processMetrics":{"userSeconds":2.5,"systemSeconds":1.5,"cpuPercent":40,"maxRssKb":1000},"stageTiming":{"validation_ms":7000,"total_ms":9000,"download_event_count":2,"download_bytes_total":1234},"reportCounts":{"vrps":2,"aspas":1,"publicationPoints":2,"warnings":0},"repoSyncStats":{"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total":3000}}},"pathStats":[{"label":"work-db","totalSizeBytes":99,"fileCount":2,"dirCount":1}],"artifacts":[{"path":"report.json","sizeBytes":10}]}"#, + ) + .expect("summary"); + fs::write(run.join("process-time.txt"), "time").expect("time"); + fs::write(run.join("stage-timing.json"), "{}").expect("stage"); + fs::write( + run.join("report.json"), + r#"{"tree":{"instances_processed":2,"instances_failed":0,"warnings":[]},"vrps":[{},{}],"aspas":[{}],"publication_points":[{"rsync_base_uri":"rsync://repo.example/a/","manifest_rsync_uri":"rsync://repo.example/a/a.mft","publication_point_rsync_uri":"rsync://repo.example/a/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":1000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"},{"kind":"manifest","result":"ok"}]},{"rsync_base_uri":"rsync://repo.example/b/","manifest_rsync_uri":"rsync://repo.example/b/b.mft","publication_point_rsync_uri":"rsync://repo.example/b/","rrdp_notification_uri":"https://repo.example/notify.xml","repo_sync_source":"rrdp","repo_sync_phase":"rrdp_delta","repo_sync_duration_ms":2000,"repo_terminal_state":"fresh","objects":[{"kind":"roa","result":"ok"}]}],"repo_sync_stats":{"publication_points_total":2,"by_phase":{"rrdp_delta":{"count":2,"duration_ms_total":3000}},"by_terminal_state":{"fresh":{"count":2,"duration_ms_total
":3000}}}}"#, + ) + .expect("report"); + fs::write(run.join("input.cir"), sample_cir()).expect("cir"); + fs::write(run.join("result.ccr"), sample_ccr()).expect("ccr"); + + let snapshot = scan_run_root(td.path(), "test").expect("scan"); + assert_eq!(snapshot.runs.success, 1); + assert_eq!(snapshot.repo_stats.len(), 1); + assert_eq!(snapshot.top_pp_by_object_count[0].object_count, 2); + assert_eq!(snapshot.cir.as_ref().unwrap().objects, 1); + assert_eq!(snapshot.ccr.as_ref().unwrap().state_items["tas"], 1); + let metrics = render_metrics(&snapshot); + assert!(metrics.contains("ours_rp_repository_info")); + assert!(metrics.contains("ours_rp_large_publication_points")); + assert!(metrics.contains("ours_rp_cir_objects")); + assert!(metrics.contains("ours_rp_ccr_state_items")); + let status = render_status_json(&snapshot).expect("status"); + assert!(status.contains("topPublicationPointsByObjectCount")); + } + + #[test] + fn partial_run_does_not_become_latest_success() { + let td = TempDir::new().expect("tempdir"); + let run = td.path().join("runs/run_0001"); + fs::create_dir_all(&run).expect("create run"); + fs::write(run.join("run-meta.json"), r#"{"status":"running"}"#).expect("meta"); + let snapshot = scan_run_root(td.path(), "test").expect("scan"); + assert_eq!(snapshot.runs.partial, 1); + assert!(snapshot.latest_run.is_none()); + } + + fn sample_cir() -> Vec { + let rejected = vec![CirRejectedObject { + object_uri: "rsync://repo.example/a/bad.roa".to_string(), + reason: Some("bad".to_string()), + }]; + let cir = CanonicalInputRepresentation { + version: rpki::cir::CIR_VERSION_V3, + hash_alg: CirHashAlgorithm::Sha256, + validation_time: time::OffsetDateTime::parse( + "2026-05-25T00:00:00Z", + &time::format_description::well_known::Rfc3339, + ) + .unwrap(), + objects: vec![CirObject { + rsync_uri: "rsync://repo.example/a/a.roa".to_string(), + sha256: vec![1; 32], + }], + trust_anchors: vec![CirTrustAnchor { + ta_rsync_uri: "rsync://repo.example/ta.cer".to_string(), + 
tal_uri: "https://tal.example/tal.tal".to_string(), + tal_bytes: b"rsync://repo.example/ta.cer\n\nAQID\n".to_vec(), + ta_certificate_der: b"ta".to_vec(), + ta_certificate_sha256: sha256(b"ta"), + }], + reject_list_sha256: compute_reject_list_sha256( + rejected.iter().map(|item| item.object_uri.as_str()), + ), + rejected_objects: rejected, + }; + encode_cir(&cir).expect("encode cir") + } + + fn sample_ccr() -> Vec { + let ci = CcrContentInfo::new(RpkiCanonicalCacheRepresentation { + version: CCR_VERSION_V0, + hash_alg: CcrDigestAlgorithm::Sha256, + produced_at: time::OffsetDateTime::parse( + "2026-05-25T00:00:00Z", + &time::format_description::well_known::Rfc3339, + ) + .unwrap(), + mfts: None, + vrps: None, + vaps: None, + tas: Some(TrustAnchorState { + skis: vec![vec![1; 20]], + hash: vec![2; 32], + }), + rks: None, + }); + encode_content_info(&ci).expect("encode ccr") + } +} diff --git a/src/cir/mod.rs b/src/cir/mod.rs index b5addb5..e4e62de 100644 --- a/src/cir/mod.rs +++ b/src/cir/mod.rs @@ -2,6 +2,7 @@ pub mod decode; pub mod encode; #[cfg(feature = "full")] pub mod export; +#[cfg(feature = "full")] pub mod materialize; pub mod model; pub mod sequence; @@ -15,6 +16,7 @@ pub use export::{ CirExportError, CirExportSummary, CirTrustAnchorBinding, build_cir_from_run, build_cir_from_run_multi, export_cir_from_run, export_cir_from_run_multi, write_cir_file, }; +#[cfg(feature = "full")] pub use materialize::{ CirMaterializeError, CirMaterializeSummary, materialize_cir, materialize_cir_from_raw_store, materialize_cir_from_repo_bytes, mirror_relative_path_for_rsync_uri, resolve_static_pool_file, diff --git a/src/lib.rs b/src/lib.rs index 6d4417a..16e8caf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ pub mod audit_downloads; pub mod audit_trace; #[cfg(feature = "full")] pub mod blob_store; +#[cfg(feature = "full")] pub mod cli; #[cfg(feature = "full")] pub mod current_repo_index; diff --git a/src/parallel/repo_runtime.rs b/src/parallel/repo_runtime.rs index 
00224ac..9584d01 100644 --- a/src/parallel/repo_runtime.rs +++ b/src/parallel/repo_runtime.rs @@ -589,6 +589,48 @@ mod tests { } } + struct FailRrdpThenFailRsyncExecutor { + rrdp_count: Arc, + rsync_count: Arc, + } + + impl RepoTransportExecutor for FailRrdpThenFailRsyncExecutor { + fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope { + match task.mode { + RepoTransportMode::Rrdp => { + self.rrdp_count.fetch_add(1, Ordering::SeqCst); + RepoTransportResultEnvelope { + dedup_key: task.dedup_key, + repo_identity: task.repo_identity, + mode: RepoTransportMode::Rrdp, + tal_id: task.tal_id, + rir_id: task.rir_id, + timing_ms: 10, + result: RepoTransportResultKind::Failed { + detail: "rrdp failed".to_string(), + warnings: vec![Warning::new("rrdp failed")], + }, + } + } + RepoTransportMode::Rsync => { + self.rsync_count.fetch_add(1, Ordering::SeqCst); + RepoTransportResultEnvelope { + dedup_key: task.dedup_key, + repo_identity: task.repo_identity, + mode: RepoTransportMode::Rsync, + tal_id: task.tal_id, + rir_id: task.rir_id, + timing_ms: 12, + result: RepoTransportResultKind::Failed { + detail: "rsync failed".to_string(), + warnings: vec![Warning::new("rsync failed")], + }, + } + } + } + } + } + #[test] fn phase1_runtime_waits_for_rrdp_transport_and_returns_rrdp_outcome() { let coordinator = GlobalRunCoordinator::new( @@ -852,6 +894,42 @@ mod tests { assert_eq!(rsync_count.load(Ordering::SeqCst), 1); } + #[test] + fn phase1_runtime_terminal_failure_keeps_rsync_failure_duration() { + let rrdp_count = Arc::new(AtomicUsize::new(0)); + let rsync_count = Arc::new(AtomicUsize::new(0)); + let coordinator = GlobalRunCoordinator::new( + ParallelPhase1Config::default(), + vec![TalInputSpec::from_url("https://example.test/arin.tal")], + ); + let pool = RepoTransportWorkerPool::new( + RepoWorkerPoolConfig { max_workers: 1 }, + FailRrdpThenFailRsyncExecutor { + rrdp_count: Arc::clone(&rrdp_count), + rsync_count: Arc::clone(&rsync_count), + }, + ) + 
.expect("pool"); + let runtime = Phase1RepoSyncRuntime::new( + coordinator, + pool, + Arc::new(|_base: &str| "rsync://example.test/module/".to_string()), + SyncPreference::RrdpThenRsync, + ); + + let outcome = runtime + .sync_publication_point_repo(&sample_ca("rsync://example.test/repo/root.mft")) + .expect("sync repo"); + assert!(!outcome.repo_sync_ok); + assert_eq!( + outcome.repo_sync_phase.as_deref(), + Some("rrdp_failed_rsync_failed") + ); + assert_eq!(outcome.repo_sync_duration_ms, 12); + assert_eq!(rrdp_count.load(Ordering::SeqCst), 1); + assert_eq!(rsync_count.load(Ordering::SeqCst), 1); + } + #[test] fn phase1_runtime_prefetch_submits_transport_task_before_consumption() { let rrdp_count = Arc::new(AtomicUsize::new(0)); diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index 2064dd5..dcd1e02 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -433,6 +433,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { } let repo_sync_started = std::time::Instant::now(); + let mut runtime_repo_sync_duration_ms = None; let (repo_sync_ok, repo_sync_err, repo_sync_source, repo_sync_phase): ( bool, Option, @@ -444,9 +445,10 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { repo_sync_err, repo_sync_source, repo_sync_phase, - repo_sync_duration_ms: _, + repo_sync_duration_ms, warnings: repo_warnings, } = runtime.sync_publication_point_repo(ca)?; + runtime_repo_sync_duration_ms = Some(repo_sync_duration_ms); warnings.extend(repo_warnings); ( repo_sync_ok, @@ -575,7 +577,11 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { } } }; - let repo_sync_duration_ms = repo_sync_started.elapsed().as_millis() as u64; + let repo_sync_duration_ms = effective_repo_sync_duration_ms( + repo_sync_started.elapsed().as_millis() as u64, + runtime_repo_sync_duration_ms, + repo_sync_ok, + ); crate::progress_log::emit( "publication_point_repo_sync_done", serde_json::json!({ @@ 
-1640,6 +1646,19 @@ fn repo_sync_source_label(source: crate::sync::repo::RepoSyncSource) -> &'static } } +fn effective_repo_sync_duration_ms( + elapsed_ms: u64, + runtime_reported_duration_ms: Option, + repo_sync_ok: bool, +) -> u64 { + if repo_sync_ok { + return elapsed_ms; + } + runtime_reported_duration_ms + .map(|runtime_ms| elapsed_ms.max(runtime_ms)) + .unwrap_or(elapsed_ms) +} + fn kind_from_vcir_artifact_kind(kind: VcirArtifactKind) -> AuditObjectKind { match kind { VcirArtifactKind::Mft => AuditObjectKind::Manifest, @@ -6463,6 +6482,15 @@ authorityKeyIdentifier = keyid:always assert!(audit.objects.is_empty()); } + #[test] + fn effective_repo_sync_duration_uses_runtime_duration_for_failures() { + assert_eq!(effective_repo_sync_duration_ms(0, Some(12), false), 12); + assert_eq!(effective_repo_sync_duration_ms(5, Some(12), false), 12); + assert_eq!(effective_repo_sync_duration_ms(20, Some(12), false), 20); + assert_eq!(effective_repo_sync_duration_ms(5, None, false), 5); + assert_eq!(effective_repo_sync_duration_ms(5, Some(12), true), 5); + } + #[test] fn reconstruct_snapshot_from_vcir_reports_missing_manifest_and_related_raw_bytes() { let now = time::OffsetDateTime::now_utc();