1

我不完全确定我正在寻找的正确术语,所以请多多包涵。在我正在维护的应用程序中,我们有一个标准的 PubSub 实现,用于从几个不同的发布者获取数据到应用程序中。我们有一个“服务”来订阅处理 PubSub 协商的数据,不同类型的数据有不同的服务(例如产品、订单、供应商等)。这些服务通常通过普通的 .NET 事件通知应用程序消息。

然而,并非应用程序的所有部分都对所有事件感兴趣。例如,某些地方可能只对产品 A、B 和 C 感兴趣,而应用程序中的另一个地方需要产品 B、D 和 E,而想要的内容通常由用户输入驱动。最终发生的事情是,在应用程序的每个位置,都会将处理程序添加到服务的事件中,并且该处理程序会根据其他代码过滤掉不需要的事件,这些代码根据用户输入设置过滤器。毫不奇怪,用于过滤和设置过滤器的代码在任何地方都基本相同。

对于一个通用的 PubSub 对象集来说,这似乎是一个位于服务和需要数据的地方之间的好地方。我可以根据需要为每个数据场景设置对象的具体版本,并将它们添加到服务中,以供应用程序中的各个位置使用。不想重新发明轮子,我一直在谷歌搜索 PubSub,我能找到的最接近主题的是主题,但这些不适合这种情况,因为它需要为每个产品设置一个主题。

有没有人知道这方面的任何资源?我正在使用 C#,但如果需要,我可以适应其他语言。主要要求是这不能是单独的服务或进程,比如WCF;它需要驻留在应用程序中并使用现有的基础设施,大概是通过附加到服务的事件,然后以这种方式通知订阅者。同样值得关注的是这个应用程序是大量多线程的,因此倾向于线程安全会有所帮助,尽管我可以根据需要进行调整。

提前致谢!

4

2 回答 2

0

我会关注 DDS 产品(数据分发服务),例如 RTI。他们提供您所需要的一切以及更多。见: http ://www.rti.com/products/dds/

于 2013-03-04T20:18:34.863 回答
0

嗯..不确定我是否理解正确。这样的事情怎么样?使用模式

public void handleui(dynamic s)
{
    Application.Current.Dispatcher.Invoke(delegate
                                              {
                                                  btn1.Content = s.ToString();
                                              });
}
Globals.Events["error"] += msg=> Console.WriteLine(msg);//logger perhaps
Globals.Events["productb"] += handleui;//sub
Globals.Events["productb"] -= handleui;//unsub    
Globals.Events["productb"].Send("productbdata");//raise the event or publish to productb channel subscribers
Globals.Events.Send("broadcast?");

我想您可能会做一个过滤器,它只是将所有事件发送到 Events["producta"]、Events["productb"]、Events["productc"] 等,并且这些部分可以在需要时进行 sub/unsub。

执行。

using System;
using System.Collections.Concurrent;


public class Globals
{
    public static MsgBus Events = new MsgBus();
}

public class MsgBus
{
    private readonly ConcurrentDictionary<dynamic, MsgChannel> channels = new ConcurrentDictionary<dynamic, MsgChannel>();

    public MsgChannel this[dynamic channel]
    {
        set { channels[channel] = value; }
        get
        {
            var ch = (MsgChannel)channels.GetOrAdd(channel, new MsgChannel());
            return ch;
        }
    }

    private MsgChannel broadcast = new MsgChannel();
    public void Send(dynamic msg)
    {
        broadcast.Send(msg);
    }

    public static MsgBus operator +(MsgBus left, Action<dynamic> right)
    {
        left.broadcast += right;
        return left;
    }

    public static MsgBus operator -(MsgBus left, Action<dynamic> right)
    {
        left.broadcast -= right;
        return left;
    }
}

public class MsgChannel
{
    ConcurrentDictionary<Action<dynamic>, int> observers = new ConcurrentDictionary<Action<dynamic>, int>();
    public void Send(dynamic msg)
    {
        foreach (var observer in observers)
        {
            for (int i = 0; i < observer.Value; i++)
            {
                observer.Key.Invoke(msg);
            }
        }
    }
    public static MsgChannel operator +(MsgChannel left, Action<dynamic> right)
    {

        if (!left.observers.ContainsKey(right))
        {
            left.observers.GetOrAdd(right, 0);
        }

        left.observers[right]++;
        return left;
    }

    public static MsgChannel operator -(MsgChannel left, Action<dynamic> right)
    {
        if (left.observers.ContainsKey(right) &&
            left.observers[right] > 0)
        {
            left.observers[right]--;
            int dummy;
            if (left.observers[right] <= 0) left.observers.TryRemove(right, out dummy);
        }

        return left;
    }
}
于 2013-03-31T02:09:41.860 回答