0
# Function Interfaces
1
2
The function types described here (abstract base classes, plus the `Timestamper` interface) define the contracts for processing state data in the State Processor API. Extend or implement them to supply custom logic for reading from and writing to Flink savepoints.
3
4
## State Bootstrap Functions
5
6
### StateBootstrapFunction
7
8
Base function for bootstrapping non-keyed operator state.
9
10
```java { .api }
11
public abstract class StateBootstrapFunction<IN> extends AbstractRichFunction
12
implements CheckpointedFunction {
13
public abstract void processElement(IN value, Context ctx) throws Exception;
14
15
public interface Context {
16
long currentProcessingTime();
17
}
18
}
19
```
20
21
**Lifecycle:**
22
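1. `initializeState(FunctionInitializationContext)` - Register operator state (required by `CheckpointedFunction`)
2. `open(Configuration)` - Initialize other resources
3. `processElement(IN, Context)` - Process each input element
4. `snapshotState(FunctionSnapshotContext)` - Write the accumulated operator state (required by `CheckpointedFunction`)
5. `close()` - Cleanup resources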
25
26
**Usage Example:**
27
28
```java
29
public class MetricsBootstrapFunction extends StateBootstrapFunction<MetricEvent> {
30
private ValueState<Long> totalCountState;
31
private ValueState<Double> averageState;
32
33
@Override
34
public void open(Configuration parameters) throws Exception {
35
super.open(parameters);
36
37
// Register state descriptors
38
totalCountState = getRuntimeContext().getState(
39
new ValueStateDescriptor<>("totalCount", Long.class)
40
);
41
averageState = getRuntimeContext().getState(
42
new ValueStateDescriptor<>("average", Double.class)
43
);
44
}
45
46
@Override
47
public void processElement(MetricEvent event, Context ctx) throws Exception {
48
// Update total count
49
Long currentCount = totalCountState.value();
50
totalCountState.update((currentCount != null ? currentCount : 0L) + 1);
51
52
// Update running average
53
Double currentAvg = averageState.value();
54
double newAvg = calculateRunningAverage(currentAvg, event.getValue());
55
averageState.update(newAvg);
56
57
// Access context information
58
System.out.println("Processing time: " + ctx.currentProcessingTime());
59
System.out.println("Watermark: " + ctx.currentWatermark());
60
}
61
62
private double calculateRunningAverage(Double currentAvg, double newValue) {
63
return currentAvg != null ? (currentAvg + newValue) / 2.0 : newValue;
64
}
65
}
66
```
67
68
### KeyedStateBootstrapFunction
69
70
Function for bootstrapping keyed state with access to key context and timers.
71
72
```java { .api }
73
public abstract class KeyedStateBootstrapFunction<K, IN> extends AbstractRichFunction {
74
public abstract void processElement(IN value, Context ctx) throws Exception;
75
76
public abstract class Context {
77
public abstract TimerService timerService();
78
public abstract K getCurrentKey();
79
}
80
}
81
```
82
83
**Key Features:**
84
- Access to current key via `ctx.getCurrentKey()`
85
- Timer service for registering event-time and processing-time timers
86
- Keyed state automatically partitioned by key
87
88
**Usage Example:**
89
90
```java
91
public class UserSessionBootstrapFunction extends KeyedStateBootstrapFunction<String, UserActivity> {
92
private ValueState<UserSession> sessionState;
93
private ListState<String> activityLogState;
94
95
@Override
96
public void open(Configuration parameters) throws Exception {
97
super.open(parameters);
98
99
sessionState = getRuntimeContext().getState(
100
new ValueStateDescriptor<>("session", UserSession.class)
101
);
102
activityLogState = getRuntimeContext().getListState(
103
new ListStateDescriptor<>("activityLog", String.class)
104
);
105
}
106
107
@Override
108
public void processElement(UserActivity activity, Context ctx) throws Exception {
109
String userId = ctx.getCurrentKey();
110
111
// Update or create user session
112
UserSession session = sessionState.value();
113
if (session == null) {
114
session = new UserSession(userId, activity.getTimestamp());
115
}
116
session.addActivity(activity);
117
sessionState.update(session);
118
119
// Add to activity log
120
activityLogState.add(activity.getAction() + ":" + activity.getTimestamp());
121
122
// Set session timeout timer
123
ctx.timerService().registerEventTimeTimer(
124
activity.getTimestamp() + Duration.ofMinutes(30).toMillis()
125
);
126
127
System.out.println("Updated session for user: " + userId);
128
}
129
}
130
```
131
132
### BroadcastStateBootstrapFunction
133
134
Function for bootstrapping broadcast state that is replicated across all operator instances.
135
136
```java { .api }
137
public abstract class BroadcastStateBootstrapFunction<IN> extends AbstractRichFunction {
138
public abstract void processElement(IN value, Context ctx) throws Exception;
139
140
public interface Context {
141
long currentProcessingTime();
142
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> descriptor);
143
}
144
}
145
```
146
147
**Usage Example:**
148
149
```java
150
public class RulesBootstrapFunction extends BroadcastStateBootstrapFunction<BusinessRule> {
151
private MapStateDescriptor<String, BusinessRule> rulesDescriptor;
152
153
@Override
154
public void open(Configuration parameters) throws Exception {
155
super.open(parameters);
156
157
rulesDescriptor = new MapStateDescriptor<>(
158
"business-rules", String.class, BusinessRule.class
159
);
160
}
161
162
@Override
163
public void processElement(BusinessRule rule, Context ctx) throws Exception {
164
BroadcastState<String, BusinessRule> broadcastState =
165
getRuntimeContext().getBroadcastState(rulesDescriptor);
166
167
broadcastState.put(rule.getId(), rule);
168
169
System.out.println("Broadcasted rule: " + rule.getId());
170
}
171
}
172
```
173
174
## State Reader Functions
175
176
### KeyedStateReaderFunction
177
178
Function for reading keyed state from existing savepoints.
179
180
```java { .api }
181
public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {
182
public abstract void open(Configuration parameters) throws Exception;
183
184
public abstract void readKey(K key, Context ctx, Collector<OUT> out) throws Exception;
185
186
public interface Context {
187
Set<Long> registeredEventTimeTimers() throws Exception;
188
Set<Long> registeredProcessingTimeTimers() throws Exception;
189
}
190
}
191
```
192
193
**Key Features:**
194
- Must register all state descriptors in `open()` method
195
- Called once per key in the savepoint
196
- Access to registered timers for each key
197
- Can output zero or more results per key
198
199
**Usage Example:**
200
201
```java
202
public class UserAnalyticsReaderFunction extends KeyedStateReaderFunction<String, UserInsight> {
203
private ValueState<UserProfile> profileState;
204
private ListState<PurchaseEvent> purchaseHistoryState;
205
private MapState<String, Double> categorySpendingState;
206
207
@Override
208
public void open(Configuration parameters) throws Exception {
209
super.open(parameters);
210
211
// Must register ALL state descriptors here
212
profileState = getRuntimeContext().getState(
213
new ValueStateDescriptor<>("profile", UserProfile.class)
214
);
215
purchaseHistoryState = getRuntimeContext().getListState(
216
new ListStateDescriptor<>("purchases", PurchaseEvent.class)
217
);
218
categorySpendingState = getRuntimeContext().getMapState(
219
new MapStateDescriptor<>("categorySpending", String.class, Double.class)
220
);
221
}
222
223
@Override
224
public void readKey(String userId, Context ctx, Collector<UserInsight> out) throws Exception {
225
UserProfile profile = profileState.value();
226
if (profile == null) {
227
return; // No data for this user
228
}
229
230
// Collect purchase history
231
List<PurchaseEvent> purchases = new ArrayList<>();
232
purchaseHistoryState.get().forEach(purchases::add);
233
234
// Collect category spending
235
Map<String, Double> categorySpending = new HashMap<>();
236
for (Map.Entry<String, Double> entry : categorySpendingState.entries()) {
237
categorySpending.put(entry.getKey(), entry.getValue());
238
}
239
240
// Access timer information
241
Set<Long> eventTimers = ctx.registeredEventTimeTimers();
242
Set<Long> processingTimers = ctx.registeredProcessingTimeTimers();
243
244
// Create and emit insight
245
UserInsight insight = new UserInsight(
246
userId,
247
profile,
248
purchases,
249
categorySpending,
250
eventTimers,
251
processingTimers
252
);
253
254
out.collect(insight);
255
}
256
}
257
```
258
259
### WindowReaderFunction
260
261
Function for reading window state with access to window metadata.
262
263
```java { .api }
264
public abstract class WindowReaderFunction<IN, OUT, KEY, W extends Window>
265
extends AbstractRichFunction {
266
public abstract void readWindow(
267
KEY key,
268
Context<W> context,
269
Iterable<IN> elements,
270
Collector<OUT> out
271
) throws Exception;
272
273
public interface Context<W extends Window> extends java.io.Serializable {
274
W window();
275
<S extends State> S triggerState(StateDescriptor<S, ?> descriptor);
276
KeyedStateStore windowState();
277
KeyedStateStore globalState();
278
Set<Long> registeredEventTimeTimers() throws Exception;
279
Set<Long> registeredProcessingTimeTimers() throws Exception;
280
}
281
}
282
```
283
284
**Usage Example:**
285
286
```java
287
public class SessionWindowReaderFunction implements WindowReaderFunction<UserEvent, SessionSummary, String, TimeWindow> {
288
289
@Override
290
public void readWindow(
291
String userId,
292
Context context,
293
Iterable<UserEvent> events,
294
Collector<SessionSummary> out
295
) throws Exception {
296
297
TimeWindow window = context.window();
298
List<UserEvent> eventList = new ArrayList<>();
299
events.forEach(eventList::add);
300
301
if (!eventList.isEmpty()) {
302
SessionSummary summary = new SessionSummary(
303
userId,
304
window.getStart(),
305
window.getEnd(),
306
eventList.size(),
307
eventList
308
);
309
310
out.collect(summary);
311
}
312
}
313
}
314
```
315
316
## Utility Functions
317
318
### Timestamper
319
320
Interface for assigning timestamps to elements during bootstrap.
321
322
```java { .api }
323
@FunctionalInterface
324
public interface Timestamper<T> extends Function {
325
long timestamp(T element);
326
}
327
```
328
329
**Usage Example:**
330
331
```java
332
public class EventTimestamper implements Timestamper<Event> {
333
@Override
334
public long timestamp(Event event) {
335
return event.getEventTime();
336
}
337
}
338
339
// Use with bootstrap transformation
340
BootstrapTransformation<Event> transformation = OperatorTransformation
341
.bootstrapWith(events)
342
.keyBy(Event::getUserId)
343
.assignTimestamps(new EventTimestamper()) // Custom timestamper
344
.transform(new EventBootstrapFunction());
345
```
346
347
## Advanced Function Patterns
348
349
### Multi-State Function
350
351
```java
352
public class ComplexStateBootstrapFunction extends KeyedStateBootstrapFunction<String, ComplexEvent> {
353
// Multiple state types
354
private ValueState<String> statusState;
355
private ListState<String> historyState;
356
private MapState<String, Integer> countersState;
357
private ReducingState<Double> sumState;
358
private AggregatingState<Double, Double, Double> avgState;
359
360
@Override
361
public void open(Configuration parameters) throws Exception {
362
super.open(parameters);
363
364
// Value state
365
statusState = getRuntimeContext().getState(
366
new ValueStateDescriptor<>("status", String.class)
367
);
368
369
// List state
370
historyState = getRuntimeContext().getListState(
371
new ListStateDescriptor<>("history", String.class)
372
);
373
374
// Map state
375
countersState = getRuntimeContext().getMapState(
376
new MapStateDescriptor<>("counters", String.class, Integer.class)
377
);
378
379
// Reducing state
380
sumState = getRuntimeContext().getReducingState(
381
new ReducingStateDescriptor<>("sum", Double::sum, Double.class)
382
);
383
384
// Aggregating state
385
avgState = getRuntimeContext().getAggregatingState(
386
new AggregatingStateDescriptor<>("avg", new AverageAggregator(), Double.class)
387
);
388
}
389
390
@Override
391
public void processElement(ComplexEvent event, Context ctx) throws Exception {
392
// Update all state types
393
statusState.update(event.getStatus());
394
historyState.add(event.getAction());
395
396
String category = event.getCategory();
397
Integer count = countersState.get(category);
398
countersState.put(category, (count != null ? count : 0) + 1);
399
400
sumState.add(event.getValue());
401
avgState.add(event.getValue());
402
403
// Set timer if needed
404
if (event.needsCleanup()) {
405
ctx.timerService().registerEventTimeTimer(
406
event.getTimestamp() + Duration.ofHours(1).toMillis()
407
);
408
}
409
}
410
}
411
```
412
413
### Conditional Processing Function
414
415
```java
416
public class ConditionalReaderFunction extends KeyedStateReaderFunction<String, FilteredResult> {
417
private ValueState<UserData> userDataState;
418
private ListState<Transaction> transactionState;
419
private final Predicate<UserData> userFilter;
420
private final Predicate<Transaction> transactionFilter;
421
422
public ConditionalReaderFunction(
423
Predicate<UserData> userFilter,
424
Predicate<Transaction> transactionFilter
425
) {
426
this.userFilter = userFilter;
427
this.transactionFilter = transactionFilter;
428
}
429
430
@Override
431
public void open(Configuration parameters) throws Exception {
432
super.open(parameters);
433
434
userDataState = getRuntimeContext().getState(
435
new ValueStateDescriptor<>("userData", UserData.class)
436
);
437
transactionState = getRuntimeContext().getListState(
438
new ListStateDescriptor<>("transactions", Transaction.class)
439
);
440
}
441
442
@Override
443
public void readKey(String key, Context ctx, Collector<FilteredResult> out) throws Exception {
444
UserData userData = userDataState.value();
445
446
// Apply user filter
447
if (userData == null || !userFilter.test(userData)) {
448
return;
449
}
450
451
// Filter transactions
452
List<Transaction> filteredTransactions = new ArrayList<>();
453
for (Transaction transaction : transactionState.get()) {
454
if (transactionFilter.test(transaction)) {
455
filteredTransactions.add(transaction);
456
}
457
}
458
459
if (!filteredTransactions.isEmpty()) {
460
FilteredResult result = new FilteredResult(key, userData, filteredTransactions);
461
out.collect(result);
462
}
463
}
464
}
465
```
466
467
## Error Handling in Functions
468
469
### Robust Bootstrap Function
470
471
```java
472
public class RobustBootstrapFunction extends KeyedStateBootstrapFunction<String, DataEvent> {
473
private ValueState<String> dataState;
474
private static final Logger LOG = LoggerFactory.getLogger(RobustBootstrapFunction.class);
475
476
@Override
477
public void open(Configuration parameters) throws Exception {
478
super.open(parameters);
479
480
try {
481
dataState = getRuntimeContext().getState(
482
new ValueStateDescriptor<>("data", String.class)
483
);
484
} catch (Exception e) {
485
LOG.error("Failed to initialize state", e);
486
throw new RuntimeException("State initialization failed", e);
487
}
488
}
489
490
@Override
491
public void processElement(DataEvent event, Context ctx) throws Exception {
492
try {
493
// Validate input
494
if (event == null || event.getData() == null) {
495
LOG.warn("Received null event or data for key: {}", ctx.getCurrentKey());
496
return;
497
}
498
499
// Process event
500
String currentData = dataState.value();
501
String newData = combineData(currentData, event.getData());
502
dataState.update(newData);
503
504
} catch (Exception e) {
505
LOG.error("Failed to process event for key: {}", ctx.getCurrentKey(), e);
506
// Decide whether to re-throw or continue
507
// throw new RuntimeException("Processing failed", e);
508
}
509
}
510
511
private String combineData(String current, String newData) {
512
return current != null ? current + "," + newData : newData;
513
}
514
}
515
```
516
517
### Safe Reader Function
518
519
```java
520
public class SafeReaderFunction extends KeyedStateReaderFunction<String, SafeResult> {
521
private ValueState<String> dataState;
522
private static final Logger LOG = LoggerFactory.getLogger(SafeReaderFunction.class);
523
524
@Override
525
public void open(Configuration parameters) throws Exception {
526
super.open(parameters);
527
528
dataState = getRuntimeContext().getState(
529
new ValueStateDescriptor<>("data", String.class)
530
);
531
}
532
533
@Override
534
public void readKey(String key, Context ctx, Collector<SafeResult> out) throws Exception {
535
try {
536
String data = dataState.value();
537
538
if (data != null) {
539
SafeResult result = new SafeResult(key, data, true);
540
out.collect(result);
541
} else {
542
LOG.debug("No data found for key: {}", key);
543
// Optionally emit a result indicating missing data
544
SafeResult result = new SafeResult(key, null, false);
545
out.collect(result);
546
}
547
548
} catch (Exception e) {
549
LOG.error("Failed to read state for key: {}", key, e);
550
// Emit error result instead of failing
551
SafeResult errorResult = new SafeResult(key, "ERROR: " + e.getMessage(), false);
552
out.collect(errorResult);
553
}
554
}
555
}