0

我有一个由 MongoDB Kafka 连接器生成的 oplog 消息流。我想告诉连接器从给定的时间点开始生成消息。我可以使用管道来做到这一点[在此处记录|https://docs.mongodb.com/kafka-connector/current/kafka-source/。]

我正在尝试准备一个将clusterTime用于此操作但没有运气的查询。clusterTime 是一个 BSON 时间戳。我正在使用 MongoDB v4.0.9。

这是我如何测试它

db.gustotest.insertMany([{ 

    "operationType" : "insert", 

    "clusterTime" : {

        "$timestamp" : {

            "t" : 1634824102.0, 

            "i" : 1.0

        }

    }, 

    "fullDocument" : {

        "_id" : {

            "$oid" : "61716fa62b7ffb4a2e01a235"

        }, 

        "location" : "Location01", 

        "organizationID" : "123", 

        "created" : {

            "$date" : 1634824102357.0

        }

    }, 

    "ns" : {

        "db" : "warehouse", 

        "coll" : "gustotest"

    }, 

    "documentKey" : {

        "_id" : {

            "$oid" : "61716fa62b7ffb4a2e01a235"

        }

    }

}

])

当我们至少有一个元素时,我们可以试一试并按 clusterTime 过滤。我尝试了很多方法,比如

 db.gustotest.aggregate( [

   { $addFields:

      {

        convertedDate: { $toDate: {$dateToString:{date:"$clusterTime"}} },

       }

    }

] )

或者

db.gustotest.aggregate( [

   { $match:

      {

        clusterTime: { $gt: Timestamp(0, 0) },

       }

    }

] ) 

有没有其他方法可以在不使用最新版本的$getField情况下实现这一目标?$function

我设法从该字段中提取了纯时间戳,但使用了 $function 功能。不幸的是,MongoDB v5.0 支持它,所以我不能使用它。

db.gustotest.aggregate( [

   { $addFields:

      {

        cTime:

            { $function:

               {

                  body: function(clusterTime) {

                     return clusterTime["$timestamp"].t

                  },

                  args: [ "$clusterTime" ],

                  lang: "js"

               }

            },

       }

    }

] ) 

这是可用于重现问题的最少代码

db.mycollection.insertOne(
  {
    "clusterTime": {
      "$timestamp": {
        "t": 120000001,
        "i": 0
      }
    }
  }
)

db.mycollection.aggregate([
  {
    $match: {
      clusterTime: {
        $gt: Timestamp(0, 0)
      }
    }
  }
])
4

1 回答 1

1

MongoDB 对以美元符号开头的字段名称很挑剔。

您可以使用额外的阶段将该对象转换为数组,以便字段名称成为一个值:

db.gustotest.aggregate([
    {$addFields:{
       date:{$arrayElemAt:[{$objectToArray:"$clusterTime"},0]}
    }},
    {$addFields:{
       date:{$toDate:{$multiply:[1000,"$date.v.t"]}}
    }}
])

编辑 我忘记了这$timestamp是一个特殊的 mongodb 类型指示器。以下是将 Timestamp 类型转换为日期的方法:

{$addFields: {
    date: {
        $dateFromString: {
            dateString: {
                $dateToString: {date: "$clusterTime"}
            }
        }
    }
}}

操场

于 2021-10-26T01:27:31.863 回答