MongoDB爱好者
垂直技术交流平台

【五分钟了解MongoDB】Change Stream 和MongoDB 4.x

充分获知数据库的数据变动是从MongoDB向其他数据服务进行数据同步的关键点。与直接查询collection来获取数据变动相比,通过流式的方式进行监听会有效并及时的多。这是一种非常强大的“响应式编程”模式。随着MongoDB的版本更新,流式的获取方式将变得原来越易用。

让我们来一同回顾一下。在MongoDB3.6之前,如果我们希望对MongoDB数据库中的数据变动进行监听,我们通常是通过 “监听并回放oplog”(“tail the oplog”)的模式(oplog表将会记录复制集中的数据变动)。在生产环境中这种方式(“监听并回放oplog”)通常较为复杂,并且难以保证其稳定与可靠性。

Change Streams and Collections

从MongoDB3.6开始支持的 Change Streams打破了这个僵局。 Change Streams使得数据的变动监听变得简单易用。以下是一个示例,该示例演示了通过Node.js对“movieDetails”表的变动监听。

javascript
const MongoClient = require("mongodb").MongoClient;
const uri = "MONGODBURL";
const client = new MongoClient(uri, { useNewUrlParser: true });
client.connect().then(db => {
 const changeStream = client.db("video").collection("movieDetails").watch();
 changeStream.on("change", next => {
   console.log(next);
 });
});

上述代码首先连接进入了数据实例,并通过watch()函数对“video”库的“movieDetails”表建立了change stream。而后通过.on(“change”,… 建立了一个事件trigger,该事件将监听该change stream上的所有变动并调用对应的后续函数。在上述示例中,监听到变动后将会将变动事件打印出来。下面是我们在 MongoDB Compass中进行对应修改后的输出示例:

javascript
{ _id:   
   { _data:      '825C51D03F0000000129295A1004E515B4338C574BA2B9603CB1C7FB3B0446645F696400645C0EC4B74B052F9E2EF0C3810004' },  operationType: 'replace', 
 clusterTime:   
  Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1548865599 }, 
 fullDocument:
   { _id: 5c0ec4b74b052f9e2ef0c381,
     title: 'PS I Love You',
     year: 2007,
     ...
     awards: { wins: 2, nominations: 4, text: '2 wins & 4 nominations.' },
     type: 'movie' },
  ns: { db: 'video', coll: 'movieDetails' },
  documentKey: { _id: 5c0ec4b74b052f9e2ef0c381 }
 }

在上述的返回中可以快速的找到此次change stream的重要信息,即通过operationType了解到变动的类型,有关完整的返回说明请参考 [Change Events documentation]。当监听某一个collection的时候,operationType的值通常是 insert , update , replace , delete 或 invalidate ,前四种的含义通过名字可以清楚的获知,上述返回的replace类型是我们通过Compass同collection进行replace操作的反馈。

当我们监听的collection被drop、改名或者其所属的db被drop的时候,我们将会看到类型为invalidate的operationType。于此同时这也意味着是时候关闭change stream了。上述返回中剩下的部分是变动的详细信息,变动发生在什么namespace,数据是什么样的,何时发生的变更。

以上的示例是在MongoDB4.x版本中生成的,相比3.6版本,4.x版本新增了一个_data字段。该字段是一个恢复token(resume token),应用程序能够在重连后从该点进行继续监听。

Beyond Collections

如果你只需要针对某一个collection进行变动监听,MongoDB3.6就可以满足你的需求,但是对于那些此前通过oplog来进行变动监听的同学,他们的诉求往往是希望监听数据库中的所有变动,以此来将变动应用到其他系统中。MongoDB4.0很好的满足了这个诉求,在4.0版本中我们可以针对若干个数据库或者整个实例(复制集或者sharding)进行变动监听。与watch()某一个collection不同,4.0中我们可以watch()某个数据库或者整个实例。

javascript
const MongoClient = require("mongodb").MongoClient;
const uri ="MONGODBURL";
undefined
const client = new MongoClient(uri, { useNewUrlParser: true });
client.connect().then(db => {
 const changeStream = client.watch();
 changeStream.on("change", next => {
   console.log(next);
});

上述示例将对于任何数据库、任何表的任何变动进行输出,这些也不是我们所捕获的全部信息。由于我们将监听范围放到了最广,我们也将会看到在删除collection时候的删除事件、删除数据库的时间以及重命名collection的事件。

What Next?

我们可以根据实际需要选择监听某一个collection的变动、或者某个数据库中所有collection的变动又或者是整个实例中所有的数据库与collection的变动。需要注意的是创建新collection、数据库的变动将不会被直接监听到,不过我们可以通过变动中的内容间接获知。

当然,这也不是什么大问题,如果我们希望监听数据库或者collection的创建,我们可以通过变动内容中的collection来判断是否该表为此前未创建的新表这一方法进行。另外,索引的创建由于不是为表数据变动也不会被监听捕获。

MongoDB4.0为我们带来了一个全新且强大的数据变动监听方式,尤其是该方式可以实时进行变动捕获。我们十分建议你去尝试下这个功能。Change Stream的详细文档可以参考[Change Streams]。如果你还未安装MongoDB4.0实例,你也可以在MongoDB Atlas中[注册]并获取M0的免费集群节点进行学习和测试。

译者:周李洋
Teambition运维总监
MongoDB中文社区联席主席

赞(13)
未经允许不得转载:MongoDB中文社区 » 【五分钟了解MongoDB】Change Stream 和MongoDB 4.x

评论 2

评论前必须登录!

 

  1. #1

    我一直很疑惑这个watch的工作方式
    如果有两个watch 是否两个watch都能接收到相同的内容?
    watch能否以指定的时间开始获取监听呢?
    如果基于watch做跨机房数据同步 那么数据回环问题如何解决呢?

    4.x版本 _data 很棒!

    794334212@qq.com6年前 (2019-07-06)
    • 1. 是
      2. 第一次没有办法指定,以后都可以从指定的断点开始
      3. 解释一下”数据回环问题”具体是什么问题?

      xica6年前 (2019-07-09)