0
# Processing Functions
1
2
Low-level processing functions give per-element access to timestamps, timers, state, and side outputs. They are the building blocks for complex event-driven logic and stateful stream processing.
3
4
## Capabilities
5
6
### ProcessFunction
7
8
Basic processing function for DataStream operations. Registering timers through its context is only supported when the function runs on a keyed stream.
9
10
```scala { .api }
11
abstract class ProcessFunction[I, O] {

  /**
   * Handles one element of the input stream.
   *
   * @param value the input element to process
   * @param ctx   per-element context giving access to the timestamp, the timer service and side outputs
   * @param out   collector used to emit output elements
   */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /**
   * Callback invoked when a registered timer fires.
   * The default body is empty, so overriding it is optional.
   *
   * @param timestamp timestamp of the timer that fired
   * @param ctx       context of the firing timer, which additionally exposes its [[TimeDomain]]
   * @param out       collector used to emit output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = {}

  /** Information and services available while an element is being processed. */
  abstract class Context {

    /**
     * Timestamp associated with the current element (or firing timer).
     *
     * NOTE(review): Flink's Java API declares this as a boxed `java.lang.Long` that is
     * null when the element carries no timestamp — confirm before treating it as a primitive.
     */
    def timestamp(): Long

    /**
     * Service for querying time and registering timers.
     *
     * NOTE(review): Flink only supports registering timers on keyed streams — confirm
     * against the Flink version in use.
     */
    def timerService(): TimerService

    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context passed to `onTimer`; extends [[Context]] with the domain of the firing timer. */
  abstract class OnTimerContext extends Context {

    /** The [[TimeDomain]] of the timer that fired. */
    def timeDomain(): TimeDomain
  }
}
38
```
39
40
### KeyedProcessFunction
41
42
Processing function for KeyedStream operations with access to keys.
43
44
```scala { .api }
45
abstract class KeyedProcessFunction[K, I, O] {

  /**
   * Handles one element of the keyed input stream.
   *
   * @param value the input element to process
   * @param ctx   per-element context giving access to the current key, the timestamp,
   *              the timer service and side outputs
   * @param out   collector used to emit output elements
   */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /**
   * Callback invoked when a registered timer fires.
   * The default body is empty, so overriding it is optional.
   *
   * @param timestamp timestamp of the timer that fired
   * @param ctx       context of the firing timer, exposing the key and the timer's [[TimeDomain]]
   * @param out       collector used to emit output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = {}

  /** Information and services available while an element is being processed. */
  abstract class Context {

    /**
     * Timestamp associated with the current element (or firing timer).
     *
     * NOTE(review): Flink's Java API declares this as a boxed `java.lang.Long` that is
     * null when the element carries no timestamp — confirm before treating it as a primitive.
     */
    def timestamp(): Long

    /** Key of the element being processed (or of the timer that fired). */
    def getCurrentKey: K

    /** Service for querying time and registering or deleting timers. */
    def timerService(): TimerService

    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context passed to `onTimer`; extends [[Context]] with the domain of the firing timer. */
  abstract class OnTimerContext extends Context {

    /** The [[TimeDomain]] of the timer that fired. */
    def timeDomain(): TimeDomain
  }
}
73
```
74
75
**Usage Examples:**
76
77
```scala
78
import org.apache.flink.streaming.api.scala._
79
import org.apache.flink.streaming.api.functions.ProcessFunction
80
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
81
import org.apache.flink.util.Collector
82
83
/**
 * A single temperature measurement.
 *
 * @param sensorId    identifier of the sensor that produced the reading
 * @param temperature measured temperature
 * @param timestamp   timestamp attached to the reading
 */
final case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

/**
 * A notification raised for a sensor.
 *
 * @param sensorId  identifier of the sensor the alert refers to
 * @param message   human-readable description of the alert
 * @param timestamp timestamp attached to the alert
 */
final case class Alert(sensorId: String, message: String, timestamp: Long)
85
86
// ProcessFunction example - detect temperature spikes
/**
 * Emits an [[Alert]] for every reading whose temperature is strictly above `threshold`.
 *
 * @param threshold temperature above which a reading is reported; defaults to 50.0
 */
class TemperatureSpikeFunction(threshold: Double = 50.0)
    extends ProcessFunction[SensorReading, Alert] {

  /**
   * Checks a single reading against the threshold.
   *
   * @param reading the reading to inspect
   * @param ctx     element context (not needed here; see the comment below)
   * @param out     collector receiving the alert, if any
   */
  override def processElement(
      reading: SensorReading,
      ctx: ProcessFunction[SensorReading, Alert]#Context,
      out: Collector[Alert]
  ): Unit =
    if (reading.temperature > threshold) {
      // Stamp the alert with the reading's own timestamp: ctx.timestamp() is null when the
      // stream has no timestamps assigned, and unboxing it would throw.
      out.collect(Alert(
        reading.sensorId,
        s"High temperature detected: ${reading.temperature}°C",
        reading.timestamp
      ))
    }
}
102
103
// KeyedProcessFunction example - timeout detection
/**
 * Emits an [[Alert]] for a sensor (the stream key) once no reading has been seen for
 * 60 seconds of event time.
 *
 * Each reading cancels the previously scheduled timeout and schedules a new one, so the
 * alert only fires after a real gap rather than 60 seconds after every reading.
 */
class SensorTimeoutFunction extends KeyedProcessFunction[String, SensorReading, Alert] {
  import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

  /** Inactivity period, in milliseconds, after which an alert is emitted. */
  private val TimeoutMs = 60000L

  /** Timestamp of the timeout timer currently registered for the key; empty when none is pending. */
  private lazy val pendingTimer: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("pending-timeout", classOf[java.lang.Long]))

  /**
   * Re-arms the inactivity timer for the sensor that produced `reading`.
   *
   * @param reading the incoming reading
   * @param ctx     element context used to reach the timer service
   * @param out     collector (unused here; alerts are emitted from `onTimer`)
   */
  override def processElement(
      reading: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, Alert]#Context,
      out: Collector[Alert]
  ): Unit = {
    val timers = ctx.timerService()

    // A new reading proves the sensor is alive: drop the timeout scheduled by the previous one.
    Option(pendingTimer.value()).foreach(previous => timers.deleteEventTimeTimer(previous))

    // ctx.timestamp() is null when the stream has no timestamps assigned; fall back to the
    // timestamp carried by the reading itself.
    val eventTime = Option(ctx.timestamp()).map(_.longValue).getOrElse(reading.timestamp)
    val timeoutTime = eventTime + TimeoutMs

    timers.registerEventTimeTimer(timeoutTime)
    pendingTimer.update(timeoutTime)
  }

  /**
   * Fires when the sensor has been silent for the whole timeout period.
   *
   * @param timestamp timestamp of the timer that fired
   * @param ctx       timer context providing the sensor id (the current key)
   * @param out       collector receiving the inactivity alert
   */
  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, SensorReading, Alert]#OnTimerContext,
      out: Collector[Alert]
  ): Unit = {
    // The pending timer has just fired, so nothing is scheduled for this key any more.
    pendingTimer.clear()

    val sensorId = ctx.getCurrentKey
    out.collect(Alert(
      sensorId,
      s"Sensor $sensorId has been inactive for 60 seconds",
      timestamp
    ))
  }
}
128
129
// Apply processing functions
val env = StreamExecutionEnvironment.getExecutionEnvironment

val readings = env
  .fromElements(
    SensorReading("sensor1", 45.0, 1000L),
    SensorReading("sensor2", 55.0, 2000L)
  )
  // Event-time timers and ctx.timestamp() only work on a stream that carries timestamps
  // and watermarks, so derive them from the (ascending) timestamp of each reading.
  .assignAscendingTimestamps(_.timestamp)

// Apply process function
val alerts = readings.process(new TemperatureSpikeFunction)

// Apply keyed process function
val timeoutAlerts = readings
  .keyBy(_.sensorId)
  .process(new SensorTimeoutFunction)
142
```
143
144
### CoProcessFunction
145
146
Processing function for ConnectedStreams with two input types. Timers and keyed state are only available when the connected streams are keyed.
147
148
```scala { .api }
149
abstract class CoProcessFunction[IN1, IN2, OUT] {

  /**
   * Handles one element of the first input stream.
   *
   * @param value element from the first stream
   * @param ctx   per-element context giving access to the timestamp, the timer service and side outputs
   * @param out   collector used to emit output elements
   */
  def processElement1(value: IN1, ctx: Context, out: Collector[OUT]): Unit

  /**
   * Handles one element of the second input stream.
   *
   * @param value element from the second stream
   * @param ctx   per-element context giving access to the timestamp, the timer service and side outputs
   * @param out   collector used to emit output elements
   */
  def processElement2(value: IN2, ctx: Context, out: Collector[OUT]): Unit

  /**
   * Callback invoked when a registered timer fires.
   * The default body is empty, so overriding it is optional.
   *
   * @param timestamp timestamp of the timer that fired
   * @param ctx       context of the firing timer, which additionally exposes its [[TimeDomain]]
   * @param out       collector used to emit output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {}

  /** Information and services available while an element of either input is being processed. */
  abstract class Context {

    /**
     * Timestamp associated with the current element (or firing timer).
     *
     * NOTE(review): Flink's Java API declares this as a boxed `java.lang.Long` that is
     * null when the element carries no timestamp — confirm before treating it as a primitive.
     */
    def timestamp(): Long

    /**
     * Service for querying time and registering timers.
     *
     * NOTE(review): Flink only supports registering timers when the connected streams
     * are keyed — confirm against the Flink version in use.
     */
    def timerService(): TimerService

    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context passed to `onTimer`; extends [[Context]] with the domain of the firing timer. */
  abstract class OnTimerContext extends Context {

    /** The [[TimeDomain]] of the timer that fired. */
    def timeDomain(): TimeDomain
  }
}
184
```
185
186
### Timer Service
187
188
Service for registering and managing timers in processing functions.
189
190
```scala { .api }
191
trait TimerService {

  /**
   * The current processing time.
   *
   * @return processing time in milliseconds
   */
  def currentProcessingTime(): Long

  /**
   * The current event-time watermark.
   *
   * @return watermark in milliseconds
   */
  def currentWatermark(): Long

  /**
   * Schedules a timer in the processing-time domain.
   *
   * @param time processing-time timestamp the timer is registered for
   */
  def registerProcessingTimeTimer(time: Long): Unit

  /**
   * Schedules a timer in the event-time domain.
   *
   * @param time event-time timestamp the timer is registered for
   */
  def registerEventTimeTimer(time: Long): Unit

  /**
   * Removes a previously scheduled processing-time timer.
   *
   * @param time timestamp of the timer to delete
   */
  def deleteProcessingTimeTimer(time: Long): Unit

  /**
   * Removes a previously scheduled event-time timer.
   *
   * @param time timestamp of the timer to delete
   */
  def deleteEventTimeTimer(time: Long): Unit
}
228
```
229
230
**Usage Examples:**
231
232
```scala
233
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
234
235
/**
 * An order placed by a customer.
 *
 * @param id         order identifier
 * @param customerId identifier of the customer who placed the order
 * @param amount     order amount
 * @param timestamp  timestamp attached to the order
 */
final case class Order(id: String, customerId: String, amount: Double, timestamp: Long)

/**
 * A payment made by a customer.
 *
 * @param id         payment identifier
 * @param customerId identifier of the customer who made the payment
 * @param amount     paid amount
 * @param timestamp  timestamp attached to the payment
 */
final case class Payment(id: String, customerId: String, amount: Double, timestamp: Long)

/**
 * The pairing of an order with the payment that settles it.
 *
 * @param orderId    identifier of the matched order
 * @param paymentId  identifier of the matched payment
 * @param customerId identifier of the customer both belong to
 */
final case class OrderPaymentMatch(orderId: String, paymentId: String, customerId: String)
238
239
/**
 * Matches each payment with the order currently pending for the same key.
 *
 * The function keeps state and timers, so it must run on keyed connected streams, e.g.
 * `orders.connect(payments).keyBy(_.customerId, _.customerId)`. For simplicity it tracks
 * at most one pending order per key: a newer order replaces an older unmatched one, and a
 * payment that arrives while no order is pending is ignored.
 */
class OrderPaymentMatcher extends CoProcessFunction[Order, Payment, OrderPaymentMatch] {
  import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

  /** How long, in milliseconds, an order waits for its payment (5 minutes). */
  private val MatchTimeoutMs = 300000L

  /** The order still waiting for a payment for the current key, if any. */
  private lazy val pendingOrder: ValueState[Order] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[Order]("pending-order", classOf[Order]))

  /** Timestamp of the timeout timer registered for the pending order, if any. */
  private lazy val timeoutTimer: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("order-timeout", classOf[java.lang.Long]))

  /**
   * Stores the order and schedules its timeout.
   *
   * @param order the incoming order
   * @param ctx   element context used to reach the timer service
   * @param out   collector (unused here; matches are emitted when the payment arrives)
   */
  override def processElement1(
      order: Order,
      ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#Context,
      out: Collector[OrderPaymentMatch]
  ): Unit = {
    val timers = ctx.timerService()

    // Only one order is tracked per key: cancel the timeout of the order being replaced.
    Option(timeoutTimer.value()).foreach(previous => timers.deleteEventTimeTimer(previous))

    // ctx.timestamp() is null when the stream has no timestamps assigned; fall back to the
    // timestamp carried by the order itself.
    val eventTime = Option(ctx.timestamp()).map(_.longValue).getOrElse(order.timestamp)
    val timeoutTime = eventTime + MatchTimeoutMs // 5 minutes

    pendingOrder.update(order)
    timeoutTimer.update(timeoutTime)
    timers.registerEventTimeTimer(timeoutTime)
  }

  /**
   * Emits a match when an order is pending for the payment's key.
   *
   * @param payment the incoming payment
   * @param ctx     element context used to reach the timer service
   * @param out     collector receiving the match, if any
   */
  override def processElement2(
      payment: Payment,
      ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#Context,
      out: Collector[OrderPaymentMatch]
  ): Unit =
    Option(pendingOrder.value()).foreach { order =>
      out.collect(OrderPaymentMatch(order.id, payment.id, payment.customerId))

      // The order is settled: cancel its timeout and release the state.
      Option(timeoutTimer.value()).foreach(t => ctx.timerService().deleteEventTimeTimer(t))
      pendingOrder.clear()
      timeoutTimer.clear()
    }

  /**
   * Handles an order that received no payment within the timeout.
   *
   * @param timestamp timestamp of the timer that fired
   * @param ctx       timer context
   * @param out       collector (unused; the timeout is only logged)
   */
  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#OnTimerContext,
      out: Collector[OrderPaymentMatch]
  ): Unit = {
    // Handle unmatched orders (timeout case): forget the order so the state does not grow.
    pendingOrder.clear()
    timeoutTimer.clear()
    println(s"Order timeout at $timestamp")
  }
}
271
```
272
273
## Types
274
275
```scala { .api }
276
// Time domain enumeration
/** The notion of time a timer was registered in. */
sealed trait TimeDomain

/** The two possible [[TimeDomain]] values. */
object TimeDomain {

  /** Timers driven by event time. */
  case object EVENT_TIME extends TimeDomain

  /** Timers driven by processing time. */
  case object PROCESSING_TIME extends TimeDomain
}
282
283
// Output tag for side outputs
/**
 * Identifies a side output and the type of the elements it carries.
 *
 * @param id name of the side output
 * @tparam T element type; a `TypeInformation[T]` must be available where the tag is created
 */
case class OutputTag[T: TypeInformation](id: String) {

  /** Type information of the side output's elements, taken from the context bound on `T`. */
  def getTypeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
}
287
288
// Rich process function with lifecycle methods
/**
 * A [[ProcessFunction]] that also exposes the `RichFunction` lifecycle hooks and
 * runtime-context accessors.
 *
 * NOTE(review): in current Flink releases `ProcessFunction` itself appears to extend
 * `AbstractRichFunction`, so a separate rich variant may not exist — confirm against the
 * Flink version in use.
 */
abstract class RichProcessFunction[I, O]
    extends ProcessFunction[I, O]
    with RichFunction {

  /** Lifecycle hook receiving the function's `Configuration`; the default body is empty. */
  override def open(parameters: Configuration): Unit = ()

  /** Lifecycle hook for releasing resources; the default body is empty. */
  override def close(): Unit = ()

  /** The runtime context associated with this function. */
  def getRuntimeContext: RuntimeContext

  /** Associates the runtime context `t` with this function. */
  def setRuntimeContext(t: RuntimeContext): Unit
}
295
296
// Rich keyed process function
/**
 * A [[KeyedProcessFunction]] that also exposes the `RichFunction` lifecycle hooks and
 * runtime-context accessors.
 *
 * NOTE(review): in current Flink releases `KeyedProcessFunction` itself appears to extend
 * `AbstractRichFunction`, so a separate rich variant may not exist — confirm against the
 * Flink version in use.
 */
abstract class RichKeyedProcessFunction[K, I, O]
    extends KeyedProcessFunction[K, I, O]
    with RichFunction {

  /** Lifecycle hook receiving the function's `Configuration`; the default body is empty. */
  override def open(parameters: Configuration): Unit = ()

  /** Lifecycle hook for releasing resources; the default body is empty. */
  override def close(): Unit = ()

  /** The runtime context associated with this function. */
  def getRuntimeContext: RuntimeContext

  /** Associates the runtime context `t` with this function. */
  def setRuntimeContext(t: RuntimeContext): Unit
}
303
304
// Rich co-process function
/**
 * A [[CoProcessFunction]] that also exposes the `RichFunction` lifecycle hooks and
 * runtime-context accessors.
 *
 * NOTE(review): in current Flink releases `CoProcessFunction` itself appears to extend
 * `AbstractRichFunction`, so a separate rich variant may not exist — confirm against the
 * Flink version in use.
 */
abstract class RichCoProcessFunction[IN1, IN2, OUT]
    extends CoProcessFunction[IN1, IN2, OUT]
    with RichFunction {

  /** Lifecycle hook receiving the function's `Configuration`; the default body is empty. */
  override def open(parameters: Configuration): Unit = ()

  /** Lifecycle hook for releasing resources; the default body is empty. */
  override def close(): Unit = ()

  /** The runtime context associated with this function. */
  def getRuntimeContext: RuntimeContext

  /** Associates the runtime context `t` with this function. */
  def setRuntimeContext(t: RuntimeContext): Unit
}
311
```