[#2] 增加ruijie支持,注册接口增加vendor类型

This commit is contained in:
yuyr 2025-12-03 14:31:39 +08:00
parent 79afe4fbfc
commit 475bce46f6
31 changed files with 1227 additions and 39 deletions

View File

@ -160,6 +160,17 @@ global:
log_file: "" # 若非空,则写入指定文件 log_file: "" # 若非空,则写入指定文件
devices: [] # 静态设备先留空,通过 API 动态注册 devices: [] # 静态设备先留空,通过 API 动态注册
# 如需在配置文件中声明静态设备,可使用如下结构:
# devices:
# - name: h3c-static-1
# host: 192.168.1.10
# port: 830
# username: netconf_user
# password: "******"
# enabled: true
# supports_xpath: false
# vendor: "h3c" # 可选,多厂商解析时用于选择 H3C 解析策略
``` ```
注意: 注意:
@ -204,7 +215,7 @@ curl -s http://127.0.0.1:19100/healthz
--- ---
## 7. 通过 curl 注册 H3C 设备runtime device ## 7. 通过 curl 注册设备runtime device
假设已经准备好 H3C 的 NETCONF 代理: 假设已经准备好 H3C 的 NETCONF 代理:
@ -226,7 +237,9 @@ curl -s -X POST \
"username": "netconf_user", "username": "netconf_user",
"password": "NASPLab123!", "password": "NASPLab123!",
"enabled": true, "enabled": true,
"supports_xpath": false "supports_xpath": false,
"scrape_interval_seconds": null,
"vendor": "h3c"
}' \ }' \
http://127.0.0.1:19100/api/v1/devices http://127.0.0.1:19100/api/v1/devices
``` ```
@ -241,7 +254,8 @@ curl -s -X POST \
"enabled": true, "enabled": true,
"scrape_interval_seconds": null, "scrape_interval_seconds": null,
"supports_xpath": false, "supports_xpath": false,
"source": "runtime" "source": "runtime",
"vendor": "h3c"
} }
``` ```
@ -251,7 +265,43 @@ curl -s -X POST \
curl -s -H "X-API-Token: changeme" http://127.0.0.1:19100/api/v1/devices curl -s -H "X-API-Token: changeme" http://127.0.0.1:19100/api/v1/devices
``` ```
确认设备已注册(包含 `source: "runtime"`)。 确认设备已注册(包含 `source: "runtime"``vendor: "h3c"`)。
### 7.1 注册 Ruijie 设备(示例)
如果已经在 `.env` 中配置了 Ruijie 的 NETCONF 代理(例如:`RUIJIE_NETCONF_HOST=127.0.0.1``RUIJIE_NETCONF_PORT=9830` 等),可以类似地注册一个 Ruijie 设备:
```bash
curl -s -X POST \
-H "Content-Type: application/json" \
-H "X-API-Token: changeme" \
-d '{
"name": "ruijie-live-1",
"host": "127.0.0.1",
"port": 9830,
"username": "ruijie1-admin",
"password": "******",
"enabled": true,
"supports_xpath": false,
"vendor": " Ruijie "
}' \
http://127.0.0.1:19100/api/v1/devices
```
API 会自动将 `vendor` 规范化为小写、去掉首尾空格,返回类似:
```json
{
"name": "ruijie-live-1",
"host": "127.0.0.1",
"port": 9830,
"enabled": true,
"scrape_interval_seconds": null,
"supports_xpath": false,
"source": "runtime",
"vendor": "ruijie"
}
```
--- ---

View File

