or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.kafka/kafka_2.13@4.1.x

docs

index.md
tile.json

tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

testing.mddocs/streams/

Streams Testing

Kafka Streams provides comprehensive testing utilities for unit and integration testing of stream processing topologies without requiring a running Kafka broker.

TopologyTestDriver

Main testing utility for driving topology execution synchronously in tests.

package org.apache.kafka.streams;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.WindowStore;

import java.io.Closeable;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class TopologyTestDriver implements Closeable {
    // Constructors
    public TopologyTestDriver(Topology topology);
    public TopologyTestDriver(Topology topology, Properties config);
    public TopologyTestDriver(Topology topology, Instant initialWallClockTime);
    public TopologyTestDriver(Topology topology,
                             Properties config,
                             Instant initialWallClockTime);

    // Input/Output topic creation
    public <K, V> TestInputTopic<K, V> createInputTopic(String topicName,
                                                         Serializer<K> keySerializer,
                                                         Serializer<V> valueSerializer);

    public <K, V> TestInputTopic<K, V> createInputTopic(String topicName,
                                                         Serializer<K> keySerializer,
                                                         Serializer<V> valueSerializer,
                                                         Instant startTimestamp,
                                                         Duration autoAdvance);

    public <K, V> TestOutputTopic<K, V> createOutputTopic(String topicName,
                                                           Deserializer<K> keyDeserializer,
                                                           Deserializer<V> valueDeserializer);

    // Time control
    public void advanceWallClockTime(Duration advance);

    // State store access
    public Map<String, StateStore> getAllStateStores();
    public StateStore getStateStore(String name);
    public <K, V> KeyValueStore<K, V> getKeyValueStore(String name);
    public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(String name);
    public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(String name);
    public <K, V> WindowStore<K, V> getWindowStore(String name);
    public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(String name);
    public <K, V> SessionStore<K, V> getSessionStore(String name);

    // Metadata
    public Set<String> producedTopicNames();
    public Map<MetricName, ? extends Metric> metrics();

    // Resource cleanup
    public void close();
}

Basic Usage Example

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class StreamsTopologyTest {
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;

    @BeforeEach
    public void setup() {
        // Build the topology
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic")
               .mapValues(value -> value.toUpperCase())
               .to("output-topic");
        Topology topology = builder.build();

        // Configure the test driver
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                   Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                   Serdes.String().getClass().getName());

        testDriver = new TopologyTestDriver(topology, config);

        // Create input and output topics
        inputTopic = testDriver.createInputTopic(
            "input-topic",
            Serdes.String().serializer(),
            Serdes.String().serializer()
        );

        outputTopic = testDriver.createOutputTopic(
            "output-topic",
            Serdes.String().deserializer(),
            Serdes.String().deserializer()
        );
    }

    @AfterEach
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void testUpperCase() {
        // Pipe input
        inputTopic.pipeInput("key1", "hello");
        inputTopic.pipeInput("key2", "world");

        // Read and verify output
        assertEquals("HELLO", outputTopic.readValue());
        assertEquals("WORLD", outputTopic.readValue());

        // Verify no more output
        assertEquals(true, outputTopic.isEmpty());
    }
}

Testing with Initial Wall Clock Time

import java.time.Instant;

public class TimeBasedTest {
    @Test
    public void testWithCustomInitialTime() {
        Instant initialTime = Instant.parse("2024-01-01T00:00:00Z");
        TopologyTestDriver testDriver = new TopologyTestDriver(
            topology,
            config,
            initialTime
        );

        // Test time-dependent operations
    }
}

TestInputTopic

Utility for piping test records to input topics.

package org.apache.kafka.streams;

import org.apache.kafka.streams.test.TestRecord;

import java.time.Duration;
import java.time.Instant;
import java.util.List;

public class TestInputTopic<K, V> {
    // Single record input
    public void pipeInput(V value);
    public void pipeInput(K key, V value);
    public void pipeInput(TestRecord<K, V> record);

    // Single record input with explicit timestamp
    public void pipeInput(V value, Instant timestamp);
    public void pipeInput(K key, V value, long timestampMs);
    public void pipeInput(K key, V value, Instant timestamp);

