or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-streams.md event-monitoring.md index.md input-sources.md java-api.md key-value-ops.md state-management.md streaming-context.md window-ops.md

docs/streaming-context.md

0

# Streaming Context

1

2

The StreamingContext is the main entry point for Apache Spark Streaming functionality. It coordinates the streaming application, manages the execution, and provides methods for creating input streams and configuring the streaming environment.

3

4

## Capabilities

5

6

### StreamingContext Creation

7

8

Create a StreamingContext with various configuration options.

9

10

```scala { .api }

11

/**

12

* Create StreamingContext from SparkContext

13

* @param sparkContext - Existing SparkContext instance

14

* @param batchDuration - Time interval at which streaming data will be divided into batches

15

*/

16

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)

17

18

/**

19

* Create StreamingContext from SparkConf

20

* @param conf - Spark configuration

21

* @param batchDuration - Time interval at which streaming data will be divided into batches

22

*/

23

class StreamingContext(conf: SparkConf, batchDuration: Duration)

24

25

/**

26

* Create StreamingContext with master and app name

27

* @param master - Cluster URL to connect to

28

* @param appName - Name for your application

29

* @param batchDuration - Time interval at which streaming data will be divided into batches

30

* @param sparkHome - Spark home directory (optional)

31

* @param jars - JAR files to send to the cluster (optional)

32

* @param environment - Environment variables (optional)

33

*/

34

class StreamingContext(

35

master: String,

36

appName: String,

37

batchDuration: Duration,

38

sparkHome: String = null,

39

jars: Seq[String] = Nil,

40

environment: Map[String, String] = Map()

41

)

42

```

43

44

**Usage Examples:**

45

46

```scala

47

import org.apache.spark.SparkConf

48

import org.apache.spark.streaming.{StreamingContext, Seconds}

49

50

// From SparkConf

51

val conf = new SparkConf().setAppName("MyStreamingApp").setMaster("local[2]")

52

val ssc = new StreamingContext(conf, Seconds(1))

53

54

// Direct creation

55

val ssc2 = new StreamingContext("local[2]", "MyApp", Seconds(5))

56

```

57

58

### Lifecycle Management

59

60

Control the streaming application lifecycle with start, stop, and termination methods.

61

62

```scala { .api }

63

/**

64

* Start the streaming context and begin processing

65

*/

66

def start(): Unit

67

68

/**

69

* Stop the streaming context

70

* @param stopSparkContext - Whether to stop the underlying SparkContext

71

* @param stopGracefully - Whether to stop gracefully by waiting for data to be processed

72

*/

73

def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit

74

75

/**

76

* Wait for the streaming to terminate

77

*/

78

def awaitTermination(): Unit

79

80

/**

81

* Wait for termination or timeout

82

* @param timeout - Maximum time to wait in milliseconds

83

* @returns true if terminated within timeout, false if timeout occurred

84

*/

85

def awaitTerminationOrTimeout(timeout: Long): Boolean

86

87

/**

88

* Get the current state of the streaming context

89

* @returns StreamingContextState (INITIALIZED, ACTIVE, or STOPPED)

90

*/

91

def getState(): StreamingContextState

92

```

93

94

### Configuration Methods

95

96

Configure checkpointing, data retention, and streaming behavior.

97

98

```scala { .api }

99

/**

100

* Set checkpoint directory for fault tolerance

101

* @param directory - HDFS-compatible directory path for checkpoints

102

*/

103

def checkpoint(directory: String): Unit

104

105

/**

106

* Set how long each DStream in this context remembers the RDDs it generated

107

* @param duration - Minimum duration that each DStream should remember its RDDs

108

*/

109

def remember(duration: Duration): Unit

110

111

/**

112

* Add streaming listener for monitoring

113

* @param streamingListener - Listener to receive streaming events

114

*/

115

def addStreamingListener(streamingListener: StreamingListener): Unit

116

```

117

118

### Socket Input Streams

119

120

Create input streams from TCP sockets.

121

122

```scala { .api }

123

/**

124

* Create text input stream from TCP socket

125

* @param hostname - Hostname to connect to

126

* @param port - Port number to connect to

127

* @param storageLevel - Storage level for received data

128

* @returns ReceiverInputDStream of strings

129

*/

130

def socketTextStream(

131

hostname: String,

132

port: Int,

133

storageLevel: StorageLevel = MEMORY_AND_DISK_SER_2

134

): ReceiverInputDStream[String]

135

136

/**

137

* Create binary input stream from TCP socket

138

* @param hostname - Hostname to connect to

139

* @param port - Port number to connect to

140

* @param converter - Function to convert InputStream to Iterator[T]

141

* @param storageLevel - Storage level for received data

142

* @returns ReceiverInputDStream of converted type

143

*/

144

def socketStream[T: ClassTag](

145

hostname: String,

146

port: Int,

147

converter: (InputStream) => Iterator[T],

148

storageLevel: StorageLevel

149

): ReceiverInputDStream[T]

150

151

/**

152

* Create input stream from a TCP socket whose data is received as already-serialized blocks

153

* @param hostname - Hostname to connect to

154

* @param port - Port number to connect to

155

* @param storageLevel - Storage level for received data

156

* @returns ReceiverInputDStream of type T

157

*/

158

def rawSocketStream[T: ClassTag](

159

hostname: String,

160

port: Int,

161

storageLevel: StorageLevel = MEMORY_AND_DISK_SER_2

162

): ReceiverInputDStream[T]

163

```

164

165

### File Input Streams

166

167

Create input streams that monitor file systems for new files.

168

169

```scala { .api }

170

/**

171

* Create input stream from text files in a directory

172

* @param directory - Directory path to monitor

173

* @returns DStream of strings (file contents line by line)

174

*/

175

def textFileStream(directory: String): DStream[String]

176

177

/**

178

* Create input stream from binary files

179

* @param directory - Directory path to monitor

180

* @param recordLength - Length of each record in bytes

181

* @returns DStream of byte arrays

182

*/

183

def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

184

185

/**

186

* Create generic file input stream

187

* @param directory - Directory path to monitor

188

* @returns InputDStream of key-value pairs based on input format

189

*/

190

def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](

191

directory: String

192

): InputDStream[(K, V)]

193

```

194

195

### Queue and Custom Input Streams

196

197

Create input streams from RDD queues or custom receivers.

198

199

```scala { .api }

200

/**

201

* Create input stream from queue of RDDs

202

* @param queue - Queue containing RDDs to process

203

* @param oneAtATime - Whether to process one RDD per batch

204

* @param defaultRDD - Default RDD when queue is empty

205

* @returns InputDStream of queue elements

206

*/

207

def queueStream[T: ClassTag](

208

queue: Queue[RDD[T]],

209

oneAtATime: Boolean = true,

210

defaultRDD: RDD[T] = null

211

): InputDStream[T]

212

213

/**

214

* Create input stream from custom receiver

215

* @param receiver - Custom receiver implementation

216

* @returns ReceiverInputDStream from the receiver

217

*/

218

def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

219

```

220

221

### Stream Combination Operations

222

223

Combine multiple streams into unified streams.

224

225

```scala { .api }

226

/**

227

* Union multiple DStreams of the same type

228

* @param streams - Sequence of DStreams to union

229

* @returns Single DStream containing data from all input streams

230

*/

231

def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

232

233

/**

234

* Transform multiple DStreams using a custom function

235

* @param dstreams - Sequence of input DStreams

236

* @param transformFunc - Function to transform RDDs from all streams

237

* @returns DStream with transformed data

238

*/

239

def transform[T: ClassTag](

240

dstreams: Seq[DStream[_]],

241

transformFunc: (Seq[RDD[_]], Time) => RDD[T]

242

): DStream[T]

243

```

244

245

**Usage Examples:**

246

247

```scala

248

// Basic lifecycle

249

val ssc = new StreamingContext(conf, Seconds(1))

250

ssc.checkpoint("hdfs://checkpoint")

251

252

val lines = ssc.socketTextStream("localhost", 9999)

253

lines.print()

254

255

ssc.start()

256

ssc.awaitTermination()

257

258

// Combining streams

259

val stream1 = ssc.socketTextStream("host1", 9999)

260

val stream2 = ssc.socketTextStream("host2", 9999)

261

val combined = ssc.union(Seq(stream1, stream2))

262

263

// Custom transformation

264

val transformed = ssc.transform(Seq(stream1, stream2), (rdds, time) => {

265

val rdd1 = rdds(0).asInstanceOf[RDD[String]]

266

val rdd2 = rdds(1).asInstanceOf[RDD[String]]

267

rdd1.union(rdd2).filter(_.nonEmpty)

268

})

269

```

270

271

## Context Factory Methods

272

273

Factory methods for recreating a StreamingContext from a checkpoint (or creating a new one) and for looking up the currently active context.

274

275

```scala { .api }

276

object StreamingContext {

277

/**

278

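* Recreate StreamingContext from checkpoint data, or create a new one if no checkpoint exists

279

* @param path - Path to checkpoint directory

* @param creatingFunc - Function that creates a new StreamingContext when no checkpoint data exists

280

* @param hadoopConf - Hadoop configuration (optional)

* @param createOnError - Whether to create a new StreamingContext if reading checkpoint data fails (optional; by default an exception is thrown)

281

* @returns StreamingContext restored from checkpoint, or newly created by creatingFunc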

282

*/

283

def getOrCreate(

284

path: String,

285

creatingFunc: () => StreamingContext,

286

hadoopConf: Configuration = new Configuration(),

287

createOnError: Boolean = false

288

): StreamingContext

289

290

/**

291

* Get active StreamingContext

292

* @returns Some(context) if a StreamingContext is currently active (started but not stopped), otherwise None

293

*/

294

def getActive(): Option[StreamingContext]

295

}

296

```

297

298

## Types

299

300

```scala { .api }

301

// Context state enumeration

302

object StreamingContextState extends Enumeration {

303

type StreamingContextState = Value

304

val INITIALIZED, ACTIVE, STOPPED = Value

305

}

306

```