0
# Embedded Kafka Brokers
1
2
Core embedded Kafka broker functionality providing both ZooKeeper-based and KRaft-based implementations for running Kafka instances in test environments without external dependencies.
3
4
## Capabilities
5
6
### EmbeddedKafkaBroker Interface
7
8
Core interface defining embedded Kafka broker functionality with lifecycle management and topic operations.
9
10
```java { .api }
11
/**
12
* Core interface for embedded Kafka broker functionality
13
*/
14
public interface EmbeddedKafkaBroker extends InitializingBean, DisposableBean {
15
int DEFAULT_ADMIN_TIMEOUT = 10;
16
String BEAN_NAME = "embeddedKafka";
17
String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";
18
String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";
19
String LOOPBACK = "127.0.0.1";
20
21
/**
22
* Set explicit ports on which the kafka brokers will listen
23
* @param ports the ports
24
* @return the EmbeddedKafkaBroker
25
*/
26
EmbeddedKafkaBroker kafkaPorts(int... ports);
27
28
/**
29
* Specify properties to configure Kafka Broker before start
30
* @param properties the properties to use for configuring Kafka Broker(s)
31
* @return this for chaining configuration
32
*/
33
EmbeddedKafkaBroker brokerProperties(Map<String, String> properties);
34
35
/**
36
* Set the system property with this name to the list of broker addresses
37
* @param brokerListProperty the brokerListProperty to set
38
* @return this broker
39
*/
40
EmbeddedKafkaBroker brokerListProperty(String brokerListProperty);
41
42
/**
43
* Set the timeout in seconds for admin operations
44
* @param adminTimeout the timeout
45
* @return the EmbeddedKafkaBroker
46
*/
47
EmbeddedKafkaBroker adminTimeout(int adminTimeout);
48
49
/**
50
* Get the bootstrap server addresses as a String
51
* @return the bootstrap servers
52
*/
53
String getBrokersAsString();
54
55
/**
56
* Add topics to the existing broker(s) using the configured number of partitions
57
* @param topicsToAdd the topics
58
*/
59
void addTopics(String... topicsToAdd);
60
61
/**
62
* Add topics to the existing broker(s)
63
* @param topicsToAdd the topics
64
*/
65
void addTopics(NewTopic... topicsToAdd);
66
67
/**
68
* Add topics to the existing broker(s) and return results
69
* @param topicsToAdd the topics
70
* @return the results; null values indicate success
71
*/
72
Map<String, Exception> addTopicsWithResults(String... topicsToAdd);
73
74
/**
75
* Add topics to the existing broker(s) and return results
76
* @param topicsToAdd the topics
77
* @return the results; null values indicate success
78
*/
79
Map<String, Exception> addTopicsWithResults(NewTopic... topicsToAdd);
80
81
/**
82
* Subscribe a consumer to one or more of the embedded topics
83
* @param consumer the consumer
84
* @param seekToEnd true to seek to the end instead of the beginning
85
* @param topicsToConsume the topics
86
*/
87
void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume);
88
89
/**
90
* Subscribe a consumer to one or more of the embedded topics
91
* @param consumer the consumer
92
* @param topicsToConsume the topics
93
*/
94
void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume);
95
96
/**
97
* Subscribe a consumer to one of the embedded topics
98
* @param consumer the consumer
99
* @param seekToEnd true to seek to the end instead of the beginning
100
* @param topic the topic
101
*/
102
void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic);
103
104
/**
105
* Subscribe a consumer to one of the embedded topics
106
* @param consumer the consumer
107
* @param topic the topic
108
*/
109
void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic);
110
111
/**
112
* Subscribe a consumer to all the embedded topics
113
* @param consumer the consumer
114
* @param seekToEnd true to seek to the end instead of the beginning
115
*/
116
void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd);
117
118
/**
119
* Subscribe a consumer to all the embedded topics
120
* @param consumer the consumer
121
*/
122
void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer);
123
124
/**
125
* Get the topics
126
* @return the topics
127
*/
128
Set<String> getTopics();
129
130
/**
131
* Get the configured number of partitions per topic
132
* @return the partition count
133
*/
134
int getPartitionsPerTopic();
135
}
136
```
137
138
### ZooKeeper-based Embedded Broker
139
140
Implementation using ZooKeeper for coordination, providing full Kafka functionality with traditional ZooKeeper-based coordination.
141
142
```java { .api }
143
/**
144
* ZooKeeper-based embedded Kafka broker implementation
145
*/
146
public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {
147
public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect";
148
public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;
149
public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = DEFAULT_ZK_SESSION_TIMEOUT;
150
151
/**
152
* Create embedded Kafka brokers
153
* @param count the number of brokers
154
*/
155
public EmbeddedKafkaZKBroker(int count);
156
157
/**
158
* Create embedded Kafka brokers
159
* @param count the number of brokers
160
* @param controlledShutdown passed into TestUtils.createBrokerConfig
161
* @param topics the topics to create (2 partitions per)
162
*/
163
public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... topics);
164
165
/**
166
* Create embedded Kafka brokers listening on random ports
167
* @param count the number of brokers
168
* @param controlledShutdown passed into TestUtils.createBrokerConfig
169
* @param partitions partitions per topic
170
* @param topics the topics to create
171
*/
172
public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String... topics);
173
174
/**
175
* Specify a broker property
176
* @param property the property name
177
* @param value the value
178
* @return the EmbeddedKafkaBroker
179
*/
180
public EmbeddedKafkaBroker brokerProperty(String property, Object value);
181
182
/**
183
* Set an explicit port for the embedded Zookeeper
184
* @param port the port
185
* @return the EmbeddedKafkaBroker
186
*/
187
public EmbeddedKafkaZKBroker zkPort(int port);
188
189
/**
190
* Get the port that the embedded Zookeeper is running on or will run on
191
* @return the port
192
*/
193
public int getZkPort();
194
195
/**
196
* Set connection timeout for the client to the embedded Zookeeper
197
* @param zkConnectionTimeout the connection timeout
198
* @return the EmbeddedKafkaBroker
199
*/
200
public synchronized EmbeddedKafkaZKBroker zkConnectionTimeout(int zkConnectionTimeout);
201
202
/**
203
* Set session timeout for the client to the embedded Zookeeper
204
* @param zkSessionTimeout the session timeout
205
* @return the EmbeddedKafkaBroker
206
*/
207
public synchronized EmbeddedKafkaZKBroker zkSessionTimeout(int zkSessionTimeout);
208
209
/**
210
* Create an AdminClient; invoke the callback and reliably close the admin
211
* @param callback the callback
212
*/
213
public void doWithAdmin(Consumer<AdminClient> callback);
214
215
/**
216
* Create an AdminClient; invoke the callback and reliably close the admin
217
* @param callback the callback
218
* @param <T> the function return type
219
* @return a map of results
220
*/
221
public <T> T doWithAdminFunction(Function<AdminClient, T> callback);
222
223
/**
224
* Get the underlying Kafka servers
225
* @return the Kafka servers
226
*/
227
public List<KafkaServer> getKafkaServers();
228
229
/**
230
* Get specific Kafka server
231
* @param id the server id
232
* @return the Kafka server
233
*/
234
public KafkaServer getKafkaServer(int id);
235
236
/**
237
* Get embedded ZooKeeper
238
* @return the ZooKeeper instance
239
*/
240
public EmbeddedZookeeper getZookeeper();
241
242
/**
243
* Return the ZooKeeperClient
244
* @return the client
245
*/
246
public synchronized ZooKeeperClient getZooKeeperClient();
247
248
/**
249
* Get ZooKeeper connection string
250
* @return the connection string
251
*/
252
public String getZookeeperConnectionString();
253
254
/**
255
* Get broker address by index
256
* @param i the index
257
* @return the broker address
258
*/
259
public BrokerAddress getBrokerAddress(int i);
260
261
/**
262
* Get all broker addresses
263
* @return array of broker addresses
264
*/
265
public BrokerAddress[] getBrokerAddresses();
266
267
/**
268
* Restart a broker
269
* @param brokerAddress the broker to restart
270
*/
271
public void bounce(BrokerAddress brokerAddress);
272
273
/**
274
* Restart broker by index
275
* @param index the broker index
276
* @throws Exception if restart fails
277
*/
278
public void restart(int index) throws Exception;
279
}
280
```
281
282
### KRaft-based Embedded Broker
283
284
Implementation using KRaft (Kafka Raft) for coordination, providing ZooKeeper-free Kafka functionality.
285
286
```java { .api }
287
/**
288
* KRaft-based embedded Kafka broker implementation (ZooKeeper-free)
289
*/
290
public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {
291
public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";
292
public static final int DEFAULT_ADMIN_TIMEOUT = 10;
293
294
/**
295
* Create embedded Kafka brokers listening on random ports
296
* @param count the number of brokers
297
* @param partitions partitions per topic
298
* @param topics the topics to create
299
*/
300
public EmbeddedKafkaKraftBroker(int count, int partitions, String... topics);
301
302
/**
303
* Specify a broker property
304
* @param property the property name
305
* @param value the value
306
* @return the EmbeddedKafkaBroker
307
*/
308
public EmbeddedKafkaBroker brokerProperty(String property, Object value);
309
310
/**
311
* Set the timeout in seconds for admin operations
312
* @param adminTimeout the timeout
313
*/
314
public void setAdminTimeout(int adminTimeout);
315
316
/**
317
* Create an AdminClient; invoke the callback and reliably close the admin
318
* @param callback the callback
319
*/
320
public void doWithAdmin(Consumer<AdminClient> callback);
321
322
/**
323
* Create an AdminClient; invoke the callback and reliably close the admin
324
* @param callback the callback
325
* @param <T> the function return type
326
* @return a map of results
327
*/
328
public <T> T doWithAdminFunction(Function<AdminClient, T> callback);
329
330
/**
331
* Get underlying test cluster
332
* @return the cluster
333
*/
334
public KafkaClusterTestKit getCluster();
335
}
336
```
337
338
### Broker Factory
339
340
Factory for creating embedded Kafka brokers from annotation configuration.
341
342
```java { .api }
343
/**
344
* Factory to encapsulate EmbeddedKafkaBroker creation logic
345
*/
346
public final class EmbeddedKafkaBrokerFactory {
347
/**
348
* Create an EmbeddedKafkaBroker based on the EmbeddedKafka annotation
349
* @param embeddedKafka the EmbeddedKafka annotation
350
* @return a new EmbeddedKafkaBroker instance
351
*/
352
public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka);
353
354
/**
355
* Create an EmbeddedKafkaBroker based on the EmbeddedKafka annotation
356
* @param embeddedKafka the EmbeddedKafka annotation
357
* @param propertyResolver the Function for placeholders in the annotation attributes
358
* @return a new EmbeddedKafkaBroker instance
359
*/
360
public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka, Function<String, String> propertyResolver);
361
}
362
```
363
364
**Usage Examples:**
365
366
```java
367
// ZooKeeper-based broker
368
EmbeddedKafkaBroker zkBroker = new EmbeddedKafkaZKBroker(1, false, 2, "test-topic")
369
.brokerProperty("auto.create.topics.enable", "true")
370
.kafkaPorts(9092)
371
.zkPort(2181);
372
zkBroker.afterPropertiesSet();
373
374
// KRaft-based broker
375
EmbeddedKafkaBroker kraftBroker = new EmbeddedKafkaKraftBroker(1, 2, "test-topic")
376
.brokerProperty("auto.create.topics.enable", "true")
377
.adminTimeout(30);
378
kraftBroker.afterPropertiesSet();
379
380
// Using factory
381
@EmbeddedKafka(kraft = true, topics = "test-topic")
382
EmbeddedKafkaBroker broker = EmbeddedKafkaBrokerFactory.create(embeddedKafkaAnnotation);
383
384
// Topic management
385
broker.addTopics("new-topic");
386
broker.addTopics(new NewTopic("configured-topic", 3, (short) 1));
387
388
// Consumer subscription
389
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
390
broker.consumeFromAnEmbeddedTopic(consumer, "test-topic");
391
```
392
393
## Types
394
395
```java { .api }
396
public class BrokerAddress {
397
public static final int DEFAULT_PORT = 9092;
398
399
public BrokerAddress(String host, int port);
400
public BrokerAddress(String host);
401
public BrokerAddress(BrokerEndPoint broker);
402
public static BrokerAddress fromAddress(String address);
403
public String getHost();
404
public int getPort();
405
}
406
```