问题标签 [producer-consumer]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
717 浏览

.net - .NET 生产者-消费者问题

我正在为 Web 服务编写一个相对简单的“代理”应用程序。总体思路是 TCP 服务器(带异步连接)将从客户端读取(字符串)数据并将该数据(作为读取回调函数的一部分)放入两个队列(Q1 和 Q2)之一。另一个线程将读取这些队列中的数据并将其传递给 Web 服务。Q1 中的数据应优先于 Q2 中可能存在的任何数据。

我一直在阅读有关生产者/消费者模式的信息,这似乎就是我试图在队列方面实现的内容。由于我的入队和出队操作将发生在不同的线程上,显然我的队列需要是线程安全的并支持某种锁定机制?这是一个 .NET 4.0 应用程序,我看到了有关新 BlockingCollection 和 ConcurrentQueue 类的文档,但我不确定到底有什么区别,或者我将如何在这种情况下实现它们。任何人都可以对此有所了解吗?谢谢!

0 投票
3 回答
600 浏览

.net - .NET 消费者线程处理两个队列的算法(基于优先级)

我有一个 C# 4.0 应用程序,其“高优先级”和“低优先级”队列实现如下:

BlockingCollection highPriority = new BlockingCollection(1000); BlockingCollection lowPriority = new BlockingCollection(1000);

在 highPriority 中产生的任何数据都应该在 lowPriority 中产生的任何数据之前被使用。这里的转折是数据可以随时生成到两个队列中的任何一个。因此,在我使用 highPriority 中的所有数据之后,我将使用可能处于 lowPriority 的任何数据。如果在我以低优先级消费数据时,以高优先级产生新数据,我想以低优先级完成当前项目的消费,然后切换回并以高优先级处理数据。

任何人都可以建议一种算法来帮助解决这个问题吗?伪代码很好。非常感谢。

0 投票
1 回答
700 浏览

python - 如何在 Python 中管理/托管 AWS SQS 队列消费者?

我目前正在开发一个使用 Amazon SQS 的用 Python 编写的分布式处理应用程序。

在 Python 中正确创建和托管队列使用者的最 Pythonic 方式是什么:

  • 可靠:如果出现问题并且消费者终止,应该执行一些反应性代码并重新启动处理
  • 重用代码:必须有一些包可以帮助解决这个问题,而且不必重新发明轮子会很棒:)

提前致谢!

0 投票
5 回答
899 浏览

c# - 在 C# 中使用通知的生产者和消费者

我有一个生产者 (P) 将项目推入消费者 (C)。C 可能决定在所有物品到达后不立即消费;相反,它可能会异步等待直到队列中有n 个项目或直到某个时间间隔t已经过去,然后消耗队列中的所有项目并重新启动计时器。

每当它推送到 C 的项目确实已被消费时,需要通知 P。这是因为 P 负责在发生故障时恢复(P 将当前状态保存到 DB)。因此,在重启的情况下,P 需要知道它需要再次推送哪些项目,因为它们还没有被消费者确认。

我的第一个想法是让 C 通过回调函数(委托)通知 P:每当一个项目被消费时,P 的回调函数就会被一个消费项目列表调用。

但我想知道是否可能有其他(更好的)方式通知制片人。你怎么认为?

斯特凡诺

更新:感谢您到目前为止的回答。我目前正在研究使用异步方法是否可能是一种优雅的方式来同步发布者和消费者正在发布的项目的状态。

0 投票
5 回答
10337 浏览

java - 是否建议向 ThreadPoolExecutor 的 BlockingQueue 添加任务?

ThreadPoolExecutor的 JavaDoc不清楚是否可以直接将任务添加到BlockingQueue后备执行程序。文档说调用executor.getQueue()“主要用于调试和监控”。

我正在ThreadPoolExecutor用我自己的BlockingQueue. 我保留了对队列的引用,因此可以直接向其中添加任务。返回相同的队列getQueue()所以我假设警告getQueue()适用于通过我的手段获得的支持队列的引用。

例子

代码的一般模式是:

queue.offer()对比executor.execute()

