argus-cluster/src/mvp/scripts/run_all_api.sh

158 lines
4.8 KiB
Bash
Executable File

#!/usr/bin/env bash
#
# End-to-end driver that goes through the API service (argus.service).
# In order, it restarts the Ray cluster, prepares model/data, starts the API,
# submits PPO/GRPO/SFT via the HTTP API and then monitors the queue.
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"

# Each setting keeps the value from the caller's environment when it is set
# and non-empty; otherwise the default shown here is used.
: "${API_ADDR:=http://127.0.0.1:8080}"
: "${DB_IN_CONTAINER:=/private/common/db/mvp.sqlite3}"
: "${EXPECTED_RAY_NODES:=3}" # head + 2 workers
TOKEN="${MVP_INTERNAL_TOKEN:-}"

# The bearer token has no default: refuse to run without one.
[[ -n "${TOKEN}" ]] || {
  echo "ERROR: MVP_INTERNAL_TOKEN must be set in the host env for run_all_api.sh" >&2
  exit 1
}
# Run curl against the API with the bearer token attached.
# Globals:   TOKEN (read)
# Arguments: any further curl options / URL, passed through unchanged
# Returns:   curl's exit status
api_curl() {
  local -a base_opts=(-sS -H "Authorization: Bearer ${TOKEN}")
  curl "${base_opts[@]}" "$@"
}
# Poll the API until curl can fetch ${API_ADDR}/docs, or give up.
# Globals:   API_ADDR (read)
# Arguments: $1 - maximum number of probes (default 60)
# Outputs:   progress lines on stdout; an ERROR line on stderr on timeout
# Returns:   0 as soon as one probe succeeds, 1 if every probe failed
api_wait_ready() {
  local tries="${1:-60}"
  # Keep the counter local so it cannot leak into, or clobber, a caller's "i".
  local i
  for ((i = 1; i <= tries; i++)); do
    # -m 2 caps each probe at two seconds. Only curl's exit status matters;
    # curl runs without --fail, so any HTTP response counts as "ready".
    if curl -sS -m 2 "${API_ADDR}/docs" >/dev/null 2>&1; then
      echo "[host] api_ready: ${API_ADDR}"
      return 0
    fi
    echo "[host] waiting api... (${i}/${tries})"
    sleep 2
  done
  echo "ERROR: api not ready: ${API_ADDR}" >&2
  return 1
}
# Poll the Ray dashboard until curl can fetch its /api/version endpoint,
# or give up.
# Globals:   RAY_DASHBOARD_ADDR (read; not set in this file, presumably
#            provided by the sourced lib.sh - confirm)
# Arguments: $1 - maximum number of probes (default 60)
# Outputs:   progress lines on stdout; an ERROR line on stderr on timeout
# Returns:   0 as soon as one probe succeeds, 1 if every probe failed
ray_wait_ready() {
  local tries="${1:-60}"
  # Keep the counter local so it cannot leak into, or clobber, a caller's "i".
  local i
  for ((i = 1; i <= tries; i++)); do
    # -m 2 caps each probe at two seconds; only curl's exit status is used.
    if curl -sS -m 2 "${RAY_DASHBOARD_ADDR}/api/version" >/dev/null 2>&1; then
      echo "[host] ray_dashboard_ready: ${RAY_DASHBOARD_ADDR}"
      return 0
    fi
    echo "[host] waiting ray dashboard... (${i}/${tries})"
    sleep 2
  done
  echo "ERROR: ray dashboard not ready: ${RAY_DASHBOARD_ADDR}" >&2
  return 1
}
# Wait until at least the wanted number of Ray nodes report as alive.
# The count is obtained by running a small Python snippet inside the head
# container that connects to the running cluster and counts alive nodes.
# Globals:   HEAD_CONTAINER (read; not set in this file, presumably provided
#            by the sourced lib.sh - confirm)
# Arguments: $1 - minimum number of alive nodes (default 3)
#            $2 - maximum number of probes (default 60)
# Outputs:   progress lines on stdout; on timeout an ERROR line plus a
#            best-effort "ray status" dump on stderr
# Returns:   0 once enough nodes are alive, 1 on timeout
ray_wait_nodes() {
  local want="${1:-3}"
  local tries="${2:-60}"
  # Declare all working variables once, including the loop counter, so none
  # of them leaks into the caller's scope.
  local i out n
  for ((i = 1; i <= tries; i++)); do
    # A failing probe is expected while the cluster is still starting, so its
    # stderr is dropped and its exit status is ignored.
    out="$(docker exec -i "${HEAD_CONTAINER}" python3 -c "import ray; ray.init(address='auto', ignore_reinit_error=True, log_to_driver=False, logging_level='ERROR'); print(sum(1 for n in ray.nodes() if n.get('Alive')))" 2>/dev/null || true)"
    # Only the last output line is considered, reduced to its digits.
    n="$(printf '%s\n' "${out}" | tail -n 1 | tr -cd '0-9' || true)"
    if [[ "${n}" =~ ^[0-9]+$ ]]; then
      echo "[host] ray_nodes_alive=${n} (want>=${want})"
      if [[ "${n}" -ge "${want}" ]]; then
        return 0
      fi
    else
      echo "[host] waiting ray nodes... (${i}/${tries})"
    fi
    sleep 2
  done
  echo "ERROR: ray nodes not ready (want>=${want})" >&2
  # Diagnostic only: never let the status dump change the result.
  docker exec -i "${HEAD_CONTAINER}" bash -lc "ray status || true" >&2 || true
  return 1
}
# Submit one taskspec YAML file to the API and print the new task id.
# Globals:   API_ADDR (read)
# Arguments: $1 - path to the taskspec YAML file
# Outputs:   the task id alone on stdout (callers capture it with $(...));
#            all progress and error messages go to stderr
# Returns:   0 on success; non-zero if the file is unreadable, the HTTP
#            request fails, or the response has no "task_id"
submit_taskspec() {
  local taskspec_path="$1"
  echo "[host] submit via API: ${taskspec_path}" >&2
  # Check the file up front so a missing or unreadable taskspec is reported
  # clearly instead of being left to curl.
  if [[ ! -r "${taskspec_path}" ]]; then
    echo "ERROR: taskspec not readable: ${taskspec_path}" >&2
    return 1
  fi
  local resp
  # This function runs inside $(...), where bash clears errexit by default,
  # so a failed request must be checked explicitly rather than left to set -e.
  if ! resp="$(api_curl -H "Content-Type: application/yaml" --data-binary "@${taskspec_path}" "${API_ADDR}/api/v2/tasks")"; then
    echo "ERROR: submit request failed: ${taskspec_path}" >&2
    return 1
  fi
  echo "[host] submit_resp: ${resp}" >&2
  # The pipeline's status is the function's status: a response without
  # "task_id" makes python3 exit non-zero.
  printf '%s' "${resp}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["task_id"])'
}
# Print the API's current queue under a "[host] queue:" heading.
# Globals:   API_ADDR (read)
# Outputs:   heading, the raw response of GET /api/v2/queue, then a newline
# Returns:   0 even when the request fails (the dump is best-effort)
print_queue() {
  printf '%s\n' "[host] queue:"
  if ! api_curl "${API_ADDR}/api/v2/queue"; then
    : # best-effort: a failed queue query must not abort the caller
  fi
  printf '\n'
}
# Poll one task until it reaches a terminal state.
# Between polls the queue is printed and the function sleeps 10 seconds.
# Globals:   API_ADDR (read)
# Arguments: $1 - task id
# Outputs:   one "[host] task <id>: <state>" line per poll on stdout; on
#            FAILED/CANCELED a notice and the last 200 log lines on stderr
# Returns:   0 when the task is SUCCEEDED, 1 when it is FAILED or CANCELED
wait_task() {
  local task_id="$1"
  local payload status
  while :; do
    payload="$(api_curl "${API_ADDR}/api/v2/tasks/${task_id}")"
    status="$(printf '%s' "${payload}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["state"])')"
    echo "[host] task ${task_id}: ${status}"
    case "${status}" in
      SUCCEEDED)
        return 0
        ;;
      FAILED | CANCELED)
        echo "[host] terminal=${status}; tail logs (best-effort):" >&2
        # Fetching the log tail is diagnostic only; ignore its failure.
        api_curl "${API_ADDR}/api/v2/tasks/${task_id}/logs?tail=200" >&2 || true
        return 1
        ;;
    esac
    # Any other state means the task is still in flight: show the queue and retry.
    print_queue
    sleep 10
  done
}
printf '%s\n' "[host] ===== run_all_api.sh begin ====="

