# Core Streaming Operations

Core Spark Streaming functionality: the essential components for building a streaming application, including creating a `StreamingContext`, transforming DStreams, and running output operations.

## Capabilities

### StreamingContext

The main entry point for Spark Streaming functionality. It creates and manages streaming computations; only one `StreamingContext` can be active in a JVM at a time.

```scala { .api }
/**
 * Main entry point for Spark Streaming functionality.
 *
 * Input DStreams, transformations and output operations are defined on the
 * context first; `start()` then begins processing one batch per `batchDuration`.
 */
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  /**
   * Start the execution of the streams.
   * Fails if no output operation has been registered, or if the context was already stopped.
   */
  def start(): Unit

  /** Stop the streaming computation (uses spark.streaming.stopSparkContextByDefault config) */
  def stop(): Unit

  /** Stop the streaming computation, optionally also stopping the underlying SparkContext */
  def stop(stopSparkContext: Boolean): Unit

  /**
   * Stop the streaming computation.
   * @param stopSparkContext whether to also stop the underlying SparkContext
   * @param stopGracefully   if true, wait until all received data has been processed before stopping
   */
  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit

  /** Block until the context stops; an error reported during execution is rethrown here */
  def awaitTermination(): Unit

  /**
   * Block until the context stops or the timeout elapses.
   * @param timeout maximum time to wait, in milliseconds
   * @return true if the context stopped, false if the timeout elapsed first
   */
  def awaitTerminationOrTimeout(timeout: Long): Boolean

  /** Current state of the context: INITIALIZED, ACTIVE or STOPPED */
  def getState(): StreamingContextState

  /**
   * Set the directory used for periodic checkpoints (driver fault tolerance).
   * It should be on a fault-tolerant file system such as HDFS.
   */
  def checkpoint(directory: String): Unit

  /** Make every DStream of this context remember the RDDs it generated within the last `duration` */
  def remember(duration: Duration): Unit

  /** The underlying SparkContext */
  def sparkContext: SparkContext

  /** Add a listener that receives streaming events (batches, receivers, output operations) */
  def addStreamingListener(streamingListener: StreamingListener): Unit

  /** Remove a previously added streaming listener */
  def removeStreamingListener(streamingListener: StreamingListener): Unit
}

```

**Alternative Constructors:**

```scala { .api }
// Auxiliary constructors of StreamingContext (shown without the enclosing class).

/** Create a context, and a new underlying SparkContext, from a SparkConf */
def this(conf: SparkConf, batchDuration: Duration)

/** Create a context, and a new underlying SparkContext, from a master URL and application name */
def this(master: String, appName: String, batchDuration: Duration)

/** As above; `sparkHome` is where Spark is installed on the cluster nodes */
def this(master: String, appName: String, batchDuration: Duration, sparkHome: String)

/** As above; `jars` are JAR files (local paths or URLs) to ship to the cluster */
def this(master: String, appName: String, batchDuration: Duration, sparkHome: String, jars: Seq[String])

/** As above; `environment` holds environment variables to set on worker nodes */
def this(
    master: String,
    appName: String,
    batchDuration: Duration,
    sparkHome: String,
    jars: Seq[String],
    environment: Map[String, String]
)

/** Recreate a context from the checkpoint data stored at `path` */
def this(path: String)

/** Recreate a context from checkpoint data, reading it with the given Hadoop Configuration */
def this(path: String, hadoopConf: Configuration)

/** Recreate a context from checkpoint data, reusing an existing SparkContext */
def this(path: String, sparkContext: SparkContext)

```

**Companion Object Methods:**

```scala { .api }
object StreamingContext {
  /** The currently active StreamingContext, if any */
  def getActive(): Option[StreamingContext]

  /** Return the active StreamingContext, or create one with `creatingFunc` if none is active */
  def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext

  // In Spark the three getOrCreate forms below are one method whose `hadoopConf`
  // and `createOnError` parameters have default values; note that `creatingFunc`
  // comes *before* `hadoopConf`.

  /**
   * Recreate a StreamingContext from the checkpoint data in `checkpointPath`, or create
   * a new one with `creatingFunc` if no checkpoint data exists there.
   */
  def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext): StreamingContext

  /** As above, reading the checkpoint data with the given Hadoop configuration */
  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext,
      hadoopConf: Configuration
  ): StreamingContext

  /**
   * As above. If `createOnError` is true, a new context is created with `creatingFunc`
   * when the checkpoint data cannot be read; otherwise such an error is thrown.
   */
  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext,
      hadoopConf: Configuration,
      createOnError: Boolean
  ): StreamingContext
}

```

**Usage Examples:**

