tessl/maven-org-apache-spark--spark-streaming-mqtt-2-10

Describes: mavenpkg:maven/org.apache.spark/spark-streaming-mqtt_2.10@1.6.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-mqtt-2-10@1.6.0

Spark Streaming MQTT

MQTT receiver for Apache Spark Streaming that enables real-time processing of messages from MQTT brokers. Built on the Eclipse Paho MQTT client, it provides seamless integration between IoT messaging systems and Spark's distributed streaming architecture.

Package Information

  • Package Name: spark-streaming-mqtt_2.10
  • Package Type: maven
  • Language: Scala (with Java and Python APIs)
  • Installation:

Maven:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-mqtt_2.10</artifactId>
  <version>1.6.3</version>
</dependency>

Python (PySpark):

# PySpark 1.6.x was never published to PyPI; it ships inside the
# Spark 1.6 distribution, so run your app with $SPARK_HOME/bin/spark-submit

# Add MQTT library to spark-submit
spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.10:1.6.3 your_app.py

SBT:

libraryDependencies += "org.apache.spark" %% "spark-streaming-mqtt" % "1.6.3"
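// Note: %% appends the project's Scala binary version, so this resolves
// to spark-streaming-mqtt_2.10 only when scalaVersion is a 2.10.x release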

Core Imports

Scala:

import org.apache.spark.streaming.mqtt.MQTTUtils

Java:

import org.apache.spark.streaming.mqtt.MQTTUtils;

Python:

from pyspark.streaming.mqtt import MQTTUtils

Basic Usage

Scala API

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils
import org.apache.spark.storage.StorageLevel

val sparkConf = new SparkConf().setAppName("MQTTStreamingApp")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val brokerUrl = "tcp://localhost:1883"
val topic = "temperature/sensors"

// Create MQTT input stream with default storage level
val mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)

// Process messages
mqttStream.foreachRDD { rdd =>
  rdd.collect().foreach(message => println(s"Received: $message"))
}

ssc.start()
ssc.awaitTermination()

Java API

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.mqtt.MQTTUtils;

SparkConf sparkConf = new SparkConf().setAppName("MQTTStreamingApp");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
String brokerUrl = "tcp://localhost:1883";
String topic = "temperature/sensors";

// Create MQTT input stream
JavaReceiverInputDStream<String> mqttStream = 
    MQTTUtils.createStream(jssc, brokerUrl, topic);

// Process messages
mqttStream.foreachRDD(rdd -> {
    rdd.collect().forEach(message -> 
        System.out.println("Received: " + message));
});

jssc.start();
jssc.awaitTermination();

Python API

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
from pyspark.storagelevel import StorageLevel

sc = SparkContext(appName="MQTTStreamingApp")
ssc = StreamingContext(sc, 2)  # 2 second batch interval
broker_url = "tcp://localhost:1883"
topic = "temperature/sensors"

# Create MQTT input stream with default storage level
mqtt_stream = MQTTUtils.createStream(ssc, broker_url, topic)

# Process messages
def process_rdd(rdd):
    for message in rdd.collect():
        # %-formatting keeps this runnable on the Python 2.6+ interpreters
        # supported by Spark 1.6 (f-strings require Python 3.6+)
        print("Received: %s" % message)

mqtt_stream.foreachRDD(process_rdd)

ssc.start()
ssc.awaitTermination()

Capabilities

MQTT Stream Creation (Scala API)

Creates an MQTT input stream for Scala applications with configurable storage levels.

object MQTTUtils {
  /**
   * Create an input stream that receives messages pushed by an MQTT publisher.
   * @param ssc StreamingContext object
   * @param brokerUrl URL of the remote MQTT broker
   * @param topic Topic name to subscribe to
   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
   * @return ReceiverInputDStream[String] containing MQTT messages as UTF-8 strings
   */
  def createStream(
      ssc: StreamingContext,
      brokerUrl: String,
      topic: String,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String]
}

Usage Example:

import org.apache.spark.storage.StorageLevel

// With default storage level
val mqttStream = MQTTUtils.createStream(ssc, "tcp://broker:1883", "sensors/data")

// With custom storage level
val mqttStreamCustom = MQTTUtils.createStream(
  ssc, 
  "tcp://broker:1883", 
  "sensors/data",
  StorageLevel.MEMORY_ONLY_2
)

MQTT Stream Creation (Java API - Default Storage)

Creates an MQTT input stream for Java applications with default storage level.

/**
 * Create an input stream that receives messages pushed by an MQTT publisher.
 * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
 * @param jssc JavaStreamingContext object
 * @param brokerUrl URL of the remote MQTT broker
 * @param topic Topic name to subscribe to
 * @return JavaReceiverInputDStream[String] containing MQTT messages as UTF-8 strings
 */
public static JavaReceiverInputDStream<String> createStream(
    JavaStreamingContext jssc,
    String brokerUrl,
    String topic
);

MQTT Stream Creation (Java API - Custom Storage)

Creates an MQTT input stream for Java applications with configurable storage level.

/**
 * Create an input stream that receives messages pushed by an MQTT publisher.
 * @param jssc JavaStreamingContext object
 * @param brokerUrl URL of the remote MQTT broker
 * @param topic Topic name to subscribe to
 * @param storageLevel RDD storage level
 * @return JavaReceiverInputDStream[String] containing MQTT messages as UTF-8 strings
 */
public static JavaReceiverInputDStream<String> createStream(
    JavaStreamingContext jssc,
    String brokerUrl,
    String topic,
    StorageLevel storageLevel
);

Usage Example:

import org.apache.spark.storage.StorageLevel;

// With default storage level
JavaReceiverInputDStream<String> mqttStream = 
    MQTTUtils.createStream(jssc, "tcp://broker:1883", "sensors/data");

// With custom storage level
JavaReceiverInputDStream<String> mqttStreamCustom = 
    MQTTUtils.createStream(
        jssc, 
        "tcp://broker:1883", 
        "sensors/data",
        StorageLevel.MEMORY_ONLY_2()
    );

MQTT Stream Creation (Python API)

Creates an MQTT input stream for Python applications using PySpark.

@staticmethod
def createStream(ssc, brokerUrl, topic, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
    """
    Create an input stream that pulls messages from an MQTT broker.

    Args:
        ssc: StreamingContext object
        brokerUrl: URL of the remote MQTT broker
        topic: Topic name to subscribe to
        storageLevel: RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2
    
    Returns:
        DStream: A DStream object containing MQTT messages as UTF-8 strings
    """

Usage Example:

from pyspark.storagelevel import StorageLevel

# With default storage level
mqtt_stream = MQTTUtils.createStream(ssc, "tcp://broker:1883", "sensors/data")

# With custom storage level  
mqtt_stream_custom = MQTTUtils.createStream(
    ssc, 
    "tcp://broker:1883", 
    "sensors/data",
    StorageLevel.MEMORY_ONLY_2
)

Types

ReceiverInputDStream[String]

/**
 * Scala DStream that receives MQTT messages as UTF-8 encoded strings.
 * Extends Spark's ReceiverInputDStream for distributed message processing.
 */
class ReceiverInputDStream[T] extends InputDStream[T]  // the MQTT receiver yields T = String

JavaReceiverInputDStream[String]

/**
 * Java wrapper for ReceiverInputDStream providing MQTT messages as UTF-8 encoded strings.
 * Provides Java-friendly API for distributed message processing.
 */
public class JavaReceiverInputDStream<T> extends JavaInputDStream<T>  // the MQTT receiver yields T = String

DStream (Python)

"""
Python DStream that receives MQTT messages as UTF-8 encoded strings.
Extends PySpark's DStream for distributed message processing in Python applications.
"""
class DStream:
    def foreachRDD(self, func):
        """Apply a function to each RDD in the stream"""
        
    def transform(self, func):
        """Transform each RDD in the stream using a function"""
        
    def pprint(self, num=10):
        """Print the first num elements of each RDD in the stream"""

StorageLevel

/**
 * Defines how RDDs should be stored and replicated across the cluster.
 * Common values for MQTT streams:
 * - MEMORY_AND_DISK_SER_2: Default, serialized in memory with disk fallback, 2x replication
 * - MEMORY_ONLY_2: In memory only with 2x replication  
 * - MEMORY_AND_DISK_2: In memory with disk fallback, 2x replication
 */
object StorageLevel {
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
}

StreamingContext (Scala)

/**
 * Main entry point for Spark Streaming functionality in Scala.
 * Used to create DStreams and manage streaming computations.
 */
class StreamingContext(
  sparkConf: SparkConf, 
  batchDuration: Duration
) {
  def start(): Unit
  def awaitTermination(): Unit
  def stop(): Unit
}
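
Note that stop also has an overload for graceful shutdown, which lets in-flight batches finish before the receivers are torn down. A short usage sketch:

// Drain queued batches before shutting down instead of dropping them;
// both named parameters exist on the Spark 1.6 stop() overload
ssc.stop(stopSparkContext = true, stopGracefully = true)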

JavaStreamingContext (Java)

/**
 * Java API for StreamingContext, providing Java-friendly streaming functionality.
 * Main entry point for Spark Streaming in Java applications.
 */
public class JavaStreamingContext {
  public JavaStreamingContext(SparkConf conf, Duration batchDuration);
  public void start();
  public void awaitTermination();
  public void stop();
}

StreamingContext (Python)

"""
Python API for StreamingContext, main entry point for PySpark Streaming.
Used to create DStreams and manage streaming computations in Python.
"""  
class StreamingContext:
    def __init__(self, sparkContext, batchDuration):
        """Initialize with SparkContext and batch duration in seconds"""
        
    def start(self):
        """Start the streaming computation"""
        
    def awaitTermination(self):
        """Wait for streaming computation to terminate"""
        
    def stop(self):
        """Stop the streaming computation"""

Configuration

MQTT Broker URL Format

  • TCP: tcp://hostname:port (default port 1883)
  • SSL/TLS: ssl://hostname:port (default port 8883); trust material comes from JVM-level configuration, as sketched below
  • WebSocket: ws://hostname:port/path and secure wss://hostname:port/path; support depends on the bundled Paho client version (early 1.0.x Java clients lacked WebSocket support)
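
Since createStream exposes no socket-factory or connect options, TLS trust for ssl:// URLs is typically supplied through the standard JVM properties. A minimal sketch (truststore path, password, and broker host are placeholders):

import org.apache.spark.streaming.mqtt.MQTTUtils

// Point the JVM at a truststore holding the broker's CA certificate
System.setProperty("javax.net.ssl.trustStore", "/etc/ssl/mqtt-truststore.jks")
System.setProperty("javax.net.ssl.trustStorePassword", "changeit")

// Connect over TLS on the conventional secure MQTT port
val secureStream = MQTTUtils.createStream(ssc, "ssl://broker.example.com:8883", "sensors/data")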

Topic Subscription

  • Single Topic: Exact topic name (e.g., "sensors/temperature")
  • Wildcards: MQTT supports + (single level) and # (multi-level) wildcards, as shown in the sketch after this list
    • "sensors/+/temperature" - matches sensors/room1/temperature, sensors/room2/temperature, etc.
    • "sensors/#" - matches all topics under sensors/
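
The receiver passes the topic string to the underlying Paho subscription unchanged, so wildcard topics work like exact ones. A brief sketch (broker URL and topics are illustrative):

import org.apache.spark.streaming.mqtt.MQTTUtils

// One stream per subscription: temperatures one level under sensors/
val tempStream = MQTTUtils.createStream(ssc, "tcp://broker.example.com:1883", "sensors/+/temperature")

// Count messages per batch across the whole sensors/ hierarchy
val allSensors = MQTTUtils.createStream(ssc, "tcp://broker.example.com:1883", "sensors/#")
allSensors.count().print()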

Storage Level Guidelines

  • MEMORY_AND_DISK_SER_2: Default, best for most use cases with fault tolerance
  • MEMORY_ONLY_2: Faster but no disk fallback, use when memory is sufficient
  • MEMORY_AND_DISK_2: Non-serialized, uses more memory but avoids deserialization cost on access

Error Handling

The MQTT receiver implements automatic error handling:

  • Connection Loss: Automatically attempts to reconnect to the MQTT broker
  • Message Delivery: Stores messages according to the specified StorageLevel for fault tolerance
  • Receiver Restart: On connection failure, the receiver restarts and re-establishes subscription
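
The reconnect logic lives inside the receiver and is not configurable through this API. What you can add at the application level is checkpointing plus the receiver write-ahead log, so buffered messages survive driver or receiver failures. A minimal sketch (the checkpoint path is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils

val conf = new SparkConf()
  .setAppName("ResilientMQTTApp")
  // Persist received blocks to a write-ahead log before processing them
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(2))
// WAL recovery requires checkpointing on a fault-tolerant filesystem
ssc.checkpoint("hdfs:///checkpoints/mqtt-app")

val stream = MQTTUtils.createStream(ssc, "tcp://broker:1883", "sensors/#")
stream.print()

ssc.start()
ssc.awaitTermination()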

Dependencies

  • Eclipse Paho MQTT Client: v1.0.1 for MQTT protocol implementation
  • Apache Spark Streaming: Core streaming functionality
  • Apache Spark Core: Base Spark functionality

The Eclipse Paho dependency is automatically included when using this library.
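
If you need to pin a different Paho client release, the usual build-tool override applies. An SBT sketch, assuming the standard Paho coordinates org.eclipse.paho:org.eclipse.paho.client.mqttv3 (the version shown is illustrative):

libraryDependencies ++= Seq(
  // Exclude the transitive Paho client pulled in by the Spark module...
  ("org.apache.spark" %% "spark-streaming-mqtt" % "1.6.3")
    .exclude("org.eclipse.paho", "org.eclipse.paho.client.mqttv3"),
  // ...and depend on the desired version explicitly
  "org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.0.2"
)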