or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md · data-streams.md · execution-environment.md · index.md · keyed-streams.md · processing-functions.md · sinks-output.md · stream-connections.md · window-functions.md · windowing.md

docs/processing-functions.md

0

# Processing Functions

1

2

Low-level processing functions provide access to element processing with timers, state, and side outputs. These are essential for complex event-driven logic and stateful stream processing.

3

4

## Capabilities

5

6

### ProcessFunction

7

8

Basic processing function for DataStream operations.

9

10

```scala { .api }

11

/**
 * Basic low-level processing function for DataStream operations.
 *
 * @tparam I type of the input elements
 * @tparam O type of the emitted elements
 */
abstract class ProcessFunction[I, O] {

  /**
   * Handles a single element of the input stream.
   *
   * @param value the element being processed
   * @param ctx   gives access to the timestamp, the timer service and side outputs
   * @param out   collector used to emit output elements
   */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /**
   * Invoked when a timer fires. The default implementation does nothing, so
   * overriding it is optional.
   *
   * @param timestamp timestamp of the timer that fired
   * @param ctx       timer context; additionally reports the timer's time domain
   * @param out       collector used to emit output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = ()

  /** Context handed to `processElement`. */
  abstract class Context {

    /**
     * Timestamp available in this context.
     * NOTE(review): Flink's Java API returns a boxed Long here that may be null
     * when the stream carries no timestamps — confirm before unboxing.
     */
    def timestamp(): Long

    /** Service for querying time and registering or deleting timers. */
    def timerService(): TimerService

    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context handed to `onTimer`; adds the time domain of the firing timer. */
  abstract class OnTimerContext extends Context {

    /** Time domain of the timer that fired. */
    def timeDomain(): TimeDomain
  }
}

38

```

39

40

### KeyedProcessFunction

41

42

Processing function for KeyedStream operations with access to keys.

43

44

```scala { .api }

45

/**
 * Processing function for KeyedStream operations with access to the current key.
 *
 * @tparam K type of the key
 * @tparam I type of the input elements
 * @tparam O type of the emitted elements
 */
abstract class KeyedProcessFunction[K, I, O] {

  /**
   * Handles a single element of the keyed input stream.
   *
   * @param value the element being processed
   * @param ctx   gives access to the key, the timestamp, the timer service and side outputs
   * @param out   collector used to emit output elements
   */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /**
   * Invoked when a timer fires. The default implementation does nothing, so
   * overriding it is optional.
   *
   * @param timestamp timestamp of the timer that fired
   * @param ctx       timer context exposing timer and key information
   * @param out       collector used to emit output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = ()

  /** Context handed to `processElement`. */
  abstract class Context {

    /**
     * Timestamp available in this context.
     * NOTE(review): Flink's Java API returns a boxed Long here that may be null
     * when the stream carries no timestamps — confirm before unboxing.
     */
    def timestamp(): Long

    /** Key associated with this context. */
    def getCurrentKey: K

    /** Service for querying time and registering or deleting timers. */
    def timerService(): TimerService

    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context handed to `onTimer`; adds the time domain of the firing timer. */
  abstract class OnTimerContext extends Context {

    /** Time domain of the timer that fired. */
    def timeDomain(): TimeDomain
  }
}

73

```

74

75

**Usage Examples:**

76

77

