tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0
Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
Kafka Streams is a client library for building stream processing applications and microservices. It provides a high-level DSL and a low-level Processor API for stream transformations.
Main application class for Kafka Streams.
package org.apache.kafka.streams;
public class KafkaStreams implements AutoCloseable {
// Constructors
public KafkaStreams(Topology topology, Properties props);
public KafkaStreams(Topology topology, StreamsConfig config);
public KafkaStreams(Topology topology,
Properties props,
KafkaClientSupplier clientSupplier);
// Lifecycle
public void start();
public void close();
public boolean close(Duration timeout);
public boolean close(CloseOptions options);
public void cleanUp();
// State management
public State state();
public void setStateListener(StateListener listener);
public void pause();
public void resume();
public boolean isPaused();
// Thread management
public Optional<String> addStreamThread();
public Optional<String> removeStreamThread();
public Optional<String> removeStreamThread(Duration timeout);
// Exception handling
public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler handler);
public void setGlobalStateRestoreListener(StateRestoreListener listener);
public void setStandbyUpdateListener(StandbyUpdateListener listener);
// Interactive queries
public <T> T store(StoreQueryParameters<T> storeQueryParameters);
public <R> StateQueryResult<R> query(StateQueryRequest<R> request);
// Metadata
public Collection<StreamsMetadata> metadataForAllStreamsClients();
public Collection<StreamsMetadata> streamsMetadataForStore(String storeName);
public <K> KeyQueryMetadata queryMetadataForKey(String storeName,
K key,
Serializer<K> keySerializer);
public Collection<ThreadMetadata> metadataForLocalThreads();
public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags();
// Monitoring
public Map<MetricName, ? extends Metric> metrics();
public ClientInstanceIds clientInstanceIds(Duration timeout);
// State enum
public enum State {
CREATED,
REBALANCING,
RUNNING,
PENDING_SHUTDOWN,
NOT_RUNNING,
PENDING_ERROR,
ERROR
}
}
Basic Usage:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
source.mapValues(value -> value.toUpperCase())
.to("output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Options for closing Kafka Streams.
package org.apache.kafka.streams;
public class CloseOptions {
public CloseOptions leaveGroup(boolean leaveGroup);
public CloseOptions timeout(Duration timeout);
}
Usage:
CloseOptions options = new CloseOptions()
.leaveGroup(true)
.timeout(Duration.ofSeconds(30));
streams.close(options);
High-level DSL topology builder.
package org.apache.kafka.streams;
public class StreamsBuilder {
// Constructor
public StreamsBuilder();
public StreamsBuilder(TopologyConfig topologyConfigs);
// Stream creation
public <K, V> KStream<K, V> stream(String topic);
public <K, V> KStream<K, V> stream(String topic, Consumed<K, V> consumed);
public <K, V> KStream<K, V> stream(Collection<String> topics);
public <K, V> KStream<K, V> stream(Collection<String> topics,
Consumed<K, V> consumed);
public <K, V> KStream<K, V> stream(Pattern topicPattern);
public <K, V> KStream<K, V> stream(Pattern topicPattern, Consumed<K, V> consumed);
// Table creation
public <K, V> KTable<K, V> table(String topic);
public <K, V> KTable<K, V> table(String topic, Consumed<K, V> consumed);
public <K, V> KTable<K, V> table(String topic,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public <K, V> KTable<K, V> table(String topic,
Consumed<K, V> consumed,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
// Global table creation
public <K, V> GlobalKTable<K, V> globalTable(String topic);
public <K, V> GlobalKTable<K, V> globalTable(String topic, Consumed<K, V> consumed);
public <K, V> GlobalKTable<K, V> globalTable(String topic,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public <K, V> GlobalKTable<K, V> globalTable(String topic,
Consumed<K, V> consumed,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
// State stores
public StreamsBuilder addStateStore(StoreBuilder<?> builder);
public <KIn, VIn> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder,
String topic,
Consumed<KIn, VIn> consumed,
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);
// Build topology
public Topology build();
public Topology build(Properties props);
}
Usage Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.*;
StreamsBuilder builder = new StreamsBuilder();
// Create stream with serdes
KStream<String, String> source = builder.stream("input-topic",
Consumed.with(Serdes.String(), Serdes.String()));
// Create table
KTable<String, Long> counts = builder.table("counts-topic",
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
// Create global table
GlobalKTable<String, String> globalData = builder.globalTable("reference-data");
// Add state store
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
Serdes.String(),
Serdes.Long());
builder.addStateStore(storeBuilder);
// Build topology
Topology topology = builder.build();
Low-level topology builder (Processor API).
package org.apache.kafka.streams;
public class Topology {
// Constructor
public Topology();
public Topology(TopologyConfig topologyConfigs);
// Source processors
public Topology addSource(String name, String... topics);
public Topology addSource(AutoOffsetReset offsetReset,
String name,
String... topics);
public Topology addSource(String name, Pattern topicPattern);
public Topology addSource(TimestampExtractor timestampExtractor,
String name,
String... topics);
public <K, V> Topology addSource(String name,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
String... topics);
// Sink processors
public Topology addSink(String name,
String topic,
String... parentNames);
public <K, V> Topology addSink(String name,
String topic,
StreamPartitioner<? super K, ? super V> partitioner,
String... parentNames);
public <K, V> Topology addSink(String name,
TopicNameExtractor<? super K, ? super V> topicExtractor,
String... parentNames);
public <K, V> Topology addSink(String name,
String topic,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
String... parentNames);
// Custom processors
public <KIn, VIn, KOut, VOut> Topology addProcessor(
String name,
ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier,
String... parentNames);
// State stores
public <S extends StateStore> Topology addStateStore(
StoreBuilder<S> storeBuilder,
String... processorNames);
public Topology addReadOnlyStateStore(
StoreBuilder<?> storeBuilder,
String... processorNames);
public <KIn, VIn> Topology addGlobalStore(
StoreBuilder<?> storeBuilder,
String sourceName,
Deserializer<KIn> keyDeserializer,
Deserializer<VIn> valueDeserializer,
String topic,
String processorName,
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);
public Topology connectProcessorAndStateStores(String processorName,
String... stateStoreNames);
// Description
public TopologyDescription describe();
}
Usage Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.streams.state.*;
Topology topology = new Topology();
// Add source
topology.addSource("Source",
new StringDeserializer(),
new StringDeserializer(),
"input-topic");
// Add processor
topology.addProcessor("Process",
() -> new MyProcessor(),
"Source");
// Add state store
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
Serdes.String(),
Serdes.Long());
topology.addStateStore(storeBuilder, "Process");
// Add sink
topology.addSink("Sink",
"output-topic",
new StringSerializer(),
new StringSerializer(),
"Process");
// Describe topology
TopologyDescription description = topology.describe();
System.out.println(description);
Configuration for topology.
package org.apache.kafka.streams;
public class TopologyConfig {
public static final String TASK_TIMEOUT_MS_CONFIG = "task.timeout.ms";
public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG =
"buffered.records.per.partition";
public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
}
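These keys can also be set in the application Properties like any other StreamsConfig entry and picked up through the StreamsBuilder(TopologyConfig) constructor shown earlier. A minimal sketch; the single-argument TopologyConfig(StreamsConfig) constructor is an assumption not shown in the listing above:
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import java.util.Properties;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-config-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// The TopologyConfig keys listed above are also valid application-level settings
props.put(TopologyConfig.TASK_TIMEOUT_MS_CONFIG, 300000);          // 5 minutes
props.put(TopologyConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 1000);
props.put(TopologyConfig.MAX_TASK_IDLE_MS_CONFIG, 0);
// Wrap the resolved StreamsConfig so the builder sees the topology-level settings (assumed constructor)
TopologyConfig topologyConfig = new TopologyConfig(new StreamsConfig(props));
StreamsBuilder builder = new StreamsBuilder(topologyConfig);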
Configuration for Kafka Streams applications.
package org.apache.kafka.streams;
public class StreamsConfig extends AbstractConfig {
// Required configurations
public static final String APPLICATION_ID_CONFIG = "application.id";
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
// Common configurations
public static final String CLIENT_ID_CONFIG = "client.id";
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG =
"statestore.cache.max.bytes";
// Serdes
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG =
"default.timestamp.extractor";
// State and topology
public static final String STATE_DIR_CONFIG = "state.dir";
public static final String APPLICATION_SERVER_CONFIG = "application.server";
public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
// Client prefixes
public static final String CONSUMER_PREFIX = "consumer.";
public static final String PRODUCER_PREFIX = "producer.";
public static final String ADMIN_CLIENT_PREFIX = "admin.";
public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
// Helper methods
public static String consumerPrefix(String consumerProp);
public static String producerPrefix(String producerProp);
public static String adminClientPrefix(String adminClientProp);
public static String topicPrefix(String topicProp);
// Processing guarantee constants
public static final String AT_LEAST_ONCE = "at_least_once";
public static final String EXACTLY_ONCE_V2 = "exactly_once_v2";
// Topology optimization constants
public static final String NO_OPTIMIZATION = "none";
public static final String OPTIMIZE = "all";
}
Configuration Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
Properties props = new Properties();
// Required
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Serdes
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Processing guarantee
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// State directory
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
// Thread count
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
// Commit interval
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000); // 10 seconds
// Cache size
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024); // 10MB
// Standby replicas
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
// Topology optimization
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
// Consumer config
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);
// Producer config
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 100);
KafkaStreams streams = new KafkaStreams(topology, props);
Configuration Defaults:
num.stream.threads: 1
processing.guarantee: "at_least_once"
commit.interval.ms: 30000 (30 seconds)
statestore.cache.max.bytes: 10485760 (10MB)
state.dir: /tmp/kafka-streams
num.standby.replicas: 0
max.task.idle.ms: 0
topology.optimization: "none"
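These defaults can be inspected programmatically through the AbstractConfig getters that StreamsConfig inherits. A small sketch; the printed values should match the defaults listed above:
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
Properties defaults = new Properties();
defaults.put(StreamsConfig.APPLICATION_ID_CONFIG, "defaults-check");
defaults.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
StreamsConfig config = new StreamsConfig(defaults);
System.out.println(config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));          // 1
System.out.println(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));     // at_least_once
System.out.println(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));         // 30000
System.out.println(config.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)); // 10485760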
import org.apache.kafka.streams.*;
KafkaStreams streams = new KafkaStreams(topology, props);
// State: CREATED
System.out.println("Initial state: " + streams.state());
// Start application
streams.start();
// State: REBALANCING -> RUNNING
// Check state
if (streams.state() == KafkaStreams.State.RUNNING) {
System.out.println("Streams is running");
}
// Pause processing
streams.pause();
System.out.println("Processing paused");
// Resume processing
streams.resume();
System.out.println("Processing resumed");
// Close application
streams.close();
// State: PENDING_SHUTDOWN -> NOT_RUNNING
import org.apache.kafka.streams.*;
public interface StateListener {
void onChange(KafkaStreams.State newState, KafkaStreams.State oldState);
}
// Usage
KafkaStreams streams = new KafkaStreams(topology, props);
streams.setStateListener((newState, oldState) -> {
System.out.println("State transition: " + oldState + " -> " + newState);
if (newState == KafkaStreams.State.RUNNING) {
System.out.println("Application is now running");
} else if (newState == KafkaStreams.State.ERROR) {
System.err.println("Application entered error state");
}
});
streams.start();
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.*;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.common.errors.ProducerFencedException;
public interface StreamsUncaughtExceptionHandler {
enum StreamThreadExceptionResponse {
REPLACE_THREAD, // Replace failed thread
SHUTDOWN_CLIENT, // Shutdown entire client
SHUTDOWN_APPLICATION // Shutdown application
}
StreamThreadExceptionResponse handle(Throwable exception);
}
// Usage
streams.setUncaughtExceptionHandler(exception -> {
System.err.println("Uncaught exception: " + exception.getMessage());
if (exception instanceof ProducerFencedException) {
return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
} else if (exception instanceof InvalidStateStoreException) {
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});
import org.apache.kafka.streams.processor.*;
import org.apache.kafka.common.TopicPartition;
public interface StateRestoreListener {
void onRestoreStart(TopicPartition topicPartition,
String storeName,
long startingOffset,
long endingOffset);
void onBatchRestored(TopicPartition topicPartition,
String storeName,
long batchEndOffset,
long numRestored);
void onRestoreEnd(TopicPartition topicPartition,
String storeName,
long totalRestored);
}
// Usage
streams.setGlobalStateRestoreListener(new StateRestoreListener() {
@Override
public void onRestoreStart(TopicPartition topicPartition, String storeName,
long startingOffset, long endingOffset) {
System.out.println("Starting restore of " + storeName +
" for partition " + topicPartition);
}
@Override
public void onBatchRestored(TopicPartition topicPartition, String storeName,
long batchEndOffset, long numRestored) {
System.out.println("Restored batch: " + numRestored + " records");
}
@Override
public void onRestoreEnd(TopicPartition topicPartition, String storeName,
long totalRestored) {
System.out.println("Finished restoring " + storeName +
": " + totalRestored + " records");
}
});
import org.apache.kafka.streams.*;
import java.util.Optional;
import java.time.Duration;
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
// Add a stream thread
Optional<String> threadName = streams.addStreamThread();
if (threadName.isPresent()) {
System.out.println("Added thread: " + threadName.get());
}
// Remove a stream thread
Optional<String> removedThread = streams.removeStreamThread();
if (removedThread.isPresent()) {
System.out.println("Removed thread: " + removedThread.get());
}
// Remove with timeout
Optional<String> removedWithTimeout = streams.removeStreamThread(Duration.ofSeconds(10));
import org.apache.kafka.streams.*;
import java.util.Collection;
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Collection<ThreadMetadata> threadMetadata = streams.metadataForLocalThreads();
for (ThreadMetadata metadata : threadMetadata) {
System.out.println("Thread: " + metadata.threadName());
System.out.println("State: " + metadata.threadState());
System.out.println("Active tasks: " + metadata.activeTasks());
System.out.println("Standby tasks: " + metadata.standbyTasks());
}
import org.apache.kafka.streams.*;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Collection;
// Get metadata for all instances
Collection<StreamsMetadata> allMetadata = streams.metadataForAllStreamsClients();
for (StreamsMetadata metadata : allMetadata) {
System.out.println("Host: " + metadata.hostInfo());
System.out.println("State stores: " + metadata.stateStoreNames());
System.out.println("Topic partitions: " + metadata.topicPartitions());
}
// Get metadata for a specific store
Collection<StreamsMetadata> storeMetadata =
streams.streamsMetadataForStore("my-store");
// Query metadata for a specific key
Serializer<String> keySerializer = Serdes.String().serializer();
KeyQueryMetadata keyMetadata =
streams.queryMetadataForKey("my-store", "my-key", keySerializer);
if (keyMetadata != KeyQueryMetadata.NOT_AVAILABLE) {
System.out.println("Active host: " + keyMetadata.activeHost());
System.out.println("Standby hosts: " + keyMetadata.standbyHosts());
System.out.println("Partition: " + keyMetadata.partition());
}
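Once the owning instance is known, the data itself can be read locally: store() (shown in the troubleshooting section below) returns a typed read-only store, and query() accepts IQv2-style requests. A minimal sketch of a key lookup with query(), assuming a key-value store named "my-store" with String keys and Long values:
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
// Build a request against the store and execute it on this instance
StateQueryRequest<Long> request =
    StateQueryRequest.inStore("my-store").withQuery(KeyQuery.<String, Long>withKey("my-key"));
StateQueryResult<Long> result = streams.query(request);
// Only partitions hosted by this instance contribute results
for (QueryResult<Long> partitionResult : result.getPartitionResults().values()) {
    if (partitionResult.isSuccess()) {
        System.out.println("Value: " + partitionResult.getResult());
    }
}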
import org.apache.kafka.streams.*;
import java.util.Map;
Map<String, Map<Integer, LagInfo>> allLags = streams.allLocalStorePartitionLags();
for (Map.Entry<String, Map<Integer, LagInfo>> storeEntry : allLags.entrySet()) {
String storeName = storeEntry.getKey();
System.out.println("Store: " + storeName);
for (Map.Entry<Integer, LagInfo> partitionEntry : storeEntry.getValue().entrySet()) {
int partition = partitionEntry.getKey();
LagInfo lagInfo = partitionEntry.getValue();
System.out.println(" Partition " + partition +
": current offset = " + lagInfo.currentOffsetPosition() +
", end offset = " + lagInfo.endOffsetPosition() +
", lag = " + lagInfo.offsetLag());
}
}
import org.apache.kafka.streams.*;
KafkaStreams streams = new KafkaStreams(topology, props);
// Clean up local state (call before start() to reset state)
streams.cleanUp();
// Start application
streams.start();
// ... application runs ...
// Close and cleanup
streams.close();
streams.cleanUp(); // Remove local state
Warning: cleanUp() deletes the local state directory. Only call it when you want to reset application state.
import org.apache.kafka.streams.*;
import java.time.Duration;
KafkaStreams streams = new KafkaStreams(topology, props);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down Kafka Streams");
streams.close(Duration.ofSeconds(30));
}));
streams.start();
// Keep application running
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.*;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.common.errors.ProducerFencedException;
KafkaStreams streams = new KafkaStreams(topology, props);
streams.setUncaughtExceptionHandler(exception -> {
System.err.println("Stream exception: " + exception.getMessage());
exception.printStackTrace();
// Decide recovery strategy
if (exception instanceof InvalidStateStoreException) {
// Temporary issue, replace thread
return StreamThreadExceptionResponse.REPLACE_THREAD;
} else if (exception instanceof ProducerFencedException) {
// Another instance took over, shutdown
return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
} else {
// Unknown error, shutdown application
return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
}
});
streams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.ERROR) {
System.err.println("Application in ERROR state");
// Implement alerting or recovery logic
}
});
streams.start();
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;
Map<MetricName, ? extends Metric> metrics = streams.metrics();
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
MetricName name = entry.getKey();
System.out.println(name.group() + "." + name.name() + ": " +
entry.getValue().metricValue());
}
Key metrics to monitor:
stream-metrics: commit-latency-avg
stream-metrics: poll-latency-avg
stream-metrics: process-latency-avg
stream-metrics: failed-stream-threads
stream-task-metrics: commit-rate
stream-task-metrics: process-rate
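A small sketch that pulls just these metrics out of the full registry by metric name (exact group names can vary slightly across Kafka versions):
import java.util.Set;
Set<String> watched = Set.of("commit-latency-avg", "poll-latency-avg", "process-latency-avg",
        "failed-stream-threads", "commit-rate", "process-rate");
streams.metrics().forEach((name, metric) -> {
    if (watched.contains(name.name())) {
        System.out.println(name.group() + "/" + name.name() + " = " + metric.metricValue());
    }
});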
Symptoms:
Causes:
Solutions:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
public <K, V> V queryStoreWithRetry(KafkaStreams streams,
String storeName,
K key,
int maxRetries) {
int retries = 0;
while (retries < maxRetries) {
try {
// Check application state first
if (streams.state() != KafkaStreams.State.RUNNING) {
System.out.println("Application not running, state: " + streams.state());
try {
    Thread.sleep(1000);
} catch (InterruptedException ie) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted", ie);
}
retries++;
continue;
}
ReadOnlyKeyValueStore<K, V> store = streams.store(
StoreQueryParameters.fromNameAndType(
storeName,
QueryableStoreTypes.keyValueStore()
)
);
return store.get(key);
} catch (InvalidStateStoreException e) {
retries++;
System.err.println("Store unavailable (attempt " + retries + "): " +
e.getMessage());
if (retries >= maxRetries) {
throw e;
}
try {
Thread.sleep(1000L * retries); // Linear backoff: wait longer after each attempt
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted", ie);
}
}
}
throw new RuntimeException("Failed to query store after " + maxRetries + " retries");
}
Prevention:
Symptoms:
Causes:
Solutions:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Reduce cache size
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG,
5 * 1024 * 1024); // 5MB (down from 10MB default)
// Reduce stream threads
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
// Use RocksDB for large states (offloads to disk)
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("large-store"), // RocksDB-backed
Serdes.String(),
Serdes.Long()
).withCachingDisabled(); // Disable caching for large stores
// Or increase JVM heap
// -Xms1g -Xmx2g
// Monitor memory usage
Runtime runtime = Runtime.getRuntime();
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
long maxMemory = runtime.maxMemory();
double memoryUsage = (double) usedMemory / maxMemory * 100;
if (memoryUsage > 80) {
System.err.println("WARNING: Memory usage at " + memoryUsage + "%");
}
Prevention:
Symptoms:
Causes:
Solutions:
// Add more stream threads
Properties props = new Properties();
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// Or dynamically adjust
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
// Monitor lag
Map<String, Map<Integer, LagInfo>> allLags = streams.allLocalStorePartitionLags();
long totalLag = 0;
for (Map<Integer, LagInfo> storeLags : allLags.values()) {
for (LagInfo lagInfo : storeLags.values()) {
totalLag += lagInfo.offsetLag();
}
}
if (totalLag > 100000) {
System.out.println("High lag detected, adding stream thread");
Optional<String> threadName = streams.addStreamThread();
if (threadName.isPresent()) {
System.out.println("Added thread: " + threadName.get());
}
}
// Or optimize topology
// - Remove unnecessary operations
// - Use mapValues instead of map (doesn't repartition)
// - Reduce windowing for aggregations
// - Disable caching if not needed
Prevention:
KafkaStreams streams = new KafkaStreams(topology, props);
// Set state listener
streams.setStateListener((newState, oldState) -> {
System.out.println("State change: " + oldState + " -> " + newState);
if (newState == KafkaStreams.State.REBALANCING) {
System.out.println("Application rebalancing - stores may be unavailable");
} else if (newState == KafkaStreams.State.RUNNING) {
System.out.println("Application running - stores are available");
}
});
streams.start();
// Wait for RUNNING state before querying
long startTime = System.currentTimeMillis();
while (streams.state() != KafkaStreams.State.RUNNING) {
if (System.currentTimeMillis() - startTime > 60000) {
throw new TimeoutException("Application didn't start within 60 seconds");
}
Thread.sleep(100);
}
// Now safe to query stores
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"my-store",
QueryableStoreTypes.keyValueStore()
)
);
// Tasks can migrate between instances during rebalancing
// State stores become unavailable when task migrates
streams.setUncaughtExceptionHandler(exception -> {
if (exception instanceof org.apache.kafka.streams.errors.TaskMigratedException) {
System.out.println("Task migrated to another instance");
// Return REPLACE_THREAD to continue processing with new assignment
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
}
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});
// Handle in processor
public class MyProcessor implements Processor<String, String, String, String> {
private ProcessorContext<String, String> context;
private KeyValueStore<String, Long> store;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
try {
this.store = context.getStateStore("my-store");
} catch (InvalidStateStoreException e) {
// Store not available during init - will be retried
System.err.println("Store not available during init: " + e.getMessage());
}
}
@Override
public void process(Record<String, String> record) {
if (store == null) {
try {
store = context.getStateStore("my-store");
} catch (InvalidStateStoreException e) {
// Store still not available - skip processing
System.err.println("Store unavailable, skipping record");
return;
}
}
// Process with store
Long count = store.get(record.key());
store.put(record.key(), count == null ? 1L : count + 1);
}
}
// Exactly-once processing requires careful handling of multiple inputs
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
StreamsBuilder builder = new StreamsBuilder();
// Multiple input topics with different processing speeds
KStream<String, String> fastStream = builder.stream("fast-topic");
KStream<String, String> slowStream = builder.stream("slow-topic");
// Join may cause alignment issues
KStream<String, String> joined = fastStream.join(
slowStream,
(fastValue, slowValue) -> fastValue + ":" + slowValue,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);
// Configure max.task.idle.ms to prevent slow topic from blocking fast topic
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 60000); // 1 minute
// This allows fast topic to proceed even if slow topic is idle
KafkaStreams streams = new KafkaStreams(builder.build(), props);
Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
// Automatic optimizations:
// - Repartition topic merging
// - Source topic merging
// - Key-preserving operation fusion
// Manual optimization: Use mapValues instead of map
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
// INEFFICIENT: map() changes key, causes repartition
KStream<String, String> inefficient = source
.map((key, value) -> KeyValue.pair(key, value.toUpperCase()));
// EFFICIENT: mapValues() preserves key, no repartition
KStream<String, String> efficient = source
.mapValues(value -> value.toUpperCase());
// Result: No unnecessary repartition, better performance
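// Note: the topology.optimization setting is applied when the same Properties are passed
// to build(); a minimal sketch of wiring it through, using the props and builder above:
Topology optimizedTopology = builder.build(props);
KafkaStreams optimizedStreams = new KafkaStreams(optimizedTopology, props);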
// RocksDB configuration for large state stores
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimized-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// RocksDB specific options
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
"com.example.CustomRocksDBConfigSetter");
// Implement custom RocksDB config
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
import java.util.Map;
public class CustomRocksDBConfigSetter implements RocksDBConfigSetter {
@Override
public void setConfig(String storeName, Options options, Map<String, Object> configs) {
// Increase block cache for better read performance
org.rocksdb.BlockBasedTableConfig tableConfig =
    new org.rocksdb.BlockBasedTableConfig();
tableConfig.setBlockCacheSize(256 * 1024 * 1024L); // 256MB
tableConfig.setBlockSize(16 * 1024); // 16KB
// Enable bloom filters for faster lookups (set before attaching the table config)
tableConfig.setFilterPolicy(new org.rocksdb.BloomFilter(10));
// Attach the table config last; changes made to tableConfig afterwards would not take effect
options.setTableFormatConfig(tableConfig);
// Increase write buffer for better write performance
options.setWriteBufferSize(64 * 1024 * 1024); // 64MB
options.setMaxWriteBufferNumber(3);
// Compression
options.setCompressionType(org.rocksdb.CompressionType.LZ4_COMPRESSION);
}
@Override
public void close(String storeName, Options options) {
// Cleanup if needed
}
}
// Scale Kafka Streams applications
// Option 1: Add more stream threads (vertical scaling)
Properties props = new Properties();
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
// Dynamically add threads
Optional<String> newThread = streams.addStreamThread();
// Option 2: Add more instances (horizontal scaling)
// Each instance should have same application.id
// Tasks will be distributed across all instances
// Instance 1:
Properties props1 = new Properties();
props1.put(StreamsConfig.APPLICATION_ID_CONFIG, "shared-app-id");
props1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaStreams streams1 = new KafkaStreams(topology, props1);
streams1.start();
// Instance 2 (same application.id):
Properties props2 = new Properties();
props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "shared-app-id");
props2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaStreams streams2 = new KafkaStreams(topology, props2);
streams2.start();
// Both instances will coordinate and share tasks
// Stateless topology - simpler but no local state
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
source
.filter((key, value) -> value != null)
.mapValues(String::toUpperCase)
.to("output-topic");
// No state stores needed
// No state directory management
// Faster restarts
// But cannot do aggregations or joins that require state
// Can only have ONE topology per KafkaStreams instance
// For multiple topologies, create multiple instances
// Topology 1: Process stream A
StreamsBuilder builder1 = new StreamsBuilder();
builder1.stream("stream-a").to("output-a");
Topology topology1 = builder1.build();
// Topology 2: Process stream B
StreamsBuilder builder2 = new StreamsBuilder();
builder2.stream("stream-b").to("output-b");
Topology topology2 = builder2.build();
// Need separate KafkaStreams instances
Properties props1 = new Properties();
props1.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-a");
props1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
Properties props2 = new Properties();
props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-b");
props2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaStreams streams1 = new KafkaStreams(topology1, props1);
KafkaStreams streams2 = new KafkaStreams(topology2, props2);
streams1.start();
streams2.start();
// Must use different application.id for different topologies
// State stores restored from changelog topics on startup
// Can take significant time for large stores
KafkaStreams streams = new KafkaStreams(topology, props);
// Monitor restoration progress
streams.setGlobalStateRestoreListener(new StateRestoreListener() {
private long totalToRestore = 0;
private long totalRestored = 0;
@Override
public void onRestoreStart(TopicPartition topicPartition,
String storeName,
long startingOffset,
long endingOffset) {
long recordsToRestore = endingOffset - startingOffset;
totalToRestore += recordsToRestore;
System.out.println("Starting restore of " + storeName +
" partition " + topicPartition.partition() +
": " + recordsToRestore + " records");
}
@Override
public void onBatchRestored(TopicPartition topicPartition,
String storeName,
long batchEndOffset,
long numRestored) {
totalRestored += numRestored;
if (totalToRestore > 0) {
double progress = (double) totalRestored / totalToRestore * 100;
System.out.println("Restoration progress: " +
String.format("%.2f", progress) + "%");
}
}
@Override
public void onRestoreEnd(TopicPartition topicPartition,
String storeName,
long totalRestored) {
System.out.println("Completed restore of " + storeName +
": " + totalRestored + " records");
}
});
streams.start();
// For faster startup, use standby replicas (set before constructing KafkaStreams)
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
// Standby replicas stay up-to-date, reducing restoration time on failover
// AT_LEAST_ONCE (default)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
// - Best performance
// - May process records multiple times on failure
// - Use when: Idempotent processing or duplicates acceptable
// EXACTLY_ONCE_V2 (recommended for exactly-once)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// - Guarantees each record processed exactly once
// - Slightly lower performance (transactional overhead)
// - Requires broker version 2.5+
// - Use when: Cannot tolerate duplicates
// Performance comparison:
// - AT_LEAST_ONCE: ~10-15% higher throughput
// - EXACTLY_ONCE_V2: Stronger guarantees, slight overhead
// EXACTLY_ONCE (deprecated, use EXACTLY_ONCE_V2)
// props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// commit.interval.ms controls checkpoint frequency
// Tradeoff between:
// - Fault tolerance (lower = less reprocessing on failure)
// - Performance (higher = better throughput)
// Low latency / fault tolerance
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // 1 second
// - Fast recovery on failure
// - More overhead
// - Use for: Critical data, low latency requirements
// High throughput
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000); // 30 seconds (default)
// - Less overhead
// - More reprocessing on failure
// - Use for: Batch processing, high throughput needs
// Calculate appropriate value:
int avgProcessingTimeMs = 100; // per record
int recordsPerSecond = 1000;
int acceptableReprocessingRecords = 10000;
int commitIntervalMs = (acceptableReprocessingRecords / recordsPerSecond) * 1000;
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitIntervalMs);