'use strict'; const inherits = require('util').inherits; const EventEmitter = require('events').EventEmitter; const Connection = require('./connection'); const MongoError = require('../error').MongoError; const MongoNetworkError = require('../error').MongoNetworkError; const MongoWriteConcernError = require('../error').MongoWriteConcernError; const Logger = require('./logger'); const f = require('util').format; const Query = require('./commands').Query; const CommandResult = require('./command_result'); const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE; const opcodes = require('../wireprotocol/shared').opcodes; const compress = require('../wireprotocol/compression').compress; const compressorIDs = require('../wireprotocol/compression').compressorIDs; const uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands; const resolveClusterTime = require('../topologies/shared').resolveClusterTime; const apm = require('./apm'); const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders; const Buffer = require('safe-buffer').Buffer; var DISCONNECTED = 'disconnected'; var CONNECTING = 'connecting'; var CONNECTED = 'connected'; var DESTROYING = 'destroying'; var DESTROYED = 'destroyed'; var _id = 0; function hasSessionSupport(topology) { if (topology == null) return false; return topology.ismaster == null ? false : topology.ismaster.maxWireVersion >= 6; } /** * Creates a new Pool instance * @class * @param {string} options.host The server host * @param {number} options.port The server port * @param {number} [options.size=5] Max server connection pool size * @param {number} [options.minSize=0] Minimum server connection pool size * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled * @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled * @param {boolean} [options.noDelay=true] TCP Connection no delay * @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting * @param {number} [options.socketTimeout=360000] TCP Socket timeout setting * @param {number} [options.monitoringSocketTimeout=30000] TCP Socket timeout setting for replicaset monitoring socket * @param {boolean} [options.ssl=false] Use SSL for connection * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function. * @param {Buffer} [options.ca] SSL Certificate store binary buffer * @param {Buffer} [options.crl] SSL Certificate revocation store binary buffer * @param {Buffer} [options.cert] SSL Certificate binary buffer * @param {Buffer} [options.key] SSL Key file binary buffer * @param {string} [options.passPhrase] SSL Certificate pass phrase * @param {boolean} [options.rejectUnauthorized=false] Reject unauthorized server certificates * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types. * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers. * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit. * @fires Pool#connect * @fires Pool#close * @fires Pool#error * @fires Pool#timeout * @fires Pool#parseError * @return {Pool} A cursor instance */ var Pool = function(topology, options) { // Add event listener EventEmitter.call(this); // Store topology for later use this.topology = topology; // Add the options this.options = Object.assign( { // Host and port settings host: 'localhost', port: 27017, // Pool default max size size: 5, // Pool default min size minSize: 0, // socket settings connectionTimeout: 30000, socketTimeout: 360000, keepAlive: true, keepAliveInitialDelay: 300000, noDelay: true, // SSL Settings ssl: false, checkServerIdentity: true, ca: null, crl: null, cert: null, key: null, passPhrase: null, rejectUnauthorized: false, promoteLongs: true, promoteValues: true, promoteBuffers: false, // Reconnection options reconnect: true, reconnectInterval: 1000, reconnectTries: 30, // Enable domains domainsEnabled: false }, options ); // Identification information this.id = _id++; // Current reconnect retries this.retriesLeft = this.options.reconnectTries; this.reconnectId = null; // No bson parser passed in if ( !options.bson || (options.bson && (typeof options.bson.serialize !== 'function' || typeof options.bson.deserialize !== 'function')) ) { throw new Error('must pass in valid bson parser'); } // Logger instance this.logger = Logger('Pool', options); // Pool state this.state = DISCONNECTED; // Connections this.availableConnections = []; this.inUseConnections = []; this.connectingConnections = []; // Currently executing this.executing = false; // Operation work queue this.queue = []; // All the authProviders this.authProviders = options.authProviders || defaultAuthProviders(options.bson); // Contains the reconnect connection this.reconnectConnection = null; // Are we currently authenticating this.authenticating = false; this.loggingout = false; this.nonAuthenticatedConnections = []; this.authenticatingTimestamp = null; // Number of consecutive timeouts caught this.numberOfConsecutiveTimeouts = 0; // Current pool Index this.connectionIndex = 0; }; inherits(Pool, EventEmitter); Object.defineProperty(Pool.prototype, 'size', { enumerable: true, get: function() { return this.options.size; } }); Object.defineProperty(Pool.prototype, 'minSize', { enumerable: true, get: function() { return this.options.minSize; } }); Object.defineProperty(Pool.prototype, 'connectionTimeout', { enumerable: true, get: function() { return this.options.connectionTimeout; } }); Object.defineProperty(Pool.prototype, 'socketTimeout', { enumerable: true, get: function() { return this.options.socketTimeout; } }); function stateTransition(self, newState) { var legalTransitions = { disconnected: [CONNECTING, DESTROYING, DISCONNECTED], connecting: [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED], connected: [CONNECTED, DISCONNECTED, DESTROYING], destroying: [DESTROYING, DESTROYED], destroyed: [DESTROYED] }; // Get current state var legalStates = legalTransitions[self.state]; if (legalStates && legalStates.indexOf(newState) !== -1) { self.emit('stateChanged', self.state, newState); self.state = newState; } else { self.logger.error( f( 'Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]', self.id, self.state, newState, legalStates ) ); } } function authenticate(pool, auth, connection, cb) { if (auth[0] === undefined) return cb(null); // We need to authenticate the server var mechanism = auth[0]; var db = auth[1]; // Validate if the mechanism exists if (!pool.authProviders[mechanism]) { throw new MongoError(f('authMechanism %s not supported', mechanism)); } // Get the provider var provider = pool.authProviders[mechanism]; // Authenticate using the provided mechanism provider.auth.apply(provider, [write(pool), [connection], db].concat(auth.slice(2)).concat([cb])); } // The write function used by the authentication mechanism (bypasses external) function write(self) { return function(connection, command, callback) { // Get the raw buffer // Ensure we stop auth if pool was destroyed if (self.state === DESTROYED || self.state === DESTROYING) { return callback(new MongoError('pool destroyed')); } // Set the connection workItem callback connection.workItems.push({ cb: callback, command: true, requestId: command.requestId }); // Write the buffer out to the connection connection.write(command.toBin()); }; } function reauthenticate(pool, connection, cb) { // Authenticate function authenticateAgainstProvider(pool, connection, providers, cb) { // Finished re-authenticating against providers if (providers.length === 0) return cb(); // Get the provider name var provider = pool.authProviders[providers.pop()]; // Auth provider provider.reauthenticate(write(pool), [connection], function(err) { // We got an error return immediately if (err) return cb(err); // Continue authenticating the connection authenticateAgainstProvider(pool, connection, providers, cb); }); } // Start re-authenticating process authenticateAgainstProvider(pool, connection, Object.keys(pool.authProviders), cb); } function connectionFailureHandler(self, event) { return function(err) { if (this._connectionFailHandled) return; this._connectionFailHandled = true; // Destroy the connection this.destroy(); // Remove the connection removeConnection(self, this); // Flush all work Items on this connection while (this.workItems.length > 0) { var workItem = this.workItems.shift(); if (workItem.cb) workItem.cb(err); } // Did we catch a timeout, increment the numberOfConsecutiveTimeouts if (event === 'timeout') { self.numberOfConsecutiveTimeouts = self.numberOfConsecutiveTimeouts + 1; // Have we timed out more than reconnectTries in a row ? // Force close the pool as we are trying to connect to tcp sink hole if (self.numberOfConsecutiveTimeouts > self.options.reconnectTries) { self.numberOfConsecutiveTimeouts = 0; // Destroy all connections and pool self.destroy(true); // Emit close event return self.emit('close', self); } } // No more socket available propegate the event if (self.socketCount() === 0) { if (self.state !== DESTROYED && self.state !== DESTROYING) { stateTransition(self, DISCONNECTED); } // Do not emit error events, they are always close events // do not trigger the low level error handler in node event = event === 'error' ? 'close' : event; self.emit(event, err); } // Start reconnection attempts if (!self.reconnectId && self.options.reconnect) { self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval); } // Do we need to do anything to maintain the minimum pool size const totalConnections = self.availableConnections.length + self.connectingConnections.length + self.inUseConnections.length; if (totalConnections < self.minSize) { _createConnection(self); } }; } function attemptReconnect(self) { return function() { self.emit('attemptReconnect', self); if (self.state === DESTROYED || self.state === DESTROYING) return; // We are connected do not try again if (self.isConnected()) { self.reconnectId = null; return; } // If we have failure schedule a retry function _connectionFailureHandler(self) { return function() { if (this._connectionFailHandled) return; this._connectionFailHandled = true; // Destroy the connection this.destroy(); // Count down the number of reconnects self.retriesLeft = self.retriesLeft - 1; // How many retries are left if (self.retriesLeft <= 0) { // Destroy the instance self.destroy(); // Emit close event self.emit( 'reconnectFailed', new MongoNetworkError( f( 'failed to reconnect after %s attempts with interval %s ms', self.options.reconnectTries, self.options.reconnectInterval ) ) ); } else { self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval); } }; } // Got a connect handler function _connectHandler(self) { return function() { // Assign var connection = this; // Pool destroyed stop the connection if (self.state === DESTROYED || self.state === DESTROYING) { return connection.destroy(); } // Clear out all handlers handlers.forEach(function(event) { connection.removeAllListeners(event); }); // Reset reconnect id self.reconnectId = null; // Apply pool connection handlers connection.on('error', connectionFailureHandler(self, 'error')); connection.on('close', connectionFailureHandler(self, 'close')); connection.on('timeout', connectionFailureHandler(self, 'timeout')); connection.on('parseError', connectionFailureHandler(self, 'parseError')); // Apply any auth to the connection reauthenticate(self, this, function() { // Reset retries self.retriesLeft = self.options.reconnectTries; // Push to available connections self.availableConnections.push(connection); // Set the reconnectConnection to null self.reconnectConnection = null; // Emit reconnect event self.emit('reconnect', self); // Trigger execute to start everything up again _execute(self)(); }); }; } // Create a connection self.reconnectConnection = new Connection(messageHandler(self), self.options); // Add handlers self.reconnectConnection.on('close', _connectionFailureHandler(self, 'close')); self.reconnectConnection.on('error', _connectionFailureHandler(self, 'error')); self.reconnectConnection.on('timeout', _connectionFailureHandler(self, 'timeout')); self.reconnectConnection.on('parseError', _connectionFailureHandler(self, 'parseError')); // On connection self.reconnectConnection.on('connect', _connectHandler(self)); // Attempt connection self.reconnectConnection.connect(); }; } function moveConnectionBetween(connection, from, to) { var index = from.indexOf(connection); // Move the connection from connecting to available if (index !== -1) { from.splice(index, 1); to.push(connection); } } function messageHandler(self) { return function(message, connection) { // workItem to execute var workItem = null; // Locate the workItem for (var i = 0; i < connection.workItems.length; i++) { if (connection.workItems[i].requestId === message.responseTo) { // Get the callback workItem = connection.workItems[i]; // Remove from list of workItems connection.workItems.splice(i, 1); } } // Reset timeout counter self.numberOfConsecutiveTimeouts = 0; // Reset the connection timeout if we modified it for // this operation if (workItem && workItem.socketTimeout) { connection.resetSocketTimeout(); } // Log if debug enabled if (self.logger.isDebug()) { self.logger.debug( f( 'message [%s] received from %s:%s', message.raw.toString('hex'), self.options.host, self.options.port ) ); } // Authenticate any straggler connections function authenticateStragglers(self, connection, callback) { // Get any non authenticated connections var connections = self.nonAuthenticatedConnections.slice(0); var nonAuthenticatedConnections = self.nonAuthenticatedConnections; self.nonAuthenticatedConnections = []; // Establish if the connection need to be authenticated // Add to authentication list if // 1. we were in an authentication process when the operation was executed // 2. our current authentication timestamp is from the workItem one, meaning an auth has happened if ( connection.workItems.length === 1 && (connection.workItems[0].authenticating === true || (typeof connection.workItems[0].authenticatingTimestamp === 'number' && connection.workItems[0].authenticatingTimestamp !== self.authenticatingTimestamp)) ) { // Add connection to the list connections.push(connection); } // No connections need to be re-authenticated if (connections.length === 0) { // Release the connection back to the pool moveConnectionBetween(connection, self.inUseConnections, self.availableConnections); // Finish return callback(); } // Apply re-authentication to all connections before releasing back to pool var connectionCount = connections.length; // Authenticate all connections for (var i = 0; i < connectionCount; i++) { reauthenticate(self, connections[i], function() { connectionCount = connectionCount - 1; if (connectionCount === 0) { // Put non authenticated connections in available connections self.availableConnections = self.availableConnections.concat( nonAuthenticatedConnections ); // Release the connection back to the pool moveConnectionBetween(connection, self.inUseConnections, self.availableConnections); // Return callback(); } }); } } function handleOperationCallback(self, cb, err, result) { // No domain enabled if (!self.options.domainsEnabled) { return process.nextTick(function() { return cb(err, result); }); } // Domain enabled just call the callback cb(err, result); } authenticateStragglers(self, connection, function() { // Keep executing, ensure current message handler does not stop execution if (!self.executing) { process.nextTick(function() { _execute(self)(); }); } // Time to dispatch the message if we have a callback if (workItem && !workItem.immediateRelease) { try { // Parse the message according to the provided options message.parse(workItem); } catch (err) { return handleOperationCallback(self, workItem.cb, new MongoError(err)); } // Look for clusterTime, and operationTime and update them if necessary if (message.documents[0]) { if (message.documents[0].$clusterTime) { const $clusterTime = message.documents[0].$clusterTime; self.topology.clusterTime = $clusterTime; if (workItem.session != null) { resolveClusterTime(workItem.session, $clusterTime); } } if ( message.documents[0].operationTime && workItem.session && workItem.session.supports.causalConsistency ) { workItem.session.advanceOperationTime(message.documents[0].operationTime); } } // Establish if we have an error if (workItem.command && message.documents[0]) { const responseDoc = message.documents[0]; if (responseDoc.ok === 0 || responseDoc.$err || responseDoc.errmsg || responseDoc.code) { return handleOperationCallback(self, workItem.cb, new MongoError(responseDoc)); } if (responseDoc.writeConcernError) { const err = responseDoc.ok === 1 ? new MongoWriteConcernError(responseDoc.writeConcernError, responseDoc) : new MongoWriteConcernError(responseDoc.writeConcernError); return handleOperationCallback(self, workItem.cb, err); } } // Add the connection details message.hashedName = connection.hashedName; // Return the documents handleOperationCallback( self, workItem.cb, null, new CommandResult( workItem.fullResult ? message : message.documents[0], connection, message ) ); } }); }; } /** * Return the total socket count in the pool. * @method * @return {Number} The number of socket available. */ Pool.prototype.socketCount = function() { return this.availableConnections.length + this.inUseConnections.length; // + this.connectingConnections.length; }; /** * Return all pool connections * @method * @return {Connection[]} The pool connections */ Pool.prototype.allConnections = function() { return this.availableConnections.concat(this.inUseConnections).concat(this.connectingConnections); }; /** * Get a pool connection (round-robin) * @method * @return {Connection} */ Pool.prototype.get = function() { return this.allConnections()[0]; }; /** * Is the pool connected * @method * @return {boolean} */ Pool.prototype.isConnected = function() { // We are in a destroyed state if (this.state === DESTROYED || this.state === DESTROYING) { return false; } // Get connections var connections = this.availableConnections.concat(this.inUseConnections); // Check if we have any connected connections for (var i = 0; i < connections.length; i++) { if (connections[i].isConnected()) return true; } // Might be authenticating, but we are still connected if (connections.length === 0 && this.authenticating) { return true; } // Not connected return false; }; /** * Was the pool destroyed * @method * @return {boolean} */ Pool.prototype.isDestroyed = function() { return this.state === DESTROYED || this.state === DESTROYING; }; /** * Is the pool in a disconnected state * @method * @return {boolean} */ Pool.prototype.isDisconnected = function() { return this.state === DISCONNECTED; }; /** * Connect pool * @method */ Pool.prototype.connect = function() { if (this.state !== DISCONNECTED) { throw new MongoError('connection in unlawful state ' + this.state); } var self = this; // Transition to connecting state stateTransition(this, CONNECTING); // Create an array of the arguments var args = Array.prototype.slice.call(arguments, 0); // Create a connection var connection = new Connection(messageHandler(self), this.options); // Add to list of connections this.connectingConnections.push(connection); // Add listeners to the connection connection.once('connect', function(connection) { if (self.state === DESTROYED || self.state === DESTROYING) return self.destroy(); // If we are in a topology, delegate the auth to it // This is to avoid issues where we would auth against an // arbiter if (self.options.inTopology) { // Set connected mode stateTransition(self, CONNECTED); // Move the active connection moveConnectionBetween(connection, self.connectingConnections, self.availableConnections); // Emit the connect event return self.emit('connect', self); } // Apply any store credentials reauthenticate(self, connection, function(err) { if (self.state === DESTROYED || self.state === DESTROYING) return self.destroy(); // We have an error emit it if (err) { // Destroy the pool self.destroy(); // Emit the error return self.emit('error', err); } // Authenticate authenticate(self, args, connection, function(err) { if (self.state === DESTROYED || self.state === DESTROYING) return self.destroy(); // We have an error emit it if (err) { // Destroy the pool self.destroy(); // Emit the error return self.emit('error', err); } // Set connected mode stateTransition(self, CONNECTED); // Move the active connection moveConnectionBetween(connection, self.connectingConnections, self.availableConnections); // if we have a minPoolSize, create a connection if (self.minSize) { for (let i = 0; i < self.minSize; i++) _createConnection(self); } // Emit the connect event self.emit('connect', self); }); }); }); // Add error handlers connection.once('error', connectionFailureHandler(this, 'error')); connection.once('close', connectionFailureHandler(this, 'close')); connection.once('timeout', connectionFailureHandler(this, 'timeout')); connection.once('parseError', connectionFailureHandler(this, 'parseError')); try { connection.connect(); } catch (err) { // SSL or something threw on connect process.nextTick(function() { self.emit('error', err); }); } }; /** * Authenticate using a specified mechanism * @method * @param {string} mechanism The Auth mechanism we are invoking * @param {string} db The db we are invoking the mechanism against * @param {...object} param Parameters for the specific mechanism * @param {authResultCallback} callback A callback function */ Pool.prototype.auth = function(mechanism) { var self = this; var args = Array.prototype.slice.call(arguments, 0); var callback = args.pop(); // If we don't have the mechanism fail if (self.authProviders[mechanism] == null && mechanism !== 'default') { throw new MongoError(f('auth provider %s does not exist', mechanism)); } // Signal that we are authenticating a new set of credentials this.authenticating = true; this.authenticatingTimestamp = new Date().getTime(); // Authenticate all live connections function authenticateLiveConnections(self, args, cb) { // Get the current viable connections var connections = self.allConnections(); // Allow nothing else to use the connections while we authenticate them self.availableConnections = []; self.inUseConnections = []; self.connectingConnections = []; var connectionsCount = connections.length; var error = null; // No connections available, return if (connectionsCount === 0) { self.authenticating = false; return callback(null); } // Authenticate the connections for (var i = 0; i < connections.length; i++) { authenticate(self, args, connections[i], function(err, result) { connectionsCount = connectionsCount - 1; // Store the error if (err) error = err; // Processed all connections if (connectionsCount === 0) { // Auth finished self.authenticating = false; // Add the connections back to available connections self.availableConnections = self.availableConnections.concat(connections); // We had an error, return it if (error) { // Log the error if (self.logger.isError()) { self.logger.error( f( '[%s] failed to authenticate against server %s:%s', self.id, self.options.host, self.options.port ) ); } return cb(error, result); } cb(null, result); } }); } } // Wait for a logout in process to happen function waitForLogout(self, cb) { if (!self.loggingout) return cb(); setTimeout(function() { waitForLogout(self, cb); }, 1); } // Wait for loggout to finish waitForLogout(self, function() { // Authenticate all live connections authenticateLiveConnections(self, args, function(err, result) { // Credentials correctly stored in auth provider if successful // Any new connections will now reauthenticate correctly self.authenticating = false; // Return after authentication connections callback(err, result); }); }); }; /** * Logout all users against a database * @method * @param {string} dbName The database name * @param {authResultCallback} callback A callback function */ Pool.prototype.logout = function(dbName, callback) { var self = this; if (typeof dbName !== 'string') { throw new MongoError('logout method requires a db name as first argument'); } if (typeof callback !== 'function') { throw new MongoError('logout method requires a callback'); } // Indicate logout in process this.loggingout = true; // Get all relevant connections var connections = self.availableConnections.concat(self.inUseConnections); var count = connections.length; // Store any error var error = null; // Send logout command over all the connections for (var i = 0; i < connections.length; i++) { write(self)( connections[i], new Query( this.options.bson, f('%s.$cmd', dbName), { logout: 1 }, { numberToSkip: 0, numberToReturn: 1 } ), function(err) { count = count - 1; if (err) error = err; if (count === 0) { self.loggingout = false; callback(error); } } ); } }; /** * Unref the pool * @method */ Pool.prototype.unref = function() { // Get all the known connections var connections = this.availableConnections .concat(this.inUseConnections) .concat(this.connectingConnections); connections.forEach(function(c) { c.unref(); }); }; // Events var events = ['error', 'close', 'timeout', 'parseError', 'connect']; // Destroy the connections function destroy(self, connections) { // Destroy all connections connections.forEach(function(c) { // Remove all listeners for (var i = 0; i < events.length; i++) { c.removeAllListeners(events[i]); } // Destroy connection c.destroy(); }); // Zero out all connections self.inUseConnections = []; self.availableConnections = []; self.nonAuthenticatedConnections = []; self.connectingConnections = []; // Set state to destroyed stateTransition(self, DESTROYED); } /** * Destroy pool * @method */ Pool.prototype.destroy = function(force) { var self = this; // Do not try again if the pool is already dead if (this.state === DESTROYED || self.state === DESTROYING) return; // Set state to destroyed stateTransition(this, DESTROYING); // Are we force closing if (force) { // Get all the known connections var connections = self.availableConnections .concat(self.inUseConnections) .concat(self.nonAuthenticatedConnections) .concat(self.connectingConnections); // Flush any remaining work items with // an error while (self.queue.length > 0) { var workItem = self.queue.shift(); if (typeof workItem.cb === 'function') { workItem.cb(new MongoError('Pool was force destroyed')); } } // Destroy the topology return destroy(self, connections); } // Clear out the reconnect if set if (this.reconnectId) { clearTimeout(this.reconnectId); } // If we have a reconnect connection running, close // immediately if (this.reconnectConnection) { this.reconnectConnection.destroy(); } // Wait for the operations to drain before we close the pool function checkStatus() { flushMonitoringOperations(self.queue); if (self.queue.length === 0) { // Get all the known connections var connections = self.availableConnections .concat(self.inUseConnections) .concat(self.nonAuthenticatedConnections) .concat(self.connectingConnections); // Check if we have any in flight operations for (var i = 0; i < connections.length; i++) { // There is an operation still in flight, reschedule a // check waiting for it to drain if (connections[i].workItems.length > 0) { return setTimeout(checkStatus, 1); } } destroy(self, connections); // } else if (self.queue.length > 0 && !this.reconnectId) { } else { // Ensure we empty the queue _execute(self)(); // Set timeout setTimeout(checkStatus, 1); } } // Initiate drain of operations checkStatus(); }; // Prepare the buffer that Pool.prototype.write() uses to send to the server var serializeCommands = function(self, commands, result, callback) { // Base case when there are no more commands to serialize if (commands.length === 0) return callback(null, result); // Pop off the zeroth command and serialize it var thisCommand = commands.shift(); var originalCommandBuffer = thisCommand.toBin(); // Check whether we and the server have agreed to use a compressor if (self.options.agreedCompressor && !hasUncompressibleCommands(thisCommand)) { // Transform originalCommandBuffer into OP_COMPRESSED var concatenatedOriginalCommandBuffer = Buffer.concat(originalCommandBuffer); var messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE); // Extract information needed for OP_COMPRESSED from the uncompressed message var originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12); // Compress the message body compress(self, messageToBeCompressed, function(err, compressedMessage) { if (err) return callback(err, null); // Create the msgHeader of OP_COMPRESSED var msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE); msgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + 9 + compressedMessage.length, 0); // messageLength msgHeader.writeInt32LE(thisCommand.requestId, 4); // requestID msgHeader.writeInt32LE(0, 8); // responseTo (zero) msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode // Create the compression details of OP_COMPRESSED var compressionDetails = Buffer.alloc(9); compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader compressionDetails.writeUInt8(compressorIDs[self.options.agreedCompressor], 8); // compressorID // Push the concatenation of the OP_COMPRESSED message onto results result.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage])); // Continue recursing through the commands array serializeCommands(self, commands, result, callback); }); } else { // Push the serialization of the command onto results result.push(originalCommandBuffer); // Continue recursing through the commands array serializeCommands(self, commands, result, callback); } }; /** * Write a message to MongoDB * @method * @return {Connection} */ Pool.prototype.write = function(commands, options, cb) { var self = this; // Ensure we have a callback if (typeof options === 'function') { cb = options; } // Always have options options = options || {}; // We need to have a callback function unless the message returns no response if (!(typeof cb === 'function') && !options.noResponse) { throw new MongoError('write method must provide a callback'); } // Pool was destroyed error out if (this.state === DESTROYED || this.state === DESTROYING) { // Callback with an error if (cb) { try { cb(new MongoError('pool destroyed')); } catch (err) { process.nextTick(function() { throw err; }); } } return; } if (this.options.domainsEnabled && process.domain && typeof cb === 'function') { // if we have a domain bind to it var oldCb = cb; cb = process.domain.bind(function() { // v8 - argumentsToArray one-liner var args = new Array(arguments.length); for (var i = 0; i < arguments.length; i++) { args[i] = arguments[i]; } // bounce off event loop so domain switch takes place process.nextTick(function() { oldCb.apply(null, args); }); }); } // Do we have an operation var operation = { cb: cb, raw: false, promoteLongs: true, promoteValues: true, promoteBuffers: false, fullResult: false }; // Set the options for the parsing operation.promoteLongs = typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true; operation.promoteValues = typeof options.promoteValues === 'boolean' ? options.promoteValues : true; operation.promoteBuffers = typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false; operation.raw = typeof options.raw === 'boolean' ? options.raw : false; operation.immediateRelease = typeof options.immediateRelease === 'boolean' ? options.immediateRelease : false; operation.documentsReturnedIn = options.documentsReturnedIn; operation.command = typeof options.command === 'boolean' ? options.command : false; operation.fullResult = typeof options.fullResult === 'boolean' ? options.fullResult : false; operation.noResponse = typeof options.noResponse === 'boolean' ? options.noResponse : false; operation.session = options.session || null; // Optional per operation socketTimeout operation.socketTimeout = options.socketTimeout; operation.monitoring = options.monitoring; // Custom socket Timeout if (options.socketTimeout) { operation.socketTimeout = options.socketTimeout; } // Ensure commands is an array if (!Array.isArray(commands)) { commands = [commands]; } // Get the requestId operation.requestId = commands[commands.length - 1].requestId; if (hasSessionSupport(this.topology)) { let sessionOptions = {}; if (this.topology.clusterTime) { sessionOptions = { $clusterTime: this.topology.clusterTime }; } if (operation.session) { // TODO: reenable when sessions development is complete // if (operation.session.topology !== this.topology) { // return cb( // new MongoError('Sessions may only be used with the client they were created from') // ); // } if (operation.session.hasEnded) { return cb(new MongoError('Use of expired sessions is not permitted')); } if ( operation.session.clusterTime && operation.session.clusterTime.clusterTime.greaterThan( sessionOptions.$clusterTime.clusterTime ) ) { sessionOptions.$clusterTime = operation.session.clusterTime; } sessionOptions.lsid = operation.session.id; // update the `lastUse` of the acquired ServerSession operation.session.serverSession.lastUse = Date.now(); } // decorate the commands with session-specific details commands.forEach(command => { if (command instanceof Query) { if (command.query.$query) { Object.assign(command.query.$query, sessionOptions); } else { Object.assign(command.query, sessionOptions); } } else { Object.assign(command, sessionOptions); } }); } // If command monitoring is enabled we need to modify the callback here if (self.options.monitorCommands) { // NOTE: there is only ever a single command, for some legacy reason I am unaware of we // treat this as a potential array of commands const command = commands[0]; this.emit('commandStarted', new apm.CommandStartedEvent(this, command)); operation.started = process.hrtime(); operation.cb = (err, reply) => { if (err) { self.emit( 'commandFailed', new apm.CommandFailedEvent(this, command, err, operation.started) ); } else { if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) { self.emit( 'commandFailed', new apm.CommandFailedEvent(this, command, reply.result, operation.started) ); } else { self.emit( 'commandSucceeded', new apm.CommandSucceededEvent(this, command, reply, operation.started) ); } } if (typeof cb === 'function') cb(err, reply); }; } // Prepare the operation buffer serializeCommands(self, commands, [], function(err, serializedCommands) { if (err) throw err; // Set the operation's buffer to the serialization of the commands operation.buffer = serializedCommands; // If we have a monitoring operation schedule as the very first operation // Otherwise add to back of queue if (options.monitoring) { self.queue.unshift(operation); } else { self.queue.push(operation); } // Attempt to execute the operation if (!self.executing) { process.nextTick(function() { _execute(self)(); }); } }); }; // Return whether a command contains an uncompressible command term // Will return true if command contains no uncompressible command terms var hasUncompressibleCommands = function(command) { return uncompressibleCommands.some(function(cmd) { return command.query.hasOwnProperty(cmd); }); }; // Remove connection method function remove(connection, connections) { for (var i = 0; i < connections.length; i++) { if (connections[i] === connection) { connections.splice(i, 1); return true; } } } function removeConnection(self, connection) { if (remove(connection, self.availableConnections)) return; if (remove(connection, self.inUseConnections)) return; if (remove(connection, self.connectingConnections)) return; if (remove(connection, self.nonAuthenticatedConnections)) return; } // All event handlers var handlers = ['close', 'message', 'error', 'timeout', 'parseError', 'connect']; function _createConnection(self) { if (self.state === DESTROYED || self.state === DESTROYING) { return; } var connection = new Connection(messageHandler(self), self.options); // Push the connection self.connectingConnections.push(connection); // Handle any errors var tempErrorHandler = function(_connection) { return function() { // Destroy the connection _connection.destroy(); // Remove the connection from the connectingConnections list removeConnection(self, _connection); // Start reconnection attempts if (!self.reconnectId && self.options.reconnect) { self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval); } }; }; // Handle successful connection var tempConnectHandler = function(_connection) { return function() { // Destroyed state return if (self.state === DESTROYED || self.state === DESTROYING) { // Remove the connection from the list removeConnection(self, _connection); return _connection.destroy(); } // Destroy all event emitters handlers.forEach(function(e) { _connection.removeAllListeners(e); }); // Add the final handlers _connection.once('close', connectionFailureHandler(self, 'close')); _connection.once('error', connectionFailureHandler(self, 'error')); _connection.once('timeout', connectionFailureHandler(self, 'timeout')); _connection.once('parseError', connectionFailureHandler(self, 'parseError')); // Signal reauthenticate(self, _connection, function(err) { if (self.state === DESTROYED || self.state === DESTROYING) { return _connection.destroy(); } // Remove the connection from the connectingConnections list removeConnection(self, _connection); // Handle error if (err) { return _connection.destroy(); } // If we are c at the moment // Do not automatially put in available connections // As we need to apply the credentials first if (self.authenticating) { self.nonAuthenticatedConnections.push(_connection); } else { // Push to available self.availableConnections.push(_connection); // Execute any work waiting _execute(self)(); } }); }; }; // Add all handlers connection.once('close', tempErrorHandler(connection)); connection.once('error', tempErrorHandler(connection)); connection.once('timeout', tempErrorHandler(connection)); connection.once('parseError', tempErrorHandler(connection)); connection.once('connect', tempConnectHandler(connection)); // Start connection connection.connect(); } function flushMonitoringOperations(queue) { for (var i = 0; i < queue.length; i++) { if (queue[i].monitoring) { var workItem = queue[i]; queue.splice(i, 1); workItem.cb( new MongoError({ message: 'no connection available for monitoring', driver: true }) ); } } } function _execute(self) { return function() { if (self.state === DESTROYED) return; // Already executing, skip if (self.executing) return; // Set pool as executing self.executing = true; // Wait for auth to clear before continuing function waitForAuth(cb) { if (!self.authenticating) return cb(); // Wait for a milisecond and try again setTimeout(function() { waitForAuth(cb); }, 1); } // Block on any auth in process waitForAuth(function() { // New pool connections are in progress, wait them to finish // before executing any more operation to ensure distribution of // operations if (self.connectingConnections.length > 0) { return; } // As long as we have available connections // eslint-disable-next-line while (true) { // Total availble connections var totalConnections = self.availableConnections.length + self.connectingConnections.length + self.inUseConnections.length; // No available connections available, flush any monitoring ops if (self.availableConnections.length === 0) { // Flush any monitoring operations flushMonitoringOperations(self.queue); break; } // No queue break if (self.queue.length === 0) { break; } // Get a connection var connection = null; // Locate all connections that have no work var connections = []; // Get a list of all connections for (var i = 0; i < self.availableConnections.length; i++) { if (self.availableConnections[i].workItems.length === 0) { connections.push(self.availableConnections[i]); } } // No connection found that has no work on it, just pick one for pipelining if (connections.length === 0) { connection = self.availableConnections[self.connectionIndex++ % self.availableConnections.length]; } else { connection = connections[self.connectionIndex++ % connections.length]; } // Is the connection connected if (connection.isConnected()) { // Get the next work item var workItem = self.queue.shift(); // If we are monitoring we need to use a connection that is not // running another operation to avoid socket timeout changes // affecting an existing operation if (workItem.monitoring) { var foundValidConnection = false; for (i = 0; i < self.availableConnections.length; i++) { // If the connection is connected // And there are no pending workItems on it // Then we can safely use it for monitoring. if ( self.availableConnections[i].isConnected() && self.availableConnections[i].workItems.length === 0 ) { foundValidConnection = true; connection = self.availableConnections[i]; break; } } // No safe connection found, attempt to grow the connections // if possible and break from the loop if (!foundValidConnection) { // Put workItem back on the queue self.queue.unshift(workItem); // Attempt to grow the pool if it's not yet maxsize if (totalConnections < self.options.size && self.queue.length > 0) { // Create a new connection _createConnection(self); } // Re-execute the operation setTimeout(function() { _execute(self)(); }, 10); break; } } // Don't execute operation until we have a full pool if (totalConnections < self.options.size) { // Connection has work items, then put it back on the queue // and create a new connection if (connection.workItems.length > 0) { // Lets put the workItem back on the list self.queue.unshift(workItem); // Create a new connection _createConnection(self); // Break from the loop break; } } // Get actual binary commands var buffer = workItem.buffer; // Set current status of authentication process workItem.authenticating = self.authenticating; workItem.authenticatingTimestamp = self.authenticatingTimestamp; // If we are monitoring take the connection of the availableConnections if (workItem.monitoring) { moveConnectionBetween(connection, self.availableConnections, self.inUseConnections); } // Track the executing commands on the mongo server // as long as there is an expected response if (!workItem.noResponse) { connection.workItems.push(workItem); } // We have a custom socketTimeout if (!workItem.immediateRelease && typeof workItem.socketTimeout === 'number') { connection.setSocketTimeout(workItem.socketTimeout); } // Capture if write was successful var writeSuccessful = true; // Put operation on the wire if (Array.isArray(buffer)) { for (i = 0; i < buffer.length; i++) { writeSuccessful = connection.write(buffer[i]); } } else { writeSuccessful = connection.write(buffer); } // if the command is designated noResponse, call the callback immeditely if (workItem.noResponse && typeof workItem.cb === 'function') { workItem.cb(null, null); } if (writeSuccessful && workItem.immediateRelease && self.authenticating) { removeConnection(self, connection); self.nonAuthenticatedConnections.push(connection); } else if (writeSuccessful === false) { // If write not successful put back on queue self.queue.unshift(workItem); // Remove the disconnected connection removeConnection(self, connection); // Flush any monitoring operations in the queue, failing fast flushMonitoringOperations(self.queue); } } else { // Remove the disconnected connection removeConnection(self, connection); // Flush any monitoring operations in the queue, failing fast flushMonitoringOperations(self.queue); } } }); self.executing = false; }; } // Make execution loop available for testing Pool._execute = _execute; /** * A server connect event, used to verify that the connection is up and running * * @event Pool#connect * @type {Pool} */ /** * A server reconnect event, used to verify that pool reconnected. * * @event Pool#reconnect * @type {Pool} */ /** * The server connection closed, all pool connections closed * * @event Pool#close * @type {Pool} */ /** * The server connection caused an error, all pool connections closed * * @event Pool#error * @type {Pool} */ /** * The server connection timed out, all pool connections closed * * @event Pool#timeout * @type {Pool} */ /** * The driver experienced an invalid message, all pool connections closed * * @event Pool#parseError * @type {Pool} */ /** * The driver attempted to reconnect * * @event Pool#attemptReconnect * @type {Pool} */ /** * The driver exhausted all reconnect attempts * * @event Pool#reconnectFailed * @type {Pool} */ module.exports = Pool;