试图从卡夫卡源读取。我想从收到的消息中提取时间戳以进行结构化火花流。kafka(0.10.0.0 版)火花流(2.0.1 版)
问问题
3710 次
2 回答
2
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "your.server.com:9092")
.option("subscribe", "your-topic")
.load()
.select($"timestamp", $"value")
字段“时间戳”是您正在寻找的。类型 - java.sql.Timestamp。确保您连接到 0.10 Kafka 服务器。早期版本没有时间戳。此处描述的完整字段列表 - http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
于 2017-09-18T15:17:28.167 回答
1
我建议几件事:
假设您通过最新的Kafka Streaming Api (0.10 Kafka)创建一个流
例如,您使用依赖项:
"org.apache.spark" %% "spark-streaming-kafka-0-10" % 2.0.1
根据上面的文档,比你创建一个流:
val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "broker1:9092,broker2:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[ByteArrayDeserializer], "group.id" -> "spark-streaming-test", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean)) val sparkConf = new SparkConf() // suppose you have 60 second window val ssc = new StreamingContext(sparkConf, Seconds(60)) ssc.checkpoint("checkpoint") val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, Array[Byte]](topics, kafkaParams))
您的流将是ConsumerRecord[String,Array[Byte]]的 DStream,您可以获得时间戳和键值,如下所示:
stream.map { record => (record.timestamp(), record.key(), record.value()) }
希望有帮助。
于 2017-02-26T13:09:17.100 回答