V3.6: add a W&B server, and enable wandb by default for configured tasks

This commit is contained in:
yuyr 2026-01-05 17:04:10 +08:00
parent 70f52a907b
commit a6c66995e3
25 changed files with 5852 additions and 2223 deletions

View File

@ -0,0 +1,27 @@
# v3.6
On the fixed `/vol` data directory of wandb:

After checking the official docs and public material, the conclusion is that wandb/local (the W&B local server container) offers no official environment variable or startup flag to move the server-side persistence root away from `/vol`. The documented usage has always assumed you mount the persistence volume at the fixed in-container path `/vol`, e.g. `-v <something>:/vol`. (github.com (https://github.com/wandb/server))

Two kinds of "directory" need to be kept apart:
- Server side (the wandb/local container): the persistence directory is the fixed in-container `/vol`, which stores instance metadata, account/initialization info, etc. (the license can also be set via env, but the data directory stays `/vol`). (github.com (https://github.com/wandb/server))
- Training side (the wandb Python SDK / VERL tasks): environment variables such as `WANDB_DIR` and `WANDB_DATA_DIR` only affect files/caches the client generates locally; they do not change where the server container writes its data. (docs.wandb.ai (https://docs.wandb.ai/platform/hosting/env-vars))

So if the current constraint is "only `../../shared:/private` may be mounted, with no extra `../../shared/common/wandb:/vol` mount", the realistic ways to land the W&B server data under `shared` are:
- Customize the W&B container entrypoint (or add a wrapper): before startup, run `ln -s /private/common/wandb /vol` (or bind-mount onto `/vol`) so the service still writes to `/vol` while the data actually lands in `/private/common/wandb`. This is a container-level modification, not an official W&B option.
- If one more compose volume is allowed, the simplest route is still to keep `../../shared:/private` and additionally mount `../../shared/common/wandb:/vol`; the server then needs no modification at all.
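The symlink workaround above can be sketched as a tiny entrypoint wrapper. Paths and the final `exec` target are illustrative assumptions (this is not the stock W&B entrypoint); the demo uses a temp root instead of the real container rootfs:

```shell
#!/bin/sh
# Sketch: redirect the fixed /vol path to the shared directory before the
# server starts. ROOT stands in for "/" inside the container.
set -eu
ROOT="$(mktemp -d)"
mkdir -p "$ROOT/private/common/wandb"    # the shared-mount target
# Point /vol at the shared directory so the server still "writes /vol":
if [ ! -e "$ROOT/vol" ]; then
  ln -s "$ROOT/private/common/wandb" "$ROOT/vol"
fi
# A real wrapper would now hand off, e.g.: exec /usr/local/bin/<wandb-entrypoint> "$@"
readlink "$ROOT/vol"
```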

File diff suppressed because it is too large

View File

@ -0,0 +1,67 @@
# MVP v3.5 feature changes (relative to v3.0)
> This v3.5 round follows the confirmed, trimmed scope: **focus only on Advanced TaskSpec + Custom Reward (option A: users write overrides in `command`)**. Serving/IB/checkpoint-resume/multi-version verl and similar capabilities remain out of scope this round.
## 1. TaskSpec / task semantics
### 1.1 New Advanced TaskSpec (custom command)
- Added a `kind: advanced` TaskSpec type:
  - Users may submit an arbitrary bash `command`, no longer limited to the platform's built-in PPO/GRPO/SFT templates.
  - `workload` is no longer user-supplied or inferred; internally the platform uniformly classifies the task and names the task_id as `"advanced"` (avoiding constraints as training types grow).
- `$HOME` macro substitution (expanded server-side before submission):
  - `$HOME` → `/private/users/<user_id>`
  - `$HOME/common/datasets` → `/private/datasets`
  - `$HOME/common/hf` → `/private/hf`
- `command` validation (best-effort, aimed at trusted internal users):
  - Must contain `python3 -m verl.` (allows `verl.trainer.*` / `verl.model_merger`, etc.).
  - No hard sandbox; the goal is mainly to prevent obvious misuse from causing unpredictable behavior.
### 1.2 Custom Reward (option A)
- The platform adds no reward-specific fields and does not extend the TaskSpec schema.
- Users inject rewards by writing VERL-native overrides in `command`:
  - `custom_reward_function.path=...`
  - `custom_reward_function.name=...`
  - `custom_reward_function.reward_kwargs=...` (if needed)
- The platform side only does:
  - basic path/macro expansion ($HOME)
  - best-effort string validation (no deep AST parsing)
## 2. WebUI: New Task experience improvements, still YAML-compatible
- The `New Task` page gains a **YAML mode / form mode** toggle:
  - Form mode covers only **5 templates**: PPO / GRPO / SFT / Advanced / Model Merge.
  - Form mode renders a live YAML preview; Submit sends the generated YAML, and users can switch back to YAML mode at any time to keep editing by hand.
- `Advanced example`:
  - The example command is now multi-line and more readable.
  - Added the key overrides needed to avoid common PPO fail-fasts (e.g. actor micro batch), so users do not fail just by copying the example.
- New `Model merge example` (as an Advanced command):
  - uses `python3 -m verl.model_merger merge ...`
  - supports `$HOME/jobs/<RAY_SUBMISSION_ID>/...` for accessing training outputs.
## 3. SFTPGo / common directory readability (supporting the v3.5 $HOME/common semantics)
> These changes mainly ensure the `$HOME/common/{datasets,hf}` semantics defined in v3.5 work in the SFTPGo WebClient/clients.
- `/common/datasets` and `/common/hf` are exposed as read-only shared directories via SFTPGo virtual folders:
  - list + download are allowed (for browsing and downloading/inspecting content); upload/rename/delete remain forbidden.
  - permission rules cover subpaths (avoiding "can enter the directory but files are unreadable").
- Improved connectivity for API calls to the SFTPGo admin API:
  - In dev, avoid relying on in-container DNS (some head containers see transient resolution failures); access the admin API via the docker bridge gateway + published port instead.
  - The API startup script ensures `SFTPGO_ADMIN_PASSWORD` is injected (matching the compose default), so Reset Password no longer hits 401.
## 4. Compatibility and behavior changes
- **Fully compatible with v3.0 PPO/GRPO/SFT TaskSpec YAML** (existing fields and submission flow unchanged).
- The new capabilities do not affect ray/node management; it still works as in v3.0 (head publishes discovery; worker watchdog handles join/self-heal).
- Advanced tasks are not subject to the PPO/GRPO/SFT semantic constraints; the platform is only responsible for:
  - resource fields (`nnodes` / `n_gpus_per_node`) for queue scheduling and the submission gate
  - executing `command` as the Ray job entrypoint
## 5. Known limitations (not in v3.5)
- No "visual" reward configuration panel (option A only: users write the command themselves).
- No per-job custom verl code snapshots / multi-version coexistence (code_path selection deferred this round).
- No checkpoint-resume one-click resubmit / IB (RDMA) / model serving (planned for later versions per the roadmap).

specs/mvp/v3.6/README.md (new file, 9 lines)
View File

@ -0,0 +1,9 @@
# MVP v3.6
This directory contains the v3.6 requirements and design:
- `Snipaste_2026-01-05_10-56-34.png`: v3.6 architecture sketch (adds Weights & Biases on top of v3.5; other modules unchanged)
- `requirements.md`: requirement highlights (W&B + Evaluation template)
- `wandb.md`: early research and references for the W&B local server (license, deployment, VERL configuration notes, etc.)
- `v3.6_design.md`: detailed v3.6 design (based on v3.5)
- `v3.6_progress.md`: v3.6 milestone progress log

Binary file not shown.

Size: 70 KiB

View File

@ -0,0 +1,3 @@
1. Add wandb support
2. Add an evaluation template

View File

@ -0,0 +1,280 @@
# MVP v3.6 detailed design (based on v3.5)
> Design baseline: the environment already has the v3.5 capabilities (PPO/GRPO/SFT + Advanced TaskSpec + SFTPGo data management + WebUI).
> v3.6 architecture sketch: `specs/mvp/v3.6/Snipaste_2026-01-05_10-56-34.png`
## 0. Goals and scope
### 0.1 v3.6 goals
1) **Weights & Biases (W&B) integration**
- Training/task runs automatically report metrics to the W&B local server.
- Isolation model: a shared W&B account (the license supports only 1 user) plus one dedicated project per MVP user, to keep visualization and search usable.
- The platform provides a W&B link plus the information needed to locate each task's run (minimal closed loop).
2) **An Evaluation template on New Task**
- The New Task page offers a minimal usable "Evaluation" task template (for offline evaluation/scoring) whose results land in the job output and, optionally, W&B.
### 0.2 Non-goals (not in v3.6)
- No new node-management mechanism; keep the v3.5 head discovery + worker watchdog stateless pool.
- No Serving/IB/RDMA/checkpoint-resume/multi-version verl code_path (still per the v3.5 scope).
- No multi-tenant security isolation (W&B API key injection assumes a trusted internal environment).
---
## 1. W&B local server: deployment and connectivity
> References: `specs/mvp/v3.6/wandb.md`. **The license/token there is sensitive; the v3.6 design does not hard-code it in code or docs.**
### 1.1 Deployment (dev/h1)
Add a `wandb` service to `src/mvp/docker-compose.yaml` (same compose network as the existing ray_head / sftpgo):
- Image: `wandb/local:latest`
- Container port: `8080`
- Host port: `8090:8080` recommended (avoids clashing with MVP API `8080` and SFTPGo `8081`)
- Persistence: mount an NFS/shared directory (e.g. `/private/common/wandb`) to persist runs/artifacts
- After first startup, an admin pastes the license on the W&B System Admin page (see `wandb.md`)
### 1.1.1 Persistence policy (mandatory)
v3.6 requires that **W&B server metadata be persisted** (otherwise the license/account/API key and the historical runs/project index are lost):
- Mount the W&B server data directory (`/vol`) onto a shared (NFS) directory, e.g. `/private/common/wandb:/vol`
- This data is retained long-term by the platform/admins (it is not cleaned up with individual jobs)
By contrast, **each Ray job's local W&B run files** live under the job directory (see `WANDB_DIR` in §2.3) and are cleaned up together with the job by the existing janitor:
- `WANDB_DIR=/private/users/<user_id>/jobs/<ray_submission_id>/wandb`
- The janitor policy for jobs is "move to trash 3 days after completion, delete after 7 days", so this directory is handled along with the job
> Note: the authoritative W&B data is persisted server-side (runs/metrics/visualization); the `WANDB_DIR` under the job directory is closer to runtime cache/temporary files and debugging material.
### 1.2 base_url for reaching W&B from containers
Per v3.5 experience, the Ray head container's resolution of docker internal DNS names is flaky (`Temporary failure in name resolution`).
To avoid repeating that, v3.6 uniformly uses the **"docker bridge gateway + host published port"** route for containers to reach W&B:
- `WANDB_BASE_URL=http://172.22.0.1:8090` (where `172.22.0.1` is the `mvp_argus-ray-net` gateway)
> Note: if the dev network subnet changes in the future, make the gateway address configurable (see §2).
---
## 2. Platform-side W&B integration (API/Scheduler/Ray Job runtime_env)
### 2.1 Configuration design (YAML)
Add a W&B section to `configs/dev.yaml` (and the production config); suggested structure:
```yaml
tracking:
  wandb:
    enabled: true
    base_url: "http://172.22.0.1:8090"
    api_key_env: "WANDB_API_KEY"
    entity: ""                   # optional; verl reads it via the WANDB_ENTITY env var
    project_suffix: "_project"   # e.g. alice_project
    # optional proxy; verl supports trainer.wandb_proxy
    proxy: null
```
The platform reads the environment variable named by `api_key_env` and injects `WANDB_API_KEY` at job scope.
**The API key is never stored in plaintext in config files** (to avoid leaks).
### 2.2 Project/naming convention (shared account + per-user projects)
Because the W&B local license caps the instance at "at most 1 user", v3.6 uses:
- **one shared W&B account** (one shared `WANDB_API_KEY`)
- a separate project per MVP user:
  - `project_name = <user_id> + project_suffix`
  - example: `alice_project`
- one run per Ray job attempt:
  - `experiment_name = <ray_submission_id>` (unique per attempt; also lets you map a run back from the dashboard)
Internally verl calls `wandb.init(project=project_name, name=experiment_name, entity=$WANDB_ENTITY)` (see `verl/utils/tracking.py`).
### 2.3 Ray Job injection (runtime_env env_vars)
When `tracking.wandb.enabled=true`, the scheduler injects the following on every ray job submission:
- `WANDB_BASE_URL` (from config)
- `WANDB_API_KEY` (from the env var named by `tracking.wandb.api_key_env`)
- `WANDB_ENTITY` (optional)
- `WANDB_MODE=online` (online by default; switchable for dev/offline)
- `WANDB_DIR=/private/users/<user_id>/jobs/<ray_submission_id>/wandb` (each job's local run files land in its job directory, so the janitor cleans them up with the job)
> For Advanced TaskSpec, the platform cannot rewrite the user's command, but it still injects the env vars above so users can enable wandb themselves inside the command.
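As a sketch, the injected variables described in §2.3 can be assembled like this (the function and parameter names are assumptions for illustration, not the platform's actual code):

```python
def wandb_env_vars(user_id: str, ray_submission_id: str,
                   base_url: str, api_key: str) -> dict[str, str]:
    """Per-job W&B env vars as described above; WANDB_DIR sits inside the
    job directory so the janitor removes it together with the job."""
    return {
        "WANDB_BASE_URL": base_url,
        "WANDB_API_KEY": api_key,
        "WANDB_MODE": "online",
        "WANDB_DIR": f"/private/users/{user_id}/jobs/{ray_submission_id}/wandb",
    }

env = wandb_env_vars("alice", "mvp11_ppo_0001", "http://172.22.0.1:8090", "dummy-key")
```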
### 2.4 Automatically enabling the wandb logger for verl training tasks
For the built-in workloads (PPO/GRPO/SFT) the platform changes the hydra overrides to:
- `trainer.logger=[console,wandb]` (replacing v3.5's `trainer.logger=console`)
- `trainer.project_name=<user_id>_project`
- `trainer.experiment_name=<ray_submission_id>`
- optional: if `tracking.wandb.proxy` is configured, inject `trainer.wandb_proxy=...`
Compatibility notes:
- `trainer.logger` is already a list in verl's ppo/sft configs (the sample value is `["console","wandb"]`), so this does not break verl config parsing.
- The existing v3.5 checkpoint/log dir injection stays keyed to job_dir and does not rely on `trainer.default_local_dir`.
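A minimal sketch of the override rewrite (names are assumptions; the real builder lives in the platform's job-construction code):

```python
def wandb_overrides(user_id: str, ray_submission_id: str,
                    suffix: str = "_project") -> list[str]:
    """Hydra overrides appended for built-in workloads when W&B is enabled."""
    return [
        "trainer.logger=[console,wandb]",
        f"trainer.project_name={user_id}{suffix}",
        f"trainer.experiment_name={ray_submission_id}",
    ]

ovr = wandb_overrides("alice", "mvp11_sft_0042")
```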
### 2.5 Database storage and API output (minimal closed loop)
v3.6 must let users "click through to W&B" from the WebUI/Task detail.
Suggested attempt-level fields (or a metadata JSON) in the DB (sqlite):
- `wandb_project`: e.g. `alice_project`
- `wandb_run_name`: e.g. `<ray_submission_id>`
- `wandb_base_url`: e.g. `http://<host>:8090`
- `wandb_url`: the assembled final link (if the W&B local URL structure is unstable, store only the first three and let the frontend assemble)
API side:
- `GET /api/v2/tasks/<task_id>`: add a `wandb` field to `latest_attempt` (or a top-level `tracking` field)
- `GET /api/v2/me`: add `wandb` info (base_url, project_name) for page display ("Open W&B")
> The W&B local URL path structure needs to be confirmed during integration; if unsure, provide only base_url + project/run names and let users search in the W&B UI.
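If the frontend assembles the link itself, a deliberately conservative sketch could look like this (even the project-level path shape below is an assumption that must be confirmed against the W&B local UI, per the note above):

```python
def wandb_link(base_url: str, project: str) -> str:
    """Best-effort, project-level link only; the run is then found by name
    in the W&B UI, since run ids may differ from run names."""
    return f"{base_url.rstrip('/')}/{project}"

link = wandb_link("http://172.22.0.1:8090/", "alice_project")
```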
---
## 3. WebUI changes (v3.6)
### 3.1 Data / Login page
- Add to the Data or Login page:
  - an "Open W&B" link (to the `tracking.wandb.base_url` Web UI)
  - the current user's project name (e.g. `alice_project`) with a Copy button
### 3.2 Tasks / Task Detail
- The task list needs no major changes; the task detail page adds:
  - W&B run info (project / run name / link)
  - even for failed tasks, W&B remains useful for inspecting already-reported intermediate logs (valuable for training tasks)
### 3.3 Evaluation template on New Task
Add an "Evaluation example" to the New Task page.
#### Option A (recommended, minimal change): an Advanced TaskSpec template
- Provide a `kind: advanced` template whose command runs verl's built-in evaluation entrypoint:
  - `python3 -m verl.trainer.main_eval data.path=... custom_reward_function.path=... +ray_kwargs.ray_init.address=auto`
- Pros: no TaskSpec schema / scheduler changes needed
- Cons: evaluation stays unstructured on the platform side (still just advanced)
#### Option B (more productized): a built-in `workload: evaluation` (optional, later)
To treat evaluation as a first-class built-in task for classification/queuing, add:
```yaml
workload: evaluation
code_path: /private/common/code/verl/verl_repo
nnodes: 1
n_gpus_per_node: 0
data_path: $HOME/common/datasets/<...>.parquet
response_key: responses
data_source_key: data_source
reward_model_key: reward_model
custom_reward_path: $HOME/code/reward.py   # optional
custom_reward_name: compute_score          # optional
```
The platform compiles this into `verl.trainer.main_eval` hydra overrides plus a ray init address, and can optionally parse the results and report them to W&B.
v3.6 should land **Option A** first (template only); Option B is an evolution for v3.7+.
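Should Option B ever land, the compilation step could look roughly like this (a hypothetical sketch: field names follow the YAML above, but the function itself does not exist yet):

```python
def eval_overrides(spec: dict) -> list[str]:
    """Compile a hypothetical `workload: evaluation` spec into
    verl.trainer.main_eval hydra overrides."""
    ovr = [
        f"data.path={spec['data_path']}",
        f"data.response_key={spec.get('response_key', 'responses')}",
        "+ray_kwargs.ray_init.address=auto",
    ]
    if spec.get("custom_reward_path"):
        ovr.append(f"custom_reward_function.path={spec['custom_reward_path']}")
        ovr.append(f"custom_reward_function.name={spec.get('custom_reward_name', 'compute_score')}")
    return ovr

ovr = eval_overrides({"data_path": "/private/datasets/eval.parquet",
                      "custom_reward_path": "/private/users/alice/code/reward.py"})
```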
---
## 4. Acceptance criteria (v3.6)
### 4.1 W&B (training tasks)
- Submit any PPO/GRPO/SFT task:
  - the W&B local server shows the corresponding project (`alice_project`)
  - a run named after that attempt's `ray_submission_id` appears
  - metrics refresh continuously inside the run (at least the verl defaults such as loss/step)
- WebUI:
  - the W&B UI link opens
  - task detail shows project/run (or at least searchable information)
### 4.2 Evaluation template
- "Evaluation example" appears in New Task
- After copying and submitting the template:
  - the task runs on Ray (CPU is enough; `+ray_kwargs.ray_init.address=auto` connects to the cluster)
  - metrics are printed (`main_eval.py` prints a dict by default)
  - (optional) if the user enables wandb in the command, the evaluation run appears under the corresponding project
---
## 5. Compatibility impact
- On existing v3.5 features:
  - Default behavior changes: PPO/GRPO/SFT get an extra logger (wandb) by default and make outbound HTTP requests to the W&B server.
  - If the W&B server is unavailable or the config is missing:
    - Recommended: the platform degrades to `trainer.logger=[console]` and records a warning in the task status (training does not fail outright).
    - The first version may also fail fast (refuse to enable wandb when `WANDB_API_KEY` is missing), controlled by a config switch.
- On resources/storage:
  - The W&B server itself accumulates data (the license caps storage at 10GB); pair it with a retention policy (manual in v3.6, automated later).
  - The `WANDB_DIR` under job directories is cleaned up by jobs retention; it does not grow unbounded.
---
## 6. Confirmed decisions
- W&B external port: `8090`
- Project naming: `<user_id>_project` (`WANDB_ENTITY` is not used)
- Evaluation: ship only the **Advanced template** for now (an example on the New Task page is enough)
---
## 7. Operations and acceptance flow (dev/h1)
### 7.1 Start the services (docker compose)
1) Start/restart compose (Ray head/worker + SFTPGo + W&B):
```bash
cd /home2/argus/infra/mvp/src/mvp
docker compose up -d
```
2) Open the UIs:
- MVP WebUI: `http://<HOST>:8080/ui`
- Ray Dashboard: `http://<HOST>:8265`
- SFTPGo Web: `http://<HOST>:8081/web/`
- W&B Web: `http://<HOST>:8090`
> The W&B container data directory is mounted on the shared disk: `/private/common/wandb` (container path `/vol`).
### 7.2 Initialize W&B (one-time admin step)
1) Open `http://<HOST>:8090/system-admin` and paste the license (see `specs/mvp/v3.6/wandb.md` for details).
2) In the W&B UI, create/log into the shared account (the license supports only 1 user).
3) Generate an API key on the W&B user settings page (usually under "User Settings / API Keys").
### 7.3 Start the API server (WANDB_API_KEY must be injected)
The platform never writes `WANDB_API_KEY` into config files; it must be provided as an environment variable when the API server starts.
Example (on the host):
```bash
export MVP_INTERNAL_TOKEN="my-dev-token"
export WANDB_API_KEY="...copied from the W&B UI..."
cd /home2/argus/infra/mvp/src/mvp/scripts
./60_start_api.sh
```
> Note: `./60_start_api.sh` passes `WANDB_API_KEY` through to the API server running inside the head container.
### 7.4 Acceptance (minimal closed loop)
1) The WebUI Login page shows the W&B block (Open W&B + project copy).
2) Submit one PPO/GRPO/SFT task (any one will do):
- the W&B project is `<user_id>_project` (e.g. `alice_project`)
- the run name is that attempt's `ray_submission_id`
3) Submit the Evaluation template (Advanced): it runs on Ray and prints evaluation results (stdout / logs).

View File

@ -0,0 +1,194 @@
# MVP v3.6 development plan (TDD, based on v3.5)
> Design basis: `specs/mvp/v3.6/v3.6_design.md`
> This plan assumes the following confirmed decisions:
> 1) W&B host port: `8090`; 2) project: `<user_id>_project`; 3) evaluation ships as an Advanced template first; 4) `WANDB_ENTITY` is not used.
## Overall principles
- **TDD**: for each milestone, write the unit tests (or contract tests) before implementing; the coverage gate must not regress.
- **Minimal closed loop**: reach "usable + verifiable" first, then polish; no scope beyond v3.6.
- **No secrets on disk**: `WANDB_API_KEY` may come only from the runtime environment and is never written into repo config files.
---
## Milestones
### M1: configuration layer (tracking.wandb) and parsing
**Goal**
- Add `tracking.wandb` to the service config, supporting enabled/base_url/api_key_env.
- Do not introduce `WANDB_ENTITY` (leave it empty).
**Tasks**
- In `src/mvp/py/argus/service/config.py`:
  - add `TrackingConfig/WandbConfig` dataclasses (or an equivalent structure)
  - `V2Config.from_root_dict()` parses `tracking.wandb.*` (default: disabled)
  - validation: `base_url` must be non-empty when `enabled=true`; `api_key_env` defaults to `WANDB_API_KEY`
**Tests (written first)**
- `test_config_parses_wandb_defaults`: with no tracking section, defaults to disabled.
- `test_config_parses_wandb_enabled`: with enabled=true, base_url/api_key_env are readable.
- `test_config_rejects_empty_base_url_when_enabled`: enabled=true with an empty base_url errors (or logs a warning, implementation's choice).
**Acceptance**
- Whether W&B is enabled is decided by config alone, without breaking v3.5 config parsing.
---
### M2Ray Job runtime_env 注入WANDB_* env
**目标**
- 当 `tracking.wandb.enabled=true` 时:平台在 job 粒度注入 `WANDB_BASE_URL/WANDB_API_KEY/WANDB_MODE/WANDB_DIR` 等 env。
- `WANDB_API_KEY` 从 API server 进程环境变量中读取:`os.environ[api_key_env]`
**开发任务**
- 在 scheduler / ray job builder`src/mvp/py/argus/service/scheduler.py``src/mvp/py/argus/ray/builders.py`
- 构造 job runtime_env.env_vars 的 merge 逻辑:
- 现有 `ray.runtime_env.env_vars` 为基础;
- 追加 W&B env不覆盖用户显式指定的同名变量或按“平台优先”策略二选一并写清楚
- 注入:
- `WANDB_BASE_URL=<cfg.tracking.wandb.base_url>`
- `WANDB_API_KEY=<os.environ[cfg.tracking.wandb.api_key_env]>`
- `WANDB_MODE=online`
- `WANDB_DIR=/private/users/<user_id>/jobs/<ray_submission_id>/wandb`(本地 run 文件随 job 一起由 janitor 清理)
**测试(先写)**
- `test_scheduler_injects_wandb_env_when_enabled`
- mock env 中存在 `WANDB_API_KEY`
- 提交一个内置任务ppo/sft 任意),断言构造出来的 runtime_env 含以上 env_vars。
- `test_scheduler_sets_wandb_dir_under_job_dir`
- 断言 `WANDB_DIR` 位于该 attempt 的 job 目录下(而不是 common 目录),避免无法跟随 job retention 清理。
- `test_scheduler_does_not_inject_wandb_env_when_disabled`
- `test_scheduler_wandb_missing_api_key_behaviour`
- enabled=true 但缺少 env 时的行为:
- 方案 A推荐**自动降级**为 console不注入 wandb并在 task/attempt message 记录 warning
- 或方案 Bfail-fast返回 500/400
- 需在实现前确认采用哪种策略;建议 v3.6 选 A 提升可用性。
**Acceptance**
- After submitting any built-in task, `WANDB_*` appears in the Ray job runtime_env.
---
### M3内置训练任务自动开启 wandb loggerPPO/GRPO/SFT
**目标**
- 当 W&B enabled 时,平台默认把内置训练任务改成 `trainer.logger=["console","wandb"]`,并设置 project/run 命名。
**开发任务**
- 在 job 构建PPO/GRPO/SFT 的 overrides 生成处):
- 将 `trainer.logger``console` 改为 list`[console,wandb]`hydra 语法按现有实现方式拼接)。
- `trainer.project_name=<user_id>_project`
- `trainer.experiment_name=<ray_submission_id>`
- 保持 v3.5 的 job_dir/checkpoint/log dir 注入方式不变。
**测试(先写)**
- `test_job_overrides_include_wandb_logger_when_enabled`
- 断言 entrypoint/overrides 包含 `trainer.logger=[console,wandb]`(或等价写法)。
- 断言包含 `trainer.project_name=<user>_project``trainer.experiment_name=<submission_id>`
- `test_job_overrides_keep_console_only_when_wandb_disabled_or_missing_key`
**验收**
- 训练任务 run 会自动出现在 W&B 对应 project 下E2E 验证在 M6
---
### M4API 输出与 WebUI 链接(最小闭环)
**目标**
- 用户可以在 UI 里“知道去哪看 W&B”以及知道本 task 对应哪个 project/run。
**开发任务**
- API
- `GET /api/v2/me` 增加 `wandb` 信息(仅当 enabled 时返回):
- `base_url`
- `project_name``<user_id>_project`
- `GET /api/v2/tasks/{task_id}`(或 attempt 结构)增加 `wandb_project` / `wandb_run_name`run_name=ray_submission_id
- WebUI
- Login/Data 页增加 “Open W&B” 链接(跳 `base_url`),展示 project_name + Copy。
- Task detail 增加 wandb 字段展示project/run/可点击链接或可复制文本)。
**测试(先写)**
- `test_me_includes_wandb_when_enabled`mock config + env
- `test_task_detail_contains_wandb_fields_when_enabled`mock task/attempt
- `test_ui_contains_wandb_link`(渲染 HTML 断言包含 base_url/project_name 字样)。
**验收**
- WebUI 能一键跳转 W&BTask detail 能定位到 run。
---
### M5New Task 增加 Evaluation 模板Advanced
**目标**
- New Task 页面增加一个 Evaluation 模板按钮/示例(先按 Advanced TaskSpec 提供)。
**开发任务**
- 在 `src/mvp/py/argus/service/ui.py`
- YAML 模式增加 “Evaluation example”。
- 表单模式本轮可选(不要求):
- 如果要支持:把 evaluation 作为 advanced 模板的一种预填 command`kind: advanced`)。
- 模板建议使用 verL 自带入口:
- `python3 -m verl.trainer.main_eval ... +ray_kwargs.ray_init.address=auto`
- `data.path=$HOME/common/datasets/...`(按 v3.5 的宏规则)
**测试(先写)**
- `test_ui_new_task_contains_evaluation_example`:断言页面包含 `main_eval` 与关键字段。
**验收**
- 用户复制 evaluation 模板可提交成功并在 Ray 上运行E2E 在 M6
---
### M6端到端dev/h1部署与验收流程
> 里程碑 M6 以脚本/手工步骤为主;不强制写 e2e 自动化测试。
**部署任务**
- compose 增加 `wandb` 服务:
- `wandb/local:latest`
- host 端口 `8090:8080`
- 数据挂载到 `/private/common/wandb:/vol` 持久化W&B 元数据/账号/API key/历史 runs
- API 启动方式:
- 在宿主机 export`WANDB_API_KEY=<从 W&B UI 生成的 key>`
- 启动 API确保 env 透传到容器内)
- 首次初始化:
- 打开 `http://<h1_ip>:8090/system-admin` 粘贴 license管理员操作
**验收用例**
1) **训练类任务自动打点**
- 用 alice 提交一个 SFT或 PPO任务内置 workload
- 在 W&B UI 中看到 project`alice_project`
- run name 为 `ray_submission_id`metrics 可见
2) **Advanced task 可手动打点**
- 提交一个 Advanced用户自己写 command并在 command 中启用 `trainer.logger=["console","wandb"]`(如需)
- 确认 env 注入生效W&B 记录出现)
3) **Evaluation 模板**
- 用 New Task 的 Evaluation example 提交
- 任务成功运行并输出 metricsstdout/logs
- (可选)如果 evaluation 也启用 wandb则能出现在对应 project 下
**回归**
- v3.5 的三类任务ppo/grpo/sft在 W&B disabled 或缺少 key 时仍可跑通(至少 console 输出不受影响)。
**Retention 联动检查**
- 提交一个短任务生成 `WANDB_DIR`,结束后确认:
- `WANDB_DIR` 位于 `/private/users/<user>/jobs/<ray_submission_id>/wandb`
- janitor 运行后该目录会随 job 一起进入 trash / 被 purge与 jobs retention 一致)
---
## Deliverables (v3.6)
- Docs:
  - `specs/mvp/v3.6/v3.6_design.md` (exists; extend the ops flow as needed)
  - `specs/mvp/v3.6/v3.6_dev_plan.md` (this file)
- Code (expected change points):
  - `src/mvp/py/argus/service/config.py`
  - `src/mvp/py/argus/service/scheduler.py` / `src/mvp/py/argus/ray/builders.py` / `src/mvp/py/argus/ray/ray_job_tool.py`
  - `src/mvp/py/argus/service/app.py` (/me and task detail output)
  - `src/mvp/py/argus/service/ui.py` (Open W&B + Evaluation template)
  - `src/mvp/docker-compose.yaml` (wandb service)
  - `src/mvp/configs/dev.yaml` (tracking.wandb config)
  - `src/mvp/scripts/*` (env passthrough at API startup, extended as needed)

View File

@ -0,0 +1,42 @@
# MVP v3.6 progress log
> Baseline: v3.5 complete (Advanced TaskSpec + Custom reward (option A) + WebUI + SFTPGo + stateless ray node pool).
> This file records each v3.6 milestone's completion status and key changes.
## M1 (done)
- Added `tracking.wandb` config parsing and validation (enabled/base_url/api_key_env).
## M2 (done)
- Per-Ray-job injection of the `WANDB_*` env (`WANDB_BASE_URL/WANDB_API_KEY/WANDB_MODE/WANDB_DIR`); when the key is missing, degrade and record a warning.
## M3 (done)
- PPO/GRPO/SFT built-in training tasks automatically append overrides when wandb is available:
  - `trainer.logger=[console,wandb]`
  - `trainer.project_name=<user_id>_project`
  - `trainer.experiment_name=<ray_submission_id>`
## M4 (done)
- API output gains W&B locator info:
  - `/api/v2/me` returns `wandb.{enabled,base_url,project_name}`
  - `/api/v2/tasks/{task_id}` returns `{base_url,project_name,run_name}` under `latest_attempt.wandb`
- WebUI:
  - the Login page gains a W&B block (link to 8090, copy project)
  - the Task detail page gains a W&B block (copy run)
## M5 (done)
- WebUI New Task gains the Evaluation template (Advanced):
  - uses `python3 -m verl.trainer.main_eval ... +ray_kwargs.ray_init.address=auto`
  - with placeholder paths as examples (users replace `<RAY_SUBMISSION_ID>/<EVAL_PARQUET>`)
## M6 (done)
- `docker-compose.yaml` integrates the W&B local server:
  - host port `8090`
  - persistence directory `/private/common/wandb` (container path `/vol`)
- dev config enables `tracking.wandb` by default (missing key auto-degrades with a warning)
- the API startup script passes `WANDB_API_KEY` from the host into the API server in the head container.

View File

@ -0,0 +1,126 @@
# MVP v3.6 iteration summary (based on v3.5)
> Time baseline: 2026-01, H20 dev environment: `argus@h1:/home2/argus/infra/mvp`
> v3.6 architecture sketch: `specs/mvp/v3.6/Snipaste_2026-01-05_10-56-34.png`
## 1. Iteration goals recap
v3.6 adds two capabilities on top of v3.5 (WebUI + API server + Ray stateless node pool + SFTPGo + Advanced TaskSpec):
1) **Weights & Biases (W&B) local server integration**
- Training tasks (PPO/GRPO/SFT) can write to W&B by default.
- Isolation strategy: a shared W&B account plus per-user projects (`<user_id>_project`).
2) **An Evaluation example on New Task**
- The New Task page gains a minimal usable evaluation template (running `verl.trainer.main_eval` as an Advanced command).
## 2. Deliverables (code/config/scripts)
### 2.1 Deployment (docker compose)
v3.6 adds a W&B service to `src/mvp/docker-compose.yaml`:
- Service name: `wandb` (container name: `argus-wandb`)
- Host port: `8090:8080`
- Persistence: `../../shared/common/wandb:/vol`
- Same network (`argus-ray-net`) so Ray containers can reach it
### 2.2 Platform config (YAML)
`src/mvp/configs/dev.yaml` adds/enables the W&B config:
```yaml
tracking:
  wandb:
    enabled: true
    base_url: "http://172.22.0.1:8090"
    api_key_env: "WANDB_API_KEY"
    project_suffix: "_project"
```
Notes:
- `base_url` uses the docker bridge gateway + host published port, working around transient in-container DNS resolution failures.
- No plaintext key in config; `WANDB_API_KEY` is injected uniformly when the API server starts.
### 2.3 Ray Job runtime_env injection (core)
v3.6 injects two groups of environment variables on **every Ray job attempt** submission:
1) **Always injected (whether or not W&B is enabled)**, so Advanced commands can run without template edits:
- `MVP_TRAINER_LOGGER`: `console` or `[console,wandb]`
- `MVP_WANDB_PROJECT`: `<user_id>_project` (e.g. `alice_project`)
- `MVP_WANDB_RUN`: `<ray_submission_id>` (unique per attempt)
2) **Injected only when W&B is effectively enabled**:
- `WANDB_BASE_URL`
- `WANDB_API_KEY`
- `WANDB_MODE=online`
- `WANDB_DIR=<job_dir>/wandb` (e.g. `/private/users/alice/jobs/<ray_sid>/wandb`)
Degrade policy:
- With `tracking.wandb.enabled=true` but no `WANDB_API_KEY`, the platform **degrades to console** (recording a warning in attempt.message) instead of failing the training run.
### 2.4 WebUI changes
1) **Login page**
- Added an "Open W&B" link (to `tracking.wandb.base_url`)
2) **New Task page**
- Added the **Evaluation example**
- Updated the Advanced example for v3.6:
  - the `command: |` block no longer contains comments (avoiding YAML/command parsing errors)
  - W&B parameters are no longer hand-filled; they reference the platform-injected `${MVP_*}` env vars:
    - `trainer.logger=${MVP_TRAINER_LOGGER}`
    - `trainer.project_name=${MVP_WANDB_PROJECT}`
    - `trainer.experiment_name=${MVP_WANDB_RUN}`
> Note: the driver log prints `MVP_DRIVER_EXEC: bash -lc '...'`; seeing `${MVP_*}` unreplaced there is normal. Substitution happens when `bash` executes the command, not when the argv is printed.
### 2.5 Startup script
`src/mvp/scripts/60_start_api.sh` passes the host's `WANDB_API_KEY` into the API server started inside the head container:
- on the host: `export WANDB_API_KEY=...`
- start the API (the script runs `docker exec -e WANDB_API_KEY=...` into the head container and launches `python3 /workspace/mvp/py/server.py`)
## 3. User workflow (v3.6)
### 3.1 One-time initialization (only on first enable / after wiping /vol)
1) Open the W&B UI: `http://<h1 host IP>:8090`
2) Paste the license on the System Admin page to initialize
3) Generate and record `WANDB_API_KEY` (a local key)
4) From then on, inject that key (`WANDB_API_KEY=...`) whenever the API server starts
As long as `shared/common/wandb` (the `/vol` persistence directory) is preserved, rebuilding the container requires no further setup at 8090.
### 3.2 Daily use
1) WebUI login: `http://<h1 host IP>:8080/ui/login` (enter the user token)
2) Submit tasks via New Task: `http://<h1 host IP>:8080/ui/tasks/new`
3) Check status/logs under Tasks: `/ui/tasks` and the task detail page
4) Open W&B: `http://<h1 host IP>:8090` and inspect runs/metrics under `<user_id>_project`
## 4. Acceptance results (expected this iteration)
1) After any PPO/GRPO/SFT task runs:
- the W&B local server shows the corresponding project (`alice_project`)
- the run name matches `ray_submission_id` (tracking each attempt)
2) Evaluation example:
- can be submitted as an Advanced task and runs `verl.trainer.main_eval` on Ray
- users may add reward overrides inside the command themselves (no platform wrapping)
## 5. Known limitations and follow-ups
1) **W&B initialization automation**
- Today: the first setup still pastes the license and generates the key on the 8090 page (more robust, least intrusive).
- For fully UI-free bootstrap from scratch, investigate whether W&B local exposes a usable management API/startup flag (auto-inject the license + auto-create a key).
2) **Advanced command freedom**
- The platform is only responsible for:
  - `$HOME` macro substitution
  - runtime_env env_vars injection
  - task queueing and Ray job submission
- Semantic correctness of the command remains the user's responsibility (e.g. the micro batch parameters PPO requires).

specs/mvp/v3.6/wandb.md (new file, 70 lines)
View File

@ -0,0 +1,70 @@
# License
License:
eyJhbGciOiJSUzI1NiIsImtpZCI6InUzaHgyQjQyQWhEUXM1M0xQY09yNnZhaTdoSlduYnF1bTRZTlZWd1VwSWM9In0.eyJjb25jdXJyZW50QWdlbnRzIjoxLCJ0cmlhbCI6ZmFsc2UsIm1heFN0b3JhZ2VHYiI6MTAsIm1heFRlYW1zIjowLCJtYXhVc2VycyI6MSwibWF4Vmlld09ubHlVc2VycyI6MCwibWF4UmVnaXN0ZXJlZE1vZGVscyI6MiwiZXhwaXJlc0F0IjoiMjAyNy0wMS0wNVQwMjoxMjo1MC4zMjRaIiwiZGVwbG95bWVudElkIjoiYzNmN2Y5N2ItMzAxOS00Nzk2LTkxYTgtZDUyMjc1NDBiMTI1IiwiZmxhZ3MiOltdLCJjb250cmFjdFN0YXJ0RGF0ZSI6IjIwMjYtMDEtMDVUMDI6MTI6NTAuMzI0WiIsImFjY2Vzc0tleSI6IjYxMGM5NjliLTk4ZWEtNGRhNS1iYzU1LWM2MzVlZWNhNzc0OCIsInNlYXRzIjoxLCJ2aWV3T25seVNlYXRzIjowLCJ0ZWFtcyI6MCwicmVnaXN0ZXJlZE1vZGVscyI6Miwic3RvcmFnZUdpZ3MiOjEwLCJleHAiOjE3OTkxMTUxNzAsIndlYXZlTGltaXRzIjp7IndlYXZlTGltaXRCeXRlcyI6bnVsbCwid2VhdmVPdmVyYWdlQ29zdENlbnRzIjowLCJ3ZWF2ZU92ZXJhZ2VVbml0IjoiTUIifX0.VADnc0PExWhGDAxMIbu0vlmPN423B398of4HFl6BMJ1vqGA9H1ESElOZfk0VQ0YnYgwZc_CZF9k0HRyfCBgRhtRKyB1PpGnaKT_kKNVQryykWRpNhnpDqhmTa-wfTUBXNxhu1ktNPKBFNaEbaYuPsLN_aXPGW0dDwp6coGnGEXEqdRmuvekE6ytu7t6IA6flYs35WqCojvvjAmfBdovo2zPTfmlqKeaz7GPrApMo9JBpmb1a6bZEjCoRhhUx_k-v2rbvE3hd9ix9_UMZ6siJ5IKtNuXy_cprcCXXIFVUMcfTnt78RRXY0jCRMQqWkNq9ZGF0Mgcjsh3ts9xSxPgWnw
# License limits
Add License to your Local Instance:
- Create up to 0 teams
- Create up to 1 user
- Store up to 10 GB of data
- Create up to 2 Registered Models
# Quickstart
On a machine with Docker and Python installed, run:
```bash
pip install wandb --upgrade
wandb server start
```
Generate a free license from the Deployer, then add it to your W&B Server by pasting it on the `/system-admin` page of your localhost.
# Docker deployment
```yaml
version: "3.8"
services:
  wandb:
    image: wandb/local:latest
    container_name: wandb-local
    ports:
      - "8080:8080"
    volumes:
      - wandb_data:/vol
    restart: unless-stopped
volumes:
  wandb_data:
```
# Connecting
Option B (environment variables; suited to containers/batch/CI), set via the ray job's runtime_env:
```bash
export WANDB_BASE_URL=http://<server IP or domain>:8080
export WANDB_API_KEY=<your API_KEY>
```
The official docs state that WANDB_BASE_URL + WANDB_API_KEY can replace `wandb login --host ...`.
# verl configuration
To enable wandb in verl, only the trainer config matters. The three key fields are `trainer.logger`, `trainer.project_name`, and `trainer.experiment_name`; the docs note that logger drives console + tracking (tracking initializes wandb).
Recommended form (newer versions):
```yaml
trainer:
  logger: ["console", "wandb"]
  project_name: my_project     # use "<argus username>_project", e.g. alice_project
  experiment_name: exp_001     # use the task id as the experiment name
```

View File

@ -32,6 +32,16 @@ service:
retry_interval_s: 60
max_running_tasks: 1
tracking:
  wandb:
    # v3.6: enable Weights & Biases integration (best-effort).
    # - If WANDB_API_KEY is missing, the platform degrades to console logging and records a warning.
    enabled: true
    # For dev compose, use docker bridge gateway + published port for stability.
    base_url: "http://172.22.0.1:8090"
    api_key_env: "WANDB_API_KEY"
    project_suffix: "_project"
# v3.0: user data management (filesystem + SFTPGo)
data:
# All user writable data is placed under this root:

View File

@ -32,6 +32,14 @@ service:
retry_interval_s: 60
max_running_tasks: 1
tracking:
  wandb:
    enabled: true
    # For dev compose, recommend docker bridge gateway + host published port for stability.
    base_url: "http://172.22.0.1:8090"
    api_key_env: "WANDB_API_KEY"
    project_suffix: "_project"
data:
user_root: "/private/users"
sftpgo:
@ -48,4 +56,3 @@ data:
jobs_trash_after_days: 3
jobs_purge_after_days: 7
janitor_interval_s: 3600

View File

@ -74,6 +74,27 @@ services:
SFTPGO_HTTPD__BINDINGS__0__PORT: "8080"
SFTPGO_SFTPD__BINDINGS__0__PORT: "2022"
# v3.6: Weights & Biases local server (shared single account; per-user projects).
#
# - UI: 8080 in container, mapped to host 8090
# - Data: persist under /private/common/wandb (NFS/shared)
#
# NOTE:
# - The license/initial setup is done via the W&B UI.
# - Ray jobs use WANDB_BASE_URL from config (recommended: docker bridge gateway + host published port).
  wandb:
    image: wandb/local:latest
    container_name: argus-wandb
    ports:
      - "8090:8080"
    volumes:
      - ../../shared/common/wandb:/vol
    networks:
      argus-ray-net:
        aliases:
          - wandb
          - argus-wandb
ray_worker_0:
image: argus/argus-ray-node:v2.5
container_name: argus-ray-worker-0

View File

@ -10,7 +10,7 @@ from typing import Any
import ray
from ray.job_submission import JobSubmissionClient
from .builders import build_advanced_argv, build_training_argv
from .builders import BuiltCommand, build_advanced_argv, build_training_argv
from .models import AdvancedTaskSpec, JobSpec, RayConfig
from .yaml_io import dump_yaml
@ -44,10 +44,17 @@ class RayJobTool:
def _job_dir(self, submission_id: str) -> str:
return f"{self.cfg.shared_root}/jobs/{submission_id}"
def _runtime_env(self, spec: JobSpec) -> dict[str, Any]:
return self._runtime_env_for(code_path=spec.code_path, workload=spec.workload)
def _runtime_env(self, spec: JobSpec, *, extra_env_vars: dict[str, str] | None = None) -> dict[str, Any]:
return self._runtime_env_for(code_path=spec.code_path, workload=spec.workload, extra_env_vars=extra_env_vars)
def _runtime_env_for(self, *, code_path: str, workload: str, extra_pythonpath: list[str] | None = None) -> dict[str, Any]:
    def _runtime_env_for(
        self,
        *,
        code_path: str,
        workload: str,
        extra_pythonpath: list[str] | None = None,
        extra_env_vars: dict[str, str] | None = None,
    ) -> dict[str, Any]:
env_vars = dict(self.cfg.runtime_env_env_vars)
# Default HF cache
@ -74,13 +81,29 @@ class RayJobTool:
if workload == "sft":
env_vars.setdefault("RAY_ADDRESS", "auto")
        # Per-job extra env vars (e.g. W&B). Do not override existing keys.
        if extra_env_vars:
            for k, v in dict(extra_env_vars).items():
                if k not in env_vars:
                    env_vars[k] = str(v)
        return {"env_vars": env_vars}
def submit(self, spec: JobSpec, no_wait: bool, job_dir: str | None = None) -> str:
    def submit(
        self,
        spec: JobSpec,
        no_wait: bool,
        job_dir: str | None = None,
        *,
        extra_env_vars: dict[str, str] | None = None,
        extra_overrides: list[str] | None = None,
    ) -> str:
        submission_id = spec.submission_id or f"mvp11_{spec.workload}_{_ts()}_{os.getpid()}"
        job_dir = job_dir or self._job_dir(submission_id)
        built = build_training_argv(spec, submission_id=submission_id, job_dir=job_dir)
        if extra_overrides:
            built = BuiltCommand(argv=[*built.argv, *list(extra_overrides)])
entrypoint_argv = [
"python3",
"-m",
@ -91,7 +114,7 @@ class RayJobTool:
]
entrypoint = " ".join(shlex.quote(x) for x in entrypoint_argv)
runtime_env = self._runtime_env(spec)
runtime_env = self._runtime_env(spec, extra_env_vars=extra_env_vars)
# Prepare job artifacts directory
job_root = Path(job_dir)
@ -155,6 +178,8 @@ class RayJobTool:
no_wait: bool,
job_dir: str | None = None,
user_id: str | None = None,
*,
extra_env_vars: dict[str, str] | None = None,
) -> str:
submission_id = spec.submission_id or f"mvp11_{spec.workload}_{_ts()}_{os.getpid()}"
job_dir = job_dir or self._job_dir(submission_id)
@ -174,7 +199,12 @@ class RayJobTool:
if user_id:
extra_pythonpath.append(f"{self.cfg.shared_root}/users/{user_id}/code")
runtime_env = self._runtime_env_for(code_path=self.cfg.verl_code_path, workload=spec.workload, extra_pythonpath=extra_pythonpath)
        runtime_env = self._runtime_env_for(
            code_path=self.cfg.verl_code_path,
            workload=spec.workload,
            extra_pythonpath=extra_pythonpath,
            extra_env_vars=extra_env_vars,
        )
# Prepare job artifacts directory
job_root = Path(job_dir)

View File

@ -276,6 +276,14 @@ def create_app(config_path: str) -> FastAPI:
"jobs_purge_after_days": int(v2_cfg.data.retention.jobs_purge_after_days),
},
}
        if v2_cfg.tracking.wandb.enabled:
            suffix = v2_cfg.tracking.wandb.project_suffix or "_project"
            out["wandb"] = {
                "enabled": True,
                # Training uses base_url; UI can also open the host port directly.
                "base_url": v2_cfg.tracking.wandb.base_url,
                "project_name": f"{user_id}{suffix}",
            }
if _sftpgo_enabled():
out["sftp"] = {
"host": v2_cfg.data.sftpgo.host,
@ -429,7 +437,7 @@ def create_app(config_path: str) -> FastAPI:
"error_summary": row.get("error_summary"),
}
if latest_attempt:
out["latest_attempt"] = {
latest_out: dict[str, Any] = {
"attempt_no": latest_attempt["attempt_no"],
"ray_submission_id": latest_attempt["ray_submission_id"],
"ray_status": latest_attempt.get("ray_status"),
@ -438,6 +446,16 @@ def create_app(config_path: str) -> FastAPI:
"failure_kind": latest_attempt.get("failure_kind"),
"message": latest_attempt.get("message"),
}
            if v2_cfg.tracking.wandb.enabled:
                suffix = v2_cfg.tracking.wandb.project_suffix or "_project"
                # For v3.6: shared W&B account, user-level projects.
                # Use attempt's ray_submission_id as run name for stable mapping.
                latest_out["wandb"] = {
                    "base_url": v2_cfg.tracking.wandb.base_url,
                    "project_name": f"{str(row.get('user_id') or subject.get('user_id'))}{suffix}",
                    "run_name": str(latest_attempt.get("ray_submission_id") or ""),
                }
            out["latest_attempt"] = latest_out
return out
@app.get("/api/v2/tasks/{task_id}/spec")

View File

@ -34,6 +34,19 @@ class V2RetentionConfig:
janitor_interval_s: int = 3600
@dataclass(frozen=True)
class V2WandbConfig:
    enabled: bool = False
    base_url: str = ""
    api_key_env: str = "WANDB_API_KEY"
    project_suffix: str = "_project"

@dataclass(frozen=True)
class V2TrackingConfig:
    wandb: V2WandbConfig
@dataclass(frozen=True)
class V2SFTPGoConfig:
enabled: bool = False
@ -57,6 +70,7 @@ class V2Config:
auth: V2AuthConfig
sqlite: V2SqliteConfig
scheduler: V2SchedulerConfig
tracking: V2TrackingConfig
data: V2DataConfig
@staticmethod
@ -83,6 +97,13 @@ class V2Config:
else:
shared_root = str(root.get("shared_root") or "/private")
        tracking = root.get("tracking") or {}
        if not isinstance(tracking, dict):
            raise ValueError("config.tracking must be a mapping")
        wandb = tracking.get("wandb") or {}
        if not isinstance(wandb, dict):
            raise ValueError("config.tracking.wandb must be a mapping")
data = root.get("data") or {}
if not isinstance(data, dict):
raise ValueError("config.data must be a mapping")
@ -96,6 +117,11 @@ class V2Config:
user_root = str(data.get("user_root") or f"{shared_root}/users")
wandb_enabled = bool(wandb.get("enabled") or False)
wandb_base_url = str(wandb.get("base_url") or "")
if wandb_enabled and not wandb_base_url:
raise ValueError("tracking.wandb.base_url is required when tracking.wandb.enabled=true")
return V2Config(
api=V2ApiConfig(
host=str(api.get("host") or "0.0.0.0"),
@ -108,6 +134,14 @@ class V2Config:
retry_interval_s=int(scheduler.get("retry_interval_s") or 60),
max_running_tasks=int(scheduler.get("max_running_tasks") or 1),
),
tracking=V2TrackingConfig(
wandb=V2WandbConfig(
enabled=wandb_enabled,
base_url=wandb_base_url,
api_key_env=str(wandb.get("api_key_env") or "WANDB_API_KEY"),
project_suffix=str(wandb.get("project_suffix") or "_project"),
)
),
data=V2DataConfig(
user_root=user_root,
sftpgo=V2SFTPGoConfig(
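The `tracking.wandb` parsing above (defaults plus the enabled-requires-base_url check) can be condensed into a standalone sketch (`parse_wandb` is a hypothetical name, not the actual `V2Config` code):

```python
def parse_wandb(tracking: dict) -> dict:
    # Mirrors the config rules shown in the diff: missing section -> disabled
    # defaults; enabled=true without base_url is a hard configuration error.
    wandb = tracking.get("wandb") or {}
    if not isinstance(wandb, dict):
        raise ValueError("config.tracking.wandb must be a mapping")
    enabled = bool(wandb.get("enabled") or False)
    base_url = str(wandb.get("base_url") or "")
    if enabled and not base_url:
        raise ValueError("tracking.wandb.base_url is required when tracking.wandb.enabled=true")
    return {
        "enabled": enabled,
        "base_url": base_url,
        "api_key_env": str(wandb.get("api_key_env") or "WANDB_API_KEY"),
        "project_suffix": str(wandb.get("project_suffix") or "_project"),
    }
```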

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import os
import re
import time
from dataclasses import dataclass
@ -48,6 +49,74 @@ class Scheduler:
required = float(nnodes * n_gpus_per_node)
return avail.total_available_gpus >= required
def _wandb_env_vars(self, *, ray_submission_id: str, job_dir: str) -> tuple[dict[str, str], str | None]:
"""
Returns (env_vars, warn_message).
env_vars is always non-empty so that AdvancedTaskSpec examples can safely
reference injected env vars without requiring the user to edit their command:
- MVP_TRAINER_LOGGER: "console" or "[console,wandb]"
- MVP_WANDB_PROJECT: "<user_id>_project"
- MVP_WANDB_RUN: "<ray_submission_id>"
When W&B is effectively enabled (config enabled + API key present),
WANDB_* env vars are included as well.
"""
env, warn, _enabled = self._tracking_env_vars(user_id=None, ray_submission_id=ray_submission_id, job_dir=job_dir)
return env, warn
def _tracking_env_vars(
self, *, user_id: str | None, ray_submission_id: str, job_dir: str
) -> tuple[dict[str, str], str | None, bool]:
"""
Returns (env_vars, warn_message, wandb_enabled_effective).
- env_vars always includes MVP_* vars so advanced commands can use them.
- wandb_enabled_effective is True only when tracking.wandb.enabled AND api key exists.
"""
cfg = self.v2_cfg.tracking.wandb
project_suffix = cfg.project_suffix or "_project"
project_name = f"{user_id}{project_suffix}" if user_id else f"mvp{project_suffix}"
api_key = os.environ.get(cfg.api_key_env, "").strip()
wandb_enabled_effective = bool(cfg.enabled and api_key)
warn: str | None = None
if cfg.enabled and not api_key:
warn = f"WARN_WANDB_DISABLED_MISSING_ENV:{cfg.api_key_env}"
env: dict[str, str] = {
"MVP_TRAINER_LOGGER": ("[console,wandb]" if wandb_enabled_effective else "console"),
"MVP_WANDB_PROJECT": project_name,
"MVP_WANDB_RUN": ray_submission_id,
}
if wandb_enabled_effective:
env.update(
{
"WANDB_BASE_URL": cfg.base_url,
"WANDB_API_KEY": api_key,
"WANDB_MODE": "online",
"WANDB_DIR": f"{job_dir.rstrip('/')}/wandb",
}
)
return env, warn, wandb_enabled_effective
def _wandb_training_overrides(self, *, user_id: str | None, ray_submission_id: str) -> list[str] | None:
"""
Returns hydra overrides to enable W&B logging for built-in training workloads.
Only call this when W&B is effectively enabled for the job (i.e. API key exists).
"""
cfg = self.v2_cfg.tracking.wandb
project_suffix = cfg.project_suffix or "_project"
project_name = f"{user_id}{project_suffix}" if user_id else f"mvp{project_suffix}"
return [
"trainer.logger=[console,wandb]",
f"trainer.project_name={project_name}",
f"trainer.experiment_name={ray_submission_id}",
]
def _parse_taskspec(self, jobspec_yaml: str) -> JobSpec | AdvancedTaskSpec:
obj = yaml.safe_load(jobspec_yaml) or {}
if not isinstance(obj, dict):
@ -70,21 +139,38 @@ class Scheduler:
self.db.set_task_state(task_id=task_id, state="SUBMITTING", latest_attempt_no=attempt_no)
# Override submission_id in taskspec (v1.1 compatible)
tracking_env, wandb_warn, wandb_enabled = self._tracking_env_vars(
user_id=user_id_s, ray_submission_id=ray_sid, job_dir=job_dir
)
if isinstance(spec, JobSpec):
d = spec.to_public_dict()
d["submission_id"] = ray_sid
spec2 = JobSpec.from_dict(d)
submit = lambda: self.tool.submit(spec2, no_wait=True, job_dir=job_dir)
wandb_overrides = self._wandb_training_overrides(user_id=user_id_s, ray_submission_id=ray_sid) if wandb_enabled else None
submit = lambda: self.tool.submit(
spec2,
no_wait=True,
job_dir=job_dir,
extra_env_vars=tracking_env,
extra_overrides=wandb_overrides,
)
else:
d = spec.to_public_dict()
d["submission_id"] = ray_sid
spec2 = AdvancedTaskSpec.from_dict(d)
submit = lambda: self.tool.submit_advanced(spec2, no_wait=True, job_dir=job_dir, user_id=user_id_s)
submit = lambda: self.tool.submit_advanced(
spec2, no_wait=True, job_dir=job_dir, user_id=user_id_s, extra_env_vars=tracking_env
)
try:
submitted = submit()
# submitted should equal ray_sid; keep as source of truth.
self.db.update_attempt(task_id=task_id, attempt_no=attempt_no, ray_status="SUBMITTED")
self.db.update_attempt(
task_id=task_id,
attempt_no=attempt_no,
ray_status="SUBMITTED",
message=wandb_warn,
)
self.db.set_task_state(task_id=task_id, state="SUBMITTED")
if submitted != ray_sid:
self.db.set_task_state(task_id=task_id, state="SUBMITTED", event_type="WARN_SUBMISSION_ID_MISMATCH")
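The scheduler's injection logic above can be condensed into a sketch (hypothetical free functions mirroring `_tracking_env_vars` and `_wandb_training_overrides`; not the actual `Scheduler` methods):

```python
import os

def tracking_env(user_id, ray_submission_id, job_dir, *, enabled,
                 base_url, api_key_env="WANDB_API_KEY", project_suffix="_project"):
    # MVP_* vars are always injected so advanced commands can reference them;
    # WANDB_* vars are added only when W&B is effectively enabled
    # (config enabled AND an API key present in the service environment).
    project = f"{user_id}{project_suffix}" if user_id else f"mvp{project_suffix}"
    api_key = os.environ.get(api_key_env, "").strip()
    effective = bool(enabled and api_key)
    warn = f"WARN_WANDB_DISABLED_MISSING_ENV:{api_key_env}" if (enabled and not api_key) else None
    env = {
        "MVP_TRAINER_LOGGER": "[console,wandb]" if effective else "console",
        "MVP_WANDB_PROJECT": project,
        "MVP_WANDB_RUN": ray_submission_id,
    }
    if effective:
        env.update({
            "WANDB_BASE_URL": base_url,
            "WANDB_API_KEY": api_key,
            "WANDB_MODE": "online",
            "WANDB_DIR": f"{job_dir.rstrip('/')}/wandb",
        })
    return env, warn

def wandb_overrides(user_id, ray_submission_id, project_suffix="_project"):
    # Hydra overrides for built-in workloads; only applied when W&B is effective.
    project = f"{user_id}{project_suffix}" if user_id else f"mvp{project_suffix}"
    return [
        "trainer.logger=[console,wandb]",
        f"trainer.project_name={project}",
        f"trainer.experiment_name={ray_submission_id}",
    ]
```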

View File

@ -188,11 +188,29 @@ def register_ui_routes(app: FastAPI) -> None:
<div style="height:10px"></div>
<pre id="me" class="muted">(not loaded)</pre>
</div>
<div style="height:12px"></div>
<div class="card">
<h3 style="margin-top:0">W&B</h3>
<div class="muted">Weights &amp; Biases local server (v3.6). Metrics are written by training jobs; this UI is for viewing.</div>
<div style="height:10px"></div>
<div class="row">
<a class="btn" id="wandb-open" target="_blank" rel="noopener" href="#">Open W&amp;B (:8090)</a>
<button class="btn" id="wandb-copy-project">Copy project</button>
</div>
<div style="height:10px"></div>
<div class="muted">project: <code id="wandb-project">(unknown)</code></div>
<div class="muted">base_url (job runtime): <code id="wandb-base-url">(unknown)</code></div>
</div>
""".strip()
script = """
const tokEl = document.getElementById("tok");
const msg = document.getElementById("msg");
const me = document.getElementById("me");
const wandbOpen = document.getElementById("wandb-open");
const wandbProject = document.getElementById("wandb-project");
const wandbBaseUrl = document.getElementById("wandb-base-url");
document.getElementById("wandb-copy-project").onclick = async () => { await copyText(wandbProject.textContent || ""); };
wandbOpen.href = curOriginWithPort(8090);
tokEl.value = mvpTokenGet();
async function refreshMe() {
@ -200,8 +218,17 @@ async function refreshMe() {
try {
const obj = await apiJson("/api/v2/me");
me.textContent = fmtJson(obj);
if (obj.wandb && obj.wandb.enabled) {
wandbProject.textContent = obj.wandb.project_name || "(unknown)";
wandbBaseUrl.textContent = obj.wandb.base_url || "(unknown)";
} else {
wandbProject.textContent = "(disabled)";
wandbBaseUrl.textContent = "(disabled)";
}
} catch (e) {
me.textContent = "Error: " + (e.status || "") + "\\n" + (e.body || String(e));
wandbProject.textContent = "(error)";
wandbBaseUrl.textContent = "(error)";
}
}
@ -358,19 +385,26 @@ val_file: /private/common/datasets/gsm8k_sft/test.parquet # 验证数据(必
# trainer_device: cpu # SFT only (driver-side device, optional, default: cpu)
# submission_id: "" # Ray submission_id (optional, default empty); usually auto-generated by the service, no need to fill in
""".strip()
adv = """# Advanced TaskSpec (YAML) - v3.5
adv = """# Advanced TaskSpec (YAML) - v3.6
kind: advanced # Task type (required): advanced (custom command)
# Note: in v3.5, advanced tasks are not classified as ppo/grpo/sft; the platform uniformly uses "advanced" for task classification and task_id naming.
nnodes: 2 # Number of training nodes (required): used for platform queue scheduling and resource pre-checks
n_gpus_per_node: 4 # GPUs per node (required): used for platform queue scheduling and resource pre-checks
# Custom training command (required): the platform performs $HOME macro substitution:
# Note: the platform uniformly uses "advanced" for task classification and task_id naming (no ppo/grpo/sft breakdown).
#
# Custom training command: the platform performs $HOME macro substitution:
# - $HOME -> /private/users/<user>
# - $HOME/common/datasets -> /private/datasets (shared read-only datasets)
# - $HOME/common/hf -> /private/hf (shared read-only HF cache)
#
# W&B (v3.6):
# - The platform injects WANDB_BASE_URL/WANDB_API_KEY/WANDB_DIR via runtime_env,
# - plus the following env vars for use in advanced commands (no manual project edits needed):
# - MVP_TRAINER_LOGGER: "console" or "[console,wandb]"
# - MVP_WANDB_PROJECT: "<user_id>_project"
# - MVP_WANDB_RUN: "<ray_submission_id>"
#
nnodes: 2 # Number of training nodes (required): used for platform queue scheduling and resource pre-checks
n_gpus_per_node: 4 # GPUs per node (required): used for platform queue scheduling and resource pre-checks
command: |
# Note: PPO requires several key parameters, otherwise VERL fails fast before startup (e.g. the actor's micro batch size)
PYTHONUNBUFFERED=1 \
python3 -m verl.trainer.main_ppo \
data.train_files=$HOME/common/datasets/gsm8k/train.parquet \
@ -391,7 +425,9 @@ command: |
critic.model.path=Qwen/Qwen2.5-0.5B-Instruct \
critic.ppo_micro_batch_size_per_gpu=4 \
algorithm.kl_ctrl.kl_coef=0.001 \
trainer.logger=console \
trainer.logger=${MVP_TRAINER_LOGGER} \
trainer.project_name=${MVP_WANDB_PROJECT} \
trainer.experiment_name=${MVP_WANDB_RUN} \
trainer.val_before_train=False \
trainer.nnodes=2 \
trainer.n_gpus_per_node=4 \
@ -426,6 +462,32 @@ command: |
--backend fsdp \
--local_dir $HOME/jobs/<RAY_SUBMISSION_ID>/checkpoints/<GLOBAL_STEP>/actor \
--target_dir $HOME/jobs/<RAY_SUBMISSION_ID>/checkpoints/<GLOBAL_STEP>/actor/huggingface
""".strip()
evaluation = """# Evaluation (YAML) - v3.6 (Advanced command)
# Purpose: offline evaluation of an already-generated results parquet (based on a custom reward function); the example uses VERL's built-in main_eval.
#
# Notes:
# - Prepare a parquet that contains the generated responses and replace <EVAL_PARQUET> (the example places it under a job directory).
# - main_eval computes concurrently on Ray, so +ray_kwargs is used here as well to attach to the existing Ray cluster.
#
kind: advanced
nnodes: 1
n_gpus_per_node: 0
command: |
PYTHONUNBUFFERED=1 \
python3 -m verl.trainer.main_eval \
data.path=$HOME/jobs/<RAY_SUBMISSION_ID>/outputs/<EVAL_PARQUET>.parquet \
+ray_kwargs.ray_init.address=auto
# Optional: specify the parquet schema (adjust to your parquet column names)
# data.response_key=responses
# data.data_source_key=data_source
# data.reward_model_key=reward_model
#
# Optional: custom reward (approach A): the user supplies reward.py
# custom_reward_function.path=$HOME/code/reward.py
# custom_reward_function.name=compute_score
""".strip()
body = f"""
<h1>New Task</h1>
@ -447,6 +509,7 @@ command: |
<button class="btn" id="tpl-grpo">GRPO example</button>
<button class="btn" id="tpl-sft">SFT example</button>
<button class="btn" id="tpl-adv">Advanced example</button>
<button class="btn" id="tpl-eval">Evaluation example</button>
<button class="btn" id="tpl-merge">Model merge example</button>
</div>
<div style="height:10px"></div>
@ -603,21 +666,24 @@ command: |
tpl_grpo = json.dumps(grpo)
tpl_sft = json.dumps(sft)
tpl_adv = json.dumps(adv)
tpl_eval = json.dumps(evaluation)
tpl_merge = json.dumps(merge)
script = (
"""
const msg = document.getElementById("msg");
const yamlEl = document.getElementById("yaml");
const TPL_PPO = __TPL_PPO__;
const TPL_GRPO = __TPL_GRPO__;
const TPL_SFT = __TPL_SFT__;
const TPL_ADV = __TPL_ADV__;
const TPL_MERGE = __TPL_MERGE__;
document.getElementById("tpl-ppo").onclick = () => { yamlEl.value = TPL_PPO; msg.textContent = ""; };
document.getElementById("tpl-grpo").onclick = () => { yamlEl.value = TPL_GRPO; msg.textContent = ""; };
document.getElementById("tpl-sft").onclick = () => { yamlEl.value = TPL_SFT; msg.textContent = ""; };
document.getElementById("tpl-adv").onclick = () => { yamlEl.value = TPL_ADV; msg.textContent = ""; };
document.getElementById("tpl-merge").onclick = () => { yamlEl.value = TPL_MERGE; msg.textContent = ""; };
const msg = document.getElementById("msg");
const yamlEl = document.getElementById("yaml");
const TPL_PPO = __TPL_PPO__;
const TPL_GRPO = __TPL_GRPO__;
const TPL_SFT = __TPL_SFT__;
const TPL_ADV = __TPL_ADV__;
const TPL_EVAL = __TPL_EVAL__;
const TPL_MERGE = __TPL_MERGE__;
document.getElementById("tpl-ppo").onclick = () => { yamlEl.value = TPL_PPO; msg.textContent = ""; };
document.getElementById("tpl-grpo").onclick = () => { yamlEl.value = TPL_GRPO; msg.textContent = ""; };
document.getElementById("tpl-sft").onclick = () => { yamlEl.value = TPL_SFT; msg.textContent = ""; };
document.getElementById("tpl-adv").onclick = () => { yamlEl.value = TPL_ADV; msg.textContent = ""; };
document.getElementById("tpl-eval").onclick = () => { yamlEl.value = TPL_EVAL; msg.textContent = ""; };
document.getElementById("tpl-merge").onclick = () => { yamlEl.value = TPL_MERGE; msg.textContent = ""; };
function yamlQuote(s) {
s = String(s ?? "");
@ -802,11 +868,12 @@ document.getElementById("submit").onclick = async () => {
msg.textContent = "OK: " + fmtJson(obj);
if (obj.task_id) window.location.href = "/ui/tasks/" + obj.task_id;
};
""".strip()
""".strip()
.replace("__TPL_PPO__", tpl_ppo)
.replace("__TPL_GRPO__", tpl_grpo)
.replace("__TPL_SFT__", tpl_sft)
.replace("__TPL_ADV__", tpl_adv)
.replace("__TPL_EVAL__", tpl_eval)
.replace("__TPL_MERGE__", tpl_merge)
)
return HTMLResponse(content=_page("New Task", "new", body, script))
@ -827,6 +894,19 @@ document.getElementById("submit").onclick = async () => {
<pre id="out" class="muted">Loading...</pre>
</div>
<div style="height:12px"></div>
<div class="card">
<h3 style="margin-top:0">W&amp;B</h3>
<div class="muted">This is a best-effort hint. Run name maps to <code>ray_submission_id</code> (attempt).</div>
<div style="height:10px"></div>
<div class="row">
<a class="btn" id="wandb-open" target="_blank" rel="noopener" href="#">Open W&amp;B (:8090)</a>
<button class="btn" id="wandb-copy-run">Copy run</button>
</div>
<div style="height:10px"></div>
<div class="muted">project: <code id="wandb-project">(unknown)</code></div>
<div class="muted">run: <code id="wandb-run">(unknown)</code></div>
</div>
<div style="height:12px"></div>
<div class="card">
<h3 style="margin-top:0">TaskSpec (YAML)</h3>
<div class="muted">Resolved TaskSpec (includes default values; submission_id reflects latest attempt when available).</div>
@ -836,6 +916,8 @@ document.getElementById("submit").onclick = async () => {
""".strip()
script = f"""
document.getElementById("nav-ray-dashboard").href = curOriginWithPort(8265);
document.getElementById("wandb-open").href = curOriginWithPort(8090);
document.getElementById("wandb-copy-run").onclick = async () => {{ await copyText((document.getElementById("wandb-run").textContent || \"\").trim()); }};
const out = document.getElementById("out");
const spec = document.getElementById("spec");
async function refresh() {{
@ -844,7 +926,18 @@ async function refresh() {{
const resp = await apiFetch("/api/v2/tasks/{task_id}");
const text = await resp.text();
if (!resp.ok) {{ out.textContent = "Error: " + resp.status + "\\n" + text; return; }}
out.textContent = fmtJson(JSON.parse(text));
const obj = JSON.parse(text);
out.textContent = fmtJson(obj);
const p = document.getElementById("wandb-project");
const r = document.getElementById("wandb-run");
const w = (obj.latest_attempt && obj.latest_attempt.wandb) ? obj.latest_attempt.wandb : null;
if (w) {{
p.textContent = w.project_name || "(unknown)";
r.textContent = w.run_name || "(unknown)";
}} else {{
p.textContent = "(not available)";
r.textContent = "(not available)";
}}
const resp2 = await apiFetch("/api/v2/tasks/{task_id}/spec");
const text2 = await resp2.text();

View File

@ -164,6 +164,37 @@ def test_task_submit_get_cancel_logs_queue(tmp_path: Path, monkeypatch):
assert r5.text.strip() == "c"
def test_me_includes_wandb_when_enabled(tmp_path: Path, monkeypatch):
from argus.service import app as app_mod
cfg_path = _write_config(tmp_path)
# Enable tracking.wandb
cfg = yaml.safe_load(cfg_path.read_text(encoding="utf-8"))
cfg["tracking"] = {"wandb": {"enabled": True, "base_url": "http://172.22.0.1:8090"}}
cfg_path.write_text(yaml.safe_dump(cfg), encoding="utf-8")
monkeypatch.setenv("MVP_INTERNAL_TOKEN", "token1")
class _Scheduler:
def __init__(self, **kwargs):
self.tool = object()
def run_forever(self, stop_flag):
return None
monkeypatch.setattr(app_mod, "Scheduler", _Scheduler)
app = app_mod.create_app(str(cfg_path))
with TestClient(app) as c:
r = c.get("/api/v2/me", headers={"authorization": "Bearer token1"})
assert r.status_code == 200
obj = r.json()
assert obj["user_id"] == "admin"
assert obj.get("wandb", {}).get("enabled") is True
assert obj["wandb"]["base_url"] == "http://172.22.0.1:8090"
assert obj["wandb"]["project_name"] == "admin_project"
def test_submit_rejects_non_mapping_jobspec(tmp_path: Path, monkeypatch):
from argus.service import app as app_mod

View File

@ -60,10 +60,21 @@ def test_tick_submits_one_task(monkeypatch, tmp_path: Path):
def __init__(self, cfg):
self.submitted = []
self.job_dirs = []
self.extra_env = []
self.extra_overrides = []
def submit(self, spec, no_wait: bool, job_dir: str | None = None):
def submit(
self,
spec,
no_wait: bool,
job_dir: str | None = None,
extra_env_vars: dict | None = None,
extra_overrides: list[str] | None = None,
):
self.submitted.append(spec.submission_id)
self.job_dirs.append(job_dir)
self.extra_env.append(extra_env_vars)
self.extra_overrides.append(extra_overrides)
return str(spec.submission_id)
def status(self, submission_id: str):
@ -83,6 +94,11 @@ def test_tick_submits_one_task(monkeypatch, tmp_path: Path):
assert len(attempts) == 1
assert attempts[0]["ray_submission_id"] == "t1--a01"
assert s.tool.job_dirs[-1] == "/private/users/alice/jobs/t1--a01"
env = s.tool.extra_env[-1]
assert env and env["MVP_WANDB_PROJECT"] == "alice_project"
assert env["MVP_WANDB_RUN"] == "t1--a01"
assert env["MVP_TRAINER_LOGGER"] == "console"
assert s.tool.extra_overrides[-1] is None
def test_tick_submits_one_task_advanced(monkeypatch, tmp_path: Path):
@ -118,13 +134,23 @@ def test_tick_submits_one_task_advanced(monkeypatch, tmp_path: Path):
def __init__(self, cfg):
self.submitted = []
self.job_dirs = []
self.extra_env = []
def submit(self, spec, no_wait: bool, job_dir: str | None = None):
raise AssertionError("should not call submit() for advanced")
def submit_advanced(self, spec, no_wait: bool, job_dir: str | None = None, user_id: str | None = None):
def submit_advanced(
self,
spec,
no_wait: bool,
job_dir: str | None = None,
user_id: str | None = None,
extra_env_vars: dict | None = None,
extra_overrides: list[str] | None = None,
):
self.submitted.append(spec.submission_id)
self.job_dirs.append(job_dir)
self.extra_env.append(extra_env_vars)
return str(spec.submission_id)
def status(self, submission_id: str):
@ -144,6 +170,252 @@ def test_tick_submits_one_task_advanced(monkeypatch, tmp_path: Path):
assert len(attempts) == 1
assert attempts[0]["ray_submission_id"] == "t1--a01"
assert s.tool.job_dirs[-1] == "/private/users/alice/jobs/t1--a01"
env = s.tool.extra_env[-1]
assert env and env["MVP_WANDB_PROJECT"] == "alice_project"
assert env["MVP_WANDB_RUN"] == "t1--a01"
assert env["MVP_TRAINER_LOGGER"] == "console"
def test_tick_injects_wandb_env_when_enabled(monkeypatch, tmp_path: Path):
from argus.service import scheduler as sched_mod
root = {
"ray": {
"address": "http://127.0.0.1:8265",
"shared_root": "/private",
"entrypoint_resources": {"worker_node": 1},
"runtime_env": {"env_vars": {}},
},
"service": {
"sqlite": {"db_path": str(tmp_path / "mvp.sqlite3")},
"scheduler": {"tick_s": 1, "retry_interval_s": 1, "max_running_tasks": 1},
},
"tracking": {"wandb": {"enabled": True, "base_url": "http://172.22.0.1:8090"}},
}
ray_cfg, v2_cfg = RayConfig.from_dict(root), V2Config.from_root_dict(root)
db = Db(v2_cfg.sqlite.db_path)
db.init()
db.create_task_v25(
task_id="t1",
user_id="alice",
workload="ppo",
jobspec_yaml=yaml.safe_dump(
{
"workload": "ppo",
"code_path": "/private/common/code/verl",
"model_id": "m",
"train_file": "/private/common/datasets/t",
"val_file": "/private/common/datasets/v",
}
),
nnodes=2,
n_gpus_per_node=4,
)
monkeypatch.setenv("WANDB_API_KEY", "k")
monkeypatch.setattr(sched_mod, "ensure_ray_connected", lambda: None)
monkeypatch.setattr(
sched_mod,
"get_cluster_available",
lambda: SimpleNamespace(total_available_gpus=999.0, total_available_npus=0.0),
)
class _Tool:
def __init__(self, cfg):
self.extra_env = []
self.job_dirs = []
def submit(
self,
spec,
no_wait: bool,
job_dir: str | None = None,
extra_env_vars: dict | None = None,
extra_overrides: list[str] | None = None,
):
self.job_dirs.append(job_dir)
self.extra_env.append(extra_env_vars)
return str(spec.submission_id)
def status(self, submission_id: str):
return "RUNNING"
def logs(self, submission_id: str):
return ""
monkeypatch.setattr(sched_mod, "RayJobTool", _Tool)
s = sched_mod.Scheduler(db=db, ray_cfg=ray_cfg, v2_cfg=v2_cfg)
s.tick()
assert s.tool.job_dirs[-1] == "/private/users/alice/jobs/t1--a01"
env = s.tool.extra_env[-1]
assert env and env["WANDB_API_KEY"] == "k"
assert env["WANDB_BASE_URL"] == "http://172.22.0.1:8090"
assert env["WANDB_MODE"] == "online"
assert env["WANDB_DIR"] == "/private/users/alice/jobs/t1--a01/wandb"
assert env["MVP_WANDB_PROJECT"] == "alice_project"
assert env["MVP_WANDB_RUN"] == "t1--a01"
assert env["MVP_TRAINER_LOGGER"] == "[console,wandb]"
def test_tick_wandb_enabled_missing_key_degrades(monkeypatch, tmp_path: Path):
from argus.service import scheduler as sched_mod
root = {
"ray": {
"address": "http://127.0.0.1:8265",
"shared_root": "/private",
"entrypoint_resources": {"worker_node": 1},
"runtime_env": {"env_vars": {}},
},
"service": {
"sqlite": {"db_path": str(tmp_path / "mvp.sqlite3")},
"scheduler": {"tick_s": 1, "retry_interval_s": 1, "max_running_tasks": 1},
},
"tracking": {"wandb": {"enabled": True, "base_url": "http://172.22.0.1:8090"}},
}
ray_cfg, v2_cfg = RayConfig.from_dict(root), V2Config.from_root_dict(root)
db = Db(v2_cfg.sqlite.db_path)
db.init()
db.create_task_v25(
task_id="t1",
user_id="alice",
workload="ppo",
jobspec_yaml=yaml.safe_dump(
{
"workload": "ppo",
"code_path": "/private/common/code/verl",
"model_id": "m",
"train_file": "/private/common/datasets/t",
"val_file": "/private/common/datasets/v",
}
),
nnodes=2,
n_gpus_per_node=4,
)
monkeypatch.delenv("WANDB_API_KEY", raising=False)
monkeypatch.setattr(sched_mod, "ensure_ray_connected", lambda: None)
monkeypatch.setattr(
sched_mod,
"get_cluster_available",
lambda: SimpleNamespace(total_available_gpus=999.0, total_available_npus=0.0),
)
class _Tool:
def __init__(self, cfg):
self.extra_env = []
def submit(
self,
spec,
no_wait: bool,
job_dir: str | None = None,
extra_env_vars: dict | None = None,
extra_overrides: list[str] | None = None,
):
self.extra_env.append(extra_env_vars)
return str(spec.submission_id)
def status(self, submission_id: str):
return "RUNNING"
def logs(self, submission_id: str):
return ""
monkeypatch.setattr(sched_mod, "RayJobTool", _Tool)
s = sched_mod.Scheduler(db=db, ray_cfg=ray_cfg, v2_cfg=v2_cfg)
s.tick()
env = s.tool.extra_env[-1]
assert env and env["MVP_WANDB_PROJECT"] == "alice_project"
assert env["MVP_WANDB_RUN"] == "t1--a01"
assert env["MVP_TRAINER_LOGGER"] == "console"
attempts = db.list_attempts("t1")
assert attempts and "WANDB_API_KEY" in str(attempts[0].get("message") or "")
def test_tick_wandb_enabled_updates_jobspec_logger(monkeypatch, tmp_path: Path):
from argus.service import scheduler as sched_mod
root = {
"ray": {
"address": "http://127.0.0.1:8265",
"shared_root": "/private",
"entrypoint_resources": {"worker_node": 1},
"runtime_env": {"env_vars": {}},
},
"service": {
"sqlite": {"db_path": str(tmp_path / "mvp.sqlite3")},
"scheduler": {"tick_s": 1, "retry_interval_s": 1, "max_running_tasks": 1},
},
"tracking": {"wandb": {"enabled": True, "base_url": "http://172.22.0.1:8090"}},
}
ray_cfg, v2_cfg = RayConfig.from_dict(root), V2Config.from_root_dict(root)
db = Db(v2_cfg.sqlite.db_path)
db.init()
db.create_task_v25(
task_id="t1",
user_id="alice",
workload="ppo",
jobspec_yaml=yaml.safe_dump(
{
"workload": "ppo",
"code_path": "/private/common/code/verl",
"model_id": "m",
"train_file": "/private/common/datasets/t",
"val_file": "/private/common/datasets/v",
}
),
nnodes=2,
n_gpus_per_node=4,
)
monkeypatch.setenv("WANDB_API_KEY", "k")
monkeypatch.setattr(sched_mod, "ensure_ray_connected", lambda: None)
monkeypatch.setattr(
sched_mod,
"get_cluster_available",
lambda: SimpleNamespace(total_available_gpus=999.0, total_available_npus=0.0),
)
class _Tool:
def __init__(self, cfg):
self.specs = []
self.extra_overrides = []
def submit(
self,
spec,
no_wait: bool,
job_dir: str | None = None,
extra_env_vars: dict | None = None,
extra_overrides: list[str] | None = None,
):
self.specs.append(spec)
self.extra_overrides.append(extra_overrides)
return str(spec.submission_id)
def status(self, submission_id: str):
return "RUNNING"
def logs(self, submission_id: str):
return ""
monkeypatch.setattr(sched_mod, "RayJobTool", _Tool)
s = sched_mod.Scheduler(db=db, ray_cfg=ray_cfg, v2_cfg=v2_cfg)
s.tick()
spec = s.tool.specs[-1]
# v3.6: built-in training should be configured to use wandb when enabled and API key exists.
assert spec.workload == "ppo"
ov = s.tool.extra_overrides[-1]
assert ov and "trainer.logger=[console,wandb]" in ov
assert ov and "trainer.project_name=alice_project" in ov
assert ov and "trainer.experiment_name=t1--a01" in ov
def test_tick_marks_pending_resources(monkeypatch, tmp_path: Path):

View File

@ -22,6 +22,7 @@ def test_v2_config_from_root_dict_new_format_defaults():
assert cfg.auth.token_env == "X"
assert cfg.sqlite.db_path.endswith(".sqlite3")
assert cfg.scheduler.max_running_tasks == 3
assert cfg.tracking.wandb.enabled is False
def test_v2_config_backward_compat_v2_section_and_default_db_path():
@ -29,6 +30,7 @@ def test_v2_config_backward_compat_v2_section_and_default_db_path():
cfg = V2Config.from_root_dict({"shared_root": "/private", "v2": {"sqlite": {}}})
assert cfg.sqlite.db_path == "/private/common/db/mvp.sqlite3"
assert cfg.tracking.wandb.enabled is False
def test_v2_config_requires_mappings():
@ -53,3 +55,49 @@ def test_v2_config_requires_data_mappings():
with pytest.raises(ValueError, match="config\\.data\\.\\{sftpgo,retention\\} must be mappings"):
V2Config.from_root_dict({**base, "data": {"sftpgo": ["x"], "retention": {}}})
def test_tracking_wandb_defaults_disabled():
from argus.service.config import V2Config
cfg = V2Config.from_root_dict(
{
"ray": {"shared_root": "/private"},
"service": {"api": {}, "auth": {}, "sqlite": {}, "scheduler": {}},
"data": {"sftpgo": {}, "retention": {}},
}
)
assert cfg.tracking.wandb.enabled is False
assert cfg.tracking.wandb.base_url == ""
assert cfg.tracking.wandb.api_key_env == "WANDB_API_KEY"
assert cfg.tracking.wandb.project_suffix == "_project"
def test_tracking_wandb_enabled_parses():
from argus.service.config import V2Config
cfg = V2Config.from_root_dict(
{
"ray": {"shared_root": "/private"},
"service": {"api": {}, "auth": {}, "sqlite": {}, "scheduler": {}},
"tracking": {"wandb": {"enabled": True, "base_url": "http://127.0.0.1:8090", "api_key_env": "X"}},
"data": {"sftpgo": {}, "retention": {}},
}
)
assert cfg.tracking.wandb.enabled is True
assert cfg.tracking.wandb.base_url == "http://127.0.0.1:8090"
assert cfg.tracking.wandb.api_key_env == "X"
def test_tracking_wandb_enabled_requires_base_url():
from argus.service.config import V2Config
with pytest.raises(ValueError, match="tracking\\.wandb\\.base_url is required"):
V2Config.from_root_dict(
{
"ray": {"shared_root": "/private"},
"service": {"api": {}, "auth": {}, "sqlite": {}, "scheduler": {}},
"tracking": {"wandb": {"enabled": True, "base_url": ""}},
"data": {"sftpgo": {}, "retention": {}},
}
)

View File

@ -93,5 +93,35 @@ def test_ui_new_task_contains_advanced_example_snippet(tmp_path, monkeypatch):
# workload is not needed for advanced in v3.5.
assert "# workload:" not in r.text
assert "actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu" in r.text
# v3.6: Advanced example uses platform-injected env vars so users don't need to edit W&B project/run.
assert "trainer.logger=${MVP_TRAINER_LOGGER}" in r.text
assert "trainer.project_name=${MVP_WANDB_PROJECT}" in r.text
assert "trainer.experiment_name=${MVP_WANDB_RUN}" in r.text
assert "Model merge example" in r.text
assert "verl.model_merger" in r.text
def test_ui_new_task_contains_evaluation_example(tmp_path, monkeypatch):
cfg = _write_config(tmp_path)
monkeypatch.setenv("MVP_INTERNAL_TOKEN", "admin-token")
app = create_app(str(cfg))
c = TestClient(app)
r = c.get("/ui/tasks/new")
assert r.status_code == 200
assert "Evaluation example" in r.text
assert "verl.trainer.main_eval" in r.text
assert "data.path=$HOME/jobs/<RAY_SUBMISSION_ID>/outputs/<EVAL_PARQUET>.parquet" in r.text
assert "+ray_kwargs.ray_init.address=auto" in r.text
def test_ui_login_contains_wandb_section(tmp_path, monkeypatch):
cfg = _write_config(tmp_path)
monkeypatch.setenv("MVP_INTERNAL_TOKEN", "admin-token")
app = create_app(str(cfg))
c = TestClient(app)
r = c.get("/ui/login")
assert r.status_code == 200
assert "W&amp;B" in r.text
assert "Open W&amp;B" in r.text

View File

@ -31,6 +31,10 @@ echo "[host] ensure shared dirs exist under ../../shared"
mkdir -p "${ROOT_DIR}/../../shared"/{datasets,hf,jobs,outputs,ray,common,user}
mkdir -p "${ROOT_DIR}/../../shared/common"/{code,datasets,models}
mkdir -p "${ROOT_DIR}/../../shared/user"/{code}
# v3.6: W&B local server persists metadata under /private/common/wandb (mounted as /vol).
# The wandb container runs as a non-root user; ensure the bind mount is writable.
mkdir -p "${ROOT_DIR}/../../shared/common/wandb"
chmod 777 "${ROOT_DIR}/../../shared/common/wandb" 2>/dev/null || true
echo "[host] ensure verl repo exists under ../../verl (required by prepare scripts)"
if [[ ! -d "${ROOT_DIR}/../../verl" ]]; then

View File

@ -25,6 +25,9 @@ fi
env_args=(-e "MVP_INTERNAL_TOKEN=${MVP_INTERNAL_TOKEN}")
# If host does not provide it, fall back to the dev default used by docker-compose (kept in sync).
env_args+=(-e "SFTPGO_ADMIN_PASSWORD=${SFTPGO_ADMIN_PASSWORD:-my-dev-sftpgo-admin}")
if [[ -n "${WANDB_API_KEY:-}" ]]; then
env_args+=(-e "WANDB_API_KEY=${WANDB_API_KEY}")
fi
docker exec -d "${env_args[@]}" "${HEAD_CONTAINER}" bash -lc "nohup python3 /workspace/mvp/py/server.py --config '${CONFIG_IN_CONTAINER}' >>'${LOG_PATH}' 2>&1 & echo \$! >'${PID_PATH}'"