huhy_20251211 #54
637
src/log/fluent-bit/build/app/update_fluent_config.py
Normal file
637
src/log/fluent-bit/build/app/update_fluent_config.py
Normal file
@ -0,0 +1,637 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
定时任务脚本:每小时执行一次,根据node.json配置文件生成fluent-bit配置文件
|
||||||
|
支持配置文件变更检测和Fluent Bit热重启
|
||||||
|
"""
|
||||||
|
|
||||||
|
import requests
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
import logging
|
||||||
|
import hashlib
|
||||||
|
import subprocess
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
import re
|
||||||
|
import socket
|
||||||
|
|
||||||
|
# 配置日志
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||||
|
handlers=[
|
||||||
|
logging.FileHandler('/var/log/config_generator.log'),
|
||||||
|
logging.StreamHandler()
|
||||||
|
]
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class KibanaDataViewManager:
|
||||||
|
"""Kibana Data View 管理器"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.kibana_host = os.environ.get("KIBANA_HOST", "kibana")
|
||||||
|
self.kibana_port = os.environ.get("KIBANA_PORT", "5601")
|
||||||
|
self.kibana_user = os.environ.get("KIBANA_USER", "elastic")
|
||||||
|
self.kibana_password = os.environ.get("KIBANA_PASSWORD", "")
|
||||||
|
self.es_host = os.environ.get("ES_HOST")
|
||||||
|
self.es_port = os.environ.get("ES_PORT", "9200")
|
||||||
|
self.fb_user = os.environ.get("AGENT_USER", "unknown_user")
|
||||||
|
|
||||||
|
self.base_url = f"http://{self.kibana_host}:{self.kibana_port}"
|
||||||
|
self.auth = (self.kibana_user, self.kibana_password)
|
||||||
|
|
||||||
|
# Data View名称模板
|
||||||
|
self.data_view_name = f"custom"
|
||||||
|
self.data_view_pattern = f"custom-*"
|
||||||
|
|
||||||
|
# 请求超时设置
|
||||||
|
self.timeout = 30
|
||||||
|
|
||||||
|
def _get_headers(self):
|
||||||
|
"""获取请求头"""
|
||||||
|
return {
|
||||||
|
"kbn-xsrf": "true",
|
||||||
|
"Content-Type": "application/json"
|
||||||
|
}
|
||||||
|
|
||||||
|
def check_kibana_available(self):
|
||||||
|
"""检查Kibana服务是否可用"""
|
||||||
|
try:
|
||||||
|
url = f"{self.base_url}/api/status"
|
||||||
|
response = requests.get(
|
||||||
|
url,
|
||||||
|
auth=self.auth,
|
||||||
|
timeout=self.timeout,
|
||||||
|
headers=self._get_headers()
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
status = response.json().get("status", {}).get("overall", {}).get("level")
|
||||||
|
if status == "available":
|
||||||
|
logger.info("Kibana服务可用")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.warning(f"Kibana服务状态: {status}")
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
logger.warning(f"Kibana状态检查失败: {response.status_code}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except requests.exceptions.ConnectionError:
|
||||||
|
logger.error("无法连接到Kibana服务")
|
||||||
|
return False
|
||||||
|
except requests.exceptions.Timeout:
|
||||||
|
logger.error("连接Kibana服务超时")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"检查Kibana服务失败: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_existing_data_views(self):
|
||||||
|
"""获取现有的Data View列表"""
|
||||||
|
try:
|
||||||
|
url = f"{self.base_url}/api/data_views"
|
||||||
|
response = requests.get(
|
||||||
|
url,
|
||||||
|
auth=self.auth,
|
||||||
|
timeout=self.timeout,
|
||||||
|
headers=self._get_headers()
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
response_data = response.json()
|
||||||
|
data_views = response_data.get("data_view", [])
|
||||||
|
logger.info(f"获取到 {len(data_views)} 个Data View")
|
||||||
|
return data_views
|
||||||
|
else:
|
||||||
|
logger.warning(f"获取Data View列表失败: {response.status_code}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"获取Data View列表失败: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
def check_data_view_exists(self):
|
||||||
|
"""检查custom Data View是否存在"""
|
||||||
|
data_views = self.get_existing_data_views()
|
||||||
|
|
||||||
|
for data_view in data_views:
|
||||||
|
if data_view.get("name") == self.data_view_name:
|
||||||
|
logger.info(f"Data View '{self.data_view_name}' 已存在")
|
||||||
|
return True
|
||||||
|
|
||||||
|
logger.info(f"Data View '{self.data_view_name}' 不存在")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def create_data_view(self):
|
||||||
|
"""创建Data View"""
|
||||||
|
try:
|
||||||
|
# 检查对应的索引是否存在
|
||||||
|
index_exists = self._check_index_exists()
|
||||||
|
if not index_exists:
|
||||||
|
logger.warning(f"索引模式 '{self.data_view_pattern}' 对应的索引不存在,但将继续创建Data View")
|
||||||
|
|
||||||
|
url = f"{self.base_url}/api/data_views/data_view"
|
||||||
|
|
||||||
|
# Data View配置
|
||||||
|
data_view_config = {
|
||||||
|
"data_view": {
|
||||||
|
"title": self.data_view_pattern,
|
||||||
|
"name": self.data_view_name,
|
||||||
|
"timeFieldName": "@timestamp"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.post(
|
||||||
|
url,
|
||||||
|
auth=self.auth,
|
||||||
|
json=data_view_config,
|
||||||
|
timeout=self.timeout,
|
||||||
|
headers=self._get_headers()
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
result = response.json()
|
||||||
|
logger.info(f"成功创建Data View: {result.get('data_view', {}).get('name')}")
|
||||||
|
return True
|
||||||
|
elif response.status_code == 409:
|
||||||
|
logger.info(f"Data View已存在: {response.json().get('message', 'Unknown error')}")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
error_msg = response.json().get('message', 'Unknown error')
|
||||||
|
logger.error(f"创建Data View失败 {response.status_code}: {error_msg}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"创建Data View异常: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _check_index_exists(self):
|
||||||
|
"""检查ES中是否存在对应的索引"""
|
||||||
|
try:
|
||||||
|
es_url = f"http://{self.es_host}:{self.es_port}"
|
||||||
|
index_url = f"{es_url}/{self.data_view_pattern}"
|
||||||
|
|
||||||
|
response = requests.head(
|
||||||
|
index_url,
|
||||||
|
auth=(self.kibana_user, self.kibana_password) if self.kibana_password else None,
|
||||||
|
timeout=self.timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
return response.status_code == 200
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"检查索引存在失败: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def manage_data_view(self):
|
||||||
|
"""管理Data View:检查并创建"""
|
||||||
|
logger.info("开始管理Kibana Data View")
|
||||||
|
|
||||||
|
# 检查Kibana是否可用
|
||||||
|
if not self.check_kibana_available():
|
||||||
|
logger.warning("Kibana服务不可用,跳过Data View管理")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 检查Data View是否存在
|
||||||
|
if self.check_data_view_exists():
|
||||||
|
logger.info(f"Data View '{self.data_view_name}' 已存在,无需创建")
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 创建Data View
|
||||||
|
logger.info(f"创建Data View: {self.data_view_name}")
|
||||||
|
if self.create_data_view():
|
||||||
|
logger.info(f"Data View '{self.data_view_name}' 创建成功")
|
||||||
|
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.error(f"Data View '{self.data_view_name}' 创建失败")
|
||||||
|
return False
|
||||||
|
|
||||||
|
class ConfigGenerator:
|
||||||
|
def __init__(self):
|
||||||
|
# 获取主机名
|
||||||
|
self.hostname = self._get_hostname()
|
||||||
|
|
||||||
|
# 获取用户帐号
|
||||||
|
self.fb_user = self._get_fb_user()
|
||||||
|
|
||||||
|
# 基础路径
|
||||||
|
self.base_path = Path("/private/argus/agent")
|
||||||
|
self.inputs_dir = Path("/etc/fluent-bit/inputs.d")
|
||||||
|
self.outputs_dir = Path("/etc/fluent-bit/outputs.d")
|
||||||
|
|
||||||
|
# 配置文件路径
|
||||||
|
self.config_file = self.base_path / self.hostname / "node.json"
|
||||||
|
# 配置文件hash值,校验配置是否发生变更
|
||||||
|
hash_file = self.base_path / self.hostname / "node_json_hash.txt"
|
||||||
|
|
||||||
|
# 配置文件哈希值存储文件
|
||||||
|
self.config_hash_file = Path(hash_file)
|
||||||
|
|
||||||
|
# 需要保留的文件
|
||||||
|
self.inputs_to_keep = ["10-train.conf", "20-infer.conf"]
|
||||||
|
self.outputs_to_keep = ["10-es.conf"]
|
||||||
|
|
||||||
|
# 生成的配置文件前缀
|
||||||
|
self.generated_prefix = "custom_"
|
||||||
|
|
||||||
|
# Kibana Data View管理器
|
||||||
|
self.kibana_manager = KibanaDataViewManager()
|
||||||
|
|
||||||
|
# 确保目录存在
|
||||||
|
self._ensure_directories()
|
||||||
|
|
||||||
|
def _get_hostname(self):
|
||||||
|
"""获取容器主机名"""
|
||||||
|
return os.environ.get("AGENT_HOSTNAME") or socket.gethostname()
|
||||||
|
|
||||||
|
def _get_fb_user(self):
|
||||||
|
"""获取容器用户"""
|
||||||
|
return os.environ.get("AGENT_USER", "unknown_user")
|
||||||
|
|
||||||
|
def _ensure_directories(self):
|
||||||
|
"""确保必要的目录存在"""
|
||||||
|
directories = [
|
||||||
|
self.base_path / self.hostname,
|
||||||
|
self.inputs_dir,
|
||||||
|
self.outputs_dir
|
||||||
|
]
|
||||||
|
|
||||||
|
for directory in directories:
|
||||||
|
directory.mkdir(parents=True, exist_ok=True)
|
||||||
|
logger.debug(f"确保目录存在: {directory}")
|
||||||
|
|
||||||
|
def _get_config_hash(self, config_content):
|
||||||
|
"""计算配置内容的哈希值"""
|
||||||
|
return hashlib.md5(config_content.encode('utf-8')).hexdigest()
|
||||||
|
|
||||||
|
def _save_config_hash(self, config_hash):
|
||||||
|
"""保存配置哈希值"""
|
||||||
|
try:
|
||||||
|
with open(self.config_hash_file, 'w') as f:
|
||||||
|
f.write(config_hash)
|
||||||
|
logger.debug(f"保存配置哈希值: {config_hash}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"保存配置哈希值失败: {e}")
|
||||||
|
|
||||||
|
def _load_config_hash(self):
|
||||||
|
"""加载保存的配置哈希值"""
|
||||||
|
if not self.config_hash_file.exists():
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(self.config_hash_file, 'r') as f:
|
||||||
|
return f.read().strip()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"加载配置哈希值失败: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _has_config_changed(self, current_config):
|
||||||
|
"""检查配置是否发生变化"""
|
||||||
|
# 生成当前配置的哈希值
|
||||||
|
config_str = json.dumps(current_config, sort_keys=True)
|
||||||
|
current_hash = self._get_config_hash(config_str)
|
||||||
|
|
||||||
|
# 获取之前保存的哈希值
|
||||||
|
saved_hash = self._load_config_hash()
|
||||||
|
|
||||||
|
if saved_hash is None:
|
||||||
|
# 没有保存的哈希值,说明是第一次运行
|
||||||
|
logger.info("首次运行,未找到保存的配置哈希值")
|
||||||
|
self._save_config_hash(current_hash)
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 比较哈希值
|
||||||
|
if current_hash == saved_hash:
|
||||||
|
logger.info("配置文件未发生变化")
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
logger.info(f"配置文件发生变化: {saved_hash[:8]} -> {current_hash[:8]}")
|
||||||
|
self._save_config_hash(current_hash)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _load_node_config(self):
|
||||||
|
"""加载node.json配置文件"""
|
||||||
|
if not self.config_file.exists():
|
||||||
|
logger.warning(f"配置文件不存在: {self.config_file}")
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(self.config_file, 'r') as f:
|
||||||
|
config = json.load(f)
|
||||||
|
|
||||||
|
# 获取log路径列表
|
||||||
|
log_configs = config.get("config", {}).get("log", [])
|
||||||
|
paths = [log.get("path") for log in log_configs if log.get("path")]
|
||||||
|
|
||||||
|
logger.info(f"从配置文件中读取到 {len(paths)} 个日志路径")
|
||||||
|
return paths, config
|
||||||
|
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
logger.error(f"配置文件JSON格式错误: {e}")
|
||||||
|
return None, None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"读取配置文件失败: {e}")
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
def _clean_generated_files(self):
|
||||||
|
"""只清理生成的配置文件,保留系统文件"""
|
||||||
|
try:
|
||||||
|
# 清理inputs.d目录中生成的配置文件
|
||||||
|
for file_path in self.inputs_dir.glob(f"{self.generated_prefix}*.conf"):
|
||||||
|
if file_path.is_file() and file_path.name not in self.inputs_to_keep:
|
||||||
|
try:
|
||||||
|
file_path.unlink()
|
||||||
|
logger.debug(f"已删除生成的input文件: {file_path}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"删除文件失败 {file_path}: {e}")
|
||||||
|
|
||||||
|
# 清理outputs.d目录中生成的配置文件
|
||||||
|
for file_path in self.outputs_dir.glob(f"{self.generated_prefix}*.conf"):
|
||||||
|
if file_path.is_file() and file_path.name not in self.outputs_to_keep:
|
||||||
|
try:
|
||||||
|
file_path.unlink()
|
||||||
|
logger.debug(f"已删除生成的output文件: {file_path}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"删除文件失败 {file_path}: {e}")
|
||||||
|
|
||||||
|
logger.info("已清理生成的配置文件")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"清理生成的配置文件失败: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _generate_input_config(self, paths):
|
||||||
|
"""生成input配置文件"""
|
||||||
|
configs = []
|
||||||
|
|
||||||
|
for i, path in enumerate(paths, 1):
|
||||||
|
# 清理路径,移除非法字符用于文件名
|
||||||
|
safe_name = re.sub(r'[^\w\-_]', '_', path.replace('/', '_').replace('*', 'all'))
|
||||||
|
filename = f"{self.generated_prefix}{safe_name}_input.conf"
|
||||||
|
filepath = self.inputs_dir / filename
|
||||||
|
|
||||||
|
# 生成配置内容
|
||||||
|
config_content = self._create_input_config(path, i)
|
||||||
|
|
||||||
|
# 写入文件
|
||||||
|
with open(filepath, 'w') as f:
|
||||||
|
f.write(config_content)
|
||||||
|
|
||||||
|
configs.append(filepath)
|
||||||
|
logger.info(f"生成input配置文件: {filepath}")
|
||||||
|
|
||||||
|
return configs
|
||||||
|
|
||||||
|
def _create_input_config(self, path, index):
|
||||||
|
"""创建单个input配置"""
|
||||||
|
config = f"""[INPUT]
|
||||||
|
name tail
|
||||||
|
path {path}
|
||||||
|
tag app.custom
|
||||||
|
Path_Key log_path
|
||||||
|
refresh_interval 5
|
||||||
|
skip_long_lines on
|
||||||
|
storage.type filesystem
|
||||||
|
multiline.parser python,go,java
|
||||||
|
DB /buffers/{self.fb_user}_custom.db
|
||||||
|
Mem_Buf_Limit 50MB
|
||||||
|
Buffer_Chunk_Size 1MB
|
||||||
|
Buffer_Max_Size 5MB
|
||||||
|
Rotate_Wait 30
|
||||||
|
Ignore_Older 24h
|
||||||
|
|
||||||
|
"""
|
||||||
|
return config
|
||||||
|
|
||||||
|
def _generate_output_config(self):
|
||||||
|
"""生成output配置文件,生成一个output配置文件"""
|
||||||
|
filename = f"{self.generated_prefix}output.conf"
|
||||||
|
filepath = self.outputs_dir / filename
|
||||||
|
|
||||||
|
# 检查是否已存在
|
||||||
|
if filepath.exists():
|
||||||
|
logger.info(f"Output配置文件已存在: {filepath}")
|
||||||
|
return [filepath]
|
||||||
|
|
||||||
|
# 生成配置内容
|
||||||
|
config_content = self._create_output_config()
|
||||||
|
|
||||||
|
# 写入文件
|
||||||
|
with open(filepath, 'w') as f:
|
||||||
|
f.write(config_content)
|
||||||
|
|
||||||
|
logger.info(f"生成output配置文件: {filepath}")
|
||||||
|
return [filepath]
|
||||||
|
|
||||||
|
def _create_output_config(self):
|
||||||
|
"""创建单个output配置"""
|
||||||
|
config = f"""# Source: customer log path
|
||||||
|
[OUTPUT]
|
||||||
|
name es
|
||||||
|
Match app.custom
|
||||||
|
Host ${{ES_HOST}}
|
||||||
|
Port ${{ES_PORT}}
|
||||||
|
Logstash_Format On
|
||||||
|
Logstash_Prefix custom-${{FB_USER}}
|
||||||
|
Replace_Dots On
|
||||||
|
Generate_ID On
|
||||||
|
Retry_Limit False
|
||||||
|
Suppress_Type_Name On
|
||||||
|
|
||||||
|
"""
|
||||||
|
return config
|
||||||
|
|
||||||
|
def _reload_fluentbit(self):
|
||||||
|
"""重新加载Fluent Bit配置"""
|
||||||
|
try:
|
||||||
|
# 方法1: 使用SIGHUP信号(如果支持)
|
||||||
|
logger.info("尝试使用SIGHUP信号重新加载Fluent Bit...")
|
||||||
|
result = subprocess.run(
|
||||||
|
["pkill", "-HUP", "fluent-bit"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=30
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode == 0:
|
||||||
|
logger.info("成功发送SIGHUP信号重新加载Fluent Bit")
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 方法2: 使用fluent-bit的热重载(如果支持)
|
||||||
|
logger.info("尝试使用热重载命令...")
|
||||||
|
result = subprocess.run(
|
||||||
|
["fluent-bit", "-R"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=30
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode == 0:
|
||||||
|
logger.info("成功执行Fluent Bit热重载")
|
||||||
|
return True
|
||||||
|
|
||||||
|
logger.error("所有重载方法都失败")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
logger.error("Fluent Bit重载操作超时")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"重载Fluent Bit失败: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _check_fluentbit_status(self):
|
||||||
|
"""检查Fluent Bit运行状态"""
|
||||||
|
try:
|
||||||
|
# 方法1: 检查进程
|
||||||
|
result = subprocess.run(
|
||||||
|
["pgrep", "fluent-bit"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode == 0:
|
||||||
|
logger.info("Fluent Bit进程正在运行")
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 方法2: 检查服务状态
|
||||||
|
result = subprocess.run(
|
||||||
|
["systemctl", "is-active", "fluent-bit"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode == 0 and result.stdout.strip() == "active":
|
||||||
|
logger.info("Fluent Bit服务正在运行")
|
||||||
|
return True
|
||||||
|
|
||||||
|
logger.warning("Fluent Bit未运行")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"检查Fluent Bit状态失败: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
"""执行主逻辑:检查配置文件更新;生成fluent bit配置文件并热启动进程;检查kinbana data view"""
|
||||||
|
logger.info("=" * 50)
|
||||||
|
logger.info(f"开始执行配置生成任务 - {datetime.now()}")
|
||||||
|
logger.info(f"主机名: {self.hostname}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 1. 加载配置文件
|
||||||
|
paths, config = self._load_node_config()
|
||||||
|
if not paths or not config:
|
||||||
|
logger.warning("未找到有效的日志路径配置,跳过本次执行")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 2. 检查配置是否发生变化
|
||||||
|
config_str = json.dumps(config, sort_keys=True)
|
||||||
|
current_hash = self._get_config_hash(config_str)
|
||||||
|
saved_hash = self._load_config_hash()
|
||||||
|
|
||||||
|
# 如果没有变化,检查Fluent Bit状态后退出
|
||||||
|
if saved_hash and current_hash == saved_hash:
|
||||||
|
logger.info("配置文件未发生变化,跳过配置生成")
|
||||||
|
# 检查Fluent Bit状态
|
||||||
|
if not self._check_fluentbit_status():
|
||||||
|
logger.warning("Fluent Bit未运行,尝试启动...")
|
||||||
|
self._reload_fluentbit()
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 保存新的哈希值
|
||||||
|
self._save_config_hash(current_hash)
|
||||||
|
logger.info("配置文件发生变化,重新生成配置")
|
||||||
|
|
||||||
|
# 3. 清理之前生成的配置文件
|
||||||
|
self._clean_generated_files()
|
||||||
|
|
||||||
|
# 4. 生成新的配置文件
|
||||||
|
input_files = self._generate_input_config(paths)
|
||||||
|
output_files = self._generate_output_config()
|
||||||
|
|
||||||
|
logger.info(f"生成 {len(input_files)} 个input配置,{len(output_files)} 个output配置")
|
||||||
|
|
||||||
|
# 重新加载Fluent Bit
|
||||||
|
logger.info("配置文件已更新,重新加载Fluent Bit...")
|
||||||
|
if self._reload_fluentbit():
|
||||||
|
logger.info("Fluent Bit重载成功")
|
||||||
|
else:
|
||||||
|
logger.warning("Fluent Bit重载失败,但配置文件已更新")
|
||||||
|
|
||||||
|
# 不论配置是否变更,每次执行都检查custom视图是否存在,避免视图创建失败后,若配置不再变更则无法创建
|
||||||
|
# 检查kibana data view 并进行创建data view
|
||||||
|
logger.info("开始管理Kibana Data View...")
|
||||||
|
self.kibana_manager.manage_data_view()
|
||||||
|
|
||||||
|
logger.info("任务完成")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"任务执行失败: {e}", exc_info=True)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class Scheduler:
|
||||||
|
"""定时调度器"""
|
||||||
|
|
||||||
|
def __init__(self, interval_seconds=3600):
|
||||||
|
self.interval_hours = interval_seconds/3600
|
||||||
|
self.interval_seconds = interval_seconds
|
||||||
|
self.running = False
|
||||||
|
self.generator = ConfigGenerator()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
"""执行任务"""
|
||||||
|
return self.generator.run()
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""启动定时调度"""
|
||||||
|
self.running = True
|
||||||
|
logger.info(f"启动定时调度器,每隔 {self.interval_hours} 小时执行一次")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while self.running:
|
||||||
|
# 记录下次执行时间
|
||||||
|
next_run = datetime.now().timestamp() + self.interval_seconds
|
||||||
|
next_time_str = datetime.fromtimestamp(next_run).strftime('%Y-%m-%d %H:%M:%S')
|
||||||
|
|
||||||
|
# 执行任务
|
||||||
|
logger.info(f"开始执行定时任务 - {datetime.now()}")
|
||||||
|
self.run()
|
||||||
|
|
||||||
|
# 等待到下次执行时间
|
||||||
|
logger.info(f"下次执行时间: {next_time_str}")
|
||||||
|
|
||||||
|
# 使用递减等待
|
||||||
|
for _ in range(self.interval_seconds):
|
||||||
|
if not self.running:
|
||||||
|
break
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"调度器运行异常: {e}")
|
||||||
|
finally:
|
||||||
|
self.stop()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""停止调度器"""
|
||||||
|
self.running = False
|
||||||
|
logger.info("调度器已停止")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
# 定时调度,设置调度周期
|
||||||
|
scheduler = Scheduler(3600)
|
||||||
|
|
||||||
|
# 启动调度器
|
||||||
|
scheduler.start()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -1,9 +1,11 @@
|
|||||||
[SERVICE]
|
[SERVICE]
|
||||||
Daemon Off
|
Daemon Off
|
||||||
Parsers_File parsers.conf
|
Parsers_File /etc/fluent-bit/parsers.conf
|
||||||
HTTP_Server On
|
HTTP_Server On
|
||||||
HTTP_Listen 0.0.0.0
|
HTTP_Listen 0.0.0.0
|
||||||
HTTP_Port 2020
|
HTTP_Port 2020
|
||||||
|
Hot_Reload On
|
||||||
|
Hot_Reload.Period 5
|
||||||
storage.path /buffers
|
storage.path /buffers
|
||||||
storage.sync normal
|
storage.sync normal
|
||||||
storage.checksum on
|
storage.checksum on
|
||||||
@ -31,7 +33,7 @@
|
|||||||
[FILTER]
|
[FILTER]
|
||||||
Name lua
|
Name lua
|
||||||
Match app.*
|
Match app.*
|
||||||
script inject_labels.lua
|
script /etc/fluent-bit/inject_labels.lua
|
||||||
call add_labels
|
call add_labels
|
||||||
|
|
||||||
@INCLUDE outputs.d/*.conf
|
@INCLUDE outputs.d/*.conf
|
||||||
|
|||||||
@ -9,6 +9,7 @@ export DEBIAN_FRONTEND=noninteractive
|
|||||||
echo "[INFO] Staging fluent-bit bundle..."
|
echo "[INFO] Staging fluent-bit bundle..."
|
||||||
rm -rf /tmp/flb && mkdir -p /tmp/flb
|
rm -rf /tmp/flb && mkdir -p /tmp/flb
|
||||||
cp -r /private/etc /tmp/flb/
|
cp -r /private/etc /tmp/flb/
|
||||||
|
cp -r /private/app /tmp
|
||||||
mkdir -p /tmp/flb/packages
|
mkdir -p /tmp/flb/packages
|
||||||
cp -r /private/packages/* /tmp/flb/packages/ 2>/dev/null || true
|
cp -r /private/packages/* /tmp/flb/packages/ 2>/dev/null || true
|
||||||
|
|
||||||
@ -75,6 +76,10 @@ echo "[INFO] Fluent Bit version:"
|
|||||||
mkdir -p /etc/fluent-bit
|
mkdir -p /etc/fluent-bit
|
||||||
cp -r /tmp/flb/etc/* /etc/fluent-bit/
|
cp -r /tmp/flb/etc/* /etc/fluent-bit/
|
||||||
|
|
||||||
|
# 创建定时脚本目录,并复制脚本
|
||||||
|
mkdir -p /etc/fluent-bit/app/
|
||||||
|
cp -r /tmp/app/* /etc/fluent-bit/app/
|
||||||
|
|
||||||
# Create logs/buffers dirs
|
# Create logs/buffers dirs
|
||||||
mkdir -p /logs/train /logs/infer /buffers
|
mkdir -p /logs/train /logs/infer /buffers
|
||||||
|
|
||||||
@ -92,10 +97,138 @@ chmod 770 /buffers || true
|
|||||||
# 目录属主设置为 fluent-bit(不影响 1777 粘滞位)
|
# 目录属主设置为 fluent-bit(不影响 1777 粘滞位)
|
||||||
chown -R fluent-bit:fluent-bit /logs /buffers 2>/dev/null || true
|
chown -R fluent-bit:fluent-bit /logs /buffers 2>/dev/null || true
|
||||||
|
|
||||||
|
# 检查Python基础环境
|
||||||
|
echo "[INFO] Checking Python environment..."
|
||||||
|
|
||||||
|
echo "[INFO] Fixing broken dependencies..."
|
||||||
|
apt-get -f install -y -qq 2>/dev/null || true
|
||||||
|
|
||||||
|
# 检查python3是否安装
|
||||||
|
if ! command -v python3 >/dev/null 2>&1; then
|
||||||
|
echo "[INFO] Installing python3..."
|
||||||
|
apt-get update -qq || true
|
||||||
|
# 尝试安装最小化的python3(不安装推荐包,减少依赖冲突)
|
||||||
|
apt-get install -y -qq --no-install-recommends python3-minimal 2>/dev/null || {
|
||||||
|
echo "[WARN] Failed to install python3 via apt, trying alternative approach..."
|
||||||
|
|
||||||
|
# 如果apt安装失败,检查是否已有python3二进制文件
|
||||||
|
if [ -f /usr/bin/python3 ] && python3 --version 2>/dev/null; then
|
||||||
|
echo "[INFO] Python3 binary exists but not in PATH"
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
fi
|
||||||
|
|
||||||
|
# 检查pip是否可用
|
||||||
|
if ! command -v pip3 >/dev/null 2>&1; then
|
||||||
|
# 尝试安装pip
|
||||||
|
echo "[INFO] Installing pip..."
|
||||||
|
apt-get update -qq || true
|
||||||
|
apt-get install -y -qq python3-pip 2>/dev/null || true
|
||||||
|
fi
|
||||||
|
|
||||||
|
# 检查Python脚本依赖的包
|
||||||
|
check_python_packages() {
|
||||||
|
local missing_packages=""
|
||||||
|
|
||||||
|
# 根据脚本内容确定需要的包
|
||||||
|
local required_packages=(
|
||||||
|
"requests" # HTTP请求库(脚本中最关键的依赖)
|
||||||
|
)
|
||||||
|
|
||||||
|
# 检查requests是否安装(必须)
|
||||||
|
if ! python3 -c "import requests" 2>/dev/null; then
|
||||||
|
echo "[WARN] 'requests' package not found, installing..."
|
||||||
|
if command -v pip3 >/dev/null 2>&1; then
|
||||||
|
# 尝试使用清华镜像源
|
||||||
|
PIP_INDEX_URL="https://pypi.tuna.tsinghua.edu.cn/simple"
|
||||||
|
echo "[INFO] Installing requests using pip with mirror..."
|
||||||
|
pip3 install --no-cache-dir --index-url "$PIP_INDEX_URL" requests 2>/dev/null || {
|
||||||
|
# 如果镜像失败,尝试官方源
|
||||||
|
echo "[INFO] Trying official PyPI source..."
|
||||||
|
pip3 install --no-cache-dir requests 2>/dev/null || {
|
||||||
|
echo "[ERROR] Failed to install requests via pip"
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
echo "[ERROR] pip3 not available, cannot install requests"
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
# 检查其他必需包
|
||||||
|
for pkg in "${required_packages[@]}"; do
|
||||||
|
if ! python3 -c "import $pkg" 2>/dev/null; then
|
||||||
|
missing_packages="$missing_packages $pkg"
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
echo "$missing_packages"
|
||||||
|
}
|
||||||
|
|
||||||
|
# 安装Python包依赖
|
||||||
|
echo "[INFO] Installing Python package dependencies..."
|
||||||
|
missing_py_deps=$(check_python_packages)
|
||||||
|
|
||||||
|
if [[ -n "$missing_py_deps" ]]; then
|
||||||
|
echo "[INFO] Installing missing Python packages:$missing_py_deps"
|
||||||
|
# 如果还有缺失的包,尝试安装
|
||||||
|
if command -v pip3 >/dev/null 2>&1; then
|
||||||
|
for pkg in $missing_py_deps; do
|
||||||
|
pip3 install --no-cache-dir "$pkg" 2>/dev/null || true
|
||||||
|
done
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
# 验证Python脚本依赖
|
||||||
|
echo "[INFO] Verifying Python dependencies..."
|
||||||
|
if python3 -c "import requests; import json; import time; import logging; import hashlib; import subprocess; from datetime import datetime; from pathlib import Path; import re; import socket; print('All required modules imported successfully')" 2>/dev/null; then
|
||||||
|
echo "[INFO] Python dependencies verified successfully"
|
||||||
|
else
|
||||||
|
echo "[WARN] Some Python modules may be missing"
|
||||||
|
# 尝试查看具体缺少什么
|
||||||
|
python3 -c "
|
||||||
|
import sys
|
||||||
|
modules = ['requests', 'json', 'time', 'logging', 'hashlib', 'subprocess', 'datetime', 'pathlib', 're', 'socket']
|
||||||
|
missing = []
|
||||||
|
for module in modules:
|
||||||
|
try:
|
||||||
|
__import__(module)
|
||||||
|
print(f'✓ {module}')
|
||||||
|
except ImportError as e:
|
||||||
|
print(f'✗ {module}: {e}')
|
||||||
|
missing.append(module)
|
||||||
|
if missing:
|
||||||
|
sys.exit(1)
|
||||||
|
" 2>/dev/null || true
|
||||||
|
fi
|
||||||
|
|
||||||
|
# 查找并运行Python脚本
|
||||||
|
echo "[INFO] Looking for Python configuration generator script..."
|
||||||
|
PYTHON_SCRIPT="/etc/fluent-bit/app/update_fluent_config.py"
|
||||||
|
|
||||||
|
if [ -f "$PYTHON_SCRIPT" ]; then
|
||||||
|
echo "[INFO] Found Python script: $PYTHON_SCRIPT"
|
||||||
|
|
||||||
|
# 检查脚本是否可执行,如果不可执行则添加执行权限
|
||||||
|
if [ ! -x "$PYTHON_SCRIPT" ]; then
|
||||||
|
echo "[INFO] Adding execute permission to Python script..."
|
||||||
|
chmod +x "$PYTHON_SCRIPT" 2>/dev/null || true
|
||||||
|
fi
|
||||||
|
|
||||||
|
# 运行Python脚本(后台运行)
|
||||||
|
echo "[INFO] Starting configuration generator script..."
|
||||||
|
python3 "$PYTHON_SCRIPT" &
|
||||||
|
CONFIG_GEN_PID=$!
|
||||||
|
echo "[INFO] Configuration generator started with PID: $CONFIG_GEN_PID"
|
||||||
|
else
|
||||||
|
echo "[WARN] Python configuration generator script not found"
|
||||||
|
fi
|
||||||
|
|
||||||
# Wait for Elasticsearch via bash /dev/tcp to avoid curl dependency
|
# Wait for Elasticsearch via bash /dev/tcp to avoid curl dependency
|
||||||
echo "[INFO] Waiting for Elasticsearch to be ready (tcp ${ES_HOST}:${ES_PORT})..."
|
echo "[INFO] Waiting for Elasticsearch to be ready (tcp ${ES_HOST}:${ES_PORT})..."
|
||||||
for i in $(seq 1 120); do
|
for i in $(seq 1 120); do
|
||||||
if exec 3<>/dev/tcp/${ES_HOST}/${ES_PORT}; then
|
if exec 3<>/dev/tcp/${ES_HOST}/${ES_PORT} 2>/dev/null; then
|
||||||
exec 3<&- 3>&-
|
exec 3<&- 3>&-
|
||||||
echo "[INFO] Elasticsearch is ready"
|
echo "[INFO] Elasticsearch is ready"
|
||||||
break
|
break
|
||||||
@ -104,6 +237,10 @@ for i in $(seq 1 120); do
|
|||||||
sleep 1
|
sleep 1
|
||||||
done
|
done
|
||||||
|
|
||||||
|
echo "[INFO] Command: starting python task..."
|
||||||
|
# 启动配置生成器
|
||||||
|
#exec nohup python3 /etc/fluent-bit/app/update_fluent_config.py &
|
||||||
|
|
||||||
echo "[INFO] Starting Fluent Bit with configuration from /etc/fluent-bit/"
|
echo "[INFO] Starting Fluent Bit with configuration from /etc/fluent-bit/"
|
||||||
echo "[INFO] Command: /opt/fluent-bit/bin/fluent-bit --config=/etc/fluent-bit/fluent-bit.conf"
|
echo "[INFO] Command: /opt/fluent-bit/bin/fluent-bit --config=/etc/fluent-bit/fluent-bit.conf"
|
||||||
exec /opt/fluent-bit/bin/fluent-bit --config=/etc/fluent-bit/fluent-bit.conf
|
exec /opt/fluent-bit/bin/fluent-bit --config=/etc/fluent-bit/fluent-bit.conf
|
||||||
Loading…
x
Reference in New Issue
Block a user