'use strict'; var Writable = require('flush-write-stream'); function listenerCount(stream, evt) { return stream.listeners(evt).length; } function hasListeners(stream) { return !!(listenerCount(stream, 'readable') || listenerCount(stream, 'data')); } function sinker(file, enc, callback) { callback(); } function sink(stream) { var sinkAdded = false; var sinkOptions = { objectMode: stream._readableState.objectMode, }; var sinkStream = new Writable(sinkOptions, sinker); function addSink() { if (sinkAdded) { return; } if (hasListeners(stream)) { return; } sinkAdded = true; stream.pipe(sinkStream); } function removeSink(evt) { if (evt !== 'readable' && evt !== 'data') { return; } if (hasListeners(stream)) { sinkAdded = false; stream.unpipe(sinkStream); } } stream.on('newListener', removeSink); stream.on('removeListener', removeSink); stream.on('removeListener', addSink); // Sink the stream to start flowing // Do this on nextTick, it will flow at slowest speed of piped streams process.nextTick(addSink); return stream; } module.exports = sink;