2

我在方法中有这样的代码:

ISubject<Message> messages = new ReplaySubject<Message>(messageTimeout);

public void HandleNext(string clientId, Action<object> callback)
{
    messages.Where(message => !message.IsHandledBy(clientId))
            .Take(1)
            .Subscribe(message =>
                       {
                           callback(message.Message);
                           message.MarkAsHandledBy(clientId);
                       });
}

rx'y 编码它的方式是什么,以便在多个并发调用时不会发生MarkAsHandledBy()和之间的竞争?IsHandledBy()HandleNext()

编辑:

这是用于长轮询。HandleNext()为每个 Web 请求调用。请求只能处理一条消息,然后返回给客户端。下一个请求接受下一条消息,依此类推。

完整的代码(当然仍在进行中)是这样的:

public class Queue
{
    readonly ISubject<MessageWrapper> messages;

    public Queue() : this(TimeSpan.FromSeconds(30)) {}

    public Queue(TimeSpan messageTimeout)
    {
        messages = new ReplaySubject<MessageWrapper>(messageTimeout);
    }

    public void Send(string channel, object message)
    {
        messages.OnNext(new MessageWrapper(new List<string> {channel}, message));
    }

    public void ReceiveNext(string clientId, string channel, Action<object> callback)
    {
        messages
            .Where(message => message.Channels.Contains(channel) && !message.IsReceivedBy(clientId))
            .Take(1)
            .Subscribe(message =>
                       {
                           callback(message.Message);
                           message.MarkAsReceivedFor(clientId);
                       });
    }

    class MessageWrapper
    {
        readonly List<string> receivers;

        public MessageWrapper(List<string> channels, object message)
        {
            receivers = new List<string>();
            Channels = channels;
            Message = message;
        }

        public List<string> Channels { get; private set; }
        public object Message { get; private set; }

        public void MarkAsReceivedFor(string clientId)
        {
            receivers.Add(clientId);
        }

        public bool IsReceivedBy(string clientId)
        {
            return receivers.Contains(clientId);
        }
    }
}

编辑2:

现在我的代码如下所示:

public void ReceiveNext(string clientId, string channel, Action<object> callback)
{
    var subscription = Disposable.Empty;
    subscription = messages
        .Where(message => message.Channels.Contains(channel))
        .Subscribe(message =>
                   {
                       if (message.TryDispatchTo(clientId, callback))
                           subscription.Dispose();
                   });
}

class MessageWrapper
{
    readonly object message;
    readonly List<string> receivers;

    public MessageWrapper(List<string> channels, object message)
    {
        this.message = message;
        receivers = new List<string>();
        Channels = channels;
    }

    public List<string> Channels { get; private set; }

    public bool TryDispatchTo(string clientId, Action<object> handler)
    {
        lock (receivers)
        {
            if (IsReceivedBy(clientId)) return false;
            handler(message);
            MarkAsReceivedFor(clientId);
            return true;
        }
    }

    void MarkAsReceivedFor(string clientId)
    {
        receivers.Add(clientId);
    }

    bool IsReceivedBy(string clientId)
    {
        return receivers.Contains(clientId);
    }
}
4

2 回答 2

2

在我看来,你正在为自己做一场 Rx 噩梦。Rx 应该提供一种非常简单的方法来将订阅者连接到您的消息。

我喜欢这样一个事实,即你有一个自包含的类来保存你的ReplaySubject- 它可以阻止你的代码中的其他地方是恶意的并且OnCompleted过早地调用。

但是,该ReceiveNext方法没有为您提供任何删除订阅者的方法。至少是内存泄漏。您对客户端 ID 的跟踪MessageWrapper也是潜在的内存泄漏。

我建议您尝试使用这种功能,而不是ReceiveNext

public IDisposable RegisterChannel(string channel, Action<object> callback)
{
    return messages
        .Where(message => message.Channels.Contains(channel))
        .Subscribe(message => callback(message.Message));
}

这是非常 Rx 的。这是一个很好的纯查询,您可以轻松取消订阅。

由于Action<object> callback毫无疑问与我直接相关,因此clientId我会考虑将逻辑放置在其中以防止重复的消息处理。

现在你的代码是非常程序化的,不适合 Rx。似乎您还没有完全了解如何最好地使用 Rx。这是一个好的开始,但您需要更多地考虑功能(如在函数式编程中)。


如果您必须按原样使用您的代码,我建议您进行一些更改。

Queue这样做:

public IDisposable ReceiveNext(
    string clientId, string channel, Action<object> callback)
{
    return
        messages
            .Where(message => message.Channels.Contains(channel))
            .Take(1)
            .Subscribe(message =>
                message.TryReceive(clientId, callback));
}

MessageWrapper摆脱MarkAsReceivedFor&IsReceivedBy并改为这样做:

    public bool TryReceive(string clientId, Action<object> callback)
    {
        lock (receivers)
        {
            if (!receivers.Contains(clientId))
            {
                callback(this.Message);
                receivers.Add(clientId);
                return true;
            }
            else
                return false;
        }
    }

我真的不明白您为什么会有这种.Take(1)情况,但是这些更改可能会根据其原因减少竞争条件。

于 2012-09-11T03:20:00.810 回答
1

我不确定像这样使用 Rx 是一个好习惯。Rx 定义了流的概念,它要求没有并发通知。

也就是说,要回答您的问题,为了避免竞争条件,请在IsReceivedByandMarkAsReceivedFor方法中放置一个锁。

至于更好的方法,您可以放弃整个处理业务,在接收请求时使用ConcurrentQueueTryDequeue消息(您只是在做Take(1)- 这适合队列模型)。Rx 可以帮助您为每条消息提供 TTL 并将其从队列中删除,但您也可以在传入请求时执行此操作。

于 2012-09-10T20:42:33.137 回答