据我了解,典型用途是通过executor.execute(). 我上面示例中的方法具有阻塞队列的好处,而execute()如果队列已满并拒绝我的任务,则会立即失败。我也喜欢提交作业与阻塞队列交互;这对我来说感觉更“纯粹”的生产者消费者。

直接将任务添加到队列的含义:我必须调用prestartAllCoreThreads()否则没有工作线程正在运行。假设没有与执行程序的其他交互,则不会监视队列(ThreadPoolExecutor来源检查证实了这一点)。这也意味着对于直接入队,ThreadPoolExecutor必须另外为 > 0 个核心线程进行配置,并且不得将其配置为允许核心线程超时。

tl;博士

给定一个ThreadPoolExecutor配置如下:

  • 核心线程 > 0
  • 核心线程不允许超时
  • 核心线程已预先启动
  • 持有对BlockingQueue支持执行者的引用

将任务直接添加到队列而不是调用是否可以接受executor.execute()

有关的

这个问题(生产者/消费者工作队列)类似,但不具体涉及直接添加到队列中。

0 投票
1 回答
736 浏览

c - Posix Semaphore => 如何在一段时间后关闭信号量。生产者-消费者问题

我正在通过生产者读取文件并将它们写入消费者的其他文件,当生产者读取输入文件时,我需要关闭其中一个信号量,消费者不再等待生产者将一些新输入插入缓冲区。有没有办法做到这一点?

问候...

0 投票
1 回答
116 浏览

c++ - 从另一个进程读取未刷新的页面

我有一个生产者进程,它写入一个 mmap'd 文件和一个从中读取的消费者进程。这是在 Linux 上。

如果生产者对 mmap 进行了更改并且没有立即刷新,那么当消费者访问它时会发生什么?它会从磁盘中获取旧版本,还是足够聪明地获取未刷新的页面?

0 投票
2 回答
2507 浏览

c - How to quit simple Producer-Consumer problem

I am trying to work out a simple producer-consumer program. I have this code:

Unfortunately this code ends with deadlock. I have this output:

Please notice that "Bye!" is not written. On the other hand extra "consuming" is. What is wrong with this solution? Using global variable for detecting the end is not ok? Can't figure it out...

Thank you for any ideas.

EDIT: Acording to your advices I changed the allocation of local variable to volatile and added the '\n' but the problem persists.

0 投票
1 回答
951 浏览

python - python 中的协程提供了什么来改进幼稚的消费者/生产者设置?

我读过一些关于协程的文章,尤其是关于 python 的,但对我来说,有些东西并不完全显而易见。

我已经实现了一个生产者/消费者模型,其基本版本如下:

显然,MyConsumer 也可以有生产例程,因此可以轻松构建数据管道。正如我在实践中实现的那样,定义了一个实现消费者/生产者模型逻辑的基类,并实现了在子类中覆盖的单个处理功能。这使得生成具有易于定义、隔离的处理元素的数据管道变得非常简单。

在我看来,这似乎是为协程提供的典型应用程序,例如在经常引用的教程中:http ://www.dabeaz.com/coroutines/index.html 。不幸的是,对我来说,协程相对于上面的实现有什么优势并不明显。我可以看到,在可调用对象更难处理的语言中,有一些收获,但在 python 的情况下,这似乎不是问题。

有人可以为我解释一下吗?谢谢。

编辑:抱歉,上面代码中的生产者从 0 到 9 计数并通知消费者,然后消费者打印出他们的名字和计数值。

0 投票
4 回答
4374 浏览

java - 使用 ConcurrentLinkedQueue 的 Java 线程问题

我对以下代码片段有疑问。它旨在处理添加到事件队列 (ConcurrentLinkedQueue) 的事件(通过调用 processEvent 方法提供)。事件被添加到事件队列并在 run 方法中定期处理。

几乎总是一切都很好。但是有时在调用 processEvent 方法之后,当一个事件被添加到队列中时,运行部分无法看到有一个新事件。

知道什么是错的吗?除了使用字符串常量作为锁的明显错误之外?