or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdconnectors.mdcore-functions.mddatastream-traditional.mddatastream-v2.mdindex.mdstate-management.mdtable-api.mdwindowing.md

windowing.mddocs/

0

# Windowing System

1

2

Complete windowing system for time-based and count-based data aggregation, supporting event time and processing time semantics with customizable triggers and evictors.

3

4

## Capabilities

5

6

### Window Assigners

7

8

Window assigners determine which windows elements belong to.

9

10

```java { .api }

11

/**

12

* Base window assigner interface

13

* @param <T> Element type

14

* @param <W> Window type

15

*/

16

abstract class WindowAssigner<T, W extends Window> {

17

/**

18

* Assign windows to element

19

* @param element Element

20

* @param timestamp Element timestamp

21

* @param context Window assigner context

22

* @return Collection of windows

23

*/

24

public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

25

26

/**

27

* Get default trigger

28

* @param env Stream execution environment

29

* @return Default trigger

30

*/

31

public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

32

33

/**

34

* Get window serializer

35

* @param executionConfig Execution configuration

36

* @return Window serializer

37

*/

38

public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

39

}

40

41

/**

42

* Tumbling event time windows

43

*/

44

class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

45

/**

46

* Create tumbling windows with size

47

* @param size Window size

48

* @return Window assigner

49

*/

50

public static TumblingEventTimeWindows of(Time size);

51

52

/**

53

* Create tumbling windows with size and offset

54

* @param size Window size

55

* @param offset Window offset

56

* @return Window assigner

57

*/

58

public static TumblingEventTimeWindows of(Time size, Time offset);

59

}

60

61

/**

62

* Sliding event time windows

63

*/

64

class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

65

/**

66

* Create sliding windows

67

* @param size Window size

68

* @param slide Slide interval

69

* @return Window assigner

70

*/

71

public static SlidingEventTimeWindows of(Time size, Time slide);

72

73

/**

74

* Create sliding windows with offset

75

* @param size Window size

76

* @param slide Slide interval

77

* @param offset Window offset

78

* @return Window assigner

79

*/

80

public static SlidingEventTimeWindows of(Time size, Time slide, Time offset);

81

}

82

83

/**

84

* Event time session windows

85

*/

86

class EventTimeSessionWindows extends WindowAssigner<Object, TimeWindow> {

87

/**

88

* Create session windows with gap

89

* @param sessionTimeout Session timeout

90

* @return Window assigner

91

*/

92

public static EventTimeSessionWindows withGap(Time sessionTimeout);

93

94

/**

95

* Create dynamic session windows

96

* @param sessionWindowTimeGapExtractor Gap extractor function

97

* @param <T> Element type

98

* @return Window assigner

99

*/

100

public static <T> EventTimeSessionWindows withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor);

101

}

102

```

103

104

### Window Triggers

105

106

Triggers determine when window computation should be performed.

107

108

```java { .api }

109

/**

110

* Base trigger class

111

* @param <T> Element type

112

* @param <W> Window type

113

*/

114

abstract class Trigger<T, W extends Window> {

115

/**

116

* Called when element is added to window

117

* @param element Element

118

* @param timestamp Element timestamp

119

* @param window Window

120

* @param ctx Trigger context

121

* @return Trigger result

122

* @throws Exception

123

*/

124

public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

125

126

/**

127

* Called when processing time timer fires

128

* @param time Timer timestamp

129

* @param window Window

130

* @param ctx Trigger context

131

* @return Trigger result

132

* @throws Exception

133

*/

134

public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

135

136

/**

137

* Called when event time timer fires

138

* @param time Timer timestamp

139

* @param window Window

140

* @param ctx Trigger context

141

* @return Trigger result

142

* @throws Exception

143

*/

144

public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

145

146

/**

147

* Clear trigger state

148

* @param window Window

149

* @param ctx Trigger context

150

* @throws Exception

151

*/

152

public abstract void clear(W window, TriggerContext ctx) throws Exception;

153

}

154

155

/**

156

* Trigger result enumeration

157

*/

158

enum TriggerResult {

159

/** Continue without action */

160

CONTINUE,

161

/** Fire window computation */

162

FIRE,

163

/** Purge window contents */

164

PURGE,

165

/** Fire and purge */

166

FIRE_AND_PURGE

167

}

168

169

/**

170

* Continuous processing time trigger

171

*/

172

class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {

173

/**

174

* Create trigger with interval

175

* @param interval Trigger interval

176

* @param <W> Window type

177

* @return Trigger instance

178

*/

179

public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval);

180

}

181

182

/**

183

* Delta trigger fires when element differs from last by threshold

184

* @param <T> Element type

185

* @param <W> Window type

186

*/

187

class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {

188

/**

189

* Create delta trigger

190

* @param threshold Delta threshold

191

* @param deltaFunction Delta calculation function

192

* @param typeInfo Type information

193

* @param <T> Element type

194

* @param <W> Window type

195

* @return Trigger instance

196

*/

197

public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeInformation<T> typeInfo);

198

}

199

```

