
Using the example code to trace tasks in celery 4.1.1. Each worker runs:

import logging
from jaeger_client import Config
import opentracing 

def get_tracer(service="Vienna"):
    config = Config(
        config={

            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name=service,
    )
    return config.initialize_tracer() or opentracing.global_tracer()

When I first start celery and run a task, each worker gets a working tracer, and each one logs:

[2019-07-04 19:17:00,527: INFO/ForkPoolWorker-2] Initializing Jaeger Tracer with UDP reporter
[2019-07-04 19:17:00,546: INFO/ForkPoolWorker-2] opentracing.tracer initialized to <jaeger_client.tracer.Tracer object at 0x7f804d079c10>[app_name=SocketIOTask]

Any task run after the first one gets the global tracer instead, because Config.initialize_tracer() returns None, along with the log warning Jaeger tracer already initialized, skipping

Watching tcpdump on the console shows that no UDP packets are being sent, so I think I'm getting an uninitialized default tracer that uses a noop reporter.

I've dug through the code in opentracing and jaeger_client but can't find a canonical way to solve this.


2 Answers

from jaeger_client import Config

def get_tracer(service="Vienna"):
    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name=service,
    )

    tracer = config.initialize_tracer()
    if tracer is None:
        # initialize_tracer() returns None once any tracer has been
        # initialized in this process; reset the class-level guard and
        # try again so this worker builds its own tracer.
        Config._initialized = False
        tracer = config.initialize_tracer()
    return tracer
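Resetting Config._initialized works because initialize_tracer() is guarded by a class-level flag shared by all Config instances, so the second call returns None instead of a tracer. A simplified stand-in (not jaeger_client itself) showing that behavior:

```python
class GuardedConfig:
    # Class-level flag shared by every instance, mimicking
    # jaeger_client's Config._initialized guard (simplified stand-in).
    _initialized = False

    def initialize_tracer(self):
        if GuardedConfig._initialized:
            return None  # guard trips: caller gets None, not a tracer
        GuardedConfig._initialized = True
        return "tracer"

print(GuardedConfig().initialize_tracer())  # tracer
print(GuardedConfig().initialize_tracer())  # None
GuardedConfig._initialized = False          # the reset used in the answer
print(GuardedConfig().initialize_tracer())  # tracer
```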
Answered 2019-12-17T08:11:44.727

Celery forks multiple worker processes by default (--pool=prefork). The problem is that jaeger_client performs the actual message sending in a separate thread. That thread is created before forking and is not available in the subprocesses. As a result, span reporting from the subprocesses doesn't work.

The most obvious solution is to use --pool=solo, --pool=threads, etc. But for CPU-bound tasks we still need --pool=prefork, and in that case we hit another issue.
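For example, the single-process pool options look like this (assuming a hypothetical app module named proj):

```shell
# Run everything in one process (or in threads) so the reporter thread
# created at startup stays alive; "proj" is a placeholder app name.
celery -A proj worker --pool=solo
celery -A proj worker --pool=threads --concurrency=8
```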

On the one hand, jaeger_client is designed to be used as a singleton: it opens file descriptors and never closes them (even close() is more of a flush than a close). On the other hand, we need to create a separate tracer for every celery process.

In order to solve the issue, I used the following workaround:

import logging
import os

from jaeger_client import Config

tracer = None
tracer_owner_pid = 0

def get_jaeger_tracer():
    global tracer_owner_pid
    global tracer

    pid = os.getpid()
    if tracer_owner_pid == pid:
        # This process already built its own tracer; reuse it.
        return tracer

    # Reset logging handlers inherited from the parent process.
    logging.getLogger('').handlers = []
    logging.basicConfig(format='%(message)s', level=logging.DEBUG)
    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name='foo',
    )

    # new_tracer() builds a fresh tracer without touching the global
    # singleton, so each forked worker gets its own reporter thread.
    tracer = config.new_tracer()
    tracer_owner_pid = pid
    return tracer
Answered 2021-04-30T11:23:22.237