From b9af54bba500806f65f4eeea22c5504b8a80b28d Mon Sep 17 00:00:00 2001
From: huhy
Date: Tue, 30 Dec 2025 14:49:03 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E8=87=AA=E5=8A=A8=E9=85=8D=E7=BD=AEflu?=
 =?UTF-8?q?ent=20bit=E6=97=A5=E5=BF=97=E8=B7=AF=E5=BE=84?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../build/app/update_fluent_config.py         | 701 ++++++++++++++++++
 src/log/fluent-bit/build/etc/fluent-bit.conf  |   2 +
 src/log/fluent-bit/build/start-fluent-bit.sh  |  13 +
 3 files changed, 716 insertions(+)
 create mode 100644 src/log/fluent-bit/build/app/update_fluent_config.py

diff --git a/src/log/fluent-bit/build/app/update_fluent_config.py b/src/log/fluent-bit/build/app/update_fluent_config.py
new file mode 100644
index 0000000..3d084e7
--- /dev/null
+++ b/src/log/fluent-bit/build/app/update_fluent_config.py
@@ -0,0 +1,701 @@
+#!/usr/bin/env python3
+"""
+Scheduled task: runs once per hour, generating fluent-bit config files from node.json.
+Detects configuration changes and hot-reloads Fluent Bit when needed.
+"""
+
+import os
+import json
+import time
+import logging
+import hashlib
+import subprocess
+from datetime import datetime
+from pathlib import Path
+import re
+import socket
+import requests
+
+# Logging configuration
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler('/var/log/config_generator.log'),
+        logging.StreamHandler()
+    ]
+)
+logger = logging.getLogger(__name__)
+
+class KibanaDataViewManager:
+    """Kibana Data View manager"""
+
+    def __init__(self):
+        self.kibana_host = os.environ.get("KIBANA_HOST", "localhost")
+        self.kibana_port = os.environ.get("KIBANA_PORT", "5601")
+        self.kibana_user = os.environ.get("KIBANA_USER", "elastic")
+        self.kibana_password = os.environ.get("KIBANA_PASSWORD", "")
+        self.es_host = os.environ.get("ES_HOST")
+        self.es_port = os.environ.get("ES_PORT", "9200")
+        self.fb_user = os.environ.get("AGENT_USER", "unknown_user")
+
+        self.base_url = f"http://{self.kibana_host}:{self.kibana_port}"
+        self.auth = (self.kibana_user, self.kibana_password)
+
+        # Data View name template
+        self.data_view_name = "custom"
+        self.data_view_pattern = "custom-*"
+
+        # Request timeout (seconds)
+        self.timeout = 30
+
+    def _get_headers(self):
+        """Build the common request headers."""
+        return {
+            "kbn-xsrf": "true",
+            "Content-Type": "application/json"
+        }
+
+    def check_kibana_available(self):
+        """Check whether the Kibana service is reachable and reports 'available'."""
+        try:
+            url = f"{self.base_url}/api/status"
+            response = requests.get(
+                url,
+                auth=self.auth,
+                timeout=self.timeout,
+                headers=self._get_headers()
+            )
+
+            if response.status_code == 200:
+                status = response.json().get("status", {}).get("overall", {}).get("level")
+                if status == "available":
+                    logger.info("Kibana服务可用")
+                    return True
+                else:
+                    logger.warning(f"Kibana服务状态: {status}")
+                    return False
+            else:
+                logger.warning(f"Kibana状态检查失败: {response.status_code}")
+                return False
+
+        except requests.exceptions.ConnectionError:
+            logger.error("无法连接到Kibana服务")
+            return False
+        except requests.exceptions.Timeout:
+            logger.error("连接Kibana服务超时")
+            return False
+        except Exception as e:
+            logger.error(f"检查Kibana服务失败: {e}")
+            return False
+
+    def get_existing_data_views(self):
+        """Return the list of existing Data Views (empty list on failure)."""
+        try:
+            url = f"{self.base_url}/api/data_views"
+            response = requests.get(
+                url,
+                auth=self.auth,
+                timeout=self.timeout,
+                headers=self._get_headers()
+            )
+
+            if response.status_code == 200:
+                data_views = response.json()  # NOTE(review): API may wrap the list as {"data_view": [...]} — verify
+                logger.info(f"获取到 {len(data_views)} 个Data View")
+                return data_views
+            else:
+                logger.warning(f"获取Data View列表失败: {response.status_code}")
+                return []
+
+        except Exception as e:
+            logger.error(f"获取Data View列表失败: {e}")
+            return []
+
+    def check_data_view_exists(self):
+        """Return True when the 'custom' Data View already exists."""
+        data_views = self.get_existing_data_views()
+
+        for data_view in data_views:
+            if data_view.get("name") == self.data_view_name:
+                logger.info(f"Data View '{self.data_view_name}' 已存在")
+                return True
+
+        logger.info(f"Data View '{self.data_view_name}' 不存在")
+        return False
+
+    def create_data_view(self):
+        """Create the Data View; True on success or when it already exists (409)."""
+        try:
+            # Check whether a matching index exists (informational only)
+            index_exists = self._check_index_exists()
+            if not index_exists:
+                logger.warning(f"索引模式 '{self.data_view_pattern}' 对应的索引不存在,但将继续创建Data View")
+
+            url = f"{self.base_url}/api/data_views/data_view"
+
+            # Data View configuration
+            data_view_config = {
+                "data_view": {
+                    "title": self.data_view_pattern,
+                    "name": self.data_view_name,
+                    "timeFieldName": "@timestamp",
+                    "allowNoIndex": True,  # allow the backing index to be missing
+                    "allowHidden": False,
+                    "namespace": "default",
+                    "fieldFormats": {},
+                    "runtimeFieldMap": {},
+                    "fieldAttrs": {},
+                }
+            }
+
+            response = requests.post(
+                url,
+                auth=self.auth,
+                json=data_view_config,
+                timeout=self.timeout,
+                headers=self._get_headers()
+            )
+
+            if response.status_code == 200:
+                result = response.json()
+                logger.info(f"成功创建Data View: {result.get('data_view', {}).get('name')}")
+                return True
+            elif response.status_code == 409:
+                logger.info(f"Data View已存在: {response.json().get('message', 'Unknown error')}")
+                return True
+            else:
+                error_msg = response.json().get('message', 'Unknown error')
+                logger.error(f"创建Data View失败 {response.status_code}: {error_msg}")
+                return False
+
+        except Exception as e:
+            logger.error(f"创建Data View异常: {e}")
+            return False
+
+    def _check_index_exists(self):
+        """Check whether an index matching the pattern exists in Elasticsearch."""
+        try:
+            es_url = f"http://{self.es_host}:{self.es_port}"
+            index_url = f"{es_url}/{self.data_view_pattern}"
+
+            response = requests.head(
+                index_url,
+                auth=(self.kibana_user, self.kibana_password) if self.kibana_password else None,
+                timeout=self.timeout
+            )
+
+            return response.status_code == 200
+
+        except Exception as e:
+            logger.warning(f"检查索引存在失败: {e}")
+            return False
+
+    def set_default_data_view(self):
+        """Set as the default Data View (optional, best effort)."""
+        try:
+            # First fetch the Data View id
+            url = f"{self.base_url}/api/data_views"
+            response = requests.get(
+                url,
+                auth=self.auth,
+                timeout=self.timeout,
+                headers=self._get_headers()
+            )
+
+            if response.status_code != 200:
+                return False
+
+            data_views = response.json()
+            data_view_id = None
+
+            for dv in data_views:
+                if dv.get("name") == self.data_view_name:
+                    data_view_id = dv.get("id")
+                    break
+
+            if not data_view_id:
+                logger.warning(f"未找到Data View: {self.data_view_name}")
+                return False
+
+            # Update it to be the default Data View
+            update_url = f"{self.base_url}/api/data_views/data_view/{data_view_id}"
+            update_data = {
+                "data_view": {
+                    "name": self.data_view_name,
+                    "isDefault": True
+                }
+            }
+
+            response = requests.patch(
+                update_url,
+                auth=self.auth,
+                json=update_data,
+                timeout=self.timeout,
+                headers=self._get_headers()
+            )
+
+            if response.status_code == 200:
+                logger.info(f"成功将 '{self.data_view_name}' 设置为默认Data View")
+                return True
+            else:
+                logger.warning(f"设置默认Data View失败: {response.status_code}")
+                return False
+
+        except Exception as e:
+            logger.warning(f"设置默认Data View异常: {e}")
+            return False
+
+    def manage_data_view(self):
+        """Manage the Data View: check availability, then create it if missing."""
+        logger.info("开始管理Kibana Data View")
+
+        # Make sure Kibana is reachable first
+        if not self.check_kibana_available():
+            logger.warning("Kibana服务不可用,跳过Data View管理")
+            return False
+
+        # Nothing to do when the Data View already exists
+        if self.check_data_view_exists():
+            logger.info(f"Data View '{self.data_view_name}' 已存在,无需创建")
+            return True
+
+        # Create the Data View
+        logger.info(f"创建Data View: {self.data_view_name}")
+        if self.create_data_view():
+            logger.info(f"Data View '{self.data_view_name}' 创建成功")
+
+            # Optionally set it as the default Data View
+            try:
+                self.set_default_data_view()
+            except Exception as e:
+                logger.warning(f"设置默认Data View失败: {e}")
+
+            return True
+        else:
+            logger.error(f"Data View '{self.data_view_name}' 创建失败")
+            return False
+
+class ConfigGenerator:
+    def __init__(self):
+        # Container hostname
+        self.hostname = self._get_hostname()
+
+        # Container user account
+        self.fb_user = self._get_fb_user()
+
+        # Base paths (absolute: /private is the bind mount staged by start-fluent-bit.sh)
+        self.base_path = Path("/private/argus/agent")
+        self.inputs_dir = Path("/etc/fluent-bit/inputs.d")
+        self.outputs_dir = Path("/etc/fluent-bit/outputs.d")
+
+        # node.json config file path
+        self.config_file = self.base_path / self.hostname / "node.json"
+
+        hash_file = self.base_path / self.hostname / "node_json_hash.txt"
+
+        # File storing the hash of the last processed config
+        self.config_hash_file = Path(hash_file)
+
+        # Files that must never be cleaned up
+        self.inputs_to_keep = ["10-train.conf", "20-infer.conf"]
+        self.outputs_to_keep = ["10-es.conf"]
+
+        # Prefix of generated config files
+        self.generated_prefix = "custom_"
+
+        # Kibana Data View manager
+        self.kibana_manager = KibanaDataViewManager()
+
+        # Make sure the required directories exist
+        self._ensure_directories()
+
+    def _get_hostname(self):
+        """Get the container hostname."""
+        return os.environ.get("AGENT_HOSTNAME") or socket.gethostname()
+
+    def _get_fb_user(self):
+        """Get the container user."""
+        return os.environ.get("AGENT_USER", "unknown_user")
+
+    def _ensure_directories(self):
+        """Create all required directories when missing."""
+        directories = [
+            self.base_path / self.hostname,
+            self.inputs_dir,
+            self.outputs_dir
+        ]
+
+        for directory in directories:
+            directory.mkdir(parents=True, exist_ok=True)
+            logger.debug(f"确保目录存在: {directory}")
+
+    def _get_config_hash(self, config_content):
+        """Return the MD5 hex digest of the config content (change detection only)."""
+        return hashlib.md5(config_content.encode('utf-8')).hexdigest()
+
+    def _save_config_hash(self, config_hash):
+        """Persist the config hash."""
+        try:
+            with open(self.config_hash_file, 'w') as f:
+                f.write(config_hash)
+            logger.debug(f"保存配置哈希值: {config_hash}")
+        except Exception as e:
+            logger.warning(f"保存配置哈希值失败: {e}")
+
+    def _load_config_hash(self):
+        """Load the previously saved config hash, or None."""
+        if not self.config_hash_file.exists():
+            return None
+
+        try:
+            with open(self.config_hash_file, 'r') as f:
+                return f.read().strip()
+        except Exception as e:
+            logger.warning(f"加载配置哈希值失败: {e}")
+            return None
+
+    def _has_config_changed(self, current_config):
+        """Return True when the config changed (unused by run(), which inlines this logic)."""
+        # Hash of the current config
+        config_str = json.dumps(current_config, sort_keys=True)
+        current_hash = self._get_config_hash(config_str)
+
+        # Previously saved hash
+        saved_hash = self._load_config_hash()
+
+        if saved_hash is None:
+            # No saved hash means this is the first run
+            logger.info("首次运行,未找到保存的配置哈希值")
+            self._save_config_hash(current_hash)
+            return True
+
+        # Compare hashes
+        if current_hash == saved_hash:
+            logger.info("配置文件未发生变化")
+            return False
+        else:
+            logger.info(f"配置文件发生变化: {saved_hash[:8]} -> {current_hash[:8]}")
+            self._save_config_hash(current_hash)
+            return True
+
+    def _load_node_config(self):
+        """Load node.json; return (paths, config) or (None, None) on failure."""
+        if not self.config_file.exists():
+            logger.warning(f"配置文件不存在: {self.config_file}")
+            return None, None
+
+        try:
+            with open(self.config_file, 'r') as f:
+                config = json.load(f)
+
+            # Collect the log path list
+            log_configs = config.get("config", {}).get("log", [])
+            paths = [log.get("path") for log in log_configs if log.get("path")]
+
+            logger.info(f"从配置文件中读取到 {len(paths)} 个日志路径")
+            return paths, config
+
+        except json.JSONDecodeError as e:
+            logger.error(f"配置文件JSON格式错误: {e}")
+            return None, None
+        except Exception as e:
+            logger.error(f"读取配置文件失败: {e}")
+            return None, None
+
+    def _clean_generated_files(self):
+        """Remove only generated config files, keeping the system-provided ones."""
+        try:
+            # Clean generated config files in inputs.d
+            for file_path in self.inputs_dir.glob(f"{self.generated_prefix}*.conf"):
+                if file_path.is_file() and file_path.name not in self.inputs_to_keep:
+                    try:
+                        file_path.unlink()
+                        logger.debug(f"已删除生成的input文件: {file_path}")
+                    except Exception as e:
+                        logger.warning(f"删除文件失败 {file_path}: {e}")
+
+            # Clean generated config files in outputs.d
+            for file_path in self.outputs_dir.glob(f"{self.generated_prefix}*.conf"):
+                if file_path.is_file() and file_path.name not in self.outputs_to_keep:
+                    try:
+                        file_path.unlink()
+                        logger.debug(f"已删除生成的output文件: {file_path}")
+                    except Exception as e:
+                        logger.warning(f"删除文件失败 {file_path}: {e}")
+
+            logger.info("已清理生成的配置文件")
+
+        except Exception as e:
+            logger.error(f"清理生成的配置文件失败: {e}")
+            raise
+
+    def _generate_input_config(self, paths):
+        """Generate one input config file per log path; return the written paths."""
+        configs = []
+
+        for i, path in enumerate(paths, 1):
+            # Sanitize the path into a legal file name
+            safe_name = re.sub(r'[^\w\-_]', '_', path.replace('/', '_').replace('*', 'all'))
+            filename = f"{self.generated_prefix}{safe_name}_input.conf"
+            filepath = self.inputs_dir / filename
+
+            # Build the config content
+            config_content = self._create_input_config(path, i)
+
+            # Write the file
+            with open(filepath, 'w') as f:
+                f.write(config_content)
+
+            configs.append(filepath)
+            logger.info(f"生成input配置文件: {filepath}")
+
+        return configs
+
+    def _create_input_config(self, path, index):
+        """Render a single [INPUT] section for *path* (index is currently unused)."""
+        config = f"""[INPUT]
+    name tail
+    path {path}
+    tag app.custom
+    Path_Key log_path
+    refresh_interval 5
+    skip_long_lines on
+    storage.type filesystem
+    multiline.parser python,go,java
+    DB /buffers/{self.fb_user}_custom.db
+    Mem_Buf_Limit 50MB
+    Buffer_Chunk_Size 1MB
+    Buffer_Max_Size 5MB
+    Rotate_Wait 30
+    Ignore_Older 24h
+
+"""
+        return config
+
+    def _generate_output_config(self):
+        """Generate the single output config file; return it in a list."""
+        filename = f"{self.generated_prefix}output.conf"
+        filepath = self.outputs_dir / filename
+
+        # Skip when it already exists
+        if filepath.exists():
+            logger.info(f"Output配置文件已存在: {filepath}")
+            return [filepath]
+
+        # Build the config content
+        config_content = self._create_output_config()
+
+        # Write the file
+        with open(filepath, 'w') as f:
+            f.write(config_content)
+
+        logger.info(f"生成output配置文件: {filepath}")
+        return [filepath]
+
+    def _create_output_config(self):
+        """Render the single [OUTPUT] section (ES vars resolved by fluent-bit at runtime)."""
+        config = f"""# Source: customer log path
+[OUTPUT]
+    name es
+    Match app.custom
+    Host ${{ES_HOST}}
+    Port ${{ES_PORT}}
+    Logstash_Format On
+    Logstash_Prefix custom-${{FB_USER}}
+    Replace_Dots On
+    Generate_ID On
+    Retry_Limit False
+    Suppress_Type_Name On
+
+"""
+        return config
+
+    def _reload_fluentbit(self):
+        """Reload the Fluent Bit configuration."""
+        try:
+            # Method 1: SIGHUP signal (when supported)
+            logger.info("尝试使用SIGHUP信号重新加载Fluent Bit...")
+            result = subprocess.run(
+                ["pkill", "-HUP", "fluent-bit"],
+                capture_output=True,
+                text=True,
+                timeout=30
+            )
+
+            if result.returncode == 0:
+                logger.info("成功发送SIGHUP信号重新加载Fluent Bit")
+                return True
+
+            # Method 2: fluent-bit hot-reload command (when supported)
+            logger.info("尝试使用热重载命令...")
+            result = subprocess.run(
+                ["fluent-bit", "-R"],
+                capture_output=True,
+                text=True,
+                timeout=30
+            )
+
+            if result.returncode == 0:
+                logger.info("成功执行Fluent Bit热重载")
+                return True
+
+            logger.error("所有重载方法都失败")
+            return False
+
+        except subprocess.TimeoutExpired:
+            logger.error("Fluent Bit重载操作超时")
+            return False
+        except Exception as e:
+            logger.error(f"重载Fluent Bit失败: {e}")
+            return False
+
+    def _check_fluentbit_status(self):
+        """Check whether Fluent Bit is running."""
+        try:
+            # Method 1: look for the process
+            result = subprocess.run(
+                ["pgrep", "fluent-bit"],
+                capture_output=True,
+                text=True
+            )
+
+            if result.returncode == 0:
+                logger.info("Fluent Bit进程正在运行")
+                return True
+
+            # Method 2: check the service status
+            result = subprocess.run(
+                ["systemctl", "is-active", "fluent-bit"],
+                capture_output=True,
+                text=True
+            )
+
+            if result.returncode == 0 and result.stdout.strip() == "active":
+                logger.info("Fluent Bit服务正在运行")
+                return True
+
+            logger.warning("Fluent Bit未运行")
+            return False
+
+        except Exception as e:
+            logger.error(f"检查Fluent Bit状态失败: {e}")
+            return False
+
+    def run(self):
+        """Main flow: detect config updates; regenerate fluent-bit configs and hot-reload; manage the Kibana data view."""
+        logger.info("=" * 50)
+        logger.info(f"开始执行配置生成任务 - {datetime.now()}")
+        logger.info(f"主机名: {self.hostname}")
+
+        try:
+            # 1. Load the config file
+            paths, config = self._load_node_config()
+            if not paths or not config:
+                logger.warning("未找到有效的日志路径配置,跳过本次执行")
+                return False
+
+            # 2. Detect whether the config changed
+            config_str = json.dumps(config, sort_keys=True)
+            current_hash = self._get_config_hash(config_str)
+            saved_hash = self._load_config_hash()
+
+            # When unchanged, just verify Fluent Bit is alive and exit
+            if saved_hash and current_hash == saved_hash:
+                logger.info("配置文件未发生变化,跳过配置生成")
+                # Check Fluent Bit status
+                if not self._check_fluentbit_status():
+                    logger.warning("Fluent Bit未运行,尝试启动...")
+                    self._reload_fluentbit()
+                return True
+
+            # Persist the new hash
+            self._save_config_hash(current_hash)
+            logger.info("配置文件发生变化,重新生成配置")
+
+            # 3. Clean previously generated config files
+            self._clean_generated_files()
+
+            # 4. Generate the new config files
+            input_files = self._generate_input_config(paths)
+            output_files = self._generate_output_config()
+
+            logger.info(f"生成 {len(input_files)} 个input配置,{len(output_files)} 个output配置")
+
+            # 5. Reload Fluent Bit
+            logger.info("配置文件已更新,重新加载Fluent Bit...")
+            if self._reload_fluentbit():
+                logger.info("Fluent Bit重载成功")
+
+                logger.info("开始管理Kibana Data View...")
+                self.kibana_manager.manage_data_view()
+            else:
+                logger.warning("Fluent Bit重载失败,但配置文件已更新")
+
+
+            logger.info("任务完成")
+            return True
+
+        except Exception as e:
+            logger.error(f"任务执行失败: {e}", exc_info=True)
+            return False
+
+
+class Scheduler:
+    """Simple interval scheduler"""
+
+    def __init__(self, interval_seconds=3600):
+        self.interval_hours = interval_seconds/3600
+        self.interval_seconds = interval_seconds
+        self.running = False
+        self.generator = ConfigGenerator()
+
+    def run(self):
+        """Run the task once."""
+        return self.generator.run()
+
+    def start(self):
+        """Start the periodic scheduling loop."""
+        self.running = True
+        logger.info(f"启动定时调度器,每隔 {self.interval_hours} 小时执行一次")
+
+        try:
+            while self.running:
+                # Record the next run time (for logging)
+                next_run = datetime.now().timestamp() + self.interval_seconds
+                next_time_str = datetime.fromtimestamp(next_run).strftime('%Y-%m-%d %H:%M:%S')
+
+                # Execute the task
+                logger.info(f"开始执行定时任务 - {datetime.now()}")
+                self.run()
+
+                # Wait until the next run
+                logger.info(f"下次执行时间: {next_time_str}")
+
+                # Sleep in 1s ticks so a stop request is honored promptly
+                for _ in range(self.interval_seconds):
+                    if not self.running:
+                        break
+                    time.sleep(1)
+
+        except Exception as e:
+            logger.error(f"调度器运行异常: {e}")
+        finally:
+            self.stop()
+
+    def stop(self):
+        """Stop the scheduler."""
+        self.running = False
+        logger.info("调度器已停止")
+
+
+def main():
+    # Periodic scheduling
+    scheduler = Scheduler(3600)
+
+    # Start the scheduler
+    scheduler.start()
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/src/log/fluent-bit/build/etc/fluent-bit.conf b/src/log/fluent-bit/build/etc/fluent-bit.conf
index 95ed374..3aacc56 100644
--- a/src/log/fluent-bit/build/etc/fluent-bit.conf
+++ b/src/log/fluent-bit/build/etc/fluent-bit.conf
@@ -4,6 +4,8 @@
     HTTP_Server On
     HTTP_Listen 0.0.0.0
    HTTP_Port 2020
