UNPKG

25.4 kBJavaScriptView Raw
1var EventEmitter = require ('events').EventEmitter,
2 crypto = require ('crypto'),
3 util = require ('util'),
4 urlUtil = require ('url'),
5 spawn = require ('child_process').spawn,
6 mongo = require ('mongodb'),
7 task = require ('./base');
8
9/**
10 * @author
11 * @docauthor
12 * @class task.mongoRequest
13 * @extends task.task
14 *
15 * A class for creating MongoDB-related tasks.
16 *
17 * To use, set {@link #className} to `"mongoRequest"`.
18 *
19 * ### Example
20 *
21 {
22 flows: [{
23 url: "/entity/suggest",
24
25 tasks: [{
26 functionName: "parseFilter',
27 url: "{$request.url}",
28 produce: "data.suggest"
29 }, {
30 className: "mongoRequest",
31 connector: "mongo",
32 collection: "messages",
33 filter: "{$data.suggest.tag}",
34 produce: "data.records"
35 }, {
36 className: "renderTask",
37 type: "json",
38 data: "{$data.records}",
39 output: "{$response}"
40 }]
41 }]
42 }
43 *
44 * @cfg {String} connector (required) The **config name** for connector
45 * in the project configuration object or prepared **MongoDB.Db connection**
46 *
47 * @cfg {String} collection (required) The collection name from MongoDB.
48 *
49 * @cfg {String} [method="run"] The name of the method name to be called
50 * after the task requirements are statisfied.
51 *
52 * Possible values:
53 *
54 * - `run`, selects from the DB
55 * - `insert`, inserts into the DB
56 * - `update`, updates records in the DB
57 * - `remove`, removes records from the DB
58 *
59 * @cfg {String} filter (required) The name of the property of the dataflow
60 * instance or the identifier of an object with filter fields for `select`,
61 * `insert` or `update` methods (see {@link #method}). Filter can be mongo's
62 * ObjectID, ObjectID array (in such case mongo requested with {$in: []})
63 * or real {@link http://www.mongodb.org/display/DOCS/Querying mongo query}
64 */
65var mongoRequestTask = module.exports = function (config) {
66
67 this.timestamp = true;
68 this.insertingSafe = false;
69
70 /* aliases */
71 this.find = this.run;
72
73 this.init (config);
74
75};
76
77mongo.Db.prototype.open = function (callback) {
78 var self = this;
79
80 if (self._state == 'connected') {
81 return callback (null, self);
82 }
83
84 // Set the status of the server
85 if (this.openCalled)
86 self._state = 'connecting';
87
88 // Set up connections
89 if(self.serverConfig instanceof mongo.Server || self.serverConfig instanceof mongo.ReplSet) {
90 if (!this._openCallbacks) this._openCallbacks = [];
91
92 if (callback)
93 this._openCallbacks.push (callback);
94
95 if (!this.openCalled) self.serverConfig.connect(self, {firstCall: true}, function(err, result) {
96
97 if(err != null) {
98 // Return error from connection
99 self.emit ('error', err);
100 self._openCallbacks.map (function (item) {
101 item (err, null);
102 });
103 self._openCallbacks = [];
104 return;
105 }
106 // Set the status of the server
107 self._state = 'connected';
108 // Callback
109 self.emit ('open', self);
110 self._openCallbacks.map (function (item) {
111 item (null, self);
112 });
113 self._openCallbacks = [];
114 return;
115 });
116
117 // Set that db has been opened
118 this.openCalled = true;
119 } else {
120 var err = new Error ("Server parameter must be of type Server or ReplSet");
121 self.emit ('error', err);
122 return callback(err, null);
123 }
124};
125
126
127util.inherits (mongoRequestTask, task);
128
129util.extend (mongoRequestTask.prototype, {
130
131
132 _log : function(){
133 var self = this;
134 if (self.verbose){
135 console.log.apply (console, arguments);
136 }
137 },
138
139 // private method get connector
140
141 _getConnector: function () {
142
143 // connector is real connector object
144 if (!this.connector.substring && this.connector.open)
145 return this.connector;
146
147 // get connector config from project if it created
148 if (project.connectors[this.connector]) {
149 return project.connectors[this.connector];
150 }
151
152 // otherwise create connector from project config and add to project.connectors
153
154 var connectorConfig = project.config.db[this.connector];
155
156 var connOptions;
157 if (!connectorConfig.options)
158 connectorConfig.options = {};
159
160 connOptions = connectorConfig.options;
161
162 /*
163 if (!connOptions.hasOwnProperty('native_parser'))
164 connOptions['native_parser'] = true;
165 */
166
167 if (!connOptions.hasOwnProperty('journal') ||
168 !connOptions.hasOwnProperty('w') ||
169 !connOptions.hasOwnProperty('fsync'))
170 connOptions['journal'] = true;
171
172 // create connector
173 if (this.verbose) {
174 console.log ("new mongo connector:", connectorConfig);
175 }
176
177 var connector = new mongo.Db (
178 connectorConfig.database,
179 new mongo.Server (connectorConfig.host, connectorConfig.port),
180 connOptions
181 );
182
183 project.connectors[this.connector] = connector;
184 project.connections[this.connector] = {};
185
186 return connector;
187 },
188
189 // private method to collection open
190
191 _openCollection: function (cb) {
192
193 var self = this;
194
195 // get db client
196 var client = self._getConnector ();
197
198 if (this.verbose)
199 console.log (
200 'checking project.connections', self.connector, self.collection,
201 project.connections[self.connector][self.collection] === void 0 ? 'not cached' : 'cached'
202 );
203
204 // check collection existing in cache
205 // if collection cached - return through callback this collection
206 if (project.connections[self.connector][self.collection]) {
207 cb.call (self, false, project.connections[self.connector][self.collection]);
208 return;
209 }
210
211 // otherwise open db connection
212 client.open (function (err, p_client) {
213 // get collection
214 client.collection (self.collection, function (err, collection) {
215 if (err) {
216 console.log (err);
217 } else {
218 // add to collections cache
219 if (this.verbose)
220 console.log ('storing project.connections', self.connector, self.collection);
221 project.connections[self.connector][self.collection] = collection;
222 }
223 cb.call (self, err, collection);
224 });
225 });
226 },
227
228 objectId: function () {
229 this.completed(this._objectId(this.id));
230 },
231
232 // private method to create ObjectID
233
234 _objectId: function (hexString) {
235
236 if (!hexString) return null;
237
238 var ObjectID = project.connectors[this.connector].bson_serializer.ObjectID;
239
240 if (hexString.constructor === ObjectID) return hexString;
241
242 var id;
243
244 try {
245 id = new ObjectID(hexString);
246 } catch (e) {
247 console.error(hexString);
248 id = hexString.toString();
249 }
250
251 if (this.verbose) console.log('ObjectID',id);
252
253 return id;
254 },
255
256 // actually, it's a fetch function
257
258 run: function () {
259 var self = this;
260
261 if (this.verbose)
262 self.emit ('log', 'run called');
263
264 // primary usable by Ext.data.Store
265 // we need to return {data: []}
266
267 // open collection
268 self._openColOrFail(function (collection) {
269 var filter = self.filter,
270 options = self.options || {},
271 sort = self.sort || (self.pager && self.pager.sort) || {};
272
273 if (self.pager) {
274 if (self.pager.limit) {
275 options.limit = self.pager.limit;
276 options.page = self.pager.page || 0;
277 //options.skip = self.pager.start || 0;
278 options.skip = self.pager.start || options.limit * options.page;
279 }
280
281 if (!filter) filter = self.pager.filter;
282 }
283
284 options.sort = sort;
285
286 if (self.verbose)
287 console.log ("collection.find >> ", self.collection, filter, options );
288
289 // find by filter or all records
290 if (filter) {
291 if (filter.constructor === Array)
292 filter = {_id: {'$in': filter}};
293 // filter is string
294 if (filter.constructor === String) {
295 filter = {_id: self._objectId (filter)};
296 // filter is hash
297 } else if (filter._id) {
298 // filter._id is string
299 if (filter._id.constructor === String) filter._id = self._objectId (filter._id);
300 // filter._id is hash with $in quantificators
301 if (filter._id['$in']) {
302 filter._id['$in'] = filter._id['$in'].map(function(id) {
303 return self._objectId(id);
304 });
305 }
306 }
307 }
308
309 //remap options fields
310 if (options.fields) {
311 var fields = options.fields,
312 include = fields["$inc"],
313 exclude = fields["$exc"];
314
315 delete fields.$inc;
316 delete fields.$exc;
317
318 if (include) {
319 include.map(function(field) {fields[field] = 1});
320 } else if (exclude) {
321 exclude.map(function(field) {fields[field] = 0})
322 }
323 }
324
325 if (self.verbose)
326 console.log ("collection.find", self.collection, filter, options);
327
328 var cursor = collection.find(filter, options);
329 cursor.toArray (function (err, docs) {
330
331 if (self.verbose)
332 console.log ("findResult", docs && docs.length || 0);
333
334 if (docs) {
335 docs.map (function (item) {
336 if (self.verbose) console.log(item._id);
337 if (self.mapping) {
338 self.mapFields (item);
339 }
340 });
341 }
342
343 cursor.count(function (err, n) {
344 self.completed ({
345 success: (err == null),
346 total: n || 0,
347 err: err,
348 data: docs
349 });
350
351 if (!err & 0 == n) {
352 self.empty();
353 }
354 });
355 });
356 });
357 },
358
359 insert: function () {
360
361 var self = this;
362
363 if (!self.data) self.data = {};
364
365 if (self.verbose) {
366 self.emit ('log', 'insert called ' + self.data);
367 }
368
369 self._openCollection (function (err, collection) {
370
371 if (self.data.constructor != Array) {
372 self.data = [self.data];
373 }
374
375 var docsId = [];
376 self.data = self.data.map(function(item) {
377
378 var clone = util.extend(true, {}, item);
379
380 if (self.timestamp) {
381 clone.created = clone.updated = ~~(new Date().getTime()/1000);
382 }
383 if (clone._id == null || clone._id == '') {
384 delete clone._id;
385 } else {
386 docsId.push(clone._id);
387 }
388
389 return clone;
390
391 });
392
393 /* MODIFIED: optionally check if records already in collection by self.filter, otherwise by _id
394 * if records found :
395 * if self.forceUpdate is true of updateData is provided
396 * : update records using updateData or data
397 * if records not found : insert
398 */
399
400 var filter = self.filter || {_id: {$in: docsId}};
401
402 self._log('Filter: ', filter, ', Update: ', self.updateData);
403
404 if (self.insertingSafe) {
405
406 // find any records alredy stored in db
407
408 self._log('insertingSafe data = ', self.data);
409
410 collection.find(filter).toArray(function (err, alreadyStoredDocs) {
411
412 self._log('Already stored: ', alreadyStoredDocs.length, ' docs');
413
414 if (alreadyStoredDocs.length > 0 && (self.forceUpdate || self.updateData)) {
415
416 var updateData = self.updateData || self.data;
417
418 self._log('Updating @filter: ', filter, ' with: ', updateData);
419
420 if (self.emulate) {
421 console.log('EMULATION: Update');
422 self.completed ({
423 success: true,
424 total: alreadyStoredDocs.length,
425 err: null,
426 data: []
427 });
428
429 if (0 == alreadyStoredDocs.length) {
430 self.empty();
431 }
432
433 return;
434 }
435
436 collection.update(
437 filter, updateData, { safe: true }, Boolean
438 );
439
440 self._log(alreadyStoredDocs);
441
442 self.completed ({
443 success: true,
444 total: alreadyStoredDocs.length,
445 err: false,
446 data: alreadyStoredDocs
447 });
448
449 if (0 == alreadyStoredDocs.length) {
450 self.empty();
451 }
452
453 return;
454
455 } else {
456
457 // build list of new records
458
459 self._log('Really inserting. Creating dataToInsert with unique = ', self.unique);
460
461 var dataToInsert = [];
462
463 /* if self.unique array is provided, its fields are used to check whether doc is already in collection
464 * doc is not inserted only if all unique fields of the new doc are equal to the same fields of the old doc
465 *
466 * if self.unique is not provided checks by _id
467 */
468
469 if (alreadyStoredDocs.length == 0) {
470
471 self.data.map(function (item) { dataToInsert.push(item) });
472
473 } else {
474
475 if (!self.unique) {
476
477 var alreadyStoredDocsIds = {};
478
479 alreadyStoredDocs.map (function(item) {
480 alreadyStoredDocsIds[item._id] = true;
481 });
482
483 self.data.map(function(item) {
484 if (!alreadyStoredDocsIds[item._id]) dataToInsert.push(item);
485 });
486
487 } else {
488 var unique = self.unique;
489 if ( !(unique instanceof Array) ) unique = [unique];
490
491 dataToInsert = self.data.filter(function(item) {
492 var uniqueField;
493 for (var k = 0; k < alreadyStoredDocs.length; k++) {
494 for (var l = 0; l < unique.length; l++) {
495 uniqueField = unique[l];
496 if (alreadyStoredDocs[k][uniqueField] != item[uniqueField]) return true;
497 }
498 }
499 return false;
500 });
501
502 }
503 }
504
505 if (dataToInsert.length == 0) {
506
507 self._log('Nothing to insert');
508
509 self.completed ({
510 success: (err == null),
511 total: alreadyStoredDocs.length,
512 err: err || null,
513 data: alreadyStoredDocs
514 });
515
516 if (!err && 0 == alreadyStoredDocs.length) {
517 self.empty();
518 }
519
520 return;
521 }
522
523
524 self._log('Perform insert of ', dataToInsert.length, ' items', dataToInsert);
525
526 if (self.emulate) {
527 console.log('EMULATION: Insert Safe');
528 self.completed ({
529 success: true,
530 total: 1,
531 err: null,
532 data: []
533 });
534 return;
535 }
536
537 collection.insert (dataToInsert, {safe: true}, function (err, docs) {
538
539 if (docs) docs.map (function (item) {
540 if (self.mapping) {
541 self.mapFields (item);
542 }
543 });
544
545 self._log('inserted ', docs, err);
546
547 var insertedRecords = alreadyStoredDocs.concat(docs);
548
549
550 self.completed ({
551 success: (err == null),
552 total: (insertedRecords && insertedRecords.length) || 0,
553 err: err || null,
554 data: insertedRecords
555 });
556
557 if (!err && 0 == insertedRecords.length) {
558 self.empty();
559 }
560 });
561 }
562
563 }); //collection.find(filter).toArray
564
565 } else {
566
567 if (self.emulate) {
568 console.log('EMULATION: Insert');
569 self.completed ({
570 success: true,
571 total: 1,
572 err: null,
573 data: []
574 });
575 return;
576 }
577 collection.insert (self.data, {safe: true}, function (err, docs) {
578
579 // TODO: check two parallels tasks: if one from its completed, then dataflow must be completed (for exaple mongo & ldap tasks)
580 if (this.verbose)
581 console.log ('collection.insert', docs, err);
582
583 if (docs) docs.map (function (item) {
584 if (self.mapping) {
585 self.mapFields (item);
586 }
587 });
588
589 if (err) {
590 console.error(err);
591 }
592
593 self.completed ({
594 success: (err == null),
595 total: (docs && docs.length) || 0,
596 err: err || null,
597 data: docs
598 });
599
600 if (!err && 0 == docs.length) {
601 self.empty();
602 }
603 });
604
605 }
606
607 });
608 },
609
610/**
611 * Params:
612 *
613 * @cfg {Object} criteria - object for select updating object (see MongoDB docs).
614 *
615 * @cfg {Array} criteriaFields - this array must contains field names, by wich will
616 * be constructed criteriaObj. This parameter is for updating many records.
617 *
618 * @cfg {Array | Object} data - main data container.
619 *
620 * @cfg {Object} modify - object {operation: [fieldName], ...} for modifying data,
621 * f.e. {$push: ['comment'], $set: ['title']}
622 *
623 * @cfg {Array} options (upsert, multi, safe)
624 *
625 */
626
627 update: function () {
628
629 var self = this,
630 options = self.options || {},
631 idList,
632 total = 0,
633 success = 0,
634 failed = 0,
635 criteriaFields = self.criteriaFields || ["_id"];
636
637 var callback = function (err) {
638
639 if (idList.length > 1) { // many records
640
641 total++;
642
643 if (err) {
644 failed++
645 } else {
646 success++;
647 }
648
649 if (total == idList.length) {
650 if (total == success) {
651 if (self.verbose) self.emit('log', 'Updated IDs', idList);
652 self.completed({
653 _id: { $in: idList }
654 });
655 } else {
656 self.failed({
657 msg: 'Not all records updated',
658 failed: failed,
659 total: total,
660 success: success
661 });
662 }
663 }
664
665 } else { // single object
666
667 if (err) {
668 self.failed(err);
669 } else {
670
671 self.completed ({
672 _id: idList[0]
673 });
674
675 if (0 == idList.length) {
676 self.empty();
677 }
678 }
679 }
680
681 };
682
683 if (self.verbose)
684 self.emit ('log', 'update called ', self.data);
685
686 self._openCollection (function (err, collection) {
687
688 // wrap single record to array
689
690 if (self.data.constructor != Array) {
691 self.data = [self.data];
692 }
693
694 idList = self.data.map (function (item) {
695
696 if (item._id || self.criteria || options.upsert) {
697
698 // clone before update
699
700 var set = util.extend(true, {}, item);
701 delete set._id;
702
703 // criteriaObj
704
705 var criteriaObj;
706
707 if (!self.criteria) {
708
709 // default by _id or by defined first level fields just
710
711 criteriaObj = {};
712
713 criteriaFields.forEach(function(fieldName) {
714
715 if (fieldName == "_id") {
716 if (item.hasOwnProperty(fieldName))
717 criteriaObj[fieldName] = self._objectId(item[fieldName]);
718 } else {
719 if (set.hasOwnProperty(fieldName))
720 criteriaObj[fieldName] = set[fieldName];
721 }
722
723 });
724
725 } else {
726 criteriaObj = self.criteria;
727 }
728
729 // newObj
730
731 var newObj;
732
733 if (self.modify) {
734
735 newObj = {};
736 var modify = self.modify;
737
738 for (var m in modify) {
739 newObj[m] = {};
740
741 modify[m].map(function(field) {
742 newObj[m][field] = set[field];
743 delete set[field];
744 });
745 }
746
747 if (!('$set' in modify)) {
748 newObj.$set = set;
749 }
750
751 } else {
752 newObj = (self.replace) ? (set) : ({$set: set});
753 }
754
755 // set timestamp
756
757 if (self.timestamp) {
758 var timestamp = ~~(new Date().getTime()/1000);
759 if (newObj.$set) newObj.$set.updated = timestamp;
760 else newObj.updated = timestamp;
761 }
762
763 // safe
764
765 options.safe = true;
766
767 // show input params
768 if (self.verbose)
769 console.log('collection.update ', criteriaObj, newObj, options);
770
771 // do update
772 collection.update(criteriaObj, newObj, options, callback);
773
774 // return Id for map operation
775 return item._id;
776
777 } else {
778 // something wrong. this couldn't happen
779 self.emit ('log', 'strange things with _id: "'+item._id+'"');
780 }
781
782 return null;
783 });
784 });
785 },
786
787
788 remove: function () {
789 var self = this,
790 ids;
791
792 self.options = self.options || { safe: true };
793
794 if (self.verbose) {
795 self.emit('log', 'remove called ', self.data);
796 }
797
798 if (!Object.is('Array', self.data)) {
799 self.data = [self.data];
800 }
801
802 ids = self.data.filter(function (item) {
803 return null != item._id;
804 });
805
806 if (self.data.length != ids.length && ids.length == 0) {
807
808 ids = self.data.filter(function (id) {
809 return null != id;
810 }). map(function (id) {
811 return self._objectId(id);
812 });
813
814 } else {
815
816 ids = ids.map(function (item) {
817 return self._objectId(item._id);
818 });
819
820 }
821
822 self._openCollection(function (err, collection) {
823
824 if (self.verbose) {
825 console.log('remove by filter', {
826 _id: { $in: ids }
827 })
828 }
829
830 collection.remove({
831 _id: { $in: ids }
832 }, self.options, function (err, records) {
833 self.completed ({
834 err: err,
835 success: err == null,
836 total: records.length,
837 data: records
838 });
839 });
840 });
841 },
842
843 removeAll: function () {
844 var self = this;
845
846 self.options = self.options || { safe: true };
847
848 if (self.verbose) {
849 self.emit('log', 'removeAll');
850 }
851
852 self._openCollection(function (err, collection) {
853 collection.remove({
854 }, self.options, function (err, records) {
855 self.completed ({
856 err: err,
857 success: err == null,
858 total: records.length,
859 data: records
860 });
861 });
862 });
863 },
864
865 emitError: function (e) {
866 if (e) {
867 this.state = 5;
868 this.emit('error', e);
869 this.cancel();
870 return true;
871 } else {
872 return false;
873 }
874 },
875
876 readGridFS: function () {
877 var self = this;
878 this.openGridFS('r', function (gs) {
879 gs.read(function (err, data) {
880 if (err) {
881 self.failed(err);
882 } else {
883 self.completed(data);
884 }
885 });
886 });
887 },
888
889 pipeGridFS: function () {
890 var self = this;
891 var toStream = this.toStream;
892
893 this.openGridFS('r', function (gs) {
894 var stream = gs.stream(true);
895
896 stream.on('end', function () {
897 self.completed(stream);
898 });
899
900 stream.on('error', function (err) {
901 self.failed(err);
902 });
903
904 stream.pipe(toStream);
905 });
906 },
907
908 writeGridFS: function () {
909 var self = this;
910 var data = this.fileData;
911
912 this.openGridFS('w', function (gs) {
913 gs.write(data, function (err) {
914 if (err) {
915 self.failed(err);
916 } else {
917 gs.close(function (err, result) {
918 if (err) {
919 self.failed(err);
920 } else {
921 self.completed(result);
922 }
923 });
924 }
925 });
926 });
927 },
928
929 openGridFS: function (mode, cb) {
930 var self = this;
931 var options = this.options;
932 var fileName = this.fileName;
933
934 this.connector = 'mongo';
935 var db = this._getConnector();
936
937 db.open(function (err, db) {
938 var gs = new mongo.GridStore(db, fileName, mode, options);
939
940 gs.open(function (err, gs) {
941 if (err) {
942 self.failed(err);
943 } else {
944 cb(gs);
945 }
946 });
947
948 });
949 },
950
951 createDbRef: function () {
952 var self = this;
953 var DBRef = project.connectors[
954 this.connector
955 ].bson_serializer.DBRef;
956 var data = this.data;
957 var colName = this.refCollection;
958
959 var createRef = function (item) {
960 return new DBRef(
961 colName, self._objectId(item._id)
962 );
963 };
964
965 try {
966 if (data instanceof Array) {
967 var refs = data.map(createRef);
968 } else {
969 refs = createRef(data);
970 }
971
972 this.completed(refs);
973 } catch (e) {
974 this.failed(e);
975 }
976 },
977
978/**
979 * Run a group command across a collection
980 *
981 * @param {Object|Array|Function|Code} keys an object, array or function expressing the keys to group by.
982 * @param {Object} condition an optional condition that must be true for a row to be considered.
983 * @param {Object} initial initial value of the aggregation counter object.
984 * @param {Function|Code} reduce the reduce function aggregates (reduces) the objects iterated
985 * @param {Function|Code} finalize an optional function to be run on each item in the result set just before the item is returned.
986 * @param {Boolean} command specify if you wish to run using the internal group command or using eval, default is true.
987 * @param {Object} [options] additional options during update.
988 * @param {Function} callback returns the results.
989 * @return {null}
990 * @api public
991 * @group(keys, condition, initial, reduce, finalize, command, options, callback)
992 */
993
994 group: function () {
995
996 var self = this;
997
998 self._openColOrFail(function (collection) {
999
1000 collection.group(self.keys, self.condition, self.initial, self.reduce, self._onResult.bind(self));
1001
1002 });
1003 },
1004
1005/**
1006 * Run Map Reduce across a collection. Be aware that the inline option for out will return an array of results not a collection.
1007 *
1008 * Options
1009 * - **out** {Object, default:*{inline:1}*}, sets the output target for the map reduce job. *{inline:1} | {replace:'collectionName'} | {merge:'collectionName'} | {reduce:'collectionName'}*
1010 * - **query** {Object}, query filter object.
1011 * - **sort** {Object}, sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces.
1012 * - **limit** {Number}, number of objects to return from collection.
1013 * - **keeptemp** {Boolean, default:false}, keep temporary data.
1014 * - **finalize** {Function | String}, finalize function.
1015 * - **scope** {Object}, can pass in variables that can be access from map/reduce/finalize.
1016 * - **jsMode** {Boolean, default:false}, it is possible to make the execution stay in JS. Provided in MongoDB > 2.0.X.
1017 * - **verbose** {Boolean, default:false}, provide statistics on job execution time.
1018 * - **readPreference** {String, only for inline results}, the preferred read preference (Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
1019 *
1020 * @param {Function|String} map the mapping function.
1021 * @param {Function|String} reduce the reduce function.
1022 * @param {Objects} [options] options for the map reduce job.
1023 * @return {Objects} returns the result of the map reduce job, (error, results, [stats])
1024 */
1025
1026 mapReduce: function () {
1027 var self = this;
1028
1029 var options = self.options || {};
1030 options.out = { inline: 1 }; // override any external out defenition
1031
1032 self._openColOrFail(function (collection) {
1033 collection.mapReduce(
1034 self.map, self.reduce, options,
1035 self._onResult.bind(self)
1036 );
1037 });
1038 },
1039
1040 _openColOrFail: function (callback) {
1041 this._openCollection(function (err, collection) {
1042 if (err) {
1043 this.failed(err);
1044 } else {
1045 callback.call(this, collection);
1046 }
1047 });
1048 },
1049
1050 _onResult: function (err, data) {
1051 if (err) {
1052 this.failed();
1053 } else {
1054 this.completed({
1055 success: true,
1056 err: data && data.errmsg,
1057 data: data,
1058 total: data ? data.length : 0
1059 });
1060
1061 if (!data || 0 == data.length) {
1062 this.empty();
1063 }
1064 }
1065 },
1066
1067 aggregate: function () {
1068 this._openColOrFail(function (collection) {
1069 collection.aggregate(this.params, this._onResult.bind(this));
1070 });
1071 },
1072
1073 GET: function () {
1074 this.run();
1075 },
1076
1077 POST: function () {
1078 this._openColOrFail(function (collection) {
1079 collection.update(
1080 this.criteria || {},
1081 this.data || {},
1082 this.options || {},
1083 this._onResult.bind(this)
1084 );
1085 });
1086 },
1087
1088 PUT: function () {
1089 this._openColOrFail(function (collection) {
1090 collection.insert(
1091 this.data || {},
1092 this.options || {},
1093 this._onResult.bind(this)
1094 );
1095 });
1096 }
1097});