Spring Kafka Test Support provides embedded Kafka broker and testing utilities for Spring Kafka applications
npx @tessl/cli install tessl/maven-org-springframework-kafka--spring-kafka-test@3.3.0Spring Kafka Test provides comprehensive testing support for Spring Kafka applications through embedded Kafka broker functionality, testing utilities, assertion helpers, and container testing support. It enables reliable integration testing that validates message production, consumption, and processing workflows in isolated test environments.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>3.3.7</version>
<scope>test</scope>
</dependency>import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = { "test-topic" })
public class KafkaIntegrationTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void testKafkaProducerConsumer() throws Exception {
// Create consumer properties
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
// Subscribe to embedded topic
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");
// Create producer properties
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
Producer<Integer, String> producer = new KafkaProducer<>(producerProps);
// Send message
producer.send(new ProducerRecord<>("test-topic", 1, "test-message"));
// Consume and verify
ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
assertThat(record.value()).isEqualTo("test-message");
}
}Spring Kafka Test is built around several key components:
EmbeddedKafkaZKBroker) and KRaft-based (EmbeddedKafkaKraftBroker) implementations for running Kafka in test environments@EmbeddedKafka annotation for declarative test setup with Spring TestContextCore embedded Kafka broker functionality providing both ZooKeeper-based and KRaft-based implementations. Essential for running integration tests with real Kafka instances without external dependencies.
public interface EmbeddedKafkaBroker extends InitializingBean, DisposableBean {
int DEFAULT_ADMIN_TIMEOUT = 10;
String BEAN_NAME = "embeddedKafka";
String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";
String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";
EmbeddedKafkaBroker kafkaPorts(int... ports);
EmbeddedKafkaBroker brokerProperties(Map<String, String> properties);
EmbeddedKafkaBroker brokerListProperty(String brokerListProperty);
EmbeddedKafkaBroker adminTimeout(int adminTimeout);
String getBrokersAsString();
void addTopics(String... topicsToAdd);
void addTopics(NewTopic... topicsToAdd);
Map<String, Exception> addTopicsWithResults(String... topicsToAdd);
Map<String, Exception> addTopicsWithResults(NewTopic... topicsToAdd);
void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume);
void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume);
void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic);
void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic);
void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd);
void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer);
Set<String> getTopics();
int getPartitionsPerTopic();
}Annotation-based configuration for embedded Kafka in Spring tests. Provides declarative setup with automatic broker lifecycle management.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@ExtendWith(EmbeddedKafkaCondition.class)
public @interface EmbeddedKafka {
int value() default 1;
int count() default 1;
boolean controlledShutdown() default false;
int[] ports() default { 0 };
int zookeeperPort() default 0;
int partitions() default 2;
String[] topics() default {};
String[] brokerProperties() default {};
String brokerPropertiesLocation() default "";
String bootstrapServersProperty() default "spring.kafka.bootstrap-servers";
int zkConnectionTimeout() default EmbeddedKafkaZKBroker.DEFAULT_ZK_CONNECTION_TIMEOUT;
int zkSessionTimeout() default EmbeddedKafkaZKBroker.DEFAULT_ZK_SESSION_TIMEOUT;
int adminTimeout() default EmbeddedKafkaBroker.DEFAULT_ADMIN_TIMEOUT;
boolean kraft() default false;
}Comprehensive testing utilities for creating configurations, polling messages, and common test operations. Simplifies Kafka test setup and message verification.
public final class KafkaTestUtils {
public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka);
public static Map<String, Object> consumerProps(String brokers, String group);
public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit);
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka);
public static Map<String, Object> producerProps(String brokers);
public static Map<String, Object> streamsProps(String applicationId, String brokers);
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic);
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, Duration timeout);
@Nullable
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition, boolean seekToLast, boolean commit, Duration timeout);
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer);
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout);
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout, int minRecords);
public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition) throws Exception;
public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition) throws Exception;
public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, String topic, Integer... partitions);
public static Object getPropertyValue(Object root, String propertyPath);
public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type);
public static Properties defaultPropertyOverrides();
}Hamcrest matchers and AssertJ conditions for validating Kafka message behavior. Provides fluent assertions for record keys, values, partitions, and timestamps.
public final class KafkaMatchers {
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key);
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value);
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition);
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts);
}
public final class KafkaConditions {
public static <K> Condition<ConsumerRecord<K, ?>> key(K key);
public static <V> Condition<ConsumerRecord<?, V>> value(V value);
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value);
public static Condition<ConsumerRecord<?, ?>> partition(int partition);
}JUnit rules and extensions for managing embedded Kafka broker lifecycle. Supports both JUnit 4 rules and JUnit 5 extensions.
public class EmbeddedKafkaRule extends ExternalResource {
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics);
public EmbeddedKafkaRule brokerProperties(Map<String, String> brokerProperties);
public EmbeddedKafkaRule kafkaPorts(int... kafkaPorts);
public EmbeddedKafkaBroker getEmbeddedKafka();
}
public class EmbeddedKafkaCondition implements ExecutionCondition, AfterAllCallback, ParameterResolver {
public static EmbeddedKafkaBroker getBroker();
}Utilities for testing Spring Kafka listener containers. Provides methods for waiting on partition assignments and container lifecycle management.
public final class ContainerTestUtils {
public static void waitForAssignment(Object container, int partitions);
}public class BrokerAddress {
public static final int DEFAULT_PORT = 9092;
public BrokerAddress(String host, int port);
public BrokerAddress(String host);
public static BrokerAddress fromAddress(String address);
public String getHost();
public int getPort();
}
public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {
public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;
public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = DEFAULT_ZK_SESSION_TIMEOUT;
}