0
# Functions
1
2
User-defined functions (UDFs) allow you to implement custom processing logic in Flink. The Scala API provides various function interfaces for different use cases, from simple transformations to complex stateful processing.
3
4
## Capabilities
5
6
### Process Functions
7
8
Core process functions for custom stream processing logic.
9
10
```scala { .api }
11
/**
 * ProcessFunction for single stream processing.
 *
 * @tparam I type of the input elements
 * @tparam O type of the elements emitted through the collector
 */
trait ProcessFunction[I, O] {

  /** Handles one input element; results are emitted through `out`. */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /** Timer callback; the default implementation does nothing. */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = {}

  // The nested context types are referenced unqualified here: there is no
  // `ProcessFunction` companion object, so `ProcessFunction.Context` would not
  // resolve. From outside the trait use `ProcessFunction[I, O]#Context`.
  trait Context {
    def element(): I
    def timestamp(): Long
    def timerService(): TimerService
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context handed to `onTimer`; additionally exposes the firing timer's time domain. */
  trait OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}
30
31
/**
 * KeyedProcessFunction for keyed stream processing.
 *
 * @tparam K type of the key
 * @tparam I type of the input elements
 * @tparam O type of the elements emitted through the collector
 */
trait KeyedProcessFunction[K, I, O] {

  /** Handles one input element; results are emitted through `out`. */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /** Timer callback; the default implementation does nothing. */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = {}

  // The nested context types are referenced unqualified here: there is no
  // `KeyedProcessFunction` companion object, so `KeyedProcessFunction.Context`
  // would not resolve. From outside the trait use
  // `KeyedProcessFunction[K, I, O]#Context`.
  trait Context {
    def getCurrentKey: K
    def element(): I
    def timestamp(): Long
    def timerService(): TimerService
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context handed to `onTimer`; additionally exposes the firing timer's time domain. */
  trait OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}
51
```
52
53
**Usage Examples:**
54
55
```scala
56
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeDomain
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
60
61
/** One temperature measurement: the reporting sensor, the measured value and its timestamp. */
case class SensorReading(
  sensorId: String,
  temperature: Double,
  timestamp: Long
)
62
63
/**
 * Simple process function: emits an alert for every reading above 30.0 and
 * registers an event-time timer 10 seconds after each reading's timestamp.
 *
 * NOTE(review): Flink documents timers as available on keyed streams only, so
 * this function presumably has to be applied after `keyBy` — confirm.
 */
class TemperatureAlertFunction extends ProcessFunction[SensorReading, String] {

  // `Context` is nested inside `ProcessFunction`, so from Scala it is written
  // as a type projection; `ProcessFunction.Context` does not resolve.
  override def processElement(
    reading: SensorReading,
    ctx: ProcessFunction[SensorReading, String]#Context,
    out: Collector[String]
  ): Unit = {
    if (reading.temperature > 30.0) {
      out.collect(s"High temperature alert: ${reading.temperature}°C from ${reading.sensorId}")
    }

    // Set timer for 10 seconds later
    ctx.timerService().registerEventTimeTimer(reading.timestamp + 10000L)
  }

  /** Reports the timestamp of every timer that fires. */
  override def onTimer(
    timestamp: Long,
    ctx: ProcessFunction[SensorReading, String]#OnTimerContext,
    out: Collector[String]
  ): Unit = {
    out.collect(s"Timer fired at $timestamp")
  }
}
87
88
/**
 * Keyed process function with state: remembers the previous temperature per
 * key and reports when two consecutive readings differ by more than 5.0.
 */
class TemperatureMonitorFunction extends KeyedProcessFunction[String, SensorReading, String] {

  // Boxed java.lang.Double on purpose: with ValueState[Double] a missing value
  // is unboxed to 0.0, so "no previous reading" could not be told apart from a
  // real 0.0 and the first reading of every key would be compared against 0.0.
  private var lastTemperature: ValueState[java.lang.Double] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor =
      new ValueStateDescriptor[java.lang.Double]("lastTemp", classOf[java.lang.Double])
    lastTemperature = getRuntimeContext.getState(descriptor)
  }

