Ohm-Management - Projektarbeit B-ME

unordered.js 7.2KB

  1. 'use strict';
  2. const common = require('./common');
  3. const BulkOperationBase = common.BulkOperationBase;
  4. const utils = require('../utils');
  5. const toError = utils.toError;
  6. const handleCallback = utils.handleCallback;
  7. const BulkWriteResult = common.BulkWriteResult;
  8. const Batch = common.Batch;
  9. const mergeBatchResults = common.mergeBatchResults;
  10. const executeOperation = utils.executeOperation;
  11. const MongoWriteConcernError = require('mongodb-core').MongoWriteConcernError;
  12. const handleMongoWriteConcernError = require('./common').handleMongoWriteConcernError;
  13. const bson = common.bson;
  14. /**
  15. * Add to internal list of Operations
  16. *
  17. * @param {UnorderedBulkOperation} bulkOperation
  18. * @param {number} docType number indicating the document type
  19. * @param {object} document
  20. * @return {UnorderedBulkOperation}
  21. */
  22. function addToOperationsList(bulkOperation, docType, document) {
  23. // Get the bsonSize
  24. const bsonSize = bson.calculateObjectSize(document, {
  25. checkKeys: false
  26. });
  27. // Throw error if the doc is bigger than the max BSON size
  28. if (bsonSize >= bulkOperation.s.maxBatchSizeBytes)
  29. throw toError('document is larger than the maximum size ' + bulkOperation.s.maxBatchSizeBytes);
  30. // Holds the current batch
  31. bulkOperation.s.currentBatch = null;
  32. // Get the right type of batch
  33. if (docType === common.INSERT) {
  34. bulkOperation.s.currentBatch = bulkOperation.s.currentInsertBatch;
  35. } else if (docType === common.UPDATE) {
  36. bulkOperation.s.currentBatch = bulkOperation.s.currentUpdateBatch;
  37. } else if (docType === common.REMOVE) {
  38. bulkOperation.s.currentBatch = bulkOperation.s.currentRemoveBatch;
  39. }
  40. // Create a new batch object if we don't have a current one
  41. if (bulkOperation.s.currentBatch == null)
  42. bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);
  43. // Check if we need to create a new batch
  44. if (
  45. bulkOperation.s.currentBatch.size + 1 >= bulkOperation.s.maxWriteBatchSize ||
  46. bulkOperation.s.currentBatch.sizeBytes + bsonSize >= bulkOperation.s.maxBatchSizeBytes ||
  47. bulkOperation.s.currentBatch.batchType !== docType
  48. ) {
  49. // Save the batch to the execution stack
  50. bulkOperation.s.batches.push(bulkOperation.s.currentBatch);
  51. // Create a new batch
  52. bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);
  53. }
  54. // We have an array of documents
  55. if (Array.isArray(document)) {
  56. throw toError('operation passed in cannot be an Array');
  57. } else {
  58. bulkOperation.s.currentBatch.operations.push(document);
  59. bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
  60. bulkOperation.s.currentIndex = bulkOperation.s.currentIndex + 1;
  61. }
  62. // Save back the current Batch to the right type
  63. if (docType === common.INSERT) {
  64. bulkOperation.s.currentInsertBatch = bulkOperation.s.currentBatch;
  65. bulkOperation.s.bulkResult.insertedIds.push({
  66. index: bulkOperation.s.bulkResult.insertedIds.length,
  67. _id: document._id
  68. });
  69. } else if (docType === common.UPDATE) {
  70. bulkOperation.s.currentUpdateBatch = bulkOperation.s.currentBatch;
  71. } else if (docType === common.REMOVE) {
  72. bulkOperation.s.currentRemoveBatch = bulkOperation.s.currentBatch;
  73. }
  74. // Update current batch size
  75. bulkOperation.s.currentBatch.size = bulkOperation.s.currentBatch.size + 1;
  76. bulkOperation.s.currentBatch.sizeBytes = bulkOperation.s.currentBatch.sizeBytes + bsonSize;
  77. // Return bulkOperation
  78. return bulkOperation;
  79. }
  80. /**
  81. * Create a new UnorderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
  82. * @class
  83. * @property {number} length Get the number of operations in the bulk.
  84. * @return {UnorderedBulkOperation} a UnorderedBulkOperation instance.
  85. */
  86. class UnorderedBulkOperation extends BulkOperationBase {
  87. constructor(topology, collection, options) {
  88. options = options || {};
  89. options = Object.assign(options, { addToOperationsList });
  90. super(topology, collection, options, false);
  91. }
  92. /**
  93. * The callback format for results
  94. * @callback UnorderedBulkOperation~resultCallback
  95. * @param {MongoError} error An error instance representing the error during the execution.
  96. * @param {BulkWriteResult} result The bulk write result.
  97. */
  98. /**
  99. * Execute the ordered bulk operation
  100. *
  101. * @method
  102. * @param {object} [options] Optional settings.
  103. * @param {(number|string)} [options.w] The write concern.
  104. * @param {number} [options.wtimeout] The write concern timeout.
  105. * @param {boolean} [options.j=false] Specify a journal write concern.
  106. * @param {boolean} [options.fsync=false] Specify a file sync write concern.
  107. * @param {UnorderedBulkOperation~resultCallback} [callback] The result callback
  108. * @throws {MongoError}
  109. * @return {Promise} returns Promise if no callback passed
  110. */
  111. execute(_writeConcern, options, callback) {
  112. const ret = this.bulkExecute(_writeConcern, options, callback);
  113. options = ret.options;
  114. callback = ret.callback;
  115. return executeOperation(this.s.topology, executeBatches, [this, options, callback]);
  116. }
  117. }
  118. /**
  119. * Execute the command
  120. *
  121. * @param {UnorderedBulkOperation} bulkOperation
  122. * @param {object} batch
  123. * @param {object} options
  124. * @param {function} callback
  125. */
  126. function executeBatch(bulkOperation, batch, options, callback) {
  127. function resultHandler(err, result) {
  128. // Error is a driver related error not a bulk op error, terminate
  129. if (((err && err.driver) || (err && err.message)) && !(err instanceof MongoWriteConcernError)) {
  130. return handleCallback(callback, err);
  131. }
  132. // If we have and error
  133. if (err) err.ok = 0;
  134. if (err instanceof MongoWriteConcernError) {
  135. return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, false, err, callback);
  136. }
  137. handleCallback(
  138. callback,
  139. null,
  140. mergeBatchResults(false, batch, bulkOperation.s.bulkResult, err, result)
  141. );
  142. }
  143. bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback);
  144. }
  145. /**
  146. * Execute all the commands
  147. *
  148. * @param {UnorderedBulkOperation} bulkOperation
  149. * @param {object} options
  150. * @param {function} callback
  151. */
  152. function executeBatches(bulkOperation, options, callback) {
  153. let numberOfCommandsToExecute = bulkOperation.s.batches.length;
  154. // Execute over all the batches
  155. for (let i = 0; i < bulkOperation.s.batches.length; i++) {
  156. executeBatch(bulkOperation, bulkOperation.s.batches[i], options, function(err) {
  157. // Count down the number of commands left to execute
  158. numberOfCommandsToExecute = numberOfCommandsToExecute - 1;
  159. // Execute
  160. if (numberOfCommandsToExecute === 0) {
  161. // Driver level error
  162. if (err) return handleCallback(callback, err);
  163. const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
  164. if (bulkOperation.handleWriteError(callback, writeResult)) return;
  165. return handleCallback(callback, null, writeResult);
  166. }
  167. });
  168. }
  169. }
  170. /**
  171. * Returns an unordered batch object
  172. * @ignore
  173. */
  174. function initializeUnorderedBulkOp(topology, collection, options) {
  175. return new UnorderedBulkOperation(topology, collection, options);
  176. }
  177. initializeUnorderedBulkOp.UnorderedBulkOperation = UnorderedBulkOperation;
  178. module.exports = initializeUnorderedBulkOp;
  179. module.exports.Bulk = UnorderedBulkOperation;