or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.kafka/kafka_2.13@4.1.x

docs

index.md
tile.json

tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

docs/streams/state-stores.md

State Stores

Kafka Streams provides local state stores for stateful stream processing operations.

Store Types

KeyValueStore<K, V>

Key-value store for simple lookups.

package org.apache.kafka.streams.state;

/**
 * Key-value store for simple lookups.
 *
 * The listing groups the operations into basic reads/writes, range iteration
 * (forward and reverse), a prefix scan, and an approximate entry count.
 * Every iterating method returns a {@code KeyValueIterator<K, V>}.
 */
public interface KeyValueStore<K, V> extends StateStore {
    // Basic operations
    V get(K key);
    void put(K key, V value);
    V putIfAbsent(K key, V value);
    void putAll(List<KeyValue<K, V>> entries);
    V delete(K key);

    // Range queries
    KeyValueIterator<K, V> range(K from, K to);
    KeyValueIterator<K, V> reverseRange(K from, K to);
    KeyValueIterator<K, V> all();
    KeyValueIterator<K, V> reverseAll();

    // Prefix scan
    <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P prefix, PS prefixKeySerializer);

    // Metadata
    long approximateNumEntries();
}

WindowStore<K, V>

Time-windowed store.

package org.apache.kafka.streams.state;

/**
 * Time-windowed store.
 *
 * Writes always name the window they belong to through
 * {@code windowStartTimestamp}; the two-argument {@code put(key, value)}
 * overload is not part of the 4.x interface and is therefore not listed.
 * Reads are offered per key, per key range, and across all keys, each with a
 * backward-iterating counterpart.
 */
public interface WindowStore<K, V> extends StateStore {
    // Write a value into the window starting at windowStartTimestamp
    void put(K key, V value, long windowStartTimestamp);

    // Single-key reads
    V fetch(K key, long time);
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
    WindowStoreIterator<V> backwardFetch(K key, long timeFrom, long timeTo);

    // Key-range reads
    KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> backwardFetch(K keyFrom, K keyTo, long timeFrom, long timeTo);

    // Full scans
    KeyValueIterator<Windowed<K>, V> all();
    KeyValueIterator<Windowed<K>, V> backwardAll();

    // All keys within a time range
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> backwardFetchAll(long timeFrom, long timeTo);
}

SessionStore<K, V>

Session-windowed store.

package org.apache.kafka.streams.state;

/**
 * Session-windowed store.
 *
 * Entries are written and removed by a {@code Windowed<K>} session key.
 * Lookups are listed by single key, by key range, and by session time
 * bounds; backward variants exist where shown below.
 */
public interface SessionStore<K, V> extends StateStore {
    void put(Windowed<K> sessionKey, V aggregate);
    void remove(Windowed<K> sessionKey);

    KeyValueIterator<Windowed<K>, V> fetch(K key);
    KeyValueIterator<Windowed<K>, V> backwardFetch(K key);

    KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo);
    KeyValueIterator<Windowed<K>, V> backwardFetch(K keyFrom, K keyTo);

    KeyValueIterator<Windowed<K>, V> findSessions(K key, long earliestSessionEndTime,
                                                   long latestSessionStartTime);
    KeyValueIterator<Windowed<K>, V> backwardFindSessions(K key, long earliestSessionEndTime,
                                                          long latestSessionStartTime);
    KeyValueIterator<Windowed<K>, V> findSessions(K keyFrom, K keyTo,
                                                   long earliestSessionEndTime,
                                                   long latestSessionStartTime);
}

TimestampedKeyValueStore<K, V>

Key-value store with timestamps.

package org.apache.kafka.streams.state;

/**
 * Key-value store with timestamps.
 *
 * Declares no members of its own: it is a {@code KeyValueStore} whose value
 * type is fixed to {@code ValueAndTimestamp<V>}.
 */
public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    // Inherits all KeyValueStore methods; values are read and written as ValueAndTimestamp<V>
}

VersionedKeyValueStore<K, V>

Versioned key-value store.

package org.apache.kafka.streams.state;

/**
 * Versioned key-value store.
 *
 * Every write and delete carries an explicit timestamp, and every read
 * returns a {@code VersionedRecord<V>} (value plus timestamp) rather than a
 * bare value. There are no timestamp-less put/delete overloads.
 */
public interface VersionedKeyValueStore<K, V> extends StateStore {
    // Latest version of the key, or the version valid as of the given timestamp
    VersionedRecord<V> get(K key);
    VersionedRecord<V> get(K key, long asOfTimestamp);

    // Adds a record version at the given timestamp; the long return value
    // reports the outcome of the put (see the interface's PUT_RETURN_CODE_* constants)
    long put(K key, V value, long timestamp);

    // Deletes as of the given timestamp and returns the record that was replaced
    VersionedRecord<V> delete(K key, long timestamp);
}

Store Builders

Stores Factory

Factory for creating store builders and suppliers.

package org.apache.kafka.streams.state;

/**
 * Factory for creating store builders and suppliers.
 *
 * A supplier (persistent or in-memory) describes the underlying bytes store;
 * a builder wraps a supplier together with key/value serdes and yields a
 * typed {@code StoreBuilder}.
 */
public class Stores {
    // KeyValue store builders
    public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(
        KeyValueBytesStoreSupplier supplier,
        Serde<K> keySerde,
        Serde<V> valueSerde);

    public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(
        KeyValueBytesStoreSupplier supplier,
        Serde<K> keySerde,
        Serde<V> valueSerde);

    // Versioned store builder (pairs with persistentVersionedKeyValueStore below)
    public static <K, V> StoreBuilder<VersionedKeyValueStore<K, V>> versionedKeyValueStoreBuilder(
        VersionedBytesStoreSupplier supplier,
        Serde<K> keySerde,
        Serde<V> valueSerde);

    // Window store builders
    public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(
        WindowBytesStoreSupplier supplier,
        Serde<K> keySerde,
        Serde<V> valueSerde);

    public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(
        WindowBytesStoreSupplier supplier,
        Serde<K> keySerde,
        Serde<V> valueSerde);

    // Session store builders
    public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(
        SessionBytesStoreSupplier supplier,
        Serde<K> keySerde,
        Serde<V> valueSerde);

    // KeyValue store suppliers
    public static KeyValueBytesStoreSupplier persistentKeyValueStore(String name);
    public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(String name);
    public static KeyValueBytesStoreSupplier lruMap(String name, int maxCacheSize);

    // Window store suppliers
    public static WindowBytesStoreSupplier persistentWindowStore(String name,
                                                                  Duration retentionPeriod,
                                                                  Duration windowSize,
                                                                  boolean retainDuplicates);
    public static WindowBytesStoreSupplier inMemoryWindowStore(String name,
                                                                Duration retentionPeriod,
                                                                Duration windowSize,
                                                                boolean retainDuplicates);

    // Session store suppliers
    public static SessionBytesStoreSupplier persistentSessionStore(String name,
                                                                    Duration retentionPeriod);
    public static SessionBytesStoreSupplier inMemorySessionStore(String name,
                                                                  Duration retentionPeriod);

    // Versioned store supplier
    public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(String name,
                                                                                Duration historyRetention);
}

Usage Examples:

import org.apache.kafka.streams.state.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;
import java.util.Collections;

// Persistent key-value store with caching on and logging configured
// through a single-entry config map (retention.ms = 86400000)
StoreBuilder<KeyValueStore<String, Long>> kvStoreBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-kv-store"),
        Serdes.String(),
        Serdes.Long()
    ).withCachingEnabled()
     .withLoggingEnabled(Collections.singletonMap("retention.ms", "86400000"));

// In-memory key-value store with logging switched off
StoreBuilder<KeyValueStore<String, String>> inMemoryStore =
    Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("temp-store"),
        Serdes.String(),
        Serdes.String()
    ).withLoggingDisabled();

// Window store
StoreBuilder<WindowStore<String, Long>> windowStore =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore("my-window-store",
            Duration.ofDays(1), // retention
            Duration.ofMinutes(5), // window size
            false), // retain duplicates
        Serdes.String(),
        Serdes.Long()
    );

// Session store
StoreBuilder<SessionStore<String, Long>> sessionStore =
    Stores.sessionStoreBuilder(
        Stores.persistentSessionStore("my-session-store",
            Duration.ofHours(1)), // retention
        Serdes.String(),
        Serdes.Long()
    );

// Versioned store
StoreBuilder<VersionedKeyValueStore<String, String>> versionedStore =
    Stores.versionedKeyValueStoreBuilder(
        Stores.persistentVersionedKeyValueStore("my-versioned-store",
            Duration.ofDays(7)), // history retention
        Serdes.String(),
        Serdes.String()
    );

