
# Spark Streaming ZeroMQ

Spark Streaming ZeroMQ provides Apache Spark Streaming integration with ZeroMQ, a high-performance distributed messaging library. It enables Spark applications to consume real-time data streams from ZeroMQ publishers through a publish-subscribe pattern, supporting both Scala and Java APIs for distributed stream processing.

## Package Information

- **Package Name**: spark-streaming-zeromq_2.10
- **Package Type**: Maven
- **Language**: Scala (with Java compatibility)
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-streaming-zeromq_2.10
- **Version**: 1.6.3
- **Maven Dependency**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-zeromq_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```
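For sbt builds, the same coordinates can be declared directly (the Scala 2.10 suffix is spelled out explicitly here rather than relying on `%%` and the project's `scalaVersion`):

```scala
libraryDependencies += "org.apache.spark" % "spark-streaming-zeromq_2.10" % "1.6.3"
```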

## Core Imports

### Scala Imports

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
import org.apache.spark.streaming.zeromq.ZeroMQUtils
import akka.zeromq.Subscribe
import akka.util.ByteString
```

### Java Imports

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.ActorSupervisorStrategy;
import org.apache.spark.streaming.zeromq.ZeroMQUtils;
import akka.zeromq.Subscribe;
import akka.util.ByteString;
```

## Basic Usage

### Scala Example

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.zeromq.ZeroMQUtils
import akka.util.ByteString
import akka.zeromq.Subscribe

// Create the streaming context
val sparkConf = new SparkConf().setAppName("ZeroMQApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Define ZeroMQ connection parameters
val publisherUrl = "tcp://localhost:5555"
val subscribe = Subscribe(ByteString("topic"))

// Define the message converter function
val bytesToObjects = (bytes: Seq[ByteString]) => bytes.map(_.utf8String).iterator

// Create the ZeroMQ input stream
val zmqStream = ZeroMQUtils.createStream(
  ssc,
  publisherUrl,
  subscribe,
  bytesToObjects,
  StorageLevel.MEMORY_AND_DISK_SER_2
)

// Process the stream
zmqStream.foreachRDD { rdd =>
  rdd.foreach(message => println(s"Received: $message"))
}

ssc.start()
ssc.awaitTermination()
```

### Java Example

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.zeromq.ZeroMQUtils;
import akka.util.ByteString;
import akka.zeromq.Subscribe;
import java.util.Arrays;
import java.util.stream.Collectors;

// Create the streaming context
SparkConf sparkConf = new SparkConf().setAppName("ZeroMQApp").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Define ZeroMQ connection parameters
String publisherUrl = "tcp://localhost:5555";
Subscribe subscribe = new Subscribe(ByteString.fromString("topic"));

// Define the message converter function
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
  @Override
  public Iterable<String> call(byte[][] bytes) throws Exception {
    return Arrays.stream(bytes)
        .map(String::new)
        .collect(Collectors.toList());
  }
};

// Create the ZeroMQ input stream
JavaReceiverInputDStream<String> zmqStream = ZeroMQUtils.<String>createStream(
    jssc,
    publisherUrl,
    subscribe,
    bytesToObjects,
    StorageLevel.MEMORY_AND_DISK_SER_2()
);

// Process the stream
zmqStream.foreachRDD(rdd -> {
  rdd.foreach(message -> System.out.println("Received: " + message));
});

jssc.start();
jssc.awaitTermination();
```

## Architecture

Spark Streaming ZeroMQ is built around several key components:

- **ZeroMQUtils**: Main utility object providing factory methods for creating ZeroMQ input streams
- **ZeroMQReceiver**: Internal Akka actor-based receiver that handles ZeroMQ message reception and forwards messages to Spark
- **Message Conversion**: User-defined functions that convert ZeroMQ byte sequences into typed objects
- **Fault Tolerance**: Built-in supervisor strategies for handling actor failures and ensuring reliable message processing
- **Storage Integration**: Configurable RDD storage levels for controlling data persistence and replication
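The message-conversion step can be sketched in isolation. The snippet below models ZeroMQ frames as plain `Array[Byte]` values, a stand-in for the `akka.util.ByteString` frames the real receiver delivers; with the actual API the converter has type `Seq[ByteString] => Iterator[T]`.

```scala
// Stand-in converter: decode each frame as a UTF-8 string.
// Real code receives Seq[akka.util.ByteString] and would use _.utf8String.
def framesToLines(frames: Seq[Array[Byte]]): Iterator[String] =
  frames.iterator.map(bytes => new String(bytes, "UTF-8"))

val decoded = framesToLines(Seq("alpha".getBytes("UTF-8"), "beta".getBytes("UTF-8"))).toList
println(decoded) // prints List(alpha, beta)
```

Returning an `Iterator` (rather than a materialized collection) lets Spark consume each batch of frames lazily.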

## Capabilities

### Stream Creation (Scala API)

Creates a ZeroMQ input stream for Spark Streaming with full configuration options.

```scala { .api }
/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param ssc StreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
 *                       each frame is a sequence of bytes, so a converter is needed to
 *                       translate the sequence of byte sequences into typed objects
 * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
 * @param supervisorStrategy Actor supervisor strategy for fault tolerance
 */
def createStream[T: ClassTag](
    ssc: StreamingContext,
    publisherUrl: String,
    subscribe: Subscribe,
    bytesToObjects: Seq[ByteString] => Iterator[T],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
    supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T]
```

### Stream Creation (Java API - Full Configuration)

Creates a ZeroMQ input stream for Java applications with a custom storage level and supervisor strategy.

```java { .api }
/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc JavaStreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
 *                       each frame is a sequence of bytes, so a converter is needed to
 *                       translate the sequence of byte sequences into typed objects
 * @param storageLevel Storage level to use for storing the received objects
 * @param supervisorStrategy Actor supervisor strategy for fault tolerance
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String publisherUrl,
    Subscribe subscribe,
    Function<byte[][], Iterable<T>> bytesToObjects,
    StorageLevel storageLevel,
    SupervisorStrategy supervisorStrategy
)
```

### Stream Creation (Java API - With Storage Level)

Creates a ZeroMQ input stream for Java applications with a custom storage level.

```java { .api }
/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc JavaStreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
 *                       each frame is a sequence of bytes, so a converter is needed to
 *                       translate the sequence of byte sequences into typed objects
 * @param storageLevel RDD storage level
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String publisherUrl,
    Subscribe subscribe,
    Function<byte[][], Iterable<T>> bytesToObjects,
    StorageLevel storageLevel
)
```

### Stream Creation (Java API - Basic)

Creates a ZeroMQ input stream for Java applications with default configuration.

```java { .api }
/**
 * Create an input stream that receives messages pushed by a ZeroMQ publisher.
 * @param jssc JavaStreamingContext object
 * @param publisherUrl URL of the remote ZeroMQ publisher
 * @param subscribe Topic to subscribe to
 * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
 *                       each frame is a sequence of bytes, so a converter is needed to
 *                       translate the sequence of byte sequences into typed objects
 */
public static <T> JavaReceiverInputDStream<T> createStream(
    JavaStreamingContext jssc,
    String publisherUrl,
    Subscribe subscribe,
    Function<byte[][], Iterable<T>> bytesToObjects
)
```

## Types

### Core Spark Types

```scala { .api }
// Spark configuration object
class SparkConf {
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
}

// Spark Streaming context for creating streams
class StreamingContext(sparkConf: SparkConf, batchDuration: Duration)
class StreamingContext(master: String, appName: String, batchDuration: Duration) // Deprecated

// Java wrapper for StreamingContext
class JavaStreamingContext(sparkConf: SparkConf, batchDuration: Duration)
class JavaStreamingContext(master: String, appName: String, batchDuration: Duration) // Deprecated

// Receiver-based input stream for Scala
class ReceiverInputDStream[T: ClassTag] extends InputDStream[T]

// Java wrapper for receiver input streams
class JavaReceiverInputDStream[T] extends JavaInputDStream[T]

// RDD storage configuration
object StorageLevel {
  val MEMORY_ONLY: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  // ... other storage levels
}
```

### Akka ZeroMQ Types

```scala { .api }
// ZeroMQ subscription configuration
case class Subscribe(topic: ByteString)

// Efficient immutable byte-sequence representation
class ByteString {
  def utf8String: String
  def toArray: Array[Byte]
}

// Actor supervision strategy for fault tolerance
abstract class SupervisorStrategy {
  def decider: Decider
}
```

### Function Types

```scala { .api }
// Scala converter function type
type BytesToObjects[T] = Seq[ByteString] => Iterator[T]

// Java converter function interface (org.apache.spark.api.java.function.Function)
interface Function<T, R> {
  R call(T input) throws Exception;
}
```

316

317

## Error Handling

318

319

The ZeroMQ integration includes several error handling mechanisms:

320

321

- **Actor Supervision**: Uses Akka's supervision strategies to handle receiver actor failures

322

- **Connection Failures**: Automatic reconnection attempts when ZeroMQ publisher becomes unavailable

323

- **Message Processing Errors**: Supervisor strategies can be configured to restart, resume, or stop on processing failures

324

- **Storage Failures**: RDD storage level configuration controls data replication and recovery options

325

326

**Common Error Scenarios:**

327

328

- **Network Issues**: Connection timeouts or network partitions to ZeroMQ publisher

329

- **Message Format Errors**: Invalid message formats that cannot be processed by bytesToObjects converter

330

- **Resource Exhaustion**: Memory or disk space issues during message buffering

331

- **Publisher Unavailable**: ZeroMQ publisher process crashes or becomes unreachable

332

333

**Configuration Options:**

334

335

- Use `SupervisorStrategy.defaultStrategy` for standard fault tolerance

336

- Configure custom `StorageLevel` with replication for high availability

337

- Implement robust `bytesToObjects` functions with proper error handling
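The last point can be sketched with a converter that guards its per-frame parsing, so a single malformed frame is dropped rather than failing the whole batch. As elsewhere in this sketch, `Array[Byte]` stands in for `akka.util.ByteString`, and `parseFrame` is a hypothetical application-level parser.

```scala
import scala.util.Try

// Hypothetical parser: expects each frame to carry a UTF-8 "key:value" payload
def parseFrame(bytes: Array[Byte]): (String, String) = {
  val Array(key, value) = new String(bytes, "UTF-8").split(":", 2)
  (key, value)
}

// Robust converter: Try captures per-frame failures and drops the bad frame
val bytesToPairs: Seq[Array[Byte]] => Iterator[(String, String)] =
  frames => frames.iterator.flatMap(f => Try(parseFrame(f)).toOption)

println(bytesToPairs(Seq("temp:21".getBytes("UTF-8"), "garbage".getBytes("UTF-8"))).toList)
// prints List((temp,21)) -- the unparseable frame is silently skipped
```

In production code you would typically also log dropped frames rather than discarding them silently.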