4

我有两个进程(“发送者”和“接收者”)需要通过瞬态单向 FIFO 通信管道在单台机器上本地进行通信。这是我想要发生的事情(使用更接近 Unix 域套接字的语言):

  • 发送者在已知地址“创建”管道,并立即向下发送消息
  • 在某个时刻(在发送方“创建”管道之前或之后),接收方连接到管道
  • 阅读器从管道中读取消息
  • 发件人“关闭”管道
  • 读者注意到所有消息都已被阅读(可能管道已关闭)

我的问题是:我如何用 ZeroMQ 实现这个?“PUB/SUB”,“推/拉”?在 ZMQ 套接字中检测“数据结束”的机制是什么?是否可以同时允许上述前两项的排序:即发送方或接收方是否首先尝试连接?如果是这样,怎么做?

谢谢。

4

1 回答 1

6

关于 zeromq 的一些知识:

  1. 绑定/连接顺序通常不重要
  2. 当一个对等方应接收每条消息和/或不应丢弃消息时,使用 PUSH/PULL
  3. PUB/SUB 用于所有对等方都应接收消息和/或应丢弃在无人收听时发送的消息。
  4. ZeroMQ 故意从应用程序代码中隐藏连接/断开打开/关闭事件,因此您无法检测到实际的关闭事件。

你需要知道的一件事,你不应该知道:当一个套接字连接时,它会创建一个管道(对等点还不需要存在)。当一个套接字绑定时,它只在对等点连接时创建管道。这些管道控制套接字的 HWM 行为。这意味着没有对等点的连接套接字和没有对等点的绑定套接字的行为是不同的。如果您尝试使用它发送消息,则没有对等点的绑定套接字将阻塞,而连接套接字将愉快地在内存中排队消息,直到对等点到达并开始使用消息。

基于这些点,你想要做的是:

  1. 使用推/拉
  2. 接收者应该绑定
  3. 发送一条特殊的“关闭”消息,指示队列已完成,而不是检测 tcp/ipc 级别的关闭事件。

这是 Python 中的一个工作示例,它使用 IPC 套接字(文件)进行通信,其中接收者在发送者之后的某个时间开始。

双方需要知道的共同信息:

import time

import zmq

# the file used for IPC communication
PIPE = '/tmp/fifo-pipe'

# command flags for our tiny message protocol
DONE = b'\x00'
MSG = b'\x01'

接收者 (PULL) 绑定,并消耗直到 DONE

def receiver():
    ctx = zmq.Context()
    s = ctx.socket(zmq.PULL)
    s.bind("ipc://%s" % PIPE)
    while True:
        parts = s.recv_multipart()
        cmd = parts[0]
        if cmd == DONE:
            print "[R] received DONE"
            break
        msg = parts[1]
        # handle the message
        print "[R] %.1f consuming %s" % (time.time() - t0, msg)
    s.close()
    ctx.term()
    print "[R] done"

发送方(PUSH)连接并发送,发送 DONE 表示完成

def sender():
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUSH)
    s.connect("ipc://%s" % PIPE)

    for i in range(10):
        msg = b'msg %i' % i
        print "[S] %.1f sending %s" % (time.time() - t0, msg)
        s.send_multipart([MSG, msg])
        time.sleep(1)
    print "[S] sending DONE"
    s.send(DONE)
    s.close()
    ctx.term()
    print "[S] done"

和一个演示脚本一起运行它们,发送者首先启动,接收者在发送者已经发送了几条消息之后启动:

from threading import Thread

# global t0, just for keeping times relative to start, rather than 1970
t0 = time.time()

# start the sender
s = Thread(target=sender)
s.start()

# start the receiver after a delay
time.sleep(5)
r = Thread(target=receiver)
r.start()

# wait for them both to finish
s.join()
r.join()

可以看到这里一起跑。

于 2013-03-02T02:10:12.827 回答