
tessl/maven-org-apache-spark--spark-streaming-zeromq-2-10

Apache Spark Streaming integration with ZeroMQ message queue system for real-time data processing


Spark Streaming ZeroMQ

Spark Streaming ZeroMQ provides Apache Spark Streaming integration with ZeroMQ, a high-performance distributed messaging library. It enables Spark applications to consume real-time data streams from ZeroMQ publishers through a publish-subscribe pattern, supporting both Scala and Java APIs for distributed stream processing.

Package Information

  • Package Name: spark-streaming-zeromq_2.10
  • Package Type: Maven
  • Language: Scala (with Java compatibility)
  • Group ID: org.apache.spark
  • Artifact ID: spark-streaming-zeromq_2.10
  • Version: 1.6.3
  • Maven Dependency:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-zeromq_2.10</artifactId>
      <version>1.6.3</version>
    </dependency>

Core Imports

Scala Imports

import org.apache.spark.streaming.zeromq.ZeroMQUtils
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
import org.apache.spark.SparkConf
import akka.zeromq.Subscribe
import akka.util.ByteString

Java Imports

import org.apache.spark.streaming.zeromq.ZeroMQUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.receiver.ActorSupervisorStrategy;
import org.apache.spark.SparkConf;
import akka.zeromq.Subscribe;
import akka.util.ByteString;

Basic Usage

Scala Example

import org.apache.spark.SparkConf
import org.apache.spark.streaming.zeromq.ZeroMQUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import akka.zeromq.Subscribe
import akka.util.ByteString

// Create streaming context
val sparkConf = new SparkConf().setAppName("ZeroMQApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Define ZeroMQ connection parameters
val publisherUrl = "tcp://localhost:5555"
val subscribe = Subscribe("topic")

// Define message converter function
val bytesToObjects = (bytes: Seq[ByteString]) => {
  bytes.map(_.utf8String).iterator
}

// Create ZeroMQ input stream
val zmqStream = ZeroMQUtils.createStream(
  ssc,
  publisherUrl,
  subscribe,
  bytesToObjects,
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the stream
zmqStream.foreachRDD { rdd =>
  rdd.foreach(message => println(s"Received: $message"))
}

ssc.start()
ssc.awaitTermination()

Java Example

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.zeromq.ZeroMQUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import akka.zeromq.Subscribe;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.stream.Collectors;

// Create streaming context
SparkConf sparkConf = new SparkConf().setAppName("ZeroMQApp").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Define ZeroMQ connection parameters
String publisherUrl = "tcp://localhost:5555";
Subscribe subscribe = new Subscribe("topic");

// Define message converter function (decodes each frame as UTF-8,
// independent of the platform default charset)
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
    @Override
    public Iterable<String> call(byte[][] bytes) throws Exception {
        return Arrays.stream(bytes)
            .map(frame -> new String(frame, StandardCharsets.UTF_8))
            .collect(Collectors.toList());
    }
};

// Create ZeroMQ input stream
JavaReceiverInputDStream<String> zmqStream = ZeroMQUtils.<String>createStream(
    jssc,
    publisherUrl,
    subscribe,
    bytesToObjects,
    StorageLevel.MEMORY_AND_DISK_SER_2()
);

// Process the stream
zmqStream.foreachRDD(rdd -> {
    rdd.foreach(message -> System.out.println("Received: " + message));
});

jssc.start();
jssc.awaitTermination();

Architecture

Spark Streaming ZeroMQ is built around several key components:

  • ZeroMQUtils: Main utility object providing factory methods for creating ZeroMQ input streams
  • ZeroMQReceiver: Internal Akka actor-based receiver that handles ZeroMQ message reception and forwarding to Spark
  • Message Conversion: User-defined functions to convert ZeroMQ byte sequences into typed objects
  • Fault Tolerance: Configurable Akka supervisor strategies that decide how a failed receiver actor is handled
  • Storage Integration: Configurable RDD storage levels for controlling data persistence and replication
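
To make the message conversion step concrete, here is a minimal sketch that uses only the JDK. The class and method names (FrameDecoder, toStrings) are hypothetical and not part of the library; the sketch assumes every frame carries UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: turns the frames of one ZeroMQ message into strings. */
final class FrameDecoder {
    private FrameDecoder() {}

    /** Decodes every frame as UTF-8 text, preserving frame order. */
    static List<String> toStrings(byte[][] frames) {
        List<String> out = new ArrayList<>(frames.length);
        for (byte[] frame : frames) {
            out.add(new String(frame, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

In a Spark job, the call method of a Function<byte[][], Iterable<String>> could simply return FrameDecoder.toStrings(bytes).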

Capabilities

Stream Creation (Scala API)

Creates a ZeroMQ input stream for Spark Streaming with full configuration options.

/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param ssc StreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects Converter for the frames of one message. A ZeroMQ stream publishes
 *                       a sequence of frames for each topic, and each frame is a sequence
 *                       of bytes, so the converter translates a sequence of byte sequences
 *                       into an iterator of objects.
 * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
 * @param supervisorStrategy Actor supervisor strategy for fault tolerance
 */
def createStream[T: ClassTag](
    ssc: StreamingContext,
    publisherUrl: String,
    subscribe: Subscribe,
    bytesToObjects: Seq[ByteString] => Iterator[T],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
    supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T]

Stream Creation (Java API - Full Configuration)

Creates a ZeroMQ input stream for Java applications with custom storage level and supervisor strategy.

/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc JavaStreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects Converter for the frames of one message. A ZeroMQ stream publishes
 *                       a sequence of frames for each topic, and each frame is a sequence
 *                       of bytes, so the converter translates an array of byte arrays
 *                       into an Iterable of objects.
 * @param storageLevel Storage level to use for storing the received objects
 * @param supervisorStrategy Actor supervisor strategy for fault tolerance
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String publisherUrl,
    Subscribe subscribe,
    Function<byte[][], Iterable<T>> bytesToObjects,
    StorageLevel storageLevel,
    SupervisorStrategy supervisorStrategy
)

Stream Creation (Java API - With Storage Level)

Creates a ZeroMQ input stream for Java applications with custom storage level.

/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc JavaStreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects Converter for the frames of one message. A ZeroMQ stream publishes
 *                       a sequence of frames for each topic, and each frame is a sequence
 *                       of bytes, so the converter translates an array of byte arrays
 *                       into an Iterable of objects.
 * @param storageLevel RDD storage level
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String publisherUrl,
    Subscribe subscribe,
    Function<byte[][], Iterable<T>> bytesToObjects,
    StorageLevel storageLevel
)

Stream Creation (Java API - Basic)

Creates a ZeroMQ input stream for Java applications with default configuration.

/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc JavaStreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects Converter for the frames of one message. A ZeroMQ stream publishes
 *                       a sequence of frames for each topic, and each frame is a sequence
 *                       of bytes, so the converter translates an array of byte arrays
 *                       into an Iterable of objects.
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String publisherUrl,
    Subscribe subscribe,
    Function<byte[][], Iterable<T>> bytesToObjects
)

Types

Core Spark Types

// Spark configuration object
class SparkConf {
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
}

// Spark Streaming context for creating streams
class StreamingContext(sparkConf: SparkConf, batchDuration: Duration)
class StreamingContext(master: String, appName: String, batchDuration: Duration) // Deprecated

// Java wrapper for StreamingContext  
class JavaStreamingContext(sparkConf: SparkConf, batchDuration: Duration)
class JavaStreamingContext(master: String, appName: String, batchDuration: Duration) // Deprecated

// Receiver-based input stream for Scala
class ReceiverInputDStream[T: ClassTag] extends InputDStream[T]

// Java wrapper for receiver input stream
class JavaReceiverInputDStream[T] extends JavaInputDStream[T]

// RDD storage configuration
object StorageLevel {
  val MEMORY_ONLY: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  // ... other storage levels
}

Akka ZeroMQ Types

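// ZeroMQ subscription configuration; the payload is the topic prefix.
// Can also be built from a String topic, e.g. Subscribe("topic").
case class Subscribe(payload: ByteString)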

// Efficient byte string representation
class ByteString {
  def utf8String: String
  def toArray: Array[Byte]
}

// Actor supervision strategy for fault tolerance
abstract class SupervisorStrategy {
  def decider: Decider
}
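
ZeroMQ subscriptions are matched as a byte prefix of a message's first frame, and an empty subscription matches every message. The following plain-Java sketch models that rule for illustration; TopicFilter and its methods are hypothetical names, not part of Akka or Spark.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical model of ZeroMQ's prefix-based subscription filter. */
final class TopicFilter {
    private TopicFilter() {}

    /** Returns true when firstFrame starts with the subscription bytes. */
    static boolean matches(byte[] subscription, byte[] firstFrame) {
        if (subscription.length > firstFrame.length) {
            return false;
        }
        for (int i = 0; i < subscription.length; i++) {
            if (subscription[i] != firstFrame[i]) {
                return false;
            }
        }
        return true; // also reached for an empty subscription
    }

    /** Convenience overload that compares the UTF-8 bytes of two strings. */
    static boolean matches(String subscription, String firstFrame) {
        return matches(
            subscription.getBytes(StandardCharsets.UTF_8),
            firstFrame.getBytes(StandardCharsets.UTF_8));
    }
}
```

Under this rule, a subscription to "topic" would also receive messages published under "topic.a", which is worth keeping in mind when choosing topic names.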

Function Types

// Scala converter function type
type BytesToObjects[T] = Seq[ByteString] => Iterator[T]

// Java converter function interface
interface Function<T, R> {
  R call(T input) throws Exception;
}
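
The converter does not have to produce strings. As a sketch of a typed converter, the example below parses each frame as a decimal number. FrameConverter is a hypothetical stand-in with the same shape as Function<byte[][], Iterable<T>> so the example compiles without Spark on the classpath; LongFrameConverter is likewise an illustrative name.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in with the same shape as Function<byte[][], Iterable<T>>. */
interface FrameConverter<T> {
    Iterable<T> call(byte[][] frames) throws Exception;
}

/** Parses each frame as a decimal number; the bytes of "42" become 42L. */
final class LongFrameConverter implements FrameConverter<Long> {
    @Override
    public Iterable<Long> call(byte[][] frames) {
        List<Long> values = new ArrayList<>(frames.length);
        for (byte[] frame : frames) {
            String text = new String(frame, StandardCharsets.UTF_8).trim();
            // Throws NumberFormatException if the frame is not a number
            values.add(Long.parseLong(text));
        }
        return values;
    }
}
```

With this converter the resulting stream would carry Long values, so T would be Long in the createStream call.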

Error Handling

The ZeroMQ integration includes several error handling mechanisms:

  • Actor Supervision: Uses Akka's supervision strategies to handle receiver actor failures
  • Connection Failures: Automatic reconnection attempts when ZeroMQ publisher becomes unavailable
  • Message Processing Errors: Supervisor strategies can be configured to restart, resume, or stop on processing failures
  • Storage Failures: RDD storage level configuration controls data replication and recovery options
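
The restart, resume, or stop choice can be pictured as a function from a failure to a directive. The sketch below is a plain-Java model of that idea, not Akka's API: FailurePolicy, Directive, and the mapping from exception types to directives are all illustrative assumptions. In a real application the same decision would be expressed as the decider of an Akka SupervisorStrategy.

```java
import java.io.IOException;

/** Plain-Java model of a supervision decider; this is not Akka's API. */
final class FailurePolicy {
    /** The three outcomes named above. */
    enum Directive { RESUME, RESTART, STOP }

    private FailurePolicy() {}

    /** Maps a failure to a directive; the mapping is an assumed example policy. */
    static Directive decide(Throwable failure) {
        if (failure instanceof IllegalArgumentException) {
            // Bad input such as an unparsable message: skip it and carry on
            return Directive.RESUME;
        }
        if (failure instanceof IOException) {
            // I/O trouble such as a broken connection: start the receiver afresh
            return Directive.RESTART;
        }
        // Anything unexpected: stop rather than loop on an unknown fault
        return Directive.STOP;
    }
}
```

Because NumberFormatException extends IllegalArgumentException, a converter that fails while parsing a number would be resumed under this example policy.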

Common Error Scenarios:

  • Network Issues: Connection timeouts or network partitions to ZeroMQ publisher
  • Message Format Errors: Invalid message formats that cannot be processed by bytesToObjects converter
  • Resource Exhaustion: Memory or disk space issues during message buffering
  • Publisher Unavailable: ZeroMQ publisher process crashes or becomes unreachable

Configuration Options:

  • Use ActorSupervisorStrategy.defaultStrategy (the default in the Scala API) for standard fault tolerance
  • Choose a replicated StorageLevel such as MEMORY_AND_DISK_SER_2 so that received data is kept on two nodes
  • Implement bytesToObjects functions that handle malformed frames instead of letting exceptions escape
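
One way to make a converter tolerant of message format errors is to decode strictly and drop frames that are not valid text. The sketch below uses only the JDK; SafeFrameDecoder and decodeValidFrames are hypothetical names, and silently skipping bad frames is an assumed policy (logging or counting them may suit a production job better).

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical converter helper that skips frames which are not valid UTF-8. */
final class SafeFrameDecoder {
    private SafeFrameDecoder() {}

    static List<String> decodeValidFrames(byte[][] frames) {
        List<String> out = new ArrayList<>();
        if (frames == null) {
            return out;
        }
        // Decoders are not thread-safe, so a fresh one is created per call.
        // REPORT makes invalid bytes raise an exception instead of being replaced.
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
            .onMalformedInput(CodingErrorAction.REPORT)
            .onUnmappableCharacter(CodingErrorAction.REPORT);
        for (byte[] frame : frames) {
            if (frame == null) {
                continue;
            }
            try {
                out.add(decoder.decode(ByteBuffer.wrap(frame)).toString());
            } catch (CharacterCodingException e) {
                // Malformed frame: drop it so one bad message cannot fail the receiver
            }
        }
        return out;
    }
}
```

Keeping the error handling inside the converter means a single malformed message never reaches the supervisor strategy at all.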