or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.mddata-streams.mdexecution-environment.mdindex.mdkeyed-streams.mdprocessing-functions.mdsinks-output.mdstream-connections.mdwindow-functions.mdwindowing.md

windowing.mddocs/

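# Windowing Operations

`WindowedStream` enables bounded computations on unbounded streams by grouping elements into finite windows based on time or count. A `WindowedStream` is obtained by calling `window(...)` with a window assigner on a keyed stream, as shown in the examples below. Windows are the basis for aggregations and other batch-style processing on streaming data.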

## Capabilities

### Window Configuration

Configure how a window handles late-arriving data, when it fires (triggers), and which elements are removed from it (evictors).

```scala { .api }

class WindowedStream[T, K, W <: Window] {

  /**
   * Sets how late an element may be and still be added to its window.
   *
   * An element is late once the watermark has passed the end of its window.
   * It is still added to the window (and may cause it to fire again) as long
   * as it is not later than `lateness`; later elements are dropped. The
   * default lateness is zero, and setting it is only valid for event-time
   * windows.
   *
   * @param lateness maximum allowed lateness
   * @return this WindowedStream with the lateness configured
   */
  def allowedLateness(lateness: Time): WindowedStream[T, K, W]

  /**
   * Routes elements that are too late (the watermark has passed the end of
   * their window plus the allowed lateness) to a side output instead of
   * dropping them.
   *
   * The late elements are read from the windowed result stream with
   * `getSideOutput(outputTag)`, using the same tag.
   *
   * @param outputTag tag identifying the late-data side output
   * @return this WindowedStream with late-data routing configured
   */
  def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W]

  /**
   * Replaces the window assigner's default trigger. The trigger decides when
   * a window is evaluated (fired) and when its contents are purged.
   *
   * @param trigger custom trigger implementation
   * @return this WindowedStream using the given trigger
   */
  def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W]

  /**
   * Sets an evictor that removes elements from a window before and/or after
   * the window function is applied.
   *
   * Note: with an evictor, window results cannot be pre-aggregated
   * incrementally, so all elements of a window are buffered until it fires.
   *
   * @param evictor custom evictor implementation
   * @return this WindowedStream using the given evictor
   */
  def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W]
}

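```

### Reduction Operations

Apply reduction functions that combine the elements of each window pairwise into a single value, optionally followed by a window function that receives the reduced value together with the key and window metadata.

```scala { .api }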

class WindowedStream[T, K, W <: Window] {

  /**
   * Reduces the elements of each window with a [[ReduceFunction]].
   *
   * Elements are combined incrementally as they arrive, so only the current
   * reduced value is kept per key and window.
   *
   * @param function reduce function combining two elements into one
   * @return DataStream with one reduced result per key and window
   */
  def reduce(function: ReduceFunction[T]): DataStream[T]

  /**
   * Reduces the elements of each window with a Scala function.
   *
   * @param function function combining two elements into one
   * @return DataStream with one reduced result per key and window
   */
  def reduce(function: (T, T) => T): DataStream[T]

  /**
   * Reduces incrementally with `preAggregator`, then passes the reduced value
   * to a [[WindowFunction]] when the window fires.
   *
   * The `Iterable` handed to the window function contains exactly one
   * element: the pre-aggregated result.
   *
   * @param preAggregator ReduceFunction used for incremental pre-aggregation
   * @param function      WindowFunction producing the final output
   * @return DataStream with window function results
   */
  def reduce[R: TypeInformation](
      preAggregator: ReduceFunction[T],
      function: WindowFunction[T, R, K, W]
  ): DataStream[R]

  /**
   * Closure variant of the above: reduces incrementally with a Scala
   * function, then applies a Scala window function to the reduced value.
   *
   * @param preAggregator  function combining two elements into one
   * @param windowFunction function receiving (key, window, reduced elements,
   *                       collector); the Iterable holds exactly one element
   * @return DataStream with window function results
   */
  def reduce[R: TypeInformation](
      preAggregator: (T, T) => T,
      windowFunction: (K, W, Iterable[T], Collector[R]) => Unit
  ): DataStream[R]

  /**
   * Reduces incrementally with `preAggregator`, then passes the reduced value
   * to a [[ProcessWindowFunction]]. That function additionally gets a
   * `Context` with window metadata, time, keyed state and side outputs.
   *
   * @param preAggregator ReduceFunction used for incremental pre-aggregation
   * @param function      ProcessWindowFunction producing the final output
   * @return DataStream with process window function results
   */
  def reduce[R: TypeInformation](
      preAggregator: ReduceFunction[T],
      function: ProcessWindowFunction[T, R, K, W]
  ): DataStream[R]
}

```

**Usage Examples:**

```scala

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor1", 25.0, 2000L),
  SensorReading("sensor1", 22.0, 3000L)
)

// The sample timestamps are strictly increasing, so ascending timestamp
// assignment is sufficient here.
val keyedReadings = readings
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.sensorId)

// Simple reduction - keep the reading with the maximum temperature per window
val maxTemps = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .reduce((r1, r2) => if (r1.temperature > r2.temperature) r1 else r2)

// Reduction with a window function - attach the window bounds to the result
val maxTempsWithWindow = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .reduce(
    (r1: SensorReading, r2: SensorReading) => if (r1.temperature > r2.temperature) r1 else r2,
    (key: String, window: TimeWindow, reduced: Iterable[SensorReading], out: Collector[(String, Double, Long, Long)]) => {
      // After pre-aggregation the iterable holds a single element: the maximum reading.
      reduced.headOption.foreach { maxReading =>
        out.collect((key, maxReading.temperature, window.getStart, window.getEnd))
      }
    }
  )

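```

### Aggregation Operations

Apply an `AggregateFunction` to compute results incrementally through an accumulator whose type may differ from both the input and the output type (for example, a running sum and count used to compute an average).

```scala { .api }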

class WindowedStream[T, K, W <: Window] {

  /**
   * Applies an [[AggregateFunction]] to each window.
   *
   * Elements are folded into an accumulator of type `ACC` as they arrive;
   * `getResult` turns the accumulator into the output when the window fires.
   *
   * @tparam ACC accumulator type
   * @tparam R   result type
   * @param aggregateFunction function for incremental aggregation
   * @return DataStream with one aggregation result per key and window
   */
  def aggregate[ACC: TypeInformation, R: TypeInformation](
      aggregateFunction: AggregateFunction[T, ACC, R]
  ): DataStream[R]

  /**
   * Aggregates incrementally, then passes the aggregate to a
   * [[WindowFunction]] when the window fires.
   *
   * The aggregate type `V` and the output type `R` are independent, so the
   * window function can reshape the aggregate (e.g. attach the key and the
   * window bounds). Its `Iterable` holds exactly one element: the aggregate.
   *
   * @tparam ACC accumulator type
   * @tparam V   type of the aggregate produced by `aggregateFunction`
   * @tparam R   type emitted by `windowFunction`
   * @param aggregateFunction function for incremental aggregation
   * @param windowFunction    function for final window processing
   * @return DataStream with window function results
   */
  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
      aggregateFunction: AggregateFunction[T, ACC, V],
      windowFunction: WindowFunction[V, R, K, W]
  ): DataStream[R]

  /**
   * Aggregates incrementally, then passes the aggregate to a
   * [[ProcessWindowFunction]], which additionally gets a `Context` with
   * window metadata, time, keyed state and side outputs.
   *
   * @tparam ACC accumulator type
   * @tparam V   type of the aggregate produced by `aggregateFunction`
   * @tparam R   type emitted by `windowFunction`
   * @param aggregateFunction function for incremental aggregation
   * @param windowFunction    ProcessWindowFunction for final processing
   * @return DataStream with process window function results
   */
  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
      aggregateFunction: AggregateFunction[T, ACC, V],
      windowFunction: ProcessWindowFunction[V, R, K, W]
  ): DataStream[R]
}

```

**Usage Examples:**

```scala

import org.apache.flink.api.common.functions.AggregateFunction

// Continues the reduction example: reuses SensorReading and keyedReadings.

/**
 * Computes the average temperature of a window.
 *
 * The accumulator is a (sum of temperatures, number of readings) pair.
 */
class AverageAggregateFunction extends AggregateFunction[SensorReading, (Double, Int), Double] {

  override def createAccumulator(): (Double, Int) = (0.0, 0)

  override def add(value: SensorReading, accumulator: (Double, Int)): (Double, Int) = {
    val (sum, count) = accumulator
    (sum + value.temperature, count + 1)
  }

  // Flink creates a window when its first element arrives, so count is at
  // least 1 whenever a result is requested.
  override def getResult(accumulator: (Double, Int)): Double = {
    val (sum, count) = accumulator
    sum / count
  }

  // Combines partial accumulators, e.g. when session windows are merged.
  override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = {
    val (sumA, countA) = a
    val (sumB, countB) = b
    (sumA + sumB, countA + countB)
  }
}

// Average temperature per sensor over one-minute tumbling windows
val avgTemps = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .aggregate(new AverageAggregateFunction)

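```

### Window Functions

Apply functions that receive the complete contents of a window at once, together with its key and window metadata. Unlike reductions and aggregations, these functions cannot pre-aggregate incrementally, so all elements of the window are buffered until it fires.

```scala { .api }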

class WindowedStream[T, K, W <: Window] {

  /**
   * Applies a [[WindowFunction]] to the full contents of each window when it
   * fires.
   *
   * @param function window function receiving the key, the window, all
   *                 elements of the window and a collector
   * @return DataStream with the window function's output
   */
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]

  /**
   * Closure variant of `apply`: the function receives
   * `(key, window, elements, collector)` and emits results through the
   * collector.
   *
   * @param function closure applied to each fired window
   * @return DataStream with the function's output
   */
  def apply[R: TypeInformation](
      function: (K, W, Iterable[T], Collector[R]) => Unit
  ): DataStream[R]

  /**
   * Applies a [[ProcessWindowFunction]] to the full contents of each window.
   *
   * Besides the elements, the function gets a `Context` exposing the window,
   * the current processing time and watermark, per-window and global keyed
   * state, and side outputs.
   *
   * @param function process window function
   * @return DataStream with the process function's output
   */
  def process[R: TypeInformation](
      function: ProcessWindowFunction[T, R, K, W]
  ): DataStream[R]
}

```

**Usage Examples:**

```scala

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Window function emitting all temperatures of a window plus its bounds
val allReadingsWithWindow = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .apply(new WindowFunction[SensorReading, (String, List[Double], Long, Long), String, TimeWindow] {
    override def apply(
        key: String,
        window: TimeWindow,
        input: Iterable[SensorReading],
        out: Collector[(String, List[Double], Long, Long)]
    ): Unit = {
      val temperatures = input.map(_.temperature).toList
      out.collect((key, temperatures, window.getStart, window.getEnd))
    }
  })

// Closure syntax: count the readings in each window
val readingCounts = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .apply { (key: String, window: TimeWindow, windowReadings: Iterable[SensorReading], out: Collector[(String, Int)]) =>
    out.collect((key, windowReadings.size))
  }

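```

## Window Assigners

A window assigner, passed to `window(...)`, determines which window(s) each element belongs to. Tumbling and sliding windows have a fixed size, session windows are delimited by gaps of inactivity, and global windows put all elements of a key into a single window.

```scala { .api }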

// Tumbling time windows: fixed size, non-overlapping.
// The optional offset shifts the window boundaries (e.g. to align daily
// windows with a time zone other than UTC).
object TumblingEventTimeWindows {
  def of(size: Time): TumblingEventTimeWindows
  def of(size: Time, offset: Time): TumblingEventTimeWindows
}

object TumblingProcessingTimeWindows {
  def of(size: Time): TumblingProcessingTimeWindows
  def of(size: Time, offset: Time): TumblingProcessingTimeWindows
}

// Sliding time windows: fixed size, a new window starts every `slide`,
// so windows overlap when slide < size.
object SlidingEventTimeWindows {
  def of(size: Time, slide: Time): SlidingEventTimeWindows
  def of(size: Time, slide: Time, offset: Time): SlidingEventTimeWindows
}

object SlidingProcessingTimeWindows {
  def of(size: Time, slide: Time): SlidingProcessingTimeWindows
  def of(size: Time, slide: Time, offset: Time): SlidingProcessingTimeWindows
}

// Session windows: a window closes after a gap of inactivity.
// withDynamicGap derives the gap from each element, so it is generic in the
// element type and returns the dynamic-gap variant of the assigner.
object EventTimeSessionWindows {
  def withGap(sessionTimeout: Time): EventTimeSessionWindows
  def withDynamicGap[T](sessionWindowTimeGapExtractor: SessionWindowTimeGapExtractor[T]): DynamicEventTimeSessionWindows[T]
}

object ProcessingTimeSessionWindows {
  def withGap(sessionTimeout: Time): ProcessingTimeSessionWindows
  def withDynamicGap[T](sessionWindowTimeGapExtractor: SessionWindowTimeGapExtractor[T]): DynamicProcessingTimeSessionWindows[T]
}

// Global windows: a single window per key that never ends on its own.
// Its default trigger never fires, so a custom trigger (e.g. count-based)
// must be set with `trigger(...)` for any result to be emitted.
object GlobalWindows {
  def create(): GlobalWindows
}

```

**Usage Examples:**

```scala

import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time

// Each value below is a WindowedStream; apply reduce, aggregate, apply or
// process to it to obtain a DataStream of results.

// Tumbling windows - non-overlapping, fixed-size 5-minute windows
val tumblingWindow = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))

// Sliding windows - 10-minute windows starting every 2 minutes, so each
// element falls into 10 / 2 = 5 overlapping windows
val slidingWindow = keyedReadings
  .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(2)))

// Session windows - a sensor's session ends after 30 minutes without readings
val sessionWindow = keyedReadings
  .window(EventTimeSessionWindows.withGap(Time.minutes(30)))

```

## Types

```scala { .api }

// Window types

/** Base type of all windows. */
abstract class Window {
  /** Largest timestamp that still belongs to this window. */
  def maxTimestamp(): Long
}

/** Time window covering [start, end): `end` itself is excluded. */
class TimeWindow(start: Long, end: Long) extends Window {
  def getStart: Long
  def getEnd: Long
  def maxTimestamp(): Long = end - 1
}

/** The single window used by GlobalWindows; it never ends. */
class GlobalWindow extends Window {
  def maxTimestamp(): Long = Long.MaxValue
}

// Aggregate function interface
trait AggregateFunction[IN, ACC, OUT] {
  /** Creates a new, empty accumulator. */
  def createAccumulator(): ACC
  /** Folds one input element into the accumulator. */
  def add(value: IN, accumulator: ACC): ACC
  /** Produces the window result from the accumulator. */
  def getResult(accumulator: ACC): OUT
  /** Combines two accumulators (used when windows are merged). */
  def merge(a: ACC, b: ACC): ACC
}

// Window function interfaces
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  /** Evaluates one window; results are emitted through `out`. */
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  /** Evaluates one window; results are emitted through `out`. */
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit

  /** Called when the window is cleaned up; override to delete per-window state. */
  def clear(context: Context): Unit = {}

  abstract class Context {
    /** The window being evaluated. */
    def window: W
    def currentProcessingTime: Long
    def currentWatermark: Long
    /** Keyed state scoped to the current key and window. */
    def windowState: KeyedStateStore
    /** Keyed state scoped to the current key across all windows. */
    def globalState: KeyedStateStore
    /** Emits a record to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

// Trigger interface for custom window firing
abstract class Trigger[T, W <: Window] {
  /** Called for every element added to a window. */
  def onElement(element: T, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult

  /** Called when a processing-time timer registered through `ctx` fires. */
  def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult

  /** Called when an event-time timer registered through `ctx` fires. */
  def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult

  /**
   * Whether this trigger supports merging windows. It must return true for a
   * trigger used with a merging assigner such as session windows; otherwise
   * setting the trigger fails.
   */
  def canMerge(): Boolean = false

  /**
   * Called when several windows are merged into `window`; typically
   * re-registers timers for the merged window. The default implementation
   * throws UnsupportedOperationException.
   */
  def onMerge(window: W, ctx: OnMergeContext): Unit

  /** Clears any state (e.g. timers) the trigger holds for a purged window. */
  def clear(window: W, ctx: TriggerContext): Unit
}

// Evictor interface for removing elements
trait Evictor[T, W <: Window] {
  /** Optionally removes elements from `elements` before the window function runs. */
  def evictBefore(elements: java.lang.Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
  /** Optionally removes elements from `elements` after the window function runs. */
  def evictAfter(elements: java.lang.Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
}

// Trigger results (a Java enum in Flink, shown here as an ADT)
sealed trait TriggerResult
object TriggerResult {
  /** Take no action on the window. */
  case object CONTINUE extends TriggerResult
  /** Evaluate the window and emit the result; the elements are kept. */
  case object FIRE extends TriggerResult
  /** Discard the window's elements without evaluating it. */
  case object PURGE extends TriggerResult
  /** Evaluate and emit, then discard the window's elements. */
  case object FIRE_AND_PURGE extends TriggerResult
}

```