or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregation-grouping.md, catalog-management.md, expressions.md, index.md, sql-integration.md, table-environment.md, table-operations.md, user-defined-functions.md, window-operations.md

window-operations.md (docs/)

0

# Window Operations

1

2

Window operations enable time-based and count-based processing of streaming data. Flink supports several window types (tumbling, sliding, session, and over windows), each suited to different analytical scenarios.

3

4

## Capabilities

5

6

### Tumbling Windows

7

8

Fixed-size, non-overlapping windows that partition data into distinct time segments.

9

10

```java { .api }

11

public final class Tumble {

12

/**

13

* Creates a tumbling window with the specified size

14

* @param size Window size expression (duration)

15

* @return TumbleWithSize for further configuration

16

*/

17

public static TumbleWithSize over(Expression size);

18

}

19

20

public final class TumbleWithSize {

21

/**

22

* Specifies the time attribute for the tumbling window

23

* @param timeField Time attribute column

24

* @return TumbleWithSizeOnTime for alias configuration

25

*/

26

public TumbleWithSizeOnTime on(Expression timeField);

27

}

28

29

public final class TumbleWithSizeOnTime {

30

/**

31

* Assigns an alias to the window for reference in selection

32

* @param alias Window alias name

33

* @return GroupWindow that can be used with groupBy

34

*/

35

public GroupWindow as(String alias);

36

}

37

```

38

39

**Usage Examples:**

40

41

```java

42

import static org.apache.flink.table.api.Expressions.*;

43

44

// 1-hour tumbling window

45

GroupWindow hourlyWindow = Tumble

46

.over(lit(1).hour())

47

.on($("event_time"))

48

.as("hourly_window");

49

50

Table hourlyStats = sourceTable

51

.window(hourlyWindow)

52

.groupBy($("hourly_window"), $("category"))

53

.select(

54

$("category"),

55

$("hourly_window").start().as("window_start"),

56

$("hourly_window").end().as("window_end"),

57

count($("*")).as("event_count"),

58

sum($("amount")).as("total_amount")

59

);

60

61

// 15-minute tumbling window

62

GroupWindow quarterHourWindow = Tumble

63

.over(lit(15).minute())

64

.on($("process_time"))

65

.as("quarter_hour");

66

67

Table frequentStats = sourceTable

68

.window(quarterHourWindow)

69

.groupBy($("quarter_hour"))

70

.select(

71

$("quarter_hour").start().as("period_start"),

72

count($("*")).as("transaction_count"),

73

avg($("value")).as("avg_transaction_value")

74

);

75

```

76

77

### Sliding Windows

78

79

Fixed-size, overlapping windows that slide by a specified interval, useful for moving averages and trend analysis.

80

81

```java { .api }

82

public final class Slide {

83

/**

84

* Creates a sliding window with the specified size

85

* @param size Window size expression (duration)

86

* @return SlideWithSize for slide interval configuration

87

*/

88

public static SlideWithSize over(Expression size);

89

}

90

91

public final class SlideWithSize {

92

/**

93

* Specifies the slide interval for the window

94

* @param slide Slide interval expression (duration)

95

* @return SlideWithSizeAndSlide for time field configuration

96

*/

97

public SlideWithSizeAndSlide every(Expression slide);

98

}

99

100

public final class SlideWithSizeAndSlide {

101

/**

102

* Specifies the time attribute for the sliding window

103

* @param timeField Time attribute column

104

* @return SlideWithSizeAndSlideOnTime for alias configuration

105

*/

106

public SlideWithSizeAndSlideOnTime on(Expression timeField);

107

}

108

109

public final class SlideWithSizeAndSlideOnTime {

110

/**

111

* Assigns an alias to the sliding window

112

* @param alias Window alias name

113

* @return GroupWindow for use with groupBy

114

*/

115

public GroupWindow as(String alias);

116

}

117

```

118

119

**Usage Examples:**

120

121