  // `Context` is nested inside `KeyedProcessFunction`, so from Scala it is
  // written as a type projection rather than `KeyedProcessFunction.Context`.
  override def processElement(
    reading: SensorReading,
    ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
    out: Collector[String]
  ): Unit = {
    val currentTemp = reading.temperature

    // The first reading of a key has nothing to compare against and never alerts.
    Option(lastTemperature.value()).map(_.doubleValue).foreach { previousTemp =>
      if (math.abs(currentTemp - previousTemp) > 5.0) {
        out.collect(s"Temperature spike detected for ${ctx.getCurrentKey}: $previousTemp -> $currentTemp")
      }
    }

    lastTemperature.update(currentTemp)
  }
}
113
```
114
115
### Window Functions
116
117
Functions for processing windowed data.
118
119
```scala { .api }
120
/**
 * WindowFunction for processing window contents.
 *
 * @tparam IN  type of the elements in the window
 * @tparam OUT type of the emitted elements
 * @tparam KEY type of the key
 * @tparam W   type of the window
 */
trait WindowFunction[IN, OUT, KEY, W <: Window] {

  /** Processes the `input` of `window` for `key`, emitting results through `out`. */
  def apply(
    key: KEY,
    window: W,
    input: Iterable[IN],
    out: Collector[OUT]
  ): Unit
}
126
127
/**
 * ProcessWindowFunction with rich context.
 *
 * @tparam IN  type of the elements in the window
 * @tparam OUT type of the emitted elements
 * @tparam KEY type of the key
 * @tparam W   type of the window
 */
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window] {

  /** Processes the `elements` collected for `key`, emitting results through `out`. */
  def process(
    key: KEY,
    context: Context,
    elements: Iterable[IN],
    out: Collector[OUT]
  ): Unit

  /** Window metadata, time information, state stores and side output for one invocation. */
  trait Context {
    def window: W
    def currentProcessingTime: Long
    def currentWatermark: Long
    def windowState: KeyedStateStore
    def globalState: KeyedStateStore
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}
142
143
/**
 * AllWindowFunction for non-keyed windows.
 *
 * @tparam IN  type of the elements in the window
 * @tparam OUT type of the emitted elements
 * @tparam W   type of the window
 */
trait AllWindowFunction[IN, OUT, W <: Window] {

  /** Processes the `values` of `window`, emitting results through `out`. */
  def apply(
    window: W,
    values: Iterable[IN],
    out: Collector[OUT]
  ): Unit
}
149
150
/**
 * ProcessAllWindowFunction for non-keyed windows with rich context.
 *
 * @tparam IN  type of the elements in the window
 * @tparam OUT type of the emitted elements
 * @tparam W   type of the window
 */
trait ProcessAllWindowFunction[IN, OUT, W <: Window] {

  /** Processes the `elements` of the window, emitting results through `out`. */
  def process(
    context: Context,
    elements: Iterable[IN],
    out: Collector[OUT]
  ): Unit

  /** Window metadata, time information, state stores and side output for one invocation. */
  trait Context {
    def window: W
    def currentProcessingTime: Long
    def currentWatermark: Long
    def windowState: KeyedStateStore
    def globalState: KeyedStateStore
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}
165
```
166
167
**Usage Examples:**
168
169
```scala
170
import org.apache.flink.streaming.api.scala.function.{WindowFunction, ProcessWindowFunction}
171
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
172
173
/**
 * Simple window function: emits one summary line (count, average, minimum and
 * maximum temperature) for the readings of a sensor in a window.
 */
class TemperatureStatsFunction extends WindowFunction[SensorReading, String, String, TimeWindow] {

  override def apply(
    sensorId: String,
    window: TimeWindow,
    input: Iterable[SensorReading],
    out: Collector[String]
  ): Unit = {
    // Materialise the temperatures once and derive every statistic from them.
    val temps = input.toList.map(_.temperature)
    val count = temps.length
    val avgTemp = temps.sum / count
    val minTemp = temps.min
    val maxTemp = temps.max

    val summary =
      s"Sensor $sensorId: Window [${window.getStart}-${window.getEnd}], " +
        s"Count: $count, Avg: $avgTemp, Min: $minTemp, Max: $maxTemp"
    out.collect(summary)
  }
}
192
193
/**
 * Process window function with state: emits the statistics of each window
 * together with a running number of windows processed so far.
 */
class ProcessTemperatureStatsFunction
    extends ProcessWindowFunction[SensorReading, String, String, TimeWindow] {

  override def process(
    sensorId: String,
    context: Context,
    elements: Iterable[SensorReading],
    out: Collector[String]
  ): Unit = {
    val readings = elements.toList
    val stats = calculateStats(readings)

    // Access window metadata
    val windowStart = context.window.getStart
    val windowEnd = context.window.getEnd
    val watermark = context.currentWatermark

    // Cross-window information belongs in the global state. `windowState` is
    // scoped to the window being evaluated, so a counter kept there would not
    // carry over from one window to the next.
    val windowCountDescriptor = new ValueStateDescriptor[Long]("windowCount", classOf[Long])
    val windowCount = context.globalState.getState(windowCountDescriptor)
    val currentCount = Option(windowCount.value()).getOrElse(0L) + 1
    windowCount.update(currentCount)

    out.collect(s"Sensor $sensorId: Window #$currentCount [$windowStart-$windowEnd], " +
      s"Stats: $stats, Watermark: $watermark")
  }

  /** Formats count, average, minimum and maximum temperature of `readings`. */
  private def calculateStats(readings: List[SensorReading]): String = {
    val temps = readings.map(_.temperature)
    s"Count: ${temps.length}, Avg: ${temps.sum / temps.length}, Min: ${temps.min}, Max: ${temps.max}"
  }
}
225
```
226
227
### Rich Functions
228
229
Rich versions of functions with lifecycle methods and runtime context access.
230
231
```scala { .api }
232
/**
 * RichFunction base trait: lifecycle hooks plus access to the runtime context.
 */
trait RichFunction {

  /** Lifecycle hook; does nothing by default. */
  def open(parameters: Configuration): Unit = {}

  /** Lifecycle hook; does nothing by default. */
  def close(): Unit = {}

  def getRuntimeContext: RuntimeContext

  def setRuntimeContext(context: RuntimeContext): Unit

  def getIterationRuntimeContext: IterationRuntimeContext
}
242
243
/**
 * Rich process functions
 */
abstract class RichProcessFunction[I, O]
  extends ProcessFunction[I, O]
  with RichFunction

abstract class RichKeyedProcessFunction[K, I, O]
  extends KeyedProcessFunction[K, I, O]
  with RichFunction

/**
 * Rich window functions
 */
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
  extends WindowFunction[IN, OUT, KEY, W]
  with RichFunction

abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
  extends ProcessWindowFunction[IN, OUT, KEY, W]
  with RichFunction

abstract class RichAllWindowFunction[IN, OUT, W <: Window]
  extends AllWindowFunction[IN, OUT, W]
  with RichFunction

abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
  extends ProcessAllWindowFunction[IN, OUT, W]
  with RichFunction
256
```
257
258
**Usage Examples:**
259
260
```scala
261
import org.apache.flink.streaming.api.scala.function.RichProcessFunction
262
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
263
import org.apache.flink.metrics.Counter
264
import org.apache.flink.configuration.Configuration
265
266
/**
 * Rich process function that counts high temperatures in a metric, stores the
 * processing time of the latest element in state and reports which subtask
 * handled each element.
 *
 * NOTE(review): `getRuntimeContext.getState` is keyed state, so this function
 * presumably has to run on a keyed stream — confirm against the Flink docs.
 */
class RichTemperatureProcessor extends RichProcessFunction[SensorReading, String] {

  private var highTempCount: Counter = _
  private var lastProcessingTime: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // Initialize metrics
    highTempCount = getRuntimeContext
      .getMetricGroup
      .addGroup("temperature")
      .counter("high_temp_count")

    // Initialize state
    val descriptor = new ValueStateDescriptor[Long]("lastProcessingTime", classOf[Long])
    lastProcessingTime = getRuntimeContext.getState(descriptor)
  }

  override def close(): Unit = {
    // Cleanup resources if needed
  }

  // `Context` is nested inside `ProcessFunction`, so it is written as a type
  // projection; `ProcessFunction.Context` does not resolve from Scala.
  override def processElement(
    reading: SensorReading,
    ctx: ProcessFunction[SensorReading, String]#Context,
    out: Collector[String]
  ): Unit = {
    val currentTime = ctx.timerService().currentProcessingTime()

    if (reading.temperature > 30.0) {
      highTempCount.inc()
      out.collect(s"High temperature: ${reading.temperature}°C")
    }

    // Update processing time
    lastProcessingTime.update(currentTime)

    // Access runtime context information
    val subtaskIndex = getRuntimeContext.getIndexOfThisSubtask
    val parallelism = getRuntimeContext.getNumberOfParallelSubtasks

    out.collect(s"Processed by subtask $subtaskIndex/$parallelism at time $currentTime")
  }
}
310
```
311
312
### Stateful Functions
313
314
Functions that maintain state across elements.
315
316
```scala { .api }
317
/**
 * StatefulFunction marker trait
 */
trait StatefulFunction

/**
 * State descriptors for different state types. Each descriptor takes the state
 * name first, followed by the type information (and, where applicable, the
 * function) for that kind of state.
 */
class ValueStateDescriptor[T](
  name: String,
  typeClass: Class[T]
)

class ListStateDescriptor[T](
  name: String,
  typeClass: Class[T]
)

class MapStateDescriptor[UK, UV](
  name: String,
  keyClass: Class[UK],
  valueClass: Class[UV]
)

class ReducingStateDescriptor[T](
  name: String,
  reduceFunction: ReduceFunction[T],
  typeClass: Class[T]
)

class AggregatingStateDescriptor[IN, ACC, OUT](
  name: String,
  aggregateFunction: AggregateFunction[IN, ACC, OUT],
  accClass: Class[ACC]
)
334
```
335
336
**Usage Examples:**
337
338
```scala
339
import org.apache.flink.api.common.state._
340
import org.apache.flink.api.common.functions.ReduceFunction
341
342
/**
 * Keyed function demonstrating the four state kinds: a per-key counter
 * (value state), the words seen (list state), the timestamp of each word
 * (map state) and a running total (reducing state).
 */
class StatefulWordCounter extends RichKeyedProcessFunction[String, String, (String, Long)] {

  private var wordCount: ValueState[Long] = _
  private var wordHistory: ListState[String] = _
  private var wordTimestamps: MapState[String, Long] = _
  private var totalCount: ReducingState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // Value state
    wordCount = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("wordCount", classOf[Long])
    )

    // List state
    wordHistory = getRuntimeContext.getListState(
      new ListStateDescriptor[String]("wordHistory", classOf[String])
    )

    // Map state
    wordTimestamps = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Long]("wordTimestamps", classOf[String], classOf[Long])
    )

    // Reducing state
    totalCount = getRuntimeContext.getReducingState(
      new ReducingStateDescriptor[Long](
        "totalCount",
        new ReduceFunction[Long] {
          override def reduce(value1: Long, value2: Long): Long = value1 + value2
        },
        classOf[Long]
      )
    )
  }

  // `Context` is nested inside `KeyedProcessFunction`, so it is written as a
  // type projection; `KeyedProcessFunction.Context` does not resolve from Scala.
  override def processElement(
    word: String,
    ctx: KeyedProcessFunction[String, String, (String, Long)]#Context,
    out: Collector[(String, Long)]
  ): Unit = {
    // Update value state
    val currentCount = Option(wordCount.value()).getOrElse(0L) + 1
    wordCount.update(currentCount)

    // Update list state
    wordHistory.add(word)

    // Update map state. NOTE(review): in Flink the element timestamp is a boxed
    // value that may be null when no timestamps are assigned; unboxing null
    // would throw, so fall back to the current processing time in that case.
    val seenAt: Long =
      Option(ctx.timestamp()).fold(ctx.timerService().currentProcessingTime())(_.longValue)
    wordTimestamps.put(word, seenAt)

    // Update reducing state
    totalCount.add(1L)

    out.collect((word, currentCount))
  }
}
398
```
399
400
### Timer Service
401
402
Service for registering and managing timers in process functions.
403
404
```scala { .api }
405
/**
 * TimerService for managing timers: time queries plus registration and
 * deletion of processing-time and event-time timers.
 */