    // Batch input
    public void pipeRecordList(List<? extends TestRecord<K, V>> records);
    public void pipeKeyValueList(List<KeyValue<K, V>> keyValues);
    public void pipeValueList(List<V> values);

    // Batch input with timestamps
    public void pipeKeyValueList(List<KeyValue<K, V>> keyValues,
                                 Instant startTimestamp,
                                 Duration advance);
    public void pipeValueList(List<V> values,
                             Instant startTimestamp,
                             Duration advance);

    // Time management
    public void advanceTime(Duration advance);
}

Input Topic Examples

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.test.TestRecord;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

public class InputTopicExamples {
    @Test
    public void testSimpleInput() {
        // Simple value-only input (key will be null)
        inputTopic.pipeInput("value1");
        inputTopic.pipeInput("value2");

        // Key-value input
        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
    }

    @Test
    public void testInputWithTimestamps() {
        Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");

        // Input with explicit timestamp
        inputTopic.pipeInput("key1", "value1", baseTime);
        inputTopic.pipeInput("key2", "value2", baseTime.plusSeconds(10));

        // Input with millisecond timestamp
        inputTopic.pipeInput("key3", "value3", baseTime.toEpochMilli());
    }

    @Test
    public void testBatchInput() {
        // Batch of key-value pairs
        List<KeyValue<String, String>> keyValues = Arrays.asList(
            new KeyValue<>("key1", "value1"),
            new KeyValue<>("key2", "value2"),
            new KeyValue<>("key3", "value3")
        );
        inputTopic.pipeKeyValueList(keyValues);

        // Batch with timestamps
        Instant startTime = Instant.now();
        inputTopic.pipeKeyValueList(
            keyValues,
            startTime,
            Duration.ofSeconds(1)  // Each record 1 second apart
        );
    }

    @Test
    public void testInputWithAutoAdvance() {
        // Create topic with auto-advance enabled
        TestInputTopic<String, String> autoAdvanceTopic =
            testDriver.createInputTopic(
                "input-topic",
                Serdes.String().serializer(),
                Serdes.String().serializer(),
                Instant.parse("2024-01-01T00:00:00Z"),
                Duration.ofSeconds(1)  // Auto-advance by 1 second per record
            );

        // Each input automatically advances time by 1 second
        autoAdvanceTopic.pipeInput("key1", "value1");  // Time: 00:00:00
        autoAdvanceTopic.pipeInput("key2", "value2");  // Time: 00:00:01
        autoAdvanceTopic.pipeInput("key3", "value3");  // Time: 00:00:02
    }

    @Test
    public void testInputWithHeaders() {
        import org.apache.kafka.common.header.internals.RecordHeaders;

        RecordHeaders headers = new RecordHeaders();
        headers.add("correlation-id", "123".getBytes());
        headers.add("content-type", "application/json".getBytes());

        TestRecord<String, String> record = new TestRecord<>(
            "key1",
            "value1",
            headers,
            Instant.now()
        );

        inputTopic.pipeInput(record);
    }
}

TestOutputTopic

Utility for reading and verifying output records from topics.

package org.apache.kafka.streams;

import org.apache.kafka.streams.test.TestRecord;

import java.util.List;
import java.util.Map;

public class TestOutputTopic<K, V> {
    // Single record output
    public V readValue();
    public KeyValue<K, V> readKeyValue();
    public TestRecord<K, V> readRecord();

    // Batch output
    public List<V> readValuesToList();
    public List<KeyValue<K, V>> readKeyValuesToList();
    public List<TestRecord<K, V>> readRecordsToList();
    public Map<K, V> readKeyValuesToMap();

    // Topic status
    public long getQueueSize();
    public boolean isEmpty();
}

Output Topic Examples

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;

public class OutputTopicExamples {
    @Test
    public void testReadSingleRecords() {
        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");

        // Read individual records
        assertEquals("VALUE1", outputTopic.readValue());

        KeyValue<String, String> record = outputTopic.readKeyValue();
        assertEquals("key2", record.key);
        assertEquals("VALUE2", record.value);

        assertTrue(outputTopic.isEmpty());
    }

