# Windowing Examples

Advanced windowing patterns including time windows, session windows, and custom triggers with event-time processing. Demonstrates various window types, eviction policies, and complex triggering mechanisms.

## Capabilities

### TopSpeedWindowing

Car speed monitoring with custom triggers and evictors, demonstrating global windows with delta triggers.

```java { .api }
/**
 * Grouped stream windowing with custom eviction and trigger policies
 * Monitors car speeds and triggers top speed calculation every x meters
 * @param args Command line arguments (--input path, --output path)
 */
public class TopSpeedWindowing {
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Run with sample car data generator
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.windowing.TopSpeedWindowing

# Run with file input
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.windowing.TopSpeedWindowing \
  --input /path/to/car-data.txt --output /path/to/results.txt
```

### SessionWindowing

Session-based windowing for user activity analysis with configurable session gaps.

```java { .api }
/**
 * Session windowing example for analyzing user activity sessions
 * Groups events into sessions based on activity gaps
 * @param args Command line arguments (--input path, --output path)
 */
public class SessionWindowing {
    public static void main(String[] args) throws Exception;
}
```

### WindowWordCount

Basic windowed word count. In the Flink 1.3 example sources this job uses sliding count windows (`countWindow(windowSize, slideSize)`, configured with `--window` and `--slide`) rather than tumbling time windows.

```java { .api }
/**
 * Word count over sliding count windows
 * Demonstrates basic windowing concepts
 * @param args Command line arguments (--input path, --output path, --window n, --slide n)
 */
public class WindowWordCount {
    public static void main(String[] args) throws Exception;
}
```

### GroupedProcessingTimeWindowExample

High-throughput processing time windows with sliding window patterns and parallel data generation.

```java { .api }
/**
 * Processing time windows with grouped keys and sliding windows
 * Performance benchmark with 20M elements across 10K keys
 * @param args Command line arguments
 */
public class GroupedProcessingTimeWindowExample {
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Run high-throughput windowing benchmark
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.windowing.GroupedProcessingTimeWindowExample
```

### High-Throughput Data Source

Custom parallel source function for performance testing. The source is a subclass of `RichParallelSourceFunction<Tuple2<Long, Long>>`; the class name used in the signature below is illustrative only.

```java { .api }
/**
 * High-throughput parallel source generating tuple data
 * Generates 20,000,000 elements across multiple parallel instances
 * (HighThroughputSource is a hypothetical name for the subclass)
 */
public class HighThroughputSource extends RichParallelSourceFunction<Tuple2<Long, Long>> {
    // volatile so that cancel(), called from another thread, is seen by run()
    private volatile boolean running = true;

    /**
     * Main data generation loop
     * @param ctx Source context for element emission
     */
    @Override
    public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;

    /**
     * Cancel source execution
     */
    @Override
    public void cancel();
}
```

### Key Selector Function

Extracts keys from tuple types for grouping operations.

```java { .api }
/**
 * Generic key extractor for tuple types
 * @param <Type> Tuple type extending Tuple
 * @param <Key> Key type for grouping
 */
public static class FirstFieldKeyExtractor<Type extends Tuple, Key>
        implements KeySelector<Type, Key> {

    /**
     * Extract key from tuple first field
     * @param value Input tuple
     * @return Key for grouping (first field of tuple)
     */
    public Key getKey(Type value);
}
```

### Window Functions

#### SummingReducer

Pre-aggregating reduce function for efficient windowing.

```java { .api }
/**
 * Efficient reduce function for summing tuple values
 * Pre-aggregates values within window before output
 */
public static class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> {
    /**
     * Combine two tuples by summing their second field
     * @param value1 First tuple
     * @param value2 Second tuple
     * @return Combined tuple with summed second field
     */
    public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2);
}
```

#### SummingWindowFunction

Non-pre-aggregating window function for custom aggregation logic.

```java { .api }
/**
 * Window function that processes all elements at window trigger time
 * Demonstrates non-pre-aggregating pattern vs reduce function
 */
public static class SummingWindowFunction
        implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {

    /**
     * Process all window elements when window triggers
     * @param key Window key
     * @param window Window metadata
     * @param values All elements in window
     * @param out Output collector
     */
    public void apply(Long key, Window window,
                      Iterable<Tuple2<Long, Long>> values,
                      Collector<Tuple2<Long, Long>> out);
}
```
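
The practical difference between the two window functions is what a window has to hold until it fires. The following is a plain-Java sketch with no Flink dependency, and every class and method name in it is hypothetical. It contrasts a reduce-style running sum, which keeps one combined value, with an apply-style buffer, which keeps every element and sums them on firing.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration only; none of these names come from Flink. */
final class AggregationStyles {

    /** Reduce style: each element is folded into a single running value on arrival. */
    static final class RunningSum {
        private long sum = 0L;
        private int storedValues = 0;

        void add(long value) {
            sum += value;
            storedValues = 1; // only the combined value is retained
        }

        long fire() { return sum; }

        int storedValues() { return storedValues; }
    }

    /** Apply style: every element is buffered and summed only when the window fires. */
    static final class BufferedSum {
        private final List<Long> buffer = new ArrayList<>();

        void add(long value) { buffer.add(value); }

        long fire() {
            long sum = 0L;
            for (long v : buffer) {
                sum += v;
            }
            return sum;
        }

        int storedValues() { return buffer.size(); }
    }

    public static void main(String[] args) {
        RunningSum reduced = new RunningSum();
        BufferedSum buffered = new BufferedSum();
        for (long v = 1; v <= 1000; v++) {
            reduced.add(v);
            buffered.add(v);
        }
        // Both produce the same sum; they differ in how much they had to store.
        System.out.println(reduced.fire() + " with " + reduced.storedValues() + " stored value(s)");
        System.out.println(buffered.fire() + " with " + buffered.storedValues() + " stored value(s)");
    }
}
```

Both variants give the same result, so the choice is mostly about memory per window versus needing access to all elements at firing time.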

### TopSpeedWindowing (Scala)

Scala implementation of car speed windowing using functional API and case classes.

```scala { .api }
/**
 * Scala version of car top speed windowing
 * @param args Command line arguments
 */
object TopSpeedWindowing {
  def main(args: Array[String]): Unit;
}
```

## Key Window Patterns

### Global Windows with Custom Triggers

```java
// Global windows with time evictor and delta trigger
DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
    .assignTimestampsAndWatermarks(new CarTimestamp())
    .keyBy(0) // Group by car ID
    .window(GlobalWindows.create())
    .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
    .trigger(DeltaTrigger.of(triggerMeters,
        new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
            @Override
            public double getDelta(
                    Tuple4<Integer, Integer, Double, Long> oldDataPoint,
                    Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                return newDataPoint.f2 - oldDataPoint.f2; // Distance delta
            }
        }, carData.getType().createSerializer(env.getConfig())))
    .maxBy(1); // Get max speed
```
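
To make the delta trigger's behaviour concrete, here is a plain-Java sketch of the firing rule as I understand it: the first element becomes the reference point, and the trigger fires whenever a new element's distance exceeds the reference by more than the threshold, after which that element becomes the new reference. This is not Flink's implementation; `DeltaTriggerSketch` and `firingIndexes` are hypothetical names and the distance values are made up.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of a delta-style firing rule; names are hypothetical. */
final class DeltaTriggerSketch {

    /** Returns the indexes of the elements at which the rule would fire. */
    static List<Integer> firingIndexes(double[] distances, double thresholdMeters) {
        List<Integer> fired = new ArrayList<>();
        if (distances.length == 0) {
            return fired;
        }
        double reference = distances[0]; // the first element only sets the reference
        for (int i = 1; i < distances.length; i++) {
            if (distances[i] - reference > thresholdMeters) {
                fired.add(i);
                reference = distances[i]; // firing moves the reference forward
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        double[] distances = {0, 20, 45, 60, 90, 101, 150};
        System.out.println(firingIndexes(distances, 50.0)); // prints [3, 6]
    }
}
```

Note that the reference only moves when the rule fires, so small increments accumulate until they cross the threshold.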

### Session Windows

```java
// Session windows with configurable gap
dataStream
    .keyBy(keySelector)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .apply(sessionWindowFunction);
```
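
The grouping idea behind a fixed session gap can be shown without Flink. The sketch below assumes timestamps arrive in ascending order and starts a new session whenever the pause since the previous element is longer than the gap. `SessionSketch` and `sessions` are hypothetical names, and the boundary handling is a simplification rather than a statement about Flink's exact merge rule.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of gap-based session grouping; names are hypothetical. */
final class SessionSketch {

    /** Groups ascending timestamps (ms) into sessions separated by pauses longer than gapMs. */
    static List<List<Long>> sessions(long[] timestamps, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long previous = 0L;
        for (long ts : timestamps) {
            if (current == null || ts - previous > gapMs) {
                current = new ArrayList<>(); // pause too long: open a new session
                result.add(current);
            }
            current.add(ts);
            previous = ts;
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 1000, 2500, 10000, 10500, 30000};
        // prints [[0, 1000, 2500], [10000, 10500], [30000]]
        System.out.println(sessions(timestamps, 3000));
    }
}
```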

### Sliding Time Windows

```java
// Sliding windows with size and slide interval (processing time)
dataStream
    .keyBy(keySelector)
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS)) // 2.5s window, 0.5s slide
    .reduce(new SummingReducer());

// Alternative with apply function (non-pre-aggregating)
dataStream
    .keyBy(keySelector)
    .timeWindow(Time.milliseconds(2500), Time.milliseconds(500))
    .apply(new SummingWindowFunction());
```
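
With a 2.5 s window and a 0.5 s slide, each element belongs to five overlapping windows. The plain-Java sketch below computes which window start times cover a given timestamp, assuming non-negative timestamps and windows aligned to multiples of the slide. `SlidingWindowSketch` and `windowStarts` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of sliding-window assignment; names are hypothetical. */
final class SlidingWindowSketch {

    /** Returns the start times of every window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest aligned start at or before timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // prints [3000, 2500, 2000, 1500, 1000]
        System.out.println(windowStarts(3200, 2500, 500));
    }
}
```

This multiplicity is why pre-aggregation matters more for sliding windows: every element contributes to `size / slide` windows.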

### Tumbling Time Windows

```java
// Fixed-size tumbling windows
dataStream
    .keyBy(keySelector)
    .timeWindow(Time.seconds(30))
    .reduce(aggregationFunction);
```
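
Tumbling windows partition time into fixed, non-overlapping buckets, so each timestamp maps to exactly one window. As a rough plain-Java sketch (assuming non-negative timestamps and windows aligned to multiples of the size; `TumblingWindowSketch` and its methods are hypothetical names), the bucket start is the timestamp rounded down to a multiple of the window size.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of tumbling-window bucketing; names are hypothetical. */
final class TumblingWindowSketch {

    /** Start of the window [start, start + size) that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Counts elements per window, keyed by window start. */
    static Map<Long, Integer> countsPerWindow(long[] timestamps, long size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, size), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000, 29999, 30000, 61000, 89999};
        // prints {0=2, 30000=1, 60000=2}
        System.out.println(countsPerWindow(timestamps, 30_000));
    }
}
```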

### Event Time Processing

```java
// Enable event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Assign timestamps and watermarks. The call returns a new stream,
// so build downstream operators on the returned stream.
DataStream<T> withTimestamps = dataStream.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<T>() {
        @Override
        public long extractAscendingTimestamp(T element) {
            return element.getTimestamp(); // event timestamp in milliseconds
        }
    });
```
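
With ascending timestamps, the watermark can simply trail the highest timestamp seen so far. The plain-Java sketch below illustrates that idea and how a watermark decides when an event-time window may fire. It reflects my understanding of the general mechanism rather than Flink's source; `AscendingWatermarkSketch` and its methods are hypothetical names.

```java
/** Plain-Java illustration of watermark tracking for ascending timestamps; names are hypothetical. */
final class AscendingWatermarkSketch {

    private long maxTimestamp = Long.MIN_VALUE;

    /** Records an element's event timestamp (ms). */
    void onElement(long timestamp) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
        }
    }

    /** Watermark trails the highest timestamp by 1 ms; MIN_VALUE until an element is seen. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }

    /** A window [start, end) may fire once the watermark reaches its last timestamp, end - 1. */
    boolean canFire(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch tracker = new AscendingWatermarkSketch();
        tracker.onElement(1000);
        tracker.onElement(4000);
        System.out.println(tracker.canFire(5000)); // prints false (watermark is 3999)
        tracker.onElement(5000);
        System.out.println(tracker.canFire(5000)); // prints true (watermark is 4999)
    }
}
```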

## Window Configuration Options

### Time Characteristics
- **Processing Time**: System time when elements are processed
- **Event Time**: Time when events actually occurred (embedded in data)
- **Ingestion Time**: Time when events enter Flink system

### Window Types
- **Tumbling Windows**: Fixed-size, non-overlapping windows
- **Sliding Windows**: Fixed-size, overlapping windows
- **Session Windows**: Variable-size windows based on activity gaps
- **Global Windows**: All elements in single window, custom triggers required

### Triggers
- **Time Triggers**: Fire based on processing/event time
- **Count Triggers**: Fire after specific number of elements
- **Delta Triggers**: Fire based on value changes (like distance traveled)
- **Custom Triggers**: User-defined triggering logic

### Evictors
- **Time Evictor**: Remove elements older than specified time
- **Count Evictor**: Keep only most recent N elements
- **Delta Evictor**: Remove elements based on value differences
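
The time and count evictors listed above can be illustrated on a plain list of timestamps. This is a sketch under simplifying assumptions (elements are bare timestamps in arrival order, and "older than" is measured from the newest timestamp in the window); `EvictorSketch` and its methods are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of time- and count-based eviction; names are hypothetical. */
final class EvictorSketch {

    /** Keeps only timestamps newer than (newest timestamp - keepMs). */
    static List<Long> evictByTime(List<Long> timestamps, long keepMs) {
        long newest = Long.MIN_VALUE;
        for (long ts : timestamps) {
            newest = Math.max(newest, ts);
        }
        long cutoff = newest - keepMs;
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts > cutoff) {
                kept.add(ts);
            }
        }
        return kept;
    }

    /** Keeps only the most recent maxCount elements (by arrival order). */
    static <T> List<T> evictByCount(List<T> elements, int maxCount) {
        int from = Math.max(0, elements.size() - maxCount);
        return new ArrayList<>(elements.subList(from, elements.size()));
    }

    public static void main(String[] args) {
        List<Long> timestamps = new ArrayList<>();
        timestamps.add(1000L);
        timestamps.add(4000L);
        timestamps.add(9000L);
        timestamps.add(12000L);
        System.out.println(evictByTime(timestamps, 5000)); // prints [9000, 12000]
        System.out.println(evictByCount(timestamps, 3));   // prints [4000, 9000, 12000]
    }
}
```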

## Car Data Processing Example

### Data Format
```java
// Car telemetry tuple: (carId, speed, distance, timestamp)
Tuple4<Integer, Integer, Double, Long> carData;
// f0: Car ID
// f1: Current speed (km/h)
// f2: Total distance traveled (meters)
// f3: Timestamp (milliseconds)
```

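### Sample Data Generation
```java
// Requires java.util.Arrays and java.util.Random.
private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
    private final Integer[] speeds;    // current speed per car (km/h)
    private final Double[] distances;  // distance traveled per car (m)
    private final Random rand = new Random();
    private volatile boolean isRunning = true;

    private CarSource(int numOfCars) {
        speeds = new Integer[numOfCars];
        distances = new Double[numOfCars];
        Arrays.fill(speeds, 50);     // starting speed; value chosen for illustration
        Arrays.fill(distances, 0d);
    }

    @Override
    public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(100); // 100ms intervals
            for (int carId = 0; carId < speeds.length; carId++) {
                // Randomly adjust speed, clamped to [0, 100] km/h
                if (rand.nextBoolean()) {
                    speeds[carId] = Math.min(100, speeds[carId] + 5);
                } else {
                    speeds[carId] = Math.max(0, speeds[carId] - 5);
                }
                // Update distance traveled
                distances[carId] += speeds[carId] / 3.6d; // Convert km/h to m/s

                Tuple4<Integer, Integer, Double, Long> record = new Tuple4<>(
                    carId, speeds[carId], distances[carId], System.currentTimeMillis());
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false; // lets the loop in run() exit
    }
}
```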

## Session Window Configuration

### Session Gap Configuration
```java
// Configure session gap based on user activity
ProcessingTimeSessionWindows.withGap(Time.minutes(30)) // 30-minute inactivity gap
EventTimeSessionWindows.withGap(Time.seconds(60)) // 1-minute gap for event time
```

### Dynamic Session Gaps
```java
// Variable session gaps based on data.
// Note: withDynamicGap and SessionWindowTimeGapExtractor were added in a Flink
// release later than the 1.3.3 version this page targets.
EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<T>() {
    @Override
    public long extract(T element) {
        // Return gap in milliseconds based on element properties
        return element.getUserType().equals("PREMIUM") ? 600000L : 300000L;
    }
});
```
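
When the gap depends on the element, one way to picture the resulting sessions is that each element proposes its own interval from its timestamp to timestamp plus gap, and overlapping intervals are merged. The plain-Java sketch below follows that picture for events sorted by timestamp. It is an illustration under those assumptions, not Flink's merging code; `DynamicGapSketch` and `mergeSessions` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of per-element session gaps; names are hypothetical. */
final class DynamicGapSketch {

    /**
     * Merges per-element intervals into sessions.
     * Each event is {timestampMs, gapMs}; events must be sorted by timestamp.
     * Each returned session is {startMs, endMs}.
     */
    static List<long[]> mergeSessions(long[][] events) {
        List<long[]> sessions = new ArrayList<>();
        for (long[] event : events) {
            long start = event[0];
            long end = event[0] + event[1];
            if (!sessions.isEmpty()) {
                long[] last = sessions.get(sessions.size() - 1);
                if (start <= last[1]) {
                    last[1] = Math.max(last[1], end); // overlaps: extend the open session
                    continue;
                }
            }
            sessions.add(new long[] {start, end}); // no overlap: open a new session
        }
        return sessions;
    }

    public static void main(String[] args) {
        long[][] events = {
            {0, 300_000},        // regular user, 5-minute gap
            {200_000, 600_000},  // premium user, 10-minute gap
            {900_000, 300_000},
            {1_300_000, 300_000}
        };
        for (long[] session : mergeSessions(events)) {
            System.out.println("session [" + session[0] + ", " + session[1] + ")");
        }
    }
}
```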

## Dependencies

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

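## Required Imports

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
```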