diff --git a/src/agent/Dockerfile b/src/agent/Dockerfile new file mode 100644 index 0000000..c4dc076 --- /dev/null +++ b/src/agent/Dockerfile @@ -0,0 +1,23 @@ +FROM python:3.11-slim + +SHELL ["/bin/bash", "-c"] + +ARG PIP_INDEX_URL= +ENV PIP_NO_CACHE_DIR=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONPATH=/app + +WORKDIR /app + +COPY requirements.txt ./ +RUN set -euxo pipefail \ + && python -m pip install --upgrade pip \ + && if [[ -n "$PIP_INDEX_URL" ]]; then \ + PIP_INDEX_URL="$PIP_INDEX_URL" python -m pip install -r requirements.txt; \ + else \ + python -m pip install -r requirements.txt; \ + fi + +COPY app ./app + +CMD ["python", "-m", "app.main"] diff --git a/src/agent/README.md b/src/agent/README.md index e69de29..04475db 100644 --- a/src/agent/README.md +++ b/src/agent/README.md @@ -0,0 +1,31 @@ +# Argus Agent Module + +Python agent that registers with the Argus master service, persists node information, gathers host metadata, reads module health files, and periodically reports status. + +## Build & Run + +```bash +cd src/agent +./scripts/build_images.sh # builds argus-agent:dev +``` + +Runtime expects a configuration file (generated by installer) at `/private/argus/agent/<hostname>/config`. Key fields: + +- `HOSTNAME`, `NODE_FILE`, `VERSION` +- `MASTER_ENDPOINT` (e.g. `http://master:3000`) +- `REPORT_INTERVAL_SECONDS` +- `SUBMODULE_HEALTH_FILE_DIR` (supports `{hostname}` placeholder) +- optional `GPU_NUMBER` + +Health files live under `/private/argus/agent/health/<hostname>/` and must follow `<module>-*.json` naming (e.g. `log-fluentbit.json`). The agent sends parsed JSON objects keyed by file stem. + +## Tests + +Docker-based E2E stack (master + agent): + +```bash +cd src/agent/tests +./scripts/00_e2e_test.sh +``` + +The scripts provision configs/health directories under `tests/private/` and clean up via `07_down.sh`. 
class MasterAPIError(Exception):
    """Raised when the master rejects a request or returns an unusable body.

    Attributes:
        status_code: HTTP status code of the offending response.
        payload: Decoded JSON error object, or an empty dict when the
            response carried none.
    """

    def __init__(self, message: str, status_code: int, payload: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.status_code = status_code
        self.payload = payload or {}


class AgentClient:
    """Small HTTP client for the master's node registration/status API."""

    def __init__(self, base_url: str, *, timeout: int = 10) -> None:
        """Create a client bound to *base_url* with a per-request timeout in seconds."""
        self._base_url = base_url.rstrip("/")
        self._timeout = timeout
        self._session = requests.Session()

    def register_node(self, body: Dict[str, Any]) -> Dict[str, Any]:
        """Call the master's registration endpoint and return the node object.

        Raises:
            MasterAPIError: The master answered with an error status or a
                body that is not a JSON object.
        """
        url = f"{self._base_url}/api/v1/master/nodes"
        response = self._session.post(url, json=body, timeout=self._timeout)
        return self._parse_response(response, "Failed to register node")

    def update_status(self, node_id: str, body: Dict[str, Any]) -> Dict[str, Any]:
        """Report health information for *node_id* and return the master's reply.

        Raises:
            MasterAPIError: The master answered with an error status or a
                body that is not a JSON object.
        """
        url = f"{self._base_url}/api/v1/master/nodes/{node_id}/status"
        response = self._session.put(url, json=body, timeout=self._timeout)
        return self._parse_response(response, "Failed to update node status")

    def _parse_response(self, response: requests.Response, error_prefix: str) -> Dict[str, Any]:
        """Decode *response* into a JSON object or raise ``MasterAPIError``.

        Args:
            response: The HTTP response returned by the session.
            error_prefix: Text placed in front of the error detail.

        Returns:
            The decoded JSON object of a successful (< 400) response.

        Raises:
            MasterAPIError: On status >= 400, on a body that is not valid
                JSON, or on JSON that is not an object.
        """
        content_type = response.headers.get("Content-Type", "")
        payload: Any = None
        if "application/json" in content_type:
            try:
                payload = response.json()
            # ValueError is the common base of json.JSONDecodeError and the
            # simplejson error that requests raises when simplejson is installed.
            except ValueError:
                LOGGER.warning("Response contained invalid JSON", extra={"status": response.status_code})

        if response.status_code >= 400:
            error_payload = payload if isinstance(payload, dict) else None
            message = error_payload.get("error") if error_payload else None
            if not message:
                # No usable "error" field: fall back to the raw body instead
                # of reporting the literal string "None".
                message = response.text
            raise MasterAPIError(
                f"{error_prefix}: {message}",
                status_code=response.status_code,
                payload=error_payload,
            )

        if payload is None:
            # Either the Content-Type was not JSON or decoding failed above;
            # try (again) so a mislabelled but valid JSON body is still accepted.
            try:
                payload = response.json()
            except ValueError as exc:
                raise MasterAPIError("Master returned non-JSON payload", response.status_code) from exc

        if not isinstance(payload, dict):
            # Callers use the result as a mapping (e.g. ``.get("id")``).
            raise MasterAPIError("Master returned unexpected JSON payload", response.status_code)
        return payload
def _detect_gpu_count(config: AgentConfig) -> int:
    """Return the number of GPUs, honouring the configured override.

    When ``config.gpu_number_override`` is not ``None`` it is returned as-is.
    Otherwise ``nvidia-smi -L`` is executed and its ``GPU <n>: ...`` lines
    are counted. Any failure to run the tool yields 0.
    """
    if config.gpu_number_override is not None:
        return config.gpu_number_override

    try:
        proc = subprocess.run(
            ["nvidia-smi", "-L"],
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=5,
        )
    except FileNotFoundError:
        LOGGER.debug("nvidia-smi not available; assuming 0 GPUs")
        return 0
    except (OSError, subprocess.SubprocessError) as exc:
        # OSError covers e.g. a non-executable binary (PermissionError);
        # SubprocessError covers the 5 second timeout.
        LOGGER.warning("nvidia-smi invocation failed", extra={"error": str(exc)})
        return 0

    if proc.returncode != 0:
        LOGGER.debug("nvidia-smi returned non-zero", extra={"stderr": proc.stderr.strip()})
        return 0

    # Only top-level "GPU <index>: ..." lines denote physical devices; with
    # MIG enabled the tool also prints indented "MIG ..." lines that must
    # not be counted.
    return sum(1 for line in proc.stdout.splitlines() if line.startswith("GPU "))


def _detect_ip_address() -> str:
    """Return the container's outbound IPv4 address.

    A UDP socket is "connected" to a public address (no packet is sent) so
    the kernel selects the outgoing interface. If that fails the local
    hostname is resolved instead, and ``127.0.0.1`` is the last resort.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.connect(("8.8.8.8", 80))
            return sock.getsockname()[0]
    except OSError:
        LOGGER.debug("UDP socket trick failed; falling back to hostname lookup")

    try:
        return socket.gethostbyname(socket.gethostname())
    except OSError:
        LOGGER.warning("Unable to resolve hostname to IP; defaulting to 127.0.0.1")
        return "127.0.0.1"
@dataclass(frozen=True)
class AgentConfig:
    """Immutable agent settings resolved from the config file and environment."""

    hostname: str
    node_file: str
    version: str
    master_endpoint: str
    report_interval_seconds: int
    # Directory template; a ``{hostname}`` placeholder is substituted by ``health_dir``.
    health_dir_template: str
    # Explicit GPU count from configuration, or ``None`` when not configured.
    gpu_number_override: int | None
    request_timeout_seconds: int = 10

    @property
    def health_dir(self) -> str:
        """Return the health directory with ``{hostname}`` filled in."""
        return self.health_dir_template.format(hostname=self.hostname)


def _parse_config_file(path: str) -> dict[str, str]:
    """Parse a ``KEY=VALUE`` file into a dict with upper-cased keys.

    Blank lines, ``#`` comments and lines without ``=`` are ignored; keys
    and values are stripped of surrounding whitespace.

    Raises:
        FileNotFoundError: *path* does not exist.
    """
    result: dict[str, str] = {}
    try:
        with open(path, "r", encoding="utf-8") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                result[key.strip().upper()] = value.strip()
    except FileNotFoundError:
        raise FileNotFoundError(f"Agent config file not found: {path}") from None
    return result


def load_config(path: str) -> AgentConfig:
    """Read the config file, overlay environment variables, return an AgentConfig.

    Lookup order per key: ``AGENT_<KEY>`` environment variable, then the
    bare ``<KEY>`` environment variable (only when ``AGENT_FORCE_ENV`` is
    truthy), then the config file, then the built-in default.

    As a side effect the parent directory of the node file and the health
    directory are created if missing.

    Raises:
        FileNotFoundError: The config file does not exist.
        ValueError: A required key is missing or empty, or a numeric key
            is not a valid number.
    """
    config_values = _parse_config_file(path)
    force_env = os.environ.get("AGENT_FORCE_ENV", "0").lower() in {"1", "true", "yes"}

    def read_key(key: str, default: str | None = None, *, required: bool = False) -> str:
        env_key = f"AGENT_{key}"
        if env_key in os.environ:
            value = os.environ[env_key]
        elif force_env and key in os.environ:
            value = os.environ[key]
        elif key in config_values:
            value = config_values[key]
        elif default is not None:
            value = default
        else:
            value = ""
        # A key that is present but blank (e.g. "MASTER_ENDPOINT=") is as
        # unusable as a missing one; reject it here instead of building
        # paths/URLs such as "/private/argus/agent//node.json" or "http://".
        if required and not value.strip():
            raise ValueError(f"Missing required configuration key: {key}")
        return value

    hostname = read_key("HOSTNAME", required=True)
    node_file = read_key("NODE_FILE", f"/private/argus/agent/{hostname}/node.json")
    version = read_key("VERSION", "1.0.0")
    master_endpoint = read_key("MASTER_ENDPOINT", required=True)
    report_interval_raw = read_key("REPORT_INTERVAL_SECONDS", "60")
    health_dir_template = read_key(
        "SUBMODULE_HEALTH_FILE_DIR",
        "/private/argus/agent/health/{hostname}/",
    )
    gpu_override_raw = read_key("GPU_NUMBER", "")

    try:
        report_interval_seconds = int(report_interval_raw)
    except ValueError as exc:
        raise ValueError("REPORT_INTERVAL_SECONDS must be an integer") from exc
    if report_interval_seconds <= 0:
        raise ValueError("REPORT_INTERVAL_SECONDS must be positive")

    gpu_override = None
    if gpu_override_raw:
        try:
            gpu_override = int(gpu_override_raw)
        except ValueError as exc:
            raise ValueError("GPU_NUMBER must be an integer when provided") from exc
        if gpu_override < 0:
            raise ValueError("GPU_NUMBER must be non-negative")

    # Accept a bare "host:port" and default it to plain HTTP.
    if not master_endpoint.startswith(("http://", "https://")):
        master_endpoint = f"http://{master_endpoint}"

    Path(node_file).parent.mkdir(parents=True, exist_ok=True)
    Path(health_dir_template.format(hostname=hostname)).mkdir(parents=True, exist_ok=True)

    return AgentConfig(
        hostname=hostname,
        node_file=node_file,
        version=version,
        master_endpoint=master_endpoint.rstrip("/"),
        report_interval_seconds=report_interval_seconds,
        health_dir_template=health_dir_template,
        gpu_number_override=gpu_override,
    )
_LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s - %(message)s"

# Attribute names present on every LogRecord (plus the two that formatting
# adds). Anything else on a record was supplied by the caller via ``extra=``.
_RESERVED_RECORD_ATTRS = frozenset(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


class _ExtraFieldsFormatter(logging.Formatter):
    """Formatter that appends caller-supplied ``extra=`` fields to the message.

    The agent passes its context (error text, node id, status code, ...)
    through ``extra``; a plain format string would drop all of it.
    """

    def formatMessage(self, record: logging.LogRecord) -> str:
        rendered = super().formatMessage(record)
        extras = {key: value for key, value in vars(record).items() if key not in _RESERVED_RECORD_ATTRS}
        if not extras:
            return rendered
        suffix = " ".join(f"{key}={value!r}" for key, value in sorted(extras.items()))
        return f"{rendered} [{suffix}]"


def setup_logging() -> None:
    """Configure the root logger once, using the level in ``AGENT_LOG_LEVEL``.

    Unknown level names fall back to INFO. Calling this repeatedly is
    harmless: ``logging.basicConfig`` does nothing once the root logger has
    handlers.
    """
    level_name = os.environ.get("AGENT_LOG_LEVEL", "INFO").upper()
    level = getattr(logging, level_name, None)
    if not isinstance(level, int):
        # Covers unknown names and upper-case module attributes that are
        # not levels (e.g. BASIC_FORMAT), which basicConfig would reject.
        level = logging.INFO
    handler = logging.StreamHandler()
    handler.setFormatter(_ExtraFieldsFormatter(_LOG_FORMAT))
    logging.basicConfig(level=level, handlers=[handler])


def get_logger(name: str) -> logging.Logger:
    """Return the logger called *name*, making sure logging is configured."""
    setup_logging()
    return logging.getLogger(name)
def main(argv: Optional[list[str]] = None) -> int:
    """Run the agent: load config, register with the master, report status.

    Args:
        argv: Command-line arguments without the program name; ``None``
            means "use ``sys.argv[1:]``".

    Returns:
        Process exit code: 0 on clean shutdown, 1 when configuration cannot
        be loaded or the master returns no node id.
    """
    setup_logging()
    # Only fall back to sys.argv when no list was given; an explicit empty
    # list must stay empty.
    args = parse_args(sys.argv[1:] if argv is None else argv)

    stop_signal = StopSignal()
    signal.signal(signal.SIGTERM, stop_signal.set)
    signal.signal(signal.SIGINT, stop_signal.set)

    try:
        config_path = args.config_path or _default_config_path()
        config = load_config(config_path)
    except Exception as exc:
        LOGGER.error("Failed to load configuration", extra={"error": str(exc)})
        return 1

    LOGGER.info(
        "Agent starting",
        extra={
            "hostname": config.hostname,
            "master_endpoint": config.master_endpoint,
            "node_file": config.node_file,
        },
    )

    client = AgentClient(config.master_endpoint, timeout=config.request_timeout_seconds)

    # Reuse the id persisted by a previous run, if any.
    node_state = load_node_state(config.node_file) or {}
    node_id = node_state.get("id")

    # Register (or re-register) with the master, retrying until it succeeds
    # or a stop signal arrives.
    register_response = _register_with_retry(client, config, node_id, stop_signal)
    if register_response is None:
        LOGGER.info("Registration aborted due to shutdown signal")
        return 0

    node_id = register_response.get("id")
    if not node_id:
        LOGGER.error("Master did not return node id; aborting")
        return 1
    save_node_state(config.node_file, register_response)

    LOGGER.info("Entering status report loop", extra={"node_id": node_id})
    _status_loop(client, config, node_id, stop_signal)
    return 0


def _default_config_path() -> str:
    """Return the config path derived from this machine's hostname."""
    from socket import gethostname

    hostname = gethostname()
    return f"/private/argus/agent/{hostname}/config"


def _sleep_unless_stopped(seconds: int, stop_signal: StopSignal) -> None:
    """Sleep up to *seconds*, waking every second to honour *stop_signal*."""
    for _ in range(seconds):
        if stop_signal.is_set():
            return
        time.sleep(1)


def _register_with_retry(
    client: AgentClient,
    config: AgentConfig,
    node_id: Optional[str],
    stop_signal: StopSignal,
) -> Optional[dict]:
    """Register with the master, retrying with exponential back-off.

    The delay starts at 5 seconds and doubles up to 60. A 404 for a known
    node id clears the local state so the next attempt registers as new.

    Returns:
        The master's node object, or ``None`` when *stop_signal* was set
        before registration succeeded.
    """
    backoff = 5
    while not stop_signal.is_set():
        payload = {
            "name": config.hostname,
            "type": "agent",
            "meta_data": collect_metadata(config),
            "version": config.version,
        }
        if node_id:
            payload["id"] = node_id

        try:
            response = client.register_node(payload)
            LOGGER.info("Registration successful", extra={"node_id": response.get("id")})
            save_node_state(config.node_file, response)
            return response
        except MasterAPIError as exc:
            if exc.status_code == 404 and node_id:
                LOGGER.warning(
                    "Master does not recognise node id; clearing local node state",
                    extra={"node_id": node_id},
                )
                clear_node_state(config.node_file)
                node_id = None
            elif exc.status_code == 500 and node_id:
                # An id/name mismatch usually points at a configuration
                # problem; record it but keep retrying.
                LOGGER.error(
                    "Master rejected node due to id/name mismatch; will retry",
                    extra={"node_id": node_id},
                )
            else:
                LOGGER.error("Registration failed", extra={"status_code": exc.status_code, "error": str(exc)})
        except Exception as exc:  # pragma: no cover - defensive
            LOGGER.exception("Unexpected error during registration", extra={"error": str(exc)})

        # Poll the stop signal while waiting so shutdown is not delayed by
        # up to a full back-off period.
        _sleep_unless_stopped(min(backoff, 60), stop_signal)
        backoff = min(backoff * 2, 60)
    return None
def load_node_state(path: str) -> Optional[Dict[str, Any]]:
    """Read the persisted node object so a restart can reuse the previous id.

    Returns:
        The stored JSON object, or ``None`` when the file is missing, is
        not valid JSON text, or does not contain a JSON object.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            state = json.load(handle)
    except FileNotFoundError:
        return None
    except ValueError as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        LOGGER.warning("node.json is invalid JSON; ignoring", extra={"error": str(exc)})
        return None
    if not isinstance(state, dict):
        # Callers treat the state as a mapping; anything else is unusable.
        LOGGER.warning("node.json does not contain a JSON object; ignoring", extra={"path": path})
        return None
    return state


def save_node_state(path: str, data: Dict[str, Any]) -> None:
    """Write *data* to *path* atomically.

    The JSON is written to a temporary file in the same directory, synced
    to disk and then renamed over *path*, so readers never observe a
    partially written file. The temporary file is removed if any step
    fails, and the original error is re-raised.
    """
    directory = Path(path).parent
    directory.mkdir(parents=True, exist_ok=True)
    temp_path: Optional[str] = None
    try:
        with tempfile.NamedTemporaryFile("w", dir=directory, delete=False, encoding="utf-8") as tmp:
            temp_path = tmp.name
            json.dump(data, tmp, separators=(",", ":"))
            tmp.flush()
            os.fsync(tmp.fileno())
        os.replace(temp_path, path)
    except BaseException:
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                # Already renamed or never created; nothing left to clean up.
                pass
        raise


def clear_node_state(path: str) -> None:
    """Delete the persisted node file; a missing file is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        return
    except OSError as exc:
        LOGGER.warning("Failed to remove node state file", extra={"error": str(exc), "path": path})
#!/usr/bin/env bash
# Build the Argus agent Docker image from the agent project root.
#
# Options:
#   --intranet     install Python packages from an intranet mirror
#                  (override the mirror URL with INTRANET_INDEX)
#   --tag <tag>    image tag to build (default: $IMAGE_TAG or argus-agent:dev)
set -euo pipefail

usage() {
  echo "Usage: $0 [--intranet] [--tag <image:tag>]" >&2
}

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
IMAGE_TAG="${IMAGE_TAG:-argus-agent:dev}"
BUILD_ARGS=()

while [[ "$#" -gt 0 ]]; do
  case "$1" in
    --intranet)
      INTRANET_INDEX="${INTRANET_INDEX:-https://pypi.tuna.tsinghua.edu.cn/simple}"
      BUILD_ARGS+=("--build-arg" "PIP_INDEX_URL=${INTRANET_INDEX}")
      shift
      ;;
    --tag)
      [[ $# -ge 2 ]] || { usage; exit 1; }
      IMAGE_TAG="$2"
      shift 2
      ;;
    -h|--help)
      usage
      exit 0
      ;;
    *)
      echo "Unknown option: $1" >&2
      usage
      exit 1
      ;;
  esac