    @Test
    public void testReadWithHeaders() {
        inputTopic.pipeInput("key1", "value1");

        TestRecord<String, String> record = outputTopic.readRecord();
        assertEquals("key1", record.key());
        assertEquals("VALUE1", record.value());
        assertNotNull(record.headers());
        assertNotNull(record.timestamp());
    }

    @Test
    public void testReadBatch() {
        // Pipe multiple records
        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");

        // Read all at once
        List<KeyValue<String, String>> allRecords =
            outputTopic.readKeyValuesToList();

        assertEquals(3, allRecords.size());
        assertEquals("VALUE1", allRecords.get(0).value);
        assertEquals("VALUE2", allRecords.get(1).value);
        assertEquals("VALUE3", allRecords.get(2).value);
    }

    @Test
    public void testReadToMap() {
        // For table-like results (last update wins)
        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key1", "updated");  // Update key1

        Map<String, String> resultMap = outputTopic.readKeyValuesToMap();

        assertEquals(2, resultMap.size());
        assertEquals("UPDATED", resultMap.get("key1"));
        assertEquals("VALUE2", resultMap.get("key2"));
    }

    @Test
    public void testQueueStatus() {
        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");

        assertEquals(2, outputTopic.getQueueSize());
        assertFalse(outputTopic.isEmpty());

        outputTopic.readValue();
        assertEquals(1, outputTopic.getQueueSize());

        outputTopic.readValue();
        assertTrue(outputTopic.isEmpty());
    }
}

TestRecord

Test record wrapper containing key, value, headers, and timestamp.

package org.apache.kafka.streams.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;

import java.time.Instant;

public class TestRecord<K, V> {
    // Constructors
    public TestRecord(K key, V value);
    public TestRecord(V value);
    public TestRecord(K key, V value, Instant recordTime);
    public TestRecord(K key, V value, Headers headers);
    public TestRecord(K key, V value, Headers headers, Instant recordTime);
    public TestRecord(K key, V value, Headers headers, Long timestampMs);
    public TestRecord(ConsumerRecord<K, V> record);
    public TestRecord(ProducerRecord<K, V> record);

    // Accessors
    public K key();
    public V value();
    public Headers headers();
    public Long timestamp();
    public Instant getRecordTime();

    // Deprecated accessors (for compatibility)
    public K getKey();
    public V getValue();
    public Headers getHeaders();
}

TestRecord Examples

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.test.TestRecord;

import java.time.Instant;

public class TestRecordExamples {
    @Test
    public void testCreateTestRecords() {
        // Simple key-value
        TestRecord<String, String> record1 =
            new TestRecord<>("key1", "value1");

        // Value-only
        TestRecord<String, String> record2 =
            new TestRecord<>("value-only");

        // With timestamp
        TestRecord<String, String> record3 =
            new TestRecord<>("key3", "value3", Instant.now());

        // With headers
        RecordHeaders headers = new RecordHeaders();
        headers.add("trace-id", "abc123".getBytes());
        TestRecord<String, String> record4 =
            new TestRecord<>("key4", "value4", headers);

        // Complete record
        TestRecord<String, String> record5 = new TestRecord<>(
            "key5",
            "value5",
            headers,
            Instant.parse("2024-01-01T00:00:00Z")
        );
    }

    @Test
    public void testAccessRecordMetadata() {
        RecordHeaders headers = new RecordHeaders();
        headers.add("source", "test".getBytes());

        TestRecord<String, String> record = new TestRecord<>(
            "myKey",
            "myValue",
            headers,
            Instant.parse("2024-01-01T12:00:00Z")
        );

        assertEquals("myKey", record.key());
        assertEquals("myValue", record.value());
        assertEquals(1704110400000L, record.timestamp());
        assertEquals("test",
            new String(record.headers().lastHeader("source").value()));
    }
}

Testing Stateful Operations

Testing with State Stores

