tessl/maven-org-apache-spark--spark-streaming-zeromq-2-10

Apache Spark Streaming integration with ZeroMQ message queue system for real-time data processing

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-streaming-zeromq_2.10@1.6.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-zeromq-2-10@1.6.0


Spark Streaming ZeroMQ

Spark Streaming ZeroMQ provides Apache Spark Streaming integration with ZeroMQ, a high-performance distributed messaging library. It enables Spark applications to consume real-time data streams from ZeroMQ publishers through a publish-subscribe pattern, supporting both Scala and Java APIs for distributed stream processing.

Package Information

  • Package Name: spark-streaming-zeromq_2.10
  • Package Type: Maven
  • Language: Scala (with Java compatibility)
  • Group ID: org.apache.spark
  • Artifact ID: spark-streaming-zeromq_2.10
  • Version: 1.6.3
  • Maven Dependency:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-zeromq_2.10</artifactId>
      <version>1.6.3</version>
    </dependency>

Core Imports

Scala Imports

import org.apache.spark.streaming.zeromq.ZeroMQUtils
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
import akka.actor.SupervisorStrategy
import org.apache.spark.SparkConf
import akka.zeromq.Subscribe
import akka.util.ByteString

Java Imports

import org.apache.spark.streaming.zeromq.ZeroMQUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.receiver.ActorSupervisorStrategy;
import akka.actor.SupervisorStrategy;
import org.apache.spark.SparkConf;
import akka.zeromq.Subscribe;
import akka.util.ByteString;

Basic Usage

Scala Example

import org.apache.spark.SparkConf
import org.apache.spark.streaming.zeromq.ZeroMQUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import akka.zeromq.Subscribe
import akka.util.ByteString

// Create streaming context
val sparkConf = new SparkConf().setAppName("ZeroMQApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Define ZeroMQ connection parameters
val publisherUrl = "tcp://localhost:5555"
val subscribe = Subscribe(ByteString("topic"))

// Define message converter function
val bytesToObjects = (bytes: Seq[ByteString]) => {
  bytes.map(_.utf8String).iterator
}

// Create ZeroMQ input stream
val zmqStream = ZeroMQUtils.createStream(
  ssc,
  publisherUrl,
  subscribe,
  bytesToObjects,
  StorageLevel.MEMORY_AND_DISK_SER_2
)

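// Process the stream (with a local master, println output appears in this console;
// on a cluster it goes to the executor logs)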
zmqStream.foreachRDD { rdd =>
  rdd.foreach(message => println(s"Received: $message"))
}

ssc.start()
ssc.awaitTermination()

Java Example

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.zeromq.ZeroMQUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import akka.zeromq.Subscribe;
import akka.util.ByteString;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.stream.Collectors;

// Create streaming context
SparkConf sparkConf = new SparkConf().setAppName("ZeroMQApp").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Define ZeroMQ connection parameters
String publisherUrl = "tcp://localhost:5555";
Subscribe subscribe = new Subscribe(ByteString.fromString("topic"));

// Define message converter function: decode each frame as UTF-8 text
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
    @Override
    public Iterable<String> call(byte[][] bytes) throws Exception {
        return Arrays.stream(bytes)
            .map(frame -> new String(frame, StandardCharsets.UTF_8))
            .collect(Collectors.toList());
    }
};

// Create ZeroMQ input stream
JavaReceiverInputDStream<String> zmqStream = ZeroMQUtils.<String>createStream(
    jssc,
    publisherUrl,
    subscribe,
    bytesToObjects,
    StorageLevel.MEMORY_AND_DISK_SER_2()
);

// Process the stream
zmqStream.foreachRDD(rdd -> {
    rdd.foreach(message -> System.out.println("Received: " + message));
});

jssc.start();
jssc.awaitTermination();

Architecture

Spark Streaming ZeroMQ is built around several key components:

  • ZeroMQUtils: Main utility object providing factory methods for creating ZeroMQ input streams
  • ZeroMQReceiver: Internal Akka actor-based receiver that handles ZeroMQ message reception and forwarding to Spark
  • Message Conversion: User-defined functions to convert ZeroMQ byte sequences into typed objects
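  • Fault Tolerance: Supervisor strategies decide how the receiver actor reacts to failures (for example, by restarting it). ZeroMQ publish-subscribe delivery is not acknowledged, so supervision keeps the receiver running but does not guarantee that every published message reaches Spark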
  • Storage Integration: Configurable RDD storage levels for controlling data persistence and replication
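To make the data path concrete, here is a stdlib-only simulation of what happens to one incoming message. It assumes that the first frame of each message carries the topic and is dropped before the converter runs. `ReceiverFlowSketch`, `onMessage` and the `stored` list are hypothetical stand-ins, and no Spark or Akka classes are involved.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, stdlib-only model of the receiver data path:
// multi-frame message -> drop topic frame -> user converter -> stored records.
// java.util.function.Function stands in for Spark's Java Function interface.
final class ReceiverFlowSketch {

    // Stand-in for the records the receiver hands to Spark for storage.
    static final List<String> stored = new ArrayList<>();

    // Same shape as the Java API converter: byte[][] -> Iterable<T>.
    static final Function<byte[][], Iterable<String>> utf8Converter = frames -> {
        List<String> out = new ArrayList<>();
        for (byte[] frame : frames) {
            out.add(new String(frame, StandardCharsets.UTF_8));
        }
        return out;
    };

    // Handle one message; frame 0 is assumed to be the topic and is skipped.
    static void onMessage(byte[][] message) {
        if (message.length == 0) {
            return;
        }
        byte[][] payload = Arrays.copyOfRange(message, 1, message.length);
        for (String record : utf8Converter.apply(payload)) {
            stored.add(record);
        }
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        onMessage(new byte[][] {utf8("topic"), utf8("hello"), utf8("world")});
        System.out.println(stored); // [hello, world]
    }
}
```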

Capabilities

Stream Creation (Scala API)

Creates a ZeroMQ input stream for Spark Streaming with full configuration options.

/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param ssc StreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects Converter from a message's frames to objects of type T. A ZeroMQ
 *                       message is a sequence of frames and each frame is a sequence of
 *                       bytes, so the converter translates Seq[ByteString] into Iterator[T]
 * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
 * @param supervisorStrategy Actor supervisor strategy for fault tolerance. Defaults to
 *                           ActorSupervisorStrategy.defaultStrategy.
 */
def createStream[T: ClassTag](
    ssc: StreamingContext,
    publisherUrl: String,
    subscribe: Subscribe,
    bytesToObjects: Seq[ByteString] => Iterator[T],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
    supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T]

Stream Creation (Java API - Full Configuration)

Creates a ZeroMQ input stream for Java applications with custom storage level and supervisor strategy.

/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc JavaStreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects Converter from a message's frames to objects of type T. A ZeroMQ
 *                       message is a sequence of frames and each frame is a sequence of
 *                       bytes, so the converter translates byte[][] into Iterable<T>
 * @param storageLevel Storage level to use for storing the received objects
 * @param supervisorStrategy Actor supervisor strategy for fault tolerance
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String publisherUrl,
    Subscribe subscribe,
    Function<byte[][], Iterable<T>> bytesToObjects,
    StorageLevel storageLevel,
    SupervisorStrategy supervisorStrategy
)
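The converter does not have to produce strings. The following is a minimal sketch that assumes a hypothetical publisher sending each reading as a UTF-8 frame of the form `sensorId,value`. `Reading` and `parseReadings` are illustrative names. In a real job, the body of `parseReadings` would sit inside the `call` method of the `Function<byte[][], Iterable<Reading>>` passed as `bytesToObjects`.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class TypedConverterSketch {

    // Hypothetical record type; Serializable so Spark could store and ship it.
    static final class Reading implements Serializable {
        final String sensorId;
        final double value;

        Reading(String sensorId, double value) {
            this.sensorId = sensorId;
            this.value = value;
        }

        @Override
        public String toString() {
            return sensorId + "=" + value;
        }
    }

    // Same shape as the Java API's bytesToObjects: one Reading per frame.
    // Assumes each frame is "sensorId,value"; malformed frames throw.
    static Iterable<Reading> parseReadings(byte[][] frames) {
        List<Reading> readings = new ArrayList<>();
        for (byte[] frame : frames) {
            String[] parts = new String(frame, StandardCharsets.UTF_8).split(",", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("expected sensorId,value");
            }
            readings.add(new Reading(parts[0].trim(), Double.parseDouble(parts[1].trim())));
        }
        return readings;
    }

    public static void main(String[] args) {
        byte[][] frames = {
            "s1,20.5".getBytes(StandardCharsets.UTF_8),
            "s2,19.0".getBytes(StandardCharsets.UTF_8)
        };
        for (Reading r : parseReadings(frames)) {
            System.out.println(r); // s1=20.5, then s2=19.0
        }
    }
}
```

This strict version throws on the first malformed frame. Whether that is acceptable depends on how the supervisor strategy is configured to treat such failures.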

Stream Creation (Java API - With Storage Level)

Creates a ZeroMQ input stream for Java applications with a custom storage level and the default supervisor strategy (ActorSupervisorStrategy.defaultStrategy).

/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc JavaStreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects Converter from a message's frames to objects of type T. A ZeroMQ
 *                       message is a sequence of frames and each frame is a sequence of
 *                       bytes, so the converter translates byte[][] into Iterable<T>
 * @param storageLevel RDD storage level
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String publisherUrl,
    Subscribe subscribe,
    Function<byte[][], Iterable<T>> bytesToObjects,
    StorageLevel storageLevel
)

Stream Creation (Java API - Basic)

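Creates a ZeroMQ input stream for Java applications with the default storage level (StorageLevel.MEMORY_AND_DISK_SER_2) and the default supervisor strategy (ActorSupervisorStrategy.defaultStrategy).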

/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc JavaStreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects Converter from a message's frames to objects of type T. A ZeroMQ
 *                       message is a sequence of frames and each frame is a sequence of
 *                       bytes, so the converter translates byte[][] into Iterable<T>
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String publisherUrl,
    Subscribe subscribe,
    Function<byte[][], Iterable<T>> bytesToObjects
)

Types

Core Spark Types

// Spark configuration object
class SparkConf {
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
}

// Spark Streaming context for creating streams
class StreamingContext(sparkConf: SparkConf, batchDuration: Duration)
class StreamingContext(master: String, appName: String, batchDuration: Duration) // Deprecated

// Java wrapper for StreamingContext  
class JavaStreamingContext(sparkConf: SparkConf, batchDuration: Duration)
class JavaStreamingContext(master: String, appName: String, batchDuration: Duration) // Deprecated

// Receiver-based input stream for Scala
class ReceiverInputDStream[T: ClassTag] extends InputDStream[T]

// Java wrapper for receiver input stream
class JavaReceiverInputDStream[T] extends JavaInputDStream[T]

// RDD storage configuration
object StorageLevel {
  val MEMORY_ONLY: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  // ... other storage levels
}
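The storage level names encode a handful of flags. The sketch below is a plain-Java model, not Spark's class, of how the levels listed above decompose. It assumes Spark's naming convention: `_SER` means blocks are stored serialized, and a `_2` suffix means each block is replicated to two nodes. `StorageLevelSketch` and `Level` are hypothetical names.

```java
// Hypothetical model of the flags behind Spark's StorageLevel names (not Spark's API).
final class StorageLevelSketch {

    static final class Level {
        final String name;
        final boolean useDisk;
        final boolean useMemory;
        final boolean deserialized;
        final int replication;

        Level(String name, boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
            this.name = name;
            this.useDisk = useDisk;
            this.useMemory = useMemory;
            this.deserialized = deserialized;
            this.replication = replication;
        }

        // Human-readable summary of where and how blocks are kept.
        String describe() {
            return name + ": memory=" + useMemory + ", disk=" + useDisk
                + ", serialized=" + !deserialized + ", replicas=" + replication;
        }
    }

    static final Level MEMORY_ONLY = new Level("MEMORY_ONLY", false, true, true, 1);
    static final Level MEMORY_AND_DISK = new Level("MEMORY_AND_DISK", true, true, true, 1);
    static final Level MEMORY_AND_DISK_SER = new Level("MEMORY_AND_DISK_SER", true, true, false, 1);
    static final Level MEMORY_AND_DISK_SER_2 = new Level("MEMORY_AND_DISK_SER_2", true, true, false, 2);

    public static void main(String[] args) {
        System.out.println(MEMORY_AND_DISK_SER_2.describe());
        // MEMORY_AND_DISK_SER_2: memory=true, disk=true, serialized=true, replicas=2
    }
}
```

Replication matters for receiver-based streams because received blocks initially live on the receiver's executor. A second replica lets them survive the loss of that executor before they are processed.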

Akka ZeroMQ Types

// ZeroMQ subscription configuration
case class Subscribe(topic: ByteString)

// Efficient byte string representation
class ByteString {
  def utf8String: String
  def toArray: Array[Byte]
}

// Actor supervision strategy for fault tolerance
abstract class SupervisorStrategy {
  def decider: Decider
}

Function Types

// Scala converter function type (illustrative alias; createStream takes this function type directly)
type BytesToObjects[T] = Seq[ByteString] => Iterator[T]

// Java converter function interface
interface Function<T, R> extends java.io.Serializable {
  R call(T input) throws Exception;
}
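Because Spark's Java `Function` extends `java.io.Serializable`, the converter is serialized along with the receiver and runs on an executor. It must therefore not capture non-serializable state. The stdlib-only sketch below shows the check. `SerializableFn` is a hypothetical stand-in for Spark's interface. `survivesSerialization` is a hypothetical helper that pushes an object through Java serialization, roughly what shipping it to an executor involves.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SerializableConverterSketch {

    // Hypothetical stand-in for org.apache.spark.api.java.function.Function.
    interface SerializableFn<T, R> extends Serializable {
        R call(T input) throws Exception;
    }

    // A converter with no captured state, so it serializes cleanly.
    static final class Utf8Frames implements SerializableFn<byte[][], Iterable<String>> {
        @Override
        public Iterable<String> call(byte[][] frames) {
            List<String> out = new ArrayList<>();
            for (byte[] f : frames) {
                out.add(new String(f, StandardCharsets.UTF_8));
            }
            return out;
        }
    }

    // Returns true if obj can be written and read back with Java serialization.
    static boolean survivesSerialization(Object obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject() != null;
            }
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Object notSerializable = new Object();
        // This lambda captures a non-serializable object, so serializing it fails.
        SerializableFn<byte[][], Iterable<String>> leaky =
            frames -> Collections.singletonList(String.valueOf(notSerializable.hashCode()));
        System.out.println(survivesSerialization(new Utf8Frames())); // true
        System.out.println(survivesSerialization(leaky));             // false
    }
}
```

An anonymous class declared inside an instance method also captures the enclosing object, which then has to be serializable too. A static nested class, or a lambda in a static context that captures only serializable values, avoids that.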

Error Handling

The ZeroMQ integration includes several error handling mechanisms:

  • Actor Supervision: Uses Akka's supervision strategies to handle receiver actor failures
  • Connection Failures: ZeroMQ sockets reconnect automatically when the publisher becomes unavailable; messages published while the subscriber is disconnected are not replayed
  • Message Processing Errors: Supervisor strategies can be configured to resume, restart, stop, or escalate when processing fails
  • Storage Failures: RDD storage level configuration controls data replication and recovery options
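Supervision reduces to two parts: a decider that maps a failure to a directive, and a cap on how many restarts are allowed within a time window. The plain-Java sketch below models that idea, loosely following Akka's one-for-one style. It is not Akka's API. The exception-to-directive mapping and the limits (`maxRetries`, `windowMillis`) are hypothetical choices for illustration, not the values used by ActorSupervisorStrategy.defaultStrategy.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a one-for-one supervision decision (not Akka's API).
final class SupervisionSketch {

    enum Directive { RESUME, RESTART, STOP, ESCALATE }

    private final int maxRetries;
    private final long windowMillis;
    // Timestamps of recent restarts, oldest first.
    private final Deque<Long> restarts = new ArrayDeque<>();

    SupervisionSketch(int maxRetries, long windowMillis) {
        this.maxRetries = maxRetries;
        this.windowMillis = windowMillis;
    }

    // Hypothetical decider: bad input is skipped, other runtime errors restart
    // the receiver, and anything else is escalated to the parent.
    static Directive decide(Throwable t) {
        if (t instanceof IllegalArgumentException) return Directive.RESUME;
        if (t instanceof RuntimeException) return Directive.RESTART;
        return Directive.ESCALATE;
    }

    // Apply the decider, turning RESTART into STOP once the retry budget
    // for the current window is used up.
    Directive onFailure(Throwable t, long nowMillis) {
        Directive d = decide(t);
        if (d != Directive.RESTART) return d;
        while (!restarts.isEmpty() && nowMillis - restarts.peekFirst() > windowMillis) {
            restarts.pollFirst();
        }
        if (restarts.size() >= maxRetries) return Directive.STOP;
        restarts.addLast(nowMillis);
        return Directive.RESTART;
    }

    public static void main(String[] args) {
        SupervisionSketch s = new SupervisionSketch(2, 1000);
        System.out.println(s.onFailure(new IllegalStateException("boom"), 0));    // RESTART
        System.out.println(s.onFailure(new IllegalStateException("boom"), 10));   // RESTART
        System.out.println(s.onFailure(new IllegalStateException("boom"), 20));   // STOP
        System.out.println(s.onFailure(new IllegalStateException("boom"), 2000)); // RESTART
        System.out.println(s.onFailure(new NumberFormatException("x"), 2000));    // RESUME
    }
}
```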

Common Error Scenarios:

  • Network Issues: Connection timeouts or network partitions to ZeroMQ publisher
  • Message Format Errors: Invalid message formats that cannot be processed by bytesToObjects converter
  • Resource Exhaustion: Memory or disk space issues during message buffering
  • Publisher Unavailable: ZeroMQ publisher process crashes or becomes unreachable

Configuration Options:

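  • Use ActorSupervisorStrategy.defaultStrategy (the default when no strategy is passed) for standard fault tolerance, or supply a custom akka.actor.SupervisorStrategy
  • Keep a replicated StorageLevel such as MEMORY_AND_DISK_SER_2 (the default) so received data survives the loss of a single executor
  • Write bytesToObjects to tolerate malformed input, for example by skipping frames that fail to parse, since an exception thrown during conversion fails the receiver actor and is handled by the supervisor strategy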
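One way to keep a malformed message from failing the receiver is to validate inside the converter and drop what cannot be parsed. The following is a minimal sketch that assumes frames which should each carry a decimal integer. `RobustConverterSketch`, `parseInts` and the `skipped` counter are hypothetical. In a real job, the body would live in the `call` method of the `Function<byte[][], Iterable<T>>` passed as `bytesToObjects`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class RobustConverterSketch {

    // Counts dropped frames so bad input is visible rather than silently lost.
    static final AtomicLong skipped = new AtomicLong();

    // Convert each frame to an Integer, skipping frames that are not valid
    // integers instead of throwing out of the converter.
    static Iterable<Integer> parseInts(byte[][] frames) {
        List<Integer> out = new ArrayList<>();
        for (byte[] frame : frames) {
            if (frame == null) {
                skipped.incrementAndGet();
                continue;
            }
            String text = new String(frame, StandardCharsets.UTF_8).trim();
            try {
                out.add(Integer.parseInt(text));
            } catch (NumberFormatException e) {
                skipped.incrementAndGet();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        byte[][] frames = {
            "42".getBytes(StandardCharsets.UTF_8),
            "not-a-number".getBytes(StandardCharsets.UTF_8),
            " 7 ".getBytes(StandardCharsets.UTF_8)
        };
        System.out.println(parseInts(frames)); // [42, 7]
        System.out.println(skipped.get());     // 1
    }
}
```

Skipping trades completeness for receiver uptime. If every record matters, failing loudly and relying on the supervisor strategy may be the better choice.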