done

echo "[INFO] Building image $IMAGE_TAG"
# "${BUILD_ARGS[@]}" on an empty array is an "unbound variable" error under
# `set -u` in bash older than 4.4; the ${arr[@]+...} form expands to nothing
# in that case and to the quoted elements otherwise.
docker build ${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"} -t "$IMAGE_TAG" "$PROJECT_ROOT"
echo "[OK] Image $IMAGE_TAG built"
#!/usr/bin/env bash
# Export the agent Docker image to a tarball.
# Defaults: image $IMAGE_TAG (or argus-agent:dev), output images/argus-agent-dev.tar
# under the agent project root.
set -euo pipefail

usage() {
  echo "Usage: $0 [--tag ] [--output ]" >&2
}

# Print usage and exit 1 unless at least two arguments remain (option + value).
require_value() {
  if [[ "$1" -lt 2 ]]; then
    usage
    exit 1
  fi
}

script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
project_root="$(cd "$script_dir/.." && pwd)"
IMAGE_TAG="${IMAGE_TAG:-argus-agent:dev}"
output_path="$project_root/images/argus-agent-dev.tar"

while (( $# > 0 )); do
  case "$1" in
    --tag)
      require_value "$#"
      IMAGE_TAG="$2"
      shift 2
      ;;
    --output)
      require_value "$#"
      output_path="$2"
      shift 2
      ;;
    -h|--help)
      usage
      exit 0
      ;;
    *)
      echo "Unknown option: $1" >&2
      usage
      exit 1
      ;;
  esac
