tessl/maven-org-apache-spark--spark-streaming_2-13

Apache Spark Streaming extension for scalable, fault-tolerant stream processing of live data streams

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-streaming_2.13@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming_2-13@3.5.0

# Apache Spark Streaming

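Apache Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It uses a micro-batching architecture: the input stream is divided into small batches that the Spark engine processes as RDDs. Received data is processed exactly once by transformations; end-to-end exactly-once results additionally require idempotent or transactional output operations.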

**⚠️ Deprecation Notice**: The DStream-based Spark Streaming API is deprecated as of Spark 3.4.0. Use Structured Streaming for new applications, and plan to migrate existing ones.

## Package Information

- **Package Name**: spark-streaming_2.13
- **Package Type**: maven
- **Language**: Scala (with Java API)
- **Version**: 3.5.6
- **Installation**: Add to `pom.xml` or `build.sbt`

Maven:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.13</artifactId>
  <version>3.5.6</version>
</dependency>
```

SBT:

```scala
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.6"
```

## Core Imports

```scala
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.scheduler._
```

For Java API:

```java
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.scheduler.*;
```

## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming._

// Create streaming context with 2-second batch interval.
// local[2]: the socket receiver occupies one thread, so at least two are needed.
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

// Create input stream from socket
val lines = ssc.socketTextStream("localhost", 9999)

// Transform and process the stream
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Output results
wordCounts.print()

// Start the streaming context and block until it is stopped
ssc.start()
ssc.awaitTermination()
```

## Architecture

Apache Spark Streaming processes live data streams using micro-batches:

- **StreamingContext**: Main entry point that coordinates stream processing
- **DStream**: Fundamental abstraction representing a continuous sequence of RDDs
- **Receivers**: Components that ingest data from external sources such as sockets or custom sources (connectors like Kafka and Kinesis ship as separate artifacts)
- **Transformations**: Operations that create new DStreams from existing ones
- **Output Operations**: Actions that write DStream data to external systems
- **Checkpointing**: Fault tolerance mechanism for stateful operations
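
The micro-batch model listed above can be illustrated without Spark at all. The following is a plain-Scala sketch, not Spark's implementation: it assumes each record carries an arrival timestamp, groups records into fixed intervals, and runs the same word count on every group. The names `MicroBatchSketch`, `Record`, `toBatches`, and `wordCount` are hypothetical.

```scala
// Plain-Scala illustration of micro-batching; no Spark dependency.
// All names here are hypothetical and not part of the Spark API.
object MicroBatchSketch {
  // A record tagged with its arrival time in milliseconds.
  final case class Record(arrivalMs: Long, line: String)

  // Group records into consecutive batches of `batchMs` milliseconds,
  // keyed by the batch start time, in ascending time order.
  def toBatches(records: Seq[Record], batchMs: Long): Seq[(Long, Seq[String])] =
    records
      .groupBy(r => (r.arrivalMs / batchMs) * batchMs)
      .toSeq
      .sortBy(_._1)
      .map { case (start, rs) => (start, rs.sortBy(_.arrivalMs).map(_.line)) }

  // The per-batch computation: the same logic runs on every batch.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    val records = Seq(
      Record(100L, "a b a"),
      Record(1900L, "b"),
      Record(2500L, "c c")
    )
    // With a 2-second interval, the first two records share a batch.
    toBatches(records, batchMs = 2000L).foreach { case (start, lines) =>
      println(s"batch@$start -> ${wordCount(lines)}")
    }
  }
}
```

In Spark Streaming, a DStream plays the role of the batch sequence and each batch is an RDD, so the per-batch function runs distributed rather than on a local `Seq`.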

## Capabilities

### Core Streaming

Main streaming abstractions including StreamingContext, DStream operations, and time management classes for building streaming applications.

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  def start(): Unit
  def stop(stopSparkContext: Boolean = true): Unit
  def awaitTermination(): Unit
}

abstract class DStream[T] {
  def map[U](mapFunc: T => U): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def foreachRDD(func: RDD[T] => Unit): Unit
}

case class Duration(milliseconds: Long)
object Seconds { def apply(seconds: Long): Duration }
```

[Core Streaming](./core-streaming.md)

### Input Sources

Data ingestion capabilities for reading from various external sources including files, sockets, and message queues.

```scala { .api }
class StreamingContext {
  def socketTextStream(hostname: String, port: Int): DStream[String]
  def textFileStream(directory: String): DStream[String]
  def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): DStream[(K, V)]
  def queueStream[T](queue: Queue[RDD[T]]): DStream[T]
}
```

[Input Sources](./input-sources.md)
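
Of these sources, `queueStream` is convenient for local experiments because it needs no external system. The sketch below assumes Spark is on the classpath and runs in `local[2]` mode; the object name `QueueStreamSketch`, the application name, and the timeout values are illustrative choices, not recommendations.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example object; not part of Spark.
object QueueStreamSketch {
  // Feeds two RDDs through a DStream and returns the per-batch count of even numbers.
  def run(): Seq[Long] = {
    val conf = new SparkConf().setAppName("QueueStreamSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // With the default oneAtATime = true, each queued RDD is consumed as one batch.
    val queue = new mutable.Queue[RDD[Int]]()
    queue += ssc.sparkContext.makeRDD(1 to 100)
    queue += ssc.sparkContext.makeRDD(101 to 200)

    val counts = mutable.ArrayBuffer.empty[Long]
    ssc.queueStream(queue)
      .filter(_ % 2 == 0)
      .foreachRDD { rdd =>
        // The body of foreachRDD runs on the driver, so a driver-side buffer works here.
        val n = rdd.count()
        counts.synchronized { counts += n }
      }

    ssc.start()
    // Let a few batches run, then stop both the streaming and the Spark context.
    ssc.awaitTerminationOrTimeout(6000L)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    counts.synchronized { counts.toList }
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Once the queue is drained, later batches are empty, so the returned list typically ends with zeros.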

### State Management

Stateful processing capabilities for maintaining state across batches, including updateStateByKey and mapWithState operations.

```scala { .api }
abstract class State[S] {
  def exists(): Boolean
  def get(): S
  def update(newState: S): Unit
  def remove(): Unit
}

abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
  def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]
  def timeout(timeout: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}
```

[State Management](./state-management.md)
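
For `updateStateByKey`, the state logic lives in a plain function of the form `(Seq[V], Option[S]) => Option[S]`, which can be written and tested without a running stream. The following is a minimal sketch of a running count; the object name `RunningCount` and the wiring shown in the trailing comment are hypothetical.

```scala
// Pure update function in the shape updateStateByKey expects:
// (new values for a key in this batch, previous state) => new state.
// `RunningCount` is a hypothetical name, not part of Spark.
object RunningCount {
  def update(newValues: Seq[Int], previous: Option[Int]): Option[Int] = {
    // Start from zero when the key has no state yet.
    val total = previous.getOrElse(0) + newValues.sum
    // Returning None instead would remove the key from the state.
    Some(total)
  }
}

// Hypothetical wiring inside a streaming job (a checkpoint directory must be set first):
//   ssc.checkpoint("/tmp/checkpoint-dir")
//   val pairs = words.map(word => (word, 1))
//   val totals = pairs.updateStateByKey[Int](RunningCount.update _)
```

Keeping the function pure makes the stateful part of a job easy to unit test in isolation.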

### Java API

Java-friendly wrappers for all streaming functionality, providing familiar Java interfaces and method signatures.

```java { .api }
public class JavaStreamingContext {
    public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
    public void start();
    public void awaitTermination();
}

public class JavaDStream<T> {
    public <R> JavaDStream<R> map(Function<T, R> f);
    public JavaDStream<T> filter(Function<T, Boolean> f);
    public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
}
```

[Java API](./java-api.md)

### Monitoring and Listeners

Event system for monitoring streaming applications, covering batch processing, receiver status, and performance metrics.

```scala { .api }
trait StreamingListener {
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
}

case class BatchInfo(
  batchTime: Time,
  streamIdToInputInfo: Map[Int, StreamInputInfo],
  submissionTime: Long,
  processingStartTime: Option[Long],
  processingEndTime: Option[Long],
  outputOperationInfos: Map[Int, OutputOperationInfo]
)
```

[Monitoring and Listeners](./monitoring-listeners.md)
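
A custom listener only needs to override the callbacks it cares about, since the trait's methods have empty default implementations. The sketch below tracks the slowest completed batch; the class name `DelayTrackingListener` and its field are hypothetical, and it assumes the `processingDelay`, `schedulingDelay`, and `numRecords` accessors on `BatchInfo`.

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener (not part of Spark) that tracks the slowest batch seen so far.
class DelayTrackingListener extends StreamingListener {
  // Written from Spark's listener thread and read from elsewhere, hence @volatile.
  @volatile var maxProcessingDelayMs: Long = 0L

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // processingDelay is an Option: end time minus start time, when both are known.
    info.processingDelay.foreach { delay =>
      if (delay > maxProcessingDelayMs) maxProcessingDelayMs = delay
    }
    println(s"batch=${info.batchTime} records=${info.numRecords} " +
      s"schedulingDelayMs=${info.schedulingDelay.getOrElse(-1L)}")
  }
}

// Hypothetical wiring, before ssc.start():
//   val listener = new DelayTrackingListener
//   ssc.addStreamingListener(listener)
```

A processing delay that stays above the batch interval is the usual sign that the application cannot keep up with its input rate.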

### Web UI

Built-in web interface for visualizing streaming application metrics, batch processing status, and receiver information.

```scala { .api }
// Internal classes (private[spark] / private[ui]); listed for orientation, not for direct use
class StreamingTab(ssc: StreamingContext, sparkUI: SparkUI)
class StreamingPage(parent: StreamingTab)
class StreamingJobProgressListener(ssc: StreamingContext) extends SparkListener with StreamingListener
```

[Web UI](./web-ui.md)

## Types

```scala { .api }
// Time and Duration
case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Duration): Time
  def -(that: Time): Duration
}

case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def <(that: Duration): Boolean
  def <=(that: Duration): Boolean
}

case class Interval(beginTime: Time, endTime: Time)

// State Management Types
object StateSpec {
  def function[KeyType, ValueType, StateType, MappedType](
    mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}

// Streaming Context States
// Defined in Java as the enum org.apache.spark.streaming.StreamingContextState
// Values: INITIALIZED, ACTIVE, STOPPED

// Input Stream Information
case class StreamInputInfo(
  inputStreamId: Int,
  numRecords: Long,
  metadata: Map[String, Any] = Map.empty
) {
  def metadataDescription: Option[String]
}
```
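
The `Time` and `Duration` operators above compose as ordinary arithmetic. The following sketch, assuming Spark is on the classpath, shows a few combinations; `TimeArithmetic`, `batchEnd`, and `batchesPerWindow` are hypothetical helper names.

```scala
import org.apache.spark.streaming.{Duration, Seconds, Time}

// Hypothetical helpers built on the Time and Duration operators.
object TimeArithmetic {
  // End of the batch that starts at `start` for a given batch interval.
  def batchEnd(start: Time, interval: Duration): Time = start + interval

  // Number of whole batch intervals that fit in a window.
  def batchesPerWindow(window: Duration, interval: Duration): Long =
    window.milliseconds / interval.milliseconds

  def main(args: Array[String]): Unit = {
    val interval = Seconds(2)
    val start = Time(10000L)
    println(batchEnd(start, interval))               // Time + Duration yields a Time
    println(batchEnd(start, interval) - start)       // Time - Time yields a Duration
    println(interval * 3 <= Seconds(6))              // Durations scale and compare
    println(batchesPerWindow(Seconds(30), interval)) // whole intervals per window
  }
}
```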