mvp 2.0 acceptance passed: implements basic API task submission, query, and cancellation, with simple FIFO queuing

This commit is contained in:
parent 2a1168c43c
commit 64558c8cea

specs/mvp/v2.0/v2_api.md (new file, +194 lines)
# MVP v2.0 API Design (Minimum Viable)

The goal of the v2.0 API is to turn v1.1's "script-based submission" into "service-based submission", with queuing, retries, and status aggregation implemented on the service side.

Constraints:

- Internal token authentication (keep it simple).
- Ray Job submission must use the **Ray Python SDK** (`JobSubmissionClient`); do not hand-roll HTTP with `requests`.
- Outputs and state must be persisted to NFS (`/private` inside the containers).

---

## 1. Authentication

- Header: `Authorization: Bearer <INTERNAL_TOKEN>`
- v2.0 has no user system or permission isolation; the token only guards against accidental misuse.
- Configuration suggestion: reuse `src/mvp/v1.1/py/configs/dev.yaml` and point `v2.auth.token_env` at the name of the token environment variable.
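On the server side this check is a plain string comparison against an environment variable, as implemented in `src/mvp/v2.0/py/mvp_v2/app.py` in this commit; a condensed, standalone sketch of that logic:

```python
import os

from fastapi import HTTPException, Request


def require_token(req: Request, token_env: str = "MVP_INTERNAL_TOKEN") -> None:
    # Read the expected token from the environment variable named by v2.auth.token_env.
    expected = os.environ.get(token_env, "")
    if not expected:
        # Misconfigured service (no token set): surface as a server error, not 401.
        raise HTTPException(status_code=500, detail=f"missing token env: {token_env}")

    auth = req.headers.get("authorization") or ""
    if not auth.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="missing bearer token")
    if auth.removeprefix("Bearer ").strip() != expected:
        raise HTTPException(status_code=401, detail="invalid token")
```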
## 1.1 Where It Runs (dev example)

- The service process runs inside the **Ray head container** (convenient access to the Ray Job server).
- It is controlled from the host via scripts (`docker exec`):
  - `src/mvp/v2.0/scripts/20_start_api.sh`
  - `src/mvp/v2.0/scripts/21_stop_api.sh`
  - `src/mvp/v2.0/scripts/22_status_api.sh`
- Remote-machine directory convention (example): `argus@h1:/home2/argus/infra/mvp/v2/`, mounted into the container at `/workspace/mvp/v2/`.

---
## 2. Resource and ID Conventions

### 2.1 task_id (primary ID at the service layer)

- Suggested format: `mvp2-<workload>-<YYYYMMDD>-<HHMMSS>-<suffix>`
- Example: `mvp2-ppo-20251223-143201-7f3a`

### 2.2 ray_submission_id (per-attempt ID)

- Derived by the service: `<task_id>--a<NN>`
- Example: `mvp2-ppo-20251223-143201-7f3a--a01`

Benefits (see the sketch below):

- Ray's submission id embeds the task_id, so a service-side task can be looked up directly from the Ray dashboard.
- The `/private/jobs/<ray_submission_id>/...` directories are naturally isolated and human-readable.
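The ID scheme above matches the helpers in `src/mvp/v2.0/py/mvp_v2/ids.py` added in this commit; a minimal sketch:

```python
import secrets
from datetime import datetime


def new_task_id(workload: str) -> str:
    # mvp2-<workload>-<YYYYMMDD>-<HHMMSS>-<suffix>
    ts = datetime.now().strftime("%Y%m%d-%H%M%S")
    suffix = secrets.token_hex(2)  # 4 hex chars, e.g. "7f3a"
    return f"mvp2-{workload}-{ts}-{suffix}"


def attempt_submission_id(task_id: str, attempt_no: int) -> str:
    # <task_id>--a<NN>, e.g. "mvp2-ppo-20251223-143201-7f3a--a01"
    return f"{task_id}--a{attempt_no:02d}"
```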
---

## 3. JobSpec (request body)

v2.0 **requires the JobSpec to use the same YAML format as v1.1** (identical fields and semantics). The server accepts the YAML text, parses it, and stores the result; the raw `jobspec_yaml` is also saved verbatim for auditing/reproducibility.

Minimal fields (example YAML):

```yaml
workload: "ppo"
submission_id: ""   # ignored/overridden by the v2.0 service (ray_submission_id is derived from task_id)
code_path: "/private/common/code/verl/verl_repo"
model_id: "Qwen/Qwen2.5-0.5B-Instruct"
train_file: "/private/datasets/gsm8k/train.parquet"
val_file: "/private/datasets/gsm8k/test.parquet"
nnodes: 2
n_gpus_per_node: 4
total_epochs: 1
total_training_steps: 10
save_freq: 10
test_freq: -1
trainer_device: null  # only used for sft (typically "cpu")
```

Notes:

- `trainer_device` only applies to `sft` (typically `cpu`, because the driver has no GPU).
- `val_file` may be `null` (e.g. for SFT).
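Server-side, the YAML body is parsed into the same `JobSpec` dataclass that v1.1 uses (`mvp_v11/models.py`, copied into v2.0 by this commit); a rough sketch of that step, with `jobspec.yaml` standing in for the request body:

```python
import yaml

from mvp_v11.models import JobSpec

# Read and parse the raw jobspec YAML, then validate it into a JobSpec.
# A ValueError here (missing or invalid fields) is what the API maps to HTTP 400.
jobspec_yaml = open("jobspec.yaml", encoding="utf-8").read()
obj = yaml.safe_load(jobspec_yaml) or {}
if not isinstance(obj, dict):
    raise ValueError("jobspec must be a YAML mapping")

spec = JobSpec.from_dict(obj)
print(spec.workload, spec.nnodes * spec.n_gpus_per_node)  # e.g. "ppo 8"
```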
---

## 4. API Endpoints

### 4.1 Submit a task

`POST /api/v2/tasks`

Request body:

- **raw JobSpec YAML** (same structure as the v1.1 jobspec YAML)

Headers:

- `Content-Type: application/yaml` (or `text/yaml`)

Response:

```json
{
  "task_id": "mvp2-ppo-20251223-143201-7f3a",
  "state": "QUEUED"
}
```
### 4.2 Query a task (aggregated status)

`GET /api/v2/tasks/{task_id}`

Response (example):

```json
{
  "task_id": "mvp2-ppo-20251223-143201-7f3a",
  "workload": "ppo",
  "state": "RUNNING",
  "desired_resources": {"nnodes": 2, "n_gpus_per_node": 4, "total_gpus": 8},
  "latest_attempt": {
    "attempt_no": 1,
    "ray_submission_id": "mvp2-ppo-20251223-143201-7f3a--a01",
    "ray_status": "RUNNING",
    "start_time": "2025-12-23T14:32:10+08:00"
  },
  "error_summary": null
}
```
### 4.3 List attempts

`GET /api/v2/tasks/{task_id}/attempts`

Response:

```json
{
  "task_id": "mvp2-ppo-20251223-143201-7f3a",
  "attempts": [
    {
      "attempt_no": 1,
      "ray_submission_id": "mvp2-ppo-20251223-143201-7f3a--a01",
      "ray_status": "FAILED",
      "failure_kind": "INSUFFICIENT_RESOURCES",
      "message": "Total available GPUs 0 is less than total desired GPUs 8",
      "start_time": "...",
      "end_time": "..."
    }
  ]
}
```
### 4.4 Cancel a task

`POST /api/v2/tasks/{task_id}:cancel`

Behavior:

- If the task is in `SUBMITTED/RUNNING`: call the Ray Jobs SDK `stop_job(ray_submission_id)` and mark it `CANCELED`.
- If it is in `QUEUED/PENDING_RESOURCES`: mark it `CANCELED` directly (never submitted).

Response:

```json
{"task_id":"...","state":"CANCELED"}
```
### 4.5 Fetch logs

`GET /api/v2/tasks/{task_id}/logs?attempt=latest&tail=2000`

Returns:

- `text/plain` (the tail of the Ray Job logs, passed through as-is)

Notes:

- v2.0 uses the Ray SDK `get_job_logs()` for now.
- If more durable archiving is needed, the scheduler can periodically fetch and persist logs (v2.1+).
### 4.6 List the queue (ops/debugging)

`GET /api/v2/queue`

Response:

```json
{
  "pending": [{"task_id":"...","state":"PENDING_RESOURCES","next_run_at":"..."}],
  "running": [{"task_id":"...","ray_submission_id":"..."}]
}
```
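As an illustration of how the endpoints above compose, here is a minimal client sketch using only the Python standard library (`urllib`); the base URL and the `jobspec.yaml` filename mirror the dev setup in this spec and are assumptions, not part of the API:

```python
import json
import os
import time
import urllib.request

BASE = "http://127.0.0.1:8080"  # assumed dev address (host-side view)
TOKEN = os.environ["MVP_INTERNAL_TOKEN"]


def call(method: str, path: str, body: bytes | None = None, content_type: str | None = None) -> bytes:
    req = urllib.request.Request(f"{BASE}{path}", data=body, method=method)
    req.add_header("Authorization", f"Bearer {TOKEN}")
    if content_type:
        req.add_header("Content-Type", content_type)
    with urllib.request.urlopen(req) as resp:
        return resp.read()


# Submit a jobspec (raw YAML body), then poll the aggregated state until terminal.
jobspec_yaml = open("jobspec.yaml", "rb").read()
task = json.loads(call("POST", "/api/v2/tasks", jobspec_yaml, "application/yaml"))
task_id = task["task_id"]

while True:
    info = json.loads(call("GET", f"/api/v2/tasks/{task_id}"))
    print(info["state"], info.get("latest_attempt", {}).get("ray_status"))
    if info["state"] in ("SUCCEEDED", "FAILED", "CANCELED"):
        break
    time.sleep(10)
```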
---

## 5. Error Codes (minimal)

- `400`: jobspec missing fields / invalid
- `401`: incorrect token
- `404`: task not found
- `409`: state conflict (e.g. canceling a task that is already terminal)
- `500`: internal server error

---

## 6. SQLite Persistence (API visibility)

The v2.0 service persists state in SQLite:

- tasks (`task_id`, `state`, `jobspec_yaml`, `next_run_at`, `latest_attempt_no`, etc.)
- attempts (`ray_submission_id`, `ray_status`, failure reason, etc.)

As a result:

- The data behind `GET /api/v2/tasks/{task_id}` comes from SQLite (overlaid with the results of Ray status synchronization).
- After a process restart, the queue is recoverable, and `PENDING_RESOURCES` tasks resume submission attempts once `next_run_at` expires.
specs/mvp/v2.0/v2_plan.md (new file, +306 lines)
# MVP v2.0 Development Plan (service entry point + queue scheduling + Ray Jobs SDK)

Goal: on top of v1.1 (scripts + Ray Jobs SDK), which has already passed acceptance, deliver a **minimal, independently runnable "service layer"**:

- Users submit training tasks (PPO/GRPO/SFT) through an **HTTP API**.
- The service layer assigns a **human-readable task ID** (`task_id`) and places the task in a queue.
- A background scheduler submits a Ray Job to the Ray cluster only once resources are available, and keeps tracking the Ray Job status.
- It applies "service-level retry/queuing" to `verl`'s **fail-fast resource pre-check** (which raises `ValueError` immediately when resources are insufficient), so users do not have to resubmit by hand.

> Constraints inherited from v1.1: the head node runs no training; the driver must land on a worker; shared storage is NFS only (`/private` inside containers).

---

## 1. Background: Why v2.0 Needs "Service-Layer Scheduling"

In v1.1 we submit `verl` training tasks as Ray Jobs. When `verl` PPO/GRPO initializes its workers, it creates resource pools and performs a fail-fast resource check:

- Trigger point: `ResourcePoolManager.create_resource_pool()` calls `_check_resource_available()` at the end.
- `_check_resource_available()` uses `ray._private.state.available_resources_per_node()` to count "available GPUs/NPUs" and raises immediately if there are not enough:
  - `ValueError: Total available GPUs 0 is less than total desired GPUs 8`

This is a reasonable choice (it avoids indefinite pending/hangs at the Ray level), but it creates a platform-side problem:

- When the cluster is temporarily short of resources, a user's submission "fails immediately" and must be retried by hand.

So the v2.0 service layer must provide:

- **Queue + gang constraint**: if resources are insufficient, the task stays pending at the service layer (not submitted to Ray).
- **Status tracking**: once submitted to Ray, continuously fetch the Ray Job status and report it back to the user.
- **Automatic retry on insufficient resources**: even if a race occurs (enough resources at submit time, taken away at startup), this class of failure is recognized and retried after a delay.

---
## 2. v2.0 Scope

### 2.1 Must-have (MVP v2.0)

1) **HTTP API** (internal token):
   - Submit, query, and cancel tasks, and fetch logs (minimum viable).
2) **Task queue and scheduler**:
   - FIFO (first come, first served); no quotas/fairness (left for v3+).
   - Gang: "all or nothing" submission based on the fixed resource demand `nnodes` + `n_gpus_per_node`.
3) **Ray Jobs SDK integration** (no hand-rolled HTTP via `requests`):
   - submit/status/stop/logs via `ray.job_submission.JobSubmissionClient`.
4) **Minimal observability/debuggability**:
   - For each task/attempt, persist the config, the submit payload, the `submission_id` returned by Ray, and key logs.
5) **Failure policy**:
   - Recognize "insufficient-resources fail-fast" failures → convert to `PENDING_RESOURCES` and retry after a delay.
   - Other failures stay `FAILED` (no automatic retry, so errors are not masked).

### 2.2 Out of Scope (not implemented in v2.0)

- Multi-tenancy / quotas / priorities / fairness scheduling (v3).
- Pipelines (chaining multiple jobs) (v3+).
- A full UI (v3+; v2.0 may only expose OpenAPI/Swagger).
- K8s orchestration (explicitly out of scope; still Native Ray).

---

### 2.3 Engineering Principles (Open/Closed Principle / reuse v1.1)

v2.0 development follows the Open/Closed Principle:

- **Open for extension**: add the new "service layer" (API + scheduler + SQLite) to support queuing, retry, and status aggregation.
- **Closed for modification**: avoid touching the v1.1 Ray Jobs SDK submission path, which is already stable and working.

How this lands in practice:

- Treat `src/mvp/v1.1/py/mvp_v11/` as the "mature submission layer" and copy it verbatim to `src/mvp/v2.0/py/mvp_v11/` for reuse by v2.0.
- Implement all new v2.0 functionality in new modules (e.g. `src/mvp/v2.0/py/mvp_v2/`) that call `mvp_v11` through composition/wrapping, so platform logic is not mixed into the old code.

---
## 3. Overall Architecture (v2.0)

### 3.1 Components

- **mvp-api** (HTTP server)
  - Accepts the JobSpec (structured fields keep the same semantics as v1.1)
  - Generates the `task_id` and writes it to persistent storage
  - Provides query/cancel/logs

- **mvp-scheduler** (background scheduler; may share a process with the API or run separately)
  - Polls the queue: performs the resource check for `PENDING_RESOURCES` tasks
  - Resources sufficient → submit via the Ray Jobs SDK → record the `ray_submission_id`
  - Keeps syncing Ray Job status for `SUBMITTED/RUNNING` tasks
  - If a Ray Job fails and matches the insufficient-resources pattern → delayed retry

> Deployment suggestion: for v2.0, run the service inside the **head container** (identical behavior in dev/prod; in production the container can only be managed via ssh).

### 3.4 dev Directory Conventions (example)

Using the current remote dev machine (`argus@h1`) as the example:

- Host directory: `/home2/argus/infra/mvp/v2/`
- Mounted in the container at: `/workspace/mvp/v2/`
- Shared NFS: `/private/` inside containers (same as v1.1)

> Note: the service scripts (`v2/scripts/*.sh`) should run on the **host** and control the head container via `docker exec`; the training driver is still forced onto a worker via Ray entrypoint_resources.

### 3.2 Relationship to Ray / Containers

- The service process runs on the head (or anywhere with equivalent access to the head's Job server address).
- Submission still uses the strong v1.1 constraints:
  - head: `--num-cpus=0 --num-gpus=0`
  - worker: `--resources='{"worker_node": 100}'`
  - job entrypoint: `entrypoint_resources={"worker_node": 1}` forces the driver onto a worker

---

### 3.3 Configuration Conventions (reuse and extend v1.1 dev.yaml)

The v2.0 service layer (API + scheduler) should reuse the existing v1.1 RayConfig file:

- `src/mvp/v1.1/py/configs/dev.yaml`

Reason:

- It already contains the Ray basics that v1.1 needs (Ray Job server address, entrypoint_resources, runtime_env, etc.), and v2.0 needs the same information to submit Ray Jobs.

How to extend it:

- Add a new top-level `v2:` section to that YAML for v2-only settings (API listen address, SQLite path, scheduler intervals, etc.).
- The v1.1 submitter only reads `address / shared_root / entrypoint_* / runtime_env / user_code_path` and ignores extra fields such as `v2:`, so v1.1 is not broken.

Suggested minimal additions (example):

- `v2.api.host` / `v2.api.port`
- `v2.auth.token_env` (name of the internal token environment variable)
- `v2.sqlite.db_path` (suggested: `/private/common/db/mvp_v2.sqlite3`)
- `v2.scheduler.tick_s` / `v2.scheduler.retry_interval_s` / `v2.scheduler.max_running_tasks`

---
## 4. Core Data Model (Task / Attempt)

### 4.1 Task (the task from the user's perspective)

- `task_id`: **human-readable** and unique, e.g.:
  - `mvp2-ppo-20251223-143201-7f3a`
- `workload`: `ppo|grpo|sft`
- `jobspec`: submission parameters (**same fields and semantics as the v1.1 jobspec YAML**; the server parses the YAML and stores it)
- `state`: see the state machine in section 5
- `created_at` / `updated_at`
- `latest_attempt`: points at the current attempt
- `attempts[]`: history of attempts
- `error_summary`: a short user-facing error (the reason for the most recent failure)

### 4.2 Attempt (one real Ray Job submission)

- `attempt_no`: increments from 1
- `ray_submission_id`: derived from the task_id:
  - `ray_submission_id = <task_id>--a01`
  - Benefit: Ray-side output directories are naturally readable and traceable
- `status`: Ray Job status (PENDING/RUNNING/SUCCEEDED/FAILED/STOPPED)
- `start_time` / `end_time`
- `exit_code` (if obtainable)
- `failure_kind` (enum):
  - `INSUFFICIENT_RESOURCES` (matches "Total available GPUs ... less than total desired ...")
  - `USER_ERROR` (config/data path errors, etc.)
  - `RUNTIME_ERROR` (code exceptions)
  - `UNKNOWN`

---
## 5. State Machine (service side)

Suggested minimal state set:

- `QUEUED`: enqueued, resource check not yet performed
- `PENDING_RESOURCES`: insufficient resources, waiting (pending on the service side, not submitted to Ray)
- `SUBMITTING`: an attempt is being submitted to Ray
- `SUBMITTED`: Ray accepted the submission (we have a `ray_submission_id`)
- `RUNNING`: the Ray Job is RUNNING
- `SUCCEEDED`: the task succeeded (terminal)
- `FAILED`: the task failed (terminal, unless it matches the "insufficient-resources retry" policy)
- `CANCELED`: canceled by the user (terminal)

Key transitions (illustrated in the sketch below):

- `QUEUED -> PENDING_RESOURCES`: insufficient resources
- `QUEUED/PENDING_RESOURCES -> SUBMITTING`: resources sufficient
- `SUBMITTING -> SUBMITTED`: submission succeeded
- `SUBMITTED -> RUNNING`: Ray status advanced
- `SUBMITTED/RUNNING -> SUCCEEDED|FAILED`: Ray terminal state
- `FAILED (INSUFFICIENT_RESOURCES) -> PENDING_RESOURCES`: enters delayed retry (attempt_no + 1)
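To make the transitions above concrete, here is an illustrative (not normative) encoding; the `ALLOWED` table and `is_valid_transition` helper are hypothetical and not part of this commit:

```python
# Illustrative sketch of the service-side state machine described above.
TERMINAL = {"SUCCEEDED", "FAILED", "CANCELED"}

ALLOWED = {
    "QUEUED": {"PENDING_RESOURCES", "SUBMITTING", "CANCELED"},
    "PENDING_RESOURCES": {"SUBMITTING", "CANCELED"},
    "SUBMITTING": {"SUBMITTED", "FAILED"},
    "SUBMITTED": {"RUNNING", "SUCCEEDED", "FAILED", "CANCELED"},
    "RUNNING": {"SUCCEEDED", "FAILED", "CANCELED"},
    # FAILED is terminal unless the failure was classified as INSUFFICIENT_RESOURCES,
    # in which case the scheduler moves the task back to PENDING_RESOURCES.
    "FAILED": {"PENDING_RESOURCES"},
}


def is_valid_transition(src: str, dst: str) -> bool:
    return dst in ALLOWED.get(src, set())
```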
---

## 6. Scheduling Policy (v2.0)

### 6.1 Resource Accounting (aligned with verl's notion of "available resources")

Because verl uses `ray._private.state.available_resources_per_node()` to count "available resources",
the v2.0 scheduler should use the same accounting wherever possible, to avoid:

- We think there is enough → verl thinks there is not (still fail-fast)
- We think there is not enough → there actually is (waste)

Policy (suggested; see the sketch below):

1) The scheduler periodically fetches the per-node available GPUs
2) Compute total_available_gpus = sum(node_gpu_available)
3) The task demand is total_required_gpus = nnodes * n_gpus_per_node
4) If `total_available_gpus < total_required_gpus` → `PENDING_RESOURCES`

Note: v2.0 only checks the total; node-level placement (guaranteeing exactly n_gpus_per_node per node) is a candidate enhancement for v2.1+ (resource pools / labels / node onboarding).
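This is the check implemented in `src/mvp/v2.0/py/mvp_v2/ray_resources.py` and used by the scheduler in this commit; a condensed sketch (note that `ray._private.state` is a Ray-internal API and may change across Ray versions):

```python
import ray


def total_available_gpus() -> float:
    # Same accounting as verl's fail-fast check: sum per-node available GPUs.
    # Fall back to 0.0 (i.e. "not enough") if the internal API cannot be queried.
    try:
        import ray._private.state  # type: ignore

        per_node = ray._private.state.available_resources_per_node()
    except Exception:
        return 0.0
    return sum(
        float(info.get("GPU", 0) or 0)
        for info in per_node.values()
        if isinstance(info, dict)
    )


def resources_sufficient(nnodes: int, n_gpus_per_node: int) -> bool:
    ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False)
    return total_available_gpus() >= float(nnodes * n_gpus_per_node)
```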
### 6.2 Queuing and Concurrency

- FIFO by default.
- Concurrency: multiple tasks may run at the same time, as long as resources are sufficient.
  - Simplification: if every task takes the full 8 GPUs by default, the scheduler effectively runs one task at a time.
  - If smaller tasks (1*1, 1*4) are supported later, they can run concurrently naturally.

### 6.3 Retry Policy (insufficient resources)

A failure is classified as `INSUFFICIENT_RESOURCES` when the following pattern appears:

- Ray Job `status=FAILED`
- `JobDetails.message` or the `job logs` match:
  - `Total available GPUs` together with `less than total desired`

Handling (see the sketch below):

- Set the task to `PENDING_RESOURCES`
- `next_run_at = now + 60s` (fixed interval; v2.1 may switch to exponential backoff)
- Increment attempt_no and resubmit (with a new submission id)
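The classification is a simple pattern match on the Ray job message/logs, following `mvp_v2/scheduler.py` from this commit; a small sketch (the `classify_failure` and `next_retry_at` helper names are illustrative):

```python
import re
from datetime import datetime, timedelta

# Matches verl's fail-fast error, e.g.
# "Total available GPUs 0 is less than total desired GPUs 8"
_INSUFFICIENT_RE = re.compile(
    r"Total available GPUs\s+\d+\s+is less than total desired GPUs\s+\d+"
)


def classify_failure(message: str, logs: str) -> str:
    if _INSUFFICIENT_RE.search(message or "") or _INSUFFICIENT_RE.search(logs or ""):
        return "INSUFFICIENT_RESOURCES"
    return "UNKNOWN"


def next_retry_at(retry_interval_s: int = 60) -> str:
    # Fixed interval in v2.0; exponential backoff is a v2.1 candidate.
    dt = datetime.utcnow() + timedelta(seconds=retry_interval_s)
    return dt.replace(microsecond=0).isoformat() + "Z"
```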
---

## 7. SQLite Persistence (queue/state/attempts)

v2.0 introduces a **minimal but recoverable persistence layer**: the task queue and state are stored in SQLite, so that:

- the queue survives api/scheduler process restarts;
- task/attempt history is traceable;
- the "service-side pending + delayed retry" behavior is deterministic.

### 7.1 Location

Suggested path (inside the container):

- `DB_PATH=/private/common/db/mvp_v2.sqlite3`

Notes:

- v2.0 runs as a single service instance (single writer) by default, so SQLite is sufficient.
- If SQLite on NFS shows locking/performance risks in production, migrate to Postgres/Redis in v2.1+; v2.0 prioritizes "replayable/recoverable" above all.

### 7.2 Table Design (suggested minimal set)

- `tasks`
  - `task_id` (PK)
  - `workload`
  - `state` (service-side state machine)
  - `jobspec_yaml` (the raw YAML text, stored verbatim for auditing/reproducibility)
  - `created_at`, `updated_at`
  - `next_run_at` (drives the delayed retry of `PENDING_RESOURCES`)
  - `error_summary`
  - `latest_attempt_no`

- `attempts`
  - `task_id` (FK)
  - `attempt_no`
  - `ray_submission_id`
  - `ray_status`
  - `failure_kind`
  - `message` (truncated key information)
  - `start_time`, `end_time`

- `events` (optional, but very helpful for debugging)
  - `id` (PK)
  - `task_id`
  - `ts`
  - `event_type` (STATE_TRANSITION / SUBMIT / RAY_STATUS_SYNC / RETRY_SCHEDULED, etc.)
  - `payload_json`

### 7.3 Scheduling Loop (interaction with SQLite)

On every tick the scheduler does three things (sketched below):

1) **Pick a runnable task** (FIFO + next_run_at):
   - `state IN ('QUEUED','PENDING_RESOURCES') AND next_run_at <= now`
2) **Check resources** (aligned with verl's accounting of available resources):
   - If insufficient: set `state='PENDING_RESOURCES'` and write `next_run_at=now+60s`
3) **Submit the Ray Job and track it**:
   - On successful submission: insert into `attempts` and update `tasks.latest_attempt_no`, `state='SUBMITTED'`
   - Periodically sync Ray status: `SUBMITTED/RUNNING -> SUCCEEDED/FAILED`
   - If the failure matches the insufficient-resources pattern: `FAILED -> PENDING_RESOURCES` + schedule the next retry
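A condensed sketch of one tick against the tables above; the SQL mirrors `mvp_v2/db.py` from this commit, while `resources_sufficient`, `submit_attempt`, and `sync_ray_status` are placeholders for the scheduler's other steps:

```python
import sqlite3
from datetime import datetime, timedelta


def _now_iso() -> str:
    return datetime.utcnow().replace(microsecond=0).isoformat() + "Z"


def pick_next_runnable(conn: sqlite3.Connection) -> sqlite3.Row | None:
    # FIFO among tasks that are queued or due for another resource check.
    return conn.execute(
        """
        SELECT * FROM tasks
        WHERE state IN ('QUEUED', 'PENDING_RESOURCES')
          AND (next_run_at IS NULL OR next_run_at <= ?)
        ORDER BY created_at ASC
        LIMIT 1
        """,
        (_now_iso(),),
    ).fetchone()


def tick(conn: sqlite3.Connection, resources_sufficient, submit_attempt, sync_ray_status) -> None:
    # Assumes conn.row_factory = sqlite3.Row so columns can be read by name.
    row = pick_next_runnable(conn)                                       # 1) pick a runnable task
    if row is not None:
        if resources_sufficient(row["nnodes"], row["n_gpus_per_node"]):  # 2) resource check
            submit_attempt(row)                                          # 3) submit via Ray Jobs SDK, record attempt
        else:
            retry_at = (datetime.utcnow() + timedelta(seconds=60)).replace(microsecond=0).isoformat() + "Z"
            conn.execute(
                "UPDATE tasks SET state='PENDING_RESOURCES', next_run_at=?, updated_at=? WHERE task_id=?",
                (retry_at, _now_iso(), row["task_id"]),
            )
    sync_ray_status()  # sync SUBMITTED/RUNNING tasks; reschedule on INSUFFICIENT_RESOURCES
```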
---

## 8. Interfaces and Acceptance (DoD)

### 8.1 API Capabilities (minimal set)

See `specs/mvp/v2.0/v2_api.md`.

### 8.2 Acceptance Criteria (DoD)

1) Submitting PPO/GRPO/SFT via the API returns a `task_id` and creates the task directory on NFS.
2) When the cluster is busy (not enough GPUs):
   - the task state is `PENDING_RESOURCES` (not FAILED)
   - once resources are released, the task automatically moves to `SUBMITTED/RUNNING`
3) When a race triggers verl's fail-fast:
   - the attempt is marked `INSUFFICIENT_RESOURCES`
   - the task returns to `PENDING_RESOURCES` and retries automatically after 60s
4) Querying a task via the API shows:
   - the current state
   - the latest attempt's `ray_submission_id`
   - the attempt history (at least start/end/failure reason)
5) Cancel stops a running Ray Job (via the Ray Jobs SDK stop call)

---

## 9. Suggested v2.0 Deliverables (layout)

`specs/mvp/v2.0/` (this directory):

- `v2_plan.md`: overall design and development plan (this file)
- `v2_api.md`: detailed API definition (requests/responses/fields/error codes)

Suggested code locations (for the implementation phase):

- `src/mvp/v2.0/`
  - `py/`: API server + scheduler
  - `scripts/`: start/stop/status (still reusing the v1.1 compose/cluster logic)
@@ -7,10 +7,12 @@ services:
    command: sleep infinity
    ports:
      - "8265:8265"
      - "8080:8080"
    volumes:
      - ../verl:/workspace/verl
      - ../shared:/private
      - .:/workspace/mvp/v1.1
      - ../v2:/workspace/mvp/v2
    shm_size: "10g"
    ulimits:
      nofile:
@@ -1,4 +1,8 @@
# Ray base configuration (dev environment / as seen from inside the head container)
#
# Notes:
# - The v1.1 SDK submitter reads this file as its RayConfig.
# - The v2.0 API service/scheduler also reuses this file as the "base RayConfig" and extends it with v2-specific settings (see v2:).
address: "http://127.0.0.1:8265"

# Shared root path inside the container (aligned with /private in production)
@@ -18,3 +22,17 @@ runtime_env:
# User-provided code directory (can be injected via PYTHONPATH)
user_code_path: "/private/user/code"

# v2.0 service-layer configuration (ignored by the v1.1 submitter; read by the v2.0 service)
v2:
  api:
    host: "0.0.0.0"
    port: 8080
  auth:
    # Provide the internal token via an environment variable; do not write it into the config file
    token_env: "MVP_INTERNAL_TOKEN"
  sqlite:
    db_path: "/private/common/db/mvp_v2.sqlite3"
  scheduler:
    tick_s: 5
    retry_interval_s: 60
    max_running_tasks: 1
src/mvp/v2.0/README.md (new file, +104 lines)
# MVP v2.0 (service entry point)

v2.0 adds a **service layer** on top of v1.1 (the Ray Jobs SDK submission path):

- Submit tasks (PPO/GRPO/SFT) via an HTTP API
- Service-side queue + gang resource check
- Recognizes `verl`'s fail-fast insufficient-resources failures and retries automatically
- SQLite persistence of queue/state/attempts (NFS: `/private`)

Design docs:

- `specs/mvp/v2.0/v2_plan.md`
- `specs/mvp/v2.0/v2_api.md`

## Quick Start (dev example)

Conventions:

- The Ray containers are still started by the v1.1 `docker-compose.yaml` (head + 2 workers)
- The v2 code lives on the host at `/home2/argus/infra/mvp/v2/` (mounted in the container at `/workspace/mvp/v2/`)
- The v2 config reuses v1.1: `/workspace/mvp/v1.1/py/configs/dev.yaml` (extended with a `v2:` section)

Run on the host:

```bash
export MVP_INTERNAL_TOKEN=...   # internal token
cd /home2/argus/infra/mvp/v2/scripts
./12_install_v2_deps.sh
./20_start_api.sh
./22_status_api.sh
```
Test the API (from the host):

```bash
curl -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" http://127.0.0.1:8080/api/v2/queue
```

> Process logs and pid files (container paths) default to `/private/common/logs/` and `/private/common/run/`.

## Submitting / Querying / Stopping Tasks

Conventions:

- API address (host view): `http://127.0.0.1:8080`
- Authentication: `Authorization: Bearer ${MVP_INTERNAL_TOKEN}`
- Request body: **raw YAML** (a JobSpec, same format as the v1.1 jobspec)
### 1) Submit a task (POST /api/v2/tasks)

Prepare a jobspec (example: PPO):

```yaml
workload: "ppo"
submission_id: ""   # ignored/overridden by v2; a task_id is generated and the ray_submission_id derived from it
code_path: "/private/common/code/verl/verl_repo"
model_id: "Qwen/Qwen2.5-0.5B-Instruct"
train_file: "/private/datasets/gsm8k/train.parquet"
val_file: "/private/datasets/gsm8k/test.parquet"
nnodes: 2
n_gpus_per_node: 4
total_epochs: 1
total_training_steps: 10
save_freq: 10
test_freq: -1
trainer_device: null
```

Submit:

```bash
curl -sS \
  -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \
  -H "Content-Type: application/yaml" \
  --data-binary @jobspec.yaml \
  http://127.0.0.1:8080/api/v2/tasks
```

Example response:

```json
{"task_id":"mvp2-ppo-20251223-082813-6426","state":"QUEUED"}
```

### 2) Query a task (GET /api/v2/tasks/{task_id})

```bash
curl -sS \
  -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \
  http://127.0.0.1:8080/api/v2/tasks/<task_id> | python3 -m json.tool
```

Optional:

- List attempts: `GET /api/v2/tasks/{task_id}/attempts`
- Fetch logs (latest attempt): `GET /api/v2/tasks/{task_id}/logs?tail=2000`
- List the queue: `GET /api/v2/queue`
### 3) Stop/Cancel a task (POST /api/v2/tasks/{task_id}:cancel)

```bash
curl -sS -X POST \
  -H "Authorization: Bearer ${MVP_INTERNAL_TOKEN}" \
  http://127.0.0.1:8080/api/v2/tasks/<task_id>:cancel
```

Notes:

- If the task has already been submitted to Ray (`SUBMITTED/RUNNING`), the service calls the Ray Jobs SDK `stop_job(ray_submission_id)`.
- If the task is still queued (`QUEUED/PENDING_RESOURCES`), the service marks it `CANCELED` directly (no attempt is created).
1
src/mvp/v2.0/py/mvp_v11/__init__.py
Normal file
1
src/mvp/v2.0/py/mvp_v11/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
|
||||
96
src/mvp/v2.0/py/mvp_v11/builders.py
Normal file
96
src/mvp/v2.0/py/mvp_v11/builders.py
Normal file
@ -0,0 +1,96 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .models import JobSpec
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class BuiltCommand:
|
||||
argv: list[str]
|
||||
|
||||
|
||||
def build_training_argv(spec: JobSpec, submission_id: str, job_dir: str) -> BuiltCommand:
|
||||
"""
|
||||
Returns argv for the actual training process (Hydra overrides preserved).
|
||||
This argv is executed by a lightweight Python driver entrypoint.
|
||||
"""
|
||||
if spec.workload in ("ppo", "grpo"):
|
||||
algo_overrides: list[str] = []
|
||||
if spec.workload == "grpo":
|
||||
algo_overrides.append("algorithm.adv_estimator=grpo")
|
||||
|
||||
test_freq = spec.test_freq if spec.test_freq is not None else -1
|
||||
val_file = spec.val_file if spec.val_file is not None else "null"
|
||||
|
||||
argv = [
|
||||
"python3",
|
||||
"-m",
|
||||
"verl.trainer.main_ppo",
|
||||
f"data.train_files={spec.train_file}",
|
||||
f"data.val_files={val_file}",
|
||||
"data.train_batch_size=256",
|
||||
"data.max_prompt_length=512",
|
||||
"data.max_response_length=512",
|
||||
f"actor_rollout_ref.model.path={spec.model_id}",
|
||||
"actor_rollout_ref.actor.optim.lr=1e-6",
|
||||
"actor_rollout_ref.actor.ppo_mini_batch_size=64",
|
||||
"actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4",
|
||||
"actor_rollout_ref.rollout.name=sglang",
|
||||
"actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8",
|
||||
"actor_rollout_ref.rollout.tensor_model_parallel_size=1",
|
||||
"actor_rollout_ref.rollout.gpu_memory_utilization=0.4",
|
||||
"actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4",
|
||||
"critic.optim.lr=1e-5",
|
||||
f"critic.model.path={spec.model_id}",
|
||||
"critic.ppo_micro_batch_size_per_gpu=4",
|
||||
"algorithm.kl_ctrl.kl_coef=0.001",
|
||||
*algo_overrides,
|
||||
"trainer.logger=console",
|
||||
"trainer.val_before_train=False",
|
||||
f"trainer.n_gpus_per_node={spec.n_gpus_per_node}",
|
||||
f"trainer.nnodes={spec.nnodes}",
|
||||
f"trainer.save_freq={spec.save_freq}",
|
||||
f"trainer.test_freq={test_freq}",
|
||||
f"trainer.total_epochs={spec.total_epochs}",
|
||||
f"trainer.total_training_steps={spec.total_training_steps}",
|
||||
"trainer.resume_mode=disable",
|
||||
f"trainer.default_local_dir={job_dir}/checkpoints",
|
||||
"+ray_kwargs.ray_init.address=auto",
|
||||
f"hydra.run.dir={job_dir}/logs/hydra",
|
||||
]
|
||||
return BuiltCommand(argv=argv)
|
||||
|
||||
if spec.workload == "sft":
|
||||
val_override = "null" if spec.val_file is None else spec.val_file
|
||||
trainer_device = spec.trainer_device or "cpu"
|
||||
|
||||
argv = [
|
||||
"python3",
|
||||
"-m",
|
||||
"verl.trainer.sft_trainer_ray",
|
||||
f"model.path={spec.model_id}",
|
||||
f"data.train_files={spec.train_file}",
|
||||
f"data.val_files={val_override}",
|
||||
"data.train_batch_size=64",
|
||||
"data.micro_batch_size_per_gpu=1",
|
||||
"data.max_token_len_per_gpu=2048",
|
||||
"data.max_length=1024",
|
||||
"trainer.logger=console",
|
||||
"trainer.project_name=mvp11-sft",
|
||||
f"trainer.experiment_name={submission_id}",
|
||||
f"trainer.total_epochs={spec.total_epochs}",
|
||||
f"trainer.total_training_steps={spec.total_training_steps}",
|
||||
f"trainer.save_freq={spec.save_freq}",
|
||||
"trainer.test_freq=-1",
|
||||
"trainer.resume_mode=disable",
|
||||
f"trainer.device={trainer_device}",
|
||||
f"trainer.default_local_dir={job_dir}/checkpoints",
|
||||
f"trainer.nnodes={spec.nnodes}",
|
||||
f"trainer.n_gpus_per_node={spec.n_gpus_per_node}",
|
||||
f"hydra.run.dir={job_dir}/logs/hydra",
|
||||
]
|
||||
return BuiltCommand(argv=argv)
|
||||
|
||||
raise ValueError(f"unsupported workload: {spec.workload}")
|
||||
|
||||
63
src/mvp/v2.0/py/mvp_v11/driver_entrypoint.py
Normal file
63
src/mvp/v2.0/py/mvp_v11/driver_entrypoint.py
Normal file
@ -0,0 +1,63 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def _preflight() -> None:
|
||||
print("MVP_PRECHECK_PYTHON:", sys.executable, flush=True)
|
||||
print("MVP_PRECHECK_PYTHONPATH:", os.environ.get("PYTHONPATH"), flush=True)
|
||||
print("MVP_PRECHECK_MVP_CODE_PATH:", os.environ.get("MVP_CODE_PATH"), flush=True)
|
||||
try:
|
||||
import verl # type: ignore
|
||||
|
||||
print("MVP_PRECHECK_VERL_FILE:", getattr(verl, "__file__", None), flush=True)
|
||||
except Exception as e:
|
||||
print("MVP_PRECHECK_VERL_IMPORT_ERROR:", repr(e), flush=True)
|
||||
|
||||
try:
|
||||
import mvp_marker # type: ignore
|
||||
|
||||
print("MVP_PRECHECK_MARKER:", getattr(mvp_marker, "MARKER", None), flush=True)
|
||||
except Exception as e:
|
||||
print("MVP_PRECHECK_MARKER_MISSING:", repr(e), flush=True)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--job-dir", required=True)
|
||||
parser.add_argument("cmd", nargs=argparse.REMAINDER)
|
||||
args = parser.parse_args()
|
||||
|
||||
job_dir = Path(args.job_dir)
|
||||
job_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
_preflight()
|
||||
|
||||
if not args.cmd:
|
||||
print("no command provided", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
# argparse includes the leading "--" if the caller uses it; strip it.
|
||||
cmd = list(args.cmd)
|
||||
if cmd and cmd[0] == "--":
|
||||
cmd = cmd[1:]
|
||||
if not cmd:
|
||||
print("no command provided", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
# Execute training command as a subprocess so that logs are captured by Ray job logs.
|
||||
cmd_str = " ".join(shlex.quote(x) for x in cmd)
|
||||
print("MVP_DRIVER_EXEC:", cmd_str, flush=True)
|
||||
|
||||
proc = subprocess.run(cmd, check=False)
|
||||
print("MVP_DRIVER_EXIT_CODE:", proc.returncode, flush=True)
|
||||
return proc.returncode
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
121
src/mvp/v2.0/py/mvp_v11/models.py
Normal file
121
src/mvp/v2.0/py/mvp_v11/models.py
Normal file
@ -0,0 +1,121 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
|
||||
def _require(d: dict[str, Any], key: str) -> Any:
|
||||
if key not in d or d[key] in (None, ""):
|
||||
raise ValueError(f"missing required field: {key}")
|
||||
return d[key]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RayConfig:
|
||||
address: str
|
||||
shared_root: str
|
||||
entrypoint_num_cpus: float
|
||||
entrypoint_resources: dict[str, float]
|
||||
runtime_env_env_vars: dict[str, str]
|
||||
user_code_path: str
|
||||
|
||||
@staticmethod
|
||||
def from_dict(d: dict[str, Any]) -> "RayConfig":
|
||||
runtime_env = d.get("runtime_env") or {}
|
||||
env_vars = (runtime_env.get("env_vars") or {}) if isinstance(runtime_env, dict) else {}
|
||||
if not isinstance(env_vars, dict):
|
||||
raise ValueError("runtime_env.env_vars must be a mapping")
|
||||
|
||||
entrypoint_resources = d.get("entrypoint_resources") or {}
|
||||
if not isinstance(entrypoint_resources, dict):
|
||||
raise ValueError("entrypoint_resources must be a mapping")
|
||||
|
||||
return RayConfig(
|
||||
address=str(_require(d, "address")),
|
||||
shared_root=str(_require(d, "shared_root")),
|
||||
entrypoint_num_cpus=float(d.get("entrypoint_num_cpus", 1)),
|
||||
entrypoint_resources={str(k): float(v) for k, v in entrypoint_resources.items()},
|
||||
runtime_env_env_vars={str(k): str(v) for k, v in env_vars.items()},
|
||||
user_code_path=str(d.get("user_code_path", f"{_require(d, 'shared_root')}/user/code")),
|
||||
)
|
||||
|
||||
def to_public_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"address": self.address,
|
||||
"shared_root": self.shared_root,
|
||||
"entrypoint_num_cpus": self.entrypoint_num_cpus,
|
||||
"entrypoint_resources": self.entrypoint_resources,
|
||||
"runtime_env": {"env_vars": self.runtime_env_env_vars},
|
||||
"user_code_path": self.user_code_path,
|
||||
}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class JobSpec:
|
||||
workload: str # ppo|grpo|sft
|
||||
submission_id: str | None
|
||||
code_path: str
|
||||
model_id: str
|
||||
|
||||
train_file: str
|
||||
val_file: str | None
|
||||
|
||||
nnodes: int
|
||||
n_gpus_per_node: int
|
||||
|
||||
total_epochs: int
|
||||
total_training_steps: int
|
||||
|
||||
save_freq: int
|
||||
test_freq: int | None
|
||||
|
||||
trainer_device: str | None # only for sft (driver-side device)
|
||||
|
||||
@staticmethod
|
||||
def from_dict(d: dict[str, Any]) -> "JobSpec":
|
||||
workload = str(_require(d, "workload"))
|
||||
if workload not in ("ppo", "grpo", "sft"):
|
||||
raise ValueError(f"unsupported workload: {workload}")
|
||||
|
||||
val_file = d.get("val_file", None)
|
||||
if val_file in ("", "null"):
|
||||
val_file = None
|
||||
|
||||
test_freq = d.get("test_freq", None)
|
||||
if test_freq in ("", "null"):
|
||||
test_freq = None
|
||||
|
||||
return JobSpec(
|
||||
workload=workload,
|
||||
submission_id=(str(d["submission_id"]) if d.get("submission_id") else None),
|
||||
code_path=str(_require(d, "code_path")),
|
||||
model_id=str(_require(d, "model_id")),
|
||||
train_file=str(_require(d, "train_file")),
|
||||
val_file=(str(val_file) if val_file is not None else None),
|
||||
nnodes=int(d.get("nnodes", 2)),
|
||||
n_gpus_per_node=int(d.get("n_gpus_per_node", 4)),
|
||||
total_epochs=int(d.get("total_epochs", 1)),
|
||||
total_training_steps=int(d.get("total_training_steps", 10)),
|
||||
save_freq=int(d.get("save_freq", 10)),
|
||||
test_freq=(int(test_freq) if test_freq is not None else None),
|
||||
trainer_device=(str(d.get("trainer_device")) if d.get("trainer_device") else None),
|
||||
)
|
||||
|
||||
def to_public_dict(self) -> dict[str, Any]:
|
||||
out: dict[str, Any] = {
|
||||
"workload": self.workload,
|
||||
"submission_id": self.submission_id or "",
|
||||
"code_path": self.code_path,
|
||||
"model_id": self.model_id,
|
||||
"train_file": self.train_file,
|
||||
"val_file": self.val_file,
|
||||
"nnodes": self.nnodes,
|
||||
"n_gpus_per_node": self.n_gpus_per_node,
|
||||
"total_epochs": self.total_epochs,
|
||||
"total_training_steps": self.total_training_steps,
|
||||
"save_freq": self.save_freq,
|
||||
"test_freq": self.test_freq,
|
||||
}
|
||||
if self.workload == "sft":
|
||||
out["trainer_device"] = self.trainer_device or "cpu"
|
||||
return out
|
||||
171
src/mvp/v2.0/py/mvp_v11/ray_job_tool.py
Normal file
171
src/mvp/v2.0/py/mvp_v11/ray_job_tool.py
Normal file
@ -0,0 +1,171 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import shlex
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import ray
|
||||
from ray.job_submission import JobSubmissionClient
|
||||
|
||||
from .builders import build_training_argv
|
||||
from .models import JobSpec, RayConfig
|
||||
from .yaml_io import dump_yaml
|
||||
|
||||
|
||||
def _ts() -> str:
|
||||
return datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
|
||||
|
||||
def _mkdir(p: Path) -> None:
|
||||
p.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def _write_text(p: Path, content: str) -> None:
|
||||
_mkdir(p.parent)
|
||||
p.write_text(content, encoding="utf-8")
|
||||
|
||||
|
||||
def _write_json(p: Path, obj: Any) -> None:
|
||||
_write_text(p, json.dumps(obj, indent=2, ensure_ascii=False) + "\n")
|
||||
|
||||
|
||||
def _safe_basename(path: str) -> str:
|
||||
return path.rstrip("/").split("/")[-1]
|
||||
|
||||
|
||||
class RayJobTool:
|
||||
def __init__(self, cfg: RayConfig):
|
||||
self.cfg = cfg
|
||||
self.client = JobSubmissionClient(cfg.address)
|
||||
|
||||
def _job_dir(self, submission_id: str) -> str:
|
||||
return f"{self.cfg.shared_root}/jobs/{submission_id}"
|
||||
|
||||
def _runtime_env(self, spec: JobSpec) -> dict[str, Any]:
|
||||
env_vars = dict(self.cfg.runtime_env_env_vars)
|
||||
|
||||
# Default HF cache
|
||||
env_vars.setdefault("HF_HOME", f"{self.cfg.shared_root}/hf")
|
||||
env_vars.setdefault("HUGGINGFACE_HUB_CACHE", f"{self.cfg.shared_root}/hf/hub")
|
||||
env_vars.setdefault("TRANSFORMERS_CACHE", f"{self.cfg.shared_root}/hf/transformers")
|
||||
env_vars.setdefault("PYTHONUNBUFFERED", "1")
|
||||
|
||||
# Tool code path must be importable on workers (compose mounts v1.1 into all containers).
|
||||
# Place it before verl code to avoid interfering with verl import priority.
|
||||
tool_code_path = os.environ.get("MVP_TOOL_CODE_PATH", "/workspace/mvp/v1.1/py")
|
||||
|
||||
user_code_path = self.cfg.user_code_path
|
||||
code_path = spec.code_path
|
||||
|
||||
existing = env_vars.get("PYTHONPATH", "")
|
||||
prefix = f"{tool_code_path}:{code_path}:{user_code_path}"
|
||||
env_vars["PYTHONPATH"] = f"{prefix}:{existing}" if existing else prefix
|
||||
|
||||
# For debugging / log visibility
|
||||
env_vars["MVP_CODE_PATH"] = code_path
|
||||
|
||||
# SFT: ensure ray.init() connects to the cluster
|
||||
if spec.workload == "sft":
|
||||
env_vars.setdefault("RAY_ADDRESS", "auto")
|
||||
|
||||
return {"env_vars": env_vars}
|
||||
|
||||
def submit(self, spec: JobSpec, no_wait: bool) -> str:
|
||||
submission_id = spec.submission_id or f"mvp11_{spec.workload}_{_ts()}_{os.getpid()}"
|
||||
job_dir = self._job_dir(submission_id)
|
||||
|
||||
built = build_training_argv(spec, submission_id=submission_id, job_dir=job_dir)
|
||||
entrypoint_argv = [
|
||||
"python3",
|
||||
"-m",
|
||||
"mvp_v11.driver_entrypoint",
|
||||
"--job-dir",
|
||||
job_dir,
|
||||
*built.argv,
|
||||
]
|
||||
entrypoint = " ".join(shlex.quote(x) for x in entrypoint_argv)
|
||||
|
||||
runtime_env = self._runtime_env(spec)
|
||||
|
||||
# Prepare job artifacts directory
|
||||
job_root = Path(job_dir)
|
||||
_mkdir(job_root / "config")
|
||||
_mkdir(job_root / "logs")
|
||||
_mkdir(job_root / "debug")
|
||||
_mkdir(job_root / "checkpoints")
|
||||
|
||||
_write_text(job_root / "config" / "ray_config.yaml", dump_yaml(self.cfg.to_public_dict()))
|
||||
_write_text(job_root / "config" / "jobspec.yaml", dump_yaml(spec.to_public_dict()))
|
||||
_write_json(job_root / "config" / "submit_payload.json", {
|
||||
"submission_id": submission_id,
|
||||
"address": self.cfg.address,
|
||||
"entrypoint": entrypoint,
|
||||
"entrypoint_num_cpus": self.cfg.entrypoint_num_cpus,
|
||||
"entrypoint_resources": self.cfg.entrypoint_resources,
|
||||
"runtime_env": runtime_env,
|
||||
})
|
||||
|
||||
# Pre-submit debug snapshot (ray cluster resources via ray.init)
|
||||
try:
|
||||
ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False)
|
||||
_write_json(job_root / "debug" / "ray_cluster_resources_pre.json", ray.cluster_resources())
|
||||
_write_json(job_root / "debug" / "ray_available_resources_pre.json", ray.available_resources())
|
||||
except Exception as e:
|
||||
_write_text(job_root / "debug" / "ray_resources_pre.error.txt", repr(e) + "\n")
|
||||
|
||||
try:
|
||||
submitted = self.client.submit_job(
|
||||
entrypoint=entrypoint,
|
||||
submission_id=submission_id,
|
||||
runtime_env=runtime_env,
|
||||
entrypoint_num_cpus=self.cfg.entrypoint_num_cpus,
|
||||
entrypoint_resources=self.cfg.entrypoint_resources,
|
||||
)
|
||||
except Exception as e:
|
||||
_write_text(job_root / "logs" / "submit.error.txt", repr(e) + "\n")
|
||||
raise
|
||||
|
||||
_write_text(job_root / "config" / "ray_submission_id.txt", submitted + "\n")
|
||||
|
||||
# Post-submit debug snapshot via SDK
|
||||
try:
|
||||
jobs = self.client.list_jobs()
|
||||
_write_text(
|
||||
job_root / "debug" / "ray_job_list_post.json",
|
||||
json.dumps([_job_details_to_dict(j) for j in jobs], indent=2) + "\n",
|
||||
)
|
||||
except Exception as e:
|
||||
_write_text(job_root / "debug" / "ray_job_list_post.error.txt", repr(e) + "\n")
|
||||
|
||||
if not no_wait:
|
||||
# caller can separately wait; keep submit non-blocking by default in scripts
|
||||
pass
|
||||
|
||||
return submitted
|
||||
|
||||
def status(self, submission_id: str) -> str:
|
||||
return str(self.client.get_job_status(submission_id))
|
||||
|
||||
def stop(self, submission_id: str) -> bool:
|
||||
return bool(self.client.stop_job(submission_id))
|
||||
|
||||
def logs(self, submission_id: str) -> str:
|
||||
return self.client.get_job_logs(submission_id)
|
||||
|
||||
def list(self) -> list[dict[str, Any]]:
|
||||
return [_job_details_to_dict(j) for j in self.client.list_jobs()]
|
||||
|
||||
|
||||
def _job_details_to_dict(obj: Any) -> dict[str, Any]:
|
||||
# Ray uses pydantic models internally, but depending on bundled pydantic version
|
||||
# we might get `.model_dump()` (v2) or `.dict()` (v1).
|
||||
if hasattr(obj, "model_dump"):
|
||||
return obj.model_dump() # type: ignore[no-any-return]
|
||||
if hasattr(obj, "dict"):
|
||||
return obj.dict() # type: ignore[no-any-return]
|
||||
if hasattr(obj, "__dict__"):
|
||||
return dict(obj.__dict__)
|
||||
return {"repr": repr(obj)}
|
||||
21
src/mvp/v2.0/py/mvp_v11/yaml_io.py
Normal file
21
src/mvp/v2.0/py/mvp_v11/yaml_io.py
Normal file
@ -0,0 +1,21 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
def load_yaml(path: str) -> dict[str, Any]:
|
||||
p = Path(path)
|
||||
data = yaml.safe_load(p.read_text(encoding="utf-8"))
|
||||
if data is None:
|
||||
return {}
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError(f"yaml root must be a mapping: {path}")
|
||||
return data
|
||||
|
||||
|
||||
def dump_yaml(data: dict[str, Any]) -> str:
|
||||
return yaml.safe_dump(data, sort_keys=False, allow_unicode=True)
|
||||
|
||||
2
src/mvp/v2.0/py/mvp_v2/__init__.py
Normal file
2
src/mvp/v2.0/py/mvp_v2/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
__all__ = []
|
||||
|
||||
190
src/mvp/v2.0/py/mvp_v2/app.py
Normal file
190
src/mvp/v2.0/py/mvp_v2/app.py
Normal file
@ -0,0 +1,190 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
from fastapi import FastAPI, HTTPException, Request, Response
|
||||
|
||||
from mvp_v11.models import JobSpec, RayConfig
|
||||
|
||||
from .config import V2Config
|
||||
from .db import Db
|
||||
from .ids import new_task_id
|
||||
from .scheduler import Scheduler
|
||||
|
||||
|
||||
def _utc_now_iso() -> str:
|
||||
from datetime import datetime
|
||||
|
||||
return datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
|
||||
|
||||
|
||||
def _load_yaml_file(path: str) -> dict[str, Any]:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
obj = yaml.safe_load(f) or {}
|
||||
if not isinstance(obj, dict):
|
||||
raise ValueError("config yaml must be a mapping")
|
||||
return obj
|
||||
|
||||
|
||||
def create_app(config_path: str) -> FastAPI:
|
||||
root = _load_yaml_file(config_path)
|
||||
ray_cfg = RayConfig.from_dict(root)
|
||||
v2_cfg = V2Config.from_root_dict(root)
|
||||
|
||||
db = Db(v2_cfg.sqlite.db_path)
|
||||
db.init()
|
||||
|
||||
scheduler = Scheduler(db=db, ray_cfg=ray_cfg, v2_cfg=v2_cfg)
|
||||
stop_flag = threading.Event()
|
||||
tool = scheduler.tool
|
||||
|
||||
app = FastAPI(title="mvp-v2", version="2.0")
|
||||
|
||||
def _require_token(req: Request) -> None:
|
||||
token_env = v2_cfg.auth.token_env
|
||||
expected = os.environ.get(token_env, "")
|
||||
if not expected:
|
||||
# Misconfigured service; treat as server error.
|
||||
raise HTTPException(status_code=500, detail=f"missing token env: {token_env}")
|
||||
|
||||
auth = req.headers.get("authorization") or ""
|
||||
if not auth.startswith("Bearer "):
|
||||
raise HTTPException(status_code=401, detail="missing bearer token")
|
||||
got = auth.removeprefix("Bearer ").strip()
|
||||
if got != expected:
|
||||
raise HTTPException(status_code=401, detail="invalid token")
|
||||
|
||||
@app.on_event("startup")
|
||||
def _startup() -> None:
|
||||
t = threading.Thread(target=scheduler.run_forever, args=(stop_flag,), daemon=True)
|
||||
t.start()
|
||||
|
||||
@app.on_event("shutdown")
|
||||
def _shutdown() -> None:
|
||||
stop_flag.set()
|
||||
|
||||
@app.post("/api/v2/tasks")
|
||||
async def submit_task(req: Request) -> dict[str, Any]:
|
||||
_require_token(req)
|
||||
body = (await req.body()).decode("utf-8")
|
||||
obj = yaml.safe_load(body) or {}
|
||||
if not isinstance(obj, dict):
|
||||
raise HTTPException(status_code=400, detail="jobspec must be a YAML mapping")
|
||||
|
||||
try:
|
||||
spec = JobSpec.from_dict(obj)
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=400, detail=f"invalid jobspec: {e!r}")
|
||||
|
||||
task_id = new_task_id(spec.workload)
|
||||
db.create_task(
|
||||
task_id=task_id,
|
||||
workload=spec.workload,
|
||||
jobspec_yaml=body,
|
||||
nnodes=spec.nnodes,
|
||||
n_gpus_per_node=spec.n_gpus_per_node,
|
||||
)
|
||||
return {"task_id": task_id, "state": "QUEUED"}
|
||||
|
||||
@app.get("/api/v2/tasks/{task_id}")
|
||||
async def get_task(task_id: str, req: Request) -> dict[str, Any]:
|
||||
_require_token(req)
|
||||
row = db.get_task(task_id)
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="task not found")
|
||||
attempts = db.list_attempts(task_id)
|
||||
latest_attempt = attempts[-1] if attempts else None
|
||||
desired = {
|
||||
"nnodes": int(row["nnodes"]),
|
||||
"n_gpus_per_node": int(row["n_gpus_per_node"]),
|
||||
"total_gpus": int(row["nnodes"]) * int(row["n_gpus_per_node"]),
|
||||
}
|
||||
out: dict[str, Any] = {
|
||||
"task_id": row["task_id"],
|
||||
"workload": row["workload"],
|
||||
"state": row["state"],
|
||||
"created_at": row.get("created_at"),
|
||||
"updated_at": row.get("updated_at"),
|
||||
"desired_resources": desired,
|
||||
"error_summary": row.get("error_summary"),
|
||||
}
|
||||
if latest_attempt:
|
||||
out["latest_attempt"] = {
|
||||
"attempt_no": latest_attempt["attempt_no"],
|
||||
"ray_submission_id": latest_attempt["ray_submission_id"],
|
||||
"ray_status": latest_attempt.get("ray_status"),
|
||||
"start_time": latest_attempt.get("start_time"),
|
||||
"end_time": latest_attempt.get("end_time"),
|
||||
"failure_kind": latest_attempt.get("failure_kind"),
|
||||
"message": latest_attempt.get("message"),
|
||||
}
|
||||
return out
|
||||
|
||||
@app.get("/api/v2/tasks/{task_id}/attempts")
|
||||
async def get_attempts(task_id: str, req: Request) -> dict[str, Any]:
|
||||
_require_token(req)
|
||||
row = db.get_task(task_id)
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="task not found")
|
||||
return {"task_id": task_id, "attempts": db.list_attempts(task_id)}
|
||||
|
||||
@app.post("/api/v2/tasks/{task_id}:cancel")
|
||||
async def cancel(task_id: str, req: Request) -> dict[str, Any]:
|
||||
_require_token(req)
|
||||
row = db.get_task(task_id)
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="task not found")
|
||||
|
||||
state = str(row["state"])
|
||||
if state in ("SUCCEEDED", "FAILED", "CANCELED"):
|
||||
raise HTTPException(status_code=409, detail=f"task already terminal: {state}")
|
||||
|
||||
attempts = db.list_attempts(task_id)
|
||||
if attempts:
|
||||
ray_sid = str(attempts[-1]["ray_submission_id"])
|
||||
try:
|
||||
tool.stop(ray_sid)
|
||||
except Exception:
|
||||
pass
|
||||
# Mark attempt as canceled on the service side so that API doesn't keep reporting RUNNING.
|
||||
# Ray stop is async; we deliberately reflect the user's intent here.
|
||||
db.update_attempt(
|
||||
task_id=task_id,
|
||||
attempt_no=int(attempts[-1]["attempt_no"]),
|
||||
ray_status="STOPPED",
|
||||
failure_kind="CANCELED",
|
||||
message="Canceled by user via API (Ray stop requested).",
|
||||
end_time=_utc_now_iso(),
|
||||
)
|
||||
|
||||
db.set_task_state(task_id=task_id, state="CANCELED", event_type="CANCELED")
|
||||
return {"task_id": task_id, "state": "CANCELED"}
|
||||
|
||||
@app.get("/api/v2/tasks/{task_id}/logs")
|
||||
async def logs(task_id: str, req: Request, tail: int = 2000, attempt: str = "latest") -> Response:
|
||||
_require_token(req)
|
||||
row = db.get_task(task_id)
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="task not found")
|
||||
attempts = db.list_attempts(task_id)
|
||||
if not attempts:
|
||||
raise HTTPException(status_code=404, detail="no attempts yet")
|
||||
a = attempts[-1] if attempt == "latest" else None
|
||||
if a is None:
|
||||
raise HTTPException(status_code=400, detail="only attempt=latest supported in v2.0")
|
||||
ray_sid = str(a["ray_submission_id"])
|
||||
text = tool.logs(ray_sid)
|
||||
if tail and tail > 0:
|
||||
lines = text.splitlines()
|
||||
text = "\n".join(lines[-tail:]) + ("\n" if lines else "")
|
||||
return Response(content=text, media_type="text/plain")
|
||||
|
||||
@app.get("/api/v2/queue")
|
||||
async def queue(req: Request) -> dict[str, Any]:
|
||||
_require_token(req)
|
||||
return db.list_queue()
|
||||
|
||||
return app
|
||||
68
src/mvp/v2.0/py/mvp_v2/config.py
Normal file
68
src/mvp/v2.0/py/mvp_v2/config.py
Normal file
@ -0,0 +1,68 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class V2ApiConfig:
|
||||
host: str = "0.0.0.0"
|
||||
port: int = 8080
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class V2AuthConfig:
|
||||
token_env: str = "MVP_INTERNAL_TOKEN"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class V2SqliteConfig:
|
||||
db_path: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class V2SchedulerConfig:
|
||||
tick_s: int = 5
|
||||
retry_interval_s: int = 60
|
||||
max_running_tasks: int = 1
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class V2Config:
|
||||
api: V2ApiConfig
|
||||
auth: V2AuthConfig
|
||||
sqlite: V2SqliteConfig
|
||||
scheduler: V2SchedulerConfig
|
||||
|
||||
@staticmethod
|
||||
def from_root_dict(root: dict[str, Any]) -> "V2Config":
|
||||
v2 = root.get("v2") or {}
|
||||
if not isinstance(v2, dict):
|
||||
raise ValueError("config.v2 must be a mapping")
|
||||
|
||||
api = v2.get("api") or {}
|
||||
auth = v2.get("auth") or {}
|
||||
sqlite = v2.get("sqlite") or {}
|
||||
scheduler = v2.get("scheduler") or {}
|
||||
|
||||
if not isinstance(api, dict) or not isinstance(auth, dict) or not isinstance(sqlite, dict) or not isinstance(scheduler, dict):
|
||||
raise ValueError("config.v2.{api,auth,sqlite,scheduler} must be mappings")
|
||||
|
||||
shared_root = str(root.get("shared_root") or "/private")
|
||||
default_db_path = f"{shared_root}/common/db/mvp_v2.sqlite3"
|
||||
db_path = str(sqlite.get("db_path") or default_db_path)
|
||||
|
||||
return V2Config(
|
||||
api=V2ApiConfig(
|
||||
host=str(api.get("host") or "0.0.0.0"),
|
||||
port=int(api.get("port") or 8080),
|
||||
),
|
||||
auth=V2AuthConfig(token_env=str(auth.get("token_env") or "MVP_INTERNAL_TOKEN")),
|
||||
sqlite=V2SqliteConfig(db_path=db_path),
|
||||
scheduler=V2SchedulerConfig(
|
||||
tick_s=int(scheduler.get("tick_s") or 5),
|
||||
retry_interval_s=int(scheduler.get("retry_interval_s") or 60),
|
||||
max_running_tasks=int(scheduler.get("max_running_tasks") or 1),
|
||||
),
|
||||
)
|
||||
|
||||
254
src/mvp/v2.0/py/mvp_v2/db.py
Normal file
254
src/mvp/v2.0/py/mvp_v2/db.py
Normal file
@ -0,0 +1,254 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sqlite3
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Iterator
|
||||
|
||||
|
||||
def _utc_now_iso() -> str:
|
||||
# Keep it simple; wall-clock ordering only.
|
||||
import datetime as _dt
|
||||
|
||||
return _dt.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Db:
|
||||
db_path: str
|
||||
|
||||
def _connect(self) -> sqlite3.Connection:
|
||||
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
|
||||
conn = sqlite3.connect(self.db_path, timeout=30, isolation_level=None)
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA journal_mode=WAL;")
|
||||
conn.execute("PRAGMA foreign_keys=ON;")
|
||||
return conn
|
||||
|
||||
def init(self) -> None:
|
||||
with self._connect() as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS tasks (
|
||||
task_id TEXT PRIMARY KEY,
|
||||
workload TEXT NOT NULL,
|
||||
state TEXT NOT NULL,
|
||||
jobspec_yaml TEXT NOT NULL,
|
||||
nnodes INTEGER NOT NULL,
|
||||
n_gpus_per_node INTEGER NOT NULL,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
next_run_at TEXT,
|
||||
error_summary TEXT,
|
||||
latest_attempt_no INTEGER NOT NULL DEFAULT 0
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS attempts (
|
||||
task_id TEXT NOT NULL,
|
||||
attempt_no INTEGER NOT NULL,
|
||||
ray_submission_id TEXT NOT NULL UNIQUE,
|
||||
ray_status TEXT,
|
||||
failure_kind TEXT,
|
||||
message TEXT,
|
||||
start_time TEXT NOT NULL,
|
||||
end_time TEXT,
|
||||
PRIMARY KEY (task_id, attempt_no),
|
||||
FOREIGN KEY (task_id) REFERENCES tasks(task_id) ON DELETE CASCADE
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
task_id TEXT,
|
||||
ts TEXT NOT NULL,
|
||||
event_type TEXT NOT NULL,
|
||||
payload_json TEXT,
|
||||
FOREIGN KEY (task_id) REFERENCES tasks(task_id) ON DELETE CASCADE
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
@contextmanager
|
||||
def tx(self) -> Iterator[sqlite3.Connection]:
|
||||
conn = self._connect()
|
||||
try:
|
||||
conn.execute("BEGIN IMMEDIATE;")
|
||||
yield conn
|
||||
conn.execute("COMMIT;")
|
||||
except Exception:
|
||||
conn.execute("ROLLBACK;")
|
||||
raise
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def create_task(self, *, task_id: str, workload: str, jobspec_yaml: str, nnodes: int, n_gpus_per_node: int) -> dict[str, Any]:
|
||||
now = _utc_now_iso()
|
||||
with self.tx() as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO tasks (task_id, workload, state, jobspec_yaml, nnodes, n_gpus_per_node, created_at, updated_at)
|
||||
VALUES (?, ?, 'QUEUED', ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(task_id, workload, jobspec_yaml, nnodes, n_gpus_per_node, now, now),
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO events (task_id, ts, event_type, payload_json) VALUES (?, ?, 'TASK_CREATED', ?)",
|
||||
(task_id, now, None),
|
||||
)
|
||||
row = conn.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)).fetchone()
|
||||
return dict(row) if row else {}
|
||||
|
||||
def get_task(self, task_id: str) -> dict[str, Any] | None:
|
||||
with self._connect() as conn:
|
||||
row = conn.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)).fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
def list_attempts(self, task_id: str) -> list[dict[str, Any]]:
|
||||
with self._connect() as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM attempts WHERE task_id = ? ORDER BY attempt_no ASC", (task_id,)
|
||||
).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
def list_queue(self) -> dict[str, list[dict[str, Any]]]:
|
||||
with self._connect() as conn:
|
||||
pending = conn.execute(
|
||||
"""
|
||||
SELECT task_id, workload, state, nnodes, n_gpus_per_node, next_run_at, created_at, updated_at
|
||||
FROM tasks
|
||||
WHERE state IN ('QUEUED','PENDING_RESOURCES')
|
||||
ORDER BY created_at ASC
|
||||
LIMIT 200
|
||||
"""
|
||||
).fetchall()
|
||||
running = conn.execute(
|
||||
"""
|
||||
SELECT task_id, workload, state, nnodes, n_gpus_per_node, latest_attempt_no, created_at, updated_at
|
||||
FROM tasks
|
||||
WHERE state IN ('SUBMITTING','SUBMITTED','RUNNING')
|
||||
ORDER BY updated_at ASC
|
||||
LIMIT 200
|
||||
"""
|
||||
).fetchall()
|
||||
return {"pending": [dict(r) for r in pending], "running": [dict(r) for r in running]}
|
||||
|
||||
def count_running(self) -> int:
|
||||
with self._connect() as conn:
|
||||
row = conn.execute(
|
||||
"SELECT COUNT(1) AS n FROM tasks WHERE state IN ('SUBMITTING','SUBMITTED','RUNNING')"
|
||||
).fetchone()
|
||||
return int(row["n"]) if row else 0
|
||||
|
||||
def list_active_tasks(self, limit: int = 50) -> list[dict[str, Any]]:
|
||||
with self._connect() as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM tasks WHERE state IN ('SUBMITTING','SUBMITTED','RUNNING') ORDER BY updated_at ASC LIMIT ?",
|
||||
(int(limit),),
|
||||
).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
def pick_next_runnable_task(self) -> dict[str, Any] | None:
|
||||
now = _utc_now_iso()
|
||||
with self._connect() as conn:
|
||||
row = conn.execute(
|
||||
"""
|
||||
SELECT *
|
||||
FROM tasks
|
||||
WHERE state IN ('QUEUED','PENDING_RESOURCES')
|
||||
AND (next_run_at IS NULL OR next_run_at <= ?)
|
||||
ORDER BY created_at ASC
|
||||
LIMIT 1
|
||||
""",
|
||||
(now,),
|
||||
).fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
def set_task_state(
|
||||
self,
|
||||
*,
|
||||
task_id: str,
|
||||
state: str,
|
||||
error_summary: str | None = None,
|
||||
next_run_at: str | None = None,
|
||||
latest_attempt_no: int | None = None,
|
||||
event_type: str = "STATE_UPDATE",
|
||||
payload_json: str | None = None,
|
||||
) -> None:
|
||||
now = _utc_now_iso()
|
||||
with self.tx() as conn:
|
||||
sets = ["state = ?", "updated_at = ?"]
|
||||
params: list[Any] = [state, now]
|
||||
if error_summary is not None:
|
||||
sets.append("error_summary = ?")
|
||||
params.append(error_summary)
|
||||
if next_run_at is not None:
|
||||
sets.append("next_run_at = ?")
|
||||
params.append(next_run_at)
|
||||
if latest_attempt_no is not None:
|
||||
sets.append("latest_attempt_no = ?")
|
||||
params.append(int(latest_attempt_no))
|
||||
params.append(task_id)
|
||||
conn.execute(f"UPDATE tasks SET {', '.join(sets)} WHERE task_id = ?", tuple(params))
|
||||
conn.execute(
|
||||
"INSERT INTO events (task_id, ts, event_type, payload_json) VALUES (?, ?, ?, ?)",
|
||||
(task_id, now, event_type, payload_json),
|
||||
)
|
||||
|
||||
def create_attempt(self, *, task_id: str, attempt_no: int, ray_submission_id: str) -> None:
|
||||
now = _utc_now_iso()
|
||||
with self.tx() as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO attempts (task_id, attempt_no, ray_submission_id, ray_status, start_time)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
""",
|
||||
(task_id, attempt_no, ray_submission_id, "SUBMITTING", now),
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO events (task_id, ts, event_type, payload_json) VALUES (?, ?, 'ATTEMPT_CREATED', ?)",
|
||||
(task_id, now, None),
|
||||
)
|
||||
|
||||
def update_attempt(
|
||||
self,
|
||||
*,
|
||||
task_id: str,
|
||||
attempt_no: int,
|
||||
ray_status: str | None = None,
|
||||
failure_kind: str | None = None,
|
||||
message: str | None = None,
|
||||
end_time: str | None = None,
|
||||
) -> None:
|
||||
now = _utc_now_iso()
|
||||
with self.tx() as conn:
|
||||
sets = []
|
||||
params: list[Any] = []
|
||||
if ray_status is not None:
|
||||
sets.append("ray_status = ?")
|
||||
params.append(ray_status)
|
||||
if failure_kind is not None:
|
||||
sets.append("failure_kind = ?")
|
||||
params.append(failure_kind)
|
||||
if message is not None:
|
||||
sets.append("message = ?")
|
||||
params.append(message)
|
||||
if end_time is not None:
|
||||
sets.append("end_time = ?")
|
||||
params.append(end_time)
|
||||
if not sets:
|
||||
return
|
||||
params.extend([task_id, attempt_no])
|
||||
conn.execute(
|
||||
f"UPDATE attempts SET {', '.join(sets)} WHERE task_id = ? AND attempt_no = ?",
|
||||
tuple(params),
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO events (task_id, ts, event_type, payload_json) VALUES (?, ?, 'ATTEMPT_UPDATE', ?)",
|
||||
(task_id, now, None),
|
||||
)
|
||||
15
src/mvp/v2.0/py/mvp_v2/ids.py
Normal file
15
src/mvp/v2.0/py/mvp_v2/ids.py
Normal file
@ -0,0 +1,15 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import secrets
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def new_task_id(workload: str) -> str:
|
||||
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
suffix = secrets.token_hex(2)
|
||||
return f"mvp2-{workload}-{ts}-{suffix}"
|
||||
|
||||
|
||||
def attempt_submission_id(task_id: str, attempt_no: int) -> str:
|
||||
return f"{task_id}--a{attempt_no:02d}"
|
||||
|
||||
37
src/mvp/v2.0/py/mvp_v2/ray_resources.py
Normal file
@@ -0,0 +1,37 @@
from __future__ import annotations

from dataclasses import dataclass

import ray


@dataclass(frozen=True)
class ClusterAvailable:
    total_available_gpus: float
    total_available_npus: float


def get_cluster_available() -> ClusterAvailable:
    # Align with verl's fail-fast check, which uses ray._private.state.available_resources_per_node().
    # This is a best-effort internal API and may change with Ray versions.
    try:
        import ray._private.state  # type: ignore

        per_node = ray._private.state.available_resources_per_node()
    except Exception:
        # If we cannot fetch per-node resources, conservatively report zero.
        return ClusterAvailable(total_available_gpus=0.0, total_available_npus=0.0)

    total_gpu = 0.0
    total_npu = 0.0
    for _, info in per_node.items():
        if not isinstance(info, dict):
            continue
        total_gpu += float(info.get("GPU", 0) or 0)
        total_npu += float(info.get("NPU", 0) or 0)
    return ClusterAvailable(total_available_gpus=total_gpu, total_available_npus=total_npu)


def ensure_ray_connected() -> None:
    ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False)

185
src/mvp/v2.0/py/mvp_v2/scheduler.py
Normal file
@@ -0,0 +1,185 @@
from __future__ import annotations

import re
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any

import yaml

from mvp_v11.models import JobSpec, RayConfig
from mvp_v11.ray_job_tool import RayJobTool

from .config import V2Config
from .db import Db
from .ids import attempt_submission_id
from .ray_resources import ensure_ray_connected, get_cluster_available


# Matches verl's fail-fast message, e.g. "Total available GPUs 4 is less than total desired GPUs 8".
_INSUFFICIENT_RE = re.compile(r"Total available GPUs\s+\d+\s+is less than total desired GPUs\s+\d+")


def _utc_now_iso() -> str:
    return datetime.utcnow().replace(microsecond=0).isoformat() + "Z"


def _utc_after_s(seconds: int) -> str:
    return (datetime.utcnow() + timedelta(seconds=seconds)).replace(microsecond=0).isoformat() + "Z"


@dataclass
class Scheduler:
    db: Db
    ray_cfg: RayConfig
    v2_cfg: V2Config

    def __post_init__(self) -> None:
        self.tool = RayJobTool(self.ray_cfg)

    def _resources_sufficient(self, *, nnodes: int, n_gpus_per_node: int) -> bool:
        avail = get_cluster_available()
        required = float(nnodes * n_gpus_per_node)
        return avail.total_available_gpus >= required

    def _parse_jobspec(self, jobspec_yaml: str) -> JobSpec:
        obj = yaml.safe_load(jobspec_yaml) or {}
        if not isinstance(obj, dict):
            raise ValueError("jobspec must be a YAML mapping")
        return JobSpec.from_dict(obj)

    def _submit_one(self, task_row: dict[str, Any]) -> None:
        task_id = str(task_row["task_id"])
        jobspec_yaml = str(task_row["jobspec_yaml"])

        spec = self._parse_jobspec(jobspec_yaml)
        attempt_no = int(task_row.get("latest_attempt_no", 0)) + 1
        ray_sid = attempt_submission_id(task_id, attempt_no)

        # Record the attempt first so that we can surface it even if submit crashes.
        self.db.create_attempt(task_id=task_id, attempt_no=attempt_no, ray_submission_id=ray_sid)
        self.db.set_task_state(task_id=task_id, state="SUBMITTING", latest_attempt_no=attempt_no)

        # Override submission_id in the jobspec (v1.1 compatible).
        d = spec.to_public_dict()
        d["submission_id"] = ray_sid
        spec2 = JobSpec.from_dict(d)

        try:
            submitted = self.tool.submit(spec2, no_wait=True)
            # `submitted` should equal `ray_sid`; keep the DB record as the source of truth.
            self.db.update_attempt(task_id=task_id, attempt_no=attempt_no, ray_status="SUBMITTED")
            self.db.set_task_state(task_id=task_id, state="SUBMITTED")
            if submitted != ray_sid:
                self.db.set_task_state(task_id=task_id, state="SUBMITTED", event_type="WARN_SUBMISSION_ID_MISMATCH")
        except Exception as e:
            msg = repr(e)
            self.db.update_attempt(
                task_id=task_id,
                attempt_no=attempt_no,
                ray_status="FAILED",
                failure_kind="UNKNOWN",
                message=msg,
                end_time=_utc_now_iso(),
            )
            self.db.set_task_state(task_id=task_id, state="FAILED", error_summary=msg)

    def _sync_one_running(self, task_row: dict[str, Any]) -> None:
        task_id = str(task_row["task_id"])
        latest_attempt_no = int(task_row.get("latest_attempt_no", 0))
        if latest_attempt_no <= 0:
            return

        # Look up the ray_submission_id of the latest attempt.
        attempts = self.db.list_attempts(task_id)
        if not attempts:
            return
        ray_sid = str(attempts[-1]["ray_submission_id"])

        try:
            st = self.tool.status(ray_sid)
        except Exception as e:
            # Keep the current state; transient status failures should not flap tasks.
            self.db.set_task_state(task_id=task_id, state=str(task_row["state"]), event_type="RAY_STATUS_ERROR", payload_json=repr(e))
            return

        st_s = str(st)
        if st_s in ("PENDING", "RUNNING"):
            self.db.update_attempt(task_id=task_id, attempt_no=latest_attempt_no, ray_status=st_s)
            self.db.set_task_state(task_id=task_id, state=("RUNNING" if st_s == "RUNNING" else "SUBMITTED"))
            return

        if st_s in ("SUCCEEDED",):
            self.db.update_attempt(task_id=task_id, attempt_no=latest_attempt_no, ray_status=st_s, end_time=_utc_now_iso())
            self.db.set_task_state(task_id=task_id, state="SUCCEEDED")
            return

        if st_s in ("FAILED", "STOPPED"):
            logs = ""
            try:
                logs = self.tool.logs(ray_sid)
            except Exception:
                logs = ""
            failure_kind = "UNKNOWN"
            msg = ""
            m = _INSUFFICIENT_RE.search(logs)
            if m:
                failure_kind = "INSUFFICIENT_RESOURCES"
                msg = "Insufficient resources (verl fail-fast): " + m.group(0)
            self.db.update_attempt(
                task_id=task_id,
                attempt_no=latest_attempt_no,
                ray_status=st_s,
                failure_kind=failure_kind,
                message=msg,
                end_time=_utc_now_iso(),
            )
            self.db.set_task_state(
                task_id=task_id,
                state="PENDING_RESOURCES",
                error_summary=msg,
                next_run_at=_utc_after_s(self.v2_cfg.scheduler.retry_interval_s),
                event_type="RETRY_SCHEDULED",
            )
            return

        # Any other terminal/unexpected status is treated as a plain failure.
        msg = f"Ray job {st_s}"
        self.db.update_attempt(
            task_id=task_id,
            attempt_no=latest_attempt_no,
            ray_status=st_s,
            failure_kind="UNKNOWN",
            message=msg,
            end_time=_utc_now_iso(),
        )
        self.db.set_task_state(task_id=task_id, state="FAILED", error_summary=msg)

    def tick(self) -> None:
        ensure_ray_connected()

        # Sync active tasks.
        for row in self.db.list_active_tasks(limit=50):
            self._sync_one_running(row)

        # Submit new tasks if capacity allows.
        if self.db.count_running() >= self.v2_cfg.scheduler.max_running_tasks:
            return

        row = self.db.pick_next_runnable_task()
        if not row:
            return

        nnodes = int(row["nnodes"])
        n_gpus_per_node = int(row["n_gpus_per_node"])

        if not self._resources_sufficient(nnodes=nnodes, n_gpus_per_node=n_gpus_per_node):
            self.db.set_task_state(
                task_id=str(row["task_id"]),
                state="PENDING_RESOURCES",
                next_run_at=_utc_after_s(self.v2_cfg.scheduler.retry_interval_s),
                event_type="PENDING_RESOURCES",
            )
            return

        self._submit_one(row)

    def run_forever(self, stop_flag: Any) -> None:
        while not stop_flag.is_set():
            try:
                self.tick()
            except Exception:
                # Best-effort: never crash the scheduler loop on a single bad tick.
                pass
            time.sleep(max(1, int(self.v2_cfg.scheduler.tick_s)))

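For reference, a minimal sketch of driving `Scheduler.run_forever` from the API process with a `threading.Event` as the stop flag; how `Db`, `RayConfig`, and `V2Config` are actually constructed is left to `mvp_v2.app.create_app`, so the arguments here are placeholders:

```python
# Sketch only: run the scheduler loop in a daemon thread next to the API.
# The Db / ray_cfg / v2_cfg arguments are whatever create_app builds; they
# are not constructed here.
import threading

from mvp_v2.db import Db
from mvp_v2.scheduler import Scheduler


def start_scheduler(db: Db, ray_cfg, v2_cfg) -> tuple[threading.Thread, threading.Event]:
    stop_flag = threading.Event()  # run_forever polls stop_flag.is_set()
    sched = Scheduler(db=db, ray_cfg=ray_cfg, v2_cfg=v2_cfg)
    t = threading.Thread(target=sched.run_forever, args=(stop_flag,), daemon=True)
    t.start()
    return t, stop_flag


# On shutdown: stop_flag.set(); thread.join(timeout=...)
```
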
4
src/mvp/v2.0/py/requirements.txt
Normal file
@@ -0,0 +1,4 @@
fastapi==0.115.6
uvicorn==0.30.6
PyYAML==6.0.2

33
src/mvp/v2.0/py/server.py
Normal file
@@ -0,0 +1,33 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse

import uvicorn

from mvp_v2.app import create_app
from mvp_v2.config import V2Config


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", required=True, help="Path to v1.1 RayConfig YAML (extended with v2:)")
    args = parser.parse_args()

    # Load the config and read v2.api host/port from it.
    import yaml

    with open(args.config, "r", encoding="utf-8") as f:
        root = yaml.safe_load(f) or {}
    if not isinstance(root, dict):
        raise SystemExit("config yaml must be a mapping")
    v2 = V2Config.from_root_dict(root)

    app = create_app(args.config)
    uvicorn.run(app, host=v2.api.host, port=v2.api.port, log_level="info")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

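For reference, a minimal client sketch that submits a v1.1-style JobSpec YAML to the running service and polls the task, using only the standard library; the host/port and token values are placeholders for whatever the `v2.api` config and `MVP_INTERNAL_TOKEN` actually hold:

```python
# Sketch only: submit a JobSpec YAML to the v2 API and fetch the task state.
# BASE (127.0.0.1:8080) and the jobspec path are placeholders, not values from this commit.
import json
import os
import urllib.request

BASE = "http://127.0.0.1:8080"
TOKEN = os.environ["MVP_INTERNAL_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/yaml"}

with open("jobspec.yaml", "rb") as f:
    body = f.read()

# POST the raw YAML; the service responds with {"task_id": ..., "state": "QUEUED"}.
req = urllib.request.Request(f"{BASE}/api/v2/tasks", data=body, headers=HEADERS, method="POST")
with urllib.request.urlopen(req) as resp:
    task = json.loads(resp.read())
print(task)

# Poll the aggregated task state.
req = urllib.request.Request(f"{BASE}/api/v2/tasks/{task['task_id']}", headers=HEADERS)
with urllib.request.urlopen(req) as resp:
    print(json.loads(resp.read()))
```
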
10
src/mvp/v2.0/scripts/12_install_v2_deps.sh
Executable file
@@ -0,0 +1,10 @@
#!/usr/bin/env bash
set -euo pipefail

# Install v2.0 API dependencies inside the head container (best-effort).
# Assumes the v1.1 containers are already up and the v2.0 code is mounted/available.

HEAD_CONTAINER="${HEAD_CONTAINER:-mvp11-ray-head}"

docker exec -i "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -U pip >/dev/null 2>&1 || true"
docker exec -i "${HEAD_CONTAINER}" bash -lc "python3 -m pip install -r /workspace/mvp/v2/py/requirements.txt"

28
src/mvp/v2.0/scripts/20_start_api.sh
Executable file
@@ -0,0 +1,28 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"

CONFIG_IN_CONTAINER="${CONFIG_IN_CONTAINER:-/workspace/mvp/v1.1/py/configs/dev.yaml}"
LOG_PATH="${LOG_PATH:-/private/common/logs/mvp_v2_api.log}"
PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}"

echo "[host] starting mvp v2 api in head container: ${HEAD_CONTAINER}"

dexec bash -lc "mkdir -p \"$(dirname "${LOG_PATH}")\" \"$(dirname "${PID_PATH}")\""

# Note: requires /workspace/mvp/v2/py to be present in the container (mounted or copied).
# Escape $ so that command substitution happens in the container, not on the host;
# capture the result on the host so an existing instance actually short-circuits the start.
already="$(dexec bash -lc "if test -f '${PID_PATH}'; then pid=\$(cat '${PID_PATH}'); if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo 'already_running'; fi; fi")"
if [[ "${already}" == "already_running" ]]; then
  echo "[host] mvp v2 api already running (pid file: ${PID_PATH}); nothing to do"
  exit 0
fi

if [[ -z "${MVP_INTERNAL_TOKEN:-}" ]]; then
  echo "ERROR: MVP_INTERNAL_TOKEN env var must be set on host (will be passed into container)" >&2
  exit 1
fi

docker exec -d -e MVP_INTERNAL_TOKEN="${MVP_INTERNAL_TOKEN}" "${HEAD_CONTAINER}" bash -lc "nohup python3 /workspace/mvp/v2/py/server.py --config '${CONFIG_IN_CONTAINER}' >>'${LOG_PATH}' 2>&1 & echo \$! >'${PID_PATH}'"

echo "[host] started; pid stored in ${PID_PATH} (container path)"
echo "[host] logs: ${LOG_PATH} (container path)"

12
src/mvp/v2.0/scripts/21_stop_api.sh
Executable file
@@ -0,0 +1,12 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"

PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}"

echo "[host] stopping mvp v2 api (pid file: ${PID_PATH})"

dexec bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then kill \"\${pid}\"; fi; rm -f '${PID_PATH}'; echo 'stopped'"

10
src/mvp/v2.0/scripts/22_status_api.sh
Executable file
@@ -0,0 +1,10 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"

PID_PATH="${PID_PATH:-/private/common/run/mvp_v2_api.pid}"

dexec bash -lc "if ! test -f '${PID_PATH}'; then echo 'not_running'; exit 0; fi; pid=\"\$(cat '${PID_PATH}')\"; if kill -0 \"\${pid}\" >/dev/null 2>&1; then echo \"running pid=\${pid}\"; else echo \"stale pid=\${pid}\"; fi"

12
src/mvp/v2.0/scripts/lib.sh
Executable file
@@ -0,0 +1,12 @@
#!/usr/bin/env bash
set -euo pipefail

# v2.0 scripts are intended to run on the host and control the existing Ray containers
# (same topology as v1.1). Adjust container names via env vars if needed.

HEAD_CONTAINER="${HEAD_CONTAINER:-mvp11-ray-head}"

dexec() {
  docker exec -i "${HEAD_CONTAINER}" "$@"
}