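I am trying to integrate structlog into an application that uses multiprocessing. While I plan to switch to structlog completely, I also want to capture the stdlib logging calls made by third-party libraries. Since we plan to emit both JSON log files and key-value output, I think the right approach is to delegate the actual formatting to logging: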

    logging   \
               structlog processors (worker) -> logging queuehandler -> logging handler (main process) -> stdlib logging output
    structlog /

Following https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes we use a QueueHandler to send all log messages to the main process, which does the writing.
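For reference, the queue-based pattern from the cookbook can be sketched with the stdlib alone. This is a minimal in-process illustration, not the configuration used in this question: `queue.Queue` stands in for `multiprocessing.Queue`, and the logger name `queue_demo_worker` and the in-memory stream are assumptions made so the snippet stays self-contained.

```python
import io
import logging
import logging.handlers
import queue

# Stand-in for multiprocessing.Queue so the sketch runs in one process.
log_queue = queue.Queue()

# "Main process" side: the listener owns the real output handler.
stream = io.StringIO()
sink = logging.StreamHandler(stream)
sink.setFormatter(logging.Formatter("%(levelname)s %(name)s %(message)s"))
listener = logging.handlers.QueueListener(log_queue, sink)
listener.start()

# "Worker" side: the only handler is a QueueHandler feeding the queue.
worker_logger = logging.getLogger("queue_demo_worker")
worker_logger.setLevel(logging.INFO)
worker_logger.propagate = False
worker_logger.addHandler(logging.handlers.QueueHandler(log_queue))

worker_logger.warning("worker_startup_std_logging")

# stop() drains the records queued so far, then joins the listener thread.
listener.stop()
captured = stream.getvalue()
print(captured, end="")
```

`QueueListener` runs the receive loop in a background thread, so it could replace a hand-written `queue.get()` loop where blocking the main thread is not wanted.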

Here is my current configuration:

import logging.config
import logging.handlers
import multiprocessing
import sys
import traceback

import structlog
from structlog.processors import JSONRenderer, KeyValueRenderer


def proc_info(logger, method, event_dict):
    event_dict['worker-process'] = multiprocessing.current_process().name
    return event_dict

pre_chain = [
    structlog.stdlib.add_log_level,
    proc_info
]

def configure_root_logging():
    logging.config.dictConfig({
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "plain": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processor": KeyValueRenderer(),
                    "foreign_pre_chain": pre_chain,
                },
                "json": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processor": JSONRenderer(),
                    "foreign_pre_chain": pre_chain,
                },
            },
            "handlers": {
                "console_json": {
                    "level": "DEBUG",
                    "class": "logging.StreamHandler",
                    "formatter": "json",
                },
                "console_kvp": {
                    "level": "DEBUG",
                    "class": "logging.StreamHandler",
                    "formatter": "plain",
                },
            },
            "loggers": {
                "": {
                    "handlers": ["console_json", "console_kvp"],
                    "level": "DEBUG",
                    "propagate": True,
                },
            }
    })


def configure_structlog():
    structlog.configure(
        processors=pre_chain + [structlog.stdlib.ProcessorFormatter.wrap_for_formatter],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )


def setup_worker_logging(logging_queue):
    root = logging.getLogger()
    queue_handler = logging.handlers.QueueHandler(logging_queue)
    root.addHandler(queue_handler)
    root.setLevel(logging.INFO)


def worker_entrypoint(logging_queue):
    setup_worker_logging(logging_queue)
    configure_structlog()

    logger = logging.getLogger(__name__)
    logger.warning("worker_startup_std_logging")

    slogger = structlog.get_logger()
    slogger.warning("worker_startup_structlog")


def handle_logging_queue_messages(queue):
    while True:
        try:
            record = queue.get()
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except KeyboardInterrupt:
            break
        except Exception:
            print('LOGGING FAILED!!! ', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)


def main():
    logging_queue = multiprocessing.Queue()

    t = multiprocessing.Process(
        target=worker_entrypoint, args=[logging_queue], daemon=True,
        name='worker-9999')
    t.start()

    configure_root_logging()
    configure_structlog()

    handle_logging_queue_messages(logging_queue)

    slogger = structlog.get_logger()
    slogger.warning("Startup")


main()

Note: while debugging I use a StreamHandler for both output formats, but the JSON output will eventually go to a FileHandler, which is why the QueueHandler sits in between.

The output is:

{"event": "worker_startup_std_logging", "level": "warning", "worker-process": "MainProcess"}
event='worker_startup_std_logging' level='warning' worker-process='MainProcess'
{"event": "{'event': 'worker_startup_structlog', 'level': 'warning', 'worker-process': 'worker-9999'}", "level": "warning", "worker-process": "MainProcess"}
event="{'event': 'worker_startup_structlog', 'level': 'warning', 'worker-process': 'worker-9999'}" level='warning' worker-process='MainProcess'

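There are multiple problems here:

  1. The worker-process field has the value of the main process instead of the worker process.
  2. For the structlog logger, the event field contains the serialized event_dict, which in turn holds the correct worker-process value.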
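Both symptoms are consistent with how the stdlib `QueueHandler` prepares a record: its `prepare()` method formats the record and replaces `record.msg` with the resulting string before enqueuing it. The stdlib-only sketch below reproduces that step, with a plain dict standing in for structlog's event_dict. The logger name `demo`, the path `demo.py`, and the in-process `queue.Queue` are assumptions for illustration, and whether this fully accounts for the structlog behavior above has not been verified here.

```python
import logging
import logging.handlers
import queue

log_queue = queue.Queue()
handler = logging.handlers.QueueHandler(log_queue)

# A dict message, similar in shape to what a structlog event_dict looks like.
event_dict = {"event": "worker_startup_structlog", "worker-process": "worker-9999"}
record = logging.LogRecord(
    name="demo",
    level=logging.WARNING,
    pathname="demo.py",
    lineno=0,
    msg=event_dict,
    args=(),
    exc_info=None,
)

# handle() calls emit(), which enqueues the result of prepare(record).
handler.handle(record)
queued = log_queue.get_nowait()

# The queued record no longer carries the dict, only its string form.
print(type(queued.msg).__name__)
print(queued.msg == str(event_dict))
```

If the receiving side sees only a string, a formatter there has no way to tell that the record originally came from structlog, so it would plausibly treat it as a foreign record and run `foreign_pre_chain` in the main process.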

I expected structlog to evaluate the processor chain before handing the record over to the logging QueueHandler.
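One direction that might preserve the already-processed event_dict across the queue is a `QueueHandler` subclass whose `prepare()` does not stringify the message. The sketch below is an untested idea rather than a confirmed fix: `DictPreservingQueueHandler` is a hypothetical name, the pickle round trip only imitates what a `multiprocessing.Queue` would do, and whether `ProcessorFormatter` then recognizes the record as a structlog record depends on the structlog version and on any extra attributes structlog attaches being picklable.

```python
import copy
import logging
import logging.handlers
import pickle
import queue


class DictPreservingQueueHandler(logging.handlers.QueueHandler):
    """Hypothetical helper: enqueue records without stringifying record.msg."""

    def prepare(self, record):
        # Copy so other handlers on the same logger still see the original.
        record = copy.copy(record)
        # Traceback objects cannot be pickled, so keep only their text form.
        if record.exc_info:
            record.exc_text = logging.Formatter().formatException(record.exc_info)
            record.exc_info = None
        return record


log_queue = queue.Queue()
handler = DictPreservingQueueHandler(log_queue)

event_dict = {"event": "worker_startup_structlog", "worker-process": "worker-9999"}
record = logging.LogRecord("demo", logging.WARNING, "demo.py", 0, event_dict, (), None)
handler.handle(record)

# Imitate the pickling a multiprocessing.Queue performs between processes.
received = pickle.loads(pickle.dumps(log_queue.get_nowait()))
print(type(received.msg).__name__, received.msg["worker-process"])
```

The trade-off is that every value in the event_dict must be picklable, which the default `prepare()` sidesteps by sending only a string.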

Can someone explain how structlog is supposed to work in this scenario?
