问题标签 [kafka-producer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1581 浏览

apache-kafka - 在卡夫卡处理经纪人

我在异步模式下使用 kafka 生产者,但是当所有代理都关闭时,它就像同步一样,它一直等到 metadata.fetch.timeout.ms 过期,我的情况是 60 秒。我的第一个问题,这是正常行为还是我做错了什么?

由于我的逻辑中的事务最多应在 100 毫秒内完成,因此这个超时值对我来说是一个非常大的延迟。也许将 metadata.fetch.timeout.ms 设置为 10 ms 可能会解决我的问题,但我不确定这对我的系统有何影响。这是否会导致某个地方出现瓶颈或大量消耗 CPU?

另一种可能的解决方案可能是在 executorservice 中生成消息,这使得生成真正异步,但我不想让事情变得更复杂。以前有人试过吗?

我的最后一个问题是,如果所有代理都关闭,我是否可以使用切换机制禁用对 kafka 的生产,如果所有代理都启动,则启用。kafka 中的心跳问题是否有任何功能?

谢谢。

0 投票
2 回答
2575 浏览

apache-kafka - How to set the max queue time in Kafka 0.9 producer?

How to setup the max queue time in Kafka v0.9?

My understanding is that the Kafka producer will try to buffer the messages in a queue to a certain size and then send the messages in the queue as batch to broker. However in Kafka v0.8, the configuration of "queue.buffering.max.ms" will send the messages in the queue when it reaches the time limit, even if they are smaller than the buffer size.

The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms).

from Kafka 0.9 document: http://kafka.apache.org/documentation.html#design_asyncsend. (This section is the same as Kafka 0.8.2)

But I didn't find the configuration of "queue.buffering.max.ms" or equivalent in the v0.9 producer configuration section http://kafka.apache.org/documentation.html#producerconfigs

Is there still a way to configure this in Kafka v0.9 or I have to call KafkaProducer.close() or KafkaProducer.flush() to do so?

0 投票
1 回答
1055 浏览

java - 我是卡夫卡的新手。我正在创建的消费者没有收到生产者发布的消息

我正在创建消费者(其中包含单个消费者的消费者组):

我为同一主题订阅了 2 个消费者。我通过存储其未来对象然后调用 consumerFuture.cancel(Boolean.TRUE); 来取消订阅消费者

现在我用上面的代码再次订阅了同一个消费者,它成功注册了。但是,当发布者现在发布时,新订阅的消费者没有收到消息,而另一个注册的消费者正在收到消息

我也在检查消费者的偏移量,当生产者发布但消费者没有收到消息时,它们会得到更新。生产前:

组主题 Pid 偏移 logSize 滞后

A T1 0 94 94 1

组主题 Pid 偏移 logSize 滞后

B T1 0 94 94 1

生产后:

组主题 Pid 偏移 logSize 滞后

A T1 0 95 97 2

组主题 Pid 偏移 logSize 滞后

B T1 0 94 97 2

我无法弄清楚这是否是生产者方面的问题(分区不够)或者我是否以不正确的方式创建了消费者此外,我无法弄清楚日志和滞后列在此意味着什么。

让我知道是否有人可以提供帮助或需要更多详细信息。

0 投票
0 回答
644 浏览

java - kafka生产者api错误线程主kafka.common.FailedToSendMessageException中的异常

我的所有生产者 api 程序都收到此错误,我试图解决它,但我无法找到问题所在。

请帮助我。

提前致谢。

0 投票
2 回答
1253 浏览

apache-kafka - 单个分区上有多个主题?

我只是好奇,找不到任何有关此的信息。我的问题是单个分区上可以有多个主题吗?如果是,它们将如何在该分区中生产或稍后被消费者消费?还是一个分区总是持有一个主题?

0 投票
1 回答
5534 浏览

scala - kafka ProducerRecord 和 KeyedMessage 有什么区别

我正在测量 kafka 生产者生产者的表现。目前我遇到了两个配置和使用略有不同的客户:

常见的:

第一个客户:

用法:

第二个客户:

用法:

我的问题是:

  • 2个客户有什么区别?
  • 对于大规模应用程序,我应该配置哪些属性,以实现最佳的高重写入性能?
0 投票
1 回答
11115 浏览

apache-kafka - 创建 Kafka 主题导致没有领导者

我正在使用 Kafka v0.9.0.1 (Scala v2.11) 和com.101tec:zkclientv0.7。我正在尝试使用AdminUtils创建一个 kafka 主题。我的代码如下。

该主题实际上是通过以下命令验证创建的。

但是,输出并不像预期的那样。

如果我使用脚本。

然后我看到以下内容。

关于我做错了什么的任何想法?效果是,如果我使用 aProducer向主题发送 a ProducerRecord,则主题上不会显示任何内容。

0 投票
1 回答
1064 浏览

java - Kafka生产者从Java为主题创建分区

我正在创建一个 kafka 生产者,并且我想在根据消息类型发送消息时指定 partitionId。我的 Mysql 表中有 type-partitonId 映射。如果是新消息类型,我必须创建新分区并将该 ID 保存到 Mysql,以便下次相同类型出现时它可以直接向该 ID 发送消息。如何为主题创建新分区并获取创建的 partitionId ?我正在使用最新的 kafka API .9 和 Java 1.8。

0 投票
0 回答
183 浏览

apache-kafka - Kafka 生产者 Java api 与 zookeeper 集成

使用 Kafka Nodejs 客户端(nodejs 是 kafka 生产者),我们可以找到带有 zookeeper 集成的 kafka-node。是否有与 Kafka 生产者类似的 Java 包与 Zookeeper 的集成。目的是Java可以只和zookeeper对话,让zookeeper去寻找kafka broker,这样即使一个broker宕机了,zookeeper也会建议producer client去和新选择的primary broker对话。

0 投票
1 回答
1813 浏览

hadoop - 基于时间的桶记录(kafka-hdfs-connector)

我正在尝试使用 Confluent 平台提供的 kafka-hdfs-connector 将数据从 Kafka 复制到 Hive 表中。虽然我能够成功地做到这一点,但我想知道如何根据时间间隔存储传入的数据。例如,我想每 5 分钟创建一个新分区。

我用partition.duration.ms尝试了 io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner但我认为我做错了。我在 Hive 表中只看到一个分区,所有数据都进入该特定分区。像这样的东西:

所有的 avro 对象都被复制到这个分区中。

相反,我想要这样的东西:

最初,连接器将创建路径year=2016/month=03/day=15/hour=19/minute=03并将在接下来的 5 分钟内继续将所有传入数据复制到此目录中,并在第 6 分钟开始时复制应该创建一个新路径,即year=2016/month=03/day=15/hour=19/minute=08并将接下来 5 分钟的数据复制到该目录中,依此类推。

这是我的配置文件的样子:

如果有人能指出我正确的方向,那将非常有帮助。如果需要,我很乐意分享更多细节。不想让这个问题看起来像一个永无止境的问题。

非常感谢!