'use strict' var fs = require('fs') var path = require('path') var test = require('tape').test var from = require('from2') var crypto = require('crypto') var sink = require('flush-write-stream') var pump = require('pump') var cloneable = require('./') test('basic passthrough', function (t) { t.plan(2) var read = false var source = from(function (size, next) { if (read) { this.push(null) } else { read = true this.push('hello world') } next() }) var instance = cloneable(source) t.notOk(read, 'stream not started') instance.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), 'hello world', 'chunk matches') cb() })) }) test('clone sync', function (t) { t.plan(4) var read = false var source = from(function (size, next) { if (read) { this.push(null) } else { read = true this.push('hello world') } next() }) var instance = cloneable(source) t.notOk(read, 'stream not started') var cloned = instance.clone() t.notOk(read, 'stream not started') instance.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), 'hello world', 'chunk matches') cb() })) cloned.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), 'hello world', 'chunk matches') cb() })) }) test('clone async', function (t) { t.plan(4) var read = false var source = from(function (size, next) { if (read) { this.push(null) } else { read = true this.push('hello world') } next() }) var instance = cloneable(source) t.notOk(read, 'stream not started') var cloned = instance.clone() t.notOk(read, 'stream not started') instance.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), 'hello world', 'chunk matches') cb() })) setImmediate(function () { cloned.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), 'hello world', 'chunk matches') cb() })) }) }) test('basic passthrough in obj mode', function (t) { t.plan(2) var read = false var source = from.obj(function (size, next) { if (read) { return this.push(null) } else { read = true this.push({ hello: 'world' }) } next() }) var instance = cloneable(source) t.notOk(read, 'stream not started') instance.pipe(sink.obj(function (chunk, enc, cb) { t.deepEqual(chunk, { hello: 'world' }, 'chunk matches') cb() })) }) test('multiple clone in object mode', function (t) { t.plan(4) var read = false var source = from.obj(function (size, next) { if (read) { return this.push(null) } else { read = true this.push({ hello: 'world' }) } next() }) var instance = cloneable(source) t.notOk(read, 'stream not started') var cloned = instance.clone() t.notOk(read, 'stream not started') instance.pipe(sink.obj(function (chunk, enc, cb) { t.deepEqual(chunk, { hello: 'world' }, 'chunk matches') cb() })) setImmediate(function () { cloned.pipe(sink.obj(function (chunk, enc, cb) { t.deepEqual(chunk, { hello: 'world' }, 'chunk matches') cb() })) }) }) test('basic passthrough with data event', function (t) { t.plan(2) var read = false var source = from(function (size, next) { if (read) { this.push(null) } else { read = true this.push('hello world') } next() }) var instance = cloneable(source) t.notOk(read, 'stream not started') var data = '' instance.on('data', function (chunk) { data += chunk.toString() }) instance.on('end', function () { t.equal(data, 'hello world', 'chunk matches') }) }) test('basic passthrough with data event on clone', function (t) { t.plan(3) var read = false var source = from(function (size, next) { if (read) { this.push(null) } else { read = true this.push('hello world') } next() }) var instance = cloneable(source) var cloned = instance.clone() t.notOk(read, 'stream not started') var data = '' cloned.on('data', function (chunk) { data += chunk.toString() }) cloned.on('end', function () { t.equal(data, 'hello world', 'chunk matches in clone') }) instance.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), 'hello world', 'chunk matches in instance') cb() })) }) test('errors if cloned after start', function (t) { t.plan(2) var source = from(function (size, next) { this.push('hello world') this.push(null) next() }) var instance = cloneable(source) instance.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), 'hello world', 'chunk matches') t.throws(function () { instance.clone() }, 'throws if cloned after start') cb() })) }) test('basic passthrough with readable event', function (t) { t.plan(2) var read = false var source = from(function (size, next) { if (read) { this.push(null) } else { read = true this.push('hello world') } next() }) var instance = cloneable(source) t.notOk(read, 'stream not started') var data = '' instance.on('readable', function () { var chunk while ((chunk = this.read()) !== null) { data += chunk.toString() } }) instance.on('end', function () { t.equal(data, 'hello world', 'chunk matches') }) }) test('basic passthrough with readable event on clone', function (t) { t.plan(3) var read = false var source = from(function (size, next) { if (read) { this.push(null) } else { read = true this.push('hello world') } next() }) var instance = cloneable(source) var cloned = instance.clone() t.notOk(read, 'stream not started') var data = '' cloned.on('readable', function () { var chunk while ((chunk = this.read()) !== null) { data += chunk.toString() } }) cloned.on('end', function () { t.equal(data, 'hello world', 'chunk matches in clone') }) instance.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), 'hello world', 'chunk matches in instance') cb() })) }) test('source error destroys all', function (t) { t.plan(3) var source = from() var instance = cloneable(source) var clone = instance.clone() source.on('error', function (err) { t.ok(err, 'source errors') instance.on('error', function (err2) { t.ok(err === err2, 'instance receives same error') }) clone.on('error', function (err3) { t.ok(err === err3, 'clone receives same error') }) }) source.emit('error', new Error()) }) test('source destroy destroys all', function (t) { t.plan(2) var source = from() var instance = cloneable(source) var clone = instance.clone() instance.on('end', function () { t.pass('instance has ended') }) clone.on('end', function () { t.pass('clone has ended') }) clone.resume() instance.resume() source.destroy() }) test('instance error destroys all but the source', function (t) { t.plan(2) var source = from() var instance = cloneable(source) var clone = instance.clone() source.on('close', function () { t.fail('source should not be closed') }) instance.on('error', function (err) { t.is(err.message, 'beep', 'instance errors') }) instance.on('close', function () { t.fail('close should not be emitted') }) clone.on('error', function (err) { t.is(err.message, 'beep', 'instance errors') }) clone.on('close', function () { t.fail('close should not be emitted') }) instance.destroy(new Error('beep')) }) test('instance destroy destroys all but the source', function (t) { t.plan(2) var source = from() var instance = cloneable(source) var clone = instance.clone() source.on('close', function () { t.fail('source should not be closed') }) instance.on('end', function () { t.pass('instance has ended') }) clone.on('end', function () { t.pass('clone has ended') }) instance.resume() clone.resume() instance.destroy() }) test('clone destroy does not affect other clones, cloneable or source', function (t) { t.plan(1) var source = from() var instance = cloneable(source) var clone = instance.clone() var other = instance.clone() source.on('close', function () { t.fail('source should not be closed') }) instance.on('close', function () { t.fail('instance should not be closed') }) other.on('close', function () { t.fail('other clone should not be closed') }) clone.on('close', function () { t.pass('clone is closed') }) clone.destroy() }) test('clone remains readable if other is destroyed', function (t) { t.plan(3) var read = false var source = from(function (size, next) { if (read) { this.push(null) } else { read = true this.push('hello') } next() }) var instance = cloneable(source) var clone = instance.clone() var other = instance.clone() instance.pipe(sink.obj(function (chunk, enc, cb) { t.deepEqual(chunk.toString(), 'hello', 'instance chunk matches') cb() })) clone.pipe(sink.obj(function (chunk, enc, cb) { t.deepEqual(chunk.toString(), 'hello', 'clone chunk matches') cb() })) clone.on('close', function () { t.fail('clone should not be closed') }) instance.on('close', function () { t.fail('instance should not be closed') }) other.on('close', function () { t.pass('other is closed') }) other.destroy() }) test('clone of clone', function (t) { t.plan(6) var read = false var source = from(function (size, next) { if (read) { this.push(null) } else { read = true this.push('hello world') } next() }) var instance = cloneable(source) t.notOk(read, 'stream not started') var cloned = instance.clone() t.notOk(read, 'stream not started') var replica = cloned.clone() t.notOk(read, 'stream not started') instance.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), 'hello world', 'chunk matches') cb() })) cloned.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), 'hello world', 'chunk matches') cb() })) replica.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), 'hello world', 'chunk matches') cb() })) }) test('from vinyl', function (t) { t.plan(3) var source = from(['wa', 'dup']) var instance = cloneable(source) var clone = instance.clone() var data = '' var data2 = '' var ends = 2 function latch () { if (--ends === 0) { t.equal(data, data2) } } instance.on('data', function (chunk) { data += chunk.toString() }) process.nextTick(function () { t.equal('', data, 'nothing was written yet') t.equal('', data2, 'nothing was written yet') clone.on('data', function (chunk) { data2 += chunk.toString() }) }) instance.on('end', latch) clone.on('end', latch) }) test('waits till all are flowing', function (t) { t.plan(1) var source = from(['wa', 'dup']) var instance = cloneable(source) // we create a clone instance.clone() instance.on('data', function (chunk) { t.fail('this should never happen') }) process.nextTick(function () { t.pass('wait till nextTick') }) }) test('isCloneable', function (t) { t.plan(4) var source = from(['hello', ' ', 'world']) t.notOk(cloneable.isCloneable(source), 'a generic readable is not cloneable') var instance = cloneable(source) t.ok(cloneable.isCloneable(instance), 'a cloneable is cloneable') var clone = instance.clone() t.ok(cloneable.isCloneable(clone), 'a clone is cloneable') var cloneClone = clone.clone() t.ok(cloneable.isCloneable(cloneClone), 'a clone of a clone is cloneable') }) test('emits finish', function (t) { var chunks = ['a', 'b', 'c', 'd', null] var e1 = ['a', 'b', 'c', 'd'] var e2 = ['a', 'b', 'c', 'd'] t.plan(2 + e1.length + e2.length) var source = from(function (size, next) { setImmediate(next, null, chunks.shift()) }) var instance = cloneable(source) var clone = instance.clone() clone.on('finish', t.pass.bind(null, 'clone emits finish')) instance.on('finish', t.pass.bind(null, 'main emits finish')) instance.pipe(sink(function (chunk, enc, cb) { t.equal(chunk.toString(), e1.shift(), 'chunk matches') cb() })) clone.on('data', function (chunk) { t.equal(chunk.toString(), e2.shift(), 'chunk matches') }) }) test('clone async w resume', function (t) { t.plan(4) var read = false var source = from(function (size, next) { if (read) { this.push(null) } else { read = true this.push('hello world') } next() }) var instance = cloneable(source) t.notOk(read, 'stream not started') var cloned = instance.clone() t.notOk(read, 'stream not started') instance.on('end', t.pass.bind(null, 'end emitted')) instance.resume() setImmediate(function () { cloned.on('end', t.pass.bind(null, 'end emitted')) cloned.resume() }) }) test('big file', function (t) { t.plan(13) var stream = cloneable(fs.createReadStream(path.join(__dirname, 'big'))) var hash = crypto.createHash('sha1') hash.setEncoding('hex') var toCheck fs.createReadStream(path.join(__dirname, 'big')) .pipe(hash) .once('readable', function () { toCheck = hash.read() t.ok(toCheck) }) function pipe (s, num) { s.on('end', function () { t.pass('end for ' + num) }) var dest = path.join(__dirname, 'out') s.pipe(fs.createWriteStream(dest)) .on('finish', function () { t.pass('finish for ' + num) var destHash = crypto.createHash('sha1') destHash.setEncoding('hex') fs.createReadStream(dest) .pipe(destHash) .once('readable', function () { var hash = destHash.read() t.ok(hash) t.equal(hash, toCheck) }) }) } // Pipe in another event loop tick <-- this one finished only, it's the original cloneable. setImmediate(pipe.bind(null, stream, 1)) // Pipe in the same event loop tick pipe(stream.clone(), 0) // Pipe a long time after setTimeout(pipe.bind(null, stream.clone(), 2), 1000) }) test('pump error', function (t) { t.plan(1) var err = new Error('kaboom') pump([ cloneable(from(function () { this.destroy(err) })), sink(function (chunk, enc, cb) { t.fail('this should not be called') }) ], function (_err) { t.equal(_err, err) }) })