翻译或纠错本页面

执行两个阶段的提交

概要

这个文档为执行多文档操作或者说 ” 多文档事务 ” 提供一个模型,这个模型使用两阶段提交方法把数据写到多个文档中。除此之外,你可以扩展这个过程以提供一个 rollback-like f功能。

背景

在MongoDB里一个 document 上的操作总是原子的;然而,常常被称作 “多文档事务” 的包含多个文档的操作并不是原子的。由于文档可能会相当复杂并且包含多个 “嵌套的” 文档,单文档原子性为许多使用情况提供了必要的支持。

尽管当文档原子操作很强大,但是仍然有需要多文档事务的情况。当执行一个由连续操作组成的事务时,某些问题出现了,比如:

  • 原子性:如果一个操作失败,事务内的之前的操作必须 ” 回滚 “到之前的状态(就是 “all or nothing” 里面的 “nothing”)。

  • 一致性:如果一个严重的故障(比如网络或者硬件)打断了事务,数据库必须可以恢复到一致的状态。

对于需要多文档事务的情景,你可以在你的应用里实现两阶段提交以提供这些多文档更新的支持。使用两阶段提交保证数据是一致的,并且在发生错误的情况下,执行事务之前的状态是 recoverable (可恢复的) 。然而,在执行过程中,文档可以展示未确定的(事务提交之前的)数据和状态。

注解

因为MongoDB数据库里仅仅单文档的操作是原子的,两阶段提交仅仅可以提供一个 类似 事务的语义。对于应用来说在两阶段提交期间在中间点返回中间的数据或者回滚是有可能的。

模型

概述

假设一个情景,你想从账户 A 转钱到账户 B 。在关系型数据库系统里,你可以在一个多语句事务内从 A 账户上减去钱并且为 B 账户添加上钱。在MongoDB里,你可以模仿两阶段提交以达到一个类似的结果。

这个教程里的例子使用下面的两个集合:

  1. 命名为 accounts 的集合存储账户信息。

  2. 命名为 transactions 的集合存储有关转账事务的信息。

初始化源账户和目的账户

accounts 集合里分别为账户 A 和账户 B 插入一个文档。

db.accounts.insert(
   [
     { _id: "A", balance: 1000, pendingTransactions: [] },
     { _id: "B", balance: 1000, pendingTransactions: [] }
   ]
)

这个操作返回一个包含操作状态的 BulkWriteResult() 对象。成功插入之后, BulkWriteResult()nInserted 字段设置为 2

初始化转账记录

对于每一次转账的完成,往 transactions 集合里插入一条包含转账信息的文档。这个文档包含如下字段:

  • sourcedestination 字段,与 accounts 集合里的 _id 字段相关联的。

  • value 字段,指定影响 source 账户和 destination 账户 balance 的传输量,

  • state 字段,反应传输的当前状态。state 字段可以具有值 initialpendingapplieddonecancelingcanceled

  • lastModified 字段,反映了最后修改的日期。

想要初始化从账户 A 到账户 B100 的转账,在 transactions 集合里插入一个包含转账信息的文档,设置交易 state"initial",并且 lastModified 字段设置为当前日期:

db.transactions.insert(
    { _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() }
)

这个操作返回一个包含操作状态的 WriteResult() 对象。成功插入后, WriteResult() 对象的 nInserted 被设置为 1

使用两阶段提交在账户之间转移资金

1

Retrieve the transaction to start.

From the transactions collection, find a transaction in the initial state. Currently the transactions collection has only one document, namely the one added in the 初始化转账记录 step. If the collection contains additional documents, the query will return any transaction with an initial state unless you specify additional query conditions.

var t = db.transactions.findOne( { state: "initial" } )

Type the variable t in the mongo shell to print the contents of the variable. The operation should print a document similar to the following except the lastModified field should reflect date of your insert operation:

{ "_id" : 1, "source" : "A", "destination" : "B", "value" : 100, "state" : "initial", "lastModified" : ISODate("2014-07-11T20:39:26.345Z") }
2

Update transaction state to pending.

Set the transaction state from initial to pending and use the $currentDate operator to set the lastModified field to the current date.

db.transactions.update(
    { _id: t._id, state: "initial" },
    {
      $set: { state: "pending" },
      $currentDate: { lastModified: true }
    }
)

The operation returns a WriteResult() object with the status of the operation. Upon successful update, the nMatched and nModified displays 1.

