0
# Confluent Kafka Container
1
2
The `org.testcontainers.kafka.ConfluentKafkaContainer` provides support for Confluent Platform images (`confluentinc/cp-kafka`) version 7.4.0 and later with KRaft mode enabled by default. This is the recommended approach for testing with Confluent Platform.
3
4
## Constructors
5
6
```java { .api }
7
public ConfluentKafkaContainer(String imageName);
8
public ConfluentKafkaContainer(DockerImageName dockerImageName);
9
```
10
11
### Parameters
12
13
- `imageName` (String): Docker image name as string (e.g., "confluentinc/cp-kafka:7.4.0")
14
- `dockerImageName` (DockerImageName): Validated Docker image name object
15
16
### Supported Images
17
18
- `confluentinc/cp-kafka` - Confluent Platform Kafka (version 7.4.0+)
19
20
## Core Methods
21
22
### getBootstrapServers()
23
24
Returns the bootstrap servers connection string for Kafka clients.
25
26
```java { .api }
27
public String getBootstrapServers();
28
```
29
30
**Returns**: String in format `"host:port"` (e.g., "localhost:32768")
31
32
### Usage Example
33
34
```java
35
try (ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")) {
36
kafka.start();
37
38
String bootstrapServers = kafka.getBootstrapServers();
39
// Use with Kafka clients:
40
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
41
}
42
```
43
44
## Listener Configuration
45
46
### withListener(String listener)
47
48
Adds a custom listener for connections within the same container network.
49
50
```java { .api }
51
public ConfluentKafkaContainer withListener(String listener);
52
```
53
54
**Parameters**:
55
- `listener` (String): Listener in format `"host:port"` (e.g., "kafka:19092")
56
57
**Returns**: ConfluentKafkaContainer instance for method chaining
58
59
The host will be added as a network alias, allowing other containers in the same network to connect using this listener.
60
61
### withListener(String listener, Supplier<String> advertisedListener)
62
63
Adds a custom listener with separate advertised listener for external connections.
64
65
```java { .api }
66
public ConfluentKafkaContainer withListener(String listener, Supplier<String> advertisedListener);
67
```
68
69
**Parameters**:
70
- `listener` (String): Internal listener in format `"host:port"`
71
- `advertisedListener` (Supplier<String>): Supplier providing external advertised listener
72
73
**Returns**: ConfluentKafkaContainer instance for method chaining
74
75
### Network Usage Example
76
77
```java
78
Network network = Network.newNetwork();
79
80
try (ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
81
.withListener("kafka:19092") // Internal network listener
82
.withNetwork(network);
83
// Other containers in same network can connect to "kafka:19092"
84
KCatContainer kcat = new KCatContainer()
85
.withNetwork(network)) {
86
87
kafka.start();
88
kcat.start();
89
90
// Send message via internal listener
91
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
92
93
// Consume message via internal listener
94
String output = kcat.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
95
.getStdout();
96
}
97
```
98
99
### External Proxy Example
100
101
```java
102
Network network = Network.newNetwork();
103
104
try (SocatContainer proxy = new SocatContainer()
105
.withNetwork(network)
106
.withTarget(2000, "kafka", 19092);
107
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
108
.withListener("kafka:19092", () -> proxy.getHost() + ":" + proxy.getMappedPort(2000))
109
.withNetwork(network)) {
110
111
proxy.start();
112
kafka.start();
113
114
// External clients can connect via proxy
115
String externalBootstrap = proxy.getHost() + ":" + proxy.getMappedPort(2000);
116
}
117
```
118
119
## Default Configuration
120
121
The Confluent Kafka container automatically configures:
122
123
- **KRaft Mode**: Enabled by default (no Zookeeper required)
124
- **Default Listeners**:
125
- `PLAINTEXT://0.0.0.0:9092` - External connections
126
- `BROKER://0.0.0.0:9093` - Inter-broker communication
127
- `CONTROLLER://0.0.0.0:9094` - KRaft controller
128
- **Default Advertised Listeners**:
129
- `PLAINTEXT://host:mappedPort` - For external connections
130
- `BROKER://containerHostname:9093` - For inter-broker communication
131
- **Single Node Setup**: Configured for single-node testing scenarios
132
- **Confluent Platform Features**: Access to Confluent Platform specific features and tools
133
134
## Constants
135
136
The container uses shared constants from `KafkaHelper`:
137
138
```java { .api }
139
static final int KAFKA_PORT = 9092;
140
static final String STARTER_SCRIPT = "/tmp/testcontainers_start.sh";
141
```
142
143
The container exposes port 9092 and automatically maps it to a random host port accessible via `getMappedPort(9092)` or `getBootstrapServers()`.
144
145
## Confluent Platform Integration
146
147
This container runs the Confluent Platform startup script at `/etc/confluent/docker/run` and includes Confluent-specific environment variable configurations. It provides the same core Kafka functionality as the Apache container but with Confluent Platform optimizations and compatibility.