StoreBuilder<T>

Builder for state stores with configuration options.

package org.apache.kafka.streams.state;

/**
 * Builder for state stores with configuration options.
 *
 * Caching and logging can each be enabled or disabled; the with* methods
 * return a {@code StoreBuilder<T>} so calls can be chained before
 * {@code build()} produces the store of type {@code T}.
 */
public interface StoreBuilder<T extends StateStore> {
    StoreBuilder<T> withCachingEnabled();
    StoreBuilder<T> withCachingDisabled();

    StoreBuilder<T> withLoggingEnabled(Map<String, String> config);
    StoreBuilder<T> withLoggingDisabled();

    T build();

    // Read back the builder's current settings
    Map<String, String> logConfig();
    boolean loggingEnabled();
    String name();
}

Interactive Queries API

Kafka Streams provides two approaches for querying state stores: the traditional Read-Only Stores API and the newer Interactive Queries API (IQv2, introduced by KIP-796). The Interactive Queries API provides more flexibility and control over query execution, including position tracking, partition-specific queries, and detailed execution information.

StateQueryRequest

Building queries for interactive state store access.

package org.apache.kafka.streams.query;

public class StateQueryRequest<R> {
    // Create a request for a specific store
    public static InStore inStore(String name);

    // Specify the query to execute
    public <R> StateQueryRequest<R> withQuery(Query<R> query);

    // Partition selection
    public StateQueryRequest<R> withAllPartitions();
    public StateQueryRequest<R> withPartitions(Set<Integer> partitions);

    // Position-based consistency
    public StateQueryRequest<R> withPositionBound(PositionBound positionBound);

    // Active/standby configuration
    public StateQueryRequest<R> requireActive();

    // Execution details
    public StateQueryRequest<R> enableExecutionInfo();

    // Accessors
    public String getStoreName();
    public Query<R> getQuery();
    public PositionBound getPositionBound();
    public boolean isAllPartitions();
    public Set<Integer> getPartitions();
    public boolean executionInfoEnabled();
    public boolean isRequireActive();
}

StateQueryResult

Container for query results across partitions.

package org.apache.kafka.streams.query;

/**
 * Container for query results across partitions.
 *
 * Per-partition results are keyed by partition number; a separate slot holds
 * the result of a global store query.
 */
public class StateQueryResult<R> {
    // Access partition-specific results
    public Map<Integer, QueryResult<R>> getPartitionResults();

    // For single-partition queries
    public QueryResult<R> getOnlyPartitionResult();

    // For global store queries
    public QueryResult<R> getGlobalResult();

    // Position tracking
    public Position getPosition();

    // Internal methods for building results
    public void addResult(int partition, QueryResult<R> result);
    public void setGlobalResult(QueryResult<R> result);
}

QueryResult

Individual partition query result with success/failure information.

package org.apache.kafka.streams.query;

/**
 * Individual partition query result with success/failure information.
 *
 * Instances are created through the static factories. A result exposes
 * either the success path ({@code getResult()}) or the failure path
 * ({@code getFailureReason()} / {@code getFailureMessage()}), plus position
 * and execution-info bookkeeping.
 */
public interface QueryResult<R> {
    // Factory methods
    static <R> QueryResult<R> forResult(R result);
    static <R> QueryResult<R> forFailure(FailureReason failureReason, String failureMessage);
    static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);
    static <R> QueryResult<R> notUpToBound(Position currentPosition,
                                           PositionBound positionBound,
                                           Integer partition);

    // Result status
    boolean isSuccess();
    boolean isFailure();

    // Success path
    R getResult();

    // Failure path
    FailureReason getFailureReason();
    String getFailureMessage();

    // Position tracking
    Position getPosition();
    void setPosition(Position position);

    // Execution details
    List<String> getExecutionInfo();
    void addExecutionInfo(String message);
}

FailureReason

Enumeration of query failure reasons.

package org.apache.kafka.streams.query;

/**
 * Enumeration of query failure reasons.
 *
 * The trailing comment on each constant states when it applies.
 */
public enum FailureReason {
    UNKNOWN_QUERY_TYPE,    // Store doesn't support this query type
    NOT_ACTIVE,            // Partition is not active on this instance
    NOT_UP_TO_BOUND,       // Store hasn't caught up to position bound
    NOT_PRESENT,           // Partition not available on this instance
    DOES_NOT_EXIST,        // Requested partition doesn't exist
    STORE_EXCEPTION        // Exception during query execution
}

Query Types

KeyQuery

Point query for retrieving a single record by key.

package org.apache.kafka.streams.query;

/**
 * Point query for retrieving a single record by key.
 *
 * Implements {@code Query<V>}, so the query result type is the store's
 * value type {@code V}.
 */
public final class KeyQuery<K, V> implements Query<V> {
    // Create a query for a specific key
    public static <K, V> KeyQuery<K, V> withKey(K key);

    // Skip cache and query underlying store directly
    public KeyQuery<K, V> skipCache();

    // Accessors
    public K getKey();
    public boolean isSkipCache();
}

Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;

// Build a point query for one key, then run it against "my-store"
KeyQuery<String, String> pointQuery = KeyQuery.withKey("user-123");
StateQueryRequest<String> request = StateQueryRequest.inStore("my-store").withQuery(pointQuery);

QueryResult<String> queryResult = streams.query(request).getOnlyPartitionResult();

// Nothing is printed when queryResult is null
if (queryResult != null) {
    if (queryResult.isSuccess()) {
        System.out.println("Value: " + queryResult.getResult());
    } else {
        System.err.println("Query failed: " + queryResult.getFailureMessage());
    }
}

RangeQuery

Range query for retrieving multiple records within key bounds.

package org.apache.kafka.streams.query;

/**
 * Range query for retrieving multiple records within key bounds.
 *
 * Factories cover both bounds, a single bound, or no bounds at all; the
 * bounds are exposed as {@code Optional<K>} accessors. The result is a
 * {@code KeyValueIterator<K, V>}.
 */
public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
    // Range with both bounds
    public static <K, V> RangeQuery<K, V> withRange(K lower, K upper);

    // Range with single bound
    public static <K, V> RangeQuery<K, V> withLowerBound(K lower);
    public static <K, V> RangeQuery<K, V> withUpperBound(K upper);

    // Full scan
    public static <K, V> RangeQuery<K, V> withNoBounds();

    // Result ordering
    public RangeQuery<K, V> withAscendingKeys();
    public RangeQuery<K, V> withDescendingKeys();

    // Accessors
    public Optional<K> getLowerBound();
    public Optional<K> getUpperBound();
    public ResultOrder resultOrder();
}

Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;

// Ascending-order range query from "a" to "m" against "counts-store"
RangeQuery<String, Long> rangeQuery =
    RangeQuery.<String, Long>withRange("a", "m").withAscendingKeys();
StateQueryRequest<KeyValueIterator<String, Long>> request =
    StateQueryRequest.inStore("counts-store").withQuery(rangeQuery);

QueryResult<KeyValueIterator<String, Long>> queryResult =
    streams.query(request).getOnlyPartitionResult();

if (queryResult != null && queryResult.isSuccess()) {
    // try-with-resources closes the iterator once every entry has been printed
    try (KeyValueIterator<String, Long> entries = queryResult.getResult()) {
        entries.forEachRemaining(kv -> System.out.println(kv.key + ": " + kv.value));
    }
}

WindowKeyQuery

Query windowed stores by key within a time range.

package org.apache.kafka.streams.query;

/**
 * Query windowed stores by key within a time range.
 *
 * The range is expressed as two {@code Instant}s on the window start time;
 * the result is a {@code WindowStoreIterator<V>}.
 */
public class WindowKeyQuery<K, V> implements Query<WindowStoreIterator<V>> {
    // Query windows for a specific key and time range
    public static <K, V> WindowKeyQuery<K, V> withKeyAndWindowStartRange(
        K key,
        Instant timeFrom,
        Instant timeTo
    );

    // Accessors
    public K getKey();
    public Optional<Instant> getTimeFrom();
    public Optional<Instant> getTimeTo();
}

Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;
import java.time.Instant;
import java.time.Duration;

// Look back one hour from the current instant
Instant now = Instant.now();
Instant hourAgo = now.minus(Duration.ofHours(1));

// Windows of "user-123" whose start time lies between hourAgo and now
WindowKeyQuery<String, Long> windowQuery =
    WindowKeyQuery.withKeyAndWindowStartRange("user-123", hourAgo, now);
StateQueryRequest<WindowStoreIterator<Long>> request =
    StateQueryRequest.inStore("window-store").withQuery(windowQuery);

