47 lines
1.3 KiB
Python
47 lines
1.3 KiB
Python
import threading
|
||
import time
|
||
from types import SimpleNamespace
|
||
|
||
from exporter.config import DeviceConfig, GlobalConfig
|
||
from exporter.registry import DeviceRegistry
|
||
from exporter.scraper import scraper_loop
|
||
from exporter.connection import ConnectionManager
|
||
|
||
|
||
def test_scraper_stop_during_sleep(monkeypatch):
|
||
"""
|
||
在等待间隔时设置 stop_event,应立即退出,而不是等完整周期。
|
||
"""
|
||
global_cfg = GlobalConfig()
|
||
global_cfg.scrape_interval_seconds = 60
|
||
|
||
registry = DeviceRegistry(global_scrape_interval=global_cfg.scrape_interval_seconds)
|
||
connection_mgr = ConnectionManager(global_cfg)
|
||
|
||
# netconf_get_rpc: 不做任何事
|
||
def fake_get_rpc(_mgr, _flt):
|
||
return "<rpc-reply><data/></rpc-reply>"
|
||
|
||
cache = {}
|
||
health = {}
|
||
|
||
# run_one_scrape_round 在当前实现中是真实采集,但我们不关心,只要快速返回即可
|
||
stop_event = threading.Event()
|
||
start = time.time()
|
||
|
||
t = threading.Thread(
|
||
target=scraper_loop,
|
||
args=(stop_event, registry, connection_mgr, fake_get_rpc, cache, health, global_cfg),
|
||
daemon=True,
|
||
)
|
||
t.start()
|
||
|
||
time.sleep(1.0) # 给 loop 一点时间进入 wait
|
||
stop_event.set()
|
||
t.join(timeout=5.0)
|
||
|
||
elapsed = time.time() - start
|
||
# 如果 stop_event.wait 工作正常,不应等完整 60 秒
|
||
assert elapsed < 3.0
|
||
|