rpki/scripts/soak/run_cache_ablation_experiment.sh

505 lines
19 KiB
Bash
Executable File

#!/usr/bin/env bash
# Cache ablation experiment driver: resolves paths and tunables from the
# environment, falling back to the defaults below.
set -euo pipefail

# Directory containing this script, and the package root two levels above it.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PACKAGE_ROOT="${PACKAGE_ROOT:-$(cd "$SCRIPT_DIR/../.." && pwd)}"

# Locate run_soak.sh: keep the configured/default path when it is executable,
# otherwise take the first executable fallback candidate.
: "${RUN_SOAK_SCRIPT:=$PACKAGE_ROOT/run_soak.sh}"
for soak_candidate in "$SCRIPT_DIR/run_soak.sh" "$SCRIPT_DIR/../../run_soak.sh"; do
  if [[ ! -x "$RUN_SOAK_SCRIPT" && -x "$soak_candidate" ]]; then
    RUN_SOAK_SCRIPT="$soak_candidate"
  fi
done
unset soak_candidate

# Base env file and output locations.
: "${ENV_FILE:=$PACKAGE_ROOT/.env}"
: "${EXPERIMENT_RUN_ROOT:=$PACKAGE_ROOT}"
# Plain assignment (not ':=') so a failing date call still trips 'set -e'.
EXPERIMENT_DIR="${EXPERIMENT_DIR:-$EXPERIMENT_RUN_ROOT/experiments/cache-ablation-$(date -u +%Y%m%dT%H%M%SZ)}"

# Scheduling and case-selection tunables.
: "${CASE_RUNS:=10}"
: "${RUN_START_INTERVAL_SECS:=600}"
: "${FIRST_RUN_DELAY_SECS:=0}"
: "${SNAPSHOT_EXTRA_ARGS:=}"
: "${EXPERIMENT_CASE_SET:=default}"
: "${DRY_RUN:=0}"
: "${RUN_SNAPSHOT:=1}"