done

# Make sure the destination directory exists before docker writes to it.
mkdir -p "$(dirname "$output_path")"
echo "[INFO] Saving image $IMAGE_TAG to $output_path"
docker image save "$IMAGE_TAG" -o "$output_path"
echo "[OK] Image saved"
#!/usr/bin/env bash
# Run every agent E2E step in order; `set -e` stops at the first failing step.
set -euo pipefail

script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

# Execute one step script from this directory, with progress markers around it.
run_step() {
  local step="$1"
  echo "[TEST] Running $step"
  "$script_dir/$step"
  echo "[TEST] $step completed"
  echo
}

steps=(
  "01_bootstrap.sh"
  "02_up.sh"
  "03_wait_and_assert_registration.sh"
  "04_write_health_files.sh"
  "05_assert_status_on_master.sh"
  "06_restart_agent_and_reregister.sh"
  "07_down.sh"
)

for step in "${steps[@]}"; do
  run_step "$step"
done

echo "[TEST] Agent module E2E tests completed"
#!/usr/bin/env bash
# E2E step 02: remove leftovers from earlier runs, then start the
# master + agent compose stack in the background.
set -euo pipefail

script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
test_root="$(cd "$script_dir/.." && pwd)"

# Use the `docker compose` plugin when available, else the standalone binary.
compose() {
  if docker compose version >/dev/null 2>&1; then
    docker compose "$@"
    return
  fi
  docker-compose "$@"
}

