MongoDB爱好者
垂直技术交流平台

MongoDB Command命令处理模块源码实现一

1. 背景


<<transport_layer网络传输层模块源码实现>>中分享了MongoDB内核底层网络IO处理相关实现,包括套接字初始化、一个完整MongoDB报文的读取、获取到DB数据发送给客户端等。MongoDB支持多种增、删、改、查、聚合处理、cluster处理等操作,每个操作在内核实现中对应一个command,每个command有不同的功能,MongoDB内核如何进行command源码处理将是本文分析的重点。

此外,MongoDB提供了mongostat工具来监控当前集群的各种操作统计。Mongostat监控统计如下图所示:

其中,insert、delete、update、query这四项统计比较好理解,分别对应增、删、改、查。但是,comand、getmore不是很好理解,command代表什么统计?getMore代表什么统计?这两项相对比较难理解。

此外,通过本文字分析,我们将搞明白这六项统计的具体含义,同时弄清这六项统计由那些操作进行计数。

Command命令处理模块分为:mongos操作命令、mongod操作命令、MongoDB集群内部命令,具体定义如下:

① mongos操作命令,客户端可以通过mongos访问集群相关的命令。

② mongod操作命令:客户端可以通过mongod复制集和cfg server访问集群的相关命令。

③ MongoDB集群内部命令:mongos、mongod、mongo-cfg集群实例之间交互的命令。

Command命令处理模块核心代码实现如下:

《Command命令处理模块源码实现》相关文章重点分析命令处理模块核心代码实现,也就是上面截图中的命令处理源码文件实现。

2. <<transport_layer网络传输层模块源码实现>>衔接回顾

<<transport_layer网络传输层模块源码实现三>>一文中,我们对service_state_machine状态机调度子模块进行了分析,该模块中的dealTask任务进行MongoDB内部业务逻辑处理,其核心实现如下:

//dealTask处理  
void ServiceStateMachine::_processMessage(ThreadGuard guard) {  
  ......
  //command处理、DB访问后的数据通过dbresponse返回  
  DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);  
  ......
}

上面的sep对应mongod或者mongos实例的服务入口实现,该seq成员分别在如下代码中初始化为ServiceEntryPointMongod和ServiceEntryPointMongod类实现。SSM状态机的_seq成员初始化赋值核心代码实现如下:

//mongos实例启动初始化  
static ExitCode runMongosServer() {  
  ......  
  //mongos实例对应sep为ServiceEntryPointMongos  
  auto sep = stdx::make_unique<ServiceEntryPointMongos>(getGlobalServiceContext());  
  getGlobalServiceContext()->setServiceEntryPoint(std::move(sep));  
  ......  
}  
 
//mongod实例启动初始化  
ExitCode _initAndListen(int listenPort) {  
  ......  
  //mongod实例对应sep为ServiceEntryPointMongod  
  serviceContext->setServiceEntryPoint(  
      stdx::make_unique<ServiceEntryPointMongod>(serviceContext));  
  ......  
}  
 
//SSM状态机初始化  
ServiceStateMachine::ServiceStateMachine(...)  
  : _state{State::Created},  
    //mongod和mongos实例的服务入口通过这里赋值给_seq成员变量  
    _sep{svcContext->getServiceEntryPoint()},  
    ......  
}

通过上面的几个核心接口实现,把mongos和mongod两个实例的服务入口与状态机SSM(ServiceStateMachine)联系起来,最终和下面的command命令处理模块关联。

dealTask进行一次MongoDB请求的内部逻辑处理,该处理由_sep->handleRequest()接口实现。由于mongos和mongod服务入口分别由ServiceEntryPointMongos和ServiceEntryPointMongod两个类实现,因此dealTask也就演变为如下接口处理:

① mongos实例:ServiceEntryPointMongos::handleRequest(…)

② Mongod实例::ServiceEntryPointMongod::handleRequest(…)

这两个接口入参都是OperationContext和Message,分别对应操作上下文、请求原始数据内容。下文会分析Message解析实现、OperationContext服务上下文实现将在后续章节分析。

Mongod和mongos实例服务入口类都继承自网络传输模块中的ServiceEntryPointImpl类,如下图所示:

Tips: mongos和mongod服务入口类为何要继承网络传输模块服务入口类?

原因是一个请求对应一个链接session,该session对应的请求又和SSM状态机唯一对应。所有客户端请求对应的SSM状态机信息全部保存再ServiceEntryPointImpl._sessions成员中,而command命令处理模块为SSM状态机任务中的dealTask任务,通过该继承关系,ServiceEntryPointMongod和ServiceEntryPointMongos子类也就可以和状态机及任务处理关联起来,同时也可以获取当前请求对应的session链接信息。

3. MongoDB协议解析

《transport_layer网络传输层模块源码实现二》中的数据收发子模块完成了一个完整MongoDB报文的接收,一个MongoDB报文由Header头部+opCode包体组成,如下图所示:

