问题标签 [iasyncenumerable]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
876 浏览

c# - 为什么我的 IAsyncEnumerable在 EF Core 中调用不是异步枚举?

我正在编写一个 C# 方法,该方法从 SQL 查询(不是直接!)中流式传输大量行,DBSet<T>对它们执行一些转换,然后将结果写入 MongoDB 数据库。我试图让它尽快运行,并且由于网络延迟相当高,我想避免多次返回 SQL Server。

我有一个类,StreamlinedCrmTicket它表示一个 DTO,EF 将原始 SQL 查询的结果投影到该 DTO 上,该查询不接受参数化输入。我正在使用 EF Core 3.1.6 和.Set<StreamlinedCrmTicket>执行原始 SQL 查询的技术。.AsNoTracking()然后,鉴于这只是一个读取操作,我将其用于性能提升。最后,我调用.AsAsyncEnumerable(),并将整个 shebang 包装在一个await foreach中,而后者又存在于一个标记为 的方法中async

整个事情看起来像这样:

我的原始 SQL 查询的源表当前包含大约 120 万行。当使用 SSMS 测量时,有一些连接似乎对查询的执行时间几乎没有变化。

当我执行我的代码时,似乎 EF 启动了查询,但 foreach 循环的主体,无论它包含什么,都不会开始执行,直到整个查询已执行并从 SQL Server 接收到结果集。这违背了我使用 IAsyncEnumerable 的目的!我的理解是 IAsyncEnumerable 应该允许我在行(或实体)从数据库返回时对其进行操作,而无需等待整个结果集。

一些支持我的理论的想法,即目前这不是异步行为:

  • 一旦调用_affinityContext.Set<StreamlinedCrmTicket>().FromSqlRaw(query).AsNoTracking().AsAsyncEnumerable().WithCancellation(cancellationToken)完成,就会开始大量的网络 IO。我可以在我的 Windows 机器上的性能监视器中看到,IO 是与我的代码应该运行的服务器的 SQL Server 连接。
  • 我将foreach循环体换成了一个非常简单的循环体,它只在网络 IO 停止后运行。
  • ORDER BY从 SQL 查询中删除了所有子句——在这个用例中行排序无关紧要,我担心这可能会导致查询在返回第一行之前需要很长时间,从而产生同步运行的错觉。然而,网络 IO 表明这不是(而且不是 - 我把条款省略了!)的情况。
  • 如果我在查询中添加一个TOP 1000语句SELECT,它的执行速度会更快。

我不确定为什么这是同步运行的,而且微软网站上的文档似乎很差!

0 投票
2 回答
1145 浏览

c# - 如何安全地迭代 IAsyncEnumerable 以向下游发送集合以批量处理消息

我看过LINQ 上与 IAsyncEnumerable的聊天,这让我对处理 IAsyncEnumerables 的扩展方法有了一些了解,但坦率地说,对于现实世界的应用程序来说不够详细,尤其是就我的经验水平而言,我理解示例/ IAsyncEnumerables 的文档目前还不存在

我正在尝试从文件中读取,对流进行一些转换,返回 a IAsyncEnumerable,然后在获得任意数量的对象后将这些对象发送到下游,例如:

到目前为止,我对此事的理解是,该await foreach行正在处理使用Tasks(或ValueTasks)的数据,因此我们没有预先计算。我也对使用 List 变量犹豫不决,只是对其进行长度检查,因为跨线程共享该数据似乎不是线程安全的。

我正在使用该System.Linq.Async软件包,希望可以使用相关的扩展方法。我可以看到一些TakeWhile以 .

任何帮助或朝着正确的方向推动将不胜感激,谢谢。

0 投票
1 回答
886 浏览

asynchronous - System.Threading.Channels ReadAsync() 方法正在阻塞执行

概述

我正在尝试IAsyncEnumerable<T>围绕接口编写一个包装器IObserver<T>。起初我使用 aBufferBlock<T>作为后备数据存储,但通过性能测试和研究发现它实际上是一个非常慢的类型,所以我决定System.Threading.Channels.Channel试一试。我的 BufferBlock 实现有一个与这个类似的问题,但这次我不知道如何解决它。

