Home > OS >  Cannot use timestamp from oplog for filtering
Cannot use timestamp from oplog for filtering

Time:10-26

I have a stream of oplog messages produced by MongoDB Kafka connector. I want to tell the connector to start producing messages from a given point in time. I can do it using pipelines [documented here|https://docs.mongodb.com/kafka-connector/current/kafka-source/.]

I'm trying to prepare a query that will use the clusterTime for this operation but with no luck. The clusterTime is a BSON Timestamp. I'm using MongoDB v4.0.9.

Here's how I test it

db.gustotest.insertMany([{ 

    "operationType" : "insert", 

    "clusterTime" : {

        "$timestamp" : {

            "t" : 1634824102.0, 

            "i" : 1.0

        }

    }, 

    "fullDocument" : {

        "_id" : {

            "$oid" : "61716fa62b7ffb4a2e01a235"

        }, 

        "location" : "Location01", 

        "organizationID" : "123", 

        "created" : {

            "$date" : 1634824102357.0

        }

    }, 

    "ns" : {

        "db" : "warehouse", 

        "coll" : "gustotest"

    }, 

    "documentKey" : {

        "_id" : {

            "$oid" : "61716fa62b7ffb4a2e01a235"

        }

    }

}

])

when we have at least one element, we can give it a try and filter by the clusterTime. I've tried many approaches like

 db.gustotest.aggregate( [

   { $addFields:

      {

        convertedDate: { $toDate: {$dateToString:{date:"$clusterTime"}} },

       }

    }

] )

or

db.gustotest.aggregate( [

   { $match:

      {

        clusterTime: { $gt: Timestamp(0, 0) },

       }

    }

] ) 

Is there any other way I can achieve this without using $getField, $function that are available in the newest versions?

I managed to extract the pure timestamp from this field but used the $function feature. Unfortunately, it's supported by MongoDB v5.0 so I cannot use it.

db.gustotest.aggregate( [

   { $addFields:

      {

        cTime:

            { $function:

               {

                  body: function(clusterTime) {

                     return clusterTime["$timestamp"].t

                  },

                  args: [ "$clusterTime" ],

                  lang: "js"

               }

            },

       }

    }

] ) 

This is the minimal code that can be used to reproduce the problem

db.mycollection.insertOne(
  {
    "clusterTime": {
      "$timestamp": {
        "t": 120000001,
        "i": 0
      }
    }
  }
)

db.mycollection.aggregate([
  {
    $match: {
      clusterTime: {
        $gt: Timestamp(0, 0)
      }
    }
  }
])

CodePudding user response:

MongoDB is finicky about field names that begin with dollar sign.

You an use an extra stage to convert that object to an array so the field name becomes a value:

db.gustotest.aggregate([
    {$addFields:{
       date:{$arrayElemAt:[{$objectToArray:"$clusterTime"},0]}
    }},
    {$addFields:{
       date:{$toDate:{$multiply:[1000,"$date.v.t"]}}
    }}
])
  • Related