QueryResult<WindowStoreIterator<Long>> queryResult =
    streams.query(request).getOnlyPartitionResult();

if (queryResult != null && queryResult.isSuccess()) {
    // The iterator is closed automatically when the block exits
    try (WindowStoreIterator<Long> windows = queryResult.getResult()) {
        windows.forEachRemaining(kv ->
            System.out.println("Timestamp: " + kv.key + ", Value: " + kv.value));
    }
}

WindowRangeQuery

Range query on windowed stores.

package org.apache.kafka.streams.query;

/**
 * Range query on windowed stores.
 *
 * A query is built either from a key or from a window-start time range;
 * accordingly every accessor returns an {@code Optional}. Results are keyed
 * by {@code Windowed<K>}.
 */
public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {
    // Query all windows for a specific key
    public static <K, V> WindowRangeQuery<K, V> withKey(K key);

    // Query all keys within a time range
    public static <K, V> WindowRangeQuery<K, V> withWindowStartRange(
        Instant timeFrom,
        Instant timeTo
    );

    // Accessors
    public Optional<K> getKey();
    public Optional<Instant> getTimeFrom();
    public Optional<Instant> getTimeTo();
}

Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;

// All windows recorded for "user-123"
// NOTE(review): withKey(...) may only be answered by session stores, with window
// stores expecting withWindowStartRange(...) — confirm against the target store type.
WindowRangeQuery<String, Long> byKey = WindowRangeQuery.withKey("user-123");
StateQueryRequest<KeyValueIterator<Windowed<String>, Long>> request =
    StateQueryRequest.inStore("window-store").withQuery(byKey);

QueryResult<KeyValueIterator<Windowed<String>, Long>> queryResult =
    streams.query(request).getOnlyPartitionResult();

if (queryResult != null && queryResult.isSuccess()) {
    // try-with-resources releases the iterator after the last entry
    try (KeyValueIterator<Windowed<String>, Long> windows = queryResult.getResult()) {
        windows.forEachRemaining(kv -> {
            Window window = kv.key.window();
            System.out.println("Window [" + window.start() + " - " + window.end() +
                             "]: " + kv.value);
        });
    }
}

TimestampedKeyQuery

Point query for timestamped stores.

package org.apache.kafka.streams.query;

/**
 * Point query for timestamped stores.
 *
 * The result type is {@code ValueAndTimestamp<V>} rather than a bare
 * {@code V}.
 */
public final class TimestampedKeyQuery<K, V> implements Query<ValueAndTimestamp<V>> {
    // Create a query for a specific key
    public static <K, V> TimestampedKeyQuery<K, V> withKey(K key);

    // Skip cache
    public TimestampedKeyQuery<K, V> skipCache();

    // Accessors
    public K key();
    public boolean isSkipCache();
}

Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;

// Point lookup of "key-1" in a timestamped store
TimestampedKeyQuery<String, String> timestampedQuery = TimestampedKeyQuery.withKey("key-1");
StateQueryRequest<ValueAndTimestamp<String>> request =
    StateQueryRequest.inStore("timestamped-store").withQuery(timestampedQuery);

QueryResult<ValueAndTimestamp<String>> queryResult =
    streams.query(request).getOnlyPartitionResult();

// Only a successful, non-null result is unwrapped
ValueAndTimestamp<String> valueAndTimestamp =
    (queryResult != null && queryResult.isSuccess()) ? queryResult.getResult() : null;

if (valueAndTimestamp != null) {
    System.out.println("Value: " + valueAndTimestamp.value());
    System.out.println("Timestamp: " + valueAndTimestamp.timestamp());
}

TimestampedRangeQuery

Range query for timestamped stores.

package org.apache.kafka.streams.query;

/**
 * Range query for timestamped stores.
 *
 * Offers the same bound and ordering options as listed below; each returned
 * entry pairs a key with a {@code ValueAndTimestamp<V>}.
 */
public final class TimestampedRangeQuery<K, V>
    implements Query<KeyValueIterator<K, ValueAndTimestamp<V>>> {

    // Range with both bounds
    public static <K, V> TimestampedRangeQuery<K, V> withRange(K lower, K upper);

    // Range with single bound
    public static <K, V> TimestampedRangeQuery<K, V> withLowerBound(K lower);
    public static <K, V> TimestampedRangeQuery<K, V> withUpperBound(K upper);

    // Full scan
    public static <K, V> TimestampedRangeQuery<K, V> withNoBounds();

    // Result ordering
    public TimestampedRangeQuery<K, V> withAscendingKeys();
    public TimestampedRangeQuery<K, V> withDescendingKeys();

    // Accessors
    public Optional<K> lowerBound();
    public Optional<K> upperBound();
    public ResultOrder resultOrder();
}

Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;

// Range query from "a" to "z" on a timestamped store
TimestampedRangeQuery<String, Long> rangeQuery = TimestampedRangeQuery.withRange("a", "z");
StateQueryRequest<KeyValueIterator<String, ValueAndTimestamp<Long>>> request =
    StateQueryRequest.inStore("timestamped-store").withQuery(rangeQuery);

QueryResult<KeyValueIterator<String, ValueAndTimestamp<Long>>> queryResult =
    streams.query(request).getOnlyPartitionResult();

if (queryResult != null && queryResult.isSuccess()) {
    // The iterator is closed by try-with-resources
    try (KeyValueIterator<String, ValueAndTimestamp<Long>> entries = queryResult.getResult()) {
        entries.forEachRemaining(kv ->
            System.out.println(kv.key + ": value=" + kv.value.value() +
                             ", timestamp=" + kv.value.timestamp()));
    }
}

VersionedKeyQuery

Point query for retrieving a single versioned record by key and optional timestamp.

package org.apache.kafka.streams.query;

/**
 * Point query for retrieving a single versioned record by key and optional timestamp.
 *
 * The timestamp is optional: it is supplied through {@code asOf(...)} and
 * read back as an {@code Optional<Instant>}.
 */
public final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> {
    // Create a query for a specific key
    public static <K, V> VersionedKeyQuery<K, V> withKey(K key);

    // Specify timestamp for historical query
    public VersionedKeyQuery<K, V> asOf(Instant asOfTimestamp);

    // Accessors
    public K key();
    public Optional<Instant> asOfTimestamp();
}

Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;
import java.time.Instant;
import java.time.Duration;

// Latest version of "user-123"
VersionedKeyQuery<String, String> latestQuery = VersionedKeyQuery.withKey("user-123");
StateQueryRequest<VersionedRecord<String>> request =
    StateQueryRequest.inStore("versioned-store").withQuery(latestQuery);

QueryResult<VersionedRecord<String>> queryResult =
    streams.query(request).getOnlyPartitionResult();

// Unwrap only when the query succeeded; otherwise nothing is printed
VersionedRecord<String> record =
    (queryResult != null && queryResult.isSuccess()) ? queryResult.getResult() : null;
if (record != null) {
    System.out.println("Value: " + record.value());
    System.out.println("Timestamp: " + record.timestamp());
    System.out.println("Valid to: " + record.validTo());
}

// Same key, but as of one day before the current instant
Instant yesterday = Instant.now().minus(Duration.ofDays(1));
VersionedKeyQuery<String, String> asOfYesterday =
    VersionedKeyQuery.<String, String>withKey("user-123").asOf(yesterday);
StateQueryRequest<VersionedRecord<String>> historicalRequest =
    StateQueryRequest.inStore("versioned-store").withQuery(asOfYesterday);

QueryResult<VersionedRecord<String>> historicalQueryResult =
    streams.query(historicalRequest).getOnlyPartitionResult();

VersionedRecord<String> historicalRecord =
    (historicalQueryResult != null && historicalQueryResult.isSuccess())
        ? historicalQueryResult.getResult()
        : null;
if (historicalRecord != null) {
    System.out.println("Historical value: " + historicalRecord.value());
    System.out.println("Was valid at: " + historicalRecord.timestamp());
}

MultiVersionedKeyQuery

Query for retrieving multiple versions of a key within a time range.

package org.apache.kafka.streams.query;

/**
 * Query for retrieving multiple versions of a key within a time range.
 *
 * Both time bounds are optional (their accessors return
 * {@code Optional<Instant>}); results come back as a
 * {@code VersionedRecordIterator<V>} in the requested timestamp order.
 */
public final class MultiVersionedKeyQuery<K, V> implements Query<VersionedRecordIterator<V>> {
    // Create a query for a specific key
    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(K key);

    // Specify time range
    public MultiVersionedKeyQuery<K, V> fromTime(Instant fromTime);
    public MultiVersionedKeyQuery<K, V> toTime(Instant toTime);

