我有一个由 MongoDB Kafka 连接器生成的 oplog 消息流。我想告诉连接器从给定的时间点开始生成消息。我可以使用管道来做到这一点[在此处记录|https://docs.mongodb.com/kafka-connector/current/kafka-source/。]
我正在尝试准备一个将clusterTime
用于此操作但没有运气的查询。clusterTime 是一个 BSON 时间戳。我正在使用 MongoDB v4.0.9。
这是我如何测试它
db.gustotest.insertMany([{
"operationType" : "insert",
"clusterTime" : {
"$timestamp" : {
"t" : 1634824102.0,
"i" : 1.0
}
},
"fullDocument" : {
"_id" : {
"$oid" : "61716fa62b7ffb4a2e01a235"
},
"location" : "Location01",
"organizationID" : "123",
"created" : {
"$date" : 1634824102357.0
}
},
"ns" : {
"db" : "warehouse",
"coll" : "gustotest"
},
"documentKey" : {
"_id" : {
"$oid" : "61716fa62b7ffb4a2e01a235"
}
}
}
])
当我们至少有一个元素时,我们可以试一试并按 clusterTime 过滤。我尝试了很多方法,比如
db.gustotest.aggregate( [
{ $addFields:
{
convertedDate: { $toDate: {$dateToString:{date:"$clusterTime"}} },
}
}
] )
或者
db.gustotest.aggregate( [
{ $match:
{
clusterTime: { $gt: Timestamp(0, 0) },
}
}
] )
有没有其他方法可以在不使用最新版本的$getField
情况下实现这一目标?$function
我设法从该字段中提取了纯时间戳,但使用了 $function 功能。不幸的是,MongoDB v5.0 支持它,所以我不能使用它。
db.gustotest.aggregate( [
{ $addFields:
{
cTime:
{ $function:
{
body: function(clusterTime) {
return clusterTime["$timestamp"].t
},
args: [ "$clusterTime" ],
lang: "js"
}
},
}
}
] )
这是可用于重现问题的最少代码
db.mycollection.insertOne(
{
"clusterTime": {
"$timestamp": {
"t": 120000001,
"i": 0
}
}
}
)
db.mycollection.aggregate([
{
$match: {
clusterTime: {
$gt: Timestamp(0, 0)
}
}
}
])