问题标签 [blockingcollection]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
492 浏览

c# - BlockingCollection,竞争条件?

我已经使用 BlockingCollection 实现了生产者/消费者模式,但它似乎并没有像我预期的那样阻塞。

我有一个线程从网络摄像头接收帧并将它们添加到 BlockingCollection

在另一个线程中,我引用了集合并使用处理框架

但是,正如您在下面看到的,它往往会抛出一个 InvalidOperationException 告诉我我正在拉的 Frame 正在其他地方使用。

img http://i17.photobucket.com/albums/b52/orubap/2012-03-24_020858.png

它并不总是立即发生,但我注意到它仅在队列为空或接近空时发生(即消费者比生产者快)所以我猜这与添加的第一个图像或拍摄的最后一张照片。任何想法为什么会发生这种情况?

0 投票
4 回答
12059 浏览

c# - 为什么迭代 GetConsumingEnumerable() 不能完全清空底层阻塞集合

在尝试创建一个简单的管道时BlockingCollection<T>,我使用 Task Parallel Library 遇到了一个可量化和可重复的问题ConcurrentQueue<T>GetConsumingEnumerable

简而言之,从一个线程将条目添加到默认值BlockingCollection<T>(在引擎盖下依赖于 a ConcurrentQueue<T>)并不能保证它们会BlockingCollection<T>从另一个调用该GetConsumingEnumerable()方法的线程中弹出。

我创建了一个非常简单的 Winforms 应用程序来重现/模拟它,它只是将整数打印到屏幕上。

  • Timer1负责排队工作项......它使用一个并发字典_tracker,以便它知道它已经添加到阻塞集合中的内容。
  • Timer2只是记录BlockingCollection&的计数状态_tracker
  • START 按钮启动 a Paralell.ForEach,它简单地遍历阻塞集合GetConsumingEnumerable()并开始将它们打印到第二个列表框。
  • STOP 按钮停止Timer1阻止更多条目被添加到阻塞集合中。

以下是事件的顺序:

  • 按开始
  • Timer1 滴答和 ListBox1 立即更新 3 条消息(添加 0、1、2)
  • ListBox2 随后更新了 3 条消息,间隔 1 秒
    • 处理 0
    • 处理 1
    • 处理 2
  • Timer1 滴答和 ListBox1 立即更新 3 条消息(添加 3、4、5)
  • ListBox2 用 2 条消息更新,间隔 1 秒
    • 处理 3
    • 处理 4
    • 处理 5未打印...似乎已经“丢失”
  • 按 STOP 以防止计时器 1 添加更多消息
  • 等等...“处理中 5”仍然没有出现

缺少条目

您可以看到并发字典仍在跟踪 1 项尚未处理并随后从中删除_tracker

如果我再次按开始,则 timer1 开始添加更多 3 个条目,并行循环恢复活力,打印 5、6、7 和 8。

在随后的项目被推到它后面后返回条目

我完全不知道为什么会发生这种情况。再次调用 start 显然调用了一个 newtask,它调用了一个 Paralell foreach,并重新执行 GetConsumingEnumerable(),它神奇地找到了丢失的条目......我

为什么BlockingCollection.GetConsumingEnumerable()不保证迭代添加到集合中的每个项目。

为什么添加更多条目随后会导致它“解开”并继续处理?

0 投票
2 回答
547 浏览

c# - 我们有什么比 BlockingCollection 更好的异步执行方法呢?

我写了这样的例子来测量异步执行的 BlockingCollection 有多快

结果令人失望:

平均而言,我花费了大约 50 微秒,但有时我花费了高达 600 微秒!

即使使用我的慢速 Pentium U5400,我希望它应该是几个常数,不超过 10 微秒,并且永远不会超过 10 微秒。

.NET 有什么更快的异步执行方法?在安排异步执行之后,我需要它尽快启动。这是对财务时间敏感的计算。

