Ohm-Management - Projektarbeit B-ME

# vasync: observable asynchronous control flow

This module provides several functions for asynchronous control flow. There are
many modules that do this already (notably async.js). This one's claim to fame
is improved debuggability.

## Observability is important

Working with Node's asynchronous, callback-based model is much easier with a
handful of simple control-flow abstractions, like:

* waterfalls and pipelines (which invoke a list of asynchronous callbacks
  sequentially)
* parallel pipelines (which invoke a list of asynchronous callbacks in parallel
  and invoke a top-level callback when the last one completes)
* queues
* barriers

But these structures also introduce new types of programming errors: failing to
invoke the callback can cause the program to hang, and inadvertently invoking it
twice can cause all kinds of mayhem that's very difficult to debug.
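
The second failure mode can at least be made loud. The sketch below uses a hypothetical helper, `onceStrict`, which is not part of vasync's documented API: it wraps a callback so that a second invocation throws immediately instead of silently running the continuation again. It does not catch the "never called" case; that is what the status objects described next help with.

```javascript
// Hypothetical helper (not a vasync export): wrap `callback` so that calling
// it more than once throws instead of re-running the continuation.
function onceStrict(name, callback) {
    var called = false;

    return (function () {
        if (called)
            throw (new Error('callback for "' + name + '" invoked twice'));
        called = true;
        return (callback.apply(this, arguments));
    });
}

// An operation with a bug: it reports completion twice.
function buggyOperation(callback) {
    callback(null, 'first');
    callback(null, 'second');
}

var results = [];
var caught = null;
try {
    buggyOperation(onceStrict('buggyOperation', function (err, value) {
        results.push(value);
    }));
} catch (e) {
    caught = e;     // the duplicate call surfaces here, at the point of the bug
}
```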

The functions in this module keep track of what's going on so that you can
figure out what happened when your program goes wrong. They generally return an
object describing details of the current state. If your program goes wrong, you
have several ways of getting at this state:

* On illumos-based systems, use MDB to [find the status object](http://dtrace.org/blogs/bmc/2012/05/05/debugging-node-js-memory-leaks/)
  and then [print it out](http://dtrace.org/blogs/dap/2012/01/13/playing-with-nodev8-postmortem-debugging/).
* Provide an HTTP API (or AMQP, or whatever) that returns these pending status
  objects as JSON (see [kang](https://github.com/davepacheco/kang)).
* Incorporate a REPL into your program and print out the status object.
* Use the Node debugger to print out the status object.

## Functions

* [parallel](#parallel-invoke-n-functions-in-parallel): invoke N functions in
  parallel (and merge the results)
* [forEachParallel](#foreachparallel-invoke-the-same-function-on-n-inputs-in-parallel):
  invoke the same function on N inputs in parallel
* [pipeline](#pipeline-invoke-n-functions-in-series-and-stop-on-failure): invoke
  N functions in series (and stop on failure)
* [forEachPipeline](#foreachpipeline-invoke-the-same-function-on-n-inputs-in-series-and-stop-on-failure):
  invoke the same function on N inputs in series (and stop on failure)
* [waterfall](#waterfall-invoke-n-functions-in-series-stop-on-failure-and-propagate-results):
  like pipeline, but propagating results between stages
* [barrier](#barrier-coordinate-multiple-concurrent-operations): coordinate
  multiple concurrent operations
* [queue/queuev](#queuequeuev-fixed-size-worker-queue): fixed-size worker queue

### parallel: invoke N functions in parallel

Synopsis: `parallel(args, callback)`

This function takes a list of input functions (specified by the "funcs" property
of "args") and runs them all. These input functions are expected to be
asynchronous: they get a "callback" argument and should invoke it as
`callback(err, result)`. The error and result will be saved and made available
to the original caller when all of these functions complete.

This function returns the same "result" object it passes to the callback, and
you can use the fields in this object to debug or observe progress:

* `operations`: array corresponding to the input functions, with
    * `func`: input function,
    * `funcname`: the input function's name,
    * `status`: "pending", "ok", or "fail",
    * `err`: returned "err" value, if any, and
    * `result`: returned "result" value, if any
* `successes`: "result" field for each of "operations" where
  "status" == "ok" (in no particular order)
* `ndone`: number of input operations that have completed
* `nerrors`: number of input operations that have failed

This status object lets you see in a debugger exactly which functions have
completed, what they returned, and which ones are outstanding.
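
To make that bookkeeping concrete, here is a minimal, self-contained sketch of a parallel combinator that maintains a status object with the field names listed above. It is an illustration under simplifying assumptions, not vasync's implementation: the name `parallelSketch` is hypothetical, there is no argument validation, and it reports only the first failure instead of combining all errors.

```javascript
// Hypothetical sketch: run every function at once and record progress in a
// status object shaped like the one described above.
function parallelSketch(funcs, callback) {
    var status = { operations: [], successes: [], ndone: 0, nerrors: 0 };

    // Record every operation before starting any, so the status object is
    // complete even if a function invokes its callback synchronously.
    funcs.forEach(function (func) {
        status.operations.push({
            func: func,
            funcname: func.name || '(anon)',
            status: 'pending'
        });
    });

    if (funcs.length === 0) {
        callback(null, status);
        return (status);
    }

    status.operations.forEach(function (op) {
        var finished = false;

        op.func(function (err, result) {
            if (finished)
                throw (new Error(op.funcname + ' invoked its callback twice'));
            finished = true;

            op.err = err;
            op.result = result;
            if (err) {
                op.status = 'fail';
                status.nerrors++;
            } else {
                op.status = 'ok';
                status.successes.push(result);   // completion order
            }

            if (++status.ndone === status.operations.length) {
                // Simplification: report only the first failure (in input
                // order) rather than merging all failures into one error.
                var failed = status.operations.filter(function (o) {
                    return (o.status === 'fail');
                });
                callback(failed.length > 0 ? failed[0].err : null, status);
            }
        });
    });

    return (status);
}
```

Because the status object is returned immediately and updated in place, a debugger or REPL holding a reference to it sees each operation move from "pending" to "ok" or "fail" as it finishes.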

All errors are combined into a single "err" parameter to the final callback (see
below).

Example usage:

```js
var mod_dns = require('dns');
var mod_util = require('util');
var mod_vasync = require('vasync');

var status = mod_vasync.parallel({
    'funcs': [
        function f1 (callback) { mod_dns.resolve('joyent.com', callback); },
        function f2 (callback) { mod_dns.resolve('github.com', callback); },
        function f3 (callback) { mod_dns.resolve('asdfaqsdfj.com', callback); }
    ]
}, function (err, results) {
    if (err)
        console.log('error: %s', err.message);
    console.log('results: %s', mod_util.inspect(results, null, 3));
});

// The status object is returned immediately and updated as operations finish.
console.log('status: %s', mod_util.inspect(status, null, 3));
```

In the first tick, this outputs:

```js
status: { operations:
   [ { func: [Function: f1], status: 'pending' },
     { func: [Function: f2], status: 'pending' },
     { func: [Function: f3], status: 'pending' } ],
  successes: [],
  ndone: 0,
  nerrors: 0 }
```

showing that there are three operations pending and none has completed yet.
When the program finishes, it outputs this error:

```
error: first of 1 error: queryA ENOTFOUND
```

which encapsulates all of the intermediate failures. This model allows you to
write the final callback like you normally would:

```js
if (err)
    return (callback(err));
```

and still propagate useful information to callers that don't deal with multiple
errors (i.e., most callers).
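
The combined message has the shape "first of N error(s): <first message>". The following is a hypothetical stand-in (`combineErrors` is not a vasync function, and vasync's own error type is not shown here) that produces an error of that shape from plain `Error` objects while keeping every individual failure on an `errors` property. A final callback built on it can still use the usual `if (err) return (callback(err));` check.

```javascript
// Hypothetical helper: fold the errors from several operations into one Error
// whose message follows the "first of N error(s): ..." format shown above.
function combineErrors(errors) {
    if (errors.length === 0)
        return (null);      // no failures: callers see a null "err"

    var err = new Error('first of ' + errors.length + ' error' +
        (errors.length === 1 ? '' : 's') + ': ' + errors[0].message);

    // Keep every failure for the few callers that want to inspect them all.
    err.errors = errors.slice();
    return (err);
}
```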

The example also prints out the detailed final status, including all of the
errors and return values:

```js
results: { operations:
   [ { func: [Function: f1],
       funcname: 'f1',
       status: 'ok',
       err: null,
       result: [ '165.225.132.33' ] },
     { func: [Function: f2],
       funcname: 'f2',
       status: 'ok',
       err: null,
       result: [ '207.97.227.239' ] },
     { func: [Function: f3],
       funcname: 'f3',
       status: 'fail',
       err: { [Error: queryA ENOTFOUND] code: 'ENOTFOUND',
         errno: 'ENOTFOUND', syscall: 'queryA' },
       result: undefined } ],
  successes: [ [ '165.225.132.33' ], [ '207.97.227.239' ] ],
  ndone: 3,
  nerrors: 1 }
```

You can use this if you want to handle all of the errors individually or to get
at all of the individual return values.

Note that "successes" is provided as a convenience and the order of items in
that array may not correspond to the order of the inputs. To consume output in
an ordered manner, you should iterate over "operations" and pick out the result
from each item.
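
A minimal sketch of that ordered traversal, assuming the status object has the shape printed above. The helper name `orderedResults` is hypothetical, and the sample status object is made up (with "successes" deliberately in a different order) purely to show the difference.

```javascript
// Hypothetical helper: collect results in input order by walking
// "operations" (input order) instead of "successes" (completion order).
// Failed and still-pending operations map to `undefined`.
function orderedResults(status) {
    return (status.operations.map(function (op) {
        return (op.status === 'ok' ? op.result : undefined);
    }));
}

// Made-up status object in which f2 happened to finish before f1.
var exampleStatus = {
    operations: [
        { funcname: 'f1', status: 'ok', err: null, result: [ '165.225.132.33' ] },
        { funcname: 'f2', status: 'ok', err: null, result: [ '207.97.227.239' ] },
        { funcname: 'f3', status: 'fail', err: new Error('queryA ENOTFOUND'),
          result: undefined }
    ],
    successes: [ [ '207.97.227.239' ], [ '165.225.132.33' ] ],
    ndone: 3,
    nerrors: 1
};

var ordered = orderedResults(exampleStatus);
```

Here `ordered[0]` always belongs to f1, whatever order the lookups finished in.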

### forEachParallel: invoke the same function on N inputs in parallel

Synopsis: `forEachParallel(args, callback)`

This function is exactly like `parallel`, except that the input is specified as
a *single* function ("func") and a list of inputs ("inputs"). The function is
invoked on each input in parallel.

This example does the same work as the `parallel` example above:

```js
var mod_dns = require('dns');
var mod_util = require('util');
var mod_vasync = require('vasync');

console.log(mod_vasync.forEachParallel({
    'func': mod_dns.resolve,
    'inputs': [ 'joyent.com', 'github.com', 'asdfaqsdfj.com' ]
}, function (err, results) {
    if (err)
        console.log('error: %s', err.message);
    console.log('results: %s', mod_util.inspect(results, null, 3));
}));
```
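
One way to picture the equivalence is to wrap each input in a closure that calls "func" with it, which yields a "funcs" array of the shape `parallel` accepts. This is a sketch of that idea only; the helper name `funcsForInputs` is hypothetical, and vasync is not necessarily implemented this way.

```javascript
// Hypothetical helper: turn one function plus N inputs into N functions of
// the form `function (callback)`, i.e., the "funcs" shape `parallel` takes.
function funcsForInputs(func, inputs) {
    return (inputs.map(function (input) {
        return (function (callback) {
            func(input, callback);
        });
    }));
}

// Synchronous stand-in for an asynchronous function such as dns.resolve.
function double(n, callback) {
    callback(null, n * 2);
}

var collected = [];
funcsForInputs(double, [ 1, 2, 3 ]).forEach(function (f) {
    f(function (err, result) {
        collected.push(result);
    });
});
```

Passing `funcsForInputs(mod_dns.resolve, inputs)` as the "funcs" property of `parallel` should then perform the same lookups as the `forEachParallel` call above, except that the generated functions are anonymous, so "funcname" would not identify them.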

### pipeline: invoke N functions in series (and stop on failure)

Synopsis: `pipeline(args, callback)`

The named arguments (that go inside `args`) are:

* `funcs`: input functions, to be invoked in series
* `arg`: arbitrary argument that will be passed to each function

The functions are invoked in order as `func(arg, callback)`, where "arg" is the
user-supplied argument from "args" and "callback" should be invoked in the usual
way. If any function emits an error, the whole pipeline stops.

The return value and the arguments to the final callback are exactly the same as
for `parallel`. The error object for the final callback is just the error
returned by whatever pipeline function failed (if any).

This example is similar to the `parallel` example above, except that it runs the
steps in sequence and stops early because `pipeline` stops on the first error:

```js
var mod_fs = require('fs');
var mod_util = require('util');
var mod_vasync = require('vasync');

console.log(mod_vasync.pipeline({
    'funcs': [
        function f1 (_, callback) { mod_fs.stat('/tmp', callback); },
        function f2 (_, callback) { mod_fs.stat('/noexist', callback); },
        function f3 (_, callback) { mod_fs.stat('/var', callback); }
    ]
}, function (err, results) {
    if (err)
        console.log('error: %s', err.message);
    console.log('results: %s', mod_util.inspect(results, null, 3));
}));
```

Because the steps run one at a time, the status after the first tick looks like
this:

```js
{ operations:
   [ { func: [Function: f1], status: 'pending' },
     { func: [Function: f2], status: 'waiting' },
     { func: [Function: f3], status: 'waiting' } ],
  successes: [],
  ndone: 0,
  nerrors: 0 }
```

Note that the second and third stages are "waiting" (not yet started) rather
than "pending" as in the `parallel` case. The error and complete result look
just like those in the `parallel` case.
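
The following sketch mirrors the behavior described in this section: stages start as "waiting", become "pending" while they run, and the pipeline stops at the first failure, leaving later stages "waiting". The name `pipelineSketch` and its positional signature are hypothetical (vasync's `pipeline` takes an "args" object); this is an illustration, not the library's code.

```javascript
// Hypothetical sketch: run `funcs` one at a time, pass each the same `arg`,
// and stop at the first error.
function pipelineSketch(funcs, arg, callback) {
    var status = {
        operations: funcs.map(function (func) {
            return ({
                func: func,
                funcname: func.name || '(anon)',
                status: 'waiting'       // not started yet
            });
        }),
        successes: [],
        ndone: 0,
        nerrors: 0
    };

    function next(i) {
        if (i === status.operations.length) {
            callback(null, status);
            return;
        }

        var op = status.operations[i];
        op.status = 'pending';          // running now
        op.func(arg, function (err, result) {
            op.err = err;
            op.result = result;
            status.ndone++;
            if (err) {
                op.status = 'fail';
                status.nerrors++;
                callback(err, status);  // later stages stay "waiting"
                return;
            }
            op.status = 'ok';
            status.successes.push(result);
            next(i + 1);
        });
    }

    next(0);
    return (status);
}
```

The final callback receives the failing stage's own error, matching the description above that the pipeline's error is "just the error returned by whatever pipeline function failed".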

### forEachPipeline: invoke the same function on N inputs in series (and stop on failure)

Synopsis: `forEachPipeline(args, callback)`

This function is exactly like `pipeline`, except that the input is specified as
a *single* function ("func") and a list of inputs ("inputs"). The function is
invoked on each input in series.

This example performs the same lookups as the `forEachParallel` example, but one
at a time, stopping at the first failure:

```js
var mod_dns = require('dns');
var mod_util = require('util');
var mod_vasync = require('vasync');

console.log(mod_vasync.forEachPipeline({
    'func': mod_dns.resolve,
    'inputs': [ 'joyent.com', 'github.com', 'asdfaqsdfj.com' ]
}, function (err, results) {
    if (err)
        console.log('error: %s', err.message);
    console.log('results: %s', mod_util.inspect(results, null, 3));
}));
```

### waterfall: invoke N functions in series, stop on failure, and propagate results

Synopsis: `waterfall(funcs, callback)`

This function works like `pipeline` except for argument passing.

Each function is passed any values emitted by the previous function (none for
the first function), followed by the callback to invoke upon completion. This
callback must be invoked exactly once, regardless of success or failure. As is
conventional in Node, the first argument to the callback indicates an error (if
non-null). Subsequent arguments are passed to the next function in the "funcs"
chain.

If any function fails (i.e., calls its callback with an Error), then the
remaining functions are not invoked and "callback" is invoked with the error.

The only difference between waterfall() and pipeline() is the arguments passed
to each function in the chain. pipeline() always passes the same argument
followed by the callback, while waterfall() passes whatever values were emitted
by the previous function followed by the callback.

Here's an example:

```js
var mod_vasync = require('vasync');

mod_vasync.waterfall([
    function func1(callback) {
        setImmediate(function () {
            callback(null, 37);
        });
    },
    function func2(extra, callback) {
        console.log('func2 got "%s" from func1', extra);
        callback();
    }
], function () {
    console.log('done');
});
```

This prints:

```
func2 got "37" from func1
done
```
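
The argument-passing rule can be sketched in a few lines. `waterfallSketch` is a hypothetical name, not vasync's code, and it assumes (following the common waterfall convention, not anything stated above) that the final callback also receives the last stage's values after the `null` error.

```javascript
// Hypothetical sketch of waterfall-style argument passing: each stage gets
// the values the previous stage passed after its error argument, followed by
// its own callback.
function waterfallSketch(funcs, callback) {
    function next(i, values) {
        if (i === funcs.length) {
            // Assumption: hand the last stage's values to the final callback.
            callback.apply(null, [ null ].concat(values));
            return;
        }

        funcs[i].apply(null, values.concat(function (err) {
            var rest = Array.prototype.slice.call(arguments, 1);
            if (err) {
                callback(err);          // remaining stages are skipped
                return;
            }
            next(i + 1, rest);
        }));
    }

    next(0, []);
}
```

With the `func1`/`func2` pair from the example, `func2` would receive `37` as its first argument, exactly as printed above.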

### barrier: coordinate multiple concurrent operations

Synopsis: `barrier([args])`

Returns a new barrier object. Like `parallel`, barriers are useful for
coordinating several concurrent operations, but instead of specifying a list of
functions to invoke, you just say how many (and optionally which ones) are
outstanding, and this object emits `'drain'` when they've all completed. This
is syntactically lighter-weight and more flexible.

* Methods:
    * start(name): Indicates that the named operation began. The name must not
      match an operation which is already ongoing.
    * done(name): Indicates that the named operation ended.
* Read-only public properties (for debugging):
    * pending: Set of pending operations. Keys are names passed to "start", and
      values are timestamps when the operation began.
    * recent: Array of recent completed operations. Each element is an object
      with a "name", "start", and "done" field. By default, 10 operations are
      remembered.
* Options:
    * nrecent: number of recent operations to remember (for debugging)

Example: printing sizes of files in a directory

```js
var mod_fs = require('fs');
var mod_path = require('path');
var mod_vasync = require('vasync');

var barrier = mod_vasync.barrier();

barrier.on('drain', function () {
    console.log('all files checked');
});

barrier.start('readdir');
mod_fs.readdir(__dirname, function (err, files) {
    barrier.done('readdir');
    if (err)
        throw (err);

    files.forEach(function (file) {
        barrier.start('stat ' + file);

        var path = mod_path.join(__dirname, file);
        mod_fs.stat(path, function (err2, stat) {
            barrier.done('stat ' + file);
            if (err2) {
                // "stat" is undefined when the call fails.
                console.error('%s: %s', file, err2.message);
                return;
            }
            console.log('%s: %d bytes', file, stat['size']);
        });
    });
});
```

This emits:

```
barrier-readdir.js: 602 bytes
foreach-parallel.js: 358 bytes
barrier-basic.js: 552 bytes
nofail.js: 384 bytes
pipeline.js: 490 bytes
parallel.js: 481 bytes
queue-serializer.js: 441 bytes
queue-stat.js: 529 bytes
all files checked
```
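
A barrier like the one described can be sketched on top of Node's `EventEmitter`. `BarrierSketch` is a hypothetical constructor, not vasync's code. One design choice deserves mention: in the readdir example above, `done('readdir')` runs before any `stat` operation has started, yet "all files checked" is printed last. This sketch reproduces that by deferring the drain check with `process.nextTick` and re-checking before emitting; vasync's exact timing is not documented here.

```javascript
var EventEmitter = require('events').EventEmitter;
var util = require('util');

// Hypothetical barrier sketch: track named operations and emit 'drain' once
// none are outstanding.
function BarrierSketch(args) {
    EventEmitter.call(this);
    this.nrecent = (args && args.nrecent !== undefined) ? args.nrecent : 10;
    this.pending = {};              // name -> time the operation started (ms)
    this.recent = [];               // completed operations, oldest first
    this.drainScheduled = false;
}
util.inherits(BarrierSketch, EventEmitter);

BarrierSketch.prototype.start = function (name) {
    if (Object.prototype.hasOwnProperty.call(this.pending, name))
        throw (new Error('operation "' + name + '" is already pending'));
    this.pending[name] = Date.now();
};

BarrierSketch.prototype.done = function (name) {
    var self = this;

    if (!Object.prototype.hasOwnProperty.call(this.pending, name))
        throw (new Error('operation "' + name + '" is not pending'));

    this.recent.push({ name: name, start: this.pending[name], done: Date.now() });
    if (this.recent.length > this.nrecent)
        this.recent.shift();
    delete (this.pending[name]);

    // Defer the check so operations started later in this same tick still
    // count, and schedule at most one check at a time so 'drain' fires once.
    if (Object.keys(this.pending).length === 0 && !this.drainScheduled) {
        this.drainScheduled = true;
        process.nextTick(function () {
            self.drainScheduled = false;
            if (Object.keys(self.pending).length === 0)
                self.emit('drain');
        });
    }
};
```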

### queue/queuev: fixed-size worker queue

Synopsis: `queue(worker, concurrency)`

Synopsis: `queuev(args)`

This function returns an object that allows up to a fixed number of tasks to be
dispatched at any given time. The interface is compatible with that provided
by the "async" Node library, except that the returned object's fields represent
a public interface you can use to introspect what's going on. `queuev` takes the
same parameters as named properties of "args" ("worker" and "concurrency").

* Arguments
    * worker: a function invoked as `worker(task, callback)`, where `task` is a
      task dispatched to this queue and `callback` should be invoked when the
      task completes.
    * concurrency: a positive integer indicating the maximum number of tasks
      that may be dispatched at any time. With concurrency = 1, the queue
      serializes all operations.
* Methods
    * push(task, [callback]): add a task (or array of tasks) to the queue, with
      an optional callback to be invoked when each task completes. If a list of
      tasks is added, the callback is invoked for each one.
    * length(): returns the number of tasks that are queued but not yet
      dispatched (for compatibility with node-async).
    * close(): signal that no more tasks will be enqueued. Further attempts to
      enqueue tasks to this queue will throw. Once all pending and queued
      tasks are completed, the object will emit the "end" event. The "end"
      event is the last event the queue will emit, and it will be emitted even
      if no tasks were ever enqueued.
    * kill(): clear enqueued tasks and implicitly close the queue. Several
      caveats apply when kill() is called:
        * The completion callback will _not_ be called for items purged from
          the queue.
        * The drain handler is cleared (for node-async compatibility).
        * Subsequent calls to kill() or close() are no-ops.
        * As with close(), it is not legal to call push() after kill().
* Read-only public properties (for debugging):
    * concurrency: for compatibility with node-async
    * worker: worker function, as passed into "queue"/"queuev"
    * worker\_name: worker function's "name" field
    * npending: the number of tasks currently being processed
    * pending: an object (*not* an array) describing the tasks currently being
      processed
    * queued: array of tasks currently queued for processing
    * closed: true when close() has been called on the queue
    * ended: true when all tasks have completed processing, and no more
      processing will occur
    * killed: true when kill() has been called on the queue
* Hooks (for compatibility with node-async):
    * saturated
    * empty
    * drain
* Events
    * 'end': see close()

If the tasks are themselves simple objects, then the entire queue may be
serialized (as via JSON.stringify) for debugging and monitoring tools. Using
the above fields, you can see what this queue is doing (worker\_name), which
tasks are queued, which tasks are being processed, and so on.
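
The core dispatch logic can be sketched as follows. `QueueSketch` is a hypothetical constructor, not vasync's implementation: it keeps the documented field names (`worker_name`, `npending`, `pending`, `queued`, `closed`, `ended`) and the close()/"end" behavior, but omits kill(), the node-async hooks, and pushing arrays of tasks.

```javascript
var EventEmitter = require('events').EventEmitter;
var util = require('util');

// Hypothetical fixed-concurrency work queue with introspectable fields.
function QueueSketch(worker, concurrency) {
    EventEmitter.call(this);
    this.worker = worker;
    this.worker_name = worker.name || 'anon';
    this.concurrency = concurrency;
    this.nextid = 0;
    this.npending = 0;
    this.pending = {};      // id -> { id, task } for tasks given to the worker
    this.queued = [];       // { id, task, callback } for tasks not yet started
    this.closed = false;
    this.ended = false;
}
util.inherits(QueueSketch, EventEmitter);

QueueSketch.prototype.push = function (task, callback) {
    if (this.closed)
        throw (new Error('push() after close()'));
    this.queued.push({ id: ++this.nextid, task: task, callback: callback });
    this.dispatch();
};

QueueSketch.prototype.close = function () {
    this.closed = true;
    this.maybeEnd();
};

// Start queued tasks, oldest first, while fewer than `concurrency` are running.
QueueSketch.prototype.dispatch = function () {
    while (this.npending < this.concurrency && this.queued.length > 0) {
        var entry = this.queued.shift();
        this.pending[entry.id] = { id: entry.id, task: entry.task };
        this.npending++;
        this.worker(entry.task, this.onDone.bind(this, entry));
    }
};

QueueSketch.prototype.onDone = function (entry, err, result) {
    if (!Object.prototype.hasOwnProperty.call(this.pending, entry.id))
        throw (new Error('task ' + entry.id + ' completed twice'));
    delete (this.pending[entry.id]);
    this.npending--;
    if (entry.callback)
        entry.callback(err, result);
    this.dispatch();
    this.maybeEnd();
};

// Emit 'end' exactly once, after close() and after all work has finished.
QueueSketch.prototype.maybeEnd = function () {
    if (this.closed && !this.ended && this.npending === 0 &&
        this.queued.length === 0) {
        this.ended = true;
        this.emit('end');
    }
};
```

Keeping `pending` keyed by task id makes it cheap to remove a finished task, while `queued` stays an array so tasks are dispatched in the order they were pushed.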

### Example 1: Stat several files

Here's an example demonstrating the queue:

```js
var mod_fs = require('fs');
var mod_vasync = require('vasync');

var queue;

function doneOne()
{
    console.log('task completed; queue state:\n%s\n',
        JSON.stringify(queue, null, 4));
}

queue = mod_vasync.queue(mod_fs.stat, 2);

console.log('initial queue state:\n%s\n', JSON.stringify(queue, null, 4));

queue.push('/tmp/file1', doneOne);
queue.push('/tmp/file2', doneOne);
queue.push('/tmp/file3', doneOne);
queue.push('/tmp/file4', doneOne);

console.log('all tasks pushed:\n%s\n', JSON.stringify(queue, null, 4));
```

The initial queue state looks like this:

```js
initial queue state:
{
    "nextid": 0,
    "worker_name": "anon",
    "npending": 0,
    "pending": {},
    "queued": [],
    "concurrency": 2
}
```

After four tasks have been pushed, we see that two of them have been dispatched
and the remaining two are queued up:

```js
all tasks pushed:
{
    "nextid": 4,
    "worker_name": "anon",
    "npending": 2,
    "pending": {
        "1": {
            "id": 1,
            "task": "/tmp/file1"
        },
        "2": {
            "id": 2,
            "task": "/tmp/file2"
        }
    },
    "queued": [
        {
            "id": 3,
            "task": "/tmp/file3"
        },
        {
            "id": 4,
            "task": "/tmp/file4"
        }
    ],
    "concurrency": 2
}
```

As they complete, we see tasks moving from "queued" to "pending", and completed
tasks disappear:

```js
task completed; queue state:
{
    "nextid": 4,
    "worker_name": "anon",
    "npending": 1,
    "pending": {
        "3": {
            "id": 3,
            "task": "/tmp/file3"
        }
    },
    "queued": [
        {
            "id": 4,
            "task": "/tmp/file4"
        }
    ],
    "concurrency": 2
}
```

When all tasks have completed, the queue state looks as it did initially, except
that "nextid" remains at 4:

```js
task completed; queue state:
{
    "nextid": 4,
    "worker_name": "anon",
    "npending": 0,
    "pending": {},
    "queued": [],
    "concurrency": 2
}
```

### Example 2: A simple serializer

You can use a queue with concurrency 1 and where the tasks are themselves
functions to ensure that an arbitrary asynchronous function never runs
concurrently with another one, no matter what each one does. Since the tasks
are the actual functions to be invoked, the worker function just invokes each
one:

```js
var mod_vasync = require('vasync');

// Each task is itself a function; the worker simply invokes it.
var queue = mod_vasync.queue(
    function (task, callback) { task(callback); }, 1);

queue.push(function (callback) {
    console.log('first task begins');
    setTimeout(function () {
        console.log('first task ends');
        callback();
    }, 500);
});

queue.push(function (callback) {
    console.log('second task begins');
    process.nextTick(function () {
        console.log('second task ends');
        callback();
    });
});
```

This example outputs:

```
$ node examples/queue-serializer.js
first task begins
first task ends
second task begins
second task ends
```