4

我是 spark 和 kafka 的新手,我对使用 kafka 的 spark 流的使用模式略有不同。我在用

spark-core_2.10 - 2.1.1
spark-streaming_2.10 - 2.1.1
spark-streaming-kafka-0-10_2.10 - 2.0.0
kafka_2.10 - 0.10.1.1

连续事件数据正在流式传输到我需要从多个火花流应用程序处理的 kafka 主题。但是当我运行 spark 流应用程序时,只有其中一个接收到数据。

     Map<String, Object> kafkaParams = new HashMap<String, Object>();

     kafkaParams.put("bootstrap.servers", "localhost:9092");
     kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     kafkaParams.put("auto.offset.reset", "latest");
     kafkaParams.put("group.id", "test-consumer-group");
     kafkaParams.put("enable.auto.commit", "true");
     kafkaParams.put("auto.commit.interval.ms", "1000");
     kafkaParams.put("session.timeout.ms", "30000");

     Collection<String> topics =  Arrays.asList("4908100105999_000005");;
     JavaInputDStream<ConsumerRecord<String, String>> stream =  org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
                    ssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams) );

      ... //spark processing

我有两个火花流应用程序,通常我提交的第一个使用 kafka 消息。第二个应用程序只是等待消息并且永远不会继续。正如我所读到的,可以从多个消费者订阅 kafka 主题,火花流不是这样吗?或者我在 kafka 主题及其配置方面缺少什么?

提前致谢 。

4

2 回答 2

2

您可以使用相同的 groupid 创建不同的流。以下是 0.8 集成的在线文档中的更多详细信息,有两种方法:

方法 1:基于接收器的方法

可以使用不同的组和主题创建多个 Kafka 输入 DStream,以便使用多个接收器并行接收数据。

方法 2:直接方法(无接收器)

无需创建多个输入 Kafka 流并将它们合并。使用 directStream,Spark Streaming 将创建与要使用的 Kafka 分区一样多的 RDD 分区,这些分区都将从 Kafka 并行读取数据。所以Kafka和RDD分区之间是一一对应的,更容易理解和调优。

您可以在Spark Streaming + Kafka 集成指南 0.8中阅读更多内容

从您的代码看起来您​​使用的是 0.10,请参阅Spark Streaming + Kafka 集成指南(Kafka 代理版本 0.10.0

即使它使用的是 spark 流 api,一切都由 kafka 属性控制,因此取决于您在属性文件中指定的组 id,您可以使用不同的组 id 启动多个流。

干杯!

于 2017-08-29T18:14:00.703 回答
1

消费者数量【在一个消费者组下】,不能超过主题中的分区数量。如果要并行消费消息,则需要引入合适数量的分区并创建接收器来处理每个分区。

于 2017-10-25T09:57:10.827 回答