I have a stream of oplog messages produced by the MongoDB Kafka connector. I want to tell the connector to start producing messages from a given point in time. I can do this using pipelines, as documented here: https://docs.mongodb.com/kafka-connector/current/kafka-source/
I'm trying to prepare a query that uses the clusterTime for this operation, but with no luck. The clusterTime is a BSON Timestamp. I'm using MongoDB v4.0.9.
Here's how I test it:
db.gustotest.insertMany([{
"operationType" : "insert",
"clusterTime" : {
"$timestamp" : {
"t" : 1634824102.0,
"i" : 1.0
}
},
"fullDocument" : {
"_id" : {
"$oid" : "61716fa62b7ffb4a2e01a235"
},
"location" : "Location01",
"organizationID" : "123",
"created" : {
"$date" : 1634824102357.0
}
},
"ns" : {
"db" : "warehouse",
"coll" : "gustotest"
},
"documentKey" : {
"_id" : {
"$oid" : "61716fa62b7ffb4a2e01a235"
}
}
}
])
Once we have at least one element, we can try to filter by the clusterTime. I've tried many approaches, like:
db.gustotest.aggregate( [
{ $addFields:
{
convertedDate: { $toDate: {$dateToString:{date:"$clusterTime"}} },
}
}
] )
or
db.gustotest.aggregate( [
{ $match:
{
clusterTime: { $gt: Timestamp(0, 0) },
}
}
] )
Is there any other way I can achieve this without using $getField or $function, which are only available in newer versions?
I managed to extract the raw timestamp from this field, but only by using the $function feature. Unfortunately, $function is not available in my MongoDB version (v4.0.9), so I cannot use it.
db.gustotest.aggregate( [
{ $addFields:
{
cTime:
{ $function:
{
body: function(clusterTime) {
return clusterTime["$timestamp"].t
},
args: [ "$clusterTime" ],
lang: "js"
}
},
}
}
] )
This is the minimal code that can be used to reproduce the problem:
db.mycollection.insertOne(
{
"clusterTime": {
"$timestamp": {
"t": 120000001,
"i": 0
}
}
}
)
db.mycollection.aggregate([
{
$match: {
clusterTime: {
$gt: Timestamp(0, 0)
}
}
}
])
CodePudding user response:
MongoDB is finicky about field names that begin with a dollar sign.
You can use an extra stage to convert that object to an array, so that the field name becomes a value:
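db.gustotest.aggregate([
// Stage 1: clusterTime becomes [{k: "$timestamp", v: {t: ..., i: ...}}]; keep the first entry
{$addFields:{
date:{$arrayElemAt:[{$objectToArray:"$clusterTime"},0]}
}},
// Stage 2: v.t is in seconds since the epoch, while $toDate expects milliseconds
{$addFields:{
date:{$toDate:{$multiply:[1000,"$date.v.t"]}}
}}
])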
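Since the goal in the question is to filter rather than only convert, the same trick can probably feed a $match stage. Below is a minimal sketch, assuming clusterTime is stored as a nested document with a literal $timestamp key, as in the test data from the question. The helper names toEpochSeconds and buildClusterTimePipeline and the temporary field ctEntry are made up for illustration. It relies only on $objectToArray, $arrayElemAt, $match and $project, which should all be available in v4.0.

```javascript
// Hypothetical helper: convert a JavaScript Date to whole seconds since the
// epoch, the same unit as the "t" part of the timestamp in the test data.
function toEpochSeconds(date) {
  return Math.floor(date.getTime() / 1000);
}

// Hypothetical helper: build a pipeline that keeps only documents whose
// clusterTime.$timestamp.t is strictly greater than startSeconds.
// Assumption: clusterTime is a nested document with a literal "$timestamp" key.
function buildClusterTimePipeline(startSeconds) {
  return [
    // {"$timestamp": {t, i}} -> {k: "$timestamp", v: {t, i}}
    { $addFields: { ctEntry: { $arrayElemAt: [{ $objectToArray: "$clusterTime" }, 0] } } },
    // Compare on the seconds part only; the "i" counter is ignored.
    { $match: { "ctEntry.v.t": { $gt: startSeconds } } },
    // Drop the temporary field so the output keeps its original shape.
    { $project: { ctEntry: 0 } }
  ];
}

// Example: everything after midnight UTC on the day of the test document.
const pipeline = buildClusterTimePipeline(toEpochSeconds(new Date("2021-10-21T00:00:00Z")));
console.log(JSON.stringify(pipeline, null, 2));
```

In the mongo shell the resulting array would be passed straight to db.gustotest.aggregate(pipeline). Because only the seconds part is compared, events that share the same second as the threshold are excluded by $gt; switching to $gte would include them.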