2

当我收到来自 3rd 方系统的消息时,我遇到了一种情况,它可以保证消息的传递。为此,它需要客户端确认收到的每条消息。这种系统的一个例子可能是 RibbitMQ。

现在,我知道 Monix 的 Observables 中有背压机制,这立即让我想用它来确认消息。换句话说,我想创建一个 observable,这样当onNext返回时Ack.Continue,我向外部系统确认该消息。贝娄是我想法的简单概述。

Observable.create[Event](OverflowStrategy.Fail(100)) { downstram =>
  externaSystem.subscribe { event =>
    downstram.onNext(event).onComplete {
      case Success(Ack.Continue) => externaSystem.acknowledge(event)
      case _ => externaSystem.reset(event)
    }
  }
  Cancelable { () => externaSystem.stop() }
}

当然,整个想法是避免丢失任何消息,并且仅在观察者实际返回 Ack.Continue 时才确认它们。但是,我注意到了几种情况,当 Monix 负责管理可观察对象的背压时,例如,当 Imulticastshare观察者时。在这种情况下,原始 observableAck.Continue无需等待预期订阅者的响应即可接收。

这一切都让我想到了一个问题,如果实际上使用 Monix 背压机制确认第 3 方系统上的消息是一个好主意,如果是这样,是否有任何警告我需要注意(比如不要对multicast这样的观察者)

我将感谢您的帮助,并非常感谢您的提前。

4

0 回答 0