import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class StatefulTopologyTest {
    @Test
    public void testStatefulAggregation() {
        // Build topology with state store
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic")
               .groupByKey()
               .count(Materialized.as("count-store"))
               .toStream()
               .to("output-topic");

        Topology topology = builder.build();

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
            TestInputTopic<String, String> inputTopic =
                testDriver.createInputTopic(
                    "input-topic",
                    Serdes.String().serializer(),
                    Serdes.String().serializer()
                );

            TestOutputTopic<String, Long> outputTopic =
                testDriver.createOutputTopic(
                    "output-topic",
                    Serdes.String().deserializer(),
                    Serdes.Long().deserializer()
                );

            // Pipe input data
            inputTopic.pipeInput("key1", "value1");
            inputTopic.pipeInput("key1", "value2");
            inputTopic.pipeInput("key2", "value3");

            // Verify output
            assertEquals(1L, outputTopic.readValue());  // key1: 1
            assertEquals(2L, outputTopic.readValue());  // key1: 2
            assertEquals(1L, outputTopic.readValue());  // key2: 1

            // Access and verify state store
            KeyValueStore<String, Long> store =
                testDriver.getKeyValueStore("count-store");

            assertEquals(2L, store.get("key1"));
            assertEquals(1L, store.get("key2"));
            assertNull(store.get("key3"));
        }
    }
}

Testing with Pre-populated State

public class PrePopulatedStateTest {
    @Test
    public void testWithPrePopulatedState() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic")
               .groupByKey()
               .reduce((oldValue, newValue) -> oldValue + "," + newValue,
                      Materialized.as("reduce-store"))
               .toStream()
               .to("output-topic");

        Topology topology = builder.build();

        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
            // Pre-populate state store
            KeyValueStore<String, String> store =
                testDriver.getKeyValueStore("reduce-store");
            store.put("existing-key", "initial-value");

            TestInputTopic<String, String> inputTopic =
                testDriver.createInputTopic(
                    "input-topic",
                    Serdes.String().serializer(),
                    Serdes.String().serializer()
                );

            TestOutputTopic<String, String> outputTopic =
                testDriver.createOutputTopic(
                    "output-topic",
                    Serdes.String().deserializer(),
                    Serdes.String().deserializer()
                );

            // Send update for existing key
            inputTopic.pipeInput("existing-key", "new-value");

            // Verify combined result
            assertEquals("initial-value,new-value", outputTopic.readValue());
        }
    }
}

Testing Windowed Operations

Time Window Testing

import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;
import java.time.Instant;

public class WindowedTopologyTest {
    @Test
    public void testTumblingWindowAggregation() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic")
               .groupByKey()
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
               .count()
               .toStream()
               .to("output-topic");

        Topology topology = builder.build();
        Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");

        try (TopologyTestDriver testDriver =
                new TopologyTestDriver(topology, config, baseTime)) {

            TestInputTopic<String, String> inputTopic =
                testDriver.createInputTopic(
                    "input-topic",
                    Serdes.String().serializer(),
                    Serdes.String().serializer()
                );

            TestOutputTopic<Windowed<String>, Long> outputTopic =
                testDriver.createOutputTopic(
                    "output-topic",
                    new WindowedSerdes.TimeWindowedSerde<>(Serdes.String())
                        .deserializer(),
                    Serdes.Long().deserializer()
                );

            // First window: 00:00:00 - 00:05:00
            inputTopic.pipeInput("key1", "value1", baseTime);
            inputTopic.pipeInput("key1", "value2", baseTime.plusSeconds(60));

            // Second window: 00:05:00 - 00:10:00
            inputTopic.pipeInput("key1", "value3", baseTime.plusSeconds(300));

            // Verify first window results
            TestRecord<Windowed<String>, Long> record1 = outputTopic.readRecord();
            assertEquals("key1", record1.key().key());
            assertEquals(1L, record1.value());

            TestRecord<Windowed<String>, Long> record2 = outputTopic.readRecord();
            assertEquals("key1", record2.key().key());
            assertEquals(2L, record2.value());

            // Verify second window result
            TestRecord<Windowed<String>, Long> record3 = outputTopic.readRecord();
            assertEquals("key1", record3.key().key());
            assertEquals(1L, record3.value());
        }
    }
}

Advancing Wall Clock Time

