V3.5: support Advanced mode where the user provides the python command

yuyr 2026-01-04 15:28:41 +08:00
parent e3dcfe526f
commit 5d0d849c28
22 changed files with 1355 additions and 34 deletions

specs/mvp/v3.5/README.md Normal file

@ -0,0 +1,7 @@
# MVP v3.5
This directory contains the v3.5 requirements and design (trimmed):
- `requirement.md`: supplementary requirements (from discussion)
- `roadmap_v3.5.png`: architecture sketch (Advanced Task + Resume + IB + Serving)
- `v3.5_design.md`: detailed design (based on v3.0; this iteration focuses only on Advanced TaskSpec + Custom Reward; Serving/IB/Resume/multi-version verl are deferred)

specs/mvp/v3.5/note.md Normal file

@ -0,0 +1,3 @@
1. node management (interface skeleton introduced in v3.5: manage head/worker node lifecycle via SSH/platform capabilities; start with a minimal viable version) --- what is this for?
2.

specs/mvp/v3.5/requirement.md Normal file

@ -0,0 +1,40 @@
v3.5 extends v3.0 with the following capabilities:
1. Support custom commands that bypass the fixed TaskSpec templates: the user directly provides the python command that invokes verl, as below. This gives far more flexibility, but the user must manage file paths themselves: the service layer replaces `$HOME` with the user's own `/private/users/<user>/` path, and `$COMMON` with `/private/`.
```bash
PYTHONUNBUFFERED=1 python3 -m verl.trainer.main_ppo \
data.train_files=$HOME/data/gsm8k/train.parquet \
data.val_files=$HOME/data/gsm8k/test.parquet \
data.train_batch_size=256 \
data.max_prompt_length=512 \
data.max_response_length=512 \
actor_rollout_ref.model.path=Qwen/Qwen2.5-0.5B-Instruct \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.actor.ppo_mini_batch_size=64 \
actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
actor_rollout_ref.rollout.name=vllm \
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \
actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
actor_rollout_ref.rollout.gpu_memory_utilization=0.4 \
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4 \
critic.optim.lr=1e-5 \
critic.model.path=Qwen/Qwen2.5-0.5B-Instruct \
critic.ppo_micro_batch_size_per_gpu=4 \
algorithm.kl_ctrl.kl_coef=0.001 \
trainer.logger=console \
trainer.val_before_train=False \
trainer.n_gpus_per_node=1 \
trainer.nnodes=1 \
trainer.save_freq=10 \
trainer.test_freq=10 \
trainer.total_epochs=15
```
2. Support custom reward function methods; refer to the examples in the verl project [text](../../../verl) and design a solution.
3. Support `code_path` pointing at a verl code version the user uploaded under their own user path.
4. Resume from checkpoint: allow a task that has completed (succeeded), failed, or been stopped to continue training from the last saved checkpoint with unchanged parameters. Confirm whether this corresponds to a new ray job, or analyze whether verl already has similar support.
5. Support training over NCCL using RoCEv2 and InfiniBand networks; investigate how verl supports this and which configuration is required.

specs/mvp/v3.5/roadmap_v3.5.png: binary file not shown (98 KiB).

specs/mvp/v3.5/v3.5_design.md Normal file

@ -0,0 +1,366 @@
# MVP v3.5 Detailed Design (further trimmed, based on v3.0)
> Background: v3.0 already provides WebUI + API server + user/task isolation + SFTPGo data management + a stateless Ray cluster (head + worker node pool).
>
> v3.5 does **only 2 things** this round:
> 1) Advanced Task: users submit a custom training command (`command`);
> 2) Custom Reward: users inject a reward through VERL's native `custom_reward_function.*` mechanism (approach A only: the user writes the command).
>
> Explicitly out of scope (removed from the previous design): (3) custom verl versions/code paths, (4) resume from checkpoint, (5) IB/RoCEv2 networking, (6) Model Serving.
---
## 0. Invariants inherited from v3.0 (important constraints)
1) **Node management unchanged**
- v3.5 neither adds to nor modifies the node management machinery; it keeps running as in v3.0 (head writes discovery, worker watchdogs auto-join, self-healing).
2) **The head runs no training**
- Every training/serving driver is forced onto a worker via Ray entrypoint placement (e.g. `entrypoint_resources={"worker_node": 1}`).
3) **Changed "common" directory convention in SFTPGo**
- The `$COMMON` macro is dropped.
- In SFTPGo, shared read-only resources are mapped into fixed directories under the user's home (in SFTP/WebClient the user sees `$HOME/common/...`):
- `$HOME/common/datasets` → real in-container path `/private/datasets` (read-only)
- `$HOME/common/hf` → real in-container path `/private/hf` (read-only)
> Here `$HOME` means `/private/users/<user_id>` (the in-container path).
---
## 1. v3.5 scope (trimmed)
### 1.1 In scope
**A. Advanced TaskSpec (custom command)**
- The user submits a `command` (multi-line shell or a single line).
- The platform performs `$HOME` macro replacement.
- The platform runs best-effort safety checks (paths/key parameters), then submits the task as a Ray job.
**B. Custom Reward (approach A only)**
- The user writes the hydra overrides explicitly in the `command`:
- `custom_reward_function.path=...`
- `custom_reward_function.name=...`
- `custom_reward_function.reward_kwargs.*=...` (optional)
- The platform adds no structured reward fields (no approach B); it only validates that the path is legal.
### 1.2 Out of scope (this round)
- Custom verl versions/code paths (the platform's built-in/common verl code snapshot stays in use)
- Resume from checkpoint
- Dedicated IB/RoCEv2 network support (no NCCL/RDMA env vars introduced into the platform yet)
- Model Serving (deferred to a separate design iteration)
---
## 2. Advanced TaskSpec design
### 2.1 Why an Advanced Task?
v3.0's Basic TaskSpec (ppo/grpo/sft) generates fixed overrides from platform templates, which suits "getting something running fast".
But research/tuning scenarios need more freedom: users want to write `python3 -m verl.trainer.main_ppo ...` directly and control every override themselves.
### 2.2 Advanced TaskSpec (proposed schema)
Add a new TaskSpec type, distinguished by `kind: advanced`:
```yaml
kind: advanced
# Resources (used by platform scheduling and pre-checks; still required)
nnodes: 2
n_gpus_per_node: 4
# Custom command (the user is responsible for correct VERL arguments/paths)
# The platform macro-replaces $HOME; everything else is passed through unchanged
command: |
PYTHONUNBUFFERED=1 python3 -m verl.trainer.main_ppo \
data.train_files=$HOME/datasets/gsm8k/train.parquet \
data.val_files=$HOME/datasets/gsm8k/test.parquet \
actor_rollout_ref.model.path=Qwen/Qwen2.5-0.5B-Instruct \
trainer.nnodes=2 \
trainer.n_gpus_per_node=4 \
trainer.total_epochs=1 \
trainer.save_freq=10 \
+ray_kwargs.ray_init.address=auto
```
### 2.3 `$HOME` macro replacement rules
Only `$HOME` is supported (v3.5 drops `$COMMON`):
- `$HOME` → `/private/users/<user_id>`
To use shared data/caches, the user writes:
- shared data: `$HOME/common/datasets/...`
- shared HF cache: `$HOME/common/hf/...` (usually not needed in the command, but handy for debugging)
#### 2.3.1 Important: SFTPGo "virtual folders" vs. the "real paths" the training process sees
In SFTPGo, `$HOME/common/datasets` / `$HOME/common/hf` are **SFTP virtual folder mappings** onto real in-container paths:
- `$HOME/common/datasets` → `/private/datasets`
- `$HOME/common/hf` → `/private/hf`
The training process (a python process on a Ray worker) sees the **real in-container filesystem** and knows nothing about SFTPGo virtual folders.
So, to let users keep the path semantics they see in the WebClient (writing `$HOME/common/...`), the service layer must perform **path macro mapping** before submitting an Advanced command:
- `"$HOME/common/datasets"` → `"/private/datasets"`
- `"$HOME/common/hf"` → `"/private/hf"`
- remaining `"$HOME"` → `"/private/users/<user_id>"`
With this, the command as written by the user reads the right files inside the training process.
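A minimal sketch of this mapping, assuming plain string replacement suffices (`expand_home_macros` is an illustrative name; note the `common` prefixes must be rewritten before the bare `$HOME`, otherwise they would first expand into the user home):
```python
def expand_home_macros(user_id: str, command: str) -> str:
    """Rewrite user-facing $HOME macros into real in-container paths.

    Ordering matters: the $HOME/common/* virtual-folder prefixes must be
    handled before the generic $HOME replacement.
    """
    command = command.replace("$HOME/common/datasets", "/private/datasets")
    command = command.replace("$HOME/common/hf", "/private/hf")
    return command.replace("$HOME", f"/private/users/{user_id}")

# expand_home_macros("alice", "data.train_files=$HOME/common/datasets/gsm8k/train.parquet")
# -> "data.train_files=/private/datasets/gsm8k/train.parquet"
```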
### 2.4 Service-layer checks (best-effort: strong + weak constraints)
> Goal: without parsing full shell syntax, avoid cross-user file reads and obviously broken tasks as far as practical.
**Strong constraints (must pass, otherwise 400):**
1) `nnodes` and `n_gpus_per_node` must be present (used for queueing/resource pre-checks/placement).
2) `command` must contain an explicit python entry:
- suggested minimum: contains `python3` and `-m verl.trainer.` (prevents running arbitrary system commands).
3) Path isolation checks (string/regex level):
- after expanding `$HOME` (including the `$HOME/common/*` → `/private/*` mapping):
- forbid paths under `/private/users/` that belong to another user (e.g. `/private/users/bob/...`);
- check `data.train_files=...` / `data.val_files=...` (if present) against an allowlist:
- allowed (user directory): `/private/users/<me>/datasets/...`
- allowed (shared directory): `/private/datasets/...`
- check `custom_reward_function.path=...` (if present) against an allowlist:
- allowed: `/private/users/<me>/code/...` (uploaded by the user)
**Weak constraints (warning only, non-blocking):**
- no `data.train_files=`/`data.val_files=` detected (the user may have used other keys or a config file);
- no `+ray_kwargs.ray_init.address=auto` detected (recommended in v3.0/v3.5, but the user may handle it themselves).
> Note: the Advanced command is effectively an "internal trusted user" capability; v3.5 adds no hard sandbox, and the checks stay best-effort.
---
## 3. Custom Reward (approach A only: the user writes it)
### 3.1 VERL's native mechanism (surveyed in this repo's `verl/`)
The VERL PPO trainer config supports:
- `custom_reward_function.path`
- `custom_reward_function.name`
- `custom_reward_function.reward_kwargs`
Implemented at:
- config template: `verl/verl/trainer/config/ppo_trainer.yaml`
- loading logic: `verl/verl/trainer/ppo/reward.py:get_custom_reward_fn`
- a typical reward manager: `verl/verl/workers/reward_manager/naive.py`, which calls `compute_score(...)`
### 3.2 What the user writes (example)
The user uploads `$HOME/code/reward.py` and adds to the command:
```bash
custom_reward_function.path=$HOME/code/reward.py \
custom_reward_function.name=compute_score
```
Suggested function signature (aligned with the `naive` reward manager's arguments):
```python
def compute_score(*, data_source: str, solution_str: str, ground_truth: str, extra_info=None, **kwargs):
...
```
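A hypothetical minimal implementation of this signature (illustration only; the scoring logic is entirely up to the user) that rewards an exact match against the ground truth:
```python
def compute_score(*, data_source: str, solution_str: str, ground_truth: str,
                  extra_info=None, **kwargs) -> float:
    """Toy reward: 1.0 if the last line of the model output equals the ground truth."""
    lines = solution_str.strip().splitlines()
    answer = lines[-1].strip() if lines else ""
    return 1.0 if answer == ground_truth.strip() else 0.0
```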
### 3.3 The platform only validates (no field extensions)
v3.5 restricts reward injection to "the user writes the command"; the platform only:
- expands `$HOME`;
- if `custom_reward_function.path=` is detected, validates that the path is under `$HOME/code/`;
- never parses/merges reward_kwargs (the user writes them).
---
## 4. Service-layer and SFTPGo mapping changes (the key point you raised)
In the v3.0 era the platform let users reference:
- `/private/common/datasets/...`
- `/private/common/hf/...`
Now that common is presented to users as an **SFTPGo virtual folder** (the user sees `$HOME/common/...`; the real path is `/private/...`), the v3.5 service layer must handle two things:
1) **User-facing semantics (when writing a TaskSpec/command)**
- shared datasets (read-only): `$HOME/common/datasets/...`
- shared hf cache (read-only): `$HOME/common/hf/...`
2) **Real runtime paths (expanded before submitting to Ray)**
- `$HOME/common/datasets/...` → `/private/datasets/...`
- `$HOME/common/hf/...` → `/private/hf/...`
User-owned directories are kept as they are:
- user datasets: `$HOME/datasets/...`
- user models: `$HOME/models/...`
- user code (reward): `$HOME/code/...`
> This mainly affects:
> - Advanced command checks (allowlist)
> - WebUI/Data page copy (telling users where shared data lives)
> Compatibility suggestion: to avoid breaking users/historical tasks that grew used to `/private/common/datasets/...` during v3.0,
> the v3.5 implementation should **accept both**:
> - `/private/common/datasets/...` (old path semantics, still readable)
> - `/private/datasets/...` (real path semantics, recommended)
> - `$HOME/common/datasets/...` written in an Advanced command is first mapped to `/private/datasets/...`.
---
## 5. Acceptance criteria (trimmed)
### 5.1 Advanced command
- Submit an Advanced PPO command (train/val using `$HOME/common/datasets/...` or `$HOME/datasets/...`).
- Verify:
- the task goes QUEUED → SUBMITTED/RUNNING;
- the driver runs on a worker (the head runs no training);
- training proceeds for at least a few steps.
### 5.2 Custom reward (approach A)
- The user uploads `$HOME/code/reward.py`.
- The command sets `custom_reward_function.path=$HOME/code/reward.py`.
- Verify the training log contains `using customized reward function ...`.
---
## 6. Open questions (need your decision/input)
1) Should the Advanced command "strong constraints" be stricter?
- current suggestion: require `python3 -m verl.trainer.`, otherwise reject.
- do you want to allow non-verl commands (e.g. custom evaluation scripts)?
2) Do the `$HOME/common/datasets` and `$HOME/common/hf` mapped directories need enforced read-only semantics on the platform side?
- e.g. TaskSpec validation allowing reads but forbidding writes (the current design is best-effort string-level checking).
---
## 7. Change analysis against the current source (implementation checklist)
This section lists, per file in the deployed v3.0 source tree (`src/mvp/py/argus/...`), the concrete changes v3.5 needs, and assesses the impact on existing behaviour.
### 7.1 TaskSpec/model layer (parsing & compatibility)
**Current state**
- The Basic TaskSpec is parsed by `argus.ray.models.JobSpec.from_dict()`: `src/mvp/py/argus/ray/models.py`
- The API `/api/v2/tasks` calls `JobSpec.from_dict(obj)` directly and validates paths from its fields: `src/mvp/py/argus/service/app.py`
- The scheduler likewise assumes jobspec_yaml parses into a `JobSpec`: `src/mvp/py/argus/service/scheduler.py`
**New in v3.5**
1) Add an `AdvancedTaskSpec` data structure (suggested location: `src/mvp/py/argus/ray/models.py`):
- required: `kind: advanced`, `workload` (suggest still ppo/grpo/sft, for task_id naming and UI classification), `nnodes`, `n_gpus_per_node`, `command`;
- optional: `submission_id` (overridden by the service layer).
2) Add a "union parser":
- new `parse_taskspec(obj: dict) -> Basic(JobSpec) | Advanced(AdvancedTaskSpec)`
- compatibility rule: without a `kind` field, **parse as a v3.0 Basic JobSpec by default** (existing clients stay unaffected).
### 7.2 Builder layer (turning a TaskSpec into executable argv)
**Current state**
- `src/mvp/py/argus/ray/builders.py:build_training_argv(spec: JobSpec, ...)` supports only templated PPO/GRPO/SFT.
**New in v3.5**
1) Add `build_advanced_argv(command: str) -> list[str]`:
- suggested implementation: return `["bash", "-lc", "<expanded_command>"]`;
- rationale: user commands may contain `ENV=... python3 ... \` line continuations and other shell syntax; `bash -lc` gives the best compatibility.
2) Reuse the driver entrypoint:
- still execute through `argus.ray.driver_entrypoint` (uniform job_dir, logging and exit codes).
### 7.3 RayJobTool layer (runtime_env & submission)
**Current state**
- `src/mvp/py/argus/ray/ray_job_tool.py:RayJobTool.submit(spec: JobSpec, ...)`
- the runtime_env `PYTHONPATH` is derived from `spec.code_path`;
- the entrypoint is fixed: driver_entrypoint + builder-generated argv.
**New in v3.5**
1) Extend submission to support AdvancedTaskSpec:
- option A (least invasive): a new `submit_advanced(...)` method taking `command` + `job_dir` + `submission_id` + `nnodes/n_gpus...`;
- option B (unified interface): an internal `SubmitPlan` abstraction (holding `runtime_env` + `entrypoint` + `artifacts`); Basic/Advanced both produce a plan, then share one submit path (see the sketch after this list).
2) runtime_env code path:
- since v3.5 does not do "custom verl code_path" this round, keep using the common snapshot (e.g. `/private/common/code/verl/verl_repo`);
- to avoid scattered constants, add `ray.verl_code_path` (or `service.verl_code_path`) to the config and have RayJobTool read it.
3) runtime_env user code directory (optional enhancement):
- VERL imports the custom reward function dynamically by file path via `custom_reward_function.path`, so it does not strictly depend on `PYTHONPATH`;
- but the user's `reward.py` may `import` other modules from their own directory; for usability, append
`/private/users/<user>/code` to the job's `PYTHONPATH`;
- this requires RayJobTool.submit/submit_advanced to know the `user_id` (passed in by the scheduler); a small change, but mind compatibility.
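A hypothetical sketch of option B's `SubmitPlan` (not what this commit implements; option A was chosen):
```python
from dataclasses import dataclass, field
from typing import Any

@dataclass(frozen=True)
class SubmitPlan:
    """Everything RayJobTool needs to submit one job, whatever the spec kind."""
    entrypoint: str                      # driver_entrypoint + argv, already shell-quoted
    runtime_env: dict[str, Any]          # env_vars, including the PYTHONPATH prefix
    artifacts: dict[str, str] = field(default_factory=dict)  # job_dir-relative path -> file content
```
Basic and Advanced specs would each build a `SubmitPlan`, and a single submit routine would write the artifacts and call `JobSubmissionClient.submit_job`.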
### 7.4 API server (submission validation, macro replacement, spec display)
**Current state**
- `POST /api/v2/tasks`: supports only the Basic JobSpec, with strict prefix checks on `code_path/train_file/val_file/model_id`: `src/mvp/py/argus/service/app.py`
- `/api/v2/tasks/{task_id}/spec`: returns the resolved Basic JobSpec (defaults and submission_id filled in): `src/mvp/py/argus/service/app.py`
**New/changed in v3.5**
1) `POST /api/v2/tasks` branches:
- `kind != advanced`: original Basic flow (v3.0 compatible);
- `kind == advanced`: Advanced parsing + validation.
2) Advanced command macro replacement and mapping (core):
- implement `expand_command(user_id, command)`:
- first `$HOME/common/datasets` → `/private/datasets`;
- then `$HOME/common/hf` → `/private/hf`;
- then the remaining `$HOME` → `/private/users/<user>`;
- validation runs against the expanded command.
3) Reward injection check (approach A only):
- if `custom_reward_function.path=...` is found:
- the expanded path must start with `/private/users/<me>/code/`.
4) `/api/v2/tasks/{task_id}/spec`:
- must also return a resolved AdvancedTaskSpec:
- the display can show the raw command (`$HOME`) or the expanded one (showing both, raw + expanded, is recommended).
### 7.5 Scheduler (queueing & submission)
**Current state**
- `src/mvp/py/argus/service/scheduler.py` assumes jobspec_yaml is always a Basic JobSpec and calls `tool.submit(spec2, ...)`.
**New in v3.5**
1) Replace the scheduler's `_parse_jobspec` with `parse_taskspec` (supporting Basic/Advanced).
2) `_submit_one` dispatches on the spec type:
- Basic: unchanged, `tool.submit(JobSpec, ...)`;
- Advanced: call `tool.submit_advanced(...)` (or the unified SubmitPlan).
### 7.6 WebUI (minimal changes)
**Current state**
- The New Task page in `src/mvp/py/argus/service/ui.py` only offers Basic YAML templates.
**New in v3.5**
- Add an "Advanced Task" template button:
- `kind: advanced`
- `workload: ppo|grpo|sft` (for UI classification and the task_id)
- `nnodes/n_gpus_per_node`
- `command: | ...` (with inline comments)
- Data page copy updates:
- state clearly that the shared directories are `$HOME/common/datasets` and `$HOME/common/hf` (and explain that they map to `/private/datasets` and `/private/hf`).
---
## 8. Compatibility impact on existing features
### 8.1 API/TaskSpec compatibility
- Rule: **YAML without a `kind` field is always parsed as a v3.0 Basic JobSpec**.
- Existing scripts/clients (submitting ppo/grpo/sft YAML) need no changes.
- AdvancedTaskSpec is an additive capability and does not touch the existing task state machine/DB.
### 8.2 Impact of the path policy change
Risk: v3.0 Basic tasks/templates make heavy use of `/private/common/datasets/...`.
Suggestion:
- keep "dual-stack" compatibility during the v3.5 implementation:
- Basic keeps accepting `/private/common/datasets/...` (old);
- also accept `/private/datasets/...` (new/real path);
- Advanced commands may use `$HOME/common/datasets/...`, which the service layer expands to `/private/datasets/...` (avoiding the invisible-virtual-folder problem).
### 8.3 Execution/scheduling compatibility
- Scheduler queueing/concurrency control (`max_running_tasks`) stays unchanged.
- Resource pre-checks still depend only on `nnodes/n_gpus_per_node`; AdvancedTaskSpec does not change the resource model.
### 8.4 Security boundary changes
- With Advanced commands the platform moves from "structured parameters" to "executing user commands", widening the security boundary.
- Mitigations (best-effort):
- strong constraint: the command must contain `python3 -m verl.trainer.`;
- basic path isolation checks (no cross-user paths);
- reward file paths restricted to `$HOME/code`.
### 8.5 Database compatibility
- No forced DB schema change: `tasks.jobspec_yaml` keeps storing the raw YAML.
- If stronger querying/filtering is needed later, consider adding a `tasks.kind` column (optional incremental migration).


@ -0,0 +1,200 @@
# MVP v3.5精简版开发计划TDD
> 目标:在 v3.0 已有能力基础上,仅新增两项能力:
> 1) **Advanced TaskSpec自定义 command**
> 2) **Custom Reward方式 A用户自己在 command 里写 `custom_reward_function.*`**
>
> 设计依据:`specs/mvp/v3.5/v3.5_design.md`(本计划不再扩展 scope
---
## 0. Scope and constraints
### 0.1 In scope
- New `kind: advanced` TaskSpec: the user provides a `command`; the platform does `$HOME` macro replacement and best-effort validation, then submits a Ray Job.
- Custom Reward: the platform only does **reward path validation** (approach A); no new structured fields.
- `$HOME/common/*` path semantics (key): paths the user sees in SFTPGo/WebClient must be readable by the training process.
### 0.2 Out of scope (this round)
- Custom verl versions/code paths (multi-version coexistence)
- Resume from checkpoint
- Dedicated IB/RoCEv2/NCCL support
- Model Serving
- Node management rework (v3.0's stateless head/worker/watchdog/supervisor machinery stays unchanged)
### 0.3 Key path mappings (must stay consistent)
> Note: SFTPGo's `$HOME/common/...` is a **virtual folder**; the training process cannot see that virtual path.
Before submitting an Advanced command, expand/map:
- `$HOME/common/datasets` → `/private/datasets` (read-only semantics)
- `$HOME/common/hf` → `/private/hf` (read-only semantics)
- remaining `$HOME` → `/private/users/<user_id>`
And for compatibility with historical (v3.0) usage:
- Basic TaskSpecs keep accepting `/private/common/datasets/...` and `/private/common/hf/...` (no forced migration).
---
## 1. Test strategy (TDD)
### 1.1 Unit test priorities
1) **Parsing & compatibility**: `kind: advanced` parses; YAML without `kind` still parses as Basic; old usage is not broken.
2) **Macro replacement correctness**: `$HOME` / `$HOME/common/*` expand strictly per the convention.
3) **Best-effort validation**: reject clearly dangerous/cross-user paths; allowlist the reward path.
4) **Submission path**: the scheduler recognizes Advanced specs and calls the matching submit method, keeping the submission_id/directory conventions unchanged.
5) **WebUI/API**: the New Task template and `/spec` show the full resolved spec (including the expanded command).
### 1.2 Running locally
- Reuse the existing `.venv` and run: `.venv/bin/python -m pytest`
- If the environment has no pip, use uv per the v3.0 convention (not repeated in this plan).
---
## 2. Milestones (each independently verifiable)
> Convention: for each milestone, write failing tests first, then implement until they pass; run `pytest` once the milestone is done.
### M1 — TaskSpec model & parsing (compatibility first)
**Goal**
- Introduce the AdvancedTaskSpec data structure and union parser while keeping v3.0 Basic behaviour unchanged.
**New/changed (suggested locations)**
- `src/mvp/py/argus/ray/models.py`
- add `AdvancedTaskSpec`
- add `parse_taskspec(obj: dict) -> JobSpec | AdvancedTaskSpec`
- compatibility rule: missing `kind` → `JobSpec.from_dict`
**Tests (written first)**
- `src/mvp/py/tests/test_models.py`
- `test_parse_taskspec_basic_no_kind_compat()`
- `test_parse_taskspec_advanced_smoke()`
- `test_parse_taskspec_advanced_requires_command_nnodes_gpus()`
**Acceptance**
- `pytest -q` passes; old tests unchanged or only minimally updated.
---
### M2 — Advanced command expansion & validation (core capability)
**Goal**
- Implement command expansion (including the `$HOME/common/*` mapping) and best-effort strong-constraint validation.
**Implementation (suggested new module)**
- `src/mvp/py/argus/service/command_expand.py` (or inside `argus/service/validation.py`):
- `expand_advanced_command(user_id: str, command: str) -> str`
- `validate_advanced_command(user_id: str, expanded_command: str) -> None` (raises `ValueError` on failure)
**Strong constraints (matching the design doc)**
- must contain `python3` and `-m verl.trainer.` (otherwise 400);
- must not reference `/private/users/<other>/...` (cross-user paths);
- if `data.train_files=`/`data.val_files=` are detected:
- only `/private/users/<me>/datasets/...` and `/private/datasets/...` are allowed;
- (compat) `/private/common/datasets/...` is also allowed (old path).
- if `custom_reward_function.path=` is detected:
- only `/private/users/<me>/code/...` is allowed (validated after expansion).
**Tests (written first)**
- new: `src/mvp/py/tests/test_advanced_command.py`
- `test_expand_maps_home_common_datasets_to_private_datasets()`
- `test_expand_maps_home_common_hf_to_private_hf()`
- `test_expand_maps_home_to_private_users()`
- `test_validate_rejects_cross_user_paths()`
- `test_validate_requires_verl_trainer_entry()`
- `test_validate_allows_reward_path_under_user_code()`
- `test_validate_rejects_reward_path_outside_user_code()`
**Acceptance**
- unit tests cover positive and negative cases for mapping/validation; error messages are readable (they become the API 400 detail).
---
### M3 — Ray submission path for Advanced (Builder/Tool/Scheduler)
**Goal**
- Advanced specs can enter the scheduler queue and be submitted as Ray jobs (the driver still lands on a worker).
**Code changes (suggested)**
- `src/mvp/py/argus/ray/builders.py`
- add `build_advanced_argv(command: str)`: returns `["bash","-lc", expanded_command]`
- `src/mvp/py/argus/ray/ray_job_tool.py`
- add `submit_advanced(...)` (or unify into an internal submit plan)
- runtime_env: keep injecting the common verl code path (custom verl code is unsupported this round)
- optional: add `/private/users/<user>/code` to `PYTHONPATH` for a smoother reward-code `import` experience
- `src/mvp/py/argus/service/scheduler.py`
- use `parse_taskspec` to branch Basic/Advanced
- Advanced calls `tool.submit_advanced(...)`
**Tests (written first)**
- `src/mvp/py/tests/test_builders.py`
- `test_build_advanced_argv_uses_bash_lc()`
- `src/mvp/py/tests/test_scheduler.py`
- add a `kind: advanced` task and assert the scheduler called `submit_advanced`
- assert the job_dir/submission_id conventions are unchanged (still `/private/users/<user>/jobs/<sid>`)
- `src/mvp/py/tests/test_ray_job_tool.py`
- assert the advanced submission entrypoint is driver_entrypoint + `bash -lc ...`
**Acceptance**
- unit tests pass; a scheduler tick takes an Advanced task from QUEUED → SUBMITTED (with a mocked Ray).
---
### M4 — API & WebUI (minimal feature loop)
**Goal**
- The WebUI/HTTP API can submit an Advanced Task, and the detail page shows the resolved spec (with the full command).
**API changes**
- `src/mvp/py/argus/service/app.py`
- `POST /api/v2/tasks`: support `kind: advanced`
- store the raw YAML (consistent with Basic)
- for Advanced: expand the command + validate (return 400 on failure)
- `GET /api/v2/tasks/{task_id}/spec`
- return the resolved spec (ideally raw + expanded together, or put the expanded command directly into the YAML)
**WebUI changes**
- `src/mvp/py/argus/service/ui.py`
- the New Task page gains an Advanced template (with inline comments)
- copy highlighting the shared directories: `$HOME/common/datasets` and `$HOME/common/hf`
**Tests (written first)**
- `src/mvp/py/tests/test_app.py`
- `test_create_task_advanced_ok()` (minimal valid command)
- `test_create_task_advanced_rejects_invalid_command()`
- `test_task_spec_endpoint_includes_expanded_command()`
- `src/mvp/py/tests/test_ui.py`
- assert the page contains the Advanced example block
**Acceptance**
- `pytest` passes; Advanced YAML can be submitted from the browser, and the expanded command is visible.
---
### M5 — End-to-end verification (remote argus@h1)
**Goal**
- Verify Advanced and Custom Reward (approach A) against a real Ray cluster + VERL environment.
**Steps (manual; scripting the acceptance is optional)**
1) Start the unified v3.0/v3.5 compose + API (reusing the existing `run_all` script family).
2) A user (e.g. `alice`) uploads reward code via SFTP to:
- `$HOME/code/reward.py` (real path `/private/users/alice/code/reward.py`)
3) Submit an Advanced task through the WebUI or curl (see the submission sketch after this list):
- the `command` contains:
- `custom_reward_function.path=$HOME/code/reward.py`
- `custom_reward_function.name=compute_score`
- `data.train_files=$HOME/common/datasets/gsm8k/train.parquet`
- `data.val_files=$HOME/common/datasets/gsm8k/test.parquet`
4) Check:
- the task state goes QUEUED → RUNNING → SUCCEEDED/FAILED (with logs);
- the driver does not run on the head (verify in the dashboard);
- the log shows the "custom reward" taking effect (confirm against VERL's actual log keywords).
5) Regression: Basic ppo/grpo/sft tasks still run (compatibility check).
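A minimal Python sketch of the step-3 submission (assumptions: the API base URL and the `ARGUS_TOKEN` env var are placeholders; the `POST /api/v2/tasks` endpoint with a Bearer token and a raw YAML body matches the API described above):
```python
import os
import urllib.request

API = os.environ.get("ARGUS_API", "http://127.0.0.1:8080")  # placeholder base URL

YAML_SPEC = """\
kind: advanced
nnodes: 1
n_gpus_per_node: 1
command: |
  python3 -m verl.trainer.main_ppo \\
    data.train_files=$HOME/common/datasets/gsm8k/train.parquet \\
    data.val_files=$HOME/common/datasets/gsm8k/test.parquet \\
    custom_reward_function.path=$HOME/code/reward.py \\
    custom_reward_function.name=compute_score \\
    +ray_kwargs.ray_init.address=auto
"""

req = urllib.request.Request(
    f"{API}/api/v2/tasks",
    data=YAML_SPEC.encode("utf-8"),
    headers={"Authorization": f"Bearer {os.environ['ARGUS_TOKEN']}"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(resp.read().decode("utf-8"))  # e.g. {"task_id": "...", "state": "QUEUED"}
```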
**Acceptance**
- The Advanced task runs at least a few steps, and the reward injection takes effect.
- Basic tasks keep working; no regression.
---
## 3. Risks and boundaries (state these explicitly in the PR/change notes)
- Advanced commands get best-effort validation only, with no full shell AST parsing; complex commands may be under- or over-rejected (extensible later).
- `$HOME/common/*` is user-facing semantics; the service layer must map it to real paths, otherwise training is guaranteed to hit FileNotFound.
- If non-VERL commands are to be allowed later, the strong-constraint policy needs rule changes plus tests (rejected by default this round).

src/mvp/py/argus/ray/builders.py

@ -97,3 +97,12 @@ def build_training_argv(spec: JobSpec, submission_id: str, job_dir: str) -> Buil
return BuiltCommand(argv=argv)
raise ValueError(f"unsupported workload: {spec.workload}")
def build_advanced_argv(command: str) -> BuiltCommand:
"""
Execute an arbitrary user-provided command under bash for maximum shell compatibility
(multiline, env vars, line continuations, etc.).
"""
return BuiltCommand(argv=["bash", "-lc", command])

src/mvp/py/argus/ray/driver_entrypoint.py

@ -35,6 +35,8 @@ def main() -> int:
job_dir = Path(args.job_dir)
job_dir.mkdir(parents=True, exist_ok=True)
os.environ.setdefault("MVP_JOB_DIR", str(job_dir))
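# Run the driver from inside job_dir so that relative output paths in user
# commands (e.g. trainer.default_local_dir=checkpoints, hydra.run.dir=logs/hydra)
# land under the job directory.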
os.chdir(job_dir)
_preflight()

src/mvp/py/argus/ray/models.py

@ -18,6 +18,7 @@ class RayConfig:
entrypoint_resources: dict[str, float]
runtime_env_env_vars: dict[str, str]
user_code_path: str
verl_code_path: str
@staticmethod
def from_dict(d: dict[str, Any]) -> "RayConfig":
@ -42,6 +43,7 @@ class RayConfig:
entrypoint_resources={str(k): float(v) for k, v in entrypoint_resources.items()},
runtime_env_env_vars={str(k): str(v) for k, v in env_vars.items()},
user_code_path=str(d.get("user_code_path", f"{_require(d, 'shared_root')}/user/code")),
verl_code_path=str(d.get("verl_code_path", f"{_require(d, 'shared_root')}/common/code/verl/verl_repo")),
)
def to_public_dict(self) -> dict[str, Any]:
@ -52,6 +54,7 @@ class RayConfig:
"entrypoint_resources": self.entrypoint_resources,
"runtime_env": {"env_vars": self.runtime_env_env_vars},
"user_code_path": self.user_code_path,
"verl_code_path": self.verl_code_path,
}
@ -126,3 +129,55 @@ class JobSpec:
if self.workload == "sft":
out["trainer_device"] = self.trainer_device or "cpu"
return out
@dataclass(frozen=True)
class AdvancedTaskSpec:
kind: str # advanced
workload: str # always "advanced" for classification/ID prefix
submission_id: str | None
nnodes: int
n_gpus_per_node: int
command: str
@staticmethod
def from_dict(d: dict[str, Any]) -> "AdvancedTaskSpec":
kind = str(_require(d, "kind"))
if kind != "advanced":
raise ValueError(f"unsupported taskspec kind: {kind}")
command = str(_require(d, "command"))
return AdvancedTaskSpec(
kind=kind,
workload="advanced",
submission_id=(str(d["submission_id"]) if d.get("submission_id") else None),
nnodes=int(_require(d, "nnodes")),
n_gpus_per_node=int(_require(d, "n_gpus_per_node")),
command=command,
)
def to_public_dict(self) -> dict[str, Any]:
return {
"kind": self.kind,
"workload": self.workload,
"submission_id": self.submission_id or "",
"nnodes": self.nnodes,
"n_gpus_per_node": self.n_gpus_per_node,
"command": self.command,
}
def parse_taskspec(d: dict[str, Any]) -> JobSpec | AdvancedTaskSpec:
"""
v3.x compatibility rule:
- If no "kind": treat as legacy/basic JobSpec.
- If kind == "advanced": parse AdvancedTaskSpec.
"""
kind = d.get("kind")
if kind in (None, ""):
return JobSpec.from_dict(d)
if kind == "advanced":
return AdvancedTaskSpec.from_dict(d)
raise ValueError(f"unsupported taskspec kind: {kind}")

src/mvp/py/argus/ray/ray_job_tool.py

@ -10,8 +10,8 @@ from typing import Any
import ray
from ray.job_submission import JobSubmissionClient
from .builders import build_training_argv
from .models import JobSpec, RayConfig
from .builders import build_advanced_argv, build_training_argv
from .models import AdvancedTaskSpec, JobSpec, RayConfig
from .yaml_io import dump_yaml
@ -45,6 +45,9 @@ class RayJobTool:
return f"{self.cfg.shared_root}/jobs/{submission_id}"
def _runtime_env(self, spec: JobSpec) -> dict[str, Any]:
return self._runtime_env_for(code_path=spec.code_path, workload=spec.workload)
def _runtime_env_for(self, *, code_path: str, workload: str, extra_pythonpath: list[str] | None = None) -> dict[str, Any]:
env_vars = dict(self.cfg.runtime_env_env_vars)
# Default HF cache
@ -57,18 +60,18 @@ class RayJobTool:
# Place it before verl code to avoid interfering with verl import priority.
tool_code_path = os.environ.get("MVP_TOOL_CODE_PATH", "/workspace/mvp/py")
user_code_path = self.cfg.user_code_path
code_path = spec.code_path
existing = env_vars.get("PYTHONPATH", "")
prefix = f"{tool_code_path}:{code_path}:{user_code_path}"
base_parts = [tool_code_path, code_path, self.cfg.user_code_path]
if extra_pythonpath:
base_parts.extend(extra_pythonpath)
prefix = ":".join(base_parts)
env_vars["PYTHONPATH"] = f"{prefix}:{existing}" if existing else prefix
# For debugging / log visibility
env_vars["MVP_CODE_PATH"] = code_path
# SFT: ensure ray.init() connects to the cluster
if spec.workload == "sft":
if workload == "sft":
env_vars.setdefault("RAY_ADDRESS", "auto")
return {"env_vars": env_vars}
@ -146,6 +149,91 @@ class RayJobTool:
return submitted
def submit_advanced(
self,
spec: AdvancedTaskSpec,
no_wait: bool,
job_dir: str | None = None,
user_id: str | None = None,
) -> str:
submission_id = spec.submission_id or f"mvp11_{spec.workload}_{_ts()}_{os.getpid()}"
job_dir = job_dir or self._job_dir(submission_id)
built = build_advanced_argv(spec.command)
entrypoint_argv = [
"python3",
"-m",
"argus.ray.driver_entrypoint",
"--job-dir",
job_dir,
*built.argv,
]
entrypoint = " ".join(shlex.quote(x) for x in entrypoint_argv)
extra_pythonpath: list[str] = []
if user_id:
extra_pythonpath.append(f"{self.cfg.shared_root}/users/{user_id}/code")
runtime_env = self._runtime_env_for(code_path=self.cfg.verl_code_path, workload=spec.workload, extra_pythonpath=extra_pythonpath)
# Prepare job artifacts directory
job_root = Path(job_dir)
_mkdir(job_root / "config")
_mkdir(job_root / "logs")
_mkdir(job_root / "debug")
_mkdir(job_root / "checkpoints")
_write_text(job_root / "config" / "ray_config.yaml", dump_yaml(self.cfg.to_public_dict()))
_write_text(job_root / "config" / "taskspec.yaml", dump_yaml(spec.to_public_dict()))
_write_json(
job_root / "config" / "submit_payload.json",
{
"submission_id": submission_id,
"address": self.cfg.address,
"entrypoint": entrypoint,
"entrypoint_num_cpus": self.cfg.entrypoint_num_cpus,
"entrypoint_resources": self.cfg.entrypoint_resources,
"runtime_env": runtime_env,
},
)
# Pre-submit debug snapshot (ray cluster resources via ray.init)
try:
ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False)
_write_json(job_root / "debug" / "ray_cluster_resources_pre.json", ray.cluster_resources())
_write_json(job_root / "debug" / "ray_available_resources_pre.json", ray.available_resources())
except Exception as e:
_write_text(job_root / "debug" / "ray_resources_pre.error.txt", repr(e) + "\n")
try:
submitted = self.client.submit_job(
entrypoint=entrypoint,
submission_id=submission_id,
runtime_env=runtime_env,
entrypoint_num_cpus=self.cfg.entrypoint_num_cpus,
entrypoint_resources=self.cfg.entrypoint_resources,
)
except Exception as e:
_write_text(job_root / "logs" / "submit.error.txt", repr(e) + "\n")
raise
_write_text(job_root / "config" / "ray_submission_id.txt", submitted + "\n")
# Post-submit debug snapshot via SDK
try:
jobs = self.client.list_jobs()
_write_text(
job_root / "debug" / "ray_job_list_post.json",
json.dumps([_job_details_to_dict(j) for j in jobs], indent=2) + "\n",
)
except Exception as e:
_write_text(job_root / "debug" / "ray_job_list_post.error.txt", repr(e) + "\n")
if not no_wait:
pass
return submitted
def status(self, submission_id: str) -> str:
return str(self.client.get_job_status(submission_id))

src/mvp/py/argus/service/advanced_command.py Normal file

@ -0,0 +1,75 @@
from __future__ import annotations
import re
_RE_PRIVATE_USER = re.compile(r"/private/users/([^/\s\"']+)")
_RE_TRAIN_FILES = re.compile(r"data\.train_files=([^\s\\]+)")
_RE_VAL_FILES = re.compile(r"data\.val_files=([^\s\\]+)")
_RE_REWARD_PATH = re.compile(r"custom_reward_function\.path=([^\s\\]+)")
def _strip_quotes(s: str) -> str:
s = s.strip()
if len(s) >= 2 and s[0] == s[-1] and s[0] in ("'", '"'):
return s[1:-1]
return s
def expand_advanced_command(*, user_id: str, command: str) -> str:
"""
Expand user-facing macros into real container paths.
Important: `$HOME/common/...` is a SFTPGo virtual folder; the training process
cannot see it. We map those paths to real shared mounts.
"""
home = f"/private/users/{user_id}"
expanded = command
expanded = expanded.replace("$HOME/common/datasets", "/private/datasets")
expanded = expanded.replace("$HOME/common/hf", "/private/hf")
expanded = expanded.replace("$HOME", home)
return expanded
def validate_advanced_command(*, user_id: str, expanded_command: str) -> None:
"""
Best-effort validation for advanced commands.
Strong constraints (raise ValueError on failure):
- Must include a python3 VERL trainer entrypoint
- Must not reference other users' /private/users/<other>/... paths
- If train/val files are present, must be under allowed roots
- If reward path is present, must be under /private/users/<me>/code/...
"""
if "python3" not in expanded_command or "-m verl.trainer." not in expanded_command:
raise ValueError("command must include 'python3' and '-m verl.trainer.'")
for other in _RE_PRIVATE_USER.findall(expanded_command):
if other != user_id:
raise ValueError(f"command references another user's path: /private/users/{other}/...")
allowed_dataset_prefixes = (
f"/private/users/{user_id}/datasets/",
"/private/datasets/",
# Backward compatible with v3.0 paths used by existing templates/users.
"/private/common/datasets/",
)
for m in _RE_TRAIN_FILES.finditer(expanded_command):
path = _strip_quotes(m.group(1))
if not path.startswith(allowed_dataset_prefixes):
raise ValueError(f"train_file must be under allowed roots, got: {path}")
for m in _RE_VAL_FILES.finditer(expanded_command):
path = _strip_quotes(m.group(1))
if not path.startswith(allowed_dataset_prefixes):
raise ValueError(f"val_file must be under allowed roots, got: {path}")
for m in _RE_REWARD_PATH.finditer(expanded_command):
path = _strip_quotes(m.group(1))
allow = f"/private/users/{user_id}/code/"
if not path.startswith(allow):
raise ValueError(f"reward path must be under {allow}, got: {path}")

src/mvp/py/argus/service/app.py

@ -9,8 +9,9 @@ import yaml
from fastapi import FastAPI, HTTPException, Request, Response
from argus.core.ids import new_task_id
from argus.ray.models import JobSpec, RayConfig
from argus.ray.models import AdvancedTaskSpec, JobSpec, RayConfig, parse_taskspec
from .advanced_command import expand_advanced_command, validate_advanced_command
from .config import V2Config
from .db import Db
from .janitor import JobsJanitor
@ -316,23 +317,52 @@ def create_app(config_path: str) -> FastAPI:
async def submit_task(req: Request) -> dict[str, Any]:
subject = _auth(req)
body = (await req.body()).decode("utf-8")
obj = yaml.safe_load(body) or {}
try:
obj = yaml.safe_load(body) or {}
except Exception as e:
# yaml.YAMLError and similar should be a user-facing 400, not a 500.
raise HTTPException(status_code=400, detail=f"invalid YAML: {e!r}")
if not isinstance(obj, dict):
raise HTTPException(status_code=400, detail="jobspec must be a YAML mapping")
try:
spec = JobSpec.from_dict(obj)
spec = parse_taskspec(obj)
except Exception as e:
raise HTTPException(status_code=400, detail=f"invalid jobspec: {e!r}")
# v3.0 path policy:
root = ray_cfg.shared_root.rstrip("/")
user_id = str(subject["user_id"]).strip()
task_id = new_task_id(str(spec.workload), user_id=user_id)
# v3.5: Advanced TaskSpec (custom command)
if isinstance(spec, AdvancedTaskSpec):
expanded = expand_advanced_command(user_id=user_id, command=spec.command)
try:
validate_advanced_command(user_id=user_id, expanded_command=expanded)
except Exception as e:
raise HTTPException(status_code=400, detail=f"invalid command: {e!r}")
stored = spec.to_public_dict()
stored["command"] = expanded
stored_yaml = yaml.safe_dump(stored, sort_keys=False)
db.create_task_v25(
task_id=task_id,
user_id=user_id,
workload=spec.workload,
jobspec_yaml=stored_yaml,
nnodes=spec.nnodes,
n_gpus_per_node=spec.n_gpus_per_node,
)
return {"task_id": task_id, "state": "QUEUED"}
# v3.0 path policy (Basic JobSpec):
# - code_path: only allow /private/common/...
# - train/val: allow /private/common/datasets/... OR /private/users/<me>/datasets/...
# - model_id: if it looks like a local path (/private/...), allow only models dirs:
# /private/common/models/... OR /private/users/<me>/models/...
root = ray_cfg.shared_root.rstrip("/")
common_prefix = f"{root}/common/"
user_prefix = f"{root}/users/{str(subject['user_id']).strip()}/"
user_prefix = f"{root}/users/{user_id}/"
common_datasets_prefix = f"{common_prefix}datasets/"
user_datasets_prefix = f"{user_prefix}datasets/"
@ -360,15 +390,17 @@ def create_app(config_path: str) -> FastAPI:
detail=f"model_id local path must start with {common_models_prefix} or {user_models_prefix}",
)
task_id = new_task_id(spec.workload, user_id=str(subject["user_id"]))
db.create_task_v25(
task_id=task_id,
user_id=str(subject["user_id"]),
workload=spec.workload,
jobspec_yaml=body,
nnodes=spec.nnodes,
n_gpus_per_node=spec.n_gpus_per_node,
)
try:
db.create_task_v25(
task_id=task_id,
user_id=user_id,
workload=spec.workload,
jobspec_yaml=body,
nnodes=spec.nnodes,
n_gpus_per_node=spec.n_gpus_per_node,
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"db create task failed: {e!r}")
return {"task_id": task_id, "state": "QUEUED"}
@app.get("/api/v2/tasks/{task_id}")
@ -428,7 +460,7 @@ def create_app(config_path: str) -> FastAPI:
obj = yaml.safe_load(raw) or {}
if not isinstance(obj, dict):
raise ValueError("jobspec must be a YAML mapping")
spec = JobSpec.from_dict(obj)
spec = parse_taskspec(obj)
resolved = spec.to_public_dict()
attempts = db.list_attempts(task_id)
if attempts:

src/mvp/py/argus/service/scheduler.py

@ -9,7 +9,7 @@ from typing import Any
import yaml
from argus.core.ids import attempt_submission_id
from argus.ray.models import JobSpec, RayConfig
from argus.ray.models import AdvancedTaskSpec, JobSpec, RayConfig, parse_taskspec
from argus.ray.ray_job_tool import RayJobTool
from .config import V2Config
@ -48,11 +48,11 @@ class Scheduler:
required = float(nnodes * n_gpus_per_node)
return avail.total_available_gpus >= required
def _parse_jobspec(self, jobspec_yaml: str) -> JobSpec:
def _parse_taskspec(self, jobspec_yaml: str) -> JobSpec | AdvancedTaskSpec:
obj = yaml.safe_load(jobspec_yaml) or {}
if not isinstance(obj, dict):
raise ValueError("jobspec must be a YAML mapping")
return JobSpec.from_dict(obj)
return parse_taskspec(obj)
def _submit_one(self, task_row: dict[str, Any]) -> None:
task_id = str(task_row["task_id"])
@ -60,7 +60,7 @@ class Scheduler:
user_id = task_row.get("user_id")
user_id_s = str(user_id) if user_id not in (None, "") else None
spec = self._parse_jobspec(jobspec_yaml)
spec = self._parse_taskspec(jobspec_yaml)
attempt_no = int(task_row.get("latest_attempt_no", 0)) + 1
ray_sid = attempt_submission_id(task_id, attempt_no)
job_dir = self._job_dir_for_task(user_id=user_id_s, ray_submission_id=ray_sid)
@ -69,13 +69,20 @@ class Scheduler:
self.db.create_attempt(task_id=task_id, attempt_no=attempt_no, ray_submission_id=ray_sid)
self.db.set_task_state(task_id=task_id, state="SUBMITTING", latest_attempt_no=attempt_no)
# Override submission_id in jobspec (v1.1 compatible)
d = spec.to_public_dict()
d["submission_id"] = ray_sid
spec2 = JobSpec.from_dict(d)
# Override submission_id in taskspec (v1.1 compatible)
if isinstance(spec, JobSpec):
d = spec.to_public_dict()
d["submission_id"] = ray_sid
spec2 = JobSpec.from_dict(d)
submit = lambda: self.tool.submit(spec2, no_wait=True, job_dir=job_dir)
else:
d = spec.to_public_dict()
d["submission_id"] = ray_sid
spec2 = AdvancedTaskSpec.from_dict(d)
submit = lambda: self.tool.submit_advanced(spec2, no_wait=True, job_dir=job_dir, user_id=user_id_s)
try:
submitted = self.tool.submit(spec2, no_wait=True, job_dir=job_dir)
submitted = submit()
# submitted should equal ray_sid; keep as source of truth.
self.db.update_attempt(task_id=task_id, attempt_no=attempt_no, ray_status="SUBMITTED")
self.db.set_task_state(task_id=task_id, state="SUBMITTED")

src/mvp/py/argus/service/ui.py

@ -329,16 +329,71 @@ val_file: /private/common/datasets/gsm8k_sft/test.parquet # validation data (required)
# trainer_device: cpu # SFT only (driver-side device; optional, default cpu)
# submission_id: "" # Ray submission_id (optional; default empty, normally auto-generated by the service, no need to fill in)
""".strip()
adv = """# Advanced TaskSpec (YAML) - v3.5
kind: advanced # 任务类型必填advanced自定义 command
# 说明v3.5 中 Advanced 任务不会按 ppo/grpo/sft 分类;平台统一按 "advanced" 做任务分类与 task_id 命名。
nnodes: 2 # 训练节点数(必填):用于平台队列调度与资源预检查
n_gpus_per_node: 4 # 每节点 GPU 数(必填):用于平台队列调度与资源预检查
# 自定义训练命令(必填):平台会做 $HOME 宏替换:
# - $HOME -> /private/users/<user>
# - $HOME/common/datasets -> /private/datasets共享只读数据
# - $HOME/common/hf -> /private/hf共享只读 HF cache
command: |
# 注意PPO 需要一些关键参数,否则 VERL 会在启动前 fail-fast例如 actor 的 micro batch
PYTHONUNBUFFERED=1 \
python3 -m verl.trainer.main_ppo \
data.train_files=$HOME/common/datasets/gsm8k/train.parquet \
data.val_files=$HOME/common/datasets/gsm8k/test.parquet \
data.train_batch_size=256 \
data.max_prompt_length=512 \
data.max_response_length=512 \
actor_rollout_ref.model.path=Qwen/Qwen2.5-0.5B-Instruct \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.actor.ppo_mini_batch_size=64 \
actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
actor_rollout_ref.rollout.name=sglang \
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \
actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
actor_rollout_ref.rollout.gpu_memory_utilization=0.4 \
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4 \
critic.optim.lr=1e-5 \
critic.model.path=Qwen/Qwen2.5-0.5B-Instruct \
critic.ppo_micro_batch_size_per_gpu=4 \
algorithm.kl_ctrl.kl_coef=0.001 \
trainer.logger=console \
trainer.val_before_train=False \
trainer.nnodes=2 \
trainer.n_gpus_per_node=4 \
trainer.total_epochs=1 \
trainer.total_training_steps=10 \
trainer.save_freq=10 \
trainer.test_freq=-1 \
trainer.resume_mode=disable \
trainer.default_local_dir=checkpoints \
+ray_kwargs.ray_init.address=auto \
hydra.run.dir=logs/hydra
# Optional: custom reward (approach A: written directly in the command)
# add the following overrides to the command:
# custom_reward_function.path=$HOME/code/reward.py
# custom_reward_function.name=compute_score
""".strip()
body = f"""
<h1>New Task</h1>
<div class="card">
<div class="muted">Paste TaskSpec YAML and submit to API server. Note: <code>code_path</code> is required (v3.0 does not execute user code; use the common snapshot).</div>
<div class="muted">
Paste TaskSpec YAML and submit to API server.
Basic tasks require <code>code_path</code>; Advanced tasks use <code>kind: advanced</code> with a custom <code>command</code>.
</div>
<div style="height:10px"></div>
<div class="row">
<button class="btn" id="tpl-ppo">PPO example</button>
<button class="btn" id="tpl-grpo">GRPO example</button>
<button class="btn" id="tpl-sft">SFT example</button>
<button class="btn" id="tpl-adv">Advanced example</button>
</div>
<div style="height:10px"></div>
<textarea id="yaml" rows="16">{html.escape(ppo)}</textarea>
@ -354,6 +409,7 @@ val_file: /private/common/datasets/gsm8k_sft/test.parquet # validation data (required)
tpl_ppo = json.dumps(ppo)
tpl_grpo = json.dumps(grpo)
tpl_sft = json.dumps(sft)
tpl_adv = json.dumps(adv)
script = (
"""
const msg = document.getElementById("msg");
@ -361,9 +417,11 @@ const yamlEl = document.getElementById("yaml");
const TPL_PPO = __TPL_PPO__;
const TPL_GRPO = __TPL_GRPO__;
const TPL_SFT = __TPL_SFT__;
const TPL_ADV = __TPL_ADV__;
document.getElementById("tpl-ppo").onclick = () => { yamlEl.value = TPL_PPO; msg.textContent = ""; };
document.getElementById("tpl-grpo").onclick = () => { yamlEl.value = TPL_GRPO; msg.textContent = ""; };
document.getElementById("tpl-sft").onclick = () => { yamlEl.value = TPL_SFT; msg.textContent = ""; };
document.getElementById("tpl-adv").onclick = () => { yamlEl.value = TPL_ADV; msg.textContent = ""; };
document.getElementById("submit").onclick = async () => {
msg.textContent = "Submitting...";
const body = yamlEl.value;
@ -378,6 +436,7 @@ document.getElementById("submit").onclick = async () => {
.replace("__TPL_PPO__", tpl_ppo)
.replace("__TPL_GRPO__", tpl_grpo)
.replace("__TPL_SFT__", tpl_sft)
.replace("__TPL_ADV__", tpl_adv)
)
return HTMLResponse(content=_page("New Task", "new", body, script))

src/mvp/py/tests/test_advanced_command.py Normal file

@ -0,0 +1,74 @@
from __future__ import annotations
import pytest
from argus.service.advanced_command import expand_advanced_command, validate_advanced_command
def test_expand_maps_home_common_datasets_to_private_datasets():
cmd = "python3 -m verl.trainer.main_ppo data.train_files=$HOME/common/datasets/gsm8k/train.parquet"
expanded = expand_advanced_command(user_id="alice", command=cmd)
assert "/private/datasets/gsm8k/train.parquet" in expanded
assert "$HOME/common/datasets" not in expanded
def test_expand_maps_home_common_hf_to_private_hf():
cmd = "python3 -m verl.trainer.main_ppo HF_HOME=$HOME/common/hf"
expanded = expand_advanced_command(user_id="alice", command=cmd)
assert "HF_HOME=/private/hf" in expanded
def test_expand_maps_home_to_private_users():
cmd = "python3 -m verl.trainer.main_ppo data.train_files=$HOME/datasets/x.parquet"
expanded = expand_advanced_command(user_id="alice", command=cmd)
assert "data.train_files=/private/users/alice/datasets/x.parquet" in expanded
def test_validate_requires_verl_trainer_entry():
with pytest.raises(ValueError, match="verl\\.trainer"):
validate_advanced_command(user_id="alice", expanded_command="echo hi")
def test_validate_rejects_cross_user_paths():
cmd = "python3 -m verl.trainer.main_ppo x=/private/users/bob/datasets/x"
with pytest.raises(ValueError, match="another user's path"):
validate_advanced_command(user_id="alice", expanded_command=cmd)
def test_validate_allows_train_val_under_allowed_roots():
cmd = (
"python3 -m verl.trainer.main_ppo "
"data.train_files=/private/datasets/gsm8k/train.parquet "
"data.val_files=/private/users/alice/datasets/gsm8k/test.parquet"
)
validate_advanced_command(user_id="alice", expanded_command=cmd)
def test_validate_rejects_train_val_outside_allowed_roots():
cmd = (
"python3 -m verl.trainer.main_ppo "
"data.train_files=/etc/passwd "
"data.val_files=/private/datasets/gsm8k/test.parquet"
)
with pytest.raises(ValueError, match="train_file must be under allowed roots"):
validate_advanced_command(user_id="alice", expanded_command=cmd)
def test_validate_allows_reward_path_under_user_code():
cmd = (
"python3 -m verl.trainer.main_ppo "
"custom_reward_function.path=/private/users/alice/code/reward.py "
"custom_reward_function.name=compute_score"
)
validate_advanced_command(user_id="alice", expanded_command=cmd)
def test_validate_rejects_reward_path_outside_user_code():
cmd = (
"python3 -m verl.trainer.main_ppo "
"custom_reward_function.path=/private/datasets/reward.py "
"custom_reward_function.name=compute_score"
)
with pytest.raises(ValueError, match="reward path must be under"):
validate_advanced_command(user_id="alice", expanded_command=cmd)

src/mvp/py/tests/test_app.py

@ -185,6 +185,29 @@ def test_submit_rejects_non_mapping_jobspec(tmp_path: Path, monkeypatch):
assert r.status_code == 400
def test_submit_rejects_invalid_yaml_returns_400(tmp_path: Path, monkeypatch):
from argus.service import app as app_mod
cfg_path = _write_config(tmp_path)
monkeypatch.setenv("MVP_INTERNAL_TOKEN", "token1")
class _Scheduler:
def __init__(self, **kwargs):
self.tool = object()
def run_forever(self, stop_flag):
return None
monkeypatch.setattr(app_mod, "Scheduler", _Scheduler)
app = app_mod.create_app(str(cfg_path))
with TestClient(app) as c:
# YAML parse error: unclosed quote
r = c.post("/api/v2/tasks", headers={"authorization": "Bearer token1"}, data="workload: \"ppo\n")
assert r.status_code == 400
assert "invalid YAML" in r.text
def test_submit_rejects_invalid_jobspec(tmp_path: Path, monkeypatch):
from argus.service import app as app_mod
@ -206,6 +229,99 @@ def test_submit_rejects_invalid_jobspec(tmp_path: Path, monkeypatch):
assert r.status_code == 400
def test_submit_advanced_ok_and_command_expanded(tmp_path: Path, monkeypatch):
from argus.service import app as app_mod
cfg_path = _write_config(tmp_path)
monkeypatch.setenv("MVP_INTERNAL_TOKEN", "admin-token")
monkeypatch.setattr(app_mod, "new_task_id", lambda workload, **kwargs: "tid_adv")
class _Scheduler:
def __init__(self, **kwargs):
self.tool = object()
def run_forever(self, stop_flag):
return None
monkeypatch.setattr(app_mod, "Scheduler", _Scheduler)
app = app_mod.create_app(str(cfg_path))
# seed user + token
from argus.service.config import V2Config
from argus.service.db import Db
root = yaml.safe_load(cfg_path.read_text(encoding="utf-8"))
v2_cfg = V2Config.from_root_dict(root)
db = Db(v2_cfg.sqlite.db_path)
db.create_user(user_id="u1", display_name=None)
token = db.issue_token(user_id="u1")
spec = """
kind: advanced
workload: ppo
nnodes: 2
n_gpus_per_node: 4
command: |
python3 -m verl.trainer.main_ppo \\
data.train_files=$HOME/common/datasets/gsm8k/train.parquet \\
data.val_files=$HOME/datasets/gsm8k/test.parquet \\
+ray_kwargs.ray_init.address=auto
""".lstrip()
with TestClient(app) as c:
r = c.post("/api/v2/tasks", headers={"authorization": f"Bearer {token}"}, data=spec)
assert r.status_code == 200
assert r.json()["task_id"] == "tid_adv"
r2 = c.get("/api/v2/tasks/tid_adv/spec", headers={"authorization": f"Bearer {token}"})
assert r2.status_code == 200
assert "kind: advanced" in r2.text
# Expanded mapping for SFTPGo "virtual common" folders:
assert "/private/datasets/gsm8k/train.parquet" in r2.text
assert "/private/users/u1/datasets/gsm8k/test.parquet" in r2.text
def test_submit_advanced_rejects_invalid_command(tmp_path: Path, monkeypatch):
from argus.service import app as app_mod
cfg_path = _write_config(tmp_path)
monkeypatch.setenv("MVP_INTERNAL_TOKEN", "admin-token")
class _Scheduler:
def __init__(self, **kwargs):
self.tool = object()
def run_forever(self, stop_flag):
return None
monkeypatch.setattr(app_mod, "Scheduler", _Scheduler)
app = app_mod.create_app(str(cfg_path))
# seed user + token
from argus.service.config import V2Config
from argus.service.db import Db
root = yaml.safe_load(cfg_path.read_text(encoding="utf-8"))
v2_cfg = V2Config.from_root_dict(root)
db = Db(v2_cfg.sqlite.db_path)
db.create_user(user_id="u1", display_name=None)
token = db.issue_token(user_id="u1")
bad = """
kind: advanced
workload: ppo
nnodes: 2
n_gpus_per_node: 4
command: |
echo hello
""".lstrip()
with TestClient(app) as c:
r = c.post("/api/v2/tasks", headers={"authorization": f"Bearer {token}"}, data=bad)
assert r.status_code == 400
assert "invalid command" in r.text
def test_me_sftp_reset_password_disabled_returns_400(tmp_path: Path, monkeypatch):
from argus.service import app as app_mod

src/mvp/py/tests/test_builders.py

@ -56,3 +56,10 @@ def test_build_training_argv_unsupported_raises():
spec = _base_spec("bad")
with pytest.raises(ValueError, match="unsupported workload"):
build_training_argv(spec, submission_id="sid", job_dir="/job")
def test_build_advanced_argv_uses_bash_lc():
from argus.ray.builders import build_advanced_argv
built = build_advanced_argv("echo hi")
assert built.argv[:2] == ["bash", "-lc"]

src/mvp/py/tests/test_driver_entrypoint.py

@ -18,8 +18,10 @@ def test_driver_entrypoint_strips_double_dash_and_returns_code(monkeypatch, tmp_
returncode = 7
monkeypatch.setattr(mod.subprocess, "run", lambda cmd, check: _Proc())
got = {}
monkeypatch.setattr(mod.os, "chdir", lambda p: got.update({"cwd": str(p)}))
monkeypatch.setattr(sys, "argv", ["x", "--job-dir", str(tmp_path), "--", "echo", "hi"])
assert mod.main() == 7
assert (tmp_path).exists()
assert got["cwd"] == str(tmp_path)

src/mvp/py/tests/test_models.py

@ -31,6 +31,7 @@ def test_ray_config_from_dict_new_format_and_defaults():
assert cfg.entrypoint_resources["worker_node"] == 1.0
assert cfg.runtime_env_env_vars["HF_ENDPOINT"] == "x"
assert cfg.user_code_path == "/private/user/code"
assert cfg.verl_code_path == "/private/common/code/verl/verl_repo"
public = cfg.to_public_dict()
assert public["runtime_env"]["env_vars"]["HF_ENDPOINT"] == "x"
@ -106,3 +107,50 @@ def test_jobspec_unsupported_workload():
JobSpec.from_dict(
{"workload": "nope", "code_path": "x", "model_id": "m", "train_file": "t", "val_file": "v"}
)
def test_parse_taskspec_basic_no_kind_compat():
from argus.ray.models import JobSpec, parse_taskspec
got = parse_taskspec(
{
"workload": "ppo",
"code_path": "/code",
"model_id": "m",
"train_file": "train.jsonl",
"val_file": "val.jsonl",
}
)
assert isinstance(got, JobSpec)
assert got.workload == "ppo"
def test_parse_taskspec_advanced_smoke():
from argus.ray.models import AdvancedTaskSpec, parse_taskspec
got = parse_taskspec(
{
"kind": "advanced",
"nnodes": 2,
"n_gpus_per_node": 4,
"command": "python3 -m verl.trainer.main_ppo +ray_kwargs.ray_init.address=auto",
}
)
assert isinstance(got, AdvancedTaskSpec)
assert got.kind == "advanced"
assert got.workload == "advanced"
assert got.nnodes == 2
assert got.n_gpus_per_node == 4
def test_parse_taskspec_advanced_requires_command_nnodes_gpus():
from argus.ray.models import parse_taskspec
with pytest.raises(ValueError, match="missing required field: command"):
parse_taskspec({"kind": "advanced", "nnodes": 1, "n_gpus_per_node": 1})
with pytest.raises(ValueError, match="missing required field: nnodes"):
parse_taskspec({"kind": "advanced", "n_gpus_per_node": 1, "command": "python3 -m verl.trainer.main_ppo"})
with pytest.raises(ValueError, match="missing required field: n_gpus_per_node"):
parse_taskspec({"kind": "advanced", "nnodes": 1, "command": "python3 -m verl.trainer.main_ppo"})

src/mvp/py/tests/test_ray_job_tool.py

@ -161,3 +161,58 @@ def test_submit_error_writes_file_then_reraises(tmp_path: Path, monkeypatch):
err = tmp_path / "jobs" / "sid2" / "logs" / "submit.error.txt"
assert err.exists()
def test_submit_advanced_writes_artifacts_and_returns_submission_id(tmp_path: Path, monkeypatch):
from argus.ray import ray_job_tool as mod
class _FakeClient:
def __init__(self, address: str):
self.address = address
self.last_submit_kwargs = None
def submit_job(self, **kwargs):
self.last_submit_kwargs = dict(kwargs)
return str(kwargs["submission_id"])
def list_jobs(self):
class X:
def dict(self):
return {"ok": True}
return [X()]
monkeypatch.setattr(mod, "JobSubmissionClient", _FakeClient)
monkeypatch.setattr(mod.ray, "init", lambda **kwargs: (_ for _ in ()).throw(RuntimeError("no ray")))
cfg = RayConfig.from_dict(
{
"address": "http://127.0.0.1:8265",
"shared_root": str(tmp_path),
"entrypoint_resources": {"worker_node": 1},
"runtime_env": {"env_vars": {}},
}
)
spec = mod.AdvancedTaskSpec.from_dict(
{
"kind": "advanced",
"submission_id": "sid3",
"nnodes": 2,
"n_gpus_per_node": 4,
"command": "echo hi",
}
)
tool = mod.RayJobTool(cfg)
submitted = tool.submit_advanced(spec, no_wait=True, user_id="alice")
assert submitted == "sid3"
job_root = tmp_path / "jobs" / "sid3"
assert (job_root / "config" / "ray_config.yaml").exists()
assert (job_root / "config" / "taskspec.yaml").exists()
assert (job_root / "config" / "ray_submission_id.txt").read_text(encoding="utf-8").strip() == "sid3"
payload = json.loads((job_root / "config" / "submit_payload.json").read_text(encoding="utf-8"))
assert payload["submission_id"] == "sid3"
assert "argus.ray.driver_entrypoint" in payload["entrypoint"]
assert (job_root / "debug" / "ray_resources_pre.error.txt").exists()

src/mvp/py/tests/test_scheduler.py

@ -85,6 +85,67 @@ def test_tick_submits_one_task(monkeypatch, tmp_path: Path):
assert s.tool.job_dirs[-1] == "/private/users/alice/jobs/t1--a01"
def test_tick_submits_one_task_advanced(monkeypatch, tmp_path: Path):
from argus.service import scheduler as sched_mod
ray_cfg, v2_cfg = _mk_cfg(tmp_path)
db = Db(v2_cfg.sqlite.db_path)
db.init()
db.create_task_v25(
task_id="t1",
user_id="alice",
workload="advanced",
jobspec_yaml=yaml.safe_dump(
{
"kind": "advanced",
"nnodes": 2,
"n_gpus_per_node": 4,
"command": "python3 -m verl.trainer.main_ppo +ray_kwargs.ray_init.address=auto",
}
),
nnodes=2,
n_gpus_per_node=4,
)
monkeypatch.setattr(sched_mod, "ensure_ray_connected", lambda: None)
monkeypatch.setattr(
sched_mod,
"get_cluster_available",
lambda: SimpleNamespace(total_available_gpus=999.0, total_available_npus=0.0),
)
class _Tool:
def __init__(self, cfg):
self.submitted = []
self.job_dirs = []
def submit(self, spec, no_wait: bool, job_dir: str | None = None):
raise AssertionError("should not call submit() for advanced")
def submit_advanced(self, spec, no_wait: bool, job_dir: str | None = None, user_id: str | None = None):
self.submitted.append(spec.submission_id)
self.job_dirs.append(job_dir)
return str(spec.submission_id)
def status(self, submission_id: str):
return "RUNNING"
def logs(self, submission_id: str):
return ""
monkeypatch.setattr(sched_mod, "RayJobTool", _Tool)
s = sched_mod.Scheduler(db=db, ray_cfg=ray_cfg, v2_cfg=v2_cfg)
s.tick()
row = db.get_task("t1")
assert row and row["state"] == "SUBMITTED"
attempts = db.list_attempts("t1")
assert len(attempts) == 1
assert attempts[0]["ray_submission_id"] == "t1--a01"
assert s.tool.job_dirs[-1] == "/private/users/alice/jobs/t1--a01"
def test_tick_marks_pending_resources(monkeypatch, tmp_path: Path):
from argus.service import scheduler as sched_mod

src/mvp/py/tests/test_ui.py

@ -78,3 +78,18 @@ def test_ui_task_detail_shows_ids(tmp_path, monkeypatch):
assert f"/ui/tasks/{task_id}/logs" in r.text
assert "TaskSpec (YAML)" in r.text
assert "/api/v2/tasks/" in r.text
def test_ui_new_task_contains_advanced_example_snippet(tmp_path, monkeypatch):
cfg = _write_config(tmp_path)
monkeypatch.setenv("MVP_INTERNAL_TOKEN", "admin-token")
app = create_app(str(cfg))
c = TestClient(app)
r = c.get("/ui/tasks/new")
assert r.status_code == 200
# Ensure Advanced example includes required PPO micro batch override (common failure mode if omitted).
assert "kind: advanced" in r.text
# workload is not needed for advanced in v3.5.
assert "# workload:" not in r.text
assert "actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu" in r.text