
# Window Operations

Window operations in Spark Streaming allow you to apply transformations over a sliding window of data. These operations are essential for time-based aggregations, trend analysis, and processing data across multiple batches.

## Capabilities

### Basic Window Operations

Core windowing functionality for creating time-based data windows.

```scala { .api }
/**
 * Create windowed DStream with default slide duration
 * @param windowDuration - Width of the window (must be multiple of batch duration)
 * @returns DStream containing data from the specified window
 */
def window(windowDuration: Duration): DStream[T]

/**
 * Create windowed DStream with custom slide duration
 * @param windowDuration - Width of the window (must be multiple of batch duration)
 * @param slideDuration - Sliding interval (must be multiple of batch duration)
 * @returns DStream containing data from the specified sliding window
 */
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
```

**Usage Examples:**

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// 30-second window, sliding once per batch (assumes a 1-second batch interval)
val windowedLines = lines.window(Seconds(30))

// 30-second window, sliding every 10 seconds
val slidingWindow = lines.window(Seconds(30), Seconds(10))
```
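
To make the window and slide semantics concrete, the following is a minimal plain-Scala model with no Spark dependency. It is only a sketch: `WindowModel` and `slidingWindows` are hypothetical names, and durations are counted in batches rather than in time units.

```scala
object WindowModel {
  // Each inner Seq is one batch of records, oldest first.
  // windowLen and slide are expressed in batches, not in time units.
  def slidingWindows[A](batches: Seq[Seq[A]], windowLen: Int, slide: Int): Seq[Seq[A]] = {
    require(windowLen > 0 && slide > 0, "window and slide must be positive")
    // A window is emitted after every `slide` batches and covers the most
    // recent `windowLen` batches (fewer while the stream is warming up).
    (slide to batches.length by slide).map { end =>
      batches.slice(math.max(0, end - windowLen), end).flatten
    }
  }
}
```

With four single-record batches, a window of 3 batches and a slide of 2 batches, the model emits two windows, and the record from the second batch appears in both because the windows overlap.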

### Counting Operations

Window operations that count elements over time windows.

```scala { .api }
/**
 * Count elements over a sliding window (requires checkpointing to be enabled)
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream of Long values representing element counts
 */
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

/**
 * Count occurrences of each unique element over a sliding window
 * (requires checkpointing to be enabled)
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result (optional)
 * @returns DStream of (element, count) pairs over the window
 */
def countByValueAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]
```
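
As a rough illustration of what the two counting operations report for a single window, here is a plain-Scala sketch over in-memory collections. `WindowCounts` and its methods are hypothetical names and do not use the Spark API; the input is assumed to be the batches that fall inside one window.

```scala
object WindowCounts {
  // Total number of elements across all batches of one window,
  // i.e. the quantity countByWindow reports per window.
  def count[A](windowBatches: Seq[Seq[A]]): Long =
    windowBatches.map(_.size.toLong).sum

  // Occurrences of each distinct element across the window,
  // i.e. the (element, count) pairs countByValueAndWindow reports.
  def countByValue[A](windowBatches: Seq[Seq[A]]): Map[A, Long] =
    windowBatches.flatten.groupBy(identity).map { case (value, hits) => value -> hits.size.toLong }
}
```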

### Reduction Operations

Window operations that reduce/aggregate data over time windows.

```scala { .api }
/**
 * Reduce elements over a sliding window using associative function
 * @param reduceFunc - Associative and commutative reduce function
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream with reduced results over windows
 */
def reduceByWindow(
  reduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]

/**
 * Efficient reduce over window with inverse function (requires checkpointing to be enabled)
 * @param reduceFunc - Associative and commutative reduce function
 * @param invReduceFunc - Inverse of the reduce function for removing old values,
 *                        such that invReduceFunc(reduceFunc(x, y), x) = y
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream with reduced results using incremental computation
 */
def reduceByWindow(
  reduceFunc: (T, T) => T,
  invReduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]
```

**Usage Examples:**

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Count numbers in 1-minute windows every 10 seconds
// (countByWindow uses an inverse reduce internally, so a checkpoint directory must be set)
val windowCounts = numbers.countByWindow(Minutes(1), Seconds(10))

// Sum numbers over windows
val windowSums = numbers.reduceByWindow(_ + _, Minutes(1), Seconds(10))

// Efficient windowed sum with inverse function (also requires a checkpoint directory)
val efficientSums = numbers.reduceByWindow(
  _ + _,       // Add new values
  _ - _,       // Subtract old values
  Minutes(1),  // 1-minute window
  Seconds(10)  // Slide every 10 seconds
)
```
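
The idea behind the inverse-function variant can be sketched in plain Scala: instead of re-adding every batch in the window, the previous result is adjusted by the batches that entered and left. This is only a model under stated assumptions (per-batch sums held in a `Seq`, durations counted in batches, slide no larger than the window); `IncrementalSum` is a hypothetical name, not part of Spark.

```scala
object IncrementalSum {
  // Recompute from scratch: add up every batch total inside the window ending at `end`.
  def fullSum(batchSums: Seq[Int], end: Int, windowLen: Int): Int =
    batchSums.slice(math.max(0, end - windowLen), end).sum

  // Incremental update: previous window result, plus batches that entered,
  // minus batches that left. Assumes slide <= windowLen so the windows overlap.
  def incrementalSum(batchSums: Seq[Int], prev: Int, end: Int, windowLen: Int, slide: Int): Int = {
    require(slide > 0 && slide <= windowLen, "slide must be in 1..windowLen")
    val entered = batchSums.slice(math.max(0, end - slide), end).sum
    val left = batchSums.slice(math.max(0, end - slide - windowLen), math.max(0, end - windowLen)).sum
    prev + entered - left
  }
}
```

Both functions return the same value for every window; the incremental one touches only `2 * slide` batches per step instead of `windowLen`, which is why the inverse form pays off for long windows with short slides.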

### Key-Value Window Operations

Specialized window operations for key-value pair DStreams.

```scala { .api }
/**
 * Group values by key over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional)
 * @returns DStream of (key, iterable of values) over windows
 */
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]

/**
 * Group by key with custom partitioning
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result
 * @returns DStream of (key, iterable of values) over windows
 */
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int
): DStream[(K, Iterable[V])]

/**
 * Reduce values by key over sliding window
 * @param reduceFunc - Associative function to combine values
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional)
 * @returns DStream of (key, reduced value) over windows
 */
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(
  reduceFunc: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]

/**
 * Efficient reduce by key with inverse function for incremental computation
 * @param reduceFunc - Associative function to combine values
 * @param invReduceFunc - Inverse function to remove old values
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result (optional)
 * @param filterFunc - Function to filter results (optional)
 * @returns DStream of (key, reduced value) over windows
 */
def reduceByKeyAndWindow(
  reduceFunc: (V, V) => V,
  invReduceFunc: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int = ssc.sc.defaultParallelism,
  filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)]
```

**Usage Examples:**

```scala
val wordPairs = lines.flatMap(_.split(" ")).map((_, 1))

// Word count over 30-second windows, sliding every 10 seconds
// (explicit parameter types help the compiler choose among the overloads)
val windowedWordCounts = wordPairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

// Efficient windowed word count with inverse (requires a checkpoint directory)
val efficientWordCounts = wordPairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,        // Add new counts
  (a: Int, b: Int) => a - b,        // Subtract old counts
  Seconds(30),                      // 30-second window
  Seconds(10),                      // Slide every 10 seconds
  2,                                // Use 2 partitions
  (kv: (String, Int)) => kv._2 > 0  // Filter out zero counts
)

// Group words by length over windows
val wordsByLength = lines
  .flatMap(_.split(" "))
  .map(word => (word.length, word))
  .groupByKeyAndWindow(Seconds(60), Seconds(20))
```

## Window Operation Characteristics

### Memory Requirements

Window operations require more memory than per-batch operations because they retain data across multiple batches:

- **Window Size Impact**: Larger windows require more memory to store historical data
- **Slide Duration Impact**: Smaller slide durations create more overlapping windows, so each batch is processed more often
- **Checkpointing**: Window operations that use an inverse reduce function (including `countByWindow` and `countByValueAndWindow`) require a checkpoint directory
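
The first two points can be turned into back-of-the-envelope arithmetic. The helper below is a hedged sketch for estimation only: `WindowSizing` and its methods are hypothetical names, all durations are in milliseconds, and actual memory use also depends on record size, storage level, and caching.

```scala
object WindowSizing {
  // Number of batches that must be retained to materialise one window.
  def batchesPerWindow(windowMs: Long, batchMs: Long): Long = {
    require(batchMs > 0 && windowMs % batchMs == 0, "window must be a multiple of the batch interval")
    windowMs / batchMs
  }

  // Average number of windows that each batch contributes to.
  def overlapFactor(windowMs: Long, slideMs: Long): Double = {
    require(slideMs > 0, "slide must be positive")
    windowMs.toDouble / slideMs
  }
}
```

For a 30-second window over 1-second batches, 30 batches are retained per window; sliding every 10 seconds means each batch is read by about three consecutive windows.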

### Performance Considerations

```scala { .api }
// Performance characteristics of different window operations

// Memory efficient - only stores aggregated results (requires a checkpoint directory)
val efficientCount = numbers.reduceByWindow(_ + _, _ - _, Minutes(5), Seconds(30))

// Memory intensive - stores all windowed data
val memoryIntensive = numbers.window(Minutes(5), Seconds(30)).reduce(_ + _)

// Tuned partitioning for windowed operations
// (explicit parameter types help the compiler choose among the overloads)
val optimizedWindow = wordPairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a - b,
  windowDuration = Minutes(5),
  slideDuration = Seconds(30),
  numPartitions = ssc.sparkContext.defaultParallelism * 2 // Increase parallelism
)
```

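### Checkpointing Requirements

Window operations that use an inverse reduce function (`reduceByWindow` and `reduceByKeyAndWindow` with `invReduceFunc`, plus `countByWindow` and `countByValueAndWindow`, which are built on them) require a checkpoint directory. Plain `window` and the non-inverse reductions do not:

```scala
// Inverse-function window operations require checkpointing
ssc.checkpoint("hdfs://checkpoint-dir")

val windowedData = stream.reduceByWindow(_ + _, _ - _, Minutes(10), Minutes(1))
// This will fail when the context starts if no checkpoint directory is set
```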

### Window Duration Constraints

Window and slide durations must be multiples of the batch duration:

```scala
val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batches

// Valid - multiples of batch duration
val validWindow = stream.window(Seconds(30), Seconds(10)) // 6x and 2x batch duration

// Invalid - not multiples of batch duration
// val invalidWindow = stream.window(Seconds(7), Seconds(3)) // Would throw exception
```
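
If durations come from configuration, it can help to check this constraint before building the stream graph. The following is a minimal sketch in plain Scala; `DurationCheck.validate` is a hypothetical helper (not a Spark API) that works on millisecond values.

```scala
object DurationCheck {
  // Returns Right(()) when both durations are multiples of the batch interval,
  // otherwise Left with a message naming the offending duration.
  def validate(windowMs: Long, slideMs: Long, batchMs: Long): Either[String, Unit] = {
    require(batchMs > 0, "batch interval must be positive")
    if (windowMs % batchMs != 0)
      Left(s"window duration $windowMs ms is not a multiple of batch duration $batchMs ms")
    else if (slideMs % batchMs != 0)
      Left(s"slide duration $slideMs ms is not a multiple of batch duration $batchMs ms")
    else
      Right(())
  }
}
```

With 5-second batches, `validate(30000L, 10000L, 5000L)` succeeds, while `validate(7000L, 3000L, 5000L)` reports the window duration as invalid.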

## Advanced Window Patterns

### Custom Window Aggregations

Complex aggregations using transform with window operations:

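```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{avg, count, stddev}

// Assumes `stream` is a DStream[(String, Double)] of (key, value) pairs
val customAggregation = stream.window(Minutes(5), Minutes(1)).transform { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  rdd.toDF("key", "value")
    .groupBy("key")
    .agg(
      count("*").as("count"),
      avg("value").as("average"),
      stddev("value").as("stddev")
    )
    .rdd
    .map { row =>
      // stddev is undefined for single-row groups and may come back as null
      val sd = if (row.isNullAt(3)) Double.NaN else row.getDouble(3)
      (row.getString(0), (row.getLong(1), row.getDouble(2), sd))
    }
}
```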

### Multi-Level Windows

Combining different window sizes for hierarchical analysis:

```scala
import org.apache.spark.rdd.RDD

val stream = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Short-term trends (1 minute)
val shortTerm = stream.reduceByWindow(_ + _, Seconds(60), Seconds(10))

// Long-term trends (10 minutes); uses the same slide interval as shortTerm
// because transformWith requires both streams to have the same slide duration
val longTerm = stream.reduceByWindow(_ + _, Minutes(10), Seconds(10))

// Combine for trend analysis: 1-minute sum vs. per-minute average of the 10-minute sum
val trendAnalysis = shortTerm.transformWith(longTerm, (short: RDD[Int], long: RDD[Int]) => {
  val shortAvg = if (short.isEmpty()) 0.0 else short.collect().head.toDouble
  val longAvg = if (long.isEmpty()) 0.0 else long.collect().head.toDouble / 10.0

  // Use the RDD's own SparkContext instead of capturing ssc in the closure
  short.sparkContext.parallelize(Seq((shortAvg, longAvg, shortAvg - longAvg)))
})
```

### Sliding Window Joins

Joining data from different time windows:

```scala
val stream1 = ssc.socketTextStream("host1", 9999).map(line => (line.split(",")(0), line))
val stream2 = ssc.socketTextStream("host2", 9999).map(line => (line.split(",")(0), line))

// Join data within 30-second windows
val windowedJoin = stream1
  .window(Seconds(30), Seconds(10))
  .join(stream2.window(Seconds(30), Seconds(10)))

// Join with different window sizes
val asymmetricJoin = stream1
  .window(Seconds(60))               // 1-minute window for stream1
  .join(stream2.window(Seconds(30))) // 30-second window for stream2
```

## Types

```scala { .api }
// Window operation related types
case class Duration(milliseconds: Long) {
  def +(other: Duration): Duration
  def *(factor: Int): Duration
  def /(other: Duration): Double // Ratio of two durations
}

case class Time(milliseconds: Long) {
  def floor(duration: Duration): Time
  def until(endTime: Time, interval: Duration): Seq[Time]
}

// Helper objects for common durations
object Milliseconds {
  def apply(milliseconds: Long): Duration = Duration(milliseconds)
}

object Seconds {
  def apply(seconds: Long): Duration = Duration(seconds * 1000)
}

object Minutes {
  def apply(minutes: Long): Duration = Duration(minutes * 60 * 1000)
}
```