翻译或纠错本页面

对增量数据做Map-Reduce

Map-reduce操作可以处理复杂的聚合任务。通过 mapReduce 命令和 mongo 命令行下封装好的 db.collection.mapReduce() 方法,都可以执行map-reduce操作。

如果数据集的记录数是不断增长的,你可能希望只对增量数据做map-reduce,而不是在整个数据集上面执行 map-reduce 。

对增量数据做map-reduce:

  1. 在当前的集合上执行 map-reduce ,并把结果输出到其他的集合。

  2. 当你有增量数据需要处理时,运行另外的 map-reduce:

    • query 参数中指定只查询新的增量文档:

    • out 参数中指定 reduce 函数来合并结果并输出到指定的集合中。

例如,你计划每天晚上在集合 sessions 上执行一次 map-reduce 操作。

创建数据

The sessions collection contains documents that log users’ sessions each day, for example:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );

db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );

在当前集合初始化Map-Reduce

首次运行map-reduce:

  1. 定义map函数,它映射用户的数据到新的文档中,新文档的字段有 userid , total_time , count , 和 avg_time :

    var mapFunction = function() {
                          var key = this.userid;
                          var value = {
                                        userid: this.userid,
                                        total_time: this.length,
                                        count: 1,
                                        avg_time: 0
                                       };
    
                          emit( key, value );
                      };
    
  2. 定义使用两个参数 keyvalues reduce函数,该函数会计算总时间和总个数。 key 对应的就是 useridvalues 是一个数组,其中的元素就是上一步 mapFunction 函数输出的文档。

    var reduceFunction = function(key, values) {
    
                            var reducedObject = {
                                                  userid: key,
                                                  total_time: 0,
                                                  count:0,
                                                  avg_time:0
                                                };
    
                            values.forEach( function(value) {
                                                  reducedObject.total_time += value.total_time;
                                                  reducedObject.count += value.count;
                                            }
                                          );
                            return reducedObject;
                         };
    
  3. 定义使用两个参数 keyreducedValue 的完成函数。该方法在 reducedValue 文档中增加 average 字段,并返回修改后的文档。

    var finalizeFunction = function (key, reducedValue) {
    
                              if (reducedValue.count > 0)
                                  reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    
                              return reducedValue;
                           };
    
  4. session 集合上执行map-reduce,包含 mapFunctionreduceFunction ,和 finalizeFunction 函数。将结果输出到 session_stat 集合中。如果 session_stat 集合已经存在,本次操作会先删除旧的内容:

    db.sessions.mapReduce( mapFunction,
                           reduceFunction,
                           {
                             out: "session_stat",
                             finalize: finalizeFunction
                           }
                         )
    

对增量数据做Map-Reduce

以后,如果 sessions 有了新的数据记录,你可以运行增量数据的map-recude操作。例如,在 sessions 集合增加一些新的文档:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );

At the end of the day, perform incremental map-reduce on the sessions collection, but use the query field to select only the new documents. Output the results to the collection session_stat, but reduce the contents with the results of the incremental map-reduce:

db.sessions.mapReduce( mapFunction,
                       reduceFunction,
                       {
                         query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
                         out: { reduce: "session_stat" },
                         finalize: finalizeFunction
                       }
                     );