or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

function-interfaces.md, index.md, savepoint-management.md, state-reading.md, state-writing.md, window-operations.md

window-operations.mddocs/

0

# Window Operations

1

2

Window operations provide specialized functionality for reading and writing window state data. The State Processor API supports both regular windows and evicting windows with different aggregation strategies.

3

4

## Window Reader Overview

5

6

The `WindowReader` class provides entry points for reading window state from savepoints.

7

8

```java { .api }

9

public class WindowReader<W extends Window> {

10

public EvictingWindowReader<W> evictor();

11

12

public <T, K> DataSource<T> reduce(

13

String uid,

14

ReduceFunction<T> function,

15

TypeInformation<K> keyType,

16

TypeInformation<T> reduceType

17

) throws IOException;

18

19

public <K, T, OUT> DataSource<OUT> reduce(

20

String uid,

21

ReduceFunction<T> function,

22

WindowReaderFunction<T, OUT, K, W> readerFunction,

23

TypeInformation<K> keyType,

24

TypeInformation<T> reduceType,

25

TypeInformation<OUT> outputType

26

) throws IOException;

27

28

public <K, T, ACC, R> DataSource<R> aggregate(

29

String uid,

30

AggregateFunction<T, ACC, R> aggregateFunction,

31

TypeInformation<K> keyType,

32

TypeInformation<ACC> accType,

33

TypeInformation<R> outputType

34

) throws IOException;

35

36

public <K, T, ACC, R, OUT> DataSource<OUT> aggregate(

37

String uid,

38

AggregateFunction<T, ACC, R> aggregateFunction,

39

WindowReaderFunction<R, OUT, K, W> readerFunction,

40

TypeInformation<K> keyType,

41

TypeInformation<ACC> accType,

42

TypeInformation<OUT> outputType

43

) throws IOException;

44

45

public <K, T, OUT> DataSource<OUT> process(

46

String uid,

47

WindowReaderFunction<T, OUT, K, W> readerFunction,

48

TypeInformation<K> keyType,

49

TypeInformation<T> stateType,

50

TypeInformation<OUT> outputType

51

) throws IOException;

52

}

53

```

54

55

## Creating Window Readers

56

57

### Using Window Assigners

58

59

```java

60

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

61

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;

62

import org.apache.flink.streaming.api.windowing.assigners.SessionWindows;

63

64

// Create window reader with tumbling windows

65

WindowReader<TimeWindow> tumblingReader = savepoint.window(

66

TumblingEventTimeWindows.of(Duration.ofMinutes(5))

67

);

68

69

// Create window reader with sliding windows

70

WindowReader<TimeWindow> slidingReader = savepoint.window(

71

SlidingEventTimeWindows.of(Duration.ofMinutes(10), Duration.ofMinutes(2))

72

);

73

74

// Create window reader with session windows

75

WindowReader<TimeWindow> sessionReader = savepoint.window(

76

SessionWindows.withGap(Duration.ofMinutes(30))

77

);

78

```

79

80

### Using Window Serializers

81

82

```java

83

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

84

85

// Create window reader with explicit serializer

86

TypeSerializer<TimeWindow> windowSerializer = new TimeWindow.Serializer();

87

WindowReader<TimeWindow> reader = savepoint.window(windowSerializer);

88

```

89

90

## Reading Reduce Windows

91

92

### Simple Reduce Reading

93

94

```java

95

// Read window state created with ReduceFunction

96

DataSource<Integer> windowSums = reader.reduce(

97

"sum-window-operator",

98

new SumReduceFunction(), // The original reduce function

99

Types.STRING, // Key type

100

Types.INT // Value type

101

);

102

103

windowSums.print();

104

```

105

106

### Reduce with Window Reader Function

107

108

