20260413_2 并行化架构优化第一阶段,apnic 串行98s->并行74s,传输层任务并行,同repo内发布点串行

This commit is contained in:
yuyr 2026-04-15 09:53:11 +08:00
parent af1c2c7f88
commit 585c41b83b
21 changed files with 5098 additions and 73 deletions

View File

@ -0,0 +1,156 @@
#!/usr/bin/env bash
# Run one "ours" (in-house rpki validator) APNIC round on a remote host over
# SSH, then copy the round artifacts back into the local run tree.
#
# Local outputs (under <run-root>/rounds/<round-id>/ours):
#   result.ccr, result.cir, report.json, run.log, round-result.json
#
# Fix vs. original: the remote exit status is captured explicitly so that
# artifacts (notably run.log and round-result.json) are rsync'ed back even
# when the remote run fails; the script then exits with the remote status.
set -euo pipefail

usage() {
cat <<'EOF'
Usage:
./scripts/periodic/run_apnic_ours_parallel_round_remote.sh \
--run-root <path> \
--round-id <round-XXX> \
--kind <snapshot|delta> \
--ssh-target <user@host> \
--remote-root <path> \
[--scheduled-at <RFC3339>] \
[--skip-sync]
EOF
}

# Repo root = two levels above this script's directory.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"

# Defaults; SSH_TARGET may also be supplied via the environment.
RUN_ROOT=""
ROUND_ID=""
KIND=""
SSH_TARGET="${SSH_TARGET:-root@47.77.183.68}"
REMOTE_ROOT=""
SCHEDULED_AT=""
SKIP_SYNC=0

while [[ $# -gt 0 ]]; do
  case "$1" in
    --run-root) RUN_ROOT="$2"; shift 2 ;;
    --round-id) ROUND_ID="$2"; shift 2 ;;
    --kind) KIND="$2"; shift 2 ;;
    --ssh-target) SSH_TARGET="$2"; shift 2 ;;
    --remote-root) REMOTE_ROOT="$2"; shift 2 ;;
    --scheduled-at) SCHEDULED_AT="$2"; shift 2 ;;
    --skip-sync) SKIP_SYNC=1; shift 1 ;;
    -h|--help) usage; exit 0 ;;
    *) echo "unknown argument: $1" >&2; usage; exit 2 ;;
  esac
done

[[ -n "$RUN_ROOT" && -n "$ROUND_ID" && -n "$KIND" && -n "$REMOTE_ROOT" ]] || { usage >&2; exit 2; }
[[ "$KIND" == "snapshot" || "$KIND" == "delta" ]] || { echo "--kind must be snapshot or delta" >&2; exit 2; }

LOCAL_OUT="$RUN_ROOT/rounds/$ROUND_ID/ours"
REMOTE_REPO="$REMOTE_ROOT/repo"
REMOTE_OUT="$REMOTE_ROOT/rounds/$ROUND_ID/ours"
REMOTE_WORK_DB="$REMOTE_ROOT/state/ours/work-db"
REMOTE_RAW_STORE="$REMOTE_ROOT/state/ours/raw-store.db"
mkdir -p "$LOCAL_OUT"

if [[ "$SKIP_SYNC" -eq 0 ]]; then
  # Mirror the repo (sources + TAL/TA fixtures) and the prebuilt release
  # binary to the remote host; target/ and .git/ are excluded to keep the
  # transfer small.
  ssh "$SSH_TARGET" "mkdir -p '$REMOTE_ROOT'"
  rsync -a --delete \
    --exclude target \
    --exclude .git \
    "$ROOT_DIR/" "$SSH_TARGET:$REMOTE_REPO/"
  ssh "$SSH_TARGET" "mkdir -p '$REMOTE_REPO/target/release' '$REMOTE_OUT' '$REMOTE_ROOT/state/ours'"
  rsync -a "$ROOT_DIR/target/release/rpki" "$SSH_TARGET:$REMOTE_REPO/target/release/"
else
  # Caller (e.g. the periodic driver) already synced; just ensure dirs exist.
  ssh "$SSH_TARGET" "mkdir -p '$REMOTE_OUT' '$REMOTE_ROOT/state/ours'"
fi

# Execute the round remotely. The outer heredoc is quoted, so the script runs
# verbatim on the remote host and reads its configuration from the env vars
# passed on the ssh command line. set +e around the ssh call lets us capture
# the remote exit status and still fetch logs afterwards.
set +e
ssh "$SSH_TARGET" \
  REMOTE_REPO="$REMOTE_REPO" \
  REMOTE_OUT="$REMOTE_OUT" \
  REMOTE_WORK_DB="$REMOTE_WORK_DB" \
  REMOTE_RAW_STORE="$REMOTE_RAW_STORE" \
  KIND="$KIND" \
  ROUND_ID="$ROUND_ID" \
  SCHEDULED_AT="$SCHEDULED_AT" \
  'bash -s' <<'EOS'
set -euo pipefail
cd "$REMOTE_REPO"
mkdir -p "$REMOTE_OUT"
# Snapshot rounds start from a clean slate; delta rounds reuse prior state.
if [[ "$KIND" == "snapshot" ]]; then
  rm -rf "$REMOTE_WORK_DB" "$REMOTE_RAW_STORE"
fi
mkdir -p "$(dirname "$REMOTE_WORK_DB")"
started_at_iso="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
started_at_ms="$(python3 - <<'PY'
import time
print(int(time.time() * 1000))
PY
)"
ccr_out="$REMOTE_OUT/result.ccr"
cir_out="$REMOTE_OUT/result.cir"
report_out="$REMOTE_OUT/report.json"
run_log="$REMOTE_OUT/run.log"
meta_out="$REMOTE_OUT/round-result.json"
# Run the validator; tolerate failure so the metadata record is still written.
set +e
env RPKI_PROGRESS_LOG=1 RPKI_PROGRESS_SLOW_SECS=0 target/release/rpki \
  --db "$REMOTE_WORK_DB" \
  --raw-store-db "$REMOTE_RAW_STORE" \
  --tal-path tests/fixtures/tal/apnic-rfc7730-https.tal \
  --ta-path tests/fixtures/ta/apnic-ta.cer \
  --parallel-phase1 \
  --ccr-out "$ccr_out" \
  --report-json "$report_out" \
  --cir-enable \
  --cir-out "$cir_out" \
  --cir-tal-uri "https://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer" \
  >"$run_log" 2>&1
exit_code=$?
set -e
finished_at_iso="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
finished_at_ms="$(python3 - <<'PY'
import time
print(int(time.time() * 1000))
PY
)"
# Write a machine-readable record of this round for the comparison driver.
python3 - <<'PY' "$meta_out" "$ROUND_ID" "$KIND" "$SCHEDULED_AT" "$started_at_iso" "$finished_at_iso" "$REMOTE_WORK_DB" "$REMOTE_RAW_STORE" "$exit_code" "$started_at_ms" "$finished_at_ms"
import json, sys
(
    path,
    round_id,
    kind,
    scheduled_at,
    started_at,
    finished_at,
    work_db,
    raw_store,
    exit_code,
    start_ms,
    end_ms,
) = sys.argv[1:]
with open(path, "w", encoding="utf-8") as fh:
    json.dump(
        {
            "roundId": round_id,
            "kind": kind,
            "scheduledAt": scheduled_at or None,
            "startedAt": started_at,
            "finishedAt": finished_at,
            "durationMs": int(end_ms) - int(start_ms),
            "remoteWorkDbPath": work_db,
            "remoteRawStoreDbPath": raw_store,
            "exitCode": int(exit_code),
        },
        fh,
        indent=2,
    )
PY
exit "$exit_code"
EOS
remote_status=$?
set -e

# Always copy the round artifacts back — even on failure — so run.log and
# round-result.json are available locally for debugging.
rsync -a "$SSH_TARGET:$REMOTE_OUT/" "$LOCAL_OUT/"

if [[ "$remote_status" -ne 0 ]]; then
  echo "remote round failed (exit $remote_status); artifacts in $LOCAL_OUT" >&2
  exit "$remote_status"
fi
echo "$LOCAL_OUT"

View File

@ -0,0 +1,280 @@
#!/usr/bin/env bash
# Periodic dual-RP comparison driver: for each round, runs our validator
# ("ours", --parallel-phase1) and rpki-client in parallel on a remote host
# against the APNIC TAL, fetches both outputs, compares the CCRs, and records
# per-round plus final summaries under <run-root>.
set -euo pipefail

usage() {
cat <<'EOF'
Usage:
./scripts/periodic/run_apnic_parallel_dual_rp_periodic_ccr_compare.sh \
--run-root <path> \
[--ssh-target <user@host>] \
[--remote-root <path>] \
[--rpki-client-bin <path>] \
[--round-count <n>] \
[--interval-secs <n>] \
[--start-at <RFC3339>] \
[--dry-run]
EOF
}

# Repo root = two levels above this script's directory.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"

# Defaults; SSH_TARGET / RPKI_CLIENT_BIN may also come from the environment.
RUN_ROOT=""
SSH_TARGET="${SSH_TARGET:-root@47.77.183.68}"
REMOTE_ROOT=""
RPKI_CLIENT_BIN="${RPKI_CLIENT_BIN:-/home/yuyr/dev/rpki-client-9.7/build-m5/src/rpki-client}"
ROUND_COUNT=10
INTERVAL_SECS=600
START_AT=""
DRY_RUN=0

# CLI flag parsing.
while [[ $# -gt 0 ]]; do
case "$1" in
--run-root) RUN_ROOT="$2"; shift 2 ;;
--ssh-target) SSH_TARGET="$2"; shift 2 ;;
--remote-root) REMOTE_ROOT="$2"; shift 2 ;;
--rpki-client-bin) RPKI_CLIENT_BIN="$2"; shift 2 ;;
--round-count) ROUND_COUNT="$2"; shift 2 ;;
--interval-secs) INTERVAL_SECS="$2"; shift 2 ;;
--start-at) START_AT="$2"; shift 2 ;;
--dry-run) DRY_RUN=1; shift 1 ;;
-h|--help) usage; exit 0 ;;
*) echo "unknown argument: $1" >&2; usage; exit 2 ;;
esac
done

# Validation; remote target and a runnable rpki-client are only required for
# real (non-dry-run) executions.
[[ -n "$RUN_ROOT" ]] || { usage >&2; exit 2; }
[[ "$ROUND_COUNT" =~ ^[0-9]+$ ]] || { echo "--round-count must be an integer" >&2; exit 2; }
[[ "$INTERVAL_SECS" =~ ^[0-9]+$ ]] || { echo "--interval-secs must be an integer" >&2; exit 2; }
if [[ "$DRY_RUN" -ne 1 ]]; then
[[ -n "$REMOTE_ROOT" ]] || { echo "--remote-root is required unless --dry-run" >&2; exit 2; }
[[ -x "$RPKI_CLIENT_BIN" ]] || { echo "rpki-client binary not executable: $RPKI_CLIENT_BIN" >&2; exit 2; }
fi
mkdir -p "$RUN_ROOT"

# Lay out the local run-directory skeleton: meta.json, per-round dirs with
# pending round-meta.json records, and a pending final-summary.json. Round 1
# is a full "snapshot"; subsequent rounds are "delta" runs, scheduled
# INTERVAL_SECS apart starting from --start-at (or now).
python3 - <<'PY' "$RUN_ROOT" "$SSH_TARGET" "$REMOTE_ROOT" "$ROUND_COUNT" "$INTERVAL_SECS" "$START_AT" "$DRY_RUN"
import json
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
run_root = Path(sys.argv[1]).resolve()
ssh_target = sys.argv[2]
remote_root = sys.argv[3]
round_count = int(sys.argv[4])
interval_secs = int(sys.argv[5])
start_at_arg = sys.argv[6]
dry_run = bool(int(sys.argv[7]))
def parse_rfc3339_utc(value: str) -> datetime:
    return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
def fmt(dt: datetime) -> str:
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
base_time = parse_rfc3339_utc(start_at_arg) if start_at_arg else datetime.now(timezone.utc)
(run_root / "rounds").mkdir(parents=True, exist_ok=True)
(run_root / "state" / "ours" / "work-db").mkdir(parents=True, exist_ok=True)
(run_root / "state" / "ours" / "raw-store.db").mkdir(parents=True, exist_ok=True)
(run_root / "state" / "rpki-client" / "cache").mkdir(parents=True, exist_ok=True)
(run_root / "state" / "rpki-client" / "out").mkdir(parents=True, exist_ok=True)
meta = {
    "version": 1,
    "rir": "apnic",
    "roundCount": round_count,
    "intervalSecs": interval_secs,
    "baseScheduledAt": fmt(base_time),
    "mode": "dry_run" if dry_run else "remote_periodic",
    "execution": {
        "mode": "remote",
        "sshTarget": ssh_target,
        "remoteRoot": remote_root or None,
    },
}
(run_root / "meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8")
rounds = []
for idx in range(round_count):
    round_id = f"round-{idx+1:03d}"
    kind = "snapshot" if idx == 0 else "delta"
    scheduled_at = base_time + timedelta(seconds=interval_secs * idx)
    round_dir = run_root / "rounds" / round_id
    for name in ("ours", "rpki-client", "compare"):
        (round_dir / name).mkdir(parents=True, exist_ok=True)
    round_meta = {
        "roundId": round_id,
        "kind": kind,
        "scheduledAt": fmt(scheduled_at),
        "status": "dry_run" if dry_run else "pending",
        "paths": {
            "ours": f"rounds/{round_id}/ours",
            "rpkiClient": f"rounds/{round_id}/rpki-client",
            "compare": f"rounds/{round_id}/compare",
        },
    }
    (round_dir / "round-meta.json").write_text(json.dumps(round_meta, indent=2), encoding="utf-8")
    rounds.append(round_meta)
final_summary = {
    "version": 1,
    "status": "dry_run" if dry_run else "pending",
    "roundCount": round_count,
    "allMatch": None,
    "rounds": rounds,
}
(run_root / "final-summary.json").write_text(json.dumps(final_summary, indent=2), encoding="utf-8")
PY

# Dry run: only the directory/metadata skeleton above is produced.
if [[ "$DRY_RUN" -eq 1 ]]; then
echo "$RUN_ROOT"
exit 0
fi

# Build the release binaries locally if either is missing.
if [[ ! -x "$ROOT_DIR/target/release/rpki" || ! -x "$ROOT_DIR/target/release/ccr_to_compare_views" ]]; then
(
cd "$ROOT_DIR"
cargo build --release --bin rpki --bin ccr_to_compare_views
)
fi

# One-time sync of repo, our binary, and the rpki-client binary to the remote
# host; the per-round scripts are then invoked with --skip-sync.
ssh "$SSH_TARGET" "mkdir -p '$REMOTE_ROOT'"
rsync -a --delete \
--exclude target \
--exclude .git \
"$ROOT_DIR/" "$SSH_TARGET:$REMOTE_ROOT/repo/"
ssh "$SSH_TARGET" "mkdir -p '$REMOTE_ROOT/repo/target/release' '$REMOTE_ROOT/bin' '$REMOTE_ROOT/rounds' '$REMOTE_ROOT/state/ours' '$REMOTE_ROOT/state/rpki-client'"
rsync -a "$ROOT_DIR/target/release/rpki" "$SSH_TARGET:$REMOTE_ROOT/repo/target/release/"
rsync -a "$RPKI_CLIENT_BIN" "$SSH_TARGET:$REMOTE_ROOT/bin/rpki-client"

# Main round loop: wait for the scheduled time, launch both RPs in parallel,
# fetch + compare outputs, update round metadata, and clean the remote round.
for idx in $(seq 1 "$ROUND_COUNT"); do
ROUND_ID="$(printf 'round-%03d' "$idx")"
ROUND_DIR="$RUN_ROOT/rounds/$ROUND_ID"
# Read this round's scheduled start time from its metadata record.
SCHEDULED_AT="$(python3 - <<'PY' "$ROUND_DIR/round-meta.json"
import json, sys
print(json.load(open(sys.argv[1], 'r', encoding='utf-8'))['scheduledAt'])
PY
)"
# Sleep until the scheduled time (no-op if it is already in the past).
python3 - <<'PY' "$SCHEDULED_AT"
from datetime import datetime, timezone
import sys, time
scheduled = datetime.fromisoformat(sys.argv[1].replace("Z", "+00:00")).astimezone(timezone.utc)
delay = (scheduled - datetime.now(timezone.utc)).total_seconds()
if delay > 0:
    time.sleep(delay)
