Runtime server components for Apache Flink's queryable state feature enabling external applications to query live streaming job state
npx @tessl/cli install tessl/maven-org-apache-flink--flink-queryable-state-runtime-2-11@1.13.0

Apache Flink's queryable state runtime provides server-side components that enable external applications to query the state of running Flink streaming jobs in real time. This library implements the infrastructure needed to expose keyed state from Flink jobs to external clients without interrupting job execution.
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.queryablestate.server.KvStateServerHandler;
import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl;
import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.query.KvStateClientProxy;

import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.KvStateRegistry;
import java.util.Collections;
import java.util.Arrays;
// Create and start a queryable state server
KvStateServerImpl stateServer = new KvStateServerImpl(
"localhost", // bind address
Collections.singleton(9069).iterator(), // port range
1, // event loop threads
1, // query threads
kvStateRegistry, // state registry
new DisabledKvStateRequestStats() // stats collector
);
stateServer.start();
// Create and start a client proxy
KvStateClientProxyImpl clientProxy = new KvStateClientProxyImpl(
"localhost", // bind address
Collections.singleton(9068).iterator(), // port range
1, // event loop threads
1, // query threads
new DisabledKvStateRequestStats() // stats collector
);
clientProxy.start();

The queryable state runtime implements a two-tier client-server architecture:

- Client proxy (KvStateClientProxyImpl) - Receives external client requests, handles state location lookups, and routes requests to the appropriate state servers
- State server (KvStateServerImpl) - Handles internal requests from proxies, queries the actual state backends, and returns serialized state values

The message flow follows this pattern: External Client → Client Proxy → State Server → State Backend → Response back through the chain
The core server component that handles queryable state requests from clients.
public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse>
implements KvStateServer {
public KvStateServerImpl(
String bindAddress,
Iterator<Integer> bindPortIterator,
Integer numEventLoopThreads,
Integer numQueryThreads,
KvStateRegistry kvStateRegistry,
KvStateRequestStats stats
);
public void start() throws Throwable;
public InetSocketAddress getServerAddress();
public void shutdown();
public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer();
public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler();
}

Usage Example:
// Initialize server with required dependencies
KvStateServerImpl server = new KvStateServerImpl(
"0.0.0.0", // Listen on all interfaces
Arrays.asList(9069, 9070, 9071).iterator(), // Try ports in sequence
4, // Event loop threads for network I/O
8, // Query threads for state access
taskManagerKvStateRegistry, // Registry containing state references
metricsCollector // Statistics collector
);
// Start the server
server.start();
InetSocketAddress address = server.getServerAddress();
System.out.println("State server listening on " + address);
// Shutdown when done
server.shutdown();

Processes individual state requests and queries the state backend.
public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
public KvStateServerHandler(
KvStateServerImpl server,
KvStateRegistry kvStateRegistry,
MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer,
KvStateRequestStats stats
);
public CompletableFuture<KvStateResponse> handleRequest(long requestId, KvStateInternalRequest request);
public CompletableFuture<Void> shutdown();
}

The proxy server that receives external client requests and forwards them to the appropriate state servers.
public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse>
implements KvStateClientProxy {
public KvStateClientProxyImpl(
String bindAddress,
Iterator<Integer> bindPortIterator,
Integer numEventLoopThreads,
Integer numQueryThreads,
KvStateRequestStats stats
);
public void start() throws Throwable;
public InetSocketAddress getServerAddress();
public void shutdown();
public void updateKvStateLocationOracle(JobID jobId, KvStateLocationOracle kvStateLocationOracle);
public KvStateLocationOracle getKvStateLocationOracle(JobID jobId);
public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler();
}

Usage Example:
// Initialize client proxy
KvStateClientProxyImpl proxy = new KvStateClientProxyImpl(
"0.0.0.0", // Listen address
Arrays.asList(9068, 9067).iterator(), // Port range
2, // Event loop threads
4, // Query executor threads
statsCollector // Request statistics
);
// Start proxy server
proxy.start();
// Register the location oracle for a job.
// JobID.fromHexString expects exactly 32 hexadecimal characters (a JobID is 16 bytes);
// a shorter string such as a 16-character one is rejected with an IllegalArgumentException.
JobID jobId = JobID.fromHexString("1234567890abcdef1234567890abcdef");
proxy.updateKvStateLocationOracle(jobId, jobLocationOracle);
// Remove the oracle when the job finishes by passing null for that job
proxy.updateKvStateLocationOracle(jobId, null);
proxy.shutdown();

