Python library for throwaway instances of anything that can run in a Docker container
Containers for caching systems, message queues, and pub/sub services including Redis, Kafka, RabbitMQ, NATS, and other messaging brokers with integrated client support and service-specific configurations.
Redis in-memory data store container with authentication support and both synchronous and asynchronous client integration.
class RedisContainer:
    """Throwaway Redis container spec with optional password authentication.

    NOTE(review): method bodies are intentionally empty API stubs in this
    spec file; the real implementation lives in `testcontainers.redis`.
    """

    def __init__(
        self,
        image: str = "redis:latest",
        port: int = 6379,
        password: Optional[str] = None,
        **kwargs: Any,
    ):
        """
        Initialize Redis container.

        Args:
            image: Redis Docker image
            port: Redis port (default 6379)
            password: Redis authentication password
            **kwargs: Additional container options
        """

    def get_client(self, **kwargs: Any):
        """
        Get configured Redis client.

        Args:
            **kwargs: Additional redis-py client arguments

        Returns:
            Redis client instance
        """
class AsyncRedisContainer(RedisContainer):
    """Redis container variant that also exposes an async redis-py client."""

    def get_async_client(self, **kwargs: Any):
        """
        Get configured async Redis client.

        Args:
            **kwargs: Additional redis-py async client arguments

        Returns:
            Async Redis client instance
        """


# Apache Kafka distributed streaming platform container with KRaft mode
# support and bootstrap server configuration.
class KafkaContainer:
    """Throwaway Apache Kafka container spec with optional KRaft mode."""

    def __init__(
        self,
        image: str = "confluentinc/cp-kafka:7.6.0",
        port: int = 9093,
        **kwargs: Any,
    ):
        """
        Initialize Kafka container.

        Args:
            image: Kafka Docker image
            port: Kafka port (default 9093)
            **kwargs: Additional container options
        """

    def get_bootstrap_server(self) -> str:
        """
        Get Kafka bootstrap server address.

        Returns:
            Bootstrap server address string (host:port)
        """

    def with_kraft(self) -> "KafkaContainer":
        """
        Enable KRaft mode (Kafka without Zookeeper).

        Returns:
            Self for method chaining
        """
        # Documented as chainable and chained in the usage example below;
        # a docstring-only stub would return None and break the chain.
        return self

    def with_cluster_id(self, cluster_id: str) -> "KafkaContainer":
        """
        Set Kafka cluster ID for KRaft mode.

        Args:
            cluster_id: Cluster identifier

        Returns:
            Self for method chaining
        """
        return self


# RabbitMQ message broker container with management interface and
# authentication configuration.
class RabbitMqContainer:
    """Throwaway RabbitMQ broker container spec with AMQP credentials."""

    def __init__(
        self,
        image: str = "rabbitmq:3-management",
        port: int = 5672,
        username: str = "guest",
        password: str = "guest",
        **kwargs: Any,
    ):
        """
        Initialize RabbitMQ container.

        Args:
            image: RabbitMQ Docker image
            port: AMQP port (default 5672)
            username: RabbitMQ username
            password: RabbitMQ password
            **kwargs: Additional container options
        """

    def get_connection_url(self) -> str:
        """
        Get RabbitMQ connection URL.

        Returns:
            AMQP connection URL string
        """


# NATS messaging system container for high-performance pub/sub and
# streaming communication.
class NatsContainer:
    """Throwaway NATS messaging container spec."""

    def __init__(
        self,
        image: str = "nats:latest",
        port: int = 4222,
        **kwargs: Any,
    ):
        """
        Initialize NATS container.

        Args:
            image: NATS Docker image
            port: NATS port (default 4222)
            **kwargs: Additional container options
        """

    def get_connection_url(self) -> str:
        """
        Get NATS connection URL.

        Returns:
            NATS connection URL string
        """


# MQTT message broker container for IoT and lightweight messaging scenarios.
class MqttContainer:
    """Throwaway MQTT broker container spec (Mosquitto by default)."""

    def __init__(
        self,
        image: str = "eclipse-mosquitto:latest",
        port: int = 1883,
        **kwargs: Any,
    ):
        """
        Initialize MQTT broker container.

        Args:
            image: MQTT broker Docker image
            port: MQTT port (default 1883)
            **kwargs: Additional container options
        """

    def get_connection_url(self) -> str:
        """
        Get MQTT broker URL.

        Returns:
            MQTT broker URL string
        """


# Memcached distributed memory caching system container for
# high-performance caching.
class MemcachedContainer:
    """Throwaway Memcached container spec."""

    def __init__(
        self,
        image: str = "memcached:latest",
        port: int = 11211,
        **kwargs: Any,
    ):
        """
        Initialize Memcached container.

        Args:
            image: Memcached Docker image
            port: Memcached port (default 11211)
            **kwargs: Additional container options
        """

    def get_connection_url(self) -> str:
        """
        Get Memcached connection URL.

        Returns:
            Memcached connection URL string
        """


from testcontainers.redis import RedisContainer
import redis

# Example: basic synchronous Redis usage against a throwaway container.
with RedisContainer("redis:6-alpine") as redis_container:
    # Get Redis client
    client = redis_container.get_client()

    # Basic Redis operations
    client.set("key1", "value1")
    # redis-py's hset takes field/value pairs via mapping=; the original
    # positional form ("name", "John", "email", ...) raises TypeError.
    client.hset("user:1", mapping={"name": "John", "email": "john@example.com"})

    # Retrieve values (redis-py returns bytes unless decode_responses is set)
    value = client.get("key1")
    user_data = client.hgetall("user:1")
    print(f"Cached value: {value.decode()}")
    print(f"User data: {user_data}")

    # List operations
    client.lpush("tasks", "task1", "task2", "task3")
    tasks = client.lrange("tasks", 0, -1)
    print(f"Tasks: {[task.decode() for task in tasks]}")

