# Receiver-based Streaming

Receiver-based streaming creates input streams that use long-running receivers to pull messages from Kafka brokers. This is the legacy approach that relies on Zookeeper for coordination and consumer group management.

## Capabilities

### Basic Receiver Stream Creation

Creates a receiver-based stream with String key/value types.

```scala { .api }
/**
 * Create an input stream that pulls messages from Kafka Brokers using receivers.
 *
 * @param ssc StreamingContext object
 * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
 * @param groupId The group id for this consumer
 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 *               in its own thread
 * @param storageLevel Storage level to use for storing the received objects
 *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[(String, String)]
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel

val zkQuorum = "localhost:2181"
val groupId = "my-consumer-group"
val topics = Map("user-events" -> 1, "purchase-events" -> 2)

val stream = KafkaUtils.createStream(
  streamingContext, zkQuorum, groupId, topics, StorageLevel.MEMORY_AND_DISK_SER_2
)

stream.foreachRDD { rdd =>
  rdd.foreach { case (key, value) =>
    println(s"Received: $key -> $value")
  }
}
```

### Generic Receiver Stream Creation

Creates a receiver-based stream with custom key/value types and decoders.

```scala { .api }
/**
 * Create an input stream that pulls messages from Kafka Brokers using receivers
 * with custom key/value types.
 *
 * @param ssc StreamingContext object
 * @param kafkaParams Map of kafka configuration parameters
 * @param topics Map of (topic_name -> numPartitions) to consume
 * @param storageLevel Storage level to use for storing the received objects
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam U type of Kafka message key decoder
 * @tparam T type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[K]: ClassTag, T <: Decoder[V]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[(K, V)]
```

**Usage Example:**

```scala
import kafka.serializer.DefaultDecoder

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-consumer-group",
  "zookeeper.connection.timeout.ms" -> "10000"
)

val topics = Map("binary-data" -> 1)

val binaryStream = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  streamingContext, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER
)

binaryStream.foreachRDD { rdd =>
  rdd.foreach { case (keyBytes, valueBytes) =>
    println(s"Key length: ${keyBytes.length}, Value length: ${valueBytes.length}")
  }
}
```

### Java Receiver Stream API

Java-friendly API for receiver-based stream creation.

```java { .api }
/**
 * Create an input stream that pulls messages from Kafka Brokers using receivers (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
 * @param groupId The group id for this consumer
 * @param topics Map of (topic_name -> numPartitions) to consume
 * @return DStream of (Kafka message key, Kafka message value)
 */
public static JavaPairReceiverInputDStream<String, String> createStream(
    JavaStreamingContext jssc,
    String zkQuorum,
    String groupId,
    Map<String, Integer> topics
)

/**
 * Create an input stream with custom storage level (Java API).
 */
public static JavaPairReceiverInputDStream<String, String> createStream(
    JavaStreamingContext jssc,
    String zkQuorum,
    String groupId,
    Map<String, Integer> topics,
    StorageLevel storageLevel
)
```

**Java Usage Example:**

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

Map<String, Integer> topics = new HashMap<>();
topics.put("my-topic", 1);

JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(
    jssc,
    "localhost:2181",
    "my-consumer-group",
    topics,
    StorageLevel.MEMORY_AND_DISK_SER_2()
);

stream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        System.out.println("Key: " + record._1 + ", Value: " + record._2);
    });
});
```

### Advanced Java Receiver Stream with Custom Types

Java API with custom key/value types and decoders.

```java { .api }
/**
 * Create an input stream with custom types and decoders (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param keyTypeClass Key type of DStream
 * @param valueTypeClass Value type of DStream
 * @param keyDecoderClass Type of kafka key decoder
 * @param valueDecoderClass Type of kafka value decoder
 * @param kafkaParams Map of kafka configuration parameters
 * @param topics Map of (topic_name -> numPartitions) to consume
 * @param storageLevel RDD storage level
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam U type of Kafka message key decoder
 * @tparam T type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
public static <K, V, U extends Decoder<K>, T extends Decoder<V>>
    JavaPairReceiverInputDStream<K, V> createStream(
        JavaStreamingContext jssc,
        Class<K> keyTypeClass,
        Class<V> valueTypeClass,
        Class<U> keyDecoderClass,
        Class<T> valueDecoderClass,
        Map<String, String> kafkaParams,
        Map<String, Integer> topics,
        StorageLevel storageLevel
    )
```
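**Java Usage Example** (a sketch mirroring the Scala generic example above; the topic name is illustrative and the `jssc` context is assumed from the earlier examples):

```java
import java.util.HashMap;
import java.util.Map;

import kafka.serializer.DefaultDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "localhost:2181");
kafkaParams.put("group.id", "my-consumer-group");

Map<String, Integer> topics = new HashMap<>();
topics.put("binary-data", 1);

// byte[] keys and values, decoded with Kafka's pass-through DefaultDecoder
JavaPairReceiverInputDStream<byte[], byte[]> binaryStream = KafkaUtils.createStream(
    jssc,
    byte[].class,
    byte[].class,
    DefaultDecoder.class,
    DefaultDecoder.class,
    kafkaParams,
    topics,
    StorageLevel.MEMORY_AND_DISK_SER()
);
```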

## Key Features

### Write-Ahead Logs (WAL)

- Enabled by setting the configuration `spark.streaming.receiver.writeAheadLog.enable=true`
- Ensures received data can be recovered after driver failures
- Trades some performance for reliability
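A minimal sketch of enabling the WAL (the application name and checkpoint path are illustrative; the WAL requires a fault-tolerant checkpoint directory such as HDFS):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-receiver-app")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(5))
// Received blocks are logged to the checkpoint directory before being acknowledged.
ssc.checkpoint("hdfs:///checkpoints/kafka-receiver-app")
```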

### Consumer Group Management

- Automatic consumer group coordination through Zookeeper
- Offset management handled by the Kafka consumer group
- Multiple instances can share the same consumer group for load balancing
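Because each `createStream` call starts a single receiver, a common pattern for higher receiving parallelism is to create several streams in the same consumer group and union them (a sketch; the stream count and topic map are illustrative):

```scala
val numStreams = 3
val kafkaStreams = (1 to numStreams).map { _ =>
  KafkaUtils.createStream(ssc, "localhost:2181", "my-consumer-group", Map("user-events" -> 1))
}
// Each stream runs its own receiver; union merges them into one DStream for processing.
val unifiedStream = ssc.union(kafkaStreams)
```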

### Storage Levels

Common storage levels for different reliability/performance trade-offs:

```scala
StorageLevel.MEMORY_AND_DISK_SER_2  // Default: memory + disk, serialized, replicated
StorageLevel.MEMORY_AND_DISK_SER    // Memory + disk, serialized, not replicated
StorageLevel.MEMORY_ONLY_SER_2      // Memory only, serialized, replicated
StorageLevel.DISK_ONLY_2            // Disk only, replicated
```

## Configuration Parameters

### Kafka Parameters

```scala
Map(
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-consumer-group",
  "zookeeper.connection.timeout.ms" -> "10000",
  "zookeeper.session.timeout.ms" -> "10000",
  "zookeeper.sync.time.ms" -> "2000",
  "auto.commit.interval.ms" -> "1000"
)
```

### Topic Partition Mapping

```scala
// Map of topic name to the number of consumer threads for that topic
Map(
  "topic1" -> 1, // 1 thread for topic1
  "topic2" -> 2, // 2 threads for topic2
  "topic3" -> 4  // 4 threads for topic3
)
```

## Limitations

### At-Least-Once Semantics

- The receiver-based approach provides at-least-once delivery guarantees
- Duplicate messages are possible during failures
- Use direct streaming for exactly-once semantics

### Scalability Constraints

- The number of partitions that can be consumed in parallel is limited by the number of cores available
- Each partition is consumed in a separate thread
- The receiver runs on an executor, consuming cluster resources
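To make the constraint concrete (the topic map is hypothetical): a receiver needs one thread per mapped partition, and the application must be allocated more cores than it runs receivers, or no cores remain for processing batches:

```scala
val topics = Map("topic1" -> 1, "topic2" -> 2, "topic3" -> 4)
val receiverThreads = topics.values.sum // 7 consumer threads inside one receiver

// Rule of thumb: cores allocated to the application must exceed the number of
// receivers, otherwise data is received but never processed.
val numReceivers = 1
val minCores = numReceivers + 1
```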

### Zookeeper Dependency

- Requires Zookeeper for consumer coordination
- Additional operational complexity
- Single point of failure for consumer group management

## Migration to Direct Streaming

For new applications, consider using direct streaming instead:

```scala
// Old receiver-based approach
val receiverStream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)

// New direct streaming approach
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics.keySet
)
```

Benefits of migration:

- Exactly-once semantics
- Better performance and throughput
- No Zookeeper dependency for streaming
- More control over offset management