0

InMemorySpanExporter测试类完成后打开遥测未重置。这导致span_list = self.memory_exporter.get_finished_spans()第二个测试类为空。

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from contextlib import contextmanager
import unittest

tracer = trace.get_tracer(__name__)


@contextmanager
def method(name):
    with tracer.start_as_current_span(name) as span:
        try:
            yield span
        except Exception as error:
            span.record_exception(error)
            raise
          
          
class OpenTelemetryBase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.tracer_provider = TracerProvider()
        cls.memory_exporter = InMemorySpanExporter()
        cls.span_processor = SimpleSpanProcessor(cls.memory_exporter)
        cls.tracer_provider.add_span_processor(cls.span_processor)
        trace.set_tracer_provider(cls.tracer_provider)

    def tearDown(self):
        self.memory_exporter.clear()

class TestTracing(OpenTelemetryBase):
    def test_method_1(self):
        with self.assertRaises(Exception):
            with method("TestTracing1") as span:
                raise Exception("Failed trace 1")
        span_list = self.memory_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        self.assertEqual(span_list[0].status.description, "Exception: Failed trace 1")
        self.assertEqual(span_list[0].name, "TestTracing1")


class TestTracingB(OpenTelemetryBase):
    def test_method_2(self):
        with self.assertRaises(Exception):
            with method("TestTracingB1") as span:
                raise Exception("Failed traceB 1")
        span_list = self.memory_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        self.assertEqual(span_list[0].status.description, "Exception: Failed traceB 1")
        self.assertEqual(span_list[0].name, "TestTracingB1")

仅运行测试类 1 的命令:py.test tracing.py::TestTracing:成功执行。

仅运行测试类 2 的命令py.test tracing.py::TestTracingB::成功执行。

但是同时运行两个测试类的命令:py.test tracing.py

输出:tracing.py .F第一次成功,第二次测试失败。

FAILED tracing.py::TestTracingB::test_method_2 - AssertionError: 0 != 1

即使我memory_exporter.clear()在每次测试中都使用了 in tera down。

对于该setUpClass方法,如果我将其更改为设置,则同一类中的多个测试将开始失败,只有第一个将通过其余测试将失败。

4

2 回答 2

2

您的描述不清楚,但我可以分享您遇到的原因AssertionError。发生这种情况是因为一旦设置了全局跟踪器提供程序,我们就不允许设置它;链接到执行此操作的代码。只能有一个全局跟踪器提供程序。因此,当trace.set_tracer_provider在第二次测试中调用它时,它会记录警告而不做任何事情,因此您第二次尝试设置管道不成功,即第二个导出器从未收到跨度。

于 2021-06-03T19:32:21.923 回答
1

Srikanth Chekuri 击败了我:这里的基本问题是第二次调用set_tracer_provider没有做任何事情,你最终使用与第一次设置调用相同的导出器。

不幸的是,跟踪器提供程序是全局的,只配置一次,并且该库没有提供一种简单的方法来交换提供程序或跨处理器以进行此类测试。

就像暴力引发暴力一样,全局变量产生全局变量。解决此问题的一种(丑陋)方法是将您的测试导出器也设为全局,并且仅在初始化测试时将其附加到跟踪提供程序。这意味着所有测试实例都OpenTelemetryBase将共享同一个导出器,因此您可以在拆卸时清除已完成的跨度,并且测试可能无法并行工作。

from contextlib import contextmanager
import logging
import unittest

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter


logger = logging.getLogger(__name__)

# Only here for demonstration reasons
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

# Typically called somewhere else. If this isn't called anywhere we get the
# default ProxyTracerProvider provider, which doesn't support adding span
# processors. This is usually only the case if the OT SDK isn't installed.
trace.set_tracer_provider(TracerProvider())

_TEST_OT_EXPORTER = None
_TEST_OT_PROVIDER_INITIALIZED = False

def get_test_ot_exporter():
    global _TEST_OT_EXPORTER

    if _TEST_OT_EXPORTER is None:
        _TEST_OT_EXPORTER = InMemorySpanExporter()
    return _TEST_OT_EXPORTER

def use_test_ot_exporter():
    global _TEST_OT_PROVIDER_INITIALIZED

    if _TEST_OT_PROVIDER_INITIALIZED:
        logger.info("Skipping repeated call to use_test_ot_exporter")
        return

    provider = trace.get_tracer_provider()
    if not hasattr(provider, 'add_span_processor'):
        logger.warn("OT TracerProvider has no add_span_processor. Is the OT "
                    "SDK installed?")
        return
    provider.add_span_processor(SimpleSpanProcessor(get_test_ot_exporter()))
    _TEST_OT_PROVIDER_INITIALIZED = True


@contextmanager
def method(name):
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span(name) as span:
        try:
            yield span
        except Exception as error:
            span.record_exception(error)
            raise


class OpenTelemetryBase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        use_test_ot_exporter()
        cls.ot_exporter = get_test_ot_exporter()

    def tearDown(self):
        self.ot_exporter.clear()


class TestTracing(OpenTelemetryBase):
    def test_method_1(self):
        with self.assertRaises(Exception):
            with method("TestTracing1") as span:
                raise Exception("Failed trace 1")

        span_list = self.ot_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        self.assertEqual(
            span_list[0].status.description, "Exception: Failed trace 1"
        )
        self.assertEqual(span_list[0].name, "TestTracing1")


class TestTracingB(OpenTelemetryBase):
    def test_method_2(self):
        with self.assertRaises(Exception):
            with method("TestTracingB1") as span:
                raise Exception("Failed traceB 1")

        span_list = self.ot_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        self.assertEqual(
            span_list[0].status.description, "Exception: Failed traceB 1"
        )
        self.assertEqual(span_list[0].name, "TestTracingB1")

直到 OT python 库添加了更好的方法来临时替换跟踪器提供程序,或者临时将跨度处理器或导出器添加到全局提供程序,这可能是侵入性最小的方法。

于 2021-06-03T20:02:49.547 回答