|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698 |
- /*!
- * ws: a node.js websocket client
- * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
- * MIT Licensed
- */
-
- var util = require('util')
- , Validation = require('./Validation').Validation
- , ErrorCodes = require('./ErrorCodes')
- , BufferPool = require('./BufferPool')
- , bufferUtil = require('./BufferUtil').BufferUtil
- , PerMessageDeflate = require('./PerMessageDeflate');
-
/**
 * HyBi Receiver implementation
 *
 * Incremental parser state for HyBi framed websocket data. The constructor
 * sets up the buffer pools, the parser state and no-op event callbacks
 * (`onerror`, `ontext`, `onbinary`, `onclose`, `onping`, `onpong`) that the
 * owner is expected to overwrite, then arms the parser to read the first two
 * header bytes.
 *
 * @param {Object} [extensions] extension instances keyed by extension name;
 *   defaults to an empty object
 * @api public
 */

function Receiver (extensions) {
  // Both pools are configured identically, each with its own running average.
  // NOTE(review): the two callbacks look like BufferPool's grow and shrink
  // strategies -- confirm against BufferPool.
  function createPool() {
    var prevUsed = -1;
    return new BufferPool(1024, function(db, length) {
      return db.used + length;
    }, function(db) {
      // average of the previous result and the current usage; the very first
      // call just reports the current usage
      return prevUsed = prevUsed >= 0 ?
        (prevUsed + db.used) / 2 :
        db.used;
    });
  }

  // memory pool for fragmented messages
  this.fragmentedBufferPool = createPool();

  // memory pool for unfragmented messages
  this.unfragmentedBufferPool = createPool();

  this.extensions = extensions || {};
  this.state = {
    activeFragmentedOperation: null,
    lastFragment: false,
    masked: false,
    opcode: 0,
    fragmentedOperation: false
  };
  this.overflow = [];
  // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor;
  // expectHeader() hands out slices of this buffer for header pieces.
  this.headerBuffer = Buffer.alloc(10);
  this.expectOffset = 0;
  this.expectBuffer = null;
  this.expectHandler = null;
  this.currentMessage = [];
  this.messageHandlers = [];
  this.expectHeader(2, this.processPacket);
  this.dead = false;
  this.processing = false;

  this.onerror = function() {};
  this.ontext = function() {};
  this.onbinary = function() {};
  this.onclose = function() {};
  this.onping = function() {};
  this.onpong = function() {};
}
-
- module.exports = Receiver;
-
/**
 * Add new data to the parser.
 *
 * Copies as much of `data` as the currently expected buffer can take, queues
 * any remainder on `overflow`, and invokes the pending handler every time an
 * expected buffer has been filled completely. If nothing is expected at the
 * moment the whole chunk is queued. Calls on a receiver that has been
 * cleaned up are ignored.
 *
 * @param {Buffer} data chunk of incoming bytes
 * @api public
 */

Receiver.prototype.add = function(data) {
  // cleanup() nulls `overflow` and `expectBuffer`; without this guard a late
  // chunk would throw on `this.overflow.push`.
  if (this.dead) return;
  var dataLength = data.length;
  if (dataLength === 0) return;
  if (this.expectBuffer == null) {
    this.overflow.push(data);
    return;
  }
  var toRead = Math.min(dataLength, this.expectBuffer.length - this.expectOffset);
  fastCopy(toRead, data, this.expectBuffer, this.expectOffset);
  this.expectOffset += toRead;
  if (toRead < dataLength) {
    this.overflow.push(data.slice(toRead));
  }
  // A handler normally arms the next expectation, which may already be
  // satisfied from `overflow`, hence the loop. The expectation is cleared
  // before the handler runs so the handler can install a new one.
  while (this.expectBuffer && this.expectOffset === this.expectBuffer.length) {
    var bufferForHandler = this.expectBuffer;
    this.expectBuffer = null;
    this.expectOffset = 0;
    this.expectHandler.call(this, bufferForHandler);
  }
};
-
/**
 * Releases all resources used by the receiver.
 *
 * Flags the receiver as dead and drops its references to buffers, pools,
 * parser state and event callbacks by setting them to null.
 *
 * @api public
 */

