MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

MongoDB聚合管道操作基础与实例

2021-02-142.7k 阅读

MongoDB聚合管道操作基础

聚合管道概念

在 MongoDB 中,聚合(Aggregation)是一种强大的数据处理机制,用于对集合中的文档进行复杂的数据处理和分析。聚合管道(Aggregation Pipeline)是实现这种功能的关键概念。它类似于 Unix 中的管道机制,将一系列的数据处理操作连接在一起,每个操作对输入数据进行处理,并将结果作为下一个操作的输入。这种链式处理方式允许我们以一种非常灵活和高效的方式对数据进行各种统计、分析和转换。

例如,我们有一个存储用户购买记录的集合,每个文档包含用户 ID、购买金额、购买时间等字段。通过聚合管道,我们可以轻松计算每个用户的总购买金额、平均购买金额,或者按时间段统计购买次数等。

聚合管道操作符

  1. $match

    • 功能:用于筛选文档,只允许符合指定条件的文档通过管道进行下一步处理。它的作用类似于查询语句中的 find 方法,但在聚合管道中使用。
    • 语法
    { $match: { <query> } }
    
    • 示例:假设我们有一个 orders 集合,每个文档包含 amount(订单金额)和 status(订单状态)字段。我们要筛选出金额大于 100 且状态为 “completed” 的订单。
    db.orders.aggregate([
        {
            $match: {
                amount: { $gt: 100 },
                status: "completed"
            }
        }
    ]);
    
  2. $group

    • 功能:按指定的字段对文档进行分组,并对每个组进行累加、平均等聚合操作。它通常与聚合表达式一起使用,如 $sum$avg 等。
    • 语法
    { $group: { _id: <expression>, <field1>: { <accumulator1>: <expression1> },... } }
    
    • 示例:继续以 orders 集合为例,我们要按用户 ID 分组,并计算每个用户的订单总金额。假设文档中有 userIdamount 字段。
    db.orders.aggregate([
        {
            $group: {
                _id: "$userId",
                totalAmount: { $sum: "$amount" }
            }
        }
    ]);
    

    这里 _id 字段指定了分组依据,$sum 是聚合表达式,用于累加 amount 字段的值。

  3. $project

    • 功能:用于修改输出文档的结构,例如重命名字段、只选择某些字段、添加新字段等。
    • 语法
    { $project: { <field1>: <expression1>, <field2>: <expression2>,... } }
    
    • 示例:对于 orders 集合,我们只想输出订单的 _idamount 和一个新计算的含税金额字段(假设税率为 10%)。
    db.orders.aggregate([
        {
            $project: {
                _id: 1,
                amount: 1,
                taxIncludedAmount: { $multiply: ["$amount", 1.1] }
            }
        }
    ]);
    

    这里 1 表示包含该字段,$multiply 是表达式,用于计算含税金额。

  4. $sort

    • 功能:按指定的字段对文档进行排序,可以按升序(1)或降序(-1)排列。
    • 语法
    { $sort: { <field1>: <sortOrder1>, <field2>: <sortOrder2>,... } }
    
    • 示例:对于按用户 ID 分组并计算总金额后的结果,我们想按总金额降序排列。
    db.orders.aggregate([
        {
            $group: {
                _id: "$userId",
                totalAmount: { $sum: "$amount" }
            }
        },
        {
            $sort: {
                totalAmount: -1
            }
        }
    ]);
    
  5. $limit

    • 功能:限制输出结果的文档数量。
    • 语法
    { $limit: <number> }
    
    • 示例:我们只想获取按总金额降序排列后的前 10 个用户的结果。
    db.orders.aggregate([
        {
            $group: {
                _id: "$userId",
                totalAmount: { $sum: "$amount" }
            }
        },
        {
            $sort: {
                totalAmount: -1
            }
        },
        {
            $limit: 10
        }
    ]);
    
  6. $skip

    • 功能:跳过指定数量的文档,通常与 $limit 一起使用来实现分页。
    • 语法
    { $skip: <number> }
    
    • 示例:如果我们想获取按总金额降序排列后的第 11 到 20 个用户的结果。
    db.orders.aggregate([
        {
            $group: {
                _id: "$userId",
                totalAmount: { $sum: "$amount" }
            }
        },
        {
            $sort: {
                totalAmount: -1
            }
        },
        {
            $skip: 10
        },
        {
            $limit: 10
        }
    ]);
    

