175 lines
5.3 KiB
Python
175 lines
5.3 KiB
Python
import xml.etree.ElementTree as ET
|
|
|
|
import pytest
|
|
|
|
from exporter.netconf_client import (
|
|
build_transceiver_filter,
|
|
parse_netconf_response,
|
|
parse_port_and_channel,
|
|
)
|
|
|
|
|
|
def test_build_transceiver_filter_contains_expected_paths():
|
|
flt = build_transceiver_filter()
|
|
assert "<components" in flt
|
|
assert "platform/transceiver" in flt
|
|
|
|
|
|
SAMPLE_XML = """\
|
|
<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
|
|
<data>
|
|
<components xmlns="http://openconfig.net/yang/platform">
|
|
<component>
|
|
<name>323.FourHundredGigE1/0/66</name>
|
|
<state>
|
|
<type>TRANSCEIVER</type>
|
|
<temperature>
|
|
<instant>45.5</instant>
|
|
</temperature>
|
|
</state>
|
|
<transceiver xmlns="http://openconfig.net/yang/platform/transceiver">
|
|
<state>
|
|
<present>PRESENT</present>
|
|
<vendor>Cisco</vendor>
|
|
<serial-no>ABC123</serial-no>
|
|
</state>
|
|
<physical-channels>
|
|
<channel>
|
|
<index>0</index>
|
|
<state>
|
|
<description>1/0/66:1</description>
|
|
<output-power><instant>-2.5</instant></output-power>
|
|
</state>
|
|
</channel>
|
|
</physical-channels>
|
|
</transceiver>
|
|
</component>
|
|
</components>
|
|
</data>
|
|
</rpc-reply>
|
|
"""
|
|
|
|
|
|
def test_parse_transceiver_basic():
|
|
txs, chs = parse_netconf_response(SAMPLE_XML, "dev1")
|
|
assert len(txs) == 1
|
|
r = txs[0]
|
|
assert r.device == "dev1"
|
|
assert r.component_name == "323.FourHundredGigE1/0/66"
|
|
assert r.temperature_c == 45.5
|
|
assert r.vendor == "Cisco"
|
|
assert r.serial == "ABC123"
|
|
assert r.logical_port == "1/0/66"
|
|
assert len(chs) == 1
|
|
|
|
|
|
def test_parse_channel_and_description():
|
|
_, chs = parse_netconf_response(SAMPLE_XML, "dev1")
|
|
assert len(chs) == 1
|
|
ch = chs[0]
|
|
assert ch.logical_port == "1/0/66"
|
|
assert ch.logical_channel == "1/0/66:1"
|
|
assert ch.tx_power_dbm == -2.5
|
|
|
|
|
|
def test_missing_temperature_not_fatal():
|
|
root = ET.fromstring(SAMPLE_XML)
|
|
# remove temperature node
|
|
ns_platform = "{http://openconfig.net/yang/platform}"
|
|
temp_elem = root.find(f".//{ns_platform}temperature")
|
|
parent = root.find(f".//{ns_platform}state")
|
|
if temp_elem is not None and parent is not None:
|
|
parent.remove(temp_elem)
|
|
xml_no_temp = ET.tostring(root, encoding="unicode")
|
|
|
|
txs, _ = parse_netconf_response(xml_no_temp, "dev1")
|
|
assert txs[0].temperature_c is None
|
|
|
|
|
|
def test_invalid_power_filtered():
|
|
xml_invalid = SAMPLE_XML.replace("-2.5", "2147483647.00")
|
|
_, chs = parse_netconf_response(xml_invalid, "dev1")
|
|
assert chs[0].tx_power_dbm is None
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"description, component_name, index, expected_port, expected_channel",
|
|
[
|
|
(None, "comp1", 0, "comp1", "comp1:ch0"),
|
|
("", "comp1", 1, "comp1", "comp1:ch1"),
|
|
("1/0/1:1", "comp1", 2, "1/0/1", "1/0/1:1"),
|
|
("GigabitEthernet1/0/1", "comp1", 3, "GigabitEthernet1/0/1", "GigabitEthernet1/0/1:ch3"),
|
|
],
|
|
)
|
|
def test_parse_port_and_channel_fallbacks(
|
|
description, component_name, index, expected_port, expected_channel
|
|
):
|
|
lp, lc = parse_port_and_channel(description, component_name, index)
|
|
assert lp == expected_port
|
|
assert lc == expected_channel
|
|
|
|
|
|
def test_multiple_channels_missing_description_produce_unique_logical_channel():
|
|
xml = """\
|
|
<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
|
|
<data>
|
|
<components xmlns="http://openconfig.net/yang/platform">
|
|
<component>
|
|
<name>comp1</name>
|
|
<state><type>TRANSCEIVER</type></state>
|
|
<transceiver xmlns="http://openconfig.net/yang/platform/transceiver">
|
|
<physical-channels>
|
|
<channel>
|
|
<index>0</index>
|
|
<state></state>
|
|
</channel>
|
|
<channel>
|
|
<index>1</index>
|
|
<state></state>
|
|
</channel>
|
|
</physical-channels>
|
|
</transceiver>
|
|
</component>
|
|
</components>
|
|
</data>
|
|
</rpc-reply>
|
|
"""
|
|
_, channels = parse_netconf_response(xml, "dev1")
|
|
assert len(channels) == 2
|
|
ch0 = next(c for c in channels if c.channel_index == 0)
|
|
ch1 = next(c for c in channels if c.channel_index == 1)
|
|
assert ch0.logical_port == "comp1"
|
|
assert ch1.logical_port == "comp1"
|
|
assert ch0.logical_channel != ch1.logical_channel
|
|
|
|
|
|
def test_h3c_multi_component_same_serial():
|
|
xml_multi = """\
|
|
<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
|
|
<data>
|
|
<components xmlns="http://openconfig.net/yang/platform">
|
|
<component>
|
|
<name>63.TwoHundredGigE1/0/1:1</name>
|
|
<state><type>TRANSCEIVER</type></state>
|
|
<transceiver xmlns="http://openconfig.net/yang/platform/transceiver">
|
|
<state><serial-no>SN001</serial-no></state>
|
|
</transceiver>
|
|
</component>
|
|
<component>
|
|
<name>64.TwoHundredGigE1/0/1:2</name>
|
|
<state><type>TRANSCEIVER</type></state>
|
|
<transceiver xmlns="http://openconfig.net/yang/platform/transceiver">
|
|
<state><serial-no>SN001</serial-no></state>
|
|
</transceiver>
|
|
</component>
|
|
</components>
|
|
</data>
|
|
</rpc-reply>
|
|
"""
|
|
txs, _ = parse_netconf_response(xml_multi, "h3c")
|
|
assert len(txs) == 2
|
|
assert txs[0].serial == "SN001"
|
|
assert txs[1].serial == "SN001"
|
|
assert txs[0].component_name != txs[1].component_name
|
|
|