or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdconnectors.mdcore-functions.mddatastream-traditional.mddatastream-v2.mdindex.mdstate-management.mdtable-api.mdwindowing.md

state-management.mddocs/

0

# State Management

1

2

Comprehensive state management API supporting both synchronous and asynchronous operations. Flink provides different types of state for various use cases including value state, list state, map state, and specialized aggregating states for stateful stream processing applications.

3

4

## Capabilities

5

6

### State Interfaces (Synchronous API)

7

8

Core state interfaces for traditional synchronous state access.

9

10

```java { .api }

11

/**

12

* Base interface for all state types

13

*/

14

interface State {

15

/**

16

* Clear the state

17

*/

18

void clear();

19

}

20

21

/**

22

* State that holds a single value

23

* @param <T> Type of the value

24

*/

25

interface ValueState<T> extends State {

26

/**

27

* Get current value

28

* @return Current value, null if not set

29

* @throws Exception

30

*/

31

T value() throws Exception;

32

33

/**

34

* Update the state value

35

* @param value New value

36

* @throws Exception

37

*/

38

void update(T value) throws Exception;

39

}

40

41

/**

42

* State that holds a list of elements

43

* @param <T> Type of elements in the list

44

*/

45

interface ListState<T> extends State {

46

/**

47

* Get all elements in the list

48

* @return Iterable over all elements

49

* @throws Exception

50

*/

51

Iterable<T> get() throws Exception;

52

53

/**

54

* Add element to the list

55

* @param value Element to add

56

* @throws Exception

57

*/

58

void add(T value) throws Exception;

59

60

/**

61

* Add all elements from iterable to the list

62

* @param values Elements to add

63

* @throws Exception

64

*/

65

void addAll(List<T> values) throws Exception;

66

67

/**

68

* Replace all elements in the list

69

* @param values New elements

70

* @throws Exception

71

*/

72

void update(List<T> values) throws Exception;

73

}

74

75

/**

76

* State that holds a key-value map

77

* @param <UK> Type of user keys

78

* @param <UV> Type of user values

79

*/

80

interface MapState<UK, UV> extends State {

81

/**

82

* Get value for the given key

83

* @param key User key

84

* @return Value for the key, null if not present

85

* @throws Exception

86

*/

87

UV get(UK key) throws Exception;

88

89

/**

90

* Associate value with key

91

* @param key User key

92

* @param value User value

93

* @throws Exception

94

*/

95

void put(UK key, UV value) throws Exception;

96

97

/**

98

* Add all key-value pairs from map

99

* @param map Key-value pairs to add

100

* @throws Exception

101

*/

102

void putAll(Map<UK, UV> map) throws Exception;

103

104

/**

105

* Remove key-value pair

106

* @param key Key to remove

107

* @throws Exception

108

*/

109

void remove(UK key) throws Exception;

110

111

/**

112

* Check if key exists

113

* @param key Key to check

114

* @return true if key exists

115

* @throws Exception

116

*/

117

boolean contains(UK key) throws Exception;

118

119

/**

120

* Get all entries

121

* @return Iterable over all entries

122

* @throws Exception

123

*/

124

Iterable<Map.Entry<UK, UV>> entries() throws Exception;

125

126

/**

127

* Get all keys

128

* @return Iterable over all keys

129

* @throws Exception

130

*/

131

Iterable<UK> keys() throws Exception;

132

133

/**

134

* Get all values

135

* @return Iterable over all values

136

* @throws Exception

137

*/

138

Iterable<UV> values() throws Exception;

139

140

/**

141

* Check if state is empty

142

* @return true if no entries

143

* @throws Exception

144

*/

145

boolean isEmpty() throws Exception;

146

}

147

148

/**

149

* State for pre-aggregating values using AggregateFunction

150

* @param <IN> Input type

151

* @param <OUT> Output type

152

*/

153

interface AggregatingState<IN, OUT> extends State {

154

/**

155

* Get current aggregated result

156

* @return Aggregated result

157

* @throws Exception

158

*/

159

OUT get() throws Exception;

160

161

/**

162

* Add value to aggregation

163

* @param value Value to add

164

* @throws Exception

165

*/

166

void add(IN value) throws Exception;

167

}

168

169

/**

170

* State that reduces values on-the-fly using ReduceFunction

171

* @param <T> Element type

172

*/

173

interface ReducingState<T> extends State {

174

/**

175

* Get current reduced result

176

* @return Reduced result

177

* @throws Exception

178

*/

179

T get() throws Exception;

180

181

/**

182

* Add value to reduction

183

* @param value Value to add

184

* @throws Exception

185

*/

186

void add(T value) throws Exception;

187

}

188

189

/**

190

* State for broadcast patterns - read-only for non-broadcast stream

191

* @param <K> Key type

192

* @param <V> Value type

193

*/

194

interface BroadcastState<K, V> extends State {

195

/**

196

* Get value for key (read-only for non-broadcast stream)

197

* @param key Key to lookup

198

* @return Value for key

199

* @throws Exception

200

*/

201

V get(K key) throws Exception;

202

203

/**

204

* Check if key exists (read-only for non-broadcast stream)

205

* @param key Key to check

206

* @return true if key exists

207

* @throws Exception

208

*/

209

boolean contains(K key) throws Exception;

210

211

/**

212

* Get all entries (read-only for non-broadcast stream)

213

* @return Iterable over all entries

214

* @throws Exception

215

*/

216

Iterable<Map.Entry<K, V>> entries() throws Exception;

217

218

/**

219

* Get all keys (read-only for non-broadcast stream)

220

* @return Iterable over all keys

221

* @throws Exception

222

*/

223

Iterable<K> keys() throws Exception;

224

225

/**

226

* Get all values (read-only for non-broadcast stream)

227

* @return Iterable over all values

228

* @throws Exception

229

*/

230

Iterable<V> values() throws Exception;

231

232

// Write operations available only in broadcast stream processing functions

233

234

/**

235

* Associate value with key (broadcast stream only)

236

* @param key Key

237

* @param value Value

238

* @throws Exception

239

*/

240

void put(K key, V value) throws Exception;

241

242

/**

243

* Add all key-value pairs (broadcast stream only)

244

* @param map Key-value pairs to add

245

* @throws Exception

246

*/

247

void putAll(Map<K, V> map) throws Exception;

248

249

/**

250

* Remove key-value pair (broadcast stream only)

251

* @param key Key to remove

252

* @throws Exception

253

*/

254

void remove(K key) throws Exception;

255

}

256

```

