MongoDB爱好者
垂直技术交流平台

MongoDB网络传输层模块源码实现二

本文为《MongoDB内核源码实现、性能调优、最佳运维实践系列》模块一:MongoDB网络传输层模块源码 第二篇,您可点击此处查看第一篇:MongoDB网络传输模块源码实现及性能调优实践-体验内核性能极致设计

1. 说明


在之前的<<MongoDB网络传输处理源码实现及性能调优-体验内核性能极致设计>>一文中分析了如何阅读百万级大工程源码、Asio网络库实现、transport传输层网络模块中线程模型实现,但是由于篇幅原因,传输层网络模块中的以下模块实现原理没有分析,本文将继续分析遗留的以下子模块:

 

  • transport_layer套接字处理及传输层管理子模块
  • session会话子模块
  • Ticket数据收发子模块
  • service_entry_point服务入口点子模块
  • service_state_machine状态机子模块(该《模块在网络传输层模块源码实现三》中分析)
  • service_executor线程模型子模块(该《模块在网络传输层模块源码实现四》中分析)

2. transport_layer

套接字处理及传输层管理子模块


transport_layer套接字处理及传输层管理子模块功能包括套接字相关初始化处理、结合asio库实现异步accept处理、不同线程模型管理及初始化等,该模块的源码实现主要由以下几个文件实现:

 

 

上图是套接字处理及传输层管理子模块源码实现的相关文件,其中mock和test文件主要用于模拟测试等,所以真正核心的代码实现只有下表的几个文件,对应源码文件功能说明如下表所示:

 

文件名 功能
transport_layer.h

transport_layer.cpp

该子模块基类,通过该类把本模块和Ticket数据分发模块衔接起来,具体类实现在transport_layer_legacy.cpp和transport_layer_asio.cpp中
transport_layer_legacy.h

transport_layer_legacy.cpp

早期的传输模块实现方式,现在已淘汰,不在分析
transport_layer_asio.h

transport_layer_asio.cpp

传输模块Asio网络IO处理实现,同时衔接ServiceEntryPoint服务入口模块和Ticket数据分发模块

 

2.1 核心代码实现 

该子模块核心代码主要由TransportLayerManager类和TransportLayerASIO类相关接口实现。

2.1.1 TransportLayerManager类核心代码实现

TransportLayerManager类主要成员及接口如下:

//网络会话链接,消息处理管理相关的类,在createWithConfig构造该类存入_tls  
class TransportLayerManager final : public TransportLayer {  
    //以下四个接口真正实现在TransportLayerASIO类中具体实现  
    Ticket sourceMessage(...) override;  
    Ticket sinkMessage(...) override;      
    Status wait(Ticket&& ticket) override;  
    void asyncWait(...) override;  
    //配置初始化实现  
    std::unique_ptr<TransportLayer> createWithConfig(...);  
  
    //createWithConfig中赋值,对应TransportLayerASIO,  
    //实际上容器中就一个成员,就是TransportLayerASIO  
    std::vector<std::unique_ptr<TransportLayer>> _tls;  
};

TransportLayerManager类包含一个_tls成员,该类最核心的createWithConfig接口代码实现如下:

//根据配置构造相应类信息  _initAndListen中调用  
std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(...) {  
    std::unique_ptr<TransportLayer> transportLayer;  
    //服务类型,也就是本实例是mongos还是mongod  
    //mongos对应ServiceEntryPointMongod,mongod对应ServiceEntryPointMongos  
    auto sep = ctx->getServiceEntryPoint();  
    //net.transportLayer配置模式,默认asio, legacy模式已淘汰  
    if (config->transportLayer == "asio") {  
         //同步方式还是异步方式,默认synchronous  
        if (config->serviceExecutor == "adaptive") {  
            //动态线程池模型,也就是异步模式  
            opts.transportMode = transport::Mode::kAsynchronous;  
        } else if (config->serviceExecutor == "synchronous") {  
            //一个链接一个线程模型,也就是同步模式  
            opts.transportMode = transport::Mode::kSynchronous;  
        }   
        //如果配置是asio,构造TransportLayerASIO类  
        auto transportLayerASIO = stdx::make_unique<transport::TransportLayerASIO>(opts, sep);  
        if (config->serviceExecutor == "adaptive") { //异步方式  
             //构造动态线程模型对应的执行器ServiceExecutorAdaptive  
            ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorAdaptive>(  
                ctx, transportLayerASIO->getIOContext()));  
         } else if (config->serviceExecutor == "synchronous") { //同步方式  
            //构造一个链接一个线程模型对应的执行器ServiceExecutorSynchronous  
            ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorSynchronous>(ctx));  
         }  
         //transportLayerASIO转换为transportLayer类  
         transportLayer = std::move(transportLayerASIO);  
    }   
   //transportLayer转存到对应retVector数组中并返回  
    std::vector<std::unique_ptr<TransportLayer>> retVector;  
    retVector.emplace_back(std::move(transportLayer));  
    return stdx::make_unique<TransportLayerManager>(std::move(retVector));  
}  

createWithConfig函数根据配置文件来确定对应的TransportLayer,如果net.transportLayer配置为”asio”,则选用TransportLayerASIO类来进行底层的网络IO处理,如果配置为”legacy”,则选用TransportLayerLegacy。”legacy”模式当前已淘汰,本文只分析”asio”模式实现。

 

“asio”模式包含两种线程模型:adaptive(动态线程模型)和synchronous(同步线程模型)。adaptive模式线程设计采用动态线程方式,线程数和MongoDB压力直接相关,如果MongoDB压力大,则线程数增加;如果MongoDB压力变小,则线程数自动减少。同步线程模式也就是一个链接一个线程模型,线程数的多少和链接数的多少成正比,链接数越多则线程数也越大。

 

MongoDB内核实现中通过opts.transportMode来标记asio的线程模型,这两种模型对应标记如下:

 

