0

我在 AWS 上有一个简单的集群设置,有一个 kafka 实例和一个 zookeeper。我写信<String, String>给这个并努力在 10 秒的窗口中聚合这些值。

我收到的错误消息:

DEBUG  o.a.kafka.clients.NetworkClient - Sending metadata request {topics=[kafka_test1-write_aggregate-changelog]} to node 100
DEBUG  org.apache.kafka.clients.Metadata - Updated cluster metadata version 6 to Cluster(nodes = [12.34.56.78:9092 (id: 100 rack: null)], partitions = [Partition(topic = kafka_test1-write_aggregate-changelog, partition = 1, leader = 100, replicas = [100,], isr = [100,], Partition(topic = kafka_test1-write_aggregate-changelog, partition = 0, leader = 100, replicas = [100,], isr = [100,]])
DEBUG  o.a.k.c.consumer.internals.Fetcher - Attempt to fetch offsets for partition kafka_test1-write_aggregate-changelog-0 failed due to obsolete leadership information, retrying.

cluster metadata# 不断前进。

代码:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

KTable<Windowed<String>, String>  dbwriteTable = lines.aggregateByKey(
            new DBAggregateInit(),
            new DBAggregate(),
            TimeWindows.of("write_aggregate", 10000));

dbwriteTable.toStream().print();

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

当有任何事情发生时,在哪里DBAggregateInit并被DBAggregate存根以记录到 DEBUG。没有其他功能。

这些存根函数都没有受到影响。

不知道我在这里错过了哪些步骤。如果我.foreach()或对该主题进行简单的阅读,它似乎可以正常工作。

FWIW:

当我让 kafka 创建主题而不是使用kafka-topic --create --topic ....

4

1 回答 1

0

我相信这种错误是由于我以不同的用户身份运行 zookeeper 和 kafka,并且各种数据文件夹中可能存在权限问题。

一旦这两个服务都以 root 身份运行并且所有相关的数据文件都被删除/重新创建,这些错误就消失了。

于 2016-08-23T14:55:05.743 回答