我有一个监听器,它以 IPayload 的形式接收工作。听众应该把这项工作推给真正做这项工作的观察者。这是我第一次粗略地尝试实现这一目标:
public interface IObserver
{
void DoWork(IPayload payload);
}
public interface IObservable
{
void RegisterObserver(IObserver observer);
void RemoveObserver(IObserver observer);
void NotifyObservers(IPayload payload);
}
public class Observer : IObserver
{
public void DoWork(IPayload payload)
{
// do some heavy lifting
}
}
public class Listener : IObservable
{
private readonly List<IObserver> _observers = new List<IObserver>();
public void PushIncomingPayLoad(IPayload payload)
{
NotifyObservers(payload);
}
public void RegisterObserver(IObserver observer)
{
_observers.Add(observer);
}
public void RemoveObserver(IObserver observer)
{
_observers.Remove(observer);
}
public void NotifyObservers(IPayload payload)
{
Parallel.ForEach(_observers, observer =>
{
observer.DoWork(payload);
});
}
}
这是遵循观察者/可观察模式(即pub sub?)的有效方法吗?我的理解是,NotifyObservers 也会为每个有效负载带来威胁。这个对吗?非常欢迎任何改进建议。
请注意,所有观察者都必须在以有效载荷的形式将新工作传递给他们之前完成他们的工作——“观察”的顺序无关紧要。基本上,侦听器必须像大师一样发挥作用,同时尽可能多地使用 TPL 来利用主机的内核。恕我直言,这需要向侦听器/可观察对象显式注册观察者。
PS:
我认为 Parallel.ForEach 不会为每个观察者创建一个线程:为什么 Parallel.ForEach 不运行多个线程?如果这是真的,我如何确保为每个观察者创建一个线程?
我想到的另一种选择是:
public async void NotifyObservers(IPayload payload)
{
foreach (var observer in _observers)
{
var observer1 = observer;
await Task.Run(() => observer1.DoWork(payload));
}
await Task.WhenAll();
}