MongoDB爱好者
垂直技术交流平台

技术干货 | 如何利用 MongoDB Change Streams 实现数据实时同步?

当前实时数据同步的应用场景较多,实现方式主要有两种,一是数据库厂家本身提供了实时数据捕获工具,如 Oracle 的 OGG 等;另外一种是实时解析数据库的事务日志,获取到实时变化的数据后进行同步,如 Flink CDC 等。
对于 MongoDB 复制集来说,默认情况下,成员间通过 Oplog 实现的数据同步是有延迟的。因此,为了实现数据的实时同步,且能将数据同步到异构系统中,从3.6版本开始,MongoDB 提供了 Change Steams 功能,允许用户非常方便地将实时变更数据同步到下游系统进行处理
其实在3.6版本之前,如果要实现这种实时同步,开发人员也可以通过实时解析复制集 Oplog 里面的日志条目来完成,只不过这种方式需要额外开发代码,实现起来较复杂。
实现原理

在应用程序里面,开启数据库或集合上的监听,一旦捕获到数据变更事件,就会产生变更流数据(类型为文档),变更流里面包含具体的动作(如 insert、delete、update 等)和变更的文档,应用程序可以将此变更流数据发送到下游系统,由下游系统进一步处理(如完成下游系统相应数据变更,实现数据实时同步)。

本质上,Change Streams 特性,可以完成与 Kafka 或 RabbitMQ 等消息组件类似的功能,这样当需要将 MongoDB 集群中的数据,向异构系统实时同步时,我们就不需要额外再部署一套类似 Kafka 等消息处理的集群了。

Change Streams 整体流程如下图所示:

Change Streams 整体流程

可以看到直接打开 MongoDB 的 Change Streams 变更流监听,就可以实现向异构下游系统实时同步数据。。
实时流数据的格式

复制集与下游系统间的数据同步依赖于实时生成的变更流数据,实时流数据的格式为文档类型,包含如下字段:

{
   _id : { <BSON Object> },        //已打开的变更流标识,可以作为值赋给参数resumeAfter,用来后续恢复此变更流
   "operationType" : "<operation>",        //发生的变更操作类型,如:insert、delete、update等
   "fullDocument" : { <document> },      //变更操作所涉及的完整文档数据,删除操作里面没有这个字段
   "ns" : {   
      "db" : "<database>",         //变更操作发生在哪个数据库上
      "coll" : "<collection>"     //变更操作发生在哪个集合上
   },
   "to" : {   //当操作类型为rename时,才显示这几个字段
      "db" : "<database>",         //变更后的新数据库名
      "coll" : "<collection>"     //变更后的新集合名
   },
   "documentKey" : { "_id" : <value> },  //变更操作所涉文档的_id字段值
   "updateDescription" : {    //修改操作描述
      "updatedFields" : { <document> },  //修改操作修改了什么字段及值
      "removedFields" : [ "<field>", ... ]     //修改操作删除了什么字段及值
   }
   "clusterTime" : <Timestamp>,     //变更操作对应的Oplog日志条目上的时间
   "txnNumber" : <NumberLong>,    //如果变更操作在一个多文档事务里面执行,则显示此字段及值,表示事务的编号
   "lsid" : {          //表示事务所在的Session相关信息
      "id" : <UUID>,  
      "uid" : <BinData> 
   }
}

打开实时数据流

打开一个实时数据流,会返回一个 cursor,变更的数据可以通过循环遍历 cursor 获得,相当于打开一个水龙头,水会源源不断地流过来。

针对不同编程语言的驱动,MongoDB 都提供了相应的 API 来打开实时数据流,下面以 Python 为例子进行说明,如下客户端应用代码:

from pymongo import MongoClient
import pprint
client=MongoClient('mongodb://192.168.85.128:60001,192.168.85.128:60002, 192.168.85.128:60003/?replicaSet=rs0')
db = client.crm
cursor = db.inventory.watch() 
for doc in cursor:
        print(doc)

其中,语句db.inventory.watch()表示打开一个实时变更流,监听集合 inventory 上的任何数据变化。

for 循环语句对游标循环遍历,实时打印变更流里面的文档。

先运行上面的代码,再通过 mongo 连接到复制集,模拟向 inventory 集合插入、修改、删除数据,观察上面的代码是否能实时输出流数据。

