64

看了一堆 LINQ 相关的东西后,我突然意识到没有文章介绍如何编写异步 LINQ 查询。

假设我们使用 LINQ to SQL,下面的语句很清楚。但是,如果 SQL 数据库响应缓慢,那么使用这块代码的线程就会受到阻碍。

var result = from item in Products where item.Price > 3 select item.Name;
foreach (var name in result)
{
    Console.WriteLine(name);
}

似乎当前的 LINQ 查询规范不对此提供支持。

有没有办法做异步编程LINQ?当结果准备好使用时,它的工作方式就像有一个回调通知,而 I/O 没有任何阻塞延迟。

4

4 回答 4

38

虽然 LINQ 本身并没有真正具有此功能,但框架本身确实...您可以轻松地在 30 行左右滚动您自己的异步查询执行器...事实上,我只是为您把它放在一起:)

编辑:通过写这篇文章,我发现了他们为什么没有实现它。它不能处理匿名类型,因为它们是本地的。因此,您无法定义回调函数。 这是一件非常重要的事情,因为很多 linq to sql 的东西在 select 子句中创建它们。以下任何建议都遭受同样的命运,所以我仍然认为这个是最容易使用的!

编辑:唯一的解决方案是不使用匿名类型。您可以将回调声明为仅采用 IEnumerable(无类型参数),并使用反射来访问字段(ICK !!)。另一种方法是将回调声明为“动态”......哦......等等......这还没有。:) 这是另一个很好的例子,说明如何使用动态。有些人可能称之为滥用。

将其放入您的实用程序库中:

public static class AsynchronousQueryExecutor
{
    public static void Call<T>(IEnumerable<T> query, Action<IEnumerable<T>> callback, Action<Exception> errorCallback)
    {
        Func<IEnumerable<T>, IEnumerable<T>> func =
            new Func<IEnumerable<T>, IEnumerable<T>>(InnerEnumerate<T>);
        IEnumerable<T> result = null;
        IAsyncResult ar = func.BeginInvoke(
                            query,
                            new AsyncCallback(delegate(IAsyncResult arr)
                            {
                                try
                                {
                                    result = ((Func<IEnumerable<T>, IEnumerable<T>>)((AsyncResult)arr).AsyncDelegate).EndInvoke(arr);
                                }
                                catch (Exception ex)
                                {
                                    if (errorCallback != null)
                                    {
                                        errorCallback(ex);
                                    }
                                    return;
                                }
                                //errors from inside here are the callbacks problem
                                //I think it would be confusing to report them
                                callback(result);
                            }),
                            null);
    }
    private static IEnumerable<T> InnerEnumerate<T>(IEnumerable<T> query)
    {
        foreach (var item in query) //the method hangs here while the query executes
        {
            yield return item;
        }
    }
}

你可以像这样使用它:

class Program
{

    public static void Main(string[] args)
    {
        //this could be your linq query
        var qry = TestSlowLoadingEnumerable();

        //We begin the call and give it our callback delegate
        //and a delegate to an error handler
        AsynchronousQueryExecutor.Call(qry, HandleResults, HandleError);

        Console.WriteLine("Call began on seperate thread, execution continued");
        Console.ReadLine();
    }

    public static void HandleResults(IEnumerable<int> results)
    {
        //the results are available in here
        foreach (var item in results)
        {
            Console.WriteLine(item);
        }
    }

    public static void HandleError(Exception ex)
    {
        Console.WriteLine("error");
    }

    //just a sample lazy loading enumerable
    public static IEnumerable<int> TestSlowLoadingEnumerable()
    {
        Thread.Sleep(5000);
        foreach (var i in new int[] { 1, 2, 3, 4, 5, 6 })
        {
            yield return i;
        }
    }

}

现在打算把它放在我的博客上,非常方便。

于 2008-10-31T02:05:54.047 回答
16

TheSoftwareJedi 和ulrikb的(又名 user316318)解决方案适用于任何 LINQ 类型,但(正如 Chris Moschini所指出的)不会委托给利用 Windows I/O 完成端口的底层异步调用。

Wesley Bakker 的Asynchronous DataContext文章(由Scott Hanselman 的博客文章触发)描述了使用 sqlCommand.BeginExecuteReader/sqlCommand.EndExecuteReader 的 LINQ to SQL 类,它利用了 Windows I/O 完成端口。

I/O 完成端口为在多处理器系统上处理多个异步 I/O 请求提供了一个高效的线程模型。

于 2011-07-03T08:21:16.227 回答
7

根据Michael Freidgeim 的回答和 Scott Hansellman 提到的博客文章async以及您可以使用/的事实,await您可以实现可重用的方法,该方法以异步ExecuteAsync<T>(...)方式执行底层:SqlCommand

