
tessl/npm-bull

Redis-based job queue library for Node.js that provides reliable job processing with features including job scheduling, delayed execution, repeatable jobs, priority handling, and automatic retries.


Job State and Control

Individual job state management, progress tracking, retry logic, and job manipulation.

Capabilities

Job Properties

Access job data and metadata.

class Job {
  /** Unique job identifier */
  readonly id: string | number;
  /** Job name/type */
  readonly name: string;
  /** Job payload data */
  readonly data: any;
  /** Job configuration options */
  readonly opts: JobOptions; 
  /** Reference to parent queue */
  readonly queue: Queue;
  /** Job creation timestamp */
  readonly timestamp: number;
  /** Processing start timestamp (if started) */
  readonly processedOn?: number;
  /** Completion timestamp (if completed) */
  readonly finishedOn?: number;
  /** Number of processing attempts made */
  readonly attemptsMade: number;
  /** Failure reason (if failed) */
  readonly failedReason?: string;
  /** Error stack traces from failed attempts */
  readonly stacktrace: string[];
  /** Return value from successful completion */
  readonly returnvalue: any;
  /** Timestamp when job was retried (if retried) */
  readonly retriedOn?: number;
  /** Debounce identifier for the job (if debounced) */
  readonly debounceId?: string;
}
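
The timestamp properties above can be combined to derive how long a job waited and how long it ran. The following is a minimal sketch, assuming all three timestamps are milliseconds since the Unix epoch; `summarizeJob` and `exampleJob` are illustrative names, not part of Bull.

```javascript
// Summarize timing metadata from a Bull job, or from any object with the same fields.
// Assumption: timestamp, processedOn and finishedOn are milliseconds since the epoch.
function summarizeJob(job) {
  const started = job.processedOn != null;
  const finished = started && job.finishedOn != null;

  return {
    id: job.id,
    name: job.name,
    attemptsMade: job.attemptsMade,
    // Time between creation and the start of processing (null if not started)
    waitedMs: started ? job.processedOn - job.timestamp : null,
    // Time between the start of processing and the finish (null if not finished)
    ranMs: finished ? job.finishedOn - job.processedOn : null,
    failed: Boolean(job.failedReason),
  };
}

// Stand-in object for illustration; a real job would come from queue.getJob(id).
const exampleJob = {
  id: 1,
  name: 'resize',
  attemptsMade: 1,
  timestamp: 1000,
  processedOn: 1500,
  finishedOn: 4500,
};
console.log(summarizeJob(exampleJob));
```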

Job State Queries

Check current job state with boolean methods.

/**
 * Check if job is completed
 * @returns Promise that resolves to true if job is completed
 */
isCompleted(): Promise<boolean>;

/**
 * Check if job has failed
 * @returns Promise that resolves to true if job has failed
 */
isFailed(): Promise<boolean>;

/**
 * Check if job is delayed
 * @returns Promise that resolves to true if job is delayed
 */
isDelayed(): Promise<boolean>;

/**
 * Check if job is currently active/processing
 * @returns Promise that resolves to true if job is active
 */
isActive(): Promise<boolean>;

/**
 * Check if job is waiting to be processed
 * @returns Promise that resolves to true if job is waiting
 */
isWaiting(): Promise<boolean>;

/**
 * Check if job is paused
 * @returns Promise that resolves to true if job is paused
 */
isPaused(): Promise<boolean>;

/**
 * Check if job is stuck/stalled
 * @returns Promise that resolves to true if job is stuck
 */
isStuck(): Promise<boolean>;

/**
 * Check if job is discarded (won't be retried)
 * @returns Boolean indicating if job is discarded
 */
isDiscarded(): boolean;

/**
 * Get the current job state
 * @returns Promise that resolves to current JobStatus or 'stuck'
 */
getState(): Promise<JobStatus | 'stuck'>;

type JobStatus = 'completed' | 'waiting' | 'active' | 'delayed' | 'failed' | 'paused';

Usage Examples:

const Queue = require('bull');
const taskQueue = new Queue('tasks');

// Check job states
const job = await taskQueue.getJob(123);
if (job) {
  const isComplete = await job.isCompleted();
  const isFailed = await job.isFailed();
  const isActive = await job.isActive();
  
  console.log(`Job ${job.id} - Completed: ${isComplete}, Failed: ${isFailed}, Active: ${isActive}`);
  
  // Get detailed state
  const state = await job.getState();
  console.log(`Current state: ${state}`);
}

// Monitor job state changes
const newJob = await taskQueue.add('monitor-me', { data: 'test' });

// Poll job state
const checkState = setInterval(async () => {
  const state = await newJob.getState();
  console.log(`Job ${newJob.id} state: ${state}`);
  
  if (state === 'completed' || state === 'failed') {
    clearInterval(checkState);
  }
}, 1000);
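
The `setInterval` loop above never stops if the job stays in a non-terminal state, and its async callbacks can overlap when a state lookup is slow. One possible alternative is a sequential polling helper with a deadline. This is a sketch only: `waitForTerminalState`, its option names, and the default timings are assumptions, and the helper relies on nothing but the `getState()` method described above.

```javascript
// States after which polling can stop.
const TERMINAL_STATES = new Set(['completed', 'failed']);

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Poll job.getState() one call at a time until the job completes, fails,
// or the deadline passes. Rejects with an Error on timeout.
async function waitForTerminalState(job, { intervalMs = 1000, timeoutMs = 30000 } = {}) {
  const deadline = Date.now() + timeoutMs;

  for (;;) {
    const state = await job.getState();
    if (TERMINAL_STATES.has(state)) {
      return state;
    }
    // Give up if the next poll would land after the deadline.
    if (Date.now() + intervalMs > deadline) {
      throw new Error(`Job ${job.id} is still '${state}' after ${timeoutMs} ms`);
    }
    await sleep(intervalMs);
  }
}

// Usage inside an async function, with a real Bull job (requires Redis):
//   const finalState = await waitForTerminalState(newJob, { timeoutMs: 60000 });
```

When only the outcome matters, `job.finished()` (described under Job Completion Tracking) avoids polling altogether.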

Job Progress Tracking

Track and update job progress during processing.

/**
 * Get current job progress
 * @returns Current progress value (any type)
 */
progress(): any;

/**
 * Update job progress (call from within job processor)
 * @param value - Progress value (number, object, or any data)
 * @returns Promise that resolves when progress is updated
 */
progress(value: any): Promise<void>;

Usage Examples:

// Inside a job processor
taskQueue.process('file-upload', async (job) => {
  const { fileUrl, chunks } = job.data;
  
  // Initial progress
  await job.progress(0);
  
  for (let i = 0; i < chunks.length; i++) {
    await processChunk(chunks[i]);
    
    // Update progress as percentage
    const progressPercent = Math.round((i + 1) / chunks.length * 100);
    await job.progress(progressPercent);
  }
  
  return { uploaded: true, chunks: chunks.length };
});

// Monitor progress from outside
taskQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});

// Get current progress of an existing job
const job = await taskQueue.getJob(456);
if (job) {
  // Calling progress() with no argument returns the last reported value
  const currentProgress = job.progress();
  console.log(`Current progress: ${currentProgress}`);
}
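
Because `progress(value)` accepts any value, a processor can report a structured object instead of a bare percentage, and it can skip writes that would not change the visible percentage. A minimal sketch follows; `createProgressReporter` and the `{ done, total, percent }` shape are assumptions made for illustration.

```javascript
// Build a reporter that writes progress only when the whole-number percentage changes.
// The { done, total, percent } payload is this sketch's own convention.
function createProgressReporter(job, total) {
  if (!(total > 0)) {
    throw new RangeError('total must be a positive number');
  }
  let lastPercent = -1;

  // Resolves to true if progress was written, false if the update was skipped.
  return async function report(done) {
    const percent = Math.floor((done / total) * 100);
    if (percent === lastPercent) {
      return false;
    }
    lastPercent = percent;
    await job.progress({ done, total, percent });
    return true;
  };
}

// Usage inside a processor (requires Redis):
//   const report = createProgressReporter(job, job.data.chunks.length);
//   await report(i + 1);
```

With this shape, a `progress` listener would presumably receive the object rather than a number, so it should read `progress.percent` instead of printing the value directly.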

Job Control Operations

Control job lifecycle and state transitions.

/**
 * Remove job from queue and all associated data
 * @returns Promise that resolves when job is removed
 */
remove(): Promise<void>;

/**
 * Retry a failed job
 * @returns Promise that resolves when job is scheduled for retry
 */
retry(): Promise<void>;

/**
 * Discard job to prevent further retry attempts
 * @returns Promise that resolves when job is discarded
 */
discard(): Promise<void>;

/**
 * Promote a delayed job to waiting state
 * @returns Promise that resolves when job is promoted
 */
promote(): Promise<void>;

/**
 * Update job data
 * @param data - New job data to replace existing data
 * @returns Promise that resolves when data is updated
 */
update(data: any): Promise<void>;

Usage Examples:

const job = await taskQueue.getJob(789);

if (job) {
  const state = await job.getState();
  
  switch (state) {
    case 'failed':
      // Retry failed job
      console.log(`Retrying failed job ${job.id}`);
      await job.retry();
      break;
      
    case 'delayed':
      // Promote delayed job to run immediately
      console.log(`Promoting delayed job ${job.id}`);
      await job.promote();
      break;
      
    case 'waiting': {
      // Update job data (braces scope the const declaration to this case)
      const updatedData = { ...job.data, priority: 'high' };
      await job.update(updatedData);
      break;
    }
      
    case 'completed':
      // Remove completed job
      console.log(`Removing completed job ${job.id}`);
      await job.remove();
      break;
  }
}

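// Discard a job from inside its processor so that a failure is not retried.
// Assumption: discard() marks only this in-memory job instance, so it should be
// called before the processor throws, not on a job that has already failed.
taskQueue.process('no-retry-task', async (job) => {
  if (!job.data.payload) {
    await job.discard();
    throw new Error('Missing payload, job will not be retried');
  }
  // handlePayload is a placeholder for application code
  return handlePayload(job.data.payload);
});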
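
When several failed jobs need another attempt, the `isFailed()` check and the `retry()` call can be wrapped in a small helper. The sketch below is one way to do it; `retryIfFailed`, `retryAll`, and the `maxAttempts` cap are illustrative names and a caller-side policy, not Bull options.

```javascript
// Retry a job only if it is currently failed and under the caller's attempt cap.
// maxAttempts is this sketch's own policy knob, not a Bull setting.
async function retryIfFailed(job, maxAttempts = 5) {
  if (!(await job.isFailed())) {
    return 'skipped: not failed';
  }
  if (job.attemptsMade >= maxAttempts) {
    return 'skipped: attempt cap reached';
  }
  await job.retry();
  return 'retried';
}

// Apply retryIfFailed to each job in turn and report the outcome per job ID.
// Jobs are handled sequentially to keep the load on Redis predictable.
async function retryAll(jobs, maxAttempts = 5) {
  const outcomes = {};
  for (const job of jobs) {
    outcomes[job.id] = await retryIfFailed(job, maxAttempts);
  }
  return outcomes;
}

// Usage inside an async function (requires Redis):
//   const outcomes = await retryAll(await taskQueue.getFailed());
```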

Job Completion Tracking

Wait for job completion and handle results.

/**
 * Wait for job to finish and get the result
 * @returns Promise that resolves to job result or rejects with error
 */
finished(): Promise<any>;

Usage Examples:

// Create job and wait for completion
const job = await taskQueue.add('process-data', { 
  data: 'important data' 
});

try {
  // Wait for job to complete
  const result = await job.finished();
  console.log(`Job ${job.id} completed with result:`, result);
} catch (error) {
  console.error(`Job ${job.id} failed:`, error.message);
}

// Multiple jobs with Promise.all
const jobs = await Promise.all([
  taskQueue.add('task1', { id: 1 }),
  taskQueue.add('task2', { id: 2 }),
  taskQueue.add('task3', { id: 3 })
]);

// Wait for all jobs to complete
try {
  const results = await Promise.all(
    jobs.map(job => job.finished())
  );
  console.log('All jobs completed:', results);
} catch (error) {
  console.error('One or more jobs failed:', error);
}
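
`Promise.all` rejects as soon as one job fails, so the results of the jobs that succeeded are lost. If every outcome is needed, `Promise.allSettled` can be used instead. The helper below is a sketch; `collectResults` and the shape of its return value are assumptions.

```javascript
// Wait for every job to finish and split the outcomes into successes and failures.
async function collectResults(jobs) {
  const settled = await Promise.allSettled(jobs.map((job) => job.finished()));

  const succeeded = [];
  const failed = [];
  settled.forEach((outcome, index) => {
    const id = jobs[index].id;
    if (outcome.status === 'fulfilled') {
      succeeded.push({ id, result: outcome.value });
    } else {
      // The rejection reason is usually an Error, but fall back to a string.
      const reason =
        outcome.reason instanceof Error ? outcome.reason.message : String(outcome.reason);
      failed.push({ id, reason });
    }
  });

  return { succeeded, failed };
}

// Usage inside an async function (requires Redis):
//   const { succeeded, failed } = await collectResults(jobs);
```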

Job Logging

Add log entries to jobs for debugging and monitoring.

/**
 * Add a log entry to the job
 * @param row - Log message string
 * @returns Promise that resolves when log is added
 */
log(row: string): Promise<any>;

Usage Examples:

// Inside job processor
taskQueue.process('detailed-task', async (job) => {
  await job.log('Task started');
  
  try {
    await job.log('Step 1: Validating input');
    validateInput(job.data);
    
    await job.log('Step 2: Processing data');
    const result = await processData(job.data);
    
    await job.log('Step 3: Saving results');
    await saveResults(result);
    
    await job.log('Task completed successfully');
    return result;
    
  } catch (error) {
    await job.log(`Error: ${error.message}`);
    throw error;
  }
});

// View job logs for a known job ID (see Queue Monitoring documentation)
const jobId = 123;
const { logs, count } = await taskQueue.getJobLogs(jobId);
console.log(`Job ${jobId} has ${count} log entries:`, logs);
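
The start, success, and failure log lines in the processor above follow a repeating pattern, which can be factored into a wrapper. This is a sketch under the assumption that only `job.log()` is needed; `logStep` and its message format are illustrative.

```javascript
// Run one step of a job, logging when it starts, how long it took, and any failure.
// fn may be synchronous or return a promise; errors are logged and rethrown.
async function logStep(job, label, fn) {
  await job.log(`${label}: started`);
  const startedAt = Date.now();
  try {
    const result = await fn();
    await job.log(`${label}: done in ${Date.now() - startedAt} ms`);
    return result;
  } catch (error) {
    await job.log(`${label}: failed (${error.message})`);
    throw error;
  }
}

// Usage inside a processor (requires Redis):
//   const result = await logStep(job, 'process', () => processData(job.data));
```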

Advanced Job State Operations

Advanced job state manipulation for complex scenarios.

/**
 * Move job to completed state manually
 * @param returnValue - Value to set as job result
 * @param ignoreLock - Whether to ignore job lock
 * @param notFetch - Whether to skip fetching next job
 * @returns Promise that resolves to next job data or null
 */
moveToCompleted(returnValue?: string, ignoreLock?: boolean, notFetch?: boolean): Promise<[any, string | number] | null>;

/**
 * Move job to failed state manually
 * @param errorInfo - Error information object
 * @param ignoreLock - Whether to ignore job lock
 * @returns Promise that resolves to next job data or null
 */
moveToFailed(errorInfo: { message: string }, ignoreLock?: boolean): Promise<[any, string | number] | null>;

/**
 * Move job to delayed state with specific timestamp
 * @param timestamp - Time at which the job should be processed, in milliseconds since the Unix epoch
 * @param ignoreLock - Whether to ignore job lock
 * @returns Promise that resolves when job is moved to delayed state
 */
moveToDelayed(timestamp: number, ignoreLock?: boolean): Promise<any>;

Usage Examples:

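// Manual job state management (advanced usage).
// A job can be moved to only one final state, so the two calls below
// operate on different jobs.
const jobToComplete = await taskQueue.getJob(123);
if (jobToComplete) {
  // Passing true for ignoreLock skips the lock check
  await jobToComplete.moveToCompleted('Success', true);
}

const jobToFail = await taskQueue.getJob(124);
if (jobToFail) {
  await jobToFail.moveToFailed({ message: 'Manual failure' }, true);
}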
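
`moveToDelayed` takes an absolute timestamp rather than a relative delay, so callers usually need to add the delay to the current time. The sketch below shows that conversion. It assumes the timestamp is in milliseconds since the Unix epoch (as returned by `Date.now()`) and that the job is in a state from which it can be moved; `postpone` and its options are illustrative names.

```javascript
// Move a job to the delayed state so that it runs delayMs from now.
// Assumption: moveToDelayed expects an absolute time in epoch milliseconds.
async function postpone(job, delayMs, { ignoreLock = false, now = Date.now() } = {}) {
  if (!Number.isFinite(delayMs) || delayMs < 0) {
    throw new RangeError('delayMs must be a non-negative number');
  }
  const runAt = now + delayMs;
  await job.moveToDelayed(runAt, ignoreLock);
  return runAt;
}

// Usage inside an async function (requires Redis):
//   const runAt = await postpone(job, 5 * 60 * 1000); // five minutes from now
```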

Job Locking

Manage job locks for coordination between workers.

/**
 * Take a lock on the job
 * @returns Promise that resolves to lock identifier or false if failed
 */
takeLock(): Promise<number | false>;

/**
 * Release the job lock
 * @returns Promise that resolves when lock is released
 */
releaseLock(): Promise<void>;

/**
 * Extend the job lock duration
 * @param duration - Lock extension duration in milliseconds
 * @returns Promise that resolves to new lock expiration time
 */
extendLock(duration: number): Promise<number>;

/**
 * Get the lock key for this job
 * @returns Lock key string
 */
lockKey(): string;

Usage Examples:

// Advanced job locking (typically not needed in normal usage)
const job = await taskQueue.getJob(456);

// Take lock
const lockId = await job.takeLock();
if (lockId) {
  console.log(`Acquired lock ${lockId} for job ${job.id}`);
  
  try {
    // Perform work while holding lock
    await performWork(job.data);
    
    // Extend lock if needed
    await job.extendLock(30000); // Extend by 30 seconds
    
  } finally {
    // Always release lock
    await job.releaseLock();
  }
} else {
  console.log(`Could not acquire lock for job ${job.id}`);
}
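
If the work can outlast the lock, a single `extendLock` call after the work finishes may come too late. One option is to extend the lock on a timer while the work runs. The following is a sketch only: `withLockRenewal`, its option names, and the default durations are assumptions, and in normal usage manual locking like this is typically not needed.

```javascript
// Take the job lock, keep extending it on a timer while work() runs,
// and always release it afterwards.
async function withLockRenewal(job, work, { lockMs = 30000, renewEveryMs = 15000 } = {}) {
  const lock = await job.takeLock();
  if (!lock) {
    return { acquired: false };
  }

  // Renew in the background; a failed renewal is reported but does not stop the work.
  const timer = setInterval(() => {
    job.extendLock(lockMs).catch((error) => {
      console.error(`Could not extend lock for job ${job.id}: ${error.message}`);
    });
  }, renewEveryMs);

  try {
    return { acquired: true, result: await work(job) };
  } finally {
    clearInterval(timer);
    await job.releaseLock();
  }
}

// Usage inside an async function (requires Redis):
//   const outcome = await withLockRenewal(job, (j) => performWork(j.data));
```

Renewing at half the lock duration, as in the defaults here, leaves room for one missed renewal before the lock would expire.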

Job Serialization

Convert job to JSON representation.

/**
 * Convert job to JSON object
 * @returns Job data as plain object
 */
toJSON(): {
  id: string | number;
  name: string;
  data: any;
  opts: JobOptions;
  progress: number;
  delay: number;
  timestamp: number;
  attemptsMade: number;
  failedReason: any;
  stacktrace: string[] | null;
  returnvalue: any;
  finishedOn: number | null;
  processedOn: number | null;
};

Usage Examples:

const job = await taskQueue.getJob(789);
const jobData = job.toJSON();

console.log('Job as JSON:', jobData);

// Save job state for external processing
const jobState = {
  ...jobData,
  queueName: job.queue.name,
  currentState: await job.getState()
};

await saveJobState(jobState);
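
Because `JSON.stringify` calls an object's `toJSON()` method automatically, a job can be serialized directly, and properties outside the `toJSON()` output (such as the `queue` reference) are never traversed. A short sketch, with `snapshotJob` as an illustrative name:

```javascript
// Serialize a job together with extra context.
// JSON.stringify invokes job.toJSON() itself, so there is no need to call it first.
function snapshotJob(job, extra = {}) {
  return JSON.stringify({ ...extra, job }, null, 2);
}

// Usage inside an async function (requires Redis):
//   const text = snapshotJob(job, { currentState: await job.getState() });
```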

Install with Tessl CLI

npx tessl i tessl/npm-bull@4.16.2
