or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md data-streams.md execution-environment.md index.md keyed-streams.md processing-functions.md sinks-output.md stream-connections.md window-functions.md windowing.md

docs/window-functions.md

0

# Window Functions

1

2

Specialized functions for processing windowed data with access to window metadata, state, and complete window contents. These functions are essential for complex windowed computations and analytics.

3

4

## Capabilities

5

6

### WindowFunction

7

8

Basic window function interface for processing all elements in a window.

9

10

```scala { .api }

11

trait WindowFunction[IN, OUT, KEY, W <: Window] {

12

/**

13

* Process all elements in a window

14

* @param key The key of the window

15

* @param window The window metadata

16

* @param input All elements in the window

17

* @param out Collector for emitting results

18

*/

19

def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit

20

}

21

```

22

23

### ProcessWindowFunction

24

25

Advanced window function with access to context and state.

26

27

```scala { .api }

28

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {

29

/**

30

* Process all elements in a window with access to context

31

* @param key The key of the window

32

* @param context Context providing window info and state access

33

* @param elements All elements in the window

34

* @param out Collector for emitting results

35

*/

36

def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit

37

38

/**

39

* Clear any per-window state (optional override)

40

* @param context Context for accessing state

41

*/

42

def clear(context: Context): Unit = {}

43

44

abstract class Context {

45

def window: W

46

def currentProcessingTime: Long

47

def currentWatermark: Long

48

def windowState: KeyedStateStore

49

def globalState: KeyedStateStore

50

def output[X](outputTag: OutputTag[X], value: X): Unit

51

}

52

}

53

```

54

55

**Usage Examples:**

56

57

```scala

58

import org.apache.flink.streaming.api.scala._

59

import org.apache.flink.streaming.api.scala.function.{WindowFunction, ProcessWindowFunction}

60

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

61

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows

62

import org.apache.flink.streaming.api.windowing.time.Time

63

import org.apache.flink.util.Collector

64

65

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

66

case class WindowStats(sensorId: String, windowStart: Long, windowEnd: Long,

67

count: Int, avgTemp: Double, minTemp: Double, maxTemp: Double)

68

69

// WindowFunction example - calculate window statistics

70

class SensorStatsWindowFunction extends WindowFunction[SensorReading, WindowStats, String, TimeWindow] {

71

override def apply(

72

key: String,

73

window: TimeWindow,

74

input: Iterable[SensorReading],

75

out: Collector[WindowStats]

76

): Unit = {

77

val readings = input.toList

78

val count = readings.size

79

val temperatures = readings.map(_.temperature)

80

val avgTemp = temperatures.sum / count

81

val minTemp = temperatures.min

82

val maxTemp = temperatures.max

83

84

out.collect(WindowStats(

85

key, window.getStart, window.getEnd,

86

count, avgTemp, minTemp, maxTemp

87

))

88

}

89

}

90

91

// ProcessWindowFunction example - with state and side outputs

92

class AdvancedSensorStatsFunction extends ProcessWindowFunction[SensorReading, WindowStats, String, TimeWindow] {

93

94

override def process(

95

key: String,

96

context: Context,

97

elements: Iterable[SensorReading],

98

out: Collector[WindowStats]

99

): Unit = {

100

val readings = elements.toList

101

val count = readings.size

102

val temperatures = readings.map(_.temperature)

103

val avgTemp = temperatures.sum / count

104

val minTemp = temperatures.min

105

val maxTemp = temperatures.max

106

107

// Emit main result

108

out.collect(WindowStats(

109

key, context.window.getStart, context.window.getEnd,

110

count, avgTemp, minTemp, maxTemp

111

))

112

113

// Emit to side output if temperature is too high

114

if (maxTemp > 80.0) {

115

context.output(

116

OutputTag[String]("high-temp-alerts"),

117

s"Sensor $key exceeded 80°C in window ${context.window.getStart}-${context.window.getEnd}"

118

)

119

}

120

121

// Update global state (count of processed windows)

122

val globalCounter = context.globalState.getState(

123

new ValueStateDescriptor[Long]("window-count", classOf[Long])

124

)

125

val currentCount = Option(globalCounter.value()).getOrElse(0L)

126

globalCounter.update(currentCount + 1)

127

}

128

}

129

130

// Apply window functions

131

val readings = env.fromElements(

132

SensorReading("sensor1", 20.0, 1000L),

133

SensorReading("sensor1", 25.0, 2000L),

134

SensorReading("sensor1", 30.0, 3000L)

135

).assignAscendingTimestamps(_.timestamp)

136

137

val keyedReadings = readings.keyBy(_.sensorId)

138

139

// Using WindowFunction

140

val windowStats = keyedReadings

141

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

142

.apply(new SensorStatsWindowFunction)

143

144

// Using ProcessWindowFunction

145

val advancedStats = keyedReadings

146

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

147

.process(new AdvancedSensorStatsFunction)

148

149

// Get side output

150

val highTempAlerts = advancedStats.getSideOutput(OutputTag[String]("high-temp-alerts"))

151

```