public class WallClockTimeTest {
    @Test
    public void testWallClockTimePunctuation() {
        StreamsBuilder builder = new StreamsBuilder();

        // Add processor that uses wall-clock time punctuation
        builder.stream("input-topic")
               .process(() -> new WallClockProcessor())
               .to("output-topic");

        Topology topology = builder.build();
        Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");

        try (TopologyTestDriver testDriver =
                new TopologyTestDriver(topology, config, baseTime)) {

            TestInputTopic<String, String> inputTopic =
                testDriver.createInputTopic(
                    "input-topic",
                    Serdes.String().serializer(),
                    Serdes.String().serializer()
                );

            TestOutputTopic<String, String> outputTopic =
                testDriver.createOutputTopic(
                    "output-topic",
                    Serdes.String().deserializer(),
                    Serdes.String().deserializer()
                );

            // Input triggers stream-time punctuation automatically
            inputTopic.pipeInput("key1", "value1", baseTime);

            // Wall-clock punctuation requires manual time advancement
            testDriver.advanceWallClockTime(Duration.ofSeconds(30));

            // Verify punctuation was triggered
            assertFalse(outputTopic.isEmpty());
        }
    }
}

MockProcessorContext (Legacy API)

Mock processor context for testing custom processors (deprecated).

package org.apache.kafka.streams.processor;

import org.apache.kafka.common.header.Headers;

import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

@Deprecated
public class MockProcessorContext implements ProcessorContext {
    // Constructors
    public MockProcessorContext();
    public MockProcessorContext(Properties config);
    public MockProcessorContext(Properties config, TaskId taskId, File stateDir);

    // Record metadata setters
    public void setRecordMetadata(String topic, int partition, long offset,
                                 Headers headers, long timestamp);
    public void setTopic(String topic);
    public void setPartition(int partition);
    public void setOffset(long offset);
    public void setHeaders(Headers headers);
    public void setRecordTimestamp(long recordTimestamp);
    public void setCurrentSystemTimeMs(long currentSystemTimeMs);
    public void setCurrentStreamTimeMs(long currentStreamTimeMs);

    // Mock inspection
    public List<CapturedForward> forwarded();
    public List<CapturedForward> forwarded(String childName);
    public void resetForwards();
    public List<CapturedPunctuator> scheduledPunctuators();
    public boolean committed();
    public void resetCommit();
}

MockProcessorContext (New API)

Mock processor context for testing custom processors with the new API.

package org.apache.kafka.streams.processor.api;

import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;

import java.io.File;
import java.util.List;
import java.util.Properties;

public class MockProcessorContext<KForward, VForward>
        implements ProcessorContext<KForward, VForward> {

    // Constructors
    public MockProcessorContext();
    public MockProcessorContext(Properties config);
    public MockProcessorContext(Properties config, TaskId taskId, File stateDir);

    // Record metadata setters
    public void setRecordMetadata(String topic, int partition, long offset);
    public void setCurrentSystemTimeMs(long currentSystemTimeMs);
    public void setCurrentStreamTimeMs(long currentStreamTimeMs);

    // State store management
    public <S extends StateStore> void addStateStore(S stateStore);
    public StateStoreContext getStateStoreContext();

    // Mock inspection
    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded();
    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded(String childName);
    public void resetForwards();
    public List<CapturedPunctuator> scheduledPunctuators();
    public boolean committed();
    public void resetCommit();
}

Testing Custom Processors

import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;

public class CustomProcessorTest {
    @Test
    public void testCustomProcessor() {
        // Create mock context
        MockProcessorContext<String, String> context = new MockProcessorContext<>();
        context.setCurrentStreamTimeMs(1000L);
        context.setCurrentSystemTimeMs(2000L);
        context.setRecordMetadata("test-topic", 0, 10L);

        // Create and initialize processor
        Processor<String, String, String, String> processor =
            new MyCustomProcessor();
        processor.init(context);

        // Process records
        Record<String, String> inputRecord =
            new Record<>("key1", "value1", 1000L);
        processor.process(inputRecord);

        // Verify forwarded records
        List<MockProcessorContext.CapturedForward<String, String>> forwarded =
            context.forwarded();
        assertEquals(1, forwarded.size());

        Record<String, String> outputRecord = forwarded.get(0).record();
        assertEquals("key1", outputRecord.key());
        assertEquals("PROCESSED:value1", outputRecord.value());

        // Verify punctuators were scheduled
        List<MockProcessorContext.CapturedPunctuator> punctuators =
            context.scheduledPunctuators();
        assertEquals(1, punctuators.size());
        assertEquals(Duration.ofSeconds(10), punctuators.get(0).getInterval());

        processor.close();
    }
}

