问题标签 [blockingqueue]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
4 回答
6060 浏览

java - ThreadPoolExecutor 策略

我正在尝试使用 ThreadPoolExecutor 来安排任务,但在其策略上遇到了一些问题。这是其声明的行为:

  1. 如果运行的线程少于 corePoolSize,则 Executor 总是更喜欢添加新线程而不是排队。
  2. 如果 corePoolSize 或更多线程正在运行,Executor 总是更喜欢排队请求而不是添加新线程。
  3. 如果请求无法排队,则会创建一个新线程,除非这将超过 maximumPoolSize,在这种情况下,该任务将被拒绝。

我想要的行为是这样的:

  1. 和上面一样
  2. 如果超过 corePoolSize 但小于 maximumPoolSize 线程正在运行,则更喜欢添加新线程而不是排队,并且使用空闲线程而不是添加新线程。
  3. 和上面一样

基本上我不希望任何任务被拒绝;我希望他们在无界队列中排队。但我确实希望拥有最多 maximumPoolSize 线程。如果我使用无界队列,它在达到 coreSize 后永远不会生成线程。如果我使用有界队列,它会拒绝任务。有没有办法解决?

我现在正在考虑的是在 SynchronousQueue 上运行 ThreadPoolExecutor,但不直接将任务提供给它 - 而是将它们提供给单独的无界 LinkedBlockingQueue。然后另一个线程从 LinkedBlockingQueue 提供给 Executor,如果一个被拒绝,它会再次尝试,直到它没有被拒绝。不过,这似乎是一种痛苦和一种黑客行为 - 有没有更清洁的方法来做到这一点?

0 投票
4 回答
1043 浏览

java - ThreadPoolExecutor - ArrayBlockingQueue ...在从队列中删除元素之前等待

我正在尝试调整执行以下操作的线程:

只有 1 个线程的线程池[CorePoolSize =0, maxPoolSize = 1]

使用的队列是ArrayBlockingQueue

Quesize = 20

背景:
线程尝试读取请求并对其执行操作。

但是,最终请求增加得如此之多,以至于线程总是很忙并消耗 1 个 CPU,这使其成为资源占用。

我想要做的是,而是每隔一段时间对请求进行采样并处理它们。可以安全地忽略其他请求。

我要做的就是在“操作”函数中设置一个睡眠,这样对于每个任务,线程都会睡眠一段时间并释放 CPU。

问题:
但是,我想知道是否有一种方法可以使用一个队列,该队列基本上本身会在读取下一个元素之前休眠一段时间。这将是理想的,因为在执行过程中休眠任务并保持执行不完整对我来说听起来并不是最好的。

如果您对任务还有其他建议,请告诉我

谢谢。

编辑:我在这里添加了一个后续问题, 将 maxpool 大小更正为 1 [匆忙编写] .. 感谢 tim 指出。

0 投票
2 回答
1880 浏览

java - 如果当前线程休眠,ThreadPoolExecutor 是否会产生一个新线程

这个问题是对这个问题的跟进。

本质上,我正在做的是ThreadPoolExecutor只用一个线程声明 a 。我正在重写该beforeExecute()方法以使睡眠,以便我的每个任务在它们之间都有一些延迟执行。这基本上是为了将 CPU 交给其他线程,因为我的线程有点颠簸。

所以预期的行为是:

对于 中的每个新任务ThreadPoolExecutor,它会在执行任务之前调用 before execute 函数,因此它会在执行任务之前休眠 20 秒。

然而,这就是我所看到的:

对于提交的每个新任务:

  1. 它执行任务
  2. 调用 beforeExecute 方法
  3. 睡了 20 多岁
  4. 重新执行任务!

1. & 2. 的顺序并不总是相同的。