    // Result ordering by timestamp
    public MultiVersionedKeyQuery<K, V> withAscendingTimestamps();
    public MultiVersionedKeyQuery<K, V> withDescendingTimestamps();

    // Accessors
    public K key();
    public Optional<Instant> fromTime();
    public Optional<Instant> toTime();
    public ResultOrder resultOrder();
}

Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;
import java.time.Instant;
import java.time.Duration;

// Versions of "user-123" from the last seven days, descending by timestamp
Instant now = Instant.now();
Instant weekAgo = now.minus(Duration.ofDays(7));

MultiVersionedKeyQuery<String, String> lastWeekQuery =
    MultiVersionedKeyQuery.<String, String>withKey("user-123")
        .fromTime(weekAgo)
        .toTime(now)
        .withDescendingTimestamps();
StateQueryRequest<VersionedRecordIterator<String>> request =
    StateQueryRequest.inStore("versioned-store").withQuery(lastWeekQuery);

QueryResult<VersionedRecordIterator<String>> queryResult =
    streams.query(request).getOnlyPartitionResult();

if (queryResult != null && queryResult.isSuccess()) {
    // try-with-resources closes the iterator after the last version
    try (VersionedRecordIterator<String> versions = queryResult.getResult()) {
        versions.forEachRemaining(version -> {
            System.out.println("Version at " + version.timestamp() + ": " + version.value());
            System.out.println("  Valid until: " + version.validTo());
        });
    }
}

// Every version of the key: no time bounds, ascending by timestamp
MultiVersionedKeyQuery<String, String> allVersionsQuery =
    MultiVersionedKeyQuery.<String, String>withKey("user-123").withAscendingTimestamps();
StateQueryRequest<VersionedRecordIterator<String>> allVersionsRequest =
    StateQueryRequest.inStore("versioned-store").withQuery(allVersionsQuery);

QueryResult<VersionedRecordIterator<String>> allVersionsQueryResult =
    streams.query(allVersionsRequest).getOnlyPartitionResult();

if (allVersionsQueryResult != null && allVersionsQueryResult.isSuccess()) {
    try (VersionedRecordIterator<String> versions = allVersionsQueryResult.getResult()) {
        System.out.println("All historical versions:");
        versions.forEachRemaining(version ->
            System.out.println("  " + version.timestamp() + ": " + version.value()));
    }
}

Position Tracking

Position

Tracks the processing position of state stores relative to input topics.

package org.apache.kafka.streams.query;

/**
 * Tracks the processing position of state stores relative to input topics.
 *
 * A position is made of (topic, partition, offset) components, as the
 * {@code withComponent} and {@code getPartitionPositions} signatures show.
 */
public class Position {
    // Create positions
    public static Position emptyPosition();
    public static Position fromMap(Map<String, ? extends Map<Integer, Long>> map);

    // Modify position
    public Position withComponent(String topic, int partition, long offset);
    public Position merge(Position other);

    // Query position
    public Set<String> getTopics();
    public Map<Integer, Long> getPartitionPositions(String topic);
    public boolean isEmpty();

    // Copy
    public Position copy();
}

PositionBound

Bounds query execution based on store position.

package org.apache.kafka.streams.query;

/**
 * Bounds query execution based on store position.
 *
 * A bound is either unbounded or pinned to a specific {@code Position}.
 */
public class PositionBound {
    // Create bounds
    public static PositionBound unbounded();
    public static PositionBound at(Position position);

    // Query bound
    public boolean isUnbounded();
    public Position position();
}

Position-Aware Query Example:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;

// First query - get current position
StateQueryRequest<String> request1 = StateQueryRequest
    .inStore("my-store")
    .withQuery(KeyQuery.<String, String>withKey("key-1"));

StateQueryResult<String> result1 = streams.query(request1);
Position position1 = result1.getPosition();

// Later query - ensure we read at least as recent as first query.
// The explicit <String, String> is required: withQuery(...) is followed by
// another call, so the value type cannot be inferred from the assignment
// target and would otherwise default to Object.
StateQueryRequest<String> request2 = StateQueryRequest
    .inStore("my-store")
    .withQuery(KeyQuery.<String, String>withKey("key-2"))
    .withPositionBound(PositionBound.at(position1));

StateQueryResult<String> result2 = streams.query(request2);
QueryResult<String> queryResult2 = result2.getOnlyPartitionResult();

if (queryResult2 != null && queryResult2.isSuccess()) {
    // This result is at least as recent as the first query
    String value = queryResult2.getResult();
    System.out.println("Value: " + value);
} else if (queryResult2 != null &&
           queryResult2.getFailureReason() == FailureReason.NOT_UP_TO_BOUND) {
    // Store hasn't caught up yet, retry later or try another replica
    System.out.println("Store not caught up: " + queryResult2.getFailureMessage());
}

Query Configuration

QueryConfig

Runtime configuration for query execution.

package org.apache.kafka.streams.query;

/**
 * Runtime configuration for query execution.
 *
 * Carries a single flag, set at construction, indicating whether execution
 * info should be collected.
 */
public class QueryConfig {
    public QueryConfig(boolean collectExecutionInfo);

    public boolean isCollectExecutionInfo();
}

ResultOrder

Controls ordering of range query results.

package org.apache.kafka.streams.query;

/**
 * Controls ordering of range query results.
 *
 * Per the constant comments, ordering is defined over serialized key bytes.
 */
public enum ResultOrder {
    ANY,          // No specific order
    ASCENDING,    // Ascending by serialized key bytes
    DESCENDING    // Descending by serialized key bytes
}

Executing Queries

KafkaStreams.query()

Execute interactive queries against state stores.

package org.apache.kafka.streams;

/**
 * Excerpt of KafkaStreams showing only the interactive-query entry point.
 */
public class KafkaStreams {
    // Execute the request; the result is parameterized by the same R as the request
    public <R> StateQueryResult<R> query(StateQueryRequest<R> request);
}

Advanced Query Examples

Partition-Specific Query

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import java.util.Map;
import java.util.Set;

// Query specific partitions.
// The explicit <String, String> is required: withQuery(...) is followed by
// withPartitions(...), so the value type cannot be inferred from the
// assignment target and would otherwise default to Object.
StateQueryRequest<String> request = StateQueryRequest
    .inStore("my-store")
    .withQuery(KeyQuery.<String, String>withKey("key-1"))
    .withPartitions(Set.of(0, 1, 2));  // Query partitions 0, 1, and 2 only

StateQueryResult<String> result = streams.query(request);

// Iterate through partition results
for (Map.Entry<Integer, QueryResult<String>> entry : result.getPartitionResults().entrySet()) {
    int partition = entry.getKey();
    QueryResult<String> partitionResult = entry.getValue();

    if (partitionResult.isSuccess()) {
        System.out.println("Partition " + partition + ": " + partitionResult.getResult());
    } else {
        System.out.println("Partition " + partition + " failed: " +
                         partitionResult.getFailureMessage());
    }
}

Query with Execution Info

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import java.util.List;

// Enable execution information.
// The explicit <String, String> is required: withQuery(...) is followed by
// enableExecutionInfo(), so the value type cannot be inferred from the
// assignment target and would otherwise default to Object.
StateQueryRequest<String> request = StateQueryRequest
    .inStore("my-store")
    .withQuery(KeyQuery.<String, String>withKey("key-1"))
    .enableExecutionInfo();

StateQueryResult<String> result = streams.query(request);
QueryResult<String> queryResult = result.getOnlyPartitionResult();

if (queryResult != null) {
    // Print execution details
    List<String> executionInfo = queryResult.getExecutionInfo();
    System.out.println("Execution info:");
    executionInfo.forEach(System.out::println);

    if (queryResult.isSuccess()) {
        System.out.println("Result: " + queryResult.getResult());
    }
}

Require Active Replica

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;

// Only query active replicas.
// The explicit <String, String> is required: withQuery(...) is followed by
// requireActive(), so the value type cannot be inferred from the assignment
// target and would otherwise default to Object.
StateQueryRequest<String> request = StateQueryRequest
    .inStore("my-store")
    .withQuery(KeyQuery.<String, String>withKey("key-1"))
    .requireActive();

StateQueryResult<String> result = streams.query(request);
QueryResult<String> queryResult = result.getOnlyPartitionResult();

if (queryResult != null && queryResult.isSuccess()) {
    // Result from active replica
    System.out.println("Value: " + queryResult.getResult());
} else if (queryResult != null &&
           queryResult.getFailureReason() == FailureReason.NOT_ACTIVE) {
    // This instance is not the active replica
    System.out.println("Not active: " + queryResult.getFailureMessage());
}

