CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-springframework-kafka--spring-kafka-test

Spring Kafka Test Support provides embedded Kafka broker and testing utilities for Spring Kafka applications

Pending
Overview
Eval results
Files

junit-integration.mddocs/

JUnit Integration

JUnit rules and extensions for managing embedded Kafka broker lifecycle. Supports both JUnit 4 rules and JUnit 5 extensions with automatic broker setup and teardown.

Capabilities

JUnit 4 Rule Support

JUnit 4 rule wrapper for embedded Kafka broker with automatic lifecycle management.

/**
 * A JUnit rules TestRule wrapper around an EmbeddedKafkaBroker
 */
public class EmbeddedKafkaRule extends ExternalResource {
    /**
     * Create embedded Kafka brokers
     * @param count the number of brokers
     */
    public EmbeddedKafkaRule(int count);

    /**
     * Create embedded Kafka brokers
     * @param count the number of brokers
     * @param controlledShutdown passed into TestUtils.createBrokerConfig
     * @param topics the topics to create (2 partitions per)
     */
    public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics);

    /**
     * Create embedded Kafka brokers listening on random ports
     * @param count the number of brokers
     * @param controlledShutdown passed into TestUtils.createBrokerConfig
     * @param partitions partitions per topic
     * @param topics the topics to create
     */
    public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics);

    /**
     * Specify the properties to configure Kafka Broker before start
     * @param brokerProperties the properties to use for configuring Kafka Broker(s)
     * @return this for chaining configuration
     */
    public EmbeddedKafkaRule brokerProperties(Map<String, String> brokerProperties);

    /**
     * Specify a broker property
     * @param property the property name
     * @param value the value
     * @return the EmbeddedKafkaRule
     */
    public EmbeddedKafkaRule brokerProperty(String property, Object value);

    /**
     * Set explicit ports on which the kafka brokers will listen
     * @param kafkaPorts the ports
     * @return the rule
     */
    public EmbeddedKafkaRule kafkaPorts(int... kafkaPorts);

    /**
     * Set ZooKeeper port
     * @param port the port
     * @return the rule
     */
    public EmbeddedKafkaRule zkPort(int port);

    /**
     * Return an underlying delegator EmbeddedKafkaBroker instance
     * @return the EmbeddedKafkaBroker instance
     */
    public EmbeddedKafkaBroker getEmbeddedKafka();
}

JUnit 5 Extension Support

JUnit 5 condition and extension for embedded broker setup with parameter injection support.

/**
 * JUnit5 condition for an embedded broker
 */
public class EmbeddedKafkaCondition implements ExecutionCondition, AfterAllCallback, ParameterResolver {
    /**
     * Check if parameter is supported for injection
     * @param parameterContext the parameter context
     * @param extensionContext the extension context
     * @return true if EmbeddedKafkaBroker parameter is supported
     */
    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext);

    /**
     * Resolve parameter for injection
     * @param parameterContext the parameter context
     * @param context the extension context
     * @return the EmbeddedKafkaBroker instance
     */
    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext context);

    /**
     * Clean up after all tests
     * @param context the extension context
     */
    public void afterAll(ExtensionContext context);

    /**
     * Evaluate execution condition
     * @param context the extension context
     * @return condition evaluation result
     */
    public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context);

    /**
     * Get the current thread's broker instance
     * @return the EmbeddedKafkaBroker
     */
    public static EmbeddedKafkaBroker getBroker();
}

Global Test Execution Listener

Spring TestContext integration for global embedded Kafka lifecycle management.

/**
 * Global test execution listener for embedded Kafka lifecycle
 */
public class GlobalEmbeddedKafkaTestExecutionListener implements TestExecutionListener {
    // Automatic lifecycle management through Spring TestContext
}

Usage Examples:

// JUnit 4 Rule - Basic Setup
public class JUnit4KafkaTest {
    @Rule
    public EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, "test-topic");
    
    @Test
    public void testKafkaWithRule() throws Exception {
        EmbeddedKafkaBroker broker = embeddedKafka.getEmbeddedKafka();
        
        // Create consumer and producer
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", broker);
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
        
        try (Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
             Producer<Integer, String> producer = new KafkaProducer<>(producerProps)) {
            
            broker.consumeFromAnEmbeddedTopic(consumer, "test-topic");
            producer.send(new ProducerRecord<>("test-topic", 1, "test-message"));
            
            ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
            assertEquals("test-message", record.value());
        }
    }
}

// JUnit 4 Rule - Advanced Configuration
public class JUnit4AdvancedKafkaTest {
    @Rule
    public EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(2, true, 3, "orders", "payments")
        .brokerProperties(Map.of(
            "auto.create.topics.enable", "true",
            "transaction.state.log.replication.factor", "1"
        ))
        .brokerProperty("offsets.topic.replication.factor", "1")
        .kafkaPorts(9092, 9093)
        .zkPort(2181);
    
