问题标签 [kafka-producer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
6097 浏览

java - Kafka 0.8.2.2 - Unable to publish messages

We have written a java client for publishing message to kafka. The code is as shown below

When we execute this code , we get the following message and exception

This happens in a infinite loop and the application hangs... When we checked the kafka broker , we found that the topic was created... but we did not get the message... We have been stuck on this for a while... Please help

0 投票
1 回答
325 浏览

java - Kafka 附加分区键或无键分区器

我是 Kafka 的新手,遇到了一些问题。我知道我们可以使用自己的分区逻辑实现 Partitioner 类,该逻辑根据消息键返回特定的分区。我们还可以设置日志压缩策略,在该策略下仅存储具有相同密钥的最新版本的消息。但是我需要为此操作提供不同的消息键。例如,我们有具有 id 和地址(city_id)的实体。我想根据 city_id 选择分区并仅存储有关具有相同 id 的人的最新信息。有没有办法解决这个问题?对不起我的英语不好,我真的很想学习卡夫卡。

0 投票
4 回答
24183 浏览

amazon-ec2 - Kafka:如何连接 kafka-console-consumer 以获取远程代理主题内容?

我在 ec2 上的一台机器上设置了一个 kafka zookeeper 和 3 个代理,端口为 9092..9094,并试图从另一台机器上使用主题内容。端口 2181 (zk)、9092、9093 和 9094(服务器)对消费者机器开放。我什至可以做一个bin/kafka-topics.sh --describe --zookeeper 172.X.X.X:2181 --topic remotetopic给我的

主题:remotetopic PartitionCount:1 ReplicationFactor:3 配置:主题:remotetopic 分区:0 领导者:2 副本:2,0,1 Isr:2,0,1 Blockquote

但是,当我这样做bin/kafka-console-consumer.sh --zookeeper 172.X.X.X:2181 --from-beginning --topic remotetopic

WARN 从代理 [id:0,host: localhost ,port:9092]获取主题 [Set(remotetopic)] 的相关 id 为 0 的主题元数据失败 (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException

为什么消费者试图从本地主机读取?是否有任何选项或命令行或默认文件从中读取;我可以改变它吗?

任何帮助,将不胜感激!

0 投票
1 回答
5193 浏览

apache-kafka - Kafka 代理是否为每条消息或每个消息批次发送 ACK?

我正在使用 Kafka 0.8.2。正如文件所说:

batch.num.messages指定:

使用异步模式时一批发送的消息数。生产者将等待直到此数量的消息准备好发送或达到 queue.buffer.max.ms。

request.required.acks控制代理对请求的确认。

我想知道 Kafka 代理如何发送此确认,它是为批处理发送 ACK,还是为每个单独的消息发送 ACK?

0 投票
1 回答
1749 浏览

scala - Kafka 生产者无法从代理获取元数据

我在三个单独的节点上托管 Kafka,每个节点都有本地 Zookeeper 实例。在制作主题live时,我得到以下异常。metadata.brokers.list未设置为localhost, 但kafka1.domain.com, kafka2.domain.com, kafka3.domain.com. advertised.host.name设置为 Amazon 实例的本地 IP 地址。Kafka 设置中没有引用localhost

这是生产者配置:

每个都会producer.send产生异常。

有谁知道可能出了什么问题?

0 投票
3 回答
2970 浏览

java - Kafka 消费者无法接收序列化对象?

所以我想实现一个简单的应用程序,将通知 kafka 生产者发送给 kafka 消费者。到目前为止,我已经成功地将字符串消息发送给生产者给消费者。但是当我尝试发送通知对象时,kafka 消费者没有收到任何对象。这是代码我用过。

这是制片人

这是消费者

最后我使用customserializer 序列化和反序列化对象。

有人可以告诉我是什么问题吗?这是正确的方法吗?

0 投票
1 回答
14379 浏览

apache-kafka - 卡夫卡有重复的消息

我在生产或使用数据时没有看到任何失败,但是生产中有一堆重复的消息。对于一个收到大约 100k 消息的小主题,有大约 4k 重复,尽管就像我说的没有失败,最重要的是没有实现重试逻辑或设置配置值。

我还检查了这些重复消息的偏移值,每个消息都有不同的值,这告诉我问题出在生产者身上。

任何帮助将不胜感激

0 投票
3 回答
23254 浏览

java - 如果消息是由生产者生成的,如何从 Kafka 代理获得确认?

当我产生消息时,我想从经纪人那里得到一些回应。我已经尝试过使用的 CallBack 机制(通过实现 CallBack),KafkaProducer.send但它不起作用并且不调用onCompletion方法。

当我关闭 Kafka 服务器并尝试生成消息时,它会调用回调方法。

有没有其他方法可以得到认可?

我正在使用带有代理版本 0.8.2 的 kafka-client api 0.9.1。

0 投票
1 回答
6056 浏览

apache-kafka - Java consumer group missing?

I recently set up a test Kafka cluster. I am running a consumer group listening on items and things seem to work. The name of consumer group is default. What surprises me is that listing consumer groups gives me an empty list:

Also, explicitly querying the offsets doesn't yield anything:

Do I need to manually create a consumer group using kafka-consumer-groups.sh —new-consumer to be able to track its offsets?

0 投票
1 回答
740 浏览

spring - 使用 Spring 集成和 Kafka 的 EvaluationContext Null

我试图在 Spring Integration 中定义一个简单的消息流,它从一个通道读取,然后将消息转储到 Kafka 队列中。为此,我正在使用spring-integration-kafka。问题是我得到一个EvaluationContext我无法破译的错误。

这是我在 XML 中的配置:

当我通过 Spring Boot 运行我的应用程序时,我得到了这个异常:

创建名为“org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0”的bean时出错:调用init方法失败;嵌套异常是 java.lang.IllegalArgumentException: [Assertion failed] - 此参数是必需的;它不能为空

这是堆栈跟踪中的违规行:

在 org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.onInit(KafkaProducerMessageHandler.java:68)

这是第 68 行发生的情况:

Assert.notNull(this.evaluationContext);

所以EvaluationContext是空的。我不知道为什么。

顺便说一句,当我用一个普通的端点替换 Kafka 端点stdout来打印消息正文时,一切正常。

你能告诉我我的 Kafka 端点配置有什么问题,没有EvaluationContext可用的吗?