Comprehensive Error Handling

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;

/**
 * Looks up {@code key} in the store "my-store" using a {@code KeyQuery} and reports
 * why the lookup failed when it does.
 *
 * @param streams the Kafka Streams instance to query
 * @param key     the key to look up
 * @return the stored value, or {@code null} when no partition returned a value
 *         or when the query failed (the failure is written to {@code System.err})
 */
public String queryStore(KafkaStreams streams, String key) {
    StateQueryRequest<String> request = StateQueryRequest
        .inStore("my-store")
        .withQuery(KeyQuery.withKey(key));

    StateQueryResult<String> result = streams.query(request);
    QueryResult<String> queryResult = result.getOnlyPartitionResult();

    if (queryResult == null) {
        return null;  // Key not found in any partition
    }

    if (queryResult.isSuccess()) {
        return queryResult.getResult();
    }

    // Handle failures
    switch (queryResult.getFailureReason()) {
        case NOT_ACTIVE:
            // Retry on different instance or wait for rebalance
            System.err.println("Partition not active on this instance");
            break;

        case NOT_UP_TO_BOUND:
            // Store hasn't caught up, retry later
            System.err.println("Store not caught up to requested position");
            break;

        case NOT_PRESENT:
            // Partition moved to different instance
            System.err.println("Partition not present on this instance");
            break;

        case DOES_NOT_EXIST:
            // Invalid partition requested
            System.err.println("Requested partition doesn't exist");
            break;

        case UNKNOWN_QUERY_TYPE:
            // Store doesn't support this query
            System.err.println("Query type not supported by store");
            break;

        case STORE_EXCEPTION:
            // Internal store error
            System.err.println("Store exception: " + queryResult.getFailureMessage());
            break;

        default:
            // A reason added by a newer Kafka version: never drop a failure silently
            System.err.println("Query failed (" + queryResult.getFailureReason() + "): "
                + queryResult.getFailureMessage());
            break;
    }

    return null;
}

Traditional Read-Only Stores API

The traditional API provides simpler access to state stores without the advanced features of the Interactive Queries API.

Read-Only Stores

package org.apache.kafka.streams.state;

// Read-only view of a key-value store: lookups, iteration and a size estimate, no writes.
public interface ReadOnlyKeyValueStore<K, V> {
    // Value stored for a single key
    V get(K key);
    // Iterate the entries between two keys (bound inclusivity is not shown here; check the Javadoc)
    KeyValueIterator<K, V> range(K from, K to);
    // Same key range, iterated in reverse
    KeyValueIterator<K, V> reverseRange(K from, K to);
    // Iterate every entry
    KeyValueIterator<K, V> all();
    // Iterate every entry in reverse
    KeyValueIterator<K, V> reverseAll();
    // Approximate (not exact) number of entries
    long approximateNumEntries();
}

// Read-only view of a window store.
// Time-range methods take java.time.Instant bounds; the long-based range overloads
// exist only on the writable WindowStore, not on this read-only interface.
public interface ReadOnlyWindowStore<K, V> {
    // Value of the window that starts exactly at `time` (epoch milliseconds) for `key`
    V fetch(K key, long time);
    // All windows of `key` whose start falls within [timeFrom, timeTo]
    WindowStoreIterator<V> fetch(K key, Instant timeFrom, Instant timeTo);
    // Same range, iterated from the newest window to the oldest
    WindowStoreIterator<V> backwardFetch(K key, Instant timeFrom, Instant timeTo);
    // Windows of every key in [keyFrom, keyTo] within the time range
    KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo);
    // Every window of every key
    KeyValueIterator<Windowed<K>, V> all();
    // Every key's windows within the time range
    KeyValueIterator<Windowed<K>, V> fetchAll(Instant timeFrom, Instant timeTo);
}

// Read-only view of a session store; results are keyed by Windowed<K> (key plus session window).
public interface ReadOnlySessionStore<K, V> {
    // Sessions recorded for a single key
    KeyValueIterator<Windowed<K>, V> fetch(K key);
    // Sessions for a single key, iterated in the opposite order
    KeyValueIterator<Windowed<K>, V> backwardFetch(K key);
    // Sessions for every key in the given key range
    KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo);
}

QueryableStoreTypes

Factory for queryable store types.

package org.apache.kafka.streams.state;

// Factory of QueryableStoreType instances, one per kind of read-only store.
public final class QueryableStoreTypes {
    // Plain key-value store
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore();
    // Key-value store whose values are wrapped in ValueAndTimestamp
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
        timestampedKeyValueStore();
    // Window store
    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore();
    // Window store whose values are wrapped in ValueAndTimestamp
    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>>
        timestampedWindowStore();
    // Session store
    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore();
}

StoreQueryParameters

Parameters for querying state stores.

package org.apache.kafka.streams;

// Describes which store to open (name + type) and optional restrictions on the lookup.
public class StoreQueryParameters<T> {
    // Entry point: identify the store by name and by the read-only type to return
    public static <T> StoreQueryParameters<T> fromNameAndType(String storeName,
                                                               QueryableStoreType<T> queryableStoreType);

    // Restrict the query to a single partition
    public StoreQueryParameters<T> withPartition(Integer partition);
    // Allow queries from standby replicas
    public StoreQueryParameters<T> enableStaleStores();
}

Query Examples:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.*;

// Obtain a read-only handle on the key-value store
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("my-kv-store", QueryableStoreTypes.keyValueStore()));

// Point lookup
Long value = store.get("my-key");
System.out.println("Value: " + value);

// Full scan; try-with-resources closes the iterator
try (KeyValueIterator<String, Long> entries = store.all()) {
    entries.forEachRemaining(kv -> System.out.println(kv.key + ": " + kv.value));
}

// Scan of the key range "a".."m"
try (KeyValueIterator<String, Long> entries = store.range("a", "m")) {
    entries.forEachRemaining(kv -> System.out.println(kv.key + ": " + kv.value));
}

// Query window store
ReadOnlyWindowStore<String, Long> windowStore = streams.store(
    StoreQueryParameters.fromNameAndType("my-window-store",
        QueryableStoreTypes.windowStore())
);

// ReadOnlyWindowStore range fetches take java.time.Instant bounds;
// the long-based overloads only exist on the writable WindowStore.
java.time.Instant to = java.time.Instant.now();
java.time.Instant from = to.minus(Duration.ofHours(1));
try (WindowStoreIterator<Long> iterator = windowStore.fetch("key", from, to)) {
    while (iterator.hasNext()) {
        // entry.key is the window start timestamp in epoch milliseconds
        KeyValue<Long, Long> entry = iterator.next();
        System.out.println("Window: " + entry.key + ", Value: " + entry.value);
    }
}

// Query from specific partition.
// The explicit <String, Long> is required: once withPartition(...) is chained onto
// fromNameAndType(...), the compiler has no target type and would infer
// ReadOnlyKeyValueStore<Object, Object>, which cannot be assigned below.
ReadOnlyKeyValueStore<String, Long> partitionStore = streams.store(
    StoreQueryParameters.fromNameAndType("my-kv-store",
        QueryableStoreTypes.<String, Long>keyValueStore())
        .withPartition(0) // Query partition 0 only
);

// Query with stale stores enabled (same explicit type arguments, for the same reason)
ReadOnlyKeyValueStore<String, Long> staleStore = streams.store(
    StoreQueryParameters.fromNameAndType("my-kv-store",
        QueryableStoreTypes.<String, Long>keyValueStore())
        .enableStaleStores() // Allow queries from standby replicas
);

State Store Usage in Topology

Adding Store to StreamsBuilder

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;

StreamsBuilder builder = new StreamsBuilder();

// Describe a persistent String -> Long store called "my-store" ...
StoreBuilder<KeyValueStore<String, Long>> countsStoreBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("my-store"), Serdes.String(), Serdes.Long());

// ... and register it with the builder
builder.addStateStore(countsStoreBuilder);

// Connect the store by name to the processor that uses it
KStream<String, String> input = builder.stream("input-topic");
input.process(() -> new MyProcessor(), "my-store");

Adding Store to Topology

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.*;

Topology topology = new Topology();

// Describe a persistent String -> Long store called "my-store"
StoreBuilder<KeyValueStore<String, Long>> countsStoreBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("my-store"), Serdes.String(), Serdes.Long());

// Wire the topology step by step: source -> processor (with its store) -> sink
topology.addSource("Source", "input-topic");
topology.addProcessor("Process", () -> new MyProcessor(), "Source");
topology.addStateStore(countsStoreBuilder, "Process");
topology.addSink("Sink", "output-topic", "Process");

