tessl/npm-piscina

A fast, efficient Node.js Worker Thread Pool implementation

Load Balancing

Piscina distributes tasks across the available workers with a built-in least-busy balancer and accepts custom balancing strategies.

Capabilities

PiscinaLoadBalancer Type

Function type for implementing custom load balancing strategies.

/**
 * Load balancer function type
 * @param task - Task to be distributed
 * @param workers - Available workers to choose from
 * @returns Selected worker or null if none suitable
 */
type PiscinaLoadBalancer = (
  task: PiscinaTask,
  workers: PiscinaWorker[]
) => PiscinaWorker | null;
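
As a minimal sketch of this contract, the balancer below returns the first idle worker and otherwise null. `TaskLike`, `WorkerLike`, and `BalancerLike` are local stand-ins assumed here so the snippet runs without Piscina installed; in a real pool you would use `PiscinaTask`, `PiscinaWorker`, and `PiscinaLoadBalancer` instead.

```typescript
// Local stand-ins (assumptions) that mirror only the fields this sketch reads.
interface TaskLike {
  isAbortable: boolean;
}

interface WorkerLike {
  id: number;
  currentUsage: number;
}

type BalancerLike = (task: TaskLike, workers: WorkerLike[]) => WorkerLike | null;

// Pick the first worker with no tasks in flight; otherwise report no candidate.
const firstIdleBalancer: BalancerLike = (_task, workers) =>
  workers.find((w) => w.currentUsage === 0) ?? null;

const sampleWorkers: WorkerLike[] = [
  { id: 1, currentUsage: 2 },
  { id: 2, currentUsage: 0 },
];

const picked = firstIdleBalancer({ isAbortable: false }, sampleWorkers);
console.log(picked?.id); // 2
```

The function is pure: it only reads the worker list it is given and never mutates it, which keeps the selection easy to test in isolation.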

LeastBusyBalancer

Built-in load balancer that distributes tasks to the least busy available workers.

/**
 * Creates a least-busy load balancer
 * @param opts - Balancer configuration options
 * @returns Load balancer function
 */
function LeastBusyBalancer(
  opts: LeastBusyBalancerOptions
): PiscinaLoadBalancer;

/**
 * Configuration options for LeastBusyBalancer
 */
interface LeastBusyBalancerOptions {
  /** Maximum concurrent tasks per worker before considering overloaded */
  maximumUsage: number;
}

Usage Examples:

import { Piscina, LeastBusyBalancer } from "piscina";

// Default least-busy balancer (used automatically)
const pool = new Piscina({
  filename: "worker.js",
  maxThreads: 4,
  concurrentTasksPerWorker: 2
  // LeastBusyBalancer is used by default
});

// Explicit least-busy balancer with custom settings
const customBalancedPool = new Piscina({
  filename: "worker.js",
  maxThreads: 8,
  concurrentTasksPerWorker: 3,
  loadBalancer: LeastBusyBalancer({ maximumUsage: 3 })
});

// High-concurrency configuration
const highConcurrencyPool = new Piscina({
  filename: "worker.js",
  maxThreads: 16,
  concurrentTasksPerWorker: 4,
  loadBalancer: LeastBusyBalancer({ maximumUsage: 4 })
});

PiscinaWorker Interface

Interface representing workers available for load balancing decisions.

/**
 * Worker interface for load balancing
 */
interface PiscinaWorker {
  /** Unique worker thread ID */
  readonly id: number;
  
  /** Current number of tasks being processed */
  readonly currentUsage: number;
  
  /** Whether worker is running an abortable task */
  readonly isRunningAbortableTask: boolean;
  
  /** Performance histogram for this worker (if enabled) */
  readonly histogram: PiscinaHistogramSummary | null;
  
  /** Whether worker is in termination process */
  readonly terminating: boolean;
  
  /** Whether worker has been destroyed */
  readonly destroyed: boolean;
}
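
Most custom balancers start by discarding workers that cannot accept work. The sketch below wraps that check in a hypothetical `isSelectable` helper; `WorkerStatus` is a local stand-in for the `PiscinaWorker` fields it reads, and the `maxUsage` threshold is an assumption you would normally align with `concurrentTasksPerWorker`.

```typescript
// Stand-in for the PiscinaWorker fields used below (an assumption for this sketch).
interface WorkerStatus {
  currentUsage: number;
  terminating: boolean;
  destroyed: boolean;
}

// True when the worker is alive and still has spare capacity under `maxUsage`.
function isSelectable(worker: WorkerStatus, maxUsage: number): boolean {
  return !worker.terminating && !worker.destroyed && worker.currentUsage < maxUsage;
}

const healthy: WorkerStatus = { currentUsage: 1, terminating: false, destroyed: false };
const draining: WorkerStatus = { currentUsage: 0, terminating: true, destroyed: false };

console.log(isSelectable(healthy, 2)); // true
console.log(isSelectable(draining, 2)); // false (terminating)
console.log(isSelectable(healthy, 1)); // false (already at capacity)
```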

Custom Load Balancer Implementation

You can implement custom load balancing strategies for specialized requirements.

Usage Examples:

import { Piscina, PiscinaLoadBalancer, PiscinaTask, PiscinaWorker } from "piscina";

// Round-robin load balancer
let roundRobinIndex = 0;
const roundRobinBalancer: PiscinaLoadBalancer = (task, workers) => {
  const availableWorkers = workers.filter(w => 
    !w.terminating && 
    !w.destroyed && 
    w.currentUsage < 2
  );
  
  if (availableWorkers.length === 0) return null;
  
  const worker = availableWorkers[roundRobinIndex % availableWorkers.length];
  roundRobinIndex++;
  return worker;
};

// Priority-based load balancer
const priorityBalancer: PiscinaLoadBalancer = (task, workers) => {
  // High priority tasks get dedicated workers
  const taskPriority = (task as any).priority || 0;
  
  if (taskPriority > 5) {
    // Find completely idle worker for high priority
    const idleWorker = workers.find(w => 
      w.currentUsage === 0 && !w.terminating && !w.destroyed
    );
    if (idleWorker) return idleWorker;
  }
  
  // Fall back to least busy
  let bestWorker: PiscinaWorker | null = null;
  let minUsage = Infinity;
  
  for (const worker of workers) {
    if (worker.terminating || worker.destroyed) continue;
    if (worker.isRunningAbortableTask && !task.isAbortable) continue;
    
    if (worker.currentUsage < minUsage) {
      minUsage = worker.currentUsage;
      bestWorker = worker;
    }
  }
  
  return bestWorker;
};

// Performance-based load balancer
const performanceBalancer: PiscinaLoadBalancer = (task, workers) => {
  const availableWorkers = workers.filter(w => 
    !w.terminating && 
    !w.destroyed && 
    w.currentUsage < 3
  );
  
  if (availableWorkers.length === 0) return null;
  
  // Prefer workers with better performance history.
  // Workers with no recorded history count as 0 and are therefore tried first.
  return availableWorkers.sort((a, b) => {
    const aAvg = a.histogram?.average || 0;
    const bAvg = b.histogram?.average || 0;
    return aAvg - bAvg; // Lower average runtime is better
  })[0];
};

// Use custom balancers
const roundRobinPool = new Piscina({
  filename: "worker.js",
  loadBalancer: roundRobinBalancer
});

const priorityPool = new Piscina({
  filename: "worker.js",
  loadBalancer: priorityBalancer
});

const performancePool = new Piscina({
  filename: "worker.js",
  loadBalancer: performanceBalancer,
  workerHistogram: true // Needed so worker.histogram is populated for the balancer
});

Load Balancing Strategies

Least Busy Strategy (Default)

The default LeastBusyBalancer implements the following algorithm:

  1. Idle Worker Priority: Return the first worker with currentUsage === 0
  2. Abortable Task Handling: Skip busy workers that are running an abortable task, and place abortable tasks only on idle workers
  3. Usage Threshold: Only consider busy workers whose currentUsage is below maximumUsage
  4. Least Loaded: Among the remaining workers, choose the one with the lowest currentUsage

// Equivalent implementation of LeastBusyBalancer logic
// Example threshold; LeastBusyBalancer receives this value through its options
const maximumUsage = 2;

const leastBusyLogic: PiscinaLoadBalancer = (task, workers) => {
  let candidate: PiscinaWorker | null = null;
  let checkpoint = maximumUsage;
  
  for (const worker of workers) {
    // Skip terminating or destroyed workers
    if (worker.terminating || worker.destroyed) continue;
    
    // Prioritize completely idle workers
    if (worker.currentUsage === 0) {
      candidate = worker;
      break;
    }
    
    // Never share a worker that is running an abortable task
    if (worker.isRunningAbortableTask) continue;
    
    // Non-abortable tasks may share a worker: keep the least busy one under
    // the threshold. Abortable tasks are only ever placed on idle workers.
    if (!task.isAbortable && worker.currentUsage < checkpoint) {
      candidate = worker;
      checkpoint = worker.currentUsage;
    }
  }
  
  return candidate;
};
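
To see how these rules interact, the sketch below restates the same loop against local stand-in types and runs it on a few sample workers. `makeLeastBusy`, `mk`, `TaskLike`, and `WorkerLike` are names assumed for this illustration, not part of Piscina.

```typescript
// Local stand-ins (assumptions) for the task and worker fields the loop reads.
interface TaskLike {
  isAbortable: boolean;
}

interface WorkerLike {
  id: number;
  currentUsage: number;
  isRunningAbortableTask: boolean;
  terminating: boolean;
  destroyed: boolean;
}

// Builds a balancer closed over the usage threshold.
function makeLeastBusy(maximumUsage: number) {
  return (task: TaskLike, workers: WorkerLike[]): WorkerLike | null => {
    let candidate: WorkerLike | null = null;
    let checkpoint = maximumUsage;
    for (const worker of workers) {
      if (worker.terminating || worker.destroyed) continue;
      if (worker.currentUsage === 0) {
        candidate = worker;
        break;
      }
      if (worker.isRunningAbortableTask) continue;
      if (!task.isAbortable && worker.currentUsage < checkpoint) {
        candidate = worker;
        checkpoint = worker.currentUsage;
      }
    }
    return candidate;
  };
}

// Helper for building sample workers that are alive.
const mk = (id: number, currentUsage: number, isRunningAbortableTask = false): WorkerLike => ({
  id,
  currentUsage,
  isRunningAbortableTask,
  terminating: false,
  destroyed: false,
});

const balance = makeLeastBusy(3);
const busy = [mk(1, 2), mk(2, 1), mk(3, 1, true)];

// Worker 2: lowest usage among workers not running an abortable task.
const forNormalTask = balance({ isAbortable: false }, busy);
// null: no worker is idle, and abortable tasks never share a worker.
const forAbortableTask = balance({ isAbortable: true }, busy);
// null: every worker has already reached maximumUsage.
const whenSaturated = balance({ isAbortable: false }, [mk(1, 3), mk(2, 3)]);

console.log(forNormalTask?.id, forAbortableTask, whenSaturated);
```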

Custom Strategy Guidelines

When implementing custom load balancers:

  1. Always check worker status: Filter out terminating and destroyed workers
  2. Handle abortable tasks: Consider isRunningAbortableTask and task.isAbortable
  3. Respect capacity: Don't overload workers beyond reasonable limits
  4. Return null: If no suitable worker is found, return null so the pool can create a new worker (when it is below maxThreads) or keep the task queued
  5. Performance considerations: Keep balancer logic fast as it runs for every task
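
One way to follow the last guideline is to select the worker in a single pass rather than filtering and sorting the list on every call. The sketch below assumes a hypothetical `pickMin` helper and a local `WorkerLike` stand-in; ineligible workers receive a score of `Infinity`, so they can never be selected.

```typescript
// Local stand-in (assumption) for the PiscinaWorker fields used here.
interface WorkerLike {
  id: number;
  currentUsage: number;
  terminating: boolean;
  destroyed: boolean;
}

// Single-pass minimum: O(n) time, no intermediate arrays.
// Items scoring Infinity are never chosen because Infinity < Infinity is false.
function pickMin<T>(items: readonly T[], score: (item: T) => number): T | null {
  let best: T | null = null;
  let bestScore = Infinity;
  for (const item of items) {
    const s = score(item);
    if (s < bestScore) {
      best = item;
      bestScore = s;
    }
  }
  return best;
}

const MAX_USAGE = 3; // hypothetical per-worker capacity

// Score by current load; dead or saturated workers are excluded via Infinity.
const usageScore = (w: WorkerLike): number =>
  w.terminating || w.destroyed || w.currentUsage >= MAX_USAGE ? Infinity : w.currentUsage;

const candidates: WorkerLike[] = [
  { id: 1, currentUsage: 2, terminating: false, destroyed: false },
  { id: 2, currentUsage: 0, terminating: true, destroyed: false },
  { id: 3, currentUsage: 1, terminating: false, destroyed: false },
];

const chosen = pickMin(candidates, usageScore);
console.log(chosen?.id); // 3
```

Inside a balancer this would reduce to `return pickMin(workers, usageScore)`, which also returns null when no worker qualifies.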

Advanced Custom Balancer:

// Locality-aware load balancer (conceptual)
const localityAwareBalancer: PiscinaLoadBalancer = (task, workers) => {
  const availableWorkers = workers.filter(w => 
    !w.terminating && !w.destroyed
  );
  
  if (availableWorkers.length === 0) return null;
  
  // Group workers by some locality criterion
  const taskLocality = (task as any).locality || 'default';
  const localWorkers = availableWorkers.filter(w => 
    (w as any).locality === taskLocality
  );
  
  const candidatePool = localWorkers.length > 0 ? localWorkers : availableWorkers;
  
  // Apply least-busy within locality group
  return candidatePool.reduce((best, current) => {
    if (!best) return current;
    return current.currentUsage < best.currentUsage ? current : best;
  }, null as PiscinaWorker | null);
};

// Weighted load balancer
const weightedBalancer: PiscinaLoadBalancer = (task, workers) => {
  const availableWorkers = workers.filter(w => 
    !w.terminating && !w.destroyed && w.currentUsage < 5
  );
  
  if (availableWorkers.length === 0) return null;
  
  // Calculate weighted scores (lower is better)
  const scores = availableWorkers.map(worker => ({
    worker,
    score: worker.currentUsage * 1.0 + 
           (worker.histogram?.average || 0) * 0.1 +
           (worker.isRunningAbortableTask ? 0.5 : 0)
  }));
  
  // Return worker with lowest score
  return scores.reduce((best, current) => 
    current.score < best.score ? current : best
  ).worker;
};
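
A short worked example makes the weighting easier to tune. The sketch below applies the same formula to three sample workers; `averageRunTime` is a stand-in for `worker.histogram?.average || 0`, and the sample values are invented for illustration.

```typescript
// Local stand-in (assumption) carrying only the inputs of the score formula.
interface WorkerLike {
  id: number;
  currentUsage: number;
  isRunningAbortableTask: boolean;
  averageRunTime: number; // stands in for worker.histogram?.average || 0
}

// Same weights as the weighted balancer above; lower is better.
function weightedScore(w: WorkerLike): number {
  return w.currentUsage * 1.0 + w.averageRunTime * 0.1 + (w.isRunningAbortableTask ? 0.5 : 0);
}

const samples: WorkerLike[] = [
  { id: 1, currentUsage: 1, isRunningAbortableTask: false, averageRunTime: 20 }, // 1 + 2.0 + 0   = 3.0
  { id: 2, currentUsage: 2, isRunningAbortableTask: false, averageRunTime: 5 }, //  2 + 0.5 + 0   = 2.5
  { id: 3, currentUsage: 1, isRunningAbortableTask: true, averageRunTime: 12 }, //  1 + 1.2 + 0.5 = 2.7
];

const best = samples.reduce((a, b) => (weightedScore(b) < weightedScore(a) ? b : a));
console.log(best.id); // 2
```

Worker 2 wins despite carrying the most tasks, because its faster history outweighs the extra load. If that is not the behavior you want, lower the 0.1 weight or normalize the average before adding it to the score.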

Monitoring Load Balancing

Track load balancing effectiveness using worker metrics:

import { Piscina } from "piscina";

const pool = new Piscina({
  filename: "worker.js",
  maxThreads: 4,
  workerHistogram: true
});

// Track the worker handles passed to pool events. pool.threads exposes the
// underlying worker_threads Worker objects, which do not carry currentUsage.
const trackedWorkers = new Map<number, { currentUsage: number }>();

// Track worker creation/destruction
pool.on('workerCreate', (worker) => {
  trackedWorkers.set(worker.id, worker);
  console.log(`Created worker ${worker.id}, total: ${pool.threads.length}`);
});

pool.on('workerDestroy', (worker) => {
  trackedWorkers.delete(worker.id);
  console.log(`Destroyed worker ${worker.id}, total: ${pool.threads.length}`);
});

// Monitor worker distribution
setInterval(() => {
  console.log('\nWorker Load Distribution:');
  for (const [id, worker] of trackedWorkers) {
    console.log(`Worker ${id}: ${worker.currentUsage} tasks`);
  }
  
  console.log(`Queue size: ${pool.queueSize}`);
  console.log(`Pool utilization: ${(pool.utilization * 100).toFixed(2)}%`);
}, 5000);
