from __future__ import annotations import json from typing import Any, Dict, Optional import requests from .log import get_logger LOGGER = get_logger("argus.agent.client") class MasterAPIError(Exception): def __init__(self, message: str, status_code: int, payload: Optional[Dict[str, Any]] = None) -> None: super().__init__(message) self.status_code = status_code self.payload = payload or {} class AgentClient: def __init__(self, base_url: str, *, timeout: int = 10) -> None: self._base_url = base_url.rstrip("/") self._timeout = timeout self._session = requests.Session() def register_node(self, body: Dict[str, Any]) -> Dict[str, Any]: """调用 master 注册接口,返回节点对象。""" url = f"{self._base_url}/api/v1/master/nodes" response = self._session.post(url, json=body, timeout=self._timeout) return self._parse_response(response, "Failed to register node") def update_status(self, node_id: str, body: Dict[str, Any]) -> Dict[str, Any]: """上报健康信息,由 master 更新 last_report。""" url = f"{self._base_url}/api/v1/master/nodes/{node_id}/status" response = self._session.put(url, json=body, timeout=self._timeout) return self._parse_response(response, "Failed to update node status") def _parse_response(self, response: requests.Response, error_prefix: str) -> Dict[str, Any]: content_type = response.headers.get("Content-Type", "") payload: Dict[str, Any] | None = None if "application/json" in content_type: try: payload = response.json() except json.JSONDecodeError: LOGGER.warning("Response contained invalid JSON", extra={"status": response.status_code}) if response.status_code >= 400: message = payload.get("error") if isinstance(payload, dict) else response.text raise MasterAPIError( f"{error_prefix}: {message}", status_code=response.status_code, payload=payload if isinstance(payload, dict) else None, ) if payload is None: try: payload = response.json() except json.JSONDecodeError as exc: raise MasterAPIError("Master returned non-JSON payload", response.status_code) from exc return payload