实际设计:
对于那些回到这个问题的人,下面的有用答案将我推向了一个运行良好的可行设计。三个见解是关键:
- Eventlet 是一个非常安全的环境——如果两个 greenlet 都尝试
recv()
或同时尝试send()
从同一个套接字,那么 Eventlet 优雅地杀死第二个 greenlet 并出现异常。这很棒,意味着如果amqplib
“果岭”不佳,将导致简单的异常,而不是不可能重现的数据交错错误。 - 这些
amqplib
方法大致分为两组:在AMQP 消息组装之前wait()
在内部循环,而其他方法返回消息并且不会尝试自己的方法。考虑到作者不知道有人会尝试“绿化”他们的库,这真是太幸运了!这意味着消息发送不仅对 调用的回调是安全的,而且还可以从完全不受循环控制的其他 greenlet 安全地发送消息。这些安全方法——可以从任何greenlet调用,而不仅仅是从回调中调用——是:recv()
send()
recv()
amqplib
wait()
wait()
wait()
basic_ack
basic_consume
和nowait=True
basic_publish
basic_recover
basic_reject
exchange_declare
和nowait=True
exchange_delete
和nowait=True
queue_bind
和nowait=True
queue_unbind
和nowait=True
queue_declare
和nowait=True
queue_delete
和nowait=True
queue_purge
和nowait=True
- 信号量可以用作锁:用计数初始化信号量,
1
然后用acquire()
和release()
来锁定和解锁。我所有想要编写消息的异步 greenlet 都可以使用这样的锁来避免它们的单独send()
调用交错并破坏 AMQP 协议。
所以我的代码大致是这样的:
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
class Processor(object):
def __init__(self):
write_lock = eventlet.semaphore.Semaphore(1)
def listening_greenlet(channel):
# start this using eventlet.spawn_n()
# create Connection and self.channel
self.channel.basic_consume(queue, callback=self.consume)
while True:
self.channel.wait()
def safe_publish(channel, *args, **kw):
with write_lock: # yes, Eventlet supports this!
channel.basic_publish(*args, **kw)
def consume(message):
# Returning immediately frees the wait() loop
eventlet.spawn_n(self.process, message)
def process(message):
# do whatever I want
# whenever I am done, I can async reply:
self.safe_publish(...)
享受!
原始问题:
想象一下,每分钟有数百条 AMQP 消息到达一个小型 Python Eventlet应用程序,每条消息都需要处理和回答——其中处理的 CPU 开销最小,但可能需要等待来自其他服务和套接字的回答。
例如,为了允许一次处理 100 条消息,我当然可以启动 100 个到 RabbitMQ 的单独 TCP 连接,并为每个连接设置一个工作程序,以同步接收、处理和回答单个消息。但是为了节省 TCP 连接,我宁愿只创建一个 AMQP 连接,允许 RabbitMQ 将消息全速流向我的管道,将这些任务交给工作人员,并在每个工作人员完成时发回答案:
+--------+
+------| worker | <-+
| +--------+ |
| +--------+ |
| +----| worker | <-+
| | +--------+ |
| | +--------+ |
| | +--| worker | <-+
| | | +--------+ |
v v v |
+------------+ |
RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
+------------+
请注意:
- Eventlet队列可以优雅地在工作人员之间分配传入的工作,因为他们可以进行更多工作。
- 来自 RabbitMQ 的流控制甚至可能是可能的:我只能在我的工作人员都忙之前 ACK 消息,然后在发送进一步的 ACK 之前等待,直到队列开始为空。
- 工作几乎肯定会乱序完成:一个请求可能很快完成,而另一个较早到达的事件需要更长的时间;有些请求可能永远不会完成;因此工作人员将以不可预测的异步顺序返回响应。
在看到这篇关于 AMQP 库可以多么容易地被拉入 Eventlet 处理模型的有吸引力的博客文章后,我一直计划使用 Eventlet 和py-amqplib编写这个:
http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/
我的问题是,在阅读了这两个库的文档、amqplib 源代码和大部分 Eventlet 源代码后,我无法弄清楚如何教授拥有 AMQP 连接的 eventlet——connect_to_host()
博客文章中命名的 eventlet——当工作人员完成工作并生成答案时也会醒来。amqplib 中的wait()
方法只能通过 AMQP 套接字上的活动来唤醒。虽然感觉我应该能够让工作人员将他们的答案写入队列,并在新的传入消息到达或connect_to_host()
工作人员准备好发送答案时唤醒 eventlet ,但我找不到任何方法让一个小事件说“当任何一个时叫醒我这些事情都会发生。”</p>
我确实想到,工作人员可以尝试控制 AMQP 连接对象——甚至是原始套接字——并通过 TCP 写回他们自己的消息;但似乎有必要使用锁来防止传出的工作消息相互交错或与主侦听器 eventlet 编写的 ACK 消息交错,而且我也找不到在 Eventlet 中可用的锁。
所有这一切让我几乎可以肯定,我正试图以某种方式完全倒退来解决这个问题。像这样的问题——让一个单一的连接在侦听器-调度程序和许多工作人员之间安全地共享——根本不映射到协程模型,并且需要一个成熟的异步库吗?(在这种情况下:你会为这个问题推荐一个,以及如何在传入消息和传出工作人员响应之间进行多路复用?我今天早些时候没有找到干净的解决方案,尝试像 Pika + ioloop 这样的组合——尽管我刚刚看到了另一个库,stormed_amqp,它可能比 Pika 做得更好。)或者如果我想要可以制定这个模型的干净和可维护的代码,我是否真的需要依赖真正的实时 Python 线程?我对所有选择持开放态度。
感谢您的任何帮助或想法!我一直在想,我已经把 Python 中的整个并发问题搞得差不多了,然后我又一次知道我没有。:) 无论如何,我希望你喜欢上面的 ASCII 艺术。