5

我有以下代码:

innerExceptions = dbconnByServer
    .AsParallel()
    .WithDegreeOfParallelism(dbconnByServer.Count)
    // A stream of groups of server connections proceeding in parallel per server
    .Select(dbconns => dbconns.Select(dbconn => m_sqlUtilProvider.Get(dbconn)))
    // A stream of groups of SqlUtil objects proceeding in parallel per server
    .Select(sqlUtils => GetTaskException(sqlUtils
        // Aggregate SqlUtil objects to form a single Task which runs the SQL asynchronously for the first SqlUtil, then upon completion
        // for the next SqlUtil and so long until all the SqlUtil objects are processed asynchronously one after another.
        .Aggregate<ISqlUtil, Task>(null, (res, sqlUtil) =>
        {
            if (res == null)
            {
                return sqlUtil.ExecuteSqlAsync(SQL, parameters);
            }
            return res.ContinueWith(_ => sqlUtil.ExecuteSqlAsync(SQL, parameters)).Unwrap();
        })))
    .Where(e => e != null)
    .ToList();

在哪里:

private static Exception GetTaskException(Task t)
{
    try
    {
        t.Wait();
        return null;
    }
    catch (Exception exc)
    {
        return exc;
    }
}

这段代码所做的是跨多个数据库连接执行某些 SQL 语句,其中一些连接可能属于一个数据库服务器,而其他连接可能属于另一个数据库服务器,依此类推。

该代码确保满足两个条件:

  1. SQL 语句在可用的数据库服务器上并行运行。
  2. 在同一个数据库服务器中,SQL 语句异步运行,但按顺序运行。

给定每个数据库服务器的 N 个数据库连接,在聚合结束时将有一个连接Task,执行具有以下效果:

  • 为 db conn 1 执行 SQL
    • 完成上一步后,为 db conn 2 执行 SQL
      • 完成前面的操作后,为 db conn 3 执行 SQL
      • ...
        • 完成上一步后,为 db conn N 执行 SQL

我的问题是,除了第一个数据库连接之外,现在异常丢失了。我知道我应该检查_参数并以某种方式处理_.Exception延续函数内的属性。我想知道是否有一种优雅的方式来做到这一点。

有任何想法吗?

4

1 回答 1

1
  1. 如果你打算使用异步方法,那么整个方法应该是异步的,并且不应该阻塞任何线程。
  2. 如果您不打算阻塞,则没有太多理由使用 PLINQ,从单个线程设置延续应该足够快。
  3. 如果您想在发生异常时继续,您必须自己将异常存储在某个地方。
  4. 我对处理一组异常而不抛出它们感到有点不安,但我想这对于一个可能部分失败和部分成功的操作来说是可以的。

这样,我的代码将如下所示:

public Task<IEnumerable<Exception>> ExecuteOnServersAsync(
    IList<IEnumerable<Connection>> dbConnByServer,
    string sql, object parameters)
{
    var tasks = new List<Task>();
    var exceptions = new ConcurrentQueue<Exception>();

    Action<Task> handleException = t =>
    {
        if (t.IsFaulted)
            exceptions.Enqueue(t.Exception);
    };

    foreach (var dbConns in dbConnByServer)
    {
        Task task = null;

        foreach (var dbConn in dbConns)
        {
            var sqlUtil = m_sqlUtilProvider.Get(dbConn);

            if (task == null)
            {
                task = sqlUtil.ExecuteSqlAsync(sql, parameters);
            }
            else
            {
                task = task.ContinueWith(
                    t =>
                    {
                        handleException(t);
                        return sqlUtil.ExecuteSqlAsync(sql, parameters);
                    }).Unwrap();
            }
        }

        if (task != null)
        {
            task = task.ContinueWith(handleException);
            tasks.Add(task);
        }
    }

    return Task.Factory.ContinueWhenAll(
        tasks.ToArray(), _ => exceptions.AsEnumerable());
}
于 2013-06-08T13:22:28.100 回答