tessl/maven-org-apache-spark--spark-streaming-2-13

Apache Spark Streaming extension for scalable, fault-tolerant stream processing of live data streams

Apache Spark Streaming

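Apache Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It processes a continuous stream as a series of micro-batches. Received data is processed exactly once by transformations; end-to-end exactly-once results additionally depend on the input source and on idempotent or transactional output operations.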

⚠️ Deprecation Notice: Apache Spark Streaming is deprecated as of Spark 3.4.0. Users should migrate to Structured Streaming for new applications.

Package Information

  • Package Name: spark-streaming_2.13
  • Package Type: maven
  • Language: Scala (with Java API)
  • Version: 3.5.6
  • Installation: Add to pom.xml or build.sbt

Maven:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.13</artifactId>
    <version>3.5.6</version>
</dependency>

SBT (with scalaVersion set to 2.13.x, so that %% resolves to spark-streaming_2.13):

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.6"

Core Imports

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.scheduler._

For Java API:

import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.scheduler.*;

Basic Usage

import org.apache.spark.SparkConf
import org.apache.spark.streaming._

// Create streaming context with 2-second batch interval.
// local[2]: the socket receiver occupies one thread, so at least one more is needed for processing.
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

// Create input stream from socket
val lines = ssc.socketTextStream("localhost", 9999)

// Transform and process the stream
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Output results
wordCounts.print()

// Start the streaming context
ssc.start()
ssc.awaitTermination()

Architecture

Apache Spark Streaming processes live data streams using micro-batches:

  • StreamingContext: Main entry point that coordinates stream processing
  • DStream: Fundamental abstraction representing a continuous sequence of RDDs
  • Receivers: Components that ingest data from external sources (for example TCP sockets or custom Receiver implementations)
  • Transformations: Operations that create new DStreams from existing ones
  • Output Operations: Actions that write DStream data to external systems
  • Checkpointing: Fault tolerance mechanism for stateful operations
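
The components listed above can be seen together in a minimal sketch of a recoverable application. It assumes a text source on localhost:9999 (as in Basic Usage) and a checkpoint directory path that is purely illustrative; `CheckpointedAppSketch` and `createContext` are hypothetical names, not part of Spark. `StreamingContext.getOrCreate` rebuilds the context from checkpoint data when it exists and otherwise calls the factory function.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedAppSketch {
  // Builds a fresh context; getOrCreate only calls this when no checkpoint exists yet.
  def createContext(checkpointDir: String): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedAppSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))     // StreamingContext: entry point
    ssc.checkpoint(checkpointDir)                        // Checkpointing: enable recovery

    val lines = ssc.socketTextStream("localhost", 9999)  // Receiver-based input DStream
    val errors = lines.filter(_.contains("ERROR"))       // Transformation
    errors.foreachRDD { rdd =>                           // Output operation
      println(s"errors in this batch: ${rdd.count()}")
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical default path; pass a real directory as the first argument.
    val checkpointDir = args.headOption.getOrElse("/tmp/streaming-checkpoint")
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
    ssc.start()
    ssc.awaitTermination()
  }
}
```

All DStream setup lives inside the factory function because a context restored from a checkpoint already carries its DStream graph and cannot have new streams added to it.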

Capabilities

Core Streaming

Main streaming abstractions including StreamingContext, DStream operations, and time management classes for building streaming applications.

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  def start(): Unit
  def stop(stopSparkContext: Boolean = true): Unit
  def awaitTermination(): Unit
}

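// Simplified: as in Spark, element types need a ClassTag (import scala.reflect.ClassTag)
abstract class DStream[T: ClassTag] {
  def map[U: ClassTag](mapFunc: T => U): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def foreachRDD(func: RDD[T] => Unit): Unit
}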

case class Duration(milliseconds: Long)
object Seconds { def apply(seconds: Long): Duration }

Input Sources

Data ingestion capabilities for reading from various external sources including files, sockets, and message queues.

class StreamingContext {
  def socketTextStream(hostname: String, port: Int): DStream[String]
  def textFileStream(directory: String): DStream[String]
  def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): DStream[(K, V)]
  def queueStream[T](queue: Queue[RDD[T]]): DStream[T]
}
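
Of the sources above, `queueStream` is convenient for trying out a pipeline locally without any external system. The following is a sketch under that assumption, not a production pattern; `QueueStreamSketch` and its `run` method are hypothetical names. It pre-loads two RDDs, multiplies each element by ten, and gathers the results on the driver.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object QueueStreamSketch {
  def run(): Seq[Int] = {
    val conf = new SparkConf().setAppName("QueueStreamSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(200))

    // Pre-load two RDDs; by default each batch interval dequeues one of them.
    val queue = mutable.Queue[RDD[Int]]()
    queue += ssc.sparkContext.parallelize(Seq(1, 2, 3))
    queue += ssc.sparkContext.parallelize(Seq(4, 5, 6))

    // The foreachRDD function runs on the driver, so a local buffer can gather results.
    val collected = mutable.ArrayBuffer[Int]()
    ssc.queueStream(queue).map(_ * 10).foreachRDD { rdd =>
      val batch = rdd.collect()
      collected.synchronized { collected ++= batch }
    }

    ssc.start()
    // Poll until all six records have arrived, giving up after roughly 30 seconds.
    val deadline = System.currentTimeMillis() + 30000L
    while (collected.synchronized { collected.size } < 6 &&
           System.currentTimeMillis() < deadline) {
      ssc.awaitTerminationOrTimeout(200)
    }
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    collected.synchronized { collected.sorted.toSeq }
  }
}
```

The polling loop replaces `awaitTermination()` so that the sketch stops by itself once the queued data has been processed.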

State Management

Stateful processing capabilities for maintaining state across batches, including updateStateByKey and mapWithState operations.

abstract class State[S] {
  def exists(): Boolean
  def get(): S
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit
  def isTimingOut(): Boolean
}

abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
  def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]
  def timeout(timeout: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}
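
As an illustration of how `State` and `StateSpec` fit together, here is a minimal running-count sketch built on `mapWithState`. The names `RunningCountSketch`, `updateCount`, and `runningCounts` are hypothetical, and the 10-minute timeout is an arbitrary illustrative value. The sketch relies on `State.getOption()` and `State.isTimingOut()`, which belong to the same `State` class summarized above.

```scala
import org.apache.spark.streaming.{Minutes, State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

object RunningCountSketch {
  // Called once per (key, value) pair in each batch, and once more with
  // value = None when an idle key is timing out.
  val updateCount = (word: String, one: Option[Int], state: State[Int]) => {
    val total = state.getOption().getOrElse(0) + one.getOrElse(0)
    // Updating a state that is timing out throws, so guard the write.
    if (!state.isTimingOut()) state.update(total)
    (word, total)
  }

  // Drop keys that received no data for 10 minutes (illustrative value).
  val spec = StateSpec.function(updateCount).timeout(Minutes(10))

  // The owning StreamingContext must have a checkpoint directory set
  // before it is started, because mapWithState is a stateful operation.
  def runningCounts(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
    pairs.mapWithState(spec)
}
```

With the word-count stream from Basic Usage, this could be applied as `RunningCountSketch.runningCounts(words.map(word => (word, 1)))` to emit a cumulative count per word instead of a per-batch count.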

Java API

Java-friendly wrappers for all streaming functionality, providing familiar Java interfaces and method signatures.

public class JavaStreamingContext {
  public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
  public void start();
  public void awaitTermination();
}

public class JavaDStream<T> {
  public <R> JavaDStream<R> map(Function<T, R> f);
  public JavaDStream<T> filter(Function<T, Boolean> f);
  public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
}

Monitoring and Listeners

Comprehensive event system for monitoring streaming applications including batch processing, receiver status, and performance metrics.

trait StreamingListener {
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
}

case class BatchInfo(
  batchTime: Time,
  streamIdToInputInfo: Map[Int, StreamInputInfo],
  submissionTime: Long,
  processingStartTime: Option[Long],
  processingEndTime: Option[Long],
  outputOperationInfos: Map[Int, OutputOperationInfo]
)
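
A listener typically overrides only the callbacks it needs, since the `StreamingListener` trait supplies empty default implementations. The sketch below records a one-line summary per completed batch; `BatchTimingSketch`, `summarize`, and `BatchTimingListener` are hypothetical names, and the value -1 is an arbitrary placeholder used when a delay is not yet known.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListener, StreamingListenerBatchCompleted}

object BatchTimingSketch {
  // Formats the delay figures that BatchInfo derives from its timestamps.
  def summarize(info: BatchInfo): String = {
    val scheduling = info.schedulingDelay.getOrElse(-1L) // start - submission
    val processing = info.processingDelay.getOrElse(-1L) // end - start
    s"batch=${info.batchTime.milliseconds} records=${info.numRecords} " +
      s"schedulingMs=$scheduling processingMs=$processing"
  }

  class BatchTimingListener extends StreamingListener {
    // Listener callbacks run on a Spark-internal thread, hence a concurrent queue.
    val summaries = new ConcurrentLinkedQueue[String]()

    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
      summaries.add(summarize(batchCompleted.batchInfo))
  }
}
```

The listener would be registered with `ssc.addStreamingListener(new BatchTimingSketch.BatchTimingListener)` before calling `ssc.start()`. A processing delay that regularly exceeds the batch interval suggests the application cannot keep up with its input.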

Web UI

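Built-in web interface for visualizing streaming application metrics, batch processing status, and receiver information. The Streaming tab is added to the Spark UI automatically when a StreamingContext starts; the classes below are internal to Spark and are listed for reference rather than for direct use.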

class StreamingTab(parent: SparkUI)
class StreamingPage(parent: StreamingTab)
class StreamingJobProgressListener extends StreamingListener

Types

// Time and Duration
case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Duration): Time
  def -(that: Time): Duration
}

case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def <(that: Duration): Boolean
  def <=(that: Duration): Boolean
}

case class Interval(beginTime: Time, endTime: Time)

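// State Management Types
object StateSpec {
  def function[KeyType, ValueType, StateType, MappedType](
    mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}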

// Streaming Context States (a Java enum in Spark, returned by StreamingContext.getState())
enum StreamingContextState { INITIALIZED, ACTIVE, STOPPED }

// Input Stream Information
case class StreamInputInfo(
  inputStreamId: Int,
  numRecords: Long,
  metadata: Map[String, Any] = Map.empty
) {
  def metadataDescription: Option[String]
}
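
The arithmetic operators on `Time` and `Duration` listed above compose as one would expect. A small sketch follows; `TimeArithmeticSketch` and the concrete values are illustrative only.

```scala
import org.apache.spark.streaming.{Duration, Seconds, Time}

object TimeArithmeticSketch {
  val batchInterval: Duration = Seconds(2)
  val windowLength: Duration = batchInterval * 5      // five batch intervals

  val batchStart: Time = Time(10000L)                 // milliseconds since the epoch
  val batchEnd: Time = batchStart + batchInterval     // Time + Duration gives Time
  val elapsed: Duration = batchEnd - batchStart       // Time - Time gives Duration

  // Window and slide durations are expected to be multiples of the batch interval.
  val aligned: Boolean = windowLength.isMultipleOf(batchInterval)
}
```

Both classes compare by their millisecond value, so `Seconds(10)` and `Seconds(2) * 5` are equal.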