2

我有一种方法可以从服务器中分块提取数据并将其返回以进行处理。我做了一些测量,发现在后台下载块并通过BlockingCollection<T>. 这允许客户端和服务器同时工作,而不是相互等待。

public static IEnumerable<DataRecord> GetData(String serverAddress, Int64 dataID)
{
    BlockingCollection<DataRecord> records = new BlockingCollection<DataRecord>();

    Task.Run(
        () =>
        {
            Boolean isMoreData = false;
            do
            {
                // make server request and process response
                // this block can throw

                records.Add(response.record);
                isMoreData = response.IsMoreData;
            }
            while (isMoreData);

            records.CompleteAdding();
        });

    return records.GetConsumingEnumerable();
}

调用者(一个 C++/CLI 库)应该知道发生了异常,以便它可以重试或酌情退出。在最小限度地更改返回类型的同时将异常传播给调用者的最佳方法是什么?

4

2 回答 2

2

这就是为什么一劳永逸的任务通常是一个坏主意。在您的情况下,它们甚至更糟糕,因为您没有将添加内容包装try/catch在块中,records.CompleteAddingfinally意味着MoveNext您对枚举数的调用GetConsumingEnumerable最终将无限期地阻塞 - 这很糟糕。

如果您完全在 C# 的范围内操作,那么解决方案将很简单:更好地分离关注点。您剥离该BlockingCollection位并在其所属的位置运行它:在消费者(客户端)或中间流水线处理阶段(这最终是您要实现的目标),其设计方式将保持对任何生产者抛出的异常。然后你的GetData签名保持不变,但它变成了一个简单的阻塞枚举,具有完整的异常传播:

public static IEnumerable<DataRecord> GetData(String serverAddress, Int64 dataID)
{
    Boolean isMoreData = false;
    do
    {
        // make server request and process response
        // this block can throw

        yield return response.record;
        isMoreData = response.IsMoreData;
    }
    while (isMoreData);
}

然后管道看起来像这样:

var records = new BlockingCollection<DataRecord>();

var producer = Task.Run(() =>
{
    try
    {
        foreach (var record in GetData("http://foo.com/Service", 22))
        {
            // Hand over the record to the
            // consumer and continue enumerating.
            records.Add(record);
        }
    }
    finally
    {
        // This needs to be called even in
        // exceptional scenarios so that the
        // consumer task does not block
        // indefinitely on the call to MoveNext.
        records.CompleteAdding();
    }
});

var consumer = Task.Run(() =>
{
    foreach (var record in records.GetConsumingEnumerable())
    {
        // Do something with the record yielded by GetData.
        // This runs in parallel with the producer,
        // So you get concurrent download and processing
        // with a safe handover via the BlockingCollection.
    }
});

await Task.WhenAll(producer, consumer);

现在你可以吃蛋糕了:处理是并行发生的,因为记录是由 产生的GetData,并且await生产者任务传播任何异常,而CompleteAdding在内部调用finally确保你的消费者不会无限期地陷入阻塞状态。

由于您正在使用 C++,上述内容在一定程度上仍然适用(也就是说,正确的做法是在 C++ 中重新实现管道),但实现可能不是那么漂亮,而且您已经走了有了 ,您的答案很可能是首选的解决方案,即使由于未观察到的任务而感觉像是黑客攻击。我真的无法想到它实际上会出错的场景,因为CompleteAdding由于新引入的try/catch.

显然,另一种解决方案是将处理代码移动到您的 C# 项目中,这可能会也可能不会,具体取决于您的架构。

于 2014-07-09T22:59:48.487 回答
0

我发现的最简单的解决方案是返回一个DataResult上下文,在枚举其记录后可能包含一个异常。

public class DataResult
{
    internal DataResult(IEnumerable<DataRecord> records)
    {
        Records = records;
    }

    public IEnumerable<DataRecord> Records { get; private set; }
    public Exception Exception { get; internal set; }
}

public static DataResult GetData(String serverAddress, Int64 dataID)
{
    BlockingCollection<DataRecord> records = new BlockingCollection<DataRecord>();
    DataResult result = new DataResult(records.GetConsumingEnumerable());

    Task.Run(
        () =>
        {
            try
            {
                Boolean isMoreData = false;
                do
                {
                    // make server request and process response
                    // this block can throw

                    records.Add(response.record);
                    isMoreData = response.IsMoreData;
                }
                while (isMoreData);
            }
            catch (Exception ex)
            {
                result.Exception = ex;
            }
            finally
            {
                records.CompleteAdding();
            }
        });

    return result;
}

如果出现异常,调用者 (C++/CLI) 可以重新抛出它。

void Caller()
{
    DataResult^ result = GetData("http://foo.com/Service", 22);

    foreach (DataRecord record in result->Records)
    {
        // process records
    }

    Exception^ ex = result->Exception;
    if (ex != nullptr)
    {
        throw ex;
    }
}
于 2014-07-09T16:30:37.543 回答