聚合框架确实将成为您完成此任务的伙伴。通过汇总,它通过使用过滤器,石斑鱼,分拣机,转换和其他操作员的多级流水线来将集合蒸馏到基本信息。所提取的结果集比其他技术的效率要高得多。
为了您上面使用的情况下,聚合包含了一系列适用于收集特殊的运营商,称为管道:
[
{ "$match": { /* options */ } },
{ "$lookup": { /* options */ } },
{ "$group": { /* options */ } }
]
当执行管道,MongoDB的管道运营商相互转化。这里的“管道”采用Linux的含义:操作员的输出成为后面的操作员的输入。每个操作员的结果都是一个新的文档集合。所以蒙戈执行以前的管道如下:
collection | $match | $lookup | $group => result
现在,应用上述到你的任务,你需要在聚集$match
管道一步为你的第一个阶段,因为它可以让你用过滤文档流只有匹配的文件通过未经修改才能进入下一个管道阶段。所以,如果你运行的Conversation
模型只是 $match
查询对象的集合,你会得到一个特定的用户在文件:
Conversation.aggregate([
{ "$match": { "participants.type": userId } }
]).exec(function(err, result){
if (err) throw err;
console.log(result);
})
可以通过管道另一家运营商,在这种情况下,你需要的$lookup
操作者执行一个左外连接到messages
收集在同一数据库中从“接合”收集处理中的文档进行过滤:
Conversation.aggregate([
{ "$match": { "participants.type": userId } },
{
"$lookup": {
"from": "messages",
"localfield": "_id",
"foreignField": "conversation",
"as": "messages"
}
}
]).exec(function(err, result){
if (err) throw err;
console.log(result);
})
这将输出以控制conversation
文档中的字段以及一个名为“messages”的新数组字段,其元素是来自“joined”messages
集合的匹配文档。 $lookup
阶段将这些重新塑造的文档传递到下一个阶段。
现在您已将用户的conversations
与messages
一起,然后您如何选择ONLY
来自每个对话的最新消息?
这可以通过$group
管道运营商可以实现,但是从以前的管道, 应用$group
操作上的文件之前,你需要拼合messages
阵列,以获得最新的文档和$unwind
运算符将为您解构数组元素。
当$unwind
操作者上的阵列字段施加,这将产生用于每一个列表中的每个元素在其上$unwind
施加一个新的记录,并且:
Conversation.aggregate([
{ "$match": { "participants.type": userId } },
{
"$lookup": {
"from": "messages",
"localfield": "_id",
"foreignField": "conversation",
"as": "messages"
}
},
{ "$unwind": "$messages" }
]).exec(function(err, result){
if (err) throw err;
console.log(result);
})
假设你MessageSchema
具有时间戳字段(例如createdAt
)表示邮件发送的日期时间,您可以按字段按降序重新排序文档,以便处理到下一个管道。该$sort
运营商非常适合这样的:
Conversation.aggregate([
{ "$match": { "participants.type": userId } },
{
"$lookup": {
"from": "messages",
"localfield": "_id",
"foreignField": "conversation",
"as": "messages"
}
},
{ "$unwind": "$messages" },
{ "$sort": { "messages.createdAt": -1 } }
]).exec(function(err, result){
if (err) throw err;
console.log(result);
})
已经去归一化的文件,然后你可以组的数据进行处理。组管道运算符类似于SQL的GROUP BY
子句。在SQL中,除非使用任何聚合函数(称为accumulators),否则不能使用GROUP BY
。同样的,你也必须在MongoDB中使用聚合函数。的MongoDB由_id
字段标识只与_id
字段和在这种情况下组的文件的分组表达式:
Conversation.aggregate([
{ "$match": { "participants.type": userId } },
{
"$lookup": {
"from": "messages",
"localfield": "_id",
"foreignField": "conversation",
"as": "messages"
}
},
{ "$unwind": "$messages" },
{ "$sort": { "messages.createdAt": -1 } }
{
"$group": {
"_id": "$_id",
"name": { "$first": "$name" },
"participants": { "$first": "$participants" },
"latestMessage": { "$first": "$message" }
}
}
]).exec(function(err, result){
if (err) throw err;
console.log(result);
})
在上文中,我们感兴趣的是$first
组累加器操作者因为它返回从一个值每组的第一份文件。由于前一个管线操作员按降序对文档进行排序,所以将为您提供LATEST
消息。
因此,运行上面的最后一次聚合操作会给你想要的结果。这假设MessageSchema
有一个时间戳,这对确定最新消息是必不可少的,否则它将只能运行到步骤$unwind
步骤。