# Direct Streaming

Direct streaming creates input streams that query Kafka brokers directly, without using any receiver. Compared to receiver-based streaming, this approach offers lower latency, better throughput, and exactly-once semantics (provided your output operations are idempotent).
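A minimal end-to-end sketch of such a job follows; the broker address `localhost:9092`, the topic name `my-topic`, and the `local[2]` master are placeholders for illustration:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._

val conf = new SparkConf().setAppName("DirectQuickStart").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

// Point directly at the Kafka brokers (not ZooKeeper)
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic")
)

stream.map(_._2).print() // print message values each batch

ssc.start()
ssc.awaitTermination()
```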
## Capabilities

### Basic Direct Stream Creation

Creates a direct stream with automatic offset management.
```scala { .api }
/**
 * Create an input stream that directly pulls messages from Kafka Brokers
 * without using any receiver. This stream can guarantee that each message
 * from Kafka is included in transformations exactly once.
 *
 * @param ssc StreamingContext object
 * @param kafkaParams Kafka configuration parameters. Requires "metadata.broker.list"
 *                    or "bootstrap.servers" to be set with Kafka broker(s) specified in
 *                    host1:port1,host2:port2 form. If not starting from checkpoint,
 *                    "auto.offset.reset" may be set to "largest" or "smallest"
 * @param topics Names of the topics to consume
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag
](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Set[String]
): InputDStream[(K, V)]
```
**Usage Example:**

```scala
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092",
  "auto.offset.reset" -> "largest"
)
val topics = Set("user-events", "purchase-events")

// streamingContext is an existing StreamingContext
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topics
)

stream.foreachRDD { rdd =>
  // The cast only works on the RDD produced by the direct stream, before any transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach { case (key, value) =>
    println(s"Key: $key, Value: $value")
  }
  // Commit offsets to an external store if needed
  offsetRanges.foreach(println)
}
```
### Advanced Direct Stream with Custom Message Handler

Creates a direct stream with explicit starting offsets and custom message transformation.
```scala { .api }
/**
 * Create an input stream that directly pulls messages from Kafka Brokers
 * with explicit starting offsets and custom message handler.
 *
 * @param ssc StreamingContext object
 * @param kafkaParams Kafka configuration parameters
 * @param fromOffsets Per-topic/partition Kafka offsets defining the inclusive starting point
 * @param messageHandler Function for translating each message and metadata into desired type
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @tparam R type returned by messageHandler
 * @return DStream of R
 */
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag
](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  fromOffsets: Map[TopicAndPartition, Long],
  messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R]
```
**Usage Example:**

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder

// Start reading "events" partition 0 at offset 1000 and partition 1 at offset 2000
val fromOffsets = Map(
  TopicAndPartition("events", 0) -> 1000L,
  TopicAndPartition("events", 1) -> 2000L
)

val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
  s"${mmd.topic}:${mmd.partition}:${mmd.offset} -> ${mmd.key()}:${mmd.message()}"
}

// Reuses kafkaParams and streamingContext from the previous example
val customStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
  streamingContext, kafkaParams, fromOffsets, messageHandler
)

customStream.print()
```
### Java Direct Stream API

Java-friendly API for direct stream creation.
```java { .api }
/**
 * Create an input stream that directly pulls messages from Kafka Brokers
 * without using any receiver (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param keyClass Class of the keys in the Kafka records
 * @param valueClass Class of the values in the Kafka records
 * @param keyDecoderClass Class of the key decoder
 * @param valueDecoderClass Class of the value decoder
 * @param kafkaParams Kafka configuration parameters
 * @param topics Names of the topics to consume
 * @param <K> type of Kafka message key
 * @param <V> type of Kafka message value
 * @param <KD> type of Kafka message key decoder
 * @param <VD> type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
    JavaPairInputDStream<K, V> createDirectStream(
  JavaStreamingContext jssc,
  Class<K> keyClass,
  Class<V> valueClass,
  Class<KD> keyDecoderClass,
  Class<VD> valueDecoderClass,
  Map<String, String> kafkaParams,
  Set<String> topics
)
```
**Java Usage Example:**

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "largest");

Set<String> topics = Collections.singleton("my-topic");

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topics
);

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  rdd.foreach(record -> {
    System.out.println("Key: " + record._1 + ", Value: " + record._2);
  });
});
```
## Key Features

### Exactly-Once Semantics
- No receivers: The stream queries Kafka brokers directly
- Manual offset tracking: Offsets are tracked by the stream itself, not by ZooKeeper
- Failure recovery: Enable checkpointing on the StreamingContext to recover from driver failures
- Idempotent output: Make output operations idempotent to achieve end-to-end exactly-once semantics
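The checkpointing step above can be sketched as follows; `checkpointDir` and the `local[2]` master are placeholder values for illustration:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical checkpoint path; use a fault-tolerant store (e.g. HDFS) in production
val checkpointDir = "/tmp/kafka-direct-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir) // persists stream metadata, including Kafka offsets
  // ... create the direct stream and define transformations here ...
  ssc
}

// First run: builds a fresh context; after a driver failure: restores context and offsets
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
```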
### Rate Limiting
Configure the maximum number of messages read per partition per second with `spark.streaming.kafka.maxRatePerPartition`:

```scala
val conf = new SparkConf()
  .setAppName("KafkaDirectStream")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
```
### Offset Access
Access offset information from generated RDDs:

```scala
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { offsetRange =>
    println(s"${offsetRange.topic} ${offsetRange.partition} " +
      s"${offsetRange.fromOffset} ${offsetRange.untilOffset}")
  }
}
```
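A common pattern built on this offset access is to persist each batch's `untilOffset` values to an external store and, on restart, resume from them via the `fromOffsets` overload. The sketch below assumes a hypothetical `loadOffsets` helper standing in for whatever store you use, and placeholder broker/topic names:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._

val conf = new SparkConf().setAppName("RestartFromOffsets").setMaster("local[2]")
val streamingContext = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

// Hypothetical helper: read the last committed offsets back from your own store
def loadOffsets(): Map[TopicAndPartition, Long] =
  Map(TopicAndPartition("events", 0) -> 0L)

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  streamingContext, kafkaParams, loadOffsets(),
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
)

stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // process rdd first, then persist (topic, partition, untilOffset) for each range,
  // ideally in the same transaction as the output for exactly-once delivery
}
```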
## Configuration Parameters

### Required Parameters
```scala
Map(
  "metadata.broker.list" -> "host1:port1,host2:port2" // Kafka brokers (NOT ZooKeeper)
)
```
### Optional Parameters
```scala
Map(
  "auto.offset.reset" -> "largest",  // or "smallest"
  "group.id" -> "my-consumer-group",
  "enable.auto.commit" -> "false"    // the direct stream never commits offsets itself
)
```
## Error Handling

- **SparkException**: Thrown for connectivity issues, invalid offsets, or configuration problems
- **Offset validation**: Automatic validation that requested offsets are available on Kafka brokers
- **Leader discovery**: Automatic discovery of partition leaders with fallback handling