0

我正在使用Spark MongoDB 连接器从 mongodb 获取数据..但是我无法使用聚合管道(rdd.withPipeline)获取如何使用 Spark 在 Mongo 上查询。以下是我想要根据时间戳获取记录的代码& 存储在数据框中:

 val appData=MongoSpark.load(spark.sparkContext,readConfig)
val df=appData.withPipeline(Seq(Document.parse("{ $match: { createdAt : { $gt : 2017-01-01 00:00:00 } } }"))).toDF()

这是使用 spark 查询时间戳值的正确方法吗?

4

2 回答 2

0

如评论所述,您可以将扩展 JSON格式用于日期过滤器。

val appDataRDD  = MongoSpark.load(sc)
val filteredRDD = appDataRDD.withPipeline(Seq(Document.parse("{$match:{timestamp:{$gt:{$date:'2017-01-01T00:00:00.000'}}}}")))
filteredRDD.foreach(println)

另请参阅MongoDB Spark 连接器:过滤器和聚合以查看替代过滤器。

于 2017-07-12T01:47:30.720 回答
0

尝试这个:

val pipeline = "{'$match': {'CreationDate':{$gt: {$date:'2020-08-26T00:00:00.000Z'}}}}"

val sourceDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://administrator:password@10.XXXXX:27017/?authSource=admin").option("database","_poc").option("collection", "activity").option("pipeline", pipeline).load()
于 2021-11-09T17:49:54.787 回答