diff --git a/specs/mvp/refactor/code_refactor.md b/specs/mvp/refactor/code_refactor.md
new file mode 100644
index 0000000..58ea4f6
--- /dev/null
+++ b/specs/mvp/refactor/code_refactor.md
@@ -0,0 +1,301 @@
+# MVP Code Restructuring Plan (organized by functional module)
+
+Background: code under `src/mvp/` is currently organized into version directories (`v1.1/`, `v2.0/`). In practice, **v2.0 extends the v1.1 Ray Jobs SDK submission path with a service layer**, and making v2.0 work also required editing v1.1's `docker-compose.yaml` and `dev.yaml` (mounting v2, exposing 8080, adding a `v2:` config section). Organizing directories by version therefore blurs the dependency picture (which part is a library, which is an application, which configs are shared).
+
+Goal of this plan: restructure `src/mvp/` "by functional module" (Ray submission core library / service layer / cli tool / TaskSpecs / configs / scripts), and provide a post-migration verification and execution plan.
+
+> This document only presents the design and the migration/verification plan; it changes no code directly (implementation starts after sign-off).
+
+---
+
+## 1. Current State (pain points)
+
+### 1.1 Code duplication and coupling
+
+- `src/mvp/v2.0/py/mvp_v11/` is a copy of `src/mvp/v1.1/py/mvp_v11/` made for reuse, which causes:
+  - **duplicated library code** (every bug fix must be applied twice)
+  - no clear "authoritative implementation"
+- The v2 API (`mvp_v2`) submits Ray Jobs through the copied `mvp_v11.RayJobTool`, i.e. it effectively depends on the v1.1 submission path as a library.
+
+### 1.2 Unstable config and deployment layout
+
+- v2.0 reuses the v1.1 config file and adds a `v2:` section. That is a reasonable backward-compatible extension, but it mixes:
+  - the "Ray submit base config"
+  - the "API service config"
+  - the "deployment path conventions (/workspace/mvp/v1.1 vs /workspace/mvp/v2)"
+  into a single file, which is hard to maintain long-term.
+
+### 1.3 Naming ambiguity: jobspec vs Ray job
+
+- Both v1.1 and v2.0 use `jobspec.yaml` to mean "training-semantics parameters" (the PPO/GRPO/SFT training fields).
+- But Ray has its own "Ray Job" concept (submission_id, entrypoint, runtime_env, ...), which invites miscommunication.
+- The training-side specs should be renamed to **TaskSpecs** (platform-level task specifications) to distinguish them from Ray Jobs.
+
+---
+
+## 2. Goals (What good looks like)
+
+### 2.1 Clear directories and responsibilities
+
+- The "Ray Job submission SDK wrapper" is a reusable module (a library).
+- The "service layer (API + scheduler + SQLite)" is an independent module (an application/service).
+- "Training-semantics parameters (TaskSpecs)" and "Ray Job submission parameters (RayConfig)" are cleanly layered.
+
+### 2.2 Single Source of Truth
+
+- There must be exactly one "Ray submitter core" implementation (never a copy in another version directory).
+- The API and the CLI/scripts all reuse that same core.
+
+### 2.3 Compatible with the current workflow (incremental migration)
+
+- Keep the existing script-driven startup/preparation flow (Ray cluster bring-up and model/data preparation still use scripts).
+- During migration, thin wrappers may be kept for the old paths (to avoid a one-shot break).
+
+---
+
+## 3. Target Structure (by functional module)
+
+Proposed "functional layering" for `src/mvp/`:
+
+```
+src/mvp/
+  py/
+    argus/                    # top-level package (avoids clashing with Ray's `ray` package)
+      __init__.py
+
+      core/                   # shared: yaml/model definitions/utilities (pure library)
+        __init__.py
+        yaml_io.py
+        ids.py                # task_id / attempt_id generation rules
+
+      ray/                    # Ray Job submission "core library" (migrated from the existing mvp_v11)
+        __init__.py
+        models.py             # RayConfig, TaskSpec (parsing), Attempt, enums
+        builders.py           # build_training_argv (ppo/grpo/sft)
+        driver_entrypoint.py  # still the Ray job entrypoint (runs on a worker)
+        ray_job_tool.py       # Ray Jobs SDK wrapper (submit/status/stop/logs)
+        runtime_env.py        # unified PYTHONPATH/runtime_env assembly logic
+
+      service/                # service layer: FastAPI + scheduler + sqlite (application)
+        __init__.py
+        app.py
+        scheduler.py
+        db.py
+        config.py             # service config loading (reads from configs/)
+        ray_resources.py
+
+      cli/                    # CLI/SDK submission entry (migrated from the existing v1.1 run.py)
+        __init__.py
+        run.py                # submit/status/logs/stop actions
+
+    server.py                 # uvicorn entry (imports argus.service.*)
+
+  configs/
+    dev.yaml                  # RayConfig + ServiceConfig (layered, extensible)
+    prod.yaml                 # (optional) production config template
+
+  taskspecs/                  # formerly jobspecs/, renamed TaskSpecs (training-semantics specs)
+    ppo.yaml
+    grpo.yaml
+    sft.yaml
+    README.md                 # TaskSpec field reference and examples
+
+  scripts/                    # host-side scripts (docker exec/compose orchestration)
+    lib.sh
+    00_prereq_check.sh
+    01_up.sh / 02_down.sh
+    20_start_head.sh / 21_start_workers.sh
+    30_prepare_data_and_model.sh
+    40_submit_cli.sh          # submit a TaskSpec via cli/run.py
+    60_start_api.sh           # start the API (service)
+    61_stop_api.sh
+    62_status_api.sh
+
+  docker-compose.yaml         # dev-env compose (moved here from v1.1; stable paths)
+  README.md                   # top-level entry doc (how to run, directory conventions)
+```
+
+### 3.1 Key point: library vs application boundary
+
+- `argus.ray` is the only Ray submitter library (replacing today's duplicated `mvp_v11` copies in v1.1/v2.0).
+- `argus.service` depends on `argus.ray`, never the reverse.
+- `argus.cli` depends on `argus.ray` and is used for scripted submission/debugging.
+
+### 3.2 TaskSpecs vs RayConfig
+
+- `taskspecs/*.yaml`: describe training-task semantics (workload, nnodes, n_gpus_per_node, data/model paths, training steps, ...).
+- `configs/*.yaml`: describe the Ray submission environment (address, entrypoint_resources, runtime_env, plus the service config).
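+
+To make the layering concrete, a minimal usage sketch (illustrative only: it assumes the container mount layout described below, with `/workspace/mvp/py` on `PYTHONPATH`; note the parsing class keeps the `JobSpec` name during phase A):
+
+```python
+from argus.ray.models import JobSpec, RayConfig
+from argus.ray.ray_job_tool import RayJobTool
+from argus.ray.yaml_io import load_yaml
+
+# Two config layers, two files: Ray submission env vs training semantics.
+cfg = RayConfig.from_dict(load_yaml("/workspace/mvp/configs/dev.yaml"))
+spec = JobSpec.from_dict(load_yaml("/workspace/mvp/taskspecs/ppo.yaml"))
+
+# The submitter combines them: TaskSpec -> training argv,
+# RayConfig -> address / entrypoint_resources / runtime_env.
+submission_id = RayJobTool(cfg).submit(spec, no_wait=True)
+print(submission_id)
+```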
+
+---
+
+## 4. Config Strategy (how configs are organized after the refactor)
+
+### 4.1 Proposed config layering
+
+Split the current `dev.yaml` content into two explicit sections (named by module):
+
+1) `ray:` (RayConfig)
+- job server address
+- shared_root (`/private`)
+- entrypoint resources (force the driver onto a worker)
+- runtime_env env_vars (HF cache, PYTHONPATH injection policy)
+
+2) `service:` (ServiceConfig)
+- api host/port
+- auth token_env
+- sqlite db_path
+- scheduler tick/max_running/retry_interval
+
+Example (structural sketch):
+
+```yaml
+ray:
+  address: "http://127.0.0.1:8265"
+  shared_root: "/private"
+  entrypoint_num_cpus: 1
+  entrypoint_resources:
+    worker_node: 1
+  runtime_env:
+    env_vars:
+      HF_ENDPOINT: "https://hf-mirror.com"
+      PYTHONUNBUFFERED: "1"
+  user_code_path: "/private/user/code"
+
+service:
+  api:
+    host: "0.0.0.0"
+    port: 8080
+  auth:
+    token_env: "MVP_INTERNAL_TOKEN"
+  sqlite:
+    db_path: "/private/common/db/mvp.sqlite3"
+  scheduler:
+    tick_s: 5
+    retry_interval_s: 60
+    max_running_tasks: 1
+```
+
+> During migration, the "old format" (v1.1 top-level fields + `v2:` section) and the "new format" (`ray:`/`service:` sections) can coexist: the parsers accept both, which lowers the risk of a one-shot change; the new format is authoritative in the end.
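+
+A sketch of that coexistence rule (illustrative; `split_config` is a made-up helper name, while the real logic lives in `RayConfig.from_dict` and `V2Config.from_root_dict` in the patch below):
+
+```python
+import warnings
+from typing import Any
+
+
+def split_config(root: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
+    """Return (ray_section, service_section) for either config format."""
+    if isinstance(root.get("ray"), dict):
+        # New format: explicit ray:/service: sections.
+        return root["ray"], root.get("service") or {}
+    # Old format: flat v1.1 fields at the root, service config under v2:.
+    warnings.warn("legacy config format (flat fields + 'v2:'); migrate to ray:/service:")
+    return root, (root.get("service") or root.get("v2") or {})
+```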
+
+---
+
+## 5. Migration Path (recommended: two phases)
+
+### Phase A: copy/move existing files first, then minimal adjustments (behavior unchanged)
+
+Goal: no functional changes, no API behavior changes. Complete the restructuring primarily through "moving existing files + fixing package references/paths", avoiding rewriting logic from scratch (lower error probability).
+
+Suggested steps:
+
+1) Extract `src/mvp/py/argus/ray/` (moved from existing code)
+   - Move `src/mvp/v1.1/py/mvp_v11/` to `src/mvp/py/argus/ray/` and make it the single source of truth for the submitter core (no copy kept in any other directory).
+   - Only mechanical adjustments: fix imports, fix default path constants (e.g. tool code path / working dir), fix the paths that scripts call.
+
+2) Extract `src/mvp/py/argus/service/` (moved from existing code)
+   - Move `src/mvp/v2.0/py/mvp_v2/` to `src/mvp/py/argus/service/`.
+   - Point the service's submitter dependency at `src/mvp/py/argus/ray/` (no longer at the `src/mvp/v2.0/py/mvp_v11/` copy).
+
+3) Unified CLI entry: `src/mvp/py/argus/cli/run.py` (moved from existing code)
+   - Move `src/mvp/v1.1/py/run.py` to `src/mvp/py/argus/cli/run.py`, keeping the action semantics (submit/status/logs/stop).
+   - Only adjust imports and default paths so they point at the new layout (configs/taskspecs/py).
+
+4) Merge scripts (v1.1 as the base, v2 API scripts merged in)
+   - Move `src/mvp/v1.1/scripts/` to `src/mvp/scripts/` (the most mature Ray cluster orchestration).
+   - Merge the API start/stop scripts from `src/mvp/v2.0/scripts/` into `src/mvp/scripts/`, unifying naming and default paths.
+
+5) Stabilize docker-compose / mounts (migration already confirmed)
+   - Move `src/mvp/v1.1/docker-compose.yaml` to `src/mvp/docker-compose.yaml`.
+   - Unify in-container mounts: host `.../src/mvp/` → container `/workspace/mvp/` (containing `py/ configs/ taskspecs/ scripts/`).
+   - The runtime_env `PYTHONPATH` injection points uniformly at `/workspace/mvp/py` (no more `/workspace/mvp/v1.1/py` or `/workspace/mvp/v2/...`).
+
+Phase A is done when:
+- The original v1.1 CLI submission flow still works (submitting PPO/GRPO/SFT).
+- The v2 API still works (queue, cancel, logs).
+- No duplicate `mvp_v11` directory remains.
+
+### Phase B: config format upgrade (two module sections) + the TaskSpecs rename
+
+Goal: actually rename jobspec to TaskSpec, and upgrade the config to cleanly layered module sections (`ray:`/`service:`).
+
+Suggested steps:
+- `jobspecs/` → `taskspecs/`, updating README/script references.
+- Migrate `dev.yaml` from "top-level fields + `v2:`" to the `ray:`/`service:` sections.
+- Keep compatibility parsing for a while (emit a warning when reading the old format).
+- After verification, delete the old version directories `src/mvp/v1.1/` and `src/mvp/v2.0/` (and their remote counterparts), making the new structure the only entry point.
+
+Phase B is done when:
+- The word "jobspec" no longer appears in docs; everything says "TaskSpec".
+- `configs/dev.yaml` is cleanly layered (`ray:`/`service:` sections named by module), with service and Ray config kept apart.
+
+---
+
+## 6. Post-Refactor Verification and Execution Plan (acceptance/regression)
+
+### 6.1 Local (in-repo) static checks
+
+1) import / entrypoint checks (inside a container)
+- `python3 -c "from argus.ray.ray_job_tool import RayJobTool"`
+- `python3 -c "from argus.service.app import create_app"`
+
+2) directory layout checks
+- `src/mvp/py` is the only python code root
+- `taskspecs/`, `configs/`, `scripts/` all live under `src/mvp/`
+
+### 6.2 End-to-end verification in the dev env (argus@h1)
+
+Prerequisites:
+- Remote directory: `argus@h1:/home2/argus/infra/mvp/` (unchanged)
+- Shared directory: `/home2/argus/infra/mvp/shared` mounted into containers at `/private`
+
+Verification steps (recommended order):
+
+1) Recreate containers + start Ray
+- `scripts/01_up.sh`
+- `scripts/20_start_head.sh` (head with `--num-cpus=0 --num-gpus=0`)
+- `scripts/21_start_workers.sh` (workers carry the `worker_node` resource)
+- `ray status` / `ray list nodes` to confirm the head has no GPUs and each worker has 4 GPUs
+
+2) CLI submission regression (equivalent to v1.1)
+- submit `taskspecs/sft.yaml`, confirm success
+- submit `taskspecs/grpo.yaml`, confirm success
+- submit `taskspecs/ppo.yaml`, confirm success
+
+3) API service regression (equivalent to v2.0)
+- `scripts/60_start_api.sh`
+- `POST /api/v2/tasks` (raw YAML TaskSpec)
+- `GET /api/v2/tasks/{task_id}`: confirm `created_at/updated_at` are returned
+- `POST /api/v2/tasks/{task_id}:cancel`: confirm the task reaches `state=CANCELED` and the attempt `ray_status=STOPPED` (consistent service-side semantics)
+
+4) Queue behavior regression
+- With `service.scheduler.max_running_tasks=1`:
+  - submit two 8-GPU tasks back to back: the second should stay `QUEUED/PENDING_RESOURCES` until the first finishes, then be submitted automatically.
+
+Acceptance criteria:
+- All three workloads (PPO/GRPO/SFT) run end to end via the CLI (or at minimum submit to Ray correctly and reach RUNNING).
+- API submit/query/cancel/logs all work.
+- The "state=CANCELED after cancel but the attempt still RUNNING" inconsistency no longer occurs.
+
+---
+
+## 7. Risks and Caveats
+
+1) PYTHONPATH injection path changes
+- The current runtime_env carries the historical path assumption `MVP_TOOL_CODE_PATH=/workspace/mvp/v1.1/py`;
+- after the refactor it must be unified to `/workspace/mvp/py`, and every container must mount the same path.
+
+2) SQLite WAL stability on NFS
+- The db currently uses WAL (producing `-wal/-shm` files), which carries consistency risks on NFS;
+- a possible follow-up: fall back to `journal_mode=DELETE` when NFS is detected, or move to a local single-node disk (see the sketch after this list).
+
+3) Compatibility window for the incremental migration
+- During migration the old paths may be kept briefly (e.g. `src/mvp/v1.1`, `src/mvp/v2.0` holding only a README pointing at the new layout) to soften the one-shot break; but since deleting both directories is already confirmed, clean them up promptly once scripts/docs are updated.
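+
+A possible shape for the WAL fallback in item 2 (a sketch only; how `on_nfs` gets detected, e.g. by scanning `/proc/mounts`, is deliberately left open):
+
+```python
+import sqlite3
+
+
+def open_db(db_path: str, on_nfs: bool) -> sqlite3.Connection:
+    conn = sqlite3.connect(db_path)
+    # WAL allows concurrent readers/writers, but its -wal/-shm sidecar files
+    # depend on mmap + file locking that NFS does not reliably provide.
+    mode = "DELETE" if on_nfs else "WAL"
+    conn.execute(f"PRAGMA journal_mode={mode}")
+    return conn
+```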
+
+---
+
+## 8. Confirmed Constraints (the plan will follow these)
+
+Already confirmed:
+
+1) `docker-compose.yaml` must move to the `src/mvp/` root; once the refactor is done, `src/mvp/v1.1` and `src/mvp/v2.0` are both deleted.
+2) `configs/dev.yaml` is upgraded to two sections named by module, `ray:` and `service:`, in pure YAML style (no inline JSON maps).
+3) TaskSpec fields stay exactly identical to the existing v1.1 YAML for now (only directories and names change), to avoid needless incompatibility.
diff --git a/src/mvp/README.md b/src/mvp/README.md
new file mode 100644
index 0000000..d7db5f7
--- /dev/null
+++ b/src/mvp/README.md
@@ -0,0 +1,17 @@
+# MVP (modular layout)
+
+This directory is the final entry point for the MVP organized "by functional module" (replacing the former `v1.1/`, `v2.0/` version directories).
+
+Layout:
+- `py/`: Python code (the `argus.*` packages)
+  - `argus.ray`: Ray Jobs SDK submitter core (submit/status/stop/logs)
+  - `argus.service`: API + scheduler + SQLite (service-side queue/retry/state aggregation)
+  - `argus.cli`: CLI tool (for scripted submission/debugging)
+- `configs/`: configuration (`ray:` and `service:` sections, pure YAML style)
+- `taskspecs/`: TaskSpecs (training-task semantics)
+- `scripts/`: host-side scripts (docker compose/exec orchestration)
+- `docker-compose.yaml`: dev-env container definitions (head+workers)
+
+Quick start:
+- CLI submission flow: `scripts/run_all_cli.sh`
+- API submission flow: `scripts/run_all_api.sh`
diff --git a/src/mvp/configs/dev.yaml b/src/mvp/configs/dev.yaml
new file mode 100644
index 0000000..d117f65
--- /dev/null
+++ b/src/mvp/configs/dev.yaml
@@ -0,0 +1,34 @@
+ray:
+  # Ray Job server address (as seen from inside the head container)
+  address: "http://127.0.0.1:8265"
+
+  # Shared root path (always /private inside containers, matching production)
+  shared_root: "/private"
+
+  # Force the driver onto a worker (the head runs no training)
+  entrypoint_num_cpus: 1
+  entrypoint_resources:
+    worker_node: 1
+
+  # Runtime env shared by all jobs
+  runtime_env:
+    env_vars:
+      HF_ENDPOINT: "https://hf-mirror.com"
+      PYTHONUNBUFFERED: "1"
+
+  # User-provided code dir (can be injected into PYTHONPATH)
+  user_code_path: "/private/user/code"
+
+service:
+  api:
+    host: "0.0.0.0"
+    port: 8080
+  auth:
+    token_env: "MVP_INTERNAL_TOKEN"
+  sqlite:
+    db_path: "/private/common/db/mvp.sqlite3"
+  scheduler:
+    tick_s: 5
+    retry_interval_s: 60
+    max_running_tasks: 1
+
diff --git a/src/mvp/v1.1/docker-compose.yaml b/src/mvp/docker-compose.yaml
similarity index 71%
rename from src/mvp/v1.1/docker-compose.yaml
rename to src/mvp/docker-compose.yaml
index 43892df..7759941 100644
--- a/src/mvp/v1.1/docker-compose.yaml
+++ b/src/mvp/docker-compose.yaml
@@ -3,16 +3,18 @@ version: "3.8"
 services:
   ray_head:
     image: verlai/verl:sgl055.latest
-    container_name: mvp11-ray-head
+    container_name: argus-ray-head
     command: sleep infinity
     ports:
       - "8265:8265"
       - "8080:8080"
     volumes:
-      - ../verl:/workspace/verl
-      - ../shared:/private
-      - .:/workspace/mvp/v1.1
-      - ../v2:/workspace/mvp/v2
+      # NOTE: this compose file is intended for the dev env layout like:
+      #   /home2/argus/infra/mvp/{shared,verl,src/mvp}
+      #   so from `src/mvp/` the shared/verl dirs are `../../shared` and `../../verl`.
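+      # The same three mounts are repeated for ray_worker_0/ray_worker_1 below,
+      # so every container resolves /workspace/mvp and /private identically.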
+ - ../../verl:/workspace/verl + - ../../shared:/private + - .:/workspace/mvp shm_size: "10g" ulimits: nofile: @@ -22,7 +24,7 @@ services: - SYS_ADMIN - SYS_PTRACE networks: - - mvp11-ray-net + - argus-ray-net environment: HF_HOME: "/private/hf" HUGGINGFACE_HUB_CACHE: "/private/hf/hub" @@ -32,12 +34,12 @@ services: ray_worker_0: image: verlai/verl:sgl055.latest - container_name: mvp11-ray-worker-0 + container_name: argus-ray-worker-0 command: sleep infinity volumes: - - ../verl:/workspace/verl - - ../shared:/private - - .:/workspace/mvp/v1.1 + - ../../verl:/workspace/verl + - ../../shared:/private + - .:/workspace/mvp shm_size: "10g" ulimits: nofile: @@ -47,7 +49,7 @@ services: - SYS_ADMIN - SYS_PTRACE networks: - - mvp11-ray-net + - argus-ray-net runtime: nvidia environment: NVIDIA_VISIBLE_DEVICES: "0,1,2,3" @@ -60,12 +62,12 @@ services: ray_worker_1: image: verlai/verl:sgl055.latest - container_name: mvp11-ray-worker-1 + container_name: argus-ray-worker-1 command: sleep infinity volumes: - - ../verl:/workspace/verl - - ../shared:/private - - .:/workspace/mvp/v1.1 + - ../../verl:/workspace/verl + - ../../shared:/private + - .:/workspace/mvp shm_size: "10g" ulimits: nofile: @@ -75,7 +77,7 @@ services: - SYS_ADMIN - SYS_PTRACE networks: - - mvp11-ray-net + - argus-ray-net runtime: nvidia environment: NVIDIA_VISIBLE_DEVICES: "4,5,6,7" @@ -87,5 +89,5 @@ services: PYTHONUNBUFFERED: "1" networks: - mvp11-ray-net: + argus-ray-net: driver: bridge diff --git a/src/mvp/v2.0/py/mvp_v2/__init__.py b/src/mvp/py/argus/__init__.py similarity index 100% rename from src/mvp/v2.0/py/mvp_v2/__init__.py rename to src/mvp/py/argus/__init__.py diff --git a/src/mvp/py/argus/cli/__init__.py b/src/mvp/py/argus/cli/__init__.py new file mode 100644 index 0000000..fe16459 --- /dev/null +++ b/src/mvp/py/argus/cli/__init__.py @@ -0,0 +1,2 @@ +__all__ = [] + diff --git a/src/mvp/v1.1/py/run.py b/src/mvp/py/argus/cli/run.py similarity index 66% rename from src/mvp/v1.1/py/run.py rename to src/mvp/py/argus/cli/run.py index 67fe975..9cb52a1 100644 --- a/src/mvp/v1.1/py/run.py +++ b/src/mvp/py/argus/cli/run.py @@ -8,22 +8,25 @@ import sys def _ensure_import_path() -> None: - # Allow `python3 /workspace/.../py/run.py` to import `mvp_v11.*` + # Allow running as a file path (not just `python3 -m argus.cli.run`). + # Ensure `/workspace/mvp/py` is on sys.path so `import argus.*` works. 
here = os.path.dirname(os.path.abspath(__file__)) - if here not in sys.path: - sys.path.insert(0, here) + py_root = os.path.dirname(os.path.dirname(here)) # .../py + if py_root not in sys.path: + sys.path.insert(0, py_root) def main() -> int: _ensure_import_path() - from mvp_v11.models import JobSpec, RayConfig - from mvp_v11.ray_job_tool import RayJobTool - from mvp_v11.yaml_io import load_yaml + from argus.ray.models import JobSpec, RayConfig + from argus.ray.ray_job_tool import RayJobTool + from argus.ray.yaml_io import load_yaml parser = argparse.ArgumentParser() - parser.add_argument("--config", required=True, help="Ray base config yaml") - parser.add_argument("--jobspec", help="Training jobspec yaml (required for submit)") + parser.add_argument("--config", required=True, help="MVP config yaml (ray/service)") + parser.add_argument("--taskspec", help="Training TaskSpec yaml (required for submit)") + parser.add_argument("--jobspec", dest="taskspec", help=argparse.SUPPRESS) # backward compatible alias parser.add_argument("--action", required=True, choices=["submit", "status", "stop", "logs", "list"]) parser.add_argument("--submission-id", help="For status/stop/logs") parser.add_argument("--no-wait", action="store_true", help="Submit and return immediately") @@ -34,9 +37,9 @@ def main() -> int: tool = RayJobTool(cfg) if args.action == "submit": - if not args.jobspec: - raise SystemExit("--jobspec is required for submit") - spec = JobSpec.from_dict(load_yaml(args.jobspec)) + if not args.taskspec: + raise SystemExit("--taskspec is required for submit") + spec = JobSpec.from_dict(load_yaml(args.taskspec)) submitted = tool.submit(spec, no_wait=args.no_wait) print(submitted) return 0 diff --git a/src/mvp/py/argus/core/__init__.py b/src/mvp/py/argus/core/__init__.py new file mode 100644 index 0000000..fe16459 --- /dev/null +++ b/src/mvp/py/argus/core/__init__.py @@ -0,0 +1,2 @@ +__all__ = [] + diff --git a/src/mvp/v2.0/py/mvp_v2/ids.py b/src/mvp/py/argus/core/ids.py similarity index 100% rename from src/mvp/v2.0/py/mvp_v2/ids.py rename to src/mvp/py/argus/core/ids.py diff --git a/src/mvp/v1.1/py/mvp_v11/__init__.py b/src/mvp/py/argus/ray/__init__.py similarity index 100% rename from src/mvp/v1.1/py/mvp_v11/__init__.py rename to src/mvp/py/argus/ray/__init__.py diff --git a/src/mvp/v1.1/py/mvp_v11/builders.py b/src/mvp/py/argus/ray/builders.py similarity index 100% rename from src/mvp/v1.1/py/mvp_v11/builders.py rename to src/mvp/py/argus/ray/builders.py diff --git a/src/mvp/v1.1/py/mvp_v11/driver_entrypoint.py b/src/mvp/py/argus/ray/driver_entrypoint.py similarity index 100% rename from src/mvp/v1.1/py/mvp_v11/driver_entrypoint.py rename to src/mvp/py/argus/ray/driver_entrypoint.py diff --git a/src/mvp/v2.0/py/mvp_v11/models.py b/src/mvp/py/argus/ray/models.py similarity index 96% rename from src/mvp/v2.0/py/mvp_v11/models.py rename to src/mvp/py/argus/ray/models.py index 41f8804..66bd823 100644 --- a/src/mvp/v2.0/py/mvp_v11/models.py +++ b/src/mvp/py/argus/ray/models.py @@ -21,6 +21,11 @@ class RayConfig: @staticmethod def from_dict(d: dict[str, Any]) -> "RayConfig": + # New format: root.ray + # Backward compatible: root (flat) + if isinstance(d.get("ray"), dict): + d = d["ray"] # type: ignore[assignment] + runtime_env = d.get("runtime_env") or {} env_vars = (runtime_env.get("env_vars") or {}) if isinstance(runtime_env, dict) else {} if not isinstance(env_vars, dict): diff --git a/src/mvp/v1.1/py/mvp_v11/ray_job_tool.py b/src/mvp/py/argus/ray/ray_job_tool.py similarity index 98% rename from 
src/mvp/v1.1/py/mvp_v11/ray_job_tool.py rename to src/mvp/py/argus/ray/ray_job_tool.py index 01b899a..8d7c538 100644 --- a/src/mvp/v1.1/py/mvp_v11/ray_job_tool.py +++ b/src/mvp/py/argus/ray/ray_job_tool.py @@ -53,9 +53,9 @@ class RayJobTool: env_vars.setdefault("TRANSFORMERS_CACHE", f"{self.cfg.shared_root}/hf/transformers") env_vars.setdefault("PYTHONUNBUFFERED", "1") - # Tool code path must be importable on workers (compose mounts v1.1 into all containers). + # Tool code path must be importable on workers (compose mounts `/workspace/mvp` into all containers). # Place it before verl code to avoid interfering with verl import priority. - tool_code_path = os.environ.get("MVP_TOOL_CODE_PATH", "/workspace/mvp/v1.1/py") + tool_code_path = os.environ.get("MVP_TOOL_CODE_PATH", "/workspace/mvp/py") user_code_path = self.cfg.user_code_path code_path = spec.code_path @@ -81,7 +81,7 @@ class RayJobTool: entrypoint_argv = [ "python3", "-m", - "mvp_v11.driver_entrypoint", + "argus.ray.driver_entrypoint", "--job-dir", job_dir, *built.argv, diff --git a/src/mvp/v1.1/py/mvp_v11/yaml_io.py b/src/mvp/py/argus/ray/yaml_io.py similarity index 100% rename from src/mvp/v1.1/py/mvp_v11/yaml_io.py rename to src/mvp/py/argus/ray/yaml_io.py diff --git a/src/mvp/py/argus/service/__init__.py b/src/mvp/py/argus/service/__init__.py new file mode 100644 index 0000000..fe16459 --- /dev/null +++ b/src/mvp/py/argus/service/__init__.py @@ -0,0 +1,2 @@ +__all__ = [] + diff --git a/src/mvp/v2.0/py/mvp_v2/app.py b/src/mvp/py/argus/service/app.py similarity index 98% rename from src/mvp/v2.0/py/mvp_v2/app.py rename to src/mvp/py/argus/service/app.py index 798a124..6df707a 100644 --- a/src/mvp/v2.0/py/mvp_v2/app.py +++ b/src/mvp/py/argus/service/app.py @@ -7,11 +7,11 @@ from typing import Any import yaml from fastapi import FastAPI, HTTPException, Request, Response -from mvp_v11.models import JobSpec, RayConfig +from argus.core.ids import new_task_id +from argus.ray.models import JobSpec, RayConfig from .config import V2Config from .db import Db -from .ids import new_task_id from .scheduler import Scheduler diff --git a/src/mvp/v2.0/py/mvp_v2/config.py b/src/mvp/py/argus/service/config.py similarity index 63% rename from src/mvp/v2.0/py/mvp_v2/config.py rename to src/mvp/py/argus/service/config.py index c859c9b..7d3f5dd 100644 --- a/src/mvp/v2.0/py/mvp_v2/config.py +++ b/src/mvp/py/argus/service/config.py @@ -36,20 +36,29 @@ class V2Config: @staticmethod def from_root_dict(root: dict[str, Any]) -> "V2Config": - v2 = root.get("v2") or {} - if not isinstance(v2, dict): - raise ValueError("config.v2 must be a mapping") + # New format: root.service + # Backward compatible: root.v2 + service = root.get("service") + if service is None: + service = root.get("v2") or {} + if not isinstance(service, dict): + raise ValueError("config.service must be a mapping") - api = v2.get("api") or {} - auth = v2.get("auth") or {} - sqlite = v2.get("sqlite") or {} - scheduler = v2.get("scheduler") or {} + api = service.get("api") or {} + auth = service.get("auth") or {} + sqlite = service.get("sqlite") or {} + scheduler = service.get("scheduler") or {} if not isinstance(api, dict) or not isinstance(auth, dict) or not isinstance(sqlite, dict) or not isinstance(scheduler, dict): - raise ValueError("config.v2.{api,auth,sqlite,scheduler} must be mappings") + raise ValueError("config.service.{api,auth,sqlite,scheduler} must be mappings") - shared_root = str(root.get("shared_root") or "/private") - default_db_path = 
f"{shared_root}/common/db/mvp_v2.sqlite3" + ray = root.get("ray") + if isinstance(ray, dict) and ray.get("shared_root"): + shared_root = str(ray.get("shared_root")) + else: + shared_root = str(root.get("shared_root") or "/private") + + default_db_path = f"{shared_root}/common/db/mvp.sqlite3" db_path = str(sqlite.get("db_path") or default_db_path) return V2Config( @@ -65,4 +74,3 @@ class V2Config: max_running_tasks=int(scheduler.get("max_running_tasks") or 1), ), ) - diff --git a/src/mvp/v2.0/py/mvp_v2/db.py b/src/mvp/py/argus/service/db.py similarity index 100% rename from src/mvp/v2.0/py/mvp_v2/db.py rename to src/mvp/py/argus/service/db.py diff --git a/src/mvp/v2.0/py/mvp_v2/ray_resources.py b/src/mvp/py/argus/service/ray_resources.py similarity index 100% rename from src/mvp/v2.0/py/mvp_v2/ray_resources.py rename to src/mvp/py/argus/service/ray_resources.py diff --git a/src/mvp/v2.0/py/mvp_v2/scheduler.py b/src/mvp/py/argus/service/scheduler.py similarity index 97% rename from src/mvp/v2.0/py/mvp_v2/scheduler.py rename to src/mvp/py/argus/service/scheduler.py index d84f9f2..452b77c 100644 --- a/src/mvp/v2.0/py/mvp_v2/scheduler.py +++ b/src/mvp/py/argus/service/scheduler.py @@ -8,12 +8,12 @@ from typing import Any import yaml -from mvp_v11.models import JobSpec, RayConfig -from mvp_v11.ray_job_tool import RayJobTool +from argus.core.ids import attempt_submission_id +from argus.ray.models import JobSpec, RayConfig +from argus.ray.ray_job_tool import RayJobTool from .config import V2Config from .db import Db -from .ids import attempt_submission_id from .ray_resources import ensure_ray_connected, get_cluster_available diff --git a/src/mvp/v2.0/py/requirements.txt b/src/mvp/py/requirements.txt similarity index 100% rename from src/mvp/v2.0/py/requirements.txt rename to src/mvp/py/requirements.txt diff --git a/src/mvp/v2.0/py/server.py b/src/mvp/py/server.py similarity index 86% rename from src/mvp/v2.0/py/server.py rename to src/mvp/py/server.py index b1ea2ef..31482f6 100644 --- a/src/mvp/v2.0/py/server.py +++ b/src/mvp/py/server.py @@ -5,13 +5,13 @@ import argparse import uvicorn -from mvp_v2.app import create_app -from mvp_v2.config import V2Config +from argus.service.app import create_app +from argus.service.config import V2Config def main() -> int: parser = argparse.ArgumentParser() - parser.add_argument("--config", required=True, help="Path to v1.1 RayConfig YAML (extended with v2:)") + parser.add_argument("--config", required=True, help="Path to MVP config YAML (ray:/service:)") args = parser.parse_args() # Load app and read v2.api host/port from config. 
@@ -30,4 +30,3 @@ def main() -> int: if __name__ == "__main__": raise SystemExit(main()) - diff --git a/src/mvp/v1.1/scripts/00_prereq_check.sh b/src/mvp/scripts/00_prereq_check.sh old mode 100644 new mode 100755 similarity index 56% rename from src/mvp/v1.1/scripts/00_prereq_check.sh rename to src/mvp/scripts/00_prereq_check.sh index e6479f8..c6335df --- a/src/mvp/v1.1/scripts/00_prereq_check.sh +++ b/src/mvp/scripts/00_prereq_check.sh @@ -27,16 +27,15 @@ else nvidia-smi -L || true fi -echo "[host] ensure shared dirs exist under ../shared" -mkdir -p "${ROOT_DIR}/../shared"/{datasets,hf,jobs,outputs,ray,common,user} -mkdir -p "${ROOT_DIR}/../shared/common"/{code,datasets,models} -mkdir -p "${ROOT_DIR}/../shared/user"/{code} +echo "[host] ensure shared dirs exist under ../../shared" +mkdir -p "${ROOT_DIR}/../../shared"/{datasets,hf,jobs,outputs,ray,common,user} +mkdir -p "${ROOT_DIR}/../../shared/common"/{code,datasets,models} +mkdir -p "${ROOT_DIR}/../../shared/user"/{code} -echo "[host] ensure verl repo exists under ../verl (required by prepare scripts)" -if [[ ! -d "${ROOT_DIR}/../verl" ]]; then - echo "missing ../verl. On remote, ensure /home2/argus/infra/mvp/verl exists (git clone volcengine/verl)." >&2 +echo "[host] ensure verl repo exists under ../../verl (required by prepare scripts)" +if [[ ! -d "${ROOT_DIR}/../../verl" ]]; then + echo "missing ../../verl. On remote, ensure /home2/argus/infra/mvp/verl exists (git clone volcengine/verl)." >&2 exit 1 fi echo "ok" - diff --git a/src/mvp/v1.1/scripts/01_up.sh b/src/mvp/scripts/01_up.sh old mode 100644 new mode 100755 similarity index 80% rename from src/mvp/v1.1/scripts/01_up.sh rename to src/mvp/scripts/01_up.sh index 5dffc6c..ab67090 --- a/src/mvp/v1.1/scripts/01_up.sh +++ b/src/mvp/scripts/01_up.sh @@ -9,8 +9,8 @@ if [[ "${SKIP_CLEANUP_V1:-0}" != "1" ]]; then "${SCRIPT_DIR}/03_cleanup_v1_legacy.sh" || true fi -echo "[host] docker compose up -d (v1.1)" +echo "[host] docker compose up -d (mvp)" dc up -d echo "[host] containers:" -docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}' | (head -n 1 && grep -E "mvp11-ray-") || true +docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}' | (head -n 1 && grep -E "argus-ray-") || true diff --git a/src/mvp/v1.1/scripts/02_down.sh b/src/mvp/scripts/02_down.sh old mode 100644 new mode 100755 similarity index 82% rename from src/mvp/v1.1/scripts/02_down.sh rename to src/mvp/scripts/02_down.sh index a0d6eb0..9e950f9 --- a/src/mvp/v1.1/scripts/02_down.sh +++ b/src/mvp/scripts/02_down.sh @@ -5,8 +5,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # shellcheck source=lib.sh source "${SCRIPT_DIR}/lib.sh" -echo "[host] docker compose down (v1.1)" +echo "[host] docker compose down (mvp)" dc down -v || true echo "[host] done" - diff --git a/src/mvp/v1.1/scripts/03_cleanup_v1_legacy.sh b/src/mvp/scripts/03_cleanup_v1_legacy.sh old mode 100644 new mode 100755 similarity index 100% rename from src/mvp/v1.1/scripts/03_cleanup_v1_legacy.sh rename to src/mvp/scripts/03_cleanup_v1_legacy.sh diff --git a/src/mvp/v1.1/scripts/05_ensure_verl_repo.sh b/src/mvp/scripts/05_ensure_verl_repo.sh old mode 100644 new mode 100755 similarity index 94% rename from src/mvp/v1.1/scripts/05_ensure_verl_repo.sh rename to src/mvp/scripts/05_ensure_verl_repo.sh index 3a6da0d..3d8d7a1 --- a/src/mvp/v1.1/scripts/05_ensure_verl_repo.sh +++ b/src/mvp/scripts/05_ensure_verl_repo.sh @@ -5,7 +5,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # shellcheck source=lib.sh source 
"${SCRIPT_DIR}/lib.sh" -VERL_DIR="${ROOT_DIR}/../verl" +VERL_DIR="${ROOT_DIR}/../../verl" echo "[host] ensure verl repo exists at: ${VERL_DIR}" if [[ -d "${VERL_DIR}/.git" ]]; then @@ -20,4 +20,3 @@ fi echo "cloning volcengine/verl -> ${VERL_DIR}" git clone https://github.com/volcengine/verl.git "${VERL_DIR}" - diff --git a/src/mvp/scripts/12_install_api_deps.sh b/src/mvp/scripts/12_install_api_deps.sh new file mode 100755 index 0000000..565db17 --- /dev/null +++ b/src/mvp/scripts/12_install_api_deps.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Install API/CLI dependencies inside the head container (best-effort). +# Assumes containers are already up and `/workspace/mvp/` is mounted. + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=lib.sh +source "${SCRIPT_DIR}/lib.sh" + +dexec "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -U pip >/dev/null 2>&1 || true" +dexec "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -r /workspace/mvp/py/requirements.txt" diff --git a/src/mvp/scripts/12_install_py_deps.sh b/src/mvp/scripts/12_install_py_deps.sh new file mode 100755 index 0000000..4c1126f --- /dev/null +++ b/src/mvp/scripts/12_install_py_deps.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=lib.sh +source "${SCRIPT_DIR}/lib.sh" + +echo "[head] install python deps for CLI/API" +dexec "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -r /workspace/mvp/py/requirements.txt" diff --git a/src/mvp/v1.1/scripts/20_start_head.sh b/src/mvp/scripts/20_start_head.sh old mode 100644 new mode 100755 similarity index 100% rename from src/mvp/v1.1/scripts/20_start_head.sh rename to src/mvp/scripts/20_start_head.sh diff --git a/src/mvp/v1.1/scripts/21_start_workers.sh b/src/mvp/scripts/21_start_workers.sh old mode 100644 new mode 100755 similarity index 100% rename from src/mvp/v1.1/scripts/21_start_workers.sh rename to src/mvp/scripts/21_start_workers.sh diff --git a/src/mvp/v1.1/scripts/30_prepare_data_and_model.sh b/src/mvp/scripts/30_prepare_data_and_model.sh old mode 100644 new mode 100755 similarity index 100% rename from src/mvp/v1.1/scripts/30_prepare_data_and_model.sh rename to src/mvp/scripts/30_prepare_data_and_model.sh diff --git a/src/mvp/v1.1/scripts/31_snapshot_verl_code.sh b/src/mvp/scripts/31_snapshot_verl_code.sh old mode 100644 new mode 100755 similarity index 100% rename from src/mvp/v1.1/scripts/31_snapshot_verl_code.sh rename to src/mvp/scripts/31_snapshot_verl_code.sh diff --git a/src/mvp/scripts/44_submit_sdk.sh b/src/mvp/scripts/44_submit_sdk.sh new file mode 100755 index 0000000..1df83e0 --- /dev/null +++ b/src/mvp/scripts/44_submit_sdk.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=lib.sh +source "${SCRIPT_DIR}/lib.sh" + +CONFIG_PATH="${1:-/workspace/mvp/configs/dev.yaml}" +TASKSPEC_PATH="${2:-}" + +if [[ -z "${TASKSPEC_PATH}" ]]; then + echo "usage: $0 " >&2 + echo "example: $0 /workspace/mvp/configs/dev.yaml /workspace/mvp/taskspecs/ppo.yaml" >&2 + exit 1 +fi + +echo "[head] submit via Ray Python SDK" +dexec "${HEAD_CONTAINER}" bash -lc "cd /workspace/mvp/py && python3 -m argus.cli.run --config '${CONFIG_PATH}' --taskspec '${TASKSPEC_PATH}' --action submit --no-wait" diff --git a/src/mvp/v1.1/scripts/50_status.sh b/src/mvp/scripts/50_status.sh old mode 100644 new mode 100755 similarity index 100% rename from 
src/mvp/v1.1/scripts/50_status.sh rename to src/mvp/scripts/50_status.sh diff --git a/src/mvp/scripts/60_start_api.sh b/src/mvp/scripts/60_start_api.sh new file mode 100755 index 0000000..29cef85 --- /dev/null +++ b/src/mvp/scripts/60_start_api.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=lib.sh +source "${SCRIPT_DIR}/lib.sh" + +CONFIG_IN_CONTAINER="${CONFIG_IN_CONTAINER:-/workspace/mvp/configs/dev.yaml}" +LOG_PATH="${LOG_PATH:-/private/common/logs/argus_mvp_api.log}" +PID_PATH="${PID_PATH:-/private/common/run/argus_mvp_api.pid}" + +echo "[host] starting mvp v2 api in head container: ${HEAD_CONTAINER}" + +dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p \"$(dirname "${LOG_PATH}")\" \"$(dirname "${PID_PATH}")\"" + +# Note: requires /workspace/mvp/py to be present in the container (mounted). +# Escape $ so that the command substitution happens in the container, not on the host. +dexec "${HEAD_CONTAINER}" bash -lc "if test -f '${PID_PATH}'; then pid=\$(cat '${PID_PATH}'); if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo 'already_running'; exit 0; fi; fi" + +if [[ -z "${MVP_INTERNAL_TOKEN:-}" ]]; then + echo "ERROR: MVP_INTERNAL_TOKEN env var must be set on host (will be passed into container)" >&2 + exit 1 +fi + +docker exec -d -e MVP_INTERNAL_TOKEN="${MVP_INTERNAL_TOKEN}" "${HEAD_CONTAINER}" bash -lc "nohup python3 /workspace/mvp/py/server.py --config '${CONFIG_IN_CONTAINER}' >>'${LOG_PATH}' 2>&1 & echo \$! >'${PID_PATH}'" + +echo "[host] started; pid stored in ${PID_PATH} (container path)" +echo "[host] logs: ${LOG_PATH} (container path)" diff --git a/src/mvp/scripts/61_stop_api.sh b/src/mvp/scripts/61_stop_api.sh new file mode 100755 index 0000000..f4b46b1 --- /dev/null +++ b/src/mvp/scripts/61_stop_api.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=lib.sh +source "${SCRIPT_DIR}/lib.sh" + +PID_PATH="${PID_PATH:-/private/common/run/argus_mvp_api.pid}" + +echo "[host] stopping mvp v2 api (pid file: ${PID_PATH})" + +dexec "${HEAD_CONTAINER}" bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then kill \"\${pid}\"; fi; rm -f '${PID_PATH}'; echo 'stopped'" diff --git a/src/mvp/scripts/62_status_api.sh b/src/mvp/scripts/62_status_api.sh new file mode 100755 index 0000000..ec5b6c8 --- /dev/null +++ b/src/mvp/scripts/62_status_api.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=lib.sh +source "${SCRIPT_DIR}/lib.sh" + +PID_PATH="${PID_PATH:-/private/common/run/argus_mvp_api.pid}" + +dexec "${HEAD_CONTAINER}" bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo \"running pid=\${pid}\"; else echo \"stale pid=\${pid}\"; fi" diff --git a/src/mvp/v1.1/scripts/lib.sh b/src/mvp/scripts/lib.sh old mode 100644 new mode 100755 similarity index 83% rename from src/mvp/v1.1/scripts/lib.sh rename to src/mvp/scripts/lib.sh index 23e0997..778d1a6 --- a/src/mvp/v1.1/scripts/lib.sh +++ b/src/mvp/scripts/lib.sh @@ -6,9 +6,10 @@ ROOT_DIR="$(cd "${SCRIPT_DIR}/.." 
&& pwd)" COMPOSE_FILE="${ROOT_DIR}/docker-compose.yaml" -HEAD_CONTAINER="mvp11-ray-head" -WORKER0_CONTAINER="mvp11-ray-worker-0" -WORKER1_CONTAINER="mvp11-ray-worker-1" +# Container names (refactor) +HEAD_CONTAINER="${HEAD_CONTAINER:-argus-ray-head}" +WORKER0_CONTAINER="${WORKER0_CONTAINER:-argus-ray-worker-0}" +WORKER1_CONTAINER="${WORKER1_CONTAINER:-argus-ray-worker-1}" SHARED_ROOT="${SHARED_ROOT:-/private}" RAY_DASHBOARD_ADDR="${RAY_DASHBOARD_ADDR:-http://127.0.0.1:8265}" @@ -49,4 +50,3 @@ container_ip() { timestamp() { date +"%Y%m%d_%H%M%S" } - diff --git a/src/mvp/scripts/run_all_api.sh b/src/mvp/scripts/run_all_api.sh new file mode 100755 index 0000000..3ff190b --- /dev/null +++ b/src/mvp/scripts/run_all_api.sh @@ -0,0 +1,120 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=lib.sh +source "${SCRIPT_DIR}/lib.sh" + +# Run the end-to-end flow using the API service (argus.service). +# This script restarts the Ray cluster, prepares model/data, starts the API, +# then submits PPO/GRPO/SFT via HTTP API and monitors the queue. + +API_ADDR="${API_ADDR:-http://127.0.0.1:8080}" +TOKEN="${MVP_INTERNAL_TOKEN:-}" +DB_IN_CONTAINER="${DB_IN_CONTAINER:-/private/common/db/mvp.sqlite3}" + +if [[ -z "${TOKEN}" ]]; then + echo "ERROR: MVP_INTERNAL_TOKEN must be set in the host env for run_all_api.sh" >&2 + exit 1 +fi + +api_curl() { + curl -sS -H "Authorization: Bearer ${TOKEN}" "$@" +} + +api_wait_ready() { + local tries="${1:-60}" + for i in $(seq 1 "${tries}"); do + if curl -sS -m 2 "${API_ADDR}/docs" >/dev/null 2>&1; then + echo "[host] api_ready: ${API_ADDR}" + return 0 + fi + echo "[host] waiting api... (${i}/${tries})" + sleep 2 + done + echo "ERROR: api not ready: ${API_ADDR}" >&2 + return 1 +} + +submit_taskspec() { + local taskspec_path="$1" + echo "[host] submit via API: ${taskspec_path}" >&2 + local resp + resp="$(api_curl -H "Content-Type: application/yaml" --data-binary @"${taskspec_path}" "${API_ADDR}/api/v2/tasks")" + echo "[host] submit_resp: ${resp}" >&2 + printf '%s' "${resp}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["task_id"])' +} + +print_queue() { + echo "[host] queue:" + api_curl "${API_ADDR}/api/v2/queue" || true + echo +} + +wait_task() { + local task_id="$1" + while true; do + local body state + body="$(api_curl "${API_ADDR}/api/v2/tasks/${task_id}")" + state="$(printf '%s' "${body}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["state"])')" + echo "[host] task ${task_id}: ${state}" + + if [[ "${state}" == "SUCCEEDED" ]]; then + return 0 + fi + if [[ "${state}" == "FAILED" || "${state}" == "CANCELED" ]]; then + echo "[host] terminal=${state}; tail logs (best-effort):" >&2 + api_curl "${API_ADDR}/api/v2/tasks/${task_id}/logs?tail=200" >&2 || true + return 1 + fi + + print_queue + sleep 10 + done +} + +echo "[host] ===== run_all_api.sh begin =====" + +"${SCRIPT_DIR}/00_prereq_check.sh" +"${SCRIPT_DIR}/03_cleanup_v1_legacy.sh" +"${SCRIPT_DIR}/05_ensure_verl_repo.sh" + +echo "[host] (re)create containers" +"${SCRIPT_DIR}/01_up.sh" + +echo "[host] restart ray cluster" +"${SCRIPT_DIR}/20_start_head.sh" +"${SCRIPT_DIR}/21_start_workers.sh" + +echo "[host] prepare data/model/code snapshot" +"${SCRIPT_DIR}/30_prepare_data_and_model.sh" + +echo "[host] install api deps in head container" +"${SCRIPT_DIR}/12_install_api_deps.sh" + +echo "[host] stop api (best-effort)" +"${SCRIPT_DIR}/61_stop_api.sh" || true + +echo "[host] reset api sqlite db in container (best-effort): 
${DB_IN_CONTAINER}" +dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p \"$(dirname "${DB_IN_CONTAINER}")\"; rm -f '${DB_IN_CONTAINER}' '${DB_IN_CONTAINER}-wal' '${DB_IN_CONTAINER}-shm' || true" + +echo "[host] start api" +MVP_INTERNAL_TOKEN="${TOKEN}" "${SCRIPT_DIR}/60_start_api.sh" +api_wait_ready 60 + +print_queue +PPO_TASK_ID="$(submit_taskspec "${ROOT_DIR}/taskspecs/ppo.yaml")" +GRPO_TASK_ID="$(submit_taskspec "${ROOT_DIR}/taskspecs/grpo.yaml")" +SFT_TASK_ID="$(submit_taskspec "${ROOT_DIR}/taskspecs/sft.yaml")" + +echo "[host] submitted task ids:" +echo " ppo=${PPO_TASK_ID}" +echo " grpo=${GRPO_TASK_ID}" +echo " sft=${SFT_TASK_ID}" + +echo "[host] wait for tasks (in submission order)" +wait_task "${PPO_TASK_ID}" +wait_task "${GRPO_TASK_ID}" +wait_task "${SFT_TASK_ID}" + +echo "[host] ===== run_all_api.sh done =====" diff --git a/src/mvp/v1.1/scripts/run_all.sh b/src/mvp/scripts/run_all_cli.sh old mode 100644 new mode 100755 similarity index 51% rename from src/mvp/v1.1/scripts/run_all.sh rename to src/mvp/scripts/run_all_cli.sh index 1df293d..3e89801 --- a/src/mvp/v1.1/scripts/run_all.sh +++ b/src/mvp/scripts/run_all_cli.sh @@ -5,13 +5,16 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # shellcheck source=lib.sh source "${SCRIPT_DIR}/lib.sh" +# Run the end-to-end flow using the CLI submitter (argus.cli). +# This script restarts the Ray cluster and submits PPO/GRPO/SFT sequentially. + submit_and_wait() { - local jobspec_in_container="$1" + local taskspec_in_container="$1" local sid local out - echo "[host] submit via SDK: ${jobspec_in_container}" - out="$(dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config /workspace/mvp/v1.1/py/configs/dev.yaml --jobspec '${jobspec_in_container}' --action submit --no-wait" | tr -d '\r')" + echo "[host] submit via SDK: ${taskspec_in_container}" + out="$(dexec "${HEAD_CONTAINER}" bash -lc "cd /workspace/mvp/py && python3 -m argus.cli.run --config /workspace/mvp/configs/dev.yaml --taskspec '${taskspec_in_container}' --action submit --no-wait" | tr -d '\r')" sid="$(printf '%s\n' "${out}" | tail -n 1)" if [[ -z "${sid}" ]]; then @@ -23,7 +26,7 @@ submit_and_wait() { echo "[host] submitted: ${sid}" while true; do - st="$(dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config /workspace/mvp/v1.1/py/configs/dev.yaml --action status --submission-id '${sid}'" | tr -d '\r' | tail -n 1)" + st="$(dexec "${HEAD_CONTAINER}" bash -lc "cd /workspace/mvp/py && python3 -m argus.cli.run --config /workspace/mvp/configs/dev.yaml --action status --submission-id '${sid}'" | tr -d '\r' | tail -n 1)" echo "[host] status: ${sid} -> ${st}" case "${st}" in *SUCCEEDED*) @@ -32,7 +35,7 @@ submit_and_wait() { *FAILED*|*STOPPED*) echo "[host] job failed: ${sid} (${st})" >&2 echo "[host] last logs:" >&2 - dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config /workspace/mvp/v1.1/py/configs/dev.yaml --action logs --submission-id '${sid}' --tail 200" >&2 || true + dexec "${HEAD_CONTAINER}" bash -lc "cd /workspace/mvp/py && python3 -m argus.cli.run --config /workspace/mvp/configs/dev.yaml --action logs --submission-id '${sid}' --tail 200" >&2 || true return 1 ;; *) @@ -50,7 +53,7 @@ submit_and_wait() { "${SCRIPT_DIR}/21_start_workers.sh" "${SCRIPT_DIR}/30_prepare_data_and_model.sh" "${SCRIPT_DIR}/12_install_py_deps.sh" -submit_and_wait /workspace/mvp/v1.1/py/jobspecs/ppo.yaml -submit_and_wait /workspace/mvp/v1.1/py/jobspecs/grpo.yaml -submit_and_wait /workspace/mvp/v1.1/py/jobspecs/sft.yaml 
+submit_and_wait /workspace/mvp/taskspecs/ppo.yaml +submit_and_wait /workspace/mvp/taskspecs/grpo.yaml +submit_and_wait /workspace/mvp/taskspecs/sft.yaml "${SCRIPT_DIR}/50_status.sh" diff --git a/src/mvp/taskspecs/README.md b/src/mvp/taskspecs/README.md new file mode 100644 index 0000000..e8fb073 --- /dev/null +++ b/src/mvp/taskspecs/README.md @@ -0,0 +1,10 @@ +# TaskSpecs + +这里的 `*.yaml` 是 **TaskSpec**(训练任务语义参数),用于描述要跑的 workload(PPO/GRPO/SFT)以及数据/模型路径、训练步数、分布式规模等。 + +注意区分: +- **TaskSpec**:训练语义(本目录) +- **Ray Job**:Ray Jobs 的 submission(由 submitter 将 TaskSpec 转换为 Ray Job entrypoint + runtime_env 等) + +字段保持与原 v1.1 `jobspecs/` 一致(迁移期不做字段改名/规范化,以降低风险)。 + diff --git a/src/mvp/v1.1/py/jobspecs/grpo.yaml b/src/mvp/taskspecs/grpo.yaml similarity index 100% rename from src/mvp/v1.1/py/jobspecs/grpo.yaml rename to src/mvp/taskspecs/grpo.yaml diff --git a/src/mvp/v1.1/py/jobspecs/ppo.yaml b/src/mvp/taskspecs/ppo.yaml similarity index 100% rename from src/mvp/v1.1/py/jobspecs/ppo.yaml rename to src/mvp/taskspecs/ppo.yaml diff --git a/src/mvp/v1.1/py/jobspecs/sft.yaml b/src/mvp/taskspecs/sft.yaml similarity index 100% rename from src/mvp/v1.1/py/jobspecs/sft.yaml rename to src/mvp/taskspecs/sft.yaml diff --git a/src/mvp/v1.1/README.md b/src/mvp/v1.1/README.md deleted file mode 100644 index 63a7a5b..0000000 --- a/src/mvp/v1.1/README.md +++ /dev/null @@ -1,65 +0,0 @@ -# MVP v1.1(GRPO + SFT on Ray)运行说明 - -本目录是一套**独立可运行**的 v1.1 交付:使用 1 个 Ray head(不跑训练)+ 2 个 Ray worker(各 4 GPU)在同一宿主机通过 `docker exec` 协调容器,并通过 **head 上的 `ray job submit`** 提交作业,同时强制 driver 落到 worker。 - -> 远程 dev 环境推荐目录布局: -> -> - `/home2/argus/infra/mvp/` -> - `shared/`(持久化:datasets/hf/jobs/...) -> - `verl/`(代码仓库,用于 prepare / snapshot) -> - `v1.1/`(本目录内容:compose + scripts) - ---- - -## 快速开始(远程机 argus@h1) - -在 `/home2/argus/infra/mvp/v1.1/` 下执行: - -```bash -./scripts/run_all.sh -``` - -说明: -- `run_all.sh` 会按顺序提交 `ppo -> grpo -> sft`,并等待每个 job 结束后再提交下一个(避免 8 卡集群并发提交导致 “available GPUs 0” 直接失败)。 - -等价的“分步执行”: - -```bash -./scripts/00_prereq_check.sh -./scripts/03_cleanup_v1_legacy.sh -./scripts/05_ensure_verl_repo.sh -./scripts/01_up.sh -./scripts/20_start_head.sh -./scripts/21_start_workers.sh -./scripts/30_prepare_data_and_model.sh -./scripts/12_install_py_deps.sh -./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/ppo.yaml # no-wait -./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/grpo.yaml # no-wait -./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/sft.yaml # no-wait -./scripts/50_status.sh -``` - -停止并清理: - -```bash -./scripts/02_down.sh -``` - ---- - -## 关键约束(必须满足) - -- **必须通过 head 执行 `ray job submit`** 提交任务(满足“从 head 提交”要求)。 -- **head 不跑训练**:head 以 `--num-cpus=0 --num-gpus=0` 启动;worker 具备自定义资源 `worker_node`;提交时 `--entrypoint-resources='{"worker_node": 1}'` 强制 driver 落 worker。 -- **共享路径统一为 `/private`(容器内)**:compose 将宿主机 `../shared` 挂载到容器内 `/private`,对齐生产环境。 -- **job 级别 code_path**:训练 JobSpec 中的 `code_path` 指向 `/private/common/code/verl/verl_repo`(由 `scripts/30_prepare_data_and_model.sh` 准备)。 - ---- - -## 共享目录(容器内 /private) - -- `/private/datasets/`:数据(PPO 的 gsm8k RL parquet、SFT parquet) -- `/private/hf/`:HF 缓存(模型持久化,避免重复下载) -- `/private/jobs//`:每个 Ray Job 的输出目录(logs/config/debug/checkpoints) -- `/private/common/`:共享区(模型/数据/代码快照) -- `/private/user/`:用户自定义代码(例如 reward_fn) diff --git a/src/mvp/v1.1/py/configs/dev.yaml b/src/mvp/v1.1/py/configs/dev.yaml deleted file mode 100644 index c0244e6..0000000 
--- a/src/mvp/v1.1/py/configs/dev.yaml +++ /dev/null @@ -1,38 +0,0 @@ -# Ray 基础配置(dev 环境 / head 容器内视角) -# -# 说明: -# - v1.1 的 SDK submitter 会读取本文件作为 RayConfig。 -# - v2.0 的 API 服务/调度器也复用本文件作为“基础 RayConfig”,并在其上扩展 v2 专属配置项(见 v2:)。 -address: "http://127.0.0.1:8265" - -# 容器内共享根路径(对齐生产 /private) -shared_root: "/private" - -# 强制 driver 落 worker(head 不跑训练) -entrypoint_num_cpus: 1 -entrypoint_resources: - worker_node: 1 - -# 运行时环境变量(所有 job 通用) -runtime_env: - env_vars: - HF_ENDPOINT: "https://hf-mirror.com" - PYTHONUNBUFFERED: "1" - -# 用户自定义代码目录(可被 PYTHONPATH 注入) -user_code_path: "/private/user/code" - -# v2.0 服务层配置(v1.1 submitter 会忽略这些字段;v2.0 服务会读取) -v2: - api: - host: "0.0.0.0" - port: 8080 - auth: - # 内部 token 建议通过环境变量提供,避免写入配置文件 - token_env: "MVP_INTERNAL_TOKEN" - sqlite: - db_path: "/private/common/db/mvp_v2.sqlite3" - scheduler: - tick_s: 5 - retry_interval_s: 60 - max_running_tasks: 1 diff --git a/src/mvp/v1.1/py/mvp_v11/models.py b/src/mvp/v1.1/py/mvp_v11/models.py deleted file mode 100644 index 41f8804..0000000 --- a/src/mvp/v1.1/py/mvp_v11/models.py +++ /dev/null @@ -1,121 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any - - -def _require(d: dict[str, Any], key: str) -> Any: - if key not in d or d[key] in (None, ""): - raise ValueError(f"missing required field: {key}") - return d[key] - - -@dataclass(frozen=True) -class RayConfig: - address: str - shared_root: str - entrypoint_num_cpus: float - entrypoint_resources: dict[str, float] - runtime_env_env_vars: dict[str, str] - user_code_path: str - - @staticmethod - def from_dict(d: dict[str, Any]) -> "RayConfig": - runtime_env = d.get("runtime_env") or {} - env_vars = (runtime_env.get("env_vars") or {}) if isinstance(runtime_env, dict) else {} - if not isinstance(env_vars, dict): - raise ValueError("runtime_env.env_vars must be a mapping") - - entrypoint_resources = d.get("entrypoint_resources") or {} - if not isinstance(entrypoint_resources, dict): - raise ValueError("entrypoint_resources must be a mapping") - - return RayConfig( - address=str(_require(d, "address")), - shared_root=str(_require(d, "shared_root")), - entrypoint_num_cpus=float(d.get("entrypoint_num_cpus", 1)), - entrypoint_resources={str(k): float(v) for k, v in entrypoint_resources.items()}, - runtime_env_env_vars={str(k): str(v) for k, v in env_vars.items()}, - user_code_path=str(d.get("user_code_path", f"{_require(d, 'shared_root')}/user/code")), - ) - - def to_public_dict(self) -> dict[str, Any]: - return { - "address": self.address, - "shared_root": self.shared_root, - "entrypoint_num_cpus": self.entrypoint_num_cpus, - "entrypoint_resources": self.entrypoint_resources, - "runtime_env": {"env_vars": self.runtime_env_env_vars}, - "user_code_path": self.user_code_path, - } - - -@dataclass(frozen=True) -class JobSpec: - workload: str # ppo|grpo|sft - submission_id: str | None - code_path: str - model_id: str - - train_file: str - val_file: str | None - - nnodes: int - n_gpus_per_node: int - - total_epochs: int - total_training_steps: int - - save_freq: int - test_freq: int | None - - trainer_device: str | None # only for sft (driver-side device) - - @staticmethod - def from_dict(d: dict[str, Any]) -> "JobSpec": - workload = str(_require(d, "workload")) - if workload not in ("ppo", "grpo", "sft"): - raise ValueError(f"unsupported workload: {workload}") - - val_file = d.get("val_file", None) - if val_file in ("", "null"): - val_file = None - - test_freq = d.get("test_freq", None) - if test_freq in ("", "null"): - 
test_freq = None - - return JobSpec( - workload=workload, - submission_id=(str(d["submission_id"]) if d.get("submission_id") else None), - code_path=str(_require(d, "code_path")), - model_id=str(_require(d, "model_id")), - train_file=str(_require(d, "train_file")), - val_file=(str(val_file) if val_file is not None else None), - nnodes=int(d.get("nnodes", 2)), - n_gpus_per_node=int(d.get("n_gpus_per_node", 4)), - total_epochs=int(d.get("total_epochs", 1)), - total_training_steps=int(d.get("total_training_steps", 10)), - save_freq=int(d.get("save_freq", 10)), - test_freq=(int(test_freq) if test_freq is not None else None), - trainer_device=(str(d.get("trainer_device")) if d.get("trainer_device") else None), - ) - - def to_public_dict(self) -> dict[str, Any]: - out: dict[str, Any] = { - "workload": self.workload, - "submission_id": self.submission_id or "", - "code_path": self.code_path, - "model_id": self.model_id, - "train_file": self.train_file, - "val_file": self.val_file, - "nnodes": self.nnodes, - "n_gpus_per_node": self.n_gpus_per_node, - "total_epochs": self.total_epochs, - "total_training_steps": self.total_training_steps, - "save_freq": self.save_freq, - "test_freq": self.test_freq, - } - if self.workload == "sft": - out["trainer_device"] = self.trainer_device or "cpu" - return out diff --git a/src/mvp/v1.1/py/requirements.txt b/src/mvp/v1.1/py/requirements.txt deleted file mode 100644 index e3af026..0000000 --- a/src/mvp/v1.1/py/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -PyYAML>=6.0.1 - diff --git a/src/mvp/v1.1/scripts/12_install_py_deps.sh b/src/mvp/v1.1/scripts/12_install_py_deps.sh deleted file mode 100644 index da1649c..0000000 --- a/src/mvp/v1.1/scripts/12_install_py_deps.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -# shellcheck source=lib.sh -source "${SCRIPT_DIR}/lib.sh" - -echo "[head] install python deps for v1.1 SDK submitter (PyYAML)" -dexec "${HEAD_CONTAINER}" bash -lc "pip install -r /workspace/mvp/v1.1/py/requirements.txt" - diff --git a/src/mvp/v1.1/scripts/44_submit_sdk.sh b/src/mvp/v1.1/scripts/44_submit_sdk.sh deleted file mode 100644 index d7bf010..0000000 --- a/src/mvp/v1.1/scripts/44_submit_sdk.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -# shellcheck source=lib.sh -source "${SCRIPT_DIR}/lib.sh" - -CONFIG_PATH="${1:-/workspace/mvp/v1.1/py/configs/dev.yaml}" -JOBSPEC_PATH="${2:-}" - -if [[ -z "${JOBSPEC_PATH}" ]]; then - echo "usage: $0 " >&2 - echo "example: $0 /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/ppo.yaml" >&2 - exit 1 -fi - -echo "[head] submit via Ray Python SDK" -dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config '${CONFIG_PATH}' --jobspec '${JOBSPEC_PATH}' --action submit --no-wait" - diff --git a/src/mvp/v2.0/README.md b/src/mvp/v2.0/README.md deleted file mode 100644 index 5f20c18..0000000 --- a/src/mvp/v2.0/README.md +++ /dev/null @@ -1,104 +0,0 @@ -# MVP v2.0(服务化入口) - -v2.0 在 v1.1(Ray Jobs SDK 提交链路)基础上新增一个**服务层**: -- HTTP API 提交任务(PPO/GRPO/SFT) -- 服务侧队列 + gang 资源判断 -- 识别 `verl` fail-fast 的资源不足失败并自动重试 -- SQLite 持久化队列/状态/attempt(NFS:`/private`) - -设计文档见: -- `specs/mvp/v2.0/v2_plan.md` -- `specs/mvp/v2.0/v2_api.md` - -## 快速开始(dev 示例) - -约定: -- Ray 容器仍由 v1.1 的 `docker-compose.yaml` 启动(head+2 workers) -- v2 代码在宿主机:`/home2/argus/infra/mvp/v2/`(容器内挂载 `/workspace/mvp/v2/`) -- v2 配置复用 
v1.1:`/workspace/mvp/v1.1/py/configs/dev.yaml`(扩展了 `v2:` 段) - -宿主机执行: - -```bash -export MVP_INTERNAL_TOKEN=... # 内部 token -cd /home2/argus/infra/mvp/v2/scripts -./12_install_v2_deps.sh -./20_start_api.sh -./22_status_api.sh -``` - -API 测试(宿主机): - -```bash -curl -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" http://127.0.0.1:8080/api/v2/queue -``` - -> 进程日志与 pid(容器内路径)默认在 `/private/common/logs/` 与 `/private/common/run/`。 - -## 提交/查询/停止任务 - -约定: -- API 地址(宿主机视角):`http://127.0.0.1:8080` -- 鉴权:`Authorization: Bearer ${MVP_INTERNAL_TOKEN}` -- 请求体:**raw YAML**(JobSpec,格式与 v1.1 jobspec 一致) - -### 1) 提交任务(POST /api/v2/tasks) - -准备一个 jobspec(示例:PPO): - -```yaml -workload: "ppo" -submission_id: "" # v2 会忽略/覆盖,自动生成 task_id 并派生 ray_submission_id -code_path: "/private/common/code/verl/verl_repo" -model_id: "Qwen/Qwen2.5-0.5B-Instruct" -train_file: "/private/datasets/gsm8k/train.parquet" -val_file: "/private/datasets/gsm8k/test.parquet" -nnodes: 2 -n_gpus_per_node: 4 -total_epochs: 1 -total_training_steps: 10 -save_freq: 10 -test_freq: -1 -trainer_device: null -``` - -提交: - -```bash -curl -sS \ - -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \ - -H "Content-Type: application/yaml" \ - --data-binary @jobspec.yaml \ - http://127.0.0.1:8080/api/v2/tasks -``` - -返回示例: - -```json -{"task_id":"mvp2-ppo-20251223-082813-6426","state":"QUEUED"} -``` - -### 2) 查询任务(GET /api/v2/tasks/{task_id}) - -```bash -curl -sS \ - -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \ - http://127.0.0.1:8080/api/v2/tasks/ | python3 -m json.tool -``` - -可选: -- 查看 attempts:`GET /api/v2/tasks/{task_id}/attempts` -- 拉取日志(latest attempt):`GET /api/v2/tasks/{task_id}/logs?tail=2000` -- 查看队列:`GET /api/v2/queue` - -### 3) 停止/取消任务(POST /api/v2/tasks/{task_id}:cancel) - -```bash -curl -sS -X POST \ - -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \ - http://127.0.0.1:8080/api/v2/tasks/:cancel -``` - -说明: -- 若任务已提交到 Ray(`SUBMITTED/RUNNING`),服务会调用 Ray Jobs SDK `stop_job(ray_submission_id)`。 -- 若任务仍在队列(`QUEUED/PENDING_RESOURCES`),服务直接标记 `CANCELED`(不会产生 attempt)。 diff --git a/src/mvp/v2.0/py/mvp_v11/__init__.py b/src/mvp/v2.0/py/mvp_v11/__init__.py deleted file mode 100644 index 8b13789..0000000 --- a/src/mvp/v2.0/py/mvp_v11/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/mvp/v2.0/py/mvp_v11/builders.py b/src/mvp/v2.0/py/mvp_v11/builders.py deleted file mode 100644 index d4b9786..0000000 --- a/src/mvp/v2.0/py/mvp_v11/builders.py +++ /dev/null @@ -1,96 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass - -from .models import JobSpec - - -@dataclass(frozen=True) -class BuiltCommand: - argv: list[str] - - -def build_training_argv(spec: JobSpec, submission_id: str, job_dir: str) -> BuiltCommand: - """ - Returns argv for the actual training process (Hydra overrides preserved). - This argv is executed by a lightweight Python driver entrypoint. 
- """ - if spec.workload in ("ppo", "grpo"): - algo_overrides: list[str] = [] - if spec.workload == "grpo": - algo_overrides.append("algorithm.adv_estimator=grpo") - - test_freq = spec.test_freq if spec.test_freq is not None else -1 - val_file = spec.val_file if spec.val_file is not None else "null" - - argv = [ - "python3", - "-m", - "verl.trainer.main_ppo", - f"data.train_files={spec.train_file}", - f"data.val_files={val_file}", - "data.train_batch_size=256", - "data.max_prompt_length=512", - "data.max_response_length=512", - f"actor_rollout_ref.model.path={spec.model_id}", - "actor_rollout_ref.actor.optim.lr=1e-6", - "actor_rollout_ref.actor.ppo_mini_batch_size=64", - "actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4", - "actor_rollout_ref.rollout.name=sglang", - "actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8", - "actor_rollout_ref.rollout.tensor_model_parallel_size=1", - "actor_rollout_ref.rollout.gpu_memory_utilization=0.4", - "actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4", - "critic.optim.lr=1e-5", - f"critic.model.path={spec.model_id}", - "critic.ppo_micro_batch_size_per_gpu=4", - "algorithm.kl_ctrl.kl_coef=0.001", - *algo_overrides, - "trainer.logger=console", - "trainer.val_before_train=False", - f"trainer.n_gpus_per_node={spec.n_gpus_per_node}", - f"trainer.nnodes={spec.nnodes}", - f"trainer.save_freq={spec.save_freq}", - f"trainer.test_freq={test_freq}", - f"trainer.total_epochs={spec.total_epochs}", - f"trainer.total_training_steps={spec.total_training_steps}", - "trainer.resume_mode=disable", - f"trainer.default_local_dir={job_dir}/checkpoints", - "+ray_kwargs.ray_init.address=auto", - f"hydra.run.dir={job_dir}/logs/hydra", - ] - return BuiltCommand(argv=argv) - - if spec.workload == "sft": - val_override = "null" if spec.val_file is None else spec.val_file - trainer_device = spec.trainer_device or "cpu" - - argv = [ - "python3", - "-m", - "verl.trainer.sft_trainer_ray", - f"model.path={spec.model_id}", - f"data.train_files={spec.train_file}", - f"data.val_files={val_override}", - "data.train_batch_size=64", - "data.micro_batch_size_per_gpu=1", - "data.max_token_len_per_gpu=2048", - "data.max_length=1024", - "trainer.logger=console", - "trainer.project_name=mvp11-sft", - f"trainer.experiment_name={submission_id}", - f"trainer.total_epochs={spec.total_epochs}", - f"trainer.total_training_steps={spec.total_training_steps}", - f"trainer.save_freq={spec.save_freq}", - "trainer.test_freq=-1", - "trainer.resume_mode=disable", - f"trainer.device={trainer_device}", - f"trainer.default_local_dir={job_dir}/checkpoints", - f"trainer.nnodes={spec.nnodes}", - f"trainer.n_gpus_per_node={spec.n_gpus_per_node}", - f"hydra.run.dir={job_dir}/logs/hydra", - ] - return BuiltCommand(argv=argv) - - raise ValueError(f"unsupported workload: {spec.workload}") - diff --git a/src/mvp/v2.0/py/mvp_v11/driver_entrypoint.py b/src/mvp/v2.0/py/mvp_v11/driver_entrypoint.py deleted file mode 100644 index 9f99c58..0000000 --- a/src/mvp/v2.0/py/mvp_v11/driver_entrypoint.py +++ /dev/null @@ -1,63 +0,0 @@ -from __future__ import annotations - -import argparse -import os -import shlex -import subprocess -import sys -from pathlib import Path - - -def _preflight() -> None: - print("MVP_PRECHECK_PYTHON:", sys.executable, flush=True) - print("MVP_PRECHECK_PYTHONPATH:", os.environ.get("PYTHONPATH"), flush=True) - print("MVP_PRECHECK_MVP_CODE_PATH:", os.environ.get("MVP_CODE_PATH"), flush=True) - try: - import verl # type: ignore - - print("MVP_PRECHECK_VERL_FILE:", getattr(verl, 
"__file__", None), flush=True) - except Exception as e: - print("MVP_PRECHECK_VERL_IMPORT_ERROR:", repr(e), flush=True) - - try: - import mvp_marker # type: ignore - - print("MVP_PRECHECK_MARKER:", getattr(mvp_marker, "MARKER", None), flush=True) - except Exception as e: - print("MVP_PRECHECK_MARKER_MISSING:", repr(e), flush=True) - - -def main() -> int: - parser = argparse.ArgumentParser() - parser.add_argument("--job-dir", required=True) - parser.add_argument("cmd", nargs=argparse.REMAINDER) - args = parser.parse_args() - - job_dir = Path(args.job_dir) - job_dir.mkdir(parents=True, exist_ok=True) - - _preflight() - - if not args.cmd: - print("no command provided", file=sys.stderr) - return 2 - - # argparse includes the leading "--" if the caller uses it; strip it. - cmd = list(args.cmd) - if cmd and cmd[0] == "--": - cmd = cmd[1:] - if not cmd: - print("no command provided", file=sys.stderr) - return 2 - - # Execute training command as a subprocess so that logs are captured by Ray job logs. - cmd_str = " ".join(shlex.quote(x) for x in cmd) - print("MVP_DRIVER_EXEC:", cmd_str, flush=True) - - proc = subprocess.run(cmd, check=False) - print("MVP_DRIVER_EXIT_CODE:", proc.returncode, flush=True) - return proc.returncode - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/src/mvp/v2.0/py/mvp_v11/ray_job_tool.py b/src/mvp/v2.0/py/mvp_v11/ray_job_tool.py deleted file mode 100644 index 01b899a..0000000 --- a/src/mvp/v2.0/py/mvp_v11/ray_job_tool.py +++ /dev/null @@ -1,171 +0,0 @@ -from __future__ import annotations - -import json -import os -import shlex -from datetime import datetime -from pathlib import Path -from typing import Any - -import ray -from ray.job_submission import JobSubmissionClient - -from .builders import build_training_argv -from .models import JobSpec, RayConfig -from .yaml_io import dump_yaml - - -def _ts() -> str: - return datetime.now().strftime("%Y%m%d_%H%M%S") - - -def _mkdir(p: Path) -> None: - p.mkdir(parents=True, exist_ok=True) - - -def _write_text(p: Path, content: str) -> None: - _mkdir(p.parent) - p.write_text(content, encoding="utf-8") - - -def _write_json(p: Path, obj: Any) -> None: - _write_text(p, json.dumps(obj, indent=2, ensure_ascii=False) + "\n") - - -def _safe_basename(path: str) -> str: - return path.rstrip("/").split("/")[-1] - - -class RayJobTool: - def __init__(self, cfg: RayConfig): - self.cfg = cfg - self.client = JobSubmissionClient(cfg.address) - - def _job_dir(self, submission_id: str) -> str: - return f"{self.cfg.shared_root}/jobs/{submission_id}" - - def _runtime_env(self, spec: JobSpec) -> dict[str, Any]: - env_vars = dict(self.cfg.runtime_env_env_vars) - - # Default HF cache - env_vars.setdefault("HF_HOME", f"{self.cfg.shared_root}/hf") - env_vars.setdefault("HUGGINGFACE_HUB_CACHE", f"{self.cfg.shared_root}/hf/hub") - env_vars.setdefault("TRANSFORMERS_CACHE", f"{self.cfg.shared_root}/hf/transformers") - env_vars.setdefault("PYTHONUNBUFFERED", "1") - - # Tool code path must be importable on workers (compose mounts v1.1 into all containers). - # Place it before verl code to avoid interfering with verl import priority. 
diff --git a/src/mvp/v2.0/py/mvp_v11/ray_job_tool.py b/src/mvp/v2.0/py/mvp_v11/ray_job_tool.py
deleted file mode 100644
index 01b899a..0000000
--- a/src/mvp/v2.0/py/mvp_v11/ray_job_tool.py
+++ /dev/null
@@ -1,171 +0,0 @@
-from __future__ import annotations
-
-import json
-import os
-import shlex
-from datetime import datetime
-from pathlib import Path
-from typing import Any
-
-import ray
-from ray.job_submission import JobSubmissionClient
-
-from .builders import build_training_argv
-from .models import JobSpec, RayConfig
-from .yaml_io import dump_yaml
-
-
-def _ts() -> str:
-    return datetime.now().strftime("%Y%m%d_%H%M%S")
-
-
-def _mkdir(p: Path) -> None:
-    p.mkdir(parents=True, exist_ok=True)
-
-
-def _write_text(p: Path, content: str) -> None:
-    _mkdir(p.parent)
-    p.write_text(content, encoding="utf-8")
-
-
-def _write_json(p: Path, obj: Any) -> None:
-    _write_text(p, json.dumps(obj, indent=2, ensure_ascii=False) + "\n")
-
-
-def _safe_basename(path: str) -> str:
-    return path.rstrip("/").split("/")[-1]
-
-
-class RayJobTool:
-    def __init__(self, cfg: RayConfig):
-        self.cfg = cfg
-        self.client = JobSubmissionClient(cfg.address)
-
-    def _job_dir(self, submission_id: str) -> str:
-        return f"{self.cfg.shared_root}/jobs/{submission_id}"
-
-    def _runtime_env(self, spec: JobSpec) -> dict[str, Any]:
-        env_vars = dict(self.cfg.runtime_env_env_vars)
-
-        # Default HF cache
-        env_vars.setdefault("HF_HOME", f"{self.cfg.shared_root}/hf")
-        env_vars.setdefault("HUGGINGFACE_HUB_CACHE", f"{self.cfg.shared_root}/hf/hub")
-        env_vars.setdefault("TRANSFORMERS_CACHE", f"{self.cfg.shared_root}/hf/transformers")
-        env_vars.setdefault("PYTHONUNBUFFERED", "1")
-
-        # Tool code path must be importable on workers (compose mounts v1.1 into all containers).
-        # Place it before verl code to avoid interfering with verl import priority.
-        tool_code_path = os.environ.get("MVP_TOOL_CODE_PATH", "/workspace/mvp/v1.1/py")
-
-        user_code_path = self.cfg.user_code_path
-        code_path = spec.code_path
-
-        existing = env_vars.get("PYTHONPATH", "")
-        prefix = f"{tool_code_path}:{code_path}:{user_code_path}"
-        env_vars["PYTHONPATH"] = f"{prefix}:{existing}" if existing else prefix
-
-        # For debugging / log visibility
-        env_vars["MVP_CODE_PATH"] = code_path
-
-        # SFT: ensure ray.init() connects to the cluster
-        if spec.workload == "sft":
-            env_vars.setdefault("RAY_ADDRESS", "auto")
-
-        return {"env_vars": env_vars}
-
-    def submit(self, spec: JobSpec, no_wait: bool) -> str:
-        submission_id = spec.submission_id or f"mvp11_{spec.workload}_{_ts()}_{os.getpid()}"
-        job_dir = self._job_dir(submission_id)
-
-        built = build_training_argv(spec, submission_id=submission_id, job_dir=job_dir)
-        entrypoint_argv = [
-            "python3",
-            "-m",
-            "mvp_v11.driver_entrypoint",
-            "--job-dir",
-            job_dir,
-            *built.argv,
-        ]
-        entrypoint = " ".join(shlex.quote(x) for x in entrypoint_argv)
-
-        runtime_env = self._runtime_env(spec)
-
-        # Prepare job artifacts directory
-        job_root = Path(job_dir)
-        _mkdir(job_root / "config")
-        _mkdir(job_root / "logs")
-        _mkdir(job_root / "debug")
-        _mkdir(job_root / "checkpoints")
-
-        _write_text(job_root / "config" / "ray_config.yaml", dump_yaml(self.cfg.to_public_dict()))
-        _write_text(job_root / "config" / "jobspec.yaml", dump_yaml(spec.to_public_dict()))
-        _write_json(job_root / "config" / "submit_payload.json", {
-            "submission_id": submission_id,
-            "address": self.cfg.address,
-            "entrypoint": entrypoint,
-            "entrypoint_num_cpus": self.cfg.entrypoint_num_cpus,
-            "entrypoint_resources": self.cfg.entrypoint_resources,
-            "runtime_env": runtime_env,
-        })
-
-        # Pre-submit debug snapshot (ray cluster resources via ray.init)
-        try:
-            ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False)
-            _write_json(job_root / "debug" / "ray_cluster_resources_pre.json", ray.cluster_resources())
-            _write_json(job_root / "debug" / "ray_available_resources_pre.json", ray.available_resources())
-        except Exception as e:
-            _write_text(job_root / "debug" / "ray_resources_pre.error.txt", repr(e) + "\n")
-
-        try:
-            submitted = self.client.submit_job(
-                entrypoint=entrypoint,
-                submission_id=submission_id,
-                runtime_env=runtime_env,
-                entrypoint_num_cpus=self.cfg.entrypoint_num_cpus,
-                entrypoint_resources=self.cfg.entrypoint_resources,
-            )
-        except Exception as e:
-            _write_text(job_root / "logs" / "submit.error.txt", repr(e) + "\n")
-            raise
-
-        _write_text(job_root / "config" / "ray_submission_id.txt", submitted + "\n")
-
-        # Post-submit debug snapshot via SDK
-        try:
-            jobs = self.client.list_jobs()
-            _write_text(
-                job_root / "debug" / "ray_job_list_post.json",
-                json.dumps([_job_details_to_dict(j) for j in jobs], indent=2) + "\n",
-            )
-        except Exception as e:
-            _write_text(job_root / "debug" / "ray_job_list_post.error.txt", repr(e) + "\n")
-
-        if not no_wait:
-            # caller can separately wait; keep submit non-blocking by default in scripts
-            pass
-
-        return submitted
-
-    def status(self, submission_id: str) -> str:
-        return str(self.client.get_job_status(submission_id))
-
-    def stop(self, submission_id: str) -> bool:
-        return bool(self.client.stop_job(submission_id))
-
-    def logs(self, submission_id: str) -> str:
-        return self.client.get_job_logs(submission_id)
-
-    def list(self) -> list[dict[str, Any]]:
-        return [_job_details_to_dict(j) for j in self.client.list_jobs()]
-
-
-def _job_details_to_dict(obj: Any) -> dict[str, Any]:
-    # Ray uses pydantic models internally, but depending on bundled pydantic version
-    # we might get `.model_dump()` (v2) or `.dict()` (v1).
-    if hasattr(obj, "model_dump"):
-        return obj.model_dump()  # type: ignore[no-any-return]
-    if hasattr(obj, "dict"):
-        return obj.dict()  # type: ignore[no-any-return]
-    if hasattr(obj, "__dict__"):
-        return dict(obj.__dict__)
-    return {"repr": repr(obj)}
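`RayJobTool` is a thin layer over Ray's Jobs SDK plus artifact bookkeeping; the calls it forwards are the standard `JobSubmissionClient` API. A minimal sketch of the same submission path without the tool, assuming the dev address `http://127.0.0.1:8265` and the custom `worker_node` resource configured in `dev.yaml`:

```python
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://127.0.0.1:8265")  # address from dev.yaml

# The same knobs RayJobTool.submit() passes through; entrypoint_resources
# with the custom "worker_node" resource forces the driver onto a worker node.
submission_id = client.submit_job(
    entrypoint="python3 -c 'print(\"smoke test\")'",
    runtime_env={"env_vars": {"PYTHONUNBUFFERED": "1"}},
    entrypoint_num_cpus=1,
    entrypoint_resources={"worker_node": 1},
)
print(submission_id, client.get_job_status(submission_id))
print(client.get_job_logs(submission_id)[-2000:])  # tail, like the v2 logs endpoint
```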
diff --git a/src/mvp/v2.0/py/mvp_v11/yaml_io.py b/src/mvp/v2.0/py/mvp_v11/yaml_io.py
deleted file mode 100644
index c321688..0000000
--- a/src/mvp/v2.0/py/mvp_v11/yaml_io.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from __future__ import annotations
-
-from pathlib import Path
-from typing import Any
-
-import yaml
-
-
-def load_yaml(path: str) -> dict[str, Any]:
-    p = Path(path)
-    data = yaml.safe_load(p.read_text(encoding="utf-8"))
-    if data is None:
-        return {}
-    if not isinstance(data, dict):
-        raise ValueError(f"yaml root must be a mapping: {path}")
-    return data
-
-
-def dump_yaml(data: dict[str, Any]) -> str:
-    return yaml.safe_dump(data, sort_keys=False, allow_unicode=True)
-
diff --git a/src/mvp/v2.0/scripts/12_install_v2_deps.sh b/src/mvp/v2.0/scripts/12_install_v2_deps.sh
deleted file mode 100755
index 38f3544..0000000
--- a/src/mvp/v2.0/scripts/12_install_v2_deps.sh
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/usr/bin/env bash
-set -euo pipefail
-
-# Install v2.0 API dependencies inside the head container (best-effort).
-# Assumes v1.1 containers are already up and v2.0 code is mounted/available.
-
-HEAD_CONTAINER="${HEAD_CONTAINER:-mvp11-ray-head}"
-
-docker exec -i "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -U pip >/dev/null 2>&1 || true"
-docker exec -i "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -r /workspace/mvp/v2/py/requirements.txt"
diff --git a/src/mvp/v2.0/scripts/20_start_api.sh b/src/mvp/v2.0/scripts/20_start_api.sh
deleted file mode 100755
index 81f328c..0000000
--- a/src/mvp/v2.0/scripts/20_start_api.sh
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/usr/bin/env bash
-set -euo pipefail
-
-SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
-# shellcheck source=lib.sh
-source "${SCRIPT_DIR}/lib.sh"
-
-CONFIG_IN_CONTAINER="${CONFIG_IN_CONTAINER:-/workspace/mvp/v1.1/py/configs/dev.yaml}"
-LOG_PATH="${LOG_PATH:-/private/common/logs/mvp_v2_api.log}"
-PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}"
-
-echo "[host] starting mvp v2 api in head container: ${HEAD_CONTAINER}"
-
-dexec bash -lc "mkdir -p \"$(dirname "${LOG_PATH}")\" \"$(dirname "${PID_PATH}")\""
-
-# Note: requires /workspace/mvp/v2.0/py to be present in the container (mounted or copied).
-# Escape $ so that the command substitution happens in the container, not on the host.
-dexec bash -lc "if test -f '${PID_PATH}'; then pid=\$(cat '${PID_PATH}'); if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo 'already_running'; exit 0; fi; fi"
-
-if [[ -z "${MVP_INTERNAL_TOKEN:-}" ]]; then
-  echo "ERROR: MVP_INTERNAL_TOKEN env var must be set on host (will be passed into container)" >&2
-  exit 1
-fi
-
-docker exec -d -e MVP_INTERNAL_TOKEN="${MVP_INTERNAL_TOKEN}" "${HEAD_CONTAINER}" bash -lc "nohup python3 /workspace/mvp/v2/py/server.py --config '${CONFIG_IN_CONTAINER}' >>'${LOG_PATH}' 2>&1 & echo \$! >'${PID_PATH}'"
-
-echo "[host] started; pid stored in ${PID_PATH} (container path)"
-echo "[host] logs: ${LOG_PATH} (container path)"
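The start/status scripts probe liveness with `kill -0 "$pid"`, which delivers no signal but still performs the process-existence check against the pid file. The same probe, sketched in Python for tooling that runs inside the container (the pid-file path below is the scripts' default):

```python
import os


def pid_alive(pid: int) -> bool:
    """Python equivalent of the scripts' `kill -0 $pid` probe."""
    try:
        os.kill(pid, 0)  # signal 0: existence/permission check only
        return True
    except ProcessLookupError:  # no such process -> stale pid file
        return False
    except PermissionError:  # process exists but belongs to another user
        return True


with open("/private/common/run/mvp_v2_api.pid") as f:
    pid = int(f.read().strip())
print(f"running pid={pid}" if pid_alive(pid) else f"stale pid={pid}")
```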
diff --git a/src/mvp/v2.0/scripts/21_stop_api.sh b/src/mvp/v2.0/scripts/21_stop_api.sh
deleted file mode 100755
index 4d711ca..0000000
--- a/src/mvp/v2.0/scripts/21_stop_api.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/usr/bin/env bash
-set -euo pipefail
-
-SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
-# shellcheck source=lib.sh
-source "${SCRIPT_DIR}/lib.sh"
-
-PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}"
-
-echo "[host] stopping mvp v2 api (pid file: ${PID_PATH})"
-
-dexec bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then kill \"\${pid}\"; fi; rm -f '${PID_PATH}'; echo 'stopped'"
diff --git a/src/mvp/v2.0/scripts/22_status_api.sh b/src/mvp/v2.0/scripts/22_status_api.sh
deleted file mode 100755
index 0b3a8b6..0000000
--- a/src/mvp/v2.0/scripts/22_status_api.sh
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/usr/bin/env bash
-set -euo pipefail
-
-SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
-# shellcheck source=lib.sh
-source "${SCRIPT_DIR}/lib.sh"
-
-PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}"
-
-dexec bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo \"running pid=\${pid}\"; else echo \"stale pid=\${pid}\"; fi"
diff --git a/src/mvp/v2.0/scripts/lib.sh b/src/mvp/v2.0/scripts/lib.sh
deleted file mode 100755
index 5f2c9bf..0000000
--- a/src/mvp/v2.0/scripts/lib.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/usr/bin/env bash
-set -euo pipefail
-
-# v2.0 scripts are intended to run on the host and control the existing Ray containers
-# (same topology as v1.1). Adjust container names via env vars if needed.
-
-HEAD_CONTAINER="${HEAD_CONTAINER:-mvp11-ray-head}"
-
-dexec() {
-  docker exec -i "${HEAD_CONTAINER}" "$@"
-}
-
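If parts of this orchestration ever need to be driven from Python rather than bash, the same `docker exec` bridge applies. A hypothetical counterpart of `dexec()`, sketched with `subprocess` (the container name default mirrors lib.sh; the command shown is illustrative):

```python
import os
import subprocess


def dexec(*args: str) -> subprocess.CompletedProcess:
    """Python counterpart of lib.sh's dexec(): run a command in the head container."""
    container = os.environ.get("HEAD_CONTAINER", "mvp11-ray-head")
    return subprocess.run(
        ["docker", "exec", "-i", container, *args],
        check=True, capture_output=True, text=True,
    )


if __name__ == "__main__":
    print(dexec("bash", "-lc", "ray status || true").stdout)
```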