or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-streaming.md index.md input-sources.md java-api.md monitoring-listeners.md state-management.md web-ui.md

core-streaming.md (docs/)

0

# Core Streaming

1

2

Core streaming abstractions including StreamingContext for coordinating stream processing, DStream operations for data transformations, and time management classes.

3

4

## Capabilities

5

6

### StreamingContext

7

8

Main entry point for Spark Streaming functionality, responsible for creating input streams, managing the streaming lifecycle, and coordinating batch processing.

9

10

```scala { .api }

11

/**

12

* Main entry point for all streaming operations

13

* @param sparkContext - Spark context for cluster communication

14

* @param batchDuration - Time interval for micro-batch processing

15

*/

16

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {

17

18

/** Alternative constructor using SparkConf */

19

def this(conf: SparkConf, batchDuration: Duration)

20

21

/** Constructor with master URL and app name */

22

def this(

23

master: String,

24

appName: String,

25

batchDuration: Duration,

26

sparkHome: String = null,

27

jars: Seq[String] = Nil,

28

environment: Map[String, String] = Map()

29

)

30

31

/** Constructor for recreating from checkpoint directory */

32

def this(path: String, hadoopConf: Configuration)

33

34

/** Constructor for recreating from checkpoint directory (simplified) */

35

def this(path: String)

36

37

// Lifecycle management

38

/** Start the streaming context and begin processing */

39

def start(): Unit

40

41

/** Stop the streaming context */

42

def stop(stopSparkContext: Boolean = true): Unit

43

44

/** Wait for the streaming context to terminate */

45

def awaitTermination(): Unit

46

47

/** Wait for termination or until `timeout` milliseconds elapse; returns true if the context stopped, false if the wait timed out */

48

def awaitTerminationOrTimeout(timeout: Long): Boolean

49

50

// Configuration

51

/** Set checkpoint directory for fault tolerance */

52

def checkpoint(directory: String): Unit

53

54

/** Set how long each DStream in this context remembers the RDDs it generated before releasing them */

55

def remember(duration: Duration): Unit

56

57

// State and properties

58

/** Get current state of the streaming context */

59

def getState(): StreamingContextState

60

61

/** Access to underlying Spark context */

62

def sparkContext: SparkContext

63

64

// Input stream creation (detailed in Input Sources doc)

65

def socketTextStream(hostname: String, port: Int): DStream[String]

66

def textFileStream(directory: String): DStream[String]

67

def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true): DStream[T]

68

69

// Listeners

70

/** Add a streaming listener for monitoring */

71

def addStreamingListener(listener: StreamingListener): Unit

72

73

/** Remove a streaming listener */

74

def removeStreamingListener(listener: StreamingListener): Unit

75

}

76

```

77

78

**Usage Examples:**

79

80

```scala

81

import org.apache.spark.{SparkConf, SparkContext}

82

import org.apache.spark.streaming._

83

84

// Create from SparkContext

85

val sc = new SparkContext(new SparkConf().setAppName("StreamingApp"))

86

val ssc = new StreamingContext(sc, Seconds(2))

87

88

// Create from SparkConf directly

89

val ssc2 = new StreamingContext(

90

new SparkConf().setAppName("StreamingApp").setMaster("local[2]"),

91

Seconds(1)

92

)

93

94

// Configure checkpointing for fault tolerance

95

ssc.checkpoint("hdfs://namenode:9000/checkpoints")

96

97

// Create input streams and transformations

98

val lines = ssc.socketTextStream("localhost", 9999)

99

val words = lines.flatMap(_.split(" "))

100

101

// Start processing

102

ssc.start()

103

ssc.awaitTermination()

104

```

105

106

### DStream Operations

107

108

Core abstraction representing a continuous sequence of RDDs with transformation and action operations.

109

110

