argus-netconf-exporter/tests/test_http_e2e_exporter.py

268 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import os
import signal
import subprocess
import sys
import time
from pathlib import Path
from typing import Any, Dict
import pytest
import yaml
from cryptography.fernet import Fernet
@pytest.mark.http_e2e
def test_exporter_http_end_to_end(tmp_path) -> None:
"""
端到端集成测试:
- 使用独立子进程启动 `python -m exporter.main`
- 通过 HTTP 访问 /healthz, /api/v1/devices, /metrics
- 验证配置加载、SQLite 初始化、API 路由与 Prometheus 输出整体链路可用。
"""
# 1. 生成最小可用 config.yaml
key = Fernet.generate_key().decode()
db_path = tmp_path / "devices.db"
cfg_path = tmp_path / "config.yaml"
config: Dict[str, Any] = {
"global": {
"http_listen": "127.0.0.1:19100",
"scrape_interval_seconds": 2,
"rpc_timeout_seconds": 2,
"shutdown_timeout_seconds": 10,
"runtime_db_path": str(db_path),
"password_secret": key,
"api_token": "changeme",
},
"devices": [],
}
cfg_path.write_text(yaml.safe_dump(config), encoding="utf-8")
# 2. 启动 exporter 子进程
root = Path(__file__).resolve().parents[1]
env = os.environ.copy()
src_path = str(root / "src")
pythonpath = env.get("PYTHONPATH")
env["PYTHONPATH"] = src_path if not pythonpath else os.pathsep.join([src_path, pythonpath])
proc = subprocess.Popen(
[sys.executable, "-m", "exporter.main", "--config", str(cfg_path)],
cwd=str(root),
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
base_url = "http://127.0.0.1:19100"
def _http_request(path: str, method: str = "GET", body: bytes | None = None, headers: Dict[str, str] | None = None):
import http.client
conn = http.client.HTTPConnection("127.0.0.1", 19100, timeout=5)
try:
conn.request(method, path, body=body, headers=headers or {})
resp = conn.getresponse()
data = resp.read()
return resp.status, resp.getheaders(), data
finally:
conn.close()
try:
# 3. 等待 HTTP server 启动(轮询 /healthz
deadline = time.time() + 30
last_stdout = ""
while True:
if time.time() > deadline:
proc.terminate()
out, err = proc.communicate(timeout=5)
raise AssertionError(f"Exporter did not start in time.\nSTDOUT:\n{out}\nSTDERR:\n{err}")
if proc.poll() is not None:
out, err = proc.communicate(timeout=5)
raise AssertionError(f"Exporter exited early with code {proc.returncode}.\nSTDOUT:\n{out}\nSTDERR:\n{err}")
try:
status, _, data = _http_request("/healthz")
if status == 200:
health = json.loads(data.decode("utf-8"))
assert health.get("status") == "ok"
break
except Exception:
# server 可能尚未 ready稍后重试
time.sleep(0.5)
# 4. 通过 API 注册一个 runtime 设备
device_payload = {
"name": "e2e-device-1",
"host": "192.0.2.10",
"port": 830,
"username": "netconf_user",
"password": "secret",
"enabled": True,
}
headers = {
"Content-Type": "application/json",
"X-API-Token": "changeme",
}
status, _, data = _http_request(
"/api/v1/devices",
method="POST",
body=json.dumps(device_payload).encode("utf-8"),
headers=headers,
)
assert status == 201, f"unexpected status for POST /api/v1/devices: {status}, body={data!r}"
body_json = json.loads(data.decode("utf-8"))
assert body_json["name"] == "e2e-device-1"
assert body_json["source"] == "runtime"
# 5. 访问 /metrics验证 Prometheus 输出可用
status, headers_list, data = _http_request("/metrics")
assert status == 200
header_dict = {k.lower(): v for k, v in headers_list}
assert "text/plain" in header_dict.get("content-type", "")
text = data.decode("utf-8")
assert "# HELP netconf_scrape_success" in text
finally:
# 6. 优雅关闭 exporter 进程
if proc.poll() is None:
try:
proc.send_signal(signal.SIGTERM)
except Exception:
proc.terminate()
try:
proc.wait(timeout=20)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait(timeout=10)
@pytest.mark.http_e2e
@pytest.mark.h3c_live
def test_exporter_http_with_h3c_device(tmp_path) -> None:
"""
带真实 H3C 设备的端到端测试:
- 使用 run_yangcli*.sh 中的 H3C 连接参数;
- 启动完整 exporter.main包含 Scraper 和 HTTP server
- 静态配置一台 H3C 设备;
- 等待至少一轮采集后,通过 /metrics 检查该设备的健康指标是否存在。
"""
# 为避免依赖其他测试模块,这里重复读取 H3C 连接参数
import socket
host = os.getenv("H3C_NETCONF_HOST", "127.0.0.1")
port = int(os.getenv("H3C_NETCONF_PORT", "8830"))
user = os.getenv("H3C_NETCONF_USER", "netconf_user")
password = os.getenv("H3C_NETCONF_PASSWORD", "")
def _can_connect_local(h: str, p: int, timeout: float = 2.0) -> bool:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.settimeout(timeout)
s.connect((h, p))
return True
except OSError:
return False
finally:
s.close()
if not _can_connect_local(host, port):
pytest.skip(f"H3C NETCONF {host}:{port} 不可达,跳过 H3C HTTP E2E 测试")
key = Fernet.generate_key().decode()
db_path = tmp_path / "devices.db"
cfg_path = tmp_path / "config_h3c.yaml"
config: Dict[str, Any] = {
"global": {
"http_listen": "127.0.0.1:19101",
"scrape_interval_seconds": 5,
"rpc_timeout_seconds": 10,
"shutdown_timeout_seconds": 20,
"runtime_db_path": str(db_path),
"password_secret": key,
"api_token": "changeme-h3c",
},
"devices": [
{
"name": "h3c-live-1",
"host": host,
"port": port,
"username": user,
"password": password,
"enabled": True,
}
],
}
cfg_path.write_text(yaml.safe_dump(config), encoding="utf-8")
root = Path(__file__).resolve().parents[1]
env = os.environ.copy()
src_path = str(root / "src")
pythonpath = env.get("PYTHONPATH")
env["PYTHONPATH"] = src_path if not pythonpath else os.pathsep.join([src_path, pythonpath])
proc = subprocess.Popen(
[sys.executable, "-m", "exporter.main", "--config", str(cfg_path)],
cwd=str(root),
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
def _http_request(path: str, method: str = "GET", body: bytes | None = None, headers: Dict[str, str] | None = None):
import http.client
conn = http.client.HTTPConnection("127.0.0.1", 19101, timeout=10)
try:
conn.request(method, path, body=body, headers=headers or {})
resp = conn.getresponse()
data = resp.read()
return resp.status, resp.getheaders(), data
finally:
conn.close()
try:
# 等待 server 起
deadline = time.time() + 60
while True:
if time.time() > deadline:
proc.terminate()
out, err = proc.communicate(timeout=5)
raise AssertionError(f"Exporter (H3C E2E) did not start in time.\nSTDOUT:\n{out}\nSTDERR:\n{err}")
if proc.poll() is not None:
out, err = proc.communicate(timeout=5)
raise AssertionError(f"Exporter (H3C E2E) exited early with code {proc.returncode}.\nSTDOUT:\n{out}\nSTDERR:\n{err}")
try:
status, _, data = _http_request("/healthz")
if status == 200:
break
except Exception:
time.sleep(0.5)
# 等待至少一轮 Scraperscrape_interval_seconds=5
time.sleep(7)
# 检查 /metrics 可访问且包含健康指标与 transceiver channel info 指标
status, _, data = _http_request("/metrics")
assert status == 200
text = data.decode("utf-8")
assert "# HELP netconf_scrape_success" in text
# 至少应有一条 transceiver_channel_info_info 样本(业务指标已成功导出)
assert "transceiver_channel_info_info" in text
finally:
if proc.poll() is None:
try:
proc.send_signal(signal.SIGTERM)
except Exception:
proc.terminate()
try:
proc.wait(timeout=30)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait(timeout=10)