# Spark Streaming MQTT

MQTT receiver for Apache Spark Streaming that enables real-time processing of messages from MQTT brokers. Built on the Eclipse Paho MQTT client, it provides seamless integration between IoT messaging systems and Spark's distributed streaming architecture.

## Package Information

- **Package Name**: spark-streaming-mqtt_2.10
- **Package Type**: maven
- **Language**: Scala (with Java and Python APIs)
- **Installation**:

**Maven:**

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-mqtt_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```

**Python (PySpark):**

```bash
# PySpark 1.6.x is not distributed on PyPI; use the spark-submit script from
# a Spark 1.6.3 distribution and add the MQTT library with --packages:
spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.10:1.6.3 your_app.py
```

**SBT:**

```scala
// %% resolves to spark-streaming-mqtt_2.10 when scalaVersion is 2.10.x
libraryDependencies += "org.apache.spark" %% "spark-streaming-mqtt" % "1.6.3"
```

## Core Imports

Scala:
```scala
import org.apache.spark.streaming.mqtt.MQTTUtils
```

Java:
```java
import org.apache.spark.streaming.mqtt.MQTTUtils;
```

Python:
```python
from pyspark.streaming.mqtt import MQTTUtils
```

## Basic Usage

### Scala API

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils

val sparkConf = new SparkConf().setAppName("MQTTStreamingApp")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val brokerUrl = "tcp://localhost:1883"
val topic = "temperature/sensors"

// Create MQTT input stream with default storage level
val mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)

// Process messages
mqttStream.foreachRDD { rdd =>
  rdd.collect().foreach(message => println(s"Received: $message"))
}

ssc.start()
ssc.awaitTermination()
```

### Java API

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.mqtt.MQTTUtils;

SparkConf sparkConf = new SparkConf().setAppName("MQTTStreamingApp");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
String brokerUrl = "tcp://localhost:1883";
String topic = "temperature/sensors";

// Create MQTT input stream
JavaReceiverInputDStream<String> mqttStream =
    MQTTUtils.createStream(jssc, brokerUrl, topic);

// Process messages
mqttStream.foreachRDD(rdd -> {
    rdd.collect().forEach(message ->
        System.out.println("Received: " + message));
});

jssc.start();
jssc.awaitTermination();
```

### Python API

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext(appName="MQTTStreamingApp")
ssc = StreamingContext(sc, 2)  # 2 second batch interval
broker_url = "tcp://localhost:1883"
topic = "temperature/sensors"

# Create MQTT input stream with default storage level
mqtt_stream = MQTTUtils.createStream(ssc, broker_url, topic)

# Process messages
def process_rdd(rdd):
    for message in rdd.collect():
        print("Received: {}".format(message))

mqtt_stream.foreachRDD(process_rdd)

ssc.start()
ssc.awaitTermination()
```

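
The examples above only print raw payloads; in practice each UTF-8 message is usually parsed into a typed value before further processing. A minimal sketch in plain Python (no Spark required): the comma-separated `"sensorId,value"` payload format is an illustrative assumption, not something the library defines.

```python
def parse_reading(payload):
    """Parse an assumed "sensorId,value" payload such as "room1,21.5".

    Returns a (sensor_id, value) tuple, or None if the payload is malformed.
    A function like this is typically applied via mqtt_stream.map(...).
    """
    parts = payload.split(",", 1)
    if len(parts) != 2:
        return None
    sensor_id, raw_value = parts[0].strip(), parts[1].strip()
    try:
        return (sensor_id, float(raw_value))
    except ValueError:
        return None
```

With a helper like this in scope, `mqtt_stream.map(parse_reading).filter(lambda r: r is not None)` would yield a stream of well-formed readings while silently dropping malformed payloads.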
## Capabilities

### MQTT Stream Creation (Scala API)

Creates an MQTT input stream for Scala applications with configurable storage levels.

```scala { .api }
object MQTTUtils {
  /**
   * Create an input stream that receives messages pushed by an MQTT publisher.
   * @param ssc StreamingContext object
   * @param brokerUrl URL of the remote MQTT publisher
   * @param topic Topic name to subscribe to
   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
   * @return ReceiverInputDStream[String] containing MQTT messages as UTF-8 strings
   */
  def createStream(
      ssc: StreamingContext,
      brokerUrl: String,
      topic: String,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String]
}
```

**Usage Example:**

```scala
import org.apache.spark.storage.StorageLevel

// With default storage level
val mqttStream = MQTTUtils.createStream(ssc, "tcp://broker:1883", "sensors/data")

// With custom storage level
val mqttStreamCustom = MQTTUtils.createStream(
  ssc,
  "tcp://broker:1883",
  "sensors/data",
  StorageLevel.MEMORY_ONLY_2
)
```

### MQTT Stream Creation (Java API - Default Storage)

Creates an MQTT input stream for Java applications with the default storage level.

```java { .api }
/**
 * Create an input stream that receives messages pushed by an MQTT publisher.
 * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
 * @param jssc JavaStreamingContext object
 * @param brokerUrl URL of the remote MQTT publisher
 * @param topic Topic name to subscribe to
 * @return JavaReceiverInputDStream<String> containing MQTT messages as UTF-8 strings
 */
public static JavaReceiverInputDStream<String> createStream(
    JavaStreamingContext jssc,
    String brokerUrl,
    String topic
);
```

### MQTT Stream Creation (Java API - Custom Storage)

Creates an MQTT input stream for Java applications with a configurable storage level.

```java { .api }
/**
 * Create an input stream that receives messages pushed by an MQTT publisher.
 * @param jssc JavaStreamingContext object
 * @param brokerUrl URL of the remote MQTT publisher
 * @param topic Topic name to subscribe to
 * @param storageLevel RDD storage level
 * @return JavaReceiverInputDStream<String> containing MQTT messages as UTF-8 strings
 */
public static JavaReceiverInputDStream<String> createStream(
    JavaStreamingContext jssc,
    String brokerUrl,
    String topic,
    StorageLevel storageLevel
);
```

**Usage Example:**

```java
import org.apache.spark.storage.StorageLevel;

// With default storage level
JavaReceiverInputDStream<String> mqttStream =
    MQTTUtils.createStream(jssc, "tcp://broker:1883", "sensors/data");

// With custom storage level
JavaReceiverInputDStream<String> mqttStreamCustom =
    MQTTUtils.createStream(
        jssc,
        "tcp://broker:1883",
        "sensors/data",
        StorageLevel.MEMORY_ONLY_2()
    );
```

### MQTT Stream Creation (Python API)

Creates an MQTT input stream for Python applications using PySpark.

```python { .api }
@staticmethod
def createStream(ssc, brokerUrl, topic, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
    """
    Create an input stream that pulls messages from an MQTT broker.

    Args:
        ssc: StreamingContext object
        brokerUrl: URL of the remote MQTT publisher
        topic: Topic name to subscribe to
        storageLevel: RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2

    Returns:
        DStream: A DStream object containing MQTT messages as UTF-8 strings
    """
```

**Usage Example:**

```python
from pyspark.storagelevel import StorageLevel

# With default storage level
mqtt_stream = MQTTUtils.createStream(ssc, "tcp://broker:1883", "sensors/data")

# With custom storage level
mqtt_stream_custom = MQTTUtils.createStream(
    ssc,
    "tcp://broker:1883",
    "sensors/data",
    StorageLevel.MEMORY_ONLY_2
)
```

## Types

### ReceiverInputDStream[String]
```scala { .api }
/**
 * Scala DStream that receives MQTT messages as UTF-8 encoded strings
 * (here with T = String).
 * Extends Spark's InputDStream for distributed message processing.
 */
abstract class ReceiverInputDStream[T] extends InputDStream[T]
```

### JavaReceiverInputDStream&lt;String&gt;
```java { .api }
/**
 * Java wrapper for ReceiverInputDStream providing MQTT messages as UTF-8
 * encoded strings (here with T = String).
 * Provides a Java-friendly API for distributed message processing.
 */
public class JavaReceiverInputDStream<T> extends JavaInputDStream<T>
```

### DStream (Python)
```python { .api }
"""
Python DStream that receives MQTT messages as UTF-8 encoded strings.
Extends PySpark's DStream for distributed message processing in Python applications.
"""
class DStream:
    def foreachRDD(self, func):
        """Apply a function to each RDD in the stream"""

    def transform(self, func):
        """Transform each RDD in the stream using a function"""

    def pprint(self, num=10):
        """Print the first num elements of each RDD in the stream"""
```

### StorageLevel
```scala { .api }
/**
 * Defines how RDDs should be stored and replicated across the cluster.
 * Common values for MQTT streams:
 * - MEMORY_AND_DISK_SER_2: Default; serialized in memory with disk fallback, 2x replication
 * - MEMORY_ONLY_2: In memory only with 2x replication
 * - MEMORY_AND_DISK_2: In memory with disk fallback, 2x replication
 */
object StorageLevel {
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
}
```

### StreamingContext (Scala)
```scala { .api }
/**
 * Main entry point for Spark Streaming functionality in Scala.
 * Used to create DStreams and manage streaming computations.
 */
class StreamingContext(
    sparkConf: SparkConf,
    batchDuration: Duration
) {
  def start(): Unit
  def awaitTermination(): Unit
  def stop(): Unit
}
```

### JavaStreamingContext (Java)
```java { .api }
/**
 * Java API for StreamingContext, providing Java-friendly streaming functionality.
 * Main entry point for Spark Streaming in Java applications.
 */
public class JavaStreamingContext {
    public JavaStreamingContext(SparkConf conf, Duration batchDuration);
    public void start();
    public void awaitTermination();
    public void stop();
}
```

### StreamingContext (Python)
```python { .api }
"""
Python API for StreamingContext, main entry point for PySpark Streaming.
Used to create DStreams and manage streaming computations in Python.
"""
class StreamingContext:
    def __init__(self, sparkContext, batchDuration):
        """Initialize with SparkContext and batch duration in seconds"""

    def start(self):
        """Start the streaming computation"""

    def awaitTermination(self):
        """Wait for streaming computation to terminate"""

    def stop(self):
        """Stop the streaming computation"""
```

## Configuration

### MQTT Broker URL Format
- **TCP**: `tcp://hostname:port` (default port 1883)
- **SSL**: `ssl://hostname:port` (default port 8883)
- **WebSocket**: `ws://hostname:port/path`
- **Secure WebSocket**: `wss://hostname:port/path`

Note that WebSocket URLs require an MQTT client with WebSocket support; the Paho 1.0.x client bundled with this version may not accept them.

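
The default ports above can be applied mechanically when a URL omits the port. A small illustrative helper in plain Python (not part of the library; the library passes the URL string straight to the MQTT client):

```python
from urllib.parse import urlparse

# Conventional default ports for the schemes listed above.
DEFAULT_PORTS = {"tcp": 1883, "ssl": 8883}

def broker_endpoint(url):
    """Return (scheme, host, port) for a broker URL such as tcp://broker:1883.

    Falls back to the conventional default port when the URL omits one;
    raises KeyError for schemes without a default in DEFAULT_PORTS.
    """
    parsed = urlparse(url)
    port = parsed.port
    if port is None:
        port = DEFAULT_PORTS[parsed.scheme]
    return (parsed.scheme, parsed.hostname, port)
```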
### Topic Subscription
- **Single Topic**: Exact topic name (e.g., `"sensors/temperature"`)
- **Wildcards**: MQTT supports `+` (single level) and `#` (multi-level) wildcards
  - `"sensors/+/temperature"` - matches `sensors/room1/temperature`, `sensors/room2/temperature`
  - `"sensors/#"` - matches all topics under `sensors/`

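
The wildcard semantics above can be sketched as a small matching function. This is purely illustrative plain Python, not library code (matching is done by the broker, which receives the filter string via the MQTT client):

```python
def topic_matches(topic_filter, topic):
    """Check whether an MQTT topic filter matches a concrete topic name.

    '+' matches exactly one topic level; '#' matches all remaining levels
    (including the parent level itself, per the MQTT specification).
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # '#' absorbs all remaining levels
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level mismatch
    return len(filter_levels) == len(topic_levels)
```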
### Storage Level Guidelines
- **MEMORY_AND_DISK_SER_2**: Default; best for most use cases with fault tolerance
- **MEMORY_ONLY_2**: Faster but no disk fallback; use when memory is sufficient
- **MEMORY_AND_DISK_2**: Non-serialized; uses more memory but avoids deserialization cost

## Error Handling

The MQTT receiver implements automatic error handling:

- **Connection Loss**: Automatically attempts to reconnect to the MQTT broker
- **Message Delivery**: Stores messages according to the specified StorageLevel for fault tolerance
- **Receiver Restart**: On connection failure, the receiver restarts and re-establishes the subscription

## Dependencies

- **Eclipse Paho MQTT Client**: v1.0.1 for the MQTT protocol implementation
- **Apache Spark Streaming**: Core streaming functionality
- **Apache Spark Core**: Base Spark functionality

The Eclipse Paho dependency is automatically included when using this library.