v2.0 代码重构,跑通run_all_api.sh

This commit is contained in:
yuyr 2025-12-24 17:32:21 +08:00
parent 45b6a5f05e
commit 4dacac24f0
63 changed files with 671 additions and 860 deletions

View File

@ -0,0 +1,301 @@
# MVP 代码结构重构方案(按功能模块划分)
背景:当前 `src/mvp/` 下以 `v1.1/``v2.0/` 版本目录来组织代码。实际上 **v2.0 是在 v1.1 的 Ray Jobs SDK 提交链路基础上扩展了服务层**,并且为了让 v2.0 工作又对 v1.1 的 `docker-compose.yaml``dev.yaml` 做了修改(挂载 v2、开放 8080、增加 `v2:` 配置段)。因此“按版本分目录”会让依赖关系变得不清晰(谁是库、谁是应用、哪些配置是共享的)。
本方案目标:把 `src/mvp/` 重构为“按功能模块”划分ray 提交核心库 / service 服务层 / cli 工具 / TaskSpecs / configs / scripts并给出迁移后的验证与执行方案。
> 本文仅给出设计与迁移/验证方案,不直接改代码(待确认后再实施)。
---
## 1. 现状梳理(问题点)
### 1.1 代码重复与耦合
- `src/mvp/v2.0/py/mvp_v11/` 是从 `src/mvp/v1.1/py/mvp_v11/` 复制而来用于复用,但这导致:
- **库代码重复**(修 bug 要改两份)
- 谁是“权威实现”不明确
- v2 API`mvp_v2`)通过引用复制的 `mvp_v11.RayJobTool` 来提交 Ray Job本质上依赖 v1.1 提交链路作为“库”。
### 1.2 配置与部署目录不稳定
- v2.0 复用了 v1.1 config 文件并新增 `v2:` section这是合理的“向后兼容扩展”但它把
- “Ray submit 基础配置”
- “API 服务配置”
- “部署路径约定(/workspace/mvp/v1.1 vs /workspace/mvp/v2
混在一个文件里,不利于长期维护。
### 1.3 命名歧义jobspec 与 Ray job
- v1.1/v2.0 都使用 `jobspec.yaml` 指代“训练语义参数”PPO/GRPO/SFT 的训练字段)。
- 但 Ray 也有 “Ray Job” 概念submission_id、entrypoint、runtime_env 等),易造成沟通误解。
- 需要把训练侧 specs 改名为 **TaskSpecs**(代表平台级任务规范),与 Ray Job 区分。
---
## 2. 重构目标What good looks like
### 2.1 目录与职责清晰
- “提交 Ray Job 的 SDK 封装”是一个可复用模块(库)。
- “服务层API + scheduler + SQLite”是一个独立模块应用/服务)。
- “训练语义参数TaskSpecs”与 “Ray Job 提交参数RayConfig”分层清楚。
### 2.2 单一真源Single Source of Truth
- 只能有一份“Ray submitter core”实现不能复制一份到另一个版本目录
- API 与 CLI/脚本都复用同一份 core。
### 2.3 兼容现有运行方式(渐进迁移)
- 保留现有的脚本式启动/准备流程Ray 起集群、准备模型/数据仍用 scripts
- 允许在迁移期提供薄 wrapper 兼容旧路径(减少一次性 break
---
## 3. 目标结构(按功能模块划分)
建议把 `src/mvp/` 重构为下面的“功能分层”:
```
src/mvp/
py/
argus/ # 顶层包(避免与 Ray 的 `ray` 包冲突)
__init__.py
core/ # 通用yaml/模型定义/工具函数(纯库)
__init__.py
yaml_io.py
ids.py # task_id / attempt_id 生成规则
ray/ # Ray Job 提交“核心库”(由现成 mvp_v11 迁移而来)
__init__.py
models.py # RayConfig, TaskSpec(解析), Attempt, enums
builders.py # build_training_argv (ppo/grpo/sft)
driver_entrypoint.py # 仍然作为 Ray job entrypointworker 上执行)
ray_job_tool.py # Ray Jobs SDK 封装submit/status/stop/logs
runtime_env.py # 统一 PYTHONPATH/runtime_env 组装逻辑
service/ # 服务层FastAPI + scheduler + sqlite应用
__init__.py
app.py
scheduler.py
db.py
config.py # service 相关配置读取(从 configs 读取)
ray_resources.py
cli/ # 命令行/SDK 提交入口(由现成 v1.1 run.py 迁移而来)
__init__.py
run.py # submit/status/logs/stop 等 action
server.py # uvicorn 入口(导入 argus.service.*
configs/
dev.yaml # RayConfig + ServiceConfig按层次组织、可扩展
prod.yaml # (可选)生产配置模板
taskspecs/ # 原 jobspecs/,改名 TaskSpecs训练语义规范
ppo.yaml
grpo.yaml
sft.yaml
README.md # TaskSpec 字段解释、示例
scripts/ # 宿主机脚本docker exec/compose 编排)
lib.sh
00_prereq_check.sh
01_up.sh / 02_down.sh
20_start_head.sh / 21_start_workers.sh
30_prepare_data_and_model.sh
40_submit_cli.sh # 通过 cli/run.py 提交 TaskSpec
60_start_api.sh # 启动 APIservice
61_stop_api.sh
62_status_api.sh
docker-compose.yaml # dev 环境 compose从 v1.1 迁移到这里,路径稳定)
README.md # 总入口文档(运行方式、目录约定)
```
### 3.1 关键点:库 vs 应用边界
- `argus.ray` 是唯一的 Ray submitter 库(替代当前 v1.1/v2.0 的 `mvp_v11` 双份拷贝)。
- `argus.service` 依赖 `argus.ray`,不反向依赖。
- `argus.cli` 依赖 `argus.ray`,用于脚本化提交/调试。
### 3.2 TaskSpecs vs RayConfig
- `taskspecs/*.yaml`描述训练任务语义参数workload、nnodes、n_gpus_per_node、数据/模型路径、训练步数等)。
- `configs/*.yaml`:描述 Ray 提交环境address、entrypoint_resources、runtime_env 以及 service 配置)。
---
## 4. 配置策略(重构后如何组织 configs
### 4.1 建议的 config 分层
把当前 `dev.yaml` 的内容明确分为两段(按模块名分段):
1) `ray:`RayConfig
- job server address
- shared_root`/private`
- entrypoint resources强制 driver 落 worker
- runtime_env env_varsHF cache、PYTHONPATH 注入策略)
2) `service:`ServiceConfig
- api host/port
- auth token_env
- sqlite db_path
- scheduler tick/max_running/retry_interval
示例(结构示意):
```yaml
ray:
address: "http://127.0.0.1:8265"
shared_root: "/private"
entrypoint_num_cpus: 1
entrypoint_resources:
worker_node: 1
runtime_env:
env_vars:
HF_ENDPOINT: "https://hf-mirror.com"
PYTHONUNBUFFERED: "1"
user_code_path: "/private/user/code"
service:
api:
host: "0.0.0.0"
port: 8080
auth:
token_env: "MVP_INTERNAL_TOKEN"
sqlite:
db_path: "/private/common/db/mvp.sqlite3"
scheduler:
tick_s: 5
retry_interval_s: 60
max_running_tasks: 1
```
> 迁移期可以支持“旧格式”v1.1 顶层字段 + v2: 段与“新格式”ray:/service: 两段)并存:解析时兼容读取,降低一次性改动风险;但最终以新格式为准。
---
## 5. 迁移路径(推荐分两阶段实施)
### 阶段 A先拷贝/迁移现成文件,再做最小调整(保持行为不变)
目标:不改功能、不改 API 行为。优先通过“拷贝/迁移现成文件 + 修正包引用/路径”完成重构,避免重头重写逻辑(降低出错概率)。
建议步骤:
1) 抽出 `src/mvp/py/argus/ray/`(由现成代码迁移)
- 将 `src/mvp/v1.1/py/mvp_v11/` 迁移到 `src/mvp/py/argus/ray/`,并把它作为 submitter core 的唯一真源(不再保留一份复制品在其它目录)。
- 只做机械化调整:修正 import、修正默认路径常量例如 tool code path / working dir、修正 scripts 的调用路径。
2) 抽出 `src/mvp/py/argus/service/`(由现成代码迁移)
- 将 `src/mvp/v2.0/py/mvp_v2/` 迁移到 `src/mvp/py/argus/service/`
- service 侧对 submitter 的依赖统一改为 `src/mvp/py/argus/ray/`(不再引用 `src/mvp/v2.0/py/mvp_v11/` 的复制品)。
3) CLI 统一入口:`src/mvp/py/argus/cli/run.py`(由现成代码迁移)
- 将 `src/mvp/v1.1/py/run.py` 迁移到 `src/mvp/py/argus/cli/run.py`,保留 action 语义submit/status/logs/stop
- 仅调整 import 与默认路径使其指向新目录configs/taskspecs/py
4) scripts 合并(以 v1.1 为基、合入 v2 API
- 将 `src/mvp/v1.1/scripts/` 迁移到 `src/mvp/scripts/`Ray 集群编排最成熟)。
- 将 `src/mvp/v2.0/scripts/` 的 API 启停脚本合入 `src/mvp/scripts/`,并统一命名与默认路径。
5) docker-compose / mounts 稳定化(你已确认要迁移)
- 将 `src/mvp/v1.1/docker-compose.yaml` 迁移为 `src/mvp/docker-compose.yaml`
- 容器内挂载统一:宿主机 `.../src/mvp/` → 容器 `/workspace/mvp/`(包含 `py/ configs/ taskspecs/ scripts/`)。
- runtime_env 的 `PYTHONPATH` 注入统一指向 `/workspace/mvp/py`(不再出现 `/workspace/mvp/v1.1/py``/workspace/mvp/v2/...`)。
阶段 A 完成标准:
- 原来 v1.1 的 CLI 提交方式仍可用(提交 PPO/GRPO/SFT
- v2 API 仍可用(队列、取消、日志)。
- 不再存在 `mvp_v11` 的重复目录。
### 阶段 B配置格式升级按模块两段+ TaskSpecs 更名落地
目标:把 jobspec 真正改名为 TaskSpec并把 config 升级为按模块两段(`ray:`/`service:`)清晰分层。
建议步骤:
- `jobspecs/``taskspecs/`,并更新 README/脚本引用。
- `dev.yaml` 从“顶层字段 + v2:”迁移为“ray:/service:”两段。
- 保留一段时间的兼容解析逻辑(读取旧格式时发出 warning
- 完成验证后删除旧版本目录:`src/mvp/v1.1/``src/mvp/v2.0/`(以及远端对应目录),确保新结构成为唯一入口。
阶段 B 完成标准:
- docs 里不再出现 “jobspec” 词汇,统一称 “TaskSpec”。
- `configs/dev.yaml` 分层清晰(`ray:`/`service:` 两段按模块名),服务与 Ray 的配置互不干扰。
---
## 6. 重构后的验证与执行方案(验收/回归)
### 6.1 本地(仓库内)静态验证
1) import / 入口检查(在容器环境)
- `python3 -c "from argus.ray.ray_job_tool import RayJobTool"`
- `python3 -c "from argus.service.app import create_app"`
2) 目录结构检查
- 确保 `src/mvp/py` 是唯一 python 代码根
- 确保 `taskspecs/``configs/``scripts/` 都在 `src/mvp/`
### 6.2 dev 环境argus@h1)端到端验证
前置:
- 远端目录:`argus@h1:/home2/argus/infra/mvp/`(维持不变)
- 共享目录:`/home2/argus/infra/mvp/shared` 挂载到容器 `/private`
验证步骤(推荐顺序):
1) 重新拉起容器 + 启动 Ray
- `scripts/01_up.sh`
- `scripts/20_start_head.sh`head `--num-cpus=0 --num-gpus=0`
- `scripts/21_start_workers.sh`workers 带 `worker_node` 资源)
- `ray status` / `ray list nodes` 确认 head 无 GPU、workers 各 4 GPU
2) CLI 提交回归(等价 v1.1
- 提交 `taskspecs/sft.yaml`,确认成功
- 提交 `taskspecs/grpo.yaml`,确认成功
- 提交 `taskspecs/ppo.yaml`,确认成功
3) API 服务回归(等价 v2.0
- `scripts/60_start_api.sh`
- `POST /api/v2/tasks`raw YAML TaskSpec
- `GET /api/v2/tasks/{task_id}`:确认返回 `created_at/updated_at`
- `POST /api/v2/tasks/{task_id}:cancel`:确认任务 `state=CANCELED` 且 attempt `ray_status=STOPPED`(服务侧语义一致)
4) 队列行为回归
- 在 `service.scheduler.max_running_tasks=1` 下:
- 连续提交两个 8-GPU 任务:第二个应保持 `QUEUED/PENDING_RESOURCES`,直到第一个结束后自动提交。
验收标准:
- 三种 workloadPPO/GRPO/SFT都能通过 CLI 跑通(或至少能正确提交到 Ray 并进入 RUNNING
- API 提交/查询/取消/日志正常。
- “cancel 后 state=CANCELED 但 attempt 仍 RUNNING”的不一致问题不再出现。
---
## 7. 风险与注意事项
1) PYTHONPATH 注入路径变化
- 当前 runtime_env 里有 `MVP_TOOL_CODE_PATH=/workspace/mvp/v1.1/py` 的历史路径假设;
- 重构后需统一为 `/workspace/mvp/py`,并确保所有容器都挂载到相同路径。
2) SQLite WAL 在 NFS 上的稳定性
- 目前 db 使用 WAL生成 `-wal/-shm`NFS 下可能有一致性风险;
- 可作为后续优化:检测 NFS 时退回 `journal_mode=DELETE` 或换成单机本地盘。
3) 渐进迁移的兼容期
- 迁移期可以短暂保留旧路径(例如 `src/mvp/v1.1``src/mvp/v2.0` 仅保留 README 指向新路径)以减少一次性断裂;但你已确认最终会删除这两个目录,因此需要在 scripts/文档同步更新后尽快清理。
---
## 8. 已确认约束(将按此实施)
你已明确:
1) `docker-compose.yaml` 必须迁移到 `src/mvp/` 根;重构完成后 `src/mvp/v1.1``src/mvp/v2.0` 都会删除。
2) `configs/dev.yaml` 升级为两段,按模块名分段:`ray:``service:`;并保持纯 YAML 风格(不混用 JSON inline map
3) TaskSpec 字段先保持与现有 v1.1 YAML 完全一致(仅目录与命名变化),避免引入不必要的不兼容。

