or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

function-interfaces.md, index.md, savepoint-management.md, state-reading.md, state-writing.md, window-operations.md

docs/function-interfaces.md

0

# Function Interfaces

1

2

Function interfaces define the contracts for processing state data in the State Processor API. These interfaces allow you to implement custom logic for reading from and writing to Flink savepoints.

3

4

## State Bootstrap Functions

5

6

### StateBootstrapFunction

7

8

Base function for bootstrapping non-keyed operator state.

9

10

```java { .api }

11

public abstract class StateBootstrapFunction<IN> extends AbstractRichFunction

12

implements CheckpointedFunction {

13

public abstract void processElement(IN value, Context ctx) throws Exception;

14

15

public interface Context {

16

long currentProcessingTime();

17

}

18

}

19

```

20

21

**Lifecycle:**

22

1. `initializeState(FunctionInitializationContext)` / `open(Configuration)` - Register operator state and initialize resources

23

2. `processElement(IN, Context)` - Process each input element

24

3. `close()` - Cleanup resources

25

26

**Usage Example:**

27

28

```java

29

public class MetricsBootstrapFunction extends StateBootstrapFunction<MetricEvent> {

30

private ValueState<Long> totalCountState;

31

private ValueState<Double> averageState;

32

33

@Override

34

public void open(Configuration parameters) throws Exception {

35

super.open(parameters);

36

37

// Register state descriptors

38

totalCountState = getRuntimeContext().getState(

39

new ValueStateDescriptor<>("totalCount", Long.class)

40

);

41

averageState = getRuntimeContext().getState(

42

new ValueStateDescriptor<>("average", Double.class)

43

);

44

}

45

46

@Override

47

public void processElement(MetricEvent event, Context ctx) throws Exception {

48

// Update total count

49

Long currentCount = totalCountState.value();

50

totalCountState.update((currentCount != null ? currentCount : 0L) + 1);

51

52

// Update running average

53

Double currentAvg = averageState.value();

54

double newAvg = calculateRunningAverage(currentAvg, event.getValue());

55

averageState.update(newAvg);

56

57

// Access context information

58

System.out.println("Processing time: " + ctx.currentProcessingTime());

59

System.out.println("Watermark: " + ctx.currentWatermark());

60

}

61

62

private double calculateRunningAverage(Double currentAvg, double newValue) {

63

return currentAvg != null ? (currentAvg + newValue) / 2.0 : newValue;

64

}

65

}

66

```

67

68

### KeyedStateBootstrapFunction

69

70

Function for bootstrapping keyed state with access to key context and timers.

71

72

```java { .api }

73

public abstract class KeyedStateBootstrapFunction<K, IN> extends AbstractRichFunction {

74

public abstract void processElement(IN value, Context ctx) throws Exception;

75

76

public abstract class Context {

77

public abstract TimerService timerService();

78

public abstract K getCurrentKey();

79

}

80

}

81

```

82

83

**Key Features:**

84

- Access to current key via `ctx.getCurrentKey()`

85

- Timer service for registering event-time and processing-time timers

86

- Keyed state automatically partitioned by key

87

88

**Usage Example:**

89

90

```java

91

public class UserSessionBootstrapFunction extends KeyedStateBootstrapFunction<String, UserActivity> {

92

private ValueState<UserSession> sessionState;

93

private ListState<String> activityLogState;

94

95

@Override

96

public void open(Configuration parameters) throws Exception {

97

super.open(parameters);

98

99

sessionState = getRuntimeContext().getState(

100

new ValueStateDescriptor<>("session", UserSession.class)

101

);

102

activityLogState = getRuntimeContext().getListState(

103

new ListStateDescriptor<>("activityLog", String.class)

104

);

105

}

106

107

@Override

108

public void processElement(UserActivity activity, Context ctx) throws Exception {

109

String userId = ctx.getCurrentKey();

110

111

// Update or create user session

112

UserSession session = sessionState.value();

113

if (session == null) {

114

session = new UserSession(userId, activity.getTimestamp());

115

}

116

session.addActivity(activity);

117

sessionState.update(session);

118

119

// Add to activity log

120

activityLogState.add(activity.getAction() + ":" + activity.getTimestamp());

121

122

// Set session timeout timer

123

ctx.timerService().registerEventTimeTimer(

124

activity.getTimestamp() + Duration.ofMinutes(30).toMillis()

125

);

126

127

System.out.println("Updated session for user: " + userId);

128

}

129

}

130

```

