tessl/maven-org-apache-flink--flink-queryable-state-client-java

Java client library for accessing queryable state in Apache Flink streaming applications through a network-based query interface.

Exception Handling

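The queryable state client reports failures through specific exception types, so callers can distinguish a missing key from a removed state or an unresolved state location and react accordingly. Because queries return a CompletableFuture, a failure may reach a callback wrapped in a java.util.concurrent.CompletionException; when that happens, inspect getCause() before testing the exception type.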

Capabilities

Key/Namespace Not Found

Thrown when no state exists for the given key and namespace combination.

public class UnknownKeyOrNamespaceException extends BadRequestException {
    public UnknownKeyOrNamespaceException(String serverName);
}

Cause: The specified key does not exist in the queryable state, or the namespace is invalid.

Usage Example:

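client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
    .whenComplete((state, throwable) -> {
        // The failure may arrive wrapped in a CompletionException
        Throwable cause = throwable instanceof CompletionException ? throwable.getCause() : throwable;
        if (cause instanceof UnknownKeyOrNamespaceException) {
            System.out.println("No state found for key: " + key);
            // Handle missing key case - maybe return default value
        }
    });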

State ID Not Found

Thrown when the requested state ID is not recognized by the server.

public class UnknownKvStateIdException extends BadRequestException {
    public UnknownKvStateIdException(String serverName, KvStateID kvStateId);
}

Cause: The state ID resolved from the job and state name is not valid or the state has been removed.

Usage Example:

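client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
    .whenComplete((state, throwable) -> {
        // The failure may arrive wrapped in a CompletionException
        Throwable cause = throwable instanceof CompletionException ? throwable.getCause() : throwable;
        if (cause instanceof UnknownKvStateIdException) {
            System.err.println("State ID not recognized - state may have been removed");
            // Handle state removal case
        }
    });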

Location Resolution Failed

Thrown when the location of the requested state cannot be determined.

public class UnknownLocationException extends FlinkException {
    public UnknownLocationException(String msg);
}

Cause: The JobManager cannot resolve the location of the key group containing the requested key.

Usage Example:

client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
    .whenComplete((state, throwable) -> {
        // The failure may arrive wrapped in a CompletionException
        Throwable cause = throwable instanceof CompletionException ? throwable.getCause() : throwable;
        if (cause instanceof UnknownLocationException) {
            System.err.println("Could not resolve state location: " + cause.getMessage());
            // Handle location resolution failure - maybe retry
        }
    });

Key Group Location Not Found

Thrown when no location information is available for the key group.

public class UnknownKvStateKeyGroupLocationException extends BadRequestException {
    public UnknownKvStateKeyGroupLocationException(String serverName);
}

Cause: The system cannot determine which TaskManager holds the key group for the requested key.

Usage Example:

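client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
    .whenComplete((state, throwable) -> {
        // The failure may arrive wrapped in a CompletionException
        Throwable cause = throwable instanceof CompletionException ? throwable.getCause() : throwable;
        if (cause instanceof UnknownKvStateKeyGroupLocationException) {
            System.err.println("Key group location unknown");
            // Handle key group location failure
        }
    });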

Base Exception Types

BadRequestException

Base class for request-related exceptions.

public class BadRequestException extends Exception {
    public BadRequestException(String serverName, String message);
}

Properties:

  • Contains the server name where the error occurred
  • Provides a descriptive error message

Request Failure

Protocol-level error information for debugging network issues.

public class RequestFailure {
    public RequestFailure(long requestId, Throwable cause);
    public long getRequestId();
    public Throwable getCause();
}

Usage: Internal to the network layer, typically wrapped in other exceptions.

Exception Hierarchy

Exception
├── FlinkException
│   └── UnknownLocationException
└── BadRequestException
    ├── UnknownKeyOrNamespaceException
    ├── UnknownKvStateIdException
    └── UnknownKvStateKeyGroupLocationException
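
The types in this hierarchy are only useful if the callback tests the right throwable. A CompletableFuture that has passed through a dependent stage (thenApply, thenCompose, and so on) reports failures wrapped in a CompletionException, and Future.get() wraps them in an ExecutionException. The following is a minimal sketch of a helper that strips those wrappers. FailureUnwrapper is a hypothetical name, not part of the Flink client, and IOException stands in for a client exception so the example runs without Flink on the classpath.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

/** Hypothetical helper; not part of the Flink queryable state client. */
final class FailureUnwrapper {

    private FailureUnwrapper() {}

    /**
     * Strips CompletionException / ExecutionException wrappers and returns
     * the first throwable in the cause chain that is not such a wrapper.
     */
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in for a failed query; IOException plays the role of a client exception.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IOException("connection reset"));

        // thenApply creates a dependent stage, so the callback sees a wrapper.
        failed.thenApply(String::trim).whenComplete((value, throwable) -> {
            System.out.println(throwable.getClass().getSimpleName());         // CompletionException
            System.out.println(unwrap(throwable).getClass().getSimpleName()); // IOException
        });
    }
}
```

Calling unwrap(throwable) once at the top of a whenComplete or handle callback keeps the instanceof checks independent of how the future was composed.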

Common Error Handling Patterns

Comprehensive Error Handling

client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
    .whenComplete((state, throwable) -> {
        if (throwable != null) {
            // The failure may arrive wrapped in a CompletionException
            Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
                ? throwable.getCause() : throwable;
            if (cause instanceof UnknownKeyOrNamespaceException) {
                // Key doesn't exist - handle gracefully
                handleMissingKey(key);
            } else if (cause instanceof UnknownKvStateIdException) {
                // State removed or invalid - may need to refresh job info
                handleInvalidState(stateName);
            } else if (cause instanceof UnknownLocationException ||
                       cause instanceof UnknownKvStateKeyGroupLocationException) {
                // Location resolution failed - consider retry
                handleLocationError(cause);
            } else if (cause instanceof IOException) {
                // Network or serialization error
                handleNetworkError(cause);
            } else {
                // Other runtime errors
                handleGeneralError(cause);
            }
        } else {
            // Success case
            processState(state);
        }
    });

Retry Logic with Exponential Backoff

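import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.queryablestate.exceptions.UnknownKvStateKeyGroupLocationException;
import org.apache.flink.queryablestate.exceptions.UnknownLocationException;

public CompletableFuture<ValueState<String>> queryWithRetry(
        QueryableStateClient client,
        JobID jobId,
        String stateName,
        String key,
        int maxRetries) {

    return queryWithRetryInternal(client, jobId, stateName, key, maxRetries, 0);
}

private CompletableFuture<ValueState<String>> queryWithRetryInternal(
        QueryableStateClient client,
        JobID jobId,
        String stateName,
        String key,
        int maxRetries,
        int attempt) {

    return client.getKvState(
        jobId, stateName, key,
        TypeInformation.of(String.class),
        new ValueStateDescriptor<>("state", String.class)
    ).handle((state, throwable) -> {
        if (throwable == null) {
            return CompletableFuture.completedFuture(state);
        }

        // The failure may arrive wrapped in a CompletionException
        Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
            ? throwable.getCause() : throwable;

        // Retry only on location errors and network issues
        boolean retryable = cause instanceof UnknownLocationException
            || cause instanceof UnknownKvStateKeyGroupLocationException
            || cause instanceof IOException;

        if (retryable && attempt < maxRetries) {
            long delay = Math.min(1000L * (1L << attempt), 10000L); // Max 10s delay

            // delayedExecutor requires Java 9+; it runs the next attempt after the delay
            Executor delayed = CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS);
            return CompletableFuture
                .supplyAsync(() -> queryWithRetryInternal(
                    client, jobId, stateName, key, maxRetries, attempt + 1), delayed)
                .thenCompose(Function.identity());
        }

        // Not retryable, or retries exhausted: propagate the original failure
        CompletableFuture<ValueState<String>> failed = new CompletableFuture<>();
        failed.completeExceptionally(cause);
        return failed;
    }).thenCompose(Function.identity());
}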
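
To make the backoff arithmetic concrete, the sketch below isolates the delay formula used in the retry method, min(base * 2^attempt, max), and prints the schedule for the first few attempts. BackoffSchedule and its parameter names are hypothetical; the base of 1000 ms and the cap of 10000 ms are taken from the example above.

```java
/** Hypothetical helper that isolates the capped exponential backoff formula. */
final class BackoffSchedule {

    private BackoffSchedule() {}

    /**
     * Returns min(baseMillis * 2^attempt, maxMillis).
     * The shift is capped at 30 so that 1L << shift stays well within long range
     * for base delays of a few seconds (an assumption of this sketch).
     */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        int shift = Math.min(attempt, 30);
        return Math.min(baseMillis * (1L << shift), maxMillis);
    }

    public static void main(String[] args) {
        // Prints: 1000, 2000, 4000, 8000, 10000, 10000 (one value per line)
        for (int attempt = 0; attempt <= 5; attempt++) {
            System.out.println(delayMillis(attempt, 1000L, 10000L));
        }
    }
}
```

With these values the cap is reached on the fifth attempt (index 4), so a maxRetries much larger than 4 mostly adds fixed 10-second waits rather than longer ones.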

Default Value Handling

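public CompletableFuture<String> getValueWithDefault(
        QueryableStateClient client,
        JobID jobId,
        String stateName,
        String key,
        String defaultValue) {

    return client.getKvState(
        jobId, stateName, key,
        TypeInformation.of(String.class),
        new ValueStateDescriptor<>("state", String.class)
    ).handle((state, throwable) -> {
        // The failure may arrive wrapped in a CompletionException
        Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
            ? throwable.getCause() : throwable;
        if (cause instanceof UnknownKeyOrNamespaceException) {
            // Key doesn't exist, return default
            return defaultValue;
        } else if (cause != null) {
            // Other errors should be propagated
            throw new CompletionException("Query failed", cause);
        }
        try {
            // Success - return actual value
            return state.value();
        } catch (IOException e) {
            // ValueState.value() declares IOException, which the lambda cannot rethrow as-is
            throw new UncheckedIOException(e);
        }
    });
}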

Debugging Tips

  1. Check Job Status: Ensure the Flink job is running and the state is properly configured as queryable
  2. Verify State Names: Confirm the queryable state name matches what's set in StateDescriptor.setQueryable()
  3. Network Connectivity: Verify network connectivity between client and Flink cluster
  4. Serialization Issues: Ensure the client's serializers match those used by the job, for example by configuring the same ExecutionConfig on the client
  5. Key Group Distribution: Check that the key maps to an existing key group in the job

Error Recovery Strategies

  • Retry Logic: Implement exponential backoff for transient failures
  • Circuit Breaker: Prevent cascading failures in high-load scenarios
  • Fallback Values: Provide default values for missing keys
  • State Verification: Periodically verify state availability and configuration
  • Monitoring: Log and monitor exception patterns to identify systemic issues
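
As an illustration of the circuit breaker strategy listed above, here is a minimal sketch built on consecutive-failure counting. SimpleCircuitBreaker and all of its members are hypothetical names, not part of the Flink client. The clock is injected so the behavior can be exercised without sleeping; in production it would typically be System::currentTimeMillis.

```java
import java.util.function.LongSupplier;

/** Hypothetical, minimal circuit breaker; not part of the Flink client. */
final class SimpleCircuitBreaker {

    private final int failureThreshold;
    private final long coolDownMillis;
    private final LongSupplier clock;

    private int consecutiveFailures;
    private boolean open;
    private long openedAtMillis;

    SimpleCircuitBreaker(int failureThreshold, long coolDownMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.coolDownMillis = coolDownMillis;
        this.clock = clock;
    }

    /** Returns true if a query may be sent now. */
    synchronized boolean allowRequest() {
        if (!open) {
            return true;
        }
        // After the cool-down, let requests through again as a trial;
        // a failure re-opens the breaker, a success closes it.
        return clock.getAsLong() - openedAtMillis >= coolDownMillis;
    }

    /** Call when a query completes normally. */
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        open = false;
    }

    /** Call when a query fails with an error that indicates an unhealthy backend. */
    synchronized void recordFailure() {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            open = true;
            openedAtMillis = clock.getAsLong();
        }
    }
}
```

A caller would check allowRequest() before issuing a query, return a fallback immediately when it is false, and call recordSuccess() or recordFailure() from the completion callback. Whether a missing key should count as a failure is a design choice; in most setups it should not, since it does not indicate an unhealthy cluster.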