```scala

78

import org.apache.flink.streaming.api.scala._

79

import org.apache.flink.streaming.api.functions.ProcessFunction

80

import org.apache.flink.streaming.api.functions.KeyedProcessFunction

81

import org.apache.flink.util.Collector

82

83

/**
 * A single measurement reported by a sensor.
 *
 * @param sensorId    identifier of the reporting sensor
 * @param temperature measured temperature (the spike alert formats it as °C)
 * @param timestamp   time of the reading; the examples add millisecond offsets to it
 */
case class SensorReading(
    sensorId: String,
    temperature: Double,
    timestamp: Long
)

/**
 * Notification emitted by the example processing functions.
 *
 * @param sensorId  identifier of the sensor the alert is about
 * @param message   human-readable description of the alert
 * @param timestamp time the alert refers to
 */
case class Alert(
    sensorId: String,
    message: String,
    timestamp: Long
)

85

86

// ProcessFunction example - detect temperature spikes

87

/**
 * Emits an [[Alert]] for every reading whose temperature is strictly above 50.0.
 * Readings at or below that value produce no output.
 */
class TemperatureSpikeFunction extends ProcessFunction[SensorReading, Alert] {

  /**
   * Checks one reading and emits an alert if it is a temperature spike.
   *
   * @param reading the sensor reading to inspect
   * @param ctx     processing context (not consulted by this function)
   * @param out     collector receiving the alert, if any
   */
  override def processElement(
      reading: SensorReading,
      ctx: ProcessFunction[SensorReading, Alert]#Context,
      out: Collector[Alert]
  ): Unit = {
    if (reading.temperature > 50.0) {
      out.collect(
        Alert(
          reading.sensorId,
          s"High temperature detected: ${reading.temperature}°C",
          // Stamp the alert with the reading's own time. In Flink, ctx.timestamp()
          // is a boxed Long that is null when the stream has no assigned timestamps,
          // and unboxing it here would throw a NullPointerException.
          reading.timestamp
        )
      )
    }
  }
}

102

103

// KeyedProcessFunction example - timeout detection

104

/**
 * Emits an [[Alert]] when a sensor (the stream key) has produced no reading for
 * 60 seconds of event time.
 *
 * Every reading moves the key's inactivity deadline forward: the previously
 * registered timer is deleted and replaced, so a sensor that keeps reporting
 * never triggers an alert. Only the most recent deadline can fire.
 */
class SensorTimeoutFunction extends KeyedProcessFunction[String, SensorReading, Alert] {

  import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

  /** Inactivity window in milliseconds; must stay in sync with the alert text. */
  private val TimeoutMillis = 60000L

  /**
   * Per-key deadline of the currently registered timer; empty when none is pending.
   * Declared lazily because the runtime context only exists once the function
   * is running.
   */
  private lazy val pendingDeadline: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("pending-deadline", classOf[java.lang.Long])
    )

  /**
   * Re-arms the inactivity timer for the reading's sensor.
   *
   * @param reading the sensor reading that proves the sensor is active
   * @param ctx     keyed context used to reach the timer service
   * @param out     collector (unused here; alerts are emitted from `onTimer`)
   */
  override def processElement(
      reading: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, Alert]#Context,
      out: Collector[Alert]
  ): Unit = {
    val timers = ctx.timerService()
    // Derive the deadline from the reading itself: ctx.timestamp() is a boxed Long
    // that is null when the stream has no assigned timestamps.
    val deadline = reading.timestamp + TimeoutMillis
    val previous = Option(pendingDeadline.value()).map(_.longValue)

    // Only ever push the deadline forward, so an out-of-order (older) reading
    // cannot shorten the inactivity window.
    if (previous.forall(_ < deadline)) {
      previous.foreach(old => timers.deleteEventTimeTimer(old))
      timers.registerEventTimeTimer(deadline)
      pendingDeadline.update(deadline)
    }
  }

  /**
   * Fires when no newer reading replaced the deadline, i.e. the sensor was inactive.
   *
   * @param timestamp the deadline that elapsed
   * @param ctx       timer context providing the sensor key
   * @param out       collector receiving the inactivity alert
   */
  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, SensorReading, Alert]#OnTimerContext,
      out: Collector[Alert]
  ): Unit = {
    // The timer is spent; forget it so the next reading starts a fresh window.
    pendingDeadline.clear()
    out.collect(
      Alert(
        ctx.getCurrentKey,
        s"Sensor ${ctx.getCurrentKey} has been inactive for 60 seconds",
        timestamp
      )
    )
  }
}

128

129

// Apply processing functions

130

// The snippet previously used `env` without defining it; obtain the environment
// explicitly so the example is self-contained.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Two sample readings: only sensor2 is above the 50.0 spike threshold.
val readings = env.fromElements(
  SensorReading("sensor1", 45.0, 1000L),
  SensorReading("sensor2", 55.0, 2000L)
)

// Apply process function
val alerts = readings.process(new TemperatureSpikeFunction)

// Apply keyed process function
// NOTE(review): event-time timers only fire as watermarks advance — confirm the
// real pipeline assigns timestamps and watermarks to `readings`.
val timeoutAlerts = readings
  .keyBy(_.sensorId)
  .process(new SensorTimeoutFunction)

142

```

143

144

### CoProcessFunction

145

146

Processing function for ConnectedStreams with two input types.

147

148

