Apache Spark Streaming integration with ZeroMQ message queue system for real-time data processing
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-zeromq-2-10@1.6.00
# Spark Streaming ZeroMQ

Spark Streaming ZeroMQ provides Apache Spark Streaming integration with ZeroMQ, a high-performance distributed messaging library. It enables Spark applications to consume real-time data streams from ZeroMQ publishers through a publish-subscribe pattern, supporting both Scala and Java APIs for distributed stream processing.

## Package Information

- **Package Name**: spark-streaming-zeromq_2.10
- **Package Type**: Maven
- **Language**: Scala (with Java compatibility)
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-streaming-zeromq_2.10
- **Version**: 1.6.3
- **Maven Dependency**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-zeromq_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```
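
For sbt builds, the equivalent dependency would be the following sketch (assuming the same coordinates; `%%` appends the Scala binary-version suffix `_2.10` to the artifact name automatically):

```scala
// sbt equivalent of the Maven dependency above; "%%" resolves to
// spark-streaming-zeromq_2.10 for a Scala 2.10 build.
libraryDependencies += "org.apache.spark" %% "spark-streaming-zeromq" % "1.6.3"
```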
## Core Imports

### Scala Imports

```scala
import org.apache.spark.streaming.zeromq.ZeroMQUtils
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
import org.apache.spark.SparkConf
import akka.zeromq.Subscribe
import akka.util.ByteString
```

### Java Imports

```java
import org.apache.spark.streaming.zeromq.ZeroMQUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.receiver.ActorSupervisorStrategy;
import org.apache.spark.SparkConf;
import akka.zeromq.Subscribe;
import akka.util.ByteString;
```

## Basic Usage

### Scala Example

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.zeromq.ZeroMQUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import akka.zeromq.Subscribe
import akka.util.ByteString

// Create the streaming context
val sparkConf = new SparkConf().setAppName("ZeroMQApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Define ZeroMQ connection parameters
val publisherUrl = "tcp://localhost:5555"
val subscribe = Subscribe("topic")

// Define the message converter: each batch of frames becomes an iterator of strings
val bytesToObjects = (bytes: Seq[ByteString]) => bytes.map(_.utf8String).iterator

// Create the ZeroMQ input stream
val zmqStream = ZeroMQUtils.createStream(
  ssc,
  publisherUrl,
  subscribe,
  bytesToObjects,
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the stream
zmqStream.foreachRDD { rdd =>
  rdd.foreach(message => println(s"Received: $message"))
}

ssc.start()
ssc.awaitTermination()
```

### Java Example

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.zeromq.ZeroMQUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import akka.zeromq.Subscribe;
import akka.util.ByteString;
import java.util.ArrayList;
import java.util.List;

// Create the streaming context
SparkConf sparkConf = new SparkConf().setAppName("ZeroMQApp").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Define ZeroMQ connection parameters
String publisherUrl = "tcp://localhost:5555";
Subscribe subscribe = new Subscribe(ByteString.fromString("topic"));

// Define the message converter: each batch of frames becomes an iterable of strings
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
  @Override
  public Iterable<String> call(byte[][] bytes) throws Exception {
    List<String> messages = new ArrayList<>();
    for (byte[] frame : bytes) {
      messages.add(new String(frame));
    }
    return messages;
  }
};

// Create the ZeroMQ input stream
JavaReceiverInputDStream<String> zmqStream = ZeroMQUtils.<String>createStream(
  jssc,
  publisherUrl,
  subscribe,
  bytesToObjects,
  StorageLevel.MEMORY_AND_DISK_SER_2()
);

// Process the stream
zmqStream.foreachRDD(rdd -> {
  rdd.foreach(message -> System.out.println("Received: " + message));
});

jssc.start();
jssc.awaitTermination();
```

## Architecture

Spark Streaming ZeroMQ is built around several key components:

- **ZeroMQUtils**: Main utility object providing factory methods for creating ZeroMQ input streams
- **ZeroMQReceiver**: Internal Akka actor-based receiver that handles ZeroMQ message reception and forwarding to Spark
- **Message Conversion**: User-defined functions that convert ZeroMQ byte-frame sequences into typed objects
- **Fault Tolerance**: Built-in supervisor strategies for handling actor failures and ensuring reliable message processing
- **Storage Integration**: Configurable RDD storage levels for controlling data persistence and replication
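
The message-conversion component can be illustrated in isolation, without a Spark cluster. The sketch below uses hypothetical helper names, with plain byte arrays standing in for `akka.util.ByteString`; it mirrors the `bytesToObjects` contract of turning a batch of ZeroMQ frames into an iterator of typed records:

```scala
// Sketch of the message-conversion component in isolation (no Spark needed).
// Plain byte arrays stand in for akka.util.ByteString frames.
object FrameConversion {
  // Same shape as a bytesToObjects converter: a batch of frames in,
  // an iterator of decoded records out.
  def framesToStrings(frames: Seq[Array[Byte]]): Iterator[String] =
    frames.iterator.map(bytes => new String(bytes, "UTF-8"))
}
```

Returning an iterator (rather than a materialized collection) lets Spark consume large batches lazily.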
## Capabilities

### Stream Creation (Scala API)

Creates a ZeroMQ input stream for Spark Streaming with full configuration options.

```scala { .api }
/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param ssc                StreamingContext object
 * @param publisherUrl       URL of the remote ZeroMQ publisher
 * @param subscribe          Topic to subscribe to
 * @param bytesToObjects     A ZeroMQ stream publishes a sequence of frames for each topic,
 *                           and each frame is a sequence of bytes; this converter translates
 *                           the frame sequences into typed objects
 * @param storageLevel       RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
 * @param supervisorStrategy Actor supervisor strategy for fault tolerance
 */
def createStream[T: ClassTag](
  ssc: StreamingContext,
  publisherUrl: String,
  subscribe: Subscribe,
  bytesToObjects: Seq[ByteString] => Iterator[T],
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
  supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T]
```

### Stream Creation (Java API - Full Configuration)

Creates a ZeroMQ input stream for Java applications with a custom storage level and supervisor strategy.

```java { .api }
/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc               JavaStreamingContext object
 * @param publisherUrl       URL of the remote ZeroMQ publisher
 * @param subscribe          Topic to subscribe to
 * @param bytesToObjects     A ZeroMQ stream publishes a sequence of frames for each topic,
 *                           and each frame is a sequence of bytes; this converter translates
 *                           the frame sequences into typed objects
 * @param storageLevel       Storage level to use for storing the received objects
 * @param supervisorStrategy Actor supervisor strategy for fault tolerance
 */
public static <T> JavaReceiverInputDStream<T> createStream(
  JavaStreamingContext jssc,
  String publisherUrl,
  Subscribe subscribe,
  Function<byte[][], Iterable<T>> bytesToObjects,
  StorageLevel storageLevel,
  SupervisorStrategy supervisorStrategy
)
```

### Stream Creation (Java API - With Storage Level)

Creates a ZeroMQ input stream for Java applications with a custom storage level.

```java { .api }
/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc           JavaStreamingContext object
 * @param publisherUrl   URL of the remote ZeroMQ publisher
 * @param subscribe      Topic to subscribe to
 * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic,
 *                       and each frame is a sequence of bytes; this converter translates
 *                       the frame sequences into typed objects
 * @param storageLevel   RDD storage level
 */
public static <T> JavaReceiverInputDStream<T> createStream(
  JavaStreamingContext jssc,
  String publisherUrl,
  Subscribe subscribe,
  Function<byte[][], Iterable<T>> bytesToObjects,
  StorageLevel storageLevel
)
```

### Stream Creation (Java API - Basic)

Creates a ZeroMQ input stream for Java applications with the default configuration.

```java { .api }
/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc           JavaStreamingContext object
 * @param publisherUrl   URL of the remote ZeroMQ publisher
 * @param subscribe      Topic to subscribe to
 * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic,
 *                       and each frame is a sequence of bytes; this converter translates
 *                       the frame sequences into typed objects
 */
public static <T> JavaReceiverInputDStream<T> createStream(
  JavaStreamingContext jssc,
  String publisherUrl,
  Subscribe subscribe,
  Function<byte[][], Iterable<T>> bytesToObjects
)
```

## Types

### Core Spark Types

```scala { .api }
// Spark configuration object
class SparkConf {
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
}

// Spark Streaming context for creating streams
class StreamingContext(sparkConf: SparkConf, batchDuration: Duration)
class StreamingContext(master: String, appName: String, batchDuration: Duration) // Deprecated

// Java wrapper for StreamingContext
class JavaStreamingContext(sparkConf: SparkConf, batchDuration: Duration)
class JavaStreamingContext(master: String, appName: String, batchDuration: Duration) // Deprecated

// Receiver-based input stream for Scala
class ReceiverInputDStream[T: ClassTag] extends InputDStream[T]

// Java wrapper for receiver input stream
class JavaReceiverInputDStream[T] extends JavaInputDStream[T]

// RDD storage configuration
object StorageLevel {
  val MEMORY_ONLY: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  // ... other storage levels
}
```

### Akka ZeroMQ Types

```scala { .api }
// ZeroMQ subscription configuration; also constructible from a String topic
case class Subscribe(payload: ByteString)

// Efficient immutable byte string representation
class ByteString {
  def utf8String: String
  def toArray: Array[Byte]
}

// Actor supervision strategy for fault tolerance
abstract class SupervisorStrategy {
  def decider: Decider
}
```

### Function Types

```scala { .api }
// Scala converter function shape (illustrative type alias)
type BytesToObjects[T] = Seq[ByteString] => Iterator[T]

// Java converter function interface (org.apache.spark.api.java.function.Function)
interface Function<T, R> {
  R call(T input) throws Exception;
}
```
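
The two converter shapes can be contrasted in plain Scala. In this sketch the trait `JavaStyleFunction` is an illustrative stand-in for Spark's Java `Function` interface, and byte arrays stand in for the frame type:

```scala
// Illustrative stand-in for the Java Function interface, showing only
// the single-method shape relevant here.
trait JavaStyleFunction[T, R] { def call(input: T): R }

object ConverterShapes {
  // Scala shape: Seq of frames => Iterator of records
  val scalaConverter: Seq[Array[Byte]] => Iterator[String] =
    frames => frames.iterator.map(new String(_, "UTF-8"))

  // Java shape: array of frames => java.lang.Iterable of records
  val javaConverter = new JavaStyleFunction[Array[Array[Byte]], java.lang.Iterable[String]] {
    def call(frames: Array[Array[Byte]]): java.lang.Iterable[String] = {
      val out = new java.util.ArrayList[String]()
      frames.foreach(f => out.add(new String(f, "UTF-8")))
      out
    }
  }
}
```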
## Error Handling

The ZeroMQ integration includes several error handling mechanisms:

- **Actor Supervision**: Uses Akka's supervision strategies to handle receiver actor failures
- **Connection Failures**: Automatic reconnection attempts when the ZeroMQ publisher becomes unavailable
- **Message Processing Errors**: Supervisor strategies can be configured to restart, resume, or stop on processing failures
- **Storage Failures**: The RDD storage level configuration controls data replication and recovery options

**Common Error Scenarios:**

- **Network Issues**: Connection timeouts or network partitions between the receiver and the ZeroMQ publisher
- **Message Format Errors**: Invalid message formats that the `bytesToObjects` converter cannot process
- **Resource Exhaustion**: Memory or disk space pressure during message buffering
- **Publisher Unavailable**: The ZeroMQ publisher process crashes or becomes unreachable

**Configuration Options:**

- Use `ActorSupervisorStrategy.defaultStrategy` for standard fault tolerance
- Configure a replicated `StorageLevel` (e.g. `MEMORY_AND_DISK_SER_2`) for high availability
- Implement a robust `bytesToObjects` function with proper error handling
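
One way to make `bytesToObjects` robust is to absorb message-format errors inside the converter itself, so a single malformed frame does not fail the whole batch. A minimal sketch (hypothetical names; plain byte arrays stand in for the frame type):

```scala
object DefensiveConverter {
  // Decode each frame as a UTF-8 integer; skip frames that fail to parse
  // instead of letting one bad frame abort processing of the batch.
  def framesToInts(frames: Seq[Array[Byte]]): Iterator[Int] =
    frames.iterator.flatMap { bytes =>
      try Some(new String(bytes, "UTF-8").trim.toInt)
      catch { case _: NumberFormatException => None } // drop malformed frames
    }
}
```

Dropping bad frames is one policy; logging them or routing them to a dead-letter stream are alternatives, depending on how lossy the pipeline may be.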