问题标签 [concurrent-queue]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
934 浏览

c# - 如何使用 TryDequeue 从并发队列中释放对从并发队列中出列的项目的引用

我正在使用 ConcurrentQueue (C#, ASP.NET Core) 来保存上传大文件的任务。即使在项目从并发队列中出列之后,我的内存消耗也非常大。项目不会从内存中清除。

我了解 ASP.NET Core 中 ConcurrentQueue 的行为,简而言之:并发队列中的 32 个项目的每一组都保存在一个单独的段中。仅在特定段中的所有项目都出列后,而不是在为单个项目调用 TryDequeue() 方法之后,才释放出列项目的引用。在我的情况下这是一个问题,因为如果在最坏的情况下没有清理 32 个项目(可能是非常大的 zip 文件),我的内存中可能会有非常大的数量。我什至不希望队列中同时有超过 32 个项目。

即使我提取文件并单独发送它们,里面的每个文件/图像本身也可以很大。唯一对我来说足够好的事情是释放每个出队项目的参考,而无需进一步推迟。有可能吗?怎么做?

我尝试使用 StrongBox,在一些文章中推荐:

这对我来说没有用。内存没有被释放。我不知道我的问题与我正在向队列发送对函数的引用并且它以某种方式保存在内存中的事实有关。该函数的参数之一是上传的具体文件。其他的只是字符串。

最后,我想在调用 TryDequeue() 方法之后或期间处理该项目的内存释放。有可能吗?怎么做?

0 投票
0 回答
191 浏览

c# - 将 ConcurrentQueue.Count 与整数进行比较会返回错误的结果

我正在编写通常的读写器功能,其中一个主线程排队,几个线程出队。因此,在代码的一部分中,我将我的项目数ConcurrentQueue与某个整数进行比较,我们称之为“maxSize”。尽管.Count返回1maxSize为 10,但queue.Count >= maxSize返回 true。

我尝试使用断点进行调试,只设置一个出队线程,甚至暂停它。这发生在主线程入队之后,并且在几行代码之后,这个比较返回了 1 >= 10 的结果。我确信主线程此时只放置了一项,我确信没有Dequeue()被调用。另外,我尝试过仔细检查锁定,但有时它没有帮助。我想知道可能有一些魔法不允许我以我这样做的方式正确比较值,因为当我在调试器中看到 1 >= 10 为真时,我被撕裂了。

绝望中,我添加了日志记录,我发现在单元测试期间,即使在 lock 语句中,问题也是可重现的。

0 投票
1 回答
2353 浏览

c# - C#,ConcurrentQueue 大小限制和时间范围

我正在寻找一种方法: 1. 从限制为一定大小的 ConcurentQueue 读取消息。2. 一次阅读不超过 X 条消息。我想在 2 次中的一个中停止从 Q 读取,直到完成其他代码并再次执行相同的操作。

我看到了队列溢出的不同实现,在这里固定大小的队列会在新的队列中自动将旧值出列, 但无法弄清楚如何正确组合它们。

}

0 投票
1 回答
387 浏览

javascript - 具有并发工作人员的 RxJS 并行队列并处理每个请求

我在使用 RxJS 和处理请求数组的正确方法时遇到问题。假设我有一个大约 50 个请求的数组,如下所示:

我的目标是:

  • 并行启动请求
  • 只有5个可以同时运行
  • 当一个请求完成时,数组中的下一个开始
  • 当请求完成(成功或错误)时,我需要将变量“requestCounter”加一(requestCounter++)
  • 当队列中的最后一个请求完成后,我需要订阅此事件并处理每个请求结果的数组

到目前为止,我最接近的做法是遵循这篇文章中的回复:

具有并发工作者的 RxJS 并行队列?

问题是我正在发现 RxJS,而这个例子对我来说太复杂了,我找不到如何处理每个请求的计数器。

希望您能够帮助我。(抱歉英语不好,这不是我的母语)

编辑:最终解决方案如下所示:

0 投票
1 回答
344 浏览

