CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-springframework-kafka--spring-kafka-test

Spring Kafka Test Support provides embedded Kafka broker and testing utilities for Spring Kafka applications

Pending
Overview
Eval results
Files

embedded-brokers.mddocs/

Embedded Kafka Brokers

Core embedded Kafka broker functionality providing both ZooKeeper-based and KRaft-based implementations for running Kafka instances in test environments without external dependencies.

Capabilities

EmbeddedKafkaBroker Interface

Core interface defining embedded Kafka broker functionality with lifecycle management and topic operations.

/**
 * Core interface for embedded Kafka broker functionality
 */
public interface EmbeddedKafkaBroker extends InitializingBean, DisposableBean {
    int DEFAULT_ADMIN_TIMEOUT = 10;
    String BEAN_NAME = "embeddedKafka";
    String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";
    String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";
    String LOOPBACK = "127.0.0.1";

    /**
     * Set explicit ports on which the kafka brokers will listen
     * @param ports the ports
     * @return the EmbeddedKafkaBroker
     */
    EmbeddedKafkaBroker kafkaPorts(int... ports);

    /**
     * Specify properties to configure Kafka Broker before start
     * @param properties the properties to use for configuring Kafka Broker(s)
     * @return this for chaining configuration
     */
    EmbeddedKafkaBroker brokerProperties(Map<String, String> properties);

    /**
     * Set the system property with this name to the list of broker addresses
     * @param brokerListProperty the brokerListProperty to set
     * @return this broker
     */
    EmbeddedKafkaBroker brokerListProperty(String brokerListProperty);

    /**
     * Set the timeout in seconds for admin operations
     * @param adminTimeout the timeout
     * @return the EmbeddedKafkaBroker
     */
    EmbeddedKafkaBroker adminTimeout(int adminTimeout);

    /**
     * Get the bootstrap server addresses as a String
     * @return the bootstrap servers
     */
    String getBrokersAsString();

    /**
     * Add topics to the existing broker(s) using the configured number of partitions
     * @param topicsToAdd the topics
     */
    void addTopics(String... topicsToAdd);

    /**
     * Add topics to the existing broker(s)
     * @param topicsToAdd the topics
     */
    void addTopics(NewTopic... topicsToAdd);

    /**
     * Add topics to the existing broker(s) and return results
     * @param topicsToAdd the topics
     * @return the results; null values indicate success
     */
    Map<String, Exception> addTopicsWithResults(String... topicsToAdd);

    /**
     * Add topics to the existing broker(s) and return results
     * @param topicsToAdd the topics
     * @return the results; null values indicate success
     */
    Map<String, Exception> addTopicsWithResults(NewTopic... topicsToAdd);

    /**
     * Subscribe a consumer to one or more of the embedded topics
     * @param consumer the consumer
     * @param seekToEnd true to seek to the end instead of the beginning
     * @param topicsToConsume the topics
     */
    void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume);

    /**
     * Subscribe a consumer to one or more of the embedded topics
     * @param consumer the consumer
     * @param topicsToConsume the topics
     */
    void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume);

    /**
     * Subscribe a consumer to one of the embedded topics
     * @param consumer the consumer
     * @param seekToEnd true to seek to the end instead of the beginning
     * @param topic the topic
     */
    void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic);

    /**
     * Subscribe a consumer to one of the embedded topics
     * @param consumer the consumer
     * @param topic the topic
     */
    void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic);

    /**
     * Subscribe a consumer to all the embedded topics
     * @param consumer the consumer
     * @param seekToEnd true to seek to the end instead of the beginning
     */
    void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd);

    /**
     * Subscribe a consumer to all the embedded topics
     * @param consumer the consumer
     */
    void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer);

    /**
     * Get the topics
     * @return the topics
     */
    Set<String> getTopics();

    /**
     * Get the configured number of partitions per topic
     * @return the partition count
     */
    int getPartitionsPerTopic();
}

ZooKeeper-based Embedded Broker

Implementation using ZooKeeper for coordination, providing full Kafka functionality with traditional ZooKeeper-based coordination.

/**
 * ZooKeeper-based embedded Kafka broker implementation
 */
