from __future__ import annotations import json import os import signal import subprocess import sys import time from pathlib import Path from typing import Any, Dict import pytest import yaml from cryptography.fernet import Fernet @pytest.mark.http_e2e def test_exporter_http_end_to_end(tmp_path) -> None: """ 端到端集成测试: - 使用独立子进程启动 `python -m exporter.main`; - 通过 HTTP 访问 /healthz, /api/v1/devices, /metrics; - 验证配置加载、SQLite 初始化、API 路由与 Prometheus 输出整体链路可用。 """ # 1. 生成最小可用 config.yaml key = Fernet.generate_key().decode() db_path = tmp_path / "devices.db" cfg_path = tmp_path / "config.yaml" config: Dict[str, Any] = { "global": { "http_listen": "127.0.0.1:19100", "scrape_interval_seconds": 2, "rpc_timeout_seconds": 2, "shutdown_timeout_seconds": 10, "runtime_db_path": str(db_path), "password_secret": key, "api_token": "changeme", }, "devices": [], } cfg_path.write_text(yaml.safe_dump(config), encoding="utf-8") # 2. 启动 exporter 子进程 root = Path(__file__).resolve().parents[1] env = os.environ.copy() src_path = str(root / "src") pythonpath = env.get("PYTHONPATH") env["PYTHONPATH"] = src_path if not pythonpath else os.pathsep.join([src_path, pythonpath]) proc = subprocess.Popen( [sys.executable, "-m", "exporter.main", "--config", str(cfg_path)], cwd=str(root), env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) base_url = "http://127.0.0.1:19100" def _http_request(path: str, method: str = "GET", body: bytes | None = None, headers: Dict[str, str] | None = None): import http.client conn = http.client.HTTPConnection("127.0.0.1", 19100, timeout=5) try: conn.request(method, path, body=body, headers=headers or {}) resp = conn.getresponse() data = resp.read() return resp.status, resp.getheaders(), data finally: conn.close() try: # 3. 等待 HTTP server 启动(轮询 /healthz) deadline = time.time() + 30 last_stdout = "" while True: if time.time() > deadline: proc.terminate() out, err = proc.communicate(timeout=5) raise AssertionError(f"Exporter did not start in time.\nSTDOUT:\n{out}\nSTDERR:\n{err}") if proc.poll() is not None: out, err = proc.communicate(timeout=5) raise AssertionError(f"Exporter exited early with code {proc.returncode}.\nSTDOUT:\n{out}\nSTDERR:\n{err}") try: status, _, data = _http_request("/healthz") if status == 200: health = json.loads(data.decode("utf-8")) assert health.get("status") == "ok" break except Exception: # server 可能尚未 ready,稍后重试 time.sleep(0.5) # 4. 通过 API 注册一个 runtime 设备 device_payload = { "name": "e2e-device-1", "host": "192.0.2.10", "port": 830, "username": "netconf_user", "password": "secret", "enabled": True, } headers = { "Content-Type": "application/json", "X-API-Token": "changeme", } status, _, data = _http_request( "/api/v1/devices", method="POST", body=json.dumps(device_payload).encode("utf-8"), headers=headers, ) assert status == 201, f"unexpected status for POST /api/v1/devices: {status}, body={data!r}" body_json = json.loads(data.decode("utf-8")) assert body_json["name"] == "e2e-device-1" assert body_json["source"] == "runtime" # 5. 访问 /metrics,验证 Prometheus 输出可用 status, headers_list, data = _http_request("/metrics") assert status == 200 header_dict = {k.lower(): v for k, v in headers_list} assert "text/plain" in header_dict.get("content-type", "") text = data.decode("utf-8") assert "# HELP netconf_scrape_success" in text finally: # 6. 优雅关闭 exporter 进程 if proc.poll() is None: try: proc.send_signal(signal.SIGTERM) except Exception: proc.terminate() try: proc.wait(timeout=20) except subprocess.TimeoutExpired: proc.kill() proc.wait(timeout=10) @pytest.mark.http_e2e @pytest.mark.h3c_live def test_exporter_http_with_h3c_device(tmp_path) -> None: """ 带真实 H3C 设备的端到端测试: - 使用 run_yangcli*.sh 中的 H3C 连接参数; - 启动完整 exporter.main(包含 Scraper 和 HTTP server); - 静态配置一台 H3C 设备; - 等待至少一轮采集后,通过 /metrics 检查该设备的健康指标是否存在。 """ # 为避免依赖其他测试模块,这里重复读取 H3C 连接参数 import socket host = os.getenv("H3C_NETCONF_HOST", "127.0.0.1") port = int(os.getenv("H3C_NETCONF_PORT", "8830")) user = os.getenv("H3C_NETCONF_USER", "netconf_user") password = os.getenv("H3C_NETCONF_PASSWORD", "") def _can_connect_local(h: str, p: int, timeout: float = 2.0) -> bool: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: s.settimeout(timeout) s.connect((h, p)) return True except OSError: return False finally: s.close() if not _can_connect_local(host, port): pytest.skip(f"H3C NETCONF {host}:{port} 不可达,跳过 H3C HTTP E2E 测试") key = Fernet.generate_key().decode() db_path = tmp_path / "devices.db" cfg_path = tmp_path / "config_h3c.yaml" config: Dict[str, Any] = { "global": { "http_listen": "127.0.0.1:19101", "scrape_interval_seconds": 5, "rpc_timeout_seconds": 10, "shutdown_timeout_seconds": 20, "runtime_db_path": str(db_path), "password_secret": key, "api_token": "changeme-h3c", }, "devices": [ { "name": "h3c-live-1", "host": host, "port": port, "username": user, "password": password, "enabled": True, } ], } cfg_path.write_text(yaml.safe_dump(config), encoding="utf-8") root = Path(__file__).resolve().parents[1] env = os.environ.copy() src_path = str(root / "src") pythonpath = env.get("PYTHONPATH") env["PYTHONPATH"] = src_path if not pythonpath else os.pathsep.join([src_path, pythonpath]) proc = subprocess.Popen( [sys.executable, "-m", "exporter.main", "--config", str(cfg_path)], cwd=str(root), env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) def _http_request(path: str, method: str = "GET", body: bytes | None = None, headers: Dict[str, str] | None = None): import http.client conn = http.client.HTTPConnection("127.0.0.1", 19101, timeout=10) try: conn.request(method, path, body=body, headers=headers or {}) resp = conn.getresponse() data = resp.read() return resp.status, resp.getheaders(), data finally: conn.close() try: # 等待 server 起 deadline = time.time() + 60 while True: if time.time() > deadline: proc.terminate() out, err = proc.communicate(timeout=5) raise AssertionError(f"Exporter (H3C E2E) did not start in time.\nSTDOUT:\n{out}\nSTDERR:\n{err}") if proc.poll() is not None: out, err = proc.communicate(timeout=5) raise AssertionError(f"Exporter (H3C E2E) exited early with code {proc.returncode}.\nSTDOUT:\n{out}\nSTDERR:\n{err}") try: status, _, data = _http_request("/healthz") if status == 200: break except Exception: time.sleep(0.5) # 等待至少一轮 Scraper(scrape_interval_seconds=5) time.sleep(7) # 检查 /metrics 可访问且包含健康指标定义 status, _, data = _http_request("/metrics") assert status == 200 text = data.decode("utf-8") assert "# HELP netconf_scrape_success" in text finally: if proc.poll() is None: try: proc.send_signal(signal.SIGTERM) except Exception: proc.terminate() try: proc.wait(timeout=30) except subprocess.TimeoutExpired: proc.kill() proc.wait(timeout=10)