插入数据语句如下:

rs0:PRIMARY> db.inventory.insert({ "_id" : 20, "model" : "SIM", "count" : 1000})

如果实时输出如下流数据,说明打开的实时数据流是正确的:

{'operationType': 'insert', 'clusterTime': Timestamp(1594645788, 1), 'ns': {'coll': 'inventory', 'db': 'crm'}, 'documentKey': {'_id': 20.0}, 'fullDocument': {'model': 'SIM', '_id': 20.0, 'count': 1000.0}, '_id': {'_typeBits': b'@', '_data': '825F0C5D1C000000012B022C0100296E5A10040CBC8551DC064D74B3BFCD35FAF2377D461E5F6964002B280004'}}

同理,测试删除数据,如下语句:

rs0:PRIMARY> db.inventory.deleteOne({"_id":20})

也能实时输出如下信息:

{'operationType': 'delete', '_id': {'_typeBits': b'@', '_data': '825F0C5E3A000000012B022C0100296E5A10040CBC8551DC064D74B3BFCD35FAF2377D461E5F6964002B280004'}, 'clusterTime': Timestamp(1594646074, 1), 'ns': {'coll': 'inventory', 'db': 'crm'}, 'documentKey': {'_id': 20.0}}

*注意:删除变更操作,输出流数据不包含字段'fullDocument'

最后,测试下修改数据,如下语句:

rs0:PRIMARY> db.inventory.update({"_id":19},{$set:{"count":2999}})

实时输出如下流数据:

{'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'count': 2999.0}}, 'clusterTime': Timestamp(1594646292, 1), 'ns': {'coll': 'inventory', 'db': 'crm'}, 'documentKey': {'_id': 19.0}, '_id': {'_typeBits': b'@', '_data':'825F0C5F14000000012B022C0100296E5A10040CBC8551DC064D74B3BFCD35FAF2377D461E5F6964002B260004'}}

注意:默认情况下对于 update 操作,输出的实时流数据也不会包含字段'fullDocument';但是可以在打开变更流的方法里传入可选参数full_document= 'updateLookup'实现输出的实时流数据包含'fullDocument'字段及值,如带参数语句:cursor = db.inventory.watch(full_document='updateLookup')

控制实时流数据的输出

在有些场景下,需要控制实时流的输出,希望将不同的流数据传给不同的下游系统进行处理,类似快递公司的包裹分拣系统,将送往不同地方的包裹分开,如下图所示:

 

MongoDB提供了一种管道模式来处理这些数据流,当流数据经过预先配置好的管道时,数据会依次被管道中的每一个步骤进行处理。

这种数据处理模式与MongoDB自带的管道模式聚集框架类似。

如下代码示例:

from pymongo import MongoClient
import pprint
client= MongoClient('mongodb://192.168.85.128:60001,192.168.85.128:60002, 192.168.85.128:60003/?replicaSet=rs0')
db = client.crm
pipeline = [
    {'$match':{'fullDocument.model':'SIM'}},
    {'$addFields':{'newField':'this is an added field'}}
]
cursor = db.inventory.watch(pipeline=pipeline)
for doc in cursor:
        print(doc)

先构造一个管道,然后在打开实时数据流时传入管道参数。

通过管道参数,从数据流里过滤出满足'fullDocument.model':'SIM'条件的数据流,然后再向数据流添加一个额外的'newField'字段。经过管道处理后的数据流可以被下游系统作进一步处理。

针对 MongoDB 4.2 版本,其它还可被使用的管道操作符有:$project$replaceRoot$replaceWith$redact$set$unset

注意:上面代码对实时数据流的处理只是简单的循环打印,如果需将数据实时同步到其它系统中,如 MySQL、Hbase 等,需要应用开发人员进一步编写相应的逻辑代码进行处理。

 

关于作者:郭远威

MongoDB 中文社区长沙分会主席,资深大数据架构师,著有《大数据存储MongoDB实战指南》一书。通信行业业务架构与数据迁移专家,先后在华为、中兴工作十余年;曾负责实施了海外多个运营商的大数据迁移及 BI 等大数据系统的设计开发。

赞(10)
未经允许不得转载:MongoDB中文社区 » 技术干货 | 如何利用 MongoDB Change Streams 实现数据实时同步?

评论 抢沙发

评论前必须登录!