Ohm-Management - Projektarbeit B-ME
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

ordered.js 6.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. 'use strict';
  2. const common = require('./common');
  3. const BulkOperationBase = common.BulkOperationBase;
  4. const utils = require('../utils');
  5. const toError = utils.toError;
  6. const handleCallback = utils.handleCallback;
  7. const BulkWriteResult = common.BulkWriteResult;
  8. const Batch = common.Batch;
  9. const mergeBatchResults = common.mergeBatchResults;
  10. const executeOperation = utils.executeOperation;
  11. const MongoWriteConcernError = require('mongodb-core').MongoWriteConcernError;
  12. const handleMongoWriteConcernError = require('./common').handleMongoWriteConcernError;
  13. const bson = common.bson;
  14. /**
  15. * Add to internal list of Operations
  16. *
  17. * @param {OrderedBulkOperation} bulkOperation
  18. * @param {number} docType number indicating the document type
  19. * @param {object} document
  20. * @return {OrderedBulkOperation}
  21. */
  22. function addToOperationsList(bulkOperation, docType, document) {
  23. // Get the bsonSize
  24. const bsonSize = bson.calculateObjectSize(document, {
  25. checkKeys: false
  26. });
  27. // Throw error if the doc is bigger than the max BSON size
  28. if (bsonSize >= bulkOperation.s.maxBatchSizeBytes)
  29. throw toError('document is larger than the maximum size ' + bulkOperation.s.maxBatchSizeBytes);
  30. // Create a new batch object if we don't have a current one
  31. if (bulkOperation.s.currentBatch == null)
  32. bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);
  33. // Check if we need to create a new batch
  34. if (
  35. bulkOperation.s.currentBatchSize + 1 >= bulkOperation.s.maxWriteBatchSize ||
  36. bulkOperation.s.currentBatchSizeBytes + bulkOperation.s.currentBatchSizeBytes >=
  37. bulkOperation.s.maxBatchSizeBytes ||
  38. bulkOperation.s.currentBatch.batchType !== docType
  39. ) {
  40. // Save the batch to the execution stack
  41. bulkOperation.s.batches.push(bulkOperation.s.currentBatch);
  42. // Create a new batch
  43. bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);
  44. // Reset the current size trackers
  45. bulkOperation.s.currentBatchSize = 0;
  46. bulkOperation.s.currentBatchSizeBytes = 0;
  47. } else {
  48. // Update current batch size
  49. bulkOperation.s.currentBatchSize = bulkOperation.s.currentBatchSize + 1;
  50. bulkOperation.s.currentBatchSizeBytes = bulkOperation.s.currentBatchSizeBytes + bsonSize;
  51. }
  52. if (docType === common.INSERT) {
  53. bulkOperation.s.bulkResult.insertedIds.push({
  54. index: bulkOperation.s.currentIndex,
  55. _id: document._id
  56. });
  57. }
  58. // We have an array of documents
  59. if (Array.isArray(document)) {
  60. throw toError('operation passed in cannot be an Array');
  61. } else {
  62. bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
  63. bulkOperation.s.currentBatch.operations.push(document);
  64. bulkOperation.s.currentBatchSizeBytes = bulkOperation.s.currentBatchSizeBytes + bsonSize;
  65. bulkOperation.s.currentIndex = bulkOperation.s.currentIndex + 1;
  66. }
  67. // Return bulkOperation
  68. return bulkOperation;
  69. }
  70. /**
  71. * Create a new OrderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
  72. * @class
  73. * @property {number} length Get the number of operations in the bulk.
  74. * @return {OrderedBulkOperation} a OrderedBulkOperation instance.
  75. */
  76. class OrderedBulkOperation extends BulkOperationBase {
  77. constructor(topology, collection, options) {
  78. options = options || {};
  79. options = Object.assign(options, { addToOperationsList });
  80. super(topology, collection, options, true);
  81. }
  82. /**
  83. * The callback format for results
  84. * @callback OrderedBulkOperation~resultCallback
  85. * @param {MongoError} error An error instance representing the error during the execution.
  86. * @param {BulkWriteResult} result The bulk write result.
  87. */
  88. /**
  89. * Execute the ordered bulk operation
  90. *
  91. * @method
  92. * @param {object} [options] Optional settings.
  93. * @param {(number|string)} [options.w] The write concern.
  94. * @param {number} [options.wtimeout] The write concern timeout.
  95. * @param {boolean} [options.j=false] Specify a journal write concern.
  96. * @param {boolean} [options.fsync=false] Specify a file sync write concern.
  97. * @param {OrderedBulkOperation~resultCallback} [callback] The result callback
  98. * @throws {MongoError}
  99. * @return {Promise} returns Promise if no callback passed
  100. */
  101. execute(_writeConcern, options, callback) {
  102. const ret = this.bulkExecute(_writeConcern, options, callback);
  103. options = ret.options;
  104. callback = ret.callback;
  105. return executeOperation(this.s.topology, executeCommands, [this, options, callback]);
  106. }
  107. }
  108. /**
  109. * Execute next write command in a chain
  110. *
  111. * @param {OrderedBulkOperation} bulkOperation
  112. * @param {object} options
  113. * @param {function} callback
  114. */
  115. function executeCommands(bulkOperation, options, callback) {
  116. if (bulkOperation.s.batches.length === 0) {
  117. return handleCallback(callback, null, new BulkWriteResult(bulkOperation.s.bulkResult));
  118. }
  119. // Ordered execution of the command
  120. const batch = bulkOperation.s.batches.shift();
  121. function resultHandler(err, result) {
  122. // Error is a driver related error not a bulk op error, terminate
  123. if (((err && err.driver) || (err && err.message)) && !(err instanceof MongoWriteConcernError)) {
  124. return handleCallback(callback, err);
  125. }
  126. // If we have and error
  127. if (err) err.ok = 0;
  128. if (err instanceof MongoWriteConcernError) {
  129. return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, true, err, callback);
  130. }
  131. // Merge the results together
  132. const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
  133. const mergeResult = mergeBatchResults(true, batch, bulkOperation.s.bulkResult, err, result);
  134. if (mergeResult != null) {
  135. return handleCallback(callback, null, writeResult);
  136. }
  137. if (bulkOperation.handleWriteError(callback, writeResult)) return;
  138. // Execute the next command in line
  139. executeCommands(bulkOperation, options, callback);
  140. }
  141. bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback);
  142. }
  143. /**
  144. * Returns an unordered batch object
  145. * @ignore
  146. */
  147. function initializeOrderedBulkOp(topology, collection, options) {
  148. return new OrderedBulkOperation(topology, collection, options);
  149. }
  150. initializeOrderedBulkOp.OrderedBulkOperation = OrderedBulkOperation;
  151. module.exports = initializeOrderedBulkOp;
  152. module.exports.Bulk = OrderedBulkOperation;