# RETAIN_RUNS is read here but never assigned by this script.
BASE_RETAIN_RUNS="${RETAIN_RUNS:-100}"
# Print the command-line synopsis and the supported environment variables
# to stdout.
usage() {
  printf '%s\n' \
    'Usage:' \
    'run_cache_ablation_experiment.sh [--dry-run] [--experiment-dir <path>]' \
    'Runs a fixed-cadence cache ablation experiment:' \
    '1 snapshot warmup, then selected cases x CASE_RUNS delta runs.' \
    'Environment:' \
    'PACKAGE_ROOT portable package root' \
    'RUN_SOAK_SCRIPT path to run_soak.sh' \
    'ENV_FILE base .env for run_soak.sh' \
    'EXPERIMENT_RUN_ROOT shared run root/state root; default PACKAGE_ROOT' \
    'EXPERIMENT_DIR experiment metadata output directory' \
    'CASE_RUNS delta runs per case; default 10' \
    'EXPERIMENT_CASE_SET default or cache-only; default runs the original 4-case matrix' \
    'RUN_START_INTERVAL_SECS fixed start cadence for all runs; default 600' \
    'FIRST_RUN_DELAY_SECS delay before the first scheduled run; default 0' \
    'SNAPSHOT_EXTRA_ARGS extra rpki args for snapshot warmup' \
    'DRY_RUN=1 print plan without executing run_soak.sh' \
    'RUN_SNAPSHOT=0 skip snapshot warmup, useful when continuing a prepared state'
}
# Report a fatal error: write "error: <arguments>" to stderr, then exit the
# current shell with status 2.
die() {
  printf 'error: %s\n' "$*" >&2
  exit 2
}
# Succeed (status 0) when $1 is exactly one of the accepted truthy spellings;
# return 1 for anything else, including a missing or empty argument.
is_true() {
  local truthy
  for truthy in 1 true TRUE yes YES on ON; do
    if [[ "${1:-}" == "$truthy" ]]; then
      return 0
    fi
  done
  return 1
}
# Abort via die unless $2 consists solely of decimal digits (at least one).
#   $1 - setting name used in the error message
#   $2 - value to check
validate_non_negative_int() {
  local name="$1" value="$2"
  case "$value" in
    '' | *[!0-9]*)
      die "$name must be a non-negative integer: $value"
      ;;
  esac
}
# Abort via die unless $2 is a decimal integer strictly greater than zero.
#   $1 - setting name used in the error message
#   $2 - value to check
validate_positive_int() {
  local name="$1"
  local value="$2"
  [[ "$value" =~ ^[0-9]+$ ]] || die "$name must be a positive integer: $value"
  # A digit string is zero exactly when it contains no non-zero digit.
  # Comparing against the literal "0" would let "00" or "000" through.
  [[ "$value" =~ [1-9] ]] || die "$name must be > 0"
}
# Write $1 to stdout in bash's %q-quoted form, without a trailing newline.
shell_quote() {
  local quoted
  printf -v quoted '%q' "$1"
  printf '%s' "$quoted"
}
# Append one "NAME=<%q-quoted value>" line to an env file.
#   $1 - env file to append to
#   $2 - variable name
#   $3 - raw value
append_env_assignment() {
  local env_path="$1" name="$2" value="$3"
  # %q applies the same quoting as shell_quote, inlined here.
  printf '%s=%q\n' "$name" "$value" >> "$env_path"
}
# Print the current time in UTC as YYYY-MM-DDTHH:MM:SSZ.
timestamp_utc() {
  local -r stamp_format='+%Y-%m-%dT%H:%M:%SZ'
  date -u "$stamp_format"
}
# Print the epoch-seconds value in $1 as a UTC YYYY-MM-DDTHH:MM:SSZ timestamp.
# Uses the "@<seconds>" date-string form of `date -d`.
format_epoch_utc() {
  local epoch_secs="$1"
  date -u -d "@${epoch_secs}" '+%Y-%m-%dT%H:%M:%SZ'
}
# Print the number of cases in the selected EXPERIMENT_CASE_SET
# (4 for "default", 3 for "cache-only"); die on any other set name.
case_count() {
  if [[ "$EXPERIMENT_CASE_SET" == "default" ]]; then
    printf '%s' 4
  elif [[ "$EXPERIMENT_CASE_SET" == "cache-only" ]]; then
    printf '%s' 3
  else
    die "EXPERIMENT_CASE_SET must be default or cache-only: $EXPERIMENT_CASE_SET"
  fi
}
# Print the case identifier for 1-based index $1 within EXPERIMENT_CASE_SET;
# die when the set/index combination is not defined.
case_id_for_index() {
  local id=""
  case "$EXPERIMENT_CASE_SET" in
    default)
      case "$1" in
        1) id="case1" ;;
        2) id="case2" ;;
        3) id="case3" ;;
        4) id="case4" ;;
      esac
      ;;
    cache-only)
      case "$1" in
        1) id="pp-only" ;;
        2) id="object-only" ;;
        3) id="pp-object-only" ;;
      esac
      ;;
  esac
  # Every valid id is non-empty, so an empty value means "no match".
  [[ -n "$id" ]] || die "unknown case index: $1 for set $EXPERIMENT_CASE_SET"
  printf '%s' "$id"
}
# Print the descriptive case name for 1-based index $1 within
# EXPERIMENT_CASE_SET; die when the set/index combination is not defined.
case_name_for_index() {
  local -a names=()
  case "$EXPERIMENT_CASE_SET" in
    default)
      names=("all-cache-off" "prefetch-only" "prefetch-pp-cache" "full-cache")
      ;;
    cache-only)
      names=("pp-cache-only" "object-cache-only" "pp-cache-object-cache-only")
      ;;
  esac
  # Accept only a bare single digit 1..N, so "01" or "1 " are still rejected.
  if [[ "$1" =~ ^[1-9]$ ]] && (( $1 <= ${#names[@]} )); then
    printf '%s' "${names[$1 - 1]}"
  else
    die "unknown case index: $1 for set $EXPERIMENT_CASE_SET"
  fi
}
# Print the extra command-line flags (space separated, possibly empty) for
# 1-based case index $1 within EXPERIMENT_CASE_SET; die on an unknown
# set/index combination.
case_extra_args_for_index() {
  local -r prefetch="--enable-transport-request-prefetch"
  local -r pp_cache="--enable-publication-point-validation-cache"
  local -r roa_cache="--enable-roa-validation-cache"
  local flags
  case "$EXPERIMENT_CASE_SET:$1" in
    default:1) flags="" ;;
    default:2) flags="$prefetch" ;;
    default:3) flags="$prefetch $pp_cache" ;;
    default:4) flags="$prefetch $pp_cache $roa_cache" ;;
    cache-only:1) flags="$pp_cache" ;;
    cache-only:2) flags="$roa_cache" ;;
    cache-only:3) flags="$pp_cache $roa_cache" ;;
    *) die "unknown case index: $1 for set $EXPERIMENT_CASE_SET" ;;
  esac
  printf '%s' "$flags"
}
# Print "1" when the case at 1-based index $1 of EXPERIMENT_CASE_SET runs with
# the child certificate validation cache enabled, "0" when it does not; die on
# an unknown set/index combination.
case_child_cert_cache_for_index() {
  local selector="$EXPERIMENT_CASE_SET:$1"
  local enabled
  case "$selector" in
    default:[123] | cache-only:1) enabled="0" ;;
    default:4 | cache-only:[23]) enabled="1" ;;
    *) die "unknown case index: $1 for set $EXPERIMENT_CASE_SET" ;;
  esac
  printf '%s' "$enabled"
}
# Print the case matrix for EXPERIMENT_CASE_SET to stdout as a JSON array.
# Each element carries caseId, caseName, extraArgs and
# enableChildCertificateValidationCache. An unknown set name makes the embedded
# Python exit non-zero with "unknown case set: <name>".
write_cases_json() {
  # The Python body must be indented: without it the if/elif/else blocks are
  # a syntax error and nothing is printed.
  python3 - "$EXPERIMENT_CASE_SET" <<'PY'
import json, sys
case_set = sys.argv[1]
if case_set == "default":
    cases = [
        {"caseId": "case1", "caseName": "all-cache-off", "extraArgs": "", "enableChildCertificateValidationCache": False},
        {"caseId": "case2", "caseName": "prefetch-only", "extraArgs": "--enable-transport-request-prefetch", "enableChildCertificateValidationCache": False},
        {"caseId": "case3", "caseName": "prefetch-pp-cache", "extraArgs": "--enable-transport-request-prefetch --enable-publication-point-validation-cache", "enableChildCertificateValidationCache": False},
        {"caseId": "case4", "caseName": "full-cache", "extraArgs": "--enable-transport-request-prefetch --enable-publication-point-validation-cache --enable-roa-validation-cache", "enableChildCertificateValidationCache": True},
    ]
elif case_set == "cache-only":
    cases = [
        {"caseId": "pp-only", "caseName": "pp-cache-only", "extraArgs": "--enable-publication-point-validation-cache", "enableChildCertificateValidationCache": False},
        {"caseId": "object-only", "caseName": "object-cache-only", "extraArgs": "--enable-roa-validation-cache", "enableChildCertificateValidationCache": True},
        {"caseId": "pp-object-only", "caseName": "pp-cache-object-cache-only", "extraArgs": "--enable-publication-point-validation-cache --enable-roa-validation-cache", "enableChildCertificateValidationCache": True},
    ]
else:
    raise SystemExit(f"unknown case set: {case_set}")
print(json.dumps(cases, ensure_ascii=False, indent=8))
PY
}
# Print the largest numeric suffix among the run_NNNN directories directly
# under $EXPERIMENT_RUN_ROOT/runs. Prints 0 when the runs directory is missing
# or holds no matching directory.
max_existing_run_index() {
  local runs_root="$EXPERIMENT_RUN_ROOT/runs"
  local highest
  if [[ -d "$runs_root" ]]; then
    # A single awk pass tracks the maximum. Names that are not exactly
    # run_<digits> (for example run_0001.bak) count as 0.
    highest="$(find "$runs_root" -maxdepth 1 -type d -name 'run_[0-9][0-9][0-9][0-9]*' -printf '%f\n' \
      | awk '
          {
            n = 0
            if ($0 ~ /^run_[0-9]+$/) { n = substr($0, 5) + 0 }
            if (n > max) { max = n }
          }
          END { print max + 0 }
        ')"
    printf '%s\n' "${highest:-0}"
  else
    printf '%s\n' 0
  fi
}
# Write the experiment configuration as pretty-printed JSON to the file $1.
# Records the resolved paths, scheduling parameters, case set, git revision
# and dirty flag of PACKAGE_ROOT, host name, and the case matrix produced by
# write_cases_json.
# Globals read: PACKAGE_ROOT, RUN_SOAK_SCRIPT, ENV_FILE, EXPERIMENT_RUN_ROOT,
#   EXPERIMENT_DIR, CASE_RUNS, EXPERIMENT_CASE_SET, RUN_START_INTERVAL_SECS,
#   FIRST_RUN_DELAY_SECS, RUN_SNAPSHOT, SNAPSHOT_EXTRA_ARGS.
write_config() {
  local path="$1"
  local git_sha
  local git_dirty=0
  local cases_json
  # Git metadata is best-effort: outside a repository (or without git) the
  # revision is "unknown" and the tree is reported as clean.
  git_sha="$(git -C "$PACKAGE_ROOT" rev-parse --short HEAD 2>/dev/null || printf 'unknown')"
  if [[ -n "$(git -C "$PACKAGE_ROOT" status --short 2>/dev/null || true)" ]]; then
    git_dirty=1
  fi
  # Captured separately so a failure here is not silently ignored.
  cases_json="$(write_cases_json)"
  # Every value travels through argv into a quoted heredoc. Nothing is
  # spliced into the Python source, so quotes, backslashes or newlines in
  # paths and extra args cannot break or alter the program.
  python3 - "$path" "$(timestamp_utc)" "$PACKAGE_ROOT" "$RUN_SOAK_SCRIPT" "$ENV_FILE" \
    "$EXPERIMENT_RUN_ROOT" "$EXPERIMENT_DIR" "$CASE_RUNS" "$EXPERIMENT_CASE_SET" \
    "$RUN_START_INTERVAL_SECS" "$FIRST_RUN_DELAY_SECS" "$RUN_SNAPSHOT" \
    "$SNAPSHOT_EXTRA_ARGS" "$git_sha" "$git_dirty" "$cases_json" <<'PY'
import json, socket, sys
(
    path, created_at, package_root, run_soak_script, env_file,
    experiment_run_root, experiment_dir, case_runs, case_set,
    run_start_interval_secs, first_run_delay_secs, run_snapshot,
    snapshot_extra_args, git_sha, git_dirty, cases_json,
) = sys.argv[1:]
config = {
    "createdAtRfc3339Utc": created_at,
    "packageRoot": package_root,
    "runSoakScript": run_soak_script,
    "envFile": env_file,
    "experimentRunRoot": experiment_run_root,
    "experimentDir": experiment_dir,
    "caseRuns": int(case_runs),
    "caseSet": case_set,
    "runStartIntervalSecs": int(run_start_interval_secs),
    "firstRunDelaySecs": int(first_run_delay_secs),
    "runSnapshot": run_snapshot,
    "snapshotExtraArgs": snapshot_extra_args,
    "gitSha": git_sha,
    "gitDirty": git_dirty == "1",
    "host": socket.gethostname(),
    "cases": json.loads(cases_json),
}
with open(path, "w", encoding="utf-8") as f:
    json.dump(config, f, ensure_ascii=False, indent=2)
    f.write("\n")
PY
}
# Build the per-run env file under $EXPERIMENT_DIR/effective-env and print
# its path. The file starts as a copy of ENV_FILE (empty when ENV_FILE is not
# a regular file), followed by a timestamped comment and the experiment's
# override assignments.
#   $1 event  $2 case id  $3 run index within the case
#   $4 extra rpki args  $5 child certificate cache flag
write_effective_env() {
  local event="$1" case_id="$2" case_run_index="$3"
  local extra_args="$4" child_cert_cache="$5"
  local out_dir="$EXPERIMENT_DIR/effective-env"
  local out_file="$out_dir/${event}-${case_id}-${case_run_index}.env"
  # Override names and values kept in matching order.
  local -a override_names=(
    RUN_ROOT MAX_RUNS INTERVAL_SECS RETAIN_RUNS
    RPKI_EXTRA_ARGS ENABLE_CHILD_CERTIFICATE_VALIDATION_CACHE
  )
  local -a override_values=(
    "$EXPERIMENT_RUN_ROOT" "1" "0" "$BASE_RETAIN_RUNS"
    "$extra_args" "$child_cert_cache"
  )
  local i

  mkdir -p "$out_dir"
  if [[ -f "$ENV_FILE" ]]; then
    cp "$ENV_FILE" "$out_file"
  else
    : > "$out_file"
  fi
  printf '\n# cache ablation experiment overrides generated at %s\n' "$(timestamp_utc)" >> "$out_file"
  for i in "${!override_names[@]}"; do
    append_env_assignment "$out_file" "${override_names[i]}" "${override_values[i]}"
  done
  printf '%s\n' "$out_file"
}
# Print one JSON record (single line, sorted keys) describing a finished run.
# Combines the scheduling data passed by the caller with fields read from
# run-meta.json and run-summary.json in
# $EXPERIMENT_RUN_ROOT/runs/run_<max_index_after, zero-padded to 4 digits>.
# A file that cannot be read or parsed adds "metaError" / "summaryError" to
# the record instead of failing.
# Arguments:
#   $1 event            $2 case id            $3 case name
#   $4 case run index   $5 planned start (epoch s)
#   $6 actual start (epoch s)   $7 completion (epoch s)
#   $8 schedule lag (ms)        $9 extra rpki args
#   $10 child certificate cache flag ("1" = enabled)
#   $11 max run index before the run   $12 max run index after the run
extract_summary() {
  local event="$1"
  local case_id="$2"
  local case_name="$3"
  local case_run_index="$4"
  local planned_epoch="$5"
  local actual_epoch="$6"
  local completed_epoch="$7"
  local schedule_lag_ms="$8"
  local extra_args="$9"
  local child_cert_cache="${10}"
  local max_index_before="${11}"
  local max_index_after="${12}"
  local run_dir
  run_dir="$EXPERIMENT_RUN_ROOT/runs/$(printf 'run_%04d' "$max_index_after")"
  local summary_path="$run_dir/run-summary.json"
  local meta_path="$run_dir/run-meta.json"
  # The embedded Python needs real indentation: def, try/except/else and the
  # for loop are syntax errors when flattened.
  python3 - "$event" "$case_id" "$case_name" "$case_run_index" "$planned_epoch" "$actual_epoch" \
    "$completed_epoch" "$schedule_lag_ms" "$extra_args" "$child_cert_cache" "$max_index_before" \
    "$max_index_after" "$summary_path" "$meta_path" <<'PY'
import json, sys
(
    event, case_id, case_name, case_run_index, planned_epoch, actual_epoch,
    completed_epoch, schedule_lag_ms, extra_args, child_cert_cache,
    max_index_before, max_index_after, summary_path, meta_path,
) = sys.argv[1:]


def ts(epoch):
    # Epoch seconds -> "YYYY-MM-DDTHH:MM:SSZ"; None for values <= 0.
    import datetime
    if int(epoch) <= 0:
        return None
    return datetime.datetime.fromtimestamp(int(epoch), datetime.timezone.utc).isoformat().replace("+00:00", "Z")


record = {
    "event": event,
    "caseId": case_id,
    "caseName": case_name,
    "caseRunIndex": int(case_run_index),
    "plannedStartEpoch": int(planned_epoch),
    "plannedStartRfc3339Utc": ts(planned_epoch),
    "actualStartEpoch": int(actual_epoch),
    "actualStartRfc3339Utc": ts(actual_epoch),
    "completedEpoch": int(completed_epoch),
    "completedRfc3339Utc": ts(completed_epoch),
    "scheduleLagMs": int(schedule_lag_ms),
    "extraArgs": extra_args,
    "enableChildCertificateValidationCache": child_cert_cache == "1",
    "maxRunIndexBefore": int(max_index_before),
    "maxRunIndexAfter": int(max_index_after),
    "summaryPath": summary_path,
    "metaPath": meta_path,
}
# Run metadata: camelCase keys are tried first, then snake_case.
try:
    with open(meta_path, "r", encoding="utf-8") as f:
        meta = json.load(f)
except Exception as exc:
    record["metaError"] = str(exc)
else:
    record["syncMode"] = meta.get("syncMode") or meta.get("sync_mode")
    record["snapshotReason"] = meta.get("snapshotReason") or meta.get("snapshot_reason")
    record["runMetaStatus"] = meta.get("status")
# Run summary: counts, process metrics, stage timings and cache counters.
try:
    with open(summary_path, "r", encoding="utf-8") as f:
        summary = json.load(f)
except Exception as exc:
    record["summaryError"] = str(exc)
else:
    record["status"] = summary.get("status")
    record["runId"] = summary.get("runId")
    record["runSeq"] = summary.get("runSeq")
    counts = summary.get("reportCounts") or {}
    record["wallMs"] = summary.get("wallMs")
    record["vrps"] = counts.get("vrps")
    record["vaps"] = counts.get("aspas")
    record["publicationPoints"] = counts.get("publicationPoints")
    record["warnings"] = counts.get("warnings")
    metrics = summary.get("processMetrics") or {}
    record["maxRssKb"] = metrics.get("maxRssKb")
    record["cpuPercent"] = metrics.get("cpuPercent")
    stage = summary.get("stageTiming") or {}
    # Keep only numeric stage entries whose key contains "_ms".
    record["stageTimingMs"] = {
        k: v
        for k, v in stage.items()
        if isinstance(v, (int, float)) and "_ms" in k
    }
    for key in [
        "download_bytes_total",
        "download_event_count",
        "enable_transport_request_prefetch",
        "enable_publication_point_validation_cache",
        "enable_roa_validation_cache",
        "enable_child_certificate_validation_cache",
        "publication_point_cache_index_load",
        "publication_point_cache_index_refresh",
        "roa_validation_cache",
    ]:
        if key in stage:
            record[key] = stage.get(key)
    analysis_counts = stage.get("analysis_counts") or {}
    interesting = [
        "publication_point_cache_lookup_total",
        "publication_point_cache_reuse_hits",
        "publication_point_cache_miss_total",
        "roa_validation_cache_hit_roas",
        "roa_validation_cache_miss_roas",
        "child_certificate_cache_hit",
        "child_certificate_cache_lookup",
        "child_certificate_cache_miss_not_found",
        "fresh_publication_points",
        "fresh_manifest_files_total",
    ]
    record["cacheCounts"] = {k: analysis_counts.get(k) for k in interesting if k in analysis_counts}
    record["repoSyncStats"] = summary.get("repoSyncStats")
print(json.dumps(record, ensure_ascii=False, sort_keys=True))
PY
}
# Execute (or, in dry-run mode, only describe) one scheduled soak run and
# print its JSON summary record on stdout. Progress goes to stderr.
#   $1 event  $2 case id  $3 case name  $4 run index within the case
#   $5 planned start (epoch s)  $6 extra rpki args
#   $7 child certificate cache flag
run_soak_once() {
  local event="$1" case_id="$2" case_name="$3" case_run_index="$4"
  local planned_epoch="$5" extra_args="$6" child_cert_cache="$7"
  local index_before index_after started_epoch finished_epoch lag_ms env_for_run

  # Snapshot the run counter before anything else, then measure how late we
  # are relative to the plan (never negative, whole seconds scaled to ms).
  index_before="$(max_existing_run_index)"
  started_epoch="$(date +%s)"
  lag_ms=$(( started_epoch > planned_epoch ? (started_epoch - planned_epoch) * 1000 : 0 ))
  printf '%s\n' "[$(timestamp_utc)] start event=$event case=$case_id run=$case_run_index planned=$(format_epoch_utc "$planned_epoch") lag_ms=$lag_ms args='$extra_args'" >&2

  if ! is_true "$DRY_RUN"; then
    # Real run: generate the env file, invoke run_soak.sh with its stdout
    # redirected to stderr, then summarise the newest run directory.
    env_for_run="$(write_effective_env "$event" "$case_id" "$case_run_index" "$extra_args" "$child_cert_cache")"
    env \
      PACKAGE_ROOT="$PACKAGE_ROOT" \
      ENV_FILE="$env_for_run" \
      "$RUN_SOAK_SCRIPT" >&2
    finished_epoch="$(date +%s)"
    index_after="$(max_existing_run_index)"
    extract_summary "$event" "$case_id" "$case_name" "$case_run_index" "$planned_epoch" \
      "$started_epoch" "$finished_epoch" "$lag_ms" "$extra_args" "$child_cert_cache" \
      "$index_before" "$index_after"
    return
  fi

  # Dry run: nothing is executed, so the run counter is unchanged.
  finished_epoch="$(date +%s)"
  index_after="$index_before"
  python3 - "$event" "$case_id" "$case_name" "$case_run_index" "$planned_epoch" "$started_epoch" "$finished_epoch" "$lag_ms" "$extra_args" "$child_cert_cache" "$index_before" "$index_after" <<'PY'
import json, sys
(
    event, case_id, case_name, case_run_index, planned_epoch, actual_epoch,
    completed_epoch, schedule_lag_ms, extra_args, child_cert_cache,
    max_index_before, max_index_after,
) = sys.argv[1:]
record = {
    "event": event,
    "caseId": case_id,
    "caseName": case_name,
    "caseRunIndex": int(case_run_index),
    "plannedStartEpoch": int(planned_epoch),
    "actualStartEpoch": int(actual_epoch),
    "completedEpoch": int(completed_epoch),
    "scheduleLagMs": int(schedule_lag_ms),
    "extraArgs": extra_args,
    "enableChildCertificateValidationCache": child_cert_cache == "1",
    "maxRunIndexBefore": int(max_index_before),
    "maxRunIndexAfter": int(max_index_after),
    "dryRun": True,
}
print(json.dumps(record, ensure_ascii=False, sort_keys=True))
PY
  return 0
}
# Command-line handling: consume arguments left to right. Options given here
# override the environment-derived defaults.
while (( $# > 0 )); do
  cli_arg="$1"
  shift
  case "$cli_arg" in
    --dry-run)
      DRY_RUN=1
      ;;
    --experiment-dir)
      # The value is the next argument; a missing or empty one is fatal.
      EXPERIMENT_DIR="${1:?--experiment-dir requires a value}"
      shift
      ;;
    -h | --help)
      usage
      exit 0
      ;;
    *)
      die "unknown argument: $cli_arg"
      ;;
  esac
