feat: 自动生成fluent bit配置文件创建data view

This commit is contained in:
huhy 2026-01-07 17:00:22 +08:00
parent b9af54bba5
commit f87bc045ad
3 changed files with 147 additions and 87 deletions

View File

@ -4,6 +4,7 @@
支持配置文件变更检测和Fluent Bit热重启
"""
import requests
import os
import json
import time
@ -30,7 +31,7 @@ class KibanaDataViewManager:
"""Kibana Data View 管理器"""
def __init__(self):
self.kibana_host = os.environ.get("KIBANA_HOST", "localhost")
self.kibana_host = os.environ.get("KIBANA_HOST", "kibana")
self.kibana_port = os.environ.get("KIBANA_PORT", "5601")
self.kibana_user = os.environ.get("KIBANA_USER", "elastic")
self.kibana_password = os.environ.get("KIBANA_PASSWORD", "")
@ -100,7 +101,8 @@ class KibanaDataViewManager:
)
if response.status_code == 200:
data_views = response.json()
response_data = response.json()
data_views = response_data.get("data_view", [])
logger.info(f"获取到 {len(data_views)} 个Data View")
return data_views
else:
@ -138,14 +140,7 @@ class KibanaDataViewManager:
"data_view": {
"title": self.data_view_pattern,
"name": self.data_view_name,
"timeFieldName": "@timestamp",
"allowNoIndex": True, # 允许不存在索引
"allowHidden": False,
"namespace": "default",
"fieldFormats": {},
"runtimeFieldMap": {},
"fieldAttrs": {},
"allowHidden": False
"timeFieldName": "@timestamp"
}
}
@ -191,60 +186,6 @@ class KibanaDataViewManager:
logger.warning(f"检查索引存在失败: {e}")
return False
def set_default_data_view(self):
    """Make this manager's Data View the Kibana default (optional step).

    The Data View is looked up by ``self.data_view_name`` in the list
    returned by ``GET /api/data_views``; its ID is then submitted to
    Kibana's set-default endpoint.

    Returns:
        bool: True if Kibana answered the set-default request with HTTP
        200. False if the list request failed, no Data View with the
        configured name was found, Kibana rejected the request, or any
        exception was raised (failures are logged, never propagated).
    """
    try:
        # First resolve the Data View ID from its name.
        url = f"{self.base_url}/api/data_views"
        response = requests.get(
            url,
            auth=self.auth,
            timeout=self.timeout,
            headers=self._get_headers()
        )

        if response.status_code != 200:
            logger.warning(f"获取Data View列表失败: {response.status_code}")
            return False

        # The list endpoint wraps its results as {"data_view": [...]}.
        # Iterating the raw payload would walk the dict keys instead of
        # the Data View entries.
        payload = response.json()
        if isinstance(payload, dict):
            data_views = payload.get("data_view") or []
        else:
            data_views = payload or []

        data_view_id = None
        for dv in data_views:
            if isinstance(dv, dict) and dv.get("name") == self.data_view_name:
                data_view_id = dv.get("id")
                break

        if not data_view_id:
            logger.warning(f"未找到Data View: {self.data_view_name}")
            return False

        # Kibana has a dedicated endpoint for the default Data View;
        # "force" lets it replace a default that is already set.
        default_url = f"{self.base_url}/api/data_views/default"
        default_data = {
            "data_view_id": data_view_id,
            "force": True
        }

        response = requests.post(
            default_url,
            auth=self.auth,
            json=default_data,
            timeout=self.timeout,
            headers=self._get_headers()
        )

        if response.status_code == 200:
            logger.info(f"成功将 '{self.data_view_name}' 设置为默认Data View")
            return True

        logger.warning(f"设置默认Data View失败: {response.status_code}")
        return False

    except Exception as e:
        # Best-effort step: report the problem and let the caller carry on.
        logger.warning(f"设置默认Data View异常: {e}")
        return False
def manage_data_view(self):
"""管理Data View检查并创建"""
@ -265,12 +206,6 @@ class KibanaDataViewManager:
if self.create_data_view():
logger.info(f"Data View '{self.data_view_name}' 创建成功")
# 可选设置为默认Data View
try:
self.set_default_data_view()
except Exception as e:
logger.warning(f"设置默认Data View失败: {e}")
return True
else:
logger.error(f"Data View '{self.data_view_name}' 创建失败")
@ -285,13 +220,13 @@ class ConfigGenerator:
self.fb_user = self._get_fb_user()
# 基础路径
self.base_path = Path("private/argus/agent")
self.base_path = Path("/private/argus/agent")
self.inputs_dir = Path("/etc/fluent-bit/inputs.d")
self.outputs_dir = Path("/etc/fluent-bit/outputs.d")
# 配置文件路径
self.config_file = self.base_path / self.hostname / "node.json"
# 配置文件hash值校验配置是否发生变更
hash_file = self.base_path / self.hostname / "node_json_hash.txt"
# 配置文件哈希值存储文件
@ -622,16 +557,17 @@ class ConfigGenerator:
logger.info(f"生成 {len(input_files)} 个input配置{len(output_files)} 个output配置")
# 5. 重新加载Fluent Bit
# 重新加载Fluent Bit
logger.info("配置文件已更新重新加载Fluent Bit...")
if self._reload_fluentbit():
logger.info("Fluent Bit重载成功")
logger.info("开始管理Kibana Data View...")
self.kibana_manager.manage_data_view()
else:
logger.warning("Fluent Bit重载失败但配置文件已更新")
# 不论配置是否变更每次执行都检查custom视图是否存在避免视图创建失败后若配置不再变更则无法创建
# 检查kibana data view 并进行创建data view
logger.info("开始管理Kibana Data View...")
self.kibana_manager.manage_data_view()
logger.info("任务完成")
return True
@ -672,7 +608,7 @@ class Scheduler:
# 等待到下次执行时间
logger.info(f"下次执行时间: {next_time_str}")
# 使用递减等待,以便可以响应停止信号
# 使用递减等待
for _ in range(self.interval_seconds):
if not self.running:
break
@ -690,7 +626,7 @@ class Scheduler:
def main():
# 定时调度
# 定时调度,设置调度周期
scheduler = Scheduler(3600)
# 启动调度器

View File

@ -1,6 +1,6 @@
[SERVICE]
Daemon Off
Parsers_File parsers.conf
Parsers_File /etc/fluent-bit/parsers.conf
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_Port 2020
@ -33,7 +33,7 @@
[FILTER]
Name lua
Match app.*
script inject_labels.lua
script /etc/fluent-bit/inject_labels.lua
call add_labels
@INCLUDE outputs.d/*.conf

View File

@ -69,10 +69,6 @@ if [[ -n "$MISSING" ]]; then
apt-get install -f -y -qq || true
fi
# 安装 Python
apt-get install -y python3 python3-pip
pip3 install pymysql pyyaml
echo "[INFO] Fluent Bit version:"
/opt/fluent-bit/bin/fluent-bit --version || { echo "[ERROR] fluent-bit not installed or libraries missing" >&2; exit 1; }
@ -101,10 +97,138 @@ chmod 770 /buffers || true
# 目录属主设置为 fluent-bit不影响 1777 粘滞位)
chown -R fluent-bit:fluent-bit /logs /buffers 2>/dev/null || true
# Check the base Python environment.
echo "[INFO] Checking Python environment..."

# Repair half-installed packages first; best-effort, failures are ignored.
echo "[INFO] Fixing broken dependencies..."
apt-get -f install -y -qq 2>/dev/null || true

# Install python3 when it cannot be found on PATH.
if ! command -v python3 >/dev/null 2>&1; then
    echo "[INFO] Installing python3..."
    apt-get update -qq || true
    # Install the minimal python3 package without recommended packages to
    # reduce the chance of dependency conflicts.
    apt-get install -y -qq --no-install-recommends python3-minimal 2>/dev/null || {
        echo "[WARN] Failed to install python3 via apt, trying alternative approach..."
        # This branch is only entered when python3 was not found on PATH, so
        # the binary is probed by its absolute path rather than by name.
        if [ -f /usr/bin/python3 ] && /usr/bin/python3 --version 2>/dev/null; then
            echo "[INFO] Python3 binary exists but not in PATH"
        fi
    }
fi

# Install pip when pip3 is not available; best-effort.
if ! command -v pip3 >/dev/null 2>&1; then
    echo "[INFO] Installing pip..."
    apt-get update -qq || true
    apt-get install -y -qq python3-pip 2>/dev/null || true
fi
# Check the Python packages the configuration generator script depends on.
#
# stdout carries ONLY the space-separated names of the packages that are still
# missing, because the caller captures it with $(...). Every log line and all
# pip output therefore goes to stderr.
# Returns 1 when 'requests' is missing and cannot be installed, 0 otherwise.
check_python_packages() {
    local missing_packages=""
    local pkg
    # PyPI mirror tried before the official index. Held in a local with its
    # own name because PIP_INDEX_URL is also read by pip as configuration.
    local mirror_url="https://pypi.tuna.tsinghua.edu.cn/simple"
    # Packages required by the script.
    local required_packages=(
        "requests"  # HTTP client library; the script's key dependency
    )

    # 'requests' is mandatory: try to install it when the import fails.
    if ! python3 -c "import requests" 2>/dev/null; then
        echo "[WARN] 'requests' package not found, installing..." >&2
        if ! command -v pip3 >/dev/null 2>&1; then
            echo "[ERROR] pip3 not available, cannot install requests" >&2
            return 1
        fi

        echo "[INFO] Installing requests using pip with mirror..." >&2
        # ">&2 2>/dev/null": pip's stdout is shown on our stderr, and pip's own
        # stderr is discarded as before.
        if ! pip3 install --no-cache-dir --index-url "$mirror_url" requests >&2 2>/dev/null; then
            # The mirror failed; fall back to the official index.
            echo "[INFO] Trying official PyPI source..." >&2
            if ! pip3 install --no-cache-dir requests >&2 2>/dev/null; then
                echo "[ERROR] Failed to install requests via pip" >&2
                return 1
            fi
        fi
    fi

    # Collect every required package that still cannot be imported.
    for pkg in "${required_packages[@]}"; do
        if ! python3 -c "import $pkg" 2>/dev/null; then
            missing_packages="$missing_packages $pkg"
        fi
    done

    echo "$missing_packages"
}

# Install Python package dependencies.
echo "[INFO] Installing Python package dependencies..."
# A failure inside the check is reported but must not abort startup.
missing_py_deps=$(check_python_packages) || echo "[WARN] Python package check reported a failure" >&2
if [[ -n "$missing_py_deps" ]]; then
    echo "[INFO] Installing missing Python packages:$missing_py_deps"
    # Retry the packages that are still missing; best-effort.
    if command -v pip3 >/dev/null 2>&1; then
        # Unquoted on purpose: the value is a space-separated list of names.
        for pkg in $missing_py_deps; do
            pip3 install --no-cache-dir "$pkg" 2>/dev/null || true
        done
    fi
fi
# Verify that every module the Python script imports can be loaded.
echo "[INFO] Verifying Python dependencies..."
if ! python3 -c "import requests; import json; import time; import logging; import hashlib; import subprocess; from datetime import datetime; from pathlib import Path; import re; import socket; print('All required modules imported successfully')" 2>/dev/null; then
    echo "[WARN] Some Python modules may be missing"
    # Diagnostic only: print, module by module, which import works and which
    # fails. The exit status is ignored so startup continues either way.
    python3 -c "
import sys
modules = ['requests', 'json', 'time', 'logging', 'hashlib', 'subprocess', 'datetime', 'pathlib', 're', 'socket']
missing = []
for module in modules:
    try:
        __import__(module)
        print(f'✓ {module}')
    except ImportError as e:
        print(f'✗ {module}: {e}')
        missing.append(module)
if missing:
    sys.exit(1)
" 2>/dev/null || true
else
    echo "[INFO] Python dependencies verified successfully"
fi
# Locate the Python configuration generator and start it in the background.
PYTHON_SCRIPT="/etc/fluent-bit/app/update_fluent_config.py"
echo "[INFO] Looking for Python configuration generator script..."

if [ ! -f "$PYTHON_SCRIPT" ]; then
    echo "[WARN] Python configuration generator script not found"
else
    echo "[INFO] Found Python script: $PYTHON_SCRIPT"

    # Add the execute bit when it is missing; a chmod failure is ignored.
    [ -x "$PYTHON_SCRIPT" ] || {
        echo "[INFO] Adding execute permission to Python script..."
        chmod +x "$PYTHON_SCRIPT" 2>/dev/null || true
    }

    # Launch the generator as a background job and remember its PID.
    echo "[INFO] Starting configuration generator script..."
    python3 "$PYTHON_SCRIPT" &
    CONFIG_GEN_PID=$!
    echo "[INFO] Configuration generator started with PID: $CONFIG_GEN_PID"
fi
# Wait for Elasticsearch via bash /dev/tcp to avoid curl dependency
echo "[INFO] Waiting for Elasticsearch to be ready (tcp ${ES_HOST}:${ES_PORT})..."
for i in $(seq 1 120); do
if exec 3<>/dev/tcp/${ES_HOST}/${ES_PORT}; then
if exec 3<>/dev/tcp/${ES_HOST}/${ES_PORT} 2>/dev/null; then
exec 3<&- 3>&-
echo "[INFO] Elasticsearch is ready"
break
@ -115,8 +239,8 @@ done
echo "[INFO] Command: starting python task..."
# 启动配置生成器
exec nohup python3 /etc/fluent-bit/app/update_fluent_config.py &
#exec nohup python3 /etc/fluent-bit/app/update_fluent_config.py &
echo "[INFO] Starting Fluent Bit with configuration from /etc/fluent-bit/"
echo "[INFO] Command: /opt/fluent-bit/bin/fluent-bit --config=/etc/fluent-bit/fluent-bit.conf"
exec /opt/fluent-bit/bin/fluent-bit --config=/etc/fluent-bit/fluent-bit.conf
exec /opt/fluent-bit/bin/fluent-bit --config=/etc/fluent-bit/fluent-bit.conf