trait TimerService {

  // Time queries
  def currentProcessingTime(): Long
  def currentWatermark(): Long

  // Timer registration
  def registerProcessingTimeTimer(time: Long): Unit
  def registerEventTimeTimer(time: Long): Unit

  // Timer deletion
  def deleteProcessingTimeTimer(time: Long): Unit
  def deleteEventTimeTimer(time: Long): Unit
}
416
417
/**
 * TimeDomain enumeration: tells whether a timer is based on event time or on
 * processing time.
 */
// Scala enum cases need the `case` keyword; the bare `EVENT_TIME, PROCESSING_TIME`
// list is Java syntax and does not parse as Scala.
enum TimeDomain {
  case EVENT_TIME, PROCESSING_TIME
}
423
```
424
425
**Usage Examples:**
426
427
```scala
428
/**
 * Keyed function that stores the latest reading per key, registers an
 * event-time and a processing-time timer for every element and reports each
 * timer when it fires.
 */
class TimerBasedProcessor extends KeyedProcessFunction[String, SensorReading, String] {

  private var lastReading: ValueState[SensorReading] = _

  override def open(parameters: Configuration): Unit = {
    lastReading = getRuntimeContext.getState(
      new ValueStateDescriptor[SensorReading]("lastReading", classOf[SensorReading])
    )
  }

