我正在尝试将 structlog 集成到一个使用 multiprocessing 的应用中。虽然我计划完全切换到 structlog,但我也想捕获第三方库对 stdlib logging 的调用。由于我们计划同时输出 JSON 日志文件和键值(key-value)格式的输出,我认为正确的做法是把实际的格式化工作委托给 logging:
logging \
structlog processors (worker) -> logging queuehandler -> logging handler (main process) -> stdlib logging output
structlog /
根据 https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes,我们使用一个 QueueHandler 把所有日志消息发送到主进程,由主进程负责写入。
这是我目前的配置:
import logging.config
import logging.handlers
import multiprocessing
import sys
import traceback

import structlog
from structlog.processors import JSONRenderer, KeyValueRenderer
def proc_info(logger, method, event_dict):
    """Structlog processor: tag each event with the name of the emitting process."""
    proc_name = multiprocessing.current_process().name
    event_dict['worker-process'] = proc_name
    return event_dict
# Processors applied to events from BOTH structlog loggers and foreign
# (stdlib ``logging``) callers before final rendering; reused below as
# ``foreign_pre_chain`` in the ProcessorFormatter config.
pre_chain = [
structlog.stdlib.add_log_level,
proc_info
]
def configure_root_logging():
    """Configure stdlib logging in the main process.

    Both handlers write to the console while debugging; the JSON output is
    ultimately meant for a FileHandler.  Final rendering is delegated to
    structlog's ProcessorFormatter, so foreign (stdlib) records are run
    through ``pre_chain`` before being rendered.
    """
    logging.config.dictConfig({
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            # Human-readable key=value rendering.
            "plain": {
                "()": structlog.stdlib.ProcessorFormatter,
                "processor": KeyValueRenderer(),
                "foreign_pre_chain": pre_chain,
            },
            # Machine-readable JSON rendering.
            "json": {
                "()": structlog.stdlib.ProcessorFormatter,
                "processor": JSONRenderer(),
                "foreign_pre_chain": pre_chain,
            },
        },
        "handlers": {
            # BUG FIX: the formatter names were swapped — "console_kvp"
            # referenced the "json" formatter and vice versa.
            "console_kvp": {
                "level": "DEBUG",
                "class": "logging.StreamHandler",
                "formatter": "plain",
            },
            "console_json": {
                "level": "DEBUG",
                "class": "logging.StreamHandler",
                "formatter": "json",
            },
        },
        "loggers": {
            # Root logger: emit everything through both handlers.
            "": {
                "handlers": ["console_kvp", "console_json"],
                "level": "DEBUG",
                "propagate": True,
            },
        }
    })
def configure_structlog():
    """Route structlog events through stdlib logging.

    ``wrap_for_formatter`` defers final rendering to the
    ProcessorFormatter instances installed on the stdlib handlers.
    """
    processor_chain = list(pre_chain)
    processor_chain.append(structlog.stdlib.ProcessorFormatter.wrap_for_formatter)
    structlog.configure(
        processors=processor_chain,
        wrapper_class=structlog.stdlib.BoundLogger,
        logger_factory=structlog.stdlib.LoggerFactory(),
        context_class=dict,
        cache_logger_on_first_use=True,
    )
def setup_worker_logging(logging_queue):
    """Attach a QueueHandler to the root logger in a worker process.

    All records produced in the worker are shipped over *logging_queue*
    to the main process, which does the actual formatting and output.
    """
    queue_handler = logging.handlers.QueueHandler(logging_queue)
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(queue_handler)
def worker_entrypoint(logging_queue):
    """Worker-process entry point: wire up logging, then emit test events.

    Emits one message via stdlib logging and one via structlog so both
    paths through the queue can be exercised.
    """
    setup_worker_logging(logging_queue)
    configure_structlog()
    logging.getLogger(__name__).warning("worker_startup_std_logging")
    structlog.get_logger().warning("worker_startup_structlog")
def handle_logging_queue_messages(queue):
    """Drain *queue* in the main process, dispatching each LogRecord locally.

    Follows the logging cookbook's single-file / multi-process recipe:
    records put on the queue by worker QueueHandlers are re-dispatched
    to the locally configured handlers of the logger they were made on.

    A ``None`` item is treated as a shutdown sentinel and ends the loop
    (the cookbook's stop protocol); KeyboardInterrupt also exits cleanly.
    Any other failure is reported to stderr but does not kill the loop,
    so one bad record cannot take down all logging.
    """
    while True:
        try:
            record = queue.get()
            if record is None:  # shutdown sentinel
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except KeyboardInterrupt:
            break
        except Exception:
            print('LOGGING FAILED!!! ', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)
def main():
    """Main-process entry point.

    Fixes versus the original ordering:
    - logging is configured *before* the worker starts, so records arriving
      on the queue can be handled immediately (no startup race);
    - the "Startup" message is emitted before entering the queue-draining
      loop — ``handle_logging_queue_messages`` blocks forever, so anything
      after it was unreachable.
    """
    configure_root_logging()
    configure_structlog()

    logging_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(
        target=worker_entrypoint, args=(logging_queue,), daemon=True,
        name='worker-9999')
    worker.start()

    structlog.get_logger().warning("Startup")
    # Blocks indefinitely, dispatching worker log records.
    handle_logging_queue_messages(logging_queue)


if __name__ == "__main__":
    # The guard is required for multiprocessing's "spawn" start method
    # (default on Windows/macOS), which re-imports this module in the
    # child process — an unguarded main() would recurse.
    main()
注意:为了便于调试,我对两种输出格式都使用了 StreamHandler;但 JSON 格式最终会输出到一个 FileHandler,因此 QueueHandler 位于两者之间。
输出是:
{"event": "worker_startup_std_logging", "level": "warning", "worker-process": "MainProcess"}
event='worker_startup_std_logging' level='warning' worker-process='MainProcess'
{"event": "{'event': 'worker_startup_structlog', 'level': 'warning', 'worker-process': 'worker-9999'}", "level": "warning", "worker-process": "MainProcess"}
event="{'event': 'worker_startup_structlog', 'level': 'warning', 'worker-process': 'worker-9999'}" level='warning' worker-process='MainProcess'
这里有多个问题:
- worker-process 字段是主进程的值,而不是工作进程的值。
- structlog 记录器的 event 字段包含了整个 event_dict 的序列化字符串,而这个 event_dict 里的 worker-process 值反而是正确的。

我希望 structlog 在把日志移交给 logging 的 QueueHandler 之前,就先执行完整个处理器链。
有人可以解释一下 structlog 在这种情况下应该如何工作吗?