Supporting Types

KeyValue<K, V>

package org.apache.kafka.streams;

// Simple holder for a key together with its value.
public class KeyValue<K, V> {
    // Static factory for a key/value pair
    public static <K, V> KeyValue<K, V> pair(K key, V value);

    // Exposed as public final fields (read as entry.key / entry.value), not via getters
    public final K key;
    public final V value;
}

ValueAndTimestamp<V>

package org.apache.kafka.streams.state;

// Value paired with its record timestamp.
// This is a final class (not an interface); instances are created through make(...).
public final class ValueAndTimestamp<V> {
    // Factory method; the class has no public constructor
    public static <V> ValueAndTimestamp<V> make(V value, long timestamp);

    public V value();
    public long timestamp();
}

VersionedRecord<V>

package org.apache.kafka.streams.state;

public interface VersionedRecord<V> {
    V value();
    long timestamp();
    long validTo();
}

VersionedRecordIterator<V>

package org.apache.kafka.streams.state;

// Iterator over VersionedRecord values; it is Closeable, so close it when done.
public interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>>, Closeable {
    // Redeclared without a throws clause, so callers need not handle IOException
    void close();
}

Windowed<K>

package org.apache.kafka.streams.kstream;

// A key combined with the window it belongs to (the key type of windowed and session results).
public class Windowed<K> {
    // The original record key
    public K key();
    // The window associated with the key
    public Window window();
}

Window

package org.apache.kafka.streams.kstream;

// Base type for a time window with a start and an end.
public abstract class Window {
    // Window start (presumably epoch milliseconds; confirm against the Javadoc)
    public long start();
    // Window end (same unit as start(); inclusivity is not shown here)
    public long end();
    // Whether this window overlaps the other window
    public boolean overlap(Window other);
}

Complete Example

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.common.serialization.Serdes;

/**
 * End-to-end example: counts records per key in a persistent state store,
 * forwards the running count downstream and queries the store from a second thread.
 */
public class StateStoreExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-store-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Create state store
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("counts-store"),
                Serdes.String(),
                Serdes.Long()
            ).withCachingEnabled()
             .withLoggingEnabled(Collections.emptyMap());

        builder.addStateStore(storeBuilder);

        // Use store in processor.
        // Serdes are passed explicitly because no default key/value serde is configured in props.
        KStream<String, String> source = builder.stream(
            "input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        source.process(() -> new Processor<String, String, String, String>() {
            // Kept from init() so process() can forward records downstream
            private ProcessorContext<String, String> context;
            private KeyValueStore<String, Long> store;

            @Override
            public void init(ProcessorContext<String, String> context) {
                this.context = context;
                this.store = context.getStateStore("counts-store");
            }

            @Override
            public void process(Record<String, String> record) {
                String key = record.key();
                Long count = store.get(key);
                if (count == null) {
                    count = 0L;
                }
                count++;
                store.put(key, count);

                context.forward(new Record<>(key, "Count: " + count, record.timestamp()));
            }
        }, "counts-store");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the hook before starting so a shutdown during startup still closes cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();

        // Query store
        Thread queryThread = new Thread(() -> {
            try {
                Thread.sleep(10000); // Wait for data

                ReadOnlyKeyValueStore<String, Long> store = streams.store(
                    StoreQueryParameters.fromNameAndType("counts-store",
                        QueryableStoreTypes.keyValueStore())
                );

                try (KeyValueIterator<String, Long> iterator = store.all()) {
                    while (iterator.hasNext()) {
                        KeyValue<String, Long> entry = iterator.next();
                        System.out.println(entry.key + ": " + entry.value);
                    }
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever owns this thread
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        queryThread.start();
    }
}

Troubleshooting State Stores

Common State Store Issues

Issue: State Store Not Available

Symptoms:

  • InvalidStateStoreException when accessing stores
  • Store queries fail intermittently
  • "Store not available" errors

Causes:

  • Application still initializing
  • Rebalancing in progress
  • Store migrated to another instance
  • Application not in RUNNING state

Solutions:

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.*;

/**
 * Queries a key-value store, retrying while the application is not RUNNING
 * or the store is temporarily unavailable (for example during a rebalance).
 */
public class ResilientStoreQuery {

    /**
     * Looks up {@code key} in {@code storeName}, making at most {@code maxRetries} attempts.
     *
     * @param streams      the Kafka Streams instance that owns the store
     * @param storeName    name of the key-value store
     * @param key          key to look up
     * @param maxRetries   maximum number of attempts
     * @param retryDelayMs pause between attempts, in milliseconds
     * @return the stored value; {@code null} if the key is absent or the application
     *         never reached RUNNING within the allowed attempts
     * @throws InvalidStateStoreException if the store is still unavailable on the last attempt
     * @throws RuntimeException           if the thread is interrupted while waiting
     */
    public <K, V> V queryWithRetry(KafkaStreams streams,
                                    String storeName,
                                    K key,
                                    int maxRetries,
                                    long retryDelayMs) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (streams.state() != KafkaStreams.State.RUNNING) {
                System.out.println("Waiting for RUNNING state, current: " +
                    streams.state());
            } else {
                try {
                    ReadOnlyKeyValueStore<K, V> store = streams.store(
                        StoreQueryParameters.fromNameAndType(
                            storeName,
                            QueryableStoreTypes.keyValueStore()
                        )
                    );
                    return store.get(key);
                } catch (InvalidStateStoreException e) {
                    System.err.println("Store unavailable (attempt " + attempt +
                        "/" + maxRetries + "): " + e.getMessage());

                    if (attempt >= maxRetries) {
                        reportStoreHosts(streams, storeName);
                        throw e;
                    }
                }
            }
            pause(retryDelayMs);
        }

        return null;
    }

    // Prints the instances known to host the store; the caller could query them via a REST API.
    // Uses store-level metadata because a key-level lookup needs a Serializer<K> for the
    // caller's key type, which this method does not receive.
    private void reportStoreHosts(KafkaStreams streams, String storeName) {
        for (StreamsMetadata metadata : streams.streamsMetadataForStore(storeName)) {
            System.out.println("Store available on: " + metadata.hostInfo());
        }
    }

    // Sleeps between attempts; restores the interrupt flag and aborts if interrupted.
    private void pause(long retryDelayMs) {
        try {
            Thread.sleep(retryDelayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted", e);
        }
    }
}

Prevention:

  • Wait for RUNNING state before queries
  • Implement retry logic
  • Use state listeners
  • Consider distributed queries across instances

Issue: State Store Corruption

Symptoms:

  • Application fails to start
  • RocksDB errors in logs
  • State directory corruption
  • Inconsistent query results

Causes:

  • Unclean shutdown
  • Disk errors
  • Out of disk space
  • Concurrent access to state directory

Solutions:

import org.apache.kafka.streams.*;
import java.io.File;
import java.nio.file.*;

/**
 * Recovery and prevention recipes for corrupted local state.
 * NOTE(review): topology, props and streams are not declared in this snippet; they are
 * assumed to be fields of the surrounding application.
 */
public class StateStoreRecovery {

    /**
     * Wipes the local state so it is rebuilt from the changelog topics on startup.
     * Falls back to deleting the state directory by hand when the regular cleanup fails.
     *
     * @param stateDir      base state directory (the state.dir setting)
     * @param applicationId application id, i.e. the sub-directory that holds this app's state
     */
    public void recoverFromCorruption(String stateDir, String applicationId) {
        try {
            // Option 1: Clean up and rebuild from changelog
            KafkaStreams streams = new KafkaStreams(topology, props);

            System.out.println("Cleaning up corrupted state");
            streams.cleanUp(); // Deletes local state; must be called before start()

            // State will be restored from changelog on startup
            streams.start();
            System.out.println("State restoration started");

        } catch (Exception e) {
            System.err.println("Cleanup failed: " + e.getMessage());

            // Option 2: Manual cleanup
            Path statePath = Paths.get(stateDir, applicationId);
            // Files.walk holds open directory handles, so the stream must be closed
            try (java.util.stream.Stream<Path> paths = Files.walk(statePath)) {
                // Reverse order visits children before their parent directories
                java.util.Iterator<Path> deepestFirst =
                    paths.sorted(Comparator.reverseOrder()).iterator();
                while (deepestFirst.hasNext()) {
                    // Files.delete throws on failure instead of silently returning false
                    Files.delete(deepestFirst.next());
                }

                System.out.println("Manually deleted state directory");
            } catch (Exception cleanupEx) {
                System.err.println("Manual cleanup failed: " + cleanupEx.getMessage());
            }
        }
    }

    /** Illustrates best practices that reduce the risk of state corruption. */
    public void preventCorruption() {
        // Best practices to prevent corruption:

        // 1. Always close gracefully
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Closing streams gracefully");
            streams.close(Duration.ofSeconds(30));
        }));

        // 2. Monitor disk space
        File stateDir = new File("/tmp/kafka-streams");
        long usableSpace = stateDir.getUsableSpace();
        long totalSpace = stateDir.getTotalSpace();

        // getTotalSpace() returns 0 when the path does not exist; skip the check then
        if (totalSpace > 0) {
            double usage = (double) (totalSpace - usableSpace) / totalSpace * 100;
            if (usage > 90) {
                System.err.println("CRITICAL: Disk usage at " + usage + "%");
            }
        }

        // 3. Use separate disk for state if possible
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/dedicated/disk/kafka-streams");

        // 4. Enable changelog for recovery
        // (enabled by default for aggregations)
    }
}