protected static async Task<IEnumerable<T>> ExecuteAsync<T>(IQueryable<T> query,
    DataContext ctx,
    CancellationToken token = default(CancellationToken))
{
    var cmd = (SqlCommand)ctx.GetCommand(query);

    if (cmd.Connection.State == ConnectionState.Closed)
        await cmd.Connection.OpenAsync(token);
    var reader = await cmd.ExecuteReaderAsync(token);

    return ctx.Translate<T>(reader);
}

然后你可以像这样(重新)使用它:

public async Task WriteNamesToConsoleAsync(string connectionString, CancellationToken token = default(CancellationToken))
{
    using (var ctx = new DataContext(connectionString))
    {
        var query = from item in Products where item.Price > 3 select item.Name;
        var result = await ExecuteAsync(query, ctx, token);
        foreach (var name in result)
        {
            Console.WriteLine(name);
        }
    }
}
于 2016-07-30T21:54:45.467 回答
4

我启动了一个名为Asynq的简单 github 项目来执行异步 LINQ-to-SQL 查询。这个想法很简单,尽管在这个阶段“脆弱”(截至 2011 年 8 月 16 日):

  1. 让 LINQ-to-SQLIQueryable完成DbCommand通过DataContext.GetCommand().
  2. 对于 SQL 200[058],从DbCommand您获得的抽象实例中向上GetCommand()转换以获得SqlCommand. 如果您使用的是 SQL CE,那么您就不走运了,因为SqlCeCommand它没有公开BeginExecuteReaderand的异步模式EndExecuteReader
  3. 使用BeginExecuteReaderEndExecuteReader关闭SqlCommand使用标准 .NET 框架异步 I/O 模式来让自己DbDataReader在传递给BeginExecuteReader方法的完成回调委托中获得一个。
  4. 现在我们有了一个DbDataReader,我们不知道它包含哪些列,也不知道如何将这些值映射回IQueryable's ElementType(在连接的情况下很可能是匿名类型)。当然,此时您可以手动编写自己的列映射器,将其结果具体化回您的匿名类型或其他类型。您必须为每种查询结果类型编写一个新的,具体取决于 LINQ-to-SQL 如何处理您的 IQueryable 以及它生成的 SQL 代码。这是一个非常讨厌的选项,我不推荐它,因为它不可维护,也不总是正确的。LINQ-to-SQL 可以根据您传入的参数值更改您的查询形式,例如query.Take(10).Skip(0)生成不同的 SQLquery.Take(10).Skip(10),也许还有不同的结果集架构。最好的办法是以编程方式处理这个物化问题:
  5. “重新实现”一个简单的运行时对象物化器,它DbDataReader根据. 正确实现这一点可能是该解决方案中最具挑战性的部分。ElementTypeIQueryable

正如其他人发现的那样,该DataContext.Translate()方法不处理匿名类型,只能将 aDbDataReader直接映射到属性正确的 LINQ-to-SQL 代理对象。DataContext.Translate()由于大多数值得在 LINQ 中编写的查询都将涉及复杂的连接,最终不可避免地需要匿名类型用于最终的 select 子句,因此无论如何使用这种提供的淡化方法是毫无意义的。

在利用现有成熟的 LINQ-to-SQL IQueryable 提供程序时,此解决方案存在一些小缺点:

  1. 您不能将单个对象实例映射到您的最终选择子句中的多个匿名类型属性IQueryable,例如from x in db.Table1 select new { a = x, b = x }。LINQ-to-SQL 在内部跟踪哪些列序号映射到哪些属性;它不会将此信息公开给最终用户,因此您不知道其中的哪些列DbDataReader被重用,哪些是“不同的”。
  2. DbDataReader您不能在最终IQueryable的select 子句中包含常量值 - 这些不会被翻译成 SQL 并且不会出现Expression在麻烦,根本没有道理。

我确信还有其他查询模式可能会中断,但这是我能想到的两个最大的可能会导致现有 LINQ-to-SQL 数据访问层出现问题的模式。

这些问题很容易解决 - 只需不要在查询中执行它们,因为这两种模式都不会对查询的最终结果带来任何好处。希望这个建议适用于所有可能导致对象实现问题的查询模式:-P。无法访问 LINQ-to-SQL 的列映射信息是一个很难解决的问题。

解决问题的更“完整”的方法是有效地重新实现几乎所有的 LINQ-to-SQL,这有点耗时:-P。从高质量的开源 LINQ-to-SQL 提供程序实施开始将是一个很好的方法。您需要重新实现它的原因是,您可以访问所有用于将DbDataReader结果具体化到对象实例的列映射信息,而不会丢失任何信息。

于 2011-08-16T21:15:07.947 回答