argus/src/master/app/models.py

172 lines
5.9 KiB
Python

from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Mapping
from .util import parse_iso
class ValidationError(Exception):
    """Signal that a client-supplied payload did not pass validation.

    The ``validate_*`` helpers in this module raise it with a
    human-readable message describing the problem.
    """
@dataclass
class Node:
id: str
name: str
type: str
version: str | None
status: str
config: Dict[str, Any]
labels: Iterable[str]
meta_data: Dict[str, Any]
health: Dict[str, Any]
register_time: str | None
last_report: str | None
agent_last_report: str | None
last_updated: str | None
def serialize_node_row(row: Mapping[str, Any]) -> Dict[str, Any]:
    """Convert a stored node row into its JSON-ready dictionary form.

    Scalar columns are copied through unchanged. The ``config_json``,
    ``labels_json``, ``meta_json`` and ``health_json`` columns are decoded
    on a best-effort basis: a missing, empty, undecodable or wrongly-typed
    value is replaced by an empty container instead of raising.

    Args:
        row: Mapping that provides the columns ``id``, ``name``, ``type``,
            ``version``, ``status``, ``config_json``, ``labels_json``,
            ``meta_json``, ``health_json``, ``register_time``,
            ``last_report``, ``agent_last_report`` and ``last_updated``.

    Returns:
        A dictionary with the keys ``id``, ``name``, ``type``, ``version``,
        ``status``, ``config`` (dict), ``label`` (list), ``meta_data``
        (dict), ``health`` (dict), ``register_time``, ``last_report``,
        ``agent_last_report`` and ``last_updated``.

    Raises:
        Whatever ``row[...]`` raises for a missing column (``KeyError`` for
        a plain ``dict``).
    """

    def _decode(column: str, fallback: Any) -> Any:
        """Decode one JSON column, returning *fallback* on any mismatch."""
        raw = row[column]
        if raw is None or raw == "":
            return fallback
        try:
            decoded = json.loads(raw)
        except (TypeError, ValueError):
            # TypeError: the value is not str/bytes/bytearray.
            # ValueError: covers json.JSONDecodeError and the
            # UnicodeDecodeError raised for undecodable bytes.
            return fallback
        # Valid JSON of the wrong shape (e.g. a list where an object is
        # expected) is treated the same as an undecodable value.
        return decoded if isinstance(decoded, type(fallback)) else fallback

    # Decode the JSON columns first, in the same order as before, so a row
    # missing several columns still fails on the same lookup.
    config = _decode("config_json", {})
    labels = _decode("labels_json", [])
    meta = _decode("meta_json", {})
    health = _decode("health_json", {})

    return {
        "id": row["id"],
        "name": row["name"],
        "type": row["type"],
        "version": row["version"],
        "status": row["status"],
        "config": config,
        "label": labels,
        "meta_data": meta,
        "health": health,
        "register_time": row["register_time"],
        "last_report": row["last_report"],
        "agent_last_report": row["agent_last_report"],
        "last_updated": row["last_updated"],
    }
def serialize_node_summary(row: Mapping[str, Any]) -> Dict[str, Any]:
    """Project a node row onto its summary columns.

    Args:
        row: Mapping that provides at least ``id``, ``name``, ``status``,
            ``type`` and ``version``.

    Returns:
        A new dictionary holding exactly those five entries, in that
        order, with the values copied through unchanged.
    """
    summary_columns = ("id", "name", "status", "type", "version")
    return {column: row[column] for column in summary_columns}
