我编写了一个小脚本来检测 Flask 应用程序,我想编写单元测试,其中每个测试都可以针对模拟 Flask 应用程序编写请求并测试指标,而无需处理来自以前测试方法的指标/请求,如下所示:
def test_grouped_codes():
app = create_app()
instrument(app)
# test stuff
但我无法“重置”注册表,因此我总是收到错误“CollectorRegistry 中的时间序列重复”。
如何在运行时重置 Prometheus Python 客户端库的注册表(或将其设置为空注册表)?
除其他外,我尝试了以下方法,但它不起作用:
def create_app():
app = Flask(__name__)
registry = CollectorRegistry() # Create new registry.
prometheus_client.REGISTRY = registry # Try to override global registry.
prometheus_client.registry.REGISTRY = registry # Try to override global registry.
@app.route("/")
def home():
return "Hello World!"
# More functions ...
@app.route("/metrics")
@FlaskInstrumentator.do_not_track()
def metrics():
data = generate_latest(registry)
headers = {
"Content-Type": CONTENT_TYPE_LATEST,
"Content-Length": str(len(data))}
return data, 200, headers
return app
我在堆栈溢出上发现了以下问答here https://stackoverflow.com/questions/57095978/reset-collectorregistry-of-prometheus-lib-after-each-unit-test。 @brian-brazil 建议在模块级别声明指标,但随后我必须对我想避免的标签名称进行硬编码。一些使用handler
, 其他method
or path
所以我想保留它的可定制性。
好的,感谢提示this https://stackoverflow.com/a/57096527/7391331@Xitrum“取消注册方法”的回答我找到了一个解决方案:
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
REGISTRY.unregister(collector)
现在所有测试都可以从自己的注册表开始:
def test_metrics_endpoint_availability():
app = create_app()
FlaskInstrumentator(app).instrument()
client = app.test_client()
response = client.get("/")
response = client.get("/metrics")
# Test stuff
def test_grouped_status_codes():
app = create_app()
FlaskInstrumentator(app).instrument()
client = app.test_client()
client.get("/does_not_exist") # Should be ignored.
client.get("/does_not_exist") # Should be ignored.
client.post("/")
client.post("/")
# Test stuff
编辑2023
这是我用于 atm 单元测试的辅助函数:
from prometheus_client import REGISTRY
def reset_prom_collectors() -> None:
"""Resets collectors in the default Prometheus registry.
Modifies the `REGISTRY` registry. Supposed to be called at the beginning
of individual test functions. Else registry is reused across test functions
and so we can run into errors like duplicate metrics or unexpected values
for metrics.
"""
# Unregister all collectors.
collectors = list(REGISTRY._collector_to_names.keys())
print(f"before unregister collectors={collectors}")
for collector in collectors:
REGISTRY.unregister(collector)
print(f"after unregister collectors={list(REGISTRY._collector_to_names.keys())}")
# Import default collectors.
from prometheus_client import gc_collector, platform_collector, process_collector
# Re-register default collectors.
process_collector.ProcessCollector()
platform_collector.PlatformCollector()
gc_collector.GCCollector()
print(f"after re-register collectors={list(REGISTRY._collector_to_names.keys())}")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)