```java

109

public class WindowSummaryReader implements WindowReaderFunction<Integer, WindowSummary, String, TimeWindow> {

110

111

@Override

112

public void readWindow(

113

String key,

114

Context context,

115

Iterable<Integer> elements,

116

Collector<WindowSummary> out

117

) throws Exception {

118

119

TimeWindow window = context.window();

120

121

// Process reduced result (should be single element for reduce)

122

Integer sum = elements.iterator().next();

123

124

WindowSummary summary = new WindowSummary(

125

key,

126

window.getStart(),

127

window.getEnd(),

128

sum,

129

1 // Reduce produces single value

130

);

131

132

out.collect(summary);

133

}

134

}

135

136

// Use with reduce reader

137

DataSource<WindowSummary> summaries = reader.reduce(

138

"sum-window-operator",

139

new SumReduceFunction(),

140

new WindowSummaryReader(),

141

Types.STRING, // Key type

142

Types.INT, // Reduce type

143

TypeInformation.of(WindowSummary.class) // Output type

144

);

145

```

146

147

## Reading Aggregate Windows

148

149

### Simple Aggregate Reading

150

151

```java

152

public class AverageAggregateFunction implements AggregateFunction<Double, AverageAccumulator, Double> {

153

154

@Override

155

public AverageAccumulator createAccumulator() {

156

return new AverageAccumulator();

157

}

158

159

@Override

160

public AverageAccumulator add(Double value, AverageAccumulator accumulator) {

161

accumulator.sum += value;

162

accumulator.count++;

163

return accumulator;

164

}

165

166

@Override

167

public Double getResult(AverageAccumulator accumulator) {

168

return accumulator.count == 0 ? 0.0 : accumulator.sum / accumulator.count;

169

}

170

171

@Override

172

public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {

173

a.sum += b.sum;

174

a.count += b.count;

175

return a;

176

}

177

}

178

179

// Read aggregated window results

180

DataSource<Double> windowAverages = reader.aggregate(

181

"average-window-operator",

182

new AverageAggregateFunction(),

183

Types.STRING, // Key type

184

TypeInformation.of(AverageAccumulator.class), // Accumulator type

185

Types.DOUBLE // Result type

186

);

187

```

188

189

### Aggregate with Window Reader Function

190

191

```java

192

public class AggregateWindowReader implements WindowReaderFunction<Double, AggregateResult, String, TimeWindow> {

193

194

@Override

195

public void readWindow(

196

String key,

197

Context context,

198

Iterable<Double> elements,

199

Collector<AggregateResult> out

200

) throws Exception {

201

202

TimeWindow window = context.window();

203

Double average = elements.iterator().next(); // Single aggregated result

204

205

AggregateResult result = new AggregateResult(

206

key,

207

window.getStart(),

208

window.getEnd(),

209

average,

210

"AVERAGE"

211

);

212

213

out.collect(result);

214

}

215

}

216

217

// Use with aggregate reader

218

DataSource<AggregateResult> results = reader.aggregate(

219

"average-window-operator",

220

new AverageAggregateFunction(),

221

new AggregateWindowReader(),

222

Types.STRING, // Key type

223

TypeInformation.of(AverageAccumulator.class), // Accumulator type

224

TypeInformation.of(AggregateResult.class) // Output type

225

);

226

```

227

228

## Reading Process Windows

229

230

Process windows handle raw window contents without pre-aggregation.

231

232