问题

如果我的方法尚未写入,我GetAsyncEnumerator()的循环会被await _channel.Reader.WaitToRead(token)调用阻塞。在不阻塞程序执行的情况下等待一个值在此上下文中可用的正确方法是什么?IObserver<T>.OnNext()_channel

执行

在此处输入图像描述

附加上下文

没关系,但在运行时IObservable<T>传递给构造函数的实例是CimAsyncResult从对 api 进行的异步调用返回的Microsoft.Management.Infrastructure。这些使用了我试图用花哨的新异步枚举模式包装的 Observer 设计模式。

编辑

更新了对调试器输出的日志记录,并OnNext()按照一位评论者的建议使我的方法异步/等待。你可以看到它永远不会进入while()循环。

0 投票
2 回答
600 浏览

c# - 在 C#8 IAsyncEnumerable 中并行化收益返回

我有一个返回异步枚举器的方法

和来电者:

因为DoWork这是一项昂贵的操作,我更喜欢以某种方式并行化它,所以它的工作方式类似于:

但是我不能从内部进行收益回报,Parallel.Foreach所以只是想知道最好的方法是什么?

返回结果的顺序无关紧要。

谢谢。

编辑:对不起,我遗漏了一些代码DoWorkAsync,它确实在等待我没有把它放在上面的代码中的东西,因为这与问题不太相关。现已更新

Edit2: DoWork在我的情况下,主要是 I/O 绑定,它从数据库中读取数据。

0 投票
1 回答
103 浏览

c# - 如何在实际迭代发生之前验证 IAsyncEnumerable 返回方法的参数?

我有一个简单的场景,我有一个具有以下方法的类:

是否可以ArgumentException在方法调用时准确地抛出GetEntities(),而不是在迭代的第一步之后,像这里:

我问是因为当我想返回IAsyncEnumerable到我的 API 控制器时,异常实际上是在框架代码中引发的。我没有机会抓住它,并返回一个 HTTP 404 BAD REQUEST 代码。当然,我可以在请求管道中拦截异常,但有时我想根据它们来自的抽象层将它们包装在其他异常中。

0 投票
2 回答
101 浏览

c# - 有没有办法在 IAsyncEnumerator 中使用 System.Diagnostics.Process?

首先,我的目标是 .Net Core 3.1 和 C#8。

我想要这样的东西。

当我尝试此设置时,我收到错误await foreach(string result in RunProcessAsync(args))

CS8417 'Process':异步 using 语句中使用的类型必须隐式转换为 'System.IAsyncDisposable' 或实现合适的 'DisposeAsync' 方法。

这个错误来自yield return e.Data;

CS1621 不能在匿名方法或 lambda 表达式中使用 yield 语句

目标是这样的。我有一个执行一些操作并将信息写入错误输出流的 exe(不确定这是否是它的真实名称)。我想在写入时接受这些写入,解析它们以获得我想要的信息并将它们存储在一个对象中以供以后使用。

我是一个非常新手的编码器,对异步编码非常陌生。我测试了RunProcessAsync但以同步方式的功能;它被调用的地方,只是将所有原始数据写入输出窗口,而不将任何数据返回给调用方法。那工作得很好。另外,我得到了一个使用 的测试 asyc 流IAsyncEnumerable,但它只是使用Task.Delay并返回了一些整数。现在我正在尝试将这些东西结合起来,而我缺乏经验正在妨碍我。

感谢大家提供的任何帮助以及帮助提高 C# 的技能和知识。

0 投票
3 回答
468 浏览

c# - 如何在 IAsyncEnumerable 发射器函数中正确使用 NpgsqlTransaction?

我不需要捕获异常,但如果有异常,我确实需要回滚:

(此代码是我正在做的简化版本,用于说明目的)

这失败并显示以下消息:

不能在带有 catch 子句的 try 块的主体中​​产生值


