我正在尝试将响应式扩展与 Oracle AQ 一起使用。当消息到达 Oracle 队列时,它会触发“OracleAQMessageAvailableEvent”,告诉消费者有消息。在 OracleAQMessageAvailableEventHandler 中,消费者调用 OracleAQQueue.Dequeue() 来检索消息。
我已经将上述内容与RX一起使用。以下是我使用的代码。
var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
});
messages.subscribe(....)
问题是,如果我在一切正常后订阅消息,但如果我多次订阅消息(即我的应用程序中的多个消费者),那么每个消费者都会尝试调用“_queue.Dequeue()”,并且第一次调用之后的每个调用都会失败如果我们没有新消息。
谁能指导我该怎么做。我认为,我的场景适用于 Hot Observable,但我正在努力解决它。