v3.8: switch to one independent Ray Serve app per model; verified that when a container goes down, Ray Serve automatically migrates the model
This commit is contained in:
parent 686739fea2
commit 938f84f1ba
BIN
specs/mvp/v3.8/Snipaste_2026-01-08_17-17-57.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 403 KiB
189
specs/mvp/v3.8/v3.8_per_model_app.md
Normal file
@ -0,0 +1,189 @@
# v3.8 Addendum: One Ray Serve App per Model (Isolating the Impact of Adds/Removes)

## Background and Problem Reproduction

The current v3.8 implementation uses a **single application + multiple models** approach:

- On every reconcile, the service layer builds the full set of `llm_configs` and makes a single call to `serve.run(app, name="argus_llm_app", route_prefix="/")`
- **Adding or removing one model** therefore triggers a "full update" of that single app
- When an app is updated, Ray Serve performs rolling updates and rescheduling of the deployments/replicas inside that app

As a result, in the Ray Dashboard you observe:

- When a model is added or removed, the Serve deployments of the other models also enter the updating state
- Memory/GPU-memory usage changes again, and the GPU slot may even change (replicas get rescheduled onto a different node/GPU)

This contradicts the expectation that models which were not changed remain unaffected.

---

## Goal

Adjust the serving architecture to:

- **One Serve app per model (independent app name)**
- One independent `route_prefix` per model
- Adding/removing/scaling a model only updates that model's app and does not touch the other models' apps

The existing constraints remain unchanged:

- The inference port is fixed at `8000`
- The inference side does not use the existing token auth (the OpenAI endpoint is unauthenticated)
- `model_id` prefix rule: `<user_id>-<YYYYMMDDHHMM>-<suffix>`
- `LLMConfig.accelerator_type` comes from `configs/dev.yaml` (dev/h1: `H20`)

---

## Overall Design

### 1) Naming and Routing

For each model, generate:

- `app_name`: use the `model_key` directly (it is unique by construction and URL-safe), for example:
  - `app_name = "mvp2-alice-serve-20260106-060203-aad8"`
- `route_prefix`: use the model_key as well, to avoid URL/encoding ambiguity from characters such as `.` and `_` in model_id:
  - `route_prefix = f"/serve/{model_key}"`

The OpenAI base url of that model is then:

- `openai_base_url = http://<host>:8000/serve/<model_key>/v1`

Notes:

- The endpoint is still **OpenAI-compatible**; the only difference is that the base_url is no longer the root `/v1` but a per-model prefix.
- This makes each model's OpenAI endpoint independent of the others, and per-model observation/teardown also becomes easier (a minimal naming sketch follows below).
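
A minimal Python sketch of the naming scheme described above. The `derive_endpoints` helper name is ours; only the `model_key`-based scheme itself comes from this spec:

```python
def derive_endpoints(model_key: str, host: str, port: int = 8000) -> dict:
    """Derive the per-model Serve app name, route prefix and OpenAI base URL.

    Follows this spec: app_name == model_key and the route prefix is
    /serve/<model_key>, so each model's OpenAI-compatible API lives under
    its own prefix on the fixed inference port 8000.
    """
    route_prefix = f"/serve/{model_key}"
    return {
        "app_name": model_key,
        "route_prefix": route_prefix,
        "openai_base_url": f"http://{host}:{port}{route_prefix}/v1",
    }


# Example:
# derive_endpoints("mvp2-alice-serve-20260106-060203-aad8", "h1")["openai_base_url"]
# -> "http://h1:8000/serve/mvp2-alice-serve-20260106-060203-aad8/v1"
```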

### 2) Execution Model (Ray Serve)

Creating/updating a single-model app:

- `app = build_openai_app({"llm_configs":[LLMConfig(...)]})`
- `serve.run(app, name=app_name, route_prefix=route_prefix)`

Deleting a single-model app:

- `serve.delete(app_name)`

Key point:

- **Updates/deletes only affect the corresponding app_name**; other apps are not "fully rebuilt" by `serve.run` and therefore see no rolling update (a deploy/delete sketch follows below).
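
A minimal sketch of the per-model deploy/delete calls, assuming `ray[serve,llm]` is installed. The exact `LLMConfig` fields shown are illustrative; the spec only fixes model_id/model_source/accelerator_type:

```python
from ray import serve
from ray.serve.llm import LLMConfig, build_openai_app


def deploy_model_app(model_key: str, model_id: str, model_source: str,
                     accelerator_type: str = "H20") -> None:
    """Deploy (or update) exactly one model as its own Serve app."""
    llm_config = LLMConfig(
        model_loading_config={"model_id": model_id, "model_source": model_source},
        accelerator_type=accelerator_type,  # taken from configs/dev.yaml in this project
    )
    app = build_openai_app({"llm_configs": [llm_config]})
    # Only the app named after model_key is (re)deployed; other apps stay untouched.
    serve.run(app, name=model_key, route_prefix=f"/serve/{model_key}")


def delete_model_app(model_key: str) -> None:
    """Remove exactly one model's Serve app."""
    serve.delete(model_key)
```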

### 3) Service-Layer (Scheduler/Reconciler) Changes (High Level)

Current state: `ServingReconciler.tick()` applies one app for the full model set on every tick.

Target: switch to a partial reconcile keyed by model_key:

- For a model in state `QUEUED`:
  - Build the `LLMConfig` for that model only
  - `serve.run(app, name=model_key, route_prefix="/serve/<model_key>")`
  - State: `DEPLOYING` → (probe succeeds) → `RUNNING`
- For a model in state `DELETING`:
  - `serve.delete(model_key)`
  - State: `DELETED`

Resource precheck:

- Only the GPUs needed by the model being changed have to be prechecked (`num_replicas * gpus_per_replica`)
- There is no need to count the other models into needed_total_gpus, because the full app is no longer rebuilt (see the tick sketch below)
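
A condensed sketch of one reconcile step under this design. `db`, `serve_client`, `build_app` and `get_available_gpus` stand in for the project's SQLite layer, ServeClient wrapper, config builder and cluster probe; the state names and call shapes mirror this spec, the rest is illustrative:

```python
def reconcile_one(db, serve_client, build_app, get_available_gpus, change: dict) -> None:
    """Reconcile a single changed model row (per-model app, no full rebuild)."""
    model_key = change["model_key"]
    state = change["state"]

    if state == "DELETING":
        # Deleting is per-model: only this model's app is removed.
        serve_client.delete_app(app_name=model_key)
        db.set_serve_model_state(model_key=model_key, state="DELETED")
        return

    if state == "QUEUED":
        # Precheck only the GPUs this model needs, not the whole desired set.
        needed = int(change["num_replicas"]) * int(change["gpus_per_replica"])
        if get_available_gpus() < needed:
            db.append_serve_event(model_key=model_key,
                                  event_type="SERVE_PENDING_RESOURCES")
            return

        # Build and apply an app that contains this model only.
        app = build_app(change)
        serve_client.apply_app(app=app, app_name=model_key,
                               route_prefix=f"/serve/{model_key}")
        db.set_serve_model_state(model_key=model_key, state="DEPLOYING")
        # A later status probe promotes DEPLOYING -> RUNNING.
```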

### 4) Endpoint Structure Returned by API/UI

What the API currently returns:

- `endpoint.openai_base_url = http://<host>:8000/v1`
- `endpoint.model = <model_id>`

Proposed change (same fields, different values):

- `endpoint.openai_base_url = http://<host>:8000/serve/<model_key>/v1`
- `endpoint.model = <model_id>` (unchanged)

The example curl shown in the UI should also use the base_url above; the resulting endpoint payload is sketched below.
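
For reference, the `endpoint` block of a single model in the API response would then take this shape (the concrete values are illustrative):

```python
# Shape of `endpoint` inside GET /api/v2/serve/models/{model_key} after this change.
endpoint_example = {
    "openai_base_url": "http://h1:8000/serve/mvp2-alice-serve-20260106-060203-aad8/v1",
    "model": "alice-202601060602-qwen-0.5b",  # unchanged: clients pass this as the OpenAI "model"
}
```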

---

## Behavior Changes and Compatibility Impact

### 1) `/v1/models` Aggregation Changes (Important)

With one route_prefix per model:

- `http://<host>:8000/v1/models` is **no longer an overview of all models** (unless we add an aggregation layer)
- Each model's models list lives under its own prefix:
  - `http://<host>:8000/serve/<model_key>/v1/models`

If a single unified entry point is still desired (optional enhancement, not required by this proposal):

- Introduce a stable, never-rebuilt **OpenAI Router**, which could be:
  - a reverse proxy on the FastAPI (8080) side; or
  - a dedicated Ray Serve app that only routes and is never rebuilt when models change
- The router reads the model mapping from SQLite or an in-memory cache:
  - `model_id -> route_prefix`
- and forwards `/v1/chat/completions` to the prefix of the corresponding model

This can be a v3.9+ enhancement; the core goal of v3.8 is change isolation, and stability comes first. One possible router shape is sketched below.
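
One possible shape of such an optional router, sketched with FastAPI and httpx. The names, the hard-coded upstream address, and the in-memory mapping are our assumptions; this is explicitly not part of the v3.8 change:

```python
import httpx
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse

router_app = FastAPI()

# In a real router this mapping would be read from SQLite or a refreshed cache,
# keyed by model_id and updated as models are added/removed.
MODEL_ROUTES: dict[str, str] = {
    "alice-202601060602-qwen-0.5b": "/serve/mvp2-alice-serve-20260106-060203-aad8",
}


@router_app.post("/v1/chat/completions")
async def route_chat_completions(request: Request) -> JSONResponse:
    payload = await request.json()
    prefix = MODEL_ROUTES.get(str(payload.get("model", "")))
    if prefix is None:
        raise HTTPException(status_code=404, detail="unknown model")
    async with httpx.AsyncClient(timeout=60.0) as client:
        upstream = await client.post(
            f"http://127.0.0.1:8000{prefix}/v1/chat/completions", json=payload
        )
    return JSONResponse(content=upstream.json(), status_code=upstream.status_code)
```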

### 2) Resource and Scheduling Stability

After moving to per-model apps:

- Adding model B no longer causes model A's replicas to be rescheduled → **A's GPU/memory footprint stays stable**
- Deleting model B no longer triggers a rolling update of A

Still worth noting:

- If the Ray cluster loses a node or reclaims resources, Serve itself may still restart individual replicas (this is system-level behavior)

---

## Verification and Acceptance Flow (Suggested)

### A. Functional Acceptance (API/UI)

1. Create model A via the UI or the API and wait for RUNNING
2. Record A's:
   - `model_key_A`
   - `endpoint.openai_base_url_A`
3. Create model B and wait for RUNNING
4. Confirm:
   - A's endpoint still works (send a chat completion to A's base_url)
   - B's endpoint works
5. Delete model B and confirm:
   - B's endpoint returns 404 / is unavailable
   - A's endpoint still works (a minimal client call is sketched below)
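
A minimal client call for steps 4/5, assuming the `requests` package is available; `openai_base_url` and `model_id` are whatever the API returned for the model under test:

```python
import requests


def chat_once(openai_base_url: str, model_id: str) -> str:
    """Send one unauthenticated chat completion to a per-model endpoint."""
    resp = requests.post(
        f"{openai_base_url}/chat/completions",
        json={
            "model": model_id,
            "messages": [{"role": "user", "content": "hello"}],
            "max_tokens": 16,
            "stream": False,
        },
        timeout=60,
    )
    resp.raise_for_status()
    return resp.json()["choices"][0]["message"]["content"]
```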

### B. Strong Verification That Other Models Are Unaffected (Ray Actor Level)

On Ray, capture the actor_id/node_id of the `LLMServer` replica that belongs to A:

- Before creating B: `actor_id_A_before`
- After creating B: `actor_id_A_after`
- After deleting B: `actor_id_A_after_delete`

Expected:

- `actor_id_A_before == actor_id_A_after == actor_id_A_after_delete`

(The `LLMRouter` may change, but the **LLMServer for A must not**.) A fingerprinting sketch follows below.
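
A sketch of the fingerprinting step using Ray's state API, following the same approach as the E2E script later in this commit: it lists the ALIVE `ServeReplica` actors whose class name starts with `ServeReplica:<model_key>:`:

```python
import json

import ray
from ray.util.state import list_actors


def replica_fingerprint(model_key: str) -> str:
    """Return a stable JSON signature of the ALIVE Serve replicas of one model app."""
    ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False)
    want = f"ServeReplica:{model_key}:"
    out = []
    for actor in list_actors(limit=10000) or []:
        cls = str(getattr(actor, "class_name", "") or "")
        state = str(getattr(actor, "state", "") or "")
        if not cls.startswith(want):
            continue
        if state and state != "ALIVE":
            continue
        out.append({
            "actor_id": getattr(actor, "actor_id", None),
            "node_id": getattr(actor, "node_id", None),
            "class_name": cls,
        })
    out.sort(key=lambda x: (x["class_name"] or "", x["actor_id"] or ""))
    return json.dumps(out, sort_keys=True)
```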

---

## Code Changes Required (Checklist Level)

> This only lists where to change things, not the concrete implementation (implement with TDD and add unit tests as you go).

- `argus.service.serving_reconciler`:
  - Change from "apply one full app" to "apply/delete one app per model_key"
  - The GPU precheck becomes per-model
- `argus.service.serve_client`:
  - Add `delete_app(app_name)` (wrapping `serve.delete(app_name)`)
  - `apply_app` takes `app_name/route_prefix` (already exists, but the fixed app_name will no longer be passed)
- `argus.service.app` (Serving API output):
  - `_serve_model_public().endpoint.openai_base_url` becomes the per-model prefix
  - Adjust the `openai_base_url` semantics of `/api/v2/serve/models` list/get (each list item can carry that model's base_url)
- `argus.service.ui` (Serving pages):
  - "OpenAI /v1/models" changes to "pick a model, then open that model's /v1/models"
  - The curl example on the detail page uses the per-model base_url
174
specs/mvp/v3.8/v3.8_per_model_app_dev_plan.md
Normal file
@ -0,0 +1,174 @@
# MVP v3.8 (Change) Development Plan: Per-Model Serve App (TDD)

> Goal: following `specs/mvp/v3.8/v3.8_per_model_app.md`, change v3.8 from "single app with multiple models (full rebuild)" to "**one Ray Serve app per model + independent route_prefix**," so that adding/removing/scaling one model does not restart or reschedule the other models.

## Constraints and Decisions

- Inference port is fixed: `8000`
- The inference endpoint is **unauthenticated**
- `model_id` prefix rule: `<user_id>-<YYYYMMDDHHMM>-<suffix>`
- `LLMConfig.accelerator_type` is taken from `configs/dev.yaml` (dev/h1: `H20`)
- Routing scheme (fixed for this iteration):
  - `app_name = model_key`
  - `route_prefix = /serve/<model_key>`
  - `openai_base_url = http://<host>:8000/serve/<model_key>/v1`

## Non-Goals (Explicitly Out of Scope)

- No unified cross-model aggregation routing on `/v1` (that would require an extra router layer; it can be done in a later iteration)
- No change to ServingSpec semantics (the input is still `model_id/model_source/num_replicas/gpus_per_replica/engine_kwargs`)

---

## M0 - Baseline Regression and Branch Protection

**Purpose**: make sure training and the existing features do not regress before switching the architecture.

### Tests
- [ ] Locally: `.venv/bin/python -m pytest` all green (coverage ≥ 90%)

### Acceptance
- [ ] Baseline is healthy; proceed to M1

---

## M1 - API Output and Endpoint Semantics (Test-Driven)

**Purpose**: API/DB/frontend all agree on the per-model `openai_base_url` semantics; stop the UI/scripts from using the root `/v1` path.

### Changes
- `GET /api/v2/serve/models`:
  - Keep returning `items[]`, but each item's `endpoint.openai_base_url` must be the per-model base url
  - For the list-level `openai_base_url` field, pick one of:
    - A (recommended): remove the field (breaking; UI/scripts must be updated together)
    - B (compatible): keep it but set it to `null` or a hint string (no longer guaranteed to work)
- `GET /api/v2/serve/models/{model_key}`:
  - `model.endpoint.openai_base_url` becomes the per-model base url

### Unit Tests (Write First)
- [ ] Update/add `src/mvp/py/tests/test_app_serving_api.py`
  - Assert that `endpoint.openai_base_url` contains `/serve/<model_key>/v1`
  - Assert that the base_urls of multiple models differ, since they follow model_key (see the assertion sketch below)
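
The shape of those assertions, sketched as a helper the test module could call; the `client`/`headers` arguments stand in for the module's existing FastAPI TestClient and auth headers:

```python
def assert_per_model_base_urls(client, headers) -> None:
    """Check that every listed model exposes its own /serve/<model_key>/v1 base URL."""
    resp = client.get("/api/v2/serve/models?limit=10&offset=0", headers=headers)
    assert resp.status_code == 200
    base_urls = []
    for item in resp.json()["items"]:
        base_url = item["endpoint"]["openai_base_url"]
        assert f"/serve/{item['model_key']}/v1" in base_url
        base_urls.append(base_url)
    # Per-model prefixes imply distinct base URLs across models.
    assert len(set(base_urls)) == len(base_urls)
```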

### Implementation
- [ ] Update `src/mvp/py/argus/service/app.py`:
  - `_serve_model_public()` builds `endpoint.openai_base_url` with the per-model prefix
  - If the list-level `openai_base_url` is removed/adjusted, implement that as well

### Acceptance
- [ ] API unit tests pass; the returned structure is consumable by UI/scripts

---

## M2 - Extend ServeClient (delete_app) + Make the Reconciler Per-Model (Test-Driven)

**Purpose**: the core behavior change: each tick only deploys/deletes the app of one model and never rebuilds the full app.

### Changes (Behavior)
- `QUEUED`:
  - For that `model_key`, run `serve.run(app, name=model_key, route_prefix=/serve/<model_key>)`
  - State: `DEPLOYING → RUNNING`
- `DELETING`:
  - For that `model_key`, run `serve.delete(model_key)`
  - State: `DELETED`
- The resource precheck changes from the full `needed_total_gpus` to the GPUs needed by the model being changed

### Unit Tests (Write First)
- [ ] Update `src/mvp/py/tests/test_serving_reconciler.py`
  - After `create A`, the reconciler only calls `apply_app(app_name=A.model_key, route_prefix=/serve/A)`
  - After `create B`, the reconciler only calls `apply_app(app_name=B.model_key, route_prefix=/serve/B)` (no apply is triggered for A)
  - After `delete B`, the reconciler only calls `delete_app(B.model_key)` (A is not touched)
  - When GPUs are insufficient: stay `QUEUED` with event=SERVE_PENDING_RESOURCES (a fake ServeClient sketch for these assertions follows below)
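
A minimal fake ServeClient for these tests, matching the ServeClient call shapes described above; the reconciler under test would receive it via dependency injection (the class name and exact fields are illustrative):

```python
class FakeServeClient:
    """Records apply/delete calls so tests can assert per-model isolation."""

    def __init__(self) -> None:
        self.applied: list[dict] = []
        self.deleted: list[dict] = []

    def ensure_started(self) -> None:
        pass  # the real client would connect to Ray and start Serve here

    def apply_app(self, *, app, app_name: str, route_prefix: str = "/") -> dict:
        self.applied.append({"app_name": app_name, "route_prefix": route_prefix})
        return {"ok": True}

    def delete_app(self, *, app_name: str) -> dict:
        self.deleted.append({"app_name": app_name})
        return {"ok": True}

    def get_status(self) -> dict:
        return {"ok": True}


# Example assertions after "create B" has been reconciled (model keys are placeholders):
#   assert [a["app_name"] for a in client.applied] == ["A_model_key", "B_model_key"]
#   assert client.deleted == []   # A was never deleted or re-applied
```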

### Implementation
- [ ] `src/mvp/py/argus/service/serve_client.py`
  - Add `delete_app(app_name: str)` (wrapping `serve.delete`)
- [ ] `src/mvp/py/argus/service/serving_reconciler.py`
  - Remove the "full app apply" logic
  - Deploy each model_key independently: `app_name=model_key`, `route_prefix=/serve/<model_key>`
  - The delete path goes through `delete_app`

### Acceptance
- [ ] Reconciler unit tests all green; the logic is explainable (events/state are correct)

---

## M3 - Adapt the WebUI Serving Pages to the Per-Model base_url (Test-Driven)

**Purpose**: example commands copied from the UI must work; they must no longer point at the root `/v1`.

### Changes
- `/ui/serving` list page:
  - The "OpenAI /v1/models" button becomes either:
    - A: removed (there is no aggregated `/v1/models` anymore)
    - B: kept, but relabeled to say that the OpenAI base is found on the detail page
- `/ui/serving/{model_key}` detail page:
  - The `curl` example uses the per-model `openai_base_url`
  - Add a one-click link to `/serve/<model_key>/v1/models`

### Unit Tests (Write First)
- [ ] Update/add `src/mvp/py/tests/test_ui_serving.py`
  - Assert that the page contains the `/serve/` prefix
  - Assert that the detail-page example contains `/serve/<model_key>/v1/chat/completions`

### Implementation
- [ ] Update the Serving UI in `src/mvp/py/argus/service/ui.py`

### Acceptance
- [ ] UI unit tests all green; the page content matches the API semantics

---

## M4 - Update the E2E Script (v3.8 serving)

**Purpose**: one-click verification on dev/h1 of per-model apps: adding/removing A and B does not affect each other, and inference works.

### Changes
- Update `src/mvp/scripts/run_all_v38_serving.sh`:
  - Both `/v1/models` and `chat/completions` switch to the per-model base_url (`/serve/<model_key>/v1`)
  - Add an "isolation verification" step:
    - Deploy A → record the actor_id/node_id of A's serve replicas
    - Deploy B → record A's actor_id/node_id again; it must be identical
    - Delete B → record A's actor_id/node_id again; it must be identical
    - Finally delete A

### Acceptance
- [ ] The E2E script runs to completion and prints explicit assertions (identical / not identical)

---

## M5 - End-to-End Verification and Regression on h1

**Purpose**: confirm that the actual Ray Serve behavior meets the core goal of "no rolling update of other models."

### Steps
- [ ] Sync the code to: `argus@h1:/home2/argus/infra/mvp/src/mvp`
- [ ] Restart the API: `scripts/61_stop_api.sh && scripts/60_start_api.sh`
- [ ] Run: `MVP_INTERNAL_TOKEN=... scripts/run_all_v38_serving.sh`

### Acceptance Criteria (Must Hold)
- [ ] When B is added/removed, the actor_id/node_id of A's `LLMServer` replicas do not change
- [ ] Both A's and B's OpenAI endpoints complete `chat/completions`
- [ ] After deleting B, A can still serve inference

---

## M6 - Documentation and Migration Notes

**Purpose**: spell out the routing semantics change and how to use it.

- [ ] Update `src/mvp/README.md`:
  - Document the per-model base_url (`/serve/<model_key>/v1`)
  - Note that an aggregated `/v1/models` is no longer provided
- [ ] Update `specs/mvp/v3.8/v3.8_progress.md`:
  - Record the per-model app change and the acceptance conclusions

---

## Risks and Mitigations

- **Risk: the old `argus_llm_app` is left behind**
  - Mitigation: add a one-time best-effort `serve.delete("argus_llm_app")` to the E2E/migration steps (optional; see the sketch below)
- **Risk: users keep accessing `/v1` the old way**
  - Mitigation: switch UI/docs/scripts to the per-model base_url consistently, and show a clear hint on the list page
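
The one-time cleanup mentioned in the first risk, as a best-effort sketch:

```python
from ray import serve


def cleanup_legacy_app() -> None:
    """Best-effort removal of the old single multi-model app from before this change."""
    try:
        serve.delete("argus_llm_app")
    except Exception:
        # The app may already be gone or Serve may not be running; ignore either way.
        pass
```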
@ -91,19 +91,17 @@ def create_app(config_path: str) -> FastAPI:
|
||||
def _serving_enabled() -> bool:
|
||||
return bool(v2_cfg.serving.enabled)
|
||||
|
||||
def _openai_base_url(req: Request) -> str:
|
||||
def _openai_base_url_for_model(req: Request, *, model_key: str) -> str:
|
||||
# Prefer forwarded headers if present; otherwise fall back to Host.
|
||||
host = req.headers.get("x-forwarded-host") or req.headers.get("host") or req.url.hostname or "127.0.0.1"
|
||||
# Strip port if present (common for Host header).
|
||||
hostname = host
|
||||
if hostname.startswith("[") and "]" in hostname:
|
||||
# IPv6 like: [::1]:8080
|
||||
hostname = hostname.split("]")[0] + "]"
|
||||
else:
|
||||
hostname = hostname.split(":")[0]
|
||||
scheme = req.headers.get("x-forwarded-proto") or req.url.scheme or "http"
|
||||
port = int(v2_cfg.serving.serve.http_port)
|
||||
return f"{scheme}://{hostname}:{port}/v1"
|
||||
return f"{scheme}://{hostname}:{port}/serve/{model_key}/v1"
|
||||
|
||||
def _dump_yaml(obj: Any) -> str:
|
||||
return yaml.safe_dump(obj, sort_keys=False)
|
||||
@ -122,8 +120,9 @@ def create_app(config_path: str) -> FastAPI:
|
||||
gpus_per_replica = int(row.get("gpus_per_replica") or 0)
|
||||
total_gpus = num_replicas * gpus_per_replica
|
||||
model_id = str(row.get("model_id") or "")
|
||||
model_key = str(row.get("model_key") or "")
|
||||
return {
|
||||
"model_key": str(row.get("model_key") or ""),
|
||||
"model_key": model_key,
|
||||
"user_id": str(row.get("user_id") or ""),
|
||||
"model_id": model_id,
|
||||
"model_id_suffix": str(row.get("model_id_suffix") or ""),
|
||||
@ -138,7 +137,7 @@ def create_app(config_path: str) -> FastAPI:
|
||||
"updated_at": str(row.get("updated_at") or ""),
|
||||
"deleted_at": row.get("deleted_at"),
|
||||
"endpoint": {
|
||||
"openai_base_url": _openai_base_url(req),
|
||||
"openai_base_url": _openai_base_url_for_model(req, model_key=model_key),
|
||||
"model": model_id,
|
||||
},
|
||||
}
|
||||
@ -688,7 +687,8 @@ def create_app(config_path: str) -> FastAPI:
|
||||
out = [_serve_model_public(i, req=req) for i in items]
|
||||
return {
|
||||
"items": out,
|
||||
"openai_base_url": _openai_base_url(req),
|
||||
# Per-model route_prefix; see each item.endpoint.openai_base_url.
|
||||
"openai_base_url": None,
|
||||
"limit": lim,
|
||||
"offset": off,
|
||||
"has_more": bool(len(items) == lim),
|
||||
@ -773,7 +773,8 @@ def create_app(config_path: str) -> FastAPI:
|
||||
raise HTTPException(status_code=400, detail="serving is not enabled")
|
||||
return {
|
||||
"enabled": True,
|
||||
"openai_base_url": _openai_base_url(req),
|
||||
# Per-model route_prefix; see /api/v2/serve/models items[].endpoint.openai_base_url.
|
||||
"openai_base_url": None,
|
||||
"http_port": int(v2_cfg.serving.serve.http_port),
|
||||
"proxy_location": str(v2_cfg.serving.serve.proxy_location),
|
||||
"accelerator_type": str(v2_cfg.serving.llm.accelerator_type),
|
||||
|
||||
@ -43,3 +43,8 @@ class RayServeClient:
|
||||
from ray import serve # type: ignore
|
||||
|
||||
return serve.status()
|
||||
|
||||
def delete_app(self, *, app_name: str) -> Any:
|
||||
from ray import serve # type: ignore
|
||||
|
||||
return serve.delete(app_name)
|
||||
|
||||
@ -18,6 +18,8 @@ class ServeClient(Protocol):
|
||||
|
||||
def apply_app(self, *, app: Any, app_name: str, route_prefix: str = "/") -> Any: ...
|
||||
|
||||
def delete_app(self, *, app_name: str) -> Any: ...
|
||||
|
||||
def get_status(self) -> Any: ...
|
||||
|
||||
|
||||
@ -45,21 +47,18 @@ def _row_to_resolved_spec(row: dict[str, Any]) -> ResolvedServingSpec:
|
||||
)
|
||||
|
||||
|
||||
def _needed_total_gpus(rows: list[dict[str, Any]]) -> int:
|
||||
total = 0
|
||||
for r in rows:
|
||||
total += int(r.get("num_replicas") or 0) * int(r.get("gpus_per_replica") or 0)
|
||||
return total
|
||||
def _needed_model_gpus(row: dict[str, Any]) -> int:
|
||||
return int(row.get("num_replicas") or 0) * int(row.get("gpus_per_replica") or 0)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ServingReconciler:
|
||||
"""
|
||||
v3.8: reconcile declared serve_models (SQLite) into a multi-model Ray Serve app.
|
||||
v3.8 (per-model apps): reconcile serve_models (SQLite) into per-model Ray Serve apps.
|
||||
|
||||
This reconciler is intentionally conservative:
|
||||
- Only acts on models in states QUEUED/DELETING.
|
||||
- Performs a minimal GPU precheck using ray available GPU totals.
|
||||
- Performs a minimal GPU precheck using ray available GPU totals (per-model).
|
||||
- Writes events and state transitions for explainability.
|
||||
"""
|
||||
|
||||
@ -67,8 +66,6 @@ class ServingReconciler:
|
||||
v2_cfg: V2Config
|
||||
ray_runtime_env_env_vars: dict[str, str]
|
||||
serve_client: ServeClient
|
||||
app_name: str = "argus_llm_app"
|
||||
route_prefix: str = "/"
|
||||
cpu_per_gpu: float = 1.0
|
||||
get_available_fn: Any = get_cluster_available
|
||||
|
||||
@ -88,33 +85,40 @@ class ServingReconciler:
|
||||
self.db.append_serve_event(model_key=model_key, event_type="SERVE_START_ERROR", payload_json=repr(e))
|
||||
return
|
||||
|
||||
# Desired set: all non-deleted models except those marked DELETING.
|
||||
all_rows = self.db.list_all_serve_models(include_deleted=False, limit=5000, offset=0)
|
||||
# FAILED models are not part of the desired running set. A user can PATCH to
|
||||
# re-queue a failed model (e.g., after fixing env/deps) which will move it back to QUEUED.
|
||||
desired_rows = [r for r in all_rows if str(r.get("state") or "") not in ("DELETING", "DELETED", "FAILED")]
|
||||
# Deleting is per-model: remove the model's app by name. This avoids touching other models.
|
||||
if state == "DELETING":
|
||||
try:
|
||||
self.db.append_serve_event(model_key=model_key, event_type="SERVE_DELETE_REQUESTED", payload_json=None)
|
||||
self.serve_client.delete_app(app_name=model_key)
|
||||
except Exception as e:
|
||||
err = f"{type(e).__name__}: {e}"
|
||||
tb = traceback.format_exc(limit=10)
|
||||
self.db.set_serve_model_state(
|
||||
model_key=model_key, state="FAILED", error_summary=err, event_type="SERVE_DELETE_FAILED", payload_json=tb
|
||||
)
|
||||
return
|
||||
self.db.set_serve_model_state(model_key=model_key, state="DELETED", event_type="SERVE_DELETE_APPLIED")
|
||||
return
|
||||
|
||||
# Precheck resources: multi-model app apply needs enough GPUs for the whole desired set.
|
||||
needed = _needed_total_gpus(desired_rows)
|
||||
# Precheck resources: per-model apply needs enough GPUs for this model only.
|
||||
needed = _needed_model_gpus(change)
|
||||
avail: ClusterAvailable = self.get_available_fn()
|
||||
if float(avail.total_available_gpus) < float(needed):
|
||||
msg = f"Insufficient GPUs: need {needed}, available {avail.total_available_gpus}"
|
||||
self.db.append_serve_event(model_key=model_key, event_type="SERVE_PENDING_RESOURCES", payload_json=msg)
|
||||
return
|
||||
|
||||
# Build per-model LLM configs (dict form in M4).
|
||||
llm_cfg_dicts: list[dict[str, Any]] = []
|
||||
# Build the single-model LLM config (dict form for unit tests).
|
||||
accelerator_type = str(self.v2_cfg.serving.llm.accelerator_type or "")
|
||||
for r in desired_rows:
|
||||
resolved = _row_to_resolved_spec(r)
|
||||
llm_cfg_dicts.append(
|
||||
build_llm_config_dict(
|
||||
resolved,
|
||||
accelerator_type=accelerator_type,
|
||||
runtime_env_env_vars=self.ray_runtime_env_env_vars,
|
||||
cpu_per_gpu=self.cpu_per_gpu,
|
||||
)
|
||||
resolved = _row_to_resolved_spec(change)
|
||||
llm_cfg_dicts = [
|
||||
build_llm_config_dict(
|
||||
resolved,
|
||||
accelerator_type=accelerator_type,
|
||||
runtime_env_env_vars=self.ray_runtime_env_env_vars,
|
||||
cpu_per_gpu=self.cpu_per_gpu,
|
||||
)
|
||||
]
|
||||
|
||||
# Build a Ray Serve OpenAI-compatible app if Ray Serve LLM is available.
|
||||
# Fall back to a plain dict so unit tests can run without real Ray Serve.
|
||||
@ -129,19 +133,14 @@ class ServingReconciler:
|
||||
app_obj = {"llm_configs": llm_cfg_dicts}
|
||||
|
||||
try:
|
||||
self.db.append_serve_event(model_key=model_key, event_type="SERVE_APPLY_REQUESTED", payload_json=str(len(llm_cfg_dicts)))
|
||||
self.serve_client.apply_app(app=app_obj, app_name=self.app_name, route_prefix=self.route_prefix)
|
||||
self.db.append_serve_event(model_key=model_key, event_type="SERVE_APPLY_REQUESTED", payload_json="1")
|
||||
self.serve_client.apply_app(app=app_obj, app_name=model_key, route_prefix=f"/serve/{model_key}")
|
||||
except Exception as e:
|
||||
err = f"{type(e).__name__}: {e}"
|
||||
tb = traceback.format_exc(limit=10)
|
||||
self.db.set_serve_model_state(model_key=model_key, state="FAILED", error_summary=err, event_type="SERVE_APPLY_FAILED", payload_json=tb)
|
||||
return
|
||||
|
||||
# Apply succeeded. Update the changing model's state.
|
||||
if state == "DELETING":
|
||||
self.db.set_serve_model_state(model_key=model_key, state="DELETED", event_type="SERVE_DELETE_APPLIED")
|
||||
return
|
||||
|
||||
# Mark as deploying; best-effort status probe can promote to RUNNING.
|
||||
self.db.set_serve_model_state(model_key=model_key, state="DEPLOYING", event_type="SERVE_DEPLOYING")
|
||||
try:
|
||||
|
||||
@ -1001,7 +1001,6 @@ refresh();
|
||||
<div class="row">
|
||||
<button class="btn" id="refresh">Refresh</button>
|
||||
<a class="btn" href="/ui/serving/new" style="display:inline-block">New Model</a>
|
||||
<a class="btn" id="openai-models" target="_blank" rel="noopener" href="#">OpenAI /v1/models</a>
|
||||
</div>
|
||||
<div style="height:10px"></div>
|
||||
<div id="out" class="muted">Loading...</div>
|
||||
@ -1009,7 +1008,6 @@ refresh();
|
||||
""".strip()
|
||||
script = """
|
||||
document.getElementById("nav-ray-dashboard").href = curOriginWithPort(8265);
|
||||
document.getElementById("openai-models").href = curOriginWithPort(8000) + "/v1/models";
|
||||
const out = document.getElementById("out");
|
||||
|
||||
function pill(state) {
|
||||
@ -1031,12 +1029,15 @@ async function refresh() {
|
||||
const prevDisabled = off <= 0;
|
||||
const nextDisabled = !hasMore;
|
||||
|
||||
const baseHint = curOriginWithPort(8000) + "/serve/<model_key>/v1";
|
||||
function row(m) {
|
||||
const endpoint = (m.endpoint && m.endpoint.openai_base_url) || (curOriginWithPort(8000) + "/serve/" + encodeURIComponent(m.model_key) + "/v1");
|
||||
return `<tr>
|
||||
<td><a href="/ui/serving/${m.model_key}">${m.model_key}</a></td>
|
||||
<td><code>${m.model_id}</code></td>
|
||||
<td>${pill(m.state)}</td>
|
||||
<td>${m.num_replicas} × ${m.gpus_per_replica} GPU</td>
|
||||
<td><code>${endpoint}</code></td>
|
||||
<td>${m.updated_at || ""}</td>
|
||||
</tr>`;
|
||||
}
|
||||
@ -1044,7 +1045,7 @@ async function refresh() {
|
||||
|
||||
out.innerHTML = `
|
||||
<div class="row" style="justify-content: space-between; margin-bottom: 8px;">
|
||||
<div class="muted">OpenAI base: <code>${resp.openai_base_url || curOriginWithPort(8000) + "/v1"}</code></div>
|
||||
<div class="muted">Per-model OpenAI base: <code>${baseHint}</code></div>
|
||||
<div class="row">
|
||||
<span class="muted">Page ${pageNo}</span>
|
||||
<button class="btn" id="prev" ${prevDisabled ? "disabled" : ""}>Prev</button>
|
||||
@ -1052,8 +1053,8 @@ async function refresh() {
|
||||
</div>
|
||||
</div>
|
||||
<table>
|
||||
<thead><tr><th>Model Key</th><th>Model ID</th><th>State</th><th>Resources</th><th>Updated</th></tr></thead>
|
||||
<tbody>${rows || "<tr><td colspan=5 class=muted>(none)</td></tr>"}</tbody>
|
||||
<thead><tr><th>Model Key</th><th>Model ID</th><th>State</th><th>Resources</th><th>Endpoint</th><th>Updated</th></tr></thead>
|
||||
<tbody>${rows || "<tr><td colspan=6 class=muted>(none)</td></tr>"}</tbody>
|
||||
</table>
|
||||
`;
|
||||
|
||||
@ -1160,8 +1161,8 @@ document.getElementById("submit").onclick = async () => {
|
||||
""".strip()
|
||||
script = f"""
|
||||
document.getElementById("nav-ray-dashboard").href = curOriginWithPort(8265);
|
||||
document.getElementById("openai-models").href = curOriginWithPort(8000) + "/v1/models";
|
||||
const modelKey = {json.dumps(model_key)};
|
||||
document.getElementById("openai-models").href = curOriginWithPort(8000) + "/serve/" + encodeURIComponent(modelKey) + "/v1/models";
|
||||
const meta = document.getElementById("meta");
|
||||
const spec = document.getElementById("spec");
|
||||
const eventsEl = document.getElementById("events");
|
||||
@ -1194,19 +1195,20 @@ async function refresh() {{
|
||||
const obj = await apiJson("/api/v2/serve/models/" + encodeURIComponent(modelKey));
|
||||
const m = obj.model || {{}};
|
||||
replicas.value = String(m.num_replicas || 1);
|
||||
const endpoint = (m.endpoint && m.endpoint.openai_base_url) || (curOriginWithPort(8000) + "/serve/" + encodeURIComponent(modelKey) + "/v1");
|
||||
meta.innerHTML = `
|
||||
<div class=row>
|
||||
<div>state: ${{pill(m.state)}}</div>
|
||||
<div class=muted>model_id: <code>${{m.model_id || ""}}</code></div>
|
||||
<div class=muted>source: <code>${{m.model_source || ""}}</code></div>
|
||||
</div>
|
||||
<div class=muted>endpoint: <code>${{(m.endpoint && m.endpoint.openai_base_url) || (curOriginWithPort(8000) + "/v1")}}</code></div>
|
||||
<div class=muted>endpoint: <code>${{endpoint}}</code></div>
|
||||
`;
|
||||
spec.textContent = obj.resolved_spec_yaml || "";
|
||||
eventsEl.innerHTML = renderEvents(obj.events || []);
|
||||
const base = (m.endpoint && m.endpoint.openai_base_url) || (curOriginWithPort(8000) + "/v1");
|
||||
const base = endpoint;
|
||||
const mid = m.model_id || "";
|
||||
example.textContent = `curl -sS -H 'Content-Type: application/json' -H 'Authorization: Bearer FAKE_KEY' \\\\\\n -X POST ${{base}}/chat/completions \\\\\\n --data-binary '{{\\"model\\":\\"${{mid}}\\",\\"messages\\":[{{\\"role\\":\\"user\\",\\"content\\":\\"hello\\"}}],\\"max_tokens\\":16,\\"stream\\":false}}' | python3 -m json.tool`;
|
||||
example.textContent = `curl -sS -H 'Content-Type: application/json' \\\\\\n -X POST ${{base}}/chat/completions \\\\\\n --data-binary '{{\\"model\\":\\"${{mid}}\\",\\"messages\\":[{{\\"role\\":\\"user\\",\\"content\\":\\"hello\\"}}],\\"max_tokens\\":16,\\"stream\\":false}}' | python3 -m json.tool`;
|
||||
}} catch (e) {{
|
||||
meta.textContent = "Error: " + (e.status || "") + "\\n" + (e.body || String(e));
|
||||
spec.textContent = "";
|
||||
|
||||
@ -75,9 +75,10 @@ def test_serving_api_crud_flow(tmp_path: Path, monkeypatch):
|
||||
r4 = c.get("/api/v2/serve/models?limit=10&offset=0", headers=headers)
|
||||
assert r4.status_code == 200
|
||||
obj = r4.json()
|
||||
assert obj["openai_base_url"] == "http://testserver:8000/v1"
|
||||
assert obj["openai_base_url"] is None
|
||||
assert len(obj["items"]) == 1
|
||||
assert obj["items"][0]["model_key"] == "mk-alice"
|
||||
assert obj["items"][0]["endpoint"]["openai_base_url"] == "http://testserver:8000/serve/mk-alice/v1"
|
||||
|
||||
r5 = c.get("/api/v2/serve/models/mk-alice", headers=headers)
|
||||
assert r5.status_code == 200
|
||||
@ -99,6 +100,7 @@ def test_serving_api_crud_flow(tmp_path: Path, monkeypatch):
|
||||
r8 = c.get("/api/v2/serve/status", headers=admin_headers)
|
||||
assert r8.status_code == 200
|
||||
assert r8.json()["http_port"] == 8000
|
||||
assert r8.json()["openai_base_url"] is None
|
||||
|
||||
|
||||
def test_serving_api_rejects_path_outside_user_and_hf(tmp_path: Path, monkeypatch):
|
||||
@ -245,8 +247,17 @@ def test_serving_api_list_include_deleted_and_forwarded_base_url(tmp_path: Path,
|
||||
headers={**headers, "x-forwarded-host": "example.com:8080", "x-forwarded-proto": "https"},
|
||||
)
|
||||
assert r1.status_code == 200
|
||||
assert r1.json()["openai_base_url"] == "https://example.com:8000/v1"
|
||||
assert r1.json()["openai_base_url"] is None
|
||||
assert {m["model_key"] for m in r1.json()["items"]} == {"mk-alice-1"}
|
||||
assert r1.json()["items"][0]["endpoint"]["openai_base_url"] == "https://example.com:8000/serve/mk-alice-1/v1"
|
||||
|
||||
# IPv6 bracketed host should be preserved.
|
||||
r1b = c.get(
|
||||
"/api/v2/serve/models?limit=10&offset=0&include_deleted=0",
|
||||
headers={**headers, "x-forwarded-host": "[::1]:8080", "x-forwarded-proto": "https"},
|
||||
)
|
||||
assert r1b.status_code == 200
|
||||
assert r1b.json()["items"][0]["endpoint"]["openai_base_url"] == "https://[::1]:8000/serve/mk-alice-1/v1"
|
||||
|
||||
r2 = c.get("/api/v2/serve/models?include_deleted=1", headers=headers)
|
||||
assert r2.status_code == 200
|
||||
|
||||
@ -3,6 +3,8 @@ from __future__ import annotations
|
||||
import sys
|
||||
import types
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_ray_serve_client_calls_start_run_status(monkeypatch):
|
||||
import ray # provided by conftest stub
|
||||
@ -28,9 +30,14 @@ def test_ray_serve_client_calls_start_run_status(monkeypatch):
|
||||
calls.append(("serve.status", None))
|
||||
return {"ok": True}
|
||||
|
||||
def _delete(name: str):
|
||||
calls.append(("serve.delete", {"name": name}))
|
||||
return {"deleted": True}
|
||||
|
||||
serve.start = _start # type: ignore[attr-defined]
|
||||
serve.run = _run # type: ignore[attr-defined]
|
||||
serve.status = _status # type: ignore[attr-defined]
|
||||
serve.delete = _delete # type: ignore[attr-defined]
|
||||
|
||||
sys.modules["ray.serve"] = serve
|
||||
ray.serve = serve # type: ignore[attr-defined]
|
||||
@ -41,9 +48,11 @@ def test_ray_serve_client_calls_start_run_status(monkeypatch):
|
||||
client.ensure_started()
|
||||
out = client.apply_app(app="APP", app_name="argus_llm_app", route_prefix="/")
|
||||
st = client.get_status()
|
||||
deleted = client.delete_app(app_name="mk1")
|
||||
|
||||
assert out == {"deployed": True}
|
||||
assert st == {"ok": True}
|
||||
assert deleted == {"deleted": True}
|
||||
|
||||
# Verify call order and key args.
|
||||
assert calls[0][0] == "ray.init"
|
||||
@ -53,3 +62,19 @@ def test_ray_serve_client_calls_start_run_status(monkeypatch):
|
||||
assert calls[2][0] == "serve.run"
|
||||
assert calls[2][1]["name"] == "argus_llm_app"
|
||||
assert calls[3][0] == "serve.status"
|
||||
assert calls[4][0] == "serve.delete"
|
||||
|
||||
|
||||
def test_ray_serve_client_rejects_dict_app(monkeypatch):
|
||||
import ray # provided by conftest stub
|
||||
|
||||
def _init(*args, **kwargs):
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(ray, "init", _init, raising=False)
|
||||
|
||||
from argus.service.serve_client import RayServeClient
|
||||
|
||||
client = RayServeClient(http_port=8000, proxy_location="HeadOnly", ray_init_address="auto")
|
||||
with pytest.raises(ValueError, match="invalid serve app object"):
|
||||
client.apply_app(app={"llm_configs": []}, app_name="mk1", route_prefix="/serve/mk1")
|
||||
|
||||
@ -8,11 +8,16 @@ class _FakeServeClient:
|
||||
def __init__(self):
|
||||
self.started = 0
|
||||
self.applied = []
|
||||
self.deleted = []
|
||||
self.status_calls = 0
|
||||
self.fail_apply = False
|
||||
self.fail_status = False
|
||||
self.fail_delete = False
|
||||
self.fail_start = False
|
||||
|
||||
def ensure_started(self) -> None:
|
||||
if self.fail_start:
|
||||
raise RuntimeError("start boom")
|
||||
self.started += 1
|
||||
|
||||
def apply_app(self, *, app, app_name: str, route_prefix: str = "/"):
|
||||
@ -21,6 +26,12 @@ class _FakeServeClient:
|
||||
self.applied.append({"app": app, "app_name": app_name, "route_prefix": route_prefix})
|
||||
return {"ok": True}
|
||||
|
||||
def delete_app(self, *, app_name: str):
|
||||
if self.fail_delete:
|
||||
raise RuntimeError("delete boom")
|
||||
self.deleted.append({"app_name": app_name})
|
||||
return {"ok": True}
|
||||
|
||||
def get_status(self):
|
||||
self.status_calls += 1
|
||||
if self.fail_status:
|
||||
@ -133,6 +144,8 @@ def test_reconciler_apply_success_marks_running(tmp_path: Path):
|
||||
rec.tick()
|
||||
assert client.started == 1
|
||||
assert len(client.applied) == 1
|
||||
assert client.applied[0]["app_name"] == "mk1"
|
||||
assert client.applied[0]["route_prefix"] == "/serve/mk1"
|
||||
applied = client.applied[0]["app"]["llm_configs"]
|
||||
assert applied[0]["engine_kwargs"]["tensor_parallel_size"] == 1
|
||||
assert applied[0]["runtime_env"]["env_vars"]["HF_HUB_OFFLINE"] == "1"
|
||||
@ -167,9 +180,8 @@ def test_reconciler_delete_removes_and_marks_deleted(tmp_path: Path):
|
||||
get_available_fn=lambda: type("A", (), {"total_available_gpus": 8, "total_available_npus": 0})(),
|
||||
)
|
||||
rec.tick()
|
||||
assert len(client.applied) == 1
|
||||
cfgs = client.applied[0]["app"]["llm_configs"]
|
||||
assert {c["model_loading_config"]["model_id"] for c in cfgs} == {"alice-202601061235-x"} # only keep remains
|
||||
assert client.applied == []
|
||||
assert client.deleted == [{"app_name": "del"}]
|
||||
row = db.get_serve_model("del")
|
||||
assert row and row["state"] == "DELETED"
|
||||
assert row.get("deleted_at")
|
||||
@ -205,3 +217,138 @@ def test_reconciler_apply_failure_marks_failed(tmp_path: Path):
|
||||
row = db.get_serve_model("mk1")
|
||||
assert row and row["state"] == "FAILED"
|
||||
assert row.get("error_summary")
|
||||
|
||||
|
||||
def test_reconciler_applies_one_model_per_tick(tmp_path: Path):
|
||||
from argus.service.config import V2Config
|
||||
from argus.service.db import Db
|
||||
from argus.service.serving_reconciler import ServingReconciler
|
||||
|
||||
cfg = V2Config.from_root_dict(
|
||||
{
|
||||
"ray": {"shared_root": "/private"},
|
||||
"service": {"api": {}, "auth": {}, "sqlite": {"db_path": str(tmp_path / "mvp.sqlite3")}, "scheduler": {}},
|
||||
"data": {"sftpgo": {}, "retention": {}},
|
||||
"serving": {"serve": {"http_port": 8000}, "llm": {"accelerator_type": "H20"}},
|
||||
}
|
||||
)
|
||||
db = Db(cfg.sqlite.db_path)
|
||||
db.init()
|
||||
_seed_model(db, model_key="mk1", user_id="alice", state="QUEUED", num_replicas=1, gpus_per_replica=1)
|
||||
_seed_model(db, model_key="mk2", user_id="alice", state="QUEUED", num_replicas=1, gpus_per_replica=1)
|
||||
|
||||
client = _FakeServeClient()
|
||||
rec = ServingReconciler(
|
||||
db=db,
|
||||
v2_cfg=cfg,
|
||||
ray_runtime_env_env_vars={},
|
||||
serve_client=client,
|
||||
get_available_fn=lambda: type("A", (), {"total_available_gpus": 8, "total_available_npus": 0})(),
|
||||
)
|
||||
|
||||
rec.tick()
|
||||
assert len(client.applied) == 1
|
||||
assert client.applied[0]["app_name"] == "mk1"
|
||||
assert db.get_serve_model("mk1")["state"] == "RUNNING"
|
||||
assert db.get_serve_model("mk2")["state"] == "QUEUED"
|
||||
|
||||
rec.tick()
|
||||
assert len(client.applied) == 2
|
||||
assert client.applied[1]["app_name"] == "mk2"
|
||||
assert db.get_serve_model("mk2")["state"] == "RUNNING"
|
||||
|
||||
|
||||
def test_reconciler_start_error_is_recorded(tmp_path: Path):
|
||||
from argus.service.config import V2Config
|
||||
from argus.service.db import Db
|
||||
from argus.service.serving_reconciler import ServingReconciler
|
||||
|
||||
cfg = V2Config.from_root_dict(
|
||||
{
|
||||
"ray": {"shared_root": "/private"},
|
||||
"service": {"api": {}, "auth": {}, "sqlite": {"db_path": str(tmp_path / "mvp.sqlite3")}, "scheduler": {}},
|
||||
"data": {"sftpgo": {}, "retention": {}},
|
||||
"serving": {"serve": {"http_port": 8000}, "llm": {"accelerator_type": "H20"}},
|
||||
}
|
||||
)
|
||||
db = Db(cfg.sqlite.db_path)
|
||||
db.init()
|
||||
_seed_model(db, model_key="mk1", user_id="alice", state="QUEUED", num_replicas=1, gpus_per_replica=1)
|
||||
|
||||
client = _FakeServeClient()
|
||||
client.fail_start = True
|
||||
rec = ServingReconciler(db=db, v2_cfg=cfg, ray_runtime_env_env_vars={}, serve_client=client)
|
||||
rec.tick()
|
||||
assert db.get_serve_model("mk1")["state"] == "QUEUED"
|
||||
ev = db.list_serve_events("mk1", limit=50)
|
||||
assert any(e["event_type"] == "SERVE_START_ERROR" for e in ev)
|
||||
|
||||
|
||||
def test_reconciler_delete_failure_marks_failed(tmp_path: Path):
|
||||
from argus.service.config import V2Config
|
||||
from argus.service.db import Db
|
||||
from argus.service.serving_reconciler import ServingReconciler
|
||||
|
||||
cfg = V2Config.from_root_dict(
|
||||
{
|
||||
"ray": {"shared_root": "/private"},
|
||||
"service": {"api": {}, "auth": {}, "sqlite": {"db_path": str(tmp_path / "mvp.sqlite3")}, "scheduler": {}},
|
||||
"data": {"sftpgo": {}, "retention": {}},
|
||||
"serving": {"serve": {"http_port": 8000}, "llm": {"accelerator_type": "H20"}},
|
||||
}
|
||||
)
|
||||
db = Db(cfg.sqlite.db_path)
|
||||
db.init()
|
||||
_seed_model(db, model_key="mk1", user_id="alice", state="DELETING", num_replicas=1, gpus_per_replica=1)
|
||||
|
||||
client = _FakeServeClient()
|
||||
client.fail_delete = True
|
||||
rec = ServingReconciler(db=db, v2_cfg=cfg, ray_runtime_env_env_vars={}, serve_client=client)
|
||||
rec.tick()
|
||||
row = db.get_serve_model("mk1")
|
||||
assert row and row["state"] == "FAILED"
|
||||
assert row.get("error_summary")
|
||||
|
||||
|
||||
def test_reconciler_engine_kwargs_bad_json_is_ignored(tmp_path: Path):
|
||||
from argus.service.config import V2Config
|
||||
from argus.service.db import Db
|
||||
from argus.service.serving_reconciler import ServingReconciler
|
||||
|
||||
cfg = V2Config.from_root_dict(
|
||||
{
|
||||
"ray": {"shared_root": "/private"},
|
||||
"service": {"api": {}, "auth": {}, "sqlite": {"db_path": str(tmp_path / "mvp.sqlite3")}, "scheduler": {}},
|
||||
"data": {"sftpgo": {}, "retention": {}},
|
||||
"serving": {"serve": {"http_port": 8000}, "llm": {"accelerator_type": "H20"}},
|
||||
}
|
||||
)
|
||||
db = Db(cfg.sqlite.db_path)
|
||||
db.init()
|
||||
|
||||
db.create_serve_model(
|
||||
model_key="mk1",
|
||||
user_id="alice",
|
||||
model_id_suffix="x",
|
||||
model_id_prefix="alice-202601061235",
|
||||
model_id="alice-202601061235-x",
|
||||
model_source="/private/hf/x",
|
||||
num_replicas=1,
|
||||
gpus_per_replica=1,
|
||||
engine_kwargs_json="not-json",
|
||||
spec_yaml="model_id: x\nmodel_source: $HOME/common/hf/x\n",
|
||||
resolved_spec_yaml="user_id: alice\nmodel_id: alice-202601061235-x\n",
|
||||
)
|
||||
db.set_serve_model_state(model_key="mk1", state="QUEUED", event_type="TEST_SEED")
|
||||
|
||||
client = _FakeServeClient()
|
||||
rec = ServingReconciler(
|
||||
db=db,
|
||||
v2_cfg=cfg,
|
||||
ray_runtime_env_env_vars={},
|
||||
serve_client=client,
|
||||
get_available_fn=lambda: type("A", (), {"total_available_gpus": 8, "total_available_npus": 0})(),
|
||||
)
|
||||
rec.tick()
|
||||
cfgs = client.applied[0]["app"]["llm_configs"]
|
||||
assert cfgs[0]["engine_kwargs"]["tensor_parallel_size"] == 1
|
||||
|
||||
@ -53,4 +53,4 @@ def test_ui_serving_contains_openai_port_8000(tmp_path, monkeypatch):
|
||||
r = c.get("/ui/serving")
|
||||
assert r.status_code == 200
|
||||
assert "curOriginWithPort(8000)" in r.text
|
||||
assert "/v1/models" in r.text
|
||||
assert "/serve/<model_key>/v1" in r.text
|
||||
|
||||
@ -6,7 +6,6 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
source "${SCRIPT_DIR}/lib.sh"
|
||||
|
||||
API_ADDR="${API_ADDR:-http://127.0.0.1:8080}"
|
||||
OPENAI_BASE_URL="${OPENAI_BASE_URL:-http://127.0.0.1:8000/v1}"
|
||||
ADMIN_TOKEN="${MVP_INTERNAL_TOKEN:-}"
|
||||
USER_ID="${USER_ID:-alice}"
|
||||
EXPECTED_RAY_NODES="${EXPECTED_RAY_NODES:-3}" # head + 2 workers
|
||||
@ -57,7 +56,7 @@ ray_wait_nodes() {
|
||||
local tries="${2:-60}"
|
||||
for i in $(seq 1 "${tries}"); do
|
||||
local out n
|
||||
out="$(docker exec -i "${HEAD_CONTAINER}" python3 -c "import ray; ray.init(address='auto', ignore_reinit_error=True, log_to_driver=False, logging_level='ERROR'); print(sum(1 for n in ray.nodes() if n.get('Alive')))" 2>/dev/null || true)"
|
||||
out="$(docker exec "${HEAD_CONTAINER}" python3 -c "import ray; ray.init(address='auto', ignore_reinit_error=True, log_to_driver=False, logging_level='ERROR'); print(sum(1 for n in ray.nodes() if n.get('Alive')))" 2>/dev/null || true)"
|
||||
n="$(printf '%s\n' "${out}" | tail -n 1 | tr -cd '0-9' || true)"
|
||||
if [[ "${n}" =~ ^[0-9]+$ ]]; then
|
||||
echo "[host] ray_nodes_alive=${n} (want>=${want})"
|
||||
@ -75,16 +74,17 @@ ray_wait_nodes() {
|
||||
}
|
||||
|
||||
openai_wait_ready() {
|
||||
local tries="${1:-120}"
|
||||
local base_url="$1"
|
||||
local tries="${2:-120}"
|
||||
for i in $(seq 1 "${tries}"); do
|
||||
if curl -sS -m 2 "${OPENAI_BASE_URL}/models" >/dev/null 2>&1; then
|
||||
echo "[host] openai_ready: ${OPENAI_BASE_URL}"
|
||||
if curl -sS -m 2 "${base_url}/models" >/dev/null 2>&1; then
|
||||
echo "[host] openai_ready: ${base_url}"
|
||||
return 0
|
||||
fi
|
||||
echo "[host] waiting openai... (${i}/${tries})"
|
||||
sleep 2
|
||||
done
|
||||
echo "ERROR: openai not ready: ${OPENAI_BASE_URL}" >&2
|
||||
echo "ERROR: openai not ready: ${base_url}" >&2
|
||||
return 1
|
||||
}
|
||||
|
||||
@ -112,6 +112,64 @@ wait_model_state() {
|
||||
return 1
|
||||
}
|
||||
|
||||
model_openai_base_url() {
|
||||
local token="$1"
|
||||
local model_key="$2"
|
||||
curl -sS -H "Authorization: Bearer ${token}" "${API_ADDR}/api/v2/serve/models/${model_key}" \
|
||||
| python3 -c 'import sys,json; print(json.load(sys.stdin)["model"]["endpoint"]["openai_base_url"])'
|
||||
}
|
||||
|
||||
serve_replica_fingerprint_json() {
|
||||
local model_key="$1"
|
||||
# Best-effort: list ServeReplica actors for this model app and return a stable JSON signature.
|
||||
dexec "${HEAD_CONTAINER}" python3 - "${model_key}" <<'PY' | tail -n 1
|
||||
import json
|
||||
import sys
|
||||
|
||||
import ray
|
||||
|
||||
model_key = sys.argv[1]
|
||||
ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False)
|
||||
try:
|
||||
from ray.util.state import list_actors
|
||||
except Exception:
|
||||
from ray.experimental.state.api import list_actors # type: ignore
|
||||
|
||||
actors = list_actors(limit=10000)
|
||||
want = f"ServeReplica:{model_key}:"
|
||||
out = []
|
||||
for a in actors or []:
|
||||
if isinstance(a, dict):
|
||||
cls = a.get("class_name")
|
||||
st = a.get("state")
|
||||
actor_id = a.get("actor_id")
|
||||
node_id = a.get("node_id")
|
||||
pid = a.get("pid")
|
||||
else:
|
||||
cls = getattr(a, "class_name", None)
|
||||
st = getattr(a, "state", None)
|
||||
actor_id = getattr(a, "actor_id", None)
|
||||
node_id = getattr(a, "node_id", None)
|
||||
pid = getattr(a, "pid", None)
|
||||
|
||||
cls_s = str(cls or "")
|
||||
if not cls_s.startswith(want):
|
||||
continue
|
||||
if str(st or "") and str(st) != "ALIVE":
|
||||
continue
|
||||
out.append(
|
||||
{
|
||||
"actor_id": actor_id,
|
||||
"node_id": node_id,
|
||||
"class_name": cls_s,
|
||||
"pid": pid,
|
||||
}
|
||||
)
|
||||
out = sorted(out, key=lambda x: (x.get("class_name") or "", x.get("actor_id") or ""))
|
||||
print(json.dumps(out, sort_keys=True))
|
||||
PY
|
||||
}
|
||||
|
||||
echo "[host] ===== run_all_v38_serving.sh begin ====="
|
||||
|
||||
"${SCRIPT_DIR}/00_prereq_check.sh"
|
||||
@ -122,8 +180,8 @@ echo "[host] bring down existing containers (best-effort)"
|
||||
"${SCRIPT_DIR}/02_down.sh" || true
|
||||
|
||||
echo "[host] (re)create containers (Ray + SFTPGo + W&B)"
|
||||
# For v3.8, we need the latest ray-node image (ray[llm] deps). Force build once.
|
||||
BUILD="${BUILD:-1}" "${SCRIPT_DIR}/01_up.sh"
|
||||
# Default: don't rebuild if image already exists (01_up.sh will auto-build if missing).
|
||||
BUILD="${BUILD:-0}" "${SCRIPT_DIR}/01_up.sh"
|
||||
|
||||
echo "[host] wait ray ready"
|
||||
ray_wait_ready 60
|
||||
@ -150,44 +208,84 @@ if [[ -z "${LOCAL_MODEL_PATH}" || "${LOCAL_MODEL_PATH}" != /* ]]; then
|
||||
fi
|
||||
echo "[host] local_model_path: ${LOCAL_MODEL_PATH}"
|
||||
|
||||
echo "[host] submit serving model via API"
|
||||
SERVE_SPEC=$'model_id: qwen-0.5b\nmodel_source: '"${LOCAL_MODEL_PATH}"$'\nnum_replicas: 1\ngpus_per_replica: 1\n'
|
||||
CREATE_RESP="$(curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -H "Content-Type: application/yaml" --data-binary "${SERVE_SPEC}" "${API_ADDR}/api/v2/serve/models")"
|
||||
echo "[host] create_model_resp: ${CREATE_RESP}"
|
||||
MODEL_KEY="$(printf '%s' "${CREATE_RESP}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["model_key"])')"
|
||||
echo "[host] submit serving model A via API"
|
||||
SERVE_SPEC_A=$'model_id: qwen-0.5b-a\nmodel_source: '"${LOCAL_MODEL_PATH}"$'\nnum_replicas: 1\ngpus_per_replica: 1\n'
|
||||
CREATE_RESP_A="$(curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -H "Content-Type: application/yaml" --data-binary "${SERVE_SPEC_A}" "${API_ADDR}/api/v2/serve/models")"
|
||||
echo "[host] create_model_a_resp: ${CREATE_RESP_A}"
|
||||
MODEL_KEY_A="$(printf '%s' "${CREATE_RESP_A}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["model_key"])')"
|
||||
|
||||
echo "[host] wait model RUNNING"
|
||||
wait_model_state "${USER_TOKEN}" "${MODEL_KEY}" "RUNNING" 300
|
||||
echo "[host] wait model A RUNNING"
|
||||
wait_model_state "${USER_TOKEN}" "${MODEL_KEY_A}" "RUNNING" 300
|
||||
OPENAI_BASE_A="$(model_openai_base_url "${USER_TOKEN}" "${MODEL_KEY_A}")"
|
||||
echo "[host] openai_base_a: ${OPENAI_BASE_A}"
|
||||
openai_wait_ready "${OPENAI_BASE_A}" 120
|
||||
|
||||
echo "[host] wait OpenAI ingress ready"
|
||||
openai_wait_ready 120
|
||||
|
||||
echo "[host] verify /v1/models contains model"
|
||||
MODEL_ID="$(
|
||||
curl -sS "${OPENAI_BASE_URL}/models" \
|
||||
echo "[host] verify A /models contains model"
|
||||
MODEL_ID_A="$(
|
||||
curl -sS "${OPENAI_BASE_A}/models" \
|
||||
| python3 -c 'import sys,json; obj=json.load(sys.stdin); print("\n".join([m.get("id","") for m in obj.get("data",[]) if isinstance(m,dict)]))' \
|
||||
| grep -E "^${USER_ID}-[0-9]{12}-qwen-0\\.5b$" \
|
||||
| grep -E "^${USER_ID}-[0-9]{12}-qwen-0\\.5b-a$" \
|
||||
| head -n1 \
|
||||
|| true
|
||||
)"
|
||||
if [[ -z "${MODEL_ID}" ]]; then
|
||||
echo "ERROR: model id not found in /v1/models" >&2
|
||||
curl -sS "${OPENAI_BASE_URL}/models" | python3 -m json.tool >&2 || true
|
||||
if [[ -z "${MODEL_ID_A}" ]]; then
|
||||
echo "ERROR: model id A not found in /models" >&2
|
||||
curl -sS "${OPENAI_BASE_A}/models" | python3 -m json.tool >&2 || true
|
||||
exit 1
|
||||
fi
|
||||
echo "[host] model_id: ${MODEL_ID}"
|
||||
echo "[host] model_id_a: ${MODEL_ID_A}"
|
||||
|
||||
echo "[host] chat completion (best-effort)"
|
||||
CHAT_RESP="$(curl -sS -H "Content-Type: application/json" -H "Authorization: Bearer FAKE_KEY" -X POST "${OPENAI_BASE_URL}/chat/completions" --data-binary "{\"model\":\"${MODEL_ID}\",\"messages\":[{\"role\":\"user\",\"content\":\"hello\"}],\"max_tokens\":16,\"stream\":false}")"
|
||||
printf '%s\n' "${CHAT_RESP}" | python3 -m json.tool >/dev/null 2>&1 || {
|
||||
echo "ERROR: invalid chat response" >&2
|
||||
printf '%s\n' "${CHAT_RESP}" >&2
|
||||
echo "[host] chat completion A (best-effort)"
|
||||
CHAT_RESP_A="$(curl -sS -H "Content-Type: application/json" -X POST "${OPENAI_BASE_A}/chat/completions" --data-binary "{\"model\":\"${MODEL_ID_A}\",\"messages\":[{\"role\":\"user\",\"content\":\"hello\"}],\"max_tokens\":16,\"stream\":false}")"
|
||||
printf '%s\n' "${CHAT_RESP_A}" | python3 -m json.tool >/dev/null 2>&1 || {
|
||||
echo "ERROR: invalid chat response A" >&2
|
||||
printf '%s\n' "${CHAT_RESP_A}" >&2
|
||||
exit 1
|
||||
}
|
||||
echo "[host] chat_ok"
|
||||
echo "[host] chat_a_ok"
|
||||
|
||||
echo "[host] delete model"
|
||||
curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -X DELETE "${API_ADDR}/api/v2/serve/models/${MODEL_KEY}" >/dev/null
|
||||
wait_model_state "${USER_TOKEN}" "${MODEL_KEY}" "DELETED" 300
|
||||
echo "[host] fingerprint A (serve replica actors)"
|
||||
FP_A_1="$(serve_replica_fingerprint_json "${MODEL_KEY_A}")"
|
||||
echo "[host] fp_a_1: ${FP_A_1}"
|
||||
if [[ -z "${FP_A_1}" || "${FP_A_1}" == "[]" ]]; then
|
||||
echo "ERROR: failed to fingerprint A replicas (empty)" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "[host] submit serving model B via API"
|
||||
SERVE_SPEC_B=$'model_id: qwen-0.5b-b\nmodel_source: '"${LOCAL_MODEL_PATH}"$'\nnum_replicas: 1\ngpus_per_replica: 1\n'
|
||||
CREATE_RESP_B="$(curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -H "Content-Type: application/yaml" --data-binary "${SERVE_SPEC_B}" "${API_ADDR}/api/v2/serve/models")"
|
||||
echo "[host] create_model_b_resp: ${CREATE_RESP_B}"
|
||||
MODEL_KEY_B="$(printf '%s' "${CREATE_RESP_B}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["model_key"])')"
|
||||
|
||||
echo "[host] wait model B RUNNING"
|
||||
wait_model_state "${USER_TOKEN}" "${MODEL_KEY_B}" "RUNNING" 300
|
||||
OPENAI_BASE_B="$(model_openai_base_url "${USER_TOKEN}" "${MODEL_KEY_B}")"
|
||||
echo "[host] openai_base_b: ${OPENAI_BASE_B}"
|
||||
openai_wait_ready "${OPENAI_BASE_B}" 120
|
||||
|
||||
echo "[host] verify isolation: A replica fingerprint unchanged after creating B"
|
||||
FP_A_2="$(serve_replica_fingerprint_json "${MODEL_KEY_A}")"
|
||||
echo "[host] fp_a_2: ${FP_A_2}"
|
||||
if [[ "${FP_A_2}" != "${FP_A_1}" ]]; then
|
||||
echo "ERROR: A replicas changed after creating B (unexpected redeploy)" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "[host] delete model B"
|
||||
curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -X DELETE "${API_ADDR}/api/v2/serve/models/${MODEL_KEY_B}" >/dev/null
|
||||
wait_model_state "${USER_TOKEN}" "${MODEL_KEY_B}" "DELETED" 300
|
||||
|
||||
echo "[host] verify isolation: A replica fingerprint unchanged after deleting B"
|
||||
FP_A_3="$(serve_replica_fingerprint_json "${MODEL_KEY_A}")"
|
||||
echo "[host] fp_a_3: ${FP_A_3}"
|
||||
if [[ "${FP_A_3}" != "${FP_A_1}" ]]; then
|
||||
echo "ERROR: A replicas changed after deleting B (unexpected redeploy)" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "[host] delete model A"
|
||||
curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -X DELETE "${API_ADDR}/api/v2/serve/models/${MODEL_KEY_A}" >/dev/null
|
||||
wait_model_state "${USER_TOKEN}" "${MODEL_KEY_A}" "DELETED" 300
|
||||
|
||||
echo "[host] ===== run_all_v38_serving.sh done ====="
|
||||
|
||||