# Host-side preparation scripts, executed in this exact order.
for prep_script in 00_prereq_check.sh 03_cleanup_v1_legacy.sh 05_ensure_verl_repo.sh; do
  "${SCRIPT_DIR}/${prep_script}"
done

printf '%s\n' "[host] (re)create containers"
"${SCRIPT_DIR}/01_up.sh"

printf '%s\n' "[host] wait ray cluster ready (supervised in-container)"
ray_wait_ready 60
ray_wait_nodes "${EXPECTED_RAY_NODES}" 120

printf '%s\n' "[host] prepare data/model/code snapshot"
"${SCRIPT_DIR}/30_prepare_data_and_model.sh"

printf '%s\n' "[host] install api deps in head container"
"${SCRIPT_DIR}/12_install_api_deps.sh"

# Stopping a possibly-absent API is allowed to fail.
printf '%s\n' "[host] stop api (best-effort)"
"${SCRIPT_DIR}/61_stop_api.sh" || true

# Remove the sqlite database together with its -wal/-shm side files.
printf '%s\n' "[host] reset api sqlite db in container (best-effort): ${DB_IN_CONTAINER}"
dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p \"$(dirname "${DB_IN_CONTAINER}")\"; rm -f '${DB_IN_CONTAINER}' '${DB_IN_CONTAINER}-wal' '${DB_IN_CONTAINER}-shm' || true"

# Hand the token to the start script through its environment.
printf '%s\n' "[host] start api"
MVP_INTERNAL_TOKEN="${TOKEN}" "${SCRIPT_DIR}/60_start_api.sh"
api_wait_ready 60
print_queue

# Submit all three taskspecs up front; each call yields the new task id.
PPO_TASK_ID="$(submit_taskspec "${ROOT_DIR}/taskspecs/ppo.yaml")"
GRPO_TASK_ID="$(submit_taskspec "${ROOT_DIR}/taskspecs/grpo.yaml")"
SFT_TASK_ID="$(submit_taskspec "${ROOT_DIR}/taskspecs/sft.yaml")"
printf '%s\n' \
  "[host] submitted task ids:" \
  " ppo=${PPO_TASK_ID}" \
  " grpo=${GRPO_TASK_ID}" \
  " sft=${SFT_TASK_ID}"

printf '%s\n' "[host] wait for tasks (in submission order)"
for submitted_id in "${PPO_TASK_ID}" "${GRPO_TASK_ID}" "${SFT_TASK_ID}"; do
  wait_task "${submitted_id}"
done

printf '%s\n' "[host] ===== run_all_api.sh done ====="