MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

MongoDB使用insertMany方法批量插入文档技巧

2022-08-054.3k 阅读

MongoDB 之 insertMany 方法批量插入文档技巧

1. 了解 insertMany 基本语法

在 MongoDB 中,insertMany 方法用于向集合中批量插入多个文档。其基本语法如下:

db.collection.insertMany(
   [ <document 1> , <document 2>,... ],
   {
     writeConcern: <document>,
     ordered: <boolean>
   }
)

其中,第一个参数是一个包含多个文档的数组,这些文档将被插入到指定的集合中。第二个参数是一个可选的选项文档。

  • writeConcern:用于指定写入操作的安全级别和确认模式。例如,{w: "majority", j: true, wtimeout: 1000}w 表示需要多少个副本集成员确认写入操作,majority 表示大多数成员;j 表示是否等待写入操作持久化到磁盘;wtimeout 表示等待确认的最长时间(以毫秒为单位)。
  • ordered:是一个布尔值,默认为 true。当 orderedtrue 时,insertMany 方法会按顺序插入文档。如果在插入过程中遇到错误,例如违反唯一索引约束,插入操作将立即停止,后续文档不会被插入。当 orderedfalse 时,insertMany 方法会并行插入文档,即使某个文档插入失败,其他文档的插入操作仍会继续进行。

2. 简单示例:有序插入

假设我们有一个名为 students 的集合,我们想要批量插入多个学生的信息。首先,我们创建连接并选择集合:

const { MongoClient } = require('mongodb');
const uri = "mongodb://localhost:27017";
const client = new MongoClient(uri);

async function insertStudents() {
    try {
        await client.connect();
        const db = client.db('school');
        const studentsCollection = db.collection('students');

        const studentsToInsert = [
            { name: 'Alice', age: 20, grade: 'A' },
            { name: 'Bob', age: 21, grade: 'B' },
            { name: 'Charlie', age: 22, grade: 'C' }
        ];

        const result = await studentsCollection.insertMany(studentsToInsert);
        console.log(`${result.insertedCount} documents were inserted`);
    } catch (e) {
        console.error(e);
    } finally {
        await client.close();
    }
}

insertStudents();

在这个示例中,insertMany 方法以默认的 ordered: true 模式执行。如果所有文档都成功插入,result.insertedCount 将返回插入的文档数量。

3. 处理插入错误:有序插入模式下的错误处理

在有序插入模式下,如果某个文档插入失败,后续文档将不会被插入。例如,如果我们对 students 集合中的 name 字段设置了唯一索引:

async function createUniqueIndex() {
    try {
        await client.connect();
        const db = client.db('school');
        const studentsCollection = db.collection('students');
        await studentsCollection.createIndex({ name: 1 }, { unique: true });
        console.log('Unique index created successfully');
    } catch (e) {
        console.error(e);
    } finally {
        await client.close();
    }
}

createUniqueIndex();

然后再次尝试插入包含重复 name 的文档:

async function insertDuplicateStudents() {
    try {
        await client.connect();
        const db = client.db('school');
        const studentsCollection = db.collection('students');

        const studentsToInsert = [
            { name: 'Alice', age: 20, grade: 'A' },
            { name: 'Alice', age: 21, grade: 'B' }, // 重复的 name
            { name: 'Charlie', age: 22, grade: 'C' }
        ];

        const result = await studentsCollection.insertMany(studentsToInsert);
        console.log(`${result.insertedCount} documents were inserted`);
    } catch (e) {
        console.error('Insertion error:', e);
    } finally {
        await client.close();
    }
}

insertDuplicateStudents();

运行上述代码,我们会捕获到一个 E11000 duplicate key error collection 错误,并且只有第一个文档会被插入,因为 orderedtrue,遇到错误后插入操作立即停止。

4. 无序插入:提高插入效率与容错性

当我们设置 ordered: false 时,即使某个文档插入失败,其他文档的插入操作仍会继续。修改上述示例代码如下:

async function insertUnorderedStudents() {
    try {
        await client.connect();
        const db = client.db('school');
        const studentsCollection = db.collection('students');

        const studentsToInsert = [
            { name: 'Alice', age: 20, grade: 'A' },
            { name: 'Alice', age: 21, grade: 'B' }, // 重复的 name
            { name: 'Charlie', age: 22, grade: 'C' }
        ];

        const result = await studentsCollection.insertMany(studentsToInsert, { ordered: false });
        console.log(`${result.insertedCount} documents were inserted`);
        console.log('Inserted IDs:', result.insertedIds);
    } catch (e) {
        console.error('Insertion error:', e);
    } finally {
        await client.close();
    }
}

insertUnorderedStudents();

在这个示例中,即使第二个文档由于 name 重复插入失败,第一个和第三个文档仍会被插入。result.insertedCount 将返回成功插入的文档数量,result.insertedIds 会包含成功插入文档的 _id。

5. 结合 writeConcern 确保数据安全

写入关注点(writeConcern)对于确保数据的安全性和一致性非常重要。例如,我们希望等待大多数副本集成员确认写入操作,并且等待写入操作持久化到磁盘:

async function insertStudentsWithWriteConcern() {
    try {
        await client.connect();
        const db = client.db('school');
        const studentsCollection = db.collection('students');

        const studentsToInsert = [
            { name: 'David', age: 23, grade: 'D' },
            { name: 'Eve', age: 24, grade: 'E' }
        ];

        const result = await studentsCollection.insertMany(studentsToInsert, {
            writeConcern: { w: "majority", j: true, wtimeout: 5000 },
            ordered: true
        });
        console.log(`${result.insertedCount} documents were inserted`);
    } catch (e) {
        console.error('Insertion error:', e);
    } finally {
        await client.close();
    }
}

insertStudentsWithWriteConcern();

在上述代码中,w: "majority" 表示等待大多数副本集成员确认写入,j: true 表示等待写入操作持久化到磁盘,wtimeout: 5000 表示等待确认的最长时间为 5000 毫秒。如果在规定时间内无法满足写入关注点的要求,将会抛出错误。

6. 批量插入大文档数组

在实际应用中,可能需要批量插入大量文档。然而,如果一次性传递一个非常大的文档数组,可能会导致内存问题或网络超时。为了避免这些问题,可以将大数组分成较小的块进行插入。

例如,假设我们有一个包含 10000 个文档的数组 bigArrayOfDocuments,我们可以将其分成每块 1000 个文档的小块:

async function insertLargeNumberOfDocuments() {
    try {
        await client.connect();
        const db = client.db('bigData');
        const largeCollection = db.collection('largeCollection');

        const bigArrayOfDocuments = [];
        for (let i = 0; i < 10000; i++) {
            bigArrayOfDocuments.push({ data: `Item ${i}` });
        }

        const chunkSize = 1000;
        for (let i = 0; i < bigArrayOfDocuments.length; i += chunkSize) {
            const chunk = bigArrayOfDocuments.slice(i, i + chunkSize);
            const result = await largeCollection.insertMany(chunk);
            console.log(`Inserted ${result.insertedCount} documents in chunk starting at index ${i}`);
        }
    } catch (e) {
        console.error('Insertion error:', e);
    } finally {
        await client.close();
    }
}

insertLargeNumberOfDocuments();

通过这种方式,我们可以有效地管理内存,并确保每个插入操作都能成功执行,而不会因为文档数组过大而出现问题。

7. 数据验证与批量插入

在 MongoDB 4.2 及更高版本中,可以在集合上定义验证规则。当使用 insertMany 方法时,插入的文档必须符合这些验证规则。

例如,我们在 students 集合上定义一个验证规则,要求 age 字段必须是大于 0 的整数:

async function createCollectionWithValidation() {
    try {
        await client.connect();
        const db = client.db('school');
        const validationRules = {
            validator: {
                $and: [
                    { age: { $type: 'int', $gt: 0 } }
                ]
            }
        };
        await db.createCollection('students', { validator: validationRules });
        console.log('Collection created with validation rules');
    } catch (e) {
        console.error(e);
    } finally {
        await client.close();
    }
}

createCollectionWithValidation();

然后尝试插入文档:

async function insertStudentsWithValidationCheck() {
    try {
        await client.connect();
        const db = client.db('school');
        const studentsCollection = db.collection('students');

        const studentsToInsert = [
            { name: 'Frank', age: 25, grade: 'F' },
            { name: 'Grace', age: -1, grade: 'G' } // 不符合验证规则
        ];

        const result = await studentsCollection.insertMany(studentsToInsert);
        console.log(`${result.insertedCount} documents were inserted`);
    } catch (e) {
        console.error('Insertion error:', e);
    } finally {
        await client.close();
    }
}

insertStudentsWithValidationCheck();

在这个例子中,由于第二个文档的 age 为 -1,不符合验证规则,插入操作会失败,并抛出一个验证错误。

8. 利用索引优化批量插入性能

适当的索引可以显著提高 insertMany 操作的性能。例如,如果我们经常根据 name 字段查询 students 集合,我们可以为 name 字段创建索引:

async function createIndexForInsertion() {
    try {
        await client.connect();
        const db = client.db('school');
        const studentsCollection = db.collection('students');
        await studentsCollection.createIndex({ name: 1 });
        console.log('Index created successfully');
    } catch (e) {
        console.error(e);
    } finally {
        await client.close();
    }
}

createIndexForInsertion();

在插入文档时,MongoDB 可以利用这个索引更快地定位和插入文档,特别是在处理大量文档时,性能提升会更加明显。然而,需要注意的是,过多的索引可能会增加插入操作的开销,因为每次插入都需要更新索引结构。因此,应该根据实际查询需求合理创建索引。

9. 分布式环境下的批量插入

在分布式 MongoDB 环境(如副本集或分片集群)中,insertMany 操作的行为和性能会有所不同。

在副本集中,写入操作会首先发送到主节点,主节点将操作记录在 oplog 中,然后将 oplog 条目复制到从节点。writeConcern 决定了在返回成功响应之前需要等待多少个副本集成员确认写入。如果设置 w: "majority",主节点会等待大多数副本集成员(包括自己)确认写入后才返回成功。

在分片集群中,文档会根据分片键被路由到不同的分片。当执行 insertMany 时,MongoDB 会并行地将文档插入到各个分片。但是,如果某个分片出现问题,可能会影响整个插入操作。例如,如果某个分片不可用,insertMany 操作可能会失败,除非设置了适当的错误处理和重试机制。

为了在分布式环境中确保高效和可靠的批量插入,建议:

  • 合理设置 writeConcern,根据应用对数据一致性和可用性的要求进行调整。
  • 监控集群状态,及时发现和处理分片或副本集成员的故障。
  • 对于可能失败的插入操作,实现重试逻辑,以提高插入的成功率。

例如,以下是一个简单的重试逻辑示例:

async function insertStudentsWithRetry() {
    const maxRetries = 3;
    let retries = 0;
    let result;

    while (retries < maxRetries) {
        try {
            await client.connect();
            const db = client.db('school');
            const studentsCollection = db.collection('students');

            const studentsToInsert = [
                { name: 'Hank', age: 26, grade: 'H' },
                { name: 'Ivy', age: 27, grade: 'I' }
            ];

            result = await studentsCollection.insertMany(studentsToInsert);
            break;
        } catch (e) {
            retries++;
            console.error(`Insertion attempt ${retries} failed:`, e);
            if (retries === maxRetries) {
                console.error('Max retries reached. Unable to insert documents.');
            }
        } finally {
            await client.close();
        }
    }

    if (result) {
        console.log(`${result.insertedCount} documents were inserted`);
    }
}

insertStudentsWithRetry();

这个示例中,insertMany 操作如果失败会重试,最多重试 3 次,有助于提高在分布式环境中插入操作的成功率。

10. 与其他 MongoDB 操作结合使用

insertMany 方法可以与其他 MongoDB 操作结合使用,以满足更复杂的业务需求。

例如,我们可以在插入文档后立即查询刚刚插入的文档:

async function insertAndQueryStudents() {
    try {
        await client.connect();
        const db = client.db('school');
        const studentsCollection = db.collection('students');

        const studentsToInsert = [
            { name: 'Jack', age: 28, grade: 'J' },
            { name: 'Kathy', age: 29, grade: 'K' }
        ];

        const insertResult = await studentsCollection.insertMany(studentsToInsert);

        const insertedIds = insertResult.insertedIds.map(id => id.toString());
        const queryResult = await studentsCollection.find({ _id: { $in: insertedIds } }).toArray();
        console.log('Inserted and retrieved students:', queryResult);
    } catch (e) {
        console.error('Operation error:', e);
    } finally {
        await client.close();
    }
}

insertAndQueryStudents();

在这个示例中,插入文档后,我们使用 insertedIds 获取插入文档的 _id,然后通过 find 方法查询并打印出刚刚插入的文档。

另外,我们还可以在插入前对数据进行预处理,例如对文档中的某些字段进行加密处理:

const crypto = require('crypto');

async function encryptAndInsertStudents() {
    try {
        await client.connect();
        const db = client.db('school');
        const studentsCollection = db.collection('students');

        const studentsToInsert = [
            { name: 'Leo', age: 30, grade: 'L', email: 'leo@example.com' },
            { name: 'Mona', age: 31, grade: 'M', email:'mona@example.com' }
        ];

        const encryptedStudents = studentsToInsert.map(student => {
            const encryptedEmail = crypto.createHash('sha256').update(student.email).digest('hex');
            return { ...student, encryptedEmail };
        });

        const result = await studentsCollection.insertMany(encryptedStudents);
        console.log(`${result.insertedCount} documents were inserted`);
    } catch (e) {
        console.error('Insertion error:', e);
    } finally {
        await client.close();
    }
}

encryptAndInsertStudents();

在这个例子中,我们在插入文档前对 email 字段进行了加密处理,然后将加密后的文档插入到集合中。

通过将 insertMany 与其他操作结合使用,可以构建出功能强大且灵活的 MongoDB 数据处理流程。

综上所述,insertMany 方法在 MongoDB 数据插入中是一个非常实用的工具。通过合理运用其各种参数选项,结合数据验证、索引优化、错误处理以及与其他操作的结合,能够高效、安全地批量插入大量文档,满足不同应用场景下的数据存储需求。无论是在小型单节点应用还是大型分布式集群环境中,掌握这些技巧都能帮助开发者更好地利用 MongoDB 的性能和功能。