问题标签 [kafka-producer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
305 浏览

apache-kafka - 如何将对象发送到 Kafka

我无法从我的应用程序向 kafka 发送短信,但是当我尝试发送对象(事件)时,我收到一个异常:

信息:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException:消息处理程序中发生错误 [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0];嵌套异常是 java.lang.NullPointerException: in org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent null of org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent, headers={id=9068de1b- a1cc-c1b5-463a-107b7be21af4,时间戳=1456754038408}] 1000

欢迎任何帮助...

0 投票
3 回答
29012 浏览

apache-kafka - 一个代理关闭后,kafka 新生产者无法更新元数据

我有一个 kafka 环境,其中有 2 个经纪人和 1 个动物园管理员。

当我尝试向 kafka 生成消息时,如果我停止代理 1(它是领导者),客户端停止生成消息并给我以下错误,尽管代理 2 被选为主题和分区的新领导者。

org.apache.kafka.common.errors.TimeoutException:60000 毫秒后更新元数据失败。

10 分钟后,由于代理 2 是新的领导者,我希望生产者向代理 2 发送数据,但它继续失败并给出上述异常。lastRefreshMs 和 lastSuccessfullRefreshMs 仍然相同,尽管生产者的 metadataExpireMs 为 300000。

我在生产者端使用 kafka new Producer 实现。

似乎在启动生产者时,它会绑定到一个代理,如果该代理出现故障,它甚至不会尝试连接到集群中的另一个代理。

但我的期望是,如果一个代理出现故障,它应该直接检查其他可用代理的元数据并将数据发送给他们。

顺便说一句,我的主题是 4 个分区,复制因子为 2。提供此信息以防万一。

配置参数。

用例:

1- 启动 BR1 和 BR2 产生数据(Leader 是 BR1)

2-停止 BR2 产生数据(罚款)

3-停止BR1(这意味着此时集群中没有活动的工作代理)然后启动BR2并产生数据(虽然领导者是BR2但失败了)

4-开始BR1生产数据(领导者仍然是BR2,但数据生产良好)

5-停止BR2(现在BR1是领导者)

6-停止BR1(BR1仍然是领导者)

7-开始BR1产生数据(消息再次产生良好)

如果生产者向 BR1 发送了最新的成功数据,然后所有的 broker 都挂了,生产者希望 BR1 重新起床,尽管 BR2 已经起床并且是新的领导者。这是预期的行为吗?

0 投票
1 回答
3725 浏览

apache-kafka - 为什么Kafka Producer不能连接到zookeeper来获取broker元数据而不是连接到brokers

Kafka 生产者需要一个 kafka 代理的引导列表才能工作。根据这个解释,它需要它才能连接到其中一个代理,然后获取有关集群中所有实时代理的元数据。

现在,所有代理都已在 Zookeeper 中注册,Kafka 消费者连接到 ZK,ZK 处理来自哪个代理,哪个分区是要读取的数据。当 ZK 已经拥有所有信息时,为什么 Producers 也不能连接到 ZK?

我看到有一些关于此的 SO 问题,但它们似乎解释了为什么消费者需要 ZK 而不是为什么 Producer 需要引导代理列表而不是 ZK?

0 投票
1 回答
4089 浏览

spring - Kafka Producer 未使用 Spring 在 SSL 上生成消息

我正在尝试将 KAFKA 与 Spring 集成,我的 JAVA 应用程序正在与 KAFKA 服务器通信,并且当我使用 HTTP 运行应用程序时,我也在接收消息。

现在我想使用 Spring 在 KAFKA 上添加 SSL,并且我已经完成了SSL KAFKASPRING KAFKA上指定的更改

当我使用命令行(使用 SSL)运行生产者和消费者时,通信正常进行,但是当我更改 Java 应用程序的配置并尝试在主题上生成和使用消息时,消费者工作正常,我正在收到消息指定的主题。

