4

所以我有一个预定的芹菜节拍任务(celery.py):

@app.on_after_configure.connect 
def setup_periodic_tasks(sender,
**kwargs):
    sender.add_periodic_task(10.0, test_event, name='test')

以及任务(events/tasks.py):

@shared_task
def test_event():
    from .models import Event
    Event.objects.create()

创建事件时,会触发接收器,该接收器应向通道组(events/receivers.py)发送消息:

@receiver(post_save, sender=Event)
def event_post_add(sender, instance, created, *args, **kwargs):
    if created:
        print("receiver fired")
        Group("test").send({
            "text": json.dumps({
                'type': 'test',
            })
        })

主要问题是接收器在 celery beat 过程中被触发,并且没有通过 django 通道发送任何内容。没有错误消息,什么都没有,它根本没有被发送。

如何整合这两者,以便能够从 celery 后台进程向通道发送消息?

4

2 回答 2

1

嗨,我不知道您是否找到了解决方案。但是由于我自己被困在这个问题上,所以我尝试了解决方法。我为需要通过 websocket 发送的消息创建了一个视图,并从 celery 向它发出请求击败视图:

def send_message(request,uuid,name):
print('lamo')
ty = f"uuid_{uuid}"
data={
    'message':f'{name} Driver is now Avaliable',
    'sender':'HQ',
    'id':str(uuid),
    'to':str(uuid),
    'type':'DriverAvailable',
}
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
    ty,
    {'type':'chat_message',
    'message':data,
    }
)

和任务:

def my_task():
list=[]
for d in Driver_api.objects.all():
    if d.available_on !=None:
        if d.available_on <= timezone.now():
            d.available_on = None
            d.save()
        uuid = str(d.user.uuid)
        requests.get(f'{DOMAIN}message/sendMessage/{uuid}/{d.name}')
    logger.info('Adding {0}'.format(d.user.uuid))

return list

对于我解决问题的方法中的任何疏忽或忽视,我深表歉意。

于 2021-01-22T03:52:03.303 回答
0

信号在 Django 中实际上并不是异步的。所以在:

@shared_task
def test_event():
    from .models import Event
    Event.objects.create() # This will fire a signal and the signal will 
                           # still be interpreted by celery

此问题在以下链接中详细描述: https ://githubmemory.com/repo/CJWorkbench/channels_rabbitmq/issues/37

我已经检查了 redis_channels 的重新连接和次优性能的声明(如链接中所述),但我找不到它正在发生。

这是我正在工作的代码,Celery 正在向 django 频道发送消息。

class DjangoView(APIView):
    def get(request):
        send_message_to_channels.delay()

@shared_task
def send_message_to_channels():
    send_test_message("Hello from celery", 200)
def send_test_message(message: str, code: int):
    channel_layer = get_channel_layer()
    channel_name = "celery-channels-test"
    async_to_sync(channel_layer.group_send)(channel_name, {
        'type': 'consumer.test.message',
        'message': message,
        'code': code
    })

使我的代码工作的先决条件

  • 安装 channels_redis (其他包只是失败)
  • 在 django settings.py 中定义 channel_layer 指向 redis 数据库

如果这不起作用,我想简单但非常讨厌的方法是向 django 中的视图发出请求,如@cosmo_boi 答案

于 2021-12-02T10:50:59.470 回答