我正在尝试在.NET 中实现一个线程队列,但是当我通过测试运行它时遇到了一些麻烦。
允许该实现放弃线程处理的一些复杂性,因为它强制只有一个线程将项目放入队列中,并且只有一个线程将它们取出(这是设计使然)。
问题是有时,Take() 会跳过一个项目,就好像它从未存在过一样,在我的测试中,我会得到“预期:736 但原为:737”。我在这段代码中看不到会发生这种效果的任何地方。Put 只会放在最后一个项目之后(因此它不应该直接影响 this.m_Head),并且 Take 正在使用 Interlocked.Exchange 从头部获取项目。
此实施如何允许问题发生?
执行:
using System;
using System.Threading;
#pragma warning disable 420
namespace Tychaia.Threading
{
public class TaskPipeline<T>
{
private int? m_InputThread;
private int? m_OutputThread;
private volatile TaskPipelineEntry<T> m_Head;
/// <summary>
/// Creates a new TaskPipeline with the current thread being
/// considered to be the input side of the pipeline. The
/// output thread should call Connect().
/// </summary>
public TaskPipeline()
{
this.m_InputThread = Thread.CurrentThread.ManagedThreadId;
this.m_OutputThread = null;
}
/// <summary>
/// Connects the current thread as the output of the pipeline.
/// </summary>
public void Connect()
{
if (this.m_OutputThread != null)
throw new InvalidOperationException("TaskPipeline can only have one output thread connected.");
this.m_OutputThread = Thread.CurrentThread.ManagedThreadId;
}
/// <summary>
/// Puts an item into the queue to be processed.
/// </summary>
/// <param name="value">Value.</param>
public void Put(T value)
{
if (this.m_InputThread != Thread.CurrentThread.ManagedThreadId)
throw new InvalidOperationException("Only the input thread may place items into TaskPipeline.");
// Walk the queued items until we find one that
// has Next set to null.
var head = this.m_Head;
while (head != null)
{
if (head.Next != null)
head = head.Next;
if (head.Next == null)
break;
}
if (head == null)
this.m_Head = new TaskPipelineEntry<T> { Value = value };
else
head.Next = new TaskPipelineEntry<T> { Value = value };
}
/// <summary>
/// Takes the next item from the pipeline, or blocks until an item
/// is recieved.
/// </summary>
/// <returns>The next item.</returns>
public T Take()
{
if (this.m_OutputThread != Thread.CurrentThread.ManagedThreadId)
throw new InvalidOperationException("Only the output thread may retrieve items from TaskPipeline.");
// Wait until there is an item to take.
var spin = new SpinWait();
while (this.m_Head == null)
spin.SpinOnce();
// Return the item and exchange the current head with
// the next item, all in an atomic operation.
return Interlocked.Exchange(ref this.m_Head, this.m_Head.Next).Value;
}
}
}
#pragma warning restore 420
失败的测试:
[Test]
public void TestPipelineParallelTo100()
{
var random = new Random();
var pipeline = new TaskPipeline<int>();
var success = true;
int expected = 0, actual = 0;
ThreadStart processor = () =>
{
pipeline.Connect();
for (int i = 0; i < 100; i++)
{
var v = pipeline.Take();
if (v != i)
{
success = false;
expected = i;
actual = v;
break;
}
Thread.Sleep(random.Next(1, 10));
}
};
var thread = new Thread(processor);
thread.Start();
for (int i = 0; i < 100; i++)
{
pipeline.Put(i);
Thread.Sleep(random.Next(1, 10));
}
thread.Join();
if (!success)
Assert.AreEqual(expected, actual);
}