Spring Kafka Test Support provides an embedded Kafka broker and testing utilities for Spring Kafka applications.
—
Comprehensive testing utilities for creating configurations, polling messages, and common test operations. Simplifies Kafka test setup and message verification patterns.
Helper methods for creating standard test configurations for consumers, producers, and Kafka Streams.
/**
 * Kafka testing utilities: static factory methods for building consumer,
 * producer, and Kafka Streams test configurations.
 */
public final class KafkaTestUtils {
/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id
 * @param autoCommit the auto-commit setting, as a String (e.g. "false")
 * @param embeddedKafka an {@code EmbeddedKafkaBroker} instance supplying the bootstrap servers
 * @return the properties
 */
public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka);
/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param brokers the bootstrapServers property
 * @param group the group id
 * @return the properties
 */
public static Map<String, Object> consumerProps(String brokers, String group);
/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param brokers the bootstrapServers property
 * @param group the group id
 * @param autoCommit the auto-commit setting, as a String (e.g. "false")
 * @return the properties
 */
public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit);
/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka an {@code EmbeddedKafkaBroker} instance supplying the bootstrap servers
 * @return the properties
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka);
/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param brokers the bootstrapServers property
 * @return the properties
 */
public static Map<String, Object> producerProps(String brokers);
/**
 * Set up test properties for Kafka Streams.
 * @param applicationId the applicationId for the Kafka Streams instance
 * @param brokers the bootstrapServers property
 * @return the properties
 */
public static Map<String, Object> streamsProps(String applicationId, String brokers);
}

Methods for polling and retrieving messages from Kafka topics with various timeout and filtering options.
/**
 * Kafka testing utilities (continued): polling helpers that retrieve records
 * from a {@code Consumer} with various timeout and minimum-count options.
 */
public final class KafkaTestUtils {
/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer
 * @param topic the topic
 * @param <K> the key type
 * @param <V> the value type
 * @return the record
 * @throws IllegalStateException if exactly one record is not received
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic);
/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer
 * @param topic the topic
 * @param timeout max duration to wait for records
 * @param <K> the key type
 * @param <V> the value type
 * @return the record
 * @throws IllegalStateException if exactly one record is not received
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, Duration timeout);
/**
 * Get a single record for the group from the topic/partition.
 * @param brokerAddresses the broker address(es)
 * @param group the group
 * @param topic the topic
 * @param partition the partition
 * @param seekToLast true to fetch an existing last record, if present
 * @param commit whether to commit the offset after polling
 * @param timeout the timeout
 * @return the record, or null if no record is received within the timeout
 */
@Nullable
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
boolean seekToLast, boolean commit, Duration timeout);
/**
 * Poll the consumer for records.
 * @param consumer the consumer
 * @param <K> the key type
 * @param <V> the value type
 * @return the records
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer);
/**
 * Poll the consumer for records.
 * @param consumer the consumer
 * @param timeout max time to wait for records
 * @param <K> the key type
 * @param <V> the value type
 * @return the records
 * @throws IllegalStateException if the poll returns null
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout);
/**
 * Poll the consumer for records.
 * @param consumer the consumer
 * @param timeout max time to wait for records
 * @param minRecords wait until the timeout or at least this number of records are received
 * @param <K> the key type
 * @param <V> the value type
 * @return the records
 * @throws IllegalStateException if the poll returns null
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout, int minRecords);
}

Methods for retrieving consumer group offsets and topic partition metadata.
/**
 * Kafka testing utilities (continued): helpers for reading committed consumer
 * group offsets and topic/partition end offsets.
 */
public final class KafkaTestUtils {
/**
 * Get the current offset and metadata for the provided group/topic/partition.
 * @param brokerAddresses the broker address(es)
 * @param group the group
 * @param topic the topic
 * @param partition the partition
 * @return the offset and metadata
 * @throws Exception if an exception occurs
 */
public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition)
throws Exception;
/**
 * Get the current offset and metadata for the provided group/topic/partition.
 * @param adminClient the {@code AdminClient} instance to query
 * @param group the group
 * @param topic the topic
 * @param partition the partition
 * @return the offset and metadata
 * @throws Exception if an exception occurs
 */
public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition)
throws Exception;
/**
 * Return the end offsets of the requested topic/partitions.
 * @param consumer the consumer
 * @param topic the topic
 * @param partitions the partitions, or none/null for all partitions
 * @return the map of end offsets, keyed by {@code TopicPartition}
 */
public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, String topic, Integer... partitions);
}

Utilities for accessing nested properties in objects using dotted notation.
/**
 * Kafka testing utilities (continued): reflective property access via dotted
 * paths, plus the default consumer property overrides for Mockito matching.
 */
public final class KafkaTestUtils {
/**
 * Uses nested DirectFieldAccessors to obtain a property using dotted notation
 * to traverse fields (e.g. {@code "nested.property.value"}).
 * @param root the root object to traverse
 * @param propertyPath the dotted field path
 * @return the field value
 */
public static Object getPropertyValue(Object root, String propertyPath);
/**
 * A typed version of {@link #getPropertyValue(Object, String)}.
 * @param root the root object to traverse
 * @param propertyPath the dotted field path
 * @param type the type to cast the result to
 * @param <T> the type
 * @return the field value
 */
public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type);
/**
 * Return a {@code Properties} object equal to the default consumer property
 * overrides; useful when matching arguments in Mockito tests.
 * @return the default properties
 */
public static Properties defaultPropertyOverrides();
}

Additional utilities for JUnit testing scenarios.
/**
 * JUnit testing utilities.
 */
public final class JUnitUtils {
// Utility methods for JUnit integration (members not shown in this excerpt)
}

Usage Examples:
// Basic consumer and producer setup
@Test
public void testBasicProducerConsumer() throws Exception {
// Build an <Integer, String> consumer against the embedded broker;
// auto-commit is disabled ("false") so the test controls offsets.
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");
// Create a matching producer from the same embedded broker
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
Producer<Integer, String> producer = new KafkaProducer<>(producerProps);
// Send one message
producer.send(new ProducerRecord<>("test-topic", 1, "test-message"));
// getSingleRecord throws IllegalStateException unless exactly one record arrives
ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
assertEquals("test-message", record.value());
assertEquals(Integer.valueOf(1), record.key());
}
// Multiple records polling
@Test
public void testMultipleRecords() throws Exception {
// Consumer with auto-commit disabled, subscribed to the embedded topic
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
Producer<String, String> producer = new KafkaProducer<>(producerProps);
// Send multiple messages
for (int i = 0; i < 5; i++) {
producer.send(new ProducerRecord<>("test-topic", "key-" + i, "message-" + i));
}
// getRecords(consumer, timeout, minRecords) keeps polling until at least
// 5 records arrive or the 10-second timeout elapses
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10), 5);
assertEquals(5, records.count());
// Verify each record carries the key/value prefixes produced above
for (ConsumerRecord<String, String> record : records) {
assertTrue(record.key().startsWith("key-"));
assertTrue(record.value().startsWith("message-"));
}
}
// Offset management
@Test
public void testOffsetManagement() throws Exception {
String brokerAddresses = embeddedKafka.getBrokersAsString();
String group = "offset-test-group";
String topic = "offset-topic";
int partition = 0;
// Send a single message to the target partition
Map<String, Object> producerProps = KafkaTestUtils.producerProps(brokerAddresses);
Producer<String, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>(topic, partition, "key", "value"));
// getOneRecord with commit=true commits the offset after polling;
// seekToLast=false reads from the group's current position
ConsumerRecord<?, ?> record = KafkaTestUtils.getOneRecord(
brokerAddresses, group, topic, partition, false, true, Duration.ofSeconds(10)
);
assertNotNull(record);
// After committing one consumed record, the group's offset should be 1
OffsetAndMetadata offset = KafkaTestUtils.getCurrentOffset(brokerAddresses, group, topic, partition);
assertNotNull(offset);
assertEquals(1L, offset.offset());
}
// End offsets
@Test
public void testEndOffsets() throws Exception {
// Consumer configuration built against the embedded broker, auto-commit off
Map<String, Object> props = KafkaTestUtils.consumerProps("end-offset-group", "false", embeddedKafka);
Consumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
// Omitting the partitions argument returns end offsets for every partition
Map<TopicPartition, Long> allPartitionOffsets = KafkaTestUtils.getEndOffsets(kafkaConsumer, "test-topic");
assertFalse(allPartitionOffsets.isEmpty());
// Passing explicit partition numbers restricts the result to those partitions
Map<TopicPartition, Long> partitionSubset = KafkaTestUtils.getEndOffsets(kafkaConsumer, "test-topic", 0, 1);
assertEquals(2, partitionSubset.size());
}
// Kafka Streams configuration
@Test
public void testStreamsConfiguration() {
// streamsProps(...) should carry both settings through to the returned map
String appId = "test-streams-app";
String bootstrapServers = embeddedKafka.getBrokersAsString();
Map<String, Object> config = KafkaTestUtils.streamsProps(appId, bootstrapServers);
assertEquals(appId, config.get(StreamsConfig.APPLICATION_ID_CONFIG));
assertEquals(bootstrapServers, config.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
}
// Property access utilities
@Test
public void testPropertyAccess() {
SomeComplexObject obj = new SomeComplexObject();
// Dotted paths traverse nested fields reflectively via DirectFieldAccessors
String nestedValue = KafkaTestUtils.getPropertyValue(obj, "nested.property.value", String.class);
Object rawValue = KafkaTestUtils.getPropertyValue(obj, "nested.property.value");
// defaultPropertyOverrides() exposes the framework's consumer overrides;
// auto-commit is expected to default to "false"
Properties defaults = KafkaTestUtils.defaultPropertyOverrides();
assertEquals("false", defaults.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}

Install with Tessl CLI:
npx tessl i tessl/maven-org-springframework-kafka--spring-kafka-test