java - 为什么 ConcurrentLinkedQueue.size() 的复杂性不能保持不变?

JavadocConcurrentLinkedQueue明确指出 size() 方法的复杂度为 O(n)。我觉得这很令人惊讶。我会通过在柜台累积大小来遵循库存LinkedList.size()方法。鉴于ConcurrentLinkedQueue's 操作的异步性质,计数器自然必须是AtomicInteger。这样做是不是因为它会违反非阻塞保证?如果是这样,我应该期望AtomicInteger计数器引入多少开销?

0 投票
2 回答
637 浏览

c# - 以原子方式从 ConcurrentQueue 中获取所有内容

我有多个线程生成项目并将它们粘贴在一个共同的位置ConcurrentQueue

我有另一个单一的消费者线程,但它需要在这个应用程序的上下文中工作的方式是,偶尔,它只需要抓住当前线程队列中的所有内容,将其从该队列中删除,一键完成。就像是:

我想我查看了所有文档(关于集合及其实现的接口),但我似乎没有找到类似“同时从队列中获取所有对象”,甚至“同时与另一个队列交换内容”之类的东西。

如果我放弃并只用 aConcurrentQueue保护正常,我可以做到这一点,如下所示:Queuelock

但是,我喜欢它的便利性,ConcurrentQueue而且因为我刚刚学习 C#,所以我对 API 很好奇;所以我的问题是,有没有办法用其中一个并发集合来做到这一点?

是否有某种方法可以访问任何同步对象ConcurrentQueue使用并为我自己的目的将其锁定,以便一切都可以很好地协同工作?然后我可以锁定它,拿走所有东西,然后释放?

0 投票
1 回答
1752 浏览

c# - C#并发队列使用

有一个快速的问题。

如果一个线程正在排队而另一个正在出队,我是否必须使用并发队列?在这种情况下(1 位读者和 1 位作者)使用常规容器时是否存在任何竞争条件/其他风险?

0 投票
1 回答
326 浏览

c# - C# Abortable 异步 Fifo 队列 - 泄漏大量内存

我需要以 FIFO 方式处理来自生产者的数据,如果同一生产者产生新的数据位,则能够中止处理。

因此,我基于 Stephen Cleary 的AsyncCollectionAsyncCollectionAbortableFifoQueue在我的示例中调用)和 TPL 的BufferBlockBufferBlockAbortableAsyncFifoQueue在我的示例中)实现了一个可中止的 FIFO 队列。这是基于的实现AsyncCollection

processQueuedItems是从队列中出列的任务AsyncWorkItem,并执行它们,除非已请求取消。

要执行的异步操作被包装成一个AsyncWorkItem看起来像这样的

然后有一个任务查找和出列项目以进行处理,或者处理它们,或者如果CancellationToken已触发则中止。

一切正常 - 数据得到处理,如果接收到新数据,旧数据的处理将中止。我的问题现在源于这些队列泄漏大量内存,如果我提高使用率(生产者生产的比消费者进程多得多)。鉴于它是可中止的,未处理的数据应该被丢弃并最终从内存中消失。

那么让我们看看我是如何使用这些队列的。我有生产者和消费者的 1:1 匹配。每个消费者处理单个生产者的数据。每当我得到一个新的数据项,并且它与前一个不匹配时,我都会为给定的生产者(User.UserId)捕获队列或创建一个新的(代码片段中的“执行者”)。然后我有ConcurrentDictionary一个CancellationTokenSource每个生产者/消费者组合。如果有 previous CancellationTokenSource,我Cancel会在 20 秒后调用它Dispose(立即处理会导致队列中出现异常)。然后我将新数据的处理排入队列。队列返回给我一个我可以等待的任务,以便我知道数据处理何时完成,然后我返回结果。

这是代码中的

在这个示例实现中,所做的只是等待,然后返回 true。

为了测试这种机制,我编写了以下数据生产者类:

