MongoDB爱好者
垂直技术交流平台

MongoDB write写(增、删、改)模块源码实现

前面的《transport_layer网络传输层模块源码实现》和《command命令处理模块源码实现》详细的分析了MongoDB内核网络数据收发过程以及命令解析处理的整个过程,本文将继续分析该系列的第三个子模块-《write写(增、删、改)模块源码实现》。

 

1. write写模块与command命令处理模块衔接回顾

上面两图是command命令处理模块的大体流程,最终经过command模块处理后,会执行对应的命令run接口,本文要分析的write模块也将从本入口入手。增、删、改三个最基本的写操作对应的命令入口如下表:

操作类型 命令Run()入口
CmdInsert::runImpl()
CmdDelete::runImpl()
CmdUpdate::runImpl()

MongoDB内核write模块主要由如下目录代码实现:

下面章节将分析增删改操作的详细内核实现流程,注意包括请求序列化解析存储、insert写入流程、update更新计划执行器、delete删除计划执行器等。

 

2. 增、删、改序列化解析及结构化统一存储

本章节详细分析增、删、改三个操作的序列化解析及结构化统一存储核心实现过程。

 

2.1 增删改写入操作语法及其主要含义说明

insert插入语法及说明

insert主要完成数据的写入操作,其命令语法如下:

{ 

  insert: <collection>, 

  documents: [ <document>, <document>, <document>, ... ], 

  ordered: <boolean>, 

  writeConcern: { <write concern> }, 

  bypassDocumentValidation: <boolean> 

} 

insert操作主要由五个字段类型组成,具体字段功能说明如下:

字段名 功能说明
insert 集合名
documents 具体的文档内容
ordered 一次性插入多条文档数据,前面的数据写入失败,是否继续后面的数据写入
writeConcern writeConcern写策略配置,写多少个节点才算成功
bypassDocumentValidation 是否进行validator相关schema文档验证

update更新语法及说明

update操作实现数据更新操作,其命令语法如下:

{ 

  update: <collection>, 

  updates: [ 

   { q: <query>, u: <update>, upsert: <boolean>, multi: <boolean>, 

    collation: <document>, arrayFilters: <array> }, 

   { q: <query>, u: <update>, upsert: <boolean>, multi: <boolean>, 

    collation: <document>, arrayFilters: <array> }, 

   { q: <query>, u: <update>, upsert: <boolean>, multi: <boolean>, 

    collation: <document>, arrayFilters: <array> }, 

     ... 

    ], 

    ordered: <boolean>, 

    writeConcern: { <write concern> }, 

    bypassDocumentValidation: <boolean> 

  } 

 

上述语法各字段功能说明如表:

字段名 功能说明
update 对那个表做update操作
updates.q 查询条件
updates.u 更新操作方法
updates.upsert 如果需要更新的数据不存在,是否直接插入新数据
updates.multi query满足条件数据有多条,是只更新一条还是多条一起更新
updates.collation 根据不同语言定义不同排序规则
updates.arrayFilters 数组中成员内容跟新
ordered 一次更新多条文档数据,前面的数据更新失败,是否继续后面的数据更新操作
writeConcern 更新多少个节点成功才返回OK
bypassDocumentValidation 是否进行validator相关schema文档验证

 

update更新语法及说明

delete删除操作对应语法如下:

 {

 delete: <collection>,

deletes: [

{ q : <query>, limit : <integer>, collation: <document> },

 { q : <query>, limit : <integer>, collation: <document> }, 

 { q : <query>, limit : <integer>, collation: <document> }, 

  ... 

  ],

 ordered: <boolean>, 

 writeConcern: { <write concern> }

}

如上,delete语法各个字段功能说明如下:

字段名 功能说明
delete 对那个表做delete操作
deletes.q 需要删除那一部分数据,也就是删除数据的条件
deletes.limit 删除所有满足条件的数据还是只删除一条,取值0或1
deletes.collation 根据不同语言定义不同排序规则
ordered 删除一批数据,如果前面某数据删除失败,是否还需要删除后面满足条件的数据
writeConcern 删除多少个节点成功才返回OK

