我想要实现的是拥有一个消费者生产者方法。可以有许多生产者,但只有一个消费者。由于可扩展性,不可能有专门的消费者,所以想法是让生产者在有数据要消费并且当前没有活动消费者的情况下启动消费过程。
1. Many threads can be producing messages. (Asynchronous)
2. Only one thread can be consuming messages. (Synchronous)
3. We should only have a consumer in process if there is data to be consumed
4. A continuous consumer that waits for data would not be efficient if we add many of these classes.
在我的示例中,我有一组发送数据的方法。多个线程可以写入数据Write()
,但只有其中一个线程会循环并发送数据SendNewData()
。之所以只有一个循环可以写入数据,是因为数据的顺序必须是同步的,而且在AsyncWrite()
我们无法控制的情况下,我们只能通过一次运行一个来保证顺序AyncWrite()
。
我遇到的问题是,如果一个线程被调用来Write()
生产,它会将数据排队并检查Interlocked.CompareExchance
是否有消费者。如果它看到另一个线程在循环中已经在消费,它会假设这个消费者将发送数据。如果该循环线程消费者位于“Race Point A”,这是一个问题,因为该消费者已经检查了没有更多消息要发送并且即将关闭消费进程。
有没有办法在不锁定大部分代码的情况下防止这种竞争条件。真实场景有很多队列,比这复杂一点。
在真正的代码List<INetworkSerializable>
中实际上是一个 byte[] BufferPool。我使用 List 作为示例,以使该块更易于阅读。
由于有 1000 多个此类同时处于活动状态,我无法承受 SendNewData 使用专用线程连续循环。只有在有数据要发送时,循环线程才应该处于活动状态。
public void Write(INetworkSerializable messageToSend)
{
Queue.Enqueue(messageToSend);
// Check if there are any current consumers. If not then we should instigate the consuming.
if (Interlocked.CompareExchange(ref RunningWrites, 1, 0) == 0)
{ //We are now the thread that consumes and sends data
SendNewData();
}
}
//Only one thread should be looping here to keep consuming and sending data synchronously.
private void SendNewData()
{
INetworkSerializable dataToSend;
List<INetworkSerializable> dataToSendList = new List<INetworkSerializable>();
while (true)
{
if (!Queue.TryDequeue(out dataToSend))
{
//Race Point A
if (dataToSendList.IsEmpty)
{
//All data is sent, return so that another thread can take responsibility.
Interlocked.Decrement(ref RunningWrites);
return;
}
//We have data in the list to send but nothing more to consume so lets send the data that we do have.
break;
}
dataToSendList.Add(dataToSend);
}
//Async callback is WriteAsyncCallback()
WriteAsync(dataToSendList);
}
//Callback after WriteAsync() has sent the data.
private void WriteAsyncCallback()
{
//Data was written to sockets, now lets loop back for more data
SendNewData();
}