1

我想要实现的是拥有一个消费者生产者方法。可以有许多生产者,但只有一个消费者。由于可扩展性,不可能有专门的消费者,所以想法是让生产者在有数据要消费并且当前没有活动消费者的情况下启动消费过程。

 1. Many threads can be producing messages. (Asynchronous)
 2. Only one thread can be consuming messages. (Synchronous)
 3. We should only have a consumer in process if there is data to be consumed 
 4. A continuous consumer that waits for data would not be efficient if we add many of these classes.

在我的示例中,我有一组发送数据的方法。多个线程可以写入数据Write(),但只有其中一个线程会循环并发送数据SendNewData()。之所以只有一个循环可以写入数据,是因为数据的顺序必须是同步的,而且在AsyncWrite()我们无法控制的情况下,我们只能通过一次运行一个来保证顺序AyncWrite()

我遇到的问题是,如果一个线程被调用来Write()生产,它会将数据排队并检查Interlocked.CompareExchance是否有消费者。如果它看到另一个线程在循环中已经在消费,它会假设这个消费者将发送数据。如果该循环线程消费者位于“Race Point A”,这是一个问题,因为该消费者已经检查了没有更多消息要发送并且即将关闭消费进程。

有没有办法在不锁定大部分代码的情况下防止这种竞争条件。真实场景有很多队列,比这复杂一点。

在真正的代码List<INetworkSerializable>中实际上是一个 byte[] BufferPool。我使用 List 作为示例,以使该块更易于阅读。

由于有 1000 多个此类同时处于活动状态,我无法承受 SendNewData 使用专用线程连续循环。只有在有数据要发送时,循环线程才应该处于活动状态。

public void Write(INetworkSerializable messageToSend)
{
   Queue.Enqueue(messageToSend);

   // Check if there are any current consumers. If not then we should instigate the consuming.
   if (Interlocked.CompareExchange(ref RunningWrites, 1, 0) == 0)
   { //We are now the thread that consumes and sends data
     SendNewData();
   }
}

//Only one thread should be looping here to keep consuming and sending data synchronously.
private void SendNewData()
{
    INetworkSerializable dataToSend;
    List<INetworkSerializable> dataToSendList = new List<INetworkSerializable>();

    while (true)
    {
        if (!Queue.TryDequeue(out dataToSend))
        {
           //Race Point A
           if (dataToSendList.IsEmpty)
           {
              //All data is sent, return so that another thread can take responsibility.
              Interlocked.Decrement(ref RunningWrites);
              return;
           }

           //We have data in the list to send but nothing more to consume so lets send the data that we do have.             
           break;
        }

        dataToSendList.Add(dataToSend);
    }

    //Async callback is WriteAsyncCallback()
    WriteAsync(dataToSendList);
}

//Callback after WriteAsync() has sent the data.
private void WriteAsyncCallback()
{
    //Data was written to sockets, now lets loop back for more data
    SendNewData();
}
4

3 回答 3

1

听起来你会更好地使用BlockingCollection轻松实现的生产者-消费者模式:

var toSend = new BlockingCollection<something>();

// producers
toSend.Add(something);

// when all producers are done
toSend.CompleteAdding();


// consumer -- this won't end until CompleteAdding is called
foreach(var item in toSend.GetConsumingEnumerable())
   Send(item);

为了解决知道何时调用 CompleteAdding 的评论,我将启动 1000 个生产者作为任务,等待所有这些任务完成 (Task.WaitAll),然后调用 CompleteAdding。如果需要,有一些很好的重载可以让你更好地控制 CancellationTokens。

此外,TPL 在调度阻塞线程方面非常好。

更完整的代码:

var toSend = new BlockingCollection<int>();            
Parallel.Invoke(() => Produce(toSend), () => Consume(toSend));

...

private static void Consume(BlockingCollection<int> toSend)
{
    foreach (var value in toSend.GetConsumingEnumerable())
    {
        Console.WriteLine("Sending {0}", value);
    }
}

private static void Produce(BlockingCollection<int> toSend)
{
    Action<int> generateToSend = toSend.Add;

    var producers = Enumerable.Range(0, 1000)
                              .Select(n => new Task(value => generateToSend((int) value), n))
                              .ToArray();

    foreach(var p in producers)
    {
        p.Start();
    }

    Task.WaitAll(producers);
    toSend.CompleteAdding();
}
于 2013-02-28T20:38:59.743 回答
1

检查此变体。代码中有一些描述性的注释。另请注意,WriteAsyncCallback现在不再调用SendNewData方法

private int _pendingMessages;

    private int _consuming;

    public void Write(INetworkSerializable messageToSend)
    {
        Interlocked.Increment(ref _pendingMessages);
        Queue.Enqueue(messageToSend);

        // Check if there is anyone consuming messages
        // if not, we will have to become a consumer and process our own message, 
        // and any other further messages until we have cleaned the queue
        if (Interlocked.CompareExchange(ref _consuming, 1, 0) == 0)
        {
            // We are now the thread that consumes and sends data
            SendNewData();
        }
    }

    // Only one thread should be looping here to keep consuming and sending data synchronously.
    private void SendNewData()
    {
        INetworkSerializable dataToSend;
        var dataToSendList = new List<INetworkSerializable>();
        int messagesLeft;

        do
        {
            if (!Queue.TryDequeue(out dataToSend))
            {
                // there is one possibility that we get here while _pendingMessages != 0:
                // some other thread had just increased _pendingMessages from 0 to 1, but haven't put a message to queue.
                if (dataToSendList.Count == 0)
                {
                    if (_pendingMessages == 0)
                    {
                        _consuming = 0;
                        // and if we have no data this mean that we are safe to exit from current thread.
                        return;
                    }
                }
                else
                {
                    // We have data in the list to send but nothing more to consume so lets send the data that we do have.             
                    break;
                }
            }

            dataToSendList.Add(dataToSend);
            messagesLeft = Interlocked.Decrement(ref _pendingMessages);
        }
        while (messagesLeft > 0);

        // Async callback is WriteAsyncCallback()
        WriteAsync(dataToSendList);
    }

    private void WriteAsync(List<INetworkSerializable> dataToSendList)
    {
        // some code
    }

    // Callback after WriteAsync() has sent the data.
    private void WriteAsyncCallback()
    {
        // ...
        SendNewData();
    }
于 2013-03-01T10:59:21.777 回答
0

在我们声明我们不再是消费者之后,可以通过添加以下内容并仔细检查队列来防止竞争条件。

if (dataToSend.IsEmpty)
{
     //Declare that we are no longer the consumer.
     Interlocked.Decrement(ref RunningWrites);

     //Double check the queue to prevent race condition A
     if (Queue.IsEmpty)
         return;
     else
     {   //Race condition A occurred. There is data again.

         //Let's try to become a consumer.
         if (Interlocked.CompareExchange(ref RunningWrites, 1, 0) == 0)
               continue;

         //Another thread has nominated itself as the consumer. Our job is done.
         return;
     }                                    
}

break;
于 2013-03-01T19:08:32.267 回答