线程模型 内核transportMode标记 说明 对应线程模型由那个类实现
adaptive KAsynchronous(异步) adaptive模型也可以称为异步模型 ServiceExecutorAdaptive
synchronous KSynchronous(同步) synchronous模型也可以称为同步模型 ServiceExecutorSynchronous

 

说明:adaptive线程模型被标记为KAsynchronous,synchronous被标记为KSynchronous是有原因的,adaptive动态线程模型网络IO处理借助epoll异步实现,而synchronous一个链接一个线程模型网络IO处理是同步读写操作。MongoDB网络线程模型具体实现及各种优缺点可以参考:MongoDB网络传输处理源码实现及性能调优-体验内核性能极致设计

2.1.2 TransportLayerASIO类核心代码实现

TransportLayerASIO类核心成员及接口如下:

class TransportLayerASIO final : public TransportLayer {  
    //以下四个接口主要和套接字数据读写相关  
    Ticket sourceMessage(...);  
    Ticket sinkMessage(...);  
    Status wait(Ticket&& ticket);  
    void asyncWait(Ticket&& ticket, TicketCallback callback);  
    void end(const SessionHandle& session);  
    //新链接处理  
    void _acceptConnection(GenericAcceptor& acceptor);  
      
    //adaptive线程模型网络IO上下文处理  
    std::shared_ptr<asio::io_context> _workerIOContext;   
    //accept接收客户端链接对应的IO上下文  
    std::unique_ptr<asio::io_context> _acceptorIOContext;    
    //bindIp配置中的ip地址列表,用于bind监听,accept客户端请求  
    std::vector<std::pair<SockAddr, GenericAcceptor>> _acceptors;  
    //listener线程负责接收客户端新链接  
    stdx::thread _listenerThread;  
    //服务类型,也就是本实例是mongos还是mongod  
    //mongos对应ServiceEntryPointMongod,mongod对应ServiceEntryPointMongos  
    ServiceEntryPoint* const _sep = nullptr;  
    //当前运行状态  
    AtomicWord<bool> _running{false};  
    //listener处理相关的配置信息  
    Options _listenerOptions;  
}

 

从上面的类结构可以看出,该类主要通过listenerThread线程完成bind绑定及listen监听操作,同时部分接口实现新连接上的数据读写。

 

套接字初始化代码实现如下:

Status TransportLayerASIO::setup() {  
    std::vector<std::string> listenAddrs;  
    //如果没有配置bindIp,则默认监听"127.0.0.1:27017"
    if (_listenerOptions.ipList.empty()) {  
        listenAddrs = {"127.0.0.1"};  
    } else {  
        //配置文件中的bindIp:1.1.1.1,2.2.2.2,以逗号分隔符获取ip列表存入ipList  
        boost::split(listenAddrs, _listenerOptions.ipList, boost::is_any_of(","), boost::token_compress_on);  
    }  
    //遍历ip地址列表  
    for (auto& ip : listenAddrs) {  
        //根据IP和端口构造对应SockAddr结构  
        const auto addrs = SockAddr::createAll(  
            ip, _listenerOptions.port, _listenerOptions.enableIPv6 ? AF_UNSPEC : AF_INET);  
        ......  
        //根据addr构造endpoint  
        asio::generic::stream_protocol::endpoint endpoint(addr.raw(), addr.addressSize);  
        //_acceptorIOContext和_acceptors关联  
        GenericAcceptor acceptor(*_acceptorIOContext);  
        //epoll注册,也就是fd和epoll关联  
        //basic_socket_acceptor::open  
        acceptor.open(endpoint.protocol());   
         //SO_REUSEADDR配置 basic_socket_acceptor::set_option  
        acceptor.set_option(GenericAcceptor::reuse_address(true));  
        //非阻塞设置 basic_socket_acceptor::non_blocking  
        acceptor.non_blocking(true, ec);    
        //bind绑定    
        acceptor.bind(endpoint, ec);   
        if (ec) {  
            return errorCodeToStatus(ec);  
        }  
    }  
}

从上面的分析可以看出,代码实现首先解析出配置文件中bindIP中的ip:port列表,然后遍历列表绑定所有服务端需要监听的ip:port,每个ip:port对应一个GenericAcceptor ,所有acceptor和全局accept IO上下文_acceptorIOContext关联,同时bind()绑定所有ip:port。

 

Bind()绑定所有配置文件中的Ip:port后,然后通过TransportLayerASIO::start()完成后续处理,该接口代码实现如下:

//_initAndListen中调用执行   
Status TransportLayerASIO::start() { //listen线程处理  
    ......  
    //这里专门起一个线程做listen相关的accept事件处理  
    _listenerThread = stdx::thread([this] {  
        //修改线程名  
        setThreadName("listener");   
        //该函数中循环处理accept事件  
        while (_running.load()) {  
            asio::io_context::work work(*_acceptorIOContext);   
            try {  
                //accept事件调度处理  
                 _acceptorIOContext->run();    
            } catch (...) { //异常处理  
                severe() << "Uncaught exception in the listener: " << exceptionToStatus();  
                fassertFailed(40491);  
            }  
        }  
    });   
   遍历_acceptors,进行listen监听处理  
   for (auto& acceptor : _acceptors) {   
        acceptor.second.listen(serverGlobalParams.listenBacklog);  
        //异步accept回调注册在该函数中  
        _acceptConnection(acceptor.second);       
    }  
}

从上面的TransportLayerASIO::start()接口可以看出,MongoDB特地创建了一个listener线程用于客户端accept事件处理,然后借助ASIO网络库的_acceptorIOContext->run()接口来调度,当有新链接到来的时候,就会执行相应的accept回调处理,accept回调注册到io_context的流程由acceptConnection()完成,该接口核心源码实现如下:

//accept新连接到来的回调注册 
void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {  
      //新链接到来时候的回调函数,服务端接收到新连接都会执行该回调
    //注意这里面是递归执行,保证所有accept事件都会一次处理完毕
    auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable {  
        if (!_running.load())  
            return;  
  
        ......  
        //每个新的链接都会new一个新的ASIOSession  
        std::shared_ptr<ASIOSession> session(new ASIOSession(this, std::move(peerSocket)));  
        //新的链接处理ServiceEntryPointImpl::startSession,  
        //和ServiceEntryPointImpl服务入口点模块关联起来  
        _sep->startSession(std::move(session));  
        //递归,直到处理完所有的网络accept事件  
        _acceptConnection(acceptor);   
    };  
    //accept新连接到来后服务端的回调处理在这里注册  
    acceptor.async_accept(*_workerIOContext, std::move(acceptCb));  
}

TransportLayerASIO::_acceptConnection的新连接处理过程借助ASIO库实现,通过acceptor.async_accept实现所有监听的acceptor回调异步注册。

 

当服务端接收到客户端新连接事件通知后,会触发执行acceptCb()回调,该回调中底层ASIO库通过epoll_wait获取到所有的accept事件,每获取到一个accept事件就代表一个新的客户端链接,然后调用ServiceEntryPointImpl::startSession()接口处理这个新的链接事件,整个过程递归执行,保证一次可以处理所有的客户端accept请求信息。

 

每个链接都会构造一个唯一的session信息,该session就代表一个唯一的新连接,链接和session一一对应。此外,最终会调用ServiceEntryPointImpl::startSession()进行真正的accept()处理,从而获取到一个新的链接。

 

注意:TransportLayerASIO::_acceptConnection()中实现了TransportLayerASIO类和ServiceEntryPointImpl类的关联,这两个类在该接口实现了关联。

 

此外,从前面的TransportLayerASIO类结构中可以看出,该类还包含如下四个接口:sourceMessage(…)、sinkMessage(…)、wait(Ticket&& ticket)、asyncWait(Ticket&&ticket, TicketCallback callback),这四个接口入参都和Ticket数据分发子模块相关联,具体核心代码实现如下:

//根据asioSession, expiration, message三个信息构造数据接收类ASIOSourceTicket  
Ticket TransportLayerASIO::sourceMessage(...) {  
    ......  
    auto asioSession = checked_pointer_cast<ASIOSession>(session);  
    //根据asioSession, expiration, message三个信息构造ASIOSourceTicket  
    auto ticket = stdx::make_unique<ASIOSourceTicket>(asioSession, expiration, message);  
    return {this, std::move(ticket)};  
}  
  
//根据asioSession, expiration, message三个信息构造数据发送类ASIOSinkTicket  
Ticket TransportLayerASIO::sinkMessage(...) {  
    auto asioSession = checked_pointer_cast<ASIOSession>(session);  
    auto ticket = stdx::make_unique<ASIOSinkTicket>(asioSession, expiration, message);  
    return {this, std::move(ticket)};  
}  
  
//同步接收或者发送,最终调用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill  
Status TransportLayerASIO::wait(Ticket&& ticket) {  
    //获取对应Ticket,接收对应ASIOSourceTicket,发送对应ASIOSinkTicket  
    auto ownedASIOTicket = getOwnedTicketImpl(std::move(ticket));  
    auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());  
    ......  
    //调用对应fill接口 同步接收ASIOSourceTicket::fill 或者 同步发送ASIOSinkTicket::fill  
    asioTicket->fill(true, [&waitStatus](Status result) { waitStatus = result; });  
    return waitStatus;  
}  
//异步接收或者发送,最终调用ASIOSourceTicket::fill 或者 ASIOSinkTicket::fill  
void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) {  
    //获取对应数据收发的Ticket,接收对应ASIOSourceTicket,发送对应ASIOSinkTicket  
    auto ownedASIOTicket = std::shared_ptr<TicketImpl>(getOwnedTicketImpl(std::move(ticket)));  
    auto asioTicket = checked_cast<ASIOTicket*>(ownedASIOTicket.get());  
  
   //调用对应ASIOTicket::fill  
    asioTicket->fill(  
        false,   [ callback = std::move(callback),  
        ownedASIOTicket = std::move(ownedASIOTicket) ](Status status) { callback(status); });  
}

上面四个接口中的前两个接口主要通过Session, expiration, message这三个参数来获取对应的Ticket 信息,实际上MongoDB内核实现中把接收数据的Ticket和发送数据的Ticket分别用不同的继承类ASIOSourceTicket和ASIOSinkTicket来区分,三个参数的作用如下表所示:

 

参数名 作用
Session 代表一个链接,一个session和一个链接意义对应
expiration 数据收发超时相关设置
message 数据内容

 

数据收发包括同步收发和异步收发,同步收发通过TransportLayerASIO::wait()实现,异步收发通过TransportLayerASIO::asyncWait()实现。

 

注意:以上四个接口把TransportLayerASIO类和Ticket 数据收发类的关联。

2.2 总结 

transport_layer套接字处理及传输层管理子模块主要由transport_layer_manager和transport_layer_asio两个核心类组成,这两个类的核心接口功能总结如下表所示:

 

类命 接口名 功能说明
 

 

transport_layer_manager

TransportLayerManager::createWithConfig() 根据配置文件选择不同的TransportLayer和serviceExecutor
TransportLayerManager::setup() 已废弃
TransportLayerManager::start() 已废弃
 

 

 

 

 

 

 

TransportLayerASIO

TransportLayerASIO::Options::Options() Net相关配置
TransportLayerASIO::TransportLayerASIO() 初始化构造
TransportLayerASIO::sourceMessage() 获取数据接收的Ticket
TransportLayerASIO::sinkMessage() 获取数据发送的Ticket
TransportLayerASIO::wait() 同步发送接收或者发送数据
TransportLayerASIO::asyncWait() 异步方式发送接收或者发送数据
TransportLayerASIO::end() 关闭链接
TransportLayerASIO::setup() 创建套接字并bind绑定
TransportLayerASIO::start() 创建listener线程做监听操作,同时注册accept回调
TransportLayerASIO::shutdown() shutdown处理及资源回收
TransportLayerASIO::_acceptConnection() Accept回调注册

 