```java

233

public class ProcessWindowReader implements WindowReaderFunction<SensorReading, WindowAnalysis, String, TimeWindow> {

234

235

@Override

236

public void readWindow(

237

String sensorId,

238

Context context,

239

Iterable<SensorReading> readings,

240

Collector<WindowAnalysis> out

241

) throws Exception {

242

243

TimeWindow window = context.window();

244

List<SensorReading> readingList = new ArrayList<>();

245

readings.forEach(readingList::add);

246

247

if (!readingList.isEmpty()) {

248

// Analyze raw window contents

249

double min = readingList.stream().mapToDouble(SensorReading::getValue).min().orElse(0.0);

250

double max = readingList.stream().mapToDouble(SensorReading::getValue).max().orElse(0.0);

251

double avg = readingList.stream().mapToDouble(SensorReading::getValue).average().orElse(0.0);

252

253

WindowAnalysis analysis = new WindowAnalysis(

254

sensorId,

255

window.getStart(),

256

window.getEnd(),

257

readingList.size(),

258

min,

259

max,

260

avg,

261

calculateTrend(readingList)

262

);

263

264

out.collect(analysis);

265

}

266

}

267

268

private String calculateTrend(List<SensorReading> readings) {

269

if (readings.size() < 2) return "STABLE";

270

271

double first = readings.get(0).getValue();

272

double last = readings.get(readings.size() - 1).getValue();

273

274

if (last > first * 1.1) return "INCREASING";

275

if (last < first * 0.9) return "DECREASING";

276

return "STABLE";

277

}

278

}

279

280

// Read process window state

281

DataSource<WindowAnalysis> analyses = reader.process(

282

"sensor-window-operator",

283

new ProcessWindowReader(),

284

Types.STRING, // Key type (sensor ID)

285

TypeInformation.of(SensorReading.class), // Element type

286

TypeInformation.of(WindowAnalysis.class) // Output type

287

);

288

```

289

290

## Evicting Window Operations

291

292

Evicting windows use evictors to remove elements from windows before or after window processing.

293

294

```java { .api }

295

public class EvictingWindowReader<W extends Window> {

296

public <T, K> DataSource<T> reduce(

297

String uid,

298

ReduceFunction<T> function,

299

TypeInformation<K> keyType,

300

TypeInformation<T> reduceType

301

) throws IOException;

302

303

public <K, T, OUT> DataSource<OUT> reduce(

304

String uid,

305

ReduceFunction<T> function,

306

WindowReaderFunction<Iterable<T>, OUT, K, W> readerFunction,

307

TypeInformation<K> keyType,

308

TypeInformation<T> reduceType,

309

TypeInformation<OUT> outputType

310

) throws IOException;

311

312

public <K, T, ACC, R> DataSource<R> aggregate(

313

String uid,

314

AggregateFunction<T, ACC, R> aggregateFunction,

315

TypeInformation<K> keyType,

316

TypeInformation<ACC> accType,

317

TypeInformation<R> outputType

318

) throws IOException;

319

320

public <K, T, ACC, R, OUT> DataSource<OUT> aggregate(

321

String uid,

322

AggregateFunction<T, ACC, R> aggregateFunction,

323

WindowReaderFunction<Iterable<R>, OUT, K, W> readerFunction,

324

TypeInformation<K> keyType,

325

TypeInformation<ACC> accType,

326

TypeInformation<OUT> outputType

327

) throws IOException;

328

329

public <K, T, OUT> DataSource<OUT> process(

330

String uid,

331

WindowReaderFunction<Iterable<T>, OUT, K, W> readerFunction,

332

TypeInformation<K> keyType,

333

TypeInformation<T> stateType,

334

TypeInformation<OUT> outputType

335

) throws IOException;

336

}

337

```

338

339

### Reading Evicting Windows

340

341

```java

342

// Get evicting window reader

343

EvictingWindowReader<TimeWindow> evictingReader = reader.evictor();

344

345

// Read evicting window with process function

346

public class EvictingProcessReader implements WindowReaderFunction<Iterable<Event>, EventSummary, String, TimeWindow> {

347

348

@Override

349

public void readWindow(

350

String key,

351

Context context,

352

Iterable<Iterable<Event>> elements, // Note: Iterable of Iterable for evicting windows

353

Collector<EventSummary> out

354

) throws Exception {

355

356

TimeWindow window = context.window();

357

List<Event> allEvents = new ArrayList<>();

358

359

// Flatten the nested iterables

360

for (Iterable<Event> eventGroup : elements) {

361

eventGroup.forEach(allEvents::add);

362

}

363

364

if (!allEvents.isEmpty()) {

365

EventSummary summary = new EventSummary(

366

key,

367

window.getStart(),

368

window.getEnd(),

369

allEvents.size(),

370

allEvents

371

);

372

373

out.collect(summary);

374

}

375

}

376

}

377

378

DataSource<EventSummary> evictingSummaries = evictingReader.process(

379

"evicting-window-operator",

380

new EvictingProcessReader(),

381

Types.STRING, // Key type

382

TypeInformation.of(Event.class), // Element type

383

TypeInformation.of(EventSummary.class) // Output type

384

);

385

```

