问题标签 [producer-consumer]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
5 回答
2067 浏览

c# - 审核请求:线程安全队列(并行推送弹出)

这是线程安全队列的实现。push 和 pop 不会互相阻塞。但是,如果队列为空,pop 将等到一个项目被推送。多个生产者和消费者可以使用队列。如果您发现任何问题,请告诉我。

更新:根据答案编辑 1)解决了“队列已满”情况的问题。2).NET4中有BlockingCollection<T>ConcurrentQueue<T>。所以没有必要重新发明轮子(对于.NET4)

0 投票
4 回答
2892 浏览

java - JMS 是对持久阻塞队列需求的解决方案吗?

我正在创建一个由 Log4J 附加程序组成的库,该附加程序将事件异步发送到远程服务器。当一个日志语句被创建时,appender 将异步记录事件到一个本地队列中,然后一个消费者池将检索并发送到远程。

完全在内存中的解决方案是创建一个 BlockingQueue 来处理并发问题。但是,我希望保留队列,以便如果远程服务器不可用,我不会无限制地增长队列或在有界队列的情况下开始丢弃消息。

我正在考虑使用嵌入式 H2 数据库在本地存储事件,然后使用轮询机制来检索事件并发送到远程。我宁愿使用 BlockingQueue 而不是轮询数据库表。

JMS 是答案吗?

编辑:

如果 JMS 是答案,而且似乎是这样,那么是否有人对可以配置为仅接受进程内消息的轻量级、可嵌入 JMS 解决方案提出建议?换句话说,我不想也可能不会被允许打开一个 TCP 套接字来监听。

编辑:

我现在嵌入了 ActiveMQ,它似乎正在工作。谢谢大家。

0 投票
4 回答
1509 浏览

c# - 具有变体的生产者-消费者 - 如何与线程信号/等待同步?

在从事一个大型项目时,我意识到我要打很多电话来安排将来的时间。由于这些都是相当轻量级的,我认为使用单独的调度程序可能会更好。

