or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md, error-handling.md, event-monitoring.md, flow-orchestration.md, index.md, job-lifecycle.md, job-processing.md, job-scheduling.md, queue-management.md
tile.json

docs/flow-orchestration.md

Flow Orchestration

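Complex workflow management with job dependencies across multiple queues. The FlowProducer class enables sophisticated pipeline architectures with conditional execution and dependency management. In a flow, a parent job is processed only after all of its children have completed, so the root job of a tree is the last one to run.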

Capabilities

FlowProducer Class

Manages complex job flows with dependencies between jobs across multiple queues.

/**
 * FlowProducer manages complex job workflows with dependencies.
 *
 * This is a declaration-only API summary: each member is listed by signature
 * and no implementation is shown.
 */
class FlowProducer {
  /**
   * @param opts - Optional producer settings (see FlowProducerOptions).
   */
  constructor(opts?: FlowProducerOptions);
  
  /**
   * Add a flow with job dependencies.
   * @param flow - Root job definition; dependent jobs are nested under `children`.
   * @param opts - Optional flow-level settings.
   * @returns A promise for the node of the flow's root job.
   */
  add(flow: FlowJob, opts?: FlowOpts): Promise<JobNode>;
  
  /**
   * Add multiple flows in bulk.
   * @param flows - One root job definition per flow.
   * @param opts - Optional flow-level settings.
   * @returns A promise for one node per flow passed in.
   */
  addBulk(flows: FlowJob[], opts?: FlowOpts): Promise<JobNode[]>;
  
  /**
   * Get flow tree structure.
   * @param opts - Identifies the flow by its root job `id` and that job's `queueName`.
   * @returns A promise for the flow tree rooted at that job.
   */
  getFlow(opts: { id: string; queueName: string }): Promise<FlowNode>;
  
  /** Close flow producer and connections */
  close(): Promise<void>;
}

Flow Job Definition

/**
 * Defines a job within a flow with its dependencies.
 *
 * The shape is recursive: every entry in `children` is itself a FlowJob, so a
 * single root definition describes the whole tree. Only `name` and
 * `queueName` are required.
 */
interface FlowJob {
  /** Job name/type */
  name: string;
  
  /** Target queue name; each job in a flow may target a different queue */
  queueName: string;
  
  /** Job data payload */
  data?: any;
  
  /** Job options */
  opts?: JobsOptions;
  
  /**
   * Child job definitions (dependencies).
   * NOTE(review): in BullMQ the children are processed before this job, and
   * this job only starts once all of them have completed — confirm against
   * the BullMQ flows guide.
   */
  children?: FlowJob[];
}

Flow Options

interface FlowProducerOptions {
  /** Redis connection options */
  connection?: ConnectionOptions;
  
  /** Key prefix for Redis keys */
  prefix?: string;
}

interface FlowOpts {
  /** Parent job options for flow root */
  parent?: ParentOptions;
  
  /** Queue options override */
  queuesOptions?: Record<string, QueueOptions>;
}

Usage Examples:

import { FlowProducer } from "bullmq";

// Create flow producer
const flowProducer = new FlowProducer({
  connection: {
    host: "localhost",
    port: 6379,
  },
});

// Simple linear flow
const linearFlow = {
  name: "process-order",
  queueName: "orders",
  data: { orderId: "order-123" },
  children: [
    {
      name: "validate-payment",
      queueName: "payments",
      data: { orderId: "order-123" },
      children: [
        {
          name: "send-confirmation",
          queueName: "notifications",
          data: { orderId: "order-123", type: "confirmation" },
        },
      ],
    },
  ],
};

const jobNode = await flowProducer.add(linearFlow);
console.log("Flow created with root job:", jobNode.job.id);

// Complex branching flow
const complexFlow = {
  name: "process-user-signup",
  queueName: "users",
  data: { userId: "user-456", email: "user@example.com" },
  children: [
    // Parallel branches
    {
      name: "send-welcome-email",
      queueName: "emails",
      data: { userId: "user-456", template: "welcome" },
    },
    {
      name: "create-user-profile",
      queueName: "profiles",
      data: { userId: "user-456" },
      children: [
        {
          name: "generate-avatar",
          queueName: "images",
          data: { userId: "user-456" },
        },
        {
          name: "setup-preferences",
          queueName: "preferences",
          data: { userId: "user-456", defaults: true },
        },
      ],
    },
    {
      name: "add-to-mailing-list",
      queueName: "marketing",
      data: { email: "user@example.com", list: "new-users" },
    },
  ],
};

await flowProducer.add(complexFlow);

Flow Node Structure

/**
 * Represents a node in the flow tree
 */
interface JobNode {
  job: Job;
  children?: JobNode[];
}

/**
 * Complete flow tree structure
 */
interface FlowNode {
  job: Job;
  children: FlowNode[];
  opts: any;
}

Advanced Flow Patterns

// E-commerce order processing flow.
//
// NOTE: BullMQ processes a flow bottom-up. A parent job is only picked up once
// ALL of its children have completed, so in this tree the leaf jobs
// (print-label, schedule-pickup, send-confirmation, update-analytics) are
// runnable immediately and the root job (process-order) runs last.
// Model the step that must happen last as the root of the tree.
const ecommerceFlow = {
  name: "process-order",
  queueName: "orders",
  data: {
    orderId: "ORD-001",
    items: [{ id: "ITEM-1", quantity: 2 }],
    userId: "USER-123",
  },
  opts: {
    attempts: 3,
    // backoff takes a number (fixed delay in ms) or a { type, delay } object;
    // a bare strategy name such as "exponential" is not a valid value
    backoff: { type: "exponential", delay: 1000 },
  },
  children: [
    // Runs only after its child (process-payment) has completed
    {
      name: "check-inventory",
      queueName: "inventory",
      data: { orderId: "ORD-001" },
      children: [
        // Runs only after all three of its children below have completed
        {
          name: "process-payment",
          queueName: "payments",
          data: { orderId: "ORD-001" },
          opts: {
            attempts: 5,
            backoff: { type: "exponential", delay: 2000 },
          },
          children: [
            // Sibling jobs have no ordering between them and can be
            // processed in parallel by their respective workers
            {
              name: "create-shipment",
              queueName: "shipping",
              data: { orderId: "ORD-001" },
              // create-shipment waits for both of these leaf jobs
              children: [
                {
                  name: "print-label",
                  queueName: "printing",
                  data: { orderId: "ORD-001" },
                },
                {
                  name: "schedule-pickup",
                  queueName: "logistics",
                  data: { orderId: "ORD-001" },
                },
              ],
            },
            {
              name: "send-confirmation",
              queueName: "notifications",
              data: {
                orderId: "ORD-001",
                userId: "USER-123",
                type: "order-confirmed",
              },
            },
            {
              name: "update-analytics",
              queueName: "analytics",
              data: {
                event: "order-completed",
                orderId: "ORD-001",
              },
            },
          ],
        },
      ],
    },
  ],
};

await flowProducer.add(ecommerceFlow);

Bulk Flow Operations

// Create multiple flows in a single operation
const userOnboardingFlows = [
  {
    name: "onboard-user",
    queueName: "onboarding",
    data: { userId: "user-001" },
    children: [
      {
        name: "send-welcome-series",
        queueName: "emails",
        data: { userId: "user-001", series: "welcome" },
      },
    ],
  },
  {
    name: "onboard-user", 
    queueName: "onboarding",
    data: { userId: "user-002" },
    children: [
      {
        name: "send-welcome-series",
        queueName: "emails", 
        data: { userId: "user-002", series: "welcome" },
      },
    ],
  },
];

const jobNodes = await flowProducer.addBulk(userOnboardingFlows);
console.log(`Created ${jobNodes.length} flows`);

Flow Monitoring

// Get flow structure
const flow = await flowProducer.getFlow({
  id: "root-job-id",
  queueName: "orders",
});

// Traverse flow tree
/**
 * Logs one line per job in a flow tree, indented by depth, in the form
 * `<name> (<id>) - <state>`.
 *
 * @param node - Root of the (sub)tree to print.
 * @param depth - Nesting level of `node`; each level adds two spaces of indent.
 * @returns A promise that resolves once the whole subtree has been printed.
 */
async function printFlowTree(node: FlowNode, depth = 0): Promise<void> {
  const indent = "  ".repeat(depth);
  // getState() is asynchronous, so this function has to be async to await it
  const state = await node.job.getState();
  console.log(`${indent}${node.job.name} (${node.job.id}) - ${state}`);

  // Await each child in turn so lines are printed in tree order; leaf nodes
  // may come back without a children array, so fall back to an empty list
  for (const child of node.children ?? []) {
    await printFlowTree(child, depth + 1);
  }
}

await printFlowTree(flow);

Flow Error Handling

// Flow with error handling strategies.
// Failure-handling flags are set directly in a child job's `opts` and describe
// how that child's failure affects its parent.
const robustFlow = {
  name: "data-pipeline",
  queueName: "pipeline",
  data: { datasetId: "DS-001" },
  opts: {
    attempts: 3,
    // backoff takes a number (fixed delay in ms) or a { type, delay } object;
    // a bare strategy name such as "exponential" is not a valid value
    backoff: { type: "exponential", delay: 1000 },
  },
  children: [
    {
      name: "extract-data",
      queueName: "extraction",
      data: { datasetId: "DS-001" },
      opts: {
        attempts: 5, // Data extraction might be flaky
        backoff: { type: "fixed", delay: 30000 },
      },
      children: [
        {
          name: "transform-data",
          queueName: "transformation",
          data: { datasetId: "DS-001" },
          opts: {
            // Fail the parent (extract-data) if transformation fails.
            // This is a top-level job option; `opts.parent` is reserved for
            // identifying an existing parent job (id/queue), not for flags.
            failParentOnFailure: true,
          },
          children: [
            {
              name: "load-to-warehouse",
              queueName: "loading",
              data: { datasetId: "DS-001" },
            },
            {
              name: "generate-report",
              queueName: "reporting",
              data: { datasetId: "DS-001" },
              opts: {
                // Let the parent (transform-data) proceed even if reporting fails
                ignoreDependencyOnFailure: true,
              },
            },
          ],
        },
      ],
    },
  ],
};

await flowProducer.add(robustFlow);

Flow Workers

Workers process jobs in flows just like regular jobs, but can access parent/child relationships:

import { Worker } from "bullmq";

// Redis connection shared by the workers below
const workerConnection = { host: "localhost", port: 6379 };

// Worker for parent jobs
const orderWorker = new Worker(
  "orders",
  async (job) => {
    console.log(`Processing order ${job.data.orderId}`);

    // In a flow, a parent job is only picked up after all of its children
    // have completed; their return values are available through
    // job.getChildrenValues() if the parent needs them
    const orderResult = await processOrder(job.data);

    return orderResult;
  },
  // autorun: false defers processing until run() is called explicitly below
  { connection: workerConnection, autorun: false },
);

// Worker for child jobs
const paymentWorker = new Worker(
  "payments",
  async (job) => {
    console.log(`Processing payment for order ${job.data.orderId}`);

    // Access to parent job context if needed
    if (job.opts.parent) {
      console.log(`Parent job: ${job.opts.parent.id}`);
    }

    const paymentResult = await processPayment(job.data);

    return paymentResult;
  },
  { connection: workerConnection, autorun: false },
);

// Start all workers. Workers start on construction unless created with
// autorun: false, and calling run() on a worker that is already running
// throws, so explicit run() calls must be paired with autorun: false.
// run() keeps processing jobs until the worker is closed.
await Promise.all([
  orderWorker.run(),
  paymentWorker.run(),
]);

Flow Lifecycle Management

// Flow producers emit events. Register the error listener while the producer
// is still in use, before closing it, so connection problems are reported
// instead of surfacing as unhandled "error" events.
flowProducer.on("error", (error) => {
  console.error("Flow producer error:", error);
});

// Close flow producer when done
await flowProducer.close();

Parent-Child Job Options

/**
 * Flags describing how a child job's failure affects its parent.
 * All flags are optional booleans.
 *
 * NOTE(review): in BullMQ these flags appear to be set directly in the child
 * job's `opts` (alongside `attempts`, `backoff`, ...) rather than on a separate
 * object — confirm against the JobsOptions reference.
 */
interface ParentJobOptions {
  /** Fail parent job when child fails */
  failParentOnFailure?: boolean;
  
  /** Continue parent processing despite child failure */
  continueParentOnFailure?: boolean;
  
  /** Ignore failed dependencies, so the parent is not blocked by this child's failure */
  ignoreDependencyOnFailure?: boolean;
  
  /** Remove dependency when it fails, so the parent no longer waits on it */
  removeDependencyOnFailure?: boolean;
}

Flow Use Cases

  1. ETL Pipelines: Extract → Transform → Load with error handling
  2. Order Processing: Inventory → Payment → Fulfillment → Notification
  3. Content Publishing: Write → Review → Approve → Publish → Notify
  4. User Onboarding: Create Account → Send Welcome → Setup Profile → Add to Lists
  5. Data Processing: Collect → Validate → Process → Store → Index
  6. CI/CD Pipelines: Build → Test → Deploy → Verify → Notify