0
# Test Utilities
1
2
Comprehensive testing utilities for creating configurations, polling messages, and common test operations. Simplifies Kafka test setup and message verification patterns.
3
4
## Capabilities
5
6
### Configuration Utilities
7
8
Helper methods for creating standard test configurations for consumers, producers, and Kafka Streams.
9
10
```java { .api }
11
/**
12
* Kafka testing utilities
13
*/
14
public final class KafkaTestUtils {
15
/**
16
* Set up test properties for an <Integer, String> consumer
17
* @param group the group id
18
* @param autoCommit the auto commit
19
* @param embeddedKafka a EmbeddedKafkaBroker instance
20
* @return the properties
21
*/
22
public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka);
23
24
/**
25
* Set up test properties for an <Integer, String> consumer
26
* @param brokers the bootstrapServers property
27
* @param group the group id
28
* @return the properties
29
*/
30
public static Map<String, Object> consumerProps(String brokers, String group);
31
32
/**
33
* Set up test properties for an <Integer, String> consumer
34
* @param brokers the bootstrapServers property
35
* @param group the group id
36
* @param autoCommit the auto commit
37
* @return the properties
38
*/
39
public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit);
40
41
/**
42
* Set up test properties for an <Integer, String> producer
43
* @param embeddedKafka a EmbeddedKafkaBroker instance
44
* @return the properties
45
*/
46
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka);
47
48
/**
49
* Set up test properties for an <Integer, String> producer
50
* @param brokers the bootstrapServers property
51
* @return the properties
52
*/
53
public static Map<String, Object> producerProps(String brokers);
54
55
/**
56
* Set up test properties for the Kafka Streams
57
* @param applicationId the applicationId for the Kafka Streams
58
* @param brokers the bootstrapServers property
59
* @return the properties
60
*/
61
public static Map<String, Object> streamsProps(String applicationId, String brokers);
62
}
63
```
64
65
### Message Polling Utilities
66
67
Methods for polling and retrieving messages from Kafka topics with various timeout and filtering options.
68
69
```java { .api }
70
public final class KafkaTestUtils {
71
/**
72
* Poll the consumer, expecting a single record for the specified topic
73
* @param consumer the consumer
74
* @param topic the topic
75
* @param <K> the key type
76
* @param <V> the value type
77
* @return the record
78
* @throws IllegalStateException if exactly one record is not received
79
*/
80
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic);
81
82
/**
83
* Poll the consumer, expecting a single record for the specified topic
84
* @param consumer the consumer
85
* @param topic the topic
86
* @param timeout max duration to wait for records
87
* @param <K> the key type
88
* @param <V> the value type
89
* @return the record
90
* @throws IllegalStateException if exactly one record is not received
91
*/
92
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, Duration timeout);
93
94
/**
95
* Get a single record for the group from the topic/partition
96
* @param brokerAddresses the broker address(es)
97
* @param group the group
98
* @param topic the topic
99
* @param partition the partition
100
* @param seekToLast true to fetch an existing last record, if present
101
* @param commit commit offset after polling or not
102
* @param timeout the timeout
103
* @return the record or null if no record received
104
*/
105
@Nullable
106
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
107
boolean seekToLast, boolean commit, Duration timeout);
108
109
/**
110
* Poll the consumer for records
111
* @param consumer the consumer
112
* @param <K> the key type
113
* @param <V> the value type
114
* @return the records
115
*/
116
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer);
117
118
/**
119
* Poll the consumer for records
120
* @param consumer the consumer
121
* @param timeout max time to wait for records
122
* @param <K> the key type
123
* @param <V> the value type
124
* @return the records
125
* @throws IllegalStateException if the poll returns null
126
*/
127
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout);
128
129
/**
130
* Poll the consumer for records
131
* @param consumer the consumer
132
* @param timeout max time to wait for records
133
* @param minRecords wait until the timeout or at least this number of records are received
134
* @param <K> the key type
135
* @param <V> the value type
136
* @return the records
137
* @throws IllegalStateException if the poll returns null
138
*/
139
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout, int minRecords);
140
}
141
```
142
143
### Offset and Metadata Utilities
144
145
Methods for retrieving consumer group offsets and topic partition metadata.
146
147
```java { .api }
148
public final class KafkaTestUtils {
149
/**
150
* Get the current offset and metadata for the provided group/topic/partition
151
* @param brokerAddresses the broker address(es)
152
* @param group the group
153
* @param topic the topic
154
* @param partition the partition
155
* @return the offset and metadata
156
* @throws Exception if an exception occurs
157
*/
158
public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition)
159
throws Exception;
160
161
/**
162
* Get the current offset and metadata for the provided group/topic/partition
163
* @param adminClient the AdminClient instance
164
* @param group the group
165
* @param topic the topic
166
* @param partition the partition
167
* @return the offset and metadata
168
* @throws Exception if an exception occurs
169
*/
170
public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition)
171
throws Exception;
172
173
/**
174
* Return the end offsets of the requested topic/partitions
175
* @param consumer the consumer
176
* @param topic the topic
177
* @param partitions the partitions, or null for all partitions
178
* @return the map of end offsets
179
*/
180
public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, String topic, Integer... partitions);
181
}
182
```
183
184
### Property Access Utilities
185
186
Utilities for accessing nested properties in objects using dotted notation.
187
188
```java { .api }
189
public final class KafkaTestUtils {
190
/**
191
* Uses nested DirectFieldAccessors to obtain a property using dotted notation to traverse fields
192
* @param root The object
193
* @param propertyPath The path
194
* @return The field
195
*/
196
public static Object getPropertyValue(Object root, String propertyPath);
197
198
/**
199
* A typed version of getPropertyValue(Object, String)
200
* @param root the object
201
* @param propertyPath the path
202
* @param type the type to cast the object to
203
* @param <T> the type
204
* @return the field value
205
*/
206
public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type);
207
208
/**
209
* Return a Properties object equal to the default consumer property overrides
210
* Useful when matching arguments in Mockito tests
211
* @return the default properties
212
*/
213
public static Properties defaultPropertyOverrides();
214
}
215
```
216
217
### JUnit Utilities
218
219
Additional utilities for JUnit testing scenarios.
220
221
```java { .api }
222
/**
223
* JUnit testing utilities
224
*/
225
public final class JUnitUtils {
226
// Utility methods for JUnit integration
227
}
228
```
229
230
**Usage Examples:**
231
232
```java
233
// Basic consumer and producer setup
234
@Test
235
public void testBasicProducerConsumer() throws Exception {
236
// Create consumer
237
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
238
Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
239
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");
240
241
// Create producer
242
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
243
Producer<Integer, String> producer = new KafkaProducer<>(producerProps);
244
245
// Send message
246
producer.send(new ProducerRecord<>("test-topic", 1, "test-message"));
247
248
// Consume and verify
249
ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
250
assertEquals("test-message", record.value());
251
assertEquals(Integer.valueOf(1), record.key());
252
}
253
254
// Multiple records polling
255
@Test
256
public void testMultipleRecords() throws Exception {
257
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
258
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
259
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");
260
261
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
262
Producer<String, String> producer = new KafkaProducer<>(producerProps);
263
264
// Send multiple messages
265
for (int i = 0; i < 5; i++) {
266
producer.send(new ProducerRecord<>("test-topic", "key-" + i, "message-" + i));
267
}
268
269
// Poll for multiple records with timeout
270
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10), 5);
271
assertEquals(5, records.count());
272
273
// Verify each record
274
for (ConsumerRecord<String, String> record : records) {
275
assertTrue(record.key().startsWith("key-"));
276
assertTrue(record.value().startsWith("message-"));
277
}
278
}
279
280
// Offset management
281
@Test
282
public void testOffsetManagement() throws Exception {
283
String brokerAddresses = embeddedKafka.getBrokersAsString();
284
String group = "offset-test-group";
285
String topic = "offset-topic";
286
int partition = 0;
287
288
// Send a message
289
Map<String, Object> producerProps = KafkaTestUtils.producerProps(brokerAddresses);
290
Producer<String, String> producer = new KafkaProducer<>(producerProps);
291
producer.send(new ProducerRecord<>(topic, partition, "key", "value"));
292
293
// Consume the message
294
ConsumerRecord<?, ?> record = KafkaTestUtils.getOneRecord(
295
brokerAddresses, group, topic, partition, false, true, Duration.ofSeconds(10)
296
);
297
assertNotNull(record);
298
299
// Check committed offset
300
OffsetAndMetadata offset = KafkaTestUtils.getCurrentOffset(brokerAddresses, group, topic, partition);
301
assertNotNull(offset);
302
assertEquals(1L, offset.offset());
303
}
304
305
// End offsets
306
@Test
307
public void testEndOffsets() throws Exception {
308
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("end-offset-group", "false", embeddedKafka);
309
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
310
311
// Get end offsets for all partitions
312
Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getEndOffsets(consumer, "test-topic");
313
assertFalse(endOffsets.isEmpty());
314
315
// Get end offsets for specific partitions
316
Map<TopicPartition, Long> specificOffsets = KafkaTestUtils.getEndOffsets(consumer, "test-topic", 0, 1);
317
assertEquals(2, specificOffsets.size());
318
}
319
320
// Kafka Streams configuration
321
@Test
322
public void testStreamsConfiguration() {
323
String applicationId = "test-streams-app";
324
String brokers = embeddedKafka.getBrokersAsString();
325
326
Map<String, Object> streamsProps = KafkaTestUtils.streamsProps(applicationId, brokers);
327
328
assertEquals(applicationId, streamsProps.get(StreamsConfig.APPLICATION_ID_CONFIG));
329
assertEquals(brokers, streamsProps.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
330
}
331
332
// Property access utilities
333
@Test
334
public void testPropertyAccess() {
335
SomeComplexObject obj = new SomeComplexObject();
336
337
// Access nested properties
338
String nestedValue = KafkaTestUtils.getPropertyValue(obj, "nested.property.value", String.class);
339
Object rawValue = KafkaTestUtils.getPropertyValue(obj, "nested.property.value");
340
341
// Default properties
342
Properties defaults = KafkaTestUtils.defaultPropertyOverrides();
343
assertEquals("false", defaults.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
344
}
345
```