RabbitMQ module for Testcontainers that enables programmatic creation of disposable RabbitMQ Docker containers for integration testing
npx @tessl/cli install tessl/npm-testcontainers--rabbitmq@11.10.0The Testcontainers RabbitMQ module provides a simple way to create disposable RabbitMQ message broker instances running in Docker containers for integration testing. It handles container lifecycle management, port mapping, credential configuration, and provides convenient helper methods for connecting to the containerized RabbitMQ instance.
npm install @testcontainers/rabbitmq testcontainerstestcontainers packageimport { RabbitMQContainer, StartedRabbitMQContainer } from '@testcontainers/rabbitmq';For CommonJS:
const { RabbitMQContainer, StartedRabbitMQContainer } = require('@testcontainers/rabbitmq');Note: StartedRabbitMQContainer is typically returned by RabbitMQContainer.start() and rarely needs to be imported directly.
import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
// Start a RabbitMQ container
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
// Get connection URL and connect
const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();
// Use RabbitMQ for your tests
await channel.assertQueue('test-queue');
channel.sendToQueue('test-queue', Buffer.from('Hello World'));
// Cleanup
await connection.close();
await container.stop();Using with async disposables (Node.js 20+):
await using container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
// Container automatically stopped when scope exitsclass RabbitMQContainer extends GenericContainer {
/**
* Create a new RabbitMQ container builder
* @param image - Docker image name (e.g., 'rabbitmq:3.13-alpine', 'rabbitmq:3.13-management')
*/
constructor(image: string);
/**
* Start the RabbitMQ container
* @returns Promise resolving to a started container instance
* @throws Error if container fails to start, image cannot be pulled, or ports are unavailable
*/
start(): Promise<StartedRabbitMQContainer>;
}Default Configuration:
guest, password guestImportant Notes:
-management tag (e.g., rabbitmq:3.13-management)rabbitmq:3.13-alpine) do not include the management pluginstart() resolvesclass StartedRabbitMQContainer extends AbstractStartedContainer {
/**
* Get the AMQP connection URL
* @returns Connection URL in format: amqp://guest:guest@{host}:{port}
* @example "amqp://guest:guest@localhost:32768"
*/
getAmqpUrl(): string;
/**
* Get the AMQPS (secure) connection URL
* @returns Secure connection URL in format: amqps://guest:guest@{host}:{port}
* @example "amqps://guest:guest@localhost:32769"
* @note Requires SSL/TLS configuration in RabbitMQ container
*/
getAmqpsUrl(): string;
/**
* Get the host where the container is running
* @returns Host address (typically 'localhost' for Docker Desktop, actual hostname for remote Docker)
*/
getHost(): string;
/**
* Get the mapped port for a container port
* @param port - Internal container port number (5672 for AMQP, 15672 for management UI)
* @returns Externally mapped port number (randomly assigned unless fixed)
* @throws Error if port is not exposed or container is not started
*/
getMappedPort(port: number): number;
/**
* Get the container ID
* @returns Docker container ID (short format)
*/
getId(): string;
/**
* Get the container name
* @returns Docker container name (auto-generated or custom if set)
*/
getName(): string;
/**
* Stop the container
* @param options - Optional stop configuration
* @param options.timeout - Stop timeout in milliseconds (default: 10 seconds)
* @param options.removeVolumes - Whether to remove associated volumes
* @returns Promise resolving to stopped container
* @throws Error if container cannot be stopped
*/
stop(options?: Partial<StopOptions>): Promise<StoppedTestContainer>;
/**
* Restart the container
* @param options - Optional restart configuration
* @param options.timeout - Restart timeout in milliseconds
* @returns Promise resolving when container is restarted
* @throws Error if container cannot be restarted
*/
restart(options?: Partial<RestartOptions>): Promise<void>;
/**
* Execute a command inside the running container
* @param command - Command string or array of command parts
* @param opts - Optional execution configuration
* @param opts.workingDir - Working directory for command execution
* @param opts.user - User to run command as
* @param opts.env - Environment variables for command
* @returns Promise resolving to execution result with stdout, stderr, and exit code
* @example await container.exec(['rabbitmqctl', 'status'])
* @example await container.exec(['rabbitmqctl', 'list_queues'])
*/
exec(command: string | string[], opts?: Partial<ExecOptions>): Promise<ExecResult>;
/**
* Copy files to the container
* @param filesToCopy - Array of file copy configurations
* @returns Promise resolving when files are copied
*/
copyFilesToContainer(filesToCopy: FileToCopy[]): Promise<void>;
/**
* Copy directories to the container
* @param directoriesToCopy - Array of directory copy configurations
* @returns Promise resolving when directories are copied
*/
copyDirectoriesToContainer(directoriesToCopy: DirectoryToCopy[]): Promise<void>;
/**
* Copy content to the container
* @param contentsToCopy - Array of content copy configurations
* @returns Promise resolving when content is copied
*/
copyContentToContainer(contentsToCopy: ContentToCopy[]): Promise<void>;
/**
* Copy an archive (tar stream) to the container
* @param tar - Readable tar stream
* @param target - Target path in container (default: "/")
* @returns Promise resolving when archive is copied
*/
copyArchiveToContainer(tar: Readable, target?: string): Promise<void>;
/**
* Copy an archive from the container
* @param path - Path in container to copy from
* @returns Promise resolving to readable stream of tar archive
*/
copyArchiveFromContainer(path: string): Promise<NodeJS.ReadableStream>;
/**
* Commit the container to a new image
* @param options - Commit configuration options
* @returns Promise resolving to the new image ID
*/
commit(options: CommitOptions): Promise<string>;
/**
* Get the container hostname
* @returns Container hostname
*/
getHostname(): string;
/**
* Get the first mapped port
* @returns First exposed port number
*/
getFirstMappedPort(): number;
/**
* Get container labels
* @returns Object containing all container labels
*/
getLabels(): Labels;
/**
* Get all network names the container is connected to
* @returns Array of network names
*/
getNetworkNames(): string[];
/**
* Get the network ID for a specific network name
* @param networkName - Name of the network
* @returns Network ID
*/
getNetworkId(networkName: string): string;
/**
* Get the IP address of the container on a specific network
* @param networkName - Name of the network
* @returns IP address on the specified network
*/
getIpAddress(networkName: string): string;
/**
* Get container logs as a readable stream
* @param opts - Optional log retrieval options
* @param opts.since - Unix timestamp to get logs since
* @param opts.tail - Number of lines to retrieve from end
* @returns Promise resolving to readable stream of logs
*/
logs(opts?: { since?: number; tail?: number }): Promise<Readable>;
/**
* Async dispose method for automatic cleanup (Node.js 20+)
* Enables usage with 'await using' syntax for automatic container cleanup
* @returns Promise resolving when container is stopped
*/
[Symbol.asyncDispose](): Promise<void>;
}All methods return this for method chaining. Methods are inherited from GenericContainer base class.
interface RabbitMQContainer {
/**
* Set environment variables for the container
* @param environment - Object with environment variable key-value pairs
* @returns Container builder for chaining
* @example .withEnvironment({ RABBITMQ_DEFAULT_USER: 'admin', RABBITMQ_DEFAULT_PASS: 'secret' })
*/
withEnvironment(environment: { [key: string]: string }): this;
/**
* Configure exposed ports
* @param ports - Port numbers to expose (defaults already include 5672, 5671, 15672, 15671)
* @returns Container builder for chaining
* @example .withExposedPorts(5672, 15672)
*/
withExposedPorts(...ports: number[]): this;
/**
* Set wait strategy for container readiness
* @param waitStrategy - Strategy to determine when container is ready
* @returns Container builder for chaining
* @example .withWaitStrategy(Wait.forLogMessage(/Server startup complete/))
*/
withWaitStrategy(waitStrategy: WaitStrategy): this;
/**
* Set startup timeout
* @param startupTimeout - Timeout in milliseconds (default: 30000)
* @returns Container builder for chaining
* @example .withStartupTimeout(60000)
*/
withStartupTimeout(startupTimeout: number): this;
/**
* Set container name
* @param name - Container name (must be unique)
* @returns Container builder for chaining
* @example .withName('my-rabbitmq-test')
*/
withName(name: string): this;
/**
* Bind mount volumes
* @param volumes - Array of volume bind configurations
* @returns Container builder for chaining
*/
withBindMounts(volumes: BindMount[]): this;
/**
* Set command to run in container
* @param command - Command array (overrides default RabbitMQ entrypoint)
* @returns Container builder for chaining
* @warning Overriding command may prevent RabbitMQ from starting correctly
*/
withCommand(command: string[]): this;
/**
* Set container user
* @param user - User identifier (e.g., 'rabbitmq', '1000:1000')
* @returns Container builder for chaining
*/
withUser(user: string): this;
/**
* Add network to container
* @param network - StartedNetwork instance
* @returns Container builder for chaining
*/
withNetwork(network: StartedNetwork): this;
/**
* Set network mode
* @param networkMode - Network mode ('host', 'bridge', 'none', or network name)
* @returns Container builder for chaining
*/
withNetworkMode(networkMode: string): this;
/**
* Add labels to container
* @param labels - Object with label key-value pairs
* @returns Container builder for chaining
*/
withLabels(labels: { [key: string]: string }): this;
/**
* Enable privileged mode for the container
* @returns Container builder for chaining
*/
withPrivilegedMode(): this;
/**
* Copy files to container before start
* @param filesToCopy - Array of file copy configurations
* @returns Container builder for chaining
*/
withCopyFilesToContainer(filesToCopy: FileToCopy[]): this;
/**
* Copy directories to container before start
* @param directoriesToCopy - Array of directory copy configurations
* @returns Container builder for chaining
*/
withCopyDirectoriesToContainer(directoriesToCopy: DirectoryToCopy[]): this;
/**
* Copy content to container before start
* @param contentsToCopy - Array of content copy configurations
* @returns Container builder for chaining
*/
withCopyContentToContainer(contentsToCopy: ContentToCopy[]): this;
/**
* Set container entrypoint
* @param entrypoint - Entrypoint command array
* @returns Container builder for chaining
*/
withEntrypoint(entrypoint: string[]): this;
/**
* Set container platform
* @param platform - Platform string ('linux/amd64', 'linux/arm64', etc.)
* @returns Container builder for chaining
*/
withPlatform(platform: string): this;
/**
* Configure temporary filesystems
* @param tmpFs - Object mapping directories to tmpfs options
* @returns Container builder for chaining
*/
withTmpFs(tmpFs: TmpFs): this;
/**
* Set ulimits for the container
* @param ulimits - Object with ulimit configurations
* @returns Container builder for chaining
*/
withUlimits(ulimits: Ulimits): this;
/**
* Add Linux capabilities to the container
* @param capabilities - Capability names to add
* @returns Container builder for chaining
*/
withAddedCapabilities(...capabilities: string[]): this;
/**
* Drop Linux capabilities from the container
* @param capabilities - Capability names to drop
* @returns Container builder for chaining
*/
withDroppedCapabilities(...capabilities: string[]): this;
/**
* Set network aliases for the container
* @param networkAliases - Alias names for the container on the network
* @returns Container builder for chaining
*/
withNetworkAliases(...networkAliases: string[]): this;
/**
* Add extra host entries to the container
* @param extraHosts - Array of host-to-IP mappings
* @returns Container builder for chaining
*/
withExtraHosts(extraHosts: ExtraHost[]): this;
/**
* Configure container health check
* @param healthCheck - Health check configuration
* @returns Container builder for chaining
*/
withHealthCheck(healthCheck: HealthCheck): this;
/**
* Use default JSON file log driver
* @returns Container builder for chaining
*/
withDefaultLogDriver(): this;
/**
* Enable container reuse (requires TESTCONTAINERS_REUSE_ENABLE=true environment variable)
* @returns Container builder for chaining
* @note Reuse allows containers to persist across test runs for faster execution
*/
withReuse(): this;
/**
* Configure automatic container removal on stop
* @param autoRemove - Whether to automatically remove container (default: true)
* @returns Container builder for chaining
*/
withAutoRemove(autoRemove: boolean): this;
/**
* Set image pull policy
* @param pullPolicy - Policy for pulling container images
* @returns Container builder for chaining
*/
withPullPolicy(pullPolicy: ImagePullPolicy): this;
/**
* Set IPC mode for the container
* @param ipcMode - IPC mode string ('shareable', 'private', container name/id)
* @returns Container builder for chaining
*/
withIpcMode(ipcMode: string): this;
/**
* Copy archives to container before start
* @param archivesToCopy - Array of archive copy configurations
* @returns Container builder for chaining
*/
withCopyArchivesToContainer(archivesToCopy: ArchiveToCopy[]): this;
/**
* Set working directory in the container
* @param workingDir - Working directory path
* @returns Container builder for chaining
*/
withWorkingDir(workingDir: string): this;
/**
* Set resource quotas for the container
* @param quota - Memory and CPU resource limits
* @returns Container builder for chaining
*/
withResourcesQuota(quota: ResourcesQuota): this;
/**
* Set shared memory size for the container
* @param bytes - Shared memory size in bytes
* @returns Container builder for chaining
*/
withSharedMemorySize(bytes: number): this;
/**
* Set a log consumer to receive container logs
* @param logConsumer - Function to consume log stream
* @returns Container builder for chaining
*/
withLogConsumer(logConsumer: (stream: Readable) => unknown): this;
/**
* Set container hostname
* @param hostname - Hostname for the container
* @returns Container builder for chaining
*/
withHostname(hostname: string): this;
}interface ExecResult {
output: string; // Combined stdout and stderr
stdout: string; // Standard output
stderr: string; // Standard error
exitCode: number; // Process exit code (0 for success)
}
interface ExecOptions {
workingDir?: string; // Working directory for command
user?: string; // User to run command as
env?: Record<string, string>; // Environment variables
}
interface FileToCopy {
source: string; // Source file path on host
target: string; // Target path in container
mode?: number; // File permissions (octal, e.g., 0o644)
}
interface DirectoryToCopy {
source: string; // Source directory path on host
target: string; // Target path in container
mode?: number; // Directory permissions (octal)
}
type Content = string | Buffer | Readable;
interface ContentToCopy {
content: Content; // Content to copy (string, Buffer, or stream)
target: string; // Target file path in container
mode?: number; // File permissions (octal)
}
interface ArchiveToCopy {
tar: Readable; // Tar archive stream
target: string; // Target directory in container
}
interface StopOptions {
timeout?: number; // Stop timeout in milliseconds
removeVolumes?: boolean; // Whether to remove associated volumes
}
interface RestartOptions {
timeout?: number; // Restart timeout in milliseconds
}
type BindMode = 'rw' | 'ro' | 'z' | 'Z';
interface BindMount {
source: string; // Source path on host
target: string; // Target path in container
mode?: BindMode; // Mount mode: 'rw' (read-write), 'ro' (read-only), 'z' (shared SELinux), 'Z' (private SELinux)
}
interface CommitOptions {
repository?: string; // Repository name for new image
tag?: string; // Tag for new image
comment?: string; // Commit message
author?: string; // Author name
pause?: boolean; // Pause container before commit
deleteOnExit?: boolean; // Delete container after commit
changes?: string[]; // Dockerfile instructions as array
}
type Labels = { [key: string]: string };
type TmpFs = { [dir: string]: string };
interface Ulimits {
[name: string]: {
hard: number | undefined; // Hard limit
soft: number | undefined; // Soft limit
};
}
interface ExtraHost {
host: string; // Hostname
ipAddress: string; // IP address
}
interface HealthCheck {
test: ['CMD-SHELL', string] | ['CMD', ...string[]]; // Health check command
interval?: number; // Check interval in milliseconds
timeout?: number; // Timeout in milliseconds
retries?: number; // Number of retries before marking unhealthy
startPeriod?: number; // Grace period in milliseconds before checks start
}
interface ResourcesQuota {
memory?: number; // Memory limit in Gigabytes
cpu?: number; // CPU quota in units of CPUs
}
interface ImagePullPolicy {
shouldPull(): boolean; // Returns true if image should be pulled
}
interface StartedNetwork {
getName(): string; // Network name
getId(): string; // Network ID
}
interface WaitStrategy {
// Wait strategy from testcontainers package
// Common strategies: Wait.forLogMessage(), Wait.forHealthCheck(), Wait.forListeningPorts()
}
interface StoppedTestContainer {
// Stopped container interface from testcontainers package
// Container can be restarted or removed
}
// Readable is from Node.js 'stream' module
type Readable = NodeJS.ReadableStream;RabbitMQ container behavior can be configured using environment variables:
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withEnvironment({
// Authentication
RABBITMQ_DEFAULT_USER: 'admin',
RABBITMQ_DEFAULT_PASS: 'secret123',
// Virtual host
RABBITMQ_DEFAULT_VHOST: '/myvhost',
// Node name (for clustering)
RABBITMQ_NODENAME: 'rabbit@myhost',
// Erlang cookie (for clustering)
RABBITMQ_ERLANG_COOKIE: 'SWQOKODSQALRPCLNMEQG',
// Memory limit (e.g., '256MiB', '2GiB')
RABBITMQ_VM_MEMORY_HIGH_WATERMARK: '0.4',
// Disk free space limit (e.g., '2GB', '50MB')
RABBITMQ_DISK_FREE_LIMIT: '2GB',
// Enable plugins (comma-separated)
RABBITMQ_ENABLED_PLUGINS: 'rabbitmq_management,rabbitmq_shovel',
// Management UI configuration
RABBITMQ_MANAGEMENT_SSL_ENABLED: 'false',
// Log level
RABBITMQ_LOG_LEVEL: 'info',
})
.start();RabbitMQ can be configured using Erlang configuration files:
import { RabbitMQContainer } from '@testcontainers/rabbitmq';
// Advanced configuration using rabbitmq.config
const configContent = `[
{rabbit, [
{loopback_users, []}, // Allow guest user from non-localhost
{vm_memory_high_watermark, 0.4}, // Memory threshold (40%)
{disk_free_limit, 2000000000}, // 2GB disk free limit
{tcp_listen_options, [binary, {packet, raw}, {active, false}]},
{heartbeat, 60} // Heartbeat timeout in seconds
]},
{rabbitmq_management, [
{listener, [{port, 15672}, {ip, "0.0.0.0"}]}
]}
].`;
const container = await new RabbitMQContainer('rabbitmq:3.13-management')
.withCopyContentToContainer([{
content: configContent,
target: '/etc/rabbitmq/rabbitmq.config'
}])
.start();Modern RabbitMQ (3.7+) supports a simpler configuration format:
const confContent = `## Networking
listeners.tcp.default = 5672
management.tcp.port = 15672
## Memory and Disk
vm_memory_high_watermark.relative = 0.4
disk_free_limit.absolute = 2GB
## Cluster
cluster_formation.peer_discovery_backend = classic_config
## Logging
log.console = true
log.console.level = info
log.file = false
## Performance
heartbeat = 60
frame_max = 131072
channel_max = 2047
`;
const container = await new RabbitMQContainer('rabbitmq:3.13-management')
.withCopyContentToContainer([{
content: confContent,
target: '/etc/rabbitmq/rabbitmq.conf'
}])
.start();Plugins can be enabled via environment variable or by copying enabled_plugins file:
// Method 1: Environment variable
const container1 = await new RabbitMQContainer('rabbitmq:3.13-management')
.withEnvironment({
RABBITMQ_ENABLED_PLUGINS: 'rabbitmq_management,rabbitmq_shovel,rabbitmq_federation'
})
.start();
// Method 2: Enabled plugins file
const container2 = await new RabbitMQContainer('rabbitmq:3.13-management')
.withCopyContentToContainer([{
content: '[rabbitmq_management,rabbitmq_shovel,rabbitmq_federation].',
target: '/etc/rabbitmq/enabled_plugins'
}])
.start();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
const QUEUE = 'test-queue';
const MESSAGE = 'Hello World';
await using container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();
await channel.assertQueue(QUEUE, { durable: false });
// Publish
channel.sendToQueue(QUEUE, Buffer.from(MESSAGE), { persistent: false });
// Subscribe
await new Promise<void>((resolve) => {
channel.consume(QUEUE, (message) => {
if (message) {
console.log(message.content.toString()); // "Hello World"
channel.ack(message);
resolve();
}
}, { noAck: false });
});
await channel.close();
await connection.close();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
await using container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();
// Create exchange
await channel.assertExchange('test-exchange', 'direct', { durable: false });
// Create queues
await channel.assertQueue('queue1', { durable: false });
await channel.assertQueue('queue2', { durable: false });
// Bind queues to exchange with routing keys
await channel.bindQueue('queue1', 'test-exchange', 'error');
await channel.bindQueue('queue2', 'test-exchange', 'info');
// Publish with routing key
channel.publish('test-exchange', 'error', Buffer.from('Error message'));
channel.publish('test-exchange', 'info', Buffer.from('Info message'));
// Consume from queues
channel.consume('queue1', (msg) => {
if (msg) console.log('Queue1:', msg.content.toString());
});
channel.consume('queue2', (msg) => {
if (msg) console.log('Queue2:', msg.content.toString());
});
await channel.close();
await connection.close();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
describe('RabbitMQ Integration Tests', () => {
let container: StartedRabbitMQContainer;
let connection: amqp.Connection;
beforeAll(async () => {
container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
connection = await amqp.connect(container.getAmqpUrl());
});
afterAll(async () => {
await connection.close();
await container.stop();
});
beforeEach(async () => {
// Clean up queues before each test
const channel = await connection.createChannel();
await channel.deleteQueue('test-queue');
await channel.close();
});
it('should publish and consume messages', async () => {
const channel = await connection.createChannel();
await channel.assertQueue('test-queue');
channel.sendToQueue('test-queue', Buffer.from('test message'));
const message = await new Promise<amqp.ConsumeMessage>((resolve) => {
channel.consume('test-queue', (msg) => {
if (msg) resolve(msg);
}, { noAck: true });
});
expect(message.content.toString()).toBe('test message');
await channel.close();
});
});import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest';
import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
describe('RabbitMQ Tests', () => {
let container: StartedRabbitMQContainer;
let connection: amqp.Connection;
beforeAll(async () => {
container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
connection = await amqp.connect(container.getAmqpUrl());
});
afterAll(async () => {
await connection.close();
await container.stop();
});
it('should handle message routing', async () => {
// Test implementation
});
});import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
describe('RabbitMQ Integration', function() {
this.timeout(60000); // Increase timeout for container startup
let container: StartedRabbitMQContainer;
let connection: amqp.Connection;
before(async () => {
container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
connection = await amqp.connect(container.getAmqpUrl());
});
after(async () => {
await connection.close();
await container.stop();
});
it('should process messages', async () => {
// Test implementation
});
});import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import fetch from 'node-fetch';
// Use management image
const container = await new RabbitMQContainer('rabbitmq:3.13-management').start();
// Access management API
const managementPort = container.getMappedPort(15672);
const managementUrl = `http://${container.getHost()}:${managementPort}`;
// Get overview using management API
const response = await fetch(`${managementUrl}/api/overview`, {
headers: {
'Authorization': 'Basic ' + Buffer.from('guest:guest').toString('base64')
}
});
const overview = await response.json();
console.log('RabbitMQ Version:', overview.rabbitmq_version);
console.log('Management UI:', `${managementUrl}`);
// List queues
const queuesResponse = await fetch(`${managementUrl}/api/queues`, {
headers: {
'Authorization': 'Basic ' + Buffer.from('guest:guest').toString('base64')
}
});
const queues = await queuesResponse.json();
console.log('Queues:', queues);
await container.stop();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withEnvironment({
RABBITMQ_DEFAULT_USER: 'myuser',
RABBITMQ_DEFAULT_PASS: 'mypassword'
})
.start();
// Connect with custom credentials
const connection = await amqp.connect({
username: 'myuser',
password: 'mypassword',
hostname: container.getHost(),
port: container.getMappedPort(5672)
});
// Or construct URL manually
const customUrl = `amqp://myuser:mypassword@${container.getHost()}:${container.getMappedPort(5672)}`;
const connection2 = await amqp.connect(customUrl);
await connection.close();
await container.stop();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
// Start multiple containers for different test scenarios
await using container1 = await new RabbitMQContainer('rabbitmq:3.13').start();
await using container2 = await new RabbitMQContainer('rabbitmq:3.12').start();
await using container3 = await new RabbitMQContainer('rabbitmq:3.13-management')
.withEnvironment({ RABBITMQ_DEFAULT_USER: 'admin', RABBITMQ_DEFAULT_PASS: 'admin' })
.start();
// Run tests against different versions
const conn1 = await amqp.connect(container1.getAmqpUrl());
const conn2 = await amqp.connect(container2.getAmqpUrl());
const conn3 = await amqp.connect({
username: 'admin',
password: 'admin',
hostname: container3.getHost(),
port: container3.getMappedPort(5672)
});
// Test compatibility across versionsimport { RabbitMQContainer } from '@testcontainers/rabbitmq';
import { Network, StartedNetwork } from 'testcontainers';
// Create isolated network
const network = await new Network().start();
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withNetwork(network)
.withNetworkAliases('rabbitmq')
.start();
// Other containers on the same network can connect using 'rabbitmq' hostname
const otherContainer = await new GenericContainer('my-app')
.withNetwork(network)
.start();
// From otherContainer, connect to RabbitMQ using: amqp://rabbitmq:5672
await container.stop();
await network.stop();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
// Check RabbitMQ status
const statusResult = await container.exec(['rabbitmqctl', 'status']);
console.log('Status:', statusResult.output);
// List queues
const queuesResult = await container.exec(['rabbitmqctl', 'list_queues']);
console.log('Queues:', queuesResult.output);
// List exchanges
const exchangesResult = await container.exec(['rabbitmqctl', 'list_exchanges']);
console.log('Exchanges:', exchangesResult.output);
// List bindings
const bindingsResult = await container.exec(['rabbitmqctl', 'list_bindings']);
console.log('Bindings:', bindingsResult.output);
// List connections
const connectionsResult = await container.exec(['rabbitmqctl', 'list_connections']);
console.log('Connections:', connectionsResult.output);
// Create virtual host
await container.exec(['rabbitmqctl', 'add_vhost', '/myvhost']);
// Set permissions
await container.exec(['rabbitmqctl', 'set_permissions', '-p', '/myvhost', 'guest', '.*', '.*', '.*']);
// List virtual hosts
const vhostsResult = await container.exec(['rabbitmqctl', 'list_vhosts']);
console.log('Virtual Hosts:', vhostsResult.output);
await container.stop();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import { Wait } from 'testcontainers';
// Custom wait strategy - wait for specific log message
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withWaitStrategy(
Wait.forLogMessage(/Server startup complete/)
.withStartupTimeout(60000)
)
.start();
// Wait for health check (if health check is configured)
const container2 = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withHealthCheck({
test: ['CMD-SHELL', 'rabbitmqctl status'],
interval: 5000,
timeout: 3000,
retries: 5,
startPeriod: 10000
})
.withWaitStrategy(Wait.forHealthCheck())
.start();
// Wait for listening ports
const container3 = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withWaitStrategy(
Wait.forListeningPorts()
.withStartupTimeout(45000)
)
.start();
await container.stop();
await container2.stop();
await container3.stop();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
// Set memory and CPU limits
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withResourcesQuota({
memory: 1, // 1 GB memory limit
cpu: 0.5 // 0.5 CPU cores
})
.withEnvironment({
RABBITMQ_VM_MEMORY_HIGH_WATERMARK: '0.4' // 40% of container memory
})
.start();
await container.stop();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import { Readable } from 'stream';
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withLogConsumer((stream: Readable) => {
stream.on('data', (chunk: Buffer) => {
console.log('[RabbitMQ]', chunk.toString());
});
})
.withEnvironment({
RABBITMQ_LOG_LEVEL: 'debug' // Enable debug logging
})
.start();
// Get logs programmatically
const logs = await container.logs({ tail: 100 });
logs.on('data', (chunk: Buffer) => {
console.log('Recent logs:', chunk.toString());
});
// Get logs since specific time
const since = Date.now() - 60000; // Last minute
const recentLogs = await container.logs({ since });
recentLogs.on('data', (chunk: Buffer) => {
console.log('Recent logs:', chunk.toString());
});
await container.stop();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
// Enable container reuse (requires TESTCONTAINERS_REUSE_ENABLE=true)
// Containers with same configuration will be reused across test runs
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withName('reusable-rabbitmq') // Fixed name required for reuse
.withReuse()
.start();
// Container will persist after stop() if reuse is enabled
await container.stop();
// Next test run will reuse the same container
const reusedContainer = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withName('reusable-rabbitmq')
.withReuse()
.start();
// Container starts much faster as it's reused
await reusedContainer.stop();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();
// Use amqplib API
await channel.assertQueue('queue', { durable: true });
channel.sendToQueue('queue', Buffer.from('message'));import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqp-connection-manager';
import { ConfirmChannel } from 'amqplib';
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = amqp.connect([container.getAmqpUrl()]);
connection.on('connect', () => console.log('Connected'));
connection.on('disconnect', () => console.log('Disconnected'));
const channelWrapper = connection.createChannel({
setup: async (channel: ConfirmChannel) => {
await channel.assertQueue('queue', { durable: true });
}
});
await channelWrapper.sendToQueue('queue', Buffer.from('message'));
await connection.close();
await container.stop();Containers may fail to start or operate due to various reasons:
Docker not running or inaccessible
try {
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
} catch (error) {
if (error.message.includes('Docker')) {
console.error('Docker is not running or not accessible');
}
}Image cannot be pulled
try {
const container = await new RabbitMQContainer('rabbitmq:invalid-tag').start();
} catch (error) {
if (error.message.includes('pull') || error.message.includes('image')) {
console.error('Failed to pull Docker image');
}
}Ports already in use
try {
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withExposedPorts(5672)
.start();
} catch (error) {
if (error.message.includes('port') || error.message.includes('bind')) {
console.error('Port is already in use');
}
}Insufficient system resources
try {
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
} catch (error) {
if (error.message.includes('memory') || error.message.includes('resource')) {
console.error('Insufficient system resources');
}
}Container startup timeout
try {
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withStartupTimeout(10000) // 10 seconds
.start();
} catch (error) {
if (error.message.includes('timeout')) {
console.error('Container failed to start within timeout');
// Check logs for startup issues
const logs = await container.logs();
logs.on('data', (chunk) => console.error('Container logs:', chunk.toString()));
}
}RabbitMQ configuration errors
try {
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withCopyContentToContainer([{
content: 'invalid config',
target: '/etc/rabbitmq/rabbitmq.config'
}])
.start();
} catch (error) {
// RabbitMQ may start but fail to apply configuration
// Check container logs for configuration errors
const execResult = await container.exec(['rabbitmqctl', 'status']);
if (execResult.exitCode !== 0) {
console.error('RabbitMQ configuration error:', execResult.stderr);
}
}Always use async disposables for automatic cleanup:
try {
await using container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = await amqp.connect(container.getAmqpUrl());
// Simulate error
throw new Error('Test error');
// Container automatically cleaned up even on error
} catch (error) {
console.error('Error occurred:', error);
// Container is already stopped due to async dispose
}let container: StartedRabbitMQContainer | null = null;
try {
container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
// Use container
} catch (error) {
console.error('Failed to start container:', error);
} finally {
if (container) {
try {
await container.stop();
} catch (stopError) {
console.error('Failed to stop container:', stopError);
}
}
}import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
try {
const connection = await amqp.connect(container.getAmqpUrl());
connection.on('error', (err) => {
console.error('Connection error:', err);
});
connection.on('close', () => {
console.log('Connection closed');
});
const channel = await connection.createChannel();
channel.on('error', (err) => {
console.error('Channel error:', err);
});
// Use channel
await channel.assertQueue('test-queue');
} catch (error) {
if (error.code === 'ECONNREFUSED') {
console.error('Cannot connect to RabbitMQ - container may not be ready');
} else if (error.code === 'ENOTFOUND') {
console.error('Cannot resolve RabbitMQ hostname');
} else {
console.error('Unexpected error:', error);
}
} finally {
await container.stop();
}import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();
// Create durable queue
await channel.assertQueue('persistent-queue', { durable: true });
// Send persistent message
channel.sendToQueue('persistent-queue', Buffer.from('persistent message'), {
persistent: true
});
// Restart container to test persistence
await container.restart();
// Reconnect and verify message
const connection2 = await amqp.connect(container.getAmqpUrl());
const channel2 = await connection2.createChannel();
// Note: Messages are only persisted if queue is durable AND message is marked persistent
// Container restart may lose messages unless using volumes for data directory
await channel2.close();
await connection2.close();
await container.stop();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
// Create virtual host via rabbitmqctl
await container.exec(['rabbitmqctl', 'add_vhost', '/testvhost']);
// Set permissions
await container.exec(['rabbitmqctl', 'set_permissions', '-p', '/testvhost', 'guest', '.*', '.*', '.*']);
// Connect to specific virtual host
const connection = await amqp.connect({
hostname: container.getHost(),
port: container.getMappedPort(5672),
vhost: '/testvhost'
});
const channel = await connection.createChannel();
await channel.assertQueue('vhost-queue');
await channel.close();
await connection.close();
await container.stop();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine')
.withResourcesQuota({
memory: 2, // 2 GB
cpu: 1 // 1 CPU core
})
.start();
const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();
await channel.assertQueue('perf-queue', { durable: false });
// Publish many messages
const messageCount = 10000;
const startTime = Date.now();
for (let i = 0; i < messageCount; i++) {
channel.sendToQueue('perf-queue', Buffer.from(`message-${i}`), { persistent: false });
}
// Wait for all messages to be published
await new Promise(resolve => setTimeout(resolve, 1000));
const publishTime = Date.now() - startTime;
console.log(`Published ${messageCount} messages in ${publishTime}ms`);
// Consume messages
let consumed = 0;
const consumeStart = Date.now();
channel.consume('perf-queue', (msg) => {
if (msg) {
consumed++;
channel.ack(msg);
if (consumed === messageCount) {
const consumeTime = Date.now() - consumeStart;
console.log(`Consumed ${messageCount} messages in ${consumeTime}ms`);
}
}
}, { noAck: false });
await channel.close();
await connection.close();
await container.stop();import { RabbitMQContainer } from '@testcontainers/rabbitmq';
import amqp from 'amqplib';
const container = await new RabbitMQContainer('rabbitmq:3.13-alpine').start();
const connection = await amqp.connect(container.getAmqpUrl());
const channel = await connection.createChannel();
// Create dead letter exchange
await channel.assertExchange('dlx', 'direct', { durable: false });
// Create dead letter queue
await channel.assertQueue('dlq', { durable: false });
await channel.bindQueue('dlq', 'dlx', 'dead');
// Create main queue with dead letter configuration
await channel.assertQueue('main-queue', {
durable: false,
arguments: {
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'dead',
'x-message-ttl': 5000 // Messages expire after 5 seconds
}
});
// Publish message
channel.sendToQueue('main-queue', Buffer.from('test message'));
// Wait for message to expire
await new Promise(resolve => setTimeout(resolve, 6000));
// Message should be in dead letter queue
channel.consume('dlq', (msg) => {
if (msg) {
console.log('Dead letter message:', msg.content.toString());
channel.ack(msg);
}
}, { noAck: false });
await channel.close();
await connection.close();
await container.stop();