20

我这周开始使用 ZeroMQ,当使用请求响应模式时,我不确定如何让工作人员安全地“挂断”并关闭他的套接字,而不会丢失消息并导致发送该消息的客户永远不会收到回复。想象一个用 Python 编写的工人看起来像这样:

import zmq
c = zmq.Context()
s = c.socket(zmq.REP)
s.connect('tcp://127.0.0.1:9999')
while i in range(8):
    s.recv()
    s.send('reply')
s.close()

我一直在做实验,发现提出公平排队请求127.0.0.1:9999的套接字类型的客户可能会不幸让公平排队算法在工人完成最后一次但在它运行之前立即zmq.REQ选择上述工人send()下面的close()方法。在这种情况下,似乎请求被工作进程中的ØMQ堆栈接收和缓冲,然后当close()抛出与套接字相关的所有内容时,请求就会丢失。

工作人员如何“安全地”分离——有什么方法可以发出“我不再需要消息”的信号,然后(a)循环在信号传输期间到达的任何最终消息,(b)生成它们的回复,然后(c)close()在保证没有消息被丢弃的情况下执行?

编辑:我想我想进入的原始状态是“半关闭”状态,在这种状态下无法接收到进一步的请求——并且发件人会知道——但返回路径仍然打开,以便我可以检查我的最后一条消息的传入缓冲区,如果缓冲区中有一条消息,则对其进行响应。

编辑:为了回答一个好问题,更正了描述以使等待消息的数量成为复数,因为可能有许多连接在等待回复。

4

6 回答 6

11

您似乎认为您正试图避免“简单”的竞争条件,例如

... = zmq_recv(fd);
do_something();
zmq_send(fd, answer);
/* Let's hope a new request does not arrive just now, please close it quickly! */
zmq_close(fd);

但我认为问题在于公平排队(循环)使事情变得更加困难:您的工作人员甚至可能已经有几个排队的请求。如果轮到它接收一个新请求,发件人不会等待您的工作人员空闲后再发送一个新请求,因此在您调用zmq_send其他请求时可能已经在等待。

事实上,看起来您可能选择了错误的数据方向。与其让请求池向您的工作人员发送请求(即使您不想接收新请求),您可能希望让您的工作人员从请求队列中获取新请求,处理它,然后发送答案。

当然,这意味着使用XREP/ XREQ,但我认为这是值得的。

编辑:我写了一些代码来实现另一个方向来解释我的意思。

于 2010-12-07T20:36:35.190 回答
3

我认为问题在于您的消息传递架构是错误的。您的工作人员应该使用 REQ 套接字发送工作请求,这样只有一个工作在工作人员排队。然后为了确认工作的完成,您可以使用另一个 REQ 请求,该请求兼作前一个作业的 ack 并请求新的作业,或者您可以有第二个控制套接字。

有些人使用 PUB/SUB 作为控件来执行此操作,以便每个工作人员都发布确认信息,而主服务器订阅它们。

您必须记住,使用 ZeroMQ 时有 0 个消息队列。一个都没有!只是消息在发送方或接收方中缓冲,具体取决于高水位标记和套接字类型等设置。如果您确实需要消息队列,那么您需要编写一个代理应用程序来处理它,或者只是切换到所有通信都通过第 3 方代理进行的 AMQP。

于 2011-07-30T01:08:07.370 回答
2

我也一直在思考这个问题。您可能希望实现一条 CLOSE 消息,通知客户工人即将离开。然后,您可以让工作人员在关闭之前耗尽一段时间。当然,这并不理想,但可能是可行的。

于 2010-10-28T16:48:06.227 回答
0

在尽可能快地向工作人员发送请求和在工作崩溃或死亡的情况下获得可靠性之间存在利益冲突。ZeroMQ 指南有一整节解释了这个可靠性问题的不同答案。读一读,会有很大帮助。

tl; dr 工作人员可能/将会崩溃,并且客户端需要重新发送功能。该指南以多种语言为此提供了可重用的代码。

于 2012-01-23T03:47:36.157 回答
-1

最简单的解决方案不是让客户在等待回复时超时,然后在没有收到回复时重试吗?

于 2010-11-22T02:26:38.220 回答
-3

尝试在通话结束前睡觉。这在 2.1 中已修复,但在 2.0 中尚未修复。

于 2010-10-01T18:16:22.903 回答