def validate_registration_payload(payload: Mapping[str, Any]) -> Dict[str, Any]:
    """Validate a node registration request body.

    Args:
        payload: Decoded request body. Must be a mapping with:
            ``name`` - required, non-blank string;
            ``type`` - optional non-empty string, defaults to ``"agent"``;
            ``version`` - optional string;
            ``id`` - optional non-blank string;
            ``meta_data`` - required mapping containing ``hostname``,
            ``ip``, ``env``, ``user``, ``instance``, ``cpu_number``,
            ``memory_in_bytes`` and ``gpu_number``; the last three must be
            non-negative integers (booleans are rejected).

    Returns:
        A dictionary with the keys ``id``, ``name``, ``type``, ``version``
        and ``meta_data``. ``meta_data`` is a shallow ``dict`` copy of the
        input mapping; ``name`` is returned as given (not stripped).

    Raises:
        ValidationError: If any of the rules above is violated.
    """
    if not isinstance(payload, Mapping):
        raise ValidationError("Request body must be a JSON object")

    name = payload.get("name")
    if not isinstance(name, str) or not name.strip():
        raise ValidationError("Field 'name' is required and must be a non-empty string")

    node_type = payload.get("type", "agent")
    if not isinstance(node_type, str) or not node_type:
        raise ValidationError("Field 'type' must be a string")

    version = payload.get("version")
    if version is not None and not isinstance(version, str):
        raise ValidationError("Field 'version' must be a string if provided")

    meta = payload.get("meta_data")
    if not isinstance(meta, Mapping):
        raise ValidationError("Field 'meta_data' must be an object")

    required_meta = (
        "hostname",
        "ip",
        "env",
        "user",
        "instance",
        "cpu_number",
        "memory_in_bytes",
        "gpu_number",
    )
    for key in required_meta:
        if key not in meta:
            raise ValidationError(f"meta_data.{key} is required")

    for key in ("cpu_number", "memory_in_bytes", "gpu_number"):
        count = meta[key]
        # bool is a subclass of int, so a JSON true/false would otherwise
        # be accepted as the count 1/0.
        if isinstance(count, bool) or not isinstance(count, int) or count < 0:
            raise ValidationError(f"meta_data.{key} must be a non-negative integer")

    node_id = payload.get("id")
    if node_id is not None and (not isinstance(node_id, str) or not node_id.strip()):
        raise ValidationError("Field 'id' must be a non-empty string when provided")

    return {
        "id": node_id,
        "name": name,
        "type": node_type,
        "version": version,
        "meta_data": dict(meta),
    }
def validate_status_payload(payload: Mapping[str, Any]) -> Dict[str, Any]:
    """Validate a status report request body.

    Args:
        payload: Decoded request body. Must be a mapping with a non-empty
            string ``timestamp`` that ``parse_iso`` accepts (a ``None``
            result is treated as rejection), and optionally a ``health``
            mapping whose keys are strings and whose values are ``None``
            or one of mapping, list, str, int, float, bool. Only the top
            level of ``health`` is inspected.

    Returns:
        A dictionary with ``timestamp`` (the original string),
        ``parsed_timestamp`` (the value returned by ``parse_iso``) and
        ``health`` (a new dict holding the checked entries).

    Raises:
        ValidationError: If any of the rules above is violated.
    """
    if not isinstance(payload, Mapping):
        raise ValidationError("Request body must be a JSON object")

    raw_ts = payload.get("timestamp")
    if not (isinstance(raw_ts, str) and raw_ts):
        raise ValidationError("Field 'timestamp' is required and must be a string")

    moment = parse_iso(raw_ts)
    if moment is None:
        raise ValidationError("Field 'timestamp' must be an ISO8601 datetime string")

    reported = payload.get("health", {})
    if not isinstance(reported, Mapping):
        raise ValidationError("Field 'health' must be an object if provided")

    json_types = (Mapping, list, str, int, float, bool)
    checked: Dict[str, Any] = {}
    # Key and value are checked together per entry so the first offending
    # entry, in iteration order, determines which error is reported.
    for field, reading in reported.items():
        if not isinstance(field, str):
            raise ValidationError("Keys in 'health' must be strings")
        if reading is not None and not isinstance(reading, json_types):
            raise ValidationError("Values in 'health' must be JSON-compatible")
        checked[field] = reading

    return {"timestamp": raw_ts, "parsed_timestamp": moment, "health": checked}
def validate_config_payload(payload: Mapping[str, Any]) -> Dict[str, Any]:
    """Validate a config/label update request body.

    Args:
        payload: Decoded request body. Must be a mapping carrying at least
            one of ``config`` (a mapping) or ``label`` (a list of strings).
            Other keys are ignored.

    Returns:
        A dictionary containing only the keys that were supplied:
        ``config`` as a shallow ``dict`` copy and/or ``label`` as a new
        list.

    Raises:
        ValidationError: If the body is not a mapping, a supplied field has
            the wrong type, or neither field is present.
    """
    if not isinstance(payload, Mapping):
        raise ValidationError("Request body must be a JSON object")

    accepted: Dict[str, Any] = {}

    if "config" in payload:
        settings = payload["config"]
        if not isinstance(settings, Mapping):
            raise ValidationError("Field 'config' must be an object")
        accepted["config"] = dict(settings)

    if "label" in payload:
        tags = payload["label"]
        if not (isinstance(tags, list) and all(isinstance(tag, str) for tag in tags)):
            raise ValidationError("Field 'label' must be an array of strings")
        accepted["label"] = [*tags]

    if accepted:
        return accepted
    raise ValidationError("At least one of 'config' or 'label' must be provided")