# Stream Creation

Core functionality for creating Kafka-backed Spark streams and RDDs with configurable location strategies and consumer strategies. Provides both streaming (DStream) and batch (RDD) interfaces for consuming Kafka data.

## Capabilities

### Direct Stream Creation (Scala)

Creates a DStream where each Kafka topic/partition corresponds to an RDD partition, enabling efficient parallel processing.

```scala { .api }
def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]
```

**Parameters:**
- `ssc`: StreamingContext - The Spark Streaming context
- `locationStrategy`: LocationStrategy - How to schedule consumers (use `LocationStrategies.PreferConsistent` in most cases)
- `consumerStrategy`: ConsumerStrategy[K, V] - How to create and configure consumers (use `ConsumerStrategies.Subscribe` in most cases)
- Returns: InputDStream[ConsumerRecord[K, V]] - Stream of Kafka consumer records

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",
  "auto.offset.reset" -> "latest"
)

val topics = Array("topic1", "topic2")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// The stream produces ConsumerRecord objects
stream.foreachRDD { rdd =>
  rdd.foreach { (record: ConsumerRecord[String, String]) =>
    println(s"Topic: ${record.topic}, Partition: ${record.partition}, " +
      s"Offset: ${record.offset}, Key: ${record.key}, Value: ${record.value}")
  }
}
```

### Direct Stream Creation with Per-Partition Config (Scala)

Creates a DStream with custom per-partition configuration for rate limiting and other settings.

```scala { .api }
def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V],
  perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]]
```

**Parameters:**
- `ssc`: StreamingContext - The Spark Streaming context
- `locationStrategy`: LocationStrategy - How to schedule consumers
- `consumerStrategy`: ConsumerStrategy[K, V] - How to create and configure consumers
- `perPartitionConfig`: PerPartitionConfig - Per-partition configuration settings
- Returns: InputDStream[ConsumerRecord[K, V]] - Stream of Kafka consumer records

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Custom per-partition configuration
class CustomPerPartitionConfig extends PerPartitionConfig {
  // Different rates for different partitions
  def maxRatePerPartition(topicPartition: TopicPartition): Long =
    if (topicPartition.topic() == "high-volume-topic") 1000 else 500

  override def minRatePerPartition(topicPartition: TopicPartition): Long = 10
}

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new CustomPerPartitionConfig()
)
```

### Direct Stream Creation (Java)

Java API for creating direct streams from Kafka.

```scala { .api }
def createDirectStream[K, V](
  jssc: JavaStreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V]
): JavaInputDStream[ConsumerRecord[K, V]]

def createDirectStream[K, V](
  jssc: JavaStreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V],
  perPartitionConfig: PerPartitionConfig
): JavaInputDStream[ConsumerRecord[K, V]]
```

**Usage Example (Java):**

```java
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

JavaStreamingContext jssc = new JavaStreamingContext(
    JavaSparkContext.fromSparkContext(spark.sparkContext()), Durations.seconds(5));

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-group");
kafkaParams.put("auto.offset.reset", "latest");

Collection<String> topics = Arrays.asList("topic1", "topic2");

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
    );
```

### RDD Creation (Scala)

Creates a batch-oriented RDD interface for consuming from Kafka with specified offset ranges for exactly-once semantics.

```scala { .api }
def createRDD[K, V](
  sc: SparkContext,
  kafkaParams: java.util.Map[String, Object],
  offsetRanges: Array[OffsetRange],
  locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]]
```

**Parameters:**
- `sc`: SparkContext - The Spark context
- `kafkaParams`: java.util.Map[String, Object] - Kafka configuration parameters (must include `bootstrap.servers`)
- `offsetRanges`: Array[OffsetRange] - Offset ranges defining the Kafka data for this RDD
- `locationStrategy`: LocationStrategy - How to schedule consumers
- Returns: RDD[ConsumerRecord[K, V]] - RDD of Kafka consumer records

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = new java.util.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.deserializer", classOf[StringDeserializer])
kafkaParams.put("value.deserializer", classOf[StringDeserializer])
kafkaParams.put("group.id", "batch-group")

// Define specific offset ranges to consume (until offsets are exclusive)
val offsetRanges = Array(
  OffsetRange("topic1", 0, 0, 1000),   // partition 0, offsets 0-999
  OffsetRange("topic1", 1, 500, 1500), // partition 1, offsets 500-1499
  OffsetRange("topic2", 0, 0, 2000)    // topic2 partition 0, offsets 0-1999
)

val rdd = KafkaUtils.createRDD[String, String](
  spark.sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process the RDD
rdd.foreach { record =>
  println(s"Consumed: ${record.key} -> ${record.value}")
}
```

### RDD Creation (Java)

Java API for creating batch-oriented RDDs from Kafka.

```scala { .api }
def createRDD[K, V](
  jsc: JavaSparkContext,
  kafkaParams: java.util.Map[String, Object],
  offsetRanges: Array[OffsetRange],
  locationStrategy: LocationStrategy
): JavaRDD[ConsumerRecord[K, V]]
```

**Usage Example (Java):**

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "batch-group");

OffsetRange[] offsetRanges = {
    OffsetRange.create("topic1", 0, 0L, 1000L),
    OffsetRange.create("topic1", 1, 500L, 1500L)
};

JavaRDD<ConsumerRecord<String, String>> rdd =
    KafkaUtils.createRDD(
        jsc,
        kafkaParams,
        offsetRanges,
        LocationStrategies.PreferConsistent()
    );
```

## Rate Control

The direct stream supports automatic rate limiting through Spark's backpressure mechanism:

- Set `spark.streaming.backpressure.enabled=true` to enable backpressure
- Set `spark.streaming.kafka.maxRatePerPartition` to limit messages per second per partition
- Set `spark.streaming.kafka.minRatePerPartition` to set minimum processing rate
- Use custom `PerPartitionConfig` for fine-grained per-partition control

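These properties can be set on the `SparkConf` before the `StreamingContext` is created. A minimal sketch (the application name and rate values are illustrative, not prescriptive):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative rate-control settings: enable backpressure and
// cap each partition at 100 records/sec, with a 10 records/sec floor
val conf = new SparkConf()
  .setAppName("kafka-rate-control")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "100")
  .set("spark.streaming.kafka.minRatePerPartition", "10")

val ssc = new StreamingContext(conf, Seconds(5))
```

With backpressure enabled, these static rates act as upper and lower bounds while the rate estimator adapts to actual processing throughput.
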
## Error Handling

The stream creation handles several Kafka-specific edge cases:

- **KAFKA-3370 Workaround**: Handles `NoOffsetForPartitionException` when `auto.offset.reset=none`
- **Parameter Validation**: Automatically fixes problematic Kafka parameters for executors
- **Consumer Caching**: Manages consumer lifecycle and caching for performance
- **Offset Management**: Ensures proper offset handling for exactly-once semantics
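
For the offset-management case, the usual pattern is to read the offset ranges from each batch's RDD and commit them back to Kafka only after processing succeeds. A sketch, assuming `stream` is the `InputDStream` created with `createDirectStream` above:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Offset ranges are only available on RDDs produced by the direct stream
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Commit the consumed offsets back to Kafka once processing has succeeded;
  // commitAsync is not transactional, so exactly-once output still requires
  // an idempotent or transactional sink
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

This keeps offset commits aligned with batch boundaries, so a restart resumes from the last fully processed batch.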