or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cache-messaging.mdcloud-services.mdcompose.mdcore-containers.mddatabase-containers.mdindex.mdsearch-analytics.mdwaiting-strategies.mdweb-testing.md

cache-messaging.mddocs/

0

# Cache and Messaging Containers

1

2

Containers for caching systems, message queues, and pub/sub services including Redis, Kafka, RabbitMQ, NATS, and other messaging brokers with integrated client support and service-specific configurations.

3

4

## Capabilities

5

6

### Redis Container

7

8

Redis in-memory data store container with authentication support, both synchronous and asynchronous client integration.

9

10

```python { .api }

11

class RedisContainer:

12

def __init__(

13

self,

14

image: str = "redis:latest",

15

port: int = 6379,

16

password: Optional[str] = None,

17

**kwargs: Any

18

):

19

"""

20

Initialize Redis container.

21

22

Args:

23

image: Redis Docker image

24

port: Redis port (default 6379)

25

password: Redis authentication password

26

**kwargs: Additional container options

27

"""

28

29

def get_client(self, **kwargs: Any):

30

"""

31

Get configured Redis client.

32

33

Args:

34

**kwargs: Additional redis-py client arguments

35

36

Returns:

37

Redis client instance

38

"""

39

40

class AsyncRedisContainer(RedisContainer):

41

def get_async_client(self, **kwargs: Any):

42

"""

43

Get configured async Redis client.

44

45

Args:

46

**kwargs: Additional redis-py async client arguments

47

48

Returns:

49

Async Redis client instance

50

"""

51

```

52

53

### Kafka Container

54

55

Apache Kafka distributed streaming platform container with KRaft mode support and bootstrap server configuration.

56

57

```python { .api }

58

class KafkaContainer:

59

def __init__(

60

self,

61

image: str = "confluentinc/cp-kafka:7.6.0",

62

port: int = 9093,

63

**kwargs: Any

64

):

65

"""

66

Initialize Kafka container.

67

68

Args:

69

image: Kafka Docker image

70

port: Kafka port (default 9093)

71

**kwargs: Additional container options

72

"""

73

74

def get_bootstrap_server(self) -> str:

75

"""

76

Get Kafka bootstrap server address.

77

78

Returns:

79

Bootstrap server address string (host:port)

80

"""

81

82

def with_kraft(self) -> "KafkaContainer":

83

"""

84

Enable KRaft mode (Kafka without Zookeeper).

85

86

Returns:

87

Self for method chaining

88

"""

89

90

def with_cluster_id(self, cluster_id: str) -> "KafkaContainer":

91

"""

92

Set Kafka cluster ID for KRaft mode.

93

94

Args:

95

cluster_id: Cluster identifier

96

97

Returns:

98

Self for method chaining

99

"""

100

```

101

102

### RabbitMQ Container

103

104

RabbitMQ message broker container with management interface and authentication configuration.

105

106

```python { .api }

107

class RabbitMqContainer:

108

def __init__(

109

self,

110

image: str = "rabbitmq:3-management",

111

port: int = 5672,

112

username: str = "guest",

113

password: str = "guest",

114

**kwargs: Any

115

):

116

"""

117

Initialize RabbitMQ container.

118

119

Args:

120

image: RabbitMQ Docker image

121

port: AMQP port (default 5672)

122

username: RabbitMQ username

123

password: RabbitMQ password

124

**kwargs: Additional container options

125

"""

126

127

def get_connection_url(self) -> str:

128

"""

129

Get RabbitMQ connection URL.

130

131

Returns:

132

AMQP connection URL string

133

"""

134

```

135

136

### NATS Container

137

138

NATS messaging system container for high-performance pub/sub and streaming communication.

139

140

```python { .api }

141

class NatsContainer:

142

def __init__(

143

self,

144

image: str = "nats:latest",

145

port: int = 4222,

146

**kwargs: Any

147

):

148

"""

149

Initialize NATS container.

150

151

Args:

152

image: NATS Docker image

153

port: NATS port (default 4222)

154

**kwargs: Additional container options

155

"""

156

157

def get_connection_url(self) -> str:

158

"""

159

Get NATS connection URL.

160

161

Returns:

162

NATS connection URL string

163

"""

164

```

