or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.mddata-stream.mdfunctions.mdindex.mdjoining.mdkeyed-stream.mdstream-execution-environment.mdwindowing.md

functions.mddocs/

# Functions

User-defined functions (UDFs) let you implement custom processing logic in Flink. The Scala API provides function interfaces for a range of use cases, from simple transformations to complex stateful processing.

## Capabilities

### Process Functions

Core process functions for custom stream processing logic.

```scala { .api }

/**
 * ProcessFunction for single stream processing.
 *
 * Context parameters are written as type projections (`ProcessFunction[I, O]#Context`)
 * because `Context` is an inner member of this type, not of a companion object.
 * Implementations override with the same projection, e.g.
 * `ctx: ProcessFunction[SensorReading, String]#Context`.
 *
 * @tparam I input element type
 * @tparam O output element type
 *
 * NOTE(review): upstream Flink's ProcessFunction.Context exposes `timestamp()`,
 * `timerService()` and `output(...)` but no `element()`. Confirm that `element()` exists
 * in the targeted API before relying on it.
 */
trait ProcessFunction[I, O] {

  /** Handles one input element; results are emitted through `out`. */
  def processElement(value: I, ctx: ProcessFunction[I, O]#Context, out: Collector[O]): Unit

  /** Callback for a timer registered via the TimerService; the default does nothing. */
  def onTimer(timestamp: Long, ctx: ProcessFunction[I, O]#OnTimerContext, out: Collector[O]): Unit = {}

  /** Per-element context: timestamp, timer access and side outputs. */
  trait Context {
    def element(): I
    def timestamp(): Long
    def timerService(): TimerService
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context passed to `onTimer`; additionally reports which time domain fired. */
  trait OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}

/**
 * KeyedProcessFunction for keyed stream processing.
 *
 * Same shape as ProcessFunction, but the contexts also expose the current key.
 * Context parameters are type projections (`KeyedProcessFunction[K, I, O]#Context`)
 * because `Context` is an inner member of this type. There is no companion object that
 * would make `KeyedProcessFunction.Context` resolve.
 *
 * @tparam K key type
 * @tparam I input element type
 * @tparam O output element type
 *
 * NOTE(review): upstream Flink's KeyedProcessFunction.Context has no `element()` member.
 * Confirm it exists in the targeted API.
 */
trait KeyedProcessFunction[K, I, O] {

  /** Handles one input element; results are emitted through `out`. */
  def processElement(value: I, ctx: KeyedProcessFunction[K, I, O]#Context, out: Collector[O]): Unit

  /** Callback for a timer registered via the TimerService; the default does nothing. */
  def onTimer(timestamp: Long, ctx: KeyedProcessFunction[K, I, O]#OnTimerContext, out: Collector[O]): Unit = {}

  /** Per-element context: current key, timestamp, timer access and side outputs. */
  trait Context {
    def getCurrentKey: K
    def element(): I
    def timestamp(): Long
    def timerService(): TimerService
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  /** Context passed to `onTimer`; additionally reports which time domain fired. */
  trait OnTimerContext extends Context {
    def timeDomain(): TimeDomain
  }
}

```

**Usage Examples:**

```scala

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeDomain
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

/**
 * A single sensor measurement used by the examples below.
 *
 * @param sensorId    identifier of the reporting sensor
 * @param temperature measured temperature (the examples print it with a °C suffix)
 * @param timestamp   reading timestamp; the examples register event-time timers relative to it
 */
case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

// Simple process function
/**
 * Emits an alert for readings above 30.0 °C. For every reading, it also schedules an
 * event-time timer 10 s (10000 ms) after that reading's timestamp.
 *
 * Overrides use the type projections `ProcessFunction[SensorReading, String]#Context` /
 * `#OnTimerContext`. `ProcessFunction.Context` does not resolve in Scala.
 *
 * NOTE(review): Flink only allows timer registration when the function runs on a keyed
 * stream. On a non-keyed stream, `registerEventTimeTimer` throws at runtime. Confirm that
 * this function is applied after `keyBy`.
 */
class TemperatureAlertFunction extends ProcessFunction[SensorReading, String] {

  override def processElement(
      reading: SensorReading,
      ctx: ProcessFunction[SensorReading, String]#Context,
      out: Collector[String]
  ): Unit = {
    if (reading.temperature > 30.0) {
      out.collect(s"High temperature alert: ${reading.temperature}°C from ${reading.sensorId}")
    }

    // Set timer for 10 seconds later, relative to the reading's own timestamp
    ctx.timerService().registerEventTimeTimer(reading.timestamp + 10000)
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[SensorReading, String]#OnTimerContext,
      out: Collector[String]
  ): Unit = {
    out.collect(s"Timer fired at $timestamp")
  }
}

// Keyed process function with state
/**
 * Reports a spike when a key's temperature changes by more than 5.0 between two
 * consecutive readings. The first reading of a key is only stored, never compared.
 */
class TemperatureMonitorFunction extends KeyedProcessFunction[String, SensorReading, String] {

  // Boxed java.lang.Double so that "no previous reading" is observable as null.
  // With scala.Double, empty state unboxes to 0.0, so the first reading was compared
  // against 0.0 and reported as a false spike.
  private var lastTemperature: ValueState[java.lang.Double] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor = new ValueStateDescriptor[java.lang.Double]("lastTemp", classOf[java.lang.Double])
    lastTemperature = getRuntimeContext.getState(descriptor)
  }

  override def processElement(
      reading: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
      out: Collector[String]
  ): Unit = {
    val currentTemp = reading.temperature

    // Compare only when this key already has a stored temperature.
    Option(lastTemperature.value()).map(_.doubleValue).foreach { previousTemp =>
      if (math.abs(currentTemp - previousTemp) > 5.0) {
        out.collect(s"Temperature spike detected for ${ctx.getCurrentKey}: $previousTemp -> $currentTemp")
      }
    }

    lastTemperature.update(java.lang.Double.valueOf(currentTemp))
  }
}

```

### Window Functions

Functions for processing windowed data.

```scala { .api }

/**
 * WindowFunction for processing window contents.
 *
 * Receives the key, the window and all buffered elements in one call.
 *
 * @tparam IN  element type held in the window
 * @tparam OUT type of the emitted results
 * @tparam KEY key type
 * @tparam W   window type, bounded by Window
 */
trait WindowFunction[IN, OUT, KEY, W <: Window] {

  /**
   * Evaluates one window.
   *
   * @param key    key the window belongs to
   * @param window the window being evaluated
   * @param input  the window's elements
   * @param out    collector for results
   */
  def apply(
      key: KEY,
      window: W,
      input: Iterable[IN],
      out: Collector[OUT]
  ): Unit
}

/**
 * ProcessWindowFunction with rich context.
 *
 * Like WindowFunction, but receives a Context exposing window metadata, time,
 * per-window and per-key state stores, and side outputs.
 *
 * @tparam IN  element type held in the window
 * @tparam OUT type of the emitted results
 * @tparam KEY key type
 * @tparam W   window type, bounded by Window
 */
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window] {

  /** Evaluates one window of one key; results are emitted through `out`. */
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit

  /**
   * Hook for releasing anything kept in `context.windowState` once the window is discarded.
   * Without it, per-window state is never cleaned up. The default does nothing.
   */
  def clear(context: Context): Unit = {}

  /** Window-evaluation context. */
  trait Context {
    def window: W
    def currentProcessingTime: Long
    def currentWatermark: Long
    /** State scoped to the current key AND the current window. */
    def windowState: KeyedStateStore
    /** State scoped to the current key only, shared across all of that key's windows. */
    def globalState: KeyedStateStore
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

/**
 * AllWindowFunction for non-keyed windows.
 *
 * Non-keyed counterpart of WindowFunction: there is no key parameter.
 *
 * @tparam IN  element type held in the window
 * @tparam OUT type of the emitted results
 * @tparam W   window type, bounded by Window
 */
trait AllWindowFunction[IN, OUT, W <: Window] {

  /**
   * Evaluates one window.
   *
   * @param window the window being evaluated
   * @param values the window's elements
   * @param out    collector for results
   */
  def apply(
      window: W,
      values: Iterable[IN],
      out: Collector[OUT]
  ): Unit
}

/**
 * ProcessAllWindowFunction for non-keyed windows with rich context.
 *
 * Non-keyed counterpart of ProcessWindowFunction.
 *
 * @tparam IN  element type held in the window
 * @tparam OUT type of the emitted results
 * @tparam W   window type, bounded by Window
 */
trait ProcessAllWindowFunction[IN, OUT, W <: Window] {

  /** Evaluates one window; results are emitted through `out`. */
  def process(context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit

  /**
   * Hook for releasing anything kept in `context.windowState` once the window is discarded.
   * The default does nothing.
   */
  def clear(context: Context): Unit = {}

  /** Window-evaluation context. */
  trait Context {
    def window: W
    def currentProcessingTime: Long
    def currentWatermark: Long
    /** State scoped to the current window. */
    def windowState: KeyedStateStore
    /** State shared across windows. */
    def globalState: KeyedStateStore
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

```

**Usage Examples:**

```scala

import org.apache.flink.streaming.api.scala.function.{WindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.util.Collector

// Simple window function
/**
 * Emits one summary line (count, average, min and max temperature) per sensor and window.
 */
class TemperatureStatsFunction extends WindowFunction[SensorReading, String, String, TimeWindow] {

  override def apply(
      sensorId: String,
      window: TimeWindow,
      input: Iterable[SensorReading],
      out: Collector[String]
  ): Unit = {
    // Extract the temperatures once instead of re-mapping the readings for every statistic.
    val temps = input.map(_.temperature).toList
    val count = temps.length
    val avgTemp = temps.sum / count
    val minTemp = temps.min
    val maxTemp = temps.max

    out.collect(s"Sensor $sensorId: Window [${window.getStart}-${window.getEnd}], " +
      s"Count: $count, Avg: $avgTemp, Min: $minTemp, Max: $maxTemp")
  }
}

// Process window function with state
/**
 * Emits per-window temperature statistics together with window metadata and a running,
 * per-key count of how many windows have been evaluated so far.
 */
class ProcessTemperatureStatsFunction extends ProcessWindowFunction[SensorReading, String, String, TimeWindow] {

  override def process(
      sensorId: String,
      context: Context,
      elements: Iterable[SensorReading],
      out: Collector[String]
  ): Unit = {
    val readings = elements.toList
    val stats = calculateStats(readings)

    // Access window metadata
    val windowStart = context.window.getStart
    val windowEnd = context.window.getEnd
    val watermark = context.currentWatermark

    // Cross-window information must live in globalState (scoped to the key only).
    // windowState is scoped to the single window being evaluated, so a counter kept there
    // restarts with every new window and is never cleaned up unless `clear` is implemented.
    val windowCountDescriptor = new ValueStateDescriptor[Long]("windowCount", classOf[Long])
    val windowCount = context.globalState.getState(windowCountDescriptor)
    val currentCount = Option(windowCount.value()).getOrElse(0L) + 1
    windowCount.update(currentCount)

    out.collect(s"Sensor $sensorId: Window #$currentCount [$windowStart-$windowEnd], " +
      s"Stats: $stats, Watermark: $watermark")
  }

  /** Formats count, average, min and max temperature of `readings`. */
  private def calculateStats(readings: List[SensorReading]): String = {
    val temps = readings.map(_.temperature)
    s"Count: ${temps.length}, Avg: ${temps.sum / temps.length}, Min: ${temps.min}, Max: ${temps.max}"
  }
}

```

### Rich Functions

Rich variants of functions that add lifecycle methods (`open`/`close`) and access to the runtime context.

```scala { .api }

/**
 * RichFunction base trait.
 *
 * Adds lifecycle hooks and runtime-context access to a user function.
 */
trait RichFunction {

  // ---- runtime context ----

  /** Runtime context this function is bound to. */
  def getRuntimeContext: RuntimeContext

  /** Binds the runtime context; no default implementation is provided here. */
  def setRuntimeContext(context: RuntimeContext): Unit

  /** Iteration-specific runtime context. */
  def getIterationRuntimeContext: IterationRuntimeContext

  // ---- lifecycle ----

  /**
   * Initialization hook; the default does nothing.
   *
   * NOTE(review): newer Flink releases deprecate the Configuration overload of `open`.
   * Confirm against the targeted version.
   */
  def open(parameters: Configuration): Unit = ()

  /** Teardown hook; the default does nothing. */
  def close(): Unit = ()
}

/**
 * Rich process functions.
 *
 * The process-function types with RichFunction mixed in, adding `open`/`close` and
 * runtime-context access.
 *
 * NOTE(review): in upstream Flink, ProcessFunction and KeyedProcessFunction are already
 * rich, and no separate Rich* process classes exist. Confirm these exist in the
 * targeted API.
 */
abstract class RichProcessFunction[I, O]
    extends ProcessFunction[I, O]
    with RichFunction

abstract class RichKeyedProcessFunction[K, I, O]
    extends KeyedProcessFunction[K, I, O]
    with RichFunction

/**
 * Rich window functions.
 *
 * Each window-function type with RichFunction mixed in.
 */
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
    extends WindowFunction[IN, OUT, KEY, W]
    with RichFunction

abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
    extends ProcessWindowFunction[IN, OUT, KEY, W]
    with RichFunction

abstract class RichAllWindowFunction[IN, OUT, W <: Window]
    extends AllWindowFunction[IN, OUT, W]
    with RichFunction

abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
    extends ProcessAllWindowFunction[IN, OUT, W]
    with RichFunction

```

**Usage Examples:**

```scala

import org.apache.flink.streaming.api.scala.function.RichProcessFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.metrics.Counter
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

/**
 * Counts and reports readings above 30.0 °C. It records the processing time of the latest
 * element and reports which parallel subtask handled each element.
 */
class RichTemperatureProcessor extends RichProcessFunction[SensorReading, String] {

  // Metric incremented once per reading above 30.0.
  private var highTempCount: Counter = _
  // Processing time at which the most recent element was handled.
  private var lastProcessingTime: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // Initialize metrics
    highTempCount = getRuntimeContext
      .getMetricGroup
      .addGroup("temperature")
      .counter("high_temp_count")

    // Initialize state
    // NOTE(review): getState returns keyed state, which Flink only provides when the function
    // runs on a keyed stream. Confirm how this processor is applied.
    val descriptor = new ValueStateDescriptor[Long]("lastProcessingTime", classOf[Long])
    lastProcessingTime = getRuntimeContext.getState(descriptor)
  }

  override def close(): Unit = {
    // Cleanup resources if needed
  }

  override def processElement(
      reading: SensorReading,
      ctx: ProcessFunction[SensorReading, String]#Context,
      out: Collector[String]
  ): Unit = {
    val currentTime = ctx.timerService().currentProcessingTime()

    if (reading.temperature > 30.0) {
      highTempCount.inc()
      out.collect(s"High temperature: ${reading.temperature}°C")
    }

    // Update processing time
    lastProcessingTime.update(currentTime)

    // Access runtime context information
    val subtaskIndex = getRuntimeContext.getIndexOfThisSubtask
    val parallelism = getRuntimeContext.getNumberOfParallelSubtasks

    out.collect(s"Processed by subtask $subtaskIndex/$parallelism at time $currentTime")
  }
}

```

### Stateful Functions

Functions that maintain state across elements.

```scala { .api }

/**
 * StatefulFunction marker trait.
 *
 * Declares no members; it only tags a function type as stateful.
 */
trait StatefulFunction

/**
 * State descriptors for different state types.
 *
 * Every descriptor takes the state's name. It also takes the type information, plus the
 * combining function for reducing and aggregating state.
 */
class ValueStateDescriptor[T](
    name: String,
    typeClass: Class[T]
)

class ListStateDescriptor[T](
    name: String,
    typeClass: Class[T]
)

class MapStateDescriptor[UK, UV](
    name: String,
    keyClass: Class[UK],
    valueClass: Class[UV]
)

class ReducingStateDescriptor[T](
    name: String,
    reduceFunction: ReduceFunction[T],
    typeClass: Class[T]
)

/** `accClass` describes the accumulator type ACC, not the input or output type. */
class AggregatingStateDescriptor[IN, ACC, OUT](
    name: String,
    aggregateFunction: AggregateFunction[IN, ACC, OUT],
    accClass: Class[ACC]
)

```

**Usage Examples:**

```scala

import org.apache.flink.api.common.state._
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

/**
 * Counts occurrences per key while demonstrating value, list, map and reducing state.
 * Emits `(word, countSoFar)` for every element.
 */
class StatefulWordCounter extends RichKeyedProcessFunction[String, String, (String, Long)] {

  private var wordCount: ValueState[Long] = _
  private var wordHistory: ListState[String] = _
  private var wordTimestamps: MapState[String, Long] = _
  // Like all keyed state, this is scoped to the current key. If the stream is keyed by the
  // word itself (presumably; confirm at the call site), it tracks the same per-word count
  // as `wordCount`, not a global total.
  private var totalCount: ReducingState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // Value state
    wordCount = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("wordCount", classOf[Long])
    )

    // List state
    wordHistory = getRuntimeContext.getListState(
      new ListStateDescriptor[String]("wordHistory", classOf[String])
    )

    // Map state
    wordTimestamps = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Long]("wordTimestamps", classOf[String], classOf[Long])
    )

    // Reducing state
    totalCount = getRuntimeContext.getReducingState(
      new ReducingStateDescriptor[Long](
        "totalCount",
        new ReduceFunction[Long] {
          override def reduce(value1: Long, value2: Long): Long = value1 + value2
        },
        classOf[Long]
      )
    )
  }

  override def processElement(
      word: String,
      ctx: KeyedProcessFunction[String, String, (String, Long)]#Context,
      out: Collector[(String, Long)]
  ): Unit = {
    // Update value state
    val currentCount = Option(wordCount.value()).getOrElse(0L) + 1
    wordCount.update(currentCount)

    // Update list state
    wordHistory.add(word)

    // Update map state
    // NOTE(review): in Flink, Context.timestamp() returns a nullable java.lang.Long when an
    // element carries no timestamp. Skip the update then instead of failing on unboxing.
    Option(ctx.timestamp()).foreach(ts => wordTimestamps.put(word, ts))

    // Update reducing state
    totalCount.add(1L)

    out.collect((word, currentCount))
  }
}

```

### Timer Service

Service for registering and managing timers in process functions.

```scala { .api }

/**
 * TimerService for managing timers.
 *
 * Reports the current processing time and watermark, and lets a function register or
 * delete timers in either time domain. Times are `Long` values.
 */
trait TimerService {

  // ---- clocks ----
  def currentProcessingTime(): Long
  def currentWatermark(): Long

  // ---- processing-time timers ----
  def registerProcessingTimeTimer(time: Long): Unit
  def deleteProcessingTimeTimer(time: Long): Unit

  // ---- event-time timers ----
  def registerEventTimeTimer(time: Long): Unit
  def deleteEventTimeTimer(time: Long): Unit
}

/**
 * TimeDomain enumeration.
 *
 * Identifies which clock fired a timer: EVENT_TIME (watermark-driven) or
 * PROCESSING_TIME (wall-clock). Scala enums list their members with `case`;
 * the Java-style `{ EVENT_TIME, PROCESSING_TIME }` body does not compile.
 */
enum TimeDomain {
  case EVENT_TIME, PROCESSING_TIME
}

```

**Usage Examples:**

```scala

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeDomain
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

/**
 * Stores the latest reading per key and registers two timers per element: an event-time
 * timer 30 s after the reading's timestamp and a processing-time timer 60 s from now.
 * When a timer fires, it reports the domain and the last reading.
 */
class TimerBasedProcessor extends KeyedProcessFunction[String, SensorReading, String] {

  private var lastReading: ValueState[SensorReading] = _

  override def open(parameters: Configuration): Unit = {
    lastReading = getRuntimeContext.getState(
      new ValueStateDescriptor[SensorReading]("lastReading", classOf[SensorReading])
    )
  }

  override def processElement(
      reading: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
      out: Collector[String]
  ): Unit = {
    // Store current reading
    lastReading.update(reading)

    // Register timer for 30 seconds later (event time)
    val timerTime = reading.timestamp + 30000
    ctx.timerService().registerEventTimeTimer(timerTime)

    // Register processing time timer for 1 minute later
    val processingTimer = ctx.timerService().currentProcessingTime() + 60000
    ctx.timerService().registerProcessingTimeTimer(processingTimer)

    out.collect(s"Processed reading: $reading")
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
      out: Collector[String]
  ): Unit = {
    val key = ctx.getCurrentKey
    val timeDomain = ctx.timeDomain()
    val lastReadingValue = lastReading.value()

    timeDomain match {
      case TimeDomain.EVENT_TIME =>
        out.collect(s"Event time timer fired for $key at $timestamp. Last reading: $lastReadingValue")
      case TimeDomain.PROCESSING_TIME =>
        out.collect(s"Processing time timer fired for $key at $timestamp. Last reading: $lastReadingValue")
    }

    // Clean up state if needed
    // NOTE(review): for processing-time timers, `timestamp` is wall-clock time while
    // `lastReadingValue.timestamp` is the reading's own timestamp, so this check mixes
    // clocks. Confirm intent.
    if (lastReadingValue != null && timestamp - lastReadingValue.timestamp > 300000) {
      lastReading.clear()
    }
  }
}

```

## Types

```scala { .api }

// Core function types
trait ProcessFunction[I, O]
trait KeyedProcessFunction[K, I, O]
trait WindowFunction[IN, OUT, KEY, W <: Window]
trait ProcessWindowFunction[IN, OUT, KEY, W <: Window]
trait AllWindowFunction[IN, OUT, W <: Window]
trait ProcessAllWindowFunction[IN, OUT, W <: Window]

// Rich function types (each one is its base function type with RichFunction mixed in)
trait RichFunction
abstract class RichProcessFunction[I, O] extends ProcessFunction[I, O] with RichFunction
abstract class RichKeyedProcessFunction[K, I, O] extends KeyedProcessFunction[K, I, O] with RichFunction
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window] extends WindowFunction[IN, OUT, KEY, W] with RichFunction
abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window] extends ProcessWindowFunction[IN, OUT, KEY, W] with RichFunction
abstract class RichAllWindowFunction[IN, OUT, W <: Window] extends AllWindowFunction[IN, OUT, W] with RichFunction
abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window] extends ProcessAllWindowFunction[IN, OUT, W] with RichFunction

// State types
trait ValueState[T]
trait ListState[T]
trait MapState[UK, UV]
trait ReducingState[T]
trait AggregatingState[IN, OUT]

// State descriptors
class ValueStateDescriptor[T]
class ListStateDescriptor[T]
class MapStateDescriptor[UK, UV]
class ReducingStateDescriptor[T]
class AggregatingStateDescriptor[IN, ACC, OUT]

// Context and service types
trait TimerService
enum TimeDomain { case EVENT_TIME, PROCESSING_TIME }
trait KeyedStateStore
class Configuration
trait RuntimeContext
trait IterationRuntimeContext
trait Counter
trait MetricGroup

// Window types (concrete windows must extend Window to satisfy the `W <: Window` bounds above)
trait Window
class TimeWindow extends Window
class GlobalWindow extends Window

// Utility types
trait Collector[T]
class OutputTag[T]

```