public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {
    public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect";
    public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;
    public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = DEFAULT_ZK_SESSION_TIMEOUT;

    /**
     * Create embedded Kafka brokers
     * @param count the number of brokers
     */
    public EmbeddedKafkaZKBroker(int count);

    /**
     * Create embedded Kafka brokers
     * @param count the number of brokers
     * @param controlledShutdown passed into TestUtils.createBrokerConfig
     * @param topics the topics to create (2 partitions per)
     */
    public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... topics);

    /**
     * Create embedded Kafka brokers listening on random ports
     * @param count the number of brokers
     * @param controlledShutdown passed into TestUtils.createBrokerConfig
     * @param partitions partitions per topic
     * @param topics the topics to create
     */
    public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String... topics);

    /**
     * Specify a broker property
     * @param property the property name
     * @param value the value
     * @return the EmbeddedKafkaBroker
     */
    public EmbeddedKafkaBroker brokerProperty(String property, Object value);

    /**
     * Set an explicit port for the embedded Zookeeper
     * @param port the port
     * @return the EmbeddedKafkaBroker
     */
    public EmbeddedKafkaZKBroker zkPort(int port);

    /**
     * Get the port that the embedded Zookeeper is running on or will run on
     * @return the port
     */
    public int getZkPort();

    /**
     * Set connection timeout for the client to the embedded Zookeeper
     * @param zkConnectionTimeout the connection timeout
     * @return the EmbeddedKafkaBroker
     */
    public synchronized EmbeddedKafkaZKBroker zkConnectionTimeout(int zkConnectionTimeout);

    /**
     * Set session timeout for the client to the embedded Zookeeper
     * @param zkSessionTimeout the session timeout
     * @return the EmbeddedKafkaBroker
     */
    public synchronized EmbeddedKafkaZKBroker zkSessionTimeout(int zkSessionTimeout);

    /**
     * Create an AdminClient; invoke the callback and reliably close the admin
     * @param callback the callback
     */
    public void doWithAdmin(Consumer<AdminClient> callback);

    /**
     * Create an AdminClient; invoke the callback and reliably close the admin
     * @param callback the callback
     * @param <T> the function return type
     * @return a map of results
     */
    public <T> T doWithAdminFunction(Function<AdminClient, T> callback);

    /**
     * Get the underlying Kafka servers
     * @return the Kafka servers
     */
    public List<KafkaServer> getKafkaServers();

    /**
     * Get specific Kafka server
     * @param id the server id
     * @return the Kafka server
     */
    public KafkaServer getKafkaServer(int id);

    /**
     * Get embedded ZooKeeper
     * @return the ZooKeeper instance
     */
    public EmbeddedZookeeper getZookeeper();

    /**
     * Return the ZooKeeperClient
     * @return the client
     */
    public synchronized ZooKeeperClient getZooKeeperClient();

    /**
     * Get ZooKeeper connection string
     * @return the connection string
     */
    public String getZookeeperConnectionString();

    /**
     * Get broker address by index
     * @param i the index
     * @return the broker address
     */
    public BrokerAddress getBrokerAddress(int i);

    /**
     * Get all broker addresses
     * @return array of broker addresses
     */
    public BrokerAddress[] getBrokerAddresses();

    /**
     * Restart a broker
     * @param brokerAddress the broker to restart
     */
    public void bounce(BrokerAddress brokerAddress);

    /**
     * Restart broker by index
     * @param index the broker index
     * @throws Exception if restart fails
     */
    public void restart(int index) throws Exception;
}

KRaft-based Embedded Broker

Implementation using KRaft (Kafka Raft) for coordination, providing ZooKeeper-free Kafka functionality.

/**
 * KRaft-based embedded Kafka broker implementation (ZooKeeper-free)
 */
public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {
    public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";
    public static final int DEFAULT_ADMIN_TIMEOUT = 10;

    /**
     * Create embedded Kafka brokers listening on random ports
     * @param count the number of brokers
     * @param partitions partitions per topic
     * @param topics the topics to create
     */
    public EmbeddedKafkaKraftBroker(int count, int partitions, String... topics);

    /**
     * Specify a broker property
     * @param property the property name
     * @param value the value
     * @return the EmbeddedKafkaBroker
     */
    public EmbeddedKafkaBroker brokerProperty(String property, Object value);

    /**
     * Set the timeout in seconds for admin operations
     * @param adminTimeout the timeout
     */
    public void setAdminTimeout(int adminTimeout);

    /**
     * Create an AdminClient; invoke the callback and reliably close the admin
     * @param callback the callback
     */
    public void doWithAdmin(Consumer<AdminClient> callback);

    /**
     * Create an AdminClient; invoke the callback and reliably close the admin
     * @param callback the callback
     * @param <T> the function return type
     * @return a map of results
     */
    public <T> T doWithAdminFunction(Function<AdminClient, T> callback);

    /**
     * Get underlying test cluster
     * @return the cluster
     */
    public KafkaClusterTestKit getCluster();
}

Broker Factory

Factory for creating embedded Kafka brokers from annotation configuration.

/**
 * Factory to encapsulate EmbeddedKafkaBroker creation logic
 */
public final class EmbeddedKafkaBrokerFactory {
    /**
     * Create an EmbeddedKafkaBroker based on the EmbeddedKafka annotation
     * @param embeddedKafka the EmbeddedKafka annotation
     * @return a new EmbeddedKafkaBroker instance
     */
    public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka);

    /**
     * Create an EmbeddedKafkaBroker based on the EmbeddedKafka annotation
     * @param embeddedKafka the EmbeddedKafka annotation
     * @param propertyResolver the Function for placeholders in the annotation attributes
     * @return a new EmbeddedKafkaBroker instance
     */
    public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka, Function<String, String> propertyResolver);
}

Usage Examples:

// ZooKeeper-based broker
EmbeddedKafkaBroker zkBroker = new EmbeddedKafkaZKBroker(1, false, 2, "test-topic")
    .brokerProperty("auto.create.topics.enable", "true")
    .kafkaPorts(9092)
    .zkPort(2181);
zkBroker.afterPropertiesSet();

// KRaft-based broker
EmbeddedKafkaBroker kraftBroker = new EmbeddedKafkaKraftBroker(1, 2, "test-topic")
    .brokerProperty("auto.create.topics.enable", "true")
    .adminTimeout(30);
kraftBroker.afterPropertiesSet();

// Using factory
@EmbeddedKafka(kraft = true, topics = "test-topic")
EmbeddedKafkaBroker broker = EmbeddedKafkaBrokerFactory.create(embeddedKafkaAnnotation);

// Topic management
broker.addTopics("new-topic");
broker.addTopics(new NewTopic("configured-topic", 3, (short) 1));

// Consumer subscription
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
broker.consumeFromAnEmbeddedTopic(consumer, "test-topic");

Types

public class BrokerAddress {
    public static final int DEFAULT_PORT = 9092;
    
    public BrokerAddress(String host, int port);
    public BrokerAddress(String host);
    public BrokerAddress(BrokerEndPoint broker);
    public static BrokerAddress fromAddress(String address);
    public String getHost();
    public int getPort();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-kafka--spring-kafka-test

docs

assertion-matching.md

container-testing.md

embedded-brokers.md

index.md

junit-integration.md

test-utilities.md

testing-annotations.md

tile.json