1

我正在尝试将响应式扩展与 Oracle AQ 一起使用。当消息到达 Oracle 队列时,它会触发“OracleAQMessageAvailableEvent”,告诉消费者有消息。在 OracleAQMessageAvailableEventHandler 中,消费者调用 OracleAQQueue.Dequeue() 来检索消息。

我已经将上述内容与RX一起使用。以下是我使用的代码。

var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
                    h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
                .Where(x => x.EventArgs.AvailableMessages > 0)
                .Select(x =>
                {
                    OracleAQMessage msg = _queue.Dequeue();
                    return (UpdateMsg) msg.Payload;
                });
messages.subscribe(....)

问题是,如果我在一切正常后订阅消息,但如果我多次订阅消息(即我的应用程序中的多个消费者),那么每个消费者都会尝试调用“_queue.Dequeue()”,并且第一次调用之后的每个调用都会失败如果我们没有新消息。

谁能指导我该怎么做。我认为,我的场景适用于 Hot Observable,但我正在努力解决它。

4

2 回答 2

4

我认为您正在寻找 Hot Observable 是正确的。如果我们遵循代码,可能会更清楚为什么您会看到_queue.Dequeue();被多次调用。

首先,您从 Oracle 订阅事件

Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
    h => _queue.MessageAvailable += h, 
    h => _queue.MessageAvailable -= h)

这就像在 Rx 之前的世界中连接事件处理程序一样。收听(订阅)的每个人都将收到相同的事件。如果他们在引发事件后订阅,那么他们就错过了它。

然后你过滤掉空集。

.Where(x => x.EventArgs.AvailableMessages > 0)

那里没什么特别的。

然后,您从查询内部执行副作用。

.Select(x =>
    {
        OracleAQMessage msg = _queue.Dequeue();
        return (UpdateMsg) msg.Payload;
    });

这里的副作用是您正在进行破坏性读取 ( Dequeue)。所有订阅者在从上游推送事件时_queue.MessageAvailable都会尝试调用Dequeue().

为避免所有订阅者调用副作用,您可以将序列设为 Hot(如您所建议的那样)。为此,您可以查看Publish()运算符。

操作员将Publish()返回一个IConnectableObservable<T>IObservable<T>通过添加该方法进行扩展的Connect()方法。这允许对何时执行订阅逻辑进行细粒度控制。但是,这可能对您来说控制过多,您可能会发现这RefCount()正是您所需要的。

Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
    h => _queue.MessageAvailable += h, 
    h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
    {
        OracleAQMessage msg = _queue.Dequeue();
        return (UpdateMsg) msg.Payload;
    })
.Publish()
.Refcount();

现在您的每个订阅者都将收到相同的消息,并且您Dequeue()的副作用只会在每个事件中调用一次(并且仅在有订阅者时)。

此处涵盖了热和冷可观察对象

于 2016-05-31T08:44:00.677 回答
-1

李坎贝尔,对不起,我的错。您提到的解决方案确实有效。实际上,我用错了。我有一个名为 QueueWrapper 的类,它有一个名为 Messages 的属性。我有这个 Messages 的实现

    public IObservable<UpdateMsg> Messages { 
        get { return Observable.FromEventPattern<OracleAQMessageAvailableEventHandler,         OracleAQMessageAvailableEventArgs> (
        h => _queue.MessageAvailable += h, 
        h => _queue.MessageAvailable -= h)
       .Where(x => x.EventArgs.AvailableMessages > 0)
       .Select(x =>
        {
            OracleAQMessage msg = _queue.Dequeue();
            return (UpdateMsg) msg.Payload;
        })
       .Publish()
       .Refcount();
}}

并且我的客户端代码正在使用像这样的 Messages 属性进行订阅

// First Subscription
_queueWrapper.Messages.Subscribe(....)

// Second Subscription
_queueWrapper.Messages.Subscribe(....)

所以对于每个订阅,Messages 属性都会返回一个新的 IObservable。为了解决这个问题,我将 observable 的初始化移到了 QueueWrapper 的构造函数中,即以下代码:

    public QueueWrapper() {
     _messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
        h => _queue.MessageAvailable += h, 
        h => _queue.MessageAvailable -= h)
    .Where(x => x.EventArgs.AvailableMessages > 0)
    .Select(x =>
        {
            OracleAQMessage msg = _queue.Dequeue();
            return (UpdateMsg) msg.Payload;
        })
    .Publish()
    .Refcount();
}

我的 Messages 属性只返回 _messages;

public IObservable<UpdateMsg> Messages { get { return _messages; } }

之后,一切都开始按预期工作。

于 2016-05-31T12:59:52.527 回答