131

132

### BroadcastStateBootstrapFunction

133

134

Function for bootstrapping broadcast state that is replicated across all operator instances.

135

136

```java { .api }

137

public abstract class BroadcastStateBootstrapFunction<IN> extends AbstractRichFunction {

138

public abstract void processElement(IN value, Context ctx) throws Exception;

139

140

public interface Context {

141

long currentProcessingTime();

142

<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> descriptor);

143

}

144

}

145

```

146

147

**Usage Example:**

148

149

```java

150

public class RulesBootstrapFunction extends BroadcastStateBootstrapFunction<BusinessRule> {

151

private MapStateDescriptor<String, BusinessRule> rulesDescriptor;

152

153

@Override

154

public void open(Configuration parameters) throws Exception {

155

super.open(parameters);

156

157

rulesDescriptor = new MapStateDescriptor<>(

158

"business-rules", String.class, BusinessRule.class

159

);

160

}

161

162

@Override

163

public void processElement(BusinessRule rule, Context ctx) throws Exception {

164

BroadcastState<String, BusinessRule> broadcastState =

165

getRuntimeContext().getBroadcastState(rulesDescriptor);

166

167

broadcastState.put(rule.getId(), rule);

168

169

System.out.println("Broadcasted rule: " + rule.getId());

170

}

171

}

172

```

173

174

## State Reader Functions

175

176

### KeyedStateReaderFunction

177

178

Function for reading keyed state from existing savepoints.

179

180

```java { .api }

181

public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {

182

public abstract void open(Configuration parameters) throws Exception;

183

184

public abstract void readKey(K key, Context ctx, Collector<OUT> out) throws Exception;

185

186

public interface Context {

187

Set<Long> registeredEventTimeTimers() throws Exception;

188

Set<Long> registeredProcessingTimeTimers() throws Exception;

189

}

190

}

191

```

192

193

**Key Features:**

194

- Must register all state descriptors in `open()` method

195

- Called once per key in the savepoint

196

- Access to registered timers for each key

197

- Can output zero or more results per key

198

199

**Usage Example:**

200

201

```java

202

public class UserAnalyticsReaderFunction extends KeyedStateReaderFunction<String, UserInsight> {

203

private ValueState<UserProfile> profileState;

204

private ListState<PurchaseEvent> purchaseHistoryState;

205

private MapState<String, Double> categorySpendingState;

206

207

@Override

208

public void open(Configuration parameters) throws Exception {

209

super.open(parameters);

210

211

// Must register ALL state descriptors here

212

profileState = getRuntimeContext().getState(

213

new ValueStateDescriptor<>("profile", UserProfile.class)

214

);

215

purchaseHistoryState = getRuntimeContext().getListState(

216

new ListStateDescriptor<>("purchases", PurchaseEvent.class)

217

);

218

categorySpendingState = getRuntimeContext().getMapState(

219

new MapStateDescriptor<>("categorySpending", String.class, Double.class)

220

);

221

}

222

223

@Override

224

public void readKey(String userId, Context ctx, Collector<UserInsight> out) throws Exception {

225

UserProfile profile = profileState.value();

226

if (profile == null) {

227

return; // No data for this user

228

}

229

230

// Collect purchase history

231

List<PurchaseEvent> purchases = new ArrayList<>();

232

purchaseHistoryState.get().forEach(purchases::add);

233

234

// Collect category spending

235

Map<String, Double> categorySpending = new HashMap<>();

236

for (Map.Entry<String, Double> entry : categorySpendingState.entries()) {

237

categorySpending.put(entry.getKey(), entry.getValue());

238

}

239

240

// Access timer information

241

Set<Long> eventTimers = ctx.registeredEventTimeTimers();

242

Set<Long> processingTimers = ctx.registeredProcessingTimeTimers();

243

244

// Create and emit insight

245

UserInsight insight = new UserInsight(

246

userId,

247

profile,

248

purchases,

249

categorySpending,

250

eventTimers,

251

processingTimers

252

);

253

254

out.collect(insight);

255

}

256

}

257

```

258

259

### WindowReaderFunction

260

261

Function for reading window state with access to window metadata.

262

263