```scala { .api }

111

/**

112

* Discretized Stream - represents a continuous sequence of RDDs

113

*/

114

abstract class DStream[T] {

115

116

// Core properties

117

/** Time interval after which the DStream generates an RDD */

118

def slideDuration: Duration

119

120

/** Dependencies on other DStreams */

121

def dependencies: List[DStream[_]]

122

123

/** Access to streaming context */

124

def context: StreamingContext

125

126

// Transformations - create new DStreams

127

/** Transform each element using a function */

128

def map[U](mapFunc: T => U): DStream[U]

129

130

/** Transform each element to zero or more elements */

131

def flatMap[U](flatMapFunc: T => Iterable[U]): DStream[U]

132

133

/** Filter elements based on a predicate */

134

def filter(filterFunc: T => Boolean): DStream[T]

135

136

/** Coalesce the elements within each partition of each RDD into an array */

137

def glom(): DStream[Array[T]]

138

139

/** Repartition the DStream */

140

def repartition(numPartitions: Int): DStream[T]

141

142

/** Union with another DStream */

143

def union(that: DStream[T]): DStream[T]

144

145

/** Cache the DStream RDDs at default storage level */

146

def cache(): DStream[T]

147

148

/** Persist the DStream RDDs at specified storage level */

149

def persist(level: StorageLevel): DStream[T]

150

151

// Window operations

152

/** Create windowed DStream */

153

def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

154

155

/** Reduce elements in a sliding window */

156

def reduceByWindow(

157

reduceFunc: (T, T) => T,

158

windowDuration: Duration,

159

slideDuration: Duration

160

): DStream[T]

161

162

/** Count elements in a sliding window */

163

def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

164

165

// Note: Stateful operations (updateStateByKey, mapWithState) are only available

166

// on DStream[(K, V)] through PairDStreamFunctions - see that section below

167

168

// Actions - trigger computation

169

/** Print first 10 elements of each batch */

170

def print(): Unit

171

172

/** Print first num elements of each batch */

173

def print(num: Int): Unit

174

175

/** Apply a function to each RDD in the DStream */

176

def foreachRDD(func: RDD[T] => Unit): Unit

177

178

/** Apply a function to each RDD with timestamp */

179

def foreachRDD(func: (RDD[T], Time) => Unit): Unit

180

181

// Output operations

182

/** Save as text files with prefix */

183

def saveAsTextFiles(prefix: String, suffix: String = ""): Unit

184

185

/** Save as Hadoop files */

186

def saveAsHadoopFiles[F <: OutputFormat[K, V]](

187

prefix: String,

188

suffix: String = ""

189

): Unit

190

}

191

```

192

193

**Usage Examples:**

194

195

```scala

196

// Basic transformations

197

val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

198

val words: DStream[String] = lines.flatMap(_.split(" "))

199

val filtered: DStream[String] = words.filter(_.length > 3)

200

201

// Window operations

202

val windowedWords = words.window(Seconds(10), Seconds(2))

203

val wordCounts = words.map((_, 1))

204

.reduceByKeyAndWindow(_ + _, Seconds(10), Seconds(2))

205

206

// Actions

207

words.print(20) // Print first 20 elements each batch

208

words.foreachRDD { rdd =>

209

if (!rdd.isEmpty()) {

210

println(s"Batch has ${rdd.count()} elements")

211

}

212

}

213

214

// Save to files

215

words.saveAsTextFiles("hdfs://namenode/output", "txt")

216

```

217

218

### PairDStreamFunctions

219

220

Operations available only on DStreams of key-value pairs, providing aggregation and join capabilities.

221

222

