0
# JUnit Integration
1
2
JUnit rules and extensions for managing the lifecycle of an embedded Kafka broker. Both JUnit 4 rules and JUnit 5 extensions are supported, with automatic broker setup and teardown.
3
4
## Capabilities
5
6
### JUnit 4 Rule Support
7
8
JUnit 4 rule wrapper for embedded Kafka broker with automatic lifecycle management.
9
10
```java { .api }
11
/**
12
* A JUnit 4 TestRule wrapper around an EmbeddedKafkaBroker
13
*/
14
public class EmbeddedKafkaRule extends ExternalResource {
15
/**
16
* Create embedded Kafka brokers
17
* @param count the number of brokers
18
*/
19
public EmbeddedKafkaRule(int count);
20
21
/**
22
* Create embedded Kafka brokers
23
* @param count the number of brokers
24
* @param controlledShutdown passed into TestUtils.createBrokerConfig
25
* @param topics the topics to create (2 partitions per topic)
26
*/
27
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics);
28
29
/**
30
* Create embedded Kafka brokers listening on random ports
31
* @param count the number of brokers
32
* @param controlledShutdown passed into TestUtils.createBrokerConfig
33
* @param partitions partitions per topic
34
* @param topics the topics to create
35
*/
36
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics);
37
38
/**
39
* Specify the properties to configure Kafka Broker before start
40
* @param brokerProperties the properties to use for configuring Kafka Broker(s)
41
* @return this for chaining configuration
42
*/
43
public EmbeddedKafkaRule brokerProperties(Map<String, String> brokerProperties);
44
45
/**
46
* Specify a broker property
47
* @param property the property name
48
* @param value the value
49
* @return the EmbeddedKafkaRule
50
*/
51
public EmbeddedKafkaRule brokerProperty(String property, Object value);
52
53
/**
54
* Set explicit ports on which the kafka brokers will listen
55
* @param kafkaPorts the ports
56
* @return the rule
57
*/
58
public EmbeddedKafkaRule kafkaPorts(int... kafkaPorts);
59
60
/**
61
* Set ZooKeeper port
62
* @param port the port
63
* @return the rule
64
*/
65
public EmbeddedKafkaRule zkPort(int port);
66
67
/**
68
* Return the underlying EmbeddedKafkaBroker delegate
69
* @return the EmbeddedKafkaBroker instance
70
*/
71
public EmbeddedKafkaBroker getEmbeddedKafka();
72
}
73
```
74
75
### JUnit 5 Extension Support
76
77
JUnit 5 condition and extension for embedded broker setup with parameter injection support.
78
79
```java { .api }
80
/**
81
* JUnit5 condition for an embedded broker
82
*/
83
public class EmbeddedKafkaCondition implements ExecutionCondition, AfterAllCallback, ParameterResolver {
84
/**
85
* Check if parameter is supported for injection
86
* @param parameterContext the parameter context
87
* @param extensionContext the extension context
88
* @return true if EmbeddedKafkaBroker parameter is supported
89
*/
90
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext);
91
92
/**
93
* Resolve parameter for injection
94
* @param parameterContext the parameter context
95
* @param context the extension context
96
* @return the EmbeddedKafkaBroker instance
97
*/
98
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext context);
99
100
/**
101
* Clean up after all tests
102
* @param context the extension context
103
*/
104
public void afterAll(ExtensionContext context);
105
106
/**
107
* Evaluate execution condition
108
* @param context the extension context
109
* @return condition evaluation result
110
*/
111
public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context);
112
113
/**
114
* Get the current thread's broker instance
115
* @return the EmbeddedKafkaBroker
116
*/
117
public static EmbeddedKafkaBroker getBroker();
118
}
119
```
120
121
### Global Test Execution Listener
122
123
JUnit Platform launcher integration for global embedded Kafka lifecycle management: a single broker is started when the test plan begins and stopped when it ends.
124
125
```java { .api }
126
/**
127
* Global test execution listener for embedded Kafka lifecycle
128
*/
129
public class GlobalEmbeddedKafkaTestExecutionListener implements TestExecutionListener {
130
// Automatic lifecycle management: the global broker is started at the beginning of the test plan and stopped at the end
131
}
132
```
133
134
**Usage Examples:**
135
136
```java
137
// JUnit 4 Rule - Basic Setup
138
public class JUnit4KafkaTest {
139
@Rule
140
public EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, "test-topic");
141
142
@Test
143
public void testKafkaWithRule() throws Exception {
144
EmbeddedKafkaBroker broker = embeddedKafka.getEmbeddedKafka();
145
146
// Create consumer and producer
147
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", broker);
148
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
149
150
try (Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
151
Producer<Integer, String> producer = new KafkaProducer<>(producerProps)) {
152
153
broker.consumeFromAnEmbeddedTopic(consumer, "test-topic");
154
producer.send(new ProducerRecord<>("test-topic", 1, "test-message"));
155
156
ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
157
assertEquals("test-message", record.value());
158
}
159
}
160
}
161
162
// JUnit 4 Rule - Advanced Configuration
163
public class JUnit4AdvancedKafkaTest {
164
@Rule
165
public EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(2, true, 3, "orders", "payments")
166
.brokerProperties(Map.of(
167
"auto.create.topics.enable", "true",
168
"transaction.state.log.replication.factor", "1"
169
))
170
.brokerProperty("offsets.topic.replication.factor", "1")
171
.kafkaPorts(9092, 9093)
172
.zkPort(2181);
173
174
@Test
175
public void testMultiBrokerSetup() {
176
EmbeddedKafkaBroker broker = embeddedKafka.getEmbeddedKafka();
177
178
assertTrue(broker.getTopics().contains("orders"));
179
assertTrue(broker.getTopics().contains("payments"));
180
assertTrue(broker.getBrokersAsString().contains("9092"));
181
assertTrue(broker.getBrokersAsString().contains("9093"));
182
}
183
}
184
185
// JUnit 5 Extension - Parameter Injection
186
@ExtendWith(EmbeddedKafkaCondition.class)
187
@EmbeddedKafka(partitions = 1, topics = { "test-topic" })
188
public class JUnit5ParameterInjectionTest {
189
190
@Test
191
public void testWithInjectedBroker(EmbeddedKafkaBroker embeddedKafka) throws Exception {
192
// Broker is automatically injected
193
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
194
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
195
196
try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
197
Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
198
199
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");
200
producer.send(new ProducerRecord<>("test-topic", "key", "value"));
201
202
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
203
assertEquals("value", record.value());
204
}
205
}
206
207
@Test
208
public void testStaticBrokerAccess() {
209
// Alternative access via static method
210
EmbeddedKafkaBroker broker = EmbeddedKafkaCondition.getBroker();
211
assertNotNull(broker);
212
assertTrue(broker.getTopics().contains("test-topic"));
213
}
214
}
215
216
// JUnit 5 Extension - Without Spring
217
@EmbeddedKafka(count = 2, partitions = 2, topics = { "events", "commands" })
218
public class JUnit5NonSpringTest {
219
220
@Test
221
public void testNonSpringSetup() {
222
// Access broker via static method when not using parameter injection
223
EmbeddedKafkaBroker broker = EmbeddedKafkaCondition.getBroker();
224
225
assertEquals(2, broker.getPartitionsPerTopic());
226
assertTrue(broker.getTopics().contains("events"));
227
assertTrue(broker.getTopics().contains("commands"));
228
}
229
}
230
231
// JUnit 5 with Spring TestContext
232
@SpringBootTest
233
@EmbeddedKafka(partitions = 1, topics = { "integration-topic" })
234
public class JUnit5SpringIntegrationTest {
235
236
@Autowired
237
private EmbeddedKafkaBroker embeddedKafka;
238
239
@Test
240
public void testSpringIntegration() {
241
// Broker is automatically configured by Spring TestContext
242
assertNotNull(embeddedKafka);
243
assertTrue(embeddedKafka.getTopics().contains("integration-topic"));
244
}
245
}
246
247
// Complex JUnit 4 Test with Multiple Rules
248
public class ComplexJUnit4Test {
249
250
@Rule
251
public EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, "source-topic", "sink-topic")
252
.brokerProperty("auto.create.topics.enable", "false");
253
254
@Rule
255
public TestRule chain = RuleChain
256
.outerRule(kafkaRule)
257
.around(new CustomTestRule());
258
259
@Test
260
public void testKafkaStreamsProcessing() throws Exception {
261
EmbeddedKafkaBroker broker = kafkaRule.getEmbeddedKafka();
262
263
// Setup Kafka Streams
264
Map<String, Object> streamsProps = KafkaTestUtils.streamsProps("test-streams", broker.getBrokersAsString());
265
266
// Build topology
267
StreamsBuilder builder = new StreamsBuilder();
268
builder.stream("source-topic")
269
.mapValues(value -> value.toString().toUpperCase())
270
.to("sink-topic");
271
272
Topology topology = builder.build();
273
274
try (KafkaStreams streams = new KafkaStreams(topology, new Properties(streamsProps))) {
275
streams.start();
276
277
// Send input message
278
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
279
try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
280
producer.send(new ProducerRecord<>("source-topic", "key", "hello world"));
281
}
282
283
// Verify output
284
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", broker);
285
try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
286
broker.consumeFromAnEmbeddedTopic(consumer, "sink-topic");
287
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "sink-topic");
288
assertEquals("HELLO WORLD", record.value());
289
}
290
}
291
}
292
}
293
294
// Nested Test Classes with JUnit 5
295
@EmbeddedKafka(partitions = 1, topics = { "parent-topic" })
296
public class NestedJUnit5Test {
297
298
@Nested
299
@EmbeddedKafka(partitions = 2, topics = { "nested-topic" })
300
class NestedTests {
301
302
@Test
303
public void testNestedBroker(EmbeddedKafkaBroker embeddedKafka) {
304
// This broker has the nested configuration
305
assertTrue(embeddedKafka.getTopics().contains("nested-topic"));
306
assertEquals(2, embeddedKafka.getPartitionsPerTopic());
307
}
308
}
309
310
@Test
311
public void testParentBroker(EmbeddedKafkaBroker embeddedKafka) {
312
// This broker has the parent configuration
313
assertTrue(embeddedKafka.getTopics().contains("parent-topic"));
314
assertEquals(1, embeddedKafka.getPartitionsPerTopic());
315
}
316
}
317
```