```java { .api }

264

public abstract class WindowReaderFunction<IN, OUT, KEY, W extends Window>

265

extends AbstractRichFunction {

266

public abstract void readWindow(

267

KEY key,

268

Context<W> context,

269

Iterable<IN> elements,

270

Collector<OUT> out

271

) throws Exception;

272

273

public interface Context<W extends Window> extends java.io.Serializable {

274

W window();

275

<S extends State> S triggerState(StateDescriptor<S, ?> descriptor);

276

KeyedStateStore windowState();

277

KeyedStateStore globalState();

278

Set<Long> registeredEventTimeTimers() throws Exception;

279

Set<Long> registeredProcessingTimeTimers() throws Exception;

280

}

281

}

282

```

283

284

**Usage Example:**

285

286

```java

287

public class SessionWindowReaderFunction implements WindowReaderFunction<UserEvent, SessionSummary, String, TimeWindow> {

288

289

@Override

290

public void readWindow(

291

String userId,

292

Context context,

293

Iterable<UserEvent> events,

294

Collector<SessionSummary> out

295

) throws Exception {

296

297

TimeWindow window = context.window();

298

List<UserEvent> eventList = new ArrayList<>();

299

events.forEach(eventList::add);

300

301

if (!eventList.isEmpty()) {

302

SessionSummary summary = new SessionSummary(

303

userId,

304

window.getStart(),

305

window.getEnd(),

306

eventList.size(),

307

eventList

308

);

309

310

out.collect(summary);

311

}

312

}

313

}

314

```

315

316

## Utility Functions

317

318

### Timestamper

319

320

Interface for assigning timestamps to elements during bootstrap.

321

322

```java { .api }

323

@FunctionalInterface

324

public interface Timestamper<T> extends Function {

325

long timestamp(T element);

326

}

327

```

328

329

**Usage Example:**

330

331

```java

332

public class EventTimestamper implements Timestamper<Event> {

333

@Override

334

public long timestamp(Event event) {

335

return event.getEventTime();

336

}

337

}

338

339

// Use with bootstrap transformation

340

BootstrapTransformation<Event> transformation = OperatorTransformation

341

.bootstrapWith(events)

342

.keyBy(Event::getUserId)

343

.assignTimestamps(new EventTimestamper()) // Custom timestamper

344

.transform(new EventBootstrapFunction());

345

```

346

347

## Advanced Function Patterns

348

349

### Multi-State Function

350

351

```java

352

public class ComplexStateBootstrapFunction extends KeyedStateBootstrapFunction<String, ComplexEvent> {

353

// Multiple state types

354

private ValueState<String> statusState;

355

private ListState<String> historyState;

356

private MapState<String, Integer> countersState;

357

private ReducingState<Double> sumState;

358

private AggregatingState<Double, Double, Double> avgState;

359

360

@Override

361

public void open(Configuration parameters) throws Exception {

362

super.open(parameters);

363

364

// Value state

365

statusState = getRuntimeContext().getState(

366

new ValueStateDescriptor<>("status", String.class)

367

);

368

369

// List state

370

historyState = getRuntimeContext().getListState(

371

new ListStateDescriptor<>("history", String.class)

372

);

373

374

// Map state

375

countersState = getRuntimeContext().getMapState(

376

new MapStateDescriptor<>("counters", String.class, Integer.class)

377

);

378

379

// Reducing state

380

sumState = getRuntimeContext().getReducingState(

381

new ReducingStateDescriptor<>("sum", Double::sum, Double.class)

382

);

383

384

// Aggregating state

385

avgState = getRuntimeContext().getAggregatingState(

386

new AggregatingStateDescriptor<>("avg", new AverageAggregator(), Double.class)

387

);

388

}

389

390

@Override

391

public void processElement(ComplexEvent event, Context ctx) throws Exception {

392

// Update all state types

393

statusState.update(event.getStatus());

394

historyState.add(event.getAction());

395

396

String category = event.getCategory();

397

Integer count = countersState.get(category);

398

countersState.put(category, (count != null ? count : 0) + 1);

399

400

sumState.add(event.getValue());

401

avgState.add(event.getValue());

402

403

// Set timer if needed

404

if (event.needsCleanup()) {

405

ctx.timerService().registerEventTimeTimer(

406

event.getTimestamp() + Duration.ofHours(1).toMillis()

407

);

408

}

409

}

410

}

411

```

412

413

### Conditional Processing Function

414

415