Receiver.prototype.cleanup = function() {
  var released = [
    'overflow',
    'headerBuffer',
    'expectBuffer',
    'expectHandler',
    'unfragmentedBufferPool',
    'fragmentedBufferPool',
    'state',
    'currentMessage',
    'onerror',
    'ontext',
    'onbinary',
    'onclose',
    'onping',
    'onpong'
  ];
  this.dead = true;
  for (var i = 0; i < released.length; ++i) {
    this[released[i]] = null;
  }
};
-
/**
 * Waits for a certain amount of header bytes to be available, then fires a callback.
 *
 * The bytes are collected into a slice of `headerBuffer`. Whatever is already
 * queued on `overflow` is consumed right away; this method itself never
 * invokes `handler` for a non-zero length, it only installs it as
 * `expectHandler`. A zero length calls `handler(null)` immediately.
 *
 * @param {Number} length number of header bytes to wait for
 * @param {Function} handler receives the filled buffer (or null for length 0)
 * @api private
 */

Receiver.prototype.expectHeader = function(length, handler) {
  if (length == 0) {
    handler(null);
    return;
  }
  var sliceStart = this.expectOffset;
  this.expectBuffer = this.headerBuffer.slice(sliceStart, sliceStart + length);
  this.expectHandler = handler;
  var remaining = length;
  while (remaining > 0 && this.overflow.length > 0) {
    var chunk = this.overflow.pop();
    var take = Math.min(chunk.length, remaining);
    // put back whatever part of the chunk is not needed for this expectation
    if (take < chunk.length) this.overflow.push(chunk.slice(take));
    fastCopy(take, chunk, this.expectBuffer, this.expectOffset);
    this.expectOffset += take;
    remaining -= take;
  }
};
-
/**
 * Waits for a certain amount of data bytes to be available, then fires a callback.
 *
 * The bytes are collected into a buffer obtained through `allocateFromPool`,
 * using the pool that matches `state.fragmentedOperation`. Whatever is
 * already queued on `overflow` is consumed right away; this method itself
 * never invokes `handler` for a non-zero length, it only installs it as
 * `expectHandler`. A zero length calls `handler(null)` immediately.
 *
 * @param {Number} length number of payload bytes to wait for
 * @param {Function} handler receives the filled buffer (or null for length 0)
 * @api private
 */

Receiver.prototype.expectData = function(length, handler) {
  if (length == 0) {
    handler(null);
    return;
  }
  var fragmented = this.state.fragmentedOperation;
  this.expectBuffer = this.allocateFromPool(length, fragmented);
  this.expectHandler = handler;
  var remaining = length;
  while (remaining > 0 && this.overflow.length > 0) {
    var chunk = this.overflow.pop();
    var take = Math.min(chunk.length, remaining);
    // put back whatever part of the chunk is not needed for this expectation
    if (take < chunk.length) this.overflow.push(chunk.slice(take));
    fastCopy(take, chunk, this.expectBuffer, this.expectOffset);
    this.expectOffset += take;
    remaining -= take;
  }
};
-
/**
 * Allocates memory from the buffer pool.
 *
 * @param {Number} length number of bytes requested
 * @param {Boolean} isFragmented truthy selects the fragmented-message pool,
 *   falsy the unfragmented one
 * @return whatever the selected pool's `get(length)` returns
 * @api private
 */

Receiver.prototype.allocateFromPool = function(length, isFragmented) {
  var pool;
  if (isFragmented) {
    pool = this.fragmentedBufferPool;
  } else {
    pool = this.unfragmentedBufferPool;
  }
  return pool.get(length);
};
-
/**
 * Start processing a new packet.
 *
 * Interprets the first two header bytes: validates the reserved bits, records
 * the FIN and MASK flags on `state`, tracks fragmented operations and finally
 * dispatches to the `start` handler registered in `opcodes`. Violations are
 * reported through `error()` with close code 1002.
 *
 * @param {Buffer} data the two leading header bytes of the frame
 * @api private
 */

