1

关于 Micronaut/Kafka Streams 示例应用程序,有几个细节我不太明白。下面是文档中的示例类(原始链接:https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaStreams)。

我的问题是:

  • 为什么我们只返回源流?
  • 如果我们有多个源 KStream 对象(例如要做连接),是否也需要把它们都声明为 Bean?
  • 我们是否还需要将每个源 KTable 也设为 Bean?
  • 如果我们不为源 KStream/KTable 创建 Bean 会发生什么?我们目前至少有一个项目就是这样做的,但没有出现明显的问题。
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

/**
 * Bean factory that defines a word-count Kafka Streams topology.
 *
 * <p>Text values read from {@link #INPUT} are lower-cased and split into words, the
 * occurrences of each word are counted in the {@link #WORD_COUNT_STORE} state store,
 * and the resulting counts are written to {@link #OUTPUT}.
 */
@Factory
public class WordCountStream {

    /** Bean name under which the source stream is registered. */
    public static final String STREAM_WORD_COUNT = "word-count";
    /** Topic the text lines are read from. */
    public static final String INPUT = "streams-plaintext-input";
    /** Topic the per-word counts are written to. */
    public static final String OUTPUT = "streams-wordcount-output";
    /** Name of the state store that holds the per-word counts. */
    public static final String WORD_COUNT_STORE = "word-count-store";


    /**
     * Builds the word-count topology on the given builder.
     *
     * @param builder the stream builder whose configuration is adjusted and on which
     *                the topology is defined
     * @return the source stream reading from {@link #INPUT}; the counting branch is
     *         attached to the builder as a side effect and is not returned
     */
    @Singleton
    @Named(STREAM_WORD_COUNT)
    KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) {
        // set default serdes (String for both key and value) and read from the earliest offset
        Properties props = builder.getConfiguration();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStream<String, String> source = builder
                .stream(INPUT);

        KTable<String, Long> groupedByWord = source
                // Locale.ROOT keeps lower-casing independent of the JVM default locale,
                // so the same input yields the same words on every host
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
                // re-key each record by the word itself so counting is per word
                .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                // store the result in a store for lookup later
                .count(Materialized.as(WORD_COUNT_STORE));

        groupedByWord
                // convert to stream
                .toStream()
                // send to output using specific serdes (counts are Long, not the default String)
                .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));

        return source;
    }

}

编辑:这是我们服务的一个版本,具有多个流,经过编辑以删除识别信息。

/**
 * Redacted example of a bean factory whose single factory method builds a topology
 * from several sources: two streams ("data-one", "data-two") and one global table
 * ("data-three").
 *
 * <p>NOTE(review): this is a sanitized snippet, not compilable code. The lambda bodies
 * are replaced by placeholder comments, and several referenced types (Consumed,
 * GlobalKTable, LogAndContinueExceptionHandler, TextualIntSerde, JsonSerde, OutMessage)
 * are not declared or imported in the visible source.
 */
@Factory
public class TopologyCopy {
    // Empty placeholder payload types standing in for the redacted message classes.
    private static class DataOut {}
    private static class DataInOne {}
    private static class DataInTwo {}
    private static class DataInThree {}

    /**
     * Defines the topology on the given builder and returns only the "data-one" source stream.
     *
     * <p>NOTE(review): the declared return type is {@code KStream<Integer, DataOut>} but the
     * returned variable is a {@code KStream<Integer, DataInOne>} -- presumably an artifact of
     * the redaction; confirm against the real code.
     *
     * @param builder the stream builder whose configuration is adjusted and on which the
     *                sources and processing steps are defined
     * @return the stream reading from the "data-one" topic
     */
    @Singleton
    @Named("data")
    KStream<Integer, DataOut> dataStream(ConfiguredStreamBuilder builder) {
        // Read from the earliest offset and register LogAndContinueExceptionHandler as the
        // default deserialization exception handler.
        Properties props = builder.getConfiguration();
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

        // Two source streams, each with explicit key/value serdes.
        KStream<Integer, DataInOne> dataOneStream = builder.stream("data-one",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInOne.class)));
        KStream<Integer, DataInTwo> dataTwoStream = builder.stream("data-two",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));
        // Global table materialized as "data-three-store".
        // NOTE(review): signalTable is not referenced again in the visible code -- presumably
        // used inside one of the redacted lambdas or via the store name; confirm.
        GlobalKTable<Integer, DataInThree> signalTable = builder.globalTable("data-three",
                Consumed.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInThree.class)),
                Materialized.as("data-three-store"));
        // Turn the "data-two" stream into a table: the aggregator ignores the stored value and
        // returns the incoming one, so each key maps to its most recently seen value.
        KTable<Integer, DataInTwo> dataTwoTable = dataTwoStream
                .groupByKey()
                .aggregate(() -> null, (key, device, storedDevice) -> device,
                        Materialized.with(TextualIntSerde.INSTANCE, new JsonSerde<>(DataInTwo.class)));

        // Processing branch: transform "data-one" values, join with the "data-two" table,
        // re-key, and write to "topic-out" with UUID keys and JSON values.
        // NOTE(review): the transformValues line below has one closing parenthesis too many.
        dataOneStream
                .transformValues(() -> /* MAGIC */))
                .join(dataTwoTable, (data1, data2) -> /* MAGIC */)
                .selectKey((something, msg) ->  /* MAGIC */)
                .to("topic-out", Produced.with(Serdes.UUID(), new JsonSerde<>(OutMessage.class)));

        // Only the first source stream is returned; the other sources and the sink are
        // attached to the builder above.
        return dataOneStream;
    }

}

4

0 回答 0