Transport_layer_manager中初始化TransportLayer和serviceExecutor,net.TransportLayer配置可以为legacy和asio,其中legacy已经淘汰,当前内核只支持asio模式。asio配置对应的TransportLayer由TransportLayerASIO实现,对应的serviceExecutor线程模型可以是adaptive动态线程模型,也可以是synchronous同步线程模型。

 

套接字创建、bind()绑定、listen()监听、accept事件注册等都由本类实现,同时数据分发Ticket模块也与本模块关联,一起配合完成整个后续Ticket模块模块的同步及异步数据读写流程。此外,本模块还通过ServiceEntryPoint服务入口子模块联动,保证了套接字初始化、accept事件注册完成后,服务入口子模块能有序的进行新连接接收处理。

 

接下来继续分析本模块相关联的ServiceEntryPoint服务入口子模块和Ticket数据分发子模块实现。

3. service_entry_point

服务入口点子模块


service_entry_point服务入口点子模块主要负责如下功能:新连接处理、Session会话管理、接收到一个完整报文后的回调处理(含报文解析、认证、引擎层处理等)。

 

该模块的源码实现主要包含以下几个文件:

service_entry_point开头的代码文件都和本模块相关,其中service_entry_point_utils*负责工作线程创建,service_entry_point_impl*完成新链接回调处理及sesseion会话管理。

3.1 核心源码实现  

服务入口子模块相关代码实现比较简洁,主要由ServiceEntryPointImpl类和service_entry_point_utils中的线程创建函数组成。

3.1.1 ServiceEntryPointlmpl类核心代码实现 

ServiceEntryPointImpl类主要成员和接口如下:

class ServiceEntryPointImpl : public ServiceEntryPoint {  
    MONGO_DISALLOW_COPYING(ServiceEntryPointImpl);  
public:  
    //构造函数  
    explicit ServiceEntryPointImpl(ServiceContext* svcCtx);     
    //以下三个接口进行session会话处理控制  
    void startSession(transport::SessionHandle session) final;  
    void endAllSessions(transport::Session::TagMask tags) final;  
    bool shutdown(Milliseconds timeout) final;  
    //session会话统计  
    Stats sessionStats() const final;  
    ......  
private:  
    //该list结构管理所有的ServiceStateMachine信息  
    using SSMList = stdx::list<std::shared_ptr<ServiceStateMachine>>;  
    //SSMList对应的迭代器  
    using SSMListIterator = SSMList::iterator;  
    //赋值ServiceEntryPointImpl::ServiceEntryPointImpl  
    //对应ServiceContextMongoD(mongod)或者ServiceContextNoop(mongos)类  
    ServiceContext* const _svcCtx;   
    //该成员变量在代码中没有使用  
    AtomicWord<std::size_t> _nWorkers;  
    //锁  
    mutable stdx::mutex _sessionsMutex;  
    //一个新链接对应一个ssm保存到ServiceEntryPointImpl._sessions中  
    SSMList _sessions;  
    //最大链接数控制  
    size_t _maxNumConnections{DEFAULT_MAX_CONN};  
    //当前的总链接数,不包括关闭的链接  
    AtomicWord<size_t> _currentConnections{0};  
    //所有的链接,包括已经关闭的链接  
    AtomicWord<size_t> _createdConnections{0};  
};

该类的几个接口主要是session相关控制处理,该类中的变量成员说明如下:

 

成员名 功能说明
_svcCtx 服务上下文,mongod实例对应ServiceContextMongoD类,mongos代理实例对应ServiceContextNoop类
_sessionsMutex _sessions锁保护
_sessions 一个新链接对应一个ssm保存到ServiceEntryPointImpl._sessions中
_maxNumConnections 最大链接数,默认1000000,可以通过maxConns配置
_currentConnections 当前的在线链接数,不包括以前关闭的链接
_createdConnections 所有的链接,包括已经关闭的链接

 

ServiceEntryPointImpl类最核心的startSession()接口负责每个新连接到来后的内部回调处理,具体实现如下:

//新链接到来后的回调处理  
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {   
    //获取该新连接对应的服务端和客户端地址信息  
    const auto& remoteAddr = session->remote().sockAddr();  
    const auto& localAddr = session->local().sockAddr();  
    //服务端和客户端地址记录到session中  
    auto restrictionEnvironment =  stdx::make_unique<RestrictionEnvironment>(*remoteAddr, *localAddr);  
    RestrictionEnvironment::set(session, std::move(restrictionEnvironment));  
    ......  
  
    //获取transportMode,kAsynchronous或者kSynchronous  
    auto transportMode = _svcCtx->getServiceExecutor()->transportMode();  
    //构造ssm  
    auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);  
    {//该{}体内实现链接计数,同时把ssm统一添加到_sessions列表管理  
        stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);  
        connectionCount = _sessions.size() + 1; //连接数自增  
        if (connectionCount <= _maxNumConnections) {  
            //新来的链接对应的session保存到_sessions链表    
            //一个新链接对应一个ssm保存到ServiceEntryPointImpl._sessions中  
            ssmIt = _sessions.emplace(_sessions.begin(), ssm);  
            _currentConnections.store(connectionCount);  
            _createdConnections.addAndFetch(1);  
        }  
    }  
    //链接超限,直接退出  
    if (connectionCount > _maxNumConnections) {   
        ......  
        return;  
    }  
    //链接关闭的回收处理  
    ssm->setCleanupHook([ this, ssmIt, session = std::move(session) ] {  
         ......  
    });  
    //获取transport模式为同步模式还是异步模式,也就是adaptive线程模式还是synchronous线程模式  
    auto ownership = ServiceStateMachine::Ownership::kOwned;  
    if (transportMode == transport::Mode::kSynchronous) {  
        ownership = ServiceStateMachine::Ownership::kStatic;  
    }  
    //ServiceStateMachine::start,这里和服务状态机模块衔接起来  
    ssm->start(ownership);  
}