```java

122

// 1-hour window sliding every 15 minutes

123

GroupWindow slidingWindow = Slide

124

.over(lit(1).hour())

125

.every(lit(15).minute())

126

.on($("event_time"))

127

.as("sliding_window");

128

129

Table movingAverages = sourceTable

130

.window(slidingWindow)

131

.groupBy($("sliding_window"), $("sensor_id"))

132

.select(

133

$("sensor_id"),

134

$("sliding_window").start().as("window_start"),

135

$("sliding_window").end().as("window_end"),

136

avg($("temperature")).as("avg_temperature"),

137

count($("*")).as("reading_count")

138

);

139

140

// 30-minute window sliding every 5 minutes for real-time monitoring

141

GroupWindow realtimeWindow = Slide

142

.over(lit(30).minute())

143

.every(lit(5).minute())

144

.on($("processing_time"))

145

.as("realtime_window");

146

147

Table realtimeMetrics = sourceTable

148

.window(realtimeWindow)

149

.groupBy($("realtime_window"))

150

.select(

151

$("realtime_window").start().as("period_start"),

152

count($("*")).as("event_rate"),

153

sum($("bytes")).as("total_bytes"),

154

max($("latency")).as("max_latency")

155

);

156

```

157

158

### Session Windows

159

160

Dynamic windows that group events based on activity sessions with configurable gap timeouts.

161

162

```java { .api }

163

public final class Session {

164

/**

165

* Creates a session window with the specified gap

166

* @param gap Session gap expression (duration of inactivity)

167

* @return SessionWithGap for time field configuration

168

*/

169

public static SessionWithGap withGap(Expression gap);

170

}

171

172

public final class SessionWithGap {

173

/**

174

* Specifies the time attribute for the session window

175

* @param timeField Time attribute column

176

* @return SessionWithGapOnTime for alias configuration

177

*/

178

public SessionWithGapOnTime on(Expression timeField);

179

}

180

181

public final class SessionWithGapOnTime {

182

/**

183

* Assigns an alias to the session window

184

* @param alias Window alias name

185

* @return GroupWindow for use with groupBy

186

*/

187

public GroupWindow as(String alias);

188

}

189

```

190

191

**Usage Examples:**

192

193

```java

194

// Session window with 30-minute inactivity gap

195

GroupWindow userSession = Session

196

.withGap(lit(30).minute())

197

.on($("event_time"))

198

.as("user_session");

199

200

Table sessionAnalysis = sourceTable

201

.window(userSession)

202

.groupBy($("user_session"), $("user_id"))

203

.select(

204

$("user_id"),

205

$("user_session").start().as("session_start"),

206

$("user_session").end().as("session_end"),

207

count($("*")).as("actions_in_session"),

208

sum($("duration")).as("total_session_time"),

209

max($("page_views")).as("max_page_views")

210

);

211

212

// Short session window for detecting bursts of activity

213

GroupWindow activityBurst = Session

214

.withGap(lit(2).minute())

215

.on($("event_time"))

216

.as("activity_burst");

217

218

Table burstDetection = sourceTable

219

.window(activityBurst)

220

.groupBy($("activity_burst"), $("device_id"))

221

.select(

222

$("device_id"),

223

$("activity_burst").start().as("burst_start"),

224

$("activity_burst").end().as("burst_end"),

225

count($("*")).as("burst_event_count")

226

)

227

.filter($("burst_event_count").isGreater(10)); // Only high-activity bursts

228

```

229

230

### Over Windows

231

232

Unbounded or bounded windows for analytical functions like ranking, cumulative sums, and moving averages without explicit grouping.

233

234

