rpki/scripts/soak/run_soak.sh

1156 lines
37 KiB
Bash
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env bash
#
# Soak-test driver for rpki_daemon. Every round is recorded as
# $RUN_ROOT/runs/run_NNNN and is either a snapshot or a delta sync, depending
# on the previous round's outcome and on the local DB state (see main()).
#
# Settings are read from $ENV_FILE (default: $PACKAGE_ROOT/.env). The env file
# is sourced BEFORE the defaults below are applied, so anything it assigns
# overrides both the inherited environment and these defaults; the
# ${VAR:-default} forms only fill in variables that are still unset or empty.
set -euo pipefail
# Absolute directory containing this script (symlinks are not resolved).
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PACKAGE_ROOT="${PACKAGE_ROOT:-$SCRIPT_DIR}"
ENV_FILE="${ENV_FILE:-$PACKAGE_ROOT/.env}"
if [[ -f "$ENV_FILE" ]]; then
# shellcheck disable=SC1090
source "$ENV_FILE"
fi
# --- Round scheduling --------------------------------------------------------
# MAX_RUNS > 0: perform that many rounds; MAX_RUNS < 0: run until stopped.
# INTERVAL_SECS: sleep between rounds. STOP_AFTER_SECS: stop starting new
# rounds once this much wall time has elapsed (0 = no limit).
MAX_RUNS="${MAX_RUNS:-3}"
INTERVAL_SECS="${INTERVAL_SECS:-0}"
STOP_AFTER_SECS="${STOP_AFTER_SECS:-0}"
# --- Trust-anchor inputs -----------------------------------------------------
# RIRS: comma-separated subset of afrinic,apnic,arin,lacnic,ripe.
# TAL_INPUT_MODE: file-with-ta | file-live-ta | url.
RIRS="${RIRS:-afrinic,apnic,arin,lacnic,ripe}"
TAL_INPUT_MODE="${TAL_INPUT_MODE:-file-with-ta}"
# --- Output, retention and sync policy ---------------------------------------
RUN_ROOT="${RUN_ROOT:-$PACKAGE_ROOT}"
# Number of runs/run_NNNN directories to keep; also passed as --retain-runs.
RETAIN_RUNS="${RETAIN_RUNS:-10}"
CLEAN_TMP_AFTER_RUN="${CLEAN_TMP_AFTER_RUN:-0}"
OUTPUT_COMPACT_REPORT="${OUTPUT_COMPACT_REPORT:-1}"
# True: one persistent rsync mirror shared by all rounds; false: a per-run
# mirror path under $TMP_DIR is handed to the child.
ALLOW_RSYNC_MIRROR_REUSE="${ALLOW_RSYNC_MIRROR_REUSE:-1}"
RSYNC_SCOPE="${RSYNC_SCOPE:-module-root}"
# After a failed round: move DB/meta/tmp state aside and force a snapshot
# (when false, the script aborts instead).
FAILURE_SNAPSHOT_RESET="${FAILURE_SNAPSHOT_RESET:-1}"
# Force a snapshot once this many successful delta rounds have followed the
# last successful snapshot round.
PERIODIC_SNAPSHOT_RESET="${PERIODIC_SNAPSHOT_RESET:-0}"
PERIODIC_SNAPSHOT_MAX_DELTAS="${PERIODIC_SNAPSHOT_MAX_DELTAS:-100}"
# Forwarded as --db-stats-exact-every when db_stats is installed (0 = omit).
DB_STATS_EXACT_EVERY="${DB_STATS_EXACT_EVERY:-3}"
# --- Progress-log thresholds exported into the daemon's environment ----------
RPKI_PROGRESS_LOG="${RPKI_PROGRESS_LOG:-1}"
RPKI_PROGRESS_SLOW_SECS="${RPKI_PROGRESS_SLOW_SECS:-10}"
RPKI_PROGRESS_STAGE_FRESH_SLOW_MS="${RPKI_PROGRESS_STAGE_FRESH_SLOW_MS:-1000}"
RPKI_PROGRESS_PP_CONTROL_SLOW_MS="${RPKI_PROGRESS_PP_CONTROL_SLOW_MS:-100}"
RPKI_PROGRESS_PP_CACHE_SLOW_MS="${RPKI_PROGRESS_PP_CACHE_SLOW_MS:-50}"
RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS="${RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS:-1000}"
# --- Miscellaneous switches --------------------------------------------------
# Stop rpki-client / routinator before the first round.
DISABLE_COMPETING_RPS="${DISABLE_COMPETING_RPS:-1}"
ENABLE_CHILD_CERTIFICATE_VALIDATION_CACHE="${ENABLE_CHILD_CERTIFICATE_VALIDATION_CACHE:-0}"
# Extra arguments for the child rpki process, word-split before being appended.
RPKI_EXTRA_ARGS="${RPKI_EXTRA_ARGS:-}"
# When true, the child also gets --analyze with output under the run dir.
RPKI_ANALYZE="${RPKI_ANALYZE:-0}"
BIN_DIR="${BIN_DIR:-$PACKAGE_ROOT/bin}"
FIXTURE_DIR="${FIXTURE_DIR:-$PACKAGE_ROOT/fixtures}"
# --- Derived state layout ----------------------------------------------------
# STATE_ROOT, RUNS_ROOT, LOG_ROOT, INVALID_ROOT and RESET_STAGING_ROOT always
# follow RUN_ROOT; the ${...:-} entries can be overridden individually.
STATE_ROOT="$RUN_ROOT/state"
RUNS_ROOT="$RUN_ROOT/runs"
LOG_ROOT="$RUN_ROOT/logs"
DB_DIR="${DB_DIR:-$STATE_ROOT/db}"
META_DIR="${META_DIR:-$STATE_ROOT/meta}"
TMP_DIR="${TMP_DIR:-$RUN_ROOT/tmp}"
RSYNC_MIRROR_ROOT="${RSYNC_MIRROR_ROOT:-$STATE_ROOT/rsync-mirror}"
# Quarantine for state from failed rounds.
INVALID_ROOT="$STATE_ROOT/invalid"
# Holding area for the DB moved aside by a periodic snapshot reset.
RESET_STAGING_ROOT="$STATE_ROOT/reset-staging"
# --- Live TA refresh (TAL_INPUT_MODE=file-live-ta) ---------------------------
# Directory holding one <rir>.pid file per in-flight background refresh.
LIVE_TA_REFRESH_DIR="${LIVE_TA_REFRESH_DIR:-$META_DIR/live-ta-refresh}"
# curl --connect-timeout / --max-time for TA downloads.
LIVE_TA_REFRESH_CONNECT_TIMEOUT_SECS="${LIVE_TA_REFRESH_CONNECT_TIMEOUT_SECS:-15}"
LIVE_TA_REFRESH_MAX_TIME_SECS="${LIVE_TA_REFRESH_MAX_TIME_SECS:-120}"
# When true, snapshot rounds block until every TA has been re-downloaded.
LIVE_TA_REFRESH_BEFORE_SNAPSHOT="${LIVE_TA_REFRESH_BEFORE_SNAPSHOT:-1}"
# --- Executables (db_stats is optional and only used if executable) ----------
RPKI_BIN="$BIN_DIR/rpki"
RPKI_DAEMON_BIN="$BIN_DIR/rpki_daemon"
DB_STATS_BIN="$BIN_DIR/db_stats"
# Print usage help to stdout. The last heredoc line (Chinese) reads:
# "Configuration comes from the .env file in the package root directory; it
# can also be overridden with ENV_FILE=/path/to/.env."
# The heredoc text is runtime output and is intentionally left untouched.
usage() {
cat <<'USAGE'
Usage:
./run_soak.sh
配置来自 package 根目录下的 .env也可以用 ENV_FILE=/path/to/.env 覆盖。
USAGE
}
die() {
  # Fatal error: report "error: <message>" on stderr and terminate the
  # current shell (the whole script, or just the enclosing subshell) with 2.
  printf 'error: %s\n' "$*" >&2
  exit 2
}
warn() {
  # Non-fatal diagnostic: "warning: <message>" on stderr; execution continues.
  printf 'warning: %s\n' "$*" >&2
}
is_true() {
  # Succeeds only for the accepted spellings of "true":
  # 1, true/TRUE, yes/YES, on/ON. Mixed case, empty or missing is false.
  [[ "${1:-}" =~ ^(1|true|TRUE|yes|YES|on|ON)$ ]]
}
require_command() {
  # Abort through die unless "$1" resolves via `command -v`
  # (an executable on PATH, a builtin or a function).
  if ! command -v "$1" >/dev/null 2>&1; then
    die "missing required command: $1"
  fi
}
validate_positive_int() {
  # Abort unless $2 (named $1 in messages) is a strictly positive decimal
  # integer. Leading zeros are rejected: bash arithmetic reads "010" as octal
  # 8 and fails on "08", while the Python helpers read them as decimal, and
  # "00" would previously slip past the "> 0" check.
  local name="$1"
  local value="$2"
  [[ "$value" =~ ^[0-9]+$ ]] || die "$name must be an integer: $value"
  [[ ! "$value" =~ ^0+$ ]] || die "$name must be > 0"
  [[ "$value" =~ ^[1-9][0-9]*$ ]] || die "$name must not have leading zeros: $value"
}
validate_non_negative_int() {
  # Abort unless $2 (named $1 in messages) is 0 or a positive decimal integer
  # without leading zeros ("08" breaks bash arithmetic, "010" reads as octal).
  local name="$1"
  local value="$2"
  [[ "$value" =~ ^[0-9]+$ ]] || die "$name must be an integer: $value"
  [[ "$value" =~ ^(0|[1-9][0-9]*)$ ]] || die "$name must not have leading zeros: $value"
}
validate_max_runs() {
  # MAX_RUNS must be a non-zero decimal integer: positive = fixed number of
  # rounds, negative = continuous mode. "-0"/"00" used to pass the literal
  # "0" comparison and then schedule zero rounds; leading zeros also make
  # bash arithmetic treat the value as octal.
  [[ "$MAX_RUNS" =~ ^-?[0-9]+$ ]] || die "MAX_RUNS must be an integer: $MAX_RUNS"
  [[ ! "$MAX_RUNS" =~ ^-?0+$ ]] || die "MAX_RUNS must be non-zero; use a positive value for fixed runs or -1 for continuous mode"
  [[ "$MAX_RUNS" =~ ^-?[1-9][0-9]*$ ]] || die "MAX_RUNS must not have leading zeros: $MAX_RUNS"
}
validate_rsync_scope() {
  # RSYNC_SCOPE must be exactly one of the three scopes the child accepts.
  [[ "$RSYNC_SCOPE" =~ ^(host|publication-point|module-root)$ ]] \
    || die "RSYNC_SCOPE must be host, publication-point, or module-root: $RSYNC_SCOPE"
}
validate_tal_input_mode() {
  # TAL_INPUT_MODE selects how TALs/TAs reach the child; only three modes exist.
  [[ "$TAL_INPUT_MODE" =~ ^(file-with-ta|file-live-ta|url)$ ]] \
    || die "TAL_INPUT_MODE must be file-with-ta, file-live-ta or url: $TAL_INPUT_MODE"
}
normalize_token() {
  # Print $1 with surrounding whitespace removed and upper case folded to
  # lower case (via tr), without a trailing newline.
  local value="$1"
  local leading trailing
  leading="${value%%[![:space:]]*}"
  value="${value#"$leading"}"
  trailing="${value##*[![:space:]]}"
  value="${value%"$trailing"}"
  printf '%s' "$value" | tr '[:upper:]' '[:lower:]'
}
parse_rirs() {
  # Parse the comma-separated $RIRS into the global array RIR_LIST.
  # Entries are trimmed and lower-cased; empty entries are skipped; unknown
  # names abort via die. Repeated RIRs are kept once, so the same TA is not
  # passed to the child twice and two live-TA refreshes never race on the
  # same pid file. Aborts if no RIR remains.
  RIR_LIST=()
  local -a raw_rirs=()
  local raw_token
  local normalized
  IFS=',' read -r -a raw_rirs <<< "$RIRS"
  for raw_token in ${raw_rirs[@]+"${raw_rirs[@]}"}; do
    normalized="$(normalize_token "$raw_token")"
    [[ -n "$normalized" ]] || continue
    case "$normalized" in
      afrinic|apnic|arin|lacnic|ripe)
        ;;
      *)
        die "invalid RIRS entry: $raw_token; allowed: afrinic,apnic,arin,lacnic,ripe"
        ;;
    esac
    # RIR names contain no spaces, so a space-delimited membership test is exact.
    if [[ " ${RIR_LIST[*]-} " != *" $normalized "* ]]; then
      RIR_LIST+=("$normalized")
    fi
  done
  [[ "${#RIR_LIST[@]}" -gt 0 ]] || die "RIRS must contain at least one RIR"
}
tal_file_for_rir() {
  # Print the TAL fixture path for RIR $1 (no newline); die on unknown RIRs.
  local tal_name
  case "$1" in
    afrinic) tal_name="afrinic.tal" ;;
    apnic) tal_name="apnic-rfc7730-https.tal" ;;
    arin) tal_name="arin.tal" ;;
    lacnic) tal_name="lacnic.tal" ;;
    ripe) tal_name="ripe-ncc.tal" ;;
    *) die "unknown RIR: $1" ;;
  esac
  printf '%s' "$FIXTURE_DIR/tal/$tal_name"
}
ta_file_for_rir() {
  # Print the bundled TA certificate path for RIR $1; die on unknown RIRs.
  local cer_name
  case "$1" in
    afrinic) cer_name="afrinic-ta.cer" ;;
    apnic) cer_name="apnic-ta.cer" ;;
    arin) cer_name="arin-ta.cer" ;;
    lacnic) cer_name="lacnic-ta.cer" ;;
    ripe) cer_name="ripe-ncc-ta.cer" ;;
    *) die "unknown RIR: $1" ;;
  esac
  printf '%s' "$FIXTURE_DIR/ta/$cer_name"
}
tal_url_for_rir() {
  # Print the published TAL URL for RIR $1; die on unknown RIRs.
  local url
  case "$1" in
    afrinic) url="https://rpki.afrinic.net/tal/afrinic.tal" ;;
    apnic) url="https://tal.apnic.net/apnic.tal" ;;
    arin) url="https://www.arin.net/resources/manage/rpki/arin.tal" ;;
    lacnic) url="https://www.lacnic.net/innovaportal/file/4983/1/lacnic.tal" ;;
    ripe) url="https://tal.rpki.ripe.net/ripe-ncc.tal" ;;
    *) die "unknown RIR: $1" ;;
  esac
  printf '%s' "$url"
}
cir_tal_uri_for_rir() {
  # The TAL URI recorded in the CIR is the same as the published TAL URL.
  local rir="$1"
  tal_url_for_rir "$rir"
}
tal_https_uri_from_fixture() {
  # Print the first http(s):// URI listed in the TAL file $1, trimmed of
  # surrounding whitespace (including a CR from CRLF files). Comment and blank
  # lines are ignored; prints nothing when the TAL has no http(s) URI.
  local tal_path="$1"
  awk '
    { sub(/^[[:space:]]+/, ""); sub(/[[:space:]]+$/, "") }
    /^#/ || /^$/ { next }
    /^https?:\/\// { print; exit }
  ' "$tal_path"
}
live_ta_file_for_rir() {
  # Path of the cached live TA for RIR $1: $STATE_ROOT/live-ta/<TAL stem>.cer,
  # where <TAL stem> is the TAL fixture's basename without ".tal".
  local tal_stem
  tal_stem="$(basename "$(tal_file_for_rir "$1")" .tal)"
  printf '%s' "$STATE_ROOT/live-ta/${tal_stem}.cer"
}
live_ta_refresh_pid_file_for_rir() {
  # One pid file per RIR tracks its background live-TA refresh.
  local rir="$1"
  printf '%s' "${LIVE_TA_REFRESH_DIR}/${rir}.pid"
}
refresh_live_ta_for_rir() {
  # Download the TA certificate referenced by RIR $1's TAL fixture and
  # atomically replace the cached live TA.
  #   $1 RIR name, $2 run id for log lines (default "manual"),
  #   $3 optional log file; when set, ALL further output of the current shell
  #      is appended there (exec), which is why the callers in this file run
  #      this function as a background job.
  # Returns 1 (leaving the cached TA untouched) when the TAL has no http(s)
  # URI, curl fails, or the download is empty.
  local rir_name="$1"
  local run_id="${2:-manual}"
  local log_path="${3:-}"
  local tal_path ta_uri ta_file tmp_file failure_reason
  local -a curl_opts

  if [[ -n "$log_path" ]]; then
    mkdir -p "$(dirname "$log_path")"
    exec >> "$log_path" 2>&1
  fi
  echo "live-ta-refresh start run=$run_id rir=$rir_name at=$(date -u +%Y-%m-%dT%H:%M:%SZ)"

  tal_path="$(tal_file_for_rir "$rir_name")"
  ta_uri="$(tal_https_uri_from_fixture "$tal_path")"
  if [[ -z "$ta_uri" ]]; then
    echo "live-ta-refresh failed rir=$rir_name reason=missing_https_uri tal=$tal_path"
    return 1
  fi

  ta_file="$(live_ta_file_for_rir "$rir_name")"
  mkdir -p "$(dirname "$ta_file")"
  # Download beside the target so the final mv is a same-directory rename.
  tmp_file="${ta_file}.tmp.$$.$RANDOM"
  curl_opts=(-fsSL --connect-timeout "$LIVE_TA_REFRESH_CONNECT_TIMEOUT_SECS" --max-time "$LIVE_TA_REFRESH_MAX_TIME_SECS")

  failure_reason=""
  if ! curl "${curl_opts[@]}" "$ta_uri" -o "$tmp_file"; then
    failure_reason="curl"
  elif [[ ! -s "$tmp_file" ]]; then
    failure_reason="empty_download"
  fi
  if [[ -n "$failure_reason" ]]; then
    rm -f "$tmp_file"
    echo "live-ta-refresh failed rir=$rir_name reason=$failure_reason uri=$ta_uri"
    return 1
  fi

  mv "$tmp_file" "$ta_file"
  echo "live-ta-refresh success rir=$rir_name uri=$ta_uri output=$ta_file bytes=$(wc -c < "$ta_file" | tr -d ' ')"
}
ensure_live_ta_for_rir() {
  # Guarantee a non-empty live TA exists for RIR $1. An existing one is left
  # alone; otherwise the bundled fixture TA is copied into place. Dies when
  # neither is available.
  local rir_name="$1"
  local live_ta_file fixture_ta_file
  live_ta_file="$(live_ta_file_for_rir "$rir_name")"
  [[ -s "$live_ta_file" ]] && return 0

  fixture_ta_file="$(ta_file_for_rir "$rir_name")"
  if [[ ! -s "$fixture_ta_file" ]]; then
    die "missing live TA and fixture TA for $rir_name: $live_ta_file / $fixture_ta_file"
  fi
  mkdir -p "$(dirname "$live_ta_file")"
  cp "$fixture_ta_file" "$live_ta_file"
}
# Decide whether the background live-TA refresh recorded for RIR $1 is over.
# Returns 0 ("nothing in flight; a new refresh may start") and removes the pid
# file when:
#   - there is no pid file;
#   - the recorded pid is not a number or `kill -0` says it is gone (it is
#     waited on first when numeric, to collect the exit status);
#   - /proc reports the pid as a zombie (waited on, then cleared);
#   - the pid file is older than LIVE_TA_REFRESH_MAX_TIME_SECS + 60 seconds.
#     This stale case only forgets the pid; the process is NOT killed.
# Returns 1 while the recorded process still appears to be running.
# NOTE(review): kill -0 cannot tell our child from an unrelated process that
# reused the pid. `stat -c %Y` is GNU-specific; when it fails the mtime falls
# back to "now", so the pid file never goes stale.
reap_finished_live_ta_refresh_for_rir() {
local rir_name="$1"
local pid_file
local pid
local pid_state
local pid_file_mtime
local now_epoch
local stale_after_secs
pid_file="$(live_ta_refresh_pid_file_for_rir "$rir_name")"
[[ -f "$pid_file" ]] || return 0
pid="$(cat "$pid_file" 2>/dev/null || true)"
if [[ "$pid" =~ ^[0-9]+$ ]] && kill -0 "$pid" >/dev/null 2>&1; then
pid_state=""
# Field 3 of /proc/<pid>/stat is the process state (Linux only). This assumes
# the command name in field 2 contains no spaces.
if [[ -r "/proc/$pid/stat" ]]; then
pid_state="$(awk '{ print $3 }' "/proc/$pid/stat" 2>/dev/null || true)"
fi
# A zombie has finished but has not been waited on yet: reap it now.
if [[ "$pid_state" == "Z" ]]; then
wait "$pid" >/dev/null 2>&1 || true
rm -f "$pid_file"
return 0
fi
# The refresh should have ended once curl's --max-time passed; treat an
# older pid file as stale.
pid_file_mtime="$(stat -c %Y "$pid_file" 2>/dev/null || date +%s)"
now_epoch="$(date +%s)"
stale_after_secs=$((LIVE_TA_REFRESH_MAX_TIME_SECS + 60))
if (( now_epoch - pid_file_mtime > stale_after_secs )); then
rm -f "$pid_file"
return 0
fi
return 1
fi
# The process is gone. `wait` fails harmlessly if it is not our child.
if [[ "$pid" =~ ^[0-9]+$ ]]; then
wait "$pid" >/dev/null 2>&1 || true
fi
rm -f "$pid_file"
return 0
}
start_live_ta_refresh_for_rir() {
  # Kick off a non-blocking live-TA refresh for RIR $1 as part of run $2 and
  # record its pid. If the previous refresh for this RIR still looks active,
  # only append a "skip" line to this run's refresh log.
  local rir_name="$1"
  local run_id="$2"
  local pid_file log_path pid
  mkdir -p "$LIVE_TA_REFRESH_DIR" "$LOG_ROOT"
  pid_file="$(live_ta_refresh_pid_file_for_rir "$rir_name")"
  log_path="$LOG_ROOT/live-ta-refresh-$run_id-$rir_name.log"
  if reap_finished_live_ta_refresh_for_rir "$rir_name"; then
    refresh_live_ta_for_rir "$rir_name" "$run_id" "$log_path" &
    pid=$!
    printf '%s\n' "$pid" > "$pid_file"
  else
    pid="$(cat "$pid_file" 2>/dev/null || true)"
    echo "live-ta-refresh skip run=$run_id rir=$rir_name reason=previous_refresh_running pid=$pid" \
      >> "$log_path"
    return 0
  fi
}
wait_for_previous_live_ta_refresh_for_rir() {
  # Block until any earlier live-TA refresh for RIR $1 has finished, polling
  # once a second. Dies after LIVE_TA_REFRESH_MAX_TIME_SECS + 60 seconds.
  local rir_name="$1"
  local pid_file pid deadline_epoch
  pid_file="$(live_ta_refresh_pid_file_for_rir "$rir_name")"
  # Nothing recorded, or the recorded refresh is already over.
  if [[ ! -f "$pid_file" ]] || reap_finished_live_ta_refresh_for_rir "$rir_name"; then
    return 0
  fi
  pid="$(cat "$pid_file" 2>/dev/null || true)"
  deadline_epoch=$(( $(date +%s) + LIVE_TA_REFRESH_MAX_TIME_SECS + 60 ))
  echo "live-ta-refresh wait rir=$rir_name reason=previous_refresh_running pid=$pid"
  until reap_finished_live_ta_refresh_for_rir "$rir_name"; do
    (( $(date +%s) <= deadline_epoch )) \
      || die "timed out waiting for previous live TA refresh for $rir_name pid=$pid"
    sleep 1
  done
}
refresh_live_ta_blocking_for_run() {
  # Refresh every configured RIR's live TA in parallel and wait for all of
  # them. Any failure is reported per RIR on stderr and then aborts via die.
  local run_id="$1"
  local rir_name pid pid_file log_path index
  local failed=0
  local -a pids=() names=()

  # Never let two refreshes of the same RIR overlap.
  for rir_name in "${RIR_LIST[@]}"; do
    wait_for_previous_live_ta_refresh_for_rir "$rir_name"
  done

  # Fan out: one background refresh per RIR, each with its own pid file.
  for rir_name in "${RIR_LIST[@]}"; do
    mkdir -p "$LIVE_TA_REFRESH_DIR" "$LOG_ROOT"
    pid_file="$(live_ta_refresh_pid_file_for_rir "$rir_name")"
    log_path="$LOG_ROOT/live-ta-refresh-$run_id-$rir_name.log"
    refresh_live_ta_for_rir "$rir_name" "$run_id" "$log_path" &
    pid=$!
    printf '%s\n' "$pid" > "$pid_file"
    pids+=("$pid")
    names+=("$rir_name")
  done

  # Barrier: collect every exit status before deciding.
  for (( index = 0; index < ${#pids[@]}; index++ )); do
    rir_name="${names[index]}"
    pid_file="$(live_ta_refresh_pid_file_for_rir "$rir_name")"
    if ! wait "${pids[index]}"; then
      failed=1
      echo "live-ta-refresh failed before snapshot rir=$rir_name log=$LOG_ROOT/live-ta-refresh-$run_id-$rir_name.log" >&2
    fi
    rm -f "$pid_file"
  done

  if (( failed )); then
    die "live TA refresh failed before snapshot run=$run_id; see $LOG_ROOT/live-ta-refresh-$run_id-*.log"
  fi
  echo "live-ta-refresh completed before snapshot run=$run_id rirs=${#RIR_LIST[@]}"
}
prepare_live_ta_inputs_for_run() {
  # Make live TA files ready for run $1 with sync mode $2 (file-live-ta only).
  # Snapshot rounds with LIVE_TA_REFRESH_BEFORE_SNAPSHOT wait for a fresh
  # download of every TA. Otherwise existing (or fixture) TAs are used
  # immediately and refreshes run in the background for later rounds.
  local run_id="$1"
  local sync_mode="$2"
  local rir_name
  case "$TAL_INPUT_MODE" in
    file-live-ta) ;;
    *) return 0 ;;
  esac
  if [[ "$sync_mode" == "snapshot" ]]; then
    if is_true "$LIVE_TA_REFRESH_BEFORE_SNAPSHOT"; then
      refresh_live_ta_blocking_for_run "$run_id"
      return 0
    fi
  fi
  # Every TA must exist before any background refresh is started.
  for rir_name in "${RIR_LIST[@]}"; do
    ensure_live_ta_for_rir "$rir_name"
  done
  for rir_name in "${RIR_LIST[@]}"; do
    start_live_ta_refresh_for_rir "$rir_name" "$run_id"
  done
}
compare_view_trust_anchor() {
  # Label for --compare-view-trust-anchor: the RIR name when exactly one RIR
  # is configured, otherwise the literal "all5".
  local label="all5"
  if (( ${#RIR_LIST[@]} == 1 )); then
    label="${RIR_LIST[0]}"
  fi
  printf '%s' "$label"
}
max_existing_run_index() {
  # Print the highest N among directories $RUNS_ROOT/run_N (0 if none).
  # Run ids come from printf 'run_%04d', which grows past four digits after
  # run_9999, so any all-digit suffix of at least four digits is accepted.
  # The previous 4-digit-only glob ignored run_10000+, so numbering restarted
  # or collided once those runs existed.
  local max_index=0
  local run_dir run_name numeric_part
  local had_nullglob=0
  shopt -q nullglob && had_nullglob=1
  shopt -s nullglob
  for run_dir in "$RUNS_ROOT"/run_[0-9][0-9][0-9][0-9]*; do
    [[ -d "$run_dir" ]] || continue
    run_name="${run_dir##*/}"
    [[ "$run_name" =~ ^run_[0-9]{4,}$ ]] || continue
    numeric_part="${run_name#run_}"
    # 10# forces decimal so zero-padded ids are not read as octal.
    if (( 10#$numeric_part > max_index )); then
      max_index=$((10#$numeric_part))
    fi
  done
  # Restore the caller's nullglob setting rather than always clearing it.
  (( had_nullglob )) || shopt -u nullglob
  printf '%s' "$max_index"
}
json_status_is_success() {
  # Succeed iff $1 is readable JSON whose top-level object has
  # "status": "success". Unreadable, malformed or non-object JSON fails.
  # (The embedded Python had lost its indentation, an IndentationError that
  # made this check always fail; indentation restored.)
  local json_path="$1"
  python3 - "$json_path" <<'PY'
import json
import sys

path = sys.argv[1]
try:
    with open(path, "r", encoding="utf-8") as handle:
        data = json.load(handle)
except Exception:
    sys.exit(1)
sys.exit(0 if isinstance(data, dict) and data.get("status") == "success" else 1)
PY
}
previous_run_success() {
  # Succeed iff run directory $1 is a completed, successful round: both
  # run-meta.json and run-summary.json exist and report status "success",
  # and every artifact the soak harness expects is present.
  local run_dir="$1"
  local required_artifact   # previously leaked into the global scope
  [[ -d "$run_dir" ]] || return 1
  [[ -f "$run_dir/run-meta.json" ]] || return 1
  [[ -f "$run_dir/run-summary.json" ]] || return 1
  json_status_is_success "$run_dir/run-meta.json" || return 1
  json_status_is_success "$run_dir/run-summary.json" || return 1
  for required_artifact in report.json result.ccr input.cir stage-timing.json process-time.txt stdout.log stderr.log; do
    [[ -f "$run_dir/$required_artifact" ]] || return 1
  done
  return 0
}
move_if_exists() {
  # Move $1 into directory $2 (created on demand). A missing $1 is a no-op.
  local source_path="$1"
  local target_dir="$2"
  [[ -e "$source_path" ]] || return 0
  mkdir -p "$target_dir"
  mv "$source_path" "$target_dir/"
}
db_state_exists() {
  # True when either persistent store (work DB or repo-bytes DB) is present.
  [[ -e "$DB_DIR/work-db" ]] || [[ -e "$DB_DIR/repo-bytes.db" ]]
}
delta_state_available() {
  # A delta round is only possible on top of an existing work DB.
  test -e "$DB_DIR/work-db"
}
isolate_state_after_failure() {
  # Quarantine the DB, meta and tmp directories under
  # $INVALID_ROOT/<$1>-<UTC timestamp>, recreate them empty, and publish the
  # quarantine locations in INVALID_DB_PATH / INVALID_STATE_PATH /
  # INVALID_TMP_PATH (those paths exist only if the source directory did).
  local previous_run_id="$1"
  local invalid_dir state_dir
  invalid_dir="$INVALID_ROOT/${previous_run_id}-$(date -u +%Y%m%dT%H%M%SZ)"
  mkdir -p "$invalid_dir"
  for state_dir in "$DB_DIR" "$META_DIR" "$TMP_DIR"; do
    move_if_exists "$state_dir" "$invalid_dir"
  done
  mkdir -p "$DB_DIR" "$META_DIR" "$TMP_DIR"
  INVALID_DB_PATH="$invalid_dir/$(basename "$DB_DIR")"
  INVALID_STATE_PATH="$invalid_dir/$(basename "$META_DIR")"
  INVALID_TMP_PATH="$invalid_dir/$(basename "$TMP_DIR")"
}
periodic_snapshot_delta_scan() {
  # Walk $RUNS_ROOT/run_N newest-first, counting successful delta rounds
  # until the most recent successful snapshot round. Non-successful rounds
  # are skipped. Prints one tab-separated line:
  #   status  delta_count  snapshot_run  detail
  # status is "ok" when a snapshot was found, otherwise "error" with detail
  # json_parse:<Exception>, missing_sync_mode or missing_success_snapshot.
  # (The embedded Python had lost its indentation; restored. Run names are
  # matched as ASCII digits so int() can never fail on them.)
  python3 - "$RUNS_ROOT" <<'PY'
import json
import pathlib
import re
import sys

RUN_DIR_NAME = re.compile(r"run_[0-9]+")

runs_root = pathlib.Path(sys.argv[1])
delta_count = 0
run_dirs = sorted(
    (
        path
        for path in runs_root.glob("run_*")
        if path.is_dir() and RUN_DIR_NAME.fullmatch(path.name)
    ),
    key=lambda path: int(path.name[4:]),
    reverse=True,
)
for run_dir in run_dirs:
    meta_path = run_dir / "run-meta.json"
    summary_path = run_dir / "run-summary.json"
    try:
        with meta_path.open("r", encoding="utf-8") as handle:
            meta = json.load(handle)
        with summary_path.open("r", encoding="utf-8") as handle:
            summary = json.load(handle)
    except Exception as exc:
        print(f"error\t{delta_count}\t{run_dir.name}\tjson_parse:{exc.__class__.__name__}")
        sys.exit(0)
    if meta.get("status") != "success" or summary.get("status") != "success":
        continue
    sync_mode = meta.get("sync_mode") or meta.get("syncMode")
    if sync_mode == "delta":
        delta_count += 1
        continue
    if sync_mode == "snapshot":
        print(f"ok\t{delta_count}\t{run_dir.name}\t")
        sys.exit(0)
    print(f"error\t{delta_count}\t{run_dir.name}\tmissing_sync_mode")
    sys.exit(0)
print(f"error\t{delta_count}\t\tmissing_success_snapshot")
PY
}
periodic_snapshot_force_needed() {
  # Run periodic_snapshot_delta_scan, publish its fields in PERIODIC_SCAN_*,
  # and succeed when a snapshot should be forced because at least
  # PERIODIC_SNAPSHOT_MAX_DELTAS successful deltas followed the last
  # successful snapshot.
  #
  # Fixes:
  #  * Tab is IFS *whitespace*, so `IFS=$'\t' read` merged the empty
  #    snapshot-run field of "error<TAB>N<TAB><TAB>detail" and shifted the
  #    detail into PERIODIC_SCAN_SNAPSHOT_RUN_ID. Fields are now split on a
  #    non-whitespace separator, which keeps empty fields.
  #  * When the last snapshot has already been pruned by RETAIN_RUNS, the
  #    scan reports missing_success_snapshot with a count that is only a lower
  #    bound. If that bound already reaches the limit, force the snapshot
  #    instead of skipping forever.
  PERIODIC_SCAN_STATUS=""
  PERIODIC_SCAN_DELTA_COUNT=""
  PERIODIC_SCAN_SNAPSHOT_RUN_ID=""
  PERIODIC_SCAN_DETAIL=""
  local scan_output
  scan_output="$(periodic_snapshot_delta_scan)"
  IFS=$'\x1f' read -r PERIODIC_SCAN_STATUS PERIODIC_SCAN_DELTA_COUNT \
    PERIODIC_SCAN_SNAPSHOT_RUN_ID PERIODIC_SCAN_DETAIL <<< "${scan_output//$'\t'/$'\x1f'}"

  if [[ "$PERIODIC_SCAN_STATUS" == "ok" ]]; then
    [[ "$PERIODIC_SCAN_DELTA_COUNT" =~ ^[0-9]+$ ]] || PERIODIC_SCAN_DELTA_COUNT="0"
    if (( 10#$PERIODIC_SCAN_DELTA_COUNT >= 10#$PERIODIC_SNAPSHOT_MAX_DELTAS )); then
      return 0
    fi
    return 1
  fi

  if [[ "$PERIODIC_SCAN_STATUS" == "error" \
        && "$PERIODIC_SCAN_DETAIL" == "missing_success_snapshot" \
        && "$PERIODIC_SCAN_DELTA_COUNT" =~ ^[0-9]+$ ]] \
     && (( 10#$PERIODIC_SCAN_DELTA_COUNT >= 10#$PERIODIC_SNAPSHOT_MAX_DELTAS )); then
    warn "periodic snapshot reset: no successful snapshot among retained runs; at least $PERIODIC_SCAN_DELTA_COUNT deltas since the last snapshot, forcing one"
    return 0
  fi

  warn "periodic snapshot reset scan skipped status=${PERIODIC_SCAN_STATUS:-missing} snapshot_run=${PERIODIC_SCAN_SNAPSHOT_RUN_ID:-none} detail=${PERIODIC_SCAN_DETAIL:-unknown}"
  if [[ "$PERIODIC_SCAN_DETAIL" == "missing_success_snapshot" ]]; then
    warn "periodic snapshot reset needs RETAIN_RUNS > PERIODIC_SNAPSHOT_MAX_DELTAS to see the last snapshot"
  fi
  return 1
}
prepare_periodic_reset_state_db() {
  # Before a forced periodic snapshot for run $1, move $DB_DIR aside into
  # $RESET_STAGING_ROOT/<run>-<UTC timestamp>/ and recreate it empty.
  # Sets RESET_DB_STAGING_PATH and RESET_DB_CLEANUP_STATUS ("pending");
  # both stay empty when there is no DB state to move.
  local run_id="$1"
  local staging_root
  RESET_DB_STAGING_PATH=""
  RESET_DB_CLEANUP_STATUS=""
  if ! db_state_exists; then
    return 0
  fi
  staging_root="$RESET_STAGING_ROOT/${run_id}-$(date -u +%Y%m%dT%H%M%SZ)"
  mkdir -p "$staging_root"
  mv "$DB_DIR" "$staging_root/"
  mkdir -p "$DB_DIR"
  RESET_DB_STAGING_PATH="$staging_root/$(basename "$DB_DIR")"
  RESET_DB_CLEANUP_STATUS="pending"
}
finalize_periodic_reset_state_db() {
  # Settle the staged DB from a periodic reset and print the resulting
  # cleanup status on stdout:
  #   ""               no staging path was given ($2 empty);
  #   "deleted"        round succeeded, staging directory removed;
  #   "cleanup_failed" round succeeded but removal failed (returns 1);
  #   "retained_failure" round failed, staged DB kept for inspection.
  local final_status="$1"
  local reset_db_staging_path="$2"
  local staging_root
  if [[ -z "$reset_db_staging_path" ]]; then
    printf '%s\n' ""
    return 0
  fi
  staging_root="$(dirname "$reset_db_staging_path")"
  case "$final_status" in
    success)
      if rm -rf "$staging_root"; then
        printf '%s\n' "deleted"
        return 0
      fi
      warn "failed to delete periodic reset staging: $staging_root"
      printf '%s\n' "cleanup_failed"
      return 1
      ;;
    *)
      printf '%s\n' "retained_failure"
      ;;
  esac
}
write_run_meta() {
  # Write run-meta.json for one soak round from 22 positional arguments (the
  # order matches the Python unpacking below). Empty strings become JSON null
  # for nullable fields; "true"/"false" strings become booleans; run_index,
  # daemon_exit_code and the periodic counters become integers.
  # Fixes: the embedded Python had lost its indentation (IndentationError as
  # shown); restored. The file is written via a temp file + os.replace so an
  # interrupted write never leaves a truncated run-meta.json for later scans.
  local output_path="$1"
  local status="$2"
  local run_index="$3"
  local run_id="$4"
  local sync_mode="$5"
  local snapshot_reason="$6"
  local previous_run_id="$7"
  local previous_run_success_value="$8"
  local started_at="$9"
  local completed_at="${10}"
  local invalid_db_path="${11}"
  local invalid_state_path="${12}"
  local invalid_tmp_path="${13}"
  local daemon_exit_code="${14}"
  local package_root="${15}"
  local env_file="${16}"
  local periodic_snapshot_reset_enabled="${17}"
  local periodic_snapshot_max_deltas="${18}"
  local periodic_snapshot_delta_count="${19}"
  local periodic_snapshot_forced="${20}"
  local reset_db_staging_path="${21}"
  local reset_db_cleanup_status="${22}"
  python3 - "$output_path" "$status" "$run_index" "$run_id" "$sync_mode" "$snapshot_reason" \
    "$previous_run_id" "$previous_run_success_value" "$started_at" "$completed_at" \
    "$invalid_db_path" "$invalid_state_path" "$invalid_tmp_path" "$daemon_exit_code" \
    "$package_root" "$env_file" "$periodic_snapshot_reset_enabled" \
    "$periodic_snapshot_max_deltas" "$periodic_snapshot_delta_count" \
    "$periodic_snapshot_forced" "$reset_db_staging_path" "$reset_db_cleanup_status" <<'PY'
import json
import os
import sys


def nullable(value):
    return None if value == "" else value


def nullable_bool(value):
    if value == "":
        return None
    return value == "true"


def nullable_int(value):
    if value == "":
        return None
    return int(value)


def bool_value(value):
    return value == "true"


(
    output_path,
    status,
    run_index,
    run_id,
    sync_mode,
    snapshot_reason,
    previous_run_id,
    previous_run_success,
    started_at,
    completed_at,
    invalid_db_path,
    invalid_state_path,
    invalid_tmp_path,
    daemon_exit_code,
    package_root,
    env_file,
    periodic_snapshot_reset_enabled,
    periodic_snapshot_max_deltas,
    periodic_snapshot_delta_count,
    periodic_snapshot_forced,
    reset_db_staging_path,
    reset_db_cleanup_status,
) = sys.argv[1:]
data = {
    "status": status,
    "run_index": int(run_index),
    "run_id": run_id,
    "sync_mode": sync_mode,
    "snapshot_reason": nullable(snapshot_reason),
    "previous_run_id": nullable(previous_run_id),
    "previous_run_success": nullable_bool(previous_run_success),
    "started_at_rfc3339_utc": started_at,
    "completed_at_rfc3339_utc": nullable(completed_at),
    "invalid_db_path": nullable(invalid_db_path),
    "invalid_state_path": nullable(invalid_state_path),
    "invalid_tmp_path": nullable(invalid_tmp_path),
    "daemon_exit_code": nullable_int(daemon_exit_code),
    "package_root": package_root,
    "env_file": env_file,
    "periodic_snapshot_reset_enabled": bool_value(periodic_snapshot_reset_enabled),
    "periodic_snapshot_max_deltas": int(periodic_snapshot_max_deltas),
    "periodic_snapshot_delta_count": nullable_int(periodic_snapshot_delta_count),
    "periodic_snapshot_forced": bool_value(periodic_snapshot_forced),
    "reset_db_staging_path": nullable(reset_db_staging_path),
    "reset_db_cleanup_status": nullable(reset_db_cleanup_status),
}
tmp_path = output_path + ".tmp"
with open(tmp_path, "w", encoding="utf-8") as handle:
    json.dump(data, handle, indent=2, sort_keys=True)
    handle.write("\n")
os.replace(tmp_path, output_path)
PY
}
summary_status() {
  # Print the "status" field of run-summary.json $1, or "missing" when the
  # file is absent, unreadable, not JSON, not an object, or has no status.
  # (The embedded Python had lost its indentation; restored.)
  local summary_path="$1"
  python3 - "$summary_path" <<'PY'
import json
import sys

try:
    with open(sys.argv[1], "r", encoding="utf-8") as handle:
        data = json.load(handle)
    status = data.get("status", "missing") if isinstance(data, dict) else "missing"
except Exception:
    status = "missing"
print(status)
PY
}
prepare_competing_rp_state() {
  # When DISABLE_COMPETING_RPS is true, best-effort stop other relying-party
  # software so it does not compete for resources: disable and stop the
  # rpki-client timer/service, then kill rpki-client and routinator by exact
  # process name. Every failure is ignored.
  is_true "$DISABLE_COMPETING_RPS" || return 0
  local competitor
  systemctl disable --now rpki-client.timer >/dev/null 2>&1 || true
  systemctl stop rpki-client.service >/dev/null 2>&1 || true
  for competitor in rpki-client routinator; do
    pkill -x "$competitor" >/dev/null 2>&1 || true
  done
}
write_machine_snapshot() {
  # Capture host diagnostics into $LOG_ROOT/*-<suffix>.txt: disk usage,
  # memory, RPKI-related processes (sorted by CPU), and the rpki-client
  # timer's active/enabled state. Best effort; all failures are ignored.
  local suffix="$1"
  local log_root="$LOG_ROOT"
  local probe
  df -h > "$log_root/df-${suffix}.txt" 2>&1 || true
  free -h > "$log_root/free-${suffix}.txt" 2>&1 || true
  {
    ps -eo pid,ppid,stat,pcpu,pmem,rss,args --sort=-pcpu \
      | grep -E 'rpki_daemon|/bin/rpki|rpki-client|routinator' \
      | grep -v grep
  } > "$log_root/process-${suffix}.txt" || true
  # is-active -> ...-timer-active-<suffix>.txt, is-enabled -> ...-timer-enabled-<suffix>.txt
  for probe in is-active is-enabled; do
    systemctl "$probe" rpki-client.timer > "$log_root/rpki-client-timer-${probe#is-}-${suffix}.txt" 2>&1 || true
  done
}
build_child_args() {
  # Populate the global array CHILD_ARGS with the argument vector for the
  # child rpki process. Values such as "{run_out}" and "{run_id}" are passed
  # through literally; presumably rpki_daemon substitutes them per inner run.
  #
  # Fix: RPKI_EXTRA_ARGS used to be expanded unquoted, so besides the intended
  # word splitting it also underwent pathname expansion against the current
  # directory (e.g. "--exclude=*.roa" could turn into local file names).
  # It is now split on IFS with `read -a`, which performs no globbing.
  CHILD_ARGS=(
    --db "$DB_DIR/work-db"
    --repo-bytes-db "$DB_DIR/repo-bytes.db"
    --rsync-scope "$RSYNC_SCOPE"
  )
  if is_true "$ALLOW_RSYNC_MIRROR_REUSE"; then
    CHILD_ARGS+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT")
  else
    CHILD_ARGS+=(--rsync-mirror-root "$TMP_DIR/rsync-mirror-{run_id}")
  fi
  CHILD_ARGS+=(
    --parallel-phase2-ready-batch-size 256
    --parallel-phase2-ready-batch-wall-time-budget-ms 100
    --parallel-phase2-result-drain-batch-size 2048
    --parallel-phase2-finalize-batch-size 256
    --parallel-phase2-finalize-batch-wall-time-budget-ms 100
  )
  local rir_name
  for rir_name in "${RIR_LIST[@]}"; do
    case "$TAL_INPUT_MODE" in
      url)
        CHILD_ARGS+=(--tal-url "$(tal_url_for_rir "$rir_name")")
        ;;
      file-live-ta)
        CHILD_ARGS+=(--tal-path "$(tal_file_for_rir "$rir_name")")
        CHILD_ARGS+=(--ta-path "$(live_ta_file_for_rir "$rir_name")")
        ;;
      *)
        CHILD_ARGS+=(--tal-path "$(tal_file_for_rir "$rir_name")")
        CHILD_ARGS+=(--ta-path "$(ta_file_for_rir "$rir_name")")
        ;;
    esac
  done
  CHILD_ARGS+=(
    --report-json "{run_out}/report.json"
  )
  if is_true "$OUTPUT_COMPACT_REPORT"; then
    CHILD_ARGS+=(--report-json-compact)
  fi
  CHILD_ARGS+=(
    --ccr-out "{run_out}/result.ccr"
    --cir-enable
    --cir-out "{run_out}/input.cir"
  )
  for rir_name in "${RIR_LIST[@]}"; do
    CHILD_ARGS+=(--cir-tal-uri "$(cir_tal_uri_for_rir "$rir_name")")
  done
  CHILD_ARGS+=(
    --vrps-csv-out "{run_out}/vrps.csv"
    --vaps-csv-out "{run_out}/vaps.csv"
    --compare-view-trust-anchor "$(compare_view_trust_anchor)"
  )
  if is_true "$ENABLE_CHILD_CERTIFICATE_VALIDATION_CACHE"; then
    CHILD_ARGS+=(--enable-child-certificate-validation-cache)
  fi
  if [[ -n "$RPKI_EXTRA_ARGS" ]]; then
    local -a extra_args=()
    # -d '' reads the whole value (newlines included); read returns 1 at EOF.
    read -r -d '' -a extra_args <<< "$RPKI_EXTRA_ARGS" || true
    if (( ${#extra_args[@]} > 0 )); then
      CHILD_ARGS+=("${extra_args[@]}")
    fi
  fi
}
copy_inner_run_outputs() {
  # Copy the daemon's inner run directory (dotfiles included) and its
  # daemon-status.json / daemon-runs.jsonl into the outer run dir $2, then
  # re-key the metadata to the outer run ($3 index, $4 id).
  # Fix: the old `shopt -s dotglob nullglob ... shopt -u` pair was a no-op
  # ("dir/." is not a glob) that also cleared the caller's shell options.
  local daemon_state_root="$1"
  local run_dir="$2"
  local outer_run_index="$3"
  local outer_run_id="$4"
  local inner_run_dir
  # Normally a single directory (the caller runs the daemon with --max-runs 1);
  # if several exist, take the last in byte order.
  inner_run_dir="$(find "$daemon_state_root/runs" -mindepth 1 -maxdepth 1 -type d 2>/dev/null | LC_ALL=C sort | tail -n 1 || true)"
  if [[ -n "$inner_run_dir" && -d "$inner_run_dir" ]]; then
    cp -a "$inner_run_dir"/. "$run_dir"/
  fi
  if [[ -f "$daemon_state_root/daemon-status.json" ]]; then
    cp "$daemon_state_root/daemon-status.json" "$run_dir/daemon-status.inner.json"
  fi
  if [[ -f "$daemon_state_root/daemon-runs.jsonl" ]]; then
    cp "$daemon_state_root/daemon-runs.jsonl" "$run_dir/daemon-runs.inner.jsonl"
  fi
  normalize_outer_run_metadata "$run_dir" "$outer_run_index" "$outer_run_id" "$inner_run_dir" "$daemon_state_root"
}
normalize_outer_run_metadata() {
  # Rewrite the copied daemon metadata in run dir $1 so that it describes the
  # outer run ($2 index, $3 id) instead of the daemon's inner run ($4 dir):
  #   * run-summary.json: runSeq/runId/runDir become the outer values, the
  #     inner values are kept as innerRunSeq/innerRunId/innerRunDir, and every
  #     string containing the inner dir is rewritten to point at $1;
  #   * daemon-status.json and daemon-runs.jsonl are derived from the
  #     *.inner.* copies (taken from daemon state root $5 if not yet copied).
  # Fixes: the embedded Python had lost its indentation (IndentationError as
  # shown). innerRunDir was also recorded before the path rewrite, so it
  # always ended up equal to the outer run dir.
  local run_dir="$1"
  local outer_run_index="$2"
  local outer_run_id="$3"
  local inner_run_dir="$4"
  local daemon_state_root="$5"
  python3 - "$run_dir" "$outer_run_index" "$outer_run_id" "$inner_run_dir" "$daemon_state_root" <<'PY'
import json
import pathlib
import sys

run_dir = pathlib.Path(sys.argv[1]).resolve()
outer_run_index = int(sys.argv[2])
outer_run_id = sys.argv[3]
inner_run_dir = sys.argv[4]
daemon_state_root = pathlib.Path(sys.argv[5])


def replace_paths(value):
    """Recursively rewrite occurrences of the inner run dir to run_dir."""
    if isinstance(value, dict):
        return {key: replace_paths(item) for key, item in value.items()}
    if isinstance(value, list):
        return [replace_paths(item) for item in value]
    if isinstance(value, str) and inner_run_dir:
        return value.replace(inner_run_dir, str(run_dir))
    return value


def normalize_summary(summary):
    """Re-key one daemon run summary to the outer run."""
    original = dict(summary)
    normalized = replace_paths(original)
    # Taken from the untouched summary so the inner identifiers survive.
    normalized.setdefault("innerRunSeq", original.get("runSeq"))
    normalized.setdefault("innerRunId", original.get("runId"))
    normalized.setdefault("innerRunDir", original.get("runDir"))
    normalized["runSeq"] = outer_run_index
    normalized["runId"] = outer_run_id
    normalized["runDir"] = str(run_dir)
    return normalized


def copy_if_missing(target, source):
    if not target.exists() and source.exists():
        target.write_text(source.read_text(encoding="utf-8"), encoding="utf-8")


summary_path = run_dir / "run-summary.json"
if summary_path.exists():
    summary = json.loads(summary_path.read_text(encoding="utf-8"))
    summary_path.write_text(
        json.dumps(normalize_summary(summary), indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )

inner_status_path = run_dir / "daemon-status.inner.json"
copy_if_missing(inner_status_path, daemon_state_root / "daemon-status.json")
if inner_status_path.exists():
    status = json.loads(inner_status_path.read_text(encoding="utf-8"))
    status.setdefault("innerLastRunId", status.get("lastRunId"))
    status["lastRunId"] = outer_run_id
    status["outerRunId"] = outer_run_id
    status["outerRunIndex"] = outer_run_index
    (run_dir / "daemon-status.json").write_text(
        json.dumps(status, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )

inner_runs_path = run_dir / "daemon-runs.inner.jsonl"
copy_if_missing(inner_runs_path, daemon_state_root / "daemon-runs.jsonl")
if inner_runs_path.exists():
    lines = []
    for line in inner_runs_path.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        lines.append(json.dumps(normalize_summary(json.loads(line)), sort_keys=True))
    (run_dir / "daemon-runs.jsonl").write_text(
        "\n".join(lines) + ("\n" if lines else ""), encoding="utf-8"
    )
PY
}
apply_outer_retention() {
  # Delete the oldest $RUNS_ROOT/run_N directories so that at most
  # RETAIN_RUNS remain. Run $1 (a basename such as run_0012), if given, is
  # never deleted.
  # Fixes: run ids grow past four digits after run_9999, but the old glob only
  # saw 4-digit names (run_10000+ were never pruned). It also sorted
  # lexically, which orders run_10000 before run_9999 and deletes the wrong
  # runs. Directories are now matched by an all-digit suffix and ordered
  # numerically.
  local retain_limit="$RETAIN_RUNS"
  local keep_run="${1:-}"
  local -a dirs=()
  local run_dir candidate
  local had_nullglob=0
  shopt -q nullglob && had_nullglob=1
  shopt -s nullglob
  for run_dir in "$RUNS_ROOT"/run_[0-9][0-9][0-9][0-9]*; do
    if [[ -d "$run_dir" && "${run_dir##*/}" =~ ^run_[0-9]{4,}$ ]]; then
      dirs+=("$run_dir")
    fi
  done
  # Restore the caller's nullglob setting rather than always clearing it.
  (( had_nullglob )) || shopt -u nullglob
  if (( ${#dirs[@]} <= retain_limit )); then
    return 0
  fi
  # Oldest first by numeric run index (10# avoids octal for zero padding).
  mapfile -t dirs < <(
    for run_dir in "${dirs[@]}"; do
      printf '%s\t%s\n' "$((10#${run_dir##*/run_}))" "$run_dir"
    done | sort -n -k1,1 | cut -f2-
  )
  local remove_count=$(( ${#dirs[@]} - retain_limit ))
  local removed_count=0
  for candidate in "${dirs[@]}"; do
    if [[ -n "$keep_run" && "${candidate##*/}" == "$keep_run" ]]; then
      continue
    fi
    rm -rf -- "$candidate"
    removed_count=$((removed_count + 1))
    if (( removed_count >= remove_count )); then
      break
    fi
  done
}
# Execute one soak round as run_NNNN:
#   1. create the run dir and a per-round daemon state root under $TMP_DIR,
#      prune old runs (always keeping this one), and record run-meta.json
#      with status "running";
#   2. prepare live-TA inputs (a no-op unless TAL_INPUT_MODE=file-live-ta)
#      and build CHILD_ARGS;
#   3. run rpki_daemon for a single inner run (--max-runs 1) with stdout and
#      stderr captured in the run dir;
#   4. copy the inner artifacts into the run dir and decide the outcome:
#      "success" needs daemon exit 0 AND run-summary.json status "success".
#      A failed cleanup of periodic-reset staging downgrades it to "failed".
#   5. rewrite run-meta.json, record last-run-id, optionally delete the
#      daemon state root, and prune old runs again.
# Arguments: $1 run index, $2 previous run id, $3 previous-run success
#   ("true"/"false"/""), $4 sync mode, $5 snapshot reason.
# Globals read: INVALID_*_PATH and RUN_META_* (prepared by main per round).
# Globals written: CHILD_ARGS, RUN_META_RESET_DB_CLEANUP_STATUS.
# Returns 0 only when the round's final status is "success".
# NOTE(review): sync_mode is only recorded in metadata and used for live-TA
# preparation here; presumably the daemon infers snapshot vs delta from the
# DB state that main left in $DB_DIR. Confirm.
# NOTE(review): main calls this as an `if` condition, so `set -e` is not in
# effect anywhere inside it; the `set +e`/`set -e` pair only matters if it is
# ever called outside such a context.
run_one_round() {
local run_index="$1"
local run_id
run_id="$(printf 'run_%04d' "$run_index")"
local run_dir="$RUNS_ROOT/$run_id"
local previous_run_id="$2"
local previous_success_value="$3"
local sync_mode="$4"
local snapshot_reason="$5"
local daemon_state_root="$TMP_DIR/daemon-$run_id"
local started_at
local completed_at
local daemon_exit_code
local summary_state
mkdir -p "$run_dir" "$daemon_state_root"
apply_outer_retention "$run_id"
started_at="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
# Provisional metadata, so an interrupted round is visible as "running".
write_run_meta "$run_dir/run-meta.json" "running" "$run_index" "$run_id" "$sync_mode" \
"$snapshot_reason" "$previous_run_id" "$previous_success_value" "$started_at" "" \
"$INVALID_DB_PATH" "$INVALID_STATE_PATH" "$INVALID_TMP_PATH" "" "$PACKAGE_ROOT" "$ENV_FILE" \
"$RUN_META_PERIODIC_ENABLED" "$RUN_META_PERIODIC_MAX_DELTAS" "$RUN_META_PERIODIC_DELTA_COUNT" \
"$RUN_META_PERIODIC_FORCED" "$RUN_META_RESET_DB_STAGING_PATH" "$RUN_META_RESET_DB_CLEANUP_STATUS"
prepare_live_ta_inputs_for_run "$run_id" "$sync_mode"
build_child_args
if is_true "$RPKI_ANALYZE"; then
CHILD_ARGS+=(--analyze --analysis-out "$run_dir/analyze")
fi
local daemon_args=(
--state-root "$daemon_state_root"
--rpki-bin "$RPKI_BIN"
--interval-secs 0
--max-runs 1
--retain-runs "$RETAIN_RUNS"
--work-db "$DB_DIR/work-db"
--repo-bytes-db "$DB_DIR/repo-bytes.db"
)
# db_stats is optional; exact stats are requested only for a non-zero cadence.
if [[ -x "$DB_STATS_BIN" ]]; then
daemon_args+=(--db-stats-bin "$DB_STATS_BIN")
if [[ -n "${DB_STATS_EXACT_EVERY:-}" && "$DB_STATS_EXACT_EVERY" != "0" ]]; then
daemon_args+=(--db-stats-exact-every "$DB_STATS_EXACT_EVERY")
fi
fi
# Capture the daemon's exit status instead of aborting on failure.
set +e
env \
RPKI_PROGRESS_LOG="$RPKI_PROGRESS_LOG" \
RPKI_PROGRESS_SLOW_SECS="$RPKI_PROGRESS_SLOW_SECS" \
RPKI_PROGRESS_STAGE_FRESH_SLOW_MS="$RPKI_PROGRESS_STAGE_FRESH_SLOW_MS" \
RPKI_PROGRESS_PP_CONTROL_SLOW_MS="$RPKI_PROGRESS_PP_CONTROL_SLOW_MS" \
RPKI_PROGRESS_PP_CACHE_SLOW_MS="$RPKI_PROGRESS_PP_CACHE_SLOW_MS" \
RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS="$RPKI_PROGRESS_CONTROL_LOOP_SLOW_MS" \
"$RPKI_DAEMON_BIN" "${daemon_args[@]}" -- "${CHILD_ARGS[@]}" \
> "$run_dir/daemon-stdout.log" 2> "$run_dir/daemon-stderr.log"
daemon_exit_code=$?
set -e
copy_inner_run_outputs "$daemon_state_root" "$run_dir" "$run_index" "$run_id"
completed_at="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
summary_state="$(summary_status "$run_dir/run-summary.json")"
local final_status="failed"
if [[ "$daemon_exit_code" -eq 0 && "$summary_state" == "success" ]]; then
final_status="success"
fi
# Only rounds that staged the old DB for a periodic reset have cleanup to do.
if [[ -n "$RUN_META_RESET_DB_STAGING_PATH" ]]; then
if RUN_META_RESET_DB_CLEANUP_STATUS="$(finalize_periodic_reset_state_db "$final_status" "$RUN_META_RESET_DB_STAGING_PATH")"; then
:
else
final_status="failed"
fi
fi
write_run_meta "$run_dir/run-meta.json" "$final_status" "$run_index" "$run_id" "$sync_mode" \
"$snapshot_reason" "$previous_run_id" "$previous_success_value" "$started_at" "$completed_at" \
"$INVALID_DB_PATH" "$INVALID_STATE_PATH" "$INVALID_TMP_PATH" "$daemon_exit_code" "$PACKAGE_ROOT" "$ENV_FILE" \
"$RUN_META_PERIODIC_ENABLED" "$RUN_META_PERIODIC_MAX_DELTAS" "$RUN_META_PERIODIC_DELTA_COUNT" \
"$RUN_META_PERIODIC_FORCED" "$RUN_META_RESET_DB_STAGING_PATH" "$RUN_META_RESET_DB_CLEANUP_STATUS"
printf '%s\n' "$run_id" > "$META_DIR/last-run-id"
if is_true "$CLEAN_TMP_AFTER_RUN"; then
rm -rf "$daemon_state_root"
fi
apply_outer_retention
# Function status: success only for a successful round.
[[ "$final_status" == "success" ]]
}
# Entry point. Validates configuration and inputs, prepares directories,
# optionally stops competing relying-party software, then runs rounds until
# MAX_RUNS rounds have been attempted (MAX_RUNS < 0: forever) or
# STOP_AFTER_SECS has elapsed. Round numbering continues after the highest
# existing runs/run_NNNN. Per round the sync mode is chosen as follows:
#   - no earlier run index: snapshot ("first_run"); if DB state already
#     exists, it is quarantined first ("no_successful_previous_run");
#   - previous round succeeded and work-db exists: delta, unless the
#     periodic reset limit forces a snapshot ("periodic_snapshot_delta_limit");
#   - previous round succeeded but work-db is missing: snapshot ("missing_db");
#   - previous round failed: quarantine state and snapshot
#     ("previous_run_failed"), or abort when FAILURE_SNAPSHOT_RESET is off.
# Exits 1 if any round failed and 0 otherwise; die() paths exit 2.
main() {
if [[ "${1:-}" == "--help" || "${1:-}" == "-h" ]]; then
usage
exit 0
fi
require_command python3
require_command date
require_command find
if [[ "$TAL_INPUT_MODE" == "file-live-ta" ]]; then
require_command curl
validate_positive_int "LIVE_TA_REFRESH_CONNECT_TIMEOUT_SECS" "$LIVE_TA_REFRESH_CONNECT_TIMEOUT_SECS"
validate_positive_int "LIVE_TA_REFRESH_MAX_TIME_SECS" "$LIVE_TA_REFRESH_MAX_TIME_SECS"
fi
validate_max_runs
validate_non_negative_int "INTERVAL_SECS" "$INTERVAL_SECS"
validate_non_negative_int "STOP_AFTER_SECS" "$STOP_AFTER_SECS"
validate_positive_int "RETAIN_RUNS" "$RETAIN_RUNS"
validate_rsync_scope
validate_tal_input_mode
validate_non_negative_int "PERIODIC_SNAPSHOT_MAX_DELTAS" "$PERIODIC_SNAPSHOT_MAX_DELTAS"
if [[ -n "${DB_STATS_EXACT_EVERY:-}" && "$DB_STATS_EXACT_EVERY" != "0" ]]; then
validate_positive_int "DB_STATS_EXACT_EVERY" "$DB_STATS_EXACT_EVERY"
fi
parse_rirs
[[ -x "$RPKI_BIN" ]] || die "missing executable: $RPKI_BIN"
[[ -x "$RPKI_DAEMON_BIN" ]] || die "missing executable: $RPKI_DAEMON_BIN"
# Check up front that every selected RIR has the inputs its mode needs.
local rir_name
for rir_name in "${RIR_LIST[@]}"; do
if [[ "$TAL_INPUT_MODE" == "url" ]]; then
[[ -n "$(tal_url_for_rir "$rir_name")" ]] || die "missing TAL URL for $rir_name"
elif [[ "$TAL_INPUT_MODE" == "file-live-ta" ]]; then
[[ -f "$(tal_file_for_rir "$rir_name")" ]] || die "missing TAL fixture for $rir_name"
[[ -n "$(tal_https_uri_from_fixture "$(tal_file_for_rir "$rir_name")")" ]] || die "missing http(s) TA URI in TAL fixture for $rir_name"
else
[[ -f "$(tal_file_for_rir "$rir_name")" ]] || die "missing TAL fixture for $rir_name"
[[ -f "$(ta_file_for_rir "$rir_name")" ]] || die "missing TA fixture for $rir_name"
fi
done
mkdir -p "$RUNS_ROOT" "$LOG_ROOT" "$DB_DIR" "$META_DIR" "$TMP_DIR" "$INVALID_ROOT" "$RESET_STAGING_ROOT" "$LIVE_TA_REFRESH_DIR"
if is_true "$ALLOW_RSYNC_MIRROR_REUSE"; then
mkdir -p "$RSYNC_MIRROR_ROOT"
fi
prepare_competing_rp_state
write_machine_snapshot "before"
local max_index
local next_index
local run_forever=0
local stop_index=0
local started_epoch
local elapsed_secs
started_epoch="$(date +%s)"
max_index="$(max_existing_run_index)"
next_index=$((max_index + 1))
# Fixed mode runs MAX_RUNS more rounds on top of the existing ones.
if (( MAX_RUNS < 0 )); then
run_forever=1
echo "run_soak mode=continuous max_existing_run_index=$max_index next_run=$(printf 'run_%04d' "$next_index")"
else
stop_index=$((max_index + MAX_RUNS))
echo "run_soak mode=fixed max_existing_run_index=$max_index next_run=$(printf 'run_%04d' "$next_index") stop_run=$(printf 'run_%04d' "$stop_index")"
fi
local any_failed=0
while (( run_forever == 1 || next_index <= stop_index )); do
# Reset the per-round globals consumed by run_one_round / write_run_meta.
INVALID_DB_PATH=""
INVALID_STATE_PATH=""
INVALID_TMP_PATH=""
PERIODIC_SCAN_STATUS=""
PERIODIC_SCAN_DELTA_COUNT=""
PERIODIC_SCAN_SNAPSHOT_RUN_ID=""
PERIODIC_SCAN_DETAIL=""
RESET_DB_STAGING_PATH=""
RESET_DB_CLEANUP_STATUS=""
if is_true "$PERIODIC_SNAPSHOT_RESET"; then
RUN_META_PERIODIC_ENABLED="true"
else
RUN_META_PERIODIC_ENABLED="false"
fi
RUN_META_PERIODIC_MAX_DELTAS="$PERIODIC_SNAPSHOT_MAX_DELTAS"
RUN_META_PERIODIC_DELTA_COUNT=""
RUN_META_PERIODIC_FORCED="false"
RUN_META_RESET_DB_STAGING_PATH=""
RUN_META_RESET_DB_CLEANUP_STATUS=""
local previous_run_id=""
local previous_success_value=""
local sync_mode="snapshot"
local snapshot_reason=""
# Choose the sync mode for this round (see the header comment).
if (( next_index > 1 )); then
previous_run_id="$(printf 'run_%04d' $((next_index - 1)))"
if previous_run_success "$RUNS_ROOT/$previous_run_id"; then
previous_success_value="true"
if delta_state_available; then
sync_mode="delta"
# NOTE(review): the delta scan only sees runs kept by RETAIN_RUNS; if
# RETAIN_RUNS <= PERIODIC_SNAPSHOT_MAX_DELTAS the limit may never be
# observed. Confirm the settings.
if is_true "$PERIODIC_SNAPSHOT_RESET"; then
if periodic_snapshot_force_needed; then
RUN_META_PERIODIC_DELTA_COUNT="$PERIODIC_SCAN_DELTA_COUNT"
RUN_META_PERIODIC_FORCED="true"
sync_mode="snapshot"
snapshot_reason="periodic_snapshot_delta_limit"
# Move the current DB aside; it is deleted only if this round succeeds.
prepare_periodic_reset_state_db "$(printf 'run_%04d' "$next_index")"
RUN_META_RESET_DB_STAGING_PATH="$RESET_DB_STAGING_PATH"
RUN_META_RESET_DB_CLEANUP_STATUS="$RESET_DB_CLEANUP_STATUS"
echo "periodic snapshot reset forcing snapshot run=$(printf 'run_%04d' "$next_index") delta_count=$PERIODIC_SCAN_DELTA_COUNT max_deltas=$PERIODIC_SNAPSHOT_MAX_DELTAS"
else
if [[ "$PERIODIC_SCAN_STATUS" == "ok" ]]; then
RUN_META_PERIODIC_DELTA_COUNT="$PERIODIC_SCAN_DELTA_COUNT"
fi
fi
fi
else
sync_mode="snapshot"
snapshot_reason="missing_db"
fi
else
previous_success_value="false"
if is_true "$FAILURE_SNAPSHOT_RESET"; then
isolate_state_after_failure "$previous_run_id"
sync_mode="snapshot"
snapshot_reason="previous_run_failed"
else
die "previous run is not successful: $previous_run_id"
fi
fi
else
sync_mode="snapshot"
if db_state_exists; then
isolate_state_after_failure "no_previous_run"
snapshot_reason="no_successful_previous_run"
else
snapshot_reason="first_run"
fi
fi
echo "starting run $(printf 'run_%04d' "$next_index") sync_mode=$sync_mode"
# NOTE: being an `if` condition, run_one_round executes with `set -e` suspended.
if run_one_round "$next_index" "$previous_run_id" "$previous_success_value" "$sync_mode" "$snapshot_reason"; then
echo "completed run $(printf 'run_%04d' "$next_index") status=success"
else
echo "completed run $(printf 'run_%04d' "$next_index") status=failed" >&2
any_failed=1
fi
# The wall-clock limit is checked only between rounds, never mid-round.
if (( STOP_AFTER_SECS > 0 )); then
elapsed_secs=$(( $(date +%s) - started_epoch ))
if (( elapsed_secs >= STOP_AFTER_SECS )); then
echo "run_soak stop_after_secs reached elapsed_secs=$elapsed_secs stop_after_secs=$STOP_AFTER_SECS"
break
fi
fi
# No sleep after the final round of a fixed-count session.
if (( (run_forever == 1 || next_index < stop_index) && INTERVAL_SECS > 0 )); then
sleep "$INTERVAL_SECS"
fi
next_index=$((next_index + 1))
done
write_machine_snapshot "after"
exit "$any_failed"
}
main "$@"