165

166

### MQTT Container

167

168

MQTT message broker container for IoT and lightweight messaging scenarios.

169

170

```python { .api }

171

class MqttContainer:

172

def __init__(

173

self,

174

image: str = "eclipse-mosquitto:latest",

175

port: int = 1883,

176

**kwargs: Any

177

):

178

"""

179

Initialize MQTT broker container.

180

181

Args:

182

image: MQTT broker Docker image

183

port: MQTT port (default 1883)

184

**kwargs: Additional container options

185

"""

186

187

def get_connection_url(self) -> str:

188

"""

189

Get MQTT broker URL.

190

191

Returns:

192

MQTT broker URL string

193

"""

194

```

195

196

### Memcached Container

197

198

Memcached distributed memory caching system container for high-performance caching.

199

200

```python { .api }

201

class MemcachedContainer:

202

def __init__(

203

self,

204

image: str = "memcached:latest",

205

port: int = 11211,

206

**kwargs: Any

207

):

208

"""

209

Initialize Memcached container.

210

211

Args:

212

image: Memcached Docker image

213

port: Memcached port (default 11211)

214

**kwargs: Additional container options

215

"""

216

217

def get_connection_url(self) -> str:

218

"""

219

Get Memcached connection URL.

220

221

Returns:

222

Memcached connection URL string

223

"""

224

```

225

226

## Usage Examples

227

228

### Redis Caching

229

230

```python

231

from testcontainers.redis import RedisContainer

232

import redis

233

234

with RedisContainer("redis:6-alpine") as redis_container:

235

# Get Redis client

236

client = redis_container.get_client()

237

238

# Basic Redis operations

239

client.set("key1", "value1")

240

client.hset("user:1", "name", "John", "email", "john@example.com")

241

242

# Retrieve values

243

value = client.get("key1")

244

user_data = client.hgetall("user:1")

245

246

print(f"Cached value: {value.decode()}")

247

print(f"User data: {user_data}")

248

249

# List operations

250

client.lpush("tasks", "task1", "task2", "task3")

251

tasks = client.lrange("tasks", 0, -1)

252

print(f"Tasks: {[task.decode() for task in tasks]}")

253

```

254

255

### Async Redis Usage

256

257

```python

258

from testcontainers.redis import AsyncRedisContainer

259

import asyncio

260

261

async def async_redis_example():

262

with AsyncRedisContainer("redis:6") as redis_container:

263

# Get async Redis client

264

client = redis_container.get_async_client()

265

266

# Async Redis operations

267

await client.set("async_key", "async_value")

268

value = await client.get("async_key")

269

270

print(f"Async value: {value.decode()}")

271

272

# Close the client

273

await client.close()

274

275

# Run the async example

276

asyncio.run(async_redis_example())

277

```

278

279

### Kafka Messaging

280

281

```python

282

from testcontainers.kafka import KafkaContainer

283

from kafka import KafkaProducer, KafkaConsumer

284

import json

285

286

with KafkaContainer() as kafka:

287

bootstrap_server = kafka.get_bootstrap_server()

288

289

# Create producer

290

producer = KafkaProducer(

291

bootstrap_servers=[bootstrap_server],

292

value_serializer=lambda x: json.dumps(x).encode('utf-8')

293

)

294

295

# Send messages

296

for i in range(5):

297

message = {"id": i, "message": f"Hello Kafka {i}"}

298

producer.send("test-topic", message)

299

300

producer.flush()

301

producer.close()

302

303

# Create consumer

304

consumer = KafkaConsumer(

305

"test-topic",

306

bootstrap_servers=[bootstrap_server],

307

value_deserializer=lambda m: json.loads(m.decode('utf-8'))

308

)

309

310

# Consume messages

311

for message in consumer:

312

print(f"Received: {message.value}")

313

if message.value["id"] >= 4: # Stop after receiving all messages

314

break

315

316

consumer.close()

317

```

318

319

### RabbitMQ Message Queue

320

321

