Spring Kafka Test Support provides embedded Kafka broker and testing utilities for Spring Kafka applications
npx @tessl/cli install tessl/maven-org-springframework-kafka--spring-kafka-test@3.3.00
# Spring Kafka Test

Spring Kafka Test provides comprehensive testing support for Spring Kafka applications through embedded Kafka broker functionality, testing utilities, assertion helpers, and container testing support. It enables reliable integration testing that validates message production, consumption, and processing workflows in isolated test environments.

## Package Information

- **Package Name**: spring-kafka-test
- **Package Type**: maven
- **Language**: Java
- **Installation**:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>3.3.7</version>
    <scope>test</scope>
</dependency>
```

## Core Imports

```java
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
```
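
## Basic Usage

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = { "test-topic" })
public class KafkaIntegrationTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void testKafkaProducerConsumer() throws Exception {
        // Create consumer and producer properties that point at the embedded broker
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);

        // try-with-resources closes both clients even if the assertion fails
        try (Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
                Producer<Integer, String> producer = new KafkaProducer<>(producerProps)) {

            // Subscribe to embedded topic
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");

            // Send message
            producer.send(new ProducerRecord<>("test-topic", 1, "test-message"));

            // Consume and verify
            ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
            assertThat(record.value()).isEqualTo("test-message");
        }
    }
}
```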

## Architecture

Spring Kafka Test is built around several key components:

- **Embedded Brokers**: Both ZooKeeper-based (`EmbeddedKafkaZKBroker`) and KRaft-based (`EmbeddedKafkaKraftBroker`) implementations for running Kafka in test environments
- **Testing Annotations**: `@EmbeddedKafka` annotation for declarative test setup with Spring TestContext
- **Utility Classes**: Helper methods for creating test configurations, polling records, and common testing operations
- **Assertion Support**: Both Hamcrest matchers and AssertJ conditions for validating Kafka message behavior
- **JUnit Integration**: Rules for JUnit 4 and extensions for JUnit 5 to manage broker lifecycle
- **Container Testing**: Utilities for testing Spring Kafka listener containers

## Capabilities

### Embedded Kafka Brokers

Core embedded Kafka broker functionality providing both ZooKeeper-based and KRaft-based implementations. Essential for running integration tests with real Kafka instances without external dependencies.

```java { .api }
public interface EmbeddedKafkaBroker extends InitializingBean, DisposableBean {
    int DEFAULT_ADMIN_TIMEOUT = 10;
    String BEAN_NAME = "embeddedKafka";
    String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";
    String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";

    EmbeddedKafkaBroker kafkaPorts(int... ports);
    EmbeddedKafkaBroker brokerProperties(Map<String, String> properties);
    EmbeddedKafkaBroker brokerListProperty(String brokerListProperty);
    EmbeddedKafkaBroker adminTimeout(int adminTimeout);
    String getBrokersAsString();
    void addTopics(String... topicsToAdd);
    void addTopics(NewTopic... topicsToAdd);
    Map<String, Exception> addTopicsWithResults(String... topicsToAdd);
    Map<String, Exception> addTopicsWithResults(NewTopic... topicsToAdd);
    void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume);
    void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume);
    void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic);
    void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic);
    void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd);
    void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer);
    Set<String> getTopics();
    int getPartitionsPerTopic();
}
```

[Embedded Brokers](./embedded-brokers.md)

### Testing Annotations

Annotation-based configuration for embedded Kafka in Spring tests. Provides declarative setup with automatic broker lifecycle management.

```java { .api }
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@ExtendWith(EmbeddedKafkaCondition.class)
public @interface EmbeddedKafka {
    int value() default 1;
    int count() default 1;
    boolean controlledShutdown() default false;
    int[] ports() default { 0 };
    int zookeeperPort() default 0;
    int partitions() default 2;
    String[] topics() default {};
    String[] brokerProperties() default {};
    String brokerPropertiesLocation() default "";
    String bootstrapServersProperty() default "spring.kafka.bootstrap-servers";
    int zkConnectionTimeout() default EmbeddedKafkaZKBroker.DEFAULT_ZK_CONNECTION_TIMEOUT;
    int zkSessionTimeout() default EmbeddedKafkaZKBroker.DEFAULT_ZK_SESSION_TIMEOUT;
    int adminTimeout() default EmbeddedKafkaBroker.DEFAULT_ADMIN_TIMEOUT;
    boolean kraft() default false;
}
```

[Testing Annotations](./testing-annotations.md)
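
The `brokerProperties` attribute takes an array of strings, conventionally written as `key=value` entries such as `"auto.create.topics.enable=false"`. The following is a minimal standard-library sketch of how such entries can be folded into a `java.util.Properties` object. The class `BrokerPropertiesSketch` and its `parse` method are hypothetical names used only for illustration; they are not part of spring-kafka-test, and the library's own handling may differ (for example, it may resolve placeholders first).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of spring-kafka-test.
final class BrokerPropertiesSketch {

    // Loads each "key=value" entry using the standard Properties file syntax.
    static Properties parse(String... brokerProperties) {
        Properties props = new Properties();
        for (String entry : brokerProperties) {
            try {
                props.load(new StringReader(entry));
            }
            catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("auto.create.topics.enable=false", "log.retention.hours=1");
        System.out.println(props.getProperty("auto.create.topics.enable"));
        System.out.println(props.getProperty("log.retention.hours"));
    }
}
```

Because the entries follow the properties-file syntax, the first `=` separates the key from the value, so a value that itself contains `:` or `=` (a listener URL, for instance) stays intact.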

### Test Utilities

Comprehensive testing utilities for creating configurations, polling messages, and common test operations. Simplifies Kafka test setup and message verification.

```java { .api }
public final class KafkaTestUtils {
    public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka);
    public static Map<String, Object> consumerProps(String brokers, String group);
    public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit);
    public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka);
    public static Map<String, Object> producerProps(String brokers);
    public static Map<String, Object> streamsProps(String applicationId, String brokers);
    public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic);
    public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, Duration timeout);
    @Nullable
    public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition, boolean seekToLast, boolean commit, Duration timeout);
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer);
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout);
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout, int minRecords);
    public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition) throws Exception;
    public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition) throws Exception;
    public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, String topic, Integer... partitions);
    public static Object getPropertyValue(Object root, String propertyPath);
    public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type);
    public static Properties defaultPropertyOverrides();
}
```

[Test Utilities](./test-utilities.md)
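
The `getRecords(consumer, timeout, minRecords)` overload combines a time limit with a minimum record count. The sketch below illustrates one plausible reading of that contract with plain Java: keep polling until enough records have arrived or the deadline passes. It is an assumption about the semantics, not the library's implementation, and `PollUntilSketch`, `pollUntil`, and the `Supplier`-based poll function are hypothetical stand-ins for a real Kafka `Consumer`.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper, not part of spring-kafka-test.
final class PollUntilSketch {

    // Calls poll repeatedly until at least minRecords items were collected
    // or the timeout elapsed, then returns whatever was collected so far.
    // A real Consumer.poll blocks for a while; this stand-in may spin if
    // the supplier returns immediately.
    static <T> List<T> pollUntil(Supplier<List<T>> poll, Duration timeout, int minRecords) {
        long deadline = System.nanoTime() + timeout.toNanos();
        List<T> collected = new ArrayList<>();
        while (collected.size() < minRecords && System.nanoTime() - deadline < 0) {
            collected.addAll(poll.get());
        }
        return collected;
    }

    public static void main(String[] args) {
        // Two batches arrive on successive polls; later polls return nothing.
        Iterator<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c")).iterator();
        Supplier<List<String>> poll =
                () -> batches.hasNext() ? batches.next() : Collections.<String>emptyList();
        System.out.println(pollUntil(poll, Duration.ofSeconds(1), 3));
    }
}
```

Under this reading, a test that expects three records does not need to guess how many poll calls the broker will take to deliver them; it only states the count and an upper bound on the wait.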

### Assertion and Matching

Hamcrest matchers and AssertJ conditions for validating Kafka message behavior. Provides fluent assertions for record keys, values, partitions, and timestamps.

```java { .api }
public final class KafkaMatchers {
    public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key);
    public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value);
    public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition);
    public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts);
}

public final class KafkaConditions {
    public static <K> Condition<ConsumerRecord<K, ?>> key(K key);
    public static <V> Condition<ConsumerRecord<?, V>> value(V value);
    public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value);
    public static Condition<ConsumerRecord<?, ?>> partition(int partition);
}
```

[Assertion and Matching](./assertion-matching.md)

### JUnit Integration

JUnit rules and extensions for managing embedded Kafka broker lifecycle. Supports both JUnit 4 rules and JUnit 5 extensions.

```java { .api }
public class EmbeddedKafkaRule extends ExternalResource {
    public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics);
    public EmbeddedKafkaRule brokerProperties(Map<String, String> brokerProperties);
    public EmbeddedKafkaRule kafkaPorts(int... kafkaPorts);
    public EmbeddedKafkaBroker getEmbeddedKafka();
}

public class EmbeddedKafkaCondition implements ExecutionCondition, AfterAllCallback, ParameterResolver {
    public static EmbeddedKafkaBroker getBroker();
}
```

[JUnit Integration](./junit-integration.md)

### Container Testing

Utilities for testing Spring Kafka listener containers. Provides methods for waiting on partition assignments and container lifecycle management.

```java { .api }
public final class ContainerTestUtils {
    public static void waitForAssignment(Object container, int partitions);
}
```

[Container Testing](./container-testing.md)

## Types

```java { .api }
public class BrokerAddress {
    public static final int DEFAULT_PORT = 9092;

    public BrokerAddress(String host, int port);
    public BrokerAddress(String host);
    public static BrokerAddress fromAddress(String address);
    public String getHost();
    public int getPort();
}

public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {
    public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;
    public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = DEFAULT_ZK_SESSION_TIMEOUT;
}
```
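
`BrokerAddress` pairs a host with a port and declares `DEFAULT_PORT = 9092` for the case where only a host is supplied. As a rough illustration of that idea, the sketch below parses a comma-separated broker list, assuming the usual Kafka `bootstrap.servers` format of `host:port` entries, and applies the default port when none is given. `BrokerListParser`, `parseOne`, and `parse` are hypothetical names built on the standard library only; the sketch does not reproduce `BrokerAddress.fromAddress` and does not handle IPv6 literals.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of spring-kafka-test.
final class BrokerListParser {

    // Same value as BrokerAddress.DEFAULT_PORT in the listing above.
    static final int DEFAULT_PORT = 9092;

    // Parses a single "host" or "host:port" entry without resolving the host.
    static InetSocketAddress parseOne(String address) {
        String[] parts = address.trim().split(":");
        if (parts.length == 1 && !parts[0].isEmpty()) {
            return InetSocketAddress.createUnresolved(parts[0], DEFAULT_PORT);
        }
        if (parts.length == 2 && !parts[0].isEmpty()) {
            return InetSocketAddress.createUnresolved(parts[0], Integer.parseInt(parts[1]));
        }
        throw new IllegalArgumentException("Expected host or host:port but got: " + address);
    }

    // Parses a comma-separated broker list into one address per entry.
    static List<InetSocketAddress> parse(String brokers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : brokers.split(",")) {
            result.add(parseOne(entry));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("127.0.0.1:54321,localhost")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```

A helper like this can be handy when a test needs the individual host and port of each embedded broker rather than the combined string passed to client configuration.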