17
src/mvp/README.md Normal file
View File

@ -0,0 +1,17 @@
# MVP模块化结构
本目录是 MVP 的“按功能模块”组织后的最终入口(替代原 `v1.1/``v2.0/` 版本目录)。
目录:
- `py/`Python 代码(`argus.*` 包)
- `argus.ray`Ray Jobs SDK submitter core提交/查询/停止/日志)
- `argus.service`API + scheduler + SQLite服务层队列/重试/状态聚合)
- `argus.cli`CLI 工具(用于脚本化提交/调试)
- `configs/`:配置(`ray:``service:` 两段,纯 YAML 风格)
- `taskspecs/`TaskSpec训练任务语义参数
- `scripts/`宿主机脚本docker compose/exec 编排)
- `docker-compose.yaml`dev 环境容器定义head+workers
快速开始:
- CLI 提交流程:`scripts/run_all_cli.sh`
- API 提交流程:`scripts/run_all_api.sh`

34
src/mvp/configs/dev.yaml Normal file
View File

@ -0,0 +1,34 @@
ray:
# Ray Job server address (head 容器内视角)
address: "http://127.0.0.1:8265"
# 共享根路径(容器内统一 /private对齐生产
shared_root: "/private"
# 强制 driver 落 workerhead 不跑训练)
entrypoint_num_cpus: 1
entrypoint_resources:
worker_node: 1
# 所有 job 通用 runtime env
runtime_env:
env_vars:
HF_ENDPOINT: "https://hf-mirror.com"
PYTHONUNBUFFERED: "1"
# 用户自定义代码目录(可被 PYTHONPATH 注入)
user_code_path: "/private/user/code"
service:
api:
host: "0.0.0.0"
port: 8080
auth:
token_env: "MVP_INTERNAL_TOKEN"
sqlite:
db_path: "/private/common/db/mvp.sqlite3"
scheduler:
tick_s: 5
retry_interval_s: 60
max_running_tasks: 1

View File

