充分获知数据库的数据变动是从MongoDB向其他数据服务进行数据同步的关键点。与直接查询collection来获取数据变动相比,通过流式的方式进行监听会有效并及时的多。这是一种非常强大的“响应式编程”模式。随着MongoDB的版本更新,流式的获取方式将变得原来越易用。
让我们来一同回顾一下。在MongoDB3.6之前,如果我们希望对MongoDB数据库中的数据变动进行监听,我们通常是通过 “监听并回放oplog”(“tail the oplog”)的模式(oplog表将会记录复制集中的数据变动)。在生产环境中这种方式(“监听并回放oplog”)通常较为复杂,并且难以保证其稳定与可靠性。
Change Streams and Collections
从MongoDB3.6开始支持的 Change Streams打破了这个僵局。 Change Streams使得数据的变动监听变得简单易用。以下是一个示例,该示例演示了通过Node.js对“movieDetails”表的变动监听。
javascript const MongoClient = require("mongodb").MongoClient; const uri = "MONGODBURL"; const client = new MongoClient(uri, { useNewUrlParser: true }); client.connect().then(db => { const changeStream = client.db("video").collection("movieDetails").watch(); changeStream.on("change", next => { console.log(next); }); });
上述代码首先连接进入了数据实例,并通过watch()函数对“video”库的“movieDetails”表建立了change stream。而后通过.on(“change”,… 建立了一个事件trigger,该事件将监听该change stream上的所有变动并调用对应的后续函数。在上述示例中,监听到变动后将会将变动事件打印出来。下面是我们在 MongoDB Compass中进行对应修改后的输出示例:
javascript { _id: { _data: '825C51D03F0000000129295A1004E515B4338C574BA2B9603CB1C7FB3B0446645F696400645C0EC4B74B052F9E2EF0C3810004' }, operationType: 'replace', clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1548865599 }, fullDocument: { _id: 5c0ec4b74b052f9e2ef0c381, title: 'PS I Love You', year: 2007, ... awards: { wins: 2, nominations: 4, text: '2 wins & 4 nominations.' }, type: 'movie' }, ns: { db: 'video', coll: 'movieDetails' }, documentKey: { _id: 5c0ec4b74b052f9e2ef0c381 } }
在上述的返回中可以快速的找到此次change stream的重要信息,即通过operationType了解到变动的类型,有关完整的返回说明请参考 [Change Events documentation]。当监听某一个collection的时候,operationType的值通常是 insert , update , replace , delete 或 invalidate ,前四种的含义通过名字可以清楚的获知,上述返回的replace类型是我们通过Compass同collection进行replace操作的反馈。
当我们监听的collection被drop、改名或者其所属的db被drop的时候,我们将会看到类型为invalidate的operationType。于此同时这也意味着是时候关闭change stream了。上述返回中剩下的部分是变动的详细信息,变动发生在什么namespace,数据是什么样的,何时发生的变更。
以上的示例是在MongoDB4.x版本中生成的,相比3.6版本,4.x版本新增了一个_data字段。该字段是一个恢复token(resume token),应用程序能够在重连后从该点进行继续监听。
Beyond Collections
如果你只需要针对某一个collection进行变动监听,MongoDB3.6就可以满足你的需求,但是对于那些此前通过oplog来进行变动监听的同学,他们的诉求往往是希望监听数据库中的所有变动,以此来将变动应用到其他系统中。MongoDB4.0很好的满足了这个诉求,在4.0版本中我们可以针对若干个数据库或者整个实例(复制集或者sharding)进行变动监听。与watch()
某一个collection不同,4.0中我们可以watch()
某个数据库或者整个实例。
javascript const MongoClient = require("mongodb").MongoClient; const uri ="MONGODBURL"; undefined const client = new MongoClient(uri, { useNewUrlParser: true }); client.connect().then(db => { const changeStream = client.watch(); changeStream.on("change", next => { console.log(next); });
上述示例将对于任何数据库、任何表的任何变动进行输出,这些也不是我们所捕获的全部信息。由于我们将监听范围放到了最广,我们也将会看到在删除collection时候的删除事件、删除数据库的时间以及重命名collection的事件。
What Next?
我们可以根据实际需要选择监听某一个collection的变动、或者某个数据库中所有collection的变动又或者是整个实例中所有的数据库与collection的变动。需要注意的是创建新collection、数据库的变动将不会被直接监听到,不过我们可以通过变动中的内容间接获知。
当然,这也不是什么大问题,如果我们希望监听数据库或者collection的创建,我们可以通过变动内容中的collection来判断是否该表为此前未创建的新表这一方法进行。另外,索引的创建由于不是为表数据变动也不会被监听捕获。
MongoDB4.0为我们带来了一个全新且强大的数据变动监听方式,尤其是该方式可以实时进行变动捕获。我们十分建议你去尝试下这个功能。Change Stream的详细文档可以参考[Change Streams]。如果你还未安装MongoDB4.0实例,你也可以在MongoDB Atlas中[注册]并获取M0的免费集群节点进行学习和测试。
译者:周李洋
Teambition运维总监
MongoDB中文社区联席主席
我一直很疑惑这个watch的工作方式
如果有两个watch 是否两个watch都能接收到相同的内容?
watch能否以指定的时间开始获取监听呢?
如果基于watch做跨机房数据同步 那么数据回环问题如何解决呢?
4.x版本 _data 很棒!
1. 是
2. 第一次没有办法指定,以后都可以从指定的断点开始
3. 解释一下”数据回环问题”具体是什么问题?