Spring Kafka Test Support provides an embedded Kafka broker and testing utilities for Spring Kafka applications.
—
Core embedded Kafka broker functionality providing both ZooKeeper-based and KRaft-based implementations for running Kafka instances in test environments without external dependencies.
Core interface defining embedded Kafka broker functionality with lifecycle management and topic operations.
/**
 * Contract for an embedded Kafka broker: fluent configuration, topic management
 * and consumer subscription helpers. Lifecycle callbacks come from
 * {@code InitializingBean} and {@code DisposableBean}.
 */
public interface EmbeddedKafkaBroker extends InitializingBean, DisposableBean {

    int DEFAULT_ADMIN_TIMEOUT = 10;

    String BEAN_NAME = "embeddedKafka";

    String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";

    String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";

    String LOOPBACK = "127.0.0.1";

    /**
     * Configure the explicit ports the Kafka brokers will listen on.
     *
     * @param ports the ports
     * @return the EmbeddedKafkaBroker
     */
    EmbeddedKafkaBroker kafkaPorts(int... ports);

    /**
     * Supply properties used to configure the Kafka broker(s) before they start.
     *
     * @param properties the broker configuration properties
     * @return this, so configuration calls can be chained
     */
    EmbeddedKafkaBroker brokerProperties(Map<String, String> properties);

    /**
     * Name the system property that will be set to the list of broker addresses.
     *
     * @param brokerListProperty the system property name
     * @return this broker
     */
    EmbeddedKafkaBroker brokerListProperty(String brokerListProperty);

    /**
     * Configure the timeout, in seconds, applied to admin operations.
     *
     * @param adminTimeout the timeout in seconds
     * @return the EmbeddedKafkaBroker
     */
    EmbeddedKafkaBroker adminTimeout(int adminTimeout);

    /**
     * Return the bootstrap server addresses as a single String.
     *
     * @return the bootstrap servers
     */
    String getBrokersAsString();

    /**
     * Add topics, by name, to the existing broker(s); each topic uses the
     * configured number of partitions.
     *
     * @param topicsToAdd the topic names
     */
    void addTopics(String... topicsToAdd);

    /**
     * Add fully specified topics to the existing broker(s).
     *
     * @param topicsToAdd the topics
     */
    void addTopics(NewTopic... topicsToAdd);

    /**
     * Add topics, by name, to the existing broker(s) and report the outcome.
     *
     * @param topicsToAdd the topic names
     * @return the results; a {@code null} value indicates success
     */
    Map<String, Exception> addTopicsWithResults(String... topicsToAdd);

    /**
     * Add fully specified topics to the existing broker(s) and report the outcome.
     *
     * @param topicsToAdd the topics
     * @return the results; a {@code null} value indicates success
     */
    Map<String, Exception> addTopicsWithResults(NewTopic... topicsToAdd);

    /**
     * Subscribe a consumer to one or more of the embedded topics.
     *
     * @param consumer the consumer
     * @param seekToEnd {@code true} to seek to the end instead of the beginning
     * @param topicsToConsume the topics
     */
    void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume);

    /**
     * Subscribe a consumer to one or more of the embedded topics.
     *
     * @param consumer the consumer
     * @param topicsToConsume the topics
     */
    void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume);

    /**
     * Subscribe a consumer to a single embedded topic.
     *
     * @param consumer the consumer
     * @param seekToEnd {@code true} to seek to the end instead of the beginning
     * @param topic the topic
     */
    void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic);

    /**
     * Subscribe a consumer to a single embedded topic.
     *
     * @param consumer the consumer
     * @param topic the topic
     */
    void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic);

    /**
     * Subscribe a consumer to every embedded topic.
     *
     * @param consumer the consumer
     * @param seekToEnd {@code true} to seek to the end instead of the beginning
     */
    void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd);

    /**
     * Subscribe a consumer to every embedded topic.
     *
     * @param consumer the consumer
     */
    void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer);

    /**
     * Return the topics.
     *
     * @return the topics
     */
    Set<String> getTopics();

    /**
     * Return the configured number of partitions per topic.
     *
     * @return the partition count
     */
    int getPartitionsPerTopic();
}
Implementation using ZooKeeper for coordination, providing full Kafka functionality with traditional ZooKeeper-based coordination.
/**
 * Embedded Kafka broker implementation that uses ZooKeeper for coordination.
 */