@ -3,16 +3,18 @@ version: "3.8"
services:
ray_head:
image: verlai/verl:sgl055.latest
container_name: mvp11-ray-head
container_name: argus-ray-head
command: sleep infinity
ports:
- "8265:8265"
- "8080:8080"
volumes:
- ../verl:/workspace/verl
- ../shared:/private
- .:/workspace/mvp/v1.1
- ../v2:/workspace/mvp/v2
# NOTE: this compose file is intended for the dev env layout like:
# /home2/argus/infra/mvp/{shared,verl,src/mvp}
# so from `src/mvp/` the shared/verl dirs are `../../shared` and `../../verl`.
- ../../verl:/workspace/verl
- ../../shared:/private
- .:/workspace/mvp
shm_size: "10g"
ulimits:
nofile:
@ -22,7 +24,7 @@ services:
- SYS_ADMIN
- SYS_PTRACE
networks:
- mvp11-ray-net
- argus-ray-net
environment:
HF_HOME: "/private/hf"
HUGGINGFACE_HUB_CACHE: "/private/hf/hub"
@ -32,12 +34,12 @@ services:
ray_worker_0:
image: verlai/verl:sgl055.latest
container_name: mvp11-ray-worker-0
container_name: argus-ray-worker-0
command: sleep infinity
volumes:
- ../verl:/workspace/verl
- ../shared:/private
- .:/workspace/mvp/v1.1
- ../../verl:/workspace/verl
- ../../shared:/private
- .:/workspace/mvp
shm_size: "10g"
ulimits:
nofile:
@ -47,7 +49,7 @@ services:
- SYS_ADMIN
- SYS_PTRACE
networks:
- mvp11-ray-net
- argus-ray-net
runtime: nvidia
environment:
NVIDIA_VISIBLE_DEVICES: "0,1,2,3"
@ -60,12 +62,12 @@ services:
ray_worker_1:
image: verlai/verl:sgl055.latest
container_name: mvp11-ray-worker-1
container_name: argus-ray-worker-1
command: sleep infinity
volumes:
- ../verl:/workspace/verl
- ../shared:/private
- .:/workspace/mvp/v1.1
- ../../verl:/workspace/verl
- ../../shared:/private
- .:/workspace/mvp
shm_size: "10g"
ulimits:
nofile:
@ -75,7 +77,7 @@ services:
- SYS_ADMIN
- SYS_PTRACE
networks:
- mvp11-ray-net
- argus-ray-net
runtime: nvidia
environment:
NVIDIA_VISIBLE_DEVICES: "4,5,6,7"
@ -87,5 +89,5 @@ services:
PYTHONUNBUFFERED: "1"
networks:
mvp11-ray-net:
argus-ray-net:
driver: bridge

View File

@ -0,0 +1,2 @@
__all__ = []

View File

@ -8,22 +8,25 @@ import sys
def _ensure_import_path() -> None:
# Allow `python3 /workspace/.../py/run.py` to import `mvp_v11.*`
# Allow running as a file path (not just `python3 -m argus.cli.run`).
# Ensure `/workspace/mvp/py` is on sys.path so `import argus.*` works.
here = os.path.dirname(os.path.abspath(__file__))
if here not in sys.path:
sys.path.insert(0, here)
py_root = os.path.dirname(os.path.dirname(here)) # .../py
if py_root not in sys.path:
sys.path.insert(0, py_root)
def main() -> int:
_ensure_import_path()
from mvp_v11.models import JobSpec, RayConfig
from mvp_v11.ray_job_tool import RayJobTool
from mvp_v11.yaml_io import load_yaml
from argus.ray.models import JobSpec, RayConfig
from argus.ray.ray_job_tool import RayJobTool
from argus.ray.yaml_io import load_yaml
parser = argparse.ArgumentParser()
parser.add_argument("--config", required=True, help="Ray base config yaml")
parser.add_argument("--jobspec", help="Training jobspec yaml (required for submit)")
parser.add_argument("--config", required=True, help="MVP config yaml (ray/service)")
parser.add_argument("--taskspec", help="Training TaskSpec yaml (required for submit)")
parser.add_argument("--jobspec", dest="taskspec", help=argparse.SUPPRESS) # backward compatible alias
parser.add_argument("--action", required=True, choices=["submit", "status", "stop", "logs", "list"])
parser.add_argument("--submission-id", help="For status/stop/logs")
parser.add_argument("--no-wait", action="store_true", help="Submit and return immediately")
@ -34,9 +37,9 @@ def main() -> int:
tool = RayJobTool(cfg)
if args.action == "submit":
if not args.jobspec:
raise SystemExit("--jobspec is required for submit")
spec = JobSpec.from_dict(load_yaml(args.jobspec))
if not args.taskspec:
raise SystemExit("--taskspec is required for submit")
spec = JobSpec.from_dict(load_yaml(args.taskspec))
submitted = tool.submit(spec, no_wait=args.no_wait)
print(submitted)
return 0

View File

@ -0,0 +1,2 @@
__all__ = []

View File

@ -21,6 +21,11 @@ class RayConfig:
@staticmethod
def from_dict(d: dict[str, Any]) -> "RayConfig":
# New format: root.ray
# Backward compatible: root (flat)
if isinstance(d.get("ray"), dict):
d = d["ray"] # type: ignore[assignment]
runtime_env = d.get("runtime_env") or {}
env_vars = (runtime_env.get("env_vars") or {}) if isinstance(runtime_env, dict) else {}
if not isinstance(env_vars, dict):

View File

@ -53,9 +53,9 @@ class RayJobTool:
env_vars.setdefault("TRANSFORMERS_CACHE", f"{self.cfg.shared_root}/hf/transformers")
env_vars.setdefault("PYTHONUNBUFFERED", "1")
# Tool code path must be importable on workers (compose mounts v1.1 into all containers).
# Tool code path must be importable on workers (compose mounts `/workspace/mvp` into all containers).
# Place it before verl code to avoid interfering with verl import priority.
tool_code_path = os.environ.get("MVP_TOOL_CODE_PATH", "/workspace/mvp/v1.1/py")
tool_code_path = os.environ.get("MVP_TOOL_CODE_PATH", "/workspace/mvp/py")
user_code_path = self.cfg.user_code_path
code_path = spec.code_path
@ -81,7 +81,7 @@ class RayJobTool:
entrypoint_argv = [
"python3",
"-m",
"mvp_v11.driver_entrypoint",
"argus.ray.driver_entrypoint",
"--job-dir",
job_dir,
*built.argv,

View File

@ -0,0 +1,2 @@
__all__ = []

View File

@ -7,11 +7,11 @@ from typing import Any
import yaml
from fastapi import FastAPI, HTTPException, Request, Response
from mvp_v11.models import JobSpec, RayConfig
from argus.core.ids import new_task_id
from argus.ray.models import JobSpec, RayConfig
from .config import V2Config
from .db import Db
from .ids import new_task_id
from .scheduler import Scheduler

View File

@ -36,20 +36,29 @@ class V2Config:
@staticmethod
def from_root_dict(root: dict[str, Any]) -> "V2Config":
v2 = root.get("v2") or {}
if not isinstance(v2, dict):
raise ValueError("config.v2 must be a mapping")
# New format: root.service
# Backward compatible: root.v2
service = root.get("service")
if service is None:
service = root.get("v2") or {}
if not isinstance(service, dict):
raise ValueError("config.service must be a mapping")
api = v2.get("api") or {}
auth = v2.get("auth") or {}
sqlite = v2.get("sqlite") or {}
scheduler = v2.get("scheduler") or {}
api = service.get("api") or {}
auth = service.get("auth") or {}
sqlite = service.get("sqlite") or {}
scheduler = service.get("scheduler") or {}
if not isinstance(api, dict) or not isinstance(auth, dict) or not isinstance(sqlite, dict) or not isinstance(scheduler, dict):
raise ValueError("config.v2.{api,auth,sqlite,scheduler} must be mappings")
raise ValueError("config.service.{api,auth,sqlite,scheduler} must be mappings")
shared_root = str(root.get("shared_root") or "/private")
default_db_path = f"{shared_root}/common/db/mvp_v2.sqlite3"
ray = root.get("ray")
if isinstance(ray, dict) and ray.get("shared_root"):
shared_root = str(ray.get("shared_root"))
else:
shared_root = str(root.get("shared_root") or "/private")
default_db_path = f"{shared_root}/common/db/mvp.sqlite3"
db_path = str(sqlite.get("db_path") or default_db_path)
return V2Config(
@ -65,4 +74,3 @@ class V2Config:
max_running_tasks=int(scheduler.get("max_running_tasks") or 1),
),
)

View File

@ -8,12 +8,12 @@ from typing import Any
import yaml
from mvp_v11.models import JobSpec, RayConfig
from mvp_v11.ray_job_tool import RayJobTool
from argus.core.ids import attempt_submission_id
from argus.ray.models import JobSpec, RayConfig
from argus.ray.ray_job_tool import RayJobTool
from .config import V2Config
from .db import Db
from .ids import attempt_submission_id
from .ray_resources import ensure_ray_connected, get_cluster_available

View File

