123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937 |
- 'use strict';
-
- /*!
- * ws: a node.js websocket client
- * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
- * MIT Licensed
- */
-
- var url = require('url')
- , util = require('util')
- , http = require('http')
- , https = require('https')
- , crypto = require('crypto')
- , stream = require('stream')
- , Ultron = require('ultron')
- , Options = require('options')
- , Sender = require('./Sender')
- , Receiver = require('./Receiver')
- , SenderHixie = require('./Sender.hixie')
- , ReceiverHixie = require('./Receiver.hixie')
- , Extensions = require('./Extensions')
- , PerMessageDeflate = require('./PerMessageDeflate')
- , EventEmitter = require('events').EventEmitter;
-
- /**
- * Constants
- */
-
- // Default protocol version
-
- var protocolVersion = 13;
-
- // Close timeout
-
- var closeTimeout = 30 * 1000; // Allow 30 seconds to terminate the connection cleanly
-
- /**
- * WebSocket implementation
- *
- * @constructor
- * @param {String} address Connection address.
- * @param {String|Array} protocols WebSocket protocols.
- * @param {Object} options Additional connection options.
- * @api public
- */
- function WebSocket(address, protocols, options) {
- EventEmitter.call(this);
-
- if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) {
- // accept the "options" Object as the 2nd argument
- options = protocols;
- protocols = null;
- }
-
- if ('string' === typeof protocols) {
- protocols = [ protocols ];
- }
-
- if (!Array.isArray(protocols)) {
- protocols = [];
- }
-
- this._socket = null;
- this._ultron = null;
- this._closeReceived = false;
- this.bytesReceived = 0;
- this.readyState = null;
- this.supports = {};
- this.extensions = {};
-
- if (Array.isArray(address)) {
- initAsServerClient.apply(this, address.concat(options));
- } else {
- initAsClient.apply(this, [address, protocols, options]);
- }
- }
-
- /**
- * Inherits from EventEmitter.
- */
- util.inherits(WebSocket, EventEmitter);
-
- /**
- * Ready States
- */
- ["CONNECTING", "OPEN", "CLOSING", "CLOSED"].forEach(function each(state, index) {
- WebSocket.prototype[state] = WebSocket[state] = index;
- });
-
- /**
- * Gracefully closes the connection, after sending a description message to the server
- *
- * @param {Object} data to be sent to the server
- * @api public
- */
- WebSocket.prototype.close = function close(code, data) {
- if (this.readyState === WebSocket.CLOSED) return;
-
- if (this.readyState === WebSocket.CONNECTING) {
- this.readyState = WebSocket.CLOSED;
- return;
- }
-
- if (this.readyState === WebSocket.CLOSING) {
- if (this._closeReceived && this._isServer) {
- this.terminate();
- }
- return;
- }
-
- var self = this;
- try {
- this.readyState = WebSocket.CLOSING;
- this._closeCode = code;
- this._closeMessage = data;
- var mask = !this._isServer;
- this._sender.close(code, data, mask, function(err) {
- if (err) self.emit('error', err);
-
- if (self._closeReceived && self._isServer) {
- self.terminate();
- } else {
- // ensure that the connection is cleaned up even when no response of closing handshake.
- clearTimeout(self._closeTimer);
- self._closeTimer = setTimeout(cleanupWebsocketResources.bind(self, true), closeTimeout);
- }
- });
- } catch (e) {
- this.emit('error', e);
- }
- };
-
- /**
- * Pause the client stream
- *
- * @api public
- */
- WebSocket.prototype.pause = function pauser() {
- if (this.readyState !== WebSocket.OPEN) throw new Error('not opened');
-
- return this._socket.pause();
- };
-
- /**
- * Sends a ping
- *
- * @param {Object} data to be sent to the server
- * @param {Object} Members - mask: boolean, binary: boolean
- * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open
- * @api public
- */
- WebSocket.prototype.ping = function ping(data, options, dontFailWhenClosed) {
- if (this.readyState !== WebSocket.OPEN) {
- if (dontFailWhenClosed === true) return;
- throw new Error('not opened');
- }
-
- options = options || {};
-
- if (typeof options.mask === 'undefined') options.mask = !this._isServer;
-
- this._sender.ping(data, options);
- };
-
- /**
- * Sends a pong
- *
- * @param {Object} data to be sent to the server
- * @param {Object} Members - mask: boolean, binary: boolean
- * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open
- * @api public
- */
- WebSocket.prototype.pong = function(data, options, dontFailWhenClosed) {
- if (this.readyState !== WebSocket.OPEN) {
- if (dontFailWhenClosed === true) return;
- throw new Error('not opened');
- }
-
- options = options || {};
-
- if (typeof options.mask === 'undefined') options.mask = !this._isServer;
-
- this._sender.pong(data, options);
- };
-
- /**
- * Resume the client stream
- *
- * @api public
- */
- WebSocket.prototype.resume = function resume() {
- if (this.readyState !== WebSocket.OPEN) throw new Error('not opened');
-
- return this._socket.resume();
- };
-
- /**
- * Sends a piece of data
- *
- * @param {Object} data to be sent to the server
- * @param {Object} Members - mask: boolean, binary: boolean, compress: boolean
- * @param {function} Optional callback which is executed after the send completes
- * @api public
- */
-
- WebSocket.prototype.send = function send(data, options, cb) {
- if (typeof options === 'function') {
- cb = options;
- options = {};
- }
-
- if (this.readyState !== WebSocket.OPEN) {
- if (typeof cb === 'function') cb(new Error('not opened'));
- else throw new Error('not opened');
- return;
- }
-
- if (!data) data = '';
- if (this._queue) {
- var self = this;
- this._queue.push(function() { self.send(data, options, cb); });
- return;
- }
-
- options = options || {};
- options.fin = true;
-
- if (typeof options.binary === 'undefined') {
- options.binary = (data instanceof ArrayBuffer || data instanceof Buffer ||
- data instanceof Uint8Array ||
- data instanceof Uint16Array ||
- data instanceof Uint32Array ||
- data instanceof Int8Array ||
- data instanceof Int16Array ||
- data instanceof Int32Array ||
- data instanceof Float32Array ||
- data instanceof Float64Array);
- }
-
- if (typeof options.mask === 'undefined') options.mask = !this._isServer;
- if (typeof options.compress === 'undefined') options.compress = true;
- if (!this.extensions[PerMessageDeflate.extensionName]) {
- options.compress = false;
- }
-
- var readable = typeof stream.Readable === 'function'
- ? stream.Readable
- : stream.Stream;
-
- if (data instanceof readable) {
- startQueue(this);
- var self = this;
-
- sendStream(this, data, options, function send(error) {
- process.nextTick(function tock() {
- executeQueueSends(self);
- });
-
- if (typeof cb === 'function') cb(error);
- });
- } else {
- this._sender.send(data, options, cb);
- }
- };
-
- /**
- * Streams data through calls to a user supplied function
- *
- * @param {Object} Members - mask: boolean, binary: boolean, compress: boolean
- * @param {function} 'function (error, send)' which is executed on successive ticks of which send is 'function (data, final)'.
- * @api public
- */
- WebSocket.prototype.stream = function stream(options, cb) {
- if (typeof options === 'function') {
- cb = options;
- options = {};
- }
-
- var self = this;
-
- if (typeof cb !== 'function') throw new Error('callback must be provided');
-
- if (this.readyState !== WebSocket.OPEN) {
- if (typeof cb === 'function') cb(new Error('not opened'));
- else throw new Error('not opened');
- return;
- }
-
- if (this._queue) {
- this._queue.push(function () { self.stream(options, cb); });
- return;
- }
-
- options = options || {};
-
- if (typeof options.mask === 'undefined') options.mask = !this._isServer;
- if (typeof options.compress === 'undefined') options.compress = true;
- if (!this.extensions[PerMessageDeflate.extensionName]) {
- options.compress = false;
- }
-
- startQueue(this);
-
- function send(data, final) {
- try {
- if (self.readyState !== WebSocket.OPEN) throw new Error('not opened');
- options.fin = final === true;
- self._sender.send(data, options);
- if (!final) process.nextTick(cb.bind(null, null, send));
- else executeQueueSends(self);
- } catch (e) {
- if (typeof cb === 'function') cb(e);
- else {
- delete self._queue;
- self.emit('error', e);
- }
- }
- }
-
- process.nextTick(cb.bind(null, null, send));
- };
-
- /**
- * Immediately shuts down the connection
- *
- * @api public
- */
- WebSocket.prototype.terminate = function terminate() {
- if (this.readyState === WebSocket.CLOSED) return;
-
- if (this._socket) {
- this.readyState = WebSocket.CLOSING;
-
- // End the connection
- try { this._socket.end(); }
- catch (e) {
- // Socket error during end() call, so just destroy it right now
- cleanupWebsocketResources.call(this, true);
- return;
- }
-
- // Add a timeout to ensure that the connection is completely
- // cleaned up within 30 seconds, even if the clean close procedure
- // fails for whatever reason
- // First cleanup any pre-existing timeout from an earlier "terminate" call,
- // if one exists. Otherwise terminate calls in quick succession will leak timeouts
- // and hold the program open for `closeTimout` time.
- if (this._closeTimer) { clearTimeout(this._closeTimer); }
- this._closeTimer = setTimeout(cleanupWebsocketResources.bind(this, true), closeTimeout);
- } else if (this.readyState === WebSocket.CONNECTING) {
- cleanupWebsocketResources.call(this, true);
- }
- };
-
- /**
- * Expose bufferedAmount
- *
- * @api public
- */
- Object.defineProperty(WebSocket.prototype, 'bufferedAmount', {
- get: function get() {
- var amount = 0;
- if (this._socket) {
- amount = this._socket.bufferSize || 0;
- }
- return amount;
- }
- });
-
- /**
- * Emulates the W3C Browser based WebSocket interface using function members.
- *
- * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
- * @api public
- */
- ['open', 'error', 'close', 'message'].forEach(function(method) {
- Object.defineProperty(WebSocket.prototype, 'on' + method, {
- /**
- * Returns the current listener
- *
- * @returns {Mixed} the set function or undefined
- * @api public
- */
- get: function get() {
- var listener = this.listeners(method)[0];
- return listener ? (listener._listener ? listener._listener : listener) : undefined;
- },
-
- /**
- * Start listening for events
- *
- * @param {Function} listener the listener
- * @returns {Mixed} the set function or undefined
- * @api public
- */
- set: function set(listener) {
- this.removeAllListeners(method);
- this.addEventListener(method, listener);
- }
- });
- });
-
- /**
- * Emulates the W3C Browser based WebSocket interface using addEventListener.
- *
- * @see https://developer.mozilla.org/en/DOM/element.addEventListener
- * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
- * @api public
- */
- WebSocket.prototype.addEventListener = function(method, listener) {
- var target = this;
-
- function onMessage (data, flags) {
- listener.call(target, new MessageEvent(data, flags.binary ? 'Binary' : 'Text', target));
- }
-
- function onClose (code, message) {
- listener.call(target, new CloseEvent(code, message, target));
- }
-
- function onError (event) {
- event.target = target;
- listener.call(target, event);
- }
-
- function onOpen () {
- listener.call(target, new OpenEvent(target));
- }
-
- if (typeof listener === 'function') {
- if (method === 'message') {
- // store a reference so we can return the original function from the
- // addEventListener hook
- onMessage._listener = listener;
- this.on(method, onMessage);
- } else if (method === 'close') {
- // store a reference so we can return the original function from the
- // addEventListener hook
- onClose._listener = listener;
- this.on(method, onClose);
- } else if (method === 'error') {
- // store a reference so we can return the original function from the
- // addEventListener hook
- onError._listener = listener;
- this.on(method, onError);
- } else if (method === 'open') {
- // store a reference so we can return the original function from the
- // addEventListener hook
- onOpen._listener = listener;
- this.on(method, onOpen);
- } else {
- this.on(method, listener);
- }
- }
- };
-
- module.exports = WebSocket;
-
- /**
- * W3C MessageEvent
- *
- * @see http://www.w3.org/TR/html5/comms.html
- * @constructor
- * @api private
- */
- function MessageEvent(dataArg, typeArg, target) {
- this.data = dataArg;
- this.type = typeArg;
- this.target = target;
- }
-
- /**
- * W3C CloseEvent
- *
- * @see http://www.w3.org/TR/html5/comms.html
- * @constructor
- * @api private
- */
- function CloseEvent(code, reason, target) {
- this.wasClean = (typeof code === 'undefined' || code === 1000);
- this.code = code;
- this.reason = reason;
- this.target = target;
- }
-
- /**
- * W3C OpenEvent
- *
- * @see http://www.w3.org/TR/html5/comms.html
- * @constructor
- * @api private
- */
- function OpenEvent(target) {
- this.target = target;
- }
-
- /**
- * Entirely private apis,
- * which may or may not be bound to a sepcific WebSocket instance.
- */
- function initAsServerClient(req, socket, upgradeHead, options) {
- options = new Options({
- protocolVersion: protocolVersion,
- protocol: null,
- extensions: {}
- }).merge(options);
-
- // expose state properties
- this.protocol = options.value.protocol;
- this.protocolVersion = options.value.protocolVersion;
- this.extensions = options.value.extensions;
- this.supports.binary = (this.protocolVersion !== 'hixie-76');
- this.upgradeReq = req;
- this.readyState = WebSocket.CONNECTING;
- this._isServer = true;
-
- // establish connection
- if (options.value.protocolVersion === 'hixie-76') {
- establishConnection.call(this, ReceiverHixie, SenderHixie, socket, upgradeHead);
- } else {
- establishConnection.call(this, Receiver, Sender, socket, upgradeHead);
- }
- }
-
- function initAsClient(address, protocols, options) {
- options = new Options({
- origin: null,
- protocolVersion: protocolVersion,
- host: null,
- headers: null,
- protocol: protocols.join(','),
- agent: null,
-
- // ssl-related options
- pfx: null,
- key: null,
- passphrase: null,
- cert: null,
- ca: null,
- ciphers: null,
- rejectUnauthorized: null,
- perMessageDeflate: true
- }).merge(options);
-
- if (options.value.protocolVersion !== 8 && options.value.protocolVersion !== 13) {
- throw new Error('unsupported protocol version');
- }
-
- // verify URL and establish http class
- var serverUrl = url.parse(address);
- var isUnixSocket = serverUrl.protocol === 'ws+unix:';
- if (!serverUrl.host && !isUnixSocket) throw new Error('invalid url');
- var isSecure = serverUrl.protocol === 'wss:' || serverUrl.protocol === 'https:';
- var httpObj = isSecure ? https : http;
- var port = serverUrl.port || (isSecure ? 443 : 80);
- var auth = serverUrl.auth;
-
- // prepare extensions
- var extensionsOffer = {};
- var perMessageDeflate;
- if (options.value.perMessageDeflate) {
- perMessageDeflate = new PerMessageDeflate(typeof options.value.perMessageDeflate !== true ? options.value.perMessageDeflate : {}, false);
- extensionsOffer[PerMessageDeflate.extensionName] = perMessageDeflate.offer();
- }
-
- // expose state properties
- this._isServer = false;
- this.url = address;
- this.protocolVersion = options.value.protocolVersion;
- this.supports.binary = (this.protocolVersion !== 'hixie-76');
-
- // begin handshake
- var key = new Buffer(options.value.protocolVersion + '-' + Date.now()).toString('base64');
- var shasum = crypto.createHash('sha1');
- shasum.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
- var expectedServerKey = shasum.digest('base64');
-
- var agent = options.value.agent;
-
- var headerHost = serverUrl.hostname;
- // Append port number to Host and Origin header, only if specified in the url
- // and non-default
- if (serverUrl.port) {
- if ((isSecure && (port !== 443)) || (!isSecure && (port !== 80))){
- headerHost = headerHost + ':' + port;
- }
- }
-
- var requestOptions = {
- port: port,
- host: serverUrl.hostname,
- headers: {
- 'Connection': 'Upgrade',
- 'Upgrade': 'websocket',
- 'Host': headerHost,
- 'Origin': headerHost,
- 'Sec-WebSocket-Version': options.value.protocolVersion,
- 'Sec-WebSocket-Key': key
- }
- };
-
- // If we have basic auth.
- if (auth) {
- requestOptions.headers.Authorization = 'Basic ' + new Buffer(auth).toString('base64');
- }
-
- if (options.value.protocol) {
- requestOptions.headers['Sec-WebSocket-Protocol'] = options.value.protocol;
- }
-
- if (options.value.host) {
- requestOptions.headers.Host = options.value.host;
- }
-
- if (options.value.headers) {
- for (var header in options.value.headers) {
- if (options.value.headers.hasOwnProperty(header)) {
- requestOptions.headers[header] = options.value.headers[header];
- }
- }
- }
-
- if (Object.keys(extensionsOffer).length) {
- requestOptions.headers['Sec-WebSocket-Extensions'] = Extensions.format(extensionsOffer);
- }
-
- if (options.isDefinedAndNonNull('pfx')
- || options.isDefinedAndNonNull('key')
- || options.isDefinedAndNonNull('passphrase')
- || options.isDefinedAndNonNull('cert')
- || options.isDefinedAndNonNull('ca')
- || options.isDefinedAndNonNull('ciphers')
- || options.isDefinedAndNonNull('rejectUnauthorized')) {
-
- if (options.isDefinedAndNonNull('pfx')) requestOptions.pfx = options.value.pfx;
- if (options.isDefinedAndNonNull('key')) requestOptions.key = options.value.key;
- if (options.isDefinedAndNonNull('passphrase')) requestOptions.passphrase = options.value.passphrase;
- if (options.isDefinedAndNonNull('cert')) requestOptions.cert = options.value.cert;
- if (options.isDefinedAndNonNull('ca')) requestOptions.ca = options.value.ca;
- if (options.isDefinedAndNonNull('ciphers')) requestOptions.ciphers = options.value.ciphers;
- if (options.isDefinedAndNonNull('rejectUnauthorized')) requestOptions.rejectUnauthorized = options.value.rejectUnauthorized;
-
- if (!agent) {
- // global agent ignores client side certificates
- agent = new httpObj.Agent(requestOptions);
- }
- }
-
- requestOptions.path = serverUrl.path || '/';
-
- if (agent) {
- requestOptions.agent = agent;
- }
-
- if (isUnixSocket) {
- requestOptions.socketPath = serverUrl.pathname;
- }
- if (options.value.origin) {
- if (options.value.protocolVersion < 13) requestOptions.headers['Sec-WebSocket-Origin'] = options.value.origin;
- else requestOptions.headers.Origin = options.value.origin;
- }
-
- var self = this;
- var req = httpObj.request(requestOptions);
-
- req.on('error', function onerror(error) {
- self.emit('error', error);
- cleanupWebsocketResources.call(this, error);
- });
-
- req.once('response', function response(res) {
- var error;
-
- if (!self.emit('unexpected-response', req, res)) {
- error = new Error('unexpected server response (' + res.statusCode + ')');
- req.abort();
- self.emit('error', error);
- }
-
- cleanupWebsocketResources.call(this, error);
- });
-
- req.once('upgrade', function upgrade(res, socket, upgradeHead) {
- if (self.readyState === WebSocket.CLOSED) {
- // client closed before server accepted connection
- self.emit('close');
- self.removeAllListeners();
- socket.end();
- return;
- }
-
- var serverKey = res.headers['sec-websocket-accept'];
- if (typeof serverKey === 'undefined' || serverKey !== expectedServerKey) {
- self.emit('error', 'invalid server key');
- self.removeAllListeners();
- socket.end();
- return;
- }
-
- var serverProt = res.headers['sec-websocket-protocol'];
- var protList = (options.value.protocol || "").split(/, */);
- var protError = null;
-
- if (!options.value.protocol && serverProt) {
- protError = 'server sent a subprotocol even though none requested';
- } else if (options.value.protocol && !serverProt) {
- protError = 'server sent no subprotocol even though requested';
- } else if (serverProt && protList.indexOf(serverProt) === -1) {
- protError = 'server responded with an invalid protocol';
- }
-
- if (protError) {
- self.emit('error', protError);
- self.removeAllListeners();
- socket.end();
- return;
- } else if (serverProt) {
- self.protocol = serverProt;
- }
-
- var serverExtensions = Extensions.parse(res.headers['sec-websocket-extensions']);
- if (perMessageDeflate && serverExtensions[PerMessageDeflate.extensionName]) {
- try {
- perMessageDeflate.accept(serverExtensions[PerMessageDeflate.extensionName]);
- } catch (err) {
- self.emit('error', 'invalid extension parameter');
- self.removeAllListeners();
- socket.end();
- return;
- }
- self.extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
- }
-
- establishConnection.call(self, Receiver, Sender, socket, upgradeHead);
-
- // perform cleanup on http resources
- req.removeAllListeners();
- req = null;
- agent = null;
- });
-
- req.end();
- this.readyState = WebSocket.CONNECTING;
- }
-
- function establishConnection(ReceiverClass, SenderClass, socket, upgradeHead) {
- var ultron = this._ultron = new Ultron(socket);
- this._socket = socket;
-
- socket.setTimeout(0);
- socket.setNoDelay(true);
- var self = this;
- this._receiver = new ReceiverClass(this.extensions);
-
- // socket cleanup handlers
- ultron.on('end', cleanupWebsocketResources.bind(this));
- ultron.on('close', cleanupWebsocketResources.bind(this));
- ultron.on('error', cleanupWebsocketResources.bind(this));
-
- // ensure that the upgradeHead is added to the receiver
- function firstHandler(data) {
- if (self.readyState !== WebSocket.OPEN && self.readyState !== WebSocket.CLOSING) return;
-
- if (upgradeHead && upgradeHead.length > 0) {
- self.bytesReceived += upgradeHead.length;
- var head = upgradeHead;
- upgradeHead = null;
- self._receiver.add(head);
- }
-
- dataHandler = realHandler;
-
- if (data) {
- self.bytesReceived += data.length;
- self._receiver.add(data);
- }
- }
-
- // subsequent packets are pushed straight to the receiver
- function realHandler(data) {
- if (data) self.bytesReceived += data.length;
- self._receiver.add(data);
- }
-
- var dataHandler = firstHandler;
-
- // if data was passed along with the http upgrade,
- // this will schedule a push of that on to the receiver.
- // this has to be done on next tick, since the caller
- // hasn't had a chance to set event handlers on this client
- // object yet.
- process.nextTick(firstHandler);
-
- // receiver event handlers
- self._receiver.ontext = function ontext(data, flags) {
- flags = flags || {};
-
- self.emit('message', data, flags);
- };
-
- self._receiver.onbinary = function onbinary(data, flags) {
- flags = flags || {};
-
- flags.binary = true;
- self.emit('message', data, flags);
- };
-
- self._receiver.onping = function onping(data, flags) {
- flags = flags || {};
-
- self.pong(data, {
- mask: !self._isServer,
- binary: flags.binary === true
- }, true);
-
- self.emit('ping', data, flags);
- };
-
- self._receiver.onpong = function onpong(data, flags) {
- self.emit('pong', data, flags || {});
- };
-
- self._receiver.onclose = function onclose(code, data, flags) {
- flags = flags || {};
-
- self._closeReceived = true;
- self.close(code, data);
- };
-
- self._receiver.onerror = function onerror(reason, errorCode) {
- // close the connection when the receiver reports a HyBi error code
- self.close(typeof errorCode !== 'undefined' ? errorCode : 1002, '');
- self.emit('error', reason, errorCode);
- };
-
- // finalize the client
- this._sender = new SenderClass(socket, this.extensions);
- this._sender.on('error', function onerror(error) {
- self.close(1002, '');
- self.emit('error', error);
- });
-
- this.readyState = WebSocket.OPEN;
- this.emit('open');
-
- ultron.on('data', dataHandler);
- }
-
- function startQueue(instance) {
- instance._queue = instance._queue || [];
- }
-
- function executeQueueSends(instance) {
- var queue = instance._queue;
- if (typeof queue === 'undefined') return;
-
- delete instance._queue;
- for (var i = 0, l = queue.length; i < l; ++i) {
- queue[i]();
- }
- }
-
- function sendStream(instance, stream, options, cb) {
- stream.on('data', function incoming(data) {
- if (instance.readyState !== WebSocket.OPEN) {
- if (typeof cb === 'function') cb(new Error('not opened'));
- else {
- delete instance._queue;
- instance.emit('error', new Error('not opened'));
- }
- return;
- }
-
- options.fin = false;
- instance._sender.send(data, options);
- });
-
- stream.on('end', function end() {
- if (instance.readyState !== WebSocket.OPEN) {
- if (typeof cb === 'function') cb(new Error('not opened'));
- else {
- delete instance._queue;
- instance.emit('error', new Error('not opened'));
- }
- return;
- }
-
- options.fin = true;
- instance._sender.send(null, options);
-
- if (typeof cb === 'function') cb(null);
- });
- }
-
- function cleanupWebsocketResources(error) {
- if (this.readyState === WebSocket.CLOSED) return;
-
- var emitClose = this.readyState !== WebSocket.CONNECTING;
- this.readyState = WebSocket.CLOSED;
-
- clearTimeout(this._closeTimer);
- this._closeTimer = null;
-
- if (emitClose) {
- this.emit('close', this._closeCode || 1000, this._closeMessage || '');
- }
-
- if (this._socket) {
- if (this._ultron) this._ultron.destroy();
- this._socket.on('error', function onerror() {
- try { this.destroy(); }
- catch (e) {}
- });
-
- try {
- if (!error) this._socket.end();
- else this._socket.destroy();
- } catch (e) { /* Ignore termination errors */ }
-
- this._socket = null;
- this._ultron = null;
- }
-
- if (this._sender) {
- this._sender.removeAllListeners();
- this._sender = null;
- }
-
- if (this._receiver) {
- this._receiver.cleanup();
- this._receiver = null;
- }
-
- this.removeAllListeners();
- this.on('error', function onerror() {}); // catch all errors after this
- delete this._queue;
- }
|