om/node_modules/vasync/tests/queue_concurrency.js

182 lines
2.6 KiB
JavaScript

/* vim: set ts=8 sts=8 sw=8 noet: */
var mod_tap = require('tap');
var mod_vasync = require('..');
function
latched_worker(task, cb)
{
if (task.immediate) {
cb();
} else {
task.latched = true;
task.unlatch = function () {
task.latched = false;
cb();
};
}
}
function
unlatchAll(tasks)
{
tasks.forEach(function (t) {
if (t.latched) {
t.unlatch();
}
});
}
function
setAllImmediate(tasks)
{
tasks.forEach(function (t) {
t.immediate = true;
});
}
mod_tap.test('test serial tasks', function (test) {
test.plan(2);
var q = mod_vasync.queuev({
worker: latched_worker,
concurrency: 1
});
test.ok(q);
var tasks = [];
for (var i = 0; i < 2; ++i) {
tasks.push({
'id': i,
'latched': false,
'immediate': false
});
}
setTimeout(function () {
var latched = 0;
tasks.forEach(function (t) {
if (t.latched) {
++latched;
}
});
test.ok(latched === 1);
unlatchAll(tasks);
setAllImmediate(tasks);
}, 10);
q.on('drain', function () {
q.close();
});
q.on('end', function () {
test.end();
});
q.push(tasks);
});
mod_tap.test('test parallel tasks', function (test) {
test.plan(2);
var q = mod_vasync.queuev({
worker: latched_worker,
concurrency: 2
});
test.ok(q);
var tasks = [];
for (var i = 0; i < 3; ++i) {
tasks.push({
'id': i,
'latched': false,
'immediate': false
});
}
setTimeout(function () {
var latched = 0;
tasks.forEach(function (t) {
if (t.latched) {
++latched;
}
});
test.ok(latched === 2);
unlatchAll(tasks);
setAllImmediate(tasks);
}, 10);
q.on('drain', function () {
q.close();
});
q.on('end', function () {
test.end();
});
q.push(tasks);
});
mod_tap.test('test ratchet up and down', function (test) {
test.plan(8);
var q = mod_vasync.queuev({
worker: latched_worker,
concurrency: 2
});
test.ok(q);
var bounced = 0;
var tasks = [];
for (var i = 0; i < 21; ++i) {
tasks.push({
'id': i,
'latched': false,
'immediate': false
});
}
function count() {
var latched = 0;
tasks.forEach(function (t) {
if (t.latched) {
++latched;
}
});
return (latched);
}
function fiveLatch() {
if (!q.closed) {
++bounced;
test.ok(count() === 5);
q.updateConcurrency(2);
unlatchAll(tasks);
setTimeout(twoLatch, 10);
}
}
function twoLatch() {
if (!q.closed) {
++bounced;
test.ok(count() === 2);
q.updateConcurrency(5);
unlatchAll(tasks);
setTimeout(fiveLatch, 10);
}
}
setTimeout(twoLatch, 10);
q.on('drain', function () {
q.close();
});
q.on('end', function () {
// 21 tasks === 5 * 3 + 2 * 3 === 6 bounces
test.ok(bounced === 6);
test.end();
});
q.push(tasks);
});