CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-springframework-kafka--spring-kafka-test

Spring Kafka Test Support provides embedded Kafka broker and testing utilities for Spring Kafka applications

Pending
Overview
Eval results
Files

test-utilities.mddocs/

Test Utilities

Comprehensive testing utilities for creating configurations, polling messages, and common test operations. Simplifies Kafka test setup and message verification patterns.

Capabilities

Configuration Utilities

Helper methods for creating standard test configurations for consumers, producers, and Kafka Streams.

/**
 * Kafka testing utilities
 */
public final class KafkaTestUtils {
    /**
     * Set up test properties for an <Integer, String> consumer
     * @param group the group id
     * @param autoCommit the auto commit
     * @param embeddedKafka a EmbeddedKafkaBroker instance
     * @return the properties
     */
    public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka);

    /**
     * Set up test properties for an <Integer, String> consumer
     * @param brokers the bootstrapServers property
     * @param group the group id
     * @return the properties
     */
    public static Map<String, Object> consumerProps(String brokers, String group);

    /**
     * Set up test properties for an <Integer, String> consumer
     * @param brokers the bootstrapServers property
     * @param group the group id
     * @param autoCommit the auto commit
     * @return the properties
     */
    public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit);

    /**
     * Set up test properties for an <Integer, String> producer
     * @param embeddedKafka a EmbeddedKafkaBroker instance
     * @return the properties
     */
    public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka);

    /**
     * Set up test properties for an <Integer, String> producer
     * @param brokers the bootstrapServers property
     * @return the properties
     */
    public static Map<String, Object> producerProps(String brokers);

    /**
     * Set up test properties for the Kafka Streams
     * @param applicationId the applicationId for the Kafka Streams
     * @param brokers the bootstrapServers property
     * @return the properties
     */
    public static Map<String, Object> streamsProps(String applicationId, String brokers);
}

Message Polling Utilities

Methods for polling and retrieving messages from Kafka topics with various timeout and filtering options.

public final class KafkaTestUtils {
    /**
     * Poll the consumer, expecting a single record for the specified topic
     * @param consumer the consumer
     * @param topic the topic
     * @param <K> the key type
     * @param <V> the value type
     * @return the record
     * @throws IllegalStateException if exactly one record is not received
     */
    public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic);

    /**
     * Poll the consumer, expecting a single record for the specified topic
     * @param consumer the consumer
     * @param topic the topic
     * @param timeout max duration to wait for records
     * @param <K> the key type
     * @param <V> the value type
     * @return the record
     * @throws IllegalStateException if exactly one record is not received
     */
    public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, Duration timeout);

    /**
     * Get a single record for the group from the topic/partition
     * @param brokerAddresses the broker address(es)
     * @param group the group
     * @param topic the topic
     * @param partition the partition
     * @param seekToLast true to fetch an existing last record, if present
     * @param commit commit offset after polling or not
     * @param timeout the timeout
     * @return the record or null if no record received
     */
    @Nullable
    public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
            boolean seekToLast, boolean commit, Duration timeout);

    /**
     * Poll the consumer for records
     * @param consumer the consumer
     * @param <K> the key type
     * @param <V> the value type
     * @return the records
     */
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer);

    /**
     * Poll the consumer for records
     * @param consumer the consumer
     * @param timeout max time to wait for records
     * @param <K> the key type
     * @param <V> the value type
     * @return the records
     * @throws IllegalStateException if the poll returns null
     */
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout);

    /**
     * Poll the consumer for records
     * @param consumer the consumer
     * @param timeout max time to wait for records
     * @param minRecords wait until the timeout or at least this number of records are received
     * @param <K> the key type
     * @param <V> the value type
     * @return the records
     * @throws IllegalStateException if the poll returns null
     */
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout, int minRecords);
}

Offset and Metadata Utilities

Methods for retrieving consumer group offsets and topic partition metadata.

public final class KafkaTestUtils {
    /**
     * Get the current offset and metadata for the provided group/topic/partition
     * @param brokerAddresses the broker address(es)
     * @param group the group
     * @param topic the topic
     * @param partition the partition
     * @return the offset and metadata
     * @throws Exception if an exception occurs
     */
    public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition)
            throws Exception;

    /**
     * Get the current offset and metadata for the provided group/topic/partition
     * @param adminClient the AdminClient instance
     * @param group the group
     * @param topic the topic
     * @param partition the partition
     * @return the offset and metadata
     * @throws Exception if an exception occurs
     */
    public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition)
            throws Exception;

    /**
     * Return the end offsets of the requested topic/partitions
     * @param consumer the consumer
     * @param topic the topic
     * @param partitions the partitions, or null for all partitions
     * @return the map of end offsets
     */
    public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, String topic, Integer... partitions);
}