MongoDB聚合管道实例

实例一:统计商品销售数据

假设我们有一个 sales 集合,用于记录商品的销售记录。每个文档包含以下字段:

  • productId:商品 ID
  • quantity:销售数量
  • price:商品单价
  • saleDate:销售日期
  1. 计算每种商品的总销售金额
    • 思路:使用 $group 操作符按 productId 分组,然后通过 $sum 表达式计算每个组的总销售金额(quantity * price)。
    • 代码示例
    db.sales.aggregate([
        {
            $group: {
                _id: "$productId",
                totalSalesAmount: {
                    $sum: { $multiply: ["$quantity", "$price"] }
                }
            }
        }
    ]);
    
  2. 按销售日期统计销售总额,并按日期升序排列
    • 思路:先使用 $groupsaleDate 分组,计算每个日期的销售总额。然后使用 $sortsaleDate 升序排列。
    • 代码示例
    db.sales.aggregate([
        {
            $group: {
                _id: "$saleDate",
                dailyTotalSales: {
                    $sum: { $multiply: ["$quantity", "$price"] }
                }
            }
        },
        {
            $sort: {
                _id: 1
            }
        }
    ]);
    
  3. 找出销售金额最高的前 5 种商品
    • 思路:首先按 productId 分组计算每种商品的总销售金额,然后使用 $sort 按总销售金额降序排列,最后使用 $limit 只取前 5 个结果。
    • 代码示例
    db.sales.aggregate([
        {
            $group: {
                _id: "$productId",
                totalSalesAmount: {
                    $sum: { $multiply: ["$quantity", "$price"] }
                }
            }
        },
        {
            $sort: {
                totalSalesAmount: -1
            }
        },
        {
            $limit: 5
        }
    ]);
    

实例二:分析用户行为数据

假设有一个 userActions 集合,记录用户的操作行为。每个文档包含以下字段:

  • userId:用户 ID
  • actionType:操作类型(如 “login”、“purchase”、“view” 等)
  • actionTime:操作时间
  1. 统计每个用户的操作次数
    • 思路:使用 $groupuserId 分组,然后通过 $sum 表达式(值为 1)来统计每个用户的操作次数。
    • 代码示例
    db.userActions.aggregate([
        {
            $group: {
                _id: "$userId",
                actionCount: {
                    $sum: 1
                }
            }
        }
    ]);
    
  2. 按操作类型统计操作次数,并按次数降序排列
    • 思路:先使用 $groupactionType 分组,计算每个操作类型的次数。然后使用 $sort 按次数降序排列。
    • 代码示例
    db.userActions.aggregate([
        {
            $group: {
                _id: "$actionType",
                actionCount: {
                    $sum: 1
                }
            }
        },
        {
            $sort: {
                actionCount: -1
            }
        }
    ]);
    
  3. 找出最近一周内登录次数最多的前 10 个用户
    • 思路:首先使用 $match 筛选出最近一周内的操作记录(假设当前日期为 new Date(),可以通过日期计算获取一周前的日期)。然后按 userId 分组并统计登录次数(只统计 actionType 为 “login” 的记录)。接着按登录次数降序排列,最后取前 10 个用户。
    • 代码示例
    var oneWeekAgo = new Date(new Date().getTime() - 7 * 24 * 60 * 60 * 1000);
    db.userActions.aggregate([
        {
            $match: {
                actionTime: { $gte: oneWeekAgo },
                actionType: "login"
            }
        },
        {
            $group: {
                _id: "$userId",
                loginCount: {
                    $sum: 1
                }
            }
        },
        {
            $sort: {
                loginCount: -1
            }
        },
        {
            $limit: 10
        }
    ]);
    