上图中各个字段说明如下表:

 Header or body  字段名
    msg header  messageLength  整个message长度,包括header长度和body长度
 requestID  该请求id信息
 responseTo  应答id
 opCode  操作类型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等
 msg body  Body  不同opCode对应的包体内容

opCode取值比较多,早期版本中OP_INSERT、OP_DELETE、OP_UPDATE、OP_QUERY分别针对增删改查请求,MongoDB从3.6版本开始默认使用OP_MSG操作作为默认opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。本文以OP_MSG操作码对应协议为例进行分析,其他操作码协议分析过程类似,OP_MSG请求协议格式如下:

OP_MSG {  
  //MongoDB报文头部  
  MsgHeader header;            
  //位图,用于标识报文是否需要校验 是否需要应答等  
  uint32 flagBits;           // message flags  
  //报文内容,例如find write等命令内容通过bson格式存在于该结构中  
  Sections[] sections;       // data sections  
  //报文CRC校验  
  optional<uint32> checksum; // optional CRC-32C checksum  
}  

OP_MSG各个字段说明如下表:

 字段名  功能说明
 header  MongoDB报文头部,详见msg header说明
 flagBits  OP_MSG位图信息,其中0-15位只有第0和第1位有效,16-31位为可选位,各位功能说明如下:Bit-0: 如果该位置1,则说明报文需要校验  Bit-1: 标记是否需要应答客户端  Bit-[2-15]: 第2-第15位,必须位0,否则解析异常  Bit-[16-31]:可选位,3.6版本意义不大。
 sections  各个请求的命令内容都在该结构中,sections=kind+body组成,kind分为两种类型:Kind-0:客户端的真正请求内容,kind后紧跟的bson格式数据即为命令请求  Kind-1:Document Sequence,文档序列号,暂时不知道具体用途。
 checksum  如果有校验标记,用于报文校验,默认4字节

一个完整OP_MSG请求格式如下:

除了通用头部header外,客户端命令请求实际上都保存于sections字段中,该字段存放的是请求的原始bson格式数据。BSON是由10gen开发的一个数据格式,目前主要用于MongoDB中,是MongoDB的数据存储格式。BSON基于JSON格式,选择JSON进行改造的原因主要是JSON的通用性及JSON的schemaless的特性。BSON相比JSON具有以下特性:

① Lightweight(更轻量级)

② Traversable(易操作)

③ Efficient(高效性能)

本文重点不是分析bson协议格式,bson协议实现细节将在后续章节分享。bson协议更多设计细节详见:http://bsonspec.org/

总结:一个完整MongoDB报文由header+body组成,其中header长度固定为16字节,body长度等于messageLength-16。Header部分协议解析由message.cpp和message.h两源码文件实现,body部分对应的OP_MSG类请求解析由op_msg.cpp和op_msg.h两源码文件实现。

4. MongoDB报文通用头部解析及封装源码实现

Header头部解析由src/mongo/util/net目录下message.cpp和message.h两文件完成,该类主要完成通用header头部和body部分的解析、封装。因此报文头部核心代码分为以下两类:

① 报文头部内容解析及封装(MSGHEADER命名空间实现)

② 头部和body内容解析及封装(MsgData命名空间实现)

4.1 MongoDB报文头部解析及封装核心代码实现

MongoDB报文头部解析由namespace MSGHEADER {…}实现,该类主要成员及接口实现如下:

namespace MSGHEADER {  
//header头部各个字段信息  
struct Layout {  
  //整个message长度,包括header长度和body长度  
  int32_t messageLength;    
  //requestID 该请求id信息  
  int32_t requestID;        
  //getResponseToMsgId解析  
  int32_t responseTo;        
  //操作类型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等  
  int32_t opCode;  
};  
 
//ConstView实现header头部数据解析  
class ConstView {  
public:  
  ......  
  //初始化构造  
  ConstView(const char* data) : _data(data) {}  
  //获取_data地址  
  const char* view2ptr() const {  
      return data().view();  
  }  
  //TransportLayerASIO::ASIOSourceTicket::_headerCallback调用  
  //解析header头部的messageLength字段  
  int32_t getMessageLength() const {  
      return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength));  
  }  
  //解析header头部的requestID字段  
  int32_t getRequestMsgId() const {  
      return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID));  
  }  
  //解析header头部的getResponseToMsgId字段  
  int32_t getResponseToMsgId() const {  
      return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo));  
  }  
  //解析header头部的opCode字段  
  int32_t getOpCode() const {  
      return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode));  
  }  
 
protected:  
  //MongoDB报文数据起始地址  
  const view_type& data() const {  
      return _data;  
  }  
private:  
  //数据部分  
  view_type _data;  
};  
 