# Best-effort cleanup: a manually started agent container and the compose
# network may be left over from a previous run.
docker container rm -f argus-agent-e2e >/dev/null 2>&1 || true
docker network rm tests_default >/dev/null 2>&1 || true

(
  cd "$test_root"
  compose down --remove-orphans || true
  compose up -d
)

echo "[INFO] Master+Agent stack started"
&& pwd)" +TMP_ROOT="$TEST_ROOT/tmp" +API_BASE="http://localhost:32300/api/v1/master" +AGENT_HOSTNAME="dev-e2euser-e2einst-pod-0" +NODE_FILE="$TEST_ROOT/private/argus/agent/$AGENT_HOSTNAME/node.json" + +mkdir -p "$TMP_ROOT" + +node_id="" +for _ in {1..30}; do + sleep 2 + response=$(curl -sS "$API_BASE/nodes" || true) + if [[ -z "$response" ]]; then + continue + fi + list_file="$TMP_ROOT/nodes_list.json" + echo "$response" > "$list_file" + node_id=$(python3 - "$list_file" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + nodes = json.load(handle) +print(nodes[0]["id"] if nodes else "") +PY +) + if [[ -n "$node_id" ]]; then + break + fi + done + +if [[ -z "$node_id" ]]; then + echo "[ERROR] Agent did not register within timeout" >&2 + exit 1 +fi + +echo "$node_id" > "$TMP_ROOT/node_id" + +if [[ ! -f "$NODE_FILE" ]]; then + echo "[ERROR] node.json not created at $NODE_FILE" >&2 + exit 1 +fi + +python3 - "$NODE_FILE" <<'PY' +import json, sys +with open(sys.argv[1]) as handle: + node = json.load(handle) +assert "id" in node and node["id"], "node.json missing id" +PY + +detail_file="$TMP_ROOT/initial_detail.json" +curl -sS "$API_BASE/nodes/$node_id" -o "$detail_file" +python3 - "$detail_file" "$TMP_ROOT/initial_ip" <<'PY' +import json, sys, pathlib +with open(sys.argv[1]) as handle: + node = json.load(handle) +ip = node["meta_data"].get("ip") +if not ip: + raise SystemExit("meta_data.ip missing") +pathlib.Path(sys.argv[2]).write_text(ip) +PY + +echo "[INFO] Agent registered with node id $node_id" diff --git a/src/agent/tests/scripts/04_write_health_files.sh b/src/agent/tests/scripts/04_write_health_files.sh new file mode 100755 index 0000000..d5ec974 --- /dev/null +++ b/src/agent/tests/scripts/04_write_health_files.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +TEST_ROOT="$(cd "$SCRIPT_DIR/.." 
#!/usr/bin/env bash
# E2E step 06: replace the agent container with one that has a different,
# fixed IP and verify the master keeps the node id while picking up the new
# IP and a newer last_updated value.
set -euo pipefail

