# Legacy Kafka Container (Deprecated)

The `org.testcontainers.containers.KafkaContainer` class is the original Kafka container implementation. It supports Confluent Platform images and can run with embedded Zookeeper, external Zookeeper, or KRaft mode.

**⚠️ DEPRECATED**: Use `org.testcontainers.kafka.ConfluentKafkaContainer` or `org.testcontainers.kafka.KafkaContainer` instead.

## Constructors

```java { .api }
@Deprecated
public KafkaContainer();

@Deprecated
public KafkaContainer(String confluentPlatformVersion);

public KafkaContainer(DockerImageName dockerImageName);
```

### Parameters

- `confluentPlatformVersion` (String): Confluent Platform version, used as the tag of the `confluentinc/cp-kafka` image (the constructor taking this parameter is deprecated)
- `dockerImageName` (DockerImageName): Validated Docker image name object

### Supported Images

- `confluentinc/cp-kafka` - Confluent Platform Kafka (KRaft mode requires version 7.0.0 or later)

## Core Methods

### getBootstrapServers()

Returns the bootstrap servers connection string for Kafka clients.

```java { .api }
public String getBootstrapServers();
```

**Returns**: String in the format `"PLAINTEXT://host:port"` (e.g., `"PLAINTEXT://localhost:32768"`)

Note: The legacy container includes the protocol prefix in the bootstrap servers string.
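
Code that compares or logs broker addresses may want the plain `host:port` form that the modern containers return. The following is a minimal sketch of such a normalization; the `BootstrapServers` class and `stripProtocol` method are hypothetical helpers for illustration, not part of Testcontainers.

```java
// Hypothetical helper, not part of Testcontainers.
final class BootstrapServers {
    private BootstrapServers() {}

    /** Removes a leading "SCHEME://" from each comma-separated entry, if present. */
    static String stripProtocol(String bootstrapServers) {
        StringBuilder result = new StringBuilder();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int idx = trimmed.indexOf("://");
            // Keep the entry unchanged when it has no protocol prefix
            String hostPort = idx >= 0 ? trimmed.substring(idx + 3) : trimmed;
            if (result.length() > 0) {
                result.append(',');
            }
            result.append(hostPort);
        }
        return result.toString();
    }

    public static void main(String[] args) {
        // Prints "localhost:32768"
        System.out.println(stripProtocol("PLAINTEXT://localhost:32768"));
    }
}
```

Strings without a prefix pass through unchanged, so the same helper can be applied to values from either container generation.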

## Kafka Configuration Modes

### Embedded Zookeeper (Default)

```java { .api }
public KafkaContainer withEmbeddedZookeeper();
```

**Returns**: KafkaContainer instance for method chaining

Configures the container to run an embedded Zookeeper instance on port 2181. This is the default mode when neither external Zookeeper nor KRaft mode is configured.

### External Zookeeper

```java { .api }
public KafkaContainer withExternalZookeeper(String connectString);
```

**Parameters**:
- `connectString` (String): Zookeeper connection string (e.g., "zookeeper:2181")

**Returns**: KafkaContainer instance for method chaining

**Throws**: IllegalStateException if KRaft mode is already enabled

Configures the container to connect to an external Zookeeper instance.

### KRaft Mode

```java { .api }
public KafkaContainer withKraft();
```

**Returns**: KafkaContainer instance for method chaining

**Throws**:
- IllegalStateException if external Zookeeper is configured
- IllegalArgumentException if Confluent Platform version is below 7.0.0

Enables KRaft mode (Kafka without Zookeeper) for Confluent Platform 7.0.0 and later.
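
To fail fast before constructing a container, a test suite could check the image tag against the 7.0.0 minimum itself. The sketch below is an illustration only: `KraftVersionCheck` and its methods are hypothetical names, and the library's own version comparison may work differently.

```java
// Hypothetical pre-check, not the library's implementation.
final class KraftVersionCheck {
    private static final int[] MIN_KRAFT_VERSION = {7, 0, 0};

    private KraftVersionCheck() {}

    /** Parses up to three numeric components from a tag such as "7.0.1". */
    static int[] parse(String tag) {
        String[] parts = tag.split("[.-]");
        int[] version = new int[3];
        for (int i = 0; i < 3 && i < parts.length; i++) {
            // Throws NumberFormatException for non-numeric tags such as "latest"
            version[i] = Integer.parseInt(parts[i]);
        }
        return version;
    }

    /** Returns true when the tag is at least 7.0.0. */
    static boolean supportsKraft(String tag) {
        int[] version = parse(tag);
        for (int i = 0; i < 3; i++) {
            if (version[i] != MIN_KRAFT_VERSION[i]) {
                return version[i] > MIN_KRAFT_VERSION[i];
            }
        }
        return true; // exactly 7.0.0
    }

    public static void main(String[] args) {
        System.out.println(supportsKraft("7.0.1")); // true
        System.out.println(supportsKraft("6.2.1")); // false
    }
}
```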

### Cluster ID Configuration

```java { .api }
public KafkaContainer withClusterId(String clusterId);
```

**Parameters**:
- `clusterId` (String): Custom cluster ID for KRaft mode

**Returns**: KafkaContainer instance for method chaining

**Throws**: NullPointerException if clusterId is null

Sets a custom cluster ID for KRaft mode. If not specified, the container uses `DEFAULT_CLUSTER_ID`.

## Listener Configuration

### withListener(Supplier<String> listenerSupplier)

Adds a dynamic listener using a supplier function.

```java { .api }
public KafkaContainer withListener(Supplier<String> listenerSupplier);
```

**Parameters**:
- `listenerSupplier` (Supplier<String>): Supplier that provides a listener in the format `"host:port"`

**Returns**: KafkaContainer instance for method chaining

The supplier allows dynamic listener configuration: its value is resolved at container start time rather than when `withListener` is called.
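
The deferred evaluation can be illustrated with plain Java, without starting a container. In this sketch the `DeferredListenerDemo` class, the `listenerFor` method, and the `kafka:19092` value are all made up for illustration; the point is only that a `Supplier<String>` reads its inputs when `get()` is called.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical demo of deferred evaluation, not Testcontainers code.
final class DeferredListenerDemo {
    private DeferredListenerDemo() {}

    /** Builds a supplier that reads the host only when get() is called. */
    static Supplier<String> listenerFor(AtomicReference<String> host, int port) {
        return () -> host.get() + ":" + port;
    }

    public static void main(String[] args) {
        AtomicReference<String> host = new AtomicReference<>("unset");
        Supplier<String> listener = listenerFor(host, 19092);

        // The host becomes known after the supplier was created
        host.set("kafka");

        // Prints "kafka:19092" because the value is read at call time
        System.out.println(listener.get());
    }
}
```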

## Usage Examples

### Basic Usage with Embedded Zookeeper

```java
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))) {
    kafka.start();

    String bootstrapServers = kafka.getBootstrapServers();
    // e.g. "PLAINTEXT://localhost:32768" (the mapped port varies per run)
}
```

### KRaft Mode Usage

```java
try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1"))
        .withKraft()) {
    kafka.start();

    String bootstrapServers = kafka.getBootstrapServers();
}
```

### External Zookeeper Usage

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

// Both containers share a network so Kafka can reach "zookeeper:2181"
Network network = Network.newNetwork();

try (GenericContainer<?> zookeeper = new GenericContainer<>("confluentinc/cp-zookeeper:4.0.0")
        .withNetwork(network)
        .withNetworkAliases("zookeeper")
        .withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
     KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
        .withExternalZookeeper("zookeeper:2181")
        .withNetwork(network)) {

    // Start Zookeeper first so the broker can connect on startup
    zookeeper.start();
    kafka.start();
}
```

### Dynamic Listener with Supplier

```java
Network network = Network.newNetwork();

try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
        .withNetwork(network)
        // Port 19092 avoids a clash with the internal BROKER listener on 9092
        .withListener(() -> "kafka-broker:19092")) {
    kafka.start();

    // The supplier is evaluated when the container starts.
    // getBootstrapServers() still returns the host-mapped PLAINTEXT address.
    String bootstrapServers = kafka.getBootstrapServers();
}
```

## Constants

```java { .api }
public static final int KAFKA_PORT = 9093;
public static final int ZOOKEEPER_PORT = 2181;
public static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";
```

**Note**: The legacy container uses port 9093 for Kafka, unlike the modern containers, which use 9092.

## Default Configuration

The legacy container configures:

- **Default Mode**: Embedded Zookeeper
- **Kafka Port**: 9093 (exposed)
- **Zookeeper Port**: 2181 (exposed when using embedded Zookeeper)
- **Default Listeners**:
  - `PLAINTEXT://0.0.0.0:9093` - External connections
  - `BROKER://0.0.0.0:9092` - Inter-broker communication
- **Default Advertised Listeners**:
  - `PLAINTEXT://host:mappedPort` - For external connections
  - `BROKER://containerHostname:9092` - For inter-broker communication
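
The listener values above can be expressed as simple string templates. The sketch below only reproduces the documented formats; `LegacyListeners`, `BROKER_PORT`, and both method names are hypothetical, and the container's internal code may assemble these values differently.

```java
// Hypothetical illustration of the documented listener formats.
final class LegacyListeners {
    static final int KAFKA_PORT = 9093;  // matches KafkaContainer.KAFKA_PORT
    static final int BROKER_PORT = 9092; // assumed name for the inter-broker port

    private LegacyListeners() {}

    /** Listeners the broker binds to inside the container. */
    static String listeners() {
        return "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:" + BROKER_PORT;
    }

    /** Addresses advertised to clients (host side) and to other brokers (container side). */
    static String advertisedListeners(String host, int mappedPort, String containerHostname) {
        return "PLAINTEXT://" + host + ":" + mappedPort
            + ",BROKER://" + containerHostname + ":" + BROKER_PORT;
    }

    public static void main(String[] args) {
        System.out.println(listeners());
        System.out.println(advertisedListeners("localhost", 32768, "abc123"));
    }
}
```

The external listener advertises the mapped host port because clients outside Docker cannot reach the container's port 9093 directly.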

## Migration Guide

To migrate from the legacy container, switch the import and the container class:

```java
// Old (deprecated)
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

// New (recommended)
import org.testcontainers.kafka.ConfluentKafkaContainer;
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0");
```

Key differences:
- Modern containers use port 9092 instead of 9093
- Modern containers enable KRaft mode by default
- Modern containers do not include the protocol prefix in the bootstrap servers string
- Modern containers have simpler listener configuration