问题标签 [producer]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
9000 浏览

producer - kafka 生产者能够在列表中发送多条消息吗?

在 Kafka v 0.8.2 中,作为生产者发送 ProducerRecords 列表而不是一次仅发送一个列表是否仍然可能/有益?检查 API 似乎 org.apache.kafka.clients.producer.KafkaProducer 无法使用一次发送调用发送多个 producerRecords。

但是,在 javaapi 下的生产者中,可以发送 keyedMessages 列表。有人可以解释其中的区别或指出我正确的方向吗?非常感激。

0 投票
2 回答
2558 浏览

apache-kafka - Kafka 生产者代码 FailedToSendMessageException

使用 Kafka 生成消息

在使用 eclipse 运行这个程序时,我得到了以下异常:

Zookeeper 服务启动,Broker 启动,Topic 创建。消费者也准备好了。

有人可以帮我解决这个问题吗?

0 投票
1 回答
962 浏览

java - Kafka 不会在主题中均匀地填充分区

我想要 1 个主题和 10 个分区。我正在使用 Kafka 的默认配置。我通过该帮助脚本创建了 1 个具有 10 个分区的主题,现在我将要向它生成消息。

问题是,似乎只有 5 个分区可供消费者从中获取数据。

让我们更详细地描述它。

我知道每个分区需要一个消费者线程的常见问题。我希望能够提交每个分区的偏移量,这只有在每个分区的每个消费者连接器有 1 个线程时才有可能(我使用的是高级消费者)。

所以我创建了 10 个线程,在每个线程中我调用 Consumer.createJavaConsumerConnector() 我正在执行此操作

最后我有 1 个迭代器,它消耗来自 1 个分区的消息。

当我这样做 10 次时,我有 10 个消费者,每个分区每个线程的消费者,我可以在每个分区独立提交偏移量,因为如果我在主题映射中放置与 1 不同的数字,我最终会得到超过 1 个该主题的消费者线程因此,如果我要使用创建的消费者实例提交偏移量,它将为所有不需要的线程提交它们,因此对于不需要的多个分区。

但问题是,当我使用消费者时,只涉及 5 个消费者,其他线程似乎处于空闲状态,但我不知道为什么。

第一个可能的原因是,即使我有 10 个分区,也只有 5 个分区有消息,所以其他 5 个消费者处于空闲状态,但我不明白当我使用生产者时,消息如何不能均匀地分布在所有分区中。我正在发送大约 100 万条消息,所以如果说它们是均匀分布的,那么每个分区必须至少包含一些消息。

// 编辑

我设法在一个主题中创建了 10 个分区,但我只有 7 个消费者。这对我来说简直是个奇迹。

问题是我正在循环中创建这些消费者线程。所以我启动第一个线程(提交给执行器服务),然后是另一个,然后是另一个,依此类推。

所以场景是第一个消费者获得所有 10 个分区,然后第二个连接,所以它在这两个之间拆分为 5 和 5(或类似的东西),然后其他线程正在连接。

我将其理解为所有消费者之间的分区重新平衡,因此它在某种意义上表现良好,如果创建更多消费者,则这些消费者之间会发生分区平衡,因此每个消费者都应该有一些分区可以操作。

但从结果中我看到只有 7 个消费者,根据消费的消息,它们似乎被分割为 3,2,1,1,1,1,1 分区方式。是的,这 7 个消费者覆盖了所有 10 个分区,但是为什么超过 1 个分区的消费者不进行拆分并将分区给剩下的 3 个消费者呢?

我非常想知道剩余的 3 个线程发生了什么,以及为什么它们不从分配了超过 1 个分区的消费者那里“抓取”分区。

0 投票
2 回答
75 浏览

java - Java:在自定义集合类上同步

我想要一个由生产者线程填充的共享集合类,输出由消费者线程显示。它有时使用集合类的 0 元素,但永远不会更进一步。在 Eclipse 中,我在应用程序冻结后观察“DestroyJVM”线程。生产者有人工延迟来模拟“慢”生产者。我不知道为什么应用程序没有按顺序工作,例如 “生产者获取集合类的锁,添加整数,消费者等待,生产者释放锁,消费者获取锁,消费者打印,消费者释放锁,生产者获取...... “等等。谁能指出错误在哪里?

