CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-testcontainers

Python library for throwaway instances of anything that can run in a Docker container

Overview
Eval results
Files

cache-messaging.mddocs/

Cache and Messaging Containers

Containers for caching systems, message queues, and pub/sub services including Redis, Kafka, RabbitMQ, NATS, and other messaging brokers with integrated client support and service-specific configurations.

Capabilities

Redis Container

Redis in-memory data store container with authentication support, both synchronous and asynchronous client integration.

class RedisContainer:
    def __init__(
        self,
        image: str = "redis:latest",
        port: int = 6379,
        password: Optional[str] = None,
        **kwargs: Any
    ):
        """
        Initialize Redis container.
        
        Args:
            image: Redis Docker image
            port: Redis port (default 6379)
            password: Redis authentication password
            **kwargs: Additional container options
        """

    def get_client(self, **kwargs: Any):
        """
        Get configured Redis client.
        
        Args:
            **kwargs: Additional redis-py client arguments
            
        Returns:
            Redis client instance
        """

class AsyncRedisContainer(RedisContainer):
    def get_async_client(self, **kwargs: Any):
        """
        Get configured async Redis client.
        
        Args:
            **kwargs: Additional redis-py async client arguments
            
        Returns:
            Async Redis client instance
        """

Kafka Container

Apache Kafka distributed streaming platform container with KRaft mode support and bootstrap server configuration.

class KafkaContainer:
    def __init__(
        self,
        image: str = "confluentinc/cp-kafka:7.6.0",
        port: int = 9093,
        **kwargs: Any
    ):
        """
        Initialize Kafka container.
        
        Args:
            image: Kafka Docker image
            port: Kafka port (default 9093)
            **kwargs: Additional container options
        """

    def get_bootstrap_server(self) -> str:
        """
        Get Kafka bootstrap server address.
        
        Returns:
            Bootstrap server address string (host:port)
        """

    def with_kraft(self) -> "KafkaContainer":
        """
        Enable KRaft mode (Kafka without Zookeeper).
        
        Returns:
            Self for method chaining
        """

    def with_cluster_id(self, cluster_id: str) -> "KafkaContainer":
        """
        Set Kafka cluster ID for KRaft mode.
        
        Args:
            cluster_id: Cluster identifier
            
        Returns:
            Self for method chaining
        """

RabbitMQ Container

RabbitMQ message broker container with management interface and authentication configuration.

class RabbitMqContainer:
    def __init__(
        self,
        image: str = "rabbitmq:3-management",
        port: int = 5672,
        username: str = "guest",
        password: str = "guest",
        **kwargs: Any
    ):
        """
        Initialize RabbitMQ container.
        
        Args:
            image: RabbitMQ Docker image
            port: AMQP port (default 5672)
            username: RabbitMQ username
            password: RabbitMQ password
            **kwargs: Additional container options
        """

    def get_connection_url(self) -> str:
        """
        Get RabbitMQ connection URL.
        
        Returns:
            AMQP connection URL string
        """

NATS Container

NATS messaging system container for high-performance pub/sub and streaming communication.

class NatsContainer:
    def __init__(
        self,
        image: str = "nats:latest",
        port: int = 4222,
        **kwargs: Any
    ):
        """
        Initialize NATS container.
        
        Args:
            image: NATS Docker image
            port: NATS port (default 4222)
            **kwargs: Additional container options
        """

    def get_connection_url(self) -> str:
        """
        Get NATS connection URL.
        
        Returns:
            NATS connection URL string
        """

MQTT Container

MQTT message broker container for IoT and lightweight messaging scenarios.

class MqttContainer:
    def __init__(
        self,
        image: str = "eclipse-mosquitto:latest",
        port: int = 1883,
        **kwargs: Any
    ):
        """
        Initialize MQTT broker container.
        
        Args:
            image: MQTT broker Docker image
            port: MQTT port (default 1883)
            **kwargs: Additional container options
        """

    def get_connection_url(self) -> str:
        """
        Get MQTT broker URL.
        
        Returns:
            MQTT broker URL string
        """

Memcached Container

Memcached distributed memory caching system container for high-performance caching.

class MemcachedContainer:
    def __init__(
        self,
        image: str = "memcached:latest",
        port: int = 11211,
        **kwargs: Any
    ):
        """
        Initialize Memcached container.
        
        Args:
            image: Memcached Docker image
            port: Memcached port (default 11211)
            **kwargs: Additional container options
        """

    def get_connection_url(self) -> str:
        """
        Get Memcached connection URL.
        
        Returns:
            Memcached connection URL string
        """

Usage Examples

Redis Caching

from testcontainers.redis import RedisContainer
import redis

with RedisContainer("redis:6-alpine") as redis_container:
    # Get Redis client
    client = redis_container.get_client()
    
    # Basic Redis operations
    client.set("key1", "value1")
    client.hset("user:1", "name", "John", "email", "john@example.com")
    
    # Retrieve values
    value = client.get("key1")
    user_data = client.hgetall("user:1")
    
    print(f"Cached value: {value.decode()}")
    print(f"User data: {user_data}")
    
    # List operations
    client.lpush("tasks", "task1", "task2", "task3")
    tasks = client.lrange("tasks", 0, -1)
    print(f"Tasks: {[task.decode() for task in tasks]}")