@ -5,13 +5,13 @@ import argparse
import uvicorn
from mvp_v2.app import create_app
from mvp_v2.config import V2Config
from argus.service.app import create_app
from argus.service.config import V2Config
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--config", required=True, help="Path to v1.1 RayConfig YAML (extended with v2:)")
parser.add_argument("--config", required=True, help="Path to MVP config YAML (ray:/service:)")
args = parser.parse_args()
# Load app and read v2.api host/port from config.
@ -30,4 +30,3 @@ def main() -> int:
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -27,16 +27,15 @@ else
nvidia-smi -L || true
fi
echo "[host] ensure shared dirs exist under ../shared"
mkdir -p "${ROOT_DIR}/../shared"/{datasets,hf,jobs,outputs,ray,common,user}
mkdir -p "${ROOT_DIR}/../shared/common"/{code,datasets,models}
mkdir -p "${ROOT_DIR}/../shared/user"/{code}
echo "[host] ensure shared dirs exist under ../../shared"
mkdir -p "${ROOT_DIR}/../../shared"/{datasets,hf,jobs,outputs,ray,common,user}
mkdir -p "${ROOT_DIR}/../../shared/common"/{code,datasets,models}
mkdir -p "${ROOT_DIR}/../../shared/user"/{code}
echo "[host] ensure verl repo exists under ../verl (required by prepare scripts)"
if [[ ! -d "${ROOT_DIR}/../verl" ]]; then
echo "missing ../verl. On remote, ensure /home2/argus/infra/mvp/verl exists (git clone volcengine/verl)." >&2
echo "[host] ensure verl repo exists under ../../verl (required by prepare scripts)"
if [[ ! -d "${ROOT_DIR}/../../verl" ]]; then
echo "missing ../../verl. On remote, ensure /home2/argus/infra/mvp/verl exists (git clone volcengine/verl)." >&2
exit 1
fi
echo "ok"

View File

@ -9,8 +9,8 @@ if [[ "${SKIP_CLEANUP_V1:-0}" != "1" ]]; then
"${SCRIPT_DIR}/03_cleanup_v1_legacy.sh" || true
fi
echo "[host] docker compose up -d (v1.1)"
echo "[host] docker compose up -d (mvp)"
dc up -d
echo "[host] containers:"
docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}' | (head -n 1 && grep -E "mvp11-ray-") || true
docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}' | (head -n 1 && grep -E "argus-ray-") || true

View File

@ -5,8 +5,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
echo "[host] docker compose down (v1.1)"
echo "[host] docker compose down (mvp)"
dc down -v || true
echo "[host] done"

View File

@ -5,7 +5,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
VERL_DIR="${ROOT_DIR}/../verl"
VERL_DIR="${ROOT_DIR}/../../verl"
echo "[host] ensure verl repo exists at: ${VERL_DIR}"
if [[ -d "${VERL_DIR}/.git" ]]; then
@ -20,4 +20,3 @@ fi
echo "cloning volcengine/verl -> ${VERL_DIR}"
git clone https://github.com/volcengine/verl.git "${VERL_DIR}"

View File

@ -0,0 +1,12 @@
#!/usr/bin/env bash
set -euo pipefail
# Install API/CLI dependencies inside the head container (best-effort).
# Assumes containers are already up and `/workspace/mvp/` is mounted.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
dexec "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -U pip >/dev/null 2>&1 || true"
dexec "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -r /workspace/mvp/py/requirements.txt"

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
echo "[head] install python deps for CLI/API"
dexec "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -r /workspace/mvp/py/requirements.txt"

View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
CONFIG_PATH="${1:-/workspace/mvp/configs/dev.yaml}"
TASKSPEC_PATH="${2:-}"
if [[ -z "${TASKSPEC_PATH}" ]]; then
echo "usage: $0 <config_yaml_in_container> <taskspec_yaml_in_container>" >&2
echo "example: $0 /workspace/mvp/configs/dev.yaml /workspace/mvp/taskspecs/ppo.yaml" >&2
exit 1
fi
echo "[head] submit via Ray Python SDK"
dexec "${HEAD_CONTAINER}" bash -lc "cd /workspace/mvp/py && python3 -m argus.cli.run --config '${CONFIG_PATH}' --taskspec '${TASKSPEC_PATH}' --action submit --no-wait"

View File

28
src/mvp/scripts/60_start_api.sh Executable file
View File

@ -0,0 +1,28 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
CONFIG_IN_CONTAINER="${CONFIG_IN_CONTAINER:-/workspace/mvp/configs/dev.yaml}"
LOG_PATH="${LOG_PATH:-/private/common/logs/argus_mvp_api.log}"
PID_PATH="${PID_PATH:-/private/common/run/argus_mvp_api.pid}"
echo "[host] starting mvp v2 api in head container: ${HEAD_CONTAINER}"
dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p \"$(dirname "${LOG_PATH}")\" \"$(dirname "${PID_PATH}")\""
# Note: requires /workspace/mvp/py to be present in the container (mounted).
# Escape $ so that the command substitution happens in the container, not on the host.
dexec "${HEAD_CONTAINER}" bash -lc "if test -f '${PID_PATH}'; then pid=\$(cat '${PID_PATH}'); if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo 'already_running'; exit 0; fi; fi"
if [[ -z "${MVP_INTERNAL_TOKEN:-}" ]]; then
echo "ERROR: MVP_INTERNAL_TOKEN env var must be set on host (will be passed into container)" >&2
exit 1
fi
docker exec -d -e MVP_INTERNAL_TOKEN="${MVP_INTERNAL_TOKEN}" "${HEAD_CONTAINER}" bash -lc "nohup python3 /workspace/mvp/py/server.py --config '${CONFIG_IN_CONTAINER}' >>'${LOG_PATH}' 2>&1 & echo \$! >'${PID_PATH}'"
echo "[host] started; pid stored in ${PID_PATH} (container path)"
echo "[host] logs: ${LOG_PATH} (container path)"

12
src/mvp/scripts/61_stop_api.sh Executable file
View File

@ -0,0 +1,12 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
PID_PATH="${PID_PATH:-/private/common/run/argus_mvp_api.pid}"
echo "[host] stopping mvp v2 api (pid file: ${PID_PATH})"
dexec "${HEAD_CONTAINER}" bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then kill \"\${pid}\"; fi; rm -f '${PID_PATH}'; echo 'stopped'"

View File

@ -0,0 +1,10 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
PID_PATH="${PID_PATH:-/private/common/run/argus_mvp_api.pid}"
dexec "${HEAD_CONTAINER}" bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo \"running pid=\${pid}\"; else echo \"stale pid=\${pid}\"; fi"

8
src/mvp/v1.1/scripts/lib.sh → src/mvp/scripts/lib.sh Normal file → Executable file
View File

