问题标签 [kafka-producer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
3443 浏览

apache-camel - ERROR 由于错误(kafka.network.Processor)而关闭 /127.0.0.1 的套接字

我是 apache camel 和 apache kafka 的新手,并为我的项目做一个小的 POC。尝试使用 Camel-kafka 组件从 kafka 读取时,我收到以下问题错误日志。

我的java代码如下:

}

我从 kafka 生产者控制台工具中放了一些 txt,并尝试使用 kafka 的骆驼组件进行阅读。

0 投票
1 回答
1600 浏览

apache-kafka - 无法在独立的 Kafka 代理上创建主题

我正在尝试在 Kafka 服务器上创建一个新主题。但低于错误。请不要将其作为仅设置一个经纪人的独立系统。它之前工作正常,我改变的只是要创建的新主题名称。突然怎么了?以及它之前是如何工作的?

谢谢你。~沙

0 投票
1 回答
546 浏览

c - 用C将二进制结构发送到kafka

我正在创建一个基于 kafka 的消息系统,它必须在 kafka 中转储一个复杂的 C 结构。首先,我试图通过创建一个简单的结构并发送它来做一些简单的事情:

在生产者中:

在消费者中,我定义了相同的结构,但随后我使用:

但它打印垃圾。

i,j 是 1,1

% 消息(偏移量 0,8 个字节):

消息有效负载 hexdump(8 个字节):

00000000: 01 00 00 00 01 00 00 00

结构 p:-469758844 32530


i,j 为 256,256

% 消息(偏移量 1,8 个字节):

消息有效负载 hexdump(8 个字节):

00000000: 80 00 00 00 80 00 00 00

结构 p:-469758810 32530


i,j 是 4294967295(最大整数),4294967295(最大整数)

% 消息(偏移量 2,8 字节):

消息有效负载 hexdump(8 个字节):

00000000: ff ff ff ff ff ff ff ff

结构 p:-469758776 32530


i,j 为 0,0

% 消息(偏移量 3,8 字节):

消息有效负载 hexdump(8 个字节):

00000000: 00 00 00 00 00 00 00 00

结构 p:-469758742 32530


如何正确转储并获取结构?我知道我可能会序列化数据,但将来我将不得不发送一个更复杂的不透明结构。此外,通过序列化它,我可能会使用更多的内存。

谢谢

0 投票
2 回答
127 浏览

apache-kafka - Apache Kafka 的分区和复制

我已从建议的网站http://kafka.apache.org/阅读了整个文档,但无法理解硬件要求

1)我需要澄清:为单个主题每天收集至少 50GB 数据需要多少个分区和复制

2)假设0000000000000.log文件最多可以存储100GB的数据。是否可以减小此日志文件大小以减少 I/O 的使用?

0 投票
1 回答
2166 浏览

scala - KafkaProducer 用于键控和非键控 ProducerRecords

我在 Scala 中使用 0.9 Kafka Java 客户端。

ProducerRecord有几个构造函数允许您包含或不包含键和/或分区。

应该没有问题。

但是,未键入ProducerRecord会产生类型错误。

这是否违反了 Kafka 的规则,或者它可能是在 Scala 中使用这个 Java API 所带来的不必要的预防措施?

更根本的是,将键控消息和非键控消息放在同一个 Kafka 主题中是不是很糟糕?

谢谢

Javadoc: http: //kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/package-summary.html


编辑

可以改变参数的方差来K解决KafkaProducer这个问题吗?

0 投票
1 回答
3053 浏览

java - kafka-connect 错误:找不到或加载主类

我正在关注官方文档来实现kakf-connect文件中读取数据。

我有卡夫卡完美运行。生产者和消费者发送和接收消息。

但是,当我运行以下命令时:

我收到以下错误:

错误:无法找到或加载主类 org.apache.kafka.connect.cli.ConnectStandalone

我交叉检查,我有文件ConnectStandalone here connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone

connect-file-source.properties的如下:

有什么遗漏吗?我应该怎么做才能摆脱这个错误?

0 投票
1 回答
503 浏览

java - java 1.6 上的 Kafka Producer 错误

我正在尝试在 java 1.6 上运行 kafka 生产者,我使用的是 0.8.1.1 版本。它正在运行,但是当它尝试发送它给出的消息时

如果我在 java 1.8 上运行相同的代码,它可以工作并且我能够生成消息。

任何想法 ?

0 投票
1 回答
1497 浏览

apache-kafka - 有没有办法在kafka中原子批量生产?

我有一个来源,我可以从那里收到一批消息。这些消息需要添加到 Kafka - 可靠 - 没有遗漏并且没有乱序。

如果我使用 aync 生产者,当我放置许多消息时,我想知道一个分区是否关闭了一段时间,它会跳过该消息并放置下一条消息 - 这将导致消息无序。

有没有办法,我可以告诉卡夫卡 - 批量生成一组消息,或者原子地通过一切失败的一切?

*我不想做同步生产,因为它会严重影响吞吐量。

0 投票
2 回答
2245 浏览

apache-kafka - Kafka Async Producer 发生故障时如何保证排序?

如果我使用的是 Kafka 异步生产者,假设缓冲区中有 X 条消息。当它们在客户端实际处理时,如果代理或特定分区关闭了一段时间,kafka 客户端将重试,如果一条消息失败,它是否会将特定消息标记为失败并继续下一条消息(这可能导致乱序消息)?或者,为了保持顺序,它是否会使批处理中的剩余消息失败?

我接下来要保持顺序,所以理想情况下希望 kafka 从失败的地方使批次失败,所以我可以从失败点重试,我将如何实现?

0 投票
3 回答
9247 浏览

log4j - 来自 Kafka Producer 的过多控制台消息

您如何控制 Kafka 生产者或消费者的控制台日志记录级别?我在 Scala 中使用 Kafka 0.9 API。

每次调用时,控制台都会给出如下输出sendKafkaProducer这是否表明我没有KafkaProducer正确设置,而不仅仅是过度日志记录的问题?

Kafka 服务器和 Zookeeper 查看的文件中有日志记录配置properties,但我假设这些不会影响 Kafka 客户端。更改这些文件中的一些日志记录配置,并重新启动 Kafka 服务器和 Zookeeper 以重新加载这些文件,并没有解决问题。

谢谢