I am trying to use KStreamBuilder to move data from one topic to another. I tried the following code, but I get an exception:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
public class StreamsInTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KStreamBuilder builder = new KStreamBuilder();
        System.out.println("KStreamBuilder initialized!!");
        builder.stream("nil_PF1_P1").to("nil_RF1_P1_1");
        System.out.println("Streaming prepared!!");
        KafkaStreams streams = new KafkaStreams(builder, props);
        System.out.println("KafkaStreams Initialised!!");
        streams.start();
        System.out.println("Streams started!!");
        Thread.sleep(30000L);
        streams.close();
        System.out.println("Streams closed!!");
    }
}
Output:
KStreamBuilder initialized!!
Streaming prepared!!
log4j:WARN No appenders could be found for logger (org.apache.kafka.streams.StreamsConfig).
log4j:WARN Please initialize the log4j system properly.
KafkaStreams Initialised!!
Streams started!!
Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Streams closed!!
Then I tried to consume the data:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF1_P1_1 --from-beginning
Any ideas? Do I need any additional configuration? I am using a Kafka 0.10.0.0 cluster and client.
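One thing I noticed while reading the stack trace: the exception reports timestamp -1, which Kafka uses to mean "no timestamp" (for example, records written by a pre-0.10 producer have none). If that is the cause here, I wondered whether overriding the default timestamp extractor would help. This is only a sketch of the extra configuration I considered, not something I have verified:

```java
// Assumption: records in nil_PF1_P1 carry no embedded timestamp (-1),
// e.g. because they were produced with a pre-0.10 client.
// Fall back to wall-clock time when Streams builds the sink records,
// instead of reusing the (missing) record timestamp.
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          WallclockTimestampExtractor.class.getName());
```

`WallclockTimestampExtractor` ships with Kafka Streams in `org.apache.kafka.streams.processor`, so this would only add one line to the existing `props` setup in `main()`.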
Dependencies used:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>