//View填充header头部数据  
class View : public ConstView {  
public:  
  ......  
  //构造初始化  
  View(char* data) : ConstView(data) {}  
  //header起始地址  
  char* view2ptr() {  
      return data().view();  
  }  
  //以下四个接口进行header填充  
  //填充header头部messageLength字段  
  void setMessageLength(int32_t value) {  
      data().write(tagLittleEndian(value), offsetof(Layout, messageLength));  
  }  
  //填充header头部requestID字段  
  void setRequestMsgId(int32_t value) {  
      data().write(tagLittleEndian(value), offsetof(Layout, requestID));  
  }  
  //填充header头部responseTo字段  
  void setResponseToMsgId(int32_t value) {  
      data().write(tagLittleEndian(value), offsetof(Layout, responseTo));  
  }  
  //填充header头部opCode字段  
  void setOpCode(int32_t value) {  
      data().write(tagLittleEndian(value), offsetof(Layout, opCode));  
  }  
private:  
  //指向header起始地址  
  view_type data() const {  
      return const_cast<char*>(ConstView::view2ptr());  
  }  
};  
}

从上面的header头部解析、填充的实现类可以看出,header头部解析由MSGHEADER::ConstView实现;header头部填充由MSGHEADER::View完成。实际上代码实现上,通过offsetof来进行移位,从而快速定位到头部对应字段。

4.2 MongoDB报文头部+body解析封装核心代码实现

Namespace MSGHEADER{…}命名空间只负责header头部的处理,namespace MsgData{…}命名空间相对MSGHEADER命名空间更加完善,除了处理头部解析封装外,还负责body数据起始地址维护、body数据封装、数据长度检查等。MsgData命名空间核心代码实现如下:

mespace MsgData {  
ruct Layout {  
//数据填充组成:header部分  
MSGHEADER::Layout header;  
//数据填充组成: body部分,body先用data占位置  
char data[4];  
 

解析header字段信息及body其实地址信息  
ass ConstView {  
blic:  
//初始化构造  
ConstView(const char* storage) : _storage(storage) {}  
//获取数据起始地址  
const char* view2ptr() const {  
    return storage().view();  
}  

//以下四个接口间接执行前面的MSGHEADER中的头部字段解析  
//填充header头部messageLength字段  
int32_t getLen() const {  
    return header().getMessageLength();  
}  
//填充header头部requestID字段  
int32_t getId() const {  
    return header().getRequestMsgId();  
}  
//填充header头部responseTo字段  
int32_t getResponseToMsgId() const {  
    return header().getResponseToMsgId();  
}  
//获取网络数据报文中的opCode字段  
NetworkOp getNetworkOp() const {  
    return NetworkOp(header().getOpCode());  
}  
//指向body起始地址  
const char* data() const {  
    return storage().view(offsetof(Layout, data));  
}  
//messageLength长度检查,opcode检查  
bool valid() const {  
    if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize))  
        return false;  
    if (getNetworkOp() < 0 || getNetworkOp() > 30000)  
        return false;  
    return true;  
}  
......  
otected:  
//获取_storage  
const ConstDataView& storage() const {  
    return _storage;  
}  
//指向header起始地址  
MSGHEADER::ConstView header() const {  
    return storage().view(offsetof(Layout, header));  
}  
ivate:  
//MongoDB报文存储在这里  
ConstDataView _storage;  
 

填充数据,包括Header和body  
ass View : public ConstView {  
blic:  
//构造初始化  
View(char* storage) : ConstView(storage) {}  
......  
//获取报文起始地址  
char* view2ptr() {  
    return storage().view();  
}  

//以下四个接口间接执行前面的MSGHEADER中的头部字段构造  
//以下四个接口完成msg header赋值  
//填充header头部messageLength字段  
void setLen(int value) {  
    return header().setMessageLength(value);  
}  
//填充header头部messageLength字段  
void setId(int32_t value) {  
    return header().setRequestMsgId(value);  
}  
//填充header头部messageLength字段  
void setResponseToMsgId(int32_t value) {  
    return header().setResponseToMsgId(value);  
}  
//填充header头部messageLength字段  
void setOperation(int value) {  
    return header().setOpCode(value);  
}  

using ConstView::data;  
//指向data  
char* data() {  
    return storage().view(offsetof(Layout, data));  
}  
ivate:  
//也就是报文起始地址  
  DataView storage() const {  
      return const_cast<char*>(ConstView::view2ptr());  
  }  
  //指向header头部  
  MSGHEADER::View header() const {  
      return storage().view(offsetof(Layout, header));  
  }  
};  
 
......  
//Value为前面的Layout,减4是因为有4字节填充data,所以这个就是header长度  
const int MsgDataHeaderSize = sizeof(Value) - 4;  
 
//除去头部后的数据部分长度  
inline int ConstView::dataLen() const {  
  return getLen() - MsgDataHeaderSize;  
}  
} // namespace MsgData  

和MSGHEADER命名空间相比,MsgData这个namespace命名空间接口实现和前面的MSGHEADER命名空间实现大同小异。MsgData不仅仅处理header头部的解析组装,还负责body部分数据头部指针指向、头部长度检查、opCode检查、数据填充等。其中,MsgData命名空间中header头部的解析构造底层依赖MSGHEADER实现。

