argus/src/agent/app/main.py
yuyr 7430896fab v1.0.0 发布合并回master (#52)
Co-authored-by: sundapeng <sundp@mail.zgclab.edu.cn>
Co-authored-by: xuxt <xuxt@zgclab.edu.cn>
Reviewed-on: #52
2025-11-25 16:00:50 +08:00

164 lines
5.3 KiB
Python

from __future__ import annotations
import signal
import time
from datetime import datetime, timezone
from typing import Optional
from .client import AgentClient, MasterAPIError
from .collector import collect_metadata
from .config import AgentConfig, load_config
from .health_reader import read_health_directory
from .log import get_logger, setup_logging
from .state import clear_node_state, load_node_state, save_node_state
# Module-level logger used by every function in this file; obtained from the
# project's logging helper under the name "argus.agent".
LOGGER = get_logger("argus.agent")
def _current_timestamp() -> str:
    """Return the current UTC time as an ISO-8601 string ending in ``Z``.

    The value has whole-second precision, e.g. ``2025-11-25T08:00:50Z``.
    """
    utc_now = datetime.now(timezone.utc)
    # timespec="seconds" truncates the fractional part, leaving "...+00:00".
    iso_text = utc_now.isoformat(timespec="seconds")
    return iso_text.replace("+00:00", "Z")
class StopSignal:
    """Minimal one-way flag used to request that the agent loops stop.

    ``set`` accepts and ignores any positional arguments, so the bound method
    can be installed directly as a ``signal.signal`` handler (which is called
    with the signal number and the current stack frame).
    """

    def __init__(self) -> None:
        # False until ``set`` has been called at least once.
        self._stop = False

    def set(self, *_handler_args) -> None:  # type: ignore[override]
        """Raise the flag; any arguments passed in are discarded."""
        self._stop = True

    def is_set(self) -> bool:
        """Return True once ``set`` has been called."""
        return self._stop
def main(argv: Optional[list[str]] = None) -> int:  # noqa: ARG001 - signature kept for entry-point compatibility
    """Run the agent: load config, register with the master, report status.

    Args:
        argv: Unused; accepted only so existing entry points can keep passing it.

    Returns:
        Process exit code. ``0`` when the status loop ends or a shutdown signal
        aborts registration; ``1`` when the configuration cannot be loaded or
        the master's registration response carries no node id.
    """
    setup_logging()

    # SIGTERM and SIGINT both just raise the shared stop flag.
    shutdown = StopSignal()
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, shutdown.set)

    try:
        config = load_config()
    except Exception as exc:
        LOGGER.error("Failed to load configuration", extra={"error": str(exc)})
        return 1

    startup_fields = {
        "hostname": config.hostname,
        "master_endpoint": config.master_endpoint,
        "node_file": config.node_file,
    }
    LOGGER.info("Agent starting", extra=startup_fields)

    client = AgentClient(config.master_endpoint, timeout=config.request_timeout_seconds)

    # A previously persisted id (if any) lets the master treat this as a re-registration.
    persisted_state = load_node_state(config.node_file) or {}
    known_id = persisted_state.get("id")

    # Establish the registration with the master (re-registration supported);
    # the helper keeps retrying on failure.
    registration = _register_with_retry(client, config, known_id, shutdown)
    if registration is None:
        LOGGER.info("Registration aborted due to shutdown signal")
        return 0

    node_id = registration.get("id")
    if not node_id:
        LOGGER.error("Master did not return node id; aborting")
        return 1

    save_node_state(config.node_file, registration)
    LOGGER.info("Entering status report loop", extra={"node_id": node_id})
    _status_loop(client, config, node_id, shutdown)
    return 0
def _sleep_unless_stopped(seconds: int, stop_signal: StopSignal) -> None:
    """Sleep up to ``seconds`` in 1-second slices, returning early once stopped."""
    for _ in range(seconds):
        if stop_signal.is_set():
            return
        time.sleep(1)


