3

我在 Express 应用程序中使用 Redis。我的应用程序既是流的发布者又是消费者,使用单个 redis 连接 (redis.createClient)。我对管理永久订阅(使用 xreadgroup)的最佳方式有疑问。目前我正在这样做:

    const readStream = () => xreadgroup('GROUP' appId, consumerId, 'BLOCK', 1, 'COUNT', 1, 'STREAMS' key, '>')
        .then(handleData)
        .then(() => setImmeadiate(readStream));

wherexreadgroup只是 node-redis' 的一个承诺版本xreadgroup

我的问题 - BLOCK 的适当用法是什么?如果我无限期或长时间阻止,那么我的客户xadd在解除阻止或阻止超时之前无法发布任何消息(带有 )。由于我必须使用某种循环/递归来继续阅读事件,BLOCK因此似乎没有必要;我可以把它关掉吗?这是预期的用途吗?

同样,使用 setImmeadiate 是否合适,还是首选 process.nextTick 或异步循环?

node-redis 中的文档很少,少数示例只是在阻塞后读取消息一次,并且不在同一个客户端上生成/使用。

4

1 回答 1

0

不是这方面的专家,但我想分享一些可能有帮助的想法。

  1. 我不确定 node-redis 是否可以“堆叠”多个命令,这意味着 - 它是否能够在等待 XREADGROUP 完成时触发新命令?

  2. 从你的描述来看,好像是这样的。在这种情况下,我建议您创建一个专用连接来调用 XREADGROUP - 这样您就可以发布和收听而不会相互阻塞。

  3. 您不需要使用BLOCK; 但是,如果您的目标是监听所有事件并等待那些尚未发布的事件,那么使用它可能是明智的,并且会在减少对 redis 的调用的同时为您提供更好的性能。

  4. setImmediate可能很好,尤其是使用BLOCK. 如果您不使用它,那么在调用之间添加一点超时可能会很好 - 没有BLOCK调用将几乎立即响应返回。您可以检查以获取更多详细信息。

友情提示:不要忘记确认您的消息或使用 NOACK(根据您的用例可能没问题):

消费者组需要通过 XACK 命令明确确认消费者成功处理的消息。

NOACK 子命令可用于避免在不要求可靠性并且可以接受偶尔的消息丢失的情况下将消息添加到 PEL。

来源:https ://redis.io/commands/xreadgroup

于 2021-10-29T02:33:29.697 回答