Describes mavenpkg:maven/org.apache.kafka/kafka_2.13@4.1.x

tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

docs/streams/overview.md

Streams Overview

Kafka Streams is a client library for building stream processing applications and microservices. It provides high-level DSL and low-level Processor API for stream transformations.

Core Entry Point

KafkaStreams

Main application class for Kafka Streams.

package org.apache.kafka.streams;

public class KafkaStreams implements AutoCloseable {
    // Constructors
    public KafkaStreams(Topology topology, Properties props);
    public KafkaStreams(Topology topology, StreamsConfig config);
    public KafkaStreams(Topology topology,
                       Properties props,
                       KafkaClientSupplier clientSupplier);

    // Lifecycle
    public void start();
    public void close();
    public void close(Duration timeout);
    public void close(CloseOptions options);
    public void cleanUp();

    // State management
    public State state();
    public void setStateListener(StateListener listener);
    public void pause();
    public void resume();
    public boolean isPaused();

    // Thread management
    public Optional<String> addStreamThread();
    public Optional<String> removeStreamThread();
    public Optional<String> removeStreamThread(Duration timeout);

    // Exception handling and listeners
    public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler handler);
    public void setGlobalStateRestoreListener(StateRestoreListener listener);
    public void setStandbyUpdateListener(StandbyUpdateListener listener);

    // Interactive queries
    public <T> T store(StoreQueryParameters<T> storeQueryParameters);
    public <R> StateQueryResult<R> query(StateQueryRequest<R> request);

    // Metadata
    public Collection<StreamsMetadata> metadataForAllStreamsClients();
    public Collection<StreamsMetadata> streamsMetadataForStore(String storeName);
    public <K> KeyQueryMetadata queryMetadataForKey(String storeName,
                                                     K key,
                                                     Serializer<K> keySerializer);
    public Collection<ThreadMetadata> metadataForLocalThreads();
    public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags();

    // Monitoring
    public Map<MetricName, ? extends Metric> metrics();
    public ClientInstanceIds clientInstanceIds(Duration timeout);

    // State enum
    public enum State {
        CREATED,
        REBALANCING,
        RUNNING,
        PENDING_SHUTDOWN,
        NOT_RUNNING,
        PENDING_ERROR,
        ERROR
    }
}

Basic Usage:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
source.mapValues(value -> value.toUpperCase())
      .to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

CloseOptions

Options for closing Kafka Streams.

package org.apache.kafka.streams;

public class CloseOptions {
    public CloseOptions leaveGroup(boolean leaveGroup);
    public CloseOptions timeout(Duration timeout);
}

Usage:

CloseOptions options = new CloseOptions()
    .leaveGroup(true)
    .timeout(Duration.ofSeconds(30));
streams.close(options);

Topology Building

StreamsBuilder

High-level DSL topology builder.

package org.apache.kafka.streams;

public class StreamsBuilder {
    // Constructor
    public StreamsBuilder();
    public StreamsBuilder(TopologyConfig topologyConfigs);

    // Stream creation
    public <K, V> KStream<K, V> stream(String topic);
    public <K, V> KStream<K, V> stream(String topic, Consumed<K, V> consumed);
    public <K, V> KStream<K, V> stream(Collection<String> topics);
    public <K, V> KStream<K, V> stream(Collection<String> topics,
                                       Consumed<K, V> consumed);
    public <K, V> KStream<K, V> stream(Pattern topicPattern);
    public <K, V> KStream<K, V> stream(Pattern topicPattern, Consumed<K, V> consumed);

    // Table creation
    public <K, V> KTable<K, V> table(String topic);
    public <K, V> KTable<K, V> table(String topic, Consumed<K, V> consumed);
    public <K, V> KTable<K, V> table(String topic,
                                     Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
    public <K, V> KTable<K, V> table(String topic,
                                     Consumed<K, V> consumed,
                                     Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    // Global table creation
    public <K, V> GlobalKTable<K, V> globalTable(String topic);
    public <K, V> GlobalKTable<K, V> globalTable(String topic, Consumed<K, V> consumed);
    public <K, V> GlobalKTable<K, V> globalTable(String topic,
                                                 Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
    public <K, V> GlobalKTable<K, V> globalTable(String topic,
                                                 Consumed<K, V> consumed,
                                                 Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    // State stores
    public StreamsBuilder addStateStore(StoreBuilder<?> builder);
    public <KIn, VIn> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder,
                                                    String topic,
                                                    Consumed<KIn, VIn> consumed,
                                                    ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);

    // Build topology
    public Topology build();
    public Topology build(Properties props);
}

Usage Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;

StreamsBuilder builder = new StreamsBuilder();

// Create stream with serdes
KStream<String, String> source = builder.stream("input-topic",
    Consumed.with(Serdes.String(), Serdes.String()));

// Create table
KTable<String, Long> counts = builder.table("counts-topic",
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));

// Create global table
GlobalKTable<String, String> globalData = builder.globalTable("reference-data");

// Add state store
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"),
        Serdes.String(),
        Serdes.Long());
builder.addStateStore(storeBuilder);

// Build topology
Topology topology = builder.build();

Topology

Low-level topology builder (Processor API).

package org.apache.kafka.streams;

public class Topology {
    // Constructor
    public Topology();
    public Topology(TopologyConfig topologyConfigs);

    // Source processors
    public Topology addSource(String name, String... topics);
    public Topology addSource(AutoOffsetReset offsetReset,
                             String name,
                             String... topics);
    public Topology addSource(String name, Pattern topicPattern);
    public Topology addSource(TimestampExtractor timestampExtractor,
                             String name,
                             String... topics);
    public <K, V> Topology addSource(String name,
                                     Deserializer<K> keyDeserializer,
                                     Deserializer<V> valueDeserializer,
                                     String... topics);

    // Sink processors
    public Topology addSink(String name,
                           String topic,
                           String... parentNames);
    public <K, V> Topology addSink(String name,
                                   String topic,
                                   StreamPartitioner<? super K, ? super V> partitioner,
                                   String... parentNames);
    public <K, V> Topology addSink(String name,
                                   TopicNameExtractor<? super K, ? super V> topicExtractor,
                                   String... parentNames);
    public <K, V> Topology addSink(String name,
                                   String topic,
                                   Serializer<K> keySerializer,
                                   Serializer<V> valueSerializer,
                                   String... parentNames);

    // Custom processors
    public <KIn, VIn, KOut, VOut> Topology addProcessor(
        String name,
        ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier,
        String... parentNames);

    // State stores
    public <S extends StateStore> Topology addStateStore(
        StoreBuilder<S> storeBuilder,
        String... processorNames);

    public Topology addReadOnlyStateStore(
        StoreBuilder<?> storeBuilder,
        String... processorNames);

    public <KIn, VIn> Topology addGlobalStore(
        StoreBuilder<?> storeBuilder,
        String sourceName,
        Deserializer<KIn> keyDeserializer,
        Deserializer<VIn> valueDeserializer,
        String topic,
        String processorName,
        ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);

    public Topology connectProcessorAndStateStores(String processorName,
                                                   String... stateStoreNames);

    // Description
    public TopologyDescription describe();
}

Usage Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.common.serialization.*;

Topology topology = new Topology();

// Add source
topology.addSource("Source",
    new StringDeserializer(),
    new StringDeserializer(),
    "input-topic");

// Add processor
topology.addProcessor("Process",
    () -> new MyProcessor(),
    "Source");

// Add state store
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"),
        Serdes.String(),
        Serdes.Long());
topology.addStateStore(storeBuilder, "Process");

// Add sink
topology.addSink("Sink",
    "output-topic",
    new StringSerializer(),
    new StringSerializer(),
    "Process");

// Describe topology
TopologyDescription description = topology.describe();
System.out.println(description);
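
Beyond printing the description, TopologyDescription can be walked programmatically, for example to verify processor wiring in a test; a minimal sketch reusing the description from the example above:

for (TopologyDescription.Subtopology subtopology : description.subtopologies()) {
    System.out.println("Subtopology " + subtopology.id());
    for (TopologyDescription.Node node : subtopology.nodes()) {
        System.out.println("  Node: " + node.name());
    }
}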

TopologyConfig

Configuration for topology.

package org.apache.kafka.streams;

public class TopologyConfig {
    public static final String TASK_TIMEOUT_MS_CONFIG = "task.timeout.ms";
    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG =
        "buffered.records.per.partition";
    public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
}
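
The task.timeout.ms and max.task.idle.ms keys are also exposed as StreamsConfig constants, so they can simply be set in the application properties; a minimal sketch (the timeout values are illustrative):

import org.apache.kafka.streams.*;
import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Task-level tuning
props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 300000L); // allow tasks 5 minutes to retry on errors
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L);     // do not wait for lagging partitions

KafkaStreams streams = new KafkaStreams(topology, props);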

Configuration

StreamsConfig

Configuration for Kafka Streams applications.

package org.apache.kafka.streams;

public class StreamsConfig extends AbstractConfig {
    // Required configurations
    public static final String APPLICATION_ID_CONFIG = "application.id";
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";

    // Common configurations
    public static final String CLIENT_ID_CONFIG = "client.id";
    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
    public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
    public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG =
        "statestore.cache.max.bytes";

    // Serdes
    public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
    public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
    public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG =
        "default.timestamp.extractor";

    // State and topology
    public static final String STATE_DIR_CONFIG = "state.dir";
    public static final String APPLICATION_SERVER_CONFIG = "application.server";
    public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
    public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";

    // Client prefixes
    public static final String CONSUMER_PREFIX = "consumer.";
    public static final String PRODUCER_PREFIX = "producer.";
    public static final String ADMIN_CLIENT_PREFIX = "admin.";
    public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
    public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
    public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";

    // Helper methods
    public static String consumerPrefix(String consumerProp);
    public static String producerPrefix(String producerProp);
    public static String adminClientPrefix(String adminClientProp);
    public static String topicPrefix(String topicProp);

    // Processing guarantee constants
    public static final String AT_LEAST_ONCE = "at_least_once";
    public static final String EXACTLY_ONCE_V2 = "exactly_once_v2";

    // Topology optimization constants
    public static final String NO_OPTIMIZATION = "none";
    public static final String OPTIMIZE = "all";
}

Configuration Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;

Properties props = new Properties();

// Required
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Serdes
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// Processing guarantee
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

// State directory
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

// Thread count
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

// Commit interval
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000); // 10 seconds

// Cache size
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024); // 10MB

// Standby replicas
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

// Topology optimization
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

// Consumer config
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);

// Producer config
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 100);

KafkaStreams streams = new KafkaStreams(topology, props);

Configuration Defaults:

  • num.stream.threads: 1
  • processing.guarantee: "at_least_once"
  • commit.interval.ms: 30000 (30 seconds; 100 when processing.guarantee is exactly_once_v2)
  • statestore.cache.max.bytes: 10485760 (10MB)
  • state.dir: /tmp/kafka-streams
  • num.standby.replicas: 0
  • max.task.idle.ms: 0
  • topology.optimization: "none"
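
To inspect the effective values (defaults merged with any overrides), a StreamsConfig can be instantiated directly from the properties; a minimal sketch:

import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsConfig config = new StreamsConfig(props);
System.out.println("num.stream.threads = " + config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
System.out.println("commit.interval.ms = " + config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
System.out.println("state.dir = " + config.getString(StreamsConfig.STATE_DIR_CONFIG));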

Lifecycle Management

State Transitions

import org.apache.kafka.streams.*;

KafkaStreams streams = new KafkaStreams(topology, props);

// State: CREATED
System.out.println("Initial state: " + streams.state());

// Start application
streams.start();
// State: REBALANCING -> RUNNING

// Check state
if (streams.state() == KafkaStreams.State.RUNNING) {
    System.out.println("Streams is running");
}

// Pause processing
streams.pause();
System.out.println("Processing paused");

// Resume processing
streams.resume();
System.out.println("Processing resumed");

// Close application
streams.close();
// State: PENDING_SHUTDOWN -> NOT_RUNNING

State Listener

import org.apache.kafka.streams.*;

public interface StateListener {
    void onChange(KafkaStreams.State newState, KafkaStreams.State oldState);
}

// Usage
KafkaStreams streams = new KafkaStreams(topology, props);

streams.setStateListener((newState, oldState) -> {
    System.out.println("State transition: " + oldState + " -> " + newState);

    if (newState == KafkaStreams.State.RUNNING) {
        System.out.println("Application is now running");
    } else if (newState == KafkaStreams.State.ERROR) {
        System.err.println("Application entered error state");
    }
});

streams.start();

Exception Handling

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.*;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.common.errors.ProducerFencedException;

public interface StreamsUncaughtExceptionHandler {
    enum StreamThreadExceptionResponse {
        REPLACE_THREAD,   // Replace failed thread
        SHUTDOWN_CLIENT,  // Shutdown entire client
        SHUTDOWN_APPLICATION  // Shutdown application
    }

    StreamThreadExceptionResponse handle(Throwable exception);
}

// Usage
streams.setUncaughtExceptionHandler(exception -> {
    System.err.println("Uncaught exception: " + exception.getMessage());

    if (exception instanceof ProducerFencedException) {
        return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    } else if (exception instanceof InvalidStateStoreException) {
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }

    return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});

State Restore Listener

import org.apache.kafka.streams.processor.*;
import org.apache.kafka.common.TopicPartition;

public interface StateRestoreListener {
    void onRestoreStart(TopicPartition topicPartition,
                       String storeName,
                       long startingOffset,
                       long endingOffset);

    void onBatchRestored(TopicPartition topicPartition,
                        String storeName,
                        long batchEndOffset,
                        long numRestored);

    void onRestoreEnd(TopicPartition topicPartition,
                     String storeName,
                     long totalRestored);
}

// Usage
streams.setGlobalStateRestoreListener(new StateRestoreListener() {
    @Override
    public void onRestoreStart(TopicPartition topicPartition, String storeName,
                              long startingOffset, long endingOffset) {
        System.out.println("Starting restore of " + storeName +
            " for partition " + topicPartition);
    }

    @Override
    public void onBatchRestored(TopicPartition topicPartition, String storeName,
                               long batchEndOffset, long numRestored) {
        System.out.println("Restored batch: " + numRestored + " records");
    }

    @Override
    public void onRestoreEnd(TopicPartition topicPartition, String storeName,
                            long totalRestored) {
        System.out.println("Finished restoring " + storeName +
            ": " + totalRestored + " records");
    }
});

Thread Management

import org.apache.kafka.streams.*;
import java.time.Duration;
import java.util.Optional;

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

// Add a stream thread
Optional<String> threadName = streams.addStreamThread();
if (threadName.isPresent()) {
    System.out.println("Added thread: " + threadName.get());
}

// Remove a stream thread
Optional<String> removedThread = streams.removeStreamThread();
if (removedThread.isPresent()) {
    System.out.println("Removed thread: " + removedThread.get());
}

// Remove with timeout
Optional<String> removedWithTimeout = streams.removeStreamThread(Duration.ofSeconds(10));

Metadata Access

Local Thread Metadata

import org.apache.kafka.streams.*;
import java.util.Collection;

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

Collection<ThreadMetadata> threadMetadata = streams.metadataForLocalThreads();
for (ThreadMetadata metadata : threadMetadata) {
    System.out.println("Thread: " + metadata.threadName());
    System.out.println("State: " + metadata.threadState());
    System.out.println("Active tasks: " + metadata.activeTasks());
    System.out.println("Standby tasks: " + metadata.standbyTasks());
}

Store Metadata

import org.apache.kafka.streams.*;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Collection;

// Get metadata for all instances
Collection<StreamsMetadata> allMetadata = streams.metadataForAllStreamsClients();
for (StreamsMetadata metadata : allMetadata) {
    System.out.println("Host: " + metadata.hostInfo());
    System.out.println("State stores: " + metadata.stateStoreNames());
    System.out.println("Topic partitions: " + metadata.topicPartitions());
}

// Get metadata for a specific store
Collection<StreamsMetadata> storeMetadata =
    streams.streamsMetadataForStore("my-store");

// Query metadata for a specific key
Serializer<String> keySerializer = Serdes.String().serializer();
KeyQueryMetadata keyMetadata =
    streams.queryMetadataForKey("my-store", "my-key", keySerializer);

if (keyMetadata != KeyQueryMetadata.NOT_AVAILABLE) {
    System.out.println("Active host: " + keyMetadata.activeHost());
    System.out.println("Standby hosts: " + keyMetadata.standbyHosts());
    System.out.println("Partition: " + keyMetadata.partition());
}
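
The newer query(StateQueryRequest) API listed on KafkaStreams can be used for key lookups as well; a minimal sketch, assuming the store holds Long values:

import org.apache.kafka.streams.query.*;

// Look up a single key via the interactive-query v2 API
StateQueryRequest<Long> request = StateQueryRequest
    .inStore("my-store")
    .withQuery(KeyQuery.<String, Long>withKey("my-key"));

StateQueryResult<Long> result = streams.query(request);
QueryResult<Long> partitionResult = result.getOnlyPartitionResult();

if (partitionResult != null && partitionResult.isSuccess()) {
    System.out.println("Value: " + partitionResult.getResult());
}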

Lag Information

import org.apache.kafka.streams.*;
import java.util.Map;

Map<String, Map<Integer, LagInfo>> allLags = streams.allLocalStorePartitionLags();

for (Map.Entry<String, Map<Integer, LagInfo>> storeEntry : allLags.entrySet()) {
    String storeName = storeEntry.getKey();
    System.out.println("Store: " + storeName);

    for (Map.Entry<Integer, LagInfo> partitionEntry : storeEntry.getValue().entrySet()) {
        int partition = partitionEntry.getKey();
        LagInfo lagInfo = partitionEntry.getValue();

        System.out.println("  Partition " + partition +
            ": current offset = " + lagInfo.currentOffsetPosition() +
            ", end offset = " + lagInfo.endOffsetPosition() +
            ", lag = " + lagInfo.offsetLag());
    }
}

Cleanup

import org.apache.kafka.streams.*;

KafkaStreams streams = new KafkaStreams(topology, props);

// Clean up local state (call before start() to reset state)
streams.cleanUp();

// Start application
streams.start();

// ... application runs ...

// Close and cleanup
streams.close();
streams.cleanUp(); // Remove local state

Warning: cleanUp() deletes the local state directory. Only call it when you want to reset application state.

Best Practices

Graceful Shutdown

import org.apache.kafka.streams.*;
import java.time.Duration;

KafkaStreams streams = new KafkaStreams(topology, props);

// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Shutting down Kafka Streams");
    streams.close(Duration.ofSeconds(30));
}));

streams.start();

// Keep application running
try {
    Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
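
An alternative to sleeping indefinitely is to block the main thread on a latch that the shutdown hook releases; a minimal sketch:

import org.apache.kafka.streams.*;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

KafkaStreams streams = new KafkaStreams(topology, props);
CountDownLatch latch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    streams.close(Duration.ofSeconds(30));
    latch.countDown();
}));

streams.start();

try {
    latch.await(); // block until the shutdown hook runs
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}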

Error Recovery

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.*;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.common.errors.ProducerFencedException;

KafkaStreams streams = new KafkaStreams(topology, props);

streams.setUncaughtExceptionHandler(exception -> {
    System.err.println("Stream exception: " + exception.getMessage());
    exception.printStackTrace();

    // Decide recovery strategy
    if (exception instanceof InvalidStateStoreException) {
        // Temporary issue, replace thread
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    } else if (exception instanceof ProducerFencedException) {
        // Another instance took over, shutdown
        return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    } else {
        // Unknown error, shutdown application
        return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
    }
});

streams.setStateListener((newState, oldState) -> {
    if (newState == KafkaStreams.State.ERROR) {
        System.err.println("Application in ERROR state");
        // Implement alerting or recovery logic
    }
});

streams.start();

Monitoring

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;

Map<MetricName, ? extends Metric> metrics = streams.metrics();

for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
    MetricName name = entry.getKey();
    System.out.println(name.group() + "." + name.name() + ": " +
        entry.getValue().metricValue());
}

Key metrics to monitor:

  • stream-metrics:commit-latency-avg
  • stream-metrics:poll-latency-avg
  • stream-metrics:process-latency-avg
  • stream-metrics:failed-stream-threads
  • stream-task-metrics:commit-rate
  • stream-task-metrics:process-rate
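
To look up one of these programmatically, filter the metrics() map from the example above by metric name (group names can vary between versions, so the sketch matches on the name only):

for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
    MetricName name = entry.getKey();
    if ("process-latency-avg".equals(name.name())) {
        System.out.println(name.group() + ":" + name.name() + " = " +
            entry.getValue().metricValue());
    }
}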

Troubleshooting

Common Kafka Streams Issues

Issue: InvalidStateStoreException

Symptoms:

  • InvalidStateStoreException when querying stores
  • Intermittent query failures
  • Store unavailable errors

Causes:

  • State store still initializing
  • Application rebalancing
  • State store migrated to another instance
  • Application not started yet

Solutions:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public <K, V> V queryStoreWithRetry(KafkaStreams streams, 
                                     String storeName, 
                                     K key,
                                     int maxRetries) {
    int retries = 0;
    
    while (retries < maxRetries) {
        try {
            // Check application state first
            if (streams.state() != KafkaStreams.State.RUNNING) {
                System.out.println("Application not running, state: " + streams.state());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted", ie);
                }
                retries++;
                continue;
            }
            
            ReadOnlyKeyValueStore<K, V> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                    storeName,
                    QueryableStoreTypes.keyValueStore()
                )
            );
            
            return store.get(key);
            
        } catch (InvalidStateStoreException e) {
            retries++;
            System.err.println("Store unavailable (attempt " + retries + "): " + 
                e.getMessage());
            
            if (retries >= maxRetries) {
                throw e;
            }
            
            try {
                Thread.sleep(1000L * retries); // Linear backoff between retries
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted", ie);
            }
        }
    }
    
    throw new RuntimeException("Failed to query store after " + maxRetries + " retries");
}

Prevention:

  • Wait for application to reach RUNNING state before queries
  • Implement retry logic for store access
  • Use state listeners to track availability
  • Consider remote queries to other instances (see the sketch below)
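
A sketch of routing a key lookup to the instance that owns it, assuming application.server is configured on every instance; the actual remote call (for example an HTTP request to the active host) is application-specific and omitted here:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.common.serialization.Serdes;

public Long lookup(KafkaStreams streams, HostInfo localHost, String storeName, String key) {
    KeyQueryMetadata metadata =
        streams.queryMetadataForKey(storeName, key, Serdes.String().serializer());

    if (metadata == null || metadata.equals(KeyQueryMetadata.NOT_AVAILABLE)) {
        return null; // key location unknown, e.g. during a rebalance
    }

    if (localHost.equals(metadata.activeHost())) {
        // Key is hosted locally: query the local store
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType(storeName,
                QueryableStoreTypes.<String, Long>keyValueStore()));
        return store.get(key);
    }

    // Key is hosted on metadata.activeHost(): forward the request to that instance
    // (e.g. over HTTP to the host:port it advertised via application.server)
    return null;
}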

Issue: OutOfMemoryError

Symptoms:

  • OutOfMemoryError in Kafka Streams
  • Application crashes
  • Excessive GC activity

Causes:

  • State stores too large for available memory
  • Cache size too large
  • Too many stream threads
  • Memory leaks in processing logic

Solutions:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Reduce cache size
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 
    5 * 1024 * 1024); // 5MB (down from 10MB default)

// Reduce stream threads
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

// Use RocksDB for large states (offloads to disk)
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("large-store"), // RocksDB-backed
        Serdes.String(),
        Serdes.Long()
    ).withCachingDisabled(); // Disable caching for large stores

// Or increase JVM heap
// -Xms1g -Xmx2g

// Monitor memory usage
Runtime runtime = Runtime.getRuntime();
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
long maxMemory = runtime.maxMemory();
double memoryUsage = (double) usedMemory / maxMemory * 100;

if (memoryUsage > 80) {
    System.err.println("WARNING: Memory usage at " + memoryUsage + "%");
}

Prevention:

  • Size state stores appropriately
  • Monitor memory metrics
  • Use persistent stores for large state
  • Disable caching for large stores
  • Scale horizontally with more instances

Issue: Processing Lag

Symptoms:

  • Increasing lag in stream processing
  • Records piling up in input topics
  • Slow throughput

Causes:

  • Insufficient processing capacity
  • Slow transformations
  • Too few stream threads
  • State store performance issues

Solutions:

// Add more stream threads
Properties props = new Properties();
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

// Or dynamically adjust
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

// Monitor lag
Map<String, Map<Integer, LagInfo>> allLags = streams.allLocalStorePartitionLags();
long totalLag = 0;

for (Map<Integer, LagInfo> storeLags : allLags.values()) {
    for (LagInfo lagInfo : storeLags.values()) {
        totalLag += lagInfo.offsetLag();
    }
}

if (totalLag > 100000) {
    System.out.println("High lag detected, adding stream thread");
    Optional<String> threadName = streams.addStreamThread();
    if (threadName.isPresent()) {
        System.out.println("Added thread: " + threadName.get());
    }
}

// Or optimize topology
// - Remove unnecessary operations
// - Use mapValues instead of map (doesn't repartition)
// - Reduce windowing for aggregations
// - Disable caching if not needed

Prevention:

  • Size stream threads appropriately
  • Monitor lag continuously
  • Optimize topology operations
  • Use async processing where possible

Edge Cases

State Store Not Ready During Startup

KafkaStreams streams = new KafkaStreams(topology, props);

// Set state listener
streams.setStateListener((newState, oldState) -> {
    System.out.println("State change: " + oldState + " -> " + newState);
    
    if (newState == KafkaStreams.State.REBALANCING) {
        System.out.println("Application rebalancing - stores may be unavailable");
    } else if (newState == KafkaStreams.State.RUNNING) {
        System.out.println("Application running - stores are available");
    }
});

streams.start();

// Wait for RUNNING state before querying
long startTime = System.currentTimeMillis();
while (streams.state() != KafkaStreams.State.RUNNING) {
    if (System.currentTimeMillis() - startTime > 60000) {
        throw new IllegalStateException("Application didn't reach RUNNING within 60 seconds");
    }
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
    }
}

// Now safe to query stores
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "my-store",
        QueryableStoreTypes.keyValueStore()
    )
);

Task Migration During Processing

// Tasks can migrate between instances during rebalancing
// State stores become unavailable when task migrates

streams.setUncaughtExceptionHandler(exception -> {
    if (exception instanceof org.apache.kafka.streams.errors.TaskMigratedException) {
        System.out.println("Task migrated to another instance");
        // Return REPLACE_THREAD to continue processing with new assignment
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    }
    
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});

// Handle in processor
public class MyProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, Long> store;
    
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        try {
            this.store = context.getStateStore("my-store");
        } catch (InvalidStateStoreException e) {
            // Store not available during init; process() retries the lookup
            System.err.println("Store not available during init: " + e.getMessage());
        }
    }
    
    @Override
    public void process(Record<String, String> record) {
        if (store == null) {
            try {
                store = context.getStateStore("my-store");
            } catch (InvalidStateStoreException e) {
                // Store still not available - skip processing
                System.err.println("Store unavailable, skipping record");
                return;
            }
        }
        
        // Process with store
        Long count = store.get(record.key());
        store.put(record.key(), count == null ? 1L : count + 1);
    }
}

Exactly-Once with Multiple Input Topics

// Exactly-once processing requires careful handling of multiple inputs
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

StreamsBuilder builder = new StreamsBuilder();

// Multiple input topics with different processing speeds
KStream<String, String> fastStream = builder.stream("fast-topic");
KStream<String, String> slowStream = builder.stream("slow-topic");

// Join may cause alignment issues
KStream<String, String> joined = fastStream.join(
    slowStream,
    (fastValue, slowValue) -> fastValue + ":" + slowValue,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);

// Configure max.task.idle.ms to prevent slow topic from blocking fast topic
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 60000); // 1 minute

// This allows fast topic to proceed even if slow topic is idle
KafkaStreams streams = new KafkaStreams(builder.build(), props);

Performance Optimization

Topology Optimization

Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

// Automatic optimizations:
// - Repartition topic merging
// - Source topic merging
// - Key-preserving operation fusion

// Manual optimization: Use mapValues instead of map
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");

// INEFFICIENT: map() may change the key, so downstream stateful operations trigger a repartition
KStream<String, String> inefficient = source
    .map((key, value) -> KeyValue.pair(key, value.toUpperCase()));

// EFFICIENT: mapValues() preserves key, no repartition
KStream<String, String> efficient = source
    .mapValues(value -> value.toUpperCase());

// Result: No unnecessary repartition, better performance

State Store Performance Tuning

// RocksDB configuration for large state stores
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimized-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// RocksDB specific options
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
    "com.example.CustomRocksDBConfigSetter");

// Implement custom RocksDB config
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
import java.util.Map;

public class CustomRocksDBConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // Configure the block-based table format: larger block cache for reads,
        // bigger blocks, and bloom filters for faster point lookups
        org.rocksdb.BlockBasedTableConfig tableConfig = 
            new org.rocksdb.BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(256 * 1024 * 1024L); // 256MB
        tableConfig.setBlockSize(16 * 1024); // 16KB
        tableConfig.setFilterPolicy(new org.rocksdb.BloomFilter(10));
        options.setTableFormatConfig(tableConfig); // attach after the table config is fully set
        
        // Increase write buffer for better write performance
        options.setWriteBufferSize(64 * 1024 * 1024); // 64MB
        options.setMaxWriteBufferNumber(3);
        
        // Compression
        options.setCompressionType(org.rocksdb.CompressionType.LZ4_COMPRESSION);
    }
    
    @Override
    public void close(String storeName, Options options) {
        // Cleanup if needed
    }
}

Scaling Strategies

// Scale Kafka Streams applications

// Option 1: Add more stream threads (vertical scaling)
Properties props = new Properties();
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

// Dynamically add threads
Optional<String> newThread = streams.addStreamThread();

// Option 2: Add more instances (horizontal scaling)
// Each instance should have same application.id
// Tasks will be distributed across all instances

// Instance 1:
Properties props1 = new Properties();
props1.put(StreamsConfig.APPLICATION_ID_CONFIG, "shared-app-id");
props1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaStreams streams1 = new KafkaStreams(topology, props1);
streams1.start();

// Instance 2 (same application.id):
Properties props2 = new Properties();
props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "shared-app-id");
props2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaStreams streams2 = new KafkaStreams(topology, props2);
streams2.start();

// Both instances will coordinate and share tasks

Additional Edge Cases

Topology with No State Stores

// Stateless topology - simpler but no local state
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream("input-topic");
source
    .filter((key, value) -> value != null)
    .mapValues(String::toUpperCase)
    .to("output-topic");

// No state stores needed
// No state directory management
// Faster restarts
// But cannot do aggregations or joins that require state
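
For contrast, a minimal stateful counterpart (a per-key count, which creates a state store and changelog topic behind the scenes); the store and topic names are illustrative, and the Serdes/kstream imports from earlier examples are assumed:

StreamsBuilder statefulBuilder = new StreamsBuilder();

KTable<String, Long> counts = statefulBuilder
    .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .count(Materialized.as("word-counts"));

counts.toStream().to("counts-topic", Produced.with(Serdes.String(), Serdes.Long()));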

Multiple Topologies in Same Application

// Can only have ONE topology per KafkaStreams instance
// For multiple topologies, create multiple instances

// Topology 1: Process stream A
StreamsBuilder builder1 = new StreamsBuilder();
builder1.stream("stream-a").to("output-a");
Topology topology1 = builder1.build();

// Topology 2: Process stream B
StreamsBuilder builder2 = new StreamsBuilder();
builder2.stream("stream-b").to("output-b");
Topology topology2 = builder2.build();

// Need separate KafkaStreams instances
Properties props1 = new Properties();
props1.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-a");
props1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

Properties props2 = new Properties();
props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-b");
props2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaStreams streams1 = new KafkaStreams(topology1, props1);
KafkaStreams streams2 = new KafkaStreams(topology2, props2);

streams1.start();
streams2.start();

// Must use different application.id for different topologies

State Store Restoration

// State stores restored from changelog topics on startup
// Can take significant time for large stores

KafkaStreams streams = new KafkaStreams(topology, props);

// Monitor restoration progress
streams.setGlobalStateRestoreListener(new StateRestoreListener() {
    private long totalToRestore = 0;
    private long totalRestored = 0;
    
    @Override
    public void onRestoreStart(TopicPartition topicPartition, 
                              String storeName,
                              long startingOffset, 
                              long endingOffset) {
        long recordsToRestore = endingOffset - startingOffset;
        totalToRestore += recordsToRestore;
        
        System.out.println("Starting restore of " + storeName + 
            " partition " + topicPartition.partition() +
            ": " + recordsToRestore + " records");
    }
    
    @Override
    public void onBatchRestored(TopicPartition topicPartition, 
                               String storeName,
                               long batchEndOffset, 
                               long numRestored) {
        totalRestored += numRestored;
        
        if (totalToRestore > 0) {
            double progress = (double) totalRestored / totalToRestore * 100;
            System.out.println("Restoration progress: " + 
                String.format("%.2f", progress) + "%");
        }
    }
    
    @Override
    public void onRestoreEnd(TopicPartition topicPartition, 
                            String storeName,
                            long totalRestored) {
        System.out.println("Completed restore of " + storeName + 
            ": " + totalRestored + " records");
    }
});

streams.start();

// For faster failover, configure standby replicas (set before constructing KafkaStreams)
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
// Standby replicas stay up-to-date, reducing restoration time on failover

Configuration Best Practices

Choosing Processing Guarantee

// AT_LEAST_ONCE (default)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
// - Best performance
// - May process records multiple times on failure
// - Use when: Idempotent processing or duplicates acceptable

// EXACTLY_ONCE_V2 (recommended for exactly-once)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// - Guarantees each record processed exactly once
// - Slightly lower performance (transactional overhead)
// - Requires broker version 2.5+
// - Use when: Cannot tolerate duplicates

// Performance comparison:
// - AT_LEAST_ONCE: ~10-15% higher throughput
// - EXACTLY_ONCE_V2: Stronger guarantees, slight overhead

// EXACTLY_ONCE (deprecated, use EXACTLY_ONCE_V2)
// props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

Commit Interval Tuning

// commit.interval.ms controls checkpoint frequency
// Tradeoff between:
// - Fault tolerance (lower = less reprocessing on failure)
// - Performance (higher = better throughput)

// Low latency / fault tolerance
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // 1 second
// - Fast recovery on failure
// - More overhead
// - Use for: Critical data, low latency requirements

// High throughput
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000); // 30 seconds (default)
// - Less overhead
// - More reprocessing on failure
// - Use for: Batch processing, high throughput needs

// Calculate an appropriate value from how much reprocessing is acceptable after a failure:
int recordsPerSecond = 1000;
int acceptableReprocessingRecords = 10000;

// Worst case, everything processed since the last commit is reprocessed
int commitIntervalMs = (acceptableReprocessingRecords / recordsPerSecond) * 1000; // 10 seconds
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitIntervalMs);