I have a simple cluster setup on AWS with one Kafka instance and one ZooKeeper. I write <String, String> records to it and am trying to aggregate the values over 10-second windows.
The error messages I'm getting:
DEBUG o.a.kafka.clients.NetworkClient - Sending metadata request {topics=[kafka_test1-write_aggregate-changelog]} to node 100
DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 6 to Cluster(nodes = [12.34.56.78:9092 (id: 100 rack: null)], partitions = [Partition(topic = kafka_test1-write_aggregate-changelog, partition = 1, leader = 100, replicas = [100,], isr = [100,]), Partition(topic = kafka_test1-write_aggregate-changelog, partition = 0, leader = 100, replicas = [100,], isr = [100,])])
DEBUG o.a.k.c.consumer.internals.Fetcher - Attempt to fetch offsets for partition kafka_test1-write_aggregate-changelog-0 failed due to obsolete leadership information, retrying.
… and this cycle (metadata request, cluster metadata update, fetch retry) just keeps repeating.
Code:
// Build the topology: read from TOPIC, aggregate values per key in 10 s windows
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String> dbwriteTable = lines.aggregateByKey(
        new DBAggregateInit(),   // initializer stub
        new DBAggregate(),       // aggregator stub
        TimeWindows.of("write_aggregate", 10000));
dbwriteTable.toStream().print();
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
where DBAggregateInit and DBAggregate are stubs that just log to DEBUG whenever anything happens. No other functionality. Neither stub ever gets hit.
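For reference, a minimal, self-contained sketch of what I mean by "stubs" (the class names are from my code; the bodies shown here are illustrative — in the real code they implement org.apache.kafka.streams.kstream.Initializer<String> and Aggregator<String, String, String> from the 0.10.0 API):

```java
// Sketch of the stub classes described above. In the actual project these
// implement the Kafka Streams Initializer / Aggregator interfaces; here they
// are plain classes with the same method signatures so the sketch stands alone.
class DBAggregateInit {
    // Corresponds to Initializer<String>.apply(): produce the initial aggregate
    public String apply() {
        System.out.println("DEBUG DBAggregateInit.apply()");  // log-only stub
        return "";
    }
}

class DBAggregate {
    // Corresponds to Aggregator<String, String, String>.apply(key, value, aggregate)
    public String apply(String key, String value, String aggregate) {
        System.out.println("DEBUG DBAggregate.apply() key=" + key);  // log-only stub
        return aggregate + value;  // trivial combine, just to return something
    }
}
```

Neither of these debug lines ever appears in the log.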
Not sure what step I'm missing here. If I do a .foreach() or a simple read of the topic instead, it appears to work fine.
FWIW: this is when I let Kafka create the topics rather than using kafka-topic --create --topic ... .