实例三:处理嵌套数据结构

假设我们有一个 employees 集合,每个员工文档包含以下结构:

{
    "_id": ObjectId("64010256825d36c3e205d184"),
    "name": "John Doe",
    "departments": [
        {
            "departmentName": "HR",
            "projects": [
                {
                    "projectName": "Employee Onboarding",
                    "hoursWorked": 40
                },
                {
                    "projectName": "Performance Review",
                    "hoursWorked": 20
                }
            ]
        },
        {
            "departmentName": "IT",
            "projects": [
                {
                    "projectName": "Server Upgrade",
                    "hoursWorked": 30
                }
            ]
        }
    ]
}
  1. 计算每个员工在所有项目中的总工作时长
    • 思路:这里需要使用 $unwind 操作符来展开嵌套的数组。先展开 departments 数组,然后再展开每个部门的 projects 数组。最后使用 $group 按员工 _id 分组并计算总工作时长。
    • 代码示例
    db.employees.aggregate([
        {
            $unwind: "$departments"
        },
        {
            $unwind: "$departments.projects"
        },
        {
            $group: {
                _id: "$_id",
                totalHoursWorked: {
                    $sum: "$departments.projects.hoursWorked"
                }
            }
        }
    ]);
    
  2. 按部门统计所有员工的总工作时长
    • 思路:同样先展开 departments 数组,然后使用 $group 按部门名称分组,并计算总工作时长。
    • 代码示例
    db.employees.aggregate([
        {
            $unwind: "$departments"
        },
        {
            $group: {
                _id: "$departments.departmentName",
                totalHoursWorked: {
                    $sum: "$departments.projects.hoursWorked"
                }
            }
        }
    ]);
    

聚合管道中的表达式

  1. 算术表达式
    • $add:用于加法运算。例如,我们想在商品销售数据中,计算每件商品的含税总价(假设税率为 10%),可以在 $project 阶段使用:
    db.sales.aggregate([
        {
            $project: {
                _id: 1,
                productId: 1,
                taxIncludedPrice: {
                    $add: [
                        "$price",
                        { $multiply: ["$price", 0.1] }
                    ]
                }
            }
        }
    ]);
    
    • $subtract:减法运算。比如,我们有一个记录成本和售价的集合,要计算每件商品的利润,可以使用:
    db.products.aggregate([
        {
            $project: {
                _id: 1,
                productName: 1,
                profit: {
                    $subtract: ["$sellingPrice", "$costPrice"]
                }
            }
        }
    ]);
    
    • $multiply:乘法运算。在前面计算销售总额时已经使用过,如 { $multiply: ["$quantity", "$price"] }
    • $divide:除法运算。例如,我们想计算每个用户的平均购买金额,可以在按用户分组计算总金额后,再计算平均金额:
    db.orders.aggregate([
        {
            $group: {
                _id: "$userId",
                totalAmount: { $sum: "$amount" },
                orderCount: { $sum: 1 }
            }
        },
        {
            $project: {
                _id: 1,
                averageAmount: {
                    $divide: ["$totalAmount", "$orderCount"]
                }
            }
        }
    ]);
    
  2. 逻辑表达式
    • $eq:判断两个值是否相等。例如,在用户操作数据中,我们想筛选出操作类型为 “login” 的记录,可以在 $match 阶段使用:
    db.userActions.aggregate([
        {
            $match: {
                actionType: { $eq: "login" }
            }
        }
    ]);
    
    • $gt$gte$lt$lte:大于、大于等于、小于、小于等于比较。在筛选订单金额大于 100 的记录时,如 { $match: { amount: { $gt: 100 } } }
    • $and$or$not:逻辑与、逻辑或、逻辑非。假设我们要筛选出金额大于 100 且状态为 “completed” 或者金额大于 200 且状态为 “processing” 的订单,可以这样写:
    db.orders.aggregate([
        {
            $match: {
                $or: [
                    {
                        $and: [
                            { amount: { $gt: 100 } },
                            { status: "completed" }
                        ]
                    },
                    {
                        $and: [
                            { amount: { $gt: 200 } },
                            { status: "processing" }
                        ]
                    }
                ]
            }
        }
    ]);
    

