'use strict'; /*! * Module dependencies. */ const async = require('async'); const utils = require('../../utils'); /** * Execute `fn` for every document in the cursor. If `fn` returns a promise, * will wait for the promise to resolve before iterating on to the next one. * Returns a promise that resolves when done. * * @param {Function} next the thunk to call to get the next document * @param {Function} fn * @param {Object} options * @param {Function} [callback] executed when all docs have been processed * @return {Promise} * @api public * @method eachAsync */ module.exports = function eachAsync(next, fn, options, callback) { const parallel = options.parallel || 1; const handleNextResult = function(doc, callback) { const promise = fn(doc); if (promise && typeof promise.then === 'function') { promise.then( function() { callback(null); }, function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); }); } else { callback(null); } }; const iterate = function(callback) { let drained = false; const nextQueue = async.queue(function(task, cb) { if (drained) return cb(); next(function(err, doc) { if (err) return cb(err); if (!doc) drained = true; cb(null, doc); }); }, 1); const getAndRun = function(cb) { nextQueue.push({}, function(err, doc) { if (err) return cb(err); if (!doc) return cb(); handleNextResult(doc, function(err) { if (err) return cb(err); // Make sure to clear the stack re: gh-4697 setTimeout(function() { getAndRun(cb); }, 0); }); }); }; async.times(parallel, function(n, cb) { getAndRun(cb); }, callback); }; return utils.promiseOrCallback(callback, cb => { iterate(cb); }); };