or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-queryable-state-runtime_2.11

Runtime server components for Apache Flink's queryable state feature enabling external applications to query live streaming job state

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-queryable-state-runtime_2.11@1.13.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-queryable-state-runtime_2.11@1.13.0

0

# Flink Queryable State Runtime

1

2

Apache Flink's queryable state runtime provides server-side components that enable external applications to query the state of running Flink streaming jobs in real-time. This library implements the infrastructure needed to expose keyed state from Flink jobs to external clients without interrupting job execution.

3

4

## Package Information

5

6

- **Package Name**: flink-queryable-state-runtime_2.11

7

- **Package Type**: maven

8

- **Language**: Java

9

- **Group ID**: org.apache.flink

10

- **Artifact ID**: flink-queryable-state-runtime_2.11

11

- **Version**: 1.13.6

12

- **Installation**: Add to Maven dependencies or include in Flink installation

13

14

## Core Imports

15

16

```java

17

import org.apache.flink.queryablestate.server.KvStateServerImpl;

18

import org.apache.flink.queryablestate.server.KvStateServerHandler;

19

import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl;

20

import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler;

21

import org.apache.flink.queryablestate.messages.KvStateInternalRequest;

22

import org.apache.flink.queryablestate.network.AbstractServerHandler;

23

import org.apache.flink.queryablestate.network.messages.MessageSerializer;

24

import org.apache.flink.runtime.query.KvStateRegistry;

25

import org.apache.flink.runtime.query.KvStateServer;

26

import org.apache.flink.runtime.query.KvStateClientProxy;

27

```

28

29

## Basic Usage

30

31

```java

32

import org.apache.flink.queryablestate.server.KvStateServerImpl;

33

import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl;

34

import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;

35

import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;

36

import org.apache.flink.runtime.query.KvStateRegistry;

37

import java.util.Collections;

38

import java.util.Arrays;

39

40

// Create and start a queryable state server

41

KvStateServerImpl stateServer = new KvStateServerImpl(

42

"localhost", // bind address

43

Collections.singleton(9069).iterator(), // port range

44

1, // event loop threads

45

1, // query threads

46

kvStateRegistry, // state registry

47

new DisabledKvStateRequestStats() // stats collector

48

);

49

50

stateServer.start();

51

52

// Create and start a client proxy

53

KvStateClientProxyImpl clientProxy = new KvStateClientProxyImpl(

54

"localhost", // bind address

55

Collections.singleton(9068).iterator(), // port range

56

1, // event loop threads

57

1, // query threads

58

new DisabledKvStateRequestStats() // stats collector

59

);

60

61

clientProxy.start();

62

```

63

64

## Architecture

65

66

The queryable state runtime implements a two-tier client-server architecture:

67

68

1. **Client Proxy Layer** (`KvStateClientProxyImpl`) - Receives external client requests, handles state location lookups, and routes requests to appropriate state servers

69

2. **State Server Layer** (`KvStateServerImpl`) - Handles internal requests from proxies, queries the actual state backends, and returns serialized state values

70

71

The message flow follows this pattern:

72

External Client → Client Proxy → State Server → State Backend → Response back through chain

73

74

## Capabilities

75

76

### State Server Implementation

77

78

The core server component that handles queryable state requests from clients.

79

80

```java { .api }

81

public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse>

82

implements KvStateServer {

83

84

public KvStateServerImpl(

85

String bindAddress,

86

Iterator<Integer> bindPortIterator,

87

Integer numEventLoopThreads,

88

Integer numQueryThreads,

89

KvStateRegistry kvStateRegistry,

90

KvStateRequestStats stats

91

);

92

93

public void start() throws Throwable;

94

public InetSocketAddress getServerAddress();

95

public void shutdown();

96

public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer();

97

public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler();

98

}

99

```

100

101

**Usage Example:**

102

