tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
Kafka Streams provides testing utilities (shipped in the kafka-streams-test-utils artifact) for unit and integration testing of stream processing topologies without requiring a running Kafka broker.
Main testing utility for driving topology execution synchronously in tests.
package org.apache.kafka.streams;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import java.io.Closeable;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class TopologyTestDriver implements Closeable {
// Constructors
public TopologyTestDriver(Topology topology);
public TopologyTestDriver(Topology topology, Properties config);
public TopologyTestDriver(Topology topology, Instant initialWallClockTime);
public TopologyTestDriver(Topology topology,
Properties config,
Instant initialWallClockTime);
// Input/Output topic creation
public <K, V> TestInputTopic<K, V> createInputTopic(String topicName,
Serializer<K> keySerializer,
Serializer<V> valueSerializer);
public <K, V> TestInputTopic<K, V> createInputTopic(String topicName,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
Instant startTimestamp,
Duration autoAdvance);
public <K, V> TestOutputTopic<K, V> createOutputTopic(String topicName,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer);
// Time control
public void advanceWallClockTime(Duration advance);
// State store access
public Map<String, StateStore> getAllStateStores();
public StateStore getStateStore(String name);
public <K, V> KeyValueStore<K, V> getKeyValueStore(String name);
public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(String name);
public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(String name);
public <K, V> WindowStore<K, V> getWindowStore(String name);
public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(String name);
public <K, V> SessionStore<K, V> getSessionStore(String name);
// Metadata
public Set<String> producedTopicNames();
public Map<MetricName, ? extends Metric> metrics();
// Resource cleanup
public void close();
}

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.*;
public class StreamsTopologyTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, String> outputTopic;
@BeforeEach
public void setup() {
// Build the topology
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.mapValues(value -> value.toUpperCase())
.to("output-topic");
Topology topology = builder.build();
// Configure the test driver
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
testDriver = new TopologyTestDriver(topology, config);
// Create input and output topics
inputTopic = testDriver.createInputTopic(
"input-topic",
Serdes.String().serializer(),
Serdes.String().serializer()
);
outputTopic = testDriver.createOutputTopic(
"output-topic",
Serdes.String().deserializer(),
Serdes.String().deserializer()
);
}
@AfterEach
public void tearDown() {
testDriver.close();
}
@Test
public void testUpperCase() {
// Pipe input
inputTopic.pipeInput("key1", "hello");
inputTopic.pipeInput("key2", "world");
// Read and verify output
assertEquals("HELLO", outputTopic.readValue());
assertEquals("WORLD", outputTopic.readValue());
// Verify no more output
assertTrue(outputTopic.isEmpty());
}
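
    // Additional illustrative test (not part of the original example): the driver can
    // also report which topics the topology produced to and expose its metrics.
    @Test
    public void testProducedTopicsAndMetrics() {
        inputTopic.pipeInput("key1", "hello");
        // The sink topic shows up once a record has been written to it
        assertTrue(testDriver.producedTopicNames().contains("output-topic"));
        // Metrics are available for inspection, e.g. to assert on processing behavior
        assertFalse(testDriver.metrics().isEmpty());
    }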
}

import java.time.Instant;
public class TimeBasedTest {
@Test
public void testWithCustomInitialTime() {
Instant initialTime = Instant.parse("2024-01-01T00:00:00Z");
TopologyTestDriver testDriver = new TopologyTestDriver(
topology,
config,
initialTime
);
// Test time-dependent operations
}
}

Utility for piping test records to input topics.
package org.apache.kafka.streams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.test.TestRecord;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
public class TestInputTopic<K, V> {
// Single record input
public void pipeInput(V value);
public void pipeInput(K key, V value);
public void pipeInput(TestRecord<K, V> record);
// Single record input with explicit timestamp
public void pipeInput(V value, Instant timestamp);
public void pipeInput(K key, V value, long timestampMs);
public void pipeInput(K key, V value, Instant timestamp);
// Batch input
public void pipeRecordList(List<? extends TestRecord<K, V>> records);
public void pipeKeyValueList(List<KeyValue<K, V>> keyValues);
public void pipeValueList(List<V> values);
// Batch input with timestamps
public void pipeKeyValueList(List<KeyValue<K, V>> keyValues,
Instant startTimestamp,
Duration advance);
public void pipeValueList(List<V> values,
Instant startTimestamp,
Duration advance);
// Time management
public void advanceTime(Duration advance);
}

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.test.TestRecord;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
public class InputTopicExamples {
@Test
public void testSimpleInput() {
// Simple value-only input (key will be null)
inputTopic.pipeInput("value1");
inputTopic.pipeInput("value2");
// Key-value input
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key2", "value2");
}
@Test
public void testInputWithTimestamps() {
Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");
// Input with explicit timestamp
inputTopic.pipeInput("key1", "value1", baseTime);
inputTopic.pipeInput("key2", "value2", baseTime.plusSeconds(10));
// Input with millisecond timestamp
inputTopic.pipeInput("key3", "value3", baseTime.toEpochMilli());
}
@Test
public void testBatchInput() {
// Batch of key-value pairs
List<KeyValue<String, String>> keyValues = Arrays.asList(
new KeyValue<>("key1", "value1"),
new KeyValue<>("key2", "value2"),
new KeyValue<>("key3", "value3")
);
inputTopic.pipeKeyValueList(keyValues);
// Batch with timestamps
Instant startTime = Instant.now();
inputTopic.pipeKeyValueList(
keyValues,
startTime,
Duration.ofSeconds(1) // Each record 1 second apart
);
}
@Test
public void testInputWithAutoAdvance() {
// Create topic with auto-advance enabled
TestInputTopic<String, String> autoAdvanceTopic =
testDriver.createInputTopic(
"input-topic",
Serdes.String().serializer(),
Serdes.String().serializer(),
Instant.parse("2024-01-01T00:00:00Z"),
Duration.ofSeconds(1) // Auto-advance by 1 second per record
);
// Each input automatically advances time by 1 second
autoAdvanceTopic.pipeInput("key1", "value1"); // Time: 00:00:00
autoAdvanceTopic.pipeInput("key2", "value2"); // Time: 00:00:01
autoAdvanceTopic.pipeInput("key3", "value3"); // Time: 00:00:02
}
@Test
public void testInputWithHeaders() {
RecordHeaders headers = new RecordHeaders();
headers.add("correlation-id", "123".getBytes());
headers.add("content-type", "application/json".getBytes());
TestRecord<String, String> record = new TestRecord<>(
"key1",
"value1",
headers,
Instant.now()
);
inputTopic.pipeInput(record);
}
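
    // Additional illustrative test (not in the original): batch input of bare values and
    // of complete TestRecord objects via pipeValueList / pipeRecordList.
    @Test
    public void testValueAndRecordBatches() {
        // Values only; the keys of these records will be null
        inputTopic.pipeValueList(Arrays.asList("value1", "value2", "value3"));
        // Full records allow per-record timestamps (and headers if needed)
        List<TestRecord<String, String>> records = Arrays.asList(
            new TestRecord<>("key1", "value1", Instant.parse("2024-01-01T00:00:00Z")),
            new TestRecord<>("key2", "value2", Instant.parse("2024-01-01T00:00:05Z"))
        );
        inputTopic.pipeRecordList(records);
    }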
}

Utility for reading and verifying output records from topics.
package org.apache.kafka.streams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.test.TestRecord;
import java.util.List;
import java.util.Map;
public class TestOutputTopic<K, V> {
// Single record output
public V readValue();
public KeyValue<K, V> readKeyValue();
public TestRecord<K, V> readRecord();
// Batch output
public List<V> readValuesToList();
public List<KeyValue<K, V>> readKeyValuesToList();
public List<TestRecord<K, V>> readRecordsToList();
public Map<K, V> readKeyValuesToMap();
// Topic status
public long getQueueSize();
public boolean isEmpty();
}

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
public class OutputTopicExamples {
@Test
public void testReadSingleRecords() {
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key2", "value2");
// Read individual records
assertEquals("VALUE1", outputTopic.readValue());
KeyValue<String, String> record = outputTopic.readKeyValue();
assertEquals("key2", record.key);
assertEquals("VALUE2", record.value);
assertTrue(outputTopic.isEmpty());
}
@Test
public void testReadWithHeaders() {
inputTopic.pipeInput("key1", "value1");
TestRecord<String, String> record = outputTopic.readRecord();
assertEquals("key1", record.key());
assertEquals("VALUE1", record.value());
assertNotNull(record.headers());
assertNotNull(record.timestamp());
}
@Test
public void testReadBatch() {
// Pipe multiple records
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key2", "value2");
inputTopic.pipeInput("key3", "value3");
// Read all at once
List<KeyValue<String, String>> allRecords =
outputTopic.readKeyValuesToList();
assertEquals(3, allRecords.size());
assertEquals("VALUE1", allRecords.get(0).value);
assertEquals("VALUE2", allRecords.get(1).value);
assertEquals("VALUE3", allRecords.get(2).value);
}
@Test
public void testReadToMap() {
// For table-like results (last update wins)
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key2", "value2");
inputTopic.pipeInput("key1", "updated"); // Update key1
Map<String, String> resultMap = outputTopic.readKeyValuesToMap();
assertEquals(2, resultMap.size());
assertEquals("UPDATED", resultMap.get("key1"));
assertEquals("VALUE2", resultMap.get("key2"));
}
@Test
public void testQueueStatus() {
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key2", "value2");
assertEquals(2, outputTopic.getQueueSize());
assertFalse(outputTopic.isEmpty());
outputTopic.readValue();
assertEquals(1, outputTopic.getQueueSize());
outputTopic.readValue();
assertTrue(outputTopic.isEmpty());
}
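
    // Additional illustrative test (not in the original): draining all remaining records
    // at once as plain values with readValuesToList().
    @Test
    public void testReadAllValues() {
        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        List<String> values = outputTopic.readValuesToList();
        assertEquals(2, values.size());
        assertEquals("VALUE1", values.get(0));
        assertEquals("VALUE2", values.get(1));
        assertTrue(outputTopic.isEmpty());
    }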
}

Test record wrapper containing key, value, headers, and timestamp.
package org.apache.kafka.streams.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import java.time.Instant;
public class TestRecord<K, V> {
// Constructors
public TestRecord(K key, V value);
public TestRecord(V value);
public TestRecord(K key, V value, Instant recordTime);
public TestRecord(K key, V value, Headers headers);
public TestRecord(K key, V value, Headers headers, Instant recordTime);
public TestRecord(K key, V value, Headers headers, Long timestampMs);
public TestRecord(ConsumerRecord<K, V> record);
public TestRecord(ProducerRecord<K, V> record);
// Accessors
public K key();
public V value();
public Headers headers();
public Long timestamp();
public Instant getRecordTime();
// Deprecated accessors (for compatibility)
public K getKey();
public V getValue();
public Headers getHeaders();
}

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.test.TestRecord;
import java.time.Instant;
public class TestRecordExamples {
@Test
public void testCreateTestRecords() {
// Simple key-value
TestRecord<String, String> record1 =
new TestRecord<>("key1", "value1");
// Value-only
TestRecord<String, String> record2 =
new TestRecord<>("value-only");
// With timestamp
TestRecord<String, String> record3 =
new TestRecord<>("key3", "value3", Instant.now());
// With headers
RecordHeaders headers = new RecordHeaders();
headers.add("trace-id", "abc123".getBytes());
TestRecord<String, String> record4 =
new TestRecord<>("key4", "value4", headers);
// Complete record
TestRecord<String, String> record5 = new TestRecord<>(
"key5",
"value5",
headers,
Instant.parse("2024-01-01T00:00:00Z")
);
}
@Test
public void testAccessRecordMetadata() {
RecordHeaders headers = new RecordHeaders();
headers.add("source", "test".getBytes());
TestRecord<String, String> record = new TestRecord<>(
"myKey",
"myValue",
headers,
Instant.parse("2024-01-01T12:00:00Z")
);
assertEquals("myKey", record.key());
assertEquals("myValue", record.value());
assertEquals(1704110400000L, record.timestamp());
assertEquals("test",
new String(record.headers().lastHeader("source").value()));
}
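
    // Additional sketch (not in the original): a TestRecord can also be built from an
    // existing ProducerRecord or ConsumerRecord, which is handy when reusing fixtures
    // (assumes org.apache.kafka.clients.producer.ProducerRecord is imported).
    @Test
    public void testCreateFromProducerRecord() {
        ProducerRecord<String, String> producerRecord =
            new ProducerRecord<>("some-topic", "key1", "value1");
        TestRecord<String, String> record = new TestRecord<>(producerRecord);
        assertEquals("key1", record.key());
        assertEquals("value1", record.value());
    }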
}

import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
public class StatefulTopologyTest {
@Test
public void testStatefulAggregation() {
// Build topology with state store
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
.count(Materialized.as("count-store"))
.toStream()
.to("output-topic");
Topology topology = builder.build();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic(
"input-topic",
Serdes.String().serializer(),
Serdes.String().serializer()
);
TestOutputTopic<String, Long> outputTopic =
testDriver.createOutputTopic(
"output-topic",
Serdes.String().deserializer(),
Serdes.Long().deserializer()
);
// Pipe input data
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key1", "value2");
inputTopic.pipeInput("key2", "value3");
// Verify output
assertEquals(1L, outputTopic.readValue()); // key1: 1
assertEquals(2L, outputTopic.readValue()); // key1: 2
assertEquals(1L, outputTopic.readValue()); // key2: 1
// Access and verify state store
KeyValueStore<String, Long> store =
testDriver.getKeyValueStore("count-store");
assertEquals(2L, store.get("key1"));
assertEquals(1L, store.get("key2"));
assertNull(store.get("key3"));
}
}
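
    // Additional sketch (not part of the original): count() is backed by a timestamped
    // store, so the same data can also be read via getTimestampedKeyValueStore(), which
    // exposes the timestamp of the latest update alongside the value
    // (assumes org.apache.kafka.streams.state.ValueAndTimestamp is imported).
    @Test
    public void testTimestampedStoreAccess() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic")
               .groupByKey()
               .count(Materialized.as("count-store"))
               .toStream()
               .to("output-topic");
        Topology topology = builder.build();
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                   Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                   Serdes.String().getClass().getName());
        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
            TestInputTopic<String, String> inputTopic = testDriver.createInputTopic(
                "input-topic",
                Serdes.String().serializer(),
                Serdes.String().serializer()
            );
            inputTopic.pipeInput("key1", "value1");
            inputTopic.pipeInput("key1", "value2");
            KeyValueStore<String, ValueAndTimestamp<Long>> store =
                testDriver.getTimestampedKeyValueStore("count-store");
            assertEquals(2L, store.get("key1").value());
            // The stored timestamp is that of the latest contributing record
            assertTrue(store.get("key1").timestamp() >= 0L);
        }
    }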
}

public class PrePopulatedStateTest {
@Test
public void testWithPrePopulatedState() {
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
.reduce((oldValue, newValue) -> oldValue + "," + newValue,
Materialized.as("reduce-store"))
.toStream()
.to("output-topic");
Topology topology = builder.build();
try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
// Pre-populate state store
KeyValueStore<String, String> store =
testDriver.getKeyValueStore("reduce-store");
store.put("existing-key", "initial-value");
TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic(
"input-topic",
Serdes.String().serializer(),
Serdes.String().serializer()
);
TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(
"output-topic",
Serdes.String().deserializer(),
Serdes.String().deserializer()
);
// Send update for existing key
inputTopic.pipeInput("existing-key", "new-value");
// Verify combined result
assertEquals("initial-value,new-value", outputTopic.readValue());
}
}
}

import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;
import java.time.Instant;
public class WindowedTopologyTest {
@Test
public void testTumblingWindowAggregation() {
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count()
.toStream()
.to("output-topic");
Topology topology = builder.build();
Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");
try (TopologyTestDriver testDriver =
new TopologyTestDriver(topology, config, baseTime)) {
TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic(
"input-topic",
Serdes.String().serializer(),
Serdes.String().serializer()
);
TestOutputTopic<Windowed<String>, Long> outputTopic =
testDriver.createOutputTopic(
"output-topic",
WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofMinutes(5).toMillis())
    .deserializer(),
Serdes.Long().deserializer()
);
// First window: 00:00:00 - 00:05:00
inputTopic.pipeInput("key1", "value1", baseTime);
inputTopic.pipeInput("key1", "value2", baseTime.plusSeconds(60));
// Second window: 00:05:00 - 00:10:00
inputTopic.pipeInput("key1", "value3", baseTime.plusSeconds(300));
// Verify first window results
TestRecord<Windowed<String>, Long> record1 = outputTopic.readRecord();
assertEquals("key1", record1.key().key());
assertEquals(1L, record1.value());
TestRecord<Windowed<String>, Long> record2 = outputTopic.readRecord();
assertEquals("key1", record2.key().key());
assertEquals(2L, record2.value());
// Verify second window result
TestRecord<Windowed<String>, Long> record3 = outputTopic.readRecord();
assertEquals("key1", record3.key().key());
assertEquals(1L, record3.value());
}
}
}

public class WallClockTimeTest {
@Test
public void testWallClockTimePunctuation() {
StreamsBuilder builder = new StreamsBuilder();
// Add processor that uses wall-clock time punctuation
builder.stream("input-topic")
.process(() -> new WallClockProcessor())
.to("output-topic");
Topology topology = builder.build();
Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");
try (TopologyTestDriver testDriver =
new TopologyTestDriver(topology, config, baseTime)) {
TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic(
"input-topic",
Serdes.String().serializer(),
Serdes.String().serializer()
);
TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(
"output-topic",
Serdes.String().deserializer(),
Serdes.String().deserializer()
);
// Piping input advances stream time, but does not trigger wall-clock punctuation
inputTopic.pipeInput("key1", "value1", baseTime);
// Wall-clock punctuation requires manual time advancement
testDriver.advanceWallClockTime(Duration.ofSeconds(30));
// Verify punctuation was triggered
assertFalse(outputTopic.isEmpty());
}
}
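
    // WallClockProcessor is referenced above but not defined in the original. A minimal
    // sketch consistent with the test could look like this: it emits nothing in process()
    // and forwards a marker record from a 30-second wall-clock punctuator (assumes the
    // org.apache.kafka.streams.processor.api types and PunctuationType are imported).
    static class WallClockProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
            // Fires whenever the driver's wall-clock time advances past the interval
            context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context.forward(new Record<>("punctuate", "tick", timestamp)));
        }

        @Override
        public void process(Record<String, String> record) {
            // Intentionally no output here; the surrounding test only expects output
            // after advanceWallClockTime() triggers the punctuator
        }
    }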
}

Mock processor context for testing custom processors written against the old Processor API (this class is deprecated in favor of the org.apache.kafka.streams.processor.api variant).
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.header.Headers;
import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
@Deprecated
public class MockProcessorContext implements ProcessorContext {
// Constructors
public MockProcessorContext();
public MockProcessorContext(Properties config);
public MockProcessorContext(Properties config, TaskId taskId, File stateDir);
// Record metadata setters
public void setRecordMetadata(String topic, int partition, long offset,
Headers headers, long timestamp);
public void setTopic(String topic);
public void setPartition(int partition);
public void setOffset(long offset);
public void setHeaders(Headers headers);
public void setRecordTimestamp(long recordTimestamp);
public void setCurrentSystemTimeMs(long currentSystemTimeMs);
public void setCurrentStreamTimeMs(long currentStreamTimeMs);
// Mock inspection
public List<CapturedForward> forwarded();
public List<CapturedForward> forwarded(String childName);
public void resetForwards();
public List<CapturedPunctuator> scheduledPunctuators();
public boolean committed();
public void resetCommit();
}

Mock processor context for testing custom processors with the new API.
package org.apache.kafka.streams.processor.api;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import java.io.File;
import java.util.List;
import java.util.Properties;
public class MockProcessorContext<KForward, VForward>
implements ProcessorContext<KForward, VForward> {
// Constructors
public MockProcessorContext();
public MockProcessorContext(Properties config);
public MockProcessorContext(Properties config, TaskId taskId, File stateDir);
// Record metadata setters
public void setRecordMetadata(String topic, int partition, long offset);
public void setCurrentSystemTimeMs(long currentSystemTimeMs);
public void setCurrentStreamTimeMs(long currentStreamTimeMs);
// State store management
public <S extends StateStore> void addStateStore(S stateStore);
public StateStoreContext getStateStoreContext();
// Mock inspection
public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded();
public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded(String childName);
public void resetForwards();
public List<CapturedPunctuator> scheduledPunctuators();
public boolean committed();
public void resetCommit();
}

import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import java.time.Duration;
import java.util.List;
public class CustomProcessorTest {
@Test
public void testCustomProcessor() {
// Create mock context
MockProcessorContext<String, String> context = new MockProcessorContext<>();
context.setCurrentStreamTimeMs(1000L);
context.setCurrentSystemTimeMs(2000L);
context.setRecordMetadata("test-topic", 0, 10L);
// Create and initialize processor
Processor<String, String, String, String> processor =
new MyCustomProcessor();
processor.init(context);
// Process records
Record<String, String> inputRecord =
new Record<>("key1", "value1", 1000L);
processor.process(inputRecord);
// Verify forwarded records
List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded =
    context.forwarded();
assertEquals(1, forwarded.size());
Record<? extends String, ? extends String> outputRecord = forwarded.get(0).record();
assertEquals("key1", outputRecord.key());
assertEquals("PROCESSED:value1", outputRecord.value());
// Verify punctuators were scheduled
List<MockProcessorContext.CapturedPunctuator> punctuators =
context.scheduledPunctuators();
assertEquals(1, punctuators.size());
assertEquals(Duration.ofSeconds(10), punctuators.get(0).getInterval());
processor.close();
}
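
    // MyCustomProcessor is referenced above but not defined in the original. A minimal
    // sketch consistent with the assertions (prefix values with "PROCESSED:" and schedule
    // a 10-second punctuator) could look like this (assumes ProcessorContext and
    // PunctuationType are imported).
    static class MyCustomProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
            context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> { /* periodic work would go here */ });
        }

        @Override
        public void process(Record<String, String> record) {
            context.forward(record.withValue("PROCESSED:" + record.value()));
        }
    }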
}

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
public class ProcessorWithStateTest {
@Test
public void testProcessorWithState() {
MockProcessorContext<String, Long> context = new MockProcessorContext<>();
// Create and add in-memory state store
KeyValueStore<String, Long> store =
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-store"),
Serdes.String(),
Serdes.Long()
).withLoggingDisabled() // Disable changelog for testing
.build();
store.init(context.getStateStoreContext(), store);
context.addStateStore(store);
// Create processor that uses the store
Processor<String, String, String, Long> processor =
new StatefulProcessor();
processor.init(context);
// Process records
processor.process(new Record<>("key1", "value1", 1000L));
processor.process(new Record<>("key1", "value2", 2000L));
processor.process(new Record<>("key2", "value3", 3000L));
// Verify state store contents
assertEquals(2L, store.get("key1"));
assertEquals(1L, store.get("key2"));
// Verify forwarded records
List<MockProcessorContext.CapturedForward<? extends String, ? extends Long>> forwarded =
    context.forwarded();
assertEquals(3, forwarded.size());
processor.close();
store.close();
}
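
    // StatefulProcessor is referenced above but not defined in the original. A minimal
    // sketch consistent with the assertions (count records per key in "my-store" and
    // forward the running count) could look like this (assumes ProcessorContext is
    // imported).
    static class StatefulProcessor implements Processor<String, String, String, Long> {
        private ProcessorContext<String, Long> context;
        private KeyValueStore<String, Long> store;

        @Override
        public void init(ProcessorContext<String, Long> context) {
            this.context = context;
            store = context.getStateStore("my-store");
        }

        @Override
        public void process(Record<String, String> record) {
            Long current = store.get(record.key());
            long updated = (current == null ? 0L : current) + 1L;
            store.put(record.key(), updated);
            context.forward(record.withValue(updated));
        }
    }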
}

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.*;
public class CompleteStreamProcessingTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, Long> outputTopic;
@BeforeEach
public void setup() {
// Build topology: word count with filtering
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(
"input-topic",
Consumed.with(Serdes.String(), Serdes.String())
);
source
.filter((key, value) -> value != null && !value.isEmpty())
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\s+")))
.filter((key, word) -> word.length() > 3)
.groupBy((key, word) -> word)
.count(Materialized.as("word-count-store"))
.toStream()
.to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
// Configure test driver
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
testDriver = new TopologyTestDriver(
topology,
config,
Instant.parse("2024-01-01T00:00:00Z")
);
// Create test topics
inputTopic = testDriver.createInputTopic(
"input-topic",
Serdes.String().serializer(),
Serdes.String().serializer()
);
outputTopic = testDriver.createOutputTopic(
"output-topic",
Serdes.String().deserializer(),
Serdes.Long().deserializer()
);
}
@AfterEach
public void tearDown() {
if (testDriver != null) {
testDriver.close();
}
}
@Test
public void testWordCountBasic() {
// Input test data
inputTopic.pipeInput(null, "hello world kafka streams");
inputTopic.pipeInput(null, "kafka streams testing");
// Read and verify output
List<KeyValue<String, Long>> results = outputTopic.readKeyValuesToList();
// Expected: hello(1), world(1), kafka(1), streams(1), kafka(2), streams(2), testing(1)
assertEquals(7, results.size());
// Verify state store
KeyValueStore<String, Long> store =
testDriver.getKeyValueStore("word-count-store");
assertEquals(2L, store.get("kafka"));
assertEquals(2L, store.get("streams"));
assertEquals(1L, store.get("hello"));
assertEquals(1L, store.get("world"));
assertEquals(1L, store.get("testing"));
}
@Test
public void testWordCountFiltering() {
// Test filtering of short words and empty strings
inputTopic.pipeInput(null, "a an the hello");
inputTopic.pipeInput(null, "");
inputTopic.pipeInput(null, "test ok yes no");
// Only words longer than 3 characters should be counted
List<KeyValue<String, Long>> results = outputTopic.readKeyValuesToList();
// Expected: hello(1), test(1)
assertEquals(2, results.size());
assertTrue(results.stream().anyMatch(kv ->
kv.key.equals("hello") && kv.value == 1L));
assertTrue(results.stream().anyMatch(kv ->
kv.key.equals("test") && kv.value == 1L));
}
@Test
public void testWordCountWithTimestamps() {
Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");
// Input with explicit timestamps
inputTopic.pipeInput(null, "hello world", baseTime);
inputTopic.pipeInput(null, "kafka streams", baseTime.plusSeconds(10));
inputTopic.pipeInput(null, "hello kafka", baseTime.plusSeconds(20));
// Verify outputs were generated
List<TestRecord<String, Long>> records = outputTopic.readRecordsToList();
assertEquals(6, records.size());
// Verify timestamps are preserved
assertTrue(records.stream().allMatch(r -> r.timestamp() != null));
}
@Test
public void testEmptyInput() {
// Verify topology handles empty input gracefully
assertTrue(outputTopic.isEmpty());
KeyValueStore<String, Long> store =
testDriver.getKeyValueStore("word-count-store");
assertEquals(0L, store.approximateNumEntries());
}
}

public class TestConfiguration {
public static Properties createTestConfig() {
Properties config = new Properties();
// Required properties
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app-id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// Default serdes
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
// Disable caching for deterministic testing
config.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
// Commit as often as possible (the test driver also commits after each piped record)
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
return config;
}
}

public class ResourceManagementTest {
@Test
public void testWithTryWithResources() {
Properties config = createTestConfig();
// Use try-with-resources for automatic cleanup
try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
// Test code here
TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input",
Serdes.String().serializer(),
Serdes.String().serializer());
inputTopic.pipeInput("key", "value");
// No need to explicitly close testDriver
}
}
}

import static org.assertj.core.api.Assertions.assertThat;
public class AssertionExamples {
@Test
public void testWithAssertJ() {
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key2", "value2");
List<KeyValue<String, String>> results =
outputTopic.readKeyValuesToList();
assertThat(results)
.hasSize(2)
.extracting(kv -> kv.key)
.containsExactly("key1", "key2");
assertThat(outputTopic.isEmpty()).isTrue();
}
}

public class ErrorHandlingTest {
@Test
public void testDeserializationError() {
// Configure with error handling
Properties config = createTestConfig();
config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class.getName());
try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
TestInputTopic<byte[], byte[]> inputTopic =
testDriver.createInputTopic("input",
new ByteArraySerializer(),
new ByteArraySerializer());
// Send invalid data
inputTopic.pipeInput(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
// Verify error was handled gracefully
TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic("output",
Serdes.String().deserializer(),
Serdes.String().deserializer());
assertTrue(outputTopic.isEmpty());
}
}
}

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import org.junit.jupiter.api.*;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.*;
public class ComprehensiveStreamsTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, String> outputTopic;
@BeforeEach
public void setup() {
// Create topology
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(String::toUpperCase)
.to("output-topic");
// Configure test driver
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
testDriver = new TopologyTestDriver(builder.build(), props);
inputTopic = testDriver.createInputTopic(
"input-topic",
Serdes.String().serializer(),
Serdes.String().serializer()
);
outputTopic = testDriver.createOutputTopic(
"output-topic",
Serdes.String().deserializer(),
Serdes.String().deserializer()
);
}
@AfterEach
public void tearDown() {
testDriver.close();
}
@Test
public void testNormalProcessing() {
inputTopic.pipeInput("key1", "hello");
assertEquals("HELLO", outputTopic.readValue());
}
@Test
public void testNullValue() {
inputTopic.pipeInput("key1", null);
assertTrue(outputTopic.isEmpty()); // Filtered out
}
@Test
public void testEmptyValue() {
inputTopic.pipeInput("key1", "");
assertTrue(outputTopic.isEmpty()); // Filtered out
}
@Test
public void testMultipleRecords() {
inputTopic.pipeInput("key1", "hello");
inputTopic.pipeInput("key2", "world");
assertEquals("HELLO", outputTopic.readValue());
assertEquals("WORLD", outputTopic.readValue());
assertTrue(outputTopic.isEmpty());
}
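
    // Additional illustrative test (not in the original): record headers travel with the
    // record through stateless operations, so they can be asserted on the output side
    // (assumes org.apache.kafka.common.header.internals.RecordHeaders and
    // org.apache.kafka.streams.test.TestRecord are imported).
    @Test
    public void testHeadersArePreserved() {
        RecordHeaders headers = new RecordHeaders();
        headers.add("trace-id", "abc".getBytes());
        inputTopic.pipeInput(new TestRecord<>("key1", "hello", headers));
        TestRecord<String, String> result = outputTopic.readRecord();
        assertEquals("HELLO", result.value());
        assertEquals("abc", new String(result.headers().lastHeader("trace-id").value()));
    }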
}

import org.apache.kafka.streams.state.*;
@Test
public void testStateStoreAggregation() {
// Build topology with state store
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
.count(Materialized.as("counts-store"))
.toStream()
.to("output-topic");
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props)) {
TestInputTopic<String, String> inputTopic = testDriver.createInputTopic(
"input-topic",
Serdes.String().serializer(),
Serdes.String().serializer()
);
// Send test data
inputTopic.pipeInput("user1", "event1");
inputTopic.pipeInput("user1", "event2");
inputTopic.pipeInput("user2", "event1");
// Query state store directly
KeyValueStore<String, Long> store = testDriver.getKeyValueStore("counts-store");
assertEquals(2L, store.get("user1"));
assertEquals(1L, store.get("user2"));
assertNull(store.get("user3")); // Non-existent key
// Verify output
TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic(
"output-topic",
Serdes.String().deserializer(),
Serdes.Long().deserializer()
);
assertEquals(1L, outputTopic.readValue()); // user1 first count
assertEquals(2L, outputTopic.readValue()); // user1 updated count
assertEquals(1L, outputTopic.readValue()); // user2 count
}
}

import java.time.Duration;
import java.time.Instant;
@Test
public void testWindowedAggregation() {
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count()
.toStream()
.to("output-topic");
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
Instant baseTime = Instant.parse("2024-01-01T00:00:00Z");
try (TopologyTestDriver testDriver = new TopologyTestDriver(
builder.build(), props, baseTime)) {
TestInputTopic<String, String> inputTopic = testDriver.createInputTopic(
"input-topic",
Serdes.String().serializer(),
Serdes.String().serializer(),
baseTime,
Duration.ZERO // No auto-advance
);
// Send records in different windows
inputTopic.pipeInput("key1", "value1", baseTime);
inputTopic.pipeInput("key1", "value2", baseTime.plusSeconds(60));
inputTopic.pipeInput("key1", "value3", baseTime.plusSeconds(360)); // Different window
// advanceWallClockTime only affects wall-clock punctuation; the windowed counts
// below are emitted eagerly because no suppression is used
testDriver.advanceWallClockTime(Duration.ofMinutes(10));
// Verify windowed counts
TestOutputTopic<Windowed<String>, Long> outputTopic =
testDriver.createOutputTopic(
"output-topic",
WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofMinutes(5).toMillis()).deserializer(),
Serdes.Long().deserializer()
);
// First window (00:00-00:05): the count is emitted per update, going 1 then 2
assertEquals(1L, outputTopic.readValue());
assertEquals(2L, outputTopic.readValue());
// Second window (00:05-00:10): 1 record
assertEquals(1L, outputTopic.readValue());
}
}

@Test
public void testProcessorException() {
// Topology with processor that may throw exceptions
Topology topology = new Topology();
topology.addSource("Source", "input-topic");
topology.addProcessor("Process", () -> new Processor<String, String, String, String>() {
private ProcessorContext<String, String> context;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
}
@Override
public void process(Record<String, String> record) {
if (record.value().equals("error")) {
throw new RuntimeException("Simulated error");
}
context.forward(record);
}
}, "Source");
topology.addSink("Sink", "output-topic", "Process");
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
TestInputTopic<String, String> inputTopic = testDriver.createInputTopic(
"input-topic",
Serdes.String().serializer(),
Serdes.String().serializer()
);
// Test normal processing
inputTopic.pipeInput("key1", "normal");
// Test error case
assertThrows(RuntimeException.class, () -> {
inputTopic.pipeInput("key2", "error");
});
}
}