3

给定 2 个应用程序,其中应用程序 A 使用发布者客户端有争议地将数据流式传输到应用程序 B,应用程序 B 有一个子服务器套接字来接受该数据,我们如何在应用程序 A 中配置 pub 客户端套接字,以便当 B 不可用时(就像它正在重新部署,重新启动)A 缓冲所有未决消息,当 B 变为可用时,缓冲消息进入低谷并且套接字赶上实时流?

简而言之,当 SUB SERVER 不可用时,我们如何使 PUB CLIENT 套接字缓冲消息具有一定的限制?

PUB 客户端的默认行为是进入静音状态,但如果我们可以将其更改为限制大小的缓冲区,那就太好了,zmq 可以吗?还是我需要在应用程序级别进行...

我已经尝试在我的套接字中设置 HWM 和 LINGER,但如果我没记错的话,他们只负责慢消费者情况,我的发布者连接到订阅者,但订阅者太慢以至于发布者开始缓冲消息(hwm 将限制这些消息的数量)...

我正在使用jeromq,因为我的目标是 jvm 平台。

4

3 回答 3

3

我发布了一个快速更新,因为其他两个答案(尽管信息量很大实际上是错误的),我不希望其他人从我接受的答案中被误导。不仅你可以用 zmq 做到这一点,它实际上是默认行为

诀窍是,如果您的发布者客户端在不断丢弃消息之前从未连接到订阅者服务器(这就是我认为它不缓冲消息的原因),但是如果您的发布者连接到订阅者并且您重新启动订阅者,则发布者将缓冲消息直到达到 HWM,这正是我所要求的......所以简而言之,发布者想知道另一端有人在接受消息之后才会缓冲消息......

这是一些示例代码,演示了这一点(您可能需要做一些基本的编辑来编译它)。

我只使用了这个依赖org.zeromq:jeromq:0.5.1

zmq-publisher.kt

fun main() {
   val uri = "tcp://localhost:3006"
   val context = ZContext(1)
   val socket = context.createSocket(SocketType.PUB)

   socket.hwm = 10000
   socket.linger = 0
   "connecting to $uri".log()
   socket.connect(uri)

   fun publish(path: String, msg: Msg) {
      ">> $path | ${msg.json()}".log()
      socket.sendMore(path)
      socket.send(msg.toByteArray())
   }

   var count = 0

   while (notInterrupted()) {
      val msg = telegramMessage("message : ${++count}")
      publish("/some/feed", msg)
      println()

      sleepInterruptible(1.second)
   }
}

而且当然zmq-subscriber.kt


fun main() {
   val uri = "tcp://localhost:3006"
   val context = ZContext(1)
   val socket = context.createSocket(SocketType.SUB)

   socket.hwm = 10000
   socket.receiveTimeOut = 250

   "connecting to $uri".log()
   socket.bind(uri)

   socket.subscribe("/some/feed")

   while (true) {
      val path = socket.recvStr() ?: continue
      val bytes = socket.recv()
      val msg = Msg.parseFrom(bytes)
      "<< $path | ${msg.json()}".log()
   }
}

尝试在没有订阅者的情况下首先运行发布者,然后当您启动订阅者时,您错过了到目前为止的所有消息......现在无需重新启动发布者,停止订阅者等待一段时间并重新启动它。

这是我的一项服务实际上从中受益的示例...这是结构[current service]sub:server <= pub:client[service being restarted]sub:server <=* pub:client[multiple publishers]

因为我在中间重新启动了服务,所以所有发布者都开始缓冲他们的消息,观察到每秒约 200 条消息的最终服务观察到下降到 0(那些 1 或 2 是心跳)然后突然爆发 1000 多条消息,因为所有发布者都刷新了他们的缓冲区(重新启动大约需要 5 秒)......我实际上并没有在这里丢失一条消息......

在此处输入图像描述

