Ohm-Management - Projektarbeit B-ME
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

topology_description.js 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. 'use strict';
  2. const ServerType = require('./server_description').ServerType;
  3. const ServerDescription = require('./server_description').ServerDescription;
  4. const ReadPreference = require('../topologies/read_preference');
  5. const WIRE_CONSTANTS = require('../wireprotocol/constants');
  6. // contstants related to compatability checks
  7. const MIN_SUPPORTED_SERVER_VERSION = WIRE_CONSTANTS.MIN_SUPPORTED_SERVER_VERSION;
  8. const MAX_SUPPORTED_SERVER_VERSION = WIRE_CONSTANTS.MAX_SUPPORTED_SERVER_VERSION;
  9. const MIN_SUPPORTED_WIRE_VERSION = WIRE_CONSTANTS.MIN_SUPPORTED_WIRE_VERSION;
  10. const MAX_SUPPORTED_WIRE_VERSION = WIRE_CONSTANTS.MAX_SUPPORTED_WIRE_VERSION;
  11. // An enumeration of topology types we know about
  12. const TopologyType = {
  13. Single: 'Single',
  14. ReplicaSetNoPrimary: 'ReplicaSetNoPrimary',
  15. ReplicaSetWithPrimary: 'ReplicaSetWithPrimary',
  16. Sharded: 'Sharded',
  17. Unknown: 'Unknown'
  18. };
  19. // Representation of a deployment of servers
  20. class TopologyDescription {
  21. /**
  22. * Create a TopologyDescription
  23. *
  24. * @param {string} topologyType
  25. * @param {Map<string, ServerDescription>} serverDescriptions the a map of address to ServerDescription
  26. * @param {string} setName
  27. * @param {number} maxSetVersion
  28. * @param {ObjectId} maxElectionId
  29. */
  30. constructor(
  31. topologyType,
  32. serverDescriptions,
  33. setName,
  34. maxSetVersion,
  35. maxElectionId,
  36. commonWireVersion,
  37. options,
  38. error
  39. ) {
  40. options = options || {};
  41. // TODO: consider assigning all these values to a temporary value `s` which
  42. // we use `Object.freeze` on, ensuring the internal state of this type
  43. // is immutable.
  44. this.type = topologyType || TopologyType.Unknown;
  45. this.setName = setName || null;
  46. this.maxSetVersion = maxSetVersion || null;
  47. this.maxElectionId = maxElectionId || null;
  48. this.servers = serverDescriptions || new Map();
  49. this.stale = false;
  50. this.compatible = true;
  51. this.compatibilityError = null;
  52. this.logicalSessionTimeoutMinutes = null;
  53. this.heartbeatFrequencyMS = options.heartbeatFrequencyMS || 0;
  54. this.localThresholdMS = options.localThresholdMS || 0;
  55. this.options = options;
  56. this.error = error;
  57. this.commonWireVersion = commonWireVersion || null;
  58. // determine server compatibility
  59. for (const serverDescription of this.servers.values()) {
  60. if (serverDescription.type === ServerType.Unknown) continue;
  61. if (serverDescription.minWireVersion > MAX_SUPPORTED_WIRE_VERSION) {
  62. this.compatible = false;
  63. this.compatibilityError = `Server at ${serverDescription.address} requires wire version ${
  64. serverDescription.minWireVersion
  65. }, but this version of the driver only supports up to ${MAX_SUPPORTED_WIRE_VERSION} (MongoDB ${MAX_SUPPORTED_SERVER_VERSION})`;
  66. }
  67. if (serverDescription.maxWireVersion < MIN_SUPPORTED_WIRE_VERSION) {
  68. this.compatible = false;
  69. this.compatibilityError = `Server at ${serverDescription.address} reports wire version ${
  70. serverDescription.maxWireVersion
  71. }, but this version of the driver requires at least ${MIN_SUPPORTED_WIRE_VERSION} (MongoDB ${MIN_SUPPORTED_SERVER_VERSION}).`;
  72. break;
  73. }
  74. }
  75. // Whenever a client updates the TopologyDescription from an ismaster response, it MUST set
  76. // TopologyDescription.logicalSessionTimeoutMinutes to the smallest logicalSessionTimeoutMinutes
  77. // value among ServerDescriptions of all data-bearing server types. If any have a null
  78. // logicalSessionTimeoutMinutes, then TopologyDescription.logicalSessionTimeoutMinutes MUST be
  79. // set to null.
  80. const readableServers = Array.from(this.servers.values()).filter(s => s.isReadable);
  81. this.logicalSessionTimeoutMinutes = readableServers.reduce((result, server) => {
  82. if (server.logicalSessionTimeoutMinutes == null) return null;
  83. if (result == null) return server.logicalSessionTimeoutMinutes;
  84. return Math.min(result, server.logicalSessionTimeoutMinutes);
  85. }, null);
  86. }
  87. /**
  88. * Returns a copy of this description updated with a given ServerDescription
  89. *
  90. * @param {ServerDescription} serverDescription
  91. */
  92. update(serverDescription) {
  93. const address = serverDescription.address;
  94. // NOTE: there are a number of prime targets for refactoring here
  95. // once we support destructuring assignments
  96. // potentially mutated values
  97. let topologyType = this.type;
  98. let setName = this.setName;
  99. let maxSetVersion = this.maxSetVersion;
  100. let maxElectionId = this.maxElectionId;
  101. let commonWireVersion = this.commonWireVersion;
  102. let error = serverDescription.error || null;
  103. const serverType = serverDescription.type;
  104. let serverDescriptions = new Map(this.servers);
  105. // update common wire version
  106. if (serverDescription.maxWireVersion !== 0) {
  107. if (commonWireVersion == null) {
  108. commonWireVersion = serverDescription.maxWireVersion;
  109. } else {
  110. commonWireVersion = Math.min(commonWireVersion, serverDescription.maxWireVersion);
  111. }
  112. }
  113. // update the actual server description
  114. serverDescriptions.set(address, serverDescription);
  115. if (topologyType === TopologyType.Single) {
  116. // once we are defined as single, that never changes
  117. return new TopologyDescription(
  118. TopologyType.Single,
  119. serverDescriptions,
  120. setName,
  121. maxSetVersion,
  122. maxElectionId,
  123. commonWireVersion,
  124. this.options,
  125. error
  126. );
  127. }
  128. if (topologyType === TopologyType.Unknown) {
  129. if (serverType === ServerType.Standalone) {
  130. serverDescriptions.delete(address);
  131. } else {
  132. topologyType = topologyTypeForServerType(serverType);
  133. }
  134. }
  135. if (topologyType === TopologyType.Sharded) {
  136. if ([ServerType.Mongos, ServerType.Unknown].indexOf(serverType) === -1) {
  137. serverDescriptions.delete(address);
  138. }
  139. }
  140. if (topologyType === TopologyType.ReplicaSetNoPrimary) {
  141. if ([ServerType.Mongos, ServerType.Unknown].indexOf(serverType) >= 0) {
  142. serverDescriptions.delete(address);
  143. }
  144. if (serverType === ServerType.RSPrimary) {
  145. const result = updateRsFromPrimary(
  146. serverDescriptions,
  147. setName,
  148. serverDescription,
  149. maxSetVersion,
  150. maxElectionId
  151. );
  152. (topologyType = result[0]),
  153. (setName = result[1]),
  154. (maxSetVersion = result[2]),
  155. (maxElectionId = result[3]);
  156. } else if (
  157. [ServerType.RSSecondary, ServerType.RSArbiter, ServerType.RSOther].indexOf(serverType) >= 0
  158. ) {
  159. const result = updateRsNoPrimaryFromMember(serverDescriptions, setName, serverDescription);
  160. (topologyType = result[0]), (setName = result[1]);
  161. }
  162. }
  163. if (topologyType === TopologyType.ReplicaSetWithPrimary) {
  164. if ([ServerType.Standalone, ServerType.Mongos].indexOf(serverType) >= 0) {
  165. serverDescriptions.delete(address);
  166. topologyType = checkHasPrimary(serverDescriptions);
  167. } else if (serverType === ServerType.RSPrimary) {
  168. const result = updateRsFromPrimary(
  169. serverDescriptions,
  170. setName,
  171. serverDescription,
  172. maxSetVersion,
  173. maxElectionId
  174. );
  175. (topologyType = result[0]),
  176. (setName = result[1]),
  177. (maxSetVersion = result[2]),
  178. (maxElectionId = result[3]);
  179. } else if (
  180. [ServerType.RSSecondary, ServerType.RSArbiter, ServerType.RSOther].indexOf(serverType) >= 0
  181. ) {
  182. topologyType = updateRsWithPrimaryFromMember(
  183. serverDescriptions,
  184. setName,
  185. serverDescription
  186. );
  187. } else {
  188. topologyType = checkHasPrimary(serverDescriptions);
  189. }
  190. }
  191. return new TopologyDescription(
  192. topologyType,
  193. serverDescriptions,
  194. setName,
  195. maxSetVersion,
  196. maxElectionId,
  197. commonWireVersion,
  198. this.options,
  199. error
  200. );
  201. }
  202. /**
  203. * Determines if the topology has a readable server available. See the table in the
  204. * following section for behaviour rules.
  205. *
  206. * @param {ReadPreference} [readPreference] An optional read preference for determining if a readable server is present
  207. * @return {Boolean} Whether there is a readable server in this topology
  208. */
  209. hasReadableServer(/* readPreference */) {
  210. // To be implemented when server selection is implemented
  211. }
  212. /**
  213. * Determines if the topology has a writable server available. See the table in the
  214. * following section for behaviour rules.
  215. *
  216. * @return {Boolean} Whether there is a writable server in this topology
  217. */
  218. hasWritableServer() {
  219. return this.hasReadableServer(ReadPreference.primary);
  220. }
  221. /**
  222. * Determines if the topology has a definition for the provided address
  223. *
  224. * @param {String} address
  225. * @return {Boolean} Whether the topology knows about this server
  226. */
  227. hasServer(address) {
  228. return this.servers.has(address);
  229. }
  230. }
  231. function topologyTypeForServerType(serverType) {
  232. if (serverType === ServerType.Mongos) return TopologyType.Sharded;
  233. if (serverType === ServerType.RSPrimary) return TopologyType.ReplicaSetWithPrimary;
  234. return TopologyType.ReplicaSetNoPrimary;
  235. }
  236. function updateRsFromPrimary(
  237. serverDescriptions,
  238. setName,
  239. serverDescription,
  240. maxSetVersion,
  241. maxElectionId
  242. ) {
  243. setName = setName || serverDescription.setName;
  244. if (setName !== serverDescription.setName) {
  245. serverDescriptions.delete(serverDescription.address);
  246. return [checkHasPrimary(serverDescriptions), setName, maxSetVersion, maxElectionId];
  247. }
  248. const electionIdOID = serverDescription.electionId ? serverDescription.electionId.$oid : null;
  249. const maxElectionIdOID = maxElectionId ? maxElectionId.$oid : null;
  250. if (serverDescription.setVersion != null && electionIdOID != null) {
  251. if (maxSetVersion != null && maxElectionIdOID != null) {
  252. if (maxSetVersion > serverDescription.setVersion || maxElectionIdOID > electionIdOID) {
  253. // this primary is stale, we must remove it
  254. serverDescriptions.set(
  255. serverDescription.address,
  256. new ServerDescription(serverDescription.address)
  257. );
  258. return [checkHasPrimary(serverDescriptions), setName, maxSetVersion, maxElectionId];
  259. }
  260. }
  261. maxElectionId = serverDescription.electionId;
  262. }
  263. if (
  264. serverDescription.setVersion != null &&
  265. (maxSetVersion == null || serverDescription.setVersion > maxSetVersion)
  266. ) {
  267. maxSetVersion = serverDescription.setVersion;
  268. }
  269. // We've heard from the primary. Is it the same primary as before?
  270. for (const address of serverDescriptions.keys()) {
  271. const server = serverDescriptions.get(address);
  272. if (server.type === ServerType.RSPrimary && server.address !== serverDescription.address) {
  273. // Reset old primary's type to Unknown.
  274. serverDescriptions.set(address, new ServerDescription(server.address));
  275. // There can only be one primary
  276. break;
  277. }
  278. }
  279. // Discover new hosts from this primary's response.
  280. serverDescription.allHosts.forEach(address => {
  281. if (!serverDescriptions.has(address)) {
  282. serverDescriptions.set(address, new ServerDescription(address));
  283. }
  284. });
  285. // Remove hosts not in the response.
  286. const currentAddresses = Array.from(serverDescriptions.keys());
  287. const responseAddresses = serverDescription.allHosts;
  288. currentAddresses.filter(addr => responseAddresses.indexOf(addr) === -1).forEach(address => {
  289. serverDescriptions.delete(address);
  290. });
  291. return [checkHasPrimary(serverDescriptions), setName, maxSetVersion, maxElectionId];
  292. }
  293. function updateRsWithPrimaryFromMember(serverDescriptions, setName, serverDescription) {
  294. if (setName == null) {
  295. throw new TypeError('setName is required');
  296. }
  297. if (
  298. setName !== serverDescription.setName ||
  299. (serverDescription.me && serverDescription.address !== serverDescription.me)
  300. ) {
  301. serverDescriptions.delete(serverDescription.address);
  302. }
  303. return checkHasPrimary(serverDescriptions);
  304. }
  305. function updateRsNoPrimaryFromMember(serverDescriptions, setName, serverDescription) {
  306. let topologyType = TopologyType.ReplicaSetNoPrimary;
  307. setName = setName || serverDescription.setName;
  308. if (setName !== serverDescription.setName) {
  309. serverDescriptions.delete(serverDescription.address);
  310. return [topologyType, setName];
  311. }
  312. serverDescription.allHosts.forEach(address => {
  313. if (!serverDescriptions.has(address)) {
  314. serverDescriptions.set(address, new ServerDescription(address));
  315. }
  316. });
  317. if (serverDescription.me && serverDescription.address !== serverDescription.me) {
  318. serverDescriptions.delete(serverDescription.address);
  319. }
  320. return [topologyType, setName];
  321. }
  322. function checkHasPrimary(serverDescriptions) {
  323. for (const addr of serverDescriptions.keys()) {
  324. if (serverDescriptions.get(addr).type === ServerType.RSPrimary) {
  325. return TopologyType.ReplicaSetWithPrimary;
  326. }
  327. }
  328. return TopologyType.ReplicaSetNoPrimary;
  329. }
  330. module.exports = {
  331. TopologyType,
  332. TopologyDescription
  333. };