tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0
Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
Kafka Streams provides local state stores for stateful stream processing operations.
Key-value store for simple lookups.
package org.apache.kafka.streams.state;
public interface KeyValueStore<K, V> extends StateStore {
// Basic operations
V get(K key);
void put(K key, V value);
V putIfAbsent(K key, V value);
void putAll(List<KeyValue<K, V>> entries);
V delete(K key);
// Range queries
KeyValueIterator<K, V> range(K from, K to);
KeyValueIterator<K, V> reverseRange(K from, K to);
KeyValueIterator<K, V> all();
KeyValueIterator<K, V> reverseAll();
// Prefix scan
<PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P prefix, PS prefixKeySerializer);
// Metadata
long approximateNumEntries();
}
Time-windowed store.
package org.apache.kafka.streams.state;
public interface WindowStore<K, V> extends StateStore {
void put(K key, V value);
void put(K key, V value, long windowStartTimestamp);
V fetch(K key, long time);
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
WindowStoreIterator<V> backwardFetch(K key, long timeFrom, long timeTo);
KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, long timeFrom, long timeTo);
KeyValueIterator<Windowed<K>, V> backwardFetch(K keyFrom, K keyTo, long timeFrom, long timeTo);
KeyValueIterator<Windowed<K>, V> all();
KeyValueIterator<Windowed<K>, V> backwardAll();
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
KeyValueIterator<Windowed<K>, V> backwardFetchAll(long timeFrom, long timeTo);
}
Session-windowed store.
package org.apache.kafka.streams.state;
public interface SessionStore<K, V> extends StateStore {
void put(Windowed<K> sessionKey, V aggregate);
void remove(Windowed<K> sessionKey);
KeyValueIterator<Windowed<K>, V> fetch(K key);
KeyValueIterator<Windowed<K>, V> backwardFetch(K key);
KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo);
KeyValueIterator<Windowed<K>, V> backwardFetch(K keyFrom, K keyTo);
KeyValueIterator<Windowed<K>, V> findSessions(K key, long earliestSessionEndTime,
long latestSessionStartTime);
KeyValueIterator<Windowed<K>, V> backwardFindSessions(K key, long earliestSessionEndTime,
long latestSessionStartTime);
KeyValueIterator<Windowed<K>, V> findSessions(K keyFrom, K keyTo,
long earliestSessionEndTime,
long latestSessionStartTime);
}
Key-value store with timestamps.
package org.apache.kafka.streams.state;
public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
// Inherits all KeyValueStore methods but returns ValueAndTimestamp<V>
}
Versioned key-value store.
package org.apache.kafka.streams.state;
public interface VersionedKeyValueStore<K, V> extends StateStore {
V get(K key);
VersionedRecord<V> get(K key, long asOfTimestamp);
void put(K key, V value);
void put(K key, V value, long timestamp);
VersionedRecord<V> delete(K key);
VersionedRecord<V> delete(K key, long timestamp);
}
Factory for creating store builders and suppliers.
package org.apache.kafka.streams.state;
public class Stores {
// KeyValue store builders
public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(
KeyValueBytesStoreSupplier supplier,
Serde<K> keySerde,
Serde<V> valueSerde);
public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(
KeyValueBytesStoreSupplier supplier,
Serde<K> keySerde,
Serde<V> valueSerde);
// Window store builders
public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(
WindowBytesStoreSupplier supplier,
Serde<K> keySerde,
Serde<V> valueSerde);
public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(
WindowBytesStoreSupplier supplier,
Serde<K> keySerde,
Serde<V> valueSerde);
// Session store builders
public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(
SessionBytesStoreSupplier supplier,
Serde<K> keySerde,
Serde<V> valueSerde);
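// Versioned store builder (used by the versioned example below)
public static <K, V> StoreBuilder<VersionedKeyValueStore<K, V>> versionedKeyValueStoreBuilder(
    VersionedBytesStoreSupplier supplier,
    Serde<K> keySerde,
    Serde<V> valueSerde);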
// KeyValue store suppliers
public static KeyValueBytesStoreSupplier persistentKeyValueStore(String name);
public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(String name);
public static KeyValueBytesStoreSupplier lruMap(String name, int maxCacheSize);
// Window store suppliers
public static WindowBytesStoreSupplier persistentWindowStore(String name,
Duration retentionPeriod,
Duration windowSize,
boolean retainDuplicates);
public static WindowBytesStoreSupplier inMemoryWindowStore(String name,
Duration retentionPeriod,
Duration windowSize,
boolean retainDuplicates);
// Session store suppliers
public static SessionBytesStoreSupplier persistentSessionStore(String name,
Duration retentionPeriod);
public static SessionBytesStoreSupplier inMemorySessionStore(String name,
Duration retentionPeriod);
// Versioned store supplier
public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(String name,
Duration historyRetention);
}
Usage Examples:
import org.apache.kafka.streams.state.*;
import org.apache.kafka.common.serialization.Serdes;
import java.time.Duration;
import java.util.Collections;
// Persistent key-value store
StoreBuilder<KeyValueStore<String, Long>> kvStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-kv-store"),
Serdes.String(),
Serdes.Long()
).withCachingEnabled()
.withLoggingEnabled(Collections.singletonMap("retention.ms", "86400000"));
// In-memory key-value store
StoreBuilder<KeyValueStore<String, String>> inMemoryStore =
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("temp-store"),
Serdes.String(),
Serdes.String()
).withLoggingDisabled();
// Window store
StoreBuilder<WindowStore<String, Long>> windowStore =
Stores.windowStoreBuilder(
Stores.persistentWindowStore("my-window-store",
Duration.ofDays(1), // retention
Duration.ofMinutes(5), // window size
false), // retain duplicates
Serdes.String(),
Serdes.Long()
);
// Session store
StoreBuilder<SessionStore<String, Long>> sessionStore =
Stores.sessionStoreBuilder(
Stores.persistentSessionStore("my-session-store",
Duration.ofHours(1)), // retention
Serdes.String(),
Serdes.Long()
);
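// Timestamped and LRU-bounded stores (a sketch using the suppliers listed
// above; store names are illustrative)
StoreBuilder<TimestampedKeyValueStore<String, Long>> timestampedStore =
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentKeyValueStore("ts-store"),
        Serdes.String(),
        Serdes.Long()
    );
StoreBuilder<KeyValueStore<String, String>> lruStore =
    Stores.keyValueStoreBuilder(
        Stores.lruMap("lru-store", 10_000), // bounded to at most 10,000 entries
        Serdes.String(),
        Serdes.String()
    );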
// Versioned store
StoreBuilder<VersionedKeyValueStore<String, String>> versionedStore =
Stores.versionedKeyValueStoreBuilder(
Stores.persistentVersionedKeyValueStore("my-versioned-store",
Duration.ofDays(7)), // history retention
Serdes.String(),
Serdes.String()
);
Builder for state stores with configuration options.
package org.apache.kafka.streams.state;
public interface StoreBuilder<T extends StateStore> {
StoreBuilder<T> withCachingEnabled();
StoreBuilder<T> withCachingDisabled();
StoreBuilder<T> withLoggingEnabled(Map<String, String> config);
StoreBuilder<T> withLoggingDisabled();
T build();
Map<String, String> logConfig();
boolean loggingEnabled();
String name();
}
Kafka Streams provides two approaches for querying state stores: the traditional read-only store API and the newer Interactive Queries API (IQv2, introduced in KIP-796). IQv2 offers finer control over query execution, including position tracking, partition-specific queries, and detailed execution information.
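The general flow is the same for every query type: build a StateQueryRequest for a store, execute it with KafkaStreams#query, and inspect the per-partition results. A minimal sketch (the store and key names are illustrative, and streams is assumed to be a running KafkaStreams instance):
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.*;
// Build the request, execute it, and report each partition's outcome
StateQueryRequest<String> request = StateQueryRequest
    .inStore("my-store")
    .withQuery(KeyQuery.<String, String>withKey("some-key"));
StateQueryResult<String> result = streams.query(request);
result.getPartitionResults().forEach((partition, partitionResult) ->
    System.out.println("Partition " + partition + ": " +
        (partitionResult.isSuccess()
            ? partitionResult.getResult()
            : partitionResult.getFailureReason())));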
Building queries for interactive state store access.
package org.apache.kafka.streams.query;
public class StateQueryRequest<R> {
// Create a request for a specific store
public static InStore inStore(String name);
// Specify the query to execute
public <R> StateQueryRequest<R> withQuery(Query<R> query);
// Partition selection
public StateQueryRequest<R> withAllPartitions();
public StateQueryRequest<R> withPartitions(Set<Integer> partitions);
// Position-based consistency
public StateQueryRequest<R> withPositionBound(PositionBound positionBound);
// Active/standby configuration
public StateQueryRequest<R> requireActive();
// Execution details
public StateQueryRequest<R> enableExecutionInfo();
// Accessors
public String getStoreName();
public Query<R> getQuery();
public PositionBound getPositionBound();
public boolean isAllPartitions();
public Set<Integer> getPartitions();
public boolean executionInfoEnabled();
public boolean isRequireActive();
}
Container for query results across partitions.
package org.apache.kafka.streams.query;
public class StateQueryResult<R> {
// Access partition-specific results
public Map<Integer, QueryResult<R>> getPartitionResults();
// For single-partition queries
public QueryResult<R> getOnlyPartitionResult();
// For global store queries
public QueryResult<R> getGlobalResult();
// Position tracking
public Position getPosition();
// Internal methods for building results
public void addResult(int partition, QueryResult<R> result);
public void setGlobalResult(QueryResult<R> result);
}
Individual partition query result with success/failure information.
package org.apache.kafka.streams.query;
public interface QueryResult<R> {
// Factory methods
static <R> QueryResult<R> forResult(R result);
static <R> QueryResult<R> forFailure(FailureReason failureReason, String failureMessage);
static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);
static <R> QueryResult<R> notUpToBound(Position currentPosition,
PositionBound positionBound,
Integer partition);
// Result status
boolean isSuccess();
boolean isFailure();
// Success path
R getResult();
// Failure path
FailureReason getFailureReason();
String getFailureMessage();
// Position tracking
Position getPosition();
void setPosition(Position position);
// Execution details
List<String> getExecutionInfo();
void addExecutionInfo(String message);
}
Enumeration of query failure reasons.
package org.apache.kafka.streams.query;
public enum FailureReason {
UNKNOWN_QUERY_TYPE, // Store doesn't support this query type
NOT_ACTIVE, // Partition is not active on this instance
NOT_UP_TO_BOUND, // Store hasn't caught up to position bound
NOT_PRESENT, // Partition not available on this instance
DOES_NOT_EXIST, // Requested partition doesn't exist
STORE_EXCEPTION // Exception during query execution
}
Point query for retrieving a single record by key.
package org.apache.kafka.streams.query;
public final class KeyQuery<K, V> implements Query<V> {
// Create a query for a specific key
public static <K, V> KeyQuery<K, V> withKey(K key);
// Skip cache and query underlying store directly
public KeyQuery<K, V> skipCache();
// Accessors
public K getKey();
public boolean isSkipCache();
}
Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
// Create and execute a point query
StateQueryRequest<String> request = StateQueryRequest
.inStore("my-store")
.withQuery(KeyQuery.withKey("user-123"));
StateQueryResult<String> result = streams.query(request);
QueryResult<String> queryResult = result.getOnlyPartitionResult();
if (queryResult != null && queryResult.isSuccess()) {
String value = queryResult.getResult();
System.out.println("Value: " + value);
} else if (queryResult != null) {
System.err.println("Query failed: " + queryResult.getFailureMessage());
}
Range query for retrieving multiple records within key bounds.
package org.apache.kafka.streams.query;
public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
// Range with both bounds
public static <K, V> RangeQuery<K, V> withRange(K lower, K upper);
// Range with single bound
public static <K, V> RangeQuery<K, V> withLowerBound(K lower);
public static <K, V> RangeQuery<K, V> withUpperBound(K upper);
// Full scan
public static <K, V> RangeQuery<K, V> withNoBounds();
// Result ordering
public RangeQuery<K, V> withAscendingKeys();
public RangeQuery<K, V> withDescendingKeys();
// Accessors
public Optional<K> getLowerBound();
public Optional<K> getUpperBound();
public ResultOrder resultOrder();
}
Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;
// Range query with ascending order
StateQueryRequest<KeyValueIterator<String, Long>> request = StateQueryRequest
.inStore("counts-store")
.withQuery(RangeQuery.<String, Long>withRange("a", "m").withAscendingKeys());
StateQueryResult<KeyValueIterator<String, Long>> result = streams.query(request);
QueryResult<KeyValueIterator<String, Long>> queryResult = result.getOnlyPartitionResult();
if (queryResult != null && queryResult.isSuccess()) {
try (KeyValueIterator<String, Long> iterator = queryResult.getResult()) {
while (iterator.hasNext()) {
KeyValue<String, Long> entry = iterator.next();
System.out.println(entry.key + ": " + entry.value);
}
}
}
Query windowed stores by key within a time range.
package org.apache.kafka.streams.query;
public class WindowKeyQuery<K, V> implements Query<WindowStoreIterator<V>> {
// Query windows for a specific key and time range
public static <K, V> WindowKeyQuery<K, V> withKeyAndWindowStartRange(
K key,
Instant timeFrom,
Instant timeTo
);
// Accessors
public K getKey();
public Optional<Instant> getTimeFrom();
public Optional<Instant> getTimeTo();
}
Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;
import java.time.Instant;
import java.time.Duration;
// Query windows for a specific key
Instant now = Instant.now();
Instant hourAgo = now.minus(Duration.ofHours(1));
StateQueryRequest<WindowStoreIterator<Long>> request = StateQueryRequest
.inStore("window-store")
.withQuery(WindowKeyQuery.<String, Long>withKeyAndWindowStartRange(
"user-123",
hourAgo,
now
));
StateQueryResult<WindowStoreIterator<Long>> result = streams.query(request);
QueryResult<WindowStoreIterator<Long>> queryResult = result.getOnlyPartitionResult();
if (queryResult != null && queryResult.isSuccess()) {
try (WindowStoreIterator<Long> iterator = queryResult.getResult()) {
while (iterator.hasNext()) {
KeyValue<Long, Long> entry = iterator.next();
System.out.println("Timestamp: " + entry.key + ", Value: " + entry.value);
}
}
}
Range query on windowed stores.
package org.apache.kafka.streams.query;
public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {
// Query all windows for a specific key
public static <K, V> WindowRangeQuery<K, V> withKey(K key);
// Query all keys within a time range
public static <K, V> WindowRangeQuery<K, V> withWindowStartRange(
Instant timeFrom,
Instant timeTo
);
// Accessors
public Optional<K> getKey();
public Optional<Instant> getTimeFrom();
public Optional<Instant> getTimeTo();
}
Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;
// Query all windows for a key
StateQueryRequest<KeyValueIterator<Windowed<String>, Long>> request = StateQueryRequest
.inStore("window-store")
.withQuery(WindowRangeQuery.<String, Long>withKey("user-123"));
StateQueryResult<KeyValueIterator<Windowed<String>, Long>> result = streams.query(request);
QueryResult<KeyValueIterator<Windowed<String>, Long>> queryResult = result.getOnlyPartitionResult();
if (queryResult != null && queryResult.isSuccess()) {
try (KeyValueIterator<Windowed<String>, Long> iterator = queryResult.getResult()) {
while (iterator.hasNext()) {
KeyValue<Windowed<String>, Long> entry = iterator.next();
Window window = entry.key.window();
System.out.println("Window [" + window.start() + " - " + window.end() +
"]: " + entry.value);
}
}
}
Point query for timestamped stores.
package org.apache.kafka.streams.query;
public final class TimestampedKeyQuery<K, V> implements Query<ValueAndTimestamp<V>> {
// Create a query for a specific key
public static <K, V> TimestampedKeyQuery<K, V> withKey(K key);
// Skip cache
public TimestampedKeyQuery<K, V> skipCache();
// Accessors
public K key();
public boolean isSkipCache();
}
Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;
// Query timestamped store
StateQueryRequest<ValueAndTimestamp<String>> request = StateQueryRequest
.inStore("timestamped-store")
.withQuery(TimestampedKeyQuery.<String, String>withKey("key-1"));
StateQueryResult<ValueAndTimestamp<String>> result = streams.query(request);
QueryResult<ValueAndTimestamp<String>> queryResult = result.getOnlyPartitionResult();
if (queryResult != null && queryResult.isSuccess()) {
ValueAndTimestamp<String> valueAndTimestamp = queryResult.getResult();
if (valueAndTimestamp != null) {
System.out.println("Value: " + valueAndTimestamp.value());
System.out.println("Timestamp: " + valueAndTimestamp.timestamp());
}
}
Range query for timestamped stores.
package org.apache.kafka.streams.query;
public final class TimestampedRangeQuery<K, V>
implements Query<KeyValueIterator<K, ValueAndTimestamp<V>>> {
// Range with both bounds
public static <K, V> TimestampedRangeQuery<K, V> withRange(K lower, K upper);
// Range with single bound
public static <K, V> TimestampedRangeQuery<K, V> withLowerBound(K lower);
public static <K, V> TimestampedRangeQuery<K, V> withUpperBound(K upper);
// Full scan
public static <K, V> TimestampedRangeQuery<K, V> withNoBounds();
// Result ordering
public TimestampedRangeQuery<K, V> withAscendingKeys();
public TimestampedRangeQuery<K, V> withDescendingKeys();
// Accessors
public Optional<K> lowerBound();
public Optional<K> upperBound();
public ResultOrder resultOrder();
}
Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;
// Range query on timestamped store
StateQueryRequest<KeyValueIterator<String, ValueAndTimestamp<Long>>> request =
StateQueryRequest
.inStore("timestamped-store")
.withQuery(TimestampedRangeQuery.<String, Long>withRange("a", "z"));
StateQueryResult<KeyValueIterator<String, ValueAndTimestamp<Long>>> result =
streams.query(request);
QueryResult<KeyValueIterator<String, ValueAndTimestamp<Long>>> queryResult =
result.getOnlyPartitionResult();
if (queryResult != null && queryResult.isSuccess()) {
try (KeyValueIterator<String, ValueAndTimestamp<Long>> iterator = queryResult.getResult()) {
while (iterator.hasNext()) {
KeyValue<String, ValueAndTimestamp<Long>> entry = iterator.next();
System.out.println(entry.key + ": value=" + entry.value.value() +
", timestamp=" + entry.value.timestamp());
}
}
}
Point query for retrieving a single versioned record by key and optional timestamp.
package org.apache.kafka.streams.query;
public final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> {
// Create a query for a specific key
public static <K, V> VersionedKeyQuery<K, V> withKey(K key);
// Specify timestamp for historical query
public VersionedKeyQuery<K, V> asOf(Instant asOfTimestamp);
// Accessors
public K key();
public Optional<Instant> asOfTimestamp();
}
Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;
import java.time.Instant;
import java.time.Duration;
// Query latest version of a key
StateQueryRequest<VersionedRecord<String>> request = StateQueryRequest
.inStore("versioned-store")
.withQuery(VersionedKeyQuery.<String, String>withKey("user-123"));
StateQueryResult<VersionedRecord<String>> result = streams.query(request);
QueryResult<VersionedRecord<String>> queryResult = result.getOnlyPartitionResult();
if (queryResult != null && queryResult.isSuccess()) {
VersionedRecord<String> record = queryResult.getResult();
if (record != null) {
System.out.println("Value: " + record.value());
System.out.println("Timestamp: " + record.timestamp());
System.out.println("Valid to: " + record.validTo());
}
}
// Query historical version as of a specific timestamp
Instant yesterday = Instant.now().minus(Duration.ofDays(1));
StateQueryRequest<VersionedRecord<String>> historicalRequest = StateQueryRequest
.inStore("versioned-store")
.withQuery(VersionedKeyQuery.<String, String>withKey("user-123").asOf(yesterday));
StateQueryResult<VersionedRecord<String>> historicalResult = streams.query(historicalRequest);
QueryResult<VersionedRecord<String>> historicalQueryResult = historicalResult.getOnlyPartitionResult();
if (historicalQueryResult != null && historicalQueryResult.isSuccess()) {
VersionedRecord<String> historicalRecord = historicalQueryResult.getResult();
if (historicalRecord != null) {
System.out.println("Historical value: " + historicalRecord.value());
System.out.println("Was valid at: " + historicalRecord.timestamp());
}
}
Query for retrieving multiple versions of a key within a time range.
package org.apache.kafka.streams.query;
public final class MultiVersionedKeyQuery<K, V> implements Query<VersionedRecordIterator<V>> {
// Create a query for a specific key
public static <K, V> MultiVersionedKeyQuery<K, V> withKey(K key);
// Specify time range
public MultiVersionedKeyQuery<K, V> fromTime(Instant fromTime);
public MultiVersionedKeyQuery<K, V> toTime(Instant toTime);
// Result ordering by timestamp
public MultiVersionedKeyQuery<K, V> withAscendingTimestamps();
public MultiVersionedKeyQuery<K, V> withDescendingTimestamps();
// Accessors
public K key();
public Optional<Instant> fromTime();
public Optional<Instant> toTime();
public ResultOrder resultOrder();
}
Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import org.apache.kafka.streams.state.*;
import java.time.Instant;
import java.time.Duration;
// Query all versions of a key within time range
Instant now = Instant.now();
Instant weekAgo = now.minus(Duration.ofDays(7));
StateQueryRequest<VersionedRecordIterator<String>> request = StateQueryRequest
.inStore("versioned-store")
.withQuery(MultiVersionedKeyQuery.<String, String>withKey("user-123")
.fromTime(weekAgo)
.toTime(now)
.withDescendingTimestamps());
StateQueryResult<VersionedRecordIterator<String>> result = streams.query(request);
QueryResult<VersionedRecordIterator<String>> queryResult = result.getOnlyPartitionResult();
if (queryResult != null && queryResult.isSuccess()) {
try (VersionedRecordIterator<String> iterator = queryResult.getResult()) {
while (iterator.hasNext()) {
VersionedRecord<String> record = iterator.next();
System.out.println("Version at " + record.timestamp() + ": " + record.value());
System.out.println(" Valid until: " + record.validTo());
}
}
}
// Query all versions without time bounds
StateQueryRequest<VersionedRecordIterator<String>> allVersionsRequest = StateQueryRequest
.inStore("versioned-store")
.withQuery(MultiVersionedKeyQuery.<String, String>withKey("user-123")
.withAscendingTimestamps());
StateQueryResult<VersionedRecordIterator<String>> allVersionsResult =
streams.query(allVersionsRequest);
QueryResult<VersionedRecordIterator<String>> allVersionsQueryResult =
allVersionsResult.getOnlyPartitionResult();
if (allVersionsQueryResult != null && allVersionsQueryResult.isSuccess()) {
try (VersionedRecordIterator<String> iterator = allVersionsQueryResult.getResult()) {
System.out.println("All historical versions:");
while (iterator.hasNext()) {
VersionedRecord<String> record = iterator.next();
System.out.println(" " + record.timestamp() + ": " + record.value());
}
}
}
Tracks the processing position of state stores relative to input topics.
package org.apache.kafka.streams.query;
public class Position {
// Create positions
public static Position emptyPosition();
public static Position fromMap(Map<String, ? extends Map<Integer, Long>> map);
// Modify position
public Position withComponent(String topic, int partition, long offset);
public Position merge(Position other);
// Query position
public Set<String> getTopics();
public Map<Integer, Long> getPartitionPositions(String topic);
public boolean isEmpty();
// Copy
public Position copy();
}
Bounds query execution based on store position.
package org.apache.kafka.streams.query;
public class PositionBound {
// Create bounds
public static PositionBound unbounded();
public static PositionBound at(Position position);
// Query bound
public boolean isUnbounded();
public Position position();
}
Position-Aware Query Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
// First query - get current position
StateQueryRequest<String> request1 = StateQueryRequest
.inStore("my-store")
.withQuery(KeyQuery.withKey("key-1"));
StateQueryResult<String> result1 = streams.query(request1);
Position position1 = result1.getPosition();
// Later query - ensure we read at least as recent as first query
StateQueryRequest<String> request2 = StateQueryRequest
.inStore("my-store")
.withQuery(KeyQuery.withKey("key-2"))
.withPositionBound(PositionBound.at(position1));
StateQueryResult<String> result2 = streams.query(request2);
QueryResult<String> queryResult2 = result2.getOnlyPartitionResult();
if (queryResult2 != null && queryResult2.isSuccess()) {
// This result is at least as recent as the first query
String value = queryResult2.getResult();
System.out.println("Value: " + value);
} else if (queryResult2 != null &&
queryResult2.getFailureReason() == FailureReason.NOT_UP_TO_BOUND) {
// Store hasn't caught up yet, retry later or try another replica
System.out.println("Store not caught up: " + queryResult2.getFailureMessage());
}
Runtime configuration for query execution.
package org.apache.kafka.streams.query;
public class QueryConfig {
public QueryConfig(boolean collectExecutionInfo);
public boolean isCollectExecutionInfo();
}
Controls ordering of range query results.
package org.apache.kafka.streams.query;
public enum ResultOrder {
ANY, // No specific order
ASCENDING, // Ascending by serialized key bytes
DESCENDING // Descending by serialized key bytes
}
Execute interactive queries against state stores.
package org.apache.kafka.streams;
public class KafkaStreams {
public <R> StateQueryResult<R> query(StateQueryRequest<R> request);
}
Partition-Specific Query Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import java.util.Map;
import java.util.Set;
// Query specific partitions
StateQueryRequest<String> request = StateQueryRequest
.inStore("my-store")
.withQuery(KeyQuery.withKey("key-1"))
.withPartitions(Set.of(0, 1, 2)); // Query partitions 0, 1, and 2 only
StateQueryResult<String> result = streams.query(request);
// Iterate through partition results
for (Map.Entry<Integer, QueryResult<String>> entry : result.getPartitionResults().entrySet()) {
int partition = entry.getKey();
QueryResult<String> partitionResult = entry.getValue();
if (partitionResult.isSuccess()) {
System.out.println("Partition " + partition + ": " + partitionResult.getResult());
} else {
System.out.println("Partition " + partition + " failed: " +
partitionResult.getFailureMessage());
}
}
Execution Information Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
import java.util.List;
// Enable execution information
StateQueryRequest<String> request = StateQueryRequest
.inStore("my-store")
.withQuery(KeyQuery.withKey("key-1"))
.enableExecutionInfo();
StateQueryResult<String> result = streams.query(request);
QueryResult<String> queryResult = result.getOnlyPartitionResult();
if (queryResult != null) {
// Print execution details
List<String> executionInfo = queryResult.getExecutionInfo();
System.out.println("Execution info:");
executionInfo.forEach(System.out::println);
if (queryResult.isSuccess()) {
System.out.println("Result: " + queryResult.getResult());
}
}
Active-Replica Query Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
// Only query active replicas
StateQueryRequest<String> request = StateQueryRequest
.inStore("my-store")
.withQuery(KeyQuery.withKey("key-1"))
.requireActive();
StateQueryResult<String> result = streams.query(request);
QueryResult<String> queryResult = result.getOnlyPartitionResult();
if (queryResult != null && queryResult.isSuccess()) {
// Result from active replica
System.out.println("Value: " + queryResult.getResult());
} else if (queryResult != null &&
queryResult.getFailureReason() == FailureReason.NOT_ACTIVE) {
// This instance is not the active replica
System.out.println("Not active: " + queryResult.getFailureMessage());
}
Failure Handling Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.query.*;
public String queryStore(KafkaStreams streams, String key) {
StateQueryRequest<String> request = StateQueryRequest
.inStore("my-store")
.withQuery(KeyQuery.withKey(key));
StateQueryResult<String> result = streams.query(request);
QueryResult<String> queryResult = result.getOnlyPartitionResult();
if (queryResult == null) {
return null; // Key not found in any partition
}
if (queryResult.isSuccess()) {
return queryResult.getResult();
}
// Handle failures
switch (queryResult.getFailureReason()) {
case NOT_ACTIVE:
// Retry on different instance or wait for rebalance
System.err.println("Partition not active on this instance");
break;
case NOT_UP_TO_BOUND:
// Store hasn't caught up, retry later
System.err.println("Store not caught up to requested position");
break;
case NOT_PRESENT:
// Partition moved to different instance
System.err.println("Partition not present on this instance");
break;
case DOES_NOT_EXIST:
// Invalid partition requested
System.err.println("Requested partition doesn't exist");
break;
case UNKNOWN_QUERY_TYPE:
// Store doesn't support this query
System.err.println("Query type not supported by store");
break;
case STORE_EXCEPTION:
// Internal store error
System.err.println("Store exception: " + queryResult.getFailureMessage());
break;
}
return null;
}
The traditional API provides simpler access to state stores without the advanced features of the Interactive Queries API.
package org.apache.kafka.streams.state;
public interface ReadOnlyKeyValueStore<K, V> {
V get(K key);
KeyValueIterator<K, V> range(K from, K to);
KeyValueIterator<K, V> reverseRange(K from, K to);
KeyValueIterator<K, V> all();
KeyValueIterator<K, V> reverseAll();
long approximateNumEntries();
}
public interface ReadOnlyWindowStore<K, V> {
V fetch(K key, long time);
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
WindowStoreIterator<V> backwardFetch(K key, long timeFrom, long timeTo);
KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, long timeFrom, long timeTo);
KeyValueIterator<Windowed<K>, V> all();
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
}
public interface ReadOnlySessionStore<K, V> {
KeyValueIterator<Windowed<K>, V> fetch(K key);
KeyValueIterator<Windowed<K>, V> backwardFetch(K key);
KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo);
}
Factory for queryable store types.
package org.apache.kafka.streams.state;
public final class QueryableStoreTypes {
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore();
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>
timestampedKeyValueStore();
public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore();
public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>>
timestampedWindowStore();
public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore();
}
Parameters for querying state stores.
package org.apache.kafka.streams;
public class StoreQueryParameters<T> {
public static <T> StoreQueryParameters<T> fromNameAndType(String storeName,
QueryableStoreType<T> queryableStoreType);
public StoreQueryParameters<T> withPartition(Integer partition);
public StoreQueryParameters<T> enableStaleStores();
}
Query Examples:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.*;
import java.time.Duration;
import java.time.Instant;
// Query key-value store
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType("my-kv-store",
QueryableStoreTypes.keyValueStore())
);
Long value = store.get("my-key");
System.out.println("Value: " + value);
// Iterate all entries
try (KeyValueIterator<String, Long> iterator = store.all()) {
while (iterator.hasNext()) {
KeyValue<String, Long> entry = iterator.next();
System.out.println(entry.key + ": " + entry.value);
}
}
// Range query
try (KeyValueIterator<String, Long> range = store.range("a", "m")) {
while (range.hasNext()) {
KeyValue<String, Long> entry = range.next();
System.out.println(entry.key + ": " + entry.value);
}
}
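// Query a timestamped key-value store (a sketch; assumes a store named
// "my-timestamped-store" built with timestampedKeyValueStoreBuilder)
ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>> tsStore = streams.store(
    StoreQueryParameters.fromNameAndType("my-timestamped-store",
        QueryableStoreTypes.timestampedKeyValueStore())
);
ValueAndTimestamp<Long> valueAndTimestamp = tsStore.get("my-key");
if (valueAndTimestamp != null) {
    System.out.println("Value: " + valueAndTimestamp.value() +
        ", timestamp: " + valueAndTimestamp.timestamp());
}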
// Query window store
ReadOnlyWindowStore<String, Long> windowStore = streams.store(
StoreQueryParameters.fromNameAndType("my-window-store",
QueryableStoreTypes.windowStore())
);
Instant now = Instant.now();
try (WindowStoreIterator<Long> iterator = windowStore.fetch("key",
        now.minus(Duration.ofHours(1)), now)) {
while (iterator.hasNext()) {
KeyValue<Long, Long> entry = iterator.next();
System.out.println("Window: " + entry.key + ", Value: " + entry.value);
}
}
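// Query a session store (a sketch; assumes a store named "my-session-store";
// also requires: import org.apache.kafka.streams.kstream.Windowed;)
ReadOnlySessionStore<String, Long> sessionStore = streams.store(
    StoreQueryParameters.fromNameAndType("my-session-store",
        QueryableStoreTypes.sessionStore())
);
try (KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.fetch("key")) {
    while (sessions.hasNext()) {
        KeyValue<Windowed<String>, Long> session = sessions.next();
        System.out.println("Session [" + session.key.window().start() + " - " +
            session.key.window().end() + "]: " + session.value);
    }
}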
// Query from specific partition
ReadOnlyKeyValueStore<String, Long> partitionStore = streams.store(
StoreQueryParameters.fromNameAndType("my-kv-store",
QueryableStoreTypes.keyValueStore())
.withPartition(0) // Query partition 0 only
);
// Query with stale stores enabled
ReadOnlyKeyValueStore<String, Long> staleStore = streams.store(
StoreQueryParameters.fromNameAndType("my-kv-store",
QueryableStoreTypes.keyValueStore())
.enableStaleStores() // Allow queries from standby replicas
);
Adding a State Store with StreamsBuilder:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.common.serialization.Serdes;
StreamsBuilder builder = new StreamsBuilder();
// Create and add state store
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
Serdes.String(),
Serdes.Long()
);
builder.addStateStore(storeBuilder);
// Use store in processor
KStream<String, String> source = builder.stream("input-topic");
source.process(() -> new MyProcessor(), "my-store");import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.*;
Topology topology = new Topology();
// Add state store
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
Serdes.String(),
Serdes.Long()
);
topology.addSource("Source", "input-topic")
.addProcessor("Process", () -> new MyProcessor(), "Source")
.addStateStore(storeBuilder, "Process")
.addSink("Sink", "output-topic", "Process");package org.apache.kafka.streams;
public class KeyValue<K, V> {
public static <K, V> KeyValue<K, V> pair(K key, V value);
public final K key;
public final V value;
}
package org.apache.kafka.streams.state;
public interface ValueAndTimestamp<V> {
static <V> ValueAndTimestamp<V> make(V value, long timestamp);
V value();
long timestamp();
}
package org.apache.kafka.streams.state;
public interface VersionedRecord<V> {
V value();
long timestamp();
long validTo();
}
package org.apache.kafka.streams.state;
public interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>>, Closeable {
void close();
}
package org.apache.kafka.streams.kstream;
public class Windowed<K> {
public K key();
public Window window();
}
package org.apache.kafka.streams.kstream;
public abstract class Window {
public long start();
public long end();
public boolean overlap(Window other);
}
Complete Example:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.api.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.*;
public class StateStoreExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-store-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
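// Optional, illustrative settings (values are examples, not recommendations):
// dedicated state directory, shared record cache, a standby replica for faster
// failover, and an application.server endpoint for distributed queries
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");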
StreamsBuilder builder = new StreamsBuilder();
// Create state store
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("counts-store"),
Serdes.String(),
Serdes.Long()
).withCachingEnabled()
.withLoggingEnabled(Collections.emptyMap());
builder.addStateStore(storeBuilder);
// Use store in processor
KStream<String, String> source = builder.stream("input-topic");
source.process(() -> new Processor<String, String, String, String>() {
    private KeyValueStore<String, Long> store;
    private ProcessorContext<String, String> context;
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context; // keep the context so process() can forward records
        store = context.getStateStore("counts-store");
    }
@Override
public void process(Record<String, String> record) {
String key = record.key();
Long count = store.get(key);
if (count == null) {
count = 0L;
}
count++;
store.put(key, count);
context.forward(new Record<>(key, "Count: " + count, record.timestamp()));
}
}, "counts-store");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Query store
Thread queryThread = new Thread(() -> {
try {
Thread.sleep(10000); // Wait for data
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType("counts-store",
QueryableStoreTypes.keyValueStore())
);
try (KeyValueIterator<String, Long> iterator = store.all()) {
while (iterator.hasNext()) {
KeyValue<String, Long> entry = iterator.next();
System.out.println(entry.key + ": " + entry.value);
}
}
} catch (Exception e) {
e.printStackTrace();
}
});
queryThread.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Symptoms: queries throw InvalidStateStoreException, typically reporting that the store is not open or may have migrated to another instance.
Causes: the application is still starting up or rebalancing, the store is restoring from its changelog, or the partition has moved to another instance.
Solutions: check the application state before querying, retry with backoff, and locate the active host when the store has moved, as in the sketch below.
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
public class ResilientStoreQuery {
public <K, V> V queryWithRetry(KafkaStreams streams,
String storeName,
K key,
int maxRetries,
long retryDelayMs) {
int attempt = 0;
while (attempt < maxRetries) {
try {
// Check application state
if (streams.state() != KafkaStreams.State.RUNNING) {
System.out.println("Waiting for RUNNING state, current: " +
streams.state());
Thread.sleep(retryDelayMs);
attempt++;
continue;
}
// Query store
ReadOnlyKeyValueStore<K, V> store = streams.store(
StoreQueryParameters.fromNameAndType(
storeName,
QueryableStoreTypes.keyValueStore()
)
);
return store.get(key);
} catch (InvalidStateStoreException e) {
attempt++;
System.err.println("Store unavailable (attempt " + attempt +
"/" + maxRetries + "): " + e.getMessage());
if (attempt >= maxRetries) {
// Check if store exists on another instance
// This sketch assumes String keys; pass a matching Serializer<K> in real code
@SuppressWarnings("unchecked")
KeyQueryMetadata metadata = streams.queryMetadataForKey(
    storeName,
    key,
    (Serializer<K>) Serdes.String().serializer()
);
if (metadata != KeyQueryMetadata.NOT_AVAILABLE) {
System.out.println("Store available on: " +
metadata.activeHost());
// Could query remote instance via REST API
}
throw e;
}
try {
Thread.sleep(retryDelayMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted", ie);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted", e);
}
}
return null;
}
}
Prevention: wait for the RUNNING state before serving queries, retry transient failures with backoff, and use queryMetadataForKey to route requests to the instance that actually hosts the key.
Symptoms: the application fails at startup with RocksDB errors, or local state is inconsistent after a crash.
Causes: unclean shutdown, a full disk, or a damaged state directory.
Solutions: delete the local state and rebuild it from the changelog, as sketched below.
import org.apache.kafka.streams.*;
import java.io.File;
import java.nio.file.*;
import java.time.Duration;
import java.util.*;
public class StateStoreRecovery {
    // Assumed to be supplied by the application
    private final Topology topology;
    private final Properties props;
    private KafkaStreams streams;

    public StateStoreRecovery(Topology topology, Properties props) {
        this.topology = topology;
        this.props = props;
    }

    public void recoverFromCorruption(String stateDir, String applicationId) {
try {
// Option 1: Clean up and rebuild from changelog
streams = new KafkaStreams(topology, props);
System.out.println("Cleaning up corrupted state");
streams.cleanUp(); // Deletes local state
// State will be restored from changelog on startup
streams.start();
System.out.println("State restoration started");
} catch (Exception e) {
System.err.println("Cleanup failed: " + e.getMessage());
// Option 2: Manual cleanup
Path statePath = Paths.get(stateDir, applicationId);
try {
Files.walk(statePath)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
System.out.println("Manually deleted state directory");
} catch (Exception cleanupEx) {
System.err.println("Manual cleanup failed: " + cleanupEx.getMessage());
}
}
}
public void preventCorruption() {
// Best practices to prevent corruption:
// 1. Always close gracefully
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Closing streams gracefully");
streams.close(Duration.ofSeconds(30));
}));
// 2. Monitor disk space
File stateDir = new File("/tmp/kafka-streams");
long usableSpace = stateDir.getUsableSpace();
long totalSpace = stateDir.getTotalSpace();
double usage = (double) (totalSpace - usableSpace) / totalSpace * 100;
if (usage > 90) {
System.err.println("CRITICAL: Disk usage at " + usage + "%");
}
// 3. Use separate disk for state if possible
props.put(StreamsConfig.STATE_DIR_CONFIG, "/dedicated/disk/kafka-streams");
// 4. Enable changelog for recovery
// (enabled by default for aggregations)
}
}
Prevention: close the application gracefully, monitor disk usage, place the state directory on a dedicated disk, and keep changelogs enabled, as the preventCorruption method above illustrates.
Windowed stores cannot be queried like plain key-value stores; they require a key plus a time range:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Instant;
// Windowed stores require time range queries
ReadOnlyWindowStore<String, Long> windowStore = streams.store(
StoreQueryParameters.fromNameAndType(
"windowed-counts",
QueryableStoreTypes.windowStore()
)
);
// Query specific time range
Instant now = Instant.now();
Instant oneHourAgo = now.minusSeconds(3600);
try (WindowStoreIterator<Long> iterator = windowStore.fetch(
        "key1",
        oneHourAgo,
        now)) {
while (iterator.hasNext()) {
KeyValue<Long, Long> entry = iterator.next();
long windowStartTime = entry.key;
long count = entry.value;
System.out.println("Window starting at " +
Instant.ofEpochMilli(windowStartTime) + ": " + count);
}
}
// Query all windows for a key
try (WindowStoreIterator<Long> allWindows = windowStore.fetch(
        "key1",
        Instant.EPOCH,
        Instant.now())) {
while (allWindows.hasNext()) {
KeyValue<Long, Long> entry = allWindows.next();
System.out.println("Window: " + entry.key + ", Count: " + entry.value);
}
}
Versioned stores can be read for point-in-time values:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.*;
// Query versioned store at specific timestamp
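// Note: read-only access to versioned stores through this older API may not be
// available in every version; the IQv2 VersionedKeyQuery shown earlier is the
// documented path for point-in-time lookups.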
ReadOnlyVersionedKeyValueStore<String, String> versionedStore = streams.store(
StoreQueryParameters.fromNameAndType(
"versioned-store",
QueryableStoreTypes.versionedKeyValueStore()
)
);
// Get current value
String currentValue = versionedStore.get("key1");
// Get value as of specific timestamp
long timestamp = System.currentTimeMillis() - 3600000; // 1 hour ago
VersionedRecord<String> historicalValue = versionedStore.get("key1", timestamp);
if (historicalValue != null) {
System.out.println("Value at " + timestamp + ": " + historicalValue.value());
System.out.println("Valid from: " + historicalValue.timestamp());
} else {
System.out.println("No value found at timestamp " + timestamp);
}
// Edge case: Timestamp before history retention
// Returns null if timestamp is older than history retention period
long veryOldTimestamp = System.currentTimeMillis() - (8 * 24 * 3600000L); // 8 days ago
VersionedRecord<String> oldValue = versionedStore.get("key1", veryOldTimestamp);
// May be null if history retention is < 8 days
// Always close iterators to prevent resource leaks
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"my-store",
QueryableStoreTypes.keyValueStore()
)
);
// WRONG: Iterator not closed
KeyValueIterator<String, Long> leakedIterator = store.all();
while (leakedIterator.hasNext()) {
    KeyValue<String, Long> entry = leakedIterator.next();
    System.out.println(entry.key + ": " + entry.value);
}
// Missing iterator.close() - resource leak
// CORRECT: Use try-with-resources
try (KeyValueIterator<String, Long> iterator = store.all()) {
while (iterator.hasNext()) {
KeyValue<String, Long> entry = iterator.next();
System.out.println(entry.key + ": " + entry.value);
}
} // Automatically closed
// CORRECT: Manual close with finally
KeyValueIterator<String, Long> manualIterator = store.all();
try {
    while (manualIterator.hasNext()) {
        KeyValue<String, Long> entry = manualIterator.next();
        System.out.println(entry.key + ": " + entry.value);
    }
} finally {
    manualIterator.close();
}
// Decision matrix for store type selection:
// Use KeyValueStore when:
// - Simple key-value lookups
// - No time-based queries needed
// - Most common use case
// Use WindowStore when:
// - Time-based aggregations
// - Need to query by time range
// - Implementing sliding/tumbling windows
// Use SessionStore when:
// - Session-based aggregations
// - Variable window sizes
// - Activity-based grouping
// Use VersionedKeyValueStore when:
// - Need historical values
// - Point-in-time queries required
// - Audit trail needed
// Example: Choose store based on requirements
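// (Requirements is a hypothetical application-defined type, not a Kafka Streams API)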
public StoreBuilder<?> chooseStore(String storeName, Requirements requirements) {
if (requirements.needsHistoricalQueries()) {
return Stores.versionedKeyValueStoreBuilder(
Stores.persistentVersionedKeyValueStore(storeName,
Duration.ofDays(7)),
Serdes.String(),
Serdes.String()
);
} else if (requirements.needsTimeRangeQueries()) {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore(storeName,
Duration.ofDays(1),
Duration.ofMinutes(5),
false),
Serdes.String(),
Serdes.Long()
);
} else {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName),
Serdes.String(),
Serdes.Long()
);
}
}
// Persistent stores (RocksDB-backed)
StoreBuilder<KeyValueStore<String, Long>> persistentStore =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("persistent-store"),
Serdes.String(),
Serdes.Long()
);
// Advantages:
// - Can handle large datasets (GBs)
// - Survives application restarts
// - Backed by changelog for fault tolerance
// - Lower memory footprint
// Disadvantages:
// - Slower than in-memory (disk I/O)
// - Requires disk space
// - Longer startup time (restoration)
// In-memory stores
StoreBuilder<KeyValueStore<String, Long>> inMemoryStore =
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("memory-store"),
Serdes.String(),
Serdes.Long()
);
// Advantages:
// - Very fast access (no disk I/O)
// - Fast startup
// - No disk space required
// Disadvantages:
// - Limited by available memory
// - Lost on application restart
// - Must rebuild from changelog on startup
// Decision guide:
// - Use persistent for: Large state, production workloads
// - Use in-memory for: Small state, caching, temporary data
// Caching reduces writes to changelog and downstream processors
StoreBuilder<KeyValueStore<String, Long>> cachedStore =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("cached-store"),
Serdes.String(),
Serdes.Long()
).withCachingEnabled(); // Enable caching
// Global cache size
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG,
10 * 1024 * 1024); // 10MB shared across all stores
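// The commit interval also bounds how long writes sit in the cache before
// flushing (a sketch; 30000 ms is the default outside exactly-once processing)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);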
// Caching behavior:
// - Writes buffered in cache
// - Flushed on commit or cache full
// - Reduces changelog writes
// - Reduces downstream processing
// When to enable caching:
// - High write rate to store
// - Downstream processing expensive
// - Can tolerate delayed updates
// When to disable caching:
// - Need immediate consistency
// - Low write rate
// - Memory constrained
StoreBuilder<KeyValueStore<String, Long>> uncachedStore =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("uncached-store"),
Serdes.String(),
Serdes.Long()
).withCachingDisabled(); // Disable caching
// State store changelogs are compacted topics
// Compaction removes old values for same key
StoreBuilder<KeyValueStore<String, Long>> store =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
Serdes.String(),
Serdes.Long()
).withLoggingEnabled(Map.of(
"cleanup.policy", "compact",
"min.compaction.lag.ms", "0",
"segment.ms", "60000"
));
// Compaction considerations:
// - Old values eventually removed
// - Reduces changelog size
// - Faster state restoration
// - May delay if min.compaction.lag.ms is high
// For windowed stores, use delete retention
StoreBuilder<WindowStore<String, Long>> windowStore =
Stores.windowStoreBuilder(
Stores.persistentWindowStore("window-store",
Duration.ofDays(1),
Duration.ofMinutes(5),
false),
Serdes.String(),
Serdes.Long()
).withLoggingEnabled(Map.of(
"cleanup.policy", "compact,delete",
"retention.ms", "86400000", // 1 day
"delete.retention.ms", "86400000"
));
// New application or changelog topic deleted
// Store starts empty and builds up from source topics
KafkaStreams streams = new KafkaStreams(topology, props);
streams.setGlobalStateRestoreListener(new StateRestoreListener() {
@Override
public void onRestoreStart(TopicPartition topicPartition,
String storeName,
long startingOffset,
long endingOffset) {
if (startingOffset == 0 && endingOffset == 0) {
System.out.println("Store " + storeName +
" has no changelog - will build from scratch");
} else {
System.out.println("Restoring " + storeName +
" from offset " + startingOffset + " to " + endingOffset);
}
}
@Override
public void onBatchRestored(TopicPartition topicPartition,
String storeName,
long batchEndOffset,
long numRestored) {
// Track progress
}
@Override
public void onRestoreEnd(TopicPartition topicPartition,
String storeName,
long totalRestored) {
if (totalRestored == 0) {
System.out.println("Store " + storeName +
" started empty - building from source");
} else {
System.out.println("Restored " + totalRestored +
" records to " + storeName);
}
}
});
streams.start();
// Distributed query pattern for multi-instance deployments
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.common.serialization.Serializer;
import java.net.*;
import java.io.*;
public class DistributedStoreQuery {
public <K, V> V queryAnyInstance(KafkaStreams streams,
String storeName,
K key,
Serializer<K> keySerializer) throws Exception {
// Find which instance has the key
KeyQueryMetadata metadata = streams.queryMetadataForKey(
storeName,
key,
keySerializer
);
if (metadata == KeyQueryMetadata.NOT_AVAILABLE) {
throw new IllegalStateException("Store not available");
}
HostInfo activeHost = metadata.activeHost();
// Check if key is on local instance
if (isLocalHost(activeHost)) {
// Query local store
ReadOnlyKeyValueStore<K, V> store = streams.store(
StoreQueryParameters.fromNameAndType(
storeName,
QueryableStoreTypes.keyValueStore()
)
);
return store.get(key);
} else {
// Query remote instance via REST API
return queryRemoteInstance(activeHost, storeName, key);
}
}
private boolean isLocalHost(HostInfo hostInfo) {
// Check if host matches local instance
// Compare with application.server configuration
return hostInfo.host().equals("localhost") &&
hostInfo.port() == 8080;
}
private <K, V> V queryRemoteInstance(HostInfo hostInfo,
String storeName,
K key) throws Exception {
// Make HTTP request to remote instance
URL url = new URL("http://" + hostInfo.host() + ":" +
hostInfo.port() + "/store/" + storeName + "/key/" + key);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
// Read response
BufferedReader reader = new BufferedReader(
new InputStreamReader(conn.getInputStream()));
String response = reader.readLine();
reader.close();
// Deserialize response
return deserializeResponse(response);
}
private <V> V deserializeResponse(String response) {
// Implement deserialization logic
return null;
}
}
// Calculate required state store size
public class StateStoreSizing {
public long calculateRequiredDiskSpace(
long numKeys,
int avgKeySize,
int avgValueSize,
double compressionRatio) {
// RocksDB overhead: ~30-40% for indexes and metadata
double rocksdbOverhead = 1.4;
// Calculate raw data size
long rawDataSize = numKeys * (avgKeySize + avgValueSize);
// Apply compression
long compressedSize = (long) (rawDataSize * compressionRatio);
// Apply RocksDB overhead
long totalSize = (long) (compressedSize * rocksdbOverhead);
System.out.println("State store sizing:");
System.out.println(" Keys: " + numKeys);
System.out.println(" Raw data: " + (rawDataSize / 1024 / 1024) + " MB");
System.out.println(" Compressed: " + (compressedSize / 1024 / 1024) + " MB");
System.out.println(" With overhead: " + (totalSize / 1024 / 1024) + " MB");
return totalSize;
}
// Example calculation
public void example() {
// Scenario: User activity counts
// - 10 million users
// - Key: user ID (36 bytes for UUID string)
// - Value: count (8 bytes for Long)
// - Compression: ~0.7 (LZ4)
long requiredSpace = calculateRequiredDiskSpace(
10_000_000, // 10M keys
36, // UUID string
8, // Long value
0.7 // 30% compression
);
// Result: ~430 MB total across all store partitions
// Split evenly across 3 instances: roughly 145 MB each
// Standby replicas double the per-instance footprint
}
}
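The changelog topic backing a persistent store adds a comparable footprint on the brokers. A back-of-envelope sketch, assuming compaction converges to roughly one record per key and a per-record log overhead on the order of 70 bytes (an assumption, not a measured figure):
// Changelog sizing sketch (assumptions as stated above)
long numKeys = 10_000_000L;
long changelogBytes = numKeys * (36 + 8 + 70); // key + value + assumed overhead
System.out.println("Changelog estimate: " + (changelogBytes / 1024 / 1024) + " MB");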