# Apache Spark Streaming

Apache Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing library built on Apache Spark Core that enables processing of live data streams from sources like Kafka, Flume, Kinesis, or TCP sockets. It provides high-level APIs in Scala, Java, and Python for building streaming applications using functional transformations like map, reduce, join, and window operations on DStreams (discretized streams).

## Package Information

- **Package Name**: org.apache.spark:spark-streaming_2.11
- **Package Type**: Maven
- **Language**: Scala (with Java API)
- **Installation**: Add the dependency to your Maven `pom.xml` or SBT `build.sbt`

**Maven:**

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```

**SBT:**

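```scala
// %% appends the Scala binary version, so scalaVersion must be 2.11.x
// for this line to resolve spark-streaming_2.11
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.8"
```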

## Core Imports

**Scala:**

```scala
import org.apache.spark.streaming.{StreamingContext, Duration, Seconds, Minutes}
import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
```

**Java:**

```java
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
```

## Basic Usage

**Scala Example:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}

// Create streaming context with a 1-second batch interval.
// For a local run, also call .setMaster("local[2]"): the socket receiver
// occupies one thread, so at least one more is needed to process batches.
val conf = new SparkConf().setAppName("StreamingExample")
val ssc = new StreamingContext(conf, Seconds(1))

// Create input stream from TCP socket
val lines = ssc.socketTextStream("localhost", 9999)

// Transform and process data
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Output results (prints the first 10 elements of each batch)
wordCounts.print()

// Start streaming and block until the context is stopped
ssc.start()
ssc.awaitTermination()
```

**Java Example:**

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.Durations;
import java.util.Arrays;
import scala.Tuple2;

// Create streaming context
SparkConf conf = new SparkConf().setAppName("StreamingExample");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

// Create input stream from TCP socket
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Transform and process data
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
    .reduceByKey((i1, i2) -> i1 + i2);

// Output results
wordCounts.print();

// Start streaming
jssc.start();
// awaitTermination() declares InterruptedException, so the enclosing
// method must catch or declare it
jssc.awaitTermination();
```

## Architecture

Apache Spark Streaming is built around several key components:

- **StreamingContext**: Main entry point that coordinates the streaming application and manages its execution
- **DStream (Discretized Stream)**: Core abstraction representing a continuous stream of data as a sequence of RDDs, one per time-sliced micro-batch
- **Input Sources**: Data sources such as sockets, files, Kafka, and Flume for ingesting streaming data
- **Transformations**: Functional operations such as map, filter, and reduce that transform DStreams
- **Output Operations**: Actions that send processed data to external systems or storage
- **Checkpointing**: Fault tolerance mechanism that periodically saves state to persistent storage
- **Receivers**: Components that collect data from streaming sources and feed it into Spark

## Capabilities

### Streaming Context Management

Entry point for creating and managing Spark Streaming applications. Handles initialization, configuration, and lifecycle management.

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
class StreamingContext(conf: SparkConf, batchDuration: Duration)

// Lifecycle methods
def start(): Unit
def stop(stopSparkContext: Boolean = true): Unit
def awaitTermination(): Unit
def awaitTerminationOrTimeout(timeout: Long): Boolean

// Configuration
def checkpoint(directory: String): Unit
def remember(duration: Duration): Unit
```

[Streaming Context](./streaming-context.md)
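
The lifecycle and configuration calls above could be combined roughly as follows. This is a minimal sketch rather than a recommended setup: the object name `ContextLifecycleSketch`, the `local[2]` master, the checkpoint path, and the socket address are all placeholder assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object ContextLifecycleSketch {
  // Runs a tiny job for at most timeoutMs milliseconds.
  // Returns true if the context terminated on its own before the timeout.
  def run(timeoutMs: Long): Boolean = {
    val conf = new SparkConf()
      .setAppName("ContextLifecycleSketch")
      .setMaster("local[2]") // assumption: local run with two threads
    val ssc = new StreamingContext(conf, Seconds(1))

    // Configuration must happen before start().
    ssc.checkpoint("/tmp/streaming-checkpoint") // placeholder directory
    ssc.remember(Minutes(1)) // keep generated RDDs around for one minute

    // At least one output operation must be registered before start().
    ssc.socketTextStream("localhost", 9999).count().print()

    ssc.start()
    // The timeout is given in milliseconds.
    val terminated = ssc.awaitTerminationOrTimeout(timeoutMs)
    if (!terminated) {
      ssc.stop(stopSparkContext = true)
    }
    terminated
  }
}
```

A stopped `StreamingContext` cannot be restarted, so a caller that wants to run again would create a new context.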

### Data Stream Operations

Core functionality for creating, transforming, and processing continuous data streams with functional programming paradigms.

```scala { .api }
abstract class DStream[T] {
  // Transformations
  def map[U](mapFunc: T => U): DStream[U]
  def flatMap[U](flatMapFunc: T => TraversableOnce[U]): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def transform[U](transformFunc: RDD[T] => RDD[U]): DStream[U]
  def union(that: DStream[T]): DStream[T]

  // Window operations
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]

  // Output operations
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
  def print(num: Int = 10): Unit
}
```

[Data Streams](./data-streams.md)
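
As an illustration of how these operations compose, the sketch below chains `flatMap`, `filter`, `window`, `transform`, and `foreachRDD`. The object and method names (`DStreamOpsSketch`, `tokenize`, and so on) are hypothetical, and the 30-second window and 10-second slide assume a batch interval that divides both evenly.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

object DStreamOpsSketch {
  // Pure helper, so the per-record logic can be tested without a cluster.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\s+").filter(_.nonEmpty).toSeq

  // flatMap + filter: split lines into words and keep the long ones.
  def longWords(lines: DStream[String], minLength: Int): DStream[String] =
    lines.flatMap(line => tokenize(line)).filter(_.length >= minLength)

  // window + transform: apply an arbitrary RDD operation to each window.
  // Window and slide durations must be multiples of the batch interval.
  def distinctPerWindow(words: DStream[String]): DStream[String] =
    words
      .window(Seconds(30), Seconds(10))
      .transform((rdd: RDD[String]) => rdd.distinct())

  // foreachRDD: output operation that runs once per batch on the driver.
  def logBatchSizes(words: DStream[String]): Unit =
    words.foreachRDD { rdd => println(s"records in batch: ${rdd.count()}") }
}
```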

### Input Sources

Various mechanisms for ingesting data from external sources including sockets, files, message queues, and custom receivers.

```scala { .api }
// Socket streams
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
def rawSocketStream[T](hostname: String, port: Int, storageLevel: StorageLevel = MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]

// File streams
def textFileStream(directory: String): DStream[String]
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): InputDStream[(K, V)]

// Queue and custom streams
def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
```

[Input Sources](./input-sources.md)
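
Of these sources, `queueStream` is convenient for experimenting locally because it needs no external system. The following sketch assumes a `local[2]` master and a hypothetical object name `QueueStreamSketch`; it pushes three small RDDs through a one-second batch pipeline.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("QueueStreamSketch")
      .setMaster("local[2]") // assumption: local run
    val ssc = new StreamingContext(conf, Seconds(1))

    // Each RDD pushed into the queue becomes the input of one batch.
    val queue = new mutable.Queue[RDD[Int]]()
    val stream = ssc.queueStream(queue, oneAtATime = true)

    // Double every value and sum per batch: a batch holding 1 to 100 yields 10100.
    stream.map(_ * 2).reduce(_ + _).print()

    ssc.start()
    for (_ <- 1 to 3) {
      // The queue is shared with the streaming scheduler, so guard mutations.
      queue.synchronized {
        queue += ssc.sparkContext.makeRDD(1 to 100, 2)
      }
      Thread.sleep(1000)
    }
    ssc.stop()
  }
}
```

Queue-backed streams are not recoverable from a checkpoint, so this pattern is best kept to tests and demos.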

### Key-Value Operations

Specialized operations for key-value pair streams including aggregations, joins, and state management.

```scala { .api }
class PairDStreamFunctions[K, V] {
  // Aggregations
  def groupByKey(): DStream[(K, Iterable[V])]
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true): DStream[(K, C)]

  // Window aggregations
  def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]

  // State management
  def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]

  // Joins
  def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]
  def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
}
```

[Key-Value Operations](./key-value-ops.md)
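
To make the state and window aggregations concrete, here is a small sketch of a running total with `updateStateByKey` next to a sliding total with `reduceByKeyAndWindow`. The names `PairOpsSketch`, `updateTotal`, `runningTotals`, and `lastMinuteTotals` are illustrative, and the code assumes the caller has already set a checkpoint directory, which `updateStateByKey` requires.

```scala
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream

object PairOpsSketch {
  // Merges the values seen for a key in this batch into its running total.
  // Returning None instead would drop the key from the state.
  def updateTotal(newValues: Seq[Int], running: Option[Int]): Option[Int] =
    Some(running.getOrElse(0) + newValues.sum)

  // Running total per key since the application started.
  // Requires ssc.checkpoint(...) to have been called.
  def runningTotals(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
    pairs.updateStateByKey[Int](updateTotal _)

  // Total per key over the last minute, recomputed every 10 seconds.
  def lastMinuteTotals(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
    pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Minutes(1), Seconds(10))
}
```

Keeping the update function as a plain method makes it easy to check in isolation, for example `updateTotal(Seq(1, 2), Some(3))` evaluates to `Some(6)`.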

### Window Operations

Time-based windowing operations for aggregating data across multiple time intervals.

```scala { .api }
// Basic windowing
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

// Windowed reductions
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
```

[Window Operations](./window-ops.md)
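
The `invReduceFunc` overload lets Spark update a window incrementally: it adds the batches entering the window and "subtracts" the ones leaving it instead of re-reducing the whole window. A minimal sketch for a windowed sum follows; `WindowSketch` and its members are hypothetical names, and the incremental form assumes a checkpoint directory has been configured on the context.

```scala
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

object WindowSketch {
  // Reduce function: folds data entering the window into the aggregate.
  def add(a: Long, b: Long): Long = a + b

  // Inverse function: removes data leaving the window from the aggregate.
  // It must exactly undo `add`, which holds for sums but not for max or min.
  def subtract(a: Long, b: Long): Long = a - b

  // Sum over the last 60 seconds, refreshed every 10 seconds.
  // Both durations must be multiples of the batch interval.
  def windowedSum(values: DStream[Long]): DStream[Long] =
    values.reduceByWindow(add _, subtract _, Seconds(60), Seconds(10))
}
```

For instance, if the previous window summed to 50, the leaving batch summed to 8, and the entering batch sums to 12, the new window value is 50 - 8 + 12 = 54.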

### State Management

Stateful processing capabilities for maintaining state across streaming batches with fault tolerance.

```scala { .api }
// StateSpec for mapWithState
object StateSpec {
  def function[KeyType, ValueType, StateType, MappedType](
    mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}

abstract class State[S] {
  def exists(): Boolean
  def get(): S
  def update(newState: S): Unit
  def remove(): Unit
  def isTimingOut(): Boolean
}
```

[State Management](./state-management.md)
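
A per-key running count with `mapWithState` might look like the sketch below. `MapWithStateSketch`, `nextTotal`, `trackCount`, and the 30-minute idle timeout are illustrative assumptions; `timeout` and `getOption` come from Spark's `StateSpec` and `State` classes, and the stream needs a checkpoint directory to run.

```scala
import org.apache.spark.streaming.{Minutes, State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

object MapWithStateSketch {
  // Pure part of the update, kept separate so it can be tested directly.
  def nextTotal(previous: Option[Int], value: Option[Int]): Int =
    previous.getOrElse(0) + value.getOrElse(0)

  // Invoked for each key with new data, and once more when a key times out
  // (in which case `value` is None and the state can no longer be updated).
  val trackCount = (key: String, value: Option[Int], state: State[Int]) => {
    val total = nextTotal(state.getOption(), value)
    if (!state.isTimingOut()) {
      state.update(total)
    }
    (key, total)
  }

  // Emits (key, runningTotal) for every record; idle keys expire after 30 minutes.
  def runningCounts(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
    pairs.mapWithState(StateSpec.function(trackCount).timeout(Minutes(30)))
}
```

Unlike `updateStateByKey`, the mapping function only runs for keys that received data in the batch (or are timing out), which is the main reason to prefer it for large key spaces.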

### Java API

Java-friendly wrappers providing the same functionality with Java-compatible interfaces and type system.

```java { .api }
public class JavaStreamingContext {
    public JavaStreamingContext(SparkConf conf, Duration batchDuration);
    public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port);
    public void start();
    public void awaitTermination() throws InterruptedException;
}

public class JavaDStream<T> {
    public <R> JavaDStream<R> map(Function<T, R> f);
    public JavaDStream<T> filter(Function<T, Boolean> f);
    public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f);
    public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
}
```

[Java API](./java-api.md)

### Event Monitoring

Listener system for monitoring streaming application performance, batch processing, and receiver status.

```scala { .api }
trait StreamingListener {
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit
}
```

[Event Monitoring](./event-monitoring.md)
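
A listener is typically a small class that overrides only the callbacks it cares about and is registered with `StreamingContext.addStreamingListener`. The sketch below logs slow batches and receiver errors; `BatchDelayListener`, `isSlow`, `attach`, and the delay threshold are hypothetical, while the listener and event classes live in Spark's `org.apache.spark.streaming.scheduler` package.

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{
  StreamingListener,
  StreamingListenerBatchCompleted,
  StreamingListenerReceiverError
}

object BatchDelayListener {
  // Pure threshold check, separated out so it can be tested without Spark running.
  def isSlow(totalDelayMs: Long, maxDelayMs: Long): Boolean =
    totalDelayMs > maxDelayMs

  // Registers the listener; call before ssc.start().
  def attach(ssc: StreamingContext, maxDelayMs: Long): Unit =
    ssc.addStreamingListener(new BatchDelayListener(maxDelayMs))
}

class BatchDelayListener(maxDelayMs: Long) extends StreamingListener {
  // Unimplemented callbacks keep their default no-op bodies.
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // totalDelay = scheduling delay + processing delay, in milliseconds.
    val delayMs = info.totalDelay.getOrElse(0L)
    if (BatchDelayListener.isSlow(delayMs, maxDelayMs)) {
      println(s"Batch ${info.batchTime}: total delay $delayMs ms for ${info.numRecords} records")
    }
  }

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit =
    println(s"Receiver error: ${receiverError.receiverInfo.lastErrorMessage}")
}
```

A total delay that stays above the batch interval usually means batches are queuing up faster than they are processed.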

## Types

```scala { .api }
// Time-related types
case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def /(that: Duration): Double
}

case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Duration): Time
  def -(that: Time): Duration
}

case class Interval(beginTime: Time, endTime: Time) {
  def duration(): Duration
}

// Helper objects
object Seconds {
  def apply(seconds: Long): Duration
}

object Minutes {
  def apply(minutes: Long): Duration
}

object Milliseconds {
  def apply(milliseconds: Long): Duration
}
```
```
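
The arithmetic on these types can be tried out directly. The values below are a small illustrative sketch (`DurationSketch` is a hypothetical name); `isMultipleOf` is an additional `Duration` method from Spark that is handy for checking window sizes against the batch interval.

```scala
import org.apache.spark.streaming.{Duration, Milliseconds, Minutes, Seconds, Time}

object DurationSketch {
  val batch: Duration = Seconds(10)
  val window: Duration = Minutes(1)

  // Dividing two durations gives a ratio: 60000 ms / 10000 ms = 6.0
  val batchesPerWindow: Double = window / batch

  // Durations add up in milliseconds: 60000 + 500 = 60500
  val padded: Duration = window + Milliseconds(500)

  // Time plus Duration is a Time; Time minus Time is a Duration.
  val start: Time = Time(0L)
  val end: Time = start + window
  val elapsed: Duration = end - start

  // Window and slide durations must be multiples of the batch interval.
  val aligned: Boolean = window.isMultipleOf(batch)
}
```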