Compare commits

2 commits: f1f4ba6bb6 ... bc016826f1

Commits: bc016826f1, 5e5342ce20
src/.gitignore (vendored, new file, 2 lines)
@@ -0,0 +1,2 @@

__pycache__/
src/agent/Dockerfile (new file, 23 lines)
@@ -0,0 +1,23 @@
FROM python:3.11-slim

SHELL ["/bin/bash", "-c"]

ARG PIP_INDEX_URL=
ENV PIP_NO_CACHE_DIR=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=/app

WORKDIR /app

COPY requirements.txt ./
RUN set -euxo pipefail \
    && python -m pip install --upgrade pip \
    && if [[ -n "$PIP_INDEX_URL" ]]; then \
        PIP_INDEX_URL="$PIP_INDEX_URL" python -m pip install -r requirements.txt; \
    else \
        python -m pip install -r requirements.txt; \
    fi

COPY app ./app

CMD ["python", "-m", "app.main"]
src/agent/README.md (new file, 31 lines)
@@ -0,0 +1,31 @@
# Argus Agent Module

A Python agent that registers with the Argus master service, persists node information, gathers host metadata, reads module health files, and periodically reports status.

## Build & Run

```bash
cd src/agent
./scripts/build_images.sh   # builds argus-agent:dev
```

At runtime the agent expects a configuration file (generated by the installer) at `/private/argus/agent/<hostname>/config`. Key fields (a sample file is sketched after this list):

- `HOSTNAME`, `NODE_FILE`, `VERSION`
- `MASTER_ENDPOINT` (e.g. `http://master:3000`)
- `REPORT_INTERVAL_SECONDS`
- `SUBMODULE_HEALTH_FILE_DIR` (supports a `{hostname}` placeholder)
- optional `GPU_NUMBER`
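A minimal sketch of that config file, mirroring the one the E2E bootstrap script (`tests/scripts/01_bootstrap.sh`) generates; the hostname and values are illustrative:

```ini
HOSTNAME=dev-e2euser-e2einst-pod-0
NODE_FILE=/private/argus/agent/dev-e2euser-e2einst-pod-0/node.json
VERSION=1.1.0
MASTER_ENDPOINT=http://master:3000
REPORT_INTERVAL_SECONDS=60
SUBMODULE_HEALTH_FILE_DIR=/private/argus/agent/health/{hostname}/
GPU_NUMBER=
```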
Health files live under `/private/argus/agent/health/<hostname>/` and must follow the `<prefix>-*.json` naming convention (e.g. `log-fluentbit.json`). The agent sends the parsed JSON objects keyed by file stem, as in the sample below.
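For example, the E2E test script `tests/scripts/04_write_health_files.sh` writes health files of this shape:

```json
{
  "status": "healthy",
  "timestamp": "2023-10-05T12:05:00Z"
}
```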
## Tests

Docker-based E2E stack (master + agent):

```bash
cd src/agent/tests
./scripts/00_e2e_test.sh
```

The scripts provision config and health directories under `tests/private/` and clean up via `07_down.sh`.
src/agent/app/__init__.py (new file, 0 lines)
src/agent/app/client.py (new file, 60 lines)
@@ -0,0 +1,60 @@
from __future__ import annotations

import json
from typing import Any, Dict, Optional

import requests

from .log import get_logger

LOGGER = get_logger("argus.agent.client")


class MasterAPIError(Exception):
    def __init__(self, message: str, status_code: int, payload: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.status_code = status_code
        self.payload = payload or {}


class AgentClient:
    def __init__(self, base_url: str, *, timeout: int = 10) -> None:
        self._base_url = base_url.rstrip("/")
        self._timeout = timeout
        self._session = requests.Session()

    def register_node(self, body: Dict[str, Any]) -> Dict[str, Any]:
        """Call the master registration endpoint and return the node object."""
        url = f"{self._base_url}/api/v1/master/nodes"
        response = self._session.post(url, json=body, timeout=self._timeout)
        return self._parse_response(response, "Failed to register node")

    def update_status(self, node_id: str, body: Dict[str, Any]) -> Dict[str, Any]:
        """Report health information; the master updates last_report."""
        url = f"{self._base_url}/api/v1/master/nodes/{node_id}/status"
        response = self._session.put(url, json=body, timeout=self._timeout)
        return self._parse_response(response, "Failed to update node status")

    def _parse_response(self, response: requests.Response, error_prefix: str) -> Dict[str, Any]:
        content_type = response.headers.get("Content-Type", "")
        payload: Dict[str, Any] | None = None
        if "application/json" in content_type:
            try:
                payload = response.json()
            except json.JSONDecodeError:
                LOGGER.warning("Response contained invalid JSON", extra={"status": response.status_code})

        if response.status_code >= 400:
            message = payload.get("error") if isinstance(payload, dict) else response.text
            raise MasterAPIError(
                f"{error_prefix}: {message}",
                status_code=response.status_code,
                payload=payload if isinstance(payload, dict) else None,
            )

        if payload is None:
            try:
                payload = response.json()
            except json.JSONDecodeError as exc:
                raise MasterAPIError("Master returned non-JSON payload", response.status_code) from exc
        return payload
src/agent/app/collector.py (new file, 114 lines)
@@ -0,0 +1,114 @@
from __future__ import annotations

import os
import re
import socket
import subprocess
from pathlib import Path
from typing import Any, Dict

from .config import AgentConfig
from .log import get_logger

LOGGER = get_logger("argus.agent.collector")

_HOSTNAME_PATTERN = re.compile(r"^([^-]+)-([^-]+)-([^-]+)-.*$")


def collect_metadata(config: AgentConfig) -> Dict[str, Any]:
    """Aggregate the static information needed for node registration."""
    hostname = config.hostname
    env, user, instance = _parse_hostname(hostname)
    meta = {
        "hostname": hostname,
        "ip": _detect_ip_address(),
        "env": env,
        "user": user,
        "instance": instance,
        "cpu_number": _detect_cpu_count(),
        "memory_in_bytes": _detect_memory_bytes(),
        "gpu_number": _detect_gpu_count(config),
    }
    return meta


def _parse_hostname(hostname: str) -> tuple[str, str, str]:
    """Split the hostname along the agreed env-user-instance prefix."""
    match = _HOSTNAME_PATTERN.match(hostname)
    if not match:
        LOGGER.warning("Hostname does not match expected pattern", extra={"hostname": hostname})
        return "", "", ""
    return match.group(1), match.group(2), match.group(3)


def _detect_cpu_count() -> int:
    count = os.cpu_count()
    return count if count is not None else 0


def _detect_memory_bytes() -> int:
    """Prefer the cgroup limit; fall back to /proc/meminfo on failure."""
    cgroup_path = Path("/sys/fs/cgroup/memory.max")
    try:
        raw = cgroup_path.read_text(encoding="utf-8").strip()
        if raw and raw != "max":
            return int(raw)
    except FileNotFoundError:
        LOGGER.debug("cgroup memory.max not found, falling back to /proc/meminfo")
    except ValueError:
        LOGGER.warning("Failed to parse memory.max, falling back", extra={"value": raw})

    try:
        with open("/proc/meminfo", "r", encoding="utf-8") as handle:
            for line in handle:
                if line.startswith("MemTotal:"):
                    parts = line.split()
                    if len(parts) >= 2:
                        return int(parts[1]) * 1024
    except FileNotFoundError:
        LOGGER.error("/proc/meminfo not found; defaulting memory to 0")
    return 0


def _detect_gpu_count(config: AgentConfig) -> int:
    """Detect the GPU count; a configured override takes precedence."""
    if config.gpu_number_override is not None:
        return config.gpu_number_override

    try:
        proc = subprocess.run(
            ["nvidia-smi", "-L"],
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=5,
        )
    except FileNotFoundError:
        LOGGER.debug("nvidia-smi not available; assuming 0 GPUs")
        return 0
    except subprocess.SubprocessError as exc:
        LOGGER.warning("nvidia-smi invocation failed", extra={"error": str(exc)})
        return 0

    if proc.returncode != 0:
        LOGGER.debug("nvidia-smi returned non-zero", extra={"stderr": proc.stderr.strip()})
        return 0

    count = sum(1 for line in proc.stdout.splitlines() if line.strip())
    return count


def _detect_ip_address() -> str:
    """Try to obtain the container's egress IP via a UDP socket; fall back to resolving the hostname."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.connect(("8.8.8.8", 80))
            return sock.getsockname()[0]
    except OSError:
        LOGGER.debug("UDP socket trick failed; falling back to hostname lookup")
    try:
        return socket.gethostbyname(socket.gethostname())
    except OSError:
        LOGGER.warning("Unable to resolve hostname to IP; defaulting to 127.0.0.1")
        return "127.0.0.1"
src/agent/app/config.py (new file, 101 lines)
@@ -0,0 +1,101 @@
from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class AgentConfig:
    hostname: str
    node_file: str
    version: str
    master_endpoint: str
    report_interval_seconds: int
    health_dir_template: str
    gpu_number_override: int | None
    request_timeout_seconds: int = 10

    @property
    def health_dir(self) -> str:
        return self.health_dir_template.format(hostname=self.hostname)


def _parse_config_file(path: str) -> dict[str, str]:
    result: dict[str, str] = {}
    try:
        with open(path, "r", encoding="utf-8") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue
                if "=" not in line:
                    continue
                key, value = line.split("=", 1)
                result[key.strip().upper()] = value.strip()
    except FileNotFoundError:
        raise FileNotFoundError(f"Agent config file not found: {path}") from None
    return result


def load_config(path: str) -> AgentConfig:
    """Read the config file, merge in environment variables, and return an AgentConfig."""
    config_values = _parse_config_file(path)
    force_env = os.environ.get("AGENT_FORCE_ENV", "0").lower() in {"1", "true", "yes"}

    def read_key(key: str, default: str | None = None, *, required: bool = False) -> str:
        env_key = f"AGENT_{key}"
        if env_key in os.environ:
            return os.environ[env_key]
        if force_env and key in os.environ:
            return os.environ[key]
        if key in config_values:
            return config_values[key]
        if default is not None:
            return default
        if required:
            raise ValueError(f"Missing required configuration key: {key}")
        return ""

    hostname = read_key("HOSTNAME", required=True)
    node_file = read_key("NODE_FILE", f"/private/argus/agent/{hostname}/node.json")
    version = read_key("VERSION", "1.0.0")
    master_endpoint = read_key("MASTER_ENDPOINT", required=True)
    report_interval_raw = read_key("REPORT_INTERVAL_SECONDS", "60")
    health_dir_template = read_key(
        "SUBMODULE_HEALTH_FILE_DIR",
        "/private/argus/agent/health/{hostname}/",
    )
    gpu_override_raw = read_key("GPU_NUMBER", "")

    try:
        report_interval_seconds = int(report_interval_raw)
    except ValueError as exc:
        raise ValueError("REPORT_INTERVAL_SECONDS must be an integer") from exc
    if report_interval_seconds <= 0:
        raise ValueError("REPORT_INTERVAL_SECONDS must be positive")

    gpu_override = None
    if gpu_override_raw:
        try:
            gpu_override = int(gpu_override_raw)
        except ValueError as exc:
            raise ValueError("GPU_NUMBER must be an integer when provided") from exc
        if gpu_override < 0:
            raise ValueError("GPU_NUMBER must be non-negative")

    if not master_endpoint.startswith("http://") and not master_endpoint.startswith("https://"):
        master_endpoint = f"http://{master_endpoint}"

    Path(node_file).parent.mkdir(parents=True, exist_ok=True)
    Path(health_dir_template.format(hostname=hostname)).mkdir(parents=True, exist_ok=True)

    return AgentConfig(
        hostname=hostname,
        node_file=node_file,
        version=version,
        master_endpoint=master_endpoint.rstrip("/"),
        report_interval_seconds=report_interval_seconds,
        health_dir_template=health_dir_template,
        gpu_number_override=gpu_override,
    )
src/agent/app/health_reader.py (new file, 32 lines)
@@ -0,0 +1,32 @@
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Dict

from .log import get_logger

LOGGER = get_logger("argus.agent.health")


def read_health_directory(path: str) -> Dict[str, Any]:
    """Read every <prefix>-*.json file in the directory and return a mapping of parsed JSON."""
    result: Dict[str, Any] = {}
    directory = Path(path)
    if not directory.exists():
        LOGGER.debug("Health directory does not exist", extra={"path": str(directory)})
        return result

    for health_file in sorted(directory.glob("*.json")):
        if "-" not in health_file.stem:
            LOGGER.debug("Skipping non-prefixed health file", extra={"file": health_file.name})
            continue
        try:
            with health_file.open("r", encoding="utf-8") as handle:
                content = json.load(handle)
            result[health_file.stem] = content
        except json.JSONDecodeError as exc:
            LOGGER.warning("Failed to parse health file", extra={"file": health_file.name, "error": str(exc)})
        except OSError as exc:
            LOGGER.warning("Failed to read health file", extra={"file": health_file.name, "error": str(exc)})
    return result
src/agent/app/log.py (new file, 18 lines)
@@ -0,0 +1,18 @@
from __future__ import annotations

import logging
import os

_LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s - %(message)s"


def setup_logging() -> None:
    level_name = os.environ.get("AGENT_LOG_LEVEL", "INFO").upper()
    level = getattr(logging, level_name, logging.INFO)
    logging.basicConfig(level=level, format=_LOG_FORMAT)


def get_logger(name: str) -> logging.Logger:
    setup_logging()
    return logging.getLogger(name)
src/agent/app/main.py (new file, 185 lines)
@@ -0,0 +1,185 @@
from __future__ import annotations

import argparse
import signal
import sys
import time
from datetime import datetime, timezone
from typing import Optional

from .client import AgentClient, MasterAPIError
from .collector import collect_metadata
from .config import AgentConfig, load_config
from .health_reader import read_health_directory
from .log import get_logger, setup_logging
from .state import clear_node_state, load_node_state, save_node_state

LOGGER = get_logger("argus.agent")


def _current_timestamp() -> str:
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


class StopSignal:
    def __init__(self) -> None:
        self._stop = False

    def set(self, *_args) -> None:
        self._stop = True

    def is_set(self) -> bool:
        return self._stop


def parse_args(argv: list[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Argus agent")
    parser.add_argument(
        "--config",
        dest="config_path",
        default=None,
        help="Path to agent config file",
    )
    return parser.parse_args(argv)


def main(argv: Optional[list[str]] = None) -> int:
    setup_logging()
    args = parse_args(argv or sys.argv[1:])

    stop_signal = StopSignal()
    signal.signal(signal.SIGTERM, stop_signal.set)
    signal.signal(signal.SIGINT, stop_signal.set)

    try:
        config_path = args.config_path or _default_config_path()
        config = load_config(config_path)
    except Exception as exc:
        LOGGER.error("Failed to load configuration", extra={"error": str(exc)})
        return 1

    LOGGER.info(
        "Agent starting",
        extra={
            "hostname": config.hostname,
            "master_endpoint": config.master_endpoint,
            "node_file": config.node_file,
        },
    )

    client = AgentClient(config.master_endpoint, timeout=config.request_timeout_seconds)

    node_state = load_node_state(config.node_file) or {}
    node_id = node_state.get("id")

    # Establish the registration with the master (re-registration is supported); retry on failure
    register_response = _register_with_retry(client, config, node_id, stop_signal)
    if register_response is None:
        LOGGER.info("Registration aborted due to shutdown signal")
        return 0

    node_id = register_response.get("id")
    if not node_id:
        LOGGER.error("Master did not return node id; aborting")
        return 1
    save_node_state(config.node_file, register_response)

    LOGGER.info("Entering status report loop", extra={"node_id": node_id})
    _status_loop(client, config, node_id, stop_signal)
    return 0


def _default_config_path() -> str:
    from socket import gethostname

    hostname = gethostname()
    return f"/private/argus/agent/{hostname}/config"


def _register_with_retry(
    client: AgentClient,
    config: AgentConfig,
    node_id: Optional[str],
    stop_signal: StopSignal,
):
    backoff = 5
    while not stop_signal.is_set():
        payload = {
            "name": config.hostname,
            "type": "agent",
            "meta_data": collect_metadata(config),
            "version": config.version,
        }
        if node_id:
            payload["id"] = node_id

        try:
            response = client.register_node(payload)
            LOGGER.info("Registration successful", extra={"node_id": response.get("id")})
            save_node_state(config.node_file, response)
            return response
        except MasterAPIError as exc:
            if exc.status_code == 404 and node_id:
                LOGGER.warning(
                    "Master does not recognise node id; clearing local node state",
                    extra={"node_id": node_id},
                )
                clear_node_state(config.node_file)
                node_id = None
            elif exc.status_code == 500 and node_id:
                # An id/name mismatch usually indicates a configuration problem; log it and keep retrying
                LOGGER.error(
                    "Master rejected node due to id/name mismatch; will retry",
                    extra={"node_id": node_id},
                )
            else:
                LOGGER.error("Registration failed", extra={"status_code": exc.status_code, "error": str(exc)})
            time.sleep(min(backoff, 60))
            backoff = min(backoff * 2, 60)
        except Exception as exc:  # pragma: no cover - defensive
            LOGGER.exception("Unexpected error during registration", extra={"error": str(exc)})
            time.sleep(min(backoff, 60))
            backoff = min(backoff * 2, 60)
    return None


def _status_loop(
    client: AgentClient,
    config: AgentConfig,
    node_id: str,
    stop_signal: StopSignal,
) -> None:
    interval = config.report_interval_seconds
    while not stop_signal.is_set():
        timestamp = _current_timestamp()
        health_payload = read_health_directory(config.health_dir)
        body = {
            "timestamp": timestamp,
            "health": health_payload,
        }
        try:
            response = client.update_status(node_id, body)
            LOGGER.info(
                "Status report succeeded",
                extra={"node_id": node_id, "health_keys": list(health_payload.keys())},
            )
            save_node_state(config.node_file, response)
        except MasterAPIError as exc:
            # Keep the loop running and wait for the next attempt
            LOGGER.error(
                "Failed to report status",
                extra={"status_code": exc.status_code, "error": str(exc)},
            )
        except Exception as exc:  # pragma: no cover - defensive
            LOGGER.exception("Unexpected error during status report", extra={"error": str(exc)})

        for _ in range(interval):
            if stop_signal.is_set():
                break
            time.sleep(1)

    LOGGER.info("Stop signal received; exiting status loop")


if __name__ == "__main__":
    sys.exit(main())
src/agent/app/state.py (new file, 44 lines)
@@ -0,0 +1,44 @@
from __future__ import annotations

import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional

from .log import get_logger

LOGGER = get_logger("argus.agent.state")


def load_node_state(path: str) -> Optional[Dict[str, Any]]:
    """Read the local node.json so a restarted container reuses its previous ID."""
    try:
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except FileNotFoundError:
        return None
    except json.JSONDecodeError as exc:
        LOGGER.warning("node.json is invalid JSON; ignoring", extra={"error": str(exc)})
        return None


def save_node_state(path: str, data: Dict[str, Any]) -> None:
    """Write node.json atomically so concurrent readers never see partial data."""
    directory = Path(path).parent
    directory.mkdir(parents=True, exist_ok=True)
    with tempfile.NamedTemporaryFile("w", dir=directory, delete=False, encoding="utf-8") as tmp:
        json.dump(data, tmp, separators=(",", ":"))
        tmp.flush()
        os.fsync(tmp.fileno())
        temp_path = tmp.name
    os.replace(temp_path, path)


def clear_node_state(path: str) -> None:
    try:
        os.remove(path)
    except FileNotFoundError:
        return
    except OSError as exc:
        LOGGER.warning("Failed to remove node state file", extra={"error": str(exc), "path": path})
src/agent/images/.gitkeep (new file, 0 lines)
src/agent/requirements.txt (new file, 1 line)
@@ -0,0 +1 @@
requests==2.31.0
src/agent/scripts/build_images.sh (new executable file, 39 lines)
@@ -0,0 +1,39 @@
#!/usr/bin/env bash
set -euo pipefail

usage() {
  echo "Usage: $0 [--intranet] [--tag <image_tag>]" >&2
}

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
IMAGE_TAG="${IMAGE_TAG:-argus-agent:dev}"
BUILD_ARGS=()

while [[ "$#" -gt 0 ]]; do
  case "$1" in
    --intranet)
      INTRANET_INDEX="${INTRANET_INDEX:-https://pypi.tuna.tsinghua.edu.cn/simple}"
      BUILD_ARGS+=("--build-arg" "PIP_INDEX_URL=${INTRANET_INDEX}")
      shift
      ;;
    --tag)
      [[ $# -ge 2 ]] || { usage; exit 1; }
      IMAGE_TAG="$2"
      shift 2
      ;;
    -h|--help)
      usage
      exit 0
      ;;
    *)
      echo "Unknown option: $1" >&2
      usage
      exit 1
      ;;
  esac
done

echo "[INFO] Building image $IMAGE_TAG"
docker build "${BUILD_ARGS[@]}" -t "$IMAGE_TAG" "$PROJECT_ROOT"
echo "[OK] Image $IMAGE_TAG built"
src/agent/scripts/load_images.sh (new executable file, 39 lines)
@@ -0,0 +1,39 @@
#!/usr/bin/env bash
set -euo pipefail

usage() {
  echo "Usage: $0 [--file <tar_path>]" >&2
}

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
DEFAULT_INPUT="$PROJECT_ROOT/images/argus-agent-dev.tar"
IMAGE_TAR="$DEFAULT_INPUT"

while [[ "$#" -gt 0 ]]; do
  case "$1" in
    --file)
      [[ $# -ge 2 ]] || { usage; exit 1; }
      IMAGE_TAR="$2"
      shift 2
      ;;
    -h|--help)
      usage
      exit 0
      ;;
    *)
      echo "Unknown option: $1" >&2
      usage
      exit 1
      ;;
  esac
done

if [[ ! -f "$IMAGE_TAR" ]]; then
  echo "[ERROR] Image tarball not found: $IMAGE_TAR" >&2
  exit 1
fi

echo "[INFO] Loading image from $IMAGE_TAR"
docker image load -i "$IMAGE_TAR"
echo "[OK] Image loaded"
src/agent/scripts/save_images.sh (new executable file, 41 lines)
@@ -0,0 +1,41 @@
#!/usr/bin/env bash
set -euo pipefail

usage() {
  echo "Usage: $0 [--tag <image_tag>] [--output <tar_path>]" >&2
}

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
DEFAULT_OUTPUT="$PROJECT_ROOT/images/argus-agent-dev.tar"
IMAGE_TAG="${IMAGE_TAG:-argus-agent:dev}"
OUTPUT_PATH="$DEFAULT_OUTPUT"

while [[ "$#" -gt 0 ]]; do
  case "$1" in
    --tag)
      [[ $# -ge 2 ]] || { usage; exit 1; }
      IMAGE_TAG="$2"
      shift 2
      ;;
    --output)
      [[ $# -ge 2 ]] || { usage; exit 1; }
      OUTPUT_PATH="$2"
      shift 2
      ;;
    -h|--help)
      usage
      exit 0
      ;;
    *)
      echo "Unknown option: $1" >&2
      usage
      exit 1
      ;;
  esac
done

mkdir -p "$(dirname "$OUTPUT_PATH")"
echo "[INFO] Saving image $IMAGE_TAG to $OUTPUT_PATH"
docker image save "$IMAGE_TAG" -o "$OUTPUT_PATH"
echo "[OK] Image saved"
src/agent/tests/.gitignore (vendored, new file, 2 lines)
@@ -0,0 +1,2 @@
private/
tmp/
src/agent/tests/docker-compose.yml (new file, 31 lines)
@@ -0,0 +1,31 @@
services:
  master:
    image: argus-master:dev
    container_name: argus-master-agent-e2e
    environment:
      - OFFLINE_THRESHOLD_SECONDS=6
      - ONLINE_THRESHOLD_SECONDS=2
      - SCHEDULER_INTERVAL_SECONDS=1
    ports:
      - "32300:3000"
    volumes:
      - ./private/argus/master:/private/argus/master
      - ./private/argus/metric/prometheus:/private/argus/metric/prometheus

  agent:
    image: argus-agent:dev
    container_name: argus-agent-e2e
    hostname: dev-e2euser-e2einst-pod-0
    depends_on:
      - master
    volumes:
      - ./private/argus/agent/dev-e2euser-e2einst-pod-0:/private/argus/agent/dev-e2euser-e2einst-pod-0
      - ./private/argus/agent/health/dev-e2euser-e2einst-pod-0:/private/argus/agent/health/dev-e2euser-e2einst-pod-0

networks:
  default:
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.28.0.0/16
src/agent/tests/scripts/00_e2e_test.sh (new executable file, 22 lines)
@@ -0,0 +1,22 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SCRIPTS=(
  "01_bootstrap.sh"
  "02_up.sh"
  "03_wait_and_assert_registration.sh"
  "04_write_health_files.sh"
  "05_assert_status_on_master.sh"
  "06_restart_agent_and_reregister.sh"
  "07_down.sh"
)

for script in "${SCRIPTS[@]}"; do
  echo "[TEST] Running $script"
  "$SCRIPT_DIR/$script"
  echo "[TEST] $script completed"
  echo
done

echo "[TEST] Agent module E2E tests completed"
src/agent/tests/scripts/01_bootstrap.sh (new executable file, 44 lines)
@@ -0,0 +1,44 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
AGENT_ROOT="$(cd "$TEST_ROOT/.." && pwd)"
MASTER_ROOT="$(cd "$AGENT_ROOT/../master" && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"

AGENT_HOSTNAME="dev-e2euser-e2einst-pod-0"
AGENT_CONFIG_DIR="$PRIVATE_ROOT/argus/agent/$AGENT_HOSTNAME"
AGENT_HEALTH_DIR="$PRIVATE_ROOT/argus/agent/health/$AGENT_HOSTNAME"
MASTER_PRIVATE_DIR="$PRIVATE_ROOT/argus/master"
METRIC_PRIVATE_DIR="$PRIVATE_ROOT/argus/metric/prometheus"

mkdir -p "$AGENT_CONFIG_DIR"
mkdir -p "$AGENT_HEALTH_DIR"
mkdir -p "$MASTER_PRIVATE_DIR"
mkdir -p "$METRIC_PRIVATE_DIR"
mkdir -p "$TMP_ROOT"

cat > "$AGENT_CONFIG_DIR/config" <<CONFIG
HOSTNAME=$AGENT_HOSTNAME
NODE_FILE=/private/argus/agent/$AGENT_HOSTNAME/node.json
VERSION=1.1.0
MASTER_ENDPOINT=http://master:3000
REPORT_INTERVAL_SECONDS=2
SUBMODULE_HEALTH_FILE_DIR=/private/argus/agent/health/{hostname}/
GPU_NUMBER=
IP_OVERRIDE=
CONFIG

touch "$AGENT_HEALTH_DIR/.keep"

pushd "$MASTER_ROOT" >/dev/null
./scripts/build_images.sh --tag argus-master:dev
popd >/dev/null

pushd "$AGENT_ROOT" >/dev/null
./scripts/build_images.sh --tag argus-agent:dev
popd >/dev/null

echo "[INFO] Agent E2E bootstrap complete"
src/agent/tests/scripts/02_up.sh (new executable file, 24 lines)
@@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"

compose() {
  if docker compose version >/dev/null 2>&1; then
    docker compose "$@"
  else
    docker-compose "$@"
  fi
}

docker container rm -f argus-agent-e2e >/dev/null 2>&1 || true

docker network rm tests_default >/dev/null 2>&1 || true

pushd "$TEST_ROOT" >/dev/null
compose down --remove-orphans || true
compose up -d
popd >/dev/null

echo "[INFO] Master+Agent stack started"
src/agent/tests/scripts/03_wait_and_assert_registration.sh (new executable file, 65 lines)
@@ -0,0 +1,65 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:32300/api/v1/master"
AGENT_HOSTNAME="dev-e2euser-e2einst-pod-0"
NODE_FILE="$TEST_ROOT/private/argus/agent/$AGENT_HOSTNAME/node.json"

mkdir -p "$TMP_ROOT"

node_id=""
for _ in {1..30}; do
  sleep 2
  response=$(curl -sS "$API_BASE/nodes" || true)
  if [[ -z "$response" ]]; then
    continue
  fi
  list_file="$TMP_ROOT/nodes_list.json"
  echo "$response" > "$list_file"
  node_id=$(python3 - "$list_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    nodes = json.load(handle)
print(nodes[0]["id"] if nodes else "")
PY
)
  if [[ -n "$node_id" ]]; then
    break
  fi
done

if [[ -z "$node_id" ]]; then
  echo "[ERROR] Agent did not register within timeout" >&2
  exit 1
fi

echo "$node_id" > "$TMP_ROOT/node_id"

if [[ ! -f "$NODE_FILE" ]]; then
  echo "[ERROR] node.json not created at $NODE_FILE" >&2
  exit 1
fi

python3 - "$NODE_FILE" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
assert "id" in node and node["id"], "node.json missing id"
PY

detail_file="$TMP_ROOT/initial_detail.json"
curl -sS "$API_BASE/nodes/$node_id" -o "$detail_file"
python3 - "$detail_file" "$TMP_ROOT/initial_ip" <<'PY'
import json, sys, pathlib
with open(sys.argv[1]) as handle:
    node = json.load(handle)
ip = node["meta_data"].get("ip")
if not ip:
    raise SystemExit("meta_data.ip missing")
pathlib.Path(sys.argv[2]).write_text(ip)
PY

echo "[INFO] Agent registered with node id $node_id"
src/agent/tests/scripts/04_write_health_files.sh (new executable file, 22 lines)
@@ -0,0 +1,22 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
HEALTH_DIR="$TEST_ROOT/private/argus/agent/health/dev-e2euser-e2einst-pod-0"

cat > "$HEALTH_DIR/log-fluentbit.json" <<JSON
{
  "status": "healthy",
  "timestamp": "2023-10-05T12:05:00Z"
}
JSON

cat > "$HEALTH_DIR/metric-node-exporter.json" <<JSON
{
  "status": "healthy",
  "timestamp": "2023-10-05T12:05:00Z"
}
JSON

echo "[INFO] Health files written"
src/agent/tests/scripts/05_assert_status_on_master.sh (new executable file, 53 lines)
@@ -0,0 +1,53 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:32300/api/v1/master"
NODE_ID="$(cat "$TMP_ROOT/node_id")"
NODES_JSON="$TEST_ROOT/private/argus/metric/prometheus/nodes.json"

success=false
detail_file="$TMP_ROOT/agent_status_detail.json"
for _ in {1..20}; do
  sleep 2
  if ! curl -sS "$API_BASE/nodes/$NODE_ID" -o "$detail_file"; then
    continue
  fi
  if python3 - "$detail_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
if node["status"] != "online":
    raise SystemExit(1)
health = node.get("health", {})
if "log-fluentbit" not in health or "metric-node-exporter" not in health:
    raise SystemExit(1)
PY
  then
    success=true
    break
  fi
done

if [[ "$success" != true ]]; then
  echo "[ERROR] Node did not report health data in time" >&2
  exit 1
fi

if [[ ! -f "$NODES_JSON" ]]; then
  echo "[ERROR] nodes.json missing at $NODES_JSON" >&2
  exit 1
fi

python3 - "$NODES_JSON" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    nodes = json.load(handle)
assert len(nodes) == 1, nodes
entry = nodes[0]
assert entry["node_id"], entry
PY

echo "[INFO] Master reflects agent health and nodes.json entries"
src/agent/tests/scripts/06_restart_agent_and_reregister.sh (new executable file, 108 lines)
@@ -0,0 +1,108 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:32300/api/v1/master"
NODE_ID="$(cat "$TMP_ROOT/node_id")"
AGENT_HOSTNAME="dev-e2euser-e2einst-pod-0"
NETWORK_NAME="tests_default"
NEW_AGENT_IP="172.28.0.200"

compose() {
  if docker compose version >/dev/null 2>&1; then
    docker compose "$@"
  else
    docker-compose "$@"
  fi
}

before_file="$TMP_ROOT/before_restart.json"
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$before_file"
prev_last_updated=$(python3 - "$before_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
print(node.get("last_updated", ""))
PY
)
prev_ip=$(python3 - "$before_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
print(node["meta_data"].get("ip", ""))
PY
)
initial_ip=$(cat "$TMP_ROOT/initial_ip")
if [[ "$prev_ip" != "$initial_ip" ]]; then
  echo "[ERROR] Expected initial IP $initial_ip, got $prev_ip" >&2
  exit 1
fi

pushd "$TEST_ROOT" >/dev/null
compose rm -sf agent
popd >/dev/null

docker container rm -f argus-agent-e2e >/dev/null 2>&1 || true

AGENT_DIR="$TEST_ROOT/private/argus/agent/$AGENT_HOSTNAME"
HEALTH_DIR="$TEST_ROOT/private/argus/agent/health/$AGENT_HOSTNAME"

# Start the container with a sleep entrypoint first so we control the network state at registration time
if ! docker run -d \
  --name argus-agent-e2e \
  --hostname "$AGENT_HOSTNAME" \
  --network "$NETWORK_NAME" \
  --ip "$NEW_AGENT_IP" \
  -v "$AGENT_DIR:/private/argus/agent/$AGENT_HOSTNAME" \
  -v "$HEALTH_DIR:/private/argus/agent/health/$AGENT_HOSTNAME" \
  argus-agent:dev \
  sleep 300 >/dev/null; then
  echo "[ERROR] Failed to start agent container with custom IP" >&2
  exit 1
fi

# Start the real agent process inside the container
if ! docker exec -d argus-agent-e2e python -m app.main; then
  echo "[ERROR] Failed to spawn agent process inside container" >&2
  exit 1
fi

success=false
detail_file="$TMP_ROOT/post_restart.json"
for _ in {1..20}; do
  sleep 3
  if ! curl -sS "$API_BASE/nodes/$NODE_ID" -o "$detail_file"; then
    continue
  fi
  if python3 - "$detail_file" "$prev_last_updated" "$NODE_ID" "$prev_ip" "$NEW_AGENT_IP" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
prev_last_updated = sys.argv[2]
expected_id = sys.argv[3]
old_ip = sys.argv[4]
expected_ip = sys.argv[5]
last_updated = node.get("last_updated")
current_ip = node["meta_data"].get("ip")
assert node["id"] == expected_id
if current_ip != expected_ip:
    raise SystemExit(1)
if current_ip == old_ip:
    raise SystemExit(1)
if not last_updated or last_updated == prev_last_updated:
    raise SystemExit(1)
PY
  then
    success=true
    break
  fi
done

if [[ "$success" != true ]]; then
  echo "[ERROR] Agent did not report expected new IP $NEW_AGENT_IP after restart" >&2
  exit 1
fi

echo "[INFO] Agent restart produced successful re-registration with IP change"
src/agent/tests/scripts/07_down.sh (new executable file, 24 lines)
@@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"

compose() {
  if docker compose version >/dev/null 2>&1; then
    docker compose "$@"
  else
    docker-compose "$@"
  fi
}

docker container rm -f argus-agent-e2e >/dev/null 2>&1 || true

pushd "$TEST_ROOT" >/dev/null
compose down --remove-orphans
popd >/dev/null

rm -rf "$TEST_ROOT/private"
rm -rf "$TEST_ROOT/tmp"

echo "[INFO] Agent E2E environment cleaned up"
src/master/Dockerfile (new file, 25 lines)
@@ -0,0 +1,25 @@
FROM python:3.11-slim

SHELL ["/bin/bash", "-c"]

ARG PIP_INDEX_URL=
ENV PIP_NO_CACHE_DIR=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=/app

WORKDIR /app

COPY requirements.txt ./
RUN set -euxo pipefail \
    && python -m pip install --upgrade pip \
    && if [[ -n "$PIP_INDEX_URL" ]]; then \
        PIP_INDEX_URL="$PIP_INDEX_URL" python -m pip install -r requirements.txt; \
    else \
        python -m pip install -r requirements.txt; \
    fi

COPY app ./app

EXPOSE 3000

CMD ["gunicorn", "--bind", "0.0.0.0:3000", "app:create_app()"]
src/master/README.md (new file, 48 lines)
@@ -0,0 +1,48 @@
# Argus Master Module

A lightweight Flask + SQLite service that manages Argus agent nodes. It exposes node registration, status updates, configuration management, and statistics, and it generates `nodes.json` for the metric module.

## Build & Run

```bash
cd src/master
./scripts/build_images.sh   # builds argus-master:dev
```

Local run via Docker Compose (test stack):

```bash
cd src/master/tests
./scripts/01_up_master.sh
```

The service listens on port `3000` (`31300` when using the provided test compose). Key environment variables (a standalone `docker run` sketch follows the list):

- `DB_PATH` (default `/private/argus/master/db.sqlite3`)
- `METRIC_NODES_JSON_PATH` (default `/private/argus/metric/prometheus/nodes.json`)
- `OFFLINE_THRESHOLD_SECONDS`, `ONLINE_THRESHOLD_SECONDS`, `SCHEDULER_INTERVAL_SECONDS`
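For a one-off run outside compose, something like the following should work (a sketch: the image tag comes from the build script above, and the host paths and threshold values are illustrative):

```bash
docker run -d --name argus-master \
  -p 3000:3000 \
  -e OFFLINE_THRESHOLD_SECONDS=180 \
  -e ONLINE_THRESHOLD_SECONDS=120 \
  -v /private/argus/master:/private/argus/master \
  -v /private/argus/metric/prometheus:/private/argus/metric/prometheus \
  argus-master:dev
```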
## REST API Summary

Base path: `/api/v1/master`. Example `curl` calls follow the list.

- `GET /nodes`: list node summaries
- `GET /nodes/{id}`: node detail
- `POST /nodes`: register/re-register
- `PUT /nodes/{id}/status`: status report (timestamp + health map)
- `PUT /nodes/{id}/config`: update config/labels (partial)
- `GET /nodes/statistics`: totals + grouped counts
- `GET /healthz`, `GET /readyz`: health checks
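As an illustration, registering a node and then reporting its status might look like this (a sketch: the node id `A1` and all metadata values are made up; field requirements follow `validate_registration_payload` and `validate_status_payload` in `app/models.py`):

```bash
# Register a node; returns the node object including its assigned id
curl -sS -X POST http://localhost:3000/api/v1/master/nodes \
  -H 'Content-Type: application/json' \
  -d '{
        "name": "dev-e2euser-e2einst-pod-0",
        "type": "agent",
        "version": "1.1.0",
        "meta_data": {
          "hostname": "dev-e2euser-e2einst-pod-0",
          "ip": "172.28.0.2",
          "env": "dev",
          "user": "e2euser",
          "instance": "e2einst",
          "cpu_number": 4,
          "memory_in_bytes": 2147483648,
          "gpu_number": 0
        }
      }'

# Report status for the returned node id (assumed to be A1 here)
curl -sS -X PUT http://localhost:3000/api/v1/master/nodes/A1/status \
  -H 'Content-Type: application/json' \
  -d '{"timestamp": "2023-10-05T12:05:00Z", "health": {"log-fluentbit": {"status": "healthy"}}}'
```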
`nodes.json` only contains nodes whose status is `online`; a minimal example follows.
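The entry schema is produced by `Storage.get_online_nodes()` (not shown in this diff); the E2E test only asserts that each entry carries a non-empty `node_id`, so a minimal sketch is:

```json
[
  {"node_id": "A1"}
]
```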
## Tests

End-to-end tests are Docker based:

```bash
cd src/master/tests
./scripts/00_e2e_test.sh
```

The scripts create temporary `private/` and `tmp/` directories under `tests/`; these are cleaned up automatically by `06_down.sh`.
src/master/app/__init__.py (new file, 41 lines)
@@ -0,0 +1,41 @@
from __future__ import annotations

import atexit
import logging

from flask import Flask

from .config import AppConfig, load_config
from .routes import register_routes
from .scheduler import StatusScheduler
from .storage import Storage


def create_app(config: AppConfig | None = None) -> Flask:
    app_config = config or load_config()
    storage = Storage(app_config.db_path, app_config.node_id_prefix)
    scheduler = StatusScheduler(storage, app_config)

    app = Flask(__name__)
    app.config["APP_CONFIG"] = app_config
    app.config["STORAGE"] = storage
    app.config["SCHEDULER"] = scheduler

    register_routes(app, storage, scheduler, app_config)

    scheduler.start()

    def _cleanup() -> None:
        logging.getLogger("argus.master").info("Shutting down master app")
        try:
            scheduler.stop()
        except Exception:  # pragma: no cover - defensive
            logging.getLogger("argus.master").exception("Failed to stop scheduler")
        try:
            storage.close()
        except Exception:  # pragma: no cover - defensive
            logging.getLogger("argus.master").exception("Failed to close storage")

    atexit.register(_cleanup)

    return app
src/master/app/config.py (new file, 40 lines)
@@ -0,0 +1,40 @@
from __future__ import annotations

import os
from dataclasses import dataclass


@dataclass(frozen=True)
class AppConfig:
    db_path: str
    metric_nodes_json_path: str
    offline_threshold_seconds: int
    online_threshold_seconds: int
    scheduler_interval_seconds: int
    node_id_prefix: str
    auth_mode: str


def _get_int_env(name: str, default: int) -> int:
    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    try:
        return int(raw)
    except ValueError as exc:
        raise ValueError(f"Environment variable {name} must be an integer, got {raw!r}") from exc


def load_config() -> AppConfig:
    """Build the configuration object from environment variables so runtime parameters live in one place."""
    return AppConfig(
        db_path=os.environ.get("DB_PATH", "/private/argus/master/db.sqlite3"),
        metric_nodes_json_path=os.environ.get(
            "METRIC_NODES_JSON_PATH", "/private/argus/metric/prometheus/nodes.json"
        ),
        offline_threshold_seconds=_get_int_env("OFFLINE_THRESHOLD_SECONDS", 180),
        online_threshold_seconds=_get_int_env("ONLINE_THRESHOLD_SECONDS", 120),
        scheduler_interval_seconds=_get_int_env("SCHEDULER_INTERVAL_SECONDS", 30),
        node_id_prefix=os.environ.get("NODE_ID_PREFIX", "A"),
        auth_mode=os.environ.get("AUTH_MODE", "disabled"),
    )
src/master/app/models.py (new file, 171 lines)
@@ -0,0 +1,171 @@
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Mapping

from .util import parse_iso


class ValidationError(Exception):
    """Raised when a user payload fails validation."""


@dataclass
class Node:
    id: str
    name: str
    type: str
    version: str | None
    status: str
    config: Dict[str, Any]
    labels: Iterable[str]
    meta_data: Dict[str, Any]
    health: Dict[str, Any]
    register_time: str | None
    last_report: str | None
    agent_last_report: str | None
    last_updated: str | None


def serialize_node_row(row: Mapping[str, Any]) -> Dict[str, Any]:
    def _json_or_default(value: str | None, default: Any) -> Any:
        if value is None or value == "":
            return default
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return default

    config = _json_or_default(row["config_json"], {})
    labels = _json_or_default(row["labels_json"], [])
    meta = _json_or_default(row["meta_json"], {})
    health = _json_or_default(row["health_json"], {})
    return {
        "id": row["id"],
        "name": row["name"],
        "type": row["type"],
        "version": row["version"],
        "status": row["status"],
        "config": config if isinstance(config, dict) else {},
        "label": list(labels) if isinstance(labels, list) else [],
        "meta_data": meta if isinstance(meta, dict) else {},
        "health": health if isinstance(health, dict) else {},
        "register_time": row["register_time"],
        "last_report": row["last_report"],
        "agent_last_report": row["agent_last_report"],
        "last_updated": row["last_updated"],
    }


def serialize_node_summary(row: Mapping[str, Any]) -> Dict[str, Any]:
    return {
        "id": row["id"],
        "name": row["name"],
        "status": row["status"],
        "type": row["type"],
        "version": row["version"],
    }


def validate_registration_payload(payload: Mapping[str, Any]) -> Dict[str, Any]:
    if not isinstance(payload, Mapping):
        raise ValidationError("Request body must be a JSON object")

    name = payload.get("name")
    if not isinstance(name, str) or not name.strip():
        raise ValidationError("Field 'name' is required and must be a non-empty string")

    node_type = payload.get("type", "agent")
    if not isinstance(node_type, str) or not node_type:
        raise ValidationError("Field 'type' must be a string")

    version = payload.get("version")
    if version is not None and not isinstance(version, str):
        raise ValidationError("Field 'version' must be a string if provided")

    meta = payload.get("meta_data")
    if not isinstance(meta, Mapping):
        raise ValidationError("Field 'meta_data' must be an object")

    required_meta = ["hostname", "ip", "env", "user", "instance", "cpu_number", "memory_in_bytes", "gpu_number"]
    for key in required_meta:
        if key not in meta:
            raise ValidationError(f"meta_data.{key} is required")

    cpu_number = meta["cpu_number"]
    memory_in_bytes = meta["memory_in_bytes"]
    gpu_number = meta["gpu_number"]
    if not isinstance(cpu_number, int) or cpu_number < 0:
        raise ValidationError("meta_data.cpu_number must be a non-negative integer")
    if not isinstance(memory_in_bytes, int) or memory_in_bytes < 0:
        raise ValidationError("meta_data.memory_in_bytes must be a non-negative integer")
    if not isinstance(gpu_number, int) or gpu_number < 0:
        raise ValidationError("meta_data.gpu_number must be a non-negative integer")

    node_id = payload.get("id")
    if node_id is not None and (not isinstance(node_id, str) or not node_id.strip()):
        raise ValidationError("Field 'id' must be a non-empty string when provided")

    return {
        "id": node_id,
        "name": name,
        "type": node_type,
        "version": version,
        "meta_data": dict(meta),
    }


def validate_status_payload(payload: Mapping[str, Any]) -> Dict[str, Any]:
    if not isinstance(payload, Mapping):
        raise ValidationError("Request body must be a JSON object")

    timestamp = payload.get("timestamp")
    if not isinstance(timestamp, str) or not timestamp:
        raise ValidationError("Field 'timestamp' is required and must be a string")

    parsed = parse_iso(timestamp)
    if parsed is None:
        raise ValidationError("Field 'timestamp' must be an ISO8601 datetime string")

    health = payload.get("health", {})
    if not isinstance(health, Mapping):
        raise ValidationError("Field 'health' must be an object if provided")

    sanitized_health: Dict[str, Any] = {}
    for key, value in health.items():
        if not isinstance(key, str):
            raise ValidationError("Keys in 'health' must be strings")
        if not isinstance(value, (Mapping, list, str, int, float, bool)) and value is not None:
            raise ValidationError("Values in 'health' must be JSON-compatible")
        sanitized_health[key] = value

    return {
        "timestamp": timestamp,
        "parsed_timestamp": parsed,
        "health": sanitized_health,
    }


def validate_config_payload(payload: Mapping[str, Any]) -> Dict[str, Any]:
    if not isinstance(payload, Mapping):
        raise ValidationError("Request body must be a JSON object")

    result: Dict[str, Any] = {}
    if "config" in payload:
        config = payload["config"]
        if not isinstance(config, Mapping):
            raise ValidationError("Field 'config' must be an object")
        result["config"] = dict(config)

    if "label" in payload:
        labels = payload["label"]
        if not isinstance(labels, list) or not all(isinstance(item, str) for item in labels):
            raise ValidationError("Field 'label' must be an array of strings")
        result["label"] = list(labels)

    if not result:
        raise ValidationError("At least one of 'config' or 'label' must be provided")

    return result
src/master/app/nodes_api.py (new file, 155 lines)
@@ -0,0 +1,155 @@
from __future__ import annotations

import logging
from http import HTTPStatus
from typing import Any, Mapping

from flask import Blueprint, jsonify, request

from .models import (
    ValidationError,
    validate_config_payload,
    validate_registration_payload,
    validate_status_payload,
)
from .scheduler import StatusScheduler
from .storage import Storage
from .util import to_iso, utcnow


def create_nodes_blueprint(storage: Storage, scheduler: StatusScheduler) -> Blueprint:
    bp = Blueprint("nodes", __name__)
    logger = logging.getLogger("argus.master.api")

    def _json_error(message: str, status: HTTPStatus, code: str) -> Any:
        response = jsonify({"error": message, "code": code})
        response.status_code = status
        return response

    @bp.errorhandler(ValidationError)
    def _handle_validation_error(err: ValidationError):
        return _json_error(str(err), HTTPStatus.BAD_REQUEST, "invalid_request")

    @bp.get("/nodes")
    def list_nodes():
        nodes = storage.list_nodes()
        return jsonify(nodes)

    @bp.get("/nodes/<node_id>")
    def get_node(node_id: str):
        node = storage.get_node(node_id)
        if node is None:
            return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found")
        return jsonify(node)

    @bp.post("/nodes")
    def register_node():
        payload = _get_json()
        data = validate_registration_payload(payload)
        now = utcnow()
        now_iso = to_iso(now)
        node_id = data["id"]
        name = data["name"]
        node_type = data["type"]
        version = data["version"]
        meta = data["meta_data"]

        if node_id:
            # A provided id means re-registration; the name must stay consistent
            existing_row = storage.get_node_raw(node_id)
            if existing_row is None:
                return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found")
            if existing_row["name"] != name:
                return _json_error(
                    "Node id and name mismatch during re-registration",
                    HTTPStatus.INTERNAL_SERVER_ERROR,
                    "id_name_mismatch",
                )
            updated = storage.update_node_meta(
                node_id,
                node_type=node_type,
                version=version,
                meta_data=meta,
                last_updated_iso=now_iso,
            )
            scheduler.trigger_nodes_json_refresh()
            return jsonify(updated), HTTPStatus.OK

        # No id provided: look the node up by name
        existing_by_name = storage.get_node_by_name(name)
        if existing_by_name:
            # A node with this name already exists; treat it as re-registration without an id
            updated = storage.update_node_meta(
                existing_by_name["id"],
                node_type=node_type,
                version=version,
                meta_data=meta,
                last_updated_iso=now_iso,
            )
            scheduler.trigger_nodes_json_refresh()
            return jsonify(updated), HTTPStatus.OK

        new_id = storage.allocate_node_id()
        created = storage.create_node(
            new_id,
            name,
            node_type,
            version,
            meta,
            status="initialized",
            register_time_iso=now_iso,
            last_updated_iso=now_iso,
        )
        scheduler.trigger_nodes_json_refresh()
        return jsonify(created), HTTPStatus.CREATED

    @bp.put("/nodes/<node_id>/config")
    def update_node_config(node_id: str):
        payload = _get_json()
        updates = validate_config_payload(payload)
        try:
            updated = storage.update_config_and_labels(
                node_id,
                config=updates.get("config"),
                labels=updates.get("label"),
            )
        except KeyError:
            return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found")

        if "label" in updates:
            scheduler.trigger_nodes_json_refresh()
        return jsonify(updated)

    @bp.get("/nodes/statistics")
    def node_statistics():
        stats = storage.get_statistics()
        return jsonify(stats)

    @bp.put("/nodes/<node_id>/status")
    def update_status(node_id: str):
        payload = _get_json()
        data = validate_status_payload(payload)
        try:
            # The master writes last_report; node status is computed by the scheduler
            updated = storage.update_last_report(
                node_id,
                server_timestamp_iso=to_iso(utcnow()),
                agent_timestamp_iso=data["timestamp"],
                health=data["health"],
            )
        except KeyError:
            return _json_error("Node not found", HTTPStatus.NOT_FOUND, "not_found")

        scheduler.trigger_nodes_json_refresh()
        return jsonify(updated)

    return bp


def _get_json() -> Mapping[str, Any]:
    data = request.get_json(silent=True)
    if data is None:
        raise ValidationError("Request body must be valid JSON")
    if not isinstance(data, Mapping):
        raise ValidationError("Request body must be a JSON object")
    return data
src/master/app/routes.py (new file, 24 lines)
@@ -0,0 +1,24 @@
from __future__ import annotations

from flask import Flask, jsonify

from .config import AppConfig
from .nodes_api import create_nodes_blueprint
from .scheduler import StatusScheduler
from .storage import Storage


def register_routes(app: Flask, storage: Storage, scheduler: StatusScheduler, config: AppConfig) -> None:
    app.register_blueprint(create_nodes_blueprint(storage, scheduler), url_prefix="/api/v1/master")

    @app.get("/healthz")
    def healthz():
        return jsonify({"status": "ok"})

    @app.get("/readyz")
    def readyz():
        try:
            storage.list_nodes()  # simple readiness probe
        except Exception as exc:  # pragma: no cover - defensive
            return jsonify({"status": "error", "error": str(exc)}), 500
        return jsonify({"status": "ok"})
90
src/master/app/scheduler.py
Normal file
@@ -0,0 +1,90 @@
from __future__ import annotations

import logging
import threading
from typing import Optional

from .config import AppConfig
from .storage import Storage
from .util import atomic_write_json, parse_iso, to_iso, utcnow


class StatusScheduler:
    def __init__(self, storage: Storage, config: AppConfig, logger: Optional[logging.Logger] = None) -> None:
        self._storage = storage
        self._config = config
        self._logger = logger or logging.getLogger("argus.master.scheduler")
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, name="status-scheduler", daemon=True)
        self._nodes_json_lock = threading.Lock()
        self._pending_nodes_json = threading.Event()

    def start(self) -> None:
        """Start the background thread that periodically refreshes node status and nodes.json."""
        if not self._thread.is_alive():
            self._logger.info("Starting scheduler thread")
            self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        self._pending_nodes_json.set()
        self._thread.join(timeout=5)

    def trigger_nodes_json_refresh(self) -> None:
        self._pending_nodes_json.set()

    def generate_nodes_json(self) -> None:
        with self._nodes_json_lock:
            online_nodes = self._storage.get_online_nodes()
            atomic_write_json(self._config.metric_nodes_json_path, online_nodes)
            self._logger.info("nodes.json updated", extra={"count": len(online_nodes)})

    # ------------------------------------------------------------------
    # internal loop
    # ------------------------------------------------------------------

    def _run(self) -> None:
        # Make sure nodes.json is generated immediately at startup
        self._pending_nodes_json.set()
        while not self._stop_event.is_set():
            changed = self._reconcile_statuses()
            if changed or self._pending_nodes_json.is_set():
                try:
                    self.generate_nodes_json()
                finally:
                    self._pending_nodes_json.clear()
            self._stop_event.wait(self._config.scheduler_interval_seconds)

    def _reconcile_statuses(self) -> bool:
        """Compare last_report against the current time and decide whether to flip status."""
        any_status_changed = False
        now = utcnow()
        rows = self._storage.fetch_nodes_for_scheduler()
        for row in rows:
            node_id = row["id"]
            last_report_iso = row["last_report"]
            current_status = row["status"]
            last_report_dt = parse_iso(last_report_iso)
            if last_report_dt is None:
                # No report yet; treat as initialized until a report arrives
                continue
            delta_seconds = (now - last_report_dt).total_seconds()
            new_status = current_status
            if delta_seconds > self._config.offline_threshold_seconds:
                new_status = "offline"
            elif delta_seconds <= self._config.online_threshold_seconds:
                new_status = "online"
            # Between thresholds: keep current status (sticky)
            if new_status != current_status:
                any_status_changed = True
                self._logger.info(
                    "Updating node status",
                    extra={
                        "node_id": node_id,
                        "previous": current_status,
                        "new": new_status,
                        "delta_seconds": delta_seconds,
                    },
                )
                self._storage.update_status(node_id, new_status, last_updated_iso=to_iso(now))
        return any_status_changed
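The reconciliation loop above is sticky between the two thresholds. A standalone restatement of the transition rule (threshold values taken from the test compose file: online ≤ 2 s, offline > 6 s) makes the three regimes explicit; this is a sketch mirroring `_reconcile_statuses`, not the shipped code:

```python
def next_status(current: str, delta_seconds: float,
                online_threshold: float = 2.0,
                offline_threshold: float = 6.0) -> str:
    """Mirror of the scheduler's transition rule (illustrative sketch)."""
    if delta_seconds > offline_threshold:
        return "offline"
    if delta_seconds <= online_threshold:
        return "online"
    return current  # between thresholds: keep current status (sticky)

assert next_status("online", 1.0) == "online"
assert next_status("online", 4.0) == "online"    # sticky zone keeps online
assert next_status("offline", 4.0) == "offline"  # sticky zone keeps offline
assert next_status("online", 7.0) == "offline"
```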
332
src/master/app/storage.py
Normal file
@@ -0,0 +1,332 @@
from __future__ import annotations

import json
import sqlite3
import threading
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple

from .models import serialize_node_row, serialize_node_summary
from .util import ensure_parent, to_iso, utcnow


class Storage:
    def __init__(self, db_path: str, node_id_prefix: str) -> None:
        self._db_path = db_path
        self._node_id_prefix = node_id_prefix
        ensure_parent(db_path)
        self._lock = threading.Lock()
        self._conn = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
        self._conn.row_factory = sqlite3.Row
        with self._lock:
            self._conn.execute("PRAGMA foreign_keys = ON;")
        self._ensure_schema()

    # ------------------------------------------------------------------
    # schema & helpers
    # ------------------------------------------------------------------

    def _ensure_schema(self) -> None:
        """Initialize the tables so the schema is ready when the service starts."""
        with self._lock:
            self._conn.executescript(
                """
                CREATE TABLE IF NOT EXISTS nodes (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL UNIQUE,
                    type TEXT NOT NULL,
                    version TEXT,
                    status TEXT NOT NULL,
                    config_json TEXT,
                    labels_json TEXT,
                    meta_json TEXT,
                    health_json TEXT,
                    register_time TEXT,
                    last_report TEXT,
                    agent_last_report TEXT,
                    last_updated TEXT
                );

                CREATE TABLE IF NOT EXISTS kv (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_nodes_status ON nodes(status);
                CREATE INDEX IF NOT EXISTS idx_nodes_name ON nodes(name);
                """
            )
            self._conn.commit()

    def close(self) -> None:
        with self._lock:
            self._conn.close()

    # ------------------------------------------------------------------
    # Node ID allocation
    # ------------------------------------------------------------------

    def allocate_node_id(self) -> str:
        """Maintain a counter in the kv table and mint IDs of the form A1 for new nodes."""
        with self._lock:
            cur = self._conn.execute("SELECT value FROM kv WHERE key = ?", ("node_id_seq",))
            row = cur.fetchone()
            if row is None:
                next_id = 1
                self._conn.execute("INSERT INTO kv(key, value) VALUES(?, ?)", ("node_id_seq", str(next_id)))
            else:
                next_id = int(row["value"]) + 1
                self._conn.execute("UPDATE kv SET value = ? WHERE key = ?", (str(next_id), "node_id_seq"))
            self._conn.commit()
            return f"{self._node_id_prefix}{next_id}"

    # ------------------------------------------------------------------
    # Query helpers
    # ------------------------------------------------------------------

    def list_nodes(self) -> List[Dict[str, Any]]:
        with self._lock:
            cur = self._conn.execute(
                "SELECT id, name, status, type, version FROM nodes ORDER BY id ASC"
            )
            rows = cur.fetchall()
        return [serialize_node_summary(row) for row in rows]

    def get_node(self, node_id: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            cur = self._conn.execute("SELECT * FROM nodes WHERE id = ?", (node_id,))
            row = cur.fetchone()
        if row is None:
            return None
        return serialize_node_row(row)

    def get_node_raw(self, node_id: str) -> Optional[sqlite3.Row]:
        with self._lock:
            cur = self._conn.execute("SELECT * FROM nodes WHERE id = ?", (node_id,))
            row = cur.fetchone()
        return row

    def get_node_by_name(self, name: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            cur = self._conn.execute("SELECT * FROM nodes WHERE name = ?", (name,))
            row = cur.fetchone()
        if row is None:
            return None
        return serialize_node_row(row)

    # ------------------------------------------------------------------
    # Mutation helpers
    # ------------------------------------------------------------------

    def create_node(
        self,
        node_id: str,
        name: str,
        node_type: str,
        version: str | None,
        meta_data: Mapping[str, Any],
        status: str,
        register_time_iso: str,
        last_updated_iso: str,
    ) -> Dict[str, Any]:
        """Insert the initial node record; config/label/health default to empty."""
        now_iso = last_updated_iso
        with self._lock:
            self._conn.execute(
                """
                INSERT INTO nodes (
                    id, name, type, version, status, config_json, labels_json, meta_json,
                    health_json, register_time, last_report, agent_last_report, last_updated
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    node_id,
                    name,
                    node_type,
                    version,
                    status,
                    json.dumps({}),
                    json.dumps([]),
                    json.dumps(dict(meta_data)),
                    json.dumps({}),
                    register_time_iso,
                    None,
                    None,
                    now_iso,
                ),
            )
            self._conn.commit()

        created = self.get_node(node_id)
        if created is None:
            raise RuntimeError("Failed to read back created node")
        return created

    def update_node_meta(
        self,
        node_id: str,
        *,
        name: Optional[str] = None,
        node_type: Optional[str] = None,
        version: Optional[str | None] = None,
        meta_data: Optional[Mapping[str, Any]] = None,
        last_updated_iso: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Update a node's static fields on re-registration; omitted fields stay unchanged."""
        updates: List[str] = []
        params: List[Any] = []
        if name is not None:
            updates.append("name = ?")
            params.append(name)
        if node_type is not None:
            updates.append("type = ?")
            params.append(node_type)
        if version is not None:
            updates.append("version = ?")
            params.append(version)
        if meta_data is not None:
            updates.append("meta_json = ?")
            params.append(json.dumps(dict(meta_data)))
        if last_updated_iso is not None:
            updates.append("last_updated = ?")
            params.append(last_updated_iso)

        if not updates:
            result = self.get_node(node_id)
            if result is None:
                raise KeyError(node_id)
            return result

        params.append(node_id)
        with self._lock:
            self._conn.execute(
                f"UPDATE nodes SET {', '.join(updates)} WHERE id = ?",
                tuple(params),
            )
            self._conn.commit()
        updated = self.get_node(node_id)
        if updated is None:
            raise KeyError(node_id)
        return updated

    def update_config_and_labels(
        self, node_id: str, *, config: Optional[Mapping[str, Any]] = None, labels: Optional[Iterable[str]] = None
    ) -> Dict[str, Any]:
        """Partially update config/label and refresh the last_updated timestamp."""
        updates: List[str] = []
        params: List[Any] = []
        if config is not None:
            updates.append("config_json = ?")
            params.append(json.dumps(dict(config)))
        if labels is not None:
            updates.append("labels_json = ?")
            params.append(json.dumps(list(labels)))
        updates.append("last_updated = ?")
        params.append(to_iso(utcnow()))
        params.append(node_id)
        with self._lock:
            cur = self._conn.execute(
                f"UPDATE nodes SET {', '.join(updates)} WHERE id = ?",
                tuple(params),
            )
            # rowcount reflects this statement only; total_changes would be
            # cumulative over the connection and never 0 after earlier writes.
            if cur.rowcount == 0:
                self._conn.rollback()
                raise KeyError(node_id)
            self._conn.commit()
        updated = self.get_node(node_id)
        if updated is None:
            raise KeyError(node_id)
        return updated

    def update_last_report(
        self,
        node_id: str,
        *,
        server_timestamp_iso: str,
        agent_timestamp_iso: str,
        health: Mapping[str, Any],
    ) -> Dict[str, Any]:
        """Record the latest report time and health payload, used later for status computation."""
        with self._lock:
            cur = self._conn.execute(
                """
                UPDATE nodes
                SET last_report = ?,
                    agent_last_report = ?,
                    health_json = ?,
                    last_updated = ?
                WHERE id = ?
                """,
                (
                    server_timestamp_iso,
                    agent_timestamp_iso,
                    json.dumps(dict(health)),
                    server_timestamp_iso,
                    node_id,
                ),
            )
            if cur.rowcount == 0:
                self._conn.rollback()
                raise KeyError(node_id)
            self._conn.commit()
        updated = self.get_node(node_id)
        if updated is None:
            raise KeyError(node_id)
        return updated

    def update_status(self, node_id: str, status: str, *, last_updated_iso: str) -> None:
        with self._lock:
            self._conn.execute(
                "UPDATE nodes SET status = ?, last_updated = ? WHERE id = ?",
                (status, last_updated_iso, node_id),
            )
            self._conn.commit()

    # ------------------------------------------------------------------
    # Reporting helpers
    # ------------------------------------------------------------------

    def get_statistics(self) -> Dict[str, Any]:
        """Count nodes in total and aggregated by status."""
        with self._lock:
            cur = self._conn.execute("SELECT COUNT(*) AS total FROM nodes")
            total_row = cur.fetchone()
            cur = self._conn.execute("SELECT status, COUNT(*) AS count FROM nodes GROUP BY status")
            status_rows = cur.fetchall()
        return {
            "total": total_row["total"] if total_row else 0,
            "status_statistics": [
                {"status": row["status"], "count": row["count"]}
                for row in status_rows
            ],
        }

    def fetch_nodes_for_scheduler(self) -> List[sqlite3.Row]:
        with self._lock:
            cur = self._conn.execute(
                "SELECT id, last_report, status FROM nodes"
            )
            return cur.fetchall()

    def get_online_nodes(self) -> List[Dict[str, Any]]:
        """Return the list of online nodes used to generate nodes.json."""
        with self._lock:
            cur = self._conn.execute(
                "SELECT id, meta_json, labels_json, name FROM nodes WHERE status = ? ORDER BY id ASC",
                ("online",),
            )
            rows = cur.fetchall()

        result: List[Dict[str, Any]] = []
        for row in rows:
            meta = json.loads(row["meta_json"]) if row["meta_json"] else {}
            labels = json.loads(row["labels_json"]) if row["labels_json"] else []
            result.append(
                {
                    "node_id": row["id"],
                    "user_id": meta.get("user"),
                    "ip": meta.get("ip"),
                    "hostname": meta.get("hostname", row["name"]),
                    "labels": labels if isinstance(labels, list) else [],
                }
            )
        return result
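`allocate_node_id` keeps a named counter in the `kv` table instead of relying on SQLite rowids, so the sequence survives row deletions and restarts. A self-contained sketch of the same pattern against an in-memory database:

```python
import sqlite3

def allocate(conn: sqlite3.Connection, prefix: str = "A") -> str:
    # Same pattern as Storage.allocate_node_id: a named counter in a kv table.
    row = conn.execute("SELECT value FROM kv WHERE key = 'node_id_seq'").fetchone()
    if row is None:
        next_id = 1
        conn.execute("INSERT INTO kv(key, value) VALUES('node_id_seq', ?)", (str(next_id),))
    else:
        next_id = int(row[0]) + 1
        conn.execute("UPDATE kv SET value = ? WHERE key = 'node_id_seq'", (str(next_id),))
    conn.commit()
    return f"{prefix}{next_id}"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kv (key TEXT PRIMARY KEY, value TEXT NOT NULL)")
assert [allocate(conn) for _ in range(3)] == ["A1", "A2", "A3"]
```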
51
src/master/app/util.py
Normal file
@@ -0,0 +1,51 @@
from __future__ import annotations

import json
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable


ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def utcnow() -> datetime:
    """Return the current UTC time so the service has a single time base."""
    return datetime.now(timezone.utc)


def to_iso(dt: datetime | None) -> str | None:
    if dt is None:
        return None
    return dt.astimezone(timezone.utc).replace(microsecond=0).strftime(ISO_FORMAT)


def parse_iso(value: str | None) -> datetime | None:
    if not value:
        return None
    try:
        if value.endswith("Z"):
            return datetime.strptime(value, ISO_FORMAT).replace(tzinfo=timezone.utc)
        # Fallback for ISO strings with an explicit offset
        return datetime.fromisoformat(value).astimezone(timezone.utc)
    except ValueError:
        return None


def ensure_parent(path: str) -> None:
    """Make sure the directory containing the target file exists."""
    Path(path).parent.mkdir(parents=True, exist_ok=True)


def atomic_write_json(path: str, data: Iterable[Any] | Any) -> None:
    """Write JSON atomically so other processes never read a half-written file."""
    ensure_parent(path)
    directory = Path(path).parent
    with tempfile.NamedTemporaryFile("w", dir=directory, delete=False) as tmp:
        json.dump(data, tmp, separators=(",", ":"))
        tmp.flush()
        os.fsync(tmp.fileno())
        temp_path = tmp.name
    os.replace(temp_path, path)
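A quick usage sketch for the helpers above, assuming the package is importable as `app.util` (as implied by `PYTHONPATH=/app` in the Dockerfile); the temp path is illustrative:

```python
import json, os, tempfile

from app.util import atomic_write_json, parse_iso, to_iso, utcnow  # assumed import path

path = os.path.join(tempfile.mkdtemp(), "nodes.json")
atomic_write_json(path, [])              # readers never observe a half-written file
with open(path) as fh:
    assert json.load(fh) == []

stamp = to_iso(utcnow())                 # e.g. "2025-09-24T00:00:00Z"
dt = parse_iso(stamp)                    # round-trips the trailing-"Z" format
assert dt is not None and to_iso(dt) == stamp
```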
0
src/master/images/.gitkeep
Normal file
2
src/master/requirements.txt
Normal file
@@ -0,0 +1,2 @@
Flask==2.3.3
gunicorn==21.2.0
39
src/master/scripts/build_images.sh
Executable file
@@ -0,0 +1,39 @@
#!/usr/bin/env bash
set -euo pipefail

usage() {
  echo "Usage: $0 [--intranet] [--tag <image_tag>]" >&2
}

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
IMAGE_TAG="${IMAGE_TAG:-argus-master:dev}"
BUILD_ARGS=()

while [[ "$#" -gt 0 ]]; do
  case "$1" in
    --intranet)
      INTRANET_INDEX="${INTRANET_INDEX:-https://pypi.tuna.tsinghua.edu.cn/simple}"
      BUILD_ARGS+=("--build-arg" "PIP_INDEX_URL=${INTRANET_INDEX}")
      shift
      ;;
    --tag)
      [[ $# -ge 2 ]] || { usage; exit 1; }
      IMAGE_TAG="$2"
      shift 2
      ;;
    -h|--help)
      usage
      exit 0
      ;;
    *)
      echo "Unknown option: $1" >&2
      usage
      exit 1
      ;;
  esac
done

echo "[INFO] Building image $IMAGE_TAG"
docker build "${BUILD_ARGS[@]}" -t "$IMAGE_TAG" "$PROJECT_ROOT"
echo "[OK] Image $IMAGE_TAG built"
39
src/master/scripts/load_images.sh
Executable file
@@ -0,0 +1,39 @@
#!/usr/bin/env bash
set -euo pipefail

usage() {
  echo "Usage: $0 [--file <tar_path>]" >&2
}

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
DEFAULT_INPUT="$PROJECT_ROOT/images/argus-master-dev.tar"
IMAGE_TAR="$DEFAULT_INPUT"

while [[ "$#" -gt 0 ]]; do
  case "$1" in
    --file)
      [[ $# -ge 2 ]] || { usage; exit 1; }
      IMAGE_TAR="$2"
      shift 2
      ;;
    -h|--help)
      usage
      exit 0
      ;;
    *)
      echo "Unknown option: $1" >&2
      usage
      exit 1
      ;;
  esac
done

if [[ ! -f "$IMAGE_TAR" ]]; then
  echo "[ERROR] Image tarball not found: $IMAGE_TAR" >&2
  exit 1
fi

echo "[INFO] Loading image from $IMAGE_TAR"
docker image load -i "$IMAGE_TAR"
echo "[OK] Image loaded"
41
src/master/scripts/save_images.sh
Executable file
@@ -0,0 +1,41 @@
#!/usr/bin/env bash
set -euo pipefail

usage() {
  echo "Usage: $0 [--tag <image_tag>] [--output <tar_path>]" >&2
}

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
DEFAULT_OUTPUT="$PROJECT_ROOT/images/argus-master-dev.tar"
IMAGE_TAG="${IMAGE_TAG:-argus-master:dev}"
OUTPUT_PATH="$DEFAULT_OUTPUT"

while [[ "$#" -gt 0 ]]; do
  case "$1" in
    --tag)
      [[ $# -ge 2 ]] || { usage; exit 1; }
      IMAGE_TAG="$2"
      shift 2
      ;;
    --output)
      [[ $# -ge 2 ]] || { usage; exit 1; }
      OUTPUT_PATH="$2"
      shift 2
      ;;
    -h|--help)
      usage
      exit 0
      ;;
    *)
      echo "Unknown option: $1" >&2
      usage
      exit 1
      ;;
  esac
done

mkdir -p "$(dirname "$OUTPUT_PATH")"
echo "[INFO] Saving image $IMAGE_TAG to $OUTPUT_PATH"
docker image save "$IMAGE_TAG" -o "$OUTPUT_PATH"
echo "[OK] Image saved"
2
src/master/tests/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
private/
tmp/
18
src/master/tests/docker-compose.yml
Normal file
@@ -0,0 +1,18 @@
services:
  master:
    image: argus-master:dev
    container_name: argus-master-e2e
    environment:
      - OFFLINE_THRESHOLD_SECONDS=6
      - ONLINE_THRESHOLD_SECONDS=2
      - SCHEDULER_INTERVAL_SECONDS=1
    ports:
      - "31300:3000"
    volumes:
      - ./private/argus/master:/private/argus/master
      - ./private/argus/metric/prometheus:/private/argus/metric/prometheus
    restart: unless-stopped

networks:
  default:
    driver: bridge
25
src/master/tests/scripts/00_e2e_test.sh
Executable file
@@ -0,0 +1,25 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SCRIPTS=(
  "01_up_master.sh"
  "02_verify_ready_and_nodes_json.sh"
  "03_register_via_curl.sh"
  "04_reregister_and_error_cases.sh"
  "05_status_report_via_curl.sh"
  "06_config_update_and_nodes_json.sh"
  "07_stats_single_node.sh"
  "08_multi_node_stats.sh"
  "09_restart_persistence.sh"
  "10_down.sh"
)

for script in "${SCRIPTS[@]}"; do
  echo "[TEST] Running $script"
  "$SCRIPT_DIR/$script"
  echo "[TEST] $script completed"
  echo
done

echo "[TEST] Master module E2E tests completed"
41
src/master/tests/scripts/01_up_master.sh
Executable file
@@ -0,0 +1,41 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
MODULE_ROOT="$(cd "$TEST_ROOT/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"

mkdir -p "$PRIVATE_ROOT/argus/master"
mkdir -p "$PRIVATE_ROOT/argus/metric/prometheus"
mkdir -p "$TMP_ROOT"

# Make sure containers/data left over from a previous run are cleaned up
compose() {
  if docker compose version >/dev/null 2>&1; then
    docker compose "$@"
  else
    docker-compose "$@"
  fi
}

pushd "$TEST_ROOT" >/dev/null
compose down --remove-orphans || true
popd >/dev/null

rm -rf "$TMP_ROOT" "$PRIVATE_ROOT"
mkdir -p "$PRIVATE_ROOT/argus/master"
mkdir -p "$PRIVATE_ROOT/argus/metric/prometheus"
mkdir -p "$TMP_ROOT"

pushd "$MODULE_ROOT" >/dev/null
./scripts/build_images.sh --tag argus-master:dev
popd >/dev/null

pushd "$TEST_ROOT" >/dev/null
compose down --remove-orphans || true
compose up -d
popd >/dev/null

echo "[INFO] Master container is up on http://localhost:31300"
51
src/master/tests/scripts/02_verify_ready_and_nodes_json.sh
Executable file
@@ -0,0 +1,51 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
API_BASE="http://localhost:31300"
NODES_JSON_PATH="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json"

# Wait for readyz to return 200, which means database initialization finished
for _ in {1..30}; do
  status=$(curl -s -o /dev/null -w '%{http_code}' "$API_BASE/readyz" || true)
  if [[ "$status" == "200" ]]; then
    break
  fi
  sleep 1
done

if [[ "${status:-}" != "200" ]]; then
  echo "[ERROR] /readyz did not return 200 within the expected time, got=$status" >&2
  exit 1
fi

echo "[INFO] /readyz passed; readiness check succeeded"

# The scheduler writes an empty nodes.json at startup; wait for the file and verify its content
for _ in {1..30}; do
  if [[ -f "$NODES_JSON_PATH" ]]; then
    break
  fi
  sleep 1
done

if [[ ! -f "$NODES_JSON_PATH" ]]; then
  echo "[ERROR] $NODES_JSON_PATH was not generated within the expected time" >&2
  exit 1
fi

if ! python3 - "$NODES_JSON_PATH" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    data = json.load(handle)
if data != []:
    raise SystemExit(f"nodes.json initial content should be [], got {data}")
PY
then
  echo "[ERROR] nodes.json initial content is not an empty array" >&2
  exit 1
fi

echo "[INFO] nodes.json initial state verified"
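The curl polling loop above translates directly to Python; a sketch with the same endpoint and 30×1 s budget (the base URL is the test default):

```python
import time

import requests

def wait_ready(base: str = "http://localhost:31300", attempts: int = 30) -> bool:
    """Poll /readyz once per second until it returns 200, as the shell loop does."""
    for _ in range(attempts):
        try:
            if requests.get(f"{base}/readyz", timeout=2).status_code == 200:
                return True
        except requests.RequestException:
            pass  # service not up yet; retry
        time.sleep(1)
    return False
```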
68
src/master/tests/scripts/03_register_via_curl.sh
Executable file
@@ -0,0 +1,68 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"
ROOT_BASE="http://localhost:31300"

mkdir -p "$TMP_ROOT"

# healthz is registered at the app root, not under the API prefix
for _ in {1..30}; do
  if curl -sf "$ROOT_BASE/healthz" >/dev/null; then
    break
  fi
  sleep 1
done

payload=$(cat <<'JSON'
{
  "name": "dev-testuser-testinst-pod-0",
  "type": "agent",
  "meta_data": {
    "hostname": "dev-testuser-testinst-pod-0",
    "ip": "10.0.0.10",
    "env": "dev",
    "user": "testuser",
    "instance": "testinst",
    "cpu_number": 4,
    "memory_in_bytes": 2147483648,
    "gpu_number": 0
  },
  "version": "1.1.0"
}
JSON
)

body_file="$TMP_ROOT/register_body.json"
status=$(curl -sS -o "$body_file" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$payload")
body="$(cat "$body_file")"

if [[ "$status" != "201" ]]; then
  echo "[ERROR] Unexpected status code: $status" >&2
  echo "$body" >&2
  exit 1
fi

node_id=$(python3 - "$body_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    body = json.load(handle)
print(body["id"])
PY
)

echo "$body" > "$TMP_ROOT/last_response.json"
echo "$node_id" > "$TMP_ROOT/node_id"

list_file="$TMP_ROOT/nodes_list.json"
curl -sS "$API_BASE/nodes" -o "$list_file"
python3 - "$list_file" "$node_id" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    data = json.load(handle)
node_id = sys.argv[2]
assert any(item.get("id") == node_id for item in data), "node not in list"
PY

echo "[INFO] Registered node with id $node_id"
116
src/master/tests/scripts/04_reregister_and_error_cases.sh
Executable file
@@ -0,0 +1,116 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"
NODE_ID="$(cat "$TMP_ROOT/node_id")"

# Re-register with the same ID while changing some meta/version fields
payload=$(cat <<JSON
{
  "id": "$NODE_ID",
  "name": "dev-testuser-testinst-pod-0",
  "type": "agent",
  "meta_data": {
    "hostname": "dev-testuser-testinst-pod-0",
    "ip": "10.0.0.11",
    "env": "dev",
    "user": "testuser",
    "instance": "testinst",
    "cpu_number": 8,
    "memory_in_bytes": 2147483648,
    "gpu_number": 0
  },
  "version": "1.2.0"
}
JSON
)

status=$(curl -sS -o "$TMP_ROOT/reregister_response.json" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$payload")
if [[ "$status" != "200" ]]; then
  echo "[ERROR] Re-registration returned non-200: $status" >&2
  cat "$TMP_ROOT/reregister_response.json" >&2
  exit 1
fi

python3 - "$TMP_ROOT/reregister_response.json" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
assert node["meta_data"]["ip"] == "10.0.0.11", node["meta_data"]
assert node["meta_data"]["cpu_number"] == 8, node["meta_data"]
assert node["version"] == "1.2.0", node
PY

echo "[INFO] Re-registration succeeded and metadata was updated"

# Unknown ID => 404
unknown_payload=$(cat <<'JSON'
{
  "id": "A999",
  "name": "dev-testuser-testinst-pod-0",
  "type": "agent",
  "meta_data": {
    "hostname": "dev-testuser-testinst-pod-0",
    "ip": "10.0.0.12",
    "env": "dev",
    "user": "testuser",
    "instance": "testinst",
    "cpu_number": 4,
    "memory_in_bytes": 2147483648,
    "gpu_number": 0
  },
  "version": "1.2.0"
}
JSON
)

status=$(curl -sS -o "$TMP_ROOT/unknown_id_response.json" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$unknown_payload")
if [[ "$status" != "404" ]]; then
  echo "[ERROR] Unknown ID should return 404, got=$status" >&2
  cat "$TMP_ROOT/unknown_id_response.json" >&2
  exit 1
fi

echo "[INFO] Unknown ID returning 404 verified"

# id and name mismatch => 500, node keeps its original name
mismatch_payload=$(cat <<JSON
{
  "id": "$NODE_ID",
  "name": "dev-testuser-testinst-pod-0-mismatch",
  "type": "agent",
  "meta_data": {
    "hostname": "dev-testuser-testinst-pod-0-mismatch",
    "ip": "10.0.0.13",
    "env": "dev",
    "user": "testuser",
    "instance": "testinst",
    "cpu_number": 4,
    "memory_in_bytes": 2147483648,
    "gpu_number": 0
  },
  "version": "1.2.0"
}
JSON
)

status=$(curl -sS -o "$TMP_ROOT/mismatch_response.json" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$mismatch_payload")
if [[ "$status" != "500" ]]; then
  echo "[ERROR] Name mismatch should return 500, got=$status" >&2
  cat "$TMP_ROOT/mismatch_response.json" >&2
  exit 1
fi

# Verify the node still has the correct name
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$TMP_ROOT/post_mismatch_detail.json"
python3 - "$TMP_ROOT/post_mismatch_detail.json" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
assert node["name"] == "dev-testuser-testinst-pod-0", node["name"]
PY

echo "[INFO] Name mismatch returned 500 and the original node was left untouched"
98
src/master/tests/scripts/05_status_report_via_curl.sh
Executable file
@@ -0,0 +1,98 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"

node_id="$(cat "$TMP_ROOT/node_id")"

payload=$(python3 - <<'PY'
import json
from datetime import datetime, timezone
body = {
    "timestamp": datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
    "health": {
        "log-fluentbit": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"},
        "metric-node-exporter": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"}
    }
}
print(json.dumps(body))
PY
)

response=$(curl -sS -w '\n%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$node_id/status" -d "$payload")
body="$(echo "$response" | head -n -1)"
status="$(echo "$response" | tail -n1)"

if [[ "$status" != "200" ]]; then
  echo "[ERROR] Status update failed with code $status" >&2
  echo "$body" >&2
  exit 1
fi

echo "$body" > "$TMP_ROOT/last_response.json"

sleep 3

detail_file="$TMP_ROOT/status_detail.json"
curl -sS "$API_BASE/nodes/$node_id" -o "$detail_file"
python3 - "$detail_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
assert node["status"] == "online", f"Expected online, got {node['status']}"
assert "log-fluentbit" in node["health"], node["health"].keys()
PY

echo "[INFO] Status report successful and node is online"

# Wait past the offline threshold and verify the node flips to offline automatically
sleep 7

offline_detail_file="$TMP_ROOT/status_offline.json"
curl -sS "$API_BASE/nodes/$node_id" -o "$offline_detail_file"
python3 - "$offline_detail_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
assert node["status"] == "offline", f"Expected offline, got {node['status']}"
PY

echo "[INFO] Node transitioned to offline as expected"

# Report health again, which should bring the status back to online
payload=$(python3 - <<'PY'
import json
from datetime import datetime, timezone
body = {
    "timestamp": datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
    "health": {
        "log-fluentbit": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"},
        "metric-node-exporter": {"status": "healthy", "timestamp": "2023-10-05T12:05:00Z"}
    }
}
print(json.dumps(body))
PY
)

curl -sS -o "$TMP_ROOT/second_status_response.json" -w '%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$node_id/status" -d "$payload" > "$TMP_ROOT/second_status_code"
if [[ $(cat "$TMP_ROOT/second_status_code") != "200" ]]; then
  echo "[ERROR] Second status update failed" >&2
  cat "$TMP_ROOT/second_status_response.json" >&2
  exit 1
fi

sleep 3

final_detail_file="$TMP_ROOT/status_back_online.json"
curl -sS "$API_BASE/nodes/$node_id" -o "$final_detail_file"
python3 - "$final_detail_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
assert node["status"] == "online", f"Expected online after second report, got {node['status']}"
PY

echo "[INFO] Node transitioned back to online after new status report"
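Condensed into Python, the online → offline → online round trip this script exercises looks roughly like the sketch below; the sleeps assume the test thresholds (online ≤ 2 s, offline > 6 s), and `node_id` is a placeholder for the id returned at registration:

```python
import time
from datetime import datetime, timezone

import requests

BASE = "http://localhost:31300/api/v1/master"  # assumed test-compose port

def report(node_id: str) -> None:
    # Same payload shape the script builds: a Z-suffixed timestamp plus health map.
    body = {
        "timestamp": datetime.now(timezone.utc).replace(microsecond=0)
                     .isoformat().replace("+00:00", "Z"),
        "health": {"log-fluentbit": {"status": "healthy"}},
    }
    requests.put(f"{BASE}/nodes/{node_id}/status", json=body, timeout=5).raise_for_status()

def status_of(node_id: str) -> str:
    return requests.get(f"{BASE}/nodes/{node_id}", timeout=5).json()["status"]

node_id = "A1"  # hypothetical; use the id returned by the registration call
report(node_id); time.sleep(3)
assert status_of(node_id) == "online"
time.sleep(7)                      # exceed the 6 s offline threshold
assert status_of(node_id) == "offline"
report(node_id); time.sleep(3)
assert status_of(node_id) == "online"
```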
56
src/master/tests/scripts/06_config_update_and_nodes_json.sh
Executable file
@@ -0,0 +1,56 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
PRIVATE_ROOT="$TEST_ROOT/private"
API_BASE="http://localhost:31300/api/v1/master"
NODE_ID="$(cat "$TMP_ROOT/node_id")"

payload='{"config":{"log_level":"debug"},"label":["gpu","exp001"]}'

response=$(curl -sS -w '\n%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$NODE_ID/config" -d "$payload")
body="$(echo "$response" | head -n -1)"
status="$(echo "$response" | tail -n1)"

if [[ "$status" != "200" ]]; then
  echo "[ERROR] Config update failed: $status" >&2
  echo "$body" >&2
  exit 1
fi

sleep 2

nodes_json_path="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json"
if [[ ! -f "$nodes_json_path" ]]; then
  echo "[ERROR] nodes.json not generated at $nodes_json_path" >&2
  exit 1
fi

# Make sure the node is online, so nodes.json does not end up empty while we wait
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$TMP_ROOT/config_detail.json"
if ! python3 - "$TMP_ROOT/config_detail.json" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
if node["status"] != "online":
    raise SystemExit(1)
PY
then
  payload='{"timestamp":"2025-09-24T00:00:00Z","health":{"log-fluentbit":{"status":"healthy"}}}'
  curl -sS -o "$TMP_ROOT/config_second_report.json" -w '%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$NODE_ID/status" -d "$payload" > "$TMP_ROOT/config_second_code"
  sleep 2
fi

python3 - "$nodes_json_path" <<'PY'
import json, sys
from pathlib import Path
path = Path(sys.argv[1])
content = json.loads(path.read_text())
assert isinstance(content, list) and len(content) == 1
entry = content[0]
assert entry["labels"] == ["gpu", "exp001"], entry
PY

echo "[INFO] Config updated and nodes.json verified"
41
src/master/tests/scripts/07_stats_single_node.sh
Executable file
@@ -0,0 +1,41 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"
NODE_ID="$(cat "$TMP_ROOT/node_id")"

sleep 7

detail_file="$TMP_ROOT/offline_detail.json"
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$detail_file"
python3 - "$detail_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
assert node["status"] == "offline", f"Expected offline, got {node['status']}"
PY

stats_file="$TMP_ROOT/stats.json"
curl -sS "$API_BASE/nodes/statistics" -o "$stats_file"
python3 - "$stats_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    stats = json.load(handle)
assert stats["total"] == 1
found = {item["status"]: item["count"] for item in stats["status_statistics"]}
assert found.get("offline") == 1
PY

nodes_json_path="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json"
python3 - "$nodes_json_path" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    content = json.load(handle)
assert content == [], content
PY

echo "[INFO] Offline transition and statistics validated"
106
src/master/tests/scripts/08_multi_node_stats.sh
Executable file
@@ -0,0 +1,106 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"

# Register a second node A2 (kept online)
second_payload=$(cat <<'JSON'
{
  "name": "dev-testuser-testinst-pod-1",
  "type": "agent",
  "meta_data": {
    "hostname": "dev-testuser-testinst-pod-1",
    "ip": "10.0.0.11",
    "env": "dev",
    "user": "testuser",
    "instance": "testinst",
    "cpu_number": 8,
    "memory_in_bytes": 2147483648,
    "gpu_number": 0
  },
  "version": "1.1.0"
}
JSON
)

status=$(curl -sS -o "$TMP_ROOT/second_register.json" -w '%{http_code}' -H 'Content-Type: application/json' -X POST "$API_BASE/nodes" -d "$second_payload")
if [[ "$status" != "201" ]]; then
  echo "[ERROR] Second node registration failed: $status" >&2
  cat "$TMP_ROOT/second_register.json" >&2
  exit 1
fi
SECOND_NODE_ID=$(python3 - "$TMP_ROOT/second_register.json" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    data = json.load(handle)
print(data["id"])
PY
)

echo "$SECOND_NODE_ID" > "$TMP_ROOT/second_node_id"

echo "[INFO] Second node registered with id $SECOND_NODE_ID"

# A2 reports health so it stays online
status_payload='{"timestamp":"2025-09-24T00:00:00Z","health":{"log-fluentbit":{"status":"healthy"}}}'
status=$(curl -sS -o "$TMP_ROOT/second_status.json" -w '%{http_code}' -H 'Content-Type: application/json' -X PUT "$API_BASE/nodes/$SECOND_NODE_ID/status" -d "$status_payload")
if [[ "$status" != "200" ]]; then
  echo "[ERROR] Second node status update failed: $status" >&2
  cat "$TMP_ROOT/second_status.json" >&2
  exit 1
fi

# Wait for the scheduler to mark the second node online
second_online=false
for _ in {1..10}; do
  sleep 1
  curl -sS "$API_BASE/nodes/$SECOND_NODE_ID" -o "$TMP_ROOT/second_detail.json" || continue
  if python3 - "$TMP_ROOT/second_detail.json" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
if node["status"] != "online":
    raise SystemExit(1)
PY
  then
    second_online=true
    break
  fi
done

if [[ "$second_online" != true ]]; then
  echo "[ERROR] Second node did not become online" >&2
  exit 1
fi

# Fetch the statistics again
stats_file="$TMP_ROOT/multi_stats.json"
curl -sS "$API_BASE/nodes/statistics" -o "$stats_file"
python3 - "$stats_file" "$TMP_ROOT/node_id" "$TMP_ROOT/second_node_id" <<'PY'
import json, sys, pathlib
with open(sys.argv[1]) as handle:
    stats = json.load(handle)
first_id = pathlib.Path(sys.argv[2]).read_text().strip()
second_id = pathlib.Path(sys.argv[3]).read_text().strip()
assert stats["total"] == 2, stats
found = {item["status"]: item["count"] for item in stats["status_statistics"]}
assert found.get("offline") == 1, found
assert found.get("online") == 1, found
PY

# Verify nodes.json only contains online nodes (should be just the second node A2)
nodes_json_path="$PRIVATE_ROOT/argus/metric/prometheus/nodes.json"
python3 - "$nodes_json_path" "$SECOND_NODE_ID" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    content = json.load(handle)
expected_id = sys.argv[2]
assert len(content) == 1, content
assert content[0]["node_id"] == expected_id, content
PY

echo "[INFO] Multi-node statistics and nodes.json validated"
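Each nodes.json entry emitted by `get_online_nodes` carries exactly five keys. A small validator makes the expected shape explicit (a sketch; the path below follows the test layout and should be adjusted as needed):

```python
import json

REQUIRED_KEYS = {"node_id", "user_id", "ip", "hostname", "labels"}

def check_nodes_json(path: str) -> None:
    """Assert every entry matches the schema written by get_online_nodes."""
    with open(path) as fh:
        entries = json.load(fh)
    assert isinstance(entries, list), entries
    for entry in entries:
        assert REQUIRED_KEYS <= entry.keys(), entry
        assert isinstance(entry["labels"], list), entry

# Hypothetical path matching the test compose volume layout:
check_nodes_json("tests/private/argus/metric/prometheus/nodes.json")
```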
161
src/master/tests/scripts/09_restart_persistence.sh
Executable file
@@ -0,0 +1,161 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:31300/api/v1/master"
ROOT_BASE="http://localhost:31300"
DB_PATH="$PRIVATE_ROOT/argus/master/db.sqlite3"

compose() {
  if docker compose version >/dev/null 2>&1; then
    docker compose "$@"
  else
    docker-compose "$@"
  fi
}

if [[ ! -f "$TMP_ROOT/node_id" ]]; then
  echo "[ERROR] First node ID missing; run the earlier test cases first" >&2
  exit 1
fi

if [[ ! -f "$TMP_ROOT/second_node_id" ]]; then
  echo "[ERROR] Second node ID missing; run the multi-node scenario script first" >&2
  exit 1
fi

if [[ ! -f "$DB_PATH" ]]; then
  echo "[ERROR] Persistent database missing: $DB_PATH" >&2
  exit 1
fi

NODE_ID="$(cat "$TMP_ROOT/node_id")"
SECOND_NODE_ID="$(cat "$TMP_ROOT/second_node_id")"

# Capture node details, nodes.json and statistics before the restart as a baseline
first_before="$TMP_ROOT/${NODE_ID}_pre_restart.json"
second_before="$TMP_ROOT/${SECOND_NODE_ID}_pre_restart.json"
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$first_before"
curl -sS "$API_BASE/nodes/$SECOND_NODE_ID" -o "$second_before"

nodes_json_before="$TMP_ROOT/nodes_json_pre_restart.json"
cp "$PRIVATE_ROOT/argus/metric/prometheus/nodes.json" "$nodes_json_before"

stats_before="$TMP_ROOT/stats_pre_restart.json"
curl -sS "$API_BASE/nodes/statistics" -o "$stats_before"

# Restart the master container to simulate persistence across a service restart
pushd "$TEST_ROOT" >/dev/null
compose restart master
popd >/dev/null

# Wait for /readyz to return 200 again
for _ in {1..30}; do
  status=$(curl -s -o /dev/null -w '%{http_code}' "$ROOT_BASE/readyz" || true)
  if [[ "$status" == "200" ]]; then
    break
  fi
  sleep 1
done

if [[ "${status:-}" != "200" ]]; then
  echo "[ERROR] master container did not become healthy after restart, readyz=$status" >&2
  exit 1
fi

sleep 2

first_after="$TMP_ROOT/${NODE_ID}_post_restart.json"
second_after="$TMP_ROOT/${SECOND_NODE_ID}_post_restart.json"
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$first_after"
curl -sS "$API_BASE/nodes/$SECOND_NODE_ID" -o "$second_after"

# Compare the key node fields before and after the restart to ensure nothing was lost
python3 - "$first_before" "$first_after" <<'PY'
import json, sys
before_path, after_path = sys.argv[1:3]
with open(before_path, 'r', encoding='utf-8') as handle:
    before = json.load(handle)
with open(after_path, 'r', encoding='utf-8') as handle:
    after = json.load(handle)
keys = [
    "id",
    "name",
    "type",
    "version",
    "register_time",
    "meta_data",
    "config",
    "label",
    "health",
    "last_report",
    "agent_last_report",
    "status",
]
for key in keys:
    if before.get(key) != after.get(key):
        raise AssertionError(f"Key {key} changed after restart: {before.get(key)} -> {after.get(key)}")
PY

python3 - "$second_before" "$second_after" <<'PY'
import json, sys
before_path, after_path = sys.argv[1:3]
with open(before_path, 'r', encoding='utf-8') as handle:
    before = json.load(handle)
with open(after_path, 'r', encoding='utf-8') as handle:
    after = json.load(handle)
keys = [
    "id",
    "name",
    "type",
    "version",
    "register_time",
    "meta_data",
    "config",
    "label",
    "health",
    "last_report",
    "agent_last_report",
    "status",
]
for key in keys:
    if before.get(key) != after.get(key):
        raise AssertionError(f"Key {key} changed after restart: {before.get(key)} -> {after.get(key)}")
PY

# Compare nodes.json and statistics before and after the restart for persistence consistency
nodes_json_after="$TMP_ROOT/nodes_json_post_restart.json"
cp "$PRIVATE_ROOT/argus/metric/prometheus/nodes.json" "$nodes_json_after"

stats_after="$TMP_ROOT/stats_after_restart.json"
curl -sS "$API_BASE/nodes/statistics" -o "$stats_after"

python3 - "$nodes_json_before" "$nodes_json_after" <<'PY'
import json, sys
with open(sys.argv[1], 'r', encoding='utf-8') as handle:
    before = json.load(handle)
with open(sys.argv[2], 'r', encoding='utf-8') as handle:
    after = json.load(handle)
if before != after:
    raise AssertionError(f"nodes.json changed after restart: {before} -> {after}")
PY

python3 - "$stats_before" "$stats_after" <<'PY'
import json, sys
with open(sys.argv[1], 'r', encoding='utf-8') as handle:
    before = json.load(handle)
with open(sys.argv[2], 'r', encoding='utf-8') as handle:
    after = json.load(handle)
if before != after:
    raise AssertionError(f"Statistics changed after restart: {before} -> {after}")
PY

if [[ ! -s "$DB_PATH" ]]; then
  echo "[ERROR] Database file is empty; data was apparently not persisted" >&2
  exit 1
fi

echo "[INFO] Master persistence verified after restart"
24
src/master/tests/scripts/10_down.sh
Executable file
@@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"

compose() {
  if docker compose version >/dev/null 2>&1; then
    docker compose "$@"
  else
    docker-compose "$@"
  fi
}

pushd "$TEST_ROOT" >/dev/null
compose down --remove-orphans
popd >/dev/null

rm -rf "$TMP_ROOT"
rm -rf "$PRIVATE_ROOT"

echo "[INFO] Master E2E environment cleaned up"