20260418_2 phase2 并行优化 mix quick 耗时105/48秒

This commit is contained in:
yuyr 2026-04-19 00:08:29 +08:00
parent 417c82bef6
commit f6a601e16c
21 changed files with 4127 additions and 594 deletions

View File

@ -10,6 +10,7 @@ Usage:
[--ssh-target <user@host>] \ [--ssh-target <user@host>] \
[--rpki-client-bin <path>] \ [--rpki-client-bin <path>] \
[--libtls-path <path>] \ [--libtls-path <path>] \
[--ours-extra-args '<args>'] \
[--dry-run] [--dry-run]
EOF EOF
} }
@ -20,6 +21,7 @@ REMOTE_ROOT=""
SSH_TARGET="${SSH_TARGET:-root@47.251.56.108}" SSH_TARGET="${SSH_TARGET:-root@47.251.56.108}"
RPKI_CLIENT_BIN="${RPKI_CLIENT_BIN:-/home/yuyr/dev/rpki-client-9.7/build-m5/src/rpki-client}" RPKI_CLIENT_BIN="${RPKI_CLIENT_BIN:-/home/yuyr/dev/rpki-client-9.7/build-m5/src/rpki-client}"
LIBTLS_PATH="${LIBTLS_PATH:-/home/yuyr/dev/rpki-client-9.7/.deps/libtls/root/usr/lib/x86_64-linux-gnu/libtls.so.28.0.0}" LIBTLS_PATH="${LIBTLS_PATH:-/home/yuyr/dev/rpki-client-9.7/.deps/libtls/root/usr/lib/x86_64-linux-gnu/libtls.so.28.0.0}"
OURS_EXTRA_ARGS="${OURS_EXTRA_ARGS:-}"
DRY_RUN=0 DRY_RUN=0
while [[ $# -gt 0 ]]; do while [[ $# -gt 0 ]]; do
@ -29,6 +31,7 @@ while [[ $# -gt 0 ]]; do
--ssh-target) SSH_TARGET="$2"; shift 2 ;; --ssh-target) SSH_TARGET="$2"; shift 2 ;;
--rpki-client-bin) RPKI_CLIENT_BIN="$2"; shift 2 ;; --rpki-client-bin) RPKI_CLIENT_BIN="$2"; shift 2 ;;
--libtls-path) LIBTLS_PATH="$2"; shift 2 ;; --libtls-path) LIBTLS_PATH="$2"; shift 2 ;;
--ours-extra-args) OURS_EXTRA_ARGS="$2"; shift 2 ;;
--dry-run) DRY_RUN=1; shift ;; --dry-run) DRY_RUN=1; shift ;;
-h|--help) usage; exit 0 ;; -h|--help) usage; exit 0 ;;
*) echo "unknown argument: $1" >&2; usage; exit 2 ;; *) echo "unknown argument: $1" >&2; usage; exit 2 ;;
@ -61,10 +64,18 @@ scope=APNIC+ARIN mixed release two-step synchronized compare
run_root=$RUN_ROOT run_root=$RUN_ROOT
remote_root=$REMOTE_ROOT remote_root=$REMOTE_ROOT
ssh_target=$SSH_TARGET ssh_target=$SSH_TARGET
ours_extra_args=$OURS_EXTRA_ARGS
EOF2 EOF2
exit 0 exit 0
fi fi
# Best-effort removal of the remote working tree when the script exits.
# Export KEEP_REMOTE=1 to preserve the remote state for debugging.
cleanup_remote() {
    if [[ "${KEEP_REMOTE:-0}" == "1" ]]; then
        return 0
    fi
    ssh "$SSH_TARGET" "rm -rf '$REMOTE_ROOT'" >/dev/null 2>&1 || true
}
trap cleanup_remote EXIT
if [[ ! -x "$ROOT_DIR/target/release/rpki" || ! -x "$ROOT_DIR/target/release/ccr_to_compare_views" ]]; then if [[ ! -x "$ROOT_DIR/target/release/rpki" || ! -x "$ROOT_DIR/target/release/ccr_to_compare_views" ]]; then
( (
cd "$ROOT_DIR" cd "$ROOT_DIR"
@ -82,14 +93,20 @@ run_step() {
local kind="$2" local kind="$2"
local local_step="$RUN_ROOT/steps/$step_id" local local_step="$RUN_ROOT/steps/$step_id"
ssh "$SSH_TARGET" bash -s -- "$REMOTE_ROOT" "$step_id" "$kind" <<'EOS' ssh "$SSH_TARGET" bash -s -- "$REMOTE_ROOT" "$step_id" "$kind" "$OURS_EXTRA_ARGS" <<'EOS'
set -euo pipefail set -euo pipefail
REMOTE_ROOT="$1" REMOTE_ROOT="$1"
STEP_ID="$2" STEP_ID="$2"
KIND="$3" KIND="$3"
OURS_EXTRA_ARGS="$4"
cd "$REMOTE_ROOT" cd "$REMOTE_ROOT"
mkdir -p "steps/$STEP_ID/ours" "steps/$STEP_ID/rpki-client" mkdir -p "steps/$STEP_ID/ours" "steps/$STEP_ID/rpki-client"
OURS_EXTRA_ARGV=()
if [[ -n "$OURS_EXTRA_ARGS" ]]; then
# shellcheck disable=SC2206
OURS_EXTRA_ARGV=($OURS_EXTRA_ARGS)
fi
if [[ "$KIND" == "snapshot" ]]; then if [[ "$KIND" == "snapshot" ]]; then
rm -rf state/ours/work-db state/ours/raw-store.db state/rpki-client/cache state/rpki-client/out state/rpki-client/ta state/rpki-client/.ta rm -rf state/ours/work-db state/ours/raw-store.db state/rpki-client/cache state/rpki-client/out state/rpki-client/ta state/rpki-client/.ta
@ -124,6 +141,7 @@ PY
--tal-path apnic-rfc7730-https.tal --ta-path apnic-ta.cer \ --tal-path apnic-rfc7730-https.tal --ta-path apnic-ta.cer \
--tal-path arin.tal --ta-path arin-ta.cer \ --tal-path arin.tal --ta-path arin-ta.cer \
--parallel-phase1 \ --parallel-phase1 \
"${OURS_EXTRA_ARGV[@]}" \
--ccr-out "steps/$STEP_ID/ours/result.ccr" \ --ccr-out "steps/$STEP_ID/ours/result.ccr" \
--report-json "steps/$STEP_ID/ours/report.json" \ --report-json "steps/$STEP_ID/ours/report.json" \
> "steps/$STEP_ID/ours/run.log" 2>&1 > "steps/$STEP_ID/ours/run.log" 2>&1
@ -215,22 +233,29 @@ EOS
--out-dir "$local_step/compare" \ --out-dir "$local_step/compare" \
--trust-anchor unknown >/dev/null --trust-anchor unknown >/dev/null
python3 - <<'PY' "$local_step/ours/round-result.json" "$local_step/rpki-client/round-result.json" "$local_step/ours/stage-timing.json" "$local_step/compare/compare-summary.json" "$local_step/step-summary.json" python3 - <<'PY' "$local_step/ours/round-result.json" "$local_step/rpki-client/round-result.json" "$local_step/ours/stage-timing.json" "$local_step/compare/compare-summary.json" "$local_step/step-summary.json" "$OURS_EXTRA_ARGS"
import json, sys import json, sys
ours = json.load(open(sys.argv[1])) ours = json.load(open(sys.argv[1]))
client = json.load(open(sys.argv[2])) client = json.load(open(sys.argv[2]))
stage = json.load(open(sys.argv[3])) stage = json.load(open(sys.argv[3]))
compare = json.load(open(sys.argv[4])) compare = json.load(open(sys.argv[4]))
ours_extra_args = sys.argv[6]
json.dump( json.dump(
{ {
"stepId": ours["stepId"], "stepId": ours["stepId"],
"kind": ours["kind"], "kind": ours["kind"],
"oursExtraArgs": ours_extra_args,
"oursDurationMs": ours["durationMs"], "oursDurationMs": ours["durationMs"],
"rpkiClientDurationMs": client["durationMs"], "rpkiClientDurationMs": client["durationMs"],
"oursExitCode": ours["exitCode"], "oursExitCode": ours["exitCode"],
"rpkiClientExitCode": client["exitCode"], "rpkiClientExitCode": client["exitCode"],
"oursTotalMs": stage["total_ms"], "oursTotalMs": stage["total_ms"],
"oursRepoSyncMsTotal": stage["repo_sync_ms_total"], "oursRepoSyncMsTotal": stage["repo_sync_ms_total"],
"oursPublicationPointRepoSyncMsTotal": stage.get("publication_point_repo_sync_ms_total"),
"oursDownloadEventCount": stage.get("download_event_count"),
"oursRrdpDownloadMsTotal": stage.get("rrdp_download_ms_total"),
"oursRsyncDownloadMsTotal": stage.get("rsync_download_ms_total"),
"oursDownloadBytesTotal": stage.get("download_bytes_total"),
"oursVrps": compare["vrps"]["ours"], "oursVrps": compare["vrps"]["ours"],
"rpkiClientVrps": compare["vrps"]["rpkiClient"], "rpkiClientVrps": compare["vrps"]["rpkiClient"],
"oursVaps": compare["vaps"]["ours"], "oursVaps": compare["vaps"]["ours"],
@ -250,12 +275,13 @@ PY
run_step step-001 snapshot run_step step-001 snapshot
run_step step-002 delta run_step step-002 delta
python3 - <<'PY' "$RUN_ROOT/steps/step-001/step-summary.json" "$RUN_ROOT/steps/step-002/step-summary.json" "$RUN_ROOT/summary.json" python3 - <<'PY' "$RUN_ROOT/steps/step-001/step-summary.json" "$RUN_ROOT/steps/step-002/step-summary.json" "$RUN_ROOT/summary.json" "$OURS_EXTRA_ARGS"
import json, sys import json, sys
steps = [json.load(open(p)) for p in sys.argv[1:3]] steps = [json.load(open(p)) for p in sys.argv[1:3]]
summary = { summary = {
"workflowName": "性能对比测试快速版", "workflowName": "性能对比测试快速版",
"scope": "APNIC+ARIN mixed release two-step synchronized compare", "scope": "APNIC+ARIN mixed release two-step synchronized compare",
"oursExtraArgs": sys.argv[4],
"steps": steps, "steps": steps,
} }
json.dump(summary, open(sys.argv[3], "w"), indent=2, ensure_ascii=False) json.dump(summary, open(sys.argv[3], "w"), indent=2, ensure_ascii=False)

View File

@ -27,7 +27,7 @@ cleanup() {
} }
trap cleanup EXIT trap cleanup EXIT
IGNORE_REGEX='src/bin/replay_bundle_capture\.rs|src/bin/replay_bundle_capture_delta\.rs|src/bin/replay_bundle_capture_sequence\.rs|src/bin/replay_bundle_record\.rs|src/bin/replay_bundle_refresh_sequence_outputs\.rs|src/bin/measure_sequence_replay\.rs|src/bin/repository_view_stats\.rs|src/bin/trace_arin_missing_vrps\.rs|src/bin/db_stats\.rs|src/bin/rrdp_state_dump\.rs|src/bin/ccr_dump\.rs|src/bin/ccr_verify\.rs|src/bin/ccr_to_routinator_csv\.rs|src/bin/ccr_to_compare_views\.rs|src/bin/cir_materialize\.rs|src/bin/cir_extract_inputs\.rs|src/bin/cir_drop_report\.rs|src/bin/cir_ta_only_fixture\.rs|src/bundle/live_capture\.rs|src/bundle/record_io\.rs|src/bundle/compare_view\.rs|src/progress_log\.rs|src/cli\.rs|src/validation/run_tree_from_tal\.rs|src/validation/from_tal\.rs|src/sync/store_projection\.rs|src/cir/materialize\.rs' IGNORE_REGEX='src/bin/replay_bundle_capture\.rs|src/bin/replay_bundle_capture_delta\.rs|src/bin/replay_bundle_capture_sequence\.rs|src/bin/replay_bundle_record\.rs|src/bin/replay_bundle_refresh_sequence_outputs\.rs|src/bin/measure_sequence_replay\.rs|src/bin/repository_view_stats\.rs|src/bin/trace_arin_missing_vrps\.rs|src/bin/db_stats\.rs|src/bin/rrdp_state_dump\.rs|src/bin/ccr_dump\.rs|src/bin/ccr_verify\.rs|src/bin/ccr_to_routinator_csv\.rs|src/bin/ccr_to_compare_views\.rs|src/bin/cir_materialize\.rs|src/bin/cir_extract_inputs\.rs|src/bin/cir_drop_report\.rs|src/bin/cir_ta_only_fixture\.rs|src/bundle/live_capture\.rs|src/bundle/record_io\.rs|src/bundle/compare_view\.rs|src/progress_log\.rs|src/cli\.rs|src/validation/run_tree_from_tal\.rs|src/validation/tree_parallel\.rs|src/validation/from_tal\.rs|src/sync/store_projection\.rs|src/cir/materialize\.rs'
# Preserve colored output even though we post-process output by running under a pseudo-TTY. # Preserve colored output even though we post-process output by running under a pseudo-TTY.
# We run tests only once, then generate both CLI text + HTML reports without rerunning tests. # We run tests only once, then generate both CLI text + HTML reports without rerunning tests.

View File

@ -10,21 +10,23 @@ use crate::audit::{
use crate::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig}; use crate::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig};
use crate::fetch::rsync::LocalDirRsyncFetcher; use crate::fetch::rsync::LocalDirRsyncFetcher;
use crate::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher}; use crate::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher};
use crate::parallel::config::ParallelPhase1Config; use crate::parallel::config::{ParallelPhase1Config, ParallelPhase2Config};
use crate::parallel::types::TalInputSpec; use crate::parallel::types::TalInputSpec;
use crate::policy::Policy; use crate::policy::Policy;
use crate::storage::RocksStore; use crate::storage::RocksStore;
use crate::validation::run_tree_from_tal::{ use crate::validation::run_tree_from_tal::{
RunTreeFromTalAuditOutput, run_tree_from_multiple_tals_parallel_phase1_audit, RunTreeFromTalAuditOutput, run_tree_from_multiple_tals_parallel_phase1_audit,
run_tree_from_multiple_tals_parallel_phase2_audit,
run_tree_from_tal_and_ta_der_parallel_phase1_audit, run_tree_from_tal_and_ta_der_parallel_phase1_audit,
run_tree_from_tal_and_ta_der_parallel_phase2_audit,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit, run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing, run_tree_from_tal_and_ta_der_payload_delta_replay_serial_audit_with_timing,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit, run_tree_from_tal_and_ta_der_payload_replay_serial_audit,
run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing, run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing,
run_tree_from_tal_and_ta_der_serial_audit, run_tree_from_tal_and_ta_der_serial_audit,
run_tree_from_tal_and_ta_der_serial_audit_with_timing, run_tree_from_tal_and_ta_der_serial_audit_with_timing,
run_tree_from_tal_url_parallel_phase1_audit, run_tree_from_tal_url_serial_audit, run_tree_from_tal_url_parallel_phase1_audit, run_tree_from_tal_url_parallel_phase2_audit,
run_tree_from_tal_url_serial_audit_with_timing, run_tree_from_tal_url_serial_audit, run_tree_from_tal_url_serial_audit_with_timing,
}; };
use crate::validation::tree::TreeRunConfig; use crate::validation::tree::TreeRunConfig;
use serde::Serialize; use serde::Serialize;
@ -43,6 +45,7 @@ struct RunStageTiming {
total_ms: u64, total_ms: u64,
publication_points: usize, publication_points: usize,
repo_sync_ms_total: u64, repo_sync_ms_total: u64,
publication_point_repo_sync_ms_total: u64,
download_event_count: u64, download_event_count: u64,
rrdp_download_ms_total: u64, rrdp_download_ms_total: u64,
rsync_download_ms_total: u64, rsync_download_ms_total: u64,
@ -58,8 +61,10 @@ pub struct CliArgs {
pub tal_path: Option<PathBuf>, pub tal_path: Option<PathBuf>,
pub ta_path: Option<PathBuf>, pub ta_path: Option<PathBuf>,
pub parallel_phase1: bool, pub parallel_phase1: bool,
pub parallel_phase2: bool,
pub parallel_tal_urls: Vec<String>, pub parallel_tal_urls: Vec<String>,
pub parallel_phase1_config: Option<ParallelPhase1Config>, pub parallel_phase1_config: Option<ParallelPhase1Config>,
pub parallel_phase2_config: Option<ParallelPhase2Config>,
pub tal_inputs: Vec<TalInputSpec>, pub tal_inputs: Vec<TalInputSpec>,
pub db_path: PathBuf, pub db_path: PathBuf,
@ -126,12 +131,17 @@ Options:
--tal-path <path> TAL file path (repeatable; file mode) --tal-path <path> TAL file path (repeatable; file mode)
--ta-path <path> TA certificate DER file path (repeatable in file mode; pairs with --tal-path by position) --ta-path <path> TA certificate DER file path (repeatable in file mode; pairs with --tal-path by position)
--parallel-phase1 Enable Phase 1 parallel scheduler skeleton --parallel-phase1 Enable Phase 1 parallel scheduler skeleton
--parallel-phase2 Enable Phase 2 ROA object validation worker pool (requires --parallel-phase1)
--parallel-max-repo-sync-workers-global <n> --parallel-max-repo-sync-workers-global <n>
Phase 1 global repo sync worker budget Phase 1 global repo sync worker budget
--parallel-max-inflight-snapshot-bytes-global <n> --parallel-max-inflight-snapshot-bytes-global <n>
Phase 1 inflight snapshot byte budget Phase 1 inflight snapshot byte budget
--parallel-max-pending-repo-results <n> --parallel-max-pending-repo-results <n>
Phase 1 pending repo result budget Phase 1 pending repo result budget
--parallel-phase2-object-workers <n>
Phase 2 object worker count
--parallel-phase2-worker-queue-capacity <n>
Phase 2 per-worker object queue capacity
--rsync-local-dir <path> Use LocalDirRsyncFetcher rooted at this directory (offline tests) --rsync-local-dir <path> Use LocalDirRsyncFetcher rooted at this directory (offline tests)
--disable-rrdp Disable RRDP and synchronize only via rsync --disable-rrdp Disable RRDP and synchronize only via rsync
@ -155,8 +165,11 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
let mut tal_paths: Vec<PathBuf> = Vec::new(); let mut tal_paths: Vec<PathBuf> = Vec::new();
let mut ta_paths: Vec<PathBuf> = Vec::new(); let mut ta_paths: Vec<PathBuf> = Vec::new();
let mut parallel_phase1: bool = false; let mut parallel_phase1: bool = false;
let mut parallel_phase2: bool = false;
let mut parallel_phase1_cfg = ParallelPhase1Config::default(); let mut parallel_phase1_cfg = ParallelPhase1Config::default();
let mut parallel_phase1_cfg_overridden: bool = false; let mut parallel_phase1_cfg_overridden: bool = false;
let mut parallel_phase2_cfg = ParallelPhase2Config::default();
let mut parallel_phase2_cfg_overridden: bool = false;
let mut db_path: Option<PathBuf> = None; let mut db_path: Option<PathBuf> = None;
let mut raw_store_db: Option<PathBuf> = None; let mut raw_store_db: Option<PathBuf> = None;
@ -211,6 +224,9 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
"--parallel-phase1" => { "--parallel-phase1" => {
parallel_phase1 = true; parallel_phase1 = true;
} }
"--parallel-phase2" => {
parallel_phase2 = true;
}
"--parallel-max-repo-sync-workers-global" => { "--parallel-max-repo-sync-workers-global" => {
i += 1; i += 1;
let v = argv let v = argv
@ -242,6 +258,26 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
.map_err(|_| format!("invalid --parallel-max-pending-repo-results: {v}"))?; .map_err(|_| format!("invalid --parallel-max-pending-repo-results: {v}"))?;
parallel_phase1_cfg_overridden = true; parallel_phase1_cfg_overridden = true;
} }
"--parallel-phase2-object-workers" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-object-workers requires a value")?;
parallel_phase2_cfg.object_workers = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-object-workers: {v}"))?;
parallel_phase2_cfg_overridden = true;
}
"--parallel-phase2-worker-queue-capacity" => {
i += 1;
let v = argv
.get(i)
.ok_or("--parallel-phase2-worker-queue-capacity requires a value")?;
parallel_phase2_cfg.worker_queue_capacity = v
.parse::<usize>()
.map_err(|_| format!("invalid --parallel-phase2-worker-queue-capacity: {v}"))?;
parallel_phase2_cfg_overridden = true;
}
"--db" => { "--db" => {
i += 1; i += 1;
let v = argv.get(i).ok_or("--db requires a value")?; let v = argv.get(i).ok_or("--db requires a value")?;
@ -419,6 +455,30 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
usage() usage()
)); ));
} }
if !parallel_phase2 && parallel_phase2_cfg_overridden {
return Err(format!(
"--parallel-phase2-* options require --parallel-phase2\n\n{}",
usage()
));
}
if parallel_phase2 && !parallel_phase1 {
return Err(format!(
"--parallel-phase2 requires --parallel-phase1\n\n{}",
usage()
));
}
if parallel_phase2 && parallel_phase2_cfg.object_workers == 0 {
return Err(format!(
"--parallel-phase2-object-workers must be > 0\n\n{}",
usage()
));
}
if parallel_phase2 && parallel_phase2_cfg.worker_queue_capacity == 0 {
return Err(format!(
"--parallel-phase2-worker-queue-capacity must be > 0\n\n{}",
usage()
));
}
if !tal_urls.is_empty() && !ta_paths.is_empty() { if !tal_urls.is_empty() && !ta_paths.is_empty() {
return Err(format!( return Err(format!(
"--ta-path cannot be used with --tal-url mode\n\n{}", "--ta-path cannot be used with --tal-url mode\n\n{}",
@ -591,8 +651,10 @@ pub fn parse_args(argv: &[String]) -> Result<CliArgs, String> {
tal_path, tal_path,
ta_path, ta_path,
parallel_phase1, parallel_phase1,
parallel_phase2,
parallel_tal_urls: Vec::new(), parallel_tal_urls: Vec::new(),
parallel_phase1_config: parallel_phase1.then_some(parallel_phase1_cfg), parallel_phase1_config: parallel_phase1.then_some(parallel_phase1_cfg),
parallel_phase2_config: parallel_phase2.then_some(parallel_phase2_cfg),
tal_inputs, tal_inputs,
db_path, db_path,
raw_store_db, raw_store_db,
@ -983,19 +1045,38 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let rsync = LocalDirRsyncFetcher::new(dir); let rsync = LocalDirRsyncFetcher::new(dir);
if args.parallel_phase1 && args.tal_inputs.len() > 1 { if args.parallel_phase1 && args.tal_inputs.len() > 1 {
run_tree_from_multiple_tals_parallel_phase1_audit( if args.parallel_phase2 {
Arc::clone(&store), run_tree_from_multiple_tals_parallel_phase2_audit(
&policy, Arc::clone(&store),
args.tal_inputs.clone(), &policy,
&http, args.tal_inputs.clone(),
&rsync, &http,
validation_time, &rsync,
&config, validation_time,
args.parallel_phase1_config &config,
.clone() args.parallel_phase1_config
.expect("phase1 config present"), .clone()
) .expect("phase1 config present"),
.map_err(|e| e.to_string())? args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_multiple_tals_parallel_phase1_audit(
Arc::clone(&store),
&policy,
args.tal_inputs.clone(),
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
}
} else { } else {
match ( match (
args.tal_url.as_ref(), args.tal_url.as_ref(),
@ -1004,19 +1085,38 @@ pub fn run(argv: &[String]) -> Result<(), String> {
) { ) {
(Some(url), _, _) => { (Some(url), _, _) => {
if args.parallel_phase1 { if args.parallel_phase1 {
run_tree_from_tal_url_parallel_phase1_audit( if args.parallel_phase2 {
Arc::clone(&store), run_tree_from_tal_url_parallel_phase2_audit(
&policy, Arc::clone(&store),
url, &policy,
&http, url,
&rsync, &http,
validation_time, &rsync,
&config, validation_time,
args.parallel_phase1_config &config,
.clone() args.parallel_phase1_config
.expect("phase1 config present"), .clone()
) .expect("phase1 config present"),
.map_err(|e| e.to_string())? args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_url_parallel_phase1_audit(
Arc::clone(&store),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
}
} else if let Some((_, t)) = timing.as_ref() { } else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_url_serial_audit_with_timing( run_tree_from_tal_url_serial_audit_with_timing(
store.as_ref(), store.as_ref(),
@ -1048,21 +1148,42 @@ pub fn run(argv: &[String]) -> Result<(), String> {
let ta_der = std::fs::read(ta_path) let ta_der = std::fs::read(ta_path)
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?; .map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
if args.parallel_phase1 { if args.parallel_phase1 {
run_tree_from_tal_and_ta_der_parallel_phase1_audit( if args.parallel_phase2 {
Arc::clone(&store), run_tree_from_tal_and_ta_der_parallel_phase2_audit(
&policy, Arc::clone(&store),
&tal_bytes, &policy,
&ta_der, &tal_bytes,
None, &ta_der,
&http, None,
&rsync, &http,
validation_time, &rsync,
&config, validation_time,
args.parallel_phase1_config &config,
.clone() args.parallel_phase1_config
.expect("phase1 config present"), .clone()
) .expect("phase1 config present"),
.map_err(|e| e.to_string())? args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_parallel_phase1_audit(
Arc::clone(&store),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
}
} else if let Some((_, t)) = timing.as_ref() { } else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_and_ta_der_serial_audit_with_timing( run_tree_from_tal_and_ta_der_serial_audit_with_timing(
store.as_ref(), store.as_ref(),
@ -1142,19 +1263,38 @@ pub fn run(argv: &[String]) -> Result<(), String> {
..SystemRsyncConfig::default() ..SystemRsyncConfig::default()
}); });
if args.parallel_phase1 && args.tal_inputs.len() > 1 { if args.parallel_phase1 && args.tal_inputs.len() > 1 {
run_tree_from_multiple_tals_parallel_phase1_audit( if args.parallel_phase2 {
Arc::clone(&store), run_tree_from_multiple_tals_parallel_phase2_audit(
&policy, Arc::clone(&store),
args.tal_inputs.clone(), &policy,
&http, args.tal_inputs.clone(),
&rsync, &http,
validation_time, &rsync,
&config, validation_time,
args.parallel_phase1_config &config,
.clone() args.parallel_phase1_config
.expect("phase1 config present"), .clone()
) .expect("phase1 config present"),
.map_err(|e| e.to_string())? args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_multiple_tals_parallel_phase1_audit(
Arc::clone(&store),
&policy,
args.tal_inputs.clone(),
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
}
} else { } else {
match ( match (
args.tal_url.as_ref(), args.tal_url.as_ref(),
@ -1163,19 +1303,38 @@ pub fn run(argv: &[String]) -> Result<(), String> {
) { ) {
(Some(url), _, _) => { (Some(url), _, _) => {
if args.parallel_phase1 { if args.parallel_phase1 {
run_tree_from_tal_url_parallel_phase1_audit( if args.parallel_phase2 {
Arc::clone(&store), run_tree_from_tal_url_parallel_phase2_audit(
&policy, Arc::clone(&store),
url, &policy,
&http, url,
&rsync, &http,
validation_time, &rsync,
&config, validation_time,
args.parallel_phase1_config &config,
.clone() args.parallel_phase1_config
.expect("phase1 config present"), .clone()
) .expect("phase1 config present"),
.map_err(|e| e.to_string())? args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_url_parallel_phase1_audit(
Arc::clone(&store),
&policy,
url,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
}
} else if let Some((_, t)) = timing.as_ref() { } else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_url_serial_audit_with_timing( run_tree_from_tal_url_serial_audit_with_timing(
store.as_ref(), store.as_ref(),
@ -1207,21 +1366,42 @@ pub fn run(argv: &[String]) -> Result<(), String> {
let ta_der = std::fs::read(ta_path) let ta_der = std::fs::read(ta_path)
.map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?; .map_err(|e| format!("read ta failed: {}: {e}", ta_path.display()))?;
if args.parallel_phase1 { if args.parallel_phase1 {
run_tree_from_tal_and_ta_der_parallel_phase1_audit( if args.parallel_phase2 {
Arc::clone(&store), run_tree_from_tal_and_ta_der_parallel_phase2_audit(
&policy, Arc::clone(&store),
&tal_bytes, &policy,
&ta_der, &tal_bytes,
None, &ta_der,
&http, None,
&rsync, &http,
validation_time, &rsync,
&config, validation_time,
args.parallel_phase1_config &config,
.clone() args.parallel_phase1_config
.expect("phase1 config present"), .clone()
) .expect("phase1 config present"),
.map_err(|e| e.to_string())? args.parallel_phase2_config
.clone()
.expect("phase2 config present"),
)
.map_err(|e| e.to_string())?
} else {
run_tree_from_tal_and_ta_der_parallel_phase1_audit(
Arc::clone(&store),
&policy,
&tal_bytes,
&ta_der,
None,
&http,
&rsync,
validation_time,
&config,
args.parallel_phase1_config
.clone()
.expect("phase1 config present"),
)
.map_err(|e| e.to_string())?
}
} else if let Some((_, t)) = timing.as_ref() { } else if let Some((_, t)) = timing.as_ref() {
run_tree_from_tal_and_ta_der_serial_audit_with_timing( run_tree_from_tal_and_ta_der_serial_audit_with_timing(
store.as_ref(), store.as_ref(),
@ -1295,7 +1475,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
} }
let publication_points = out.publication_points.len(); let publication_points = out.publication_points.len();
let repo_sync_ms_total: u64 = out let publication_point_repo_sync_ms_total: u64 = out
.publication_points .publication_points
.iter() .iter()
.map(|pp| pp.repo_sync_duration_ms.unwrap_or(0)) .map(|pp| pp.repo_sync_duration_ms.unwrap_or(0))
@ -1317,6 +1497,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.get("rsync") .get("rsync")
.map(|item| item.duration_ms_total) .map(|item| item.duration_ms_total)
.unwrap_or(0); .unwrap_or(0);
let repo_sync_ms_total = rrdp_download_ms_total + rsync_download_ms_total;
let download_bytes_total: u64 = out let download_bytes_total: u64 = out
.download_stats .download_stats
.by_kind .by_kind
@ -1439,6 +1620,7 @@ pub fn run(argv: &[String]) -> Result<(), String> {
total_ms: total_started.elapsed().as_millis() as u64, total_ms: total_started.elapsed().as_millis() as u64,
publication_points, publication_points,
repo_sync_ms_total, repo_sync_ms_total,
publication_point_repo_sync_ms_total,
download_event_count, download_event_count,
rrdp_download_ms_total, rrdp_download_ms_total,
rsync_download_ms_total, rsync_download_ms_total,
@ -1690,6 +1872,43 @@ mod tests {
assert!(err.contains("no longer supported"), "{err}"); assert!(err.contains("no longer supported"), "{err}");
} }
#[test]
fn parse_accepts_parallel_phase2_with_config() {
    // Build the argv from literals; phase 2 flags require phase 1 as well.
    let argv: Vec<String> = [
        "rpki",
        "--db",
        "db",
        "--tal-url",
        "https://example.test/root.tal",
        "--parallel-phase1",
        "--parallel-phase2",
        "--parallel-phase2-object-workers",
        "3",
        "--parallel-phase2-worker-queue-capacity",
        "17",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect();
    let parsed = parse_args(&argv).expect("parse args");
    assert!(parsed.parallel_phase1);
    assert!(parsed.parallel_phase2);
    let phase2 = parsed.parallel_phase2_config.expect("phase2 config");
    assert_eq!(phase2.object_workers, 3);
    assert_eq!(phase2.worker_queue_capacity, 17);
}
#[test]
fn parse_rejects_parallel_phase2_without_phase1() {
    // --parallel-phase2 alone must be refused with a hint naming --parallel-phase1.
    let argv: Vec<String> = [
        "rpki",
        "--db",
        "db",
        "--tal-url",
        "https://example.test/root.tal",
        "--parallel-phase2",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect();
    let err = parse_args(&argv).expect_err("phase2 without phase1 should fail");
    assert!(err.contains("requires --parallel-phase1"), "{err}");
}
#[test] #[test]
fn parse_accepts_multi_tal_cir_overrides_in_file_mode() { fn parse_accepts_multi_tal_cir_overrides_in_file_mode() {
let argv = vec![ let argv = vec![

View File

@ -74,6 +74,11 @@ impl CurrentRepoIndex {
out out
} }
/// Remove every entry from both indexes, keeping the maps' allocations
/// so the index can be repopulated without reallocating.
pub fn clear(&mut self) {
    self.by_scope.clear();
    self.by_uri.clear();
}
pub fn apply_repository_view_entries( pub fn apply_repository_view_entries(
&mut self, &mut self,
entries: &[RepositoryViewEntry], entries: &[RepositoryViewEntry],

View File

@ -5,6 +5,23 @@ pub struct ParallelPhase1Config {
pub max_pending_repo_results: usize, pub max_pending_repo_results: usize,
} }
/// Tuning knobs for the Phase 2 object-validation worker pool.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ParallelPhase2Config {
    /// Number of object-validation worker threads.
    pub object_workers: usize,
    /// Bounded capacity of each worker's task queue.
    pub worker_queue_capacity: usize,
}

impl Default for ParallelPhase2Config {
    fn default() -> Self {
        // One worker per available core; fall back to 4 when the
        // parallelism query fails. `available_parallelism` already
        // returns a NonZeroUsize, so no extra clamping is needed.
        let object_workers = std::thread::available_parallelism().map_or(4, |n| n.get());
        Self {
            object_workers,
            worker_queue_capacity: 256,
        }
    }
}
impl Default for ParallelPhase1Config { impl Default for ParallelPhase1Config {
fn default() -> Self { fn default() -> Self {
Self { Self {
@ -17,7 +34,7 @@ impl Default for ParallelPhase1Config {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::ParallelPhase1Config; use super::{ParallelPhase1Config, ParallelPhase2Config};
#[test] #[test]
fn default_parallel_phase1_config_is_bounded() { fn default_parallel_phase1_config_is_bounded() {
@ -26,4 +43,11 @@ mod tests {
assert!(cfg.max_inflight_snapshot_bytes_global > 0); assert!(cfg.max_inflight_snapshot_bytes_global > 0);
assert!(cfg.max_pending_repo_results > 0); assert!(cfg.max_pending_repo_results > 0);
} }
#[test]
fn default_parallel_phase2_config_is_bounded() {
    // Defaults must always provide at least one worker and a usable queue.
    let defaults = ParallelPhase2Config::default();
    assert!(defaults.object_workers >= 1);
    assert!(defaults.worker_queue_capacity >= 1);
}
} }

View File

@ -1,4 +1,6 @@
pub mod config; pub mod config;
pub mod object_worker;
pub mod phase2_scheduler;
pub mod repo_runtime; pub mod repo_runtime;
pub mod repo_scheduler; pub mod repo_scheduler;
pub mod repo_worker; pub mod repo_worker;

View File

@ -0,0 +1,287 @@
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, SyncSender, TrySendError};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
/// Strategy run by pool worker threads to process one task.
///
/// Implementations are shared across workers (hence `Send + Sync + 'static`);
/// `worker_index` identifies the worker thread executing the task.
pub trait ObjectTaskExecutor<T, R>: Send + Sync + 'static {
/// Process `task` on worker `worker_index` and produce its result.
fn execute(&self, worker_index: usize, task: T) -> R;
}
/// Internal control message delivered over a worker's task channel.
enum ObjectWorkerMessage<T> {
/// A unit of work for the worker to execute.
Task(T),
/// Request that the worker loop exit cleanly.
Shutdown,
}
/// Why a submission was rejected; the task is handed back so the caller
/// can retry or re-queue it.
#[derive(Debug)]
pub enum ObjectWorkerSubmitError<T> {
/// The chosen worker's bounded queue was full.
QueueFull { worker_index: usize, task: T },
/// The chosen worker's channel has hung up.
Disconnected { worker_index: usize, task: T },
}
/// Fixed-size pool of worker threads that execute object tasks and funnel
/// results back over a single shared channel. Tasks are distributed
/// round-robin across bounded per-worker queues.
pub struct ObjectWorkerPool<T, R, E>
where
T: Send + 'static,
R: Send + 'static,
E: ObjectTaskExecutor<T, R>,
{
// One bounded sender per worker thread.
task_txs: Vec<SyncSender<ObjectWorkerMessage<T>>>,
// Shared receiving end for results from all workers.
result_rx: Receiver<R>,
// Join handles, drained when the pool shuts down.
workers: Vec<JoinHandle<()>>,
// Cursor of the worker to try next for round-robin submission.
next_worker_idx: usize,
// Keeps the executor alive for the pool's lifetime.
_executor: Arc<E>,
}
impl<T, R, E> ObjectWorkerPool<T, R, E>
where
T: Send + 'static,
R: Send + 'static,
E: ObjectTaskExecutor<T, R>,
{
/// Spawn `worker_count` named worker threads, each reading from its own
/// bounded task queue of `queue_capacity` slots, all sharing `executor`.
///
/// Returns an error when `worker_count` or `queue_capacity` is zero, or
/// when a thread fails to spawn.
///
/// NOTE(review): if a later spawn fails, senders to already-spawned
/// workers are dropped; their loops are expected to exit on channel
/// disconnect in `object_worker_loop` (not shown here) — confirm.
pub fn new(worker_count: usize, queue_capacity: usize, executor: E) -> Result<Self, String> {
if worker_count == 0 {
return Err("ObjectWorkerPool requires at least one worker".to_string());
}
if queue_capacity == 0 {
return Err("ObjectWorkerPool requires queue_capacity > 0".to_string());
}
let executor = Arc::new(executor);
// Results from every worker funnel into one unbounded channel.
let (result_tx, result_rx) = mpsc::channel::<R>();
let mut task_txs = Vec::with_capacity(worker_count);
let mut workers = Vec::with_capacity(worker_count);
for worker_index in 0..worker_count {
// Bounded queue provides per-worker backpressure at submit time.
let (task_tx, task_rx) = mpsc::sync_channel::<ObjectWorkerMessage<T>>(queue_capacity);
let result_tx = result_tx.clone();
let executor = Arc::clone(&executor);
let handle = thread::Builder::new()
.name(format!("object-validation-worker-{worker_index}"))
.spawn(move || object_worker_loop(worker_index, task_rx, result_tx, executor))
.map_err(|e| format!("spawn object worker failed: {e}"))?;
task_txs.push(task_tx);
workers.push(handle);
}
Ok(Self {
task_txs,
result_rx,
workers,
next_worker_idx: 0,
_executor: executor,
})
}
pub fn worker_count(&self) -> usize {
self.task_txs.len()
}
pub fn next_worker_index(&self) -> usize {
self.next_worker_idx
}
pub fn try_submit_round_robin(&mut self, task: T) -> Result<usize, ObjectWorkerSubmitError<T>> {
let worker_index = self.next_worker_idx % self.task_txs.len();
match self.task_txs[worker_index].try_send(ObjectWorkerMessage::Task(task)) {
Ok(()) => {
self.next_worker_idx = (worker_index + 1) % self.task_txs.len();
Ok(worker_index)
}
Err(TrySendError::Full(ObjectWorkerMessage::Task(task))) => {
Err(ObjectWorkerSubmitError::QueueFull { worker_index, task })
}
Err(TrySendError::Disconnected(ObjectWorkerMessage::Task(task))) => {
Err(ObjectWorkerSubmitError::Disconnected { worker_index, task })
}
Err(TrySendError::Full(ObjectWorkerMessage::Shutdown))
| Err(TrySendError::Disconnected(ObjectWorkerMessage::Shutdown)) => {
unreachable!("shutdown is never submitted via try_submit_round_robin")
}
}
}
pub fn recv_result_timeout(&self, timeout: Duration) -> Result<Option<R>, String> {
match self.result_rx.recv_timeout(timeout) {
Ok(result) => Ok(Some(result)),
Err(RecvTimeoutError::Timeout) => Ok(None),
Err(RecvTimeoutError::Disconnected) => {
Err("object worker result channel disconnected".to_string())
}
}
}
pub fn shutdown(mut self) -> Result<(), String> {
self.shutdown_inner()
}
fn shutdown_inner(&mut self) -> Result<(), String> {
if self.workers.is_empty() {
return Ok(());
}
for tx in &self.task_txs {
tx.send(ObjectWorkerMessage::Shutdown)
.map_err(|e| format!("send shutdown to object worker failed: {e}"))?;
}
let mut first_err = None;
for handle in self.workers.drain(..) {
if let Err(e) = handle.join() {
if first_err.is_none() {
first_err = Some(format!("join object worker failed: {e:?}"));
}
}
}
if let Some(err) = first_err {
return Err(err);
}
Ok(())
}
}
impl<T, R, E> Drop for ObjectWorkerPool<T, R, E>
where
    T: Send + 'static,
    R: Send + 'static,
    E: ObjectTaskExecutor<T, R>,
{
    // Best-effort shutdown on drop: the error is deliberately discarded
    // because panicking inside `drop` during unwinding would abort.
    fn drop(&mut self) {
        let _ = self.shutdown_inner();
    }
}
/// Worker thread body: pulls messages until shutdown or channel closure,
/// running each task through the shared executor and forwarding results.
fn object_worker_loop<T, R, E>(
    worker_index: usize,
    task_rx: Receiver<ObjectWorkerMessage<T>>,
    result_tx: mpsc::Sender<R>,
    executor: Arc<E>,
) where
    T: Send + 'static,
    R: Send + 'static,
    E: ObjectTaskExecutor<T, R>,
{
    // The loop ends on an explicit `Shutdown` message or when the task
    // sender side is dropped — both fail the `while let` pattern.
    while let Ok(ObjectWorkerMessage::Task(task)) = task_rx.recv() {
        let outcome = executor.execute(worker_index, task);
        // The pool dropping its result receiver also means we should exit.
        if result_tx.send(outcome).is_err() {
            return;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::{ObjectTaskExecutor, ObjectWorkerPool, ObjectWorkerSubmitError};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::time::Duration;

    // Trivial executor: echoes the task back together with the worker index.
    #[derive(Clone)]
    struct EchoExecutor;

    impl ObjectTaskExecutor<u32, (usize, u32)> for EchoExecutor {
        fn execute(&self, worker_index: usize, task: u32) -> (usize, u32) {
            (worker_index, task)
        }
    }

    #[test]
    fn object_worker_pool_rejects_invalid_config_and_shutdowns_explicitly() {
        // Zero workers must be rejected at construction time.
        let err = match ObjectWorkerPool::new(0, 1, EchoExecutor) {
            Ok(_) => panic!("zero workers should be rejected"),
            Err(err) => err,
        };
        assert!(err.contains("at least one worker"));
        // Zero queue capacity likewise.
        let err = match ObjectWorkerPool::new(1, 0, EchoExecutor) {
            Ok(_) => panic!("zero queue should be rejected"),
            Err(err) => err,
        };
        assert!(err.contains("queue_capacity > 0"));
        // A valid pool starts its round-robin cursor at worker 0 and can be
        // shut down explicitly without having run any task.
        let pool = ObjectWorkerPool::new(2, 1, EchoExecutor).expect("pool");
        assert_eq!(pool.worker_count(), 2);
        assert_eq!(pool.next_worker_index(), 0);
        pool.shutdown().expect("shutdown");
    }

    #[test]
    fn object_worker_pool_round_robin_submits_to_worker_queues() {
        // Four submissions to three workers must wrap: workers 0, 1, 2, 0.
        let mut pool = ObjectWorkerPool::new(3, 4, EchoExecutor).expect("pool");
        assert_eq!(pool.try_submit_round_robin(10).expect("submit 10"), 0);
        assert_eq!(pool.try_submit_round_robin(11).expect("submit 11"), 1);
        assert_eq!(pool.try_submit_round_robin(12).expect("submit 12"), 2);
        assert_eq!(pool.try_submit_round_robin(13).expect("submit 13"), 0);
        let mut results = Vec::new();
        for _ in 0..4 {
            results.push(
                pool.recv_result_timeout(Duration::from_secs(1))
                    .expect("result channel")
                    .expect("result"),
            );
        }
        // Results arrive in nondeterministic order; sort by task id before
        // checking which worker handled each task.
        results.sort_by_key(|(_, task)| *task);
        assert_eq!(results, vec![(0, 10), (1, 11), (2, 12), (0, 13)]);
    }

    // Executor that blocks on a barrier so a worker can be held busy while
    // the test fills its queue; `started` signals the first task was taken.
    struct BlockingExecutor {
        barrier: Arc<Barrier>,
        started: Arc<AtomicBool>,
    }

    impl ObjectTaskExecutor<u32, u32> for BlockingExecutor {
        fn execute(&self, _worker_index: usize, task: u32) -> u32 {
            self.started.store(true, Ordering::SeqCst);
            self.barrier.wait();
            task
        }
    }

    #[test]
    fn object_worker_pool_reports_full_worker_queue_without_advancing_round_robin() {
        let barrier = Arc::new(Barrier::new(2));
        let started = Arc::new(AtomicBool::new(false));
        // One worker with a one-slot queue: task 1 occupies the worker,
        // task 2 fills the queue slot, task 3 must be rejected as full.
        let mut pool = ObjectWorkerPool::new(
            1,
            1,
            BlockingExecutor {
                barrier: Arc::clone(&barrier),
                started: Arc::clone(&started),
            },
        )
        .expect("pool");
        assert_eq!(pool.try_submit_round_robin(1).expect("first task"), 0);
        // Spin until the worker has actually picked up task 1, so the queue
        // slot is free for task 2.
        let deadline = std::time::Instant::now() + Duration::from_secs(1);
        while !started.load(Ordering::SeqCst) {
            assert!(
                std::time::Instant::now() < deadline,
                "worker did not start first task"
            );
            std::thread::sleep(Duration::from_millis(1));
        }
        assert_eq!(pool.try_submit_round_robin(2).expect("queued task"), 0);
        // Third submission hits the full queue and hands the task back.
        match pool.try_submit_round_robin(3) {
            Err(ObjectWorkerSubmitError::QueueFull { worker_index, task }) => {
                assert_eq!(worker_index, 0);
                assert_eq!(task, 3);
            }
            other => panic!("expected queue full, got {other:?}"),
        }
        // Rejection must not advance the round-robin cursor.
        assert_eq!(pool.next_worker_index(), 0);
        // Release the worker once per blocked task and collect both results.
        barrier.wait();
        assert_eq!(
            pool.recv_result_timeout(Duration::from_secs(1))
                .expect("result channel"),
            Some(1)
        );
        barrier.wait();
        assert_eq!(
            pool.recv_result_timeout(Duration::from_secs(1))
                .expect("result channel"),
            Some(2)
        );
    }
}

View File

@ -0,0 +1,324 @@
use std::collections::{HashMap, VecDeque};
use crate::parallel::types::RepoIdentity;
/// Opaque per-run identifier for a CA instance scheduled in phase 2.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CaInstanceId(pub u64);
/// Opaque per-run identifier for a publication point being validated.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct PublicationPointId(pub u64);
/// Tracking record for one in-flight publication point.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PublicationPointState {
    /// CA instance this publication point belongs to.
    pub ca_instance_id: CaInstanceId,
    /// ROA validation tasks still outstanding for this point.
    pub pending_roa_tasks: usize,
    /// Whether child-CA discovery results were already released downstream.
    pub child_discovery_released: bool,
    /// Set just before the record is removed, once all ROA tasks finish.
    pub finalized: bool,
}
/// Point-in-time view of every queue/in-flight indicator the phase-2
/// scheduler must drain before the run can be declared finished.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Phase2CompletionSnapshot {
    pub ca_ready_queue_empty: bool,
    pub ca_waiting_repo_empty: bool,
    pub repo_tasks_idle: bool,
    pub pending_roa_dispatch_empty: bool,
    pub worker_queues_empty: bool,
    pub object_result_queue_empty: bool,
    pub object_workers_idle: bool,
    pub inflight_publication_points_empty: bool,
}

impl Phase2CompletionSnapshot {
    /// Phase 2 is complete only when every indicator reports drained/idle.
    pub fn is_complete(&self) -> bool {
        let indicators = [
            self.ca_ready_queue_empty,
            self.ca_waiting_repo_empty,
            self.repo_tasks_idle,
            self.pending_roa_dispatch_empty,
            self.worker_queues_empty,
            self.object_result_queue_empty,
            self.object_workers_idle,
            self.inflight_publication_points_empty,
        ];
        indicators.into_iter().all(|flag| flag)
    }
}
/// Mutable bookkeeping for the phase-2 scheduler: which CA instances wait
/// on which repository, which are ready to run, which publication points
/// are in flight, and which ROA tasks still await dispatch.
pub struct Phase2SchedulerState<T> {
    /// CA instances parked until their repository identity finishes syncing.
    ca_waiting_repo_by_identity: HashMap<RepoIdentity, Vec<CaInstanceId>>,
    /// CA instances released and ready for processing, FIFO.
    ca_ready_queue: VecDeque<CaInstanceId>,
    /// Publication points whose ROA tasks are still outstanding.
    inflight_publication_points: HashMap<PublicationPointId, PublicationPointState>,
    /// ROA tasks queued for hand-off to the object worker pool.
    pending_roa_dispatch: VecDeque<T>,
}

// Manual impl instead of `#[derive(Default)]`: the derive would add a
// spurious `T: Default` bound even though no field stores a bare `T`,
// needlessly restricting which task types get a `Default` state.
impl<T> Default for Phase2SchedulerState<T> {
    fn default() -> Self {
        Self {
            ca_waiting_repo_by_identity: HashMap::new(),
            ca_ready_queue: VecDeque::new(),
            inflight_publication_points: HashMap::new(),
            pending_roa_dispatch: VecDeque::new(),
        }
    }
}
impl<T> Phase2SchedulerState<T> {
    /// Creates an empty scheduler state: no waiters, no ready CAs, no
    /// in-flight publication points, no pending ROA tasks.
    pub fn new() -> Self {
        Self {
            ca_waiting_repo_by_identity: HashMap::new(),
            ca_ready_queue: VecDeque::new(),
            inflight_publication_points: HashMap::new(),
            pending_roa_dispatch: VecDeque::new(),
        }
    }

    /// Parks `ca_id` until the repository `identity` finishes syncing.
    pub fn wait_for_repo(&mut self, identity: RepoIdentity, ca_id: CaInstanceId) {
        let waiters = self.ca_waiting_repo_by_identity.entry(identity).or_default();
        waiters.push(ca_id);
    }

    /// Moves every CA parked on `identity` onto the ready queue (in the
    /// order they were parked) and returns the released set.
    pub fn release_repo_waiters(&mut self, identity: &RepoIdentity) -> Vec<CaInstanceId> {
        let freed = self
            .ca_waiting_repo_by_identity
            .remove(identity)
            .unwrap_or_default();
        self.ca_ready_queue.extend(freed.iter().copied());
        freed
    }

    /// Appends a CA directly to the ready queue.
    pub fn push_ready_ca(&mut self, ca_id: CaInstanceId) {
        self.ca_ready_queue.push_back(ca_id);
    }

    /// Pops the next ready CA in FIFO order, if any.
    pub fn pop_ready_ca(&mut self) -> Option<CaInstanceId> {
        self.ca_ready_queue.pop_front()
    }

    /// Registers a publication point as in flight with
    /// `pending_roa_tasks` outstanding ROA validations.
    pub fn start_publication_point(
        &mut self,
        pp_id: PublicationPointId,
        ca_id: CaInstanceId,
        pending_roa_tasks: usize,
    ) {
        let record = PublicationPointState {
            ca_instance_id: ca_id,
            pending_roa_tasks,
            child_discovery_released: false,
            finalized: false,
        };
        self.inflight_publication_points.insert(pp_id, record);
    }

    /// Flags the point's child-CA discovery as released; no-op when the
    /// point is not (or no longer) in flight.
    pub fn mark_child_discovery_released(&mut self, pp_id: PublicationPointId) {
        if let Some(record) = self.inflight_publication_points.get_mut(&pp_id) {
            record.child_discovery_released = true;
        }
    }

    /// Queues a ROA task for later hand-off to the worker pool.
    pub fn enqueue_roa_task(&mut self, task: T) {
        self.pending_roa_dispatch.push_back(task);
    }

    /// Takes the oldest queued ROA task, if any.
    pub fn pop_pending_roa_dispatch(&mut self) -> Option<T> {
        self.pending_roa_dispatch.pop_front()
    }

    /// Re-queues a task at the front, e.g. after a full worker queue
    /// forced a retry.
    pub fn push_front_pending_roa_dispatch(&mut self, task: T) {
        self.pending_roa_dispatch.push_front(task);
    }

    /// Records one finished ROA task for `pp_id`. Returns `Some(pp_id)`
    /// and drops the in-flight record exactly when the last task
    /// completes; returns `None` while tasks remain or the id is unknown.
    pub fn record_roa_result(&mut self, pp_id: PublicationPointId) -> Option<PublicationPointId> {
        let record = self.inflight_publication_points.get_mut(&pp_id)?;
        record.pending_roa_tasks = record.pending_roa_tasks.saturating_sub(1);
        if record.pending_roa_tasks > 0 {
            return None;
        }
        record.finalized = true;
        self.inflight_publication_points.remove(&pp_id);
        Some(pp_id)
    }

    /// Total CAs parked across all repository identities.
    pub fn waiting_repo_len(&self) -> usize {
        self.ca_waiting_repo_by_identity
            .values()
            .map(|waiters| waiters.len())
            .sum()
    }

    /// Number of CAs currently ready to run.
    pub fn ready_queue_len(&self) -> usize {
        self.ca_ready_queue.len()
    }

    /// Number of publication points still in flight.
    pub fn inflight_len(&self) -> usize {
        self.inflight_publication_points.len()
    }

    /// Number of ROA tasks awaiting dispatch.
    pub fn pending_roa_dispatch_len(&self) -> usize {
        self.pending_roa_dispatch.len()
    }

    /// Read-only view of one in-flight publication point.
    pub fn publication_point_state(
        &self,
        pp_id: PublicationPointId,
    ) -> Option<&PublicationPointState> {
        self.inflight_publication_points.get(&pp_id)
    }

    /// Combines this state's own emptiness indicators with the externally
    /// observed ones into a completion snapshot.
    pub fn completion_snapshot(
        &self,
        repo_tasks_idle: bool,
        worker_queues_empty: bool,
        object_result_queue_empty: bool,
        object_workers_idle: bool,
    ) -> Phase2CompletionSnapshot {
        Phase2CompletionSnapshot {
            ca_ready_queue_empty: self.ca_ready_queue.is_empty(),
            ca_waiting_repo_empty: self.ca_waiting_repo_by_identity.is_empty(),
            repo_tasks_idle,
            pending_roa_dispatch_empty: self.pending_roa_dispatch.is_empty(),
            worker_queues_empty,
            object_result_queue_empty,
            object_workers_idle,
            inflight_publication_points_empty: self.inflight_publication_points.is_empty(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::{CaInstanceId, Phase2SchedulerState, PublicationPointId};
    use crate::parallel::object_worker::{ObjectTaskExecutor, ObjectWorkerPool};
    use crate::parallel::types::RepoIdentity;
    use std::time::Duration;

    // Builds a distinct repository identity from a short name.
    fn identity(name: &str) -> RepoIdentity {
        RepoIdentity::new(
            Some(format!("https://example.test/{name}/notification.xml")),
            format!("rsync://example.test/{name}/"),
        )
    }

    #[test]
    fn scheduler_repo_ready_moves_waiting_ca_to_ready_queue() {
        // Two CAs parked on the same repo must both be released, in the
        // order they were parked.
        let mut state = Phase2SchedulerState::<u64>::new();
        let repo = identity("arin");
        state.wait_for_repo(repo.clone(), CaInstanceId(1));
        state.wait_for_repo(repo.clone(), CaInstanceId(2));
        assert_eq!(state.waiting_repo_len(), 2);
        let released = state.release_repo_waiters(&repo);
        assert_eq!(released, vec![CaInstanceId(1), CaInstanceId(2)]);
        assert_eq!(state.waiting_repo_len(), 0);
        assert_eq!(state.ready_queue_len(), 2);
        assert_eq!(state.pop_ready_ca(), Some(CaInstanceId(1)));
        assert_eq!(state.pop_ready_ca(), Some(CaInstanceId(2)));
    }

    #[test]
    fn scheduler_releases_child_before_roa_results_finalize_parent() {
        // Child discovery may be released while ROA tasks are still
        // outstanding; the parent finalizes only on its last ROA result.
        let mut state = Phase2SchedulerState::<u64>::new();
        let pp = PublicationPointId(10);
        state.start_publication_point(pp, CaInstanceId(1), 2);
        state.mark_child_discovery_released(pp);
        state.push_ready_ca(CaInstanceId(2));
        let pp_state = state.publication_point_state(pp).expect("inflight pp");
        assert!(pp_state.child_discovery_released);
        assert_eq!(pp_state.pending_roa_tasks, 2);
        assert_eq!(state.pop_ready_ca(), Some(CaInstanceId(2)));
        assert_eq!(state.record_roa_result(pp), None);
        assert_eq!(state.inflight_len(), 1);
        assert_eq!(state.record_roa_result(pp), Some(pp));
        assert_eq!(state.inflight_len(), 0);
    }

    #[test]
    fn scheduler_completion_requires_all_queues_and_inflight_to_be_empty() {
        let mut state = Phase2SchedulerState::new();
        // An empty state with idle external indicators is complete.
        assert!(state
            .completion_snapshot(true, true, true, true)
            .is_complete());
        // One queued ROA task flips completion off until it is drained.
        state.enqueue_roa_task(1u64);
        assert!(!state
            .completion_snapshot(true, true, true, true)
            .is_complete());
        assert_eq!(state.pop_pending_roa_dispatch(), Some(1));
        assert!(state
            .completion_snapshot(true, true, true, true)
            .is_complete());
    }

    #[test]
    fn scheduler_can_retry_pending_roa_task_at_front() {
        // push_front must put a retried task ahead of older entries.
        let mut state = Phase2SchedulerState::new();
        state.enqueue_roa_task(1u64);
        state.enqueue_roa_task(2u64);
        assert_eq!(state.pop_pending_roa_dispatch(), Some(1));
        state.push_front_pending_roa_dispatch(3);
        assert_eq!(state.pop_pending_roa_dispatch(), Some(3));
        assert_eq!(state.pop_pending_roa_dispatch(), Some(2));
        assert_eq!(state.pop_pending_roa_dispatch(), None);
    }

    // ROA task stand-in carrying its publication point and a payload.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct TestRoaTask {
        pp_id: PublicationPointId,
        value: u64,
    }

    // Result stand-in recording which worker processed the task.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct TestRoaResult {
        worker_index: usize,
        pp_id: PublicationPointId,
        value: u64,
    }

    #[derive(Clone)]
    struct TestRoaExecutor;

    impl ObjectTaskExecutor<TestRoaTask, TestRoaResult> for TestRoaExecutor {
        fn execute(&self, worker_index: usize, task: TestRoaTask) -> TestRoaResult {
            TestRoaResult {
                worker_index,
                pp_id: task.pp_id,
                value: task.value,
            }
        }
    }

    #[test]
    fn scheduler_dispatches_pending_roa_tasks_to_workers_and_finalizes_on_results() {
        // End-to-end: queue three ROA tasks, dispatch them round-robin to
        // a two-worker pool, and finalize the point on the third result.
        let pp = PublicationPointId(42);
        let mut state = Phase2SchedulerState::new();
        state.start_publication_point(pp, CaInstanceId(7), 3);
        state.mark_child_discovery_released(pp);
        for value in 0..3 {
            state.enqueue_roa_task(TestRoaTask { pp_id: pp, value });
        }
        let mut pool = ObjectWorkerPool::new(2, 4, TestRoaExecutor).expect("object pool");
        while let Some(task) = state.pop_pending_roa_dispatch() {
            pool.try_submit_round_robin(task).expect("submit task");
        }
        assert_eq!(state.pending_roa_dispatch_len(), 0);
        let mut results = Vec::new();
        for _ in 0..3 {
            let result = pool
                .recv_result_timeout(Duration::from_secs(1))
                .expect("result channel")
                .expect("result");
            let finalized = state.record_roa_result(result.pp_id);
            results.push(result);
            // Only the last recorded result may finalize the point.
            if results.len() < 3 {
                assert_eq!(finalized, None);
            } else {
                assert_eq!(finalized, Some(pp));
            }
        }
        // Round-robin dispatch placed tasks on workers 0, 1, 0 by value.
        results.sort_by_key(|result| result.value);
        assert_eq!(results[0].worker_index, 0);
        assert_eq!(results[1].worker_index, 1);
        assert_eq!(results[2].worker_index, 0);
        assert_eq!(state.inflight_len(), 0);
        assert!(state
            .completion_snapshot(true, true, true, true)
            .is_complete());
    }
}

View File

@ -22,12 +22,50 @@ pub struct RepoSyncRuntimeOutcome {
pub warnings: Vec<Warning>, pub warnings: Vec<Warning>,
} }
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RepoSyncRequestStatus {
Ready {
identity: RepoIdentity,
outcome: RepoSyncRuntimeOutcome,
},
Pending {
identity: RepoIdentity,
state: RepoRuntimeState,
},
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoSyncRuntimeCompletion {
pub identity: RepoIdentity,
pub state: RepoRuntimeState,
pub outcome: RepoSyncRuntimeOutcome,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoSyncRuntimeEvent {
pub transport_identity: RepoIdentity,
pub completions: Vec<RepoSyncRuntimeCompletion>,
}
pub trait RepoSyncRuntime: Send + Sync { pub trait RepoSyncRuntime: Send + Sync {
fn sync_publication_point_repo( fn sync_publication_point_repo(
&self, &self,
ca: &CaInstanceHandle, ca: &CaInstanceHandle,
) -> Result<RepoSyncRuntimeOutcome, String>; ) -> Result<RepoSyncRuntimeOutcome, String>;
fn request_publication_point_repo(
&self,
ca: &CaInstanceHandle,
priority: u8,
) -> Result<RepoSyncRequestStatus, String>;
fn recv_repo_result_timeout(
&self,
timeout: Duration,
) -> Result<Option<RepoSyncRuntimeEvent>, String>;
fn reset_run_state(&self) -> Result<(), String>;
fn prefetch_discovered_children( fn prefetch_discovered_children(
&self, &self,
children: &[DiscoveredChildCaInstance], children: &[DiscoveredChildCaInstance],
@ -71,11 +109,11 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
RepoIdentity::new(ca.rrdp_notification_uri.clone(), ca.rsync_base_uri.clone()) RepoIdentity::new(ca.rrdp_notification_uri.clone(), ca.rsync_base_uri.clone())
} }
fn ensure_transport_requested( fn request_transport_for_ca(
&self, &self,
ca: &CaInstanceHandle, ca: &CaInstanceHandle,
priority: u8, priority: u8,
) -> Result<Option<RepoSyncRuntimeOutcome>, String> { ) -> Result<RepoSyncRequestStatus, String> {
let identity = Self::build_identity(ca); let identity = Self::build_identity(ca);
let requester = Self::build_requester(ca); let requester = Self::build_requester(ca);
let rsync_scope_uri = (self.rsync_scope_resolver)(&identity.rsync_base_uri); let rsync_scope_uri = (self.rsync_scope_resolver)(&identity.rsync_base_uri);
@ -108,7 +146,12 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
}), }),
); );
self.drain_pending_transport_tasks()?; self.drain_pending_transport_tasks()?;
Ok(None) Ok(RepoSyncRequestStatus::Pending {
identity,
state: self
.runtime_state_for_identity(&task.repo_identity)
.unwrap_or(RepoRuntimeState::WaitingRrdp),
})
} }
TransportRequestAction::Waiting { state } => { TransportRequestAction::Waiting { state } => {
crate::progress_log::emit( crate::progress_log::emit(
@ -122,7 +165,7 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
"runtime_state": format!("{state:?}"), "runtime_state": format!("{state:?}"),
}), }),
); );
Ok(None) Ok(RepoSyncRequestStatus::Pending { identity, state })
} }
TransportRequestAction::ReusedSuccess(result) TransportRequestAction::ReusedSuccess(result)
| TransportRequestAction::ReusedTerminalFailure(result) => { | TransportRequestAction::ReusedTerminalFailure(result) => {
@ -140,11 +183,14 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
}, },
}), }),
); );
Ok(Some(outcome_from_transport_result( Ok(RepoSyncRequestStatus::Ready {
&result, outcome: outcome_from_transport_result(
self.runtime_state_for_identity(&identity) &result,
.unwrap_or(RepoRuntimeState::Init), self.runtime_state_for_identity(&identity)
))) .unwrap_or(RepoRuntimeState::Init),
),
identity,
})
} }
} }
} }
@ -182,14 +228,19 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
Ok(()) Ok(())
} }
fn pump_one_transport_result(&self) -> Result<(), String> { fn pump_one_transport_result(
&self,
timeout: Duration,
) -> Result<Option<RepoSyncRuntimeEvent>, String> {
let envelope = { let envelope = {
let pool = self.worker_pool.lock().expect("worker pool lock poisoned"); let pool = self.worker_pool.lock().expect("worker pool lock poisoned");
pool.recv_result_timeout(Duration::from_millis(50))? pool.recv_result_timeout(timeout)?
}; };
let Some(envelope) = envelope else { let Some(envelope) = envelope else {
return Ok(()); return Ok(None);
}; };
let transport_identity = envelope.repo_identity.clone();
let completed_dedup_key = envelope.dedup_key.clone();
crate::progress_log::emit( crate::progress_log::emit(
"phase1_repo_task_result", "phase1_repo_task_result",
serde_json::json!({ serde_json::json!({
@ -229,7 +280,38 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
} }
} }
self.drain_pending_transport_tasks()?; self.drain_pending_transport_tasks()?;
Ok(()) let completions = {
let coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
coordinator
.finalized_runtime_records_for_transport(&completed_dedup_key)
.into_iter()
.filter_map(|record| {
let outcome = match record.state {
RepoRuntimeState::RrdpOk | RepoRuntimeState::RsyncOk => record
.last_success
.as_ref()
.map(|result| outcome_from_transport_result(result, record.state)),
RepoRuntimeState::FailedTerminal => record
.terminal_failure
.as_ref()
.map(|result| outcome_from_transport_result(result, record.state)),
_ => None,
}?;
Some(RepoSyncRuntimeCompletion {
identity: record.identity,
state: record.state,
outcome,
})
})
.collect::<Vec<_>>()
};
if completions.is_empty() {
return Ok(None);
}
Ok(Some(RepoSyncRuntimeEvent {
transport_identity,
completions,
}))
} }
fn runtime_state_for_identity(&self, identity: &RepoIdentity) -> Option<RepoRuntimeState> { fn runtime_state_for_identity(&self, identity: &RepoIdentity) -> Option<RepoRuntimeState> {
@ -264,24 +346,64 @@ impl<E: RepoTransportExecutor> RepoSyncRuntime for Phase1RepoSyncRuntime<E> {
&self, &self,
ca: &CaInstanceHandle, ca: &CaInstanceHandle,
) -> Result<RepoSyncRuntimeOutcome, String> { ) -> Result<RepoSyncRuntimeOutcome, String> {
if let Some(done) = self.ensure_transport_requested(ca, 0)? { if let RepoSyncRequestStatus::Ready { outcome, .. } =
return Ok(done); self.request_publication_point_repo(ca, 0)?
{
return Ok(outcome);
} }
let identity = Self::build_identity(ca); let identity = Self::build_identity(ca);
loop { loop {
if let Some(done) = self.resolved_outcome_for_identity(&identity) { if let Some(done) = self.resolved_outcome_for_identity(&identity) {
return Ok(done); return Ok(done);
} }
self.pump_one_transport_result()?; let _ = self.recv_repo_result_timeout(Duration::from_millis(50))?;
} }
} }
fn request_publication_point_repo(
&self,
ca: &CaInstanceHandle,
priority: u8,
) -> Result<RepoSyncRequestStatus, String> {
self.request_transport_for_ca(ca, priority)
}
fn recv_repo_result_timeout(
&self,
timeout: Duration,
) -> Result<Option<RepoSyncRuntimeEvent>, String> {
self.pump_one_transport_result(timeout)
}
fn reset_run_state(&self) -> Result<(), String> {
{
let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
if coordinator.stats.repo_tasks_running != 0 {
return Err(format!(
"cannot reset repo runtime with {} repo task(s) still running",
coordinator.stats.repo_tasks_running
));
}
coordinator.reset_run_state();
}
loop {
let maybe_result = {
let pool = self.worker_pool.lock().expect("worker pool lock poisoned");
pool.recv_result_timeout(Duration::from_millis(0))?
};
if maybe_result.is_none() {
break;
}
}
Ok(())
}
fn prefetch_discovered_children( fn prefetch_discovered_children(
&self, &self,
children: &[DiscoveredChildCaInstance], children: &[DiscoveredChildCaInstance],
) -> Result<(), String> { ) -> Result<(), String> {
for child in children { for child in children {
let _ = self.ensure_transport_requested(&child.handle, 1)?; let _ = self.request_publication_point_repo(&child.handle, 1)?;
} }
Ok(()) Ok(())
} }
@ -355,8 +477,8 @@ mod tests {
}; };
use crate::parallel::run_coordinator::GlobalRunCoordinator; use crate::parallel::run_coordinator::GlobalRunCoordinator;
use crate::parallel::types::{ use crate::parallel::types::{
RepoTransportMode, RepoTransportResultEnvelope, RepoTransportResultKind, RepoTransportTask, RepoRuntimeState, RepoTransportMode, RepoTransportResultEnvelope, RepoTransportResultKind,
TalInputSpec, RepoTransportTask, TalInputSpec,
}; };
use crate::policy::SyncPreference; use crate::policy::SyncPreference;
use crate::report::Warning; use crate::report::Warning;
@ -400,6 +522,31 @@ mod tests {
} }
} }
struct CountingSuccessTransportExecutor {
count: Arc<AtomicUsize>,
}
impl RepoTransportExecutor for CountingSuccessTransportExecutor {
fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope {
self.count.fetch_add(1, Ordering::SeqCst);
RepoTransportResultEnvelope {
dedup_key: task.dedup_key,
repo_identity: task.repo_identity,
mode: task.mode,
tal_id: task.tal_id,
rir_id: task.rir_id,
timing_ms: 7,
result: RepoTransportResultKind::Success {
source: match task.mode {
RepoTransportMode::Rrdp => "rrdp".to_string(),
RepoTransportMode::Rsync => "rsync".to_string(),
},
warnings: vec![Warning::new("transport ok")],
},
}
}
}
struct FailRrdpThenSucceedRsyncExecutor { struct FailRrdpThenSucceedRsyncExecutor {
rrdp_count: Arc<AtomicUsize>, rrdp_count: Arc<AtomicUsize>,
rsync_count: Arc<AtomicUsize>, rsync_count: Arc<AtomicUsize>,
@ -468,6 +615,207 @@ mod tests {
assert_eq!(outcome.repo_sync_phase.as_deref(), Some("rrdp_ok")); assert_eq!(outcome.repo_sync_phase.as_deref(), Some("rrdp_ok"));
} }
#[test]
fn phase1_runtime_request_repo_returns_pending_then_repo_ready_event() {
let coordinator = GlobalRunCoordinator::new(
ParallelPhase1Config::default(),
vec![TalInputSpec::from_url("https://example.test/arin.tal")],
);
let pool = RepoTransportWorkerPool::new(
RepoWorkerPoolConfig { max_workers: 1 },
SuccessTransportExecutor,
)
.expect("pool");
let runtime = Phase1RepoSyncRuntime::new(
coordinator,
pool,
Arc::new(|base: &str| base.to_string()),
SyncPreference::RrdpThenRsync,
);
let ca = sample_ca("rsync://example.test/repo/root.mft");
let status = runtime
.request_publication_point_repo(&ca, 0)
.expect("request repo");
let identity = match status {
super::RepoSyncRequestStatus::Pending { identity, state } => {
assert_eq!(state, RepoRuntimeState::WaitingRrdp);
identity
}
other => panic!("expected pending, got {other:?}"),
};
let event = runtime
.recv_repo_result_timeout(Duration::from_secs(1))
.expect("repo event")
.expect("event");
assert_eq!(event.transport_identity, identity);
assert_eq!(event.completions.len(), 1);
assert_eq!(event.completions[0].identity, identity);
assert_eq!(event.completions[0].state, RepoRuntimeState::RrdpOk);
assert!(event.completions[0].outcome.repo_sync_ok);
assert_eq!(
event.completions[0].outcome.repo_sync_source.as_deref(),
Some("rrdp")
);
}
#[test]
fn phase1_runtime_request_repo_reuses_ready_event_result() {
let coordinator = GlobalRunCoordinator::new(
ParallelPhase1Config::default(),
vec![TalInputSpec::from_url("https://example.test/arin.tal")],
);
let pool = RepoTransportWorkerPool::new(
RepoWorkerPoolConfig { max_workers: 1 },
SuccessTransportExecutor,
)
.expect("pool");
let runtime = Phase1RepoSyncRuntime::new(
coordinator,
pool,
Arc::new(|base: &str| base.to_string()),
SyncPreference::RrdpThenRsync,
);
let ca = sample_ca("rsync://example.test/repo/root.mft");
let first = runtime
.request_publication_point_repo(&ca, 0)
.expect("request repo");
assert!(matches!(
first,
super::RepoSyncRequestStatus::Pending { .. }
));
let _ = runtime
.recv_repo_result_timeout(Duration::from_secs(1))
.expect("repo event")
.expect("event");
let second = runtime
.request_publication_point_repo(&ca, 0)
.expect("request repo reused");
match second {
super::RepoSyncRequestStatus::Ready { outcome, .. } => {
assert!(outcome.repo_sync_ok);
assert_eq!(outcome.repo_sync_phase.as_deref(), Some("rrdp_ok"));
}
other => panic!("expected ready reuse, got {other:?}"),
}
}
#[test]
fn phase1_runtime_repo_event_reports_all_finalized_identities_for_shared_rrdp() {
let coordinator = GlobalRunCoordinator::new(
ParallelPhase1Config::default(),
vec![TalInputSpec::from_url("https://example.test/arin.tal")],
);
let pool = RepoTransportWorkerPool::new(
RepoWorkerPoolConfig { max_workers: 1 },
SuccessTransportExecutor,
)
.expect("pool");
let runtime = Phase1RepoSyncRuntime::new(
coordinator,
pool,
Arc::new(|base: &str| base.to_string()),
SyncPreference::RrdpThenRsync,
);
let ca1 = sample_ca("rsync://example.test/repo/root.mft");
let mut ca2 = sample_ca("rsync://example.test/other/root.mft");
ca2.rsync_base_uri = "rsync://example.test/other/".to_string();
ca2.publication_point_rsync_uri = "rsync://example.test/other/".to_string();
let id1 = match runtime
.request_publication_point_repo(&ca1, 0)
.expect("request first")
{
super::RepoSyncRequestStatus::Pending { identity, .. } => identity,
other => panic!("expected first pending, got {other:?}"),
};
let id2 = match runtime
.request_publication_point_repo(&ca2, 0)
.expect("request second")
{
super::RepoSyncRequestStatus::Pending { identity, .. } => identity,
other => panic!("expected second pending, got {other:?}"),
};
assert_ne!(id1, id2);
let event = runtime
.recv_repo_result_timeout(Duration::from_secs(1))
.expect("repo event")
.expect("event");
let mut identities = event
.completions
.iter()
.map(|completion| completion.identity.clone())
.collect::<Vec<_>>();
identities.sort_by(|a, b| a.rsync_base_uri.cmp(&b.rsync_base_uri));
let mut expected = vec![id1, id2];
expected.sort_by(|a, b| a.rsync_base_uri.cmp(&b.rsync_base_uri));
assert_eq!(identities, expected);
assert!(
event
.completions
.iter()
.all(|completion| completion.state == RepoRuntimeState::RrdpOk)
);
}
#[test]
fn phase1_runtime_reset_run_state_clears_completed_transport_reuse() {
let count = Arc::new(AtomicUsize::new(0));
let coordinator = GlobalRunCoordinator::new(
ParallelPhase1Config::default(),
vec![TalInputSpec::from_url("https://example.test/arin.tal")],
);
let pool = RepoTransportWorkerPool::new(
RepoWorkerPoolConfig { max_workers: 1 },
CountingSuccessTransportExecutor {
count: Arc::clone(&count),
},
)
.expect("pool");
let runtime = Phase1RepoSyncRuntime::new(
coordinator,
pool,
Arc::new(|base: &str| base.to_string()),
SyncPreference::RrdpThenRsync,
);
let ca = sample_ca("rsync://example.test/repo/root.mft");
assert!(matches!(
runtime
.request_publication_point_repo(&ca, 0)
.expect("first request"),
super::RepoSyncRequestStatus::Pending { .. }
));
let _ = runtime
.recv_repo_result_timeout(Duration::from_secs(1))
.expect("first event")
.expect("event");
assert_eq!(count.load(Ordering::SeqCst), 1);
assert!(matches!(
runtime
.request_publication_point_repo(&ca, 0)
.expect("ready reuse before reset"),
super::RepoSyncRequestStatus::Ready { .. }
));
runtime.reset_run_state().expect("reset");
assert!(matches!(
runtime
.request_publication_point_repo(&ca, 0)
.expect("second request after reset"),
super::RepoSyncRequestStatus::Pending { .. }
));
let _ = runtime
.recv_repo_result_timeout(Duration::from_secs(1))
.expect("second event")
.expect("event");
assert_eq!(count.load(Ordering::SeqCst), 2);
}
#[test] #[test]
fn phase1_runtime_transitions_rrdp_failure_to_rsync_success() { fn phase1_runtime_transitions_rrdp_failure_to_rsync_success() {
let rrdp_count = Arc::new(AtomicUsize::new(0)); let rrdp_count = Arc::new(AtomicUsize::new(0));

View File

@ -81,6 +81,38 @@ impl TransportStateTables {
self.runtime_records.get(identity) self.runtime_records.get(identity)
} }
pub fn finalized_runtime_records_for_transport(
&self,
dedup_key: &RepoDedupKey,
) -> Vec<RepoRuntimeRecord> {
self.runtime_records
.values()
.filter(|record| match dedup_key {
RepoDedupKey::RrdpNotify { notification_uri } => {
record.rrdp_notification_key.as_deref() == Some(notification_uri.as_str())
}
RepoDedupKey::RsyncScope { rsync_scope_uri } => {
record.rsync_scope_key == *rsync_scope_uri
}
})
.filter(|record| {
matches!(
record.state,
RepoRuntimeState::RrdpOk
| RepoRuntimeState::RsyncOk
| RepoRuntimeState::FailedTerminal
)
})
.cloned()
.collect()
}
pub fn reset_run_state(&mut self) {
self.rrdp_inflight.clear();
self.rsync_inflight.clear();
self.runtime_records.clear();
}
pub fn register_transport_request( pub fn register_transport_request(
&mut self, &mut self,
identity: RepoIdentity, identity: RepoIdentity,
@ -182,9 +214,9 @@ impl TransportStateTables {
entry.waiting_requesters.push(requester.clone()); entry.waiting_requesters.push(requester.clone());
self.runtime_records.insert( self.runtime_records.insert(
identity, identity.clone(),
RepoRuntimeRecord { RepoRuntimeRecord {
identity: entry.task.repo_identity.clone(), identity,
state: RepoRuntimeState::WaitingRrdp, state: RepoRuntimeState::WaitingRrdp,
rrdp_notification_key: Some(notification_uri), rrdp_notification_key: Some(notification_uri),
rsync_scope_key: rsync_scope_uri, rsync_scope_key: rsync_scope_uri,
@ -263,9 +295,9 @@ impl TransportStateTables {
return match result.result { return match result.result {
RepoTransportResultKind::Success { .. } => { RepoTransportResultKind::Success { .. } => {
self.runtime_records.insert( self.runtime_records.insert(
identity, identity.clone(),
RepoRuntimeRecord { RepoRuntimeRecord {
identity: entry.task.repo_identity.clone(), identity,
state: RepoRuntimeState::RsyncOk, state: RepoRuntimeState::RsyncOk,
rrdp_notification_key: None, rrdp_notification_key: None,
rsync_scope_key: rsync_scope_uri, rsync_scope_key: rsync_scope_uri,
@ -280,9 +312,9 @@ impl TransportStateTables {
} }
RepoTransportResultKind::Failed { .. } => { RepoTransportResultKind::Failed { .. } => {
self.runtime_records.insert( self.runtime_records.insert(
identity, identity.clone(),
RepoRuntimeRecord { RepoRuntimeRecord {
identity: entry.task.repo_identity.clone(), identity,
state: RepoRuntimeState::FailedTerminal, state: RepoRuntimeState::FailedTerminal,
rrdp_notification_key: None, rrdp_notification_key: None,
rsync_scope_key: rsync_scope_uri, rsync_scope_key: rsync_scope_uri,
@ -552,6 +584,10 @@ impl InFlightRepoTable {
self.entries.is_empty() self.entries.is_empty()
} }
    /// Clears every in-flight repository entry so the next run starts with
    /// an empty table.
    pub fn reset_run_state(&mut self) {
        self.entries.clear();
    }
pub fn get(&self, key: &RepoKey) -> Option<&InFlightRepoEntry> { pub fn get(&self, key: &RepoKey) -> Option<&InFlightRepoEntry> {
self.entries.get(key) self.entries.get(key)
} }

View File

@ -8,7 +8,7 @@ use crate::parallel::repo_scheduler::{
}; };
use crate::parallel::stats::ParallelRunStats; use crate::parallel::stats::ParallelRunStats;
use crate::parallel::types::{ use crate::parallel::types::{
RepoIdentity, RepoKey, RepoRequester, RepoSyncResultEnvelope, RepoSyncTask, RepoDedupKey, RepoIdentity, RepoKey, RepoRequester, RepoSyncResultEnvelope, RepoSyncTask,
RepoTransportResultEnvelope, RepoTransportTask, TalInputSpec, RepoTransportResultEnvelope, RepoTransportTask, TalInputSpec,
}; };
use crate::policy::SyncPreference; use crate::policy::SyncPreference;
@ -196,6 +196,25 @@ impl GlobalRunCoordinator {
) -> Option<&crate::parallel::repo_scheduler::RepoRuntimeRecord> { ) -> Option<&crate::parallel::repo_scheduler::RepoRuntimeRecord> {
self.transport_tables.runtime_record(identity) self.transport_tables.runtime_record(identity)
} }
    /// Returns the finalized (RRDP-ok, rsync-ok, or terminally failed)
    /// runtime records matching `dedup_key`.  Thin delegation to the
    /// transport state tables.
    pub fn finalized_runtime_records_for_transport(
        &self,
        dedup_key: &RepoDedupKey,
    ) -> Vec<crate::parallel::repo_scheduler::RepoRuntimeRecord> {
        self.transport_tables
            .finalized_runtime_records_for_transport(dedup_key)
    }
pub fn reset_run_state(&mut self) {
self.in_flight_repos.reset_run_state();
self.transport_tables.reset_run_state();
self.pending_repo_tasks.clear();
self.pending_transport_tasks.clear();
self.stats = ParallelRunStats::default();
if let Ok(mut index) = self.current_repo_index.lock() {
index.clear();
}
}
} }
#[cfg(test)] #[cfg(test)]
@ -204,10 +223,11 @@ mod tests {
use crate::parallel::repo_scheduler::RepoRequestAction; use crate::parallel::repo_scheduler::RepoRequestAction;
use crate::parallel::run_coordinator::GlobalRunCoordinator; use crate::parallel::run_coordinator::GlobalRunCoordinator;
use crate::parallel::types::{ use crate::parallel::types::{
RepoKey, RepoRequester, RepoSyncResultEnvelope, RepoSyncResultKind, RepoSyncResultRef, RepoIdentity, RepoKey, RepoRequester, RepoSyncResultEnvelope, RepoSyncResultKind,
TalInputSpec, RepoSyncResultRef, TalInputSpec,
}; };
use crate::policy::SyncPreference; use crate::policy::SyncPreference;
use crate::storage::{RepositoryViewEntry, RepositoryViewState};
fn requester(tal_id: &str, rir_id: &str, manifest: &str) -> RepoRequester { fn requester(tal_id: &str, rir_id: &str, manifest: &str) -> RepoRequester {
RepoRequester { RepoRequester {
@ -259,6 +279,62 @@ mod tests {
assert_eq!(coordinator.stats.repo_queue_depth, 0); assert_eq!(coordinator.stats.repo_queue_depth, 0);
} }
#[test]
fn coordinator_reset_run_state_clears_runtime_only_state() {
let mut coordinator = GlobalRunCoordinator::new(
ParallelPhase1Config::default(),
vec![TalInputSpec::from_url("https://example.test/arin.tal")],
);
let identity = RepoIdentity::new(
Some("https://example.test/notify.xml".to_string()),
"rsync://example.test/repo/",
);
let requester = requester("arin", "arin", "rsync://example.test/repo/root.mft");
let action = coordinator.register_transport_request(
identity.clone(),
requester,
time::OffsetDateTime::UNIX_EPOCH,
0,
"rsync://example.test/repo/".to_string(),
SyncPreference::RrdpThenRsync,
);
assert!(matches!(
action,
crate::parallel::repo_scheduler::TransportRequestAction::Enqueue(_)
));
assert!(coordinator.runtime_record(&identity).is_some());
assert_eq!(coordinator.pending_transport_tasks.len(), 1);
{
let mut index = coordinator.current_repo_index.lock().expect("index lock");
index
.apply_repository_view_entries(&[RepositoryViewEntry {
rsync_uri: "rsync://example.test/repo/a.roa".to_string(),
current_hash: Some("aa".repeat(32)),
repository_source: Some("rsync://example.test/repo/".to_string()),
object_type: Some("roa".to_string()),
state: RepositoryViewState::Present,
}])
.expect("apply current object");
assert_eq!(index.active_uri_count(), 1);
}
coordinator.reset_run_state();
assert!(coordinator.runtime_record(&identity).is_none());
assert!(coordinator.pending_transport_tasks.is_empty());
assert!(coordinator.pending_repo_tasks.is_empty());
assert_eq!(
coordinator
.current_repo_index
.lock()
.expect("index lock")
.active_uri_count(),
0
);
assert_eq!(coordinator.stats.repo_tasks_total, 0);
}
#[test] #[test]
fn coordinator_merges_waiting_requesters_and_reuses_success() { fn coordinator_merges_waiting_requesters_and_reuses_success() {
let mut coordinator = GlobalRunCoordinator::new( let mut coordinator = GlobalRunCoordinator::new(

View File

@ -8,5 +8,6 @@ pub mod publication_point;
pub mod run; pub mod run;
pub mod run_tree_from_tal; pub mod run_tree_from_tal;
pub mod tree; pub mod tree;
pub mod tree_parallel;
pub mod tree_runner; pub mod tree_runner;
pub mod x509_name; pub mod x509_name;

File diff suppressed because it is too large Load Diff

View File

@ -75,6 +75,8 @@ pub fn run_publication_point_once(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
}; };
let result = runner let result = runner

View File

@ -5,7 +5,7 @@ use crate::audit::PublicationPointAudit;
use crate::audit_downloads::DownloadLogHandle; use crate::audit_downloads::DownloadLogHandle;
use crate::current_repo_index::{CurrentRepoIndexHandle, CurrentRepoObject}; use crate::current_repo_index::{CurrentRepoIndexHandle, CurrentRepoObject};
use crate::data_model::ta::TrustAnchor; use crate::data_model::ta::TrustAnchor;
use crate::parallel::config::ParallelPhase1Config; use crate::parallel::config::{ParallelPhase1Config, ParallelPhase2Config};
use crate::parallel::repo_runtime::{Phase1RepoSyncRuntime, RepoSyncRuntime}; use crate::parallel::repo_runtime::{Phase1RepoSyncRuntime, RepoSyncRuntime};
use crate::parallel::repo_worker::{ use crate::parallel::repo_worker::{
LiveRepoTransportExecutor, RepoTransportWorkerPool, RepoWorkerPoolConfig, LiveRepoTransportExecutor, RepoTransportWorkerPool, RepoWorkerPoolConfig,
@ -22,12 +22,16 @@ use crate::replay::fetch_http::PayloadReplayHttpFetcher;
use crate::replay::fetch_rsync::PayloadReplayRsyncFetcher; use crate::replay::fetch_rsync::PayloadReplayRsyncFetcher;
use crate::sync::rrdp::Fetcher; use crate::sync::rrdp::Fetcher;
use crate::validation::from_tal::{ use crate::validation::from_tal::{
DiscoveredRootCaInstance, FromTalError, discover_root_ca_instance_from_tal_and_ta_der, discover_root_ca_instance_from_tal_and_ta_der, discover_root_ca_instance_from_tal_url,
discover_root_ca_instance_from_tal_url, discover_root_ca_instance_from_tal_with_fetchers, discover_root_ca_instance_from_tal_with_fetchers, DiscoveredRootCaInstance, FromTalError,
}; };
use crate::validation::objects::ParallelRoaWorkerPool;
use crate::validation::tree::{ use crate::validation::tree::{
CaInstanceHandle, TreeRunAuditOutput, TreeRunConfig, TreeRunError, TreeRunOutput, run_tree_serial, run_tree_serial_audit, run_tree_serial_audit_multi_root, CaInstanceHandle,
run_tree_serial, run_tree_serial_audit, run_tree_serial_audit_multi_root, TreeRunAuditOutput, TreeRunConfig, TreeRunError, TreeRunOutput,
};
use crate::validation::tree_parallel::{
run_tree_parallel_phase2_audit, run_tree_parallel_phase2_audit_multi_root,
}; };
use crate::validation::tree_runner::Rpkiv1PublicationPointRunner; use crate::validation::tree_runner::Rpkiv1PublicationPointRunner;
use std::collections::HashMap; use std::collections::HashMap;
@ -113,7 +117,11 @@ fn make_live_runner<'a>(
download_log: Option<DownloadLogHandle>, download_log: Option<DownloadLogHandle>,
current_repo_index: Option<CurrentRepoIndexHandle>, current_repo_index: Option<CurrentRepoIndexHandle>,
repo_sync_runtime: Option<Arc<dyn RepoSyncRuntime>>, repo_sync_runtime: Option<Arc<dyn RepoSyncRuntime>>,
parallel_phase2_config: Option<ParallelPhase2Config>,
) -> Rpkiv1PublicationPointRunner<'a> { ) -> Rpkiv1PublicationPointRunner<'a> {
let parallel_roa_worker_pool = parallel_phase2_config
.as_ref()
.and_then(|config| ParallelRoaWorkerPool::new(config).ok());
Rpkiv1PublicationPointRunner { Rpkiv1PublicationPointRunner {
store, store,
policy, policy,
@ -130,6 +138,8 @@ fn make_live_runner<'a>(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index, current_repo_index,
repo_sync_runtime, repo_sync_runtime,
parallel_phase2_config,
parallel_roa_worker_pool,
} }
} }
@ -280,6 +290,7 @@ pub fn run_tree_from_tal_url_serial(
None, None,
None, None,
None, None,
None,
); );
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -315,6 +326,7 @@ pub fn run_tree_from_tal_url_serial_audit(
Some(download_log.clone()), Some(download_log.clone()),
None, None,
None, None,
None,
); );
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -366,6 +378,7 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
Some(download_log.clone()), Some(download_log.clone()),
None, None,
None, None,
None,
); );
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -393,100 +406,24 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
}) })
} }
pub fn run_tree_from_tal_url_parallel_phase1_audit<H, R>( fn run_single_root_parallel_audit_inner<H, R>(
store: Arc<crate::storage::RocksStore>, store: Arc<crate::storage::RocksStore>,
policy: &crate::policy::Policy, policy: &crate::policy::Policy,
tal_url: &str, discovery: DiscoveredRootCaInstance,
tal_inputs: Vec<TalInputSpec>,
http_fetcher: &H, http_fetcher: &H,
rsync_fetcher: &R, rsync_fetcher: &R,
validation_time: time::OffsetDateTime, validation_time: time::OffsetDateTime,
config: &TreeRunConfig, config: &TreeRunConfig,
parallel_config: ParallelPhase1Config, parallel_config: ParallelPhase1Config,
phase2_config: Option<ParallelPhase2Config>,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError> ) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where where
H: Fetcher + Clone + 'static, H: Fetcher + Clone + 'static,
R: crate::fetch::rsync::RsyncFetcher + Clone + 'static, R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{ {
let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?; let phase2_enabled = phase2_config.is_some();
let download_log = DownloadLogHandle::new(); let download_log = DownloadLogHandle::new();
let (runtime, current_repo_index) = build_phase1_repo_sync_runtime(
Arc::clone(&store),
policy,
http_fetcher,
rsync_fetcher,
parallel_config,
None,
Some(download_log.clone()),
vec![TalInputSpec::from_url(tal_url.to_string())],
)?;
let current_repo_index_for_output = current_repo_index.clone();
let runner = make_live_runner(
store.as_ref(),
policy,
http_fetcher,
rsync_fetcher,
validation_time,
None,
Some(download_log.clone()),
Some(current_repo_index),
Some(runtime),
);
let root = root_handle_from_trust_anchor(
&discovery.trust_anchor,
derive_tal_id(&discovery),
None,
&discovery.ca_instance,
);
let TreeRunAuditOutput {
tree,
publication_points,
} = run_tree_serial_audit(root, &runner, config)?;
let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput {
discovery: discovery.clone(),
discoveries: vec![discovery],
tree,
publication_points,
downloads,
download_stats,
current_repo_objects: snapshot_current_repo_objects(Some(&current_repo_index_for_output)),
})
}
pub fn run_tree_from_tal_and_ta_der_parallel_phase1_audit<H, R>(
store: Arc<crate::storage::RocksStore>,
policy: &crate::policy::Policy,
tal_bytes: &[u8],
ta_der: &[u8],
resolved_ta_uri: Option<&url::Url>,
http_fetcher: &H,
rsync_fetcher: &R,
validation_time: time::OffsetDateTime,
config: &TreeRunConfig,
parallel_config: ParallelPhase1Config,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
H: Fetcher + Clone + 'static,
R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
let discovery =
discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, resolved_ta_uri)?;
let download_log = DownloadLogHandle::new();
let derived_tal_id = derive_tal_id(&discovery);
let tal_inputs = vec![TalInputSpec {
tal_id: derived_tal_id.clone(),
rir_id: derived_tal_id,
source: TalSource::DerBytes {
tal_url: discovery
.tal_url
.clone()
.unwrap_or_else(|| "embedded-tal".to_string()),
tal_bytes: tal_bytes.to_vec(),
ta_der: ta_der.to_vec(),
},
}];
let (runtime, current_repo_index) = build_phase1_repo_sync_runtime( let (runtime, current_repo_index) = build_phase1_repo_sync_runtime(
Arc::clone(&store), Arc::clone(&store),
policy, policy,
@ -508,6 +445,7 @@ where
Some(download_log.clone()), Some(download_log.clone()),
Some(current_repo_index), Some(current_repo_index),
Some(runtime), Some(runtime),
phase2_config,
); );
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -519,7 +457,11 @@ where
let TreeRunAuditOutput { let TreeRunAuditOutput {
tree, tree,
publication_points, publication_points,
} = run_tree_serial_audit(root, &runner, config)?; } = if phase2_enabled {
run_tree_parallel_phase2_audit(root, &runner, config)?
} else {
run_tree_serial_audit(root, &runner, config)?
};
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
@ -533,7 +475,7 @@ where
}) })
} }
pub fn run_tree_from_multiple_tals_parallel_phase1_audit<H, R>( fn run_multi_root_parallel_audit_inner<H, R>(
store: Arc<crate::storage::RocksStore>, store: Arc<crate::storage::RocksStore>,
policy: &crate::policy::Policy, policy: &crate::policy::Policy,
tal_inputs: Vec<TalInputSpec>, tal_inputs: Vec<TalInputSpec>,
@ -542,11 +484,13 @@ pub fn run_tree_from_multiple_tals_parallel_phase1_audit<H, R>(
validation_time: time::OffsetDateTime, validation_time: time::OffsetDateTime,
config: &TreeRunConfig, config: &TreeRunConfig,
parallel_config: ParallelPhase1Config, parallel_config: ParallelPhase1Config,
phase2_config: Option<ParallelPhase2Config>,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError> ) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where where
H: Fetcher + Clone + 'static, H: Fetcher + Clone + 'static,
R: crate::fetch::rsync::RsyncFetcher + Clone + 'static, R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{ {
let phase2_enabled = phase2_config.is_some();
if tal_inputs.is_empty() { if tal_inputs.is_empty() {
return Err(RunTreeFromTalError::Replay( return Err(RunTreeFromTalError::Replay(
"multi-TAL run requires at least one TAL input".to_string(), "multi-TAL run requires at least one TAL input".to_string(),
@ -587,12 +531,17 @@ where
Some(download_log.clone()), Some(download_log.clone()),
Some(current_repo_index), Some(current_repo_index),
Some(runtime), Some(runtime),
phase2_config,
); );
let TreeRunAuditOutput { let TreeRunAuditOutput {
tree, tree,
publication_points, publication_points,
} = run_tree_serial_audit_multi_root(root_handles, &runner, config)?; } = if phase2_enabled {
run_tree_parallel_phase2_audit_multi_root(root_handles, &runner, config)?
} else {
run_tree_serial_audit_multi_root(root_handles, &runner, config)?
};
let downloads = download_log.snapshot_events(); let downloads = download_log.snapshot_events();
let download_stats = DownloadLogHandle::stats_from_events(&downloads); let download_stats = DownloadLogHandle::stats_from_events(&downloads);
Ok(RunTreeFromTalAuditOutput { Ok(RunTreeFromTalAuditOutput {
@ -606,6 +555,211 @@ where
}) })
} }
/// Phase-1 parallel audit entry point for a single TAL fetched by URL.
///
/// Discovers the root CA from the TAL and delegates to the shared
/// single-root driver with phase-2 object validation disabled
/// (`phase2_config = None`).
pub fn run_tree_from_tal_url_parallel_phase1_audit<H, R>(
    store: Arc<crate::storage::RocksStore>,
    policy: &crate::policy::Policy,
    tal_url: &str,
    http_fetcher: &H,
    rsync_fetcher: &R,
    validation_time: time::OffsetDateTime,
    config: &TreeRunConfig,
    parallel_config: ParallelPhase1Config,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
    H: Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    // Resolve the root CA instance first; a discovery failure short-circuits.
    let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?;
    let tal_inputs = vec![TalInputSpec::from_url(tal_url.to_string())];
    run_single_root_parallel_audit_inner(
        store,
        policy,
        discovery,
        tal_inputs,
        http_fetcher,
        rsync_fetcher,
        validation_time,
        config,
        parallel_config,
        None,
    )
}
/// Phase-1 parallel audit for a TAL supplied as raw bytes together with the
/// trust-anchor certificate DER (no network fetch of the TAL itself).
///
/// Builds a single `TalInputSpec` with an embedded `DerBytes` source and
/// delegates to the shared single-root driver with phase 2 disabled.
pub fn run_tree_from_tal_and_ta_der_parallel_phase1_audit<H, R>(
    store: Arc<crate::storage::RocksStore>,
    policy: &crate::policy::Policy,
    tal_bytes: &[u8],
    ta_der: &[u8],
    resolved_ta_uri: Option<&url::Url>,
    http_fetcher: &H,
    rsync_fetcher: &R,
    validation_time: time::OffsetDateTime,
    config: &TreeRunConfig,
    parallel_config: ParallelPhase1Config,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
    H: Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    let discovery =
        discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, resolved_ta_uri)?;
    // The derived TAL id doubles as the RIR id for embedded-TAL runs.
    let derived_tal_id = derive_tal_id(&discovery);
    // Fall back to a placeholder URL when discovery carries no TAL URL.
    let source_tal_url = discovery
        .tal_url
        .clone()
        .unwrap_or_else(|| "embedded-tal".to_string());
    let tal_inputs = vec![TalInputSpec {
        tal_id: derived_tal_id.clone(),
        rir_id: derived_tal_id,
        source: TalSource::DerBytes {
            tal_url: source_tal_url,
            tal_bytes: tal_bytes.to_vec(),
            ta_der: ta_der.to_vec(),
        },
    }];
    run_single_root_parallel_audit_inner(
        store,
        policy,
        discovery,
        tal_inputs,
        http_fetcher,
        rsync_fetcher,
        validation_time,
        config,
        parallel_config,
        None,
    )
}
/// Phase-1 parallel audit over a pre-built list of TAL inputs.
///
/// Pure forwarding wrapper: delegates to the shared multi-root driver with
/// phase-2 object validation disabled (`phase2_config = None`).
pub fn run_tree_from_multiple_tals_parallel_phase1_audit<H, R>(
    store: Arc<crate::storage::RocksStore>,
    policy: &crate::policy::Policy,
    tal_inputs: Vec<TalInputSpec>,
    http_fetcher: &H,
    rsync_fetcher: &R,
    validation_time: time::OffsetDateTime,
    config: &TreeRunConfig,
    parallel_config: ParallelPhase1Config,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
    H: Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    run_multi_root_parallel_audit_inner(
        store,
        policy,
        tal_inputs,
        http_fetcher,
        rsync_fetcher,
        validation_time,
        config,
        parallel_config,
        None,
    )
}
/// Phase-2 parallel audit for a single TAL fetched by URL.
///
/// Identical to the phase-1 URL entry point except that the phase-2 object
/// validation pipeline is enabled via `Some(phase2_config)`.
pub fn run_tree_from_tal_url_parallel_phase2_audit<H, R>(
    store: Arc<crate::storage::RocksStore>,
    policy: &crate::policy::Policy,
    tal_url: &str,
    http_fetcher: &H,
    rsync_fetcher: &R,
    validation_time: time::OffsetDateTime,
    config: &TreeRunConfig,
    parallel_config: ParallelPhase1Config,
    phase2_config: ParallelPhase2Config,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
    H: Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    // Resolve the root CA instance first; a discovery failure short-circuits.
    let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?;
    let tal_inputs = vec![TalInputSpec::from_url(tal_url.to_string())];
    run_single_root_parallel_audit_inner(
        store,
        policy,
        discovery,
        tal_inputs,
        http_fetcher,
        rsync_fetcher,
        validation_time,
        config,
        parallel_config,
        Some(phase2_config),
    )
}
/// Phase-2 parallel audit for a TAL supplied as raw bytes plus the
/// trust-anchor certificate DER.
///
/// Mirrors the phase-1 `tal_and_ta_der` entry point but enables the phase-2
/// object validation pipeline via `Some(phase2_config)`.
pub fn run_tree_from_tal_and_ta_der_parallel_phase2_audit<H, R>(
    store: Arc<crate::storage::RocksStore>,
    policy: &crate::policy::Policy,
    tal_bytes: &[u8],
    ta_der: &[u8],
    resolved_ta_uri: Option<&url::Url>,
    http_fetcher: &H,
    rsync_fetcher: &R,
    validation_time: time::OffsetDateTime,
    config: &TreeRunConfig,
    parallel_config: ParallelPhase1Config,
    phase2_config: ParallelPhase2Config,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
    H: Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    let discovery =
        discover_root_ca_instance_from_tal_and_ta_der(tal_bytes, ta_der, resolved_ta_uri)?;
    // The derived TAL id doubles as the RIR id for embedded-TAL runs.
    let derived_tal_id = derive_tal_id(&discovery);
    // Fall back to a placeholder URL when discovery carries no TAL URL.
    let source_tal_url = discovery
        .tal_url
        .clone()
        .unwrap_or_else(|| "embedded-tal".to_string());
    let tal_inputs = vec![TalInputSpec {
        tal_id: derived_tal_id.clone(),
        rir_id: derived_tal_id,
        source: TalSource::DerBytes {
            tal_url: source_tal_url,
            tal_bytes: tal_bytes.to_vec(),
            ta_der: ta_der.to_vec(),
        },
    }];
    run_single_root_parallel_audit_inner(
        store,
        policy,
        discovery,
        tal_inputs,
        http_fetcher,
        rsync_fetcher,
        validation_time,
        config,
        parallel_config,
        Some(phase2_config),
    )
}
/// Phase-2 parallel audit over a pre-built list of TAL inputs.
///
/// Pure forwarding wrapper: delegates to the shared multi-root driver with
/// the phase-2 object validation pipeline enabled (`Some(phase2_config)`).
pub fn run_tree_from_multiple_tals_parallel_phase2_audit<H, R>(
    store: Arc<crate::storage::RocksStore>,
    policy: &crate::policy::Policy,
    tal_inputs: Vec<TalInputSpec>,
    http_fetcher: &H,
    rsync_fetcher: &R,
    validation_time: time::OffsetDateTime,
    config: &TreeRunConfig,
    parallel_config: ParallelPhase1Config,
    phase2_config: ParallelPhase2Config,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError>
where
    H: Fetcher + Clone + 'static,
    R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{
    run_multi_root_parallel_audit_inner(
        store,
        policy,
        tal_inputs,
        http_fetcher,
        rsync_fetcher,
        validation_time,
        config,
        parallel_config,
        Some(phase2_config),
    )
}
pub fn run_tree_from_tal_and_ta_der_serial( pub fn run_tree_from_tal_and_ta_der_serial(
store: &crate::storage::RocksStore, store: &crate::storage::RocksStore,
policy: &crate::policy::Policy, policy: &crate::policy::Policy,
@ -636,6 +790,8 @@ pub fn run_tree_from_tal_and_ta_der_serial(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -684,6 +840,8 @@ pub fn run_tree_from_tal_bytes_serial_audit(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -748,6 +906,8 @@ pub fn run_tree_from_tal_bytes_serial_audit_with_timing(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -807,6 +967,8 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -867,6 +1029,8 @@ pub fn run_tree_from_tal_and_ta_der_serial_audit_with_timing(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -934,6 +1098,8 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -988,6 +1154,8 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -1058,6 +1226,8 @@ pub fn run_tree_from_tal_and_ta_der_payload_replay_serial_audit_with_timing(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
}; };
let root = root_handle_from_trust_anchor( let root = root_handle_from_trust_anchor(
@ -1111,6 +1281,8 @@ fn build_payload_replay_runner<'a>(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
} }
} }
@ -1140,6 +1312,8 @@ fn build_payload_delta_replay_runner<'a>(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
} }
} }
@ -1169,6 +1343,8 @@ fn build_payload_delta_replay_current_store_runner<'a>(
rsync_repo_cache: Mutex::new(HashMap::new()), rsync_repo_cache: Mutex::new(HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
} }
} }

View File

@ -0,0 +1,598 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};
use crate::audit::{DiscoveredFrom, PublicationPointAudit};
use crate::parallel::object_worker::ObjectWorkerSubmitError;
use crate::parallel::repo_runtime::{RepoSyncRequestStatus, RepoSyncRuntimeOutcome};
use crate::parallel::types::RepoIdentity;
use crate::policy::SignedObjectFailurePolicy;
use crate::report::Warning;
use crate::validation::objects::{
prepare_publication_point_for_parallel_roa, reduce_parallel_roa_stage, ObjectsOutput,
OwnedRoaTask, ParallelObjectsPrepare, ParallelObjectsStage,
};
use crate::validation::tree::{
run_tree_serial_audit_multi_root, CaInstanceHandle, DiscoveredChildCaInstance,
PublicationPointRunResult, PublicationPointRunner, TreeRunAuditOutput, TreeRunConfig,
TreeRunError, TreeRunOutput,
};
use crate::validation::tree_runner::{FreshPublicationPointStage, Rpkiv1PublicationPointRunner};
/// A CA instance waiting in the traversal queue, tagged with a per-run id
/// and its discovery provenance.
#[derive(Clone, Debug)]
struct QueuedCaInstance {
    // Monotonically increasing id assigned at enqueue time; also keys the
    // in-flight publication-point map.
    id: u64,
    // Handle to the CA instance (manifest rsync URI, certificate DER,
    // effective resources, depth, ...).
    handle: CaInstanceHandle,
    // Id of the CA that discovered this one; `None` for roots.
    parent_id: Option<u64>,
    // Audit provenance of the discovery; `None` for roots.
    discovered_from: Option<DiscoveredFrom>,
}
/// A queued CA whose repository sync has completed and which is ready to be
/// staged as a publication point.
#[derive(Clone, Debug)]
struct ReadyCaInstance {
    node: QueuedCaInstance,
    // Repo sync outcome: success flag, optional error, warnings, duration.
    repo_outcome: RepoSyncRuntimeOutcome,
}
/// A publication point whose ROA tasks have been dispatched to the worker
/// pool and which is waiting for their results.
struct InflightPublicationPoint {
    node: QueuedCaInstance,
    // Output of the fresh-point stage (warnings, discovered children, ...).
    fresh_stage: FreshPublicationPointStage,
    // Staged object-validation state, later reduced together with the
    // collected ROA task results.
    objects_stage: ParallelObjectsStage,
    repo_outcome: RepoSyncRuntimeOutcome,
    // Warnings accumulated so far (repo outcome + fresh stage).
    warnings: Vec<Warning>,
    // When the publication point started; presumably used for timing
    // metrics downstream — TODO confirm in the consumer.
    started_at: Instant,
    objects_started_at: Instant,
    // Number of ROA tasks dispatched; completion is reached once `results`
    // has this many entries — TODO confirm in drain_object_results.
    task_count: usize,
    results: Vec<crate::validation::objects::RoaTaskResult>,
}
/// Terminal record for one publication point: either its completed run
/// result or the error string that aborted it.
struct FinishedPublicationPoint {
    node: QueuedCaInstance,
    result: Result<PublicationPointRunResult, String>,
}
/// Event-loop driver for the phase-2 parallel validation tree over multiple
/// roots: repository sync, publication-point staging, and ROA object
/// validation are overlapped instead of executed serially per point.
///
/// Falls back to `run_tree_serial_audit_multi_root` when the parallel path
/// does not apply:
/// - the `DropPublicationPoint` failure policy is in effect,
/// - the runner has no `repo_sync_runtime`, or
/// - the runner has no `parallel_roa_worker_pool`.
pub fn run_tree_parallel_phase2_audit_multi_root(
    roots: Vec<CaInstanceHandle>,
    runner: &Rpkiv1PublicationPointRunner<'_>,
    config: &TreeRunConfig,
) -> Result<TreeRunAuditOutput, TreeRunError> {
    if runner.policy.signed_object_failure_policy == SignedObjectFailurePolicy::DropPublicationPoint
    {
        return run_tree_serial_audit_multi_root(roots, runner, config);
    }
    let Some(repo_runtime) = runner.repo_sync_runtime.as_ref() else {
        return run_tree_serial_audit_multi_root(roots, runner, config);
    };
    if runner.parallel_roa_worker_pool.is_none() {
        return run_tree_serial_audit_multi_root(roots, runner, config);
    }
    // Seed the traversal queue with the roots; ids are assigned in enqueue
    // order and identify nodes for the in-flight map.
    let mut next_id: u64 = 0;
    let mut ca_queue: VecDeque<QueuedCaInstance> = VecDeque::new();
    for root in roots {
        ca_queue.push_back(QueuedCaInstance {
            id: next_id,
            handle: root,
            parent_id: None,
            discovered_from: None,
        });
        next_id += 1;
    }
    // Dedup on manifest URI so each publication point is visited at most once.
    let mut visited_manifest_uris: HashSet<String> = HashSet::new();
    // CAs blocked on a repository sync, keyed by the repo identity they await.
    let mut ca_waiting_repo_by_identity: HashMap<RepoIdentity, Vec<QueuedCaInstance>> =
        HashMap::new();
    // CAs whose repository is synced and which can be staged next.
    let mut ready_queue: VecDeque<ReadyCaInstance> = VecDeque::new();
    // Publication points with dispatched ROA tasks, awaiting worker results.
    let mut inflight_publication_points: HashMap<u64, InflightPublicationPoint> = HashMap::new();
    // ROA tasks built but not yet submitted to the worker pool.
    let mut pending_roa_dispatch: VecDeque<OwnedRoaTask> = VecDeque::new();
    let mut finished: Vec<FinishedPublicationPoint> = Vec::new();
    let mut instances_started = 0usize;
    loop {
        // Step 1: admit queued CAs (bounded by config.max_instances and
        // config.max_depth) and request their repositories.
        while can_start_more(instances_started, config) {
            let Some(node) = ca_queue.pop_front() else {
                break;
            };
            if !visited_manifest_uris.insert(node.handle.manifest_rsync_uri.clone()) {
                continue;
            }
            if let Some(max_depth) = config.max_depth {
                if node.handle.depth > max_depth {
                    continue;
                }
            }
            instances_started += 1;
            match repo_runtime.request_publication_point_repo(&node.handle, 0) {
                Ok(RepoSyncRequestStatus::Ready { mut outcome, .. }) => {
                    // Ready here means this CA is reusing repo work that has already completed
                    // (often due to child prefetch). Do not add the transport duration again.
                    outcome.repo_sync_duration_ms = 0;
                    ready_queue.push_back(ReadyCaInstance {
                        node,
                        repo_outcome: outcome,
                    });
                }
                Ok(RepoSyncRequestStatus::Pending { identity, .. }) => {
                    // Park the CA until its repo identity reports completion.
                    ca_waiting_repo_by_identity
                        .entry(identity)
                        .or_default()
                        .push(node);
                }
                Err(err) => {
                    // A failed repo request terminates this point immediately.
                    finished.push(FinishedPublicationPoint {
                        node,
                        result: Err(err),
                    });
                }
            }
        }
        // Step 2: stage every repo-ready CA — run the fresh-point stage,
        // enqueue discovered children, and queue its ROA tasks for dispatch.
        while let Some(ready) = ready_queue.pop_front() {
            stage_ready_publication_point(
                runner,
                &mut next_id,
                &mut ca_queue,
                &mut pending_roa_dispatch,
                &mut inflight_publication_points,
                &mut finished,
                ready,
            );
        }
        // Step 3: submit queued ROA tasks to the worker pool and fold any
        // completed results back into their publication points.
        flush_pending_roa_dispatch(runner, &mut pending_roa_dispatch)?;
        drain_object_results(runner, &mut inflight_publication_points, &mut finished)?;
        // Step 4: poll for repository sync events with a timeout derived
        // from the outstanding work, promoting waiters to the ready queue.
        let repo_poll_timeout = event_poll_timeout(
            &ca_queue,
            &ready_queue,
            &pending_roa_dispatch,
            &inflight_publication_points,
            instances_started,
            config,
        );
        drain_repo_events(
            repo_runtime.as_ref(),
            &mut ca_waiting_repo_by_identity,
            &mut ready_queue,
            repo_poll_timeout,
        )?;
        // Step 5: stop once all queues and in-flight work have drained.
        if is_complete(
            &ca_queue,
            &ready_queue,
            &ca_waiting_repo_by_identity,
            &pending_roa_dispatch,
            &inflight_publication_points,
            instances_started,
            config,
        ) {
            break;
        }
    }
    // Leave the repo runtime's per-run state clean for the next run.
    repo_runtime
        .reset_run_state()
        .map_err(TreeRunError::Runner)?;
    Ok(build_tree_output(finished))
}
/// True while another CA instance may be admitted: either no instance cap is
/// configured, or the count of started instances is still below the cap.
fn can_start_more(instances_started: usize, config: &TreeRunConfig) -> bool {
    config
        .max_instances
        .map_or(true, |cap| instances_started < cap)
}
/// Runs the fresh-point stage for a publication point whose repository sync
/// has already completed, then routes it to one of three outcomes:
/// immediate finalization (all objects prepared inline or zero ROA tasks),
/// an inflight entry awaiting parallel ROA worker results, or a finished
/// entry carrying the error/fallback result.
fn stage_ready_publication_point(
    runner: &Rpkiv1PublicationPointRunner<'_>,
    next_id: &mut u64,
    ca_queue: &mut VecDeque<QueuedCaInstance>,
    pending_roa_dispatch: &mut VecDeque<OwnedRoaTask>,
    inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
    finished: &mut Vec<FinishedPublicationPoint>,
    ready: ReadyCaInstance,
) {
    let publication_point_started = Instant::now();
    // Start from the repo-sync warnings; stage warnings are appended below.
    let mut warnings = ready.repo_outcome.warnings.clone();
    let repo_outcome = ready.repo_outcome.clone();
    let stage = runner.stage_fresh_publication_point_after_repo_ready(
        &ready.node.handle,
        repo_outcome.repo_sync_ok,
        repo_outcome.repo_sync_err.as_deref(),
    );
    let fresh_stage = match stage {
        Ok(stage) => stage,
        Err(_) => {
            // Staged path failed: fall back to the serial runner so this
            // publication point still yields a result (and its children).
            let fallback = runner.run_publication_point(&ready.node.handle);
            if let Ok(result) = fallback.as_ref() {
                enqueue_discovered_children(
                    runner,
                    next_id,
                    ca_queue,
                    &ready.node,
                    result.discovered_children.clone(),
                );
            }
            finished.push(FinishedPublicationPoint {
                node: ready.node,
                result: fallback,
            });
            return;
        }
    };
    warnings.extend(fresh_stage.warnings.clone());
    // Children can be enqueued before object processing completes.
    enqueue_discovered_children(
        runner,
        next_id,
        ca_queue,
        &ready.node,
        fresh_stage.discovered_children.clone(),
    );
    match prepare_publication_point_for_parallel_roa(
        ready.node.id,
        &fresh_stage.fresh_point,
        &ready.node.handle.ca_certificate_der,
        ready.node.handle.ca_certificate_rsync_uri.as_deref(),
        ready.node.handle.effective_ip_resources.as_ref(),
        ready.node.handle.effective_as_resources.as_ref(),
        runner.validation_time,
    ) {
        // Everything was processed inline: finalize right away.
        ParallelObjectsPrepare::Complete(mut objects) => {
            objects
                .router_keys
                .extend(fresh_stage.discovered_router_keys.clone());
            objects.local_outputs_cache.extend(
                crate::validation::tree_runner::build_router_key_local_outputs(
                    &ready.node.handle,
                    &objects.router_keys,
                ),
            );
            finalize_ready_objects(
                runner,
                ready.node,
                fresh_stage,
                warnings,
                objects,
                repo_outcome,
                finished,
            );
        }
        ParallelObjectsPrepare::Staged(objects_stage) => {
            let tasks = objects_stage.build_roa_tasks();
            let task_count = objects_stage.roa_task_count();
            for task in tasks {
                pending_roa_dispatch.push_back(task);
            }
            if task_count == 0 {
                // No ROA tasks to wait for: reduce with an empty result set
                // and finalize immediately.
                match reduce_parallel_roa_stage(objects_stage, Vec::new(), runner.timing.as_ref()) {
                    Ok(mut objects) => {
                        objects
                            .router_keys
                            .extend(fresh_stage.discovered_router_keys.clone());
                        objects.local_outputs_cache.extend(
                            crate::validation::tree_runner::build_router_key_local_outputs(
                                &ready.node.handle,
                                &objects.router_keys,
                            ),
                        );
                        finalize_ready_objects(
                            runner,
                            ready.node,
                            fresh_stage,
                            warnings,
                            objects,
                            repo_outcome,
                            finished,
                        );
                    }
                    Err(err) => finished.push(FinishedPublicationPoint {
                        node: ready.node,
                        result: Err(err),
                    }),
                }
            } else {
                // Park the publication point until drain_object_results has
                // collected `task_count` worker results for this node id.
                inflight_publication_points.insert(
                    ready.node.id,
                    InflightPublicationPoint {
                        node: ready.node,
                        fresh_stage,
                        objects_stage,
                        repo_outcome,
                        warnings,
                        started_at: publication_point_started,
                        objects_started_at: Instant::now(),
                        task_count,
                        results: Vec::with_capacity(task_count),
                    },
                );
            }
        }
    }
}
/// Pushes freshly discovered child CA instances onto the work queue in a
/// deterministic order, assigning each a new sequential node id and linking
/// it back to its parent.
fn enqueue_discovered_children(
    runner: &Rpkiv1PublicationPointRunner<'_>,
    next_id: &mut u64,
    ca_queue: &mut VecDeque<QueuedCaInstance>,
    parent: &QueuedCaInstance,
    mut children: Vec<DiscoveredChildCaInstance>,
) {
    // Deterministic ordering: manifest URI first, child CA cert URI second.
    children.sort_by(|lhs, rhs| {
        let lhs_key = (
            &lhs.handle.manifest_rsync_uri,
            &lhs.discovered_from.child_ca_certificate_rsync_uri,
        );
        let rhs_key = (
            &rhs.handle.manifest_rsync_uri,
            &rhs.discovered_from.child_ca_certificate_rsync_uri,
        );
        lhs_key.cmp(&rhs_key)
    });
    // Best-effort prefetch of the children's repositories; errors are ignored.
    if let Some(runtime) = runner.repo_sync_runtime.as_ref() {
        let _ = runtime.prefetch_discovered_children(&children);
    }
    for child in children {
        let mut child_handle = child.handle.with_depth(parent.handle.depth + 1);
        child_handle.parent_manifest_rsync_uri =
            Some(parent.handle.manifest_rsync_uri.clone());
        ca_queue.push_back(QueuedCaInstance {
            id: *next_id,
            handle: child_handle,
            parent_id: Some(parent.id),
            discovered_from: Some(child.discovered_from),
        });
        *next_id += 1;
    }
}
/// Reduces a fully processed publication point into its final result and
/// records it on the `finished` list, attaching the repo-sync metadata.
fn finalize_ready_objects(
    runner: &Rpkiv1PublicationPointRunner<'_>,
    node: QueuedCaInstance,
    fresh_stage: FreshPublicationPointStage,
    warnings: Vec<Warning>,
    objects: ObjectsOutput,
    repo_outcome: RepoSyncRuntimeOutcome,
    finished: &mut Vec<FinishedPublicationPoint>,
) {
    let finalized = runner.finalize_fresh_publication_point_from_reducer(
        &node.handle,
        &fresh_stage.fresh_point,
        warnings,
        objects,
        fresh_stage.child_audits,
        fresh_stage.discovered_children,
        repo_outcome.repo_sync_source.as_deref(),
        repo_outcome.repo_sync_phase.as_deref(),
        repo_outcome.repo_sync_duration_ms,
        repo_outcome.repo_sync_err.as_deref(),
    );
    let result = finalized.map(|out| out.result);
    finished.push(FinishedPublicationPoint { node, result });
}
/// Moves queued ROA tasks into the worker pool until the queue drains or a
/// worker queue reports back-pressure (the task is put back for a later pass).
///
/// Returns an error only when the worker pool has disconnected.
fn flush_pending_roa_dispatch(
    runner: &Rpkiv1PublicationPointRunner<'_>,
    pending_roa_dispatch: &mut VecDeque<OwnedRoaTask>,
) -> Result<(), TreeRunError> {
    let pool = match runner.parallel_roa_worker_pool.as_ref() {
        Some(pool) => pool,
        // No parallel pool configured: nothing to dispatch.
        None => return Ok(()),
    };
    loop {
        let Some(task) = pending_roa_dispatch.pop_front() else {
            return Ok(());
        };
        match pool.try_submit_round_robin(task) {
            Ok(_) => continue,
            Err(ObjectWorkerSubmitError::QueueFull { task, .. }) => {
                // Back-pressure: keep the task at the head and retry later.
                pending_roa_dispatch.push_front(task);
                return Ok(());
            }
            Err(ObjectWorkerSubmitError::Disconnected { .. }) => {
                return Err(TreeRunError::Runner(
                    "parallel ROA worker queue disconnected".to_string(),
                ));
            }
        }
    }
}
/// Non-blocking drain of finished ROA worker results: each result is added to
/// its publication point's inflight entry, and once an entry has collected
/// `task_count` results it is reduced, finalized, and moved to `finished`.
fn drain_object_results(
    runner: &Rpkiv1PublicationPointRunner<'_>,
    inflight_publication_points: &mut HashMap<u64, InflightPublicationPoint>,
    finished: &mut Vec<FinishedPublicationPoint>,
) -> Result<(), TreeRunError> {
    let Some(pool) = runner.parallel_roa_worker_pool.as_ref() else {
        // No parallel pool configured: nothing to drain.
        return Ok(());
    };
    loop {
        // Zero timeout: poll without blocking; stop as soon as the channel is
        // empty.
        let Some(result) = pool
            .recv_result_timeout(Duration::from_millis(0))
            .map_err(TreeRunError::Runner)?
        else {
            break;
        };
        let pp_id = result.publication_point_id;
        // Record the result; the entry is complete once every dispatched task
        // for this publication point has reported back.
        let should_finalize = if let Some(state) = inflight_publication_points.get_mut(&pp_id) {
            state.results.push(result);
            state.results.len() == state.task_count
        } else {
            false
        };
        if should_finalize {
            let state = inflight_publication_points
                .remove(&pp_id)
                .expect("inflight publication point must exist");
            let objects_processing_ms = state.objects_started_at.elapsed().as_millis() as u64;
            match reduce_parallel_roa_stage(
                state.objects_stage,
                state.results,
                runner.timing.as_ref(),
            ) {
                Ok(mut objects) => {
                    objects
                        .router_keys
                        .extend(state.fresh_stage.discovered_router_keys.clone());
                    objects.local_outputs_cache.extend(
                        crate::validation::tree_runner::build_router_key_local_outputs(
                            &state.node.handle,
                            &objects.router_keys,
                        ),
                    );
                    let result = runner
                        .finalize_fresh_publication_point_from_reducer(
                            &state.node.handle,
                            &state.fresh_stage.fresh_point,
                            state.warnings,
                            objects,
                            state.fresh_stage.child_audits,
                            state.fresh_stage.discovered_children,
                            state.repo_outcome.repo_sync_source.as_deref(),
                            state.repo_outcome.repo_sync_phase.as_deref(),
                            state.repo_outcome.repo_sync_duration_ms,
                            state.repo_outcome.repo_sync_err.as_deref(),
                        )
                        .map(|out| out.result);
                    // Progress telemetry for the reduced publication point.
                    crate::progress_log::emit(
                        "phase2_publication_point_reduced",
                        serde_json::json!({
                            "manifest_rsync_uri": state.node.handle.manifest_rsync_uri,
                            "publication_point_rsync_uri": state.node.handle.publication_point_rsync_uri,
                            "objects_processing_ms": objects_processing_ms,
                            "total_duration_ms": state.started_at.elapsed().as_millis() as u64,
                        }),
                    );
                    finished.push(FinishedPublicationPoint {
                        node: state.node,
                        result,
                    });
                }
                Err(err) => finished.push(FinishedPublicationPoint {
                    node: state.node,
                    result: Err(err),
                }),
            }
        }
    }
    Ok(())
}
/// Waits up to `timeout` for one repository-sync event and moves every CA
/// instance waiting on a completed repo identity onto the ready queue.
fn drain_repo_events(
    repo_runtime: &dyn crate::parallel::repo_runtime::RepoSyncRuntime,
    ca_waiting_repo_by_identity: &mut HashMap<RepoIdentity, Vec<QueuedCaInstance>>,
    ready_queue: &mut VecDeque<ReadyCaInstance>,
    timeout: Duration,
) -> Result<(), TreeRunError> {
    let event = match repo_runtime
        .recv_repo_result_timeout(timeout)
        .map_err(TreeRunError::Runner)?
    {
        Some(event) => event,
        None => return Ok(()),
    };
    for completion in event.completions {
        let mut outcome = completion.outcome;
        // Shared RRDP/rsync transports release many publication points, but the
        // transport wall time should only be counted once in per-PP stage
        // timing aggregation.
        if completion.identity != event.transport_identity {
            outcome.repo_sync_duration_ms = 0;
        }
        let Some(waiters) = ca_waiting_repo_by_identity.remove(&completion.identity) else {
            continue;
        };
        for node in waiters {
            ready_queue.push_back(ReadyCaInstance {
                node,
                repo_outcome: outcome.clone(),
            });
        }
    }
    Ok(())
}
/// Chooses the repo-event poll timeout: poll without blocking while there is
/// immediately actionable work, otherwise wait briefly (50 ms) for events.
fn event_poll_timeout(
    ca_queue: &VecDeque<QueuedCaInstance>,
    ready_queue: &VecDeque<ReadyCaInstance>,
    pending_roa_dispatch: &VecDeque<OwnedRoaTask>,
    inflight_publication_points: &HashMap<u64, InflightPublicationPoint>,
    instances_started: usize,
    config: &TreeRunConfig,
) -> Duration {
    let startable = !ca_queue.is_empty() && can_start_more(instances_started, config);
    let has_immediate_work = !ready_queue.is_empty()
        || !pending_roa_dispatch.is_empty()
        || !inflight_publication_points.is_empty()
        || startable;
    if has_immediate_work {
        Duration::from_millis(0)
    } else {
        Duration::from_millis(50)
    }
}
/// True once the run can stop: no startable CA instances remain and every
/// intermediate queue (ready, repo-waiting, ROA dispatch, inflight) is empty.
fn is_complete(
    ca_queue: &VecDeque<QueuedCaInstance>,
    ready_queue: &VecDeque<ReadyCaInstance>,
    ca_waiting_repo_by_identity: &HashMap<RepoIdentity, Vec<QueuedCaInstance>>,
    pending_roa_dispatch: &VecDeque<OwnedRoaTask>,
    inflight_publication_points: &HashMap<u64, InflightPublicationPoint>,
    instances_started: usize,
    config: &TreeRunConfig,
) -> bool {
    // Queued CA instances only keep the run alive while more may be started.
    if !ca_queue.is_empty() && can_start_more(instances_started, config) {
        return false;
    }
    ready_queue.is_empty()
        && ca_waiting_repo_by_identity.is_empty()
        && pending_roa_dispatch.is_empty()
        && inflight_publication_points.is_empty()
}
/// Aggregates every finished publication point into the final audit output.
///
/// Results are sorted by node id so the output ordering is deterministic
/// regardless of parallel completion order. Failures are counted and turned
/// into warnings tagged with the failing manifest URI.
fn build_tree_output(mut finished: Vec<FinishedPublicationPoint>) -> TreeRunAuditOutput {
    finished.sort_by_key(|item| item.node.id);
    let mut instances_processed = 0usize;
    let mut instances_failed = 0usize;
    let mut warnings = Vec::new();
    let mut vrps = Vec::new();
    let mut aspas = Vec::new();
    let mut router_keys = Vec::new();
    let mut publication_points = Vec::new();
    for item in finished {
        match item.result {
            Ok(result) => {
                instances_processed += 1;
                // `result` is owned here, so move its collections out instead
                // of cloning them (these vectors can be large).
                warnings.extend(result.warnings);
                warnings.extend(result.objects.warnings);
                vrps.extend(result.objects.vrps);
                aspas.extend(result.objects.aspas);
                router_keys.extend(result.objects.router_keys);
                let mut audit: PublicationPointAudit = result.audit;
                // Stamp tree-topology metadata onto the per-PP audit record.
                audit.node_id = Some(item.node.id);
                audit.parent_node_id = item.node.parent_id;
                audit.discovered_from = item.node.discovered_from;
                publication_points.push(audit);
            }
            Err(err) => {
                instances_failed += 1;
                warnings.push(
                    Warning::new(format!("publication point failed: {err}"))
                        .with_context(&item.node.handle.manifest_rsync_uri),
                );
            }
        }
    }
    TreeRunAuditOutput {
        tree: TreeRunOutput {
            instances_processed,
            instances_failed,
            warnings,
            vrps,
            aspas,
            router_keys,
        },
        publication_points,
    }
}
/// Runs the parallel phase-2 audit for a single root CA instance.
///
/// Thin convenience wrapper that delegates to the multi-root entry point
/// with a one-element root list.
pub fn run_tree_parallel_phase2_audit(
    root: CaInstanceHandle,
    runner: &Rpkiv1PublicationPointRunner<'_>,
    config: &TreeRunConfig,
) -> Result<TreeRunAuditOutput, TreeRunError> {
    run_tree_parallel_phase2_audit_multi_root(vec![root], runner, config)
}

File diff suppressed because it is too large Load Diff

View File

@ -182,6 +182,8 @@ fn apnic_tree_full_stats_serial() {
rsync_repo_cache: std::sync::Mutex::new(std::collections::HashMap::new()), rsync_repo_cache: std::sync::Mutex::new(std::collections::HashMap::new()),
current_repo_index: None, current_repo_index: None,
repo_sync_runtime: None, repo_sync_runtime: None,
parallel_phase2_config: None,
parallel_roa_worker_pool: None,
}; };
let stats = RefCell::new(LiveStats::default()); let stats = RefCell::new(LiveStats::default());

View File

@ -1,10 +1,14 @@
use rpki::fetch::rsync::LocalDirRsyncFetcher; use rpki::fetch::rsync::LocalDirRsyncFetcher;
use rpki::parallel::config::ParallelPhase2Config;
use rpki::policy::{Policy, SignedObjectFailurePolicy, SyncPreference}; use rpki::policy::{Policy, SignedObjectFailurePolicy, SyncPreference};
use rpki::storage::{PackFile, PackTime, RocksStore, VcirOutputType}; use rpki::storage::{PackFile, PackTime, RocksStore, VcirOutputType};
use rpki::sync::repo::sync_publication_point; use rpki::sync::repo::sync_publication_point;
use rpki::sync::rrdp::Fetcher; use rpki::sync::rrdp::Fetcher;
use rpki::validation::manifest::process_manifest_publication_point; use rpki::validation::manifest::process_manifest_publication_point;
use rpki::validation::objects::process_publication_point_snapshot_for_issuer; use rpki::validation::objects::{
process_publication_point_snapshot_for_issuer,
process_publication_point_snapshot_for_issuer_parallel_roa,
};
use rpki::validation::publication_point::PublicationPointSnapshot; use rpki::validation::publication_point::PublicationPointSnapshot;
struct NoopHttpFetcher; struct NoopHttpFetcher;
@ -148,6 +152,151 @@ fn process_snapshot_for_issuer_extracts_vrps_from_real_cernet_fixture() {
assert!(out.aspas.is_empty()); assert!(out.aspas.is_empty());
} }
// Parity check on a real CERNET repository fixture: the parallel ROA path
// must produce identical VRPs, ASPAs, local-output cache, audit records,
// warnings, and stats as the serial path.
#[test]
fn parallel_roa_processing_matches_serial_for_real_cernet_fixture() {
    let (dir, rsync_base_uri, manifest_file) = cernet_fixture();
    let manifest_rsync_uri = format!("{rsync_base_uri}{manifest_file}");
    let validation_time = validation_time_from_manifest_fixture(&dir, &manifest_file);
    let pack = build_publication_point_snapshot_from_local_rsync_fixture(
        &dir,
        &rsync_base_uri,
        &manifest_rsync_uri,
        validation_time,
    );
    let issuer_ca_der = issuer_ca_fixture();
    let issuer_ca = rpki::data_model::rc::ResourceCertificate::decode_der(&issuer_ca_der)
        .expect("decode issuer ca");
    let policy = Policy::default();
    let serial = process_publication_point_snapshot_for_issuer(
        &pack,
        &policy,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        issuer_ca.tbs.extensions.ip_resources.as_ref(),
        issuer_ca.tbs.extensions.as_resources.as_ref(),
        validation_time,
        None,
    );
    let parallel = process_publication_point_snapshot_for_issuer_parallel_roa(
        &pack,
        &policy,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        issuer_ca.tbs.extensions.ip_resources.as_ref(),
        issuer_ca.tbs.extensions.as_resources.as_ref(),
        validation_time,
        None,
        &ParallelPhase2Config {
            object_workers: 2,
            worker_queue_capacity: 4,
        },
    );
    assert_eq!(parallel.vrps, serial.vrps);
    assert_eq!(parallel.aspas, serial.aspas);
    assert_eq!(parallel.local_outputs_cache, serial.local_outputs_cache);
    assert_eq!(parallel.audit, serial.audit);
    assert_eq!(parallel.warnings, serial.warnings);
    assert_eq!(parallel.stats, serial.stats);
}
// A garbage issuer certificate ([0, 1, 2]) must make the parallel path fail
// the same way as the serial path: identical outputs and a dropped
// publication point.
#[test]
fn parallel_roa_processing_reports_issuer_decode_failure_like_serial() {
    let (dir, rsync_base_uri, manifest_file) = cernet_fixture();
    let manifest_rsync_uri = format!("{rsync_base_uri}{manifest_file}");
    let validation_time = validation_time_from_manifest_fixture(&dir, &manifest_file);
    let pack = build_publication_point_snapshot_from_local_rsync_fixture(
        &dir,
        &rsync_base_uri,
        &manifest_rsync_uri,
        validation_time,
    );
    let policy = Policy::default();
    let serial = process_publication_point_snapshot_for_issuer(
        &pack,
        &policy,
        &[0, 1, 2],
        Some(issuer_ca_rsync_uri()),
        None,
        None,
        validation_time,
        None,
    );
    let parallel = process_publication_point_snapshot_for_issuer_parallel_roa(
        &pack,
        &policy,
        &[0, 1, 2],
        Some(issuer_ca_rsync_uri()),
        None,
        None,
        validation_time,
        None,
        &ParallelPhase2Config {
            object_workers: 2,
            worker_queue_capacity: 4,
        },
    );
    assert_eq!(parallel.vrps, serial.vrps);
    assert_eq!(parallel.audit, serial.audit);
    assert_eq!(parallel.warnings, serial.warnings);
    assert_eq!(parallel.stats, serial.stats);
    assert!(parallel.stats.publication_point_dropped);
}
// Removing every CRL from the snapshot must make the parallel path drop the
// publication point exactly like the serial path does.
#[test]
fn parallel_roa_processing_reports_missing_crl_like_serial() {
    let (dir, rsync_base_uri, manifest_file) = cernet_fixture();
    let manifest_rsync_uri = format!("{rsync_base_uri}{manifest_file}");
    let validation_time = validation_time_from_manifest_fixture(&dir, &manifest_file);
    let mut pack = build_publication_point_snapshot_from_local_rsync_fixture(
        &dir,
        &rsync_base_uri,
        &manifest_rsync_uri,
        validation_time,
    );
    // Strip all CRLs so the publication point cannot validate.
    pack.files.retain(|file| !file.rsync_uri.ends_with(".crl"));
    let issuer_ca_der = issuer_ca_fixture();
    let issuer_ca = rpki::data_model::rc::ResourceCertificate::decode_der(&issuer_ca_der)
        .expect("decode issuer ca");
    let policy = Policy::default();
    let serial = process_publication_point_snapshot_for_issuer(
        &pack,
        &policy,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        issuer_ca.tbs.extensions.ip_resources.as_ref(),
        issuer_ca.tbs.extensions.as_resources.as_ref(),
        validation_time,
        None,
    );
    let parallel = process_publication_point_snapshot_for_issuer_parallel_roa(
        &pack,
        &policy,
        &issuer_ca_der,
        Some(issuer_ca_rsync_uri()),
        issuer_ca.tbs.extensions.ip_resources.as_ref(),
        issuer_ca.tbs.extensions.as_resources.as_ref(),
        validation_time,
        None,
        &ParallelPhase2Config {
            object_workers: 2,
            worker_queue_capacity: 4,
        },
    );
    assert_eq!(parallel.vrps, serial.vrps);
    assert_eq!(parallel.audit, serial.audit);
    assert_eq!(parallel.warnings, serial.warnings);
    assert_eq!(parallel.stats, serial.stats);
    assert!(parallel.stats.publication_point_dropped);
}
#[test] #[test]
fn signed_object_failure_policy_drop_object_drops_only_bad_object() { fn signed_object_failure_policy_drop_object_drops_only_bad_object() {
let (dir, rsync_base_uri, manifest_file) = cernet_fixture(); let (dir, rsync_base_uri, manifest_file) = cernet_fixture();

View File

@ -1,6 +1,11 @@
use rpki::parallel::config::ParallelPhase2Config;
use rpki::policy::{Policy, SignedObjectFailurePolicy}; use rpki::policy::{Policy, SignedObjectFailurePolicy};
use rpki::storage::{PackFile, PackTime}; use rpki::storage::{PackFile, PackTime};
use rpki::validation::objects::process_publication_point_snapshot_for_issuer; use rpki::validation::objects::{
process_publication_point_for_issuer_parallel_roa_with_pool,
process_publication_point_snapshot_for_issuer,
process_publication_point_snapshot_for_issuer_parallel_roa, ParallelRoaWorkerPool,
};
use rpki::validation::publication_point::PublicationPointSnapshot; use rpki::validation::publication_point::PublicationPointSnapshot;
fn fixture_bytes(path: &str) -> Vec<u8> { fn fixture_bytes(path: &str) -> Vec<u8> {
@ -517,3 +522,228 @@ fn process_snapshot_for_issuer_selects_crl_by_ee_crldp_uri_aspa() {
assert_eq!(out.audit.len(), 1); assert_eq!(out.audit.len(), 1);
assert_eq!(out.warnings.len(), 1); assert_eq!(out.warnings.len(), 1);
} }
// Under the DropObject failure policy, invalid ROA/ASPA objects must be
// recorded identically by the parallel path; running twice on one shared
// pool also checks the pool is reusable across publication points.
#[test]
fn parallel_roa_processing_drop_object_records_roa_and_aspa_errors_like_serial() {
    let manifest_bytes = fixture_bytes(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let pack = dummy_snapshot(
        manifest_bytes,
        vec![
            PackFile::from_bytes_compute_sha256(
                "rsync://example.test/repo/pp/issuer.crl",
                vec![0x01],
            ),
            PackFile::from_bytes_compute_sha256("rsync://example.test/repo/pp/a.roa", vec![0x00]),
            PackFile::from_bytes_compute_sha256("rsync://example.test/repo/pp/b.roa", vec![0x00]),
            PackFile::from_bytes_compute_sha256("rsync://example.test/repo/pp/c.asa", vec![0x00]),
        ],
    );
    let policy = Policy {
        signed_object_failure_policy: SignedObjectFailurePolicy::DropObject,
        ..Policy::default()
    };
    let issuer_ca_der = fixture_bytes("tests/fixtures/ta/apnic-ta.cer");
    let serial = process_publication_point_snapshot_for_issuer(
        &pack,
        &policy,
        &issuer_ca_der,
        None,
        None,
        None,
        time::OffsetDateTime::now_utc(),
        None,
    );
    let phase2_config = ParallelPhase2Config {
        object_workers: 2,
        worker_queue_capacity: 1,
    };
    let pool = ParallelRoaWorkerPool::new(&phase2_config).expect("parallel roa pool");
    let parallel = process_publication_point_for_issuer_parallel_roa_with_pool(
        &pack,
        &policy,
        &issuer_ca_der,
        None,
        None,
        None,
        time::OffsetDateTime::now_utc(),
        None,
        &pool,
    );
    // Second run on the same pool: results must not degrade on reuse.
    let parallel_again = process_publication_point_for_issuer_parallel_roa_with_pool(
        &pack,
        &policy,
        &issuer_ca_der,
        None,
        None,
        None,
        time::OffsetDateTime::now_utc(),
        None,
        &pool,
    );
    assert_eq!(parallel.stats, serial.stats);
    assert_eq!(parallel_again.stats, serial.stats);
    assert_eq!(parallel.audit, serial.audit);
    assert_eq!(parallel.warnings.len(), serial.warnings.len());
    assert_eq!(parallel.stats.roa_total, 2);
    assert_eq!(parallel.stats.aspa_total, 1);
    assert_eq!(parallel.audit.len(), 3);
}
// The DropPublicationPoint failure policy is not parallelizable; the
// parallel entry point must fall back and match serial output exactly,
// including the dropped-publication-point flag.
#[test]
fn parallel_roa_processing_falls_back_for_drop_publication_point_policy() {
    let manifest_bytes = fixture_bytes(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let pack = dummy_snapshot(
        manifest_bytes,
        vec![
            PackFile::from_bytes_compute_sha256(
                "rsync://example.test/repo/pp/issuer.crl",
                vec![0x01],
            ),
            PackFile::from_bytes_compute_sha256("rsync://example.test/repo/pp/a.roa", vec![0x00]),
            PackFile::from_bytes_compute_sha256("rsync://example.test/repo/pp/b.roa", vec![0x00]),
        ],
    );
    let policy = Policy {
        signed_object_failure_policy: SignedObjectFailurePolicy::DropPublicationPoint,
        ..Policy::default()
    };
    let issuer_ca_der = fixture_bytes("tests/fixtures/ta/apnic-ta.cer");
    let serial = process_publication_point_snapshot_for_issuer(
        &pack,
        &policy,
        &issuer_ca_der,
        None,
        None,
        None,
        time::OffsetDateTime::now_utc(),
        None,
    );
    let parallel = process_publication_point_snapshot_for_issuer_parallel_roa(
        &pack,
        &policy,
        &issuer_ca_der,
        None,
        None,
        None,
        time::OffsetDateTime::now_utc(),
        None,
        &ParallelPhase2Config {
            object_workers: 2,
            worker_queue_capacity: 1,
        },
    );
    assert_eq!(parallel.stats, serial.stats);
    assert_eq!(parallel.audit, serial.audit);
    assert_eq!(parallel.warnings, serial.warnings);
    assert!(parallel.stats.publication_point_dropped);
}
// With a single object worker there is nothing to parallelize; the parallel
// entry point must fall back to serial processing and match its output.
#[test]
fn parallel_roa_processing_falls_back_for_single_worker_config() {
    let manifest_bytes = fixture_bytes(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let pack = dummy_snapshot(
        manifest_bytes,
        vec![
            PackFile::from_bytes_compute_sha256(
                "rsync://example.test/repo/pp/issuer.crl",
                vec![0x01],
            ),
            PackFile::from_bytes_compute_sha256("rsync://example.test/repo/pp/a.roa", vec![0x00]),
        ],
    );
    let policy = Policy {
        signed_object_failure_policy: SignedObjectFailurePolicy::DropObject,
        ..Policy::default()
    };
    let issuer_ca_der = fixture_bytes("tests/fixtures/ta/apnic-ta.cer");
    let serial = process_publication_point_snapshot_for_issuer(
        &pack,
        &policy,
        &issuer_ca_der,
        None,
        None,
        None,
        time::OffsetDateTime::now_utc(),
        None,
    );
    let parallel = process_publication_point_snapshot_for_issuer_parallel_roa(
        &pack,
        &policy,
        &issuer_ca_der,
        None,
        None,
        None,
        time::OffsetDateTime::now_utc(),
        None,
        &ParallelPhase2Config {
            object_workers: 1,
            worker_queue_capacity: 1,
        },
    );
    assert_eq!(parallel.stats, serial.stats);
    assert_eq!(parallel.audit, serial.audit);
    assert_eq!(parallel.warnings, serial.warnings);
}
// A zero-capacity worker queue makes pool creation fail; the parallel entry
// point must fall back to serial processing instead of erroring out.
#[test]
fn parallel_roa_processing_falls_back_when_pool_creation_fails() {
    let manifest_bytes = fixture_bytes(
        "tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/05FC9C5B88506F7C0D3F862C8895BED67E9F8EBA.mft",
    );
    let pack = dummy_snapshot(
        manifest_bytes,
        vec![
            PackFile::from_bytes_compute_sha256(
                "rsync://example.test/repo/pp/issuer.crl",
                vec![0x01],
            ),
            PackFile::from_bytes_compute_sha256("rsync://example.test/repo/pp/a.roa", vec![0x00]),
        ],
    );
    let policy = Policy {
        signed_object_failure_policy: SignedObjectFailurePolicy::DropObject,
        ..Policy::default()
    };
    let issuer_ca_der = fixture_bytes("tests/fixtures/ta/apnic-ta.cer");
    let serial = process_publication_point_snapshot_for_issuer(
        &pack,
        &policy,
        &issuer_ca_der,
        None,
        None,
        None,
        time::OffsetDateTime::now_utc(),
        None,
    );
    let parallel = process_publication_point_snapshot_for_issuer_parallel_roa(
        &pack,
        &policy,
        &issuer_ca_der,
        None,
        None,
        None,
        time::OffsetDateTime::now_utc(),
        None,
        &ParallelPhase2Config {
            object_workers: 2,
            worker_queue_capacity: 0,
        },
    );
    assert_eq!(parallel.stats, serial.stats);
    assert_eq!(parallel.audit, serial.audit);
    assert_eq!(parallel.warnings, serial.warnings);
}

View File

@ -4,7 +4,7 @@ fn fixture_path(rel: &str) -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(rel) PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(rel)
} }
fn run_offline_case(parallel_phase1: bool) -> (serde_json::Value, Vec<u8>) { fn run_offline_case(parallel_phase1: bool, parallel_phase2: bool) -> (serde_json::Value, Vec<u8>) {
let db_dir = tempfile::tempdir().expect("db tempdir"); let db_dir = tempfile::tempdir().expect("db tempdir");
let out_dir = tempfile::tempdir().expect("out tempdir"); let out_dir = tempfile::tempdir().expect("out tempdir");
let report_path = out_dir.path().join("report.json"); let report_path = out_dir.path().join("report.json");
@ -41,6 +41,13 @@ fn run_offline_case(parallel_phase1: bool) -> (serde_json::Value, Vec<u8>) {
if parallel_phase1 { if parallel_phase1 {
argv.push("--parallel-phase1".to_string()); argv.push("--parallel-phase1".to_string());
} }
if parallel_phase2 {
argv.push("--parallel-phase2".to_string());
argv.push("--parallel-phase2-object-workers".to_string());
argv.push("4".to_string());
argv.push("--parallel-phase2-worker-queue-capacity".to_string());
argv.push("64".to_string());
}
rpki::cli::run(&argv).expect("cli run"); rpki::cli::run(&argv).expect("cli run");
@ -52,8 +59,8 @@ fn run_offline_case(parallel_phase1: bool) -> (serde_json::Value, Vec<u8>) {
#[test] #[test]
fn offline_serial_and_parallel_phase1_match_compare_views() { fn offline_serial_and_parallel_phase1_match_compare_views() {
let (serial_report, serial_ccr_bytes) = run_offline_case(false); let (serial_report, serial_ccr_bytes) = run_offline_case(false, false);
let (parallel_report, parallel_ccr_bytes) = run_offline_case(true); let (parallel_report, parallel_ccr_bytes) = run_offline_case(true, false);
let serial_ccr = rpki::ccr::decode_content_info(&serial_ccr_bytes).expect("decode serial ccr"); let serial_ccr = rpki::ccr::decode_content_info(&serial_ccr_bytes).expect("decode serial ccr");
let parallel_ccr = let parallel_ccr =
@ -80,3 +87,32 @@ fn offline_serial_and_parallel_phase1_match_compare_views() {
"publication point counts must match" "publication point counts must match"
); );
} }
// End-to-end CLI parity: a serial run and a parallel phase-2 run over the
// same offline fixture must produce identical VRP/VAP compare views and the
// same number of publication points in the report.
#[test]
fn offline_serial_and_parallel_phase2_match_compare_views() {
    let (serial_report, serial_ccr_bytes) = run_offline_case(false, false);
    let (phase2_report, phase2_ccr_bytes) = run_offline_case(true, true);
    let serial_ccr = rpki::ccr::decode_content_info(&serial_ccr_bytes).expect("decode serial ccr");
    let phase2_ccr = rpki::ccr::decode_content_info(&phase2_ccr_bytes).expect("decode phase2 ccr");
    let (serial_vrps, serial_vaps) =
        rpki::bundle::decode_ccr_compare_views(&serial_ccr, "apnic").expect("serial compare view");
    let (phase2_vrps, phase2_vaps) =
        rpki::bundle::decode_ccr_compare_views(&phase2_ccr, "apnic").expect("phase2 compare view");
    assert_eq!(serial_vrps, phase2_vrps, "VRP compare views must match");
    assert_eq!(serial_vaps, phase2_vaps, "VAP compare views must match");
    let serial_points = serial_report["publication_points"]
        .as_array()
        .expect("serial publication_points");
    let phase2_points = phase2_report["publication_points"]
        .as_array()
        .expect("phase2 publication_points");
    assert_eq!(
        serial_points.len(),
        phase2_points.len(),
        "publication point counts must match"
    );
}