
# State Types and Operations

The changelog state backend provides changelog-enabled wrappers for every keyed state type supported by Apache Flink: Value, List, Map, Reducing, and Aggregating state. It also wraps the key-grouped priority queues used by timers. Each wrapper delegates to the underlying state and transparently records every state mutation in the changelog.

## Capabilities

### Value State Operations

Value state stores a single value per key and namespace. It supports read, update, and clear operations, and updates and clears are logged to the changelog.

```java { .api }
/**
 * Changelog-enabled value state that logs all state changes.
 * Wraps Flink's InternalValueState with transparent logging.
 */
class ChangelogValueState<K, N, V> extends AbstractChangelogState<K, N, V, InternalValueState<K, N, V>>
        implements InternalValueState<K, N, V> {

    /**
     * Gets the current value for the current key and namespace.
     *
     * @return The current value, or null if no value is set
     * @throws IOException if value retrieval fails
     */
    public V value() throws IOException;

    /**
     * Updates the value for the current key and namespace.
     * The change is automatically logged to the changelog.
     *
     * @param value The new value to set
     * @throws IOException if value update fails
     */
    public void update(V value) throws IOException;

    /**
     * Clears the value for the current key and namespace.
     * The clear operation is logged to the changelog.
     */
    public void clear();

    /**
     * Static factory method for creating changelog value state instances.
     * Used internally by the state backend factory system.
     *
     * @param valueState The underlying value state to wrap
     * @return Changelog-enabled value state instance
     */
    static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> valueState);
}
```

**Usage Example:**

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;

// Inside a rich function on a keyed stream: typically obtain the state in open()
// and access it in processElement(). No code changes are needed for changelog logging.
ValueStateDescriptor<String> descriptor =
        new ValueStateDescriptor<>("my-value", String.class);
ValueState<String> valueState = getRuntimeContext().getState(descriptor);

// value() is a read; update() and clear() are logged to the changelog
String currentValue = valueState.value();
valueState.update("new value");
valueState.clear();
```
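The wrapping described above follows the decorator pattern. The minimal sketch below illustrates the idea in plain Java, without Flink. A hypothetical `LoggingValueState` forwards every call to an in-memory delegate and appends one entry per mutation to a list. Reads are not logged. The interface `SimpleValueState`, both classes, and the string entry format are illustrative assumptions, not Flink APIs, and namespaces are omitted for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal keyed value-state contract (not Flink's InternalValueState).
interface SimpleValueState<K, V> {
    void setCurrentKey(K key);
    V value();
    void update(V value);
    void clear();
}

// Plain in-memory state that plays the role of the wrapped ("delegated") state.
class HeapValueState<K, V> implements SimpleValueState<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private K currentKey;

    public void setCurrentKey(K key) { currentKey = key; }
    public V value() { return values.get(currentKey); }
    public void update(V value) { values.put(currentKey, value); }
    public void clear() { values.remove(currentKey); }
}

// Decorator: forwards every call to the delegate and logs each mutation.
class LoggingValueState<K, V> implements SimpleValueState<K, V> {
    private final SimpleValueState<K, V> delegate;
    private final List<String> changelog;
    private K currentKey;

    LoggingValueState(SimpleValueState<K, V> delegate, List<String> changelog) {
        this.delegate = delegate;
        this.changelog = changelog;
    }

    public void setCurrentKey(K key) {
        currentKey = key;
        delegate.setCurrentKey(key);
    }

    // Reads are answered by the delegate and are not logged.
    public V value() { return delegate.value(); }

    public void update(V value) {
        delegate.update(value);
        changelog.add("SET " + currentKey + "=" + value);
    }

    public void clear() {
        delegate.clear();
        changelog.add("CLEAR " + currentKey);
    }
}
```

The decorator implements the same interface as its delegate, so callers cannot tell the two apart. This is what lets logging be added without changes to user code.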

### List State Operations

List state maintains a list of values per key and namespace and supports add, update, and iteration. Mutations such as add, addAll, update, and clear are logged to the changelog.

```java { .api }
/**
 * Changelog-enabled list state that logs all list modifications.
 * Wraps Flink's InternalListState with transparent logging.
 */
class ChangelogListState<K, N, V> extends AbstractChangelogState<K, N, List<V>, InternalListState<K, N, V>>
        implements InternalListState<K, N, V> {

    /**
     * Gets the current list of values for the current key and namespace.
     *
     * @return Iterable over the current list values
     * @throws Exception if list retrieval fails
     */
    public Iterable<V> get() throws Exception;

    /**
     * Adds a value to the list for the current key and namespace.
     * The addition is logged to the changelog.
     *
     * @param value The value to add to the list
     * @throws Exception if value addition fails
     */
    public void add(V value) throws Exception;

    /**
     * Replaces the entire list with the provided values.
     * The update is logged to the changelog.
     *
     * @param values The new list of values
     * @throws Exception if list update fails
     */
    public void update(List<V> values) throws Exception;

    /**
     * Adds all values from the provided list to the current list.
     * Each addition is logged to the changelog.
     *
     * @param values The values to add
     * @throws Exception if addition fails
     */
    public void addAll(List<V> values) throws Exception;

    /**
     * Clears all values from the list.
     * The clear operation is logged to the changelog.
     */
    public void clear();

    /**
     * Updates the list internally with the provided values.
     * This is an internal method used by the state backend.
     *
     * @param valueToStore The list of values to store internally
     * @throws Exception if internal update fails
     */
    public void updateInternal(List<V> valueToStore) throws Exception;

    /**
     * Gets the internal list representation.
     * This is an internal method used by the state backend.
     *
     * @return The internal list of values
     * @throws Exception if internal retrieval fails
     */
    public List<V> getInternal() throws Exception;

    /**
     * Merges the state of the current key for the source namespaces into the
     * target namespace. This is used when windows are merged, for example
     * with session windows.
     *
     * @param target The target namespace to merge into
     * @param sources The source namespaces to merge from
     * @throws Exception if namespace merging fails
     */
    public void mergeNamespaces(N target, Collection<N> sources) throws Exception;

    /**
     * Static factory method for creating changelog list state instances.
     * Used internally by the state backend factory system.
     *
     * @param listState The underlying list state to wrap
     * @return Changelog-enabled list state instance
     */
    static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> listState);
}
```

**Usage Example:**

```java
import java.util.Arrays;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;

// Inside a rich function on a keyed stream
ListStateDescriptor<String> descriptor =
        new ListStateDescriptor<>("my-list", String.class);
ListState<String> listState = getRuntimeContext().getListState(descriptor);

// add() and update() are logged to the changelog; get() is a read
listState.add("item1");
listState.add("item2");
listState.update(Arrays.asList("new1", "new2", "new3")); // replaces the whole list

for (String item : listState.get()) {
    // Process each item
}
```
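To illustrate what `mergeNamespaces` does, the sketch below models list state for a single key as a map from namespace (for example, a window) to list. Merging appends each source list to the target and drops the source. `NamespacedListState` and its `MERGE` log entry are hypothetical. This document does not specify how the real wrapper records a merge.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical list state for a single key, partitioned by namespace (e.g. a window).
class NamespacedListState<N, V> {
    private final Map<N, List<V>> lists = new HashMap<>();
    private final List<String> changelog = new ArrayList<>();
    private N currentNamespace;

    void setCurrentNamespace(N namespace) { currentNamespace = namespace; }

    void add(V value) {
        lists.computeIfAbsent(currentNamespace, ns -> new ArrayList<>()).add(value);
        changelog.add("ADD " + currentNamespace + " " + value);
    }

    // Returns a copy of the list for the current namespace (empty if none).
    List<V> get() {
        return new ArrayList<>(lists.getOrDefault(currentNamespace, new ArrayList<>()));
    }

    // Appends the lists of all source namespaces to the target namespace,
    // removes the sources, and records the merge as one changelog entry.
    void mergeNamespaces(N target, Collection<N> sources) {
        List<V> merged = lists.computeIfAbsent(target, ns -> new ArrayList<>());
        for (N source : sources) {
            if (source.equals(target)) {
                continue; // the target already holds its own values
            }
            List<V> values = lists.remove(source);
            if (values != null) {
                merged.addAll(values);
            }
        }
        changelog.add("MERGE " + sources + " -> " + target);
    }

    List<String> changelog() { return changelog; }
}
```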

### Map State Operations

Map state maintains a map from user keys to user values per key and namespace and supports put, get, remove, and iteration. Each mutation (put, putAll, remove, clear) is logged to the changelog.

```java { .api }
/**
 * Changelog-enabled map state that logs all map modifications.
 * Wraps Flink's InternalMapState with transparent logging.
 */
class ChangelogMapState<K, N, UK, UV> extends AbstractChangelogState<K, N, Map<UK, UV>, InternalMapState<K, N, UK, UV>>
        implements InternalMapState<K, N, UK, UV> {

    /**
     * Gets the value for the specified map key.
     *
     * @param key The map key to look up
     * @return The value associated with the key, or null if not present
     * @throws Exception if value retrieval fails
     */
    public UV get(UK key) throws Exception;

    /**
     * Puts a key-value pair into the map.
     * The put operation is logged to the changelog.
     *
     * @param key The map key
     * @param value The value to associate with the key
     * @throws Exception if put operation fails
     */
    public void put(UK key, UV value) throws Exception;

    /**
     * Puts all entries from the provided map into the state map.
     * Each put operation is logged to the changelog.
     *
     * @param map The map containing entries to add
     * @throws Exception if put operations fail
     */
    public void putAll(Map<UK, UV> map) throws Exception;

    /**
     * Removes the entry for the specified key.
     * The remove operation is logged to the changelog.
     *
     * @param key The key to remove
     * @throws Exception if remove operation fails
     */
    public void remove(UK key) throws Exception;

    /**
     * Checks if the map contains the specified key.
     *
     * @param key The key to check
     * @return true if the key exists, false otherwise
     * @throws Exception if check operation fails
     */
    public boolean contains(UK key) throws Exception;

    /**
     * Gets all entries in the map.
     *
     * @return Iterable over all key-value entries
     * @throws Exception if entry retrieval fails
     */
    public Iterable<Map.Entry<UK, UV>> entries() throws Exception;

    /**
     * Gets all keys in the map.
     *
     * @return Iterable over all keys
     * @throws Exception if key retrieval fails
     */
    public Iterable<UK> keys() throws Exception;

    /**
     * Gets all values in the map.
     *
     * @return Iterable over all values
     * @throws Exception if value retrieval fails
     */
    public Iterable<UV> values() throws Exception;

    /**
     * Gets an iterator over all entries in the map.
     *
     * @return Iterator over key-value entries
     * @throws Exception if iterator creation fails
     */
    public Iterator<Map.Entry<UK, UV>> iterator() throws Exception;

    /**
     * Checks if the map is empty.
     *
     * @return true if the map contains no entries, false otherwise
     * @throws Exception if empty check fails
     */
    public boolean isEmpty() throws Exception;

    /**
     * Clears all entries from the map.
     * The clear operation is logged to the changelog.
     */
    public void clear();

    /**
     * Static factory method for creating changelog map state instances.
     * Used internally by the state backend factory system.
     *
     * @param mapState The underlying map state to wrap
     * @return Changelog-enabled map state instance
     */
    static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> mapState);
}
```

**Usage Example:**

```java
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;

// Inside a rich function on a keyed stream
MapStateDescriptor<String, Long> descriptor =
        new MapStateDescriptor<>("my-map", String.class, Long.class);
MapState<String, Long> mapState = getRuntimeContext().getMapState(descriptor);

// put() and remove() are logged to the changelog; get() and entries() are reads
mapState.put("key1", 100L);
mapState.put("key2", 200L);

Long value = mapState.get("key1"); // 100L
mapState.remove("key2");

for (Map.Entry<String, Long> entry : mapState.entries()) {
    // Process each entry
}
```
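The following sketch illustrates per-operation logging for map state, as described in the API comments above. Each `put` (including every entry of a `putAll`) and each `remove` produces its own changelog entry, while `get` and `contains` produce none. `LoggingMapState` and its entry strings are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical map state for one key and namespace; every mutation is logged separately.
class LoggingMapState<UK, UV> {
    private final Map<UK, UV> map = new HashMap<>();
    private final List<String> changelog = new ArrayList<>();

    // Reads: answered from the map, never logged.
    UV get(UK key) { return map.get(key); }
    boolean contains(UK key) { return map.containsKey(key); }

    void put(UK key, UV value) {
        map.put(key, value);
        changelog.add("PUT " + key + "=" + value);
    }

    // putAll is expressed as one logged put per entry.
    void putAll(Map<UK, UV> entries) {
        for (Map.Entry<UK, UV> entry : entries.entrySet()) {
            put(entry.getKey(), entry.getValue());
        }
    }

    void remove(UK key) {
        map.remove(key);
        changelog.add("REMOVE " + key);
    }

    void clear() {
        map.clear();
        changelog.add("CLEAR");
    }

    List<String> changelog() { return changelog; }
}
```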

### Reducing State Operations

Reducing state keeps a single value per key and namespace and combines it with each added element using a `ReduceFunction`. Every `add()` and `clear()` is logged to the changelog.

```java { .api }
/**
 * Changelog-enabled reducing state that logs all reduce operations.
 * Wraps Flink's InternalReducingState with transparent logging.
 */
class ChangelogReducingState<K, N, V> extends AbstractChangelogState<K, N, V, InternalReducingState<K, N, V>>
        implements InternalReducingState<K, N, V> {

    /**
     * Gets the current reduced value.
     *
     * @return The current reduced value, or null if no value has been added
     * @throws Exception if value retrieval fails
     */
    public V get() throws Exception;

    /**
     * Adds a value to the reducing state, applying the reduce function.
     * The resulting state change is logged to the changelog.
     *
     * @param value The value to add/reduce
     * @throws Exception if add operation fails
     */
    public void add(V value) throws Exception;

    /**
     * Clears the reducing state.
     * The clear operation is logged to the changelog.
     */
    public void clear();

    /**
     * Static factory method for creating changelog reducing state instances.
     * Used internally by the state backend factory system.
     *
     * @param reducingState The underlying reducing state to wrap
     * @return Changelog-enabled reducing state instance
     */
    static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> reducingState);
}
```

**Usage Example:**

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;

// Inside a rich function on a keyed stream
ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
        "sum-state",
        new ReduceFunction<Long>() {
            @Override
            public Long reduce(Long value1, Long value2) throws Exception {
                return value1 + value2;
            }
        },
        Long.class
);
ReducingState<Long> reducingState = getRuntimeContext().getReducingState(descriptor);

// add() is logged to the changelog; get() is a read
reducingState.add(10L);
reducingState.add(20L);
Long sum = reducingState.get(); // 30L if the state was empty for the current key
```
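A reducing-state wrapper only has to track the current reduced value. In the plain-Java sketch below, the hypothetical `LoggingReducingState` applies a `BinaryOperator` on each `add` and logs the resulting value. Logging the result rather than the input is an assumption of this sketch. An implementation could equally log the input and re-apply the reduce function on replay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical reducing state for one key and namespace.
class LoggingReducingState<V> {
    private final BinaryOperator<V> reduceFunction;
    private final List<String> changelog = new ArrayList<>();
    private V current; // null until the first add()

    LoggingReducingState(BinaryOperator<V> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    // Read: not logged.
    V get() { return current; }

    void add(V value) {
        current = (current == null) ? value : reduceFunction.apply(current, value);
        // Assumption of this sketch: the log records the reduced result.
        changelog.add("SET " + current);
    }

    void clear() {
        current = null;
        changelog.add("CLEAR");
    }

    List<String> changelog() { return changelog; }
}
```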

### Aggregating State Operations

Aggregating state keeps an accumulator per key and namespace and updates it with an `AggregateFunction`. `get()` returns the result computed from the accumulator. Every `add()` and `clear()` is logged to the changelog.

```java { .api }
/**
 * Changelog-enabled aggregating state that logs all aggregation operations.
 * Wraps Flink's InternalAggregatingState with transparent logging.
 */
class ChangelogAggregatingState<K, N, IN, ACC, OUT> extends AbstractChangelogState<K, N, ACC, InternalAggregatingState<K, N, IN, ACC, OUT>>
        implements InternalAggregatingState<K, N, IN, ACC, OUT> {

    /**
     * Gets the current aggregated result.
     *
     * @return The current aggregated output value
     * @throws Exception if value retrieval fails
     */
    public OUT get() throws Exception;

    /**
     * Adds an input value to the aggregating state.
     * The aggregation is performed and the state change is logged to the changelog.
     *
     * @param value The input value to aggregate
     * @throws Exception if add operation fails
     */
    public void add(IN value) throws Exception;

    /**
     * Clears the aggregating state.
     * The clear operation is logged to the changelog.
     */
    public void clear();

    /**
     * Static factory method for creating changelog aggregating state instances.
     * Used internally by the state backend factory system.
     *
     * @param aggregatingState The underlying aggregating state to wrap
     * @return Changelog-enabled aggregating state instance
     */
    static <T, K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> aggregatingState);
}
```

**Usage Example:**

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

// Inside a rich function on a keyed stream: running average of Double inputs,
// using (sum, count) as the accumulator
AggregatingStateDescriptor<Double, Tuple2<Double, Long>, Double> descriptor =
        new AggregatingStateDescriptor<>(
                "average-state",
                new AggregateFunction<Double, Tuple2<Double, Long>, Double>() {
                    @Override
                    public Tuple2<Double, Long> createAccumulator() {
                        return Tuple2.of(0.0, 0L);
                    }

                    @Override
                    public Tuple2<Double, Long> add(Double value, Tuple2<Double, Long> accumulator) {
                        return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                    }

                    @Override
                    public Double getResult(Tuple2<Double, Long> accumulator) {
                        return accumulator.f1 == 0 ? 0.0 : accumulator.f0 / accumulator.f1;
                    }

                    @Override
                    public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
                        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                    }
                },
                Types.TUPLE(Types.DOUBLE, Types.LONG));

AggregatingState<Double, Double> aggregatingState =
        getRuntimeContext().getAggregatingState(descriptor);

// add() is logged to the changelog; get() is a read
aggregatingState.add(10.0);
aggregatingState.add(20.0);
aggregatingState.add(30.0);
Double average = aggregatingState.get(); // 20.0 if the state was empty for the current key
```
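The class declaration above passes `ACC` as the value type to `AbstractChangelogState`. The state therefore holds the accumulator, and `get()` derives the output from it. The sketch below shows that split using a hypothetical `Aggregator` interface, which is a simplified stand-in for Flink's `AggregateFunction` without `merge`. It logs the accumulator after each `add`. All names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified stand-in for an aggregate function (no merge).
interface Aggregator<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
}

// Average of doubles; the accumulator is {sum, count}.
class AverageAggregator implements Aggregator<Double, double[], Double> {
    public double[] createAccumulator() { return new double[] {0.0, 0.0}; }
    public double[] add(Double value, double[] acc) { return new double[] {acc[0] + value, acc[1] + 1}; }
    public Double getResult(double[] acc) { return acc[1] == 0 ? 0.0 : acc[0] / acc[1]; }
}

// Hypothetical aggregating state: stores and logs ACC, derives OUT on read.
class LoggingAggregatingState<IN, ACC, OUT> {
    private final Aggregator<IN, ACC, OUT> function;
    private final List<ACC> changelog = new ArrayList<>();
    private ACC accumulator; // null means "no value yet"

    LoggingAggregatingState(Aggregator<IN, ACC, OUT> function) {
        this.function = function;
    }

    void add(IN value) {
        ACC base = (accumulator == null) ? function.createAccumulator() : accumulator;
        accumulator = function.add(value, base);
        changelog.add(accumulator); // the accumulator, not the output, is logged
    }

    // Read: computes the output from the accumulator; not logged.
    OUT get() { return accumulator == null ? null : function.getResult(accumulator); }

    List<ACC> changelog() { return changelog; }
}
```

Logging the accumulator rather than the output is what makes recovery possible, because the output of an aggregation such as an average cannot be turned back into its accumulator.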

### Priority Queue Operations

Key-grouped priority queues, used for example by timers, are wrapped in the same way. Every operation that changes the queue (add, addAll, poll, remove) is logged to the changelog.

```java { .api }
/**
 * Changelog-enabled priority queue that logs all queue operations.
 * Wraps Flink's KeyGroupedInternalPriorityQueue with transparent logging.
 */
class ChangelogKeyGroupedPriorityQueue<T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
        implements KeyGroupedInternalPriorityQueue<T> {

    /**
     * Adds an element to the priority queue.
     * The addition is logged to the changelog.
     *
     * @param element The element to add
     * @return true if the operation changed the head of the queue (or it is unclear
     *         whether it did); false only if the head is unchanged
     */
    public boolean add(T element);

    /**
     * Removes and returns the head of the queue, i.e. the first element in
     * priority order (for timers, the one with the earliest timestamp).
     * The removal is logged to the changelog.
     *
     * @return The head element, or null if the queue is empty
     */
    public T poll();

    /**
     * Returns the head of the queue without removing it.
     *
     * @return The head element, or null if the queue is empty
     */
    public T peek();

    /**
     * Removes the specified element from the queue.
     * The removal is logged to the changelog.
     *
     * @param element The element to remove
     * @return true if the operation changed the head of the queue (or it is unclear
     *         whether it did); false only if the head is unchanged
     */
    public boolean remove(T element);

    /**
     * Checks if the queue is empty.
     *
     * @return true if the queue contains no elements
     */
    public boolean isEmpty();

    /**
     * Gets the number of elements in the queue.
     *
     * @return The number of elements
     */
    public int size();

    /**
     * Adds all elements from the provided collection to the queue.
     * The additions are logged to the changelog.
     *
     * @param toAdd Collection of elements to add (may be null)
     */
    public void addAll(@Nullable Collection<? extends T> toAdd);

    /**
     * Gets the subset of elements that belong to a specific key group.
     * Used for key group-based partitioning.
     *
     * @param keyGroupId The key group identifier
     * @return Set of elements belonging to the key group
     */
    public Set<T> getSubsetForKeyGroup(int keyGroupId);

    /**
     * Gets an iterator over all elements in the queue.
     *
     * @return CloseableIterator for traversing queue elements
     */
    public CloseableIterator<T> iterator();
}
```
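In Flink's internal priority-queue interface, the boolean returned by `add` and `remove` signals whether the head of the queue changed. A timer service can use it, for example, to decide whether to reschedule its next wake-up. The sketch below is a hypothetical, timestamp-only stand-in built on `java.util.TreeSet`, so duplicates are ignored as in a set of timers. It logs each successful mutation and reports head changes exactly. It is not Flink's queue implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical timer queue ordered by timestamp (not Flink's queue classes).
class LoggingTimerQueue {
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> changelog = new ArrayList<>();

    // Read: the earliest timer, or null if empty; not logged.
    Long peek() { return timers.isEmpty() ? null : timers.first(); }

    // Returns true if the head changed, i.e. the new timer is now the earliest.
    boolean add(long timestamp) {
        Long oldHead = peek();
        if (!timers.add(timestamp)) {
            return false; // already contained: queue unchanged, nothing logged
        }
        changelog.add("ADD " + timestamp);
        return oldHead == null || timestamp < oldHead;
    }

    // Removes and returns the earliest timer, or null if empty.
    Long poll() {
        Long head = timers.pollFirst();
        if (head != null) {
            changelog.add("POLL " + head);
        }
        return head;
    }

    // Returns true if the removed timer was the head.
    boolean remove(long timestamp) {
        Long oldHead = peek();
        if (!timers.remove(timestamp)) {
            return false;
        }
        changelog.add("REMOVE " + timestamp);
        return oldHead == timestamp;
    }

    int size() { return timers.size(); }

    List<String> changelog() { return changelog; }
}
```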

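## Abstract State Base Class

The changelog keyed-state wrappers (Value, List, Map, Reducing, and Aggregating) extend a common abstract base class. It holds the delegated state and implements the methods shared by every state type: serializer access, namespace handling, and serialized-value lookups.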

```java { .api }
/**
 * Base class for the changelog keyed-state wrappers.
 * Provides common functionality for delegation and serialization.
 */
abstract class AbstractChangelogState<K, N, V, S extends InternalKvState<K, N, V>>
        implements InternalKvState<K, N, V> {

    /**
     * Gets the underlying delegated state instance.
     *
     * @return The wrapped state object
     */
    public S getDelegatedState();

    /**
     * Gets the key serializer.
     *
     * @return TypeSerializer for key type K
     */
    public TypeSerializer<K> getKeySerializer();

    /**
     * Gets the namespace serializer.
     *
     * @return TypeSerializer for namespace type N
     */
    public TypeSerializer<N> getNamespaceSerializer();

    /**
     * Gets the value serializer.
     *
     * @return TypeSerializer for value type V
     */
    public TypeSerializer<V> getValueSerializer();

    /**
     * Sets the current namespace for state operations.
     *
     * @param namespace The namespace to set as current
     */
    public void setCurrentNamespace(N namespace);

    /**
     * Gets the serialized value for the given serialized key and namespace.
     * This method may be called from threads other than the operator's, which
     * is why it takes serializers that are safe to use in that context.
     *
     * @param serializedKeyAndNamespace Serialized key and namespace
     * @param safeKeySerializer Key serializer that is safe to use in a multi-threaded context
     * @param safeNamespaceSerializer Namespace serializer that is safe to use in a multi-threaded context
     * @param safeValueSerializer Value serializer that is safe to use in a multi-threaded context
     * @return Serialized value bytes, or null if no value is associated with the key and namespace
     * @throws Exception if serialization fails
     */
    public byte[] getSerializedValue(
            byte[] serializedKeyAndNamespace,
            TypeSerializer<K> safeKeySerializer,
            TypeSerializer<N> safeNamespaceSerializer,
            TypeSerializer<V> safeValueSerializer
    ) throws Exception;
}
```
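`getSerializedValue` works purely on bytes. The caller passes the key and namespace in serialized form and receives the serialized value back, which suits callers outside the operator's thread such as queryable state. The sketch below illustrates that round trip with `String` keys, namespaces, and values encoded via `DataOutputStream.writeUTF`. The class `SerializedLookup`, this encoding, and returning `null` for a missing entry are assumptions made for illustration. They do not reflect Flink's serializers or wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical byte-level lookup: key -> namespace -> value, all Strings.
class SerializedLookup {
    private final Map<String, Map<String, String>> state = new HashMap<>();

    void put(String key, String namespace, String value) {
        state.computeIfAbsent(key, k -> new HashMap<>()).put(namespace, value);
    }

    // Caller side: encodes key and namespace into one byte array.
    static byte[] serializeKeyAndNamespace(String key, String namespace) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(key);
            out.writeUTF(namespace);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Caller side: decodes a value returned by getSerializedValue.
    static String deserializeValue(byte[] serializedValue) {
        try {
            return new DataInputStream(new ByteArrayInputStream(serializedValue)).readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // State side: decodes key and namespace, looks the value up, and returns it
    // serialized, or null if nothing is stored (an assumption of this sketch).
    byte[] getSerializedValue(byte[] serializedKeyAndNamespace) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(serializedKeyAndNamespace));
            String key = in.readUTF();
            String namespace = in.readUTF();
            String value = state.getOrDefault(key, Collections.emptyMap()).get(namespace);
            if (value == null) {
                return null;
            }
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```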

## State Factory System

The changelog state backend maps each state descriptor type to the `create` factory method of the matching changelog wrapper, and uses that mapping to wrap every state it creates.

```java { .api }
// Internal factory interface for creating changelog states
// (simplified: the signature mirrors the create(...) methods shown above)
interface StateFactory {
    <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> originalState);
}

// Factory mapping for different state types
private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES = Map.of(
        StateDescriptor.Type.VALUE, ChangelogValueState::create,
        StateDescriptor.Type.LIST, ChangelogListState::create,
        StateDescriptor.Type.REDUCING, ChangelogReducingState::create,
        StateDescriptor.Type.AGGREGATING, ChangelogAggregatingState::create,
        StateDescriptor.Type.MAP, ChangelogMapState::create);
```
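The dispatch itself is a plain map lookup. The sketch below mimics it with a hypothetical `StateType` enum, loosely modeled on `StateDescriptor.Type`, and uses string stand-ins for states. It fails fast when no wrapper is registered for a type. An `EnumMap` is used here as a compact choice for enum keys, and all names are illustrative.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical state-type enum, loosely modeled on StateDescriptor.Type.
enum StateType { UNKNOWN, VALUE, LIST, REDUCING, AGGREGATING, MAP }

// Dispatches each state type to the factory that wraps it; states are plain strings here.
class ChangelogWrapperRegistry {
    private final Map<StateType, UnaryOperator<String>> factories = new EnumMap<>(StateType.class);

    ChangelogWrapperRegistry() {
        factories.put(StateType.VALUE, s -> "ChangelogValueState(" + s + ")");
        factories.put(StateType.LIST, s -> "ChangelogListState(" + s + ")");
        factories.put(StateType.REDUCING, s -> "ChangelogReducingState(" + s + ")");
        factories.put(StateType.AGGREGATING, s -> "ChangelogAggregatingState(" + s + ")");
        factories.put(StateType.MAP, s -> "ChangelogMapState(" + s + ")");
    }

    // Looks up the factory for the type and fails fast if none is registered.
    String wrap(StateType type, String originalState) {
        UnaryOperator<String> factory = factories.get(type);
        if (factory == null) {
            throw new IllegalStateException("No changelog wrapper for state type " + type);
        }
        return factory.apply(originalState);
    }
}
```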

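## Changelog Integration

All state mutations are transparently logged to the changelog:

- **Automatic Logging**: State mutations are logged without changes to user code; functions keep using the standard Flink state API.
- **Operation Granularity**: Individual mutations (put, add, remove, clear, etc.) are logged as separate entries; reads such as `value()`, `get()`, and `contains()` are not logged.
- **Type Safety**: Changelog entries are written with the state's serializers, so they can be deserialized with the correct types on recovery.
- **Performance**: Each mutation incurs the extra cost of serializing a changelog entry; reads go directly to the wrapped state and add no logging overhead.
- **Consistency**: Every mutation is applied to the wrapped state and recorded in the changelog, so replaying the changelog on top of earlier materialized state reproduces the current state.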
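The consistency property can be made concrete. If every mutation is recorded in order, applying the recorded changes to an earlier snapshot must reproduce the live state. The sketch below checks this for a map-like state. `Change`, `ReplayableMapState`, and its `replay` method are hypothetical. Combining a snapshot with the subsequent log tail is meant only to resemble, in spirit, recovery from materialized state plus changelog.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical changelog entry describing one mutation.
final class Change {
    enum Op { PUT, REMOVE, CLEAR }

    final Op op;
    final String key;
    final Long value;

    Change(Op op, String key, Long value) {
        this.op = op;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical map-like state that applies each mutation and records it in order.
class ReplayableMapState {
    private final Map<String, Long> live = new HashMap<>();
    private final List<Change> changelog = new ArrayList<>();

    Long get(String key) { return live.get(key); } // read: not logged

    void put(String key, long value) {
        live.put(key, value);
        changelog.add(new Change(Change.Op.PUT, key, value));
    }

    void remove(String key) {
        live.remove(key);
        changelog.add(new Change(Change.Op.REMOVE, key, null));
    }

    void clear() {
        live.clear();
        changelog.add(new Change(Change.Op.CLEAR, null, null));
    }

    Map<String, Long> snapshot() { return new HashMap<>(live); }

    List<Change> changelog() { return changelog; }

    // Rebuilds state by applying the changes, in order, on top of a base snapshot.
    static Map<String, Long> replay(Map<String, Long> base, List<Change> changes) {
        Map<String, Long> state = new HashMap<>(base);
        for (Change change : changes) {
            switch (change.op) {
                case PUT:
                    state.put(change.key, change.value);
                    break;
                case REMOVE:
                    state.remove(change.key);
                    break;
                case CLEAR:
                    state.clear();
                    break;
            }
        }
        return state;
    }
}
```

Only the changes recorded after the snapshot need to be replayed. This is why a short log tail on top of a snapshot is enough to restore the state.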