Blocking collection garantees order and garantees that items will be processing a one, so this question实际上包含两个问题

  1. 如果我需要订单并且我需要按照物品出现的顺序处理物品,我们是否有更快的速度?即我需要先进先出查询。
  2. 如果我不关心订单并且不关心项目是逐个处理还是并行处理,我们是否有更快的速度?

我想答案是:

  1. 不,我必须使用 BlockingCollection 作为参考,使用 BlockingCollection<T> 作为单生产者、单消费者 FIFO 查询是否很好?

  2. 我可能可以尝试代表?http://msdn.microsoft.com/en-us/library/2e08f6yc.aspx

0 投票
1 回答
347 浏览

c# - 包含不同类型的阻塞集合而不进行强制转换的字典

MyDict<Type, BlockingCollection<"differentTypes">可以通过规避任何强制转换从而防止使用对象或动态类型的接口来设置字典吗?

我想使用它的原因是因为我想在运行时初始化不同类型的阻塞集合,因为传入的对象是不同的类型,然后我想将这些传入的对象添加到类型匹配的阻塞集合中。我还想稍后访问 BlockingCollection 中的对象而不必强制转换。Blocking 集合的必要类型在包装类的实例化时是已知的。传入对象的类型仅在运行时已知。传入的对象类型永远不会与类实例化时已知的类型不同。我想从匹配的 BlockingCollection 中检索的对象的类型也仅在运行时才知道。有什么方法可以设置它而无需投射?

我浏览了许多相关问题,但没有找到此处描述的完全相同的规格。

0 投票
3 回答
308 浏览

c# - 当我尝试将它们添加到列表时,BlockingCollection 自动执行我的功能

我不希望它执行现在添加的功能。稍后我将使用业务逻辑手动执行此操作。!!!

0 投票
2 回答
1064 浏览

wcf - 任务持久性 C#

我很难让我的任务保持持久性并从 WCF 服务无限期地运行。我可能以错误的方式这样做,并愿意接受建议。

我有一个任务开始处理放入 BlockingCollection 的任何传入请求。据我了解,GetConsumingEnumerable() 方法应该允许我在数据到达时持续提取数据。它本身没有问题。我能够处理数十个请求,而没有一个错误或缺陷,使用 Windows 表单填写请求并提交它们。一旦我对这个过程充满信心,我就会通过 asmx Web 服务将它连接到我的站点,并使用 jQuery ajax 调用来提交请求。

站点根据提交的 url 提交请求,Web 服务从 url 下载 html 内容并在内容中查找其他 url。然后它继续为它找到的每个 url 创建一个请求,并将其提交给 BlockingCollection。在 WCF 服务中,如果应用程序处于联机状态(即任务已启动) - 它使用 GetConsumingEnumerable 通过 Parallel.ForEach 拉取请求并处理请求。

这适用于前几次提交,但随后任务意外停止。当然,这比我在测试中模拟的请求多 10 倍——但我预计它只会节流。我相信问题出在我启动任务的方法中:

我曾考虑将其移至 WF4 服务中,然后将其连接到工作流中并使用工作流持久性,但除非必要,否则我不愿意学习 WF4。如果需要更多信息,请告诉我。

0 投票
1 回答
374 浏览

.net - 快速使用 BlockingCollection 导致 UI 线程卡住?

我即将编写一个从 TCP 端口获取流数据的应用程序,并对它们进行一些实时计算。到目前为止一切都很好,但是当生产者线程开始对数据块进行一些打包(参见代码)时,用户界面和选框进度条会卡住(对于不规则的短时间段)。

我做了很多测试,发现对 BlockingCollection bcPort 的访问似乎是问题所在。bcPort 不断从另一个数据加法器线程获取块,该线程也应该不会影响 ui 线程。所以我不明白以下内容:

1.) 当我使用不同的线程来添加和打包块时,为什么 GUI 会卡住?

2.) 为什么在我使用 BC 存储数据时会发生这种情况?这些线程安全集合不是为了这个特定目的吗?

顺便说一句:Windows 7 ResourceManager 在流式传输期间显示 100% 的 CPU 使用率,每个块包含大约 2000 个浮点值,其中 4 或 5 个每秒涌入。我也禁用了记录器,但没有效果。消费者和评估线程被禁用。