```python

322

from testcontainers.rabbitmq import RabbitMqContainer

323

import pika

324

import json

325

326

with RabbitMqContainer() as rabbitmq:

327

connection_url = rabbitmq.get_connection_url()

328

329

# Connect to RabbitMQ

330

connection = pika.BlockingConnection(pika.URLParameters(connection_url))

331

channel = connection.channel()

332

333

# Declare queue

334

queue_name = "task_queue"

335

channel.queue_declare(queue=queue_name, durable=True)

336

337

# Publish messages

338

for i in range(3):

339

message = {"task_id": i, "data": f"Task {i} data"}

340

channel.basic_publish(

341

exchange="",

342

routing_key=queue_name,

343

body=json.dumps(message),

344

properties=pika.BasicProperties(delivery_mode=2) # Make message persistent

345

)

346

print(f"Sent task {i}")

347

348

# Consume messages

349

def callback(ch, method, properties, body):

350

message = json.loads(body)

351

print(f"Received task: {message}")

352

ch.basic_ack(delivery_tag=method.delivery_tag)

353

354

channel.basic_consume(queue=queue_name, on_message_callback=callback)

355

356

# Process a few messages

357

for _ in range(3):

358

channel.process_data_events(time_limit=1)

359

360

connection.close()

361

```

362

363

### Multi-Service Messaging Setup

364

365

```python

366

from testcontainers.redis import RedisContainer

367

from testcontainers.kafka import KafkaContainer

368

from testcontainers.rabbitmq import RabbitMqContainer

369

from testcontainers.core.network import Network

370

371

# Create shared network

372

with Network() as network:

373

# Start multiple messaging services

374

with RedisContainer("redis:6") as redis, \

375

KafkaContainer() as kafka, \

376

RabbitMqContainer() as rabbitmq:

377

378

# Connect to network

379

redis.with_network(network).with_network_aliases("redis")

380

kafka.with_network(network).with_network_aliases("kafka")

381

rabbitmq.with_network(network).with_network_aliases("rabbitmq")

382

383

# Get service endpoints

384

redis_client = redis.get_client()

385

kafka_bootstrap = kafka.get_bootstrap_server()

386

rabbitmq_url = rabbitmq.get_connection_url()

387

388

# Use services together in application architecture

389

print(f"Redis available: {redis_client.ping()}")

390

print(f"Kafka bootstrap: {kafka_bootstrap}")

391

print(f"RabbitMQ URL: {rabbitmq_url}")

392

```

393

394

### NATS Pub/Sub

395

396

```python

397

from testcontainers.nats import NatsContainer

398

import asyncio

399

import nats

400

401

async def nats_example():

402

with NatsContainer() as nats_container:

403

connection_url = nats_container.get_connection_url()

404

405

# Connect to NATS

406

nc = await nats.connect(connection_url)

407

408

# Subscribe to subject

409

async def message_handler(msg):

410

subject = msg.subject

411

data = msg.data.decode()

412

print(f"Received message on {subject}: {data}")

413

414

await nc.subscribe("updates", cb=message_handler)

415

416

# Publish messages

417

for i in range(3):

418

await nc.publish("updates", f"Update {i}".encode())

419

420

# Allow time for message processing

421

await asyncio.sleep(1)

422

423

await nc.close()

424

425

# Run the async example

426

asyncio.run(nats_example())

427

```

428

429

## Configuration Examples

430

431

### Redis with Custom Configuration

432

433

```python

434

from testcontainers.redis import RedisContainer

435

436

# Redis with password authentication

437

redis = RedisContainer("redis:6") \

438

.with_env("REDIS_PASSWORD", "mypassword") \

439

.with_command("redis-server --requirepass mypassword")

440

441

with redis:

442

client = redis.get_client(password="mypassword")

443

client.set("protected_key", "protected_value")

444

```

445

446

### Kafka with KRaft Mode

447

448

```python

449

from testcontainers.kafka import KafkaContainer

450

451

# Kafka without Zookeeper using KRaft

452

kafka = KafkaContainer("confluentinc/cp-kafka:7.6.0") \

453

.with_kraft() \

454

.with_cluster_id("test-cluster-id")

455

456

with kafka:

457

bootstrap_server = kafka.get_bootstrap_server()

458

print(f"KRaft Kafka available at: {bootstrap_server}")

459

```