# Core Streaming Operations

Core functionality for creating and managing Spark Streaming contexts, controlling application lifecycle, and basic DStream operations.

## StreamingContext Creation

### Primary Constructors

Create a StreamingContext from an existing SparkContext:

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
```

Create a StreamingContext from a Spark configuration:

```scala { .api }
class StreamingContext(conf: SparkConf, batchDuration: Duration)
```

Create a StreamingContext with a master URL and app name:

```scala { .api }
class StreamingContext(
  master: String,
  appName: String,
  batchDuration: Duration,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map()
)
```
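
A minimal sketch of using these constructors (the app name, master URL, and batch interval below are illustrative; only one SparkContext may be active per JVM, so pick one variant):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("MyApp").setMaster("local[2]")

// Variant 1: from a SparkConf (the SparkContext is created internally)
val ssc = new StreamingContext(conf, Seconds(5))

// Variant 2: from an existing SparkContext
// val sc = new SparkContext(conf)
// val ssc = new StreamingContext(sc, Seconds(5))
```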

### Checkpoint Recovery

Create a StreamingContext from a checkpoint:

```scala { .api }
class StreamingContext(path: String)
class StreamingContext(path: String, hadoopConf: Configuration)
class StreamingContext(path: String, sparkContext: SparkContext)
```

Example checkpoint recovery:

```scala
val checkpointDir = "hdfs://checkpoint-dir"

def createStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("MyApp")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // Define your streams and transformations here
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
```

## Lifecycle Management

### Starting and Stopping

Start the streaming computation:

```scala { .api }
def start(): Unit
```

Stop the streaming computation:

```scala { .api }
def stop(stopSparkContext: Boolean = true): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
```

Wait for termination:

```scala { .api }
def awaitTermination(): Unit
def awaitTerminationOrTimeout(timeout: Long): Boolean
```
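
Where periodic housekeeping is needed while the job runs, `awaitTerminationOrTimeout` can be polled in a loop; a minimal sketch (the 10-second timeout is illustrative):

```scala
// awaitTerminationOrTimeout returns true once the context has stopped,
// false if the timeout elapsed first
var stopped = false
while (!stopped) {
  stopped = ssc.awaitTerminationOrTimeout(10000) // timeout in milliseconds
  if (!stopped) {
    // Context is still running: do periodic work here (metrics, health checks, ...)
  }
}
```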

Example lifecycle management:

```scala
val ssc = new StreamingContext(conf, Seconds(1))

// Define your streams and transformations
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordCounts.print()

// Start the streaming context
ssc.start()

// Add a shutdown hook for graceful termination
sys.ShutdownHookThread {
  println("Gracefully stopping Spark Streaming Application")
  ssc.stop(true, true)
  println("Application stopped")
}

// Wait for termination
ssc.awaitTermination()
```

### State Management

Get the current context state:

```scala { .api }
def getState(): StreamingContextState
```

`StreamingContextState` values:

- `INITIALIZED` - Context created but not started
- `ACTIVE` - Context started and running
- `STOPPED` - Context stopped

Access the underlying SparkContext:

```scala { .api }
def sparkContext: SparkContext
```
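
As a sketch, the state can be used to guard lifecycle calls (assumes `ssc` is an existing StreamingContext):

```scala
import org.apache.spark.streaming.StreamingContextState

ssc.getState() match {
  case StreamingContextState.INITIALIZED => ssc.start() // not started yet: safe to start
  case StreamingContextState.ACTIVE      => println("Context is already running")
  case StreamingContextState.STOPPED     => println("Context is stopped and cannot be restarted")
}
```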

## Configuration and Checkpointing

### Checkpoint Configuration

Set the checkpoint directory:

```scala { .api }
def checkpoint(directory: String): Unit
```

Set the remember duration for DStreams:

```scala { .api }
def remember(duration: Duration): Unit
```

Example checkpoint setup:

```scala
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://namenode:9000/checkpoint")
ssc.remember(Minutes(2)) // Remember the last 2 minutes of data
```

## Static Context Management

### Singleton Context Management

Get the currently active StreamingContext:

```scala { .api }
def getActive(): Option[StreamingContext]
```

Get the active StreamingContext, or create a new one:

```scala { .api }
def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext
```
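
A brief sketch of the singleton accessors (the app name and batch interval are illustrative):

```scala
// Reuse the active context if one exists; otherwise build a fresh one
val ssc = StreamingContext.getActiveOrCreate { () =>
  val conf = new SparkConf().setAppName("SingletonApp")
  new StreamingContext(conf, Seconds(2))
}

// getActive returns Some(context) only while a started context is active
StreamingContext.getActive() match {
  case Some(active) => println(s"Active context state: ${active.getState()}")
  case None         => println("No active context")
}
```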

Get or create from checkpoint:

```scala { .api }
def getOrCreate(
  checkpointPath: String,
  creatingFunc: () => StreamingContext,
  hadoopConf: Configuration = SparkHadoopUtil.get.conf,
  createOnError: Boolean = false
): StreamingContext
```

Example singleton management:

```scala
object StreamingApp {
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("MyStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("checkpoint-dir")
    // Define streaming logic
    ssc
  }

  def main(args: Array[String]): Unit = {
    // This will create a new context or recover from the checkpoint
    val ssc = StreamingContext.getOrCreate("checkpoint-dir", createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

## DStream Union and Transform

### Union Operations

Union multiple DStreams:

```scala { .api }
def union[T](streams: Seq[DStream[T]]): DStream[T]
```

Example union:

```scala
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)
val stream3 = ssc.textFileStream("/data/input")

val unionStream = ssc.union(Seq(stream1, stream2, stream3))
```

### Transform Operations

Transform multiple DStreams together:

```scala { .api }
def transform[T](
  dstreams: Seq[DStream[_]],
  transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T]
```

Example multi-stream transform (note that `transform` takes the function in the same parameter list, so it is passed as an explicit second argument):

```scala
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)

val combinedStream = ssc.transform(Seq(stream1, stream2), (rdds: Seq[RDD[_]], time: Time) => {
  val rdd1 = rdds(0).asInstanceOf[RDD[String]]
  val rdd2 = rdds(1).asInstanceOf[RDD[String]]

  // Custom transformation logic
  rdd1.union(rdd2).filter(_.length > 5)
})
```

## Event Listeners

### Adding and Removing Listeners

Add a streaming listener:

```scala { .api }
def addStreamingListener(streamingListener: StreamingListener): Unit
```

Remove a streaming listener:

```scala { .api }
def removeStreamingListener(streamingListener: StreamingListener): Unit
```

Example custom listener:

```scala
class MyStreamingListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    // processingDelay is an Option[Long], so unwrap it for display
    println(s"Batch ${batchInfo.batchTime} completed in ${batchInfo.processingDelay.getOrElse(-1L)}ms")
  }

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    println(s"Receiver error: ${receiverError.receiverInfo.lastErrorMessage}")
  }
}

ssc.addStreamingListener(new MyStreamingListener())
```

## Utility Methods

### JAR Management

Get the JAR file containing a class:

```scala { .api }
def jarOfClass(cls: Class[_]): Option[String]
```

Example usage:

```scala
val jarPath = StreamingContext.jarOfClass(classOf[MyCustomClass])
```

### Context Access

Access the underlying Spark context:

```scala
val sc = ssc.sparkContext
val appName = sc.appName
val master = sc.master
```

## Duration Helpers

Create duration objects:

```scala { .api }
object Milliseconds {
  def apply(milliseconds: Long): Duration
}

object Seconds {
  def apply(seconds: Long): Duration
}

object Minutes {
  def apply(minutes: Long): Duration
}
```

Common duration patterns:

```scala
val batchInterval = Seconds(5)      // 5-second batches
val windowSize = Minutes(10)        // 10-minute windows
val slideInterval = Seconds(30)     // 30-second slides
val checkpointInterval = Minutes(2) // Checkpoint every 2 minutes
```