UNPKG

35.2 kBJavaScriptView Raw
1'use strict';
2
3const Long = require('../core').BSON.Long;
4const MongoError = require('../core').MongoError;
5const ObjectID = require('../core').BSON.ObjectID;
6const BSON = require('../core').BSON;
7const MongoWriteConcernError = require('../core').MongoWriteConcernError;
8const toError = require('../utils').toError;
9const handleCallback = require('../utils').handleCallback;
10const applyRetryableWrites = require('../utils').applyRetryableWrites;
11const applyWriteConcern = require('../utils').applyWriteConcern;
12const executeLegacyOperation = require('../utils').executeLegacyOperation;
13const isPromiseLike = require('../utils').isPromiseLike;
14
15// Error codes
16const WRITE_CONCERN_ERROR = 64;
17
18// Insert types
19const INSERT = 1;
20const UPDATE = 2;
21const REMOVE = 3;
22
23const bson = new BSON([
24 BSON.Binary,
25 BSON.Code,
26 BSON.DBRef,
27 BSON.Decimal128,
28 BSON.Double,
29 BSON.Int32,
30 BSON.Long,
31 BSON.Map,
32 BSON.MaxKey,
33 BSON.MinKey,
34 BSON.ObjectId,
35 BSON.BSONRegExp,
36 BSON.Symbol,
37 BSON.Timestamp
38]);
39
40/**
41 * Keeps the state of a unordered batch so we can rewrite the results
42 * correctly after command execution
43 * @ignore
44 */
45class Batch {
46 constructor(batchType, originalZeroIndex) {
47 this.originalZeroIndex = originalZeroIndex;
48 this.currentIndex = 0;
49 this.originalIndexes = [];
50 this.batchType = batchType;
51 this.operations = [];
52 this.size = 0;
53 this.sizeBytes = 0;
54 }
55}
56
57/**
58 * @classdesc
59 * The result of a bulk write.
60 */
61class BulkWriteResult {
62 /**
63 * Create a new BulkWriteResult instance
64 *
65 * **NOTE:** Internal Type, do not instantiate directly
66 */
67 constructor(bulkResult) {
68 this.result = bulkResult;
69 }
70
71 /**
72 * Evaluates to true if the bulk operation correctly executes
73 * @type {boolean}
74 */
75 get ok() {
76 return this.result.ok;
77 }
78
79 /**
80 * The number of inserted documents
81 * @type {number}
82 */
83 get nInserted() {
84 return this.result.nInserted;
85 }
86
87 /**
88 * Number of upserted documents
89 * @type {number}
90 */
91 get nUpserted() {
92 return this.result.nUpserted;
93 }
94
95 /**
96 * Number of matched documents
97 * @type {number}
98 */
99 get nMatched() {
100 return this.result.nMatched;
101 }
102
103 /**
104 * Number of documents updated physically on disk
105 * @type {number}
106 */
107 get nModified() {
108 return this.result.nModified;
109 }
110
111 /**
112 * Number of removed documents
113 * @type {number}
114 */
115 get nRemoved() {
116 return this.result.nRemoved;
117 }
118
119 /**
120 * Returns an array of all inserted ids
121 *
122 * @return {object[]}
123 */
124 getInsertedIds() {
125 return this.result.insertedIds;
126 }
127
128 /**
129 * Returns an array of all upserted ids
130 *
131 * @return {object[]}
132 */
133 getUpsertedIds() {
134 return this.result.upserted;
135 }
136
137 /**
138 * Returns the upserted id at the given index
139 *
140 * @param {number} index the number of the upserted id to return, returns undefined if no result for passed in index
141 * @return {object}
142 */
143 getUpsertedIdAt(index) {
144 return this.result.upserted[index];
145 }
146
147 /**
148 * Returns raw internal result
149 *
150 * @return {object}
151 */
152 getRawResponse() {
153 return this.result;
154 }
155
156 /**
157 * Returns true if the bulk operation contains a write error
158 *
159 * @return {boolean}
160 */
161 hasWriteErrors() {
162 return this.result.writeErrors.length > 0;
163 }
164
165 /**
166 * Returns the number of write errors off the bulk operation
167 *
168 * @return {number}
169 */
170 getWriteErrorCount() {
171 return this.result.writeErrors.length;
172 }
173
174 /**
175 * Returns a specific write error object
176 *
177 * @param {number} index of the write error to return, returns null if there is no result for passed in index
178 * @return {WriteError}
179 */
180 getWriteErrorAt(index) {
181 if (index < this.result.writeErrors.length) {
182 return this.result.writeErrors[index];
183 }
184 return null;
185 }
186
187 /**
188 * Retrieve all write errors
189 *
190 * @return {WriteError[]}
191 */
192 getWriteErrors() {
193 return this.result.writeErrors;
194 }
195
196 /**
197 * Retrieve lastOp if available
198 *
199 * @return {object}
200 */
201 getLastOp() {
202 return this.result.lastOp;
203 }
204
205 /**
206 * Retrieve the write concern error if any
207 *
208 * @return {WriteConcernError}
209 */
210 getWriteConcernError() {
211 if (this.result.writeConcernErrors.length === 0) {
212 return null;
213 } else if (this.result.writeConcernErrors.length === 1) {
214 // Return the error
215 return this.result.writeConcernErrors[0];
216 } else {
217 // Combine the errors
218 let errmsg = '';
219 for (let i = 0; i < this.result.writeConcernErrors.length; i++) {
220 const err = this.result.writeConcernErrors[i];
221 errmsg = errmsg + err.errmsg;
222
223 // TODO: Something better
224 if (i === 0) errmsg = errmsg + ' and ';
225 }
226
227 return new WriteConcernError({ errmsg: errmsg, code: WRITE_CONCERN_ERROR });
228 }
229 }
230
231 /**
232 * @return {object}
233 */
234 toJSON() {
235 return this.result;
236 }
237
238 /**
239 * @return {string}
240 */
241 toString() {
242 return `BulkWriteResult(${this.toJSON(this.result)})`;
243 }
244
245 /**
246 * @return {boolean}
247 */
248 isOk() {
249 return this.result.ok === 1;
250 }
251}
252
253/**
254 * @classdesc An error representing a failure by the server to apply the requested write concern to the bulk operation.
255 */
256class WriteConcernError {
257 /**
258 * Create a new WriteConcernError instance
259 *
260 * **NOTE:** Internal Type, do not instantiate directly
261 */
262 constructor(err) {
263 this.err = err;
264 }
265
266 /**
267 * Write concern error code.
268 * @type {number}
269 */
270 get code() {
271 return this.err.code;
272 }
273
274 /**
275 * Write concern error message.
276 * @type {string}
277 */
278 get errmsg() {
279 return this.err.errmsg;
280 }
281
282 /**
283 * @return {object}
284 */
285 toJSON() {
286 return { code: this.err.code, errmsg: this.err.errmsg };
287 }
288
289 /**
290 * @return {string}
291 */
292 toString() {
293 return `WriteConcernError(${this.err.errmsg})`;
294 }
295}
296
297/**
298 * @classdesc An error that occurred during a BulkWrite on the server.
299 */
300class WriteError {
301 /**
302 * Create a new WriteError instance
303 *
304 * **NOTE:** Internal Type, do not instantiate directly
305 */
306 constructor(err) {
307 this.err = err;
308 }
309
310 /**
311 * WriteError code.
312 * @type {number}
313 */
314 get code() {
315 return this.err.code;
316 }
317
318 /**
319 * WriteError original bulk operation index.
320 * @type {number}
321 */
322 get index() {
323 return this.err.index;
324 }
325
326 /**
327 * WriteError message.
328 * @type {string}
329 */
330 get errmsg() {
331 return this.err.errmsg;
332 }
333
334 /**
335 * Returns the underlying operation that caused the error
336 * @return {object}
337 */
338 getOperation() {
339 return this.err.op;
340 }
341
342 /**
343 * @return {object}
344 */
345 toJSON() {
346 return { code: this.err.code, index: this.err.index, errmsg: this.err.errmsg, op: this.err.op };
347 }
348
349 /**
350 * @return {string}
351 */
352 toString() {
353 return `WriteError(${JSON.stringify(this.toJSON())})`;
354 }
355}
356
357/**
358 * Merges results into shared data structure
359 * @ignore
360 */
361function mergeBatchResults(batch, bulkResult, err, result) {
362 // If we have an error set the result to be the err object
363 if (err) {
364 result = err;
365 } else if (result && result.result) {
366 result = result.result;
367 } else if (result == null) {
368 return;
369 }
370
371 // Do we have a top level error stop processing and return
372 if (result.ok === 0 && bulkResult.ok === 1) {
373 bulkResult.ok = 0;
374
375 const writeError = {
376 index: 0,
377 code: result.code || 0,
378 errmsg: result.message,
379 op: batch.operations[0]
380 };
381
382 bulkResult.writeErrors.push(new WriteError(writeError));
383 return;
384 } else if (result.ok === 0 && bulkResult.ok === 0) {
385 return;
386 }
387
388 // Deal with opTime if available
389 if (result.opTime || result.lastOp) {
390 const opTime = result.lastOp || result.opTime;
391 let lastOpTS = null;
392 let lastOpT = null;
393
394 // We have a time stamp
395 if (opTime && opTime._bsontype === 'Timestamp') {
396 if (bulkResult.lastOp == null) {
397 bulkResult.lastOp = opTime;
398 } else if (opTime.greaterThan(bulkResult.lastOp)) {
399 bulkResult.lastOp = opTime;
400 }
401 } else {
402 // Existing TS
403 if (bulkResult.lastOp) {
404 lastOpTS =
405 typeof bulkResult.lastOp.ts === 'number'
406 ? Long.fromNumber(bulkResult.lastOp.ts)
407 : bulkResult.lastOp.ts;
408 lastOpT =
409 typeof bulkResult.lastOp.t === 'number'
410 ? Long.fromNumber(bulkResult.lastOp.t)
411 : bulkResult.lastOp.t;
412 }
413
414 // Current OpTime TS
415 const opTimeTS = typeof opTime.ts === 'number' ? Long.fromNumber(opTime.ts) : opTime.ts;
416 const opTimeT = typeof opTime.t === 'number' ? Long.fromNumber(opTime.t) : opTime.t;
417
418 // Compare the opTime's
419 if (bulkResult.lastOp == null) {
420 bulkResult.lastOp = opTime;
421 } else if (opTimeTS.greaterThan(lastOpTS)) {
422 bulkResult.lastOp = opTime;
423 } else if (opTimeTS.equals(lastOpTS)) {
424 if (opTimeT.greaterThan(lastOpT)) {
425 bulkResult.lastOp = opTime;
426 }
427 }
428 }
429 }
430
431 // If we have an insert Batch type
432 if (batch.batchType === INSERT && result.n) {
433 bulkResult.nInserted = bulkResult.nInserted + result.n;
434 }
435
436 // If we have an insert Batch type
437 if (batch.batchType === REMOVE && result.n) {
438 bulkResult.nRemoved = bulkResult.nRemoved + result.n;
439 }
440
441 let nUpserted = 0;
442
443 // We have an array of upserted values, we need to rewrite the indexes
444 if (Array.isArray(result.upserted)) {
445 nUpserted = result.upserted.length;
446
447 for (let i = 0; i < result.upserted.length; i++) {
448 bulkResult.upserted.push({
449 index: result.upserted[i].index + batch.originalZeroIndex,
450 _id: result.upserted[i]._id
451 });
452 }
453 } else if (result.upserted) {
454 nUpserted = 1;
455
456 bulkResult.upserted.push({
457 index: batch.originalZeroIndex,
458 _id: result.upserted
459 });
460 }
461
462 // If we have an update Batch type
463 if (batch.batchType === UPDATE && result.n) {
464 const nModified = result.nModified;
465 bulkResult.nUpserted = bulkResult.nUpserted + nUpserted;
466 bulkResult.nMatched = bulkResult.nMatched + (result.n - nUpserted);
467
468 if (typeof nModified === 'number') {
469 bulkResult.nModified = bulkResult.nModified + nModified;
470 } else {
471 bulkResult.nModified = null;
472 }
473 }
474
475 if (Array.isArray(result.writeErrors)) {
476 for (let i = 0; i < result.writeErrors.length; i++) {
477 const writeError = {
478 index: batch.originalIndexes[result.writeErrors[i].index],
479 code: result.writeErrors[i].code,
480 errmsg: result.writeErrors[i].errmsg,
481 op: batch.operations[result.writeErrors[i].index]
482 };
483
484 bulkResult.writeErrors.push(new WriteError(writeError));
485 }
486 }
487
488 if (result.writeConcernError) {
489 bulkResult.writeConcernErrors.push(new WriteConcernError(result.writeConcernError));
490 }
491}
492
493function executeCommands(bulkOperation, options, callback) {
494 if (bulkOperation.s.batches.length === 0) {
495 return handleCallback(callback, null, new BulkWriteResult(bulkOperation.s.bulkResult));
496 }
497
498 const batch = bulkOperation.s.batches.shift();
499
500 function resultHandler(err, result) {
501 // Error is a driver related error not a bulk op error, terminate
502 if (((err && err.driver) || (err && err.message)) && !(err instanceof MongoWriteConcernError)) {
503 return handleCallback(callback, err);
504 }
505
506 // If we have and error
507 if (err) err.ok = 0;
508 if (err instanceof MongoWriteConcernError) {
509 return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, err, callback);
510 }
511
512 // Merge the results together
513 const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
514 const mergeResult = mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
515 if (mergeResult != null) {
516 return handleCallback(callback, null, writeResult);
517 }
518
519 if (bulkOperation.handleWriteError(callback, writeResult)) return;
520
521 // Execute the next command in line
522 executeCommands(bulkOperation, options, callback);
523 }
524
525 bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback);
526}
527
528/**
529 * handles write concern error
530 *
531 * @ignore
532 * @param {object} batch
533 * @param {object} bulkResult
534 * @param {boolean} ordered
535 * @param {WriteConcernError} err
536 * @param {function} callback
537 */
538function handleMongoWriteConcernError(batch, bulkResult, err, callback) {
539 mergeBatchResults(batch, bulkResult, null, err.result);
540
541 const wrappedWriteConcernError = new WriteConcernError({
542 errmsg: err.result.writeConcernError.errmsg,
543 code: err.result.writeConcernError.result
544 });
545 return handleCallback(
546 callback,
547 new BulkWriteError(toError(wrappedWriteConcernError), new BulkWriteResult(bulkResult)),
548 null
549 );
550}
551
552/**
553 * @classdesc An error indicating an unsuccessful Bulk Write
554 */
555class BulkWriteError extends MongoError {
556 /**
557 * Creates a new BulkWriteError
558 *
559 * @param {Error|string|object} message The error message
560 * @param {BulkWriteResult} result The result of the bulk write operation
561 * @extends {MongoError}
562 */
563 constructor(error, result) {
564 const message = error.err || error.errmsg || error.errMessage || error;
565 super(message);
566
567 Object.assign(this, error);
568
569 this.name = 'BulkWriteError';
570 this.result = result;
571 }
572}
573
574/**
575 * @classdesc A builder object that is returned from {@link BulkOperationBase#find}.
576 * Is used to build a write operation that involves a query filter.
577 */
578class FindOperators {
579 /**
580 * Creates a new FindOperators object.
581 *
582 * **NOTE:** Internal Type, do not instantiate directly
583 * @param {OrderedBulkOperation|UnorderedBulkOperation} bulkOperation
584 */
585 constructor(bulkOperation) {
586 this.s = bulkOperation.s;
587 }
588
589 /**
590 * Add a multiple update operation to the bulk operation
591 *
592 * @method
593 * @param {object} updateDocument An update field for an update operation. See {@link https://docs.mongodb.com/manual/reference/command/update/#update-command-u u documentation}
594 * @param {object} [options.hint] An optional hint for query optimization. See the {@link https://docs.mongodb.com/manual/reference/command/update/#update-command-hint|update command} reference for more information.
595 * @throws {MongoError} If operation cannot be added to bulk write
596 * @return {OrderedBulkOperation|UnorderedBulkOperation} A reference to the parent BulkOperation
597 */
598 update(updateDocument) {
599 // Perform upsert
600 const upsert = typeof this.s.currentOp.upsert === 'boolean' ? this.s.currentOp.upsert : false;
601
602 // Establish the update command
603 const document = {
604 q: this.s.currentOp.selector,
605 u: updateDocument,
606 multi: true,
607 upsert: upsert
608 };
609
610 if (updateDocument.hint) {
611 document.hint = updateDocument.hint;
612 }
613
614 // Clear out current Op
615 this.s.currentOp = null;
616 return this.s.options.addToOperationsList(this, UPDATE, document);
617 }
618
619 /**
620 * Add a single update operation to the bulk operation
621 *
622 * @method
623 * @param {object} updateDocument An update field for an update operation. See {@link https://docs.mongodb.com/manual/reference/command/update/#update-command-u u documentation}
624 * @param {object} [options.hint] An optional hint for query optimization. See the {@link https://docs.mongodb.com/manual/reference/command/update/#update-command-hint|update command} reference for more information.
625 * @throws {MongoError} If operation cannot be added to bulk write
626 * @return {OrderedBulkOperation|UnorderedBulkOperation} A reference to the parent BulkOperation
627 */
628 updateOne(updateDocument) {
629 // Perform upsert
630 const upsert = typeof this.s.currentOp.upsert === 'boolean' ? this.s.currentOp.upsert : false;
631
632 // Establish the update command
633 const document = {
634 q: this.s.currentOp.selector,
635 u: updateDocument,
636 multi: false,
637 upsert: upsert
638 };
639
640 if (updateDocument.hint) {
641 document.hint = updateDocument.hint;
642 }
643
644 // Clear out current Op
645 this.s.currentOp = null;
646 return this.s.options.addToOperationsList(this, UPDATE, document);
647 }
648
649 /**
650 * Add a replace one operation to the bulk operation
651 *
652 * @method
653 * @param {object} updateDocument the new document to replace the existing one with
654 * @throws {MongoError} If operation cannot be added to bulk write
655 * @return {OrderedBulkOperation|UnorderedBulkOperation} A reference to the parent BulkOperation
656 */
657 replaceOne(updateDocument) {
658 this.updateOne(updateDocument);
659 }
660
661 /**
662 * Upsert modifier for update bulk operation, noting that this operation is an upsert.
663 *
664 * @method
665 * @throws {MongoError} If operation cannot be added to bulk write
666 * @return {FindOperators} reference to self
667 */
668 upsert() {
669 this.s.currentOp.upsert = true;
670 return this;
671 }
672
673 /**
674 * Add a delete one operation to the bulk operation
675 *
676 * @method
677 * @throws {MongoError} If operation cannot be added to bulk write
678 * @return {OrderedBulkOperation|UnorderedBulkOperation} A reference to the parent BulkOperation
679 */
680 deleteOne() {
681 // Establish the update command
682 const document = {
683 q: this.s.currentOp.selector,
684 limit: 1
685 };
686
687 // Clear out current Op
688 this.s.currentOp = null;
689 return this.s.options.addToOperationsList(this, REMOVE, document);
690 }
691
692 /**
693 * Add a delete many operation to the bulk operation
694 *
695 * @method
696 * @throws {MongoError} If operation cannot be added to bulk write
697 * @return {OrderedBulkOperation|UnorderedBulkOperation} A reference to the parent BulkOperation
698 */
699 delete() {
700 // Establish the update command
701 const document = {
702 q: this.s.currentOp.selector,
703 limit: 0
704 };
705
706 // Clear out current Op
707 this.s.currentOp = null;
708 return this.s.options.addToOperationsList(this, REMOVE, document);
709 }
710
711 /**
712 * backwards compatability for deleteOne
713 */
714 removeOne() {
715 return this.deleteOne();
716 }
717
718 /**
719 * backwards compatability for delete
720 */
721 remove() {
722 return this.delete();
723 }
724}
725
726/**
727 * @classdesc Parent class to OrderedBulkOperation and UnorderedBulkOperation
728 *
729 * **NOTE:** Internal Type, do not instantiate directly
730 */
731class BulkOperationBase {
732 /**
733 * Create a new OrderedBulkOperation or UnorderedBulkOperation instance
734 * @property {number} length Get the number of operations in the bulk.
735 */
736 constructor(topology, collection, options, isOrdered) {
737 // determine whether bulkOperation is ordered or unordered
738 this.isOrdered = isOrdered;
739
740 options = options == null ? {} : options;
741 // TODO Bring from driver information in isMaster
742 // Get the namespace for the write operations
743 const namespace = collection.s.namespace;
744 // Used to mark operation as executed
745 const executed = false;
746
747 // Current item
748 const currentOp = null;
749
750 // Handle to the bson serializer, used to calculate running sizes
751 const bson = topology.bson;
752 // Set max byte size
753 const isMaster = topology.lastIsMaster();
754
755 // If we have autoEncryption on, batch-splitting must be done on 2mb chunks, but single documents
756 // over 2mb are still allowed
757 const usingAutoEncryption = !!(topology.s.options && topology.s.options.autoEncrypter);
758 const maxBsonObjectSize =
759 isMaster && isMaster.maxBsonObjectSize ? isMaster.maxBsonObjectSize : 1024 * 1024 * 16;
760 const maxBatchSizeBytes = usingAutoEncryption ? 1024 * 1024 * 2 : maxBsonObjectSize;
761 const maxWriteBatchSize =
762 isMaster && isMaster.maxWriteBatchSize ? isMaster.maxWriteBatchSize : 1000;
763
764 // Calculates the largest possible size of an Array key, represented as a BSON string
765 // element. This calculation:
766 // 1 byte for BSON type
767 // # of bytes = length of (string representation of (maxWriteBatchSize - 1))
768 // + 1 bytes for null terminator
769 const maxKeySize = (maxWriteBatchSize - 1).toString(10).length + 2;
770
771 // Final options for retryable writes and write concern
772 let finalOptions = Object.assign({}, options);
773 finalOptions = applyRetryableWrites(finalOptions, collection.s.db);
774 finalOptions = applyWriteConcern(finalOptions, { collection: collection }, options);
775 const writeConcern = finalOptions.writeConcern;
776
777 // Get the promiseLibrary
778 const promiseLibrary = options.promiseLibrary || Promise;
779
780 // Final results
781 const bulkResult = {
782 ok: 1,
783 writeErrors: [],
784 writeConcernErrors: [],
785 insertedIds: [],
786 nInserted: 0,
787 nUpserted: 0,
788 nMatched: 0,
789 nModified: 0,
790 nRemoved: 0,
791 upserted: []
792 };
793
794 // Internal state
795 this.s = {
796 // Final result
797 bulkResult: bulkResult,
798 // Current batch state
799 currentBatch: null,
800 currentIndex: 0,
801 // ordered specific
802 currentBatchSize: 0,
803 currentBatchSizeBytes: 0,
804 // unordered specific
805 currentInsertBatch: null,
806 currentUpdateBatch: null,
807 currentRemoveBatch: null,
808 batches: [],
809 // Write concern
810 writeConcern: writeConcern,
811 // Max batch size options
812 maxBsonObjectSize,
813 maxBatchSizeBytes,
814 maxWriteBatchSize,
815 maxKeySize,
816 // Namespace
817 namespace: namespace,
818 // BSON
819 bson: bson,
820 // Topology
821 topology: topology,
822 // Options
823 options: finalOptions,
824 // Current operation
825 currentOp: currentOp,
826 // Executed
827 executed: executed,
828 // Collection
829 collection: collection,
830 // Promise Library
831 promiseLibrary: promiseLibrary,
832 // Fundamental error
833 err: null,
834 // check keys
835 checkKeys: typeof options.checkKeys === 'boolean' ? options.checkKeys : true
836 };
837
838 // bypass Validation
839 if (options.bypassDocumentValidation === true) {
840 this.s.bypassDocumentValidation = true;
841 }
842 }
843
844 /**
845 * Add a single insert document to the bulk operation
846 *
847 * @param {object} document the document to insert
848 * @throws {MongoError}
849 * @return {BulkOperationBase} A reference to self
850 *
851 * @example
852 * const bulkOp = collection.initializeOrderedBulkOp();
853 * // Adds three inserts to the bulkOp.
854 * bulkOp
855 * .insert({ a: 1 })
856 * .insert({ b: 2 })
857 * .insert({ c: 3 });
858 * await bulkOp.execute();
859 */
860 insert(document) {
861 if (this.s.collection.s.db.options.forceServerObjectId !== true && document._id == null)
862 document._id = new ObjectID();
863 return this.s.options.addToOperationsList(this, INSERT, document);
864 }
865
866 /**
867 * Builds a find operation for an update/updateOne/delete/deleteOne/replaceOne.
868 * Returns a builder object used to complete the definition of the operation.
869 *
870 * @method
871 * @param {object} selector The selector for the bulk operation. See {@link https://docs.mongodb.com/manual/reference/command/update/#update-command-q q documentation}
872 * @throws {MongoError} if a selector is not specified
873 * @return {FindOperators} A helper object with which the write operation can be defined.
874 *
875 * @example
876 * const bulkOp = collection.initializeOrderedBulkOp();
877 *
878 * // Add an updateOne to the bulkOp
879 * bulkOp.find({ a: 1 }).updateOne({ $set: { b: 2 } });
880 *
881 * // Add an updateMany to the bulkOp
882 * bulkOp.find({ c: 3 }).update({ $set: { d: 4 } });
883 *
884 * // Add an upsert
885 * bulkOp.find({ e: 5 }).upsert().updateOne({ $set: { f: 6 } });
886 *
887 * // Add a deletion
888 * bulkOp.find({ g: 7 }).deleteOne();
889 *
890 * // Add a multi deletion
891 * bulkOp.find({ h: 8 }).delete();
892 *
893 * // Add a replaceOne
894 * bulkOp.find({ i: 9 }).replaceOne({ j: 10 });
895 *
896 * // Update using a pipeline (requires Mongodb 4.2 or higher)
897 * bulk.find({ k: 11, y: { $exists: true }, z: { $exists: true } }).updateOne([
898 * { $set: { total: { $sum: [ '$y', '$z' ] } } }
899 * ]);
900 *
901 * // All of the ops will now be executed
902 * await bulkOp.execute();
903 */
904 find(selector) {
905 if (!selector) {
906 throw toError('Bulk find operation must specify a selector');
907 }
908
909 // Save a current selector
910 this.s.currentOp = {
911 selector: selector
912 };
913
914 return new FindOperators(this);
915 }
916
917 /**
918 * Specifies a raw operation to perform in the bulk write.
919 *
920 * @method
921 * @param {object} op The raw operation to perform.
922 * @param {object} [options.hint] An optional hint for query optimization. See the {@link https://docs.mongodb.com/manual/reference/command/update/#update-command-hint|update command} reference for more information.
923 * @return {BulkOperationBase} A reference to self
924 */
925 raw(op) {
926 const key = Object.keys(op)[0];
927
928 // Set up the force server object id
929 const forceServerObjectId =
930 typeof this.s.options.forceServerObjectId === 'boolean'
931 ? this.s.options.forceServerObjectId
932 : this.s.collection.s.db.options.forceServerObjectId;
933
934 // Update operations
935 if (
936 (op.updateOne && op.updateOne.q) ||
937 (op.updateMany && op.updateMany.q) ||
938 (op.replaceOne && op.replaceOne.q)
939 ) {
940 op[key].multi = op.updateOne || op.replaceOne ? false : true;
941 return this.s.options.addToOperationsList(this, UPDATE, op[key]);
942 }
943
944 // Crud spec update format
945 if (op.updateOne || op.updateMany || op.replaceOne) {
946 const multi = op.updateOne || op.replaceOne ? false : true;
947 const operation = {
948 q: op[key].filter,
949 u: op[key].update || op[key].replacement,
950 multi: multi
951 };
952
953 if (op[key].hint) {
954 operation.hint = op[key].hint;
955 }
956
957 if (this.isOrdered) {
958 operation.upsert = op[key].upsert ? true : false;
959 if (op.collation) operation.collation = op.collation;
960 } else {
961 if (op[key].upsert) operation.upsert = true;
962 }
963 if (op[key].arrayFilters) operation.arrayFilters = op[key].arrayFilters;
964 return this.s.options.addToOperationsList(this, UPDATE, operation);
965 }
966
967 // Remove operations
968 if (
969 op.removeOne ||
970 op.removeMany ||
971 (op.deleteOne && op.deleteOne.q) ||
972 (op.deleteMany && op.deleteMany.q)
973 ) {
974 op[key].limit = op.removeOne ? 1 : 0;
975 return this.s.options.addToOperationsList(this, REMOVE, op[key]);
976 }
977
978 // Crud spec delete operations, less efficient
979 if (op.deleteOne || op.deleteMany) {
980 const limit = op.deleteOne ? 1 : 0;
981 const operation = { q: op[key].filter, limit: limit };
982 if (this.isOrdered) {
983 if (op.collation) operation.collation = op.collation;
984 }
985 return this.s.options.addToOperationsList(this, REMOVE, operation);
986 }
987
988 // Insert operations
989 if (op.insertOne && op.insertOne.document == null) {
990 if (forceServerObjectId !== true && op.insertOne._id == null)
991 op.insertOne._id = new ObjectID();
992 return this.s.options.addToOperationsList(this, INSERT, op.insertOne);
993 } else if (op.insertOne && op.insertOne.document) {
994 if (forceServerObjectId !== true && op.insertOne.document._id == null)
995 op.insertOne.document._id = new ObjectID();
996 return this.s.options.addToOperationsList(this, INSERT, op.insertOne.document);
997 }
998
999 if (op.insertMany) {
1000 for (let i = 0; i < op.insertMany.length; i++) {
1001 if (forceServerObjectId !== true && op.insertMany[i]._id == null)
1002 op.insertMany[i]._id = new ObjectID();
1003 this.s.options.addToOperationsList(this, INSERT, op.insertMany[i]);
1004 }
1005
1006 return;
1007 }
1008
1009 // No valid type of operation
1010 throw toError(
1011 'bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany'
1012 );
1013 }
1014
1015 /**
1016 * helper function to assist with promiseOrCallback behavior
1017 * @ignore
1018 * @param {*} err
1019 * @param {*} callback
1020 */
1021 _handleEarlyError(err, callback) {
1022 if (typeof callback === 'function') {
1023 callback(err, null);
1024 return;
1025 }
1026
1027 return this.s.promiseLibrary.reject(err);
1028 }
1029
1030 /**
1031 * An internal helper method. Do not invoke directly. Will be going away in the future
1032 *
1033 * @ignore
1034 * @method
1035 * @param {class} bulk either OrderedBulkOperation or UnorderdBulkOperation
1036 * @param {object} writeConcern
1037 * @param {object} options
1038 * @param {function} callback
1039 */
1040 bulkExecute(_writeConcern, options, callback) {
1041 if (typeof options === 'function') (callback = options), (options = {});
1042 options = options || {};
1043
1044 if (typeof _writeConcern === 'function') {
1045 callback = _writeConcern;
1046 } else if (_writeConcern && typeof _writeConcern === 'object') {
1047 this.s.writeConcern = _writeConcern;
1048 }
1049
1050 if (this.s.executed) {
1051 const executedError = toError('batch cannot be re-executed');
1052 return this._handleEarlyError(executedError, callback);
1053 }
1054
1055 // If we have current batch
1056 if (this.isOrdered) {
1057 if (this.s.currentBatch) this.s.batches.push(this.s.currentBatch);
1058 } else {
1059 if (this.s.currentInsertBatch) this.s.batches.push(this.s.currentInsertBatch);
1060 if (this.s.currentUpdateBatch) this.s.batches.push(this.s.currentUpdateBatch);
1061 if (this.s.currentRemoveBatch) this.s.batches.push(this.s.currentRemoveBatch);
1062 }
1063 // If we have no operations in the bulk raise an error
1064 if (this.s.batches.length === 0) {
1065 const emptyBatchError = toError('Invalid Operation, no operations specified');
1066 return this._handleEarlyError(emptyBatchError, callback);
1067 }
1068 return { options, callback };
1069 }
1070
1071 /**
1072 * The callback format for results
1073 * @callback BulkOperationBase~resultCallback
1074 * @param {MongoError} error An error instance representing the error during the execution.
1075 * @param {BulkWriteResult} result The bulk write result.
1076 */
1077
1078 /**
1079 * Execute the bulk operation
1080 *
1081 * @method
1082 * @param {WriteConcern} [_writeConcern] Optional write concern. Can also be specified through options.
1083 * @param {object} [options] Optional settings.
1084 * @param {(number|string)} [options.w] The write concern.
1085 * @param {number} [options.wtimeout] The write concern timeout.
1086 * @param {boolean} [options.j=false] Specify a journal write concern.
1087 * @param {boolean} [options.fsync=false] Specify a file sync write concern.
1088 * @param {BulkOperationBase~resultCallback} [callback] A callback that will be invoked when bulkWrite finishes/errors
1089 * @throws {MongoError} Throws error if the bulk object has already been executed
1090 * @throws {MongoError} Throws error if the bulk object does not have any operations
1091 * @return {Promise|void} returns Promise if no callback passed
1092 */
1093 execute(_writeConcern, options, callback) {
1094 const ret = this.bulkExecute(_writeConcern, options, callback);
1095 if (!ret || isPromiseLike(ret)) {
1096 return ret;
1097 }
1098
1099 options = ret.options;
1100 callback = ret.callback;
1101
1102 return executeLegacyOperation(this.s.topology, executeCommands, [this, options, callback]);
1103 }
1104
1105 /**
1106 * Handles final options before executing command
1107 *
1108 * An internal method. Do not invoke. Will not be accessible in the future
1109 *
1110 * @ignore
1111 * @param {object} config
1112 * @param {object} config.options
1113 * @param {number} config.batch
1114 * @param {function} config.resultHandler
1115 * @param {function} callback
1116 */
1117 finalOptionsHandler(config, callback) {
1118 const finalOptions = Object.assign({ ordered: this.isOrdered }, config.options);
1119 if (this.s.writeConcern != null) {
1120 finalOptions.writeConcern = this.s.writeConcern;
1121 }
1122
1123 if (finalOptions.bypassDocumentValidation !== true) {
1124 delete finalOptions.bypassDocumentValidation;
1125 }
1126
1127 // Set an operationIf if provided
1128 if (this.operationId) {
1129 config.resultHandler.operationId = this.operationId;
1130 }
1131
1132 // Serialize functions
1133 if (this.s.options.serializeFunctions) {
1134 finalOptions.serializeFunctions = true;
1135 }
1136
1137 // Ignore undefined
1138 if (this.s.options.ignoreUndefined) {
1139 finalOptions.ignoreUndefined = true;
1140 }
1141
1142 // Is the bypassDocumentValidation options specific
1143 if (this.s.bypassDocumentValidation === true) {
1144 finalOptions.bypassDocumentValidation = true;
1145 }
1146
1147 // Is the checkKeys option disabled
1148 if (this.s.checkKeys === false) {
1149 finalOptions.checkKeys = false;
1150 }
1151
1152 if (finalOptions.retryWrites) {
1153 if (config.batch.batchType === UPDATE) {
1154 finalOptions.retryWrites =
1155 finalOptions.retryWrites && !config.batch.operations.some(op => op.multi);
1156 }
1157
1158 if (config.batch.batchType === REMOVE) {
1159 finalOptions.retryWrites =
1160 finalOptions.retryWrites && !config.batch.operations.some(op => op.limit === 0);
1161 }
1162 }
1163
1164 try {
1165 if (config.batch.batchType === INSERT) {
1166 this.s.topology.insert(
1167 this.s.namespace,
1168 config.batch.operations,
1169 finalOptions,
1170 config.resultHandler
1171 );
1172 } else if (config.batch.batchType === UPDATE) {
1173 this.s.topology.update(
1174 this.s.namespace,
1175 config.batch.operations,
1176 finalOptions,
1177 config.resultHandler
1178 );
1179 } else if (config.batch.batchType === REMOVE) {
1180 this.s.topology.remove(
1181 this.s.namespace,
1182 config.batch.operations,
1183 finalOptions,
1184 config.resultHandler
1185 );
1186 }
1187 } catch (err) {
1188 // Force top level error
1189 err.ok = 0;
1190 // Merge top level error and return
1191 handleCallback(callback, null, mergeBatchResults(config.batch, this.s.bulkResult, err, null));
1192 }
1193 }
1194
1195 /**
1196 * Handles the write error before executing commands
1197 *
1198 * An internal helper method. Do not invoke directly. Will be going away in the future
1199 *
1200 * @ignore
1201 * @param {function} callback
1202 * @param {BulkWriteResult} writeResult
1203 * @param {class} self either OrderedBulkOperation or UnorderdBulkOperation
1204 */
1205 handleWriteError(callback, writeResult) {
1206 if (this.s.bulkResult.writeErrors.length > 0) {
1207 if (this.s.bulkResult.writeErrors.length === 1) {
1208 handleCallback(
1209 callback,
1210 new BulkWriteError(toError(this.s.bulkResult.writeErrors[0]), writeResult),
1211 null
1212 );
1213 return true;
1214 }
1215
1216 const msg = this.s.bulkResult.writeErrors[0].errmsg
1217 ? this.s.bulkResult.writeErrors[0].errmsg
1218 : 'write operation failed';
1219
1220 handleCallback(
1221 callback,
1222 new BulkWriteError(
1223 toError({
1224 message: msg,
1225 code: this.s.bulkResult.writeErrors[0].code,
1226 writeErrors: this.s.bulkResult.writeErrors
1227 }),
1228 writeResult
1229 ),
1230 null
1231 );
1232 return true;
1233 } else if (writeResult.getWriteConcernError()) {
1234 handleCallback(
1235 callback,
1236 new BulkWriteError(toError(writeResult.getWriteConcernError()), writeResult),
1237 null
1238 );
1239 return true;
1240 }
1241 }
1242}
1243
1244Object.defineProperty(BulkOperationBase.prototype, 'length', {
1245 enumerable: true,
1246 get: function() {
1247 return this.s.currentIndex;
1248 }
1249});
1250
1251// Exports symbols
1252module.exports = {
1253 Batch,
1254 BulkOperationBase,
1255 bson,
1256 INSERT: INSERT,
1257 UPDATE: UPDATE,
1258 REMOVE: REMOVE,
1259 BulkWriteError
1260};