PY
# Launch the "ours" round in the background.
"$ROOT_DIR/scripts/periodic/run_apnic_ours_parallel_round_remote.sh" \
--run-root "$RUN_ROOT" \
--round-id "$ROUND_ID" \
--kind "$(python3 - <<'PY' "$ROUND_DIR/round-meta.json"
import json, sys
print(json.load(open(sys.argv[1], 'r', encoding='utf-8'))['kind'])
PY
)" \
--ssh-target "$SSH_TARGET" \
--remote-root "$REMOTE_ROOT" \
--scheduled-at "$SCHEDULED_AT" \
--skip-sync &
OURS_PID=$!
# Launch the rpki-client round in parallel.
"$ROOT_DIR/scripts/periodic/run_apnic_rpki_client_round_remote.sh" \
--run-root "$RUN_ROOT" \
--round-id "$ROUND_ID" \
--kind "$(python3 - <<'PY' "$ROUND_DIR/round-meta.json"
import json, sys
print(json.load(open(sys.argv[1], 'r', encoding='utf-8'))['kind'])
PY
)" \
--ssh-target "$SSH_TARGET" \
--remote-root "$REMOTE_ROOT" \
--scheduled-at "$SCHEDULED_AT" \
--rpki-client-bin "$RPKI_CLIENT_BIN" \
--skip-sync &
CLIENT_PID=$!
# Collect both statuses without aborting the loop on failure.
set +e
wait "$OURS_PID"; OURS_STATUS=$?
wait "$CLIENT_PID"; CLIENT_STATUS=$?
set -e
# Fetch both sides' remote outputs locally (the round scripts may have
# aborted before their own fetch on failure).
rsync -az "$SSH_TARGET:$REMOTE_ROOT/rounds/$ROUND_ID/ours/" "$ROUND_DIR/ours/"
rsync -az "$SSH_TARGET:$REMOTE_ROOT/rounds/$ROUND_ID/rpki-client/" "$ROUND_DIR/rpki-client/"
# Compare the two CCRs only when both runs succeeded and produced output.
if [[ "$OURS_STATUS" -eq 0 && "$CLIENT_STATUS" -eq 0 \
&& -f "$ROUND_DIR/ours/result.ccr" && -f "$ROUND_DIR/rpki-client/result.ccr" ]]; then
"$ROOT_DIR/scripts/periodic/compare_ccr_round.sh" \
--ours-ccr "$ROUND_DIR/ours/result.ccr" \
--rpki-client-ccr "$ROUND_DIR/rpki-client/result.ccr" \
--out-dir "$ROUND_DIR/compare" \
--trust-anchor apnic >/dev/null
fi
# Merge both round-result.json records (and the compare summary, if present)
# back into this round's round-meta.json.
python3 - <<'PY' "$ROUND_DIR/round-meta.json" "$ROUND_DIR/ours/round-result.json" "$ROUND_DIR/rpki-client/round-result.json" "$ROUND_DIR/compare/compare-summary.json"
import json, sys
from datetime import datetime, timezone
from pathlib import Path
meta_path, ours_path, client_path, compare_path = sys.argv[1:]
meta = json.load(open(meta_path, 'r', encoding='utf-8'))
ours = json.load(open(ours_path, 'r', encoding='utf-8'))
client = json.load(open(client_path, 'r', encoding='utf-8'))
scheduled = datetime.fromisoformat(meta['scheduledAt'].replace('Z', '+00:00')).astimezone(timezone.utc)
started = [
    datetime.fromisoformat(v.replace('Z', '+00:00')).astimezone(timezone.utc)
    for v in [ours.get('startedAt'), client.get('startedAt')] if v
]
finished = [
    datetime.fromisoformat(v.replace('Z', '+00:00')).astimezone(timezone.utc)
    for v in [ours.get('finishedAt'), client.get('finishedAt')] if v
]
if started:
    start_at = min(started)
    meta['startedAt'] = start_at.strftime('%Y-%m-%dT%H:%M:%SZ')
    meta['startLagMs'] = max(int((start_at - scheduled).total_seconds() * 1000), 0)
if finished:
    finish_at = max(finished)
    meta['finishedAt'] = finish_at.strftime('%Y-%m-%dT%H:%M:%SZ')
meta['status'] = 'completed' if ours.get('exitCode') == 0 and client.get('exitCode') == 0 else 'failed'
meta['ours'] = {'exitCode': ours.get('exitCode'), 'durationMs': ours.get('durationMs')}
meta['rpkiClient'] = {'exitCode': client.get('exitCode'), 'durationMs': client.get('durationMs')}
if Path(compare_path).exists():
    compare = json.load(open(compare_path, 'r', encoding='utf-8'))
    meta['compare'] = {
        'allMatch': compare.get('allMatch'),
        'vrpMatch': compare.get('vrps', {}).get('match'),
        'vapMatch': compare.get('vaps', {}).get('match'),
        'oursVrps': compare.get('vrps', {}).get('ours'),
        'rpkiClientVrps': compare.get('vrps', {}).get('rpkiClient'),
        'oursVaps': compare.get('vaps', {}).get('ours'),
        'rpkiClientVaps': compare.get('vaps', {}).get('rpkiClient'),
    }
json.dump(meta, open(meta_path, 'w', encoding='utf-8'), indent=2)
PY
# Per-round remote outputs were fetched; free the remote disk space.
ssh "$SSH_TARGET" "rm -rf '$REMOTE_ROOT/rounds/$ROUND_ID'"
done

# Aggregate all round-meta.json records into the final summary; allMatch is
# true only when every round has a compare record with allMatch == true.
python3 - <<'PY' "$RUN_ROOT/final-summary.json" "$RUN_ROOT/rounds"
import json, sys
from pathlib import Path
summary_path = Path(sys.argv[1])
rounds_root = Path(sys.argv[2])
rounds = []
all_match = True
for round_dir in sorted(rounds_root.glob('round-*')):
    meta = json.load(open(round_dir / 'round-meta.json', 'r', encoding='utf-8'))
    rounds.append(meta)
    compare = meta.get('compare')
    if compare is None or compare.get('allMatch') is not True:
        all_match = False
summary = {
    'version': 1,
    'status': 'completed',
    'roundCount': len(rounds),
    'allMatch': all_match,
    'rounds': rounds,
}
json.dump(summary, open(summary_path, 'w', encoding='utf-8'), indent=2)
PY
echo "$RUN_ROOT"

View File

@ -0,0 +1,163 @@
#!/usr/bin/env bash
# Run one rpki-client APNIC round on a remote host over SSH and copy the round
# artifacts back into the local run tree.
#
# Local outputs (under <run-root>/rounds/<round-id>/rpki-client):
#   result.ccr (copied from rpki.ccr), openbgpd, run.log, round-result.json
#
# Fix vs. original: the remote exit status is captured explicitly so that
# artifacts (notably run.log and round-result.json) are rsync'ed back even
# when the remote run fails; the script then exits with the remote status.
set -euo pipefail

usage() {
cat <<'EOF'
Usage:
./scripts/periodic/run_apnic_rpki_client_round_remote.sh \
--run-root <path> \
--round-id <round-XXX> \
--kind <snapshot|delta> \
--ssh-target <user@host> \
--remote-root <path> \
[--scheduled-at <RFC3339>] \
[--rpki-client-bin <local path>] \
[--skip-sync]
EOF
}

# Repo root = two levels above this script's directory.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"

# Defaults; SSH_TARGET / RPKI_CLIENT_BIN may also come from the environment.
RUN_ROOT=""
ROUND_ID=""
KIND=""
SSH_TARGET="${SSH_TARGET:-root@47.77.183.68}"
REMOTE_ROOT=""
SCHEDULED_AT=""
RPKI_CLIENT_BIN="${RPKI_CLIENT_BIN:-/home/yuyr/dev/rpki-client-9.7/build-m5/src/rpki-client}"
SKIP_SYNC=0

while [[ $# -gt 0 ]]; do
  case "$1" in
    --run-root) RUN_ROOT="$2"; shift 2 ;;
    --round-id) ROUND_ID="$2"; shift 2 ;;
    --kind) KIND="$2"; shift 2 ;;
    --ssh-target) SSH_TARGET="$2"; shift 2 ;;
    --remote-root) REMOTE_ROOT="$2"; shift 2 ;;
    --scheduled-at) SCHEDULED_AT="$2"; shift 2 ;;
    --rpki-client-bin) RPKI_CLIENT_BIN="$2"; shift 2 ;;
    --skip-sync) SKIP_SYNC=1; shift 1 ;;
    -h|--help) usage; exit 0 ;;
    *) echo "unknown argument: $1" >&2; usage; exit 2 ;;
  esac
done

[[ -n "$RUN_ROOT" && -n "$ROUND_ID" && -n "$KIND" && -n "$REMOTE_ROOT" ]] || { usage >&2; exit 2; }
[[ "$KIND" == "snapshot" || "$KIND" == "delta" ]] || { echo "--kind must be snapshot or delta" >&2; exit 2; }
[[ -x "$RPKI_CLIENT_BIN" ]] || { echo "rpki-client binary not executable: $RPKI_CLIENT_BIN" >&2; exit 2; }

LOCAL_OUT="$RUN_ROOT/rounds/$ROUND_ID/rpki-client"
REMOTE_REPO="$REMOTE_ROOT/repo"
REMOTE_BIN_DIR="$REMOTE_ROOT/bin"
REMOTE_BIN="$REMOTE_BIN_DIR/rpki-client"
REMOTE_OUT="$REMOTE_ROOT/rounds/$ROUND_ID/rpki-client"
REMOTE_CACHE="$REMOTE_ROOT/state/rpki-client/cache"
REMOTE_STATE_OUT="$REMOTE_ROOT/state/rpki-client/out"
REMOTE_STATE_ROOT="$REMOTE_ROOT/state/rpki-client"
mkdir -p "$LOCAL_OUT"

if [[ "$SKIP_SYNC" -eq 0 ]]; then
  # Mirror the repo (for the TAL fixture) and ship the rpki-client binary.
  ssh "$SSH_TARGET" "mkdir -p '$REMOTE_ROOT'"
  rsync -a --delete \
    --exclude target \
    --exclude .git \
    "$ROOT_DIR/" "$SSH_TARGET:$REMOTE_REPO/"
  ssh "$SSH_TARGET" "mkdir -p '$REMOTE_BIN_DIR' '$REMOTE_OUT' '$REMOTE_STATE_ROOT'"
  rsync -a "$RPKI_CLIENT_BIN" "$SSH_TARGET:$REMOTE_BIN"
else
  # Caller (e.g. the periodic driver) already synced; just ensure dirs exist.
  ssh "$SSH_TARGET" "mkdir -p '$REMOTE_BIN_DIR' '$REMOTE_OUT' '$REMOTE_STATE_ROOT'"
fi

# Execute the round remotely. The outer heredoc is quoted, so the script runs
# verbatim on the remote host and reads its configuration from the env vars
# passed on the ssh command line. set +e around the ssh call lets us capture
# the remote exit status and still fetch logs afterwards.
set +e
ssh "$SSH_TARGET" \
  REMOTE_ROOT="$REMOTE_ROOT" \
  REMOTE_BIN="$REMOTE_BIN" \
  REMOTE_OUT="$REMOTE_OUT" \
  REMOTE_CACHE="$REMOTE_CACHE" \
  REMOTE_STATE_OUT="$REMOTE_STATE_OUT" \
  REMOTE_STATE_ROOT="$REMOTE_STATE_ROOT" \
  KIND="$KIND" \
  ROUND_ID="$ROUND_ID" \
  SCHEDULED_AT="$SCHEDULED_AT" \
  'bash -s' <<'EOS'
set -euo pipefail
cd "$REMOTE_ROOT"
mkdir -p "$REMOTE_OUT"
# Snapshot rounds start from an empty cache; delta rounds reuse it.
if [[ "$KIND" == "snapshot" ]]; then
  rm -rf "$REMOTE_CACHE" "$REMOTE_STATE_OUT" "$REMOTE_STATE_ROOT/ta" "$REMOTE_STATE_ROOT/.ta"
fi
mkdir -p "$REMOTE_CACHE" "$REMOTE_STATE_OUT"
# NOTE(review): 0777 presumably because rpki-client switches to an unprivileged
# user and needs writable cache/out dirs — confirm against deployment setup.
chmod 0777 "$REMOTE_STATE_ROOT" "$REMOTE_CACHE" "$REMOTE_STATE_OUT"
started_at_iso="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
started_at_ms="$(python3 - <<'PY'
import time
print(int(time.time() * 1000))
PY
)"
ccr_out="$REMOTE_OUT/result.ccr"
run_log="$REMOTE_OUT/run.log"
meta_out="$REMOTE_OUT/round-result.json"
# Run rpki-client from the state root (TAL path is relative to it); tolerate
# failure so the metadata record is still written.
set +e
(
  cd "$REMOTE_STATE_ROOT"
  "$REMOTE_BIN" -vv -t "../../repo/tests/fixtures/tal/apnic-rfc7730-https.tal" -d "cache" "out"
) >"$run_log" 2>&1
exit_code=$?
set -e
# Copy the outputs we compare on, when produced.
if [[ -f "$REMOTE_STATE_OUT/rpki.ccr" ]]; then
  cp "$REMOTE_STATE_OUT/rpki.ccr" "$ccr_out"
fi
if [[ -f "$REMOTE_STATE_OUT/openbgpd" ]]; then
  cp "$REMOTE_STATE_OUT/openbgpd" "$REMOTE_OUT/openbgpd"
fi
finished_at_iso="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
finished_at_ms="$(python3 - <<'PY'
import time
print(int(time.time() * 1000))
PY
)"
# Write a machine-readable record of this round for the comparison driver.
python3 - <<'PY' "$meta_out" "$ROUND_ID" "$KIND" "$SCHEDULED_AT" "$started_at_iso" "$finished_at_iso" "$REMOTE_CACHE" "$REMOTE_STATE_OUT" "$exit_code" "$started_at_ms" "$finished_at_ms"
import json, sys
(
    path,
    round_id,
    kind,
    scheduled_at,
    started_at,
    finished_at,
    cache_path,
    out_path,
    exit_code,
    start_ms,
    end_ms,
) = sys.argv[1:]
with open(path, "w", encoding="utf-8") as fh:
    json.dump(
        {
            "roundId": round_id,
            "kind": kind,
            "scheduledAt": scheduled_at or None,
            "startedAt": started_at,
            "finishedAt": finished_at,
            "durationMs": int(end_ms) - int(start_ms),
            "remoteCachePath": cache_path,
            "remoteOutPath": out_path,
            "exitCode": int(exit_code),
        },
        fh,
        indent=2,
    )
PY
exit "$exit_code"
EOS
remote_status=$?
set -e

# Always copy the round artifacts back — even on failure — so run.log and
# round-result.json are available locally for debugging.
rsync -a "$SSH_TARGET:$REMOTE_OUT/" "$LOCAL_OUT/"

if [[ "$remote_status" -ne 0 ]]; then
  echo "remote round failed (exit $remote_status); artifacts in $LOCAL_OUT" >&2
  exit "$remote_status"
fi
echo "$LOCAL_OUT"

View File

