Assembly JAR providing Apache Spark integration with Apache Kafka 0.10 for reliable distributed streaming data processing
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kafka-0-10-assembly@4.0.0
# Spark Streaming Kafka 0.10 Assembly

Apache Spark's integration with Apache Kafka 0.10 for reliable distributed streaming data processing. This assembly JAR packages the core Kafka 0.10 streaming library and all its dependencies into a single deployable JAR file, enabling consumption of data from Kafka topics as Spark DStreams and RDDs with exactly-once delivery semantics.

## Package Information

- **Package Name**: spark-streaming-kafka-0-10-assembly_2.13
- **Package Type**: maven
- **Language**: Scala
- **Version**: 4.0.0
- **License**: Apache-2.0
- **Installation**: Add to Maven dependencies or include the JAR on the Spark classpath

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10-assembly_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```
## Core Imports

```scala
import org.apache.spark.streaming.kafka010._
```

For specific functionality:

```scala
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, ConsumerStrategies}
import org.apache.spark.streaming.kafka010.{OffsetRange, HasOffsetRanges, CanCommitOffsets}
```
## Basic Usage

### Streaming with Direct Stream

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import java.util.HashMap

// Assumes an existing SparkSession named `spark`
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

val kafkaParams = new HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.deserializer", classOf[StringDeserializer])
kafkaParams.put("value.deserializer", classOf[StringDeserializer])
kafkaParams.put("group.id", "my-group")
kafkaParams.put("auto.offset.reset", "latest")

val topics = Array("topic1", "topic2")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Process each micro-batch
stream.foreachRDD { rdd =>
  // Capture offset ranges before any transformation that would break
  // the one-to-one mapping between RDD partitions and Kafka partitions
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach { record =>
    println(s"${record.key}: ${record.value}")
  }
  // Commit offsets back to Kafka once the batch has been processed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()
```

### Batch Processing with RDD

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Read offsets 0 (inclusive) to 1000 (exclusive) from partitions 0 and 1 of topic1
val offsetRanges = Array(
  OffsetRange("topic1", 0, 0, 1000),
  OffsetRange("topic1", 1, 0, 1000)
)

val rdd = KafkaUtils.createRDD[String, String](
  spark.sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process the RDD
rdd.foreach { record =>
  println(s"${record.key}: ${record.value}")
}
```
## Architecture

The Kafka 0.10 integration is built around several key components:

- **KafkaUtils**: Main entry point providing factory methods for creating Kafka RDDs and DStreams
- **Location Strategies**: Control where Kafka consumers are scheduled on executors for optimal performance
- **Consumer Strategies**: Define how Kafka consumers are created and configured (Subscribe, SubscribePattern, Assign)
- **Offset Management**: Handle offset ranges and commit operations for exactly-once semantics
- **Per-Partition Configuration**: Configure rate limiting and other settings on a per-partition basis
- **Consumer Caching**: Optimize performance by caching Kafka consumers across partition computations
## Capabilities

### Stream Creation

Core functionality for creating Kafka-backed Spark streams and RDDs with configurable location strategies and consumer strategies.

```scala { .api }
object KafkaUtils {
  // Scala stream creation
  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ): InputDStream[ConsumerRecord[K, V]]

  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    perPartitionConfig: PerPartitionConfig
  ): InputDStream[ConsumerRecord[K, V]]

  // Scala RDD creation
  def createRDD[K, V](
    sc: SparkContext,
    kafkaParams: java.util.Map[String, Object],
    offsetRanges: Array[OffsetRange],
    locationStrategy: LocationStrategy
  ): RDD[ConsumerRecord[K, V]]
}
```

[Stream Creation](./stream-creation.md)

### Location Strategies

Strategies for scheduling Kafka consumers on executors to optimize performance and network locality.

```scala { .api }
object LocationStrategies {
  def PreferBrokers: LocationStrategy
  def PreferConsistent: LocationStrategy
  def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
  def PreferFixed(hostMap: java.util.Map[TopicPartition, String]): LocationStrategy
}
```

[Location Strategies](./location-strategies.md)
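
To make the trade-offs concrete, here is a sketch of constructing each strategy (the host names passed to `PreferFixed` are hypothetical placeholders, not part of the library):

```scala
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.kafka.common.TopicPartition

// PreferConsistent: the usual default, spreads partitions evenly across executors
val consistent = LocationStrategies.PreferConsistent

// PreferBrokers: only useful when executors run on the same hosts as the Kafka brokers
val brokers = LocationStrategies.PreferBrokers

// PreferFixed: pin specific partitions to specific executor hosts
// (host names below are placeholders)
val fixed = LocationStrategies.PreferFixed(Map(
  new TopicPartition("topic1", 0) -> "executor-host-a",
  new TopicPartition("topic1", 1) -> "executor-host-b"
))
```

Partitions without an entry in the `PreferFixed` host map fall back to consistent placement.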

### Consumer Strategies

Configuration strategies for creating and managing Kafka consumers with different subscription patterns.

```scala { .api }
object ConsumerStrategies {
  // Subscribe to specific topics
  def Subscribe[K, V](
    topics: Iterable[String],
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]

  // Subscribe to topics matching a pattern
  def SubscribePattern[K, V](
    pattern: java.util.regex.Pattern,
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]

  // Assign specific topic partitions
  def Assign[K, V](
    topicPartitions: Iterable[TopicPartition],
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]
}
```

[Consumer Strategies](./consumer-strategies.md)
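
A sketch of the three subscription styles (the `kafkaParams` map is abbreviated here, and the topic names are illustrative):

```scala
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.kafka.common.TopicPartition
import java.util.regex.Pattern

// Minimal parameter map for illustration; a real job also needs deserializers, etc.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "group.id" -> "my-group"
)

// Subscribe: a fixed list of topics, partitions assigned by Kafka's group coordinator
val subscribe = ConsumerStrategies.Subscribe[String, String](List("topic1", "topic2"), kafkaParams)

// SubscribePattern: any topic matching the regex, including topics created later
val byPattern = ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("events-.*"), kafkaParams)

// Assign: explicit control over exactly which partitions are consumed
val assign = ConsumerStrategies.Assign[String, String](
  List(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)),
  kafkaParams
)
```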

### Offset Management

Comprehensive offset range management and commit operations for exactly-once processing semantics.

```scala { .api }
final class OffsetRange {
  val topic: String
  val partition: Int
  val fromOffset: Long
  val untilOffset: Long

  def topicPartition(): TopicPartition
  def count(): Long
}

object OffsetRange {
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
```

[Offset Management](./offset-management.md)
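
An offset range is a half-open interval, so `count()` is simply `untilOffset - fromOffset`; a small sketch:

```scala
import org.apache.spark.streaming.kafka010.OffsetRange

// Covers offsets 0 through 999 of partition 0 of topic1 (untilOffset is exclusive)
val range = OffsetRange.create("topic1", 0, 0, 1000)

range.count()          // 1000 records
range.topicPartition() // TopicPartition for topic1, partition 0
```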

### Per-Partition Configuration

Configuration interface for controlling processing rates and other settings on a per-partition basis.

```scala { .api }
abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: TopicPartition): Long
  def minRatePerPartition(topicPartition: TopicPartition): Long
}
```

[Per-Partition Configuration](./per-partition-config.md)
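
For example, a custom subclass could throttle a high-volume topic more aggressively than the rest. A sketch (the topic name and rates are illustrative; the instance is passed through the `createDirectStream` overload that accepts a `PerPartitionConfig`):

```scala
import org.apache.spark.streaming.kafka010.PerPartitionConfig
import org.apache.kafka.common.TopicPartition

// Illustrative rates: throttle a hypothetical "clickstream" topic harder than the rest
class TieredRateConfig extends PerPartitionConfig {
  override def maxRatePerPartition(tp: TopicPartition): Long =
    if (tp.topic == "clickstream") 1000L else 10000L

  override def minRatePerPartition(tp: TopicPartition): Long = 1L
}
```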

## Configuration Parameters

The integration supports numerous Spark configuration parameters for fine-tuning performance:

- `spark.streaming.kafka.maxRatePerPartition`: Maximum records per second per partition (default: 0 = unlimited)
- `spark.streaming.kafka.minRatePerPartition`: Minimum records per second per partition (default: 1)
- `spark.streaming.kafka.consumer.cache.enabled`: Enable consumer caching (default: true)
- `spark.streaming.kafka.consumer.cache.maxCapacity`: Maximum cached consumers (default: 64)
- `spark.streaming.kafka.consumer.cache.initialCapacity`: Initial cache capacity (default: 16)
- `spark.streaming.kafka.consumer.cache.loadFactor`: Cache load factor (default: 0.75)
- `spark.streaming.kafka.consumer.poll.ms`: Consumer poll timeout in milliseconds (optional)
- `spark.streaming.kafka.allowNonConsecutiveOffsets`: Allow non-consecutive offsets (default: false)
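
These are ordinary Spark properties, set on the `SparkConf` before the `StreamingContext` is created; the values below are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

// Cap throughput per partition, enlarge the consumer cache, and raise the poll timeout
val conf = new SparkConf()
  .setAppName("kafka-streaming-job")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
  .set("spark.streaming.kafka.consumer.poll.ms", "512")
```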

## Types

```scala { .api }
// Core Kafka types (from the kafka-clients dependency)
import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

// Spark types
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaInputDStream}

// Java types
import java.util.{Map => JMap, Collection => JCollection}
import java.util.regex.Pattern
import java.{lang => jl}
```