Spring Kafka Test Support provides embedded Kafka broker and testing utilities for Spring Kafka applications
—
JUnit rules and extensions for managing embedded Kafka broker lifecycle. Supports both JUnit 4 rules and JUnit 5 extensions with automatic broker setup and teardown.
JUnit 4 rule wrapper for embedded Kafka broker with automatic lifecycle management.
/**
* A JUnit rules TestRule wrapper around an EmbeddedKafkaBroker
*/
public class EmbeddedKafkaRule extends ExternalResource {
/**
* Create embedded Kafka brokers
* @param count the number of brokers
*/
public EmbeddedKafkaRule(int count);
/**
* Create embedded Kafka brokers
* @param count the number of brokers
* @param controlledShutdown passed into TestUtils.createBrokerConfig
* @param topics the topics to create (2 partitions per)
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics);
/**
* Create embedded Kafka brokers listening on random ports
* @param count the number of brokers
* @param controlledShutdown passed into TestUtils.createBrokerConfig
* @param partitions partitions per topic
* @param topics the topics to create
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics);
/**
* Specify the properties to configure Kafka Broker before start
* @param brokerProperties the properties to use for configuring Kafka Broker(s)
* @return this for chaining configuration
*/
public EmbeddedKafkaRule brokerProperties(Map<String, String> brokerProperties);
/**
* Specify a broker property
* @param property the property name
* @param value the value
* @return the EmbeddedKafkaRule
*/
public EmbeddedKafkaRule brokerProperty(String property, Object value);
/**
* Set explicit ports on which the kafka brokers will listen
* @param kafkaPorts the ports
* @return the rule
*/
public EmbeddedKafkaRule kafkaPorts(int... kafkaPorts);
/**
* Set ZooKeeper port
* @param port the port
* @return the rule
*/
public EmbeddedKafkaRule zkPort(int port);
/**
* Return an underlying delegator EmbeddedKafkaBroker instance
* @return the EmbeddedKafkaBroker instance
*/
public EmbeddedKafkaBroker getEmbeddedKafka();
}JUnit 5 condition and extension for embedded broker setup with parameter injection support.
/**
* JUnit5 condition for an embedded broker
*/
public class EmbeddedKafkaCondition implements ExecutionCondition, AfterAllCallback, ParameterResolver {
/**
* Check if parameter is supported for injection
* @param parameterContext the parameter context
* @param extensionContext the extension context
* @return true if EmbeddedKafkaBroker parameter is supported
*/
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext);
/**
* Resolve parameter for injection
* @param parameterContext the parameter context
* @param context the extension context
* @return the EmbeddedKafkaBroker instance
*/
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext context);
/**
* Clean up after all tests
* @param context the extension context
*/
public void afterAll(ExtensionContext context);
/**
* Evaluate execution condition
* @param context the extension context
* @return condition evaluation result
*/
public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context);
/**
* Get the current thread's broker instance
* @return the EmbeddedKafkaBroker
*/
public static EmbeddedKafkaBroker getBroker();
}Spring TestContext integration for global embedded Kafka lifecycle management.
/**
* Global test execution listener for embedded Kafka lifecycle
*/
public class GlobalEmbeddedKafkaTestExecutionListener implements TestExecutionListener {
// Automatic lifecycle management through Spring TestContext
}Usage Examples:
// JUnit 4 Rule - Basic Setup
public class JUnit4KafkaTest {
@Rule
public EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, "test-topic");
@Test
public void testKafkaWithRule() throws Exception {
EmbeddedKafkaBroker broker = embeddedKafka.getEmbeddedKafka();
// Create consumer and producer
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", broker);
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
try (Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
Producer<Integer, String> producer = new KafkaProducer<>(producerProps)) {
broker.consumeFromAnEmbeddedTopic(consumer, "test-topic");
producer.send(new ProducerRecord<>("test-topic", 1, "test-message"));
ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
assertEquals("test-message", record.value());
}
}
}
// JUnit 4 Rule - Advanced Configuration
public class JUnit4AdvancedKafkaTest {
@Rule
public EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(2, true, 3, "orders", "payments")
.brokerProperties(Map.of(
"auto.create.topics.enable", "true",
"transaction.state.log.replication.factor", "1"
))
.brokerProperty("offsets.topic.replication.factor", "1")
.kafkaPorts(9092, 9093)
.zkPort(2181);
@Test
public void testMultiBrokerSetup() {
EmbeddedKafkaBroker broker = embeddedKafka.getEmbeddedKafka();
assertTrue(broker.getTopics().contains("orders"));
assertTrue(broker.getTopics().contains("payments"));
assertTrue(broker.getBrokersAsString().contains("9092"));
assertTrue(broker.getBrokersAsString().contains("9093"));
}
}
// JUnit 5 Extension - Parameter Injection
@ExtendWith(EmbeddedKafkaCondition.class)
@EmbeddedKafka(partitions = 1, topics = { "test-topic" })
public class JUnit5ParameterInjectionTest {
@Test
public void testWithInjectedBroker(EmbeddedKafkaBroker embeddedKafka) throws Exception {
// Broker is automatically injected
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");
producer.send(new ProducerRecord<>("test-topic", "key", "value"));
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
assertEquals("value", record.value());
}
}
@Test
public void testStaticBrokerAccess() {
// Alternative access via static method
EmbeddedKafkaBroker broker = EmbeddedKafkaCondition.getBroker();
assertNotNull(broker);
assertTrue(broker.getTopics().contains("test-topic"));
}
}
// JUnit 5 Extension - Without Spring
@EmbeddedKafka(count = 2, partitions = 2, topics = { "events", "commands" })
public class JUnit5NonSpringTest {
@Test
public void testNonSpringSetup() {
// Access broker via static method when not using parameter injection
EmbeddedKafkaBroker broker = EmbeddedKafkaCondition.getBroker();
assertEquals(2, broker.getPartitionsPerTopic());
assertTrue(broker.getTopics().contains("events"));
assertTrue(broker.getTopics().contains("commands"));
}
}
// JUnit 5 with Spring TestContext
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = { "integration-topic" })
public class JUnit5SpringIntegrationTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void testSpringIntegration() {
// Broker is automatically configured by Spring TestContext
assertNotNull(embeddedKafka);
assertTrue(embeddedKafka.getTopics().contains("integration-topic"));
}
}
// Complex JUnit 4 Test with Multiple Rules
public class ComplexJUnit4Test {
@Rule
public EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, "source-topic", "sink-topic")
.brokerProperty("auto.create.topics.enable", "false");
@Rule
public TestRule chain = RuleChain
.outerRule(kafkaRule)
.around(new CustomTestRule());
@Test
public void testKafkaStreamsProcessing() throws Exception {
EmbeddedKafkaBroker broker = kafkaRule.getEmbeddedKafka();
// Setup Kafka Streams
Map<String, Object> streamsProps = KafkaTestUtils.streamsProps("test-streams", broker.getBrokersAsString());
// Build topology
StreamsBuilder builder = new StreamsBuilder();
builder.stream("source-topic")
.mapValues(value -> value.toString().toUpperCase())
.to("sink-topic");
Topology topology = builder.build();
try (KafkaStreams streams = new KafkaStreams(topology, new Properties(streamsProps))) {
streams.start();
// Send input message
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
producer.send(new ProducerRecord<>("source-topic", "key", "hello world"));
}
// Verify output
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", broker);
try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
broker.consumeFromAnEmbeddedTopic(consumer, "sink-topic");
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "sink-topic");
assertEquals("HELLO WORLD", record.value());
}
}
}
}
// Nested Test Classes with JUnit 5
@EmbeddedKafka(partitions = 1, topics = { "parent-topic" })
public class NestedJUnit5Test {
@Nested
@EmbeddedKafka(partitions = 2, topics = { "nested-topic" })
class NestedTests {
@Test
public void testNestedBroker(EmbeddedKafkaBroker embeddedKafka) {
// This broker has the nested configuration
assertTrue(embeddedKafka.getTopics().contains("nested-topic"));
assertEquals(2, embeddedKafka.getPartitionsPerTopic());
}
}
@Test
public void testParentBroker(EmbeddedKafkaBroker embeddedKafka) {
// This broker has the parent configuration
assertTrue(embeddedKafka.getTopics().contains("parent-topic"));
assertEquals(1, embeddedKafka.getPartitionsPerTopic());
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-kafka--spring-kafka-test