1

如何从自定义 manage.py 命令向 Django 消费者发送消息

from django.core.management.base import BaseCommand, CommandError
from channels import Channel

class Command(BaseCommand):
   help = 'Sends a message to a Django channel from the thing'

   def add_arguments(self, parser):
       parser.add_argument('json_object', nargs='+', type=str)

   def handle(self, *args, **options):
       self.stdout.write("TEST !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")


       print Channel("test").channel_layer
       Channel("test").send({'op':options['json_object'][0]})

这是我的消费者

class MyConsumer(WebsocketConsumer):

   @classmethod
   def channel_names(self):
      return {"test"}

   def connection_groups(self):
      return ["test"]

   def dispatch(self, message, **kwargs):
      return self.get_handler(message, **kwargs)(message, **kwargs)

  def get_handler(self, message, **kwargs):
          channel_routing = [
    consumers.MyConsumer.as_route(path=r"^/test/"),
    route("test.receive", consumers.chat_join),
]         for _filter, value in kwargs.items():
        filter_mapping = getattr(self, _filter + '_mapping', None)
        if not filter_mapping:
            continue

        consumer = getattr(self, filter_mapping.get(value), None)
        if consumer:
            return consumer
     raise ValueError('Message')


  def connect(self,message):
    self.message.reply_channel.send({"accept": True})

  def receive(self,text=None, bytes= None):
    print text

  def disconnect(self,message):
    pass

但是,当我尝试运行该命令时,我收到此消息

2017-03-08 03:45:33,839 - 错误 - 工人 - 在测试中找不到匹配的消息!检查你的路由。

如果它是相关的,这是我的路由

channel_routing = [
    consumers.MyConsumer.as_route(path=r"^/test/"),
]         
4

1 回答 1

1

简而言之,添加pathcontent您要发送的:

Channel("test").send({
    'op':options['json_object'][0],
    'path': '/test/',
})

就是这样!

我遇到了同样的问题,我发现这是因为我使用as_route通用消费者的方法来生成route_class,它总是path作为它的过滤器。

如果我们route改用,我们不一定提供path参数,这就是文档中的代码(https://channels.readthedocs.io/en/stable/getting-started.html#models)有效的原因

于 2017-07-23T21:14:55.197 回答