
# Data Streams

DStream (Discretized Stream) is the core abstraction in Spark Streaming, representing a continuous stream of data. Internally, a DStream is represented as a sequence of RDDs (Resilient Distributed Datasets), where each RDD contains the data for a specific time interval.

## Capabilities

### Basic Transformations

Core transformation operations that modify each element or the structure of the DStream.

```scala { .api }
/**
 * Transform each element using a function
 * @param mapFunc - Function to apply to each element
 * @returns New DStream with transformed elements
 */
def map[U: ClassTag](mapFunc: T => U): DStream[U]

/**
 * Transform each element to zero or more elements
 * @param flatMapFunc - Function returning a collection for each element
 * @returns New DStream with flattened results
 */
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]

/**
 * Filter elements based on a predicate
 * @param filterFunc - Function returning true for elements to keep
 * @returns New DStream with filtered elements
 */
def filter(filterFunc: T => Boolean): DStream[T]

/**
 * Coalesce the elements within each partition of each RDD into a single array
 * @returns DStream where each partition becomes one array of elements
 */
def glom(): DStream[Array[T]]

/**
 * Transform each partition using a function
 * @param mapPartFunc - Function to transform an iterator of elements
 * @param preservePartitioning - Whether to preserve the partitioner
 * @returns New DStream with transformed partitions
 */
def mapPartitions[U: ClassTag](
  mapPartFunc: Iterator[T] => Iterator[U],
  preservePartitioning: Boolean = false
): DStream[U]
```

**Usage Examples:**

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// Map transformation
val lengths = lines.map(_.length)

// FlatMap transformation
val words = lines.flatMap(_.split(" "))

// Filter transformation
val longLines = lines.filter(_.length > 10)

// Combined transformations
val wordCounts = lines
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
```

### Advanced Transformations

More complex transformation operations, including custom RDD-level transformations.

```scala { .api }
/**
 * Transform each RDD using a custom function
 * @param transformFunc - Function to transform each RDD
 * @returns New DStream with transformed RDDs
 */
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]

/**
 * Transform each RDD with access to time information
 * @param transformFunc - Function receiving RDD and time
 * @returns New DStream with transformed RDDs
 */
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

/**
 * Transform two DStreams together
 * @param other - Another DStream to combine with
 * @param transformFunc - Function to transform both RDDs together
 * @returns New DStream with combined transformation
 */
def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V]

/**
 * Transform two DStreams with time information
 * @param other - Another DStream to combine with
 * @param transformFunc - Function receiving both RDDs and time
 * @returns New DStream with combined transformation
 */
def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V]

/**
 * Repartition the DStream to the specified number of partitions
 * @param numPartitions - Number of partitions for the output
 * @returns Repartitioned DStream
 */
def repartition(numPartitions: Int): DStream[T]

/**
 * Union this DStream with another DStream of the same type
 * @param that - DStream to union with
 * @returns Combined DStream containing data from both streams
 */
def union(that: DStream[T]): DStream[T]
```
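The examples later on this page cover `transform` and `transformWith`, but not `repartition` or `union`. A minimal sketch (the host names, partition count, and the `tag` helper are illustrative, not part of the DStream API; assumes a running StreamingContext `ssc`):

```scala
// Tag records by source so they stay distinguishable after union
// (the helper is purely illustrative).
val tag: (String, String) => String = (src, line) => s"[$src] $line"

val accessLogs = ssc.socketTextStream("host1", 9999)
val errorLogs  = ssc.socketTextStream("host2", 9999)

val merged = accessLogs.map(tag("access", _))
  .union(errorLogs.map(tag("error", _)))
  .repartition(8)   // rebalance the combined stream across 8 partitions
```

Note that both input streams must share the same batch interval for `union` to be valid.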

### Persistence and Caching

Control how DStream data is stored and cached across operations.

```scala { .api }
/**
 * Persist DStream RDDs with the specified storage level
 * @param level - Storage level (MEMORY_ONLY, MEMORY_AND_DISK, etc.)
 * @returns This DStream for method chaining
 */
def persist(level: StorageLevel): DStream[T]

/**
 * Cache DStream RDDs in memory
 * @returns This DStream for method chaining
 */
def cache(): DStream[T]

/**
 * Enable checkpointing for this DStream
 * @param interval - Interval between checkpoints
 * @returns This DStream for method chaining
 */
def checkpoint(interval: Duration): DStream[T]
```
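A short persistence sketch (the storage level and intervals are illustrative; assumes `ssc` with a 1-second batch interval; `validCheckpointInterval` is a hypothetical helper, not part of Spark):

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds

// A checkpoint interval should be a positive multiple of the batch
// interval; this helper (illustrative) expresses that rule in ms.
def validCheckpointInterval(batchMs: Long, checkpointMs: Long): Boolean =
  checkpointMs > 0 && checkpointMs % batchMs == 0

val lines = ssc.socketTextStream("localhost", 9999)

// Reuse the windowed data across several downstream computations
// without recomputing it; spill to disk when memory runs short.
val recent = lines.window(Seconds(30)).persist(StorageLevel.MEMORY_AND_DISK)

// Truncate the growing RDD lineage periodically.
recent.checkpoint(Seconds(10))   // validCheckpointInterval(1000, 10000) holds
```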

### Aggregation Operations

Operations that aggregate data within each batch.

```scala { .api }
/**
 * Reduce the elements in each RDD using a function
 * @param reduceFunc - Associative and commutative reduce function
 * @returns DStream with one element per RDD (the reduced result)
 */
def reduce(reduceFunc: (T, T) => T): DStream[T]

/**
 * Count the elements in each RDD
 * @returns DStream of Long values representing counts
 */
def count(): DStream[Long]

/**
 * Count occurrences of each unique element
 * @param numPartitions - Number of partitions for the result
 * @returns DStream of (element, count) pairs
 */
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)]
```
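A per-batch aggregation sketch reusing the socket source from the earlier examples; the reduce function is written as a named val so its associative-and-commutative requirement is easy to see:

```scala
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))

// reduce requires an associative and commutative function; integer
// addition satisfies both.
val add: (Int, Int) => Int = _ + _

val charsPerBatch = lines.map(_.length).reduce(add)  // one value per batch
val wordsPerBatch = words.count()                    // DStream[Long]
val wordFreqs     = words.countByValue()             // DStream[(String, Long)]
```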

### Window Operations

Time-based windowing operations for analyzing data across multiple batches.

```scala { .api }
/**
 * Create a windowed DStream
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional, defaults to the batch interval)
 * @returns DStream containing data from the specified window
 */
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

/**
 * Count elements over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream of window counts
 */
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

/**
 * Count occurrences of each element over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for the result (optional)
 * @returns DStream of (element, count) pairs over the window
 */
def countByValueAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]

/**
 * Reduce elements over a sliding window
 * @param reduceFunc - Associative and commutative reduce function
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream with reduced results over windows
 */
def reduceByWindow(
  reduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]

/**
 * Reduce elements over a sliding window with an inverse function for efficiency
 * @param reduceFunc - Associative reduce function
 * @param invReduceFunc - Inverse of the reduce function
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream with reduced results over windows
 */
def reduceByWindow(
  reduceFunc: (T, T) => T,
  invReduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]
```
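A sketch contrasting the two `reduceByWindow` variants (the source stream `requests` and the durations are illustrative; the inverse-function variant additionally requires a checkpoint directory to be set via `ssc.checkpoint(...)`):

```scala
import org.apache.spark.streaming.Seconds

val add: (Long, Long) => Long = _ + _
val sub: (Long, Long) => Long = _ - _

// Requests-per-window over a 30s window sliding every 10s.
val counts = requests.map(_ => 1L)

// Recomputes the full window on every slide:
val windowTotal = counts.reduceByWindow(add, Seconds(30), Seconds(10))

// Incremental: adds batches entering the window and subtracts batches
// leaving it -- cheaper for wide windows with short slides.
val windowTotalFast = counts.reduceByWindow(add, sub, Seconds(30), Seconds(10))
```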

### Output Operations

Actions that send data to external systems or trigger computation.

```scala { .api }
/**
 * Apply a function to each RDD in the DStream
 * @param foreachFunc - Function to apply to each RDD
 */
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit

/**
 * Apply a function to each RDD with time information
 * @param foreachFunc - Function receiving RDD and time
 */
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

/**
 * Print the first num elements of each RDD to the console
 * @param num - Number of elements to print (default 10)
 */
def print(num: Int = 10): Unit

/**
 * Save the DStream as text files
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names (optional)
 */
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit

/**
 * Save the DStream as object files (serialized)
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names (optional)
 */
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
```

**Usage Examples:**

```scala
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))

// Window operations
val windowedWords = words.window(Seconds(30), Seconds(10))
val windowedCounts = words.countByWindow(Seconds(30), Seconds(10))

// Reduce over windows
val windowedWordCount = words
  .map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

// Output operations
words.foreachRDD { rdd =>
  // collect() brings the whole batch to the driver -- fine for small demos
  rdd.collect().foreach(println)
}

windowedWordCount.print()
windowedWordCount.saveAsTextFiles("output/wordcount")
```

### Utility Operations

Helper methods for accessing DStream data and metadata.

```scala { .api }
/**
 * Get the RDDs for a specific time range
 * @param fromTime - Start time (inclusive)
 * @param toTime - End time (inclusive)
 * @returns Sequence of RDDs in the time range
 */
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]

/**
 * Get the RDDs for a specific time interval
 * @param interval - Time interval to retrieve
 * @returns Sequence of RDDs in the interval
 */
def slice(interval: Interval): Seq[RDD[T]]

/**
 * Compute the RDD for a specific time
 * @param validTime - Time for which to compute the RDD
 * @returns Some(RDD) containing the data for the specified time, or None if no RDD is generated
 */
def compute(validTime: Time): Option[RDD[T]]
```
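`slice` can only return RDDs that the stream still remembers (see `rememberDuration` below). A sketch over the last 30 seconds, with a hypothetical helper showing how the remembered span bounds a requested range:

```scala
import org.apache.spark.streaming.Time

// Hypothetical helper: the earliest usable 'from' time is bounded by
// how far back the stream still remembers its RDDs.
def rememberedFrom(nowMs: Long, requestedFromMs: Long, rememberMs: Long): Long =
  math.max(requestedFromMs, nowMs - rememberMs)

val now  = System.currentTimeMillis()
val from = Time(rememberedFrom(now, now - 30000, 60000))
val to   = Time(now)

// Sum the element counts of all batches in the range.
val recentTotal = lines.slice(from, to).map(_.count()).sum
```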

## Key Properties

```scala { .api }
// DStream properties
def slideDuration: Duration               // Sliding interval of the DStream
def dependencies: List[DStream[_]]        // Parent DStreams this DStream depends on
def generatedRDDs: HashMap[Time, RDD[T]]  // Cache of generated RDDs
def rememberDuration: Duration            // How long to remember RDDs
def storageLevel: StorageLevel            // Storage level for persistence
def mustCheckpoint: Boolean               // Whether this DStream requires checkpointing
def checkpointDuration: Duration          // Checkpoint interval
```

**Advanced Usage:**

```scala
// Custom transformation with RDD operations
val processed = lines.transform { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.toDF("line")
  df.filter($"line".contains("ERROR"))
    .select($"line")
    .rdd
    .map(_.getString(0))
}

// Complex windowed operations
val complexWindow = words
  .map((_, 1))
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b,  // Add new values entering the window
    (a: Int, b: Int) => a - b,  // Subtract old values leaving the window
    Seconds(30),                // Window duration
    Seconds(10)                 // Slide duration
  )
  .filter(_._2 > 5)             // Filter low counts

// Combining multiple streams with transformWith
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)

val combined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[String]) => {
  rdd1.intersection(rdd2)  // Find common elements
})
```