public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {

    public static final String SPRING_EMBEDDED_ZOOKEEPER_CONNECT = "spring.embedded.zookeeper.connect";

    public static final int DEFAULT_ZK_SESSION_TIMEOUT = 18000;

    /** Same value as {@link #DEFAULT_ZK_SESSION_TIMEOUT}. */
    public static final int DEFAULT_ZK_CONNECTION_TIMEOUT = DEFAULT_ZK_SESSION_TIMEOUT;

    /**
     * Create embedded Kafka brokers.
     *
     * @param count the number of brokers
     */
    public EmbeddedKafkaZKBroker(int count);

    /**
     * Create embedded Kafka brokers.
     *
     * @param count the number of brokers
     * @param controlledShutdown passed into TestUtils.createBrokerConfig
     * @param topics the topics to create (2 partitions per topic)
     */
    public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... topics);

    /**
     * Create embedded Kafka brokers listening on random ports.
     *
     * @param count the number of brokers
     * @param controlledShutdown passed into TestUtils.createBrokerConfig
     * @param partitions partitions per topic
     * @param topics the topics to create
     */
    public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String... topics);

    /**
     * Specify a single broker property.
     *
     * @param property the property name
     * @param value the value
     * @return the EmbeddedKafkaBroker (interface type, so ZooKeeper-specific
     * setters such as {@link #zkPort(int)} cannot be chained after this call)
     */
    public EmbeddedKafkaBroker brokerProperty(String property, Object value);

    /**
     * Set an explicit port for the embedded Zookeeper.
     *
     * @param port the port
     * @return the EmbeddedKafkaZKBroker
     */
    public EmbeddedKafkaZKBroker zkPort(int port);

    /**
     * Get the port that the embedded Zookeeper is running on or will run on.
     *
     * @return the port
     */
    public int getZkPort();

    /**
     * Set the connection timeout for the client to the embedded Zookeeper.
     *
     * @param zkConnectionTimeout the connection timeout
     * @return the EmbeddedKafkaZKBroker
     */
    public synchronized EmbeddedKafkaZKBroker zkConnectionTimeout(int zkConnectionTimeout);

    /**
     * Set the session timeout for the client to the embedded Zookeeper.
     *
     * @param zkSessionTimeout the session timeout
     * @return the EmbeddedKafkaZKBroker
     */
    public synchronized EmbeddedKafkaZKBroker zkSessionTimeout(int zkSessionTimeout);

    /**
     * Create an AdminClient, invoke the callback with it, and reliably close the admin.
     * The parameter type is written out as {@code java.util.function.Consumer} so it is
     * not mistaken for the Kafka {@code Consumer<K, V>} accepted by the
     * {@code consumeFrom...} methods of {@link EmbeddedKafkaBroker}.
     *
     * @param callback the callback
     */
    public void doWithAdmin(java.util.function.Consumer<AdminClient> callback);

    /**
     * Create an AdminClient, invoke the callback with it, and reliably close the admin.
     *
     * @param callback the callback
     * @param <T> the function return type
     * @return the callback's result
     */
    public <T> T doWithAdminFunction(Function<AdminClient, T> callback);

    /**
     * Get the underlying Kafka servers.
     *
     * @return the Kafka servers
     */
    public List<KafkaServer> getKafkaServers();

    /**
     * Get a specific Kafka server.
     *
     * @param id the server id
     * @return the Kafka server
     */
    public KafkaServer getKafkaServer(int id);

    /**
     * Get the embedded ZooKeeper.
     *
     * @return the ZooKeeper instance
     */
    public EmbeddedZookeeper getZookeeper();

    /**
     * Return the ZooKeeperClient.
     *
     * @return the client
     */
    public synchronized ZooKeeperClient getZooKeeperClient();

    /**
     * Get the ZooKeeper connection string.
     *
     * @return the connection string
     */
    public String getZookeeperConnectionString();

    /**
     * Get a broker address by index.
     *
     * @param i the index
     * @return the broker address
     */
    public BrokerAddress getBrokerAddress(int i);

    /**
     * Get all broker addresses.
     *
     * @return array of broker addresses
     */
    public BrokerAddress[] getBrokerAddresses();

    /**
     * Restart a broker.
     *
     * @param brokerAddress the broker to restart
     */
    public void bounce(BrokerAddress brokerAddress);

    /**
     * Restart a broker by index.
     *
     * @param index the broker index
     * @throws Exception if the restart fails
     */
    public void restart(int index) throws Exception;
}
Implementation using KRaft (Kafka Raft) for coordination, providing ZooKeeper-free Kafka functionality.
/**
 * Embedded Kafka broker implementation based on KRaft (no ZooKeeper).
 */
