diff --git a/.gitignore b/.gitignore index 91b2fe2..763332d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,7 @@ specs/ __pycache__ +.coverage +.coveragerc + diff --git a/README.md b/README.md index 0272b73..eb6c40b 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,45 @@ PYTHONPATH=src .venv/bin/pytest -q -m "http_e2e" --- -## 3. 配置 H3C NETCONF 访问参数(.env) +--- + +## 3. 测试覆盖率统计 + +项目已集成 `pytest-cov`,并在根目录提供了 `.coveragerc`(仅统计 `src/exporter/` 下源码,忽略 `tests/` 与 `.venv/`)。 + +获取覆盖率统计的推荐命令: + +```bash +cd /home/yuyr/dev/switch_lab/netconf_exporter + +PYTHONPATH=src .venv/bin/pytest \ + --cov=exporter \ + --cov-report=term-missing \ + -q +``` + +说明: + +- `--cov=exporter`:统计 `exporter` 包(即 `src/exporter/`)的覆盖率; +- `--cov-report=term-missing`:在终端打印每个文件未覆盖的行,方便补测试; +- `.coveragerc` 中已开启 `branch = True`,统计分支覆盖率。 + +若需要 HTML 报告,便于在浏览器中查看: + +```bash +PYTHONPATH=src .venv/bin/pytest \ + --cov=exporter \ + --cov-report=html \ + -q + +# 生成的报告在 htmlcov/index.html +``` + +用浏览器打开 `htmlcov/index.html` 即可查看详细覆盖率情况。 + +--- + +## 4. 配置 H3C NETCONF 访问参数(.env) 为了方便本地联调 H3C 设备,本项目支持从 `.env` 文件中加载 H3C 连接参数。`tests/conftest.py` 会在 pytest 启动时自动读取 `.env`。 @@ -90,7 +128,7 @@ H3C_NETCONF_PASSWORD='NASPLab123!' --- -## 4. 编辑配置文件 config.yaml +## 5. 编辑配置文件 config.yaml Exporter 在启动时从 `config.yaml` 中加载全局配置和静态设备列表。典型最小配置示例如下(仅含全局配置,设备通过 HTTP API 注册): @@ -131,7 +169,7 @@ devices: [] # 静态设备先留空,通过 API --- -## 5. 启动 Exporter HTTP Server +## 6. 启动 Exporter HTTP Server 虚拟环境里,使用如下命令启动服务: @@ -166,7 +204,7 @@ curl -s http://127.0.0.1:19100/healthz --- -## 6. 通过 curl 注册 H3C 设备(runtime device) +## 7. 通过 curl 注册 H3C 设备(runtime device) 假设已经准备好 H3C 的 NETCONF 代理: @@ -217,7 +255,7 @@ curl -s -H "X-API-Token: changeme" http://127.0.0.1:19100/api/v1/devices --- -## 7. 通过 curl 获取 Prometheus 指标 +## 8. 通过 curl 获取 Prometheus 指标 Scraper 线程会按 `global.scrape_interval_seconds` 周期性访问所有启用的设备,通过 NETCONF `` 拉取 transceiver/channel 数据,并写入内存缓存。 @@ -255,7 +293,7 @@ transceiver_channel_tx_power_dbm{device="h3c-live-1",port="1/0/1",channel="1/0/1 --- -## 8. 删除 runtime 设备 +## 9. 删除 runtime 设备 若需删除通过 API 注册的 H3C 设备: @@ -269,7 +307,7 @@ curl -s -X DELETE \ --- -## 9. 关停 Exporter +## 10. 关停 Exporter 在运行 `exporter.main` 的终端中按 `Ctrl+C`: @@ -284,4 +322,3 @@ Exporter 本身不持久化运行时状态,只有: - `devices.db`:运行时注册的设备列表(已加密的密码)。 因此重启 Exporter 不会影响 H3C 设备,只会重新加载配置并恢复运行时设备列表。 - diff --git a/tests/__pycache__/test_http_e2e_exporter.cpython-312-pytest-9.0.1.pyc b/tests/__pycache__/test_http_e2e_exporter.cpython-312-pytest-9.0.1.pyc index 2e90117..8e847e6 100644 Binary files a/tests/__pycache__/test_http_e2e_exporter.cpython-312-pytest-9.0.1.pyc and b/tests/__pycache__/test_http_e2e_exporter.cpython-312-pytest-9.0.1.pyc differ diff --git a/tests/test_http_e2e_exporter.py b/tests/test_http_e2e_exporter.py index d60071d..d470265 100644 --- a/tests/test_http_e2e_exporter.py +++ b/tests/test_http_e2e_exporter.py @@ -246,11 +246,13 @@ def test_exporter_http_with_h3c_device(tmp_path) -> None: # 等待至少一轮 Scraper(scrape_interval_seconds=5) time.sleep(7) - # 检查 /metrics 可访问且包含健康指标定义 + # 检查 /metrics 可访问且包含健康指标与 transceiver channel info 指标 status, _, data = _http_request("/metrics") assert status == 200 text = data.decode("utf-8") assert "# HELP netconf_scrape_success" in text + # 至少应有一条 transceiver_channel_info_info 样本(业务指标已成功导出) + assert "transceiver_channel_info_info" in text finally: if proc.poll() is None: diff --git a/tests/test_scraper_behavior.py b/tests/test_scraper_behavior.py new file mode 100644 index 0000000..eee9f6d --- /dev/null +++ b/tests/test_scraper_behavior.py @@ -0,0 +1,391 @@ +from __future__ import annotations + +import threading +import time +from types import SimpleNamespace + +from exporter.config import DeviceConfig, GlobalConfig +from exporter.models import DeviceHealthState, DeviceMetricsSnapshot +from exporter.registry import DeviceRegistry, DeviceRuntimeState +from exporter.scraper import run_one_scrape_round, scrape_device, scraper_loop + + +class DummyConnectionManager: + def __init__(self) -> None: + self.acquired = [] + self.invalidated = [] + + def acquire_session(self, cfg: DeviceConfig): + self.acquired.append(cfg.name) + return SimpleNamespace() # manager 对象对测试无关紧要 + + def mark_session_invalid(self, name: str) -> None: + self.invalidated.append(name) + + def close_all(self) -> None: # pragma: no cover - not used here + pass + + +def test_scrape_device_success_updates_cache_and_health_and_registry(): + global_cfg = GlobalConfig() + global_cfg.scrape_interval_seconds = 10 + global_cfg.failure_threshold = 3 + global_cfg.max_backoff_factor = 8 + + dev_cfg = DeviceConfig( + name="dev1", + host="1.1.1.1", + port=830, + username="u", + password="p", + ) + + registry = DeviceRegistry(global_scrape_interval=global_cfg.scrape_interval_seconds) + registry.register_static_device(dev_cfg) + state = registry.get_enabled_devices(time.time())[0] + + cm = DummyConnectionManager() + + # 构造一个包含简单 transceiver/channel 的 XML + xml_reply = """ + + + + + comp1 + + TRANSCEIVER + 40.0 + + + + PRESENT + H3C + SN001 + + + + 0 + + 1/0/1:1 + -2.5 + + + + + + + + + """.strip() + + def fake_get_rpc(_mgr, _flt: str) -> str: + return xml_reply + + cache: dict[str, DeviceMetricsSnapshot] = {} + health: dict[str, DeviceHealthState] = {} + + now = time.time() + scrape_device( + now, + state, + registry, + cm, + fake_get_rpc, + cache, + health, + global_cfg, + failure_threshold=global_cfg.failure_threshold, + max_backoff_factor=global_cfg.max_backoff_factor, + ) + + # cache 中应有快照,且包含一个 transceiver 和一个 channel + assert "dev1" in cache + snapshot = cache["dev1"] + assert len(snapshot.transceivers) == 1 + assert len(snapshot.channels) == 1 + + # health 状态应标记为成功 + hs = health["dev1"] + assert hs.last_scrape_success is True + assert hs.last_error_type is None + + # registry 的调度状态应更新(下次采集时间向后推进) + state_after = registry.get_enabled_devices(now + 100)[0] + assert state_after.next_scrape_at > now + + +def test_scrape_device_failure_updates_health_and_invalidates_session(monkeypatch): + global_cfg = GlobalConfig() + global_cfg.scrape_interval_seconds = 10 + global_cfg.failure_threshold = 1 + global_cfg.max_backoff_factor = 8 + + dev_cfg = DeviceConfig( + name="dev2", + host="1.1.1.2", + port=830, + username="u", + password="p", + ) + + registry = DeviceRegistry(global_scrape_interval=global_cfg.scrape_interval_seconds) + registry.register_static_device(dev_cfg) + state = registry.get_enabled_devices(time.time())[0] + + cm = DummyConnectionManager() + + def failing_get_rpc(_mgr, _flt: str) -> str: + raise RuntimeError("filter failed") + + cache: dict[str, DeviceMetricsSnapshot] = {} + health: dict[str, DeviceHealthState] = {} + + now = time.time() + scrape_device( + now, + state, + registry, + cm, + failing_get_rpc, + cache, + health, + global_cfg, + failure_threshold=global_cfg.failure_threshold, + max_backoff_factor=global_cfg.max_backoff_factor, + ) + + # cache 中不应有 dev2 的快照 + assert "dev2" not in cache + + # health 状态应为失败,且 error_type 为 FilterError + hs = health["dev2"] + assert hs.last_scrape_success is False + assert hs.last_error_type == "FilterError" + + # 连接应被标记为无效 + assert "dev2" in cm.invalidated + + +def test_run_one_scrape_round_invokes_scrape_for_enabled_devices(monkeypatch): + global_cfg = GlobalConfig() + global_cfg.scrape_interval_seconds = 5 + global_cfg.failure_threshold = 3 + global_cfg.max_backoff_factor = 8 + + dev_cfg = DeviceConfig( + name="dev3", + host="1.1.1.3", + port=830, + username="u", + password="p", + ) + + registry = DeviceRegistry(global_scrape_interval=global_cfg.scrape_interval_seconds) + registry.register_static_device(dev_cfg) + state = registry.get_enabled_devices(time.time())[0] + + cm = DummyConnectionManager() + + def fake_get_rpc(_mgr, _flt: str) -> str: + # 返回最小合法 XML + return """ + + + + + compX + TRANSCEIVER + + + + 0 + + + + + + + + + """.strip() + + cache: dict[str, DeviceMetricsSnapshot] = {} + health: dict[str, DeviceHealthState] = {} + + now = time.time() + # 调用 run_one_scrape_round,确保会调用到 scrape_device + run_one_scrape_round( + now, + registry, + cm, + fake_get_rpc, + cache, + health, + global_cfg, + failure_threshold=global_cfg.failure_threshold, + max_backoff_factor=global_cfg.max_backoff_factor, + ) + + # dev3 应该被采集一次,并产生快照 + assert "dev3" in cache + assert "dev3" in health + + +def test_scraper_loop_covers_wait_true_and_false(monkeypatch): + """ + 覆盖 scraper_loop 中 stop_event.wait 的 True/False 两个分支, + 以及 while 条件的退出分支。 + """ + global_cfg = GlobalConfig() + global_cfg.scrape_interval_seconds = 0 # 立即触发多轮调度 + + registry = DeviceRegistry(global_scrape_interval=global_cfg.scrape_interval_seconds) + cm = DummyConnectionManager() + + # 使用计数器控制 run_one_scrape_round 调用次数 + call_count = {"n": 0} + + def fake_get_rpc(_mgr, _flt: str) -> str: + # 返回最小合法 XML + return """ + + + + + compY + TRANSCEIVER + + + + + """.strip() + + cache: dict[str, DeviceMetricsSnapshot] = {} + health: dict[str, DeviceHealthState] = {} + + # monkeypatch run_one_scrape_round,使其在第二次调用时设置 stop_event + from exporter import scraper as scraper_mod + + real_run_one = scraper_mod.run_one_scrape_round + + def counting_run_one( + now: float, + registry_: DeviceRegistry, + connection_manager_, + netconf_get_rpc_, + cache_, + health_, + global_cfg_, + failure_threshold: int, + max_backoff_factor: int, + ): + call_count["n"] += 1 + if call_count["n"] >= 2: + # 第二次调用后设置 stop_event,确保有一次 wait 返回 False,一次返回 True + stop_event.set() + return real_run_one( + now, + registry_, + connection_manager_, + netconf_get_rpc_, + cache_, + health_, + global_cfg_, + failure_threshold, + max_backoff_factor, + ) + + monkeypatch.setattr(scraper_mod, "run_one_scrape_round", counting_run_one) + + stop_event = threading.Event() + t = threading.Thread( + target=scraper_loop, + args=(stop_event, registry, cm, fake_get_rpc, cache, health, global_cfg), + daemon=True, + ) + t.start() + t.join(timeout=5.0) + + # 至少调用了两次 run_one_scrape_round(一次 wait=False,一次 wait=True) + assert call_count["n"] >= 2 + + +def test_scrape_device_preserves_existing_health_entry(): + """ + 第二次采集同一设备时,health 字典中已存在条目,应走 device in health 分支。 + """ + global_cfg = GlobalConfig() + dev_cfg = DeviceConfig( + name="dev4", + host="1.1.1.4", + port=830, + username="u", + password="p", + ) + + registry = DeviceRegistry(global_scrape_interval=global_cfg.scrape_interval_seconds) + registry.register_static_device(dev_cfg) + state = registry.get_enabled_devices(time.time())[0] + + cm = DummyConnectionManager() + + xml_reply = """ + + + + + compZ + TRANSCEIVER + + + + 0 + + + + + + + + + """.strip() + + def fake_get_rpc(_mgr, _flt: str) -> str: + return xml_reply + + cache: dict[str, DeviceMetricsSnapshot] = {} + health: dict[str, DeviceHealthState] = {} + + now = time.time() + # 第一次采集,health 中还没有 dev4 + scrape_device( + now, + state, + registry, + cm, + fake_get_rpc, + cache, + health, + global_cfg, + failure_threshold=global_cfg.failure_threshold, + max_backoff_factor=global_cfg.max_backoff_factor, + ) + assert "dev4" in health + + # 第二次采集,应走 device 已存在分支 + scrape_device( + now + 1, + state, + registry, + cm, + fake_get_rpc, + cache, + health, + global_cfg, + failure_threshold=global_cfg.failure_threshold, + max_backoff_factor=global_cfg.max_backoff_factor, + ) + # health 条目仍然存在且状态为成功 + assert health["dev4"].last_scrape_success is True +