Reviewed-on: #17 Reviewed-by: sundapeng <sundp@mail.zgclab.edu.cn> Reviewed-by: xuxt <xuxt@zgclab.edu.cn>
45 lines
1.3 KiB
Python
45 lines
1.3 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
import tempfile
|
||
from pathlib import Path
|
||
from typing import Any, Dict, Optional
|
||
|
||
from .log import get_logger
|
||
|
||
LOGGER = get_logger("argus.agent.state")
|
||
|
||
|
||
def load_node_state(path: str) -> Optional[Dict[str, Any]]:
|
||
"""读取本地 node.json,容器重启后沿用之前的 ID。"""
|
||
try:
|
||
with open(path, "r", encoding="utf-8") as handle:
|
||
return json.load(handle)
|
||
except FileNotFoundError:
|
||
return None
|
||
except json.JSONDecodeError as exc:
|
||
LOGGER.warning("node.json is invalid JSON; ignoring", extra={"error": str(exc)})
|
||
return None
|
||
|
||
|
||
def save_node_state(path: str, data: Dict[str, Any]) -> None:
|
||
"""原子化写入 node.json,避免并发读取坏数据。"""
|
||
directory = Path(path).parent
|
||
directory.mkdir(parents=True, exist_ok=True)
|
||
with tempfile.NamedTemporaryFile("w", dir=directory, delete=False, encoding="utf-8") as tmp:
|
||
json.dump(data, tmp, separators=(",", ":"))
|
||
tmp.flush()
|
||
os.fsync(tmp.fileno())
|
||
temp_path = tmp.name
|
||
os.replace(temp_path, path)
|
||
|
||
|
||
def clear_node_state(path: str) -> None:
|
||
try:
|
||
os.remove(path)
|
||
except FileNotFoundError:
|
||
return
|
||
except OSError as exc:
|
||
LOGGER.warning("Failed to remove node state file", extra={"error": str(exc), "path": path})
|