from __future__ import annotations import signal import time from datetime import datetime, timezone from typing import Optional from .client import AgentClient, MasterAPIError from .collector import collect_metadata from .config import AgentConfig, load_config from .health_reader import read_health_directory from .log import get_logger, setup_logging from .state import clear_node_state, load_node_state, save_node_state LOGGER = get_logger("argus.agent") def _current_timestamp() -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") class StopSignal: def __init__(self) -> None: self._stop = False def set(self, *_args) -> None: # type: ignore[override] self._stop = True def is_set(self) -> bool: return self._stop def main(argv: Optional[list[str]] = None) -> int: # noqa: ARG001 - 保留签名以兼容入口调用 setup_logging() stop_signal = StopSignal() signal.signal(signal.SIGTERM, stop_signal.set) signal.signal(signal.SIGINT, stop_signal.set) try: config = load_config() except Exception as exc: LOGGER.error("Failed to load configuration", extra={"error": str(exc)}) return 1 LOGGER.info( "Agent starting", extra={ "hostname": config.hostname, "master_endpoint": config.master_endpoint, "node_file": config.node_file, }, ) client = AgentClient(config.master_endpoint, timeout=config.request_timeout_seconds) node_state = load_node_state(config.node_file) or {} node_id = node_state.get("id") # 与 master 建立注册关系(支持重注册),失败则重试 register_response = _register_with_retry(client, config, node_id, stop_signal) if register_response is None: LOGGER.info("Registration aborted due to shutdown signal") return 0 node_id = register_response.get("id") if not node_id: LOGGER.error("Master did not return node id; aborting") return 1 save_node_state(config.node_file, register_response) LOGGER.info("Entering status report loop", extra={"node_id": node_id}) _status_loop(client, config, node_id, stop_signal) return 0 def _register_with_retry( client: AgentClient, config: AgentConfig, node_id: Optional[str], stop_signal: StopSignal, ): backoff = 5 while not stop_signal.is_set(): payload = { "name": config.hostname, "type": "agent", "meta_data": collect_metadata(config), "version": config.version, } if node_id: payload["id"] = node_id try: response = client.register_node(payload) LOGGER.info("Registration successful", extra={"node_id": response.get("id")}) save_node_state(config.node_file, response) return response except MasterAPIError as exc: if exc.status_code == 404 and node_id: LOGGER.warning( "Master does not recognise node id; clearing local node state", extra={"node_id": node_id}, ) clear_node_state(config.node_file) node_id = None elif exc.status_code == 500 and node_id: # id 与 name 不匹配通常意味着配置异常,记录但继续重试 LOGGER.error( "Master rejected node due to id/name mismatch; will retry", extra={"node_id": node_id}, ) else: LOGGER.error("Registration failed", extra={"status_code": exc.status_code, "error": str(exc)}) time.sleep(min(backoff, 60)) backoff = min(backoff * 2, 60) except Exception as exc: # pragma: no cover - defensive LOGGER.exception("Unexpected error during registration", extra={"error": str(exc)}) time.sleep(min(backoff, 60)) backoff = min(backoff * 2, 60) return None def _status_loop( client: AgentClient, config: AgentConfig, node_id: str, stop_signal: StopSignal, ) -> None: interval = config.report_interval_seconds while not stop_signal.is_set(): timestamp = _current_timestamp() health_payload = read_health_directory(config.health_dir) body = { "timestamp": timestamp, "health": health_payload, } try: response = client.update_status(node_id, body) LOGGER.info( "Status report succeeded", extra={"node_id": node_id, "health_keys": list(health_payload.keys())}, ) save_node_state(config.node_file, response) except MasterAPIError as exc: # 保持循环继续执行,等待下一次重试 LOGGER.error( "Failed to report status", extra={"status_code": exc.status_code, "error": str(exc)}, ) except Exception as exc: # pragma: no cover - defensive LOGGER.exception("Unexpected error during status report", extra={"error": str(exc)}) for _ in range(interval): if stop_signal.is_set(): break time.sleep(1) LOGGER.info("Stop signal received; exiting status loop") if __name__ == "__main__": sys.exit(main())