@ -0,0 +1,300 @@
./run_yangcli.sh "sget /oc-platform:components/oc-platform:component/oc-transceiver:transceiver"
rpc-reply {
data {
components {
component TRANSCEIVER-1/0/1-Te0/1 {
name TRANSCEIVER-1/0/1-Te0/1
transceiver {
config {
enabled true
}
state {
enabled true
present NOT_PRESENT
}
}
}
component TRANSCEIVER-1/0/2-Te0/2 {
name TRANSCEIVER-1/0/2-Te0/2
transceiver {
config {
enabled true
}
state {
enabled true
present NOT_PRESENT
}
}
}
component {
name TRANSCEIVER-1/0/129-FH0/1:1
transceiver {
config {
enabled true
}
state {
enabled true
present PRESENT
form-factor openconfig-transport-types:QSFP112
connector-type oc-opt-types:MPO_CONNECTOR
vendor H3C
vendor-part EQ854HG01M3-H3C
vendor-rev 03
ethernet-pmd oc-opt-types:ETH_UNDEFINED
serial-no G80231AM995701FK
date-code 2025-07-09T00:00:00Z-08:00
supply-voltage {
instant 3.31
}
output-power {
instant -40.00
}
input-power {
instant 1.50
}
laser-bias-current {
instant 0.0
}
voltage {
instant 3.31
}
}
physical-channels {
channel 1 {
index 1
state {
index 1
description TRANSCEIVER-1/0/129/1-FH0/1:1
output-power {
instant -40.00
}
input-power {
instant 1.50
}
laser-bias-current {
instant 0.0
}
}
}
channel 2 {
index 2
state {
index 2
description TRANSCEIVER-1/0/129/2-FH0/1:1
output-power {
instant -40.00
}
input-power {
instant 1.50
}
laser-bias-current {
instant 0.0
}
}
}
channel 3 {
index 3
state {
index 3
description TRANSCEIVER-1/0/129/3-FH0/1:1
output-power {
instant -40.00
}
input-power {
instant 1.50
}
laser-bias-current {
instant 0.0
}
}
}
channel 4 {
index 4
state {
index 4
description TRANSCEIVER-1/0/129/4-FH0/1:1
output-power {
instant -40.00
}
input-power {
instant 1.50
}
laser-bias-current {
instant 0.0
}
}
}
}
}
}
component {
name TRANSCEIVER-1/0/130-FH0/1:2
transceiver {
config {
enabled true
}
state {
enabled true
present PRESENT
form-factor openconfig-transport-types:QSFP112
connector-type oc-opt-types:MPO_CONNECTOR
vendor H3C
vendor-part EQ854HG01M3-H3C
vendor-rev 03
ethernet-pmd oc-opt-types:ETH_UNDEFINED
serial-no G80231AM995701FK
date-code 2025-07-09T00:00:00Z-08:00
supply-voltage {
instant 3.31
}
output-power {
instant -40.00
}
input-power {
instant 1.50
}
laser-bias-current {
instant 0.0
}
voltage {
instant 3.31
}
}
physical-channels {
channel 1 {
index 1
state {
index 1
description TRANSCEIVER-1/0/130/1-FH0/1:2
output-power {
instant -40.00
}
input-power {
instant 1.50
}
laser-bias-current {
instant 0.0
}
}
}
channel 2 {
index 2
state {
index 2
description TRANSCEIVER-1/0/130/2-FH0/1:2
output-power {
instant -40.00
}
input-power {
instant 1.50
}
laser-bias-current {
instant 0.0
}
}
}
channel 3 {
index 3
state {
index 3
description TRANSCEIVER-1/0/130/3-FH0/1:2
output-power {
instant -40.00
}
input-power {
instant 1.50
}
laser-bias-current {
instant 0.0
}
}
}
channel 4 {
index 4
state {
index 4
description TRANSCEIVER-1/0/130/4-FH0/1:2
output-power {
instant -40.00
}
input-power {
instant 1.50
}
laser-bias-current {
instant 0.0
}
}
}
}
}
}
component {
name TRANSCEIVER-1/0/131-FH0/2:1
transceiver {
config {
enabled true
}
state {
enabled true
present PRESENT
form-factor openconfig-transport-types:QSFP112
connector-type oc-opt-types:MPO_CONNECTOR
vendor H3C
vendor-part EQ854HG01M3-H3C
vendor-rev 03
ethernet-pmd oc-opt-types:ETH_UNDEFINED
serial-no G80231AM995701J8
date-code 2025-07-09T00:00:00Z-08:00
supply-voltage {
instant 3.32
}
output-power {
instant -40.00
}
input-power {
instant 1.47
}
laser-bias-current {
instant 0.0
}
voltage {
instant 3.32
}
}
physical-channels {
channel 1 {
index 1
state {
index 1
description TRANSCEIVER-1/0/131/1-FH0/2:1
output-power {
instant -40.00
}
input-power {
instant 1.47
}
laser-bias-current {
instant 0.0
}
}
}
channel 2 {
index 2
state {
index 2
description TRANSCEIVER-1/0/131/2-FH0/2:1
output-power {
instant -40.00
}
input-power {
instant 1.47
}
laser-bias-current {
instant 0.0
}
}
}
channel 3 {
index 3
state {

View File

@ -2,3 +2,4 @@
markers = markers =
h3c_live: tests that talk to a live H3C NETCONF device h3c_live: tests that talk to a live H3C NETCONF device
http_e2e: end-to-end tests that start the full HTTP server in a subprocess http_e2e: end-to-end tests that start the full HTTP server in a subprocess
ruijie_live: tests that talk to a live Ruijie NETCONF device

View File

@ -0,0 +1,26 @@
curl -s -X POST -H "Content-Type: application/json" -H "X-API-Token: changeme" -d '{
"name": "h3c-live-1",
"host": "127.0.0.1",
"port": 8830,
"username": "netconf_user",
"password": "NASPLab123!",
"enabled": true,
"supports_xpath": false,
"scrape_interval_seconds": null,
"vendor": "h3c"
}' http://127.0.0.1:19100/api/v1/devices
curl -s -X POST -H "Content-Type: application/json" -H "X-API-Token: changeme" -d '{
"name": "ruijie-live-1",
"host": "127.0.0.1",
"port": 9830,
"username": "ruijie1-admin",
"password": "1qw2#ER$_ruijie",
"enabled": true,
"supports_xpath": false,
"vendor": " Ruijie "
}' http://127.0.0.1:19100/api/v1/devices

View File

@ -1,2 +1,6 @@
# 本地8830 转发到h3c交换机830端口经过c1服务器 # 本地8830 转发到h3c交换机830端口经过c1服务器
ssh -L 8830:192.168.19.11:830 yuyr@c1 # 本地9830 转发到锐捷交换机830端口经过c1服务器
ssh -L 8830:192.168.19.11:830 \
-L 9830:192.168.19.152:830 \
yuyr@c1

View File

@ -1,10 +1,11 @@
from __future__ import annotations from __future__ import annotations
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
import sqlite3
from fastapi import Depends, FastAPI, Header, HTTPException, status from fastapi import Depends, FastAPI, Header, HTTPException, status
from fastapi.responses import JSONResponse, PlainTextResponse from fastapi.responses import JSONResponse, PlainTextResponse
from pydantic import BaseModel from pydantic import BaseModel, field_validator
from prometheus_client import CollectorRegistry, generate_latest from prometheus_client import CollectorRegistry, generate_latest
from .config import DeviceConfig, GlobalConfig from .config import DeviceConfig, GlobalConfig
@ -22,6 +23,15 @@ class DeviceIn(BaseModel):
enabled: bool = True enabled: bool = True
supports_xpath: bool = False supports_xpath: bool = False
scrape_interval_seconds: Optional[int] = None scrape_interval_seconds: Optional[int] = None
vendor: Optional[str] = None
@field_validator("vendor")
@classmethod
def normalize_vendor(cls, v: Optional[str]) -> Optional[str]:
if v is None:
return None
v_str = str(v).strip().lower()
return v_str or None
class DeviceOut(BaseModel): class DeviceOut(BaseModel):
@ -32,6 +42,7 @@ class DeviceOut(BaseModel):
scrape_interval_seconds: Optional[int] scrape_interval_seconds: Optional[int]
supports_xpath: bool supports_xpath: bool
source: str source: str
vendor: Optional[str]
def _require_token( def _require_token(
@ -99,6 +110,7 @@ def create_app(
scrape_interval_seconds=d.scrape_interval_seconds, scrape_interval_seconds=d.scrape_interval_seconds,
supports_xpath=d.supports_xpath, supports_xpath=d.supports_xpath,
source=d.source, source=d.source,
vendor=d.vendor,
) )
for d in devices for d in devices
] ]
@ -127,6 +139,7 @@ def create_app(
enabled=device.enabled, enabled=device.enabled,
scrape_interval_seconds=device.scrape_interval_seconds, scrape_interval_seconds=device.scrape_interval_seconds,
supports_xpath=device.supports_xpath, supports_xpath=device.supports_xpath,
vendor=device.vendor,
source="runtime", source="runtime",
) )
# 持久化并注册到 registry # 持久化并注册到 registry
@ -148,6 +161,7 @@ def create_app(
scrape_interval_seconds=cfg.scrape_interval_seconds, scrape_interval_seconds=cfg.scrape_interval_seconds,
supports_xpath=cfg.supports_xpath, supports_xpath=cfg.supports_xpath,
source=cfg.source, source=cfg.source,
vendor=cfg.vendor,
) )
@app.delete( @app.delete(
@ -178,4 +192,3 @@ def create_app(
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT, content=None) return JSONResponse(status_code=status.HTTP_204_NO_CONTENT, content=None)
return app return app

View File

@ -49,6 +49,8 @@ class DeviceConfig:
enabled: bool = True enabled: bool = True
scrape_interval_seconds: Optional[int] = None scrape_interval_seconds: Optional[int] = None
supports_xpath: bool = False supports_xpath: bool = False
# 设备厂商标识,例如 "h3c"、"ruijie"、"huawei" 等;若未设置则为 None
vendor: Optional[str] = None
source: str = "static" # "static" | "runtime" source: str = "static" # "static" | "runtime"
@ -100,6 +102,12 @@ class Config:
def _load_devices(raw_list: list[dict[str, Any]]) -> List[DeviceConfig]: def _load_devices(raw_list: list[dict[str, Any]]) -> List[DeviceConfig]:
devices: List[DeviceConfig] = [] devices: List[DeviceConfig] = []
for raw in raw_list: for raw in raw_list:
raw_vendor = raw.get("vendor")
vendor: Optional[str]
if raw_vendor is None:
vendor = None
else:
vendor = str(raw_vendor).strip().lower() or None
dev = DeviceConfig( dev = DeviceConfig(
name=str(raw["name"]), name=str(raw["name"]),
host=str(raw["host"]), host=str(raw["host"]),
@ -109,6 +117,7 @@ class Config:
enabled=bool(raw.get("enabled", True)), enabled=bool(raw.get("enabled", True)),
scrape_interval_seconds=raw.get("scrape_interval_seconds"), scrape_interval_seconds=raw.get("scrape_interval_seconds"),
supports_xpath=bool(raw.get("supports_xpath", False)), supports_xpath=bool(raw.get("supports_xpath", False)),
vendor=vendor,
source="static", source="static",
) )
devices.append(dev) devices.append(dev)
@ -128,4 +137,3 @@ class Config:
UserWarning, UserWarning,
stacklevel=2, stacklevel=2,
) )

View File

@ -1,5 +1,8 @@
from __future__ import annotations from __future__ import annotations
import logging
import re
import threading
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from typing import Iterable, List, Tuple from typing import Iterable, List, Tuple
@ -15,6 +18,20 @@ NS = {
} }
logger = logging.getLogger(__name__)
_RE_RUIJIE_CHANNEL = re.compile(
r"^TRANSCEIVER-(?P<prefix>.+)/(?P<ch>\d+)-(?P<ifname>.+):(?P<subport>\d+)$"
)
_RE_RUIJIE_COMPONENT = re.compile(
r"^TRANSCEIVER-(?P<prefix>.+?)-(?P<ifname>.+):(?P<subport>\d+)$"
)
_ruijie_warning_issued: set[str] = set()
_ruijie_warning_lock = threading.Lock()
def build_transceiver_filter() -> str: def build_transceiver_filter() -> str:
""" """
构造 subtree filter XML 片段不包含外层 <filter> 元素 构造 subtree filter XML 片段不包含外层 <filter> 元素
@ -30,17 +47,12 @@ def build_transceiver_filter() -> str:
) )
def parse_port_and_channel( def _parse_port_channel_h3c_or_default(
description: str | None, description: str | None,
component_name: str, component_name: str,
channel_index: int, channel_index: int,
) -> Tuple[str, str]: ) -> Tuple[str, str]:
""" """H3C 及默认设备的端口/通道解析策略."""
description 中解析 (logical_port, logical_channel)并在异常时提供安全 fallback
- 正常格式: "1/0/66:1" -> ("1/0/66", "1/0/66:1")
- description 为空/缺失: 使用 (component_name, f"{component_name}:ch{index}")
- 其他格式: logical_port = description; logical_channel = f"{description}:ch{index}"
"""
if not description: if not description:
logical_port = component_name logical_port = component_name
logical_channel = f"{component_name}:ch{channel_index}" logical_channel = f"{component_name}:ch{channel_index}"
@ -60,6 +72,91 @@ def parse_port_and_channel(
return logical_port, logical_channel return logical_port, logical_channel
def _parse_port_channel_ruijie(
description: str | None,
component_name: str,
channel_index: int,
device_name: str | None = None,
) -> Tuple[str, str]:
"""Ruijie 设备的端口/通道解析策略."""
if not description:
return _parse_port_channel_h3c_or_default(description, component_name, channel_index)
m = _RE_RUIJIE_CHANNEL.match(description)
if not m:
# 不符合 Ruijie 预期模式,退回默认策略
return _parse_port_channel_h3c_or_default(description, component_name, channel_index)
ch_from_desc = int(m.group("ch"))
ifname = m.group("ifname")
subport = m.group("subport")
# 若 description 中的 ch 与 XML index 不一致,则记录 warning便于定位数据异常
if ch_from_desc != channel_index:
logger.warning(
"Ruijie channel index mismatch: description='%s' (ch=%d, subport=%s), xml_index=%d",
description,
ch_from_desc,
subport,
channel_index,
extra={"device": device_name or "-"},
)
logical_port = ifname or component_name
logical_channel = f"{logical_port}:{channel_index}"
return logical_port, logical_channel
def parse_transceiver_port_from_component_name(
component_name: str,
vendor: str | None,
) -> str:
"""根据 component_name 与厂商信息解析 transceiver 的 logical_port。"""
vendor_norm = (vendor or "").strip().lower()
if vendor_norm == "ruijie":
m = _RE_RUIJIE_COMPONENT.match(component_name)
if m:
ifname = m.group("ifname")
return ifname or component_name
# 默认/H3C尝试提取形如 "1/0/1" 的端口模式
m = re.search(r"\d+/\d+/\d+", component_name)
if m:
return m.group(0)
return component_name
def parse_port_and_channel(
description: str | None,
component_name: str,
channel_index: int,
vendor: str | None = None,
device_name: str | None = None,
) -> Tuple[str, str]:
"""
description 中解析 (logical_port, logical_channel)并在异常时提供安全 fallback
- H3C/默认: 与现有逻辑保持一致
- Ruijie: 使用专用正则解析 TRANSCEIVER- 前缀结构
"""
vendor_norm = (vendor or "").strip().lower()
if vendor_norm in ("", "h3c"):
return _parse_port_channel_h3c_or_default(description, component_name, channel_index)
if vendor_norm == "ruijie":
return _parse_port_channel_ruijie(description, component_name, channel_index, device_name)
# 未知厂商:给出 warning回退到 H3C 默认策略
logger.warning(
"Unknown vendor '%s' for device, using default H3C strategy",
vendor,
extra={"device": device_name or "-"},
)
return _parse_port_channel_h3c_or_default(description, component_name, channel_index)
def _get_text(elem: ET.Element | None) -> str | None: def _get_text(elem: ET.Element | None) -> str | None:
if elem is None: if elem is None:
return None return None
@ -83,12 +180,32 @@ def _parse_float(elem: ET.Element | None) -> float | None:
def parse_netconf_response( def parse_netconf_response(
xml_str: str, xml_str: str,
device_name: str, device_name: str,
vendor: str | None = None,
) -> Tuple[List[TransceiverRecord], List[TransceiverChannelRecord]]: ) -> Tuple[List[TransceiverRecord], List[TransceiverChannelRecord]]:
""" """
解析 NETCONF `<get>` RPC 返回的 XML生成 transceiver channel 记录 解析 NETCONF `<get>` RPC 返回的 XML生成 transceiver channel 记录
""" """
root = ET.fromstring(xml_str) root = ET.fromstring(xml_str)
# vendor 参数表示“设备厂商”,用于端口/通道解析策略,
# 不应与 transceiver 模块自身的 vendor 字段混用。
device_vendor = vendor
device_vendor_norm = (device_vendor or "").strip().lower()
# 对疑似 Ruijie 但 vendor 未显式设置的情况给出一次性 warning线程安全
if device_vendor_norm in ("", "h3c"):
with _ruijie_warning_lock:
if device_name not in _ruijie_warning_issued:
if "TRANSCEIVER-" in xml_str and re.search(r"TRANSCEIVER-\d+/\d+/\d+", xml_str):
logger.warning(
"Device '%s' response looks like Ruijie (TRANSCEIVER-*), "
"but vendor is not set to 'ruijie'. Using default H3C parsing strategy; "
"labels may be suboptimal.",
device_name,
extra={"device": device_name},
)
_ruijie_warning_issued.add(device_name)
tx_records: List[TransceiverRecord] = [] tx_records: List[TransceiverRecord] = []
ch_records: List[TransceiverChannelRecord] = [] ch_records: List[TransceiverChannelRecord] = []
@ -111,7 +228,7 @@ def parse_netconf_response(
present = _get_text( present = _get_text(
tx_state.find("oc-transceiver:present", NS) if tx_state is not None else None tx_state.find("oc-transceiver:present", NS) if tx_state is not None else None
) )
vendor = _get_text( module_vendor = _get_text(
tx_state.find("oc-transceiver:vendor", NS) if tx_state is not None else None tx_state.find("oc-transceiver:vendor", NS) if tx_state is not None else None
) )
serial = _get_text( serial = _get_text(
@ -149,7 +266,7 @@ def parse_netconf_response(
) )
channel_elems: Iterable[ET.Element] = comp.findall(channels_path, NS) channel_elems: Iterable[ET.Element] = comp.findall(channels_path, NS)
# logical_port 以第一个 channel 的 description 为主fallback 到 component_name # logical_port 以第一个成功解析的 channel 为主fallback 到 component_name
logical_port_for_tx: str | None = None logical_port_for_tx: str | None = None
for ch in channel_elems: for ch in channel_elems:
@ -166,7 +283,11 @@ def parse_netconf_response(
desc_elem = ch.find("oc-transceiver:state/oc-transceiver:description", NS) desc_elem = ch.find("oc-transceiver:state/oc-transceiver:description", NS)
description = _get_text(desc_elem) description = _get_text(desc_elem)
logical_port, logical_channel = parse_port_and_channel( logical_port, logical_channel = parse_port_and_channel(
description, component_name, ch_index description,
component_name,
ch_index,
vendor=device_vendor,
device_name=device_name,
) )
if logical_port_for_tx is None: if logical_port_for_tx is None:
logical_port_for_tx = logical_port logical_port_for_tx = logical_port
@ -208,7 +329,12 @@ def parse_netconf_response(
) )
# transceiver record逻辑端口 # transceiver record逻辑端口
logical_port_tx = logical_port_for_tx or component_name if logical_port_for_tx is None:
logical_port_tx = parse_transceiver_port_from_component_name(
component_name, device_vendor
)
else:
logical_port_tx = logical_port_for_tx
tx_records.append( tx_records.append(
TransceiverRecord( TransceiverRecord(
@ -219,7 +345,7 @@ def parse_netconf_response(
oper_status=oper_status, oper_status=oper_status,
temperature_c=temperature_c, temperature_c=temperature_c,
supply_voltage_v=supply_voltage_v, supply_voltage_v=supply_voltage_v,
vendor=vendor, vendor=module_vendor,
serial=serial, serial=serial,
part_number=part_number, part_number=part_number,
hardware_rev=hardware_rev, hardware_rev=hardware_rev,

View File

@ -112,7 +112,11 @@ def scrape_device(
# 构造 filter 并调用外部提供的 RPC 函数 # 构造 filter 并调用外部提供的 RPC 函数
flt = build_transceiver_filter() flt = build_transceiver_filter()
xml_reply = netconf_get_rpc(mgr, flt) xml_reply = netconf_get_rpc(mgr, flt)
tx_records, ch_records = parse_netconf_response(xml_reply, device) tx_records, ch_records = parse_netconf_response(
xml_reply,
device,
vendor=state.cfg.vendor,
)
snapshot = DeviceMetricsSnapshot( snapshot = DeviceMetricsSnapshot(
device=device, device=device,

View File

@ -41,7 +41,7 @@ class SQLiteDeviceStore:
lock: threading.Lock = field(default_factory=threading.Lock, repr=False) lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
def init_db(self) -> None: def init_db(self) -> None:
"""初始化 DB设置 WAL 模式并创建 devices 表.""" """初始化 DB设置 WAL 模式并创建/更新 devices 表."""
conn = sqlite3.connect(self.db_path, timeout=self.timeout) conn = sqlite3.connect(self.db_path, timeout=self.timeout)
try: try:
conn.execute("PRAGMA journal_mode=WAL;") conn.execute("PRAGMA journal_mode=WAL;")
@ -57,11 +57,22 @@ class SQLiteDeviceStore:
enabled INTEGER NOT NULL, enabled INTEGER NOT NULL,
scrape_interval_seconds INTEGER, scrape_interval_seconds INTEGER,
supports_xpath INTEGER NOT NULL DEFAULT 0, supports_xpath INTEGER NOT NULL DEFAULT 0,
vendor TEXT,
created_at INTEGER NOT NULL, created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL updated_at INTEGER NOT NULL
); );
""" """
) )
# 防御性补列:针对旧版本 DB 中尚未包含 vendor 列的情况
try:
conn.execute("ALTER TABLE devices ADD COLUMN vendor TEXT;")
except sqlite3.OperationalError as exc: # noqa: PERF203
msg = str(exc).lower()
# 对于“重复列”等情况忽略,其它错误抛出
if "duplicate column" in msg or "already exists" in msg:
pass
else:
raise
conn.commit() conn.commit()
finally: finally:
conn.close() conn.close()
@ -86,11 +97,11 @@ class SQLiteDeviceStore:
""" """
INSERT OR REPLACE INTO devices ( INSERT OR REPLACE INTO devices (
name, host, port, username, password_cipher, enabled, name, host, port, username, password_cipher, enabled,
scrape_interval_seconds, supports_xpath, scrape_interval_seconds, supports_xpath, vendor,
created_at, updated_at created_at, updated_at
) VALUES ( ) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
COALESCE( COALESCE(
(SELECT created_at FROM devices WHERE name=?), (SELECT created_at FROM devices WHERE name=?),
? ?
@ -107,6 +118,7 @@ class SQLiteDeviceStore:
int(cfg.enabled), int(cfg.enabled),
cfg.scrape_interval_seconds, cfg.scrape_interval_seconds,
int(cfg.supports_xpath), int(cfg.supports_xpath),
cfg.vendor,
cfg.name, cfg.name,
now_ts, now_ts,
now_ts, now_ts,
@ -143,7 +155,7 @@ class SQLiteDeviceStore:
""" """
SELECT SELECT
name, host, port, username, password_cipher, name, host, port, username, password_cipher,
enabled, scrape_interval_seconds, supports_xpath enabled, scrape_interval_seconds, supports_xpath, vendor
FROM devices; FROM devices;
""" """
) )
@ -158,8 +170,12 @@ class SQLiteDeviceStore:
enabled, enabled,
scrape_interval_seconds, scrape_interval_seconds,
supports_xpath, supports_xpath,
vendor,
) in rows: ) in rows:
password = self.encryptor.decrypt(password_cipher) password = self.encryptor.decrypt(password_cipher)
vendor_norm = None
if vendor is not None:
vendor_norm = str(vendor).strip().lower() or None
dev = DeviceConfig( dev = DeviceConfig(
name=name, name=name,
host=host, host=host,
@ -169,6 +185,7 @@ class SQLiteDeviceStore:
enabled=bool(enabled), enabled=bool(enabled),
scrape_interval_seconds=scrape_interval_seconds, scrape_interval_seconds=scrape_interval_seconds,
supports_xpath=bool(supports_xpath), supports_xpath=bool(supports_xpath),
vendor=vendor_norm,
source="runtime", source="runtime",
) )
devices.append(dev) devices.append(dev)

View File

@ -5,7 +5,7 @@ import sqlite3
import pytest import pytest
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from exporter.api import create_app from exporter.api import create_app, DeviceIn
from exporter.config import DeviceConfig, GlobalConfig from exporter.config import DeviceConfig, GlobalConfig
from exporter.metrics import TransceiverCollector from exporter.metrics import TransceiverCollector
from exporter.models import DeviceHealthState, DeviceMetricsSnapshot from exporter.models import DeviceHealthState, DeviceMetricsSnapshot
@ -46,6 +46,17 @@ def app_with_registry(global_cfg) -> Tuple[TestClient, DeviceRegistry]:
return _make_app_and_registry(global_cfg) return _make_app_and_registry(global_cfg)
def test_devicein_vendor_validator_accepts_none():
# vendor 省略时应保持为 None走 validator 的 None 分支
d = DeviceIn(
name="dev1",
host="192.0.2.1",
username="u",
password="p",
)
assert d.vendor is None
def test_get_devices_requires_auth(app_with_registry): def test_get_devices_requires_auth(app_with_registry):
client, _ = app_with_registry client, _ = app_with_registry
resp = client.get("/api/v1/devices") resp = client.get("/api/v1/devices")
@ -86,6 +97,35 @@ def test_post_device_creates_runtime_device(app_with_registry):
assert any(d.name == "new-device" and d.source == "runtime" for d in devices) assert any(d.name == "new-device" and d.source == "runtime" for d in devices)
def test_post_device_accepts_vendor_and_normalizes(app_with_registry):
client, registry = app_with_registry
device_data = {
"name": "rj-dev",
"host": "192.168.1.200",
"port": 830,
"username": "admin",
"password": "secret",
"enabled": True,
"vendor": " Ruijie ",
}
resp = client.post(
"/api/v1/devices",
headers={"X-API-Token": "changeme"},
json=device_data,
)
assert resp.status_code == 201
body = resp.json()
# API 返回的 vendor 应已被 strip + lower
assert body["vendor"] == "ruijie"
# registry 中也应保存规范化后的 vendor
devices = registry.list_devices()
dev = next(d for d in devices if d.name == "rj-dev")
assert dev.vendor == "ruijie"
def test_post_duplicate_device_returns_409(app_with_registry): def test_post_duplicate_device_returns_409(app_with_registry):
client, _ = app_with_registry client, _ = app_with_registry
@ -152,6 +192,7 @@ def test_delete_static_device_fails(app_with_registry):
port=830, port=830,
username="u", username="u",
password="p", password="p",
vendor="h3c",
source="static", source="static",
) )
registry.register_static_device(static_dev) registry.register_static_device(static_dev)
@ -181,3 +222,24 @@ def test_metrics_endpoint_returns_prometheus_format(app_with_registry):
assert "# HELP" in resp.text assert "# HELP" in resp.text
assert "netconf_scrape_success" in resp.text assert "netconf_scrape_success" in resp.text
def test_get_devices_when_api_token_disabled(tmp_path):
# 当 global.api_token 为空时,/api/v1/devices 不应要求鉴权
gc = GlobalConfig()
gc.api_token = ""
gc.runtime_db_path = str(tmp_path / "devices.db")
gc.password_secret = VALID_FERNET_KEY
encryptor = PasswordEncryptor(gc.password_secret)
store = SQLiteDeviceStore(gc.runtime_db_path, encryptor)
store.init_db()
registry = DeviceRegistry(global_scrape_interval=gc.scrape_interval_seconds)
cache: dict[str, DeviceMetricsSnapshot] = {}
health: dict[str, DeviceHealthState] = {}
collector = TransceiverCollector(cache, health)
app = create_app(registry, store, collector, gc)
client = TestClient(app)
resp = client.get("/api/v1/devices")
assert resp.status_code == 200

View File

@ -0,0 +1,86 @@
import sqlite3
from typing import Any, Dict, List
from fastapi.testclient import TestClient
from exporter.api import create_app
from exporter.config import DeviceConfig, GlobalConfig
from exporter.metrics import TransceiverCollector
from exporter.models import DeviceHealthState, DeviceMetricsSnapshot
from exporter.registry import DeviceRegistry
VALID_FERNET_KEY = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
class DummyStore:
"""简单的测试用存储,实现 API 需要的接口,并在指定操作上抛 OperationalError。"""
def __init__(self, fail_on: str | None = None) -> None:
self.fail_on = fail_on
# 与 SQLiteDeviceStore 接口对齐
def init_db(self) -> None: # pragma: no cover - 在这些测试中不会调用
return
def load_runtime_devices(self) -> List[DeviceConfig]:
return []
def save_device(self, cfg: DeviceConfig) -> None:
if self.fail_on == "save":
raise sqlite3.OperationalError("database is locked")
def delete_device(self, name: str) -> None:
if self.fail_on == "delete":
raise sqlite3.OperationalError("database is locked")
def close(self) -> None: # pragma: no cover - 这里无需验证
return
def _build_app_with_dummy_store(fail_on: str | None) -> TestClient:
gc = GlobalConfig()
gc.api_token = "token"
gc.runtime_db_path = ":memory:"
gc.password_secret = VALID_FERNET_KEY
registry = DeviceRegistry(global_scrape_interval=gc.scrape_interval_seconds)
cache: Dict[str, DeviceMetricsSnapshot] = {}
health: Dict[str, DeviceHealthState] = {}
collector = TransceiverCollector(cache, health)
store = DummyStore(fail_on=fail_on)
app = create_app(registry, store, collector, gc)
return TestClient(app)
def test_post_device_sqlite_operational_error_returns_503():
client = _build_app_with_dummy_store(fail_on="save")
payload = {
"name": "dev-save-error",
"host": "192.0.2.10",
"port": 830,
"username": "u",
"password": "p",
"enabled": True,
}
resp = client.post(
"/api/v1/devices",
headers={"X-API-Token": "token"},
json=payload,
)
assert resp.status_code == 503
assert "database is locked" in resp.json()["detail"]
def test_delete_device_sqlite_operational_error_returns_503():
client = _build_app_with_dummy_store(fail_on="delete")
resp = client.delete(
"/api/v1/devices/nonexistent",
headers={"X-API-Token": "token"},
)
assert resp.status_code == 503
assert "database is locked" in resp.json()["detail"]

View File

@ -1,4 +1,5 @@
from pathlib import Path from pathlib import Path
import base64
import pytest import pytest
@ -58,6 +59,31 @@ def test_config_invalid_fernet_key_raises():
Config.from_dict(data) Config.from_dict(data)
def test_config_missing_password_secret_raises():
data = {
"global": {
"runtime_db_path": "./devices.db",
# password_secret 缺失
}
}
with pytest.raises(ValueError, match="global.password_secret must be configured"):
Config.from_dict(data)
def test_config_invalid_fernet_key_length_raises():
# 构造一个合法 base64 但长度不是 32 字节的 key
bad_key_bytes = b"too-short"
bad_key = base64.urlsafe_b64encode(bad_key_bytes).decode()
data = {
"global": {
"runtime_db_path": "./devices.db",
"password_secret": bad_key,
}
}
with pytest.raises(ValueError, match="Invalid Fernet key length"):
Config.from_dict(data)
def test_shutdown_timeout_too_small_warns(): def test_shutdown_timeout_too_small_warns():
data = { data = {
"global": { "global": {
@ -84,3 +110,39 @@ def test_shutdown_timeout_too_small_warns():
with pytest.warns(UserWarning): with pytest.warns(UserWarning):
Config.from_dict(data) Config.from_dict(data)
def test_deviceconfig_vendor_parsed_and_normalized_from_yaml(tmp_path: Path):
yaml_content = f"""
global:
runtime_db_path: "./devices.db"
password_secret: "{VALID_FERNET_KEY}"
devices:
- name: rj-1
host: 192.0.2.10
port: 830
username: u
password: p
enabled: true
supports_xpath: false
vendor: " Ruijie "
- name: h3c-1
host: 198.51.100.10
port: 830
username: u2
password: p2
enabled: true
supports_xpath: false
"""
cfg_file = tmp_path / "config_vendor.yaml"
cfg_file.write_text(yaml_content)
cfg = Config.from_file(cfg_file)
assert len(cfg.devices) == 2
rj = next(d for d in cfg.devices if d.name == "rj-1")
h3c = next(d for d in cfg.devices if d.name == "h3c-1")
# vendor 显式配置应被 strip + lower
assert rj.vendor == "ruijie"
# 未配置 vendor 时应为 None
assert h3c.vendor is None

View File

@ -123,3 +123,21 @@ def test_close_all_closes_all_sessions(monkeypatch, global_cfg):
assert mgr_instances[0].closed is True assert mgr_instances[0].closed is True
assert mgr_instances[1].closed is True assert mgr_instances[1].closed is True
def test_acquire_session_reuses_existing_manager(monkeypatch, global_cfg, device_cfg):
calls: list[dict] = []
def fake_connect(**kwargs):
calls.append(kwargs)
return DummyManager()
monkeypatch.setattr("exporter.connection.ncclient.manager.connect", fake_connect)
cm = ConnectionManager(global_cfg)
# 第一次会触发 connect
sess1 = cm.acquire_session(device_cfg)
# 第二次在会话仍有效时应复用,不再调用 connect
sess2 = cm.acquire_session(device_cfg)
assert sess1 is sess2
assert len(calls) == 1

View File

@ -13,4 +13,9 @@ def test_classify_error_from_exception():
assert classify_error(ET.ParseError()) == "XMLParseError" assert classify_error(ET.ParseError()) == "XMLParseError"
assert classify_error(PermissionError()) == "AuthenticationError" assert classify_error(PermissionError()) == "AuthenticationError"
assert classify_error(RuntimeError("filter failed")) == "FilterError" assert classify_error(RuntimeError("filter failed")) == "FilterError"
# SessionCloseError / SessionError 通过类名匹配
SessionCloseErrorType = type("SessionCloseError", (Exception,), {})
assert classify_error(SessionCloseErrorType("closed")) == "SessionCloseError"
assert classify_error(RuntimeError("something else")) == "UnknownError" assert classify_error(RuntimeError("something else")) == "UnknownError"

View File

@ -29,7 +29,8 @@ def test_exporter_http_end_to_end(tmp_path) -> None:
config: Dict[str, Any] = { config: Dict[str, Any] = {
"global": { "global": {
"http_listen": "127.0.0.1:19100", # 使用 29200 端口,避免与独立部署冲突
"http_listen": "127.0.0.1:29200",
"scrape_interval_seconds": 2, "scrape_interval_seconds": 2,
"rpc_timeout_seconds": 2, "rpc_timeout_seconds": 2,
"shutdown_timeout_seconds": 10, "shutdown_timeout_seconds": 10,
@ -57,12 +58,12 @@ def test_exporter_http_end_to_end(tmp_path) -> None:
text=True, text=True,
) )
base_url = "http://127.0.0.1:19100" base_url = "http://127.0.0.1:29200"
def _http_request(path: str, method: str = "GET", body: bytes | None = None, headers: Dict[str, str] | None = None): def _http_request(path: str, method: str = "GET", body: bytes | None = None, headers: Dict[str, str] | None = None):
import http.client import http.client
conn = http.client.HTTPConnection("127.0.0.1", 19100, timeout=5) conn = http.client.HTTPConnection("127.0.0.1", 29200, timeout=5)
try: try:
conn.request(method, path, body=body, headers=headers or {}) conn.request(method, path, body=body, headers=headers or {})
resp = conn.getresponse() resp = conn.getresponse()
@ -93,29 +94,72 @@ def test_exporter_http_end_to_end(tmp_path) -> None:
# server 可能尚未 ready稍后重试 # server 可能尚未 ready稍后重试
time.sleep(0.5) time.sleep(0.5)
# 4. 通过 API 注册一个 runtime 设备 # 4. 通过 API 注册两个 runtime 设备:一个 H3C、一个 Ruijie
device_payload = { # 使用带时间戳的名称,避免受残留 runtime DB 状态影响
"name": "e2e-device-1", base_name = f"e2e-{int(time.time() * 1000)}"
h3c_name = f"{base_name}-h3c"
ruijie_name = f"{base_name}-ruijie"
headers = {
"Content-Type": "application/json",
"X-API-Token": "changeme",
}
# H3C 设备(不显式设置 vendor 或设置为 h3c
h3c_payload = {
"name": h3c_name,
"host": "192.0.2.10", "host": "192.0.2.10",
"port": 830, "port": 830,
"username": "netconf_user", "username": "netconf_user",
"password": "secret", "password": "secret",
"enabled": True, "enabled": True,
} "vendor": "h3c",
headers = {
"Content-Type": "application/json",
"X-API-Token": "changeme",
} }
status, _, data = _http_request( status, _, data = _http_request(
"/api/v1/devices", "/api/v1/devices",
method="POST", method="POST",
body=json.dumps(device_payload).encode("utf-8"), body=json.dumps(h3c_payload).encode("utf-8"),
headers=headers, headers=headers,
) )
assert status == 201, f"unexpected status for POST /api/v1/devices: {status}, body={data!r}" assert status == 201, f"unexpected status for POST /api/v1/devices (h3c): {status}, body={data!r}"
body_json = json.loads(data.decode("utf-8")) body_json = json.loads(data.decode("utf-8"))
assert body_json["name"] == "e2e-device-1" assert body_json["name"] == h3c_name
assert body_json["source"] == "runtime" assert body_json["source"] == "runtime"
assert body_json.get("vendor") == "h3c"
# Ruijie 设备vendor 应被规范化为 ruijie
ruijie_payload = {
"name": ruijie_name,
"host": "192.0.2.11",
"port": 830,
"username": "ruijie",
"password": "secret",
"enabled": True,
"vendor": " Ruijie ",
}
status, _, data = _http_request(
"/api/v1/devices",
method="POST",
body=json.dumps(ruijie_payload).encode("utf-8"),
headers=headers,
)
assert status == 201, f"unexpected status for POST /api/v1/devices (ruijie): {status}, body={data!r}"
body_json = json.loads(data.decode("utf-8"))
assert body_json["name"] == ruijie_name
assert body_json["source"] == "runtime"
assert body_json.get("vendor") == "ruijie"
# 验证 GET /api/v1/devices 能同时看到 H3C 和 Ruijie 两个设备
status, _, data = _http_request(
"/api/v1/devices",
method="GET",
headers=headers,
)
assert status == 200
devices = json.loads(data.decode("utf-8"))
names = {d["name"] for d in devices}
assert h3c_name in names
assert ruijie_name in names
# 5. 访问 /metrics验证 Prometheus 输出可用 # 5. 访问 /metrics验证 Prometheus 输出可用
status, headers_list, data = _http_request("/metrics") status, headers_list, data = _http_request("/metrics")

View File

@ -75,3 +75,30 @@ def test_init_logging_configures_root_logger_handlers() -> None:
for handler in root.handlers for handler in root.handlers
for flt in handler.filters for flt in handler.filters
) )
def test_init_logging_with_file_handler(tmp_path) -> None:
"""当配置 log_file 时,应创建文件 handler 并挂载 DeviceFieldFilter。"""
log_file = tmp_path / "exporter.log"
gc = GlobalConfig(
log_level="INFO",
log_to_stdout=False,
log_file=str(log_file),
log_file_max_bytes=1024,
log_file_backup_count=1,
)
init_logging(gc)
root = logging.getLogger()
# 应至少存在一个 RotatingFileHandler
file_handlers = [
h for h in root.handlers if isinstance(h, logging.handlers.RotatingFileHandler)
]
assert file_handlers
# 并且这些 handler 上也应安装 DeviceFieldFilter
assert any(
isinstance(flt, DeviceFieldFilter)
for h in file_handlers
for flt in h.filters
)

View File

@ -0,0 +1,58 @@
import xml.etree.ElementTree as ET
import pytest
from exporter.netconf_client import parse_netconf_response
RUJIE_SAMPLE_XML = """\
<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<data>
<components xmlns="http://openconfig.net/yang/platform">
<component>
<name>TRANSCEIVER-1/0/129-FH0/1:1</name>
<state>
<type>TRANSCEIVER</type>
</state>
<transceiver xmlns="http://openconfig.net/yang/platform/transceiver">
<state>
<serial-no>ABC123</serial-no>
</state>
<physical-channels>
<channel>
<index>1</index>
<state>
<description>TRANSCEIVER-1/0/129/1-FH0/1:1</description>
</state>
</channel>
</physical-channels>
</transceiver>
</component>
</components>
</data>
</rpc-reply>
"""
def test_vendor_none_with_ruijie_sample_uses_h3c_strategy():
txs, chs = parse_netconf_response(RUJIE_SAMPLE_XML, "dev-rj", vendor=None)
assert len(chs) == 1
ch = chs[0]
# H3C 默认策略:冒号前为端口
assert ch.logical_port == "TRANSCEIVER-1/0/129/1-FH0/1"
assert ch.logical_channel == "TRANSCEIVER-1/0/129/1-FH0/1:1"
def test_vendor_none_to_ruijie_changes_labels():
_, chs_none = parse_netconf_response(RUJIE_SAMPLE_XML, "dev-rj", vendor=None)
_, chs_ruijie = parse_netconf_response(RUJIE_SAMPLE_XML, "dev-rj", vendor="ruijie")
assert len(chs_none) == len(chs_ruijie) == 1
ch_none = chs_none[0]
ch_rj = chs_ruijie[0]
assert ch_none.logical_port.startswith("TRANSCEIVER-")
# Ruijie 策略应清洗出短端口 FH0/1
assert ch_rj.logical_port == "FH0/1"
assert ch_rj.logical_channel == "FH0/1:1"

View File

@ -0,0 +1,117 @@
from __future__ import annotations
"""
与真实 Ruijie 设备联调的活体测试用例
说明
- 连接参数通过环境变量注入你已经在 .env 中配置
- RUIJIE_NETCONF_HOST
- RUIJIE_NETCONF_PORT
- RUIJIE_NETCONF_USER
- RUIJIE_NETCONF_PASSWORD
默认行为
- 若未设置 RUIJIE_NETCONF_PASSWORD或无法建立到指定 host:port TCP 连接
则使用 pytest.skip() 自动跳过不影响普通单元测试/CI
- 仅在本地联调时显式设置上述环境变量后此测试才会真正访问设备
"""
import os
import socket
import pytest
from ncclient import manager
from exporter.netconf_client import build_transceiver_filter, parse_netconf_response
RUIJIE_HOST = os.getenv("RUIJIE_NETCONF_HOST", "127.0.0.1")
RUIJIE_PORT = int(os.getenv("RUIJIE_NETCONF_PORT", "9830"))
RUIJIE_USER = os.getenv("RUIJIE_NETCONF_USER", "ruijie1-admin")
RUIJIE_PASSWORD = os.getenv("RUIJIE_NETCONF_PASSWORD", "")
def _can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
"""快速探测 host:port 是否可连,用于决定是否跳过 live 测试。"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.settimeout(timeout)
sock.connect((host, port))
return True
except OSError:
return False
finally:
sock.close()
@pytest.mark.ruijie_live
def test_ruijie_live_transceiver_rpc_and_parse() -> None:
"""
使用真实 Ruijie 设备验证
- ncclient 能与设备建立 NETCONF 会话
- build_transceiver_filter() 构造的 subtree filter 在设备上可用
- parse_netconf_response(..., vendor='ruijie') 能正确解析设备返回的 XML
"""
if not RUIJIE_PASSWORD:
pytest.skip("RUIJIE_NETCONF_PASSWORD 未设置,跳过 Ruijie live 测试")
if not _can_connect(RUIJIE_HOST, RUIJIE_PORT):
pytest.skip(f"Ruijie NETCONF {RUIJIE_HOST}:{RUIJIE_PORT} 不可达,跳过 live 测试")
flt = build_transceiver_filter()
with manager.connect(
host=RUIJIE_HOST,
port=RUIJIE_PORT,
username=RUIJIE_USER,
password=RUIJIE_PASSWORD,
hostkey_verify=False,
timeout=30,
allow_agent=False,
look_for_keys=False,
) as m:
reply = m.get(filter=("subtree", flt))
xml_str = str(reply)
# vendor="ruijie" 走厂商感知解析路径
transceivers, channels = parse_netconf_response(
xml_str,
device_name=f"ruijie-{RUIJIE_HOST}",
vendor="ruijie",
)
# 只要返回非空结果,就说明 "连接 + filter + 解析" 在真实设备上可以工作
assert transceivers or channels, "Ruijie 设备未返回任何 transceiver/channel 数据"
# 至少有一个 transceiver 拥有对应的 channel
tx_by_component = {t.component_name: t for t in transceivers}
ch_by_component = {}
for ch in channels:
ch_by_component.setdefault(ch.component_name, []).append(ch)
has_tx_with_channel = any(
comp in tx_by_component and len(ch_list) > 0
for comp, ch_list in ch_by_component.items()
)
assert has_tx_with_channel, (
"Ruijie live 数据中未发现“同时存在 transceiver 与 channel”的组件"
"请检查设备返回的 transceiver/physical-channels 数据是否完整"
)
# 至少有一个 channel 具有 rx 或 tx power 数值
channels_with_power = [
ch
for ch in channels
if ch.rx_power_dbm is not None or ch.tx_power_dbm is not None
]
assert channels_with_power, (
"Ruijie live 数据中未发现带 rx/tx power 的通道,"
"请检查设备是否开启了相关光功率采集"
)
# 额外 sanity 检查:至少有一个端口 label 不以 TRANSCEIVER- 开头,验证清洗逻辑生效
ports = {t.logical_port for t in transceivers} | {c.logical_port for c in channels}
assert any(not p.startswith("TRANSCEIVER-") for p in ports), (
"Ruijie live 数据中未发现清洗后的端口名,"
"请检查 vendor='ruijie' 解析逻辑是否生效"
)

View File

@ -0,0 +1,160 @@
import sqlite3
from pathlib import Path
import pytest
from exporter.config import DeviceConfig
from exporter.sqlite_store import PasswordEncryptor, SQLiteDeviceStore
@pytest.fixture
def encryptor() -> PasswordEncryptor:
# 生成一个有效 Fernet key
from cryptography.fernet import Fernet
key = Fernet.generate_key().decode()
return PasswordEncryptor(key)
def test_init_db_creates_vendor_column_on_fresh_db(tmp_path: Path, encryptor: PasswordEncryptor):
db_path = tmp_path / "test_vendor.db"
store = SQLiteDeviceStore(str(db_path), encryptor)
store.init_db()
conn = sqlite3.connect(str(db_path))
cols = [row[1] for row in conn.execute("PRAGMA table_info(devices)").fetchall()]
conn.close()
assert "vendor" in cols
def test_init_db_alter_table_vendor_preserves_existing_rows(tmp_path: Path, encryptor: PasswordEncryptor):
db_path = tmp_path / "legacy.db"
conn = sqlite3.connect(str(db_path))
# 创建旧版本 devices 表(无 vendor 列)
conn.execute(
"""
CREATE TABLE devices (
name TEXT PRIMARY KEY,
host TEXT NOT NULL,
port INTEGER NOT NULL,
username TEXT NOT NULL,
password_cipher BLOB NOT NULL,
enabled INTEGER NOT NULL,
scrape_interval_seconds INTEGER,
supports_xpath INTEGER NOT NULL DEFAULT 0,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
"""
)
conn.execute(
"INSERT INTO devices (name, host, port, username, password_cipher, enabled, "
"scrape_interval_seconds, supports_xpath, created_at, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
("old-dev", "h", 830, "u", b"cipher", 1, None, 0, 1, 1),
)
conn.commit()
conn.close()
store = SQLiteDeviceStore(str(db_path), encryptor)
store.init_db()
conn2 = sqlite3.connect(str(db_path))
row = conn2.execute("SELECT name, vendor FROM devices WHERE name = 'old-dev'").fetchone()
conn2.close()
assert row is not None
assert row[0] == "old-dev"
# 旧数据 vendor 应为空
assert row[1] is None
def test_save_and_load_device_persists_vendor(tmp_path: Path, encryptor: PasswordEncryptor):
db_path = tmp_path / "vendor_persist.db"
store = SQLiteDeviceStore(str(db_path), encryptor)
store.init_db()
dev = DeviceConfig(
name="dev-vendor",
host="h",
port=830,
username="u",
password="p",
enabled=True,
vendor="ruijie",
source="runtime",
)
store.save_device(dev)
loaded = store.load_runtime_devices()
assert len(loaded) == 1
assert loaded[0].name == "dev-vendor"
assert loaded[0].vendor == "ruijie"
def test_save_and_load_device_vendor_none_roundtrip(tmp_path: Path, encryptor: PasswordEncryptor):
db_path = tmp_path / "vendor_none.db"
store = SQLiteDeviceStore(str(db_path), encryptor)
store.init_db()
dev = DeviceConfig(
name="dev-no-vendor",
host="h",
port=830,
username="u",
password="p",
enabled=True,
vendor=None,
source="runtime",
)
store.save_device(dev)
loaded = store.load_runtime_devices()
assert len(loaded) == 1
assert loaded[0].name == "dev-no-vendor"
assert loaded[0].vendor is None
def test_init_db_alter_table_silently_ignores_duplicate_column_error(tmp_path: Path, encryptor: PasswordEncryptor):
db_path = tmp_path / "dup_col.db"
store = SQLiteDeviceStore(str(db_path), encryptor)
# 第一次初始化,创建带 vendor 列的表
store.init_db()
# 第二次调用,不应抛异常
store.init_db()
def test_init_db_alter_table_raises_on_non_duplicate_errors(tmp_path: Path, encryptor: PasswordEncryptor, monkeypatch):
db_path = tmp_path / "locked.db"
real_connect = sqlite3.connect
class ConnWrapper:
def __init__(self, inner: sqlite3.Connection) -> None:
self._inner = inner
self._alter_attempted = False
def execute(self, sql: str, *args, **kwargs):
# 在第一次尝试 ALTER TABLE 时注入错误
if "ALTER TABLE devices ADD COLUMN vendor" in sql and not self._alter_attempted:
self._alter_attempted = True
raise sqlite3.OperationalError("database is locked")
return self._inner.execute(sql, *args, **kwargs)
def commit(self) -> None:
return self._inner.commit()
def close(self) -> None:
return self._inner.close()
def wrapped_connect(*args, **kwargs):
conn = real_connect(*args, **kwargs)
return ConnWrapper(conn)
monkeypatch.setattr(sqlite3, "connect", wrapped_connect)
store = SQLiteDeviceStore(str(db_path), encryptor)
with pytest.raises(sqlite3.OperationalError):
store.init_db()