Spark Streaming MQTT

MQTT receiver for Apache Spark Streaming that enables real-time processing of messages from MQTT brokers. Built on the Eclipse Paho MQTT client, it provides seamless integration between IoT messaging systems and Spark's distributed streaming architecture.

Package Information

  • Package Name: spark-streaming-mqtt_2.10
  • Package Type: maven
  • Language: Scala (with Java and Python APIs)
  • Installation:

Maven:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-mqtt_2.10</artifactId>
  <version>1.6.3</version>
</dependency>

Python (PySpark):

# PySpark 1.6.x is not published on PyPI; use the Python API bundled with a Spark 1.6.3 installation

# Add the MQTT library when submitting your application
spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.10:1.6.3 your_app.py

SBT:

libraryDependencies += "org.apache.spark" %% "spark-streaming-mqtt" % "1.6.3"

Core Imports

Scala:

import org.apache.spark.streaming.mqtt.MQTTUtils

Java:

import org.apache.spark.streaming.mqtt.MQTTUtils;

Python:

from pyspark.streaming.mqtt import MQTTUtils

Basic Usage

Scala API

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils
import org.apache.spark.storage.StorageLevel

val sparkConf = new SparkConf().setAppName("MQTTStreamingApp")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val brokerUrl = "tcp://localhost:1883"
val topic = "temperature/sensors"

// Create MQTT input stream with default storage level
val mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)

// Process messages
mqttStream.foreachRDD { rdd =>
  rdd.collect().foreach(message => println(s"Received: $message"))
}

ssc.start()
ssc.awaitTermination()

Java API

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.mqtt.MQTTUtils;
import org.apache.spark.storage.StorageLevel;

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
String brokerUrl = "tcp://localhost:1883";
String topic = "temperature/sensors";

// Create MQTT input stream
JavaReceiverInputDStream<String> mqttStream = 
    MQTTUtils.createStream(jssc, brokerUrl, topic);

// Process messages
mqttStream.foreachRDD(rdd -> {
    rdd.collect().forEach(message -> 
        System.out.println("Received: " + message));
});

jssc.start();
jssc.awaitTermination();

Python API

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
from pyspark.storagelevel import StorageLevel

sc = SparkContext(appName="MQTTStreamingApp")
ssc = StreamingContext(sc, 2)  # 2 second batch interval
broker_url = "tcp://localhost:1883"
topic = "temperature/sensors"

# Create MQTT input stream with default storage level
mqtt_stream = MQTTUtils.createStream(ssc, broker_url, topic)

# Process messages
def process_rdd(rdd):
    messages = rdd.collect()
    for message in messages:
        print(f"Received: {message}")

mqtt_stream.foreachRDD(process_rdd)

ssc.start()
ssc.awaitTermination()

Capabilities

MQTT Stream Creation (Scala API)

Creates an MQTT input stream for Scala applications with configurable storage levels.

object MQTTUtils {
  /**
   * Create an input stream that receives messages pushed by an MQTT publisher.
   * @param ssc StreamingContext object
   * @param brokerUrl URL of the remote MQTT broker
   * @param topic Topic name to subscribe to
   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
   * @return ReceiverInputDStream[String] containing MQTT messages as UTF-8 strings
   */
  def createStream(
      ssc: StreamingContext,
      brokerUrl: String,
      topic: String,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String]
}

Usage Example:

import org.apache.spark.storage.StorageLevel

// With default storage level
val mqttStream = MQTTUtils.createStream(ssc, "tcp://broker:1883", "sensors/data")

// With custom storage level
val mqttStreamCustom = MQTTUtils.createStream(
  ssc, 
  "tcp://broker:1883", 
  "sensors/data",
  StorageLevel.MEMORY_ONLY_2
)

MQTT Stream Creation (Java API - Default Storage)

Creates an MQTT input stream for Java applications with default storage level.

/**
 * Create an input stream that receives messages pushed by an MQTT publisher.
 * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
 * @param jssc JavaStreamingContext object
 * @param brokerUrl URL of the remote MQTT broker
 * @param topic Topic name to subscribe to
 * @return JavaReceiverInputDStream[String] containing MQTT messages as UTF-8 strings
 */
public static JavaReceiverInputDStream<String> createStream(
    JavaStreamingContext jssc,
    String brokerUrl,
    String topic
);

MQTT Stream Creation (Java API - Custom Storage)

Creates an MQTT input stream for Java applications with configurable storage level.

/**
 * Create an input stream that receives messages pushed by an MQTT publisher.
 * @param jssc JavaStreamingContext object
 * @param brokerUrl URL of the remote MQTT broker
 * @param topic Topic name to subscribe to
 * @param storageLevel RDD storage level
 * @return JavaReceiverInputDStream[String] containing MQTT messages as UTF-8 strings
 */
public static JavaReceiverInputDStream<String> createStream(
    JavaStreamingContext jssc,
    String brokerUrl,
    String topic,
    StorageLevel storageLevel
);

Usage Example:

import org.apache.spark.storage.StorageLevel;

// With default storage level
JavaReceiverInputDStream<String> mqttStream = 
    MQTTUtils.createStream(jssc, "tcp://broker:1883", "sensors/data");

// With custom storage level
JavaReceiverInputDStream<String> mqttStreamCustom = 
    MQTTUtils.createStream(
        jssc, 
        "tcp://broker:1883", 
        "sensors/data",
        StorageLevel.MEMORY_ONLY_2()
    );

MQTT Stream Creation (Python API)

Creates an MQTT input stream for Python applications using PySpark.

@staticmethod
def createStream(ssc, brokerUrl, topic, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
    """
    Create an input stream that pulls messages from an MQTT broker.
    
    Args:
        ssc: StreamingContext object
        brokerUrl: URL of the remote MQTT broker
        topic: Topic name to subscribe to
        storageLevel: RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2
    
    Returns:
        DStream: A DStream object containing MQTT messages as UTF-8 strings
    """

Usage Example:

from pyspark.storagelevel import StorageLevel

# With default storage level
mqtt_stream = MQTTUtils.createStream(ssc, "tcp://broker:1883", "sensors/data")

# With custom storage level  
mqtt_stream_custom = MQTTUtils.createStream(
    ssc, 
    "tcp://broker:1883", 
    "sensors/data",
    StorageLevel.MEMORY_ONLY_2
)

Types

ReceiverInputDStream[String]

/**
 * Scala DStream that receives MQTT messages as UTF-8 encoded strings.
 * Extends Spark's ReceiverInputDStream for distributed message processing.
 */
class ReceiverInputDStream[String] extends InputDStream[String]
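
The returned stream behaves like any other DStream, so the usual transformations apply. Below is a minimal Scala sketch (broker URL and topic are illustrative, and ssc is an existing StreamingContext) that counts words across each batch of MQTT messages:

// Illustrative broker URL and topic
val messages = MQTTUtils.createStream(ssc, "tcp://broker.example.com:1883", "sensors/data")

// Standard DStream transformations on the received UTF-8 strings
val wordCounts = messages
  .flatMap(_.split("\\s+"))      // split each MQTT payload into words
  .map(word => (word, 1))
  .reduceByKey(_ + _)

wordCounts.print()               // print a sample of each batch's counts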

JavaReceiverInputDStream[String]

/**
 * Java wrapper for ReceiverInputDStream providing MQTT messages as UTF-8 encoded strings.
 * Provides Java-friendly API for distributed message processing.
 */
public class JavaReceiverInputDStream<String> extends JavaInputDStream<String>

DStream (Python)

"""
Python DStream that receives MQTT messages as UTF-8 encoded strings.
Extends PySpark's DStream for distributed message processing in Python applications.
"""
class DStream:
    def foreachRDD(self, func):
        """Apply a function to each RDD in the stream"""
        
    def transform(self, func):
        """Transform each RDD in the stream using a function"""
        
    def pprint(self, num=10):
        """Print the first num elements of each RDD in the stream"""

StorageLevel

/**
 * Defines how RDDs should be stored and replicated across the cluster.
 * Common values for MQTT streams:
 * - MEMORY_AND_DISK_SER_2: Default, serialized in memory with disk fallback, 2x replication
 * - MEMORY_ONLY_2: In memory only with 2x replication  
 * - MEMORY_AND_DISK_2: In memory with disk fallback, 2x replication
 */
object StorageLevel {
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
}

StreamingContext (Scala)

/**
 * Main entry point for Spark Streaming functionality in Scala.
 * Used to create DStreams and manage streaming computations.
 */
class StreamingContext(
  sparkConf: SparkConf, 
  batchDuration: Duration
) {
  def start(): Unit
  def awaitTermination(): Unit
  def stop(): Unit
}

JavaStreamingContext (Java)

/**
 * Java API for StreamingContext, providing Java-friendly streaming functionality.
 * Main entry point for Spark Streaming in Java applications.
 */
public class JavaStreamingContext {
  public JavaStreamingContext(SparkConf conf, Duration batchDuration);
  public void start();
  public void awaitTermination();
  public void stop();
}

StreamingContext (Python)

"""
Python API for StreamingContext, main entry point for PySpark Streaming.
Used to create DStreams and manage streaming computations in Python.
"""  
class StreamingContext:
    def __init__(self, sparkContext, batchDuration):
        """Initialize with SparkContext and batch duration in seconds"""
        
    def start(self):
        """Start the streaming computation"""
        
    def awaitTermination(self):
        """Wait for streaming computation to terminate"""
        
    def stop(self):
        """Stop the streaming computation"""

Configuration

MQTT Broker URL Format

  • TCP: tcp://hostname:port (default port 1883)
  • SSL: ssl://hostname:port (default port 8883; see the sketch below)
  • WebSocket: ws://hostname:port/path
  • Secure WebSocket: wss://hostname:port/path
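
The receiver takes only a broker URL and a topic, so when connecting over ssl:// there is no parameter here for certificates; trust settings are assumed to come from the JVM's default trust store (an assumption about the underlying Paho client's defaults). A minimal Scala sketch with an illustrative broker URL:

// Illustrative secure broker URL; TLS trust is assumed to be configured at the JVM level,
// e.g. via -Djavax.net.ssl.trustStore=... on spark-submit
val secureStream = MQTTUtils.createStream(ssc, "ssl://broker.example.com:8883", "sensors/data")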

Topic Subscription

  • Single Topic: Exact topic name (e.g., "sensors/temperature")
  • Wildcards: MQTT supports + (single level) and # (multi-level) wildcards (see the sketch after this list)
    • "sensors/+/temperature" - matches sensors/room1/temperature, sensors/room2/temperature
    • "sensors/#" - matches all topics under sensors/

Storage Level Guidelines

  • MEMORY_AND_DISK_SER_2: Default, best for most use cases with fault tolerance
  • MEMORY_ONLY_2: Faster but no disk fallback, use when memory is sufficient
  • MEMORY_AND_DISK_2: Non-serialized, uses more memory but avoids deserialization overhead when records are read

Error Handling

The MQTT receiver implements automatic error handling:

  • Connection Loss: Automatically attempts to reconnect to the MQTT broker
  • Message Delivery: Stores messages according to the specified StorageLevel for fault tolerance
  • Receiver Restart: On connection failure, the receiver restarts and re-establishes subscription
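
Receiver restarts cover broker-side failures, but exceptions thrown in your own processing code still fail the batch. One possible defensive pattern, sketched in Scala (processMessage is a hypothetical user function; the stop arguments are standard StreamingContext parameters, not part of this library):

mqttStream.foreachRDD { rdd =>
  try {
    rdd.foreach(message => processMessage(message))
  } catch {
    case e: Exception =>
      // Log and continue so a single failed batch does not terminate the application
      println(s"Failed to process batch: ${e.getMessage}")
  }
}

// On shutdown, stop gracefully so in-flight batches finish processing
// ssc.stop(stopSparkContext = true, stopGracefully = true)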

Dependencies

  • Eclipse Paho MQTT Client: v1.0.1 for MQTT protocol implementation
  • Apache Spark Streaming: Core streaming functionality
  • Apache Spark Core: Base Spark functionality

The Eclipse Paho dependency is automatically included when using this library.
