or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

function-interfaces.md · index.md · savepoint-management.md · state-reading.md · state-writing.md · window-operations.md

docs/state-reading.md

0

# State Reading

1

2

State reading functionality allows you to extract and process state data from existing savepoints. The API supports reading different types of Flink state including operator state (list, union, broadcast) and keyed state.

3

4

## Operator State Reading

5

6

### List State

7

8

Read operator list state as individual elements.

9

10

```java { .api }

11

public <T> DataSource<T> readListState(

12

String uid,

13

String name,

14

TypeInformation<T> typeInfo

15

) throws IOException;

16

17

public <T> DataSource<T> readListState(

18

String uid,

19

String name,

20

TypeInformation<T> typeInfo,

21

TypeSerializer<T> serializer

22

) throws IOException;

23

```

24

25

**Usage Example:**

26

27

```java

28

// Read list state with type information

29

DataSource<String> messages = savepoint.readListState(

30

"kafka-source",

31

"buffered-messages",

32

Types.STRING

33

);

34

35

// Read with custom serializer

36

TypeSerializer<MyCustomType> customSerializer = new MyCustomTypeSerializer();

37

DataSource<MyCustomType> customData = savepoint.readListState(

38

"custom-operator",

39

"custom-state",

40

TypeInformation.of(MyCustomType.class),

41

customSerializer

42

);

43

44

// Process the data

45

messages.map(msg -> "Processed: " + msg).print();

46

```

47

48

### Union State

49

50

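Read operator union state. It is declared and read like list state, but it is redistributed differently on restore: with list state the elements are split among the parallel operator instances, whereas with union state every parallel instance receives the complete list of elements.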

51

52

```java { .api }

53

public <T> DataSource<T> readUnionState(

54

String uid,

55

String name,

56

TypeInformation<T> typeInfo

57

) throws IOException;

58

59

public <T> DataSource<T> readUnionState(

60

String uid,

61

String name,

62

TypeInformation<T> typeInfo,

63

TypeSerializer<T> serializer

64

) throws IOException;

65

```

66

67

**Usage Example:**

68

69

```java

70

// Read union state

71

DataSource<Configuration> configs = savepoint.readUnionState(

72

"config-broadcaster",

73

"broadcast-config",

74

TypeInformation.of(Configuration.class)

75

);

76

77

configs.map(config -> processConfiguration(config)).collect();

78

```

79

80

### Broadcast State

81

82

Read broadcast state as key-value pairs.

83

84

```java { .api }

85

public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(

86

String uid,

87

String name,

88

TypeInformation<K> keyTypeInfo,

89

TypeInformation<V> valueTypeInfo

90

) throws IOException;

91

92

public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(

93

String uid,

94

String name,

95

TypeInformation<K> keyTypeInfo,

96

TypeInformation<V> valueTypeInfo,

97

TypeSerializer<K> keySerializer,

98

TypeSerializer<V> valueSerializer

99

) throws IOException;

100

```

101

102

**Usage Example:**

103

104

```java

105

// Read broadcast state as key-value pairs

106

DataSource<Tuple2<String, Rule>> rules = savepoint.readBroadcastState(

107

"rule-processor",

108

"rules-broadcast-state",

109

Types.STRING,

110

TypeInformation.of(Rule.class)

111

);

112

113

// Process the rules

114

rules.map(tuple -> "Rule " + tuple.f0 + ": " + tuple.f1.getDescription())

115

.print();

116

```

117

118

## Keyed State Reading

119

120

Read keyed state using custom reader functions that process each key individually.

121

122

```java { .api }

123

public <K, OUT> DataSource<OUT> readKeyedState(

124

String uid,

125

KeyedStateReaderFunction<K, OUT> function

126

) throws IOException;

127

128

public <K, OUT> DataSource<OUT> readKeyedState(

129

String uid,

130

KeyedStateReaderFunction<K, OUT> function,

131

TypeInformation<K> keyTypeInfo,

132

TypeInformation<OUT> outTypeInfo

133

) throws IOException;

134

```

135

136

### KeyedStateReaderFunction Interface

137

138

```java { .api }

139

public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {

140

public abstract void open(Configuration parameters) throws Exception;

141

142

public abstract void readKey(

143

K key,

144

Context ctx,

145

Collector<OUT> out

146

) throws Exception;

147

148

public interface Context {

149

Set<Long> registeredEventTimeTimers() throws Exception;

150

Set<Long> registeredProcessingTimeTimers() throws Exception;

151

}

152

}

153

```

154

155

**Usage Example:**

156

157

```java

158

public class MyKeyedStateReader extends KeyedStateReaderFunction<String, UserStats> {

159

private ValueState<Long> countState;

160

private ValueState<Double> avgState;

161

162

@Override

163

public void open(Configuration parameters) throws Exception {

164

// Register state descriptors in open()

165

ValueStateDescriptor<Long> countDesc = new ValueStateDescriptor<>(

166

"count", Long.class

167

);

168

countState = getRuntimeContext().getState(countDesc);

169

170

ValueStateDescriptor<Double> avgDesc = new ValueStateDescriptor<>(

171

"average", Double.class

172

);

173

avgState = getRuntimeContext().getState(avgDesc);

174

}

175

176

@Override

177

public void readKey(String key, Context ctx, Collector<UserStats> out) throws Exception {

178

Long count = countState.value();

179

Double average = avgState.value();

180

181

if (count != null && average != null) {

182

UserStats stats = new UserStats(key, count, average);

183

out.collect(stats);

184

}

185

186

// Access timer information if needed

187

Set<Long> eventTimers = ctx.registeredEventTimeTimers();

188

Set<Long> processingTimers = ctx.registeredProcessingTimeTimers();

189

}

190

}

191

192

// Use the reader function

193

DataSource<UserStats> userStats = savepoint.readKeyedState(

194

"user-processor",

195

new MyKeyedStateReader()

196

);

197

198

userStats.print();

199

```

200

201

### Reading Different Keyed State Types

202

203

**Value State:**

204

205

```java

206

public class ValueStateReader extends KeyedStateReaderFunction<String, Tuple2<String, String>> {

207

private ValueState<String> valueState;

208

209

@Override

210

public void open(Configuration parameters) throws Exception {

211

ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("value", String.class);

212

valueState = getRuntimeContext().getState(desc);

213

}

214

215

@Override

216

public void readKey(String key, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {

217

String value = valueState.value();

218

if (value != null) {

219

out.collect(Tuple2.of(key, value));

220

}

221

}

222

}

223

```

224

225

**List State:**

226

227

```java

228

public class ListStateReader extends KeyedStateReaderFunction<String, Tuple2<String, List<String>>> {

229

private ListState<String> listState;

230

231

@Override

232

public void open(Configuration parameters) throws Exception {

233

ListStateDescriptor<String> desc = new ListStateDescriptor<>("list", String.class);

234

listState = getRuntimeContext().getListState(desc);

235

}

236

237

@Override

238

public void readKey(String key, Context ctx, Collector<Tuple2<String, List<String>>> out) throws Exception {

239

List<String> items = new ArrayList<>();

240

for (String item : listState.get()) {

241

items.add(item);

242

}

243

if (!items.isEmpty()) {

244

out.collect(Tuple2.of(key, items));

245

}

246

}

247

}

248

```

249

250

**Map State:**

251

252

```java

253

public class MapStateReader extends KeyedStateReaderFunction<String, Tuple3<String, String, Integer>> {

254

private MapState<String, Integer> mapState;

255

256

@Override

257

public void open(Configuration parameters) throws Exception {

258

MapStateDescriptor<String, Integer> desc = new MapStateDescriptor<>(

259

"map", String.class, Integer.class

260

);

261

mapState = getRuntimeContext().getMapState(desc);

262

}

263

264

@Override

265

public void readKey(String key, Context ctx, Collector<Tuple3<String, String, Integer>> out) throws Exception {

266

for (Map.Entry<String, Integer> entry : mapState.entries()) {

267

out.collect(Tuple3.of(key, entry.getKey(), entry.getValue()));

268

}

269

}

270

}

271

```

272

273

## Advanced Reading Patterns

274

275

### Filtering State Data

276

277

```java

278

public class FilteringStateReader extends KeyedStateReaderFunction<String, FilteredData> {

279

private ValueState<MyData> dataState;

280

private final Predicate<MyData> filter;

281

282

public FilteringStateReader(Predicate<MyData> filter) {

283

this.filter = filter;

284

}

285

286

@Override

287

public void open(Configuration parameters) throws Exception {

288

ValueStateDescriptor<MyData> desc = new ValueStateDescriptor<>("data", MyData.class);

289

dataState = getRuntimeContext().getState(desc);

290

}

291

292

@Override

293

public void readKey(String key, Context ctx, Collector<FilteredData> out) throws Exception {

294

MyData data = dataState.value();

295

if (data != null && filter.test(data)) {

296

out.collect(new FilteredData(key, data));

297

}

298

}

299

}

300

301

// Usage

302

DataSource<FilteredData> filtered = savepoint.readKeyedState(

303

"data-processor",

304

new FilteringStateReader(data -> data.getScore() > 0.8)

305

);

306

```

307

308

### Aggregating State Across Keys

309

310

```java

311

public class AggregatingStateReader extends KeyedStateReaderFunction<String, KeyAggregate> {

312

private ValueState<Double> valueState;

313

private ListState<String> tagState;

314

315

@Override

316

public void open(Configuration parameters) throws Exception {

317

valueState = getRuntimeContext().getState(

318

new ValueStateDescriptor<>("value", Double.class)

319

);

320

tagState = getRuntimeContext().getListState(

321

new ListStateDescriptor<>("tags", String.class)

322

);

323

}

324

325

@Override

326

public void readKey(String key, Context ctx, Collector<KeyAggregate> out) throws Exception {

327

Double value = valueState.value();

328

List<String> tags = new ArrayList<>();

329

tagState.get().forEach(tags::add);

330

331

if (value != null) {

332

KeyAggregate aggregate = new KeyAggregate(key, value, tags);

333

out.collect(aggregate);

334

}

335

}

336

}

337

```

338

339

## Error Handling

340

341

### Common Reading Errors

342

343

```java

344

// Reads list state and classifies an I/O failure by inspecting its message.
try {
    DataSource<MyData> data = savepoint.readListState(
        "operator-uid",
        "state-name",
        TypeInformation.of(MyData.class)
    );

    data.collect();

} catch (IOException e) {
    // An exception may carry no message at all; normalise it first so the
    // classification below cannot fail with a NullPointerException and hide
    // the original error.
    String message = e.getMessage() == null ? "" : e.getMessage();

    if (message.contains("does not exist")) {
        System.err.println("Operator UID not found in savepoint");
    } else if (message.contains("state")) {
        System.err.println("State descriptor not found");
    } else {
        System.err.println("Failed to read state: " + e.getMessage());
    }
}

362

```

363

364

### State Descriptor Registration Errors

365

366

```java

367

public class SafeStateReader extends KeyedStateReaderFunction<String, MyOutput> {

368

private ValueState<String> state;

369

370

@Override

371

public void open(Configuration parameters) throws Exception {

372

try {

373

ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("myState", String.class);

374

state = getRuntimeContext().getState(desc);

375

} catch (Exception e) {

376

throw new RuntimeException("Failed to register state descriptor", e);

377

}

378

}

379

380

@Override

381

public void readKey(String key, Context ctx, Collector<MyOutput> out) throws Exception {

382

try {

383

String value = state.value();

384

if (value != null) {

385

out.collect(new MyOutput(key, value));

386

}

387

} catch (Exception e) {

388

System.err.println("Failed to read state for key " + key + ": " + e.getMessage());

389

// Could collect error record or skip

390

}

391

}

392

}

393

```

394

395

## Performance Considerations

396

397

### Parallelism Settings

398

399

```java

400

// Set appropriate parallelism for reading operations

401

env.setParallelism(8);

402

403

DataSource<MyData> data = savepoint.readKeyedState("operator", readerFunction);

404

405

// Can override parallelism for specific operations

406

data.setParallelism(4).map(processData).print();

407

```

408

409

### Memory Management

410

411

```java

412

// For large state, consider processing in batches

413

public class BatchingStateReader extends KeyedStateReaderFunction<String, List<MyData>> {

414

private ListState<MyData> listState;

415

416

@Override

417

public void readKey(String key, Context ctx, Collector<List<MyData>> out) throws Exception {

418

List<MyData> batch = new ArrayList<>();

419

int batchSize = 0;

420

421

for (MyData item : listState.get()) {

422

batch.add(item);

423

batchSize++;

424

425

// Emit batch when it reaches size limit

426

if (batchSize >= 1000) {

427

out.collect(new ArrayList<>(batch));

428

batch.clear();

429

batchSize = 0;

430

}

431

}

432

433

// Emit remaining items

434

if (!batch.isEmpty()) {

435

out.collect(batch);

436

}

437

}

438

}

439

```