argus/src/agent/app/health_reader.py
yuyr 1e5e91b193 dev_1.0.0_yuyr_2:重新提交 PR,增加 master/agent 以及系统集成测试 (#17)
Reviewed-on: #17
Reviewed-by: sundapeng <sundp@mail.zgclab.edu.cn>
Reviewed-by: xuxt <xuxt@zgclab.edu.cn>
2025-10-11 15:04:46 +08:00

33 lines
1.2 KiB
Python

from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict
from .log import get_logger
# Module-level logger shared by the health-reader functions in this file.
LOGGER = get_logger("argus.agent.health")
def read_health_directory(path: str) -> Dict[str, Any]:
    """Read every ``<prefix>-*.json`` file in a directory into a mapping.

    Args:
        path: Directory to scan (non-recursively) for ``*.json`` files.

    Returns:
        A dict mapping each file's stem (the file name without ``.json``)
        to its parsed JSON content. The dict is empty when the directory
        does not exist. Files whose stem contains no ``-`` are skipped, and
        files that cannot be read, decoded as UTF-8, or parsed as JSON are
        logged and left out instead of aborting the whole scan.
    """
    result: Dict[str, Any] = {}
    directory = Path(path)
    if not directory.exists():
        LOGGER.debug("Health directory does not exist", extra={"path": str(directory)})
        return result

    # Sorted so files are visited in a deterministic order.
    for health_file in sorted(directory.glob("*.json")):
        if "-" not in health_file.stem:
            LOGGER.debug("Skipping non-prefixed health file", extra={"file": health_file.name})
            continue
        try:
            with health_file.open("r", encoding="utf-8") as handle:
                content = json.load(handle)
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # UnicodeDecodeError is a ValueError, so it is neither an OSError
            # nor a JSONDecodeError; without catching it here a single file
            # with invalid UTF-8 bytes would abort the whole directory read.
            LOGGER.warning("Failed to parse health file", extra={"file": health_file.name, "error": str(exc)})
        except OSError as exc:
            LOGGER.warning("Failed to read health file", extra={"file": health_file.name, "error": str(exc)})
        else:
            # Only record the entry once the file was read and parsed cleanly.
            result[health_file.stem] = content
    return result