@ -6,9 +6,10 @@ ROOT_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)"
COMPOSE_FILE="${ROOT_DIR}/docker-compose.yaml"
HEAD_CONTAINER="mvp11-ray-head"
WORKER0_CONTAINER="mvp11-ray-worker-0"
WORKER1_CONTAINER="mvp11-ray-worker-1"
# Container names (refactor)
HEAD_CONTAINER="${HEAD_CONTAINER:-argus-ray-head}"
WORKER0_CONTAINER="${WORKER0_CONTAINER:-argus-ray-worker-0}"
WORKER1_CONTAINER="${WORKER1_CONTAINER:-argus-ray-worker-1}"
SHARED_ROOT="${SHARED_ROOT:-/private}"
RAY_DASHBOARD_ADDR="${RAY_DASHBOARD_ADDR:-http://127.0.0.1:8265}"
@ -49,4 +50,3 @@ container_ip() {
timestamp() {
date +"%Y%m%d_%H%M%S"
}

120
src/mvp/scripts/run_all_api.sh Executable file
View File

@ -0,0 +1,120 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
# Run the end-to-end flow using the API service (argus.service).
# This script restarts the Ray cluster, prepares model/data, starts the API,
# then submits PPO/GRPO/SFT via HTTP API and monitors the queue.
API_ADDR="${API_ADDR:-http://127.0.0.1:8080}"
TOKEN="${MVP_INTERNAL_TOKEN:-}"
DB_IN_CONTAINER="${DB_IN_CONTAINER:-/private/common/db/mvp.sqlite3}"
if [[ -z "${TOKEN}" ]]; then
echo "ERROR: MVP_INTERNAL_TOKEN must be set in the host env for run_all_api.sh" >&2
exit 1
fi
api_curl() {
curl -sS -H "Authorization: Bearer ${TOKEN}" "$@"
}
api_wait_ready() {
local tries="${1:-60}"
for i in $(seq 1 "${tries}"); do
if curl -sS -m 2 "${API_ADDR}/docs" >/dev/null 2>&1; then
echo "[host] api_ready: ${API_ADDR}"
return 0
fi
echo "[host] waiting api... (${i}/${tries})"
sleep 2
done
echo "ERROR: api not ready: ${API_ADDR}" >&2
return 1
}
submit_taskspec() {
local taskspec_path="$1"
echo "[host] submit via API: ${taskspec_path}" >&2
local resp
resp="$(api_curl -H "Content-Type: application/yaml" --data-binary @"${taskspec_path}" "${API_ADDR}/api/v2/tasks")"
echo "[host] submit_resp: ${resp}" >&2
printf '%s' "${resp}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["task_id"])'
}
print_queue() {
echo "[host] queue:"
api_curl "${API_ADDR}/api/v2/queue" || true
echo
}
wait_task() {
local task_id="$1"
while true; do
local body state
body="$(api_curl "${API_ADDR}/api/v2/tasks/${task_id}")"
state="$(printf '%s' "${body}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["state"])')"
echo "[host] task ${task_id}: ${state}"
if [[ "${state}" == "SUCCEEDED" ]]; then
return 0
fi
if [[ "${state}" == "FAILED" || "${state}" == "CANCELED" ]]; then
echo "[host] terminal=${state}; tail logs (best-effort):" >&2
api_curl "${API_ADDR}/api/v2/tasks/${task_id}/logs?tail=200" >&2 || true
return 1
fi
print_queue
sleep 10
done
}
echo "[host] ===== run_all_api.sh begin ====="
"${SCRIPT_DIR}/00_prereq_check.sh"
"${SCRIPT_DIR}/03_cleanup_v1_legacy.sh"
"${SCRIPT_DIR}/05_ensure_verl_repo.sh"
echo "[host] (re)create containers"
"${SCRIPT_DIR}/01_up.sh"
echo "[host] restart ray cluster"
"${SCRIPT_DIR}/20_start_head.sh"
"${SCRIPT_DIR}/21_start_workers.sh"
echo "[host] prepare data/model/code snapshot"
"${SCRIPT_DIR}/30_prepare_data_and_model.sh"
echo "[host] install api deps in head container"
"${SCRIPT_DIR}/12_install_api_deps.sh"
echo "[host] stop api (best-effort)"
"${SCRIPT_DIR}/61_stop_api.sh" || true
echo "[host] reset api sqlite db in container (best-effort): ${DB_IN_CONTAINER}"
dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p \"$(dirname "${DB_IN_CONTAINER}")\"; rm -f '${DB_IN_CONTAINER}' '${DB_IN_CONTAINER}-wal' '${DB_IN_CONTAINER}-shm' || true"
echo "[host] start api"
MVP_INTERNAL_TOKEN="${TOKEN}" "${SCRIPT_DIR}/60_start_api.sh"
api_wait_ready 60
print_queue
PPO_TASK_ID="$(submit_taskspec "${ROOT_DIR}/taskspecs/ppo.yaml")"
GRPO_TASK_ID="$(submit_taskspec "${ROOT_DIR}/taskspecs/grpo.yaml")"
SFT_TASK_ID="$(submit_taskspec "${ROOT_DIR}/taskspecs/sft.yaml")"
echo "[host] submitted task ids:"
echo " ppo=${PPO_TASK_ID}"
echo " grpo=${GRPO_TASK_ID}"
echo " sft=${SFT_TASK_ID}"
echo "[host] wait for tasks (in submission order)"
wait_task "${PPO_TASK_ID}"
wait_task "${GRPO_TASK_ID}"
wait_task "${SFT_TASK_ID}"
echo "[host] ===== run_all_api.sh done ====="

View File

@ -5,13 +5,16 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
# Run the end-to-end flow using the CLI submitter (argus.cli).
# This script restarts the Ray cluster and submits PPO/GRPO/SFT sequentially.
submit_and_wait() {
local jobspec_in_container="$1"
local taskspec_in_container="$1"
local sid
local out
echo "[host] submit via SDK: ${jobspec_in_container}"
out="$(dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config /workspace/mvp/v1.1/py/configs/dev.yaml --jobspec '${jobspec_in_container}' --action submit --no-wait" | tr -d '\r')"
echo "[host] submit via SDK: ${taskspec_in_container}"
out="$(dexec "${HEAD_CONTAINER}" bash -lc "cd /workspace/mvp/py && python3 -m argus.cli.run --config /workspace/mvp/configs/dev.yaml --taskspec '${taskspec_in_container}' --action submit --no-wait" | tr -d '\r')"
sid="$(printf '%s\n' "${out}" | tail -n 1)"
if [[ -z "${sid}" ]]; then
@ -23,7 +26,7 @@ submit_and_wait() {
echo "[host] submitted: ${sid}"
while true; do
st="$(dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config /workspace/mvp/v1.1/py/configs/dev.yaml --action status --submission-id '${sid}'" | tr -d '\r' | tail -n 1)"
st="$(dexec "${HEAD_CONTAINER}" bash -lc "cd /workspace/mvp/py && python3 -m argus.cli.run --config /workspace/mvp/configs/dev.yaml --action status --submission-id '${sid}'" | tr -d '\r' | tail -n 1)"
echo "[host] status: ${sid} -> ${st}"
case "${st}" in
*SUCCEEDED*)
@ -32,7 +35,7 @@ submit_and_wait() {
*FAILED*|*STOPPED*)
echo "[host] job failed: ${sid} (${st})" >&2
echo "[host] last logs:" >&2
dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config /workspace/mvp/v1.1/py/configs/dev.yaml --action logs --submission-id '${sid}' --tail 200" >&2 || true
dexec "${HEAD_CONTAINER}" bash -lc "cd /workspace/mvp/py && python3 -m argus.cli.run --config /workspace/mvp/configs/dev.yaml --action logs --submission-id '${sid}' --tail 200" >&2 || true
return 1
;;
*)
@ -50,7 +53,7 @@ submit_and_wait() {
"${SCRIPT_DIR}/21_start_workers.sh"
"${SCRIPT_DIR}/30_prepare_data_and_model.sh"
"${SCRIPT_DIR}/12_install_py_deps.sh"
submit_and_wait /workspace/mvp/v1.1/py/jobspecs/ppo.yaml
submit_and_wait /workspace/mvp/v1.1/py/jobspecs/grpo.yaml
submit_and_wait /workspace/mvp/v1.1/py/jobspecs/sft.yaml
submit_and_wait /workspace/mvp/taskspecs/ppo.yaml
submit_and_wait /workspace/mvp/taskspecs/grpo.yaml
submit_and_wait /workspace/mvp/taskspecs/sft.yaml
"${SCRIPT_DIR}/50_status.sh"

View File

@ -0,0 +1,10 @@
# TaskSpecs
这里的 `*.yaml`**TaskSpec**(训练任务语义参数),用于描述要跑的 workloadPPO/GRPO/SFT以及数据/模型路径、训练步数、分布式规模等。
注意区分:
- **TaskSpec**:训练语义(本目录)
- **Ray Job**Ray Jobs 的 submission由 submitter 将 TaskSpec 转换为 Ray Job entrypoint + runtime_env 等)
字段保持与原 v1.1 `jobspecs/` 一致(迁移期不做字段改名/规范化,以降低风险)。

View File

