
# Spark Streaming MQTT

MQTT receiver for Apache Spark Streaming that enables real-time processing of messages from MQTT brokers. Built on the Eclipse Paho MQTT client, it provides seamless integration between IoT messaging systems and Spark's distributed streaming architecture.

## Package Information

- **Package Name**: spark-streaming-mqtt_2.10
- **Package Type**: maven
- **Language**: Scala (with Java and Python APIs)
- **Installation**:

**Maven:**

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-mqtt_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```

**Python (PySpark):**

```bash
# PySpark 1.6.x is not published on PyPI; use the pyspark that ships
# with a Spark 1.6.3 distribution, then add the MQTT library when
# submitting your application:
spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.10:1.6.3 your_app.py
```

**SBT:**

```scala
libraryDependencies += "org.apache.spark" %% "spark-streaming-mqtt" % "1.6.3"
```

## Core Imports

Scala:

```scala
import org.apache.spark.streaming.mqtt.MQTTUtils
```

Java:

```java
import org.apache.spark.streaming.mqtt.MQTTUtils;
```

Python:

```python
from pyspark.streaming.mqtt import MQTTUtils
```

## Basic Usage

### Scala API

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils

val sparkConf = new SparkConf().setAppName("MQTTStreamingApp")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val brokerUrl = "tcp://localhost:1883"
val topic = "temperature/sensors"

// Create MQTT input stream with default storage level
val mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)

// Process messages
mqttStream.foreachRDD { rdd =>
  rdd.collect().foreach(message => println(s"Received: $message"))
}

ssc.start()
ssc.awaitTermination()
```

### Java API

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.mqtt.MQTTUtils;

SparkConf sparkConf = new SparkConf().setAppName("MQTTStreamingApp");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
String brokerUrl = "tcp://localhost:1883";
String topic = "temperature/sensors";

// Create MQTT input stream with default storage level
JavaReceiverInputDStream<String> mqttStream =
    MQTTUtils.createStream(jssc, brokerUrl, topic);

// Process messages
mqttStream.foreachRDD(rdd -> {
    rdd.collect().forEach(message ->
        System.out.println("Received: " + message));
});

jssc.start();
jssc.awaitTermination();
```

### Python API

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext(appName="MQTTStreamingApp")
ssc = StreamingContext(sc, 2)  # 2 second batch interval
broker_url = "tcp://localhost:1883"
topic = "temperature/sensors"

# Create MQTT input stream with default storage level
mqtt_stream = MQTTUtils.createStream(ssc, broker_url, topic)

# Process messages
def process_rdd(rdd):
    for message in rdd.collect():
        print("Received: {}".format(message))

mqtt_stream.foreachRDD(process_rdd)

ssc.start()
ssc.awaitTermination()
```

## Capabilities

### MQTT Stream Creation (Scala API)

Creates an MQTT input stream for Scala applications with a configurable storage level.

```scala { .api }
object MQTTUtils {
  /**
   * Create an input stream that receives messages pushed by an MQTT publisher.
   * @param ssc StreamingContext object
   * @param brokerUrl URL of the remote MQTT publisher
   * @param topic Topic name to subscribe to
   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
   * @return ReceiverInputDStream[String] containing MQTT messages as UTF-8 strings
   */
  def createStream(
      ssc: StreamingContext,
      brokerUrl: String,
      topic: String,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String]
}
```

**Usage Example:**

```scala
import org.apache.spark.storage.StorageLevel

// With default storage level
val mqttStream = MQTTUtils.createStream(ssc, "tcp://broker:1883", "sensors/data")

// With custom storage level
val mqttStreamCustom = MQTTUtils.createStream(
  ssc,
  "tcp://broker:1883",
  "sensors/data",
  StorageLevel.MEMORY_ONLY_2
)
```

### MQTT Stream Creation (Java API - Default Storage)

Creates an MQTT input stream for Java applications with the default storage level.

```java { .api }
/**
 * Create an input stream that receives messages pushed by an MQTT publisher.
 * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
 * @param jssc JavaStreamingContext object
 * @param brokerUrl URL of the remote MQTT publisher
 * @param topic Topic name to subscribe to
 * @return JavaReceiverInputDStream<String> containing MQTT messages as UTF-8 strings
 */
public static JavaReceiverInputDStream<String> createStream(
    JavaStreamingContext jssc,
    String brokerUrl,
    String topic
);
```

### MQTT Stream Creation (Java API - Custom Storage)

Creates an MQTT input stream for Java applications with a configurable storage level.

```java { .api }
/**
 * Create an input stream that receives messages pushed by an MQTT publisher.
 * @param jssc JavaStreamingContext object
 * @param brokerUrl URL of the remote MQTT publisher
 * @param topic Topic name to subscribe to
 * @param storageLevel RDD storage level
 * @return JavaReceiverInputDStream<String> containing MQTT messages as UTF-8 strings
 */
public static JavaReceiverInputDStream<String> createStream(
    JavaStreamingContext jssc,
    String brokerUrl,
    String topic,
    StorageLevel storageLevel
);
```

**Usage Example:**

```java
import org.apache.spark.storage.StorageLevel;

// With default storage level
JavaReceiverInputDStream<String> mqttStream =
    MQTTUtils.createStream(jssc, "tcp://broker:1883", "sensors/data");

// With custom storage level
JavaReceiverInputDStream<String> mqttStreamCustom =
    MQTTUtils.createStream(
        jssc,
        "tcp://broker:1883",
        "sensors/data",
        StorageLevel.MEMORY_ONLY_2()
    );
```

### MQTT Stream Creation (Python API)

Creates an MQTT input stream for Python applications using PySpark.

```python { .api }
@staticmethod
def createStream(ssc, brokerUrl, topic, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
    """
    Create an input stream that pulls messages from an MQTT broker.

    Args:
        ssc: StreamingContext object
        brokerUrl: URL of the remote MQTT publisher
        topic: Topic name to subscribe to
        storageLevel: RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2

    Returns:
        DStream: A DStream object containing MQTT messages as UTF-8 strings
    """
```

**Usage Example:**

```python
from pyspark.storagelevel import StorageLevel

# With default storage level
mqtt_stream = MQTTUtils.createStream(ssc, "tcp://broker:1883", "sensors/data")

# With custom storage level
mqtt_stream_custom = MQTTUtils.createStream(
    ssc,
    "tcp://broker:1883",
    "sensors/data",
    StorageLevel.MEMORY_ONLY_2
)
```

## Types

### ReceiverInputDStream[String]

```scala { .api }
/**
 * Scala DStream that receives MQTT messages as UTF-8 encoded strings.
 * The class itself is generic; the MQTT receiver returns it with T = String.
 * Extends Spark's InputDStream for distributed message processing.
 */
abstract class ReceiverInputDStream[T] extends InputDStream[T]
```

279

280

### JavaReceiverInputDStream[String]

281

```java { .api }

282

/**

283

* Java wrapper for ReceiverInputDStream providing MQTT messages as UTF-8 encoded strings.

284

* Provides Java-friendly API for distributed message processing.

285

*/

286

public class JavaReceiverInputDStream<String> extends JavaInputDStream<String>

287

```

### DStream (Python)

```python { .api }
class DStream:
    """
    Python DStream that receives MQTT messages as UTF-8 encoded strings.
    Extends PySpark's DStream for distributed message processing in Python applications.
    """

    def foreachRDD(self, func):
        """Apply a function to each RDD in the stream"""

    def transform(self, func):
        """Transform each RDD in the stream using a function"""

    def pprint(self, num=10):
        """Print the first num elements of each RDD in the stream"""
```

### StorageLevel

```scala { .api }
/**
 * Defines how RDDs should be stored and replicated across the cluster.
 * Common values for MQTT streams:
 * - MEMORY_AND_DISK_SER_2: Default; serialized in memory with disk fallback, 2x replication
 * - MEMORY_ONLY_2: In memory only with 2x replication
 * - MEMORY_AND_DISK_2: In memory with disk fallback, 2x replication
 */
object StorageLevel {
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
}
```

### StreamingContext (Scala)

```scala { .api }
/**
 * Main entry point for Spark Streaming functionality in Scala.
 * Used to create DStreams and manage streaming computations.
 */
class StreamingContext(
    sparkConf: SparkConf,
    batchDuration: Duration
) {
  def start(): Unit
  def awaitTermination(): Unit
  def stop(): Unit
}
```

### JavaStreamingContext (Java)

```java { .api }
/**
 * Java API for StreamingContext, providing Java-friendly streaming functionality.
 * Main entry point for Spark Streaming in Java applications.
 */
public class JavaStreamingContext {
    public JavaStreamingContext(SparkConf conf, Duration batchDuration);
    public void start();
    public void awaitTermination();
    public void stop();
}
```

### StreamingContext (Python)

```python { .api }
class StreamingContext:
    """
    Python API for StreamingContext, main entry point for PySpark Streaming.
    Used to create DStreams and manage streaming computations in Python.
    """

    def __init__(self, sparkContext, batchDuration):
        """Initialize with SparkContext and batch duration in seconds"""

    def start(self):
        """Start the streaming computation"""

    def awaitTermination(self):
        """Wait for the streaming computation to terminate"""

    def stop(self):
        """Stop the streaming computation"""
```

## Configuration

### MQTT Broker URL Format

- **TCP**: `tcp://hostname:port` (default port 1883)
- **SSL**: `ssl://hostname:port` (default port 8883)
- **WebSocket**: `ws://hostname:port/path`
- **Secure WebSocket**: `wss://hostname:port/path`
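
As a quick illustration of these scheme and default-port conventions, here is a small hypothetical helper (not part of the library) that resolves the effective port of a broker URL; the `ws`/`wss` defaults below are the usual HTTP/HTTPS ports and are an assumption:

```python
from urllib.parse import urlparse

# Conventional default ports per scheme (ws/wss assumed to follow
# the HTTP/HTTPS defaults)
DEFAULT_PORTS = {"tcp": 1883, "ssl": 8883, "ws": 80, "wss": 443}

def broker_port(url):
    """Return the explicit port of a broker URL, or the scheme's
    conventional default when none is given."""
    parsed = urlparse(url)
    if parsed.port is not None:
        return parsed.port
    return DEFAULT_PORTS.get(parsed.scheme)

print(broker_port("tcp://localhost:1883"))       # 1883
print(broker_port("ssl://broker.example.com"))   # 8883
```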

### Topic Subscription

- **Single Topic**: Exact topic name (e.g., `"sensors/temperature"`)
- **Wildcards**: MQTT supports `+` (single level) and `#` (multi-level) wildcards
  - `"sensors/+/temperature"` matches `sensors/room1/temperature` and `sensors/room2/temperature`
  - `"sensors/#"` matches all topics under `sensors/`
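
The wildcard semantics above can be sketched with a small illustrative matcher. This helper is not part of the library (subscription matching is performed by the MQTT broker); it only demonstrates how `+` and `#` behave:

```python
def topic_matches(pattern, topic):
    """Illustrative MQTT topic-filter matcher: '+' matches exactly one
    level, and '#' (valid only as the final level) matches any number
    of remaining levels."""
    p_levels = pattern.split("/")
    t_levels = topic.split("/")
    for i, p in enumerate(p_levels):
        if p == "#":
            return i == len(p_levels) - 1  # '#' must be the last level
        if i >= len(t_levels):
            return False                   # topic is shorter than the filter
        if p != "+" and p != t_levels[i]:
            return False                   # literal level must match exactly
    return len(p_levels) == len(t_levels)  # '+' never spans extra levels

print(topic_matches("sensors/+/temperature", "sensors/room1/temperature"))  # True
print(topic_matches("sensors/#", "sensors/room1/humidity"))                 # True
print(topic_matches("sensors/+", "sensors/room1/humidity"))                 # False
```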


### Storage Level Guidelines

- **MEMORY_AND_DISK_SER_2**: Default; best for most use cases, with fault tolerance via 2x replication
- **MEMORY_ONLY_2**: Faster but no disk fallback; use when memory is sufficient
- **MEMORY_AND_DISK_2**: Non-serialized; uses more memory but avoids deserialization overhead


## Error Handling

The MQTT receiver implements automatic error handling:

- **Connection Loss**: Automatically attempts to reconnect to the MQTT broker
- **Message Delivery**: Stores messages according to the specified StorageLevel for fault tolerance
- **Receiver Restart**: On connection failure, the receiver restarts and re-establishes the subscription
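
The restart behavior can be pictured with a simplified sketch. This is illustrative only: the real logic lives inside Spark's receiver supervisor, and the function names here are hypothetical:

```python
import time

def receive_with_restart(connect_and_receive, max_attempts=5, base_delay=0.01):
    """Retry a failing connect/subscribe call with exponential backoff,
    mimicking how a receiver is restarted after a connection failure."""
    delay = base_delay
    for attempt in range(max_attempts):
        try:
            # Connect to the broker, subscribe, and start receiving
            return connect_and_receive()
        except ConnectionError:
            time.sleep(delay)  # back off before re-establishing the subscription
            delay *= 2
    raise RuntimeError("broker unreachable after {} attempts".format(max_attempts))
```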


## Dependencies

- **Eclipse Paho MQTT Client**: v1.0.1, used for the MQTT protocol implementation
- **Apache Spark Streaming**: Core streaming functionality
- **Apache Spark Core**: Base Spark functionality

The Eclipse Paho dependency is automatically included when using this library.