0
# Cache and Messaging Containers
1
2
Containers for caching systems, message queues, and pub/sub services including Redis, Kafka, RabbitMQ, NATS, and other messaging brokers with integrated client support and service-specific configurations.
3
4
## Capabilities
5
6
### Redis Container
7
8
Redis in-memory data store container with authentication support, both synchronous and asynchronous client integration.
9
10
```python { .api }
11
class RedisContainer:
12
def __init__(
13
self,
14
image: str = "redis:latest",
15
port: int = 6379,
16
password: Optional[str] = None,
17
**kwargs: Any
18
):
19
"""
20
Initialize Redis container.
21
22
Args:
23
image: Redis Docker image
24
port: Redis port (default 6379)
25
password: Redis authentication password
26
**kwargs: Additional container options
27
"""
28
29
def get_client(self, **kwargs: Any):
30
"""
31
Get configured Redis client.
32
33
Args:
34
**kwargs: Additional redis-py client arguments
35
36
Returns:
37
Redis client instance
38
"""
39
40
class AsyncRedisContainer(RedisContainer):
41
def get_async_client(self, **kwargs: Any):
42
"""
43
Get configured async Redis client.
44
45
Args:
46
**kwargs: Additional redis-py async client arguments
47
48
Returns:
49
Async Redis client instance
50
"""
51
```
52
53
### Kafka Container
54
55
Apache Kafka distributed streaming platform container with KRaft mode support and bootstrap server configuration.
56
57
```python { .api }
58
class KafkaContainer:
59
def __init__(
60
self,
61
image: str = "confluentinc/cp-kafka:7.6.0",
62
port: int = 9093,
63
**kwargs: Any
64
):
65
"""
66
Initialize Kafka container.
67
68
Args:
69
image: Kafka Docker image
70
port: Kafka port (default 9093)
71
**kwargs: Additional container options
72
"""
73
74
def get_bootstrap_server(self) -> str:
75
"""
76
Get Kafka bootstrap server address.
77
78
Returns:
79
Bootstrap server address string (host:port)
80
"""
81
82
def with_kraft(self) -> "KafkaContainer":
83
"""
84
Enable KRaft mode (Kafka without Zookeeper).
85
86
Returns:
87
Self for method chaining
88
"""
89
90
def with_cluster_id(self, cluster_id: str) -> "KafkaContainer":
91
"""
92
Set Kafka cluster ID for KRaft mode.
93
94
Args:
95
cluster_id: Cluster identifier
96
97
Returns:
98
Self for method chaining
99
"""
100
```
101
102
### RabbitMQ Container
103
104
RabbitMQ message broker container with management interface and authentication configuration.
105
106
```python { .api }
107
class RabbitMqContainer:
108
def __init__(
109
self,
110
image: str = "rabbitmq:3-management",
111
port: int = 5672,
112
username: str = "guest",
113
password: str = "guest",
114
**kwargs: Any
115
):
116
"""
117
Initialize RabbitMQ container.
118
119
Args:
120
image: RabbitMQ Docker image
121
port: AMQP port (default 5672)
122
username: RabbitMQ username
123
password: RabbitMQ password
124
**kwargs: Additional container options
125
"""
126
127
def get_connection_url(self) -> str:
128
"""
129
Get RabbitMQ connection URL.
130
131
Returns:
132
AMQP connection URL string
133
"""
134
```
135
136
### NATS Container
137
138
NATS messaging system container for high-performance pub/sub and streaming communication.
139
140
```python { .api }
141
class NatsContainer:
142
def __init__(
143
self,
144
image: str = "nats:latest",
145
port: int = 4222,
146
**kwargs: Any
147
):
148
"""
149
Initialize NATS container.
150
151
Args:
152
image: NATS Docker image
153
port: NATS port (default 4222)
154
**kwargs: Additional container options
155
"""
156
157
def get_connection_url(self) -> str:
158
"""
159
Get NATS connection URL.
160
161
Returns:
162
NATS connection URL string
163
"""
164
```
165
166
### MQTT Container
167
168
MQTT message broker container for IoT and lightweight messaging scenarios.
169
170
```python { .api }
171
class MqttContainer:
172
def __init__(
173
self,
174
image: str = "eclipse-mosquitto:latest",
175
port: int = 1883,
176
**kwargs: Any
177
):
178
"""
179
Initialize MQTT broker container.
180
181
Args:
182
image: MQTT broker Docker image
183
port: MQTT port (default 1883)
184
**kwargs: Additional container options
185
"""
186
187
def get_connection_url(self) -> str:
188
"""
189
Get MQTT broker URL.
190
191
Returns:
192
MQTT broker URL string
193
"""
194
```
195
196
### Memcached Container
197
198
Memcached distributed memory caching system container for high-performance caching.
199
200
```python { .api }
201
class MemcachedContainer:
202
def __init__(
203
self,
204
image: str = "memcached:latest",
205
port: int = 11211,
206
**kwargs: Any
207
):
208
"""
209
Initialize Memcached container.
210
211
Args:
212
image: Memcached Docker image
213
port: Memcached port (default 11211)
214
**kwargs: Additional container options
215
"""
216
217
def get_connection_url(self) -> str:
218
"""
219
Get Memcached connection URL.
220
221
Returns:
222
Memcached connection URL string
223
"""
224
```
225
226
## Usage Examples
227
228
### Redis Caching
229
230
```python
231
from testcontainers.redis import RedisContainer
232
import redis
233
234
with RedisContainer("redis:6-alpine") as redis_container:
235
# Get Redis client
236
client = redis_container.get_client()
237
238
# Basic Redis operations
239
client.set("key1", "value1")
240
client.hset("user:1", "name", "John", "email", "john@example.com")
241
242
# Retrieve values
243
value = client.get("key1")
244
user_data = client.hgetall("user:1")
245
246
print(f"Cached value: {value.decode()}")
247
print(f"User data: {user_data}")
248
249
# List operations
250
client.lpush("tasks", "task1", "task2", "task3")
251
tasks = client.lrange("tasks", 0, -1)
252
print(f"Tasks: {[task.decode() for task in tasks]}")
253
```
254
255
### Async Redis Usage
256
257
```python
258
from testcontainers.redis import AsyncRedisContainer
259
import asyncio
260
261
async def async_redis_example():
262
with AsyncRedisContainer("redis:6") as redis_container:
263
# Get async Redis client
264
client = redis_container.get_async_client()
265
266
# Async Redis operations
267
await client.set("async_key", "async_value")
268
value = await client.get("async_key")
269
270
print(f"Async value: {value.decode()}")
271
272
# Close the client
273
await client.close()
274
275
# Run the async example
276
asyncio.run(async_redis_example())
277
```
278
279
### Kafka Messaging
280
281
```python
282
from testcontainers.kafka import KafkaContainer
283
from kafka import KafkaProducer, KafkaConsumer
284
import json
285
286
with KafkaContainer() as kafka:
287
bootstrap_server = kafka.get_bootstrap_server()
288
289
# Create producer
290
producer = KafkaProducer(
291
bootstrap_servers=[bootstrap_server],
292
value_serializer=lambda x: json.dumps(x).encode('utf-8')
293
)
294
295
# Send messages
296
for i in range(5):
297
message = {"id": i, "message": f"Hello Kafka {i}"}
298
producer.send("test-topic", message)
299
300
producer.flush()
301
producer.close()
302
303
# Create consumer
304
consumer = KafkaConsumer(
305
"test-topic",
306
bootstrap_servers=[bootstrap_server],
307
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
308
)
309
310
# Consume messages
311
for message in consumer:
312
print(f"Received: {message.value}")
313
if message.value["id"] >= 4: # Stop after receiving all messages
314
break
315
316
consumer.close()
317
```
318
319
### RabbitMQ Message Queue
320
321
```python
322
from testcontainers.rabbitmq import RabbitMqContainer
323
import pika
324
import json
325
326
with RabbitMqContainer() as rabbitmq:
327
connection_url = rabbitmq.get_connection_url()
328
329
# Connect to RabbitMQ
330
connection = pika.BlockingConnection(pika.URLParameters(connection_url))
331
channel = connection.channel()
332
333
# Declare queue
334
queue_name = "task_queue"
335
channel.queue_declare(queue=queue_name, durable=True)
336
337
# Publish messages
338
for i in range(3):
339
message = {"task_id": i, "data": f"Task {i} data"}
340
channel.basic_publish(
341
exchange="",
342
routing_key=queue_name,
343
body=json.dumps(message),
344
properties=pika.BasicProperties(delivery_mode=2) # Make message persistent
345
)
346
print(f"Sent task {i}")
347
348
# Consume messages
349
def callback(ch, method, properties, body):
350
message = json.loads(body)
351
print(f"Received task: {message}")
352
ch.basic_ack(delivery_tag=method.delivery_tag)
353
354
channel.basic_consume(queue=queue_name, on_message_callback=callback)
355
356
# Process a few messages
357
for _ in range(3):
358
channel.process_data_events(time_limit=1)
359
360
connection.close()
361
```
362
363
### Multi-Service Messaging Setup
364
365
```python
366
from testcontainers.redis import RedisContainer
367
from testcontainers.kafka import KafkaContainer
368
from testcontainers.rabbitmq import RabbitMqContainer
369
from testcontainers.core.network import Network
370
371
# Create shared network
372
with Network() as network:
373
# Start multiple messaging services
374
with RedisContainer("redis:6") as redis, \
375
KafkaContainer() as kafka, \
376
RabbitMqContainer() as rabbitmq:
377
378
# Connect to network
379
redis.with_network(network).with_network_aliases("redis")
380
kafka.with_network(network).with_network_aliases("kafka")
381
rabbitmq.with_network(network).with_network_aliases("rabbitmq")
382
383
# Get service endpoints
384
redis_client = redis.get_client()
385
kafka_bootstrap = kafka.get_bootstrap_server()
386
rabbitmq_url = rabbitmq.get_connection_url()
387
388
# Use services together in application architecture
389
print(f"Redis available: {redis_client.ping()}")
390
print(f"Kafka bootstrap: {kafka_bootstrap}")
391
print(f"RabbitMQ URL: {rabbitmq_url}")
392
```
393
394
### NATS Pub/Sub
395
396
```python
397
from testcontainers.nats import NatsContainer
398
import asyncio
399
import nats
400
401
async def nats_example():
402
with NatsContainer() as nats_container:
403
connection_url = nats_container.get_connection_url()
404
405
# Connect to NATS
406
nc = await nats.connect(connection_url)
407
408
# Subscribe to subject
409
async def message_handler(msg):
410
subject = msg.subject
411
data = msg.data.decode()
412
print(f"Received message on {subject}: {data}")
413
414
await nc.subscribe("updates", cb=message_handler)
415
416
# Publish messages
417
for i in range(3):
418
await nc.publish("updates", f"Update {i}".encode())
419
420
# Allow time for message processing
421
await asyncio.sleep(1)
422
423
await nc.close()
424
425
# Run the async example
426
asyncio.run(nats_example())
427
```
428
429
## Configuration Examples
430
431
### Redis with Custom Configuration
432
433
```python
434
from testcontainers.redis import RedisContainer
435
436
# Redis with password authentication
437
redis = RedisContainer("redis:6") \
438
.with_env("REDIS_PASSWORD", "mypassword") \
439
.with_command("redis-server --requirepass mypassword")
440
441
with redis:
442
client = redis.get_client(password="mypassword")
443
client.set("protected_key", "protected_value")
444
```
445
446
### Kafka with KRaft Mode
447
448
```python
449
from testcontainers.kafka import KafkaContainer
450
451
# Kafka without Zookeeper using KRaft
452
kafka = KafkaContainer("confluentinc/cp-kafka:7.6.0") \
453
.with_kraft() \
454
.with_cluster_id("test-cluster-id")
455
456
with kafka:
457
bootstrap_server = kafka.get_bootstrap_server()
458
print(f"KRaft Kafka available at: {bootstrap_server}")
459
```