CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-springframework-kafka--spring-kafka-test

Spring Kafka Test Support provides an embedded Kafka broker and testing utilities for Spring Kafka applications.

Pending
Overview
Eval results
Files

docs/container-testing.md

Container Testing

Utilities for testing Spring Kafka listener containers. Provides methods for waiting on partition assignments and container lifecycle management without hard dependencies on container classes.

Capabilities

Container Utilities

Utilities for testing listener containers with partition assignment waiting and lifecycle management.

/**
 * Utilities for testing listener containers.
 * <p>
 * No hard references to container classes are used, to avoid circular project
 * dependencies; accordingly the container is passed in as a plain {@code Object}.
 */
public final class ContainerTestUtils {
    /**
     * Wait until the container has the required number of assigned partitions.
     *
     * @param container the container whose partition assignment is awaited
     * @param partitions the number of partitions expected to be assigned
     * @throws IllegalStateException if the operation cannot be completed as expected
     * @throws ContainerTestUtilsException if the call to the container's
     *         {@code getAssignedPartitions()} method fails
     */
    public static void waitForAssignment(Object container, int partitions);
}

/**
 * Exception thrown when container test utilities fail.
 * <p>
 * Unchecked: it extends {@link RuntimeException}. The constructor carries no
 * access modifier, so it is package-private and cannot be invoked by code
 * outside the declaring package.
 * <p>
 * NOTE(review): the {@code static} modifier implies this is a nested class
 * (presumably declared inside ContainerTestUtils) — confirm against the library source.
 */
public static class ContainerTestUtilsException extends RuntimeException {
    // message: description of the failure; cause: the underlying exception being wrapped
    ContainerTestUtilsException(String message, Throwable cause);
}

Usage Examples:

// Basic container assignment waiting
@SpringBootTest
@EmbeddedKafka(partitions = 3, topics = { "test-topic" })
public class ContainerAssignmentTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    /**
     * Starts a single listener container on the 3-partition "test-topic" and
     * waits until all three partitions have been assigned to it.
     *
     * @throws Exception if the wait for the assignment fails unexpectedly
     */
    @Test
    public void testSingleContainerAssignment() throws Exception {
        // Configure consumer properties (group "test-group", auto-commit disabled)
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);

        // Build the consumer factory directly; a listener container *factory* is
        // not needed when the container is constructed by hand.
        DefaultKafkaConsumerFactory<Integer, String> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerProps);

        // Create container
        ContainerProperties containerProperties = new ContainerProperties("test-topic");
        containerProperties.setGroupId("test-group");
        // A message listener must be configured before the container is started;
        // this test only cares about assignment, so records are ignored.
        containerProperties.setMessageListener((MessageListener<Integer, String>) record -> { });

        KafkaMessageListenerContainer<Integer, String> container =
            new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        container.start();
        try {
            // Wait for all 3 partitions to be assigned
            ContainerTestUtils.waitForAssignment(container, 3);

            // Container is now ready for testing
            assertTrue(container.isRunning());
        } finally {
            // Always stop the container so its consumer thread does not outlive the test
            container.stop();
        }
    }
}

// Multi-container assignment waiting
@SpringBootTest
@EmbeddedKafka(partitions = 6, topics = { "multi-partition-topic" })
public class MultiContainerAssignmentTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    /**
     * Starts a concurrent listener container with three consumers on the
     * 6-partition "multi-partition-topic" and waits until all six partitions
     * have been assigned across them.
     *
     * @throws Exception if the wait for the assignment fails unexpectedly
     */
    @Test
    public void testConcurrentContainerAssignment() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("multi-group", "false", embeddedKafka);

        // Build the consumer factory directly. Concurrency is configured on the
        // container below; setting it on a container factory would have no effect
        // here because the container is not created through that factory.
        DefaultKafkaConsumerFactory<Integer, String> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerProps);

        // Create concurrent container
        ContainerProperties containerProperties = new ContainerProperties("multi-partition-topic");
        containerProperties.setGroupId("multi-group");
        // A message listener must be configured before the container is started;
        // this test only cares about assignment, so records are ignored.
        containerProperties.setMessageListener((MessageListener<Integer, String>) record -> { });

        ConcurrentMessageListenerContainer<Integer, String> container =
            new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setConcurrency(3); // 3 consumer threads

        container.start();
        try {
            // Wait for all 6 partitions to be assigned across the 3 concurrent containers
            ContainerTestUtils.waitForAssignment(container, 6);

            // All partitions are now assigned
            assertTrue(container.isRunning());
        } finally {
            // Always stop the container so its consumer threads do not outlive the test
            container.stop();
        }
    }
}