    @Test
    public void testMultiBrokerSetup() {
        EmbeddedKafkaBroker broker = embeddedKafka.getEmbeddedKafka();
        
        assertTrue(broker.getTopics().contains("orders"));
        assertTrue(broker.getTopics().contains("payments"));
        assertTrue(broker.getBrokersAsString().contains("9092"));
        assertTrue(broker.getBrokersAsString().contains("9093"));
    }
}

// JUnit 5 Extension - Parameter Injection
@ExtendWith(EmbeddedKafkaCondition.class)
@EmbeddedKafka(partitions = 1, topics = { "test-topic" })
public class JUnit5ParameterInjectionTest {
    
    @Test
    public void testWithInjectedBroker(EmbeddedKafkaBroker embeddedKafka) throws Exception {
        // Broker is automatically injected
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
        
        try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
            
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
            
            ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "test-topic");
            assertEquals("value", record.value());
        }
    }
    
    @Test
    public void testStaticBrokerAccess() {
        // Alternative access via static method
        EmbeddedKafkaBroker broker = EmbeddedKafkaCondition.getBroker();
        assertNotNull(broker);
        assertTrue(broker.getTopics().contains("test-topic"));
    }
}

// JUnit 5 Extension - Without Spring
@EmbeddedKafka(count = 2, partitions = 2, topics = { "events", "commands" })
public class JUnit5NonSpringTest {
    
    @Test
    public void testNonSpringSetup() {
        // Access broker via static method when not using parameter injection
        EmbeddedKafkaBroker broker = EmbeddedKafkaCondition.getBroker();
        
        assertEquals(2, broker.getPartitionsPerTopic());
        assertTrue(broker.getTopics().contains("events"));
        assertTrue(broker.getTopics().contains("commands"));
    }
}

// JUnit 5 with Spring TestContext
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = { "integration-topic" })
public class JUnit5SpringIntegrationTest {
    
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;
    
    @Test
    public void testSpringIntegration() {
        // Broker is automatically configured by Spring TestContext
        assertNotNull(embeddedKafka);
        assertTrue(embeddedKafka.getTopics().contains("integration-topic"));
    }
}

// Complex JUnit 4 Test with Multiple Rules
public class ComplexJUnit4Test {
    
    @Rule
    public EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, "source-topic", "sink-topic")
        .brokerProperty("auto.create.topics.enable", "false");
    
    @Rule
    public TestRule chain = RuleChain
        .outerRule(kafkaRule)
        .around(new CustomTestRule());
    
    @Test
    public void testKafkaStreamsProcessing() throws Exception {
        EmbeddedKafkaBroker broker = kafkaRule.getEmbeddedKafka();
        
        // Setup Kafka Streams
        Map<String, Object> streamsProps = KafkaTestUtils.streamsProps("test-streams", broker.getBrokersAsString());
        
        // Build topology
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("source-topic")
               .mapValues(value -> value.toString().toUpperCase())
               .to("sink-topic");
        
        Topology topology = builder.build();
        
        try (KafkaStreams streams = new KafkaStreams(topology, new Properties(streamsProps))) {
            streams.start();
            
            // Send input message
            Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
            try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
                producer.send(new ProducerRecord<>("source-topic", "key", "hello world"));
            }
            
            // Verify output
            Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", broker);
            try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                broker.consumeFromAnEmbeddedTopic(consumer, "sink-topic");
                ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "sink-topic");
                assertEquals("HELLO WORLD", record.value());
            }
        }
    }
}

// Nested Test Classes with JUnit 5
@EmbeddedKafka(partitions = 1, topics = { "parent-topic" })
public class NestedJUnit5Test {
    
    @Nested
    @EmbeddedKafka(partitions = 2, topics = { "nested-topic" })
    class NestedTests {
        
        @Test
        public void testNestedBroker(EmbeddedKafkaBroker embeddedKafka) {
            // This broker has the nested configuration
            assertTrue(embeddedKafka.getTopics().contains("nested-topic"));
            assertEquals(2, embeddedKafka.getPartitionsPerTopic());
        }
    }
    
    @Test
    public void testParentBroker(EmbeddedKafkaBroker embeddedKafka) {
        // This broker has the parent configuration
        assertTrue(embeddedKafka.getTopics().contains("parent-topic"));
        assertEquals(1, embeddedKafka.getPartitionsPerTopic());
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-kafka--spring-kafka-test

docs

assertion-matching.md

container-testing.md

embedded-brokers.md

index.md

junit-integration.md

test-utilities.md

testing-annotations.md

tile.json