0
# Container Testing
1
2
Utilities for testing Spring Kafka listener containers. Provides methods for waiting on partition assignments and container lifecycle management without hard dependencies on container classes.
3
4
## Capabilities
5
6
### Container Utilities
7
8
Utilities for testing listener containers with partition assignment waiting and lifecycle management.
9
10
```java { .api }
11
/**
12
* Utilities for testing listener containers
13
* No hard references to container classes are used to avoid circular project dependencies
14
*/
15
public final class ContainerTestUtils {
16
/**
17
* Wait until the container has the required number of assigned partitions
18
* @param container the container
19
* @param partitions the number of partitions
20
* @throws IllegalStateException if the operation cannot be completed as expected
21
* @throws ContainerTestUtilsException if the call to the container's getAssignedPartitions() method fails
22
*/
23
public static void waitForAssignment(Object container, int partitions);
24
}
25
26
/**
27
* Exception thrown when container test utilities fail
28
*/
29
public static class ContainerTestUtilsException extends RuntimeException {
30
ContainerTestUtilsException(String message, Throwable cause);
31
}
32
```
33
34
**Usage Examples:**
35
36
```java
37
// Basic container assignment waiting
38
@SpringBootTest
39
@EmbeddedKafka(partitions = 3, topics = { "test-topic" })
40
public class ContainerAssignmentTest {
41
42
@Autowired
43
private EmbeddedKafkaBroker embeddedKafka;
44
45
@Test
46
public void testSingleContainerAssignment() throws Exception {
47
// Configure consumer properties
48
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
49
50
// Create container factory
51
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
52
new ConcurrentKafkaListenerContainerFactory<>();
53
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
54
55
// Create container
56
ContainerProperties containerProperties = new ContainerProperties("test-topic");
57
containerProperties.setGroupId("test-group");
58
59
KafkaMessageListenerContainer<Integer, String> container =
60
new KafkaMessageListenerContainer<>(factory.getConsumerFactory(), containerProperties);
61
62
container.start();
63
try {
64
// Wait for all 3 partitions to be assigned
65
ContainerTestUtils.waitForAssignment(container, 3);
66
67
// Container is now ready for testing
68
assertTrue(container.isRunning());
69
70
} finally {
71
container.stop();
72
}
73
}
74
}
75
76
// Multi-container assignment waiting
77
@SpringBootTest
78
@EmbeddedKafka(partitions = 6, topics = { "multi-partition-topic" })
79
public class MultiContainerAssignmentTest {
80
81
@Autowired
82
private EmbeddedKafkaBroker embeddedKafka;
83
84
@Test
85
public void testConcurrentContainerAssignment() throws Exception {
86
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("multi-group", "false", embeddedKafka);
87
88
// Create concurrent container factory
89
ConcurrentKafkaListenerContainerFactory<String, String> factory =
90
new ConcurrentKafkaListenerContainerFactory<>();
91
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
92
factory.setConcurrency(3); // 3 consumer threads
93
94
// Create concurrent container
95
ContainerProperties containerProperties = new ContainerProperties("multi-partition-topic");
96
containerProperties.setGroupId("multi-group");
97
98
ConcurrentMessageListenerContainer<String, String> container =
99
new ConcurrentMessageListenerContainer<>(factory.getConsumerFactory(), containerProperties);
100
container.setConcurrency(3);
101
102
container.start();
103
try {
104
// Wait for all 6 partitions to be assigned across the 3 concurrent containers
105
ContainerTestUtils.waitForAssignment(container, 6);
106
107
// All partitions are now assigned
108
assertTrue(container.isRunning());
109
110
} finally {
111
container.stop();
112
}
113
}
114
}
115
116
// Integration test with message processing
117
@SpringBootTest
118
@EmbeddedKafka(partitions = 2, topics = { "processing-topic" })
119
public class MessageProcessingContainerTest {
120
121
@Autowired
122
private EmbeddedKafkaBroker embeddedKafka;
123
124
private final CountDownLatch latch = new CountDownLatch(3);
125
private final List<String> receivedMessages = new ArrayList<>();
126
127
@Test
128
public void testMessageProcessingWithContainer() throws Exception {
129
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("processing-group", "false", embeddedKafka);
130
131
// Create container with message listener
132
ContainerProperties containerProperties = new ContainerProperties("processing-topic");
133
containerProperties.setGroupId("processing-group");
134
containerProperties.setMessageListener(new MessageListener<String, String>() {
135
@Override
136
public void onMessage(ConsumerRecord<String, String> record) {
137
receivedMessages.add(record.value());
138
latch.countDown();
139
}
140
});
141
142
DefaultKafkaConsumerFactory<String, String> consumerFactory =
143
new DefaultKafkaConsumerFactory<>(consumerProps);
144
145
KafkaMessageListenerContainer<String, String> container =
146
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
147
148
container.start();
149
try {
150
// Wait for partition assignment
151
ContainerTestUtils.waitForAssignment(container, 2);
152
153
// Send test messages
154
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
155
try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
156
producer.send(new ProducerRecord<>("processing-topic", "key1", "message1"));
157
producer.send(new ProducerRecord<>("processing-topic", "key2", "message2"));
158
producer.send(new ProducerRecord<>("processing-topic", "key3", "message3"));
159
}
160
161
// Wait for messages to be processed
162
assertTrue(latch.await(30, TimeUnit.SECONDS));
163
164
// Verify messages were received
165
assertEquals(3, receivedMessages.size());
166
assertTrue(receivedMessages.contains("message1"));
167
assertTrue(receivedMessages.contains("message2"));
168
assertTrue(receivedMessages.contains("message3"));
169
170
} finally {
171
container.stop();
172
}
173
}
174
}
175
176
// Error handling and timeout scenarios
177
@SpringBootTest
178
@EmbeddedKafka(partitions = 1, topics = { "error-topic" })
179
public class ContainerErrorHandlingTest {
180
181
@Autowired
182
private EmbeddedKafkaBroker embeddedKafka;
183
184
@Test
185
public void testAssignmentTimeout() {
186
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("error-group", "false", embeddedKafka);
187
188
// Create container with invalid topic (should not exist)
189
ContainerProperties containerProperties = new ContainerProperties("non-existent-topic");
190
containerProperties.setGroupId("error-group");
191
192
DefaultKafkaConsumerFactory<String, String> consumerFactory =
193
new DefaultKafkaConsumerFactory<>(consumerProps);
194
195
KafkaMessageListenerContainer<String, String> container =
196
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
197
198
container.start();
199
try {
200
// This should throw IllegalStateException due to no partitions being assigned
201
assertThrows(IllegalStateException.class, () -> {
202
ContainerTestUtils.waitForAssignment(container, 1);
203
});
204
205
} finally {
206
container.stop();
207
}
208
}
209
210
@Test
211
public void testPartialAssignment() throws Exception {
212
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("partial-group", "false", embeddedKafka);
213
214
ContainerProperties containerProperties = new ContainerProperties("error-topic");
215
containerProperties.setGroupId("partial-group");
216
217
DefaultKafkaConsumerFactory<String, String> consumerFactory =
218
new DefaultKafkaConsumerFactory<>(consumerProps);
219
220
KafkaMessageListenerContainer<String, String> container =
221
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
222
223
container.start();
224
try {
225
// Wait for actual partition count (1), not more
226
ContainerTestUtils.waitForAssignment(container, 1);
227
228
// This should throw because we expect more partitions than available
229
assertThrows(IllegalStateException.class, () -> {
230
ContainerTestUtils.waitForAssignment(container, 5);
231
});
232
233
} finally {
234
container.stop();
235
}
236
}
237
}
238
239
// Custom container implementations
240
@SpringBootTest
241
@EmbeddedKafka(partitions = 4, topics = { "custom-topic" })
242
public class CustomContainerTest {
243
244
@Autowired
245
private EmbeddedKafkaBroker embeddedKafka;
246
247
@Test
248
public void testCustomContainerImplementation() throws Exception {
249
// Create a custom container that implements getAssignedPartitions()
250
CustomKafkaContainer customContainer = new CustomKafkaContainer(embeddedKafka, "custom-topic");
251
252
customContainer.start();
253
try {
254
// ContainerTestUtils uses reflection to call getAssignedPartitions()
255
ContainerTestUtils.waitForAssignment(customContainer, 4);
256
257
// Verify custom container is working
258
assertTrue(customContainer.isRunning());
259
assertEquals(4, customContainer.getAssignedPartitions().size());
260
261
} finally {
262
customContainer.stop();
263
}
264
}
265
266
private static class CustomKafkaContainer {
267
private Consumer<String, String> consumer;
268
private boolean running = false;
269
private Collection<TopicPartition> assignedPartitions = new HashSet<>();
270
271
public CustomKafkaContainer(EmbeddedKafkaBroker broker, String topic) {
272
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("custom-group", "false", broker);
273
this.consumer = new KafkaConsumer<>(consumerProps);
274
}
275
276
public void start() {
277
consumer.subscribe(Arrays.asList("custom-topic"), new ConsumerRebalanceListener() {
278
@Override
279
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
280
assignedPartitions.clear();
281
}
282
283
@Override
284
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
285
assignedPartitions = new HashSet<>(partitions);
286
}
287
});
288
289
// Trigger initial assignment
290
consumer.poll(Duration.ofMillis(100));
291
running = true;
292
}
293
294
public void stop() {
295
running = false;
296
if (consumer != null) {
297
consumer.close();
298
}
299
}
300
301
public boolean isRunning() {
302
return running;
303
}
304
305
// This method will be called by ContainerTestUtils via reflection
306
public Collection<TopicPartition> getAssignedPartitions() {
307
if (!running) {
308
return Collections.emptyList();
309
}
310
311
// Poll to trigger rebalance if needed
312
consumer.poll(Duration.ofMillis(100));
313
return new HashSet<>(assignedPartitions);
314
}
315
}
316
}
317
318
// Kafka Streams container testing
319
@SpringBootTest
320
@EmbeddedKafka(partitions = 2, topics = { "input-topic", "output-topic" })
321
public class KafkaStreamsContainerTest {
322
323
@Autowired
324
private EmbeddedKafkaBroker embeddedKafka;
325
326
@Test
327
public void testKafkaStreamsWithContainerUtils() throws Exception {
328
// Setup Kafka Streams
329
Map<String, Object> streamsProps = KafkaTestUtils.streamsProps("streams-app", embeddedKafka.getBrokersAsString());
330
331
StreamsBuilder builder = new StreamsBuilder();
332
builder.stream("input-topic")
333
.mapValues(value -> value.toString().toUpperCase())
334
.to("output-topic");
335
336
Topology topology = builder.build();
337
KafkaStreams streams = new KafkaStreams(topology, new Properties(streamsProps));
338
339
// Create output consumer
340
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("output-group", "false", embeddedKafka);
341
342
ContainerProperties containerProperties = new ContainerProperties("output-topic");
343
containerProperties.setGroupId("output-group");
344
345
DefaultKafkaConsumerFactory<String, String> consumerFactory =
346
new DefaultKafkaConsumerFactory<>(consumerProps);
347
348
KafkaMessageListenerContainer<String, String> outputContainer =
349
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
350
351
try {
352
streams.start();
353
outputContainer.start();
354
355
// Wait for output topic partition assignment
356
ContainerTestUtils.waitForAssignment(outputContainer, 2);
357
358
// Send input message
359
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
360
try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
361
producer.send(new ProducerRecord<>("input-topic", "key", "hello streams"));
362
}
363
364
// Verify output
365
Map<String, Object> outputConsumerProps = KafkaTestUtils.consumerProps("verify-group", "false", embeddedKafka);
366
try (Consumer<String, String> verifyConsumer = new KafkaConsumer<>(outputConsumerProps)) {
367
embeddedKafka.consumeFromAnEmbeddedTopic(verifyConsumer, "output-topic");
368
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(verifyConsumer, "output-topic");
369
assertEquals("HELLO STREAMS", record.value());
370
}
371
372
} finally {
373
streams.close();
374
outputContainer.stop();
375
}
376
}
377
}
378
```