1

我有一个监听器,它以 IPayload 的形式接收工作。听众应该把这项工作推给真正做这项工作的观察者。这是我第一次粗略地尝试实现这一目标:

public interface IObserver
{
    void DoWork(IPayload payload);
}

public interface IObservable
{
    void RegisterObserver(IObserver observer);
    void RemoveObserver(IObserver observer);
    void NotifyObservers(IPayload payload);
}

public class Observer : IObserver
{
    public void DoWork(IPayload payload)
    {
        // do some heavy lifting
    }
}

public class Listener : IObservable
{
    private readonly List<IObserver> _observers = new List<IObserver>();

    public void PushIncomingPayLoad(IPayload payload)
    {
        NotifyObservers(payload);
    }

    public void RegisterObserver(IObserver observer)
    {
        _observers.Add(observer);
    }

    public void RemoveObserver(IObserver observer)
    {
        _observers.Remove(observer);
    }

    public void NotifyObservers(IPayload payload)
    {
        Parallel.ForEach(_observers, observer =>
        {
        observer.DoWork(payload);
        });
    }
}

这是遵循观察者/可观察模式(即pub sub?)的有效方法吗?我的理解是,NotifyObservers 也会为每个有效负载带来威胁。这个对吗?非常欢迎任何改进建议。

请注意,所有观察者都必须在以有效载荷的形式将新工作传递给他们之前完成他们的工作——“观察”的顺序无关紧要。基本上,侦听器必须像大师一样发挥作用,同时尽可能多地使用 TPL 来利用主机的内核。恕我直言,这需要向侦听器/可观察对象显式注册观察者。

PS:

我认为 Parallel.ForEach 不会为每个观察者创建一个线程:为什么 Parallel.ForEach 不运行多个线程?如果这是真的,我如何确保为每个观察者创建一个线程?

我想到的另一种选择是:

public async void NotifyObservers(IPayload payload)
{
    foreach (var observer in _observers)
    {
    var observer1 = observer;
    await Task.Run(() => observer1.DoWork(payload));
    }
    await Task.WhenAll();
}
4

1 回答 1

2

当然你可以这样做,但是在.net中如果你不想重新发明轮子就不需要了:-)在c#中可以使用事件来完成。

一个简短的例子:

  //Your Listener who has a public eventhandler where others can add them as listeners
  public class Listener{
      //your eventhandler where others "add" them as listeners
      public event EventHandler<PayLoadEventsArgs> IncomingPayload;

      //a method where you process new data and want to notify the others
      public void PushIncomingPayLoad(IPayload payload)
      {
          //check if there are any listeners
          if(IncomingPayload != null)
              //if so, then notify them with the data in the PayloadEventArgs
              IncomingPayload(this, new PayloadEventArgs(payload));
      }
  }  

  //Your EventArgs class to hold the data
  public class PayloadEventArgs : EventArgs{

      Payload payload { get; private set; }  

      public PayloadEventArgs(Payload payload){
          this.payload = payload;
      }
  }

  public class Worker{
      //add this instance as a observer
      YourListenerInstance.IncomingPayload += DoWork;

      //remove this instance 
      YourListenerInstance.IncomingPayload -= DoWork;

      //This method gets called when the Listener notifies the  IncomingPayload listeners
      void DoWork(Object sender, PayloadEventArgs e){
          Console.WriteLine(e.payload);
      }
   }

编辑:当问题要求并行执行时,在订阅者端做新线程怎么样?我认为这是实现这一目标的最简单方法。

//Inside the DoWork method of the subscriber start a new thread
Task.Factory.StartNew( () => 
{
      //Do your work here
});

//If you want to make sure that a new thread is used for the task, then add the TaskCreationOptions.LongRunning parameter
Task.Factory.StartNew( () => 
{
      //Do your work here
}, TaskCreationOptions.LongRunning);

希望这能回答你的问题?如果没有,请发表评论。

于 2016-02-13T14:18:13.737 回答