argus/src/agent/app/collector.py

112 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import os
import re
import socket
import subprocess
from pathlib import Path
from typing import Any, Dict
from .config import AgentConfig
from .log import get_logger
# Module-wide logger, obtained from the project's get_logger helper.
LOGGER = get_logger("argus.agent.collector")
# Hostname convention: three non-empty, dash-free segments, each followed by
# "-", then any remaining text (possibly empty). The three leading segments
# are captured as groups 1-3.
_HOSTNAME_PATTERN = re.compile(r"^([^-]+)-([^-]+)-([^-]+)-.*$")
def collect_metadata(config: AgentConfig) -> Dict[str, Any]:
    """Gather the static information needed to register this node.

    Args:
        config: Agent configuration; only its ``hostname`` attribute is read.

    Returns:
        A dict with the keys ``hostname``, ``ip``, ``env``, ``user``,
        ``instance``, ``cpu_number``, ``memory_in_bytes`` and ``gpu_number``.
    """
    node_name = config.hostname
    # The hostname is parsed first so any "pattern mismatch" warning is logged
    # before the hardware probes run.
    parsed_fields = dict(zip(("env", "user", "instance"), _parse_hostname(node_name)))
    return {
        "hostname": node_name,
        "ip": _detect_ip_address(),
        **parsed_fields,
        "cpu_number": _detect_cpu_count(),
        "memory_in_bytes": _detect_memory_bytes(),
        "gpu_number": _detect_gpu_count(),
    }
def _parse_hostname(hostname: str) -> tuple[str, str, str]:
    """Split a hostname into its leading ``env``, ``user`` and ``instance`` parts.

    Args:
        hostname: Hostname expected to follow the ``env-user-instance-...``
            convention.

    Returns:
        ``(env, user, instance)`` when the hostname matches the convention,
        otherwise three empty strings (a warning is logged in that case).
    """
    parsed = _HOSTNAME_PATTERN.match(hostname)
    if parsed is not None:
        env, user, instance = parsed.groups()
        return env, user, instance

    LOGGER.warning("Hostname does not match expected pattern", extra={"hostname": hostname})
    return "", "", ""
def _detect_cpu_count() -> int:
    """Return the CPU count reported by ``os.cpu_count()``, or 0 if unknown."""
    # os.cpu_count() yields None when the count cannot be determined.
    return os.cpu_count() or 0
def _detect_memory_bytes() -> int:
    """Return the memory available to this node, in bytes.

    The cgroup limit in ``/sys/fs/cgroup/memory.max`` is preferred. When that
    file is missing, unreadable, set to ``max`` (no limit) or unparsable, the
    ``MemTotal`` entry of ``/proc/meminfo`` is used instead.

    Returns:
        The detected size in bytes, or 0 when neither source yields a value.
        This function never raises for I/O or parsing problems; it logs and
        falls back instead.
    """
    cgroup_path = Path("/sys/fs/cgroup/memory.max")
    # Pre-bind so the ValueError handler can always log it, even when the
    # failure happened while decoding the file (UnicodeDecodeError).
    raw = ""
    try:
        raw = cgroup_path.read_text(encoding="utf-8").strip()
        if raw and raw != "max":
            return int(raw)
    except FileNotFoundError:
        LOGGER.debug("cgroup memory.max not found, falling back to /proc/meminfo")
    except OSError as exc:
        # e.g. PermissionError: the file exists but cannot be read.
        LOGGER.warning("Failed to read memory.max, falling back", extra={"error": str(exc)})
    except ValueError:
        LOGGER.warning("Failed to parse memory.max, falling back", extra={"value": raw})

    try:
        with open("/proc/meminfo", "r", encoding="utf-8") as handle:
            for line in handle:
                if line.startswith("MemTotal:"):
                    parts = line.split()
                    if len(parts) >= 2:
                        # The value is converted from KiB to bytes.
                        return int(parts[1]) * 1024
    except FileNotFoundError:
        LOGGER.error("/proc/meminfo not found; defaulting memory to 0")
    except (OSError, ValueError) as exc:
        # Unreadable file or malformed MemTotal value: keep the best-effort
        # contract and report 0 rather than crashing the caller.
        LOGGER.error(
            "Failed to read /proc/meminfo; defaulting memory to 0",
            extra={"error": str(exc)},
        )
    return 0
def _detect_gpu_count() -> int:
    """Return the number of GPUs listed by ``nvidia-smi -L``.

    Returns:
        The number of ``GPU ...`` entries printed by ``nvidia-smi -L``, or 0
        when the tool is missing, cannot be executed, times out (5 s) or exits
        with a non-zero status.
    """
    try:
        proc = subprocess.run(
            ["nvidia-smi", "-L"],
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=5,
        )
    except FileNotFoundError:
        LOGGER.debug("nvidia-smi not available; assuming 0 GPUs")
        return 0
    except (OSError, subprocess.SubprocessError) as exc:
        # OSError covers e.g. a present-but-not-executable binary;
        # SubprocessError covers the timeout.
        LOGGER.warning("nvidia-smi invocation failed", extra={"error": str(exc)})
        return 0

    if proc.returncode != 0:
        LOGGER.debug("nvidia-smi returned non-zero", extra={"stderr": proc.stderr.strip()})
        return 0

    # Count only the per-GPU entries ("GPU 0: ..."). Other non-empty lines,
    # such as indented MIG device entries or informational messages, must not
    # inflate the GPU count.
    return sum(
        1 for line in proc.stdout.splitlines() if line.lstrip().startswith("GPU ")
    )
def _detect_ip_address() -> str:
    """Return the outbound IPv4 address of this host or container.

    A UDP socket is connected to 8.8.8.8:80 and its local address is read.
    If that fails, the local hostname is resolved instead.

    Returns:
        The detected IPv4 address, or ``"127.0.0.1"`` when both strategies
        fail.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            local_address = probe.getsockname()
            return local_address[0]
    except OSError:
        LOGGER.debug("UDP socket trick failed; falling back to hostname lookup")

    try:
        return socket.gethostbyname(socket.gethostname())
    except OSError:
        LOGGER.warning("Unable to resolve hostname to IP; defaulting to 127.0.0.1")

    return "127.0.0.1"