Spring Kafka Test Support provides an embedded Kafka broker and testing utilities for Spring Kafka applications.
—
Hamcrest matchers and AssertJ conditions for validating Kafka message behavior. Provides fluent assertions for record keys, values, partitions, timestamps, and combined conditions.
Hamcrest-based matchers for validating ConsumerRecord properties in a fluent, readable way.
/**
* Hamcrest Matchers utilities for Kafka records.
* Every method listed here is a static factory returning a {@code Matcher} over a {@code ConsumerRecord}.
*/
public final class KafkaMatchers {
/**
* Matcher for record key
* @param key the expected key
* @param <K> the key type of the consumer record
* @return a Matcher that matches the key in a consumer record
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key);
/**
* Matcher for record value
* @param value the expected value (the value itself, not a nested Matcher)
* @param <V> the value type of the consumer record
* @return a Matcher that matches the value in a consumer record
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value);
/**
* Matcher for record partition
* @param partition the expected partition
* @return a Matcher that matches the partition in a consumer record
*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition);
/**
* Matcher testing the timestamp of a ConsumerRecord assuming the topic has been set with CREATE_TIME
* @param ts expected timestamp of the consumer record
* @return a Matcher that matches the timestamp in a consumer record
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts);
/**
* Matcher testing the timestamp of a ConsumerRecord for an explicitly given timestamp type
* @param type expected timestamp type of the record
* @param ts expected timestamp of the consumer record
* @return a Matcher that matches the timestamp in a consumer record
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts);
}
Internal matcher implementations for detailed record validation.
/**
* Hamcrest matcher for ConsumerRecord key validation.
* NOTE(review): declared {@code public static}, so presumably nested in KafkaMatchers — confirm.
* @param <K> the key type of the consumer record
*/
public static class ConsumerRecordKeyMatcher<K> extends DiagnosingMatcher<ConsumerRecord<K, ?>> {
// key: the key the record is compared against
public ConsumerRecordKeyMatcher(K key);
public void describeTo(Description description);
protected boolean matches(Object item, Description mismatchDescription);
}
/**
* Hamcrest matcher for ConsumerRecord value validation
* @param <V> the value type of the consumer record
*/
public static class ConsumerRecordValueMatcher<V> extends DiagnosingMatcher<ConsumerRecord<?, V>> {
// payload: the value the record is compared against
public ConsumerRecordValueMatcher(V payload);
public void describeTo(Description description);
protected boolean matches(Object item, Description mismatchDescription);
}
/**
* Hamcrest matcher for ConsumerRecord partition validation
*/
public static class ConsumerRecordPartitionMatcher extends DiagnosingMatcher<ConsumerRecord<?, ?>> {
// partition: the partition the record is compared against
public ConsumerRecordPartitionMatcher(int partition);
public void describeTo(Description description);
protected boolean matches(Object item, Description mismatchDescription);
}
/**
* Hamcrest matcher for ConsumerRecord timestamp validation
*/
public static class ConsumerRecordTimestampMatcher extends DiagnosingMatcher<ConsumerRecord<?, ?>> {
// type / ts: the timestamp type and timestamp the record is compared against
public ConsumerRecordTimestampMatcher(TimestampType type, long ts);
public void describeTo(Description description);
protected boolean matches(Object item, Description mismatchDescription);
}
AssertJ-based conditions for modern, fluent assertion style with Kafka records.
/**
* AssertJ custom Conditions for Kafka records.
* Every method listed here is a static factory returning a {@code Condition} over a {@code ConsumerRecord}.
*/
public final class KafkaConditions {
/**
* Condition for record key
* @param key the expected key
* @param <K> the key type of the consumer record
* @return a Condition that matches the key in a consumer record
*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key);
/**
* Condition for record value
* @param value the expected value
* @param <V> the value type of the consumer record
* @return a Condition that matches the value in a consumer record
*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value);
/**
* Condition for key-value pair
* @param key the expected key
* @param value the expected value
* @param <K> the key type
* @param <V> the value type
* @return a Condition that matches the key and value in a consumer record
*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value);
/**
* Condition for record timestamp (CREATE_TIME)
* @param value the expected timestamp
* @return a Condition that matches the timestamp value in a consumer record
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value);
/**
* Condition for record timestamp with an explicitly given timestamp type
* @param type the expected type of timestamp
* @param value the expected timestamp
* @return a Condition that matches the timestamp value in a consumer record
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value);
/**
* Condition for record partition
* @param partition the expected partition
* @return a Condition that matches the partition in a consumer record
*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition);
}
Internal condition implementations for detailed record validation.
/**
* AssertJ condition for ConsumerRecord key validation.
* NOTE(review): declared {@code public static}, so presumably nested in KafkaConditions — confirm.
* @param <K> the key type of the consumer record
*/
public static class ConsumerRecordKeyCondition<K> extends Condition<ConsumerRecord<K, ?>> {
// key: the key the record is compared against
public ConsumerRecordKeyCondition(K key);
public boolean matches(ConsumerRecord<K, ?> value);
}
/**
* AssertJ condition for ConsumerRecord value validation
* @param <V> the value type of the consumer record
*/
public static class ConsumerRecordValueCondition<V> extends Condition<ConsumerRecord<?, V>> {
// payload: the value the record is compared against
public ConsumerRecordValueCondition(V payload);
public boolean matches(ConsumerRecord<?, V> value);
}
/**
* AssertJ condition for ConsumerRecord key-value validation
* @param <K> the key type of the consumer record
* @param <V> the value type of the consumer record
*/
public static class ConsumerRecordKeyValueCondition<K, V> extends Condition<ConsumerRecord<K, V>> {
// key / value: the key and value the record is compared against
public ConsumerRecordKeyValueCondition(K key, V value);
public boolean matches(ConsumerRecord<K, V> value);
}
/**
* AssertJ condition for ConsumerRecord timestamp validation
*/
public static class ConsumerRecordTimestampCondition extends Condition<ConsumerRecord<?, ?>> {
// type / ts: the timestamp type and timestamp the record is compared against
public ConsumerRecordTimestampCondition(TimestampType type, long ts);
public boolean matches(ConsumerRecord<?, ?> value);
}
/**
* AssertJ condition for ConsumerRecord partition validation
*/
public static class ConsumerRecordPartitionCondition extends Condition<ConsumerRecord<?, ?>> {
// partition: the partition the record is compared against
public ConsumerRecordPartitionCondition(int partition);
public boolean matches(ConsumerRecord<?, ?> value);
}
Usage Examples:
// Hamcrest matchers with JUnit
@Test
public void testHamcrestMatchers() throws Exception {
    // Setup consumer and producer
    Consumer<String, String> consumer = createConsumer();
    Producer<String, String> producer = createProducer();
    // Capture the timestamp once: the record must be sent and asserted with the
    // exact same value, and a second currentTimeMillis() call would differ
    long sentAt = System.currentTimeMillis();
    // Send test message
    producer.send(new ProducerRecord<>("test-topic", 0, sentAt, "test-key", "test-value"));
    // Consume and assert using Hamcrest matchers
    ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
    assertThat(record, hasKey("test-key"));
    assertThat(record, hasValue("test-value"));
    assertThat(record, hasPartition(0));
    assertThat(record, hasTimestamp(TimestampType.CREATE_TIME, sentAt));
    // Combined assertions
    assertThat(record, allOf(
        hasKey("test-key"),
        hasValue("test-value"),
        hasPartition(0)
    ));
}
// AssertJ conditions
@Test
public void testAssertJConditions() throws Exception {
    // Setup consumer and producer
    Consumer<Integer, String> consumer = createConsumer();
    Producer<Integer, String> producer = createProducer();
    // Send test message (topic, partition, key, value)
    producer.send(new ProducerRecord<>("test-topic", 1, 42, "hello world"));
    // Consume and assert using AssertJ conditions
    ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
    assertThat(record).is(key(42));
    assertThat(record).is(value("hello world"));
    assertThat(record).is(partition(1));
    assertThat(record).is(keyValue(42, "hello world"));
    // Combined conditions: satisfies(Condition) accepts a single Condition only,
    // so several conditions are combined by chaining is(...)
    assertThat(record)
        .is(keyValue(42, "hello world"))
        .is(partition(1));
}
// Null value handling
@Test
public void testNullValueHandling() throws Exception {
    Consumer<String, String> consumer = createConsumer();
    Producer<String, String> producer = createProducer();
    // Send message with null value (tombstone)
    producer.send(new ProducerRecord<>("test-topic", "delete-key", null));
    ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
    // Hamcrest matcher
    assertThat(record, hasKey("delete-key"));
    // hasValue(V) expects the value itself, not a nested Matcher, so the
    // null check is applied directly to the record's value
    assertThat(record.value(), nullValue());
    // AssertJ condition
    assertThat(record).is(key("delete-key"));
    assertThat(record).is(value(null));
}
// Timestamp validation
@Test
public void testTimestampValidation() throws Exception {
    Consumer<String, String> consumer = createConsumer();
    Producer<String, String> producer = createProducer();
    // One timestamp is used both for sending and for every assertion below
    long sentAt = System.currentTimeMillis();
    ProducerRecord<String, String> outgoing = new ProducerRecord<>("test-topic", 0, sentAt, "key", "value");
    producer.send(outgoing);
    ConsumerRecord<String, String> received = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
    // Hamcrest: the single-argument form assumes CREATE_TIME, the second names the type explicitly
    assertThat(received, hasTimestamp(sentAt));
    assertThat(received, hasTimestamp(TimestampType.CREATE_TIME, sentAt));
    // AssertJ: same two variants expressed as conditions
    assertThat(received).is(timestamp(sentAt));
    assertThat(received).is(timestamp(TimestampType.CREATE_TIME, sentAt));
}
// Multiple records validation
@Test
public void testMultipleRecordsValidation() throws Exception {
    Consumer<String, String> consumer = createConsumer();
    Producer<String, String> producer = createProducer();
    // Send multiple messages, spread over partitions 0 and 1
    producer.send(new ProducerRecord<>("test-topic", 0, "key1", "value1"));
    producer.send(new ProducerRecord<>("test-topic", 1, "key2", "value2"));
    producer.send(new ProducerRecord<>("test-topic", 0, "key3", "value3"));
    // Consume all records; ask for at least 3 so polling continues until every
    // message has arrived (or the timeout expires) instead of returning early
    ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10), 3);
    // Convert to list for easier assertion
    List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
    records.forEach(recordList::add);
    // Hamcrest collection matchers
    assertThat(recordList, hasSize(3));
    assertThat(recordList, hasItem(hasKey("key1")));
    assertThat(recordList, hasItem(hasValue("value2")));
    assertThat(recordList, hasItem(allOf(hasKey("key3"), hasPartition(0))));
    // AssertJ collection assertions. Ordering is only guaranteed within a partition,
    // so each record is asserted by presence rather than by list position.
    assertThat(recordList)
        .hasSize(3)
        .anyMatch(record -> "key1".equals(record.key()))
        .anyMatch(record -> "value2".equals(record.value()))
        .haveExactly(1, keyValue("key1", "value1"))
        .haveExactly(1, keyValue("key2", "value2"))
        .haveExactly(1, keyValue("key3", "value3"));
}
// Custom matcher combinations
@Test
public void testCustomMatcherCombinations() throws Exception {
    Consumer<OrderEvent, String> consumer = createOrderConsumer();
    Producer<OrderEvent, String> producer = createOrderProducer();
    OrderEvent orderKey = new OrderEvent("order-123", "CREATED");
    producer.send(new ProducerRecord<>("orders", orderKey, "Order created successfully"));
    ConsumerRecord<OrderEvent, String> record = KafkaTestUtils.getSingleRecord(consumer, "orders");
    // Custom assertions
    assertThat(record, hasKey(orderKey));
    assertThat(record.key().getOrderId(), equalTo("order-123"));
    assertThat(record.key().getStatus(), equalTo("CREATED"));
    // hasValue(V) expects the value itself, not a nested Matcher, so the
    // substring check is applied directly to the record's value
    assertThat(record.value(), containsString("successfully"));
    // AssertJ custom conditions
    assertThat(record)
        .is(key(orderKey))
        .satisfies(r -> assertThat(r.value()).contains("successfully"));
}
Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-kafka--spring-kafka-test