这类似于从 try/catch 块返回的 Yield,但它具有新颖的上下文:

  • “IAsyncEnumerable”相对较新。
  • Postgresql(答案使用内部属性)
  • 这个问题有一个更好的标题,明确提到“交易”上下文。具有相同错误消息的其他上下文不会有相同的答案。

这与为什么不能在带有 catch 的 try 块中出现 yield return 不同?. 就我而言,上下文更具体:我需要 catch 块来回滚,而不是做任何其他事情。此外,如您所见,我已经知道答案并将其创建为问答组合。正如您从答案中看到的那样,该答案与为什么不能在带有 catch 的 try 块内出现 yield return 无关?

0 投票
1 回答
779 浏览

parallel-processing - 如何使用 IAsyncEnumerable 进行并行化

我有一种情况,我正在启动Task-s,我希望他们的结果以某种方式尽快在数据结构中传输/排队,而不关心他们的顺序。

IAsyncEnumerable适合这种情况吗?

没有IAsyncEnumerable

IAsyncEnumerable

是否有任何性能提升,IAsyncEnumerable因为我最终希望在所有调用完成之前不进一步使用算法async,但我希望它们并行执行,因此而不是按顺序等待所有它们,我想只需等待最长的完成即可。

PS在这种情况下,我需要ConcurrentQueue/locking 吗?

0 投票
2 回答
554 浏览

c# - 可以为多个消费者缓存 IAsyncEnumerable 吗?

我正在研究用 IAsyncEnumerable 替换某些常规 C# 事件模式实例的效果。这将通过延迟实例化/激活 IAsyncEnumerable 并缓存该引用以供所有调用者/侦听器使用来完成。一些快速测试(见下文)表明这是可行的,但我还没有看到其他以这种方式使用 IAsyncEnumerable 的在线示例。

我意识到这并不是创建 IAsyncEnumerable 的目的,在这种情况下,大多数人会提倡 ReactiveX ( https://github.com/dotnet/reactive )。但是,我希望分析一下为什么要或不希望按照描述的方式执行此操作(而不仅仅是如何使用 Rx 执行此操作)。我在下面提供了几个例子。我的候选事件模式替换是一个事件流(例如从串行连接或 UDP 套接字等产生的反序列化消息)

示例 1:

这会产生我希望作为此模式的用户的输出:

前面的示例将每个使用者放在不同的线程上,但根据上下文(可能是 WPF 应用程序),同一线程上可能有多个使用者(IEnumerable 不可能,但使用 IAsyncEnumerable 打开门)。以下是在控制台应用程序中,但可以想象在 WPF 应用程序的 UI 线程上创建生产者和消费者。

示例 2:

同样,这是我作为用户想要的输出:

像这样的模式固有的一个好处是消费者/调用者可以让他们的回调/item-of-type-T-handling-code 出现在他们自己的 SynchronizationContext 中。SerialPort 或 Timer 或其他来源的事件通常会发生在后台线程上,而用户(尤其是在 UI 线程上)可能需要执行他们自己的同步。在这种情况下,UI 线程上的使用者总是可以让他们的代码发生在 UI 线程上,而控制台应用程序中的用户将让它发生在线程池上。

我错过了什么吗?

0 投票
3 回答
295 浏览

c# - 如何使用 2 个数据源返回异步流

我有以下函数,它以异步流的形式返回标准输出数据,该数据由运行System.Diagnostics.Process. 当前方法中的所有内容都按预期工作;我可以在一个循环中调用它,await foreach()我得到的每一行输出都是由外部 exe 生成的。

我的问题是现在我需要它来返回标准输出和标准错误结果。我创建了这个类来保存每个流中的数据。

并变成ProcessAsyncStream()了这样

问题是,如果任何一个缓冲区在另一个缓冲区完成之前完成,而不是该方法挂起,因为该缓冲区.Receive()没有任何数据要接收。如果我将while条件更改为&&then 我将不会从另一个缓冲区获取所有数据。

有什么建议么?