argus/src/metric/prometheus/build/update_targets.py

416 lines
14 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Prometheus Targets 动态更新脚本
脚本从节点配置文件读取节点信息,并动态生成对应的 Prometheus targets 文件。
"""
import json
import os
import sys
import logging
import argparse
import time
import hashlib
from datetime import datetime
from typing import Dict, List, Any
from pathlib import Path
class PrometheusTargetsManager:
    """Generate Prometheus target files from a JSON node list.

    For every exporter declared in the exporter configuration, one file named
    ``<exporter_type>_exporter.json`` is written into ``targets_dir``. Each
    entry in that file has a single ``ip:port`` target and a label set rendered
    from the exporter's label template.
    """

    def __init__(self, config_file: str, targets_dir: str, exporter_config_file: str = None, log_level: str = "INFO"):
        """Set up paths, logging and the exporter configuration.

        Args:
            config_file: Path of the node configuration file (a JSON array).
            targets_dir: Directory that receives the generated target files
                and the ``targets_update.log`` log file.
            exporter_config_file: Path of the exporter configuration file.
                Required in practice: loading fails when it is missing.
            log_level: Name of a ``logging`` level, e.g. ``"INFO"``.

        Raises:
            FileNotFoundError: The exporter configuration path is unset or
                does not exist.
            json.JSONDecodeError: The exporter configuration is not valid JSON.
            ValueError: The exporter configuration has the wrong structure.
        """
        self.config_file = Path(config_file)
        self.targets_dir = Path(targets_dir)
        self.exporter_config_file = Path(exporter_config_file) if exporter_config_file else None
        self.log_level = log_level
        self.last_mtime = 0  # initialised here; nothing in this class reads it
        self.last_content_hash = None  # hash of the node file seen by the last check

        # The log file lives inside targets_dir, so the directory has to exist
        # before the FileHandler is created; otherwise opening the log fails.
        self.targets_dir.mkdir(parents=True, exist_ok=True)
        self._setup_logging()

        # The exporter configuration is mandatory: log and propagate failures.
        try:
            full_config = self._load_exporter_config()
            self.exporter_configs = full_config.get('exporters', {})
            self.label_templates = full_config.get('label_templates', {})
        except Exception as e:
            self.logger.error(f"初始化失败,无法加载 exporter 配置: {e}")
            raise

    def _setup_logging(self):
        """Configure root logging to stdout and ``targets_dir/targets_update.log``.

        ``logging.basicConfig`` leaves an already configured root logger
        untouched, so the handlers built here only take effect on first setup.
        """
        logging.basicConfig(
            level=getattr(logging, self.log_level.upper()),
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(sys.stdout),
                # Log messages are not ASCII; pin the encoding instead of
                # depending on the locale default.
                logging.FileHandler(f'{self.targets_dir}/targets_update.log', encoding='utf-8'),
            ],
        )
        self.logger = logging.getLogger(__name__)

    def _load_exporter_config(self) -> Dict[str, Any]:
        """Read and validate the exporter configuration file.

        Returns:
            The full parsed configuration object. Its ``exporters`` entry is
            guaranteed to be a non-empty dict.

        Raises:
            FileNotFoundError: The path is unset or the file does not exist.
            json.JSONDecodeError: The file is not valid JSON.
            ValueError: The top level or ``exporters`` is not an object, or
                ``exporters`` is empty.
        """
        if not self.exporter_config_file:
            raise FileNotFoundError("Exporter 配置文件路径未指定")
        if not self.exporter_config_file.exists():
            raise FileNotFoundError(f"Exporter 配置文件不存在: {self.exporter_config_file}")
        try:
            with open(self.exporter_config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
            if not isinstance(config, dict):
                raise ValueError("Exporter 配置文件必须是 JSON 对象格式")
            exporters = config.get('exporters', {})
            if not isinstance(exporters, dict):
                raise ValueError("exporters 配置必须是对象格式")
            if not exporters:
                raise ValueError("exporters 配置不能为空")
            self.logger.info(f"成功加载 exporter 配置: {len(exporters)} 个 exporter")
            return config
        except json.JSONDecodeError as e:
            self.logger.error(f"Exporter 配置文件 JSON 解析错误: {e}")
            raise
        except Exception as e:
            self.logger.error(f"加载 exporter 配置失败: {e}")
            raise

    def load_nodes_config(self) -> List[Dict[str, Any]]:
        """Read the node configuration file.

        Returns:
            The parsed list of node entries. An empty list is returned, after
            logging the reason, when the file is missing, unreadable, not
            valid JSON, or not a JSON array.
        """
        try:
            if not self.config_file.exists():
                self.logger.warning(f"节点配置文件不存在: {self.config_file}")
                return []
            with open(self.config_file, 'r', encoding='utf-8') as f:
                nodes = json.load(f)
            if not isinstance(nodes, list):
                self.logger.error("节点配置必须是数组格式")
                return []
            self.logger.info(f"成功加载 {len(nodes)} 个节点配置")
            return nodes
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON 解析错误: {e}")
            return []
        except Exception as e:
            self.logger.error(f"加载节点配置失败: {e}")
            return []

    def generate_targets(self, nodes: List[Dict[str, Any]], exporter_type: str) -> List[Dict[str, Any]]:
        """Build the target entries of one exporter type for the given nodes.

        Args:
            nodes: Node entries; each usable entry is a dict with at least
                ``node_id`` and ``ip``.
            exporter_type: Key into the loaded ``exporters`` configuration.

        Returns:
            One ``{"targets": [...], "labels": {...}}`` dict per usable node.
            Empty when the exporter type is unknown or has no ``port``.
        """
        if exporter_type not in self.exporter_configs:
            self.logger.error(f"不支持的 exporter 类型: {exporter_type}")
            return []
        config = self.exporter_configs[exporter_type]
        # Without a port no target address can be built; report it once here
        # instead of failing with KeyError on the first node.
        if not isinstance(config, dict) or 'port' not in config:
            self.logger.error(f"exporter 配置缺少 port 字段: {exporter_type}")
            return []
        port = config['port']
        label_template = self.label_templates.get(exporter_type, {})

        targets = []
        for node in nodes:
            # Skip anything that is not a dict carrying the mandatory fields.
            if not isinstance(node, dict) or not all(key in node for key in ['node_id', 'ip']):
                self.logger.warning(f"节点配置缺少必要字段,跳过: {node}")
                continue
            target_address = f"{node['ip']}:{port}"
            # Variables available to label templates.
            context = {
                'node_id': node['node_id'],
                'ip': node['ip'],
                'hostname': node.get('hostname', ''),
                'user_id': node.get('user_id', ''),
                'tag': self._join_labels(node.get('labels', []))
            }
            labels = {}
            for label_key, template_value in label_template.items():
                if isinstance(template_value, str) and '{' in template_value:
                    # Contains a placeholder: render it against the node context.
                    labels[label_key] = self._render_label_template(template_value, context)
                else:
                    # Plain value: copied through unchanged.
                    labels[label_key] = template_value
            targets.append({
                "targets": [target_address],
                "labels": labels
            })
        self.logger.info(f"{exporter_type} exporter 生成了 {len(targets)} 个 targets")
        return targets

    def write_targets_file(self, targets: List[Dict[str, Any]], exporter_type: str) -> None:
        """Write the target list to ``<exporter_type>_exporter.json``.

        The JSON is written to a sibling ``.tmp`` file first and then moved
        over the final name with ``os.replace``, so a reader of the target
        file never sees a partially written document.

        Args:
            targets: Target entries to serialise.
            exporter_type: Exporter type used to derive the file name.

        Raises:
            Exception: Any error raised while writing or renaming is logged
                and re-raised.
        """
        filename = f"{exporter_type}_exporter.json"
        filepath = self.targets_dir / filename
        tmp_path = self.targets_dir / f"{filename}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(targets, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, filepath)
            self.logger.info(f"成功写入 targets 文件: {filepath}")
        except Exception as e:
            self.logger.error(f"写入 targets 文件失败: {e}")
            # Best-effort cleanup of the temporary file; the original error
            # is the one that matters to the caller.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    def update_all_targets(self) -> None:
        """Regenerate the target file of every configured exporter type.

        Nothing is written when no nodes could be loaded, and an exporter
        whose target list is empty keeps its existing file.

        Raises:
            Exception: Any error from generation or writing is logged and
                re-raised.
        """
        try:
            nodes = self.load_nodes_config()
            if not nodes:
                self.logger.warning("没有找到任何节点配置")
                return
            for exporter_type in self.exporter_configs:
                targets = self.generate_targets(nodes, exporter_type)
                if targets:  # only write when there is something to write
                    self.write_targets_file(targets, exporter_type)
            self.logger.info("所有 targets 文件更新完成")
        except Exception as e:
            self.logger.error(f"更新 targets 失败: {e}")
            raise

    def _calculate_file_hash(self, file_path: Path) -> str:
        """Return the MD5 hex digest of a file's bytes.

        Args:
            file_path: File to hash.

        Returns:
            The hex digest, or ``""`` when the file cannot be read (the error
            is logged).
        """
        try:
            with open(file_path, 'rb') as f:
                content = f.read()
            return hashlib.md5(content).hexdigest()
        except Exception as e:
            self.logger.error(f"计算文件哈希失败: {e}")
            return ""

    def _render_label_template(self, template: str, context: Dict[str, str]) -> str:
        """Fill a ``str.format`` style template from the context.

        Args:
            template: Template such as ``"dcgm-exporter-{node_id}"``.
            context: Values available to the placeholders.

        Returns:
            The rendered string. When rendering fails the template is
            returned unchanged and a warning is logged.
        """
        try:
            return template.format(**context)
        except KeyError as e:
            self.logger.warning(f"模板渲染失败,缺少变量 {e}: {template}")
            return template
        except Exception as e:
            self.logger.warning(f"模板渲染失败: {e}")
            return template

    def _join_labels(self, labels_list: List[str]) -> str:
        """Join node labels into one comma-separated string.

        Args:
            labels_list: Label strings. A single string is treated as one
                label rather than being split into characters.

        Returns:
            The stripped, non-empty string labels joined with ``","``. Items
            that are not strings are ignored, and any input that is not a
            string, list or tuple yields ``""``.
        """
        if not labels_list:
            return ""
        if isinstance(labels_list, str):
            labels_list = [labels_list]
        elif not isinstance(labels_list, (list, tuple)):
            return ""
        stripped = (label.strip() for label in labels_list if isinstance(label, str))
        return ",".join(label for label in stripped if label)

    def check_file_changed(self) -> bool:
        """Report whether the node file content differs from the last check.

        The comparison uses the content hash, which is stored on every
        detected change.

        Returns:
            True on the first successful check and whenever the hash differs
            from the stored one. False when the file is missing, cannot be
            hashed, or is unchanged.
        """
        try:
            if not self.config_file.exists():
                return False
            current_hash = self._calculate_file_hash(self.config_file)
            if not current_hash:
                return False
            # First check: remember the hash and request the initial update.
            if self.last_content_hash is None:
                self.last_content_hash = current_hash
                self.logger.info("首次检查,记录文件内容哈希并触发初始更新")
                return True
            if current_hash != self.last_content_hash:
                self.last_content_hash = current_hash
                self.logger.info("检测到文件内容变化")
                return True
            return False
        except Exception as e:
            self.logger.error(f"检查文件变化失败: {e}")
            return False

    def run_daemon(self, check_interval: int = 30) -> None:
        """Poll the node file forever and regenerate targets on change.

        Args:
            check_interval: Seconds to sleep between checks.

        Raises:
            Exception: Any error other than ``KeyboardInterrupt`` is logged
                and re-raised, which ends the loop.
        """
        self.logger.info(f"启动守护进程模式,检查间隔: {check_interval}")
        try:
            while True:
                if self.check_file_changed():
                    self.logger.info("检测到配置文件变化,开始更新 targets")
                    self.update_all_targets()
                else:
                    self.logger.debug("配置文件无变化,跳过更新")
                time.sleep(check_interval)
        except KeyboardInterrupt:
            self.logger.info("收到中断信号,正在退出...")
        except Exception as e:
            self.logger.error(f"守护进程运行错误: {e}")
            raise
def main():
    """Parse the command line and run a single update or the polling daemon.

    Without ``--daemon`` the targets are regenerated once and a success line
    is printed. With ``--daemon`` the node file is polled every
    ``--check-interval`` seconds. Any exception raised while creating or
    running the manager is printed to stderr and the process exits with
    status 1. A non-positive ``--check-interval`` in daemon mode is rejected
    as a usage error.
    """
    parser = argparse.ArgumentParser(description="Prometheus Targets 动态更新脚本 (精简版)")
    parser.add_argument(
        "--config",
        default="/private/argus/metric/prometheus/nodes.json",
        help="节点配置文件路径 (默认: /private/argus/metric/prometheus/nodes.json)"
    )
    parser.add_argument(
        "--targets-dir",
        default="/private/argus/metric/prometheus/targets",
        help="targets 文件输出目录 (默认: /private/argus/metric/prometheus/targets)"
    )
    parser.add_argument(
        "--exporter-config",
        default="/private/argus/metric/prometheus/exporter_config.json",
        help="exporter 配置文件路径 (默认: /private/argus/metric/prometheus/exporter_config.json)"
    )
    parser.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        default="INFO",
        help="日志级别 (默认: INFO)"
    )
    parser.add_argument(
        "--daemon",
        action="store_true",
        help="以守护进程模式运行"
    )
    parser.add_argument(
        "--check-interval",
        type=int,
        default=30,
        help="守护进程模式下的检查间隔(秒,默认: 30)"
    )
    args = parser.parse_args()

    # A zero interval would spin the polling loop without pause and a negative
    # one makes time.sleep raise; reject both before doing any work.
    if args.daemon and args.check_interval <= 0:
        parser.error("--check-interval 必须为正整数")

    try:
        manager = PrometheusTargetsManager(
            config_file=args.config,
            targets_dir=args.targets_dir,
            exporter_config_file=args.exporter_config,
            log_level=args.log_level
        )
        if args.daemon:
            # Daemon mode: poll until interrupted.
            manager.run_daemon(args.check_interval)
        else:
            # One-shot mode.
            manager.update_all_targets()
            print("成功更新所有 exporter targets")
    except Exception as e:
        print(f"错误: {e}", file=sys.stderr)
        sys.exit(1)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()