该接口拿到该链接对应的服务端和客户端地址后,记录到该链接对应session中,然后根据该session、transportMode、_svcCtx构建一个服务状态机ssm(ServiceStateMachine)。一个新链接对应一个唯一session,一个session对应一个唯一的服务状态机ssm,这三者保持唯一的一对一关系。

最终,startSession()让服务入口子模块、session会话子模块、ssm状态机子模块关联起来。

3.1.2 service_entry_point_utils核心代码实现 

service_entry_point_utils源码文件只有launchServiceWorkerThread一个接口,该接口主要负责工作线程创建,并设置每个工作线程的线程栈大小,如果系统默认栈大于1M,则每个工作线程的线程栈大小设置为1M,如果系统栈大小小于1M,则以系统堆栈大小为准,同时warning打印提示。该函数实现如下:

Status launchServiceWorkerThread(stdx::function<void()> task) {  
        static const size_t kStackSize = 1024 * 1024;  //1M  
        struct rlimit limits;  
        //或者系统堆栈大小  
        invariant(getrlimit(RLIMIT_STACK, &limits) == 0);  
        //如果系统堆栈大小大于1M,则默认设置线程栈大小为1M  
        if (limits.rlim_cur > kStackSize) {  
            size_t stackSizeToSet = kStackSize;  
            int failed = pthread_attr_setstacksize(&attrs, stackSizeToSet);  
            if (failed) {  
                const auto ewd = errnoWithDescription(failed);  
                warning() << "pthread_attr_setstacksize failed: " << ewd;  
            }  
        } else if (limits.rlim_cur < 1024 * 1024) {  
            //如果系统栈大小小于1M,则已系统堆栈为准,同时给出告警  
            warning() << "Stack size set to " << (limits.rlim_cur / 1024) << "KB. We suggest 1MB";  
        }}  
        ......  
        //task参数传递给新建线程  
        auto ctx = stdx::make_unique<stdx::function<void()>>(std::move(task));  
        int failed = pthread_create(&thread, &attrs, runFunc, ctx.get());   
        ......  
}
3.2 总结 

service_entry_point服务入口点子模块主要负责新连接后的回调处理及工作线程创建,该模块和后续的session会话模块、SSM服务状态机模块衔接配合,完成数据收发的正常逻辑转换处理。上面的分析只列出了服务入口点子模块的核心接口实现,下表总结该模块所有的接口功能:

 

 

 

 

 

ServiceEntryPointImpl

ServiceEntryPointImpl::ServiceEntryPointImpl() 构造初始化,最大链接数限制
ServiceEntryPointImpl::startSession() 新链接的回调处理
ServiceEntryPointImpl::endAllSessions() Ssm服务状态机及session会话回收处理
ServiceEntryPointImpl::shutdown() 实例下线处理
ServiceEntryPointImpl::sessionStats() 获取链接统计信息
service_entry_point_utils launchServiceWorkerThread 创建工作线程,同时限制每个线程对应线程栈大小

4. Ticket数据收发子模块


Ticket数据收发子模块主要功能如下:调用session子模块进行底层asio库处理、拆分数据接收和数据发送到两个类、完整MongoDB报文读取 、接收或者发送MongoDB报文后的回调处理。
4.1 ASIOTicket类核心代码实现 

Ticket数据收发模块相关实现主要由ASIOTicket类完成,该类结构如下:

        return _sessionId;  
    }  
	    //asio模式没用,针对legacy模型  
	    Date_t expiration() const final {  
	        return _expiration;  
	    }  
	
	    //以下四个接口用于数据收发相关处理  
	    void fill(bool sync, TicketCallback&& cb);  
	protected:  
	    void finishFill(Status status);  
	    bool isSync() const;  
	    virtual void fillImpl() = 0;  
	private:  
	    //会话信息,一个链接一个session  
	    std::weak_ptr<ASIOSession> _session;  
	    //每个session有一个唯一id  
	    const SessionId _sessionId;  
	    //asio模型没用,针对legacy生效  
	    const Date_t _expiration;  
	    //数据发送或者接收成功后的回调处理  
	    TicketCallback _fillCallback;  
	    //同步方式还是异步方式进行数据处理,默认异步  
	    bool _fillSync;  
	};

 

该类保护多个成员变量,这些成员变量功能说明如下:

 

成员名 作用
_session Session会话信息,一个链接对应一个session
_sessionId 每个session都有一个对应的唯一ID
_expiration Legacy模式使用,当前都是用asio,该成员已淘汰
_fillCallback 发送或者接收一个完整MongoDB报文后的回调处理
_fillSync 同步还是异步方式收发数据。adaptive线程模型为异步,synchronous线程模型为同步读写方式

MongoDB在具体实现上,数据接收和数据发送分开实现,分别是数据接收类ASIOSourceTicket和数据发送类ASIOSinkTicket,这两个类都继承自ASIOTicket类,这两个类的主要结构如下:

//数据接收的ticket  
class TransportLayerASIO::ASIOSourceTicket : public TransportLayerASIO::ASIOTicket {  
public:  
    //初始化构造  
    ASIOSourceTicket(const ASIOSessionHandle& session, Date_t expiration, Message* msg);  
protected:  
    //数据接收Impl  
    void fillImpl() final;  
private:  
    //接收到MongoDB头部数据后的回调处理  
    void _headerCallback(const std::error_code& ec, size_t size);  
    //接收到MongoDB包体数据后的回调处理    
    void _bodyCallback(const std::error_code& ec, size_t size);    
    //存储数据的buffer,网络IO读取到的原始数据内容  
    SharedBuffer _buffer;  
    //数据Message管理,数据来源为_buffer  
    Message* _target;  
};  
//数据发送的ticket  
class TransportLayerASIO::ASIOSinkTicket : public TransportLayerASIO::ASIOTicket {  
 public:  
   //初始化构造  
   ASIOSinkTicket(const ASIOSessionHandle& session, Date_t expiration, const Message& msg);
  	protected:  
	    //数据发送Impl  
	    void fillImpl() final;  
	private:  
    //发送数据完成的回调处理  
   void _sinkCallback(const std::error_code& ec, size_t size);  
    //需要发送的数据message信息  
    Message _msgToSend;  
};

