问题标签 [kafka-producer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
635 浏览

apache-kafka - Apache Kafka Producer Program 正在抛出 kafka.common.KafkaException、java.nio.channels.ClosedChannelException 和 kafka.common。如何解决?

[2016-02-17 09:02:36,876] 错误无法发送对具有 [0,32] 中相关 ID 的主题推文的请求

如何解决此问题任何帮助将不胜感激

提前致谢

0 投票
1 回答
22597 浏览

maven - Kafka Maven 依赖项

以下两个依赖项有什么区别?我真的需要第一个来制作消费者或生产者应用程序吗?

我的 Producer 只适用于第一个,但消费者需要第二个。

我曾认为“kafka-clients”工件适用于生产者和消费者。但看起来“kafka.consumer.Consumer”来自另一个依赖项。为什么有区别?

另外,为什么第一个工件命名为kafka_2.9.2?即为什么名称中有版本标识符?

0 投票
1 回答
729 浏览

apache-kafka - Kafka消费者Java代码不起作用

我刚开始使用 Kafka,我能够通过命令提示符生成和使用数据,甚至可以从远程服务器通过 Java 代码生成数据。

但我正在尝试这个简单的消费者 Java 代码,但它不起作用。

输出是: -

没有错误,程序没有终止或给出任何输出!!

动物园管理员日志

我在 Kafka 控制台中无限循环地得到这个。请解释

我以以下方式创建了主题

我可以在命令提示符下使用它

无法理解是什么问题。

0 投票
1 回答
2008 浏览

networking - 如何连接到 Virtualbox 上的 kafka 集群?

这是我在本地的设置:三个虚拟机(使用 Virtualbox)、kafka 和 zookeeper 都安装在这三个虚拟机上。他们也都在互相交谈。

我正在尝试使用本地的 kafka-console-producer,这需要代理列表。我正在提供我的虚拟机的 IP,但它似乎不起作用。我也尝试过虚拟机上的 Advertisementd.host 属性,但对我没有影响。这是我的三台机器的 server.properties:

服务器 1:

服务器 2:

服务器 3:

我的 virtualbox 也有端口转发设置: 在此处输入图像描述 同样,对于其他两台机器,端口也只是稍微调整了一下。

我可以很好地连接到 Zookeeper,所以:

能够连接到 VM 上的 zookeeper。但是,如果我尝试连接 kafka-console-producer,当我尝试发送消息时它会失败:

导致:

不知道我在这里做错了什么?有任何想法吗?(如果有人愿意,我可以提供 ifconfig 输出)。任何帮助将不胜感激。

[编辑 1]:添加 zookeeper quorum 的输出:

0 投票
4 回答
26186 浏览

apache-kafka - Kafka 是否支持请求响应消息传递

我正在研究 Kafka 9 作为一个爱好项目,并完成了一些“Hello World”类型的示例。

我不得不考虑基于请求响应消息的真实世界 Kafka 应用程序,更具体地说,如何将 Kafka 请求消息链接到其响应消息。

我正在考虑使用生成的 UUID 作为请求消息键并将此请求 UUID 用作关联的响应消息键。与 WebSphere MQ 具有消息相关 ID 的机制类型非常相似。

我的结束 2 结束过程将是。

1)。Kafka 客户端生成随机 UUID 并发送单个 Kafka 请求消息。2)。服务器将使用此请求消息提取并存储请求 UUID 值 3)。使用消息有效负载完成业务流程。4)。使用请求消息中存储的 UUID 值作为响应消息 Key 的响应消息进行响应。5)。Kafka 客户端轮询响应主题,直到超时或检索到具有原始请求 UUID 值的消息。

我担心的是 Kafka 消费者轮询会从响应主题中删除其他客户端消息,并增加偏移量,使其他客户端失败。

我是否正在尝试将 Kafka 应用到它从未设计过的用例中?

是否可以在 Kafka 中实现请求/响应消息传递?

0 投票
0 回答
163 浏览

apache-kafka - 我有一个包含 3 个分区和 3 个复制的 Kafka 主题。我的 Storm 拓扑如何接受来自这 3 个分区和复制的数据

我用 3 个分区和 3 个复制创建了一个 Kafka 主题

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test

并编写了 Kafka-Producer Program 从 API 获取数据,它工作正常。我的数据成功存储在 3 个分区和 3 个复制中。

现在我必须处理分区 Kafka 日志中的数据。我有一个风暴拓扑。我的拓扑如何接受来自日志的数据作为输入。

最初我创建了一个具有 1 个分区、1 个复制的主题,并且我的风暴拓扑工作正常。但是当我将分区保持为 3 并且复制为 3 时,数据不会进入风暴拓扑。如何解决它。

我的拓扑代码

帮我解决这个问题

提前致谢

0 投票
2 回答
4573 浏览

apache-kafka - Kafka block.on.buffer.full 默认值

我正在检查 kafka(0.9.0.1) 生产者配置,block.on.buffer.full文档中的属性说:

当我们的内存缓冲区耗尽时,我们必须停止接受新记录(块)或抛出错误。默认情况下,此设置为 true,我们会阻止,但在某些情况下,阻止是不可取的,最好立即给出错误。将此设置为 false 将完成:如果发送记录并且缓冲区空间已满,生产者将抛出 BufferExhaustedException。

所以理论上它应该是真的,但是在同一个文档(http://kafka.apache.org/documentation.html)中,该表有一个名为“default”的列,指出默认值实际上是假的。

哪一个是正确的?

0 投票
1 回答
1444 浏览

apache-kafka - 生产者和消费者是否需要指定分区

我们计划让生产者(Java rest api)和消费者(Java 客户端)在多台机器上运行,并且所有消费者都属于同一个消费者组。

1)在这种情况下,如果我在生产者发布消息期间未指定分区,其中 Kafka 写入随机/默认分区......

2)当消费者检索消息时,如果我不提及分区,我会传递主题名称..并将偏移量提交给Kafka服务器..

如果我有一个消费者群体,这是一个好方法吗???

3)我是否需要拥有 Kafka 休息代理,据我了解,这不是必需的,因为我正在使用 Java 客户端..如果我错了,请纠正我...请让我知道如果我有什么好处我会得到在这里使用 Kafka 休息代理....

0 投票
1 回答
900 浏览

apache-kafka - KAFKA:找到已发布消息的分区?

我使用 KeyedMessage data = new KeyedMessage("topic",partition_key, msg); 发布了一条消息

如何重新检查消息是否到达了哪个分区?

0 投票
3 回答
18758 浏览

java - 如何在 Kafka 中设置消息的大小?

我目前正在使用 Kafka 0.9.0.1。根据我发现的一些来源,设置消息大小的方法是修改server.properties.

  • message.max.bytes
  • 副本.fetch.max.bytes
  • fetch.message.max.bytes

我的server.properties文件实际上有这些设置。

其他可能相关的设置如下。

但是,当我尝试发送有效负载为 4 到 6 MB 的消息时,消费者永远不会收到任何消息。生产者似乎在没有抛出任何异常的情况下发送消息。如果我确实发送了较小的有效负载(例如 < 1 MB),那么消费者确实会收到消息。

知道我在配置设置方面做错了什么吗?

这是发送消息的示例代码。

这是接收消息的示例代码。

以下是为生产者和消费者填充属性文件的方法。