问题标签 [system.threading.channels]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
772 浏览

c# - 如果在 X 分钟内没有新项目进入“频道”,如何读取“频道”中小于批量大小的剩余项目

我正在使用ChannelfromSystem.Threading.Channels并希望批量读取项目(5 个项目),我有如下方法,

在 asp.net 核心后台服务中,我正在使用它,如下所示,

这是有效的,只要有 5 个项目Channel,我就会得到range

问题是,当只剩下 2 个项目Channel并且自最后 10 分钟以来没有项目出现时Channel,那么如何读取剩余的 2 个项目Channel

0 投票
0 回答
204 浏览

c# - 如果数据库调用失败,我们可以将项目移回 Channel

我在 asp.net core 3.1 应用程序中使用Channelsystem.threading.channels我不断地写入通道并从中读取。

当我从 中读取项目时channel,我正在调用数据库来保存该项目。

问题是,如果数据库调用失败,我可以移动到 item back tochannel重新读取它吗?

0 投票
1 回答
316 浏览

c# - 是否“system.threading.channels”将解决跨进程应用程序的 pub sub 问题

我有一个 asp.net 核心应用程序“A”,它每 1 分钟在一个文件夹中生成文件。

应用程序“B”想要一个通知或文件详细信息我们生成了什么文件以及该文件的一些哈希信息。基于此通知,应用程序“B”想要处理文件。

我正在考虑一些发布/订阅机制,我想要非常轻量级的组件,其中应用程序“A”将发布文件相关信息,应用程序“B”将订阅和收听。

“system.threading.channels”会解决这个问题吗?

0 投票
1 回答
83 浏览

c# - 为什么我的工人工作分配计数不计此 System.Threading.Channel 示例中生产的项目数?

这篇文章之后,我一直在玩System.Threading.Channel以获得足够的信心并在我的生产代码中使用它,替换Threads/Monitor.Pulse/Wait我目前使用的基于方法(在引用的帖子中描述)。

基本上,我创建了一个有界通道的示例,在其中我在开始时运行了几个生产者任务,并且无需等待,就启动了我的消费者任务,这些任务开始从通道中推送元素。在等待生产者任务完成后,我会向通道发出完成信号,因此消费者任务可以退出监听新的通道元素。

我的频道是 a Channel<Action>,在每个操作中,我都会增加WorkDistribution并发字典中每个给定工作人员的计数,并在示例结束时打印它,这样我就可以检查我消耗了多少我预期的项目,以及如何渠道在消费者之间分配动作。

由于某种原因,此“工作分配页脚”打印的项目数量与生产者任务生产的项目总数不同。

我错过了什么?添加的一些变量的唯一目的是帮助排除故障。

这是完整的代码:

这是 Program.cs 中的调用代码

0 投票
1 回答
95 浏览

.net-core - 使用 VS 2019 Worker 模板,后台服务正在退出

使用 .Net core 3.1 和 Visual Studio 2019 工作模板。

我按照网上找到的示例代码进行操作。后台任务 MessageReadingService 从 Amazon SQS 读取消息。消息被写入 ChannelReader。后台任务 MessageProcessingService,从 ChannelReader 读取消息并对其进行处理。

这对第一条消息按预期工作。在 MessageProcessingService 中设置断点会在第一条消息之后停止,但不会再停止。我假设任务正在退出。

我的代码遵循示例代码的模式(效果很好)。任何的意见都将会有帮助。也许是一种调试技术?谢谢!

0 投票
1 回答
529 浏览

c# - 将 Channels 与 SignalR 服务器到客户端流式传输一起使用时,是否保证将服务器端 Complete 传递给客户端?

我正在使用System.Threading.Channel.NET 客户端进行 SignalR 服务器到客户端的流式传输。用法相当基本,类似于介绍性文档中描述的内容。

集线器代码与此类似:

和客户端类似:

当我的集线器方法完成流式传输时,它会调用Complete()它的ChannelWriter. SignalR 大概是在内部看到Complete对相应 的调用ChannelReader,将其转换为内部 SignalR 消息并将其传递给客户端。然后,SignalR将客户端自己ChannelReader的代码标记为完成,我的客户端代码将自己的工作封装在流上。

从服务器到客户端的“已完成”通知是否可以保证交付?在集线器向客户端广播非流式消息的其他情况下,它通常“触发并忘记”,但我必须假设调用Complete流式传输Channel已确认传递,否则客户端可能处于它的状态ChannelReader当服务器将流视为关闭时,无限期地保持流打开。

这个问题不太重要,但我问的原因是我试图缩小这种情况,即消耗 SignalR 流接口的数据流管道偶尔会挂起,而且似乎唯一挂起的地方是SignalR 客户端的某处。

0 投票
1 回答
130 浏览

c# - 如何在并发字典中存储通道

这是一个相当棘手的问题,但我试图理解的概念相当简单。

我有一个公共静态并发字典,可以从其他类和线程访问:

我有一个特定的后台线程,我最初创建一个新的 C# Channel 并将实例存储在字典中。在同一个线程中(在同一个方法中),我开始异步等待消息。从方法中删除了一些不必要的代码膨胀:

第一个绊脚石是我无法将静态字典定义为“对象”作为通道,即 ConcurrentDictionary<int, Channel>... 因为我会收到警告信号“通道:静态类型不能用作类型参数" 所以这就是为什么我声明了一个对象类型。

这似乎在没有 VS 代码警告的情况下工作,直到我尝试从另一个类(以及另一个线程)访问字典对象:

在此处输入图像描述

我不知何故需要将对象投射到我试图访问的频道,以便我可以向它写一条消息,或者其他一些防止警告的解决方案。我是频道的新手,但立即看到使用它们会有多么强大。

0 投票
1 回答
260 浏览

c# - ChannelReader 完成任务在 OperationCanceledException 之后永远不会完成

如果我打电话Stop()OperationCanceledException是发生了,_writer.TryComplete(exp)是真的。但是_reader.Completion Task还没有完成。

这是频道的期望行为吗?如果是的话,有人可以告诉我如何在Channel不等到它为空并使其Completion Task处于Completed状态的情况下停止它吗?

0 投票
2 回答
788 浏览

c# - ChannelReader.ReadAllAsync(CancellationToken) 实际上并未取消中间迭代

我一直在研究一个在频道中排队耗时的工作的功能,在那里我使用例如迭代频道 await foreach(var item in channel.Reader.ReadAllAsync(cancellationToken)) {...}

我期待当通过 that 请求取消时cancellationTokenReadAllAsync会在取消之后的第一次迭代中抛出。

在我看来,情况并非如此。循环一直持续到处理完所有项目,然后抛出一个OperationCanceledException.

这看起来有点奇怪,至少可以这么说。从ChannelReadergithub repo可以看到取消令牌标记有[EnumeratorCancellation]属性,因此应该将其传递给周围生成的状态机yield return item;(如果我错了,请纠正我)。

我的问题是,这是(有点)正常的行为ReadAllAsync(CancellationToken),还是我错过了什么?

这是一个演示问题的简单测试代码(在 dotnetfiddle 上尝试):

这是上面的输出。请注意在中间取消项目后如何继续获取项目:

0 投票
3 回答
846 浏览

c# - 具有 CancellationTokenSource 的通道在处理后具有超时内存泄漏

完整的可重现代码在 github 上,启动可执行文件后内存很快就会飙升。代码主要位于AsyncBlockingQueue.cs类中。

以下代码实现了一个简单的异步“阻塞”队列:

以这种方式使用时,会出现内存泄漏:

在此处输入图像描述