从上面的代码实现可以看出,ASIOSinkTicket 和ASIOSourceTicket 类接口及成员实现几乎意义,只是具体的实现方法不同,下面对ASIOSourceTicket和ASIOSinkTicket 相关核心代码实现进行分析。

4.1.2 ASIOSourceTicket数据接收核心代码实现 

数据接收过程核心代码如下:

//数据接收的fillImpl接口实现  
void TransportLayerASIO::ASIOSourceTicket::fillImpl() {    
    //获取对应session信息  
    auto session = getSession();  
    if (!session)  
        return;  
    //收到读取MongoDB头部数据,头部数据长度是固定的kHeaderSize字节  
    const auto initBufSize = kHeaderSize;  
    _buffer = SharedBuffer::allocate(initBufSize);  
  
    //调用TransportLayerASIO::ASIOSession::read读取底层数据存入_buffer  
    //读完头部数据后执行对应的_headerCallback回调函数  
    session->read(isSync(),  
                  asio::buffer(_buffer.get(), initBufSize), //先读取头部字段出来  
                  [this](const std::error_code& ec, size_t size) { _headerCallback(ec, size); });  
}  
  
//读取到MongoDB header头部信息后的回调处理  
void TransportLayerASIO::ASIOSourceTicket::_headerCallback(const std::error_code& ec, size_t size) {  
    ......  
    //获取session信息  
    auto session = getSession();  
    if (!session)  
        return;  
    //从_buffer中获取头部信息  
    MSGHEADER::View headerView(_buffer.get());  
    //获取message长度  
    auto msgLen = static_cast<size_t>(headerView.getMessageLength());  
    //长度太小或者太大,直接报错  
    if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) {  
        .......  
        return;  
    }  
    ....  
   //内容还不够一个mongo协议报文,继续读取body长度字节的数据,读取完毕后开始body处理  
   //注意这里是realloc,保证头部和body在同一个buffer中  
    _buffer.realloc(msgLen);   
    MsgData::View msgView(_buffer.get());  
  
    //调用底层TransportLayerASIO::ASIOSession::read读取数据body   
    session->read(isSync(),  
      //数据读取到该buffer                  
      asio::buffer(msgView.data(), msgView.dataLen()),  
      //读取成功后的回调处理  
      [this](const std::error_code& ec, size_t size) { _bodyCallback(ec, size); });  
}  
  
//_headerCallback对header读取后解析header头部获取到对应的msg长度,然后开始body部分处理  
void TransportLayerASIO::ASIOSourceTicket::_bodyCallback(const std::error_code& ec, size_t size) {  
    ......  
    //buffer转存到_target中  
    _target->setData(std::move(_buffer));  
    //流量统计  
    networkCounter.hitPhysicalIn(_target->size());  
    //TransportLayerASIO::ASIOTicket::finishFill    
    finishFill(Status::OK()); //包体内容读完后,开始下一阶段的处理    
    //报文读取完后的下一阶段就是报文内容处理,开始走ServiceStateMachine::_processMessage  
}

MongoDB协议由msg header + msg body组成,一个完整的MongoDB报文内容格式如下:

上图所示各个字段及body内容部分功能说明如下表:

 

Header or body 字段名
 

msg  header

messageLength 整个message长度,包括header长度和body长度
requestID 该请求id信息
responseTo 应答id
opCode 操作类型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE等
msg  body Body 不同opCode对应的包体内容

 

ASIOSourceTicket类的几个核心接口都是围绕这一原则展开,整个MongoDB数据接收流程如下:

1. 读取MongoDB头部header数据,解析出header中的messageLength字段。

2. 检查messageLength字段是否在指定的合理范围,该字段不能小于Header整个头部大小,也不能超过MaxMessageSizeBytes最大长度。

3. Header len检查通过,说明读取header数据完成,于是执行_headerCallback回调。

4. realloc更多的空间来存储body内容。

5. 继续读取body len长度数据,读取body完成后,执行_bodyCallback回调处理。

4.1.3 ASIOSinkTicket数据发送类核心代码实现 

ASIOSinkTicket发送类相比接收类,没有数据解析相关的流程,因此实现过程会更加简单,具体源码实现如下:

//发送数据成功后的回调处理  
void TransportLayerASIO::ASIOSinkTicket::_sinkCallback(const std::error_code& ec, size_t size) {  
    //发送的网络字节数统计  
    networkCounter.hitPhysicalOut(_msgToSend.size());   
    //执行SSM中对应的状态流程  
    finishFill(ec ? errorCodeToStatus(ec) : Status::OK());  
}  
  
//发送数据的fillImpl  
void TransportLayerASIO::ASIOSinkTicket::fillImpl() {  
    //获取对应session  
    auto session = getSession();  
    if (!session)  
        return;  
  
    //调用底层TransportLayerASIO::ASIOSession::write发送数据,发送成功后执行_sinkCallback回调   
    session->write(isSync(),  
       asio::buffer(_msgToSend.buf(), _msgToSend.size()),  
       //发送数据成功后的callback回调  
       [this](const std::error_code& ec, size_t size) { _sinkCallback(ec, size); });  
}

 

4.2 总结 

从上面的分析可以看出,Ticket数据收发模块主要调用session会话模块来进行底层数据的读写、解析,当读取或者发送一个完整的MongoDB报文后最终交由SSM服务状态机模块调度处理。

 