In the update statement, the state: "initial" condition ensures that no other process has already updated this record. If nMatched and nModified is 0, go back to the first step to get a different transaction and restart the procedure.

3

Apply the transaction to both accounts.

Apply the transaction t to both accounts using the update() method if the transaction has not been applied to the accounts. In the update condition, include the condition pendingTransactions: { $ne: t._id } in order to avoid re-applying the transaction if the step is run more than once.

To apply the transaction to the account, update both the balance field and the pendingTransactions field.

Update the source account, subtracting from its balance the transaction value and adding to its pendingTransactions array the transaction _id.

db.accounts.update(
   { _id: t.source, pendingTransactions: { $ne: t._id } },
   { $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } }
)

Upon successful update, the method returns a WriteResult() object with nMatched and nModified set to 1.

Update the destination account, adding to its balance the transaction value and adding to its pendingTransactions array the transaction _id .

db.accounts.update(
   { _id: t.destination, pendingTransactions: { $ne: t._id } },
   { $inc: { balance: t.value }, $push: { pendingTransactions: t._id } }
)

Upon successful update, the method returns a WriteResult() object with nMatched and nModified set to 1.

4

Update transaction state to applied.

Use the following update() operation to set the transaction’s state to applied and update the lastModified field:

db.transactions.update(
   { _id: t._id, state: "pending" },
   {
     $set: { state: "applied" },
     $currentDate: { lastModified: true }
   }
)

Upon successful update, the method returns a WriteResult() object with nMatched and nModified set to 1.

5

Update both accounts’ list of pending transactions.

Remove the applied transaction _id from the pendingTransactions array for both accounts.

Update the source account.

db.accounts.update(
   { _id: t.source, pendingTransactions: t._id },
   { $pull: { pendingTransactions: t._id } }
)

Upon successful update, the method returns a WriteResult() object with nMatched and nModified set to 1.

Update the destination account.

db.accounts.update(
   { _id: t.destination, pendingTransactions: t._id },
   { $pull: { pendingTransactions: t._id } }
)

Upon successful update, the method returns a WriteResult() object with nMatched and nModified set to 1.

6

Update transaction state to done.

Complete the transaction by setting the state of the transaction to done and updating the lastModified field:

db.transactions.update(
   { _id: t._id, state: "applied" },
   {
     $set: { state: "done" },
     $currentDate: { lastModified: true }
   }
)

Upon successful update, the method returns a WriteResult() object with nMatched and nModified set to 1.

从失败场景中恢复

事务过程最重要的一部分不是上面典型的例子,而是当事务没有成功完成的时候从各种各样的失败场景中恢复的可能性。这一部分展现了可能的失败的概述并且提供从这些种事件中恢复的步骤。

恢复操作

两阶段提交模式允许应用运行序列以恢复事务,并且达到一致的状态。在应用启动的时候运行恢复操作,并且如果可能的话每隔一段时间,捕捉一些未完成的事务。

需要达到一致状态的时间取决于这个应用恢复每一个事务花费的时间。

下面的恢复程序使用 lastModified 日期作为 pending 的事务是否需要恢复的标识;特别地,如果 pending 或者 applied 事务在最后的30分钟内还没有被更新,程序将判定这些事务需要恢复。你可以使用不同的条件来做这个判断。

处于 Pending 状态的事务

要想从 “Update transaction state to pending.” 步骤之后 “Update transaction state to applied.” 步骤之前发生的错误恢复,请从 transactions 集合中检索出一个 pending 的事务来恢复:

var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);

var t = db.transactions.findOne( { state: "pending", lastModified: { $lt: dateThreshold } } );

从步骤 “Apply the transaction to both accounts.” 重新开始执行

处于 Applied 状态的事务

要想从 “Update transaction state to applied.” 步骤之后 “Update transaction state to done.” 步骤之前发生的错误恢复,请从 transactions 集合中检索出一个 applied 的事务来恢复:

var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);

var t = db.transactions.findOne( { state: "applied", lastModified: { $lt: dateThreshold } } );

从步骤 “Update both accounts’ list of pending transactions.“重新开始执行

回滚操作

有时候,你需要 “回滚” 或者撤销一个事务;比如,应用需要 “取消” 事务或者其中一个账户不存在或者停止在交易过程中存在(感觉不太贴切,贴出原文:or if one of the accounts does not exist or stops existing during the transaction.)。