from testcontainers.redis import AsyncRedisContainer
import asyncio


async def async_redis_example():
    """Demonstrate async redis-py operations against a throwaway container."""
    # Container lifecycle is synchronous; only the client calls are awaited.
    with AsyncRedisContainer("redis:6") as redis_container:
        # Get async Redis client
        client = redis_container.get_async_client()

        # Async Redis operations
        await client.set("async_key", "async_value")
        value = await client.get("async_key")
        print(f"Async value: {value.decode()}")

        # Close the client
        await client.close()


# Run the async example
asyncio.run(async_redis_example())

from testcontainers.kafka import KafkaContainer
from kafka import KafkaProducer, KafkaConsumer
import json

# Example: produce then consume JSON messages on a throwaway Kafka broker.
with KafkaContainer() as kafka:
    bootstrap_server = kafka.get_bootstrap_server()

    # Create producer
    producer = KafkaProducer(
        bootstrap_servers=[bootstrap_server],
        value_serializer=lambda x: json.dumps(x).encode('utf-8')
    )

    # Send messages
    for i in range(5):
        message = {"id": i, "message": f"Hello Kafka {i}"}
        producer.send("test-topic", message)
    producer.flush()
    producer.close()

    # Create consumer. The messages were produced before this consumer
    # subscribed, so it must start from the earliest offset — the default
    # ('latest') would never deliver them and the loop below would hang.
    consumer = KafkaConsumer(
        "test-topic",
        bootstrap_servers=[bootstrap_server],
        auto_offset_reset="earliest",
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    # Consume messages
    for message in consumer:
        print(f"Received: {message.value}")
        if message.value["id"] >= 4:  # Stop after receiving all messages
            break
    consumer.close()

from testcontainers.rabbitmq import RabbitMqContainer
import pika
import json

# Example: publish and consume persistent messages on a throwaway RabbitMQ.
with RabbitMqContainer() as rabbitmq:
    connection_url = rabbitmq.get_connection_url()

    # Connect to RabbitMQ
    connection = pika.BlockingConnection(pika.URLParameters(connection_url))
    channel = connection.channel()

    # Declare queue
    queue_name = "task_queue"
    channel.queue_declare(queue=queue_name, durable=True)

    # Publish messages
    for i in range(3):
        message = {"task_id": i, "data": f"Task {i} data"}
        channel.basic_publish(
            exchange="",
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)  # Make message persistent
        )
        print(f"Sent task {i}")

    # Consume messages
    def callback(ch, method, properties, body):
        message = json.loads(body)
        print(f"Received task: {message}")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue=queue_name, on_message_callback=callback)

    # Process a few messages
    for _ in range(3):
        channel.process_data_events(time_limit=1)

    connection.close()

from testcontainers.redis import RedisContainer
from testcontainers.kafka import KafkaContainer
from testcontainers.rabbitmq import RabbitMqContainer
from testcontainers.core.network import Network

# Create shared network
with Network() as network:
    # Network configuration must be applied BEFORE the containers start;
    # calling with_network()/with_network_aliases() on an already-running
    # container has no effect.
    redis = RedisContainer("redis:6").with_network(network).with_network_aliases("redis")
    kafka = KafkaContainer().with_network(network).with_network_aliases("kafka")
    rabbitmq = RabbitMqContainer().with_network(network).with_network_aliases("rabbitmq")

    # Start multiple messaging services on the shared network
    with redis, kafka, rabbitmq:
        # Get service endpoints
        redis_client = redis.get_client()
        kafka_bootstrap = kafka.get_bootstrap_server()
        rabbitmq_url = rabbitmq.get_connection_url()

        # Use services together in application architecture
        print(f"Redis available: {redis_client.ping()}")
        print(f"Kafka bootstrap: {kafka_bootstrap}")
        print(f"RabbitMQ URL: {rabbitmq_url}")

from testcontainers.nats import NatsContainer
import asyncio
import nats


async def nats_example():
    """Demonstrate NATS pub/sub against a throwaway container."""
    with NatsContainer() as nats_container:
        connection_url = nats_container.get_connection_url()

        # Connect to NATS
        nc = await nats.connect(connection_url)

        # Subscribe to subject
        async def message_handler(msg):
            subject = msg.subject
            data = msg.data.decode()
            print(f"Received message on {subject}: {data}")

        await nc.subscribe("updates", cb=message_handler)

        # Publish messages
        for i in range(3):
            await nc.publish("updates", f"Update {i}".encode())

        # Allow time for message processing
        await asyncio.sleep(1)
        await nc.close()


# Run the async example
asyncio.run(nats_example())

from testcontainers.redis import RedisContainer
# Redis with password authentication: the env var and --requirepass keep the
# server and client credentials in sync.
redis = RedisContainer("redis:6") \
    .with_env("REDIS_PASSWORD", "mypassword") \
    .with_command("redis-server --requirepass mypassword")

with redis:
    client = redis.get_client(password="mypassword")
    client.set("protected_key", "protected_value")

from testcontainers.kafka import KafkaContainer
# Kafka without Zookeeper using KRaft
kafka = KafkaContainer("confluentinc/cp-kafka:7.6.0") \
.with_kraft() \
.with_cluster_id("test-cluster-id")
with kafka:
bootstrap_server = kafka.get_bootstrap_server()
print(f"KRaft Kafka available at: {bootstrap_server}")Install with Tessl CLI
npx tessl i tessl/pypi-testcontainers