问题标签 [producer-consumer]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
579 浏览

java - 等待完成 n 个工作:生产者、消费者和计数器

我面临的问题是我有一个线程 A 并且需要完成 n 个工作。线程 A 必须等到这 n 个工作完全完成。我的想法是使用CountDownLatchn 计数并使用 Producer/Consumer 模式来控制 Worker。

我使用 anAtomicInteger作为计数器:生产者检查计数器值是否大于 0,然后将信号发送到BlockingQueue,如果计数器值小于或等于 0,则生产者将 stopSignal 放入队列。Consumer 从队列中取出,检查信号是否不等于 stopSignal,然后使用 aExecutorService来调度Worker.

Worker 调用getAndDecrement并检查计数器的值是否大于 0,如果是则执行工作,如果工作完成则调用 CountDownLatch#countdown 否则增加计数器incrementAndGet

问题是当工作没有完成时,工人必须增加计数器,但这是在 a 之后,getAndDecrement所以生产者可能会看到计数器的值为 0,并发出停止信号,即使工作总和小于 n!

0 投票
3 回答
26643 浏览

c# - 在 ConcurrentQueue 中尝试出队

如果队列中没有项目,ConcurrentQueue 中的 TryDequeue 将返回 false。

如果队列为空,我需要我的队列将等到将新项目添加到队列中并将新项目出列,并且该过程将继续这样。

我应该在 C# 4.0 中使用 monitor.enter、wait、pulse 还是任何更好的选项

0 投票
3 回答
14203 浏览

c# - 多个生产者,单个消费者

我必须开发一个多线程应用程序,其中将有多个线程,每个线程生成需要保存在队列中的自定义事件日志(不是 Microsoft MSMQ)。

将有另一个线程从队列中读取日志数据并对其进行操作,并使用某些信息将日志信息保存到文件中。基本上,我们在这里实现了多生产者、单消费者范式。

任何人都可以就如何在 C++ 或 C# 中实现这一点提供建议。

谢谢,

0 投票
1 回答
2157 浏览

python - Twisted Python 中的另一个生产者/消费者问题

我正在构建一个服务器,它使用 Twisted Python 在 Redis 之上存储键/值数据。服务器通过 HTTP 接收 JSON 字典,将其转换为 Python 字典并放入缓冲区。每次存储新数据时,服务器都会安排一个任务,该任务会使用 txredis 客户端从缓冲区中弹出一个字典并将每个元组写入 Redis 实例。

基本上,我面临两个不同连接之间的生产者/消费者问题,但我不确定当前的实现是否在 Twisted 范式中运行良好。我已经阅读了 Twisted 中关于生产者/消费者接口的小文档,但我不确定是否可以在我的案例中使用它们。欢迎任何批评:经过多年的线程并发,我正试图掌握事件驱动编程。

0 投票
2 回答
257 浏览

apache-flex - 如何使用 Weborb(Producer) 和 MSMQ 设置消息标签

我正在使用以下代码通过我的 flex 程序在 MSMQ 中添加一条消息:

和我的 weborb messing-config.xml 如下:

问题是,如何在我发送的 AsyncMessage 中设置消息标签?

0 投票
1 回答
571 浏览

cuda - 生产者/消费者模型和并发内核


我正在编写一个可以解释为生产者/消费者模型的 cuda 程序。

有两个内核,一个在设备内存上
产生数据,另一个内核产生数据。

消耗线程的数量设置为 32 的倍数,即经纱大小的两倍。
并且每个 warp 等待直到 32 个数据已经产生。

我这里有点问题。
如果消费者内核的加载时间晚于生产者内核,
则程序不会停止。
即使首先加载了消费者,该程序有时也会不确定地运行。

我要问的是 CUDA 中是否有一个很好的生产者/消费者实现模型?
任何人都可以给我一个方向或参考吗?

这是我的代码的骨架。

0 投票
2 回答
4987 浏览

java - Java:使用 BlockingQueue 的生产者/消费者:让消费者线程等待()直到另一个对象排队

最近,我遇到了一些与线程相关的问题,该问题与需要积分的消费者有关。这是原始版本,除了占用大量cpu不断检查队列外,它工作正常。想法是cuePoint可以随便调用,主线程继续运行。