但是生产者没有按预期工作从 JAVA 应用程序生产者发送的消息在消费者(无论是命令行还是 JAVA 应用程序消费者)上都没有收到。

我已经用谷歌搜索了它,但没有找到答案。任何指针?

PS:KAFKA Version kafka 0.9.0.0, JAVA 7, Spring使用的SSL属性如下:

0 投票
0 回答
883 浏览

java - Kafka-Producers 打开的文件太多

使用 Kafka Producer 出现此错误

打开的文件太多

我们通过租户 ID 缓存生产者,使用相同的生产者发送消息。
不关闭连接..

0 投票
2 回答
3831 浏览

apache-kafka - Kafka:在多服务器设置上创建主题时出现 org.apache.zookeeper.KeeperException$NoNodeException

我正在尝试Kafka-0.8.2.2在不同的机器上设置具有 1 个生产者、1 个消费者和 3 个代理的多节点集群。

在创建主题时producer,我收到错误消息org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids。完整的控制台输出可在此处获得。Kafka Producer的日志中没有错误。

我用来运行的命令Kafka是:

注意:Zookeeper 服务在所有服务器上运行,并且所有三个代理都在运行 Kafka 服务器(只有代理需要 Kafka 服务器。对吗?)。

我的producer.properties的配置如下:

以下是我用作参考的许多文章中的一些:

0 投票
1 回答
334 浏览

perl - kafka:无法绑定:主题错误

当我增加卡夫卡生产者的数量时,我遇到了错误。任何人都知道这里可能是什么问题?

请找到我的制作人设置: https ://gist.github.com/Vibhuti/dbf1c24962b91f2bc217

错误日志:

作为参考,我的卡夫卡代码:

0 投票
5 回答
46050 浏览

logging - 如何为 Kafka 生产者配置日志记录?

我正在使用 Kafka 生产者客户端,我的项目中没有任何 log4j 配置。

在运行时,程序会打印出很多我真的不想要的 Kafka 调试日志。

因此,我尝试添加一个 log4j.properties 以将日志级别设置为 ERROR,如下所示,这似乎不起作用:

如何更改 Kafka 日志级别?

0 投票
0 回答
855 浏览

spring-integration - 看不到从 Kafka 生产者发布的消息(Spring 集成)

我在 Java Rest api 中使用 Spring Integration 将消息生成到 Kafka 中。 https://github.com/spring-projects/spring-integration-kafka

当我说

这返回true,这意味着它的发布..但无法在主题日志中看到消息..我检查了 /usr/local/var/lib/kafka-logs/test-0 下的日志

即使我运行了消费者控制台,但我无法看到从 api 发布的消息..

但是如果我从生产者控制台生成消息,我可以在主题日志和消费者控制台中看到消息。

我在 spring 集成 rest api 中使用相同的代理列表和主题名称。

我的弹簧配置出站 xml ..

和监听器实现..

我尝试调试它甚至没有进入 onError 函数。

0 投票
0 回答
530 浏览

apache-spark - spark中的log4j不打印消息也不将它们附加到Kafka

我想使用 log4j 记录在我的 spark 流式处理过程中发生的业务异常。虽然我想集中概览我的所有日​​志(不是集群中每个节点上的日志的一部分),但我想将日志发送到 Kafka,然后创建 Kafka 消费者,它将它们聚合在一起。

我创建 log4j.properties 文件并将其放在 main/resources 中。它看起来像这样:

为了避免发出序列化,我也这样做了:

在可序列化的对象内。在同一个可序列化对象中是我所有的方法,它们使用 kafkaLogger。

在方法中,我编写了 kafkaLogger.warn(message),并尝试记录某些消息,但我没有看到它们写在控制台中,也没有写在 Kafka 中。

有什么建议么?

[编辑] 我已经意识到我的自定义 log4j.properties 是使用的(因为没有打印低于 ERROR 的消息,因为它是在根记录器中设置的)。但是,没有任何自定义消息被打印到输出中,也没有被添加到 Kafka。

我不知道是什么问题