or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/maven-org-apache-flink--flink-queryable-state-runtime-2-11

Runtime server components for Apache Flink's queryable state feature enabling external applications to query live streaming job state

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-queryable-state-runtime_2.11@1.13.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-queryable-state-runtime-2-11@1.13.0

index.mddocs/

Flink Queryable State Runtime

Apache Flink's queryable state runtime provides server-side components that enable external applications to query the state of running Flink streaming jobs in real-time. This library implements the infrastructure needed to expose keyed state from Flink jobs to external clients without interrupting job execution.

Package Information

  • Package Name: flink-queryable-state-runtime_2.11
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-queryable-state-runtime_2.11
  • Version: 1.13.6
  • Installation: Add to Maven dependencies or include in Flink installation

Core Imports

import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.queryablestate.server.KvStateServerHandler;
import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl;
import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.query.KvStateClientProxy;

Basic Usage

import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.KvStateRegistry;
import java.util.Collections;
import java.util.Arrays;

// Create and start a queryable state server
KvStateServerImpl stateServer = new KvStateServerImpl(
    "localhost",              // bind address
    Collections.singleton(9069).iterator(), // port range
    1,                        // event loop threads
    1,                        // query threads
    kvStateRegistry,          // state registry
    new DisabledKvStateRequestStats() // stats collector
);

stateServer.start();

// Create and start a client proxy
KvStateClientProxyImpl clientProxy = new KvStateClientProxyImpl(
    "localhost",              // bind address
    Collections.singleton(9068).iterator(), // port range
    1,                        // event loop threads
    1,                        // query threads
    new DisabledKvStateRequestStats() // stats collector
);

clientProxy.start();

Architecture

The queryable state runtime implements a two-tier client-server architecture:

  1. Client Proxy Layer (KvStateClientProxyImpl) - Receives external client requests, handles state location lookups, and routes requests to appropriate state servers
  2. State Server Layer (KvStateServerImpl) - Handles internal requests from proxies, queries the actual state backends, and returns serialized state values

The message flow follows this pattern: External Client → Client Proxy → State Server → State Backend → Response back through chain

Capabilities

State Server Implementation

The core server component that handles queryable state requests from clients.

public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> 
        implements KvStateServer {
    
    public KvStateServerImpl(
        String bindAddress,
        Iterator<Integer> bindPortIterator,
        Integer numEventLoopThreads,
        Integer numQueryThreads,
        KvStateRegistry kvStateRegistry,
        KvStateRequestStats stats
    );
    
    public void start() throws Throwable;
    public InetSocketAddress getServerAddress();
    public void shutdown();
    public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer();
    public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler();
}

Usage Example:

// Initialize server with required dependencies
KvStateServerImpl server = new KvStateServerImpl(
    "0.0.0.0",                           // Listen on all interfaces
    Arrays.asList(9069, 9070, 9071).iterator(), // Try ports in sequence
    4,                                   // Event loop threads for network I/O
    8,                                   // Query threads for state access
    taskManagerKvStateRegistry,          // Registry containing state references
    metricsCollector                     // Statistics collector
);

// Start the server
server.start();
InetSocketAddress address = server.getServerAddress();
System.out.println("State server listening on " + address);

// Shutdown when done
server.shutdown();

State Server Request Handler

Processes individual state requests and queries the state backend.

public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
    
    public KvStateServerHandler(
        KvStateServerImpl server,
        KvStateRegistry kvStateRegistry,
        MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer,
        KvStateRequestStats stats
    );
    
    public CompletableFuture<KvStateResponse> handleRequest(long requestId, KvStateInternalRequest request);
    public CompletableFuture<Void> shutdown();
}

Client Proxy Implementation

The proxy server that receives external client requests and forwards them to appropriate state servers.

public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> 
        implements KvStateClientProxy {
    
    public KvStateClientProxyImpl(
        String bindAddress,
        Iterator<Integer> bindPortIterator,
        Integer numEventLoopThreads,
        Integer numQueryThreads,
        KvStateRequestStats stats
    );
    
    public void start() throws Throwable;
    public InetSocketAddress getServerAddress();
    public void shutdown();
    public void updateKvStateLocationOracle(JobID jobId, KvStateLocationOracle kvStateLocationOracle);
    public KvStateLocationOracle getKvStateLocationOracle(JobID jobId);
    public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler();
}

Usage Example:

// Initialize client proxy
KvStateClientProxyImpl proxy = new KvStateClientProxyImpl(
    "0.0.0.0",                          // Listen address
    Arrays.asList(9068, 9067).iterator(), // Port range
    2,                                  // Event loop threads
    4,                                  // Query executor threads
    statsCollector                      // Request statistics
);

