Home > Enterprise >  How do I get MongoDB ChangeStreamDocument out as my dto instead of Document type?
How do I get MongoDB ChangeStreamDocument out as my dto instead of Document type?

Time:01-06

I have the following class in my MongoDB project:

@Document(collection = "teams")
public class Team {
    private @MongoId(FieldType.OBJECT_ID)
    @Schema(type = "string", example = "60b0c56e4192f01e8745bd75")
    ObjectId id;
    @Schema(example = "56373")
    private Integer orgId;
    private String name;
    private List<Member> players;
    private List<Member> staff;

    public class Member{
        private ObjectId id;
        private String name;
    }
}

As you can see, this class represents the documents in my teams collection in MongoDB. I'm trying to create a change stream, because I want to monitorize players and staff players joining and leaving teams, as well as existing teams being deleted from the teams collection.

So this is what I've tried:

@Component
public class MongoChangeStream {

    private final MongoTemplate mongoTemplate;

    public MongoDBChangeStream(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    @EventListener(ContextRefreshedEvent.class)
    public void changeStream() {

        // Select the collection to query
        MongoCollection<Document> collection = mongoTemplate.getCollection("teams");

        // Create pipeline for operationType filter
        List<Bson> pipeline = Arrays.asList(
                Aggregates.match(
                        Filters.in("operationType",
                                Arrays.asList("insert", "update", "delete"))));

        // Create the Change Stream and watch on the filters in the pipeline
        ChangeStreamIterable<Document> changeStream = collection.watch()
            .fullDocument(FullDocument.UPDATE_LOOKUP)
            .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);

        // Iterate over the Change Stream
        for (ChangeStreamDocument<Document> changeEvent : changeStream) {
            switch (changeEvent.getOperationType().name()) {
                case "UPDATE":
                    if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("players")) {
                        // Do something
                    }
                    if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("staff")) {
                        // Do something
                    }
                    break;
                case "DELETE":
                        // Do something
                    break;
            }
        }
    }
}

My question is basically how do I get the documents out from the changeStream as Team objects instead of Document objects?

I've tried to change all the occurrences of Documents with Team, but then I get this error:

Can't find a codec for CodecCacheKey{clazz=class com.test.dto.Team, types=null}.

CodePudding user response:

I ended up making it work like this:

@Document(collection = "teams")
public class Team {
    @Id
    @BsonProperty("_id")
    private ObjectId id;
    private Integer orgId;
    private String name;
    private List<Member> players;
    private List<Member> staff;

    public class Member {
        @Id
        private ObjectId id;
        private String name;
    }
}

Then I added CodecRegistry to my MongoChangeStream class like this:

So this is what I've tried:

@Component
public class MongoChangeStream {

    private final MongoTemplate mongoTemplate;

    public MongoDBChangeStream(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    @EventListener(ContextRefreshedEvent.class)
    public void changeStream() {

        // Select the collection to query
        CodecRegistry pojoCodecRegistry = org.bson.codecs.configuration.CodecRegistries.fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), org.bson.codecs.configuration.CodecRegistries.fromProviders(PojoCodecProvider.builder().conventions(List.of(ANNOTATION_CONVENTION)).automatic(true).build()));
        MongoCollection<Team> collection = mongoTemplate.getCollection("teams").withCodecRegistry(pojoCodecRegistry).withDocumentClass(Group.class);

        // Create pipeline for operationType filter
        List<Bson> pipeline = Arrays.asList(
                Aggregates.match(
                        Filters.in("operationType",
                                Arrays.asList("insert", "update", "delete"))));

        // Create the Change Stream and watch on the filters in the pipeline
        ChangeStreamIterable<Team> changeStream = collection.watch()
            .fullDocument(FullDocument.UPDATE_LOOKUP)
            .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);

        // Iterate over the Change Stream
        for (ChangeStreamDocument<Team> changeEvent : changeStream) {
            switch (changeEvent.getOperationType().name()) {
                case "UPDATE":
                    if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("players")) {
                        // Do something
                    }
                    if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("staff")) {
                        // Do something
                    }
                    break;
                case "DELETE":
                        // Do something
                    break;
            }
        }
    }
}
  • Related