4.3 Message/DbMessage核心代码实现

《transport_layer网络传输层模块源码实现二》中,从底层ASIO库接收到的MongoDB报文是存放在Message结构中存储,最终存放在ServiceStateMachine._inMessage成员中。

在前面第2章我们知道mongod和mongso实例的服务入口接口handleRequest(…)中都带有Message入参,也就是接收到的Message数据通过该接口处理。Message类主要接口实现如下:

//DbMessage._msg成员为该类型  
class Message {  
public:  
  //message初始化  
  explicit Message(SharedBuffer data) : _buf(std::move(data)) {}  
  //头部header数据  
  MsgData::View header() const {  
      verify(!empty());  
      return _buf.get();  
  }  
  //获取网络数据报文中的op字段  
  NetworkOp operation() const {  
      return header().getNetworkOp();  
  }  
  //_buf释放为空  
  bool empty() const {  
      return !_buf;  
  }  
  //获取报文总长度messageLength  
  int size() const {  
      if (_buf) {  
          return MsgData::ConstView(_buf.get()).getLen();  
      }  
      return 0;  
  }  
  //body长度  
  int dataSize() const {  
      return size() - sizeof(MSGHEADER::Value);  
  }  
  //buf重置  
  void reset() {  
      _buf = {};  
  }  
  // use to set first buffer if empty  
  //_buf直接使用buf空间  
  void setData(SharedBuffer buf) {  
      verify(empty());  
      _buf = std::move(buf);  
  }  
    //把msgtxt拷贝到_buf中  
  void setData(int operation, const char* msgtxt) {  
      setData(operation, msgtxt, strlen(msgtxt) + 1);  
  }  
  //根据operation和msgdata构造一个完整MongoDB报文  
  void setData(int operation, const char* msgdata, size_t len) {  
      verify(empty());  
      size_t dataLen = len + sizeof(MsgData::Value) - 4;  
      _buf = SharedBuffer::allocate(dataLen);  
      MsgData::View d = _buf.get();  
      if (len)  
          memcpy(d.data(), msgdata, len);  
      d.setLen(dataLen);  
      d.setOperation(operation);  
  }  
  ......  
  //获取_buf对应指针  
  const char* buf() const {  
      return _buf.get();  
  }  
 
private:  
  //存放接收数据的buf  
  SharedBuffer _buf;  
};  

Message是操作MongoDB收发报文最直接的实现类,该类主要完成一个完整MongoDB报文封装。有关MongoDB报文头后面的body更多的解析实现在DbMessage类中完成,DbMessage类包含Message类成员msg。实际上,Message报文信息在handleRequest(…)实例服务入口中赋值给DbMessage.msg,报文后续的body处理继续由DbMessage类相关接口完成处理。DbMessage和Message类关系如下:

class DbMessage {  
  ......  
  //包含Message成员变量  
  const Message& _msg;  
  //MongoDB报文起始地址
  const char* _nsStart;
  //报文结束地址
  const char* _theEnd;
}  
 
DbMessage::DbMessage(const Message& msg) : _msg(msg),  
_nsStart(NULL), _mark(NULL), _nsLen(0) {  
  //一个MongoDB报文(header+body)数据的结束地址  
  _theEnd = _msg.singleData().data() + _msg.singleData().dataLen();  
  //报文起始地址 [_nextjsobj, _theEnd ]之间的数据就是一个完整MongoDB报文  
  _nextjsobj = _msg.singleData().data();  
  ......  
}

DbMessage.msg成员为DbMessage 类型,DbMessage的nsStart和_theEnd成员分别记录完整MongoDB报文的起始地址和结束地址,通过这两个指针就可以获取一个完整MongoDB报文的全部内容,包括header和body。

注意:DbMessage是早期MongoDB版本(version<3.6)中用于报文body解析封装的类,这些类针对opCode=[dbUpdate, dbDelete]这个区间的操作。在MongoDB新版本(version>=3.6)中,body解析及封装由op_msg.h和op_msg.cpp代码文件中的clase OpMsgRequest{}完成处理。

4.4 OpMsg报文解析封装核心代码实现

MongoDB从3.6版本开始默认使用OP_MSG操作作为默认opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。OP_MSG对应MongoDB报文body解析封装处理由OpMsg类相关接口完成,OpMsg::parse(Message)从Message中解析出报文body内容,其核心代码实现如下:

struct OpMsg {  
    ......  
  //msg解析赋值见OpMsg::parse    
  //各种命令(insert update find等)都存放在该body中  
  BSONObj body;    
  //sequences用法暂时没看懂,感觉没什么用?先跳过  
  std::vector<DocumentSequence> sequences; //赋值见OpMsg::parse  
}  
//从message中解析出OpMsg信息  
OpMsg OpMsg::parse(const Message& message) try {  
  //message不能为空,并且opCode必须为dbMsg  
  invariant(!message.empty());  
  invariant(message.operation() == dbMsg);  
  //获取flagBits  
  const uint32_t flags = OpMsg::flags(message);  
  //flagBits有效性检查,bit 0-15中只能对第0和第1位操作  
  uassert(ErrorCodes::IllegalOpMsgFlag,  
          str::stream() << "Message contains illegal flags value: Ob"  
                        << std::bitset<32>(flags).to_string(),  
          !containsUnknownRequiredFlags(flags));  
 
  //校验码默认4字节  
  constexpr int kCrc32Size = 4;  
  //判断该mongo报文body内容是否启用了校验功能  
  const bool haveChecksum = flags & kChecksumPresent;  
  //如果有启用校验功能,则报文末尾4字节为校验码  
  const int checksumSize = haveChecksum ? kCrc32Size : 0;  
  //sections字段内容  
  BufReader sectionsBuf(message.singleData().data() + sizeof(flags),  
                        message.dataSize() - sizeof(flags) - checksumSize);  
 
  //默认先设置位false  
  bool haveBody = false;  
  OpMsg msg;  
  //解析sections对应命令请求数据  
  while (!sectionsBuf.atEof()) {  
      //BufReader::read读取kind内容,一个字节  
      const auto sectionKind = sectionsBuf.read<Section>();  
      //kind为0对应命令请求body内容,内容通过bson报错  
      switch (sectionKind) {  
          //sections第一个字节是0说明是body  
          case Section::kBody: {  
              //默认只能有一个body  
              uassert(40430, "Multiple body sections in message", !haveBody);  
              haveBody = true;  
              //命令请求的bson信息保存在这里  
              msg.body = sectionsBuf.read<Validated<BSONObj>>();  
              break;  
          }  
 
          //DocSequence暂时没看明白,用到的地方很少,跳过,后续等  
          //该系列文章主流功能分析完成后,从头再回首分析  
          case Section::kDocSequence: {  
                ......  
          }  
      }  
  }  
  //OP_MSG必须有body内容  
  uassert(40587, "OP_MSG messages must have a body", haveBody);  
  //body和sequence去重判断  
  for (const auto& docSeq : msg.sequences) {  
      ......  
  }  
  return msg;  
}

OpMsg类被OpMsgRequest类继承,OpMsgRequest类中核心接口就是解析出OpMsg.body中的库信息和表信息,OpMsgRequest类代码实现如下:

//协议解析得时候会用到,见runCommands  
struct OpMsgRequest : public OpMsg {  
  ......  
  //构造初始化  
  explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {}  
  //opMsgRequestFromAnyProtocol->OpMsgRequest::parse  
  //从message中解析出OpMsg所需成员信息  
  static OpMsgRequest parse(const Message& message) {  
      //OpMsg::parse  
      return OpMsgRequest(OpMsg::parse(message));  
  }  
  //根据db body extraFields填充OpMsgRequest  
  static OpMsgRequest fromDBAndBody(... {  
      OpMsgRequest request;  
      request.body = ([&] {  
          //填充request.body  
          ......  
      }());  
      return request;  
  }  
  //从body中获取db name  
  StringData getDatabase() const {  
      if (auto elem = body["$db"])  
          return elem.checkAndGetStringData();  
      uasserted(40571, "OP_MSG requests require a $db argument");  
  }  
  //find insert 等命令信息 body中的第一个elem就是command 名  
  StringData getCommandName() const {  
      return body.firstElementFieldName();  
  }  
};

OpMsgRequest通过OpMsg::parse(message)解析出OpMsg信息,从而获取到body内容,GetCommandName()接口和getDatabase()则分别从body中获取库DB信息、命令名信息。通过该类相关接口,命令名(find、write、update等)和DB库都获取到了。

OpMsg模块除了OP_MSG相关报文解析外,还负责OP_MSG报文组装填充,该模块接口功能大全如下表:

 类名  接口名  功能说明
 containsUnknownRequiredFlags  OP_MSG报文体flags检查
          OpMsg  OpMsg::flags(…)  获取message中的flag
 OpMsg::replaceFlags(…)  设置message中的flags
 OpMsg::parse(…)  从message中解析出OpMsg成员信息
 OpMsg::serialize()  OpMsg序列化
 OpMsg::shareOwnershipWith(…)  共享buffer设置
             OpMsgBuilder  OpMsgBuilder::beginDocSequence(…)  填充kDocSequence类型的name数据
 OpMsgBuilder::finishDocumentStream(…)  完成流式处理
 OpMsgBuilder::beginBody()  填充kBody类型数据
 OpMsgBuilder::resumeBody()  获取body数据
 OpMsgBuilder::finish()  构造message数据

5. Mongod实例服务入口核心代码实现

Mongod实例服务入口类ServiceEntryPointMongod继承ServiceEntryPointImpl类,mongod实例的报文解析处理、命令解析、命令执行都由该类负责处理。ServiceEntryPointMongod核心接口可以细分为:opCode解析及回调处理、命令解析及查找、命令执行三个子模块。

5.1 opCode解析及回调处理

OpCode操作码解析及其回调处理由ServiceEntryPointMongod::handleRequest(…)接口实现,核心代码实现如下:

//mongod服务对于客户端请求的处理    
//通过状态机SSM模块的如下接口调用:ServiceStateMachine::_processMessage  
DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) {  
  //获取opCode,3.6版本对应客户端默认使用OP_MSG  
  NetworkOp op = m.operation();  
  ......  
  //根据message构造DbMessage  
  DbMessage dbmsg(m);  
  //根据操作上下文获取对应的client  
  Client& c = *opCtx->getClient();    
  ......  
  //获取库.表信息,注意只有dbUpdate<opCode<dbDelete的opCode请求才通过dbmsg直接获取库和表信息  
  const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;  
  const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();  
  ....  
  //CurOp::debug 初始化opDebug,慢日志相关记录  
  OpDebug& debug = currentOp.debug();  
  //慢日志阀值  
  long long logThresholdMs = serverGlobalParams.slowMS;  
  //时MongoDB将记录这次慢操作,1为只记录慢操作,即操作时间大于了设置的配置,2表示记录所有操作    
  bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1));  
  DbResponse dbresponse;  
  if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) {  
      //新版本op=dbMsg,因此走这里  
      //从DB获取数据,获取到的数据通过dbresponse返回  
      dbresponse = runCommands(opCtx, m);    
  } else if (op == dbQuery) {  
      ......  
      //早期MongoDB版本查询走这里  
      dbresponse = receivedQuery(opCtx, nsString, c, m);  
  } else if (op == dbGetMore) {    
      //早期MongoDB版本查询走这里  
      dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug);  
  } else {  
      ......  
      //早期版本增 删 改走这里处理  
        if (op == dbInsert) {  
            receivedInsert(opCtx, nsString, m); //插入操作入口   新版本CmdInsert::runImpl  
        } else if (op == dbUpdate) {  
            receivedUpdate(opCtx, nsString, m); //更新操作入口    
        } else if (op == dbDelete) {  
            receivedDelete(opCtx, nsString, m); //删除操作入口    
        }  
  }  
  //获取runCommands执行时间,也就是内部处理时间  
  debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses());  
  ......  
  //慢日志记录  
  if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) {  
      Locker::LockerInfo lockerInfo;    
      //OperationContext::lockState LockerImpl<>::getLockerInfo  
      opCtx->lockState()->getLockerInfo(&lockerInfo);  
 
  //OpDebug::report 记录慢日志到日志文件  
      log() << debug.report(&c, currentOp, lockerInfo.stats);  
  }  
  //各种统计信息  
  recordCurOpMetrics(opCtx);  
}

Mongod的handleRequest()接口主要完成以下工作:

① 从Message中获取OpCode,早期版本每个命令又对应取值,例如增删改查早期版本分别对应:dbInsert、dbDelete、dbUpdate、dbQuery;MongoDB 3.6开始,默认请求对应OpCode都是OP_MSG,本文默认只分析OpCode=OP_MSG相关的处理。

② 获取本操作对应的Client客户端信息。

③ 如果是早期版本,通过Message构造DbMessage,同时解析出库.表信息。

④ 根据不同OpCode执行对应回调操作,OP_MSG对应操作为runCommands(…),获取的数据通过dbresponse返回。

⑤ 获取到db层返回的数据后,进行慢日志判断,如果db层数据访问超过阀值,记录慢日志。

⑥ 设置debug的各种统计信息。

5.2 命令解析及查找

从上面的分析可以看出,接口最后调用runCommands(…),该接口核心代码实现如下所示:

//message解析出对应command执行  
DbResponse runCommands(OperationContext* opCtx, const Message& message) {  
  //获取message对应的ReplyBuilder,3.6默认对应OpMsgReplyBuilder  
  //应答数据通过该类构造  
  auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));  
  [&] {  
      OpMsgRequest request;  
      try { // Parse.  
          //协议解析 根据message获取对应OpMsgRequest  
          request = rpc::opMsgRequestFromAnyProtocol(message);  
      }  
  }  
  try { // Execute.  
      //opCtx初始化  
      curOpCommandSetup(opCtx, request);  
      //command初始化为Null  
      Command* c = nullptr;  
      //OpMsgRequest::getCommandName查找  
      if (!(c = Command::findCommand(request.getCommandName()))) {  
            //没有找到相应的command的后续异常处理  
            ......  
      }  
      //执行command命令,获取到的数据通过replyBuilder.get()返回  
      execCommandDatabase(opCtx, c, request, replyBuilder.get());  
  }  
  //OpMsgReplyBuilder::done对数据进行序列化操作  
  auto response = replyBuilder->done();  
  //responseLength赋值  
  CurOp::get(opCtx)->debug().responseLength = response.header().dataLen();  
  // 返回  
  return DbResponse{std::move(response)};  
}  