// Integration test with message processing
@SpringBootTest
@EmbeddedKafka(partitions = 2, topics = { "processing-topic" })
public class MessageProcessingContainerTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    // Counts down once per received record; the test sends exactly three.
    private final CountDownLatch latch = new CountDownLatch(3);
    // Written by the listener before each countDown() and read by the test only
    // after a successful await(), so the latch orders the accesses.
    private final List<String> receivedMessages = new ArrayList<>();

    /**
     * Waits for the container to be assigned both partitions, publishes three
     * records and verifies that the listener received all three values.
     *
     * @throws Exception if waiting for the assignment or the latch is interrupted or fails
     */
    @Test
    public void testMessageProcessingWithContainer() throws Exception {
        // KafkaTestUtils.consumerProps/producerProps set up an <Integer, String>
        // consumer and producer, so the keys used below are Integers.
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("processing-group", "false", embeddedKafka);

        // Create container with message listener
        ContainerProperties containerProperties = new ContainerProperties("processing-topic");
        containerProperties.setGroupId("processing-group");
        containerProperties.setMessageListener(new MessageListener<Integer, String>() {
            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                // Record the value first, then signal, so the test thread sees it
                receivedMessages.add(record.value());
                latch.countDown();
            }
        });

        DefaultKafkaConsumerFactory<Integer, String> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerProps);

        KafkaMessageListenerContainer<Integer, String> container =
            new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        container.start();
        try {
            // Wait for partition assignment before producing, so no record is missed
            ContainerTestUtils.waitForAssignment(container, 2);

            // Send test messages; closing the producer flushes pending sends
            Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
            try (Producer<Integer, String> producer = new KafkaProducer<>(producerProps)) {
                producer.send(new ProducerRecord<>("processing-topic", 1, "message1"));
                producer.send(new ProducerRecord<>("processing-topic", 2, "message2"));
                producer.send(new ProducerRecord<>("processing-topic", 3, "message3"));
            }

            // Wait for messages to be processed
            assertTrue(latch.await(30, TimeUnit.SECONDS));

            // Verify messages were received (order across partitions is not guaranteed)
            assertEquals(3, receivedMessages.size());
            assertTrue(receivedMessages.contains("message1"));
            assertTrue(receivedMessages.contains("message2"));
            assertTrue(receivedMessages.contains("message3"));
        } finally {
            container.stop();
        }
    }
}