聚合管道的优化

  1. 合理使用索引
    • $match 阶段,如果筛选条件的字段上有索引,查询效率会大大提高。例如,在 orders 集合中,如果经常按 amount 字段筛选,为 amount 字段创建索引:
    db.orders.createIndex({ amount: 1 });
    
    这样在 $match 阶段 { $match: { amount: { $gt: 100 } } } 的操作会利用索引快速定位符合条件的文档。
  2. 操作顺序优化
    • 尽量将 $match 操作放在管道的前面,这样可以在数据量较大时,快速筛选出少量符合条件的数据,减少后续操作的数据量。例如,在统计商品销售数据时,如果我们要先筛选出某个特定时间段内的销售记录,再进行分组计算等操作,应将 $match 放在 $group 之前:
    var startDate = new Date("2023 - 01 - 01");
    var endDate = new Date("2023 - 12 - 31");
    db.sales.aggregate([
        {
            $match: {
                saleDate: { $gte: startDate, $lte: endDate }
            }
        },
        {
            $group: {
                _id: "$productId",
                totalSalesAmount: {
                    $sum: { $multiply: ["$quantity", "$price"] }
                }
            }
        }
    ]);
    
  3. 减少 $unwind 的使用
    • $unwind 操作会将数组展开,生成大量的中间文档,增加内存和计算资源的消耗。如果可能,尽量通过其他方式实现相同的功能。例如,在计算嵌套数据结构中的总工作时长时,如果数据量较大,可以考虑在应用层进行处理,而不是完全依赖数据库的聚合管道。

聚合管道与 Map - Reduce 的比较

  1. 性能方面
    • 聚合管道:在大多数情况下性能更好,尤其是对于简单到中等复杂度的聚合操作。它采用流水线方式处理数据,每个阶段可以直接对前一阶段的输出进行操作,减少了中间数据的存储和读取开销。
    • Map - Reduce:对于非常复杂的计算任务,特别是需要大量自定义逻辑的情况,可能更适合。但由于它涉及到 Map 和 Reduce 两个阶段,并且需要将中间结果写入磁盘,性能相对较低,特别是在数据量较大时。
  2. 编程复杂度
    • 聚合管道:使用预定义的操作符,语法相对简洁,易于理解和编写。例如,统计商品销售总额只需要简单的 $group$sum 操作。
    • Map - Reduce:需要编写自定义的 Map 和 Reduce 函数,编程复杂度较高。Map 函数负责将输入文档转换为键值对,Reduce 函数负责对键值对进行合并和计算,这需要更多的编程技巧和对数据处理逻辑的深入理解。
  3. 适用场景
    • 聚合管道:适用于常见的数据分析任务,如分组统计、筛选、排序等。例如,统计用户行为数据、商品销售数据等。
    • Map - Reduce:适用于需要复杂数据转换和自定义计算逻辑的场景,如文本分析、复杂的统计模型计算等。例如,在处理大量文本数据,统计每个单词的出现频率,并进行一些复杂的词频分析时,Map - Reduce 可能更合适。

通过对 MongoDB 聚合管道操作基础和实例的深入了解,以及与其他数据处理方式的比较,我们可以根据实际需求,更灵活、高效地利用聚合管道来处理和分析 MongoDB 中的数据,从而为业务决策提供有力支持。无论是简单的统计分析,还是复杂的嵌套数据处理,聚合管道都提供了强大的功能和灵活的操作方式。在实际应用中,合理优化聚合管道,结合索引等技术,可以进一步提升数据处理的性能和效率。