'use strict'; const Query = require('../connection/commands').Query; const Msg = require('../connection/msg').Msg; const MongoError = require('../error').MongoError; const getReadPreference = require('./shared').getReadPreference; const isSharded = require('./shared').isSharded; const databaseNamespace = require('./shared').databaseNamespace; const isTransactionCommand = require('../transactions').isTransactionCommand; const applySession = require('../sessions').applySession; function command(server, ns, cmd, options, callback) { if (typeof options === 'function') (callback = options), (options = {}); options = options || {}; if (cmd == null) { return callback(new MongoError(`command ${JSON.stringify(cmd)} does not return a cursor`)); } const bson = server.s.bson; const pool = server.s.pool; const readPreference = getReadPreference(cmd, options); const shouldUseOpMsg = supportsOpMsg(server); const session = options.session; let clusterTime = server.clusterTime; let finalCmd = Object.assign({}, cmd); if (hasSessionSupport(server) && session) { if ( session.clusterTime && session.clusterTime.clusterTime.greaterThan(clusterTime.clusterTime) ) { clusterTime = session.clusterTime; } const err = applySession(session, finalCmd, options); if (err) { return callback(err); } } // if we have a known cluster time, gossip it if (clusterTime) { finalCmd.$clusterTime = clusterTime; } if ( isSharded(server) && !shouldUseOpMsg && readPreference && readPreference.preference !== 'primary' ) { finalCmd = { $query: finalCmd, $readPreference: readPreference.toJSON() }; } const commandOptions = Object.assign( { command: true, numberToSkip: 0, numberToReturn: -1, checkKeys: false }, options ); // This value is not overridable commandOptions.slaveOk = readPreference.slaveOk(); const cmdNs = `${databaseNamespace(ns)}.$cmd`; const message = shouldUseOpMsg ? new Msg(bson, cmdNs, finalCmd, commandOptions) : new Query(bson, cmdNs, finalCmd, commandOptions); const inTransaction = session && (session.inTransaction() || isTransactionCommand(finalCmd)); const commandResponseHandler = inTransaction ? function(err) { if ( !cmd.commitTransaction && err && err instanceof MongoError && err.hasErrorLabel('TransientTransactionError') ) { session.transaction.unpinServer(); } return callback.apply(null, arguments); } : callback; try { pool.write(message, commandOptions, commandResponseHandler); } catch (err) { commandResponseHandler(err); } } function hasSessionSupport(topology) { if (topology == null) return false; if (topology.description) { return topology.description.maxWireVersion >= 6; } return topology.ismaster == null ? false : topology.ismaster.maxWireVersion >= 6; } function supportsOpMsg(topologyOrServer) { const description = topologyOrServer.ismaster ? topologyOrServer.ismaster : topologyOrServer.description; if (description == null) { return false; } return description.maxWireVersion >= 6 && description.__nodejs_mock_server__ == null; } module.exports = command;