我正在尝试在 C# 中实现消费者。有许多发布者可以同时执行。我创建了三个示例,一个使用 Rx 和主题,一个使用 BlockingCollection,第三个使用来自 BlockingCollection 的 ToObservable。在这个简单的示例中,他们都做同样的事情,我希望他们与多个制作人一起工作。
每种方法的不同之处是什么?
我已经在使用 Rx,所以我更喜欢这种方法。但我担心 OnNext 没有线程安全保证,而且我不知道 Subject 和默认调度程序的排队语义是什么。
有线程安全的主题吗?
是否要处理所有消息?
当这不起作用时,还有其他情况吗?是否同时处理?
void SubjectOnDefaultScheduler()
{
var observable = new Subject<long>();
observable.
ObserveOn(Scheduler.Default).
Subscribe(i => { DoWork(i); });
observable.OnNext(1);
observable.OnNext(2);
observable.OnNext(3);
}
不是 Rx,但很容易适应使用/订阅它。它需要一个项目,然后处理它。这应该连续发生。
void BlockingCollectionAndConsumingTask()
{
var blockingCollection = new BlockingCollection<long>();
var taskFactory = new TaskFactory();
taskFactory.StartNew(() =>
{
foreach (var i in blockingCollection.GetConsumingEnumerable())
{
DoWork(i);
}
});
blockingCollection.Add(1);
blockingCollection.Add(2);
blockingCollection.Add(3);
}
使用有点像主题的阻塞集合似乎是一个很好的折衷方案。我隐含地猜测会安排任务,这样我就可以使用异步/等待,对吗?
void BlockingCollectionToObservable()
{
var blockingCollection = new BlockingCollection<long>();
blockingCollection.
GetConsumingEnumerable().
ToObservable(Scheduler.Default).
Subscribe(i => { DoWork(i); });
blockingCollection.Add(1);
blockingCollection.Add(2);
blockingCollection.Add(3);
}