[#2] Master module development and tests

This commit is contained in:
yuyr 2025-09-24 03:48:05 +00:00
parent 5acc1fe92b
commit d9461b3991
29 changed files with 1907 additions and 0 deletions

2
src/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
__pycache__/

25
src/master/Dockerfile Normal file
View File

@ -0,0 +1,25 @@
FROM python:3.11-slim
SHELL ["/bin/bash", "-c"]
ARG PIP_INDEX_URL=
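# Optional pip index override; scripts/build_images.sh --intranet passes a mirror index through this build arg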
ENV PIP_NO_CACHE_DIR=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/app
WORKDIR /app
COPY requirements.txt ./
RUN set -euxo pipefail \
&& python -m pip install --upgrade pip \
&& if [[ -n "$PIP_INDEX_URL" ]]; then \
PIP_INDEX_URL="$PIP_INDEX_URL" python -m pip install -r requirements.txt; \
else \
python -m pip install -r requirements.txt; \
fi
COPY app ./app
EXPOSE 3000
CMD ["gunicorn", "--bind", "0.0.0.0:3000", "app:create_app()"]

48
src/master/README.md Normal file
View File

@ -0,0 +1,48 @@
# Argus Master Module
A lightweight Flask + SQLite service that manages Argus agent nodes. It exposes endpoints for node registration, status reporting, configuration management, and statistics, and it generates `nodes.json` for the metric module.
## Build & Run
```bash
cd src/master
./scripts/build_images.sh # builds argus-master:dev
```
Run locally via Docker Compose (test stack):
```bash
cd src/master/tests
./scripts/01_up_master.sh
```
The service listens on port `3000` (host port `31300` with the provided test compose). Key environment variables:
- `DB_PATH` (default `/private/argus/master/db.sqlite3`)
- `METRIC_NODES_JSON_PATH` (default `/private/argus/metric/prometheus/nodes.json`)
- `OFFLINE_THRESHOLD_SECONDS` (default `180`), `ONLINE_THRESHOLD_SECONDS` (default `120`), `SCHEDULER_INTERVAL_SECONDS` (default `30`)
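The thresholds drive the scheduler: a node flips to `offline` when its last report is older than `OFFLINE_THRESHOLD_SECONDS`, flips back to `online` when a report arrived within `ONLINE_THRESHOLD_SECONDS`, and keeps its current status in between. A minimal standalone run might look like this (illustrative values; the compose files below are the supported setup):
```bash
docker run -d --name argus-master \
  -p 3000:3000 \
  -e OFFLINE_THRESHOLD_SECONDS=180 \
  -e ONLINE_THRESHOLD_SECONDS=120 \
  -v "$PWD/private:/private" \
  argus-master:dev
```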
## REST API Summary
Base path: `/api/v1/master`
- `GET /nodes` — list node summaries
- `GET /nodes/{id}` — node detail
- `POST /nodes` — register/re-register
- `PUT /nodes/{id}/status` — status report (timestamp + health map)
- `PUT /nodes/{id}/config` — update config/labels (partial)
- `GET /nodes/statistics` — totals + grouped counts
- `GET /healthz`, `GET /readyz` — health checks
`nodes.json` only contains nodes whose status is `online`.
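For illustration, a registration request and the entry shape that ends up in `nodes.json` once the node reports healthy (values made up):
```bash
curl -sS -X POST http://localhost:3000/api/v1/master/nodes \
  -H 'Content-Type: application/json' \
  -d '{"name":"dev-user-inst-pod-0","type":"agent","version":"1.0.0",
       "meta_data":{"hostname":"dev-user-inst-pod-0","ip":"10.0.0.10","env":"dev",
                    "user":"user","instance":"inst","cpu_number":4,
                    "memory_in_bytes":2147483648,"gpu_number":0}}'
# nodes.json entries look like:
# [{"node_id": "A1", "user_id": "user", "ip": "10.0.0.10", "hostname": "dev-user-inst-pod-0", "labels": []}]
```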
## Tests
End-to-end tests are Docker-based:
```bash
cd src/master/tests
./scripts/00_e2e_test.sh
```
Scripts create temporary `private/` and `tmp/` directories under `tests/`; they are cleaned up automatically by `10_down.sh`.
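The numbered scripts can also be run individually, e.g.:
```bash
./scripts/01_up_master.sh           # build the image and start the stack
./scripts/03_register_via_curl.sh   # register a node against the running master
./scripts/10_down.sh                # stop the stack and remove private/ and tmp/
```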

41
src/master/app/__init__.py Normal file
View File

@ -0,0 +1,41 @@
from __future__ import annotations
import atexit
import logging
from flask import Flask
from .config import AppConfig, load_config
from .routes import register_routes
from .scheduler import StatusScheduler
from .storage import Storage
def create_app(config: AppConfig | None = None) -> Flask:
app_config = config or load_config()
storage = Storage(app_config.db_path, app_config.node_id_prefix)
scheduler = StatusScheduler(storage, app_config)
app = Flask(__name__)
app.config["APP_CONFIG"] = app_config
app.config["STORAGE"] = storage
app.config["SCHEDULER"] = scheduler
register_routes(app, storage, scheduler, app_config)
scheduler.start()
def _cleanup() -> None:
logging.getLogger("argus.master").info("Shutting down master app")
try:
scheduler.stop()
except Exception: # pragma: no cover - defensive
logging.getLogger("argus.master").exception("Failed to stop scheduler")
try:
storage.close()
except Exception: # pragma: no cover - defensive
logging.getLogger("argus.master").exception("Failed to close storage")
atexit.register(_cleanup)
return app

40
src/master/app/config.py Normal file
View File

@ -0,0 +1,40 @@
from __future__ import annotations
import os
from dataclasses import dataclass
@dataclass(frozen=True)
class AppConfig:
db_path: str
metric_nodes_json_path: str
offline_threshold_seconds: int
online_threshold_seconds: int
scheduler_interval_seconds: int
node_id_prefix: str
auth_mode: str
def _get_int_env(name: str, default: int) -> int:
raw = os.environ.get(name)
if raw is None or raw.strip() == "":
return default
try:
return int(raw)
except ValueError as exc:
raise ValueError(f"Environment variable {name} must be an integer, got {raw!r}") from exc
def load_config() -> AppConfig:
"""读取环境变量生成配置对象,方便统一管理运行参数。"""
return AppConfig(
db_path=os.environ.get("DB_PATH", "/private/argus/master/db.sqlite3"),
metric_nodes_json_path=os.environ.get(
"METRIC_NODES_JSON_PATH", "/private/argus/metric/prometheus/nodes.json"
),
offline_threshold_seconds=_get_int_env("OFFLINE_THRESHOLD_SECONDS", 180),
online_threshold_seconds=_get_int_env("ONLINE_THRESHOLD_SECONDS", 120),
scheduler_interval_seconds=_get_int_env("SCHEDULER_INTERVAL_SECONDS", 30),
node_id_prefix=os.environ.get("NODE_ID_PREFIX", "A"),
auth_mode=os.environ.get("AUTH_MODE", "disabled"),
)

171
src/master/app/models.py Normal file
View File

@ -0,0 +1,171 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Mapping
from .util import parse_iso
class ValidationError(Exception):
"""Raised when user payload fails validation."""
@dataclass
class Node:
id: str
name: str
type: str
version: str | None
status: str
config: Dict[str, Any]
labels: Iterable[str]
meta_data: Dict[str, Any]
health: Dict[str, Any]
register_time: str | None
last_report: str | None
agent_last_report: str | None
last_updated: str | None
def serialize_node_row(row: Mapping[str, Any]) -> Dict[str, Any]:
def _json_or_default(value: str | None, default: Any) -> Any:
if value is None or value == "":
return default
try:
return json.loads(value)
except json.JSONDecodeError:
return default
config = _json_or_default(row["config_json"], {})
labels = _json_or_default(row["labels_json"], [])
meta = _json_or_default(row["meta_json"], {})
health = _json_or_default(row["health_json"], {})
return {
"id": row["id"],
"name": row["name"],
"type": row["type"],
"version": row["version"],
"status": row["status"],
"config": config if isinstance(config, dict) else {},
"label": list(labels) if isinstance(labels, list) else [],
"meta_data": meta if isinstance(meta, dict) else {},
"health": health if isinstance(health, dict) else {},
"register_time": row["register_time"],
"last_report": row["last_report"],
"agent_last_report": row["agent_last_report"],
"last_updated": row["last_updated"],
}
def serialize_node_summary(row: Mapping[str, Any]) -> Dict[str, Any]:
return {
"id": row["id"],
"name": row["name"],
"status": row["status"],
"type": row["type"],
"version": row["version"],
}
def validate_registration_payload(payload: Mapping[str, Any]) -> Dict[str, Any]:
if not isinstance(payload, Mapping):
raise ValidationError("Request body must be a JSON object")
name = payload.get("name")
if not isinstance(name, str) or not name.strip():
raise ValidationError("Field 'name' is required and must be a non-empty string")
node_type = payload.get("type", "agent")
if not isinstance(node_type, str) or not node_type:
raise ValidationError("Field 'type' must be a string")
version = payload.get("version")
if version is not None and not isinstance(version, str):
raise ValidationError("Field 'version' must be a string if provided")
meta = payload.get("meta_data")
if not isinstance(meta, Mapping):
raise ValidationError("Field 'meta_data' must be an object")
required_meta = ["hostname", "ip", "env", "user", "instance", "cpu_number", "memory_in_bytes", "gpu_number"]
for key in required_meta:
if key not in meta:
raise ValidationError(f"meta_data.{key} is required")
cpu_number = meta["cpu_number"]
memory_in_bytes = meta["memory_in_bytes"]
gpu_number = meta["gpu_number"]
if not isinstance(cpu_number, int) or cpu_number < 0:
raise ValidationError("meta_data.cpu_number must be a non-negative integer")
if not isinstance(memory_in_bytes, int) or memory_in_bytes < 0:
raise ValidationError("meta_data.memory_in_bytes must be a non-negative integer")
if not isinstance(gpu_number, int) or gpu_number < 0:
raise ValidationError("meta_data.gpu_number must be a non-negative integer")
node_id = payload.get("id")
if node_id is not None and (not isinstance(node_id, str) or not node_id.strip()):
raise ValidationError("Field 'id' must be a non-empty string when provided")
return {
"id": node_id,
"name": name,
"type": node_type,
"version": version,
"meta_data": dict(meta),
}
def validate_status_payload(payload: Mapping[str, Any]) -> Dict[str, Any]:
if not isinstance(payload, Mapping):
raise ValidationError("Request body must be a JSON object")
timestamp = payload.get("timestamp")
if not isinstance(timestamp, str) or not timestamp:
raise ValidationError("Field 'timestamp' is required and must be a string")
parsed = parse_iso(timestamp)
if parsed is None:
raise ValidationError("Field 'timestamp' must be an ISO8601 datetime string")
health = payload.get("health", {})
if not isinstance(health, Mapping):
raise ValidationError("Field 'health' must be an object if provided")
sanitized_health: Dict[str, Any] = {}
for key, value in health.items():
if not isinstance(key, str):
raise ValidationError("Keys in 'health' must be strings")
if not isinstance(value, (Mapping, list, str, int, float, bool)) and value is not None:
raise ValidationError("Values in 'health' must be JSON-compatible")
sanitized_health[key] = value
return {
"timestamp": timestamp,
"parsed_timestamp": parsed,
"health": sanitized_health,
}
def validate_config_payload(payload: Mapping[str, Any]) -> Dict[str, Any]:
if not isinstance(payload, Mapping):
raise ValidationError("Request body must be a JSON object")
result: Dict[str, Any] = {}
if "config" in payload:
config = payload["config"]
if not isinstance(config, Mapping):
raise ValidationError("Field 'config' must be an object")
result["config"] = dict(config)
if "label" in payload:
labels = payload["label"]
if not isinstance(labels, list) or not all(isinstance(item, str) for item in labels):
raise ValidationError("Field 'label' must be an array of strings")
result["label"] = list(labels)
if not result:
raise ValidationError("At least one of 'config' or 'label' must be provided")
return result

155
src/master/app/nodes_api.py Normal file
View File

@ -0,0 +1,155 @@
from __future__ import annotations
import logging
from http import HTTPStatus
from typing import Any, Mapping
from flask import Blueprint, jsonify, request
from .models import (
ValidationError,
validate_config_payload,
validate_registration_payload,
validate_status_payload,
)
from .scheduler import StatusScheduler
from .storage import Storage
from .util import to_iso, utcnow
def create_nodes_blueprint(storage: Storage, scheduler: StatusScheduler) -> Blueprint:
bp = Blueprint("nodes", __name__)
logger = logging.getLogger("argus.master.api")
def _json_error(message: str, status: HTTPStatus, code: str) -> Any:
response = jsonify({"error": message, "code": code})
response.status_code = status
return response
@bp.errorhandler(ValidationError)
def _handle_validation_error(err: ValidationError):
return _json_error(str(err), HTTPStatus.BAD_REQUEST, "invalid_request")
@bp.get("/nodes")
def list_nodes():
nodes = storage.list_nodes()
return jsonify(nodes)
@bp.get("/nodes/<node_id>")
def get_node(node_id: str):
node = storage.get_node(node_id)
if node is None:
return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found")
return jsonify(node)
@bp.post("/nodes")
def register_node():
payload = _get_json()
data = validate_registration_payload(payload)
now = utcnow()
now_iso = to_iso(now)
node_id = data["id"]
name = data["name"]
node_type = data["type"]
version = data["version"]
meta = data["meta_data"]
if node_id:
            # An explicit id means re-registration; the name must match the stored record
existing_row = storage.get_node_raw(node_id)
if existing_row is None:
return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found")
if existing_row["name"] != name:
return _json_error(
"Node id and name mismatch during re-registration",
HTTPStatus.INTERNAL_SERVER_ERROR,
"id_name_mismatch",
)
updated = storage.update_node_meta(
node_id,
node_type=node_type,
version=version,
meta_data=meta,
last_updated_iso=now_iso,
)
scheduler.trigger_nodes_json_refresh()
return jsonify(updated), HTTPStatus.OK
# No id provided → search by name
existing_by_name = storage.get_node_by_name(name)
if existing_by_name:
            # A node with the same name already exists; treat this as an id-less re-registration
updated = storage.update_node_meta(
existing_by_name["id"],
node_type=node_type,
version=version,
meta_data=meta,
last_updated_iso=now_iso,
)
scheduler.trigger_nodes_json_refresh()
return jsonify(updated), HTTPStatus.OK
new_id = storage.allocate_node_id()
created = storage.create_node(
new_id,
name,
node_type,
version,
meta,
status="initialized",
register_time_iso=now_iso,
last_updated_iso=now_iso,
)
scheduler.trigger_nodes_json_refresh()
return jsonify(created), HTTPStatus.CREATED
@bp.put("/nodes/<node_id>/config")
def update_node_config(node_id: str):
payload = _get_json()
updates = validate_config_payload(payload)
try:
updated = storage.update_config_and_labels(
node_id,
config=updates.get("config"),
labels=updates.get("label"),
)
except KeyError:
return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found")
if "label" in updates:
scheduler.trigger_nodes_json_refresh()
return jsonify(updated)
@bp.get("/nodes/statistics")
def node_statistics():
stats = storage.get_statistics()
return jsonify(stats)
@bp.put("/nodes/<node_id>/status")
def update_status(node_id: str):
payload = _get_json()
data = validate_status_payload(payload)
try:
            # The master stamps last_report itself; node status is computed by the scheduler
updated = storage.update_last_report(
node_id,
server_timestamp_iso=to_iso(utcnow()),
agent_timestamp_iso=data["timestamp"],
health=data["health"],
)
except KeyError:
return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found")
scheduler.trigger_nodes_json_refresh()
return jsonify(updated)
return bp
def _get_json() -> Mapping[str, Any]:
data = request.get_json(silent=True)
if data is None:
raise ValidationError("Request body must be valid JSON")
if not isinstance(data, Mapping):
raise ValidationError("Request body must be a JSON object")
return data

24
src/master/app/routes.py Normal file
View File

@ -0,0 +1,24 @@
from __future__ import annotations
from flask import Flask, jsonify
from .config import AppConfig
from .nodes_api import create_nodes_blueprint
from .scheduler import StatusScheduler
from .storage import Storage
def register_routes(app: Flask, storage: Storage, scheduler: StatusScheduler, config: AppConfig) -> None:
app.register_blueprint(create_nodes_blueprint(storage, scheduler), url_prefix="/api/v1/master")
@app.get("/healthz")
def healthz():
return jsonify({"status": "ok"})
@app.get("/readyz")
def readyz():
try:
storage.list_nodes() # simple readiness probe
except Exception as exc: # pragma: no cover - defensive
return jsonify({"status": "error", "error": str(exc)}), 500
return jsonify({"status": "ok"})

90
src/master/app/scheduler.py Normal file
View File

@ -0,0 +1,90 @@
from __future__ import annotations
import logging
import threading
from typing import Optional
from .config import AppConfig
from .storage import Storage
from .util import atomic_write_json, parse_iso, to_iso, utcnow
class StatusScheduler:
def __init__(self, storage: Storage, config: AppConfig, logger: Optional[logging.Logger] = None) -> None:
self._storage = storage
self._config = config
self._logger = logger or logging.getLogger("argus.master.scheduler")
self._stop_event = threading.Event()
self._thread = threading.Thread(target=self._run, name="status-scheduler", daemon=True)
self._nodes_json_lock = threading.Lock()
self._pending_nodes_json = threading.Event()
def start(self) -> None:
"""启动后台线程,定期刷新节点状态与 nodes.json。"""
if not self._thread.is_alive():
self._logger.info("Starting scheduler thread")
self._thread.start()
def stop(self) -> None:
self._stop_event.set()
self._pending_nodes_json.set()
self._thread.join(timeout=5)
def trigger_nodes_json_refresh(self) -> None:
self._pending_nodes_json.set()
def generate_nodes_json(self) -> None:
with self._nodes_json_lock:
online_nodes = self._storage.get_online_nodes()
atomic_write_json(self._config.metric_nodes_json_path, online_nodes)
self._logger.info("nodes.json updated", extra={"count": len(online_nodes)})
# ------------------------------------------------------------------
# internal loop
# ------------------------------------------------------------------
def _run(self) -> None:
        # Make sure nodes.json is generated immediately on startup
self._pending_nodes_json.set()
while not self._stop_event.is_set():
changed = self._reconcile_statuses()
if changed or self._pending_nodes_json.is_set():
try:
self.generate_nodes_json()
finally:
self._pending_nodes_json.clear()
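            # wait() doubles as an interruptible sleep: stop() sets the event so shutdown does not block for a full interval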
self._stop_event.wait(self._config.scheduler_interval_seconds)
def _reconcile_statuses(self) -> bool:
"""根据 last_report 与当前时间对比,决定是否切换状态。"""
any_status_changed = False
now = utcnow()
rows = self._storage.fetch_nodes_for_scheduler()
for row in rows:
node_id = row["id"]
last_report_iso = row["last_report"]
current_status = row["status"]
last_report_dt = parse_iso(last_report_iso)
if last_report_dt is None:
# No report yet; treat as initialized until report arrives
continue
delta_seconds = (now - last_report_dt).total_seconds()
new_status = current_status
if delta_seconds > self._config.offline_threshold_seconds:
new_status = "offline"
elif delta_seconds <= self._config.online_threshold_seconds:
new_status = "online"
# Between thresholds: keep current status (sticky)
if new_status != current_status:
any_status_changed = True
self._logger.info(
"Updating node status",
extra={
"node_id": node_id,
"previous": current_status,
"new": new_status,
"delta_seconds": delta_seconds,
},
)
self._storage.update_status(node_id, new_status, last_updated_iso=to_iso(now))
return any_status_changed

332
src/master/app/storage.py Normal file
View File

@ -0,0 +1,332 @@
from __future__ import annotations
import json
import sqlite3
import threading
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple
from .models import serialize_node_row, serialize_node_summary
from .util import ensure_parent, to_iso, utcnow
class Storage:
def __init__(self, db_path: str, node_id_prefix: str) -> None:
self._db_path = db_path
self._node_id_prefix = node_id_prefix
ensure_parent(db_path)
self._lock = threading.Lock()
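        # One connection shared by Flask request threads and the scheduler; _lock serializes access (hence check_same_thread=False)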
self._conn = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
with self._lock:
self._conn.execute("PRAGMA foreign_keys = ON;")
self._ensure_schema()
# ------------------------------------------------------------------
# schema & helpers
# ------------------------------------------------------------------
def _ensure_schema(self) -> None:
"""初始化表结构,确保服务启动时数据库结构就绪。"""
with self._lock:
self._conn.executescript(
"""
CREATE TABLE IF NOT EXISTS nodes (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
type TEXT NOT NULL,
version TEXT,
status TEXT NOT NULL,
config_json TEXT,
labels_json TEXT,
meta_json TEXT,
health_json TEXT,
register_time TEXT,
last_report TEXT,
agent_last_report TEXT,
last_updated TEXT
);
CREATE TABLE IF NOT EXISTS kv (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_nodes_status ON nodes(status);
CREATE INDEX IF NOT EXISTS idx_nodes_name ON nodes(name);
"""
)
self._conn.commit()
def close(self) -> None:
with self._lock:
self._conn.close()
# ------------------------------------------------------------------
# Node ID allocation
# ------------------------------------------------------------------
def allocate_node_id(self) -> str:
"""在 kv 表里维护自增序列,为新节点生成形如 A1 的 ID。"""
with self._lock:
cur = self._conn.execute("SELECT value FROM kv WHERE key = ?", ("node_id_seq",))
row = cur.fetchone()
if row is None:
next_id = 1
self._conn.execute("INSERT INTO kv(key, value) VALUES(?, ?)", ("node_id_seq", str(next_id)))
else:
next_id = int(row["value"]) + 1
self._conn.execute("UPDATE kv SET value = ? WHERE key = ?", (str(next_id), "node_id_seq"))
self._conn.commit()
return f"{self._node_id_prefix}{next_id}"
# ------------------------------------------------------------------
# Query helpers
# ------------------------------------------------------------------
def list_nodes(self) -> List[Dict[str, Any]]:
with self._lock:
cur = self._conn.execute(
"SELECT id, name, status, type, version FROM nodes ORDER BY id ASC"
)
rows = cur.fetchall()
return [serialize_node_summary(row) for row in rows]
def get_node(self, node_id: str) -> Optional[Dict[str, Any]]:
with self._lock:
cur = self._conn.execute("SELECT * FROM nodes WHERE id = ?", (node_id,))
row = cur.fetchone()
if row is None:
return None
return serialize_node_row(row)
def get_node_raw(self, node_id: str) -> Optional[sqlite3.Row]:
with self._lock:
cur = self._conn.execute("SELECT * FROM nodes WHERE id = ?", (node_id,))
row = cur.fetchone()
return row
def get_node_by_name(self, name: str) -> Optional[Dict[str, Any]]:
with self._lock:
cur = self._conn.execute("SELECT * FROM nodes WHERE name = ?", (name,))
row = cur.fetchone()
if row is None:
return None
return serialize_node_row(row)
# ------------------------------------------------------------------
# Mutation helpers
# ------------------------------------------------------------------
def create_node(
self,
node_id: str,
name: str,
node_type: str,
version: str | None,
meta_data: Mapping[str, Any],
status: str,
register_time_iso: str,
last_updated_iso: str,
) -> Dict[str, Any]:
"""插入节点初始记录,默认 config/label/health 为空。"""
now_iso = last_updated_iso
with self._lock:
self._conn.execute(
"""
INSERT INTO nodes (
id, name, type, version, status, config_json, labels_json, meta_json,
health_json, register_time, last_report, agent_last_report, last_updated
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
node_id,
name,
node_type,
version,
status,
json.dumps({}),
json.dumps([]),
json.dumps(dict(meta_data)),
json.dumps({}),
register_time_iso,
None,
None,
now_iso,
),
)
self._conn.commit()
created = self.get_node(node_id)
if created is None:
raise RuntimeError("Failed to read back created node")
return created
def update_node_meta(
self,
node_id: str,
*,
name: Optional[str] = None,
node_type: Optional[str] = None,
version: Optional[str | None] = None,
meta_data: Optional[Mapping[str, Any]] = None,
last_updated_iso: Optional[str] = None,
) -> Dict[str, Any]:
"""重注册时更新节点静态信息,缺省字段保持不变。"""
updates: List[str] = []
params: List[Any] = []
if name is not None:
updates.append("name = ?")
params.append(name)
if node_type is not None:
updates.append("type = ?")
params.append(node_type)
if version is not None:
updates.append("version = ?")
params.append(version)
if meta_data is not None:
updates.append("meta_json = ?")
params.append(json.dumps(dict(meta_data)))
if last_updated_iso is not None:
updates.append("last_updated = ?")
params.append(last_updated_iso)
if not updates:
result = self.get_node(node_id)
if result is None:
raise KeyError(node_id)
return result
params.append(node_id)
with self._lock:
self._conn.execute(
f"UPDATE nodes SET {', '.join(updates)} WHERE id = ?",
tuple(params),
)
self._conn.commit()
updated = self.get_node(node_id)
if updated is None:
raise KeyError(node_id)
return updated
def update_config_and_labels(
self, node_id: str, *, config: Optional[Mapping[str, Any]] = None, labels: Optional[Iterable[str]] = None
) -> Dict[str, Any]:
"""部分更新 config/label并刷新 last_updated 时间戳。"""
updates: List[str] = []
params: List[Any] = []
if config is not None:
updates.append("config_json = ?")
params.append(json.dumps(dict(config)))
if labels is not None:
updates.append("labels_json = ?")
params.append(json.dumps(list(labels)))
updates.append("last_updated = ?")
params.append(to_iso(utcnow()))
params.append(node_id)
        with self._lock:
            cur = self._conn.execute(
                f"UPDATE nodes SET {', '.join(updates)} WHERE id = ?",
                tuple(params),
            )
            # cursor.rowcount counts rows matched by this UPDATE; connection.total_changes is cumulative and unreliable here
            if cur.rowcount == 0:
                self._conn.rollback()
                raise KeyError(node_id)
            self._conn.commit()
updated = self.get_node(node_id)
if updated is None:
raise KeyError(node_id)
return updated
def update_last_report(
self,
node_id: str,
*,
server_timestamp_iso: str,
agent_timestamp_iso: str,
health: Mapping[str, Any],
) -> Dict[str, Any]:
"""记录最新上报时间和健康信息,用于后续状态计算。"""
        with self._lock:
            cur = self._conn.execute(
                """
                UPDATE nodes
                SET last_report = ?,
                    agent_last_report = ?,
                    health_json = ?,
                    last_updated = ?
                WHERE id = ?
                """,
                (
                    server_timestamp_iso,
                    agent_timestamp_iso,
                    json.dumps(dict(health)),
                    server_timestamp_iso,
                    node_id,
                ),
            )
            # As above, rowcount (not the cumulative total_changes) tells us whether the node exists
            if cur.rowcount == 0:
                self._conn.rollback()
                raise KeyError(node_id)
            self._conn.commit()
updated = self.get_node(node_id)
if updated is None:
raise KeyError(node_id)
return updated
def update_status(self, node_id: str, status: str, *, last_updated_iso: str) -> None:
with self._lock:
self._conn.execute(
"UPDATE nodes SET status = ?, last_updated = ? WHERE id = ?",
(status, last_updated_iso, node_id),
)
self._conn.commit()
# ------------------------------------------------------------------
# Reporting helpers
# ------------------------------------------------------------------
def get_statistics(self) -> Dict[str, Any]:
"""统计节点总数及按状态聚合的数量。"""
with self._lock:
cur = self._conn.execute("SELECT COUNT(*) AS total FROM nodes")
total_row = cur.fetchone()
cur = self._conn.execute("SELECT status, COUNT(*) AS count FROM nodes GROUP BY status")
status_rows = cur.fetchall()
return {
"total": total_row["total"] if total_row else 0,
"status_statistics": [
{"status": row["status"], "count": row["count"]}
for row in status_rows
],
}
def fetch_nodes_for_scheduler(self) -> List[sqlite3.Row]:
with self._lock:
cur = self._conn.execute(
"SELECT id, last_report, status FROM nodes"
)
return cur.fetchall()
def get_online_nodes(self) -> List[Dict[str, Any]]:
"""返回在线节点列表,用于生成 nodes.json。"""
with self._lock:
cur = self._conn.execute(
"SELECT id, meta_json, labels_json, name FROM nodes WHERE status = ? ORDER BY id ASC",
("online",),
)
rows = cur.fetchall()
result: List[Dict[str, Any]] = []
for row in rows:
meta = json.loads(row["meta_json"]) if row["meta_json"] else {}
labels = json.loads(row["labels_json"]) if row["labels_json"] else []
result.append(
{
"node_id": row["id"],
"user_id": meta.get("user"),
"ip": meta.get("ip"),
"hostname": meta.get("hostname", row["name"]),
"labels": labels if isinstance(labels, list) else [],
}
)
return result

51
src/master/app/util.py Normal file
View File

@ -0,0 +1,51 @@
from __future__ import annotations
import json
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable
ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
def utcnow() -> datetime:
"""获取当前 UTC 时间,统一时间基准。"""
return datetime.now(timezone.utc)
def to_iso(dt: datetime | None) -> str | None:
if dt is None:
return None
return dt.astimezone(timezone.utc).replace(microsecond=0).strftime(ISO_FORMAT)
def parse_iso(value: str | None) -> datetime | None:
if not value:
return None
try:
if value.endswith("Z"):
return datetime.strptime(value, ISO_FORMAT).replace(tzinfo=timezone.utc)
# Fallback for ISO strings with offset
return datetime.fromisoformat(value).astimezone(timezone.utc)
except ValueError:
return None
def ensure_parent(path: str) -> None:
"""确保目标文件所在目录存在。"""
Path(path).parent.mkdir(parents=True, exist_ok=True)
def atomic_write_json(path: str, data: Iterable[Any] | Any) -> None:
"""原子化写 JSON避免被其它进程读到半成品。"""
ensure_parent(path)
directory = Path(path).parent
with tempfile.NamedTemporaryFile("w", dir=directory, delete=False) as tmp:
json.dump(data, tmp, separators=(",", ":"))
tmp.flush()
os.fsync(tmp.fileno())
temp_path = tmp.name
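    # os.replace is atomic on POSIX; the temp file is created in the destination directory so both paths share a filesystem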
os.replace(temp_path, path)

2
src/master/requirements.txt Normal file
View File

@ -0,0 +1,2 @@
Flask==2.3.3
gunicorn==21.2.0

39
src/master/scripts/build_images.sh Normal file
View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
set -euo pipefail
usage() {
echo "Usage: $0 [--intranet] [--tag <image_tag>]" >&2
}
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
IMAGE_TAG="${IMAGE_TAG:-argus-master:dev}"
BUILD_ARGS=()
while [[ "$#" -gt 0 ]]; do
case "$1" in
--intranet)
INTRANET_INDEX="${INTRANET_INDEX:-https://pypi.tuna.tsinghua.edu.cn/simple}"
BUILD_ARGS+=("--build-arg" "PIP_INDEX_URL=${INTRANET_INDEX}")
shift
;;
--tag)
[[ $# -ge 2 ]] || { usage; exit 1; }
IMAGE_TAG="$2"
shift 2
;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown option: $1" >&2
usage
exit 1
;;
esac
done
echo "[INFO] Building image $IMAGE_TAG"
docker build "${BUILD_ARGS[@]}" -t "$IMAGE_TAG" "$PROJECT_ROOT"
echo "[OK] Image $IMAGE_TAG built"

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
set -euo pipefail
usage() {
echo "Usage: $0 [--file <tar_path>]" >&2
}
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
DEFAULT_INPUT="$PROJECT_ROOT/images/argus-master-dev.tar"
IMAGE_TAR="$DEFAULT_INPUT"
while [[ "$#" -gt 0 ]]; do
case "$1" in
--file)
[[ $# -ge 2 ]] || { usage; exit 1; }
IMAGE_TAR="$2"
shift 2
;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown option: $1" >&2
usage
exit 1
;;
esac
done
if [[ ! -f "$IMAGE_TAR" ]]; then
echo "[ERROR] Image tarball not found: $IMAGE_TAR" >&2
exit 1
fi
echo "[INFO] Loading image from $IMAGE_TAR"
docker image load -i "$IMAGE_TAR"
echo "[OK] Image loaded"

View File

@ -0,0 +1,41 @@
#!/usr/bin/env bash
set -euo pipefail
usage() {
echo "Usage: $0 [--tag <image_tag>] [--output <tar_path>]" >&2
}
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
DEFAULT_OUTPUT="$PROJECT_ROOT/images/argus-master-dev.tar"
IMAGE_TAG="${IMAGE_TAG:-argus-master:dev}"
OUTPUT_PATH="$DEFAULT_OUTPUT"
while [[ "$#" -gt 0 ]]; do
case "$1" in
--tag)
[[ $# -ge 2 ]] || { usage; exit 1; }
IMAGE_TAG="$2"
shift 2
;;
--output)
[[ $# -ge 2 ]] || { usage; exit 1; }
OUTPUT_PATH="$2"
shift 2
;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown option: $1" >&2
usage
exit 1
;;
esac
done
mkdir -p "$(dirname "$OUTPUT_PATH")"
echo "[INFO] Saving image $IMAGE_TAG to $OUTPUT_PATH"
docker image save "$IMAGE_TAG" -o "$OUTPUT_PATH"
echo "[OK] Image saved"

2
src/master/tests/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
private/
tmp/

18
src/master/tests/docker-compose.yml Normal file
View File

@ -0,0 +1,18 @@
services:
master:
image: argus-master:dev
container_name: argus-master-e2e
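    # Deliberately short thresholds so the e2e scripts can observe online/offline transitions within seconds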
environment:
- OFFLINE_THRESHOLD_SECONDS=6
- ONLINE_THRESHOLD_SECONDS=2
- SCHEDULER_INTERVAL_SECONDS=1
ports:
- "31300:3000"
volumes:
- ./private/argus/master:/private/argus/master
- ./private/argus/metric/prometheus:/private/argus/metric/prometheus
restart: unless-stopped
networks:
default:
driver: bridge

25
src/master/tests/scripts/00_e2e_test.sh Normal file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SCRIPTS=(
"01_up_master.sh"
"02_verify_ready_and_nodes_json.sh"
"03_register_via_curl.sh"
"04_reregister_and_error_cases.sh"
"05_status_report_via_curl.sh"
"06_config_update_and_nodes_json.sh"
"07_stats_single_node.sh"
"08_multi_node_stats.sh"
"09_restart_persistence.sh"
"10_down.sh"
)
for script in "${SCRIPTS[@]}"; do
echo "[TEST] Running $script"
"$SCRIPT_DIR/$script"
echo "[TEST] $script completed"
echo
done
echo "[TEST] Master module E2E tests completed"

41
src/master/tests/scripts/01_up_master.sh Normal file
View File

@ -0,0 +1,41 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
MODULE_ROOT="$(cd "$TEST_ROOT/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"
mkdir -p "$PRIVATE_ROOT/argus/master"
mkdir -p "$PRIVATE_ROOT/argus/metric/prometheus"
mkdir -p "$TMP_ROOT"
# Make sure containers/data left over from a previous run are cleaned up
compose() {
if docker compose version >/dev/null 2>&1; then
docker compose "$@"
else
docker-compose "$@"
fi
}
pushd "$TEST_ROOT" >/dev/null
compose down --remove-orphans || true
popd >/dev/null
rm -rf "$TMP_ROOT" "$PRIVATE_ROOT"
mkdir -p "$PRIVATE_ROOT/argus/master"
mkdir -p "$PRIVATE_ROOT/argus/metric/prometheus"
mkdir -p "$TMP_ROOT"
pushd "$MODULE_ROOT" >/dev/null
./scripts/build_images.sh --tag argus-master:dev
popd >/dev/null
pushd "$TEST_ROOT" >/dev/null
compose down --remove-orphans || true
compose up -d
popd >/dev/null
echo "[INFO] Master container is up on http://localhost:31300"

51
src/master/tests/scripts/02_verify_ready_and_nodes_json.sh Normal file
View File

@ -0,0 +1,51 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
API_BASE="http://localhost:31300"
NODES_JSON_PATH="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json"
# Wait until readyz returns 200, which means database initialization has finished
for _ in {1..30}; do
status=$(curl -s -o /dev/null -w '%{http_code}' "$API_BASE/readyz" || true)
if [[ "$status" == "200" ]]; then
break
fi
sleep 1
done
if [[ "${status:-}" != "200" ]]; then
echo "[ERROR] /readyz 未在预期时间内返回 200实际=$status" >&2
exit 1
fi
echo "[INFO] /readyz 已通过,就绪检查成功"
# scheduler 启动时会产生空的 nodes.json这里等待文件出现并校验内容
for _ in {1..30}; do
if [[ -f "$NODES_JSON_PATH" ]]; then
break
fi
sleep 1
done
if [[ ! -f "$NODES_JSON_PATH" ]]; then
echo "[ERROR] 未在预期时间内生成 $NODES_JSON_PATH" >&2
exit 1
fi
if ! python3 - "$NODES_JSON_PATH" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
data = json.load(handle)
if data != []:
raise SystemExit(f"nodes.json initial content should be [], got {data}")
PY
then
echo "[ERROR] nodes.json 初始内容不是空数组" >&2
exit 1
fi
echo "[INFO] nodes.json 初始状态校验通过"

68
src/master/tests/scripts/03_register_via_curl.sh Normal file
View File

@ -0,0 +1,68 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"
mkdir -p "$TMP_ROOT"
for _ in {1..30}; do
    # healthz lives at the app root, not under /api/v1/master
    if curl -sf "$ROOT_BASE/healthz" >/dev/null; then
break
fi
sleep 1
done
payload=$(cat <<'JSON'
{
"name": "dev-testuser-testinst-pod-0",
"type": "agent",
"meta_data": {
"hostname": "dev-testuser-testinst-pod-0",
"ip": "10.0.0.10",
"env": "dev",
"user": "testuser",
"instance": "testinst",
"cpu_number": 4,
"memory_in_bytes": 2147483648,
"gpu_number": 0
},
"version": "1.1.0"
}
JSON
)
body_file="$TMP_ROOT/register_body.json"
status=$(curl -sS -o "$body_file" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$payload")
body="$(cat "$body_file")"
if [[ "$status" != "201" ]]; then
echo "[ERROR] Unexpected status code: $status" >&2
echo "$body" >&2
exit 1
fi
node_id=$(python3 - "$body_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
body = json.load(handle)
print(body["id"])
PY
)
echo "$body" > "$TMP_ROOT/last_response.json"
echo "$node_id" > "$TMP_ROOT/node_id"
list_file="$TMP_ROOT/nodes_list.json"
curl -sS "$API_BASE/nodes" -o "$list_file"
python3 - "$list_file" "$node_id" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
data = json.load(handle)
node_id = sys.argv[2]
assert any(item.get("id") == node_id for item in data), "node not in list"
PY
echo "[INFO] Registered node with id $node_id"

116
src/master/tests/scripts/04_reregister_and_error_cases.sh Normal file
View File

@ -0,0 +1,116 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"
NODE_ID="$(cat "$TMP_ROOT/node_id")"
# Re-register with the same id while changing some meta/version fields
payload=$(cat <<JSON
{
"id": "$NODE_ID",
"name": "dev-testuser-testinst-pod-0",
"type": "agent",
"meta_data": {
"hostname": "dev-testuser-testinst-pod-0",
"ip": "10.0.0.11",
"env": "dev",
"user": "testuser",
"instance": "testinst",
"cpu_number": 8,
"memory_in_bytes": 2147483648,
"gpu_number": 0
},
"version": "1.2.0"
}
JSON
)
status=$(curl -sS -o "$TMP_ROOT/reregister_response.json" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$payload")
if [[ "$status" != "200" ]]; then
echo "[ERROR] 重注册返回非 200: $status" >&2
cat "$TMP_ROOT/reregister_response.json" >&2
exit 1
fi
python3 - "$TMP_ROOT/reregister_response.json" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
assert node["meta_data"]["ip"] == "10.0.0.11", node["meta_data"]
assert node["meta_data"]["cpu_number"] == 8, node["meta_data"]
assert node["version"] == "1.2.0", node
PY
echo "[INFO] 重注册成功,元数据已更新"
# 未知 ID => 404
unknown_payload=$(cat <<'JSON'
{
"id": "A999",
"name": "dev-testuser-testinst-pod-0",
"type": "agent",
"meta_data": {
"hostname": "dev-testuser-testinst-pod-0",
"ip": "10.0.0.12",
"env": "dev",
"user": "testuser",
"instance": "testinst",
"cpu_number": 4,
"memory_in_bytes": 2147483648,
"gpu_number": 0
},
"version": "1.2.0"
}
JSON
)
status=$(curl -sS -o "$TMP_ROOT/unknown_id_response.json" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$unknown_payload")
if [[ "$status" != "404" ]]; then
echo "[ERROR] 未知 ID 应返回 404实际=$status" >&2
cat "$TMP_ROOT/unknown_id_response.json" >&2
exit 1
fi
echo "[INFO] 未知 ID 返回 404 验证通过"
# id 与 name 不匹配 => 500节点保持原名
mismatch_payload=$(cat <<JSON
{
"id": "$NODE_ID",
"name": "dev-testuser-testinst-pod-0-mismatch",
"type": "agent",
"meta_data": {
"hostname": "dev-testuser-testinst-pod-0-mismatch",
"ip": "10.0.0.13",
"env": "dev",
"user": "testuser",
"instance": "testinst",
"cpu_number": 4,
"memory_in_bytes": 2147483648,
"gpu_number": 0
},
"version": "1.2.0"
}
JSON
)
status=$(curl -sS -o "$TMP_ROOT/mismatch_response.json" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$mismatch_payload")
if [[ "$status" != "500" ]]; then
echo "[ERROR] 名称不匹配应返回 500实际=$status" >&2
cat "$TMP_ROOT/mismatch_response.json" >&2
exit 1
fi
# Verify the stored name is still correct
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$TMP_ROOT/post_mismatch_detail.json"
python3 - "$TMP_ROOT/post_mismatch_detail.json" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
assert node["name"] == "dev-testuser-testinst-pod-0", node["name"]
PY
echo "[INFO] 名称不匹配返回 500且原始节点未被篡改"

98
src/master/tests/scripts/05_status_report_via_curl.sh Normal file
View File

@ -0,0 +1,98 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"
node_id="$(cat "$TMP_ROOT/node_id")"
payload=$(python3 - <<'PY'
import json
from datetime import datetime, timezone
body = {
"timestamp": datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
"health": {
"log-fluentbit": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"},
"metric-node-exporter": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"}
}
}
print(json.dumps(body))
PY
)
response=$(curl -sS -w '\n%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$node_id/status" -d "$payload")
body="$(echo "$response" | head -n -1)"
status="$(echo "$response" | tail -n1)"
if [[ "$status" != "200" ]]; then
echo "[ERROR] Status update failed with code $status" >&2
echo "$body" >&2
exit 1
fi
echo "$body" > "$TMP_ROOT/last_response.json"
sleep 3
detail_file="$TMP_ROOT/status_detail.json"
curl -sS "$API_BASE/nodes/$node_id" -o "$detail_file"
python3 - "$detail_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
assert node["status"] == "online", f"Expected online, got {node['status']}"
assert "log-fluentbit" in node["health"], node["health"].keys()
PY
echo "[INFO] Status report successful and node is online"
# Wait longer than the offline threshold and verify the node flips to offline automatically
sleep 7
offline_detail_file="$TMP_ROOT/status_offline.json"
curl -sS "$API_BASE/nodes/$node_id" -o "$offline_detail_file"
python3 - "$offline_detail_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
assert node["status"] == "offline", f"Expected offline, got {node['status']}"
PY
echo "[INFO] Node transitioned to offline as expected"
# Report health again to bring the node back online
payload=$(python3 - <<'PY'
import json
from datetime import datetime, timezone
body = {
"timestamp": datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
"health": {
"log-fluentbit": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"},
"metric-node-exporter": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"}
}
}
print(json.dumps(body))
PY
)
curl -sS -o "$TMP_ROOT/second_status_response.json" -w '%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$node_id/status" -d "$payload" > "$TMP_ROOT/second_status_code"
if [[ $(cat "$TMP_ROOT/second_status_code") != "200" ]]; then
echo "[ERROR] Second status update failed" >&2
cat "$TMP_ROOT/second_status_response.json" >&2
exit 1
fi
sleep 3
final_detail_file="$TMP_ROOT/status_back_online.json"
curl -sS "$API_BASE/nodes/$node_id" -o "$final_detail_file"
python3 - "$final_detail_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
assert node["status"] == "online", f"Expected online after second report, got {node['status']}"
PY
echo "[INFO] Node transitioned back to online after new status report"

56
src/master/tests/scripts/06_config_update_and_nodes_json.sh Normal file
View File

@ -0,0 +1,56 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
PRIVATE_ROOT="$TEST_ROOT/private"
API_BASE="http://localhost:31300/api/v1/master"
NODE_ID="$(cat "$TMP_ROOT/node_id")"
payload='{"config":{"log_level":"debug"},"label":["gpu","exp001"]}'
response=$(curl -sS -w '\n%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$NODE_ID/config" -d "$payload")
body="$(echo "$response" | head -n -1)"
status="$(echo "$response" | tail -n1)"
if [[ "$status" != "200" ]]; then
echo "[ERROR] Config update failed: $status" >&2
echo "$body" >&2
exit 1
fi
sleep 2
nodes_json_path="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json"
if [[ ! -f "$nodes_json_path" ]]; then
echo "[ERROR] nodes.json not generated at $nodes_json_path" >&2
exit 1
fi
# Make sure the node is online so nodes.json is not empty merely because of the wait
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$TMP_ROOT/config_detail.json"
if ! python3 - "$TMP_ROOT/config_detail.json" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
if node["status"] != "online":
raise SystemExit(1)
PY
then
payload='{"timestamp":"2025-09-24T00:00:00Z","health":{"log-fluentbit":{"status":"healthy"}}}'
curl -sS -o "$TMP_ROOT/config_second_report.json" -w '%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$NODE_ID/status" -d "$payload" > "$TMP_ROOT/config_second_code"
sleep 2
fi
python3 - "$nodes_json_path" <<'PY'
import json, sys
from pathlib import Path
path = Path(sys.argv[1])
content = json.loads(path.read_text())
assert isinstance(content, list) and len(content) == 1
entry = content[0]
assert entry["labels"] == ["gpu", "exp001"], entry
PY
echo "[INFO] Config updated and nodes.json verified"

41
src/master/tests/scripts/07_stats_single_node.sh Normal file
View File

@ -0,0 +1,41 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"
NODE_ID="$(cat "$TMP_ROOT/node_id")"
sleep 7
detail_file="$TMP_ROOT/offline_detail.json"
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$detail_file"
python3 - "$detail_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
assert node["status"] == "offline", f"Expected offline, got {node['status']}"
PY
stats_file="$TMP_ROOT/stats.json"
curl -sS "$API_BASE/nodes/statistics" -o "$stats_file"
python3 - "$stats_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
stats = json.load(handle)
assert stats["total"] == 1
found = {item["status"]: item["count"] for item in stats["status_statistics"]}
assert found.get("offline") == 1
PY
nodes_json_path="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json"
python3 - "$nodes_json_path" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
content = json.load(handle)
assert content == [], content
PY
echo "[INFO] Offline transition and statistics validated"

106
src/master/tests/scripts/08_multi_node_stats.sh Normal file
View File

@ -0,0 +1,106 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"
# Register a second node (A2) and keep it online
second_payload=$(cat <<'JSON'
{
"name": "dev-testuser-testinst-pod-1",
"type": "agent",
"meta_data": {
"hostname": "dev-testuser-testinst-pod-1",
"ip": "10.0.0.11",
"env": "dev",
"user": "testuser",
"instance": "testinst",
"cpu_number": 8,
"memory_in_bytes": 2147483648,
"gpu_number": 0
},
"version": "1.1.0"
}
JSON
)
status=$(curl -sS -o "$TMP_ROOT/second_register.json" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$second_payload")
if [[ "$status" != "201" ]]; then
echo "[ERROR] Second node registration failed: $status" >&2
cat "$TMP_ROOT/second_register.json" >&2
exit 1
fi
SECOND_NODE_ID=$(python3 - "$TMP_ROOT/second_register.json" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
data = json.load(handle)
print(data["id"])
PY
)
echo "$SECOND_NODE_ID" > "$TMP_ROOT/second_node_id"
echo "[INFO] Second node registered with id $SECOND_NODE_ID"
# A2 reports health so it stays online
status_payload='{"timestamp":"2025-09-24T00:00:00Z","health":{"log-fluentbit":{"status":"healthy"}}}'
status=$(curl -sS -o "$TMP_ROOT/second_status.json" -w '%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$SECOND_NODE_ID/status" -d "$status_payload")
if [[ "$status" != "200" ]]; then
echo "[ERROR] Second node status update failed: $status" >&2
cat "$TMP_ROOT/second_status.json" >&2
exit 1
fi
# Wait for the scheduler to mark the second node online
second_online=false
for _ in {1..10}; do
sleep 1
curl -sS "$API_BASE/nodes/$SECOND_NODE_ID" -o "$TMP_ROOT/second_detail.json" || continue
if python3 - "$TMP_ROOT/second_detail.json" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
if node["status"] != "online":
raise SystemExit(1)
PY
then
second_online=true
break
fi
done
if [[ "$second_online" != true ]]; then
echo "[ERROR] Second node did not become online" >&2
exit 1
fi
# Fetch statistics again
stats_file="$TMP_ROOT/multi_stats.json"
curl -sS "$API_BASE/nodes/statistics" -o "$stats_file"
python3 - "$stats_file" "$TMP_ROOT/node_id" "$TMP_ROOT/second_node_id" <<'PY'
import json, sys, pathlib
with open(sys.argv[1]) as handle:
stats = json.load(handle)
first_id = pathlib.Path(sys.argv[2]).read_text().strip()
second_id = pathlib.Path(sys.argv[3]).read_text().strip()
assert stats["total"] == 2, stats
found = {item["status"]: item["count"] for item in stats["status_statistics"]}
assert found.get("offline") == 1, found
assert found.get("online") == 1, found
PY
# Verify nodes.json only contains online nodes (only the second node, A2)
nodes_json_path="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json"
python3 - "$nodes_json_path" "$SECOND_NODE_ID" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
content = json.load(handle)
expected_id = sys.argv[2]
assert len(content) == 1, content
assert content[0]["node_id"] == expected_id, content
PY
echo "[INFO] Multi-node statistics and nodes.json validated"

161
src/master/tests/scripts/09_restart_persistence.sh Normal file
View File

@ -0,0 +1,161 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"
ROOT_BASE="http://localhost:31300"
DB_PATH="$PRIVATE_ROOT/argus/master/db.sqlite3"
compose() {
if docker compose version >/dev/null 2>&1; then
docker compose "$@"
else
docker-compose "$@"
fi
}
if [[ ! -f "$TMP_ROOT/node_id" ]]; then
echo "[ERROR] 主节点 ID 缺失,请先执行前置用例" >&2
exit 1
fi
if [[ ! -f "$TMP_ROOT/second_node_id" ]]; then
echo "[ERROR] 第二个节点 ID 缺失,请先执行多节点场景脚本" >&2
exit 1
fi
if [[ ! -f "$DB_PATH" ]]; then
echo "[ERROR] 持久化数据库缺失: $DB_PATH" >&2
exit 1
fi
NODE_ID="$(cat "$TMP_ROOT/node_id")"
SECOND_NODE_ID="$(cat "$TMP_ROOT/second_node_id")"
# Capture node details, nodes.json, and statistics before the restart as a comparison baseline
first_before="$TMP_ROOT/${NODE_ID}_pre_restart.json"
second_before="$TMP_ROOT/${SECOND_NODE_ID}_pre_restart.json"
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$first_before"
curl -sS "$API_BASE/nodes/$SECOND_NODE_ID" -o "$second_before"
nodes_json_before="$TMP_ROOT/nodes_json_pre_restart.json"
cp "$PRIVATE_ROOT/argus/metric/prometheus/nodes.json" "$nodes_json_before"
stats_before="$TMP_ROOT/stats_pre_restart.json"
curl -sS "$API_BASE/nodes/statistics" -o "$stats_before"
# Restart the master container to simulate persistence across a service restart
pushd "$TEST_ROOT" >/dev/null
compose restart master
popd >/dev/null
# Wait for /readyz to return 200 again
for _ in {1..30}; do
status=$(curl -s -o /dev/null -w '%{http_code}' "$ROOT_BASE/readyz" || true)
if [[ "$status" == "200" ]]; then
break
fi
sleep 1
done
if [[ "${status:-}" != "200" ]]; then
echo "[ERROR] master 容器重启后未恢复健康状态readyz=$status" >&2
exit 1
fi
sleep 2
first_after="$TMP_ROOT/${NODE_ID}_post_restart.json"
second_after="$TMP_ROOT/${SECOND_NODE_ID}_post_restart.json"
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$first_after"
curl -sS "$API_BASE/nodes/$SECOND_NODE_ID" -o "$second_after"
# Compare key node fields before and after the restart to make sure nothing was lost
python3 - "$first_before" "$first_after" <<'PY'
import json, sys
before_path, after_path = sys.argv[1:3]
with open(before_path, 'r', encoding='utf-8') as handle:
before = json.load(handle)
with open(after_path, 'r', encoding='utf-8') as handle:
after = json.load(handle)
keys = [
"id",
"name",
"type",
"version",
"register_time",
"meta_data",
"config",
"label",
"health",
"last_report",
"agent_last_report",
"status",
]
for key in keys:
if before.get(key) != after.get(key):
raise AssertionError(f"Key {key} changed after restart: {before.get(key)} -> {after.get(key)}")
PY
python3 - "$second_before" "$second_after" <<'PY'
import json, sys
before_path, after_path = sys.argv[1:3]
with open(before_path, 'r', encoding='utf-8') as handle:
before = json.load(handle)
with open(after_path, 'r', encoding='utf-8') as handle:
after = json.load(handle)
keys = [
"id",
"name",
"type",
"version",
"register_time",
"meta_data",
"config",
"label",
"health",
"last_report",
"agent_last_report",
"status",
]
for key in keys:
if before.get(key) != after.get(key):
raise AssertionError(f"Key {key} changed after restart: {before.get(key)} -> {after.get(key)}")
PY
# Compare nodes.json and statistics before and after the restart to verify persistence consistency
nodes_json_after="$TMP_ROOT/nodes_json_post_restart.json"
cp "$PRIVATE_ROOT/argus/metric/prometheus/nodes.json" "$nodes_json_after"
stats_after="$TMP_ROOT/stats_after_restart.json"
curl -sS "$API_BASE/nodes/statistics" -o "$stats_after"
python3 - "$nodes_json_before" "$nodes_json_after" <<'PY'
import json, sys
with open(sys.argv[1], 'r', encoding='utf-8') as handle:
before = json.load(handle)
with open(sys.argv[2], 'r', encoding='utf-8') as handle:
after = json.load(handle)
if before != after:
raise AssertionError(f"nodes.json changed after restart: {before} -> {after}")
PY
python3 - "$stats_before" "$stats_after" <<'PY'
import json, sys
with open(sys.argv[1], 'r', encoding='utf-8') as handle:
before = json.load(handle)
with open(sys.argv[2], 'r', encoding='utf-8') as handle:
after = json.load(handle)
if before != after:
raise AssertionError(f"Statistics changed after restart: {before} -> {after}")
PY
if [[ ! -s "$DB_PATH" ]]; then
echo "[ERROR] 数据库文件为空,疑似未持久化" >&2
exit 1
fi
echo "[INFO] Master 重启后持久化数据校验通过"

24
src/master/tests/scripts/10_down.sh Normal file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"
compose() {
if docker compose version >/dev/null 2>&1; then
docker compose "$@"
else
docker-compose "$@"
fi
}
pushd "$TEST_ROOT" >/dev/null
compose down --remove-orphans
popd >/dev/null
rm -rf "$TMP_ROOT"
rm -rf "$PRIVATE_ROOT"
echo "[INFO] Master E2E environment cleaned up"