tessl/maven-org-springframework-kafka--spring-kafka-test

Spring Kafka Test Support provides embedded Kafka broker and testing utilities for Spring Kafka applications

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.springframework.kafka/spring-kafka-test@3.3.x

To install, run

npx @tessl/cli install tessl/maven-org-springframework-kafka--spring-kafka-test@3.3.0

Spring Kafka Test

Spring Kafka Test supports testing Spring Kafka applications with an embedded Kafka broker, test utilities, assertion helpers, and listener container test support. Integration tests can exercise message production, consumption, and processing against a broker that runs inside the test JVM, isolated from external infrastructure.

Package Information

  • Package Name: spring-kafka-test
  • Package Type: maven
  • Language: Java
  • Installation:
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <version>3.3.7</version>
      <scope>test</scope>
    </dependency>

Core Imports

import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

Basic Usage

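import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = { "test-topic" })
public class KafkaIntegrationTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void testKafkaProducerConsumer() throws Exception {
        // The generated test properties use Integer keys and String values
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);

        // try-with-resources closes both clients even if an assertion fails
        try (Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
                Producer<Integer, String> producer = new KafkaProducer<>(producerProps)) {

            // Subscribe to the embedded topic
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");

            // Send the message and block until the broker acknowledges it
            producer.send(new ProducerRecord<>("test-topic", 1, "test-message")).get();

            // Consume and verify
            ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
            assertThat(record.value()).isEqualTo("test-message");
        }
    }
}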

Architecture

Spring Kafka Test is built around several key components:

  • Embedded Brokers: Both ZooKeeper-based (EmbeddedKafkaZKBroker) and KRaft-based (EmbeddedKafkaKraftBroker) implementations for running Kafka in test environments
  • Testing Annotations: @EmbeddedKafka annotation for declarative test setup with Spring TestContext
  • Utility Classes: Helper methods for creating test configurations, polling records, and common testing operations
  • Assertion Support: Both Hamcrest matchers and AssertJ conditions for validating Kafka message behavior
  • JUnit Integration: Rules for JUnit 4 and extensions for JUnit 5 to manage broker lifecycle
  • Container Testing: Utilities for testing Spring Kafka listener containers

Capabilities

Embedded Kafka Brokers

Embedded broker support comes in a ZooKeeper-based and a KRaft-based implementation, both exposed through the EmbeddedKafkaBroker interface. Integration tests run against a real Kafka broker started inside the test JVM, so no external Kafka installation is required.

public interface EmbeddedKafkaBroker extends InitializingBean, DisposableBean {
    int DEFAULT_ADMIN_TIMEOUT = 10;
    String BEAN_NAME = "embeddedKafka";
    String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";
    String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";
    
    EmbeddedKafkaBroker kafkaPorts(int... ports);
    EmbeddedKafkaBroker brokerProperties(Map<String, String> properties);
    EmbeddedKafkaBroker brokerListProperty(String brokerListProperty);
    EmbeddedKafkaBroker adminTimeout(int adminTimeout);
    String getBrokersAsString();
    void addTopics(String... topicsToAdd);
    void addTopics(NewTopic... topicsToAdd);
    Map<String, Exception> addTopicsWithResults(String... topicsToAdd);
    Map<String, Exception> addTopicsWithResults(NewTopic... topicsToAdd);
    void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume);
    void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume);
    void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic);
    void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic);
    void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd);
    void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer);
    Set<String> getTopics();
    int getPartitionsPerTopic();
}

Testing Annotations

Annotation-based configuration for embedded Kafka in Spring tests. Provides declarative setup with automatic broker lifecycle management.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@ExtendWith(EmbeddedKafkaCondition.class)
public @interface EmbeddedKafka {
    int value() default 1;
    int count() default 1;
    boolean controlledShutdown() default false;
    int[] ports() default { 0 };
    int zookeeperPort() default 0;
    int partitions() default 2;
    String[] topics() default {};
    String[] brokerProperties() default {};
    String brokerPropertiesLocation() default "";
    String bootstrapServersProperty() default "spring.kafka.bootstrap-servers";
    int zkConnectionTimeout() default EmbeddedKafkaZKBroker.DEFAULT_ZK_CONNECTION_TIMEOUT;
    int zkSessionTimeout() default EmbeddedKafkaZKBroker.DEFAULT_ZK_SESSION_TIMEOUT;
    int adminTimeout() default EmbeddedKafkaBroker.DEFAULT_ADMIN_TIMEOUT;
    boolean kraft() default false;
}
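
The brokerProperties attribute is a String[], while the programmatic EmbeddedKafkaBroker.brokerProperties(...) method shown earlier takes a Map<String, String>. The sketch below shows one way to turn entries of the first form into the second. It assumes each entry is a single key=value pair in Java properties syntax; the class name BrokerPropertiesSketch and the sample property values are hypothetical, and only the JDK is used.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of spring-kafka-test.
public class BrokerPropertiesSketch {

    /** Parses "key=value" entries (assumed Java properties syntax) into an ordered map. */
    public static Map<String, String> parse(String... entries) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : entries) {
            if (entry == null || entry.isBlank()) {
                continue; // ignore empty entries
            }
            Properties props = new Properties();
            try {
                props.load(new StringReader(entry));
            }
            catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
            for (String name : props.stringPropertyNames()) {
                result.put(name, props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> parsed = parse(
                "auto.create.topics.enable=false",
                "log.dir=/tmp/kafka-test-logs");
        System.out.println(parsed);
        // prints {auto.create.topics.enable=false, log.dir=/tmp/kafka-test-logs}
    }
}
```

A map built this way has the shape expected by EmbeddedKafkaBroker.brokerProperties(Map<String, String>) when a broker is configured in code rather than through the annotation.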

Test Utilities

Comprehensive testing utilities for creating configurations, polling messages, and common test operations. Simplifies Kafka test setup and message verification.

public final class KafkaTestUtils {
    public static Map<String, Object> consumerProps(String group, String autoCommit, EmbeddedKafkaBroker embeddedKafka);
    public static Map<String, Object> consumerProps(String brokers, String group);
    public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit);
    public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka);
    public static Map<String, Object> producerProps(String brokers);
    public static Map<String, Object> streamsProps(String applicationId, String brokers);
    public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic);
    public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, Duration timeout);
    @Nullable
    public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition, boolean seekToLast, boolean commit, Duration timeout);
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer);
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout);
    public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout, int minRecords);
    public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition) throws Exception;
    public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition) throws Exception;
    public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, String topic, Integer... partitions);
    public static Object getPropertyValue(Object root, String propertyPath);
    public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type);
    public static Properties defaultPropertyOverrides();
}
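
The property factories return plain maps, so a test can adjust them before creating a client. The Basic Usage example types its clients as <Integer, String>, which matches the serializers the generated properties appear to configure by default; a test with String keys would need to override them. The following is a minimal sketch of that override using the String-based overloads listed above. The class name TestPropsExample, the group id, and the broker address are hypothetical, and no broker is contacted because only the maps are built.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.test.utils.KafkaTestUtils;

// Hypothetical helper class for illustration.
public class TestPropsExample {

    /** Consumer properties for String keys, with auto-commit disabled. */
    public static Map<String, Object> stringKeyedConsumerProps(String brokers) {
        Map<String, Object> props = KafkaTestUtils.consumerProps(brokers, "test-group", "false");
        // Replace the key deserializer configured by the utility
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /** Producer properties for String keys. */
    public static Map<String, Object> stringKeyedProducerProps(String brokers) {
        Map<String, Object> props = KafkaTestUtils.producerProps(brokers);
        // Replace the key serializer configured by the utility
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public static void main(String[] args) {
        // In a real test the address would come from EmbeddedKafkaBroker.getBrokersAsString()
        System.out.println(stringKeyedConsumerProps("localhost:9092"));
        System.out.println(stringKeyedProducerProps("localhost:9092"));
    }
}
```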

Assertion and Matching

Hamcrest matchers and AssertJ conditions for validating Kafka message behavior. Provides fluent assertions for record keys, values, partitions, and timestamps.

public final class KafkaMatchers {
    public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key);
    public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value);
    public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition);
    public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts);
}

public final class KafkaConditions {
    public static <K> Condition<ConsumerRecord<K, ?>> key(K key);
    public static <V> Condition<ConsumerRecord<?, V>> value(V value);
    public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value);
    public static Condition<ConsumerRecord<?, ?>> partition(int partition);
}
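
To illustrate the two assertion styles side by side, the sketch below applies the matchers and conditions listed above to a hand-built ConsumerRecord, so it needs no running broker. The class name RecordAssertionsExample and the record contents are hypothetical; the assertThat entry points are called through their owning classes to avoid a clash between the Hamcrest and AssertJ static imports.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.assertj.core.api.Assertions;
import org.hamcrest.MatcherAssert;
import org.springframework.kafka.test.assertj.KafkaConditions;
import org.springframework.kafka.test.hamcrest.KafkaMatchers;

// Hypothetical example class for illustration.
public class RecordAssertionsExample {

    /** Builds a record by hand: topic "test-topic", partition 0, offset 0, key 1. */
    public static ConsumerRecord<Integer, String> sampleRecord() {
        return new ConsumerRecord<>("test-topic", 0, 0L, 1, "test-message");
    }

    public static void main(String[] args) {
        ConsumerRecord<Integer, String> record = sampleRecord();

        // Hamcrest style
        MatcherAssert.assertThat(record, KafkaMatchers.hasKey(1));
        MatcherAssert.assertThat(record, KafkaMatchers.hasValue("test-message"));
        MatcherAssert.assertThat(record, KafkaMatchers.hasPartition(0));

        // AssertJ style
        Assertions.assertThat(record).has(KafkaConditions.key(1));
        Assertions.assertThat(record).has(KafkaConditions.keyValue(1, "test-message"));
        Assertions.assertThat(record).has(KafkaConditions.partition(0));
    }
}
```

In an integration test the record would typically come from KafkaTestUtils.getSingleRecord(...) instead of being constructed directly.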

JUnit Integration

JUnit rules and extensions for managing embedded Kafka broker lifecycle. Supports both JUnit 4 rules and JUnit 5 extensions.

public class EmbeddedKafkaRule extends ExternalResource {
    public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics);
    public EmbeddedKafkaRule brokerProperties(Map<String, String> brokerProperties);
    public EmbeddedKafkaRule kafkaPorts(int... kafkaPorts);
    public EmbeddedKafkaBroker getEmbeddedKafka();
}

public class EmbeddedKafkaCondition implements ExecutionCondition, AfterAllCallback, ParameterResolver {
    public static EmbeddedKafkaBroker getBroker();
}
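
Because EmbeddedKafkaCondition implements ParameterResolver and @EmbeddedKafka is meta-annotated with @ExtendWith(EmbeddedKafkaCondition.class), a plain JUnit 5 test should be able to receive the broker as a method parameter without a Spring application context. The following is a minimal sketch of that usage; the class name PlainJUnit5KafkaTest and the topic name demo-topic are hypothetical, and running it starts an embedded broker.

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

// No Spring context is configured here; the JUnit 5 extension registered
// through @EmbeddedKafka is expected to manage the broker lifecycle.
@EmbeddedKafka(partitions = 1, topics = "demo-topic")
class PlainJUnit5KafkaTest {

    @Test
    void brokerIsResolvedAsParameter(EmbeddedKafkaBroker broker) {
        // The broker list is what Kafka clients use as bootstrap.servers
        assertThat(broker.getBrokersAsString()).isNotBlank();
        assertThat(broker.getTopics()).contains("demo-topic");
        assertThat(broker.getPartitionsPerTopic()).isEqualTo(1);
    }
}
```

For JUnit 4, EmbeddedKafkaRule plays the equivalent role and exposes the broker through getEmbeddedKafka().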

Container Testing

Utilities for testing Spring Kafka listener containers. The method shown here blocks until a container has been assigned the expected number of partitions, so a test does not send records before the listener is ready.

public final class ContainerTestUtils {
    public static void waitForAssignment(Object container, int partitions);
}
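
A typical use is to start a listener container and wait for its partitions before producing any test records. The helper below is a sketch under stated assumptions: the class ContainerAssignmentExample and its method are hypothetical, the container is assumed to listen on topicCount topics, and each topic is assumed to have the broker's default number of partitions as reported by getPartitionsPerTopic().

```java
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.ContainerTestUtils;

// Hypothetical helper class for illustration.
public final class ContainerAssignmentExample {

    private ContainerAssignmentExample() {
    }

    /**
     * Starts the container and blocks until it owns all expected partitions.
     * Assumes every topic has broker.getPartitionsPerTopic() partitions.
     */
    public static void startAndAwaitAssignment(MessageListenerContainer container,
            EmbeddedKafkaBroker broker, int topicCount) {

        container.start();
        int expectedPartitions = topicCount * broker.getPartitionsPerTopic();
        ContainerTestUtils.waitForAssignment(container, expectedPartitions);
    }
}
```

Calling this before the first producer.send(...) guards against records being published while the consumer group is still rebalancing.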

Types

public class BrokerAddress {
    public static final int DEFAULT_PORT = 9092;
    
    public BrokerAddress(String host, int port);
    public BrokerAddress(String host);
    public static BrokerAddress fromAddress(String address);
    public String getHost();
    public int getPort();
}

public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {
    public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;
    public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = DEFAULT_ZK_SESSION_TIMEOUT;
}
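
As a small illustration of BrokerAddress, the sketch below parses one address with an explicit port and one without. It assumes the class lives in the org.springframework.kafka.test.core package and that fromAddress accepts the host[:port] form, falling back to DEFAULT_PORT when the port is omitted; the class name BrokerAddressExample and the host names are hypothetical.

```java
import org.springframework.kafka.test.core.BrokerAddress;

// Hypothetical example class for illustration.
public class BrokerAddressExample {

    public static void main(String[] args) {
        // Explicit port
        BrokerAddress explicit = BrokerAddress.fromAddress("localhost:9093");
        System.out.println(explicit.getHost() + " -> " + explicit.getPort());

        // No port given: expected to fall back to BrokerAddress.DEFAULT_PORT
        BrokerAddress defaulted = BrokerAddress.fromAddress("localhost");
        System.out.println(defaulted.getHost() + " -> " + defaulted.getPort());
    }
}
```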