问题标签 [kafka-producer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
117828 浏览

apache-kafka - 向 Kafka 发送消息时是否需要密钥?

目前,我正在发送没有任何密钥的消息作为密钥消息的一部分,它仍然可以使用delete.retention.ms吗?我需要将密钥作为消息的一部分发送吗?将密钥作为消息的一部分这样好吗?

0 投票
1 回答
1163 浏览

java - Kafka 2.9.1 生产者 0.8.2.1 编译与运行时依赖项

因此,API 0.8.2 中生产者的 Kakfa 配置属性发生了变化;在过去工作并让我的 java Producer 编译之后,我得到了一个异常。生产者针对我的 Kafka_2.9.1-0.8.2.1 集群的节点,我得到了关于DefaultSerializer未实例化的异常:

鉴于这是在 Kakfa 中实现的,我想知道将 Kafka 作为依赖项进行编译是否不够,因为我可能需要在运行时打包一个或多个 Kafka jar。我还没有找到关于此的文档(最新的或其他的)。是否有我缺少的生产者运行时 jar?

作为参考,我在build.gradle这里包括了我的(有点乱)。编译中的排除项是在已经收到此错误之后的新增内容,因此无论是否依赖项块中的这些行都会发生错误。我确实尝试只依赖kafka-client0.8.2 的模块,但我认为这不适用于制作人。这是文件:

0 投票
2 回答
21772 浏览

json - Kafka 序列化器 JSON

我是 Kafka、序列化和 JSON 的新手

我想要的是生产者通过 kafka 发送 JSON 文件,消费者以原始文件形式使用和使用 JSON 文件。

我能够得到它,因此 JSON 转换为字符串并通过字符串序列化器发送,然后消费者将解析字符串并重新创建 JSON 对象,但我担心这不是有效的或正确的方法(可能会丢失字段类型对于 JSON)

所以我考虑制作一个 JSON 序列化程序并将其设置在我的生产者的配置中。

我在这里使用了 JsonEncoder:Kafka:编写自定义序列化程序

但是当我现在尝试运行我的生产者时,似乎在编码器的 toBytes 函数中,try 块永远不会像我想要的那样返回任何东西

似乎objectMapper.writeValueAsString(object).getBytes(); 获取我的 JSON obj ( {"name":"Kate","age":25}) 并将其转换为空,

这是我的制片人的运行功能

我错过了什么?我的原始方法(转换为字符串并发送然后重建 JSON obj)可以吗?还是不是正确的方法?

谢谢!

0 投票
1 回答
5132 浏览

java - Apache Kafka Producer 错误:3 次尝试后发送消息失败

我们在 Linux Ubuntu 服务器上安装了 kafka,并使用批处理文件 - kafka-console-producer.shkafka-console-consumer.sh测试了通信,发现我们可以发布和接收消息

在同一网络上运行的Windows 机器上。我们写了一个java生产者客户端,代码如下

当我们运行客户端时,我们收到以下错误

log4j:WARN 找不到记录器的附加程序(kafka.utils.VerifiableProperties)。log4j:WARN 请正确初始化 log4j 系统。log4j:WARN 有关详细信息,请参阅 http://logging.apache.org/log4j/1.2/faq.html#noconfig。3 次尝试后发送消息失败。

我们尝试了以下

  1. 从 Windows 机器 ping ubuntu 机器,它似乎工作正常
  2. 尝试了Apache Kafka 示例错误中的解决方案:在 3 次尝试后无法发送消息,但它不起作用

我们观察到一件奇怪的事情,当我们在服务器上运行以下命令时 - bin/kafka-topics.sh --list --zookeeper localhost:2181 ,我们发现主题是从Java代码创建的,但消息没有发布

任何帮助表示赞赏

0 投票
1 回答
1396 浏览

java - 当消费者情绪低落时写一个 kafka 主题

我试图整合卡夫卡风暴。我刚开始举几个例子。

我能够从 GitHub 运行示例。接下来,我尝试在 Eclipse 中编写一个 Producer 类,以使用 KAFKA PRODUCER API 将消息发布到 kafka 主题。

场景1:

当我的消费者外壳使用说主题测试运行时,我运行我的生产者类。我能够看到我的消费者外壳与所有已发布的消息。

情景2

我还没有启动我的消费者外壳(说消费者已关闭)。我经营我的制作人课程。消息正在发布到 kafka。

现在,如果消息已发布,现在如果我启动消费者 shell,则在停机后,它不会读取已发布的消息主题。

为什么?我想它会维护主题消费的日志。不应该是看消息吗?

有没有我需要提及的配置参数?

或者我需要做些什么来改变消费者。我正在使用包中提供的消费者外壳,并使用它启动它

0 投票
3 回答
368 浏览

apache-kafka - Kafka 生产者行为

我在本地主机上设置了 Kafka,并尝试在出现网络问题时监控 Kafka Producer 的行为。

即使所有经纪人都关闭了,生产者也没有给出任何错误。我正在使用同步生产者和 Kafka 版本 0.8。

如果所有经纪人都关闭了,生产者是否可以收到异常?

0 投票
6 回答
21739 浏览

apache-kafka - 将消息发布到 Kafka 主题时出错

我是 Kafka 的新手,并尝试为它设置环境。我正在尝试运行单个节点 Kafka,但我遇到了错误。

在mac上按照以下步骤

但我收到以下错误。请让我知道,如果我错过了什么

谢谢

0 投票
4 回答
77506 浏览

file - 如何将文件写入 Kafka Producer

我正在尝试在 Kafka 中加载一个简单的文本文件而不是标准输入。下载 Kafka 后,我执行了以下步骤:

启动动物园管理员:

bin/zookeeper-server-start.sh config/zookeeper.properties

启动服务器

bin/kafka-server-start.sh config/server.properties

创建了一个名为“test”的主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

运行制片人:

消费者聆听:

我想将一个数据文件甚至一个简单的文本文件传递给消费者可以直接看到的生产者,而不是标准输入。任何帮助将不胜感激。谢谢!

0 投票
1 回答
2516 浏览

java - Kafka集群zookeeper故障处理

我将实现一个由 3 台机器组成的 kafka 集群,一台用于 zookeeper,另外 2 台作为代理。我有大约 6 台消费者机器和大约一百台生产者。

现在,如果其中一个代理失败,由于复制功能,可以避免数据丢失。但是如果zo​​okeeper失败,同一台机器无法启动怎么办?我有几个问题:

  1. 我注意到,即使在 zookeeper 失败后,生产者仍继续在指定的代理中推送消息。但消费者再也无法取回它们。因为消费者没有注册。那么在这种情况下,数据会永久丢失吗?
  2. 如何在运行时更改代理配置中的 zookeeper ip?他们是否必须关闭才能更改 zookeeper ip?
  3. 即使以某种方式将新的 Zookeeper 机器带入集群,以前的数据也会丢失吗?
0 投票
1 回答
1522 浏览

apache-kafka - 我可以在三台不同的机器上安装生产者、代理和消费者吗?

我对卡夫卡很陌生。我有以下架构:

我很困惑