+    Hot_Reload On
+    Hot_Reload.Period 5
     storage.path /buffers
     storage.sync normal
     storage.checksum on
diff --git a/src/log/fluent-bit/build/start-fluent-bit.sh b/src/log/fluent-bit/build/start-fluent-bit.sh
index 953549a..f814687 100755
--- a/src/log/fluent-bit/build/start-fluent-bit.sh
+++ b/src/log/fluent-bit/build/start-fluent-bit.sh
@@ -9,6 +9,7 @@ export DEBIAN_FRONTEND=noninteractive
 echo "[INFO] Staging fluent-bit bundle..."
 rm -rf /tmp/flb && mkdir -p /tmp/flb
 cp -r /private/etc /tmp/flb/
+cp -r /private/app /tmp
 mkdir -p /tmp/flb/packages
 cp -r /private/packages/* /tmp/flb/packages/ 2>/dev/null || true
 
@@ -68,6 +69,10 @@ if [[ -n "$MISSING" ]]; then
   apt-get install -f -y -qq || true
 fi
 
+# Install Python for the config updater script (requests is required by it)
+apt-get install -y python3 python3-pip
+pip3 install requests pymysql pyyaml
+
 echo "[INFO] Fluent Bit version:"
 /opt/fluent-bit/bin/fluent-bit --version || { echo "[ERROR] fluent-bit not installed or libraries missing" >&2; exit 1; }
 
@@ -75,6 +80,10 @@ echo "[INFO] Fluent Bit version:"
 mkdir -p /etc/fluent-bit
 cp -r /tmp/flb/etc/* /etc/fluent-bit/
 
+# Create the scheduled-script directory and copy the script in
+mkdir -p /etc/fluent-bit/app/
+cp -r /tmp/app/* /etc/fluent-bit/app/
+
 # Create logs/buffers dirs
 mkdir -p /logs/train /logs/infer /buffers
 
@@ -104,6 +113,10 @@ for i in $(seq 1 120); do
   sleep 1
 done
 
+echo "[INFO] Command: starting python task..."
+# Launch the config generator in the background (no exec: fluent-bit must still start below)
+nohup python3 /etc/fluent-bit/app/update_fluent_config.py >/var/log/update_fluent_config.out 2>&1 &
+
 echo "[INFO] Starting Fluent Bit with configuration from /etc/fluent-bit/"
 echo "[INFO] Command: /opt/fluent-bit/bin/fluent-bit --config=/etc/fluent-bit/fluent-bit.conf"
 exec /opt/fluent-bit/bin/fluent-bit --config=/etc/fluent-bit/fluent-bit.conf