or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · sources-sinks.md · sql-integration.md · table-environment.md · table-operations.md · type-system.md · user-defined-functions.md · window-operations.md

docs/window-operations.md

0

# Window Operations

1

2

The Flink Table API provides comprehensive windowing support for time-based and count-based aggregations. Windows enable grouping of streaming data by time intervals or row counts for meaningful aggregations.

3

4

## Capabilities

5

6

### Time Windows

7

8

Time-based windows that group events by temporal boundaries.

9

10

```scala { .api }

11

/**

12

* Tumbling window with fixed size and no overlap

13

* @param size Window size expression (time interval)

14

*/

15

case class TumbleWithSize(size: Expression) {

16

/**

17

* Specifies the time attribute for the window

18

* @param timeField Time field expression (rowtime or proctime)

19

* @returns Window specification with time attribute

20

*/

21

def on(timeField: Expression): TumbleWithSizeOnTime

22

}

23

24

case class TumbleWithSizeOnTime(size: Expression, timeField: Expression) {

25

/**

26

* Assigns an alias to the window

27

* @param alias Window alias for referencing window properties

28

* @returns Complete tumbling window specification

29

*/

30

def as(alias: Expression): TumbleWithSizeOnTimeWithAlias

31

}

32

33

/**

34

* Sliding window with fixed size and slide interval

35

* @param size Window size expression (time interval)

36

*/

37

case class SlideWithSize(size: Expression) {

38

/**

39

* Specifies the slide interval

40

* @param slide Slide interval expression (time interval)

41

* @returns Window specification with slide

42

*/

43

def every(slide: Expression): SlideWithSizeAndSlide

44

}

45

46

case class SlideWithSizeAndSlide(size: Expression, slide: Expression) {

47

/**

48

* Specifies the time attribute for the window

49

* @param timeField Time field expression (rowtime or proctime)

50

* @returns Window specification with time attribute

51

*/

52

def on(timeField: Expression): SlideWithSizeAndSlideOnTime

53

}

54

55

case class SlideWithSizeAndSlideOnTime(size: Expression, slide: Expression, timeField: Expression) {

56

/**

57

* Assigns an alias to the window

58

* @param alias Window alias for referencing window properties

59

* @returns Complete sliding window specification

60

*/

61

def as(alias: Expression): SlideWithSizeAndSlideOnTimeWithAlias

62

}

63

64

/**

65

* Session window with dynamic gaps based on data activity

66

* @param gap Session gap expression (time interval)

67

*/

68

case class SessionWithGap(gap: Expression) {

69

/**

70

* Specifies the time attribute for the window

71

* @param timeField Time field expression (rowtime or proctime)

72

* @returns Window specification with time attribute

73

*/

74

def on(timeField: Expression): SessionWithGapOnTime

75

}

76

77

case class SessionWithGapOnTime(gap: Expression, timeField: Expression) {

78

/**

79

* Assigns an alias to the window

80

* @param alias Window alias for referencing window properties

81

* @returns Complete session window specification

82

*/

83

def as(alias: Expression): SessionWithGapOnTimeWithAlias

84

}

85

```

86

87

**Usage Examples:**

88

89

```scala

90

import org.apache.flink.table.api.Tumble

91

import org.apache.flink.table.api.Slide

92

import org.apache.flink.table.api.Session

93

94

// Tumbling window - 10 minute non-overlapping windows

95

val tumblingResult = table

96

.window(Tumble over 10.minutes on 'rowtime as 'w)

97

.groupBy('w, 'userId)

98

.select('userId, 'w.start, 'w.end, 'amount.sum)

99

100

// Sliding window - 10 minute windows sliding every 5 minutes

101

val slidingResult = table

102

.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)

103

.groupBy('w, 'userId)

104

.select('userId, 'w.start, 'w.end, 'amount.avg)

105

106

// Session window - sessions with 15 minute inactivity gap

107

val sessionResult = table

108

.window(Session withGap 15.minutes on 'rowtime as 'w)

109

.groupBy('w, 'userId)

110

.select('userId, 'w.start, 'w.end, 'eventCount.count)

111

112

// Processing time windows (using proctime)

113

val proctimeResult = table

114

.window(Tumble over 1.hour on 'proctime as 'w)

115

.groupBy('w, 'category)

116

.select('category, 'w.start, 'amount.max)

117

```

118

119

### Window Properties

120

121

Access window metadata and boundaries within windowed aggregations.

122

123

```scala { .api }

124

/**

125

* Window properties available in windowed table operations

126

*/

127

trait WindowProperty extends Expression {

128

/**

129

* Start timestamp of the window

130

*/

131

def start: Expression

132

133

/**

134

* End timestamp of the window

135

*/

136

def end: Expression

137

138

/**

139

* Rowtime attribute of the window (its inclusive upper bound), for event time windows; usable as a time attribute in subsequent time-based operations

140

*/

141

def rowtime: Expression

142

143

/**

144

* Processing time timestamp of the window (for processing time windows)

145

*/

146

def proctime: Expression

147

}

148

```

149

150

**Usage Examples:**

151

152

```scala

153

// Access window properties in aggregations

154

val windowedStats = table

155

.window(Tumble over 1.hour on 'rowtime as 'w)

156

.groupBy('w, 'department)

157

.select(

158

'department,

159

'w.start as 'windowStart,

160

'w.end as 'windowEnd,

161

'w.rowtime as 'windowTime,

162

'salary.avg as 'avgSalary,

163

'employee.count as 'employeeCount

164

)

165

166

// Use window properties in filtering

167

val recentWindows = windowedStats

168

.filter('windowEnd > (currentTimestamp() - 2.hours))

169

```

170

171

### Over Windows

172

173

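Over windows compute analytical functions and running calculations for each input row over a frame of neighboring rows, where the frame is bounded either by a row count or by a time range relative to the current row.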

174

175

```scala { .api }

176

/**

177

* Over window specification for analytical functions

178

* @param partitionBy Partitioning expressions

179

* @param orderBy Ordering expression

180

* @param preceding Frame start (rows or range before current)

181

* @param following Frame end (rows or range after current)

182

*/

183

case class OverWindow(

184

partitionBy: Seq[Expression],

185

orderBy: Expression,

186

preceding: Expression,

187

following: Expression

188

)

189

190

/**

191

* Over window builder starting with OVER keyword

192

*/

193

object Over {

194

/**

195

* Partitions the over window by specified fields

196

* @param fields Partitioning field expressions

197

* @returns Partial over window specification

198

*/

199

def partitionBy(fields: Expression*): OverWindowWithPartitioning

200

201

/**

202

* Orders the over window by specified field

203

* @param field Ordering field expression

204

* @returns Partial over window specification

205

*/

206

def orderBy(field: Expression): OverWindowWithOrdering

207

}

208

209

case class OverWindowWithPartitioning(partitionBy: Seq[Expression]) {

210

/**

211

* Orders the partitioned over window

212

* @param field Ordering field expression

213

* @returns Over window with partitioning and ordering

214

*/

215

def orderBy(field: Expression): OverWindowWithPartitioningAndOrdering

216

}

217

218

case class OverWindowWithOrdering(orderBy: Expression) {

219

/**

220

* Specifies the preceding frame boundary

221

* @param preceding Frame start boundary

222

* @returns Over window with ordering and preceding

223

*/

224

def preceding(preceding: Expression): OverWindowWithPreceding

225

}

226

227

case class OverWindowWithPartitioningAndOrdering(partitionBy: Seq[Expression], orderBy: Expression) {

228

/**

229

* Specifies the preceding frame boundary

230

* @param preceding Frame start boundary

231

* @returns Over window with partitioning, ordering, and preceding

232

*/

233

def preceding(preceding: Expression): OverWindowWithPreceding

234

}

235

236

case class OverWindowWithPreceding(/* fields */) {

237

/**

238

* Specifies the following frame boundary

239

* @param following Frame end boundary

240

* @returns Over window specification with the following boundary set; call `as` to complete it

241

*/

242

def following(following: Expression): OverWindowWithPreceding

243

244

/**

245

* Assigns an alias to the over window

246

* @param alias Window alias

247

* @returns Complete over window specification with alias

248

*/

249

def as(alias: Expression): OverWindow

250

}

251

252

// Frame boundary constants

253

object FrameBoundary {

254

val UNBOUNDED_ROW: Expression = ???

255

val UNBOUNDED_RANGE: Expression = ???

256

val CURRENT_ROW: Expression = ???

257

val CURRENT_RANGE: Expression = ???

258

}

259

```

260

261

**Usage Examples:**

262

263

```scala

264

import org.apache.flink.table.api.Over

265

import org.apache.flink.table.api.FrameBoundary._

266

267

// Running sum over all previous rows in partition

268

val runningSum = table

269

.window(Over partitionBy 'department orderBy 'salary.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)

270

.select('employee, 'department, 'salary, 'salary.sum over 'w as 'runningSalary)

271

272

// Moving average over last 3 rows

273

val movingAvg = table

274

.window(Over partitionBy 'department orderBy 'date.asc preceding 2.rows following CURRENT_ROW as 'w)

275

.select('employee, 'date, 'sales, 'sales.avg over 'w as 'movingAvgSales)

276

277

// Ranking within partition

278

val ranking = table

279

.window(Over partitionBy 'department orderBy 'salary.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)

280

.select('employee, 'department, 'salary, row_number() over 'w as 'rank)

281

282

// Multiple over windows

283

val analytics = table

284

.window(

285

Over partitionBy 'department orderBy 'salary.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'salaryWindow,

286

Over partitionBy 'department orderBy 'hireDate.asc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'timeWindow

287

)

288

.select(

289

'employee,

290

'salary,

291

'salary.sum over 'salaryWindow as 'totalDeptSalary,

292

row_number() over 'salaryWindow as 'salaryRank,

293

row_number() over 'timeWindow as 'seniorityRank

294

)

295

```

296

297

### Windowed Table Operations

298

299

Operations available on windowed tables for grouping and aggregation.

300

301

```scala { .api }

302

/**

303

* Table with applied window specification

304

*/

305

class WindowedTable {

306

/**

307

* Groups the windowed table by specified fields

308

* @param fields Grouping field expressions (usually includes window alias)

309

* @returns Grouped windowed table for aggregation

310

*/

311

def groupBy(fields: Expression*): WindowGroupedTable

312

}

313

314

/**

315

* Windowed table after grouping, ready for aggregation

316

*/

317

class WindowGroupedTable {

318

/**

319

* Selects aggregated results from windowed groups

320

* @param fields Field expressions including aggregations and window properties

321

* @returns Aggregated table

322

*/

323

def select(fields: Expression*): Table

324

}

325

326

/**

327

* Table with applied over windows

328

*/

329

class OverWindowedTable {

330

/**

331

* Selects fields with over window functions applied

332

* @param fields Field expressions including over window functions

333

* @returns Table with over window calculations

334

*/

335

def select(fields: Expression*): Table

336

}

337

```

338

339

**Usage Examples:**

340

341

```scala

342

// Complex windowed aggregation

343

val complexWindowed = table

344

.window(Tumble over 15.minutes on 'eventTime as 'w)

345

.groupBy('w, 'userId, 'category)

346

.select(

347

'userId,

348

'category,

349

'w.start as 'windowStart,

350

'w.end as 'windowEnd,

351

'amount.sum as 'totalAmount,

352

'amount.avg as 'avgAmount,

353

'amount.min as 'minAmount,

354

'amount.max as 'maxAmount,

355

'transactionId.count as 'transactionCount,

356

'transactionId.countDistinct as 'uniqueTransactions

357

)

358

359

// Over window with multiple analytical functions

360

val analyticalResult = table

361

.window(Over partitionBy 'category orderBy 'amount.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)

362

.select(

363

'transactionId,

364

'category,

365

'amount,

366

'amount.sum over 'w as 'categoryTotal,

367

'amount.avg over 'w as 'categoryAvg,

368

row_number() over 'w as 'amountRank,

369

rank() over 'w as 'amountRankWithTies,

370

dense_rank() over 'w as 'amountDenseRank,

371

percent_rank() over 'w as 'amountPercentRank

372

)

373

```

374

375

### Time Attributes

376

377

Special timestamp fields for defining event time and processing time.

378

379

```scala { .api }

380

/**

381

* Methods for defining time attributes in table schemas

382

*/

383

object TimeAttributes {

384

/**

385

* Defines a rowtime attribute for event time processing

386

* @param field Timestamp field expression

387

* @returns Rowtime attribute expression

388

*/

389

def rowtime(field: Expression): Expression

390

391

/**

392

* Defines a processing time attribute

393

* @returns Processing time attribute expression

394

*/

395

def proctime(): Expression

396

}

397

```

398

399

**Usage Examples:**

400

401

```scala

402

// Define table with time attributes

403

val tableWithTime = table

404

.select('userId, 'amount, 'eventTimestamp.rowtime as 'rowtime, proctime() as 'proctime)

405

406

// Register table source with time attributes

407

val sourceWithTime = new StreamTableSource[Row] {

408

override def getTableSchema: TableSchema = {

409

new TableSchema(

410

Array("userId", "amount", "eventTime", "proctime"),

411

Array(Types.LONG, Types.DOUBLE, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP)

412

)

413

}

414

415

override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {

416

execEnv.addSource(/* source */)

417

.assignTimestampsAndWatermarks(/* watermark strategy */)

418

}

419

}

420

421

// Use time attributes in window operations

422

val eventTimeWindows = tableWithTime

423

.window(Tumble over 1.hour on 'rowtime as 'w)

424

.groupBy('w, 'userId)

425

.select('userId, 'w.start, 'amount.sum)

426

427

val procTimeWindows = tableWithTime

428

.window(Tumble over 1.hour on 'proctime as 'w)

429

.groupBy('w, 'userId)

430

.select('userId, 'w.start, 'amount.count)

431

```

432

433

### Window SQL Support

434

435

SQL syntax for window operations and time functions.

436

437

```scala { .api }

438

// Available window functions in SQL

439

val sqlWindowed = tEnv.sqlQuery("""

440

SELECT

441

userId,

442

TUMBLE_START(rowtime, INTERVAL '1' HOUR) as window_start,

443

TUMBLE_END(rowtime, INTERVAL '1' HOUR) as window_end,

444

SUM(amount) as total_amount,

445

COUNT(*) as transaction_count

446

FROM Transactions

447

GROUP BY

448

userId,

449

TUMBLE(rowtime, INTERVAL '1' HOUR)

450

""")

451

452

val sqlOverWindow = tEnv.sqlQuery("""

453

SELECT

454

userId,

455

amount,

456

SUM(amount) OVER (

457

PARTITION BY userId

458

ORDER BY eventTime

459

ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

460

) as running_total,

461

ROW_NUMBER() OVER (

462

PARTITION BY userId

463

ORDER BY amount DESC

464

) as amount_rank

465

FROM Transactions

466

""")

467

```

468

469

## Watermarks and Late Data Handling

470

471

Configuration for handling out-of-order events and late arrivals.

472

473

```scala { .api }

474

/**

475

* Watermark strategies for event time processing

476

*/

477

trait WatermarkStrategy {

478

def extractTimestamp(element: Row, recordTimestamp: Long): Long

479

def onPeriodicEmit(watermarkOutput: WatermarkOutput): Unit

480

}

481

482

/**

483

* Configuration for late data handling

484

*/

485

case class LatentConfig(

486

allowedLateness: Time,

487

sideOutputTag: OutputTag[Row]

488

)

489

```

490

491

**Usage Examples:**

492

493

```scala

494

// Configure watermarks and late data handling

495

val tableWithWatermarks = table

496

.select('userId, 'amount, 'eventTime.rowtime as 'rowtime)

497

.where('rowtime > (currentTimestamp() - 1.day)) // Filter very old events

498

499

// Window with allowed lateness

500

val lateDataHandling = tableWithWatermarks

501

.window(Tumble over 1.hour on 'rowtime as 'w)

502

.allowedLateness(5.minutes) // Allow 5 minutes of lateness

503

.groupBy('w, 'userId)

504

.select('userId, 'w.start, 'amount.sum)

505

```

506

507

## Types

508

509

```scala { .api }

510

sealed trait Window

511

case class TumbleWithSize(size: Expression) extends Window

512

case class TumbleWithSizeOnTime(size: Expression, timeField: Expression) extends Window

513

case class TumbleWithSizeOnTimeWithAlias(size: Expression, timeField: Expression, alias: Expression) extends Window

514

515

case class SlideWithSize(size: Expression) extends Window

516

case class SlideWithSizeAndSlide(size: Expression, slide: Expression) extends Window

517

case class SlideWithSizeAndSlideOnTime(size: Expression, slide: Expression, timeField: Expression) extends Window

518

case class SlideWithSizeAndSlideOnTimeWithAlias(size: Expression, slide: Expression, timeField: Expression, alias: Expression) extends Window

519

520

case class SessionWithGap(gap: Expression) extends Window

521

case class SessionWithGapOnTime(gap: Expression, timeField: Expression) extends Window

522

case class SessionWithGapOnTimeWithAlias(gap: Expression, timeField: Expression, alias: Expression) extends Window

523

524

case class OverWindow(partitionBy: Seq[Expression], orderBy: Expression, preceding: Expression, following: Expression)

525

case class OverWindowWithAlias(window: OverWindow, alias: Expression)

526

527

class WindowedTable

528

class WindowGroupedTable

529

class OverWindowedTable

530

531

trait WindowProperty extends Expression

532

trait WatermarkStrategy

533

case class LatentConfig(allowedLateness: Time, sideOutputTag: OutputTag[Row])

534

```