or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, connectors.md, event-time-watermarks.md, execution-jobs.md, functions-and-operators.md, index.md, state-management.md, type-system-serialization.md, utilities.md

state-management.mddocs/

0

# State Management

1

2

Apache Flink provides comprehensive state management capabilities for stateful stream processing applications. The state APIs enable functions to maintain state across events while providing fault tolerance through checkpointing and recovery mechanisms.

3

4

## State Types

5

6

### Value State

7

8

Store and update a single value per key.

9

10

```java { .api }

11

import org.apache.flink.api.common.state.ValueState;

12

import org.apache.flink.api.common.state.ValueStateDescriptor;

13

import org.apache.flink.api.common.functions.RichMapFunction;

14

import org.apache.flink.api.common.functions.OpenContext;

15

16

public class CountingMapFunction extends RichMapFunction<String, Tuple2<String, Integer>> {

17

private ValueState<Integer> countState;

18

19

@Override

20

public void open(OpenContext openContext) throws Exception {

21

// Create state descriptor

22

ValueStateDescriptor<Integer> descriptor =

23

new ValueStateDescriptor<>("count", Integer.class, 0);

24

25

// Get state handle

26

countState = getRuntimeContext().getState(descriptor);

27

}

28

29

@Override

30

public Tuple2<String, Integer> map(String value) throws Exception {

31

// Read current state

32

Integer currentCount = countState.value();

33

34

// Update state

35

currentCount++;

36

countState.update(currentCount);

37

38

return new Tuple2<>(value, currentCount);

39

}

40

}

41

```

42

43

### List State

44

45

Maintain a list of values per key.

46

47

```java { .api }

48

import org.apache.flink.api.common.state.ListState;

49

import org.apache.flink.api.common.state.ListStateDescriptor;

50

51

public class BufferingMapFunction extends RichMapFunction<String, List<String>> {

52

private ListState<String> bufferState;

53

54

@Override

55

public void open(OpenContext openContext) throws Exception {

56

ListStateDescriptor<String> descriptor =

57

new ListStateDescriptor<>("buffer", String.class);

58

bufferState = getRuntimeContext().getListState(descriptor);

59

}

60

61

@Override

62

public List<String> map(String value) throws Exception {

63

// Add to list state

64

bufferState.add(value);

65

66

// Read all values

67

List<String> allValues = new ArrayList<>();

68

for (String item : bufferState.get()) {

69

allValues.add(item);

70

}

71

72

// Clear state if buffer is full

73

if (allValues.size() > 100) {

74

bufferState.clear();

75

}

76

77

return allValues;

78

}

79

}

80

```

81

82

### Map State

83

84

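Maintain a map of user-defined key-value pairs for each key; individual entries can be read, written, and removed without touching the rest of the map.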

85

86

```java { .api }

87

import org.apache.flink.api.common.state.MapState;

88

import org.apache.flink.api.common.state.MapStateDescriptor;

89

90

public class UserSessionFunction extends RichMapFunction<Event, SessionInfo> {

91

private MapState<String, Long> sessionStartTimes;

92

93

@Override

94

public void open(OpenContext openContext) throws Exception {

95

MapStateDescriptor<String, Long> descriptor =

96

new MapStateDescriptor<>("sessions", String.class, Long.class);

97

sessionStartTimes = getRuntimeContext().getMapState(descriptor);

98

}

99

100

@Override

101

public SessionInfo map(Event event) throws Exception {

102

String sessionId = event.getSessionId();

103

104

// Check if session exists

105

if (!sessionStartTimes.contains(sessionId)) {

106

// New session

107

sessionStartTimes.put(sessionId, event.getTimestamp());

108

}

109

110

long startTime = sessionStartTimes.get(sessionId);

111

long duration = event.getTimestamp() - startTime;

112

113

// Remove expired sessions

114

Iterator<Map.Entry<String, Long>> iterator = sessionStartTimes.iterator();

115

while (iterator.hasNext()) {

116

Map.Entry<String, Long> entry = iterator.next();

117

if (event.getTimestamp() - entry.getValue() > 3600000) { // 1 hour

118

iterator.remove();

119

}

120

}

121

122

return new SessionInfo(sessionId, startTime, duration);

123

}

124

}

125

```

126

127

### Reducing State

128

129

Aggregate values using a reduce function.

130

131

```java { .api }

132

import org.apache.flink.api.common.state.ReducingState;

133

import org.apache.flink.api.common.state.ReducingStateDescriptor;

134

import org.apache.flink.api.common.functions.ReduceFunction;

135

136

public class SumAccumulatorFunction extends RichMapFunction<Integer, Integer> {

137

private ReducingState<Integer> sumState;

138

139

@Override

140

public void open(OpenContext openContext) throws Exception {

141

ReducingStateDescriptor<Integer> descriptor =

142

new ReducingStateDescriptor<>(

143

"sum",

144

new ReduceFunction<Integer>() {

145

@Override

146

public Integer reduce(Integer value1, Integer value2) throws Exception {

147

return value1 + value2;

148

}

149

},

150

Integer.class

151

);

152

153

sumState = getRuntimeContext().getReducingState(descriptor);

154

}

155

156

@Override

157

public Integer map(Integer value) throws Exception {

158

// Add to reducing state

159

sumState.add(value);

160

161

// Get current sum

162

return sumState.get();

163

}

164

}

165

```

166

167

### Aggregating State

168

169

Use custom aggregate functions for complex aggregations.

170

171

```java { .api }

172

import org.apache.flink.api.common.state.AggregatingState;

173

import org.apache.flink.api.common.state.AggregatingStateDescriptor;

174

import org.apache.flink.api.common.functions.AggregateFunction;

175

176

public class AverageAccumulatorFunction extends RichMapFunction<Double, Double> {

177

private AggregatingState<Double, Double> avgState;

178

179

// Average aggregate function

180

public static class AverageAggregate implements AggregateFunction<Double, Tuple2<Double, Long>, Double> {

181

@Override

182

public Tuple2<Double, Long> createAccumulator() {

183

return new Tuple2<>(0.0, 0L);

184

}

185

186

@Override

187

public Tuple2<Double, Long> add(Double value, Tuple2<Double, Long> accumulator) {

188

return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1L);

189

}

190

191

@Override

192

public Double getResult(Tuple2<Double, Long> accumulator) {

193

return accumulator.f1 == 0 ? 0.0 : accumulator.f0 / accumulator.f1;

194

}

195

196

@Override

197

public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {

198

return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);

199

}

200

}

201

202

@Override

203

public void open(OpenContext openContext) throws Exception {

204

AggregatingStateDescriptor<Double, Tuple2<Double, Long>, Double> descriptor =

205

new AggregatingStateDescriptor<>("average", new AverageAggregate(),

206

TypeInformation.of(new TypeHint<Tuple2<Double, Long>>(){}));

207

208

avgState = getRuntimeContext().getAggregatingState(descriptor);

209

}

210

211

@Override

212

public Double map(Double value) throws Exception {

213

avgState.add(value);

214

return avgState.get();

215

}

216

}

217

```

218

219

## State TTL (Time To Live)

220

221

Configure automatic state cleanup based on time.

222

223

```java { .api }

224

import org.apache.flink.api.common.state.StateTtlConfig;

225

import org.apache.flink.api.common.time.Time;

226

227

public class TTLEnabledFunction extends RichMapFunction<String, String> {

228

private ValueState<String> ttlState;

229

230

@Override

231

public void open(OpenContext openContext) throws Exception {

232

// Configure state TTL

233

StateTtlConfig ttlConfig = StateTtlConfig

234

.newBuilder(Time.hours(1)) // TTL of 1 hour

235

.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

236

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

237

.cleanupFullSnapshot() // Cleanup on full snapshots

238

.cleanupIncrementally(10, true) // Incremental cleanup

239

.build();

240

241

ValueStateDescriptor<String> descriptor =

242

new ValueStateDescriptor<>("ttl-state", String.class);

243

descriptor.enableTimeToLive(ttlConfig);

244

245

ttlState = getRuntimeContext().getState(descriptor);

246

}

247

248

@Override

249

public String map(String value) throws Exception {

250

String currentValue = ttlState.value();

251

ttlState.update(value);

252

return currentValue != null ? currentValue : "first-value";

253

}

254

}

255

```

256

257

### Advanced TTL Configuration

258

259

```java { .api }

260

// Different cleanup strategies

261

StateTtlConfig incrementalCleanup = StateTtlConfig

262

.newBuilder(Time.minutes(30))

263

.cleanupIncrementally(5, true) // Clean 5 entries per access

264

.build();

265

266

StateTtlConfig fullSnapshotCleanup = StateTtlConfig

267

.newBuilder(Time.days(1))

268

.cleanupFullSnapshot() // Clean during full snapshots

269

.build();

270

271

StateTtlConfig rocksDBCleanup = StateTtlConfig

272

.newBuilder(Time.hours(2))

273

.cleanupInRocksdbCompactFilter(1000) // RocksDB compaction filter

274

.build();

275

276

// Combined cleanup strategies

277

StateTtlConfig combinedCleanup = StateTtlConfig

278

.newBuilder(Time.hours(6))

279

.cleanupIncrementally(10, true)

280

.cleanupFullSnapshot()

281

.cleanupInRocksdbCompactFilter(500)

282

.build();

283

```

284

285

## Operator State

286

287

Non-keyed state that belongs to a single parallel operator instance. Implement `CheckpointedFunction` to snapshot it on checkpoints and restore it on recovery.

288

289

```java { .api }

290

import org.apache.flink.api.common.state.ListState;

291

import org.apache.flink.api.common.state.ListStateDescriptor;

292

import org.apache.flink.runtime.state.FunctionInitializationContext;

293

import org.apache.flink.runtime.state.FunctionSnapshotContext;

294

import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

295

296

public class BufferingMapFunction extends RichMapFunction<String, String>

297

implements CheckpointedFunction {

298

299

private List<String> bufferedElements;

300

private ListState<String> checkpointedState;

301

302

@Override

303

public String map(String value) throws Exception {

304

bufferedElements.add(value);

305

306

if (bufferedElements.size() >= 10) {

307

String result = String.join(",", bufferedElements);

308

bufferedElements.clear();

309

return result;

310

}

311

312

return null; // Buffer not full yet

313

}

314

315

@Override

316

public void snapshotState(FunctionSnapshotContext context) throws Exception {

317

checkpointedState.clear();

318

for (String element : bufferedElements) {

319

checkpointedState.add(element);

320

}

321

}

322

323

@Override

324

public void initializeState(FunctionInitializationContext context) throws Exception {

325

ListStateDescriptor<String> descriptor =

326

new ListStateDescriptor<>("buffered-elements", String.class);

327

328

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

329

330

if (context.isRestored()) {

331

// Restore state after failure

332

bufferedElements = new ArrayList<>();

333

for (String element : checkpointedState.get()) {

334

bufferedElements.add(element);

335

}

336

} else {

337

bufferedElements = new ArrayList<>();

338

}

339

}

340

}

341

```

342

343

### Union List State

344

345

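A variant of operator list state with a different restore behavior. On restore, every parallel instance receives the complete list, that is, the union of all instances' state. Each instance must therefore select the elements it is responsible for; otherwise the state is duplicated across instances.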

346

347

```java { .api }

348

import org.apache.flink.api.common.state.ListState;

349

import org.apache.flink.runtime.state.FunctionInitializationContext;

350

351

public class RedistributingFunction extends RichMapFunction<Integer, Integer>

352

implements CheckpointedFunction {

353

354

private List<Integer> localBuffer;

355

private ListState<Integer> unionState;

356

357

@Override

358

public void initializeState(FunctionInitializationContext context) throws Exception {

359

ListStateDescriptor<Integer> descriptor =

360

new ListStateDescriptor<>("union-state", Integer.class);

361

362

// Union list state for redistribution during rescaling

363

unionState = context.getOperatorStateStore().getUnionListState(descriptor);

364

365

localBuffer = new ArrayList<>();

366

367

if (context.isRestored()) {

368

// All subtasks receive all state elements

369

for (Integer element : unionState.get()) {

370

localBuffer.add(element);

371

}

372

}

373

}

374

375

@Override

376

public Integer map(Integer value) throws Exception {

377

localBuffer.add(value);

378

379

// Process and return result

380

return localBuffer.size();

381

}

382

383

@Override

384

public void snapshotState(FunctionSnapshotContext context) throws Exception {

385

unionState.clear();

386

for (Integer element : localBuffer) {

387

unionState.add(element);

388

}

389

}

390

}

391

```

392

393

## Broadcast State

394

395

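Replicate state built from a broadcast stream (for example, rules) to all parallel instances. The state is writable in `processBroadcastElement` and read-only in `processElement`.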

396

397

```java { .api }

398

import org.apache.flink.api.common.state.MapStateDescriptor;

399

import org.apache.flink.api.common.state.BroadcastState;

400

import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;

401

402

public class RuleBasedProcessFunction extends BroadcastProcessFunction<Event, Rule, Alert> {

403

404

// Broadcast state descriptor

405

private static final MapStateDescriptor<String, Rule> RULE_STATE_DESCRIPTOR =

406

new MapStateDescriptor<>("rules", String.class, Rule.class);

407

408

@Override

409

public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out)

410

throws Exception {

411

412

// Read from broadcast state (read-only in processElement)

413

ReadOnlyBroadcastState<String, Rule> broadcastState =

414

ctx.getBroadcastState(RULE_STATE_DESCRIPTOR);

415

416

// Apply rules to event

417

for (Map.Entry<String, Rule> entry : broadcastState.immutableEntries()) {

418

Rule rule = entry.getValue();

419

if (rule.matches(event)) {

420

out.collect(new Alert(event, rule));

421

}

422

}

423

}

424

425

@Override

426

public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)

427

throws Exception {

428

429

// Update broadcast state (writable in processBroadcastElement)

430

BroadcastState<String, Rule> broadcastState =

431

ctx.getBroadcastState(RULE_STATE_DESCRIPTOR);

432

433

// Add or update rule

434

broadcastState.put(rule.getId(), rule);

435

}

436

}

437

```

438

439

## Queryable State

440

441

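Make state queryable from external applications. Note: Queryable State is deprecated in recent Flink releases; check the release notes of your Flink version before relying on it.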

442

443

```java { .api }

444

import org.apache.flink.api.common.state.ValueState;

445

import org.apache.flink.api.common.state.ValueStateDescriptor;

446

import org.apache.flink.queryablestate.client.QueryableStateClient;

447

448

public class QueryableStateFunction extends RichMapFunction<Tuple2<String, Integer>, String> {

449

private ValueState<Integer> queryableState;

450

451

@Override

452

public void open(OpenContext openContext) throws Exception {

453

ValueStateDescriptor<Integer> descriptor =

454

new ValueStateDescriptor<>("queryable-count", Integer.class, 0);

455

456

// Make state queryable

457

descriptor.setQueryable("count-query");

458

459

queryableState = getRuntimeContext().getState(descriptor);

460

}

461

462

@Override

463

public String map(Tuple2<String, Integer> value) throws Exception {

464

Integer currentCount = queryableState.value();

465

currentCount += value.f1;

466

queryableState.update(currentCount);

467

468

return "Updated count for " + value.f0 + ": " + currentCount;

469

}

470

}

471

472

// Client code to query state

473

public class StateQueryClient {

474

public static void queryState() throws Exception {

475

QueryableStateClient client = new QueryableStateClient("localhost", 9069);

476

477

CompletableFuture<ValueState<Integer>> future =

478

client.getKvState(

479

JobID.generate(),

480

"count-query",

481

"key",

482

BasicTypeInfo.STRING_TYPE_INFO,

483

new ValueStateDescriptor<>("queryable-count", Integer.class)

484

);

485

486

ValueState<Integer> state = future.get();

487

Integer count = state.value();

488

System.out.println("Current count: " + count);

489

490

client.close();

491

}

492

}

493

```

494

495

## Checkpoint Listeners

496

497

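React to checkpoint completion and abort notifications. An aborted checkpoint does not mean that data processed since the previous checkpoint should be discarded; the next successful checkpoint simply covers a longer span.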

498

499

```java { .api }

500

import org.apache.flink.api.common.state.CheckpointListener;

501

502

public class CheckpointAwareFunction extends RichMapFunction<String, String>

503

implements CheckpointListener {

504

505

private transient DatabaseConnection connection;

506

507

@Override

508

public void open(OpenContext openContext) throws Exception {

509

connection = new DatabaseConnection();

510

}

511

512

@Override

513

public String map(String value) throws Exception {

514

// Regular processing

515

return process(value);

516

}

517

518

@Override

519

public void notifyCheckpointComplete(long checkpointId) throws Exception {

520

// Checkpoint completed successfully

521

connection.commitTransaction();

522

System.out.println("Checkpoint " + checkpointId + " completed");

523

}

524

525

@Override

526

public void notifyCheckpointAborted(long checkpointId) throws Exception {

527

// Checkpoint was aborted

528

connection.rollbackTransaction();

529

System.out.println("Checkpoint " + checkpointId + " aborted");

530

}

531

}

532

```

533

534

## State Migration

535

536

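Handle state evolution during application updates. For POJO state, Flink supports adding and removing fields as long as the class name, including its package, stays the same. When old state is restored, newly added fields are initialized to their Java default values.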

537

538

```java { .api }

// Version 1 of the state
/**
 * Original (version 1) shape of the state object.
 *
 * <p>Public fields plus a public no-argument constructor make this a Flink POJO, which is the
 * serialization path that supports schema evolution.
 */
public class StateV1 {
    public String name;
    public int count;

    /** Public no-argument constructor, required by Flink's POJO serializer. */
    public StateV1() {}
}

547

// Version 2 of the state (evolved)

548

public class StateV2 {

549

public String name;

550

public int count;

551

public long timestamp; // New field

552

553

public StateV2(StateV1 oldState) {

554

this.name = oldState.name;

555

this.count = oldState.count;

556

this.timestamp = System.currentTimeMillis();

557

}

558

}

559

560

// State migration function

561

public class MigratableStateFunction extends RichMapFunction<String, String> {

562

private ValueState<StateV2> state;

563

564

@Override

565

public void open(OpenContext openContext) throws Exception {

566

ValueStateDescriptor<StateV2> descriptor =

567

new ValueStateDescriptor<>("evolved-state", StateV2.class);

568

569

state = getRuntimeContext().getState(descriptor);

570

}

571

572

@Override

573

public String map(String value) throws Exception {

574

StateV2 currentState = state.value();

575

576

if (currentState == null) {

577

currentState = new StateV2();

578

currentState.name = value;

579

currentState.count = 1;

580

currentState.timestamp = System.currentTimeMillis();

581

} else {

582

currentState.count++;

583

currentState.timestamp = System.currentTimeMillis();

584

}

585

586

state.update(currentState);

587

return currentState.toString();

588

}

589

}

590

```

591

592

## Best Practices

593

594

### State Design Patterns

595

596

```java { .api }

597

// Use appropriate state types for your use case

598

public class StatePatternExamples extends RichMapFunction<Event, Result> {

599

600

// Single values per key

601

private ValueState<String> lastValue;

602

603

// Collections that grow over time

604

private ListState<Event> eventHistory;

605

606

// Key-value mappings

607

private MapState<String, Counter> categoryCounters;

608

609

// Aggregations

610

private ReducingState<Long> totalSum;

611

612

@Override

613

public void open(OpenContext openContext) throws Exception {

614

// Configure TTL for all states

615

StateTtlConfig ttlConfig = StateTtlConfig

616

.newBuilder(Time.hours(24))

617

.cleanupIncrementally(5, true)

618

.build();

619

620

ValueStateDescriptor<String> lastValueDesc =

621

new ValueStateDescriptor<>("last-value", String.class);

622

lastValueDesc.enableTimeToLive(ttlConfig);

623

lastValue = getRuntimeContext().getState(lastValueDesc);

624

625

// Other state descriptors with TTL...

626

}

627

628

@Override

629

public Result map(Event event) throws Exception {

630

// Efficient state operations

631

String previous = lastValue.value();

632

lastValue.update(event.getValue());

633

634

return new Result(previous, event.getValue());

635

}

636

}

637

638

// Minimize state size

639

public class EfficientStateFunction extends RichMapFunction<Event, Summary> {

640

private ValueState<CompactSummary> compactState;

641

642

// Use compact data structures

643

public static class CompactSummary {

644

public long count;

645

public double sum;

646

public long lastTimestamp;

647

648

// Compact representation instead of storing all events

649

}

650

651

@Override

652

public void open(OpenContext openContext) throws Exception {

653

ValueStateDescriptor<CompactSummary> descriptor =

654

new ValueStateDescriptor<>("compact-summary", CompactSummary.class);

655

compactState = getRuntimeContext().getState(descriptor);

656

}

657

658

@Override

659

public Summary map(Event event) throws Exception {

660

CompactSummary summary = compactState.value();

661

if (summary == null) {

662

summary = new CompactSummary();

663

}

664

665

// Update compact state

666

summary.count++;

667

summary.sum += event.getValue();

668

summary.lastTimestamp = event.getTimestamp();

669

670

compactState.update(summary);

671

672

return new Summary(summary.count, summary.sum / summary.count);

673

}

674

}

675

```

676

677

Apache Flink's state management provides powerful capabilities for building stateful stream processing applications with fault tolerance guarantees. By understanding the different state types and following best practices, you can build efficient and reliable stateful applications.