script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
test_root="$(cd "$script_dir/.." && pwd)"
tmp_root="$test_root/tmp"
api_base="http://localhost:32300/api/v1/master"
node_id="$(cat "$tmp_root/node_id")"
agent_hostname="dev-e2euser-e2einst-pod-0"
network_name="tests_default"
new_agent_ip="172.28.0.200"

# Use the `docker compose` plugin when available, else the standalone binary.
compose() {
  if docker compose version >/dev/null 2>&1; then
    docker compose "$@"
    return
  fi
  docker-compose "$@"
}

# Succeeds once the node document in $1 shows the expected id, the new IP
# and a last_updated value different from the one captured before restart.
node_reflects_restart() {
  python3 - "$1" "$prev_last_updated" "$node_id" "$prev_ip" "$new_agent_ip" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
prev_last_updated = sys.argv[2]
expected_id = sys.argv[3]
old_ip = sys.argv[4]
expected_ip = sys.argv[5]
last_updated = node.get("last_updated")
current_ip = node["meta_data"].get("ip")
assert node["id"] == expected_id
if current_ip != expected_ip:
    raise SystemExit(1)
if current_ip == old_ip:
    raise SystemExit(1)
if not last_updated or last_updated == prev_last_updated:
    raise SystemExit(1)
PY
}

