aggregation with insertion of data from one collection into another

Time:01-26

I'm trying to get the list of chats that a particular user participates in from the chat collection and, for each chat, add the most recently sent message from the message collection.

This is how it works now: I have two methods, described below.

The first simply fetches the list of chats by the chat member's ID. The second uses aggregation to find the last message for each chat. I then match the messages to the chats by chat ID.
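The matching step described above can be sketched in plain Go. This is only an illustration: the types are trimmed copies of the ones in the question, and `attachLastMessages` is a hypothetical helper name, not something from the original code.

```go
package main

import "fmt"

// Trimmed copies of the question's types; only the fields used here.
type Message struct {
	ID         string
	ChatID     string
	CreateDate int64
}

type Chat struct {
	ID          string
	LastMessage *Message
}

// attachLastMessages (hypothetical helper) sets LastMessage on every chat
// that has an entry in last, which is keyed by chat ID. Chats without
// messages keep a nil LastMessage.
func attachLastMessages(chats []*Chat, last map[string]*Message) {
	for _, c := range chats {
		if m, ok := last[c.ID]; ok {
			c.LastMessage = m
		}
	}
}

func main() {
	chats := []*Chat{{ID: "c1"}, {ID: "c2"}}
	last := map[string]*Message{
		"c1": {ID: "m7", ChatID: "c1", CreateDate: 1674557062031},
	}
	attachLastMessages(chats, last)
	for _, c := range chats {
		fmt.Println(c.ID, c.LastMessage != nil)
	}
}
```

This costs two round trips to the database plus a linear pass over the chats, which is the overhead a single aggregation would avoid.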

Collection chat:

type Chat struct {
    ID           string   `json:"id" bson:"id"`
    Participants []string `json:"participants" bson:"participants"`
    LastMessage  *Message `json:"last_message,omitempty" bson:"last_message"`
    ...
}

P.S. LastMessage is always nil in the stored documents; I only need it to compose the response for the user.

Collection message:

type Message struct {
    ID         string `json:"id" bson:"id"`
    ChatID     string `json:"chat_id" bson:"chat_id"`
    FromID     string `json:"from_id" bson:"from_id"`
    CreateDate int64  `json:"create_date" bson:"create_date"`
    Body       string `json:"body" bson:"body"`
    UpdateAt   int64  `json:"update_at" bson:"update_at"`
    ...
}

First method: this returns the list of active chats for a particular chat participant.

func ActiveChats(ctx context.Context, uid string) ([]*Chat, error) {
    ...
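    // A scalar matched against an array field selects documents whose array contains it.
    filter := bson.D{primitive.E{Key: "participants", Value: uid}}
    cursor, err := r.col.Find(ctx, filter)
    if err != nil {...}

    var ch []*Chat
    // cursor.All decodes every result and closes the cursor when it returns,
    // so no separate cursor.Close call is needed.
    if err = cursor.All(ctx, &ch); err != nil {...}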
    ...
}

Second method: this returns the last message for each chat. The input is an array of chat IDs, and for each one I look up the last message, if there is one. To do this I use aggregation.

func LastMessages(ctx context.Context, chatIds []string) (map[string]*Message, error) {

    matchStage := bson.D{
        primitive.E{
            Key: "$match", Value: bson.D{
                primitive.E{
                    Key: "chat_id", Value: bson.D{
                        primitive.E{Key: "$in", Value: chatIds},
                    },
                },
            },
        }}
    // Sort newest first on create_date, the field name stored by the Message struct.
    sortStage := bson.D{primitive.E{Key: "$sort", Value: bson.D{primitive.E{Key: "create_date", Value: -1}}}}
    groupStage := bson.D{primitive.E{
        Key: "$group", Value: bson.D{
            primitive.E{
                Key: "_id", Value: bson.D{
                    primitive.E{Key: "chat_id", Value: "$chat_id"},
                },
            },
            primitive.E{
                Key: "message", Value: bson.D{
                    primitive.E{Key: "$first", Value: "$$ROOT"},
                },
            },
        },
    }}

    // $sort must run before $group so that $first picks the newest message of each chat.
    cursor, err := r.colMessage.Aggregate(ctx, mongo.Pipeline{matchStage, sortStage, groupStage})
    if err != nil {...}

    var res []*aggregationResultGenerated
    if err = cursor.All(ctx, &res); err != nil {...}
    ...
}
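For `$first` to return the newest message, the documents have to reach `$group` already sorted newest first. A minimal in-memory sketch of that sort-then-take-first logic, using a trimmed `Message` type and a hypothetical `latestPerChat` helper (neither is part of the original code):

```go
package main

import (
	"fmt"
	"sort"
)

// Trimmed copy of the question's Message type.
type Message struct {
	ID         string
	ChatID     string
	CreateDate int64
}

// latestPerChat mimics a $sort on create_date (descending) followed by a
// $group on chat_id that keeps the $first document of each group.
func latestPerChat(msgs []Message) map[string]Message {
	sorted := append([]Message(nil), msgs...) // copy; leave the input untouched
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].CreateDate > sorted[j].CreateDate
	})
	first := make(map[string]Message)
	for _, m := range sorted {
		// Only the first message seen per chat is kept, i.e. the newest one.
		if _, seen := first[m.ChatID]; !seen {
			first[m.ChatID] = m
		}
	}
	return first
}

func main() {
	msgs := []Message{
		{ID: "m1", ChatID: "c1", CreateDate: 10},
		{ID: "m2", ChatID: "c1", CreateDate: 30},
		{ID: "m3", ChatID: "c2", CreateDate: 20},
	}
	latest := latestPerChat(msgs)
	fmt.Println(latest["c1"].ID, latest["c2"].ID) // prints "m2 m3"
}
```

If the sort ran after the grouping instead, the "first" message of each chat would be whichever one happened to arrive first, not necessarily the newest.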

I know it's a very bad solution, but it's all I've been able to come up with so far. I tried to replace it with the single aggregation below, but unfortunately it does not work:

db.chat.aggregate([
    {
        $match: {
            participants: "participant_id",
    },
    {
        $lookup: {
            from: "message", // other table name
            localField: "id", // name of chat table field
            foreignField: "chat_id", // name of message table field
            as: "msg",
        }
    },
    {
        $unwind: "$msg",
    },
    {
        $match: {
            chat_id : {
                $in: ["$$ROOT._id"],
            },
        },
    },
    {
        $sort: {
            "created": -1,
        },
    },
    {
        $group: {
            "_id": {
                "chat_id": "$chat_id"
            },
            "doc": {
                "$last": "$$ROOT"
            }
        }
    },
    {
        $project: {
            last_message: "$msg",
        }
    }
])

My question is: how can I use aggregation to get the list of chats of a particular user and, for each chat, put the last message from the message collection into the last_message field of the chat object? This is the response I build now:

{
    "chats": [
        {
            "id": "4hWsHam3ZZpoyIw44q3D",
            "title": "Chat example",
            "create-date": 1674476855918,
            "participants": [
                "63ce54460aeee5e72c778d90",
                "63ce54460aeee5e72c778d92"
            ],
            "owner_id": "63ce54460aeee5e72c778d90",
            "last_message": {
                "id": "tzwekCiCLSXJ4tfdQuHH",
                "chat_id": "4hWsHam3ZZpoyIw44q3D",
                "from_id": "63ce54460aeee5e72c778d92",
                "create_date": 1674557062031,
                "body": "text",
                "update_at": 0,
                "viewed": false
            },
            "unread": 5
        },
        {
            "id": "Anjrr9RCWFzq030Cwz7S",
            "title": "New chat One",
            "create-date": 1674476909054,
            "participants": [
                "63ce54460aeee5e72c778d90",
                "63ce54460aeee5e72c778d96"
            ],
            "owner_id": "63ce54460aeee5e72c778d90",
            "last_message": {
                "id": "7YqhhS1-EfMRSZtGCH0Z",
                "chat_id": "Anjrr9RCWFzq030Cwz7S",
                "from_id": "63ce54460aeee5e72c778d96",
                "create_date": 1674575017115,
                "body": "text",
                "update_at": 0
            },
            "unread": 1
        }
    ]
}

Answer:

Edit: As the OP mentioned in a comment, writing the result back to the collection with update/$merge is not necessary.

You can simply use a $sort and $limit in the sub-pipeline of a $lookup, then $unwind to turn the lookup result into a single last_message object. If you do want to persist the result, add a final $merge stage to write it back to the chat collection.

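db.chat.aggregate([
  // Chats in which the user participates.
  {
    $match: {
      participants: "63ce54460aeee5e72c778d90"
    }
  },
  // Join each chat to its newest message. Combining localField/foreignField
  // with a sub-pipeline requires MongoDB 5.0 or later.
  {
    $lookup: {
      from: "message",
      localField: "id",
      foreignField: "chat_id",
      pipeline: [
        { $sort: { create_date: -1 } },
        { $limit: 1 }
      ],
      as: "last_message"
    }
  },
  // Turn the one-element array into an object; chats without messages are kept.
  {
    $unwind: {
      path: "$last_message",
      preserveNullAndEmptyArrays: true
    }
  },
  // Keeps only _id and last_message; drop this stage to return the full chat document.
  {
    $project: {
      last_message: "$last_message"
    }
  }
])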
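Since the question uses Go, here is a hedged sketch of the same stages built as plain Go maps. `chatsWithLastMessage` and the `doc` alias are hypothetical names. The sketch sorts on `create_date` (the field name in the question's Message struct) and leaves out the `$project` stage so that the whole chat document comes back; both are assumptions about what the caller wants. A slice of maps like this should be accepted as the pipeline argument of the driver's `Collection.Aggregate`, but verify that against the driver version you use.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// doc is a hypothetical shorthand for one BSON-like document.
type doc = map[string]interface{}

// chatsWithLastMessage (hypothetical helper) returns the stages for
// "chats of uid, each with its newest message in last_message".
func chatsWithLastMessage(uid string) []doc {
	return []doc{
		{"$match": doc{"participants": uid}},
		{"$lookup": doc{
			"from":         "message",
			"localField":   "id",
			"foreignField": "chat_id",
			"pipeline": []doc{
				{"$sort": doc{"create_date": -1}}, // newest first
				{"$limit": 1},                     // keep only the newest
			},
			"as": "last_message",
		}},
		{"$unwind": doc{
			"path":                       "$last_message",
			"preserveNullAndEmptyArrays": true, // keep chats without messages
		}},
	}
}

func main() {
	// Print the pipeline as JSON to inspect it or paste it into mongosh.
	out, err := json.MarshalIndent(chatsWithLastMessage("63ce54460aeee5e72c778d90"), "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Go maps are unordered, which is harmless here because every stage and the `$sort` have a single key. For a multi-key `$sort`, an ordered type such as the driver's `bson.D` would be the safer choice.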

Mongo Playground

Here is an old Mongo Playground with $merge to update to a collection.
