From 64558c8cea46c1f7bbe80abd8d482da4e8aed351 Mon Sep 17 00:00:00 2001 From: yuyr Date: Tue, 23 Dec 2025 17:17:36 +0800 Subject: [PATCH] =?UTF-8?q?mvp=202.0=20=E9=AA=8C=E6=94=B6=E9=80=9A?= =?UTF-8?q?=E8=BF=87=EF=BC=8C=E5=AE=9E=E7=8E=B0=E5=9F=BA=E6=9C=ACAPI?= =?UTF-8?q?=E6=8F=90=E4=BA=A4=EF=BC=8C=E6=9F=A5=E8=AF=A2=EF=BC=8C=E5=8F=96?= =?UTF-8?q?=E6=B6=88=E4=BB=BB=E5=8A=A1=EF=BC=8C=E5=B9=B6=E4=B8=94=E6=9C=89?= =?UTF-8?q?=E7=AE=80=E5=8D=95=E7=9A=84FIFO=E6=8E=92=E9=98=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- specs/mvp/v2.0/v2_api.md | 194 ++++++++++++ specs/mvp/v2.0/v2_plan.md | 306 +++++++++++++++++++ src/mvp/v1.1/docker-compose.yaml | 2 + src/mvp/v1.1/py/configs/dev.yaml | 18 ++ src/mvp/v2.0/README.md | 104 +++++++ src/mvp/v2.0/py/mvp_v11/__init__.py | 1 + src/mvp/v2.0/py/mvp_v11/builders.py | 96 ++++++ src/mvp/v2.0/py/mvp_v11/driver_entrypoint.py | 63 ++++ src/mvp/v2.0/py/mvp_v11/models.py | 121 ++++++++ src/mvp/v2.0/py/mvp_v11/ray_job_tool.py | 171 +++++++++++ src/mvp/v2.0/py/mvp_v11/yaml_io.py | 21 ++ src/mvp/v2.0/py/mvp_v2/__init__.py | 2 + src/mvp/v2.0/py/mvp_v2/app.py | 190 ++++++++++++ src/mvp/v2.0/py/mvp_v2/config.py | 68 +++++ src/mvp/v2.0/py/mvp_v2/db.py | 254 +++++++++++++++ src/mvp/v2.0/py/mvp_v2/ids.py | 15 + src/mvp/v2.0/py/mvp_v2/ray_resources.py | 37 +++ src/mvp/v2.0/py/mvp_v2/scheduler.py | 185 +++++++++++ src/mvp/v2.0/py/requirements.txt | 4 + src/mvp/v2.0/py/server.py | 33 ++ src/mvp/v2.0/scripts/12_install_v2_deps.sh | 10 + src/mvp/v2.0/scripts/20_start_api.sh | 28 ++ src/mvp/v2.0/scripts/21_stop_api.sh | 12 + src/mvp/v2.0/scripts/22_status_api.sh | 10 + src/mvp/v2.0/scripts/lib.sh | 12 + 25 files changed, 1957 insertions(+) create mode 100644 specs/mvp/v2.0/v2_api.md create mode 100644 specs/mvp/v2.0/v2_plan.md create mode 100644 src/mvp/v2.0/README.md create mode 100644 src/mvp/v2.0/py/mvp_v11/__init__.py create mode 100644 src/mvp/v2.0/py/mvp_v11/builders.py create mode 100644 src/mvp/v2.0/py/mvp_v11/driver_entrypoint.py create mode 100644 src/mvp/v2.0/py/mvp_v11/models.py create mode 100644 src/mvp/v2.0/py/mvp_v11/ray_job_tool.py create mode 100644 src/mvp/v2.0/py/mvp_v11/yaml_io.py create mode 100644 src/mvp/v2.0/py/mvp_v2/__init__.py create mode 100644 src/mvp/v2.0/py/mvp_v2/app.py create mode 100644 src/mvp/v2.0/py/mvp_v2/config.py create mode 100644 src/mvp/v2.0/py/mvp_v2/db.py create mode 100644 src/mvp/v2.0/py/mvp_v2/ids.py create mode 100644 src/mvp/v2.0/py/mvp_v2/ray_resources.py create mode 100644 src/mvp/v2.0/py/mvp_v2/scheduler.py create mode 100644 src/mvp/v2.0/py/requirements.txt create mode 100644 src/mvp/v2.0/py/server.py create mode 100755 src/mvp/v2.0/scripts/12_install_v2_deps.sh create mode 100755 src/mvp/v2.0/scripts/20_start_api.sh create mode 100755 src/mvp/v2.0/scripts/21_stop_api.sh create mode 100755 src/mvp/v2.0/scripts/22_status_api.sh create mode 100755 src/mvp/v2.0/scripts/lib.sh diff --git a/specs/mvp/v2.0/v2_api.md b/specs/mvp/v2.0/v2_api.md new file mode 100644 index 0000000..3e859b7 --- /dev/null +++ b/specs/mvp/v2.0/v2_api.md @@ -0,0 +1,194 @@ +# MVP v2.0 API 设计(最小可用) + +v2.0 的 API 目标是:把 v1.1 的“脚本提交”变成“服务化提交”,并在服务侧实现队列/重试/状态聚合。 + +约束: +- 内部 token 鉴权(简单即可)。 +- Ray Job 提交必须使用 **Ray Python SDK**(`JobSubmissionClient`),不使用 `requests` 手写 HTTP。 +- 输出与状态必须落盘到 NFS(容器内 `/private`)。 + +--- + +## 1. 
鉴权 + +- Header:`Authorization: Bearer ` +- v2.0 不做用户体系与权限隔离;token 只是“防误用”。 +- 配置建议:复用 `src/mvp/v1.1/py/configs/dev.yaml` 并在 `v2.auth.token_env` 指定 token 环境变量名。 + +## 1.1 运行位置(dev 示例) + +- 服务进程运行在 **Ray head 容器**(便于访问 Ray Job server)。 +- 宿主机侧用脚本控制(`docker exec`): + - `src/mvp/v2.0/scripts/20_start_api.sh` + - `src/mvp/v2.0/scripts/21_stop_api.sh` + - `src/mvp/v2.0/scripts/22_status_api.sh` +- 远程机目录约定(示例):`argus@h1:/home2/argus/infra/mvp/v2/`,容器内挂载到 `/workspace/mvp/v2/`。 + +--- + +## 2. 资源与 ID 约定 + +### 2.1 task_id(服务层主 ID) + +- 格式建议:`mvp2----` + - 示例:`mvp2-ppo-20251223-143201-7f3a` + +### 2.2 ray_submission_id(attempt 级 ID) + +- 由 service 派生:`--a` + - 示例:`mvp2-ppo-20251223-143201-7f3a--a01` + +好处: +- Ray 的 submission id 自带 task_id,可直接从 Ray dashboard 反查到服务侧任务。 +- `/private/jobs//...` 目录天然隔离且可读。 + +--- + +## 3. JobSpec(请求体) + +v2.0 **要求 JobSpec 使用 v1.1 同款 YAML**(字段与语义保持一致),服务端接收 YAML 文本并解析后入库(同时原样保存 `jobspec_yaml` 便于审计/复现)。 + +最小字段(示例 YAML): + +```yaml +workload: "ppo" +submission_id: "" # v2.0 服务端会忽略/覆盖(由 task_id 派生 ray_submission_id) +code_path: "/private/common/code/verl/verl_repo" +model_id: "Qwen/Qwen2.5-0.5B-Instruct" +train_file: "/private/datasets/gsm8k/train.parquet" +val_file: "/private/datasets/gsm8k/test.parquet" +nnodes: 2 +n_gpus_per_node: 4 +total_epochs: 1 +total_training_steps: 10 +save_freq: 10 +test_freq: -1 +trainer_device: null # 仅 sft 使用(通常 "cpu") +``` + +说明: +- `trainer_device` 仅对 `sft` 生效(通常为 `cpu`,避免 driver 无 GPU)。 +- `val_file` 可为 `null`(例如 SFT)。 + +--- + +## 4. API 端点 + +### 4.1 提交任务 + +`POST /api/v2/tasks` + +Request body: +- **raw JobSpec YAML**(与 v1.1 jobspec YAML 结构一致) + +Headers: +- `Content-Type: application/yaml`(或 `text/yaml`) + +Response: +```json +{ + "task_id": "mvp2-ppo-20251223-143201-7f3a", + "state": "QUEUED" +} +``` + +### 4.2 查询任务(聚合状态) + +`GET /api/v2/tasks/{task_id}` + +Response(示例): +```json +{ + "task_id": "mvp2-ppo-20251223-143201-7f3a", + "workload": "ppo", + "state": "RUNNING", + "desired_resources": {"nnodes": 2, "n_gpus_per_node": 4, "total_gpus": 8}, + "latest_attempt": { + "attempt_no": 1, + "ray_submission_id": "mvp2-ppo-20251223-143201-7f3a--a01", + "ray_status": "RUNNING", + "start_time": "2025-12-23T14:32:10+08:00" + }, + "error_summary": null +} +``` + +### 4.3 列出 attempts + +`GET /api/v2/tasks/{task_id}/attempts` + +Response: +```json +{ + "task_id": "mvp2-ppo-20251223-143201-7f3a", + "attempts": [ + { + "attempt_no": 1, + "ray_submission_id": "mvp2-ppo-20251223-143201-7f3a--a01", + "ray_status": "FAILED", + "failure_kind": "INSUFFICIENT_RESOURCES", + "message": "Total available GPUs 0 is less than total desired GPUs 8", + "start_time": "...", + "end_time": "..." + } + ] +} +``` + +### 4.4 取消任务 + +`POST /api/v2/tasks/{task_id}:cancel` + +行为: +- 若 task 处于 `SUBMITTED/RUNNING`:调用 Ray Jobs SDK `stop_job(ray_submission_id)` 并标记 `CANCELED` +- 若处于 `QUEUED/PENDING_RESOURCES`:直接标记 `CANCELED`(不提交) + +Response: +```json +{"task_id":"...","state":"CANCELED"} +``` + +### 4.5 获取日志 + +`GET /api/v2/tasks/{task_id}/logs?attempt=latest&tail=2000` + +返回: +- `text/plain`(直接透传 Ray Job logs tail) + +说明: +- v2.0 先用 Ray SDK `get_job_logs()`。 +- 若需要更稳定的归档,可在 scheduler 定期抓取并落盘(v2.1+)。 + +### 4.6 列出队列(运维/调试) + +`GET /api/v2/queue` + +Response: +```json +{ + "pending": [{"task_id":"...","state":"PENDING_RESOURCES","next_run_at":"..."}], + "running": [{"task_id":"...","ray_submission_id":"..."}] +} +``` + +--- + +## 5. 错误码(最小) + +- `400`:jobspec 缺字段/非法 +- `401`:token 不正确 +- `404`:task 不存在 +- `409`:状态冲突(例如已终态又 cancel) +- `500`:服务内部错误 + +--- + +## 6. 
SQLite 持久化(API 可见性) + +v2.0 服务端使用 SQLite 持久化保存: +- tasks(`task_id`、`state`、`jobspec_yaml`、`next_run_at`、`latest_attempt_no` 等) +- attempts(`ray_submission_id`、`ray_status`、失败原因等) + +因此: +- `GET /api/v2/tasks/{task_id}` 的数据来自 SQLite(再叠加 Ray 状态同步的结果)。 +- 进程重启后,队列可恢复,`PENDING_RESOURCES` 的任务会在 `next_run_at` 到期后继续尝试提交。 diff --git a/specs/mvp/v2.0/v2_plan.md b/specs/mvp/v2.0/v2_plan.md new file mode 100644 index 0000000..4b0a27e --- /dev/null +++ b/specs/mvp/v2.0/v2_plan.md @@ -0,0 +1,306 @@ +# MVP v2.0 开发计划(服务化入口 + 队列调度 + Ray Jobs SDK) + +目标:在 v1.1(脚本 + Ray Jobs SDK)已验收通过的基础上,交付一个**可独立运行的最小“服务层”**: +- 用户通过 **HTTP API** 提交训练任务(PPO/GRPO/SFT)。 +- 服务层分配一个**人类易读的任务 ID**(`task_id`),并把任务放入队列。 +- 后台调度器在资源满足时再向 Ray 集群提交 Ray Job,并持续追踪 Ray Job 状态。 +- 针对 `verl` 的 **fail-fast 资源预检查**(资源不足直接 `ValueError` 失败)做“服务级重试/排队”,避免用户反复手工提交。 + +> 约束继承 v1.1:head 不跑训练;driver 必须落到 worker;共享存储只考虑 NFS(容器内 `/private`)。 + +--- + +## 1. 背景:为什么 v2.0 需要“服务层调度” + +在 v1.1 中我们通过 Ray Job 提交 `verl` 训练任务。`verl` PPO/GRPO 在初始化 worker 时会创建资源池,并做一次 fail-fast 的资源检查: +- 触发点:`ResourcePoolManager.create_resource_pool()` 末尾调用 `_check_resource_available()` +- `_check_resource_available()` 使用 `ray._private.state.available_resources_per_node()` 统计“可用 GPU/NPU”,如果不足则直接抛异常: + - `ValueError: Total available GPUs 0 is less than total desired GPUs 8` + +这是一种合理的选择(避免 Ray 层面无限 pending/卡死),但会带来一个平台侧问题: +- 当集群暂时没有足够资源时,用户提交会“立刻失败”,需要手动重试。 + +因此 v2.0 的服务层要提供: +- **队列 + gang 约束**:资源不满足则任务在服务层 pending(不提交到 Ray)。 +- **状态追踪**:一旦提交到 Ray,持续获取 Ray Job 状态并回传给用户。 +- **资源不足的“自动重试”**:即使发生 race(提交时资源够、启动时被抢走),也能识别该类失败并延迟重试。 + +--- + +## 2. v2.0 交付范围(Scope) + +### 2.1 必做(MVP v2.0) + +1) **HTTP API**(内部 token): + - 提交任务、查询任务、取消任务、拉取日志(最小可用)。 +2) **任务队列与调度器**: + - FIFO(先到先服务),无配额/公平性(留给 v3+)。 + - gang:按 `nnodes` + `n_gpus_per_node` 的固定资源需求“全有才提交”。 +3) **Ray Jobs SDK 集成**(不使用 `requests` 自己拼 HTTP): + - 通过 `ray.job_submission.JobSubmissionClient` submit/status/stop/logs。 +4) **可观测/可排障最小集**: + - 每个 task/attempt 落盘配置、提交载荷、Ray 返回的 `submission_id`、关键日志。 +5) **失败策略**: + - 识别 “资源不足 fail-fast” 类失败 → 转为 `PENDING_RESOURCES` 并延迟重试。 + - 其他失败保持 `FAILED`(不自动重试,避免掩盖错误)。 + +### 2.2 不做(v2.0 不实现) + +- 多租户/配额/优先级/公平性调度(v3)。 +- Pipeline(多 job 串联)(v3+)。 +- 完整 UI(v3+,v2.0 可只提供 OpenAPI/Swagger)。 +- K8s 编排(明确不做,仍是 Native Ray)。 + +--- + +## 2.3 工程原则(开闭原则 / 复用 v1.1) + +v2.0 研发遵循开闭原则(Open/Closed Principle): +- **对扩展开放**:新增“服务层(API + scheduler + SQLite)”能力以支持排队、重试、状态聚合。 +- **对修改关闭**:尽量不改动 v1.1 已经稳定可用的 Ray Jobs SDK 提交链路代码。 + +落地方式: +- 将 `src/mvp/v1.1/py/mvp_v11/` 作为“成熟可用提交层”,原样拷贝到 `src/mvp/v2.0/py/mvp_v11/` 供 v2.0 复用。 +- v2.0 的新增功能全部在新模块实现(例如 `src/mvp/v2.0/py/mvp_v2/`),通过组合/封装来调用 `mvp_v11`,避免在旧代码中掺杂平台逻辑。 + +--- + +## 3. 
总体架构(v2.0) + +### 3.1 组件 + +- **mvp-api**(HTTP Server) + - 接收 JobSpec(结构化字段保持与 v1.1 一致的语义) + - 生成 `task_id` 并写入持久化 + - 提供 query/cancel/logs + +- **mvp-scheduler**(后台调度器,可与 api 同进程也可拆进程) + - 轮询队列:对 `PENDING_RESOURCES` 的任务做资源判断 + - 资源满足 → 调用 Ray Jobs SDK 提交 → 记录 `ray_submission_id` + - 对 `SUBMITTED/RUNNING` 的任务持续同步 Ray Job 状态 + - 如果 Ray Job 失败且命中资源不足模式 → 延迟重试 + +> 部署建议:v2.0 先在 **head 容器**内运行该服务(dev/prod 行为一致;生产环境只能 ssh 进入容器纳管)。 + +### 3.4 dev 环境目录约定(示例) + +以当前远程开发机为例(`argus@h1`): +- 宿主机目录:`/home2/argus/infra/mvp/v2/` +- 容器内挂载:`/workspace/mvp/v2/` +- 共享 NFS:容器内统一为 `/private/`(与 v1.1 保持一致) + +> 注意:服务脚本(`v2/scripts/*.sh`)应在**宿主机**执行,通过 `docker exec` 控制 head 容器;训练 driver 仍通过 Ray entrypoint_resources 强制落到 worker。 + +### 3.2 与 Ray/容器的关系 + +- 服务进程运行在 head(或等价能访问 head 的 Job server 地址)。 +- 提交时仍使用 v1.1 的强约束: + - head:`--num-cpus=0 --num-gpus=0` + - worker:`--resources='{\"worker_node\": 100}'` + - job entrypoint:`entrypoint_resources={\"worker_node\": 1}` 强制 driver 落 worker + +--- + +## 3.3 配置约定(复用 v1.1 dev.yaml 并扩展) + +v2.0 的服务层(API + scheduler)建议复用 v1.1 已存在的 RayConfig 文件: +- `src/mvp/v1.1/py/configs/dev.yaml` + +原因: +- 其中已包含 v1.1 运行所需的 Ray 基础配置(Ray Job server address、entrypoint_resources、runtime_env 等),v2.0 也需要同样的信息来提交 Ray Jobs。 + +扩展方式: +- 在该 YAML 中新增一个顶层 `v2:` section,存放 v2 服务专属配置(API 监听、SQLite 路径、scheduler 间隔等)。 +- v1.1 submitter 只读取 `address/shared_root/entrypoint_* /runtime_env/user_code_path`,会忽略 `v2:` 之类的额外字段;因此不会破坏 v1.1。 + +最小新增项建议(示例): +- `v2.api.host` / `v2.api.port` +- `v2.auth.token_env`(内部 token 环境变量名) +- `v2.sqlite.db_path`(建议 `/private/common/db/mvp_v2.sqlite3`) +- `v2.scheduler.tick_s` / `v2.scheduler.retry_interval_s` / `v2.scheduler.max_running_tasks` + +--- + +## 4. 核心数据模型(Task / Attempt) + +### 4.1 Task(用户视角的任务) + +- `task_id`:**人类易读**且唯一,例如: + - `mvp2-ppo-20251223-143201-7f3a` +- `workload`:`ppo|grpo|sft` +- `jobspec`:提交参数(**保持 v1.1 的 jobspec YAML 字段与语义**;服务端解析 YAML 后入库) +- `state`:见第 5 节状态机 +- `created_at` / `updated_at` +- `latest_attempt`:指向当前 attempt +- `attempts[]`:历史尝试列表 +- `error_summary`:面向用户的简短错误(最后一次失败原因) + +### 4.2 Attempt(一次真实的 Ray Job 提交) + +- `attempt_no`:从 1 开始递增 +- `ray_submission_id`:建议派生自 task_id: + - `ray_submission_id = --a01` + - 好处:Ray 侧输出目录天然可读、可追溯 +- `status`:Ray Job 状态(PENDING/RUNNING/SUCCEEDED/FAILED/STOPPED) +- `start_time` / `end_time` +- `exit_code`(如可取) +- `failure_kind`(枚举): + - `INSUFFICIENT_RESOURCES`(匹配 “Total available GPUs … less than total desired …”) + - `USER_ERROR`(配置/数据路径错误等) + - `RUNTIME_ERROR`(代码异常) + - `UNKNOWN` + +--- + +## 5. 状态机(服务侧) + +建议最小状态集: + +- `QUEUED`:已入队,尚未进行资源判断 +- `PENDING_RESOURCES`:资源不足,等待(服务侧 pending,不提交 Ray) +- `SUBMITTING`:正在向 Ray 提交 attempt +- `SUBMITTED`:Ray 已接受 submission(拿到 `ray_submission_id`) +- `RUNNING`:Ray Job RUNNING +- `SUCCEEDED`:任务成功(终态) +- `FAILED`:任务失败(终态,除非命中“资源不足重试策略”) +- `CANCELED`:用户取消(终态) + +关键转换: +- `QUEUED -> PENDING_RESOURCES`:资源不足 +- `QUEUED/PENDING_RESOURCES -> SUBMITTING`:资源满足 +- `SUBMITTING -> SUBMITTED`:提交成功 +- `SUBMITTED -> RUNNING`:Ray 状态推进 +- `SUBMITTED/RUNNING -> SUCCEEDED|FAILED`:Ray 终态 +- `FAILED (INSUFFICIENT_RESOURCES) -> PENDING_RESOURCES`:进入延迟重试(attempt_no+1) + +--- + +## 6. 
调度策略(v2.0) + +### 6.1 资源计算(对齐 verl 的“可用资源”口径) + +由于 verl 使用 `ray._private.state.available_resources_per_node()` 做“可用资源”统计, +v2.0 的 scheduler 应该尽量使用相同口径,避免: +- 我们认为够了 → 实际 verl 认为不够(仍 fail-fast) +- 我们认为不够 → 实际够了(浪费) + +策略(建议): +1) scheduler 周期性获取 per-node 可用 GPU +2) 计算 total_available_gpus = sum(node_gpu_available) +3) 任务需求 total_required_gpus = nnodes * n_gpus_per_node +4) 如果 `total_available_gpus < total_required_gpus` → `PENDING_RESOURCES` + +注意:v2.0 先只做总量判断;节点级分配(保证每个 node 恰好 n_gpus_per_node)可作为 v2.1+(资源池/标签/节点纳管)增强点。 + +### 6.2 排队与并发 + +- 默认 FIFO。 +- 并发度:允许同时跑多个任务,但必须保证资源足够。 + - 简化实现:如果任务默认都吃满 8 卡,则 scheduler 实际上一次只能跑一个。 + - 若未来支持小任务(1*1、1*4),可以自然并发。 + +### 6.3 重试策略(资源不足) + +当出现下面模式时判定为 `INSUFFICIENT_RESOURCES`: +- Ray Job `status=FAILED` +- `JobDetails.message` 或 `job logs` 中匹配: + - `Total available GPUs` 且 `less than total desired` + +处理: +- 将 task 置为 `PENDING_RESOURCES` +- `next_run_at = now + 60s`(固定间隔;v2.1 可改指数退避) +- attempt_no++ 后重提(新 submission id) + +--- + +## 7. SQLite 持久化(队列/状态/attempt) + +v2.0 引入一个**最小但可恢复的持久化层**:使用 SQLite 保存任务队列与状态,确保: +- api/scheduler 进程重启后,队列不丢; +- task/attempt 历史可追溯; +- 能实现“服务侧 pending + 延迟重试”的确定性行为。 + +### 7.1 存放位置 + +建议路径(容器内): +- `DB_PATH=/private/common/db/mvp_v2.sqlite3` + +说明: +- v2.0 默认单实例服务(单 writer),SQLite 足够。 +- 生产环境若 NFS 上的 SQLite 有锁/性能风险,v2.1+ 再演进到 Postgres/Redis;v2.0 先以“可回放/可恢复”为第一目标。 + +### 7.2 表设计(建议最小集合) + +- `tasks` + - `task_id` (PK) + - `workload` + - `state`(服务侧状态机) + - `jobspec_yaml`(原始 YAML 文本,原样落盘便于审计/复现) + - `created_at`, `updated_at` + - `next_run_at`(用于 `PENDING_RESOURCES` 的延迟重试) + - `error_summary` + - `latest_attempt_no` + +- `attempts` + - `task_id` (FK) + - `attempt_no` + - `ray_submission_id` + - `ray_status` + - `failure_kind` + - `message`(截断后的关键信息) + - `start_time`, `end_time` + +- `events`(可选,但非常利于排障) + - `id` (PK) + - `task_id` + - `ts` + - `event_type`(STATE_TRANSITION / SUBMIT / RAY_STATUS_SYNC / RETRY_SCHEDULED 等) + - `payload_json` + +### 7.3 调度循环(与 SQLite 的交互) + +scheduler 每个 tick 做三件事: +1) **挑选可运行任务**(FIFO + next_run_at): + - `state IN ('QUEUED','PENDING_RESOURCES') AND next_run_at <= now` +2) **资源判断**(对齐 verl 的可用资源口径): + - 不满足:更新 `state='PENDING_RESOURCES'`,并写入 `next_run_at=now+60s` +3) **提交 Ray Job 并追踪**: + - 提交成功:写入 `attempts` 并更新 `tasks.latest_attempt_no`、`state='SUBMITTED'` + - 周期性同步 Ray 状态:`SUBMITTED/RUNNING -> SUCCEEDED/FAILED` + - 若失败命中资源不足模式:`FAILED -> PENDING_RESOURCES` + 计划下次重试 + +--- + +## 8. 接口与验收(DoD) + +### 8.1 API 能力(最小集合) + +详见 `specs/mvp/v2.0/v2_api.md`。 + +### 8.2 验收口径(DoD) + +1) API 提交 PPO/GRPO/SFT,返回 `task_id`,并在 NFS 上创建任务目录。 +2) 当集群忙(GPU 不足)时: + - task 状态为 `PENDING_RESOURCES`(不是 FAILED) + - 一旦资源释放,任务自动变为 `SUBMITTED/RUNNING` +3) 当 race 导致触发 verl fail-fast: + - attempt 标记为 `INSUFFICIENT_RESOURCES` + - task 回到 `PENDING_RESOURCES`,并在 60s 后自动重试 +4) 通过 API 查询 task 能看到: + - 当前 state + - 最新 attempt 的 `ray_submission_id` + - attempt 历史(至少包含开始/结束/失败原因) +5) Cancel 能停止正在运行的 Ray Job(调用 Ray Jobs SDK stop) + +--- + +## 9. 
v2.0 交付物建议(目录) + +`specs/mvp/v2.0/`(本目录): +- `v2_plan.md`:总体设计与开发计划(本文件) +- `v2_api.md`:API 详细定义(请求/响应/字段/错误码) + +代码建议位置(后续实现时): +- `src/mvp/v2.0/` + - `py/`:API server + scheduler + - `scripts/`:启动/停止/查看状态(仍沿用 v1.1 的 compose/cluster 逻辑) diff --git a/src/mvp/v1.1/docker-compose.yaml b/src/mvp/v1.1/docker-compose.yaml index a885956..43892df 100644 --- a/src/mvp/v1.1/docker-compose.yaml +++ b/src/mvp/v1.1/docker-compose.yaml @@ -7,10 +7,12 @@ services: command: sleep infinity ports: - "8265:8265" + - "8080:8080" volumes: - ../verl:/workspace/verl - ../shared:/private - .:/workspace/mvp/v1.1 + - ../v2:/workspace/mvp/v2 shm_size: "10g" ulimits: nofile: diff --git a/src/mvp/v1.1/py/configs/dev.yaml b/src/mvp/v1.1/py/configs/dev.yaml index 2b7c830..c0244e6 100644 --- a/src/mvp/v1.1/py/configs/dev.yaml +++ b/src/mvp/v1.1/py/configs/dev.yaml @@ -1,4 +1,8 @@ # Ray 基础配置(dev 环境 / head 容器内视角) +# +# 说明: +# - v1.1 的 SDK submitter 会读取本文件作为 RayConfig。 +# - v2.0 的 API 服务/调度器也复用本文件作为“基础 RayConfig”,并在其上扩展 v2 专属配置项(见 v2:)。 address: "http://127.0.0.1:8265" # 容器内共享根路径(对齐生产 /private) @@ -18,3 +22,17 @@ runtime_env: # 用户自定义代码目录(可被 PYTHONPATH 注入) user_code_path: "/private/user/code" +# v2.0 服务层配置(v1.1 submitter 会忽略这些字段;v2.0 服务会读取) +v2: + api: + host: "0.0.0.0" + port: 8080 + auth: + # 内部 token 建议通过环境变量提供,避免写入配置文件 + token_env: "MVP_INTERNAL_TOKEN" + sqlite: + db_path: "/private/common/db/mvp_v2.sqlite3" + scheduler: + tick_s: 5 + retry_interval_s: 60 + max_running_tasks: 1 diff --git a/src/mvp/v2.0/README.md b/src/mvp/v2.0/README.md new file mode 100644 index 0000000..5f20c18 --- /dev/null +++ b/src/mvp/v2.0/README.md @@ -0,0 +1,104 @@ +# MVP v2.0(服务化入口) + +v2.0 在 v1.1(Ray Jobs SDK 提交链路)基础上新增一个**服务层**: +- HTTP API 提交任务(PPO/GRPO/SFT) +- 服务侧队列 + gang 资源判断 +- 识别 `verl` fail-fast 的资源不足失败并自动重试 +- SQLite 持久化队列/状态/attempt(NFS:`/private`) + +设计文档见: +- `specs/mvp/v2.0/v2_plan.md` +- `specs/mvp/v2.0/v2_api.md` + +## 快速开始(dev 示例) + +约定: +- Ray 容器仍由 v1.1 的 `docker-compose.yaml` 启动(head+2 workers) +- v2 代码在宿主机:`/home2/argus/infra/mvp/v2/`(容器内挂载 `/workspace/mvp/v2/`) +- v2 配置复用 v1.1:`/workspace/mvp/v1.1/py/configs/dev.yaml`(扩展了 `v2:` 段) + +宿主机执行: + +```bash +export MVP_INTERNAL_TOKEN=... 
# 内部 token +cd /home2/argus/infra/mvp/v2/scripts +./12_install_v2_deps.sh +./20_start_api.sh +./22_status_api.sh +``` + +API 测试(宿主机): + +```bash +curl -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" http://127.0.0.1:8080/api/v2/queue +``` + +> 进程日志与 pid(容器内路径)默认在 `/private/common/logs/` 与 `/private/common/run/`。 + +## 提交/查询/停止任务 + +约定: +- API 地址(宿主机视角):`http://127.0.0.1:8080` +- 鉴权:`Authorization: Bearer ${MVP_INTERNAL_TOKEN}` +- 请求体:**raw YAML**(JobSpec,格式与 v1.1 jobspec 一致) + +### 1) 提交任务(POST /api/v2/tasks) + +准备一个 jobspec(示例:PPO): + +```yaml +workload: "ppo" +submission_id: "" # v2 会忽略/覆盖,自动生成 task_id 并派生 ray_submission_id +code_path: "/private/common/code/verl/verl_repo" +model_id: "Qwen/Qwen2.5-0.5B-Instruct" +train_file: "/private/datasets/gsm8k/train.parquet" +val_file: "/private/datasets/gsm8k/test.parquet" +nnodes: 2 +n_gpus_per_node: 4 +total_epochs: 1 +total_training_steps: 10 +save_freq: 10 +test_freq: -1 +trainer_device: null +``` + +提交: + +```bash +curl -sS \ + -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \ + -H "Content-Type: application/yaml" \ + --data-binary @jobspec.yaml \ + http://127.0.0.1:8080/api/v2/tasks +``` + +返回示例: + +```json +{"task_id":"mvp2-ppo-20251223-082813-6426","state":"QUEUED"} +``` + +### 2) 查询任务(GET /api/v2/tasks/{task_id}) + +```bash +curl -sS \ + -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \ + http://127.0.0.1:8080/api/v2/tasks/ | python3 -m json.tool +``` + +可选: +- 查看 attempts:`GET /api/v2/tasks/{task_id}/attempts` +- 拉取日志(latest attempt):`GET /api/v2/tasks/{task_id}/logs?tail=2000` +- 查看队列:`GET /api/v2/queue` + +### 3) 停止/取消任务(POST /api/v2/tasks/{task_id}:cancel) + +```bash +curl -sS -X POST \ + -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \ + http://127.0.0.1:8080/api/v2/tasks/:cancel +``` + +说明: +- 若任务已提交到 Ray(`SUBMITTED/RUNNING`),服务会调用 Ray Jobs SDK `stop_job(ray_submission_id)`。 +- 若任务仍在队列(`QUEUED/PENDING_RESOURCES`),服务直接标记 `CANCELED`(不会产生 attempt)。 diff --git a/src/mvp/v2.0/py/mvp_v11/__init__.py b/src/mvp/v2.0/py/mvp_v11/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v11/__init__.py @@ -0,0 +1 @@ + diff --git a/src/mvp/v2.0/py/mvp_v11/builders.py b/src/mvp/v2.0/py/mvp_v11/builders.py new file mode 100644 index 0000000..d4b9786 --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v11/builders.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from .models import JobSpec + + +@dataclass(frozen=True) +class BuiltCommand: + argv: list[str] + + +def build_training_argv(spec: JobSpec, submission_id: str, job_dir: str) -> BuiltCommand: + """ + Returns argv for the actual training process (Hydra overrides preserved). + This argv is executed by a lightweight Python driver entrypoint. 
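+    The returned argv is later prefixed by RayJobTool.submit() with
+    "python3 -m mvp_v11.driver_entrypoint --job-dir <job_dir>", so it must stay a
+    flat list of plain strings; shlex quoting is applied by the submitter, not here.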
+ """ + if spec.workload in ("ppo", "grpo"): + algo_overrides: list[str] = [] + if spec.workload == "grpo": + algo_overrides.append("algorithm.adv_estimator=grpo") + + test_freq = spec.test_freq if spec.test_freq is not None else -1 + val_file = spec.val_file if spec.val_file is not None else "null" + + argv = [ + "python3", + "-m", + "verl.trainer.main_ppo", + f"data.train_files={spec.train_file}", + f"data.val_files={val_file}", + "data.train_batch_size=256", + "data.max_prompt_length=512", + "data.max_response_length=512", + f"actor_rollout_ref.model.path={spec.model_id}", + "actor_rollout_ref.actor.optim.lr=1e-6", + "actor_rollout_ref.actor.ppo_mini_batch_size=64", + "actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4", + "actor_rollout_ref.rollout.name=sglang", + "actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8", + "actor_rollout_ref.rollout.tensor_model_parallel_size=1", + "actor_rollout_ref.rollout.gpu_memory_utilization=0.4", + "actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4", + "critic.optim.lr=1e-5", + f"critic.model.path={spec.model_id}", + "critic.ppo_micro_batch_size_per_gpu=4", + "algorithm.kl_ctrl.kl_coef=0.001", + *algo_overrides, + "trainer.logger=console", + "trainer.val_before_train=False", + f"trainer.n_gpus_per_node={spec.n_gpus_per_node}", + f"trainer.nnodes={spec.nnodes}", + f"trainer.save_freq={spec.save_freq}", + f"trainer.test_freq={test_freq}", + f"trainer.total_epochs={spec.total_epochs}", + f"trainer.total_training_steps={spec.total_training_steps}", + "trainer.resume_mode=disable", + f"trainer.default_local_dir={job_dir}/checkpoints", + "+ray_kwargs.ray_init.address=auto", + f"hydra.run.dir={job_dir}/logs/hydra", + ] + return BuiltCommand(argv=argv) + + if spec.workload == "sft": + val_override = "null" if spec.val_file is None else spec.val_file + trainer_device = spec.trainer_device or "cpu" + + argv = [ + "python3", + "-m", + "verl.trainer.sft_trainer_ray", + f"model.path={spec.model_id}", + f"data.train_files={spec.train_file}", + f"data.val_files={val_override}", + "data.train_batch_size=64", + "data.micro_batch_size_per_gpu=1", + "data.max_token_len_per_gpu=2048", + "data.max_length=1024", + "trainer.logger=console", + "trainer.project_name=mvp11-sft", + f"trainer.experiment_name={submission_id}", + f"trainer.total_epochs={spec.total_epochs}", + f"trainer.total_training_steps={spec.total_training_steps}", + f"trainer.save_freq={spec.save_freq}", + "trainer.test_freq=-1", + "trainer.resume_mode=disable", + f"trainer.device={trainer_device}", + f"trainer.default_local_dir={job_dir}/checkpoints", + f"trainer.nnodes={spec.nnodes}", + f"trainer.n_gpus_per_node={spec.n_gpus_per_node}", + f"hydra.run.dir={job_dir}/logs/hydra", + ] + return BuiltCommand(argv=argv) + + raise ValueError(f"unsupported workload: {spec.workload}") + diff --git a/src/mvp/v2.0/py/mvp_v11/driver_entrypoint.py b/src/mvp/v2.0/py/mvp_v11/driver_entrypoint.py new file mode 100644 index 0000000..9f99c58 --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v11/driver_entrypoint.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import argparse +import os +import shlex +import subprocess +import sys +from pathlib import Path + + +def _preflight() -> None: + print("MVP_PRECHECK_PYTHON:", sys.executable, flush=True) + print("MVP_PRECHECK_PYTHONPATH:", os.environ.get("PYTHONPATH"), flush=True) + print("MVP_PRECHECK_MVP_CODE_PATH:", os.environ.get("MVP_CODE_PATH"), flush=True) + try: + import verl # type: ignore + + print("MVP_PRECHECK_VERL_FILE:", getattr(verl, "__file__", 
None), flush=True) + except Exception as e: + print("MVP_PRECHECK_VERL_IMPORT_ERROR:", repr(e), flush=True) + + try: + import mvp_marker # type: ignore + + print("MVP_PRECHECK_MARKER:", getattr(mvp_marker, "MARKER", None), flush=True) + except Exception as e: + print("MVP_PRECHECK_MARKER_MISSING:", repr(e), flush=True) + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--job-dir", required=True) + parser.add_argument("cmd", nargs=argparse.REMAINDER) + args = parser.parse_args() + + job_dir = Path(args.job_dir) + job_dir.mkdir(parents=True, exist_ok=True) + + _preflight() + + if not args.cmd: + print("no command provided", file=sys.stderr) + return 2 + + # argparse includes the leading "--" if the caller uses it; strip it. + cmd = list(args.cmd) + if cmd and cmd[0] == "--": + cmd = cmd[1:] + if not cmd: + print("no command provided", file=sys.stderr) + return 2 + + # Execute training command as a subprocess so that logs are captured by Ray job logs. + cmd_str = " ".join(shlex.quote(x) for x in cmd) + print("MVP_DRIVER_EXEC:", cmd_str, flush=True) + + proc = subprocess.run(cmd, check=False) + print("MVP_DRIVER_EXIT_CODE:", proc.returncode, flush=True) + return proc.returncode + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/mvp/v2.0/py/mvp_v11/models.py b/src/mvp/v2.0/py/mvp_v11/models.py new file mode 100644 index 0000000..41f8804 --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v11/models.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +def _require(d: dict[str, Any], key: str) -> Any: + if key not in d or d[key] in (None, ""): + raise ValueError(f"missing required field: {key}") + return d[key] + + +@dataclass(frozen=True) +class RayConfig: + address: str + shared_root: str + entrypoint_num_cpus: float + entrypoint_resources: dict[str, float] + runtime_env_env_vars: dict[str, str] + user_code_path: str + + @staticmethod + def from_dict(d: dict[str, Any]) -> "RayConfig": + runtime_env = d.get("runtime_env") or {} + env_vars = (runtime_env.get("env_vars") or {}) if isinstance(runtime_env, dict) else {} + if not isinstance(env_vars, dict): + raise ValueError("runtime_env.env_vars must be a mapping") + + entrypoint_resources = d.get("entrypoint_resources") or {} + if not isinstance(entrypoint_resources, dict): + raise ValueError("entrypoint_resources must be a mapping") + + return RayConfig( + address=str(_require(d, "address")), + shared_root=str(_require(d, "shared_root")), + entrypoint_num_cpus=float(d.get("entrypoint_num_cpus", 1)), + entrypoint_resources={str(k): float(v) for k, v in entrypoint_resources.items()}, + runtime_env_env_vars={str(k): str(v) for k, v in env_vars.items()}, + user_code_path=str(d.get("user_code_path", f"{_require(d, 'shared_root')}/user/code")), + ) + + def to_public_dict(self) -> dict[str, Any]: + return { + "address": self.address, + "shared_root": self.shared_root, + "entrypoint_num_cpus": self.entrypoint_num_cpus, + "entrypoint_resources": self.entrypoint_resources, + "runtime_env": {"env_vars": self.runtime_env_env_vars}, + "user_code_path": self.user_code_path, + } + + +@dataclass(frozen=True) +class JobSpec: + workload: str # ppo|grpo|sft + submission_id: str | None + code_path: str + model_id: str + + train_file: str + val_file: str | None + + nnodes: int + n_gpus_per_node: int + + total_epochs: int + total_training_steps: int + + save_freq: int + test_freq: int | None + + trainer_device: str | None # only for sft (driver-side 
device) + + @staticmethod + def from_dict(d: dict[str, Any]) -> "JobSpec": + workload = str(_require(d, "workload")) + if workload not in ("ppo", "grpo", "sft"): + raise ValueError(f"unsupported workload: {workload}") + + val_file = d.get("val_file", None) + if val_file in ("", "null"): + val_file = None + + test_freq = d.get("test_freq", None) + if test_freq in ("", "null"): + test_freq = None + + return JobSpec( + workload=workload, + submission_id=(str(d["submission_id"]) if d.get("submission_id") else None), + code_path=str(_require(d, "code_path")), + model_id=str(_require(d, "model_id")), + train_file=str(_require(d, "train_file")), + val_file=(str(val_file) if val_file is not None else None), + nnodes=int(d.get("nnodes", 2)), + n_gpus_per_node=int(d.get("n_gpus_per_node", 4)), + total_epochs=int(d.get("total_epochs", 1)), + total_training_steps=int(d.get("total_training_steps", 10)), + save_freq=int(d.get("save_freq", 10)), + test_freq=(int(test_freq) if test_freq is not None else None), + trainer_device=(str(d.get("trainer_device")) if d.get("trainer_device") else None), + ) + + def to_public_dict(self) -> dict[str, Any]: + out: dict[str, Any] = { + "workload": self.workload, + "submission_id": self.submission_id or "", + "code_path": self.code_path, + "model_id": self.model_id, + "train_file": self.train_file, + "val_file": self.val_file, + "nnodes": self.nnodes, + "n_gpus_per_node": self.n_gpus_per_node, + "total_epochs": self.total_epochs, + "total_training_steps": self.total_training_steps, + "save_freq": self.save_freq, + "test_freq": self.test_freq, + } + if self.workload == "sft": + out["trainer_device"] = self.trainer_device or "cpu" + return out diff --git a/src/mvp/v2.0/py/mvp_v11/ray_job_tool.py b/src/mvp/v2.0/py/mvp_v11/ray_job_tool.py new file mode 100644 index 0000000..01b899a --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v11/ray_job_tool.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +import json +import os +import shlex +from datetime import datetime +from pathlib import Path +from typing import Any + +import ray +from ray.job_submission import JobSubmissionClient + +from .builders import build_training_argv +from .models import JobSpec, RayConfig +from .yaml_io import dump_yaml + + +def _ts() -> str: + return datetime.now().strftime("%Y%m%d_%H%M%S") + + +def _mkdir(p: Path) -> None: + p.mkdir(parents=True, exist_ok=True) + + +def _write_text(p: Path, content: str) -> None: + _mkdir(p.parent) + p.write_text(content, encoding="utf-8") + + +def _write_json(p: Path, obj: Any) -> None: + _write_text(p, json.dumps(obj, indent=2, ensure_ascii=False) + "\n") + + +def _safe_basename(path: str) -> str: + return path.rstrip("/").split("/")[-1] + + +class RayJobTool: + def __init__(self, cfg: RayConfig): + self.cfg = cfg + self.client = JobSubmissionClient(cfg.address) + + def _job_dir(self, submission_id: str) -> str: + return f"{self.cfg.shared_root}/jobs/{submission_id}" + + def _runtime_env(self, spec: JobSpec) -> dict[str, Any]: + env_vars = dict(self.cfg.runtime_env_env_vars) + + # Default HF cache + env_vars.setdefault("HF_HOME", f"{self.cfg.shared_root}/hf") + env_vars.setdefault("HUGGINGFACE_HUB_CACHE", f"{self.cfg.shared_root}/hf/hub") + env_vars.setdefault("TRANSFORMERS_CACHE", f"{self.cfg.shared_root}/hf/transformers") + env_vars.setdefault("PYTHONUNBUFFERED", "1") + + # Tool code path must be importable on workers (compose mounts v1.1 into all containers). + # Place it before verl code to avoid interfering with verl import priority. 
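+        # Resulting PYTHONPATH order: tool code, then spec.code_path (the verl repo),
+        # then user_code_path, then any pre-existing PYTHONPATH.
+        # MVP_TOOL_CODE_PATH can override the default v1.1 mount path (for example when
+        # this copied mvp_v11 package is served from the v2 mount instead).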
+ tool_code_path = os.environ.get("MVP_TOOL_CODE_PATH", "/workspace/mvp/v1.1/py") + + user_code_path = self.cfg.user_code_path + code_path = spec.code_path + + existing = env_vars.get("PYTHONPATH", "") + prefix = f"{tool_code_path}:{code_path}:{user_code_path}" + env_vars["PYTHONPATH"] = f"{prefix}:{existing}" if existing else prefix + + # For debugging / log visibility + env_vars["MVP_CODE_PATH"] = code_path + + # SFT: ensure ray.init() connects to the cluster + if spec.workload == "sft": + env_vars.setdefault("RAY_ADDRESS", "auto") + + return {"env_vars": env_vars} + + def submit(self, spec: JobSpec, no_wait: bool) -> str: + submission_id = spec.submission_id or f"mvp11_{spec.workload}_{_ts()}_{os.getpid()}" + job_dir = self._job_dir(submission_id) + + built = build_training_argv(spec, submission_id=submission_id, job_dir=job_dir) + entrypoint_argv = [ + "python3", + "-m", + "mvp_v11.driver_entrypoint", + "--job-dir", + job_dir, + *built.argv, + ] + entrypoint = " ".join(shlex.quote(x) for x in entrypoint_argv) + + runtime_env = self._runtime_env(spec) + + # Prepare job artifacts directory + job_root = Path(job_dir) + _mkdir(job_root / "config") + _mkdir(job_root / "logs") + _mkdir(job_root / "debug") + _mkdir(job_root / "checkpoints") + + _write_text(job_root / "config" / "ray_config.yaml", dump_yaml(self.cfg.to_public_dict())) + _write_text(job_root / "config" / "jobspec.yaml", dump_yaml(spec.to_public_dict())) + _write_json(job_root / "config" / "submit_payload.json", { + "submission_id": submission_id, + "address": self.cfg.address, + "entrypoint": entrypoint, + "entrypoint_num_cpus": self.cfg.entrypoint_num_cpus, + "entrypoint_resources": self.cfg.entrypoint_resources, + "runtime_env": runtime_env, + }) + + # Pre-submit debug snapshot (ray cluster resources via ray.init) + try: + ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False) + _write_json(job_root / "debug" / "ray_cluster_resources_pre.json", ray.cluster_resources()) + _write_json(job_root / "debug" / "ray_available_resources_pre.json", ray.available_resources()) + except Exception as e: + _write_text(job_root / "debug" / "ray_resources_pre.error.txt", repr(e) + "\n") + + try: + submitted = self.client.submit_job( + entrypoint=entrypoint, + submission_id=submission_id, + runtime_env=runtime_env, + entrypoint_num_cpus=self.cfg.entrypoint_num_cpus, + entrypoint_resources=self.cfg.entrypoint_resources, + ) + except Exception as e: + _write_text(job_root / "logs" / "submit.error.txt", repr(e) + "\n") + raise + + _write_text(job_root / "config" / "ray_submission_id.txt", submitted + "\n") + + # Post-submit debug snapshot via SDK + try: + jobs = self.client.list_jobs() + _write_text( + job_root / "debug" / "ray_job_list_post.json", + json.dumps([_job_details_to_dict(j) for j in jobs], indent=2) + "\n", + ) + except Exception as e: + _write_text(job_root / "debug" / "ray_job_list_post.error.txt", repr(e) + "\n") + + if not no_wait: + # caller can separately wait; keep submit non-blocking by default in scripts + pass + + return submitted + + def status(self, submission_id: str) -> str: + return str(self.client.get_job_status(submission_id)) + + def stop(self, submission_id: str) -> bool: + return bool(self.client.stop_job(submission_id)) + + def logs(self, submission_id: str) -> str: + return self.client.get_job_logs(submission_id) + + def list(self) -> list[dict[str, Any]]: + return [_job_details_to_dict(j) for j in self.client.list_jobs()] + + +def _job_details_to_dict(obj: Any) -> dict[str, Any]: + # Ray uses 
pydantic models internally, but depending on bundled pydantic version + # we might get `.model_dump()` (v2) or `.dict()` (v1). + if hasattr(obj, "model_dump"): + return obj.model_dump() # type: ignore[no-any-return] + if hasattr(obj, "dict"): + return obj.dict() # type: ignore[no-any-return] + if hasattr(obj, "__dict__"): + return dict(obj.__dict__) + return {"repr": repr(obj)} diff --git a/src/mvp/v2.0/py/mvp_v11/yaml_io.py b/src/mvp/v2.0/py/mvp_v11/yaml_io.py new file mode 100644 index 0000000..c321688 --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v11/yaml_io.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import yaml + + +def load_yaml(path: str) -> dict[str, Any]: + p = Path(path) + data = yaml.safe_load(p.read_text(encoding="utf-8")) + if data is None: + return {} + if not isinstance(data, dict): + raise ValueError(f"yaml root must be a mapping: {path}") + return data + + +def dump_yaml(data: dict[str, Any]) -> str: + return yaml.safe_dump(data, sort_keys=False, allow_unicode=True) + diff --git a/src/mvp/v2.0/py/mvp_v2/__init__.py b/src/mvp/v2.0/py/mvp_v2/__init__.py new file mode 100644 index 0000000..fe16459 --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v2/__init__.py @@ -0,0 +1,2 @@ +__all__ = [] + diff --git a/src/mvp/v2.0/py/mvp_v2/app.py b/src/mvp/v2.0/py/mvp_v2/app.py new file mode 100644 index 0000000..798a124 --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v2/app.py @@ -0,0 +1,190 @@ +from __future__ import annotations + +import os +import threading +from typing import Any + +import yaml +from fastapi import FastAPI, HTTPException, Request, Response + +from mvp_v11.models import JobSpec, RayConfig + +from .config import V2Config +from .db import Db +from .ids import new_task_id +from .scheduler import Scheduler + + +def _utc_now_iso() -> str: + from datetime import datetime + + return datetime.utcnow().replace(microsecond=0).isoformat() + "Z" + + +def _load_yaml_file(path: str) -> dict[str, Any]: + with open(path, "r", encoding="utf-8") as f: + obj = yaml.safe_load(f) or {} + if not isinstance(obj, dict): + raise ValueError("config yaml must be a mapping") + return obj + + +def create_app(config_path: str) -> FastAPI: + root = _load_yaml_file(config_path) + ray_cfg = RayConfig.from_dict(root) + v2_cfg = V2Config.from_root_dict(root) + + db = Db(v2_cfg.sqlite.db_path) + db.init() + + scheduler = Scheduler(db=db, ray_cfg=ray_cfg, v2_cfg=v2_cfg) + stop_flag = threading.Event() + tool = scheduler.tool + + app = FastAPI(title="mvp-v2", version="2.0") + + def _require_token(req: Request) -> None: + token_env = v2_cfg.auth.token_env + expected = os.environ.get(token_env, "") + if not expected: + # Misconfigured service; treat as server error. 
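+            # A 500 (rather than 401) distinguishes "service has no token configured"
+            # from "client sent a bad token", which keeps deployment errors visible.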
+ raise HTTPException(status_code=500, detail=f"missing token env: {token_env}") + + auth = req.headers.get("authorization") or "" + if not auth.startswith("Bearer "): + raise HTTPException(status_code=401, detail="missing bearer token") + got = auth.removeprefix("Bearer ").strip() + if got != expected: + raise HTTPException(status_code=401, detail="invalid token") + + @app.on_event("startup") + def _startup() -> None: + t = threading.Thread(target=scheduler.run_forever, args=(stop_flag,), daemon=True) + t.start() + + @app.on_event("shutdown") + def _shutdown() -> None: + stop_flag.set() + + @app.post("/api/v2/tasks") + async def submit_task(req: Request) -> dict[str, Any]: + _require_token(req) + body = (await req.body()).decode("utf-8") + obj = yaml.safe_load(body) or {} + if not isinstance(obj, dict): + raise HTTPException(status_code=400, detail="jobspec must be a YAML mapping") + + try: + spec = JobSpec.from_dict(obj) + except Exception as e: + raise HTTPException(status_code=400, detail=f"invalid jobspec: {e!r}") + + task_id = new_task_id(spec.workload) + db.create_task( + task_id=task_id, + workload=spec.workload, + jobspec_yaml=body, + nnodes=spec.nnodes, + n_gpus_per_node=spec.n_gpus_per_node, + ) + return {"task_id": task_id, "state": "QUEUED"} + + @app.get("/api/v2/tasks/{task_id}") + async def get_task(task_id: str, req: Request) -> dict[str, Any]: + _require_token(req) + row = db.get_task(task_id) + if not row: + raise HTTPException(status_code=404, detail="task not found") + attempts = db.list_attempts(task_id) + latest_attempt = attempts[-1] if attempts else None + desired = { + "nnodes": int(row["nnodes"]), + "n_gpus_per_node": int(row["n_gpus_per_node"]), + "total_gpus": int(row["nnodes"]) * int(row["n_gpus_per_node"]), + } + out: dict[str, Any] = { + "task_id": row["task_id"], + "workload": row["workload"], + "state": row["state"], + "created_at": row.get("created_at"), + "updated_at": row.get("updated_at"), + "desired_resources": desired, + "error_summary": row.get("error_summary"), + } + if latest_attempt: + out["latest_attempt"] = { + "attempt_no": latest_attempt["attempt_no"], + "ray_submission_id": latest_attempt["ray_submission_id"], + "ray_status": latest_attempt.get("ray_status"), + "start_time": latest_attempt.get("start_time"), + "end_time": latest_attempt.get("end_time"), + "failure_kind": latest_attempt.get("failure_kind"), + "message": latest_attempt.get("message"), + } + return out + + @app.get("/api/v2/tasks/{task_id}/attempts") + async def get_attempts(task_id: str, req: Request) -> dict[str, Any]: + _require_token(req) + row = db.get_task(task_id) + if not row: + raise HTTPException(status_code=404, detail="task not found") + return {"task_id": task_id, "attempts": db.list_attempts(task_id)} + + @app.post("/api/v2/tasks/{task_id}:cancel") + async def cancel(task_id: str, req: Request) -> dict[str, Any]: + _require_token(req) + row = db.get_task(task_id) + if not row: + raise HTTPException(status_code=404, detail="task not found") + + state = str(row["state"]) + if state in ("SUCCEEDED", "FAILED", "CANCELED"): + raise HTTPException(status_code=409, detail=f"task already terminal: {state}") + + attempts = db.list_attempts(task_id) + if attempts: + ray_sid = str(attempts[-1]["ray_submission_id"]) + try: + tool.stop(ray_sid) + except Exception: + pass + # Mark attempt as canceled on the service side so that API doesn't keep reporting RUNNING. + # Ray stop is async; we deliberately reflect the user's intent here. 
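+            # Once the task is CANCELED it drops out of list_active_tasks(), so the
+            # scheduler's status sync will not overwrite this terminal state even if
+            # Ray reports the stopped job later.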
+ db.update_attempt( + task_id=task_id, + attempt_no=int(attempts[-1]["attempt_no"]), + ray_status="STOPPED", + failure_kind="CANCELED", + message="Canceled by user via API (Ray stop requested).", + end_time=_utc_now_iso(), + ) + + db.set_task_state(task_id=task_id, state="CANCELED", event_type="CANCELED") + return {"task_id": task_id, "state": "CANCELED"} + + @app.get("/api/v2/tasks/{task_id}/logs") + async def logs(task_id: str, req: Request, tail: int = 2000, attempt: str = "latest") -> Response: + _require_token(req) + row = db.get_task(task_id) + if not row: + raise HTTPException(status_code=404, detail="task not found") + attempts = db.list_attempts(task_id) + if not attempts: + raise HTTPException(status_code=404, detail="no attempts yet") + a = attempts[-1] if attempt == "latest" else None + if a is None: + raise HTTPException(status_code=400, detail="only attempt=latest supported in v2.0") + ray_sid = str(a["ray_submission_id"]) + text = tool.logs(ray_sid) + if tail and tail > 0: + lines = text.splitlines() + text = "\n".join(lines[-tail:]) + ("\n" if lines else "") + return Response(content=text, media_type="text/plain") + + @app.get("/api/v2/queue") + async def queue(req: Request) -> dict[str, Any]: + _require_token(req) + return db.list_queue() + + return app diff --git a/src/mvp/v2.0/py/mvp_v2/config.py b/src/mvp/v2.0/py/mvp_v2/config.py new file mode 100644 index 0000000..c859c9b --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v2/config.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +@dataclass(frozen=True) +class V2ApiConfig: + host: str = "0.0.0.0" + port: int = 8080 + + +@dataclass(frozen=True) +class V2AuthConfig: + token_env: str = "MVP_INTERNAL_TOKEN" + + +@dataclass(frozen=True) +class V2SqliteConfig: + db_path: str + + +@dataclass(frozen=True) +class V2SchedulerConfig: + tick_s: int = 5 + retry_interval_s: int = 60 + max_running_tasks: int = 1 + + +@dataclass(frozen=True) +class V2Config: + api: V2ApiConfig + auth: V2AuthConfig + sqlite: V2SqliteConfig + scheduler: V2SchedulerConfig + + @staticmethod + def from_root_dict(root: dict[str, Any]) -> "V2Config": + v2 = root.get("v2") or {} + if not isinstance(v2, dict): + raise ValueError("config.v2 must be a mapping") + + api = v2.get("api") or {} + auth = v2.get("auth") or {} + sqlite = v2.get("sqlite") or {} + scheduler = v2.get("scheduler") or {} + + if not isinstance(api, dict) or not isinstance(auth, dict) or not isinstance(sqlite, dict) or not isinstance(scheduler, dict): + raise ValueError("config.v2.{api,auth,sqlite,scheduler} must be mappings") + + shared_root = str(root.get("shared_root") or "/private") + default_db_path = f"{shared_root}/common/db/mvp_v2.sqlite3" + db_path = str(sqlite.get("db_path") or default_db_path) + + return V2Config( + api=V2ApiConfig( + host=str(api.get("host") or "0.0.0.0"), + port=int(api.get("port") or 8080), + ), + auth=V2AuthConfig(token_env=str(auth.get("token_env") or "MVP_INTERNAL_TOKEN")), + sqlite=V2SqliteConfig(db_path=db_path), + scheduler=V2SchedulerConfig( + tick_s=int(scheduler.get("tick_s") or 5), + retry_interval_s=int(scheduler.get("retry_interval_s") or 60), + max_running_tasks=int(scheduler.get("max_running_tasks") or 1), + ), + ) + diff --git a/src/mvp/v2.0/py/mvp_v2/db.py b/src/mvp/v2.0/py/mvp_v2/db.py new file mode 100644 index 0000000..4bcb2e8 --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v2/db.py @@ -0,0 +1,254 @@ +from __future__ import annotations + +import os +import sqlite3 +from contextlib import 
contextmanager +from dataclasses import dataclass +from typing import Any, Iterator + + +def _utc_now_iso() -> str: + # Keep it simple; wall-clock ordering only. + import datetime as _dt + + return _dt.datetime.utcnow().replace(microsecond=0).isoformat() + "Z" + + +@dataclass(frozen=True) +class Db: + db_path: str + + def _connect(self) -> sqlite3.Connection: + os.makedirs(os.path.dirname(self.db_path), exist_ok=True) + conn = sqlite3.connect(self.db_path, timeout=30, isolation_level=None) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL;") + conn.execute("PRAGMA foreign_keys=ON;") + return conn + + def init(self) -> None: + with self._connect() as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS tasks ( + task_id TEXT PRIMARY KEY, + workload TEXT NOT NULL, + state TEXT NOT NULL, + jobspec_yaml TEXT NOT NULL, + nnodes INTEGER NOT NULL, + n_gpus_per_node INTEGER NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + next_run_at TEXT, + error_summary TEXT, + latest_attempt_no INTEGER NOT NULL DEFAULT 0 + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS attempts ( + task_id TEXT NOT NULL, + attempt_no INTEGER NOT NULL, + ray_submission_id TEXT NOT NULL UNIQUE, + ray_status TEXT, + failure_kind TEXT, + message TEXT, + start_time TEXT NOT NULL, + end_time TEXT, + PRIMARY KEY (task_id, attempt_no), + FOREIGN KEY (task_id) REFERENCES tasks(task_id) ON DELETE CASCADE + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT, + ts TEXT NOT NULL, + event_type TEXT NOT NULL, + payload_json TEXT, + FOREIGN KEY (task_id) REFERENCES tasks(task_id) ON DELETE CASCADE + ) + """ + ) + + @contextmanager + def tx(self) -> Iterator[sqlite3.Connection]: + conn = self._connect() + try: + conn.execute("BEGIN IMMEDIATE;") + yield conn + conn.execute("COMMIT;") + except Exception: + conn.execute("ROLLBACK;") + raise + finally: + conn.close() + + def create_task(self, *, task_id: str, workload: str, jobspec_yaml: str, nnodes: int, n_gpus_per_node: int) -> dict[str, Any]: + now = _utc_now_iso() + with self.tx() as conn: + conn.execute( + """ + INSERT INTO tasks (task_id, workload, state, jobspec_yaml, nnodes, n_gpus_per_node, created_at, updated_at) + VALUES (?, ?, 'QUEUED', ?, ?, ?, ?, ?) + """, + (task_id, workload, jobspec_yaml, nnodes, n_gpus_per_node, now, now), + ) + conn.execute( + "INSERT INTO events (task_id, ts, event_type, payload_json) VALUES (?, ?, 'TASK_CREATED', ?)", + (task_id, now, None), + ) + row = conn.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)).fetchone() + return dict(row) if row else {} + + def get_task(self, task_id: str) -> dict[str, Any] | None: + with self._connect() as conn: + row = conn.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)).fetchone() + return dict(row) if row else None + + def list_attempts(self, task_id: str) -> list[dict[str, Any]]: + with self._connect() as conn: + rows = conn.execute( + "SELECT * FROM attempts WHERE task_id = ? 
ORDER BY attempt_no ASC", (task_id,) + ).fetchall() + return [dict(r) for r in rows] + + def list_queue(self) -> dict[str, list[dict[str, Any]]]: + with self._connect() as conn: + pending = conn.execute( + """ + SELECT task_id, workload, state, nnodes, n_gpus_per_node, next_run_at, created_at, updated_at + FROM tasks + WHERE state IN ('QUEUED','PENDING_RESOURCES') + ORDER BY created_at ASC + LIMIT 200 + """ + ).fetchall() + running = conn.execute( + """ + SELECT task_id, workload, state, nnodes, n_gpus_per_node, latest_attempt_no, created_at, updated_at + FROM tasks + WHERE state IN ('SUBMITTING','SUBMITTED','RUNNING') + ORDER BY updated_at ASC + LIMIT 200 + """ + ).fetchall() + return {"pending": [dict(r) for r in pending], "running": [dict(r) for r in running]} + + def count_running(self) -> int: + with self._connect() as conn: + row = conn.execute( + "SELECT COUNT(1) AS n FROM tasks WHERE state IN ('SUBMITTING','SUBMITTED','RUNNING')" + ).fetchone() + return int(row["n"]) if row else 0 + + def list_active_tasks(self, limit: int = 50) -> list[dict[str, Any]]: + with self._connect() as conn: + rows = conn.execute( + "SELECT * FROM tasks WHERE state IN ('SUBMITTING','SUBMITTED','RUNNING') ORDER BY updated_at ASC LIMIT ?", + (int(limit),), + ).fetchall() + return [dict(r) for r in rows] + + def pick_next_runnable_task(self) -> dict[str, Any] | None: + now = _utc_now_iso() + with self._connect() as conn: + row = conn.execute( + """ + SELECT * + FROM tasks + WHERE state IN ('QUEUED','PENDING_RESOURCES') + AND (next_run_at IS NULL OR next_run_at <= ?) + ORDER BY created_at ASC + LIMIT 1 + """, + (now,), + ).fetchone() + return dict(row) if row else None + + def set_task_state( + self, + *, + task_id: str, + state: str, + error_summary: str | None = None, + next_run_at: str | None = None, + latest_attempt_no: int | None = None, + event_type: str = "STATE_UPDATE", + payload_json: str | None = None, + ) -> None: + now = _utc_now_iso() + with self.tx() as conn: + sets = ["state = ?", "updated_at = ?"] + params: list[Any] = [state, now] + if error_summary is not None: + sets.append("error_summary = ?") + params.append(error_summary) + if next_run_at is not None: + sets.append("next_run_at = ?") + params.append(next_run_at) + if latest_attempt_no is not None: + sets.append("latest_attempt_no = ?") + params.append(int(latest_attempt_no)) + params.append(task_id) + conn.execute(f"UPDATE tasks SET {', '.join(sets)} WHERE task_id = ?", tuple(params)) + conn.execute( + "INSERT INTO events (task_id, ts, event_type, payload_json) VALUES (?, ?, ?, ?)", + (task_id, now, event_type, payload_json), + ) + + def create_attempt(self, *, task_id: str, attempt_no: int, ray_submission_id: str) -> None: + now = _utc_now_iso() + with self.tx() as conn: + conn.execute( + """ + INSERT INTO attempts (task_id, attempt_no, ray_submission_id, ray_status, start_time) + VALUES (?, ?, ?, ?, ?) 
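+                -- ray_status starts as 'SUBMITTING'; the scheduler updates it via
+                -- update_attempt() once Ray confirms the submission.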
+ """, + (task_id, attempt_no, ray_submission_id, "SUBMITTING", now), + ) + conn.execute( + "INSERT INTO events (task_id, ts, event_type, payload_json) VALUES (?, ?, 'ATTEMPT_CREATED', ?)", + (task_id, now, None), + ) + + def update_attempt( + self, + *, + task_id: str, + attempt_no: int, + ray_status: str | None = None, + failure_kind: str | None = None, + message: str | None = None, + end_time: str | None = None, + ) -> None: + now = _utc_now_iso() + with self.tx() as conn: + sets = [] + params: list[Any] = [] + if ray_status is not None: + sets.append("ray_status = ?") + params.append(ray_status) + if failure_kind is not None: + sets.append("failure_kind = ?") + params.append(failure_kind) + if message is not None: + sets.append("message = ?") + params.append(message) + if end_time is not None: + sets.append("end_time = ?") + params.append(end_time) + if not sets: + return + params.extend([task_id, attempt_no]) + conn.execute( + f"UPDATE attempts SET {', '.join(sets)} WHERE task_id = ? AND attempt_no = ?", + tuple(params), + ) + conn.execute( + "INSERT INTO events (task_id, ts, event_type, payload_json) VALUES (?, ?, 'ATTEMPT_UPDATE', ?)", + (task_id, now, None), + ) diff --git a/src/mvp/v2.0/py/mvp_v2/ids.py b/src/mvp/v2.0/py/mvp_v2/ids.py new file mode 100644 index 0000000..0f1f0cc --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v2/ids.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +import secrets +from datetime import datetime + + +def new_task_id(workload: str) -> str: + ts = datetime.now().strftime("%Y%m%d-%H%M%S") + suffix = secrets.token_hex(2) + return f"mvp2-{workload}-{ts}-{suffix}" + + +def attempt_submission_id(task_id: str, attempt_no: int) -> str: + return f"{task_id}--a{attempt_no:02d}" + diff --git a/src/mvp/v2.0/py/mvp_v2/ray_resources.py b/src/mvp/v2.0/py/mvp_v2/ray_resources.py new file mode 100644 index 0000000..96f4d2a --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v2/ray_resources.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from dataclasses import dataclass + +import ray + + +@dataclass(frozen=True) +class ClusterAvailable: + total_available_gpus: float + total_available_npus: float + + +def get_cluster_available() -> ClusterAvailable: + # Align with verl's fail-fast check which uses ray._private.state.available_resources_per_node(). + # This is a best-effort internal API and may change with Ray versions. + try: + import ray._private.state # type: ignore + + per_node = ray._private.state.available_resources_per_node() + except Exception: + # If we cannot fetch per-node resources, conservatively return 0. 
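+        # Returning 0 only parks the task as PENDING_RESOURCES until the next retry
+        # window; it never marks the task FAILED, so being conservative on errors is safe.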
+ return ClusterAvailable(total_available_gpus=0.0, total_available_npus=0.0) + + total_gpu = 0.0 + total_npu = 0.0 + for _, info in per_node.items(): + if not isinstance(info, dict): + continue + total_gpu += float(info.get("GPU", 0) or 0) + total_npu += float(info.get("NPU", 0) or 0) + return ClusterAvailable(total_available_gpus=total_gpu, total_available_npus=total_npu) + + +def ensure_ray_connected() -> None: + ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False) + diff --git a/src/mvp/v2.0/py/mvp_v2/scheduler.py b/src/mvp/v2.0/py/mvp_v2/scheduler.py new file mode 100644 index 0000000..d84f9f2 --- /dev/null +++ b/src/mvp/v2.0/py/mvp_v2/scheduler.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +import re +import time +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Any + +import yaml + +from mvp_v11.models import JobSpec, RayConfig +from mvp_v11.ray_job_tool import RayJobTool + +from .config import V2Config +from .db import Db +from .ids import attempt_submission_id +from .ray_resources import ensure_ray_connected, get_cluster_available + + +_INSUFFICIENT_RE = re.compile(r"Total available GPUs\\s+\\d+\\s+is less than total desired GPUs\\s+\\d+") + + +def _utc_now_iso() -> str: + return datetime.utcnow().replace(microsecond=0).isoformat() + "Z" + + +def _utc_after_s(seconds: int) -> str: + return (datetime.utcnow() + timedelta(seconds=seconds)).replace(microsecond=0).isoformat() + "Z" + + +@dataclass +class Scheduler: + db: Db + ray_cfg: RayConfig + v2_cfg: V2Config + + def __post_init__(self) -> None: + self.tool = RayJobTool(self.ray_cfg) + + def _resources_sufficient(self, *, nnodes: int, n_gpus_per_node: int) -> bool: + avail = get_cluster_available() + required = float(nnodes * n_gpus_per_node) + return avail.total_available_gpus >= required + + def _parse_jobspec(self, jobspec_yaml: str) -> JobSpec: + obj = yaml.safe_load(jobspec_yaml) or {} + if not isinstance(obj, dict): + raise ValueError("jobspec must be a YAML mapping") + return JobSpec.from_dict(obj) + + def _submit_one(self, task_row: dict[str, Any]) -> None: + task_id = str(task_row["task_id"]) + jobspec_yaml = str(task_row["jobspec_yaml"]) + + spec = self._parse_jobspec(jobspec_yaml) + attempt_no = int(task_row.get("latest_attempt_no", 0)) + 1 + ray_sid = attempt_submission_id(task_id, attempt_no) + + # Record attempt first so that we can surface it even if submit crashes. + self.db.create_attempt(task_id=task_id, attempt_no=attempt_no, ray_submission_id=ray_sid) + self.db.set_task_state(task_id=task_id, state="SUBMITTING", latest_attempt_no=attempt_no) + + # Override submission_id in jobspec (v1.1 compatible) + d = spec.to_public_dict() + d["submission_id"] = ray_sid + spec2 = JobSpec.from_dict(d) + + try: + submitted = self.tool.submit(spec2, no_wait=True) + # submitted should equal ray_sid; keep as source of truth. 
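+            # submit_job() was called with an explicit submission_id (ray_sid), so Ray
+            # should echo it back; a mismatch is only recorded as a
+            # WARN_SUBMISSION_ID_MISMATCH event below, never treated as fatal.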
+ self.db.update_attempt(task_id=task_id, attempt_no=attempt_no, ray_status="SUBMITTED") + self.db.set_task_state(task_id=task_id, state="SUBMITTED") + if submitted != ray_sid: + self.db.set_task_state(task_id=task_id, state="SUBMITTED", event_type="WARN_SUBMISSION_ID_MISMATCH") + except Exception as e: + msg = repr(e) + self.db.update_attempt(task_id=task_id, attempt_no=attempt_no, ray_status="FAILED", failure_kind="UNKNOWN", message=msg, end_time=_utc_now_iso()) + self.db.set_task_state(task_id=task_id, state="FAILED", error_summary=msg) + + def _sync_one_running(self, task_row: dict[str, Any]) -> None: + task_id = str(task_row["task_id"]) + latest_attempt_no = int(task_row.get("latest_attempt_no", 0)) + if latest_attempt_no <= 0: + return + + # Look up ray_submission_id + attempts = self.db.list_attempts(task_id) + if not attempts: + return + ray_sid = str(attempts[-1]["ray_submission_id"]) + + try: + st = self.tool.status(ray_sid) + except Exception as e: + # Keep current state; transient failures should not flap tasks. + self.db.set_task_state(task_id=task_id, state=str(task_row["state"]), event_type="RAY_STATUS_ERROR", payload_json=repr(e)) + return + + st_s = str(st) + if st_s in ("PENDING", "RUNNING"): + self.db.update_attempt(task_id=task_id, attempt_no=latest_attempt_no, ray_status=st_s) + self.db.set_task_state(task_id=task_id, state=("RUNNING" if st_s == "RUNNING" else "SUBMITTED")) + return + + if st_s in ("SUCCEEDED",): + self.db.update_attempt(task_id=task_id, attempt_no=latest_attempt_no, ray_status=st_s, end_time=_utc_now_iso()) + self.db.set_task_state(task_id=task_id, state="SUCCEEDED") + return + + if st_s in ("FAILED", "STOPPED"): + logs = "" + try: + logs = self.tool.logs(ray_sid) + except Exception: + logs = "" + failure_kind = "UNKNOWN" + msg = "" + if _INSUFFICIENT_RE.search(logs): + failure_kind = "INSUFFICIENT_RESOURCES" + msg = "Insufficient resources (verl fail-fast): " + (_INSUFFICIENT_RE.search(logs).group(0)) + self.db.update_attempt( + task_id=task_id, + attempt_no=latest_attempt_no, + ray_status=st_s, + failure_kind=failure_kind, + message=msg, + end_time=_utc_now_iso(), + ) + self.db.set_task_state( + task_id=task_id, + state="PENDING_RESOURCES", + error_summary=msg, + next_run_at=_utc_after_s(self.v2_cfg.scheduler.retry_interval_s), + event_type="RETRY_SCHEDULED", + ) + return + + msg = f"Ray job {st_s}" + self.db.update_attempt( + task_id=task_id, + attempt_no=latest_attempt_no, + ray_status=st_s, + failure_kind=failure_kind, + message=msg, + end_time=_utc_now_iso(), + ) + self.db.set_task_state(task_id=task_id, state="FAILED", error_summary=msg) + + def tick(self) -> None: + ensure_ray_connected() + + # Sync active tasks + for row in self.db.list_active_tasks(limit=50): + self._sync_one_running(row) + + # Submit new tasks if capacity allows + if self.db.count_running() >= self.v2_cfg.scheduler.max_running_tasks: + return + + row = self.db.pick_next_runnable_task() + if not row: + return + + nnodes = int(row["nnodes"]) + n_gpus_per_node = int(row["n_gpus_per_node"]) + + if not self._resources_sufficient(nnodes=nnodes, n_gpus_per_node=n_gpus_per_node): + self.db.set_task_state( + task_id=str(row["task_id"]), + state="PENDING_RESOURCES", + next_run_at=_utc_after_s(self.v2_cfg.scheduler.retry_interval_s), + event_type="PENDING_RESOURCES", + ) + return + + self._submit_one(row) + + def run_forever(self, stop_flag: Any) -> None: + while not stop_flag.is_set(): + try: + self.tick() + except Exception: + # Best-effort: don't crash the scheduler loop + pass + 
diff --git a/src/mvp/v2.0/py/requirements.txt b/src/mvp/v2.0/py/requirements.txt
new file mode 100644
index 0000000..870ddce
--- /dev/null
+++ b/src/mvp/v2.0/py/requirements.txt
@@ -0,0 +1,4 @@
+fastapi==0.115.6
+uvicorn==0.30.6
+PyYAML==6.0.2
+
diff --git a/src/mvp/v2.0/py/server.py b/src/mvp/v2.0/py/server.py
new file mode 100644
index 0000000..b1ea2ef
--- /dev/null
+++ b/src/mvp/v2.0/py/server.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import argparse
+
+import uvicorn
+
+from mvp_v2.app import create_app
+from mvp_v2.config import V2Config
+
+
+def main() -> int:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--config", required=True, help="Path to v1.1 RayConfig YAML (extended with v2:)")
+    args = parser.parse_args()
+
+    # Load app and read v2.api host/port from config.
+    import yaml
+
+    with open(args.config, "r", encoding="utf-8") as f:
+        root = yaml.safe_load(f) or {}
+    if not isinstance(root, dict):
+        raise SystemExit("config yaml must be a mapping")
+    v2 = V2Config.from_root_dict(root)
+
+    app = create_app(args.config)
+    uvicorn.run(app, host=v2.api.host, port=v2.api.port, log_level="info")
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
+
diff --git a/src/mvp/v2.0/scripts/12_install_v2_deps.sh b/src/mvp/v2.0/scripts/12_install_v2_deps.sh
new file mode 100755
index 0000000..38f3544
--- /dev/null
+++ b/src/mvp/v2.0/scripts/12_install_v2_deps.sh
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# Install v2.0 API dependencies inside the head container (best-effort).
+# Assumes v1.1 containers are already up and v2.0 code is mounted/available.
+
+HEAD_CONTAINER="${HEAD_CONTAINER:-mvp11-ray-head}"
+
+docker exec -i "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -U pip >/dev/null 2>&1 || true"
+docker exec -i "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -r /workspace/mvp/v2/py/requirements.txt"
diff --git a/src/mvp/v2.0/scripts/20_start_api.sh b/src/mvp/v2.0/scripts/20_start_api.sh
new file mode 100755
index 0000000..81f328c
--- /dev/null
+++ b/src/mvp/v2.0/scripts/20_start_api.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+# shellcheck source=lib.sh
+source "${SCRIPT_DIR}/lib.sh"
+
+CONFIG_IN_CONTAINER="${CONFIG_IN_CONTAINER:-/workspace/mvp/v1.1/py/configs/dev.yaml}"
+LOG_PATH="${LOG_PATH:-/private/common/logs/mvp_v2_api.log}"
+PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}"
+
+echo "[host] starting mvp v2 api in head container: ${HEAD_CONTAINER}"
+
+dexec bash -lc "mkdir -p \"$(dirname "${LOG_PATH}")\" \"$(dirname "${PID_PATH}")\""
+
+# Note: requires /workspace/mvp/v2/py to be present in the container (mounted or copied).
+# Escape $ so that the command substitution runs in the container, not on the host;
+# capture the result so the host side can skip starting a second instance.
+already="$(dexec bash -lc "if test -f '${PID_PATH}'; then pid=\$(cat '${PID_PATH}'); if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo 'already_running'; fi; fi")"
+if [[ "${already}" == *already_running* ]]; then
+  echo "[host] mvp v2 api already running (pid file: ${PID_PATH}); nothing to do"
+  exit 0
+fi
+
+if [[ -z "${MVP_INTERNAL_TOKEN:-}" ]]; then
+  echo "ERROR: MVP_INTERNAL_TOKEN env var must be set on host (will be passed into container)" >&2
+  exit 1
+fi
+
+docker exec -d -e MVP_INTERNAL_TOKEN="${MVP_INTERNAL_TOKEN}" "${HEAD_CONTAINER}" bash -lc "nohup python3 /workspace/mvp/v2/py/server.py --config '${CONFIG_IN_CONTAINER}' >>'${LOG_PATH}' 2>&1 & echo \$!
>'${PID_PATH}'" + +echo "[host] started; pid stored in ${PID_PATH} (container path)" +echo "[host] logs: ${LOG_PATH} (container path)" diff --git a/src/mvp/v2.0/scripts/21_stop_api.sh b/src/mvp/v2.0/scripts/21_stop_api.sh new file mode 100755 index 0000000..4d711ca --- /dev/null +++ b/src/mvp/v2.0/scripts/21_stop_api.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=lib.sh +source "${SCRIPT_DIR}/lib.sh" + +PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}" + +echo "[host] stopping mvp v2 api (pid file: ${PID_PATH})" + +dexec bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then kill \"\${pid}\"; fi; rm -f '${PID_PATH}'; echo 'stopped'" diff --git a/src/mvp/v2.0/scripts/22_status_api.sh b/src/mvp/v2.0/scripts/22_status_api.sh new file mode 100755 index 0000000..0b3a8b6 --- /dev/null +++ b/src/mvp/v2.0/scripts/22_status_api.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=lib.sh +source "${SCRIPT_DIR}/lib.sh" + +PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}" + +dexec bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo \"running pid=\${pid}\"; else echo \"stale pid=\${pid}\"; fi" diff --git a/src/mvp/v2.0/scripts/lib.sh b/src/mvp/v2.0/scripts/lib.sh new file mode 100755 index 0000000..5f2c9bf --- /dev/null +++ b/src/mvp/v2.0/scripts/lib.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail + +# v2.0 scripts are intended to run on the host and control the existing Ray containers +# (same topology as v1.1). Adjust container names via env vars if needed. + +HEAD_CONTAINER="${HEAD_CONTAINER:-mvp11-ray-head}" + +dexec() { + docker exec -i "${HEAD_CONTAINER}" "$@" +} +
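+# Usage sketch (paths below are the defaults used by these scripts; adjust to your
+# deployment): run one-off checks in the head container, e.g.
+#   dexec bash -lc "ray status"
+#   dexec bash -lc "tail -n 50 /private/common/logs/mvp_v2_api.log"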