Java client library for accessing queryable state in Apache Flink streaming applications through a network-based query interface.
—
Asynchronous state querying capabilities for accessing different types of keyed state from running Flink jobs. All state queries return immutable, read-only state objects.
Query keyed state using TypeInformation for type safety.
public <K, S extends State, V> CompletableFuture<S> getKvState(
JobID jobId,
String queryableStateName,
K key,
TypeInformation<K> keyTypeInfo,
StateDescriptor<S, V> stateDescriptor
);Type Parameters:
K - Type of the keyS - Type of the state (extends State)V - Type of the value stored in the stateParameters:
jobId - JobID of the job containing the queryable statequeryableStateName - Name under which the state is queryable (set via StateDescriptor.setQueryable())key - The key to query forkeyTypeInfo - TypeInformation for the key typestateDescriptor - StateDescriptor matching the state configuration in the Flink jobReturns:
CompletableFuture<S> - Future containing an immutable state objectUsage Example:
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
QueryableStateClient client = new QueryableStateClient("localhost", 9069);
JobID jobId = JobID.fromHexString("a1b2c3d4e5f6789012345678901234567890abcd");
String stateName = "userProfiles";
String userId = "user123";
ValueStateDescriptor<UserProfile> descriptor =
new ValueStateDescriptor<>("userProfiles", UserProfile.class);
CompletableFuture<ValueState<UserProfile>> future = client.getKvState(
jobId,
stateName,
userId,
TypeInformation.of(String.class),
descriptor
);
future.thenAccept(state -> {
UserProfile profile = state.value();
System.out.println("User profile: " + profile);
});Query keyed state using TypeHint for generic type inference.
public <K, S extends State, V> CompletableFuture<S> getKvState(
JobID jobId,
String queryableStateName,
K key,
TypeHint<K> keyTypeHint,
StateDescriptor<S, V> stateDescriptor
);Parameters:
TypeHint<K> instead of TypeInformation<K>Usage Example:
import org.apache.flink.api.common.typeinfo.TypeHint;
CompletableFuture<ValueState<String>> future = client.getKvState(
jobId,
stateName,
"myKey",
new TypeHint<String>() {}, // Type hint for String key
new ValueStateDescriptor<>("state", String.class)
);The client supports querying all Flink state types:
ValueStateDescriptor<String> descriptor =
new ValueStateDescriptor<>("myValue", String.class);
CompletableFuture<ValueState<String>> future = client.getKvState(/*...*/);ListStateDescriptor<Integer> descriptor =
new ListStateDescriptor<>("myList", Integer.class);
CompletableFuture<ListState<Integer>> future = client.getKvState(/*...*/);MapStateDescriptor<String, Long> descriptor =
new MapStateDescriptor<>("myMap", String.class, Long.class);
CompletableFuture<MapState<String, Long>> future = client.getKvState(/*...*/);ReducingStateDescriptor<Integer> descriptor =
new ReducingStateDescriptor<>("myReducing", Integer::sum, Integer.class);
CompletableFuture<ReducingState<Integer>> future = client.getKvState(/*...*/);AggregatingStateDescriptor<Integer, Long, Double> descriptor =
new AggregatingStateDescriptor<>("myAggregating", aggregateFunction, Long.class);
CompletableFuture<AggregatingState<Integer, Double>> future = client.getKvState(/*...*/);The state querying process involves several steps:
State queries can fail for various reasons. Common exceptions include:
UnknownKeyOrNamespaceException - No state exists for the given key/namespaceUnknownKvStateIdException - The state ID is not recognizedUnknownLocationException - State location cannot be resolvedIOException - Network or serialization errorsFlinkRuntimeException - General runtime errorsError Handling Example:
future.whenComplete((state, throwable) -> {
if (throwable != null) {
if (throwable instanceof UnknownKeyOrNamespaceException) {
System.out.println("No state found for key: " + key);
} else if (throwable instanceof UnknownLocationException) {
System.err.println("Could not resolve state location");
} else {
System.err.println("Query failed: " + throwable.getMessage());
}
} else {
// Process successful result
processState(state);
}
});Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-queryable-state-client-java