2.2 增、删、改序列化解析

2.2.1 增、删、改核心数据结构

从上面的insert、delete、update语法可以看出,这三个操作有一部分字段名是一样的,内核在代码实现的时候也重复利用了这一特定,把这部分成员抽象为公共类,不同的字段则在各自操作类中封装。

最终,三个操作的字段信息通过公用类WriteCommandBase和各自私有类Insert、Update、Delete保持及解析封装。如下图所示:

公共基类由WriteCommandBase类实现,如下:

class WriteCommandBase {  

public: 

  //基类接口 

  ...... 

  //MongoDB字段验证规则(schema validation) 

  bool _bypassDocumentValidation{false}; 

  //一次对多条数据进行插入或者删除或者更新的时候,前面的数据操作失败,是否继续后面的操作 

  bool _ordered{true}; 

  //事务相关,等4.2版本回头分析 

    boost::optional<std::vector<std::int32_t>> _stmtIds; 

  } 

 

Insert类包含WriteCommandBase类成员,同时包括Insert操作对应的私有成员信息,如下:

class Insert {  

public: 

  ...... 

  //也就是db.collection 

  NamespaceString _nss; 

  //公共结构信息 

  WriteCommandBase _writeCommandBase; 

  //真正的文档在这里documents 

  std::vector<mongo::BSONObj> _documents; 

    //库信息 

    std::string _dbName; 

    //是否有documents 

  } 

 

delete删除操作对应Delete类核心成员信息如下:

class Delete {  

public: 

  ...... 

  //DB.COLLECTION信息 

  NamespaceString _nss; 

  WriteCommandBase _writeCommandBase; 

  //具体的delete内容在这里 

  std::vector<DeleteOpEntry> _deletes; 

} 

 

update更新操作对应的Update类核心成员信息如下:

class Update {  

public: 

   ...... 

  //db.collection信息,也就是库.表信息 

  NamespaceString _nss; 

  WriteCommandBase _writeCommandBase; 

  //需要更新的具体内容在该成员中   

  std::vector<UpdateOpEntry> _updates; 

} 

上面的类结构中,documents、deletes、_updates三个成员分别对应增、删、改操作的集体操作信息,都是数组类型,可以一次进行多条数据操作。

2.2.2 增、删、改解析过程

增删改三个操作对应三个不同的类,由这三个类来完成各自操作的协议解析及封装,整体代码实现大同小异,本文只分析insert解析及封装过程,主要代码实现如下:

Insert Insert::parse(const IDLParserErrorContext& ctxt, const BSONObj& bsonObject) { 

  ...... 

  //调用Insert::parseProtected 

  object.parseProtected(ctxt, bsonObject); 

  return object; 

} 

 

void Insert::parseProtected(...) 

{ 

    //解析出insert类的对应成员信息 

    for (const auto& element :request.body) { 

      const auto fieldName = element.fieldNameStringData(); 

   

      //解析bypassDocumentValidation信息 

      if (fieldName == kBypassDocumentValidationFieldName) { 

         ...... 

      } 

      //解析ordered信息 

      else if (fieldName == kOrderedFieldName) { 

         ...... 

      } 

      //解析stmtIds信息 

      else if (fieldName == kStmtIdsFieldName) { 

         ...... 

      } 

      //解析需要插入的文档信息 

      else if (fieldName == kDocumentsFieldName) { 

        //解析的文档保持到_documents数组 

        _documents = std::move(values); 

      } 

      //解析db名 

      else if (fieldName == kDbNameFieldName) { 

        ...... 

      } 

      ...... 

    } 

    //从request中解析出_writeCommandBase基础成员内容 

    _writeCommandBase = WriteCommandBase::parse(ctxt, request.body); 

   

    ...... 

    //根据db+collection构造出db.collection字符串 

    _nss = ctxt.parseNSCollectionRequired(_dbName, commandElement); 

  } 

和insert操作类似,update和delete操作的解析过程与insert流程一样比较简单,因此不在分析。

