MongoDB爱好者
垂直技术交流平台

Mongo4.2分布式事务实现Overview

本文接上篇事务,时间戳与混合逻辑时钟。分布式事务在20190606随着4.2rc0版本发布了。本文是对4.2分布式事务实现的overview。

整体流程

Mongos侧

相关代码:transaction_router.cpp/h

在Mongos侧,每个OperationCtx会有一个TransactionRouter,记录事务上下文。
TransactionRouter::get(opCtx)

每次事务内CRUD操作,会被TransactionRouter记录,特别是参与此次事务的shard,会被记录在_participants中。
TransactionRouter::attachTxnFieldsIfNeeded(opCtx, shardId, cmObj) {
_participants.emplace_back(shardId);
}

当事务提交时,TransactionRouter在所有参与者中选择(第一个)一个coordinator,并将两阶段提交的工作 hand off 给 coordinator。
TransactionRouter::_handOffCommitToCoordinatoropCtx)
{ CoordinateCommitTransaction cmd;
cmd.setParticipants(_participantList);
sendTo(_coordinator, cmd).waitForReply();
}

因此,Mongos不处理二阶段提交,甚至configSvr也不参与。二阶段提交的工作完全由多个shard阶段之间协作完成。基于TimestampOracle的分布式事务需要中心节点分配事务Id或者时间戳,但是基于HLC的系统就不需要。

1

Shard侧

相关代码:transaction_coordinator.cpp/h,transaction_coordinator_util.cpp/h

shard侧在收到CoordinateCommitTransactionCmd后,会有如下操作:

将{lsid,txnNumber, participants} 持久化到config.transaction_coordinators表中

广播prepare命令{lsid, txnNumber, participants}
– 收集prepareresponse,如果deadline之前没有全部返回,则prepare失败
– 只要有一个参与者反对提交,就prepare失败
– commitTimestamp = max(prepareTimestamp)

将prepare结果COMMITorABORT更新到step1写入的记录中
– 一旦这一步决定commit,二阶段提交就必须成功

假设prepare结果是commit,广播通知participants进行commit
– 没有超时,如果某个participant一直没返回(挂了/网络故障),就一直等直到有返回结果

删除config.transaction_coordinators表中的对应记录

2

Oplog格式更改

为了支持超过16MB的事务,一个事务内的多条操作会记录在多条Oplog里。每条Oplog新增字段如下:

prepare: {true/false} // 表示这条Oplog是否是分布式事务的prepare阶段产生的,与非事务产生的oplog区分
prevOpTime // 用以指向本事务内上一条Oplog,用于做prepare阶段的事务的恢复
lsid/txnNumber //多文档事务特有
o.commitTimestamp/o.commitTransaction

举个例子,一个shard内,有两个分布式事务,产生了五条Oplog db.oplog.rs.find().sort({$natural:-1})

{ts:5,o:{commitTimestamp:4,commitTransaction:1}, prevOpTime:4, lsid:1, txnNumber:1}
{ts:4, applyOps:[...], prevOpTime:1,prepare:true, lsid:1, txnNumber:1}
{ts:3,o:{commitTimestamp:2,commitTransaction:1}, prevOpTime:2,lsid:0, txnNumber:2}
{ts:2, applyOps:[...], prevOpTime:-1,prepare:true,lsid:0, txnNumber:2}
{ts:1, applyOps:[...], prevOpTime:-1,prepare:true, lsid:1, txnNumber:1}

总结:每个事务的Oplog不再放在一个OplogEntry里,甚至不再连续存放,而是通过prevOpTime串起来。

二阶段提交的故障恢复

分为Coordinator的故障和Participant的故障

Coordinator的故障

二阶段提交的信息持久化在transaction_coordinators表中。TransactionCoordinatorService::onStepUp从表里恢复 所有pending状态的事务,继续执行
auto coordinatorDocs = txn::readAllCoordinatorDocs(opCtx)
for (const auto& doc : coordinatorDocs) {
auto coordinator = std::make_shared();
coordinator.continueCommit(doc);
}

Participant的故障

事务在prepare时,会将修改记录记录到oplog表中,上文我们已经分析过prepareLog的格式,因此所有prepare成功的事务的Oplog都已经持久化,就差一条CommitLog了。在ReplicationCoordinatorImpl::startup中,会重新构建 所有处于prepare状态的事务。每个事务,顺着prevOpTime就可以找到所有的Oplog,通过Oplog恢复出事务的状态。