处于 Applied 状态的事务

在 “Update transaction state to applied.” 步骤之后,你 应该回滚事务。取而代之地,完成那个事务并且创建一个新的事务并通过切换源字段和目的字段的值来冲销交易记录。

处于 Pending 状态的事务

在 “Update transaction state to pending.” 步骤之后并且 “Update transaction state to applied.” 步骤之前,你可以使用下面的步骤回滚事务:

1

Update transaction state to canceling.

Update the transaction state from pending to canceling.

db.transactions.update(
   { _id: t._id, state: "pending" },
   {
     $set: { state: "canceling" },
     $currentDate: { lastModified: true }
   }
)

Upon successful update, the method returns a WriteResult() object with nMatched and nModified set to 1.

2

Undo the transaction on both accounts.

To undo the transaction on both accounts, reverse the transaction t if the transaction has been applied. In the update condition, include the condition pendingTransactions: t._id in order to update the account only if the pending transaction has been applied.

Update the destination account, subtracting from its balance the transaction value and removing the transaction _id from the pendingTransactions array.

db.accounts.update(
   { _id: t.destination, pendingTransactions: t._id },
   {
     $inc: { balance: -t.value },
     $pull: { pendingTransactions: t._id }
   }
)

Upon successful update, the method returns a WriteResult() object with nMatched and nModified set to 1. If the pending transaction has not been previously applied to this account, no document will match the update condition and nMatched and nModified will be 0.

Update the source account, adding to its balance the transaction value and removing the transaction _id from the pendingTransactions array.

db.accounts.update(
   { _id: t.source, pendingTransactions: t._id },
   {
     $inc: { balance: t.value},
     $pull: { pendingTransactions: t._id }
   }
)

Upon successful update, the method returns a WriteResult() object with nMatched and nModified set to 1. If the pending transaction has not been previously applied to this account, no document will match the update condition and nMatched and nModified will be 0.

3

Update transaction state to canceled.

To finish the rollback, update the transaction state from canceling to cancelled.

db.transactions.update(
   { _id: t._id, state: "canceling" },
   {
     $set: { state: "cancelled" },
     $currentDate: { lastModified: true }
   }
)

Upon successful update, the method returns a WriteResult() object with nMatched and nModified set to 1.

多个应用

存在部分交易,他们以便多个应用创建并且一致地运行操作,并且不会造成数据不一致或者冲突。在我们的过程中,想要更新或者检索交易文档,更新条件则包含一个关于 state 字段的条件以避免被多个应用重复应用交易记录。

例如, App1App2 都获取到了处于 initial 状态的同一交易记录。在 App2 之前 App1 完成了整个交易。当 App2 试图执行 “Update transaction state to pending.” 步骤的时候,包含 state: "initial" 要求的的更新条件将不会匹配到任何文档,并且 nMatchednModified 的值将是 0 。这是向 App2 发出一个信号:返回到第一步对不同的交易记录重新开始这个过程。

当多个应用在运行的时候,在同一时间点仅仅一个应用可以处理某个给定的交易记录是非常重要的。因此,除了在更新条件里包含交易记录的期许状态之外,你还可以在交易记录文档自身中创建一个标记,它能够辨认正在操作这个交易记录文档的应用。使用 findAndModify() 方法在一步内修改交易记录并且获取此记录。

t = db.transactions.findAndModify(
       {
         query: { state: "initial", application: { $exists: false } },
         update:
           {
             $set: { state: "pending", application: "App1" },
             $currentDate: { lastModified: true }
           },
         new: true
       }
    )

改进交易操作以保证仅仅与 application 字段中标识符相匹配的应用程序才能应用该交易记录。

如果在交易执行期间应用 App1 失败的话,你可以使用 recovery procedures ,但是,应用程序应该保证在它们应用该交易之前 “拥有” 这个交易记录。例如查找并且继续挂起的工作,使用一个类似于下面的查询:

var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);

db.transactions.find(
   {
     application: "App1",
     state: "pending",
     lastModified: { $lt: dateThreshold }
   }
)

在生产应用中使用两阶段提交

上面的交易记录是有意的简单。例如,假设总是对一个账户进行回滚操作,账户余额有可能为负值。

生产实现可能会更复杂。通常情况下,账户需要当前余额,待定积分(pending credit)和挂起的借方(pending debits)信息等。

For all transactions, ensure that you use the appropriate level of write concern for your deployment.