对增量数据做Map-Reduce¶
On this page
Map-reduce操作可以处理复杂的聚合任务。通过 mapReduce 命令和 mongo 命令行下封装好的 db.collection.mapReduce() 方法,都可以执行map-reduce操作。
如果数据集的记录数是不断增长的,你可能希望只对增量数据做map-reduce,而不是在整个数据集上面执行 map-reduce 。
对增量数据做map-reduce:
在当前的集合上执行 map-reduce ,并把结果输出到其他的集合。
当你有增量数据需要处理时,运行另外的 map-reduce:
在 query 参数中指定只查询新的增量文档:
在 out 参数中指定 reduce 函数来合并结果并输出到指定的集合中。
例如,你计划每天晚上在集合 sessions 上执行一次 map-reduce 操作。
创建数据¶
The sessions collection contains documents that log users’ sessions each day, for example:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );
db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );
在当前集合初始化Map-Reduce¶
首次运行map-reduce:
定义map函数,它映射用户的数据到新的文档中,新文档的字段有 userid , total_time , count , 和 avg_time :
var mapFunction = function() { var key = this.userid; var value = { userid: this.userid, total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); };
定义使用两个参数 key 和 values reduce函数,该函数会计算总时间和总个数。 key 对应的就是 userid , values 是一个数组,其中的元素就是上一步 mapFunction 函数输出的文档。
var reduceFunction = function(key, values) { var reducedObject = { userid: key, total_time: 0, count:0, avg_time:0 }; values.forEach( function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; } ); return reducedObject; };
定义使用两个参数 key 和 reducedValue 的完成函数。该方法在 reducedValue 文档中增加 average 字段,并返回修改后的文档。
var finalizeFunction = function (key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; };
在 session 集合上执行map-reduce,包含 mapFunction , reduceFunction ,和 finalizeFunction 函数。将结果输出到 session_stat 集合中。如果 session_stat 集合已经存在,本次操作会先删除旧的内容:
db.sessions.mapReduce( mapFunction, reduceFunction, { out: "session_stat", finalize: finalizeFunction } )
对增量数据做Map-Reduce¶
以后,如果 sessions 有了新的数据记录,你可以运行增量数据的map-recude操作。例如,在 sessions 集合增加一些新的文档:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );
At the end of the day, perform incremental map-reduce on the sessions collection, but use the query field to select only the new documents. Output the results to the collection session_stat, but reduce the contents with the results of the incremental map-reduce:
db.sessions.mapReduce( mapFunction,
reduceFunction,
{
query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
out: { reduce: "session_stat" },
finalize: finalizeFunction
}
);