for (const auto& doc : config.transactions.find())
{ lastOplogTime = doc.lastOplogTime vector v;
while(auto t = oplog.rs.find(lastOplogTime))
{ v.push_back(t);
lastOplogTime = v.prevOpTime
}
reconstructTransaction(v.reverse());
}

上文说过,coordinator会等待所有participant返回commit结果,所以participant重启后,coordinator会重试让participant提交处于prepare状态的事务。
可以看到,与Mysql的本地XA不同的是,mongo分布式事务的prepare阶段是通过Oplog保证持久化的,而不是wal 和oplog的协同,wiredTiger层在prepare阶段不保证持久化,那wiredTiger的prepare在干嘛呢?

wiredTiger层

建议先读这篇这篇

我们知道,对于wiredTiger,未提交事务是不会被evict的,也不会进入checkpoint,在ARIES算法中,这叫做nosteal模式。可是,在极端情况下,prepare状态可能会驻留很长时间,prepare状态的事务会不会被evict出去呢?为 了防止缓存压力,wiredTiger允许prepare状态的事务被evict出去(注意:不是evict到已提交事务的btree表空间)。

另外,(可能是)为了防止快照数量膨胀,wiredTiger在prepare阶段就释放快照,我们知道wiredTiger的SI是通过 事务开始时拷贝全局快照来实现,提前释放快照会导致未提交的事务过早的被看到,wiredTiger于是又加了一个WT_PREPARE_CONFLICT状态,考虑如下的时序:

txn1.begin() txn1.insert("a", "b")
txn1.prepare(prepare_ts = 10)
txn2.begin(read_ts = 11)
txn2.read("a") // WT_PREPARE_CONFLICT
txn3.begin(read_ts = 9)
txn3.read("a") // WT_NOT_FOUND

由于txn1在prepare阶段就已经释放了快照。因此txn2开始时,txn1的修改对其是可见的,虽然可见,但是出于一种 特殊的 PREPARE_CONFLICT状态,Mongo层在处理PREPARE_CONFLICT状态的数据时,会不停重试,mongo将所有对wiredTiger的读操作通过wiredTigerPrepareConflictRetry封装起来。

int ret = wiredTigerPrepareConflictRetry(opCtx, [&] { return c->search(c); });

然而,直接略过这条记录不行吗?为什么Mongo一定要不停retry呢?因为这样违背SI。快照已经在事务prepare时释 放了,而事务的commitTimestamp > prepareTimestamp,因此对于一个SI的读事务,直接忽略另一个prepare状态的记录,会导致另一个事务提交后,对本事务可见,或者不可见。举个例子如下,txn2是快照隔离级别的,两次读a的结果必须一致。因此第一次读a需要重试。
// 如果txn2 直接不读其他出于prepare状态的记录。

txn1.begin()
txn1.insert("a", "b")
txn1.prepare(prepare_ts = 10)
txn2.begin(read_ts = 11)
txn2.read("a")
txn1.commit(commit_ts = 11)
txn2.read("a")

那么,能不能不要prepare阶段释放快照,而是commit后释放快照呢?如果这样,wiredTiger其实根本不需要提供出prepare接口出来。

TODO

还有很多没有讲到的地方,比如,wiredTiger层,对于每个事务新增的durable_timestamp的机制。从节点上回放oplog的机制(结合prepare)等等。我呼吁大家一起分析。另外,感兴趣的人可以给我发邮件讨论 kongdeyu@huawei.com。

孔德雨
华为云mongodb架构师
MongoDB中文社区深圳分会主席

赞(12)
未经允许不得转载:MongoDB中文社区 » Mongo4.2分布式事务实现Overview

评论 7