@ -1,65 +0,0 @@
# MVP v1.1GRPO + SFT on Ray运行说明
本目录是一套**独立可运行**的 v1.1 交付:使用 1 个 Ray head不跑训练+ 2 个 Ray worker各 4 GPU在同一宿主机通过 `docker exec` 协调容器,并通过 **head 上的 `ray job submit`** 提交作业,同时强制 driver 落到 worker。
> 远程 dev 环境推荐目录布局:
>
> - `/home2/argus/infra/mvp/`
> - `shared/`持久化datasets/hf/jobs/...
> - `verl/`(代码仓库,用于 prepare / snapshot
> - `v1.1/`本目录内容compose + scripts
---
## 快速开始(远程机 argus@h1
`/home2/argus/infra/mvp/v1.1/` 下执行:
```bash
./scripts/run_all.sh
```
说明:
- `run_all.sh` 会按顺序提交 `ppo -> grpo -> sft`,并等待每个 job 结束后再提交下一个(避免 8 卡集群并发提交导致 “available GPUs 0” 直接失败)。
等价的“分步执行”:
```bash
./scripts/00_prereq_check.sh
./scripts/03_cleanup_v1_legacy.sh
./scripts/05_ensure_verl_repo.sh
./scripts/01_up.sh
./scripts/20_start_head.sh
./scripts/21_start_workers.sh
./scripts/30_prepare_data_and_model.sh
./scripts/12_install_py_deps.sh
./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/ppo.yaml # no-wait
./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/grpo.yaml # no-wait
./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/sft.yaml # no-wait
./scripts/50_status.sh
```
停止并清理:
```bash
./scripts/02_down.sh
```
---
## 关键约束(必须满足)
- **必须通过 head 执行 `ray job submit`** 提交任务(满足“从 head 提交”要求)。
- **head 不跑训练**head 以 `--num-cpus=0 --num-gpus=0` 启动worker 具备自定义资源 `worker_node`;提交时 `--entrypoint-resources='{"worker_node": 1}'` 强制 driver 落 worker。
- **共享路径统一为 `/private`(容器内)**compose 将宿主机 `../shared` 挂载到容器内 `/private`,对齐生产环境。
- **job 级别 code_path**:训练 JobSpec 中的 `code_path` 指向 `/private/common/code/verl/verl_repo`(由 `scripts/30_prepare_data_and_model.sh` 准备)。
---
## 共享目录(容器内 /private
- `/private/datasets/`数据PPO 的 gsm8k RL parquet、SFT parquet
- `/private/hf/`HF 缓存(模型持久化,避免重复下载)
- `/private/jobs/<submission_id>/`:每个 Ray Job 的输出目录logs/config/debug/checkpoints
- `/private/common/`:共享区(模型/数据/代码快照)
- `/private/user/`:用户自定义代码(例如 reward_fn

View File

@ -1,38 +0,0 @@
# Ray 基础配置dev 环境 / head 容器内视角)
#
# 说明:
# - v1.1 的 SDK submitter 会读取本文件作为 RayConfig。
# - v2.0 的 API 服务/调度器也复用本文件作为“基础 RayConfig”并在其上扩展 v2 专属配置项(见 v2:)。
address: "http://127.0.0.1:8265"
# 容器内共享根路径(对齐生产 /private
shared_root: "/private"
# 强制 driver 落 workerhead 不跑训练)
entrypoint_num_cpus: 1
entrypoint_resources:
worker_node: 1
# 运行时环境变量(所有 job 通用)
runtime_env:
env_vars:
HF_ENDPOINT: "https://hf-mirror.com"
PYTHONUNBUFFERED: "1"
# 用户自定义代码目录(可被 PYTHONPATH 注入)
user_code_path: "/private/user/code"
# v2.0 服务层配置v1.1 submitter 会忽略这些字段v2.0 服务会读取)
v2:
api:
host: "0.0.0.0"
port: 8080
auth:
# 内部 token 建议通过环境变量提供,避免写入配置文件
token_env: "MVP_INTERNAL_TOKEN"
sqlite:
db_path: "/private/common/db/mvp_v2.sqlite3"
scheduler:
tick_s: 5
retry_interval_s: 60
max_running_tasks: 1

View File

@ -1,121 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
def _require(d: dict[str, Any], key: str) -> Any:
if key not in d or d[key] in (None, ""):
raise ValueError(f"missing required field: {key}")
return d[key]
@dataclass(frozen=True)
class RayConfig:
address: str
shared_root: str
entrypoint_num_cpus: float
entrypoint_resources: dict[str, float]
runtime_env_env_vars: dict[str, str]
user_code_path: str
@staticmethod
def from_dict(d: dict[str, Any]) -> "RayConfig":
runtime_env = d.get("runtime_env") or {}
env_vars = (runtime_env.get("env_vars") or {}) if isinstance(runtime_env, dict) else {}
if not isinstance(env_vars, dict):
raise ValueError("runtime_env.env_vars must be a mapping")
entrypoint_resources = d.get("entrypoint_resources") or {}
if not isinstance(entrypoint_resources, dict):
raise ValueError("entrypoint_resources must be a mapping")
return RayConfig(
address=str(_require(d, "address")),
shared_root=str(_require(d, "shared_root")),
entrypoint_num_cpus=float(d.get("entrypoint_num_cpus", 1)),
entrypoint_resources={str(k): float(v) for k, v in entrypoint_resources.items()},
runtime_env_env_vars={str(k): str(v) for k, v in env_vars.items()},
user_code_path=str(d.get("user_code_path", f"{_require(d, 'shared_root')}/user/code")),
)
def to_public_dict(self) -> dict[str, Any]:
return {
"address": self.address,
"shared_root": self.shared_root,
"entrypoint_num_cpus": self.entrypoint_num_cpus,
"entrypoint_resources": self.entrypoint_resources,
"runtime_env": {"env_vars": self.runtime_env_env_vars},
"user_code_path": self.user_code_path,
}
@dataclass(frozen=True)
class JobSpec:
workload: str # ppo|grpo|sft
submission_id: str | None
code_path: str
model_id: str
train_file: str
val_file: str | None
nnodes: int
n_gpus_per_node: int
total_epochs: int
total_training_steps: int
save_freq: int
test_freq: int | None
trainer_device: str | None # only for sft (driver-side device)
@staticmethod
def from_dict(d: dict[str, Any]) -> "JobSpec":
workload = str(_require(d, "workload"))
if workload not in ("ppo", "grpo", "sft"):
raise ValueError(f"unsupported workload: {workload}")
val_file = d.get("val_file", None)
if val_file in ("", "null"):
val_file = None
test_freq = d.get("test_freq", None)
if test_freq in ("", "null"):
test_freq = None
return JobSpec(
workload=workload,
submission_id=(str(d["submission_id"]) if d.get("submission_id") else None),
code_path=str(_require(d, "code_path")),
model_id=str(_require(d, "model_id")),
train_file=str(_require(d, "train_file")),
val_file=(str(val_file) if val_file is not None else None),
nnodes=int(d.get("nnodes", 2)),
n_gpus_per_node=int(d.get("n_gpus_per_node", 4)),
total_epochs=int(d.get("total_epochs", 1)),
total_training_steps=int(d.get("total_training_steps", 10)),
save_freq=int(d.get("save_freq", 10)),
test_freq=(int(test_freq) if test_freq is not None else None),
trainer_device=(str(d.get("trainer_device")) if d.get("trainer_device") else None),
)
def to_public_dict(self) -> dict[str, Any]:
out: dict[str, Any] = {
"workload": self.workload,
"submission_id": self.submission_id or "",
"code_path": self.code_path,
"model_id": self.model_id,
"train_file": self.train_file,
"val_file": self.val_file,
"nnodes": self.nnodes,
"n_gpus_per_node": self.n_gpus_per_node,
"total_epochs": self.total_epochs,
"total_training_steps": self.total_training_steps,
"save_freq": self.save_freq,
"test_freq": self.test_freq,
}
if self.workload == "sft":
out["trainer_device"] = self.trainer_device or "cpu"
return out

View File

@ -1,2 +0,0 @@
PyYAML>=6.0.1

View File

@ -1,10 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
echo "[head] install python deps for v1.1 SDK submitter (PyYAML)"
dexec "${HEAD_CONTAINER}" bash -lc "pip install -r /workspace/mvp/v1.1/py/requirements.txt"

View File

@ -1,19 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
CONFIG_PATH="${1:-/workspace/mvp/v1.1/py/configs/dev.yaml}"
JOBSPEC_PATH="${2:-}"
if [[ -z "${JOBSPEC_PATH}" ]]; then
echo "usage: $0 <ray_config_yaml_in_container> <jobspec_yaml_in_container>" >&2
echo "example: $0 /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/ppo.yaml" >&2
exit 1
fi
echo "[head] submit via Ray Python SDK"
dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config '${CONFIG_PATH}' --jobspec '${JOBSPEC_PATH}' --action submit --no-wait"

View File

@ -1,104 +0,0 @@
# MVP v2.0(服务化入口)
v2.0 在 v1.1Ray Jobs SDK 提交链路)基础上新增一个**服务层**
- HTTP API 提交任务PPO/GRPO/SFT
- 服务侧队列 + gang 资源判断
- 识别 `verl` fail-fast 的资源不足失败并自动重试
- SQLite 持久化队列/状态/attemptNFS`/private`
设计文档见:
- `specs/mvp/v2.0/v2_plan.md`
- `specs/mvp/v2.0/v2_api.md`
## 快速开始dev 示例)
约定:
- Ray 容器仍由 v1.1 的 `docker-compose.yaml` 启动head+2 workers
- v2 代码在宿主机:`/home2/argus/infra/mvp/v2/`(容器内挂载 `/workspace/mvp/v2/`
- v2 配置复用 v1.1`/workspace/mvp/v1.1/py/configs/dev.yaml`(扩展了 `v2:` 段)
宿主机执行:
```bash
export MVP_INTERNAL_TOKEN=... # 内部 token
cd /home2/argus/infra/mvp/v2/scripts
./12_install_v2_deps.sh
./20_start_api.sh
./22_status_api.sh
```
API 测试(宿主机):
```bash
curl -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" http://127.0.0.1:8080/api/v2/queue
```
> 进程日志与 pid容器内路径默认在 `/private/common/logs/``/private/common/run/`
## 提交/查询/停止任务
约定:
- API 地址(宿主机视角):`http://127.0.0.1:8080`
- 鉴权:`Authorization: Bearer ${MVP_INTERNAL_TOKEN}`
- 请求体:**raw YAML**JobSpec格式与 v1.1 jobspec 一致)
### 1) 提交任务POST /api/v2/tasks
准备一个 jobspec示例PPO
```yaml
workload: "ppo"
submission_id: "" # v2 会忽略/覆盖,自动生成 task_id 并派生 ray_submission_id
code_path: "/private/common/code/verl/verl_repo"
model_id: "Qwen/Qwen2.5-0.5B-Instruct"
train_file: "/private/datasets/gsm8k/train.parquet"
val_file: "/private/datasets/gsm8k/test.parquet"
nnodes: 2
n_gpus_per_node: 4
total_epochs: 1
total_training_steps: 10
save_freq: 10
test_freq: -1
trainer_device: null
```
提交:
```bash
curl -sS \
-H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \
-H "Content-Type: application/yaml" \
--data-binary @jobspec.yaml \
http://127.0.0.1:8080/api/v2/tasks
```
返回示例:
```json
{"task_id":"mvp2-ppo-20251223-082813-6426","state":"QUEUED"}
```
### 2) 查询任务GET /api/v2/tasks/{task_id}
```bash
curl -sS \
-H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \
http://127.0.0.1:8080/api/v2/tasks/<task_id> | python3 -m json.tool
```
可选:
- 查看 attempts`GET /api/v2/tasks/{task_id}/attempts`
- 拉取日志latest attempt`GET /api/v2/tasks/{task_id}/logs?tail=2000`
- 查看队列:`GET /api/v2/queue`
### 3) 停止/取消任务POST /api/v2/tasks/{task_id}:cancel
```bash
curl -sS -X POST \
-H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \
http://127.0.0.1:8080/api/v2/tasks/<task_id>:cancel
```
说明:
- 若任务已提交到 Ray`SUBMITTED/RUNNING`),服务会调用 Ray Jobs SDK `stop_job(ray_submission_id)`
- 若任务仍在队列(`QUEUED/PENDING_RESOURCES`),服务直接标记 `CANCELED`(不会产生 attempt

View File

@ -1 +0,0 @@

View File

@ -1,96 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from .models import JobSpec
@dataclass(frozen=True)
class BuiltCommand:
argv: list[str]
def build_training_argv(spec: JobSpec, submission_id: str, job_dir: str) -> BuiltCommand:
"""
Returns argv for the actual training process (Hydra overrides preserved).
This argv is executed by a lightweight Python driver entrypoint.
"""
if spec.workload in ("ppo", "grpo"):
algo_overrides: list[str] = []
if spec.workload == "grpo":
algo_overrides.append("algorithm.adv_estimator=grpo")
test_freq = spec.test_freq if spec.test_freq is not None else -1
val_file = spec.val_file if spec.val_file is not None else "null"
argv = [
"python3",
"-m",
"verl.trainer.main_ppo",
f"data.train_files={spec.train_file}",
f"data.val_files={val_file}",
"data.train_batch_size=256",
"data.max_prompt_length=512",
"data.max_response_length=512",
f"actor_rollout_ref.model.path={spec.model_id}",
"actor_rollout_ref.actor.optim.lr=1e-6",
"actor_rollout_ref.actor.ppo_mini_batch_size=64",
"actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4",
"actor_rollout_ref.rollout.name=sglang",
"actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8",
"actor_rollout_ref.rollout.tensor_model_parallel_size=1",
"actor_rollout_ref.rollout.gpu_memory_utilization=0.4",
"actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4",
"critic.optim.lr=1e-5",
f"critic.model.path={spec.model_id}",
"critic.ppo_micro_batch_size_per_gpu=4",
"algorithm.kl_ctrl.kl_coef=0.001",
*algo_overrides,
"trainer.logger=console",
"trainer.val_before_train=False",
f"trainer.n_gpus_per_node={spec.n_gpus_per_node}",
f"trainer.nnodes={spec.nnodes}",
f"trainer.save_freq={spec.save_freq}",
f"trainer.test_freq={test_freq}",
f"trainer.total_epochs={spec.total_epochs}",
f"trainer.total_training_steps={spec.total_training_steps}",
"trainer.resume_mode=disable",
f"trainer.default_local_dir={job_dir}/checkpoints",
"+ray_kwargs.ray_init.address=auto",
f"hydra.run.dir={job_dir}/logs/hydra",
]
return BuiltCommand(argv=argv)
if spec.workload == "sft":
val_override = "null" if spec.val_file is None else spec.val_file
trainer_device = spec.trainer_device or "cpu"
argv = [
"python3",
"-m",
"verl.trainer.sft_trainer_ray",
f"model.path={spec.model_id}",
f"data.train_files={spec.train_file}",
f"data.val_files={val_override}",
"data.train_batch_size=64",
"data.micro_batch_size_per_gpu=1",
"data.max_token_len_per_gpu=2048",
"data.max_length=1024",
"trainer.logger=console",
"trainer.project_name=mvp11-sft",
f"trainer.experiment_name={submission_id}",
f"trainer.total_epochs={spec.total_epochs}",
f"trainer.total_training_steps={spec.total_training_steps}",
f"trainer.save_freq={spec.save_freq}",
"trainer.test_freq=-1",
"trainer.resume_mode=disable",
f"trainer.device={trainer_device}",
f"trainer.default_local_dir={job_dir}/checkpoints",
f"trainer.nnodes={spec.nnodes}",
f"trainer.n_gpus_per_node={spec.n_gpus_per_node}",
f"hydra.run.dir={job_dir}/logs/hydra",
]
return BuiltCommand(argv=argv)
raise ValueError(f"unsupported workload: {spec.workload}")

View File

@ -1,63 +0,0 @@
from __future__ import annotations
import argparse
import os
import shlex
import subprocess
import sys
from pathlib import Path
def _preflight() -> None:
print("MVP_PRECHECK_PYTHON:", sys.executable, flush=True)
print("MVP_PRECHECK_PYTHONPATH:", os.environ.get("PYTHONPATH"), flush=True)
print("MVP_PRECHECK_MVP_CODE_PATH:", os.environ.get("MVP_CODE_PATH"), flush=True)
try:
import verl # type: ignore
print("MVP_PRECHECK_VERL_FILE:", getattr(verl, "__file__", None), flush=True)
except Exception as e:
print("MVP_PRECHECK_VERL_IMPORT_ERROR:", repr(e), flush=True)
try:
import mvp_marker # type: ignore
print("MVP_PRECHECK_MARKER:", getattr(mvp_marker, "MARKER", None), flush=True)
except Exception as e:
print("MVP_PRECHECK_MARKER_MISSING:", repr(e), flush=True)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--job-dir", required=True)
parser.add_argument("cmd", nargs=argparse.REMAINDER)
args = parser.parse_args()
job_dir = Path(args.job_dir)
job_dir.mkdir(parents=True, exist_ok=True)
_preflight()
if not args.cmd:
print("no command provided", file=sys.stderr)
return 2
# argparse includes the leading "--" if the caller uses it; strip it.
cmd = list(args.cmd)
if cmd and cmd[0] == "--":
cmd = cmd[1:]
if not cmd:
print("no command provided", file=sys.stderr)
return 2
# Execute training command as a subprocess so that logs are captured by Ray job logs.
cmd_str = " ".join(shlex.quote(x) for x in cmd)
print("MVP_DRIVER_EXEC:", cmd_str, flush=True)
proc = subprocess.run(cmd, check=False)
print("MVP_DRIVER_EXIT_CODE:", proc.returncode, flush=True)
return proc.returncode
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -1,171 +0,0 @@
from __future__ import annotations
import json
import os
import shlex
from datetime import datetime
from pathlib import Path
from typing import Any
import ray
from ray.job_submission import JobSubmissionClient
from .builders import build_training_argv
from .models import JobSpec, RayConfig
from .yaml_io import dump_yaml
def _ts() -> str:
return datetime.now().strftime("%Y%m%d_%H%M%S")
def _mkdir(p: Path) -> None:
p.mkdir(parents=True, exist_ok=True)
def _write_text(p: Path, content: str) -> None:
_mkdir(p.parent)
p.write_text(content, encoding="utf-8")
def _write_json(p: Path, obj: Any) -> None:
_write_text(p, json.dumps(obj, indent=2, ensure_ascii=False) + "\n")
def _safe_basename(path: str) -> str:
return path.rstrip("/").split("/")[-1]
class RayJobTool:
def __init__(self, cfg: RayConfig):
self.cfg = cfg
self.client = JobSubmissionClient(cfg.address)
def _job_dir(self, submission_id: str) -> str:
return f"{self.cfg.shared_root}/jobs/{submission_id}"
def _runtime_env(self, spec: JobSpec) -> dict[str, Any]:
env_vars = dict(self.cfg.runtime_env_env_vars)
# Default HF cache
env_vars.setdefault("HF_HOME", f"{self.cfg.shared_root}/hf")
env_vars.setdefault("HUGGINGFACE_HUB_CACHE", f"{self.cfg.shared_root}/hf/hub")
env_vars.setdefault("TRANSFORMERS_CACHE", f"{self.cfg.shared_root}/hf/transformers")
env_vars.setdefault("PYTHONUNBUFFERED", "1")
# Tool code path must be importable on workers (compose mounts v1.1 into all containers).
# Place it before verl code to avoid interfering with verl import priority.
tool_code_path = os.environ.get("MVP_TOOL_CODE_PATH", "/workspace/mvp/v1.1/py")
user_code_path = self.cfg.user_code_path
code_path = spec.code_path
existing = env_vars.get("PYTHONPATH", "")
prefix = f"{tool_code_path}:{code_path}:{user_code_path}"
env_vars["PYTHONPATH"] = f"{prefix}:{existing}" if existing else prefix
# For debugging / log visibility
env_vars["MVP_CODE_PATH"] = code_path
# SFT: ensure ray.init() connects to the cluster
if spec.workload == "sft":
env_vars.setdefault("RAY_ADDRESS", "auto")
return {"env_vars": env_vars}
def submit(self, spec: JobSpec, no_wait: bool) -> str:
submission_id = spec.submission_id or f"mvp11_{spec.workload}_{_ts()}_{os.getpid()}"
job_dir = self._job_dir(submission_id)
built = build_training_argv(spec, submission_id=submission_id, job_dir=job_dir)
entrypoint_argv = [
"python3",
"-m",
"mvp_v11.driver_entrypoint",
"--job-dir",
job_dir,
*built.argv,
]
entrypoint = " ".join(shlex.quote(x) for x in entrypoint_argv)
runtime_env = self._runtime_env(spec)
# Prepare job artifacts directory
job_root = Path(job_dir)
_mkdir(job_root / "config")
_mkdir(job_root / "logs")
_mkdir(job_root / "debug")
_mkdir(job_root / "checkpoints")
_write_text(job_root / "config" / "ray_config.yaml", dump_yaml(self.cfg.to_public_dict()))
_write_text(job_root / "config" / "jobspec.yaml", dump_yaml(spec.to_public_dict()))
_write_json(job_root / "config" / "submit_payload.json", {
"submission_id": submission_id,
"address": self.cfg.address,
"entrypoint": entrypoint,
"entrypoint_num_cpus": self.cfg.entrypoint_num_cpus,
"entrypoint_resources": self.cfg.entrypoint_resources,
"runtime_env": runtime_env,
})
# Pre-submit debug snapshot (ray cluster resources via ray.init)
try:
ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False)
_write_json(job_root / "debug" / "ray_cluster_resources_pre.json", ray.cluster_resources())
_write_json(job_root / "debug" / "ray_available_resources_pre.json", ray.available_resources())
except Exception as e:
_write_text(job_root / "debug" / "ray_resources_pre.error.txt", repr(e) + "\n")
try:
submitted = self.client.submit_job(
entrypoint=entrypoint,
submission_id=submission_id,
runtime_env=runtime_env,
entrypoint_num_cpus=self.cfg.entrypoint_num_cpus,
entrypoint_resources=self.cfg.entrypoint_resources,
)
except Exception as e:
_write_text(job_root / "logs" / "submit.error.txt", repr(e) + "\n")
raise
_write_text(job_root / "config" / "ray_submission_id.txt", submitted + "\n")
# Post-submit debug snapshot via SDK
try:
jobs = self.client.list_jobs()
_write_text(
job_root / "debug" / "ray_job_list_post.json",
json.dumps([_job_details_to_dict(j) for j in jobs], indent=2) + "\n",
)
except Exception as e:
_write_text(job_root / "debug" / "ray_job_list_post.error.txt", repr(e) + "\n")
if not no_wait:
# caller can separately wait; keep submit non-blocking by default in scripts
pass
return submitted
def status(self, submission_id: str) -> str:
return str(self.client.get_job_status(submission_id))
def stop(self, submission_id: str) -> bool:
return bool(self.client.stop_job(submission_id))
def logs(self, submission_id: str) -> str:
return self.client.get_job_logs(submission_id)
def list(self) -> list[dict[str, Any]]:
return [_job_details_to_dict(j) for j in self.client.list_jobs()]
def _job_details_to_dict(obj: Any) -> dict[str, Any]:
# Ray uses pydantic models internally, but depending on bundled pydantic version
# we might get `.model_dump()` (v2) or `.dict()` (v1).
if hasattr(obj, "model_dump"):
return obj.model_dump() # type: ignore[no-any-return]
if hasattr(obj, "dict"):
return obj.dict() # type: ignore[no-any-return]
if hasattr(obj, "__dict__"):
return dict(obj.__dict__)
return {"repr": repr(obj)}