Handles external client requests, performs state location lookups, and forwards requests to state servers.
public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> {
public KvStateClientProxyHandler(
KvStateClientProxyImpl proxy,
int queryExecutorThreads,
MessageSerializer<KvStateRequest, KvStateResponse> serializer,
KvStateRequestStats stats
);
public CompletableFuture<KvStateResponse> handleRequest(long requestId, KvStateRequest request);
public CompletableFuture<Void> shutdown();
}

Message format used for communication between the client proxy and the state server.
public class KvStateInternalRequest extends MessageBody {
public KvStateInternalRequest(KvStateID stateId, byte[] serializedKeyAndNamespace);
public KvStateID getKvStateId();
public byte[] getSerializedKeyAndNamespace();
public byte[] serialize();
// Contains inner static class KvStateInternalRequestDeserializer
}

Usage Example:
// Create internal request for forwarding to state server
KvStateID stateId = new KvStateID(123L, 456L);
byte[] keyAndNamespace = serializeKeyAndNamespace(key, namespace);
KvStateInternalRequest internalRequest = new KvStateInternalRequest(
stateId,
keyAndNamespace
);
// Request can be serialized for network transmission
byte[] serialized = internalRequest.serialize();

Deserializer for internal request messages received over the network. This is an inner static class of KvStateInternalRequest.
// Inner static class of KvStateInternalRequest
public static class KvStateInternalRequest.KvStateInternalRequestDeserializer
implements MessageDeserializer<KvStateInternalRequest> {
public KvStateInternalRequest deserializeMessage(ByteBuf buf);
}

// Server interface implemented by KvStateServerImpl
interface KvStateServer {
// Starts the server. Declared to throw Throwable, so callers must handle or propagate any startup failure.
void start() throws Throwable;
// Returns the server's socket address; the KvStateServerImpl usage example prints it as the address the server is listening on.
InetSocketAddress getServerAddress();
// Shuts the server down; the KvStateServerImpl usage example calls this once the server is no longer needed.
void shutdown();
}
// Client proxy interface implemented by KvStateClientProxyImpl
interface KvStateClientProxy {
void start() throws Throwable;
InetSocketAddress getServerAddress();
void shutdown();
void updateKvStateLocationOracle(JobID jobId, KvStateLocationOracle oracle);
KvStateLocationOracle getKvStateLocationOracle(JobID jobId);
}

// State registry containing references to queryable state instances
interface KvStateRegistry {
// Looks up the state entry registered under the given state ID.
// NOTE(review): the result for an unknown ID is not shown in this document — confirm before relying on it.
KvStateEntry<?, ?, ?> getKvState(KvStateID stateId);
}
// Statistics collector for monitoring request performance.
// The examples in this document pass a DisabledKvStateRequestStats instance wherever a collector is required.
// NOTE(review): this listing may be abridged — check the Flink Javadoc for the full method set and parameters.
interface KvStateRequestStats {
// Presumably invoked once per incoming request — TODO confirm.
void reportRequest();
// Presumably invoked when a request completes successfully — TODO confirm.
void reportSuccessfulRequest();
// Presumably invoked when a request fails — TODO confirm.
void reportFailedRequest();
}
// Unique identifier for state instances.
class KvStateID {
// Builds an ID from two long parts; the internal-request example constructs one as new KvStateID(123L, 456L).
public KvStateID(long lowerPart, long upperPart);
// Accessor for the lower part (presumably the lowerPart constructor argument — confirm).
public long getLowerPart();
// Accessor for the upper part (presumably the upperPart constructor argument — confirm).
public long getUpperPart();
}
// Provides location information for queryable state.
// An oracle is registered per job on the client proxy via updateKvStateLocationOracle(jobId, oracle).
interface KvStateLocationOracle {
// Requests the location of the state named stateName in the job identified by jobId;
// the result is delivered through the returned CompletableFuture.
CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobId, String stateName);
}
// Job identifier
class JobID {
public static JobID fromHexString(String hexString);
}

The queryable state runtime can throw several types of exceptions:
These exceptions are typically wrapped in CompletableFuture responses and should be handled by client applications.
Both the state server and client proxy use a multi-threaded architecture:
This design ensures that network operations don't block state access and vice versa.
This runtime library integrates with the broader Flink ecosystem: