tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0
Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
The high-level Streams DSL provides functional-style operations for stream processing.
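A minimal end-to-end sketch of the DSL (the topic names here are illustrative, not part of the API):
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
source
    .filter((key, value) -> value != null)  // drop null payloads
    .mapValues(String::trim)                // value-only transformation, no repartition
    .to("output-topic");
Topology topology = builder.build();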
Record stream of key-value pairs, where each record is an independent event.
package org.apache.kafka.streams.kstream;
public interface KStream<K, V> {
// Filtering
KStream<K, V> filter(Predicate<? super K, ? super V> predicate);
KStream<K, V> filter(Predicate<? super K, ? super V> predicate, Named named);
KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate);
KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, Named named);
// Key selection
<KOut> KStream<KOut, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KOut> mapper);
<KOut> KStream<KOut, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KOut> mapper,
Named named);
// Value transformation
<VOut> KStream<K, VOut> mapValues(ValueMapper<? super V, ? extends VOut> mapper);
<VOut> KStream<K, VOut> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper);
<VOut> KStream<K, VOut> mapValues(ValueMapper<? super V, ? extends VOut> mapper, Named named);
// Key-value transformation
<KOut, VOut> KStream<KOut, VOut> map(KeyValueMapper<? super K, ? super V,
? extends KeyValue<? extends KOut, ? extends VOut>> mapper);
<KOut, VOut> KStream<KOut, VOut> map(KeyValueMapper<? super K, ? super V,
? extends KeyValue<? extends KOut, ? extends VOut>> mapper, Named named);
// FlatMap
<KOut, VOut> KStream<KOut, VOut> flatMap(KeyValueMapper<? super K, ? super V,
? extends Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper);
<VOut> KStream<K, VOut> flatMapValues(ValueMapper<? super V,
? extends Iterable<? extends VOut>> mapper);
<VOut> KStream<K, VOut> flatMapValues(ValueMapperWithKey<? super K, ? super V,
? extends Iterable<? extends VOut>> mapper);
// Side effects
void print(Printed<K, V> printed);
void foreach(ForeachAction<? super K, ? super V> action);
void foreach(ForeachAction<? super K, ? super V> action, Named named);
KStream<K, V> peek(ForeachAction<? super K, ? super V> action);
KStream<K, V> peek(ForeachAction<? super K, ? super V> action, Named named);
// Branching and merging
BranchedKStream<K, V> split();
BranchedKStream<K, V> split(Named named);
KStream<K, V> merge(KStream<K, V> stream);
KStream<K, V> merge(KStream<K, V> stream, Named named);
// Repartitioning
KStream<K, V> repartition();
KStream<K, V> repartition(Repartitioned<K, V> repartitioned);
// Output
void to(String topic);
void to(String topic, Produced<K, V> produced);
void to(TopicNameExtractor<K, V> topicExtractor);
void to(TopicNameExtractor<K, V> topicExtractor, Produced<K, V> produced);
// Conversion
KTable<K, V> toTable();
KTable<K, V> toTable(Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
// Grouping
KGroupedStream<K, V> groupByKey();
KGroupedStream<K, V> groupByKey(Grouped<K, V> grouped);
<KOut> KGroupedStream<KOut, V> groupBy(KeyValueMapper<? super K, ? super V, KOut> selector);
<KOut> KGroupedStream<KOut, V> groupBy(KeyValueMapper<? super K, ? super V, KOut> selector,
Grouped<KOut, V> grouped);
// Stream-Stream joins
<VRight, VOut> KStream<K, VOut> join(KStream<K, VRight> otherStream,
ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
JoinWindows windows);
<VRight, VOut> KStream<K, VOut> join(KStream<K, VRight> otherStream,
ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
JoinWindows windows,
StreamJoined<K, V, VRight> streamJoined);
<VRight, VOut> KStream<K, VOut> leftJoin(KStream<K, VRight> otherStream,
ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
JoinWindows windows);
<VRight, VOut> KStream<K, VOut> outerJoin(KStream<K, VRight> otherStream,
ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
JoinWindows windows);
// Stream-Table joins
<VTable, VOut> KStream<K, VOut> join(KTable<K, VTable> table,
ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner);
<VTable, VOut> KStream<K, VOut> leftJoin(KTable<K, VTable> table,
ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner);
// Stream-GlobalKTable joins
<GK, GV, VOut> KStream<K, VOut> join(GlobalKTable<GK, GV> globalTable,
KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
ValueJoiner<? super V, ? super GV, ? extends VOut> joiner);
<GK, GV, VOut> KStream<K, VOut> leftJoin(GlobalKTable<GK, GV> globalTable,
KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
ValueJoiner<? super V, ? super GV, ? extends VOut> joiner);
// Processor API integration
<KOut, VOut> KStream<KOut, VOut> process(ProcessorSupplier<? super K, ? super V,
? extends KOut, ? extends VOut> processorSupplier, String... stateStoreNames);
<VOut> KStream<K, VOut> processValues(FixedKeyProcessorSupplier<? super K, ? super V,
? extends VOut> processorSupplier, String... stateStoreNames);
}
Usage Examples:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
// Filter
KStream<String, String> filtered = source.filter((key, value) -> value != null && value.length() > 5);
// Map values
KStream<String, String> uppercased = source.mapValues(value -> value.toUpperCase());
// Map with key
KStream<String, Integer> lengths = source.mapValues((key, value) -> value.length());
// FlatMap
KStream<String, String> words = source.flatMapValues(value ->
Arrays.asList(value.toLowerCase().split("\\s+")));
// Select key
KStream<String, String> rekeyed = source.selectKey((key, value) -> value.substring(0, 1));
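// Convert to a changelog table (sketch): the latest value per key becomes the table's current value
KTable<String, String> latestByKey = source.toTable();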
// Peek (for debugging)
source.peek((key, value) -> System.out.println("Processing: " + key + " -> " + value))
.mapValues(String::toUpperCase)
.to("output-topic");
// Branch
Map<String, KStream<String, String>> branches = source.split()
.branch((key, value) -> value.startsWith("A"), Branched.as("a-branch"))
.branch((key, value) -> value.startsWith("B"), Branched.as("b-branch"))
.defaultBranch(Branched.as("other-branch"));
Changelog stream representing a table.
package org.apache.kafka.streams.kstream;
public interface KTable<K, V> {
// Filtering
KTable<K, V> filter(Predicate<? super K, ? super V> predicate);
KTable<K, V> filter(Predicate<? super K, ? super V> predicate,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate);
// Value transformation
<VOut> KTable<K, VOut> mapValues(ValueMapper<? super V, ? extends VOut> mapper);
<VOut> KTable<K, VOut> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper);
// Conversion
KStream<K, V> toStream();
<KOut> KStream<KOut, V> toStream(KeyValueMapper<? super K, ? super V, ? extends KOut> mapper);
// Grouping
<KOut, VOut> KGroupedTable<KOut, VOut> groupBy(KeyValueMapper<? super K, ? super V,
KeyValue<KOut, VOut>> selector);
// Joins
<VRight, VOut> KTable<K, VOut> join(KTable<K, VRight> other,
ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner);
<VRight, VOut> KTable<K, VOut> leftJoin(KTable<K, VRight> other,
ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner);
<VRight, VOut> KTable<K, VOut> outerJoin(KTable<K, VRight> other,
ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner);
// Suppression
KTable<K, V> suppress(Suppressed<? super K> suppressed);
// Metadata
String queryableStoreName();
}
Usage Examples:
import org.apache.kafka.streams.kstream.*;
KTable<String, Long> counts = builder.table("counts-topic");
// Filter
KTable<String, Long> highCounts = counts.filter((key, value) -> value > 100);
// Map values
KTable<String, String> formatted = counts.mapValues(value -> "Count: " + value);
// Convert to stream
KStream<String, Long> countsStream = counts.toStream();
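// Re-group a table (sketch): groupBy returns a KGroupedTable whose aggregate takes
// an adder and a subtractor, because each table update replaces the previous value
KTable<String, Long> totalsByPrefix = counts
    .groupBy((key, value) -> KeyValue.pair(key.substring(0, 1), value))
    .aggregate(
        () -> 0L,
        (groupKey, newValue, aggregate) -> aggregate + newValue,  // adder
        (groupKey, oldValue, aggregate) -> aggregate - oldValue); // subtractor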
// Join tables
KTable<String, String> names = builder.table("names-topic");
KTable<String, String> joined = counts.join(names,
(count, name) -> name + " has " + count + " items");
Controls how KTable updates are suppressed before being emitted downstream.
package org.apache.kafka.streams.kstream;
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
// Factory methods
static Suppressed<Windowed<?>> untilWindowCloses(StrictBufferConfig bufferConfig);
static <K> Suppressed<K> untilTimeLimit(Duration timeToWaitForMoreEvents,
BufferConfig<?> bufferConfig);
// Naming
Suppressed<K> withName(String name);
// Buffer configuration interfaces
interface BufferConfig<BC extends BufferConfig<BC>> {
// Factory methods for buffer configuration
static EagerBufferConfig maxRecords(long recordLimit);
static EagerBufferConfig maxBytes(long byteLimit);
static StrictBufferConfig unbounded();
// Configuration methods
BC withMaxRecords(long recordLimit);
BC withMaxBytes(long byteLimit);
StrictBufferConfig withNoBound();
StrictBufferConfig shutDownWhenFull();
EagerBufferConfig emitEarlyWhenFull();
BC withLoggingDisabled();
BC withLoggingEnabled(Map<String, String> config);
}
// Buffer config types
interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> {
// Strict buffer never emits early
}
interface EagerBufferConfig extends BufferConfig<EagerBufferConfig> {
// Eager buffer may emit early when full
}
}
Usage Examples:
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Map;
// Suppress window results until window closes
KTable<Windowed<String>, Long> windowedCounts = source
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count()
.suppress(Suppressed.untilWindowCloses(
Suppressed.BufferConfig.unbounded()));
// Suppress with time limit and bounded buffer
KTable<String, Long> counts = source
.groupByKey()
.count()
.suppress(Suppressed.untilTimeLimit(
Duration.ofSeconds(10),
Suppressed.BufferConfig.maxRecords(1000).emitEarlyWhenFull()));
// Suppress with byte-limited buffer
KTable<String, Long> suppressed = counts
.suppress(Suppressed.untilTimeLimit(
Duration.ofMinutes(1),
Suppressed.BufferConfig.maxBytes(10_000_000L)
.withLoggingEnabled(Map.of("segment.ms", "60000"))));
Grouped stream for aggregations.
package org.apache.kafka.streams.kstream;
public interface KGroupedStream<K, V> {
// Count
KTable<K, Long> count();
KTable<K, Long> count(Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);
// Reduce
KTable<K, V> reduce(Reducer<V> reducer);
KTable<K, V> reduce(Reducer<V> reducer,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
// Aggregate
<VOut> KTable<K, VOut> aggregate(Initializer<VOut> initializer,
Aggregator<? super K, ? super V, VOut> aggregator);
<VOut> KTable<K, VOut> aggregate(Initializer<VOut> initializer,
Aggregator<? super K, ? super V, VOut> aggregator,
Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
// Windowed operations
<W extends Window> TimeWindowedKStream<K, V> windowedBy(Windows<W> windows);
TimeWindowedKStream<K, V> windowedBy(SlidingWindows windows);
SessionWindowedKStream<K, V> windowedBy(SessionWindows windows);
// Cogrouping
<VOut> CogroupedKStream<K, VOut> cogroup(Aggregator<? super K, ? super V, VOut> aggregator);
}
Usage Examples:
import org.apache.kafka.streams.kstream.*;
KStream<String, String> source = builder.stream("input-topic");
// Count
KTable<String, Long> counts = source
.groupByKey()
.count();
// Reduce
KTable<String, String> concatenated = source
.groupByKey()
.reduce((value1, value2) -> value1 + ", " + value2);
// Aggregate
KTable<String, Long> sums = source
.mapValues(Integer::parseInt)
.groupByKey()
.aggregate(
() -> 0L, // initializer
(key, value, aggregate) -> aggregate + value, // aggregator
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("sums-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
);
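// Cogroup (sketch; assumes "clicks" and "views" topics): combine several grouped streams
// into a single aggregate instead of aggregating each stream and joining the results
KGroupedStream<String, String> clicks = builder.<String, String>stream("clicks").groupByKey();
KGroupedStream<String, String> views = builder.<String, String>stream("views").groupByKey();
KTable<String, Long> activity = clicks
    .<Long>cogroup((key, click, total) -> total + 1)
    .cogroup(views, (key, view, total) -> total + 1)
    .aggregate(() -> 0L);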
// Windowed count
KTable<Windowed<String>, Long> windowedCounts = source
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count();
Time-based windows for stream-stream joins.
package org.apache.kafka.streams.kstream;
public final class JoinWindows extends Windows<Window> {
// Factory methods
public static JoinWindows ofTimeDifferenceWithNoGrace(Duration timeDifference);
public static JoinWindows ofTimeDifferenceAndGrace(Duration timeDifference,
Duration afterWindowEnd);
// Fluent configuration
public JoinWindows before(Duration timeDifference);
public JoinWindows after(Duration timeDifference);
@Deprecated
public JoinWindows grace(Duration afterWindowEnd);
// Accessor methods
public long size();
public long gracePeriodMs();
}
Usage:
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
// Join streams within 5-minute window
KStream<String, String> joined = stream1.join(stream2,
(value1, value2) -> value1 + "+" + value2,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));
// Join with grace period for late-arriving data
KStream<String, String> graceful = stream1.join(stream2,
(value1, value2) -> value1 + "+" + value2,
JoinWindows.ofTimeDifferenceAndGrace(
Duration.ofMinutes(5),
Duration.ofMinutes(1)));
// Asymmetric window: 10 minutes before, 5 minutes after
KStream<String, String> asymmetric = stream1.join(stream2,
(value1, value2) -> value1 + "+" + value2,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
.before(Duration.ofMinutes(10))
.after(Duration.ofMinutes(5)));
Fixed-size time windows.
package org.apache.kafka.streams.kstream;
public final class TimeWindows extends Windows<TimeWindow> {
// Factory methods
public static TimeWindows ofSizeWithNoGrace(Duration size);
public static TimeWindows ofSizeAndGrace(Duration size, Duration afterWindowEnd);
// Configuration methods
public TimeWindows advanceBy(Duration advance);
// Accessor methods
public long size();
public long gracePeriodMs();
}
Usage:
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
// Tumbling window (5 minutes)
KTable<Windowed<String>, Long> tumblingCounts = source
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count();
// Tumbling window with grace period
KTable<Windowed<String>, Long> gracefulCounts = source
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(5),
Duration.ofMinutes(1)))
.count();
// Hopping window (10-minute window, advancing every 5 minutes)
KTable<Windowed<String>, Long> hoppingCounts = source
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10))
.advanceBy(Duration.ofMinutes(5)))
.count();
Session windows based on inactivity gap.
package org.apache.kafka.streams.kstream;
public final class SessionWindows {
// Factory methods
public static SessionWindows ofInactivityGapWithNoGrace(Duration inactivityGap);
public static SessionWindows ofInactivityGapAndGrace(Duration inactivityGap,
Duration afterWindowEnd);
// Accessor methods
public long inactivityGap();
public long gracePeriodMs();
}
Usage:
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
// Session window with 5-minute inactivity gap
KTable<Windowed<String>, Long> sessions = source
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
.count();
// Session window with grace period
KTable<Windowed<String>, Long> gracefulSessions = source
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(
Duration.ofMinutes(5),
Duration.ofMinutes(1)))
.count();
Sliding windows for aggregations based on time difference between records.
package org.apache.kafka.streams.kstream;
public final class SlidingWindows {
// Factory methods
public static SlidingWindows ofTimeDifferenceWithNoGrace(Duration timeDifference);
public static SlidingWindows ofTimeDifferenceAndGrace(Duration timeDifference,
Duration afterWindowEnd);
// Accessor methods
public long timeDifferenceMs();
public long gracePeriodMs();
}
Usage:
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
// Sliding window with 5-minute time difference
KTable<Windowed<String>, Long> slidingCounts = source
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
.count();
// Sliding window with grace period
KTable<Windowed<String>, Long> gracefulSliding = source
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
Duration.ofMinutes(5),
Duration.ofMinutes(1)))
.count();
Description:
Sliding windows are defined by a maximum time difference between records in the same window. A new window is created each time a record enters or leaves the sliding window. Unlike time windows, which are aligned to the epoch, sliding windows are aligned to the actual record timestamps.
For example, with a time difference of 5 seconds and records at times 8s, 9.2s, and 12.4s, the windows sketched below are created.
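A reconstruction of that example (window boundaries approximate):
[3.0s, 8.0s] contains the 8s record
[4.2s, 9.2s] contains 8s and 9.2s
[7.4s, 12.4s] contains 8s, 9.2s, and 12.4s
a window opening just after 8s contains 9.2s and 12.4s
a window opening just after 9.2s contains only 12.4s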
Unlimited (landmark) windows for aggregations.
package org.apache.kafka.streams.kstream;
public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
// Factory methods
public static UnlimitedWindows of();
// Configuration methods
public UnlimitedWindows startOn(Instant start);
// Accessor methods
public long size();
public long gracePeriodMs();
}
Usage:
import org.apache.kafka.streams.kstream.*;
import java.time.Instant;
// Unlimited window starting at timestamp zero
KTable<Windowed<String>, Long> unlimitedCounts = source
.groupByKey()
.windowedBy(UnlimitedWindows.of())
.count();
// Unlimited window starting at specific timestamp
KTable<Windowed<String>, Long> customStart = source
.groupByKey()
.windowedBy(UnlimitedWindows.of()
.startOn(Instant.parse("2024-01-01T00:00:00Z")))
.count();
Description:
Unlimited windows (also called landmark windows) have a fixed starting point and no end time, giving a single window of effectively infinite size. All records with timestamps greater than or equal to the start time are included in that one window.
Core functional interfaces used throughout the DSL.
package org.apache.kafka.streams.kstream;
@FunctionalInterface
public interface Predicate<K, V> {
boolean test(K key, V value);
}
@FunctionalInterface
public interface KeyValueMapper<K, V, R> {
R apply(K key, V value);
}
@FunctionalInterface
public interface ValueMapper<V, R> {
R apply(V value);
}
@FunctionalInterface
public interface ValueMapperWithKey<K, V, R> {
R apply(K readOnlyKey, V value);
}
@FunctionalInterface
public interface Initializer<VA> {
VA apply();
}
@FunctionalInterface
public interface Aggregator<K, V, VA> {
VA apply(K key, V value, VA aggregate);
}
@FunctionalInterface
public interface Reducer<V> {
V apply(V value1, V value2);
}
@FunctionalInterface
public interface ValueJoiner<V1, V2, R> {
R apply(V1 value1, V2 value2);
}
@FunctionalInterface
public interface ValueJoinerWithKey<K, V1, V2, R> {
R apply(K readOnlyKey, V1 value1, V2 value2);
}
@FunctionalInterface
public interface ForeachAction<K, V> {
void apply(K key, V value);
}
Consumption configuration.
package org.apache.kafka.streams.kstream;
public class Consumed<K, V> {
// Factory methods
public static <K, V> Consumed<K, V> with(Serde<K> keySerde, Serde<V> valueSerde);
public static <K, V> Consumed<K, V> with(TimestampExtractor timestampExtractor);
public static <K, V> Consumed<K, V> with(AutoOffsetReset offsetReset);
// Fluent setters
public Consumed<K, V> withName(String name);
public Consumed<K, V> withOffsetResetPolicy(AutoOffsetReset offsetResetPolicy);
public Consumed<K, V> withTimestampExtractor(TimestampExtractor timestampExtractor);
public Consumed<K, V> withKeySerde(Serde<K> keySerde);
public Consumed<K, V> withValueSerde(Serde<V> valueSerde);
}
Production configuration.
package org.apache.kafka.streams.kstream;
public class Produced<K, V> {
public static <K, V> Produced<K, V> with(Serde<K> keySerde, Serde<V> valueSerde);
public static <K, V> Produced<K, V> with(Serde<K> keySerde, Serde<V> valueSerde,
StreamPartitioner<? super K, ? super V> partitioner);
public Produced<K, V> withName(String name);
public Produced<K, V> withKeySerde(Serde<K> keySerde);
public Produced<K, V> withValueSerde(Serde<V> valueSerde);
public Produced<K, V> withStreamPartitioner(StreamPartitioner<? super K, ? super V> partitioner);
}
Materialization configuration for state stores.
package org.apache.kafka.streams.kstream;
public class Materialized<K, V, S extends StateStore> {
public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> as(String storeName);
public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreSupplier<S> storeSupplier);
public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> with(Serde<K> keySerde,
Serde<V> valueSerde);
public Materialized<K, V, S> withKeySerde(Serde<K> keySerde);
public Materialized<K, V, S> withValueSerde(Serde<V> valueSerde);
public Materialized<K, V, S> withRetention(Duration retention);
public Materialized<K, V, S> withLoggingEnabled(Map<String, String> config);
public Materialized<K, V, S> withLoggingDisabled();
public Materialized<K, V, S> withCachingEnabled();
public Materialized<K, V, S> withCachingDisabled();
}
Grouping configuration.
package org.apache.kafka.streams.kstream;
public class Grouped<K, V> {
public static <K, V> Grouped<K, V> with(Serde<K> keySerde, Serde<V> valueSerde);
public static <K, V> Grouped<K, V> with(String name, Serde<K> keySerde, Serde<V> valueSerde);
public static <K, V> Grouped<K, V> as(String name);
public Grouped<K, V> withName(String name);
public Grouped<K, V> withKeySerde(Serde<K> keySerde);
public Grouped<K, V> withValueSerde(Serde<V> valueSerde);
}
Complete word count example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;
import java.time.Duration;
import java.util.*;
public class WordCountExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Input stream
KStream<String, String> textLines = builder.stream("text-input");
// Word count pipeline
KTable<String, Long> wordCounts = textLines
// Split into words
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Filter empty words
.filter((key, word) -> !word.isEmpty())
// Group by word
.groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
// Count occurrences
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
// Output to topic
wordCounts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Symptoms: extra "-repartition" internal topics appear, adding latency, storage, and broker load.
Causes: key-changing operations (map, flatMap, selectKey, groupBy) ahead of a join or aggregation mark the stream for repartitioning.
Solutions: prefer value-only operations when the key does not change, or control repartitioning explicitly:
import org.apache.kafka.streams.kstream.*;
// INEFFICIENT: map() may change the key, so the stream is marked for repartitioning
KStream<String, String> source = builder.stream("input");
KStream<String, String> mapped = source
.map((key, value) -> KeyValue.pair(key, value.toUpperCase())) // Marks for repartition
.filter((key, value) -> value.length() > 5);
// EFFICIENT: mapValues() cannot change the key, so no repartition is needed
KStream<String, String> uppercased = source
.mapValues(value -> value.toUpperCase()) // No repartition
.filter((key, value) -> value.length() > 5);
// When repartitioning is necessary:
KStream<String, String> rekeyed = source
.selectKey((key, value) -> value.substring(0, 1)) // Changes key
.repartition(Repartitioned.<String, String>as("rekeyed")
.withNumberOfPartitions(10)); // Explicit control
Prevention: default to mapValues/flatMapValues and only change keys when a downstream join or aggregation actually needs the new key.
Symptoms: a stream-stream join produces little or no output even though both input topics contain data.
Causes: keys do not match (different types or serialization), records fall outside the join window, or the input topics are not co-partitioned.
Solutions:
// Debug join issues
// 1. Verify keys match
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
// Add logging to verify keys
stream1.peek((key, value) ->
System.out.println("Stream1 key: " + key + " (type: " +
(key != null ? key.getClass() : "null") + ")"));
stream2.peek((key, value) ->
System.out.println("Stream2 key: " + key + " (type: " +
(key != null ? key.getClass() : "null") + ")"));
// 2. Check window configuration
JoinWindows windows = JoinWindows
.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5));
// Records must arrive within 5 minutes of each other
// Increase window if needed
JoinWindows largerWindow = JoinWindows
.ofTimeDifferenceAndGrace(
Duration.ofMinutes(10), // Time difference
Duration.ofMinutes(2) // Grace period for late records
);
// 3. Verify both streams have data
stream1.foreach((key, value) ->
System.out.println("Stream1 record: " + key + " = " + value));
stream2.foreach((key, value) ->
System.out.println("Stream2 record: " + key + " = " + value));
// 4. Check join result
KStream<String, String> joined = stream1.join(
stream2,
(value1, value2) -> {
String result = value1 + ":" + value2;
System.out.println("Join produced: " + result);
return result;
},
windows
);
Prevention: keep joined topics co-partitioned with identical key types and serdes, and size the join window and grace period to cover the expected event-time skew, as sketched below.
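A minimal sketch (the partition count and repartition topic names are assumptions):
KStream<String, String> left = stream1.repartition(
    Repartitioned.<String, String>as("left-for-join").withNumberOfPartitions(6));
KStream<String, String> right = stream2.repartition(
    Repartitioned.<String, String>as("right-for-join").withNumberOfPartitions(6));
KStream<String, String> coPartitioned = left.join(right,
    (v1, v2) -> v1 + ":" + v2,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));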
// Null values in aggregations represent deletions
KStream<String, String> source = builder.stream("input");
KTable<String, Long> counts = source
.groupByKey()
.aggregate(
() -> 0L, // Initial value
(key, value, aggregate) -> {
if (value == null) {
// Defensive check only: stream aggregators are normally not invoked for null values
System.out.println("Received null value for key: " + key);
return 0L; // Reset count
}
return aggregate + 1;
},
Materialized.with(Serdes.String(), Serdes.Long())
);
// Null values in source topic:
// - Treated as tombstones in compacted topics and by KTables
// - Dropped (and logged) before KGroupedStream aggregators are invoked
// - Act as subtractions in KGroupedTable aggregations
// Records arriving after window close
KStream<String, String> source = builder.stream("input");
// Without grace period - late records dropped
TimeWindows windowsNoGrace = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));
// With grace period - late records accepted
TimeWindows windowsWithGrace = TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(5), // Window size
Duration.ofMinutes(1) // Grace period
);
KTable<Windowed<String>, Long> counts = source
.groupByKey()
.windowedBy(windowsWithGrace)
.count();
// Records arriving within grace period:
// - Included in window
// - May update already-emitted results
// - Downstream processors see updates
// Records arriving after grace period:
// - Dropped
// - Logged as late records
// - Check metrics: dropped-records-rate
// Windows with no records
KStream<String, String> source = builder.stream("input");
KTable<Windowed<String>, Long> windowedCounts = source
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count();
// Empty windows:
// - Not materialized in state store
// - No output produced
// - Cannot distinguish between empty and non-existent
// To emit zeros for empty windows, use custom aggregation:
KTable<Windowed<String>, Long> countsWithZeros = source
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.aggregate(
() -> 0L,
(key, value, aggregate) -> aggregate + 1,
Materialized.with(Serdes.String(), Serdes.Long())
);
// Still won't emit for windows with no records
// For guaranteed output, use punctuation:
source.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.aggregate(
() -> 0L,
(key, value, aggregate) -> aggregate + 1,
Materialized.with(Serdes.String(), Serdes.Long())
)
.toStream()
.process(() -> new Processor<Windowed<String>, Long, String, Long>() {
private ProcessorContext<String, Long> context;
@Override
public void init(ProcessorContext<String, Long> context) {
this.context = context;
// Schedule periodic emission
context.schedule(
Duration.ofMinutes(5),
PunctuationType.WALL_CLOCK_TIME,
timestamp -> {
// Emit zero counts for inactive keys
emitZeroCountsForInactiveKeys();
}
);
}
@Override
public void process(Record<Windowed<String>, Long> record) {
context.forward(new Record<>(
record.key().key(),
record.value(),
record.timestamp()
));
}
private void emitZeroCountsForInactiveKeys() {
// Implementation to track and emit zeros
}
});
// Choose operations based on performance characteristics
// FAST operations (no repartition, no state):
// - filter(), filterNot()
// - mapValues(), flatMapValues()
// - peek(), foreach()
// - split(), merge()
// MEDIUM operations (may require repartition):
// - map(), flatMap() - if key changes
// - selectKey() - always repartitions
// - groupBy() - repartitions
// EXPENSIVE operations (require state):
// - join() - requires state for buffering
// - aggregate(), reduce(), count() - requires state store
// - windowed operations - requires windowed state
// Optimize by ordering operations:
KStream<String, String> source = builder.stream("input");
// INEFFICIENT: Filter after expensive join
KStream<String, String> result1 = source
.join(otherStream, joiner, windows) // Expensive
.filter((key, value) -> value.length() > 5); // Should be earlier
// EFFICIENT: Filter before expensive join
KStream<String, String> result2 = source
.filter((key, value) -> value.length() > 5) // Cheap, reduces join input
.join(otherStream, joiner, windows); // Expensive, but less data
// Window type selection guide
// Tumbling Windows (non-overlapping, fixed size)
TimeWindows tumblingWindows = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));
// Use when: Fixed time buckets, no overlap needed
// Example: Hourly aggregations, daily summaries
// Hopping Windows (overlapping, fixed size and advance)
TimeWindows hoppingWindows = TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(10), // Window size
Duration.ofMinutes(1) // Grace period
).advanceBy(Duration.ofMinutes(5)); // Advance interval
// Use when: Overlapping time periods needed
// Example: Moving averages, sliding aggregations
// Session Windows (variable size, gap-based)
SessionWindows sessionWindows = SessionWindows.ofInactivityGapWithNoGrace(
Duration.ofMinutes(5)); // Inactivity gap
// Use when: Activity-based grouping
// Example: User sessions, burst detection
// Sliding Windows (continuous, aligned to record timestamps)
SlidingWindows slidingWindows = SlidingWindows.ofTimeDifferenceWithNoGrace(
Duration.ofMinutes(5)); // Time difference
// Use when: Aggregations over the most recent time span per record
// Example: Rolling averages aligned to event time (use JoinWindows for stream-stream joins)
// Performance characteristics:
// Tumbling: Fastest (non-overlapping)
// Hopping: Medium (overlapping creates multiple windows per record)
// Session: Variable (depends on activity patterns)
// Sliding: More expensive (multiple overlapping windows per record)
Enrich stream with data from multiple KTables:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Properties;
public class MultiStageEnrichment {
public static Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
// Source stream
KStream<String, String> orders = builder.stream("orders",
Consumed.with(Serdes.String(), Serdes.String()));
// Reference tables
KTable<String, String> customers = builder.table("customers",
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("customers-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
KTable<String, String> products = builder.table("products",
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("products-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
KTable<String, String> inventory = builder.table("inventory",
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("inventory-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
// Multi-stage enrichment
orders
// Stage 1: Extract customer ID and join with customer data
.selectKey((key, value) -> extractCustomerId(value))
.join(customers, (order, customer) ->
enrichWithCustomer(order, customer))
// Stage 2: Extract product ID and join with product data
.selectKey((key, enriched) -> extractProductId(enriched))
.join(products, (enriched, product) ->
enrichWithProduct(enriched, product))
// Stage 3: Join with inventory for availability
.leftJoin(inventory, (enriched, stock) ->
enrichWithInventory(enriched, stock))
// Stage 4: Filter and validate
.filter((key, enriched) -> isValid(enriched))
// Output enriched orders
.to("enriched-orders", Produced.with(Serdes.String(), Serdes.String()));
return builder.build();
}
private static String extractCustomerId(String order) {
// Parse order JSON and extract customer_id
return "customer-123"; // Simplified
}
private static String extractProductId(String enriched) {
// Parse enriched data and extract product_id
return "product-456"; // Simplified
}
private static String enrichWithCustomer(String order, String customer) {
return order + ",customer:" + customer;
}
private static String enrichWithProduct(String enriched, String product) {
return enriched + ",product:" + product;
}
private static String enrichWithInventory(String enriched, String stock) {
String availability = (stock != null && Integer.parseInt(stock) > 0) ?
"available" : "out-of-stock";
return enriched + ",availability:" + availability;
}
private static boolean isValid(String enriched) {
return !enriched.contains("out-of-stock");
}
}
Detect patterns across multiple events:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;
public class ComplexEventProcessing {
public static Topology buildFraudDetectionTopology() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> transactions = builder.stream("transactions");
// Pattern 1: Multiple transactions from same user in short time
KTable<Windowed<String>, Long> transactionCounts = transactions
.selectKey((key, value) -> extractUserId(value))
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("transaction-counts"));
// Pattern 2: High-value transactions
KStream<String, String> highValueTransactions = transactions
.filter((key, value) ->
Double.parseDouble(extractAmount(value)) > 10000);
// Pattern 3: Geographically suspicious transactions
KStream<String, String> suspiciousLocations = transactions
.filter((key, value) ->
isDistantFromPreviousLocation(value));
// Combine patterns
KStream<String, String> potentialFraud = highValueTransactions
.merge(suspiciousLocations)
.selectKey((key, value) -> extractUserId(value))
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
.reduce((v1, v2) -> v1 + "," + v2)
.toStream()
.filter((windowedKey, value) ->
value.split(",").length >= 2) // At least 2 suspicious events
.selectKey((windowedKey, value) -> windowedKey.key());
potentialFraud.to("fraud-alerts");
return builder.build();
}
private static String extractUserId(String transaction) {
return "user-123"; // Simplified
}
private static String extractAmount(String transaction) {
return "15000"; // Simplified
}
private static boolean isDistantFromPreviousLocation(String transaction) {
return false; // Simplified - check against user's location history
}
}
Compute metrics across different time windows:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;
public class MultiWindowAggregation {
public static Topology buildMetricsTopology() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> events = builder.stream("events",
Consumed.with(Serdes.String(), Serdes.Long()));
KGroupedStream<String, Long> grouped = events.groupByKey();
// 1-minute window aggregation
KTable<Windowed<String>, Long> oneMinuteCounts = grouped
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.count(Materialized.as("one-minute-counts"));
oneMinuteCounts.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key() + "@1m",
String.format("{\"window\":\"1m\",\"key\":\"%s\",\"count\":%d," +
"\"start\":%d,\"end\":%d}",
windowedKey.key(), count,
windowedKey.window().start(),
windowedKey.window().end())))
.to("metrics-1m");
// 5-minute window aggregation
KTable<Windowed<String>, Long> fiveMinuteCounts = grouped
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("five-minute-counts"));
fiveMinuteCounts.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key() + "@5m",
String.format("{\"window\":\"5m\",\"key\":\"%s\",\"count\":%d}",
windowedKey.key(), count)))
.to("metrics-5m");
// 1-hour window aggregation
KTable<Windowed<String>, Long> oneHourCounts = grouped
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.count(Materialized.as("one-hour-counts"));
oneHourCounts.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key() + "@1h",
String.format("{\"window\":\"1h\",\"key\":\"%s\",\"count\":%d}",
windowedKey.key(), count)))
.to("metrics-1h");
return builder.build();
}
}
Route records to different topics based on content:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Map;
public class DynamicRouting {
public static Topology buildRoutingTopology() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("input-events");
// Branch into multiple streams based on content
Map<String, KStream<String, String>> branches = events
.split(Named.as("branch-"))
.branch((key, value) -> value.contains("CRITICAL"),
Branched.as("critical"))
.branch((key, value) -> value.contains("ERROR"),
Branched.as("error"))
.branch((key, value) -> value.contains("WARNING"),
Branched.as("warning"))
.defaultBranch(Branched.as("info"))
.asMap();
// Route each branch to appropriate topic
branches.get("branch-critical").to("critical-events");
branches.get("branch-error").to("error-events");
branches.get("branch-warning").to("warning-events");
branches.get("branch-info").to("info-events");
// Alternative: Dynamic topic selection
events.to((key, value, recordContext) -> {
if (value.contains("CRITICAL")) return "critical-events";
if (value.contains("ERROR")) return "error-events";
if (value.contains("WARNING")) return "warning-events";
return "info-events";
});
return builder.build();
}
}
Deduplicate records using state store:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;
public class StatefulDeduplication {
public static Topology buildDeduplicationTopology() {
StreamsBuilder builder = new StreamsBuilder();
// Create state store for tracking seen records
StoreBuilder<WindowStore<String, String>> deduplicationStoreBuilder =
Stores.windowStoreBuilder(
Stores.persistentWindowStore("deduplication-store",
Duration.ofHours(24), // Retention period
Duration.ofMinutes(1), // Window size
false), // Not retain duplicates
Serdes.String(),
Serdes.String());
builder.addStateStore(deduplicationStoreBuilder);
KStream<String, String> events = builder.stream("input-events");
// Deduplicate using processValues with a state store
// (the older transformValues API is no longer available in 4.x)
KStream<String, String> deduplicated = events
.processValues(() -> new FixedKeyProcessor<String, String, String>() {
private FixedKeyProcessorContext<String, String> context;
private WindowStore<String, String> stateStore;
@Override
public void init(FixedKeyProcessorContext<String, String> context) {
this.context = context;
this.stateStore = context.getStateStore("deduplication-store");
}
@Override
public void process(FixedKeyRecord<String, String> record) {
long timestamp = record.timestamp();
// Check if we've seen this record recently
String deduplicationKey = record.key() + ":" + record.value().hashCode();
try (WindowStoreIterator<String> iterator =
stateStore.fetch(deduplicationKey,
timestamp - Duration.ofHours(1).toMillis(),
timestamp)) {
if (iterator.hasNext()) {
return; // Duplicate found - drop it by not forwarding
}
}
// Not a duplicate - remember it and forward downstream
stateStore.put(deduplicationKey, record.value(), timestamp);
context.forward(record);
}
@Override
public void close() {}
}, "deduplication-store");
deduplicated.to("deduplicated-events");
return builder.build();
}
}
Detect anomalies using sliding window statistics:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
public class AnomalyDetection {
public static Topology buildAnomalyDetectionTopology() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Double> metrics = builder.stream("metrics",
Consumed.with(Serdes.String(), Serdes.Double()));
// Calculate rolling statistics
KTable<Windowed<String>, AggregatedStats> statistics = metrics
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)))
.aggregate(
AggregatedStats::new,
(key, value, aggregate) -> {
aggregate.add(value);
return aggregate;
},
Materialized.<String, AggregatedStats, WindowStore<Bytes, byte[]>>as("statistics-store")
.withKeySerde(Serdes.String())
.withValueSerde(new AggregatedStatsSerde())
);
// Join metrics with statistics to detect anomalies
KStream<String, String> anomalies = metrics
.join(statistics.toStream((windowedKey, stats) -> windowedKey.key()), // re-key windowed stats to the metric key
(value, stats) -> {
double mean = stats.getMean();
double stdDev = stats.getStdDev();
double threshold = 3.0; // 3 sigma rule
if (Math.abs(value - mean) > threshold * stdDev) {
return String.format(
"{\"value\":%.2f,\"mean\":%.2f,\"stddev\":%.2f," +
"\"deviation\":%.2f,\"threshold\":%.2f}",
value, mean, stdDev,
Math.abs(value - mean) / stdDev, threshold);
}
return null;
},
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30)),
StreamJoined.with(Serdes.String(), Serdes.Double(), new AggregatedStatsSerde()))
.filter((key, value) -> value != null);
anomalies.to("anomalies");
return builder.build();
}
// Helper class for statistics aggregation
public static class AggregatedStats {
private long count = 0;
private double sum = 0;
private double sumOfSquares = 0;
public void add(double value) {
count++;
sum += value;
sumOfSquares += value * value;
}
public double getMean() {
return count > 0 ? sum / count : 0;
}
public double getStdDev() {
if (count <= 1) return 0;
double mean = getMean();
double variance = (sumOfSquares / count) - (mean * mean);
return Math.sqrt(variance);
}
}
// Serde for AggregatedStats (simplified - implement properly)
public static class AggregatedStatsSerde implements Serde<AggregatedStats> {
@Override
public Serializer<AggregatedStats> serializer() {
return (topic, data) -> null; // Implement serialization
}
@Override
public Deserializer<AggregatedStats> deserializer() {
return (topic, data) -> null; // Implement deserialization
}
}
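// A minimal string-encoded alternative (sketch only; a JSON or Avro serde is preferable in practice)
public static class StringAggregatedStatsSerde implements Serde<AggregatedStats> {
    @Override
    public Serializer<AggregatedStats> serializer() {
        return (topic, stats) -> stats == null ? null :
            (stats.count + "," + stats.sum + "," + stats.sumOfSquares)
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
    @Override
    public Deserializer<AggregatedStats> deserializer() {
        return (topic, bytes) -> {
            if (bytes == null) return null;
            String[] parts = new String(bytes, java.nio.charset.StandardCharsets.UTF_8).split(",");
            AggregatedStats stats = new AggregatedStats();
            stats.count = Long.parseLong(parts[0]);
            stats.sum = Double.parseDouble(parts[1]);
            stats.sumOfSquares = Double.parseDouble(parts[2]);
            return stats;
        };
    }
}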
}
Complete testing example with TopologyTestDriver:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.test.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;
import org.junit.jupiter.api.*;
import static org.junit.jupiter.api.Assertions.*;
public class StreamsTopologyTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, String> outputTopic;
@BeforeEach
public void setup() {
StreamsBuilder builder = new StreamsBuilder();
// Build topology
KStream<String, String> input = builder.stream("input");
input
.filter((key, value) -> value != null)
.mapValues(String::toUpperCase)
.to("output");
// Create test driver
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
testDriver = new TopologyTestDriver(builder.build(), props);
// Create test topics
inputTopic = testDriver.createInputTopic("input",
Serdes.String().serializer(),
Serdes.String().serializer());
outputTopic = testDriver.createOutputTopic("output",
Serdes.String().deserializer(),
Serdes.String().deserializer());
}
@AfterEach
public void tearDown() {
testDriver.close();
}
@Test
public void shouldTransformToUpperCase() {
// Given
inputTopic.pipeInput("key1", "hello");
inputTopic.pipeInput("key2", "world");
// When
var outputs = outputTopic.readKeyValuesToList();
// Then
assertEquals(2, outputs.size());
assertEquals("HELLO", outputs.get(0).value);
assertEquals("WORLD", outputs.get(1).value);
}
@Test
public void shouldFilterNullValues() {
// Given
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key2", null);
inputTopic.pipeInput("key3", "value3");
// When
var outputs = outputTopic.readKeyValuesToList();
// Then
assertEquals(2, outputs.size());
assertEquals("VALUE1", outputs.get(0).value);
assertEquals("VALUE3", outputs.get(1).value);
}
@Test
public void shouldHandleEmptyInput() {
// No input
// When
assertTrue(outputTopic.isEmpty());
}
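// Sketch of an additional check: keys pass through the topology unchanged
@Test
public void shouldPreserveKeys() {
    inputTopic.pipeInput("key1", "hello");
    var record = outputTopic.readKeyValue();
    assertEquals("key1", record.key);
    assertEquals("HELLO", record.value);
}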
}