Ohm-Management - Projektarbeit B-ME
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue.js 5.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. /* vim: set ts=8 sts=8 sw=8 noet: */
  2. var mod_tap = require('tap');
  3. var mod_vasync = require('..');
  4. function
  5. immediate_worker(task, next)
  6. {
  7. setImmediate(function () {
  8. next();
  9. });
  10. }
  11. function
  12. sametick_worker(task, next)
  13. {
  14. next();
  15. }
  16. function
  17. random_delay_worker(task, next)
  18. {
  19. setTimeout(function () {
  20. next();
  21. }, Math.floor(Math.random() * 250));
  22. }
  23. mod_tap.test('must not push after close', function (test) {
  24. test.plan(3);
  25. var q = mod_vasync.queuev({
  26. worker: immediate_worker,
  27. concurrency: 10
  28. });
  29. test.ok(q);
  30. test.doesNotThrow(function () {
  31. q.push({});
  32. }, 'push should not throw _before_ close()');
  33. q.close();
  34. /*
  35. * If we attempt to add tasks to the queue _after_ calling close(),
  36. * we should get an exception:
  37. */
  38. test.throws(function () {
  39. q.push({});
  40. }, 'push should throw _after_ close()');
  41. test.end();
  42. });
  43. mod_tap.test('get \'end\' event with close()', function (test) {
  44. var task_count = 45;
  45. var tasks_finished = 0;
  46. var seen_end = false;
  47. var seen_drain = false;
  48. test.plan(14 + task_count);
  49. var q = mod_vasync.queuev({
  50. worker: random_delay_worker,
  51. concurrency: 5
  52. });
  53. test.ok(q);
  54. /*
  55. * Enqueue a bunch of tasks; more than our concurrency:
  56. */
  57. for (var i = 0; i < 45; i++) {
  58. q.push({}, function () {
  59. tasks_finished++;
  60. test.ok(true);
  61. });
  62. }
  63. /*
  64. * Close the queue to signify that we're done now.
  65. */
  66. test.equal(q.ended, false);
  67. test.equal(q.closed, false);
  68. q.close();
  69. test.equal(q.closed, true);
  70. test.equal(q.ended, false);
  71. q.on('drain', function () {
  72. /*
  73. * 'drain' should fire before 'end':
  74. */
  75. test.notOk(seen_drain);
  76. test.notOk(seen_end);
  77. seen_drain = true;
  78. });
  79. q.on('end', function () {
  80. /*
  81. * 'end' should fire after 'drain':
  82. */
  83. test.ok(seen_drain);
  84. test.notOk(seen_end);
  85. seen_end = true;
  86. /*
  87. * Check the public state:
  88. */
  89. test.equal(q.closed, true);
  90. test.equal(q.ended, true);
  91. /*
  92. * We should have fired the callbacks for _all_ enqueued
  93. * tasks by now:
  94. */
  95. test.equal(task_count, tasks_finished);
  96. test.end();
  97. });
  98. /*
  99. * Check that we see neither the 'drain', nor the 'end' event before
  100. * the end of this tick:
  101. */
  102. test.notOk(seen_drain);
  103. test.notOk(seen_end);
  104. });
  105. mod_tap.test('get \'end\' event with close() and no tasks', function (test) {
  106. var seen_drain = false;
  107. var seen_end = false;
  108. test.plan(10);
  109. var q = mod_vasync.queuev({
  110. worker: immediate_worker,
  111. concurrency: 10
  112. });
  113. setImmediate(function () {
  114. test.notOk(seen_end);
  115. });
  116. test.equal(q.ended, false);
  117. test.equal(q.closed, false);
  118. q.close();
  119. test.equal(q.closed, true);
  120. test.equal(q.ended, false);
  121. test.notOk(seen_end);
  122. q.on('drain', function () {
  123. seen_drain = true;
  124. });
  125. q.on('end', function () {
  126. /*
  127. * We do not expect to see a 'drain' event, as there were no
  128. * tasks pushed onto the queue before we closed it.
  129. */
  130. test.notOk(seen_drain);
  131. test.notOk(seen_end);
  132. test.equal(q.closed, true);
  133. test.equal(q.ended, true);
  134. seen_end = true;
  135. test.end();
  136. });
  137. });
  138. /*
  139. * We want to ensure that both the 'drain' event and the q.drain() hook are
  140. * called the same number of times:
  141. */
  142. mod_tap.test('equivalence of on(\'drain\') and q.drain()', function (test) {
  143. var enqcount = 4;
  144. var drains = 4;
  145. var ee_count = 0;
  146. var fn_count = 0;
  147. test.plan(enqcount + drains + 3);
  148. var q = mod_vasync.queuev({
  149. worker: immediate_worker,
  150. concurrency: 10
  151. });
  152. var enq = function () {
  153. if (--enqcount < 0)
  154. return;
  155. q.push({}, function () {
  156. test.ok(true, 'task completion');
  157. });
  158. };
  159. var draino = function () {
  160. test.ok(true, 'drain called');
  161. if (--drains === 0) {
  162. test.equal(q.closed, false, 'not closed');
  163. test.equal(q.ended, false, 'not ended');
  164. test.equal(fn_count, ee_count, 'same number of calls');
  165. test.end();
  166. }
  167. };
  168. enq();
  169. enq();
  170. q.on('drain', function () {
  171. ee_count++;
  172. enq();
  173. draino();
  174. });
  175. q.drain = function () {
  176. fn_count++;
  177. enq();
  178. draino();
  179. };
  180. });
  181. /*
  182. * In the past, we've only handed on the _first_ argument to the task completion
  183. * callback. Make sure we hand on _all_ of the arguments now:
  184. */
  185. mod_tap.test('ensure all arguments passed to push() callback', function (test) {
  186. test.plan(13);
  187. var q = mod_vasync.queuev({
  188. worker: function (task, callback) {
  189. if (task.fail) {
  190. callback(new Error('guru meditation'));
  191. return;
  192. }
  193. callback(null, 1, 2, 3, 5, 8);
  194. },
  195. concurrency: 1
  196. });
  197. q.push({ fail: true }, function (err, a, b, c, d, e) {
  198. test.ok(err, 'got the error');
  199. test.equal(err.message, 'guru meditation');
  200. test.type(a, 'undefined');
  201. test.type(b, 'undefined');
  202. test.type(c, 'undefined');
  203. test.type(d, 'undefined');
  204. test.type(e, 'undefined');
  205. });
  206. q.push({ fail: false }, function (err, a, b, c, d, e) {
  207. test.notOk(err, 'got no error');
  208. test.equal(a, 1);
  209. test.equal(b, 2);
  210. test.equal(c, 3);
  211. test.equal(d, 5);
  212. test.equal(e, 8);
  213. });
  214. q.drain = function () {
  215. test.end();
  216. };
  217. });
  218. mod_tap.test('queue kill', function (test) {
  219. // Derived from async queue.kill test
  220. var count = 0;
  221. var q = mod_vasync.queuev({
  222. worker: function (task, callback) {
  223. setImmediate(function () {
  224. test.ok(++count < 2,
  225. 'Function should be called once');
  226. callback();
  227. });
  228. },
  229. concurrency: 1
  230. });
  231. q.drain = function () {
  232. test.ok(false, 'Function should never be called');
  233. };
  234. // Queue twice, the first will exec immediately
  235. q.push(0);
  236. q.push(0);
  237. q.kill();
  238. q.on('end', function () {
  239. test.ok(q.killed);
  240. test.end();
  241. });
  242. });