```java

416

public class ConditionalReaderFunction extends KeyedStateReaderFunction<String, FilteredResult> {

417

private ValueState<UserData> userDataState;

418

private ListState<Transaction> transactionState;

419

private final Predicate<UserData> userFilter;

420

private final Predicate<Transaction> transactionFilter;

421

422

public ConditionalReaderFunction(

423

Predicate<UserData> userFilter,

424

Predicate<Transaction> transactionFilter

425

) {

426

this.userFilter = userFilter;

427

this.transactionFilter = transactionFilter;

428

}

429

430

@Override

431

public void open(Configuration parameters) throws Exception {

432

super.open(parameters);

433

434

userDataState = getRuntimeContext().getState(

435

new ValueStateDescriptor<>("userData", UserData.class)

436

);

437

transactionState = getRuntimeContext().getListState(

438

new ListStateDescriptor<>("transactions", Transaction.class)

439

);

440

}

441

442

@Override

443

public void readKey(String key, Context ctx, Collector<FilteredResult> out) throws Exception {

444

UserData userData = userDataState.value();

445

446

// Apply user filter

447

if (userData == null || !userFilter.test(userData)) {

448

return;

449

}

450

451

// Filter transactions

452

List<Transaction> filteredTransactions = new ArrayList<>();

453

for (Transaction transaction : transactionState.get()) {

454

if (transactionFilter.test(transaction)) {

455

filteredTransactions.add(transaction);

456

}

457

}

458

459

if (!filteredTransactions.isEmpty()) {

460

FilteredResult result = new FilteredResult(key, userData, filteredTransactions);

461

out.collect(result);

462

}

463

}

464

}

465

```

466

467

## Error Handling in Functions

468

469

### Robust Bootstrap Function

470

471

```java

472

public class RobustBootstrapFunction extends KeyedStateBootstrapFunction<String, DataEvent> {

473

private ValueState<String> dataState;

474

private static final Logger LOG = LoggerFactory.getLogger(RobustBootstrapFunction.class);

475

476

@Override

477

public void open(Configuration parameters) throws Exception {

478

super.open(parameters);

479

480

try {

481

dataState = getRuntimeContext().getState(

482

new ValueStateDescriptor<>("data", String.class)

483

);

484

} catch (Exception e) {

485

LOG.error("Failed to initialize state", e);

486

throw new RuntimeException("State initialization failed", e);

487

}

488

}

489

490

@Override

491

public void processElement(DataEvent event, Context ctx) throws Exception {

492

try {

493

// Validate input

494

if (event == null || event.getData() == null) {

495

LOG.warn("Received null event or data for key: {}", ctx.getCurrentKey());

496

return;

497

}

498

499

// Process event

500

String currentData = dataState.value();

501

String newData = combineData(currentData, event.getData());

502

dataState.update(newData);

503

504

} catch (Exception e) {

505

LOG.error("Failed to process event for key: {}", ctx.getCurrentKey(), e);

506

// Decide whether to re-throw or continue

507

// throw new RuntimeException("Processing failed", e);

508

}

509

}

510

511

private String combineData(String current, String newData) {

512

return current != null ? current + "," + newData : newData;

513

}

514

}

515

```

516

517

### Safe Reader Function

518

519

```java

520

public class SafeReaderFunction extends KeyedStateReaderFunction<String, SafeResult> {

521

private ValueState<String> dataState;

522

private static final Logger LOG = LoggerFactory.getLogger(SafeReaderFunction.class);

523

524

@Override

525

public void open(Configuration parameters) throws Exception {

526

super.open(parameters);

527

528

dataState = getRuntimeContext().getState(

529

new ValueStateDescriptor<>("data", String.class)

530

);

531

}

532

533

@Override

534

public void readKey(String key, Context ctx, Collector<SafeResult> out) throws Exception {

535

try {

536

String data = dataState.value();

537

538

if (data != null) {

539

SafeResult result = new SafeResult(key, data, true);

540

out.collect(result);

541

} else {

542

LOG.debug("No data found for key: {}", key);

543

// Optionally emit a result indicating missing data

544

SafeResult result = new SafeResult(key, null, false);

545

out.collect(result);

546

}

547

548

} catch (Exception e) {

549

LOG.error("Failed to read state for key: {}", key, e);

550

// Emit error result instead of failing

551

SafeResult errorResult = new SafeResult(key, "ERROR: " + e.getMessage(), false);

552

out.collect(errorResult);

553

}

554

}

555

}