3

我正在尝试在 C# 中实现消费者。有许多发布者可以同时执行。我创建了三个示例,一个使用 Rx 和主题,一个使用 BlockingCollection,第三个使用来自 BlockingCollection 的 ToObservable。在这个简单的示例中,他们都做同样的事情,我希望他们与多个制作人一起工作。

每种方法的不同之处是什么?

我已经在使用 Rx,所以我更喜欢这种方法。但我担心 OnNext 没有线程安全保证,而且我不知道 Subject 和默认调度程序的排队语义是什么。

有线程安全的主题吗?

是否要处理所有消息?

当这不起作用时,还有其他情况吗?是否同时处理?

void SubjectOnDefaultScheduler()
{
    var observable = new Subject<long>();
    observable.
        ObserveOn(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    observable.OnNext(1);
    observable.OnNext(2);
    observable.OnNext(3);
}

不是 Rx,但很容易适应使用/订阅它。它需要一个项目,然后处理它。这应该连续发生。

void BlockingCollectionAndConsumingTask()
{
    var blockingCollection = new BlockingCollection<long>();
    var taskFactory = new TaskFactory();
    taskFactory.StartNew(() =>
    {
        foreach (var i in blockingCollection.GetConsumingEnumerable())
        {
            DoWork(i);
        }
    });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}

使用有点像主题的阻塞集合似乎是一个很好的折衷方案。我隐含地猜测会安排任务,这样我就可以使用异步/等待,对吗?

void BlockingCollectionToObservable()
{
    var blockingCollection = new BlockingCollection<long>();
    blockingCollection.
        GetConsumingEnumerable().
        ToObservable(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}
4

1 回答 1

7

主题不是线程安全的。并发发出的 OnNexts 会直接并发调用一个 Observer。考虑到 Rx 的其他领域强制执行正确语义的程度,我个人觉得这非常令人惊讶。我只能假设这样做是出于性能考虑。

不过,Subject 是一种中途的房子,因为它确实使用 OnError 或 OnComplete 强制终止——在它们中的任何一个被引发之后,OnNext 是一个 NOP。这种行为线程安全的。

但是在 Subject 上使用 Observable.Synchronize() 它将强制传出调用遵守正确的 Rx 语义。特别是,如果同时进行 OnNext 调用,则会阻塞。

底层机制是标准的 .NET 锁。当锁被多个线程争用时,大多数时候它们会以先到先得的方式获得锁。在某些情况下违反了公平性。但是,您肯定会获得您正在寻找的序列化访问。

ObserveOn 具有特定于平台的行为 - 如果可用,您可以提供 aSynchronizationContext并将 OnNext 调用发布到它。使用调度程序,它最终将调用放在 a 上ConcurrentQueue<T>并通过调度程序连续调度它们 - 因此执行线程将取决于调度程序。无论哪种方式,排队行为也将强制执行正确的语义。

在这两种情况下(Synchronize & ObserveOn),您肯定不会丢失消息。使用 ObserveOn,您可以通过选择的 Scheduler/Context 隐式选择处理消息的线程,使用 Synchronize,您将处理调用线程上的消息。哪个更好取决于您的情况。

还有更多需要考虑的因素——比如如果你的生产者的速度超过你的消费者,你想做什么。

您可能还想看看 Rxx Consume:http ://rxx.codeplex.com/SourceControl/changeset/view/63470#1100703

显示同步行为的示例代码(Nuget Rx-Testing,Nunit)- Thread.Sleep 代码有点做作,但它很糟糕,而且我很懒惰:):

public class SubjectTests
{
    [Test]
    public void SubjectDoesNotRespectGrammar()
    {
        var subject = new Subject<int>();
        var spy = new ObserverSpy(Scheduler.Default);
        var sut = subject.Subscribe(spy);
        // Swap the following with the preceding to make this test pass
        //var sut = subject.Synchronize().Subscribe(spy);

        Task.Factory.StartNew(() => subject.OnNext(1));
        Task.Factory.StartNew(() => subject.OnNext(2));

        Thread.Sleep(2000);

        Assert.IsFalse(spy.ConcurrencyViolation);
    }

    private class ObserverSpy : IObserver<int>
    {
        private int _inOnNext;

        public ObserverSpy(IScheduler scheduler)
        {
            _scheduler = scheduler;
        }

        public bool ConcurrencyViolation = false;
        private readonly IScheduler _scheduler;

        public void OnNext(int value)
        {
            var isInOnNext = Interlocked.CompareExchange(ref _inOnNext, 1, 0);

            if (isInOnNext == 1)
            {
                ConcurrencyViolation = true;
                return;
            }

            var wait = new ManualResetEvent(false);

            _scheduler.Schedule(TimeSpan.FromSeconds(1), () => wait.Set());
            wait.WaitOne();

            _inOnNext = 0;
        }

        public void OnError(Exception error)
        {

        }

        public void OnCompleted()
        {

        }
    }
}
于 2013-05-12T21:23:45.613 回答