0

用例:我有具有 messageId 的消息,多条消息可以具有相同的消息 ID,这些消息存在于由 messageId 分区的流式管道(如 kafka)中,因此我确保具有相同 messageId 的所有消息都将进入同一个分区。

因此,我需要编写一个作业,该作业应将消息缓冲一段时间(比如说 1 分钟),然后将具有相同 messageId 的所有消息合并为单个大消息。

我认为可以使用 spark Datasets和 spark sql(或其他东西?)来完成。但是我找不到任何关于如何为给定的消息 id 存储消息一段时间然后对这些消息进行聚合的示例/文档。

4

1 回答 1

0

我认为您正在寻找的是Spark Streaming。Spark 有一个可以链接到 Spark Streaming Context的Kafka 连接器。

这是一个非常基本的示例,它将在 1 分钟的时间间隔内为给定主题集中的所有消息创建一个 RDD,然后按消息 id 字段对它们进行分组(getMessageId当然,您的值序列化程序必须公开这样的方法)。

SparkConf conf = new SparkConf().setAppName(appName);
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.minutes(1));

Map<String, Object> params = new HashMap<String, Object>() {{
    put("bootstrap.servers", kafkaServers);
    put("key.deserializer", kafkaKeyDeserializer);
    put("value.deserializer", kafkaValueDeserializer);
}};

List<String> topics = new ArrayList<String>() {{
    // Add Topics
}};

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, params)
    );

stream.foreachRDD(rdd -> rdd.groupBy(record -> record.value().getMessageId()));

ssc.start();
ssc.awaitTermination(); 

还有其他几种方法可以在流 API 中对消息进行分组。查看文档以获取更多示例。

于 2018-02-12T18:59:44.180 回答