1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- 'use strict';
-
- /*!
- * Module dependencies.
- */
-
- const async = require('async');
- const utils = require('../../utils');
-
- /**
- * Execute `fn` for every document in the cursor. If `fn` returns a promise,
- * will wait for the promise to resolve before iterating on to the next one.
- * Returns a promise that resolves when done.
- *
- * @param {Function} next the thunk to call to get the next document
- * @param {Function} fn
- * @param {Object} options
- * @param {Function} [callback] executed when all docs have been processed
- * @return {Promise}
- * @api public
- * @method eachAsync
- */
-
- module.exports = function eachAsync(next, fn, options, callback) {
- const parallel = options.parallel || 1;
-
- const handleNextResult = function(doc, callback) {
- const promise = fn(doc);
- if (promise && typeof promise.then === 'function') {
- promise.then(
- function() { callback(null); },
- function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
- } else {
- callback(null);
- }
- };
-
- const iterate = function(callback) {
- let drained = false;
- const nextQueue = async.queue(function(task, cb) {
- if (drained) return cb();
- next(function(err, doc) {
- if (err) return cb(err);
- if (!doc) drained = true;
- cb(null, doc);
- });
- }, 1);
-
- const getAndRun = function(cb) {
- nextQueue.push({}, function(err, doc) {
- if (err) return cb(err);
- if (!doc) return cb();
- handleNextResult(doc, function(err) {
- if (err) return cb(err);
- // Make sure to clear the stack re: gh-4697
- setTimeout(function() {
- getAndRun(cb);
- }, 0);
- });
- });
- };
-
- async.times(parallel, function(n, cb) {
- getAndRun(cb);
- }, callback);
- };
-
- return utils.promiseOrCallback(callback, cb => {
- iterate(cb);
- });
- };
|