取决于您使用的是 Kafka Streams DSL 还是 Processor API:
Kafka Streams DSL 您可以KStream#to()
用来实现KStream
主题。这是将数据具体化为主题的规范方法。或者,您可以使用KStream#through()
. 这也会将数据具体化为主题,但也会返回结果KStream
以供进一步使用。#to()
和之间的唯一区别#through()
是,KStreamBuilder#stream()
如果您希望将生成的物化分区作为KStream
.
处理器 API通过将数据转发到接收器处理器,您可以将数据具体化到分区。
无论哪种方式,需要注意的重要一点是,在您使用上述方法之一写入分区之前,数据不会具体化到主题。map()
, filter()
, 等不实现数据。数据保留在处理器任务/线程/内存中,直到通过上述方法之一实现。
要制作成 Kafka Streams:
Properties producerConfig = new Properties();
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:2181");
producerConfig.put(ACKS_CONFIG, "all");
producerConfig.put(RETRIES_CONFIG, 0);
Producer<Integer, Integer> producer = new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer<>());
接着:
Arrays.asList(1, 2, 3, 4).forEach(integer -> producer.send(new ProducerRecord<>("integers", integer, integer)))
你会需要:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafka}</version>
</dependency>