# Apache Spark Streaming Kafka Assembly

Apache Spark Streaming Kafka Assembly provides integration between Apache Kafka and Spark Streaming, enabling near-real-time processing of data from Kafka topics. This shaded ("uber") JAR bundles all necessary dependencies to avoid version conflicts on the classpath, and supports both the receiver-based and the direct (no-receiver) streaming approaches; the direct approach enables exactly-once processing semantics when the application tracks offsets itself.

## Package Information

- **Package Name**: spark-streaming-kafka-assembly_2.10
- **Package Type**: maven
- **Language**: Scala (with Java API)
- **Installation**: add `spark-streaming-kafka-assembly_2.10-1.6.3.jar` to the classpath (e.g. via `spark-submit --jars`)
- **Maven Coordinates**: `org.apache.spark:spark-streaming-kafka-assembly_2.10:1.6.3`
## Core Imports

### Scala
```scala
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.{StringDecoder, DefaultDecoder}
```

### Java
```java
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.kafka.Broker;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
```
## Basic Usage

### Direct Streaming (Recommended)
```scala
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092",
  "auto.offset.reset" -> "largest"
)
val topics = Set("my-topic")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topics
)

stream.foreachRDD { rdd =>
  // Capture offset ranges on the unmodified RDD, before any transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Process the data
  rdd.foreach(println)
}
```
### Receiver-based Streaming (Legacy)
```scala
import org.apache.spark.storage.StorageLevel
import kafka.serializer.StringDecoder

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-consumer-group"
)
// Map of topic -> number of receiver threads for that topic
val topicMap = Map("my-topic" -> 1)

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2
)
stream.print()
```
## Architecture

The Spark Streaming Kafka integration is built around several key components:

- **KafkaUtils**: Central factory object providing static methods for creating streams and RDDs
- **Direct Streaming**: No-receiver approach that directly queries Kafka brokers for exactly-once semantics
- **Receiver-based Streaming**: Traditional approach using long-running receivers (legacy)
- **Offset Management**: Manual control over Kafka offsets for exactly-once processing guarantees
- **Type Safety**: Full support for custom key/value types with a pluggable serializer/deserializer framework
## Capabilities

### Direct Stream Creation

Creates input streams that directly pull messages from Kafka brokers without receivers, providing exactly-once semantics and manual offset control.

```scala { .api }
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Set[String]
): InputDStream[(K, V)]

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  fromOffsets: Map[TopicAndPartition, Long],
  messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R]
```

[Direct Streaming](./direct-streaming.md)
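The second overload is useful when resuming from offsets saved externally. A minimal sketch, assuming a `streamingContext` created elsewhere; the broker address, topic name, and starting offsets shown are placeholders:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka._

val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

// Starting offsets per partition, e.g. recovered from an external store
val fromOffsets = Map(
  TopicAndPartition("my-topic", 0) -> 100L,
  TopicAndPartition("my-topic", 1) -> 250L
)

// The message handler decides what each record becomes; here, a (key, value) pair
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  streamingContext, kafkaParams, fromOffsets, messageHandler
)
```

Because the result type is determined by the handler's return type, the same overload can project out only the value, attach partition metadata, or deserialize into a domain type.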
### Receiver-based Stream Creation

Creates input streams using long-running receivers that consume messages from Kafka via Zookeeper-coordinated consumer groups (legacy approach).

```scala { .api }
def createStream[K, V, U <: Decoder[K], T <: Decoder[V]](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Map[String, Int],
  storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)]
```

[Receiver-based Streaming](./receiver-streaming.md)
### Batch RDD Creation

Creates RDDs from Kafka using specific offset ranges for batch processing scenarios.

```scala { .api }
def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange]
): RDD[(K, V)]

def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange],
  leaders: Map[TopicAndPartition, Broker],
  messageHandler: MessageAndMetadata[K, V] => R
): RDD[R]
```

[Batch RDD Processing](./batch-rdd.md)
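A sketch of the first overload, assuming a `sparkContext` created elsewhere; the broker address, topic, and offsets shown are placeholders:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka._

val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

// Each range reads exactly the messages in [fromOffset, untilOffset) for one partition
val offsetRanges = Array(
  OffsetRange("my-topic", 0, 0L, 100L),
  OffsetRange("my-topic", 1, 0L, 100L)
)

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sparkContext, kafkaParams, offsetRanges
)
rdd.foreach(println)
```

Because the ranges are explicit, the same code re-run against an unchanged topic reads the same messages, which makes this overload suitable for replay and backfill jobs.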
### Java API

Complete Java API with type-safe wrappers for all Scala functionality, supporting both streaming and batch processing scenarios.

```java { .api }
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
JavaPairInputDStream<K, V> createDirectStream(
  JavaStreamingContext jssc,
  Class<K> keyClass,
  Class<V> valueClass,
  Class<KD> keyDecoderClass,
  Class<VD> valueDecoderClass,
  Map<String, String> kafkaParams,
  Set<String> topics
)
```

[Java API](./java-api.md)
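A sketch of calling this entry point from Java, assuming a `JavaStreamingContext` named `jssc` is in scope; the broker address and topic are placeholders:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Inside a method with a JavaStreamingContext jssc in scope:
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");

Set<String> topics = new HashSet<String>();
topics.add("my-topic");

// The Class arguments stand in for the Scala type parameters K, V, KD, VD
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
    jssc,
    String.class, String.class,
    StringDecoder.class, StringDecoder.class,
    kafkaParams, topics
);
stream.print();
```

The explicit `Class` parameters exist because Java erases generics at runtime; they let the wrapper reconstruct the type information the Scala API carries in its type parameters.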
### Offset Management

Utilities for managing Kafka offsets, including offset range representation and cluster interaction helpers.

```scala { .api }
final class OffsetRange(
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long
)

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}
```

[Offset Management](./offset-management.md)
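A sketch of the typical pattern, assuming `stream` was created with `createDirectStream`; where the offsets are persisted is left to the application:

```scala
import org.apache.spark.streaming.kafka._

stream.foreachRDD { rdd =>
  // The cast succeeds only on the RDD returned directly by the stream,
  // before any transformation changes its type
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    val count = r.untilOffset - r.fromOffset  // messages in this batch for the partition
    println(s"${r.topic} p${r.partition}: ${r.fromOffset} -> ${r.untilOffset} ($count messages)")
  }
  // Persist each range's untilOffset externally to resume exactly-once after a restart
}
```

Saving `untilOffset` per partition after the output succeeds, and feeding those values back in as `fromOffsets` on restart, is what closes the exactly-once loop for the direct approach.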
## Types

### Common Type Parameters
```scala { .api }
// K  - Type of Kafka message key
// V  - Type of Kafka message value
// KD - Type of Kafka message key decoder (extends kafka.serializer.Decoder[K])
// VD - Type of Kafka message value decoder (extends kafka.serializer.Decoder[V])
// R  - Type returned by the message handler function
```

### Kafka Dependencies
```scala { .api }
// From the Kafka library
kafka.common.TopicAndPartition
kafka.message.MessageAndMetadata[K, V]
kafka.serializer.Decoder[T]
kafka.serializer.StringDecoder
kafka.serializer.DefaultDecoder
```

### Spark Dependencies
```scala { .api }
// Core Spark classes
org.apache.spark.streaming.StreamingContext
org.apache.spark.SparkContext
org.apache.spark.rdd.RDD[T]
org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
org.apache.spark.storage.StorageLevel

// Spark Streaming Kafka classes (from this package)
org.apache.spark.streaming.kafka.OffsetRange
org.apache.spark.streaming.kafka.Broker
org.apache.spark.streaming.kafka.HasOffsetRanges
```