Receiver.prototype.processPacket = function (data) {
  var firstByte = data[0];

  // With permessage-deflate configured, bit 0x40 carries the "compressed"
  // flag and is therefore allowed to be set.
  var deflateEnabled = !!this.extensions[PerMessageDeflate.extensionName];
  var reservedBits = firstByte & (deflateEnabled ? 0x30 : 0x70);
  if (reservedBits != 0) {
    this.error(deflateEnabled ?
      'reserved fields (2, 3) must be empty' :
      'reserved fields must be empty', 1002);
    return;
  }

  var state = this.state;
  state.lastFragment = (firstByte & 0x80) == 0x80;
  state.masked = (data[1] & 0x80) == 0x80;
  var compressed = (firstByte & 0x40) == 0x40;
  var opcode = firstByte & 0xf;

  if (opcode === 0) {
    // continuation frame
    if (compressed) {
      this.error('continuation frame cannot have the Per-message Compressed bits', 1002);
      return;
    }
    state.fragmentedOperation = true;
    // a continuation inherits the opcode of the message it continues
    state.opcode = state.activeFragmentedOperation;
    if (state.opcode != 1 && state.opcode != 2) {
      this.error('continuation frame cannot follow current opcode', 1002);
      return;
    }
  }
  else {
    if (opcode < 3 && state.activeFragmentedOperation != null) {
      this.error('data frames after the initial data frame must have opcode 0', 1002);
      return;
    }
    if (opcode >= 8 && compressed) {
      this.error('control frames cannot have the Per-message Compressed bits', 1002);
      return;
    }
    state.compressed = compressed;
    state.opcode = opcode;
    // a frame without FIN opens a fragmented operation
    state.fragmentedOperation = state.lastFragment === false;
    if (state.fragmentedOperation) state.activeFragmentedOperation = opcode;
  }

  var handler = opcodes[state.opcode];
  if (typeof handler == 'undefined') {
    this.error('no handler for opcode ' + state.opcode, 1002);
    return;
  }
  handler.start.call(this, data);
};
-
/**
 * End processing a packet.
 *
 * Resets the buffer pool that served the packet, closes the active fragmented
 * operation when its final fragment has been handled, clears the per-frame
 * state and arms the parser for the next two header bytes. Does nothing once
 * the receiver has been cleaned up.
 *
 * @api private
 */

Receiver.prototype.endPacket = function() {
  // The opcode handlers call flush() right before endPacket(); a message
  // callback run from there may call cleanup(), which nulls `state` and the
  // pools. Bail out instead of dereferencing them.
  if (this.dead) return;
  if (!this.state.fragmentedOperation) this.unfragmentedBufferPool.reset(true);
  else if (this.state.lastFragment) this.fragmentedBufferPool.reset(false);
  this.expectOffset = 0;
  this.expectBuffer = null;
  this.expectHandler = null;
  if (this.state.lastFragment && this.state.opcode === this.state.activeFragmentedOperation) {
    // end current fragmented operation
    this.state.activeFragmentedOperation = null;
  }
  this.state.lastFragment = false;
  // while a fragmented operation is still open its opcode stays current
  this.state.opcode = this.state.activeFragmentedOperation != null ? this.state.activeFragmentedOperation : 0;
  this.state.masked = false;
  this.expectHeader(2, this.processPacket);
};
-
/**
 * Reset the parser state.
 *
 * Restores the initial frame state, resets both buffer pools and drops any
 * pending expectation, queued overflow data, partially assembled message and
 * queued message handlers. No new header expectation is installed here.
 * Does nothing on a receiver that has been cleaned up.
 *
 * @api private
 */

Receiver.prototype.reset = function() {
  if (this.dead) return;

  var initialState = {
    activeFragmentedOperation: null,
    lastFragment: false,
    masked: false,
    opcode: 0,
    fragmentedOperation: false
  };
  this.state = initialState;

  this.fragmentedBufferPool.reset(true);
  this.unfragmentedBufferPool.reset(true);

  this.expectOffset = 0;
  this.expectBuffer = this.expectHandler = null;

  this.overflow = [];
  this.currentMessage = [];
  this.messageHandlers = [];
};
-
/**
 * Unmask received data.
 *
 * When both a mask and a payload are given, the payload is passed to
 * `bufferUtil.unmask` together with the mask.
 *
 * @param {Buffer} mask masking key, or null/undefined for unmasked frames
 * @param {Buffer} buf payload, or null/undefined when there is none
 * @param {Boolean} binary truthy returns `buf` itself (possibly null), falsy
 *   returns its utf8 string form ('' when there is no payload)
 * @api private
 */

