Apache Spark Streaming integration with ZeroMQ message queue system for real-time data processing
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-zeromq-2-10@1.6.0

Spark Streaming ZeroMQ provides Apache Spark Streaming integration with ZeroMQ, a high-performance distributed messaging library. It enables Spark applications to consume real-time data streams from ZeroMQ publishers through a publish-subscribe pattern, supporting both Scala and Java APIs for distributed stream processing.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-zeromq_2.10</artifactId>
<version>1.6.3</version>
</dependency>

import org.apache.spark.streaming.zeromq.ZeroMQUtils
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
import org.apache.spark.SparkConf
import akka.zeromq.Subscribe
import akka.util.ByteString

import org.apache.spark.streaming.zeromq.ZeroMQUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.receiver.ActorSupervisorStrategy;
import org.apache.spark.SparkConf;
import akka.zeromq.Subscribe;
import akka.util.ByteString;

import org.apache.spark.streaming.zeromq.ZeroMQUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import akka.zeromq.Subscribe
import akka.util.ByteString
import org.apache.spark.SparkConf

// Create streaming context
val sparkConf = new SparkConf().setAppName("ZeroMQApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Define ZeroMQ connection parameters
val publisherUrl = "tcp://localhost:5555"
// Subscribe takes the topic as a String or an akka.util.ByteString, not a raw Array[Byte]
val subscribe = Subscribe(ByteString("topic"))

// Define message converter function: every frame of a received message
// is decoded as a UTF-8 string
val bytesToObjects = (bytes: Seq[ByteString]) => {
  bytes.map(_.utf8String).iterator
}

// Create ZeroMQ input stream
val zmqStream = ZeroMQUtils.createStream(
  ssc,
  publisherUrl,
  subscribe,
  bytesToObjects,
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the stream
zmqStream.foreachRDD { rdd =>
  rdd.foreach(message => println(s"Received: $message"))
}

ssc.start()
ssc.awaitTermination()

import org.apache.spark.streaming.zeromq.ZeroMQUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import akka.zeromq.Subscribe;
import akka.util.ByteString;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.nio.charset.StandardCharsets;
import org.apache.spark.SparkConf;

// Create streaming context
SparkConf sparkConf = new SparkConf().setAppName("ZeroMQApp").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Define ZeroMQ connection parameters
String publisherUrl = "tcp://localhost:5555";
// Subscribe is constructed from the topic String (or an akka.util.ByteString), not a byte[]
Subscribe subscribe = new Subscribe("topic");

// Define message converter function
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
    @Override
    public Iterable<String> call(byte[][] bytes) throws Exception {
        // Decode every frame explicitly as UTF-8 rather than with the
        // platform-default charset, matching the Scala example's utf8String
        return Arrays.stream(bytes)
            .map(frame -> new String(frame, StandardCharsets.UTF_8))
            .collect(Collectors.toList());
    }
};

// Create ZeroMQ input stream
JavaReceiverInputDStream<String> zmqStream = ZeroMQUtils.<String>createStream(
    jssc,
    publisherUrl,
    subscribe,
    bytesToObjects,
    StorageLevel.MEMORY_AND_DISK_SER_2()
);

// Process the stream
zmqStream.foreachRDD(rdd -> {
    rdd.foreach(message -> System.out.println("Received: " + message));
});

jssc.start();
jssc.awaitTermination();

Spark Streaming ZeroMQ is built around several key components:
Creates a ZeroMQ input stream for Spark Streaming with full configuration options.
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param ssc StreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
* and each frame has sequence of byte thus it needs the converter
* to translate from sequence of sequence of bytes
* @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
* @param supervisorStrategy Actor supervisor strategy for fault tolerance
*/
def createStream[T: ClassTag](
ssc: StreamingContext,
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T]

Creates a ZeroMQ input stream for Java applications with custom storage level and supervisor strategy.
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote ZeroMQ publisher
* @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter to translate
* from sequence of sequence of bytes
* @param storageLevel Storage level to use for storing the received objects
* @param supervisorStrategy Actor supervisor strategy for fault tolerance
*/
public static <T> JavaReceiverInputDStream<T> createStream(
JavaStreamingContext jssc,
String publisherUrl,
Subscribe subscribe,
Function<byte[][], Iterable<T>> bytesToObjects,
StorageLevel storageLevel,
SupervisorStrategy supervisorStrategy
)

Creates a ZeroMQ input stream for Java applications with custom storage level.
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter to translate
* from sequence of sequence of bytes
* @param storageLevel RDD storage level
*/
public static <T> JavaReceiverInputDStream<T> createStream(
JavaStreamingContext jssc,
String publisherUrl,
Subscribe subscribe,
Function<byte[][], Iterable<T>> bytesToObjects,
StorageLevel storageLevel
)

Creates a ZeroMQ input stream for Java applications with default configuration.
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter to translate
* from sequence of sequence of bytes
*/
public static <T> JavaReceiverInputDStream<T> createStream(
JavaStreamingContext jssc,
String publisherUrl,
Subscribe subscribe,
Function<byte[][], Iterable<T>> bytesToObjects
)

// Spark configuration object
class SparkConf {
def setAppName(name: String): SparkConf
def setMaster(master: String): SparkConf
}
// Spark Streaming context for creating streams
class StreamingContext(sparkConf: SparkConf, batchDuration: Duration)
class StreamingContext(master: String, appName: String, batchDuration: Duration) // Deprecated
// Java wrapper for StreamingContext
class JavaStreamingContext(sparkConf: SparkConf, batchDuration: Duration)
class JavaStreamingContext(master: String, appName: String, batchDuration: Duration) // Deprecated
// Receiver-based input stream for Scala
class ReceiverInputDStream[T: ClassTag] extends InputDStream[T]
// Java wrapper for receiver input stream
class JavaReceiverInputDStream[T] extends JavaInputDStream[T]
// RDD storage configuration
object StorageLevel {
val MEMORY_ONLY: StorageLevel
val MEMORY_AND_DISK: StorageLevel
val MEMORY_AND_DISK_SER: StorageLevel
val MEMORY_AND_DISK_SER_2: StorageLevel
// ... other storage levels
}

// ZeroMQ subscription configuration
case class Subscribe(topic: ByteString)
// Efficient byte string representation
class ByteString {
def utf8String: String
def toArray: Array[Byte]
}
// Actor supervision strategy for fault tolerance
abstract class SupervisorStrategy {
def decider: Decider
}

// Scala converter function type
type BytesToObjects[T] = Seq[ByteString] => Iterator[T]
// Java converter function interface
interface Function<T, R> {
R call(T input) throws Exception;
}

The ZeroMQ integration includes several error handling mechanisms:
Common Error Scenarios:
Configuration Options:
- SupervisorStrategy.defaultStrategy for standard fault tolerance
- StorageLevel with replication for high availability
- bytesToObjects functions with proper error handling