你所有的代码对我来说都是正确的。如果您想检查每个参与者使用的队列,您可以actor_inbox
在从Actor#start
.
我在继承时遇到了类似的问题,EventletActor
以便测试我使用 anEventletActor
和使用 a尝试了相同的代码ThreadingActor
。据我从源代码中可以看出,他们都在使用它们eventlet
来工作。对ThreadingActor
我来说效果很好,但EventletActor
不适用于ActorRef#tell
,它确实适用ActorRef#ask
。
我从同一目录中的两个文件开始,如下所示。
my_actors.py
:初始化两个actor,它们将通过打印以类名开头的消息内容来响应消息。
from pykka.eventlet import EventletActor
import pykka
class MyThreadingActor(pykka.ThreadingActor):
def __init__(self):
super(MyThreadingActor, self).__init__()
def on_receive(self, message):
print(
"MyThreadingActor Received: {message}".format(
message=message)
)
class MyEventletActor(EventletActor):
def __init__(self):
super(MyEventletActor, self).__init__()
def on_receive(self, message):
print(
"MyEventletActor Received: {message}".format(
message=message)
)
my_threading_actor_ref = MyThreadingActor.start()
my_eventlet_actor_ref = MyEventletActor.start()
my_queue.py
:在 pika 中设置一个队列,向队列发送一条消息,该消息转发给之前设置的两个 Actor。在每个参与者都被告知消息后,他们当前的参与者收件箱会检查队列中的任何内容。
from my_actors import my_threading_actor_ref, my_eventlet_actor_ref
import pika
def on_message(channel, method_frame, header_frame, body):
print "Received Message", body
my_threading_actor_ref.tell({"msg": body})
my_eventlet_actor_ref.tell({"msg": body})
print "ThreadingActor Inbox", my_threading_actor_ref.actor_inbox
print "EventletActor Inbox", my_eventlet_actor_ref.actor_inbox
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
queue_name = 'test'
connection = pika.BlockingConnection()
channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_consume(on_message, queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body='A Message')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
# It is very important to stop these actors, otherwise you may lockup
my_threading_actor_ref.stop()
my_eventlet_actor_ref.stop()
connection.close()
当我运行my_queue.py
输出如下:
收到消息一条消息
ThreadingActor 收件箱<Queue.Queue instance at 0x10bf55878>
MyThreadingActor 收到:{'msg': 'A Message'}
EventletActor 收件箱<Queue maxsize=None queue=deque([{'msg': 'A Message'}]) tasks=1 _cond=<Event at 0x10bf53b50 result=NOT_USED _exc=None _waiters[0]>>
当我点击CTRL+C
停止队列时,我注意到EventletActor
finally 收到消息并打印它:
^C
MyEventletActor 收到:{'msg': 'A Message'}
所有这一切让我相信可能存在错误EventletActor
,我认为您的代码很好,并且存在一个错误,我在第一次检查时无法在代码中找到。
我希望这个信息帮助。