'use strict'; const EventEmitter = require('events').EventEmitter; const crypto = require('crypto'); const debugOptions = require('./utils').debugOptions; const parseHeader = require('../wireprotocol/shared').parseHeader; const decompress = require('../wireprotocol/compression').decompress; const Response = require('./commands').Response; const BinMsg = require('./msg').BinMsg; const MongoNetworkError = require('../error').MongoNetworkError; const MongoError = require('../error').MongoError; const Logger = require('./logger'); const OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED; const OP_MSG = require('../wireprotocol/shared').opcodes.OP_MSG; const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE; const Buffer = require('safe-buffer').Buffer; let _id = 0; const DEFAULT_MAX_BSON_MESSAGE_SIZE = 1024 * 1024 * 16 * 4; const DEBUG_FIELDS = [ 'host', 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay', 'connectionTimeout', 'socketTimeout', 'ssl', 'ca', 'crl', 'cert', 'rejectUnauthorized', 'promoteLongs', 'promoteValues', 'promoteBuffers', 'checkServerIdentity' ]; let connectionAccountingSpy = undefined; let connectionAccounting = false; let connections = {}; /** * A class representing a single connection to a MongoDB server * * @fires Connection#connect * @fires Connection#close * @fires Connection#error * @fires Connection#timeout * @fires Connection#parseError * @fires Connection#message */ class Connection extends EventEmitter { /** * Creates a new Connection instance * * @param {Socket} socket The socket this connection wraps * @param {Object} [options] Optional settings * @param {string} [options.host] The host the socket is connected to * @param {number} [options.port] The port used for the socket connection * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled * @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled * @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting * @param {number} [options.socketTimeout=360000] TCP Socket timeout setting * @param {boolean} [options.promoteLongs] Convert Long values from the db into Numbers if they fit into 53 bits * @param {boolean} [options.promoteValues] Promotes BSON values to native types where possible, set to false to only receive wrapper types. * @param {boolean} [options.promoteBuffers] Promotes Binary BSON values to native Node Buffers. */ constructor(socket, options) { super(); options = options || {}; if (!options.bson) { throw new TypeError('must pass in valid bson parser'); } this.id = _id++; this.options = options; this.logger = Logger('Connection', options); this.bson = options.bson; this.tag = options.tag; this.maxBsonMessageSize = options.maxBsonMessageSize || DEFAULT_MAX_BSON_MESSAGE_SIZE; this.port = options.port || 27017; this.host = options.host || 'localhost'; this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000; // These values are inspected directly in tests, but maybe not necessary to keep around this.keepAlive = typeof options.keepAlive === 'boolean' ? options.keepAlive : true; this.keepAliveInitialDelay = typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 300000; this.connectionTimeout = typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 30000; if (this.keepAliveInitialDelay > this.socketTimeout) { this.keepAliveInitialDelay = Math.round(this.socketTimeout / 2); } // Debug information if (this.logger.isDebug()) { this.logger.debug( `creating connection ${this.id} with options [${JSON.stringify( debugOptions(DEBUG_FIELDS, options) )}]` ); } // Response options this.responseOptions = { promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true, promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true, promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false }; // Flushing this.flushing = false; this.queue = []; // Internal state this.writeStream = null; this.destroyed = false; // Create hash method const hash = crypto.createHash('sha1'); hash.update(this.address); this.hashedName = hash.digest('hex'); // All operations in flight on the connection this.workItems = []; // setup socket this.socket = socket; this.socket.once('error', errorHandler(this)); this.socket.once('timeout', timeoutHandler(this)); this.socket.once('close', closeHandler(this)); this.socket.on('data', dataHandler(this)); if (connectionAccounting) { addConnection(this.id, this); } } setSocketTimeout(value) { if (this.socket) { this.socket.setTimeout(value); } } resetSocketTimeout() { if (this.socket) { this.socket.setTimeout(this.socketTimeout); } } static enableConnectionAccounting(spy) { if (spy) { connectionAccountingSpy = spy; } connectionAccounting = true; connections = {}; } static disableConnectionAccounting() { connectionAccounting = false; connectionAccountingSpy = undefined; } static connections() { return connections; } get address() { return `${this.host}:${this.port}`; } /** * Unref this connection * @method * @return {boolean} */ unref() { if (this.socket == null) { this.once('connect', () => this.socket.unref()); return; } this.socket.unref(); } /** * Destroy connection * @method */ destroy(options, callback) { if (typeof options === 'function') { callback = options; options = {}; } options = Object.assign({ force: false }, options); if (connectionAccounting) { deleteConnection(this.id); } if (this.socket == null) { this.destroyed = true; return; } if (options.force) { this.socket.destroy(); this.destroyed = true; if (typeof callback === 'function') callback(null, null); return; } this.socket.end(err => { this.destroyed = true; if (typeof callback === 'function') callback(err, null); }); } /** * Write to connection * @method * @param {Command} command Command to write out need to implement toBin and toBinUnified */ write(buffer) { // Debug Log if (this.logger.isDebug()) { if (!Array.isArray(buffer)) { this.logger.debug(`writing buffer [${buffer.toString('hex')}] to ${this.address}`); } else { for (let i = 0; i < buffer.length; i++) this.logger.debug(`writing buffer [${buffer[i].toString('hex')}] to ${this.address}`); } } // Double check that the connection is not destroyed if (this.socket.destroyed === false) { // Write out the command if (!Array.isArray(buffer)) { this.socket.write(buffer, 'binary'); return true; } // Iterate over all buffers and write them in order to the socket for (let i = 0; i < buffer.length; i++) { this.socket.write(buffer[i], 'binary'); } return true; } // Connection is destroyed return write failed return false; } /** * Return id of connection as a string * @method * @return {string} */ toString() { return '' + this.id; } /** * Return json object of connection * @method * @return {object} */ toJSON() { return { id: this.id, host: this.host, port: this.port }; } /** * Is the connection connected * @method * @return {boolean} */ isConnected() { if (this.destroyed) return false; return !this.socket.destroyed && this.socket.writable; } } function deleteConnection(id) { // console.log("=== deleted connection " + id + " :: " + (connections[id] ? connections[id].port : '')) delete connections[id]; if (connectionAccountingSpy) { connectionAccountingSpy.deleteConnection(id); } } function addConnection(id, connection) { // console.log("=== added connection " + id + " :: " + connection.port) connections[id] = connection; if (connectionAccountingSpy) { connectionAccountingSpy.addConnection(id, connection); } } // // Connection handlers function errorHandler(conn) { return function(err) { if (connectionAccounting) deleteConnection(conn.id); // Debug information if (conn.logger.isDebug()) { conn.logger.debug( `connection ${conn.id} for [${conn.address}] errored out with [${JSON.stringify(err)}]` ); } conn.emit('error', new MongoNetworkError(err), conn); }; } function timeoutHandler(conn) { return function() { if (connectionAccounting) deleteConnection(conn.id); if (conn.logger.isDebug()) { conn.logger.debug(`connection ${conn.id} for [${conn.address}] timed out`); } conn.emit( 'timeout', new MongoNetworkError(`connection ${conn.id} to ${conn.address} timed out`), conn ); }; } function closeHandler(conn) { return function(hadError) { if (connectionAccounting) deleteConnection(conn.id); if (conn.logger.isDebug()) { conn.logger.debug(`connection ${conn.id} with for [${conn.address}] closed`); } if (!hadError) { conn.emit( 'close', new MongoNetworkError(`connection ${conn.id} to ${conn.address} closed`), conn ); } }; } // Handle a message once it is received function processMessage(conn, message) { const msgHeader = parseHeader(message); if (msgHeader.opCode !== OP_COMPRESSED) { const ResponseConstructor = msgHeader.opCode === OP_MSG ? BinMsg : Response; conn.emit( 'message', new ResponseConstructor( conn.bson, message, msgHeader, message.slice(MESSAGE_HEADER_SIZE), conn.responseOptions ), conn ); return; } msgHeader.fromCompressed = true; let index = MESSAGE_HEADER_SIZE; msgHeader.opCode = message.readInt32LE(index); index += 4; msgHeader.length = message.readInt32LE(index); index += 4; const compressorID = message[index]; index++; decompress(compressorID, message.slice(index), (err, decompressedMsgBody) => { if (err) { conn.emit('error', err); return; } if (decompressedMsgBody.length !== msgHeader.length) { conn.emit( 'error', new MongoError( 'Decompressing a compressed message from the server failed. The message is corrupt.' ) ); return; } const ResponseConstructor = msgHeader.opCode === OP_MSG ? BinMsg : Response; conn.emit( 'message', new ResponseConstructor( conn.bson, message, msgHeader, decompressedMsgBody, conn.responseOptions ), conn ); }); } function dataHandler(conn) { return function(data) { // Parse until we are done with the data while (data.length > 0) { // If we still have bytes to read on the current message if (conn.bytesRead > 0 && conn.sizeOfMessage > 0) { // Calculate the amount of remaining bytes const remainingBytesToRead = conn.sizeOfMessage - conn.bytesRead; // Check if the current chunk contains the rest of the message if (remainingBytesToRead > data.length) { // Copy the new data into the exiting buffer (should have been allocated when we know the message size) data.copy(conn.buffer, conn.bytesRead); // Adjust the number of bytes read so it point to the correct index in the buffer conn.bytesRead = conn.bytesRead + data.length; // Reset state of buffer data = Buffer.alloc(0); } else { // Copy the missing part of the data into our current buffer data.copy(conn.buffer, conn.bytesRead, 0, remainingBytesToRead); // Slice the overflow into a new buffer that we will then re-parse data = data.slice(remainingBytesToRead); // Emit current complete message const emitBuffer = conn.buffer; // Reset state of buffer conn.buffer = null; conn.sizeOfMessage = 0; conn.bytesRead = 0; conn.stubBuffer = null; processMessage(conn, emitBuffer); } } else { // Stub buffer is kept in case we don't get enough bytes to determine the // size of the message (< 4 bytes) if (conn.stubBuffer != null && conn.stubBuffer.length > 0) { // If we have enough bytes to determine the message size let's do it if (conn.stubBuffer.length + data.length > 4) { // Prepad the data const newData = Buffer.alloc(conn.stubBuffer.length + data.length); conn.stubBuffer.copy(newData, 0); data.copy(newData, conn.stubBuffer.length); // Reassign for parsing data = newData; // Reset state of buffer conn.buffer = null; conn.sizeOfMessage = 0; conn.bytesRead = 0; conn.stubBuffer = null; } else { // Add the the bytes to the stub buffer const newStubBuffer = Buffer.alloc(conn.stubBuffer.length + data.length); // Copy existing stub buffer conn.stubBuffer.copy(newStubBuffer, 0); // Copy missing part of the data data.copy(newStubBuffer, conn.stubBuffer.length); // Exit parsing loop data = Buffer.alloc(0); } } else { if (data.length > 4) { // Retrieve the message size const sizeOfMessage = data[0] | (data[1] << 8) | (data[2] << 16) | (data[3] << 24); // If we have a negative sizeOfMessage emit error and return if (sizeOfMessage < 0 || sizeOfMessage > conn.maxBsonMessageSize) { const errorObject = { err: 'socketHandler', trace: '', bin: conn.buffer, parseState: { sizeOfMessage: sizeOfMessage, bytesRead: conn.bytesRead, stubBuffer: conn.stubBuffer } }; // We got a parse Error fire it off then keep going conn.emit('parseError', errorObject, conn); return; } // Ensure that the size of message is larger than 0 and less than the max allowed if ( sizeOfMessage > 4 && sizeOfMessage < conn.maxBsonMessageSize && sizeOfMessage > data.length ) { conn.buffer = Buffer.alloc(sizeOfMessage); // Copy all the data into the buffer data.copy(conn.buffer, 0); // Update bytes read conn.bytesRead = data.length; // Update sizeOfMessage conn.sizeOfMessage = sizeOfMessage; // Ensure stub buffer is null conn.stubBuffer = null; // Exit parsing loop data = Buffer.alloc(0); } else if ( sizeOfMessage > 4 && sizeOfMessage < conn.maxBsonMessageSize && sizeOfMessage === data.length ) { const emitBuffer = data; // Reset state of buffer conn.buffer = null; conn.sizeOfMessage = 0; conn.bytesRead = 0; conn.stubBuffer = null; // Exit parsing loop data = Buffer.alloc(0); // Emit the message processMessage(conn, emitBuffer); } else if (sizeOfMessage <= 4 || sizeOfMessage > conn.maxBsonMessageSize) { const errorObject = { err: 'socketHandler', trace: null, bin: data, parseState: { sizeOfMessage: sizeOfMessage, bytesRead: 0, buffer: null, stubBuffer: null } }; // We got a parse Error fire it off then keep going conn.emit('parseError', errorObject, conn); // Clear out the state of the parser conn.buffer = null; conn.sizeOfMessage = 0; conn.bytesRead = 0; conn.stubBuffer = null; // Exit parsing loop data = Buffer.alloc(0); } else { const emitBuffer = data.slice(0, sizeOfMessage); // Reset state of buffer conn.buffer = null; conn.sizeOfMessage = 0; conn.bytesRead = 0; conn.stubBuffer = null; // Copy rest of message data = data.slice(sizeOfMessage); // Emit the message processMessage(conn, emitBuffer); } } else { // Create a buffer that contains the space for the non-complete message conn.stubBuffer = Buffer.alloc(data.length); // Copy the data to the stub buffer data.copy(conn.stubBuffer, 0); // Exit parsing loop data = Buffer.alloc(0); } } } } }; } /** * A server connect event, used to verify that the connection is up and running * * @event Connection#connect * @type {Connection} */ /** * The server connection closed, all pool connections closed * * @event Connection#close * @type {Connection} */ /** * The server connection caused an error, all pool connections closed * * @event Connection#error * @type {Connection} */ /** * The server connection timed out, all pool connections closed * * @event Connection#timeout * @type {Connection} */ /** * The driver experienced an invalid message, all pool connections closed * * @event Connection#parseError * @type {Connection} */ /** * An event emitted each time the connection receives a parsed message from the wire * * @event Connection#message * @type {Connection} */ module.exports = Connection;