152

153

### AllWindowFunction

154

155

Window function for non-keyed streams (all-window operations).

156

157

```scala { .api }

158

trait AllWindowFunction[IN, OUT, W <: Window] {

159

/**

160

* Process all elements in a window (non-keyed)

161

* @param window The window metadata

162

* @param input All elements in the window

163

* @param out Collector for emitting results

164

*/

165

def apply(window: W, input: Iterable[IN], out: Collector[OUT]): Unit

166

}

167

```

168

169

### ProcessAllWindowFunction

170

171

Advanced all-window function with context access.

172

173

```scala { .api }

174

abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] {

175

/**

176

* Process all elements in a window with context (non-keyed)

177

* @param context Context providing window info and state access

178

* @param elements All elements in the window

179

* @param out Collector for emitting results

180

*/

181

def process(context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit

182

183

/**

184

* Clear any per-window state (optional override)

185

* @param context Context for accessing state

186

*/

187

def clear(context: Context): Unit = {}

188

189

abstract class Context {

190

def window: W

191

def currentProcessingTime: Long

192

def currentWatermark: Long

193

def windowState: KeyedStateStore

194

def globalState: KeyedStateStore

195

def output[X](outputTag: OutputTag[X], value: X): Unit

196

}

197

}

198

```

199

200

**Usage Examples:**

201

202

```scala

203

import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction}

204

205

case class GlobalStats(windowStart: Long, windowEnd: Long, totalCount: Int, avgValue: Double)

206

207

// AllWindowFunction example - global statistics

208

class GlobalStatsFunction extends AllWindowFunction[SensorReading, GlobalStats, TimeWindow] {

209

override def apply(

210

window: TimeWindow,

211

input: Iterable[SensorReading],

212

out: Collector[GlobalStats]

213

): Unit = {

214

val readings = input.toList

215

val count = readings.size

216

val avgTemp = readings.map(_.temperature).sum / count

217

218

out.collect(GlobalStats(window.getStart, window.getEnd, count, avgTemp))

219

}

220

}

221

222

// ProcessAllWindowFunction example - with side outputs

223

class AdvancedGlobalStatsFunction extends ProcessAllWindowFunction[SensorReading, GlobalStats, TimeWindow] {

224

override def process(

225

context: Context,

226

elements: Iterable[SensorReading],

227

out: Collector[GlobalStats]

228

): Unit = {

229

val readings = elements.toList

230

val count = readings.size

231

val avgTemp = readings.map(_.temperature).sum / count

232

233

out.collect(GlobalStats(context.window.getStart, context.window.getEnd, count, avgTemp))

234

235

// Side output for anomalies

236

if (avgTemp > 50.0) {

237

context.output(

238

OutputTag[String]("global-anomalies"),

239

s"Global average temperature ${avgTemp}°C is above threshold"

240

)

241

}

242

}

243

}

244

245

// Apply all-window functions

246

val globalStats = readings

247

.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))

248

.apply(new GlobalStatsFunction)

249

250

val advancedGlobalStats = readings

251

.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))

252

.process(new AdvancedGlobalStatsFunction)

253

```

254

255

### Rich Window Functions

256

257

Rich versions of window functions with lifecycle methods and runtime context access.

258

259

