Ohm-Management - Projektarbeit B-ME
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

vasync.js 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
  1. /*
  2. * vasync.js: utilities for observable asynchronous control flow
  3. */
  4. var mod_assert = require('assert');
  5. var mod_events = require('events');
  6. var mod_util = require('util');
  7. var mod_verror = require('verror');
  8. /*
  9. * Public interface
  10. */
  11. exports.parallel = parallel;
  12. exports.forEachParallel = forEachParallel;
  13. exports.pipeline = pipeline;
  14. exports.forEachPipeline = forEachPipeline;
  15. exports.queue = queue;
  16. exports.queuev = queuev;
  17. exports.barrier = barrier;
  18. exports.waterfall = waterfall;
  19. if (!global.setImmediate) {
  20. global.setImmediate = function (func) {
  21. var args = Array.prototype.slice.call(arguments, 1);
  22. args.unshift(0);
  23. args.unshift(func);
  24. setTimeout.apply(this, args);
  25. };
  26. }
  27. /*
  28. * This is incorporated here from jsprim because jsprim ends up pulling in a lot
  29. * of dependencies. If we end up needing more from jsprim, though, we should
  30. * add it back and rip out this function.
  31. */
  32. function isEmpty(obj)
  33. {
  34. var key;
  35. for (key in obj)
  36. return (false);
  37. return (true);
  38. }
  39. /*
  40. * Given a set of functions that complete asynchronously using the standard
  41. * callback(err, result) pattern, invoke them all and merge the results. See
  42. * README.md for details.
  43. */
  44. function parallel(args, callback)
  45. {
  46. var funcs, rv, doneOne, i;
  47. mod_assert.equal(typeof (args), 'object', '"args" must be an object');
  48. mod_assert.ok(Array.isArray(args['funcs']),
  49. '"args.funcs" must be specified and must be an array');
  50. mod_assert.equal(typeof (callback), 'function',
  51. 'callback argument must be specified and must be a function');
  52. funcs = args['funcs'].slice(0);
  53. rv = {
  54. 'operations': new Array(funcs.length),
  55. 'successes': [],
  56. 'ndone': 0,
  57. 'nerrors': 0
  58. };
  59. if (funcs.length === 0) {
  60. setImmediate(function () { callback(null, rv); });
  61. return (rv);
  62. }
  63. doneOne = function (entry) {
  64. return (function (err, result) {
  65. mod_assert.equal(entry['status'], 'pending');
  66. entry['err'] = err;
  67. entry['result'] = result;
  68. entry['status'] = err ? 'fail' : 'ok';
  69. if (err)
  70. rv['nerrors']++;
  71. else
  72. rv['successes'].push(result);
  73. if (++rv['ndone'] < funcs.length)
  74. return;
  75. var errors = rv['operations'].filter(function (ent) {
  76. return (ent['status'] == 'fail');
  77. }).map(function (ent) { return (ent['err']); });
  78. if (errors.length > 0)
  79. callback(new mod_verror.MultiError(errors), rv);
  80. else
  81. callback(null, rv);
  82. });
  83. };
  84. for (i = 0; i < funcs.length; i++) {
  85. rv['operations'][i] = {
  86. 'func': funcs[i],
  87. 'funcname': funcs[i].name || '(anon)',
  88. 'status': 'pending'
  89. };
  90. funcs[i](doneOne(rv['operations'][i]));
  91. }
  92. return (rv);
  93. }
  94. /*
  95. * Exactly like parallel, except that the input is specified as a single
  96. * function to invoke on N different inputs (rather than N functions). "args"
  97. * must have the following fields:
  98. *
  99. * func asynchronous function to invoke on each input value
  100. *
  101. * inputs array of input values
  102. */
  103. function forEachParallel(args, callback)
  104. {
  105. var func, funcs;
  106. mod_assert.equal(typeof (args), 'object', '"args" must be an object');
  107. mod_assert.equal(typeof (args['func']), 'function',
  108. '"args.func" must be specified and must be a function');
  109. mod_assert.ok(Array.isArray(args['inputs']),
  110. '"args.inputs" must be specified and must be an array');
  111. func = args['func'];
  112. funcs = args['inputs'].map(function (input) {
  113. return (function (subcallback) {
  114. return (func(input, subcallback));
  115. });
  116. });
  117. return (parallel({ 'funcs': funcs }, callback));
  118. }
  119. /*
  120. * Like parallel, but invokes functions in sequence rather than in parallel
  121. * and aborts if any function exits with failure. Arguments include:
  122. *
  123. * funcs invoke the functions in parallel
  124. *
  125. * arg first argument to each pipeline function
  126. */
  127. function pipeline(args, callback)
  128. {
  129. var funcs, uarg, rv, next;
  130. mod_assert.equal(typeof (args), 'object', '"args" must be an object');
  131. mod_assert.ok(Array.isArray(args['funcs']),
  132. '"args.funcs" must be specified and must be an array');
  133. funcs = args['funcs'].slice(0);
  134. uarg = args['arg'];
  135. rv = {
  136. 'operations': funcs.map(function (func) {
  137. return ({
  138. 'func': func,
  139. 'funcname': func.name || '(anon)',
  140. 'status': 'waiting'
  141. });
  142. }),
  143. 'successes': [],
  144. 'ndone': 0,
  145. 'nerrors': 0
  146. };
  147. if (funcs.length === 0) {
  148. setImmediate(function () { callback(null, rv); });
  149. return (rv);
  150. }
  151. next = function (err, result) {
  152. if (rv['nerrors'] > 0 ||
  153. rv['ndone'] >= rv['operations'].length) {
  154. throw new mod_verror.VError('pipeline callback ' +
  155. 'invoked after the pipeline has already ' +
  156. 'completed (%j)', rv);
  157. }
  158. var entry = rv['operations'][rv['ndone']++];
  159. mod_assert.equal(entry['status'], 'pending');
  160. entry['status'] = err ? 'fail' : 'ok';
  161. entry['err'] = err;
  162. entry['result'] = result;
  163. if (err)
  164. rv['nerrors']++;
  165. else
  166. rv['successes'].push(result);
  167. if (err || rv['ndone'] == funcs.length) {
  168. callback(err, rv);
  169. } else {
  170. var nextent = rv['operations'][rv['ndone']];
  171. nextent['status'] = 'pending';
  172. /*
  173. * We invoke the next function on the next tick so that
  174. * the caller (stage N) need not worry about the case
  175. * that the next stage (stage N + 1) runs in its own
  176. * context.
  177. */
  178. setImmediate(function () {
  179. nextent['func'](uarg, next);
  180. });
  181. }
  182. };
  183. rv['operations'][0]['status'] = 'pending';
  184. funcs[0](uarg, next);
  185. return (rv);
  186. }
  187. /*
  188. * Exactly like pipeline, except that the input is specified as a single
  189. * function to invoke on N different inputs (rather than N functions). "args"
  190. * must have the following fields:
  191. *
  192. * func asynchronous function to invoke on each input value
  193. *
  194. * inputs array of input values
  195. */
  196. function forEachPipeline(args, callback) {
  197. mod_assert.equal(typeof (args), 'object', '"args" must be an object');
  198. mod_assert.equal(typeof (args['func']), 'function',
  199. '"args.func" must be specified and must be a function');
  200. mod_assert.ok(Array.isArray(args['inputs']),
  201. '"args.inputs" must be specified and must be an array');
  202. mod_assert.equal(typeof (callback), 'function',
  203. 'callback argument must be specified and must be a function');
  204. var func = args['func'];
  205. var funcs = args['inputs'].map(function (input) {
  206. return (function (_, subcallback) {
  207. return (func(input, subcallback));
  208. });
  209. });
  210. return (pipeline({'funcs': funcs}, callback));
  211. }
  212. /*
  213. * async-compatible "queue" function.
  214. */
  215. function queue(worker, concurrency)
  216. {
  217. return (new WorkQueue({
  218. 'worker': worker,
  219. 'concurrency': concurrency
  220. }));
  221. }
  222. function queuev(args)
  223. {
  224. return (new WorkQueue(args));
  225. }
  226. function WorkQueue(args)
  227. {
  228. mod_assert.ok(args.hasOwnProperty('worker'));
  229. mod_assert.equal(typeof (args['worker']), 'function');
  230. mod_assert.ok(args.hasOwnProperty('concurrency'));
  231. mod_assert.equal(typeof (args['concurrency']), 'number');
  232. mod_assert.equal(Math.floor(args['concurrency']), args['concurrency']);
  233. mod_assert.ok(args['concurrency'] > 0);
  234. mod_events.EventEmitter.call(this);
  235. this.nextid = 0;
  236. this.worker = args['worker'];
  237. this.worker_name = args['worker'].name || 'anon';
  238. this.npending = 0;
  239. this.pending = {};
  240. this.queued = [];
  241. this.closed = false;
  242. this.ended = false;
  243. /* user-settable fields inherited from "async" interface */
  244. this.concurrency = args['concurrency'];
  245. this.saturated = undefined;
  246. this.empty = undefined;
  247. this.drain = undefined;
  248. }
  249. mod_util.inherits(WorkQueue, mod_events.EventEmitter);
  250. WorkQueue.prototype.push = function (tasks, callback)
  251. {
  252. if (!Array.isArray(tasks))
  253. return (this.pushOne(tasks, callback));
  254. var wq = this;
  255. return (tasks.map(function (task) {
  256. return (wq.pushOne(task, callback));
  257. }));
  258. };
  259. WorkQueue.prototype.updateConcurrency = function (concurrency)
  260. {
  261. if (this.closed)
  262. throw new mod_verror.VError(
  263. 'update concurrency invoked after queue closed');
  264. this.concurrency = concurrency;
  265. this.dispatchNext();
  266. };
  267. WorkQueue.prototype.close = function ()
  268. {
  269. var wq = this;
  270. if (wq.closed)
  271. return;
  272. wq.closed = true;
  273. /*
  274. * If the queue is already empty, just fire the "end" event on the
  275. * next tick.
  276. */
  277. if (wq.npending === 0 && wq.queued.length === 0) {
  278. setImmediate(function () {
  279. if (!wq.ended) {
  280. wq.ended = true;
  281. wq.emit('end');
  282. }
  283. });
  284. }
  285. };
  286. /* private */
  287. WorkQueue.prototype.pushOne = function (task, callback)
  288. {
  289. if (this.closed)
  290. throw new mod_verror.VError('push invoked after queue closed');
  291. var id = ++this.nextid;
  292. var entry = { 'id': id, 'task': task, 'callback': callback };
  293. this.queued.push(entry);
  294. this.dispatchNext();
  295. return (id);
  296. };
  297. /* private */
  298. WorkQueue.prototype.dispatchNext = function ()
  299. {
  300. var wq = this;
  301. if (wq.npending === 0 && wq.queued.length === 0) {
  302. if (wq.drain)
  303. wq.drain();
  304. wq.emit('drain');
  305. /*
  306. * The queue is closed; emit the final "end"
  307. * event before we come to rest:
  308. */
  309. if (wq.closed) {
  310. wq.ended = true;
  311. wq.emit('end');
  312. }
  313. } else if (wq.queued.length > 0) {
  314. while (wq.queued.length > 0 && wq.npending < wq.concurrency) {
  315. var next = wq.queued.shift();
  316. wq.dispatch(next);
  317. if (wq.queued.length === 0) {
  318. if (wq.empty)
  319. wq.empty();
  320. wq.emit('empty');
  321. }
  322. }
  323. }
  324. };
  325. WorkQueue.prototype.dispatch = function (entry)
  326. {
  327. var wq = this;
  328. mod_assert.ok(!this.pending.hasOwnProperty(entry['id']));
  329. mod_assert.ok(this.npending < this.concurrency);
  330. mod_assert.ok(!this.ended);
  331. this.npending++;
  332. this.pending[entry['id']] = entry;
  333. if (this.npending === this.concurrency) {
  334. if (this.saturated)
  335. this.saturated();
  336. this.emit('saturated');
  337. }
  338. /*
  339. * We invoke the worker function on the next tick so that callers can
  340. * always assume that the callback is NOT invoked during the call to
  341. * push() even if the queue is not at capacity. It also avoids O(n)
  342. * stack usage when used with synchronous worker functions.
  343. */
  344. setImmediate(function () {
  345. wq.worker(entry['task'], function (err) {
  346. --wq.npending;
  347. delete (wq.pending[entry['id']]);
  348. if (entry['callback'])
  349. entry['callback'].apply(null, arguments);
  350. wq.dispatchNext();
  351. });
  352. });
  353. };
  354. WorkQueue.prototype.length = function ()
  355. {
  356. return (this.queued.length);
  357. };
  358. WorkQueue.prototype.kill = function ()
  359. {
  360. this.killed = true;
  361. this.queued = [];
  362. this.drain = undefined;
  363. this.close();
  364. };
  365. /*
  366. * Barriers coordinate multiple concurrent operations.
  367. */
  368. function barrier(args)
  369. {
  370. return (new Barrier(args));
  371. }
  372. function Barrier(args)
  373. {
  374. mod_assert.ok(!args || !args['nrecent'] ||
  375. typeof (args['nrecent']) == 'number',
  376. '"nrecent" must have type "number"');
  377. mod_events.EventEmitter.call(this);
  378. var nrecent = args && args['nrecent'] ? args['nrecent'] : 10;
  379. if (nrecent > 0) {
  380. this.nrecent = nrecent;
  381. this.recent = [];
  382. }
  383. this.pending = {};
  384. this.scheduled = false;
  385. }
  386. mod_util.inherits(Barrier, mod_events.EventEmitter);
  387. Barrier.prototype.start = function (name)
  388. {
  389. mod_assert.ok(!this.pending.hasOwnProperty(name),
  390. 'operation "' + name + '" is already pending');
  391. this.pending[name] = Date.now();
  392. };
  393. Barrier.prototype.done = function (name)
  394. {
  395. mod_assert.ok(this.pending.hasOwnProperty(name),
  396. 'operation "' + name + '" is not pending');
  397. if (this.recent) {
  398. this.recent.push({
  399. 'name': name,
  400. 'start': this.pending[name],
  401. 'done': Date.now()
  402. });
  403. if (this.recent.length > this.nrecent)
  404. this.recent.shift();
  405. }
  406. delete (this.pending[name]);
  407. /*
  408. * If we executed at least one operation and we're now empty, we should
  409. * emit "drain". But most code doesn't deal well with events being
  410. * processed while they're executing, so we actually schedule this event
  411. * for the next tick.
  412. *
  413. * We use the "scheduled" flag to avoid emitting multiple "drain" events
  414. * on consecutive ticks if the user starts and ends another task during
  415. * this tick.
  416. */
  417. if (!isEmpty(this.pending) || this.scheduled)
  418. return;
  419. this.scheduled = true;
  420. var self = this;
  421. setImmediate(function () {
  422. self.scheduled = false;
  423. /*
  424. * It's also possible that the user has started another task on
  425. * the previous tick, in which case we really shouldn't emit
  426. * "drain".
  427. */
  428. if (isEmpty(self.pending))
  429. self.emit('drain');
  430. });
  431. };
  432. /*
  433. * waterfall([ funcs ], callback): invoke each of the asynchronous functions
  434. * "funcs" in series. Each function is passed any values emitted by the
  435. * previous function (none for the first function), followed by the callback to
  436. * invoke upon completion. This callback must be invoked exactly once,
  437. * regardless of success or failure. As conventional in Node, the first
  438. * argument to the callback indicates an error (if non-null). Subsequent
  439. * arguments are passed to the next function in the "funcs" chain.
  440. *
  441. * If any function fails (i.e., calls its callback with an Error), then the
  442. * remaining functions are not invoked and "callback" is invoked with the error.
  443. *
  444. * The only difference between waterfall() and pipeline() are the arguments
  445. * passed to each function in the chain. pipeline() always passes the same
  446. * argument followed by the callback, while waterfall() passes whatever values
  447. * were emitted by the previous function followed by the callback.
  448. */
  449. function waterfall(funcs, callback)
  450. {
  451. var rv, current, next;
  452. mod_assert.ok(Array.isArray(funcs));
  453. mod_assert.ok(arguments.length == 1 || typeof (callback) == 'function');
  454. funcs = funcs.slice(0);
  455. rv = {
  456. 'operations': funcs.map(function (func) {
  457. return ({
  458. 'func': func,
  459. 'funcname': func.name || '(anon)',
  460. 'status': 'waiting'
  461. });
  462. }),
  463. 'successes': [],
  464. 'ndone': 0,
  465. 'nerrors': 0
  466. };
  467. if (funcs.length === 0) {
  468. if (callback)
  469. setImmediate(function () { callback(null, rv); });
  470. return (rv);
  471. }
  472. next = function (idx, err) {
  473. var args, entry, nextentry;
  474. if (err === undefined)
  475. err = null;
  476. if (idx != current) {
  477. throw (new mod_verror.VError(
  478. 'vasync.waterfall: function %d ("%s") invoked ' +
  479. 'its callback twice', idx,
  480. rv['operations'][idx].funcname));
  481. }
  482. mod_assert.equal(idx, rv['ndone']);
  483. entry = rv['operations'][rv['ndone']++];
  484. args = Array.prototype.slice.call(arguments, 2);
  485. mod_assert.equal(entry['status'], 'pending');
  486. entry['status'] = err ? 'fail' : 'ok';
  487. entry['err'] = err;
  488. entry['results'] = args;
  489. if (err)
  490. rv['nerrors']++;
  491. else
  492. rv['successes'].push(args);
  493. if (err || rv['ndone'] == funcs.length) {
  494. if (callback) {
  495. args.unshift(err);
  496. callback.apply(null, args);
  497. }
  498. } else {
  499. nextentry = rv['operations'][rv['ndone']];
  500. nextentry['status'] = 'pending';
  501. current++;
  502. args.push(next.bind(null, current));
  503. setImmediate(function () {
  504. nextentry['func'].apply(null, args);
  505. });
  506. }
  507. };
  508. rv['operations'][0]['status'] = 'pending';
  509. current = 0;
  510. funcs[0](next.bind(null, current));
  511. return (rv);
  512. }