我认为您正在寻找的是Spark Streaming。Spark 有一个可以链接到 Spark Streaming Context的Kafka 连接器。
这是一个非常基本的示例,它将在 1 分钟的时间间隔内为给定主题集中的所有消息创建一个 RDD,然后按消息 id 字段对它们进行分组(getMessageId
当然,您的值序列化程序必须公开这样的方法)。
SparkConf conf = new SparkConf().setAppName(appName);
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.minutes(1));
Map<String, Object> params = new HashMap<String, Object>() {{
put("bootstrap.servers", kafkaServers);
put("key.deserializer", kafkaKeyDeserializer);
put("value.deserializer", kafkaValueDeserializer);
}};
List<String> topics = new ArrayList<String>() {{
// Add Topics
}};
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, params)
);
stream.foreachRDD(rdd -> rdd.groupBy(record -> record.value().getMessageId()));
ssc.start();
ssc.awaitTermination();
还有其他几种方法可以在流 API 中对消息进行分组。查看文档以获取更多示例。