1

我有一个简单的消息传递框架,它围绕附加到多个“客户端”实例的“主机”的单个实例构建。每个实例通过向其推送消息来与另一个实例通信。然后,实例使用单独的任务按照接收消息的顺序处理它们自己的消息。

这是代码的简化版本:

interface IMessage
{
}

class Host
{
    ConcurrentBag<Client> _Clients = new ConcurrentBag<Client>();
    BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();

    public Host()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var message in _IncomingMessages.GetConsumingEnumerable())
            {
                ProcessIncomingMessage(message);
            }
        });
    }

    public void AddClient(Client client)
    {
        _Clients.Add(client);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        // consume the message and typically generate new messages
        // For now just echo the message back
        Broadcast(message);  
    }

    private void Broadcast(IMessage message)
    {
        foreach (var client in _Clients)
        {
            client.PushMessage(message);
        }
    }
}


class Client
{
    private Host _Host;
    private BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();

    public Client(Host host)
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var message in _IncomingMessages.GetConsumingEnumerable())
            {
                ProcessIncomingMessage(message);
            }
        });

        _Host = host;
        _Host.AddClient(this);
    }

    public void PushMessage(IMessage message)
    {
        _IncomingMessages.Add(message);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        // interpret the message and update state
    }
}

我的要求是每个处理它们的消息的实例都使用并行执行(如果可能的话),并且每条消息都按照每个班级一次收到一条的顺序进行处理。

生成仅从 GetConsumingEnumerable() 中提取的新任务是不是一个坏主意?

我觉得我可以使用响应式扩展和 IObserveable 来简化这一点,但我对使用 Rx 还是很陌生,并且不确定如何构建它来满足我的并行执行需求。

任何指导将不胜感激!

4

1 回答 1

3

IObservable通过使用 Rx(特别是)作为将消息传递到 的中介Host,然后将消息从 传递Host到所有客户端,您当然可以使您的生活变得更轻松。对代码的两个更改(您可以做一个或另一个,或两者都做)将传递一个IObservable<IMessage>toHost作为它的输入并让它产生一个IObservable<IMessage>, 并且类似地传递一个IObservable<IMessage>to Client

据我所知,HostClient不需要了解彼此(这很好)。如果必须主动管理客户端(例如启动它们),这将不起作用Host- 在这些情况下,需要更改以下代码以Host管理客户端订阅,这并不重要,但肯定会涉及请求订阅的客户端来自主机,而主机持有由此产生的IDisposable.

此外,以下内容通常不处理错误或取消订阅,可以轻松添加。它是一个与你的功能相当的骨架。

class Host
{
    private Subject<IMessage> _outbound;

    public Host(IObservable<IMessage> messages)
    {
        _outbound = new Subject<IMessage>();        
        messages.SubscribeOn(Scheduler.TaskPool).Subscribe(ProcessIncomingMessage);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        _outbound.OnNext(message); // just echo
    }

    public IObservable<IMessage> Messages { get { return _outbound.AsObservable(); } }
}

class Client
{
    public Client(IObservable<IMessage> messages)
    {
        messages.SubscribeOn(Scheduler.TaskPool).Subscribe(ProcessIncomingMessage);
    }

    private void ProcessIncomingMessage(IMessage message)
    {
        // interpret the message and update state
    }
}

那么用法很简单:

var source = new Subject<long>();

var host = new Host(source);
var client1 = new Client(host.Messages);
var client2 = new Client(host.Messages);
var client3 = new Client(host.Messages);

source.OnNext(1); // assuming each of these is an IMessage
source.OnNext(2);
source.OnNext(3);

当然,您可以使用IObservable任何源来驱动Host.

请注意,如果消息的处理是“长时间运行的”,您希望Scheduler.NewThreadSubscribeOn调用中使用。否则,每条消息当前都被添加到Task Pool要处理的消息中。

于 2012-07-11T21:26:42.720 回答