```scala { .api }

149

/**
 * Processing function for ConnectedStreams, i.e. for two inputs of different types.
 *
 * @tparam IN1 element type of the first input stream
 * @tparam IN2 element type of the second input stream
 * @tparam OUT type of the emitted elements
 */
abstract class CoProcessFunction[IN1, IN2, OUT] {

  /**
   * Handles an element arriving on the first input stream.
   *
   * @param value element from the first stream
   * @param ctx   gives access to timers and side outputs
   * @param out   collector used to emit output elements
   */
  def processElement1(value: IN1, ctx: Context, out: Collector[OUT]): Unit

  /**
   * Handles an element arriving on the second input stream.
   *
   * @param value element from the second stream
   * @param ctx   gives access to timers and side outputs
   * @param out   collector used to emit output elements
   */
  def processElement2(value: IN2, ctx: Context, out: Collector[OUT]): Unit

  /**
   * Invoked when a timer fires. The default implementation does nothing, so
   * overriding it is optional.
   *
   * @param timestamp timestamp of the timer that fired
   * @param ctx       timer context; additionally reports the timer's time domain
   * @param out       collector used to emit output elements
   */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = ()

  /** Context handed to `processElement1` and `processElement2`. */
  abstract class Context {

    /**
     * Timestamp available in this context.
     * NOTE(review): Flink's Java API returns a boxed Long here that may be null
     * when the stream carries no timestamps — confirm before unboxing.
     */
    def timestamp(): Long

    /** Service for querying time and registering or deleting timers. */
    def timerService(): TimerService

    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context handed to `onTimer`; adds the time domain of the firing timer. */
  abstract class OnTimerContext extends Context {

    /** Time domain of the timer that fired. */
    def timeDomain(): TimeDomain
  }
}

184

```

185

186

### Timer Service

187

188

Service for registering and managing timers in processing functions.

189

190

```scala { .api }

191

/** Lets processing functions query the current time and register or delete timers. */
trait TimerService {

  /** @return the current processing time, in milliseconds */
  def currentProcessingTime(): Long

  /** @return the current watermark (event time), in milliseconds */
  def currentWatermark(): Long

  /**
   * Registers a processing-time timer.
   *
   * @param time processing-time timestamp the timer is registered for
   */
  def registerProcessingTimeTimer(time: Long): Unit

  /**
   * Registers an event-time timer.
   *
   * @param time event-time timestamp the timer is registered for
   */
  def registerEventTimeTimer(time: Long): Unit

  /**
   * Deletes a processing-time timer.
   *
   * @param time timestamp of the timer to delete
   */
  def deleteProcessingTimeTimer(time: Long): Unit

  /**
   * Deletes an event-time timer.
   *
   * @param time timestamp of the timer to delete
   */
  def deleteEventTimeTimer(time: Long): Unit
}

228

```

229

230

**Usage Examples:**

231

232

```scala

233

import org.apache.flink.streaming.api.functions.co.CoProcessFunction

234

235

/**
 * An order placed by a customer.
 *
 * @param id         order identifier
 * @param customerId identifier of the ordering customer
 * @param amount     order amount
 * @param timestamp  time of the order; the matcher example adds a millisecond offset to it
 */
case class Order(
    id: String,
    customerId: String,
    amount: Double,
    timestamp: Long
)

/**
 * A payment made by a customer.
 *
 * @param id         payment identifier
 * @param customerId identifier of the paying customer
 * @param amount     paid amount
 * @param timestamp  time of the payment
 */
case class Payment(
    id: String,
    customerId: String,
    amount: Double,
    timestamp: Long
)

/**
 * Result of pairing an order with a payment.
 *
 * @param orderId    identifier of the matched order
 * @param paymentId  identifier of the matched payment
 * @param customerId identifier of the customer both belong to
 */
case class OrderPaymentMatch(
    orderId: String,
    paymentId: String,
    customerId: String
)

238

239

/**
 * Example [[CoProcessFunction]] that pairs orders (first input) with payments
 * (second input) and reports orders whose timeout elapses.
 *
 * The matching itself is deliberately simplified: no state is kept, so every
 * payment is reported against a placeholder order id.
 *
 * NOTE(review): Flink only supports timers on keyed streams — confirm the
 * connected streams are keyed (e.g. by customerId) before using this as-is.
 */
class OrderPaymentMatcher extends CoProcessFunction[Order, Payment, OrderPaymentMatch] {

  /** How long an order may wait for its payment: 5 minutes, in milliseconds. */
  private val OrderTimeoutMillis = 300000L

  /**
   * Registers the timeout timer for an incoming order.
   *
   * @param order the incoming order
   * @param ctx   context used to reach the timer service
   * @param out   collector (unused here)
   */
  override def processElement1(
      order: Order,
      ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#Context,
      out: Collector[OrderPaymentMatch]
  ): Unit = {
    // Store order and set timeout timer
    // This would typically use state to store pending orders
    //
    // The deadline is derived from the order's own timestamp: ctx.timestamp() is
    // a boxed Long that is null when the stream has no assigned timestamps, and
    // adding to it would throw a NullPointerException.
    val timeoutTime = order.timestamp + OrderTimeoutMillis
    ctx.timerService().registerEventTimeTimer(timeoutTime)
  }

  /**
   * Emits a match for an incoming payment.
   *
   * @param payment the incoming payment
   * @param ctx     context (unused here)
   * @param out     collector receiving the match
   */
  override def processElement2(
      payment: Payment,
      ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#Context,
      out: Collector[OrderPaymentMatch]
  ): Unit = {
    // Match payment with stored orders
    // This would typically check state for matching orders;
    // "order1" is a placeholder for the looked-up order id.
    out.collect(OrderPaymentMatch("order1", payment.id, payment.customerId))
  }

  /**
   * Handles the timeout of an order.
   *
   * @param timestamp the deadline that elapsed
   * @param ctx       timer context (unused here)
   * @param out       collector (unused here)
   */
  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[Order, Payment, OrderPaymentMatch]#OnTimerContext,
      out: Collector[OrderPaymentMatch]
  ): Unit = {
    // Handle unmatched orders (timeout case)
    println(s"Order timeout at $timestamp")
  }
}

271

```

