问题标签 [kafka-producer-api]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 如何将对象发送到 Kafka
我无法从我的应用程序向 kafka 发送短信,但是当我尝试发送对象(事件)时,我收到一个异常:
信息:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException:消息处理程序中发生错误 [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0];嵌套异常是 java.lang.NullPointerException: in org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent null of org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent, headers={id=9068de1b- a1cc-c1b5-463a-107b7be21af4,时间戳=1456754038408}] 1000
欢迎任何帮助...
apache-kafka - 一个代理关闭后,kafka 新生产者无法更新元数据
我有一个 kafka 环境,其中有 2 个经纪人和 1 个动物园管理员。
当我尝试向 kafka 生成消息时,如果我停止代理 1(它是领导者),客户端停止生成消息并给我以下错误,尽管代理 2 被选为主题和分区的新领导者。
org.apache.kafka.common.errors.TimeoutException:60000 毫秒后更新元数据失败。
10 分钟后,由于代理 2 是新的领导者,我希望生产者向代理 2 发送数据,但它继续失败并给出上述异常。lastRefreshMs 和 lastSuccessfullRefreshMs 仍然相同,尽管生产者的 metadataExpireMs 为 300000。
我在生产者端使用 kafka new Producer 实现。
似乎在启动生产者时,它会绑定到一个代理,如果该代理出现故障,它甚至不会尝试连接到集群中的另一个代理。
但我的期望是,如果一个代理出现故障,它应该直接检查其他可用代理的元数据并将数据发送给他们。
顺便说一句,我的主题是 4 个分区,复制因子为 2。提供此信息以防万一。
配置参数。
用例:
1- 启动 BR1 和 BR2 产生数据(Leader 是 BR1)
2-停止 BR2 产生数据(罚款)
3-停止BR1(这意味着此时集群中没有活动的工作代理)然后启动BR2并产生数据(虽然领导者是BR2但失败了)
4-开始BR1生产数据(领导者仍然是BR2,但数据生产良好)
5-停止BR2(现在BR1是领导者)
6-停止BR1(BR1仍然是领导者)
7-开始BR1产生数据(消息再次产生良好)
如果生产者向 BR1 发送了最新的成功数据,然后所有的 broker 都挂了,生产者希望 BR1 重新起床,尽管 BR2 已经起床并且是新的领导者。这是预期的行为吗?
apache-kafka - 为什么Kafka Producer不能连接到zookeeper来获取broker元数据而不是连接到brokers
Kafka 生产者需要一个 kafka 代理的引导列表才能工作。根据这个解释,它需要它才能连接到其中一个代理,然后获取有关集群中所有实时代理的元数据。
现在,所有代理都已在 Zookeeper 中注册,Kafka 消费者连接到 ZK,ZK 处理来自哪个代理,哪个分区是要读取的数据。当 ZK 已经拥有所有信息时,为什么 Producers 也不能连接到 ZK?
我看到有一些关于此的 SO 问题,但它们似乎解释了为什么消费者需要 ZK 而不是为什么 Producer 需要引导代理列表而不是 ZK?
spring - Kafka Producer 未使用 Spring 在 SSL 上生成消息
我正在尝试将 KAFKA 与 Spring 集成,我的 JAVA 应用程序正在与 KAFKA 服务器通信,并且当我使用 HTTP 运行应用程序时,我也在接收消息。
现在我想使用 Spring 在 KAFKA 上添加 SSL,并且我已经完成了SSL KAFKA和SPRING KAFKA上指定的更改
当我使用命令行(使用 SSL)运行生产者和消费者时,通信正常进行,但是当我更改 Java 应用程序的配置并尝试在主题上生成和使用消息时,消费者工作正常,我正在收到消息指定的主题。
但是生产者没有按预期工作从 JAVA 应用程序生产者发送的消息在消费者(无论是命令行还是 JAVA 应用程序消费者)上都没有收到。
我已经用谷歌搜索了它,但没有找到答案。任何指针?
PS:KAFKA Version kafka 0.9.0.0, JAVA 7, Spring使用的SSL属性如下:
java - Kafka-Producers 打开的文件太多
使用 Kafka Producer 出现此错误
打开的文件太多
我们通过租户 ID 缓存生产者,使用相同的生产者发送消息。
不关闭连接..
apache-kafka - Kafka:在多服务器设置上创建主题时出现 org.apache.zookeeper.KeeperException$NoNodeException
我正在尝试Kafka-0.8.2.2
在不同的机器上设置具有 1 个生产者、1 个消费者和 3 个代理的多节点集群。
在创建主题时producer
,我收到错误消息org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids
。完整的控制台输出可在此处获得。Kafka Producer
的日志中没有错误。
我用来运行的命令Kafka
是:
注意:Zookeeper 服务在所有服务器上运行,并且所有三个代理都在运行 Kafka 服务器(只有代理需要 Kafka 服务器。对吗?)。
我的producer.properties的配置如下:
以下是我用作参考的许多文章中的一些:
perl - kafka:无法绑定:主题错误
当我增加卡夫卡生产者的数量时,我遇到了错误。任何人都知道这里可能是什么问题?
请找到我的制作人设置: https ://gist.github.com/Vibhuti/dbf1c24962b91f2bc217
错误日志:
作为参考,我的卡夫卡代码:
logging - 如何为 Kafka 生产者配置日志记录?
我正在使用 Kafka 生产者客户端,我的项目中没有任何 log4j 配置。
在运行时,程序会打印出很多我真的不想要的 Kafka 调试日志。
因此,我尝试添加一个 log4j.properties 以将日志级别设置为 ERROR,如下所示,这似乎不起作用:
如何更改 Kafka 日志级别?
spring-integration - 看不到从 Kafka 生产者发布的消息(Spring 集成)
我在 Java Rest api 中使用 Spring Integration 将消息生成到 Kafka 中。 https://github.com/spring-projects/spring-integration-kafka
当我说
这返回true,这意味着它的发布..但无法在主题日志中看到消息..我检查了 /usr/local/var/lib/kafka-logs/test-0 下的日志
即使我运行了消费者控制台,但我无法看到从 api 发布的消息..
但是如果我从生产者控制台生成消息,我可以在主题日志和消费者控制台中看到消息。
我在 spring 集成 rest api 中使用相同的代理列表和主题名称。
我的弹簧配置出站 xml ..
和监听器实现..
我尝试调试它甚至没有进入 onError 函数。
apache-spark - spark中的log4j不打印消息也不将它们附加到Kafka
我想使用 log4j 记录在我的 spark 流式处理过程中发生的业务异常。虽然我想集中概览我的所有日志(不是集群中每个节点上的日志的一部分),但我想将日志发送到 Kafka,然后创建 Kafka 消费者,它将它们聚合在一起。
我创建 log4j.properties 文件并将其放在 main/resources 中。它看起来像这样:
为了避免发出序列化,我也这样做了:
在可序列化的对象内。在同一个可序列化对象中是我所有的方法,它们使用 kafkaLogger。
在方法中,我编写了 kafkaLogger.warn(message),并尝试记录某些消息,但我没有看到它们写在控制台中,也没有写在 Kafka 中。
有什么建议么?
[编辑] 我已经意识到我的自定义 log4j.properties 是使用的(因为没有打印低于 ERROR 的消息,因为它是在根记录器中设置的)。但是,没有任何自定义消息被打印到输出中,也没有被添加到 Kafka。
我不知道是什么问题