diff --git a/scripts/inter_rp/run_inter_rp_metrics_sidecar.sh b/scripts/inter_rp/run_inter_rp_metrics_sidecar.sh index baab5e3..60e91eb 100755 --- a/scripts/inter_rp/run_inter_rp_metrics_sidecar.sh +++ b/scripts/inter_rp/run_inter_rp_metrics_sidecar.sh @@ -7,19 +7,41 @@ if [[ -f "$CONFIG_FILE" ]]; then source "$CONFIG_FILE" fi +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +INTER_RP_METRICS_EXPORTER="${INTER_RP_METRICS_EXPORTER:-ours-routinator}" RPKI_INTER_RP_METRICS_BIN="${RPKI_INTER_RP_METRICS_BIN:-./bin/rpki_inter_rp_metrics}" +OURS_ROUTINATOR_EXPORTER="${OURS_ROUTINATOR_EXPORTER:-$SCRIPT_DIR/inter_rp_ours_routinator_exporter.py}" OURS_RUN_ROOT="${OURS_RUN_ROOT:?OURS_RUN_ROOT is required}" PEER_ROOT="${PEER_ROOT:-/root/inter-rp-aggregator/synced-from-200}" LISTEN="${INTER_RP_METRICS_LISTEN:-0.0.0.0:9557}" POLL_SECS="${INTER_RP_METRICS_POLL_SECS:-30}" +SCAN_TTL_SECONDS="${INTER_RP_SCAN_TTL_SECONDS:-20}" INSTANCE="${INTER_RP_METRICS_INSTANCE:-remote231-inter-rp}" LOG_DIR="${INTER_RP_METRICS_LOG_DIR:-./logs}" mkdir -p "$LOG_DIR" -exec "$RPKI_INTER_RP_METRICS_BIN" \ - --ours-run-root "$OURS_RUN_ROOT" \ - --peer-root "$PEER_ROOT" \ - --listen "$LISTEN" \ - --poll-secs "$POLL_SECS" \ - --instance "$INSTANCE" \ - >>"$LOG_DIR/inter-rp-metrics.log" 2>&1 +case "$INTER_RP_METRICS_EXPORTER" in + ours-routinator) + exec env \ + OURS_RUN_ROOT="$OURS_RUN_ROOT" \ + PEER_ROOT="$PEER_ROOT" \ + INTER_RP_INSTANCE="$INSTANCE" \ + INTER_RP_LISTEN="$LISTEN" \ + INTER_RP_SCAN_TTL_SECONDS="$SCAN_TTL_SECONDS" \ + "$OURS_ROUTINATOR_EXPORTER" \ + >>"$LOG_DIR/inter-rp-metrics.log" 2>&1 + ;; + rust-generic) + exec "$RPKI_INTER_RP_METRICS_BIN" \ + --ours-run-root "$OURS_RUN_ROOT" \ + --peer-root "$PEER_ROOT" \ + --listen "$LISTEN" \ + --poll-secs "$POLL_SECS" \ + --instance "$INSTANCE" \ + >>"$LOG_DIR/inter-rp-metrics.log" 2>&1 + ;; + *) + echo "unknown INTER_RP_METRICS_EXPORTER: $INTER_RP_METRICS_EXPORTER" >&2 + exit 2 + ;; +esac diff --git a/scripts/soak/build_portable_soak_package.sh b/scripts/soak/build_portable_soak_package.sh index fcbae31..8ab1f86 100755 --- a/scripts/soak/build_portable_soak_package.sh +++ b/scripts/soak/build_portable_soak_package.sh @@ -89,6 +89,8 @@ install -m 0755 "$SCRIPT_DIR/run_soak.sh" "$STAGE_DIR/run_soak.sh" install -m 0755 "$SCRIPT_DIR/run_24h_soak_with_metrics.sh" "$STAGE_DIR/run_24h_soak_with_metrics.sh" install -m 0755 "$SCRIPT_DIR/fixed_phase_loop.sh" "$STAGE_DIR/scripts/soak/fixed_phase_loop.sh" install -m 0755 "$SCRIPT_DIR/hourly_soak_report.py" "$STAGE_DIR/scripts/soak/hourly_soak_report.py" +install -m 0755 "$SCRIPT_DIR/publish_remote231.sh" "$STAGE_DIR/scripts/soak/publish_remote231.sh" +install -m 0755 "$SCRIPT_DIR/publish_remote231_full.sh" "$STAGE_DIR/scripts/soak/publish_remote231_full.sh" install -m 0644 "$SCRIPT_DIR/portable-soak.env.example" "$STAGE_DIR/.env" install -m 0644 "$SCRIPT_DIR/portable-soak.env.example" "$STAGE_DIR/portable-soak.env.example" @@ -115,6 +117,7 @@ cp -a "$REPO_ROOT/tests/fixtures/tal" "$STAGE_DIR/fixtures/" cp -a "$REPO_ROOT/tests/fixtures/ta" "$STAGE_DIR/fixtures/" cp -a "$REPO_ROOT/scripts/periodic" "$STAGE_DIR/scripts/" cp -a "$REPO_ROOT/scripts/cir" "$STAGE_DIR/scripts/" +cp -a "$REPO_ROOT/scripts/inter_rp" "$STAGE_DIR/scripts/" cp -a "$REPO_ROOT/monitor" "$STAGE_DIR/" find "$STAGE_DIR/scripts" -type d -name __pycache__ -prune -exec rm -rf {} + diff --git a/scripts/soak/publish_remote231_full.sh b/scripts/soak/publish_remote231_full.sh new file mode 100755 index 0000000..054389c --- /dev/null +++ b/scripts/soak/publish_remote231_full.sh @@ -0,0 +1,630 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" + +REMOTE_HOST="${REMOTE_HOST:-root@47.251.127.231}" +REMOTE_ROOT="${REMOTE_ROOT:-/root/ours-rp-continuous/portable-soak}" +MODE="${MODE:-dry-run}" +PUBLISH_MODE="${PUBLISH_MODE:-snapshot}" +PROFILE="${PROFILE:-release}" +OUT_DIR="${OUT_DIR:-$REPO_ROOT/target/remote231_publish}" +PACKAGE_PREFIX="${PACKAGE_PREFIX:-remote231-publish}" +PACKAGE_ARCHIVE="${PACKAGE_ARCHIVE:-}" +BUILD_PACKAGE="${BUILD_PACKAGE:-1}" +RESTART_QUERY_SERVICE="${RESTART_QUERY_SERVICE:-1}" +START_ARTIFACT_METRICS="${START_ARTIFACT_METRICS:-1}" +START_QUERY_SERVICE="${START_QUERY_SERVICE:-1}" +START_INTER_RP="${START_INTER_RP:-1}" +START_MONITOR_STACK="${START_MONITOR_STACK:-1}" +START_RPKI_SOAK="${START_RPKI_SOAK:-1}" +RESTART_FIXED_PHASE_LOOP="${RESTART_FIXED_PHASE_LOOP:-1}" +START_ROUTINATOR_SYNC="${START_ROUTINATOR_SYNC:-1}" +VERIFY_INTER_RP_DASHBOARD="${VERIFY_INTER_RP_DASHBOARD:-1}" +WAIT_FIRST_RUN="${WAIT_FIRST_RUN:-1}" +FIRST_RUN_TIMEOUT_SECS="${FIRST_RUN_TIMEOUT_SECS:-7200}" + +usage() { + cat <<'USAGE' +Usage: + scripts/soak/publish_remote231_full.sh [--snapshot|--delta] [--execute|--dry-run] + [--package ] + [--remote-host ] [--remote-root ] + +Builds or reuses a portable soak package, publishes it to remote231, restores the fixed-phase +ours RP soak loop, and starts/verifies sidecars: + - rpki_artifact_metrics (:9556) + - rpki_query_service (:9560, optional) + - ours-rp vs Routinator inter-RP exporter (:9557) + - local Routinator artifact sync helper + - Prometheus/Grafana monitor stack + +Modes: + --snapshot Preserve run history but move state/db aside; first new run is snapshot. + --delta Preserve state/db and continue from current DB; next run should be delta. + +Default is dry-run + snapshot. + +Environment overrides: + REMOTE_HOST=root@47.251.127.231 + REMOTE_ROOT=/root/ours-rp-continuous/portable-soak + PROFILE=release + BUILD_PACKAGE=1 + RESTART_QUERY_SERVICE=1 + START_QUERY_SERVICE=1 + START_INTER_RP=1 + START_MONITOR_STACK=1 + WAIT_FIRST_RUN=1 +USAGE +} + +die() { + echo "error: $*" >&2 + exit 2 +} + +while [[ $# -gt 0 ]]; do + case "$1" in + --snapshot) + PUBLISH_MODE="snapshot" + ;; + --delta) + PUBLISH_MODE="delta" + ;; + --execute) + MODE="execute" + ;; + --dry-run) + MODE="dry-run" + ;; + --package) + shift + PACKAGE_ARCHIVE="${1:?--package requires a value}" + BUILD_PACKAGE=0 + ;; + --remote-host) + shift + REMOTE_HOST="${1:?--remote-host requires a value}" + ;; + --remote-root) + shift + REMOTE_ROOT="${1:?--remote-root requires a value}" + ;; + --profile) + shift + PROFILE="${1:?--profile requires a value}" + ;; + --no-query-service) + START_QUERY_SERVICE=0 + RESTART_QUERY_SERVICE=0 + ;; + --no-monitor-stack) + START_MONITOR_STACK=0 + ;; + --no-inter-rp) + START_INTER_RP=0 + ;; + --no-wait-first-run) + WAIT_FIRST_RUN=0 + ;; + --help|-h) + usage + exit 0 + ;; + *) + die "unknown argument: $1" + ;; + esac + shift +done + +case "$MODE" in + dry-run|execute) ;; + *) die "invalid mode: $MODE" ;; +esac +case "$PUBLISH_MODE" in + snapshot|delta) ;; + *) die "invalid publish mode: $PUBLISH_MODE" ;; +esac + +run_or_echo() { + if [[ "$MODE" == "execute" ]]; then + "$@" + else + printf '[dry-run] ' + printf '%q ' "$@" + printf '\n' + fi +} + +bool_arg() { + case "${1:-0}" in + 1|true|TRUE|yes|YES|on|ON) return 0 ;; + *) return 1 ;; + esac +} + +require_command() { + command -v "$1" >/dev/null 2>&1 || die "missing required command: $1" +} + +require_command ssh +require_command scp +require_command python3 + +if bool_arg "$BUILD_PACKAGE"; then + if [[ "$PROFILE" == "release" ]]; then + TARGET_DIR="$REPO_ROOT/target/release" + else + TARGET_DIR="$REPO_ROOT/target/$PROFILE" + fi + required_bins=(rpki rpki_daemon db_stats rpki_artifact_metrics rpki_query_service rpki_query_indexer) + missing_bins=() + for bin in "${required_bins[@]}"; do + [[ -x "$TARGET_DIR/$bin" ]] || missing_bins+=("$bin") + done + if (( ${#missing_bins[@]} > 0 )); then + die "missing required $PROFILE binaries: ${missing_bins[*]}; build them before publish" + fi + PACKAGE_ARCHIVE="$( + OUT_DIR="$OUT_DIR" PACKAGE_PREFIX="$PACKAGE_PREFIX" \ + "$SCRIPT_DIR/build_portable_soak_package.sh" --profile "$PROFILE" + )" +else + [[ -n "$PACKAGE_ARCHIVE" ]] || die "--package is required when BUILD_PACKAGE=0" +fi +[[ -f "$PACKAGE_ARCHIVE" ]] || die "package not found: $PACKAGE_ARCHIVE" + +echo "remote231 full publish mode=$MODE publish_mode=$PUBLISH_MODE remote=$REMOTE_HOST root=$REMOTE_ROOT package=$PACKAGE_ARCHIVE" +if [[ -n "$(git -C "$REPO_ROOT" status --short 2>/dev/null || true)" ]]; then + echo "warning: local rpki worktree is dirty; package manifest records dirty provenance" >&2 +fi + +if [[ "$PUBLISH_MODE" == "snapshot" ]]; then + ssh "$REMOTE_HOST" "bash -s -- '$REMOTE_ROOT' '$MODE'" <<'REMOTE' +set -euo pipefail +remote_root="$1" +mode="$2" +matching_pids() { + local pattern="$1" + pgrep -af "$pattern" 2>/dev/null | while IFS= read -r line; do + local pid="${line%% *}" + local cmd="${line#* }" + [[ "$pid" =~ ^[0-9]+$ ]] || continue + [[ "$pid" == "$$" || "$pid" == "$BASHPID" || "$pid" == "${PPID:-}" ]] && continue + [[ "$cmd" == *"bash -s --"* && "$cmd" == *"$remote_root"* ]] && continue + printf '%s\n' "$pid" + done | sort -u +} +if [[ "$mode" == "execute" ]]; then + mapfile -t pids < <(matching_pids "fixed_phase_loop.sh --name ours-rp") + if (( ${#pids[@]} > 0 )); then + kill -TERM "${pids[@]}" >/dev/null 2>&1 || true + sleep 2 + fi +else + echo "[dry-run] stop existing fixed_phase_loop.sh --name ours-rp before snapshot publish" +fi +REMOTE + publish_args=( + "$SCRIPT_DIR/publish_remote231.sh" + --package "$PACKAGE_ARCHIVE" + --remote-host "$REMOTE_HOST" + --remote-root "$REMOTE_ROOT" + ) + if bool_arg "$RESTART_QUERY_SERVICE"; then + publish_args+=(--restart-query-service) + fi + if [[ "$MODE" == "execute" ]]; then + publish_args+=(--execute) + else + publish_args+=(--dry-run) + fi + "${publish_args[@]}" + if [[ "$MODE" == "execute" && "$START_QUERY_SERVICE" == "1" && "$RESTART_QUERY_SERVICE" == "1" ]]; then + START_QUERY_SERVICE=0 + fi + WAIT_FIRST_RUN=0 +else + REMOTE_STAGE_PARENT="/root/rpki_publish_packages" + PACKAGE_BASENAME="$(basename "$PACKAGE_ARCHIVE")" + REMOTE_ARCHIVE="$REMOTE_STAGE_PARENT/$PACKAGE_BASENAME" + ssh "$REMOTE_HOST" "mkdir -p '$REMOTE_STAGE_PARENT'" + scp "$PACKAGE_ARCHIVE" "$REMOTE_HOST:$REMOTE_ARCHIVE" + ssh "$REMOTE_HOST" "bash -s -- '$REMOTE_ROOT' '$REMOTE_ARCHIVE' '$MODE'" <<'REMOTE' +set -euo pipefail +remote_root="$1" +remote_archive="$2" +mode="$3" + +log() { printf '[delta-publish] %s\n' "$*"; } +run_or_echo() { + if [[ "$mode" == "execute" ]]; then + "$@" + else + printf '[dry-run] ' + printf '%q ' "$@" + printf '\n' + fi +} +matching_pids() { + local pattern="$1" + pgrep -af "$pattern" 2>/dev/null | while IFS= read -r line; do + local pid="${line%% *}" + local cmd="${line#* }" + [[ "$pid" =~ ^[0-9]+$ ]] || continue + [[ "$pid" == "$$" || "$pid" == "$BASHPID" || "$pid" == "${PPID:-}" ]] && continue + [[ "$cmd" == *"bash -s --"* && "$cmd" == *"$remote_root"* ]] && continue + printf '%s\n' "$pid" + done | sort -u +} +terminate_matching() { + local signal="$1" + local pattern="$2" + local -a pids=() + mapfile -t pids < <(matching_pids "$pattern") + if (( ${#pids[@]} > 0 )); then + kill "$signal" "${pids[@]}" >/dev/null 2>&1 || true + fi +} +json_status_is_success() { + local path="$1" + python3 - "$path" <<'PY' +import json, sys +try: + data = json.load(open(sys.argv[1], encoding="utf-8")) +except Exception: + sys.exit(1) +sys.exit(0 if data.get("status") == "success" else 1) +PY +} +max_successful_run_name() { + local candidate + find "$remote_root/runs" -maxdepth 1 -type d -name 'run_*' -printf '%f\n' 2>/dev/null | sort -V | while read -r candidate; do + [[ -n "$candidate" ]] || continue + if json_status_is_success "$remote_root/runs/$candidate/run-meta.json" \ + && json_status_is_success "$remote_root/runs/$candidate/run-summary.json"; then + printf '%s\n' "$candidate" + fi + done | tail -1 +} + +[[ -d "$remote_root" ]] || { echo "remote root not found: $remote_root" >&2; exit 2; } +[[ -f "$remote_archive" ]] || { echo "archive not found: $remote_archive" >&2; exit 2; } +last_run="$(max_successful_run_name || true)" +[[ -n "$last_run" ]] || { echo "no successful run found under $remote_root/runs" >&2; exit 2; } +timestamp="$(date -u +%Y%m%dT%H%M%SZ)" +backup_root="$remote_root/state/backups/pre_delta_publish_${timestamp}_after_${last_run}" +extract_root="$remote_root/state/publish-staging/$timestamp" +new_pkg="$extract_root/portable-soak" +log "last_successful_run=$last_run backup_root=$backup_root mode=$mode" + +if [[ "$mode" == "execute" ]]; then + terminate_matching -TERM "$remote_root/bin/rpki " + terminate_matching -TERM "$remote_root/bin/rpki_daemon " + terminate_matching -TERM "$remote_root/run_soak.sh" + terminate_matching -TERM "fixed_phase_loop.sh --name ours-rp" + sleep 3 + terminate_matching -KILL "$remote_root/bin/rpki " + terminate_matching -KILL "$remote_root/bin/rpki_daemon " + terminate_matching -KILL "$remote_root/run_soak.sh" + terminate_matching -KILL "fixed_phase_loop.sh --name ours-rp" +else + log "would stop current ours-rp soak/fixed-phase processes" +fi + +run_or_echo mkdir -p "$backup_root" "$extract_root" +if [[ "$mode" == "execute" ]]; then + tar -C "$extract_root" -xzf "$remote_archive" + [[ -x "$new_pkg/bin/rpki" ]] || { echo "extracted package missing bin/rpki" >&2; exit 5; } +fi +if [[ -f "$remote_root/.env" ]]; then + run_or_echo cp -a "$remote_root/.env" "$backup_root/env.before" +fi +if [[ -d "$remote_root/bin" ]]; then + run_or_echo mv "$remote_root/bin" "$backup_root/bin.before" +fi +for path in run_soak.sh run_24h_soak_with_metrics.sh scripts monitor fixtures copied-binaries.txt missing-optional-binaries.txt fixtures.txt scripts.txt manifest.json portable-soak.env.example; do + if [[ "$mode" == "execute" && ! -e "$new_pkg/$path" ]]; then + continue + fi + if [[ "$mode" == "execute" ]]; then + rm -rf "$remote_root/$path" + cp -a "$new_pkg/$path" "$remote_root/$path" + else + printf '[dry-run] replace %s from package\n' "$path" + fi +done +run_or_echo cp -a "$new_pkg/bin" "$remote_root/bin" +if [[ -f "$backup_root/env.before" ]]; then + run_or_echo cp -a "$backup_root/env.before" "$remote_root/.env" +fi +run_or_echo mkdir -p "$remote_root/state/db" "$remote_root/state/meta" "$remote_root/tmp" "$remote_root/logs" +if [[ "$mode" == "execute" ]]; then + chmod +x "$remote_root/run_soak.sh" "$remote_root/run_24h_soak_with_metrics.sh" "$remote_root/bin/"* "$remote_root/scripts/soak/"* "$remote_root/scripts/inter_rp/"* 2>/dev/null || true +fi +REMOTE +fi + +ssh "$REMOTE_HOST" "bash -s -- '$REMOTE_ROOT' '$MODE' '$START_RPKI_SOAK' '$START_ARTIFACT_METRICS' '$START_QUERY_SERVICE' '$START_INTER_RP' '$START_ROUTINATOR_SYNC' '$START_MONITOR_STACK' '$VERIFY_INTER_RP_DASHBOARD' '$WAIT_FIRST_RUN' '$FIRST_RUN_TIMEOUT_SECS' '$RESTART_FIXED_PHASE_LOOP'" <<'REMOTE' +set -euo pipefail +remote_root="$1" +mode="$2" +start_rpki_soak="$3" +start_artifact_metrics="$4" +start_query_service="$5" +start_inter_rp="$6" +start_routinator_sync="$7" +start_monitor_stack="$8" +verify_inter_rp_dashboard="$9" +wait_first_run="${10}" +first_run_timeout_secs="${11}" +restart_fixed_phase_loop="${12}" + +log() { printf '[remote231-full] %s\n' "$*"; } +is_true() { + case "${1:-0}" in 1|true|TRUE|yes|YES|on|ON) return 0 ;; *) return 1 ;; esac +} +run_or_echo() { + if [[ "$mode" == "execute" ]]; then + "$@" + else + printf '[dry-run] ' + printf '%q ' "$@" + printf '\n' + fi +} +matching_pids() { + local pattern="$1" + pgrep -af "$pattern" 2>/dev/null | while IFS= read -r line; do + local pid="${line%% *}" + local cmd="${line#* }" + [[ "$pid" =~ ^[0-9]+$ ]] || continue + [[ "$pid" == "$$" || "$pid" == "$BASHPID" || "$pid" == "${PPID:-}" ]] && continue + [[ "$cmd" == *"bash -s --"* && "$cmd" == *"$remote_root"* ]] && continue + printf '%s\n' "$pid" + done | sort -u +} +stop_matching() { + local pattern="$1" + if [[ "$mode" != "execute" ]]; then + log "would stop processes matching: $pattern" + return 0 + fi + local -a pids=() + mapfile -t pids < <(matching_pids "$pattern") + if (( ${#pids[@]} > 0 )); then + kill -TERM "${pids[@]}" >/dev/null 2>&1 || true + sleep 1 + fi +} +ensure_artifact_metrics() { + is_true "$start_artifact_metrics" || return 0 + if curl -fsS --max-time 3 http://127.0.0.1:9556/healthz >/dev/null 2>&1 \ + || curl -fsS --max-time 3 http://127.0.0.1:9556/metrics >/dev/null 2>&1; then + log "artifact metrics already up" + return 0 + fi + stop_matching "$remote_root/bin/rpki_artifact_metrics" + if [[ "$mode" == "execute" ]]; then + nohup "$remote_root/bin/rpki_artifact_metrics" \ + --run-root "$remote_root" \ + --listen 0.0.0.0:9556 \ + --poll-secs 120 \ + --instance remote231-continuous \ + >"$remote_root/logs/artifact-metrics.full-publish.log" 2>&1 & + else + log "would start artifact metrics on :9556" + fi +} +ensure_query_service() { + is_true "$start_query_service" || return 0 + stop_matching "$remote_root/bin/rpki_query_service" + local latest_seq + latest_seq="$(find "$remote_root/runs" -maxdepth 1 -type d -name 'run_*' -printf '%f\n' 2>/dev/null | sort -V | tail -1 | sed 's/run_//;s/^0*//')" + [[ -n "$latest_seq" ]] || latest_seq=0 + local min_seq=$((latest_seq + 1)) + log "starting query service watch_min_run_seq=$min_seq" + if [[ "$mode" == "execute" ]]; then + nohup "$remote_root/bin/rpki_query_service" \ + --query-db "$remote_root/state/query-db" \ + --repo-bytes-db "$remote_root/state/db/repo-bytes.db" \ + --export-root "$remote_root/state/query-exports" \ + --listen 0.0.0.0:9560 \ + --watch-run-root "$remote_root" \ + --watch-interval-secs 60 \ + --watch-min-run-seq "$min_seq" \ + --retain-indexed-runs 10 \ + --indexer-bin "$remote_root/bin/rpki_query_indexer" \ + --projection-entry-limit 20 \ + >"$remote_root/logs/query-service.full-publish.log" 2>&1 & + else + log "would start query service on :9560" + fi +} +ensure_inter_rp() { + is_true "$start_inter_rp" || return 0 + stop_matching "rpki_inter_rp_metrics" + stop_matching "inter_rp_ours_routinator_exporter.py" + if [[ "$mode" == "execute" ]]; then + nohup env \ + OURS_RUN_ROOT="$remote_root" \ + PEER_ROOT="$remote_root/inter-rp-peers" \ + INTER_RP_INSTANCE=remote231-inter-rp \ + INTER_RP_LISTEN=0.0.0.0:9557 \ + INTER_RP_SCAN_TTL_SECONDS=20 \ + "$remote_root/scripts/inter_rp/inter_rp_ours_routinator_exporter.py" \ + >"$remote_root/logs/inter-rp-metrics.full-publish.log" 2>&1 & + else + log "would start inter-rp ours+routinator exporter on :9557" + fi +} +ensure_routinator_sync() { + is_true "$start_routinator_sync" || return 0 + if pgrep -af "sync_local_routinator_peer.sh" >/dev/null 2>&1; then + log "routinator local sync already running" + return 0 + fi + if [[ -x "$remote_root/ops/sync_local_routinator_peer.sh" ]]; then + if [[ "$mode" == "execute" ]]; then + nohup "$remote_root/ops/sync_local_routinator_peer.sh" >"$remote_root/logs/sync-local-routinator.full-publish.log" 2>&1 & + else + log "would start existing ops/sync_local_routinator_peer.sh" + fi + else + log "warning: missing $remote_root/ops/sync_local_routinator_peer.sh; inter-RP routinator latest may stale" + fi +} +ensure_monitor_stack() { + is_true "$start_monitor_stack" || return 0 + if [[ -f "$remote_root/monitor/docker-compose.yml" ]]; then + if command -v docker >/dev/null 2>&1; then + if [[ "$mode" == "execute" ]]; then + (cd "$remote_root/monitor" && docker compose up -d) + else + log "would run docker compose up -d under $remote_root/monitor" + fi + else + log "warning: docker not installed; cannot start monitor stack" + fi + else + log "warning: missing monitor/docker-compose.yml" + fi +} +ensure_fixed_phase_soak() { + is_true "$start_rpki_soak" || return 0 + if pgrep -af "fixed_phase_loop.sh --name ours-rp" >/dev/null 2>&1; then + log "ours-rp fixed phase loop already running" + return 0 + fi + is_true "$restart_fixed_phase_loop" || { log "fixed phase loop restart disabled"; return 0; } + if [[ "$mode" == "execute" ]]; then + nohup bash "$remote_root/scripts/soak/fixed_phase_loop.sh" \ + --name ours-rp \ + --cycle-secs 900 \ + --offset-secs 0 \ + --lock-file /var/lock/rpki-heavy-run.lock \ + --lock-wait-secs 60 \ + -- env PACKAGE_ROOT="$remote_root" ENV_FILE="$remote_root/.env" "$remote_root/run_soak.sh" \ + >"$remote_root/logs/fixed-phase-ours.full-publish.log" 2>&1 & + else + log "would start ours-rp fixed phase loop" + fi +} +wait_url() { + local name="$1" + local url="$2" + local deadline=$((SECONDS + 60)) + while (( SECONDS < deadline )); do + if curl -fsS --max-time 5 "$url" >/dev/null 2>&1; then + log "$name up" + return 0 + fi + sleep 2 + done + echo "$name did not become ready: $url" >&2 + return 1 +} +verify_prometheus_queries() { + is_true "$verify_inter_rp_dashboard" || return 0 + [[ "$mode" == "execute" ]] || { log "would verify dashboard PromQL queries"; return 0; } + python3 - "$remote_root/monitor/grafana/dashboards/ours-rp-inter-rp.json" <<'PY' +import json, pathlib, urllib.parse, urllib.request, sys +path = pathlib.Path(sys.argv[1]) +if not path.exists(): + print("missing inter-rp dashboard json", file=sys.stderr) + sys.exit(1) +dash = json.loads(path.read_text()) +exprs = [] +def walk(value): + if isinstance(value, dict): + if "expr" in value: + exprs.append(value["expr"]) + for item in value.values(): + walk(item) + elif isinstance(value, list): + for item in value: + walk(item) +walk(dash) +empty = [] +for expr in sorted(set(exprs)): + url = "http://127.0.0.1:9090/api/v1/query?query=" + urllib.parse.quote(expr) + data = json.load(urllib.request.urlopen(url, timeout=10)) + if not data.get("data", {}).get("result", []): + empty.append(expr) +if empty: + print("empty dashboard queries:") + print("\n".join(empty)) + sys.exit(1) +print(f"inter-rp dashboard queries ok count={len(set(exprs))}") +PY +} +wait_next_run_if_requested() { + is_true "$wait_first_run" || return 0 + [[ "$mode" == "execute" ]] || { log "would wait for next run completion"; return 0; } + local before latest deadline status + before="$(find "$remote_root/runs" -maxdepth 1 -type d -name 'run_*' -printf '%f\n' 2>/dev/null | sort -V | tail -1)" + deadline=$((SECONDS + first_run_timeout_secs)) + log "waiting for next completed run after ${before:-none}" + while (( SECONDS < deadline )); do + latest="$(find "$remote_root/runs" -maxdepth 1 -type d -name 'run_*' -printf '%f\n' 2>/dev/null | sort -V | tail -1)" + if [[ -n "$latest" && "$latest" != "$before" && -f "$remote_root/runs/$latest/run-summary.json" ]]; then + status="$(python3 - "$remote_root/runs/$latest/run-summary.json" <<'PY' +import json, sys +try: + print(json.load(open(sys.argv[1], encoding="utf-8")).get("status", "missing")) +except Exception: + print("missing") +PY +)" + if [[ "$status" == "success" ]]; then + python3 - "$remote_root/runs/$latest/run-summary.json" <<'PY' +import json, sys +s = json.load(open(sys.argv[1], encoding="utf-8")) +st = s.get("stageTiming") or {} +rc = s.get("reportCounts") or {} +print( + "next run success " + f"run={s.get('runId')} wall_ms={s.get('wallMs')} " + f"validation_ms={st.get('validation_ms')} repo_sync_ms_total={st.get('repo_sync_ms_total')} " + f"vrps={rc.get('vrps')} aspas={rc.get('aspas')} pp={rc.get('publicationPoints')}" +) +PY + return 0 + fi + fi + sleep 5 + done + echo "timeout waiting for next completed run" >&2 + return 1 +} + +ensure_artifact_metrics +ensure_query_service +ensure_inter_rp +ensure_routinator_sync +ensure_monitor_stack +ensure_fixed_phase_soak + +if [[ "$mode" == "execute" ]]; then + sleep 5 + wait_url "artifact metrics" http://127.0.0.1:9556/metrics + if is_true "$start_query_service"; then wait_url "query service" http://127.0.0.1:9560/api/v1; fi + if is_true "$start_inter_rp"; then wait_url "inter-rp metrics" http://127.0.0.1:9557/metrics; fi + if is_true "$start_monitor_stack"; then wait_url "prometheus" http://127.0.0.1:9090/-/ready; wait_url "grafana" http://127.0.0.1:3000/api/health; fi + if is_true "$start_inter_rp"; then + curl -fsS http://127.0.0.1:9557/metrics | grep -E 'inter_rp_(service_last_reload_success|parse_errors|repo_sync_overlap_total|vrps_diff_by_class)' | head -30 + fi + verify_prometheus_queries +fi + +wait_next_run_if_requested + +log "process summary" +pgrep -af 'fixed_phase_loop|run_soak.sh|rpki_artifact_metrics|rpki_query_service|inter_rp_ours_routinator_exporter|prometheus|grafana|routinator' || true +log "df" +df -h / /root 2>/dev/null | sort -u || true +REMOTE + +echo "remote231 full publish finished mode=$MODE publish_mode=$PUBLISH_MODE" diff --git a/src/storage.rs b/src/storage.rs index 9473b2d..1c93834 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -80,6 +80,13 @@ enum PublicationPointCacheProjectionIndexState { }, } +#[derive(Clone, Copy)] +pub(crate) enum PublicationPointCacheProjectionWriteAction<'a> { + Keep, + Write(&'a PublicationPointCacheProjection), + Delete { manifest_rsync_uri: &'a str }, +} + fn process_vm_rss_kb() -> Option { let status = std::fs::read_to_string("/proc/self/status").ok()?; status.lines().find_map(|line| { @@ -2533,20 +2540,27 @@ fn write_roa_cache_projection_to_batch( fn write_publication_point_cache_projection_to_batch( projection_cf: &ColumnFamily, batch: &mut WriteBatch, - projection: Option<&PublicationPointCacheProjection>, + action: PublicationPointCacheProjectionWriteAction<'_>, timing: Option<&mut VcirReplaceTimingBreakdown>, ) -> StorageResult<()> { - let Some(projection) = projection else { - return Ok(()); - }; - projection.validate_internal()?; - let key = publication_point_cache_projection_key(&projection.manifest_rsync_uri); - let value = encode_cbor(projection, "publication_point_cache_projection")?; - if let Some(timing) = timing { - timing.publication_point_cache_projection_value_bytes = value.len() as u64; + match action { + PublicationPointCacheProjectionWriteAction::Keep => Ok(()), + PublicationPointCacheProjectionWriteAction::Write(projection) => { + projection.validate_internal()?; + let key = publication_point_cache_projection_key(&projection.manifest_rsync_uri); + let value = encode_cbor(projection, "publication_point_cache_projection")?; + if let Some(timing) = timing { + timing.publication_point_cache_projection_value_bytes = value.len() as u64; + } + batch.put_cf(projection_cf, key.as_bytes(), value); + Ok(()) + } + PublicationPointCacheProjectionWriteAction::Delete { manifest_rsync_uri } => { + let key = publication_point_cache_projection_key(manifest_rsync_uri); + batch.delete_cf(projection_cf, key.as_bytes()); + Ok(()) + } } - batch.put_cf(projection_cf, key.as_bytes(), value); - Ok(()) } impl RocksStore { @@ -2975,6 +2989,22 @@ impl RocksStore { vcir: &ValidatedCaInstanceResult, roa_cache_context: Option<&RoaCacheProjectionContext>, publication_point_projection: Option<&PublicationPointCacheProjection>, + ) -> StorageResult<()> { + let publication_point_projection_action = publication_point_projection + .map(PublicationPointCacheProjectionWriteAction::Write) + .unwrap_or(PublicationPointCacheProjectionWriteAction::Keep); + self.put_vcir_with_projection_action( + vcir, + roa_cache_context, + publication_point_projection_action, + ) + } + + fn put_vcir_with_projection_action( + &self, + vcir: &ValidatedCaInstanceResult, + roa_cache_context: Option<&RoaCacheProjectionContext>, + publication_point_projection_action: PublicationPointCacheProjectionWriteAction<'_>, ) -> StorageResult<()> { vcir.validate_internal()?; let vcir_cf = self.cf(CF_VCIR)?; @@ -3000,11 +3030,13 @@ impl RocksStore { write_publication_point_cache_projection_to_batch( pp_projection_cf, &mut batch, - publication_point_projection, + publication_point_projection_action, None, )?; self.write_batch(batch)?; - self.update_publication_point_cache_projection_index(publication_point_projection)?; + self.apply_publication_point_cache_projection_index_action( + publication_point_projection_action, + )?; Ok(()) } @@ -3032,6 +3064,22 @@ impl RocksStore { vcir: &ValidatedCaInstanceResult, roa_cache_context: Option<&RoaCacheProjectionContext>, publication_point_projection: Option<&PublicationPointCacheProjection>, + ) -> StorageResult { + let publication_point_projection_action = publication_point_projection + .map(PublicationPointCacheProjectionWriteAction::Write) + .unwrap_or(PublicationPointCacheProjectionWriteAction::Keep); + self.replace_vcir_manifest_replay_meta_and_projection_action( + vcir, + roa_cache_context, + publication_point_projection_action, + ) + } + + pub(crate) fn replace_vcir_manifest_replay_meta_and_projection_action( + &self, + vcir: &ValidatedCaInstanceResult, + roa_cache_context: Option<&RoaCacheProjectionContext>, + publication_point_projection_action: PublicationPointCacheProjectionWriteAction<'_>, ) -> StorageResult { let mut timing = VcirReplaceTimingBreakdown { rss_before_kb: process_vm_rss_kb(), @@ -3085,7 +3133,7 @@ impl RocksStore { write_publication_point_cache_projection_to_batch( pp_projection_cf, &mut batch, - publication_point_projection, + publication_point_projection_action, Some(&mut timing), )?; timing.publication_point_cache_projection_encode_ms = @@ -3100,7 +3148,9 @@ impl RocksStore { let write_batch_started = std::time::Instant::now(); self.write_batch(batch)?; - self.update_publication_point_cache_projection_index(publication_point_projection)?; + self.apply_publication_point_cache_projection_index_action( + publication_point_projection_action, + )?; timing.write_batch_ms = write_batch_started.elapsed().as_millis() as u64; timing.rss_after_write_batch_kb = process_vm_rss_kb(); Ok(timing) @@ -3357,13 +3407,25 @@ impl RocksStore { Ok(entries) } + fn apply_publication_point_cache_projection_index_action( + &self, + action: PublicationPointCacheProjectionWriteAction<'_>, + ) -> StorageResult<()> { + match action { + PublicationPointCacheProjectionWriteAction::Keep => Ok(()), + PublicationPointCacheProjectionWriteAction::Write(projection) => { + self.update_publication_point_cache_projection_index(projection) + } + PublicationPointCacheProjectionWriteAction::Delete { manifest_rsync_uri } => { + self.delete_publication_point_cache_projection_index_entry(manifest_rsync_uri) + } + } + } + fn update_publication_point_cache_projection_index( &self, - projection: Option<&PublicationPointCacheProjection>, + projection: &PublicationPointCacheProjection, ) -> StorageResult<()> { - let Some(projection) = projection else { - return Ok(()); - }; let mut guard = self .publication_point_cache_projection_index .lock() @@ -3376,17 +3438,22 @@ impl RocksStore { index, bytes: total_bytes, } => { - *total_bytes = total_bytes.saturating_add(bytes.len()); - index.insert( + if let Some(previous) = index.insert( projection.manifest_rsync_uri.clone(), - Arc::<[u8]>::from(bytes), - ); + Arc::<[u8]>::from(bytes.clone()), + ) { + *total_bytes = total_bytes.saturating_sub(previous.len()); + } + *total_bytes = total_bytes.saturating_add(bytes.len()); } PublicationPointCacheProjectionIndexState::BuildingFromEmpty { index, bytes: total_bytes, limit, } => { + if let Some(previous) = index.remove(&projection.manifest_rsync_uri) { + *total_bytes = total_bytes.saturating_sub(previous.len()); + } if total_bytes.saturating_add(bytes.len()) <= *limit { *total_bytes += bytes.len(); index.insert( @@ -3400,11 +3467,13 @@ impl RocksStore { PublicationPointCacheProjectionIndexState::LoadedMmap { dirty, dirty_bytes, .. } => { - *dirty_bytes = dirty_bytes.saturating_add(bytes.len()); - dirty.insert( + if let Some(previous) = dirty.insert( projection.manifest_rsync_uri.clone(), - Arc::<[u8]>::from(bytes), - ); + Arc::<[u8]>::from(bytes.clone()), + ) { + *dirty_bytes = dirty_bytes.saturating_sub(previous.len()); + } + *dirty_bytes = dirty_bytes.saturating_add(bytes.len()); } PublicationPointCacheProjectionIndexState::Uninitialized | PublicationPointCacheProjectionIndexState::Disabled => {} @@ -3412,6 +3481,84 @@ impl RocksStore { Ok(()) } + fn delete_publication_point_cache_projection_index_entry( + &self, + manifest_rsync_uri: &str, + ) -> StorageResult<()> { + let mut invalidate_mmap_files = false; + { + let mut guard = self + .publication_point_cache_projection_index + .lock() + .map_err(|e| { + StorageError::RocksDb(format!( + "publication point cache index lock poisoned: {e}" + )) + })?; + match &mut *guard { + PublicationPointCacheProjectionIndexState::Loaded { + index, + bytes: total_bytes, + } => { + if let Some(previous) = index.remove(manifest_rsync_uri) { + *total_bytes = total_bytes.saturating_sub(previous.len()); + } + } + PublicationPointCacheProjectionIndexState::BuildingFromEmpty { + index, + bytes: total_bytes, + .. + } => { + if let Some(previous) = index.remove(manifest_rsync_uri) { + *total_bytes = total_bytes.saturating_sub(previous.len()); + } + } + PublicationPointCacheProjectionIndexState::LoadedMmap { .. } => { + *guard = PublicationPointCacheProjectionIndexState::Disabled; + invalidate_mmap_files = true; + } + PublicationPointCacheProjectionIndexState::Uninitialized + | PublicationPointCacheProjectionIndexState::Disabled => {} + } + } + + if invalidate_mmap_files { + self.remove_publication_point_cache_mmap_index_files()?; + let mut guard = self + .publication_point_cache_projection_index + .lock() + .map_err(|e| { + StorageError::RocksDb(format!( + "publication point cache index lock poisoned: {e}" + )) + })?; + if matches!(*guard, PublicationPointCacheProjectionIndexState::Disabled) { + *guard = PublicationPointCacheProjectionIndexState::Uninitialized; + } + } + Ok(()) + } + + fn remove_publication_point_cache_mmap_index_files(&self) -> StorageResult<()> { + let dir = &self.publication_point_cache_index_dir; + if !dir.exists() { + return Ok(()); + } + for entry in std::fs::read_dir(dir).map_err(|e| StorageError::RocksDb(e.to_string()))? { + let entry = entry.map_err(|e| StorageError::RocksDb(e.to_string()))?; + let path = entry.path(); + let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else { + continue; + }; + if file_name == "current.idx" + || (file_name.starts_with("segment-") && file_name.ends_with(".idx")) + { + std::fs::remove_file(&path).map_err(|e| StorageError::RocksDb(e.to_string()))?; + } + } + Ok(()) + } + pub fn refresh_publication_point_cache_mmap_index( &self, ) -> StorageResult> { diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index 1f2d57f..dce4d74 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -26,11 +26,11 @@ use crate::replay::delta_archive::ReplayDeltaArchiveIndex; use crate::report::{RfcRef, Warning}; use crate::storage::{ PackFile, PackTime, PublicationPointCacheChild, PublicationPointCacheOutput, - PublicationPointCacheProjection, RawByHashEntry, RoaCacheProjectionContext, RocksStore, - ValidatedCaInstanceResult, VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus, - VcirAuditSummary, VcirCcrManifestProjection, VcirChildEntry, VcirInstanceGate, VcirLocalOutput, - VcirLocalOutputPayload, VcirOutputType, VcirRelatedArtifact, VcirReplaceTimingBreakdown, - VcirSourceObjectType, VcirSummary, + PublicationPointCacheProjection, PublicationPointCacheProjectionWriteAction, RawByHashEntry, + RoaCacheProjectionContext, RocksStore, ValidatedCaInstanceResult, VcirArtifactKind, + VcirArtifactRole, VcirArtifactValidationStatus, VcirAuditSummary, VcirCcrManifestProjection, + VcirChildEntry, VcirInstanceGate, VcirLocalOutput, VcirLocalOutputPayload, VcirOutputType, + VcirRelatedArtifact, VcirReplaceTimingBreakdown, VcirSourceObjectType, VcirSummary, }; use crate::sync::repo::{ sync_publication_point, sync_publication_point_replay, sync_publication_point_replay_delta, @@ -90,6 +90,7 @@ pub(crate) struct PersistVcirTimingBreakdown { pub(crate) embedded_store_ms: u64, pub(crate) build_vcir_ms: u64, pub(crate) replace_vcir_ms: u64, + pub(crate) publication_point_cache_future_notbefore_guarded: bool, pub(crate) build_vcir: BuildVcirTimingBreakdown, pub(crate) replace_vcir: VcirReplaceTimingBreakdown, } @@ -1021,6 +1022,9 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { .related_artifacts_ms .saturating_mul(1_000_000), ); + if persist_vcir_timing.publication_point_cache_future_notbefore_guarded { + timing.record_count("publication_point_cache_future_notbefore_guarded", 1); + } } self.record_publication_point_step_ms( &ca.manifest_rsync_uri, @@ -1585,6 +1589,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { "persist_related_artifacts_ms": persist_vcir_timing.build_vcir.related_artifacts_ms, "persist_vcir_struct_ms": persist_vcir_timing.build_vcir.struct_build_ms, "persist_replace_breakdown": &persist_vcir_timing.replace_vcir, + "publication_point_cache_future_notbefore_guarded": persist_vcir_timing.publication_point_cache_future_notbefore_guarded, "ccr_projection_build_ms": ccr_projection_build_ms, "ccr_append_ms": ccr_append_ms, "audit_build_ms": audit_build_ms, @@ -1630,6 +1635,7 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { "persist_related_artifacts_ms": persist_vcir_timing.build_vcir.related_artifacts_ms, "persist_vcir_struct_ms": persist_vcir_timing.build_vcir.struct_build_ms, "persist_replace_breakdown": &persist_vcir_timing.replace_vcir, + "publication_point_cache_future_notbefore_guarded": persist_vcir_timing.publication_point_cache_future_notbefore_guarded, "ccr_projection_build_ms": ccr_projection_build_ms, "ccr_append_ms": ccr_append_ms, "audit_build_ms": audit_build_ms, @@ -4181,22 +4187,45 @@ fn persist_vcir_for_fresh_result_with_timing( timing.build_vcir = build_vcir_timing; let replace_vcir_started = std::time::Instant::now(); - let publication_point_cache_projection = if write_publication_point_cache_projection { - Some(build_publication_point_cache_projection_from_fresh( - policy, ca, pack, &vcir, - )?) + let future_not_before_cache_guard = write_publication_point_cache_projection + && publication_point_cache_has_future_not_before_risk( + pack, + objects, + child_audits, + validation_time, + policy, + ); + timing.publication_point_cache_future_notbefore_guarded = future_not_before_cache_guard; + let publication_point_cache_projection = + if write_publication_point_cache_projection && !future_not_before_cache_guard { + Some(build_publication_point_cache_projection_from_fresh( + policy, ca, pack, &vcir, + )?) + } else { + None + }; + let publication_point_cache_projection_action = if !write_publication_point_cache_projection { + PublicationPointCacheProjectionWriteAction::Keep + } else if future_not_before_cache_guard { + PublicationPointCacheProjectionWriteAction::Delete { + manifest_rsync_uri: &vcir.manifest_rsync_uri, + } } else { - None + PublicationPointCacheProjectionWriteAction::Write( + publication_point_cache_projection + .as_ref() + .expect("publication point projection must exist when guard is not active"), + ) }; let replace_timing = store - .replace_vcir_manifest_replay_meta_and_projections( + .replace_vcir_manifest_replay_meta_and_projection_action( &vcir, Some(&RoaCacheProjectionContext { parent_context_digest: parent_context_digest_for_ca(ca), policy_fingerprint: publication_point_cache_policy_fingerprint(policy), object_meta: objects.roa_cache_object_meta.clone(), }), - publication_point_cache_projection.as_ref(), + publication_point_cache_projection_action, ) .map_err(|e| format!("store VCIR and manifest replay meta failed: {e}"))?; timing.replace_vcir_ms = replace_vcir_started.elapsed().as_millis() as u64; @@ -4226,6 +4255,108 @@ fn build_publication_point_cache_projection_from_fresh( .map_err(|e| e.to_string()) } +fn publication_point_cache_has_future_not_before_risk( + pack: &PublicationPointSnapshot, + objects: &crate::validation::objects::ObjectsOutput, + child_audits: &[ObjectAuditEntry], + validation_time: time::OffsetDateTime, + policy: &Policy, +) -> bool { + let mut files_by_uri: HashMap<&str, &PackFile> = HashMap::new(); + for file in &pack.files { + files_by_uri.insert(file.rsync_uri.as_str(), file); + } + + objects + .audit + .iter() + .chain(child_audits.iter()) + .any(|entry| { + audit_entry_has_certificate_time_error(entry) + && files_by_uri + .get(entry.rsync_uri.as_str()) + .map(|file| { + audit_entry_has_future_not_before(entry, file, validation_time, policy) + }) + .unwrap_or(true) + }) +} + +fn audit_entry_has_certificate_time_error(entry: &ObjectAuditEntry) -> bool { + entry.result == AuditObjectResult::Error + && entry + .detail + .as_deref() + .is_some_and(|detail| detail.contains("certificate not valid at validation_time")) +} + +fn audit_entry_has_future_not_before( + entry: &ObjectAuditEntry, + file: &PackFile, + validation_time: time::OffsetDateTime, + policy: &Policy, +) -> bool { + match entry.kind { + AuditObjectKind::Roa => signed_object_ee_not_before(file, policy, SignedObjectKind::Roa) + .map(|not_before| validation_time < not_before) + .unwrap_or(true), + AuditObjectKind::Aspa => signed_object_ee_not_before(file, policy, SignedObjectKind::Aspa) + .map(|not_before| validation_time < not_before) + .unwrap_or(true), + AuditObjectKind::Certificate | AuditObjectKind::RouterCertificate => file + .bytes() + .ok() + .and_then(|bytes| ResourceCertificate::decode_der(bytes).ok()) + .map(|cert| validation_time < cert.tbs.validity_not_before) + .unwrap_or(true), + _ => true, + } +} + +#[derive(Clone, Copy)] +enum SignedObjectKind { + Roa, + Aspa, +} + +fn signed_object_ee_not_before( + file: &PackFile, + policy: &Policy, + kind: SignedObjectKind, +) -> Option { + let bytes = file.bytes().ok()?; + match kind { + SignedObjectKind::Roa => { + let object = RoaObject::decode_der_with_strict_options( + bytes, + policy.strict.cms_der, + policy.strict.name, + ) + .ok()?; + Some( + object.signed_object.signed_data.certificates[0] + .resource_cert + .tbs + .validity_not_before, + ) + } + SignedObjectKind::Aspa => { + let object = AspaObject::decode_der_with_strict_options( + bytes, + policy.strict.cms_der, + policy.strict.name, + ) + .ok()?; + Some( + object.signed_object.signed_data.certificates[0] + .resource_cert + .tbs + .validity_not_before, + ) + } + } +} + fn build_vcir_from_fresh_result_with_timing( ca: &CaInstanceHandle, pack: &PublicationPointSnapshot, diff --git a/src/validation/tree_runner/tests.rs b/src/validation/tree_runner/tests.rs index 9e36b04..22037de 100644 --- a/src/validation/tree_runner/tests.rs +++ b/src/validation/tree_runner/tests.rs @@ -4,7 +4,8 @@ use crate::data_model::roa::RoaAfi; use crate::fetch::rsync::LocalDirRsyncFetcher; use crate::fetch::rsync::{RsyncFetchError, RsyncFetcher}; use crate::storage::{ - PackFile, PackTime, PublicationPointCacheProjection, RawByHashEntry, RepositoryViewEntry, + PackFile, PackTime, PublicationPointCacheProjection, + PublicationPointCacheProjectionWriteAction, RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore, ValidatedCaInstanceResult, ValidatedManifestMeta, VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus, VcirAuditSummary, VcirChildEntry, VcirInstanceGate, VcirLocalOutput, VcirLocalOutputPayload, VcirOutputType, @@ -1914,6 +1915,89 @@ fn seed_publication_point_cache_projection( vcir } +#[test] +fn publication_point_cache_future_notbefore_guard_detects_future_roa_notbefore_only() { + let uri = "rsync://example.test/repo/issuer/future.roa"; + let bytes = std::fs::read( + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/fixtures/repository/rpki.cernet.net/repo/cernet/0/AS4538.roa"), + ) + .expect("read ROA fixture"); + let roa = RoaObject::decode_der(&bytes).expect("decode ROA fixture"); + let ee = &roa.signed_object.signed_data.certificates[0].resource_cert; + let file = PackFile::from_bytes_compute_sha256(uri, bytes); + let pack = dummy_pack_with_files(vec![file]); + let objects = crate::validation::objects::ObjectsOutput { + vrps: Vec::new(), + aspas: Vec::new(), + router_keys: Vec::new(), + local_outputs_cache: Vec::new(), + warnings: Vec::new(), + stats: Default::default(), + audit: vec![ObjectAuditEntry { + rsync_uri: uri.to_string(), + sha256_hex: sha256_hex_from_32(&pack.files[0].sha256), + kind: AuditObjectKind::Roa, + result: AuditObjectResult::Error, + detail: Some( + "EE certificate path validation failed: certificate not valid at validation_time" + .to_string(), + ), + }], + roa_cache_stats: Default::default(), + roa_cache_object_meta: Vec::new(), + }; + + assert!(publication_point_cache_has_future_not_before_risk( + &pack, + &objects, + &[], + ee.tbs.validity_not_before - time::Duration::seconds(1), + &Policy::default(), + )); + assert!(!publication_point_cache_has_future_not_before_risk( + &pack, + &objects, + &[], + ee.tbs.validity_not_after + time::Duration::seconds(1), + &Policy::default(), + )); +} + +#[test] +fn publication_point_cache_delete_action_removes_existing_projection() { + let store_dir = tempfile::tempdir().expect("store dir"); + let store = RocksStore::open(store_dir.path()).expect("open rocksdb"); + let policy = Policy::default(); + let validation_time = time::OffsetDateTime::UNIX_EPOCH + time::Duration::minutes(1); + let ca = publication_point_cache_fixture_ca(); + let vcir = seed_publication_point_cache_projection(&store, &policy, &ca, validation_time); + + assert!( + store + .get_publication_point_cache_projection_cached(&ca.manifest_rsync_uri) + .expect("load projection") + .is_some() + ); + + store + .replace_vcir_manifest_replay_meta_and_projection_action( + &vcir, + None, + PublicationPointCacheProjectionWriteAction::Delete { + manifest_rsync_uri: &vcir.manifest_rsync_uri, + }, + ) + .expect("delete projection"); + + assert!( + store + .get_publication_point_cache_projection_cached(&ca.manifest_rsync_uri) + .expect("load projection after delete") + .is_none() + ); +} + fn publication_point_cache_fixture_ca() -> CaInstanceHandle { CaInstanceHandle { depth: 1,