是否可以从 zeromq 中的 PUB 套接字获取订阅者总数?
谢谢!
是的,但不幸的是不是通过任何简单的属性或方法。
您需要使用zmq_socket_monitor()函数将inproc
服务套接字连接到要观察的主套接字。从那里您可以收听有关连接/断开连接的事件并保留自己的订阅者数量。不过,这可能不是一项微不足道的任务,因为(至少对我而言)似乎有点难以知道何时将订阅者(或任何远程连接)视为启动/关闭(关闭/断开/重试等)。你将不得不玩一下。
该链接包括示例和事件描述。
这是代表 NodeJS 的实现,我认为对于 pub 是一样的。
就像 Jakob Möllås 所说,需要使用监视器。
const zmq = require('zmq')
, rep = zmq.socket('rep');
let counter = 0;
rep.bind('tcp://*:5560', function (err) {
if (err) {
console.log(err);
} else {
console.log("Listening on 5560…");
rep.monitor(500, 0);
}
});
// Register to monitoring events
rep.on('connect', function (fd, ep) {
console.log('connect, endpoint:', ep);
});
rep.on('connect_delay', function (fd, ep) {
console.log('connect_delay, endpoint:', ep);
});
rep.on('connect_retry', function (fd, ep) {
console.log('connect_retry, endpoint:', ep);
});
rep.on('listen', function (fd, ep) {
console.log('listen, endpoint:', ep);
});
rep.on('bind_error', function (fd, ep) {
console.log('bind_error, endpoint:', ep);
});
rep.on('accept', function (fd, ep) {
console.log('accept, endpoint:', ep);
counter++;
});
rep.on('accept_error', function (fd, ep) {
console.log('accept_error, endpoint:', ep);
});
rep.on('close', function (fd, ep) {
console.log('close, endpoint:', ep);
});
rep.on('close_error', function (fd, ep) {
console.log('close_error, endpoint:', ep);
});
rep.on('disconnect', function (fd, ep) {
console.log('disconnect, endpoint:', ep);
counter--;
});
// Handle monitor error
rep.on('monitor_error', function(err) {
console.log('Error in monitoring: %s, will restart monitoring in 5 seconds', err);
setTimeout(function() { rep.monitor(500, 0); }, 5000);
});
rep.on('message', function (msg) {
console.log(`recieve: `, JSON.parse(msg));
rep.send(JSON.stringify({ "status": FAIL, "code": 3666 }));
});
安慰
recieve: { method: 'login', login: 'a', password: 'b1' }
accept, endpoint: tcp://0.0.0.0:5560
accept, endpoint: tcp://0.0.0.0:5560
login: a, password: b1
recieve: { method: 'login', login: 'a', password: 'b1' }
disconnect, endpoint: tcp://0.0.0.0:5560
login: a, password: b1
disconnect, endpoint: tcp://0.0.0.0:5560
似乎没有任何直接的方法。下面是监控套接字事件的 Python 代码,可用于维护计数:
import zmq
from zmq.eventloop import ioloop, zmqstream
import zmq.utils.monitor
class Publication:
def start(self, port, host):
context = zmq.Context()
self._socket = context.socket(zmq.PUB)
self._socket.bind("tcp://%s:%d" % (host, port))
self._mon_socket = self._socket.get_monitor_socket(zmq.EVENT_CONNECTED | zmq.EVENT_DISCONNECTED)
self._mon_stream = zmqstream.ZMQStream(self._mon_socket)
self._mon_stream.on_recv(self._on_mon)
def _on_mon(self, msg):
ev = zmq.utils.monitor.parse_monitor_message(msg)
event = ev['event']
endpoint = ev['endpoint']
if event == zmq.EVENT_CONNECTED:
pass
# print(endpoint)
elif event == zmq.EVENT_DISCONNECTED:
pass
#print(endpoint)
一个问题是由于某种原因 CONNECTED 事件没有触发。另一个问题是,即使事件触发,您也只能获得类似于 tcp://ip:port 字符串的端点 ID。因此,对于同一节点上的多个客户端,您将获得相同的端点 ID。
我遇到了一个(测试)场景,我必须等待 n 个订阅者才能开始发布消息。这是对我有用的函数(在 Python 中):
def wait_for_n_subscribers(pub_socket: zmq.Socket, n_subscribers: int):
"""
blocks until pub_socket had n_subscribers connected to it
"""
connections = 0
events_socket = pub_socket.get_monitor_socket(events=zmq.EVENT_HANDSHAKE_SUCCEEDED) # only accept this event
while connections < n_subscribers:
recv_monitor_message(events_socket) # this will block until a handshake was successful
connections += 1
解释:
创建PUB
套接字后,我们将一个PAIR
套接字附加到它,它将监视PUB
套接字的事件。
当SUB
套接字连接到PUB
套接字时,它会在 PUB(绑定)端生成两个事件:
EVENT_ACCEPTED (32)
后跟 EVENT_HANDSHAKE_SUCCEEDED (4096)
.
因此,我们将监视EVENT_HANDSHAKE_SUCCEEDED
作为成功订阅连接的指标。一旦连接了指定的订阅者,函数就会返回。
这是一个完整的玩具示例:
import threading
import time
import zmq
from zmq.utils.monitor import recv_monitor_message # requires libzmq >= 4.0
ep = "ipc:///tmp/test-socket"
def print_events_map():
"auxilliary function to print all zmq socket events"
print("Event names:")
for name in dir(zmq):
if name.startswith('EVENT_'):
value = getattr(zmq, name)
print("%21s : %4i" % (name, value))
context = zmq.Context()
def wait_for_n_subscribers(pub_socket: zmq.Socket, n_subscribers: int):
"""
blocks until pub_socket had n_subscribers connected to it
"""
connections = 0
events_socket = pub_socket.get_monitor_socket(events=zmq.EVENT_HANDSHAKE_SUCCEEDED) # only accept this event
while connections < n_subscribers:
recv_monitor_message(events_socket) # this will block until a handshake was successful
connections += 1
def simulate_sender(wait, n):
s_pub = context.socket(zmq.PUB)
s_pub.bind(ep)
if wait:
wait_for_n_subscribers(s_pub, n)
for i in range(5):
s_pub.send_pyobj(i)
time.sleep(1)
subscribers = 2
s_sub_1 = context.socket(zmq.SUB)
s_sub_1.setsockopt(zmq.RCVTIMEO, 3000) # wait at most 3 seconds
s_sub_1.subscribe("")
s_sub_2 = context.socket(zmq.SUB)
s_sub_2.subscribe("")
wait = True # set to false if publisher should not wait
threading.Thread(target=simulate_sender, args=(wait, subscribers,)).start()
time.sleep(1)
print("connecting 1")
s_sub_1.connect(ep)
print("connecting 2")
s_sub_2.connect(ep)
while True:
try:
print("received %s" % s_sub_1.recv_pyobj())
except zmq.error.Again:
print("no incoming msgs for 3 seconds")
break
笔记:
wait
为 False 将导致订阅者错过第一个发布的消息,因为订阅者在连接之前有 1 秒的延迟,并且发布者不会等待(等待订阅者连接)。