最终,所有解析出的数据保存到各自类中,总结如下图所示:

此外,增删改操作的序列化封装由write_ops_gen.cpp中的Insert::serialize()、Update::serialize()、Delete::serialize()完成,主要根据各自类完成Bson统一封装,整个实现过程比较简单,这里不在详细分析。

增删改接口解析及序列化相关几个核心接口功能说明如下:

函数接口 功能说明
write_ops::Insert InsertOp::parse(…) insert操作解析
Insert::toBSON(…) insert Bson序列化
write_ops::Update UpdateOp::parse(…) update操作解析
Update::toBSON(…) update Bson序列化
write_ops::Delete DeleteOp::parse(…) delete操作解析
DeleteOp::toBSON(…) delete Bson序列化

注意:在insert、update、delete中还有如下一个细节,为何不见writeConcern相关成员存储?原因是writeConcern解析放到了外层runCommandImpl中通过setWriteConcern()保持到该请求对应得opCtx操作上下文中。

 

3. Insert数据写操作核心实现

insert处理和command命令处理模块通过CmdInsert::runImpl()衔接,该接口代码实现如下:

//插入文档会走这里面 CmdInsert::runImpl 

void runImpl(...) final { 

  //从request中解析出write_ops::Insert类成员信息 

  const auto batch = InsertOp::parse(request); 

  const auto reply = performInserts(opCtx, batch); 

  ...... 

} 

InsertOp::parse()在前面章节已经分析,主要完成数据的统一解析存储。insert请求解析存储到write_ops::Insert类后,开始调用performInserts(…)处理。在该接口中完成如下流程:分批数据组装、批量数据写入、事务封装、写入存储引擎等。

 

3.1 数据分批组装

由于inset一次可以插入多条数据,为了最大化满足性能要求,当写入数据很多的时候,MongoDB内核通过把这些数据按照指定规则拆分到多个batch中,这样每个batch代表一批数据,然后进行统一处理。分批数据组装拆分过程核心代码实现如下:

//数据分批写入核心代码实现 

WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& wholeOp) { 

  ....... 

  //写入数据成功后的会掉处理 

  //主要完成表级tps及时延统计 

  ON_BLOCK_EXIT([&] { 

  //performInserts执行完成后调用,记录执行结束时间  

    curOp.done();   

    //表级tps及时延统计 

      Top::get(opCtx->getServiceContext()) 

        .record(...); 

   

    }); 

   

    ...... 

    size_t bytesInBatch = 0; 

    //batch数组 

    std::vector<InsertStatement> batch;  

    //默认64,可以通过db.adminCommand( { setParameter: 1, internalInsertMaxBatchSize:xx } )配置 

    const size_t maxBatchSize = internalInsertMaxBatchSize.load(); 

    //当写入的数据小于64时,也就是一个batch即可一起处理 

    //batch最大限制为写入数据大于64或者batch中总字节数超过256K 

    batch.reserve(std::min(wholeOp.getDocuments().size(), maxBatchSize)); 

    for (auto&& doc : wholeOp.getDocuments()) { 

    ...... 

    //doc检查,例如是否嵌套过多,是否一个doc带有多个_id等 

      auto fixedDoc = fixDocumentForInsert(opCtx->getServiceContext(), doc); 

    //如果这个文档检测有异常,则跳过这个文档,进行下一个文档操作 

      if (!fixedDoc.isOK()) {  

        //啥也不做,直接忽略该doc 

      } else { 

        //事务相关,先忽略,以后会回头专门分析事务 

        const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); 

        ...... 

      //把文档插入到batch数组 

        BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue()); 

        batch.emplace_back(stmtId, toInsert); 

        bytesInBatch += batch.back().doc.objsize(); 

      //这里continue,就是为了把批量插入的文档组成到一个batch数组中,到达一定量一次性插入 

      //batch里面一次最多插入64个文档或者总字节数256K,则后续的数据拆分到下一个batch 

        if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes) 

          continue; // Add more to batch before inserting. 

      } 

   

    //把本batch中的数据交由该接口统一处理 

      bool canContinue = insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out); 

    //清空batch,开始下一轮处理 

      batch.clear();   

      bytesInBatch = 0; 

    ...... 

  } 

