这就是为什么一劳永逸的任务通常是一个坏主意。在您的情况下,它们甚至更糟糕,因为您没有将添加内容包装try/catch
在块中,records.CompleteAdding
这finally
意味着MoveNext
您对枚举数的调用GetConsumingEnumerable
最终将无限期地阻塞 - 这很糟糕。
如果您完全在 C# 的范围内操作,那么解决方案将很简单:更好地分离关注点。您剥离该BlockingCollection
位并在其所属的位置运行它:在消费者(客户端)或中间流水线处理阶段(这最终是您要实现的目标),其设计方式将保持对任何生产者抛出的异常。然后你的GetData
签名保持不变,但它变成了一个简单的阻塞枚举,具有完整的异常传播:
public static IEnumerable<DataRecord> GetData(String serverAddress, Int64 dataID)
{
Boolean isMoreData = false;
do
{
// make server request and process response
// this block can throw
yield return response.record;
isMoreData = response.IsMoreData;
}
while (isMoreData);
}
然后管道看起来像这样:
var records = new BlockingCollection<DataRecord>();
var producer = Task.Run(() =>
{
try
{
foreach (var record in GetData("http://foo.com/Service", 22))
{
// Hand over the record to the
// consumer and continue enumerating.
records.Add(record);
}
}
finally
{
// This needs to be called even in
// exceptional scenarios so that the
// consumer task does not block
// indefinitely on the call to MoveNext.
records.CompleteAdding();
}
});
var consumer = Task.Run(() =>
{
foreach (var record in records.GetConsumingEnumerable())
{
// Do something with the record yielded by GetData.
// This runs in parallel with the producer,
// So you get concurrent download and processing
// with a safe handover via the BlockingCollection.
}
});
await Task.WhenAll(producer, consumer);
现在你可以吃蛋糕了:处理是并行发生的,因为记录是由 产生的GetData
,并且await
生产者任务传播任何异常,而CompleteAdding
在内部调用finally
确保你的消费者不会无限期地陷入阻塞状态。
由于您正在使用 C++,上述内容在一定程度上仍然适用(也就是说,正确的做法是在 C++ 中重新实现管道),但实现可能不是那么漂亮,而且您已经走了有了 ,您的答案很可能是首选的解决方案,即使由于未观察到的任务而感觉像是黑客攻击。我真的无法想到它实际上会出错的场景,因为CompleteAdding
由于新引入的try/catch
.
显然,另一种解决方案是将处理代码移动到您的 C# 项目中,这可能会也可能不会,具体取决于您的架构。