```java { .api }

235

public final class Over {

236

/**

237

* Creates an Over window partitioned by specified fields

238

* @param partitionBy Fields to partition the window by

239

* @return OverWindowPartitioned for ordering configuration

240

*/

241

public static OverWindowPartitioned partitionBy(Expression... partitionBy);

242

243

/**

244

* Creates an Over window with ordering but no partitioning

245

* @param orderBy Fields to order the window by

246

* @return OverWindowPartitionedOrdered for range/rows configuration

247

*/

248

public static OverWindowPartitionedOrdered orderBy(Expression... orderBy);

249

}

250

251

public interface OverWindowPartitioned {

252

/**

253

* Specifies ordering for the partitioned over window

254

* @param orderBy Ordering expressions

255

* @return OverWindowPartitionedOrdered for range/rows configuration

256

*/

257

OverWindowPartitionedOrdered orderBy(Expression... orderBy);

258

}

259

260

public interface OverWindowPartitionedOrdered {

261

/**

262

* Creates a preceding rows window

263

* @param preceding Number of preceding rows

264

* @return OverWindowPartitionedOrderedPreceding for alias configuration

265

*/

266

OverWindowPartitionedOrderedPreceding preceding(Expression preceding);

267

268

/**

269

* Creates an unbounded preceding window

270

* @return OverWindow for alias configuration

271

*/

272

OverWindow unboundedPreceding();

273

274

/**

275

* Creates a current row window

276

* @return OverWindow for alias configuration

277

*/

278

OverWindow currentRow();

279

}

280

281

public interface OverWindow {

282

/**

283

* Assigns an alias to the over window

284

* @param alias Window alias name

285

* @return OverWindow with alias

286

*/

287

OverWindow as(String alias);

288

}

289

```

290

291

**Usage Examples:**

292

293

```java

294

// Running totals and cumulative calculations

295

OverWindow cumulativeWindow = Over

296

.partitionBy($("customer_id"))

297

.orderBy($("order_date"))

298

.unboundedPreceding()

299

.as("cumulative");

300

301

Table runningTotals = sourceTable

302

.window(cumulativeWindow)

303

.select(

304

$("customer_id"),

305

$("order_date"),

306

$("amount"),

307

sum($("amount")).over($("cumulative")).as("running_total"),

308

count($("*")).over($("cumulative")).as("order_sequence"),

309

rowNumber().over($("cumulative")).as("order_rank")

310

);

311

312

// Moving averages with bounded windows

313

OverWindow movingAvgWindow = Over

314

.partitionBy($("product_id"))

315

.orderBy($("sale_date"))

316

.preceding(lit(6)) // 7-row moving window (6 preceding rows + current row); covers 7 days only when there is one row per day

317

.as("weekly_window");

318

319

Table movingAverages = sourceTable

320

.window(movingAvgWindow)

321

.select(

322

$("product_id"),

323

$("sale_date"),

324

$("daily_sales"),

325

avg($("daily_sales")).over($("weekly_window")).as("weekly_avg_sales"),

326

sum($("daily_sales")).over($("weekly_window")).as("weekly_total_sales")

327

);

328

329

// Ranking and analytical functions

330

OverWindow rankingWindow = Over

331

.partitionBy($("department"))

332

.orderBy($("salary").desc())

333

.currentRow()

334

.as("dept_ranking");

335

336

Table employeeRanking = sourceTable

337

.window(rankingWindow)

338

.select(

339

$("employee_id"),

340

$("name"),

341

$("department"),

342

$("salary"),

343

rowNumber().over($("dept_ranking")).as("salary_rank"),

344

denseRank().over($("dept_ranking")).as("salary_dense_rank"),

345

lag($("salary"), 1).over($("dept_ranking")).as("next_highest_salary")

346

);

347

```

348

349

### Window Functions and Expressions

350

351

Special functions available for window operations and time manipulation.

352

353

```java { .api }

354

// Time interval expressions

355

public static Expression lit(long value).year();

356

public static Expression lit(long value).month();

357

public static Expression lit(long value).day();

358

public static Expression lit(long value).hour();

359

public static Expression lit(long value).minute();

360

public static Expression lit(long value).second();

361

public static Expression lit(long value).milli();

362

363

// Window functions

364

public static Expression rowNumber();

365

public static Expression rank();

366

public static Expression denseRank();

367

public static Expression lag(Expression field, int offset);

368

public static Expression lead(Expression field, int offset);

369

public static Expression firstValue(Expression field);

370

public static Expression lastValue(Expression field);

371

372

// Window start/end functions (for group windows)

373

// Available as methods on window alias expressions

374

// $("window_alias").start()

375

// $("window_alias").end()

376

```