```scala
import org.apache.spark._
import org.apache.spark.streaming._

// Create with SparkContext and batch duration
val conf = new SparkConf().setAppName("MyStreamingApp")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

// Alternatives: use one of these *instead of* the SparkContext-based form above.
// Each creates its own SparkContext, and only one SparkContext may run per JVM.
//
// Create directly from SparkConf:
//   val ssc = new StreamingContext(conf, Seconds(2))
//
// Recover from a checkpoint, or build a new context if none exists. The creating
// function must define the DStreams and set the checkpoint directory itself,
// because on recovery the DStream graph is restored from the checkpoint instead:
//   val ssc = StreamingContext.getOrCreate("/path/to/checkpoint", () => {
//     val conf = new SparkConf().setAppName("RecoverableApp")
//     val newSsc = new StreamingContext(conf, Seconds(1))
//     newSsc.checkpoint("/path/to/checkpoint")
//     // ... define input DStreams, transformations and output operations ...
//     newSsc
//   })

// Configure and start
ssc.checkpoint("/path/to/checkpoint")
// ... define input DStreams, transformations and at least one output operation
// (e.g. print()) here: start() fails if no output operation is registered ...
ssc.start()
ssc.awaitTermination()

```

### DStream[T]

A Discretized Stream (DStream) represents a continuous sequence of RDDs, one generated per slide interval. It is the fundamental abstraction in Spark Streaming.

```scala { .api }
/**
 * Discretized Stream - represents a continuous sequence of RDDs
 * @tparam T the type of elements in the stream
 */
abstract class DStream[T] {
  /** The StreamingContext associated with this DStream */
  def context: StreamingContext

  /** Time interval after which this DStream generates a new RDD */
  def slideDuration: Duration

  /** List of parent DStreams on which this DStream depends */
  def dependencies: List[DStream[_]]

  /** Persist the RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
  def persist(): DStream[T]

  /** Persist the RDDs of this DStream with the given storage level */
  def persist(level: StorageLevel): DStream[T]

  /** Same as persist(): keep the RDDs in memory in serialized form (MEMORY_ONLY_SER) */
  def cache(): DStream[T]

  /**
   * Enable periodic checkpointing of the RDDs of this DStream.
   * Requires a checkpoint directory set with StreamingContext.checkpoint.
   * @param interval time between checkpoints; should be a multiple of slideDuration
   */
  def checkpoint(interval: Duration): DStream[T]
}

```

**Basic Transformations:**

```scala { .api }
abstract class DStream[T] {
  /** Apply `mapFunc` to every element */
  def map[U](mapFunc: T => U): DStream[U]

  /** Apply `flatMapFunc` to every element and flatten the results */
  def flatMap[U](flatMapFunc: T => TraversableOnce[U]): DStream[U]

  /** Keep only the elements for which `filterFunc` returns true */
  def filter(filterFunc: T => Boolean): DStream[T]

  /**
   * Apply `mapPartFunc` to each partition of each RDD independently.
   * @param preservePartitioning whether `mapPartFunc` keeps the keys in place, so the
   *                             parent RDD's partitioner can be retained
   */
  def mapPartitions[U](
      mapPartFunc: Iterator[T] => Iterator[U],
      preservePartitioning: Boolean = false
  ): DStream[U]

  /** Coalesce the elements of each partition into a single array */
  def glom(): DStream[Array[T]]

  /** Change the parallelism: each RDD of the result has exactly `numPartitions` partitions */
  def repartition(numPartitions: Int): DStream[T]

  /** Merge this DStream with `that`; both must have the same slide duration */
  def union(that: DStream[T]): DStream[T]
}

```

**Reduction Operations:**

```scala { .api }
abstract class DStream[T] {
  /**
   * Reduce each RDD to a single element with `reduceFunc`,
   * which must be associative and commutative.
   */
  def reduce(reduceFunc: (T, T) => T): DStream[T]

  /** Each RDD of the result holds a single element: the element count of the corresponding RDD */
  def count(): DStream[Long]

  /** Each RDD of the result holds a (value, count) pair per distinct value of the corresponding RDD */
  def countByValue(): DStream[(T, Long)]

  /** As countByValue(), hash-partitioning the result into `numPartitions` partitions */
  def countByValue(numPartitions: Int): DStream[(T, Long)]
}

```

**Output Operations:**

```scala { .api }
abstract class DStream[T] {
  // Output operations register this DStream for execution. A StreamingContext needs
  // at least one before start(), and only DStreams that feed one are computed.

  /** Print the first 10 elements of each RDD (on the driver) */
  def print(): Unit

  /** Print the first `num` elements of each RDD (on the driver) */
  def print(num: Int): Unit

  /** Apply `foreachFunc` to each RDD of this DStream */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit

  /** Apply `foreachFunc` to each RDD of this DStream together with its batch time */
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

  /** Save each RDD as serialized objects in files named "prefix-TIME_IN_MS[.suffix]" */
  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit

  /** Save each RDD as text files named "prefix-TIME_IN_MS[.suffix]" */
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
}

```

**Advanced Transformations:**

