1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- 'use strict';
-
-
-
- const async = require('async');
- const utils = require('../../utils');
-
-
-
- module.exports = function eachAsync(next, fn, options, callback) {
- const parallel = options.parallel || 1;
-
- const handleNextResult = function(doc, callback) {
- const promise = fn(doc);
- if (promise && typeof promise.then === 'function') {
- promise.then(
- function() { callback(null); },
- function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
- } else {
- callback(null);
- }
- };
-
- const iterate = function(callback) {
- let drained = false;
- const nextQueue = async.queue(function(task, cb) {
- if (drained) return cb();
- next(function(err, doc) {
- if (err) return cb(err);
- if (!doc) drained = true;
- cb(null, doc);
- });
- }, 1);
-
- const getAndRun = function(cb) {
- nextQueue.push({}, function(err, doc) {
- if (err) return cb(err);
- if (!doc) return cb();
- handleNextResult(doc, function(err) {
- if (err) return cb(err);
-
- setTimeout(function() {
- getAndRun(cb);
- }, 0);
- });
- });
- };
-
- async.times(parallel, function(n, cb) {
- getAndRun(cb);
- }, callback);
- };
-
- return utils.promiseOrCallback(callback, cb => {
- iterate(cb);
- });
- };
|