or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md, data-sources.md, function-interfaces.md, index.md, keyed-streams.md, output-operations.md, scala-extensions.md, stream-composition.md, stream-environment.md, stream-partitioning.md, stream-transformations.md, window-operations.md, windowing.md

windowing.mddocs/

0

# Windowing and Time-Based Processing

1

2

Windowing in Flink enables processing of unbounded streams by grouping elements into finite sets called windows. Flink supports both keyed and non-keyed windowing with various window types and time semantics.

3

4

## Keyed Windowing

5

6

### Time-Based Windows

7

8

```scala { .api }

9

class KeyedStream[T, K] {

10

def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]

11

def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]

12

}

13

```

14

15

Create time-based windows for keyed streams:

16

17

```scala

18

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val sensorData = env.fromElements(
  SensorReading("sensor1", 20.0, 1000),
  SensorReading("sensor1", 22.0, 2000),
  SensorReading("sensor2", 18.0, 1500),
  SensorReading("sensor2", 25.0, 2500)
).keyBy(_.sensorId)

// Tumbling time windows (non-overlapping)
val tumblingWindows = sensorData
  .timeWindow(Time.minutes(5)) // 5-minute tumbling windows

// Sliding time windows (overlapping)
val slidingWindows = sensorData
  .timeWindow(Time.minutes(10), Time.minutes(2)) // 10-minute windows, sliding every 2 minutes

// Process windows.
// `apply` is overloaded (WindowFunction vs. Scala function), so the compiler cannot infer
// the lambda's parameter types. In particular, it cannot infer the Collector's output type.
// They are therefore written out explicitly.
tumblingWindows
  .apply { (key: String, window: TimeWindow, readings: Iterable[SensorReading],
            out: Collector[(String, Double, Long, Long)]) =>
    val avgTemp = readings.map(_.temperature).sum / readings.size
    out.collect((key, avgTemp, window.getStart, window.getEnd))
  }
  .print()

48

```

49

50

### Count-Based Windows

51

52

```scala { .api }

53

class KeyedStream[T, K] {

54

def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]

55

def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]

56

}

57

```

58

59

Create count-based windows:

60

61

```scala

62

import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Key by parity so that each key receives several elements.
// Keying by `identity` would put exactly one element under each key, and per-key count
// windows of size 3 (or slide 2) would never fill, so nothing would ever be emitted.
val events = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .keyBy(_ % 2)

// Tumbling count windows: fire (and purge) every 3 elements per key.
// Trailing elements that never complete a window are not emitted.
val tumblingCountWindows = events
  .countWindow(3) // Every 3 elements

// Sliding count windows: fire every 2 elements per key, aggregating the last (up to) 5
val slidingCountWindows = events
  .countWindow(5, 2) // 5 elements per window, slide by 2

tumblingCountWindows
  .sum(0)
  .print("Tumbling Count")

slidingCountWindows
  .sum(0)
  .print("Sliding Count")

83

```

84

85

### Custom Window Assigners

86

87

```scala { .api }

88

class KeyedStream[T, K] {

89

def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]

90

}

91

```

92

93

Use custom window assigners:

94

95

```scala

96

import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)

// Event-time session windows: a key's session closes once no element has arrived for
// the 10-minute gap
val sessionWindows = keyedStream.window(EventTimeSessionWindows.withGap(Time.minutes(10)))

// Global windows: one never-ending window per key.
// The assigner's default trigger never fires, so a custom trigger must be supplied
// via `.trigger(...)` to get any output.
val globalWindows = keyedStream.window(GlobalWindows.create())

// Processing-time sliding windows: 10 minutes long, a new one starting every 5 minutes
val processingTimeWindows =
  keyedStream.window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(5)))

// Event-time tumbling windows of one hour
val eventTimeWindows = keyedStream.window(TumblingEventTimeWindows.of(Time.hours(1)))

117

```

118

119

## Non-Keyed Windowing (All Windows)

120

121

### Time-Based All Windows

122

123

```scala { .api }

124

class DataStream[T] {

125

def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]

126

def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]

127

}

128

```

129

130

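Apply windows to the entire (non-keyed) stream. All-window operators are not parallelized: every element is routed to a single task (parallelism 1), so prefer keyed windows for high-volume streams: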

131

132

```scala

133

val env = StreamExecutionEnvironment.getExecutionEnvironment
val allData = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Non-keyed 5-minute tumbling windows covering every element of the stream
val allTumbling = allData.timeWindowAll(Time.minutes(5))

// Non-keyed sliding windows: 10 minutes long, advancing every 2 minutes
val allSliding = allData.timeWindowAll(Time.minutes(10), Time.minutes(2))

allTumbling.sum(0).print("All Tumbling")

147

```

148

149

### Count-Based All Windows

150

151

```scala { .api }

152

class DataStream[T] {

153

def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]

154

def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]

155

}

156

```

157

158

Count windows over all elements:

159

160

```scala

161

// Non-keyed count windows: every 4 consecutive elements form one window.
// Elements 9 and 10 never fill a window, so they are not emitted.
val allCountWindows = env
  .fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .countWindowAll(4)

allCountWindows.sum(0).print("All Count Windows")

167

```

168

169

### Custom All Window Assigners

170

171

```scala { .api }

172

class DataStream[T] {

173

def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]

174

}

175

```

176

177

## Window Configuration

178

179

### Triggers

180

181

```scala { .api }

182

class WindowedStream[T, K, W <: Window] {

183

def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W]

184

}

185

186

class AllWindowedStream[T, W <: Window] {

187

def trigger(trigger: Trigger[_ >: T, _ >: W]): AllWindowedStream[T, W]

188

}

189

```

190

191

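Control when windows fire. A trigger set with `.trigger(...)` replaces the window assigner's default trigger rather than adding to it: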

192

193

```scala

194

import org.apache.flink.streaming.api.windowing.triggers._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// NOTE: with this sample data each key holds a single element, so the count-based
// triggers below never reach their thresholds; use real data to see them fire.
val keyedStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)

// Count trigger.
// `.trigger(...)` REPLACES the window's default time-based trigger, so this window fires
// every 10 elements per key and does NOT fire when the 5 minutes end. Elements still
// buffered when the window expires are discarded without being emitted.
val customTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(CountTrigger.of(10)) // Fire on every 10th element of a key's window

// Processing-time trigger: fire once the machine's clock passes the end of the window
val processingTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(ProcessingTimeTrigger.create())

// Purging trigger.
// It wraps another trigger and clears the window contents after each firing, so each
// batch of 5 elements per key is emitted once and then discarded.
val purgingTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(PurgingTrigger.of(CountTrigger.of(5)))

213

```

214

215

### Evictors

216

217

```scala { .api }

218

class WindowedStream[T, K, W <: Window] {

219

def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W]

220

}

221

```

222

223

Remove elements from windows before or after function application:

224

225

```scala

226

import org.apache.flink.streaming.api.windowing.evictors._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).keyBy(identity)

// Every example below evicts from a fresh 5-minute tumbling window per key
def fiveMinuteWindows = keyedStream.timeWindow(Time.minutes(5))

// CountEvictor: retain only the 5 most recent elements of each window
val countEvictor = fiveMinuteWindows.evictor(CountEvictor.of(5))

// TimeEvictor: retain only elements within 2 minutes of the newest element's timestamp
val timeEvictor = fiveMinuteWindows.evictor(TimeEvictor.of(Time.minutes(2)))

// DeltaEvictor: drop elements whose delta to the last element reaches the threshold (5.0)
val deltaEvictor = fiveMinuteWindows.evictor(
  DeltaEvictor.of(5.0, (a: Int, b: Int) => Math.abs(a - b).toDouble)
)

245

```

246

247

### Allowed Lateness

248

249

```scala { .api }

250

class WindowedStream[T, K, W <: Window] {

251

def allowedLateness(lateness: Time): WindowedStream[T, K, W]

252

}

253

```

254

255

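Handle late-arriving data in event-time windows. An element that arrives after the watermark has passed the end of its window, but within the allowed lateness, is still added to the window and makes it fire again. Elements later than that are dropped unless routed to a side output: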

256

257

```scala

258

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lateDataStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)

// 5-minute windows whose state is kept for one extra minute past the window end, so
// elements up to one minute late still update the result
val windowWithLateness =
  lateDataStream.timeWindow(Time.minutes(5)).allowedLateness(Time.minutes(1)).sum(0)

266

```

267

268

### Side Output for Late Data

269

270

```scala { .api }

271

class WindowedStream[T, K, W <: Window] {

272

def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W]

273

}

274

```

275

276

Route late data to side output:

277

278

```scala

279

import org.apache.flink.streaming.api.scala.OutputTag

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Tag naming the side output that receives elements too late for their window
val lateDataTag = OutputTag[Int]("late-data")

val windowed = env.fromElements(1, 2, 3, 4, 5)
  .keyBy(identity)
  .timeWindow(Time.minutes(5))
  .allowedLateness(Time.minutes(1))
  .sideOutputLateData(lateDataTag)
val mainResult = windowed.sum(0)

// Elements arriving after the allowed lateness go to the side output instead of being dropped
mainResult.getSideOutput(lateDataTag).print("Late Data")

293

```

294

295

## Time Characteristics and Watermarks

296

297

### Time Characteristics

298

299

```scala

300

import org.apache.flink.streaming.api.TimeCharacteristic

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Choose exactly ONE time characteristic per environment. Each call overwrites the previous
// one, so calling all three in a row would leave the job on the last (IngestionTime).
// Note: this setter is deprecated as of Flink 1.12, where event time is the default.

// Event time processing (use timestamps in data)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Alternatives:
//
// Processing time (use system time when elements arrive)
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
//
// Ingestion time (use time when elements enter Flink)
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

312

```

313

314

### Watermark Assignment

315

316

```scala

317

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

case class TimestampedEvent(data: String, eventTime: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// event3 (1500) arrives after event2 (2000): this stream is out of order
val timestampedStream = env.fromElements(
  TimestampedEvent("event1", 1000),
  TimestampedEvent("event2", 2000),
  TimestampedEvent("event3", 1500)
)

// Assign watermarks with bounded out-of-orderness.
// The watermark trails the highest timestamp seen so far by 5 seconds, so event3 is
// still on time.
val watermarkedStream = timestampedStream
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[TimestampedEvent](Time.seconds(5)) {
      override def extractTimestamp(element: TimestampedEvent): Long = element.eventTime
    }
  )

// Ascending timestamps (no out-of-order events).
// Only valid when events really arrive in timestamp order, so in-order data is used here.
// Applying it to `timestampedStream` above would violate the assumption and could make
// event3 late.
val ascendingStream = env.fromElements(
    TimestampedEvent("event1", 1000),
    TimestampedEvent("event3", 1500),
    TimestampedEvent("event2", 2000)
  )
  .assignAscendingTimestamps(_.eventTime)

342

```

343

344

## Window Functions

345

346

### Built-in Aggregations

347

348

```scala { .api }

349

class WindowedStream[T, K, W <: Window] {

350

def sum(position: Int): DataStream[T]

351

def sum(field: String): DataStream[T]

352

def min(position: Int): DataStream[T]

353

def min(field: String): DataStream[T]

354

def max(position: Int): DataStream[T]

355

def max(field: String): DataStream[T]

356

def minBy(position: Int): DataStream[T]

357

def minBy(field: String): DataStream[T]

358

def maxBy(position: Int): DataStream[T]

359

def maxBy(field: String): DataStream[T]

360

}

361

```

362

363

### Custom Window Functions

364

365

```scala { .api }

366

class WindowedStream[T, K, W <: Window] {

367

def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]

368

def apply[R: TypeInformation](function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]

369

}

370

```

371

372

Apply custom functions to windows:

373

374

```scala

375

import org.apache.flink.streaming.api.scala.function.WindowFunction

376

import org.apache.flink.util.Collector

377

378

case class WindowResult(key: String, count: Int, avg: Double, window: String)

/**
 * Summarizes the sensor readings of one key and window.
 *
 * Emits a single [[WindowResult]] containing the number of readings, their mean temperature,
 * and the window bounds formatted as "start-end".
 */
class StatisticsWindowFunction extends WindowFunction[SensorReading, WindowResult, String, TimeWindow] {
  override def apply(
      key: String,
      window: TimeWindow,
      readings: Iterable[SensorReading],
      out: Collector[WindowResult]
  ): Unit = {
    val temps: List[Double] = readings.iterator.map(_.temperature).toList
    val n = temps.length
    val bounds = s"${window.getStart}-${window.getEnd}"
    out.collect(WindowResult(key, n, temps.sum / n, bounds))
  }
}

395

396

val env = StreamExecutionEnvironment.getExecutionEnvironment
val sensorData = env.fromElements(
  SensorReading("sensor1", 20.0, 1000),
  SensorReading("sensor1", 25.0, 2000)
).keyBy(_.sensorId)

// Apply custom window function
val windowResults = sensorData
  .timeWindow(Time.minutes(5))
  .apply(new StatisticsWindowFunction)

// Lambda-based window function.
// `apply` is overloaded, so the lambda's parameter types (including the Collector's
// element type) must be written out for the compiler to infer the result type.
val lambdaResults = sensorData
  .timeWindow(Time.minutes(5))
  .apply { (key: String, window: TimeWindow, readings: Iterable[SensorReading],
            out: Collector[(String, Double, Long)]) =>
    val maxTemp = readings.map(_.temperature).max
    out.collect((key, maxTemp, window.getStart))
  }

414

```

415

416

## Advanced Windowing Patterns

417

418

### Session Windows

419

420

```scala

421

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class UserActivity(userId: String, activity: String, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val userActivities = env.fromElements(
  UserActivity("user1", "login", 1000),
  UserActivity("user1", "click", 2000),
  UserActivity("user1", "scroll", 3000)
)

// Session windows with 10-minute inactivity gap
val sessionWindows = userActivities
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.userId)
  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  // `apply` is overloaded, so the lambda's parameter and Collector types are explicit
  .apply { (userId: String, window: TimeWindow, activities: Iterable[UserActivity],
            out: Collector[(String, Int, String, Long)]) =>
    // A merged session window spans [first timestamp, last timestamp + gap), so the
    // reported length includes the trailing 10-minute inactivity gap.
    val sessionSummary = (
      userId,
      activities.size,
      activities.map(_.activity).mkString(","),
      window.getEnd - window.getStart
    )
    out.collect(sessionSummary)
  }

448

```

449

450

### Custom Window Assigner

451

452

```scala

453

import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner

454

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

455

import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger

456

457

// Custom window assigner for business hours (9 AM to 5 PM, UTC)

/**
 * Assigns elements whose event timestamp falls within business hours to one window per day.
 *
 * Business hours run from 09:00 inclusive to 17:00 exclusive. The day and hour are derived
 * from the epoch-millisecond timestamp without any time-zone offset, i.e. in UTC.
 * Each such element goes to a single window covering that day's business hours.
 * Elements outside business hours are assigned to no window, so the window operator drops
 * them.
 */
class BusinessHoursWindowAssigner extends WindowAssigner[Object, TimeWindow] {

  private val MillisPerHour: Long = 3600000L
  private val MillisPerDay: Long = 24L * MillisPerHour
  private val BusinessStartHour: Long = 9L
  private val BusinessEndHour: Long = 17L

  override def assignWindows(
      element: Object,
      timestamp: Long,
      context: WindowAssigner.WindowAssignerContext
  ): java.util.Collection[TimeWindow] = {
    // Long.MinValue is Flink's "no timestamp" marker.
    // Fail loudly, like the built-in event-time assigners do, instead of treating it as a
    // real point in time.
    if (timestamp == Long.MinValue) {
      throw new RuntimeException(
        "Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
          "Did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?")
    }

    // floorMod keeps day/hour arithmetic correct for timestamps before the epoch, where `%`
    // would return a negative remainder.
    val startOfDay = timestamp - Math.floorMod(timestamp, MillisPerDay)
    val hourOfDay = (timestamp - startOfDay) / MillisPerHour

    if (hourOfDay >= BusinessStartHour && hourOfDay < BusinessEndHour) {
      java.util.Collections.singletonList(
        new TimeWindow(
          startOfDay + BusinessStartHour * MillisPerHour, // 9 AM
          startOfDay + BusinessEndHour * MillisPerHour // 5 PM
        )
      )
    } else {
      java.util.Collections.emptyList[TimeWindow]()
    }
  }

  /** Fires each window when the watermark passes its end (event-time semantics). */
  override def getDefaultTrigger(env: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment): org.apache.flink.streaming.api.windowing.triggers.Trigger[Object, TimeWindow] = {
    EventTimeTrigger.create()
  }

  override def getWindowSerializer(executionConfig: org.apache.flink.api.common.ExecutionConfig): org.apache.flink.api.common.typeutils.TypeSerializer[TimeWindow] = {
    new TimeWindow.Serializer()
  }

  override def isEventTime: Boolean = true
}

487

```

488

489

## Complete Example: Real-Time Analytics Dashboard

490

491

```scala

492

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.util.Collector

/** A single user interaction with a page; `timestamp` is the event time used for windowing. */
case class WebEvent(
  userId: String,
  pageUrl: String,
  action: String,
  timestamp: Long,
  sessionId: String
)

/** Aggregated statistics for one page within one time window. */
case class PageViewStats(
  pageUrl: String,
  windowStart: Long,
  windowEnd: Long,
  uniqueUsers: Int,
  totalViews: Int,
  avgSessionDuration: Double
)

object RealTimeAnalytics {

  /**
   * Computes [[PageViewStats]] for all events of one page within one window.
   *
   * The average session duration is simplified. It only considers this page's events inside
   * this window. A session's duration is the time between its first and last such event,
   * which is 0 for a single event.
   */
  class PageViewAnalytics extends WindowFunction[WebEvent, PageViewStats, String, TimeWindow] {
    override def apply(
        pageUrl: String,
        window: TimeWindow,
        events: Iterable[WebEvent],
        out: Collector[PageViewStats]
    ): Unit = {
      val eventList = events.toList
      val uniqueUsers = eventList.map(_.userId).distinct.size
      val totalViews = eventList.size

      // Duration of each session = last minus first event timestamp.
      // Going through `.values.toList` avoids the deprecated (and lazily re-evaluated)
      // `Map.mapValues` view.
      val sessionDurations: List[Long] = eventList
        .groupBy(_.sessionId)
        .values
        .toList
        .map { sessionEvents =>
          val timestamps = sessionEvents.map(_.timestamp)
          timestamps.max - timestamps.min
        }

      val avgSessionDuration =
        if (sessionDurations.nonEmpty) sessionDurations.sum.toDouble / sessionDurations.size
        else 0.0

      out.collect(PageViewStats(
        pageUrl,
        window.getStart,
        window.getEnd,
        uniqueUsers,
        totalViews,
        avgSessionDuration
      ))
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Sample web events
    val webEvents = env.fromElements(
      WebEvent("user1", "/home", "view", 1000, "session1"),
      WebEvent("user2", "/home", "view", 1500, "session2"),
      WebEvent("user1", "/products", "view", 2000, "session1"),
      WebEvent("user3", "/home", "view", 2500, "session3"),
      WebEvent("user2", "/checkout", "view", 3000, "session2")
    )

    // Assign watermarks that tolerate up to 10 seconds of out-of-order events
    val watermarkedEvents = webEvents
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[WebEvent](Time.seconds(10)) {
          override def extractTimestamp(event: WebEvent): Long = event.timestamp
        }
      )

    // Real-time page view analytics
    val pageViewAnalytics = watermarkedEvents
      .keyBy(_.pageUrl)
      .timeWindow(Time.minutes(5)) // 5-minute windows
      .apply(new PageViewAnalytics)

    // User activity patterns (sliding windows).
    // `apply` is overloaded, so the lambda's parameter types, including the Collector's
    // element type, are explicit.
    val userActivityPatterns = watermarkedEvents
      .keyBy(_.userId)
      .timeWindow(Time.minutes(10), Time.minutes(2)) // 10-minute windows, slide every 2 minutes
      .apply { (userId: String, window: TimeWindow, events: Iterable[WebEvent],
                out: Collector[(String, Long, Double, Int)]) =>
        // Derive the rate from the actual window length rather than a hard-coded 10, so it
        // stays correct if the window size above changes.
        val windowMinutes = (window.getEnd - window.getStart) / 60000.0
        val actionsPerMinute = events.size / windowMinutes
        val uniquePages = events.map(_.pageUrl).toSet.size
        out.collect((userId, window.getStart, actionsPerMinute, uniquePages))
      }

    // Global statistics over all events (non-keyed windows run with parallelism 1)
    val globalStats = watermarkedEvents
      .timeWindowAll(Time.minutes(1)) // 1-minute global windows
      .apply { (window: TimeWindow, events: Iterable[WebEvent],
                out: Collector[(Long, Int, Int, Int)]) =>
        val totalEvents = events.size
        val uniqueUsers = events.map(_.userId).toSet.size
        val uniquePages = events.map(_.pageUrl).toSet.size
        out.collect((window.getStart, totalEvents, uniqueUsers, uniquePages))
      }

    // Print results
    pageViewAnalytics.print("Page View Analytics")
    userActivityPatterns.print("User Activity")
    globalStats.print("Global Stats")

    env.execute("Real-Time Web Analytics")
  }
}

608

```