在应用程序里面,开启数据库或集合上的监听,一旦捕获到数据变更事件,就会产生变更流数据(类型为文档),变更流里面包含具体的动作(如 insert、delete、update 等)和变更的文档,应用程序可以将此变更流数据发送到下游系统,由下游系统进一步处理(如完成下游系统相应数据变更,实现数据实时同步)。
本质上,Change Streams 特性,可以完成与 Kafka 或 RabbitMQ 等消息组件类似的功能,这样当需要将 MongoDB 集群中的数据,向异构系统实时同步时,我们就不需要额外再部署一套类似 Kafka 等消息处理的集群了。
Change Streams 整体流程如下图所示:
复制集与下游系统间的数据同步依赖于实时生成的变更流数据,实时流数据的格式为文档类型,包含如下字段:
{
_id : { <BSON Object> }, //已打开的变更流标识,可以作为值赋给参数resumeAfter,用来后续恢复此变更流
"operationType" : "<operation>", //发生的变更操作类型,如:insert、delete、update等
"fullDocument" : { <document> }, //变更操作所涉及的完整文档数据,删除操作里面没有这个字段
"ns" : {
"db" : "<database>", //变更操作发生在哪个数据库上
"coll" : "<collection>" //变更操作发生在哪个集合上
},
"to" : { //当操作类型为rename时,才显示这几个字段
"db" : "<database>", //变更后的新数据库名
"coll" : "<collection>" //变更后的新集合名
},
"documentKey" : { "_id" : <value> }, //变更操作所涉文档的_id字段值
"updateDescription" : { //修改操作描述
"updatedFields" : { <document> }, //修改操作修改了什么字段及值
"removedFields" : [ "<field>", ... ] //修改操作删除了什么字段及值
}
"clusterTime" : <Timestamp>, //变更操作对应的Oplog日志条目上的时间
"txnNumber" : <NumberLong>, //如果变更操作在一个多文档事务里面执行,则显示此字段及值,表示事务的编号
"lsid" : { //表示事务所在的Session相关信息
"id" : <UUID>,
"uid" : <BinData>
}
}
打开一个实时数据流,会返回一个 cursor,变更的数据可以通过循环遍历 cursor 获得,相当于打开一个水龙头,水会源源不断地流过来。
针对不同编程语言的驱动,MongoDB 都提供了相应的 API 来打开实时数据流,下面以 Python 为例子进行说明,如下客户端应用代码:
from pymongo import MongoClient
import pprint
client=MongoClient('mongodb://192.168.85.128:60001,192.168.85.128:60002, 192.168.85.128:60003/?replicaSet=rs0')
db = client.crm
cursor = db.inventory.watch()
for doc in cursor:
print(doc)
其中,语句db.inventory.watch()
表示打开一个实时变更流,监听集合 inventory 上的任何数据变化。
for 循环语句对游标循环遍历,实时打印变更流里面的文档。
先运行上面的代码,再通过 mongo 连接到复制集,模拟向 inventory 集合插入、修改、删除数据,观察上面的代码是否能实时输出流数据。
插入数据语句如下:
rs0:PRIMARY> db.inventory.insert({ "_id" : 20, "model" : "SIM", "count" : 1000})
如果实时输出如下流数据,说明打开的实时数据流是正确的:
{'operationType': 'insert', 'clusterTime': Timestamp(1594645788, 1), 'ns': {'coll': 'inventory', 'db': 'crm'}, 'documentKey': {'_id': 20.0}, 'fullDocument': {'model': 'SIM', '_id': 20.0, 'count': 1000.0}, '_id': {'_typeBits': b'@', '_data': '825F0C5D1C000000012B022C0100296E5A10040CBC8551DC064D74B3BFCD35FAF2377D461E5F6964002B280004'}}
同理,测试删除数据,如下语句:
rs0:PRIMARY> db.inventory.deleteOne({"_id":20})
也能实时输出如下信息:
{'operationType': 'delete', '_id': {'_typeBits': b'@', '_data': '825F0C5E3A000000012B022C0100296E5A10040CBC8551DC064D74B3BFCD35FAF2377D461E5F6964002B280004'}, 'clusterTime': Timestamp(1594646074, 1), 'ns': {'coll': 'inventory', 'db': 'crm'}, 'documentKey': {'_id': 20.0}}
*注意:删除变更操作,输出流数据不包含字段'fullDocument'
最后,测试下修改数据,如下语句:
rs0:PRIMARY> db.inventory.update({"_id":19},{$set:{"count":2999}})
实时输出如下流数据:
{'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'count': 2999.0}}, 'clusterTime': Timestamp(1594646292, 1), 'ns': {'coll': 'inventory', 'db': 'crm'}, 'documentKey': {'_id': 19.0}, '_id': {'_typeBits': b'@', '_data':'825F0C5F14000000012B022C0100296E5A10040CBC8551DC064D74B3BFCD35FAF2377D461E5F6964002B260004'}}
注意:默认情况下对于 update 操作,输出的实时流数据也不会包含字段'fullDocument'
;但是可以在打开变更流的方法里传入可选参数full_document= 'updateLookup'
实现输出的实时流数据包含'fullDocument'
字段及值,如带参数语句:cursor = db.inventory.watch(full_document='updateLookup')
在有些场景下,需要控制实时流的输出,希望将不同的流数据传给不同的下游系统进行处理,类似快递公司的包裹分拣系统,将送往不同地方的包裹分开,如下图所示:
MongoDB提供了一种管道模式来处理这些数据流,当流数据经过预先配置好的管道时,数据会依次被管道中的每一个步骤进行处理。
这种数据处理模式与MongoDB自带的管道模式聚集框架类似。
如下代码示例:
from pymongo import MongoClient
import pprint
client= MongoClient('mongodb://192.168.85.128:60001,192.168.85.128:60002, 192.168.85.128:60003/?replicaSet=rs0')
db = client.crm
pipeline = [
{'$match':{'fullDocument.model':'SIM'}},
{'$addFields':{'newField':'this is an added field'}}
]
cursor = db.inventory.watch(pipeline=pipeline)
for doc in cursor:
print(doc)
先构造一个管道,然后在打开实时数据流时传入管道参数。
通过管道参数,从数据流里过滤出满足'fullDocument.model':'SIM'
条件的数据流,然后再向数据流添加一个额外的'newField'
字段。经过管道处理后的数据流可以被下游系统作进一步处理。
针对 MongoDB 4.2 版本,其它还可被使用的管道操作符有:$project
、$replaceRoot
、$replaceWith
、$redact
、$set
、$unset
注意:上面代码对实时数据流的处理只是简单的循环打印,如果需将数据实时同步到其它系统中,如 MySQL、Hbase 等,需要应用开发人员进一步编写相应的逻辑代码进行处理。
关于作者:郭远威
评论前必须登录!
注册