huhy_20251211 #54

Open
huhy wants to merge 2 commits from huhy_20251211 into dev_1.0.0
3 changed files with 780 additions and 4 deletions

View File

@@ -0,0 +1,637 @@
#!/usr/bin/env python3
"""
Scheduled task script, executed hourly: generates Fluent Bit config files from the node.json configuration file.
Supports config-file change detection and Fluent Bit hot reload.
"""
import requests
import os
import json
import time
import logging
import hashlib
import subprocess
from datetime import datetime
from pathlib import Path
import re
import socket
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/config_generator.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class KibanaDataViewManager:
"""Kibana Data View 管理器"""
def __init__(self):
self.kibana_host = os.environ.get("KIBANA_HOST", "kibana")
self.kibana_port = os.environ.get("KIBANA_PORT", "5601")
self.kibana_user = os.environ.get("KIBANA_USER", "elastic")
self.kibana_password = os.environ.get("KIBANA_PASSWORD", "")
self.es_host = os.environ.get("ES_HOST")
self.es_port = os.environ.get("ES_PORT", "9200")
self.fb_user = os.environ.get("AGENT_USER", "unknown_user")
self.base_url = f"http://{self.kibana_host}:{self.kibana_port}"
self.auth = (self.kibana_user, self.kibana_password)
# Data View name template
self.data_view_name = "custom"
self.data_view_pattern = "custom-*"
# Request timeout in seconds
self.timeout = 30
def _get_headers(self):
"""获取请求头"""
return {
"kbn-xsrf": "true",
"Content-Type": "application/json"
}
def check_kibana_available(self):
"""Check whether the Kibana service is available"""
try:
url = f"{self.base_url}/api/status"
response = requests.get(
url,
auth=self.auth,
timeout=self.timeout,
headers=self._get_headers()
)
if response.status_code == 200:
status = response.json().get("status", {}).get("overall", {}).get("level")
if status == "available":
logger.info("Kibana service is available")
return True
else:
logger.warning(f"Kibana service status: {status}")
return False
else:
logger.warning(f"Kibana status check failed: {response.status_code}")
return False
except requests.exceptions.ConnectionError:
logger.error("Unable to connect to the Kibana service")
return False
except requests.exceptions.Timeout:
logger.error("Connection to Kibana timed out")
return False
except Exception as e:
logger.error(f"Kibana availability check failed: {e}")
return False
def get_existing_data_views(self):
"""Fetch the list of existing Data Views"""
try:
url = f"{self.base_url}/api/data_views"
response = requests.get(
url,
auth=self.auth,
timeout=self.timeout,
headers=self._get_headers()
)
if response.status_code == 200:
response_data = response.json()
data_views = response_data.get("data_view", [])
logger.info(f"Fetched {len(data_views)} Data Views")
return data_views
else:
logger.warning(f"Failed to fetch the Data View list: {response.status_code}")
return []
except Exception as e:
logger.error(f"Failed to fetch the Data View list: {e}")
return []
def check_data_view_exists(self):
"""Check whether the custom Data View exists"""
data_views = self.get_existing_data_views()
for data_view in data_views:
if data_view.get("name") == self.data_view_name:
logger.info(f"Data View '{self.data_view_name}' already exists")
return True
logger.info(f"Data View '{self.data_view_name}' does not exist")
return False
def create_data_view(self):
"""Create the Data View"""
try:
# Check whether a matching index exists
index_exists = self._check_index_exists()
if not index_exists:
logger.warning(f"No index matches pattern '{self.data_view_pattern}', creating the Data View anyway")
url = f"{self.base_url}/api/data_views/data_view"
# Data View configuration
data_view_config = {
"data_view": {
"title": self.data_view_pattern,
"name": self.data_view_name,
"timeFieldName": "@timestamp"
}
}
response = requests.post(
url,
auth=self.auth,
json=data_view_config,
timeout=self.timeout,
headers=self._get_headers()
)
if response.status_code == 200:
result = response.json()
logger.info(f"Created Data View: {result.get('data_view', {}).get('name')}")
return True
elif response.status_code == 409:
logger.info(f"Data View already exists: {response.json().get('message', 'Unknown error')}")
return True
else:
error_msg = response.json().get('message', 'Unknown error')
logger.error(f"Failed to create Data View {response.status_code}: {error_msg}")
return False
except Exception as e:
logger.error(f"Exception while creating Data View: {e}")
return False
def _check_index_exists(self):
"""Check whether a matching index exists in ES"""
try:
es_url = f"http://{self.es_host}:{self.es_port}"
index_url = f"{es_url}/{self.data_view_pattern}"
# NOTE: reuses the Kibana credentials for ES; assumes they are valid there too
response = requests.head(
index_url,
auth=(self.kibana_user, self.kibana_password) if self.kibana_password else None,
timeout=self.timeout
)
return response.status_code == 200
except Exception as e:
logger.warning(f"Index existence check failed: {e}")
return False
def manage_data_view(self):
"""Manage the Data View: check for it and create it if needed"""
logger.info("Managing the Kibana Data View")
# Check whether Kibana is available
if not self.check_kibana_available():
logger.warning("Kibana is unavailable, skipping Data View management")
return False
# Check whether the Data View exists
if self.check_data_view_exists():
logger.info(f"Data View '{self.data_view_name}' already exists, nothing to create")
return True
# Create the Data View
logger.info(f"Creating Data View: {self.data_view_name}")
if self.create_data_view():
logger.info(f"Data View '{self.data_view_name}' created successfully")
return True
else:
logger.error(f"Failed to create Data View '{self.data_view_name}'")
return False
class ConfigGenerator:
def __init__(self):
# Hostname of this node
self.hostname = self._get_hostname()
# Agent user account
self.fb_user = self._get_fb_user()
# Base paths
self.base_path = Path("/private/argus/agent")
self.inputs_dir = Path("/etc/fluent-bit/inputs.d")
self.outputs_dir = Path("/etc/fluent-bit/outputs.d")
# Path to the node.json configuration file
self.config_file = self.base_path / self.hostname / "node.json"
# File storing the hash of node.json, used to detect configuration changes
self.config_hash_file = self.base_path / self.hostname / "node_json_hash.txt"
# Files that must be preserved during cleanup
self.inputs_to_keep = ["10-train.conf", "20-infer.conf"]
self.outputs_to_keep = ["10-es.conf"]
# Prefix for generated config files
self.generated_prefix = "custom_"
# Kibana Data View manager
self.kibana_manager = KibanaDataViewManager()
# Make sure the required directories exist
self._ensure_directories()
def _get_hostname(self):
"""获取容器主机名"""
return os.environ.get("AGENT_HOSTNAME") or socket.gethostname()
def _get_fb_user(self):
"""获取容器用户"""
return os.environ.get("AGENT_USER", "unknown_user")
def _ensure_directories(self):
"""确保必要的目录存在"""
directories = [
self.base_path / self.hostname,
self.inputs_dir,
self.outputs_dir
]
for directory in directories:
directory.mkdir(parents=True, exist_ok=True)
logger.debug(f"确保目录存在: {directory}")
def _get_config_hash(self, config_content):
"""计算配置内容的哈希值"""
return hashlib.md5(config_content.encode('utf-8')).hexdigest()
def _save_config_hash(self, config_hash):
"""保存配置哈希值"""
try:
with open(self.config_hash_file, 'w') as f:
f.write(config_hash)
logger.debug(f"保存配置哈希值: {config_hash}")
except Exception as e:
logger.warning(f"保存配置哈希值失败: {e}")
def _load_config_hash(self):
"""加载保存的配置哈希值"""
if not self.config_hash_file.exists():
return None
try:
with open(self.config_hash_file, 'r') as f:
return f.read().strip()
except Exception as e:
logger.warning(f"加载配置哈希值失败: {e}")
return None
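# NOTE: run() inlines this same hash comparison; this helper is kept for standalone use.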
def _has_config_changed(self, current_config):
"""检查配置是否发生变化"""
# 生成当前配置的哈希值
config_str = json.dumps(current_config, sort_keys=True)
current_hash = self._get_config_hash(config_str)
# 获取之前保存的哈希值
saved_hash = self._load_config_hash()
if saved_hash is None:
# 没有保存的哈希值,说明是第一次运行
logger.info("首次运行,未找到保存的配置哈希值")
self._save_config_hash(current_hash)
return True
# 比较哈希值
if current_hash == saved_hash:
logger.info("配置文件未发生变化")
return False
else:
logger.info(f"配置文件发生变化: {saved_hash[:8]} -> {current_hash[:8]}")
self._save_config_hash(current_hash)
return True
def _load_node_config(self):
"""加载node.json配置文件"""
if not self.config_file.exists():
logger.warning(f"配置文件不存在: {self.config_file}")
return None, None
try:
with open(self.config_file, 'r') as f:
config = json.load(f)
# 获取log路径列表
log_configs = config.get("config", {}).get("log", [])
paths = [log.get("path") for log in log_configs if log.get("path")]
logger.info(f"从配置文件中读取到 {len(paths)} 个日志路径")
return paths, config
except json.JSONDecodeError as e:
logger.error(f"配置文件JSON格式错误: {e}")
return None, None
except Exception as e:
logger.error(f"读取配置文件失败: {e}")
return None, None
def _clean_generated_files(self):
"""只清理生成的配置文件,保留系统文件"""
try:
# 清理inputs.d目录中生成的配置文件
for file_path in self.inputs_dir.glob(f"{self.generated_prefix}*.conf"):
if file_path.is_file() and file_path.name not in self.inputs_to_keep:
try:
file_path.unlink()
logger.debug(f"已删除生成的input文件: {file_path}")
except Exception as e:
logger.warning(f"删除文件失败 {file_path}: {e}")
# 清理outputs.d目录中生成的配置文件
for file_path in self.outputs_dir.glob(f"{self.generated_prefix}*.conf"):
if file_path.is_file() and file_path.name not in self.outputs_to_keep:
try:
file_path.unlink()
logger.debug(f"已删除生成的output文件: {file_path}")
except Exception as e:
logger.warning(f"删除文件失败 {file_path}: {e}")
logger.info("已清理生成的配置文件")
except Exception as e:
logger.error(f"清理生成的配置文件失败: {e}")
raise
def _generate_input_config(self, paths):
"""生成input配置文件"""
configs = []
for i, path in enumerate(paths, 1):
# 清理路径,移除非法字符用于文件名
safe_name = re.sub(r'[^\w\-_]', '_', path.replace('/', '_').replace('*', 'all'))
filename = f"{self.generated_prefix}{safe_name}_input.conf"
filepath = self.inputs_dir / filename
# 生成配置内容
config_content = self._create_input_config(path, i)
# 写入文件
with open(filepath, 'w') as f:
f.write(config_content)
configs.append(filepath)
logger.info(f"生成input配置文件: {filepath}")
return configs
def _create_input_config(self, path, index):
"""创建单个input配置"""
config = f"""[INPUT]
name tail
path {path}
tag app.custom
Path_Key log_path
refresh_interval 5
skip_long_lines on
storage.type filesystem
multiline.parser python,go,java
DB /buffers/{self.fb_user}_custom.db
Mem_Buf_Limit 50MB
Buffer_Chunk_Size 1MB
Buffer_Max_Size 5MB
Rotate_Wait 30
Ignore_Older 24h
"""
return config
def _generate_output_config(self):
"""生成output配置文件生成一个output配置文件"""
filename = f"{self.generated_prefix}output.conf"
filepath = self.outputs_dir / filename
# 检查是否已存在
if filepath.exists():
logger.info(f"Output配置文件已存在: {filepath}")
return [filepath]
# 生成配置内容
config_content = self._create_output_config()
# 写入文件
with open(filepath, 'w') as f:
f.write(config_content)
logger.info(f"生成output配置文件: {filepath}")
return [filepath]
def _create_output_config(self):
"""创建单个output配置"""
config = f"""# Source: customer log path
[OUTPUT]
name es
Match app.custom
Host ${{ES_HOST}}
Port ${{ES_PORT}}
Logstash_Format On
Logstash_Prefix custom-${{FB_USER}}
Replace_Dots On
Generate_ID On
Retry_Limit False
Suppress_Type_Name On
"""
return config
def _reload_fluentbit(self):
"""Reload the Fluent Bit configuration"""
try:
# Method 1: send SIGHUP (honored when Hot_Reload is On in [SERVICE])
logger.info("Trying to reload Fluent Bit via SIGHUP...")
result = subprocess.run(
["pkill", "-HUP", "fluent-bit"],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
logger.info("Sent SIGHUP to reload Fluent Bit")
return True
# Method 2: hot reload via the built-in HTTP server (requires
# HTTP_Server On and Hot_Reload On, Fluent Bit >= 2.0); note that
# `fluent-bit -R` only sets a parsers file and would start a new
# instance instead of reloading the running one
logger.info("Trying the hot-reload HTTP endpoint...")
response = requests.post(
"http://127.0.0.1:2020/api/v2/reload",
timeout=30
)
if response.status_code == 200:
logger.info("Triggered Fluent Bit hot reload via HTTP")
return True
logger.error("All reload methods failed")
return False
except subprocess.TimeoutExpired:
logger.error("Fluent Bit reload timed out")
return False
except Exception as e:
logger.error(f"Failed to reload Fluent Bit: {e}")
return False
def _check_fluentbit_status(self):
"""检查Fluent Bit运行状态"""
try:
# 方法1: 检查进程
result = subprocess.run(
["pgrep", "fluent-bit"],
capture_output=True,
text=True
)
if result.returncode == 0:
logger.info("Fluent Bit进程正在运行")
return True
# 方法2: 检查服务状态
result = subprocess.run(
["systemctl", "is-active", "fluent-bit"],
capture_output=True,
text=True
)
if result.returncode == 0 and result.stdout.strip() == "active":
logger.info("Fluent Bit服务正在运行")
return True
logger.warning("Fluent Bit未运行")
return False
except Exception as e:
logger.error(f"检查Fluent Bit状态失败: {e}")
return False
def run(self):
"""执行主逻辑检查配置文件更新生成fluent bit配置文件并热启动进程检查kinbana data view"""
logger.info("=" * 50)
logger.info(f"开始执行配置生成任务 - {datetime.now()}")
logger.info(f"主机名: {self.hostname}")
try:
# 1. 加载配置文件
paths, config = self._load_node_config()
if not paths or not config:
logger.warning("未找到有效的日志路径配置,跳过本次执行")
return False
# 2. 检查配置是否发生变化
config_str = json.dumps(config, sort_keys=True)
current_hash = self._get_config_hash(config_str)
saved_hash = self._load_config_hash()
# 如果没有变化检查Fluent Bit状态后退出
if saved_hash and current_hash == saved_hash:
logger.info("配置文件未发生变化,跳过配置生成")
# 检查Fluent Bit状态
if not self._check_fluentbit_status():
logger.warning("Fluent Bit未运行尝试启动...")
self._reload_fluentbit()
return True
# 保存新的哈希值
self._save_config_hash(current_hash)
logger.info("配置文件发生变化,重新生成配置")
# 3. 清理之前生成的配置文件
self._clean_generated_files()
# 4. 生成新的配置文件
input_files = self._generate_input_config(paths)
output_files = self._generate_output_config()
logger.info(f"生成 {len(input_files)} 个input配置{len(output_files)} 个output配置")
# 重新加载Fluent Bit
logger.info("配置文件已更新重新加载Fluent Bit...")
if self._reload_fluentbit():
logger.info("Fluent Bit重载成功")
else:
logger.warning("Fluent Bit重载失败但配置文件已更新")
# 不论配置是否变更每次执行都检查custom视图是否存在避免视图创建失败后若配置不再变更则无法创建
# 检查kibana data view 并进行创建data view
logger.info("开始管理Kibana Data View...")
self.kibana_manager.manage_data_view()
logger.info("任务完成")
return True
except Exception as e:
logger.error(f"任务执行失败: {e}", exc_info=True)
return False
class Scheduler:
"""定时调度器"""
def __init__(self, interval_seconds=3600):
self.interval_hours = interval_seconds/3600
self.interval_seconds = interval_seconds
self.running = False
self.generator = ConfigGenerator()
def run(self):
"""执行任务"""
return self.generator.run()
def start(self):
"""启动定时调度"""
self.running = True
logger.info(f"启动定时调度器,每隔 {self.interval_hours} 小时执行一次")
try:
while self.running:
# 记录下次执行时间
next_run = datetime.now().timestamp() + self.interval_seconds
next_time_str = datetime.fromtimestamp(next_run).strftime('%Y-%m-%d %H:%M:%S')
# 执行任务
logger.info(f"开始执行定时任务 - {datetime.now()}")
self.run()
# 等待到下次执行时间
logger.info(f"下次执行时间: {next_time_str}")
# 使用递减等待
for _ in range(self.interval_seconds):
if not self.running:
break
time.sleep(1)
except Exception as e:
logger.error(f"调度器运行异常: {e}")
finally:
self.stop()
def stop(self):
"""停止调度器"""
self.running = False
logger.info("调度器已停止")
def main():
# Periodic scheduling; set the interval in seconds
scheduler = Scheduler(3600)
# Start the scheduler
scheduler.start()
if __name__ == "__main__":
main()
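For reference, a minimal node.json sketch that _load_node_config would accept; the field names come from the script, while the hostname directory and path values are illustrative:

{
"config": {
"log": [
{"path": "/logs/train/*.log"},
{"path": "/logs/infer/app.log"}
]
}
}

Placed at /private/argus/agent/<hostname>/node.json, each path entry becomes one generated tail input under inputs.d, all tagged app.custom.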

View File

@@ -1,9 +1,11 @@
[SERVICE]
Daemon Off
-Parsers_File parsers.conf
+Parsers_File /etc/fluent-bit/parsers.conf
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_Port 2020
Hot_Reload On
Hot_Reload.Period 5
storage.path /buffers
storage.sync normal
storage.checksum on
@@ -31,7 +33,7 @@
[FILTER]
Name lua
Match app.*
-script inject_labels.lua
+script /etc/fluent-bit/inject_labels.lua
call add_labels
@INCLUDE outputs.d/*.conf
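Since the [SERVICE] section above enables both the HTTP server and Hot_Reload, a reload can also be triggered by hand when testing; a minimal sketch, assuming Fluent Bit >= 2.0 and the port 2020 configured above:

curl -s -X POST http://127.0.0.1:2020/api/v2/reload

A SIGHUP to the fluent-bit process (what the generator script's _reload_fluentbit sends first) takes the same hot-reload path when Hot_Reload is On.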

View File

@@ -9,6 +9,7 @@ export DEBIAN_FRONTEND=noninteractive
echo "[INFO] Staging fluent-bit bundle..."
rm -rf /tmp/flb && mkdir -p /tmp/flb
cp -r /private/etc /tmp/flb/
cp -r /private/app /tmp
mkdir -p /tmp/flb/packages
cp -r /private/packages/* /tmp/flb/packages/ 2>/dev/null || true
@@ -75,6 +76,10 @@ echo "[INFO] Fluent Bit version:"
mkdir -p /etc/fluent-bit
cp -r /tmp/flb/etc/* /etc/fluent-bit/
# Create the scheduled-script directory and copy the script into it
mkdir -p /etc/fluent-bit/app/
cp -r /tmp/app/* /etc/fluent-bit/app/
# Create logs/buffers dirs
mkdir -p /logs/train /logs/infer /buffers
@@ -92,10 +97,138 @@ chmod 770 /buffers || true
# Set directory ownership to fluent-bit (does not affect the 1777 sticky bit)
chown -R fluent-bit:fluent-bit /logs /buffers 2>/dev/null || true
# Check the base Python environment
echo "[INFO] Checking Python environment..."
echo "[INFO] Fixing broken dependencies..."
apt-get -f install -y -qq 2>/dev/null || true
# Check whether python3 is installed
if ! command -v python3 >/dev/null 2>&1; then
echo "[INFO] Installing python3..."
apt-get update -qq || true
# Try a minimal python3 install (no recommended packages, to reduce dependency conflicts)
apt-get install -y -qq --no-install-recommends python3-minimal 2>/dev/null || {
echo "[WARN] Failed to install python3 via apt, trying alternative approach..."
# If apt fails, check whether a python3 binary already exists
if [ -f /usr/bin/python3 ] && python3 --version 2>/dev/null; then
echo "[INFO] Python3 binary exists but not in PATH"
fi
}
fi
# Check whether pip is available
if ! command -v pip3 >/dev/null 2>&1; then
# Try to install pip
echo "[INFO] Installing pip..."
apt-get update -qq || true
apt-get install -y -qq python3-pip 2>/dev/null || true
fi
# Check the Python packages the script depends on
check_python_packages() {
local missing_packages=""
# Packages required by the generator script
local required_packages=(
"requests" # HTTP client library, the script's key external dependency
)
# Ensure requests is installed (mandatory). Progress messages go to stderr
# so they are not captured by $(check_python_packages).
if ! python3 -c "import requests" 2>/dev/null; then
echo "[WARN] 'requests' package not found, installing..." >&2
if command -v pip3 >/dev/null 2>&1; then
# Try the Tsinghua mirror first
PIP_INDEX_URL="https://pypi.tuna.tsinghua.edu.cn/simple"
echo "[INFO] Installing requests using pip with mirror..." >&2
pip3 install --no-cache-dir --index-url "$PIP_INDEX_URL" requests 2>/dev/null || {
# Fall back to the official index if the mirror fails
echo "[INFO] Trying official PyPI source..." >&2
pip3 install --no-cache-dir requests 2>/dev/null || {
echo "[ERROR] Failed to install requests via pip" >&2
return 1
}
}
else
echo "[ERROR] pip3 not available, cannot install requests" >&2
return 1
fi
fi
# Report any remaining missing packages on stdout
for pkg in "${required_packages[@]}"; do
if ! python3 -c "import $pkg" 2>/dev/null; then
missing_packages="$missing_packages $pkg"
fi
done
echo "$missing_packages"
}
# Install Python package dependencies
echo "[INFO] Installing Python package dependencies..."
missing_py_deps=$(check_python_packages)
if [[ -n "$missing_py_deps" ]]; then
echo "[INFO] Installing missing Python packages:$missing_py_deps"
# Try to install any packages that are still missing
if command -v pip3 >/dev/null 2>&1; then
for pkg in $missing_py_deps; do
pip3 install --no-cache-dir "$pkg" 2>/dev/null || true
done
fi
fi
# Verify the Python script dependencies
echo "[INFO] Verifying Python dependencies..."
if python3 -c "import requests; import json; import time; import logging; import hashlib; import subprocess; from datetime import datetime; from pathlib import Path; import re; import socket; print('All required modules imported successfully')" 2>/dev/null; then
echo "[INFO] Python dependencies verified successfully"
else
echo "[WARN] Some Python modules may be missing"
# Show exactly which modules are missing
python3 -c "
import sys
modules = ['requests', 'json', 'time', 'logging', 'hashlib', 'subprocess', 'datetime', 'pathlib', 're', 'socket']
missing = []
for module in modules:
try:
__import__(module)
print(f'✓ {module}')
except ImportError as e:
print(f'✗ {module}: {e}')
missing.append(module)
if missing:
sys.exit(1)
" 2>/dev/null || true
fi
# Locate and run the Python script
echo "[INFO] Looking for Python configuration generator script..."
PYTHON_SCRIPT="/etc/fluent-bit/app/update_fluent_config.py"
if [ -f "$PYTHON_SCRIPT" ]; then
echo "[INFO] Found Python script: $PYTHON_SCRIPT"
# Make the script executable if it is not already
if [ ! -x "$PYTHON_SCRIPT" ]; then
echo "[INFO] Adding execute permission to Python script..."
chmod +x "$PYTHON_SCRIPT" 2>/dev/null || true
fi
# Run the Python script in the background
echo "[INFO] Starting configuration generator script..."
python3 "$PYTHON_SCRIPT" &
CONFIG_GEN_PID=$!
echo "[INFO] Configuration generator started with PID: $CONFIG_GEN_PID"
else
echo "[WARN] Python configuration generator script not found"
fi
# Wait for Elasticsearch via bash /dev/tcp to avoid curl dependency
echo "[INFO] Waiting for Elasticsearch to be ready (tcp ${ES_HOST}:${ES_PORT})..."
for i in $(seq 1 120); do
-if exec 3<>/dev/tcp/${ES_HOST}/${ES_PORT}; then
+if exec 3<>/dev/tcp/${ES_HOST}/${ES_PORT} 2>/dev/null; then
exec 3<&- 3>&-
echo "[INFO] Elasticsearch is ready"
break
@@ -104,6 +237,10 @@ for i in $(seq 1 120); do
sleep 1
done
echo "[INFO] Command: starting python task..."
# Start the configuration generator (already launched in the background above; kept for reference)
#exec nohup python3 /etc/fluent-bit/app/update_fluent_config.py &
echo "[INFO] Starting Fluent Bit with configuration from /etc/fluent-bit/"
echo "[INFO] Command: /opt/fluent-bit/bin/fluent-bit --config=/etc/fluent-bit/fluent-bit.conf"
exec /opt/fluent-bit/bin/fluent-bit --config=/etc/fluent-bit/fluent-bit.conf