var through = require('through');
var bz2 = require('./lib/bzip2');
var bitIterator = require('./lib/bit_iterator');

module.exports = unbzip2Stream;

/**
 * Creates a through-stream that incrementally decompresses a bzip2 byte
 * stream. Compressed Buffers are written in; decompressed Buffers are
 * emitted. Emits an 'error' event on malformed input or if the input
 * ends before the bzip2 end-of-stream marker is seen.
 */
function unbzip2Stream() {
    var pending = [];        // input Buffers queued for the bit reader
    var bytesBuffered = 0;   // total bytes ever received (never decremented)
    var blockSize100k = 0;   // 0 until the bzip2 header has been parsed
    var failed = false;      // set after an error; all further work is skipped
    var reader = null;       // lazily-created bit-level iterator over `pending`
    var streamCRC = null;    // running combined CRC; null once end-of-stream is reached

    // Parse the stream header (first call after reset) or decompress a
    // single block, handing its bytes to `emit` as one Buffer. Returns
    // false exactly when the end-of-stream marker was consumed.
    function step(emit) {
        if (!blockSize100k) {
            blockSize100k = bz2.header(reader);
            streamCRC = 0;
            return true;
        }
        var capacity = 100000 * blockSize100k;
        var workspace = new Int32Array(capacity);
        var out = [];
        streamCRC = bz2.decompress(
            reader,
            function (b) { out.push(b); },
            workspace,
            capacity,
            streamCRC
        );
        if (streamCRC === null) {
            // End of this bzip2 stream; reset so a concatenated
            // follow-up stream's header can be parsed next.
            blockSize100k = 0;
            return false;
        }
        emit(Buffer.from(out));
        return true;
    }

    var written = 0; // total decompressed bytes queued so far
    function pump(stream) {
        if (failed) return;
        try {
            return step(function (d) {
                stream.queue(d);
                if (d !== null) {
                    written += d.length;
                }
            });
        } catch (err) {
            stream.emit('error', err);
            failed = true;
            return false;
        }
    }

    return through(
        function write(data) {
            pending.push(data);
            bytesBuffered += data.length;
            if (reader === null) {
                reader = bitIterator(function () {
                    return pending.shift();
                });
            }
            // Only decompress once enough bytes are buffered to cover a
            // worst-case block (plus header slack), so a block is never
            // started on insufficient data.
            while (!failed && bytesBuffered - reader.bytesRead + 1 >= ((25000 + 100000 * blockSize100k) || 4)) {
                pump(this);
            }
        },
        function end(x) {
            // Drain whatever is still buffered.
            while (!failed && reader && bytesBuffered > reader.bytesRead) {
                pump(this);
            }
            if (!failed) {
                if (streamCRC !== null)
                    this.emit('error', new Error("input stream ended prematurely"));
                this.queue(null);
            }
        }
    );
}