diff --git a/specs/mvp/v3.8/Snipaste_2026-01-08_17-17-57.png b/specs/mvp/v3.8/Snipaste_2026-01-08_17-17-57.png new file mode 100644 index 0000000..3ec8be7 Binary files /dev/null and b/specs/mvp/v3.8/Snipaste_2026-01-08_17-17-57.png differ diff --git a/specs/mvp/v3.8/v3.8_per_model_app.md b/specs/mvp/v3.8/v3.8_per_model_app.md new file mode 100644 index 0000000..fec725f --- /dev/null +++ b/specs/mvp/v3.8/v3.8_per_model_app.md @@ -0,0 +1,189 @@ +# v3.8 方案补充:每个模型一个 Ray Serve App(隔离增删影响) + +## 背景与问题复现 + +当前 v3.8 的实现采用 **单 application + 多模型** 的方式: + +- 服务层每次 reconcile 都会构造“全量 llm_configs”并调用一次 `serve.run(app, name="argus_llm_app", route_prefix="/")` +- **新增/删除一个模型**会触发对同一个 app 的“整体更新” +- Ray Serve 在 app 更新时会对该 app 内的 deployments/replicas 做滚动更新与重新调度 + +因此你在 Ray Dashboard 中观察到: + +- 添加/删除一个模型时,其他模型的 Serve deployment 也进入更新状态 +- 内存/显存占用重新变化,甚至出现 GPU 卡位变化(replica 重新调度到不同 node/GPU) + +这与“其他未变更 model 不受影响”的期望不一致。 + +--- + +## 目标 + +将 serving 架构调整为: + +- **每个模型一个 Serve App(独立 app name)** +- 每个模型一个独立 `route_prefix` +- 新增/删除/缩放某个模型只更新该模型对应的 app,不影响其他模型 app + +约束保持不变: + +- 推理端口固定 `8000` +- 推理侧不接入现有 token 鉴权(OpenAI endpoint 无鉴权) +- `model_id` 前缀规则:`--` +- `LLMConfig.accelerator_type` 由 `configs/dev.yaml` 配置(dev/h1: `H20`) + +--- + +## 总体设计 + +### 1) 命名与路由 + +为每个 model 生成: + +- `app_name`:建议直接使用 `model_key`(天然唯一且 URL-safe),例如: + - `app_name = "mvp2-alice-serve-20260106-060203-aad8"` +- `route_prefix`:建议使用 model_key,避免 model_id 中的 `.`、`_` 等带来的 URL/编码歧义: + - `route_prefix = f"/serve/{model_key}"` + +于是该模型的 OpenAI base url 为: + +- `openai_base_url = http://:8000/serve//v1` + +说明: + +- 仍然是 **OpenAI-compatible**,只是 base_url 不再是根路径 `/v1`,而是每个模型一个前缀。 +- 这样可以做到“每个模型的 OpenAI endpoint 互不影响”,也更容易做按模型的观测/下线。 + +### 2) 运行方式(Ray Serve) + +单模型 app 的创建/更新: + +- `app = build_openai_app({"llm_configs":[LLMConfig(...)]})` +- `serve.run(app, name=app_name, route_prefix=route_prefix)` + +单模型 app 的删除: + +- `serve.delete(app_name)` + +关键点: + +- **更新/删除只作用于对应 app_name**;其它 app 不会被 serve.run “整体重建”触发滚动更新。 + +### 3) 服务层(Scheduler/Reconciler)改造点(高层) + +现状:`ServingReconciler.tick()` 每次对“全量模型集合” apply 一次 app。 + +目标:改成按 model_key 的“局部 reconcile”: + +- 对于状态 `QUEUED` 的 model: + - 只构建该 model 的 `LLMConfig` + - `serve.run(app, name=model_key, route_prefix="/serve/")` + - 状态:`DEPLOYING` →(probe 成功)→ `RUNNING` +- 对于状态 `DELETING` 的 model: + - `serve.delete(model_key)` + - 状态:`DELETED` + +资源预检查: + +- 只需要预检查“本次变更模型”需要的 GPU(`num_replicas * gpus_per_replica`) +- 不需要把其他模型资源都算入 needed_total_gpus(因为不再重建全量 app) + +### 4) API/UI 返回的 endpoint 结构 + +现状 API 返回: + +- `endpoint.openai_base_url = http://:8000/v1` +- `endpoint.model = ` + +建议改为(字段不变,值变化): + +- `endpoint.openai_base_url = http://:8000/serve//v1` +- `endpoint.model = `(保持) + +UI 的示例 curl 也应使用上面的 base_url。 + +--- + +## 行为变化与兼容性影响 + +### 1) `/v1/models` 聚合能力变化(重要) + +采用“每模型一个 route_prefix”后: + +- `http://:8000/v1/models` **不再是“所有模型的总览”**(除非我们再提供一个聚合层) +- 每个模型的 models list 在它自己的前缀下: + - `http://:8000/serve//v1/models` + +如果仍然希望保留一个统一入口(可选增强,非本方案必做): + +- 额外引入一个“稳定不重建”的 **OpenAI Router**(可以是: + - FastAPI(8080) 侧做反向代理;或 + - 一个单独 Ray Serve app 只负责路由,不随模型变更重建) +- Router 读取 SQLite/内存缓存的 model 映射: + - `model_id -> route_prefix` +- 将 `/v1/chat/completions` 转发到对应 model 的 prefix + +这可以作为 v3.9+ 的增强项;v3.8 的核心目标是“变更隔离”,优先保证稳定性。 + +### 2) 资源与调度稳定性 + +改为 per-app 后: + +- 新增模型 B 不再引起模型 A 的 replica 重新调度 → **A 的 GPU/内存占用更稳定** +- 删除模型 B 也不会触发 A 的滚动更新 + +但仍需注意: + +- 如果 Ray 集群发生节点故障/资源回收,Serve 本身仍可能重启个别 replica(这是系统层行为) + +--- + +## 验证与验收流程(建议) + +### A. 功能验收(API/UI) + +1. 通过 UI/或 API 创建模型 A,等待 RUNNING +2. 
记录 A 的: + - `model_key_A` + - `endpoint.openai_base_url_A` +3. 再创建模型 B,等待 RUNNING +4. 确认: + - A 的 endpoint 仍可用(对 A 的 base_url 发 chat completion) + - B 的 endpoint 可用 +5. 删除模型 B,确认: + - B endpoint 404/不可用 + - A endpoint 仍可用 + +### B. “不影响其它模型”的强验证(Ray actor 级别) + +在 Ray 上抓取 A 对应 `LLMServer` replica 的 actor_id/node_id: + +- 创建 B 前:`actor_id_A_before` +- 创建 B 后:`actor_id_A_after` +- 删除 B 后:`actor_id_A_after_delete` + +预期: + +- `actor_id_A_before == actor_id_A_after == actor_id_A_after_delete` + +(允许 `LLMRouter` 变化,但 **LLMServer for A 不应变化**) + +--- + +## 需要修改的代码点(清单级) + +> 这里只列“改哪里”,不展开具体实现(实现时按 TDD 补单测)。 + +- `argus.service.serving_reconciler`: + - 从“全量 apply 单 app”改为“按 model_key 局部 apply/delete 单 app” + - GPU 预检查改为 per-model +- `argus.service.serve_client`: + - 增加 `delete_app(app_name)`(封装 `serve.delete(app_name)`) + - `apply_app` 传入 `app_name/route_prefix`(已存在,但将不再传固定 app_name) +- `argus.service.app`(Serving API 输出): + - `_serve_model_public().endpoint.openai_base_url` 改为 per-model 前缀 + - `/api/v2/serve/models` list/get 的 openai_base_url 语义调整(可返回“该模型的 base_url”,列表里每条都有) +- `argus.service.ui`(Serving 页面): + - “OpenAI /v1/models” 需要调整为“选择某个模型后打开该模型的 /v1/models” + - 详情页 curl 示例使用 per-model base_url + diff --git a/specs/mvp/v3.8/v3.8_per_model_app_dev_plan.md b/specs/mvp/v3.8/v3.8_per_model_app_dev_plan.md new file mode 100644 index 0000000..e61b579 --- /dev/null +++ b/specs/mvp/v3.8/v3.8_per_model_app_dev_plan.md @@ -0,0 +1,174 @@ +# MVP v3.8(变更)开发计划:Per-Model Serve App(TDD) + +> 目标:按 `specs/mvp/v3.8/v3.8_per_model_app.md` 将 v3.8 从“单 app 多模型(全量重建)”改为“**每个模型一个 Ray Serve app + 独立 route_prefix**”,实现增删/缩放某个模型不触发其它模型重启与重调度。 + +## 约束与结论 + +- 推理端口固定:`8000` +- 推理 endpoint **不做鉴权** +- `model_id` 前缀规则:`--` +- `LLMConfig.accelerator_type` 由 `configs/dev.yaml` 决定(dev/h1: `H20`) +- 路由方案(本迭代固定): + - `app_name = model_key` + - `route_prefix = /serve/` + - `openai_base_url = http://:8000/serve//v1` + +## 非目标(明确不做) + +- 不提供统一 `/v1` 的“跨模型聚合路由”(如要,需要额外 router 层;可在后续迭代做) +- 不改 ServingSpec 语义(输入仍为 `model_id/model_source/num_replicas/gpus_per_replica/engine_kwargs`) + +--- + +## M0 - 基线回归与分支保护 + +**目的**:确保切换架构前训练/现有功能不回退。 + +### 测试 +- [ ] 本地:`.venv/bin/python -m pytest` 全绿(coverage ≥ 90%) + +### 验收 +- [ ] 基线可用;进入 M1 + +--- + +## M1 - API 输出与 endpoint 语义调整(单测驱动) + +**目的**:API/DB/前端都统一 per-model 的 `openai_base_url` 语义;避免 UI/脚本继续使用 `/v1` 根路径。 + +### 变更点 +- `GET /api/v2/serve/models`: + - 保持返回 `items[]`,但每个 item 的 `endpoint.openai_base_url` 必须是 per-model base url + - `openai_base_url`(列表层级字段)处理策略二选一: + - A(推荐):移除该字段(breaking,需同步 UI/脚本) + - B(兼容):保留但改为 `null` 或提示字符串(不再保证可用) +- `GET /api/v2/serve/models/{model_key}`: + - `model.endpoint.openai_base_url` 改为 per-model base url + +### 单测(先写) +- [ ] 更新/新增 `src/mvp/py/tests/test_app_serving_api.py` + - 断言 `endpoint.openai_base_url` 包含 `/serve//v1` + - 断言多条 models 的 base_url 不相同(随 model_key 变化) + +### 实现 +- [ ] 更新 `src/mvp/py/argus/service/app.py`: + - `_serve_model_public()` 的 `endpoint.openai_base_url` 拼接 per-model prefix + - 如选择移除/调整 list 层的 `openai_base_url`,同步实现 + +### 验收 +- [ ] API 单测通过;返回结构可被 UI/脚本消费 + +--- + +## M2 - ServeClient 扩展(delete_app)+ Reconciler 改造成 per-model(单测驱动) + +**目的**:核心行为变更:每次 tick 只部署/删除一个模型对应的 app,不重建全量 app。 + +### 变更点(行为) +- `QUEUED`: + - 对该 `model_key` 执行 `serve.run(app, name=model_key, route_prefix=/serve/)` + - 状态:`DEPLOYING → RUNNING` +- `DELETING`: + - 对该 `model_key` 执行 `serve.delete(model_key)` + - 状态:`DELETED` +- 资源预检查从“全量 needed_total_gpus”改为“本次变更模型所需 GPU” + +### 单测(先写) +- [ ] 更新 `src/mvp/py/tests/test_serving_reconciler.py` + - `create A` 
后,reconciler 只 `apply_app(app_name=A.model_key, route_prefix=/serve/A)` + - `create B` 后,reconciler 只 `apply_app(app_name=B.model_key, route_prefix=/serve/B)`(不再对 A 触发 apply) + - `delete B` 后,reconciler 只 `delete_app(B.model_key)`(不触发 A) + - GPU 不足时:保持 `QUEUED` 且 event=SERVE_PENDING_RESOURCES + +### 实现 +- [ ] `src/mvp/py/argus/service/serve_client.py` + - 增加 `delete_app(app_name: str)`(封装 `serve.delete`) +- [ ] `src/mvp/py/argus/service/serving_reconciler.py` + - 移除“全量 app apply”逻辑 + - 每个 model_key 独立部署:`app_name=model_key`、`route_prefix=/serve/` + - 删除路径走 `delete_app` + +### 验收 +- [ ] reconciler 单测全绿;逻辑可解释(events/state 正确) + +--- + +## M3 - WebUI Serving 页面适配 per-model base_url(单测驱动) + +**目的**:用户从 UI 复制的示例命令必须可用;不再指向根 `/v1`。 + +### 变更点 +- `/ui/serving` 列表: + - “OpenAI /v1/models”按钮改为: + - A:移除(因为没有聚合 `/v1/models`) + - B:保留但文案改为“OpenAI base 需进入详情页” +- `/ui/serving/{model_key}` 详情页: + - `curl` 示例使用 per-model `openai_base_url` + - 增加一键打开:`/serve//v1/models` + +### 单测(先写) +- [ ] 更新/新增 `src/mvp/py/tests/test_ui_serving.py` + - 断言页面包含 `/serve/` 前缀 + - 断言详情页示例里包含 `/serve//v1/chat/completions` + +### 实现 +- [ ] `src/mvp/py/argus/service/ui.py` 更新 Serving UI + +### 验收 +- [ ] UI 单测全绿;页面内容与 API 语义一致 + +--- + +## M4 - E2E 脚本更新(v3.8 serving) + +**目的**:在 dev/h1 一键验证 per-model app:A/B 增删不互相影响,且推理可用。 + +### 变更点 +- 更新 `src/mvp/scripts/run_all_v38_serving.sh`: + - `/v1/models` 与 `chat/completions` 均改用 per-model base_url(`/serve//v1`) + - 增加“隔离验证”步骤: + - 部署 A → 记录 A 的 serve replica actor_id/node_id + - 部署 B → 再次记录 A 的 actor_id/node_id,必须一致 + - 删除 B → 再次记录 A 的 actor_id/node_id,必须一致 + - 最后删除 A + +### 验收 +- [ ] E2E 脚本能跑通且输出明确断言(一致/不一致) + +--- + +## M5 - h1 端到端验证与回归 + +**目的**:确认实际 Ray Serve 行为满足“其它模型不滚动更新”的核心目标。 + +### 操作 +- [ ] 同步代码到:`argus@h1:/home2/argus/infra/mvp/src/mvp` +- [ ] 重启 API:`scripts/61_stop_api.sh && scripts/60_start_api.sh` +- [ ] 执行:`MVP_INTERNAL_TOKEN=... scripts/run_all_v38_serving.sh` + +### 验收标准(必须满足) +- [ ] 新增/删除 B 时,A 的 `LLMServer` replica actor_id/node_id 不变 +- [ ] A/B 的 OpenAI endpoint 均可完成 `chat/completions` +- [ ] 删除 B 后 A 仍可推理 + +--- + +## M6 - 文档与迁移说明 + +**目的**:明确“路由语义变化”和“如何使用”。 + +- [ ] 更新 `src/mvp/README.md`: + - 新增 per-model base_url 说明(`/serve//v1`) + - 提示不再提供聚合 `/v1/models` +- [ ] 更新 `specs/mvp/v3.8/v3.8_progress.md`: + - 记录 per-model app 变更与验收结论 + +--- + +## 风险与缓解 + +- **风险:旧 `argus_llm_app` 残留** + - 缓解:在 E2E/迁移步骤里增加一次 best-effort `serve.delete("argus_llm_app")`(可选) +- **风险:用户仍按旧方式访问 `/v1`** + - 缓解:UI/文档/脚本统一切换到 per-model base_url,并在列表页给出明显提示 + diff --git a/src/mvp/py/argus/service/app.py b/src/mvp/py/argus/service/app.py index f6ab121..ba8c498 100644 --- a/src/mvp/py/argus/service/app.py +++ b/src/mvp/py/argus/service/app.py @@ -91,19 +91,17 @@ def create_app(config_path: str) -> FastAPI: def _serving_enabled() -> bool: return bool(v2_cfg.serving.enabled) - def _openai_base_url(req: Request) -> str: + def _openai_base_url_for_model(req: Request, *, model_key: str) -> str: # Prefer forwarded headers if present; otherwise fall back to Host. host = req.headers.get("x-forwarded-host") or req.headers.get("host") or req.url.hostname or "127.0.0.1" - # Strip port if present (common for Host header). 
hostname = host if hostname.startswith("[") and "]" in hostname: - # IPv6 like: [::1]:8080 hostname = hostname.split("]")[0] + "]" else: hostname = hostname.split(":")[0] scheme = req.headers.get("x-forwarded-proto") or req.url.scheme or "http" port = int(v2_cfg.serving.serve.http_port) - return f"{scheme}://{hostname}:{port}/v1" + return f"{scheme}://{hostname}:{port}/serve/{model_key}/v1" def _dump_yaml(obj: Any) -> str: return yaml.safe_dump(obj, sort_keys=False) @@ -122,8 +120,9 @@ def create_app(config_path: str) -> FastAPI: gpus_per_replica = int(row.get("gpus_per_replica") or 0) total_gpus = num_replicas * gpus_per_replica model_id = str(row.get("model_id") or "") + model_key = str(row.get("model_key") or "") return { - "model_key": str(row.get("model_key") or ""), + "model_key": model_key, "user_id": str(row.get("user_id") or ""), "model_id": model_id, "model_id_suffix": str(row.get("model_id_suffix") or ""), @@ -138,7 +137,7 @@ def create_app(config_path: str) -> FastAPI: "updated_at": str(row.get("updated_at") or ""), "deleted_at": row.get("deleted_at"), "endpoint": { - "openai_base_url": _openai_base_url(req), + "openai_base_url": _openai_base_url_for_model(req, model_key=model_key), "model": model_id, }, } @@ -688,7 +687,8 @@ def create_app(config_path: str) -> FastAPI: out = [_serve_model_public(i, req=req) for i in items] return { "items": out, - "openai_base_url": _openai_base_url(req), + # Per-model route_prefix; see each item.endpoint.openai_base_url. + "openai_base_url": None, "limit": lim, "offset": off, "has_more": bool(len(items) == lim), @@ -773,7 +773,8 @@ def create_app(config_path: str) -> FastAPI: raise HTTPException(status_code=400, detail="serving is not enabled") return { "enabled": True, - "openai_base_url": _openai_base_url(req), + # Per-model route_prefix; see /api/v2/serve/models items[].endpoint.openai_base_url. + "openai_base_url": None, "http_port": int(v2_cfg.serving.serve.http_port), "proxy_location": str(v2_cfg.serving.serve.proxy_location), "accelerator_type": str(v2_cfg.serving.llm.accelerator_type), diff --git a/src/mvp/py/argus/service/serve_client.py b/src/mvp/py/argus/service/serve_client.py index a630468..82c81d9 100644 --- a/src/mvp/py/argus/service/serve_client.py +++ b/src/mvp/py/argus/service/serve_client.py @@ -43,3 +43,8 @@ class RayServeClient: from ray import serve # type: ignore return serve.status() + + def delete_app(self, *, app_name: str) -> Any: + from ray import serve # type: ignore + + return serve.delete(app_name) diff --git a/src/mvp/py/argus/service/serving_reconciler.py b/src/mvp/py/argus/service/serving_reconciler.py index 2c830d5..eb7e458 100644 --- a/src/mvp/py/argus/service/serving_reconciler.py +++ b/src/mvp/py/argus/service/serving_reconciler.py @@ -18,6 +18,8 @@ class ServeClient(Protocol): def apply_app(self, *, app: Any, app_name: str, route_prefix: str = "/") -> Any: ... + def delete_app(self, *, app_name: str) -> Any: ... + def get_status(self) -> Any: ... @@ -45,21 +47,18 @@ def _row_to_resolved_spec(row: dict[str, Any]) -> ResolvedServingSpec: ) -def _needed_total_gpus(rows: list[dict[str, Any]]) -> int: - total = 0 - for r in rows: - total += int(r.get("num_replicas") or 0) * int(r.get("gpus_per_replica") or 0) - return total +def _needed_model_gpus(row: dict[str, Any]) -> int: + return int(row.get("num_replicas") or 0) * int(row.get("gpus_per_replica") or 0) @dataclass class ServingReconciler: """ - v3.8: reconcile declared serve_models (SQLite) into a multi-model Ray Serve app. 
+ v3.8 (per-model apps): reconcile serve_models (SQLite) into per-model Ray Serve apps. This reconciler is intentionally conservative: - Only acts on models in states QUEUED/DELETING. - - Performs a minimal GPU precheck using ray available GPU totals. + - Performs a minimal GPU precheck using ray available GPU totals (per-model). - Writes events and state transitions for explainability. """ @@ -67,8 +66,6 @@ class ServingReconciler: v2_cfg: V2Config ray_runtime_env_env_vars: dict[str, str] serve_client: ServeClient - app_name: str = "argus_llm_app" - route_prefix: str = "/" cpu_per_gpu: float = 1.0 get_available_fn: Any = get_cluster_available @@ -88,33 +85,40 @@ class ServingReconciler: self.db.append_serve_event(model_key=model_key, event_type="SERVE_START_ERROR", payload_json=repr(e)) return - # Desired set: all non-deleted models except those marked DELETING. - all_rows = self.db.list_all_serve_models(include_deleted=False, limit=5000, offset=0) - # FAILED models are not part of the desired running set. A user can PATCH to - # re-queue a failed model (e.g., after fixing env/deps) which will move it back to QUEUED. - desired_rows = [r for r in all_rows if str(r.get("state") or "") not in ("DELETING", "DELETED", "FAILED")] + # Deleting is per-model: remove the model's app by name. This avoids touching other models. + if state == "DELETING": + try: + self.db.append_serve_event(model_key=model_key, event_type="SERVE_DELETE_REQUESTED", payload_json=None) + self.serve_client.delete_app(app_name=model_key) + except Exception as e: + err = f"{type(e).__name__}: {e}" + tb = traceback.format_exc(limit=10) + self.db.set_serve_model_state( + model_key=model_key, state="FAILED", error_summary=err, event_type="SERVE_DELETE_FAILED", payload_json=tb + ) + return + self.db.set_serve_model_state(model_key=model_key, state="DELETED", event_type="SERVE_DELETE_APPLIED") + return - # Precheck resources: multi-model app apply needs enough GPUs for the whole desired set. - needed = _needed_total_gpus(desired_rows) + # Precheck resources: per-model apply needs enough GPUs for this model only. + needed = _needed_model_gpus(change) avail: ClusterAvailable = self.get_available_fn() if float(avail.total_available_gpus) < float(needed): msg = f"Insufficient GPUs: need {needed}, available {avail.total_available_gpus}" self.db.append_serve_event(model_key=model_key, event_type="SERVE_PENDING_RESOURCES", payload_json=msg) return - # Build per-model LLM configs (dict form in M4). - llm_cfg_dicts: list[dict[str, Any]] = [] + # Build the single-model LLM config (dict form for unit tests). accelerator_type = str(self.v2_cfg.serving.llm.accelerator_type or "") - for r in desired_rows: - resolved = _row_to_resolved_spec(r) - llm_cfg_dicts.append( - build_llm_config_dict( - resolved, - accelerator_type=accelerator_type, - runtime_env_env_vars=self.ray_runtime_env_env_vars, - cpu_per_gpu=self.cpu_per_gpu, - ) + resolved = _row_to_resolved_spec(change) + llm_cfg_dicts = [ + build_llm_config_dict( + resolved, + accelerator_type=accelerator_type, + runtime_env_env_vars=self.ray_runtime_env_env_vars, + cpu_per_gpu=self.cpu_per_gpu, ) + ] # Build a Ray Serve OpenAI-compatible app if Ray Serve LLM is available. # Fall back to a plain dict so unit tests can run without real Ray Serve. 
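# A minimal sketch of the bare Ray Serve calls that RayServeClient wraps for this
# per-model flow (see specs/mvp/v3.8/v3.8_per_model_app.md). Assumes ray.serve.llm is
# importable on the head node; the function names and field values here are illustrative.
from ray import serve
from ray.serve.llm import LLMConfig, build_openai_app


def apply_model_app(model_key: str, model_id: str, model_source: str,
                    num_replicas: int, gpus_per_replica: int) -> None:
    llm_config = LLMConfig(
        model_loading_config={"model_id": model_id, "model_source": model_source},
        deployment_config={"autoscaling_config": {"min_replicas": num_replicas,
                                                  "max_replicas": num_replicas}},
        accelerator_type="H20",  # per configs/dev.yaml in this setup
        engine_kwargs={"tensor_parallel_size": gpus_per_replica},
    )
    app = build_openai_app({"llm_configs": [llm_config]})
    # Deploys or updates only this model's app; other per-model apps are not redeployed.
    serve.run(app, name=model_key, route_prefix=f"/serve/{model_key}")


def delete_model_app(model_key: str) -> None:
    # Tears down only this model's app and its replicas; other apps keep their actors.
    serve.delete(model_key)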
@@ -129,19 +133,14 @@ class ServingReconciler: app_obj = {"llm_configs": llm_cfg_dicts} try: - self.db.append_serve_event(model_key=model_key, event_type="SERVE_APPLY_REQUESTED", payload_json=str(len(llm_cfg_dicts))) - self.serve_client.apply_app(app=app_obj, app_name=self.app_name, route_prefix=self.route_prefix) + self.db.append_serve_event(model_key=model_key, event_type="SERVE_APPLY_REQUESTED", payload_json="1") + self.serve_client.apply_app(app=app_obj, app_name=model_key, route_prefix=f"/serve/{model_key}") except Exception as e: err = f"{type(e).__name__}: {e}" tb = traceback.format_exc(limit=10) self.db.set_serve_model_state(model_key=model_key, state="FAILED", error_summary=err, event_type="SERVE_APPLY_FAILED", payload_json=tb) return - # Apply succeeded. Update the changing model's state. - if state == "DELETING": - self.db.set_serve_model_state(model_key=model_key, state="DELETED", event_type="SERVE_DELETE_APPLIED") - return - # Mark as deploying; best-effort status probe can promote to RUNNING. self.db.set_serve_model_state(model_key=model_key, state="DEPLOYING", event_type="SERVE_DEPLOYING") try: diff --git a/src/mvp/py/argus/service/ui.py b/src/mvp/py/argus/service/ui.py index c77b629..d46be5b 100644 --- a/src/mvp/py/argus/service/ui.py +++ b/src/mvp/py/argus/service/ui.py @@ -1001,7 +1001,6 @@ refresh();
Loading...
@@ -1009,7 +1008,6 @@ refresh(); """.strip() script = """ document.getElementById("nav-ray-dashboard").href = curOriginWithPort(8265); -document.getElementById("openai-models").href = curOriginWithPort(8000) + "/v1/models"; const out = document.getElementById("out"); function pill(state) { @@ -1031,12 +1029,15 @@ async function refresh() { const prevDisabled = off <= 0; const nextDisabled = !hasMore; + const baseHint = curOriginWithPort(8000) + "/serve//v1"; function row(m) { + const endpoint = (m.endpoint && m.endpoint.openai_base_url) || (curOriginWithPort(8000) + "/serve/" + encodeURIComponent(m.model_key) + "/v1"); return ` ${m.model_key} ${m.model_id} ${pill(m.state)} ${m.num_replicas} × ${m.gpus_per_replica} GPU + ${endpoint} ${m.updated_at || ""} `; } @@ -1044,7 +1045,7 @@ async function refresh() { out.innerHTML = `
-
OpenAI base: ${resp.openai_base_url || curOriginWithPort(8000) + "/v1"}
+
Per-model OpenAI base: ${baseHint}
Page ${pageNo} @@ -1052,8 +1053,8 @@ async function refresh() {
- - ${rows || ""} + + ${rows || ""}
Model KeyModel IDStateResourcesUpdated
(none)
Model KeyModel IDStateResourcesEndpointUpdated
(none)
`; @@ -1160,8 +1161,8 @@ document.getElementById("submit").onclick = async () => { """.strip() script = f""" document.getElementById("nav-ray-dashboard").href = curOriginWithPort(8265); -document.getElementById("openai-models").href = curOriginWithPort(8000) + "/v1/models"; const modelKey = {json.dumps(model_key)}; +document.getElementById("openai-models").href = curOriginWithPort(8000) + "/serve/" + encodeURIComponent(modelKey) + "/v1/models"; const meta = document.getElementById("meta"); const spec = document.getElementById("spec"); const eventsEl = document.getElementById("events"); @@ -1194,19 +1195,20 @@ async function refresh() {{ const obj = await apiJson("/api/v2/serve/models/" + encodeURIComponent(modelKey)); const m = obj.model || {{}}; replicas.value = String(m.num_replicas || 1); + const endpoint = (m.endpoint && m.endpoint.openai_base_url) || (curOriginWithPort(8000) + "/serve/" + encodeURIComponent(modelKey) + "/v1"); meta.innerHTML = `
state: ${{pill(m.state)}}
model_id: ${{m.model_id || ""}}
source: ${{m.model_source || ""}}
-
endpoint: ${{(m.endpoint && m.endpoint.openai_base_url) || (curOriginWithPort(8000) + "/v1")}}
+
endpoint: ${{endpoint}}
`; spec.textContent = obj.resolved_spec_yaml || ""; eventsEl.innerHTML = renderEvents(obj.events || []); - const base = (m.endpoint && m.endpoint.openai_base_url) || (curOriginWithPort(8000) + "/v1"); + const base = endpoint; const mid = m.model_id || ""; - example.textContent = `curl -sS -H 'Content-Type: application/json' -H 'Authorization: Bearer FAKE_KEY' \\\\\\n -X POST ${{base}}/chat/completions \\\\\\n --data-binary '{{\\"model\\":\\"${{mid}}\\",\\"messages\\":[{{\\"role\\":\\"user\\",\\"content\\":\\"hello\\"}}],\\"max_tokens\\":16,\\"stream\\":false}}' | python3 -m json.tool`; + example.textContent = `curl -sS -H 'Content-Type: application/json' \\\\\\n -X POST ${{base}}/chat/completions \\\\\\n --data-binary '{{\\"model\\":\\"${{mid}}\\",\\"messages\\":[{{\\"role\\":\\"user\\",\\"content\\":\\"hello\\"}}],\\"max_tokens\\":16,\\"stream\\":false}}' | python3 -m json.tool`; }} catch (e) {{ meta.textContent = "Error: " + (e.status || "") + "\\n" + (e.body || String(e)); spec.textContent = ""; diff --git a/src/mvp/py/tests/test_app_serving_api.py b/src/mvp/py/tests/test_app_serving_api.py index 4f9167f..541255f 100644 --- a/src/mvp/py/tests/test_app_serving_api.py +++ b/src/mvp/py/tests/test_app_serving_api.py @@ -75,9 +75,10 @@ def test_serving_api_crud_flow(tmp_path: Path, monkeypatch): r4 = c.get("/api/v2/serve/models?limit=10&offset=0", headers=headers) assert r4.status_code == 200 obj = r4.json() - assert obj["openai_base_url"] == "http://testserver:8000/v1" + assert obj["openai_base_url"] is None assert len(obj["items"]) == 1 assert obj["items"][0]["model_key"] == "mk-alice" + assert obj["items"][0]["endpoint"]["openai_base_url"] == "http://testserver:8000/serve/mk-alice/v1" r5 = c.get("/api/v2/serve/models/mk-alice", headers=headers) assert r5.status_code == 200 @@ -99,6 +100,7 @@ def test_serving_api_crud_flow(tmp_path: Path, monkeypatch): r8 = c.get("/api/v2/serve/status", headers=admin_headers) assert r8.status_code == 200 assert r8.json()["http_port"] == 8000 + assert r8.json()["openai_base_url"] is None def test_serving_api_rejects_path_outside_user_and_hf(tmp_path: Path, monkeypatch): @@ -245,8 +247,17 @@ def test_serving_api_list_include_deleted_and_forwarded_base_url(tmp_path: Path, headers={**headers, "x-forwarded-host": "example.com:8080", "x-forwarded-proto": "https"}, ) assert r1.status_code == 200 - assert r1.json()["openai_base_url"] == "https://example.com:8000/v1" + assert r1.json()["openai_base_url"] is None assert {m["model_key"] for m in r1.json()["items"]} == {"mk-alice-1"} + assert r1.json()["items"][0]["endpoint"]["openai_base_url"] == "https://example.com:8000/serve/mk-alice-1/v1" + + # IPv6 bracketed host should be preserved. 
+ r1b = c.get( + "/api/v2/serve/models?limit=10&offset=0&include_deleted=0", + headers={**headers, "x-forwarded-host": "[::1]:8080", "x-forwarded-proto": "https"}, + ) + assert r1b.status_code == 200 + assert r1b.json()["items"][0]["endpoint"]["openai_base_url"] == "https://[::1]:8000/serve/mk-alice-1/v1" r2 = c.get("/api/v2/serve/models?include_deleted=1", headers=headers) assert r2.status_code == 200 diff --git a/src/mvp/py/tests/test_serve_client.py b/src/mvp/py/tests/test_serve_client.py index 1f79d60..3a7d2b5 100644 --- a/src/mvp/py/tests/test_serve_client.py +++ b/src/mvp/py/tests/test_serve_client.py @@ -3,6 +3,8 @@ from __future__ import annotations import sys import types +import pytest + def test_ray_serve_client_calls_start_run_status(monkeypatch): import ray # provided by conftest stub @@ -28,9 +30,14 @@ def test_ray_serve_client_calls_start_run_status(monkeypatch): calls.append(("serve.status", None)) return {"ok": True} + def _delete(name: str): + calls.append(("serve.delete", {"name": name})) + return {"deleted": True} + serve.start = _start # type: ignore[attr-defined] serve.run = _run # type: ignore[attr-defined] serve.status = _status # type: ignore[attr-defined] + serve.delete = _delete # type: ignore[attr-defined] sys.modules["ray.serve"] = serve ray.serve = serve # type: ignore[attr-defined] @@ -41,9 +48,11 @@ def test_ray_serve_client_calls_start_run_status(monkeypatch): client.ensure_started() out = client.apply_app(app="APP", app_name="argus_llm_app", route_prefix="/") st = client.get_status() + deleted = client.delete_app(app_name="mk1") assert out == {"deployed": True} assert st == {"ok": True} + assert deleted == {"deleted": True} # Verify call order and key args. assert calls[0][0] == "ray.init" @@ -53,3 +62,19 @@ def test_ray_serve_client_calls_start_run_status(monkeypatch): assert calls[2][0] == "serve.run" assert calls[2][1]["name"] == "argus_llm_app" assert calls[3][0] == "serve.status" + assert calls[4][0] == "serve.delete" + + +def test_ray_serve_client_rejects_dict_app(monkeypatch): + import ray # provided by conftest stub + + def _init(*args, **kwargs): + return None + + monkeypatch.setattr(ray, "init", _init, raising=False) + + from argus.service.serve_client import RayServeClient + + client = RayServeClient(http_port=8000, proxy_location="HeadOnly", ray_init_address="auto") + with pytest.raises(ValueError, match="invalid serve app object"): + client.apply_app(app={"llm_configs": []}, app_name="mk1", route_prefix="/serve/mk1") diff --git a/src/mvp/py/tests/test_serving_reconciler.py b/src/mvp/py/tests/test_serving_reconciler.py index 0859f20..e28291c 100644 --- a/src/mvp/py/tests/test_serving_reconciler.py +++ b/src/mvp/py/tests/test_serving_reconciler.py @@ -8,11 +8,16 @@ class _FakeServeClient: def __init__(self): self.started = 0 self.applied = [] + self.deleted = [] self.status_calls = 0 self.fail_apply = False self.fail_status = False + self.fail_delete = False + self.fail_start = False def ensure_started(self) -> None: + if self.fail_start: + raise RuntimeError("start boom") self.started += 1 def apply_app(self, *, app, app_name: str, route_prefix: str = "/"): @@ -21,6 +26,12 @@ class _FakeServeClient: self.applied.append({"app": app, "app_name": app_name, "route_prefix": route_prefix}) return {"ok": True} + def delete_app(self, *, app_name: str): + if self.fail_delete: + raise RuntimeError("delete boom") + self.deleted.append({"app_name": app_name}) + return {"ok": True} + def get_status(self): self.status_calls += 1 if self.fail_status: @@ 
-133,6 +144,8 @@ def test_reconciler_apply_success_marks_running(tmp_path: Path): rec.tick() assert client.started == 1 assert len(client.applied) == 1 + assert client.applied[0]["app_name"] == "mk1" + assert client.applied[0]["route_prefix"] == "/serve/mk1" applied = client.applied[0]["app"]["llm_configs"] assert applied[0]["engine_kwargs"]["tensor_parallel_size"] == 1 assert applied[0]["runtime_env"]["env_vars"]["HF_HUB_OFFLINE"] == "1" @@ -167,9 +180,8 @@ def test_reconciler_delete_removes_and_marks_deleted(tmp_path: Path): get_available_fn=lambda: type("A", (), {"total_available_gpus": 8, "total_available_npus": 0})(), ) rec.tick() - assert len(client.applied) == 1 - cfgs = client.applied[0]["app"]["llm_configs"] - assert {c["model_loading_config"]["model_id"] for c in cfgs} == {"alice-202601061235-x"} # only keep remains + assert client.applied == [] + assert client.deleted == [{"app_name": "del"}] row = db.get_serve_model("del") assert row and row["state"] == "DELETED" assert row.get("deleted_at") @@ -205,3 +217,138 @@ def test_reconciler_apply_failure_marks_failed(tmp_path: Path): row = db.get_serve_model("mk1") assert row and row["state"] == "FAILED" assert row.get("error_summary") + + +def test_reconciler_applies_one_model_per_tick(tmp_path: Path): + from argus.service.config import V2Config + from argus.service.db import Db + from argus.service.serving_reconciler import ServingReconciler + + cfg = V2Config.from_root_dict( + { + "ray": {"shared_root": "/private"}, + "service": {"api": {}, "auth": {}, "sqlite": {"db_path": str(tmp_path / "mvp.sqlite3")}, "scheduler": {}}, + "data": {"sftpgo": {}, "retention": {}}, + "serving": {"serve": {"http_port": 8000}, "llm": {"accelerator_type": "H20"}}, + } + ) + db = Db(cfg.sqlite.db_path) + db.init() + _seed_model(db, model_key="mk1", user_id="alice", state="QUEUED", num_replicas=1, gpus_per_replica=1) + _seed_model(db, model_key="mk2", user_id="alice", state="QUEUED", num_replicas=1, gpus_per_replica=1) + + client = _FakeServeClient() + rec = ServingReconciler( + db=db, + v2_cfg=cfg, + ray_runtime_env_env_vars={}, + serve_client=client, + get_available_fn=lambda: type("A", (), {"total_available_gpus": 8, "total_available_npus": 0})(), + ) + + rec.tick() + assert len(client.applied) == 1 + assert client.applied[0]["app_name"] == "mk1" + assert db.get_serve_model("mk1")["state"] == "RUNNING" + assert db.get_serve_model("mk2")["state"] == "QUEUED" + + rec.tick() + assert len(client.applied) == 2 + assert client.applied[1]["app_name"] == "mk2" + assert db.get_serve_model("mk2")["state"] == "RUNNING" + + +def test_reconciler_start_error_is_recorded(tmp_path: Path): + from argus.service.config import V2Config + from argus.service.db import Db + from argus.service.serving_reconciler import ServingReconciler + + cfg = V2Config.from_root_dict( + { + "ray": {"shared_root": "/private"}, + "service": {"api": {}, "auth": {}, "sqlite": {"db_path": str(tmp_path / "mvp.sqlite3")}, "scheduler": {}}, + "data": {"sftpgo": {}, "retention": {}}, + "serving": {"serve": {"http_port": 8000}, "llm": {"accelerator_type": "H20"}}, + } + ) + db = Db(cfg.sqlite.db_path) + db.init() + _seed_model(db, model_key="mk1", user_id="alice", state="QUEUED", num_replicas=1, gpus_per_replica=1) + + client = _FakeServeClient() + client.fail_start = True + rec = ServingReconciler(db=db, v2_cfg=cfg, ray_runtime_env_env_vars={}, serve_client=client) + rec.tick() + assert db.get_serve_model("mk1")["state"] == "QUEUED" + ev = db.list_serve_events("mk1", limit=50) + assert 
any(e["event_type"] == "SERVE_START_ERROR" for e in ev) + + +def test_reconciler_delete_failure_marks_failed(tmp_path: Path): + from argus.service.config import V2Config + from argus.service.db import Db + from argus.service.serving_reconciler import ServingReconciler + + cfg = V2Config.from_root_dict( + { + "ray": {"shared_root": "/private"}, + "service": {"api": {}, "auth": {}, "sqlite": {"db_path": str(tmp_path / "mvp.sqlite3")}, "scheduler": {}}, + "data": {"sftpgo": {}, "retention": {}}, + "serving": {"serve": {"http_port": 8000}, "llm": {"accelerator_type": "H20"}}, + } + ) + db = Db(cfg.sqlite.db_path) + db.init() + _seed_model(db, model_key="mk1", user_id="alice", state="DELETING", num_replicas=1, gpus_per_replica=1) + + client = _FakeServeClient() + client.fail_delete = True + rec = ServingReconciler(db=db, v2_cfg=cfg, ray_runtime_env_env_vars={}, serve_client=client) + rec.tick() + row = db.get_serve_model("mk1") + assert row and row["state"] == "FAILED" + assert row.get("error_summary") + + +def test_reconciler_engine_kwargs_bad_json_is_ignored(tmp_path: Path): + from argus.service.config import V2Config + from argus.service.db import Db + from argus.service.serving_reconciler import ServingReconciler + + cfg = V2Config.from_root_dict( + { + "ray": {"shared_root": "/private"}, + "service": {"api": {}, "auth": {}, "sqlite": {"db_path": str(tmp_path / "mvp.sqlite3")}, "scheduler": {}}, + "data": {"sftpgo": {}, "retention": {}}, + "serving": {"serve": {"http_port": 8000}, "llm": {"accelerator_type": "H20"}}, + } + ) + db = Db(cfg.sqlite.db_path) + db.init() + + db.create_serve_model( + model_key="mk1", + user_id="alice", + model_id_suffix="x", + model_id_prefix="alice-202601061235", + model_id="alice-202601061235-x", + model_source="/private/hf/x", + num_replicas=1, + gpus_per_replica=1, + engine_kwargs_json="not-json", + spec_yaml="model_id: x\nmodel_source: $HOME/common/hf/x\n", + resolved_spec_yaml="user_id: alice\nmodel_id: alice-202601061235-x\n", + ) + db.set_serve_model_state(model_key="mk1", state="QUEUED", event_type="TEST_SEED") + + client = _FakeServeClient() + rec = ServingReconciler( + db=db, + v2_cfg=cfg, + ray_runtime_env_env_vars={}, + serve_client=client, + get_available_fn=lambda: type("A", (), {"total_available_gpus": 8, "total_available_npus": 0})(), + ) + rec.tick() + cfgs = client.applied[0]["app"]["llm_configs"] + assert cfgs[0]["engine_kwargs"]["tensor_parallel_size"] == 1 diff --git a/src/mvp/py/tests/test_ui_serving.py b/src/mvp/py/tests/test_ui_serving.py index 8b022cd..5262d9b 100644 --- a/src/mvp/py/tests/test_ui_serving.py +++ b/src/mvp/py/tests/test_ui_serving.py @@ -53,4 +53,4 @@ def test_ui_serving_contains_openai_port_8000(tmp_path, monkeypatch): r = c.get("/ui/serving") assert r.status_code == 200 assert "curOriginWithPort(8000)" in r.text - assert "/v1/models" in r.text + assert "/serve//v1" in r.text diff --git a/src/mvp/scripts/run_all_v38_serving.sh b/src/mvp/scripts/run_all_v38_serving.sh index 2c6f172..636f060 100755 --- a/src/mvp/scripts/run_all_v38_serving.sh +++ b/src/mvp/scripts/run_all_v38_serving.sh @@ -6,7 +6,6 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" source "${SCRIPT_DIR}/lib.sh" API_ADDR="${API_ADDR:-http://127.0.0.1:8080}" -OPENAI_BASE_URL="${OPENAI_BASE_URL:-http://127.0.0.1:8000/v1}" ADMIN_TOKEN="${MVP_INTERNAL_TOKEN:-}" USER_ID="${USER_ID:-alice}" EXPECTED_RAY_NODES="${EXPECTED_RAY_NODES:-3}" # head + 2 workers @@ -57,7 +56,7 @@ ray_wait_nodes() { local tries="${2:-60}" for i in $(seq 1 "${tries}"); do local 
out n - out="$(docker exec -i "${HEAD_CONTAINER}" python3 -c "import ray; ray.init(address='auto', ignore_reinit_error=True, log_to_driver=False, logging_level='ERROR'); print(sum(1 for n in ray.nodes() if n.get('Alive')))" 2>/dev/null || true)" + out="$(docker exec "${HEAD_CONTAINER}" python3 -c "import ray; ray.init(address='auto', ignore_reinit_error=True, log_to_driver=False, logging_level='ERROR'); print(sum(1 for n in ray.nodes() if n.get('Alive')))" 2>/dev/null || true)" n="$(printf '%s\n' "${out}" | tail -n 1 | tr -cd '0-9' || true)" if [[ "${n}" =~ ^[0-9]+$ ]]; then echo "[host] ray_nodes_alive=${n} (want>=${want})" @@ -75,16 +74,17 @@ ray_wait_nodes() { } openai_wait_ready() { - local tries="${1:-120}" + local base_url="$1" + local tries="${2:-120}" for i in $(seq 1 "${tries}"); do - if curl -sS -m 2 "${OPENAI_BASE_URL}/models" >/dev/null 2>&1; then - echo "[host] openai_ready: ${OPENAI_BASE_URL}" + if curl -sS -m 2 "${base_url}/models" >/dev/null 2>&1; then + echo "[host] openai_ready: ${base_url}" return 0 fi echo "[host] waiting openai... (${i}/${tries})" sleep 2 done - echo "ERROR: openai not ready: ${OPENAI_BASE_URL}" >&2 + echo "ERROR: openai not ready: ${base_url}" >&2 return 1 } @@ -112,6 +112,64 @@ wait_model_state() { return 1 } +model_openai_base_url() { + local token="$1" + local model_key="$2" + curl -sS -H "Authorization: Bearer ${token}" "${API_ADDR}/api/v2/serve/models/${model_key}" \ + | python3 -c 'import sys,json; print(json.load(sys.stdin)["model"]["endpoint"]["openai_base_url"])' +} + +serve_replica_fingerprint_json() { + local model_key="$1" + # Best-effort: list ServeReplica actors for this model app and return a stable JSON signature. + dexec "${HEAD_CONTAINER}" python3 - "${model_key}" <<'PY' | tail -n 1 +import json +import sys + +import ray + +model_key = sys.argv[1] +ray.init(address="auto", ignore_reinit_error=True, log_to_driver=False) +try: + from ray.util.state import list_actors +except Exception: + from ray.experimental.state.api import list_actors # type: ignore + +actors = list_actors(limit=10000) +want = f"ServeReplica:{model_key}:" +out = [] +for a in actors or []: + if isinstance(a, dict): + cls = a.get("class_name") + st = a.get("state") + actor_id = a.get("actor_id") + node_id = a.get("node_id") + pid = a.get("pid") + else: + cls = getattr(a, "class_name", None) + st = getattr(a, "state", None) + actor_id = getattr(a, "actor_id", None) + node_id = getattr(a, "node_id", None) + pid = getattr(a, "pid", None) + + cls_s = str(cls or "") + if not cls_s.startswith(want): + continue + if str(st or "") and str(st) != "ALIVE": + continue + out.append( + { + "actor_id": actor_id, + "node_id": node_id, + "class_name": cls_s, + "pid": pid, + } + ) +out = sorted(out, key=lambda x: (x.get("class_name") or "", x.get("actor_id") or "")) +print(json.dumps(out, sort_keys=True)) +PY +} + echo "[host] ===== run_all_v38_serving.sh begin =====" "${SCRIPT_DIR}/00_prereq_check.sh" @@ -122,8 +180,8 @@ echo "[host] bring down existing containers (best-effort)" "${SCRIPT_DIR}/02_down.sh" || true echo "[host] (re)create containers (Ray + SFTPGo + W&B)" -# For v3.8, we need the latest ray-node image (ray[llm] deps). Force build once. -BUILD="${BUILD:-1}" "${SCRIPT_DIR}/01_up.sh" +# Default: don't rebuild if image already exists (01_up.sh will auto-build if missing). 
+BUILD="${BUILD:-0}" "${SCRIPT_DIR}/01_up.sh" echo "[host] wait ray ready" ray_wait_ready 60 @@ -150,44 +208,84 @@ if [[ -z "${LOCAL_MODEL_PATH}" || "${LOCAL_MODEL_PATH}" != /* ]]; then fi echo "[host] local_model_path: ${LOCAL_MODEL_PATH}" -echo "[host] submit serving model via API" -SERVE_SPEC=$'model_id: qwen-0.5b\nmodel_source: '"${LOCAL_MODEL_PATH}"$'\nnum_replicas: 1\ngpus_per_replica: 1\n' -CREATE_RESP="$(curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -H "Content-Type: application/yaml" --data-binary "${SERVE_SPEC}" "${API_ADDR}/api/v2/serve/models")" -echo "[host] create_model_resp: ${CREATE_RESP}" -MODEL_KEY="$(printf '%s' "${CREATE_RESP}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["model_key"])')" +echo "[host] submit serving model A via API" +SERVE_SPEC_A=$'model_id: qwen-0.5b-a\nmodel_source: '"${LOCAL_MODEL_PATH}"$'\nnum_replicas: 1\ngpus_per_replica: 1\n' +CREATE_RESP_A="$(curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -H "Content-Type: application/yaml" --data-binary "${SERVE_SPEC_A}" "${API_ADDR}/api/v2/serve/models")" +echo "[host] create_model_a_resp: ${CREATE_RESP_A}" +MODEL_KEY_A="$(printf '%s' "${CREATE_RESP_A}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["model_key"])')" -echo "[host] wait model RUNNING" -wait_model_state "${USER_TOKEN}" "${MODEL_KEY}" "RUNNING" 300 +echo "[host] wait model A RUNNING" +wait_model_state "${USER_TOKEN}" "${MODEL_KEY_A}" "RUNNING" 300 +OPENAI_BASE_A="$(model_openai_base_url "${USER_TOKEN}" "${MODEL_KEY_A}")" +echo "[host] openai_base_a: ${OPENAI_BASE_A}" +openai_wait_ready "${OPENAI_BASE_A}" 120 -echo "[host] wait OpenAI ingress ready" -openai_wait_ready 120 - -echo "[host] verify /v1/models contains model" -MODEL_ID="$( - curl -sS "${OPENAI_BASE_URL}/models" \ +echo "[host] verify A /models contains model" +MODEL_ID_A="$( + curl -sS "${OPENAI_BASE_A}/models" \ | python3 -c 'import sys,json; obj=json.load(sys.stdin); print("\n".join([m.get("id","") for m in obj.get("data",[]) if isinstance(m,dict)]))' \ - | grep -E "^${USER_ID}-[0-9]{12}-qwen-0\\.5b$" \ + | grep -E "^${USER_ID}-[0-9]{12}-qwen-0\\.5b-a$" \ | head -n1 \ || true )" -if [[ -z "${MODEL_ID}" ]]; then - echo "ERROR: model id not found in /v1/models" >&2 - curl -sS "${OPENAI_BASE_URL}/models" | python3 -m json.tool >&2 || true +if [[ -z "${MODEL_ID_A}" ]]; then + echo "ERROR: model id A not found in /models" >&2 + curl -sS "${OPENAI_BASE_A}/models" | python3 -m json.tool >&2 || true exit 1 fi -echo "[host] model_id: ${MODEL_ID}" +echo "[host] model_id_a: ${MODEL_ID_A}" -echo "[host] chat completion (best-effort)" -CHAT_RESP="$(curl -sS -H "Content-Type: application/json" -H "Authorization: Bearer FAKE_KEY" -X POST "${OPENAI_BASE_URL}/chat/completions" --data-binary "{\"model\":\"${MODEL_ID}\",\"messages\":[{\"role\":\"user\",\"content\":\"hello\"}],\"max_tokens\":16,\"stream\":false}")" -printf '%s\n' "${CHAT_RESP}" | python3 -m json.tool >/dev/null 2>&1 || { - echo "ERROR: invalid chat response" >&2 - printf '%s\n' "${CHAT_RESP}" >&2 +echo "[host] chat completion A (best-effort)" +CHAT_RESP_A="$(curl -sS -H "Content-Type: application/json" -X POST "${OPENAI_BASE_A}/chat/completions" --data-binary "{\"model\":\"${MODEL_ID_A}\",\"messages\":[{\"role\":\"user\",\"content\":\"hello\"}],\"max_tokens\":16,\"stream\":false}")" +printf '%s\n' "${CHAT_RESP_A}" | python3 -m json.tool >/dev/null 2>&1 || { + echo "ERROR: invalid chat response A" >&2 + printf '%s\n' "${CHAT_RESP_A}" >&2 exit 1 } -echo "[host] chat_ok" +echo "[host] chat_a_ok" -echo "[host] 
delete model" -curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -X DELETE "${API_ADDR}/api/v2/serve/models/${MODEL_KEY}" >/dev/null -wait_model_state "${USER_TOKEN}" "${MODEL_KEY}" "DELETED" 300 +echo "[host] fingerprint A (serve replica actors)" +FP_A_1="$(serve_replica_fingerprint_json "${MODEL_KEY_A}")" +echo "[host] fp_a_1: ${FP_A_1}" +if [[ -z "${FP_A_1}" || "${FP_A_1}" == "[]" ]]; then + echo "ERROR: failed to fingerprint A replicas (empty)" >&2 + exit 1 +fi + +echo "[host] submit serving model B via API" +SERVE_SPEC_B=$'model_id: qwen-0.5b-b\nmodel_source: '"${LOCAL_MODEL_PATH}"$'\nnum_replicas: 1\ngpus_per_replica: 1\n' +CREATE_RESP_B="$(curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -H "Content-Type: application/yaml" --data-binary "${SERVE_SPEC_B}" "${API_ADDR}/api/v2/serve/models")" +echo "[host] create_model_b_resp: ${CREATE_RESP_B}" +MODEL_KEY_B="$(printf '%s' "${CREATE_RESP_B}" | python3 -c 'import sys,json; print(json.load(sys.stdin)["model_key"])')" + +echo "[host] wait model B RUNNING" +wait_model_state "${USER_TOKEN}" "${MODEL_KEY_B}" "RUNNING" 300 +OPENAI_BASE_B="$(model_openai_base_url "${USER_TOKEN}" "${MODEL_KEY_B}")" +echo "[host] openai_base_b: ${OPENAI_BASE_B}" +openai_wait_ready "${OPENAI_BASE_B}" 120 + +echo "[host] verify isolation: A replica fingerprint unchanged after creating B" +FP_A_2="$(serve_replica_fingerprint_json "${MODEL_KEY_A}")" +echo "[host] fp_a_2: ${FP_A_2}" +if [[ "${FP_A_2}" != "${FP_A_1}" ]]; then + echo "ERROR: A replicas changed after creating B (unexpected redeploy)" >&2 + exit 1 +fi + +echo "[host] delete model B" +curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -X DELETE "${API_ADDR}/api/v2/serve/models/${MODEL_KEY_B}" >/dev/null +wait_model_state "${USER_TOKEN}" "${MODEL_KEY_B}" "DELETED" 300 + +echo "[host] verify isolation: A replica fingerprint unchanged after deleting B" +FP_A_3="$(serve_replica_fingerprint_json "${MODEL_KEY_A}")" +echo "[host] fp_a_3: ${FP_A_3}" +if [[ "${FP_A_3}" != "${FP_A_1}" ]]; then + echo "ERROR: A replicas changed after deleting B (unexpected redeploy)" >&2 + exit 1 +fi + +echo "[host] delete model A" +curl -sS -H "Authorization: Bearer ${USER_TOKEN}" -X DELETE "${API_ADDR}/api/v2/serve/models/${MODEL_KEY_A}" >/dev/null +wait_model_state "${USER_TOKEN}" "${MODEL_KEY_A}" "DELETED" 300 echo "[host] ===== run_all_v38_serving.sh done ====="
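As a usage reference for the per-model endpoints exercised by the script above, a minimal Python sketch; the host, the model_key "mk-alice", and the model id are placeholders, and the inference side is unauthenticated:

import requests

# Per-model OpenAI base URL as returned in endpoint.openai_base_url (placeholder values).
BASE = "http://127.0.0.1:8000/serve/mk-alice/v1"

# List the models served under this model's route_prefix.
models = requests.get(f"{BASE}/models", timeout=10).json()
model_id = models["data"][0]["id"]  # e.g. "alice-202601061235-qwen-0.5b-a"

# One chat completion against the same prefix (no Authorization header needed).
resp = requests.post(
    f"{BASE}/chat/completions",
    json={
        "model": model_id,
        "messages": [{"role": "user", "content": "hello"}],
        "max_tokens": 16,
        "stream": False,
    },
    timeout=120,
)
resp.raise_for_status()
print(resp.json()["choices"][0]["message"]["content"])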