MQTT receiver for Apache Spark Streaming that enables real-time processing of messages from MQTT brokers
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-mqtt-2-10@1.6.0

MQTT receiver for Apache Spark Streaming that enables real-time processing of messages from MQTT brokers. Built on the Eclipse Paho MQTT client, it provides seamless integration between IoT messaging systems and Spark's distributed streaming architecture.
Maven:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-mqtt_2.10</artifactId>
  <version>1.6.3</version>
</dependency>

Python (PySpark):
# Install PySpark
pip install pyspark==1.6.3
# Add MQTT library to spark-submit
spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.10:1.6.3 your_app.py

SBT:
libraryDependencies += "org.apache.spark" %% "spark-streaming-mqtt" % "1.6.3"

Scala:
import org.apache.spark.streaming.mqtt.MQTTUtils

Java:
import org.apache.spark.streaming.mqtt.MQTTUtils;

Python:
from pyspark.streaming.mqtt import MQTTUtils

Scala:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils
import org.apache.spark.storage.StorageLevel
val sparkConf = new SparkConf().setAppName("MQTTStreamingApp")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val brokerUrl = "tcp://localhost:1883"
val topic = "temperature/sensors"
// Create MQTT input stream with default storage level
val mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
// Process messages
mqttStream.foreachRDD { rdd =>
  rdd.collect().foreach(message => println(s"Received: $message"))
}
ssc.start()
ssc.awaitTermination()

Java:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.mqtt.MQTTUtils;
import org.apache.spark.storage.StorageLevel;
SparkConf sparkConf = new SparkConf().setAppName("MQTTStreamingApp");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
String brokerUrl = "tcp://localhost:1883";
String topic = "temperature/sensors";
// Create MQTT input stream
JavaReceiverInputDStream<String> mqttStream =
    MQTTUtils.createStream(jssc, brokerUrl, topic);

// Process messages
mqttStream.foreachRDD(rdd -> {
    rdd.collect().forEach(message ->
        System.out.println("Received: " + message));
});
jssc.start();
jssc.awaitTermination();

Python:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
from pyspark.storagelevel import StorageLevel
sc = SparkContext(appName="MQTTStreamingApp")
ssc = StreamingContext(sc, 2) # 2 second batch interval
broker_url = "tcp://localhost:1883"
topic = "temperature/sensors"
# Create MQTT input stream with default storage level
mqtt_stream = MQTTUtils.createStream(ssc, broker_url, topic)
# Process messages
def process_rdd(rdd):
    messages = rdd.collect()
    for message in messages:
        print("Received: {}".format(message))
mqtt_stream.foreachRDD(process_rdd)
ssc.start()
ssc.awaitTermination()

Creates an MQTT input stream for Scala applications with configurable storage levels.
object MQTTUtils {
  /**
   * Create an input stream that receives messages pushed by a MQTT publisher.
   * @param ssc StreamingContext object
   * @param brokerUrl Url of remote MQTT publisher
   * @param topic Topic name to subscribe to
   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
   * @return ReceiverInputDStream[String] containing MQTT messages as UTF-8 strings
   */
  def createStream(
      ssc: StreamingContext,
      brokerUrl: String,
      topic: String,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String]
}

Usage Example:
import org.apache.spark.storage.StorageLevel
// With default storage level
val mqttStream = MQTTUtils.createStream(ssc, "tcp://broker:1883", "sensors/data")
// With custom storage level
val mqttStreamCustom = MQTTUtils.createStream(
  ssc,
  "tcp://broker:1883",
  "sensors/data",
  StorageLevel.MEMORY_ONLY_2
)

Creates an MQTT input stream for Java applications with default storage level.
/**
 * Create an input stream that receives messages pushed by a MQTT publisher.
 * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
 * @param jssc JavaStreamingContext object
 * @param brokerUrl Url of remote MQTT publisher
 * @param topic Topic name to subscribe to
 * @return JavaReceiverInputDStream[String] containing MQTT messages as UTF-8 strings
 */
public static JavaReceiverInputDStream<String> createStream(
    JavaStreamingContext jssc,
    String brokerUrl,
    String topic
);

Creates an MQTT input stream for Java applications with configurable storage level.
/**
 * Create an input stream that receives messages pushed by a MQTT publisher.
 * @param jssc JavaStreamingContext object
 * @param brokerUrl Url of remote MQTT publisher
 * @param topic Topic name to subscribe to
 * @param storageLevel RDD storage level
 * @return JavaReceiverInputDStream[String] containing MQTT messages as UTF-8 strings
 */
public static JavaReceiverInputDStream<String> createStream(
    JavaStreamingContext jssc,
    String brokerUrl,
    String topic,
    StorageLevel storageLevel
);

Usage Example:
import org.apache.spark.storage.StorageLevel;
// With default storage level
JavaReceiverInputDStream<String> mqttStream =
    MQTTUtils.createStream(jssc, "tcp://broker:1883", "sensors/data");

// With custom storage level
JavaReceiverInputDStream<String> mqttStreamCustom =
    MQTTUtils.createStream(
        jssc,
        "tcp://broker:1883",
        "sensors/data",
        StorageLevel.MEMORY_ONLY_2()
    );

Creates an MQTT input stream for Python applications using PySpark.
@staticmethod
def createStream(ssc, brokerUrl, topic, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
    """
    Create an input stream that pulls messages from a MQTT Broker.

    Args:
        ssc: StreamingContext object
        brokerUrl: Url of remote MQTT publisher
        topic: Topic name to subscribe to
        storageLevel: RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2

    Returns:
        DStream: A DStream object containing MQTT messages as UTF-8 strings
    """

Usage Example:
from pyspark.storagelevel import StorageLevel
# With default storage level
mqtt_stream = MQTTUtils.createStream(ssc, "tcp://broker:1883", "sensors/data")
# With custom storage level
mqtt_stream_custom = MQTTUtils.createStream(
    ssc,
    "tcp://broker:1883",
    "sensors/data",
    StorageLevel.MEMORY_ONLY_2
)

/**
 * Scala DStream that receives MQTT messages as UTF-8 encoded strings.
 * Extends Spark's ReceiverInputDStream for distributed message processing.
 */
class ReceiverInputDStream[String] extends InputDStream[String]
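Because the returned stream is an ordinary DStream[String], the standard Spark Streaming transformations apply to the received messages. The following is a minimal sketch, not part of this library's API surface: it assumes an existing StreamingContext named ssc, and the broker URL and topic are placeholders.

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.mqtt.MQTTUtils

// ReceiverInputDStream[String] produced by the MQTT receiver
val mqttStream = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "sensors/data")

// Standard DStream transformations on the incoming messages
val words  = mqttStream.flatMap(_.split(" "))
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Aggregate over a 30-second window, sliding every 10 seconds
val windowed = counts.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowed.print()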
/**
 * Java wrapper for ReceiverInputDStream providing MQTT messages as UTF-8 encoded strings.
 * Provides Java-friendly API for distributed message processing.
 */
public class JavaReceiverInputDStream<String> extends JavaInputDStream<String>

"""
Python DStream that receives MQTT messages as UTF-8 encoded strings.
Extends PySpark's DStream for distributed message processing in Python applications.
"""
class DStream:
    def foreachRDD(self, func):
        """Apply a function to each RDD in the stream"""

    def transform(self, func):
        """Transform each RDD in the stream using a function"""

    def pprint(self, num=10):
        """Print the first elements of each RDD in the stream"""

/**
 * Defines how RDDs should be stored and replicated across the cluster.
 * Common values for MQTT streams:
 * - MEMORY_AND_DISK_SER_2: Default, serialized in memory with disk fallback, 2x replication
 * - MEMORY_ONLY_2: In memory only with 2x replication
 * - MEMORY_AND_DISK_2: In memory with disk fallback, 2x replication
 */
object StorageLevel {
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
}

/**
 * Main entry point for Spark Streaming functionality in Scala.
 * Used to create DStreams and manage streaming computations.
 */
class StreamingContext(
    sparkConf: SparkConf,
    batchDuration: Duration
  ) {
  def start(): Unit
  def awaitTermination(): Unit
  def stop(): Unit
}
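As a rough illustration of the lifecycle when hosting the MQTT stream, the sketch below uses awaitTerminationOrTimeout and the two-argument stop, which are standard StreamingContext methods in Spark 1.6 not listed above; the application name, broker URL, and topic are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils

val conf = new SparkConf().setAppName("MQTTLifecycleExample")
val ssc = new StreamingContext(conf, Seconds(2))

// Define all streams before starting the context
val stream = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "sensors/data")
stream.print()

ssc.start()
// Run for up to 60 seconds, then shut down gracefully,
// stopping the underlying SparkContext as well
ssc.awaitTerminationOrTimeout(60000)
ssc.stop(stopSparkContext = true, stopGracefully = true)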
/**
 * Java API for StreamingContext, providing Java-friendly streaming functionality.
 * Main entry point for Spark Streaming in Java applications.
 */
public class JavaStreamingContext {
    public JavaStreamingContext(SparkConf conf, Duration batchDuration);
    public void start();
    public void awaitTermination();
    public void stop();
}

"""
Python API for StreamingContext, main entry point for PySpark Streaming.
Used to create DStreams and manage streaming computations in Python.
"""
class StreamingContext:
    def __init__(self, sparkContext, batchDuration):
        """Initialize with SparkContext and batch duration in seconds"""

    def start(self):
        """Start the streaming computation"""

    def awaitTermination(self):
        """Wait for streaming computation to terminate"""

    def stop(self):
        """Stop the streaming computation"""

Supported broker URL formats:
tcp://hostname:port (default port 1883)
ssl://hostname:port (default port 8883)
ws://hostname:port/path
wss://hostname:port/path

Topics can be plain names (e.g., "sensors/temperature") and may use the + (single level) and # (multi-level) wildcards:
"sensors/+/temperature" - matches sensors/room1/temperature, sensors/room2/temperature"sensors/#" - matches all topics under sensors/The MQTT receiver implements automatic error handling:
The MQTT receiver implements automatic error handling.

The Eclipse Paho dependency is automatically included when using this library.