A fast, efficient Node.js Worker Thread Pool implementation
Intelligent task distribution across available workers with built-in least-busy balancer and support for custom balancing strategies.
Function type for implementing custom load balancing strategies.
```typescript
/**
 * Load balancer function type
 * @param task - Task to be distributed
 * @param workers - Available workers to choose from
 * @returns Selected worker or null if none suitable
 */
type PiscinaLoadBalancer = (
  task: PiscinaTask,
  workers: PiscinaWorker[]
) => PiscinaWorker | null;
```

Built-in load balancer that distributes tasks to the least busy available workers.
```typescript
/**
 * Creates a least-busy load balancer
 * @param opts - Balancer configuration options
 * @returns Load balancer function
 */
function LeastBusyBalancer(
  opts: LeastBusyBalancerOptions
): PiscinaLoadBalancer;
```
```typescript
/**
 * Configuration options for LeastBusyBalancer
 */
interface LeastBusyBalancerOptions {
  /** Maximum concurrent tasks per worker before it is considered overloaded */
  maximumUsage: number;
}
```

Usage Examples:
```typescript
import { Piscina, LeastBusyBalancer } from "piscina";

// Default least-busy balancer (used automatically)
const pool = new Piscina({
  filename: "worker.js",
  maxThreads: 4,
  concurrentTasksPerWorker: 2
  // LeastBusyBalancer is used by default
});

// Explicit least-busy balancer with custom settings
const customBalancedPool = new Piscina({
  filename: "worker.js",
  maxThreads: 8,
  concurrentTasksPerWorker: 3,
  loadBalancer: LeastBusyBalancer({ maximumUsage: 3 })
});

// High-concurrency configuration
const highConcurrencyPool = new Piscina({
  filename: "worker.js",
  maxThreads: 16,
  concurrentTasksPerWorker: 4,
  loadBalancer: LeastBusyBalancer({ maximumUsage: 4 })
});
```

Interface representing workers available for load balancing decisions.
```typescript
/**
 * Worker interface for load balancing
 */
interface PiscinaWorker {
  /** Unique worker thread ID */
  readonly id: number;
  /** Current number of tasks being processed */
  readonly currentUsage: number;
  /** Whether worker is running an abortable task */
  readonly isRunningAbortableTask: boolean;
  /** Performance histogram for this worker (if enabled) */
  readonly histogram: PiscinaHistogramSummary | null;
  /** Whether worker is in termination process */
  readonly terminating: boolean;
  /** Whether worker has been destroyed */
  readonly destroyed: boolean;
}
```

You can implement custom load balancing strategies for specialized requirements.
Usage Examples:
```typescript
import { Piscina, PiscinaLoadBalancer, PiscinaTask, PiscinaWorker } from "piscina";

// Round-robin load balancer
let roundRobinIndex = 0;
const roundRobinBalancer: PiscinaLoadBalancer = (task, workers) => {
  const availableWorkers = workers.filter(w =>
    !w.terminating &&
    !w.destroyed &&
    w.currentUsage < 2
  );
  if (availableWorkers.length === 0) return null;
  const worker = availableWorkers[roundRobinIndex % availableWorkers.length];
  roundRobinIndex++;
  return worker;
};
```
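Because a balancer is a plain function, a stateful strategy like the round-robin one above can be sanity-checked outside a running pool. A minimal sketch, assuming hypothetical mock objects (`MockWorker`, `pickRoundRobin` are illustration names, not part of the piscina API) that model only the fields the balancer reads:

```typescript
// Hypothetical mock worker: only the PiscinaWorker fields a balancer reads
type MockWorker = { id: number; currentUsage: number; terminating: boolean; destroyed: boolean };

// Stand-alone copy of the round-robin selection logic above
let rrIndex = 0; // balancer state lives in the enclosing scope
const pickRoundRobin = (workers: MockWorker[]): MockWorker | null => {
  const usable = workers.filter(w => !w.terminating && !w.destroyed);
  if (usable.length === 0) return null;
  return usable[rrIndex++ % usable.length];
};

const mockPool: MockWorker[] = [
  { id: 1, currentUsage: 0, terminating: false, destroyed: false },
  { id: 2, currentUsage: 0, terminating: false, destroyed: false }
];

const picks = [pickRoundRobin(mockPool)?.id, pickRoundRobin(mockPool)?.id, pickRoundRobin(mockPool)?.id];
console.log(picks); // [ 1, 2, 1 ]
```

Note that the index lives in module scope, so selection order is shared across every task submitted to the pool.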
```typescript
// Priority-based load balancer
const priorityBalancer: PiscinaLoadBalancer = (task, workers) => {
  // High-priority tasks get dedicated workers
  const taskPriority = (task as any).priority || 0;
  if (taskPriority > 5) {
    // Find a completely idle worker for high priority
    const idleWorker = workers.find(w =>
      w.currentUsage === 0 && !w.terminating && !w.destroyed
    );
    if (idleWorker) return idleWorker;
  }
  // Fall back to least busy
  let bestWorker: PiscinaWorker | null = null;
  let minUsage = Infinity;
  for (const worker of workers) {
    if (worker.terminating || worker.destroyed) continue;
    if (worker.isRunningAbortableTask && !task.isAbortable) continue;
    if (worker.currentUsage < minUsage) {
      minUsage = worker.currentUsage;
      bestWorker = worker;
    }
  }
  return bestWorker;
};
```
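The least-busy fallback loop above can likewise be exercised with mock workers before wiring the balancer into a pool. A minimal sketch, where `WorkerState` and `pickLeastBusy` are illustration names (not piscina exports) mirroring only the fields the fallback path reads:

```typescript
// Hypothetical mock shape mirroring the PiscinaWorker fields used above
type WorkerState = { id: number; currentUsage: number; terminating: boolean; destroyed: boolean };

// Stand-alone least-busy selection, as used in the fallback path above
const pickLeastBusy = (workers: WorkerState[]): WorkerState | null =>
  workers
    .filter(w => !w.terminating && !w.destroyed)
    .reduce<WorkerState | null>(
      (best, w) => (best === null || w.currentUsage < best.currentUsage ? w : best),
      null
    );

const mocks: WorkerState[] = [
  { id: 1, currentUsage: 2, terminating: false, destroyed: false },
  { id: 2, currentUsage: 0, terminating: false, destroyed: false },
  { id: 3, currentUsage: 1, terminating: true, destroyed: false }
];
console.log(pickLeastBusy(mocks)?.id); // 2 (lowest usage among live workers)
```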
```typescript
// Performance-based load balancer
const performanceBalancer: PiscinaLoadBalancer = (task, workers) => {
  const availableWorkers = workers.filter(w =>
    !w.terminating &&
    !w.destroyed &&
    w.currentUsage < 3
  );
  if (availableWorkers.length === 0) return null;
  // Prefer workers with better performance history
  return availableWorkers.sort((a, b) => {
    const aAvg = a.histogram?.average || 0;
    const bAvg = b.histogram?.average || 0;
    return aAvg - bAvg; // Lower average runtime is better
  })[0];
};
```
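One further pattern: per this page, returning `null` from a balancer triggers worker creation, so a balancer can deliberately force pool growth (up to `maxThreads`) whenever no worker is idle. A minimal sketch, typed structurally so it is self-contained (`WorkerInfo` and `scaleUpBalancer` are illustration names, not piscina exports):

```typescript
// Hypothetical structural type standing in for PiscinaWorker
type WorkerInfo = { currentUsage: number; terminating: boolean; destroyed: boolean };

// Scale-up balancer: only ever assigns to idle workers; otherwise
// returns null so the pool may create a new worker (up to maxThreads)
const scaleUpBalancer = (_task: unknown, workers: WorkerInfo[]): WorkerInfo | null => {
  const idle = workers.find(w => !w.terminating && !w.destroyed && w.currentUsage === 0);
  return idle ?? null;
};

const allBusy: WorkerInfo[] = [{ currentUsage: 1, terminating: false, destroyed: false }];
console.log(scaleUpBalancer({}, allBusy)); // null → pool may create a new worker
```

This trades memory for latency: the pool grows eagerly under load instead of queueing tasks on busy workers.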
```typescript
// Use custom balancers
const roundRobinPool = new Piscina({
  filename: "worker.js",
  loadBalancer: roundRobinBalancer
});

const priorityPool = new Piscina({
  filename: "worker.js",
  loadBalancer: priorityBalancer,
  workerHistogram: true // Enable for performance tracking
});

const performancePool = new Piscina({
  filename: "worker.js",
  loadBalancer: performanceBalancer,
  workerHistogram: true
});
```

The default LeastBusyBalancer implements the following algorithm:
- Prefer completely idle workers (`currentUsage === 0`)
- Respect the `maximumUsage` setting to prevent overloading any single worker
- Otherwise select the eligible worker with the lowest `currentUsage`

```typescript
// Equivalent implementation of LeastBusyBalancer logic
const maximumUsage = 2; // example value; comes from LeastBusyBalancerOptions.maximumUsage
const leastBusyLogic: PiscinaLoadBalancer = (task, workers) => {
  let candidate: PiscinaWorker | null = null;
  let checkpoint = maximumUsage;
  for (const worker of workers) {
    // Skip terminating or destroyed workers
    if (worker.terminating || worker.destroyed) continue;
    // Prioritize completely idle workers
    if (worker.currentUsage === 0) {
      candidate = worker;
      break;
    }
    // Don't assign non-abortable tasks to workers running abortable tasks
    if (worker.isRunningAbortableTask) continue;
    // Find the least busy worker under the threshold
    if (!task.isAbortable && worker.currentUsage < checkpoint) {
      candidate = worker;
      checkpoint = worker.currentUsage;
    }
  }
  return candidate;
};
```

When implementing custom load balancers:
- Always skip `terminating` and `destroyed` workers
- Respect `isRunningAbortableTask` and `task.isAbortable` when matching tasks to workers
- Return `null` to trigger worker creation

Advanced Custom Balancer:
```typescript
// Locality-aware load balancer (conceptual)
const localityAwareBalancer: PiscinaLoadBalancer = (task, workers) => {
  const availableWorkers = workers.filter(w =>
    !w.terminating && !w.destroyed
  );
  if (availableWorkers.length === 0) return null;
  // Group workers by some locality criterion
  const taskLocality = (task as any).locality || 'default';
  const localWorkers = availableWorkers.filter(w =>
    (w as any).locality === taskLocality
  );
  const candidatePool = localWorkers.length > 0 ? localWorkers : availableWorkers;
  // Apply least-busy within the locality group
  return candidatePool.reduce((best, current) => {
    if (!best) return current;
    return current.currentUsage < best.currentUsage ? current : best;
  }, null as PiscinaWorker | null);
};
```
```typescript
// Weighted load balancer
const weightedBalancer: PiscinaLoadBalancer = (task, workers) => {
  const availableWorkers = workers.filter(w =>
    !w.terminating && !w.destroyed && w.currentUsage < 5
  );
  if (availableWorkers.length === 0) return null;
  // Calculate weighted scores (lower is better)
  const scores = availableWorkers.map(worker => ({
    worker,
    score: worker.currentUsage * 1.0 +
      (worker.histogram?.average || 0) * 0.1 +
      (worker.isRunningAbortableTask ? 0.5 : 0)
  }));
  // Return the worker with the lowest score
  return scores.reduce((best, current) =>
    current.score < best.score ? current : best
  ).worker;
};
```

Track load balancing effectiveness using worker metrics:
```typescript
import { Piscina } from "piscina";

const pool = new Piscina({
  filename: "worker.js",
  maxThreads: 4,
  workerHistogram: true
});

// Monitor worker distribution
setInterval(() => {
  console.log('\nWorker Load Distribution:');
  pool.threads.forEach((thread, index) => {
    console.log(`Worker ${index}: ${thread.currentUsage} tasks`);
  });
  console.log(`Queue size: ${pool.queueSize}`);
  console.log(`Pool utilization: ${(pool.utilization * 100).toFixed(2)}%`);
}, 5000);

// Track worker creation/destruction
pool.on('workerCreate', (worker) => {
  console.log(`Created worker ${worker.id}, total: ${pool.threads.length}`);
});
pool.on('workerDestroy', (worker) => {
  console.log(`Destroyed worker ${worker.id}, total: ${pool.threads.length}`);
});
```

Install with Tessl CLI
```shell
npx tessl i tessl/npm-piscina
```