2019-02-01 14:06:44 +01:00
'use strict' ;
const EventEmitter = require ( 'events' ) ;
const isResumableError = require ( './error' ) . isResumableError ;
const MongoError = require ( 'mongodb-core' ) . MongoError ;
var cursorOptionNames = [ 'maxAwaitTimeMS' , 'collation' , 'readPreference' ] ;
const CHANGE _DOMAIN _TYPES = {
COLLECTION : Symbol ( 'Collection' ) ,
DATABASE : Symbol ( 'Database' ) ,
CLUSTER : Symbol ( 'Cluster' )
} ;
/ * *
* Creates a new Change Stream instance . Normally created using { @ link Collection # watch | Collection . watch ( ) } .
* @ class ChangeStream
* @ since 3.0 . 0
* @ param { ( MongoClient | Db | Collection ) } changeDomain The domain against which to create the change stream
* @ param { Array } pipeline An array of { @ link https : //docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
* @ param { object } [ options ] Optional settings
* @ param { string } [ options . fullDocument = 'default' ] Allowed values : ‘ default ’ , ‘ updateLookup ’ . When set to ‘ updateLookup ’ , the change stream will include both a delta describing the changes to the document , as well as a copy of the entire document that was changed from some time after the change occurred .
* @ param { number } [ options . maxAwaitTimeMS ] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
* @ param { object } [ options . resumeAfter ] Specifies the logical starting point for the new change stream . This should be the _id field from a previously returned change stream document .
* @ param { number } [ options . batchSize ] The number of documents to return per batch . See { @ link https : //docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
* @ param { object } [ options . collation ] Specify collation settings for operation . See { @ link https : //docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
* @ param { ReadPreference } [ options . readPreference ] The read preference . Defaults to the read preference of the database or collection . See { @ link https : //docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
* @ fires ChangeStream # close
* @ fires ChangeStream # change
* @ fires ChangeStream # end
* @ fires ChangeStream # error
* @ return { ChangeStream } a ChangeStream instance .
* /
class ChangeStream extends EventEmitter {
constructor ( changeDomain , pipeline , options ) {
super ( ) ;
const Collection = require ( './collection' ) ;
const Db = require ( './db' ) ;
const MongoClient = require ( './mongo_client' ) ;
this . pipeline = pipeline || [ ] ;
this . options = options || { } ;
this . cursorNamespace = undefined ;
this . namespace = { } ;
if ( changeDomain instanceof Collection ) {
this . type = CHANGE _DOMAIN _TYPES . COLLECTION ;
this . topology = changeDomain . s . db . serverConfig ;
this . namespace = {
collection : changeDomain . collectionName ,
database : changeDomain . s . db . databaseName
} ;
this . cursorNamespace = ` ${ this . namespace . database } . ${ this . namespace . collection } ` ;
} else if ( changeDomain instanceof Db ) {
this . type = CHANGE _DOMAIN _TYPES . DATABASE ;
this . namespace = { collection : '' , database : changeDomain . databaseName } ;
this . cursorNamespace = this . namespace . database ;
this . topology = changeDomain . serverConfig ;
} else if ( changeDomain instanceof MongoClient ) {
this . type = CHANGE _DOMAIN _TYPES . CLUSTER ;
this . namespace = { collection : '' , database : 'admin' } ;
this . cursorNamespace = this . namespace . database ;
this . topology = changeDomain . topology ;
} else {
throw new TypeError (
'changeDomain provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
) ;
}
this . promiseLibrary = changeDomain . s . promiseLibrary ;
if ( ! this . options . readPreference && changeDomain . s . readPreference ) {
this . options . readPreference = changeDomain . s . readPreference ;
}
// We need to get the operationTime as early as possible
const isMaster = this . topology . lastIsMaster ( ) ;
if ( ! isMaster ) {
throw new MongoError ( 'Topology does not have an ismaster yet.' ) ;
}
this . operationTime = isMaster . operationTime ;
// Create contained Change Stream cursor
this . cursor = createChangeStreamCursor ( this ) ;
// Listen for any `change` listeners being added to ChangeStream
this . on ( 'newListener' , eventName => {
if ( eventName === 'change' && this . cursor && this . listenerCount ( 'change' ) === 0 ) {
this . cursor . on ( 'data' , change =>
processNewChange ( { changeStream : this , change , eventEmitter : true } )
) ;
}
} ) ;
// Listen for all `change` listeners being removed from ChangeStream
this . on ( 'removeListener' , eventName => {
if ( eventName === 'change' && this . listenerCount ( 'change' ) === 0 && this . cursor ) {
this . cursor . removeAllListeners ( 'data' ) ;
}
} ) ;
}
/ * *
* Check if there is any document still available in the Change Stream
* @ function ChangeStream . prototype . hasNext
* @ param { ChangeStream ~ resultCallback } [ callback ] The result callback .
* @ throws { MongoError }
* @ return { Promise } returns Promise if no callback passed
* /
hasNext ( callback ) {
return this . cursor . hasNext ( callback ) ;
}
/ * *
* Get the next available document from the Change Stream , returns null if no more documents are available .
* @ function ChangeStream . prototype . next
* @ param { ChangeStream ~ resultCallback } [ callback ] The result callback .
* @ throws { MongoError }
* @ return { Promise } returns Promise if no callback passed
* /
next ( callback ) {
var self = this ;
if ( this . isClosed ( ) ) {
if ( callback ) return callback ( new Error ( 'Change Stream is not open.' ) , null ) ;
return self . promiseLibrary . reject ( new Error ( 'Change Stream is not open.' ) ) ;
}
return this . cursor
. next ( )
. then ( change => processNewChange ( { changeStream : self , change , callback } ) )
. catch ( error => processNewChange ( { changeStream : self , error , callback } ) ) ;
}
/ * *
* Is the cursor closed
* @ method ChangeStream . prototype . isClosed
* @ return { boolean }
* /
isClosed ( ) {
if ( this . cursor ) {
return this . cursor . isClosed ( ) ;
}
return true ;
}
/ * *
* Close the Change Stream
* @ method ChangeStream . prototype . close
* @ param { ChangeStream ~ resultCallback } [ callback ] The result callback .
* @ return { Promise } returns Promise if no callback passed
* /
close ( callback ) {
if ( ! this . cursor ) {
if ( callback ) return callback ( ) ;
return this . promiseLibrary . resolve ( ) ;
}
// Tidy up the existing cursor
var cursor = this . cursor ;
delete this . cursor ;
return cursor . close ( callback ) ;
}
/ * *
* This method pulls all the data out of a readable stream , and writes it to the supplied destination , automatically managing the flow so that the destination is not overwhelmed by a fast readable stream .
* @ method
* @ param { Writable } destination The destination for writing data
* @ param { object } [ options ] { @ link https : //nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options}
* @ return { null }
* /
pipe ( destination , options ) {
if ( ! this . pipeDestinations ) {
this . pipeDestinations = [ ] ;
}
this . pipeDestinations . push ( destination ) ;
return this . cursor . pipe ( destination , options ) ;
}
/ * *
* This method will remove the hooks set up for a previous pipe ( ) call .
* @ param { Writable } [ destination ] The destination for writing data
* @ return { null }
* /
unpipe ( destination ) {
if ( this . pipeDestinations && this . pipeDestinations . indexOf ( destination ) > - 1 ) {
this . pipeDestinations . splice ( this . pipeDestinations . indexOf ( destination ) , 1 ) ;
}
return this . cursor . unpipe ( destination ) ;
}
/ * *
* Return a modified Readable stream including a possible transform method .
* @ method
* @ param { object } [ options ] Optional settings .
* @ param { function } [ options . transform ] A transformation method applied to each document emitted by the stream .
* @ return { Cursor }
* /
stream ( options ) {
this . streamOptions = options ;
return this . cursor . stream ( options ) ;
}
/ * *
* This method will cause a stream in flowing mode to stop emitting data events . Any data that becomes available will remain in the internal buffer .
* @ return { null }
* /
pause ( ) {
return this . cursor . pause ( ) ;
}
/ * *
* This method will cause the readable stream to resume emitting data events .
* @ return { null }
* /
resume ( ) {
return this . cursor . resume ( ) ;
}
}
// Create a new change stream cursor based on self's configuration
var createChangeStreamCursor = function ( self ) {
if ( self . resumeToken ) {
self . options . resumeAfter = self . resumeToken ;
}
var changeStreamCursor = buildChangeStreamAggregationCommand ( self ) ;
/ * *
* Fired for each new matching change in the specified namespace . Attaching a ` change `
* event listener to a Change Stream will switch the stream into flowing mode . Data will
* then be passed as soon as it is available .
*
* @ event ChangeStream # change
* @ type { object }
* /
if ( self . listenerCount ( 'change' ) > 0 ) {
changeStreamCursor . on ( 'data' , function ( change ) {
processNewChange ( { changeStream : self , change , eventEmitter : true } ) ;
} ) ;
}
/ * *
* Change stream close event
*
* @ event ChangeStream # close
* @ type { null }
* /
changeStreamCursor . on ( 'close' , function ( ) {
self . emit ( 'close' ) ;
} ) ;
/ * *
* Change stream end event
*
* @ event ChangeStream # end
* @ type { null }
* /
changeStreamCursor . on ( 'end' , function ( ) {
self . emit ( 'end' ) ;
} ) ;
/ * *
* Fired when the stream encounters an error .
*
* @ event ChangeStream # error
* @ type { Error }
* /
changeStreamCursor . on ( 'error' , function ( error ) {
processNewChange ( { changeStream : self , error , eventEmitter : true } ) ;
} ) ;
if ( self . pipeDestinations ) {
const cursorStream = changeStreamCursor . stream ( self . streamOptions ) ;
for ( let pipeDestination in self . pipeDestinations ) {
cursorStream . pipe ( pipeDestination ) ;
}
}
return changeStreamCursor ;
} ;
function getResumeToken ( self ) {
return self . resumeToken || self . options . resumeAfter ;
}
function getStartAtOperationTime ( self ) {
const isMaster = self . topology . lastIsMaster ( ) || { } ;
return (
isMaster . maxWireVersion && isMaster . maxWireVersion >= 7 && self . options . startAtOperationTime
) ;
}
var buildChangeStreamAggregationCommand = function ( self ) {
const topology = self . topology ;
const namespace = self . namespace ;
const pipeline = self . pipeline ;
const options = self . options ;
const cursorNamespace = self . cursorNamespace ;
var changeStreamStageOptions = {
fullDocument : options . fullDocument || 'default'
} ;
const resumeToken = getResumeToken ( self ) ;
const startAtOperationTime = getStartAtOperationTime ( self ) ;
if ( resumeToken ) {
changeStreamStageOptions . resumeAfter = resumeToken ;
}
if ( startAtOperationTime ) {
changeStreamStageOptions . startAtOperationTime = startAtOperationTime ;
}
// Map cursor options
var cursorOptions = { } ;
cursorOptionNames . forEach ( function ( optionName ) {
if ( options [ optionName ] ) {
cursorOptions [ optionName ] = options [ optionName ] ;
}
} ) ;
if ( self . type === CHANGE _DOMAIN _TYPES . CLUSTER ) {
changeStreamStageOptions . allChangesForCluster = true ;
}
var changeStreamPipeline = [ { $changeStream : changeStreamStageOptions } ] ;
changeStreamPipeline = changeStreamPipeline . concat ( pipeline ) ;
var command = {
aggregate : self . type === CHANGE _DOMAIN _TYPES . COLLECTION ? namespace . collection : 1 ,
pipeline : changeStreamPipeline ,
readConcern : { level : 'majority' } ,
cursor : {
batchSize : options . batchSize || 1
}
} ;
// Create and return the cursor
return topology . cursor ( cursorNamespace , command , cursorOptions ) ;
} ;
// This method performs a basic server selection loop, satisfying the requirements of
// ChangeStream resumability until the new SDAM layer can be used.
const SELECTION _TIMEOUT = 30000 ;
function waitForTopologyConnected ( topology , options , callback ) {
setTimeout ( ( ) => {
if ( options && options . start == null ) options . start = process . hrtime ( ) ;
const start = options . start || process . hrtime ( ) ;
const timeout = options . timeout || SELECTION _TIMEOUT ;
const readPreference = options . readPreference ;
if ( topology . isConnected ( { readPreference } ) ) return callback ( null , null ) ;
const hrElapsed = process . hrtime ( start ) ;
const elapsed = ( hrElapsed [ 0 ] * 1e9 + hrElapsed [ 1 ] ) / 1e6 ;
if ( elapsed > timeout ) return callback ( new MongoError ( 'Timed out waiting for connection' ) ) ;
waitForTopologyConnected ( topology , options , callback ) ;
} , 3000 ) ; // this is an arbitrary wait time to allow SDAM to transition
}
// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
function processNewChange ( args ) {
const changeStream = args . changeStream ;
const error = args . error ;
const change = args . change ;
const callback = args . callback ;
const eventEmitter = args . eventEmitter || false ;
2019-07-02 16:05:15 +02:00
// If the changeStream is closed, then it should not process a change.
if ( changeStream . isClosed ( ) ) {
// We do not error in the eventEmitter case.
if ( eventEmitter ) {
return ;
}
const error = new MongoError ( 'ChangeStream is closed' ) ;
return typeof callback === 'function'
? callback ( error , null )
: changeStream . promiseLibrary . reject ( error ) ;
}
2019-02-01 14:06:44 +01:00
const topology = changeStream . topology ;
const options = changeStream . cursor . options ;
if ( error ) {
if ( isResumableError ( error ) && ! changeStream . attemptingResume ) {
changeStream . attemptingResume = true ;
if ( ! ( getResumeToken ( changeStream ) || getStartAtOperationTime ( changeStream ) ) ) {
const startAtOperationTime = changeStream . cursor . cursorState . operationTime ;
changeStream . options = Object . assign ( { startAtOperationTime } , changeStream . options ) ;
}
// stop listening to all events from old cursor
[ 'data' , 'close' , 'end' , 'error' ] . forEach ( event =>
changeStream . cursor . removeAllListeners ( event )
) ;
// close internal cursor, ignore errors
changeStream . cursor . close ( ) ;
// attempt recreating the cursor
if ( eventEmitter ) {
waitForTopologyConnected ( topology , { readPreference : options . readPreference } , err => {
if ( err ) return changeStream . emit ( 'error' , err ) ;
changeStream . cursor = createChangeStreamCursor ( changeStream ) ;
} ) ;
return ;
}
if ( callback ) {
waitForTopologyConnected ( topology , { readPreference : options . readPreference } , err => {
if ( err ) return callback ( err , null ) ;
changeStream . cursor = createChangeStreamCursor ( changeStream ) ;
changeStream . next ( callback ) ;
} ) ;
return ;
}
return new Promise ( ( resolve , reject ) => {
waitForTopologyConnected ( topology , { readPreference : options . readPreference } , err => {
if ( err ) return reject ( err ) ;
resolve ( ) ;
} ) ;
} )
. then ( ( ) => ( changeStream . cursor = createChangeStreamCursor ( changeStream ) ) )
. then ( ( ) => changeStream . next ( ) ) ;
}
if ( eventEmitter ) return changeStream . emit ( 'error' , error ) ;
if ( typeof callback === 'function' ) return callback ( error , null ) ;
return changeStream . promiseLibrary . reject ( error ) ;
}
changeStream . attemptingResume = false ;
// Cache the resume token if it is present. If it is not present return an error.
if ( ! change || ! change . _id ) {
var noResumeTokenError = new Error (
'A change stream document has been received that lacks a resume token (_id).'
) ;
if ( eventEmitter ) return changeStream . emit ( 'error' , noResumeTokenError ) ;
if ( typeof callback === 'function' ) return callback ( noResumeTokenError , null ) ;
return changeStream . promiseLibrary . reject ( noResumeTokenError ) ;
}
changeStream . resumeToken = change . _id ;
// Return the change
if ( eventEmitter ) return changeStream . emit ( 'change' , change ) ;
if ( typeof callback === 'function' ) return callback ( error , change ) ;
return changeStream . promiseLibrary . resolve ( change ) ;
}
/ * *
* The callback format for results
* @ callback ChangeStream ~ resultCallback
* @ param { MongoError } error An error instance representing the error during the execution .
* @ param { ( object | null ) } result The result object if the command was executed successfully .
* /
module . exports = ChangeStream ;