// Error handling and timeout scenarios
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = { "error-topic" })
public class ContainerErrorHandlingTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    /**
     * Expects {@code waitForAssignment} to fail with an IllegalStateException
     * when the container subscribes to a topic that was not declared on the
     * embedded broker.
     */
    @Test
    public void testAssignmentTimeout() {
        // Create container with invalid topic (should not exist)
        // NOTE(review): if the embedded broker auto-creates topics on first use,
        // a partition may be assigned after all and this assertion would fail —
        // confirm the broker's auto.create.topics.enable setting.
        KafkaMessageListenerContainer<Integer, String> container =
            newContainer("non-existent-topic", "error-group");

        container.start();
        try {
            // This should throw IllegalStateException due to no partitions being assigned.
            // NOTE(review): the call blocks until the utility gives up waiting, which
            // can make this test slow — confirm the wait duration for the version in use.
            assertThrows(IllegalStateException.class, () -> {
                ContainerTestUtils.waitForAssignment(container, 1);
            });
        } finally {
            container.stop();
        }
    }

    /**
     * Verifies that waiting for the real partition count (1) succeeds, while
     * waiting for more partitions than the topic has (5) fails with an
     * IllegalStateException.
     *
     * @throws Exception if the first, expected-to-succeed wait fails unexpectedly
     */
    @Test
    public void testPartialAssignment() throws Exception {
        KafkaMessageListenerContainer<Integer, String> container =
            newContainer("error-topic", "partial-group");

        container.start();
        try {
            // Wait for actual partition count (1), not more
            ContainerTestUtils.waitForAssignment(container, 1);

            // This should throw because we expect more partitions than available
            assertThrows(IllegalStateException.class, () -> {
                ContainerTestUtils.waitForAssignment(container, 5);
            });
        } finally {
            container.stop();
        }
    }

    // Builds an unstarted container for the given topic and consumer group with a no-op listener.
    private KafkaMessageListenerContainer<Integer, String> newContainer(String topic, String groupId) {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka);

        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setGroupId(groupId);
        // A message listener must be configured before the container is started;
        // these tests only care about assignment, so records are ignored.
        containerProperties.setMessageListener((MessageListener<Integer, String>) record -> { });

        DefaultKafkaConsumerFactory<Integer, String> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerProps);

        return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    }
}

// Custom container implementations
@SpringBootTest
@EmbeddedKafka(partitions = 4, topics = { "custom-topic" })
public class CustomContainerTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    /**
     * Waits for a hand-written container to be assigned all four partitions of
     * "custom-topic" and then checks its reported state.
     *
     * NOTE(review): confirm that {@code waitForAssignment} accepts arbitrary
     * container types in the spring-kafka-test version in use; it may rely on
     * internals of the Spring listener containers beyond a public
     * {@code getAssignedPartitions()} method.
     *
     * @throws Exception if the wait for the assignment fails unexpectedly
     */
    @Test
    public void testCustomContainerImplementation() throws Exception {
        // Create a custom container that implements getAssignedPartitions()
        CustomKafkaContainer customContainer = new CustomKafkaContainer(embeddedKafka, "custom-topic");

        customContainer.start();
        try {
            // ContainerTestUtils uses reflection to call getAssignedPartitions()
            ContainerTestUtils.waitForAssignment(customContainer, 4);

            // Verify custom container is working
            assertTrue(customContainer.isRunning());
            assertEquals(4, customContainer.getAssignedPartitions().size());
        } finally {
            customContainer.stop();
        }
    }

    /**
     * Minimal container look-alike: wraps a single consumer, tracks the
     * partitions handed to it by the rebalance listener and exposes them via
     * {@link #getAssignedPartitions()}. All methods are expected to be called
     * from the same thread, because the wrapped consumer is polled directly.
     */
    private static class CustomKafkaContainer {
        private final Consumer<Integer, String> consumer;
        // Topic to subscribe to; previously the constructor argument was ignored
        // and the topic name was hard-coded in start().
        private final String topic;
        private boolean running = false;
        private Collection<TopicPartition> assignedPartitions = new HashSet<>();

        /**
         * @param broker the embedded broker to connect to
         * @param topic the topic this container subscribes to when started
         */
        public CustomKafkaContainer(EmbeddedKafkaBroker broker, String topic) {
            Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("custom-group", "false", broker);
            this.consumer = new KafkaConsumer<>(consumerProps);
            this.topic = topic;
        }

        /** Subscribes to the configured topic and performs a first poll to join the group. */
        public void start() {
            consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    assignedPartitions.clear();
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    assignedPartitions = new HashSet<>(partitions);
                }
            });

            // Trigger initial assignment; rebalance callbacks run inside poll()
            consumer.poll(Duration.ofMillis(100));
            running = true;
        }

        /** Marks the container as stopped and closes the underlying consumer. */
        public void stop() {
            running = false;
            consumer.close();
        }

        /** @return {@code true} between {@link #start()} and {@link #stop()} */
        public boolean isRunning() {
            return running;
        }

        /**
         * This method will be called by ContainerTestUtils via reflection.
         *
         * @return a snapshot of the currently assigned partitions, or an empty
         *         collection when the container is not running
         */
        public Collection<TopicPartition> getAssignedPartitions() {
            if (!running) {
                return Collections.emptyList();
            }

            // Poll to trigger rebalance if needed; the assignment may not have
            // completed during the single poll in start().
            consumer.poll(Duration.ofMillis(100));
            return new HashSet<>(assignedPartitions);
        }
    }
}

