[#1] agent模块开发

This commit is contained in:
yuyr 2025-09-24 04:19:37 +00:00
parent d9461b3991
commit c3f111e12f
25 changed files with 1123 additions and 0 deletions

23
src/agent/Dockerfile Normal file
View File

@ -0,0 +1,23 @@
FROM python:3.11-slim
SHELL ["/bin/bash", "-c"]
ARG PIP_INDEX_URL=
ENV PIP_NO_CACHE_DIR=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/app
WORKDIR /app
COPY requirements.txt ./
RUN set -euxo pipefail \
&& python -m pip install --upgrade pip \
&& if [[ -n "$PIP_INDEX_URL" ]]; then \
PIP_INDEX_URL="$PIP_INDEX_URL" python -m pip install -r requirements.txt; \
else \
python -m pip install -r requirements.txt; \
fi
COPY app ./app
CMD ["python", "-m", "app.main"]

View File

@ -0,0 +1,31 @@
# Argus Agent Module
Python agent that registers with the Argus master service, persists node information, gathers host metadata, reads module health files, and periodically reports status.
## Build & Run
```bash
cd src/agent
./scripts/build_images.sh # builds argus-agent:dev
```
Runtime expects a configuration file (generated by installer) at `/private/argus/agent/<hostname>/config`. Key fields:
- `HOSTNAME`, `NODE_FILE`, `VERSION`
- `MASTER_ENDPOINT` (e.g. `http://master:3000`)
- `REPORT_INTERVAL_SECONDS`
- `SUBMODULE_HEALTH_FILE_DIR` (supports `{hostname}` placeholder)
- optional `GPU_NUMBER`
Health files live under `/private/argus/agent/health/<hostname>/` and must follow `<prefix>-*.json` naming (e.g. `log-fluentbit.json`). The agent sends parsed JSON objects keyed by file stem.
## Tests
Docker-based E2E stack (master + agent):
```bash
cd src/agent/tests
./scripts/00_e2e_test.sh
```
The scripts provision configs/health directories under `tests/private/` and clean up via `07_down.sh`.

View File

60
src/agent/app/client.py Normal file
View File

@ -0,0 +1,60 @@
from __future__ import annotations
import json
from typing import Any, Dict, Optional
import requests
from .log import get_logger
LOGGER = get_logger("argus.agent.client")
class MasterAPIError(Exception):
def __init__(self, message: str, status_code: int, payload: Optional[Dict[str, Any]] = None) -> None:
super().__init__(message)
self.status_code = status_code
self.payload = payload or {}
class AgentClient:
def __init__(self, base_url: str, *, timeout: int = 10) -> None:
self._base_url = base_url.rstrip("/")
self._timeout = timeout
self._session = requests.Session()
def register_node(self, body: Dict[str, Any]) -> Dict[str, Any]:
"""调用 master 注册接口,返回节点对象。"""
url = f"{self._base_url}/api/v1/master/nodes"
response = self._session.post(url, json=body, timeout=self._timeout)
return self._parse_response(response, "Failed to register node")
def update_status(self, node_id: str, body: Dict[str, Any]) -> Dict[str, Any]:
"""上报健康信息,由 master 更新 last_report。"""
url = f"{self._base_url}/api/v1/master/nodes/{node_id}/status"
response = self._session.put(url, json=body, timeout=self._timeout)
return self._parse_response(response, "Failed to update node status")
def _parse_response(self, response: requests.Response, error_prefix: str) -> Dict[str, Any]:
content_type = response.headers.get("Content-Type", "")
payload: Dict[str, Any] | None = None
if "application/json" in content_type:
try:
payload = response.json()
except json.JSONDecodeError:
LOGGER.warning("Response contained invalid JSON", extra={"status": response.status_code})
if response.status_code >= 400:
message = payload.get("error") if isinstance(payload, dict) else response.text
raise MasterAPIError(
f"{error_prefix}: {message}",
status_code=response.status_code,
payload=payload if isinstance(payload, dict) else None,
)
if payload is None:
try:
payload = response.json()
except json.JSONDecodeError as exc:
raise MasterAPIError("Master returned non-JSON payload", response.status_code) from exc
return payload

114
src/agent/app/collector.py Normal file
View File

@ -0,0 +1,114 @@
from __future__ import annotations
import os
import re
import socket
import subprocess
from pathlib import Path
from typing import Any, Dict
from .config import AgentConfig
from .log import get_logger
LOGGER = get_logger("argus.agent.collector")
_HOSTNAME_PATTERN = re.compile(r"^([^-]+)-([^-]+)-([^-]+)-.*$")
def collect_metadata(config: AgentConfig) -> Dict[str, Any]:
"""汇总节点注册需要的静态信息。"""
hostname = config.hostname
env, user, instance = _parse_hostname(hostname)
meta = {
"hostname": hostname,
"ip": _detect_ip_address(),
"env": env,
"user": user,
"instance": instance,
"cpu_number": _detect_cpu_count(),
"memory_in_bytes": _detect_memory_bytes(),
"gpu_number": _detect_gpu_count(config),
}
return meta
def _parse_hostname(hostname: str) -> tuple[str, str, str]:
"""按照约定的 env-user-instance 前缀拆解主机名。"""
match = _HOSTNAME_PATTERN.match(hostname)
if not match:
LOGGER.warning("Hostname does not match expected pattern", extra={"hostname": hostname})
return "", "", ""
return match.group(1), match.group(2), match.group(3)
def _detect_cpu_count() -> int:
count = os.cpu_count()
return count if count is not None else 0
def _detect_memory_bytes() -> int:
"""优先读取 cgroup 限额,失败时退回 /proc/meminfo。"""
cgroup_path = Path("/sys/fs/cgroup/memory.max")
try:
raw = cgroup_path.read_text(encoding="utf-8").strip()
if raw and raw != "max":
return int(raw)
except FileNotFoundError:
LOGGER.debug("cgroup memory.max not found, falling back to /proc/meminfo")
except ValueError:
LOGGER.warning("Failed to parse memory.max, falling back", extra={"value": raw})
try:
with open("/proc/meminfo", "r", encoding="utf-8") as handle:
for line in handle:
if line.startswith("MemTotal:"):
parts = line.split()
if len(parts) >= 2:
return int(parts[1]) * 1024
except FileNotFoundError:
LOGGER.error("/proc/meminfo not found; defaulting memory to 0")
return 0
def _detect_gpu_count(config: AgentConfig) -> int:
"""采集 GPU 数量,可被配置覆盖。"""
if config.gpu_number_override is not None:
return config.gpu_number_override
try:
proc = subprocess.run(
["nvidia-smi", "-L"],
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=5,
)
except FileNotFoundError:
LOGGER.debug("nvidia-smi not available; assuming 0 GPUs")
return 0
except subprocess.SubprocessError as exc:
LOGGER.warning("nvidia-smi invocation failed", extra={"error": str(exc)})
return 0
if proc.returncode != 0:
LOGGER.debug("nvidia-smi returned non-zero", extra={"stderr": proc.stderr.strip()})
return 0
count = sum(1 for line in proc.stdout.splitlines() if line.strip())
return count
def _detect_ip_address() -> str:
"""尝试通过 UDP socket 获得容器出口 IP失败则回退解析主机名。"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.connect(("8.8.8.8", 80))
return sock.getsockname()[0]
except OSError:
LOGGER.debug("UDP socket trick failed; falling back to hostname lookup")
try:
return socket.gethostbyname(socket.gethostname())
except OSError:
LOGGER.warning("Unable to resolve hostname to IP; defaulting to 127.0.0.1")
return "127.0.0.1"

101
src/agent/app/config.py Normal file
View File

@ -0,0 +1,101 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class AgentConfig:
hostname: str
node_file: str
version: str
master_endpoint: str
report_interval_seconds: int
health_dir_template: str
gpu_number_override: int | None
request_timeout_seconds: int = 10
@property
def health_dir(self) -> str:
return self.health_dir_template.format(hostname=self.hostname)
def _parse_config_file(path: str) -> dict[str, str]:
result: dict[str, str] = {}
try:
with open(path, "r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, value = line.split("=", 1)
result[key.strip().upper()] = value.strip()
except FileNotFoundError:
raise FileNotFoundError(f"Agent config file not found: {path}") from None
return result
def load_config(path: str) -> AgentConfig:
"""读取配置文件并结合环境变量,返回 AgentConfig。"""
config_values = _parse_config_file(path)
force_env = os.environ.get("AGENT_FORCE_ENV", "0").lower() in {"1", "true", "yes"}
def read_key(key: str, default: str | None = None, *, required: bool = False) -> str:
env_key = f"AGENT_{key}"
if env_key in os.environ:
return os.environ[env_key]
if force_env and key in os.environ:
return os.environ[key]
if key in config_values:
return config_values[key]
if default is not None:
return default
if required:
raise ValueError(f"Missing required configuration key: {key}")
return ""
hostname = read_key("HOSTNAME", required=True)
node_file = read_key("NODE_FILE", f"/private/argus/agent/{hostname}/node.json")
version = read_key("VERSION", "1.0.0")
master_endpoint = read_key("MASTER_ENDPOINT", required=True)
report_interval_raw = read_key("REPORT_INTERVAL_SECONDS", "60")
health_dir_template = read_key(
"SUBMODULE_HEALTH_FILE_DIR",
f"/private/argus/agent/health/{{hostname}}/",
)
gpu_override_raw = read_key("GPU_NUMBER", "")
try:
report_interval_seconds = int(report_interval_raw)
except ValueError as exc:
raise ValueError("REPORT_INTERVAL_SECONDS must be an integer") from exc
if report_interval_seconds <= 0:
raise ValueError("REPORT_INTERVAL_SECONDS must be positive")
gpu_override = None
if gpu_override_raw:
try:
gpu_override = int(gpu_override_raw)
except ValueError as exc:
raise ValueError("GPU_NUMBER must be an integer when provided") from exc
if gpu_override < 0:
raise ValueError("GPU_NUMBER must be non-negative")
if not master_endpoint.startswith("http://") and not master_endpoint.startswith("https://"):
master_endpoint = f"http://{master_endpoint}"
Path(node_file).parent.mkdir(parents=True, exist_ok=True)
Path(health_dir_template.format(hostname=hostname)).mkdir(parents=True, exist_ok=True)
return AgentConfig(
hostname=hostname,
node_file=node_file,
version=version,
master_endpoint=master_endpoint.rstrip("/"),
report_interval_seconds=report_interval_seconds,
health_dir_template=health_dir_template,
gpu_number_override=gpu_override,
)

View File

@ -0,0 +1,32 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict
from .log import get_logger
LOGGER = get_logger("argus.agent.health")
def read_health_directory(path: str) -> Dict[str, Any]:
"""读取目录中所有 <prefix>-*.json 文件并返回 JSON 映射。"""
result: Dict[str, Any] = {}
directory = Path(path)
if not directory.exists():
LOGGER.debug("Health directory does not exist", extra={"path": str(directory)})
return result
for health_file in sorted(directory.glob("*.json")):
if "-" not in health_file.stem:
LOGGER.debug("Skipping non-prefixed health file", extra={"file": health_file.name})
continue
try:
with health_file.open("r", encoding="utf-8") as handle:
content = json.load(handle)
result[health_file.stem] = content
except json.JSONDecodeError as exc:
LOGGER.warning("Failed to parse health file", extra={"file": health_file.name, "error": str(exc)})
except OSError as exc:
LOGGER.warning("Failed to read health file", extra={"file": health_file.name, "error": str(exc)})
return result

18
src/agent/app/log.py Normal file
View File

@ -0,0 +1,18 @@
from __future__ import annotations
import logging
import os
_LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s - %(message)s"
def setup_logging() -> None:
level_name = os.environ.get("AGENT_LOG_LEVEL", "INFO").upper()
level = getattr(logging, level_name, logging.INFO)
logging.basicConfig(level=level, format=_LOG_FORMAT)
def get_logger(name: str) -> logging.Logger:
setup_logging()
return logging.getLogger(name)

185
src/agent/app/main.py Normal file
View File

@ -0,0 +1,185 @@
from __future__ import annotations
import argparse
import signal
import sys
import time
from datetime import datetime, timezone
from typing import Optional
from .client import AgentClient, MasterAPIError
from .collector import collect_metadata
from .config import AgentConfig, load_config
from .health_reader import read_health_directory
from .log import get_logger, setup_logging
from .state import clear_node_state, load_node_state, save_node_state
LOGGER = get_logger("argus.agent")
def _current_timestamp() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
class StopSignal:
def __init__(self) -> None:
self._stop = False
def set(self, *_args) -> None: # type: ignore[override]
self._stop = True
def is_set(self) -> bool:
return self._stop
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Argus agent")
parser.add_argument(
"--config",
dest="config_path",
default=None,
help="Path to agent config file",
)
return parser.parse_args(argv)
def main(argv: Optional[list[str]] = None) -> int:
setup_logging()
args = parse_args(argv or sys.argv[1:])
stop_signal = StopSignal()
signal.signal(signal.SIGTERM, stop_signal.set)
signal.signal(signal.SIGINT, stop_signal.set)
try:
config_path = args.config_path or _default_config_path()
config = load_config(config_path)
except Exception as exc:
LOGGER.error("Failed to load configuration", extra={"error": str(exc)})
return 1
LOGGER.info(
"Agent starting",
extra={
"hostname": config.hostname,
"master_endpoint": config.master_endpoint,
"node_file": config.node_file,
},
)
client = AgentClient(config.master_endpoint, timeout=config.request_timeout_seconds)
node_state = load_node_state(config.node_file) or {}
node_id = node_state.get("id")
# 与 master 建立注册关系(支持重注册),失败则重试
register_response = _register_with_retry(client, config, node_id, stop_signal)
if register_response is None:
LOGGER.info("Registration aborted due to shutdown signal")
return 0
node_id = register_response.get("id")
if not node_id:
LOGGER.error("Master did not return node id; aborting")
return 1
save_node_state(config.node_file, register_response)
LOGGER.info("Entering status report loop", extra={"node_id": node_id})
_status_loop(client, config, node_id, stop_signal)
return 0
def _default_config_path() -> str:
from socket import gethostname
hostname = gethostname()
return f"/private/argus/agent/{hostname}/config"
def _register_with_retry(
client: AgentClient,
config: AgentConfig,
node_id: Optional[str],
stop_signal: StopSignal,
):
backoff = 5
while not stop_signal.is_set():
payload = {
"name": config.hostname,
"type": "agent",
"meta_data": collect_metadata(config),
"version": config.version,
}
if node_id:
payload["id"] = node_id
try:
response = client.register_node(payload)
LOGGER.info("Registration successful", extra={"node_id": response.get("id")})
save_node_state(config.node_file, response)
return response
except MasterAPIError as exc:
if exc.status_code == 404 and node_id:
LOGGER.warning(
"Master does not recognise node id; clearing local node state",
extra={"node_id": node_id},
)
clear_node_state(config.node_file)
node_id = None
elif exc.status_code == 500 and node_id:
# id 与 name 不匹配通常意味着配置异常,记录但继续重试
LOGGER.error(
"Master rejected node due to id/name mismatch; will retry",
extra={"node_id": node_id},
)
else:
LOGGER.error("Registration failed", extra={"status_code": exc.status_code, "error": str(exc)})
time.sleep(min(backoff, 60))
backoff = min(backoff * 2, 60)
except Exception as exc: # pragma: no cover - defensive
LOGGER.exception("Unexpected error during registration", extra={"error": str(exc)})
time.sleep(min(backoff, 60))
backoff = min(backoff * 2, 60)
return None
def _status_loop(
client: AgentClient,
config: AgentConfig,
node_id: str,
stop_signal: StopSignal,
) -> None:
interval = config.report_interval_seconds
while not stop_signal.is_set():
timestamp = _current_timestamp()
health_payload = read_health_directory(config.health_dir)
body = {
"timestamp": timestamp,
"health": health_payload,
}
try:
response = client.update_status(node_id, body)
LOGGER.info(
"Status report succeeded",
extra={"node_id": node_id, "health_keys": list(health_payload.keys())},
)
save_node_state(config.node_file, response)
except MasterAPIError as exc:
# 保持循环继续执行,等待下一次重试
LOGGER.error(
"Failed to report status",
extra={"status_code": exc.status_code, "error": str(exc)},
)
except Exception as exc: # pragma: no cover - defensive
LOGGER.exception("Unexpected error during status report", extra={"error": str(exc)})
for _ in range(interval):
if stop_signal.is_set():
break
time.sleep(1)
LOGGER.info("Stop signal received; exiting status loop")
if __name__ == "__main__":
sys.exit(main())

44
src/agent/app/state.py Normal file
View File

@ -0,0 +1,44 @@
from __future__ import annotations
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional
from .log import get_logger
LOGGER = get_logger("argus.agent.state")
def load_node_state(path: str) -> Optional[Dict[str, Any]]:
"""读取本地 node.json容器重启后沿用之前的 ID。"""
try:
with open(path, "r", encoding="utf-8") as handle:
return json.load(handle)
except FileNotFoundError:
return None
except json.JSONDecodeError as exc:
LOGGER.warning("node.json is invalid JSON; ignoring", extra={"error": str(exc)})
return None
def save_node_state(path: str, data: Dict[str, Any]) -> None:
"""原子化写入 node.json避免并发读取坏数据。"""
directory = Path(path).parent
directory.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile("w", dir=directory, delete=False, encoding="utf-8") as tmp:
json.dump(data, tmp, separators=(",", ":"))
tmp.flush()
os.fsync(tmp.fileno())
temp_path = tmp.name
os.replace(temp_path, path)
def clear_node_state(path: str) -> None:
try:
os.remove(path)
except FileNotFoundError:
return
except OSError as exc:
LOGGER.warning("Failed to remove node state file", extra={"error": str(exc), "path": path})

View File

View File

@ -0,0 +1 @@
requests==2.31.0

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
set -euo pipefail
usage() {
echo "Usage: $0 [--intranet] [--tag <image_tag>]" >&2
}
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
IMAGE_TAG="${IMAGE_TAG:-argus-agent:dev}"
BUILD_ARGS=()
while [[ "$#" -gt 0 ]]; do
case "$1" in
--intranet)
INTRANET_INDEX="${INTRANET_INDEX:-https://pypi.tuna.tsinghua.edu.cn/simple}"
BUILD_ARGS+=("--build-arg" "PIP_INDEX_URL=${INTRANET_INDEX}")
shift
;;
--tag)
[[ $# -ge 2 ]] || { usage; exit 1; }
IMAGE_TAG="$2"
shift 2
;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown option: $1" >&2
usage
exit 1
;;
esac
done
echo "[INFO] Building image $IMAGE_TAG"
docker build "${BUILD_ARGS[@]}" -t "$IMAGE_TAG" "$PROJECT_ROOT"
echo "[OK] Image $IMAGE_TAG built"

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
set -euo pipefail
usage() {
echo "Usage: $0 [--file <tar_path>]" >&2
}
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
DEFAULT_INPUT="$PROJECT_ROOT/images/argus-agent-dev.tar"
IMAGE_TAR="$DEFAULT_INPUT"
while [[ "$#" -gt 0 ]]; do
case "$1" in
--file)
[[ $# -ge 2 ]] || { usage; exit 1; }
IMAGE_TAR="$2"
shift 2
;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown option: $1" >&2
usage
exit 1
;;
esac
done
if [[ ! -f "$IMAGE_TAR" ]]; then
echo "[ERROR] Image tarball not found: $IMAGE_TAR" >&2
exit 1
fi
echo "[INFO] Loading image from $IMAGE_TAR"
docker image load -i "$IMAGE_TAR"
echo "[OK] Image loaded"

View File

@ -0,0 +1,41 @@
#!/usr/bin/env bash
set -euo pipefail
usage() {
echo "Usage: $0 [--tag <image_tag>] [--output <tar_path>]" >&2
}
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
DEFAULT_OUTPUT="$PROJECT_ROOT/images/argus-agent-dev.tar"
IMAGE_TAG="${IMAGE_TAG:-argus-agent:dev}"
OUTPUT_PATH="$DEFAULT_OUTPUT"
while [[ "$#" -gt 0 ]]; do
case "$1" in
--tag)
[[ $# -ge 2 ]] || { usage; exit 1; }
IMAGE_TAG="$2"
shift 2
;;
--output)
[[ $# -ge 2 ]] || { usage; exit 1; }
OUTPUT_PATH="$2"
shift 2
;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown option: $1" >&2
usage
exit 1
;;
esac
done
mkdir -p "$(dirname "$OUTPUT_PATH")"
echo "[INFO] Saving image $IMAGE_TAG to $OUTPUT_PATH"
docker image save "$IMAGE_TAG" -o "$OUTPUT_PATH"
echo "[OK] Image saved"

2
src/agent/tests/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
private/
tmp/

View File

@ -0,0 +1,31 @@
services:
master:
image: argus-master:dev
container_name: argus-master-agent-e2e
environment:
- OFFLINE_THRESHOLD_SECONDS=6
- ONLINE_THRESHOLD_SECONDS=2
- SCHEDULER_INTERVAL_SECONDS=1
ports:
- "32300:3000"
volumes:
- ./private/argus/master:/private/argus/master
- ./private/argus/metric/prometheus:/private/argus/metric/prometheus
agent:
image: argus-agent:dev
container_name: argus-agent-e2e
hostname: dev-e2euser-e2einst-pod-0
depends_on:
- master
volumes:
- ./private/argus/agent/dev-e2euser-e2einst-pod-0:/private/argus/agent/dev-e2euser-e2einst-pod-0
- ./private/argus/agent/health/dev-e2euser-e2einst-pod-0:/private/argus/agent/health/dev-e2euser-e2einst-pod-0
networks:
default:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.28.0.0/16

View File

@ -0,0 +1,22 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SCRIPTS=(
"01_bootstrap.sh"
"02_up.sh"
"03_wait_and_assert_registration.sh"
"04_write_health_files.sh"
"05_assert_status_on_master.sh"
"06_restart_agent_and_reregister.sh"
"07_down.sh"
)
for script in "${SCRIPTS[@]}"; do
echo "[TEST] Running $script"
"$SCRIPT_DIR/$script"
echo "[TEST] $script completed"
echo
done
echo "[TEST] Agent module E2E tests completed"

View File

@ -0,0 +1,44 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
AGENT_ROOT="$(cd "$TEST_ROOT/.." && pwd)"
MASTER_ROOT="$(cd "$AGENT_ROOT/../master" && pwd)"
PRIVATE_ROOT="$TEST_ROOT/private"
TMP_ROOT="$TEST_ROOT/tmp"
AGENT_HOSTNAME="dev-e2euser-e2einst-pod-0"
AGENT_CONFIG_DIR="$PRIVATE_ROOT/argus/agent/$AGENT_HOSTNAME"
AGENT_HEALTH_DIR="$PRIVATE_ROOT/argus/agent/health/$AGENT_HOSTNAME"
MASTER_PRIVATE_DIR="$PRIVATE_ROOT/argus/master"
METRIC_PRIVATE_DIR="$PRIVATE_ROOT/argus/metric/prometheus"
mkdir -p "$AGENT_CONFIG_DIR"
mkdir -p "$AGENT_HEALTH_DIR"
mkdir -p "$MASTER_PRIVATE_DIR"
mkdir -p "$METRIC_PRIVATE_DIR"
mkdir -p "$TMP_ROOT"
cat > "$AGENT_CONFIG_DIR/config" <<CONFIG
HOSTNAME=$AGENT_HOSTNAME
NODE_FILE=/private/argus/agent/$AGENT_HOSTNAME/node.json
VERSION=1.1.0
MASTER_ENDPOINT=http://master:3000
REPORT_INTERVAL_SECONDS=2
SUBMODULE_HEALTH_FILE_DIR=/private/argus/agent/health/{hostname}/
GPU_NUMBER=
IP_OVERRIDE=
CONFIG
touch "$AGENT_HEALTH_DIR/.keep"
pushd "$MASTER_ROOT" >/dev/null
./scripts/build_images.sh --tag argus-master:dev
popd >/dev/null
pushd "$AGENT_ROOT" >/dev/null
./scripts/build_images.sh --tag argus-agent:dev
popd >/dev/null
echo "[INFO] Agent E2E bootstrap complete"

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
compose() {
if docker compose version >/dev/null 2>&1; then
docker compose "$@"
else
docker-compose "$@"
fi
}
docker container rm -f argus-agent-e2e >/dev/null 2>&1 || true
docker network rm tests_default >/dev/null 2>&1 || true
pushd "$TEST_ROOT" >/dev/null
compose down --remove-orphans || true
compose up -d
popd >/dev/null
echo "[INFO] Master+Agent stack started"

View File

@ -0,0 +1,65 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:32300/api/v1/master"
AGENT_HOSTNAME="dev-e2euser-e2einst-pod-0"
NODE_FILE="$TEST_ROOT/private/argus/agent/$AGENT_HOSTNAME/node.json"
mkdir -p "$TMP_ROOT"
node_id=""
for _ in {1..30}; do
sleep 2
response=$(curl -sS "$API_BASE/nodes" || true)
if [[ -z "$response" ]]; then
continue
fi
list_file="$TMP_ROOT/nodes_list.json"
echo "$response" > "$list_file"
node_id=$(python3 - "$list_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
nodes = json.load(handle)
print(nodes[0]["id"] if nodes else "")
PY
)
if [[ -n "$node_id" ]]; then
break
fi
done
if [[ -z "$node_id" ]]; then
echo "[ERROR] Agent did not register within timeout" >&2
exit 1
fi
echo "$node_id" > "$TMP_ROOT/node_id"
if [[ ! -f "$NODE_FILE" ]]; then
echo "[ERROR] node.json not created at $NODE_FILE" >&2
exit 1
fi
python3 - "$NODE_FILE" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
assert "id" in node and node["id"], "node.json missing id"
PY
detail_file="$TMP_ROOT/initial_detail.json"
curl -sS "$API_BASE/nodes/$node_id" -o "$detail_file"
python3 - "$detail_file" "$TMP_ROOT/initial_ip" <<'PY'
import json, sys, pathlib
with open(sys.argv[1]) as handle:
node = json.load(handle)
ip = node["meta_data"].get("ip")
if not ip:
raise SystemExit("meta_data.ip missing")
pathlib.Path(sys.argv[2]).write_text(ip)
PY
echo "[INFO] Agent registered with node id $node_id"

View File

@ -0,0 +1,22 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
HEALTH_DIR="$TEST_ROOT/private/argus/agent/health/dev-e2euser-e2einst-pod-0"
cat > "$HEALTH_DIR/log-fluentbit.json" <<JSON
{
"status": "healthy",
"timestamp": "2023-10-05T12:05:00Z"
}
JSON
cat > "$HEALTH_DIR/metric-node-exporter.json" <<JSON
{
"status": "healthy",
"timestamp": "2023-10-05T12:05:00Z"
}
JSON
echo "[INFO] Health files written"

View File

@ -0,0 +1,53 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:32300/api/v1/master"
NODE_ID="$(cat "$TMP_ROOT/node_id")"
NODES_JSON="$TEST_ROOT/private/argus/metric/prometheus/nodes.json"
success=false
detail_file="$TMP_ROOT/agent_status_detail.json"
for _ in {1..20}; do
sleep 2
if ! curl -sS "$API_BASE/nodes/$NODE_ID" -o "$detail_file"; then
continue
fi
if python3 - "$detail_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
if node["status"] != "online":
raise SystemExit(1)
health = node.get("health", {})
if "log-fluentbit" not in health or "metric-node-exporter" not in health:
raise SystemExit(1)
PY
then
success=true
break
fi
done
if [[ "$success" != true ]]; then
echo "[ERROR] Node did not report health data in time" >&2
exit 1
fi
if [[ ! -f "$NODES_JSON" ]]; then
echo "[ERROR] nodes.json missing at $NODES_JSON" >&2
exit 1
fi
python3 - "$NODES_JSON" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
nodes = json.load(handle)
assert len(nodes) == 1, nodes
entry = nodes[0]
assert entry["node_id"], entry
PY
echo "[INFO] Master reflects agent health and nodes.json entries"

View File

@ -0,0 +1,108 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
TMP_ROOT="$TEST_ROOT/tmp"
API_BASE="http://localhost:32300/api/v1/master"
NODE_ID="$(cat "$TMP_ROOT/node_id")"
AGENT_HOSTNAME="dev-e2euser-e2einst-pod-0"
NETWORK_NAME="tests_default"
NEW_AGENT_IP="172.28.0.200"
compose() {
if docker compose version >/dev/null 2>&1; then
docker compose "$@"
else
docker-compose "$@"
fi
}
before_file="$TMP_ROOT/before_restart.json"
curl -sS "$API_BASE/nodes/$NODE_ID" -o "$before_file"
prev_last_updated=$(python3 - "$before_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
print(node.get("last_updated", ""))
PY
)
prev_ip=$(python3 - "$before_file" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
print(node["meta_data"].get("ip", ""))
PY
)
initial_ip=$(cat "$TMP_ROOT/initial_ip")
if [[ "$prev_ip" != "$initial_ip" ]]; then
echo "[ERROR] Expected initial IP $initial_ip, got $prev_ip" >&2
exit 1
fi
pushd "$TEST_ROOT" >/dev/null
compose rm -sf agent
popd >/dev/null
docker container rm -f argus-agent-e2e >/dev/null 2>&1 || true
AGENT_DIR="$TEST_ROOT/private/argus/agent/$AGENT_HOSTNAME"
HEALTH_DIR="$TEST_ROOT/private/argus/agent/health/$AGENT_HOSTNAME"
# 先以 sleep 方式启动容器,确保我们掌握注册时的网络状态
if ! docker run -d \
--name argus-agent-e2e \
--hostname "$AGENT_HOSTNAME" \
--network "$NETWORK_NAME" \
--ip "$NEW_AGENT_IP" \
-v "$AGENT_DIR:/private/argus/agent/$AGENT_HOSTNAME" \
-v "$HEALTH_DIR:/private/argus/agent/health/$AGENT_HOSTNAME" \
argus-agent:dev \
sleep 300 >/dev/null; then
echo "[ERROR] Failed to start agent container with custom IP" >&2
exit 1
fi
# 在容器内启动真实 agent 进程
if ! docker exec -d argus-agent-e2e python -m app.main; then
echo "[ERROR] Failed to spawn agent process inside container" >&2
exit 1
fi
success=false
detail_file="$TMP_ROOT/post_restart.json"
for _ in {1..20}; do
sleep 3
if ! curl -sS "$API_BASE/nodes/$NODE_ID" -o "$detail_file"; then
continue
fi
if python3 - "$detail_file" "$prev_last_updated" "$NODE_ID" "$prev_ip" "$NEW_AGENT_IP" <<'PY'
import json, sys
with open(sys.argv[1]) as handle:
node = json.load(handle)
prev_last_updated = sys.argv[2]
expected_id = sys.argv[3]
old_ip = sys.argv[4]
expected_ip = sys.argv[5]
last_updated = node.get("last_updated")
current_ip = node["meta_data"].get("ip")
assert node["id"] == expected_id
if current_ip != expected_ip:
raise SystemExit(1)
if current_ip == old_ip:
raise SystemExit(1)
if not last_updated or last_updated == prev_last_updated:
raise SystemExit(1)
PY
then
success=true
break
fi
done
if [[ "$success" != true ]]; then
echo "[ERROR] Agent did not report expected new IP $NEW_AGENT_IP after restart" >&2
exit 1
fi
echo "[INFO] Agent restart produced successful re-registration with IP change"

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TEST_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
compose() {
if docker compose version >/dev/null 2>&1; then
docker compose "$@"
else
docker-compose "$@"
fi
}
docker container rm -f argus-agent-e2e >/dev/null 2>&1 || true
pushd "$TEST_ROOT" >/dev/null
compose down --remove-orphans
popd >/dev/null
rm -rf "$TEST_ROOT/private"
rm -rf "$TEST_ROOT/tmp"
echo "[INFO] Agent E2E environment cleaned up"