Spring Kafka Test Support provides embedded Kafka broker and testing utilities for Spring Kafka applications
—
Utilities for testing Spring Kafka listener containers. Provides methods for waiting on partition assignments and container lifecycle management without hard dependencies on container classes.
Utilities for testing listener containers with partition assignment waiting and lifecycle management.
/**
* Utilities for testing listener containers
* No hard references to container classes are used to avoid circular project dependencies
*/
public final class ContainerTestUtils {
/**
* Wait until the container has the required number of assigned partitions
* @param container the container
* @param partitions the number of partitions
* @throws IllegalStateException if the operation cannot be completed as expected
* @throws ContainerTestUtilsException if the call to the container's getAssignedPartitions() method fails
*/
public static void waitForAssignment(Object container, int partitions);
}
/**
* Exception thrown when container test utilities fail
*/
public static class ContainerTestUtilsException extends RuntimeException {
ContainerTestUtilsException(String message, Throwable cause);
}Usage Examples:
// Basic container assignment waiting
@SpringBootTest
@EmbeddedKafka(partitions = 3, topics = { "test-topic" })
public class ContainerAssignmentTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void testSingleContainerAssignment() throws Exception {
// Configure consumer properties
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
// Create container factory
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
// Create container
ContainerProperties containerProperties = new ContainerProperties("test-topic");
containerProperties.setGroupId("test-group");
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(factory.getConsumerFactory(), containerProperties);
container.start();
try {
// Wait for all 3 partitions to be assigned
ContainerTestUtils.waitForAssignment(container, 3);
// Container is now ready for testing
assertTrue(container.isRunning());
} finally {
container.stop();
}
}
}
// Multi-container assignment waiting
@SpringBootTest
@EmbeddedKafka(partitions = 6, topics = { "multi-partition-topic" })
public class MultiContainerAssignmentTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void testConcurrentContainerAssignment() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("multi-group", "false", embeddedKafka);
// Create concurrent container factory
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
factory.setConcurrency(3); // 3 consumer threads
// Create concurrent container
ContainerProperties containerProperties = new ContainerProperties("multi-partition-topic");
containerProperties.setGroupId("multi-group");
ConcurrentMessageListenerContainer<String, String> container =
new ConcurrentMessageListenerContainer<>(factory.getConsumerFactory(), containerProperties);
container.setConcurrency(3);
container.start();
try {
// Wait for all 6 partitions to be assigned across the 3 concurrent containers
ContainerTestUtils.waitForAssignment(container, 6);
// All partitions are now assigned
assertTrue(container.isRunning());
} finally {
container.stop();
}
}
}
// Integration test with message processing
@SpringBootTest
@EmbeddedKafka(partitions = 2, topics = { "processing-topic" })
public class MessageProcessingContainerTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
private final CountDownLatch latch = new CountDownLatch(3);
private final List<String> receivedMessages = new ArrayList<>();
@Test
public void testMessageProcessingWithContainer() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("processing-group", "false", embeddedKafka);
// Create container with message listener
ContainerProperties containerProperties = new ContainerProperties("processing-topic");
containerProperties.setGroupId("processing-group");
containerProperties.setMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
receivedMessages.add(record.value());
latch.countDown();
}
});
DefaultKafkaConsumerFactory<String, String> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProps);
KafkaMessageListenerContainer<String, String> container =
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
container.start();
try {
// Wait for partition assignment
ContainerTestUtils.waitForAssignment(container, 2);
// Send test messages
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
producer.send(new ProducerRecord<>("processing-topic", "key1", "message1"));
producer.send(new ProducerRecord<>("processing-topic", "key2", "message2"));
producer.send(new ProducerRecord<>("processing-topic", "key3", "message3"));
}
// Wait for messages to be processed
assertTrue(latch.await(30, TimeUnit.SECONDS));
// Verify messages were received
assertEquals(3, receivedMessages.size());
assertTrue(receivedMessages.contains("message1"));
assertTrue(receivedMessages.contains("message2"));
assertTrue(receivedMessages.contains("message3"));
} finally {
container.stop();
}
}
}
// Error handling and timeout scenarios
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = { "error-topic" })
public class ContainerErrorHandlingTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void testAssignmentTimeout() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("error-group", "false", embeddedKafka);
// Create container with invalid topic (should not exist)
ContainerProperties containerProperties = new ContainerProperties("non-existent-topic");
containerProperties.setGroupId("error-group");
DefaultKafkaConsumerFactory<String, String> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProps);
KafkaMessageListenerContainer<String, String> container =
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
container.start();
try {
// This should throw IllegalStateException due to no partitions being assigned
assertThrows(IllegalStateException.class, () -> {
ContainerTestUtils.waitForAssignment(container, 1);
});
} finally {
container.stop();
}
}
@Test
public void testPartialAssignment() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("partial-group", "false", embeddedKafka);
ContainerProperties containerProperties = new ContainerProperties("error-topic");
containerProperties.setGroupId("partial-group");
DefaultKafkaConsumerFactory<String, String> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProps);
KafkaMessageListenerContainer<String, String> container =
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
container.start();
try {
// Wait for actual partition count (1), not more
ContainerTestUtils.waitForAssignment(container, 1);
// This should throw because we expect more partitions than available
assertThrows(IllegalStateException.class, () -> {
ContainerTestUtils.waitForAssignment(container, 5);
});
} finally {
container.stop();
}
}
}
// Custom container implementations
@SpringBootTest
@EmbeddedKafka(partitions = 4, topics = { "custom-topic" })
public class CustomContainerTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void testCustomContainerImplementation() throws Exception {
// Create a custom container that implements getAssignedPartitions()
CustomKafkaContainer customContainer = new CustomKafkaContainer(embeddedKafka, "custom-topic");
customContainer.start();
try {
// ContainerTestUtils uses reflection to call getAssignedPartitions()
ContainerTestUtils.waitForAssignment(customContainer, 4);
// Verify custom container is working
assertTrue(customContainer.isRunning());
assertEquals(4, customContainer.getAssignedPartitions().size());
} finally {
customContainer.stop();
}
}
private static class CustomKafkaContainer {
private Consumer<String, String> consumer;
private boolean running = false;
private Collection<TopicPartition> assignedPartitions = new HashSet<>();
public CustomKafkaContainer(EmbeddedKafkaBroker broker, String topic) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("custom-group", "false", broker);
this.consumer = new KafkaConsumer<>(consumerProps);
}
public void start() {
consumer.subscribe(Arrays.asList("custom-topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
assignedPartitions.clear();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
assignedPartitions = new HashSet<>(partitions);
}
});
// Trigger initial assignment
consumer.poll(Duration.ofMillis(100));
running = true;
}
public void stop() {
running = false;
if (consumer != null) {
consumer.close();
}
}
public boolean isRunning() {
return running;
}
// This method will be called by ContainerTestUtils via reflection
public Collection<TopicPartition> getAssignedPartitions() {
if (!running) {
return Collections.emptyList();
}
// Poll to trigger rebalance if needed
consumer.poll(Duration.ofMillis(100));
return new HashSet<>(assignedPartitions);
}
}
}
// Kafka Streams container testing
@SpringBootTest
@EmbeddedKafka(partitions = 2, topics = { "input-topic", "output-topic" })
public class KafkaStreamsContainerTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void testKafkaStreamsWithContainerUtils() throws Exception {
// Setup Kafka Streams
Map<String, Object> streamsProps = KafkaTestUtils.streamsProps("streams-app", embeddedKafka.getBrokersAsString());
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.mapValues(value -> value.toString().toUpperCase())
.to("output-topic");
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, new Properties(streamsProps));
// Create output consumer
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("output-group", "false", embeddedKafka);
ContainerProperties containerProperties = new ContainerProperties("output-topic");
containerProperties.setGroupId("output-group");
DefaultKafkaConsumerFactory<String, String> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProps);
KafkaMessageListenerContainer<String, String> outputContainer =
new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
try {
streams.start();
outputContainer.start();
// Wait for output topic partition assignment
ContainerTestUtils.waitForAssignment(outputContainer, 2);
// Send input message
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
producer.send(new ProducerRecord<>("input-topic", "key", "hello streams"));
}
// Verify output
Map<String, Object> outputConsumerProps = KafkaTestUtils.consumerProps("verify-group", "false", embeddedKafka);
try (Consumer<String, String> verifyConsumer = new KafkaConsumer<>(outputConsumerProps)) {
embeddedKafka.consumeFromAnEmbeddedTopic(verifyConsumer, "output-topic");
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(verifyConsumer, "output-topic");
assertEquals("HELLO STREAMS", record.value());
}
} finally {
streams.close();
outputContainer.stop();
}
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-kafka--spring-kafka-test