上面的代码可以总结为以下图形:

说明,上面假设64条数据总大小不超过256KB的batch图,如果64条doc文档数据总大小超过256kb,这时候阀值则以总数据256K为限制。单个batch最大上限限制条件如下:

  • 最多64个doc文档数据。

  • 单个batch总数据长度不超过256Kb。

 

3.2 batch数据事务写入流程及其异常补偿机制

一批数据通过分批拆分存入多个batch后,调用insertBatchAndHandleErrors()接口来完成单个batch的数据写入。整个batch数据写入可以在一个transaction事务完成,也可以一条数据一个事务来完成写入,具体核心代码实现如下:

bool insertBatchAndHandleErrors(...) { 

  ...... 

  try { 

    //如果对应collection不存在则创建 

    acquireCollection(); //执行上面定义的函数 

    //如果collection不是固定capped集合,并且batch中数据大于一条 

    //则试着在一个事务中一次性写入所有的数据 

    if (!collection->getCollection()->isCapped() && batch.size() > 1) {  

      ...... 

        //为什么这里没有检查返回值?默认全部成功? 实际上通过try catch获取到异常后,再后续改为一条一条插入 

        insertDocuments(opCtx, collection->getCollection(), batch.begin(), batch.end()); 

        //insert统计计数及返回值赋值 

        globalOpCounters.gotInserts(batch.size()); 

        ...... 

        std::fill_n(std::back_inserter(out->results), batch.size(), std::move(result)); 

        curOp.debug().ninserted += batch.size(); 

        //一个事务写入多个doc成功,直接返回 

        return true; 

      } 

    } catch (const DBException&) { //批量写入失败,则后面一条一条的写 

      collection.reset(); 

      //注意这里没有return,在后续一条一个事务写入 

    } 

   

    //这里循环解析batch,实现一条数据一个在一个事务中处理 

    for (auto it = batch.begin(); it != batch.end(); ++it) { 

      globalOpCounters.gotInsert(); //insert操作计数 

      try { 

        //log() << "yang test ............getNamespace().ns():" << wholeOp.getNamespace().ns(); 

        //writeConflictRetry里面会执行{}中的函数体  

        writeConflictRetry(opCtx, "insert", wholeOp.getNamespace().ns(), [&] { 

          try { 

            ...... 

            //把该条文档插入  

            insertDocuments(opCtx, collection->getCollection(), it, it + 1); 

            //统计计数处理 

            SingleWriteResult result; 

            result.setN(1); 

            out->results.emplace_back(std::move(result)); 

            curOp.debug().ninserted++; 

          } catch (...) { 

            ......

          } 

        }); 

      } catch (const DBException& ex) {//写入异常 

        //注意这里,如果失败是否还可以继续后续数据的写入 

        bool canContinue = 

          handleError(opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out); 

        if (!canContinue) 

          return false; //注意这里直接退出循环,也就是本批次数据后续数据没有写入了 

      } 

    } 

   

    return true; 

  } 

 

一批batch数据(假设64条)写入过程,如果不是capped固定集合,则这64条数据首先放入一个transaction事务中完成写入。如果写入异常,则继续一个事务一条数据写入。数据放入事务执行流程如下:

void insertDocuments(OperationContext* opCtx, 

           Collection* collection, 

           std::vector<InsertStatement>::iterator begin, 

           std::vector<InsertStatement>::iterator end) 

  //事务开始 

  WriteUnitOfWork wuow(opCtx); 

  ...... 

  //把数组begin到end之间的所有doc文档数据放入该事务中 

  uassertStatusOK(collection->insertDocuments( 

      opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true)); 

    //事务结束 

    wuow.commit(); //WriteUnitOfWork::commit 

  } 

到这里后,insert操作在write模块中的流程就结束了,后续的doc写入流程存储引擎将交由storage模块实现。

上面的核心代码分析可以总结为如下总结:

