8

实际设计:

对于那些回到这个问题的人,下面的有用答案将我推向了一个运行良好的可行设计。三个见解是关键:

  1. Eventlet 是一个非常安全的环境——如果两个 greenlet 都尝试recv()或同时尝试send()从同一个套接字,那么 Eventlet 优雅地杀死第二个 greenlet 并出现异常。这很棒,意味着如果amqplib“果岭”不佳,将导致简单的异常,而不是不可能重现的数据交错错误。
  2. 这些amqplib方法大致分为两组:在AMQP 消息组装之前wait()在内部循环,而其他方法返回消息并且不会尝试自己的方法。考虑到作者不知道有人会尝试“绿化”他们的库,这真是太幸运了!这意味着消息发送不仅对 调用的回调是安全的,而且还可以从完全不受循环控制的其他 greenlet 安全地发送消息。这些安全方法——可以从任何greenlet调用,而不仅仅是从回调中调用——是: recv()send()recv()amqplibwait()wait()wait()
    1. basic_ack
    2. basic_consumenowait=True
    3. basic_publish
    4. basic_recover
    5. basic_reject
    6. exchange_declarenowait=True
    7. exchange_deletenowait=True
    8. queue_bindnowait=True
    9. queue_unbindnowait=True
    10. queue_declarenowait=True
    11. queue_deletenowait=True
    12. queue_purgenowait=True
  3. 信号量可以用作锁:用计数初始化信号量,1然后用acquire()release()来锁定和解锁。我所有想要编写消息的异步 greenlet 都可以使用这样的锁来避免它们的单独send()调用交错并破坏 AMQP 协议。

所以我的代码大致是这样的:

amqp = eventlet.patcher.import_patched('amqplib.client_0_8')

class Processor(object):
    def __init__(self):
        write_lock = eventlet.semaphore.Semaphore(1)

    def listening_greenlet(channel):
        # start this using eventlet.spawn_n()
        # create Connection and self.channel
        self.channel.basic_consume(queue, callback=self.consume)
        while True:
            self.channel.wait()

    def safe_publish(channel, *args, **kw):
        with write_lock:  # yes, Eventlet supports this!
            channel.basic_publish(*args, **kw)     

    def consume(message):
        # Returning immediately frees the wait() loop
        eventlet.spawn_n(self.process, message)

    def process(message):
        # do whatever I want
        # whenever I am done, I can async reply:
        self.safe_publish(...)

享受!

原始问题:

想象一下,每分钟有数百条 AMQP 消息到达一个小型 Python Eventlet应用程序,每条消息都需要处理和回答——其中处理的 CPU 开销最小,但可能需要等待来自其他服务和套接字的回答。

例如,为了允许一次处理 100 条消息,我当然可以启动 100 个到 RabbitMQ 的单独 TCP 连接,并为每个连接设置一个工作程序,以同步接收、处理和回答单个消息。但是为了节省 TCP 连接,我宁愿只创建一个 AMQP 连接,允许 RabbitMQ 将消息全速流向我的管道,将这些任务交给工作人员,并在每个工作人员完成时发回答案:

                                       +--------+
                                +------| worker | <-+
                                |      +--------+   |
                                |      +--------+   |
                                | +----| worker | <-+
                                | |    +--------+   |
                                | |    +--------+   |
                                | | +--| worker | <-+
                                | | |  +--------+   |
                                v v v               |
                           +------------+           |
 RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
                           +------------+

请注意:

  • Eventlet队列可以优雅地在工作人员之间分配传入的工作,因为他们可以进行更多工作。
  • 来自 RabbitMQ 的流控制甚至可能是可能的:我只能在我的工作人员都忙之前 ACK 消息,然后在发送进一步的 ACK 之前等待,直到队列开始为空。
  • 工作几乎肯定会乱序完成:一个请求可能很快完成,而另一个较早到达的事件需要更长的时间;有些请求可能永远不会完成;因此工作人员将以不可预测的异步顺序返回响应。