RunCommands(…)接口从message中解析出OpMsg信息,然后获取该OpMsg对应的command命令信息,最后执行该命令对应的后续处理操作。主要功能说明如下:

① 获取该OpCode对应replyBuilder,OP_MSG操作对应builder为OpMsgReplyBuilder。

② 根据message解析出OpMsgRequest数据,OpMsgRequest来中包含了真正的命令请求bson信息。

③ opCtx初始化操作。

④ 通过request.getCommandName()返回命令信息(如“find”、“update”等字符串)。

⑤ 通过Command::findCommand(command name)从CommandMap这个map表中查找是否支持该command命令。如果没找到说明不支持,如果找到说明支持。

⑥ 调用execCommandDatabase(…)执行该命令,并获取命令的执行结果。

⑦ 根据command执行结果构造response并返回

5.3 命令执行

void execCommandDatabase(...) {  
  ......  
  //获取dbname  
  const auto dbname = request.getDatabase().toString();  
  ......  
  //mab表存放从bson中解析出的elem信息  
  StringMap<int> topLevelFields;  
  //body elem解析  
  for (auto&& element : request.body) {  
      //获取bson中的elem信息  
      StringData fieldName = element.fieldNameStringData();  
      //如果elem信息重复,则异常处理  
      ......  
  }  
  //如果是help命令,则给出help提示  
  if (Command::isHelpRequest(helpField)) {  
      //给出help提示  
      Command::generateHelpResponse(opCtx, replyBuilder, *command);  
      return;  
  }  
  //权限认证检查,检查该命令执行权限  
  uassertStatusOK(Command::checkAuthorization(command, opCtx, request));  
  ......  
 
  //该命令执行次数统计 db.serverStatus().metrics.commands可以获取统计信息  
  command->incrementCommandsExecuted();  
  //真正的命令执行在这里面  
  retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime);  
  //该命令执行失败次数统计  
  if (!retval) {  
      command->incrementCommandsFailed();  
    }  
    ......  
}

execCommandDatabase(…)最终调用RunCommandImpl(…)进行对应命令的真正处理,该接口核心代码实现如下:

bool runCommandImpl(...) {  
  //获取命令请求内容body  
  BSONObj cmd = request.body;  
  //获取请求中的DB库信息  
  const std::string db = request.getDatabase().toString();  
  //ReadConcern检查  
  Status rcStatus = waitForReadConcern(  
      opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmd));  
  //ReadConcern检查不通过,直接异常提示处理  
  if (!rcStatus.isOK()) {  
        //异常处理  
        return;  
  }  
  if (!command->supportsWriteConcern(cmd)) {  
      //命令不支持WriteConcern,但是对应的请求中却带有WriteConcern配置,直接报错不支持  
      if (commandSpecifiesWriteConcern(cmd)) {  
          //异常处理"Command does not support writeConcern"  
          ......  
          return result;  
      }  
  //调用Command::publicRun执行不同命令操作  
      result = command->publicRun(opCtx, request, inPlaceReplyBob);  
  }  
  //提取WriteConcernOptions信息  
  auto wcResult = extractWriteConcern(opCtx, cmd, db);  
  //提取异常,直接异常处理  
  if (!wcResult.isOK()) {  
      //异常处理  
      ......  
      return result;  
  }  
  ......  
  //执行对应的命令Command::publicRun,执行不同命令操作  
  result = command->publicRun(opCtx, request, inPlaceReplyBob);  
  ......  
}

RunCommandImpl(…)接口最终调用该接口入参的command,执行 command->publicRun(…)接口,也就是命令模块的公共publicRun。

5.4 总结

Mongod服务入口首先从message中解析出opCode操作码,3.6版本对应客户端默认操作码为OP_MSQ,解析出该操作对应OpMsgRequest信息。然后从message原始数据中解析出command命令字符串后,继续通过全局Map表种查找是否支持该命令操作,如果支持则执行该命令;如果不支持,直接异常打印,同时返回。

6.  Mongos实例服务入口核心代码实现

mongos服务入口核心代码实现过程和mongod服务入口代码实现流程几乎相同,mongos实例message解析、OP_MSG操作码处理、command命令查找等流程和上一章节mongod实例处理过程类似,本章节不在详细分析。Mongos实例服务入口处理调用流程如下:

ServiceEntryPointMongos::handleRequest(…)->Strategy::clientCommand(…)–>runCommand(…)->execCommandClient(…)

最后的接口核心代码实现如下:

void runCommand(...) {  
  ......  
  //获取请求命令name  
  auto const commandName = request.getCommandName();  
  //从全局map表中查找  
  auto const command = Command::findCommand(commandName);  
  //没有对应的command存在,抛异常说明不支持该命令  
  if (!command) {  
      ......  
      return;  
  }  
  ......  
  //执行命令  
  execCommandClient(opCtx, command, request, builder);  
  ......  
}  