所以我创建了一个单独的调度程序类,它在自己的线程上运行并执行这些事件。我有 2 个函数可以从不同的线程访问共享队列。我会使用锁,但由于其中一个线程需要休眠等待,我不确定如何释放锁。

  • 问题是,我不确定如何在 2 个函数之间同步对队列的访问。我不能使用监视器或互斥锁,因为 Run() 会休眠等待,从而导致死锁。在这里使用什么正确的同步机制?(如果有一种机制可以自动启动睡眠等待过程并立即释放锁,那可能会解决我的问题)
  • 如何验证没有竞争条件?
  • 这是生产者消费者问题的变体,还是有更相关的同步问题描述?

    虽然这有点面向 C#,但我很高兴听到对此的一般解决方案。谢谢!

  • 0 投票
    0 回答
    888 浏览

    producer-consumer - 生产者消费者通过 Spring 应用程序事件

    我正在尝试通过使用spring的应用程序事件和线程池执行器来实现演员模型模式(与生产者消费者有点混搭)我的主要目标是解耦每一层。我的架构如下:我部署了一个战争,通过休息 API 接收业务事务请求,在任何给定时刻都可以有 X 数量的事务活动,其中 X 是可配置的数量,实际执行必须是异步的,并且每个事务必须在不同的线程中。请求本身以先进先出的方式处理,但它有一些复杂性,因为某些请求必须等待其他请求完成才能处理,但这并不意味着无法处理其他请求,例如:don'

    存款(2) 取款(2) 存款(3)

    其中数字是帐号,我想按以下顺序处理它们:

    存款(2) 存款(3) 取款(2)

    我已经以这种方式构建了体系结构:我有一个休息 api,它获取命中并将它们写入数据库(分布式系统必须在数据库中具有状态)并在应用程序上下文中发布一个 clientrequestevent 我有单例 bean负责发布生产者事件并监控他发送了多少事件(即:他负责限制并发进程的数量并实现上述逻辑)并且我还有几个其他监听器每个动作(提款押金)等..) 监听后者发布的事件并发布完成事件。

    每件事都很好,一切都完成了不同的线程,所有的流程都很好,但是我在中间层有问题,负责确定是否有空闲插槽我不想有同步方法,我也不想想围绕 atomiclong 或类似的东西做一些技巧,我宁愿使用一些阻塞队列来发布事件,但我找不到确定事件何时完成的好方法,所以我可以放一个新的。最大的问题是,为了请求一项新工作,我必须去数据库,这是一项繁重的任务,因为这个系统应该在重负载下工作。我想以某种方式利用阻塞队列和线程池,以便在一个插槽空闲的那一刻从带有线程的大小有界队列中获取

    什么是处理这个的好方法?提前致谢

    0 投票
    1 回答
    1403 浏览

    c++ - 中断或加入后重用 Boost 线程(来自线程池)

    目前,我正在使用生产者消费者模型来渲染实时图形应用程序的部分。消费者将不断在我们的队列中寻找数据(无限循环);但是我担心这可能会导致我的模拟与主循环不同步。我认为这是快速生产者慢消费者问题 - 模拟被限制在一定时间内的事实使情况更加复杂。

    问题-保持这一切平衡并确保消费者有足够的时间完成的最佳方法是什么,而且在我们完成渲染当前帧之前模拟不会移动到下一帧(或者至少能够检测到这一点并跳过渲染下一帧 - 或中断正在渲染的当前帧)我目前只是在每个消费者完成后中断和加入

    第二个问题:如果您查看下面的代码,您会看到我目前只是在将渲染作业添加到队列后调用中断和加入 - 这允许线程始终完成其操作并响应中断等结束了。在调用 interrupt_all 和 join_all 之后,如何重用线程池中的线程?(即如果我再次调用drawNextFrame)

    生产者是执行主线程的一部分(我认为这不会影响任何事情)

    如果您需要查看消费者类,请参见下文:

    我试着让这个简单快速地消化,谢谢你的时间

    0 投票
    4 回答
    2465 浏览

    java - 生产者-消费者使用同步

    我编写了代码来实现生产者-消费者问题,它似乎工作正常,不需要同步。有可能吗?

    我如何测试代码并检查它是否真的工作正常?我怎么知道是否会发生死锁?现在,我没有打破循环(即生产者不断插入,消费者不断地在无限循环中消费)。我使用大小为 3 的循环队列(为简单起见)作为共享资源。

    这是我的代码:

    0 投票
    2 回答
    1517 浏览

    c# - C# 生产者/消费者/观察者?

    我有一个生产者/消费者队列,除了有特定类型的对象。因此,并非任何消费者都可以消费添加的对象。我不想为每种类型创建一个特定的队列,因为太多了。(它有点延伸了生产者/消费者的定义,但我不确定正确的术语是什么。)

    是否存在允许带有参数的脉冲的 EventWaitHandle 之类的东西?例如myHandle.Set(AddedType = "foo")。现在我正在使用Monitor.Wait,然后每个消费者都会检查脉冲是否真的是为他们准备的,但这似乎毫无意义。

    我现在拥有的伪代码版本:

    正如你所看到的,当其他东西被添加到字典中时,我可能会得到脉冲。我只关心何时将 MyType 添加到字典中。有没有办法做到这一点?这不是什么大不了的事,但是,例如,我现在必须手动处理超时,因为每次获取锁都可以在超时内成功,但MyType永远不会添加到timeout.

    0 投票
    1 回答
    500 浏览

    c - “信号量的使用是微妙的错误”

    上个学期我参加了 C 语言的操作系统实习,其中第一个项目涉及制作一个线程包,然后编写一个多生产者-消费者程序来演示该功能。然而,在获得评分反馈后,我因“信号量的使用存在微妙的错误”和“程序假定抢占(例如使用 yield 更改控制)”而失分(我们从非抢占线程包开始,后来添加了抢占。请注意,评论和示例相互矛盾。我相信它也不假设,并且在两种环境中都可以使用)。

    这一直困扰着我很长一段时间 - 课程人员有点不知所措,所以我不能问他们这学期有什么问题。我花了很长时间思考这个问题,但我看不到问题所在。如果有人可以查看并指出错误,或者向我保证实际上没有问题,我将非常感激。

    我相信就线程包函数(微型线程和信号量)而言,语法应该是相当标准的,但如果有什么令人困惑的地方,请告诉我。

    0 投票
    2 回答
    4896 浏览

    message-queue - RabbitMQ:一个队列上的多个消费者是否可以使用非轮询策略?

    我们使用 RabbitMQ 将作业从一台机器上的生产者发送到分布在多台机器上的一小群消费者。

    生产者生成作业并将它们放入队列中,消费者每 10 毫秒检查一次队列以查看是否有无人认领的作业,如果有作业则一次获取作业。如果一个特定的工作人员处理一项工作的时间太长(GC 暂停或其他暂时性问题),其他消费者可以自由地从队列中删除工作,以便整体工作吞吐量保持较高。

    当我们最初建立这个系统时,我们无法弄清楚如何为队列中的多个消费者建立订阅关系,这将防止我们不得不轮询并引入一点额外的延迟。

    检查文档并没有得到令人满意的答案。我们是使用消息队列的新手,我们可能不知道准确描述上述场景的词语。这有点像黑板系统,但在这种情况下,“专家”都是相同的,从不消耗彼此的结果——结果总是报告给工作制作人。

    有任何想法吗?

    0 投票
    1 回答
    1103 浏览

    objective-c - 使用 NSData 解决生产者-消费者问题(用于音频流)

    我正在使用 AVAssetReader 将 PCM 数据从 iPod 轨道复制到缓冲区,然后使用 RemoteIO 音频单元播放。我正在尝试创建一个单独的线程来加载声音数据,这样我就可以在加载缓冲区时访问和播放缓冲区中的数据。

    我目前有一个大型 NSMutableData 对象,它最终保存了整首歌曲的数据。目前,我使用 NSOperation 在单独的线程中加载音频数据,如下所示:

    1. AVAssetReaderOutput 一次最多复制 8192 个字节到 CMBlockBuffer
    2. 将这些字节复制到 NSData 对象
    3. 将此 NSData 对象附加到更大的 NSMutableData 对象(最终保存整首歌曲)
    4. 完成后,通过访问 NSMutableData 对象中的每个数据包来播放歌曲

    我试图在复制这些字节的同时播放歌曲。我不确定同时写入和读取文件的好方法是什么。

    我有一个简短的想法:

    1. 创建并填充 3 个 NSData 对象,每个对象的长度为 8192 字节,作为缓冲区。
    2. 开始播放。当我播放完第一个缓冲区后,将新数据加载到第一个缓冲区中。
    3. 当我播放完第二个缓冲区后,将新数据加载到第二个缓冲区中。第三个也是一样
    4. 再次从第一个缓冲区开始播放,填充第三个。等等。

    或者,创建一个包含 3 * 8192 个 PCM 单元的 NSData 对象,并以某种方式使用两个不同的线程同时对其进行写入和读取。

    我的代码现在在两个不同的线程上工作。我将数据附加到数组,直到我按下播放,此时它停止(可能是因为线程被阻塞,但我现在不知道)并播放直到它到达我加载的任何内容的末尾并导致 EXC_BAD_ACCESS 异常。

    简而言之,我想找到正确的方式在 PCM 数据被复制时播放,比如一次复制 8192 个字节。我可能不得不使用另一个线程(我现在正在使用 NSOperation)这样做,但不清楚如何同时写入和读取缓冲区,最好使用一些更高级别的 Objective-C 方法。