386

387

## Window Bootstrap Operations

388

389

While the main focus is on reading windows, you can also bootstrap window state.

390

391

### Creating Window Bootstrap Transformation

392

393

```java

394

public class WindowBootstrapFunction extends KeyedStateBootstrapFunction<String, TimestampedEvent> {

395

private WindowState<TimeWindow> windowState;

396

397

@Override

398

public void open(Configuration parameters) throws Exception {

399

super.open(parameters);

400

401

// Register window state

402

WindowStateDescriptor<TimeWindow> windowDesc = new WindowStateDescriptor<>(

403

"window", new TimeWindow.Serializer()

404

);

405

windowState = getRuntimeContext().getWindowState(windowDesc);

406

}

407

408

@Override

409

public void processElement(TimestampedEvent event, Context ctx) throws Exception {

410

// Assign to window based on timestamp

411

long timestamp = event.getTimestamp();

412

long windowStart = timestamp - (timestamp % Duration.ofMinutes(5).toMillis());

413

long windowEnd = windowStart + Duration.ofMinutes(5).toMillis();

414

415

TimeWindow window = new TimeWindow(windowStart, windowEnd);

416

417

// Add event to window

418

windowState.add(window, event);

419

420

// Set timer for window end

421

ctx.timerService().registerEventTimeTimer(windowEnd);

422

}

423

}

424

```

425

426

## Complex Window Analysis

427

428

### Multi-Window Analysis

429

430

```java

431

public class MultiWindowAnalyzer implements WindowReaderFunction<MetricEvent, MultiWindowResult, String, TimeWindow> {

432

433

@Override

434

public void readWindow(

435

String metricName,

436

Context context,

437

Iterable<MetricEvent> events,

438

Collector<MultiWindowResult> out

439

) throws Exception {

440

441

TimeWindow window = context.window();

442

List<MetricEvent> eventList = new ArrayList<>();

443

events.forEach(eventList::add);

444

445

if (eventList.isEmpty()) return;

446

447

// Calculate multiple statistics

448

DoubleSummaryStatistics stats = eventList.stream()

449

.mapToDouble(MetricEvent::getValue)

450

.summaryStatistics();

451

452

// Calculate percentiles

453

List<Double> sortedValues = eventList.stream()

454

.mapToDouble(MetricEvent::getValue)

455

.sorted()

456

.boxed()

457

.collect(Collectors.toList());

458

459

double p50 = calculatePercentile(sortedValues, 0.5);

460

double p95 = calculatePercentile(sortedValues, 0.95);

461

double p99 = calculatePercentile(sortedValues, 0.99);

462

463

// Detect anomalies

464

List<MetricEvent> anomalies = detectAnomalies(eventList, stats.getAverage(), Math.sqrt(stats.getAverage()));

465

466

MultiWindowResult result = new MultiWindowResult(

467

metricName,

468

window.getStart(),

469

window.getEnd(),

470

eventList.size(),

471

stats.getMin(),

472

stats.getMax(),

473

stats.getAverage(),

474

p50, p95, p99,

475

anomalies

476

);

477

478

out.collect(result);

479

}

480

481

private double calculatePercentile(List<Double> sortedValues, double percentile) {

482

int index = (int) Math.ceil(percentile * sortedValues.size()) - 1;

483

return sortedValues.get(Math.max(0, Math.min(index, sortedValues.size() - 1)));

484

}

485

486

private List<MetricEvent> detectAnomalies(List<MetricEvent> events, double mean, double stdDev) {

487

double threshold = 2.0 * stdDev;

488

return events.stream()

489

.filter(event -> Math.abs(event.getValue() - mean) > threshold)

490

.collect(Collectors.toList());

491

}

492

}

493

```

494

495

## Error Handling in Window Operations