Prevention:

  • Always close streams gracefully
  • Monitor disk space
  • Use dedicated disk for state
  • Enable changelog topics
  • Regular backups of state directory

Edge Cases

Querying Windowed Stores

import org.apache.kafka.streams.state.*;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Instant;

// Windowed stores require time range queries
ReadOnlyWindowStore<String, Long> windowStore = streams.store(
    StoreQueryParameters.fromNameAndType(
        "windowed-counts",
        QueryableStoreTypes.windowStore()
    )
);

// Query specific time range.
// ReadOnlyWindowStore takes Instant bounds directly; the long-based range overloads
// exist only on the writable WindowStore.
Instant now = Instant.now();
Instant oneHourAgo = now.minusSeconds(3600);

try (WindowStoreIterator<Long> iterator = windowStore.fetch(
        "key1",
        oneHourAgo,
        now)) {

    while (iterator.hasNext()) {
        KeyValue<Long, Long> entry = iterator.next();
        long windowStartTime = entry.key;   // window start, epoch milliseconds
        long count = entry.value;

        System.out.println("Window starting at " +
            Instant.ofEpochMilli(windowStartTime) + ": " + count);
    }
}

// Query all windows for a key: cover the whole representable millisecond range
try (WindowStoreIterator<Long> allWindows = windowStore.fetch(
        "key1",
        Instant.EPOCH,
        Instant.ofEpochMilli(Long.MAX_VALUE))) {

    while (allWindows.hasNext()) {
        KeyValue<Long, Long> entry = allWindows.next();
        System.out.println("Window: " + entry.key + ", Count: " + entry.value);
    }
}

Versioned Store Queries

import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;
import java.time.Instant;

// Versioned stores are not available through streams.store() / QueryableStoreTypes.
// Query them with Interactive Queries v2 and a VersionedKeyQuery instead.

// Get current (latest) value
VersionedKeyQuery<String, String> latestQuery = VersionedKeyQuery.withKey("key1");
StateQueryResult<VersionedRecord<String>> latestResult = streams.query(
    StateQueryRequest.inStore("versioned-store").withQuery(latestQuery));
QueryResult<VersionedRecord<String>> latest = latestResult.getOnlyPartitionResult();
// getOnlyPartitionResult() is null when no partition holds a record for the key
String currentValue = (latest != null && latest.isSuccess())
    ? latest.getResult().value()
    : null;

// Get value as of specific timestamp
long timestamp = System.currentTimeMillis() - 3600000; // 1 hour ago
VersionedKeyQuery<String, String> asOfQuery = latestQuery.asOf(Instant.ofEpochMilli(timestamp));
QueryResult<VersionedRecord<String>> historical = streams.query(
    StateQueryRequest.inStore("versioned-store").withQuery(asOfQuery))
    .getOnlyPartitionResult();
VersionedRecord<String> historicalValue = (historical != null && historical.isSuccess())
    ? historical.getResult()
    : null;

if (historicalValue != null) {
    System.out.println("Value at " + timestamp + ": " + historicalValue.value());
    System.out.println("Valid from: " + historicalValue.timestamp());
} else {
    System.out.println("No value found at timestamp " + timestamp);
}

// Edge case: Timestamp before history retention
// No record is returned if the timestamp is older than the history retention period
long veryOldTimestamp = System.currentTimeMillis() - (8 * 24 * 3600000L); // 8 days ago
VersionedKeyQuery<String, String> oldQuery =
    latestQuery.asOf(Instant.ofEpochMilli(veryOldTimestamp));
QueryResult<VersionedRecord<String>> oldValue = streams.query(
    StateQueryRequest.inStore("versioned-store").withQuery(oldQuery))
    .getOnlyPartitionResult();
// May be null if history retention is < 8 days

Store Iterator Lifecycle

// Always close iterators to prevent resource leaks
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "my-store",
        QueryableStoreTypes.keyValueStore()
    )
);

// Each variant uses its own variable name so the three examples can sit in one scope
// (re-declaring the same local three times does not compile).

// WRONG: Iterator not closed
KeyValueIterator<String, Long> leakedIterator = store.all();
while (leakedIterator.hasNext()) {
    KeyValue<String, Long> entry = leakedIterator.next();
    System.out.println(entry.key + ": " + entry.value);
}
// Missing leakedIterator.close() - resource leak

// CORRECT: Use try-with-resources
try (KeyValueIterator<String, Long> iterator = store.all()) {
    while (iterator.hasNext()) {
        KeyValue<String, Long> entry = iterator.next();
        System.out.println(entry.key + ": " + entry.value);
    }
} // Automatically closed

// CORRECT: Manual close with finally
KeyValueIterator<String, Long> manualIterator = store.all();
try {
    while (manualIterator.hasNext()) {
        KeyValue<String, Long> entry = manualIterator.next();
        System.out.println(entry.key + ": " + entry.value);
    }
} finally {
    manualIterator.close();
}

Performance Optimization

Choosing Store Type

// Decision matrix for store type selection:

// Use KeyValueStore when:
// - Simple key-value lookups
// - No time-based queries needed
// - Most common use case

// Use WindowStore when:
// - Time-based aggregations
// - Need to query by time range
// - Implementing sliding/tumbling windows

// Use SessionStore when:
// - Session-based aggregations
// - Variable window sizes
// - Activity-based grouping

// Use VersionedKeyValueStore when:
// - Need historical values
// - Point-in-time queries required
// - Audit trail needed

// Example: Choose store based on requirements
/**
 * Picks a store builder for the given requirements: versioned when historical queries
 * are needed, windowed when time-range queries are needed, plain key-value otherwise.
 */
public StoreBuilder<?> chooseStore(String storeName, Requirements requirements) {
    // Historical lookups take precedence: versioned store with 7 days of history
    if (requirements.needsHistoricalQueries()) {
        return Stores.versionedKeyValueStoreBuilder(
            Stores.persistentVersionedKeyValueStore(storeName, Duration.ofDays(7)),
            Serdes.String(),
            Serdes.String());
    }

    // Time-range lookups: 5-minute windows retained for 1 day, no duplicates
    if (requirements.needsTimeRangeQueries()) {
        return Stores.windowStoreBuilder(
            Stores.persistentWindowStore(
                storeName, Duration.ofDays(1), Duration.ofMinutes(5), false),
            Serdes.String(),
            Serdes.Long());
    }

    // Default: simple persistent key-value store
    return Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(storeName),
        Serdes.String(),
        Serdes.Long());
}

Persistent vs In-Memory Stores

// --- Persistent store: RocksDB-backed, kept on local disk ---
StoreBuilder<KeyValueStore<String, Long>> persistentStore = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("persistent-store"), Serdes.String(), Serdes.Long());

// Pros:
//   + handles large datasets (GBs)
//   + survives application restarts
//   + changelog-backed for fault tolerance
//   + lower memory footprint
// Cons:
//   - slower than in-memory because of disk I/O
//   - needs disk space
//   - longer startup time (restoration)

// --- In-memory store: everything held on the heap ---
StoreBuilder<KeyValueStore<String, Long>> inMemoryStore = Stores.keyValueStoreBuilder(
    Stores.inMemoryKeyValueStore("memory-store"), Serdes.String(), Serdes.Long());

// Pros:
//   + very fast access (no disk I/O)
//   + fast startup
//   + no disk space required
// Cons:
//   - limited by available memory
//   - contents are lost on application restart
//   - must be rebuilt from the changelog on startup

// Rule of thumb:
//   persistent -> large state, production workloads
//   in-memory  -> small state, caching, temporary data

Caching Configuration

