0
# Assertion and Matching
1
2
Hamcrest matchers and AssertJ conditions for asserting on Kafka `ConsumerRecord`s in tests. They cover record keys, values, key-value pairs, partitions, and timestamps, and can be combined with the standard Hamcrest and AssertJ combinators.
3
4
## Capabilities
5
6
### Hamcrest Matchers
7
8
Hamcrest-based matchers for validating ConsumerRecord properties in a fluent, readable way.
9
10
```java { .api }
11
/**
12
* Hamcrest Matchers utilities for Kafka records
13
*/
14
public final class KafkaMatchers {
15
/**
16
* Matcher for record key
17
* @param key the key
18
* @param <K> the type
19
* @return a Matcher that matches the key in a consumer record
20
*/
21
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key);
22
23
/**
24
* Matcher for record value
25
* @param value the value
26
* @param <V> the type
27
* @return a Matcher that matches the value in a consumer record
28
*/
29
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value);
30
31
/**
32
* Matcher for record partition
33
* @param partition the partition
34
* @return a Matcher that matches the partition in a consumer record
35
*/
36
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition);
37
38
/**
39
* Matcher testing the timestamp of a ConsumerRecord assuming the topic has been set with CREATE_TIME
40
* @param ts timestamp of the consumer record
41
* @return a Matcher that matches the timestamp in a consumer record
42
*/
43
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts);
44
45
/**
46
* Matcher testing the timestamp of a ConsumerRecord
47
* @param type timestamp type of the record
48
* @param ts timestamp of the consumer record
49
* @return a Matcher that matches the timestamp in a consumer record
50
*/
51
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts);
52
}
53
```
54
55
### Hamcrest Matcher Implementations
56
57
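Matcher implementations returned by the `KafkaMatchers` factory methods. They are declared as public static nested classes of `KafkaMatchers` (hence the `static` modifier); tests normally obtain them through the factory methods rather than constructing them directly.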
58
59
```java { .api }
60
/**
61
* Hamcrest matcher for ConsumerRecord key validation
62
*/
63
public static class ConsumerRecordKeyMatcher<K> extends DiagnosingMatcher<ConsumerRecord<K, ?>> {
64
public ConsumerRecordKeyMatcher(K key);
65
public void describeTo(Description description);
66
protected boolean matches(Object item, Description mismatchDescription);
67
}
68
69
/**
70
* Hamcrest matcher for ConsumerRecord value validation
71
*/
72
public static class ConsumerRecordValueMatcher<V> extends DiagnosingMatcher<ConsumerRecord<?, V>> {
73
public ConsumerRecordValueMatcher(V payload);
74
public void describeTo(Description description);
75
protected boolean matches(Object item, Description mismatchDescription);
76
}
77
78
/**
79
* Hamcrest matcher for ConsumerRecord partition validation
80
*/
81
public static class ConsumerRecordPartitionMatcher extends DiagnosingMatcher<ConsumerRecord<?, ?>> {
82
public ConsumerRecordPartitionMatcher(int partition);
83
public void describeTo(Description description);
84
protected boolean matches(Object item, Description mismatchDescription);
85
}
86
87
/**
88
* Hamcrest matcher for ConsumerRecord timestamp validation
89
*/
90
public static class ConsumerRecordTimestampMatcher extends DiagnosingMatcher<ConsumerRecord<?, ?>> {
91
public ConsumerRecordTimestampMatcher(TimestampType type, long ts);
92
public void describeTo(Description description);
93
protected boolean matches(Object item, Description mismatchDescription);
94
}
95
```
96
97
### AssertJ Conditions
98
99
AssertJ `Condition`s for asserting on ConsumerRecord properties in a fluent style, e.g. `assertThat(record).is(key(...))` or `assertThat(record).has(value(...))`.
100
101
```java { .api }
102
/**
103
* AssertJ custom Conditions for Kafka records
104
*/
105
public final class KafkaConditions {
106
/**
107
* Condition for record key
108
* @param key the key
109
* @param <K> the type
110
* @return a Condition that matches the key in a consumer record
111
*/
112
public static <K> Condition<ConsumerRecord<K, ?>> key(K key);
113
114
/**
115
* Condition for record value
116
* @param value the value
117
* @param <V> the type
118
* @return a Condition that matches the value in a consumer record
119
*/
120
public static <V> Condition<ConsumerRecord<?, V>> value(V value);
121
122
/**
123
* Condition for key-value pair
124
* @param key the key
125
* @param value the value
126
* @param <K> the key type
127
* @param <V> the value type
128
* @return a Condition that matches the key and value in a consumer record
129
*/
130
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value);
131
132
/**
133
* Condition for record timestamp (CREATE_TIME)
134
* @param value the timestamp
135
* @return a Condition that matches the timestamp value in a consumer record
136
*/
137
public static Condition<ConsumerRecord<?, ?>> timestamp(long value);
138
139
/**
140
* Condition for record timestamp with type
141
* @param type the type of timestamp
142
* @param value the timestamp
143
* @return a Condition that matches the timestamp value in a consumer record
144
*/
145
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value);
146
147
/**
148
* Condition for record partition
149
* @param partition the partition
150
* @return a Condition that matches the partition in a consumer record
151
*/
152
public static Condition<ConsumerRecord<?, ?>> partition(int partition);
153
}
154
```
155
156
### AssertJ Condition Implementations
157
158
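Condition implementations returned by the `KafkaConditions` factory methods. They are declared as public static nested classes of `KafkaConditions` (hence the `static` modifier); tests normally obtain them through the factory methods rather than constructing them directly.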
159
160
```java { .api }
161
/**
162
* AssertJ condition for ConsumerRecord key validation
163
*/
164
public static class ConsumerRecordKeyCondition<K> extends Condition<ConsumerRecord<K, ?>> {
165
public ConsumerRecordKeyCondition(K key);
166
public boolean matches(ConsumerRecord<K, ?> value);
167
}
168
169
/**
170
* AssertJ condition for ConsumerRecord value validation
171
*/
172
public static class ConsumerRecordValueCondition<V> extends Condition<ConsumerRecord<?, V>> {
173
public ConsumerRecordValueCondition(V payload);
174
public boolean matches(ConsumerRecord<?, V> value);
175
}
176
177
/**
178
* AssertJ condition for ConsumerRecord key-value validation
179
*/
180
public static class ConsumerRecordKeyValueCondition<K, V> extends Condition<ConsumerRecord<K, V>> {
181
public ConsumerRecordKeyValueCondition(K key, V value);
182
public boolean matches(ConsumerRecord<K, V> value);
183
}
184
185
/**
186
* AssertJ condition for ConsumerRecord timestamp validation
187
*/
188
public static class ConsumerRecordTimestampCondition extends Condition<ConsumerRecord<?, ?>> {
189
public ConsumerRecordTimestampCondition(TimestampType type, long ts);
190
public boolean matches(ConsumerRecord<?, ?> value);
191
}
192
193
/**
194
* AssertJ condition for ConsumerRecord partition validation
195
*/
196
public static class ConsumerRecordPartitionCondition extends Condition<ConsumerRecord<?, ?>> {
197
public ConsumerRecordPartitionCondition(int partition);
198
public boolean matches(ConsumerRecord<?, ?> value);
199
}
200
```
201
202
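**Usage Examples:**

The examples assume static imports of the `KafkaMatchers` / `KafkaConditions` factory methods, Hamcrest's `assertThat` (two-argument form) and core matchers, and AssertJ's `assertThat` (one-argument form). `createConsumer()`, `createProducer()` and their order-specific variants are test helper methods that are not shown here.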
203
204
```java
205
// Hamcrest matchers with JUnit
206
@Test
207
public void testHamcrestMatchers() throws Exception {
208
// Setup consumer and producer
209
Consumer<String, String> consumer = createConsumer();
210
Producer<String, String> producer = createProducer();
211
212
// Send test message
213
producer.send(new ProducerRecord<>("test-topic", 0, System.currentTimeMillis(), "test-key", "test-value"));
214
215
// Consume and assert using Hamcrest matchers
216
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
217
218
assertThat(record, hasKey("test-key"));
219
assertThat(record, hasValue("test-value"));
220
assertThat(record, hasPartition(0));
221
assertThat(record, hasTimestamp(TimestampType.CREATE_TIME, System.currentTimeMillis()));
222
223
// Combined assertions
224
assertThat(record, allOf(
225
hasKey("test-key"),
226
hasValue("test-value"),
227
hasPartition(0)
228
));
229
}
230
231
// AssertJ conditions
232
@Test
233
public void testAssertJConditions() throws Exception {
234
// Setup consumer and producer
235
Consumer<Integer, String> consumer = createConsumer();
236
Producer<Integer, String> producer = createProducer();
237
238
// Send test message
239
producer.send(new ProducerRecord<>("test-topic", 1, 42, "hello world"));
240
241
// Consume and assert using AssertJ conditions
242
ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
243
244
assertThat(record).is(key(42));
245
assertThat(record).is(value("hello world"));
246
assertThat(record).is(partition(1));
247
assertThat(record).is(keyValue(42, "hello world"));
248
249
// Combined conditions
250
assertThat(record).satisfies(
251
keyValue(42, "hello world"),
252
partition(1)
253
);
254
}
255
256
// Null value handling
257
@Test
258
public void testNullValueHandling() throws Exception {
259
Consumer<String, String> consumer = createConsumer();
260
Producer<String, String> producer = createProducer();
261
262
// Send message with null value (tombstone)
263
producer.send(new ProducerRecord<>("test-topic", "delete-key", null));
264
265
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
266
267
// Hamcrest matcher
268
assertThat(record, hasKey("delete-key"));
269
assertThat(record, hasValue(nullValue()));
270
271
// AssertJ condition
272
assertThat(record).is(key("delete-key"));
273
assertThat(record).is(value(null));
274
}
275
276
// Timestamp validation
277
@Test
278
public void testTimestampValidation() throws Exception {
279
Consumer<String, String> consumer = createConsumer();
280
Producer<String, String> producer = createProducer();
281
282
long timestamp = System.currentTimeMillis();
283
producer.send(new ProducerRecord<>("test-topic", 0, timestamp, "key", "value"));
284
285
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
286
287
// Hamcrest timestamp matchers
288
assertThat(record, hasTimestamp(timestamp));
289
assertThat(record, hasTimestamp(TimestampType.CREATE_TIME, timestamp));
290
291
// AssertJ timestamp conditions
292
assertThat(record).is(timestamp(timestamp));
293
assertThat(record).is(timestamp(TimestampType.CREATE_TIME, timestamp));
294
}
295
296
// Multiple records validation
297
@Test
298
public void testMultipleRecordsValidation() throws Exception {
299
Consumer<String, String> consumer = createConsumer();
300
Producer<String, String> producer = createProducer();
301
302
// Send multiple messages
303
producer.send(new ProducerRecord<>("test-topic", 0, "key1", "value1"));
304
producer.send(new ProducerRecord<>("test-topic", 1, "key2", "value2"));
305
producer.send(new ProducerRecord<>("test-topic", 0, "key3", "value3"));
306
307
// Consume all records
308
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10));
309
310
// Convert to list for easier assertion
311
List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
312
records.forEach(recordList::add);
313
314
// Hamcrest collection matchers
315
assertThat(recordList, hasSize(3));
316
assertThat(recordList, hasItem(hasKey("key1")));
317
assertThat(recordList, hasItem(hasValue("value2")));
318
assertThat(recordList, hasItem(allOf(hasKey("key3"), hasPartition(0))));
319
320
// AssertJ collection assertions
321
assertThat(recordList)
322
.hasSize(3)
323
.anyMatch(record -> record.key().equals("key1"))
324
.anyMatch(record -> record.value().equals("value2"))
325
.satisfies(list -> {
326
assertThat(list.get(0)).is(keyValue("key1", "value1"));
327
assertThat(list.get(1)).is(keyValue("key2", "value2"));
328
assertThat(list.get(2)).is(keyValue("key3", "value3"));
329
});
330
}
331
332
// Custom matcher combinations
333
@Test
334
public void testCustomMatcherCombinations() throws Exception {
335
Consumer<OrderEvent, String> consumer = createOrderConsumer();
336
Producer<OrderEvent, String> producer = createOrderProducer();
337
338
OrderEvent orderKey = new OrderEvent("order-123", "CREATED");
339
producer.send(new ProducerRecord<>("orders", orderKey, "Order created successfully"));
340
341
ConsumerRecord<OrderEvent, String> record = KafkaTestUtils.getSingleRecord(consumer, "orders");
342
343
// Custom assertions
344
assertThat(record, hasKey(orderKey));
345
assertThat(record.key().getOrderId(), equalTo("order-123"));
346
assertThat(record.key().getStatus(), equalTo("CREATED"));
347
assertThat(record, hasValue(containsString("successfully")));
348
349
// AssertJ custom conditions
350
assertThat(record)
351
.is(key(orderKey))
352
.satisfies(r -> assertThat(r.value()).contains("successfully"));
353
}
354
```