from __future__ import annotations

from typing import Dict, Iterable, Iterator, List, Tuple

from prometheus_client.core import (
    CounterMetricFamily,
    GaugeMetricFamily,
    InfoMetricFamily,
    Metric,
)

from .models import DeviceHealthState, DeviceMetricsSnapshot

# Label sets shared by several metric families below.
_DEVICE_LABELS = ("device",)
_TRANSCEIVER_LABELS = ("device", "port", "component_name")
_CHANNEL_LABELS = ("device", "port", "channel", "component_name")


def _build_health_metrics(
    health_items: Iterable[Tuple[str, DeviceHealthState]],
) -> List[Metric]:
    """Build the per-device NETCONF scrape-health metric families.

    Args:
        health_items: ``(device, health_state)`` pairs; the device key is used
            as the ``device`` label value.

    Returns:
        The families in export order: scrape duration, scrape success, last
        scrape timestamp, data staleness, scrape errors.
    """
    scrape_duration = GaugeMetricFamily(
        "netconf_scrape_duration_seconds",
        "Duration of last NETCONF scrape per device",
        labels=_DEVICE_LABELS,
    )
    scrape_success = GaugeMetricFamily(
        "netconf_scrape_success",
        "Whether last NETCONF scrape succeeded (1) or failed (0)",
        labels=_DEVICE_LABELS,
    )
    last_scrape_ts = GaugeMetricFamily(
        "netconf_last_scrape_timestamp_seconds",
        "Timestamp of last NETCONF scrape per device",
        labels=_DEVICE_LABELS,
    )
    staleness = GaugeMetricFamily(
        "transceiver_data_staleness_seconds",
        "Age of last successful transceiver scrape per device",
        labels=_DEVICE_LABELS,
    )
    errors_total = CounterMetricFamily(
        "netconf_scrape_errors_total",
        "Total number of NETCONF scrape errors by device and type",
        labels=["device", "error_type"],
    )

    for device, hs in health_items:
        # Every health field is optional; a device only gets the samples it
        # actually has data for.
        if hs.last_scrape_duration is not None:
            scrape_duration.add_metric([device], hs.last_scrape_duration)
        if hs.last_scrape_success is not None:
            scrape_success.add_metric(
                [device], 1.0 if hs.last_scrape_success else 0.0
            )
        if hs.last_scrape_timestamp is not None:
            last_scrape_ts.add_metric([device], hs.last_scrape_timestamp)
        if hs.last_error_type:
            # NOTE(review): this always reports 1.0 for the most recent error
            # type only, so it does not accumulate like a Prometheus counter
            # should. Export a cumulative per-type count instead if
            # DeviceHealthState tracks one — TODO confirm.
            errors_total.add_metric([device, hs.last_error_type], 1.0)

    # NOTE(review): no samples are ever added to ``staleness``, so this family
    # is exported empty. Populating it needs the time of the last *successful*
    # scrape, which is not visible here — TODO confirm where that lives.
    return [scrape_duration, scrape_success, last_scrape_ts, staleness, errors_total]