评论前必须登录!

 

  1. #1

    hi ,您好,为什么我测试4.2.5 测试分片事务, 没有出现config.transaction_coordinators 这个表呢

    5535899125年前 (2020-04-01)
    • 作者回复:您在哪里找这个表的,是shard上还是configsvr上?元数据保存在configsvr里

      xica5年前 (2020-04-01)
      • 感谢快速回复,都看啦,这些是configsvr 节点的 config 里面的表changelog
        chunks
        collections
        databases
        lockpings
        locks
        migrations
        mongos
        shards
        system.profile
        system.sessions
        tags
        transactions
        version

        5535899125年前 (2020-04-01)
        • 作者回复:这个表要操作过分布式事务(注意不是多文档事务)才会有 如果事务的所有操作都在一个分片上 会退化为多文档事务

          xica5年前 (2020-04-01)
          • 奥奥,明白了,谢谢孔老师这么及时的解答还有这么高质量文章的分享,我再试试

            5535899125年前 (2020-04-01)
          • 您好,这是我执行的事务流程,不知道是不是我的事务写法有错误,还是没能出现。
            wqrs01:PRIMARY> db.haha.find({“abc” : 52});
            { “_id” : ObjectId(“5e8474aaf7cb65ec8c06e469”), “abc” : 52, “name” : “MACLEAN”, “name1” : “MACLEAN”, “name2” : “MACLEAN”, “name3” : “MACLEAN” }
            wqrs01:PRIMARY> db.gaga.find({“abc” : 52});
            { “_id” : ObjectId(“5e7abe36687d318c013c9e8c”), “abc” : 52, “name” : “MACLEAN”, “name1” : “MACLEAN”, “name2” : “MACLEAN”, “name3” : “MACLEAN” }

            wqrs02:PRIMARY> db.haha.find({“abc” : 29})
            { “_id” : ObjectId(“5e8474aaf7cb65ec8c06e452”), “abc” : 29, “name” : “MACLEAN”, “name1” : “MACLEAN”, “name2” : “MACLEAN”, “name3” : “MACLEAN” }
            wqrs02:PRIMARY> db.gaga.find({“abc” : 29})
            { “_id” : ObjectId(“5e7abe35687d318c013c9e75”), “abc” : 29, “name” : “MACLEAN”, “name1” : “MACLEAN”, “name2” : “MACLEAN”, “name3” : “MACLEAN” }
            wqrs02:PRIMARY>

            执行:

            s = db.getMongo().startSession()
            s.startTransaction()
            s.getDatabase(“test”).gaga.update({‘abc’:52},{$set:{“name” : “111”}})
            s.getDatabase(“test”).haha.update({‘abc’:52},{$set:{“name” : “222”}})
            s.getDatabase(“test”).gaga.update({‘abc’:29},{$set:{“name” : “333”}})
            s.getDatabase(“test”).haha.update({‘abc’:29},{$set:{“name” : “444”}})
            s.commitTransaction()

            mongos> s = db.getMongo().startSession()
            session { “id” : UUID(“221c5981-5093-45a2-ab13-d2fbe21ba2ba”) }
            mongos> s.startTransaction()
            mongos> s.getDatabase(“test”).gaga.update({‘abc’:52},{$set:{“name” : “111”}})
            WriteResult({ “nMatched” : 1, “nUpserted” : 0, “nModified” : 1 })
            mongos> s.getDatabase(“test”).haha.update({‘abc’:52},{$set:{“name” : “222”}})
            WriteResult({ “nMatched” : 1, “nUpserted” : 0, “nModified” : 1 })
            mongos> s.getDatabase(“test”).gaga.update({‘abc’:29},{$set:{“name” : “333”}})
            WriteResult({ “nMatched” : 1, “nUpserted” : 0, “nModified” : 1 })
            mongos> s.getDatabase(“test”).haha.update({‘abc’:29},{$set:{“name” : “444”}})
            WriteResult({ “nMatched” : 1, “nUpserted” : 0, “nModified” : 1 })
            mongos> s.commitTransaction()

            修改过后
            分片1
            wqrs01:PRIMARY> db.gaga.find({“abc” : 52});
            { “_id” : ObjectId(“5e7abe36687d318c013c9e8c”), “abc” : 52, “name” : “111”, “name1” : “MACLEAN”, “name2” : “MACLEAN”, “name3” : “MACLEAN” }
            wqrs01:PRIMARY> db.haha.find({“abc” : 52});
            { “_id” : ObjectId(“5e8474aaf7cb65ec8c06e469”), “abc” : 52, “name” : “222”, “name1” : “MACLEAN”, “name2” : “MACLEAN”, “name3” : “MACLEAN” }

            分片2
            { “_id” : ObjectId(“5e8474aaf7cb65ec8c06e452”), “abc” : 29, “name” : “444”, “name1” : “MACLEAN”, “name2” : “MACLEAN”, “name3” : “MACLEAN” }
            wqrs02:PRIMARY> db.gaga.find({“abc” : 29})
            { “_id” : ObjectId(“5e7abe35687d318c013c9e75”), “abc” : 29, “name” : “333”, “name1” : “MACLEAN”, “name2” : “MACLEAN”, “name3” : “MACLEAN” }

            但是结果还是没有这个表,是我的事务写法不对不属于分布式事务吗?

            5535899125年前 (2020-04-01)
          • 请您通过wolfkdy1989@gmail.com与老师交流哦~

            xica5年前 (2020-04-01)