ticket模块主要接口功能总结如下表所示:

 

类命 接口名 功能说明
 

 

ASIOTicket

ASIOTicket::getSession() 获取session信息
ASIOTicket::isSync() 判断是同步收发还是异步收发
ASIOTicket::finishFill() 执行_fillCallback回调
ASIOTicket::fill() 给_fillCallback赋值
ASIOTicket::ASIOTicket() ASIOTicket构造初始化
 

ASIOSourceTicket

ASIOSourceTicket::ASIOSourceTicket() ASIOSourceTicket初始化
ASIOSourceTicket::_bodyCallback() 接收到mesg header+body后的回调处理
ASIOSourceTicket::_headerCallback 接收到msg header后的回调处理
 

ASIOSinkTicket

ASIOSinkTicket::ASIOSinkTicket() ASIOSinkTicket初始化构造
ASIOSourceTicket::fillImpl() 发送指定msg数据,发送完成后执行回调
ASIOSinkTicket::_sinkCallback 发送msg成功后的回调处理

 

前面的分析也可以看出,Ticket数据收发模块会调用session处理模块来进行真正的数据读写,同时接收或者发送完一个完整MongoDB报文后的后续回调处理讲交由SSM服务状态机模块处理。

5. Session会话子模块


Session会话模块功能主要如下:负责记录HostAndPort、和底层asio库直接互动,实现数据的同步或者异步收发。一个新连接fd对应一个唯一的session,对fd的操作直接映射为session操作。Session会话子模块主要代码实现相关文件如下:

5.1 session会话子模块核心代码实现 
class TransportLayerASIO::ASIOSession : public Session {  
    //初始化构造  
    ASIOSession(TransportLayerASIO* tl, GenericSocket socket);  
    //获取本session使用的tl  
    TransportLayer* getTransportLayer();  
    //以下四个接口套接字相关,本端/对端地址获取,获取fd,关闭fd等  
    const HostAndPort& remote();  
    const HostAndPort& local();  
    GenericSocket& getSocket();  
    void shutdown();  
  
    //以下四个接口调用asio网络库实现数据的同步收发和异步收发  
    void read(...)  
    void write(...)  
    void opportunisticRead(...)  
    void opportunisticWrite(...)  
  
    //远端地址信息  
    HostAndPort _remote;  
    //本段地址信息  
    HostAndPort _local;  
    //赋值见TransportLayerASIO::_acceptConnection  
    //也就是fd,一个新连接对应一个_socket  
    GenericSocket _socket;  
    //SSL相关不做分析,  
#ifdef MONGO_CONFIG_SSL  
    boost::optional<asio::ssl::stream<decltype(_socket)>> _sslSocket;  
    bool _ranHandshake = false;  
#endif  
  
    //本套接字对应的tl,赋值建TransportLayerASIO::_acceptConnection(...)  
    TransportLayerASIO* const _tl;  
}

 

该类最核心的三个接口ASIOSession(…)、opportunisticRead(…)、opportunisticWrite(..)分别完成套接字处理、调用asio库接口实现底层数据读和底层数据写。这三个核心接口源码实现如下:

//初始化构造 TransportLayerASIO::_acceptConnection调用  
ASIOSession(TransportLayerASIO* tl, GenericSocket socket)  
    //fd描述符及TL初始化赋值  
    : _socket(std::move(socket)), _tl(tl) {  
    std::error_code ec;  
  
    //异步方式设置为非阻塞读  
    _socket.non_blocking(_tl->_listenerOptions.transportMode == Mode::kAsynchronous, ec);  
    fassert(40490, ec.value() == 0);  
  
    //获取套接字的family  
    auto family = endpointToSockAddr(_socket.local_endpoint()).getType();  
    //满足AF_INET
    if (family == AF_INET || family == AF_INET6) {  
        //no_delay keep_alive套接字系统参数设置  
        _socket.set_option(asio::ip::tcp::no_delay(true));  
        _socket.set_option(asio::socket_base::keep_alive(true));  
        //KeepAlive系统参数设置  
        setSocketKeepAliveParams(_socket.native_handle());  
    }  
  
    //获取本端和对端地址  
    _local = endpointToHostAndPort(_socket.local_endpoint());  
    _remote = endpointToHostAndPort(_socket.remote_endpoint(ec));  
    if (ec) {  
        LOG(3) << "Unable to get remote endpoint address: " << ec.message();  
    }  
}

该类初始化的时候完成新连接_socket相关的初始化设置,包括阻塞读写还是非阻塞读写。如果是同步线程模型(一个链接一个线程),则读写方式位阻塞读写;如果是异步线程模型(adaptive动态线程模型),则调用asio网络库接口实现异步读写。

此外,该链接_socket对应的客户端ip:port和服务端ip:port也在该初始化类中获取,最终保存到本session的_remote和_local成员中。

数据读取核心代码实现如下:

//读取指定长度数据,然后执行handler回调  
void opportunisticRead(...) {  
    std::error_code ec;  
    //如果是异步线程模型,在ASIOSession构造初始化的时候会设置non_blocking非阻塞模式  
    //异步线程模型这里实际上是非阻塞读取,如果是同步线程模型,则没有non_blocking设置,也就是阻塞读取  
    auto size = asio::read(stream, buffers, ec);    
  
    //如果是异步读,并且read返回would_block或者try_again说明指定长度的数据还没有读取完毕  
    if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) {  
        //buffers有大小size,实际读最多读size字节  
        MutableBufferSequence asyncBuffers(buffers);  
        if (size > 0) {  
            asyncBuffers += size; //buffer offset向后移动  
        }  
  
        //继续异步方式读取数据,读取到指定长度数据后执行handler回调处理  
        asio::async_read(stream, asyncBuffers, std::forward<CompleteHandler>(handler));  
    } else {   
        //阻塞方式读取read返回后可以保证读取到了size字节长度的数据  
        //直接read获取到size字节数据,则直接执行handler   
        handler(ec, size);  
    }  
}

opportunisticRead首先调用asio::read(stream, buffers, ec)读取buffers对应size长度的数据,buffers空间大小就是需要读取的数据size大小。如果是同步线程模型,则这里为阻塞式读取,直到读到size字节才会返回;如果是异步线程模型,这这里是非阻塞读取,非阻塞读取当内核网络协议栈数据读取完毕后,如果还没有读到size字节,则继续进行async_read异步读取。

当buffers指定的size字节全部读取完整后,不管是同步模式还是异步模式,最终都会执行handler回调,开始后续的数据解析及处理流程。

发送数据核心代码实现如下:

//发送数据  
void opportunisticWrite(...) {  
    std::error_code ec;  
    //如果是同步模式,则阻塞写,直到全部写成功。异步模式则非阻塞写  
    auto size = asio::write(stream, buffers, ec);   
  
    //异步写如果返回try_again说明数据还没有发送完,则继续异步写发送  
    if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) {  
        ConstBufferSequence asyncBuffers(buffers);  
        if (size > 0) {  //buffer中数据指针偏移计数
            asyncBuffers += size;  
        }  
        //异步写发送完成,执行handler回调  
        asio::async_write(stream, asyncBuffers, std::forward<CompleteHandler>(handler));  
    } else {  
        //同步写成功,则直接执行handler回调处理  
        handler(ec, size);  
    }  
}

数据发送流程和数据接收流程类似,也分位同步模式发送和异步模式发送,同步模式发送为阻塞式写,只有当所有数据通过asio::write()发送成功后才返回;异步模式发送为非阻塞写,asio::write()不一定全部发送出去,因此需要再次调用asio库的asio::async_write()进行异步发送。

不管是同步模式还是异步模式发送数据,最终数据发送成功后,都会调用handler()回调来执行后续的流程。

5.2 总结 

从上面的代码分析可以看出,session会话模块最终直接和asio网络库交互实现数据的读写操作。该模块核心接口功能总结如下表:

 

类名 接口名 功能说明
 

 

 

 

ASIOSession

ASIOSession(…) 新连接fd相关处理,如是否非阻塞、delay设置、keep_alive设置、两端地址获取等
getTransportLayer() 获取对应TL
remote() 获取该链接对应的对端Ip:port信息
local() 获取该链接对应的对端Ip:port信息
getSocket() 获取该链接的fd
shutdown() fd回收处理
opportunisticRead() 同步或者异步数据读操作
opportunisticWrite() 同步或者异步数据写操作

6. 总结


《MongoDB网络传输处理源码实现及性能调优-体验内核性能极致设计》一文对MongoDB网络传输模块中的ASIO网络库实现、service_executor服务运行子模块(即线程模型子模块)进行了详细的分析,加上本文的transport_layer套接字处理及传输层管理子模块、session会话子模块、Ticket数据收发子模块、service_entry_point服务入口点子模块。

transport_layer套接字处理及传输层管理子模块主要由transport_layer_manager和transport_layer_asio两个核心类组成。分别完成net相关的配置文件初始化操作,套接字初始化及处理,最终transport_layer_asio的相应接口实现了和ticket数据分发子模块、服务入口点子模块的关联。

 

服务入口子模块主要由ServiceEntryPointImpl类和service_entry_point_utils中的线程创建函数组成,实现新连接accept处理及控制。该模块通过startSession()让服务入口子模块、session会话子模块、ssm状态机子模块关联起来。

 

ticket数据收发子模块主要功能如下:调用session子模块进行底层asio库处理、拆分数据接收和数据发送到两个类、完整MongoDB报文读取 、接收或者发送MongoDB报文后的回调处理, 回调处理由SSM服务状态机模块管理,当读取或者发送一个完整的MongoDB报文后最终交由SSM服务状态机模块调度处理。

 

Session会话模块功能主要如下:负责记录HostAndPort、和底层asio库直接互动,实现数据的同步或者异步收发。一个新连接fd对应一个唯一的session,对fd的操作直接映射为session操作。

 

到这里,整个MongoDB网络传输层模块分析只差service_state_machine状态机调度子模块,状态机调度子模块相比本文分析的几个子模块更加复杂,因此将在下期《MongoDB网络传输层模块源码分析三》中单独分析。

 

本文所有源码注释分析详见如下链接:MongoDB网络传输模块详细源码分析

更多文章:

常用高并发网络线程模型设计及MongoDB线程模型优化实践
MongoDB网络传输处理源码实现及性能调优-体验内核性能极致设计

OPPO百万级高并发MongoDB集群性能数十倍提升优化实践

作者:杨亚洲前滴滴出行技术专家,现任OPPO文档数据库MongoDB负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库MongoDB内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。Github账号地址:https://github.com/y123456yz

 

 

 

 

 

MongoDB中文手册翻译正在进行中,欢迎更多朋友在自己的空闲时间学习并进行文档翻译,您的翻译将由社区专家进行审阅,并拥有署名权更新到中文用户手册和发布到社区微信内容平台。

更多问题可以添加社区助理小芒果微信(mongoingcom)咨询,进入社区微信交流群请备注“mongo”。

赞(12)
未经允许不得转载:MongoDB中文社区 » MongoDB网络传输层模块源码实现二

评论 3

评论前必须登录!

 

  1. #1

    模块化分析确实不错,代码全部乱吗得,这咋看啊,求标准化得代码

    wokalo4年前 (2020-10-26)
  2. #2

    为啥网页版代码全是乱的啊,求工整的代码部分,哪里有,网页版?手机微信公众号始终不是很方便

    wokalo4年前 (2020-10-26)
    • 我们找技术看下编辑器,可能出了点问题。您先打开本链接查看下,https://mp.weixin.qq.com/s/CGjUICVh3D6-J5xa4C_PTA

      Mr.Mongo4年前 (2020-10-27)