它使用类开头的变量来控制测试。您可以定义用户数量(nbOfusers- 每个用户都是产生新数据的生产者)、用户产生下一个数据之间的最小 ( minimumDelayBetweenTest) 和最大 ( ) 延迟以及消费者处理数据所需的时间 ( )。maximumDelayBetweenTestsoperationDuration

StartTests开始实际测试,然后StopTests再次停止测试。

我这样称呼这些

因此,如果我运行我的测试器并键入“开始”,则Producer该类开始生成由Consumer. 并且内存使用量开始增长和增长。该示例配置到极端,我正在处理的实际场景不那么密集,但是生产者的一个动作可能会触发消费者端的多个动作,这些动作也必须以相同的异步可中止 fifo 方式执行 -所以最坏的情况是,生成的一组数据会触发约 10 个消费者的操作(为简洁起见,我删除了最后一部分)。

当我有 100 个生产者时,每个生产者每 1-6 秒产生一个新数据项(随机产生的数据也是随机的)。消耗数据需要 3 秒。所以在很多情况下,在旧数据被正确处理之前就有一组新数据。

查看两个连续的内存转储,很明显内存使用量来自哪里......所有与队列有关的片段。鉴于我正在处理每个 TaskCancellationSource 并且没有保留对生成数据的任何引用(以及AsyncWorkItem它们被放入的数据),我无法解释为什么这会一直占用我的内存,我希望其他人可以告诉我我的方式的错误。你也可以通过输入'stop'来中止测试。你会看到内存不再被吃掉,但即使你暂停并触发GC,内存也没有被释放。

可运行形式的项目源代码在Github上。启动后,你必须start在控制台中输入(加回车)来告诉生产者开始生产数据。您可以通过键入stop(加回车)来停止生成数据

0 投票
0 回答
340 浏览

c# - 如何在 Web api 的并发队列中处理未发送的消息

我在我的 Web API 中使用并发队列,如线程 如何维护 Web API 中的请求状态或队列中所述

但是我遇到了一个问题,如果应用程序池停止或回收,它会丢失内存中的所有数据。有人可以帮助我如何处理同样的事情吗?

我的用例需要接受来自不同来源的消息并将其放入队列中,每隔 5 秒我将这些消息写入输入 .txt 文件。

0 投票
1 回答
97 浏览

c# - c# 一维字节数组到二维双精度数组

我正在处理套接字编程 tcp/ip 中的 c# concurrent-queue 和多线程

首先,我已经完成了套接字编程本身。这意味着,我已经完成了关于客户端、服务器和通信本身的编码

基本结构是流水线的(生产者-消费者问题),现在我正在做位转换

以下是关于我的代码的简短摘要

client-socket -> server-socket -> concurrent_queue_1(byte[65536],Thread_1 处理这个) -> concurrent_queue_2(double[40,3500], Thread_2 处理这个) -> display-data 或其他工作(它可以是gpu-work)

*(double[40,3500]可以改成其他大小)

到目前为止,我已经将 put_data 放入 queue1(Thread1) 并仅将所有 (Thread2) 出列,它的速度约为 700Mbps

我使用两个 concurrent_queue 的原因是,我希望在后台处理通信和类型转换工作,而不管有关控制事物的主要过程。

这是关于我自己的带有阻塞的 concurrent_queue 的代码

我的问题在这里

正如我上面提到的,concurrent_queue1 的类型是 byte[65536] 并且 65536 bytes = 8192 double data。(40 * 3500=8192 * 17.08984375) 我想将多个 8192 双数据合并成双 [40,3500] 的形式(大小可以更改)并使用 Thread2 排队到 concurrent_queue2

使用简单的方法(使用许多复杂的 for 循环)很容易做到这一点,但速度很慢,它会复制所有数据并暴露给上层或层。

我正在搜索自动以匹配大小排队的方法,例如 foreach 循环自动以行主要方式遍历二维数组,尚未找到

有没有什么快速的方法可以将 1D 字节数组合并为 2D 双数组形式并将其入队?

谢谢你的帮助!