```java

103

// Initialize server with required dependencies

104

KvStateServerImpl server = new KvStateServerImpl(

105

"0.0.0.0", // Listen on all interfaces

106

Arrays.asList(9069, 9070, 9071).iterator(), // Try ports in sequence

107

4, // Event loop threads for network I/O

108

8, // Query threads for state access

109

taskManagerKvStateRegistry, // Registry containing state references

110

metricsCollector // Statistics collector

111

);

112

113

// Start the server

114

server.start();

115

InetSocketAddress address = server.getServerAddress();

116

System.out.println("State server listening on " + address);

117

118

// Shutdown when done

119

server.shutdown();

120

```

121

122

### State Server Request Handler

123

124

Processes individual state requests and queries the state backend.

125

126

```java { .api }

127

public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {

128

129

public KvStateServerHandler(

130

KvStateServerImpl server,

131

KvStateRegistry kvStateRegistry,

132

MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer,

133

KvStateRequestStats stats

134

);

135

136

public CompletableFuture<KvStateResponse> handleRequest(long requestId, KvStateInternalRequest request);

137

public CompletableFuture<Void> shutdown();

138

}

139

```

140

141

### Client Proxy Implementation

142

143

The proxy server that receives external client requests and forwards them to appropriate state servers.

144

145

```java { .api }

146

public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse>

147

implements KvStateClientProxy {

148

149

public KvStateClientProxyImpl(

150

String bindAddress,

151

Iterator<Integer> bindPortIterator,

152

Integer numEventLoopThreads,

153

Integer numQueryThreads,

154

KvStateRequestStats stats

155

);

156

157

public void start() throws Throwable;

158

public InetSocketAddress getServerAddress();

159

public void shutdown();

160

public void updateKvStateLocationOracle(JobID jobId, KvStateLocationOracle kvStateLocationOracle);

161

public KvStateLocationOracle getKvStateLocationOracle(JobID jobId);

162

public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler();

163

}

164

```

165

166

**Usage Example:**

167

```java

168

// Initialize client proxy

169

KvStateClientProxyImpl proxy = new KvStateClientProxyImpl(

170

"0.0.0.0", // Listen address

171

Arrays.asList(9068, 9067).iterator(), // Port range

172

2, // Event loop threads

173

4, // Query executor threads

174

statsCollector // Request statistics

175

);

176

177

// Start proxy server

178

proxy.start();

179

180

// Register location oracle for a job

181

JobID jobId = JobID.fromHexString("1234567890abcdef");

182

proxy.updateKvStateLocationOracle(jobId, jobLocationOracle);

183

184

// Remove oracle when job finishes

185

proxy.updateKvStateLocationOracle(jobId, null);

186

187

proxy.shutdown();

188

```

189

190

### Client Proxy Request Handler

191

192

Handles external client requests, performs state location lookups, and forwards requests to state servers.

193

194

```java { .api }

195

public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> {

196

197

public KvStateClientProxyHandler(

198

KvStateClientProxyImpl proxy,

199

int queryExecutorThreads,

200

MessageSerializer<KvStateRequest, KvStateResponse> serializer,

201

KvStateRequestStats stats

202

);

203

204

public CompletableFuture<KvStateResponse> handleRequest(long requestId, KvStateRequest request);

205

public CompletableFuture<Void> shutdown();

206

}

207

```

208

209

### Internal Request Messages

210

211

Message format used for communication between client proxy and state server.

212

213

```java { .api }

214

public class KvStateInternalRequest extends MessageBody {

215

216

public KvStateInternalRequest(KvStateID stateId, byte[] serializedKeyAndNamespace);

217

218

public KvStateID getKvStateId();

219

public byte[] getSerializedKeyAndNamespace();

220

public byte[] serialize();

221

222

// Contains inner static class KvStateInternalRequestDeserializer

223

}

224

```

225

226

**Usage Example:**

227