Receiver.prototype.unmask = function (mask, buf, binary) {
  var hasPayload = buf != null;
  if (hasPayload && mask != null) {
    bufferUtil.unmask(buf, mask);
  }
  if (binary) {
    return buf;
  }
  if (!hasPayload) {
    return '';
  }
  return buf.toString('utf8');
};
-
/**
 * Concatenates a list of buffers.
 *
 * @param {Array} buffers buffers to join, in order
 * @return {Buffer} a newly allocated buffer whose length is the sum of the
 *   input lengths, populated by `bufferUtil.merge`
 * @api private
 */

Receiver.prototype.concatBuffers = function(buffers) {
  var length = 0;
  for (var i = 0, l = buffers.length; i < l; ++i) length += buffers[i].length;
  // Buffer.allocUnsafe replaces the deprecated `new Buffer(size)` and keeps
  // its semantics (uninitialized memory).
  // NOTE(review): relies on bufferUtil.merge overwriting all `length` bytes,
  // exactly as the previous code did -- confirm against BufferUtil.
  var mergedBuffer = Buffer.allocUnsafe(length);
  bufferUtil.merge(mergedBuffer, buffers);
  return mergedBuffer;
};
-
/**
 * Handles an error
 *
 * Resets the parser first, then reports the problem through `onerror`.
 *
 * @param {String} reason description of what went wrong
 * @param {Number} protocolErrorCode code passed on to `onerror` unchanged
 * @return {Receiver} the receiver itself
 * @api private
 */

Receiver.prototype.error = function (reason, protocolErrorCode) {
  var receiver = this;
  receiver.reset();
  receiver.onerror(reason, protocolErrorCode);
  return receiver;
};
-
/**
 * Execute message handler buffers
 *
 * Runs the queued message handlers one at a time, in order. Each handler is
 * given a completion callback; the next handler only starts once that
 * callback has been invoked. Nothing happens while a handler is still in
 * progress or once the receiver is dead.
 *
 * @api private
 */

Receiver.prototype.flush = function() {
  var receiver = this;
  if (receiver.dead || receiver.processing) return;

  var next = receiver.messageHandlers.shift();
  if (!next) return;

  receiver.processing = true;
  next(function done() {
    receiver.processing = false;
    receiver.flush();
  });
};
-
/**
 * Apply extensions to message
 *
 * Uncompressed payloads are handed to `callback` unchanged and synchronously.
 * Compressed payloads go through the `decompress` method of the configured
 * permessage-deflate extension; its result is dropped silently if the
 * receiver has been cleaned up in the meantime.
 *
 * @param {Buffer} messageBuffer payload of the frame
 * @param {Boolean} fin forwarded to `decompress`
 * @param {Boolean} compressed whether the payload has to be decompressed
 * @param {Function} callback called with (err) or (null, buffer)
 * @api private
 */

Receiver.prototype.applyExtensions = function(messageBuffer, fin, compressed, callback) {
  if (!compressed) {
    callback(null, messageBuffer);
    return;
  }

  var receiver = this;
  var deflate = this.extensions[PerMessageDeflate.extensionName];
  deflate.decompress(messageBuffer, fin, function(err, buffer) {
    if (receiver.dead) return;
    if (err) {
      // the underlying error is replaced by a generic one
      callback(new Error('invalid compressed data'));
    } else {
      callback(null, buffer);
    }
  });
};
-
/**
 * Buffer utilities
 */

/**
 * Reads a big-endian unsigned 16 bit integer from `this`, which has to be an
 * indexable byte sequence (invoke through `readUInt16BE.call(buffer, start)`).
 *
 * @param {Number} start index of the most significant byte
 * @return {Number} the decoded value
 */
function readUInt16BE(start) {
  var high = this[start] << 8;
  var low = this[start + 1];
  return high + low;
}
-
/**
 * Reads a big-endian unsigned 32 bit integer from `this`, which has to be an
 * indexable byte sequence (invoke through `readUInt32BE.call(buffer, start)`).
 *
 * @param {Number} start index of the most significant byte
 * @return {Number} the decoded value, in the range 0..4294967295
 */