done
# --- Preflight: required tools, validated settings, executable runner ---
command -v python3 >/dev/null 2>&1 || die "python3 is required"
command -v date >/dev/null 2>&1 || die "date is required"
validate_positive_int "CASE_RUNS" "$CASE_RUNS"
validate_positive_int "RUN_START_INTERVAL_SECS" "$RUN_START_INTERVAL_SECS"
validate_non_negative_int "FIRST_RUN_DELAY_SECS" "$FIRST_RUN_DELAY_SECS"
validate_positive_int "BASE_RETAIN_RUNS" "$BASE_RETAIN_RUNS"
[[ -x "$RUN_SOAK_SCRIPT" ]] || die "missing executable run_soak.sh: $RUN_SOAK_SCRIPT"

# Force base-10 interpretation. Inside $(( )) a leading zero means octal
# ("010" would be 8, "08" an error), whereas the recorded configuration
# parses the same strings as decimal.
CASE_RUNS=$((10#$CASE_RUNS))
RUN_START_INTERVAL_SECS=$((10#$RUN_START_INTERVAL_SECS))
FIRST_RUN_DELAY_SECS=$((10#$FIRST_RUN_DELAY_SECS))
BASE_RETAIN_RUNS=$((10#$BASE_RETAIN_RUNS))

# Resolve the case count as a plain assignment so an invalid
# EXPERIMENT_CASE_SET aborts here. Nested inside a for-loop word list the
# failure would be swallowed and the loop silently skipped.
total_cases="$(case_count)"

mkdir -p "$EXPERIMENT_DIR" "$EXPERIMENT_RUN_ROOT"
SUMMARY_JSONL="$EXPERIMENT_DIR/experiment-summary.jsonl"
CONFIG_JSON="$EXPERIMENT_DIR/experiment-config.json"
# Start from an empty summary file; each run appends one JSON line.
: > "$SUMMARY_JSONL"
write_config "$CONFIG_JSON"
echo "experiment_dir=$EXPERIMENT_DIR"
echo "experiment_run_root=$EXPERIMENT_RUN_ROOT"
echo "run_soak_script=$RUN_SOAK_SCRIPT"
echo "case_set=$EXPERIMENT_CASE_SET case_runs=$CASE_RUNS run_start_interval_secs=$RUN_START_INTERVAL_SECS dry_run=$DRY_RUN"

# Sleep until the planned start of the next run. Skipped in dry-run mode or
# when the planned time has already passed.
#   $1 - planned start (epoch seconds)
#   $2 - label used in the progress message
wait_for_planned_start() {
  local planned="$1" label="$2"
  local now wait_secs
  now="$(date +%s)"
  if (( now < planned )) && ! is_true "$DRY_RUN"; then
    wait_secs=$(( planned - now ))
    echo "[$(timestamp_utc)] waiting ${wait_secs}s for $label target=$(format_epoch_utc "$planned")"
    sleep "$wait_secs"
  fi
}

# Every run (warmup included) occupies one slot on a fixed cadence:
# slot k starts at first_run_epoch + k * RUN_START_INTERVAL_SECS.
first_run_epoch=$(( $(date +%s) + FIRST_RUN_DELAY_SECS ))
run_index_global=0

if is_true "$RUN_SNAPSHOT"; then
  planned_epoch=$(( first_run_epoch + run_index_global * RUN_START_INTERVAL_SECS ))
  wait_for_planned_start "$planned_epoch" "snapshot"
  run_soak_once "snapshot-warmup" "warmup" "snapshot-warmup" 1 "$planned_epoch" "$SNAPSHOT_EXTRA_ARGS" "0" \
    | tee -a "$SUMMARY_JSONL"
  run_index_global=$((run_index_global + 1))
fi

for (( case_index = 1; case_index <= total_cases; case_index++ )); do
  case_id="$(case_id_for_index "$case_index")"
  case_name="$(case_name_for_index "$case_index")"
  extra_args="$(case_extra_args_for_index "$case_index")"
  child_cert_cache="$(case_child_cert_cache_for_index "$case_index")"
  for (( case_run_index = 1; case_run_index <= CASE_RUNS; case_run_index++ )); do
    planned_epoch=$(( first_run_epoch + run_index_global * RUN_START_INTERVAL_SECS ))
    wait_for_planned_start "$planned_epoch" "$case_id run=$case_run_index"
    run_soak_once "delta" "$case_id" "$case_name" "$case_run_index" "$planned_epoch" "$extra_args" "$child_cert_cache" \
      | tee -a "$SUMMARY_JSONL"
    run_index_global=$((run_index_global + 1))
  done
done
echo "[$(timestamp_utc)] cache ablation experiment complete summary=$SUMMARY_JSONL"