272

273

## Types

274

275

```scala { .api }

276

// Time domain enumeration

277

/** The two time domains a timer can belong to. */
sealed trait TimeDomain

object TimeDomain {

  /** Event time. */
  case object EVENT_TIME extends TimeDomain

  /** Processing time. */
  case object PROCESSING_TIME extends TimeDomain
}

282

283

// Output tag for side outputs

284

/**
 * Identifies a side output carrying elements of type `T`.
 *
 * @param id name of the side output
 * @tparam T element type; a `TypeInformation[T]` must be implicitly available
 */
case class OutputTag[T: TypeInformation](id: String) {

  /**
   * Type information for the elements of this side output, taken from the
   * context bound on `T`. (Previously declared without a body, which a concrete
   * case class cannot have.)
   */
  def getTypeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
}

287

288

// Rich process function with lifecycle methods

289

/**
 * [[ProcessFunction]] combined with the `RichFunction` lifecycle: `open` and
 * `close` hooks (no-ops by default) plus runtime-context accessors.
 *
 * NOTE(review): in current Flink releases `ProcessFunction` already extends
 * `AbstractRichFunction` — confirm this separate type exists in the targeted version.
 *
 * @tparam I type of the input elements
 * @tparam O type of the emitted elements
 */
abstract class RichProcessFunction[I, O] extends ProcessFunction[I, O] with RichFunction {

  /** Lifecycle hook taking a configuration; does nothing unless overridden. */
  override def open(parameters: Configuration): Unit = ()

  /** Lifecycle hook; does nothing unless overridden. */
  override def close(): Unit = ()

  /** Runtime context of this function. */
  def getRuntimeContext: RuntimeContext

  /** Sets the runtime context of this function. */
  def setRuntimeContext(t: RuntimeContext): Unit
}

295

296

// Rich keyed process function

297

/**
 * [[KeyedProcessFunction]] combined with the `RichFunction` lifecycle: `open`
 * and `close` hooks (no-ops by default) plus runtime-context accessors.
 *
 * NOTE(review): in current Flink releases `KeyedProcessFunction` already extends
 * `AbstractRichFunction` — confirm this separate type exists in the targeted version.
 *
 * @tparam K type of the key
 * @tparam I type of the input elements
 * @tparam O type of the emitted elements
 */
abstract class RichKeyedProcessFunction[K, I, O]
    extends KeyedProcessFunction[K, I, O]
    with RichFunction {

  /** Lifecycle hook taking a configuration; does nothing unless overridden. */
  override def open(parameters: Configuration): Unit = ()

  /** Lifecycle hook; does nothing unless overridden. */
  override def close(): Unit = ()

  /** Runtime context of this function. */
  def getRuntimeContext: RuntimeContext

  /** Sets the runtime context of this function. */
  def setRuntimeContext(t: RuntimeContext): Unit
}

303

304

// Rich co-process function

305

/**
 * [[CoProcessFunction]] combined with the `RichFunction` lifecycle: `open` and
 * `close` hooks (no-ops by default) plus runtime-context accessors.
 *
 * NOTE(review): in current Flink releases `CoProcessFunction` already extends
 * `AbstractRichFunction` — confirm this separate type exists in the targeted version.
 *
 * @tparam IN1 element type of the first input stream
 * @tparam IN2 element type of the second input stream
 * @tparam OUT type of the emitted elements
 */
abstract class RichCoProcessFunction[IN1, IN2, OUT]
    extends CoProcessFunction[IN1, IN2, OUT]
    with RichFunction {

  /** Lifecycle hook taking a configuration; does nothing unless overridden. */
  override def open(parameters: Configuration): Unit = ()

  /** Lifecycle hook; does nothing unless overridden. */
  override def close(): Unit = ()

  /** Runtime context of this function. */
  def getRuntimeContext: RuntimeContext

  /** Sets the runtime context of this function. */
  def setRuntimeContext(t: RuntimeContext): Unit
}

311

```