在看到这篇关于 AMQP 库可以多么容易地被拉入 Eventlet 处理模型的有吸引力的博客文章后,我一直计划使用 Eventlet 和py-amqplib编写这个:

http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/

我的问题是,在阅读了这两个库的文档、amqplib 源代码和大部分 Eventlet 源代码后,我无法弄清楚如何教授拥有 AMQP 连接的 eventlet——connect_to_host()博客文章中命名的 eventlet——当工作人员完成工作并生成答案时也会醒来。amqplib 中的wait()方法只能通过 AMQP 套接字上的活动来唤醒。虽然感觉我应该能够让工作人员将他们的答案写入队列,并在新的传入消息到达connect_to_host()工作人员准备好发送答案时唤醒 eventlet 但我找不到任何方法让一个小事件说“当任何一个时叫醒我这些事情都会发生。”</p>

我确实想到,工作人员可以尝试控制 AMQP 连接对象——甚至是原始套接字——并通过 TCP 写回他们自己的消息;但似乎有必要使用锁来防止传出的工作消息相互交错或与主侦听器 eventlet 编写的 ACK 消息交错,而且我也找不到在 Eventlet 中可用的锁。

所有这一切让我几乎可以肯定,我正试图以某种方式完全倒退来解决这个问题。像这样的问题——让一个单一的连接在侦听器-调度程序和许多工作人员之间安全地共享——根本不映射到协程模型,并且需要一个成熟的异步库吗?(在这种情况下:你会为这个问题推荐一个,以及如何在传入消息和传出工作人员响应之间进行多路复用?我今天早些时候没有找到干净的解决方案,尝试像 Pika + ioloop 这样的组合——尽管我刚刚看到了另一个库,stormed_amqp,它可能比 Pika 做得更好。)或者如果我想要可以制定这个模型的干净和可维护的代码,我是否真的需要依赖真正的实时 Python 线程?我对所有选择持开放态度。

感谢您的任何帮助或想法!我一直在想,我已经把 Python 中的整个并发问题搞得差不多了,然后我又一次知道我没有。:) 无论如何,我希望你喜欢上面的 ASCII 艺术。

4

1 回答 1

5

在阅读了您的帖子并使用 gevent 与 eventlet 类似的库之后,我明白了一些事情,因为我刚刚解决了一个类似的问题

一般来说,不需要锁定,因为只有一个 eventlet 或 greenlet 同时运行,只要它们都没有阻塞,一切似乎都在同时运行.. 但是你不想在套接字发送数据时另一个greenlet正在发送到。你是对的,确实需要一个锁定。

如果我有这样的问题,看文档是不够的..去看看源代码!无论如何,它是开源的,你从其他人的代码中学到了更多。

这是一些简化的示例代码,可能会为您解决问题。

在您的调度程序中有 2 个队列

self.worker_queue = Queue() # queue for messages to workers
self.server_queue = Queue() # queue for messages to ampq server

让工作人员将他们的结果放在服务器队列中。

发送和接收代码

def send_into_ampq():
    while True:
       message = dispatcher.get_workger_msg()

       try:
          connection.send(self.encode(message))
       except:
           connection.kill()

def read_from_ampq():
    while True:
        message = connection.wait()

        dispatcher.put_ampq_msg(self.decode(message))

在您的连接代码的发送功能中

self._writelock = Semaphore(1) 
# this is a gevent locking thing. eventlet must have something like this too..
# just counts - 1 for locks and +1 for releases blocks otherwise blocks until 
# 0 agian.... why not google it i though.. and yes im right:
# eventlet.semaphore.Semaphore(value=1)

def send(self, message):
    """
    you need a write lock to prevent more greenlets
    sending more messages when previous sent is not done yet.
    """

    with self._writelock:
        self.socket.sendall(message)
于 2011-11-02T10:02:50.680 回答