```scala { .api }

260

abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]

261

extends WindowFunction[IN, OUT, KEY, W] with RichFunction {

262

263

override def open(parameters: Configuration): Unit = {}

264

override def close(): Unit = {}

265

def getRuntimeContext: RuntimeContext

266

def setRuntimeContext(t: RuntimeContext): Unit

267

}

268

269

abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]

270

extends ProcessWindowFunction[IN, OUT, KEY, W] with RichFunction {

271

272

override def open(parameters: Configuration): Unit = {}

273

override def close(): Unit = {}

274

def getRuntimeContext: RuntimeContext

275

def setRuntimeContext(t: RuntimeContext): Unit

276

}

277

278

abstract class RichAllWindowFunction[IN, OUT, W <: Window]

279

extends AllWindowFunction[IN, OUT, W] with RichFunction {

280

281

override def open(parameters: Configuration): Unit = {}

282

override def close(): Unit = {}

283

def getRuntimeContext: RuntimeContext

284

def setRuntimeContext(t: RuntimeContext): Unit

285

}

286

287

abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]

288

extends ProcessAllWindowFunction[IN, OUT, W] with RichFunction {

289

290

override def open(parameters: Configuration): Unit = {}

291

override def close(): Unit = {}

292

def getRuntimeContext: RuntimeContext

293

def setRuntimeContext(t: RuntimeContext): Unit

294

}

295

```

296

297

**Usage Examples:**

298

299

```scala

300

import org.apache.flink.configuration.Configuration

301

302

class MetricsWindowFunction extends RichWindowFunction[SensorReading, WindowStats, String, TimeWindow] {

303

private var processedWindows: Counter = _

304

305

override def open(parameters: Configuration): Unit = {

306

processedWindows = getRuntimeContext

307

.getMetricGroup

308

.counter("processed-windows")

309

}

310

311

override def apply(

312

key: String,

313

window: TimeWindow,

314

input: Iterable[SensorReading],

315

out: Collector[WindowStats]

316

): Unit = {

317

// Increment metric

318

processedWindows.inc()

319

320

// Process window as usual

321

val readings = input.toList

322

val count = readings.size

323

val avgTemp = readings.map(_.temperature).sum / count

324

325

out.collect(WindowStats(key, window.getStart, window.getEnd, count, avgTemp, 0.0, 0.0))

326

}

327

}

328

```

329

330

## Types

331

332

```scala { .api }

333

// Window types

334

abstract class Window {

335

def maxTimestamp(): Long

336

}

337

338

class TimeWindow(start: Long, end: Long) extends Window {

339

def getStart: Long = start

340

def getEnd: Long = end

341

def maxTimestamp(): Long = end - 1

342

}

343

344

class GlobalWindow extends Window {

345

def maxTimestamp(): Long = Long.MaxValue

346

}

347

348

// State store for window functions

349

trait KeyedStateStore {

350

def getState[T](stateDescriptor: ValueStateDescriptor[T]): ValueState[T]

351

def getListState[T](stateDescriptor: ListStateDescriptor[T]): ListState[T]

352

def getReducingState[T](stateDescriptor: ReducingStateDescriptor[T]): ReducingState[T]

353

def getAggregatingState[IN, ACC, OUT](stateDescriptor: AggregatingStateDescriptor[IN, ACC, OUT]): AggregatingState[IN, OUT]

354

def getMapState[UK, UV](stateDescriptor: MapStateDescriptor[UK, UV]): MapState[UK, UV]

355

}

356

357

// State descriptors

358

class ValueStateDescriptor[T](name: String, typeClass: Class[T])

359

class ListStateDescriptor[T](name: String, typeClass: Class[T])

360

class ReducingStateDescriptor[T](name: String, reduceFunction: ReduceFunction[T], typeClass: Class[T])

361

class MapStateDescriptor[K, V](name: String, keyClass: Class[K], valueClass: Class[V])

362

363

// Collector interface

364

trait Collector[T] {

365

def collect(record: T): Unit

366

def close(): Unit

367

}

368

369

// Output tag for side outputs

370

case class OutputTag[T: TypeInformation](id: String) {

371

def getTypeInfo: TypeInformation[T]

372

}

373

374

// Rich function base

375

trait RichFunction {

376

def open(parameters: Configuration): Unit

377

def close(): Unit

378

def getRuntimeContext: RuntimeContext

379

def setRuntimeContext(t: RuntimeContext): Unit

380

}

381

382

// Runtime context

383

trait RuntimeContext {

384

def getTaskName: String

385

def getMetricGroup: MetricGroup

386

def getNumberOfParallelSubtasks: Int

387

def getIndexOfThisSubtask: Int

388

}

389

390

// Metrics

391

trait Counter {

392

def inc(): Unit

393

def inc(n: Long): Unit

394

def getCount: Long

395

}

396

397

trait MetricGroup {

398

def counter(name: String): Counter

399

def gauge[T](name: String, gauge: Gauge[T]): Gauge[T]

400

def histogram(name: String, histogram: Histogram): Histogram

401

}

402

```