我想要一个可以同时消费任意数量消息的戏剧演员。这些方面的东西:
@dramatiq.actor
def some_actor(*messages):
print(f"GOT {len(messages)} messages")
print(messages)
# publisher A runs
some_actor.send("message1")
# publisher B runs
some_actor.send("message2")
# finally the worker would print this if the timing is correct
GOT 2 MESSAGES
["message1", "message2"]
我正在尝试重写Worker
类以使用具有自_ConsumerThread
定义run
和handle_messages
方法的自定义的想法来改变它们以处理同一参与者的多达 N 条预取消息。相关的变化将在这些线路和这些线路上。
虽然希望这种方法可能会奏效,但它非常hackish,并且还需要在cli上进行更改,可能还需要在actor装饰器上进行更改。
有没有更优雅的方式通过中间件或其他方法来做到这一点?