```scala { .api }
abstract class DStream[T] {
  // The functions below are called on the driver once per batch, so they can use
  // arbitrary RDD operations (including ones not exposed on DStream).

  /** Produce each output RDD by applying `transformFunc` to the corresponding RDD of this DStream */
  def transform[U](transformFunc: RDD[T] => RDD[U]): DStream[U]

  /** Like transform, but `transformFunc` also receives the batch time */
  def transform[U](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

  /** Produce each output RDD by combining the corresponding RDDs of this DStream and `other` */
  def transformWith[U, V](other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]

  /** Like transformWith, but `transformFunc` also receives the batch time */
  def transformWith[U, V](other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]): DStream[V]
}

```

**Utility Methods:**

```scala { .api }
abstract class DStream[T] {
  /** All RDDs of this DStream within `interval` (both end times included) */
  def slice(interval: Interval): Seq[RDD[T]]

  /** All RDDs of this DStream from `fromTime` to `toTime` (both included) */
  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]
}

```

**Usage Examples:**

```scala
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream

// Assumes `ssc` is a StreamingContext with a checkpoint directory set (see above)

// Basic transformations
val lines: DStream[String] = ssc.textFileStream("/path/to/files")
val words = lines.flatMap(_.split(" "))
val filtered = words.filter(_.length > 3)
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Persistence and checkpointing
val important = lines.filter(_.contains("ERROR")).cache()
important.checkpoint(Seconds(30))

// Custom transformations
val processed = lines.transform { rdd =>
  rdd.mapPartitions { partition =>
    // Custom partition-level processing
    partition.map(_.toUpperCase)
  }
}

// Output operations. Only DStreams that feed an output operation are computed,
// so every derived stream above needs one to do any work.
wordCounts.print(20)
filtered.print()
important.count().print()
processed.print()
wordCounts.foreachRDD { (rdd, time) =>
  println(s"Batch time: $time")
  rdd.take(10).foreach(println)
}

```

### PairDStreamFunctions[K, V]

Additional operations for DStreams of key-value pairs, such as grouping, joins, and aggregations. They are available on any `DStream[(K, V)]` through an implicit conversion.

```scala { .api }
/**
 * Extra functionality available on DStreams of (key, value) pairs.
 * Available as an implicit conversion from DStream[(K, V)].
 * Operations without an explicit partitioner use hash partitioning.
 */
class PairDStreamFunctions[K, V](self: DStream[(K, V)]) {
  /** Group the values of each key in each RDD */
  def groupByKey(): DStream[(K, Iterable[V])]

  /** groupByKey, hash-partitioned into `numPartitions` partitions */
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]

  /** groupByKey using the given partitioner */
  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]

  /** Merge the values of each key with `reduceFunc`, which must be associative and commutative */
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]

  /** reduceByKey, hash-partitioned into `numPartitions` partitions */
  def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)]

  /** reduceByKey using the given partitioner */
  def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]

  /**
   * Combine the values of each key into a result of type C.
   * Unlike groupByKey/reduceByKey, a partitioner must always be supplied.
   * @param createCombiner turns the first value seen for a key into a combiner
   * @param mergeValue     merges a further value into a combiner
   * @param mergeCombiner  merges two combiners
   * @param partitioner    partitioner of the resulting RDDs
   * @param mapSideCombine whether to combine values before the shuffle
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true
  ): DStream[(K, C)]

  /** Transform each value while keeping its key */
  def mapValues[U](mapValuesFunc: V => U): DStream[(K, U)]

  /** Flat-map each value while keeping its key */
  def flatMapValues[U](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]
}

```

**Join Operations:**

```scala { .api }
class PairDStreamFunctions[K, V] {
  // Joins work batch by batch: each output RDD combines the RDDs that the two
  // streams generated for the same batch time, so both streams must have the
  // same slide duration. Results are hash-partitioned with Spark's default
  // number of partitions.

  /** Inner join: pairs for keys present in both streams */
  def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]

  /** Left outer join: every key of this stream, with None where `other` has no match */
  def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]

  /** Right outer join: every key of `other`, with None where this stream has no match */
  def rightOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]

  /** Full outer join: every key of either stream, with None on the side without a match */
  def fullOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]

  /** Cogroup: for each key, all values from this stream and all values from `other` */
  def cogroup[W](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
}

```

**Usage Examples:**

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

// Continues the DStream example above (`words` is a DStream[String])
val pairs: DStream[(String, Int)] = words.map(word => (word, 1))

// Basic operations
val wordCounts = pairs.reduceByKey(_ + _)
val doubled = pairs.mapValues(_ * 2) // DStream[(String, Int)], keys unchanged
val grouped = pairs.groupByKey()

// Joins
val pairs2: DStream[(String, Double)] = ??? // placeholder: another pair DStream
val joined = pairs.join(pairs2) // DStream[(String, (Int, Double))]
val leftJoined = pairs.leftOuterJoin(pairs2) // DStream[(String, (Int, Option[Double]))]

// Custom partitioning
val customPartitioner = new HashPartitioner(4)
val partitioned = pairs.reduceByKey(_ + _, customPartitioner)

```