7

在这个例子中https://stackoverflow.com/a/9980346/93647和这里为什么我的破坏者例子这么慢?(在问题结束时)有 1 个发布者发布项目和 1 个消费者。

但就我而言,消费者工作要复杂得多,需要一些时间。所以我想要 4 个并行处理数据的消费者。

例如,如果生产者生产数字:1,2,3,4,5,6,7,8,9,10,11..

我希望消费者 1 捕获 1,5,9,...消费者 2 捕获 2,6,10,...消费者 3 捕获 3,7,11,...消费者 4 捕获 4,8,12...(不完全是这些数字,想法是数据应该并行处理,我不在乎哪个消费者处理了哪个特定数字)

请记住,这需要并行完成,因为在实际应用程序中,消费者工作非常昂贵。我希望消费者在不同的线程中执行以使用多核系统的强大功能。

当然,我可以只创建 4 个环形缓冲区并将 1 个消费者附加到 1 个环形缓冲区。这样我就可以使用原始示例。但我觉得这不会是正确的。创建 1 个发布者(1 个环形缓冲区)和 4 个消费者可能是正确的——因为这是我需要的。

在谷歌群组中添加一个非常相似的问题的链接:https ://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

所以我们有两个选择:

  • 一环多个消费者(每个消费者都会在每次添加时“唤醒”,所有消费者应该有相同的WaitStrategy)
  • 许多“一环 - 一个消费者”(每个消费者只会在它应该处理的数据上唤醒。每个消费者可以有自己的 WaitStrategy)。
4

2 回答 2

2

编辑:我忘了提到代码部分取自常见问题解答。我不知道这种方法比弗兰克的建议好还是坏。

该项目的文档记录严重不足,这很可惜,因为它看起来不错。
无论如何尝试以下剪辑(基于您的第一个链接) - 在单声道上测试并且似乎没问题:

using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }
    }

    public class MyHandler : IEventHandler<ValueEntry>
    {
        private static int _consumers = 0;
        private readonly int _ordinal;

        public MyHandler()
        {
            this._ordinal = _consumers++;
        }

        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            if ((sequence % _consumers) == _ordinal)
                Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
            else
                Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);                     
        }
    }

    class Program
    {
        private static readonly Random _random = new Random();
        private const int SIZE = 16;  // Must be multiple of 2
        private const int WORKERS = 4; 

        static void Main()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
            for (int i=0; i < WORKERS; i++)
                disruptor.HandleEventsWith(new MyHandler());
            var ringBuffer = disruptor.Start();

            while (true)
            {
                long sequenceNo = ringBuffer.Next();
                ringBuffer[sequenceNo].Value =  _random.Next();;
                ringBuffer.Publish(sequenceNo);
                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
                Console.ReadKey();
            }
        }
    }
}
于 2012-11-12T06:57:17.773 回答
0

从环形缓冲区的规范中,您会看到每个消费者都会尝试处理您的ValueEvent. 在你的情况下,你不需要那个。

我是这样解决的:

向您添加一个已处理的字段ValueEvent,当消费者接受他在该字段上测试的事件时,如果它已被处理,他将继续下一个字段。

不是最漂亮的方式,但它是缓冲区的工作方式。

于 2012-11-12T05:36:21.077 回答