我是 spark 和 kafka 的新手,我对使用 kafka 的 spark 流的使用模式略有不同。我在用
spark-core_2.10 - 2.1.1
spark-streaming_2.10 - 2.1.1
spark-streaming-kafka-0-10_2.10 - 2.0.0
kafka_2.10 - 0.10.1.1
连续事件数据正在流式传输到我需要从多个火花流应用程序处理的 kafka 主题。但是当我运行 spark 流应用程序时,只有其中一个接收到数据。
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("group.id", "test-consumer-group");
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.commit.interval.ms", "1000");
kafkaParams.put("session.timeout.ms", "30000");
Collection<String> topics = Arrays.asList("4908100105999_000005");;
JavaInputDStream<ConsumerRecord<String, String>> stream = org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams) );
... //spark processing
我有两个火花流应用程序,通常我提交的第一个使用 kafka 消息。第二个应用程序只是等待消息并且永远不会继续。正如我所读到的,可以从多个消费者订阅 kafka 主题,火花流不是这样吗?或者我在 kafka 主题及其配置方面缺少什么?
提前致谢 。