在 Django 中使用 Kombu 的正确方法是什么?这是我的工作代码,位于app_name/tasks.py
:
import json
from kombu import Connection, Producer, Exchange
from django.conf import settings
from celery import shared_task
@shared_task
def update_metadata(message, *args, **kwargs):
print(message)
message_dict = json.loads(message)
# process the message
# convert it into consumable format?
# update the local file
publish_metadata(message_dict)
def publish_metadata(message):
"""Puts the metadata in the outbound queues
"""
connection = Connection(settings.BROKER_URL)
connection.connect()
exchange = Exchange(name=settings.OUTBOUND_EXCHANGE, type='fanout')
producer = Producer(channel=connection, serializer='json',
exchange=exchange)
producer.publish(body=message, serializer='json')
但是每次它想要发布消息时,它都会与交换建立新的连接。另外,我想在publish_metadata
. 我想将消息推送到队列,而消费者不是 Celery。所以我不能使用 Celery,因为 Celery 给 body 添加了很多参数和东西。