除了 ui 线程,只有一个名为“ReceiveAndSave”的线程从传入的浮点值中生成块(参见代码,方法“Add”)。线程“Producer”正在做一些进一步的打包并将消费者的块排入队列(停用)。

生产者以 'myThreads[0] = new Thread(Produce); 开始 myThreads[0].Name = "生产"; myThreads[0].Start();'

0 投票
0 回答
796 浏览

c# - 当生产者也是消费者时,如何在生产者/消费者模式中使用阻塞集合 - 我该如何结束?

我有一个递归问题,消费者在树的每一层都做了一些工作,然后需要沿着树递归并在下一层执行相同的工作。

我想使用 ConcurrentBag/BlockingCollection 等来并行运行它。在这种情况下,队列的消费者,也是队列的生产者!

我的问题是这样的:使用 BlockingCollection,我可以编写非常简单的 foreach 逻辑来使项目出队,并将新项目排队 - 当队列为空时,阻塞集合将正确阻塞,并等待另一个产生新工作消费者。

但是我怎么知道是否所有的消费者都在阻止?!

我知道 CompleteAdding(),但这似乎没有用,因为你真正完成的唯一时间是所有生产者都完成生产并且队列为空 - 因为它们都会阻塞,所以没有人“ free" 来设置 CompleteAdding()。有没有办法检测到这一点?(也许一个事件可以在阻塞时触发,并在解除阻塞时再次触发?)

我可以手动处理这个问题,不使用 foreach,而是手动使用 while(!complete) 循环,并使用 TryTake,但是我需要手动休眠,这似乎效率低下(拥有阻塞集合的全部原因与只是首先是并发集合!)每次通过循环,如果 TryTake 为 false,我可以设置一个 Idle 标志,然后让 Master 检查队列是否为空,并且所有线程都处于空闲状态,设置一个完整标志,但是,这似乎很笨拙。

直觉告诉我有一些方法可以使用阻塞收集来做到这一点,但我无法做到。

无论如何,当消费者是生产者并且能够检测到何时释放所有块时,任何人都有一个很好的模式会很棒

0 投票
2 回答
3629 浏览

c# - 什么更好以及为什么使用 List 作为线程安全:BlockingCollection 或 ReaderWriterLockSlim 或锁定?

我有System.Collections.Generic.List _myList很多线程可以从中读取或同时向其中添加项目。从我读过的内容来看,我应该使用“BlockingCollection”,这样就可以了。我还阅读了ReaderWriterLockSlimand lock,但我不知道如何使用它们而不是BlockingCollection,所以我的问题是我可以这样做:

  1. ReaderWriterLockSlim

而不是使用“BlockingCollection”。BlockingCollection如果是,您能否提供简单的示例以及使用, ReaderWriterLockSlim,的优缺点lock

更新 的读者将不仅仅是作家!

0 投票
1 回答
681 浏览

c# - BlockingCollection 存在字节数组问题

我遇到一个问题,其中一个带有字节 [20] 的对象被传递到一个线程上的 BlockingCollection 中,而另一个线程使用 BlockingCollection.Take() 返回带有字节 [0] 的对象。我认为这是一个线程问题,但考虑到 BlockingCollection 是一个并发集合,我不知道发生在哪里或为什么会发生这种情况。

有时在 thread2 上,myclass2.mybytes 等于 byte[0]。非常感谢有关如何解决此问题的任何信息。

[编辑] 原始代码。我删除了上面似乎运行良好的代码,所以我花时间浏览了我的原始代码并发布了它。

消息缓冲区.cs

在具有 Listener() 和 ReceivedMessageHandler(object messageProcessor) 的类中

在线程 1

Message.ReadMessage(NetworkStream 流,iTcpConnectClient 客户端)

在线程 2

PlayerStateMessage.cs

Message.GetVector2FromBuffer(int bufferlocation) 这是抛出异常的地方,因为 this.Buffer 应该是 byte[0] 而应该是 byte[20]。