diff --git a/specs/mvp/v3.5/README.md b/specs/mvp/v3.5/README.md
new file mode 100644
index 0000000..c6df997
--- /dev/null
+++ b/specs/mvp/v3.5/README.md
@@ -0,0 +1,7 @@
# MVP v3.5

This directory contains the v3.5 requirements and design (trimmed edition):

- `requirement.md`: supplementary requirements (from discussion)
- `roadmap_v3.5.png`: architecture sketch (Advanced Task + Resume + IB + Serving)
- `v3.5_design.md`: detailed design (based on v3.0; this iteration focuses only on Advanced TaskSpec + Custom Reward; Serving/IB/Resume/multi-version verl are deferred)
diff --git a/specs/mvp/v3.5/note.md b/specs/mvp/v3.5/note.md
new file mode 100644
index 0000000..c68bdfd
--- /dev/null
+++ b/specs/mvp/v3.5/note.md
@@ -0,0 +1,3 @@

1. node management (interface skeleton introduced in v3.5: manage the head/worker node lifecycle via SSH/platform capabilities; minimal viable version first) --- what is this for?
2.
\ No newline at end of file
diff --git a/specs/mvp/v3.5/requirement.md b/specs/mvp/v3.5/requirement.md
new file mode 100644
index 0000000..56520f6
--- /dev/null
+++ b/specs/mvp/v3.5/requirement.md
@@ -0,0 +1,40 @@

v3.5 extends v3.0 with the following features:
1. Support custom commands that bypass the fixed TaskSpec templates: the user directly provides the python command that invokes verl, as below. This offers more flexibility, but the user must manage file paths themselves. Where the user writes `$HOME`, the service layer substitutes the user's own `/private/users/<user_id>/` path; `$COMMON` is substituted with `/private/`.

```
PYTHONUNBUFFERED=1 python3 -m verl.trainer.main_ppo \
    data.train_files=$HOME/data/gsm8k/train.parquet \
    data.val_files=$HOME/data/gsm8k/test.parquet \
    data.train_batch_size=256 \
    data.max_prompt_length=512 \
    data.max_response_length=512 \
    actor_rollout_ref.model.path=Qwen/Qwen2.5-0.5B-Instruct \
    actor_rollout_ref.actor.optim.lr=1e-6 \
    actor_rollout_ref.actor.ppo_mini_batch_size=64 \
    actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
    actor_rollout_ref.rollout.name=vllm \
    actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \
    actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
    actor_rollout_ref.rollout.gpu_memory_utilization=0.4 \
    actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4 \
    critic.optim.lr=1e-5 \
    critic.model.path=Qwen/Qwen2.5-0.5B-Instruct \
    critic.ppo_micro_batch_size_per_gpu=4 \
    algorithm.kl_ctrl.kl_coef=0.001 \
    trainer.logger=console \
    trainer.val_before_train=False \
    trainer.n_gpus_per_node=1 \
    trainer.nnodes=1 \
    trainer.save_freq=10 \
    trainer.test_freq=10 \
    trainer.total_epochs=15
```

2. Support custom reward functions; refer to the examples in the verl project [verl](../../../verl) and produce a design.

3. Support a codepath that points at a verl code version the user has uploaded under their own user path.

4. Resume from checkpoint: for a task that is already complete (succeeded, failed, or stopped), continue training from the last saved checkpoint with parameters unchanged. Confirm whether this corresponds to a new Ray job, or analyze whether verl already ships similar support.

5. Support training over NCCL using RoCEv2 and InfiniBand networks; investigate how verl supports this and which configuration is required.
\ No newline at end of file
diff --git a/specs/mvp/v3.5/roadmap_v3.5.png b/specs/mvp/v3.5/roadmap_v3.5.png
new file mode 100644
index 0000000..779e05b
Binary files /dev/null and b/specs/mvp/v3.5/roadmap_v3.5.png differ
diff --git a/specs/mvp/v3.5/v3.5_design.md b/specs/mvp/v3.5/v3.5_design.md
new file mode 100644
index 0000000..f85edbc
--- /dev/null
+++ b/specs/mvp/v3.5/v3.5_design.md
@@ -0,0 +1,366 @@
# MVP v3.5 Detailed Design (further trimmed, based on v3.0)

> Background: v3.0 already provides WebUI + API server + user/task isolation + SFTPGo data management + a stateless Ray cluster (head + worker node pool).
>
> This v3.5 round does **only 2 things**:
> 1) Advanced Task: let users submit a custom training command
> 2) Custom Reward: let users inject a reward via VERL's native `custom_reward_function.*` mechanism (approach A only: the user writes the command themselves)
>
> Explicitly out of scope (removed from the previous design revision): (3) custom verl version/code path, (4) resume from checkpoint, (5) IB/RoCEv2 network support, (6) Model Serving.

---
## 0. Invariants inherited from v3.0 (key constraints)

1) **Node management unchanged**
- v3.5 neither adds nor modifies the node-management mechanism; it keeps running as in v3.0 (head writes discovery, worker watchdog auto-joins, self-heals).

2) **The head runs no training**
- All training/serving drivers are forced onto workers via Ray entrypoint placement (e.g. `entrypoint_resources={"worker_node": 1}`).

3) **Changed convention for the SFTPGo "common" directory**
- The `$COMMON` macro is no longer used.
- In SFTPGo, shared read-only resources are mapped to fixed directories under the user's home (in SFTP/WebClient the user sees `$HOME/common/...`):
  - `$HOME/common/datasets` → real in-container path `/private/datasets` (read-only)
  - `$HOME/common/hf` → real in-container path `/private/hf` (read-only)

> Here `$HOME` means `/private/users/<user_id>` (an in-container path).

---

## 1. v3.5 scope (after trimming)

### 1.1 In scope

**A. Advanced TaskSpec (custom command)**
- The user submits a `command` (multi-line shell or a single line)
- The platform performs `$HOME` macro substitution
- The platform performs best-effort safety checks (paths/key parameters), then submits the task as a Ray job

**B. Custom Reward (approach A only)**
- The user writes hydra overrides explicitly in the `command`:
  - `custom_reward_function.path=...`
  - `custom_reward_function.name=...`
  - `custom_reward_function.reward_kwargs.*=...` (optional)
- The platform offers no structured reward fields (no approach B); it only checks that the path is legal

### 1.2 Out of scope (this round)
- Custom verl version/code path (the platform's built-in/shared verl code snapshot is still used)
- Resume from checkpoint
- Dedicated IB/RoCEv2 network support (NCCL/RDMA env vars are not introduced to the platform yet)
- Model Serving (deferred; a separate design iteration later)

---

## 2. Advanced TaskSpec design

### 2.1 Why an Advanced Task is needed

v3.0's Basic TaskSpec (ppo/grpo/sft) generates fixed overrides from platform templates, which suits "getting something running quickly".
But research/tuning scenarios need more freedom: users want to write `python3 -m verl.trainer.main_ppo ...` directly and control every override themselves.

### 2.2 Advanced TaskSpec (proposed schema)

A new TaskSpec type is proposed, distinguished by `kind: advanced`:

```yaml
kind: advanced

# Resources (used for platform scheduling and pre-checks; still required)
nnodes: 2
n_gpus_per_node: 4

# Custom command (the user is responsible for correct VERL args/paths)
# The platform substitutes the $HOME macro; everything else is passed through as-is
command: |
  PYTHONUNBUFFERED=1 python3 -m verl.trainer.main_ppo \
    data.train_files=$HOME/datasets/gsm8k/train.parquet \
    data.val_files=$HOME/datasets/gsm8k/test.parquet \
    actor_rollout_ref.model.path=Qwen/Qwen2.5-0.5B-Instruct \
    trainer.nnodes=2 \
    trainer.n_gpus_per_node=4 \
    trainer.total_epochs=1 \
    trainer.save_freq=10 \
    +ray_kwargs.ray_init.address=auto
```

### 2.3 `$HOME` macro substitution rules

Only `$HOME` is supported (v3.5 removes `$COMMON`):
- `$HOME` → `/private/users/<user_id>`

If the user needs shared data/caches:
- Shared data: `$HOME/common/datasets/...`
- Shared HF cache: `$HOME/common/hf/...` (usually not needed inside a command, but useful for debugging)

#### 2.3.1 Important: SFTPGo "virtual folders" vs. the "real paths" the training process sees

In SFTPGo, `$HOME/common/datasets` / `$HOME/common/hf` are **SFTP virtual folder mappings**; they map to real in-container paths:
- `$HOME/common/datasets` ↔ `/private/datasets`
- `$HOME/common/hf` ↔ `/private/hf`

The training process (a python process on a Ray worker) sees the **real in-container filesystem**; it knows nothing about SFTPGo virtual folders.

Therefore, so users can keep the path semantics they see in the WebClient (writing `$HOME/common/...`), the service layer must apply a **path macro mapping** before submitting an Advanced command:

- `"$HOME/common/datasets"` → `"/private/datasets"`
- `"$HOME/common/hf"` → `"/private/hf"`
- any remaining `"$HOME"` → `"/private/users/<user_id>"`

This way the command the user wrote reads the right files inside the training process.
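Substitution order matters: the `$HOME/common/*` mappings must run before the bare `$HOME` replacement, otherwise `$HOME/common/datasets` would first turn into `/private/users/<user_id>/common/datasets`, which does not exist on the worker. A minimal sketch of the mapping in plain Python (the production version lands in `argus/service/advanced_command.py` later in this change):

```python
# Minimal sketch of the $HOME macro mapping described above.
# Order matters: map the "common" virtual folders before the bare $HOME.
def expand_home_macros(user_id: str, command: str) -> str:
    command = command.replace("$HOME/common/datasets", "/private/datasets")
    command = command.replace("$HOME/common/hf", "/private/hf")
    return command.replace("$HOME", f"/private/users/{user_id}")


assert (
    expand_home_macros("alice", "data.train_files=$HOME/common/datasets/gsm8k/train.parquet")
    == "data.train_files=/private/datasets/gsm8k/train.parquet"
)
```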
### 2.4 Service-layer checks (best-effort; strong + weak constraints)

> Goal: without "parsing full shell", avoid cross-user file reads and obviously broken tasks as far as possible.

**Strong constraints (must pass, otherwise 400)**
1) `nnodes` and `n_gpus_per_node` must be present (used for queueing/resource pre-checks/placement)
2) `command` must contain an explicit python entry:
   - Suggested minimum: it contains `python3` and `-m verl.trainer.` (prevents running arbitrary system commands)
3) Path-isolation checks (string/regex level):
   - After expanding `$HOME` (including the `$HOME/common/*` mappings to `/private/*`):
     - Paths under `/private/users/` belonging to a user other than the current one (e.g. `/private/users/bob/...`) are forbidden
   - `data.train_files=...` and `data.val_files=...` (if present) are checked against an allowlist:
     - Allowed (user directory): `/private/users/<user_id>/datasets/...`
     - Allowed (shared directory): `/private/datasets/...`
   - `custom_reward_function.path=...` (if present) is checked against an allowlist:
     - Allowed: `/private/users/<user_id>/code/...` (uploaded by the user)

**Weak constraints (warning, non-blocking)**
- No `data.train_files=`/`data.val_files=` detected (the user may have used a different key or a config file)
- No `+ray_kwargs.ray_init.address=auto` detected (recommended in v3.0/v3.5, but the user may take responsibility themselves)

> Note: the Advanced command is essentially an "internal trusted users" capability; v3.5 builds no hard sandbox, and safety checks stay best-effort.

---

## 3. Custom Reward (approach A only: the user writes it)

### 3.1 VERL's native mechanism (surveyed in this repo's `verl/`)

The VERL PPO trainer config supports:
- `custom_reward_function.path`
- `custom_reward_function.name`
- `custom_reward_function.reward_kwargs`

Corresponding implementation sites:
- Config template: `verl/verl/trainer/config/ppo_trainer.yaml`
- Loading logic: `verl/verl/trainer/ppo/reward.py:get_custom_reward_fn`
- Typical reward manager: `verl/verl/workers/reward_manager/naive.py`, which calls `compute_score(...)`

### 3.2 What the user writes (example)

The user uploads `$HOME/code/reward.py` and adds to the command:

```bash
custom_reward_function.path=$HOME/code/reward.py \
custom_reward_function.name=compute_score
```

Suggested function signature (aligned with the `naive` reward manager's arguments):

```python
def compute_score(*, data_source: str, solution_str: str, ground_truth: str, extra_info=None, **kwargs):
    ...
```
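For concreteness, a hypothetical exact-match reward in this shape (illustrative only; not taken from the verl repo):

```python
# $HOME/code/reward.py -- hypothetical exact-match reward, illustrative only.
def compute_score(*, data_source: str, solution_str: str, ground_truth: str, extra_info=None, **kwargs) -> float:
    # Score 1.0 when the last non-empty line of the rollout equals the ground truth.
    stripped = solution_str.strip()
    answer = stripped.splitlines()[-1].strip() if stripped else ""
    return 1.0 if answer == str(ground_truth).strip() else 0.0
```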
### 3.3 The platform only checks (no field extension)

v3.5 restricts reward injection to "the user writes it in the command"; the platform only:
- Expands `$HOME`
- If `custom_reward_function.path=` is detected, verifies the path sits under `$HOME/code/`
- Does not try to parse/merge reward_kwargs (the user writes those)

---

## 4. Service-layer and SFTPGo mapping changes (the key point you raised)

In the v3.0 era the platform let users reference:
- `/private/common/datasets/...`
- `/private/common/hf/...`

But common is now presented to users as **SFTPGo virtual folders** (users see `$HOME/common/...`; the real paths are `/private/...`), so the v3.5 service layer must do two things:

1) **User-facing semantics (writing TaskSpec/command)**
- Shared datasets (read-only): `$HOME/common/datasets/...`
- Shared hf cache (read-only): `$HOME/common/hf/...`

2) **Real runtime paths (expanded before submitting to Ray)**
- `$HOME/common/datasets/...` → `/private/datasets/...`
- `$HOME/common/hf/...` → `/private/hf/...`

User-owned directories stay as they are:
- User datasets: `$HOME/datasets/...`
- User models: `$HOME/models/...`
- User code (reward): `$HOME/code/...`

> This mainly affects:
> - Advanced command checks (allowlist)
> - Copy on the WebUI/Data page (tell users where the shared data lives)

> Compatibility suggestion: to avoid breaking users/historical tasks that got used to `/private/common/datasets/...` during v3.0,
> the v3.5 implementation should **accept both**:
> - `/private/common/datasets/...` (old path semantics, still readable)
> - `/private/datasets/...` (real path semantics, recommended)
> - `$HOME/common/datasets/...` written in an Advanced command is first mapped to `/private/datasets/...`

---

## 5. Acceptance criteria (trimmed)

### 5.1 Advanced command
- Submit an Advanced PPO command (train/val using `$HOME/common/datasets/...` or `$HOME/datasets/...`); a curl sketch follows §5.2
- Confirm:
  - The task goes QUEUED → SUBMITTED/RUNNING
  - The driver runs on a worker (the head runs no training)
  - Training runs normally for at least a few steps

### 5.2 Custom reward (approach A)
- The user uploads `$HOME/code/reward.py`
- The command sets `custom_reward_function.path=$HOME/code/reward.py`
- Confirm the training log contains `using customized reward function ...`
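For either check, the submission itself can go through the WebUI or plain curl; a sketch assuming the existing v3.0 `POST /api/v2/tasks` endpoint and Bearer-token auth (host, port and token are placeholders):

```bash
# Sketch: submit an Advanced TaskSpec as raw YAML (placeholders in angle brackets).
curl -sS -X POST "http://<api-host>:<api-port>/api/v2/tasks" \
  -H "Authorization: Bearer <user-token>" \
  --data-binary @advanced_task.yaml
# Expected response: {"task_id": "...", "state": "QUEUED"}
```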
---

## 6. Open questions (need your decision/input)

1) Should the Advanced command "strong constraints" be stricter?
   - Current suggestion: require the command to contain `python3 -m verl.trainer.`, otherwise reject.
   - Do you want to allow users to run non-verl commands (e.g. custom evaluation scripts)?

2) Do the `$HOME/common/datasets` and `$HOME/common/hf` mapped directories need "enforced read-only" semantics on the platform side?
   - E.g. TaskSpec validation that allows reads but forbids writes (the current design is best-effort string-level checking).

---

## 7. Change-point analysis against the current source (implementation checklist)

This section walks file by file through the v3.0 production source layout (`src/mvp/py/argus/...`), lists the concrete v3.5 changes, and assesses the blast radius on existing functionality.

### 7.1 TaskSpec/model layer (parsing and compatibility)

**Current state**
- The Basic TaskSpec is parsed by `argus.ray.models.JobSpec.from_dict()`: `src/mvp/py/argus/ray/models.py`
- The `/api/v2/tasks` API calls `JobSpec.from_dict(obj)` directly and validates paths from its fields: `src/mvp/py/argus/service/app.py`
- The scheduler likewise assumes jobspec_yaml parses into a `JobSpec`: `src/mvp/py/argus/service/scheduler.py`

**New in v3.5**
1) Add an `AdvancedTaskSpec` data structure (suggested home: `src/mvp/py/argus/ray/models.py`):
   - Required: `kind: advanced`, `workload` (suggest still requiring ppo/grpo/sft, for task_id naming and UI classification), `nnodes`, `n_gpus_per_node`, `command`
   - Optional: `submission_id` (overridden by the service layer)
2) Add "union parsing":
   - Add `parse_taskspec(obj: dict) -> Basic(JobSpec) | Advanced(AdvancedTaskSpec)`
   - Compatibility rule: if there is no `kind` field, **parse as a v3.0 Basic JobSpec by default** (old clients keep working unchanged).

### 7.2 Builder layer (turning a TaskSpec into executable argv)

**Current state**
- `src/mvp/py/argus/ray/builders.py:build_training_argv(spec: JobSpec, ...)` only supports templated PPO/GRPO/SFT.

**New in v3.5**
1) Add `build_advanced_argv(command: str) -> list[str]`
   - Suggested implementation: return `["bash", "-lc", "<expanded command>"]`
   - Rationale: the user command may contain `ENV=... python3 ... \` multi-line continuations and other shell syntax; `bash -lc` has the best compatibility.
2) Reuse the driver entrypoint:
   - Still executed through `argus.ray.driver_entrypoint` (uniform job_dir, logging, and exit codes).

### 7.3 RayJobTool layer (runtime_env and submission)

**Current state**
- `src/mvp/py/argus/ray/ray_job_tool.py:RayJobTool.submit(spec: JobSpec, ...)`:
  - The runtime_env `PYTHONPATH` is derived from `spec.code_path`
  - The entrypoint is fixed to driver_entrypoint + builder-generated argv

**New in v3.5**
1) Extend submission to support AdvancedTaskSpec:
   - Option A (least invasive): add a `submit_advanced(...)` method taking `command` + `job_dir` + `submission_id` + `nnodes/n_gpus...`
   - Option B (unified interface): add an internal `SubmitPlan` abstraction (holding `runtime_env` + `entrypoint` + `artifacts`); both Basic and Advanced produce a plan that flows through one submit path.
2) The runtime_env code path:
   - Since v3.5 does not do "custom verl code_path" this round, keep using the shared snapshot (e.g. `/private/common/code/verl/verl_repo`).
   - To avoid scattered constants, add `ray.verl_code_path` (or `service.verl_code_path`) to the config and have RayJobTool read it in one place.
3) The runtime_env user-code directory (optional enhancement):
   - VERL imports the custom reward function dynamically by "file path" via `custom_reward_function.path`, so in theory it does not depend on `PYTHONPATH`.
   - But the user's `reward.py` may `import` sibling modules from its own directory; to improve usability, append
     `/private/users/<user_id>/code` to the job's `PYTHONPATH`.
   - This requires RayJobTool.submit/submit_advanced to know the `user_id` (passed in by the Scheduler); a small change, but mind compatibility.

### 7.4 API server (submission validation, macro substitution, spec display)

**Current state**
- `POST /api/v2/tasks`: supports only Basic JobSpec and strictly validates the `code_path/train_file/val_file/model_id` prefixes: `src/mvp/py/argus/service/app.py`
- `/api/v2/tasks/{task_id}/spec`: returns the resolved Basic JobSpec (defaults filled in, submission_id added): `src/mvp/py/argus/service/app.py`

**New/changed in v3.5**
1) `POST /api/v2/tasks` branches:
   - `kind != advanced`: the original Basic flow (v3.0-compatible)
   - `kind == advanced`: Advanced parsing + validation
2) Advanced command macro substitution and mapping (core):
   - Implement `expand_command(user_id, command)`:
     - First `$HOME/common/datasets` → `/private/datasets`
     - Then `$HOME/common/hf` → `/private/hf`
     - Then any remaining `$HOME` → `/private/users/<user_id>`
   - Validation runs against the "expanded command"
3) Reward injection check (approach A only):
   - If `custom_reward_function.path=...` is found:
     - The expanded path prefix must be `/private/users/<user_id>/code/`
4) `/api/v2/tasks/{task_id}/spec`:
   - Must also return a resolved AdvancedTaskSpec:
     - Display can show the "raw command" (with `$HOME`) or the "expanded command" (suggestion: show both, raw + expanded)

### 7.5 Scheduler (queueing and submission)

**Current state**
- `src/mvp/py/argus/service/scheduler.py` assumes jobspec_yaml is always a Basic JobSpec and calls `tool.submit(spec2, ...)`.

**New in v3.5**
1) Replace the scheduler's `_parse_jobspec` with `parse_taskspec` (supporting Basic/Advanced).
2) `_submit_one` dispatches on the spec type:
   - Basic: unchanged, `tool.submit(JobSpec, ...)`
   - Advanced: call `tool.submit_advanced(...)` (or the unified SubmitPlan)

### 7.6 WebUI (minimal changes)

**Current state**
- The New Task page in `src/mvp/py/argus/service/ui.py` offers only Basic YAML templates.

**New in v3.5**
- Add an "Advanced Task" template button:
  - `kind: advanced`
  - `workload: ppo|grpo|sft` (for UI classification and task_id)
  - `nnodes/n_gpus_per_node`
  - `command: | ...` (with inline comments)
- Update the Data page copy:
  - State that the shared directories are `$HOME/common/datasets` and `$HOME/common/hf` (and explain they map to `/private/datasets` and `/private/hf`)

---

## 8. Compatibility impact on existing functionality

### 8.1 API/TaskSpec compatibility
- Compatibility rule: **YAML without a `kind` field is always parsed as a v3.0 Basic JobSpec.**
  - Existing scripts/clients (submitting ppo/grpo/sft YAML) need no changes.
- AdvancedTaskSpec is purely additive; it does not touch the existing task state machine/DB.

### 8.2 Impact of the path-policy change
Risk: v3.0 Basic tasks/templates use `/private/common/datasets/...` heavily.

Suggestion:
- Keep "dual-stack compatibility" during the v3.5 implementation:
  - Basic keeps accepting `/private/common/datasets/...` (old)
  - Also accept `/private/datasets/...` (new/real path)
- Advanced commands may use `$HOME/common/datasets/...`, which the service layer expands to `/private/datasets/...` (avoiding the invisible-virtual-folder problem).

### 8.3 Task execution/scheduling compatibility
- Scheduler queueing/concurrency control (`max_running_tasks`) is unchanged.
- Resource pre-checks still depend only on `nnodes/n_gpus_per_node`; AdvancedTaskSpec does not change the resource model.

### 8.4 Changed security boundary
- With Advanced commands the platform moves from "structured parameters" to "executing user commands"; the security boundary widens.
- Mitigations (best-effort):
  - Strong constraint requiring the command to contain `python3 -m verl.trainer.`
  - Basic path-isolation checks (no cross-user paths)
  - Reward files restricted to `$HOME/code`

### 8.5 Database compatibility
- No forced DB schema change: raw YAML continues to live in `tasks.jobspec_yaml`.
- If stronger querying/filtering is needed later, consider adding a `tasks.kind` column (optional incremental migration).
diff --git a/specs/mvp/v3.5/v3.5_dev_plan.md b/specs/mvp/v3.5/v3.5_dev_plan.md
new file mode 100644
index 0000000..b05682c
--- /dev/null
+++ b/specs/mvp/v3.5/v3.5_dev_plan.md
@@ -0,0 +1,200 @@
# MVP v3.5 (trimmed) development plan (TDD)

> Goal: on top of the existing v3.0 capabilities, add only two things:
> 1) **Advanced TaskSpec (custom command)**
> 2) **Custom Reward (approach A: the user writes `custom_reward_function.*` in the command)**
>
> Design basis: `specs/mvp/v3.5/v3.5_design.md` (this plan does not expand that scope).

---

## 0. Scope and constraints

### 0.1 In scope
- New `kind: advanced` TaskSpec: the user supplies a `command`; the platform performs `$HOME` macro substitution and best-effort validation, then submits a Ray Job.
- Custom Reward: the platform only does **reward path validation** (approach A); no new structured fields.
- `$HOME/common/*` path semantics (key): paths the user sees in SFTPGo/WebClient must be readable by the training process.

### 0.2 Out of scope (this round)
- Custom verl version/code path (multi-version coexistence)
- Resume from checkpoint
- Dedicated IB/RoCEv2/NCCL support
- Model Serving
- Node-management rework (v3.0's stateless head/worker/watchdog/supervisor mechanism stays as-is)

### 0.3 Key path mappings (must stay consistent)
> Note: SFTPGo's `$HOME/common/...` is a **virtual folder**; the training process cannot see that virtual path.

Before submitting an Advanced command, expand/map:
- `$HOME/common/datasets` → `/private/datasets` (read-only semantics)
- `$HOME/common/hf` → `/private/hf` (read-only semantics)
- any remaining `$HOME` → `/private/users/<user_id>`

And for backward compatibility with historical (v3.0) usage:
- Basic TaskSpec keeps accepting `/private/common/datasets/...` and `/private/common/hf/...` (no forced migration).

---

## 1. Test strategy (TDD)

### 1.1 Unit-test priorities
1) **Parsing and compatibility**: `kind: advanced` parses; YAML without `kind` still parses as Basic; old usage unbroken.
2) **Macro-substitution correctness**: `$HOME` / `$HOME/common/*` expand strictly per the convention.
3) **Best-effort validation**: reject obviously dangerous/cross-user paths; allowlist the reward path.
4) **Submission chain**: the scheduler recognizes Advanced specs and calls the corresponding submit method; submission_id/directory conventions stay unchanged.
5) **WebUI/API**: the New Task template and `/spec` endpoint show the fully resolved spec (including the expanded command).

### 1.2 Running locally
- Reuse the existing `.venv`: `.venv/bin/python -m pytest`
- If the environment has no pip, use uv per the v3.0 convention (not repeated in this plan).

---
## 2. Milestones (each independently verifiable)

> Convention: each milestone writes tests first (failing), then implements code until they pass; run `pytest` once at the end of the milestone.

### M1 — TaskSpec model and parsing (compatibility first)
**Goal**
- Introduce the AdvancedTaskSpec data structure and a union parser while keeping v3.0 Basic behavior unchanged.

**Add/modify (suggested locations)**
- `src/mvp/py/argus/ray/models.py`
  - Add `AdvancedTaskSpec`
  - Add `parse_taskspec(obj: dict) -> JobSpec | AdvancedTaskSpec`
  - Compatibility rule: missing `kind` → go through `JobSpec.from_dict`

**Tests (write first)**
- `src/mvp/py/tests/test_models.py`
  - `test_parse_taskspec_basic_no_kind_compat()`
  - `test_parse_taskspec_advanced_smoke()`
  - `test_parse_taskspec_advanced_requires_command_nnodes_gpus()`

**Acceptance**
- `pytest -q` passes; old tests unchanged, or only minimally and necessarily updated.

---

### M2 — Advanced command expansion and validation (core capability)
**Goal**
- Implement command expansion (including the `$HOME/common/*` mapping) and best-effort strong-constraint validation.

**Implementation (suggested new module; usage sketch at the end of this milestone)**
- `src/mvp/py/argus/service/command_expand.py` (or inside `argus/service/validation.py`)
  - `expand_advanced_command(user_id: str, command: str) -> str`
  - `validate_advanced_command(user_id: str, expanded_command: str) -> None` (raises `ValueError` on failure)

**Strong constraints (per the design doc)**
- Must contain `python3` and `-m verl.trainer.` (otherwise 400)
- Must not contain `/private/users/<other_user>/...` (cross-user paths)
- If `data.train_files=`/`data.val_files=` are detected:
  - Only `/private/users/<user_id>/datasets/...` or `/private/datasets/...` are allowed
  - (Compat) `/private/common/datasets/...` is allowed (old path)
- If `custom_reward_function.path=` is detected:
  - Only `/private/users/<user_id>/code/...` is allowed (validated after expansion)

**Tests (write first)**
- New: `src/mvp/py/tests/test_advanced_command.py`
  - `test_expand_maps_home_common_datasets_to_private_datasets()`
  - `test_expand_maps_home_common_hf_to_private_hf()`
  - `test_expand_maps_home_to_private_users()`
  - `test_validate_rejects_cross_user_paths()`
  - `test_validate_requires_verl_trainer_entry()`
  - `test_validate_allows_reward_path_under_user_code()`
  - `test_validate_rejects_reward_path_outside_user_code()`

**Acceptance**
- Unit tests cover positive and negative cases for mapping/validation; error messages are readable (used in the API 400 detail).
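A usage sketch of the two functions (both take keyword-only arguments; the module name that finally lands in this change is `argus.service.advanced_command` rather than `command_expand`):

```python
from argus.service.advanced_command import expand_advanced_command, validate_advanced_command

cmd = "python3 -m verl.trainer.main_ppo data.train_files=$HOME/common/datasets/gsm8k/train.parquet"
expanded = expand_advanced_command(user_id="alice", command=cmd)
# expanded now references /private/datasets/gsm8k/train.parquet
validate_advanced_command(user_id="alice", expanded_command=expanded)  # raises ValueError on violation
```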
---

### M3 — Advanced support in the Ray submission chain (Builder/Tool/Scheduler)
**Goal**
- An Advanced spec can enter the scheduler queue and be submitted as a Ray job (the driver still lands on a worker).

**Code change points (suggested)**
- `src/mvp/py/argus/ray/builders.py`
  - Add `build_advanced_argv(command: str)`: returns `["bash","-lc", expanded_command]`
- `src/mvp/py/argus/ray/ray_job_tool.py`
  - Add `submit_advanced(...)` (or unify behind an internal submit plan)
  - runtime_env: keep injecting the shared verl code path (no user-supplied verl code this round)
  - Optional: add `/private/users/<user_id>/code` to `PYTHONPATH` to improve `import` ergonomics for reward code
- `src/mvp/py/argus/service/scheduler.py`
  - Use `parse_taskspec` to branch Basic/Advanced
  - Advanced calls `tool.submit_advanced(...)`

**Tests (write first)**
- `src/mvp/py/tests/test_builders.py`
  - `test_build_advanced_argv_uses_bash_lc()`
- `src/mvp/py/tests/test_scheduler.py`
  - Add a `kind: advanced` task; assert the scheduler called `submit_advanced`
  - Assert the job_dir/submission_id rules are unchanged (still `/private/users/<user_id>/jobs/<submission_id>`)
- `src/mvp/py/tests/test_ray_job_tool.py`
  - Assert an advanced submission's entrypoint is driver_entrypoint + `bash -lc ...`

**Acceptance**
- Unit tests pass; a scheduler tick moves an Advanced task from QUEUED → SUBMITTED (mocked Ray).

---

### M4 — API & WebUI (minimal functional loop)
**Goal**
- The WebUI/HTTP API can submit an Advanced Task, and the detail page shows the resolved spec (including the full command).

**API change points**
- `src/mvp/py/argus/service/app.py`
  - `POST /api/v2/tasks`: support `kind: advanced`
    - Store the raw YAML (same as Basic)
    - For Advanced: expand the command + validate (400 on failure)
  - `GET /api/v2/tasks/{task_id}/spec`:
    - Return the resolved spec (suggestion: return raw + expanded, or put the expanded command directly in the YAML)

**WebUI change points**
- `src/mvp/py/argus/service/ui.py`
  - The New Task page gets an Advanced template (with inline comments)
  - Copy highlights the shared directories: `$HOME/common/datasets`, `$HOME/common/hf`

**Tests (write first)**
- `src/mvp/py/tests/test_app.py`
  - `test_create_task_advanced_ok()` (a minimal valid command)
  - `test_create_task_advanced_rejects_invalid_command()`
  - `test_task_spec_endpoint_includes_expanded_command()`
- `src/mvp/py/tests/test_ui.py`
  - Assert the page contains the Advanced example block

**Acceptance**
- `pytest` passes; Advanced YAML can be submitted in the browser and the expanded command is visible.

---

### M5 — End-to-end verification (remote argus@h1)
**Goal**
- Verify Advanced and Custom Reward (approach A) on a real Ray cluster with a real VERL environment.

**Steps (scripting this manual acceptance is optional)**
1) Start the unified v3.0/v3.5 compose + API (reuse the existing `run_all` script family)
2) A user (e.g. `alice`) uploads reward code over SFTP to:
   - `$HOME/code/reward.py` (real path `/private/users/alice/code/reward.py`)
3) Submit an Advanced task via the WebUI or curl (see the sketch at the end of this milestone); the `command` includes:
   - `custom_reward_function.path=$HOME/code/reward.py`
   - `custom_reward_function.name=compute_score`
   - `data.train_files=$HOME/common/datasets/gsm8k/train.parquet`
   - `data.val_files=$HOME/common/datasets/gsm8k/test.parquet`
4) Check:
   - The task state goes QUEUED → RUNNING → SUCCEEDED/FAILED (with logs)
   - The driver is not running on the head (verify in the dashboard)
   - The log shows the "custom reward" taking effect (match against VERL's actual log line)
5) Regression: Basic ppo/grpo/sft tasks still run (compatibility intact)

**Acceptance**
- The Advanced task runs at least a few steps and the reward injection takes effect.
- Basic tasks do not regress.
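One possible TaskSpec for step 3 (a sketch only; paths follow §0.3, and the model/resource sizes are placeholders to adapt to the real cluster):

```yaml
kind: advanced
nnodes: 1
n_gpus_per_node: 1
command: |
  python3 -m verl.trainer.main_ppo \
    data.train_files=$HOME/common/datasets/gsm8k/train.parquet \
    data.val_files=$HOME/common/datasets/gsm8k/test.parquet \
    actor_rollout_ref.model.path=Qwen/Qwen2.5-0.5B-Instruct \
    custom_reward_function.path=$HOME/code/reward.py \
    custom_reward_function.name=compute_score \
    trainer.nnodes=1 \
    trainer.n_gpus_per_node=1 \
    +ray_kwargs.ray_init.address=auto
```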
+ """ + + return BuiltCommand(argv=["bash", "-lc", command]) diff --git a/src/mvp/py/argus/ray/driver_entrypoint.py b/src/mvp/py/argus/ray/driver_entrypoint.py index 9f99c58..5088405 100644 --- a/src/mvp/py/argus/ray/driver_entrypoint.py +++ b/src/mvp/py/argus/ray/driver_entrypoint.py @@ -35,6 +35,8 @@ def main() -> int: job_dir = Path(args.job_dir) job_dir.mkdir(parents=True, exist_ok=True) + os.environ.setdefault("MVP_JOB_DIR", str(job_dir)) + os.chdir(job_dir) _preflight() diff --git a/src/mvp/py/argus/ray/models.py b/src/mvp/py/argus/ray/models.py index 4a4e74e..cd08d24 100644 --- a/src/mvp/py/argus/ray/models.py +++ b/src/mvp/py/argus/ray/models.py @@ -18,6 +18,7 @@ class RayConfig: entrypoint_resources: dict[str, float] runtime_env_env_vars: dict[str, str] user_code_path: str + verl_code_path: str @staticmethod def from_dict(d: dict[str, Any]) -> "RayConfig": @@ -42,6 +43,7 @@ class RayConfig: entrypoint_resources={str(k): float(v) for k, v in entrypoint_resources.items()}, runtime_env_env_vars={str(k): str(v) for k, v in env_vars.items()}, user_code_path=str(d.get("user_code_path", f"{_require(d, 'shared_root')}/user/code")), + verl_code_path=str(d.get("verl_code_path", f"{_require(d, 'shared_root')}/common/code/verl/verl_repo")), ) def to_public_dict(self) -> dict[str, Any]: @@ -52,6 +54,7 @@ class RayConfig: "entrypoint_resources": self.entrypoint_resources, "runtime_env": {"env_vars": self.runtime_env_env_vars}, "user_code_path": self.user_code_path, + "verl_code_path": self.verl_code_path, } @@ -126,3 +129,55 @@ class JobSpec: if self.workload == "sft": out["trainer_device"] = self.trainer_device or "cpu" return out + + +@dataclass(frozen=True) +class AdvancedTaskSpec: + kind: str # advanced + workload: str # always "advanced" for classification/ID prefix + submission_id: str | None + nnodes: int + n_gpus_per_node: int + command: str + + @staticmethod + def from_dict(d: dict[str, Any]) -> "AdvancedTaskSpec": + kind = str(_require(d, "kind")) + if kind != "advanced": + raise ValueError(f"unsupported taskspec kind: {kind}") + + command = str(_require(d, "command")) + + return AdvancedTaskSpec( + kind=kind, + workload="advanced", + submission_id=(str(d["submission_id"]) if d.get("submission_id") else None), + nnodes=int(_require(d, "nnodes")), + n_gpus_per_node=int(_require(d, "n_gpus_per_node")), + command=command, + ) + + def to_public_dict(self) -> dict[str, Any]: + return { + "kind": self.kind, + "workload": self.workload, + "submission_id": self.submission_id or "", + "nnodes": self.nnodes, + "n_gpus_per_node": self.n_gpus_per_node, + "command": self.command, + } + + +def parse_taskspec(d: dict[str, Any]) -> JobSpec | AdvancedTaskSpec: + """ + v3.x compatibility rule: + - If no "kind": treat as legacy/basic JobSpec. + - If kind == "advanced": parse AdvancedTaskSpec. 
+ """ + + kind = d.get("kind") + if kind in (None, ""): + return JobSpec.from_dict(d) + if kind == "advanced": + return AdvancedTaskSpec.from_dict(d) + raise ValueError(f"unsupported taskspec kind: {kind}") diff --git a/src/mvp/py/argus/ray/ray_job_tool.py b/src/mvp/py/argus/ray/ray_job_tool.py index bff2f3e..a0077a2 100644 --- a/src/mvp/py/argus/ray/ray_job_tool.py +++ b/src/mvp/py/argus/ray/ray_job_tool.py @@ -10,8 +10,8 @@ from typing import Any import ray from ray.job_submission import JobSubmissionClient -from .builders import build_training_argv -from .models import JobSpec, RayConfig +from .builders import build_advanced_argv, build_training_argv +from .models import AdvancedTaskSpec, JobSpec, RayConfig from .yaml_io import dump_yaml @@ -45,6 +45,9 @@ class RayJobTool: return f"{self.cfg.shared_root}/jobs/{submission_id}" def _runtime_env(self, spec: JobSpec) -> dict[str, Any]: + return self._runtime_env_for(code_path=spec.code_path, workload=spec.workload) + + def _runtime_env_for(self, *, code_path: str, workload: str, extra_pythonpath: list[str] | None = None) -> dict[str, Any]: env_vars = dict(self.cfg.runtime_env_env_vars) # Default HF cache @@ -57,18 +60,18 @@ class RayJobTool: # Place it before verl code to avoid interfering with verl import priority. tool_code_path = os.environ.get("MVP_TOOL_CODE_PATH", "/workspace/mvp/py") - user_code_path = self.cfg.user_code_path - code_path = spec.code_path - existing = env_vars.get("PYTHONPATH", "") - prefix = f"{tool_code_path}:{code_path}:{user_code_path}" + base_parts = [tool_code_path, code_path, self.cfg.user_code_path] + if extra_pythonpath: + base_parts.extend(extra_pythonpath) + prefix = ":".join(base_parts) env_vars["PYTHONPATH"] = f"{prefix}:{existing}" if existing else prefix # For debugging / log visibility env_vars["MVP_CODE_PATH"] = code_path # SFT: ensure ray.init() connects to the cluster - if spec.workload == "sft": + if workload == "sft": env_vars.setdefault("RAY_ADDRESS", "auto") return {"env_vars": env_vars} @@ -146,6 +149,91 @@ class RayJobTool: return submitted + def submit_advanced( + self, + spec: AdvancedTaskSpec, + no_wait: bool, + job_dir: str | None = None, + user_id: str | None = None, + ) -> str: + submission_id = spec.submission_id or f"mvp11_{spec.workload}_{_ts()}_{os.getpid()}" + job_dir = job_dir or self._job_dir(submission_id) + + built = build_advanced_argv(spec.command) + entrypoint_argv = [ + "python3", + "-m", + "argus.ray.driver_entrypoint", + "--job-dir", + job_dir, + *built.argv, + ] + entrypoint = " ".join(shlex.quote(x) for x in entrypoint_argv) + + extra_pythonpath: list[str] = [] + if user_id: + extra_pythonpath.append(f"{self.cfg.shared_root}/users/{user_id}/code") + + runtime_env = self._runtime_env_for(code_path=self.cfg.verl_code_path, workload=spec.workload, extra_pythonpath=extra_pythonpath) + + # Prepare job artifacts directory + job_root = Path(job_dir) + _mkdir(job_root / "config") + _mkdir(job_root / "logs") + _mkdir(job_root / "debug") + _mkdir(job_root / "checkpoints") + + _write_text(job_root / "config" / "ray_config.yaml", dump_yaml(self.cfg.to_public_dict())) + _write_text(job_root / "config" / "taskspec.yaml", dump_yaml(spec.to_public_dict())) + _write_json( + job_root / "config" / "submit_payload.json", + { + "submission_id": submission_id, + "address": self.cfg.address, + "entrypoint": entrypoint, + "entrypoint_num_cpus": self.cfg.entrypoint_num_cpus, + "entrypoint_resources": self.cfg.entrypoint_resources, + "runtime_env": runtime_env, + }, + ) + + # Pre-submit debug 
+        # Pre-submit debug snapshot (ray cluster resources via ray.init)
+        try:
+            ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False)
+            _write_json(job_root / "debug" / "ray_cluster_resources_pre.json", ray.cluster_resources())
+            _write_json(job_root / "debug" / "ray_available_resources_pre.json", ray.available_resources())
+        except Exception as e:
+            _write_text(job_root / "debug" / "ray_resources_pre.error.txt", repr(e) + "\n")
+
+        try:
+            submitted = self.client.submit_job(
+                entrypoint=entrypoint,
+                submission_id=submission_id,
+                runtime_env=runtime_env,
+                entrypoint_num_cpus=self.cfg.entrypoint_num_cpus,
+                entrypoint_resources=self.cfg.entrypoint_resources,
+            )
+        except Exception as e:
+            _write_text(job_root / "logs" / "submit.error.txt", repr(e) + "\n")
+            raise
+
+        _write_text(job_root / "config" / "ray_submission_id.txt", submitted + "\n")
+
+        # Post-submit debug snapshot via SDK
+        try:
+            jobs = self.client.list_jobs()
+            _write_text(
+                job_root / "debug" / "ray_job_list_post.json",
+                json.dumps([_job_details_to_dict(j) for j in jobs], indent=2) + "\n",
+            )
+        except Exception as e:
+            _write_text(job_root / "debug" / "ray_job_list_post.error.txt", repr(e) + "\n")
+
+        if not no_wait:
+            # Waiting/tailing is not implemented for advanced jobs; the scheduler
+            # submits with no_wait=True and polls status separately.
+            pass
+
+        return submitted

     def status(self, submission_id: str) -> str:
         return str(self.client.get_job_status(submission_id))
diff --git a/src/mvp/py/argus/service/advanced_command.py b/src/mvp/py/argus/service/advanced_command.py
new file mode 100644
index 0000000..4ee699a
--- /dev/null
+++ b/src/mvp/py/argus/service/advanced_command.py
@@ -0,0 +1,75 @@
+from __future__ import annotations
+
+import re
+
+
+_RE_PRIVATE_USER = re.compile(r"/private/users/([^/\s\"']+)")
+_RE_TRAIN_FILES = re.compile(r"data\.train_files=([^\s\\]+)")
+_RE_VAL_FILES = re.compile(r"data\.val_files=([^\s\\]+)")
+_RE_REWARD_PATH = re.compile(r"custom_reward_function\.path=([^\s\\]+)")
+
+
+def _strip_quotes(s: str) -> str:
+    s = s.strip()
+    if len(s) >= 2 and s[0] == s[-1] and s[0] in ("'", '"'):
+        return s[1:-1]
+    return s
+
+
+def expand_advanced_command(*, user_id: str, command: str) -> str:
+    """
+    Expand user-facing macros into real container paths.
+
+    Important: `$HOME/common/...` is a SFTPGo virtual folder; the training process
+    cannot see it. We map those paths to real shared mounts.
+    """
+
+    home = f"/private/users/{user_id}"
+    expanded = command
+    expanded = expanded.replace("$HOME/common/datasets", "/private/datasets")
+    expanded = expanded.replace("$HOME/common/hf", "/private/hf")
+    expanded = expanded.replace("$HOME", home)
+    return expanded
+
+
+def validate_advanced_command(*, user_id: str, expanded_command: str) -> None:
+    """
+    Best-effort validation for advanced commands.
+
+    Strong constraints (raise ValueError on failure):
+    - Must include a python3 VERL trainer entrypoint
+    - Must not reference other users' /private/users/<user_id>/... paths
+    - If train/val files are present, they must be under allowed roots
+    - If a reward path is present, it must be under /private/users/<user_id>/code/...
+    """
+
+    if "python3" not in expanded_command or "-m verl.trainer." not in expanded_command:
+        raise ValueError("command must include 'python3' and '-m verl.trainer.'")
+
+    for other in _RE_PRIVATE_USER.findall(expanded_command):
+        if other != user_id:
+            raise ValueError(f"command references another user's path: /private/users/{other}/...")
+
+    allowed_dataset_prefixes = (
+        f"/private/users/{user_id}/datasets/",
+        "/private/datasets/",
+        # Backward compatible with v3.0 paths used by existing templates/users.
+ "/private/common/datasets/", + ) + + for m in _RE_TRAIN_FILES.finditer(expanded_command): + path = _strip_quotes(m.group(1)) + if not path.startswith(allowed_dataset_prefixes): + raise ValueError(f"train_file must be under allowed roots, got: {path}") + + for m in _RE_VAL_FILES.finditer(expanded_command): + path = _strip_quotes(m.group(1)) + if not path.startswith(allowed_dataset_prefixes): + raise ValueError(f"val_file must be under allowed roots, got: {path}") + + for m in _RE_REWARD_PATH.finditer(expanded_command): + path = _strip_quotes(m.group(1)) + allow = f"/private/users/{user_id}/code/" + if not path.startswith(allow): + raise ValueError(f"reward path must be under {allow}, got: {path}") + diff --git a/src/mvp/py/argus/service/app.py b/src/mvp/py/argus/service/app.py index 6d9c0f9..7bcb2f0 100644 --- a/src/mvp/py/argus/service/app.py +++ b/src/mvp/py/argus/service/app.py @@ -9,8 +9,9 @@ import yaml from fastapi import FastAPI, HTTPException, Request, Response from argus.core.ids import new_task_id -from argus.ray.models import JobSpec, RayConfig +from argus.ray.models import AdvancedTaskSpec, JobSpec, RayConfig, parse_taskspec +from .advanced_command import expand_advanced_command, validate_advanced_command from .config import V2Config from .db import Db from .janitor import JobsJanitor @@ -316,23 +317,52 @@ def create_app(config_path: str) -> FastAPI: async def submit_task(req: Request) -> dict[str, Any]: subject = _auth(req) body = (await req.body()).decode("utf-8") - obj = yaml.safe_load(body) or {} + try: + obj = yaml.safe_load(body) or {} + except Exception as e: + # yaml.YAMLError and similar should be a user-facing 400, not a 500. + raise HTTPException(status_code=400, detail=f"invalid YAML: {e!r}") if not isinstance(obj, dict): raise HTTPException(status_code=400, detail="jobspec must be a YAML mapping") try: - spec = JobSpec.from_dict(obj) + spec = parse_taskspec(obj) except Exception as e: raise HTTPException(status_code=400, detail=f"invalid jobspec: {e!r}") - # v3.0 path policy: + root = ray_cfg.shared_root.rstrip("/") + user_id = str(subject["user_id"]).strip() + task_id = new_task_id(str(spec.workload), user_id=user_id) + + # v3.5: Advanced TaskSpec (custom command) + if isinstance(spec, AdvancedTaskSpec): + expanded = expand_advanced_command(user_id=user_id, command=spec.command) + try: + validate_advanced_command(user_id=user_id, expanded_command=expanded) + except Exception as e: + raise HTTPException(status_code=400, detail=f"invalid command: {e!r}") + + stored = spec.to_public_dict() + stored["command"] = expanded + stored_yaml = yaml.safe_dump(stored, sort_keys=False) + + db.create_task_v25( + task_id=task_id, + user_id=user_id, + workload=spec.workload, + jobspec_yaml=stored_yaml, + nnodes=spec.nnodes, + n_gpus_per_node=spec.n_gpus_per_node, + ) + return {"task_id": task_id, "state": "QUEUED"} + + # v3.0 path policy (Basic JobSpec): # - code_path: only allow /private/common/... # - train/val: allow /private/common/datasets/... OR /private/users//datasets/... # - model_id: if it looks like a local path (/private/...), allow only models dirs: # /private/common/models/... OR /private/users//models/... 
-        root = ray_cfg.shared_root.rstrip("/")
         common_prefix = f"{root}/common/"
-        user_prefix = f"{root}/users/{str(subject['user_id']).strip()}/"
+        user_prefix = f"{root}/users/{user_id}/"

         common_datasets_prefix = f"{common_prefix}datasets/"
         user_datasets_prefix = f"{user_prefix}datasets/"
@@ -360,15 +390,17 @@
                     detail=f"model_id local path must start with {common_models_prefix} or {user_models_prefix}",
                 )

-        task_id = new_task_id(spec.workload, user_id=str(subject["user_id"]))
-        db.create_task_v25(
-            task_id=task_id,
-            user_id=str(subject["user_id"]),
-            workload=spec.workload,
-            jobspec_yaml=body,
-            nnodes=spec.nnodes,
-            n_gpus_per_node=spec.n_gpus_per_node,
-        )
+        try:
+            db.create_task_v25(
+                task_id=task_id,
+                user_id=user_id,
+                workload=spec.workload,
+                jobspec_yaml=body,
+                nnodes=spec.nnodes,
+                n_gpus_per_node=spec.n_gpus_per_node,
+            )
+        except Exception as e:
+            raise HTTPException(status_code=500, detail=f"db create task failed: {e!r}")
         return {"task_id": task_id, "state": "QUEUED"}

     @app.get("/api/v2/tasks/{task_id}")
@@ -428,7 +460,7 @@
             obj = yaml.safe_load(raw) or {}
             if not isinstance(obj, dict):
                 raise ValueError("jobspec must be a YAML mapping")
-            spec = JobSpec.from_dict(obj)
+            spec = parse_taskspec(obj)
             resolved = spec.to_public_dict()
             attempts = db.list_attempts(task_id)
             if attempts:
diff --git a/src/mvp/py/argus/service/scheduler.py b/src/mvp/py/argus/service/scheduler.py
index 0e57253..c9b975e 100644
--- a/src/mvp/py/argus/service/scheduler.py
+++ b/src/mvp/py/argus/service/scheduler.py
@@ -9,7 +9,7 @@
 import yaml

 from argus.core.ids import attempt_submission_id
-from argus.ray.models import JobSpec, RayConfig
+from argus.ray.models import AdvancedTaskSpec, JobSpec, RayConfig, parse_taskspec
 from argus.ray.ray_job_tool import RayJobTool

 from .config import V2Config
@@ -48,11 +48,11 @@
             required = float(nnodes * n_gpus_per_node)
             return avail.total_available_gpus >= required

-    def _parse_jobspec(self, jobspec_yaml: str) -> JobSpec:
+    def _parse_taskspec(self, jobspec_yaml: str) -> JobSpec | AdvancedTaskSpec:
         obj = yaml.safe_load(jobspec_yaml) or {}
         if not isinstance(obj, dict):
             raise ValueError("jobspec must be a YAML mapping")
-        return JobSpec.from_dict(obj)
+        return parse_taskspec(obj)

     def _submit_one(self, task_row: dict[str, Any]) -> None:
         task_id = str(task_row["task_id"])
@@ -60,7 +60,7 @@
         user_id = task_row.get("user_id")
         user_id_s = str(user_id) if user_id not in (None, "") else None

-        spec = self._parse_jobspec(jobspec_yaml)
+        spec = self._parse_taskspec(jobspec_yaml)
         attempt_no = int(task_row.get("latest_attempt_no", 0)) + 1
         ray_sid = attempt_submission_id(task_id, attempt_no)
         job_dir = self._job_dir_for_task(user_id=user_id_s, ray_submission_id=ray_sid)
@@ -69,13 +69,20 @@
         self.db.create_attempt(task_id=task_id, attempt_no=attempt_no, ray_submission_id=ray_sid)
         self.db.set_task_state(task_id=task_id, state="SUBMITTING", latest_attempt_no=attempt_no)

-        # Override submission_id in jobspec (v1.1 compatible)
-        d = spec.to_public_dict()
-        d["submission_id"] = ray_sid
-        spec2 = JobSpec.from_dict(d)
+        # Override submission_id in taskspec (v1.1 compatible)
+        if isinstance(spec, JobSpec):
+            d = spec.to_public_dict()
+            d["submission_id"] = ray_sid
+            spec2 = JobSpec.from_dict(d)
+            submit = lambda: self.tool.submit(spec2, no_wait=True, job_dir=job_dir)
+        else:
+            d = spec.to_public_dict()
+            d["submission_id"] = ray_sid
+            spec2 = AdvancedTaskSpec.from_dict(d)
+            submit = lambda: self.tool.submit_advanced(spec2, no_wait=True, job_dir=job_dir, user_id=user_id_s)

         try:
-            submitted = self.tool.submit(spec2, no_wait=True, job_dir=job_dir)
+            submitted = submit()
             # submitted should equal ray_sid; keep as source of truth.
             self.db.update_attempt(task_id=task_id, attempt_no=attempt_no, ray_status="SUBMITTED")
             self.db.set_task_state(task_id=task_id, state="SUBMITTED")
diff --git a/src/mvp/py/argus/service/ui.py b/src/mvp/py/argus/service/ui.py
index f8172f2..133a3a8 100644
--- a/src/mvp/py/argus/service/ui.py
+++ b/src/mvp/py/argus/service/ui.py
@@ -329,16 +329,71 @@
 val_file: /private/common/datasets/gsm8k_sft/test.parquet  # validation data (required)
 # trainer_device: cpu        # SFT only: driver-side device (optional, default: cpu)
 # submission_id: ""          # Ray submission_id (optional, default empty; usually generated by the service, no need to fill in)
 """.strip()

+    adv = """# Advanced TaskSpec (YAML) - v3.5
+kind: advanced  # Task type (required): advanced (custom command)
+# Note: in v3.5 Advanced tasks are not classified as ppo/grpo/sft; the platform files them all under "advanced" for classification and task_id naming.
+
+nnodes: 2  # Number of training nodes (required): used for queue scheduling and resource pre-checks
+n_gpus_per_node: 4  # GPUs per node (required): used for queue scheduling and resource pre-checks
+
+# Custom training command (required): the platform substitutes the $HOME macros:
+# - $HOME -> /private/users/<user_id>
+# - $HOME/common/datasets -> /private/datasets (shared read-only data)
+# - $HOME/common/hf -> /private/hf (shared read-only HF cache)
+command: |
+  # Note: PPO needs a few key overrides, otherwise VERL fails fast before startup (e.g. the actor micro batch size).
+  PYTHONUNBUFFERED=1 \
+  python3 -m verl.trainer.main_ppo \
+  data.train_files=$HOME/common/datasets/gsm8k/train.parquet \
+  data.val_files=$HOME/common/datasets/gsm8k/test.parquet \
+  data.train_batch_size=256 \
+  data.max_prompt_length=512 \
+  data.max_response_length=512 \
+  actor_rollout_ref.model.path=Qwen/Qwen2.5-0.5B-Instruct \
+  actor_rollout_ref.actor.optim.lr=1e-6 \
+  actor_rollout_ref.actor.ppo_mini_batch_size=64 \
+  actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
+  actor_rollout_ref.rollout.name=sglang \
+  actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \
+  actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
+  actor_rollout_ref.rollout.gpu_memory_utilization=0.4 \
+  actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4 \
+  critic.optim.lr=1e-5 \
+  critic.model.path=Qwen/Qwen2.5-0.5B-Instruct \
+  critic.ppo_micro_batch_size_per_gpu=4 \
+  algorithm.kl_ctrl.kl_coef=0.001 \
+  trainer.logger=console \
+  trainer.val_before_train=False \
+  trainer.nnodes=2 \
+  trainer.n_gpus_per_node=4 \
+  trainer.total_epochs=1 \
+  trainer.total_training_steps=10 \
+  trainer.save_freq=10 \
+  trainer.test_freq=-1 \
+  trainer.resume_mode=disable \
+  trainer.default_local_dir=checkpoints \
+  +ray_kwargs.ray_init.address=auto \
+  hydra.run.dir=logs/hydra
+
+# Optional: custom reward (approach A: written directly in the command)
+# add the following overrides to the command:
+# custom_reward_function.path=$HOME/code/reward.py
+# custom_reward_function.name=compute_score
+""".strip()

     body = f"""

New Task
-    Paste TaskSpec YAML and submit to API server. Note: code_path is required (v3.0 does not execute user code; use the common snapshot).
+    Paste TaskSpec YAML and submit to API server.
+    Basic tasks require code_path; Advanced tasks use kind: advanced with a custom command.
@@ -354,6 +409,7 @@ val_file: /private/common/datasets/gsm8k_sft/test.parquet  # validation data (required)
     tpl_ppo = json.dumps(ppo)
     tpl_grpo = json.dumps(grpo)
     tpl_sft = json.dumps(sft)
+    tpl_adv = json.dumps(adv)
     script = (
         """
 const msg = document.getElementById("msg");
@@ -361,9 +417,11 @@
 const yamlEl = document.getElementById("yaml");
 const TPL_PPO = __TPL_PPO__;
 const TPL_GRPO = __TPL_GRPO__;
 const TPL_SFT = __TPL_SFT__;
+const TPL_ADV = __TPL_ADV__;
 document.getElementById("tpl-ppo").onclick = () => { yamlEl.value = TPL_PPO; msg.textContent = ""; };
 document.getElementById("tpl-grpo").onclick = () => { yamlEl.value = TPL_GRPO; msg.textContent = ""; };
 document.getElementById("tpl-sft").onclick = () => { yamlEl.value = TPL_SFT; msg.textContent = ""; };
+document.getElementById("tpl-adv").onclick = () => { yamlEl.value = TPL_ADV; msg.textContent = ""; };
 document.getElementById("submit").onclick = async () => {
   msg.textContent = "Submitting...";
   const body = yamlEl.value;
@@ -378,6 +436,7 @@
         .replace("__TPL_PPO__", tpl_ppo)
         .replace("__TPL_GRPO__", tpl_grpo)
         .replace("__TPL_SFT__", tpl_sft)
+        .replace("__TPL_ADV__", tpl_adv)
     )
     return HTMLResponse(content=_page("New Task", "new", body, script))
diff --git a/src/mvp/py/tests/test_advanced_command.py b/src/mvp/py/tests/test_advanced_command.py
new file mode 100644
index 0000000..118e848
--- /dev/null
+++ b/src/mvp/py/tests/test_advanced_command.py
@@ -0,0 +1,74 @@
+from __future__ import annotations
+
+import pytest
+
+from argus.service.advanced_command import expand_advanced_command, validate_advanced_command
+
+
+def test_expand_maps_home_common_datasets_to_private_datasets():
+    cmd = "python3 -m verl.trainer.main_ppo data.train_files=$HOME/common/datasets/gsm8k/train.parquet"
+    expanded = expand_advanced_command(user_id="alice", command=cmd)
+    assert "/private/datasets/gsm8k/train.parquet" in expanded
+    assert "$HOME/common/datasets" not in expanded
+
+
+def test_expand_maps_home_common_hf_to_private_hf():
+    cmd = "python3 -m verl.trainer.main_ppo HF_HOME=$HOME/common/hf"
+    expanded = expand_advanced_command(user_id="alice", command=cmd)
+    assert "HF_HOME=/private/hf" in expanded
+
+
+def test_expand_maps_home_to_private_users():
+    cmd = "python3 -m verl.trainer.main_ppo data.train_files=$HOME/datasets/x.parquet"
+    expanded = expand_advanced_command(user_id="alice", command=cmd)
+    assert "data.train_files=/private/users/alice/datasets/x.parquet" in expanded
+
+
+def test_validate_requires_verl_trainer_entry():
+    with pytest.raises(ValueError, match="verl\\.trainer"):
+        validate_advanced_command(user_id="alice", expanded_command="echo hi")
+
+
+def test_validate_rejects_cross_user_paths():
+    cmd = "python3 -m verl.trainer.main_ppo x=/private/users/bob/datasets/x"
+    with pytest.raises(ValueError, match="another user's path"):
+        validate_advanced_command(user_id="alice", expanded_command=cmd)
+
+
+def test_validate_allows_train_val_under_allowed_roots():
+    cmd = (
+        "python3 -m verl.trainer.main_ppo "
+        "data.train_files=/private/datasets/gsm8k/train.parquet "
+        "data.val_files=/private/users/alice/datasets/gsm8k/test.parquet"
+    )
+    validate_advanced_command(user_id="alice", expanded_command=cmd)
+
+
+def test_validate_rejects_train_val_outside_allowed_roots():
+    cmd = (
+        "python3 -m verl.trainer.main_ppo "
+        "data.train_files=/etc/passwd "
+        "data.val_files=/private/datasets/gsm8k/test.parquet"
+    )
+    with pytest.raises(ValueError, match="train_file must be under allowed roots"):
+        validate_advanced_command(user_id="alice", expanded_command=cmd)
+
+
+def test_validate_allows_reward_path_under_user_code():
+    cmd = (
+        "python3 -m verl.trainer.main_ppo "
+        "custom_reward_function.path=/private/users/alice/code/reward.py "
+        "custom_reward_function.name=compute_score"
+    )
+    validate_advanced_command(user_id="alice", expanded_command=cmd)
+
+
+def test_validate_rejects_reward_path_outside_user_code():
+    cmd = (
+        "python3 -m verl.trainer.main_ppo "
+        "custom_reward_function.path=/private/datasets/reward.py "
+        "custom_reward_function.name=compute_score"
+    )
+    with pytest.raises(ValueError, match="reward path must be under"):
+        validate_advanced_command(user_id="alice", expanded_command=cmd)
diff --git a/src/mvp/py/tests/test_app.py b/src/mvp/py/tests/test_app.py
index 175a89b..fd97d19 100644
--- a/src/mvp/py/tests/test_app.py
+++ b/src/mvp/py/tests/test_app.py
@@ -185,6 +185,29 @@ def test_submit_rejects_non_mapping_jobspec(tmp_path: Path, monkeypatch):
     assert r.status_code == 400

+
+def test_submit_rejects_invalid_yaml_returns_400(tmp_path: Path, monkeypatch):
+    from argus.service import app as app_mod
+
+    cfg_path = _write_config(tmp_path)
+    monkeypatch.setenv("MVP_INTERNAL_TOKEN", "token1")
+
+    class _Scheduler:
+        def __init__(self, **kwargs):
+            self.tool = object()
+
+        def run_forever(self, stop_flag):
+            return None
+
+    monkeypatch.setattr(app_mod, "Scheduler", _Scheduler)
+    app = app_mod.create_app(str(cfg_path))
+
+    with TestClient(app) as c:
+        # YAML parse error: unclosed quote
+        r = c.post("/api/v2/tasks", headers={"authorization": "Bearer token1"}, data='workload: "ppo\n')
+        assert r.status_code == 400
+        assert "invalid YAML" in r.text
+
+
 def test_submit_rejects_invalid_jobspec(tmp_path: Path, monkeypatch):
     from argus.service import app as app_mod

@@ -206,6 +229,99 @@
     assert r.status_code == 400

+
+def test_submit_advanced_ok_and_command_expanded(tmp_path: Path, monkeypatch):
+    from argus.service import app as app_mod
+
+    cfg_path = _write_config(tmp_path)
+    monkeypatch.setenv("MVP_INTERNAL_TOKEN", "admin-token")
+    monkeypatch.setattr(app_mod, "new_task_id", lambda workload, **kwargs: "tid_adv")
+
+    class _Scheduler:
+        def __init__(self, **kwargs):
+            self.tool = object()
+
+        def run_forever(self, stop_flag):
+            return None
+
+    monkeypatch.setattr(app_mod, "Scheduler", _Scheduler)
+    app = app_mod.create_app(str(cfg_path))
+
+    # seed user + token
+    from argus.service.config import V2Config
+    from argus.service.db import Db
+
+    root = yaml.safe_load(cfg_path.read_text(encoding="utf-8"))
+    v2_cfg = V2Config.from_root_dict(root)
+    db = Db(v2_cfg.sqlite.db_path)
+    db.create_user(user_id="u1", display_name=None)
+    token = db.issue_token(user_id="u1")
+
+    spec = """
+kind: advanced
+workload: ppo
+nnodes: 2
+n_gpus_per_node: 4
+command: |
+  python3 -m verl.trainer.main_ppo \\
+  data.train_files=$HOME/common/datasets/gsm8k/train.parquet \\
+  data.val_files=$HOME/datasets/gsm8k/test.parquet \\
+  +ray_kwargs.ray_init.address=auto
+""".lstrip()
+
+    with TestClient(app) as c:
+        r = c.post("/api/v2/tasks", headers={"authorization": f"Bearer {token}"}, data=spec)
+        assert r.status_code == 200
+        assert r.json()["task_id"] == "tid_adv"
+
+        r2 = c.get("/api/v2/tasks/tid_adv/spec", headers={"authorization": f"Bearer {token}"})
+        assert r2.status_code == 200
+        assert "kind: advanced" in r2.text
+        # Expanded mapping for SFTPGo "virtual common" folders:
+        assert "/private/datasets/gsm8k/train.parquet" in r2.text
assert "/private/users/u1/datasets/gsm8k/test.parquet" in r2.text + + +def test_submit_advanced_rejects_invalid_command(tmp_path: Path, monkeypatch): + from argus.service import app as app_mod + + cfg_path = _write_config(tmp_path) + monkeypatch.setenv("MVP_INTERNAL_TOKEN", "admin-token") + + class _Scheduler: + def __init__(self, **kwargs): + self.tool = object() + + def run_forever(self, stop_flag): + return None + + monkeypatch.setattr(app_mod, "Scheduler", _Scheduler) + app = app_mod.create_app(str(cfg_path)) + + # seed user + token + from argus.service.config import V2Config + from argus.service.db import Db + + root = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) + v2_cfg = V2Config.from_root_dict(root) + db = Db(v2_cfg.sqlite.db_path) + db.create_user(user_id="u1", display_name=None) + token = db.issue_token(user_id="u1") + + bad = """ +kind: advanced +workload: ppo +nnodes: 2 +n_gpus_per_node: 4 +command: | + echo hello +""".lstrip() + + with TestClient(app) as c: + r = c.post("/api/v2/tasks", headers={"authorization": f"Bearer {token}"}, data=bad) + assert r.status_code == 400 + assert "invalid command" in r.text + + def test_me_sftp_reset_password_disabled_returns_400(tmp_path: Path, monkeypatch): from argus.service import app as app_mod diff --git a/src/mvp/py/tests/test_builders.py b/src/mvp/py/tests/test_builders.py index 8704836..74f48d7 100644 --- a/src/mvp/py/tests/test_builders.py +++ b/src/mvp/py/tests/test_builders.py @@ -56,3 +56,10 @@ def test_build_training_argv_unsupported_raises(): spec = _base_spec("bad") with pytest.raises(ValueError, match="unsupported workload"): build_training_argv(spec, submission_id="sid", job_dir="/job") + + +def test_build_advanced_argv_uses_bash_lc(): + from argus.ray.builders import build_advanced_argv + + built = build_advanced_argv("echo hi") + assert built.argv[:2] == ["bash", "-lc"] diff --git a/src/mvp/py/tests/test_driver_entrypoint.py b/src/mvp/py/tests/test_driver_entrypoint.py index 34046c6..4902bbf 100644 --- a/src/mvp/py/tests/test_driver_entrypoint.py +++ b/src/mvp/py/tests/test_driver_entrypoint.py @@ -18,8 +18,10 @@ def test_driver_entrypoint_strips_double_dash_and_returns_code(monkeypatch, tmp_ returncode = 7 monkeypatch.setattr(mod.subprocess, "run", lambda cmd, check: _Proc()) + got = {} + monkeypatch.setattr(mod.os, "chdir", lambda p: got.update({"cwd": str(p)})) monkeypatch.setattr(sys, "argv", ["x", "--job-dir", str(tmp_path), "--", "echo", "hi"]) assert mod.main() == 7 assert (tmp_path).exists() - + assert got["cwd"] == str(tmp_path) diff --git a/src/mvp/py/tests/test_models.py b/src/mvp/py/tests/test_models.py index 68a4fd0..52406e9 100644 --- a/src/mvp/py/tests/test_models.py +++ b/src/mvp/py/tests/test_models.py @@ -31,6 +31,7 @@ def test_ray_config_from_dict_new_format_and_defaults(): assert cfg.entrypoint_resources["worker_node"] == 1.0 assert cfg.runtime_env_env_vars["HF_ENDPOINT"] == "x" assert cfg.user_code_path == "/private/user/code" + assert cfg.verl_code_path == "/private/common/code/verl/verl_repo" public = cfg.to_public_dict() assert public["runtime_env"]["env_vars"]["HF_ENDPOINT"] == "x" @@ -106,3 +107,50 @@ def test_jobspec_unsupported_workload(): JobSpec.from_dict( {"workload": "nope", "code_path": "x", "model_id": "m", "train_file": "t", "val_file": "v"} ) + + +def test_parse_taskspec_basic_no_kind_compat(): + from argus.ray.models import JobSpec, parse_taskspec + + got = parse_taskspec( + { + "workload": "ppo", + "code_path": "/code", + "model_id": "m", + "train_file": "train.jsonl", + 
"val_file": "val.jsonl", + } + ) + assert isinstance(got, JobSpec) + assert got.workload == "ppo" + + +def test_parse_taskspec_advanced_smoke(): + from argus.ray.models import AdvancedTaskSpec, parse_taskspec + + got = parse_taskspec( + { + "kind": "advanced", + "nnodes": 2, + "n_gpus_per_node": 4, + "command": "python3 -m verl.trainer.main_ppo +ray_kwargs.ray_init.address=auto", + } + ) + assert isinstance(got, AdvancedTaskSpec) + assert got.kind == "advanced" + assert got.workload == "advanced" + assert got.nnodes == 2 + assert got.n_gpus_per_node == 4 + + +def test_parse_taskspec_advanced_requires_command_nnodes_gpus(): + from argus.ray.models import parse_taskspec + + with pytest.raises(ValueError, match="missing required field: command"): + parse_taskspec({"kind": "advanced", "nnodes": 1, "n_gpus_per_node": 1}) + + with pytest.raises(ValueError, match="missing required field: nnodes"): + parse_taskspec({"kind": "advanced", "n_gpus_per_node": 1, "command": "python3 -m verl.trainer.main_ppo"}) + + with pytest.raises(ValueError, match="missing required field: n_gpus_per_node"): + parse_taskspec({"kind": "advanced", "nnodes": 1, "command": "python3 -m verl.trainer.main_ppo"}) diff --git a/src/mvp/py/tests/test_ray_job_tool.py b/src/mvp/py/tests/test_ray_job_tool.py index 15390cf..562392f 100644 --- a/src/mvp/py/tests/test_ray_job_tool.py +++ b/src/mvp/py/tests/test_ray_job_tool.py @@ -161,3 +161,58 @@ def test_submit_error_writes_file_then_reraises(tmp_path: Path, monkeypatch): err = tmp_path / "jobs" / "sid2" / "logs" / "submit.error.txt" assert err.exists() + + +def test_submit_advanced_writes_artifacts_and_returns_submission_id(tmp_path: Path, monkeypatch): + from argus.ray import ray_job_tool as mod + + class _FakeClient: + def __init__(self, address: str): + self.address = address + self.last_submit_kwargs = None + + def submit_job(self, **kwargs): + self.last_submit_kwargs = dict(kwargs) + return str(kwargs["submission_id"]) + + def list_jobs(self): + class X: + def dict(self): + return {"ok": True} + + return [X()] + + monkeypatch.setattr(mod, "JobSubmissionClient", _FakeClient) + monkeypatch.setattr(mod.ray, "init", lambda **kwargs: (_ for _ in ()).throw(RuntimeError("no ray"))) + + cfg = RayConfig.from_dict( + { + "address": "http://127.0.0.1:8265", + "shared_root": str(tmp_path), + "entrypoint_resources": {"worker_node": 1}, + "runtime_env": {"env_vars": {}}, + } + ) + spec = mod.AdvancedTaskSpec.from_dict( + { + "kind": "advanced", + "submission_id": "sid3", + "nnodes": 2, + "n_gpus_per_node": 4, + "command": "echo hi", + } + ) + + tool = mod.RayJobTool(cfg) + submitted = tool.submit_advanced(spec, no_wait=True, user_id="alice") + assert submitted == "sid3" + + job_root = tmp_path / "jobs" / "sid3" + assert (job_root / "config" / "ray_config.yaml").exists() + assert (job_root / "config" / "taskspec.yaml").exists() + assert (job_root / "config" / "ray_submission_id.txt").read_text(encoding="utf-8").strip() == "sid3" + + payload = json.loads((job_root / "config" / "submit_payload.json").read_text(encoding="utf-8")) + assert payload["submission_id"] == "sid3" + assert "argus.ray.driver_entrypoint" in payload["entrypoint"] + assert (job_root / "debug" / "ray_resources_pre.error.txt").exists() diff --git a/src/mvp/py/tests/test_scheduler.py b/src/mvp/py/tests/test_scheduler.py index 59c016a..b6c929a 100644 --- a/src/mvp/py/tests/test_scheduler.py +++ b/src/mvp/py/tests/test_scheduler.py @@ -85,6 +85,67 @@ def test_tick_submits_one_task(monkeypatch, tmp_path: Path): assert 
+
+
+def test_tick_submits_one_task_advanced(monkeypatch, tmp_path: Path):
+    from argus.service import scheduler as sched_mod
+
+    ray_cfg, v2_cfg = _mk_cfg(tmp_path)
+    db = Db(v2_cfg.sqlite.db_path)
+    db.init()
+    db.create_task_v25(
+        task_id="t1",
+        user_id="alice",
+        workload="advanced",
+        jobspec_yaml=yaml.safe_dump(
+            {
+                "kind": "advanced",
+                "nnodes": 2,
+                "n_gpus_per_node": 4,
+                "command": "python3 -m verl.trainer.main_ppo +ray_kwargs.ray_init.address=auto",
+            }
+        ),
+        nnodes=2,
+        n_gpus_per_node=4,
+    )
+
+    monkeypatch.setattr(sched_mod, "ensure_ray_connected", lambda: None)
+    monkeypatch.setattr(
+        sched_mod,
+        "get_cluster_available",
+        lambda: SimpleNamespace(total_available_gpus=999.0, total_available_npus=0.0),
+    )
+
+    class _Tool:
+        def __init__(self, cfg):
+            self.submitted = []
+            self.job_dirs = []
+
+        def submit(self, spec, no_wait: bool, job_dir: str | None = None):
+            raise AssertionError("should not call submit() for advanced")
+
+        def submit_advanced(self, spec, no_wait: bool, job_dir: str | None = None, user_id: str | None = None):
+            self.submitted.append(spec.submission_id)
+            self.job_dirs.append(job_dir)
+            return str(spec.submission_id)
+
+        def status(self, submission_id: str):
+            return "RUNNING"
+
+        def logs(self, submission_id: str):
+            return ""
+
+    monkeypatch.setattr(sched_mod, "RayJobTool", _Tool)
+
+    s = sched_mod.Scheduler(db=db, ray_cfg=ray_cfg, v2_cfg=v2_cfg)
+    s.tick()
+
+    row = db.get_task("t1")
+    assert row and row["state"] == "SUBMITTED"
+    attempts = db.list_attempts("t1")
+    assert len(attempts) == 1
+    assert attempts[0]["ray_submission_id"] == "t1--a01"
+    assert s.tool.job_dirs[-1] == "/private/users/alice/jobs/t1--a01"
+
+
 def test_tick_marks_pending_resources(monkeypatch, tmp_path: Path):
     from argus.service import scheduler as sched_mod
diff --git a/src/mvp/py/tests/test_ui.py b/src/mvp/py/tests/test_ui.py
index facf6b8..24d306d 100644
--- a/src/mvp/py/tests/test_ui.py
+++ b/src/mvp/py/tests/test_ui.py
@@ -78,3 +78,18 @@
     assert f"/ui/tasks/{task_id}/logs" in r.text
     assert "TaskSpec (YAML)" in r.text
     assert "/api/v2/tasks/" in r.text
+
+
+def test_ui_new_task_contains_advanced_example_snippet(tmp_path, monkeypatch):
+    cfg = _write_config(tmp_path)
+    monkeypatch.setenv("MVP_INTERNAL_TOKEN", "admin-token")
+    app = create_app(str(cfg))
+    c = TestClient(app)
+
+    r = c.get("/ui/tasks/new")
+    assert r.status_code == 200
+    # Ensure the Advanced example includes the required PPO micro-batch override (a common failure mode if omitted).
+    assert "kind: advanced" in r.text
+    # workload is not needed for advanced in v3.5.
+    assert "# workload:" not in r.text
+    assert "actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu" in r.text