@ -4,25 +4,30 @@ use std::path::{Path, PathBuf};
use crate::analysis::timing::{TimingHandle, TimingMeta, TimingMetaUpdate};
use crate::audit::{
AspaOutput, AuditReportV2, AuditRepoSyncStats, AuditRunMeta, AuditWarning, TreeSummary,
VrpOutput, format_roa_ip_prefix,
format_roa_ip_prefix, AspaOutput, AuditRepoSyncStats, AuditReportV2, AuditRunMeta,
AuditWarning, TreeSummary, VrpOutput,
};
use crate::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use crate::fetch::rsync::LocalDirRsyncFetcher;
use crate::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher};
use crate::parallel::config::ParallelPhase1Config;
use crate::parallel::types::TalInputSpec;
use crate::policy::Policy;
use crate::storage::RocksStore;
use crate::validation::run_tree_from_tal::{
RunTreeFromTalAuditOutput, run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing,
run_tree_from_tal_and_ta_der_parallel_phase1_audit,
run_tree_from_tal_and_ta_der_serial_audit,
run_tree_from_tal_and_ta_der_serial_audit_with_timing, run_tree_from_tal_url_serial_audit,
run_tree_from_tal_url_serial_audit_with_timing,
run_tree_from_tal_url_parallel_phase1_audit,
run_tree_from_tal_url_serial_audit_with_timing, RunTreeFromTalAuditOutput,
};
use crate::validation::tree::TreeRunConfig;
use serde::Serialize;
use std::sync::Arc;
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
struct RunStageTiming {
@ -49,6 +54,10 @@ pub struct CliArgs {
pub tal_url: Option<String>,
pub tal_path: Option<PathBuf>,
pub ta_path: Option<PathBuf>,
pub parallel_phase1: bool,
pub parallel_tal_urls: Vec<String>,
pub parallel_phase1_config: Option<ParallelPhase1Config>,
pub tal_inputs: Vec<TalInputSpec>,
pub db_path: PathBuf,
pub raw_store_db: Option<PathBuf>,
@ -112,6 +121,14 @@ Options:
--tal-url <url> TAL URL (downloads TAL + TA over HTTPS)
--tal-path <path> TAL file path (offline-friendly; requires --ta-path)
--ta-path <path> TA certificate DER file path (offline-friendly)
--parallel-phase1 Enable Phase 1 parallel scheduler skeleton
--parallel-tal-url <url> Additional TAL URL for future multi-TAL runs (repeatable)
--parallel-max-repo-sync-workers-global <n>
Phase 1 global repo sync worker budget
--parallel-max-inflight-snapshot-bytes-global <n>
Phase 1 inflight snapshot byte budget
--parallel-max-pending-repo-results <n>
Phase 1 pending repo result budget
--rsync-local-dir <path> Use LocalDirRsyncFetcher rooted at this directory (offline tests)
--disable-rrdp Disable RRDP and synchronize only via rsync
@ -134,6 +151,10 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let mut tal_url: Option<String> = None;
let mut tal_path: Option<PathBuf> = None;
let mut ta_path: Option<PathBuf> = None;
let mut parallel_phase1: bool = false;
let mut parallel_tal_urls: Vec<String> = Vec::new();
let mut parallel_phase1_cfg = ParallelPhase1Config::default();
let mut parallel_phase1_cfg_overridden: bool = false;
let mut db_path: Option<PathBuf> = None;
let mut raw_store_db: Option<PathBuf> = None;
@ -184,6 +205,45 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let v = argv.get(i).ok_or("--ta-path requires a value")?;
ta_path = Some(PathBuf::from(v));
}
"--parallel-phase1" => {
parallel_phase1 = true;
}
"--parallel-tal-url" => {
i += 1;
let v = argv.get(i).ok_or("--parallel-tal-url requires a value")?;
parallel_tal_urls.push(v.clone());
}
"--parallel-max-repo-sync-workers-global" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-max-repo-sync-workers-global requires a value")?;
parallel_phase1_cfg.max_repo_sync_workers_global = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-max-repo-sync-workers-global: {v}"))?;
parallel_phase1_cfg_overridden = true;
}
"--parallel-max-inflight-snapshot-bytes-global" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-max-inflight-snapshot-bytes-global requires a value")?;
parallel_phase1_cfg.max_inflight_snapshot_bytes_global =
v.parse::<usize>().map_err(|_| {
format!("invalid --parallel-max-inflight-snapshot-bytes-global: {v}")
})?;
parallel_phase1_cfg_overridden = true;
}
"--parallel-max-pending-repo-results" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-max-pending-repo-results requires a value")?;
parallel_phase1_cfg.max_pending_repo_results = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-max-pending-repo-results: {v}"))?;
parallel_phase1_cfg_overridden = true;
}
"--db" => {
i += 1;
let v = argv.get(i).ok_or("--db requires a value")?;
@ -354,6 +414,12 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
usage()
));
}
if !parallel_phase1 && (!parallel_tal_urls.is_empty() || parallel_phase1_cfg_overridden) {
return Err(format!(
"--parallel-tal-url and --parallel-max-* options require --parallel-phase1\n\n{}",
usage()
));
}
if tal_path.is_some() && ta_path.is_none() && !disable_rrdp {
return Err(format!(
"--tal-path requires --ta-path unless --disable-rrdp is set\n\n{}",
@ -368,9 +434,7 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
));
}
if !cir_enabled
&& (cir_out_path.is_some()
|| cir_static_root.is_some()
|| cir_tal_uri.is_some())
&& (cir_out_path.is_some() || cir_static_root.is_some() || cir_tal_uri.is_some())
{
return Err(format!(
"--cir-out/--cir-static-root/--cir-tal-uri require --cir-enable\n\n{}",
@ -468,10 +532,29 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
}
}
let mut tal_inputs = Vec::new();
if let Some(url) = tal_url.as_ref() {
tal_inputs.push(TalInputSpec::from_url(url.clone()));
} else if let Some(path) = tal_path.as_ref() {
tal_inputs.push(TalInputSpec::from_file_path(path.clone()));
}
if parallel_phase1 {
tal_inputs.extend(
parallel_tal_urls
.iter()
.cloned()
.map(TalInputSpec::from_url),
);
}
Ok(CliArgs {
tal_url,
tal_path,
ta_path,
parallel_phase1,
parallel_tal_urls,
parallel_phase1_config: parallel_phase1.then_some(parallel_phase1_cfg),
tal_inputs,
db_path,
raw_store_db,
policy_path,
@ -653,10 +736,12 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.unwrap_or_else(time::OffsetDateTime::now_utc);
let store = if let Some(raw_store_db) = args.raw_store_db.as_ref() {
RocksStore::open_with_external_raw_store(&args.db_path, raw_store_db)
.map_err(|e| e.to_string())?
Arc::new(
RocksStore::open_with_external_raw_store(&args.db_path, raw_store_db)
.map_err(|e| e.to_string())?,
)
} else {
RocksStore::open(&args.db_path).map_err(|e| e.to_string())?
Arc::new(RocksStore::open(&args.db_path).map_err(|e| e.to_string())?)
};
let config = TreeRunConfig {
max_depth: args.max_depth,
@ -759,7 +844,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing(
&store,
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
@ -776,7 +861,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit(
&store,
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
@ -814,7 +899,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
&store,
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
@ -828,7 +913,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
&store,
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
@ -853,9 +938,23 @@ pub fn run(argv: &[String]) -> Result<(), String> {
args.ta_path.as_ref(),
) {
(Some(url), _, _) => {
if let Some((_, t)) = timing.as_ref() {
if args.parallel_phase1 {
run_tree_from_tal_url_parallel_phase1_audit(
Arc::clone(&store),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
} else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_url_serial_audit_with_timing(
&store,
store.as_ref(),
&policy,
url,
&http,
@ -867,7 +966,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_url_serial_audit(
&store,
store.as_ref(),
&policy,
url,
&http,
@ -883,9 +982,25 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
let ta_der = std::fs::read(ta_path)
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
if let Some((_, t)) = timing.as_ref() {
if args.parallel_phase1 {
run_tree_from_tal_and_ta_der_parallel_phase1_audit(
Arc::clone(&store),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
} else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_and_ta_der_serial_audit_with_timing(
&store,
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
@ -899,7 +1014,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_serial_audit(
&store,
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
@ -918,7 +1033,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
let tal_uri = args.cir_tal_uri.clone();
if let Some((_, t)) = timing.as_ref() {
crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit_with_timing(
&store,
store.as_ref(),
&policy,
&tal_bytes,
tal_uri,
@ -931,7 +1046,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| e.to_string())?
} else {
crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit(
&store,
store.as_ref(),
&policy,
&tal_bytes,
tal_uri,
@ -966,9 +1081,23 @@ pub fn run(argv: &[String]) -> Result<(), String> {
args.ta_path.as_ref(),
) {
(Some(url), _, _) => {
if let Some((_, t)) = timing.as_ref() {
if args.parallel_phase1 {
run_tree_from_tal_url_parallel_phase1_audit(
Arc::clone(&store),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
} else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_url_serial_audit_with_timing(
&store,
store.as_ref(),
&policy,
url,
&http,
@ -980,7 +1109,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_url_serial_audit(
&store,
store.as_ref(),
&policy,
url,
&http,
@ -996,9 +1125,25 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| format!("read tal failed: {}: {e}", tal_path.display()))?;
let ta_der = std::fs::read(ta_path)
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
if let Some((_, t)) = timing.as_ref() {
if args.parallel_phase1 {
run_tree_from_tal_and_ta_der_parallel_phase1_audit(
Arc::clone(&store),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
} else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_and_ta_der_serial_audit_with_timing(
&store,
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
@ -1012,7 +1157,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_serial_audit(
&store,
store.as_ref(),
&policy,
&tal_bytes,
&ta_der,
@ -1031,7 +1176,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
let tal_uri = args.cir_tal_uri.clone();
if let Some((_, t)) = timing.as_ref() {
crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit_with_timing(
&store,
store.as_ref(),
&policy,
&tal_bytes,
tal_uri,
@ -1044,7 +1189,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| e.to_string())?
} else {
crate::validation::run_tree_from_tal::run_tree_from_tal_bytes_serial_audit(
&store,
store.as_ref(),
&policy,
&tal_bytes,
tal_uri,
@ -1114,7 +1259,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
if let Some(path) = args.ccr_out_path.as_deref() {
let started = std::time::Instant::now();
let ccr = build_ccr_from_run(
&store,
store.as_ref(),
&[out.discovery.trust_anchor.clone()],
&out.tree.vrps,
&out.tree.aspas,
@ -1144,7 +1289,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.as_deref()
.expect("validated by parse_args for cir");
let summary = export_cir_from_run(
&store,
store.as_ref(),
&out.discovery.trust_anchor,
&cir_tal_uri,
validation_time,
@ -1285,6 +1430,7 @@ mod tests {
assert!(err.contains("Usage:"), "{err}");
assert!(err.contains("--db"), "{err}");
assert!(err.contains("--rsync-mirror-root"), "{err}");
assert!(err.contains("--parallel-phase1"), "{err}");
}
#[test]
@ -1568,6 +1714,55 @@ mod tests {
assert_eq!(args.tal_url.as_deref(), Some("https://example.test/x.tal"));
assert!(args.tal_path.is_none());
assert!(args.ta_path.is_none());
assert_eq!(args.tal_inputs.len(), 1);
assert_eq!(args.tal_inputs[0].tal_id, "x");
assert!(!args.parallel_phase1);
}
#[test]
fn parse_accepts_parallel_phase1_with_extra_tal_urls() {
    // A primary --tal-url plus two --parallel-tal-url entries under
    // --parallel-phase1, with one phase-1 budget overridden.
    let argv: Vec<String> = [
        "rpki",
        "--db",
        "db",
        "--tal-url",
        "https://example.test/arin.tal",
        "--parallel-phase1",
        "--parallel-tal-url",
        "https://example.test/apnic.tal",
        "--parallel-tal-url",
        "https://example.test/ripe.tal",
        "--parallel-max-repo-sync-workers-global",
        "8",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect();

    let parsed = parse_args(&argv).expect("parse");

    assert!(parsed.parallel_phase1);
    assert_eq!(parsed.parallel_tal_urls.len(), 2);
    // The primary TAL plus both extra TALs become inputs, in order.
    assert_eq!(parsed.tal_inputs.len(), 3);
    for (idx, expected) in ["arin", "apnic", "ripe"].iter().enumerate() {
        assert_eq!(parsed.tal_inputs[idx].tal_id, *expected);
    }
    // The overridden worker budget is carried into the phase-1 config.
    let workers = parsed
        .parallel_phase1_config
        .as_ref()
        .map(|cfg| cfg.max_repo_sync_workers_global);
    assert_eq!(workers, Some(8));
}
#[test]
fn parse_rejects_parallel_tal_flags_without_parallel_phase1() {
    // --parallel-tal-url is only meaningful in parallel mode; without
    // --parallel-phase1 parsing must fail and the error must name the flag.
    let argv = vec![
        "rpki".to_string(),
        "--db".to_string(),
        "db".to_string(),
        "--tal-url".to_string(),
        "https://example.test/arin.tal".to_string(),
        "--parallel-tal-url".to_string(),
        "https://example.test/apnic.tal".to_string(),
    ];
    let err = parse_args(&argv).unwrap_err();
    assert!(err.contains("require --parallel-phase1"), "{err}");
}
#[test]
@ -1892,11 +2087,9 @@ mod tests {
let tree = crate::validation::tree::TreeRunOutput {
instances_processed: 1,
instances_failed: 0,
warnings: vec![
crate::report::Warning::new("synthetic warning")
.with_rfc_refs(&[crate::report::RfcRef("RFC 6487 §4.8.8.1")])
.with_context("rsync://example.test/repo/pp/"),
],
warnings: vec![crate::report::Warning::new("synthetic warning")
.with_rfc_refs(&[crate::report::RfcRef("RFC 6487 §4.8.8.1")])
.with_context("rsync://example.test/repo/pp/")],
vrps: vec![crate::validation::objects::Vrp {
asn: 64496,
prefix: crate::data_model::roa::IpPrefix {
@ -1990,7 +2183,10 @@ mod tests {
assert_eq!(stats.by_phase["rrdp_ok"].count, 1);
assert_eq!(stats.by_phase["rrdp_ok"].duration_ms_total, 10);
assert_eq!(stats.by_phase["rrdp_failed_rsync_failed"].count, 2);
assert_eq!(stats.by_phase["rrdp_failed_rsync_failed"].duration_ms_total, 50);
assert_eq!(
stats.by_phase["rrdp_failed_rsync_failed"].duration_ms_total,
50
);
assert_eq!(stats.by_terminal_state["fresh"].count, 1);
assert_eq!(stats.by_terminal_state["failed_no_cache"].count, 2);
assert_eq!(

View File

@ -20,7 +20,7 @@ pub fn normalize_rsync_base_uri(s: &str) -> String {
///
/// v1: this is intentionally abstract so unit tests can use a mock, and later we can
/// back it by calling the system `rsync` binary (RFC 6481 §5; RFC 8182 §3.4.5).
pub trait RsyncFetcher {
pub trait RsyncFetcher: Send + Sync {
/// Return a list of objects as `(rsync_uri, bytes)` pairs.
fn fetch_objects(&self, rsync_base_uri: &str) -> RsyncFetchResult<Vec<(String, Vec<u8>)>>;
@ -56,6 +56,7 @@ pub trait RsyncFetcher {
///
/// This is primarily meant for offline tests and fixtures. The key generation mimics rsync URIs:
/// `rsync_base_uri` + relative path (with `/` separators).
#[derive(Clone, Debug)]
pub struct LocalDirRsyncFetcher {
    // Local directory whose contents stand in for the remote rsync tree.
    pub root_dir: PathBuf,
}

View File

@ -2,8 +2,6 @@ pub mod ccr;
pub mod cir;
pub mod data_model;
#[cfg(feature = "full")]
pub mod blob_store;
#[cfg(feature = "full")]
pub mod analysis;
#[cfg(feature = "full")]
@ -13,12 +11,16 @@ pub mod audit_downloads;
#[cfg(feature = "full")]
pub mod audit_trace;
#[cfg(feature = "full")]
pub mod blob_store;
#[cfg(feature = "full")]
pub mod bundle;
#[cfg(feature = "full")]
pub mod cli;
#[cfg(feature = "full")]
pub mod fetch;
#[cfg(feature = "full")]
pub mod parallel;
#[cfg(feature = "full")]
pub mod policy;
#[cfg(feature = "full")]
pub mod progress_log;

29
src/parallel/config.rs Normal file
View File

@ -0,0 +1,29 @@
/// Run-wide bounds for the parallel phase-1 (transport) pipeline.
///
/// All three limits are global across the run, not per-TAL.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ParallelPhase1Config {
    /// Maximum number of repo sync workers running concurrently.
    pub max_repo_sync_workers_global: usize,
    /// Cap on total in-flight snapshot bytes across the run.
    pub max_inflight_snapshot_bytes_global: usize,
    /// Upper bound on completed-but-unconsumed repo results.
    pub max_pending_repo_results: usize,
}
impl Default for ParallelPhase1Config {
fn default() -> Self {
Self {
max_repo_sync_workers_global: 4,
max_inflight_snapshot_bytes_global: 512 * 1024 * 1024,
max_pending_repo_results: 1024,
}
}
}
#[cfg(test)]
mod tests {
    use super::ParallelPhase1Config;

    /// The defaults must never disable a limit by setting it to zero.
    #[test]
    fn default_parallel_phase1_config_is_bounded() {
        let ParallelPhase1Config {
            max_repo_sync_workers_global,
            max_inflight_snapshot_bytes_global,
            max_pending_repo_results,
        } = ParallelPhase1Config::default();
        for limit in [
            max_repo_sync_workers_global,
            max_inflight_snapshot_bytes_global,
            max_pending_repo_results,
        ] {
            assert!(limit > 0);
        }
    }
}

7
src/parallel/mod.rs Normal file
View File

@ -0,0 +1,7 @@
// Building blocks of the parallel phase-1 pipeline.
pub mod config; // run-wide limits (workers, in-flight bytes, pending results)
pub mod repo_runtime; // RepoSyncRuntime impl bridging the tree walk to workers
pub mod repo_scheduler; // dedup/request tables and scheduling actions
pub mod repo_worker; // transport worker pool and executor trait
pub mod run_coordinator; // global mutable hub: tables, queues, stats
pub mod stats; // run-wide counters and gauges
pub mod types; // shared task/result/identity data types

View File

@ -0,0 +1,548 @@
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::parallel::repo_scheduler::TransportRequestAction;
use crate::parallel::repo_worker::{RepoTransportExecutor, RepoTransportWorkerPool};
use crate::parallel::run_coordinator::GlobalRunCoordinator;
use crate::parallel::types::{
RepoIdentity, RepoRuntimeState, RepoTransportMode, RepoTransportResultEnvelope,
RepoTransportResultKind, RepoRequester,
};
use crate::policy::SyncPreference;
use crate::report::Warning;
use crate::validation::tree::{CaInstanceHandle, DiscoveredChildCaInstance};
/// Outcome of syncing one publication-point repository, as reported back to
/// the validation walk.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoSyncRuntimeOutcome {
    /// True when some transport (RRDP or rsync) ultimately succeeded.
    pub repo_sync_ok: bool,
    /// Failure detail when `repo_sync_ok` is false.
    pub repo_sync_err: Option<String>,
    /// Transport that produced the data on success ("rrdp" / "rsync").
    pub repo_sync_source: Option<String>,
    /// Phase label for stats bucketing (e.g. "rrdp_ok", "rrdp_failed_rsync_ok").
    pub repo_sync_phase: Option<String>,
    /// Transport duration in milliseconds (copied from the result envelope).
    pub repo_sync_duration_ms: u64,
    /// Warnings accumulated while fetching this repository.
    pub warnings: Vec<Warning>,
}
/// Abstraction over "make this CA's publication point available locally".
///
/// `Send + Sync` so one runtime can be shared while transport work runs on
/// background workers.
pub trait RepoSyncRuntime: Send + Sync {
    /// Block until the repository backing `ca` is synced or has terminally
    /// failed, and return the outcome.
    fn sync_publication_point_repo(
        &self,
        ca: &CaInstanceHandle,
    ) -> Result<RepoSyncRuntimeOutcome, String>;
    /// Hint that `children` will be requested soon so their transports can
    /// start ahead of consumption.
    fn prefetch_discovered_children(
        &self,
        children: &[DiscoveredChildCaInstance],
    ) -> Result<(), String>;
}
/// `RepoSyncRuntime` implementation for the parallel phase-1 pipeline:
/// requests are deduplicated through a `GlobalRunCoordinator` and executed on
/// a `RepoTransportWorkerPool`.
pub struct Phase1RepoSyncRuntime<E: RepoTransportExecutor> {
    // Shared scheduling/dedup state; locked because the runtime is driven
    // behind `&self`.
    coordinator: Mutex<GlobalRunCoordinator>,
    // Pool executing transport tasks; likewise accessed behind `&self`.
    worker_pool: Mutex<RepoTransportWorkerPool<E>>,
    // Maps a repo's rsync base URI to the rsync scope URI used when
    // deduplicating rsync transports.
    rsync_scope_resolver: Arc<dyn Fn(&str) -> String + Send + Sync>,
    // RRDP-vs-rsync ordering policy applied to every transport request.
    sync_preference: SyncPreference,
}
impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
    /// Wrap an already-built coordinator and worker pool; mutexes make the
    /// runtime usable behind `&self` from the validation walk.
    pub fn new(
        coordinator: GlobalRunCoordinator,
        worker_pool: RepoTransportWorkerPool<E>,
        rsync_scope_resolver: Arc<dyn Fn(&str) -> String + Send + Sync>,
        sync_preference: SyncPreference,
    ) -> Self {
        Self {
            coordinator: Mutex::new(coordinator),
            worker_pool: Mutex::new(worker_pool),
            rsync_scope_resolver,
            sync_preference,
        }
    }
    /// Identify the requesting CA for dedup bookkeeping and progress logs.
    fn build_requester(ca: &CaInstanceHandle) -> RepoRequester {
        RepoRequester {
            tal_id: ca.tal_id.clone(),
            // NOTE(review): rir_id mirrors tal_id here — confirm this still
            // holds if TAL and RIR identifiers ever diverge.
            rir_id: ca.tal_id.clone(),
            parent_node_id: None,
            ca_instance_handle_id: format!("{}:{}", ca.tal_id, ca.manifest_rsync_uri),
            publication_point_rsync_uri: ca.publication_point_rsync_uri.clone(),
            manifest_rsync_uri: ca.manifest_rsync_uri.clone(),
        }
    }
    /// Repo identity (RRDP notification URI + rsync base URI) used for dedup.
    fn build_identity(ca: &CaInstanceHandle) -> RepoIdentity {
        RepoIdentity::new(ca.rrdp_notification_uri.clone(), ca.rsync_base_uri.clone())
    }
    /// Register a transport request for `ca`'s repository.
    ///
    /// Returns `Ok(Some(outcome))` when an earlier result for the same repo is
    /// reusable immediately; `Ok(None)` when the request was newly enqueued or
    /// is already in flight and the caller must wait for resolution.
    fn ensure_transport_requested(
        &self,
        ca: &CaInstanceHandle,
        priority: u8,
    ) -> Result<Option<RepoSyncRuntimeOutcome>, String> {
        let identity = Self::build_identity(ca);
        let requester = Self::build_requester(ca);
        let rsync_scope_uri = (self.rsync_scope_resolver)(&identity.rsync_base_uri);
        // Hold the coordinator lock only for the registration call itself.
        let action = {
            let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
            coordinator.register_transport_request(
                identity.clone(),
                requester,
                time::OffsetDateTime::now_utc(),
                priority,
                rsync_scope_uri,
                self.sync_preference,
            )
        };
        match action {
            TransportRequestAction::Enqueue(task) => {
                crate::progress_log::emit(
                    "phase1_repo_task_enqueued",
                    serde_json::json!({
                        "manifest_rsync_uri": ca.manifest_rsync_uri,
                        "publication_point_rsync_uri": ca.publication_point_rsync_uri,
                        "repo_key_rsync_base_uri": task.repo_identity.rsync_base_uri,
                        "repo_key_notification_uri": task.repo_identity.notification_uri,
                        "priority": priority,
                        "transport_mode": match task.mode {
                            RepoTransportMode::Rrdp => "rrdp",
                            RepoTransportMode::Rsync => "rsync",
                        },
                    }),
                );
                // Hand freshly queued work to the worker pool right away.
                self.drain_pending_transport_tasks()?;
                Ok(None)
            }
            TransportRequestAction::Waiting { state } => {
                crate::progress_log::emit(
                    "phase1_repo_task_waiting",
                    serde_json::json!({
                        "manifest_rsync_uri": ca.manifest_rsync_uri,
                        "publication_point_rsync_uri": ca.publication_point_rsync_uri,
                        "repo_key_rsync_base_uri": identity.rsync_base_uri,
                        "repo_key_notification_uri": identity.notification_uri,
                        "priority": priority,
                        "runtime_state": format!("{state:?}"),
                    }),
                );
                // Another requester already owns this repo's transport; wait.
                Ok(None)
            }
            TransportRequestAction::ReusedSuccess(result)
            | TransportRequestAction::ReusedTerminalFailure(result) => {
                crate::progress_log::emit(
                    "phase1_repo_task_reused",
                    serde_json::json!({
                        "manifest_rsync_uri": ca.manifest_rsync_uri,
                        "publication_point_rsync_uri": ca.publication_point_rsync_uri,
                        "repo_key_rsync_base_uri": identity.rsync_base_uri,
                        "repo_key_notification_uri": identity.notification_uri,
                        "priority": priority,
                        "transport_mode": match result.mode {
                            RepoTransportMode::Rrdp => "rrdp",
                            RepoTransportMode::Rsync => "rsync",
                        },
                    }),
                );
                // Translate the cached result with the current runtime state;
                // fall back to Init (maps to the unresolved outcome) when no
                // record exists.
                Ok(Some(outcome_from_transport_result(
                    &result,
                    self.runtime_state_for_identity(&identity)
                        .unwrap_or(RepoRuntimeState::Init),
                )))
            }
        }
    }
    /// Move every queued transport task from the coordinator into the worker
    /// pool, marking each as running before submission.
    fn drain_pending_transport_tasks(&self) -> Result<(), String> {
        loop {
            // Pop and mark under separate, short-lived lock scopes so the
            // coordinator is never held across a pool submission.
            let maybe_task = {
                let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
                coordinator.pop_next_transport_task()
            };
            let Some(task) = maybe_task else {
                break;
            };
            {
                let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
                coordinator.mark_transport_running(&task.dedup_key, time::OffsetDateTime::now_utc())?;
            }
            crate::progress_log::emit(
                "phase1_repo_task_dispatched",
                serde_json::json!({
                    "repo_key_rsync_base_uri": task.repo_identity.rsync_base_uri,
                    "repo_key_notification_uri": task.repo_identity.notification_uri,
                    "requester_count": task.requesters.len(),
                    "priority": task.priority,
                    "transport_mode": match task.mode {
                        RepoTransportMode::Rrdp => "rrdp",
                        RepoTransportMode::Rsync => "rsync",
                    },
                }),
            );
            let pool = self.worker_pool.lock().expect("worker pool lock poisoned");
            pool.submit(task)?;
        }
        Ok(())
    }
    /// Wait up to 50ms for one worker result and fold it back into the
    /// coordinator; a timeout is not an error — the caller simply retries.
    fn pump_one_transport_result(&self) -> Result<(), String> {
        let envelope = {
            let pool = self.worker_pool.lock().expect("worker pool lock poisoned");
            pool.recv_result_timeout(Duration::from_millis(50))?
        };
        let Some(envelope) = envelope else {
            return Ok(());
        };
        crate::progress_log::emit(
            "phase1_repo_task_result",
            serde_json::json!({
                "repo_key_rsync_base_uri": envelope.repo_identity.rsync_base_uri,
                "repo_key_notification_uri": envelope.repo_identity.notification_uri,
                "timing_ms": envelope.timing_ms,
                "transport_mode": match envelope.mode {
                    RepoTransportMode::Rrdp => "rrdp",
                    RepoTransportMode::Rsync => "rsync",
                },
                "result": match &envelope.result {
                    RepoTransportResultKind::Success { .. } => "success",
                    RepoTransportResultKind::Failed { .. } => "failed",
                },
            }),
        );
        let finished_at = time::OffsetDateTime::now_utc();
        let completion = {
            let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
            coordinator.complete_transport_result(envelope, finished_at)?
        };
        // Completion may yield follow-up tasks (logged here as rsync mode);
        // re-enqueue them with the coordinator before draining.
        if !completion.follow_up_tasks.is_empty() {
            let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
            for task in completion.follow_up_tasks {
                crate::progress_log::emit(
                    "phase1_repo_task_enqueued",
                    serde_json::json!({
                        "manifest_rsync_uri": serde_json::Value::Null,
                        "publication_point_rsync_uri": task.requesters.first().map(|r| r.publication_point_rsync_uri.clone()),
                        "repo_key_rsync_base_uri": task.repo_identity.rsync_base_uri,
                        "repo_key_notification_uri": task.repo_identity.notification_uri,
                        "priority": task.priority,
                        "transport_mode": "rsync",
                    }),
                );
                coordinator.push_transport_task(task);
            }
        }
        self.drain_pending_transport_tasks()?;
        Ok(())
    }
    /// Current runtime state recorded for `identity`, if any.
    fn runtime_state_for_identity(&self, identity: &RepoIdentity) -> Option<RepoRuntimeState> {
        let coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
        coordinator.runtime_record(identity).map(|record| record.state)
    }
    /// Outcome for `identity` once it has reached a resolved state (success
    /// or terminal failure); `None` while still pending.
    fn resolved_outcome_for_identity(
        &self,
        identity: &RepoIdentity,
    ) -> Option<RepoSyncRuntimeOutcome> {
        let coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
        let record = coordinator.runtime_record(identity)?;
        match record.state {
            RepoRuntimeState::RrdpOk | RepoRuntimeState::RsyncOk => record
                .last_success
                .as_ref()
                .map(|result| outcome_from_transport_result(result, record.state)),
            RepoRuntimeState::FailedTerminal => record
                .terminal_failure
                .as_ref()
                .map(|result| outcome_from_transport_result(result, record.state)),
            _ => None,
        }
    }
}
impl<E: RepoTransportExecutor> RepoSyncRuntime for Phase1RepoSyncRuntime<E> {
    /// Ensure a transport request exists for `ca`, then pump worker results
    /// until the repo reaches a resolved (success or terminal-failure) state.
    fn sync_publication_point_repo(
        &self,
        ca: &CaInstanceHandle,
    ) -> Result<RepoSyncRuntimeOutcome, String> {
        // Fast path: a prior request for the same repo already resolved.
        if let Some(done) = self.ensure_transport_requested(ca, 0)? {
            return Ok(done);
        }
        let identity = Self::build_identity(ca);
        // Each pump waits up to 50ms for a worker result, so this loop keeps
        // the pipeline moving until this repo's state resolves.
        loop {
            if let Some(done) = self.resolved_outcome_for_identity(&identity) {
                return Ok(done);
            }
            self.pump_one_transport_result()?;
        }
    }
    /// Kick off transports for discovered children at priority 1 (direct
    /// requests use 0); their outcomes are picked up later by
    /// `sync_publication_point_repo`.
    fn prefetch_discovered_children(
        &self,
        children: &[DiscoveredChildCaInstance],
    ) -> Result<(), String> {
        for child in children {
            // Ignore any immediately-reusable outcome; prefetch only starts work.
            let _ = self.ensure_transport_requested(&child.handle, 1)?;
        }
        Ok(())
    }
}
/// Translate a raw transport result plus the repo's runtime state into the
/// outcome shape consumed by the validation walk.
fn outcome_from_transport_result(
    envelope: &RepoTransportResultEnvelope,
    state: RepoRuntimeState,
) -> RepoSyncRuntimeOutcome {
    // Whether the repo advertises RRDP determines which phase label a
    // rsync-side result maps to.
    let has_notification_uri = envelope.repo_identity.notification_uri.is_some();
    let duration = envelope.timing_ms;
    match &envelope.result {
        RepoTransportResultKind::Success { source, warnings }
            if matches!(state, RepoRuntimeState::RrdpOk | RepoRuntimeState::RsyncOk) =>
        {
            let phase = if matches!(state, RepoRuntimeState::RrdpOk) {
                "rrdp_ok".to_string()
            } else if has_notification_uri {
                "rrdp_failed_rsync_ok".to_string()
            } else {
                "rsync_only_ok".to_string()
            };
            RepoSyncRuntimeOutcome {
                repo_sync_ok: true,
                repo_sync_err: None,
                repo_sync_source: Some(source.clone()),
                repo_sync_phase: Some(phase),
                repo_sync_duration_ms: duration,
                warnings: warnings.clone(),
            }
        }
        RepoTransportResultKind::Failed { detail, warnings }
            if matches!(state, RepoRuntimeState::FailedTerminal) =>
        {
            let phase = if has_notification_uri {
                "rrdp_failed_rsync_failed".to_string()
            } else {
                "rsync_failed".to_string()
            };
            RepoSyncRuntimeOutcome {
                repo_sync_ok: false,
                repo_sync_err: Some(detail.clone()),
                repo_sync_source: None,
                repo_sync_phase: Some(phase),
                repo_sync_duration_ms: duration,
                warnings: warnings.clone(),
            }
        }
        // Any other (result, state) pairing is an unexpected table state;
        // report it without guessing a transport phase.
        _ => RepoSyncRuntimeOutcome {
            repo_sync_ok: false,
            repo_sync_err: Some("repo runtime state unresolved".to_string()),
            repo_sync_source: None,
            repo_sync_phase: Some("repo_runtime_unresolved".to_string()),
            repo_sync_duration_ms: duration,
            warnings: Vec::new(),
        },
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};
    use crate::policy::SyncPreference;
    use crate::parallel::config::ParallelPhase1Config;
    use crate::parallel::repo_runtime::{Phase1RepoSyncRuntime, RepoSyncRuntime};
    use crate::parallel::repo_worker::{
        RepoTransportExecutor, RepoTransportWorkerPool, RepoWorkerPoolConfig,
    };
    use crate::parallel::run_coordinator::GlobalRunCoordinator;
    use crate::parallel::types::{
        RepoTransportMode, RepoTransportResultEnvelope, RepoTransportResultKind,
        RepoTransportTask, TalInputSpec,
    };
    use crate::report::Warning;
    use crate::validation::tree::{CaInstanceHandle, DiscoveredChildCaInstance};
    /// Minimal RRDP-capable CA handle used to drive the runtime in tests.
    fn sample_ca(manifest: &str) -> CaInstanceHandle {
        CaInstanceHandle {
            depth: 0,
            tal_id: "arin".to_string(),
            parent_manifest_rsync_uri: None,
            ca_certificate_der: vec![1, 2, 3],
            ca_certificate_rsync_uri: None,
            effective_ip_resources: None,
            effective_as_resources: None,
            rsync_base_uri: "rsync://example.test/repo/".to_string(),
            manifest_rsync_uri: manifest.to_string(),
            publication_point_rsync_uri: "rsync://example.test/repo/".to_string(),
            rrdp_notification_uri: Some("https://example.test/notify.xml".to_string()),
        }
    }
    /// Executor that succeeds on the first attempt for either transport mode.
    struct SuccessTransportExecutor;
    impl RepoTransportExecutor for SuccessTransportExecutor {
        fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope {
            RepoTransportResultEnvelope {
                dedup_key: task.dedup_key,
                repo_identity: task.repo_identity,
                mode: task.mode,
                tal_id: task.tal_id,
                rir_id: task.rir_id,
                timing_ms: 7,
                result: RepoTransportResultKind::Success {
                    source: match task.mode {
                        RepoTransportMode::Rrdp => "rrdp".to_string(),
                        RepoTransportMode::Rsync => "rsync".to_string(),
                    },
                    warnings: vec![Warning::new("transport ok")],
                },
            }
        }
    }
    /// Executor that fails RRDP and succeeds rsync, counting each attempt so
    /// tests can assert the fallback path ran exactly once.
    struct FailRrdpThenSucceedRsyncExecutor {
        rrdp_count: Arc<AtomicUsize>,
        rsync_count: Arc<AtomicUsize>,
    }
    impl RepoTransportExecutor for FailRrdpThenSucceedRsyncExecutor {
        fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope {
            match task.mode {
                RepoTransportMode::Rrdp => {
                    self.rrdp_count.fetch_add(1, Ordering::SeqCst);
                    RepoTransportResultEnvelope {
                        dedup_key: task.dedup_key,
                        repo_identity: task.repo_identity,
                        mode: RepoTransportMode::Rrdp,
                        tal_id: task.tal_id,
                        rir_id: task.rir_id,
                        timing_ms: 10,
                        result: RepoTransportResultKind::Failed {
                            detail: "rrdp failed".to_string(),
                            warnings: vec![Warning::new("rrdp failed")],
                        },
                    }
                }
                RepoTransportMode::Rsync => {
                    self.rsync_count.fetch_add(1, Ordering::SeqCst);
                    RepoTransportResultEnvelope {
                        dedup_key: task.dedup_key,
                        repo_identity: task.repo_identity,
                        mode: RepoTransportMode::Rsync,
                        tal_id: task.tal_id,
                        rir_id: task.rir_id,
                        timing_ms: 12,
                        result: RepoTransportResultKind::Success {
                            source: "rsync".to_string(),
                            warnings: vec![Warning::new("rsync ok")],
                        },
                    }
                }
            }
        }
    }
    /// Happy path: a successful RRDP transport yields an "rrdp_ok" outcome.
    #[test]
    fn phase1_runtime_waits_for_rrdp_transport_and_returns_rrdp_outcome() {
        let coordinator = GlobalRunCoordinator::new(
            ParallelPhase1Config::default(),
            vec![TalInputSpec::from_url("https://example.test/arin.tal")],
        );
        let pool = RepoTransportWorkerPool::new(
            RepoWorkerPoolConfig { max_workers: 1 },
            SuccessTransportExecutor,
        )
        .expect("pool");
        let runtime = Phase1RepoSyncRuntime::new(
            coordinator,
            pool,
            Arc::new(|base: &str| base.to_string()),
            SyncPreference::RrdpThenRsync,
        );
        let outcome = runtime
            .sync_publication_point_repo(&sample_ca("rsync://example.test/repo/root.mft"))
            .expect("sync repo");
        assert!(outcome.repo_sync_ok);
        assert_eq!(outcome.repo_sync_source.as_deref(), Some("rrdp"));
        assert_eq!(outcome.repo_sync_phase.as_deref(), Some("rrdp_ok"));
    }
    /// RRDP failure must fall back to rsync exactly once and report the
    /// "rrdp_failed_rsync_ok" phase.
    #[test]
    fn phase1_runtime_transitions_rrdp_failure_to_rsync_success() {
        let rrdp_count = Arc::new(AtomicUsize::new(0));
        let rsync_count = Arc::new(AtomicUsize::new(0));
        let coordinator = GlobalRunCoordinator::new(
            ParallelPhase1Config::default(),
            vec![TalInputSpec::from_url("https://example.test/arin.tal")],
        );
        let pool = RepoTransportWorkerPool::new(
            RepoWorkerPoolConfig { max_workers: 1 },
            FailRrdpThenSucceedRsyncExecutor {
                rrdp_count: Arc::clone(&rrdp_count),
                rsync_count: Arc::clone(&rsync_count),
            },
        )
        .expect("pool");
        let runtime = Phase1RepoSyncRuntime::new(
            coordinator,
            pool,
            Arc::new(|_base: &str| "rsync://example.test/module/".to_string()),
            SyncPreference::RrdpThenRsync,
        );
        let outcome = runtime
            .sync_publication_point_repo(&sample_ca("rsync://example.test/repo/root.mft"))
            .expect("sync repo");
        assert!(outcome.repo_sync_ok);
        assert_eq!(outcome.repo_sync_source.as_deref(), Some("rsync"));
        assert_eq!(outcome.repo_sync_phase.as_deref(), Some("rrdp_failed_rsync_ok"));
        assert_eq!(rrdp_count.load(Ordering::SeqCst), 1);
        assert_eq!(rsync_count.load(Ordering::SeqCst), 1);
    }
    /// Prefetch must start transport work in the background before the child
    /// repo is actually consumed by the walk.
    #[test]
    fn phase1_runtime_prefetch_submits_transport_task_before_consumption() {
        let rrdp_count = Arc::new(AtomicUsize::new(0));
        let rsync_count = Arc::new(AtomicUsize::new(0));
        let coordinator = GlobalRunCoordinator::new(
            ParallelPhase1Config::default(),
            vec![TalInputSpec::from_url("https://example.test/arin.tal")],
        );
        let pool = RepoTransportWorkerPool::new(
            RepoWorkerPoolConfig { max_workers: 1 },
            FailRrdpThenSucceedRsyncExecutor {
                rrdp_count: Arc::clone(&rrdp_count),
                rsync_count: Arc::clone(&rsync_count),
            },
        )
        .expect("pool");
        let runtime = Arc::new(Phase1RepoSyncRuntime::new(
            coordinator,
            pool,
            Arc::new(|_base: &str| "rsync://example.test/module/".to_string()),
            SyncPreference::RrdpThenRsync,
        ));
        let child = DiscoveredChildCaInstance {
            handle: sample_ca("rsync://example.test/repo/child.mft"),
            discovered_from: crate::audit::DiscoveredFrom {
                parent_manifest_rsync_uri: "rsync://example.test/repo/root.mft".to_string(),
                child_ca_certificate_rsync_uri: "rsync://example.test/repo/child.cer".to_string(),
                child_ca_certificate_sha256_hex: "00".repeat(32),
            },
        };
        runtime
            .prefetch_discovered_children(std::slice::from_ref(&child))
            .expect("prefetch");
        // Poll (bounded to 1s) until the background RRDP attempt is observed.
        let started = Instant::now();
        while rrdp_count.load(Ordering::SeqCst) == 0 && started.elapsed() < Duration::from_secs(1) {
            std::thread::sleep(Duration::from_millis(10));
        }
        assert_eq!(rrdp_count.load(Ordering::SeqCst), 1);
        let outcome = runtime
            .sync_publication_point_repo(&child.handle)
            .expect("sync child repo");
        assert!(outcome.repo_sync_ok);
        assert_eq!(rsync_count.load(Ordering::SeqCst), 1);
    }
}

File diff suppressed because it is too large Load Diff

1121
src/parallel/repo_worker.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,364 @@
use std::collections::VecDeque;
use crate::parallel::config::ParallelPhase1Config;
use crate::parallel::repo_scheduler::{
InFlightRepoTable, RepoCompletion, RepoRequestAction, TransportCompletion, TransportRequestAction,
TransportStateTables,
};
use crate::parallel::stats::ParallelRunStats;
use crate::parallel::types::{
RepoIdentity, RepoKey, RepoRequester, RepoSyncResultEnvelope, RepoSyncTask, RepoTransportResultEnvelope,
RepoTransportTask, TalInputSpec,
};
use crate::policy::SyncPreference;
/// Mutable hub for one parallel run: dedup tables, pending task queues, and
/// run-wide stats. Not internally synchronized; callers wrap it in a lock.
pub struct GlobalRunCoordinator {
    /// Limits applied to this run.
    pub config: ParallelPhase1Config,
    /// TALs participating in this run.
    pub tal_inputs: Vec<TalInputSpec>,
    /// Repo-level request/dedup table.
    pub in_flight_repos: InFlightRepoTable,
    /// Transport-level dedup and runtime-state tables.
    pub transport_tables: TransportStateTables,
    /// Repo sync tasks waiting to be dispatched.
    pub pending_repo_tasks: VecDeque<RepoSyncTask>,
    /// Transport tasks waiting to be dispatched.
    pub pending_transport_tasks: VecDeque<RepoTransportTask>,
    /// Counters/gauges for task totals, reuse, failures, queue depth.
    pub stats: ParallelRunStats,
}
impl GlobalRunCoordinator {
    /// Create an empty coordinator for the given config and TAL inputs.
    pub fn new(config: ParallelPhase1Config, tal_inputs: Vec<TalInputSpec>) -> Self {
        Self {
            config,
            tal_inputs,
            in_flight_repos: InFlightRepoTable::new(),
            transport_tables: TransportStateTables::new(),
            pending_repo_tasks: VecDeque::new(),
            pending_transport_tasks: VecDeque::new(),
            stats: ParallelRunStats::default(),
        }
    }
    /// Register a repo-level sync request: enqueue a task for a new repo,
    /// reuse a prior success/failure for a known one, or report Waiting while
    /// another requester's sync is in flight. Stats are updated accordingly.
    pub fn register_repo_request(
        &mut self,
        repo_key: RepoKey,
        requester: RepoRequester,
        validation_time: time::OffsetDateTime,
        sync_preference: SyncPreference,
        priority: u8,
    ) -> RepoRequestAction {
        let action = self.in_flight_repos.register_request(
            repo_key,
            requester,
            validation_time,
            sync_preference,
            priority,
        );
        match &action {
            RepoRequestAction::Enqueued(task) => {
                self.stats.repo_tasks_total += 1;
                self.pending_repo_tasks.push_back(task.clone());
                self.stats.repo_queue_depth = self.pending_repo_tasks.len();
            }
            RepoRequestAction::Reused(_) | RepoRequestAction::FailedReuse { .. } => {
                self.stats.repo_tasks_reused += 1;
            }
            RepoRequestAction::Waiting => {}
        }
        action
    }
    /// Pop the next pending repo task (FIFO), keeping the depth gauge current.
    pub fn pop_next_repo_task(&mut self) -> Option<RepoSyncTask> {
        let next = self.pending_repo_tasks.pop_front();
        self.stats.repo_queue_depth = self.pending_repo_tasks.len();
        next
    }
    /// Mark a repo task as running in the dedup table and running gauge.
    pub fn mark_repo_running(
        &mut self,
        repo_key: &RepoKey,
        started_at: time::OffsetDateTime,
    ) -> Result<(), String> {
        self.in_flight_repos.mark_running(repo_key, started_at)?;
        self.stats.repo_tasks_running += 1;
        Ok(())
    }
    /// Record a successful repo sync; releases waiters via the completion.
    pub fn complete_repo_success(
        &mut self,
        result: RepoSyncResultEnvelope,
        finished_at: time::OffsetDateTime,
    ) -> Result<RepoCompletion, String> {
        let repo_key = result.repo_key.clone();
        let completion = self
            .in_flight_repos
            .complete_success(&repo_key, result, finished_at)?;
        self.stats.repo_tasks_running = self.stats.repo_tasks_running.saturating_sub(1);
        Ok(completion)
    }
    /// Record a failed repo sync; bumps the failure counter as well.
    pub fn complete_repo_failure(
        &mut self,
        result: RepoSyncResultEnvelope,
        finished_at: time::OffsetDateTime,
    ) -> Result<RepoCompletion, String> {
        let repo_key = result.repo_key.clone();
        let completion = self
            .in_flight_repos
            .complete_failure(&repo_key, result, finished_at)?;
        self.stats.repo_tasks_running = self.stats.repo_tasks_running.saturating_sub(1);
        self.stats.repo_tasks_failed += 1;
        Ok(completion)
    }
    /// Register a transport-level request; mirrors `register_repo_request`
    /// but keyed by repo identity and backed by the transport tables.
    pub fn register_transport_request(
        &mut self,
        identity: RepoIdentity,
        requester: RepoRequester,
        validation_time: time::OffsetDateTime,
        priority: u8,
        rsync_scope_uri: String,
        sync_preference: SyncPreference,
    ) -> TransportRequestAction {
        let action = self.transport_tables.register_transport_request(
            identity,
            requester,
            validation_time,
            priority,
            rsync_scope_uri,
            sync_preference,
        );
        match &action {
            TransportRequestAction::Enqueue(task) => {
                self.stats.repo_tasks_total += 1;
                self.pending_transport_tasks.push_back(task.clone());
                self.stats.repo_queue_depth = self.pending_transport_tasks.len();
            }
            TransportRequestAction::ReusedSuccess(_)
            | TransportRequestAction::ReusedTerminalFailure(_) => {
                self.stats.repo_tasks_reused += 1;
            }
            TransportRequestAction::Waiting { .. } => {}
        }
        action
    }
    /// Enqueue a transport task directly (e.g. a follow-up after completion).
    pub fn push_transport_task(&mut self, task: RepoTransportTask) {
        self.stats.repo_tasks_total += 1;
        self.pending_transport_tasks.push_back(task);
        self.stats.repo_queue_depth = self.pending_transport_tasks.len();
    }
    /// Pop the next pending transport task (FIFO), updating the depth gauge.
    pub fn pop_next_transport_task(&mut self) -> Option<RepoTransportTask> {
        let next = self.pending_transport_tasks.pop_front();
        self.stats.repo_queue_depth = self.pending_transport_tasks.len();
        next
    }
    /// Mark a transport task as running in the tables and running gauge.
    pub fn mark_transport_running(
        &mut self,
        dedup_key: &crate::parallel::types::RepoDedupKey,
        started_at: time::OffsetDateTime,
    ) -> Result<(), String> {
        self.transport_tables.mark_transport_running(dedup_key, started_at)?;
        self.stats.repo_tasks_running += 1;
        Ok(())
    }
    /// Fold a worker result back into the transport tables.
    ///
    /// A failed *rsync* result counts as a task failure (a failed RRDP is not
    /// terminal — it can still fall back to rsync).
    pub fn complete_transport_result(
        &mut self,
        result: RepoTransportResultEnvelope,
        finished_at: time::OffsetDateTime,
    ) -> Result<TransportCompletion, String> {
        // Inspect the envelope before handing it to the tables so we do not
        // have to clone the whole result (warnings included) just for stats.
        let failed_rsync = matches!(
            result.result,
            crate::parallel::types::RepoTransportResultKind::Failed { .. }
        ) && result.mode == crate::parallel::types::RepoTransportMode::Rsync;
        let completion = self
            .transport_tables
            .complete_transport_result(result, finished_at)?;
        self.stats.repo_tasks_running = self.stats.repo_tasks_running.saturating_sub(1);
        if failed_rsync {
            self.stats.repo_tasks_failed += 1;
        }
        Ok(completion)
    }
    /// Read-only view of the runtime record for a repo identity, if any.
    pub fn runtime_record(
        &self,
        identity: &RepoIdentity,
    ) -> Option<&crate::parallel::repo_scheduler::RepoRuntimeRecord> {
        self.transport_tables.runtime_record(identity)
    }
}
#[cfg(test)]
mod tests {
    use crate::parallel::config::ParallelPhase1Config;
    use crate::parallel::repo_scheduler::RepoRequestAction;
    use crate::parallel::run_coordinator::GlobalRunCoordinator;
    use crate::parallel::types::{
        RepoKey, RepoRequester, RepoSyncResultEnvelope, RepoSyncResultKind, RepoSyncResultRef,
        TalInputSpec,
    };
    use crate::policy::SyncPreference;
    /// Build a requester for a fixed publication point under varying ids.
    fn requester(tal_id: &str, rir_id: &str, manifest: &str) -> RepoRequester {
        RepoRequester {
            tal_id: tal_id.to_string(),
            rir_id: rir_id.to_string(),
            parent_node_id: None,
            ca_instance_handle_id: format!("{tal_id}:{manifest}"),
            publication_point_rsync_uri: "rsync://example.test/repo/".to_string(),
            manifest_rsync_uri: manifest.to_string(),
        }
    }
    /// A fresh coordinator keeps its TAL inputs and starts with zeroed stats.
    #[test]
    fn coordinator_holds_tal_inputs_and_default_stats() {
        let coordinator = GlobalRunCoordinator::new(
            ParallelPhase1Config::default(),
            vec![TalInputSpec::from_url("https://example.test/arin.tal")],
        );
        assert_eq!(coordinator.tal_inputs.len(), 1);
        assert_eq!(coordinator.tal_inputs[0].tal_id, "arin");
        assert_eq!(coordinator.stats.repo_tasks_total, 0);
        assert!(coordinator.in_flight_repos.is_empty());
    }
    /// The first request for a repo enqueues exactly one task and the queue
    /// depth gauge follows the push/pop.
    #[test]
    fn coordinator_enqueues_new_repo_task_once() {
        let mut coordinator = GlobalRunCoordinator::new(
            ParallelPhase1Config::default(),
            vec![TalInputSpec::from_url("https://example.test/arin.tal")],
        );
        let key = RepoKey::new("rsync://example.test/repo/", None);
        let action = coordinator.register_repo_request(
            key.clone(),
            requester("arin", "arin", "rsync://example.test/repo/root.mft"),
            time::OffsetDateTime::UNIX_EPOCH,
            SyncPreference::RrdpThenRsync,
            7,
        );
        let task = match action {
            RepoRequestAction::Enqueued(task) => task,
            other => panic!("expected enqueue, got {other:?}"),
        };
        assert_eq!(task.priority, 7);
        assert_eq!(coordinator.stats.repo_tasks_total, 1);
        assert_eq!(coordinator.stats.repo_queue_depth, 1);
        let popped = coordinator.pop_next_repo_task().expect("task queued");
        assert_eq!(popped.repo_key, key);
        assert_eq!(coordinator.stats.repo_queue_depth, 0);
    }
    /// While a repo sync runs, a second requester is parked (Waiting), gets
    /// released on completion, and later requesters reuse the stored success.
    #[test]
    fn coordinator_merges_waiting_requesters_and_reuses_success() {
        let mut coordinator = GlobalRunCoordinator::new(
            ParallelPhase1Config::default(),
            vec![
                TalInputSpec::from_url("https://example.test/arin.tal"),
                TalInputSpec::from_url("https://example.test/apnic.tal"),
            ],
        );
        let key = RepoKey::new("rsync://shared.example/repo/", None);
        let _ = coordinator.register_repo_request(
            key.clone(),
            requester("arin", "arin", "rsync://shared.example/repo/root.mft"),
            time::OffsetDateTime::UNIX_EPOCH,
            SyncPreference::RrdpThenRsync,
            0,
        );
        let _ = coordinator.pop_next_repo_task();
        coordinator
            .mark_repo_running(&key, time::OffsetDateTime::UNIX_EPOCH)
            .expect("running");
        // Second requester for the same repo while the first is running.
        let wait_action = coordinator.register_repo_request(
            key.clone(),
            requester("apnic", "apnic", "rsync://shared.example/repo/child.mft"),
            time::OffsetDateTime::UNIX_EPOCH,
            SyncPreference::RrdpThenRsync,
            0,
        );
        assert_eq!(wait_action, RepoRequestAction::Waiting);
        let completion = coordinator
            .complete_repo_success(
                RepoSyncResultEnvelope {
                    repo_key: key.clone(),
                    tal_id: "arin".to_string(),
                    rir_id: "arin".to_string(),
                    result: RepoSyncResultKind::Success(RepoSyncResultRef {
                        repo_key: key.clone(),
                        source: "rrdp".to_string(),
                    }),
                    phase: Some("rrdp_ok".to_string()),
                    timing_ms: 12,
                    warnings: Vec::new(),
                },
                time::OffsetDateTime::UNIX_EPOCH,
            )
            .expect("success");
        // The parked apnic requester is released by the completion.
        assert_eq!(completion.released_requesters.len(), 1);
        assert_eq!(coordinator.stats.repo_tasks_running, 0);
        let reuse_action = coordinator.register_repo_request(
            key,
            requester("ripe", "ripe", "rsync://shared.example/repo/again.mft"),
            time::OffsetDateTime::UNIX_EPOCH,
            SyncPreference::RrdpThenRsync,
            0,
        );
        assert!(matches!(reuse_action, RepoRequestAction::Reused(_)));
        assert_eq!(coordinator.stats.repo_tasks_reused, 1);
    }
    /// A recorded failure is replayed to later requesters without enqueueing
    /// the repo again (total stays at 1, reuse counter increments).
    #[test]
    fn coordinator_reuses_failures_without_enqueuing_again() {
        let mut coordinator = GlobalRunCoordinator::new(
            ParallelPhase1Config::default(),
            vec![TalInputSpec::from_url("https://example.test/arin.tal")],
        );
        let key = RepoKey::new("rsync://shared.example/repo/", None);
        let _ = coordinator.register_repo_request(
            key.clone(),
            requester("arin", "arin", "rsync://shared.example/repo/root.mft"),
            time::OffsetDateTime::UNIX_EPOCH,
            SyncPreference::RrdpThenRsync,
            0,
        );
        let _ = coordinator.pop_next_repo_task();
        coordinator
            .mark_repo_running(&key, time::OffsetDateTime::UNIX_EPOCH)
            .expect("running");
        coordinator
            .complete_repo_failure(
                RepoSyncResultEnvelope {
                    repo_key: key.clone(),
                    tal_id: "arin".to_string(),
                    rir_id: "arin".to_string(),
                    result: RepoSyncResultKind::Failed {
                        detail: "timeout".to_string(),
                    },
                    phase: Some("rrdp_failed_rsync_failed".to_string()),
                    timing_ms: 12,
                    warnings: Vec::new(),
                },
                time::OffsetDateTime::UNIX_EPOCH,
            )
            .expect("failure");
        assert_eq!(coordinator.stats.repo_tasks_failed, 1);
        let action = coordinator.register_repo_request(
            key,
            requester("ripe", "ripe", "rsync://shared.example/repo/child.mft"),
            time::OffsetDateTime::UNIX_EPOCH,
            SyncPreference::RrdpThenRsync,
            0,
        );
        assert_eq!(
            action,
            RepoRequestAction::FailedReuse {
                detail: "timeout".to_string()
            }
        );
        assert_eq!(coordinator.stats.repo_tasks_total, 1);
        assert_eq!(coordinator.stats.repo_tasks_reused, 1);
    }
}

25
src/parallel/stats.rs Normal file
View File

@ -0,0 +1,25 @@
/// Run-wide counters and gauges for the parallel phase-1 pipeline.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ParallelRunStats {
    /// Tasks ever enqueued during the run.
    pub repo_tasks_total: usize,
    /// Requests satisfied by reusing a previous result instead of new work.
    pub repo_tasks_reused: usize,
    /// Tasks currently marked running (gauge).
    pub repo_tasks_running: usize,
    /// Tasks that ended in failure.
    pub repo_tasks_failed: usize,
    /// Snapshot bytes currently held in flight (gauge).
    pub inflight_snapshot_bytes: usize,
    /// Current depth of the pending task queue (gauge).
    pub repo_queue_depth: usize,
}
#[cfg(test)]
mod tests {
    use super::ParallelRunStats;

    /// `Default` must start every counter and gauge at zero.
    #[test]
    fn parallel_run_stats_default_to_zero() {
        let ParallelRunStats {
            repo_tasks_total,
            repo_tasks_reused,
            repo_tasks_running,
            repo_tasks_failed,
            inflight_snapshot_bytes,
            repo_queue_depth,
        } = ParallelRunStats::default();
        assert_eq!(
            [
                repo_tasks_total,
                repo_tasks_reused,
                repo_tasks_running,
                repo_tasks_failed,
                inflight_snapshot_bytes,
                repo_queue_depth,
            ],
            [0; 6]
        );
    }
}

472
src/parallel/types.rs Normal file
View File

@ -0,0 +1,472 @@
use std::path::{Path, PathBuf};
use crate::policy::SyncPreference;
use crate::report::Warning;
/// Where the content of a TAL (Trust Anchor Locator) comes from.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TalSource {
    /// Fetch the TAL from a URL.
    Url(String),
    /// TAL identified by `tal_url`, with the trust-anchor certificate
    /// already available as DER bytes.
    DerBytes { tal_url: String, ta_der: Vec<u8> },
    /// Read the TAL from a local file path.
    FilePath(PathBuf),
}
/// A TAL input together with the identifiers derived for it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TalInputSpec {
    /// Identifier derived from the TAL file name / URL (e.g. "apnic").
    pub tal_id: String,
    /// RIR identifier; the constructors always set it equal to `tal_id`.
    pub rir_id: String,
    /// Where to obtain the TAL content.
    pub source: TalSource,
}
impl TalInputSpec {
pub fn from_url(url: impl Into<String>) -> Self {
let url = url.into();
let tal_id = derive_tal_id_from_url_like(&url);
Self {
rir_id: tal_id.clone(),
tal_id,
source: TalSource::Url(url),
}
}
pub fn from_file_path(path: impl Into<PathBuf>) -> Self {
let path = path.into();
let tal_id = derive_tal_id_from_path(&path);
Self {
rir_id: tal_id.clone(),
tal_id,
source: TalSource::FilePath(path),
}
}
pub fn from_ta_der(tal_url: impl Into<String>, ta_der: Vec<u8>) -> Self {
let tal_url = tal_url.into();
let tal_id = derive_tal_id_from_url_like(&tal_url);
Self {
rir_id: tal_id.clone(),
tal_id,
source: TalSource::DerBytes { tal_url, ta_der },
}
}
}
/// Raw identity of a repository endpoint: the optional RRDP notification URI
/// plus the rsync base URI, kept exactly as observed (no normalisation).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RepoIdentity {
    /// RRDP notification URI, if the repo advertises RRDP.
    pub notification_uri: Option<String>,
    /// rsync base URI of the repository.
    pub rsync_base_uri: String,
}
impl RepoIdentity {
    /// Builds an identity from the given URIs as-is.
    pub fn new(notification_uri: Option<String>, rsync_base_uri: impl Into<String>) -> Self {
        Self {
            notification_uri,
            rsync_base_uri: rsync_base_uri.into(),
        }
    }
}
/// Key used to deduplicate transport work across repositories.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum RepoDedupKey {
    /// Deduplicate RRDP fetches by notification URI.
    RrdpNotify { notification_uri: String },
    /// Deduplicate rsync fetches by the scoped rsync URI.
    RsyncScope { rsync_scope_uri: String },
}
/// Which transport mechanism a transport task should use.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RepoTransportMode {
    Rrdp,
    Rsync,
}
/// Unit of work for a transport worker: fetch one repository via one
/// mechanism, on behalf of the listed requesters.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoTransportTask {
    pub dedup_key: RepoDedupKey,
    pub repo_identity: RepoIdentity,
    pub mode: RepoTransportMode,
    pub tal_id: String,
    pub rir_id: String,
    pub validation_time: time::OffsetDateTime,
    /// Scheduling priority (semantics defined by the worker pool).
    pub priority: u8,
    /// CA instances waiting on this repository.
    pub requesters: Vec<RepoRequester>,
}
/// Outcome of a single transport attempt (one RRDP or rsync execution).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RepoTransportResultKind {
    /// Transport finished; `source` labels the mechanism that produced the data.
    Success {
        source: String,
        warnings: Vec<Warning>,
    },
    /// Transport failed with a human-readable `detail`.
    Failed {
        detail: String,
        warnings: Vec<Warning>,
    },
}
/// A transport result together with the addressing/attribution context it
/// belongs to (dedup key, repo identity, TAL/RIR, timing).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoTransportResultEnvelope {
    pub dedup_key: RepoDedupKey,
    pub repo_identity: RepoIdentity,
    pub mode: RepoTransportMode,
    pub tal_id: String,
    pub rir_id: String,
    /// Duration of the transport attempt in milliseconds.
    pub timing_ms: u64,
    pub result: RepoTransportResultKind,
}
/// Per-repo state machine for the RRDP-then-rsync sync flow.
/// (Exact transition rules live in the runtime that drives these states.)
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RepoRuntimeState {
    Init,
    WaitingRrdp,
    RrdpOk,
    /// RRDP attempt failed; an rsync fallback is still pending.
    RrdpFailedPendingRsync,
    WaitingRsync,
    RsyncOk,
    /// Terminal failure; no further attempts for this repo.
    FailedTerminal,
}
/// Lookup/dedup key for a repository: rsync base URI plus optional RRDP
/// notification URI. Keys differing only in `notification_uri` are distinct.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RepoKey {
    pub rsync_base_uri: String,
    pub notification_uri: Option<String>,
}
impl RepoKey {
pub fn new(rsync_base_uri: impl Into<String>, notification_uri: Option<String>) -> Self {
Self {
rsync_base_uri: rsync_base_uri.into(),
notification_uri,
}
}
pub fn as_identity(&self) -> RepoIdentity {
RepoIdentity {
notification_uri: self.notification_uri.clone(),
rsync_base_uri: self.rsync_base_uri.clone(),
}
}
}
/// Identifies a CA instance that requested a repository sync, with enough
/// context to attribute the result back to its publication point.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoRequester {
    pub tal_id: String,
    pub rir_id: String,
    /// Tree node id of the parent CA, when known.
    pub parent_node_id: Option<u64>,
    pub ca_instance_handle_id: String,
    pub publication_point_rsync_uri: String,
    pub manifest_rsync_uri: String,
}
impl RepoRequester {
    /// Builds a requester with an explicit TAL/RIR pair and no recorded
    /// parent node; all remaining fields are taken verbatim from the caller.
    pub fn with_tal_rir(
        tal_id: impl Into<String>,
        rir_id: impl Into<String>,
        manifest_rsync_uri: impl Into<String>,
        publication_point_rsync_uri: impl Into<String>,
        ca_instance_handle_id: impl Into<String>,
    ) -> Self {
        Self {
            parent_node_id: None,
            tal_id: tal_id.into(),
            rir_id: rir_id.into(),
            manifest_rsync_uri: manifest_rsync_uri.into(),
            publication_point_rsync_uri: publication_point_rsync_uri.into(),
            ca_instance_handle_id: ca_instance_handle_id.into(),
        }
    }
}
/// A queued repository sync request, aggregating all requesters that share
/// the same `RepoKey`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoSyncTask {
    pub repo_key: RepoKey,
    pub validation_time: time::OffsetDateTime,
    pub sync_preference: SyncPreference,
    pub tal_id: String,
    pub rir_id: String,
    pub priority: u8,
    pub requesters: Vec<RepoRequester>,
}
impl RepoSyncTask {
    /// Derives the transport-level task for this sync task, using the given
    /// dedup key and transport mode; all descriptive fields are copied over.
    pub fn as_transport_task(
        &self,
        dedup_key: RepoDedupKey,
        mode: RepoTransportMode,
    ) -> RepoTransportTask {
        let repo_identity = self.repo_key.as_identity();
        RepoTransportTask {
            dedup_key,
            mode,
            repo_identity,
            tal_id: self.tal_id.clone(),
            rir_id: self.rir_id.clone(),
            requesters: self.requesters.clone(),
            validation_time: self.validation_time,
            priority: self.priority,
        }
    }
}
/// Lifecycle of one repo sync task inside the coordinator.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RepoTaskState {
    Pending,
    Running,
    Succeeded,
    Failed,
    /// Completed by reusing an earlier task's result instead of re-syncing.
    Reused,
}
/// Lightweight reference to a finished sync: which repo, via which source.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoSyncResultRef {
    pub repo_key: RepoKey,
    /// Transport/source label (e.g. "rsync" — mirrors the transport result).
    pub source: String,
}
impl RepoSyncResultRef {
    /// Convenience projection to the underlying repo identity.
    pub fn as_identity(&self) -> RepoIdentity {
        self.repo_key.as_identity()
    }
}
/// Coordinator-side bookkeeping for one repo task, from registration to a
/// terminal state.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InFlightRepoEntry {
    pub state: RepoTaskState,
    /// The queued task, while one exists for this entry.
    pub task_ref: Option<RepoSyncTask>,
    // NOTE(review): presumably requesters that arrived while the task was
    // already in flight — confirm against the coordinator implementation.
    pub waiting_requesters: Vec<RepoRequester>,
    pub result_ref: Option<RepoSyncResultRef>,
    pub last_result: Option<RepoSyncResultEnvelope>,
    pub last_error: Option<String>,
    pub started_at: Option<time::OffsetDateTime>,
    pub finished_at: Option<time::OffsetDateTime>,
}
/// Final result of a repo sync task, as reported back to the coordinator.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoSyncResultEnvelope {
    pub repo_key: RepoKey,
    pub tal_id: String,
    pub rir_id: String,
    pub result: RepoSyncResultKind,
    /// Free-form phase label (e.g. "rrdp_failed_rsync_failed").
    pub phase: Option<String>,
    pub timing_ms: u64,
    pub warnings: Vec<Warning>,
}
/// Terminal classification of a repo sync.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RepoSyncResultKind {
    /// A fresh sync succeeded.
    Success(RepoSyncResultRef),
    /// The sync failed; `detail` is a human-readable reason.
    Failed { detail: String },
    /// An earlier task's result was reused.
    Reused(RepoSyncResultRef),
}
/// Derives a short TAL identifier from a URL-like string.
///
/// Preference order: trimmed file stem of the last non-empty path segment,
/// then the host name, then the fixed fallback `"unknown-tal"`.
fn derive_tal_id_from_url_like(s: &str) -> String {
    let parsed = match url::Url::parse(s) {
        Ok(parsed) => parsed,
        Err(_) => return "unknown-tal".to_string(),
    };
    let last_segment = parsed
        .path_segments()
        .and_then(|segments| segments.filter(|seg| !seg.is_empty()).next_back());
    if let Some(segment) = last_segment {
        // Drop a single trailing extension ("apnic.tal" -> "apnic").
        let stem = match segment.rsplit_once('.') {
            Some((stem, _ext)) => stem,
            None => segment,
        };
        let stem = stem.trim();
        if !stem.is_empty() {
            return stem.to_string();
        }
    }
    match parsed.host_str() {
        Some(host) => host.to_string(),
        None => "unknown-tal".to_string(),
    }
}
/// Derives a TAL identifier from a filesystem path: the trimmed file stem,
/// or `"unknown-tal"` when the stem is missing, non-UTF-8, or blank.
fn derive_tal_id_from_path(path: &Path) -> String {
    match path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) if !stem.trim().is_empty() => stem.trim().to_string(),
        _ => "unknown-tal".to_string(),
    }
}
#[cfg(test)]
mod tests {
    use std::path::Path;
    use crate::policy::SyncPreference;
    use crate::report::Warning;
    use super::{
        derive_tal_id_from_path, derive_tal_id_from_url_like, RepoDedupKey, RepoIdentity,
        RepoKey, RepoRequester, RepoRuntimeState, RepoSyncTask, RepoTaskState,
        RepoTransportMode, RepoTransportResultEnvelope, RepoTransportResultKind, TalInputSpec,
        TalSource,
    };
    // URL-based construction: both ids come from the URL's file stem.
    #[test]
    fn tal_input_spec_from_url_derives_tal_and_rir_ids() {
        let spec = TalInputSpec::from_url("https://example.test/tals/apnic.tal");
        assert_eq!(spec.tal_id, "apnic");
        assert_eq!(spec.rir_id, "apnic");
        assert_eq!(
            spec.source,
            TalSource::Url("https://example.test/tals/apnic.tal".to_string())
        );
    }
    // Path-based construction: ids come from the file stem.
    #[test]
    fn tal_input_spec_from_file_path_derives_file_stem() {
        let spec = TalInputSpec::from_file_path("local/arin.tal");
        assert_eq!(spec.tal_id, "arin");
        assert_eq!(spec.rir_id, "arin");
    }
    // DER-based construction keeps the raw certificate bytes and TAL URL.
    #[test]
    fn tal_input_spec_from_ta_der_preserves_payload() {
        let spec = TalInputSpec::from_ta_der("https://example.test/ripe.tal", vec![1, 2, 3]);
        assert_eq!(spec.tal_id, "ripe");
        assert_eq!(spec.rir_id, "ripe");
        assert_eq!(
            spec.source,
            TalSource::DerBytes {
                tal_url: "https://example.test/ripe.tal".to_string(),
                ta_der: vec![1, 2, 3],
            }
        );
    }
    // Presence/absence of the notification URI must distinguish keys.
    #[test]
    fn repo_key_equality_uses_rsync_base_and_notification() {
        let a = RepoKey::new(
            "rsync://example.test/repo/",
            Some("https://example.test/notify.xml".to_string()),
        );
        let b = RepoKey::new(
            "rsync://example.test/repo/",
            Some("https://example.test/notify.xml".to_string()),
        );
        let c = RepoKey::new("rsync://example.test/repo/", None);
        assert_eq!(a, b);
        assert_ne!(a, c);
    }
    // Sanity: task-state variants compare unequal pairwise.
    #[test]
    fn repo_task_state_variants_are_distinct() {
        assert_ne!(RepoTaskState::Pending, RepoTaskState::Running);
        assert_ne!(RepoTaskState::Succeeded, RepoTaskState::Failed);
        assert_ne!(RepoTaskState::Failed, RepoTaskState::Reused);
    }
    // `RepoIdentity::new` must not normalise or rewrite its inputs.
    #[test]
    fn repo_identity_preserves_raw_inputs() {
        let ident = RepoIdentity::new(
            Some("https://example.test/notify.xml".to_string()),
            "rsync://example.test/repo/",
        );
        assert_eq!(
            ident.notification_uri.as_deref(),
            Some("https://example.test/notify.xml")
        );
        assert_eq!(ident.rsync_base_uri, "rsync://example.test/repo/");
    }
    // `as_identity` carries both URIs over unchanged.
    #[test]
    fn repo_key_can_be_viewed_as_repo_identity() {
        let key = RepoKey::new(
            "rsync://example.test/repo/",
            Some("https://example.test/notify.xml".to_string()),
        );
        let ident = key.as_identity();
        assert_eq!(ident.rsync_base_uri, "rsync://example.test/repo/");
        assert_eq!(
            ident.notification_uri.as_deref(),
            Some("https://example.test/notify.xml")
        );
    }
    // Sync task -> transport task conversion copies all descriptive fields.
    #[test]
    fn repo_sync_task_maps_to_rrdp_transport_task() {
        let task = RepoSyncTask {
            repo_key: RepoKey::new(
                "rsync://example.test/repo/",
                Some("https://example.test/notify.xml".to_string()),
            ),
            validation_time: time::OffsetDateTime::UNIX_EPOCH,
            sync_preference: SyncPreference::RrdpThenRsync,
            tal_id: "apnic".to_string(),
            rir_id: "apnic".to_string(),
            priority: 1,
            requesters: vec![RepoRequester::with_tal_rir(
                "apnic",
                "apnic",
                "rsync://example.test/repo/root.mft",
                "rsync://example.test/repo/",
                "node:1",
            )],
        };
        let transport = task.as_transport_task(
            RepoDedupKey::RrdpNotify {
                notification_uri: "https://example.test/notify.xml".to_string(),
            },
            RepoTransportMode::Rrdp,
        );
        assert_eq!(transport.mode, RepoTransportMode::Rrdp);
        assert_eq!(transport.tal_id, "apnic");
        assert_eq!(transport.rir_id, "apnic");
        assert_eq!(transport.requesters.len(), 1);
        assert_eq!(
            transport.repo_identity.notification_uri.as_deref(),
            Some("https://example.test/notify.xml")
        );
    }
    // The envelope type must represent both success and failure results.
    #[test]
    fn repo_transport_result_envelope_supports_success_and_failure_shapes() {
        let identity = RepoIdentity::new(None, "rsync://example.test/repo/");
        let ok = RepoTransportResultEnvelope {
            dedup_key: RepoDedupKey::RsyncScope {
                rsync_scope_uri: "rsync://example.test/module/".to_string(),
            },
            repo_identity: identity.clone(),
            mode: RepoTransportMode::Rsync,
            tal_id: "arin".to_string(),
            rir_id: "arin".to_string(),
            timing_ms: 12,
            result: RepoTransportResultKind::Success {
                source: "rsync".to_string(),
                warnings: vec![Warning::new("ok")],
            },
        };
        let fail = RepoTransportResultEnvelope {
            dedup_key: RepoDedupKey::RsyncScope {
                rsync_scope_uri: "rsync://example.test/module/".to_string(),
            },
            repo_identity: identity,
            mode: RepoTransportMode::Rsync,
            tal_id: "arin".to_string(),
            rir_id: "arin".to_string(),
            timing_ms: 30,
            result: RepoTransportResultKind::Failed {
                detail: "timeout".to_string(),
                warnings: vec![Warning::new("timeout")],
            },
        };
        assert!(matches!(ok.result, RepoTransportResultKind::Success { .. }));
        assert!(matches!(fail.result, RepoTransportResultKind::Failed { .. }));
    }
    // Sanity: runtime-state variants compare unequal pairwise.
    #[test]
    fn repo_runtime_state_variants_are_distinct() {
        assert_ne!(RepoRuntimeState::Init, RepoRuntimeState::WaitingRrdp);
        assert_ne!(RepoRuntimeState::RrdpOk, RepoRuntimeState::RsyncOk);
        assert_ne!(
            RepoRuntimeState::RrdpFailedPendingRsync,
            RepoRuntimeState::FailedTerminal
        );
    }
    // Stem derivation for both URL-like and path inputs (happy paths).
    #[test]
    fn derive_tal_id_helpers_fall_back_safely() {
        assert_eq!(
            derive_tal_id_from_url_like("https://example.test/path/afrinic.tal"),
            "afrinic"
        );
        assert_eq!(
            derive_tal_id_from_path(Path::new("foo/lacnic.tal")),
            "lacnic"
        );
    }
}

View File

@ -531,6 +531,16 @@ fn try_rrdp_sync_with_retry(
}
}
/// Crate-internal entry point for the RRDP transport path.
///
/// Thin wrapper over `try_rrdp_sync_with_retry` so the parallel transport
/// workers can drive an RRDP sync without reaching into this module's
/// private helpers. The `usize` result mirrors the rsync path (presumably an
/// object count — confirm against `try_rrdp_sync_with_retry`).
pub(crate) fn run_rrdp_transport(
    store: &RocksStore,
    notification_uri: &str,
    http_fetcher: &dyn HttpFetcher,
    timing: Option<&TimingHandle>,
    download_log: Option<&DownloadLogHandle>,
) -> Result<usize, RrdpSyncError> {
    try_rrdp_sync_with_retry(store, notification_uri, http_fetcher, timing, download_log)
}
fn rsync_sync_into_current_store(
store: &RocksStore,
rsync_base_uri: &str,
@ -709,6 +719,16 @@ fn rsync_sync_into_current_store(
Ok(object_count)
}
/// Crate-internal entry point for the rsync transport path.
///
/// Thin wrapper over `rsync_sync_into_current_store` for the parallel
/// transport workers. On success, returns the object count reported by the
/// wrapped sync.
pub(crate) fn run_rsync_transport(
    store: &RocksStore,
    rsync_base_uri: &str,
    rsync_fetcher: &dyn RsyncFetcher,
    timing: Option<&TimingHandle>,
    download_log: Option<&DownloadLogHandle>,
) -> Result<usize, RepoSyncError> {
    rsync_sync_into_current_store(store, rsync_base_uri, rsync_fetcher, timing, download_log)
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -192,7 +192,7 @@ pub enum RrdpSyncError {
pub type RrdpSyncResult<T> = Result<T, RrdpSyncError>;
pub trait Fetcher {
pub trait Fetcher: Send + Sync {
fn fetch(&self, uri: &str) -> Result<Vec<u8>, String>;
fn fetch_to_writer(&self, uri: &str, out: &mut dyn Write) -> Result<u64, String> {

View File

@ -73,6 +73,7 @@ pub fn run_publication_point_once(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let result = runner

View File

@ -4,6 +4,13 @@ use crate::analysis::timing::TimingHandle;
use crate::audit::PublicationPointAudit;
use crate::audit_downloads::DownloadLogHandle;
use crate::data_model::ta::TrustAnchor;
use crate::parallel::config::ParallelPhase1Config;
use crate::parallel::repo_runtime::{Phase1RepoSyncRuntime, RepoSyncRuntime};
use crate::parallel::repo_worker::{
LiveRepoTransportExecutor, RepoTransportWorkerPool, RepoWorkerPoolConfig,
};
use crate::parallel::run_coordinator::GlobalRunCoordinator;
use crate::parallel::types::{TalInputSpec, TalSource};
use crate::replay::archive::ReplayArchiveIndex;
use crate::replay::delta_archive::ReplayDeltaArchiveIndex;
use crate::replay::delta_fetch_http::PayloadDeltaReplayHttpFetcher;
@ -78,6 +85,70 @@ pub struct RunTreeFromTalAuditOutput {
pub download_stats: crate::audit::AuditDownloadStats,
}
/// Assembles a live (network-backed) publication-point runner: dedup caches
/// enabled, no replay indexes.
///
/// `repo_sync_runtime` selects the sync path: `None` keeps the serial
/// per-publication-point sync, `Some` routes repo syncs through the parallel
/// phase-1 runtime.
fn make_live_runner<'a>(
    store: &'a crate::storage::RocksStore,
    policy: &'a crate::policy::Policy,
    http_fetcher: &'a dyn Fetcher,
    rsync_fetcher: &'a dyn crate::fetch::rsync::RsyncFetcher,
    validation_time: time::OffsetDateTime,
    timing: Option<TimingHandle>,
    download_log: Option<DownloadLogHandle>,
    repo_sync_runtime: Option<Arc<dyn RepoSyncRuntime>>,
) -> Rpkiv1PublicationPointRunner<'a> {
    Rpkiv1PublicationPointRunner {
        store,
        policy,
        http_fetcher,
        rsync_fetcher,
        validation_time,
        timing,
        download_log,
        replay_archive_index: None,
        replay_delta_index: None,
        rrdp_dedup: true,
        rrdp_repo_cache: Mutex::new(HashMap::new()),
        rsync_dedup: true,
        rsync_repo_cache: Mutex::new(HashMap::new()),
        repo_sync_runtime,
    }
}
/// Builds the phase-1 repo sync runtime: a global run coordinator plus a
/// worker pool executing live RRDP/rsync transports against `store`.
///
/// `timing` / `download_log` are threaded into the transport executor so
/// worker downloads are attributed the same way as serial ones. The policy's
/// `sync_preference` is forwarded to the runtime. Fails when the worker pool
/// cannot be constructed.
fn build_phase1_repo_sync_runtime<H, R>(
    store: Arc<crate::storage::RocksStore>,
    policy: &crate::policy::Policy,
    http_fetcher: &H,
    rsync_fetcher: &R,
    parallel_config: ParallelPhase1Config,
    timing: Option<TimingHandle>,
    download_log: Option<DownloadLogHandle>,
    tal_inputs: Vec<crate::parallel::types::TalInputSpec>,
) -> Result<Arc<dyn RepoSyncRuntime>, RunTreeFromTalError>
where
    H: Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    let coordinator = GlobalRunCoordinator::new(parallel_config.clone(), tal_inputs);
    let rsync_fetcher_arc = Arc::new(rsync_fetcher.clone());
    let executor = LiveRepoTransportExecutor::new(
        Arc::clone(&store),
        Arc::new(http_fetcher.clone()),
        Arc::clone(&rsync_fetcher_arc),
        timing,
        download_log,
    );
    // NOTE(review): pool construction errors surface via the `Replay`
    // variant — confirm this is the intended error classification.
    let pool = RepoTransportWorkerPool::new(RepoWorkerPoolConfig::from(&parallel_config), executor)
        .map_err(RunTreeFromTalError::Replay)?;
    // Rsync base URI -> dedup key resolution is delegated to the fetcher.
    let resolver: Arc<dyn Fn(&str) -> String + Send + Sync> =
        Arc::new(move |base: &str| rsync_fetcher_arc.dedup_key(base));
    Ok(Arc::new(Phase1RepoSyncRuntime::new(
        coordinator,
        pool,
        resolver,
        policy.sync_preference,
    )))
}
#[derive(Debug, thiserror::Error)]
pub enum RunTreeFromTalError {
#[error("{0}")]
@ -123,21 +194,16 @@ pub fn run_tree_from_tal_url_serial(
) -> Result<RunTreeFromTalOutput, RunTreeFromTalError> {
let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?;
let runner = Rpkiv1PublicationPointRunner {
let runner = make_live_runner(
store,
policy,
http_fetcher,
rsync_fetcher,
validation_time,
timing: None,
download_log: None,
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: true,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
};
None,
None,
None,
);
let root = root_handle_from_trust_anchor(
&discovery.trust_anchor,
@ -162,21 +228,16 @@ pub fn run_tree_from_tal_url_serial_audit(
let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?;
let download_log = DownloadLogHandle::new();
let runner = Rpkiv1PublicationPointRunner {
let runner = make_live_runner(
store,
policy,
http_fetcher,
rsync_fetcher,
validation_time,
timing: None,
download_log: Some(download_log.clone()),
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: true,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
};
None,
Some(download_log.clone()),
None,
);
let root = root_handle_from_trust_anchor(
&discovery.trust_anchor,
@ -215,21 +276,16 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
drop(_tal);
let download_log = DownloadLogHandle::new();
let runner = Rpkiv1PublicationPointRunner {
let runner = make_live_runner(
store,
policy,
http_fetcher,
rsync_fetcher,
validation_time,
timing: Some(timing.clone()),
download_log: Some(download_log.clone()),
replay_archive_index: None,
replay_delta_index: None,
rrdp_dedup: true,
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
};
Some(timing.clone()),
Some(download_log.clone()),
None,
);
let root = root_handle_from_trust_anchor(
&discovery.trust_anchor,
@ -254,6 +310,136 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
})
}
/// Phase-1 parallel variant of the TAL-URL audit run.
///
/// Builds a repo-sync runtime (coordinator + transport worker pool) and hands
/// it to the publication-point runner, so repository transports can run in
/// parallel while the CA tree itself is still walked serially via
/// `run_tree_serial_audit`. Returns the same audit output shape as the serial
/// variants, including the download log captured for this run.
pub fn run_tree_from_tal_url_parallel_phase1_audit<H, R>(
    store: Arc<crate::storage::RocksStore>,
    policy: &crate::policy::Policy,
    tal_url: &str,
    http_fetcher: &H,
    rsync_fetcher: &R,
    validation_time: time::OffsetDateTime,
    config: &TreeRunConfig,
    parallel_config: ParallelPhase1Config,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
    H: Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?;
    let download_log = DownloadLogHandle::new();
    // Runtime shares the same download log so worker downloads are audited.
    let runtime = build_phase1_repo_sync_runtime(
        Arc::clone(&store),
        policy,
        http_fetcher,
        rsync_fetcher,
        parallel_config,
        None,
        Some(download_log.clone()),
        vec![TalInputSpec::from_url(tal_url.to_string())],
    )?;
    let runner = make_live_runner(
        store.as_ref(),
        policy,
        http_fetcher,
        rsync_fetcher,
        validation_time,
        None,
        Some(download_log.clone()),
        Some(runtime),
    );
    let root = root_handle_from_trust_anchor(
        &discovery.trust_anchor,
        derive_tal_id(&discovery),
        None,
        &discovery.ca_instance,
    );
    let TreeRunAuditOutput {
        tree,
        publication_points,
    } = run_tree_serial_audit(root, &runner, config)?;
    let downloads = download_log.snapshot_events();
    let download_stats = DownloadLogHandle::stats_from_events(&downloads);
    Ok(RunTreeFromTalAuditOutput {
        discovery,
        tree,
        publication_points,
        downloads,
        download_stats,
    })
}
/// Phase-1 parallel audit run for an in-memory TAL plus trust-anchor DER.
///
/// Like `run_tree_from_tal_url_parallel_phase1_audit`, but discovery starts
/// from `tal_bytes`/`ta_der` rather than a URL; the TAL input handed to the
/// runtime carries the DER payload, falling back to the "embedded-tal" label
/// when discovery yields no TAL URL.
pub fn run_tree_from_tal_and_ta_der_parallel_phase1_audit<H, R>(
    store: Arc<crate::storage::RocksStore>,
    policy: &crate::policy::Policy,
    tal_bytes: &[u8],
    ta_der: &[u8],
    resolved_ta_uri: Option<&url::Url>,
    http_fetcher: &H,
    rsync_fetcher: &R,
    validation_time: time::OffsetDateTime,
    config: &TreeRunConfig,
    parallel_config: ParallelPhase1Config,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
    H: Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    let discovery = discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, resolved_ta_uri)?;
    let download_log = DownloadLogHandle::new();
    let derived_tal_id = derive_tal_id(&discovery);
    let tal_inputs = vec![TalInputSpec {
        tal_id: derived_tal_id.clone(),
        rir_id: derived_tal_id,
        source: TalSource::DerBytes {
            tal_url: discovery
                .tal_url
                .clone()
                .unwrap_or_else(|| "embedded-tal".to_string()),
            ta_der: ta_der.to_vec(),
        },
    }];
    let runtime = build_phase1_repo_sync_runtime(
        Arc::clone(&store),
        policy,
        http_fetcher,
        rsync_fetcher,
        parallel_config,
        None,
        Some(download_log.clone()),
        tal_inputs,
    )?;
    let runner = make_live_runner(
        store.as_ref(),
        policy,
        http_fetcher,
        rsync_fetcher,
        validation_time,
        None,
        Some(download_log.clone()),
        Some(runtime),
    );
    let root = root_handle_from_trust_anchor(
        &discovery.trust_anchor,
        derive_tal_id(&discovery),
        None,
        &discovery.ca_instance,
    );
    let TreeRunAuditOutput {
        tree,
        publication_points,
    } = run_tree_serial_audit(root, &runner, config)?;
    let downloads = download_log.snapshot_events();
    let download_stats = DownloadLogHandle::stats_from_events(&downloads);
    Ok(RunTreeFromTalAuditOutput {
        discovery,
        tree,
        publication_points,
        downloads,
        download_stats,
    })
}
pub fn run_tree_from_tal_and_ta_der_serial(
store: &crate::storage::RocksStore,
policy: &crate::policy::Policy,
@ -282,6 +468,7 @@ pub fn run_tree_from_tal_and_ta_der_serial(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let root = root_handle_from_trust_anchor(
@ -328,6 +515,7 @@ pub fn run_tree_from_tal_bytes_serial_audit(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let root = root_handle_from_trust_anchor(
@ -388,6 +576,7 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let root = root_handle_from_trust_anchor(
@ -443,6 +632,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let root = root_handle_from_trust_anchor(
@ -499,6 +689,7 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let root = root_handle_from_trust_anchor(
@ -562,6 +753,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let root = root_handle_from_trust_anchor(
@ -614,6 +806,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let root = root_handle_from_trust_anchor(
@ -680,6 +873,7 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let root = root_handle_from_trust_anchor(
@ -729,6 +923,7 @@ fn build_payload_replay_runner<'a>(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
}
}
@ -756,6 +951,7 @@ fn build_payload_delta_replay_runner<'a>(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
}
}
@ -783,6 +979,7 @@ fn build_payload_delta_replay_current_store_runner<'a>(
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
}
}

View File

@ -95,6 +95,13 @@ pub trait PublicationPointRunner {
&self,
ca: &CaInstanceHandle,
) -> Result<PublicationPointRunResult, String>;
fn prefetch_discovered_children(
&self,
_children: &[DiscoveredChildCaInstance],
) -> Result<(), String> {
Ok(())
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
@ -198,6 +205,12 @@ pub fn run_tree_serial_audit(
.cmp(&b.discovered_from.child_ca_certificate_rsync_uri)
})
});
if let Err(e) = runner.prefetch_discovered_children(&children) {
warnings.push(
Warning::new(format!("prefetch discovered children failed: {e}"))
.with_context(&ca.manifest_rsync_uri),
);
}
for child in children {
queue.push_back(QueuedCaInstance {
id: next_id,
@ -221,3 +234,109 @@ pub fn run_tree_serial_audit(
publication_points,
})
}
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};
    use crate::audit::{DiscoveredFrom, PublicationPointAudit};
    use crate::validation::objects::{ObjectsOutput, ObjectsStats};
    use super::{
        CaInstanceHandle, DiscoveredChildCaInstance, PublicationPointRunResult,
        PublicationPointRunner, TreeRunConfig, run_tree_serial_audit,
    };
    // Minimal CA handle fixture; only the manifest URI varies per call.
    fn sample_handle(manifest: &str) -> CaInstanceHandle {
        CaInstanceHandle {
            depth: 0,
            tal_id: "arin".to_string(),
            parent_manifest_rsync_uri: None,
            ca_certificate_der: vec![1, 2, 3],
            ca_certificate_rsync_uri: None,
            effective_ip_resources: None,
            effective_as_resources: None,
            rsync_base_uri: "rsync://example.test/repo/".to_string(),
            manifest_rsync_uri: manifest.to_string(),
            publication_point_rsync_uri: "rsync://example.test/repo/".to_string(),
            rrdp_notification_uri: None,
        }
    }
    // Runner stub that records which children were offered to the prefetch hook.
    struct PrefetchRecordingRunner {
        seen_prefetch_children: Arc<Mutex<Vec<String>>>,
    }
    impl PublicationPointRunner for PrefetchRecordingRunner {
        // Record the manifest URI of every child passed to the hook.
        fn prefetch_discovered_children(
            &self,
            children: &[DiscoveredChildCaInstance],
        ) -> Result<(), String> {
            let mut seen = self
                .seen_prefetch_children
                .lock()
                .expect("prefetch lock poisoned");
            seen.extend(
                children
                    .iter()
                    .map(|child| child.handle.manifest_rsync_uri.clone()),
            );
            Ok(())
        }
        // The root manifest yields exactly one child; everything else is a leaf.
        fn run_publication_point(
            &self,
            ca: &CaInstanceHandle,
        ) -> Result<PublicationPointRunResult, String> {
            let children = if ca.manifest_rsync_uri.ends_with("root.mft") {
                vec![DiscoveredChildCaInstance {
                    handle: sample_handle("rsync://example.test/repo/child.mft"),
                    discovered_from: DiscoveredFrom {
                        parent_manifest_rsync_uri: ca.manifest_rsync_uri.clone(),
                        child_ca_certificate_rsync_uri: "rsync://example.test/repo/child.cer"
                            .to_string(),
                        child_ca_certificate_sha256_hex: "00".repeat(32),
                    },
                }]
            } else {
                Vec::new()
            };
            // Empty-but-well-formed result; only `discovered_children` matters here.
            Ok(PublicationPointRunResult {
                source: crate::validation::manifest::PublicationPointSource::Fresh,
                snapshot: None,
                warnings: Vec::new(),
                objects: ObjectsOutput {
                    vrps: Vec::new(),
                    aspas: Vec::new(),
                    router_keys: Vec::new(),
                    local_outputs_cache: Vec::new(),
                    warnings: Vec::new(),
                    stats: ObjectsStats::default(),
                    audit: Vec::new(),
                },
                audit: PublicationPointAudit::default(),
                discovered_children: children,
            })
        }
    }
    // The serial tree walk must pass every batch of newly discovered children
    // to the prefetch hook before enqueuing them for processing.
    #[test]
    fn run_tree_serial_audit_invokes_prefetch_hook_for_discovered_children() {
        let seen = Arc::new(Mutex::new(Vec::new()));
        let runner = PrefetchRecordingRunner {
            seen_prefetch_children: Arc::clone(&seen),
        };
        let out = run_tree_serial_audit(
            sample_handle("rsync://example.test/repo/root.mft"),
            &runner,
            &TreeRunConfig::default(),
        )
        .expect("tree run");
        assert_eq!(out.tree.instances_processed, 2);
        assert_eq!(
            seen.lock().expect("lock").as_slice(),
            ["rsync://example.test/repo/child.mft"]
        );
    }
}

View File

@ -15,6 +15,7 @@ use crate::data_model::router_cert::{
};
use crate::fetch::rsync::RsyncFetcher;
use crate::policy::Policy;
use crate::parallel::repo_runtime::{RepoSyncRuntime, RepoSyncRuntimeOutcome};
use crate::replay::archive::ReplayArchiveIndex;
use crate::replay::delta_archive::ReplayDeltaArchiveIndex;
use crate::report::{RfcRef, Warning};
@ -77,9 +78,20 @@ pub struct Rpkiv1PublicationPointRunner<'a> {
/// same `rsync_base_uri` (observed in APNIC full sync timing reports).
pub rsync_dedup: bool,
pub rsync_repo_cache: Mutex<HashMap<String, bool>>, // rsync_base_uri -> rsync_ok
pub repo_sync_runtime: Option<Arc<dyn RepoSyncRuntime>>,
}
impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
fn prefetch_discovered_children(
&self,
children: &[DiscoveredChildCaInstance],
) -> Result<(), String> {
if let Some(runtime) = self.repo_sync_runtime.as_ref() {
runtime.prefetch_discovered_children(children)?;
}
Ok(())
}
fn run_publication_point(
&self,
ca: &CaInstanceHandle,
@ -174,7 +186,18 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
Option<String>,
Option<String>,
Option<String>,
) = if skip_sync_due_to_dedup {
) = if let Some(runtime) = self.repo_sync_runtime.as_ref() {
let RepoSyncRuntimeOutcome {
repo_sync_ok,
repo_sync_err,
repo_sync_source,
repo_sync_phase,
repo_sync_duration_ms: _,
warnings: repo_warnings,
} = runtime.sync_publication_point_repo(ca)?;
warnings.extend(repo_warnings);
(repo_sync_ok, repo_sync_err, repo_sync_source, repo_sync_phase)
} else if skip_sync_due_to_dedup {
let source = if effective_notification_uri.is_some() {
Some("rrdp_dedup_skip".to_string())
} else {
@ -4023,6 +4046,7 @@ authorityKeyIdentifier = keyid:always
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
// For this fixture-driven smoke, we provide the correct issuer CA certificate (the CA for
@ -4191,6 +4215,7 @@ authorityKeyIdentifier = keyid:always
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let first = runner.run_publication_point(&handle).expect("first run ok");
@ -4298,6 +4323,7 @@ authorityKeyIdentifier = keyid:always
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let first = runner.run_publication_point(&handle).expect("first run ok");
@ -4408,6 +4434,7 @@ authorityKeyIdentifier = keyid:always
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let first = runner.run_publication_point(&handle).expect("first run ok");
@ -4490,6 +4517,7 @@ authorityKeyIdentifier = keyid:always
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let first = ok_runner
.run_publication_point(&handle)
@ -4515,6 +4543,7 @@ authorityKeyIdentifier = keyid:always
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let second = bad_runner
.run_publication_point(&handle)
@ -5699,6 +5728,7 @@ authorityKeyIdentifier = keyid:always
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: false,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let first = runner_rrdp
.run_publication_point(&handle)
@ -5727,6 +5757,7 @@ authorityKeyIdentifier = keyid:always
rrdp_repo_cache: Mutex::new(HashMap::new()),
rsync_dedup: true,
rsync_repo_cache: Mutex::new(HashMap::new()),
repo_sync_runtime: None,
};
let third = runner_rsync
.run_publication_point(&handle)

View File

@ -0,0 +1,84 @@
use std::path::PathBuf;
/// Resolves `rel` against the crate root (`CARGO_MANIFEST_DIR` at build time).
fn fixture_path(rel: &str) -> PathBuf {
    let crate_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    crate_root.join(rel)
}
/// Executes the offline CLI fixture case once, returning the parsed JSON
/// report plus the raw CCR bytes. `parallel_phase1` appends the
/// `--parallel-phase1` flag; every other argument is identical between runs.
fn run_offline_case(parallel_phase1: bool) -> (serde_json::Value, Vec<u8>) {
    let db_dir = tempfile::tempdir().expect("db tempdir");
    let out_dir = tempfile::tempdir().expect("out tempdir");
    let report_path = out_dir.path().join("report.json");
    let ccr_path = out_dir.path().join("result.ccr");
    let mut argv: Vec<String> = Vec::new();
    argv.push("rpki".into());
    argv.push("--db".into());
    argv.push(db_dir.path().to_string_lossy().into_owned());
    argv.push("--tal-path".into());
    argv.push(
        fixture_path("tests/fixtures/tal/apnic-rfc7730-https.tal")
            .to_string_lossy()
            .into_owned(),
    );
    argv.push("--ta-path".into());
    argv.push(
        fixture_path("tests/fixtures/ta/apnic-ta.cer")
            .to_string_lossy()
            .into_owned(),
    );
    // Offline: RRDP disabled, rsync served from the local fixture tree.
    argv.push("--disable-rrdp".into());
    argv.push("--rsync-local-dir".into());
    argv.push(
        fixture_path("tests/fixtures/repository")
            .to_string_lossy()
            .into_owned(),
    );
    argv.push("--validation-time".into());
    argv.push("2026-04-07T00:00:00Z".into());
    argv.push("--max-depth".into());
    argv.push("4".into());
    argv.push("--max-instances".into());
    argv.push("64".into());
    argv.push("--report-json".into());
    argv.push(report_path.to_string_lossy().into_owned());
    argv.push("--ccr-out".into());
    argv.push(ccr_path.to_string_lossy().into_owned());
    if parallel_phase1 {
        argv.push("--parallel-phase1".into());
    }
    rpki::cli::run(&argv).expect("cli run");
    let report_raw = std::fs::read(&report_path).expect("read report");
    let report: serde_json::Value = serde_json::from_slice(&report_raw).expect("parse report");
    let ccr = std::fs::read(&ccr_path).expect("read ccr");
    (report, ccr)
}
/// The offline fixture run must produce identical compare views (VRPs, VAPs)
/// and the same publication-point count with and without phase-1 parallelism.
#[test]
fn offline_serial_and_parallel_phase1_match_compare_views() {
    let (report_serial, ccr_serial_bytes) = run_offline_case(false);
    let (report_parallel, ccr_parallel_bytes) = run_offline_case(true);
    let ccr_serial =
        rpki::ccr::decode_content_info(&ccr_serial_bytes).expect("decode serial ccr");
    let ccr_parallel =
        rpki::ccr::decode_content_info(&ccr_parallel_bytes).expect("decode parallel ccr");
    let (vrps_serial, vaps_serial) =
        rpki::bundle::decode_ccr_compare_views(&ccr_serial, "apnic").expect("serial compare view");
    let (vrps_parallel, vaps_parallel) =
        rpki::bundle::decode_ccr_compare_views(&ccr_parallel, "apnic")
            .expect("parallel compare view");
    assert_eq!(vrps_serial, vrps_parallel, "VRP compare views must match");
    assert_eq!(vaps_serial, vaps_parallel, "VAP compare views must match");
    let serial_count = report_serial["publication_points"]
        .as_array()
        .expect("serial publication_points")
        .len();
    let parallel_count = report_parallel["publication_points"]
        .as_array()
        .expect("parallel publication_points")
        .len();
    assert_eq!(
        serial_count, parallel_count,
        "publication point counts must match"
    );
}