function readUInt32BE(start) {
  // The top byte is multiplied rather than shifted: `<< 24` works on signed
  // 32 bit integers and would turn any value >= 0x80000000 negative.
  return (this[start] * 0x1000000) +
    (this[start+1]<<16) +
    (this[start+2]<<8) +
    this[start+3];
}
-
/**
 * Copies the first `length` bytes of `srcBuffer` into `dstBuffer`, starting
 * at `dstOffset`.
 *
 * Counts from 1 to 16 are copied element by element, from the last byte down
 * to the first; every other count is delegated to `srcBuffer.copy`.
 *
 * @param {Number} length number of bytes to copy
 * @param {Buffer} srcBuffer source, read from index 0
 * @param {Buffer} dstBuffer destination
 * @param {Number} dstOffset index in `dstBuffer` of the first copied byte
 */
function fastCopy(length, srcBuffer, dstBuffer, dstOffset) {
  var isSmallCount = typeof length === 'number' &&
    length >= 1 &&
    length <= 16 &&
    length % 1 === 0;

  if (!isSmallCount) {
    srcBuffer.copy(dstBuffer, dstOffset, 0, length);
    return;
  }

  for (var i = length - 1; i >= 0; --i) {
    dstBuffer[dstOffset + i] = srcBuffer[i];
  }
}
-
/**
 * Creates a shallow copy of an object's own enumerable properties.
 *
 * @param {Object} obj object to copy; null/undefined yield an empty object
 * @return {Object} a new plain object holding the same property values
 */
function clone(obj) {
  var cloned = {};
  for (var k in obj) {
    // Borrow hasOwnProperty from Object.prototype: calling it on `obj`
    // directly fails for objects without a prototype and for objects that
    // carry their own `hasOwnProperty` property.
    if (Object.prototype.hasOwnProperty.call(obj, k)) {
      cloned[k] = obj[k];
    }
  }
  return cloned;
}
-
/**
 * Opcode handlers
 *
 * `opcodes` maps a frame opcode to an object with three steps, each invoked
 * with the receiver as `this`:
 *
 *   start(data)        - decode the payload length from the first two bytes
 *   getData(length)    - read the optional masking key, then the payload
 *   finish(mask, data) - unmask and queue delivery of the payload
 *
 * The steps shared between opcodes live in the helper functions below; they
 * look up the next step on `opcodes` by key at the time it is needed.
 */

/**
 * Decodes the payload length of a data frame (text/binary), reading the
 * extended 16 or 64 bit length field if announced, then continues with the
 * opcode's `getData` step.
 */
function startDataFrame(receiver, key, data) {
  var firstLength = data[1] & 0x7f;
  if (firstLength < 126) {
    opcodes[key].getData.call(receiver, firstLength);
  }
  else if (firstLength == 126) {
    receiver.expectHeader(2, function(data) {
      opcodes[key].getData.call(receiver, readUInt16BE.call(data, 0));
    });
  }
  else if (firstLength == 127) {
    receiver.expectHeader(8, function(data) {
      // only the lower 32 bits of the 64 bit length are supported
      if (readUInt32BE.call(data, 0) != 0) {
        receiver.error('packets with length spanning more than 32 bit is currently not supported', 1008);
        return;
      }
      opcodes[key].getData.call(receiver, readUInt32BE.call(data, 4));
    });
  }
}

/**
 * Validates a control frame header (must be final, at most 125 payload
 * bytes), then continues with the opcode's `getData` step.
 */
function startControlFrame(receiver, key, fragmentedMessage, data) {
  if (receiver.state.lastFragment == false) {
    receiver.error(fragmentedMessage, 1002);
    return;
  }

  // decode length
  var firstLength = data[1] & 0x7f;
  if (firstLength < 126) {
    opcodes[key].getData.call(receiver, firstLength);
  }
  else {
    receiver.error('control frames cannot have more than 125 bytes of data', 1002);
  }
}

/**
 * Reads the 4 byte masking key when the frame is masked, then `length`
 * payload bytes, and passes both to the opcode's `finish` step.
 */
function readPayload(receiver, key, length) {
  function readData(mask) {
    receiver.expectData(length, function(data) {
      opcodes[key].finish.call(receiver, mask, data);
    });
  }
  if (receiver.state.masked) receiver.expectHeader(4, readData);
  else readData(null);
}

/**
 * Unmasks a data frame payload and queues a handler that applies extensions,
 * collects the fragment and, on the last fragment, passes the assembled
 * message to `deliver(messageBuffer, state)` (called with the receiver as
 * `this`). `deliver` returns false to stop without signalling completion.
 */
