问题标签 [system.threading.channels]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
291 浏览

c# - 如何以正确的方式完成频道?如何使用多个渠道?

我有一个数据列表,想要创建与列表中元素数量相对应的任务数量。但我不知道如何正确完成频道。

我的代码,但通道没有像我预期的那样关闭。

如果我想为每个任务创建单独的频道,我该如何完成它们?请给我一个例子。

0 投票
0 回答
69 浏览

c# - 使用多个渠道,我做错了什么?

我想创建一个任务数组,称为 listTask,listTask 的每个元素都是 A 类型的任务,A 类型的任务由函数 Task.WhenAll 创建。然后我等待 Task.WhenAll(listTask) 但程序不执行 listTask 数组中的工作。我设置了调试,那些任务都完成了,我不明白为什么

我似乎已经提前完成了任务。有时它可以工作,但我不知道为什么作业总是在没有在列表任务代码中运行的情况下完成。

0 投票
2 回答
127 浏览

c# - 通过回调消息代理将推送公开为 IAsyncEnumerable

我正在使用第三方库,它充当发布-订阅消息代理的接口。经纪人是 Solace PubSub+。

对于订阅者,供应商库采用“通过回调推送消息”模式。

我正在围绕供应商库编写自己的包装库,以使其他开发人员更容易使用(隐藏库如何与网络通信的所有内部结构等等)。

同样,我认为将订阅者提要公开IAsyncEnumerableSystem.Threading.Channels. 我有两个担忧:

  1. 这里的渠道合适吗,还是我过度设计了这个?即,是否有更“C# 惯用”的方式来包装回调?
  2. 我的EnumerableBroker包装器实现是安全的,还是我在某个地方陷入了异步陷阱?

我意识到第一个问题可能比 SO 更适合 CodeReview,但由于该问题的答案也与第二个问题有关,因此将它们放在一起似乎是合适的。值得注意的是:我正在避免IObservable/Rx,因为我的目标是让我的界面比供应商的基本,而不是要求其他开发人员和我自己学习 Rx!了解生产者和消费者进程是如何独立的对于中间的通道也是微不足道的,而对于一个可观察的,我的第一个心理过程是“好的,生产者和消费者仍然独立吗?乍一看,我必须现在学习调度程序......天哪,我只使用一个await foreach怎么样?”

这是一个没有 的消费消息的最小模型EnumerableBroker

现在有了最小的复制EnumerableBroker(仍然使用上面列出的相同的模拟Broker类)。这里至少有一个好处似乎是,如果订阅者需要做很多工作来处理消息,它不会占用代理的线程 - 至少在缓冲区填满之前。这似乎可以正常工作,但我已经学会警惕我对异步的有限掌握。

0 投票
2 回答
309 浏览

c# - 引导多个生产者和消费者

我有以下代码:

哪个可以正常工作,但是我希望它在生产的同时消费。然而

阻止消费者运行直到它完成,我想不出一种让它们都运行的方法?

0 投票
2 回答
108 浏览

c# - 给定一个可以停止和启动的外部生产者 API,当本地缓冲区已满时有效地停止生产者

假设我提供了一个由 、 和 方法组成的事件生成器 API Start()Pause()以及Resume()一个ItemAvailable事件。生产者本身是外部代码,我无法控制它的线程。一些项目在被调用后可能仍然通过Pause()(生产者实际上是远程的,所以项目可能已经在网络上传输)。

还假设我正在编写消费者代码,其中消费可能比生产慢。

关键要求是

  1. 消费者事件处理程序不得阻塞生产者线程,并且
  2. 必须处理所有事件(不能删除任何数据)。

我在消费者中引入了一个缓冲区来消除一些突发性。但是在扩展突发的情况下,我想调用Producer.Pause(),然后Resume()在适当的时候调用,以避免在消费者端耗尽内存。

我有一个解决方案,Interlocked用于增加和减少一个计数器,该计数器与一个阈值进行比较以确定是时间到Pause还是Resume.

问题:就效率(和优雅)而言,有没有比Interlocked计数器(在下面的代码中)更好的解决方案?int current

更新的 MVP(不再从限制器中反弹):

0 投票
2 回答
87 浏览

c# - 如何在 .NET 频道上提供计数

我有一个UnboundedChannel读者\多个作者,我希望能够使用Count属性,但由于CanCount属性始终为 false Count,因此会引发异常。我应该怎么做才能使它工作,我找不到文档中列出的任何限制。

0 投票
2 回答
88 浏览

c# - 使用 System.Threading.Channels 时未执行异步方法

由于某种原因,消费者和生产者任务中的代码似乎从未被执行过。我哪里错了?

0 投票
2 回答
126 浏览

c# - 使用 System.Threading.Channels.Channel 中的所有消息

假设我有很多生产者,1 个消费者未绑定 Channel,有一个消费者:

问题是该consume函数会进行一些 IO 访问,并且可能还会进行一些网络访问,因此在消耗 1 条消息之前可能会产生更多消息。但是由于IO资源不能并发访问,所以不能有很多消费者,也不能把consume函数扔到Task中就忘掉了。

consume功能可以轻松修改以获取多条消息并批量处理它们。所以我的问题是,是否有办法让消费者在尝试访问通道队列时获取通道队列中的所有消息,如下所示:

编辑:我能想到的 1 个选项是:

但我觉得应该是一个更好的方法来做到这一点。

0 投票
1 回答
39 浏览

c# - Channel Writer 需要 Task.Delay 并且不处理所有消息

我正在将消息添加到线程通道并不断从该通道读取。我注意到如果我不添加 aTask.Delay那么所有的消息都不会被处理。当它应该是 1000 时,程序将退出可能处理 10 条消息。

每次写入后添加一个Task.Delay似乎很hacky。有没有更好的方法来读取频道中的所有消息,而无需Task.Delay在每次写入后添加一个?

我的StartListener()方法有问题吗?

0 投票
1 回答
69 浏览

c# - ChannelReader.ReadAsync(CancellationToken) 和 ChannelWriter.WriteAsync(CancellationToken) 在发出令牌信号时不返回或抛出

编辑:我正在清理描述,因为我已经确定这也会影响WriteAsync,而不仅仅是ReadAsync......

如果其中一个调用当前正在阻塞——ReadAsync因为通道是空的,或者WriteAsync因为通道已满——那么发出取消令牌的信号不会导致向调用者返回执行。即它不返回值也不抛出。它只会永远阻塞。从另一个线程调用Complete通道将导致阻塞调用 throw ChannelClosedException,但我不清楚为什么发出信号的取消令牌是不够的。

为了进一步增加混淆,该代码实际上像 .NET Fiddle 一样工作,但在 Visual Studio 2019 内部或命令提示符下(两者都在 Windows 10 x64 上)不起作用。

在下面的示例代码中,取消注释Completemain 中的行将允许彻底关闭,但没有它,对 never 的调用将WriteAsync不会返回,因此对 never的调用也会Task.WaitAll返回。

和输出: