问题标签 [kafka-producer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2337 浏览

apache-kafka - 为kafka在线添加分区或代理是否安全?

谢谢你的耐心。

  1. 在线为某个主题添加分区后,kafka 消费者停止读取消息,不抛出异常。消费者只是阻止。每次我们都要重启消费者。我认为这是不合理的,我找不到任何关于它的文档。

此外,当处理消息发生错误时,消费者线程将不会恢复。我们的消费者读取消息并将其插入 MySql。一旦网络出现故障,消费者无法连接到 MySql,然后它会阻塞并停止读取消息,直到我们重新启动它。

  1. 添加分区时,旧数据和新数据会发生什么变化?文档(<a href="https://kafka.apache.org/documentation.html#basic_ops_modify_topic" rel="nofollow">https://kafka.apache.org/documentation.html#basic_ops_modify_topic)说:

“请注意,分区的一个用例是对数据进行语义分区,并且添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,这可能会打扰消费者。也就是说,如果数据按哈希(键)分区% number_of_partitions 那么这个分区可能会通过添加分区而被打乱,但 Kafka 不会尝试以任何方式自动重新分配数据。”

“不尝试自动重新分配数据”是什么意思?旧数据不变,新数据不会发送到添加的分区?

  1. 当代理关闭时,kafka 生产者无法发送消息。

我们有一个包含 3 个分区和 2 个副本的主题。kafka 集群有 3 个代理。但是当一个broker宕机时,就会出现异常:

添加新代理时也会出现同样的问题。我们必须在生产者的“metadata.broker.list”配置中添加新的代理主机名和端口并重新启动它。

我们使用的是高级api,kafka的版本是:

生产者配置:

消费者配置:

生产者代码和消费者代码如下: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group +示例

0 投票
1 回答
1956 浏览

apache-kafka - 卡夫卡抛出 java.lang.NoSuchMethodException

我已经编写了自己的序列化程序来将 java 对象发布到主题。我将 serializer.class 属性设置为我的自定义序列化程序。运行生产者时,我得到以下异常。有人可以帮我吗?

例外

我的制作人

0 投票
2 回答
219 浏览

apache-kafka - kafka复制分区中的PartitionId

我正在实现具有多个分区的单个主题的 kafka 生产者。我正在选择消息通过消息中的特定值(消息 json 中的 feedName 属性值)进入哪个分区。我正在为 feedName - partitionId 映射维护一个 SQL 表。我的问题是领导者和副本的分区 ID 是否相同?如果不同,我如何才能在所有代理中唯一标识一个分区?

0 投票
0 回答
162 浏览

hadoop - Storm - AutoHBase [错误] 无法从凭证映射中获取凭证

我在 Kerberized 环境中创建并部署了 Storm 拓扑。我们有一个可以在 HBase 和 HDFS 中写入数据的螺栓,但它无法获取 HBase 和 HDFS 凭据。

我之前遇到了同样的错误,但通过重新启动集群得到了解决。我怀疑这是否是正确的解决方案。

我也遇到过类似的情况,其中螺栓工作正常,但在集群几天未使用后突然出现异常。

这些凭据是否缓存在临时目录中?

请在下面找到相同的工作日志:

0 投票
2 回答
13107 浏览

apache-kafka - 不同线程使用 Kafka Producer

我的基于 java 的 Web 应用程序有 kafka 生产者,可以将消息推送到 Kafka。根据文档,我可以看到 kafka 生产者是线程安全的。这是否意味着我可以拥有 Kafka 生产者的单个实例并由不同的线程(网络请求)使用它,在我的情况下,每个线程都会打开和关闭生产者。这会产生任何问题吗?还是更好地根据请求启动生产者?

0 投票
1 回答
5529 浏览

apache-kafka - 如何克服 kafka.consumer.ConsumerTimeoutException?

我正在使用 kafka 2.11 版本来编写消费者。我不断收到超时异常。我不确定我在这里使用了正确的 API

有谁能够帮我?

执行者

线

0 投票
1 回答
762 浏览

java - Kafka如何知道Java主题的最新创建的分区ID

我正在使用 AdminUtils 为主题动态创建分区。我必须获取新创建的分区的 partitionId 才能将其保存在 Mysql 中以用于我的业务逻辑。我怎样才能做到这一点?我当前的代码:

上面的问题是最新创建的分区没有出现在 PartitionInfo 列表中。不知道为什么。producer 是 Kafka Producer API。

0 投票
1 回答
834 浏览

apache-kafka - Kafka 消费者返回空迭代器

在我的示例程序中,我尝试发布一个文件并尝试立即使用它。但我的消费者迭代器返回 null。知道我做错了什么吗?

测试

消息监听器

监听线程

在上面的迭代器中,它不会进入 while 循环,因为迭代器是空的。但我确定我正在向同一个主题发布一条消息,并且消费者会收听该主题。

任何帮助,将不胜感激

0 投票
2 回答
1192 浏览

java - 保证将多条消息传递到 Kafka 集群

如果我连续向 Kafka 集群发布多条消息(使用新的 Producer API),我会Future从生产者那里为每条消息获取一个。

现在,假设我已经将我的生产者配置为拥有max.in.flight.requests.per.connection = 1并且retries > 0我可以等待最后一个未来并确定所有以前的都已经交付(并且按顺序)?还是我需要等待所有期货?在代码中,我可以这样做

而不是这个:

并请放心,如果这里没有发现任何内容(来自第一个片段):

然后我所有的消息都按顺序存储在集群中(无论生产者是否在后台执行了任何重试),如果出现问题,那么即使它不是最后一个未来,我也会在那里得到一个异常(我是等着)那第一次遇到的问题?

还有更多奇怪的角落案例需要注意吗?

0 投票
1 回答
3585 浏览

apache-kafka - org.apache.kafka.common.errors.ApiException:会话超时不在可接受的范围内

我在使用 kafka 0.9.0.1 的消费者中遇到了上述异常。根据这个线程 [1],我在 server.properties 文件中看不到“group.max.session.timeout.ms”属性。

有什么线索吗?

[1] http://comments.gmane.org/gmane.comp.apache.kafka.user/12426