
# Apache Spark Streaming


Apache Spark Streaming is a scalable, fault-tolerant stream processing system built on Apache Spark for processing live data streams. Its core abstraction is the DStream (discretized stream), which represents a continuous sequence of RDDs and provides fault tolerance through lineage-based recovery along with integration with Spark's batch processing capabilities.


**⚠️ Deprecation Notice:** Spark Streaming (DStreams) is deprecated as of Apache Spark 3.4.0. Users are strongly encouraged to migrate to [Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), which provides a more modern streaming API with better performance, late data handling, and exactly-once processing guarantees.
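For reference, the equivalent word count in Structured Streaming looks roughly like the sketch below (it requires the spark-sql artifact; the host and port are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredStreamingApp").getOrCreate()
import spark.implicits._

// Read lines from a socket source
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split lines into words and maintain running counts
val wordCounts = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

// Print the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
```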


## Package Information

- **Package Name**: org.apache.spark:spark-streaming_2.12
- **Package Type**: Maven
- **Language**: Scala
- **Installation**: Add to `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
```
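For sbt-based projects, an equivalent dependency declaration is sketched below (`%%` picks the Scala suffix from your `scalaVersion`; `provided` scope is typical when submitting to a cluster):

```scala
// In build.sbt; %% resolves to spark-streaming_2.12 when scalaVersion := "2.12.x"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.6" % "provided"
```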


## Core Imports


```scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
```


## Basic Usage


```scala
import org.apache.spark._
import org.apache.spark.streaming._

// Create StreamingContext with a 2-second batch interval
val conf = new SparkConf().setAppName("StreamingApp")
val ssc = new StreamingContext(conf, Seconds(2))

// Create DStream from text files
val lines = ssc.textFileStream("/path/to/directory")

// Transform the data
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Output the results
wordCounts.print()

// Start the streaming context
ssc.start()
ssc.awaitTermination()
```


## Architecture


Spark Streaming is built around several key components:

- **StreamingContext**: Main entry point for creating DStreams from various input sources
- **DStream**: Discretized stream representing a continuous sequence of RDDs
- **Micro-batching**: Divides live streams into small batches processed by the Spark engine
- **Fault Tolerance**: Lineage-based recovery and checkpointing for stateful operations
- **Write-Ahead Log (WAL)**: Optional reliability mechanism for receiver-based sources (see the configuration sketch after this list)
- **Receivers**: Background tasks running on worker nodes to receive external data
- **Rate Controller**: Backpressure mechanism that prevents incoming data from overwhelming the system
- **Block Manager**: Manages storage of received data blocks across cluster nodes
- **Integration**: Seamless integration with Spark SQL, MLlib, and GraphX
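The WAL and rate controller are enabled through configuration rather than code. A minimal sketch using the standard configuration keys (the checkpoint path is a placeholder):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReliableStreamingApp")
  // Durably log received data before acknowledging it (receiver-based sources)
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  // Let the rate controller adapt the ingestion rate to processing speed
  .set("spark.streaming.backpressure.enabled", "true")

val ssc = new StreamingContext(conf, Seconds(2))
// A checkpoint directory is required for the write-ahead log
ssc.checkpoint("/tmp/streaming-checkpoint")
```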

66

67

## Capabilities


### Core Streaming Operations

Primary streaming functionality including StreamingContext creation, DStream transformations, and output operations. Essential for all streaming applications.

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  def start(): Unit
  def stop(): Unit
  def awaitTermination(): Unit
  def checkpoint(directory: String): Unit
}

abstract class DStream[T] {
  def map[U](mapFunc: T => U): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def print(): Unit
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
}
```
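A minimal end-to-end sketch of these operations in use (the host, port, and checkpoint path are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("CoreOpsSketch")
val ssc = new StreamingContext(conf, Seconds(2))

// Checkpointing enables recovery for stateful operations and driver restarts
ssc.checkpoint("/tmp/streaming-checkpoint")

val lines = ssc.socketTextStream("localhost", 9999)

// map/filter transform each record; foreachRDD exposes each batch as an RDD
lines.map(_.trim)
  .filter(_.nonEmpty)
  .foreachRDD { rdd =>
    println(s"Batch contained ${rdd.count()} non-empty lines")
  }

ssc.start()
ssc.awaitTermination()
```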


[Core Streaming Operations](./core-streaming.md)


### Input Sources and Data Ingestion

Comprehensive data ingestion capabilities from files, sockets, queues, and custom sources. Supports both receiver-based and direct stream approaches.

```scala { .api }
class StreamingContext {
  def textFileStream(directory: String): DStream[String]
  def socketTextStream(hostname: String, port: Int): ReceiverInputDStream[String]
  def queueStream[T](queue: Queue[RDD[T]]): InputDStream[T]
  def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
}

abstract class Receiver[T](storageLevel: StorageLevel) {
  def onStart(): Unit
  def onStop(): Unit
  def store(dataItem: T): Unit
}
```
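A minimal custom receiver, adapted from the pattern in Spark's custom receiver guide (the class name, host, and port are illustrative):

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Reads newline-delimited text from a socket and hands each line to Spark
class SocketLineReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Receive on a background thread so onStart returns immediately
    new Thread("Socket Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up: the thread exits when the socket closes
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line) // hand each record to Spark's block manager
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // Ask Spark to restart the receiver when the connection ends
      restart("Connection closed, restarting")
    } catch {
      case e: java.io.IOException => restart("Error receiving data", e)
    }
  }
}
```

Such a receiver is consumed with `ssc.receiverStream(new SocketLineReceiver("localhost", 9999))`.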


[Input Sources and Data Ingestion](./input-sources.md)


### Advanced Transformations and Windowing

Windowed operations, stateful transformations, and advanced data processing patterns for complex streaming analytics.

```scala { .api }
class DStream[T] {
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
}

class PairDStreamFunctions[K, V] {
  def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]
}
```
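The two styles differ in scope: windowed operations recompute over a sliding range of recent batches, while `updateStateByKey` carries state across the entire stream. A minimal sketch (host, port, and checkpoint path are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("WindowingSketch")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("/tmp/streaming-checkpoint") // required by stateful operations

val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Word counts over the last 30 seconds, recomputed every 10 seconds;
// both durations must be multiples of the batch interval
val windowedCounts = pairs.window(Seconds(30), Seconds(10)).reduceByKey(_ + _)
windowedCounts.print()

// Running count per word across the whole stream
val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}
runningCounts.print()

ssc.start()
ssc.awaitTermination()
```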


[Advanced Transformations and Windowing](./advanced-transformations.md)


### Java API Integration

Java-friendly wrappers providing seamless integration for Java applications with lambda expressions and Java collections support.

```scala { .api }
class JavaStreamingContext(sparkContext: JavaSparkContext, batchDuration: Duration) {
  def socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String]
  def textFileStream(directory: String): JavaDStream[String]
}

class JavaDStream[T] {
  def map[U](f: JFunction[T, U]): JavaDStream[U]
  def filter(f: JFunction[T, Boolean]): JavaDStream[T]
}
```


[Java API Integration](./java-api.md)


## Core Types


```scala { .api }
case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def isMultipleOf(that: Duration): Boolean
}

object Seconds {
  def apply(seconds: Long): Duration
}

object Minutes {
  def apply(minutes: Long): Duration
}

case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Time): Duration
  def floor(that: Duration): Time
}

case class Interval(beginTime: Time, endTime: Time) {
  def duration(): Duration
}

sealed trait StreamingContextState
object StreamingContextState {
  case object INITIALIZED extends StreamingContextState
  case object ACTIVE extends StreamingContextState
  case object STOPPED extends StreamingContextState
}

sealed abstract class StorageLevel extends Serializable {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}

object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}

abstract class StreamingListener {
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {}
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {}
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {}
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {}
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {}
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {}
  def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {}
  def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {}
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {}
}
```
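In practice these types appear together: `Duration` values size batches and windows, and a `StreamingListener` registered on the context reports batch metrics. A brief sketch (the listener body is illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

val conf = new SparkConf().setAppName("CoreTypesSketch")
val batchInterval = Seconds(2)
val ssc = new StreamingContext(conf, batchInterval)

// Duration supports arithmetic; window sizes must be multiples of the batch interval
val windowSize = batchInterval * 15 // 30 seconds
assert(windowSize.isMultipleOf(batchInterval))

// Observe batch completion events for simple monitoring
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"Batch at ${info.batchTime} processed ${info.numRecords} records")
  }
})
```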


## Error Handling

Spark Streaming applications can handle errors through:

- **Receiver Error Handling**: `Receiver.reportError()` for custom receiver failures
- **DStream Failure Recovery**: Automatic lineage-based RDD recovery
- **Checkpoint Recovery**: `StreamingContext.getOrCreate()` for driver failure recovery (see the sketch after this list)
- **Custom Error Processing**: Using `DStream.foreachRDD()` with try-catch blocks
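A minimal sketch combining checkpoint-based driver recovery with custom per-batch error handling (the checkpoint path, host, and port are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/streaming-checkpoint"

// Build the context only on a fresh start; after a driver failure,
// getOrCreate reconstructs it from the checkpoint instead
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("RecoverableApp")
  val ssc = new StreamingContext(conf, Seconds(2))
  ssc.checkpoint(checkpointDir)

  val lines = ssc.socketTextStream("localhost", 9999)

  // Wrap the output operation so one failed batch does not kill the app logic
  lines.foreachRDD { rdd =>
    try {
      rdd.foreach(record => println(record))
    } catch {
      case e: Exception => println(s"Output operation failed: ${e.getMessage}")
    }
  }
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```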

Common exceptions include:

- `IllegalArgumentException`: Invalid configuration parameters
- `SparkException`: General Spark runtime errors
- `StreamingQueryException`: Streaming-specific runtime errors