36

我一直试图弄清楚在使用 pika 时应该使用哪种连接形式,据我所知,我有两种选择。

无论是BlockingConnection还是SelectConnection,但我不太确定这两者之间的区别(即 BlockingConnection 阻塞是什么?等等)

的文档pika说这SelectConnection是连接到 rabbit 的首选方式,因为它提供了“多种事件通知方法,包括 select、epoll、kqueue 和 poll。”

所以我想知道这两种不同类型的连接有什么含义?

PS:我知道我不应该在标题中添加标签,但在这种情况下,我认为它确实有助于澄清问题。

4

2 回答 2

28

如果您的应用程序架构可以从异步设计中受益,则 SelectConnection 很有用,例如在 RabbitMQ IO 完成时执行其他操作(例如切换到其他一些 IO 等)。这种类型的连接使用回调来指示函数何时返回。例如,您可以声明回调

on_connected、on_channel_open、on_exchange_declared、on_queue_declared 等。

...在触发这些事件时执行操作。

如果您的 RabbitMQ 服务器(或与该服务器的连接)速度较慢或过载,则该好处特别好。

手头上的 BlockingConnection 就是这样 - 它阻塞直到被调用的函数返回。因此它将阻塞执行线程,直到连接或 channel_open 或 exchange_declared 或 queue_declared 返回例如。也就是说,编写这种序列化逻辑通常比异步 SelectConnection 逻辑更简单。对于具有响应式 RabbitMQ 服务器的简单应用程序,这些在 IMO 上也可以正常工作。

我想您已经阅读了 Pika 文档http://pika.readthedocs.io/en/stable/intro.html,如果没有,那么在您使用 Pika 之前,这是绝对重要的信息!

干杯!

于 2012-08-16T14:19:11.013 回答
4

Pika 文档非常清楚连接类型之间的差异。主要区别在于pika.adapters.blocking_connection.BlockingConnection()适配器用于非异步编程,而pika.adapters.select_connection.SelectConnection()适配器用于异步编程。

如果您不知道非异步/同步和异步编程之间的区别是什么,我建议您阅读这个问题或这篇文章以获得更深入的技术解释。

现在让我们深入了解不同的 Pika 适配器,看看它们做了什么,出于示例的目的,我想我们使用 Pika 来建立与RabbitMQ作为 AMQP 消息代理的客户端连接。

阻塞连接()

在以下示例中,使用用户名guest和密码guest以及虚拟主机“/”连接到 RabbitMQ,侦听localhost上的端口5672 。连接后,将打开一个通道,并使用test_routing_key路由键将消息发布到test_exchange交换器。传入的 BasicProperties 值将消息设置为传递模式 1(非持久),内容类型为text/plain。消息发布后,连接将关闭:

import pika

parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')

connection = pika.BlockingConnection(parameters)

channel = connection.channel()

channel.basic_publish('test_exchange',
                      'test_routing_key',
                      'message body value',
                      pika.BasicProperties(content_type='text/plain',
                                           delivery_mode=1))

connection.close()

选择连接()

相比之下,使用这个连接适配器更复杂,更不Python,但是当与其他异步服务一起使用时,它可以有巨大的性能提升。在以下代码示例中,使用的所有参数和值与上一个示例中使用的相同:

import pika

# Step #3
def on_open(connection):

    connection.channel(on_open_callback=on_channel_open)

# Step #4
def on_channel_open(channel):

    channel.basic_publish('test_exchange',
                            'test_routing_key',
                            'message body value',
                            pika.BasicProperties(content_type='text/plain',
                                                 delivery_mode=1))

    connection.close()

# Step #1: Connect to RabbitMQ
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')

connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:

    # Step #2 - Block on the IOLoop
    connection.ioloop.start()

# Catch a Keyboard Interrupt to make sure that the connection is closed cleanly
except KeyboardInterrupt:

    # Gracefully close the connection
    connection.close()

    # Start the IOLoop again so Pika can communicate, it will stop on its own when the connection is closed
    connection.ioloop.start()

结论

对于那些进行简单、非异步/同步编程的人来说,BlockingConnection()适配器被证明是启动和运行 Pika 以发布消息的最简单方法。但是,如果您正在寻找一种实现异步消息处理的方法,那么SelectConnection()处理程序是您更好的选择。

快乐编码!

于 2020-05-03T10:47:10.130 回答