3

在 Django 中使用 Kombu 的正确方法是什么?这是我的工作代码,位于app_name/tasks.py

import json

from kombu import Connection, Producer, Exchange
from django.conf import settings
from celery import shared_task


@shared_task
def update_metadata(message, *args, **kwargs):
    print(message)
    message_dict = json.loads(message)
    # process the message
    # convert it into consumable format?
    # update the local file
    publish_metadata(message_dict)


def publish_metadata(message):
    """Puts the metadata in the outbound queues
    """
    connection = Connection(settings.BROKER_URL)
    connection.connect()
    exchange = Exchange(name=settings.OUTBOUND_EXCHANGE, type='fanout')
    producer = Producer(channel=connection, serializer='json',
                        exchange=exchange)
    producer.publish(body=message, serializer='json')

但是每次它想要发布消息时,它都会与交换建立新的连接。另外,我想在publish_metadata. 我想将消息推送到队列,而消费者不是 Celery。所以我不能使用 Celery,因为 Celery 给 body 添加了很多参数和东西。

4

0 回答 0