from __future__ import annotations import os import re import socket import subprocess from pathlib import Path from typing import Any, Dict from .config import AgentConfig from .log import get_logger LOGGER = get_logger("argus.agent.collector") _HOSTNAME_PATTERN = re.compile(r"^([^-]+)-([^-]+)-([^-]+)-.*$") def collect_metadata(config: AgentConfig) -> Dict[str, Any]: """汇总节点注册需要的静态信息。""" hostname = config.hostname env, user, instance = _parse_hostname(hostname) meta = { "hostname": hostname, "ip": _detect_ip_address(), "env": env, "user": user, "instance": instance, "cpu_number": _detect_cpu_count(), "memory_in_bytes": _detect_memory_bytes(), "gpu_number": _detect_gpu_count(), } return meta def _parse_hostname(hostname: str) -> tuple[str, str, str]: """按照约定的 env-user-instance 前缀拆解主机名。""" match = _HOSTNAME_PATTERN.match(hostname) if not match: LOGGER.warning("Hostname does not match expected pattern", extra={"hostname": hostname}) return "", "", "" return match.group(1), match.group(2), match.group(3) def _detect_cpu_count() -> int: count = os.cpu_count() return count if count is not None else 0 def _detect_memory_bytes() -> int: """优先读取 cgroup 限额,失败时退回 /proc/meminfo。""" cgroup_path = Path("/sys/fs/cgroup/memory.max") try: raw = cgroup_path.read_text(encoding="utf-8").strip() if raw and raw != "max": return int(raw) except FileNotFoundError: LOGGER.debug("cgroup memory.max not found, falling back to /proc/meminfo") except ValueError: LOGGER.warning("Failed to parse memory.max, falling back", extra={"value": raw}) try: with open("/proc/meminfo", "r", encoding="utf-8") as handle: for line in handle: if line.startswith("MemTotal:"): parts = line.split() if len(parts) >= 2: return int(parts[1]) * 1024 except FileNotFoundError: LOGGER.error("/proc/meminfo not found; defaulting memory to 0") return 0 def _detect_gpu_count() -> int: """采集 GPU 数量,如无法探测则默认为 0。""" try: proc = subprocess.run( ["nvidia-smi", "-L"], check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5, ) except FileNotFoundError: LOGGER.debug("nvidia-smi not available; assuming 0 GPUs") return 0 except subprocess.SubprocessError as exc: LOGGER.warning("nvidia-smi invocation failed", extra={"error": str(exc)}) return 0 if proc.returncode != 0: LOGGER.debug("nvidia-smi returned non-zero", extra={"stderr": proc.stderr.strip()}) return 0 count = sum(1 for line in proc.stdout.splitlines() if line.strip()) return count def _detect_ip_address() -> str: """尝试通过 UDP socket 获得容器出口 IP,失败则回退解析主机名。""" try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.connect(("8.8.8.8", 80)) return sock.getsockname()[0] except OSError: LOGGER.debug("UDP socket trick failed; falling back to hostname lookup") try: return socket.gethostbyname(socket.gethostname()) except OSError: LOGGER.warning("Unable to resolve hostname to IP; defaulting to 127.0.0.1") return "127.0.0.1"