function finishDataFrame(receiver, mask, data, deliver) {
  // an empty frame has no payload buffer; substitute an empty one
  var packet = receiver.unmask(mask, data, true) || Buffer.alloc(0);
  // snapshot: the handler may run after the state has moved on
  var state = clone(receiver.state);
  receiver.messageHandlers.push(function(callback) {
    receiver.applyExtensions(packet, state.lastFragment, state.compressed, function(err, buffer) {
      if (err) return receiver.error(err.message, 1007);
      if (buffer != null) receiver.currentMessage.push(buffer);

      if (state.lastFragment) {
        var messageBuffer = receiver.concatBuffers(receiver.currentMessage);
        receiver.currentMessage = [];
        if (deliver.call(receiver, messageBuffer, state) === false) return;
      }
      callback();
    });
  });
  receiver.flush();
  receiver.endPacket();
}

/**
 * Unmasks a ping/pong payload and queues a handler that reports it through
 * the receiver callback named `eventName`.
 */
function finishPingPong(receiver, mask, data, eventName) {
  data = receiver.unmask(mask, data, true);
  var state = clone(receiver.state);
  receiver.messageHandlers.push(function(callback) {
    receiver[eventName](data, {masked: state.masked, binary: true});
    callback();
  });
  receiver.flush();
  receiver.endPacket();
}

var opcodes = {
  // text
  '1': {
    start: function(data) {
      startDataFrame(this, '1', data);
    },
    getData: function(length) {
      readPayload(this, '1', length);
    },
    finish: function(mask, data) {
      finishDataFrame(this, mask, data, function(messageBuffer, state) {
        if (!Validation.isValidUTF8(messageBuffer)) {
          this.error('invalid utf8 sequence', 1007);
          return false;
        }
        this.ontext(messageBuffer.toString('utf8'), {masked: state.masked, buffer: messageBuffer});
        return true;
      });
    }
  },
  // binary
  '2': {
    start: function(data) {
      startDataFrame(this, '2', data);
    },
    getData: function(length) {
      readPayload(this, '2', length);
    },
    finish: function(mask, data) {
      finishDataFrame(this, mask, data, function(messageBuffer, state) {
        this.onbinary(messageBuffer, {masked: state.masked, buffer: messageBuffer});
        return true;
      });
    }
  },
  // close
  '8': {
    start: function(data) {
      startControlFrame(this, '8', 'fragmented close is not supported', data);
    },
    getData: function(length) {
      readPayload(this, '8', length);
    },
    finish: function(mask, data) {
      var self = this;
      data = self.unmask(mask, data, true);

      var state = clone(this.state);
      // Unlike the other opcodes, this handler never invokes the completion
      // callback passed by flush(), and finish() does not call endPacket():
      // no further queued handler runs and no new header is awaited.
      this.messageHandlers.push(function() {
        if (data && data.length == 1) {
          self.error('close packets with data must be at least two bytes long', 1002);
          return;
        }
        // a close frame without payload is reported as code 1000
        var code = data && data.length > 1 ? readUInt16BE.call(data, 0) : 1000;
        if (!ErrorCodes.isValidErrorCode(code)) {
          self.error('invalid error code', 1002);
          return;
        }
        var message = '';
        if (data && data.length > 2) {
          var messageBuffer = data.slice(2);
          if (!Validation.isValidUTF8(messageBuffer)) {
            self.error('invalid utf8 sequence', 1007);
            return;
          }
          message = messageBuffer.toString('utf8');
        }
        self.onclose(code, message, {masked: state.masked});
        self.reset();
      });
      this.flush();
    }
  },
  // ping
  '9': {
    start: function(data) {
      startControlFrame(this, '9', 'fragmented ping is not supported', data);
    },
    getData: function(length) {
      readPayload(this, '9', length);
    },
    finish: function(mask, data) {
      finishPingPong(this, mask, data, 'onping');
    }
  },
  // pong
  '10': {
    start: function(data) {
      startControlFrame(this, '10', 'fragmented pong is not supported', data);
    },
    getData: function(length) {
      readPayload(this, '10', length);
    },
    finish: function(mask, data) {
      finishPingPong(this, mask, data, 'onpong');
    }
  }
};
|