// Start proxy server
proxy.start();

// Register location oracle for a job
JobID jobId = JobID.fromHexString("1234567890abcdef");
proxy.updateKvStateLocationOracle(jobId, jobLocationOracle);

// Remove oracle when job finishes
proxy.updateKvStateLocationOracle(jobId, null);

proxy.shutdown();

Client Proxy Request Handler

Handles external client requests, performs state location lookups, and forwards requests to state servers.

public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> {
    
    public KvStateClientProxyHandler(
        KvStateClientProxyImpl proxy,
        int queryExecutorThreads,
        MessageSerializer<KvStateRequest, KvStateResponse> serializer,
        KvStateRequestStats stats
    );
    
    public CompletableFuture<KvStateResponse> handleRequest(long requestId, KvStateRequest request);
    public CompletableFuture<Void> shutdown();
}

Internal Request Messages

Message format used for communication between client proxy and state server.

public class KvStateInternalRequest extends MessageBody {
    
    public KvStateInternalRequest(KvStateID stateId, byte[] serializedKeyAndNamespace);
    
    public KvStateID getKvStateId();
    public byte[] getSerializedKeyAndNamespace();
    public byte[] serialize();
    
    // Contains inner static class KvStateInternalRequestDeserializer
}

Usage Example:

// Create internal request for forwarding to state server
KvStateID stateId = new KvStateID(123L, 456L);
byte[] keyAndNamespace = serializeKeyAndNamespace(key, namespace);

KvStateInternalRequest internalRequest = new KvStateInternalRequest(
    stateId, 
    keyAndNamespace
);

// Request can be serialized for network transmission
byte[] serialized = internalRequest.serialize();

Request Message Deserialization

Deserializer for internal request messages received over the network. This is an inner static class of KvStateInternalRequest.

// Inner static class of KvStateInternalRequest
public static class KvStateInternalRequest.KvStateInternalRequestDeserializer 
        implements MessageDeserializer<KvStateInternalRequest> {
    
    public KvStateInternalRequest deserializeMessage(ByteBuf buf);
}

Types

Core Interfaces (from Flink Runtime)

// Server interface implemented by KvStateServerImpl
interface KvStateServer {
    void start() throws Throwable;
    InetSocketAddress getServerAddress();
    void shutdown();
}

// Client proxy interface implemented by KvStateClientProxyImpl  
interface KvStateClientProxy {
    void start() throws Throwable;
    InetSocketAddress getServerAddress();
    void shutdown();
    void updateKvStateLocationOracle(JobID jobId, KvStateLocationOracle oracle);
    KvStateLocationOracle getKvStateLocationOracle(JobID jobId);
}

Dependency Types (from Flink Core/Runtime)

// State registry containing references to queryable state instances
interface KvStateRegistry {
    KvStateEntry<?, ?, ?> getKvState(KvStateID stateId);
}

// Statistics collector for monitoring request performance
interface KvStateRequestStats {
    void reportRequest();
    void reportSuccessfulRequest();
    void reportFailedRequest();
}

// Unique identifier for state instances
class KvStateID {
    public KvStateID(long lowerPart, long upperPart);
    public long getLowerPart();
    public long getUpperPart();
}

// Provides location information for queryable state
interface KvStateLocationOracle {
    CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobId, String stateName);
}

// Job identifier
class JobID {
    public static JobID fromHexString(String hexString);
}

Error Handling

The queryable state runtime can throw several types of exceptions:

  • UnknownKvStateIdException - When requested state ID is not found in registry
  • UnknownKeyOrNamespaceException - When requested key/namespace combination doesn't exist
  • UnknownKvStateKeyGroupLocationException - When key group location cannot be determined
  • UnknownLocationException - When state location oracle is unavailable
  • FlinkJobNotFoundException - When referenced job ID is not found

These exceptions are typically wrapped in CompletableFuture responses and should be handled by client applications.

Threading Model

Both the state server and client proxy use a multi-threaded architecture:

  • Event Loop Threads - Handle network I/O operations (Netty event loops)
  • Query Threads - Process actual state queries and location lookups
  • Async Processing - All request handling returns CompletableFuture for non-blocking operations

This design ensures that network operations don't block state access and vice versa.

Integration with Flink

This runtime library integrates with the broader Flink ecosystem:

  • State Backends - Queries state from configured Flink state backends (RocksDB, HashMap, etc.)
  • Job Manager - Coordinates with JobManager for state location information
  • Task Managers - Runs on TaskManager nodes to provide direct state access
  • Checkpointing - Works with Flink's checkpointing mechanism for consistent state snapshots