200

201

### Window Evictors

202

203

Evictors remove elements from windows before or after window computation.

204

205

```java { .api }

206

/**

207

* Base evictor interface

208

* @param <T> Element type

209

* @param <W> Window type

210

*/

211

interface Evictor<T, W extends Window> {

212

/**

213

* Evict elements before window function

214

* @param elements Window elements

215

* @param size Number of elements

216

* @param window Window

217

* @param evictorContext Evictor context

218

*/

219

void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

220

221

/**

222

* Evict elements after window function

223

* @param elements Window elements

224

* @param size Number of elements

225

* @param window Window

226

* @param evictorContext Evictor context

227

*/

228

void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

229

}

230

231

/**

232

* Time-based evictor

233

* @param <W> Window type

234

*/

235

class TimeEvictor<W extends Window> implements Evictor<Object, W> {

236

/**

237

* Create time evictor

238

* @param windowSize Window size to keep

239

* @return Time evictor

240

*/

241

public static <W extends Window> TimeEvictor<W> of(Time windowSize);

242

243

/**

244

* Create time evictor with processing time

245

* @param windowSize Window size

246

* @param doEvictAfter Whether to evict after window function

247

* @return Time evictor

248

*/

249

public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter);

250

}

251

252

/**

253

* Count-based evictor

254

* @param <W> Window type

255

*/

256

class CountEvictor<W extends Window> implements Evictor<Object, W> {

257

/**

258

* Create count evictor

259

* @param maxCount Maximum elements to keep

260

* @return Count evictor

261

*/

262

public static <W extends Window> CountEvictor<W> of(long maxCount);

263

264

/**

265

* Create count evictor

266

* @param maxCount Maximum elements

267

* @param doEvictAfter Whether to evict after window function

268

* @return Count evictor

269

*/

270

public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter);

271

}

272

273

/**

274

* Delta-based evictor

275

* @param <T> Element type

276

* @param <W> Window type

277

*/

278

class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {

279

/**

280

* Create delta evictor

281

* @param threshold Delta threshold

282

* @param deltaFunction Delta function

283

* @return Delta evictor

284

*/

285

public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction);

286

}

287

```

288

289

### Window Functions

290

291

Functions for processing windowed data.

292

293

```java { .api }

294

/**

295

* Window function interface

296

* @param <IN> Input type

297

* @param <OUT> Output type

298

* @param <KEY> Key type

299

* @param <W> Window type

300

*/

301

interface WindowFunction<IN, OUT, KEY, W extends Window> {

302

/**

303

* Process window contents

304

* @param key Window key

305

* @param window Window

306

* @param input Window elements

307

* @param out Result collector

308

* @throws Exception

309

*/

310

void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;

311

}

312

313

/**

314

* Process window function with context

315

* @param <IN> Input type

316

* @param <OUT> Output type

317

* @param <KEY> Key type

318

* @param <W> Window type

319

*/

320

abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> {

321

/**

322

* Process window with context

323

* @param key Window key

324

* @param context Process context

325

* @param elements Window elements

326

* @param out Result collector

327

* @throws Exception

328

*/

329

public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

330

331

/**

332

* Process window context

333

*/

334

public abstract class Context {

335

/**

336

* Get current window

337

* @return Current window

338

*/

339

public abstract W window();

340

341

/**

342

* Get current processing time

343

* @return Processing time

344

*/

345

public abstract long currentProcessingTime();

346

347

/**

348

* Get current event time

349

* @return Event time

350

*/

351

public abstract long currentWatermark();

352

353

/**

354

* Get global state

355

* @param stateDescriptor State descriptor

356

* @param <S> State type

357

* @return Global state

358

*/

359

public abstract <S extends State> S globalState(StateDescriptor<S, ?> stateDescriptor);

360

361

/**

362

* Get window state

363

* @param stateDescriptor State descriptor

364

* @param <S> State type

365

* @return Window state

366

*/

367

public abstract <S extends State> S windowState(StateDescriptor<S, ?> stateDescriptor);

368

}

369

}

370

```