# Capture the node's state before touching the agent.
before_file="$tmp_root/before_restart.json"
curl -sS "$api_base/nodes/$node_id" -o "$before_file"
prev_last_updated=$(python3 - "$before_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
print(node.get("last_updated", ""))
PY
)
prev_ip=$(python3 - "$before_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
    node = json.load(handle)
print(node["meta_data"].get("ip", ""))
PY
)
initial_ip=$(cat "$tmp_root/initial_ip")
if [[ "$prev_ip" != "$initial_ip" ]]; then
  echo "[ERROR] Expected initial IP $initial_ip, got $prev_ip" >&2
  exit 1
fi

# Remove the compose-managed agent (and any stray container of that name).
(
  cd "$test_root"
  compose rm -sf agent
)
docker container rm -f argus-agent-e2e >/dev/null 2>&1 || true

agent_dir="$test_root/private/argus/agent/$agent_hostname"
health_dir="$test_root/private/argus/agent/health/$agent_hostname"

# Start the container running `sleep` first, so the network state at
# registration time is fully under our control.
run_args=(
  --name argus-agent-e2e
  --hostname "$agent_hostname"
  --network "$network_name"
  --ip "$new_agent_ip"
  -v "$agent_dir:/private/argus/agent/$agent_hostname"
  -v "$health_dir:/private/argus/agent/health/$agent_hostname"
)
if ! docker run -d "${run_args[@]}" argus-agent:dev sleep 300 >/dev/null; then
  echo "[ERROR] Failed to start agent container with custom IP" >&2
  exit 1