257

258

### State Descriptors

259

260

Descriptors for creating and configuring state variables.

261

262

```java { .api }

263

/**

264

* Base descriptor for state variables

265

* @param <S> State type

266

* @param <T> Value type

267

*/

268

abstract class StateDescriptor<S extends State, T> {

269

/**

270

* Get state name

271

* @return State name

272

*/

273

public String getName();

274

275

/**

276

* Get type information

277

* @return Type information

278

*/

279

public TypeInformation<T> getTypeInformation();

280

281

/**

282

* Set default value

283

* @param defaultValue Default value

284

*/

285

public void setDefaultValue(T defaultValue);

286

287

/**

288

* Get default value

289

* @return Default value

290

*/

291

public T getDefaultValue();

292

}

293

294

/**

295

* Descriptor for ValueState

296

* @param <T> Value type

297

*/

298

class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {

299

/**

300

* Create descriptor with name and type information

301

* @param name State name

302

* @param typeInfo Type information

303

*/

304

public ValueStateDescriptor(String name, TypeInformation<T> typeInfo);

305

306

/**

307

* Create descriptor with name and type class

308

* @param name State name

309

* @param typeClass Type class

310

*/

311

public ValueStateDescriptor(String name, Class<T> typeClass);

312

}

313

314

/**

315

* Descriptor for ListState

316

* @param <T> Element type

317

*/

318

class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T>> {

319

/**

320

* Create descriptor with name and element type information

321

* @param name State name

322

* @param elementTypeInfo Element type information

323

*/

324

public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo);

325

326

/**

327

* Create descriptor with name and element type class

328

* @param name State name

329

* @param elementTypeClass Element type class

330

*/

331

public ListStateDescriptor(String name, Class<T> elementTypeClass);

332

}

333

334

/**

335

* Descriptor for MapState

336

* @param <UK> User key type

337

* @param <UV> User value type

338

*/

339

class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {

340

/**

341

* Create descriptor with name and type information

342

* @param name State name

343

* @param keyTypeInfo Key type information

344

* @param valueTypeInfo Value type information

345

*/

346

public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo);

347

348

/**

349

* Create descriptor with name and type classes

350

* @param name State name

351

* @param keyTypeClass Key type class

352

* @param valueTypeClass Value type class

353

*/

354

public MapStateDescriptor(String name, Class<UK> keyTypeClass, Class<UV> valueTypeClass);

355

}

356

357

/**

358

* Descriptor for AggregatingState

359

* @param <IN> Input type

360

* @param <ACC> Accumulator type

361

* @param <OUT> Output type

362

*/

363

class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AggregatingState<IN, OUT>, ACC> {

364

/**

365

* Create descriptor with name, aggregate function, and accumulator type

366

* @param name State name

367

* @param aggFunction Aggregate function

368

* @param accTypeInfo Accumulator type information

369

*/

370

public AggregatingStateDescriptor(String name, AggregateFunction<IN, ACC, OUT> aggFunction, TypeInformation<ACC> accTypeInfo);

371

}

372

373

/**

374

* Descriptor for ReducingState

375

* @param <T> Element type

376

*/

377

class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>, T> {

378

/**

379

* Create descriptor with name, reduce function, and type information

380

* @param name State name

381

* @param reduceFunction Reduce function

382

* @param typeInfo Type information

383

*/

384

public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo);

385

}

386

```

387

388

### State v2 API (Asynchronous)

389

390

Next-generation state API supporting asynchronous operations for improved performance.

391

392