View File

@ -1,21 +0,0 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
def load_yaml(path: str) -> dict[str, Any]:
p = Path(path)
data = yaml.safe_load(p.read_text(encoding="utf-8"))
if data is None:
return {}
if not isinstance(data, dict):
raise ValueError(f"yaml root must be a mapping: {path}")
return data
def dump_yaml(data: dict[str, Any]) -> str:
return yaml.safe_dump(data, sort_keys=False, allow_unicode=True)

View File

@ -1,10 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
# Install v2.0 API dependencies inside the head container (best-effort).
# Assumes v1.1 containers are already up and v2.0 code is mounted/available.
HEAD_CONTAINER="${HEAD_CONTAINER:-mvp11-ray-head}"
docker exec -i "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -U pip >/dev/null 2>&1 || true"
docker exec -i "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -r /workspace/mvp/v2/py/requirements.txt"

View File

@ -1,28 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
CONFIG_IN_CONTAINER="${CONFIG_IN_CONTAINER:-/workspace/mvp/v1.1/py/configs/dev.yaml}"
LOG_PATH="${LOG_PATH:-/private/common/logs/mvp_v2_api.log}"
PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}"
echo "[host] starting mvp v2 api in head container: ${HEAD_CONTAINER}"
dexec bash -lc "mkdir -p \"$(dirname "${LOG_PATH}")\" \"$(dirname "${PID_PATH}")\""
# Note: requires /workspace/mvp/v2.0/py to be present in the container (mounted or copied).
# Escape $ so that the command substitution happens in the container, not on the host.
dexec bash -lc "if test -f '${PID_PATH}'; then pid=\$(cat '${PID_PATH}'); if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo 'already_running'; exit 0; fi; fi"
if [[ -z "${MVP_INTERNAL_TOKEN:-}" ]]; then
echo "ERROR: MVP_INTERNAL_TOKEN env var must be set on host (will be passed into container)" >&2
exit 1
fi
docker exec -d -e MVP_INTERNAL_TOKEN="${MVP_INTERNAL_TOKEN}" "${HEAD_CONTAINER}" bash -lc "nohup python3 /workspace/mvp/v2/py/server.py --config '${CONFIG_IN_CONTAINER}' >>'${LOG_PATH}' 2>&1 & echo \$! >'${PID_PATH}'"
echo "[host] started; pid stored in ${PID_PATH} (container path)"
echo "[host] logs: ${LOG_PATH} (container path)"

View File

@ -1,12 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}"
echo "[host] stopping mvp v2 api (pid file: ${PID_PATH})"
dexec bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then kill \"\${pid}\"; fi; rm -f '${PID_PATH}'; echo 'stopped'"

View File

@ -1,10 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}"
dexec bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo \"running pid=\${pid}\"; else echo \"stale pid=\${pid}\"; fi"

View File

@ -1,12 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
# v2.0 scripts are intended to run on the host and control the existing Ray containers
# (same topology as v1.1). Adjust container names via env vars if needed.
HEAD_CONTAINER="${HEAD_CONTAINER:-mvp11-ray-head}"
dexec() {
docker exec -i "${HEAD_CONTAINER}" "$@"
}