Testing Processor with State Store

import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class ProcessorWithStateTest {
    @Test
    public void testProcessorWithState() {
        MockProcessorContext<String, String> context = new MockProcessorContext<>();

        // Create and add in-memory state store
        KeyValueStore<String, Long> store =
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("my-store"),
                Serdes.String(),
                Serdes.Long()
            ).withLoggingDisabled()  // Disable changelog for testing
             .build();

        store.init(context.getStateStoreContext(), store);
        context.addStateStore(store);

        // Create processor that uses the store
        Processor<String, String, String, Long> processor =
            new StatefulProcessor();
        processor.init(context);

        // Process records
        processor.process(new Record<>("key1", "value1", 1000L));
        processor.process(new Record<>("key1", "value2", 2000L));
        processor.process(new Record<>("key2", "value3", 3000L));

        // Verify state store contents
        assertEquals(2L, store.get("key1"));
        assertEquals(1L, store.get("key2"));

        // Verify forwarded records
        List<MockProcessorContext.CapturedForward<String, Long>> forwarded =
            context.forwarded();
        assertEquals(3, forwarded.size());

        processor.close();
        store.close();
    }
}

Complete Test Example

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.KeyValueStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.*;

public class CompleteStreamProcessingTest {
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, Long> outputTopic;

    @BeforeEach
    public void setup() {
        // Build topology: word count with filtering
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream(
            "input-topic",
            Consumed.with(Serdes.String(), Serdes.String())
        );

        source
            .filter((key, value) -> value != null && !value.isEmpty())
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\s+")))
            .filter((key, word) -> word.length() > 3)
            .groupBy((key, word) -> word)
            .count(Materialized.as("word-count-store"))
            .toStream()
            .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();

        // Configure test driver
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                   Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                   Serdes.String().getClass().getName());

        testDriver = new TopologyTestDriver(
            topology,
            config,
            Instant.parse("2024-01-01T00:00:00Z")
        );

        // Create test topics
        inputTopic = testDriver.createInputTopic(
            "input-topic",
            Serdes.String().serializer(),
            Serdes.String().serializer()
        );

        outputTopic = testDriver.createOutputTopic(
            "output-topic",
            Serdes.String().deserializer(),
            Serdes.Long().deserializer()
        );
    }

    @AfterEach
    public void tearDown() {
        if (testDriver != null) {
            testDriver.close();
        }
    }

    @Test
    public void testWordCountBasic() {
        // Input test data
        inputTopic.pipeInput(null, "hello world kafka streams");
        inputTopic.pipeInput(null, "kafka streams testing");

        // Read and verify output
        List<KeyValue<String, Long>> results = outputTopic.readKeyValuesToList();

        // Expected: hello(1), world(1), kafka(1), streams(1), kafka(2), streams(2), testing(1)
        assertEquals(7, results.size());

        // Verify state store
        KeyValueStore<String, Long> store =
            testDriver.getKeyValueStore("word-count-store");

        assertEquals(2L, store.get("kafka"));
        assertEquals(2L, store.get("streams"));
        assertEquals(1L, store.get("hello"));
        assertEquals(1L, store.get("world"));
        assertEquals(1L, store.get("testing"));
    }

    @Test
    public void testWordCountFiltering() {
        // Test filtering of short words and empty strings
        inputTopic.pipeInput(null, "a an the hello");
        inputTopic.pipeInput(null, "");
        inputTopic.pipeInput(null, "test ok yes no");

        // Only words longer than 3 characters should be counted
        List<KeyValue<String, Long>> results = outputTopic.readKeyValuesToList();

        // Expected: hello(1), test(1)
        assertEquals(2, results.size());
        assertTrue(results.stream().anyMatch(kv ->
            kv.key.equals("hello") && kv.value == 1L));
        assertTrue(results.stream().anyMatch(kv ->
            kv.key.equals("test") && kv.value == 1L));
    }

    @Test
    public void testWordCountWithTimestamps() {
        Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");

        // Input with explicit timestamps
        inputTopic.pipeInput(null, "hello world", baseTime);
        inputTopic.pipeInput(null, "kafka streams", baseTime.plusSeconds(10));
        inputTopic.pipeInput(null, "hello kafka", baseTime.plusSeconds(20));

        // Verify outputs were generated
        List<TestRecord<String, Long>> records = outputTopic.readRecordsToList();

        assertEquals(6, records.size());

        // Verify timestamps are preserved
        assertTrue(records.stream().allMatch(r -> r.timestamp() != null));
    }

    @Test
    public void testEmptyInput() {
        // Verify topology handles empty input gracefully
        assertTrue(outputTopic.isEmpty());

        KeyValueStore<String, Long> store =
            testDriver.getKeyValueStore("word-count-store");
        assertEquals(0L, store.approximateNumEntries());
    }
}