```java { .api }

393

/**

394

* Base interface for async state API

395

*/

396

interface org.apache.flink.api.common.state.v2.State {

397

/**

398

* Clear the state asynchronously

399

* @return Future representing completion

400

*/

401

StateFuture<Void> asyncClear();

402

}

403

404

/**

405

* Async single value state

406

* @param <T> Value type

407

*/

408

interface org.apache.flink.api.common.state.v2.ValueState<T> extends org.apache.flink.api.common.state.v2.State {

409

/**

410

* Get current value asynchronously

411

* @return Future with current value

412

*/

413

StateFuture<T> asyncValue();

414

415

/**

416

* Update state value asynchronously

417

* @param value New value

418

* @return Future representing completion

419

*/

420

StateFuture<Void> asyncUpdate(T value);

421

}

422

423

/**

424

* Async list state

425

* @param <T> Element type

426

*/

427

interface org.apache.flink.api.common.state.v2.ListState<T> extends org.apache.flink.api.common.state.v2.State {

428

/**

429

* Get all elements asynchronously

430

* @return Future with iterable over elements

431

*/

432

StateFuture<Iterable<T>> asyncGet();

433

434

/**

435

* Add element asynchronously

436

* @param value Element to add

437

* @return Future representing completion

438

*/

439

StateFuture<Void> asyncAdd(T value);

440

441

/**

442

* Update list asynchronously

443

* @param values New elements

444

* @return Future representing completion

445

*/

446

StateFuture<Void> asyncUpdate(List<T> values);

447

}

448

449

/**

450

* Async map state

451

* @param <UK> User key type

452

* @param <UV> User value type

453

*/

454

interface org.apache.flink.api.common.state.v2.MapState<UK, UV> extends org.apache.flink.api.common.state.v2.State {

455

/**

456

* Get value for key asynchronously

457

* @param key User key

458

* @return Future with value

459

*/

460

StateFuture<UV> asyncGet(UK key);

461

462

/**

463

* Put key-value pair asynchronously

464

* @param key User key

465

* @param value User value

466

* @return Future representing completion

467

*/

468

StateFuture<Void> asyncPut(UK key, UV value);

469

470

/**

471

* Remove key asynchronously

472

* @param key Key to remove

473

* @return Future representing completion

474

*/

475

StateFuture<Void> asyncRemove(UK key);

476

477

/**

478

* Check if key exists asynchronously

479

* @param key Key to check

480

* @return Future with boolean result

481

*/

482

StateFuture<Boolean> asyncContains(UK key);

483

484

/**

485

* Get all entries asynchronously

486

* @return Future with iterable over entries

487

*/

488

StateFuture<Iterable<Map.Entry<UK, UV>>> asyncEntries();

489

}

490

491

/**

492

* Future type for async state operations

493

* @param <T> Result type

494

*/

495

interface StateFuture<T> {

496

/**

497

* Apply function when future completes

498

* @param fn Function to apply

499

* @param <U> Function result type

500

* @return New future with function result

501

*/

502

<U> StateFuture<U> thenApply(Function<T, U> fn);

503

504

/**

505

* Compose with another async operation

506

* @param fn Function returning another future

507

* @param <U> Composed result type

508

* @return Future representing composed operation

509

*/

510

<U> StateFuture<U> thenCompose(Function<T, StateFuture<U>> fn);

511

512

/**

513

* Handle completion or exception

514

* @param fn Handler function

515

* @param <U> Handler result type

516

* @return Future with handler result

517

*/

518

<U> StateFuture<U> handle(BiFunction<T, Throwable, U> fn);

519

}

520

```

521

522

### Watermark Management

523

524

Watermark system for event time processing.

525

526

```java { .api }

527

/**

528

* Base interface for watermarks

529

*/

530

interface Watermark {

531

/**

532

* Get watermark timestamp

533

* @return Timestamp

534

*/

535

long getTimestamp();

536

537

/**

538

* Check if this is a special watermark (e.g., MAX_WATERMARK)

539

* @return true if special watermark

540

*/

541

boolean isSpecial();

542

}

543

544

/**

545

* Long-based watermark implementation

546

*/

547

class LongWatermark implements Watermark {

548

/**

549

* Create watermark with timestamp

550

* @param timestamp Watermark timestamp

551

*/

552

public LongWatermark(long timestamp);

553

554

@Override

555

public long getTimestamp();

556

557

@Override

558

public boolean isSpecial();

559

560

/** Maximum possible watermark value */

561

public static final LongWatermark MAX_WATERMARK;

562

}

563

564

/**

565

* Boolean-based watermark implementation

566

*/

567

class BoolWatermark implements Watermark {

568

/**

569

* Create boolean watermark

570

* @param value Boolean value

571

*/

572

public BoolWatermark(boolean value);

573

574

/**

575

* Get boolean value

576

* @return Boolean value

577

*/

578

public boolean getValue();

579

}

580

581

/**

582

* Interface for watermark management

583

*/

584

interface WatermarkManager {

585

/**

586

* Update watermarks

587

* @param watermarks New watermarks

588

*/

589

void updateWatermarks(Collection<Watermark> watermarks);

590

591

/**

592

* Get current combined watermark

593

* @return Current watermark

594

*/

595

Watermark getCurrentWatermark();

596

}

597

```