14

谁必须管理 ZeroMQ 中的持久化?

当我们在 Python 语言中使用 ZeroMQ 客户端时,有哪些插件/模块可用于管理持久化?

我想知道使用 ZeroMQ 的模式。

4

3 回答 3

9

据我所知,Zeromq 没有任何持久性。它超出了它的范围,需要由最终用户处理。就像序列化消息一样。在 C# 中,我使用 db4o 来添加持久性。通常我将对象保持在其原始状态,然后将其序列化并将其发送到 ZMQ 套接字。顺便说一句,这是针对 PUB/SUB 对的。

于 2010-12-07T07:49:46.777 回答
3

在应用程序端,您可以相应地持久化,例如,我在 node.js 中构建了一个持久层,它与后端 php 调用和通过 websockets 通信。

持久性方面将消息保存一段时间(http://en.wikipedia.org/wiki/Time_to_live),这是为了给客户端一个连接的机会。我使用了内存数据结构,但我玩弄了使用 redis 来获得磁盘持久性的想法。

于 2012-03-06T22:46:53.680 回答
2

我们需要在处理从订阅者收到的消息之前对其进行持久化。消息在单独的线程中接收并存储在磁盘上,而持久消息队列在主线程中操作。

该模块位于:https ://pypi.org/project/persizmq 。从文档中:

import pathlib

import zmq

import persizmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")

persistent_dir = pathlib.Path("/some/dir")
storage = persizmq.PersistentStorage(persistent_dir=persistent_dir)

def on_exception(exception: Exception)->None:
    print("an exception in the listening thread: {}".format(exception))

with persizmq.ThreadedSubscriber(
    callback=storage.add_message, subscriber=subscriber, 
    on_exception=on_exception):

    msg = storage.front()  # non-blocking
    if msg is not None:
        print("Received a persistent message: {}".format(msg))
        storage.pop_front()
于 2018-05-05T08:28:25.953 回答