// Kafka Streams container testing
@SpringBootTest
@EmbeddedKafka(partitions = 2, topics = { "input-topic", "output-topic" })
public class KafkaStreamsContainerTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    /**
     * Runs a small topology that upper-cases values from "input-topic" into
     * "output-topic", waits for a listener container on the output topic to be
     * assigned both partitions, then publishes one record and verifies the
     * transformed value with a separate consumer.
     *
     * @throws Exception if waiting for the assignment or reading the record fails
     */
    @Test
    public void testKafkaStreamsWithContainerUtils() throws Exception {
        // Setup Kafka Streams
        Map<String, Object> streamsProps = KafkaTestUtils.streamsProps("streams-app", embeddedKafka.getBrokersAsString());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic")
               .mapValues(value -> value.toString().toUpperCase())
               .to("output-topic");

        Topology topology = builder.build();
        // java.util.Properties has no constructor taking a Map (the one-argument
        // Properties constructor takes *defaults*), so copy the entries instead.
        Properties streamsConfig = new Properties();
        streamsConfig.putAll(streamsProps);
        KafkaStreams streams = new KafkaStreams(topology, streamsConfig);

        // Create output consumer; consumerProps sets up an <Integer, String> consumer
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("output-group", "false", embeddedKafka);

        ContainerProperties containerProperties = new ContainerProperties("output-topic");
        containerProperties.setGroupId("output-group");
        // A message listener must be configured before the container is started;
        // verification is done by a separate consumer below, so records are ignored here.
        containerProperties.setMessageListener((MessageListener<Integer, String>) record -> { });

        DefaultKafkaConsumerFactory<Integer, String> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerProps);

        KafkaMessageListenerContainer<Integer, String> outputContainer =
            new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        try {
            streams.start();
            outputContainer.start();

            // Wait for output topic partition assignment
            ContainerTestUtils.waitForAssignment(outputContainer, 2);

            // Send input message; producerProps sets up an <Integer, String>
            // producer, so the key must be an Integer.
            Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
            try (Producer<Integer, String> producer = new KafkaProducer<>(producerProps)) {
                producer.send(new ProducerRecord<>("input-topic", 1, "hello streams"));
            }

            // Verify output with a consumer in its own group, so it sees the
            // record independently of the listener container above.
            Map<String, Object> outputConsumerProps = KafkaTestUtils.consumerProps("verify-group", "false", embeddedKafka);
            try (Consumer<Integer, String> verifyConsumer = new KafkaConsumer<>(outputConsumerProps)) {
                embeddedKafka.consumeFromAnEmbeddedTopic(verifyConsumer, "output-topic");
                ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(verifyConsumer, "output-topic");
                assertEquals("HELLO STREAMS", record.value());
            }
        } finally {
            // Stop the container even if closing the streams instance throws
            try {
                streams.close();
            } finally {
                outputContainer.stop();
            }
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-kafka--spring-kafka-test

docs

assertion-matching.md

container-testing.md

embedded-brokers.md

index.md

junit-integration.md

test-utilities.md

testing-annotations.md

tile.json