Complex workflow management with job dependencies across multiple queues. The FlowProducer class enables sophisticated pipeline architectures with conditional execution and dependency management.
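The FlowProducer manages complex job flows with dependencies between jobs across multiple queues: a parent job is only processed after all of its children have completed successfully.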
/**
 * Producer for job flows: trees of jobs, possibly spread over several
 * queues, that are linked through parent/child dependencies.
 */
class FlowProducer {
  constructor(opts?: FlowProducerOptions);

  /** Adds a single flow (a root job together with its nested children). */
  add(flow: FlowJob, opts?: FlowOpts): Promise<JobNode>;

  /** Adds several flows in one call. */
  addBulk(flows: FlowJob[], opts?: FlowOpts): Promise<JobNode[]>;

  /** Retrieves the flow tree for the job identified by `id` in `queueName`. */
  getFlow(opts: { id: string; queueName: string }): Promise<FlowNode>;

  /** Closes the flow producer and its connections. */
  close(): Promise<void>;
}

/**
 * One job of a flow, together with the jobs it depends on.
 */
interface FlowJob {
  /** Name (type) of the job */
  name: string;
  /** Name of the queue the job is added to */
  queueName: string;
  /** Payload handed to the job */
  data?: any;
  /** Options applied to the job */
  opts?: JobsOptions;
  /** Jobs this job depends on, each described as a nested FlowJob */
  children?: FlowJob[];
}

/**
 * Options accepted by the FlowProducer constructor.
 */
interface FlowProducerOptions {
  /** Options for the Redis connection */
  connection?: ConnectionOptions;
  /** Prefix prepended to the Redis keys */
  prefix?: string;
}
/**
 * Options accepted as the second argument of `FlowProducer.add` and
 * `FlowProducer.addBulk`.
 */
interface FlowOpts {
  /** Parent options applied to the root job of the flow */
  parent?: ParentOptions;
  /** Queue option overrides, one entry per string key */
  queuesOptions?: Record<string, QueueOptions>;
}

Usage Examples:
import { FlowProducer } from "bullmq";

// Flow producer talking to a Redis instance on localhost:6379
const flowProducer = new FlowProducer({
  connection: { host: "localhost", port: 6379 },
});
// Simple linear flow: every job has exactly one child.
// Children are processed before their parent, so the innermost job
// ("send-confirmation") runs first and the root ("process-order") runs last.
const linearFlow = {
  name: "process-order",
  queueName: "orders",
  data: { orderId: "order-123" },
  children: [
    {
      name: "validate-payment",
      queueName: "payments",
      data: { orderId: "order-123" },
      children: [
        {
          name: "send-confirmation",
          queueName: "notifications",
          data: { orderId: "order-123", type: "confirmation" },
        },
      ],
    },
  ],
};

// add() resolves to the node of the root job once the flow has been stored
const jobNode = await flowProducer.add(linearFlow);
console.log("Flow created with root job:", jobNode.job.id);
// Complex branching flow: a root job with three direct children, one of
// which has two children of its own.
const complexFlow = {
  name: "process-user-signup",
  queueName: "users",
  data: { userId: "user-456", email: "user@example.com" },
  children: [
    // Sibling branches, independent of each other
    {
      name: "send-welcome-email",
      queueName: "emails",
      data: { userId: "user-456", template: "welcome" },
    },
    {
      name: "create-user-profile",
      queueName: "profiles",
      data: { userId: "user-456" },
      // Nested dependencies of the profile job
      children: [
        {
          name: "generate-avatar",
          queueName: "images",
          data: { userId: "user-456" },
        },
        {
          name: "setup-preferences",
          queueName: "preferences",
          data: { userId: "user-456", defaults: true },
        },
      ],
    },
    {
      name: "add-to-mailing-list",
      queueName: "marketing",
      data: { email: "user@example.com", list: "new-users" },
    },
  ],
};
await flowProducer.add(complexFlow);

/**
 * Node of the tree returned when a flow is added: the job itself plus,
 * optionally, the nodes of its children.
 */
interface JobNode {
  job: Job;
  children?: JobNode[];
}

/**
 * Node of the complete flow tree returned by `getFlow`.
 */
interface FlowNode {
  job: Job;
  children: FlowNode[];
  opts: any;
}

// E-commerce order processing flow
// Deeply nested order flow.
// NOTE: BullMQ processes children BEFORE their parent, so the jobs below run
// from the deepest level upwards: label/pickup first, then shipment,
// confirmation and analytics, then payment, then the inventory check, and the
// root "process-order" job last.
// NOTE(review): if the intended business order is inventory -> payment ->
// fulfillment, the nesting has to be inverted.
const ecommerceFlow = {
  name: "process-order",
  queueName: "orders",
  data: {
    orderId: "ORD-001",
    items: [{ id: "ITEM-1", quantity: 2 }],
    userId: "USER-123",
  },
  opts: {
    attempts: 3,
    // backoff must be a number or a { type, delay } object; a bare strategy
    // name such as "exponential" is not a valid value.
    backoff: { type: "exponential", delay: 1000 },
  },
  children: [
    // Inventory check: the root's only child, so it is the last job to
    // complete before the root is processed
    {
      name: "check-inventory",
      queueName: "inventory",
      data: { orderId: "ORD-001" },
      children: [
        // Payment processing: child of the inventory check, so it completes
        // before that check starts
        {
          name: "process-payment",
          queueName: "payments",
          data: { orderId: "ORD-001" },
          opts: {
            attempts: 5,
            backoff: { type: "exponential", delay: 2000 },
          },
          children: [
            // Fulfillment tasks: siblings can be processed in parallel, and
            // all of them must complete before payment processing starts
            {
              name: "create-shipment",
              queueName: "shipping",
              data: { orderId: "ORD-001" },
              children: [
                {
                  name: "print-label",
                  queueName: "printing",
                  data: { orderId: "ORD-001" },
                },
                {
                  name: "schedule-pickup",
                  queueName: "logistics",
                  data: { orderId: "ORD-001" },
                },
              ],
            },
            {
              name: "send-confirmation",
              queueName: "notifications",
              data: {
                orderId: "ORD-001",
                userId: "USER-123",
                type: "order-confirmed",
              },
            },
            {
              name: "update-analytics",
              queueName: "analytics",
              data: {
                event: "order-completed",
                orderId: "ORD-001",
              },
            },
          ],
        },
      ],
    },
  ],
};
await flowProducer.add(ecommerceFlow);

// Create multiple flows in a single operation
// One onboarding flow per user; each flow has a single welcome-series child.
const userOnboardingFlows = ["user-001", "user-002"].map((userId) => ({
  name: "onboard-user",
  queueName: "onboarding",
  data: { userId },
  children: [
    {
      name: "send-welcome-series",
      queueName: "emails",
      data: { userId, series: "welcome" },
    },
  ],
}));

// addBulk() resolves to one root node per submitted flow
const jobNodes = await flowProducer.addBulk(userOnboardingFlows);
console.log(`Created ${jobNodes.length} flows`);

// Get flow structure
// Fetch the tree for the job "root-job-id" stored in the "orders" queue
const flow = await flowProducer.getFlow({ id: "root-job-id", queueName: "orders" });
// Traverse flow tree
/**
 * Prints a flow tree to the console in depth-first order, one job per line,
 * as `<indent><name> (<id>) - <state>`.
 *
 * @param node - Node to print; its descendants are printed after it.
 * @param depth - Nesting level of `node`; one space of indent per level.
 * @returns A promise that resolves once the whole subtree has been printed.
 */
async function printFlowTree(node: FlowNode, depth = 0): Promise<void> {
  const indent = " ".repeat(depth);
  // getState() is awaited, so the enclosing function has to be async
  const state = await node.job.getState();
  console.log(`${indent}${node.job.name} (${node.job.id}) - ${state}`);
  // Await each child in turn so lines come out in tree order; tolerate nodes
  // that were returned without a `children` array.
  for (const child of node.children ?? []) {
    await printFlowTree(child, depth + 1);
  }
}
// Print the flow fetched above, starting at the root
await printFlowTree(flow);

// Flow with error handling strategies
// Flow that combines retry settings with per-child failure policies.
// The failure policies are set on the CHILD job's own `opts`.
const robustFlow = {
  name: "data-pipeline",
  queueName: "pipeline",
  data: { datasetId: "DS-001" },
  opts: {
    attempts: 3,
    // backoff must be a number or a { type, delay } object; a bare strategy
    // name such as "exponential" is not a valid value.
    backoff: { type: "exponential", delay: 1000 },
  },
  children: [
    {
      name: "extract-data",
      queueName: "extraction",
      data: { datasetId: "DS-001" },
      opts: {
        attempts: 5, // Data extraction might be flaky
        backoff: { type: "fixed", delay: 30000 },
      },
      children: [
        {
          name: "transform-data",
          queueName: "transformation",
          data: { datasetId: "DS-001" },
          opts: {
            // Fail the parent ("extract-data") if transformation fails.
            // This is a top-level job option, not a field of `parent`.
            failParentOnFailure: true,
          },
          children: [
            {
              name: "load-to-warehouse",
              queueName: "loading",
              data: { datasetId: "DS-001" },
            },
            {
              name: "generate-report",
              queueName: "reporting",
              data: { datasetId: "DS-001" },
              opts: {
                // Continue even if reporting fails
                ignoreDependencyOnFailure: true,
              },
            },
          ],
        },
      ],
    },
  ],
};
await flowProducer.add(robustFlow);

Workers process jobs in flows just like regular jobs, but can access parent/child relationships:

import { Worker } from "bullmq";

// Redis connection shared by the workers below
const workerConnection = { host: "localhost", port: 6379 };

// Worker for parent jobs
const orderWorker = new Worker(
  "orders",
  async (job) => {
    console.log(`Processing order ${job.data.orderId}`);
    // A parent job is only handed to its worker after all of its children
    // have completed, so the child results already exist at this point.
    const orderResult = await processOrder(job.data);
    return orderResult;
  },
  { connection: workerConnection },
);

// Worker for child jobs
const paymentWorker = new Worker(
  "payments",
  async (job) => {
    console.log(`Processing payment for order ${job.data.orderId}`);
    // Access to parent job context if needed
    if (job.opts.parent) {
      console.log(`Parent job: ${job.opts.parent.id}`);
    }
    const paymentResult = await processPayment(job.data);
    return paymentResult;
  },
  { connection: workerConnection },
);

// Workers start processing as soon as they are constructed (`autorun`
// defaults to true). Do not call run() on them: it throws
// "Worker is already running." for a worker that has already started.
// Pass `autorun: false` in the worker options to start one manually instead.

// Close flow producer when done
await flowProducer.close();

// Flow producers emit events
flowProducer.on("error", (err) => {
  console.error("Flow producer error:", err);
});

/**
 * Flags describing how the failure of a child job affects its parent.
 */
interface ParentJobOptions {
  /** When true, the parent job fails if the child fails */
  failParentOnFailure?: boolean;
  /** When true, the parent continues processing even though the child failed */
  continueParentOnFailure?: boolean;
  /** When true, a failed dependency is ignored */
  ignoreDependencyOnFailure?: boolean;
  /** When true, the dependency is removed once it fails */
  removeDependencyOnFailure?: boolean;
}