```java

228

// Create internal request for forwarding to state server

229

KvStateID stateId = new KvStateID(123L, 456L);

230

byte[] keyAndNamespace = serializeKeyAndNamespace(key, namespace);

231

232

KvStateInternalRequest internalRequest = new KvStateInternalRequest(

233

stateId,

234

keyAndNamespace

235

);

236

237

// Request can be serialized for network transmission

238

byte[] serialized = internalRequest.serialize();

239

```

240

241

### Request Message Deserialization

242

243

Deserializer for internal request messages received over the network. This is an inner static class of KvStateInternalRequest.

244

245

```java { .api }

246

// Inner static class of KvStateInternalRequest

247

public static class KvStateInternalRequest.KvStateInternalRequestDeserializer

248

implements MessageDeserializer<KvStateInternalRequest> {

249

250

public KvStateInternalRequest deserializeMessage(ByteBuf buf);

251

}

252

```

253

254

## Types

255

256

### Core Interfaces (from Flink Runtime)

257

258

```java { .api }

259

// Server interface implemented by KvStateServerImpl

260

interface KvStateServer {

261

void start() throws Throwable;

262

InetSocketAddress getServerAddress();

263

void shutdown();

264

}

265

266

// Client proxy interface implemented by KvStateClientProxyImpl

267

interface KvStateClientProxy {

268

void start() throws Throwable;

269

InetSocketAddress getServerAddress();

270

void shutdown();

271

void updateKvStateLocationOracle(JobID jobId, KvStateLocationOracle oracle);

272

KvStateLocationOracle getKvStateLocationOracle(JobID jobId);

273

}

274

```

275

276

### Dependency Types (from Flink Core/Runtime)

277

278

```java { .api }

279

// State registry containing references to queryable state instances

280

interface KvStateRegistry {

281

KvStateEntry<?, ?, ?> getKvState(KvStateID stateId);

282

}

283

284

// Statistics collector for monitoring request performance

285

interface KvStateRequestStats {

286

void reportRequest();

287

void reportSuccessfulRequest();

288

void reportFailedRequest();

289

}

290

291

// Unique identifier for state instances

292

class KvStateID {

293

public KvStateID(long lowerPart, long upperPart);

294

public long getLowerPart();

295

public long getUpperPart();

296

}

297

298

// Provides location information for queryable state

299

interface KvStateLocationOracle {

300

CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobId, String stateName);

301

}

302

303

// Job identifier

304

class JobID {

305

public static JobID fromHexString(String hexString);

306

}

307

```

308

309

## Error Handling

310

311

The queryable state runtime can throw several types of exceptions:

312

313

- **UnknownKvStateIdException** - When requested state ID is not found in registry

314

- **UnknownKeyOrNamespaceException** - When requested key/namespace combination doesn't exist

315

- **UnknownKvStateKeyGroupLocationException** - When key group location cannot be determined

316

- **UnknownLocationException** - When state location oracle is unavailable

317

- **FlinkJobNotFoundException** - When referenced job ID is not found

318

319

These exceptions are typically wrapped in CompletableFuture responses and should be handled by client applications.

320

321

## Threading Model

322

323

Both the state server and client proxy use a multi-threaded architecture:

324

325

- **Event Loop Threads** - Handle network I/O operations (Netty event loops)

326

- **Query Threads** - Process actual state queries and location lookups

327

- **Async Processing** - All request handling returns CompletableFuture for non-blocking operations

328

329

This design ensures that network operations don't block state access and vice versa.

330

331

## Integration with Flink

332

333

This runtime library integrates with the broader Flink ecosystem:

334

335

- **State Backends** - Queries state from configured Flink state backends (RocksDB, HashMap, etc.)

336

- **Job Manager** - Coordinates with JobManager for state location information

337

- **Task Managers** - Runs on TaskManager nodes to provide direct state access

338

- **Checkpointing** - Works with Flink's checkpointing mechanism for consistent state snapshots