当这个batch中的数据放入同一个事务执行失败后,则改为一条一个事务循环处理,如下图所示:

3.3 中间数据写入异常如何处理

假设一个batch数据64条数据,如果第23条数据写入失败了,后续的第24-64条数据是否需要继续写入,这就是本章节需要分析的问题。MongoDB内核实现的时候通过handleError()接口判断是否需要继续写入,该接口代码如下:

//前面数据写入失败,是否可以继续后续数据写入

bool handleError(...) { 

  ...... 

 

  //判断是什么原因引起的异常,从而返回不同的值 

  //如果是isInterruption错误,直接返回true,意思是不需要后续数据写入 

  if (ErrorCodes::isInterruption(ex.code())) { 

    //如果是interrupt异常,则整批数据写失败,也就是不进行后续数据写入 

    throw; // These have always failed the whole batch. 

    } 

   

    ...... 

    //如果ordered为false则忽略这条写入失败的数据,继续后续数据写入 

    return !wholeOp.getOrdered(); 

  } 

从上面的代码可以看出,只要出现以下异常情况,就不可继续后续数据insert写入操作了,如下:

Interruption错误:包括Interrupted、InterruptedAtShutdown、ExceededTimeLimit、InterruptedDueToReplStateChange四种异常,其他异常情况可以继续写入。

ordered参数配置为false: 如果该配置为false则遇到异常不继续处理后续doc写入。

 

写入异常后是否继续写总结如下图所示:

 

3.4 后续

通过前面的分析可以得出,MongoDB内核把多条doc文档按照指定限制把文档封装到不同batch中,然后一个batch一个batch分批处理。最终,这些batch对应数据将会通过MongoDB内核的storage存储模块来完成insert事务处理,最终在CollectionImpl::insertDocuments()实现。

Insert写入流程核心接口调用关系图如下:

说明:数据如何组装存入wiredtiger存储引擎将在后续《storage存储模块源码实现》中详细分析。

 

4. delete删除操作核心实现

delete数据删除通过命令处理模块中的CmdDelete::runImpl(…) ->performDeletes接口完成和write写模块delete操作对接,下面我们分析该接口核心代码实现,如下:

WriteResult performDeletes(...) 

{ 

  ...... 

 

  //singleOp类型为DeleteOpEntry   write_ops::Delete::getDeletes 

  for (auto&& singleOp : wholeOp.getDeletes()) { 

    //事务相关,先跳过,以后相关章节专门分析 

    const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); 

    ...... 

   

      //该函数接口执行完后执行该finishCurOp 

      //finishCurOp实现表级QPS及时延统计 本op操作的慢日志记录等 

      ON_BLOCK_EXIT([&] { finishCurOp(opCtx, &curOp); }); 

      try { 

        lastOpFixer.startingOp(); 

        out.results.emplace_back( 

          //该delete op操作真正执行在这里,singleOp类型为DeleteOpEntry 

          performSingleDeleteOp(opCtx, wholeOp.getNamespace(), stmtId, singleOp)); 

        lastOpFixer.finishedOpSuccessfully(); 

      } catch (const DBException& ex) { 

        ...... 

    } 

   

    return out; 

  } 

 

从上面代码分析可以看出,如果wholeOp携带有多个DeleteOpEntry(也就是singleOp )操作,则循环对singleOp 进行处理,这个处理过程由performSingleDeleteOp(…)接口实现,具体如下:

 

performSingleDeleteOp(…)接口核心代码实现如下:

static SingleWriteResult performSingleDeleteOp(...) { 

  ...... 

 

  //根据ns构造DeleteReques 

  //根据请求相关信息初始化赋值DeleteRequest 

  DeleteRequest request(ns); 

  request.setQuery(op.getQ()); 

  request.setCollation(write_ops::collationOf(op)); 

  request.setMulti(op.getMulti()); 

    request.setYieldPolicy(PlanExecutor::YIELD_AUTO); // ParsedDelete overrides this for $isolated. 

    request.setStmtId(stmtId); 

   

    //根据DeleteRequest构造ParsedDelete 

    ParsedDelete parsedDelete(opCtx, &request); 

    //从request解析出对应成员存入parsedDelete 

    uassertStatusOK(parsedDelete.parseRequest()); 

    //检查该请求是否已经被kill掉了 

    opCtx->checkForInterrupt(); 

   

    ...... 

    //写必须走主节点判断及版本判断 

    assertCanWrite_inlock(opCtx, ns); 

   

    //从查询引擎中获取delete执行器 

    auto exec = uassertStatusOK( 

      getExecutorDelete(opCtx, &curOp.debug(), collection.getCollection(), &parsedDelete)); 

   

    { 

      stdx::lock_guard<Client> lk(*opCtx->getClient()); 

      CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); 

    } 

   

    //运行该执行器 

    uassertStatusOK(exec->executePlan()); 

   

    //下面流程是记录各种统计信息 

    long long n = DeleteStage::getNumDeleted(*exec); 

    curOp.debug().ndeleted = n; 

   

    PlanSummaryStats summary; 

    //获取执行器运行过程中的各种统计信息 

    Explain::getSummaryStats(*exec, &summary); 

    if (collection.getCollection()) { 

      collection.getCollection()->infoCache()->notifyOfQuery(opCtx, summary.indexesUsed); 

    } 

    curOp.debug().setPlanSummaryMetrics(summary); 

    //统计信息序列化 

    if (curOp.shouldDBProfile()) { 

      BSONObjBuilder execStatsBob; 

      Explain::getWinningPlanStats(exec.get(), &execStatsBob); 

      curOp.debug().execStats = execStatsBob.obj(); 

    } 

     

    ...... 

    return result; 

  } 

 

该接口最核心的部分为获取delete执行器并运行,执行器由query查询引擎模块实现,因此getExecutorDelete(…)获取delete执行器及其运行过程具体实现流程将在后续《query查询引擎模块实现原理》章节详细分析,这里暂时跳过这一逻辑。write模块中delete操作主要接口调用流程如下:

 

5. update更新操作核心实现

update数据更新操作过程和delete操作过程类似,这里不在累述,其核心接口调用流程如下图所示:

6. 下期预告

下期将分析《storage存储模块源码实现》,storage模块分析完成后将分析MongoDB最复杂的《query查询引擎源码实现》,敬请关注。

更多文章:

干货!万亿级数据库MongoDB集群性能优化实践合辑(下)

干货!万亿级数据库MongoDB集群性能优化实践合辑(上)

常用高并发网络线程模型设计及MongoDB线程模型优化实践

MongoDB网络传输处理源码实现及性能调优-体验内核性能极致设计

OPPO百万级高并发MongoDB集群性能数十倍提升优化实践

盘点 2020 | 我要为分布式数据库 MongoDB 在国内影响力提升及推广做点事

MongoDB网络传输层模块源码实现二

MongoDB网络传输层模块源码实现三

MongoDB网络传输层模块源码实现四

作者:杨亚洲

前滴滴出行技术专家,现任OPPO文档数据库MongoDB负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库MongoDB内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。Github账号地址:

https://github.com/y123456yz

 

5月29日长沙参加MongoDB中文社区2021年第一场线下大会吧!互联网证券及金融系统实践案例、物联网实时数据融合平台案例分享,存储引擎、数据迁移同步等诸多MongoDB一手实践干货在现场等你来!

 

长沙大会速递Conference

时间2021年5月29日(周六) 13:30 – 17:30

地点长沙市岳麓区中南大学科技园(研发)总部1号栋3楼309多功能厅

名额: 150人

报名链接: https://sourl.cn/EJNURJ

* 9.9元票请添加小芒果(微信ID:mongoingcom)按照说明获取。

* 分享活动还有机会领取MongoDB周边礼品!

您也可以直接阅读相关活动介绍:2021年MongoDB中文社区长沙大会 | 相聚岳麓,共话mongo!

 

大会奖品Prizes

赞(15)
未经允许不得转载:MongoDB中文社区 » MongoDB write写(增、删、改)模块源码实现

评论 抢沙发

评论前必须登录!