Best Practices

Test Configuration

public class TestConfiguration {
    public static Properties createTestConfig() {
        Properties config = new Properties();

        // Required properties
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app-id");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        // Default serdes
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                   Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                   Serdes.String().getClass().getName());

        // Disable caching for deterministic testing
        config.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);

        // Disable commit interval (test driver commits after each record)
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);

        return config;
    }
}

Resource Management

public class ResourceManagementTest {
    @Test
    public void testWithTryWithResources() {
        Properties config = createTestConfig();

        // Use try-with-resources for automatic cleanup
        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
            // Test code here
            TestInputTopic<String, String> inputTopic =
                testDriver.createInputTopic("input",
                    Serdes.String().serializer(),
                    Serdes.String().serializer());

            inputTopic.pipeInput("key", "value");

            // No need to explicitly close testDriver
        }
    }
}

Assertions

import static org.assertj.core.api.Assertions.assertThat;

public class AssertionExamples {
    @Test
    public void testWithAssertJ() {
        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");

        List<KeyValue<String, String>> results =
            outputTopic.readKeyValuesToList();

        assertThat(results)
            .hasSize(2)
            .extracting(KeyValue::key)
            .containsExactly("key1", "key2");

        assertThat(outputTopic.isEmpty()).isTrue();
    }
}

Testing Error Handling

public class ErrorHandlingTest {
    @Test
    public void testDeserializationError() {
        // Configure with error handling
        Properties config = createTestConfig();
        config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                   LogAndContinueExceptionHandler.class.getName());

        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
            TestInputTopic<byte[], byte[]> inputTopic =
                testDriver.createInputTopic("input",
                    new ByteArraySerializer(),
                    new ByteArraySerializer());

            // Send invalid data
            inputTopic.pipeInput(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});

            // Verify error was handled gracefully
            TestOutputTopic<String, String> outputTopic =
                testDriver.createOutputTopic("output",
                    Serdes.String().deserializer(),
                    Serdes.String().deserializer());

            assertTrue(outputTopic.isEmpty());
        }
    }
}

Testing Best Practices

Test Organization

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import org.junit.jupiter.api.*;
import static org.junit.jupiter.api.Assertions.*;

public class ComprehensiveStreamsTest {
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;
    
    @BeforeEach
    public void setup() {
        // Create topology
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic")
            .filter((key, value) -> value != null && !value.isEmpty())
            .mapValues(String::toUpperCase)
            .to("output-topic");
        
        // Configure test driver
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        
        testDriver = new TopologyTestDriver(builder.build(), props);
        
        inputTopic = testDriver.createInputTopic(
            "input-topic",
            Serdes.String().serializer(),
            Serdes.String().serializer()
        );
        
        outputTopic = testDriver.createOutputTopic(
            "output-topic",
            Serdes.String().deserializer(),
            Serdes.String().deserializer()
        );
    }
    
    @AfterEach
    public void tearDown() {
        testDriver.close();
    }
    
    @Test
    public void testNormalProcessing() {
        inputTopic.pipeInput("key1", "hello");
        assertEquals("HELLO", outputTopic.readValue());
    }
    
    @Test
    public void testNullValue() {
        inputTopic.pipeInput("key1", null);
        assertTrue(outputTopic.isEmpty()); // Filtered out
    }
    
    @Test
    public void testEmptyValue() {
        inputTopic.pipeInput("key1", "");
        assertTrue(outputTopic.isEmpty()); // Filtered out
    }
    
