I have the following class in my MongoDB project:
@Document(collection = "teams")
public class Team {
private @MongoId(FieldType.OBJECT_ID)
@Schema(type = "string", example = "60b0c56e4192f01e8745bd75")
ObjectId id;
@Schema(example = "56373")
private Integer orgId;
private String name;
private List<Member> players;
private List<Member> staff;
public class Member{
private ObjectId id;
private String name;
}
}
As you can see, this class represents the documents in my teams collection in MongoDB.
I'm trying to create a change stream because I want to monitor players and staff members joining and leaving teams, as well as existing teams being deleted from the teams collection.
So this is what I've tried:
@Component
public class MongoChangeStream {
private final MongoTemplate mongoTemplate;
public MongoChangeStream(MongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
}
@EventListener(ContextRefreshedEvent.class)
public void changeStream() {
// Select the collection to query
MongoCollection<Document> collection = mongoTemplate.getCollection("teams");
// Create pipeline for operationType filter
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update", "delete"))));
// Create the change stream, passing the pipeline so the operationType filter is applied
ChangeStreamIterable<Document> changeStream = collection.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP)
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
// Iterate over the Change Stream
for (ChangeStreamDocument<Document> changeEvent : changeStream) {
switch (changeEvent.getOperationType().name()) {
case "UPDATE":
if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("players")) {
// Do something
}
if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("staff")) {
// Do something
}
break;
case "DELETE":
// Do something
break;
}
}
}
}
My question is basically how do I get the documents out from the changeStream as Team objects instead of Document objects?
I've tried replacing every occurrence of Document with Team, but then I get this error:
Can't find a codec for CodecCacheKey{clazz=class com.test.dto.Team, types=null}.
CodePudding user response:
I ended up making it work like this:
@Document(collection = "teams")
public class Team {
@Id
@BsonProperty("_id")
private ObjectId id;
private Integer orgId;
private String name;
private List<Member> players;
private List<Member> staff;
public class Member {
@Id
private ObjectId id;
private String name;
}
}
Then I added a CodecRegistry to my MongoChangeStream class like this:
@Component
public class MongoChangeStream {
private final MongoTemplate mongoTemplate;
public MongoChangeStream(MongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
}
@EventListener(ContextRefreshedEvent.class)
public void changeStream() {
// Build a codec registry that can map BSON documents to POJOs such as Team
// (ANNOTATION_CONVENTION is statically imported from org.bson.codecs.pojo.Conventions)
CodecRegistry pojoCodecRegistry = org.bson.codecs.configuration.CodecRegistries.fromRegistries(
MongoClientSettings.getDefaultCodecRegistry(),
org.bson.codecs.configuration.CodecRegistries.fromProviders(
PojoCodecProvider.builder()
.conventions(List.of(ANNOTATION_CONVENTION))
.automatic(true)
.build()));
// Select the collection to watch and decode its documents as Team
MongoCollection<Team> collection = mongoTemplate.getCollection("teams")
.withCodecRegistry(pojoCodecRegistry)
.withDocumentClass(Team.class);
// Create pipeline for operationType filter
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update", "delete"))));
// Create the change stream, passing the pipeline so the operationType filter is applied
ChangeStreamIterable<Team> changeStream = collection.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP)
.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
// Iterate over the Change Stream
for (ChangeStreamDocument<Team> changeEvent : changeStream) {
switch (changeEvent.getOperationType().name()) {
case "UPDATE":
if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("players")) {
// Do something
}
if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("staff")) {
// Do something
}
break;
case "DELETE":
// Do something
break;
}
}
}
}
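For the "// Do something" branches, here is a minimal sketch of how joins and leaves could be worked out once the events arrive as Team objects. It is plain Java with no driver dependency. RosterDiff, diff and touchesField are hypothetical names I made up for illustration, not part of the MongoDB driver. It assumes member ids are passed in as strings (for example from ObjectId.toHexString()), and that the keys of getUpdatedFields() may arrive as dotted paths such as players.2 when a single array element changes, in which case an exact containsKey("players") check would miss them.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the MongoDB driver.
public class RosterDiff {

    // Ids present only after the change (joined) or only before it (left).
    public static final class Result {
        public final Set<String> joined;
        public final Set<String> left;

        Result(Set<String> joined, Set<String> left) {
            this.joined = joined;
            this.left = left;
        }
    }

    // Compares the member ids of the pre-image with those of the post-image.
    // A null list (e.g. no pre-image available) is treated as empty.
    public static Result diff(List<String> beforeIds, List<String> afterIds) {
        List<String> before = beforeIds == null ? List.of() : beforeIds;
        List<String> after = afterIds == null ? List.of() : afterIds;

        Set<String> joined = new LinkedHashSet<>(after);
        joined.removeAll(before);

        Set<String> left = new LinkedHashSet<>(before);
        left.removeAll(after);

        return new Result(joined, left);
    }

    // True if any updated key is the field itself or a dotted path below it,
    // e.g. "players" or "players.2" (assumption: keys may be dotted paths).
    public static boolean touchesField(Set<String> updatedKeys, String field) {
        for (String key : updatedKeys) {
            if (key.equals(field) || key.startsWith(field + ".")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Result r = diff(List.of("a", "b"), List.of("b", "c"));
        System.out.println("joined=" + r.joined + " left=" + r.left); // joined=[c] left=[a]
        System.out.println(touchesField(Set.of("players.2"), "players")); // true
    }
}
```

Inside the UPDATE case, the two lists would come from the ids of getFullDocumentBeforeChange().getPlayers() and getFullDocument().getPlayers(), assuming Team exposes such getters and pre-images are enabled on the collection.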