Async Redis Usage

from testcontainers.redis import AsyncRedisContainer
import asyncio

async def async_redis_example():
    with AsyncRedisContainer("redis:6") as redis_container:
        # Get async Redis client
        client = redis_container.get_async_client()
        
        # Async Redis operations
        await client.set("async_key", "async_value")
        value = await client.get("async_key")
        
        print(f"Async value: {value.decode()}")
        
        # Close the client
        await client.close()

# Run the async example
asyncio.run(async_redis_example())

Kafka Messaging

from testcontainers.kafka import KafkaContainer
from kafka import KafkaProducer, KafkaConsumer
import json

with KafkaContainer() as kafka:
    bootstrap_server = kafka.get_bootstrap_server()
    
    # Create producer
    producer = KafkaProducer(
        bootstrap_servers=[bootstrap_server],
        value_serializer=lambda x: json.dumps(x).encode('utf-8')
    )
    
    # Send messages
    for i in range(5):
        message = {"id": i, "message": f"Hello Kafka {i}"}
        producer.send("test-topic", message)
    
    producer.flush()
    producer.close()
    
    # Create consumer
    consumer = KafkaConsumer(
        "test-topic",
        bootstrap_servers=[bootstrap_server],
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    
    # Consume messages
    for message in consumer:
        print(f"Received: {message.value}")
        if message.value["id"] >= 4:  # Stop after receiving all messages
            break
    
    consumer.close()

RabbitMQ Message Queue

from testcontainers.rabbitmq import RabbitMqContainer
import pika
import json

with RabbitMqContainer() as rabbitmq:
    connection_url = rabbitmq.get_connection_url()
    
    # Connect to RabbitMQ
    connection = pika.BlockingConnection(pika.URLParameters(connection_url))
    channel = connection.channel()
    
    # Declare queue
    queue_name = "task_queue"
    channel.queue_declare(queue=queue_name, durable=True)
    
    # Publish messages
    for i in range(3):
        message = {"task_id": i, "data": f"Task {i} data"}
        channel.basic_publish(
            exchange="",
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)  # Make message persistent
        )
        print(f"Sent task {i}")
    
    # Consume messages
    def callback(ch, method, properties, body):
        message = json.loads(body)
        print(f"Received task: {message}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_consume(queue=queue_name, on_message_callback=callback)
    
    # Process a few messages
    for _ in range(3):
        channel.process_data_events(time_limit=1)
    
    connection.close()

Multi-Service Messaging Setup

from testcontainers.redis import RedisContainer
from testcontainers.kafka import KafkaContainer
from testcontainers.rabbitmq import RabbitMqContainer
from testcontainers.core.network import Network

# Create shared network
with Network() as network:
    # Start multiple messaging services
    with RedisContainer("redis:6") as redis, \
         KafkaContainer() as kafka, \
         RabbitMqContainer() as rabbitmq:
        
        # Connect to network
        redis.with_network(network).with_network_aliases("redis")
        kafka.with_network(network).with_network_aliases("kafka")
        rabbitmq.with_network(network).with_network_aliases("rabbitmq")
        
        # Get service endpoints
        redis_client = redis.get_client()
        kafka_bootstrap = kafka.get_bootstrap_server()
        rabbitmq_url = rabbitmq.get_connection_url()
        
        # Use services together in application architecture
        print(f"Redis available: {redis_client.ping()}")
        print(f"Kafka bootstrap: {kafka_bootstrap}")
        print(f"RabbitMQ URL: {rabbitmq_url}")

NATS Pub/Sub

from testcontainers.nats import NatsContainer
import asyncio
import nats

async def nats_example():
    with NatsContainer() as nats_container:
        connection_url = nats_container.get_connection_url()
        
        # Connect to NATS
        nc = await nats.connect(connection_url)
        
        # Subscribe to subject
        async def message_handler(msg):
            subject = msg.subject
            data = msg.data.decode()
            print(f"Received message on {subject}: {data}")
        
        await nc.subscribe("updates", cb=message_handler)
        
        # Publish messages
        for i in range(3):
            await nc.publish("updates", f"Update {i}".encode())
        
        # Allow time for message processing
        await asyncio.sleep(1)
        
        await nc.close()

# Run the async example
asyncio.run(nats_example())

Configuration Examples

Redis with Custom Configuration

from testcontainers.redis import RedisContainer

# Redis with password authentication
redis = RedisContainer("redis:6") \
    .with_env("REDIS_PASSWORD", "mypassword") \
    .with_command("redis-server --requirepass mypassword")

with redis:
    client = redis.get_client(password="mypassword")
    client.set("protected_key", "protected_value")

Kafka with KRaft Mode

from testcontainers.kafka import KafkaContainer

# Kafka without Zookeeper using KRaft
kafka = KafkaContainer("confluentinc/cp-kafka:7.6.0") \
    .with_kraft() \
    .with_cluster_id("test-cluster-id")

with kafka:
    bootstrap_server = kafka.get_bootstrap_server()
    print(f"KRaft Kafka available at: {bootstrap_server}")

Install with Tessl CLI

npx tessl i tessl/pypi-testcontainers

docs

cache-messaging.md

cloud-services.md

compose.md

core-containers.md

database-containers.md

index.md

search-analytics.md

waiting-strategies.md

web-testing.md

tile.json