From d9461b3991b849e0310d97480faa850e12c000c4 Mon Sep 17 00:00:00 2001
From: yuyr
Date: Wed, 24 Sep 2025 03:48:05 +0000
Subject: [PATCH] [#2] Master module development and testing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/.gitignore | 2 +
 src/master/Dockerfile | 25 ++
 src/master/README.md | 48 +++
 src/master/app/__init__.py | 41 +++
 src/master/app/config.py | 40 +++
 src/master/app/models.py | 171 +++++++++
 src/master/app/nodes_api.py | 155 ++++++++
 src/master/app/routes.py | 24 ++
 src/master/app/scheduler.py | 90 +++++
 src/master/app/storage.py | 332 ++++++++++++++++++
 src/master/app/util.py | 51 +++
 src/master/images/.gitkeep | 0
 src/master/requirements.txt | 2 +
 src/master/scripts/build_images.sh | 39 ++
 src/master/scripts/load_images.sh | 39 ++
 src/master/scripts/save_images.sh | 41 +++
 src/master/tests/.gitignore | 2 +
 src/master/tests/docker-compose.yml | 18 +
 src/master/tests/scripts/00_e2e_test.sh | 25 ++
 src/master/tests/scripts/01_up_master.sh | 41 +++
 .../scripts/02_verify_ready_and_nodes_json.sh | 51 +++
 src/master/tests/scripts/03_register_via_curl.sh | 68 ++++
 .../scripts/04_reregister_and_error_cases.sh | 116 ++++++
 .../scripts/05_status_report_via_curl.sh | 98 ++++++
 .../06_config_update_and_nodes_json.sh | 56 +++
 src/master/tests/scripts/07_stats_single_node.sh | 41 +++
 src/master/tests/scripts/08_multi_node_stats.sh | 106 ++++++
 src/master/tests/scripts/09_restart_persistence.sh | 161 +++++++++
 src/master/tests/scripts/10_down.sh | 24 ++
 29 files changed, 1907 insertions(+)
 create mode 100644 src/.gitignore
 create mode 100644 src/master/Dockerfile
 create mode 100644 src/master/app/__init__.py
 create mode 100644 src/master/app/config.py
 create mode 100644 src/master/app/models.py
 create mode 100644 src/master/app/nodes_api.py
 create mode 100644 src/master/app/routes.py
 create mode 100644 src/master/app/scheduler.py
 create mode 100644 src/master/app/storage.py
 create mode 100644 src/master/app/util.py
 create mode 100644 src/master/images/.gitkeep
 create mode 100644 src/master/requirements.txt
 create mode 100755 src/master/scripts/build_images.sh
 create mode 100755 src/master/scripts/load_images.sh
 create mode 100755 src/master/scripts/save_images.sh
 create mode 100644 src/master/tests/.gitignore
 create mode 100644 src/master/tests/docker-compose.yml
 create mode 100755 src/master/tests/scripts/00_e2e_test.sh
 create mode 100755 src/master/tests/scripts/01_up_master.sh
 create mode 100755 src/master/tests/scripts/02_verify_ready_and_nodes_json.sh
 create mode 100755 src/master/tests/scripts/03_register_via_curl.sh
 create mode 100755 src/master/tests/scripts/04_reregister_and_error_cases.sh
 create mode 100755 src/master/tests/scripts/05_status_report_via_curl.sh
 create mode 100755 src/master/tests/scripts/06_config_update_and_nodes_json.sh
 create mode 100755 src/master/tests/scripts/07_stats_single_node.sh
 create mode 100755 src/master/tests/scripts/08_multi_node_stats.sh
 create mode 100755 src/master/tests/scripts/09_restart_persistence.sh
 create mode 100755 src/master/tests/scripts/10_down.sh

diff --git a/src/.gitignore b/src/.gitignore
new file mode 100644
index 0000000..1b05740
--- /dev/null
+++ b/src/.gitignore
@@ -0,0 +1,2 @@
+
+__pycache__/
diff --git a/src/master/Dockerfile b/src/master/Dockerfile
new file mode 100644
index 0000000..9ab73de
--- /dev/null
+++ b/src/master/Dockerfile
@@ -0,0 +1,25 @@
+FROM python:3.11-slim
+
+SHELL ["/bin/bash", "-c"]
+
+ARG PIP_INDEX_URL=
+ENV PIP_NO_CACHE_DIR=1 \
+    PYTHONUNBUFFERED=1 \
+    PYTHONPATH=/app
+
+WORKDIR /app
+
+COPY requirements.txt ./
+RUN set -euxo pipefail \
+    && python -m pip install --upgrade pip \
+    && if [[ -n "$PIP_INDEX_URL" ]]; then \
+         PIP_INDEX_URL="$PIP_INDEX_URL" python -m pip install -r requirements.txt; \
+       else \
+         python -m pip install -r requirements.txt; \
+       fi
+
+COPY app ./app
+
+EXPOSE 3000
+
+CMD ["gunicorn", "--bind", "0.0.0.0:3000", "app:create_app()"]
diff --git a/src/master/README.md b/src/master/README.md
index e69de29..4a24d22 100644
--- a/src/master/README.md
+++ b/src/master/README.md
@@ -0,0 +1,48 @@
+# Argus Master Module
+
+A lightweight Flask + SQLite service that manages Argus agent nodes. It exposes APIs for node registration, status reporting, configuration management, and statistics, and generates `nodes.json` for the metric module.
+
+## Build & Run
+
+```bash
+cd src/master
+./scripts/build_images.sh   # builds argus-master:dev
+```
+
+Local run via the Docker compose test stack:
+
+```bash
+cd src/master/tests
+./scripts/01_up_master.sh
+```
+
+The service listens on port `3000` (`31300` when using the provided test compose). Key environment variables:
+
+- `DB_PATH` (default `/private/argus/master/db.sqlite3`)
+- `METRIC_NODES_JSON_PATH` (default `/private/argus/metric/prometheus/nodes.json`)
+- `OFFLINE_THRESHOLD_SECONDS`, `ONLINE_THRESHOLD_SECONDS`, `SCHEDULER_INTERVAL_SECONDS`
+
+## REST API Summary
+
+Base path: `/api/v1/master` (see the example request at the end of this README)
+
+- `GET /nodes` — list node summaries
+- `GET /nodes/{id}` — node detail
+- `POST /nodes` — register/re-register
+- `PUT /nodes/{id}/status` — status report (timestamp + health map)
+- `PUT /nodes/{id}/config` — update config/labels (partial)
+- `GET /nodes/statistics` — totals + grouped counts
+- `GET /healthz`, `GET /readyz` — health checks (served at the app root, not under the base path)
+
+`nodes.json` only contains nodes whose status is `online`.
+
+## Tests
+
+End-to-end tests are Docker based:
+
+```bash
+cd src/master/tests
+./scripts/00_e2e_test.sh
+```
+
+Scripts create temporary `private/` and `tmp/` directories under `tests/`; they are cleaned automatically by `10_down.sh`.
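+
+## Example Request
+
+A minimal registration call as a sketch, assuming the test compose mapping on `localhost:31300`; all field values here are illustrative:
+
+```bash
+curl -sS -X POST http://localhost:31300/api/v1/master/nodes \
+  -H 'Content-Type: application/json' \
+  -d '{
+        "name": "dev-user-inst-pod-0",
+        "type": "agent",
+        "version": "1.1.0",
+        "meta_data": {
+          "hostname": "dev-user-inst-pod-0",
+          "ip": "10.0.0.10",
+          "env": "dev",
+          "user": "user",
+          "instance": "inst",
+          "cpu_number": 4,
+          "memory_in_bytes": 2147483648,
+          "gpu_number": 0
+        }
+      }'
+# Returns 201 with the created node (id like "A1"); POST again with that id to re-register.
+```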
diff --git a/src/master/app/__init__.py b/src/master/app/__init__.py
new file mode 100644
index 0000000..9e66eaa
--- /dev/null
+++ b/src/master/app/__init__.py
@@ -0,0 +1,41 @@
+from __future__ import annotations
+
+import atexit
+import logging
+
+from flask import Flask
+
+from .config import AppConfig, load_config
+from .routes import register_routes
+from .scheduler import StatusScheduler
+from .storage import Storage
+
+
+def create_app(config: AppConfig | None = None) -> Flask:
+    app_config = config or load_config()
+    storage = Storage(app_config.db_path, app_config.node_id_prefix)
+    scheduler = StatusScheduler(storage, app_config)
+
+    app = Flask(__name__)
+    app.config["APP_CONFIG"] = app_config
+    app.config["STORAGE"] = storage
+    app.config["SCHEDULER"] = scheduler
+
+    register_routes(app, storage, scheduler, app_config)
+
+    scheduler.start()
+
+    def _cleanup() -> None:
+        logging.getLogger("argus.master").info("Shutting down master app")
+        try:
+            scheduler.stop()
+        except Exception:  # pragma: no cover - defensive
+            logging.getLogger("argus.master").exception("Failed to stop scheduler")
+        try:
+            storage.close()
+        except Exception:  # pragma: no cover - defensive
+            logging.getLogger("argus.master").exception("Failed to close storage")
+
+    atexit.register(_cleanup)
+
+    return app
diff --git a/src/master/app/config.py b/src/master/app/config.py
new file mode 100644
index 0000000..246d3bf
--- /dev/null
+++ b/src/master/app/config.py
@@ -0,0 +1,40 @@
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True)
+class AppConfig:
+    db_path: str
+    metric_nodes_json_path: str
+    offline_threshold_seconds: int
+    online_threshold_seconds: int
+    scheduler_interval_seconds: int
+    node_id_prefix: str
+    auth_mode: str
+
+
+def _get_int_env(name: str, default: int) -> int:
+    raw = os.environ.get(name)
+    if raw is None or raw.strip() == "":
+        return default
+    try:
+        return int(raw)
+    except ValueError as exc:
+        raise ValueError(f"Environment variable {name} must be an integer, got {raw!r}") from exc
+
+
+def load_config() -> AppConfig:
+    """Build the config object from environment variables so runtime parameters are managed in one place."""
+    return AppConfig(
+        db_path=os.environ.get("DB_PATH", "/private/argus/master/db.sqlite3"),
+        metric_nodes_json_path=os.environ.get(
+            "METRIC_NODES_JSON_PATH", "/private/argus/metric/prometheus/nodes.json"
+        ),
+        offline_threshold_seconds=_get_int_env("OFFLINE_THRESHOLD_SECONDS", 180),
+        online_threshold_seconds=_get_int_env("ONLINE_THRESHOLD_SECONDS", 120),
+        scheduler_interval_seconds=_get_int_env("SCHEDULER_INTERVAL_SECONDS", 30),
+        node_id_prefix=os.environ.get("NODE_ID_PREFIX", "A"),
+        auth_mode=os.environ.get("AUTH_MODE", "disabled"),
+    )
diff --git a/src/master/app/models.py b/src/master/app/models.py
new file mode 100644
index 0000000..f4e37a9
--- /dev/null
+++ b/src/master/app/models.py
@@ -0,0 +1,171 @@
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass
+from typing import Any, Dict, Iterable, Mapping
+
+from .util import parse_iso
+
+
+class ValidationError(Exception):
+    """Raised when user payload fails validation."""
+
+
+@dataclass
+class Node:
+    id: str
+    name: str
+    type: str
+    version: str | None
+    status: str
+    config: Dict[str, Any]
+    labels: Iterable[str]
+    meta_data: Dict[str, Any]
+    health: Dict[str, Any]
+    register_time: str | None
+    last_report: str | None
+    agent_last_report: str | None
+    last_updated: str | None
+
+
+def serialize_node_row(row: Mapping[str, Any]) -> Dict[str, Any]:
+    def _json_or_default(value: str | None,
default: Any) -> Any: + if value is None or value == "": + return default + try: + return json.loads(value) + except json.JSONDecodeError: + return default + + config = _json_or_default(row["config_json"], {}) + labels = _json_or_default(row["labels_json"], []) + meta = _json_or_default(row["meta_json"], {}) + health = _json_or_default(row["health_json"], {}) + return { + "id": row["id"], + "name": row["name"], + "type": row["type"], + "version": row["version"], + "status": row["status"], + "config": config if isinstance(config, dict) else {}, + "label": list(labels) if isinstance(labels, list) else [], + "meta_data": meta if isinstance(meta, dict) else {}, + "health": health if isinstance(health, dict) else {}, + "register_time": row["register_time"], + "last_report": row["last_report"], + "agent_last_report": row["agent_last_report"], + "last_updated": row["last_updated"], + } + + +def serialize_node_summary(row: Mapping[str, Any]) -> Dict[str, Any]: + return { + "id": row["id"], + "name": row["name"], + "status": row["status"], + "type": row["type"], + "version": row["version"], + } + + +def validate_registration_payload(payload: Mapping[str, Any]) -> Dict[str, Any]: + if not isinstance(payload, Mapping): + raise ValidationError("Request body must be a JSON object") + + name = payload.get("name") + if not isinstance(name, str) or not name.strip(): + raise ValidationError("Field 'name' is required and must be a non-empty string") + + node_type = payload.get("type", "agent") + if not isinstance(node_type, str) or not node_type: + raise ValidationError("Field 'type' must be a string") + + version = payload.get("version") + if version is not None and not isinstance(version, str): + raise ValidationError("Field 'version' must be a string if provided") + + meta = payload.get("meta_data") + if not isinstance(meta, Mapping): + raise ValidationError("Field 'meta_data' must be an object") + + required_meta = ["hostname", "ip", "env", "user", "instance", "cpu_number", "memory_in_bytes", "gpu_number"] + for key in required_meta: + if key not in meta: + raise ValidationError(f"meta_data.{key} is required") + + cpu_number = meta["cpu_number"] + memory_in_bytes = meta["memory_in_bytes"] + gpu_number = meta["gpu_number"] + if not isinstance(cpu_number, int) or cpu_number < 0: + raise ValidationError("meta_data.cpu_number must be a non-negative integer") + if not isinstance(memory_in_bytes, int) or memory_in_bytes < 0: + raise ValidationError("meta_data.memory_in_bytes must be a non-negative integer") + if not isinstance(gpu_number, int) or gpu_number < 0: + raise ValidationError("meta_data.gpu_number must be a non-negative integer") + + node_id = payload.get("id") + if node_id is not None and (not isinstance(node_id, str) or not node_id.strip()): + raise ValidationError("Field 'id' must be a non-empty string when provided") + + return { + "id": node_id, + "name": name, + "type": node_type, + "version": version, + "meta_data": dict(meta), + } + + +def validate_status_payload(payload: Mapping[str, Any]) -> Dict[str, Any]: + if not isinstance(payload, Mapping): + raise ValidationError("Request body must be a JSON object") + + timestamp = payload.get("timestamp") + if not isinstance(timestamp, str) or not timestamp: + raise ValidationError("Field 'timestamp' is required and must be a string") + + parsed = parse_iso(timestamp) + if parsed is None: + raise ValidationError("Field 'timestamp' must be an ISO8601 datetime string") + + health = payload.get("health", {}) + if not isinstance(health, Mapping): + 
raise ValidationError("Field 'health' must be an object if provided") + + sanitized_health: Dict[str, Any] = {} + for key, value in health.items(): + if not isinstance(key, str): + raise ValidationError("Keys in 'health' must be strings") + if not isinstance(value, (Mapping, list, str, int, float, bool)) and value is not None: + raise ValidationError("Values in 'health' must be JSON-compatible") + sanitized_health[key] = value + + return { + "timestamp": timestamp, + "parsed_timestamp": parsed, + "health": sanitized_health, + } + + +def validate_config_payload(payload: Mapping[str, Any]) -> Dict[str, Any]: + if not isinstance(payload, Mapping): + raise ValidationError("Request body must be a JSON object") + + result: Dict[str, Any] = {} + if "config" in payload: + config = payload["config"] + if not isinstance(config, Mapping): + raise ValidationError("Field 'config' must be an object") + result["config"] = dict(config) + + if "label" in payload: + labels = payload["label"] + if not isinstance(labels, list) or not all(isinstance(item, str) for item in labels): + raise ValidationError("Field 'label' must be an array of strings") + result["label"] = list(labels) + + if not result: + raise ValidationError("At least one of 'config' or 'label' must be provided") + + return result + diff --git a/src/master/app/nodes_api.py b/src/master/app/nodes_api.py new file mode 100644 index 0000000..0a2f57f --- /dev/null +++ b/src/master/app/nodes_api.py @@ -0,0 +1,155 @@ +from __future__ import annotations + +import logging +from http import HTTPStatus +from typing import Any, Mapping + +from flask import Blueprint, jsonify, request + +from .models import ( + ValidationError, + validate_config_payload, + validate_registration_payload, + validate_status_payload, +) +from .scheduler import StatusScheduler +from .storage import Storage +from .util import to_iso, utcnow + + +def create_nodes_blueprint(storage: Storage, scheduler: StatusScheduler) -> Blueprint: + bp = Blueprint("nodes", __name__) + logger = logging.getLogger("argus.master.api") + + def _json_error(message: str, status: HTTPStatus, code: str) -> Any: + response = jsonify({"error": message, "code": code}) + response.status_code = status + return response + + @bp.errorhandler(ValidationError) + def _handle_validation_error(err: ValidationError): + return _json_error(str(err), HTTPStatus.BAD_REQUEST, "invalid_request") + + @bp.get("/nodes") + def list_nodes(): + nodes = storage.list_nodes() + return jsonify(nodes) + + @bp.get("/nodes/") + def get_node(node_id: str): + node = storage.get_node(node_id) + if node is None: + return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found") + return jsonify(node) + + @bp.post("/nodes") + def register_node(): + payload = _get_json() + data = validate_registration_payload(payload) + now = utcnow() + now_iso = to_iso(now) + node_id = data["id"] + name = data["name"] + node_type = data["type"] + version = data["version"] + meta = data["meta_data"] + + if node_id: + # 携带 id 说明是重注册,需要校验名称一致性 + existing_row = storage.get_node_raw(node_id) + if existing_row is None: + return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found") + if existing_row["name"] != name: + return _json_error( + "Node id and name mismatch during re-registration", + HTTPStatus.INTERNAL_SERVER_ERROR, + "id_name_mismatch", + ) + updated = storage.update_node_meta( + node_id, + node_type=node_type, + version=version, + meta_data=meta, + last_updated_iso=now_iso, + ) + scheduler.trigger_nodes_json_refresh() + return 
+
+        # No id provided → search by name
+        existing_by_name = storage.get_node_by_name(name)
+        if existing_by_name:
+            # A node with this name already exists; treat it as re-registration without an id
+            updated = storage.update_node_meta(
+                existing_by_name["id"],
+                node_type=node_type,
+                version=version,
+                meta_data=meta,
+                last_updated_iso=now_iso,
+            )
+            scheduler.trigger_nodes_json_refresh()
+            return jsonify(updated), HTTPStatus.OK
+
+        new_id = storage.allocate_node_id()
+        created = storage.create_node(
+            new_id,
+            name,
+            node_type,
+            version,
+            meta,
+            status="initialized",
+            register_time_iso=now_iso,
+            last_updated_iso=now_iso,
+        )
+        scheduler.trigger_nodes_json_refresh()
+        return jsonify(created), HTTPStatus.CREATED
+
+    @bp.put("/nodes/<node_id>/config")
+    def update_node_config(node_id: str):
+        payload = _get_json()
+        updates = validate_config_payload(payload)
+        try:
+            updated = storage.update_config_and_labels(
+                node_id,
+                config=updates.get("config"),
+                labels=updates.get("label"),
+            )
+        except KeyError:
+            return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found")
+
+        if "label" in updates:
+            scheduler.trigger_nodes_json_refresh()
+        return jsonify(updated)
+
+    @bp.get("/nodes/statistics")
+    def node_statistics():
+        stats = storage.get_statistics()
+        return jsonify(stats)
+
+    @bp.put("/nodes/<node_id>/status")
+    def update_status(node_id: str):
+        payload = _get_json()
+        data = validate_status_payload(payload)
+        try:
+            # The master writes last_report; the scheduler computes the status
+            updated = storage.update_last_report(
+                node_id,
+                server_timestamp_iso=to_iso(utcnow()),
+                agent_timestamp_iso=data["timestamp"],
+                health=data["health"],
+            )
+        except KeyError:
+            return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found")
+
+        scheduler.trigger_nodes_json_refresh()
+        return jsonify(updated)
+
+    return bp
+
+
+def _get_json() -> Mapping[str, Any]:
+    data = request.get_json(silent=True)
+    if data is None:
+        raise ValidationError("Request body must be valid JSON")
+    if not isinstance(data, Mapping):
+        raise ValidationError("Request body must be a JSON object")
+    return data
diff --git a/src/master/app/routes.py b/src/master/app/routes.py
new file mode 100644
index 0000000..10bbba6
--- /dev/null
+++ b/src/master/app/routes.py
@@ -0,0 +1,24 @@
+from __future__ import annotations
+
+from flask import Flask, jsonify
+
+from .config import AppConfig
+from .nodes_api import create_nodes_blueprint
+from .scheduler import StatusScheduler
+from .storage import Storage
+
+
+def register_routes(app: Flask, storage: Storage, scheduler: StatusScheduler, config: AppConfig) -> None:
+    app.register_blueprint(create_nodes_blueprint(storage, scheduler), url_prefix="/api/v1/master")
+
+    @app.get("/healthz")
+    def healthz():
+        return jsonify({"status": "ok"})
+
+    @app.get("/readyz")
+    def readyz():
+        try:
+            storage.list_nodes()  # simple readiness probe
+        except Exception as exc:  # pragma: no cover - defensive
+            return jsonify({"status": "error", "error": str(exc)}), 500
+        return jsonify({"status": "ok"})
diff --git a/src/master/app/scheduler.py b/src/master/app/scheduler.py
new file mode 100644
index 0000000..8797b25
--- /dev/null
+++ b/src/master/app/scheduler.py
@@ -0,0 +1,90 @@
+from __future__ import annotations
+
+import logging
+import threading
+from typing import Optional
+
+from .config import AppConfig
+from .storage import Storage
+from .util import atomic_write_json, parse_iso, to_iso, utcnow
+
+
+class StatusScheduler:
+    def __init__(self, storage: Storage, config: AppConfig, logger: Optional[logging.Logger] = None) -> None:
+        self._storage = storage
+        self._config = config
+        self._logger = logger or logging.getLogger("argus.master.scheduler")
+        self._stop_event = threading.Event()
+        self._thread = threading.Thread(target=self._run, name="status-scheduler", daemon=True)
+        self._nodes_json_lock = threading.Lock()
+        self._pending_nodes_json = threading.Event()
+
+    def start(self) -> None:
+        """Start the background thread that periodically refreshes node statuses and nodes.json."""
+        if not self._thread.is_alive():
+            self._logger.info("Starting scheduler thread")
+            self._thread.start()
+
+    def stop(self) -> None:
+        self._stop_event.set()
+        self._pending_nodes_json.set()
+        self._thread.join(timeout=5)
+
+    def trigger_nodes_json_refresh(self) -> None:
+        self._pending_nodes_json.set()
+
+    def generate_nodes_json(self) -> None:
+        with self._nodes_json_lock:
+            online_nodes = self._storage.get_online_nodes()
+            atomic_write_json(self._config.metric_nodes_json_path, online_nodes)
+            self._logger.info("nodes.json updated", extra={"count": len(online_nodes)})
+
+    # ------------------------------------------------------------------
+    # internal loop
+    # ------------------------------------------------------------------
+
+    def _run(self) -> None:
+        # Make sure nodes.json is generated immediately at startup
+        self._pending_nodes_json.set()
+        while not self._stop_event.is_set():
+            changed = self._reconcile_statuses()
+            if changed or self._pending_nodes_json.is_set():
+                try:
+                    self.generate_nodes_json()
+                finally:
+                    self._pending_nodes_json.clear()
+            self._stop_event.wait(self._config.scheduler_interval_seconds)
+
+    def _reconcile_statuses(self) -> bool:
+        """Compare last_report against the current time to decide whether to switch status."""
+        any_status_changed = False
+        now = utcnow()
+        rows = self._storage.fetch_nodes_for_scheduler()
+        for row in rows:
+            node_id = row["id"]
+            last_report_iso = row["last_report"]
+            current_status = row["status"]
+            last_report_dt = parse_iso(last_report_iso)
+            if last_report_dt is None:
+                # No report yet; treat as initialized until a report arrives
+                continue
+            delta_seconds = (now - last_report_dt).total_seconds()
+            new_status = current_status
+            if delta_seconds > self._config.offline_threshold_seconds:
+                new_status = "offline"
+            elif delta_seconds <= self._config.online_threshold_seconds:
+                new_status = "online"
+            # Between thresholds: keep current status (sticky)
+            if new_status != current_status:
+                any_status_changed = True
+                self._logger.info(
+                    "Updating node status",
+                    extra={
+                        "node_id": node_id,
+                        "previous": current_status,
+                        "new": new_status,
+                        "delta_seconds": delta_seconds,
+                    },
+                )
+                self._storage.update_status(node_id, new_status, last_updated_iso=to_iso(now))
+        return any_status_changed
diff --git a/src/master/app/storage.py b/src/master/app/storage.py
new file mode 100644
index 0000000..3547066
--- /dev/null
+++ b/src/master/app/storage.py
@@ -0,0 +1,332 @@
+from __future__ import annotations
+
+import json
+import sqlite3
+import threading
+from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple
+
+from .models import serialize_node_row, serialize_node_summary
+from .util import ensure_parent, to_iso, utcnow
+
+
+class Storage:
+    def __init__(self, db_path: str, node_id_prefix: str) -> None:
+        self._db_path = db_path
+        self._node_id_prefix = node_id_prefix
+        ensure_parent(db_path)
+        self._lock = threading.Lock()
+        self._conn = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
+        self._conn.row_factory = sqlite3.Row
+        with self._lock:
+            self._conn.execute("PRAGMA foreign_keys = ON;")
+        self._ensure_schema()
+
+    # ------------------------------------------------------------------
+    # schema & helpers
+    # ------------------------------------------------------------------
+
+    def _ensure_schema(self) -> None:
+        """Initialize the table schema so the database structure is ready at service startup."""
+        with self._lock:
+            self._conn.executescript(
+                """
+                CREATE TABLE IF NOT EXISTS nodes (
+                    id TEXT PRIMARY KEY,
+                    name TEXT NOT NULL UNIQUE,
+                    type TEXT NOT NULL,
+                    version TEXT,
+                    status TEXT NOT NULL,
+                    config_json TEXT,
+                    labels_json TEXT,
+                    meta_json TEXT,
+                    health_json TEXT,
+                    register_time TEXT,
+                    last_report TEXT,
+                    agent_last_report TEXT,
+                    last_updated TEXT
+                );
+
+                CREATE TABLE IF NOT EXISTS kv (
+                    key TEXT PRIMARY KEY,
+                    value TEXT NOT NULL
+                );
+
+                CREATE INDEX IF NOT EXISTS idx_nodes_status ON nodes(status);
+                CREATE INDEX IF NOT EXISTS idx_nodes_name ON nodes(name);
+                """
+            )
+            self._conn.commit()
+
+    def close(self) -> None:
+        with self._lock:
+            self._conn.close()
+
+    # ------------------------------------------------------------------
+    # Node ID allocation
+    # ------------------------------------------------------------------
+
+    def allocate_node_id(self) -> str:
+        """Maintain an auto-increment sequence in the kv table, minting IDs like A1 for new nodes."""
+        with self._lock:
+            cur = self._conn.execute("SELECT value FROM kv WHERE key = ?", ("node_id_seq",))
+            row = cur.fetchone()
+            if row is None:
+                next_id = 1
+                self._conn.execute("INSERT INTO kv(key, value) VALUES(?, ?)", ("node_id_seq", str(next_id)))
+            else:
+                next_id = int(row["value"]) + 1
+                self._conn.execute("UPDATE kv SET value = ? WHERE key = ?", (str(next_id), "node_id_seq"))
+            self._conn.commit()
+        return f"{self._node_id_prefix}{next_id}"
+
+    # ------------------------------------------------------------------
+    # Query helpers
+    # ------------------------------------------------------------------
+
+    def list_nodes(self) -> List[Dict[str, Any]]:
+        with self._lock:
+            cur = self._conn.execute(
+                "SELECT id, name, status, type, version FROM nodes ORDER BY id ASC"
+            )
+            rows = cur.fetchall()
+        return [serialize_node_summary(row) for row in rows]
+
+    def get_node(self, node_id: str) -> Optional[Dict[str, Any]]:
+        with self._lock:
+            cur = self._conn.execute("SELECT * FROM nodes WHERE id = ?", (node_id,))
+            row = cur.fetchone()
+        if row is None:
+            return None
+        return serialize_node_row(row)
+
+    def get_node_raw(self, node_id: str) -> Optional[sqlite3.Row]:
+        with self._lock:
+            cur = self._conn.execute("SELECT * FROM nodes WHERE id = ?", (node_id,))
+            row = cur.fetchone()
+        return row
+
+    def get_node_by_name(self, name: str) -> Optional[Dict[str, Any]]:
+        with self._lock:
+            cur = self._conn.execute("SELECT * FROM nodes WHERE name = ?", (name,))
+            row = cur.fetchone()
+        if row is None:
+            return None
+        return serialize_node_row(row)
+
+    # ------------------------------------------------------------------
+    # Mutation helpers
+    # ------------------------------------------------------------------
+
+    def create_node(
+        self,
+        node_id: str,
+        name: str,
+        node_type: str,
+        version: str | None,
+        meta_data: Mapping[str, Any],
+        status: str,
+        register_time_iso: str,
+        last_updated_iso: str,
+    ) -> Dict[str, Any]:
+        """Insert the initial node record; config/label/health default to empty."""
+        now_iso = last_updated_iso
+        with self._lock:
+            self._conn.execute(
+                """
+                INSERT INTO nodes (
+                    id, name, type, version, status, config_json, labels_json, meta_json,
+                    health_json, register_time, last_report, agent_last_report, last_updated
+                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """, + ( + node_id, + name, + node_type, + version, + status, + json.dumps({}), + json.dumps([]), + json.dumps(dict(meta_data)), + json.dumps({}), + register_time_iso, + None, + None, + now_iso, + ), + ) + self._conn.commit() + + created = self.get_node(node_id) + if created is None: + raise RuntimeError("Failed to read back created node") + return created + + def update_node_meta( + self, + node_id: str, + *, + name: Optional[str] = None, + node_type: Optional[str] = None, + version: Optional[str | None] = None, + meta_data: Optional[Mapping[str, Any]] = None, + last_updated_iso: Optional[str] = None, + ) -> Dict[str, Any]: + """重注册时更新节点静态信息,缺省字段保持不变。""" + updates: List[str] = [] + params: List[Any] = [] + if name is not None: + updates.append("name = ?") + params.append(name) + if node_type is not None: + updates.append("type = ?") + params.append(node_type) + if version is not None: + updates.append("version = ?") + params.append(version) + if meta_data is not None: + updates.append("meta_json = ?") + params.append(json.dumps(dict(meta_data))) + if last_updated_iso is not None: + updates.append("last_updated = ?") + params.append(last_updated_iso) + + if not updates: + result = self.get_node(node_id) + if result is None: + raise KeyError(node_id) + return result + + params.append(node_id) + with self._lock: + self._conn.execute( + f"UPDATE nodes SET {', '.join(updates)} WHERE id = ?", + tuple(params), + ) + self._conn.commit() + updated = self.get_node(node_id) + if updated is None: + raise KeyError(node_id) + return updated + + def update_config_and_labels( + self, node_id: str, *, config: Optional[Mapping[str, Any]] = None, labels: Optional[Iterable[str]] = None + ) -> Dict[str, Any]: + """部分更新 config/label,并刷新 last_updated 时间戳。""" + updates: List[str] = [] + params: List[Any] = [] + if config is not None: + updates.append("config_json = ?") + params.append(json.dumps(dict(config))) + if labels is not None: + updates.append("labels_json = ?") + params.append(json.dumps(list(labels))) + updates.append("last_updated = ?") + params.append(to_iso(utcnow())) + params.append(node_id) + with self._lock: + self._conn.execute( + f"UPDATE nodes SET {', '.join(updates)} WHERE id = ?", + tuple(params), + ) + if self._conn.total_changes == 0: + self._conn.rollback() + raise KeyError(node_id) + self._conn.commit() + updated = self.get_node(node_id) + if updated is None: + raise KeyError(node_id) + return updated + + def update_last_report( + self, + node_id: str, + *, + server_timestamp_iso: str, + agent_timestamp_iso: str, + health: Mapping[str, Any], + ) -> Dict[str, Any]: + """记录最新上报时间和健康信息,用于后续状态计算。""" + with self._lock: + self._conn.execute( + """ + UPDATE nodes + SET last_report = ?, + agent_last_report = ?, + health_json = ?, + last_updated = ? + WHERE id = ? + """, + ( + server_timestamp_iso, + agent_timestamp_iso, + json.dumps(health), + server_timestamp_iso, + node_id, + ), + ) + if self._conn.total_changes == 0: + self._conn.rollback() + raise KeyError(node_id) + self._conn.commit() + updated = self.get_node(node_id) + if updated is None: + raise KeyError(node_id) + return updated + + def update_status(self, node_id: str, status: str, *, last_updated_iso: str) -> None: + with self._lock: + self._conn.execute( + "UPDATE nodes SET status = ?, last_updated = ? 
WHERE id = ?", + (status, last_updated_iso, node_id), + ) + self._conn.commit() + + # ------------------------------------------------------------------ + # Reporting helpers + # ------------------------------------------------------------------ + + def get_statistics(self) -> Dict[str, Any]: + """统计节点总数及按状态聚合的数量。""" + with self._lock: + cur = self._conn.execute("SELECT COUNT(*) AS total FROM nodes") + total_row = cur.fetchone() + cur = self._conn.execute("SELECT status, COUNT(*) AS count FROM nodes GROUP BY status") + status_rows = cur.fetchall() + return { + "total": total_row["total"] if total_row else 0, + "status_statistics": [ + {"status": row["status"], "count": row["count"]} + for row in status_rows + ], + } + + def fetch_nodes_for_scheduler(self) -> List[sqlite3.Row]: + with self._lock: + cur = self._conn.execute( + "SELECT id, last_report, status FROM nodes" + ) + return cur.fetchall() + + def get_online_nodes(self) -> List[Dict[str, Any]]: + """返回在线节点列表,用于生成 nodes.json。""" + with self._lock: + cur = self._conn.execute( + "SELECT id, meta_json, labels_json, name FROM nodes WHERE status = ? ORDER BY id ASC", + ("online",), + ) + rows = cur.fetchall() + + result: List[Dict[str, Any]] = [] + for row in rows: + meta = json.loads(row["meta_json"]) if row["meta_json"] else {} + labels = json.loads(row["labels_json"]) if row["labels_json"] else [] + result.append( + { + "node_id": row["id"], + "user_id": meta.get("user"), + "ip": meta.get("ip"), + "hostname": meta.get("hostname", row["name"]), + "labels": labels if isinstance(labels, list) else [], + } + ) + return result diff --git a/src/master/app/util.py b/src/master/app/util.py new file mode 100644 index 0000000..903846c --- /dev/null +++ b/src/master/app/util.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +import json +import os +import tempfile +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable + + +ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ" + + +def utcnow() -> datetime: + """获取当前 UTC 时间,统一时间基准。""" + return datetime.now(timezone.utc) + + +def to_iso(dt: datetime | None) -> str | None: + if dt is None: + return None + return dt.astimezone(timezone.utc).replace(microsecond=0).strftime(ISO_FORMAT) + + +def parse_iso(value: str | None) -> datetime | None: + if not value: + return None + try: + if value.endswith("Z"): + return datetime.strptime(value, ISO_FORMAT).replace(tzinfo=timezone.utc) + # Fallback for ISO strings with offset + return datetime.fromisoformat(value).astimezone(timezone.utc) + except ValueError: + return None + + +def ensure_parent(path: str) -> None: + """确保目标文件所在目录存在。""" + Path(path).parent.mkdir(parents=True, exist_ok=True) + + +def atomic_write_json(path: str, data: Iterable[Any] | Any) -> None: + """原子化写 JSON,避免被其它进程读到半成品。""" + ensure_parent(path) + directory = Path(path).parent + with tempfile.NamedTemporaryFile("w", dir=directory, delete=False) as tmp: + json.dump(data, tmp, separators=(",", ":")) + tmp.flush() + os.fsync(tmp.fileno()) + temp_path = tmp.name + os.replace(temp_path, path) diff --git a/src/master/images/.gitkeep b/src/master/images/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/src/master/requirements.txt b/src/master/requirements.txt new file mode 100644 index 0000000..7eb4708 --- /dev/null +++ b/src/master/requirements.txt @@ -0,0 +1,2 @@ +Flask==2.3.3 +gunicorn==21.2.0 diff --git a/src/master/scripts/build_images.sh b/src/master/scripts/build_images.sh new file mode 100755 index 0000000..501ac02 --- /dev/null +++ 
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+usage() {
+  echo "Usage: $0 [--intranet] [--tag <image_tag>]" >&2
+}
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
+IMAGE_TAG="${IMAGE_TAG:-argus-master:dev}"
+BUILD_ARGS=()
+
+while [[ "$#" -gt 0 ]]; do
+  case "$1" in
+    --intranet)
+      INTRANET_INDEX="${INTRANET_INDEX:-https://pypi.tuna.tsinghua.edu.cn/simple}"
+      BUILD_ARGS+=("--build-arg" "PIP_INDEX_URL=${INTRANET_INDEX}")
+      shift
+      ;;
+    --tag)
+      [[ $# -ge 2 ]] || { usage; exit 1; }
+      IMAGE_TAG="$2"
+      shift 2
+      ;;
+    -h|--help)
+      usage
+      exit 0
+      ;;
+    *)
+      echo "Unknown option: $1" >&2
+      usage
+      exit 1
+      ;;
+  esac
+done
+
+echo "[INFO] Building image $IMAGE_TAG"
+docker build "${BUILD_ARGS[@]}" -t "$IMAGE_TAG" "$PROJECT_ROOT"
+echo "[OK] Image $IMAGE_TAG built"
diff --git a/src/master/scripts/load_images.sh b/src/master/scripts/load_images.sh
new file mode 100755
index 0000000..fb1e126
--- /dev/null
+++ b/src/master/scripts/load_images.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+usage() {
+  echo "Usage: $0 [--file <image_tar>]" >&2
+}
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
+DEFAULT_INPUT="$PROJECT_ROOT/images/argus-master-dev.tar"
+IMAGE_TAR="$DEFAULT_INPUT"
+
+while [[ "$#" -gt 0 ]]; do
+  case "$1" in
+    --file)
+      [[ $# -ge 2 ]] || { usage; exit 1; }
+      IMAGE_TAR="$2"
+      shift 2
+      ;;
+    -h|--help)
+      usage
+      exit 0
+      ;;
+    *)
+      echo "Unknown option: $1" >&2
+      usage
+      exit 1
+      ;;
+  esac
+done
+
+if [[ ! -f "$IMAGE_TAR" ]]; then
+  echo "[ERROR] Image tarball not found: $IMAGE_TAR" >&2
+  exit 1
+fi
+
+echo "[INFO] Loading image from $IMAGE_TAR"
+docker image load -i "$IMAGE_TAR"
+echo "[OK] Image loaded"
diff --git a/src/master/scripts/save_images.sh b/src/master/scripts/save_images.sh
new file mode 100755
index 0000000..8261ca8
--- /dev/null
+++ b/src/master/scripts/save_images.sh
@@ -0,0 +1,41 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+usage() {
+  echo "Usage: $0 [--tag <image_tag>] [--output <tar_path>]" >&2
+}
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PROJECT_ROOT="$(cd "$SCRIPT_DIR/.."
&& pwd)" +DEFAULT_OUTPUT="$PROJECT_ROOT/images/argus-master-dev.tar" +IMAGE_TAG="${IMAGE_TAG:-argus-master:dev}" +OUTPUT_PATH="$DEFAULT_OUTPUT" + +while [[ "$#" -gt 0 ]]; do + case "$1" in + --tag) + [[ $# -ge 2 ]] || { usage; exit 1; } + IMAGE_TAG="$2" + shift 2 + ;; + --output) + [[ $# -ge 2 ]] || { usage; exit 1; } + OUTPUT_PATH="$2" + shift 2 + ;; + -h|--help) + usage + exit 0 + ;; + *) + echo "Unknown option: $1" >&2 + usage + exit 1 + ;; + esac + done + +mkdir -p "$(dirname "$OUTPUT_PATH")" +echo "[INFO] Saving image $IMAGE_TAG to $OUTPUT_PATH" +docker image save "$IMAGE_TAG" -o "$OUTPUT_PATH" +echo "[OK] Image saved" diff --git a/src/master/tests/.gitignore b/src/master/tests/.gitignore new file mode 100644 index 0000000..285ed60 --- /dev/null +++ b/src/master/tests/.gitignore @@ -0,0 +1,2 @@ +private/ +tmp/ diff --git a/src/master/tests/docker-compose.yml b/src/master/tests/docker-compose.yml new file mode 100644 index 0000000..ffb9b4b --- /dev/null +++ b/src/master/tests/docker-compose.yml @@ -0,0 +1,18 @@ +services: + master: + image: argus-master:dev + container_name: argus-master-e2e + environment: + - OFFLINE_THRESHOLD_SECONDS=6 + - ONLINE_THRESHOLD_SECONDS=2 + - SCHEDULER_INTERVAL_SECONDS=1 + ports: + - "31300:3000" + volumes: + - ./private/argus/master:/private/argus/master + - ./private/argus/metric/prometheus:/private/argus/metric/prometheus + restart: unless-stopped + +networks: + default: + driver: bridge diff --git a/src/master/tests/scripts/00_e2e_test.sh b/src/master/tests/scripts/00_e2e_test.sh new file mode 100755 index 0000000..9d142db --- /dev/null +++ b/src/master/tests/scripts/00_e2e_test.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +SCRIPTS=( + "01_up_master.sh" + "02_verify_ready_and_nodes_json.sh" + "03_register_via_curl.sh" + "04_reregister_and_error_cases.sh" + "05_status_report_via_curl.sh" + "06_config_update_and_nodes_json.sh" + "07_stats_single_node.sh" + "08_multi_node_stats.sh" + "09_restart_persistence.sh" + "10_down.sh" +) + +for script in "${SCRIPTS[@]}"; do + echo "[TEST] Running $script" + "$SCRIPT_DIR/$script" + echo "[TEST] $script completed" + echo +done + +echo "[TEST] Master module E2E tests completed" diff --git a/src/master/tests/scripts/01_up_master.sh b/src/master/tests/scripts/01_up_master.sh new file mode 100755 index 0000000..8b2d385 --- /dev/null +++ b/src/master/tests/scripts/01_up_master.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +MODULE_ROOT="$(cd "$TEST_ROOT/.." 
&& pwd)" +PRIVATE_ROOT="$TEST_ROOT/private" +TMP_ROOT="$TEST_ROOT/tmp" + +mkdir -p "$PRIVATE_ROOT/argus/master" +mkdir -p "$PRIVATE_ROOT/argus/metric/prometheus" +mkdir -p "$TMP_ROOT" + +# 确保上一次运行留下的容器/数据被清理 +compose() { + if docker compose version >/dev/null 2>&1; then + docker compose "$@" + else + docker-compose "$@" + fi +} + +pushd "$TEST_ROOT" >/dev/null +compose down --remove-orphans || true +popd >/dev/null + +rm -rf "$TMP_ROOT" "$PRIVATE_ROOT" +mkdir -p "$PRIVATE_ROOT/argus/master" +mkdir -p "$PRIVATE_ROOT/argus/metric/prometheus" +mkdir -p "$TMP_ROOT" + +pushd "$MODULE_ROOT" >/dev/null +./scripts/build_images.sh --tag argus-master:dev +popd >/dev/null + +pushd "$TEST_ROOT" >/dev/null +compose down --remove-orphans || true +compose up -d +popd >/dev/null + +echo "[INFO] Master container is up on http://localhost:31300" diff --git a/src/master/tests/scripts/02_verify_ready_and_nodes_json.sh b/src/master/tests/scripts/02_verify_ready_and_nodes_json.sh new file mode 100755 index 0000000..1fa59df --- /dev/null +++ b/src/master/tests/scripts/02_verify_ready_and_nodes_json.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +PRIVATE_ROOT="$TEST_ROOT/private" +API_BASE="http://localhost:31300" +NODES_JSON_PATH="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json" + +# 等待 readyz 返回 200,确保数据库初始化完成 +for _ in {1..30}; do + status=$(curl -s -o /dev/null -w '%{http_code}' "$API_BASE/readyz" || true) + if [[ "$status" == "200" ]]; then + break + fi + sleep 1 + done + +if [[ "${status:-}" != "200" ]]; then + echo "[ERROR] /readyz 未在预期时间内返回 200,实际=$status" >&2 + exit 1 +fi + +echo "[INFO] /readyz 已通过,就绪检查成功" + +# scheduler 启动时会产生空的 nodes.json,这里等待文件出现并校验内容 +for _ in {1..30}; do + if [[ -f "$NODES_JSON_PATH" ]]; then + break + fi + sleep 1 + done + +if [[ ! -f "$NODES_JSON_PATH" ]]; then + echo "[ERROR] 未在预期时间内生成 $NODES_JSON_PATH" >&2 + exit 1 +fi + +if ! python3 - "$NODES_JSON_PATH" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + data = json.load(handle) +if data != []: + raise SystemExit(f"nodes.json initial content should be [], got {data}") +PY +then + echo "[ERROR] nodes.json 初始内容不是空数组" >&2 + exit 1 +fi + +echo "[INFO] nodes.json 初始状态校验通过" diff --git a/src/master/tests/scripts/03_register_via_curl.sh b/src/master/tests/scripts/03_register_via_curl.sh new file mode 100755 index 0000000..8bf5547 --- /dev/null +++ b/src/master/tests/scripts/03_register_via_curl.sh @@ -0,0 +1,68 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TEST_ROOT="$(cd "$SCRIPT_DIR/.." 
&& pwd)" +TMP_ROOT="$TEST_ROOT/tmp" +API_BASE="http://localhost:31300/api/v1/master" + +mkdir -p "$TMP_ROOT" + +for _ in {1..30}; do + if curl -sf "$API_BASE/healthz" >/dev/null; then + break + fi + sleep 1 +done + +payload=$(cat <<'JSON' +{ + "name": "dev-testuser-testinst-pod-0", + "type": "agent", + "meta_data": { + "hostname": "dev-testuser-testinst-pod-0", + "ip": "10.0.0.10", + "env": "dev", + "user": "testuser", + "instance": "testinst", + "cpu_number": 4, + "memory_in_bytes": 2147483648, + "gpu_number": 0 + }, + "version": "1.1.0" +} +JSON +) + +body_file="$TMP_ROOT/register_body.json" +status=$(curl -sS -o "$body_file" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$payload") +body="$(cat "$body_file")" + +if [[ "$status" != "201" ]]; then + echo "[ERROR] Unexpected status code: $status" >&2 + echo "$body" >&2 + exit 1 +fi + +node_id=$(python3 - "$body_file" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + body = json.load(handle) +print(body["id"]) +PY +) + +echo "$body" > "$TMP_ROOT/last_response.json" +echo "$node_id" > "$TMP_ROOT/node_id" + +list_file="$TMP_ROOT/nodes_list.json" +curl -sS "$API_BASE/nodes" -o "$list_file" +python3 - "$list_file" "$node_id" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + data = json.load(handle) +node_id = sys.argv[2] +assert any(item.get("id") == node_id for item in data), "node not in list" +PY + +echo "[INFO] Registered node with id $node_id" diff --git a/src/master/tests/scripts/04_reregister_and_error_cases.sh b/src/master/tests/scripts/04_reregister_and_error_cases.sh new file mode 100755 index 0000000..58795a7 --- /dev/null +++ b/src/master/tests/scripts/04_reregister_and_error_cases.sh @@ -0,0 +1,116 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TEST_ROOT="$(cd "$SCRIPT_DIR/.." 
&& pwd)" +TMP_ROOT="$TEST_ROOT/tmp" +API_BASE="http://localhost:31300/api/v1/master" +NODE_ID="$(cat "$TMP_ROOT/node_id")" + +# 使用相同 ID 重注册,同时修改部分 meta/version 字段 +payload=$(cat <&2 + cat "$TMP_ROOT/reregister_response.json" >&2 + exit 1 +fi + +python3 - "$TMP_ROOT/reregister_response.json" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + node = json.load(handle) +assert node["meta_data"]["ip"] == "10.0.0.11", node["meta_data"] +assert node["meta_data"]["cpu_number"] == 8, node["meta_data"] +assert node["version"] == "1.2.0", node +PY + +echo "[INFO] 重注册成功,元数据已更新" + +# 未知 ID => 404 +unknown_payload=$(cat <<'JSON' +{ + "id": "A999", + "name": "dev-testuser-testinst-pod-0", + "type": "agent", + "meta_data": { + "hostname": "dev-testuser-testinst-pod-0", + "ip": "10.0.0.12", + "env": "dev", + "user": "testuser", + "instance": "testinst", + "cpu_number": 4, + "memory_in_bytes": 2147483648, + "gpu_number": 0 + }, + "version": "1.2.0" +} +JSON +) + +status=$(curl -sS -o "$TMP_ROOT/unknown_id_response.json" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$unknown_payload") +if [[ "$status" != "404" ]]; then + echo "[ERROR] 未知 ID 应返回 404,实际=$status" >&2 + cat "$TMP_ROOT/unknown_id_response.json" >&2 + exit 1 +fi + +echo "[INFO] 未知 ID 返回 404 验证通过" + +# id 与 name 不匹配 => 500,节点保持原名 +mismatch_payload=$(cat <&2 + cat "$TMP_ROOT/mismatch_response.json" >&2 + exit 1 +fi + +# 验证名称仍保持正确 +curl -sS "$API_BASE/nodes/$NODE_ID" -o "$TMP_ROOT/post_mismatch_detail.json" +python3 - "$TMP_ROOT/post_mismatch_detail.json" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + node = json.load(handle) +assert node["name"] == "dev-testuser-testinst-pod-0", node["name"] +PY + +echo "[INFO] 名称不匹配返回 500,且原始节点未被篡改" diff --git a/src/master/tests/scripts/05_status_report_via_curl.sh b/src/master/tests/scripts/05_status_report_via_curl.sh new file mode 100755 index 0000000..567cf69 --- /dev/null +++ b/src/master/tests/scripts/05_status_report_via_curl.sh @@ -0,0 +1,98 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TEST_ROOT="$(cd "$SCRIPT_DIR/.." 
&& pwd)" +TMP_ROOT="$TEST_ROOT/tmp" +API_BASE="http://localhost:31300/api/v1/master" + +node_id="$(cat "$TMP_ROOT/node_id")" + +payload=$(python3 - <<'PY' +import json +from datetime import datetime, timezone +body = { + "timestamp": datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"), + "health": { + "log-fluentbit": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"}, + "metric-node-exporter": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"} + } +} +print(json.dumps(body)) +PY +) + +response=$(curl -sS -w '\n%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$node_id/status" -d "$payload") +body="$(echo "$response" | head -n -1)" +status="$(echo "$response" | tail -n1)" + +if [[ "$status" != "200" ]]; then + echo "[ERROR] Status update failed with code $status" >&2 + echo "$body" >&2 + exit 1 +fi + +echo "$body" > "$TMP_ROOT/last_response.json" + +sleep 3 + +detail_file="$TMP_ROOT/status_detail.json" +curl -sS "$API_BASE/nodes/$node_id" -o "$detail_file" +python3 - "$detail_file" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + node = json.load(handle) +assert node["status"] == "online", f"Expected online, got {node['status']}" +assert "log-fluentbit" in node["health"], node["health"].keys() +PY + +echo "[INFO] Status report successful and node is online" + +# 等待超过 offline 阈值,验证会自动转为 offline +sleep 7 + +offline_detail_file="$TMP_ROOT/status_offline.json" +curl -sS "$API_BASE/nodes/$node_id" -o "$offline_detail_file" +python3 - "$offline_detail_file" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + node = json.load(handle) +assert node["status"] == "offline", f"Expected offline, got {node['status']}" +PY + +echo "[INFO] Node transitioned to offline as expected" + +# 再次上报健康,触发状态回到 online +payload=$(python3 - <<'PY' +import json +from datetime import datetime, timezone +body = { + "timestamp": datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"), + "health": { + "log-fluentbit": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"}, + "metric-node-exporter": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"} + } +} +print(json.dumps(body)) +PY +) + +curl -sS -o "$TMP_ROOT/second_status_response.json" -w '%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$node_id/status" -d "$payload" > "$TMP_ROOT/second_status_code" +if [[ $(cat "$TMP_ROOT/second_status_code") != "200" ]]; then + echo "[ERROR] Second status update failed" >&2 + cat "$TMP_ROOT/second_status_response.json" >&2 + exit 1 +fi + +sleep 3 + +final_detail_file="$TMP_ROOT/status_back_online.json" +curl -sS "$API_BASE/nodes/$node_id" -o "$final_detail_file" +python3 - "$final_detail_file" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + node = json.load(handle) +assert node["status"] == "online", f"Expected online after second report, got {node['status']}" +PY + +echo "[INFO] Node transitioned back to online after new status report" diff --git a/src/master/tests/scripts/06_config_update_and_nodes_json.sh b/src/master/tests/scripts/06_config_update_and_nodes_json.sh new file mode 100755 index 0000000..ed08750 --- /dev/null +++ b/src/master/tests/scripts/06_config_update_and_nodes_json.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TEST_ROOT="$(cd "$SCRIPT_DIR/.." 
&& pwd)" +TMP_ROOT="$TEST_ROOT/tmp" +PRIVATE_ROOT="$TEST_ROOT/private" +API_BASE="http://localhost:31300/api/v1/master" +NODE_ID="$(cat "$TMP_ROOT/node_id")" + +payload='{"config":{"log_level":"debug"},"label":["gpu","exp001"]}' + +response=$(curl -sS -w '\n%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$NODE_ID/config" -d "$payload") +body="$(echo "$response" | head -n -1)" +status="$(echo "$response" | tail -n1)" + +if [[ "$status" != "200" ]]; then + echo "[ERROR] Config update failed: $status" >&2 + echo "$body" >&2 + exit 1 +fi + +sleep 2 + +nodes_json_path="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json" +if [[ ! -f "$nodes_json_path" ]]; then + echo "[ERROR] nodes.json not generated at $nodes_json_path" >&2 + exit 1 +fi + +# 确保节点处于 online 状态,避免因等待导致 nodes.json 为空 +curl -sS "$API_BASE/nodes/$NODE_ID" -o "$TMP_ROOT/config_detail.json" +if ! python3 - "$TMP_ROOT/config_detail.json" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + node = json.load(handle) +if node["status"] != "online": + raise SystemExit(1) +PY +then + payload='{"timestamp":"2025-09-24T00:00:00Z","health":{"log-fluentbit":{"status":"healthy"}}}' + curl -sS -o "$TMP_ROOT/config_second_report.json" -w '%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$NODE_ID/status" -d "$payload" > "$TMP_ROOT/config_second_code" + sleep 2 +fi + +python3 - "$nodes_json_path" <<'PY' +import json, sys +from pathlib import Path +path = Path(sys.argv[1]) +content = json.loads(path.read_text()) +assert isinstance(content, list) and len(content) == 1 +entry = content[0] +assert entry["labels"] == ["gpu", "exp001"], entry +PY + +echo "[INFO] Config updated and nodes.json verified" diff --git a/src/master/tests/scripts/07_stats_single_node.sh b/src/master/tests/scripts/07_stats_single_node.sh new file mode 100755 index 0000000..e2dfa9b --- /dev/null +++ b/src/master/tests/scripts/07_stats_single_node.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TEST_ROOT="$(cd "$SCRIPT_DIR/.." 
&& pwd)" +PRIVATE_ROOT="$TEST_ROOT/private" +TMP_ROOT="$TEST_ROOT/tmp" +API_BASE="http://localhost:31300/api/v1/master" +NODE_ID="$(cat "$TMP_ROOT/node_id")" + +sleep 7 + +detail_file="$TMP_ROOT/offline_detail.json" +curl -sS "$API_BASE/nodes/$NODE_ID" -o "$detail_file" +python3 - "$detail_file" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + node = json.load(handle) +assert node["status"] == "offline", f"Expected offline, got {node['status']}" +PY + +stats_file="$TMP_ROOT/stats.json" +curl -sS "$API_BASE/nodes/statistics" -o "$stats_file" +python3 - "$stats_file" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + stats = json.load(handle) +assert stats["total"] == 1 +found = {item["status"]: item["count"] for item in stats["status_statistics"]} +assert found.get("offline") == 1 +PY + +nodes_json_path="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json" +python3 - "$nodes_json_path" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + content = json.load(handle) +assert content == [], content +PY + +echo "[INFO] Offline transition and statistics validated" diff --git a/src/master/tests/scripts/08_multi_node_stats.sh b/src/master/tests/scripts/08_multi_node_stats.sh new file mode 100755 index 0000000..e835857 --- /dev/null +++ b/src/master/tests/scripts/08_multi_node_stats.sh @@ -0,0 +1,106 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +PRIVATE_ROOT="$TEST_ROOT/private" +TMP_ROOT="$TEST_ROOT/tmp" +API_BASE="http://localhost:31300/api/v1/master" + +# 注册第二个节点 A2(保持在线) +second_payload=$(cat <<'JSON' +{ + "name": "dev-testuser-testinst-pod-1", + "type": "agent", + "meta_data": { + "hostname": "dev-testuser-testinst-pod-1", + "ip": "10.0.0.11", + "env": "dev", + "user": "testuser", + "instance": "testinst", + "cpu_number": 8, + "memory_in_bytes": 2147483648, + "gpu_number": 0 + }, + "version": "1.1.0" +} +JSON +) + +status=$(curl -sS -o "$TMP_ROOT/second_register.json" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$second_payload") +if [[ "$status" != "201" ]]; then + echo "[ERROR] Second node registration failed: $status" >&2 + cat "$TMP_ROOT/second_register.json" >&2 + exit 1 +fi +SECOND_NODE_ID=$(python3 - "$TMP_ROOT/second_register.json" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + data = json.load(handle) +print(data["id"]) +PY +) + +echo "$SECOND_NODE_ID" > "$TMP_ROOT/second_node_id" + +echo "[INFO] Second node registered with id $SECOND_NODE_ID" + +# A2 上报健康信息,保持 online +status_payload='{"timestamp":"2025-09-24T00:00:00Z","health":{"log-fluentbit":{"status":"healthy"}}}' +status=$(curl -sS -o "$TMP_ROOT/second_status.json" -w '%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$SECOND_NODE_ID/status" -d "$status_payload") +if [[ "$status" != "200" ]]; then + echo "[ERROR] Second node status update failed: $status" >&2 + cat "$TMP_ROOT/second_status.json" >&2 + exit 1 +fi + +# 等待调度器把第二节点标记为 online +second_online=false +for _ in {1..10}; do + sleep 1 + curl -sS "$API_BASE/nodes/$SECOND_NODE_ID" -o "$TMP_ROOT/second_detail.json" || continue + if python3 - "$TMP_ROOT/second_detail.json" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + node = json.load(handle) +if node["status"] != "online": + raise SystemExit(1) +PY + then + second_online=true + break + fi +done + +if [[ "$second_online" != true ]]; then + echo "[ERROR] Second node did not become online" >&2 + exit 1 +fi 
+
+# Fetch the statistics again
+stats_file="$TMP_ROOT/multi_stats.json"
+curl -sS "$API_BASE/nodes/statistics" -o "$stats_file"
+python3 - "$stats_file" "$TMP_ROOT/node_id" "$TMP_ROOT/second_node_id" <<'PY'
+import json, sys, pathlib
+with open(sys.argv[1]) as handle:
+    stats = json.load(handle)
+first_id = pathlib.Path(sys.argv[2]).read_text().strip()
+second_id = pathlib.Path(sys.argv[3]).read_text().strip()
+assert stats["total"] == 2, stats
+found = {item["status"]: item["count"] for item in stats["status_statistics"]}
+assert found.get("offline") == 1, found
+assert found.get("online") == 1, found
+PY
+
+# Verify nodes.json contains only online nodes (should be just the second node, A2)
+nodes_json_path="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json"
+python3 - "$nodes_json_path" "$SECOND_NODE_ID" <<'PY'
+import json, sys
+with open(sys.argv[1]) as handle:
+    content = json.load(handle)
+expected_id = sys.argv[2]
+assert len(content) == 1, content
+assert content[0]["node_id"] == expected_id, content
+PY
+
+echo "[INFO] Multi-node statistics and nodes.json validated"
diff --git a/src/master/tests/scripts/09_restart_persistence.sh b/src/master/tests/scripts/09_restart_persistence.sh
new file mode 100755
index 0000000..6f142bd
--- /dev/null
+++ b/src/master/tests/scripts/09_restart_persistence.sh
@@ -0,0 +1,161 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
+PRIVATE_ROOT="$TEST_ROOT/private"
+TMP_ROOT="$TEST_ROOT/tmp"
+API_BASE="http://localhost:31300/api/v1/master"
+ROOT_BASE="http://localhost:31300"
+DB_PATH="$PRIVATE_ROOT/argus/master/db.sqlite3"
+
+compose() {
+  if docker compose version >/dev/null 2>&1; then
+    docker compose "$@"
+  else
+    docker-compose "$@"
+  fi
+}
+
+if [[ ! -f "$TMP_ROOT/node_id" ]]; then
+  echo "[ERROR] Primary node ID missing; run the earlier test cases first" >&2
+  exit 1
+fi
+
+if [[ ! -f "$TMP_ROOT/second_node_id" ]]; then
+  echo "[ERROR] Second node ID missing; run the multi-node scenario script first" >&2
+  exit 1
+fi
+
+if [[ ! -f "$DB_PATH" ]]; then
-f "$DB_PATH" ]]; then + echo "[ERROR] 持久化数据库缺失: $DB_PATH" >&2 + exit 1 +fi + +NODE_ID="$(cat "$TMP_ROOT/node_id")" +SECOND_NODE_ID="$(cat "$TMP_ROOT/second_node_id")" + +# 在重启前抓取节点详情与节点文件、统计信息,作为对比基线 +first_before="$TMP_ROOT/${NODE_ID}_pre_restart.json" +second_before="$TMP_ROOT/${SECOND_NODE_ID}_pre_restart.json" +curl -sS "$API_BASE/nodes/$NODE_ID" -o "$first_before" +curl -sS "$API_BASE/nodes/$SECOND_NODE_ID" -o "$second_before" + +nodes_json_before="$TMP_ROOT/nodes_json_pre_restart.json" +cp "$PRIVATE_ROOT/argus/metric/prometheus/nodes.json" "$nodes_json_before" + +stats_before="$TMP_ROOT/stats_pre_restart.json" +curl -sS "$API_BASE/nodes/statistics" -o "$stats_before" + +# 重启 master 容器,模拟服务重启后的持久化场景 +pushd "$TEST_ROOT" >/dev/null +compose restart master +popd >/dev/null + +# 等待 /readyz 恢复 200 +for _ in {1..30}; do + status=$(curl -s -o /dev/null -w '%{http_code}' "$ROOT_BASE/readyz" || true) + if [[ "$status" == "200" ]]; then + break + fi + sleep 1 +done + +if [[ "${status:-}" != "200" ]]; then + echo "[ERROR] master 容器重启后未恢复健康状态,readyz=$status" >&2 + exit 1 +fi + +sleep 2 + +first_after="$TMP_ROOT/${NODE_ID}_post_restart.json" +second_after="$TMP_ROOT/${SECOND_NODE_ID}_post_restart.json" +curl -sS "$API_BASE/nodes/$NODE_ID" -o "$first_after" +curl -sS "$API_BASE/nodes/$SECOND_NODE_ID" -o "$second_after" + +# 对比重启前后的节点关键信息,确保无丢失 +python3 - "$first_before" "$first_after" <<'PY' +import json, sys +before_path, after_path = sys.argv[1:3] +with open(before_path, 'r', encoding='utf-8') as handle: + before = json.load(handle) +with open(after_path, 'r', encoding='utf-8') as handle: + after = json.load(handle) +keys = [ + "id", + "name", + "type", + "version", + "register_time", + "meta_data", + "config", + "label", + "health", + "last_report", + "agent_last_report", + "status", +] +for key in keys: + if before.get(key) != after.get(key): + raise AssertionError(f"Key {key} changed after restart: {before.get(key)} -> {after.get(key)}") +PY + +python3 - "$second_before" "$second_after" <<'PY' +import json, sys +before_path, after_path = sys.argv[1:3] +with open(before_path, 'r', encoding='utf-8') as handle: + before = json.load(handle) +with open(after_path, 'r', encoding='utf-8') as handle: + after = json.load(handle) +keys = [ + "id", + "name", + "type", + "version", + "register_time", + "meta_data", + "config", + "label", + "health", + "last_report", + "agent_last_report", + "status", +] +for key in keys: + if before.get(key) != after.get(key): + raise AssertionError(f"Key {key} changed after restart: {before.get(key)} -> {after.get(key)}") +PY + +# 对比重启前后的 nodes.json 与统计信息,验证持久化一致性 +nodes_json_after="$TMP_ROOT/nodes_json_post_restart.json" +cp "$PRIVATE_ROOT/argus/metric/prometheus/nodes.json" "$nodes_json_after" + +stats_after="$TMP_ROOT/stats_after_restart.json" +curl -sS "$API_BASE/nodes/statistics" -o "$stats_after" + +python3 - "$nodes_json_before" "$nodes_json_after" <<'PY' +import json, sys +with open(sys.argv[1], 'r', encoding='utf-8') as handle: + before = json.load(handle) +with open(sys.argv[2], 'r', encoding='utf-8') as handle: + after = json.load(handle) +if before != after: + raise AssertionError(f"nodes.json changed after restart: {before} -> {after}") +PY + +python3 - "$stats_before" "$stats_after" <<'PY' +import json, sys +with open(sys.argv[1], 'r', encoding='utf-8') as handle: + before = json.load(handle) +with open(sys.argv[2], 'r', encoding='utf-8') as handle: + after = json.load(handle) +if before != after: + raise AssertionError(f"Statistics changed after restart: 
{before} -> {after}") +PY + +if [[ ! -s "$DB_PATH" ]]; then + echo "[ERROR] 数据库文件为空,疑似未持久化" >&2 + exit 1 +fi + +echo "[INFO] Master 重启后持久化数据校验通过" diff --git a/src/master/tests/scripts/10_down.sh b/src/master/tests/scripts/10_down.sh new file mode 100755 index 0000000..7afce88 --- /dev/null +++ b/src/master/tests/scripts/10_down.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +PRIVATE_ROOT="$TEST_ROOT/private" +TMP_ROOT="$TEST_ROOT/tmp" + +compose() { + if docker compose version >/dev/null 2>&1; then + docker compose "$@" + else + docker-compose "$@" + fi +} + +pushd "$TEST_ROOT" >/dev/null +compose down --remove-orphans +popd >/dev/null + +rm -rf "$TMP_ROOT" +rm -rf "$PRIVATE_ROOT" + +echo "[INFO] Master E2E environment cleaned up"