Ohm-Management - Projektarbeit B-ME
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue_concurrency.js 2.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. /* vim: set ts=8 sts=8 sw=8 noet: */
  2. var mod_tap = require('tap');
  3. var mod_vasync = require('..');
  4. function
  5. latched_worker(task, cb)
  6. {
  7. if (task.immediate) {
  8. cb();
  9. } else {
  10. task.latched = true;
  11. task.unlatch = function () {
  12. task.latched = false;
  13. cb();
  14. };
  15. }
  16. }
  17. function
  18. unlatchAll(tasks)
  19. {
  20. tasks.forEach(function (t) {
  21. if (t.latched) {
  22. t.unlatch();
  23. }
  24. });
  25. }
  26. function
  27. setAllImmediate(tasks)
  28. {
  29. tasks.forEach(function (t) {
  30. t.immediate = true;
  31. });
  32. }
  33. mod_tap.test('test serial tasks', function (test) {
  34. test.plan(2);
  35. var q = mod_vasync.queuev({
  36. worker: latched_worker,
  37. concurrency: 1
  38. });
  39. test.ok(q);
  40. var tasks = [];
  41. for (var i = 0; i < 2; ++i) {
  42. tasks.push({
  43. 'id': i,
  44. 'latched': false,
  45. 'immediate': false
  46. });
  47. }
  48. setTimeout(function () {
  49. var latched = 0;
  50. tasks.forEach(function (t) {
  51. if (t.latched) {
  52. ++latched;
  53. }
  54. });
  55. test.ok(latched === 1);
  56. unlatchAll(tasks);
  57. setAllImmediate(tasks);
  58. }, 10);
  59. q.on('drain', function () {
  60. q.close();
  61. });
  62. q.on('end', function () {
  63. test.end();
  64. });
  65. q.push(tasks);
  66. });
  67. mod_tap.test('test parallel tasks', function (test) {
  68. test.plan(2);
  69. var q = mod_vasync.queuev({
  70. worker: latched_worker,
  71. concurrency: 2
  72. });
  73. test.ok(q);
  74. var tasks = [];
  75. for (var i = 0; i < 3; ++i) {
  76. tasks.push({
  77. 'id': i,
  78. 'latched': false,
  79. 'immediate': false
  80. });
  81. }
  82. setTimeout(function () {
  83. var latched = 0;
  84. tasks.forEach(function (t) {
  85. if (t.latched) {
  86. ++latched;
  87. }
  88. });
  89. test.ok(latched === 2);
  90. unlatchAll(tasks);
  91. setAllImmediate(tasks);
  92. }, 10);
  93. q.on('drain', function () {
  94. q.close();
  95. });
  96. q.on('end', function () {
  97. test.end();
  98. });
  99. q.push(tasks);
  100. });
  101. mod_tap.test('test ratchet up and down', function (test) {
  102. test.plan(8);
  103. var q = mod_vasync.queuev({
  104. worker: latched_worker,
  105. concurrency: 2
  106. });
  107. test.ok(q);
  108. var bounced = 0;
  109. var tasks = [];
  110. for (var i = 0; i < 21; ++i) {
  111. tasks.push({
  112. 'id': i,
  113. 'latched': false,
  114. 'immediate': false
  115. });
  116. }
  117. function count() {
  118. var latched = 0;
  119. tasks.forEach(function (t) {
  120. if (t.latched) {
  121. ++latched;
  122. }
  123. });
  124. return (latched);
  125. }
  126. function fiveLatch() {
  127. if (!q.closed) {
  128. ++bounced;
  129. test.ok(count() === 5);
  130. q.updateConcurrency(2);
  131. unlatchAll(tasks);
  132. setTimeout(twoLatch, 10);
  133. }
  134. }
  135. function twoLatch() {
  136. if (!q.closed) {
  137. ++bounced;
  138. test.ok(count() === 2);
  139. q.updateConcurrency(5);
  140. unlatchAll(tasks);
  141. setTimeout(fiveLatch, 10);
  142. }
  143. }
  144. setTimeout(twoLatch, 10);
  145. q.on('drain', function () {
  146. q.close();
  147. });
  148. q.on('end', function () {
  149. // 21 tasks === 5 * 3 + 2 * 3 === 6 bounces
  150. test.ok(bounced === 6);
  151. test.end();
  152. });
  153. q.push(tasks);
  154. });