377

378

**Usage Examples:**

379

380

```java

381

// Time interval examples

382

GroupWindow dailyWindow = Tumble

383

.over(lit(1).day())

384

.on($("event_time"))

385

.as("daily");

386

387

GroupWindow weeklySliding = Slide

388

.over(lit(7).day())

389

.every(lit(1).day())

390

.on($("event_time"))

391

.as("weekly_sliding");

392

393

// Analytical window functions

394

OverWindow analyticalWindow = Over

395

.partitionBy($("category"))

396

.orderBy($("created_date"))

397

.unboundedPreceding()

398

.as("analytical");

399

400

Table analyticalResults = sourceTable

401

.window(analyticalWindow)

402

.select(

403

$("id"),

404

$("category"),

405

$("value"),

406

$("created_date"),

407

rowNumber().over($("analytical")).as("row_num"),

408

rank().over($("analytical")).as("rank"),

409

lag($("value"), 1).over($("analytical")).as("prev_value"),

410

lead($("value"), 1).over($("analytical")).as("next_value"),

411

firstValue($("value")).over($("analytical")).as("first_in_category"),

412

lastValue($("value")).over($("analytical")).as("current_last")

413

);

414

```

415

416

### WindowGroupedTable Operations

417

418

Tables with window grouping applied support aggregation operations.

419

420

```java { .api }

421

public interface WindowGroupedTable {

422

/**

423

* Performs selection with aggregation on windowed data

424

* @param fields Selection expressions including aggregates and window functions

425

* @return Table with windowed aggregation results

426

*/

427

Table select(Expression... fields);

428

}

429

```

430

431

### Complex Window Scenarios

432

433

Advanced windowing patterns for complex analytical use cases.

434

435

**Usage Examples:**

436

437

```java

438

// Multi-level windowing - hourly stats with daily rollups

439

Table hourlyStats = sourceTable

440

.window(Tumble.over(lit(1).hour()).on($("event_time")).as("hourly"))

441

.groupBy($("hourly"), $("region"))

442

.select(

443

$("region"),

444

$("hourly").start().as("hour_start"),

445

count($("*")).as("hourly_count"),

446

sum($("revenue")).as("hourly_revenue")

447

);

448

449

Table dailyRollup = hourlyStats

450

.window(Tumble.over(lit(1).day()).on($("hour_start")).as("daily"))

451

.groupBy($("daily"), $("region"))

452

.select(

453

$("region"),

454

$("daily").start().as("day_start"),

455

sum($("hourly_count")).as("daily_count"),

456

sum($("hourly_revenue")).as("daily_revenue"),

457

avg($("hourly_revenue")).as("avg_hourly_revenue")

458

);

459

460

// Session analysis with user behavior patterns

461

GroupWindow userActivitySession = Session

462

.withGap(lit(20).minute())

463

.on($("event_time"))

464

.as("session");

465

466

Table sessionBehavior = sourceTable

467

.window(userActivitySession)

468

.groupBy($("session"), $("user_id"))

469

.select(

470

$("user_id"),

471

$("session").start().as("session_start"),

472

$("session").end().as("session_end"),

473

count($("*")).as("total_actions"),

474

countDistinct($("page_id")).as("unique_pages"),

475

sum($("time_spent")).as("total_time"),

476

first($("entry_page")).as("landing_page"),

477

last($("page_id")).as("exit_page"),

478

// Calculate session duration manually

479

$("session").end().minus($("session").start()).as("session_duration")

480

);

481

```

482

483

## Window Time Attributes

484

485

```java { .api }

486

// Processing time attribute (system time when record is processed)

487

// Usually defined in DDL or table descriptor

488

// PROCTIME() AS processing_time

489

490

// Event time attribute (timestamp from the data)

491

// Defined with watermark strategy in DDL

492

// event_time TIMESTAMP(3),

493

// WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

494

495

// Window functions for accessing time attributes

496

Expression start(); // Window start time

497

Expression end(); // Window end time

498

Expression proctime(); // Processing time

499

Expression rowtime(); // Event time (rowtime attribute)

500

```