Testcontainers RabbitMQ

The Testcontainers RabbitMQ module creates disposable RabbitMQ message broker instances running in Docker containers for integration testing. It handles container lifecycle management, port mapping, and credential configuration, and provides helper methods for connecting to the containerized RabbitMQ instance.

Package Information

  • Package Name: @testcontainers/rabbitmq
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install @testcontainers/rabbitmq testcontainers
  • Peer Dependency: Requires testcontainers package

Core Imports

import { RabbitMQContainer, StartedRabbitMQContainer } from '@testcontainers/rabbitmq';

For CommonJS:

const { RabbitMQContainer, StartedRabbitMQContainer } = require('@testcontainers/rabbitmq');

Note: StartedRabbitMQContainer is typically returned by RabbitMQContainer.start() and rarely needs to be imported directly.

Basic Usage

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

// Start a RabbitMQ container
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();

// Get connection URL and connect
const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();

// Use RabbitMQ for your tests
await channel.assertQueue('test-queue');
channel.sendToQueue('test-queue', Buffer.from('Hello World'));

// Cleanup
await connection.close();
await container.stop();

Using with async disposables (Node.js 20+; the await using syntax also needs TypeScript 5.2+ or a runtime that supports it natively):

await using container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
// Container automatically stopped when scope exits

API Reference

RabbitMQContainer Class

class RabbitMQContainer extends GenericContainer {
  /**
   * Create a new RabbitMQ container builder
   * @param image - Docker image name (e.g., 'rabbitmq:3.13-alpine', 'rabbitmq:3.13-management')
   */
  constructor(image: string);

  /**
   * Start the RabbitMQ container
   * @returns Promise resolving to a started container instance
   * @throws Error if container fails to start, image cannot be pulled, or ports are unavailable
   */
  start(): Promise<StartedRabbitMQContainer>;
}

Default Configuration:

  • Exposed ports: 5672 (AMQP), 5671 (AMQPS), 15672 (HTTP management), 15671 (HTTPS management)
  • Default credentials: username guest, password guest
  • Wait strategy: Waits for "Server startup complete" log message
  • Startup timeout: 30 seconds
  • Image: no default; the image name and tag must be passed to the constructor

Important Notes:

  • The management UI is only available if using an image with -management tag (e.g., rabbitmq:3.13-management)
  • Standard images (e.g., rabbitmq:3.13-alpine) do not include the management plugin
  • Container waits for RabbitMQ to be fully ready before start() resolves

StartedRabbitMQContainer Class

class StartedRabbitMQContainer extends AbstractStartedContainer {
  /**
   * Get the AMQP connection URL
   * @returns Connection URL in format: amqp://guest:guest@{host}:{port}
   * @example "amqp://guest:guest@localhost:32768"
   */
  getAmqpUrl(): string;

  /**
   * Get the AMQPS (secure) connection URL
   * @returns Secure connection URL in format: amqps://guest:guest@{host}:{port}
   * @example "amqps://guest:guest@localhost:32769"
   * @note Requires SSL/TLS configuration in RabbitMQ container
   */
  getAmqpsUrl(): string;

  /**
   * Get the host where the container is running
   * @returns Host address (typically 'localhost' for Docker Desktop, actual hostname for remote Docker)
   */
  getHost(): string;

  /**
   * Get the mapped port for a container port
   * @param port - Internal container port number (5672 for AMQP, 15672 for management UI)
   * @returns Externally mapped port number (randomly assigned unless fixed)
   * @throws Error if port is not exposed or container is not started
   */
  getMappedPort(port: number): number;

  /**
   * Get the container ID
   * @returns Docker container ID (short format)
   */
  getId(): string;

  /**
   * Get the container name
   * @returns Docker container name (auto-generated or custom if set)
   */
  getName(): string;

  /**
   * Stop the container
   * @param options - Optional stop configuration
   * @param options.timeout - Stop timeout in milliseconds (default: 10 seconds)
   * @param options.removeVolumes - Whether to remove associated volumes
   * @returns Promise resolving to stopped container
   * @throws Error if container cannot be stopped
   */
  stop(options?: Partial<StopOptions>): Promise<StoppedTestContainer>;

  /**
   * Restart the container
   * @param options - Optional restart configuration
   * @param options.timeout - Restart timeout in milliseconds
   * @returns Promise resolving when container is restarted
   * @throws Error if container cannot be restarted
   */
  restart(options?: Partial<RestartOptions>): Promise<void>;

  /**
   * Execute a command inside the running container
   * @param command - Command string or array of command parts
   * @param opts - Optional execution configuration
   * @param opts.workingDir - Working directory for command execution
   * @param opts.user - User to run command as
   * @param opts.env - Environment variables for command
   * @returns Promise resolving to execution result with stdout, stderr, and exit code
   * @example await container.exec(['rabbitmqctl', 'status'])
   * @example await container.exec(['rabbitmqctl', 'list_queues'])
   */
  exec(command: string | string[], opts?: Partial<ExecOptions>): Promise<ExecResult>;

  /**
   * Copy files to the container
   * @param filesToCopy - Array of file copy configurations
   * @returns Promise resolving when files are copied
   */
  copyFilesToContainer(filesToCopy: FileToCopy[]): Promise<void>;

  /**
   * Copy directories to the container
   * @param directoriesToCopy - Array of directory copy configurations
   * @returns Promise resolving when directories are copied
   */
  copyDirectoriesToContainer(directoriesToCopy: DirectoryToCopy[]): Promise<void>;

  /**
   * Copy content to the container
   * @param contentsToCopy - Array of content copy configurations
   * @returns Promise resolving when content is copied
   */
  copyContentToContainer(contentsToCopy: ContentToCopy[]): Promise<void>;

  /**
   * Copy an archive (tar stream) to the container
   * @param tar - Readable tar stream
   * @param target - Target path in container (default: "/")
   * @returns Promise resolving when archive is copied
   */
  copyArchiveToContainer(tar: Readable, target?: string): Promise<void>;

  /**
   * Copy an archive from the container
   * @param path - Path in container to copy from
   * @returns Promise resolving to readable stream of tar archive
   */
  copyArchiveFromContainer(path: string): Promise<NodeJS.ReadableStream>;

  /**
   * Commit the container to a new image
   * @param options - Commit configuration options
   * @returns Promise resolving to the new image ID
   */
  commit(options: CommitOptions): Promise<string>;

  /**
   * Get the container hostname
   * @returns Container hostname
   */
  getHostname(): string;

  /**
   * Get the first mapped port
   * @returns First exposed port number
   */
  getFirstMappedPort(): number;

  /**
   * Get container labels
   * @returns Object containing all container labels
   */
  getLabels(): Labels;

  /**
   * Get all network names the container is connected to
   * @returns Array of network names
   */
  getNetworkNames(): string[];

  /**
   * Get the network ID for a specific network name
   * @param networkName - Name of the network
   * @returns Network ID
   */
  getNetworkId(networkName: string): string;

  /**
   * Get the IP address of the container on a specific network
   * @param networkName - Name of the network
   * @returns IP address on the specified network
   */
  getIpAddress(networkName: string): string;

  /**
   * Get container logs as a readable stream
   * @param opts - Optional log retrieval options
   * @param opts.since - Unix timestamp to get logs since
   * @param opts.tail - Number of lines to retrieve from end
   * @returns Promise resolving to readable stream of logs
   */
  logs(opts?: { since?: number; tail?: number }): Promise<Readable>;

  /**
   * Async dispose method for automatic cleanup (Node.js 20+)
   * Enables usage with 'await using' syntax for automatic container cleanup
   * @returns Promise resolving when container is stopped
   */
  [Symbol.asyncDispose](): Promise<void>;
}

Container Configuration Methods

All methods return this for method chaining. They are inherited from the GenericContainer base class.

interface RabbitMQContainer {
  /**
   * Set environment variables for the container
   * @param environment - Object with environment variable key-value pairs
   * @returns Container builder for chaining
   * @example .withEnvironment({ RABBITMQ_DEFAULT_USER: 'admin', RABBITMQ_DEFAULT_PASS: 'secret' })
   */
  withEnvironment(environment: { [key: string]: string }): this;

  /**
   * Configure exposed ports
   * @param ports - Port numbers to expose (defaults already include 5672, 5671, 15672, 15671)
   * @returns Container builder for chaining
   * @example .withExposedPorts(5672, 15672)
   */
  withExposedPorts(...ports: number[]): this;

  /**
   * Set wait strategy for container readiness
   * @param waitStrategy - Strategy to determine when container is ready
   * @returns Container builder for chaining
   * @example .withWaitStrategy(Wait.forLogMessage(/Server startup complete/))
   */
  withWaitStrategy(waitStrategy: WaitStrategy): this;

  /**
   * Set startup timeout
   * @param startupTimeout - Timeout in milliseconds (default: 30000)
   * @returns Container builder for chaining
   * @example .withStartupTimeout(60000)
   */
  withStartupTimeout(startupTimeout: number): this;

  /**
   * Set container name
   * @param name - Container name (must be unique)
   * @returns Container builder for chaining
   * @example .withName('my-rabbitmq-test')
   */
  withName(name: string): this;

  /**
   * Bind mount volumes
   * @param volumes - Array of volume bind configurations
   * @returns Container builder for chaining
   */
  withBindMounts(volumes: BindMount[]): this;

  /**
   * Set command to run in container
   * @param command - Command array (overrides default RabbitMQ entrypoint)
   * @returns Container builder for chaining
   * @warning Overriding command may prevent RabbitMQ from starting correctly
   */
  withCommand(command: string[]): this;

  /**
   * Set container user
   * @param user - User identifier (e.g., 'rabbitmq', '1000:1000')
   * @returns Container builder for chaining
   */
  withUser(user: string): this;

  /**
   * Add network to container
   * @param network - StartedNetwork instance
   * @returns Container builder for chaining
   */
  withNetwork(network: StartedNetwork): this;

  /**
   * Set network mode
   * @param networkMode - Network mode ('host', 'bridge', 'none', or network name)
   * @returns Container builder for chaining
   */
  withNetworkMode(networkMode: string): this;

  /**
   * Add labels to container
   * @param labels - Object with label key-value pairs
   * @returns Container builder for chaining
   */
  withLabels(labels: { [key: string]: string }): this;

  /**
   * Enable privileged mode for the container
   * @returns Container builder for chaining
   */
  withPrivilegedMode(): this;

  /**
   * Copy files to container before start
   * @param filesToCopy - Array of file copy configurations
   * @returns Container builder for chaining
   */
  withCopyFilesToContainer(filesToCopy: FileToCopy[]): this;

  /**
   * Copy directories to container before start
   * @param directoriesToCopy - Array of directory copy configurations
   * @returns Container builder for chaining
   */
  withCopyDirectoriesToContainer(directoriesToCopy: DirectoryToCopy[]): this;

  /**
   * Copy content to container before start
   * @param contentsToCopy - Array of content copy configurations
   * @returns Container builder for chaining
   */
  withCopyContentToContainer(contentsToCopy: ContentToCopy[]): this;

  /**
   * Set container entrypoint
   * @param entrypoint - Entrypoint command array
   * @returns Container builder for chaining
   */
  withEntrypoint(entrypoint: string[]): this;

  /**
   * Set container platform
   * @param platform - Platform string ('linux/amd64', 'linux/arm64', etc.)
   * @returns Container builder for chaining
   */
  withPlatform(platform: string): this;

  /**
   * Configure temporary filesystems
   * @param tmpFs - Object mapping directories to tmpfs options
   * @returns Container builder for chaining
   */
  withTmpFs(tmpFs: TmpFs): this;

  /**
   * Set ulimits for the container
   * @param ulimits - Object with ulimit configurations
   * @returns Container builder for chaining
   */
  withUlimits(ulimits: Ulimits): this;

  /**
   * Add Linux capabilities to the container
   * @param capabilities - Capability names to add
   * @returns Container builder for chaining
   */
  withAddedCapabilities(...capabilities: string[]): this;

  /**
   * Drop Linux capabilities from the container
   * @param capabilities - Capability names to drop
   * @returns Container builder for chaining
   */
  withDroppedCapabilities(...capabilities: string[]): this;

  /**
   * Set network aliases for the container
   * @param networkAliases - Alias names for the container on the network
   * @returns Container builder for chaining
   */
  withNetworkAliases(...networkAliases: string[]): this;

  /**
   * Add extra host entries to the container
   * @param extraHosts - Array of host-to-IP mappings
   * @returns Container builder for chaining
   */
  withExtraHosts(extraHosts: ExtraHost[]): this;

  /**
   * Configure container health check
   * @param healthCheck - Health check configuration
   * @returns Container builder for chaining
   */
  withHealthCheck(healthCheck: HealthCheck): this;

  /**
   * Use default JSON file log driver
   * @returns Container builder for chaining
   */
  withDefaultLogDriver(): this;

  /**
   * Enable container reuse (requires TESTCONTAINERS_REUSE_ENABLE=true environment variable)
   * @returns Container builder for chaining
   * @note Reuse allows containers to persist across test runs for faster execution
   */
  withReuse(): this;

  /**
   * Configure automatic container removal on stop
   * @param autoRemove - Whether to automatically remove container (default: true)
   * @returns Container builder for chaining
   */
  withAutoRemove(autoRemove: boolean): this;

  /**
   * Set image pull policy
   * @param pullPolicy - Policy for pulling container images
   * @returns Container builder for chaining
   */
  withPullPolicy(pullPolicy: ImagePullPolicy): this;

  /**
   * Set IPC mode for the container
   * @param ipcMode - IPC mode string ('shareable', 'private', container name/id)
   * @returns Container builder for chaining
   */
  withIpcMode(ipcMode: string): this;

  /**
   * Copy archives to container before start
   * @param archivesToCopy - Array of archive copy configurations
   * @returns Container builder for chaining
   */
  withCopyArchivesToContainer(archivesToCopy: ArchiveToCopy[]): this;

  /**
   * Set working directory in the container
   * @param workingDir - Working directory path
   * @returns Container builder for chaining
   */
  withWorkingDir(workingDir: string): this;

  /**
   * Set resource quotas for the container
   * @param quota - Memory and CPU resource limits
   * @returns Container builder for chaining
   */
  withResourcesQuota(quota: ResourcesQuota): this;

  /**
   * Set shared memory size for the container
   * @param bytes - Shared memory size in bytes
   * @returns Container builder for chaining
   */
  withSharedMemorySize(bytes: number): this;

  /**
   * Set a log consumer to receive container logs
   * @param logConsumer - Function to consume log stream
   * @returns Container builder for chaining
   */
  withLogConsumer(logConsumer: (stream: Readable) => unknown): this;

  /**
   * Set container hostname
   * @param hostname - Hostname for the container
   * @returns Container builder for chaining
   */
  withHostname(hostname: string): this;
}

Types

interface ExecResult {
  output: string;      // Combined stdout and stderr
  stdout: string;      // Standard output
  stderr: string;      // Standard error
  exitCode: number;    // Process exit code (0 for success)
}

interface ExecOptions {
  workingDir?: string;                    // Working directory for command
  user?: string;                          // User to run command as
  env?: Record<string, string>;           // Environment variables
}

interface FileToCopy {
  source: string;      // Source file path on host
  target: string;      // Target path in container
  mode?: number;       // File permissions (octal, e.g., 0o644)
}

interface DirectoryToCopy {
  source: string;      // Source directory path on host
  target: string;      // Target path in container
  mode?: number;       // Directory permissions (octal)
}

type Content = string | Buffer | Readable;

interface ContentToCopy {
  content: Content;   // Content to copy (string, Buffer, or stream)
  target: string;     // Target file path in container
  mode?: number;      // File permissions (octal)
}

interface ArchiveToCopy {
  tar: Readable;      // Tar archive stream
  target: string;     // Target directory in container
}

interface StopOptions {
  timeout?: number;         // Stop timeout in milliseconds
  removeVolumes?: boolean;  // Whether to remove associated volumes
}

interface RestartOptions {
  timeout?: number;        // Restart timeout in milliseconds
}

type BindMode = 'rw' | 'ro' | 'z' | 'Z';

interface BindMount {
  source: string;     // Source path on host
  target: string;     // Target path in container
  mode?: BindMode;    // Mount mode: 'rw' (read-write), 'ro' (read-only), 'z' (shared SELinux), 'Z' (private SELinux)
}

interface CommitOptions {
  repository?: string;      // Repository name for new image
  tag?: string;            // Tag for new image
  comment?: string;        // Commit message
  author?: string;         // Author name
  pause?: boolean;         // Pause container before commit
  deleteOnExit?: boolean; // Delete the committed image when Testcontainers exits
  changes?: string[];      // Dockerfile instructions as array
}

type Labels = { [key: string]: string };

type TmpFs = { [dir: string]: string };

interface Ulimits {
  [name: string]: {
    hard: number | undefined;  // Hard limit
    soft: number | undefined;   // Soft limit
  };
}

interface ExtraHost {
  host: string;        // Hostname
  ipAddress: string;   // IP address
}

interface HealthCheck {
  test: ['CMD-SHELL', string] | ['CMD', ...string[]];  // Health check command
  interval?: number;   // Check interval in milliseconds
  timeout?: number;    // Timeout in milliseconds
  retries?: number;    // Number of retries before marking unhealthy
  startPeriod?: number; // Grace period in milliseconds before checks start
}

interface ResourcesQuota {
  memory?: number;  // Memory limit in Gigabytes
  cpu?: number;     // CPU quota in units of CPUs
}

interface ImagePullPolicy {
  shouldPull(): boolean;  // Returns true if image should be pulled
}

interface StartedNetwork {
  getName(): string;  // Network name
  getId(): string;   // Network ID
}

interface WaitStrategy {
  // Wait strategy from testcontainers package
  // Common strategies: Wait.forLogMessage(), Wait.forHealthCheck(), Wait.forListeningPorts()
}

interface StoppedTestContainer {
  // Stopped container interface from testcontainers package
  // Container can be restarted or removed
}

// Readable is from Node.js 'stream' module
type Readable = NodeJS.ReadableStream;

RabbitMQ-Specific Configuration

Environment Variables

RabbitMQ container behavior can be configured using environment variables:

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
  .withEnvironment({
    // Authentication
    RABBITMQ_DEFAULT_USER: 'admin',
    RABBITMQ_DEFAULT_PASS: 'secret123',
    
    // Virtual host
    RABBITMQ_DEFAULT_VHOST: '/myvhost',
    
    // Node name (for clustering)
    RABBITMQ_NODENAME: 'rabbit@myhost',
    
    // Erlang cookie (for clustering)
    RABBITMQ_ERLANG_COOKIE: 'SWQOKODSQALRPCLNMEQG',
    
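    // Memory high watermark as a fraction of available RAM (e.g., '0.4').
    // Caution: official images from 3.9 onward treat this Docker-specific variable as
    // deprecated and may refuse to start; vm_memory_high_watermark in rabbitmq.conf is the alternative.
    RABBITMQ_VM_MEMORY_HIGH_WATERMARK: '0.4',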
    
    // Disk free space limit (e.g., '2GB', '50MB')
    RABBITMQ_DISK_FREE_LIMIT: '2GB',
    
    // Enable plugins (comma-separated)
    RABBITMQ_ENABLED_PLUGINS: 'rabbitmq_management,rabbitmq_shovel',
    
    // Management UI configuration
    RABBITMQ_MANAGEMENT_SSL_ENABLED: 'false',
    
    // Log level
    RABBITMQ_LOG_LEVEL: 'info',
  })
  .start();
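
The credential-related variables above can be assembled by a small helper before being passed to withEnvironment(). The following is a minimal sketch, not part of @testcontainers/rabbitmq: the BrokerIdentity type and identityEnvironment function are hypothetical names introduced only for illustration, and the sketch covers only RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS, and RABBITMQ_DEFAULT_VHOST.

```typescript
// Hypothetical helper types and names; only the variable names come from the example above.
interface BrokerIdentity {
  username: string;
  password: string;
  vhost?: string; // when omitted, RABBITMQ_DEFAULT_VHOST is not set at all
}

// Build the object shape that withEnvironment() expects: string keys, string values.
function identityEnvironment(identity: BrokerIdentity): { [key: string]: string } {
  const environment: { [key: string]: string } = {
    RABBITMQ_DEFAULT_USER: identity.username,
    RABBITMQ_DEFAULT_PASS: identity.password,
  };
  if (identity.vhost !== undefined) {
    environment['RABBITMQ_DEFAULT_VHOST'] = identity.vhost;
  }
  return environment;
}

const adminEnvironment = identityEnvironment({
  username: 'admin',
  password: 'secret123',
  vhost: '/myvhost',
});
console.log(adminEnvironment);
```

The result could then be passed as .withEnvironment(adminEnvironment) when building the container.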

Custom RabbitMQ Configuration File

RabbitMQ can be configured using Erlang configuration files:

import { RabbitMQContainer } from '@testcontainers/rabbitmq';

// Classic (Erlang-term) configuration using rabbitmq.config.
// Comments inside the file must use Erlang's % syntax, not //.
const configContent = `[
  {rabbit, [
    {loopback_users, []},                    %% Allow guest user from non-localhost
    {vm_memory_high_watermark, 0.4},         %% Memory threshold (40%)
    {disk_free_limit, 2000000000},           %% 2GB disk free limit
    {tcp_listen_options, [binary, {packet, raw}, {active, false}]},
    {heartbeat, 60}                          %% Heartbeat timeout in seconds
  ]},
  {rabbitmq_management, [
    {listener, [{port, 15672}, {ip, "0.0.0.0"}]}
  ]}
].`;

const container = await new RabbitMQContainer('rabbitmq:3.13-management')
  .withCopyContentToContainer([{
    content: configContent,
    target: '/etc/rabbitmq/rabbitmq.config'
  }])
  .start();

RabbitMQ Advanced Configuration (rabbitmq.conf)

RabbitMQ 3.7 and later support a simpler key-value configuration format, rabbitmq.conf:

const confContent = `## Networking
listeners.tcp.default = 5672
management.tcp.port = 15672

## Memory and Disk
vm_memory_high_watermark.relative = 0.4
disk_free_limit.absolute = 2GB

## Cluster
cluster_formation.peer_discovery_backend = classic_config

## Logging
log.console = true
log.console.level = info
log.file = false

## Performance
heartbeat = 60
frame_max = 131072
channel_max = 2047
`;

const container = await new RabbitMQContainer('rabbitmq:3.13-management')
  .withCopyContentToContainer([{
    content: confContent,
    target: '/etc/rabbitmq/rabbitmq.conf'
  }])
  .start();
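
Because rabbitmq.conf is a flat list of key = value lines, the file content can also be generated from an object instead of being written as one long string. The sketch below assumes nothing beyond that line format; renderRabbitConf and ConfValue are hypothetical names and are not provided by Testcontainers or RabbitMQ.

```typescript
// Hypothetical helper: turns a flat settings object into rabbitmq.conf text.
type ConfValue = string | number | boolean;

function renderRabbitConf(settings: Record<string, ConfValue>): string {
  const lines = Object.entries(settings).map(([key, value]) => `${key} = ${value}`);
  // One setting per line, with a trailing newline at the end of the file.
  return lines.join('\n') + '\n';
}

const generatedConf = renderRabbitConf({
  'listeners.tcp.default': 5672,
  'vm_memory_high_watermark.relative': 0.4,
  'log.console': true,
});
console.log(generatedConf);
```

The generated string can be used as the content value in withCopyContentToContainer, with /etc/rabbitmq/rabbitmq.conf as the target, exactly as in the example above.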

Enabling RabbitMQ Plugins

Plugins can be enabled via an environment variable or by copying an enabled_plugins file into the container:

// Method 1: Environment variable
const container1 = await new RabbitMQContainer('rabbitmq:3.13-management')
  .withEnvironment({
    RABBITMQ_ENABLED_PLUGINS: 'rabbitmq_management,rabbitmq_shovel,rabbitmq_federation'
  })
  .start();

// Method 2: Enabled plugins file
const container2 = await new RabbitMQContainer('rabbitmq:3.13-management')
  .withCopyContentToContainer([{
    content: '[rabbitmq_management,rabbitmq_shovel,rabbitmq_federation].',
    target: '/etc/rabbitmq/enabled_plugins'
  }])
  .start();
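
The enabled_plugins file in Method 2 is a single Erlang list of plugin names terminated by a period. A minimal sketch of generating it from an array follows; enabledPluginsFile is a hypothetical helper, and the name check (lowercase letters, digits, underscores) is an assumption chosen so that every name is a valid unquoted Erlang atom.

```typescript
// Hypothetical helper: renders the content of /etc/rabbitmq/enabled_plugins.
function enabledPluginsFile(plugins: string[]): string {
  for (const name of plugins) {
    // Assumption: plugin names are plain lowercase atoms such as rabbitmq_shovel.
    if (!/^[a-z][a-z0-9_]*$/.test(name)) {
      throw new Error(`Unexpected plugin name: ${name}`);
    }
  }
  // Erlang list syntax followed by the terminating period.
  return `[${plugins.join(',')}].`;
}

const pluginsContent = enabledPluginsFile([
  'rabbitmq_management',
  'rabbitmq_shovel',
  'rabbitmq_federation',
]);
console.log(pluginsContent);
```

Rejecting unexpected names early is meant to surface a typo as a test-setup error rather than as a broker startup failure.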

Usage Patterns

Basic Publish and Subscribe Test

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

const QUEUE = 'test-queue';
const MESSAGE = 'Hello World';

await using container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();

const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();
await channel.assertQueue(QUEUE, { durable: false });

// Publish
channel.sendToQueue(QUEUE, Buffer.from(MESSAGE), { persistent: false });

// Subscribe
await new Promise<void>((resolve) => {
  channel.consume(QUEUE, (message) => {
    if (message) {
      console.log(message.content.toString()); // "Hello World"
      channel.ack(message);
      resolve();
    }
  }, { noAck: false });
});

await channel.close();
await connection.close();

Testing with Exchanges and Routing

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

await using container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();

// Create exchange
await channel.assertExchange('test-exchange', 'direct', { durable: false });

// Create queues
await channel.assertQueue('queue1', { durable: false });
await channel.assertQueue('queue2', { durable: false });

// Bind queues to exchange with routing keys
await channel.bindQueue('queue1', 'test-exchange', 'error');
await channel.bindQueue('queue2', 'test-exchange', 'info');

// Publish with routing key
channel.publish('test-exchange', 'error', Buffer.from('Error message'));
channel.publish('test-exchange', 'info', Buffer.from('Info message'));

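// Consume one message from each queue, waiting for delivery before closing the channel
const nextMessage = (queue: string) =>
  new Promise<string>((resolve) => {
    channel.consume(queue, (msg) => {
      if (msg) resolve(msg.content.toString());
    }, { noAck: true });
  });

console.log('Queue1:', await nextMessage('queue1'));
console.log('Queue2:', await nextMessage('queue2'));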

await channel.close();
await connection.close();

Integration with Testing Frameworks

Jest Example

import { RabbitMQContainer, StartedRabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

describe('RabbitMQ Integration Tests', () => {
  let container: StartedRabbitMQContainer;
  let connection: amqp.Connection;

  beforeAll(async () => {
    container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
    connection = await amqp.connect(container.getAmqpUrl());
  }, 60000); // Container startup can exceed Jest's default 5-second hook timeout

  afterAll(async () => {
    await connection.close();
    await container.stop();
  });

  beforeEach(async () => {
    // Clean up queues before each test
    const channel = await connection.createChannel();
    await channel.deleteQueue('test-queue');
    await channel.close();
  });

  it('should publish and consume messages', async () => {
    const channel = await connection.createChannel();
    await channel.assertQueue('test-queue');
    
    channel.sendToQueue('test-queue', Buffer.from('test message'));
    
    const message = await new Promise<amqp.ConsumeMessage>((resolve) => {
      channel.consume('test-queue', (msg) => {
        if (msg) resolve(msg);
      }, { noAck: true });
    });
    
    expect(message.content.toString()).toBe('test message');
    await channel.close();
  });
});
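
As an alternative to deleting a shared queue in beforeEach, each test can use its own queue name so that leftover messages from one test cannot leak into another. This is a sketch of that idea only; uniqueQueueName is a hypothetical helper and is not part of Jest, amqplib, or Testcontainers.

```typescript
import { randomUUID } from 'node:crypto';

// Hypothetical helper: returns a queue name that is unique per call.
function uniqueQueueName(prefix: string): string {
  return `${prefix}-${randomUUID()}`;
}

const queueForThisTest = uniqueQueueName('test-queue');
console.log(queueForThisTest);
```

Inside a test, the returned name would replace the literal 'test-queue' in assertQueue, sendToQueue, and consume.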

Vitest Example

import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest';
import { RabbitMQContainer, StartedRabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

describe('RabbitMQ Tests', () => {
  let container: StartedRabbitMQContainer;
  let connection: amqp.Connection;

  beforeAll(async () => {
    container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
    connection = await amqp.connect(container.getAmqpUrl());
  });

  afterAll(async () => {
    await connection.close();
    await container.stop();
  });

  it('should handle message routing', async () => {
    // Test implementation
  });
});

Mocha Example

import { RabbitMQContainer, StartedRabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

describe('RabbitMQ Integration', function() {
  this.timeout(60000); // Increase timeout for container startup
  
  let container: StartedRabbitMQContainer;
  let connection: amqp.Connection;

  before(async () => {
    container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
    connection = await amqp.connect(container.getAmqpUrl());
  });

  after(async () => {
    await connection.close();
    await container.stop();
  });

  it('should process messages', async () => {
    // Test implementation
  });
});

Using Management UI

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import fetch from 'node-fetch';

// Use management image
const container = await new RabbitMQContainer('rabbitmq:3.13-management').start();

// Access management API
const managementPort = container.getMappedPort(15672);
const managementUrl = `http://${container.getHost()}:${managementPort}`;

// Get overview using management API
const response = await fetch(`${managementUrl}/api/overview`, {
  headers: {
    'Authorization': 'Basic ' + Buffer.from('guest:guest').toString('base64')
  }
});

const overview = await response.json();
console.log('RabbitMQ Version:', overview.rabbitmq_version);
console.log('Management UI:', managementUrl);

// List queues
const queuesResponse = await fetch(`${managementUrl}/api/queues`, {
  headers: {
    'Authorization': 'Basic ' + Buffer.from('guest:guest').toString('base64')
  }
});
const queues = await queuesResponse.json();
console.log('Queues:', queues);

await container.stop();
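
The management example repeats the Authorization header and builds URLs by hand. The sketch below factors both into small functions; basicAuthHeader and queueDetailsUrl are hypothetical helper names. It relies on the management API path /api/queues/{vhost}/{name}, where the default vhost "/" must be percent-encoded as %2F.

```typescript
// Hypothetical helper: builds the HTTP Basic Authorization header value.
function basicAuthHeader(username: string, password: string): string {
  return 'Basic ' + Buffer.from(`${username}:${password}`).toString('base64');
}

// Hypothetical helper: URL for a single queue in the management API.
// The vhost and queue name are percent-encoded so that "/" becomes %2F.
function queueDetailsUrl(managementUrl: string, vhost: string, queue: string): string {
  return `${managementUrl}/api/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queue)}`;
}

const exampleHeader = basicAuthHeader('guest', 'guest');
const exampleUrl = queueDetailsUrl('http://localhost:15672', '/', 'test-queue');
console.log(exampleHeader, exampleUrl);
```

In a real test, the first argument of queueDetailsUrl would be the managementUrl computed from getHost() and getMappedPort(15672); the localhost address here is only a placeholder.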

Custom Credentials

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
  .withEnvironment({
    RABBITMQ_DEFAULT_USER: 'myuser',
    RABBITMQ_DEFAULT_PASS: 'mypassword'
  })
  .start();

// Connect with custom credentials
const connection = await amqp.connect({
  username: 'myuser',
  password: 'mypassword',
  hostname: container.getHost(),
  port: container.getMappedPort(5672)
});

// Or construct URL manually
const customUrl = `amqp://myuser:mypassword@${container.getHost()}:${container.getMappedPort(5672)}`;
const connection2 = await amqp.connect(customUrl);

await connection.close();
await container.stop();
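
When the URL is constructed manually, credentials or virtual host names containing characters such as @, / or spaces need percent-encoding, otherwise the URL is parsed incorrectly. A minimal sketch follows; AmqpTarget and buildAmqpUrl are hypothetical names introduced for illustration.

```typescript
// Hypothetical helper types and names, not part of amqplib or Testcontainers.
interface AmqpTarget {
  host: string;
  port: number;
  username: string;
  password: string;
  vhost?: string; // when omitted, no path is appended and the broker default applies
}

function buildAmqpUrl(target: AmqpTarget): string {
  const credentials =
    `${encodeURIComponent(target.username)}:${encodeURIComponent(target.password)}`;
  // A vhost such as "/myvhost" becomes the path segment "%2Fmyvhost".
  const vhostPath = target.vhost === undefined ? '' : `/${encodeURIComponent(target.vhost)}`;
  return `amqp://${credentials}@${target.host}:${target.port}${vhostPath}`;
}

const encodedUrl = buildAmqpUrl({
  host: 'localhost',
  port: 32768,
  username: 'my user',
  password: 'p@ss/word',
  vhost: '/myvhost',
});
console.log(encodedUrl);
```

With a started container, host and port would come from container.getHost() and container.getMappedPort(5672); the values above are placeholders.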

Multiple Containers for Different Scenarios

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

// Start multiple containers for different test scenarios
await using container1 = await new RabbitMQContainer('rabbitmq:3.13').start();
await using container2 = await new RabbitMQContainer('rabbitmq:3.12').start();
await using container3 = await new RabbitMQContainer('rabbitmq:3.13-management')
  .withEnvironment({ RABBITMQ_DEFAULT_USER: 'admin', RABBITMQ_DEFAULT_PASS: 'admin' })
  .start();

// Run tests against different versions
const conn1 = await amqp.connect(container1.getAmqpUrl());
const conn2 = await amqp.connect(container2.getAmqpUrl());
const conn3 = await amqp.connect({
  username: 'admin',
  password: 'admin',
  hostname: container3.getHost(),
  port: container3.getMappedPort(5672)
});

// Test compatibility across versions

Network Isolation

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import { GenericContainer, Network } from 'testcontainers';

// Create isolated network
const network = await new Network().start();

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
  .withNetwork(network)
  .withNetworkAliases('rabbitmq')
  .start();

// Other containers on the same network can connect using 'rabbitmq' hostname
const otherContainer = await new GenericContainer('my-app')
  .withNetwork(network)
  .start();

// From otherContainer, connect to RabbitMQ using: amqp://rabbitmq:5672

await otherContainer.stop();
await container.stop();
await network.stop();

Executing RabbitMQ Commands

import { RabbitMQContainer } from '@testcontainers/rabbitmq';

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();

// Check RabbitMQ status
const statusResult = await container.exec(['rabbitmqctl', 'status']);
console.log('Status:', statusResult.output);

// List queues
const queuesResult = await container.exec(['rabbitmqctl', 'list_queues']);
console.log('Queues:', queuesResult.output);

// List exchanges
const exchangesResult = await container.exec(['rabbitmqctl', 'list_exchanges']);
console.log('Exchanges:', exchangesResult.output);

// List bindings
const bindingsResult = await container.exec(['rabbitmqctl', 'list_bindings']);
console.log('Bindings:', bindingsResult.output);

// List connections
const connectionsResult = await container.exec(['rabbitmqctl', 'list_connections']);
console.log('Connections:', connectionsResult.output);

// Create virtual host
await container.exec(['rabbitmqctl', 'add_vhost', '/myvhost']);

// Set permissions
await container.exec(['rabbitmqctl', 'set_permissions', '-p', '/myvhost', 'guest', '.*', '.*', '.*']);

// List virtual hosts
const vhostsResult = await container.exec(['rabbitmqctl', 'list_vhosts']);
console.log('Virtual Hosts:', vhostsResult.output);

await container.stop();
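
The listing commands above return plain text, so assertions are easier against parsed rows. The following is a minimal sketch, assuming the output is tab-separated with a header row and that banner lines contain no tabs. The helper name parseRabbitmqctlTable and the sample text are hypothetical, and single-column listings would need different handling.

```typescript
// Parse tab-separated rabbitmqctl listing output into one record per row.
// Assumption: the first line that contains a tab is the header row.
type Row = Record<string, string>;

function parseRabbitmqctlTable(output: string): Row[] {
  const lines = output
    .split(/\r?\n/)
    .filter((line) => line.includes('\t')); // drop banner lines without tabs
  if (lines.length === 0) return [];
  const [headerLine, ...rowLines] = lines;
  const headers = headerLine.split('\t');
  return rowLines.map((line) => {
    const cells = line.split('\t');
    const row: Row = {};
    headers.forEach((header, i) => {
      row[header] = cells[i] ?? '';
    });
    return row;
  });
}

// Illustrative sample, not captured from a real broker
const sample = [
  'Timeout: 60.0 seconds ...',
  'Listing queues for vhost / ...',
  'name\tmessages',
  'test-queue\t0',
  'orders\t3',
].join('\n');

const queues = parseRabbitmqctlTable(sample);
console.log(queues);
```

In a test, the input would be queuesResult.output from the example above instead of the sample string.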

Custom Wait Strategy

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import { Wait } from 'testcontainers';

// Custom wait strategy - wait for specific log message
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
  .withWaitStrategy(
    Wait.forLogMessage(/Server startup complete/)
      .withStartupTimeout(60000)
  )
  .start();

// Wait for health check (if health check is configured)
const container2 = await new RabbitMQContainer('rabbitmq:3.13-alpine')
  .withHealthCheck({
    test: ['CMD-SHELL', 'rabbitmqctl status'],
    interval: 5000,
    timeout: 3000,
    retries: 5,
    startPeriod: 10000
  })
  .withWaitStrategy(Wait.forHealthCheck())
  .start();

// Wait for listening ports
const container3 = await new RabbitMQContainer('rabbitmq:3.13-alpine')
  .withWaitStrategy(
    Wait.forListeningPorts()
      .withStartupTimeout(45000)
  )
  .start();

await container.stop();
await container2.stop();
await container3.stop();

Resource Limits

import { RabbitMQContainer } from '@testcontainers/rabbitmq';

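// Set memory and CPU limits
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
  .withResourcesQuota({
    memory: 1,  // 1 GB memory limit
    cpu: 0.5    // 0.5 CPU cores
  })
  // The official 3.9+ images reject the RABBITMQ_VM_MEMORY_HIGH_WATERMARK
  // environment variable, so set the watermark in a config file instead
  .withCopyContentToContainer([{
    content: 'vm_memory_high_watermark.relative = 0.4\n',  // 40% of available memory
    target: '/etc/rabbitmq/conf.d/20-memory.conf'
  }])
  .start();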

await container.stop();

Logging and Debugging

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import { Readable } from 'stream';

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
  .withLogConsumer((stream: Readable) => {
    stream.on('data', (chunk: Buffer) => {
      console.log('[RabbitMQ]', chunk.toString());
    });
  })
  .withEnvironment({
    RABBITMQ_LOG_LEVEL: 'debug'  // Enable debug logging
  })
  .start();

// Get logs programmatically
const logs = await container.logs({ tail: 100 });
logs.on('data', (chunk: Buffer) => {
  console.log('Recent logs:', chunk.toString());
});

// Get logs since a specific time (since is a Unix timestamp in seconds)
const since = Math.floor(Date.now() / 1000) - 60; // Last minute
const recentLogs = await container.logs({ since });
recentLogs.on('data', (chunk: Buffer) => {
  console.log('Recent logs:', chunk.toString());
});

await container.stop();
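
Log chunks are not guaranteed to end on line boundaries, so the handlers above can print half a line at a time. A small buffer that re-assembles complete lines is sketched below. The name createLineSplitter is hypothetical and the input strings are made up for illustration.

```typescript
// Re-assemble complete lines from arbitrarily chunked log output.
function createLineSplitter(onLine: (line: string) => void) {
  let pending = '';
  return {
    // Feed one chunk that has already been converted to a string
    push(chunk: string): void {
      pending += chunk;
      const parts = pending.split('\n');
      pending = parts.pop() ?? ''; // the last part is an incomplete line (or '')
      for (const part of parts) onLine(part.replace(/\r$/, ''));
    },
    // Emit whatever is left once the stream has ended
    flush(): void {
      if (pending !== '') onLine(pending);
      pending = '';
    },
  };
}

const lines: string[] = [];
const splitter = createLineSplitter((line) => {
  lines.push(line);
});
splitter.push('Starting bro');
splitter.push('ker\nServer startup');
splitter.push(' complete\n');
splitter.flush();
console.log(lines); // two complete lines
```

Inside a log consumer, push would be called from the stream's data handler with chunk.toString(), and flush from its end handler.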

Container Reuse for Faster Tests

import { RabbitMQContainer } from '@testcontainers/rabbitmq';

// Enable container reuse (requires TESTCONTAINERS_REUSE_ENABLE=true)
// A running container with an identical configuration is reused across test runs
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
  .withName('reusable-rabbitmq')  // Optional: a fixed name makes the container easy to find
  .withReuse()
  .start();

// Container will persist after stop() if reuse is enabled
await container.stop();

// Next test run will reuse the same container
const reusedContainer = await new RabbitMQContainer('rabbitmq:3.13-alpine')
  .withName('reusable-rabbitmq')
  .withReuse()
  .start();
// Container starts much faster as it's reused

await reusedContainer.stop();

Testing with Different AMQP Clients

Using amqplib

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();

// Use amqplib API
await channel.assertQueue('queue', { durable: true });
channel.sendToQueue('queue', Buffer.from('message'));

// Cleanup
await channel.close();
await connection.close();
await container.stop();

Using amqp-connection-manager

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqp-connection-manager';
import { ConfirmChannel } from 'amqplib';

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = amqp.connect([container.getAmqpUrl()]);

connection.on('connect', () => console.log('Connected'));
connection.on('disconnect', () => console.log('Disconnected'));

const channelWrapper = connection.createChannel({
  setup: async (channel: ConfirmChannel) => {
    await channel.assertQueue('queue', { durable: true });
  }
});

await channelWrapper.sendToQueue('queue', Buffer.from('message'));
await connection.close();
await container.stop();

Error Handling

Common Error Scenarios

Containers may fail to start or operate due to various reasons:

  1. Docker not running or inaccessible

    try {
      const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
    } catch (error) {
      if (error instanceof Error && error.message.includes('Docker')) {
        console.error('Docker is not running or not accessible');
      }
    }
  2. Image cannot be pulled

    try {
      const container = await new RabbitMQContainer('rabbitmq:invalid-tag').start();
    } catch (error) {
      if (error instanceof Error && (error.message.includes('pull') || error.message.includes('image'))) {
        console.error('Failed to pull Docker image');
      }
    }
  3. Ports already in use

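    try {
      const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
        // Exposed ports map to random host ports by default; only a fixed host port can conflict
        .withExposedPorts({ container: 5672, host: 5672 })
        .start();
    } catch (error) {
      if (error instanceof Error && (error.message.includes('port') || error.message.includes('bind'))) {
        console.error('Port is already in use');
      }
    }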
  4. Insufficient system resources

    try {
      const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
    } catch (error) {
      if (error instanceof Error && (error.message.includes('memory') || error.message.includes('resource'))) {
        console.error('Insufficient system resources');
      }
    }
  5. Container startup timeout

    const startupLogs: string[] = [];
    try {
      const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
        .withStartupTimeout(10000)  // 10 seconds
        // Capture logs while starting; no container handle exists if start() throws
        .withLogConsumer((stream) => {
          stream.on('data', (chunk) => startupLogs.push(chunk.toString()));
        })
        .start();
    } catch (error) {
      if (error instanceof Error && error.message.includes('timeout')) {
        console.error('Container failed to start within timeout');
        // Check logs for startup issues
        console.error('Container logs:', startupLogs.join(''));
      }
    }
  6. RabbitMQ configuration errors

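    const configLogs: string[] = [];
    try {
      const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
        .withCopyContentToContainer([{
          content: 'invalid config',
          target: '/etc/rabbitmq/rabbitmq.config'
        }])
        // Capture logs while starting; no container handle exists if start() throws
        .withLogConsumer((stream) => {
          stream.on('data', (chunk) => configLogs.push(chunk.toString()));
        })
        .start();
      // RabbitMQ may start but fail to apply configuration
      const execResult = await container.exec(['rabbitmqctl', 'status']);
      if (execResult.exitCode !== 0) {
        console.error('RabbitMQ configuration error:', execResult.stderr);
      }
    } catch (error) {
      // A config the broker cannot parse typically prevents startup, so start() throws
      // Check the captured logs for configuration errors
      console.error('RabbitMQ failed to start:', configLogs.join(''));
    }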
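
The six checks above can be folded into one classifier so that test setup code reports a single failure category. This is a sketch only: the substrings are the same heuristics used in the list, real error messages from Docker or Testcontainers may not contain them, and the names StartFailure and classifyStartError are hypothetical.

```typescript
type StartFailure = 'docker' | 'image' | 'port' | 'resources' | 'timeout' | 'unknown';

// Ordered rules: the first rule with a matching substring wins.
// Substring matching is a heuristic ("port" also matches "report").
const rules: Array<{ kind: StartFailure; needles: string[] }> = [
  { kind: 'timeout', needles: ['timeout'] },
  { kind: 'port', needles: ['port', 'bind'] },
  { kind: 'image', needles: ['pull', 'image'] },
  { kind: 'resources', needles: ['memory', 'resource'] },
  { kind: 'docker', needles: ['docker'] },
];

function classifyStartError(error: unknown): StartFailure {
  // catch variables are unknown in strict TypeScript, so narrow before reading message
  const message = (error instanceof Error ? error.message : String(error)).toLowerCase();
  for (const rule of rules) {
    if (rule.needles.some((needle) => message.includes(needle))) return rule.kind;
  }
  return 'unknown';
}

// Made-up messages, for illustration only
console.log(classifyStartError(new Error('Docker daemon is not reachable')));
console.log(classifyStartError(new Error('failed to pull rabbitmq:invalid-tag')));
console.log(classifyStartError('boom'));
```

A catch block around start() can then switch on the returned category instead of repeating the string checks.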

Automatic Cleanup on Errors

Prefer async disposables for automatic cleanup where your runtime and compiler support them:

try {
  await using container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
  const connection = await amqp.connect(container.getAmqpUrl());
  
  // Simulate an error; the container is still cleaned up when the scope exits
  throw new Error('Test error');
} catch (error) {
  console.error('Error occurred:', error);
  // Container is already stopped due to async dispose
}

Manual Cleanup with Error Handling

import { RabbitMQContainer, StartedRabbitMQContainer } from '@testcontainers/rabbitmq';

let container: StartedRabbitMQContainer | null = null;

try {
  container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
  // Use container
} catch (error) {
  console.error('Failed to start container:', error);
} finally {
  if (container) {
    try {
      await container.stop();
    } catch (stopError) {
      console.error('Failed to stop container:', stopError);
    }
  }
}

Connection Error Handling

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();

try {
  const connection = await amqp.connect(container.getAmqpUrl());
  
  connection.on('error', (err) => {
    console.error('Connection error:', err);
  });
  
  connection.on('close', () => {
    console.log('Connection closed');
  });
  
  const channel = await connection.createChannel();
  
  channel.on('error', (err) => {
    console.error('Channel error:', err);
  });
  
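  // Use channel
  await channel.assertQueue('test-queue');

  await channel.close();
  await connection.close();
} catch (error) {
  // Socket-level failures carry a Node.js error code
  const code = (error as NodeJS.ErrnoException).code;
  if (code === 'ECONNREFUSED') {
    console.error('Cannot connect to RabbitMQ - container may not be ready');
  } else if (code === 'ENOTFOUND') {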
    console.error('Cannot resolve RabbitMQ hostname');
  } else {
    console.error('Unexpected error:', error);
  }
} finally {
  await container.stop();
}
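
When a connection attempt can fail transiently, a small retry wrapper keeps the test code readable. The sketch below is generic and takes the connect call as a parameter. The name withRetry is hypothetical, and flakyConnect is a stand-in for a real amqp.connect call so the example runs without a broker.

```typescript
// Retry an async operation a fixed number of times with a pause in between.
async function withRetry<T>(
  operation: () => Promise<T>,
  attempts: number,
  delayMs: number,
): Promise<T> {
  if (attempts < 1) throw new RangeError('attempts must be at least 1');
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      // Pause before the next attempt, but not after the final one
      if (attempt < attempts) {
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}

// Stand-in for a connect call: fails twice, then succeeds
let calls = 0;
const flakyConnect = async (): Promise<string> => {
  calls++;
  if (calls < 3) throw Object.assign(new Error('refused'), { code: 'ECONNREFUSED' });
  return 'connected';
};

const result = withRetry(flakyConnect, 5, 10);
result.then((value) => console.log(value, 'after', calls, 'calls'));
```

With amqplib, the operation would be a function such as () => amqp.connect(container.getAmqpUrl()). A fixed delay keeps the sketch short; exponential backoff is a common alternative.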

Advanced Scenarios

Testing Message Persistence

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = await amqp.connect(container.getAmqpUrl());
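// A confirm channel lets us wait until the broker has accepted the message
const channel = await connection.createConfirmChannel();

// Create durable queue
await channel.assertQueue('persistent-queue', { durable: true });

// Send persistent message
channel.sendToQueue('persistent-queue', Buffer.from('persistent message'), {
  persistent: true
});
await channel.waitForConfirms();

// Close the connection first; it would otherwise break during the restart
await connection.close();

// Restart container to test persistence
await container.restart();

// Reconnect and verify message
const connection2 = await amqp.connect(container.getAmqpUrl());
const channel2 = await connection2.createChannel();
const recovered = await channel2.get('persistent-queue', { noAck: true });
console.log('Recovered:', recovered ? recovered.content.toString() : 'nothing');

// Note: Messages are only persisted if queue is durable AND message is marked persistent
// A restart keeps the container's data; a new container starts empty unless
// the data directory is mounted from a volume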

await channel2.close();
await connection2.close();
await container.stop();

Testing with Virtual Hosts

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();

// Create virtual host via rabbitmqctl
await container.exec(['rabbitmqctl', 'add_vhost', '/testvhost']);

// Set permissions
await container.exec(['rabbitmqctl', 'set_permissions', '-p', '/testvhost', 'guest', '.*', '.*', '.*']);

// Connect to specific virtual host
const connection = await amqp.connect({
  hostname: container.getHost(),
  port: container.getMappedPort(5672),
  vhost: '/testvhost'
});

const channel = await connection.createChannel();
await channel.assertQueue('vhost-queue');

await channel.close();
await connection.close();
await container.stop();
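
The same virtual host can also be selected through the connection URL, where the vhost is the percent-encoded path segment. A minimal sketch, assuming the base URL has no path of its own (the helper name amqpUrlForVhost and the literal URL are hypothetical):

```typescript
// Append a percent-encoded vhost to a base AMQP URL.
function amqpUrlForVhost(baseUrl: string, vhost: string): string {
  const trimmed = baseUrl.replace(/\/+$/, ''); // drop any trailing slash
  // A leading "/" in the vhost name must be encoded as %2F
  return `${trimmed}/${encodeURIComponent(vhost)}`;
}

const url = amqpUrlForVhost('amqp://localhost:5672', '/testvhost');
console.log(url); // amqp://localhost:5672/%2Ftestvhost
```

In a test, the first argument would come from container.getAmqpUrl().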

Performance Testing

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
  .withResourcesQuota({
    memory: 2,  // 2 GB
    cpu: 1      // 1 CPU core
  })
  .start();

const connection = await amqp.connect(container.getAmqpUrl());
// A confirm channel lets us wait until the broker has acknowledged every publish
const channel = await connection.createConfirmChannel();
await channel.assertQueue('perf-queue', { durable: false });

// Publish many messages
const messageCount = 10000;
const startTime = Date.now();

for (let i = 0; i < messageCount; i++) {
  channel.sendToQueue('perf-queue', Buffer.from(`message-${i}`), { persistent: false });
}

// Wait for the broker to confirm all published messages
await channel.waitForConfirms();

const publishTime = Date.now() - startTime;
console.log(`Published ${messageCount} messages in ${publishTime}ms`);

// Consume messages
let consumed = 0;
const consumeStart = Date.now();

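// Resolve once every message has been consumed, so cleanup does not run early
await new Promise<void>((resolve) => {
  channel.consume('perf-queue', (msg) => {
    if (msg) {
      consumed++;
      channel.ack(msg);
      if (consumed === messageCount) {
        const consumeTime = Date.now() - consumeStart;
        console.log(`Consumed ${messageCount} messages in ${consumeTime}ms`);
        resolve();
      }
    }
  }, { noAck: false });
});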

await channel.close();
await connection.close();
await container.stop();

Testing Dead Letter Exchanges

import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';

const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();

// Create dead letter exchange
await channel.assertExchange('dlx', 'direct', { durable: false });

// Create dead letter queue
await channel.assertQueue('dlq', { durable: false });
await channel.bindQueue('dlq', 'dlx', 'dead');

// Create main queue with dead letter configuration
await channel.assertQueue('main-queue', {
  durable: false,
  arguments: {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'dead',
    'x-message-ttl': 5000  // Messages expire after 5 seconds
  }
});

// Publish message
channel.sendToQueue('main-queue', Buffer.from('test message'));

// Wait for message to expire
await new Promise(resolve => setTimeout(resolve, 6000));

// Message should be in dead letter queue; fetch it before the channel closes
const deadLetter = await channel.get('dlq', { noAck: true });
if (deadLetter) {
  console.log('Dead letter message:', deadLetter.content.toString());
}

await channel.close();
await connection.close();
await container.stop();

Best Practices

  1. Always use async disposables when available (Node.js 20+) for automatic cleanup
  2. Set appropriate timeouts for container startup based on system performance
  3. Enable container reuse for faster test execution, keeping the container configuration identical across runs so it matches
  4. Clean up queues/exchanges between tests to avoid test interference
  5. Use network isolation when testing multiple containers together
  6. Set resource limits to prevent tests from consuming excessive system resources
  7. Handle connection errors gracefully with proper error handling and retries
  8. Use management image only when management UI/API is needed (adds overhead)
  9. Prefer alpine images for faster startup and smaller footprint
  10. Monitor container logs during development to debug configuration issues
  11. Use health checks for more reliable container readiness detection
  12. Test with production-like configurations when possible (credentials, resource limits, etc.)
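
Practice 4 can be sketched as a small tracker that records every queue and exchange a test creates and deletes them afterwards. The names BrokerCleanup and DeletingChannel are hypothetical. The interface lists only the deleteQueue and deleteExchange methods that an amqplib channel provides, and the fake channel exists so the example runs without a broker.

```typescript
// Minimal structural type: only the two channel methods used here
interface DeletingChannel {
  deleteQueue(name: string): Promise<unknown>;
  deleteExchange(name: string): Promise<unknown>;
}

class BrokerCleanup {
  private queues: string[] = [];
  private exchanges: string[] = [];

  // Record a queue name and return it, so it can wrap the name inline
  trackQueue(name: string): string {
    if (!this.queues.includes(name)) this.queues.push(name);
    return name;
  }

  trackExchange(name: string): string {
    if (!this.exchanges.includes(name)) this.exchanges.push(name);
    return name;
  }

  // Delete everything tracked so far, then forget it
  async run(channel: DeletingChannel): Promise<void> {
    for (const name of this.queues) await channel.deleteQueue(name);
    for (const name of this.exchanges) await channel.deleteExchange(name);
    this.queues = [];
    this.exchanges = [];
  }
}

// Fake channel that records calls instead of talking to RabbitMQ
const deleted: string[] = [];
const fakeChannel: DeletingChannel = {
  deleteQueue: async (name) => { deleted.push(`queue:${name}`); },
  deleteExchange: async (name) => { deleted.push(`exchange:${name}`); },
};

const cleanup = new BrokerCleanup();
cleanup.trackQueue('test-queue');
cleanup.trackExchange('dlx');
const done = cleanup.run(fakeChannel);
done.then(() => console.log(deleted));
```

In a test suite, run would be called with the real channel from the framework's after-each hook.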