Describes: pkg:maven/org.apache.kafka/kafka_2.13@4.1.x

tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

docs/streams/dsl.md

Streams DSL

The high-level Streams DSL provides functional-style operations for stream processing.

KStream

A record stream in which each key-value pair is an independent record or event; keys need not be unique.

package org.apache.kafka.streams.kstream;

public interface KStream<K, V> {
    // Filtering
    KStream<K, V> filter(Predicate<? super K, ? super V> predicate);
    KStream<K, V> filter(Predicate<? super K, ? super V> predicate, Named named);
    KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate);
    KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, Named named);

    // Key selection
    <KOut> KStream<KOut, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KOut> mapper);
    <KOut> KStream<KOut, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KOut> mapper,
                                       Named named);

    // Value transformation
    <VOut> KStream<K, VOut> mapValues(ValueMapper<? super V, ? extends VOut> mapper);
    <VOut> KStream<K, VOut> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper);
    <VOut> KStream<K, VOut> mapValues(ValueMapper<? super V, ? extends VOut> mapper, Named named);

    // Key-value transformation
    <KOut, VOut> KStream<KOut, VOut> map(KeyValueMapper<? super K, ? super V,
        ? extends KeyValue<? extends KOut, ? extends VOut>> mapper);
    <KOut, VOut> KStream<KOut, VOut> map(KeyValueMapper<? super K, ? super V,
        ? extends KeyValue<? extends KOut, ? extends VOut>> mapper, Named named);

    // FlatMap
    <KOut, VOut> KStream<KOut, VOut> flatMap(KeyValueMapper<? super K, ? super V,
        ? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper);
    <VOut> KStream<K, VOut> flatMapValues(ValueMapper<? super V,
        ? extends Iterable<? extends VOut>> mapper);
    <VOut> KStream<K, VOut> flatMapValues(ValueMapperWithKey<? super K, ? super V,
        ? extends Iterable<? extends VOut>> mapper);

    // Side effects
    void print(Printed<K, V> printed);
    void foreach(ForeachAction<? super K, ? super V> action);
    void foreach(ForeachAction<? super K, ? super V> action, Named named);
    KStream<K, V> peek(ForeachAction<? super K, ? super V> action);
    KStream<K, V> peek(ForeachAction<? super K, ? super V> action, Named named);

    // Branching and merging
    BranchedKStream<K, V> split();
    BranchedKStream<K, V> split(Named named);
    KStream<K, V> merge(KStream<K, V> stream);
    KStream<K, V> merge(KStream<K, V> stream, Named named);

    // Repartitioning
    KStream<K, V> repartition();
    KStream<K, V> repartition(Repartitioned<K, V> repartitioned);

    // Output
    void to(String topic);
    void to(String topic, Produced<K, V> produced);
    void to(TopicNameExtractor<K, V> topicExtractor);
    void to(TopicNameExtractor<K, V> topicExtractor, Produced<K, V> produced);

    // Conversion
    KTable<K, V> toTable();
    KTable<K, V> toTable(Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    // Grouping
    KGroupedStream<K, V> groupByKey();
    KGroupedStream<K, V> groupByKey(Grouped<K, V> grouped);
    <KOut> KGroupedStream<KOut, V> groupBy(KeyValueMapper<? super K, ? super V, KOut> selector);
    <KOut> KGroupedStream<KOut, V> groupBy(KeyValueMapper<? super K, ? super V, KOut> selector,
                                            Grouped<KOut, V> grouped);

    // Stream-Stream joins
    <VRight, VOut> KStream<K, VOut> join(KStream<K, VRight> otherStream,
                                          ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
                                          JoinWindows windows);
    <VRight, VOut> KStream<K, VOut> join(KStream<K, VRight> otherStream,
                                          ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
                                          JoinWindows windows,
                                          StreamJoined<K, V, VRight> streamJoined);
    <VRight, VOut> KStream<K, VOut> leftJoin(KStream<K, VRight> otherStream,
                                              ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
                                              JoinWindows windows);
    <VRight, VOut> KStream<K, VOut> outerJoin(KStream<K, VRight> otherStream,
                                               ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
                                               JoinWindows windows);

    // Stream-Table joins
    <VTable, VOut> KStream<K, VOut> join(KTable<K, VTable> table,
                                          ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner);
    <VTable, VOut> KStream<K, VOut> leftJoin(KTable<K, VTable> table,
                                              ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner);

    // Stream-GlobalKTable joins
    <GK, GV, VOut> KStream<K, VOut> join(GlobalKTable<GK, GV> globalTable,
                                          KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
                                          ValueJoiner<? super V, ? super GV, ? extends VOut> joiner);
    <GK, GV, VOut> KStream<K, VOut> leftJoin(GlobalKTable<GK, GV> globalTable,
                                              KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
                                              ValueJoiner<? super V, ? super GV, ? extends VOut> joiner);

    // Processor API integration
    <KOut, VOut> KStream<KOut, VOut> process(ProcessorSupplier<? super K, ? super V,
        ? extends KOut, ? extends VOut> processorSupplier, String... stateStoreNames);
    <VOut> KStream<K, VOut> processValues(FixedKeyProcessorSupplier<? super K, ? super V,
        ? extends VOut> processorSupplier, String... stateStoreNames);
}

Usage Examples:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.*;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");

// Filter
KStream<String, String> filtered = source.filter((key, value) -> value != null && value.length() > 5);

// Map values
KStream<String, String> uppercased = source.mapValues(value -> value.toUpperCase());

// Map with key
KStream<String, Integer> lengths = source.mapValues((key, value) -> value.length());

// FlatMap
KStream<String, String> words = source.flatMapValues(value ->
    Arrays.asList(value.toLowerCase().split("\\s+")));

// Select key
KStream<String, String> rekeyed = source.selectKey((key, value) -> value.substring(0, 1));

// Peek (for debugging)
source.peek((key, value) -> System.out.println("Processing: " + key + " -> " + value))
      .mapValues(String::toUpperCase)
      .to("output-topic");

// Branch
Map<String, KStream<String, String>> branches = source.split(Named.as("branch-"))
    .branch((key, value) -> value.startsWith("A"), Branched.as("a"))
    .branch((key, value) -> value.startsWith("B"), Branched.as("b"))
    .defaultBranch(Branched.as("other"));
// defaultBranch() returns a Map keyed by the prefixed branch names:
// "branch-a", "branch-b", "branch-other"

KTable

A changelog stream representing a table: each record is an update (upsert) for its key, and a null value deletes the key.

package org.apache.kafka.streams.kstream;

public interface KTable<K, V> {
    // Filtering
    KTable<K, V> filter(Predicate<? super K, ? super V> predicate);
    KTable<K, V> filter(Predicate<? super K, ? super V> predicate,
                       Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
    KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate);

    // Value transformation
    <VOut> KTable<K, VOut> mapValues(ValueMapper<? super V, ? extends VOut> mapper);
    <VOut> KTable<K, VOut> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper);

    // Conversion
    KStream<K, V> toStream();
    <KOut> KStream<KOut, V> toStream(KeyValueMapper<? super K, ? super V, ? extends KOut> mapper);

    // Grouping
    <KOut, VOut> KGroupedTable<KOut, VOut> groupBy(KeyValueMapper<? super K, ? super V,
        KeyValue<KOut, VOut>> selector);

    // Joins
    <VRight, VOut> KTable<K, VOut> join(KTable<K, VRight> other,
                                         ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner);
    <VRight, VOut> KTable<K, VOut> leftJoin(KTable<K, VRight> other,
                                             ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner);
    <VRight, VOut> KTable<K, VOut> outerJoin(KTable<K, VRight> other,
                                              ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner);

    // Suppression
    KTable<K, V> suppress(Suppressed<? super K> suppressed);

    // Metadata
    String queryableStoreName();
}

Usage Examples:

import org.apache.kafka.streams.kstream.*;

KTable<String, Long> counts = builder.table("counts-topic");

// Filter
KTable<String, Long> highCounts = counts.filter((key, value) -> value > 100);

// Map values
KTable<String, String> formatted = counts.mapValues(value -> "Count: " + value);

// Convert to stream
KStream<String, Long> countsStream = counts.toStream();

// Join tables
KTable<String, String> names = builder.table("names-topic");
KTable<String, String> joined = counts.join(names,
    (count, name) -> name + " has " + count + " items");
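
The materialized state behind a KTable can also be queried interactively from a running application. A minimal sketch, assuming the table was materialized under the name "counts-store" and the running KafkaStreams instance is called streams (both names are illustrative):

import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("counts-store", QueryableStoreTypes.keyValueStore()));

Long count = store.get("some-key"); // point lookup against local state
try (var iterator = store.all()) {  // full scan over the local state
    iterator.forEachRemaining(kv ->
        System.out.println(kv.key + " = " + kv.value));
}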

Suppressed

Controls how KTable updates are suppressed before being emitted downstream.

package org.apache.kafka.streams.kstream;

public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
    // Factory methods
    static Suppressed<Windowed<?>> untilWindowCloses(StrictBufferConfig bufferConfig);
    static <K> Suppressed<K> untilTimeLimit(Duration timeToWaitForMoreEvents,
                                             BufferConfig<?> bufferConfig);

    // Naming
    Suppressed<K> withName(String name);

    // Buffer configuration interfaces
    interface BufferConfig<BC extends BufferConfig<BC>> {
        // Factory methods for buffer configuration
        static EagerBufferConfig maxRecords(long recordLimit);
        static EagerBufferConfig maxBytes(long byteLimit);
        static StrictBufferConfig unbounded();

        // Configuration methods
        BC withMaxRecords(long recordLimit);
        BC withMaxBytes(long byteLimit);
        StrictBufferConfig withNoBound();
        StrictBufferConfig shutDownWhenFull();
        EagerBufferConfig emitEarlyWhenFull();
        BC withLoggingDisabled();
        BC withLoggingEnabled(Map<String, String> config);
    }

    // Buffer config types
    interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> {
        // Strict buffer never emits early
    }

    interface EagerBufferConfig extends BufferConfig<EagerBufferConfig> {
        // Eager buffer may emit early when full
    }
}

Usage Examples:

import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Map;

// Suppress window results until window closes
KTable<Windowed<String>, Long> windowedCounts = source
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count()
    .suppress(Suppressed.untilWindowCloses(
        Suppressed.BufferConfig.unbounded()));

// Suppress with time limit and bounded buffer
KTable<String, Long> counts = source
    .groupByKey()
    .count()
    .suppress(Suppressed.untilTimeLimit(
        Duration.ofSeconds(10),
        Suppressed.BufferConfig.maxRecords(1000).emitEarlyWhenFull()));

// Suppress with byte-limited buffer
KTable<String, Long> suppressed = counts
    .suppress(Suppressed.untilTimeLimit(
        Duration.ofMinutes(1),
        Suppressed.BufferConfig.maxBytes(10_000_000L)
            .withLoggingEnabled(Map.of("segment.ms", "60000"))));

KGroupedStream

Grouped stream for aggregations.

package org.apache.kafka.streams.kstream;

public interface KGroupedStream<K, V> {
    // Count
    KTable<K, Long> count();
    KTable<K, Long> count(Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

    // Reduce
    KTable<K, V> reduce(Reducer<V> reducer);
    KTable<K, V> reduce(Reducer<V> reducer,
                       Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    // Aggregate
    <VOut> KTable<K, VOut> aggregate(Initializer<VOut> initializer,
                                      Aggregator<? super K, ? super V, VOut> aggregator);
    <VOut> KTable<K, VOut> aggregate(Initializer<VOut> initializer,
                                      Aggregator<? super K, ? super V, VOut> aggregator,
                                      Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);

    // Windowed operations
    <W extends Window> TimeWindowedKStream<K, V> windowedBy(Windows<W> windows);
    TimeWindowedKStream<K, V> windowedBy(SlidingWindows windows);
    SessionWindowedKStream<K, V> windowedBy(SessionWindows windows);

    // Cogrouping
    <VOut> CogroupedKStream<K, VOut> cogroup(Aggregator<? super K, ? super V, VOut> aggregator);
}

Usage Examples:

import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import java.time.Duration;

KStream<String, String> source = builder.stream("input-topic");

// Count
KTable<String, Long> counts = source
    .groupByKey()
    .count();

// Reduce
KTable<String, String> concatenated = source
    .groupByKey()
    .reduce((value1, value2) -> value1 + ", " + value2);

// Aggregate
KTable<String, Long> sums = source
    .mapValues(Integer::parseInt)
    .groupByKey()
    .aggregate(
        () -> 0L, // initializer
        (key, value, aggregate) -> aggregate + value, // aggregator
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("sums-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long())
    );

// Windowed count
KTable<Windowed<String>, Long> windowedCounts = source
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();

Windowing

JoinWindows

Time-based windows for stream-stream joins.

package org.apache.kafka.streams.kstream;

public final class JoinWindows extends Windows<Window> {
    // Factory methods
    public static JoinWindows ofTimeDifferenceWithNoGrace(Duration timeDifference);
    public static JoinWindows ofTimeDifferenceAndGrace(Duration timeDifference,
                                                       Duration afterWindowEnd);

    // Fluent configuration
    public JoinWindows before(Duration timeDifference);
    public JoinWindows after(Duration timeDifference);

    @Deprecated
    public JoinWindows grace(Duration afterWindowEnd);

    // Accessor methods
    public long size();
    public long gracePeriodMs();
}

Usage:

import org.apache.kafka.streams.kstream.*;
import java.time.Duration;

KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");

// Join streams within 5-minute window
KStream<String, String> joined = stream1.join(stream2,
    (value1, value2) -> value1 + "+" + value2,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));

// Join with grace period for late-arriving data
KStream<String, String> graceful = stream1.join(stream2,
    (value1, value2) -> value1 + "+" + value2,
    JoinWindows.ofTimeDifferenceAndGrace(
        Duration.ofMinutes(5),
        Duration.ofMinutes(1)));

// Asymmetric window: 10 minutes before, 5 minutes after
KStream<String, String> asymmetric = stream1.join(stream2,
    (value1, value2) -> value1 + "+" + value2,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
        .before(Duration.ofMinutes(10))
        .after(Duration.ofMinutes(5)));

TimeWindows

Fixed-size time windows.

package org.apache.kafka.streams.kstream;

public final class TimeWindows extends Windows<TimeWindow> {
    // Factory methods
    public static TimeWindows ofSizeWithNoGrace(Duration size);
    public static TimeWindows ofSizeAndGrace(Duration size, Duration afterWindowEnd);

    // Configuration methods
    public TimeWindows advanceBy(Duration advance);

    // Accessor methods
    public long size();
    public long gracePeriodMs();
}

Usage:

import org.apache.kafka.streams.kstream.*;
import java.time.Duration;

// Tumbling window (5 minutes)
KTable<Windowed<String>, Long> tumblingCounts = source
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Tumbling window with grace period
KTable<Windowed<String>, Long> gracefulCounts = source
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(
        Duration.ofMinutes(5),
        Duration.ofMinutes(1)))
    .count();

// Hopping window (10-minute window, advancing every 5 minutes)
KTable<Windowed<String>, Long> hoppingCounts = source
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10))
        .advanceBy(Duration.ofMinutes(5)))
    .count();

SessionWindows

Session windows based on inactivity gap.

package org.apache.kafka.streams.kstream;

public final class SessionWindows {
    // Factory methods
    public static SessionWindows ofInactivityGapWithNoGrace(Duration inactivityGap);
    public static SessionWindows ofInactivityGapAndGrace(Duration inactivityGap,
                                                         Duration afterWindowEnd);

    // Accessor methods
    public long inactivityGap();
    public long gracePeriodMs();
}

Usage:

import org.apache.kafka.streams.kstream.*;
import java.time.Duration;

// Session window with 5-minute inactivity gap
KTable<Windowed<String>, Long> sessions = source
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Session window with grace period
KTable<Windowed<String>, Long> gracefulSessions = source
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapAndGrace(
        Duration.ofMinutes(5),
        Duration.ofMinutes(1)))
    .count();

SlidingWindows

Sliding windows for aggregations based on time difference between records.

package org.apache.kafka.streams.kstream;

public final class SlidingWindows {
    // Factory methods
    public static SlidingWindows ofTimeDifferenceWithNoGrace(Duration timeDifference);
    public static SlidingWindows ofTimeDifferenceAndGrace(Duration timeDifference,
                                                          Duration afterWindowEnd);

    // Accessor methods
    public long timeDifferenceMs();
    public long gracePeriodMs();
}

Usage:

import org.apache.kafka.streams.kstream.*;
import java.time.Duration;

// Sliding window with 5-minute time difference
KTable<Windowed<String>, Long> slidingCounts = source
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Sliding window with grace period
KTable<Windowed<String>, Long> gracefulSliding = source
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
        Duration.ofMinutes(5),
        Duration.ofMinutes(1)))
    .count();

Description:

Sliding windows are defined by a maximum time difference between records in the same window. A new window is created each time a record enters or leaves the sliding window. Unlike time windows which are aligned to the epoch, sliding windows are aligned to the actual record timestamps.

For example, with a time difference of 5 seconds and records at times 8s, 9.2s, and 12.4s:

  • Window [3s, 8s] contains record 1
  • Window [4.2s, 9.2s] contains records 1, 2
  • Window [7.4s, 12.4s] contains records 1, 2, 3
  • Window [8.001s, 13.001s] contains records 2, 3
  • Window [9.201s, 14.201s] contains record 3
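
A minimal sketch for observing these window boundaries at runtime, reusing the slidingCounts table from the Usage block above (the output formatting is illustrative):

slidingCounts.toStream().foreach((windowedKey, count) ->
    System.out.printf("window [%d, %d] key=%s count=%d%n",
        windowedKey.window().start(),  // window start in epoch milliseconds
        windowedKey.window().end(),    // window end in epoch milliseconds
        windowedKey.key(),
        count));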

UnlimitedWindows

Unlimited (landmark) windows for aggregations.

package org.apache.kafka.streams.kstream;

public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
    // Factory methods
    public static UnlimitedWindows of();

    // Configuration methods
    public UnlimitedWindows startOn(Instant start);

    // Accessor methods
    public long size();
    public long gracePeriodMs();
}

Usage:

import org.apache.kafka.streams.kstream.*;
import java.time.Instant;

// Unlimited window starting at timestamp zero
KTable<Windowed<String>, Long> unlimitedCounts = source
    .groupByKey()
    .windowedBy(UnlimitedWindows.of())
    .count();

// Unlimited window starting at specific timestamp
KTable<Windowed<String>, Long> customStart = source
    .groupByKey()
    .windowedBy(UnlimitedWindows.of()
        .startOn(Instant.parse("2024-01-01T00:00:00Z")))
    .count();

Description:

Unlimited windows (also called landmark windows) have a fixed starting point but no end time. They are effectively fixed-size windows with infinite window size. All records with timestamps greater than or equal to the start time are included in a single window.

Functional Interfaces

package org.apache.kafka.streams.kstream;

@FunctionalInterface
public interface Predicate<K, V> {
    boolean test(K key, V value);
}

@FunctionalInterface
public interface KeyValueMapper<K, V, R> {
    R apply(K key, V value);
}

@FunctionalInterface
public interface ValueMapper<V, R> {
    R apply(V value);
}

@FunctionalInterface
public interface ValueMapperWithKey<K, V, R> {
    R apply(K readOnlyKey, V value);
}

@FunctionalInterface
public interface Initializer<VA> {
    VA apply();
}

@FunctionalInterface
public interface Aggregator<K, V, VA> {
    VA apply(K key, V value, VA aggregate);
}

@FunctionalInterface
public interface Reducer<V> {
    V apply(V value1, V value2);
}

@FunctionalInterface
public interface ValueJoiner<V1, V2, R> {
    R apply(V1 value1, V2 value2);
}

@FunctionalInterface
public interface ValueJoinerWithKey<K, V1, V2, R> {
    R apply(K readOnlyKey, V1 value1, V2 value2);
}

@FunctionalInterface
public interface ForeachAction<K, V> {
    void apply(K key, V value);
}
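
In practice these interfaces are supplied as lambdas. A brief sketch (the topic names are illustrative and default serdes are assumed):

Predicate<String, Integer> positive = (key, value) -> value != null && value > 0;
ValueMapper<Integer, String> format = value -> "n=" + value;
Initializer<Long> zero = () -> 0L;
Aggregator<String, Integer, Long> sum = (key, value, aggregate) -> aggregate + value;
ValueJoiner<Integer, String, String> describe = (num, label) -> label + ":" + num;
ForeachAction<String, Integer> log = (key, value) -> System.out.println(key + " -> " + value);

// The lambdas plug directly into DSL operations:
KStream<String, Integer> numbers = builder.stream("numbers");
numbers
    .filter(positive)
    .peek(log)
    .mapValues(format)
    .to("formatted-numbers");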

Configuration Helpers

Consumed<K, V>

Consumption configuration.

package org.apache.kafka.streams.kstream;

public class Consumed<K, V> {
    // Factory methods
    public static <K, V> Consumed<K, V> with(Serde<K> keySerde, Serde<V> valueSerde);
    public static <K, V> Consumed<K, V> with(TimestampExtractor timestampExtractor);
    public static <K, V> Consumed<K, V> with(AutoOffsetReset offsetReset);

    // Fluent setters
    public Consumed<K, V> withName(String name);
    public Consumed<K, V> withOffsetResetPolicy(AutoOffsetReset offsetResetPolicy);
    public Consumed<K, V> withTimestampExtractor(TimestampExtractor timestampExtractor);
    public Consumed<K, V> withKeySerde(Serde<K> keySerde);
    public Consumed<K, V> withValueSerde(Serde<V> valueSerde);
}
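
Usage (a minimal sketch; the topic name, source name, and timestamp extractor are illustrative):

import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.common.serialization.Serdes;

KStream<String, String> source = builder.stream("input-topic",
    Consumed.with(Serdes.String(), Serdes.String())
        .withTimestampExtractor(new WallclockTimestampExtractor())
        .withName("orders-source"));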

Produced<K, V>

Production configuration.

package org.apache.kafka.streams.kstream;

public class Produced<K, V> {
    public static <K, V> Produced<K, V> with(Serde<K> keySerde, Serde<V> valueSerde);
    public static <K, V> Produced<K, V> with(Serde<K> keySerde, Serde<V> valueSerde,
                                             StreamPartitioner<? super K, ? super V> partitioner);

    public Produced<K, V> withName(String name);
    public Produced<K, V> withKeySerde(Serde<K> keySerde);
    public Produced<K, V> withValueSerde(Serde<V> valueSerde);
    public Produced<K, V> withStreamPartitioner(StreamPartitioner<? super K, ? super V> partitioner);
}
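
Usage (a minimal sketch, assuming a KTable<String, Long> named counts as in the earlier examples; topic and sink names are illustrative):

import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.common.serialization.Serdes;

// Write out with explicit serdes instead of relying on the configured defaults
counts.toStream().to("counts-output",
    Produced.<String, Long>with(Serdes.String(), Serdes.Long())
        .withName("counts-sink"));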

Materialized<K, V, S>

Materialization configuration for state stores.

package org.apache.kafka.streams.kstream;

public class Materialized<K, V, S extends StateStore> {
    public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> as(String storeName);
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreSupplier<S> storeSupplier);
    public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> with(Serde<K> keySerde,
                                                                                 Serde<V> valueSerde);

    public Materialized<K, V, S> withKeySerde(Serde<K> keySerde);
    public Materialized<K, V, S> withValueSerde(Serde<V> valueSerde);
    public Materialized<K, V, S> withRetention(Duration retention);
    public Materialized<K, V, S> withLoggingEnabled(Map<String, String> config);
    public Materialized<K, V, S> withLoggingDisabled();
    public Materialized<K, V, S> withCachingEnabled();
    public Materialized<K, V, S> withCachingDisabled();
}
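
Usage (a minimal sketch, assuming a KStream<String, String> named source; the store name is illustrative):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

KTable<String, Long> counts = source
    .groupByKey()
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
        .withCachingDisabled());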

Grouped<K, V>

Grouping configuration.

package org.apache.kafka.streams.kstream;

public class Grouped<K, V> {
    public static <K, V> Grouped<K, V> with(Serde<K> keySerde, Serde<V> valueSerde);
    public static <K, V> Grouped<K, V> with(String name, Serde<K> keySerde, Serde<V> valueSerde);
    public static <K, V> Grouped<K, V> as(String name);

    public Grouped<K, V> withName(String name);
    public Grouped<K, V> withKeySerde(Serde<K> keySerde);
    public Grouped<K, V> withValueSerde(Serde<V> valueSerde);
}
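
Usage (a minimal sketch, assuming a KStream<String, String> named source; the name controls the repartition topic and is illustrative):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;

// Group by a derived key with explicit serdes and a named repartition topic
KGroupedStream<String, String> grouped = source.groupBy(
    (key, value) -> value.substring(0, 1),
    Grouped.<String, String>as("first-letter")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()));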

Complete Example

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import java.time.Duration;
import java.util.*;

public class WordCountExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Input stream
        KStream<String, String> textLines = builder.stream("text-input");

        // Word count pipeline
        KTable<String, Long> wordCounts = textLines
            // Split into words
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            // Filter empty words
            .filter((key, word) -> !word.isEmpty())
            // Group by word
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            // Count occurrences
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

        // Output to topic
        wordCounts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Troubleshooting DSL Operations

Common Issues

Issue: Unnecessary Repartitioning

Symptoms:

  • Slow processing
  • High network usage
  • Many repartition topics created
  • High latency

Causes:

  • Using map() instead of mapValues()
  • Using selectKey() unnecessarily
  • Grouping after operations that change keys

Solutions:

import org.apache.kafka.streams.kstream.*;

// INEFFICIENT: map() marks the stream for repartitioning before any downstream stateful operation
KStream<String, String> source = builder.stream("input");
KStream<String, String> inefficient = source
    .map((key, value) -> KeyValue.pair(key, value.toUpperCase())) // Key may change -> repartition flag
    .filter((key, value) -> value.length() > 5);

// EFFICIENT: mapValues() keeps the key, so no repartition is needed
KStream<String, String> efficient = source
    .mapValues(value -> value.toUpperCase()) // No repartition
    .filter((key, value) -> value.length() > 5);

// When repartitioning is necessary:
KStream<String, String> rekeyed = source
    .selectKey((key, value) -> value.substring(0, 1)) // Changes key
    .repartition(Repartitioned.<String, String>as("rekeyed")
        .withNumberOfPartitions(10)); // Explicit control

Prevention:

  • Use mapValues() when key doesn't change
  • Use filter() instead of map() for filtering
  • Minimize key changes
  • Use topology.describe() to identify repartitions (see the sketch below)
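
A minimal sketch of the last point, printing the topology description to spot repartition topics (assuming the StreamsBuilder from the examples above):

Topology topology = builder.build();
// Repartition topics appear in the description as internal "...-repartition" topics
System.out.println(topology.describe());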

Issue: Join Not Producing Results

Symptoms:

  • Join operation produces no output
  • Expected matches not found
  • Empty result stream

Causes:

  • Keys don't match (different types or values)
  • Time windows don't overlap
  • Records outside grace period
  • Incorrect serdes

Solutions:

// Debug join issues

// 1. Verify keys match
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");

// Add logging to verify keys
stream1.peek((key, value) -> 
    System.out.println("Stream1 key: " + key + " (type: " + 
        (key != null ? key.getClass() : "null") + ")"));

stream2.peek((key, value) -> 
    System.out.println("Stream2 key: " + key + " (type: " + 
        (key != null ? key.getClass() : "null") + ")"));

// 2. Check window configuration
JoinWindows windows = JoinWindows
    .ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5));

// Records must arrive within 5 minutes of each other
// Increase window if needed
JoinWindows largerWindow = JoinWindows
    .ofTimeDifferenceAndGrace(
        Duration.ofMinutes(10),  // Time difference
        Duration.ofMinutes(2)    // Grace period for late records
    );

// 3. Verify both streams have data
stream1.foreach((key, value) -> 
    System.out.println("Stream1 record: " + key + " = " + value));
stream2.foreach((key, value) -> 
    System.out.println("Stream2 record: " + key + " = " + value));

// 4. Check join result
KStream<String, String> joined = stream1.join(
    stream2,
    (value1, value2) -> {
        String result = value1 + ":" + value2;
        System.out.println("Join produced: " + result);
        return result;
    },
    windows
);

Prevention:

  • Ensure consistent key types
  • Use appropriate window sizes
  • Add grace periods for late data
  • Log intermediate results

Edge Cases

Null Values in Aggregations

// KGroupedStream aggregations drop records with a null key or null value,
// so the aggregator below never sees nulls
KStream<String, String> source = builder.stream("input");

KTable<String, Long> counts = source
    .groupByKey()
    .aggregate(
        () -> 0L, // Initial value
        (key, value, aggregate) -> aggregate + 1, // value is never null here
        Materialized.with(Serdes.String(), Serdes.Long())
    );

// Null values in the source topic:
// - Are silently dropped by groupByKey() with aggregate(), reduce(), and count()
// - Act as tombstones (deletes) only when the topic is read as a KTable
// - Can be turned into explicit "reset" signals by mapping them to a sentinel
//   value before grouping, e.g. source.mapValues(v -> v == null ? "" : v)
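
By contrast, a minimal sketch of the KTable path, where a null value is interpreted as a tombstone that deletes the key (the topic name is illustrative):

// Reading the same topic as a table: a null value deletes the key instead of being dropped
KTable<String, String> table = builder.table("input");

table.toStream().foreach((key, value) ->
    System.out.println(value == null
        ? "Key deleted: " + key
        : "Key updated: " + key + " -> " + value));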

Late Arriving Records

// Records arriving after window close
KStream<String, String> source = builder.stream("input");

// Without grace period - late records dropped
TimeWindows windowsNoGrace = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));

// With grace period - late records accepted
TimeWindows windowsWithGrace = TimeWindows.ofSizeAndGrace(
    Duration.ofMinutes(5),  // Window size
    Duration.ofMinutes(1)   // Grace period
);

KTable<Windowed<String>, Long> counts = source
    .groupByKey()
    .windowedBy(windowsWithGrace)
    .count();

// Records arriving within grace period:
// - Included in window
// - May update already-emitted results
// - Downstream processors see updates

// Records arriving after grace period:
// - Dropped
// - Logged as late records
// - Check metrics: dropped-records-rate
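
A minimal sketch for inspecting the dropped-records metrics on a running KafkaStreams instance (assumed here to be named streams):

// Print the task-level dropped-records metrics to confirm whether late records are discarded
streams.metrics().forEach((metricName, metric) -> {
    if (metricName.name().startsWith("dropped-records")) {
        System.out.println(metricName.group() + "/" + metricName.name()
            + " tags=" + metricName.tags()
            + " value=" + metric.metricValue());
    }
});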

Empty Windows

// Windows with no records
KStream<String, String> source = builder.stream("input");

KTable<Windowed<String>, Long> windowedCounts = source
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Empty windows:
// - Not materialized in state store
// - No output produced
// - Cannot distinguish between empty and non-existent

// To emit zeros for empty windows, use custom aggregation:
KTable<Windowed<String>, Long> countsWithZeros = source
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        () -> 0L,
        (key, value, aggregate) -> aggregate + 1,
        Materialized.with(Serdes.String(), Serdes.Long())
    );

// Still won't emit for windows with no records
// For guaranteed output, use punctuation:
source.groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        () -> 0L,
        (key, value, aggregate) -> aggregate + 1,
        Materialized.with(Serdes.String(), Serdes.Long())
    )
    .toStream()
    .process(() -> new Processor<Windowed<String>, Long, String, Long>() {
        private ProcessorContext<String, Long> context;
        
        @Override
        public void init(ProcessorContext<String, Long> context) {
            this.context = context;
            
            // Schedule periodic emission
            context.schedule(
                Duration.ofMinutes(5),
                PunctuationType.WALL_CLOCK_TIME,
                timestamp -> {
                    // Emit zero counts for inactive keys
                    emitZeroCountsForInactiveKeys();
                }
            );
        }
        
        @Override
        public void process(Record<Windowed<String>, Long> record) {
            context.forward(new Record<>(
                record.key().key(),
                record.value(),
                record.timestamp()
            ));
        }
        
        private void emitZeroCountsForInactiveKeys() {
            // Implementation to track and emit zeros
        }
    });

Performance Optimization

Operation Selection Guide

// Choose operations based on performance characteristics

// FAST operations (no repartition, no state):
// - filter(), filterNot()
// - mapValues(), flatMapValues()
// - peek(), foreach()
// - branch(), merge()

// MEDIUM operations (may require repartition):
// - map(), flatMap() - if key changes
// - selectKey() - always repartitions
// - groupBy() - repartitions

// EXPENSIVE operations (require state):
// - join() - requires state for buffering
// - aggregate(), reduce(), count() - requires state store
// - windowed operations - requires windowed state

// Optimize by ordering operations:
KStream<String, String> source = builder.stream("input");

// INEFFICIENT: Filter after expensive join
KStream<String, String> result1 = source
    .join(otherStream, joiner, windows) // Expensive
    .filter((key, value) -> value.length() > 5); // Should be earlier

// EFFICIENT: Filter before expensive join
KStream<String, String> result2 = source
    .filter((key, value) -> value.length() > 5) // Cheap, reduces join input
    .join(otherStream, joiner, windows); // Expensive, but less data

Choosing Window Type

// Window type selection guide

// Tumbling Windows (non-overlapping, fixed size)
TimeWindows tumblingWindows = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));
// Use when: Fixed time buckets, no overlap needed
// Example: Hourly aggregations, daily summaries

// Hopping Windows (overlapping, fixed size and advance)
TimeWindows hoppingWindows = TimeWindows.ofSizeAndGrace(
    Duration.ofMinutes(10),  // Window size
    Duration.ofMinutes(1)    // Grace period
).advanceBy(Duration.ofMinutes(5)); // Advance interval
// Use when: Overlapping time periods needed
// Example: Moving averages, sliding aggregations

// Session Windows (variable size, gap-based)
SessionWindows sessionWindows = SessionWindows.ofInactivityGapWithNoGrace(
    Duration.ofMinutes(5)); // Inactivity gap
// Use when: Activity-based grouping
// Example: User sessions, burst detection

// Sliding Windows (continuous, aligned to record timestamps)
SlidingWindows slidingWindows = SlidingWindows.ofTimeDifferenceWithNoGrace(
    Duration.ofMinutes(5)); // Maximum time difference between records in a window
// Use when: Aggregations should align to actual record timestamps rather than epoch-aligned buckets
// Example: Rolling statistics over the most recent N minutes per key
// Note: Stream-stream joins use JoinWindows, not SlidingWindows

// Performance characteristics:
// Tumbling: Fastest (each record falls into exactly one window)
// Hopping: Medium (each record falls into multiple overlapping windows)
// Session: Variable (depends on activity patterns; windows can merge)
// Sliding: More expensive (new windows are created as records enter and leave)

Advanced Streams DSL Patterns

Pattern: Multi-Stage Enrichment Pipeline

Enrich stream with data from multiple KTables:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import java.util.Properties;

public class MultiStageEnrichment {
    
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Source stream
        KStream<String, String> orders = builder.stream("orders",
            Consumed.with(Serdes.String(), Serdes.String()));
        
        // Reference tables
        KTable<String, String> customers = builder.table("customers",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("customers-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));
        
        KTable<String, String> products = builder.table("products",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("products-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));
        
        KTable<String, String> inventory = builder.table("inventory",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("inventory-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));
        
        // Multi-stage enrichment
        orders
            // Stage 1: Extract customer ID and join with customer data
            .selectKey((key, value) -> extractCustomerId(value))
            .join(customers, (order, customer) -> 
                enrichWithCustomer(order, customer))
            
            // Stage 2: Extract product ID and join with product data
            .selectKey((key, enriched) -> extractProductId(enriched))
            .join(products, (enriched, product) -> 
                enrichWithProduct(enriched, product))
            
            // Stage 3: Join with inventory for availability
            .leftJoin(inventory, (enriched, stock) -> 
                enrichWithInventory(enriched, stock))
            
            // Stage 4: Filter and validate
            .filter((key, enriched) -> isValid(enriched))
            
            // Output enriched orders
            .to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));
        
        return builder.build();
    }
    
    private static String extractCustomerId(String order) {
        // Parse order JSON and extract customer_id
        return "customer-123";  // Simplified
    }
    
    private static String extractProductId(String enriched) {
        // Parse enriched data and extract product_id
        return "product-456";  // Simplified
    }
    
    private static String enrichWithCustomer(String order, String customer) {
        return order + ",customer:" + customer;
    }
    
    private static String enrichWithProduct(String enriched, String product) {
        return enriched + ",product:" + product;
    }
    
    private static String enrichWithInventory(String enriched, String stock) {
        String availability = (stock != null && Integer.parseInt(stock) > 0) ? 
            "available" : "out-of-stock";
        return enriched + ",availability:" + availability;
    }
    
    private static boolean isValid(String enriched) {
        return !enriched.contains("out-of-stock");
    }
}

Pattern: Complex Event Processing (CEP)

Detect patterns across multiple events:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;

public class ComplexEventProcessing {
    
    public static Topology buildFraudDetectionTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
        KStream<String, String> transactions = builder.stream("transactions");
        
        // Pattern 1: Multiple transactions from same user in short time
        KTable<String, Long> transactionCounts = transactions
            .selectKey((key, value) -> extractUserId(value))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count(Materialized.as("transaction-counts"));
        
        // Pattern 2: High-value transactions
        KStream<String, String> highValueTransactions = transactions
            .filter((key, value) -> 
                Double.parseDouble(extractAmount(value)) > 10000);
        
        // Pattern 3: Geographically suspicious transactions
        KStream<String, String> suspiciousLocations = transactions
            .filter((key, value) -> 
                isDistantFromPreviousLocation(value));
        
        // Combine patterns
        KStream<String, String> potentialFraud = highValueTransactions
            .merge(suspiciousLocations)
            .selectKey((key, value) -> extractUserId(value))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
            .reduce((v1, v2) -> v1 + "," + v2)
            .toStream()
            .filter((windowedKey, value) -> 
                value.split(",").length >= 2)  // At least 2 suspicious events
            .selectKey((windowedKey, value) -> windowedKey.key());
        
        potentialFraud.to("fraud-alerts");
        
        return builder.build();
    }
    
    private static String extractUserId(String transaction) {
        return "user-123";  // Simplified
    }
    
    private static String extractAmount(String transaction) {
        return "15000";  // Simplified
    }
    
    private static boolean isDistantFromPreviousLocation(String transaction) {
        return false;  // Simplified - check against user's location history
    }
}

Pattern: Streaming Aggregation with Multiple Time Windows

Compute metrics across different time windows:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;

public class MultiWindowAggregation {
    
    public static Topology buildMetricsTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
        KStream<String, Long> events = builder.stream("events",
            Consumed.with(Serdes.String(), Serdes.Long()));
        
        KGroupedStream<String, Long> grouped = events.groupByKey();
        
        // 1-minute window aggregation
        KTable<Windowed<String>, Long> oneMinuteCounts = grouped
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(Materialized.as("one-minute-counts"));
        
        oneMinuteCounts.toStream()
            .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.key() + "@1m",
                String.format("{\"window\":\"1m\",\"key\":\"%s\",\"count\":%d," +
                    "\"start\":%d,\"end\":%d}",
                    windowedKey.key(), count,
                    windowedKey.window().start(),
                    windowedKey.window().end())))
            .to("metrics-1m");
        
        // 5-minute window aggregation
        KTable<Windowed<String>, Long> fiveMinuteCounts = grouped
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count(Materialized.as("five-minute-counts"));
        
        fiveMinuteCounts.toStream()
            .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.key() + "@5m",
                String.format("{\"window\":\"5m\",\"key\":\"%s\",\"count\":%d}",
                    windowedKey.key(), count)))
            .to("metrics-5m");
        
        // 1-hour window aggregation
        KTable<Windowed<String>, Long> oneHourCounts = grouped
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count(Materialized.as("one-hour-counts"));
        
        oneHourCounts.toStream()
            .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.key() + "@1h",
                String.format("{\"window\":\"1h\",\"key\":\"%s\",\"count\":%d}",
                    windowedKey.key(), count)))
            .to("metrics-1h");
        
        return builder.build();
    }
}

Pattern: Dynamic Routing Based on Content

Route records to different topics based on content:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Map;

public class DynamicRouting {
    
    public static Topology buildRoutingTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
        KStream<String, String> events = builder.stream("input-events");
        
        // Branch into multiple streams based on content
        Map<String, KStream<String, String>> branches = events
            .split(Named.as("branch-"))
            .branch((key, value) -> value.contains("CRITICAL"), 
                Branched.as("critical"))
            .branch((key, value) -> value.contains("ERROR"), 
                Branched.as("error"))
            .branch((key, value) -> value.contains("WARNING"), 
                Branched.as("warning"))
            .defaultBranch(Branched.as("info"))
            .asMap();
        
        // Route each branch to appropriate topic
        branches.get("branch-critical").to("critical-events");
        branches.get("branch-error").to("error-events");
        branches.get("branch-warning").to("warning-events");
        branches.get("branch-info").to("info-events");
        
        // Alternative: Dynamic topic selection
        events.to((key, value, recordContext) -> {
            if (value.contains("CRITICAL")) return "critical-events";
            if (value.contains("ERROR")) return "error-events";
            if (value.contains("WARNING")) return "warning-events";
            return "info-events";
        });
        
        return builder.build();
    }
}

Pattern: Stateful Deduplication

Deduplicate records using state store:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;

public class StatefulDeduplication {
    
    public static Topology buildDeduplicationTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Create state store for tracking seen records
        StoreBuilder<WindowStore<String, String>> deduplicationStoreBuilder =
            Stores.windowStoreBuilder(
                Stores.persistentWindowStore("deduplication-store",
                    Duration.ofHours(24),  // Retention period
                    Duration.ofMinutes(1),  // Window size
                    false),                 // Not retain duplicates
                Serdes.String(),
                Serdes.String());
        
        builder.addStateStore(deduplicationStoreBuilder);
        
        KStream<String, String> events = builder.stream("input-events");
        
        // Deduplicate using processValues with the state store
        // (transformValues was removed from the DSL; FixedKeyProcessor is its replacement)
        KStream<String, String> deduplicated = events
            .processValues(() -> new FixedKeyProcessor<String, String, String>() {
                private FixedKeyProcessorContext<String, String> context;
                private WindowStore<String, String> stateStore;

                @Override
                public void init(FixedKeyProcessorContext<String, String> context) {
                    this.context = context;
                    this.stateStore = context.getStateStore("deduplication-store");
                }

                @Override
                public void process(FixedKeyRecord<String, String> record) {
                    long timestamp = record.timestamp();

                    // Check if we've seen this record recently
                    String deduplicationKey = record.key() + ":" + record.value().hashCode();

                    try (WindowStoreIterator<String> iterator =
                            stateStore.fetch(deduplicationKey,
                                timestamp - Duration.ofHours(1).toMillis(),
                                timestamp)) {

                        if (iterator.hasNext()) {
                            return; // Duplicate found - drop the record
                        }
                    }

                    // Not a duplicate - remember it and forward downstream
                    stateStore.put(deduplicationKey, record.value(), timestamp);
                    context.forward(record);
                }

                @Override
                public void close() {}
            }, "deduplication-store");
        
        deduplicated.to("deduplicated-events");
        
        return builder.build();
    }
}

Pattern: Real-Time Anomaly Detection

Detect anomalies using sliding window statistics:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import java.time.Duration;

public class AnomalyDetection {
    
    public static Topology buildAnomalyDetectionTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
        KStream<String, Double> metrics = builder.stream("metrics",
            Consumed.with(Serdes.String(), Serdes.Double()));
        
        // Calculate rolling statistics
        KTable<Windowed<String>, AggregatedStats> statistics = metrics
            .groupByKey()
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)))
            .aggregate(
                AggregatedStats::new,
                (key, value, aggregate) -> {
                    aggregate.add(value);
                    return aggregate;
                },
                Materialized.<String, AggregatedStats, WindowStore<Bytes, byte[]>>as("statistics-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new AggregatedStatsSerde())
            );
        
        // Join metrics with per-key statistics to detect anomalies
        // (re-key the windowed statistics back to the original key so the join keys match)
        KStream<String, String> anomalies = metrics
            .join(statistics.toStream((windowedKey, stats) -> windowedKey.key()),
                (value, stats) -> {
                    double mean = stats.getMean();
                    double stdDev = stats.getStdDev();
                    double threshold = 3.0;  // 3 sigma rule
                    
                    if (Math.abs(value - mean) > threshold * stdDev) {
                        return String.format(
                            "{\"value\":%.2f,\"mean\":%.2f,\"stddev\":%.2f," +
                            "\"deviation\":%.2f,\"threshold\":%.2f}",
                            value, mean, stdDev, 
                            Math.abs(value - mean) / stdDev, threshold);
                    }
                    return null;
                },
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30)),
                StreamJoined.with(Serdes.String(), Serdes.Double(), new AggregatedStatsSerde()))
            .filter((key, value) -> value != null);
        
        anomalies.to("anomalies");
        
        return builder.build();
    }
    
    // Helper class for statistics aggregation
    public static class AggregatedStats {
        private long count = 0;
        private double sum = 0;
        private double sumOfSquares = 0;
        
        public void add(double value) {
            count++;
            sum += value;
            sumOfSquares += value * value;
        }
        
        public double getMean() {
            return count > 0 ? sum / count : 0;
        }
        
        public double getStdDev() {
            if (count <= 1) return 0;
            double mean = getMean();
            double variance = (sumOfSquares / count) - (mean * mean);
            return Math.sqrt(variance);
        }
    }
    
    // Serde for AggregatedStats (simplified - implement properly)
    public static class AggregatedStatsSerde implements Serde<AggregatedStats> {
        @Override
        public Serializer<AggregatedStats> serializer() {
            return (topic, data) -> null;  // Implement serialization
        }
        
        @Override
        public Deserializer<AggregatedStats> deserializer() {
            return (topic, data) -> null;  // Implement deserialization
        }
    }
}

Streams Testing Best Practices

Complete testing example with TopologyTestDriver:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.test.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;
import org.junit.jupiter.api.*;
import static org.junit.jupiter.api.Assertions.*;

public class StreamsTopologyTest {
    
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;
    
    @BeforeEach
    public void setup() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Build topology
        KStream<String, String> input = builder.stream("input");
        input
            .filter((key, value) -> value != null)
            .mapValues(String::toUpperCase)
            .to("output");
        
        // Create test driver
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
            Serdes.String().getClass());
        
        testDriver = new TopologyTestDriver(builder.build(), props);
        
        // Create test topics
        inputTopic = testDriver.createInputTopic("input",
            Serdes.String().serializer(),
            Serdes.String().serializer());
        outputTopic = testDriver.createOutputTopic("output",
            Serdes.String().deserializer(),
            Serdes.String().deserializer());
    }
    
    @AfterEach
    public void tearDown() {
        testDriver.close();
    }
    
    @Test
    public void shouldTransformToUpperCase() {
        // Given
        inputTopic.pipeInput("key1", "hello");
        inputTopic.pipeInput("key2", "world");
        
        // When
        var outputs = outputTopic.readKeyValuesToList();
        
        // Then
        assertEquals(2, outputs.size());
        assertEquals("HELLO", outputs.get(0).value);
        assertEquals("WORLD", outputs.get(1).value);
    }
    
    @Test
    public void shouldFilterNullValues() {
        // Given
        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", null);
        inputTopic.pipeInput("key3", "value3");
        
        // When
        var outputs = outputTopic.readKeyValuesToList();
        
        // Then
        assertEquals(2, outputs.size());
        assertEquals("VALUE1", outputs.get(0).value);
        assertEquals("VALUE3", outputs.get(1).value);
    }
    
    @Test
    public void shouldHandleEmptyInput() {
        // No input
        
        // When
        assertTrue(outputTopic.isEmpty());
    }
}