def _register_with_retry(
    client: AgentClient,
    config: AgentConfig,
    node_id: Optional[str],
    stop_signal: StopSignal,
):
    """Register this node with the master, retrying until success or shutdown.

    Each attempt sends the hostname, the fixed type ``"agent"``, freshly
    collected metadata and the agent version, plus ``id`` when one is known.
    After a failed attempt the function waits with exponential backoff
    (5 s, doubling, capped at 60 s) before trying again.

    Args:
        client: Master API client used to send the registration request.
        config: Agent configuration (hostname, version, node state file path).
        node_id: Previously assigned node id, or ``None`` for a first registration.
        stop_signal: Flag checked before every attempt and during the backoff wait.

    Returns:
        The response from ``client.register_node`` on success (also persisted
        via ``save_node_state``), or ``None`` if the stop signal was set before
        a registration succeeded.
    """
    backoff = 5
    max_backoff = 60
    while not stop_signal.is_set():
        payload = {
            "name": config.hostname,
            "type": "agent",
            "meta_data": collect_metadata(config),
            "version": config.version,
        }
        if node_id:
            payload["id"] = node_id
        try:
            response = client.register_node(payload)
            LOGGER.info("Registration successful", extra={"node_id": response.get("id")})
            save_node_state(config.node_file, response)
            return response
        except MasterAPIError as exc:
            if exc.status_code == 404 and node_id:
                # The master no longer knows this id: drop it locally so the
                # next attempt registers as a brand-new node.
                LOGGER.warning(
                    "Master does not recognise node id; clearing local node state",
                    extra={"node_id": node_id},
                )
                clear_node_state(config.node_file)
                node_id = None
            elif exc.status_code == 500 and node_id:
                # An id/name mismatch usually indicates a configuration
                # problem; log it but keep retrying.
                LOGGER.error(
                    "Master rejected node due to id/name mismatch; will retry",
                    extra={"node_id": node_id},
                )
            else:
                LOGGER.error("Registration failed", extra={"status_code": exc.status_code, "error": str(exc)})
        except Exception as exc:  # pragma: no cover - defensive
            LOGGER.exception("Unexpected error during registration", extra={"error": str(exc)})

        # Only failed attempts reach this point (success returns above).
        # A single long time.sleep() would be resumed after the signal handler
        # runs, delaying shutdown by up to the full backoff; sleeping in
        # 1-second slices lets SIGTERM/SIGINT take effect promptly, matching
        # the wait used by the status loop.
        _sleep_unless_stopped(backoff, stop_signal)
        backoff = min(backoff * 2, max_backoff)
    return None
def _status_loop(
    client: AgentClient,
    config: AgentConfig,
    node_id: str,
    stop_signal: StopSignal,
) -> None:
    """Periodically send this node's health to the master until asked to stop.

    Every cycle reads the health directory, sends it with a UTC timestamp via
    ``client.update_status`` and persists the master's response. Failures of
    the report itself are logged and the loop carries on with the next cycle.

    Args:
        client: Master API client used to send status updates.
        config: Agent configuration (report interval, health dir, node file).
        node_id: Id assigned to this node by the master.
        stop_signal: Flag that ends the loop once set.
    """
    pause_seconds = config.report_interval_seconds

    while not stop_signal.is_set():
        reported_at = _current_timestamp()
        # NOTE(review): this read happens outside the try below, so an
        # exception raised here would end the loop - confirm the reader
        # handles its own errors.
        health = read_health_directory(config.health_dir)
        report = {"timestamp": reported_at, "health": health}

        try:
            reply = client.update_status(node_id, report)
            LOGGER.info(
                "Status report succeeded",
                extra={"node_id": node_id, "health_keys": list(health.keys())},
            )
            save_node_state(config.node_file, reply)
        except MasterAPIError as exc:
            # Keep the loop running and wait for the next attempt.
            LOGGER.error(
                "Failed to report status",
                extra={"status_code": exc.status_code, "error": str(exc)},
            )
        except Exception as exc:  # pragma: no cover - defensive
            LOGGER.exception("Unexpected error during status report", extra={"error": str(exc)})

        # Wait out the interval one second at a time so a stop request is
        # noticed within about a second instead of after the whole interval.
        for _ in range(pause_seconds):
            if stop_signal.is_set():
                break
            time.sleep(1)

    LOGGER.info("Stop signal received; exiting status loop")
if __name__ == "__main__":
    # Exit with main()'s return code. Raising SystemExit directly has the same
    # effect as sys.exit() and does not rely on `sys` being imported by this
    # module (calling sys.exit() without that import fails with NameError).
    raise SystemExit(main())