371

372

### Time Window

373

374

Time-based window implementation.

375

376

```java { .api }

377

/**

378

* Time window implementation

379

*/

380

class TimeWindow extends Window {

381

/**

382

* Create time window

383

* @param start Start timestamp

384

* @param end End timestamp

385

*/

386

public TimeWindow(long start, long end);

387

388

/**

389

* Get window start time

390

* @return Start timestamp

391

*/

392

public long getStart();

393

394

/**

395

* Get window end time

396

* @return End timestamp

397

*/

398

public long getEnd();

399

400

/**

401

* Get maximum timestamp in window

402

* @return Maximum timestamp

403

*/

404

public long maxTimestamp();

405

406

/**

407

* Check if timestamp intersects window

408

* @param timestamp Timestamp to check

409

* @return true if intersects

410

*/

411

public boolean intersects(TimeWindow other);

412

413

/**

414

* Get window center timestamp

415

* @return Center timestamp

416

*/

417

public long getCenter();

418

}

419

```

420

421

### Context Types

422

423

Context and utility types used by the windowing system.

424

425

```java { .api }

426

/**

427

* Window assigner context

428

*/

429

interface WindowAssignerContext {

430

/**

431

* Get current processing time

432

* @return Current processing time

433

*/

434

long getCurrentProcessingTime();

435

}

436

437

/**

438

* Trigger context interface

439

*/

440

interface TriggerContext {

441

/**

442

* Get current processing time

443

* @return Current processing time

444

*/

445

long getCurrentProcessingTime();

446

447

/**

448

* Get current watermark

449

* @return Current watermark

450

*/

451

long getCurrentWatermark();

452

453

/**

454

* Register processing time timer

455

* @param time Timer timestamp

456

*/

457

void registerProcessingTimeTimer(long time);

458

459

/**

460

* Register event time timer

461

* @param time Timer timestamp

462

*/

463

void registerEventTimeTimer(long time);

464

465

/**

466

* Delete processing time timer

467

* @param time Timer timestamp

468

*/

469

void deleteProcessingTimeTimer(long time);

470

471

/**

472

* Delete event time timer

473

* @param time Timer timestamp

474

*/

475

void deleteEventTimeTimer(long time);

476

477

/**

478

* Get partitioned state

479

* @param stateDescriptor State descriptor

480

* @param <S> State type

481

* @return Partitioned state

482

*/

483

<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

484

}

485

486

/**

487

* Evictor context interface

488

*/

489

interface EvictorContext {

490

/**

491

* Get current processing time

492

* @return Current processing time

493

*/

494

long getCurrentProcessingTime();

495

496

/**

497

* Get current watermark

498

* @return Current watermark

499

*/

500

long getCurrentWatermark();

501

}

502

503

/**

504

* Timestamped value wrapper

505

* @param <T> Value type

506

*/

507

class TimestampedValue<T> {

508

/**

509

* Create timestamped value

510

* @param value Value

511

* @param timestamp Timestamp

512

*/

513

public TimestampedValue(T value, long timestamp);

514

515

/**

516

* Get value

517

* @return Value

518

*/

519

public T getValue();

520

521

/**

522

* Get timestamp

523

* @return Timestamp

524

*/

525

public long getTimestamp();

526

527

/**

528

* Check if has timestamp

529

* @return true if has timestamp

530

*/

531

public boolean hasTimestamp();

532

}

533

534

/**

535

* Delta function interface

536

* @param <DATA> Data type

537

*/

538

interface DeltaFunction<DATA> {

539

/**

540

* Calculate delta between two data points

541

* @param oldDataPoint Old data point

542

* @param newDataPoint New data point

543

* @return Delta value

544

*/

545

double getDelta(DATA oldDataPoint, DATA newDataPoint);

546

}

547

548

/**

549

* Session window time gap extractor

550

* @param <T> Element type

551

*/

552

interface SessionWindowTimeGapExtractor<T> {

553

/**

554

* Extract session timeout for element

555

* @param element Element

556

* @return Session timeout in milliseconds

557

*/

558

long extract(T element);

559

}

560

```

561

562

### Base Window Class

563

564

Base window class for all window types.

565

566

```java { .api }

567

/**

568

* Base window class

569

*/

570

abstract class Window {

571

/**

572

* Get maximum timestamp that belongs to this window

573

* @return Maximum timestamp

574

*/

575

public abstract long maxTimestamp();

576

}

577

```