Ohm-Management - Projektarbeit B-ME
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

pool.js 37KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261
  1. 'use strict';
  2. const inherits = require('util').inherits;
  3. const EventEmitter = require('events').EventEmitter;
  4. const MongoError = require('../error').MongoError;
  5. const MongoNetworkError = require('../error').MongoNetworkError;
  6. const MongoWriteConcernError = require('../error').MongoWriteConcernError;
  7. const Logger = require('./logger');
  8. const f = require('util').format;
  9. const Msg = require('./msg').Msg;
  10. const CommandResult = require('./command_result');
  11. const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;
  12. const COMPRESSION_DETAILS_SIZE = require('../wireprotocol/shared').COMPRESSION_DETAILS_SIZE;
  13. const opcodes = require('../wireprotocol/shared').opcodes;
  14. const compress = require('../wireprotocol/compression').compress;
  15. const compressorIDs = require('../wireprotocol/compression').compressorIDs;
  16. const uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands;
  17. const apm = require('./apm');
  18. const Buffer = require('safe-buffer').Buffer;
  19. const connect = require('./connect');
  20. const updateSessionFromResponse = require('../sessions').updateSessionFromResponse;
  21. var DISCONNECTED = 'disconnected';
  22. var CONNECTING = 'connecting';
  23. var CONNECTED = 'connected';
  24. var DESTROYING = 'destroying';
  25. var DESTROYED = 'destroyed';
  26. var _id = 0;
  27. /**
  28. * Creates a new Pool instance
  29. * @class
  30. * @param {string} options.host The server host
  31. * @param {number} options.port The server port
  32. * @param {number} [options.size=5] Max server connection pool size
  33. * @param {number} [options.minSize=0] Minimum server connection pool size
  34. * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
  35. * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
  36. * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
  37. * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
  38. * @param {number} [options.keepAliveInitialDelay=300000] Initial delay before TCP keep alive enabled
  39. * @param {boolean} [options.noDelay=true] TCP Connection no delay
  40. * @param {number} [options.connectionTimeout=30000] TCP Connection timeout setting
  41. * @param {number} [options.socketTimeout=360000] TCP Socket timeout setting
  42. * @param {number} [options.monitoringSocketTimeout=30000] TCP Socket timeout setting for replicaset monitoring socket
  43. * @param {boolean} [options.ssl=false] Use SSL for connection
  44. * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
  45. * @param {Buffer} [options.ca] SSL Certificate store binary buffer
  46. * @param {Buffer} [options.crl] SSL Certificate revocation store binary buffer
  47. * @param {Buffer} [options.cert] SSL Certificate binary buffer
  48. * @param {Buffer} [options.key] SSL Key file binary buffer
  49. * @param {string} [options.passPhrase] SSL Certificate pass phrase
  50. * @param {boolean} [options.rejectUnauthorized=false] Reject unauthorized server certificates
  51. * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
  52. * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
  53. * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
  54. * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
  55. * @fires Pool#connect
  56. * @fires Pool#close
  57. * @fires Pool#error
  58. * @fires Pool#timeout
  59. * @fires Pool#parseError
  60. * @return {Pool} A cursor instance
  61. */
  62. var Pool = function(topology, options) {
  63. // Add event listener
  64. EventEmitter.call(this);
  65. // Store topology for later use
  66. this.topology = topology;
  67. // Add the options
  68. this.options = Object.assign(
  69. {
  70. // Host and port settings
  71. host: 'localhost',
  72. port: 27017,
  73. // Pool default max size
  74. size: 5,
  75. // Pool default min size
  76. minSize: 0,
  77. // socket settings
  78. connectionTimeout: 30000,
  79. socketTimeout: 360000,
  80. keepAlive: true,
  81. keepAliveInitialDelay: 300000,
  82. noDelay: true,
  83. // SSL Settings
  84. ssl: false,
  85. checkServerIdentity: true,
  86. ca: null,
  87. crl: null,
  88. cert: null,
  89. key: null,
  90. passPhrase: null,
  91. rejectUnauthorized: false,
  92. promoteLongs: true,
  93. promoteValues: true,
  94. promoteBuffers: false,
  95. // Reconnection options
  96. reconnect: true,
  97. reconnectInterval: 1000,
  98. reconnectTries: 30,
  99. // Enable domains
  100. domainsEnabled: false
  101. },
  102. options
  103. );
  104. // Identification information
  105. this.id = _id++;
  106. // Current reconnect retries
  107. this.retriesLeft = this.options.reconnectTries;
  108. this.reconnectId = null;
  109. // No bson parser passed in
  110. if (
  111. !options.bson ||
  112. (options.bson &&
  113. (typeof options.bson.serialize !== 'function' ||
  114. typeof options.bson.deserialize !== 'function'))
  115. ) {
  116. throw new Error('must pass in valid bson parser');
  117. }
  118. // Logger instance
  119. this.logger = Logger('Pool', options);
  120. // Pool state
  121. this.state = DISCONNECTED;
  122. // Connections
  123. this.availableConnections = [];
  124. this.inUseConnections = [];
  125. this.connectingConnections = 0;
  126. // Currently executing
  127. this.executing = false;
  128. // Operation work queue
  129. this.queue = [];
  130. // Contains the reconnect connection
  131. this.reconnectConnection = null;
  132. // Number of consecutive timeouts caught
  133. this.numberOfConsecutiveTimeouts = 0;
  134. // Current pool Index
  135. this.connectionIndex = 0;
  136. // event handlers
  137. const pool = this;
  138. this._messageHandler = messageHandler(this);
  139. this._connectionCloseHandler = function(err) {
  140. const connection = this;
  141. connectionFailureHandler(pool, 'close', err, connection);
  142. };
  143. this._connectionErrorHandler = function(err) {
  144. const connection = this;
  145. connectionFailureHandler(pool, 'error', err, connection);
  146. };
  147. this._connectionTimeoutHandler = function(err) {
  148. const connection = this;
  149. connectionFailureHandler(pool, 'timeout', err, connection);
  150. };
  151. this._connectionParseErrorHandler = function(err) {
  152. const connection = this;
  153. connectionFailureHandler(pool, 'parseError', err, connection);
  154. };
  155. };
  156. inherits(Pool, EventEmitter);
  157. Object.defineProperty(Pool.prototype, 'size', {
  158. enumerable: true,
  159. get: function() {
  160. return this.options.size;
  161. }
  162. });
  163. Object.defineProperty(Pool.prototype, 'minSize', {
  164. enumerable: true,
  165. get: function() {
  166. return this.options.minSize;
  167. }
  168. });
  169. Object.defineProperty(Pool.prototype, 'connectionTimeout', {
  170. enumerable: true,
  171. get: function() {
  172. return this.options.connectionTimeout;
  173. }
  174. });
  175. Object.defineProperty(Pool.prototype, 'socketTimeout', {
  176. enumerable: true,
  177. get: function() {
  178. return this.options.socketTimeout;
  179. }
  180. });
  181. function stateTransition(self, newState) {
  182. var legalTransitions = {
  183. disconnected: [CONNECTING, DESTROYING, DISCONNECTED],
  184. connecting: [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED],
  185. connected: [CONNECTED, DISCONNECTED, DESTROYING],
  186. destroying: [DESTROYING, DESTROYED],
  187. destroyed: [DESTROYED]
  188. };
  189. // Get current state
  190. var legalStates = legalTransitions[self.state];
  191. if (legalStates && legalStates.indexOf(newState) !== -1) {
  192. self.emit('stateChanged', self.state, newState);
  193. self.state = newState;
  194. } else {
  195. self.logger.error(
  196. f(
  197. 'Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]',
  198. self.id,
  199. self.state,
  200. newState,
  201. legalStates
  202. )
  203. );
  204. }
  205. }
  206. function connectionFailureHandler(pool, event, err, conn) {
  207. if (conn) {
  208. if (conn._connectionFailHandled) return;
  209. conn._connectionFailHandled = true;
  210. conn.destroy();
  211. // Remove the connection
  212. removeConnection(pool, conn);
  213. // Flush all work Items on this connection
  214. while (conn.workItems.length > 0) {
  215. const workItem = conn.workItems.shift();
  216. if (workItem.cb) workItem.cb(err);
  217. }
  218. }
  219. // Did we catch a timeout, increment the numberOfConsecutiveTimeouts
  220. if (event === 'timeout') {
  221. pool.numberOfConsecutiveTimeouts = pool.numberOfConsecutiveTimeouts + 1;
  222. // Have we timed out more than reconnectTries in a row ?
  223. // Force close the pool as we are trying to connect to tcp sink hole
  224. if (pool.numberOfConsecutiveTimeouts > pool.options.reconnectTries) {
  225. pool.numberOfConsecutiveTimeouts = 0;
  226. // Destroy all connections and pool
  227. pool.destroy(true);
  228. // Emit close event
  229. return pool.emit('close', pool);
  230. }
  231. }
  232. // No more socket available propegate the event
  233. if (pool.socketCount() === 0) {
  234. if (pool.state !== DESTROYED && pool.state !== DESTROYING) {
  235. stateTransition(pool, DISCONNECTED);
  236. }
  237. // Do not emit error events, they are always close events
  238. // do not trigger the low level error handler in node
  239. event = event === 'error' ? 'close' : event;
  240. pool.emit(event, err);
  241. }
  242. // Start reconnection attempts
  243. if (!pool.reconnectId && pool.options.reconnect) {
  244. pool.reconnectId = setTimeout(attemptReconnect(pool), pool.options.reconnectInterval);
  245. }
  246. // Do we need to do anything to maintain the minimum pool size
  247. const totalConnections = totalConnectionCount(pool);
  248. if (totalConnections < pool.minSize) {
  249. _createConnection(pool);
  250. }
  251. }
  252. function attemptReconnect(self) {
  253. return function() {
  254. self.emit('attemptReconnect', self);
  255. if (self.state === DESTROYED || self.state === DESTROYING) return;
  256. // We are connected do not try again
  257. if (self.isConnected()) {
  258. self.reconnectId = null;
  259. return;
  260. }
  261. self.connectingConnections++;
  262. connect(self.options, (err, connection) => {
  263. self.connectingConnections--;
  264. if (err) {
  265. if (self.logger.isDebug()) {
  266. self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
  267. }
  268. self.retriesLeft = self.retriesLeft - 1;
  269. if (self.retriesLeft <= 0) {
  270. self.destroy();
  271. self.emit(
  272. 'reconnectFailed',
  273. new MongoNetworkError(
  274. f(
  275. 'failed to reconnect after %s attempts with interval %s ms',
  276. self.options.reconnectTries,
  277. self.options.reconnectInterval
  278. )
  279. )
  280. );
  281. } else {
  282. self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
  283. }
  284. return;
  285. }
  286. if (self.state === DESTROYED || self.state === DESTROYING) {
  287. return connection.destroy();
  288. }
  289. self.reconnectId = null;
  290. handlers.forEach(event => connection.removeAllListeners(event));
  291. connection.on('error', self._connectionErrorHandler);
  292. connection.on('close', self._connectionCloseHandler);
  293. connection.on('timeout', self._connectionTimeoutHandler);
  294. connection.on('parseError', self._connectionParseErrorHandler);
  295. connection.on('message', self._messageHandler);
  296. self.retriesLeft = self.options.reconnectTries;
  297. self.availableConnections.push(connection);
  298. self.reconnectConnection = null;
  299. self.emit('reconnect', self);
  300. _execute(self)();
  301. });
  302. };
  303. }
  304. function moveConnectionBetween(connection, from, to) {
  305. var index = from.indexOf(connection);
  306. // Move the connection from connecting to available
  307. if (index !== -1) {
  308. from.splice(index, 1);
  309. to.push(connection);
  310. }
  311. }
  312. function messageHandler(self) {
  313. return function(message, connection) {
  314. // workItem to execute
  315. var workItem = null;
  316. // Locate the workItem
  317. for (var i = 0; i < connection.workItems.length; i++) {
  318. if (connection.workItems[i].requestId === message.responseTo) {
  319. // Get the callback
  320. workItem = connection.workItems[i];
  321. // Remove from list of workItems
  322. connection.workItems.splice(i, 1);
  323. }
  324. }
  325. if (workItem && workItem.monitoring) {
  326. moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
  327. }
  328. // Reset timeout counter
  329. self.numberOfConsecutiveTimeouts = 0;
  330. // Reset the connection timeout if we modified it for
  331. // this operation
  332. if (workItem && workItem.socketTimeout) {
  333. connection.resetSocketTimeout();
  334. }
  335. // Log if debug enabled
  336. if (self.logger.isDebug()) {
  337. self.logger.debug(
  338. f(
  339. 'message [%s] received from %s:%s',
  340. message.raw.toString('hex'),
  341. self.options.host,
  342. self.options.port
  343. )
  344. );
  345. }
  346. function handleOperationCallback(self, cb, err, result) {
  347. // No domain enabled
  348. if (!self.options.domainsEnabled) {
  349. return process.nextTick(function() {
  350. return cb(err, result);
  351. });
  352. }
  353. // Domain enabled just call the callback
  354. cb(err, result);
  355. }
  356. // Keep executing, ensure current message handler does not stop execution
  357. if (!self.executing) {
  358. process.nextTick(function() {
  359. _execute(self)();
  360. });
  361. }
  362. // Time to dispatch the message if we have a callback
  363. if (workItem && !workItem.immediateRelease) {
  364. try {
  365. // Parse the message according to the provided options
  366. message.parse(workItem);
  367. } catch (err) {
  368. return handleOperationCallback(self, workItem.cb, new MongoError(err));
  369. }
  370. if (message.documents[0]) {
  371. const document = message.documents[0];
  372. const session = workItem.session;
  373. if (session) {
  374. updateSessionFromResponse(session, document);
  375. }
  376. if (document.$clusterTime) {
  377. self.topology.clusterTime = document.$clusterTime;
  378. }
  379. }
  380. // Establish if we have an error
  381. if (workItem.command && message.documents[0]) {
  382. const responseDoc = message.documents[0];
  383. if (responseDoc.writeConcernError) {
  384. const err = new MongoWriteConcernError(responseDoc.writeConcernError, responseDoc);
  385. return handleOperationCallback(self, workItem.cb, err);
  386. }
  387. if (responseDoc.ok === 0 || responseDoc.$err || responseDoc.errmsg || responseDoc.code) {
  388. return handleOperationCallback(self, workItem.cb, new MongoError(responseDoc));
  389. }
  390. }
  391. // Add the connection details
  392. message.hashedName = connection.hashedName;
  393. // Return the documents
  394. handleOperationCallback(
  395. self,
  396. workItem.cb,
  397. null,
  398. new CommandResult(workItem.fullResult ? message : message.documents[0], connection, message)
  399. );
  400. }
  401. };
  402. }
  403. /**
  404. * Return the total socket count in the pool.
  405. * @method
  406. * @return {Number} The number of socket available.
  407. */
  408. Pool.prototype.socketCount = function() {
  409. return this.availableConnections.length + this.inUseConnections.length;
  410. // + this.connectingConnections.length;
  411. };
  412. function totalConnectionCount(pool) {
  413. return (
  414. pool.availableConnections.length + pool.inUseConnections.length + pool.connectingConnections
  415. );
  416. }
  417. /**
  418. * Return all pool connections
  419. * @method
  420. * @return {Connection[]} The pool connections
  421. */
  422. Pool.prototype.allConnections = function() {
  423. return this.availableConnections.concat(this.inUseConnections);
  424. };
  425. /**
  426. * Get a pool connection (round-robin)
  427. * @method
  428. * @return {Connection}
  429. */
  430. Pool.prototype.get = function() {
  431. return this.allConnections()[0];
  432. };
  433. /**
  434. * Is the pool connected
  435. * @method
  436. * @return {boolean}
  437. */
  438. Pool.prototype.isConnected = function() {
  439. // We are in a destroyed state
  440. if (this.state === DESTROYED || this.state === DESTROYING) {
  441. return false;
  442. }
  443. // Get connections
  444. var connections = this.availableConnections.concat(this.inUseConnections);
  445. // Check if we have any connected connections
  446. for (var i = 0; i < connections.length; i++) {
  447. if (connections[i].isConnected()) return true;
  448. }
  449. // Not connected
  450. return false;
  451. };
  452. /**
  453. * Was the pool destroyed
  454. * @method
  455. * @return {boolean}
  456. */
  457. Pool.prototype.isDestroyed = function() {
  458. return this.state === DESTROYED || this.state === DESTROYING;
  459. };
  460. /**
  461. * Is the pool in a disconnected state
  462. * @method
  463. * @return {boolean}
  464. */
  465. Pool.prototype.isDisconnected = function() {
  466. return this.state === DISCONNECTED;
  467. };
  468. /**
  469. * Connect pool
  470. */
  471. Pool.prototype.connect = function() {
  472. if (this.state !== DISCONNECTED) {
  473. throw new MongoError('connection in unlawful state ' + this.state);
  474. }
  475. const self = this;
  476. stateTransition(this, CONNECTING);
  477. self.connectingConnections++;
  478. connect(self.options, (err, connection) => {
  479. self.connectingConnections--;
  480. if (err) {
  481. if (self.logger.isDebug()) {
  482. self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
  483. }
  484. if (self.state === CONNECTING) {
  485. self.emit('error', err);
  486. }
  487. return;
  488. }
  489. if (self.state === DESTROYED || self.state === DESTROYING) {
  490. connection.destroy();
  491. return self.destroy();
  492. }
  493. // attach event handlers
  494. connection.on('error', self._connectionErrorHandler);
  495. connection.on('close', self._connectionCloseHandler);
  496. connection.on('timeout', self._connectionTimeoutHandler);
  497. connection.on('parseError', self._connectionParseErrorHandler);
  498. connection.on('message', self._messageHandler);
  499. // If we are in a topology, delegate the auth to it
  500. // This is to avoid issues where we would auth against an
  501. // arbiter
  502. if (self.options.inTopology) {
  503. stateTransition(self, CONNECTED);
  504. self.availableConnections.push(connection);
  505. return self.emit('connect', self, connection);
  506. }
  507. if (self.state === DESTROYED || self.state === DESTROYING) {
  508. return self.destroy();
  509. }
  510. if (err) {
  511. self.destroy();
  512. return self.emit('error', err);
  513. }
  514. stateTransition(self, CONNECTED);
  515. self.availableConnections.push(connection);
  516. if (self.minSize) {
  517. for (let i = 0; i < self.minSize; i++) {
  518. _createConnection(self);
  519. }
  520. }
  521. self.emit('connect', self, connection);
  522. });
  523. };
  524. /**
  525. * Authenticate using a specified mechanism
  526. * @param {authResultCallback} callback A callback function
  527. */
  528. Pool.prototype.auth = function(credentials, callback) {
  529. if (typeof callback === 'function') callback(null, null);
  530. };
  531. /**
  532. * Logout all users against a database
  533. * @param {authResultCallback} callback A callback function
  534. */
  535. Pool.prototype.logout = function(dbName, callback) {
  536. if (typeof callback === 'function') callback(null, null);
  537. };
  538. /**
  539. * Unref the pool
  540. * @method
  541. */
  542. Pool.prototype.unref = function() {
  543. // Get all the known connections
  544. var connections = this.availableConnections.concat(this.inUseConnections);
  545. connections.forEach(function(c) {
  546. c.unref();
  547. });
  548. };
  549. // Events
  550. var events = ['error', 'close', 'timeout', 'parseError', 'connect', 'message'];
  551. // Destroy the connections
  552. function destroy(self, connections, options, callback) {
  553. let connectionCount = connections.length;
  554. function connectionDestroyed() {
  555. connectionCount--;
  556. if (connectionCount > 0) {
  557. return;
  558. }
  559. // Zero out all connections
  560. self.inUseConnections = [];
  561. self.availableConnections = [];
  562. self.connectingConnections = 0;
  563. // Set state to destroyed
  564. stateTransition(self, DESTROYED);
  565. if (typeof callback === 'function') {
  566. callback(null, null);
  567. }
  568. }
  569. if (connectionCount === 0) {
  570. connectionDestroyed();
  571. return;
  572. }
  573. // Destroy all connections
  574. connections.forEach(conn => {
  575. for (var i = 0; i < events.length; i++) {
  576. conn.removeAllListeners(events[i]);
  577. }
  578. conn.destroy(options, connectionDestroyed);
  579. });
  580. }
  581. /**
  582. * Destroy pool
  583. * @method
  584. */
  585. Pool.prototype.destroy = function(force, callback) {
  586. var self = this;
  587. // Do not try again if the pool is already dead
  588. if (this.state === DESTROYED || self.state === DESTROYING) {
  589. if (typeof callback === 'function') callback(null, null);
  590. return;
  591. }
  592. // Set state to destroyed
  593. stateTransition(this, DESTROYING);
  594. // Are we force closing
  595. if (force) {
  596. // Get all the known connections
  597. var connections = self.availableConnections.concat(self.inUseConnections);
  598. // Flush any remaining work items with
  599. // an error
  600. while (self.queue.length > 0) {
  601. var workItem = self.queue.shift();
  602. if (typeof workItem.cb === 'function') {
  603. workItem.cb(new MongoError('Pool was force destroyed'));
  604. }
  605. }
  606. // Destroy the topology
  607. return destroy(self, connections, { force: true }, callback);
  608. }
  609. // Clear out the reconnect if set
  610. if (this.reconnectId) {
  611. clearTimeout(this.reconnectId);
  612. }
  613. // If we have a reconnect connection running, close
  614. // immediately
  615. if (this.reconnectConnection) {
  616. this.reconnectConnection.destroy();
  617. }
  618. // Wait for the operations to drain before we close the pool
  619. function checkStatus() {
  620. flushMonitoringOperations(self.queue);
  621. if (self.queue.length === 0) {
  622. // Get all the known connections
  623. var connections = self.availableConnections.concat(self.inUseConnections);
  624. // Check if we have any in flight operations
  625. for (var i = 0; i < connections.length; i++) {
  626. // There is an operation still in flight, reschedule a
  627. // check waiting for it to drain
  628. if (connections[i].workItems.length > 0) {
  629. return setTimeout(checkStatus, 1);
  630. }
  631. }
  632. destroy(self, connections, { force: false }, callback);
  633. // } else if (self.queue.length > 0 && !this.reconnectId) {
  634. } else {
  635. // Ensure we empty the queue
  636. _execute(self)();
  637. // Set timeout
  638. setTimeout(checkStatus, 1);
  639. }
  640. }
  641. // Initiate drain of operations
  642. checkStatus();
  643. };
  644. /**
  645. * Reset all connections of this pool
  646. *
  647. * @param {function} [callback]
  648. */
  649. Pool.prototype.reset = function(callback) {
  650. // this.destroy(true, err => {
  651. // if (err && typeof callback === 'function') {
  652. // callback(err, null);
  653. // return;
  654. // }
  655. // stateTransition(this, DISCONNECTED);
  656. // this.connect();
  657. // if (typeof callback === 'function') callback(null, null);
  658. // });
  659. if (typeof callback === 'function') callback();
  660. };
  661. // Prepare the buffer that Pool.prototype.write() uses to send to the server
  662. function serializeCommand(self, command, callback) {
  663. const originalCommandBuffer = command.toBin();
  664. // Check whether we and the server have agreed to use a compressor
  665. const shouldCompress = !!self.options.agreedCompressor;
  666. if (!shouldCompress || !canCompress(command)) {
  667. return callback(null, originalCommandBuffer);
  668. }
  669. // Transform originalCommandBuffer into OP_COMPRESSED
  670. const concatenatedOriginalCommandBuffer = Buffer.concat(originalCommandBuffer);
  671. const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
  672. // Extract information needed for OP_COMPRESSED from the uncompressed message
  673. const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
  674. // Compress the message body
  675. compress(self, messageToBeCompressed, function(err, compressedMessage) {
  676. if (err) return callback(err, null);
  677. // Create the msgHeader of OP_COMPRESSED
  678. const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
  679. msgHeader.writeInt32LE(
  680. MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
  681. 0
  682. ); // messageLength
  683. msgHeader.writeInt32LE(command.requestId, 4); // requestID
  684. msgHeader.writeInt32LE(0, 8); // responseTo (zero)
  685. msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode
  686. // Create the compression details of OP_COMPRESSED
  687. const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
  688. compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
  689. compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
  690. compressionDetails.writeUInt8(compressorIDs[self.options.agreedCompressor], 8); // compressorID
  691. return callback(null, [msgHeader, compressionDetails, compressedMessage]);
  692. });
  693. }
  694. /**
  695. * Write a message to MongoDB
  696. * @method
  697. * @return {Connection}
  698. */
  699. Pool.prototype.write = function(command, options, cb) {
  700. var self = this;
  701. // Ensure we have a callback
  702. if (typeof options === 'function') {
  703. cb = options;
  704. }
  705. // Always have options
  706. options = options || {};
  707. // We need to have a callback function unless the message returns no response
  708. if (!(typeof cb === 'function') && !options.noResponse) {
  709. throw new MongoError('write method must provide a callback');
  710. }
  711. // Pool was destroyed error out
  712. if (this.state === DESTROYED || this.state === DESTROYING) {
  713. // Callback with an error
  714. if (cb) {
  715. try {
  716. cb(new MongoError('pool destroyed'));
  717. } catch (err) {
  718. process.nextTick(function() {
  719. throw err;
  720. });
  721. }
  722. }
  723. return;
  724. }
  725. if (this.options.domainsEnabled && process.domain && typeof cb === 'function') {
  726. // if we have a domain bind to it
  727. var oldCb = cb;
  728. cb = process.domain.bind(function() {
  729. // v8 - argumentsToArray one-liner
  730. var args = new Array(arguments.length);
  731. for (var i = 0; i < arguments.length; i++) {
  732. args[i] = arguments[i];
  733. }
  734. // bounce off event loop so domain switch takes place
  735. process.nextTick(function() {
  736. oldCb.apply(null, args);
  737. });
  738. });
  739. }
  740. // Do we have an operation
  741. var operation = {
  742. cb: cb,
  743. raw: false,
  744. promoteLongs: true,
  745. promoteValues: true,
  746. promoteBuffers: false,
  747. fullResult: false
  748. };
  749. // Set the options for the parsing
  750. operation.promoteLongs = typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true;
  751. operation.promoteValues =
  752. typeof options.promoteValues === 'boolean' ? options.promoteValues : true;
  753. operation.promoteBuffers =
  754. typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false;
  755. operation.raw = typeof options.raw === 'boolean' ? options.raw : false;
  756. operation.immediateRelease =
  757. typeof options.immediateRelease === 'boolean' ? options.immediateRelease : false;
  758. operation.documentsReturnedIn = options.documentsReturnedIn;
  759. operation.command = typeof options.command === 'boolean' ? options.command : false;
  760. operation.fullResult = typeof options.fullResult === 'boolean' ? options.fullResult : false;
  761. operation.noResponse = typeof options.noResponse === 'boolean' ? options.noResponse : false;
  762. operation.session = options.session || null;
  763. // Optional per operation socketTimeout
  764. operation.socketTimeout = options.socketTimeout;
  765. operation.monitoring = options.monitoring;
  766. // Custom socket Timeout
  767. if (options.socketTimeout) {
  768. operation.socketTimeout = options.socketTimeout;
  769. }
  770. // Get the requestId
  771. operation.requestId = command.requestId;
  772. // If command monitoring is enabled we need to modify the callback here
  773. if (self.options.monitorCommands) {
  774. this.emit('commandStarted', new apm.CommandStartedEvent(this, command));
  775. operation.started = process.hrtime();
  776. operation.cb = (err, reply) => {
  777. if (err) {
  778. self.emit(
  779. 'commandFailed',
  780. new apm.CommandFailedEvent(this, command, err, operation.started)
  781. );
  782. } else {
  783. if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) {
  784. self.emit(
  785. 'commandFailed',
  786. new apm.CommandFailedEvent(this, command, reply.result, operation.started)
  787. );
  788. } else {
  789. self.emit(
  790. 'commandSucceeded',
  791. new apm.CommandSucceededEvent(this, command, reply, operation.started)
  792. );
  793. }
  794. }
  795. if (typeof cb === 'function') cb(err, reply);
  796. };
  797. }
  798. // Prepare the operation buffer
  799. serializeCommand(self, command, (err, serializedBuffers) => {
  800. if (err) throw err;
  801. // Set the operation's buffer to the serialization of the commands
  802. operation.buffer = serializedBuffers;
  803. // If we have a monitoring operation schedule as the very first operation
  804. // Otherwise add to back of queue
  805. if (options.monitoring) {
  806. self.queue.unshift(operation);
  807. } else {
  808. self.queue.push(operation);
  809. }
  810. // Attempt to execute the operation
  811. if (!self.executing) {
  812. process.nextTick(function() {
  813. _execute(self)();
  814. });
  815. }
  816. });
  817. };
  818. // Return whether a command contains an uncompressible command term
  819. // Will return true if command contains no uncompressible command terms
  820. function canCompress(command) {
  821. const commandDoc = command instanceof Msg ? command.command : command.query;
  822. const commandName = Object.keys(commandDoc)[0];
  823. return uncompressibleCommands.indexOf(commandName) === -1;
  824. }
  825. // Remove connection method
  826. function remove(connection, connections) {
  827. for (var i = 0; i < connections.length; i++) {
  828. if (connections[i] === connection) {
  829. connections.splice(i, 1);
  830. return true;
  831. }
  832. }
  833. }
  834. function removeConnection(self, connection) {
  835. if (remove(connection, self.availableConnections)) return;
  836. if (remove(connection, self.inUseConnections)) return;
  837. }
  838. const handlers = ['close', 'message', 'error', 'timeout', 'parseError', 'connect'];
  839. function _createConnection(self) {
  840. if (self.state === DESTROYED || self.state === DESTROYING) {
  841. return;
  842. }
  843. self.connectingConnections++;
  844. connect(self.options, (err, connection) => {
  845. self.connectingConnections--;
  846. if (err) {
  847. if (self.logger.isDebug()) {
  848. self.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
  849. }
  850. if (!self.reconnectId && self.options.reconnect) {
  851. self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
  852. }
  853. return;
  854. }
  855. if (self.state === DESTROYED || self.state === DESTROYING) {
  856. removeConnection(self, connection);
  857. return connection.destroy();
  858. }
  859. connection.on('error', self._connectionErrorHandler);
  860. connection.on('close', self._connectionCloseHandler);
  861. connection.on('timeout', self._connectionTimeoutHandler);
  862. connection.on('parseError', self._connectionParseErrorHandler);
  863. connection.on('message', self._messageHandler);
  864. if (self.state === DESTROYED || self.state === DESTROYING) {
  865. return connection.destroy();
  866. }
  867. // Remove the connection from the connectingConnections list
  868. removeConnection(self, connection);
  869. // Handle error
  870. if (err) {
  871. return connection.destroy();
  872. }
  873. // Push to available
  874. self.availableConnections.push(connection);
  875. // Execute any work waiting
  876. _execute(self)();
  877. });
  878. }
  879. function flushMonitoringOperations(queue) {
  880. for (var i = 0; i < queue.length; i++) {
  881. if (queue[i].monitoring) {
  882. var workItem = queue[i];
  883. queue.splice(i, 1);
  884. workItem.cb(
  885. new MongoError({ message: 'no connection available for monitoring', driver: true })
  886. );
  887. }
  888. }
  889. }
  890. function _execute(self) {
  891. return function() {
  892. if (self.state === DESTROYED) return;
  893. // Already executing, skip
  894. if (self.executing) return;
  895. // Set pool as executing
  896. self.executing = true;
  897. // New pool connections are in progress, wait them to finish
  898. // before executing any more operation to ensure distribution of
  899. // operations
  900. if (self.connectingConnections > 0) {
  901. self.executing = false;
  902. return;
  903. }
  904. // As long as we have available connections
  905. // eslint-disable-next-line
  906. while (true) {
  907. // Total availble connections
  908. const totalConnections = totalConnectionCount(self);
  909. // No available connections available, flush any monitoring ops
  910. if (self.availableConnections.length === 0) {
  911. // Flush any monitoring operations
  912. flushMonitoringOperations(self.queue);
  913. break;
  914. }
  915. // No queue break
  916. if (self.queue.length === 0) {
  917. break;
  918. }
  919. var connection = null;
  920. const connections = self.availableConnections.filter(conn => conn.workItems.length === 0);
  921. // No connection found that has no work on it, just pick one for pipelining
  922. if (connections.length === 0) {
  923. connection =
  924. self.availableConnections[self.connectionIndex++ % self.availableConnections.length];
  925. } else {
  926. connection = connections[self.connectionIndex++ % connections.length];
  927. }
  928. // Is the connection connected
  929. if (!connection.isConnected()) {
  930. // Remove the disconnected connection
  931. removeConnection(self, connection);
  932. // Flush any monitoring operations in the queue, failing fast
  933. flushMonitoringOperations(self.queue);
  934. break;
  935. }
  936. // Get the next work item
  937. var workItem = self.queue.shift();
  938. // If we are monitoring we need to use a connection that is not
  939. // running another operation to avoid socket timeout changes
  940. // affecting an existing operation
  941. if (workItem.monitoring) {
  942. var foundValidConnection = false;
  943. for (let i = 0; i < self.availableConnections.length; i++) {
  944. // If the connection is connected
  945. // And there are no pending workItems on it
  946. // Then we can safely use it for monitoring.
  947. if (
  948. self.availableConnections[i].isConnected() &&
  949. self.availableConnections[i].workItems.length === 0
  950. ) {
  951. foundValidConnection = true;
  952. connection = self.availableConnections[i];
  953. break;
  954. }
  955. }
  956. // No safe connection found, attempt to grow the connections
  957. // if possible and break from the loop
  958. if (!foundValidConnection) {
  959. // Put workItem back on the queue
  960. self.queue.unshift(workItem);
  961. // Attempt to grow the pool if it's not yet maxsize
  962. if (totalConnections < self.options.size && self.queue.length > 0) {
  963. // Create a new connection
  964. _createConnection(self);
  965. }
  966. // Re-execute the operation
  967. setTimeout(function() {
  968. _execute(self)();
  969. }, 10);
  970. break;
  971. }
  972. }
  973. // Don't execute operation until we have a full pool
  974. if (totalConnections < self.options.size) {
  975. // Connection has work items, then put it back on the queue
  976. // and create a new connection
  977. if (connection.workItems.length > 0) {
  978. // Lets put the workItem back on the list
  979. self.queue.unshift(workItem);
  980. // Create a new connection
  981. _createConnection(self);
  982. // Break from the loop
  983. break;
  984. }
  985. }
  986. // Get actual binary commands
  987. var buffer = workItem.buffer;
  988. // If we are monitoring take the connection of the availableConnections
  989. if (workItem.monitoring) {
  990. moveConnectionBetween(connection, self.availableConnections, self.inUseConnections);
  991. }
  992. // Track the executing commands on the mongo server
  993. // as long as there is an expected response
  994. if (!workItem.noResponse) {
  995. connection.workItems.push(workItem);
  996. }
  997. // We have a custom socketTimeout
  998. if (!workItem.immediateRelease && typeof workItem.socketTimeout === 'number') {
  999. connection.setSocketTimeout(workItem.socketTimeout);
  1000. }
  1001. // Capture if write was successful
  1002. var writeSuccessful = true;
  1003. // Put operation on the wire
  1004. if (Array.isArray(buffer)) {
  1005. for (let i = 0; i < buffer.length; i++) {
  1006. writeSuccessful = connection.write(buffer[i]);
  1007. }
  1008. } else {
  1009. writeSuccessful = connection.write(buffer);
  1010. }
  1011. // if the command is designated noResponse, call the callback immeditely
  1012. if (workItem.noResponse && typeof workItem.cb === 'function') {
  1013. workItem.cb(null, null);
  1014. }
  1015. if (writeSuccessful === false) {
  1016. // If write not successful put back on queue
  1017. self.queue.unshift(workItem);
  1018. // Remove the disconnected connection
  1019. removeConnection(self, connection);
  1020. // Flush any monitoring operations in the queue, failing fast
  1021. flushMonitoringOperations(self.queue);
  1022. break;
  1023. }
  1024. }
  1025. self.executing = false;
  1026. };
  1027. }
  1028. // Make execution loop available for testing
  1029. Pool._execute = _execute;
  1030. /**
  1031. * A server connect event, used to verify that the connection is up and running
  1032. *
  1033. * @event Pool#connect
  1034. * @type {Pool}
  1035. */
  1036. /**
  1037. * A server reconnect event, used to verify that pool reconnected.
  1038. *
  1039. * @event Pool#reconnect
  1040. * @type {Pool}
  1041. */
  1042. /**
  1043. * The server connection closed, all pool connections closed
  1044. *
  1045. * @event Pool#close
  1046. * @type {Pool}
  1047. */
  1048. /**
  1049. * The server connection caused an error, all pool connections closed
  1050. *
  1051. * @event Pool#error
  1052. * @type {Pool}
  1053. */
  1054. /**
  1055. * The server connection timed out, all pool connections closed
  1056. *
  1057. * @event Pool#timeout
  1058. * @type {Pool}
  1059. */
  1060. /**
  1061. * The driver experienced an invalid message, all pool connections closed
  1062. *
  1063. * @event Pool#parseError
  1064. * @type {Pool}
  1065. */
  1066. /**
  1067. * The driver attempted to reconnect
  1068. *
  1069. * @event Pool#attemptReconnect
  1070. * @type {Pool}
  1071. */
  1072. /**
  1073. * The driver exhausted all reconnect attempts
  1074. *
  1075. * @event Pool#reconnectFailed
  1076. * @type {Pool}
  1077. */
  1078. module.exports = Pool;