# Receiver-based Streaming

Receiver-based streaming creates input streams that use long-running receivers to pull messages from Kafka brokers. This is the legacy approach, which relies on Zookeeper for coordination and consumer group management.

## Capabilities

### Basic Receiver Stream Creation

Creates a receiver-based stream with String key/value types.

```scala { .api }
/**
 * Create an input stream that pulls messages from Kafka Brokers using receivers.
 *
 * @param ssc StreamingContext object
 * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
 * @param groupId The group id for this consumer
 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 *               in its own thread
 * @param storageLevel Storage level to use for storing the received objects
 *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[(String, String)]
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel

val zkQuorum = "localhost:2181"
val groupId = "my-consumer-group"
val topics = Map("user-events" -> 1, "purchase-events" -> 2)

// streamingContext is an existing StreamingContext in scope
val stream = KafkaUtils.createStream(
  streamingContext, zkQuorum, groupId, topics, StorageLevel.MEMORY_AND_DISK_SER_2
)

stream.foreachRDD { rdd =>
  rdd.foreach { case (key, value) =>
    println(s"Received: $key -> $value")
  }
}
```

### Generic Receiver Stream Creation

Creates a receiver-based stream with custom key/value types and decoders.

```scala { .api }
/**
 * Create an input stream that pulls messages from Kafka Brokers using receivers
 * with custom key/value types.
 *
 * @param ssc StreamingContext object
 * @param kafkaParams Map of kafka configuration parameters
 * @param topics Map of (topic_name -> numPartitions) to consume
 * @param storageLevel Storage level to use for storing the received objects
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam U type of Kafka message key decoder
 * @tparam T type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[K]: ClassTag, T <: Decoder[V]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)]
```

**Usage Example:**

```scala
import kafka.serializer.DefaultDecoder

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-consumer-group",
  "zookeeper.connection.timeout.ms" -> "10000"
)

val topics = Map("binary-data" -> 1)

// streamingContext is an existing StreamingContext in scope
val binaryStream = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  streamingContext, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER
)

binaryStream.foreachRDD { rdd =>
  rdd.foreach { case (keyBytes, valueBytes) =>
    println(s"Key length: ${keyBytes.length}, Value length: ${valueBytes.length}")
  }
}
```
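
Custom decoders plug into the type parameters above. The sketch below models kafka 0.8's `Decoder` interface (`fromBytes`) as a local trait so it runs standalone; in real code you would extend `kafka.serializer.Decoder` instead, and the `Utf8Decoder` name is invented for illustration:

```scala
// Local stand-in for kafka.serializer.Decoder so the sketch is self-contained.
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }

// A custom decoder that turns raw Kafka bytes into UTF-8 strings.
class Utf8Decoder extends Decoder[String] {
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

val decoded = new Utf8Decoder().fromBytes("hello".getBytes("UTF-8"))
// decoded == "hello"
```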

### Java Receiver Stream API

Java-friendly API for receiver-based stream creation.

```java { .api }
/**
 * Create an input stream that pulls messages from Kafka Brokers using receivers (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
 * @param groupId The group id for this consumer
 * @param topics Map of (topic_name -> numPartitions) to consume
 * @return DStream of (Kafka message key, Kafka message value)
 */
public static JavaPairReceiverInputDStream<String, String> createStream(
    JavaStreamingContext jssc,
    String zkQuorum,
    String groupId,
    Map<String, Integer> topics
)

/**
 * Create an input stream with custom storage level (Java API).
 */
public static JavaPairReceiverInputDStream<String, String> createStream(
    JavaStreamingContext jssc,
    String zkQuorum,
    String groupId,
    Map<String, Integer> topics,
    StorageLevel storageLevel
)
```

**Java Usage Example:**

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

Map<String, Integer> topics = new HashMap<>();
topics.put("my-topic", 1);

// jssc is an existing JavaStreamingContext in scope
JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(
    jssc,
    "localhost:2181",
    "my-consumer-group",
    topics,
    StorageLevel.MEMORY_AND_DISK_SER_2()
);

stream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        System.out.println("Key: " + record._1 + ", Value: " + record._2);
    });
});
```

### Advanced Java Receiver Stream with Custom Types

Java API with custom key/value types and decoders.

```java { .api }
/**
 * Create an input stream with custom types and decoders (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param keyTypeClass Key type of DStream
 * @param valueTypeClass Value type of DStream
 * @param keyDecoderClass Type of kafka key decoder
 * @param valueDecoderClass Type of kafka value decoder
 * @param kafkaParams Map of kafka configuration parameters
 * @param topics Map of (topic_name -> numPartitions) to consume
 * @param storageLevel RDD storage level
 * @param <K> type of Kafka message key
 * @param <V> type of Kafka message value
 * @param <U> type of Kafka message key decoder
 * @param <T> type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
public static <K, V, U extends Decoder<K>, T extends Decoder<V>>
JavaPairReceiverInputDStream<K, V> createStream(
    JavaStreamingContext jssc,
    Class<K> keyTypeClass,
    Class<V> valueTypeClass,
    Class<U> keyDecoderClass,
    Class<T> valueDecoderClass,
    Map<String, String> kafkaParams,
    Map<String, Integer> topics,
    StorageLevel storageLevel
)
```

## Key Features

### Write-Ahead Logs (WAL)
- Enabled by setting `spark.streaming.receiver.writeAheadLog.enable=true`
- Ensures received data can be recovered after driver failures
- Trades throughput for reliability
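
A minimal sketch of the setting involved, shown as a plain key/value pair as it would be passed to `SparkConf.set`; the comments note the usual pairing with a non-replicated storage level, since the WAL itself already provides durability:

```scala
// Setting for a reliable (WAL-enabled) receiver deployment.
// With the WAL persisting received data, the replicated *_2 storage levels
// become redundant; StorageLevel.MEMORY_AND_DISK_SER is typically used instead.
// The WAL also requires a checkpoint directory to be set on the StreamingContext.
val walSettings = Map(
  "spark.streaming.receiver.writeAheadLog.enable" -> "true"
)
```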

### Consumer Group Management
- Automatic consumer group coordination through Zookeeper
- Offset management handled by the Kafka consumer group
- Multiple instances can share the same consumer group for load balancing
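
To illustrate the load-balancing point: when several receivers join one group, the high-level consumer spreads a topic's partitions across them. The round-robin assignment below is an illustrative sketch only, not Kafka's actual assignment algorithm:

```scala
// Illustrative only: spread 6 partitions across 3 consumers in one group.
val partitions = (0 until 6).toList
val consumers = Vector("consumer-1", "consumer-2", "consumer-3")
val assignment = partitions.groupBy(p => consumers(p % consumers.size))
// consumer-1 ends up with partitions 0 and 3
```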

### Storage Levels
Common storage levels for different reliability/performance trade-offs:

```scala
StorageLevel.MEMORY_AND_DISK_SER_2 // Default: memory + disk, serialized, replicated
StorageLevel.MEMORY_AND_DISK_SER   // Memory + disk, serialized, not replicated
StorageLevel.MEMORY_ONLY_SER_2     // Memory only, serialized, replicated
StorageLevel.DISK_ONLY_2           // Disk only, replicated
```
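
The names follow a convention: `SER` means blocks are stored serialized, and a trailing `_2` means each block is replicated to a second node. A tiny helper (ours, not part of the Spark API) makes the convention explicit:

```scala
// Hypothetical helpers that decode the storage-level naming convention.
def isReplicated(levelName: String): Boolean = levelName.endsWith("_2")
def isSerializedInMemory(levelName: String): Boolean = levelName.contains("SER")

isReplicated("MEMORY_AND_DISK_SER_2")       // true
isSerializedInMemory("MEMORY_AND_DISK_SER") // true
```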

## Configuration Parameters

### Kafka Parameters
```scala
Map(
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-consumer-group",
  "zookeeper.connection.timeout.ms" -> "10000",
  "zookeeper.session.timeout.ms" -> "10000",
  "zookeeper.sync.time.ms" -> "2000",
  "auto.commit.interval.ms" -> "1000"
)
```
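
Of these, `zookeeper.connect` and `group.id` are the ones the high-level consumer strictly requires; the rest are tuning knobs. A small pre-flight sanity check, sketched with plain collections (the check itself is ours, not part of the API):

```scala
// Hypothetical sanity check before calling KafkaUtils.createStream.
val kafkaParams = Map(
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-consumer-group"
)
val required = Seq("zookeeper.connect", "group.id")
val missing = required.filterNot(kafkaParams.contains)
require(missing.isEmpty, s"missing kafka params: ${missing.mkString(", ")}")
```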

### Topic Partition Mapping
```scala
// Map of topic name to number of threads/partitions to consume
Map(
  "topic1" -> 1, // 1 thread for topic1
  "topic2" -> 2, // 2 threads for topic2
  "topic3" -> 4  // 4 threads for topic3
)
```
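
The per-topic thread counts add up inside a single receiver, which matters for the sizing constraints discussed under Limitations; a quick sketch of the arithmetic:

```scala
// All consumer threads for one createStream call run inside one receiver.
val topics = Map("topic1" -> 1, "topic2" -> 2, "topic3" -> 4)
val threadsInReceiver = topics.values.sum // 7 threads in a single receiver
```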

## Limitations

### At-Least-Once Semantics
- The receiver-based approach provides at-least-once delivery guarantees
- Duplicate messages are possible during failures
- Use direct streaming for exactly-once semantics
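
Because redelivery can occur, downstream processing should be idempotent. A minimal dedup sketch over plain collections (the `Event` type is invented for illustration):

```scala
// At-least-once delivery can redeliver records after a failure;
// keying by a unique id and keeping one copy makes processing idempotent.
case class Event(id: String, payload: String)

val received = Seq(
  Event("a", "x"),
  Event("a", "x"), // "a" redelivered after a receiver restart
  Event("b", "y")
)
val deduped = received.groupBy(_.id).map(_._2.head).toSeq
// 2 distinct events remain
```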

### Scalability Constraints
- The number of receivers is limited by the number of cores available, since each receiver occupies one
- Each partition is consumed in its own thread within the receiver
- Receivers run on executors, consuming cluster resources
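
A rule-of-thumb sketch of the sizing constraint (the counts are illustrative, not prescriptive):

```scala
// Each receiver permanently occupies one core; the rest process batches.
val numReceivers = 2
val processingCores = 2
val minTotalCores = numReceivers + processingCores
// a "local[4]" master (or 4 executor cores) would be the minimum here
```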

### Zookeeper Dependency
- Requires Zookeeper for consumer coordination
- Adds operational complexity
- Consumer group management becomes unavailable if the Zookeeper ensemble is down

## Migration to Direct Streaming

For new applications, consider using direct streaming instead:

```scala
// Old receiver-based approach (Zookeeper-based kafkaParams)
val receiverStream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)

// New direct streaming approach
// (note: direct streaming expects "metadata.broker.list" in kafkaParams
// instead of "zookeeper.connect")
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics.keySet
)
```

Benefits of migration:
- Exactly-once semantics (when combined with idempotent or transactional output)
- Better performance and throughput
- No Zookeeper dependency for streaming
- More control over offset management