# Stream Creation
The stream creation capability provides methods for creating direct Kafka DStreams that support exactly-once semantics and high-performance real-time data processing. These DStreams automatically handle the consumer lifecycle, offset tracking, and integration with Spark's rate limiting mechanisms.

## Core Functions

### createDirectStream (Basic)

Creates a DStream where each Kafka topic/partition corresponds to an RDD partition, with the default rate limiting configuration.

```scala { .api }
def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]
```

**Parameters:**

- `ssc`: StreamingContext - The Spark Streaming context
- `locationStrategy`: LocationStrategy - Consumer placement strategy (use `LocationStrategies.PreferConsistent` in most cases)
- `consumerStrategy`: ConsumerStrategy[K, V] - Consumer configuration strategy (use `ConsumerStrategies.Subscribe` in most cases)

**Returns:** InputDStream[ConsumerRecord[K, V]] - Direct Kafka input stream

### createDirectStream (With Configuration)

Creates a DStream with custom per-partition configuration for advanced rate limiting and performance tuning.

```scala { .api }
def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V],
  perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]]
```

**Parameters:**

- `ssc`: StreamingContext - The Spark Streaming context
- `locationStrategy`: LocationStrategy - Consumer placement strategy
- `consumerStrategy`: ConsumerStrategy[K, V] - Consumer configuration strategy
- `perPartitionConfig`: PerPartitionConfig - Custom per-partition configuration

**Returns:** InputDStream[ConsumerRecord[K, V]] - Direct Kafka input stream with custom configuration

## Java API

### createDirectStream (Java - Basic)

Java version of the basic stream creation method.

```java { .api }
public static <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream(
  JavaStreamingContext jssc,
  LocationStrategy locationStrategy,
  ConsumerStrategy<K, V> consumerStrategy
)
```

### createDirectStream (Java - With Configuration)

Java version with custom per-partition configuration.

```java { .api }
public static <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream(
  JavaStreamingContext jssc,
  LocationStrategy locationStrategy,
  ConsumerStrategy<K, V> consumerStrategy,
  PerPartitionConfig perPartitionConfig
)
```

## Usage Examples

### Basic Stream Creation

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("orders", "payments", "users")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Process the stream (println runs on the executors,
// so output appears in the executor logs, not on the driver)
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.foreach { record =>
      println(s"Topic: ${record.topic}, Key: ${record.key}, Value: ${record.value}")
    }
  }
}
```

### Stream with Custom Rate Limiting

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Custom per-partition configuration
class CustomPerPartitionConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    topicPartition.topic() match {
      case "high-volume-topic" => 1000 // Higher rate for high-volume topic
      case _ => 500 // Default rate
    }
  }
}

val customConfig = new CustomPerPartitionConfig()

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  customConfig
)
```

### Java Stream Creation

```java
import org.apache.spark.streaming.kafka010.*;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "spark-streaming-group");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topic1", "topic2");
JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    javaStreamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.Subscribe(topics, kafkaParams)
  );

stream.foreachRDD(rdd -> {
  rdd.foreach(record -> {
    System.out.println("Key: " + record.key() + ", Value: " + record.value());
  });
});
```

## Configuration Notes

### Required Kafka Parameters

- `bootstrap.servers`: Kafka broker addresses (required)
- `key.deserializer`: Key deserializer class (required)
- `value.deserializer`: Value deserializer class (required)
- `group.id`: Consumer group ID (required for Subscribe-based consumer strategies)


### Recommended Kafka Parameters

- `enable.auto.commit`: Set to `false` for manual offset management
- `auto.offset.reset`: Set to "latest" or "earliest" based on requirements


### Rate Limiting

The configuration `spark.streaming.kafka.maxRatePerPartition` controls the maximum number of messages per second read from each partition. Set it to 0 (the default) for an unlimited rate.
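
As a sketch, the limit can be set on the `SparkConf` before the `StreamingContext` is created; the application name and batch interval below are illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cap each Kafka partition at 1000 records per second
val conf = new SparkConf()
  .setAppName("kafka-rate-limited") // illustrative name
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

// With a 5-second batch interval, each partition then contributes
// at most 5 * 1000 = 5000 records to any single batch
val ssc = new StreamingContext(conf, Seconds(5))
```

Note that the effective per-batch volume is the rate multiplied by the batch interval, summed over all partitions.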

## Important Notes

- All stream creation methods are marked as `@Experimental` in Spark 2.4.8
- The direct approach provides exactly-once semantics when combined with proper offset management
- Consumer instances are automatically managed and cached for performance
- Supports both Scala and Java APIs with appropriate type conversions
- DStreams created with these methods implement both `HasOffsetRanges` and `CanCommitOffsets` interfaces
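
Because the returned stream implements both interfaces, offsets can be read per batch and committed back to Kafka after processing. A minimal sketch, assuming `stream` was created as in the examples above:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // The cast succeeds only on the RDD produced directly by the stream,
  // before any transformation changes the RDD type
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the records in rdd ...

  // Commit the consumed offsets back to Kafka asynchronously;
  // the commit is enqueued and sent on a later batch cycle
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

Committing after processing completes yields at-least-once delivery; exactly-once additionally requires idempotent or transactional output.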