fi

# Launch the real agent process inside the container.
if ! docker exec -d argus-agent-e2e python -m app.main; then
  echo "[ERROR] Failed to spawn agent process inside container" >&2
  exit 1
fi

# Poll the master (20 attempts, 3 seconds apart) for the re-registration.
success=false
detail_file="$tmp_root/post_restart.json"
attempt=0
while (( attempt < 20 )); do
  attempt=$((attempt + 1))
  sleep 3
  curl -sS "$api_base/nodes/$node_id" -o "$detail_file" || continue
  if node_reflects_restart "$detail_file"; then
    success=true
    break
  fi
done

if [[ "$success" != true ]]; then
  echo "[ERROR] Agent did not report expected new IP $new_agent_ip after restart" >&2
  exit 1
fi

echo "[INFO] Agent restart produced successful re-registration with IP change"
#!/usr/bin/env bash
# E2E step 07: stop the compose stack and delete the generated test data.
set -euo pipefail

script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
test_root="$(cd "$script_dir/.." && pwd)"

# Use the `docker compose` plugin when available, else the standalone binary.
compose() {
  if docker compose version >/dev/null 2>&1; then
    docker compose "$@"
    return
  fi
  docker-compose "$@"
}

# The agent container may have been started outside compose; remove it
# explicitly and ignore the error when it does not exist.
docker container rm -f argus-agent-e2e >/dev/null 2>&1 || true

(
  cd "$test_root"
  compose down --remove-orphans
)

# Drop the provisioned configs/health files and scratch data.
rm -rf "$test_root/private"
rm -rf "$test_root/tmp"

echo "[INFO] Agent E2E environment cleaned up"