关于 zeromq 的一些知识:
- 绑定/连接顺序通常不重要
- 当一个对等方应接收每条消息和/或不应丢弃消息时,使用 PUSH/PULL
- PUB/SUB 用于所有对等方都应接收消息和/或应丢弃在无人收听时发送的消息。
- ZeroMQ 故意从应用程序代码中隐藏连接/断开打开/关闭事件,因此您无法检测到实际的关闭事件。
你需要知道的一件事,你不应该知道:当一个套接字连接时,它会创建一个管道(对等点还不需要存在)。当一个套接字绑定时,它只在对等点连接时创建管道。这些管道控制套接字的 HWM 行为。这意味着没有对等点的连接套接字和没有对等点的绑定套接字的行为是不同的。如果您尝试使用它发送消息,则没有对等点的绑定套接字将阻塞,而连接套接字将愉快地在内存中排队消息,直到对等点到达并开始使用消息。
基于这些点,你想要做的是:
- 使用推/拉
- 接收者应该绑定
- 发送一条特殊的“关闭”消息,指示队列已完成,而不是检测 tcp/ipc 级别的关闭事件。
这是 Python 中的一个工作示例,它使用 IPC 套接字(文件)进行通信,其中接收者在发送者之后的某个时间开始。
双方需要知道的共同信息:
import time
import zmq
# the file used for IPC communication
PIPE = '/tmp/fifo-pipe'
# command flags for our tiny message protocol
DONE = b'\x00'
MSG = b'\x01'
接收者 (PULL) 绑定,并消耗直到 DONE
def receiver():
ctx = zmq.Context()
s = ctx.socket(zmq.PULL)
s.bind("ipc://%s" % PIPE)
while True:
parts = s.recv_multipart()
cmd = parts[0]
if cmd == DONE:
print "[R] received DONE"
break
msg = parts[1]
# handle the message
print "[R] %.1f consuming %s" % (time.time() - t0, msg)
s.close()
ctx.term()
print "[R] done"
发送方(PUSH)连接并发送,发送 DONE 表示完成
def sender():
ctx = zmq.Context()
s = ctx.socket(zmq.PUSH)
s.connect("ipc://%s" % PIPE)
for i in range(10):
msg = b'msg %i' % i
print "[S] %.1f sending %s" % (time.time() - t0, msg)
s.send_multipart([MSG, msg])
time.sleep(1)
print "[S] sending DONE"
s.send(DONE)
s.close()
ctx.term()
print "[S] done"
和一个演示脚本一起运行它们,发送者首先启动,接收者在发送者已经发送了几条消息之后启动:
from threading import Thread
# global t0, just for keeping times relative to start, rather than 1970
t0 = time.time()
# start the sender
s = Thread(target=sender)
s.start()
# start the receiver after a delay
time.sleep(5)
r = Thread(target=receiver)
r.start()
# wait for them both to finish
s.join()
r.join()
可以看到这里一起跑。