def _build_business_metrics(
    snapshots: Iterable[DeviceMetricsSnapshot],
) -> List[Metric]:
    """Build the transceiver and per-channel metric families.

    Every snapshot passed in is exported; the caller is responsible for
    filtering out devices whose last scrape did not succeed. Label values come
    from the transceiver/channel records themselves (e.g. ``t.device``), not
    from the cache key.

    Args:
        snapshots: device snapshots exposing ``transceivers`` and ``channels``.

    Returns:
        The families in export order: temperature, supply voltage, presence,
        channel rx power, channel tx power, channel bias current, channel laser
        temperature, transceiver info, channel info.
    """
    tx_temp = GaugeMetricFamily(
        "transceiver_temperature_celsius",
        "Transceiver temperature in degrees Celsius",
        labels=_TRANSCEIVER_LABELS,
    )
    tx_supply_v = GaugeMetricFamily(
        "transceiver_supply_voltage_volts",
        "Transceiver supply voltage",
        labels=_TRANSCEIVER_LABELS,
    )
    tx_present = GaugeMetricFamily(
        "transceiver_present",
        "Transceiver present state (1 present, 0 otherwise)",
        labels=_TRANSCEIVER_LABELS,
    )
    ch_rx_power = GaugeMetricFamily(
        "transceiver_channel_rx_power_dbm",
        "Receive optical power per channel (dBm)",
        labels=_CHANNEL_LABELS,
    )
    ch_tx_power = GaugeMetricFamily(
        "transceiver_channel_tx_power_dbm",
        "Transmit optical power per channel (dBm)",
        labels=_CHANNEL_LABELS,
    )
    ch_bias = GaugeMetricFamily(
        "transceiver_channel_bias_current_ma",
        "Laser bias current per channel (mA)",
        labels=_CHANNEL_LABELS,
    )
    ch_laser_temp = GaugeMetricFamily(
        "transceiver_channel_laser_temperature_celsius",
        "Laser temperature per channel (Celsius)",
        labels=_CHANNEL_LABELS,
    )
    tx_info = InfoMetricFamily(
        "transceiver_info",
        "Transceiver static information",
        labels=[
            "device",
            "port",
            "component_name",
            "vendor",
            "serial",
            "form_factor",
            "part_number",
            "hardware_rev",
        ],
    )
    ch_info = InfoMetricFamily(
        "transceiver_channel_info",
        "Transceiver channel info",
        labels=["device", "port", "channel", "channel_index", "component_name"],
    )

    for snapshot in snapshots:
        # Per-transceiver samples.
        for t in snapshot.transceivers:
            labels = [t.device, t.logical_port, t.component_name]
            if t.temperature_c is not None:
                tx_temp.add_metric(labels, t.temperature_c)
            if t.supply_voltage_v is not None:
                tx_supply_v.add_metric(labels, t.supply_voltage_v)
            if t.present is not None:
                # Any state other than "PRESENT" (compared after upper-casing)
                # is reported as 0.
                present_val = 1.0 if t.present.upper() == "PRESENT" else 0.0
                tx_present.add_metric(labels, present_val)

            # Static info; missing string fields become empty label values.
            info_labels = [
                t.device,
                t.logical_port,
                t.component_name,
                t.vendor or "",
                t.serial or "",
                "",  # form_factor: left empty as a placeholder
                t.part_number or "",
                t.hardware_rev or "",
            ]
            tx_info.add_metric(info_labels, {})

        # Per-channel samples.
        for ch in snapshot.channels:
            labels = [ch.device, ch.logical_port, ch.logical_channel, ch.component_name]
            if ch.rx_power_dbm is not None:
                ch_rx_power.add_metric(labels, ch.rx_power_dbm)
            if ch.tx_power_dbm is not None:
                ch_tx_power.add_metric(labels, ch.tx_power_dbm)
            if ch.bias_current_ma is not None:
                ch_bias.add_metric(labels, ch.bias_current_ma)
            if ch.laser_temperature_c is not None:
                ch_laser_temp.add_metric(labels, ch.laser_temperature_c)
            ch_info.add_metric(
                [
                    ch.device,
                    ch.logical_port,
                    ch.logical_channel,
                    str(ch.channel_index),
                    ch.component_name,
                ],
                {},
            )

    return [
        tx_temp,
        tx_supply_v,
        tx_present,
        ch_rx_power,
        ch_tx_power,
        ch_bias,
        ch_laser_temp,
        tx_info,
        ch_info,
    ]


class TransceiverCollector:
    """
    Custom collector that exports all metrics from the metrics cache and the
    per-device health state.
    """

    def __init__(
        self,
        cache: Dict[str, DeviceMetricsSnapshot],
        health: Dict[str, DeviceHealthState],
    ) -> None:
        # Both mappings are held by reference and re-read on every collect().
        self._cache = cache
        self._health = health

    def collect(self) -> Iterator[Metric]:
        """Yield the health families, then the transceiver/channel families.

        Transceiver and channel metrics are exported only for devices that
        have a health entry whose ``last_scrape_success`` is truthy. Every
        family is fully built before the first one is yielded.
        """
        # Copy the mappings up front to avoid holding a lock for long; thread
        # safety is the caller's responsibility.
        cache_items = list(self._cache.items())
        health_items = list(self._health.items())

        health_families = _build_health_metrics(health_items)

        health_map = dict(health_items)
        healthy_snapshots = []
        for device, snapshot in cache_items:
            hs = health_map.get(device)
            if hs and hs.last_scrape_success:
                healthy_snapshots.append(snapshot)
        business_families = _build_business_metrics(healthy_snapshots)

        yield from health_families
        yield from business_families