// Like through2 except execute in parallel with a set maximum // concurrency "use strict"; var through2 = require('through2'); function cbNoop (cb) { cb(); } module.exports = function concurrentThrough (options, transform, flush) { var concurrent = 0, lastCallback = null, pendingFinish = null; if (typeof options === 'function') { flush = transform; transform = options; options = {}; } var maxConcurrency = options.maxConcurrency || 16; function _transform (message, enc, callback) { var self = this; var callbackCalled = false; concurrent++; if (concurrent < maxConcurrency) { // Ask for more right away callback(); } else { // We're at the concurrency limit, save the callback for // when we're ready for more lastCallback = callback; } transform.call(this, message, enc, function (err) { // Ignore multiple calls of the callback (shouldn't ever // happen, but just in case) if (callbackCalled) return; callbackCalled = true; if (err) { self.emit('error', err); } else if (arguments.length > 1) { self.push(arguments[1]); } concurrent--; if (lastCallback) { var cb = lastCallback; lastCallback = null; cb(); } if (concurrent === 0 && pendingFinish) { pendingFinish(); pendingFinish = null; } }); } // We need to pass in final to through2 even if the caller has // not given us a final option so that it will wait for all // transform callbacks to complete before emitting a "finish" // and "end" event. if (typeof options.final !== 'function') { options.final = cbNoop; } // We also wrap flush to make sure anyone using an ancient version // of through2 without support for final will get the old behaviour. // TODO: don't wrap flush after upgrading through2 to a version with guaranteed `_final` if (typeof flush !== 'function') { flush = cbNoop; } // Flush is always called only after Final has finished // to ensure that data from Final gets processed, so we only need one pending callback at a time function callOnFinish (original) { return function (callback) { if (concurrent === 0) { original.call(this, callback); } else { pendingFinish = original.bind(this, callback); } } } options.final = callOnFinish(options.final); return through2(options, _transform, callOnFinish(flush)); }; module.exports.obj = function (options, transform, flush) { if (typeof options === 'function') { flush = transform; transform = options; options = {}; } options.objectMode = true; if (options.highWaterMark == null) { options.highWaterMark = 16; } return module.exports(options, transform, flush); };