```scala { .api }

223

/**

224

* Additional operations on DStreams of (K, V) pairs

225

* Available via implicit conversion when DStream contains tuples

226

*/

227

class PairDStreamFunctions[K, V](self: DStream[(K, V)]) {

228

229

// Key-based aggregations

230

/** Group values by key */

231

def groupByKey(): DStream[(K, Iterable[V])]

232

233

/** Reduce values by key using associative function */

234

def reduceByKey(func: (V, V) => V): DStream[(K, V)]

235

236

/** Combine values by key using combiner functions */

237

def combineByKey[C](

238

createCombiner: V => C,

239

mergeValue: (C, V) => C,

240

mergeCombiner: (C, C) => C

241

): DStream[(K, C)]

242

243

/** Count occurrences of each key */

244

def countByKey(): DStream[(K, Long)]

245

246

// Joins with other pair DStreams

247

/** Inner join with another pair DStream */

248

def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]

249

250

/** Left outer join */

251

def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]

252

253

/** Right outer join */

254

def rightOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]

255

256

/** Full outer join */

257

def fullOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]

258

259

// Windowed operations

260

/** Group by key within a sliding window */

261

def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]

262

263

/** Reduce by key within a sliding window */

264

def reduceByKeyAndWindow(

265

func: (V, V) => V,

266

windowDuration: Duration,

267

slideDuration: Duration

268

): DStream[(K, V)]

269

270

// Stateful operations

271

/** Update state by key using a function */

272

def updateStateByKey[S](

273

updateFunc: (Seq[V], Option[S]) => Option[S]

274

): DStream[(K, S)]

275

276

/** Update state by key with custom partitioner */

277

def updateStateByKey[S](

278

updateFunc: (Seq[V], Option[S]) => Option[S],

279

partitioner: Partitioner

280

): DStream[(K, S)]

281

282

/** Update state by key with number of partitions */

283

def updateStateByKey[S](

284

updateFunc: (Seq[V], Option[S]) => Option[S],

285

numPartitions: Int

286

): DStream[(K, S)]

287

288

/** Map with state using StateSpec */

289

def mapWithState[StateType, MappedType](

290

spec: StateSpec[K, V, StateType, MappedType]

291

): MapWithStateDStream[K, V, StateType, MappedType]

292

}

293

```

294

295

### Time and Duration Classes

296

297

Time management classes for specifying batch intervals, window durations, and timeout periods.

298

299

```scala { .api }

300

/**

301

* Represents a duration for streaming operations

302

*/

303

case class Duration(milliseconds: Long) {

304

// Arithmetic operations

305

def +(other: Duration): Duration

306

def -(other: Duration): Duration

307

def *(times: Int): Duration

308

def /(divisor: Int): Duration

309

310

// Comparison operations

311

def <(other: Duration): Boolean

312

def <=(other: Duration): Boolean

313

def >(other: Duration): Boolean

314

def >=(other: Duration): Boolean

315

316

// Utility methods

317

def isMultipleOf(other: Duration): Boolean

318

def min(other: Duration): Duration

319

def max(other: Duration): Duration

320

def isZero: Boolean

321

def prettyPrint: String

322

}

323

324

/**

325

* Represents an absolute point in time

326

*/

327

case class Time(milliseconds: Long) {

328

// Arithmetic with Duration

329

def +(duration: Duration): Time

330

def -(duration: Duration): Time

331

def -(other: Time): Duration

332

333

// Utility methods

334

def floor(duration: Duration): Time

335

def isMultipleOf(duration: Duration): Boolean

336

def until(endTime: Time, stepSize: Duration): Seq[Time]

337

def to(endTime: Time, stepSize: Duration): Seq[Time]

338

}

339

340

/**

341

* Factory objects for creating durations

342

*/

343

object Milliseconds {

344

def apply(milliseconds: Long): Duration

345

}

346

347

object Seconds {

348

def apply(seconds: Long): Duration

349

}

350

351

object Minutes {

352

def apply(minutes: Long): Duration

353

}

354

355

/**

356

* Java-friendly duration factories

357

*/

358

object Durations {

359

def milliseconds(milliseconds: Long): Duration

360

def seconds(seconds: Long): Duration

361

def minutes(minutes: Long): Duration

362

}

363

364

/**

365

* Represents a time interval with start and end

366

*/

367

case class Interval(beginTime: Time, endTime: Time) {

368

def duration: Duration = endTime - beginTime

369

def contains(time: Time): Boolean

370

}

371

```

372

373

**Usage Examples:**

374

375

```scala

376

// Creating durations

377

val batchInterval = Seconds(2)

378

val windowSize = Minutes(10)

379

val slideInterval = Seconds(30)

380

381

// Time arithmetic

382

val future = Time(System.currentTimeMillis()) + Minutes(5)

383

val elapsed = Time(System.currentTimeMillis()) - Time(startTime)

384

385

// Using in streaming operations

386

val ssc = new StreamingContext(conf, Seconds(1))

387

val windowed = dstream.window(Minutes(5), Seconds(30))

388

val reduced = dstream.reduceByWindow(_ + _, Minutes(2), Seconds(10))

389

```