Property Access Utilities

Utilities for accessing nested properties in objects using dotted notation.

public final class KafkaTestUtils {
    /**
     * Uses nested DirectFieldAccessors to obtain a property using dotted notation to traverse fields
     * @param root The object
     * @param propertyPath The path
     * @return The field
     */
    public static Object getPropertyValue(Object root, String propertyPath);

    /**
     * A typed version of getPropertyValue(Object, String)
     * @param root the object
     * @param propertyPath the path
     * @param type the type to cast the object to
     * @param <T> the type
     * @return the field value
     */
    public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type);

    /**
     * Return a Properties object equal to the default consumer property overrides
     * Useful when matching arguments in Mockito tests
     * @return the default properties
     */
    public static Properties defaultPropertyOverrides();
}

JUnit Utilities

Additional utilities for JUnit testing scenarios.

/**
 * JUnit testing utilities
 */
public final class JUnitUtils {
    // Utility methods for JUnit integration
}

Usage Examples:

// Basic consumer and producer setup
@Test
public void testBasicProducerConsumer() throws Exception {
    // Create consumer
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
    Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");

    // Create producer
    Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
    Producer<Integer, String> producer = new KafkaProducer<>(producerProps);

    // Send message
    producer.send(new ProducerRecord<>("test-topic", 1, "test-message"));

    // Consume and verify
    ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
    assertEquals("test-message", record.value());
    assertEquals(Integer.valueOf(1), record.key());
}

// Multiple records polling
@Test
public void testMultipleRecords() throws Exception {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
    Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");

    Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
    Producer<String, String> producer = new KafkaProducer<>(producerProps);

    // Send multiple messages
    for (int i = 0; i < 5; i++) {
        producer.send(new ProducerRecord<>("test-topic", "key-" + i, "message-" + i));
    }

    // Poll for multiple records with timeout
    ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10), 5);
    assertEquals(5, records.count());

    // Verify each record
    for (ConsumerRecord<String, String> record : records) {
        assertTrue(record.key().startsWith("key-"));
        assertTrue(record.value().startsWith("message-"));
    }
}

// Offset management
@Test
public void testOffsetManagement() throws Exception {
    String brokerAddresses = embeddedKafka.getBrokersAsString();
    String group = "offset-test-group";
    String topic = "offset-topic";
    int partition = 0;

    // Send a message
    Map<String, Object> producerProps = KafkaTestUtils.producerProps(brokerAddresses);
    Producer<String, String> producer = new KafkaProducer<>(producerProps);
    producer.send(new ProducerRecord<>(topic, partition, "key", "value"));

    // Consume the message
    ConsumerRecord<?, ?> record = KafkaTestUtils.getOneRecord(
        brokerAddresses, group, topic, partition, false, true, Duration.ofSeconds(10)
    );
    assertNotNull(record);

    // Check committed offset
    OffsetAndMetadata offset = KafkaTestUtils.getCurrentOffset(brokerAddresses, group, topic, partition);
    assertNotNull(offset);
    assertEquals(1L, offset.offset());
}

// End offsets
@Test
public void testEndOffsets() throws Exception {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("end-offset-group", "false", embeddedKafka);
    Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

    // Get end offsets for all partitions
    Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getEndOffsets(consumer, "test-topic");
    assertFalse(endOffsets.isEmpty());

    // Get end offsets for specific partitions
    Map<TopicPartition, Long> specificOffsets = KafkaTestUtils.getEndOffsets(consumer, "test-topic", 0, 1);
    assertEquals(2, specificOffsets.size());
}

// Kafka Streams configuration
@Test
public void testStreamsConfiguration() {
    String applicationId = "test-streams-app";
    String brokers = embeddedKafka.getBrokersAsString();
    
    Map<String, Object> streamsProps = KafkaTestUtils.streamsProps(applicationId, brokers);
    
    assertEquals(applicationId, streamsProps.get(StreamsConfig.APPLICATION_ID_CONFIG));
    assertEquals(brokers, streamsProps.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
}

// Property access utilities
@Test
public void testPropertyAccess() {
    SomeComplexObject obj = new SomeComplexObject();
    
    // Access nested properties
    String nestedValue = KafkaTestUtils.getPropertyValue(obj, "nested.property.value", String.class);
    Object rawValue = KafkaTestUtils.getPropertyValue(obj, "nested.property.value");
    
    // Default properties
    Properties defaults = KafkaTestUtils.defaultPropertyOverrides();
    assertEquals("false", defaults.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-kafka--spring-kafka-test

docs

assertion-matching.md

container-testing.md

embedded-brokers.md

index.md

junit-integration.md

test-utilities.md

testing-annotations.md

tile.json