496

497

### Robust Window Reading

498

499

```java

500

public class RobustWindowReader implements WindowReaderFunction<DataPoint, WindowResult, String, TimeWindow> {

501

private static final Logger LOG = LoggerFactory.getLogger(RobustWindowReader.class);

502

503

@Override

504

public void readWindow(

505

String key,

506

Context context,

507

Iterable<DataPoint> elements,

508

Collector<WindowResult> out

509

) throws Exception {

510

511

try {

512

TimeWindow window = context.window();

513

List<DataPoint> points = new ArrayList<>();

514

515

// Safely iterate over elements

516

for (DataPoint point : elements) {

517

if (point != null && isValidDataPoint(point)) {

518

points.add(point);

519

} else {

520

LOG.warn("Invalid data point in window for key: {}", key);

521

}

522

}

523

524

if (!points.isEmpty()) {

525

WindowResult result = processDataPoints(key, window, points);

526

out.collect(result);

527

} else {

528

LOG.debug("No valid data points in window for key: {}", key);

529

}

530

531

} catch (Exception e) {

532

LOG.error("Error processing window for key: {}", key, e);

533

// Could emit error result instead of failing

534

// out.collect(new WindowResult(key, context.window(), "ERROR", e.getMessage()));

535

throw e; // Re-throw to fail the job

536

}

537

}

538

539

private boolean isValidDataPoint(DataPoint point) {

540

return point.getValue() >= 0 && point.getTimestamp() > 0;

541

}

542

543

private WindowResult processDataPoints(String key, TimeWindow window, List<DataPoint> points) {

544

// Safe processing logic

545

double average = points.stream().mapToDouble(DataPoint::getValue).average().orElse(0.0);

546

return new WindowResult(key, window, "SUCCESS", average);

547

}

548

}

549

```

550

551

### Window Type Safety

552

553

```java

554

// Ensure proper type handling for different window types

555

public class TypeSafeWindowReader<W extends Window> implements WindowReaderFunction<TypedEvent, TypedResult, String, W> {

556

private final Class<W> windowClass;

557

558

public TypeSafeWindowReader(Class<W> windowClass) {

559

this.windowClass = windowClass;

560

}

561

562

@Override

563

public void readWindow(

564

String key,

565

Context context,

566

Iterable<TypedEvent> elements,

567

Collector<TypedResult> out

568

) throws Exception {

569

570

W window = context.window();

571

572

// Type-safe window handling

573

if (windowClass.isInstance(window)) {

574

TypedResult result = processTypedWindow(key, windowClass.cast(window), elements);

575

out.collect(result);

576

} else {

577

throw new IllegalArgumentException("Unexpected window type: " + window.getClass());

578

}

579

}

580

581

private TypedResult processTypedWindow(String key, W window, Iterable<TypedEvent> elements) {

582

// Type-specific processing

583

return new TypedResult(key, window.toString(), elements);

584

}

585

}

586

```

587

588

## Performance Optimization

589

590

### Efficient Window Processing

591

592

```java

593

public class OptimizedWindowReader implements WindowReaderFunction<LargeEvent, CompactResult, String, TimeWindow> {

594

595

@Override

596

public void readWindow(

597

String key,

598

Context context,

599

Iterable<LargeEvent> elements,

600

Collector<CompactResult> out

601

) throws Exception {

602

603

TimeWindow window = context.window();

604

605

// Process in streaming fashion to avoid loading all into memory

606

double sum = 0.0;

607

int count = 0;

608

double min = Double.MAX_VALUE;

609

double max = Double.MIN_VALUE;

610

611

for (LargeEvent event : elements) {

612

double value = event.getValue();

613

sum += value;

614

count++;

615

min = Math.min(min, value);

616

max = Math.max(max, value);

617

}

618

619

if (count > 0) {

620

CompactResult result = new CompactResult(

621

key,

622

window.getStart(),

623

window.getEnd(),

624

count,

625

sum / count,

626

min,

627

max

628

);

629

630

out.collect(result);

631

}

632

}

633

}