    @Test
    public void testMultipleRecords() {
        inputTopic.pipeInput("key1", "hello");
        inputTopic.pipeInput("key2", "world");
        
        assertEquals("HELLO", outputTopic.readValue());
        assertEquals("WORLD", outputTopic.readValue());
        assertTrue(outputTopic.isEmpty());
    }
}

Testing Edge Cases

Testing State Store Behavior

import org.apache.kafka.streams.state.*;

@Test
public void testStateStoreAggregation() {
    // Build topology with state store
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input-topic")
        .groupByKey()
        .count(Materialized.as("counts-store"))
        .toStream()
        .to("output-topic");
    
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    
    try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props)) {
        TestInputTopic<String, String> inputTopic = testDriver.createInputTopic(
            "input-topic",
            Serdes.String().serializer(),
            Serdes.String().serializer()
        );
        
        // Send test data
        inputTopic.pipeInput("user1", "event1");
        inputTopic.pipeInput("user1", "event2");
        inputTopic.pipeInput("user2", "event1");
        
        // Query state store directly
        KeyValueStore<String, Long> store = testDriver.getKeyValueStore("counts-store");
        
        assertEquals(2L, store.get("user1"));
        assertEquals(1L, store.get("user2"));
        assertNull(store.get("user3")); // Non-existent key
        
        // Verify output
        TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic(
            "output-topic",
            Serdes.String().deserializer(),
            Serdes.Long().deserializer()
        );
        
        assertEquals(1L, outputTopic.readValue()); // user1 first count
        assertEquals(2L, outputTopic.readValue()); // user1 updated count
        assertEquals(1L, outputTopic.readValue()); // user2 count
    }
}

Testing Time-Based Operations

import java.time.Duration;
import java.time.Instant;

@Test
public void testWindowedAggregation() {
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input-topic")
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        .count()
        .toStream()
        .to("output-topic");
    
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    
    Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");
    
    try (TopologyTestDriver testDriver = new TopologyTestDriver(
            builder.build(), props, baseTime)) {
        
        TestInputTopic<String, String> inputTopic = testDriver.createInputTopic(
            "input-topic",
            Serdes.String().serializer(),
            Serdes.String().serializer(),
            baseTime,
            Duration.ZERO // No auto-advance
        );
        
        // Send records in different windows
        inputTopic.pipeInput("key1", "value1", baseTime);
        inputTopic.pipeInput("key1", "value2", baseTime.plusSeconds(60));
        inputTopic.pipeInput("key1", "value3", baseTime.plusSeconds(360)); // Different window
        
        // Advance time to close windows
        testDriver.advanceWallClockTime(Duration.ofMinutes(10));
        
        // Verify windowed counts
        TestOutputTopic<Windowed<String>, Long> outputTopic = 
            testDriver.createOutputTopic(
                "output-topic",
                WindowedSerdes.timeWindowedSerdeFrom(String.class).deserializer(),
                Serdes.Long().deserializer()
            );
        
        // First window: 2 records
        assertEquals(2L, outputTopic.readValue());
        // Second window: 1 record
        assertEquals(1L, outputTopic.readValue());
    }
}

Testing Error Scenarios

@Test
public void testProcessorException() {
    // Topology with processor that may throw exceptions
    Topology topology = new Topology();
    topology.addSource("Source", "input-topic");
    topology.addProcessor("Process", () -> new Processor<String, String, String, String>() {
        private ProcessorContext<String, String> context;
        
        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }
        
        @Override
        public void process(Record<String, String> record) {
            if (record.value().equals("error")) {
                throw new RuntimeException("Simulated error");
            }
            context.forward(record);
        }
    }, "Source");
    topology.addSink("Sink", "output-topic", "Process");
    
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    
    try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
        TestInputTopic<String, String> inputTopic = testDriver.createInputTopic(
            "input-topic",
            Serdes.String().serializer(),
            Serdes.String().serializer()
        );
        
        // Test normal processing
        inputTopic.pipeInput("key1", "normal");
        
        // Test error case
        assertThrows(RuntimeException.class, () -> {
            inputTopic.pipeInput("key2", "error");
        });
    }
}