以下是我的问题:

  1. 似乎有一个新线程在睡眠之后/期间进入,并在实际线程处于睡眠状态时立即执行我的任务。
    那么ThreadPoolExecutor一旦现有线程休眠[认为线程已终止],是否会产生一个新线程?我试图把 keepAliveTime > sleeptime ..这样如果上面的断言是真的..它至少等待超过睡眠时间来产生一个新线程......[希望同时睡眠线程会被唤醒并且会ThreadPoolExecutor抛弃产生新线程的想法
  2. 即使它确实产生了一个新线程并立即执行我的任务,为什么在睡眠线程唤醒后要重新执行任务!在此之前不应该将任务从任务队列中取出吗?
  3. 我在这里错过了什么吗?还有其他方法可以调试这种情况吗?

=>我正在考虑完成所需任务[而不是解决问题]的另一种方法是用一个可运行的可运行文件包装可运行文件,并在调用内部可运行文件之前休眠外部可运行文件。

0 投票
4 回答
505 浏览

java - Java 的 BlockingQueue 设计问题

方法 java.util.concurrent.BlockingQueue.add(E e) 的 JavaDoc 内容如下:

布尔加法(E e)

如果可以在不违反容量限制的情况下立即将指定元素插入此队列,则在成功时返回 true,如果当前没有可用空间则抛出 IllegalStateException。当使用容量受限的队列时,通常最好使用 offer。

我的问题是:它会返回假吗?如果不是,为什么这个方法返回一个布尔值?我觉得很奇怪。这背后的设计决策是什么?

谢谢你的知识!
曼努埃尔

0 投票
4 回答
2892 浏览

java - JMS 是对持久阻塞队列需求的解决方案吗?

我正在创建一个由 Log4J 附加程序组成的库,该附加程序将事件异步发送到远程服务器。当一个日志语句被创建时,appender 将异步记录事件到一个本地队列中,然后一个消费者池将检索并发送到远程。

完全在内存中的解决方案是创建一个 BlockingQueue 来处理并发问题。但是,我希望保留队列,以便如果远程服务器不可用,我不会无限制地增长队列或在有界队列的情况下开始丢弃消息。

我正在考虑使用嵌入式 H2 数据库在本地存储事件,然后使用轮询机制来检索事件并发送到远程。我宁愿使用 BlockingQueue 而不是轮询数据库表。

JMS 是答案吗?

编辑:

如果 JMS 是答案,而且似乎是这样,那么是否有人对可以配置为仅接受进程内消息的轻量级、可嵌入 JMS 解决方案提出建议?换句话说,我不想也可能不会被允许打开一个 TCP 套接字来监听。

编辑:

我现在嵌入了 ActiveMQ,它似乎正在工作。谢谢大家。

0 投票
3 回答
4925 浏览

c++ - C++ pthread阻塞队列死锁(我认为)

我在使用 pthreads 时遇到问题,我认为我遇到了死锁。我创建了一个我认为可以正常工作的阻塞队列,但是在进行了更多测试后,我发现如果我尝试取消阻塞在 blocking_queue 上的多个线程,我似乎遇到了死锁。

阻塞队列非常简单,看起来像这样:

为了测试,我创建了 4 个线程来拉这个阻塞队列。我在阻塞队列中添加了一些打印语句,每个线程都进入 pthread_cond_wait() 方法。但是,当我尝试在每个线程上调用 pthread_cancel() 和 pthread_join() 时,程序就会挂起。

我也只用一个线程对此进行了测试,并且效果很好。

根据文档, pthread_cond_wait() 是一个取消点,因此在这些线程上调用取消应该会导致它们停止执行(这仅适用于 1 个线程)。但是 pthread_mutex_lock 不是取消点。在调用 pthread_cancel() 时是否会发生某些事情,被取消的线程在终止之前获取互斥锁并且不解锁它,然后当下一个线程被取消时它无法获取互斥锁和死锁?还是我做错了什么。

任何建议都会很可爱。谢谢 :)

0 投票
2 回答
4987 浏览

java - Java:使用 BlockingQueue 的生产者/消费者:让消费者线程等待()直到另一个对象排队

最近,我遇到了一些与线程相关的问题,该问题与需要积分的消费者有关。这是原始版本,除了占用大量cpu不断检查队列外,它工作正常。想法是cuePoint可以随便调用,主线程继续运行。

我尝试通过在每次调用 cue 函数时添加 notify() 来解决 cpu 问题,并将 doFirstPoint() 重新工作为如下所示:

但是,我发现 notify() 和 wait() 仅适用于同步函数。当我使 doFirstPoint 和 cuePoint 同步时,调用 cuePoint 的主线程将一直等待。

我有一些想法来解决这个问题,包括使线程成为对象并直接通知它,但我不确定这是否会导致比它修复的问题更多、形式非常糟糕或根本不起作用。我缺少这个问题的简单解决方案吗?

0 投票
1 回答
974 浏览

java - 从 Socket 读取行并将每行放入 BlockingQueue

任何人都可以提供 Java 中的示例,或者建议实现一个从套接字异步读取行并将每行放入 BlockingQueue 的类。假设套接字已连接,并且 BlockingQueue 和使用者已经存在。

编辑:还有一件事,它需要能够在一段时间不活动后超时,并在命令下立即停止。

这不是家庭作业,我根本无法找到完整的例子来说明如何做好这件事,并且可靠。

非常感谢你。

0 投票
10 回答
21024 浏览

java - “关闭”阻塞队列

我在一个非常简单的生产者-消费者场景中使用java.util.concurrent.BlockingQueue 。例如,这个伪代码描述了消费者部分:

到目前为止,一切都很好。在阻塞队列的javadoc中我读到:

BlockingQueue 本质上不支持任何类型的“关闭”或“关闭”操作来指示不再添加项目。此类功能的需求和使用往往取决于实现。例如,一种常见的策略是生产者插入特殊的流尾或有毒对象,当消费者采用时会相应地解释这些对象。

不幸的是,由于使用的泛型和 ComplexObject 的性质,将“毒物”推入队列并非易事。所以这种“通用策略”在我的场景中并不是很方便。

我的问题是:我可以使用哪些其他好的策略/模式来“关闭”队列?

谢谢!

0 投票
2 回答
346 浏览

java - 更新存储在阻塞优先级队列中的自定义对象

我有一个阻塞优先级队列,它存储消息类型的对象,消息具有字符串 [] 数据 = 新字符串 [10]。现在我必须遍历整个阻塞队列,检查它的对象消息的第二个元素是否等于传入消息的第六个元素。

Messages 的比较器不是基于需要更新的第 6 个元素。问题是,如果我取出一个对象,那么如何将它放在同一位置,如果我使用下面的代码更新它,那么无论何时运行 iter.next(),它都可能开始指向下一个对象。

这是我正在尝试的。