// With caching on, fewer records reach the changelog and downstream processors
StoreBuilder<KeyValueStore<String, Long>> cachedStore = Stores
    .keyValueStoreBuilder(
        Stores.persistentKeyValueStore("cached-store"), Serdes.String(), Serdes.Long())
    .withCachingEnabled(); // Enable caching

// One cache budget shared by all stores: 10MB
int sharedCacheBytes = 10 * 1024 * 1024;
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, sharedCacheBytes);

// How the cache behaves:
//   - writes are buffered in the cache
//   - the buffer is flushed on commit or when the cache is full
//   - fewer changelog writes
//   - less downstream processing

// Enable caching when:
//   - the store sees a high write rate
//   - downstream processing is expensive
//   - delayed updates are acceptable

// Disable caching when:
//   - updates must be visible immediately
//   - the write rate is low
//   - memory is constrained

StoreBuilder<KeyValueStore<String, Long>> uncachedStore = Stores
    .keyValueStoreBuilder(
        Stores.persistentKeyValueStore("uncached-store"), Serdes.String(), Serdes.Long())
    .withCachingDisabled(); // Disable caching

Edge Cases

Changelog Topic Compaction

// Changelog topics of state stores are compacted:
// compaction drops superseded values of the same key.

Map<String, String> compactedChangelog = Map.of(
    "cleanup.policy", "compact",
    "min.compaction.lag.ms", "0",
    "segment.ms", "60000");

StoreBuilder<KeyValueStore<String, Long>> store = Stores
    .keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"), Serdes.String(), Serdes.Long())
    .withLoggingEnabled(compactedChangelog);

// What compaction means in practice:
//   - old values are removed eventually
//   - the changelog stays smaller
//   - state restoration is faster
//   - a high min.compaction.lag.ms may delay it

// Windowed stores: combine compaction with time-based deletion
Map<String, String> windowedChangelog = Map.of(
    "cleanup.policy", "compact,delete",
    "retention.ms", "86400000", // 1 day
    "delete.retention.ms", "86400000");

StoreBuilder<WindowStore<String, Long>> windowStore = Stores
    .windowStoreBuilder(
        Stores.persistentWindowStore(
            "window-store", Duration.ofDays(1), Duration.ofMinutes(5), false),
        Serdes.String(),
        Serdes.Long())
    .withLoggingEnabled(windowedChangelog);

Store Restoration from Empty Changelog

// New application or changelog topic deleted
// Store starts empty and builds up from source topics

KafkaStreams streams = new KafkaStreams(topology, props);

// Listener that logs how each store is (re)built
StateRestoreListener restoreLogger = new StateRestoreListener() {
    @Override
    public void onRestoreStart(TopicPartition topicPartition,
                               String storeName,
                               long startingOffset,
                               long endingOffset) {
        // Both offsets at 0: nothing in the changelog to replay
        boolean emptyChangelog = startingOffset == 0 && endingOffset == 0;
        String message = emptyChangelog
            ? "Store " + storeName + " has no changelog - will build from scratch"
            : "Restoring " + storeName + " from offset " + startingOffset + " to " + endingOffset;
        System.out.println(message);
    }

    @Override
    public void onBatchRestored(TopicPartition topicPartition,
                                String storeName,
                                long batchEndOffset,
                                long numRestored) {
        // Hook for progress tracking; intentionally empty here
    }

    @Override
    public void onRestoreEnd(TopicPartition topicPartition,
                             String storeName,
                             long totalRestored) {
        String message = totalRestored == 0
            ? "Store " + storeName + " started empty - building from source"
            : "Restored " + totalRestored + " records to " + storeName;
        System.out.println(message);
    }
};

// Must be registered before the application is started
streams.setGlobalStateRestoreListener(restoreLogger);
streams.start();

Querying Stores Across Instances

// Distributed query pattern for multi-instance deployments
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.*;
import java.net.*;
import java.io.*;

public class DistributedStoreQuery {
    
    public <K, V> V queryAnyInstance(KafkaStreams streams,
                                      String storeName,
                                      K key,
                                      Serializer<K> keySerializer) throws Exception {
        // Find which instance has the key
        KeyQueryMetadata metadata = streams.queryMetadataForKey(
            storeName,
            key,
            keySerializer
        );
        
        if (metadata == KeyQueryMetadata.NOT_AVAILABLE) {
            throw new IllegalStateException("Store not available");
        }
        
        HostInfo activeHost = metadata.activeHost();
        
        // Check if key is on local instance
        if (isLocalHost(activeHost)) {
            // Query local store
            ReadOnlyKeyValueStore<K, V> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                    storeName,
                    QueryableStoreTypes.keyValueStore()
                )
            );
            return store.get(key);
        } else {
            // Query remote instance via REST API
            return queryRemoteInstance(activeHost, storeName, key);
        }
    }
    
    private boolean isLocalHost(HostInfo hostInfo) {
        // Check if host matches local instance
        // Compare with application.server configuration
        return hostInfo.host().equals("localhost") && 
               hostInfo.port() == 8080;
    }
    
    private <K, V> V queryRemoteInstance(HostInfo hostInfo, 
                                          String storeName,
                                          K key) throws Exception {
        // Make HTTP request to remote instance
        URL url = new URL("http://" + hostInfo.host() + ":" + 
            hostInfo.port() + "/store/" + storeName + "/key/" + key);
        
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        
        // Read response
        BufferedReader reader = new BufferedReader(
            new InputStreamReader(conn.getInputStream()));
        String response = reader.readLine();
        reader.close();
        
        // Deserialize response
        return deserializeResponse(response);
    }
    
    private <V> V deserializeResponse(String response) {
        // Implement deserialization logic
        return null;
    }
}

Capacity Planning

Sizing State Stores

// Calculate required state store size
/** Back-of-the-envelope estimate of the disk space a RocksDB-backed state store needs. */
public class StateStoreSizing {

    // RocksDB overhead: ~30-40% for indexes and metadata
    private static final double ROCKSDB_OVERHEAD_FACTOR = 1.4;

    private static final long BYTES_PER_MB = 1024L * 1024L;

    /**
     * Estimates the total on-disk size of a store and prints the intermediate figures.
     *
     * @param numKeys          number of distinct keys in the store
     * @param avgKeySize       average serialized key size in bytes
     * @param avgValueSize     average serialized value size in bytes
     * @param compressionRatio compressed size divided by raw size (e.g. 0.7 = 30% smaller)
     * @return estimated size in bytes of the whole store, across all partitions
     * @throws IllegalArgumentException if a size or count is negative, or the ratio is not positive
     * @throws ArithmeticException      if the raw size does not fit in a long
     */
    public long calculateRequiredDiskSpace(
            long numKeys,
            int avgKeySize,
            int avgValueSize,
            double compressionRatio) {

        if (numKeys < 0 || avgKeySize < 0 || avgValueSize < 0) {
            throw new IllegalArgumentException(
                "numKeys, avgKeySize and avgValueSize must be non-negative");
        }
        // Written as a negated comparison so that NaN is rejected as well
        if (!(compressionRatio > 0.0)) {
            throw new IllegalArgumentException(
                "compressionRatio must be positive: " + compressionRatio);
        }

        // Calculate raw data size (entry size widened to long so the sum cannot overflow int)
        long entrySize = (long) avgKeySize + avgValueSize;
        long rawDataSize = Math.multiplyExact(numKeys, entrySize);

        // Apply compression
        long compressedSize = (long) (rawDataSize * compressionRatio);

        // Apply RocksDB overhead
        long totalSize = (long) (compressedSize * ROCKSDB_OVERHEAD_FACTOR);

        System.out.println("State store sizing:");
        System.out.println("  Keys: " + numKeys);
        System.out.println("  Raw data: " + (rawDataSize / BYTES_PER_MB) + " MB");
        System.out.println("  Compressed: " + (compressedSize / BYTES_PER_MB) + " MB");
        System.out.println("  With overhead: " + (totalSize / BYTES_PER_MB) + " MB");

        return totalSize;
    }

    // Example calculation
    public void example() {
        // Scenario: User activity counts
        // - 10 million users
        // - Key: user ID (36 bytes for UUID string)
        // - Value: count (8 bytes for Long)
        // - Compression ratio: ~0.7 (LZ4), i.e. compressed data is 70% of the raw size

        long requiredSpace = calculateRequiredDiskSpace(
            10_000_000,  // 10M keys
            36,          // UUID string
            8,           // Long value
            0.7          // compressed size = 70% of raw (30% saved)
        );

        // Result: 431,200,000 bytes (~411 MB) for the whole store
        // The store is partitioned, so with 3 instances each holds roughly a third (~137 MB)
        // Each standby replica adds another full copy of the state across the cluster
    }
}