1

我试图使用 KStreamBuilder 将数据从一个主题移动到另一个主题。我尝试了以下代码,但有异常

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StreamsInTopic {

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KStreamBuilder builder = new KStreamBuilder();
    System.out.println("KStreamBuilder initialized!!");

    builder.stream("nil_PF1_P1").to("nil_RF1_P1_1");
    System.out.println("Streaming prepared!!");

    KafkaStreams streams = new KafkaStreams(builder, props);
    System.out.println("KafkaStreams Initialised!!");

    streams.start();
    System.out.println("Streams started!!");

    Thread.sleep(30000L);
    streams.close();
    System.out.println("Streams closed!!");
}
}

输出 :

KStreamBuilder initialized!!
Streaming prepared!!
log4j:WARN No appenders could be found for logger (org.apache.kafka.streams.StreamsConfig).
log4j:WARN Please initialize the log4j system properly.
KafkaStreams Initialised!!
Streams started!!
Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Streams closed!!

然后我尝试使用数据。

$  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF1_P1_1 --from-beginning

任何想法?我需要任何额外的配置吗?我正在使用 kafka 0.10.0.0 集群和客户端。

使用的依赖项。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
</dependency>
4

2 回答 2

2

查看您在问题中分享的内容,问题似乎是您没有在输入主题“nil_PF1_P1”中写入(=产生)任何数据:

  • Kafka Streams 应用程序配置为将数据从 Kafka 输入主题“nil_PF1_P1”写入 Kafka 输出主题“nil_RF1_P1_1”。
  • 控制台使用者从(应用程序的输出主题)“nil_RF1_P1_1”读取任何数据。
  • 但是您没有提及是否或如何将数据输入输入主题“nil_PF1_P1”。

另外:您将立即在代码中关闭 Kafka Streams 实例:

streams.start();
System.out.println("Streams started!!");

//Thread.sleep(1000L);
streams.close();

这不会给应用程序足够的时间来实际执行任何处理。通常,您只会调用上面streams.start()main方法,并在您的 Java 应用程序中注册一个关闭钩子,该钩子会streams.close()在被触发时调用。

出于测试/开发目的,您当然也可以streams.close()从 within调用main(),但是我会增加 start 和 close 之间的睡眠时间(例如尝试 30 秒而不是 1 秒) - 当然您还需要确保您实际上是在该时间窗口内将一些数据写入应用程序的输入主题。

编辑:错误的原因java.lang.IllegalArgumentException: Invalid timestamp -1很可能是您一直在使用非 0.10 生产者将数据写入输入主题。详细信息在http://docs.confluent.io/current/streams/faq.html#invalid-timestamp-exception进行了解释。

于 2016-07-26T17:34:44.380 回答
1

Kafka Stream 是第一个版本发布0.10,因此要求写入主题的所有记录都具有关联的时间戳(键和值旁边的附加字段,在 v0.10 中引入)。对于 Streams,此时间戳不能为负(即使代理不检查此并允许插入具有负时间戳的数据)。

因此,使用较旧的 Java 生产者(即 0.10 之前的生产者)编写的主题可能会出现缺少时间戳字段的记录。也有可能,您使用“旧”主题,即写入 0.9 代理的主题,然后您将代理升级到 0.10——所有这些消息都不会设置时间戳。出于兼容性原因,KafkaConsumer(v0.10) 将缺少的时间戳设置为 value -1

在 Kafka Streams 中,在内部,来自输入消息的时间戳被“转发”到输出消息,因此,如果您使用没有时间戳的消息,Kafka Streams 会尝试将带有时间戳的消息-1写入输出主题,从而导致上述错误。(Kafka Streams 使用 0.10 Java 生产者检查时间戳是否有效,并为负时间戳值抛出上述异常)。

为避免此问题,您需要通过流配置参数更改使用的时间戳提取器timestamp.extractor(请参阅http://docs.confluent.io/3.0.0/streams/developer-guide.html#optional-configuration-parameters)。根据您的语义,您可以使用WallclockTimestampExtractor或提供自定义提取器。

于 2016-08-29T23:55:29.057 回答