这是我的代码:

编辑:我设法重写代码,以便它以所需的方式工作,但是,producerThread 和 consumerThread 不会正常退出。任何想法为什么?

编辑 2:找到解决方案:需要从 producerThread 通知 consumerThread 可以重新获取 getInts() 锁。带有我评论的工作代码如下所示(添加了一些由 consumerThread 修改的数据):

0 投票
1 回答
851 浏览

rx-java - RxJava 背压和对生产者的调用次数

我正在尝试使用 rx Java 中的背压在我的 Android 应用程序中创建无限滚动。我希望它只调用外部服务请求的次数(调用后request(1))。但是在使用 flatmap 之后,每次subscribe加载 16 页。

在我的代码下面有预期的结果。几乎每个测试都因为第一次请求而失败(n=16)

0 投票
2 回答
506 浏览

interface - 使用生产者方法选择 Bean 实现

我按照此处的示例动态选择在运行时注入的实现。然后我尝试根据我的理解来实现它,但我的代码总是返回默认实现;

这是我的代码

当我调用该方法时,testCDI我会更新该方法,handlerValue以便我的生产者方法可以使用该值。我在这里缺少什么来确保在正确的值可用时调用生产者方法?

代码在 Wildfly 8.2.0 上运行

0 投票
5 回答
22820 浏览

docker - 从 docker 主机外部与 kafka docker 容器交互

我已经构建了一个 kafka docker 容器并使用 docker-compose 对其进行编排。

调用docker ps我得到以下输入:

我可以从 docker 容器内部运行生产者和消费者,但它不能从 docker 网络外部运行。

例如

我在本地主机上运行 kafka 生产者,出现以下错误:

这是我在 github 上的 kafka docker 示例,其中包含上述问题。

那么,有没有人遇到同样的问题并且可以以任何方式帮助我?

附加信息

(只需从 ches/kafka 派生并为 docker-compose 修改一些内容):

0 投票
1 回答
1699 浏览

c++ - C++11 中的生产者-消费者

我已经开始学习线程操作,并从一个用于生成随机大写字母的简单程序开始。字母是随机生成的,并通过生产者添加到一个char数组中,任何添加的都以小写形式输出。消费者只需从 char 数组中输出常规大写字母。到目前为止,我有以下内容:

然而,我的输出是大写字母和随机数的混合。怎么了?这是我第一次尝试,所以要温柔。:)

0 投票
1 回答
590 浏览

jsf - bean-discovery-mode="annotated" 无法识别 @Named @Produces getter

我正在尝试运行“Java EE 7 Development with WildFly”一书的示例。现在我面临以下问题/问题:

剧院信息.java:

座位.java:

索引.xhtml:

豆类.xml:

这非常有效——我在我的网络浏览器中看到一张座位表——只要我在 beans.xml 中使用 bean-discovery-mode="all"。一旦我在 beans.xml 中使用 bean-discovery-mode="annotated" ,我在浏览器中就再也看不到座位表了,我看到一个空表,但没有发生错误。

在书中他们使用 bean-discovery-mode="all" 但我更喜欢查看哪些类是托管 bean,哪些不是。要使用 bean-discovery-mode="annotated" 我必须将 @Dependent 添加到某些类,但我无法解决名称生产者方法的问题。任何人都可以帮忙吗?

0 投票
1 回答
1486 浏览

java - CDI-Unit @Produces 不工作

首先,我在 Google 上进行了密集搜索,根据http://jglue.org/cdi-unit-user-guide/生成要注入单元测试的东西应该可以正常工作。

我的设置:

例外:

org.jboss.weld.exceptions.DeploymentException:异常列表有 3 个异常:

异常0:org.jboss.weld.exceptions.DeploymentException:WELD-001408:在注入点[BackedAnnotatedField] @Inject private at.fhhagenberg.unitTesting.beans.SurveyBean.log ...

这意味着 CdiRunner 尝试构建我的 SurveyBean 并注入记录器,但它找不到要注入的对象,尽管我专门在基类中生成它(EntityManager 也是如此)。

有人知道如何解决这个问题吗?

PS:我不允许添加的标签:cdi-unit,jglue