在尝试创建一个简单的管道时BlockingCollection<T>
,我使用 Task Parallel Library 遇到了一个可量化和可重复的问题ConcurrentQueue<T>
。GetConsumingEnumerable
简而言之,从一个线程将条目添加到默认值BlockingCollection<T>
(在引擎盖下依赖于 a ConcurrentQueue<T>
)并不能保证它们会BlockingCollection<T>
从另一个调用该GetConsumingEnumerable()
方法的线程中弹出。
我创建了一个非常简单的 Winforms 应用程序来重现/模拟它,它只是将整数打印到屏幕上。
Timer1
负责排队工作项......它使用一个并发字典_tracker
,以便它知道它已经添加到阻塞集合中的内容。Timer2
只是记录BlockingCollection
&的计数状态_tracker
- START 按钮启动 a
Paralell.ForEach
,它简单地遍历阻塞集合GetConsumingEnumerable()
并开始将它们打印到第二个列表框。 - STOP 按钮停止
Timer1
阻止更多条目被添加到阻塞集合中。
public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}
以下是事件的顺序:
- 按开始
- Timer1 滴答和 ListBox1 立即更新 3 条消息(添加 0、1、2)
- ListBox2 随后更新了 3 条消息,间隔 1 秒
- 处理 0
- 处理 1
- 处理 2
- Timer1 滴答和 ListBox1 立即更新 3 条消息(添加 3、4、5)
- ListBox2 用 2 条消息更新,间隔 1 秒
- 处理 3
- 处理 4
- 处理 5未打印...似乎已经“丢失”
- 按 STOP 以防止计时器 1 添加更多消息
- 等等...“处理中 5”仍然没有出现
您可以看到并发字典仍在跟踪 1 项尚未处理并随后从中删除_tracker
如果我再次按开始,则 timer1 开始添加更多 3 个条目,并行循环恢复活力,打印 5、6、7 和 8。
我完全不知道为什么会发生这种情况。再次调用 start 显然调用了一个 newtask,它调用了一个 Paralell foreach,并重新执行 GetConsumingEnumerable(),它神奇地找到了丢失的条目......我
为什么BlockingCollection.GetConsumingEnumerable()
不保证迭代添加到集合中的每个项目。
为什么添加更多条目随后会导致它“解开”并继续处理?