v2.5: bake the stateless ray pool into a container image

yuyr 2025-12-29 15:54:13 +08:00
parent f02059126e
commit c06f61ed03
14 changed files with 560 additions and 96 deletions

View File

@ -12,3 +12,4 @@ Core changes in v2.5:
- `specs/mvp/v2.5/v2.5_api.md`: API design (users, tasks, queue, logs) and auth conventions.
- `specs/mvp/v2.5/v2.5_acceptance.md`: development/deployment/acceptance flow and verifiable criteria.
- `specs/mvp/v2.5/v2.5_summary.md`: summary of what v2.5 delivers (what this iteration did, acceptance results, known limitations).
- `specs/mvp/v2.5/v2.5_container_design.md`: design and verification flow for baking the stateless pool into a single image (head/worker reuse + supervisor daemon).

View File

@ -0,0 +1,202 @@
# MVP v2.5 — Stateless Ray Node Pool Containerization Design

Goal: bake the v2.5 **stateless pool (head discovery + worker watchdog)** capability into a reusable image, so that starting and supervising processes no longer depends on host scripts running `docker exec` inside the containers. **The same image serves both head and worker**; the role is selected via an environment variable.

Constraint: **the API server code stays decoupled from the image**. In the short term we keep the current approach ("host code is volume-mounted into the head container and the API is started inside the head container") and do not bake the API code into this image.
---
## 1. Background (current state and pain points)

In the current `src/mvp/docker-compose.yaml`, both head and worker are based on `verlai/verl:sgl055.latest`; the containers start with `command: sleep infinity`, and host scripts then finish the setup:

- head: `ray start --head ...` + `head_publisher` (writes `head.json`)
- worker: `worker_watchdog` (reads `head.json` and automatically joins/rejoins the ray cluster)

Problems with this setup:

- The startup flow relies on host scripts calling `docker exec`, making it fragile to permission/path issues and manual mistakes;
- "Supervision" is currently a bash while-loop, so troubleshooting failures is expensive;
- In future production environments the containers may be launched by a compute platform and we can only manage them over SSH, which makes in-container "self-start + self-heal" even more necessary.
---
## 2. v2.5 containerization goals and non-goals

### 2.1 Goals

- **One reusable image**: head and worker share a single image, with the role selected by `ARGUS_ROLE=head|worker`.
- **supervisor-based daemon**: on both head and worker, `supervisord` keeps the key processes alive:
  - watchdog crashes → supervisor restarts the watchdog automatically
  - ray node crashes → the watchdog and/or supervisor triggers automatic recovery (see the process model in 3.2)
- **Aligned with shared storage**: containers mount the shared root at `/private`; the discovery file is written to shared storage.
- **Minimal embedded code**: the image only embeds the stateless-pool python scripts (discovery/publisher/watchdog/entrypoint); the API server code is not baked into the image.
- **Remote build**: the image must be built on the development/runtime machine (e.g. `argus@h1`); the local machine is not required to have the `verlai/verl:*` base image.

### 2.2 Non-goals (out of scope for this iteration)

- Do not package the API server into this image (a separate `argus-api` image can follow later).
- Do not change the v2.5 TaskSpec constraints (still uses the shared resources under `/private/common/...`; user isolation still only isolates jobs).
- Do not introduce K8s/operator/autoscaler in this iteration; only container self-start/self-heal is baked in.
---
## 3. Design

### 3.1 Single-image architecture overview

Add one new image (example name):

- `argus/argus-ray-node:v2.5`

This image:

- `FROM verlai/verl:sgl055.latest` (the base is switchable via a build-arg)
- Embeds:
  - `argus_raypool` (or a subset of the existing `argus.ray.*` modules) scripts:
    - `discovery.py`: read/write of the head record (head.json)
    - `head_publisher.py`: the head writes head.json (with TTL/refresh)
    - `worker_watchdog.py`: workers read head.json and automatically join/rejoin
    - (optional) `head_watchdog.py`: wraps "ray head + publisher" into a single recoverable watchdog
  - `/usr/local/bin/argus-entrypoint.sh`: generates the supervisor config for the role and starts supervisor
  - supervisor config template (or generated at runtime)
### 3.2 Process model (recovery whether ray or the watchdog crashes)

New requirement: on both head and worker, supervisor must guard the watchdog; a crash of either the ray node or the watchdog itself must recover automatically.

Recommended process layout (avoid backgrounding `ray start`, which would leave supervisor unable to observe it):

#### A) Head container (ARGUS_ROLE=head)

Supervisor starts **two programs**:

1) `argus_head_watchdog` (recommended as python or bash; internally runs `ray start --head --block` in the foreground)
   - Key point: `ray start --head --block` keeps the Ray process blocking in the foreground, so the watchdog, as its parent, observes the exit code
   - ray crashes → `ray start --block` returns → the watchdog exits non-zero → supervisor restarts the watchdog → ray is restarted automatically
2) `argus_head_publisher`
   - Periodically refreshes `head.json` (TTL/refresh)
   - publisher crashes → supervisor restarts it automatically

> Alternative (sketched below): merge the publisher logic into `argus_head_watchdog` (one process running ray plus a publisher thread), reducing the number of supervisor programs; keeping them split is easier to observe and debug.
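A minimal sketch of that merged alternative (not implemented in this iteration), assuming the publisher keeps its current CLI; flags and environment defaults are illustrative:

```bash
#!/usr/bin/env bash
# Sketch only: one supervised program that runs the publisher in the background and the
# ray head in the foreground, so a crash of either ends the program and supervisor
# restarts both together.
set -euo pipefail

python3 -m argus.ray.head_publisher \
  --cluster-name "${ARGUS_CLUSTER_NAME:-argus-ray}" \
  --head-ip-file "${ARGUS_HEAD_IP_FILE}" \
  --head-ip "${ARGUS_NODE_IP}" &
publisher_pid=$!
trap 'kill "${publisher_pid}" 2>/dev/null || true' EXIT

# Foreground ray head; when it exits, the EXIT trap tears the publisher down as well.
ray start --head --node-ip-address="${ARGUS_NODE_IP}" --port="${ARGUS_RAY_PORT:-6379}" \
  --dashboard-host=0.0.0.0 --num-cpus=0 --num-gpus=0 --block
```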
#### B) Worker container (ARGUS_ROLE=worker)

Supervisor starts **one program**:

1) `argus_worker_watchdog`
   - Polls `head.json` and joins the cluster via `ray start --address=<head>:6379 --block`
   - Whenever the ray process exits (ray crashes / is stopped), `--block` returns and the watchdog moves on to the next reconnect/restart round
   - If the watchdog itself exits abnormally → supervisor restarts the watchdog

> Note: the `worker_watchdog.py` currently in the repo runs `ray start` without `--block` and only restarts when the head address changes. For the containerized setup it should be upgraded to "`--block` + detect ray exit" (see the sketch below), otherwise supervisor cannot reliably track ray's lifecycle.
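A minimal sketch of that reconnect loop, assuming the head record stores the head IP under a `head_ip` key and that `jq` is available in the image; the real field names are defined by `discovery.py`:

```bash
#!/usr/bin/env bash
# Sketch only: poll head.json and keep a foreground `ray start --block` attached to
# whatever head is currently published; when ray exits, loop and reconnect.
set -euo pipefail

while true; do
  head_ip="$(jq -r '.head_ip // empty' "${ARGUS_HEAD_IP_FILE}" 2>/dev/null || true)"
  if [[ -n "${head_ip}" ]]; then
    ray stop --force || true
    # Blocks until the local ray node exits (crash or head loss).
    ray start --address="${head_ip}:${ARGUS_RAY_PORT:-6379}" \
      --resources='{"worker_node": 100}' --block || true
  fi
  sleep "${ARGUS_POLL_S:-5}"
done
```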
### 3.3 Configuration and environment variables (role-driven)

The image entrypoint depends only on environment variables, not on host script arguments (see the example after this list).

Suggested environment variables (with defaults):

- `ARGUS_ROLE`: `head` / `worker` (required)
- `ARGUS_SHARED_ROOT`: default `/private`
- `ARGUS_CLUSTER_NAME`: default `argus-ray`
- `ARGUS_HEAD_IP_FILE`: default `${ARGUS_SHARED_ROOT}/ray/discovery/${ARGUS_CLUSTER_NAME}/head.json`
- `ARGUS_RAY_PORT`: default `6379`
- `ARGUS_DASHBOARD_PORT`: default `8265` (head)
- `ARGUS_TTL_S`: default `60` (head publisher)
- `ARGUS_REFRESH_S`: default `10` (head publisher)
- `ARGUS_POLL_S`: default `5` (worker watchdog)
- `ARGUS_NODE_IP`: default empty; if empty the entrypoint auto-detects the container IP
- `ARGUS_WORKER_RESOURCES_KV`: default `worker_node=100` (custom resource used by drivers to force workloads onto worker nodes)
- `ARGUS_RAY_EXTRA_ARGS`: optional extra `ray start` arguments
- `ARGUS_LOG_DIR`: default `${ARGUS_SHARED_ROOT}/common/logs` (written to the shared directory for easier troubleshooting)
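For an ad-hoc smoke test of the role-driven entrypoint outside compose, something like the following should suffice (container name, host mount path, and the omitted GPU flags are illustrative):

```bash
# Run a single worker node straight from the image; compose expresses the same
# configuration via its `environment:` section. GPU flags omitted for brevity.
docker run -d --name argus-ray-worker-x \
  -e ARGUS_ROLE=worker \
  -e ARGUS_SHARED_ROOT=/private \
  -e ARGUS_CLUSTER_NAME=argus-ray \
  -e ARGUS_WORKER_RESOURCES_KV=worker_node=100 \
  -v /home2/argus/infra/mvp/shared:/private \
  argus/argus-ray-node:v2.5
```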
### 3.4 Dockerfile / entrypoint / supervisor design

#### Dockerfile (suggested paths)

New files in the repo (when implementing):

- `src/mvp/images/argus-ray-node/Dockerfile`
- `src/mvp/images/argus-ray-node/entrypoint.sh`
- `src/mvp/images/argus-ray-node/supervisord.conf.tmpl` (optional)
- `src/mvp/images/argus-ray-node/py/argus_raypool/*.py` (only the stateless-pool subset)

Key Dockerfile steps:

- `FROM verlai/verl:sgl055.latest` (switchable via `ARG BASE_IMAGE=...`)
- Install supervisor
  - Debian/Ubuntu base: `apt-get update && apt-get install -y supervisor`
  - Set `CMD ["supervisord","-n","-c","/etc/supervisor/supervisord.conf"]`
- Copy the python scripts to `/opt/argus/raypool` and set `PYTHONPATH=/opt/argus`
- Copy the entrypoint to `/usr/local/bin/argus-entrypoint.sh`
- `ENTRYPOINT ["/usr/local/bin/argus-entrypoint.sh"]`

entrypoint.sh logic:

- Detect the container IP (`hostname -i` or `ip route get 1.1.1.1`)
- Generate the supervisor config based on `ARGUS_ROLE`:
  - head: start `head_watchdog` + `head_publisher`
  - worker: start `worker_watchdog`
- Configure supervisor:
  - `autorestart=true`
  - a sensible `startretries`
  - stdout/stderr directed to `${ARGUS_LOG_DIR}/...` or straight to stdout (convenient for `docker logs`)
### 3.5 Relationship to the API server (kept decoupled)

The API server keeps the current (short-term) arrangement:

- **Code lives on the host** and is volume-mounted into the head container (e.g. `/workspace/mvp`).
- **The API is started inside the head container** (e.g. via a script: `docker exec argus-ray-head ... python3 /workspace/mvp/py/server.py`).
- Key point: even though the API process runs in the head container, it is still treated as "business code independent of the ray node image" and can later evolve into a separate `argus-api` image.
- All the API needs is access to the Ray job server (typically `http://127.0.0.1:8265` from the head container's point of view); a quick check is shown at the end of this subsection.

Later (not in this iteration) the API server can become its own `argus-api` image running against the same `/private` shared directory.
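A quick reachability check for that last point, assuming `curl` is available in the base image (`/api/version` is the standard Ray dashboard/job-server endpoint):

```bash
# From the host: confirm the job server answers inside the head container.
docker exec argus-ray-head curl -sS http://127.0.0.1:8265/api/version
```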
---
## 4. docker-compose adjustments (for the implementation)

Conceptually, the compose file changes as follows:

- `image: verlai/verl:sgl055.latest` → `image: argus/argus-ray-node:v2.5`
- Remove `command: sleep infinity` (the image ships its own entrypoint)
- The head service adds:
  - `ARGUS_ROLE=head`
  - the dashboard port stays exposed as `8265:8265`
- The worker services add:
  - `ARGUS_ROLE=worker`
  - `ARGUS_WORKER_RESOURCES_KV=worker_node=100`
- The volumes are still required:
  - `../../shared:/private` (shared storage)
  - `../../verl:/workspace/verl` (verl code/dependencies, unchanged)
---
## 5. Verification and regression flow (acceptance after landing)

### 5.1 Build the image

1) **Build on the remote host `argus@h1`** (the local machine does not need the base image):
   - `cd /home2/argus/infra/mvp/src/mvp`
   - `docker build -t argus/argus-ray-node:v2.5 -f images/argus-ray-node/Dockerfile .`
2) Or build via compose (recommended; matches how it actually runs):
   - `docker compose -f docker-compose.yaml build --no-cache`

### 5.2 Basic connectivity (stateless pool verification)

1) `docker compose up -d`
2) Verify the head published its record:
   - `head.json` exists in the shared directory: `${ARGUS_SHARED_ROOT}/ray/discovery/${ARGUS_CLUSTER_NAME}/head.json`
3) Verify the workers join automatically:
   - `ray status` inside the head container shows the worker nodes
   - the Dashboard Nodes page shows head + workers
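A compact way to run the 5.2 checks from the host; container names match the compose file, and only `job_server_url` in `head.json` is a field the host scripts currently rely on:

```bash
# 1) Head record exists and is readable.
docker exec argus-ray-head cat /private/ray/discovery/argus-ray/head.json

# 2) Workers have joined (expect one head + two workers in the node list).
docker exec argus-ray-head ray status

# 3) Dashboard / job server answers on the published port.
curl -sS http://localhost:8265/api/version
```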
### 5.3 Fault injection (supervisor self-heal verification)

1) Watchdog crash:
   - `pkill -f worker_watchdog` (or kill the corresponding PID)
   - Expected: supervisor restarts the watchdog and the worker eventually rejoins the cluster
2) Ray node crash (worker):
   - `ray stop --force`, or kill the raylet
   - Expected: the watchdog re-runs `ray start ... --block` and the worker recovers
3) Ray node crash (head):
   - kill the foreground ray process on the head (the one started by the watchdog)
   - Expected: supervisor restarts head_watchdog, the head recovers and rewrites head.json, and the workers reconnect automatically (a small helper for case 1 is sketched below)
### 5.4 End-to-end task regression (together with the v2.5 API)

Reuse the existing v2.5 E2E scripts:

- `src/mvp/scripts/run_all_v25_api.sh`
- `src/mvp/scripts/run_e2e_v25_cases.sh`

Acceptance criteria:

- PPO/GRPO/SFT all run on the workers; the head runs no training
- task_id / submission_id returned by the API carry the username correctly
- When resources are insufficient, tasks can move to `PENDING_RESOURCES` and are retried periodically
---
## 6. Risks and mitigations

- **Backgrounded `ray start`**: if ray keeps being started in the background, supervisor cannot easily observe ray crashes. Mitigation: use `ray start --block` (recommended).
- **Unreliable IP detection**: different environments (compose / compute platforms) expose the container IP differently. Mitigation: the entrypoint tries multiple detection strategies (see the sketch below) and `ARGUS_NODE_IP` can always override explicitly.
- **Log observability**: write logs both to `/private/common/logs` (shared) and to stdout (`docker logs`).
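A sketch of the multi-strategy detection; the current entrypoint only uses `hostname -i`, and the `ip route` fallback is an assumption for environments where `hostname -i` is empty or wrong:

```bash
# Resolve the node IP: explicit override first, then hostname -i, then the source
# address of the default route (`ip route get` does not actually send traffic).
detect_node_ip() {
  if [[ -n "${ARGUS_NODE_IP:-}" ]]; then
    echo "${ARGUS_NODE_IP}"
    return
  fi
  local ip
  ip="$(hostname -i 2>/dev/null | awk '{print $1}')"
  if [[ -z "${ip}" ]]; then
    ip="$(ip route get 1.1.1.1 2>/dev/null | awk '{for (i = 1; i < NF; i++) if ($i == "src") print $(i + 1)}' | head -n 1)"
  fi
  echo "${ip}"
}
```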

View File

@ -1,10 +1,12 @@
version: "3.8"
services:
ray_head:
image: verlai/verl:sgl055.latest
build:
context: .
dockerfile: images/argus-ray-node/Dockerfile
args:
BASE_IMAGE: verlai/verl:sgl055.latest
image: argus/argus-ray-node:v2.5
container_name: argus-ray-head
command: sleep infinity
ports:
- "8265:8265"
- "8080:8080"
@ -26,6 +28,10 @@ services:
networks:
- argus-ray-net
environment:
ARGUS_ROLE: "head"
ARGUS_SHARED_ROOT: "/private"
ARGUS_CLUSTER_NAME: "argus-ray"
ARGUS_LOG_DIR: "/private/common/logs"
HF_HOME: "/private/hf"
HUGGINGFACE_HUB_CACHE: "/private/hf/hub"
TRANSFORMERS_CACHE: "/private/hf/transformers"
@ -33,9 +39,8 @@ services:
PYTHONUNBUFFERED: "1"
ray_worker_0:
image: verlai/verl:sgl055.latest
image: argus/argus-ray-node:v2.5
container_name: argus-ray-worker-0
command: sleep infinity
volumes:
- ../../verl:/workspace/verl
- ../../shared:/private
@ -52,6 +57,11 @@ services:
- argus-ray-net
runtime: nvidia
environment:
ARGUS_ROLE: "worker"
ARGUS_SHARED_ROOT: "/private"
ARGUS_CLUSTER_NAME: "argus-ray"
ARGUS_LOG_DIR: "/private/common/logs"
ARGUS_WORKER_RESOURCES_KV: "worker_node=100"
NVIDIA_VISIBLE_DEVICES: "0,1,2,3"
NVIDIA_DRIVER_CAPABILITIES: "all"
HF_HOME: "/private/hf"
@ -61,9 +71,8 @@ services:
PYTHONUNBUFFERED: "1"
ray_worker_1:
image: verlai/verl:sgl055.latest
image: argus/argus-ray-node:v2.5
container_name: argus-ray-worker-1
command: sleep infinity
volumes:
- ../../verl:/workspace/verl
- ../../shared:/private
@ -80,6 +89,11 @@ services:
- argus-ray-net
runtime: nvidia
environment:
ARGUS_ROLE: "worker"
ARGUS_SHARED_ROOT: "/private"
ARGUS_CLUSTER_NAME: "argus-ray"
ARGUS_LOG_DIR: "/private/common/logs"
ARGUS_WORKER_RESOURCES_KV: "worker_node=100"
NVIDIA_VISIBLE_DEVICES: "4,5,6,7"
NVIDIA_DRIVER_CAPABILITIES: "all"
HF_HOME: "/private/hf"

View File

@ -0,0 +1,26 @@
ARG BASE_IMAGE=verlai/verl:sgl055.latest
FROM ${BASE_IMAGE}
SHELL ["/bin/bash", "-lc"]
# Install supervisord (prefer pip to avoid relying on distro package manager).
RUN python3 -m pip install --no-cache-dir supervisor
RUN mkdir -p /opt/argus/py/argus/ray
# Minimal embedded code for stateless pool (API code is intentionally excluded).
COPY py/argus/__init__.py /opt/argus/py/argus/__init__.py
COPY py/argus/ray/__init__.py /opt/argus/py/argus/ray/__init__.py
COPY py/argus/ray/discovery.py /opt/argus/py/argus/ray/discovery.py
COPY py/argus/ray/head_publisher.py /opt/argus/py/argus/ray/head_publisher.py
COPY py/argus/ray/worker_watchdog.py /opt/argus/py/argus/ray/worker_watchdog.py
COPY images/argus-ray-node/entrypoint.sh /usr/local/bin/argus-entrypoint.sh
COPY images/argus-ray-node/argus-head-ray.sh /usr/local/bin/argus-head-ray.sh
COPY images/argus-ray-node/argus-head-publisher.sh /usr/local/bin/argus-head-publisher.sh
COPY images/argus-ray-node/argus-worker-watchdog.sh /usr/local/bin/argus-worker-watchdog.sh
RUN chmod +x /usr/local/bin/argus-entrypoint.sh /usr/local/bin/argus-head-ray.sh /usr/local/bin/argus-head-publisher.sh /usr/local/bin/argus-worker-watchdog.sh
ENV PYTHONUNBUFFERED=1
ENTRYPOINT ["/usr/local/bin/argus-entrypoint.sh"]

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
set -euo pipefail
cluster_name="${ARGUS_CLUSTER_NAME:-argus-ray}"
head_ip_file="${ARGUS_HEAD_IP_FILE:?missing ARGUS_HEAD_IP_FILE}"
node_ip="${ARGUS_NODE_IP:?missing ARGUS_NODE_IP}"
ray_port="${ARGUS_RAY_PORT:-6379}"
dashboard_port="${ARGUS_DASHBOARD_PORT:-8265}"
ttl_s="${ARGUS_TTL_S:-60}"
refresh_s="${ARGUS_REFRESH_S:-10}"
exec python3 -m argus.ray.head_publisher \
--cluster-name "${cluster_name}" \
--head-ip-file "${head_ip_file}" \
--head-ip "${node_ip}" \
--gcs-port "${ray_port}" \
--dashboard-port "${dashboard_port}" \
--ttl-s "${ttl_s}" \
--refresh-s "${refresh_s}"

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -euo pipefail
node_ip="${ARGUS_NODE_IP:?missing ARGUS_NODE_IP}"
ray_port="${ARGUS_RAY_PORT:-6379}"
dashboard_port="${ARGUS_DASHBOARD_PORT:-8265}"
export RAY_DISABLE_USAGE_STATS=1
ray stop --force || true
# Head should not run training workloads.
exec ray start \
--head \
--node-ip-address="${node_ip}" \
--port="${ray_port}" \
--dashboard-host=0.0.0.0 \
--dashboard-port="${dashboard_port}" \
--num-cpus=0 \
--num-gpus=0 \
--disable-usage-stats \
--block \
${ARGUS_RAY_EXTRA_ARGS:-}

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -euo pipefail
head_ip_file="${ARGUS_HEAD_IP_FILE:?missing ARGUS_HEAD_IP_FILE}"
node_ip="${ARGUS_NODE_IP:?missing ARGUS_NODE_IP}"
poll_s="${ARGUS_POLL_S:-5}"
resources_kv="${ARGUS_WORKER_RESOURCES_KV:-worker_node=100}"
args=()
IFS=',' read -r -a parts <<<"${resources_kv}"
for kv in "${parts[@]}"; do
kv="$(echo "${kv}" | xargs || true)"
[[ -z "${kv}" ]] && continue
args+=( "--resources-kv" "${kv}" )
done
exec python3 -m argus.ray.worker_watchdog \
--head-ip-file "${head_ip_file}" \
--node-ip "${node_ip}" \
"${args[@]}" \
--poll-s "${poll_s}" \
--ray-start-block

View File

@ -0,0 +1,93 @@
#!/usr/bin/env bash
set -euo pipefail
role="${ARGUS_ROLE:-}"
if [[ -z "${role}" ]]; then
echo "ERROR: ARGUS_ROLE must be set to head|worker" >&2
exit 1
fi
if [[ "${role}" != "head" && "${role}" != "worker" ]]; then
echo "ERROR: invalid ARGUS_ROLE=${role} (expected head|worker)" >&2
exit 1
fi
shared_root="${ARGUS_SHARED_ROOT:-/private}"
cluster_name="${ARGUS_CLUSTER_NAME:-argus-ray}"
head_ip_file="${ARGUS_HEAD_IP_FILE:-${shared_root}/ray/discovery/${cluster_name}/head.json}"
ray_port="${ARGUS_RAY_PORT:-6379}"
dashboard_port="${ARGUS_DASHBOARD_PORT:-8265}"
ttl_s="${ARGUS_TTL_S:-60}"
refresh_s="${ARGUS_REFRESH_S:-10}"
poll_s="${ARGUS_POLL_S:-5}"
log_dir="${ARGUS_LOG_DIR:-${shared_root}/common/logs}"
mkdir -p "${log_dir}"
node_ip="${ARGUS_NODE_IP:-}"
if [[ -z "${node_ip}" ]]; then
# Prefer the first IP returned by `hostname -i`.
node_ip="$(hostname -i | awk '{print $1}')"
fi
if [[ -z "${node_ip}" ]]; then
echo "ERROR: failed to determine container IP; set ARGUS_NODE_IP" >&2
exit 1
fi
export PYTHONPATH="/opt/argus/py:${PYTHONPATH:-}"
export ARGUS_SHARED_ROOT="${shared_root}"
export ARGUS_CLUSTER_NAME="${cluster_name}"
export ARGUS_HEAD_IP_FILE="${head_ip_file}"
export ARGUS_NODE_IP="${node_ip}"
export ARGUS_RAY_PORT="${ray_port}"
export ARGUS_DASHBOARD_PORT="${dashboard_port}"
export ARGUS_TTL_S="${ttl_s}"
export ARGUS_REFRESH_S="${refresh_s}"
export ARGUS_POLL_S="${poll_s}"
export ARGUS_LOG_DIR="${log_dir}"
supervisor_conf="/tmp/supervisord.conf"
cat >"${supervisor_conf}" <<'CONF'
[supervisord]
nodaemon=true
pidfile=/tmp/supervisord.pid
logfile=/dev/null
childlogdir=/tmp
user=root
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock
[unix_http_server]
file=/tmp/supervisor.sock
chmod=0700
CONF
append_program() {
local name="$1"
local cmd="$2"
local stdout="$3"
local stderr="$4"
cat >>"${supervisor_conf}" <<CONF
[program:${name}]
command=${cmd}
autorestart=true
startsecs=1
startretries=999
stdout_logfile=${stdout}
stdout_logfile_maxbytes=0
stderr_logfile=${stderr}
stderr_logfile_maxbytes=0
CONF
}
if [[ "${role}" == "head" ]]; then
append_program "argus_head_ray" "/usr/local/bin/argus-head-ray.sh" "/dev/fd/1" "/dev/fd/2"
append_program "argus_head_publisher" "/usr/local/bin/argus-head-publisher.sh" "${log_dir}/argus_head_publisher.log" "${log_dir}/argus_head_publisher.err.log"
else
append_program "argus_worker_watchdog" "/usr/local/bin/argus-worker-watchdog.sh" "${log_dir}/argus_worker_watchdog.log" "${log_dir}/argus_worker_watchdog.err.log"
fi
exec supervisord -n -c "${supervisor_conf}"

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import argparse
import json
import os
import subprocess
import time
from dataclasses import dataclass
@ -22,10 +23,12 @@ def _ray_stop_cmd() -> list[str]:
return ["ray", "stop", "--force"]
def _ray_start_cmd(*, head_addr: str, node_ip: str | None, resources_json: str) -> list[str]:
def _ray_start_cmd(*, head_addr: str, node_ip: str | None, resources_json: str, block: bool) -> list[str]:
argv = ["ray", "start", f"--address={head_addr}", f"--resources={resources_json}", "--disable-usage-stats"]
if node_ip:
argv.append(f"--node-ip-address={node_ip}")
if block:
argv.append("--block")
return argv
@ -36,13 +39,54 @@ class Watchdog:
resources_json: str
poll_s: int
runner: Runner
ray_start_block: bool
_current_head_addr: str | None = None
_ray_proc: subprocess.Popen[bytes] | None = None
def _kill_ray_proc(self) -> None:
p = self._ray_proc
self._ray_proc = None
if not p:
return
try:
p.terminate()
except Exception:
pass
try:
p.wait(timeout=10)
except Exception:
try:
p.kill()
except Exception:
pass
try:
p.wait(timeout=5)
except Exception:
pass
def _restart_ray(self, desired: str) -> None:
# Best-effort stop then start.
self._kill_ray_proc()
self.runner(_ray_stop_cmd())
self.runner(_ray_start_cmd(head_addr=desired, node_ip=self.node_ip, resources_json=self.resources_json))
if self.ray_start_block:
argv = _ray_start_cmd(
head_addr=desired,
node_ip=self.node_ip,
resources_json=self.resources_json,
block=True,
)
# Use Popen so watchdog can notice exits and reconnect if needed.
self._ray_proc = subprocess.Popen(argv, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
else:
self.runner(
_ray_start_cmd(
head_addr=desired,
node_ip=self.node_ip,
resources_json=self.resources_json,
block=False,
)
)
self._current_head_addr = desired
def tick_once(self) -> HeadRecord | None:
@ -52,6 +96,12 @@ class Watchdog:
desired = rec.head_addr()
if self._current_head_addr != desired:
self._restart_ray(desired)
return rec
# If we're in block mode and the ray process died, restart it against the same head.
if self.ray_start_block and self._ray_proc is not None:
if self._ray_proc.poll() is not None:
self._restart_ray(desired)
return rec
def run_forever(self) -> None:
@ -75,6 +125,7 @@ def main(argv: list[str] | None = None) -> int:
help='Repeatable k=v resources, e.g. --resources-kv worker_node=100',
)
ap.add_argument("--poll-s", type=int, default=5)
ap.add_argument("--ray-start-block", action="store_true", help="Run `ray start ... --block` and self-heal on exit.")
ap.add_argument("--once", action="store_true", help="Run one tick then exit (for testing/debug).")
args = ap.parse_args(argv)
@ -91,12 +142,17 @@ def main(argv: list[str] | None = None) -> int:
raise SystemExit(f"invalid --resources-kv key: {item!r}")
resources[k] = float(v)
node_ip = args.node_ip
if node_ip in (None, ""):
node_ip = os.environ.get("ARGUS_NODE_IP") or None
wd = Watchdog(
head_ip_file=args.head_ip_file,
node_ip=args.node_ip,
node_ip=node_ip,
resources_json=json.dumps(resources),
poll_s=int(args.poll_s),
runner=_default_runner,
ray_start_block=bool(args.ray_start_block),
)
if args.once:
wd.tick_once()

View File

@ -21,6 +21,7 @@ def test_watchdog_restarts_on_first_seen_and_on_change(tmp_path: Path):
resources_json=json.dumps({"worker_node": 100}),
poll_s=1,
runner=runner,
ray_start_block=False,
)
write_head_record_atomic(str(head_file), build_head_record(cluster_name="c", head_ip="1.1.1.1"))
@ -40,6 +41,72 @@ def test_watchdog_restarts_on_first_seen_and_on_change(tmp_path: Path):
assert any(c[1] == "start" for c in calls)
def test_watchdog_block_mode_restarts_when_ray_exits(monkeypatch, tmp_path: Path):
import argus.ray.worker_watchdog as mod
head_file = tmp_path / "head.json"
write_head_record_atomic(str(head_file), build_head_record(cluster_name="c", head_ip="1.1.1.1"))
calls: list[list[str]] = []
popen_argvs: list[list[str]] = []
class DummyProc:
def __init__(self, argv: list[str]):
self.argv = argv
self._poll: int | None = None
self.terminated = False
self.killed = False
def poll(self):
return self._poll
def terminate(self):
self.terminated = True
def wait(self, timeout: float | None = None):
# Force kill path to be exercised.
raise TimeoutError("timeout")
def kill(self):
self.killed = True
proc_holder: dict[str, DummyProc] = {}
def fake_popen(argv, stdout=None, stderr=None):
popen_argvs.append(list(argv))
p = DummyProc(list(argv))
proc_holder["p"] = p
return p
def runner(argv: list[str]) -> int:
calls.append(argv)
return 0
monkeypatch.setattr(mod.subprocess, "Popen", fake_popen)
wd = Watchdog(
head_ip_file=str(head_file),
node_ip="10.0.0.2",
resources_json=json.dumps({"worker_node": 100}),
poll_s=1,
runner=runner,
ray_start_block=True,
)
# First tick: should start ray with --block.
assert wd.tick_once() is not None
assert any(c[1] == "stop" for c in calls)
assert len(popen_argvs) == 1
assert "--block" in popen_argvs[0]
# Simulate ray process exit; next tick should restart (stop + new popen).
proc_holder["p"]._poll = 1
calls.clear()
wd.tick_once()
assert any(c[1] == "stop" for c in calls)
assert len(popen_argvs) == 2
def test_watchdog_main_once_invokes_runner(monkeypatch, tmp_path: Path):
from argus.ray import worker_watchdog as mod

View File

@ -10,7 +10,12 @@ if [[ "${SKIP_CLEANUP_V1:-0}" != "1" ]]; then
fi
echo "[host] docker compose up -d (mvp)"
dc up -d
BUILD="${BUILD:-1}"
if [[ "${BUILD}" == "1" ]]; then
dc up -d --build
else
dc up -d
fi
echo "[host] containers:"
docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}' | (head -n 1 && grep -E "argus-ray-") || true

View File

@ -5,14 +5,17 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
HEAD_IP="$(container_ip "${HEAD_CONTAINER}")"
echo "[head] restart head container (ray + publisher are supervised in-container): ${HEAD_CONTAINER}"
docker restart "${HEAD_CONTAINER}" >/dev/null
echo "[head] ray stop (best-effort)"
dexec "${HEAD_CONTAINER}" bash -lc "ray stop --force || true"
echo "[head] wait ray dashboard ready (best-effort)"
for i in $(seq 1 60); do
if curl -sS -m 2 "${RAY_DASHBOARD_ADDR}" >/dev/null 2>&1; then
echo "[head] dashboard ready: ${RAY_DASHBOARD_ADDR}"
break
fi
sleep 2
done
echo "[head] start ray head (CPU=0 GPU=0): ${HEAD_IP}"
dexec "${HEAD_CONTAINER}" bash -lc "ray start --head --node-ip-address='${HEAD_IP}' --port=6379 --dashboard-host=0.0.0.0 --dashboard-port=8265 --num-cpus=0 --num-gpus=0 --disable-usage-stats"
echo "[head] ray status"
echo "[head] ray status (best-effort)"
dexec "${HEAD_CONTAINER}" bash -lc "ray status || true"

View File

@ -7,38 +7,6 @@ source "${SCRIPT_DIR}/lib.sh"
CLUSTER_NAME="${CLUSTER_NAME:-argus-ray}"
HEAD_IP_FILE="${HEAD_IP_FILE:-${SHARED_ROOT}/ray/discovery/${CLUSTER_NAME}/head.json}"
TTL_S="${TTL_S:-60}"
REFRESH_S="${REFRESH_S:-10}"
# PID file must be container-local to avoid conflicts if /private is shared across containers.
PID_PATH="${PID_PATH:-/tmp/argus_head_publisher.pid}"
LOG_PATH="${LOG_PATH:-${SHARED_ROOT}/common/logs/argus_head_publisher.log}"
HEAD_IP="$(container_ip "${HEAD_CONTAINER}")"
echo "[head] start discovery publisher (supervised): ${HEAD_IP_FILE} (head_ip=${HEAD_IP})"
# stop existing (best-effort)
dexec "${HEAD_CONTAINER}" bash -lc "if test -f '${PID_PATH}'; then pid=\$(cat '${PID_PATH}'); if kill -0 \"\${pid}\" >/dev/null 2>&1; then kill \"\${pid}\" || true; fi; rm -f '${PID_PATH}'; fi"
dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p \"$(dirname "${LOG_PATH}")\" \"$(dirname "${HEAD_IP_FILE}")\""
# Supervisor loop: restart publisher if it exits.
docker exec -d "${HEAD_CONTAINER}" bash -lc "
nohup bash -lc '
export PYTHONPATH=/workspace/mvp/py
while true; do
python3 -m argus.ray.head_publisher \
--cluster-name \"${CLUSTER_NAME}\" \
--head-ip-file \"${HEAD_IP_FILE}\" \
--head-ip \"${HEAD_IP}\" \
--ttl-s \"${TTL_S}\" \
--refresh-s \"${REFRESH_S}\"
sleep 2
done
' >>'${LOG_PATH}' 2>&1 & echo \$! >'${PID_PATH}'
"
echo "[head] publisher pid stored in ${PID_PATH} (container-local)"
echo "[head] logs: ${LOG_PATH}"
echo "[head] discovery publisher is supervised in-container; verify head record exists: ${HEAD_IP_FILE}"
dexec "${HEAD_CONTAINER}" bash -lc "test -f '${HEAD_IP_FILE}' && python3 -c 'import json,sys; print(json.load(open(sys.argv[1]))[\"job_server_url\"])' '${HEAD_IP_FILE}' || true"

View File

@ -5,48 +5,9 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
CLUSTER_NAME="${CLUSTER_NAME:-argus-ray}"
HEAD_IP_FILE="${HEAD_IP_FILE:-${SHARED_ROOT}/ray/discovery/${CLUSTER_NAME}/head.json}"
POLL_S="${POLL_S:-5}"
WORKER_NODE_RESOURCE="${WORKER_NODE_RESOURCE:-100}"
start_one() {
local worker="$1"
local ip
ip="$(container_ip "${worker}")"
local pid_path="/tmp/argus_worker_watchdog.pid"
local log_path="${SHARED_ROOT}/common/logs/argus_worker_watchdog.${worker}.log"
echo "[${worker}] start stateless watchdog (supervised): head_file=${HEAD_IP_FILE} node_ip=${ip}"
# stop existing watchdog (best-effort)
dexec "${worker}" bash -lc "if test -f '${pid_path}'; then pid=\$(cat '${pid_path}'); if kill -0 \"\${pid}\" >/dev/null 2>&1; then kill \"\${pid}\" || true; fi; rm -f '${pid_path}'; fi"
# stop any legacy ray process to avoid split-brain
dexec "${worker}" bash -lc "ray stop --force || true"
dexec "${worker}" bash -lc "mkdir -p \"$(dirname "${log_path}")\""
docker exec -d "${worker}" bash -lc "
nohup bash -lc '
export PYTHONPATH=/workspace/mvp/py
while true; do
python3 -m argus.ray.worker_watchdog \
--head-ip-file \"${HEAD_IP_FILE}\" \
--node-ip \"${ip}\" \
--resources-kv \"worker_node=${WORKER_NODE_RESOURCE}\" \
--poll-s \"${POLL_S}\"
sleep 2
done
' >>'${log_path}' 2>&1 & echo \$! >'${pid_path}'
"
echo "[${worker}] watchdog pid stored in ${pid_path} (container-local)"
echo "[${worker}] logs: ${log_path}"
}
start_one "${WORKER0_CONTAINER}"
start_one "${WORKER1_CONTAINER}"
echo "[workers] restart worker containers (ray is supervised in-container)"
docker restart "${WORKER0_CONTAINER}" >/dev/null
docker restart "${WORKER1_CONTAINER}" >/dev/null
echo "[head] ray status"
dexec "${HEAD_CONTAINER}" bash -lc "ray status || true"