or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-management.mdcomplex-event-processing.mdcore-table-operations.mddatastream-integration.mdindex.mdsql-processing.mdtype-system.mduser-defined-functions.mdwindow-operations.md

window-operations.mddocs/

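# Window Operations and Time Processing

This document covers time-based windowing (tumbling, sliding/hopping and session windows), OVER aggregations bounded by time or by row count, time-based joins, and watermarks for streaming data analysis with the Apache Flink Table API and SQL (the `flink-table-uber-blink` distribution).

## Time Attributes

### Processing Time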

```java { .api }

9

// In Table API

10

Table table = tableEnv.fromDataStream(dataStream,

11

$("user_id"),

12

$("data"),

13

$("proc_time").proctime()

14

);

15

16

// In SQL DDL

17

tEnv.executeSql(

18

"CREATE TABLE events (" +

19

" user_id BIGINT," +

20

" data STRING," +

21

" proc_time AS PROCTIME()" +

22

") WITH (...)"

23

);

24

```

### Event Time

```java { .api }

29

// With watermark in DDL

30

tEnv.executeSql(

31

"CREATE TABLE events (" +

32

" user_id BIGINT," +

33

" event_time TIMESTAMP(3)," +

34

" data STRING," +

35

" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +

36

") WITH (...)"

37

);

38

39

// In Table API

40

Table table = tableEnv.fromDataStream(watermarkedStream,

41

$("user_id"),

42

$("event_time").rowtime(),

43

$("data")

44

);

45

```

## Group Windows (Table API)

### Tumbling Windows

```java { .api }

52

class Tumble {

53

static TumbleWithSize over(Expression size);

54

}

55

56

interface TumbleWithSize {

57

TumbleWithSizeOnTime on(Expression timeField);

58

}

59

60

interface TumbleWithSizeOnTime {

61

GroupWindow as(String alias);

62

}

63

```

**Usage:**

```java

68

Table result = table

69

.window(Tumble.over(lit(5).minutes()).on($("event_time")).as("w"))

70

.groupBy($("user_id"), $("w"))

71

.select($("user_id"), $("w").start(), $("w").end(), $("data").count());

72

```

### Sliding Windows

```java { .api }

77

class Slide {

78

static SlideWithSize over(Expression size);

79

}

80

81

interface SlideWithSize {

82

SlideWithSizeAndSlide every(Expression slide);

83

}

84

85

interface SlideWithSizeAndSlide {

86

SlideWithSizeAndSlideOnTime on(Expression timeField);

87

}

88

```

**Usage:**

```java

93

Table result = table

94

.window(Slide.over(lit(10).minutes()).every(lit(5).minutes()).on($("event_time")).as("w"))

95

.groupBy($("user_id"), $("w"))

96

.select($("user_id"), $("w").start(), $("w").end(), $("data").count());

97

```

### Session Windows

```java { .api }

102

class Session {

103

static SessionWithGap withGap(Expression gap);

104

}

105

106

interface SessionWithGap {

107

SessionWithGapOnTime on(Expression timeField);

108

}

109

```

**Usage:**

```java

114

Table result = table

115

.window(Session.withGap(lit(30).minutes()).on($("event_time")).as("w"))

116

.groupBy($("user_id"), $("w"))

117

.select($("user_id"), $("w").start(), $("w").end(), $("data").count());

118

```

## Window SQL Functions

### Tumbling Window Functions

```sql

-- TUMBLE group window function. Auxiliary functions (TUMBLE_START, TUMBLE_END, ...)
-- must be called with exactly the same arguments as TUMBLE in GROUP BY.
SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
    COUNT(*) AS event_count
FROM events
GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE);

-- TUMBLE_ROWTIME returns the window's inclusive upper bound as a rowtime
-- attribute for downstream event-time operations; it needs an event-time attribute.
SELECT
    user_id,
    TUMBLE_ROWTIME(event_time, INTERVAL '5' MINUTE) AS window_rowtime,
    COUNT(*) AS event_count
FROM events
GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE);

-- TUMBLE_PROCTIME returns a proctime attribute for downstream operations; it
-- needs a processing-time attribute, e.g. a proc_time column declared AS PROCTIME().
SELECT
    user_id,
    TUMBLE_PROCTIME(proc_time, INTERVAL '5' MINUTE) AS window_proctime,
    COUNT(*) AS event_count
FROM events
GROUP BY user_id, TUMBLE(proc_time, INTERVAL '5' MINUTE);

```

### Sliding Window Functions

```sql

-- HOP group window function (sliding window).
-- Argument order is (time_attr, slide, size): 5-minute windows starting every 2 minutes.
SELECT
    user_id,
    HOP_START(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE) AS window_start,
    HOP_END(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE) AS window_end,
    COUNT(*) AS event_count
FROM events
GROUP BY user_id, HOP(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE);

```

### Session Window Functions

```sql

-- SESSION group window function: a user's session closes after 30 minutes
-- without events. SESSION_START/SESSION_END take the same arguments as SESSION.
SELECT
    user_id,
    SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
    SESSION_END(event_time, INTERVAL '30' MINUTE) AS session_end,
    COUNT(*) AS event_count
FROM events
GROUP BY user_id, SESSION(event_time, INTERVAL '30' MINUTE);

```

## Over Windows (Window Aggregations)

### Unbounded Over Windows

```sql

-- Unbounded OVER aggregation: per-user running count and running sum.
-- Flink requires every OVER aggregate in one SELECT to use the same window
-- (same partitioning, ordering and frame), so both aggregates share window w.
-- ORDER BY must be an ascending time attribute.
-- Assumes events also has a numeric amount column.
SELECT
    user_id,
    event_time,
    data,
    COUNT(*) OVER w AS running_count,
    SUM(amount) OVER w AS running_sum
FROM events
WINDOW w AS (
    PARTITION BY user_id
    ORDER BY event_time
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
);

```

### Bounded Over Windows

```sql

-- Bounded OVER aggregations. A ROWS frame and a time RANGE frame are different
-- windows, and Flink rejects different OVER windows in one SELECT, so they are
-- shown as two queries. Assumes events also has a numeric amount column.

-- Row-count bound: the current row plus the 9 rows before it (the last 10 rows)
SELECT
    user_id,
    event_time,
    data,
    COUNT(*) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        ROWS BETWEEN 9 PRECEDING AND CURRENT ROW
    ) AS count_last_10
FROM events;

-- Time bound: all rows whose event_time is within 1 hour before the current row
SELECT
    user_id,
    event_time,
    data,
    AVG(amount) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS avg_last_hour
FROM events;

```

### Over Window in Table API

```java { .api }

217

interface Table {

218

OverWindowedTable window(OverWindow overWindow);

219

}

220

221

class Over {

222

static OverWindowPartitionedOrderedPreceding partitionBy(Expression... fields);

223

static OverWindowPartitionedOrdered orderBy(Expression field);

224

}

225

```

**Usage:**

```java

230

Table result = table

231

.window(Over.partitionBy($("user_id")).orderBy($("event_time")).preceding(UNBOUNDED_ROW).as("w"))

232

.select($("user_id"), $("event_time"), $("data"), $("data").count().over($("w")));

233

```

## Window TVF (Table-Valued Functions)

### Tumble TVF

```sql

-- TUMBLE window TVF (Flink 1.13+): appends window_start, window_end and
-- window_time columns; a window aggregation must group by window_start and window_end.
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS event_count
FROM TABLE(TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE))
GROUP BY window_start, window_end, user_id;

```

### Hop TVF

```sql

-- HOP window TVF: arguments are (TABLE, DESCRIPTOR(time_col), slide, size),
-- here 5-minute windows starting every 2 minutes.
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS event_count
FROM TABLE(HOP(TABLE events, DESCRIPTOR(event_time), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE))
GROUP BY window_start, window_end, user_id;

```

### Session TVF

```sql

-- SESSION window TVF (only in newer Flink releases; Flink 1.13 provides only
-- TUMBLE, HOP and CUMULATE TVFs). The session key is declared with
-- PARTITION BY on the input table, not with a second DESCRIPTOR.
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS event_count
FROM TABLE(
    SESSION(TABLE events PARTITION BY user_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTE))
GROUP BY window_start, window_end, user_id;

```

## Time-based Joins

### Interval Joins

```sql

-- Interval join: match each order with payments made within one hour before
-- or after the order (BETWEEN bounds are inclusive). Flink runs this as an
-- interval join only when order_time and payment_time are time attributes.
SELECT
    o.order_id,
    o.user_id,
    o.order_time,
    p.payment_id,
    p.payment_time
FROM orders AS o
INNER JOIN payments AS p
    ON o.order_id = p.order_id
    AND p.payment_time BETWEEN o.order_time - INTERVAL '1' HOUR
                           AND o.order_time + INTERVAL '1' HOUR;

```

### Temporal Joins

```java

297

// Register temporal table

298

tEnv.createTemporaryView("rates_temporal",

299

rates.createTemporalTableFunction($("update_time"), $("currency")));

300

301

// Temporal join in SQL

302

Table result = tEnv.sqlQuery(

303

"SELECT " +

304

" o.order_id, " +

305

" o.amount, " +

306

" o.currency, " +

307

" r.rate, " +

308

" o.amount * r.rate as amount_usd " +

309

"FROM orders o " +

310

"JOIN rates_temporal FOR SYSTEM_TIME AS OF o.order_time AS r " +

311

"ON o.currency = r.currency"

312

);

313

```

## Watermarks and Late Data

### Watermark Strategies

```java

320

// Bounded out-of-orderness

321

WatermarkStrategy<Event> strategy = WatermarkStrategy

322

.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))

323

.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

324

325

// Monotonous timestamps

326

WatermarkStrategy<Event> monotonous = WatermarkStrategy

327

.<Event>forMonotonousTimestamps()

328

.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

329

330

// Custom watermark generator

331

WatermarkStrategy<Event> custom = WatermarkStrategy

332

.forGenerator(ctx -> new CustomWatermarkGenerator())

333

.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

334

```

### Late Data Handling

```java

// Table API/SQL: let window aggregations emit updated results for late rows
// (experimental planner options).
// NOTE(review): late firing may also require a non-zero state retention time
// (which bounds allowed lateness) -- confirm for the Flink version in use.
Configuration config = tEnv.getConfig().getConfiguration();
config.setString("table.exec.emit.late-fire.enabled", "true");
config.setString("table.exec.emit.late-fire.delay", "5 s");

// DataStream API: route late events to a side output. The tag is declared
// outside the function so the side output can be read from the result.
final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};

SingleOutputStreamOperator<Event> onTimeEvents = mainStream
    .assignTimestampsAndWatermarks(watermarkStrategy)
    .process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) {
            // An event is late when its timestamp is already behind the current watermark
            if (event.getTimestamp() < ctx.timerService().currentWatermark()) {
                ctx.output(lateOutputTag, event);
            } else {
                out.collect(event);
            }
        }
    });

// process() returns the on-time stream; late events are only reachable via the side output
DataStream<Event> lateEvents = onTimeEvents.getSideOutput(lateOutputTag);

```

## Window Aggregations

### Built-in Aggregation Functions

```sql

-- Count, sum, average, min and max per 5-minute tumbling window.
-- Assumes events also has a numeric amount column.
SELECT
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    COUNT(*) AS event_count,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount,
    MIN(amount) AS min_amount,
    MAX(amount) AS max_amount
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE);

-- Statistical and collection aggregates per 5-minute tumbling window.
-- STDDEV_POP/VAR_POP are matching population statistics (use STDDEV_SAMP/
-- VAR_SAMP for sample statistics); COLLECT returns a MULTISET; LISTAGG
-- concatenates strings with the given separator.
-- Assumes events also has a string event_type column.
SELECT
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    STDDEV_POP(amount) AS stddev_amount,
    VAR_POP(amount) AS variance_amount,
    COLLECT(user_id) AS user_list,
    LISTAGG(event_type, ',') AS event_types
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE);

```

## Types

```java { .api }

391

interface GroupWindow {

392

Expression getTimeField();

393

Expression getSize();

394

String getAlias();

395

}

396

397

class TumbleWithSize implements GroupWindow;

398

class SlideWithSizeAndSlide implements GroupWindow;

399

class SessionWithGap implements GroupWindow;

400

401

interface OverWindow {

402

Expression getPartitioning();

403

Expression getOrder();

404

Expression getPreceding();

405

Expression getFollowing();

406

String getAlias();

407

}

408

409

interface WindowGroupedTable extends Table {

410

Table select(Expression... fields);

411

AggregatedTable aggregate(Expression aggregateFunction);

412

FlatAggregateTable flatAggregate(Expression tableAggregateFunction);

413

}

414

415

interface OverWindowedTable extends Table {

416

Table select(Expression... fields);

417

}

418

419

// Window bounds

420

class UNBOUNDED_ROW;

421

class UNBOUNDED_RANGE;

422

class CURRENT_ROW;

423

class CURRENT_RANGE;

424

```