请注意,您必须有subscriber:server <= publisher:client一对(这样发布者知道“我只需要将这些消息传递到一个地方”(您可以尝试绑定发布者并连接订阅者,但您不会再看到发布者缓冲消息,因为如果刚刚断开连接的订阅者这样做是因为它不再需要数据或因为它失败了)

于 2019-11-03T07:46:33.573 回答
3

首先,欢迎来到零之禅的世界,延迟最重要

序言:

ZeroMQ 是由 Pieter HINTJENS 的最终经验丰富的大师团队设计的——Martin SUSTRIK 被命名为第一名。该设计经过专业设计,以避免任何不必要的延迟。那么询问是否具有(有限的)持久性?不,先生,未确认 - PUB/SUBScalable Formal Communication Pattern Archetype 不会内置它,因为增加了问题以及降低了性能和可扩展性(附加延迟、附加处理、附加内存管理)。

如果需要(有限的)持久性(对于缺少远程 SUB 端代理的连接),请随时在应用程序端实现它,或者可以设计和实现一种新的 ZMTP 兼容的此类行为模式Archetype,扩展 ZeroMQ 框架,如果这样的工作进入稳定和公众接受的状态,但不要求高性能、延迟削减的标准PUB/SUB已经完善了几乎线性的可扩展性 ad astra,在这个方向上进行修改。这绝对不是一条路。

解决方案 ?

应用程序端可以轻松实现您添加的逻辑,使用双指针循环缓冲区,在某种(应用程序端管理的) - Persistence-PROXY中工作,但在PUB-sender 的前面。

您的设计可能会成功地从 ZeroMQ 内部细节中榨取一些额外的调味汁,以防您的设计也喜欢使用最近提供的内置 ZeroMQ--组件socket_monitor设置额外的控制层并在那里接收事件来自 PUB 端实例的“内部” ,其中一些额外的网络和连接管理相关事件可能会为您的(应用端管理的)Context带来更多亮点- Persistence-PROXY

然而,请注意

_zmq_socket_monitor()_方法仅支持面向连接的传输,即 TCP、IPC 和 TIPC。

所以人们可能会直接忘记这一点,以防计划使用任何最终有趣的传输类{ inproc:// | norm:// | pgm:// | epgm:// | vmci:// }


小心 !

我们的社区尊敬的成员smac89提供了不准确的信息(如果没有错的话),他已尽力解决您在评论中表达的额外兴趣:

“...zmq 优化主题发布?例如,如果您继续快速发布大约 100char 长topic它实际上是topic每次发送还是映射到某个 int 并随后发送 int...?”

告诉你:

“它总是会发布topic.当我使用该pub-sub模式时,我通常发布topic第一条然后是实际消息,所以在订阅者中我只是读取第一帧并忽略它​​然后读取实际消息”

ZeroMQ 不能以这种方式工作。没有什么作为“单独”<topic>后跟 a <message-body>,而是相反

主题过滤TOPIC机械化以非常不同的方式工作。

1)你永远不知道,谁.connect()-s:
即几乎可以肯定版本 2.x 直到版本 4.2+ 将以不同的方式处理主题过滤( ZMTP:RFC 定义了初始能力版本握手,让Context-instance决定,必须使用哪个版本的主题过滤:
ver 2.x用于将所有消息移动到所有对等点,并让所有 SUB 端(ver 2.x+)传递消息(并SUB- side Context-instance 处理本地topic-list 过滤处理

,而
4.2+ 版topic肯定会在 ** PUB 端执行-list 过滤处理Context-instance (CPU 使用率增长,网络传输相反),因此您的 SUB 端将永远不会收到一个字节“无用”读取“未订阅”消息。

2)(你可以,但是)没有必要将“主题”分离成这样隐含的多帧消息的第一帧。也许恰恰相反(在高性能、低延迟的分布式系统设计中这样做是一种相当反模式。

主题过滤过程被定义并按字节方式工作,从左到右,将每个主题列表成员值与传递的消息有效负载进行模式匹配。

添加额外的数据、额外的帧管理处理只会增加端到端延迟和处理开销。这样做不是一个好主意,而不是适当的设计工作。


结语:

设计没有容易的胜利,也没有任何容易实现的成果,或超低延迟是设计目标。

另一方面,确保 ZeroMQ 框架是考虑到这一点的,并且这些努力得到了稳定、最终性能良好且平衡的工具集,用于智能(按设计)、快速(运行中)和可扩展(如地狱可能羡慕)由于这种设计智慧,人们喜欢正确使用的信号/消息服务。

希望您对 ZeroMQ 感到满意,并随时在您选择的应用程序套件中的 ZeroMQ 层“前面”添加任何额外的功能集。

于 2019-10-18T12:48:54.757 回答
2

正如我们在评论中讨论的那样,发布者无法在没有任何连接的情况下缓冲消息,它只会丢弃任何新消息:

从文档:

如果发布者没有连接的订阅者,那么它将简单地丢弃所有消息。

这意味着您的缓冲区需要不在 zeromq 的关注范围内。然后,您的缓冲区可以是列表、数据库或您选择的任何其他存储方法,但您不能使用您的发布者来执行此操作。

现在下一个问题是处理如何检测订户已连接/断开连接。这需要告诉我们何时需要开始从缓冲区读取/填充缓冲区。

我建议使用Socket.monitor并监听ZMQ_EVENT_CONNECTEDand ZMQ_EVENT_DISCONNECTED,因为它们会告诉您客户端何时连接/断开连接,从而使您能够切换到填充您选择的缓冲区。当然,可能还有其他不直接涉及 zeromq 的方法,但这由您决定。

于 2019-10-18T05:33:52.837 回答