Java client library for accessing queryable state in Apache Flink streaming applications through a network-based query interface.
—
Comprehensive exception handling for various failure scenarios in state querying operations. The queryable state client provides specific exception types to help identify and handle different error conditions.
Thrown when no state exists for the given key and namespace combination.
public class UnknownKeyOrNamespaceException extends BadRequestException {
public UnknownKeyOrNamespaceException(String serverName);
}Cause: The specified key does not exist in the queryable state, or the namespace is invalid.
Usage Example:
client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
.whenComplete((state, throwable) -> {
if (throwable instanceof UnknownKeyOrNamespaceException) {
System.out.println("No state found for key: " + key);
// Handle missing key case - maybe return default value
}
});Thrown when the requested state ID is not recognized by the server.
public class UnknownKvStateIdException extends BadRequestException {
public UnknownKvStateIdException(String serverName, KvStateID kvStateId);
}Cause: The state ID resolved from the job and state name is not valid or the state has been removed.
Usage Example:
client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
.whenComplete((state, throwable) -> {
if (throwable instanceof UnknownKvStateIdException) {
System.err.println("State ID not recognized - state may have been removed");
// Handle state removal case
}
});Thrown when the location of the requested state cannot be determined.
public class UnknownLocationException extends FlinkException {
public UnknownLocationException(String msg);
}Cause: The JobManager cannot resolve the location of the key group containing the requested key.
Usage Example:
client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
.whenComplete((state, throwable) -> {
if (throwable instanceof UnknownLocationException) {
System.err.println("Could not resolve state location: " + throwable.getMessage());
// Handle location resolution failure - maybe retry
}
});Thrown when no location information is available for the key group.
public class UnknownKvStateKeyGroupLocationException extends BadRequestException {
public UnknownKvStateKeyGroupLocationException(String serverName);
}Cause: The system cannot determine which TaskManager holds the key group for the requested key.
Usage Example:
client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
.whenComplete((state, throwable) -> {
if (throwable instanceof UnknownKvStateKeyGroupLocationException) {
System.err.println("Key group location unknown");
// Handle key group location failure
}
});Base class for request-related exceptions.
public class BadRequestException extends Exception {
public BadRequestException(String serverName, String message);
}Properties:
Protocol-level error information for debugging network issues.
public class RequestFailure {
public RequestFailure(long requestId, Throwable cause);
public long getRequestId();
public Throwable getCause();
}Usage: Internal to the network layer, typically wrapped in other exceptions.
Exception
├── FlinkException
│ └── UnknownLocationException
└── BadRequestException
├── UnknownKeyOrNamespaceException
├── UnknownKvStateIdException
└── UnknownKvStateKeyGroupLocationExceptionclient.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
.whenComplete((state, throwable) -> {
if (throwable != null) {
if (throwable instanceof UnknownKeyOrNamespaceException) {
// Key doesn't exist - handle gracefully
handleMissingKey(key);
} else if (throwable instanceof UnknownKvStateIdException) {
// State removed or invalid - may need to refresh job info
handleInvalidState(stateName);
} else if (throwable instanceof UnknownLocationException ||
throwable instanceof UnknownKvStateKeyGroupLocationException) {
// Location resolution failed - consider retry
handleLocationError(throwable);
} else if (throwable instanceof IOException) {
// Network or serialization error
handleNetworkError(throwable);
} else {
// Other runtime errors
handleGeneralError(throwable);
}
} else {
// Success case
processState(state);
}
});public CompletableFuture<ValueState<String>> queryWithRetry(
QueryableStateClient client,
JobID jobId,
String stateName,
String key,
int maxRetries) {
return queryWithRetryInternal(client, jobId, stateName, key, maxRetries, 0);
}
private CompletableFuture<ValueState<String>> queryWithRetryInternal(
QueryableStateClient client,
JobID jobId,
String stateName,
String key,
int maxRetries,
int attempt) {
return client.getKvState(
jobId, stateName, key,
TypeInformation.of(String.class),
new ValueStateDescriptor<>("state", String.class)
).handle((state, throwable) -> {
if (throwable != null && attempt < maxRetries) {
// Retry on location errors and network issues
if (throwable instanceof UnknownLocationException ||
throwable instanceof UnknownKvStateKeyGroupLocationException ||
throwable instanceof IOException) {
long delay = Math.min(1000 * (1L << attempt), 10000); // Max 10s delay
return CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS)
.execute(() -> queryWithRetryInternal(client, jobId, stateName, key, maxRetries, attempt + 1));
}
}
if (throwable != null) {
throw new RuntimeException(throwable);
}
return CompletableFuture.completedFuture(state);
}).thenCompose(Function.identity());
}public CompletableFuture<String> getValueWithDefault(
QueryableStateClient client,
JobID jobId,
String stateName,
String key,
String defaultValue) {
return client.getKvState(
jobId, stateName, key,
TypeInformation.of(String.class),
new ValueStateDescriptor<>("state", String.class)
).handle((state, throwable) -> {
if (throwable instanceof UnknownKeyOrNamespaceException) {
// Key doesn't exist, return default
return defaultValue;
} else if (throwable != null) {
// Other errors should be propagated
throw new RuntimeException("Query failed", throwable);
} else {
// Success - return actual value
return state.value();
}
});
}StateDescriptor.setQueryable()Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-queryable-state-client-java