void execCommandClient(...)  
{  
  ......  
  //认证检查,是否有操作该command命令的权限,没有则异常提示  
  Status status = Command::checkAuthorization(c, opCtx, request);    
  if (!status.isOK()) {  
      Command::appendCommandStatus(result, status);  
      return;  
  }  
  //该命令的执行次数自增,代理上面也是要计数的  
  c->incrementCommandsExecuted();  
  //如果需要command统计,则加1  
  if (c->shouldAffectCommandCounter()) {  
      globalOpCounters.gotCommand();  
  }  
  ......  
  //有部分命令不支持writeconcern配置,报错  
  bool supportsWriteConcern = c->supportsWriteConcern(request.body);  
  //不支持writeconcern又带有该参数的请求,直接异常处理"Command does not support writeConcern"  
  if (!supportsWriteConcern && !wcResult.getValue().usedDefault) {  
      ......  
      return;  
  }  
  //执行本命令对应的公共publicRun接口,Command::publicRun  
  ok = c->publicRun(opCtx, request, result);  
  ......  
}

Tips: mongos和mongod实例服务入口核心代码实现的一点小区别

① Mongod实例opCode操作码解析、OpMsg解析、command查找及对应命令调用处理都由class ServiceEntryPointMongod{…}类一起完成。

② mongos实例则把opCode操作码解析交由class ServiceEntryPointMongos{…}类实现,OpMsg解析、command查找及对应命令调用处理放到了clase Strategy{…}类来处理。

7. 总结

MongoDB报文解析及组装流程总结:

① 一个完整MongoDB报文由通用报文header头部+body部分组成。

② Body部分内容,根据报文头部的opCode来决定不同的body内容。

③ 3.6版本对应客户端请求opCode默认为OP_MSG,该操作码对应body部分由flagBits + sections + checksum组成,其中sections 中存放的是真正的命令请求信息,已bson数据格式保存。

④ Header头部和body报文体封装及解析过程由class Message {…}类实现

⑤ Body中对应command命令名、库名、表名的解析在MongoDB(version<3.6)低版本协议中由class DbMessage {…}类实现

⑥ Body中对应command命令名、库名、表名的解析在MongoDB(version<3.6)低版

本协议中由struct OpMsgRequest{…}结构和struct OpMsg {…}类实现。

Mongos和mongod实例的服务入口处理流程大同小异,整体处理流程如下:

① 从message解析出opCode操作码,根据不同操作码执行对应操作码回调。

② 根据message解析出OpMsg request信息,MongoDB报文的命令信息就存储在该body中,该body已bson格式存储。

③ 从body中解析出command命令字符串信息(如“insert”、“update”等)。

④ 从全局_commands MAP表中查找是否支持该命令,如果支持则执行该命令处理,如果不支持则直接报错提示。

⑤ 最终找到对应command命令后,执行command的功能run接口。

说明:第3章的协议解析及封装过程实际上应该算是网络处理模块范畴,本文为了分析command命令处理模块方便,把该部分实现归纳到了命令处理模块,这样方便理解。

Tips: 下期继续分享不同command命令执行细节。

8. 遗留问题

第1章节中的统计信息,将在command模块核心代码分析完毕后揭晓答案,《MongoDB command命令处理模块源码实现二》中继续分析,敬请关注。

更多文章:

常用高并发网络线程模型设计及MongoDB线程模型优化实践
MongoDB网络传输处理源码实现及性能调优-体验内核性能极致设计

OPPO百万级高并发MongoDB集群性能数十倍提升优化实践

盘点 2020 | 我要为分布式数据库 MongoDB 在国内影响力提升及推广做点事

MongoDB网络传输层模块源码实现二

MongoDB网络传输层模块源码实现三

MongoDB网络传输层模块源码实现四

作者:杨亚洲

前滴滴出行技术专家,现任OPPO文档数据库MongoDB负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库MongoDB内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。Github账号地址:

https://github.com/y123456yz

 

 

 

 

MongoDB中文手册翻译正在进行中,欢迎更多朋友在自己的空闲时间学习并进行文档翻译,您的翻译将由社区专家进行审阅,并拥有署名权更新到中文用户手册和发布到社区微信内容平台。

更多问题可以添加社区助理小芒果微信(mongoingcom)咨询,进入社区微信交流群请备注“mongo”。

赞(17)
未经允许不得转载:MongoDB中文社区 » MongoDB Command命令处理模块源码实现一

评论 2

评论前必须登录!

 

  1. #1

    图2太模糊了,能否换一张清晰的大图?谢谢。

    julien3年前 (2021-06-09)
  2. #2

    "该seq成员分别在如下代码中初始化为ServiceEntryPointMongod和ServiceEntryPointMongod类实现"
    这里貌似有笔误,应该是ServiceEntryPointMongos 吧?

    julien3年前 (2021-06-09)