public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {

    public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";

    public static final int DEFAULT_ADMIN_TIMEOUT = 10;

    /**
     * Create embedded Kafka brokers listening on random ports.
     *
     * @param count the number of brokers
     * @param partitions partitions per topic
     * @param topics the topics to create
     */
    public EmbeddedKafkaKraftBroker(int count, int partitions, String... topics);

    /**
     * Specify a single broker property.
     *
     * @param property the property name
     * @param value the value
     * @return the EmbeddedKafkaBroker
     */
    public EmbeddedKafkaBroker brokerProperty(String property, Object value);

    /**
     * Set the timeout in seconds for admin operations.
     *
     * @param adminTimeout the timeout in seconds
     */
    public void setAdminTimeout(int adminTimeout);

    /**
     * Create an AdminClient, invoke the callback with it, and reliably close the admin.
     * The parameter type is written out as {@code java.util.function.Consumer} so it is
     * not mistaken for the Kafka {@code Consumer<K, V>} accepted by the
     * {@code consumeFrom...} methods of {@link EmbeddedKafkaBroker}.
     *
     * @param callback the callback
     */
    public void doWithAdmin(java.util.function.Consumer<AdminClient> callback);

    /**
     * Create an AdminClient, invoke the callback with it, and reliably close the admin.
     *
     * @param callback the callback
     * @param <T> the function return type
     * @return the callback's result
     */
    public <T> T doWithAdminFunction(Function<AdminClient, T> callback);

    /**
     * Get the underlying test cluster.
     *
     * @return the cluster
     */
    public KafkaClusterTestKit getCluster();
}
Factory for creating embedded Kafka brokers from annotation configuration.
/**
 * Factory that encapsulates the logic for creating an EmbeddedKafkaBroker.
 */
public final class EmbeddedKafkaBrokerFactory {

    /**
     * Build an EmbeddedKafkaBroker from the given EmbeddedKafka annotation.
     *
     * @param embeddedKafka the EmbeddedKafka annotation
     * @return a new EmbeddedKafkaBroker instance
     */
    public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka);

    /**
     * Build an EmbeddedKafkaBroker from the given EmbeddedKafka annotation,
     * resolving placeholders in its attributes with the supplied function.
     *
     * @param embeddedKafka the EmbeddedKafka annotation
     * @param propertyResolver the Function for placeholders in the annotation attributes
     * @return a new EmbeddedKafkaBroker instance
     */
    public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka, Function<String, String> propertyResolver);
}
Usage Examples:
// ZooKeeper-based broker
// zkPort(...) is declared only on EmbeddedKafkaZKBroker, so it is called first:
// brokerProperty(...) and kafkaPorts(...) return the EmbeddedKafkaBroker interface type,
// on which zkPort(...) is not available.
EmbeddedKafkaBroker zkBroker = new EmbeddedKafkaZKBroker(1, false, 2, "test-topic")
        .zkPort(2181)
        .brokerProperty("auto.create.topics.enable", "true")
        .kafkaPorts(9092);
zkBroker.afterPropertiesSet();
// KRaft-based broker
EmbeddedKafkaBroker kraftBroker = new EmbeddedKafkaKraftBroker(1, 2, "test-topic")
        .brokerProperty("auto.create.topics.enable", "true")
        .adminTimeout(30);
kraftBroker.afterPropertiesSet();
// Using factory
// The factory takes an EmbeddedKafka annotation instance, so the annotation is declared
// on a class and then looked up from it.
@EmbeddedKafka(kraft = true, topics = "test-topic")
class FactoryExampleTests {
}
EmbeddedKafka embeddedKafkaAnnotation = FactoryExampleTests.class.getAnnotation(EmbeddedKafka.class);
EmbeddedKafkaBroker broker = EmbeddedKafkaBrokerFactory.create(embeddedKafkaAnnotation);
// Initialize the factory-created broker before using it, as with the brokers above.
broker.afterPropertiesSet();
// Topic management
broker.addTopics("new-topic");
broker.addTopics(new NewTopic("configured-topic", 3, (short) 1));
// Consumer subscription
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
broker.consumeFromAnEmbeddedTopic(consumer, "test-topic");

/**
 * Host and port pair describing a broker address.
 */
public class BrokerAddress {

    /** NOTE(review): presumably the port used when only a host is supplied; confirm. */
    public static final int DEFAULT_PORT = 9092;

    /**
     * Create an address from an explicit host and port.
     *
     * @param host the host
     * @param port the port
     */
    public BrokerAddress(String host, int port);

    /**
     * Create an address from a host only.
     *
     * @param host the host
     */
    public BrokerAddress(String host);

    /**
     * Create an address from a broker end point.
     *
     * @param broker the broker end point
     */
    public BrokerAddress(BrokerEndPoint broker);

    /**
     * Create an address from its String form.
     * NOTE(review): the expected format (likely {@code host[:port]}) is not shown here; confirm.
     *
     * @param address the address String
     * @return the broker address
     */
    public static BrokerAddress fromAddress(String address);

    /**
     * @return the host
     */
    public String getHost();

    /**
     * @return the port
     */
    public int getPort();
}
Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-kafka--spring-kafka-test