Ohm-Management - Projektarbeit B-ME
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

topology.js 33KB


  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const ServerDescription = require('./server_description').ServerDescription;
  4. const ServerType = require('./server_description').ServerType;
  5. const TopologyDescription = require('./topology_description').TopologyDescription;
  6. const TopologyType = require('./topology_description').TopologyType;
  7. const monitoring = require('./monitoring');
  8. const calculateDurationInMs = require('../utils').calculateDurationInMs;
  9. const MongoTimeoutError = require('../error').MongoTimeoutError;
  10. const Server = require('./server');
  11. const relayEvents = require('../utils').relayEvents;
  12. const ReadPreference = require('../topologies/read_preference');
  13. const readPreferenceServerSelector = require('./server_selectors').readPreferenceServerSelector;
  14. const writableServerSelector = require('./server_selectors').writableServerSelector;
  15. const isRetryableWritesSupported = require('../topologies/shared').isRetryableWritesSupported;
  16. const Cursor = require('../cursor');
  17. const deprecate = require('util').deprecate;
  18. const BSON = require('../connection/utils').retrieveBSON();
  19. const createCompressionInfo = require('../topologies/shared').createCompressionInfo;
  20. const isRetryableError = require('../error').isRetryableError;
  21. const MongoParseError = require('../error').MongoParseError;
  22. const ClientSession = require('../sessions').ClientSession;
  23. const createClientInfo = require('../topologies/shared').createClientInfo;
  24. const MongoError = require('../error').MongoError;
  25. const resolveClusterTime = require('../topologies/shared').resolveClusterTime;
  // Global state
  // Counter handed out as the `id` of each constructed `Topology`.
  let globalTopologyCounter = 0;

  // Constants
  // Option defaults merged underneath the user supplied options.
  // NOTE(review): the Server Selection / SDAM specifications appear to recommend
  // 30000 for `serverSelectionTimeoutMS` and 10000 for `heartbeatFrequencyMS`;
  // the two values below look swapped -- confirm before changing.
  const TOPOLOGY_DEFAULTS = {
    localThresholdMS: 15,
    serverSelectionTimeoutMS: 10000,
    heartbeatFrequencyMS: 30000,
    minHeartbeatFrequencyMS: 500
  };

  // events emitted by a `Server` that are re-emitted unchanged on the `Topology`
  const SERVER_RELAY_EVENTS = [
    'serverHeartbeatStarted',
    'serverHeartbeatSucceeded',
    'serverHeartbeatFailed',
    'commandStarted',
    'commandSucceeded',
    'commandFailed',

    // NOTE: Legacy events
    'monitoring'
  ];

  // every event the topology may have listeners for on a `Server` instance:
  // the relayed events followed by the ones handled locally
  const LOCAL_SERVER_EVENTS = SERVER_RELAY_EVENTS.concat(
    'error',
    'connect',
    'descriptionReceived',
    'close',
    'ended'
  );
  /**
   * A container of server instances representing a connection to a MongoDB topology.
   *
   * @fires Topology#serverOpening
   * @fires Topology#serverClosed
   * @fires Topology#serverDescriptionChanged
   * @fires Topology#topologyOpening
   * @fires Topology#topologyClosed
   * @fires Topology#topologyDescriptionChanged
   * @fires Topology#serverHeartbeatStarted
   * @fires Topology#serverHeartbeatSucceeded
   * @fires Topology#serverHeartbeatFailed
   */
  class Topology extends EventEmitter {
    /**
     * Create a topology
     *
     * @param {Array|String} [seedlist] a comma separated `host:port` string, or an array of `{ host, port }` seeds to connect to
     * @param {Object} [options] Optional settings
     * @param {Number} [options.localThresholdMS=15] The size of the latency window for selecting among multiple suitable servers
     * @param {Number} [options.serverSelectionTimeoutMS=10000] How long to block for server selection before throwing an error
     * @param {Number} [options.heartbeatFrequencyMS=30000] The frequency with which topology updates are scheduled
     */
    constructor(seedlist, options) {
      super();

      // Legacy single-argument form: the first argument is an options object
      // (optionally carrying `host`/`port`). Strings and arrays are always seedlists,
      // so `new Topology([seeds])` keeps its seeds instead of treating them as options.
      if (
        typeof options === 'undefined' &&
        typeof seedlist !== 'string' &&
        !Array.isArray(seedlist)
      ) {
        options = seedlist || {};
        seedlist = [];

        // this is for legacy single server constructor support
        if (options.host) {
          seedlist.push({ host: options.host, port: options.port });
        }
      }

      seedlist = seedlist || [];
      if (typeof seedlist === 'string') {
        seedlist = parseStringSeedlist(seedlist);
      }

      options = Object.assign({}, TOPOLOGY_DEFAULTS, options);

      const topologyType = topologyTypeFromSeedlist(seedlist, options);
      const topologyId = globalTopologyCounter++;

      // one `Unknown` description per seed, keyed by its normalized `host:port` address
      const serverDescriptions = seedlist.reduce((result, seed) => {
        if (seed.domain_socket) seed.host = seed.domain_socket;
        const address = seed.port ? `${seed.host}:${seed.port}` : `${seed.host}:27017`;
        result.set(address, new ServerDescription(address));
        return result;
      }, new Map());

      this.s = {
        // the id of this topology
        id: topologyId,
        // passed in options
        options,
        // initial seedlist of servers to connect to
        seedlist: seedlist,
        // the topology description
        description: new TopologyDescription(
          topologyType,
          serverDescriptions,
          options.replicaSet,
          null,
          null,
          null,
          options
        ),
        serverSelectionTimeoutMS: options.serverSelectionTimeoutMS,
        heartbeatFrequencyMS: options.heartbeatFrequencyMS,
        // the defaults name this option `minHeartbeatFrequencyMS`; fall back to it so the
        // value is not silently `undefined` when `minHeartbeatIntervalMS` is not supplied
        minHeartbeatIntervalMS:
          options.minHeartbeatIntervalMS != null
            ? options.minHeartbeatIntervalMS
            : options.minHeartbeatFrequencyMS,
        // allow users to override the cursor factory
        Cursor: options.cursorFactory || Cursor,
        // the bson parser
        bson: options.bson || new BSON(),
        // a map of server instances to normalized addresses
        servers: new Map(),
        // Server Session Pool
        sessionPool: null,
        // Active client sessions
        sessions: [],
        // Promise library
        promiseLibrary: options.promiseLibrary || Promise,
        credentials: options.credentials,
        clusterTime: null,
        // set by `connect()`, cleared by `close()`; consulted during server selection
        connected: false
      };

      // amend options for server instance creation
      this.s.options.compression = { compressors: createCompressionInfo(options) };

      // add client info
      this.s.clientInfo = createClientInfo(options);
    }

    /**
     * @return A `TopologyDescription` for this topology
     */
    get description() {
      return this.s.description;
    }

    /**
     * @return {string} `'c++'` when the BSON library reports itself as native, otherwise `'js'`
     */
    get parserType() {
      return BSON.native ? 'c++' : 'js';
    }

    /**
     * All raw connections
     * @method
     * @return {Connection[]}
     */
    connections() {
      return Array.from(this.s.servers.values()).reduce((result, server) => {
        return result.concat(server.s.pool.allConnections());
      }, []);
    }

    /**
     * Initiate server connect
     *
     * @param {Object} [options] Optional settings
     * @param {Array} [options.auth=null] Array of auth options to apply on connect
     * @param {function} [callback] An optional callback called once on the first connected server
     */
    connect(options, callback) {
      if (typeof options === 'function') {
        callback = options;
        options = {};
      }
      options = options || {};

      // emit SDAM monitoring events
      this.emit('topologyOpening', new monitoring.TopologyOpeningEvent(this.s.id));

      // emit an event for the topology change
      this.emit(
        'topologyDescriptionChanged',
        new monitoring.TopologyDescriptionChangedEvent(
          this.s.id,
          new TopologyDescription(TopologyType.Unknown), // initial is always Unknown
          this.s.description
        )
      );

      connectServers(this, Array.from(this.s.description.servers.values()));
      this.s.connected = true;

      // otherwise, wait for a server to properly connect based on user provided read preference,
      // or primary.
      translateReadPreference(options);
      const readPreference = options.readPreference || ReadPreference.primary;
      this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => {
        if (err) {
          // without a callback the failure is surfaced as an `error` event instead
          if (typeof callback === 'function') {
            callback(err, null);
          } else {
            this.emit('error', err);
          }
          return;
        }

        // the two handlers unregister each other so only the first outcome is reported
        const errorHandler = err => {
          server.removeListener('connect', connectHandler);
          if (typeof callback === 'function') callback(err, null);
        };

        const connectHandler = (_, err) => {
          server.removeListener('error', errorHandler);
          this.emit('open', err, this);
          this.emit('connect', this);
          if (typeof callback === 'function') callback(err, this);
        };

        const STATE_CONNECTING = 1;
        if (server.s.state === STATE_CONNECTING) {
          server.once('error', errorHandler);
          server.once('connect', connectHandler);
          return;
        }

        // the selected server is not in the connecting state: report success right away
        connectHandler();
      });
    }

    /**
     * Close this topology
     *
     * @param {object} [options] Optional settings (currently unused)
     * @param {function} [callback] Called with `(null, null)` once every server has been destroyed
     */
    close(options, callback) {
      if (typeof options === 'function') {
        callback = options;
        options = {};
      }
      options = options || {};

      if (this.s.sessionPool) {
        this.s.sessions.forEach(session => session.endSession());
        this.s.sessionPool.endAllPooledSessions();
      }

      const servers = this.s.servers;
      if (servers.size === 0) {
        this.s.connected = false;
        if (typeof callback === 'function') {
          callback(null, null);
        }
        return;
      }

      // destroy all child servers
      let destroyed = 0;
      servers.forEach(server =>
        destroyServer(server, this, () => {
          destroyed++;
          if (destroyed === servers.size) {
            // emit an event for close
            this.emit('topologyClosed', new monitoring.TopologyClosedEvent(this.s.id));
            this.s.connected = false;
            if (typeof callback === 'function') {
              callback(null, null);
            }
          }
        })
      );
    }

    /**
     * Selects a server according to the selection predicate provided.
     * The result is delivered through `callback`; nothing is returned.
     *
     * @param {function} [selector] An optional selector to select servers by, defaults to a random selection within a latency window
     * @param {object} [options] Optional settings related to server selection
     * @param {number} [options.serverSelectionTimeoutMS] How long to block for server selection before throwing an error
     * @param {function} callback Called with `(err, server)` where `server` is a `Server` meeting the criteria of the predicate provided
     */
    selectServer(selector, options, callback) {
      if (typeof options === 'function') {
        callback = options;
        if (typeof selector !== 'function') {
          // called as (options, callback): derive the selector from the read preference
          options = selector;
          translateReadPreference(options);
          const readPreference = options.readPreference || ReadPreference.primary;
          selector = readPreferenceServerSelector(readPreference);
        } else {
          options = {};
        }
      }

      options = Object.assign(
        {},
        { serverSelectionTimeoutMS: this.s.serverSelectionTimeoutMS },
        options
      );

      const isSharded = this.description.type === TopologyType.Sharded;
      const session = options.session;
      const transaction = session && session.transaction;

      // a sharded transaction that already recorded a server keeps using it
      if (isSharded && transaction && transaction.server) {
        callback(null, transaction.server);
        return;
      }

      selectServers(
        this,
        selector,
        options.serverSelectionTimeoutMS,
        process.hrtime(),
        (err, servers) => {
          if (err) return callback(err, null);

          const selectedServer = randomSelection(servers);
          if (isSharded && transaction && transaction.isActive) {
            transaction.pinServer(selectedServer);
          }

          callback(null, selectedServer);
        }
      );
    }

    // Sessions related methods

    /**
     * @return Whether sessions are supported on the current topology
     */
    hasSessionSupport() {
      return this.description.logicalSessionTimeoutMinutes != null;
    }

    /**
     * Start a logical session
     *
     * The session is tracked in `s.sessions` until it emits `ended`.
     */
    startSession(options, clientOptions) {
      const session = new ClientSession(this, this.s.sessionPool, options, clientOptions);
      session.once('ended', () => {
        this.s.sessions = this.s.sessions.filter(s => !s.equals(session));
      });

      this.s.sessions.push(session);
      return session;
    }

    /**
     * Send endSessions command(s) with the given session ids
     *
     * @param {Array} sessions The sessions to end
     * @param {function} [callback]
     */
    endSessions(sessions, callback) {
      if (!Array.isArray(sessions)) {
        sessions = [sessions];
      }

      this.command(
        'admin.$cmd',
        { endSessions: sessions },
        { readPreference: ReadPreference.primaryPreferred, noResponse: true },
        () => {
          // intentionally ignored, per spec
          if (typeof callback === 'function') callback();
        }
      );
    }

    /**
     * Update the internal TopologyDescription with a ServerDescription
     *
     * @param {object} serverDescription The server to update in the internal list of server descriptions
     */
    serverUpdateHandler(serverDescription) {
      // ignore updates for servers that are not part of the current description
      if (!this.s.description.hasServer(serverDescription.address)) {
        return;
      }

      // these will be used for monitoring events later
      const previousTopologyDescription = this.s.description;
      const previousServerDescription = this.s.description.servers.get(serverDescription.address);

      // first update the TopologyDescription
      this.s.description = this.s.description.update(serverDescription);
      if (this.s.description.compatibilityError) {
        this.emit('error', new MongoError(this.s.description.compatibilityError));
        return;
      }

      // emit monitoring events for this change
      this.emit(
        'serverDescriptionChanged',
        new monitoring.ServerDescriptionChangedEvent(
          this.s.id,
          serverDescription.address,
          previousServerDescription,
          this.s.description.servers.get(serverDescription.address)
        )
      );

      // update server list from updated descriptions
      updateServers(this, serverDescription);

      // Driver Sessions Spec: "Whenever a driver receives a cluster time from
      // a server it MUST compare it to the current highest seen cluster time
      // for the deployment. If the new cluster time is higher than the
      // highest seen cluster time it MUST become the new highest seen cluster
      // time. Two cluster times are compared using only the BsonTimestamp
      // value of the clusterTime embedded field."
      const clusterTime = serverDescription.$clusterTime;
      if (clusterTime) {
        resolveClusterTime(this, clusterTime);
      }

      this.emit(
        'topologyDescriptionChanged',
        new monitoring.TopologyDescriptionChangedEvent(
          this.s.id,
          previousTopologyDescription,
          this.s.description
        )
      );
    }

    /**
     * Legacy no-op: immediately reports success through the callback.
     *
     * @param {object} [credentials] Ignored
     * @param {function} [callback] Called with `(null, true)`
     */
    auth(credentials, callback) {
      if (typeof credentials === 'function') {
        callback = credentials;
        credentials = null;
      }
      if (typeof callback === 'function') callback(null, true);
    }

    /**
     * Legacy no-op: immediately reports success through the callback.
     *
     * @param {function} [callback] Called with `(null, true)`
     */
    logout(callback) {
      if (typeof callback === 'function') callback(null, true);
    }

    // Basic operation support. Eventually this should be moved into command construction
    // during the command refactor.

    /**
     * Insert one or more documents
     *
     * @param {String} ns The full qualified namespace for this operation
     * @param {Array} ops An array of documents to insert
     * @param {Boolean} [options.ordered=true] Execute in order or out of order
     * @param {Object} [options.writeConcern] Write concern for the operation
     * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized
     * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields
     * @param {ClientSession} [options.session] Session to use for the operation
     * @param {boolean} [options.retryWrites] Enable retryable writes for this operation
     * @param {opResultCallback} callback A callback function
     */
    insert(ns, ops, options, callback) {
      executeWriteOperation({ topology: this, op: 'insert', ns, ops }, options, callback);
    }

    /**
     * Perform one or more update operations
     *
     * @param {string} ns The fully qualified namespace for this operation
     * @param {array} ops An array of updates
     * @param {boolean} [options.ordered=true] Execute in order or out of order
     * @param {object} [options.writeConcern] Write concern for the operation
     * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized
     * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields
     * @param {ClientSession} [options.session] Session to use for the operation
     * @param {boolean} [options.retryWrites] Enable retryable writes for this operation
     * @param {opResultCallback} callback A callback function
     */
    update(ns, ops, options, callback) {
      executeWriteOperation({ topology: this, op: 'update', ns, ops }, options, callback);
    }

    /**
     * Perform one or more remove operations
     *
     * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
     * @param {array} ops An array of removes
     * @param {boolean} [options.ordered=true] Execute in order or out of order
     * @param {object} [options.writeConcern={}] Write concern for the operation
     * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
     * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
     * @param {ClientSession} [options.session=null] Session to use for the operation
     * @param {boolean} [options.retryWrites] Enable retryable writes for this operation
     * @param {opResultCallback} callback A callback function
     */
    remove(ns, ops, options, callback) {
      executeWriteOperation({ topology: this, op: 'remove', ns, ops }, options, callback);
    }

    /**
     * Execute a command
     *
     * @method
     * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
     * @param {object} cmd The command hash
     * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
     * @param {Connection} [options.connection] Specify connection object to execute command against
     * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
     * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
     * @param {ClientSession} [options.session=null] Session to use for the operation
     * @param {opResultCallback} callback A callback function
     */
    command(ns, cmd, options, callback) {
      if (typeof options === 'function') {
        callback = options;
        options = {};
      }
      // normalise unconditionally so `command(ns, cmd, undefined, cb)` does not throw below
      options = options || {};

      translateReadPreference(options);
      const readPreference = options.readPreference || ReadPreference.primary;

      this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => {
        if (err) {
          callback(err, null);
          return;
        }

        // only the first attempt of a retryable write command, outside a transaction, may retry
        const willRetryWrite =
          !options.retrying &&
          !!options.retryWrites &&
          options.session &&
          isRetryableWritesSupported(this) &&
          !options.session.inTransaction() &&
          isWriteCommand(cmd);

        const cb = (err, result) => {
          if (!err) return callback(null, result);
          if (!isRetryableError(err)) {
            return callback(err);
          }

          if (willRetryWrite) {
            // `retrying` prevents a second retry of the same command
            const newOptions = Object.assign({}, options, { retrying: true });
            return this.command(ns, cmd, newOptions, callback);
          }

          return callback(err);
        };

        // increment and assign txnNumber
        if (willRetryWrite) {
          options.session.incrementTransactionNumber();
          options.willRetryWrite = willRetryWrite;
        }

        server.command(ns, cmd, options, cb);
      });
    }

    /**
     * Create a new cursor
     *
     * @method
     * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
     * @param {object|Long} cmd Can be either a command returning a cursor or a cursorId
     * @param {object} [options] Options for the cursor
     * @param {object} [options.batchSize=0] Batchsize for the operation
     * @param {array} [options.documents=[]] Initial documents list for cursor
     * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
     * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
     * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
     * @param {ClientSession} [options.session=null] Session to use for the operation
     * @param {object} [options.topology] The internal topology of the created cursor
     * @returns {Cursor}
     */
    cursor(ns, cmd, options) {
      options = options || {};
      const topology = options.topology || this;
      const CursorClass = options.cursorFactory || this.s.Cursor;
      translateReadPreference(options);

      return new CursorClass(this.s.bson, ns, cmd, options, topology, this.s.options);
    }

    get clientInfo() {
      return this.s.clientInfo;
    }

    // Legacy methods for compat with old topology types
    isConnected() {
      // console.log('not implemented: `isConnected`');
      return true;
    }

    isDestroyed() {
      // console.log('not implemented: `isDestroyed`');
      return false;
    }

    unref() {
      console.log('not implemented: `unref`');
    }

    // NOTE: There are many places in code where we explicitly check the last isMaster
    // to do feature support detection. This should be done any other way, but for
    // now we will just return the first isMaster seen, which should suffice.
    lastIsMaster() {
      const serverDescriptions = Array.from(this.description.servers.values());
      if (serverDescriptions.length === 0) return {};

      // first description with a known type, else a stub carrying only the common wire version
      const sd = serverDescriptions.filter(sd => sd.type !== ServerType.Unknown)[0];
      const result = sd || { maxWireVersion: this.description.commonWireVersion };
      return result;
    }

    get logicalSessionTimeoutMinutes() {
      return this.description.logicalSessionTimeoutMinutes;
    }

    get bson() {
      return this.s.bson;
    }
  }
  // `clusterTime` accessor on the prototype: reads and writes pass straight through to
  // the topology's internal state object.
  Object.defineProperty(Topology.prototype, 'clusterTime', {
    enumerable: true,
    get() {
      return this.s.clusterTime;
    },
    set(clusterTime) {
      this.s.clusterTime = clusterTime;
    }
  });

  // legacy aliases
  // `destroy()` forwards to `close()` through `util.deprecate`
  Topology.prototype.destroy = deprecate(
    Topology.prototype.close,
    'destroy() is deprecated, please use close() instead'
  );
  // command names treated as retryable writes by `isWriteCommand`
  const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete'];

  /**
   * Checks whether a command document carries one of the retryable write operation keys.
   *
   * @param {object} command The command document to inspect
   * @return {boolean} true when any of the retryable operation keys holds a truthy value
   */
  function isWriteCommand(command) {
    for (const operationName of RETRYABLE_WRITE_OPERATIONS) {
      if (command[operationName]) {
        return true;
      }
    }

    return false;
  }
  /**
   * Destroys a server, and removes all event listeners from the instance
   *
   * @param {Server} server The server to tear down
   * @param {Topology} topology The topology on which `serverClosed` is emitted
   * @param {function} [callback] Called with `(null, null)` after the server reports it is destroyed
   */
  function destroyServer(server, topology, callback) {
    // detach every listener the topology may have registered on this server
    for (const eventName of LOCAL_SERVER_EVENTS) {
      server.removeAllListeners(eventName);
    }

    const onDestroyed = () => {
      const closedEvent = new monitoring.ServerClosedEvent(
        topology.s.id,
        server.description.address
      );
      topology.emit('serverClosed', closedEvent);

      if (typeof callback === 'function') {
        callback(null, null);
      }
    };

    server.destroy(onDestroyed);
  }
  /**
   * Parses a basic seedlist in string form
   *
   * Seeds are separated by commas and written as `host` or `host:port`. Whitespace around
   * a seed is ignored and empty segments (e.g. from a trailing comma) are skipped.
   *
   * @param {string} seedlist The seedlist to parse
   * @return {Array} A list of `{ host, port }` objects; `port` is a number (27017 when
   *   omitted), or the raw text when the given port is not purely numeric
   */
  function parseStringSeedlist(seedlist) {
    return seedlist
      .split(',')
      .map(seed => seed.trim())
      .filter(seed => seed.length > 0)
      .map(seed => {
        const parts = seed.split(':');
        const rawPort = parts[1];

        let port = 27017;
        if (rawPort) {
          // keep the port type consistent with the numeric default; leave anything that is
          // not a plain number untouched rather than turning it into NaN
          port = /^\d+$/.test(rawPort) ? parseInt(rawPort, 10) : rawPort;
        }

        return { host: parts[0], port };
      });
  }
  /**
   * Derives the initial topology type from the seedlist and the options.
   *
   * @param {Array} seedlist The seeds the topology starts with
   * @param {object} options Options; `replicaSet`, `setName` and `rs_name` are consulted in that order
   * @return `ReplicaSetNoPrimary` when a set name is configured, `Single` for exactly one
   *   seed without a set name, otherwise `Unknown`
   */
  function topologyTypeFromSeedlist(seedlist, options) {
    const setName = options.replicaSet || options.setName || options.rs_name;
    const hasSingleSeed = seedlist.length === 1;

    if (setName) {
      return TopologyType.ReplicaSetNoPrimary;
    }

    return hasSingleSeed ? TopologyType.Single : TopologyType.Unknown;
  }
  /**
   * Picks one element of the array using `Math.random()`.
   *
   * @param {Array} array The candidates
   * @return A randomly chosen element, or `undefined` for an empty array
   */
  function randomSelection(array) {
    const index = Math.floor(Math.random() * array.length);
    return array[index];
  }
  /**
   * Selects servers using the provided selector
   *
   * When nothing matches, all server monitors are asked to check again and selection is
   * re-attempted on the next `topologyDescriptionChanged` event, until `timeout` elapses.
   *
   * @private
   * @param {Topology} topology The topology to select servers from
   * @param {function|object} [selector] The predicate used for selecting servers, or an options
   *   object carrying a `readPreference`; when falsy every known server is eligible
   * @param {Number} timeout The max time we are willing wait for selection
   * @param {Number} start A high precision timestamp for the start of the selection process
   * @param {function} callback The callback used to convey errors or the resultant servers
   */
  function selectServers(topology, selector, timeout, start, callback) {
    const duration = calculateDurationInMs(start);
    if (duration >= timeout) {
      return callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`));
    }

    // ensure we are connected
    if (!topology.s.connected) {
      topology.connect();

      // we want to make sure we're still within the requested timeout window
      const failToConnectTimer = setTimeout(() => {
        topology.removeListener('connect', connectHandler);
        callback(new MongoTimeoutError('Server selection timed out waiting to connect'));
      }, timeout - duration);

      const connectHandler = () => {
        clearTimeout(failToConnectTimer);
        // selection restarts with a fresh start time once the topology is connected
        selectServers(topology, selector, timeout, process.hrtime(), callback);
      };

      topology.once('connect', connectHandler);
      return;
    }

    // otherwise, attempt server selection
    const serverDescriptions = Array.from(topology.description.servers.values());
    let descriptions;

    // support server selection by options with readPreference
    // (`typeof null` is also 'object', so null must be excluded explicitly)
    if (selector !== null && typeof selector === 'object') {
      const readPreference = selector.readPreference
        ? selector.readPreference
        : ReadPreference.primary;

      selector = readPreferenceServerSelector(readPreference);
    }

    try {
      descriptions = selector
        ? selector(topology.description, serverDescriptions)
        : serverDescriptions;
    } catch (e) {
      return callback(e, null);
    }

    if (descriptions.length) {
      // map the selected descriptions back to their live `Server` instances
      const servers = descriptions.map(description => topology.s.servers.get(description.address));
      return callback(null, servers);
    }

    // nothing suitable yet: ensure all server monitors attempt monitoring soon
    topology.s.servers.forEach(server => {
      setTimeout(
        () => server.monitor({ heartbeatFrequencyMS: topology.description.heartbeatFrequencyMS }),
        TOPOLOGY_DEFAULTS.minHeartbeatFrequencyMS
      );
    });

    const descriptionChangedHandler = () => {
      // successful iteration, clear the check timer
      clearTimeout(iterationTimer);

      if (topology.description.error) {
        callback(topology.description.error, null);
        return;
      }

      // topology description has changed due to monitoring, reattempt server selection
      // (same `start`, so the time already spent keeps counting against `timeout`)
      selectServers(topology, selector, timeout, start, callback);
    };

    // give up once the remainder of the selection window has passed without a change
    const iterationTimer = setTimeout(() => {
      topology.removeListener('topologyDescriptionChanged', descriptionChangedHandler);
      callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`));
    }, timeout - duration);

    topology.once('topologyDescriptionChanged', descriptionChangedHandler);
  }
  /**
   * Creates a `Server` for the given description, wires its events to the topology and
   * starts connecting it.
   *
   * @param {Topology} topology The topology that owns the new server
   * @param {ServerDescription} serverDescription The description of the server to create
   * @return {Server} The newly created server, after `connect()` has been called on it
   */
  function createAndConnectServer(topology, serverDescription) {
    const openingEvent = new monitoring.ServerOpeningEvent(
      topology.s.id,
      serverDescription.address
    );
    topology.emit('serverOpening', openingEvent);

    const server = new Server(serverDescription, topology.s.options, topology);

    // monitoring and command events are re-emitted on the topology unchanged
    relayEvents(server, topology, SERVER_RELAY_EVENTS);

    const onConnect = serverConnectEventHandler(server, topology);
    server.once('connect', onConnect);

    const onDescriptionReceived = topology.serverUpdateHandler.bind(topology);
    server.on('descriptionReceived', onDescriptionReceived);

    const onError = serverErrorEventHandler(server, topology);
    server.on('error', onError);

    // a server level `close` surfaces as a topology level `close` carrying the server
    server.on('close', () => topology.emit('close', server));

    server.connect();
    return server;
  }
  /**
   * Create `Server` instances for all initially known servers, connect them, and assign
   * them to the passed in `Topology`.
   *
   * The topology's server map is replaced only after every server has been created.
   *
   * @param {Topology} topology The topology responsible for the servers
   * @param {ServerDescription[]} serverDescriptions A list of server descriptions to connect
   */
  function connectServers(topology, serverDescriptions) {
    const serversByAddress = new Map();

    for (const description of serverDescriptions) {
      const server = createAndConnectServer(topology, description);
      serversByAddress.set(description.address, server);
    }

    topology.s.servers = serversByAddress;
  }
  /**
   * Reconciles the topology's live `Server` instances with its current description.
   *
   * @param {Topology} topology The topology whose servers are reconciled
   * @param {ServerDescription} incomingServerDescription The description that was just received
   */
  function updateServers(topology, incomingServerDescription) {
    const incomingAddress = incomingServerDescription.address;

    // update the internal server's description
    if (topology.s.servers.has(incomingAddress)) {
      topology.s.servers.get(incomingAddress).s.description = incomingServerDescription;
    }

    // add new servers for all descriptions we currently don't know about locally
    for (const description of topology.description.servers.values()) {
      if (topology.s.servers.has(description.address)) {
        continue;
      }

      topology.s.servers.set(description.address, createAndConnectServer(topology, description));
    }

    // for all servers no longer known, remove their descriptions and destroy their instances
    topology.s.servers.forEach((server, address) => {
      if (!topology.description.hasServer(address)) {
        topology.s.servers.delete(address);

        // prepare server for garbage collection
        destroyServer(server, topology);
      }
    });
  }
  /**
   * Builds the `connect` listener for a server: once invoked it starts monitoring the
   * server, flagged as the initial check, at the topology description's heartbeat frequency.
   *
   * @param {Server} server The server to monitor
   * @param {Topology} topology The topology whose description supplies `heartbeatFrequencyMS`
   * @return {function} The listener; its arguments are ignored
   */
  function serverConnectEventHandler(server, topology) {
    return function onServerConnect(/* isMaster, err */) {
      const monitorOptions = {
        initial: true,
        heartbeatFrequencyMS: topology.description.heartbeatFrequencyMS
      };

      server.monitor(monitorOptions);
    };
  }
  /**
   * Builds the `error` listener for a server: it announces `serverClosed` on the topology
   * and then resets the server's state. Parse errors additionally request a pool reset.
   *
   * @param {Server} server The server the listener is attached to
   * @param {Topology} topology The topology on which `serverClosed` is emitted
   * @return {function} The listener, taking the error that occurred
   */
  function serverErrorEventHandler(server, topology) {
    return function onServerError(err) {
      const closedEvent = new monitoring.ServerClosedEvent(
        topology.s.id,
        server.description.address
      );
      topology.emit('serverClosed', closedEvent);

      if (!(err instanceof MongoParseError)) {
        resetServerState(server, err);
        return;
      }

      resetServerState(server, err, { clearPool: true });
    };
  }
  /**
   * Runs a write operation (`insert`, `update` or `remove`) against a writable server,
   * retrying it once when retryable writes apply and the failure is retryable.
   *
   * @param {object} args The operation: `{ topology, op, ns, ops }`, plus `retrying` on a retry
   * @param {object} [options] Operation options; `willRetryWrite` is set on this object when a retry is possible
   * @param {function} callback Called with `(err, result)`
   */
  function executeWriteOperation(args, options, callback) {
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }
    options = options || {};

    // TODO: once we drop Node 4, use destructuring either here or in arguments.
    const topology = args.topology;
    const opName = args.op;
    const namespace = args.ns;
    const operations = args.ops;

    // only a first attempt, with a session and outside a transaction, is eligible for retry
    const willRetryWrite =
      !args.retrying &&
      !!options.retryWrites &&
      options.session &&
      isRetryableWritesSupported(topology) &&
      !options.session.inTransaction();

    const onServerSelected = (selectionError, server) => {
      if (selectionError) {
        callback(selectionError, null);
        return;
      }

      const handler = (err, result) => {
        if (!err) return callback(null, result);

        if (isRetryableError(err) && willRetryWrite) {
          // mark the retry so it cannot be retried a second time
          const retryArgs = Object.assign({}, args, { retrying: true });
          return executeWriteOperation(retryArgs, options, callback);
        }

        return callback(err);
      };

      // carry the caller's operation id over to the wrapping handler
      if (callback.operationId) {
        handler.operationId = callback.operationId;
      }

      // increment and assign txnNumber
      if (willRetryWrite) {
        options.session.incrementTransactionNumber();
        options.willRetryWrite = willRetryWrite;
      }

      // execute the write operation
      server[opName](namespace, operations, options, handler);
    };

    topology.selectServer(writableServerSelector(), options, onServerSelected);
  }
  /**
   * Puts a server back into the `Unknown` state by emitting, on the server itself, a
   * `descriptionReceived` event that carries a description built without an ismaster
   * response (simulating an empty ismaster) and with the triggering error attached.
   *
   * If pool clearing is requested and the server has a pool, the event is emitted from
   * the pool's reset callback; otherwise it is emitted immediately.
   *
   * @private
   * @param {Server} server
   * @param {MongoError} error The error that caused the state reset
   * @param {object} [options] Optional settings
   * @param {boolean} [options.clearPool=false] Pool should be cleared out on state reset
   */
  function resetServerState(server, error, options) {
    const settings = Object.assign({ clearPool: false }, options);

    const announceUnknown = () => {
      const unknownDescription = new ServerDescription(server.description.address, null, {
        error
      });
      server.emit('descriptionReceived', unknownDescription);
    };

    if (!settings.clearPool || !server.pool) {
      announceUnknown();
      return;
    }

    // Defer the state change until the pool has finished resetting.
    server.pool.reset(() => announceUnknown());
  }
  /**
   * Normalizes `options.readPreference` in place to a `ReadPreference` instance.
   *
   * Accepted inputs:
   *  - `null`/`undefined`: nothing to do, returns `undefined`
   *  - a `ReadPreference` instance: left untouched
   *  - a string: treated as the mode
   *  - a plain object exposing a string `mode` (or legacy `preference`), with optional
   *    `tags` and `maxStalenessSeconds`
   *
   * Anything else is rejected. This includes objects that carry no usable string mode,
   * which previously slipped through unconverted while every other invalid value threw.
   *
   * @private
   * @param {object} options Options object; its `readPreference` property may be replaced
   * @return {object|undefined} The same `options` object, or `undefined` when no read
   *   preference was supplied
   * @throws {TypeError} If the read preference cannot be translated
   */
  function translateReadPreference(options) {
    const r = options.readPreference;
    if (r == null) {
      return;
    }

    if (r instanceof ReadPreference) {
      return options;
    }

    if (typeof r === 'string') {
      options.readPreference = new ReadPreference(r);
      return options;
    }

    if (typeof r === 'object') {
      // `preference` is accepted as an alternative spelling of `mode`.
      const mode = r.mode || r.preference;
      if (mode && typeof mode === 'string') {
        options.readPreference = new ReadPreference(mode, r.tags, {
          maxStalenessSeconds: r.maxStalenessSeconds
        });
        return options;
      }
    }

    // Reject rather than hand an untranslated value on to the caller.
    throw new TypeError('Invalid read preference: ' + r);
  }
  833. /**
  834. * A server opening SDAM monitoring event
  835. *
  836. * @event Topology#serverOpening
  837. * @type {ServerOpeningEvent}
  838. */
  839. /**
  840. * A server closed SDAM monitoring event
  841. *
  842. * @event Topology#serverClosed
  843. * @type {ServerClosedEvent}
  844. */
  845. /**
  846. * A server description SDAM change monitoring event
  847. *
  848. * @event Topology#serverDescriptionChanged
  849. * @type {ServerDescriptionChangedEvent}
  850. */
  851. /**
  852. * A topology open SDAM event
  853. *
  854. * @event Topology#topologyOpening
  855. * @type {TopologyOpeningEvent}
  856. */
  857. /**
  858. * A topology closed SDAM event
  859. *
  860. * @event Topology#topologyClosed
  861. * @type {TopologyClosedEvent}
  862. */
  863. /**
  864. * A topology structure SDAM change event
  865. *
  866. * @event Topology#topologyDescriptionChanged
  867. * @type {TopologyDescriptionChangedEvent}
  868. */
  869. /**
  870. * A topology serverHeartbeatStarted SDAM event
  871. *
  872. * @event Topology#serverHeartbeatStarted
  873. * @type {ServerHeartbeatStartedEvent}
  874. */
  875. /**
  876. * A topology serverHeartbeatFailed SDAM event
  877. *
  878. * @event Topology#serverHeartbeatFailed
  879. * @type {ServerHeartbeatFailedEvent}
  880. */
  881. /**
  882. * A topology serverHeartbeatSucceeded SDAM change event
  883. *
  884. * @event Topology#serverHeartbeatSucceeded
  885. * @type {ServerHeartbeatSucceededEvent}
  886. */
  887. /**
  888. * An event emitted indicating a command was started, if command monitoring is enabled
  889. *
  890. * @event Topology#commandStarted
  891. * @type {object}
  892. */
  893. /**
  894. * An event emitted indicating a command succeeded, if command monitoring is enabled
  895. *
  896. * @event Topology#commandSucceeded
  897. * @type {object}
  898. */
  899. /**
  900. * An event emitted indicating a command failed, if command monitoring is enabled
  901. *
  902. * @event Topology#commandFailed
  903. * @type {object}
  904. */
  // Expose `Topology` as this module's export value (what `require` hands to consumers).
  905. module.exports = Topology;