  // The context types are nested inside `KeyedProcessFunction`, so they are
  // written as type projections; `KeyedProcessFunction.Context` does not
  // resolve from Scala.
  override def processElement(
    reading: SensorReading,
    ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
    out: Collector[String]
  ): Unit = {
    // Store current reading
    lastReading.update(reading)

    // Register timer for 30 seconds later (event time)
    val timerTime = reading.timestamp + 30000L
    ctx.timerService().registerEventTimeTimer(timerTime)

    // Register processing time timer for 1 minute later
    val processingTimer = ctx.timerService().currentProcessingTime() + 60000L
    ctx.timerService().registerProcessingTimeTimer(processingTimer)

    out.collect(s"Processed reading: $reading")
  }

  override def onTimer(
    timestamp: Long,
    ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
    out: Collector[String]
  ): Unit = {
    val key = ctx.getCurrentKey
    // May be null once the state has been cleared; it is interpolated as-is below.
    val lastReadingValue = lastReading.value()

    ctx.timeDomain() match {
      case TimeDomain.EVENT_TIME =>
        out.collect(s"Event time timer fired for $key at $timestamp. Last reading: $lastReadingValue")
      case TimeDomain.PROCESSING_TIME =>
        out.collect(s"Processing time timer fired for $key at $timestamp. Last reading: $lastReadingValue")
    }

    // Clean up state if needed: drop the reading once the firing timer is more
    // than 300000 ms past it. Option guards against already-cleared state.
    val isStale = Option(lastReadingValue).exists(last => timestamp - last.timestamp > 300000L)
    if (isStale) {
      lastReading.clear()
    }
  }
}
479
```
480
481
## Types
482
483
```scala { .api }
484
// Core function types
trait ProcessFunction[I, O]
trait KeyedProcessFunction[K, I, O]
trait WindowFunction[IN, OUT, KEY, W <: Window]
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window]
trait AllWindowFunction[IN, OUT, W <: Window]
trait ProcessAllWindowFunction[IN, OUT, W <: Window]

// Rich function types
trait RichFunction
abstract class RichProcessFunction[I, O]
abstract class RichKeyedProcessFunction[K, I, O]
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
abstract class RichAllWindowFunction[IN, OUT, W <: Window]
abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]

// State types
trait ValueState[T]
trait ListState[T]
trait MapState[UK, UV]
trait ReducingState[T]
trait AggregatingState[IN, OUT]

// State descriptors
class ValueStateDescriptor[T]
class ListStateDescriptor[T]
class MapStateDescriptor[UK, UV]
class ReducingStateDescriptor[T]
class AggregatingStateDescriptor[IN, ACC, OUT]

// Context and service types
trait TimerService
enum TimeDomain
trait KeyedStateStore
class Configuration
trait RuntimeContext
trait IterationRuntimeContext
trait Counter
trait MetricGroup

// Window types
trait Window
class TimeWindow
class GlobalWindow

// Utility types
trait Collector[T]
class OutputTag[T]
533
```