我尝试通过在每次调用 cue 函数时添加 notify() 来解决 cpu 问题,并将 doFirstPoint() 重新工作为如下所示:

但是,我发现 notify() 和 wait() 仅适用于同步函数。当我使 doFirstPoint 和 cuePoint 同步时,调用 cuePoint 的主线程将一直等待。

我有一些想法来解决这个问题,包括使线程成为对象并直接通知它,但我不确定这是否会导致比它修复的问题更多、形式非常糟糕或根本不起作用。我缺少这个问题的简单解决方案吗?

0 投票
1 回答
811 浏览

java - 生产者-消费者场景的正确实现和线程池的“优雅”终止

我正在做我的第一个多线程项目,因此有一些我不确定的事情。关于我的设置的详细信息是关于上一个问题,简而言之:我有一个由Executors.newFixedThreadPool(N). 给一个线程一个操作,该操作对本地和远程资源进行一系列查询并迭代填充一个ArrayBlockingQueue,而其余线程调用take()队列上的方法并处理队列中的对象。

尽管小型和有监督的测试似乎运行良好,但我不确定如何处理特殊情况,例如开始(队列还没有项目)、结束(队列已清空)以及任何最终InterruptedExceptions的 . 我在这里阅读了一些关于 SO 的文章,这让我看到了GoetzKabutz的两篇非常好的文章。共识似乎是不应忽视这些例外。但是我不确定提供的示例与我的情况有何关系,我没有thread.interrupt()在我的代码中调用任何地方......说到这里,我不确定我是否应该这样做......

综上所述,鉴于下面的代码,我如何最好地处理特殊情况,例如终止条件和 InterrruptedExceptions?希望这些问题有意义,否则我会尽我所能进一步描述它。

提前致谢,


编辑:我一直在努力实施一段时间,我遇到了一个新的问题,所以我想我会更新情况。我很不幸遇到了ConcurrentModificationException这很可能是由于线程池的不完全关闭/终止。一旦我发现我可以使用isTerminated()我就尝试了,然后我得到了一个IllegalMonitorStateException由于不同步的wait(). 代码的当前状态如下:

我遵循了@Jonathan 回答中的一些建议,但我认为他的建议与我需要/想要的不太一样。背景故事和我上面提到的一样,相关代码如下:

类持有/管理池,并提交可运行文件:


元素通过位于单独类中的以下代码添加到队列中:

......背后的动机offer()而不是take()乔纳森提到的。由于我的分析需要很长时间,因此无法预料的块很烦人并且很难弄清楚。所以我需要相对快速地知道失败是由于坏块,还是只是处理数字......


最后;这是我的测试类中的代码,我在其中检查“并发服务”(此处命名为 cs)与要分析的其余对象之间的交互:

我意识到这是一个很长的问题,但我试图做到详细和具体。希望它不会太拖累,如果它是我道歉......

0 投票
1 回答
151 浏览

messaging - 单一生产者单一消费者问题的变体

我有这样的情况:

一个生产者执行许多数据库查询 - 结果作为消息发送并到达单个消费者的队列(消息到达时没有特殊顺序)

另一方面 - 单个消费者读取这些消息并希望仅在所有消息到达时才开始处理消息。

解决此类问题的最佳方法是什么?

0 投票
3 回答
2580 浏览

c# - BlockingCollection - 高同步问题

从多个线程获取消息到队列并让一个单独的线程一次处理该队列的项目的最佳方法是什么?

在尝试从多个线程断开活动时,我经常使用这种模式。

我为此使用了 BlockingCollection,如下面的代码摘录所示:

这很好用——所以我想——直到 VS2010 中的性能向导开始突出显示_q.Take()行作为我代码中的最高争用行!

注意我还使用了标准的 ConcurrentQueue 和 ManualResetEvent 组合,每次我将一个项目插入队列时,我都会发出 resetevent 信号,允许工作线程检查和处理队列,但这也具有在 . WaitOne() 方法...

是否有其他方法可以解决这种让许多线程将对象添加到并发队列中的常见模式 - 并且让一个线程在其自己的时间一次一个地遍历项目......

谢谢!!