Apache Spark streaming integration with Kafka 0.10+ providing exactly-once semantics and high-performance data consumption
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-kafka-0-10_2-11@2.4.00
# Spark Streaming Kafka 0.10+ Integration

Apache Spark Streaming integration with Kafka 0.10+ that provides exactly-once semantics and high-performance real-time data processing. This library offers a direct approach to Kafka integration with better performance and reliability compared to earlier receiver-based approaches.
## Package Information

- **Package Name**: spark-streaming-kafka-0-10_2.11
- **Package Type**: Maven
- **Language**: Scala (with Java API support)
- **Group ID**: org.apache.spark
- **Version**: 2.4.8
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```
## Core Imports

```scala
import org.apache.spark.streaming.kafka010._
```

For specific components:

```scala
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies
```

Java imports:

```java
import org.apache.spark.streaming.kafka010.*;
```
## Basic Usage

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topic1", "topic2")
// streamingContext is an existing org.apache.spark.streaming.StreamingContext
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Process the records in this batch
  rdd.foreach(record => println(s"${record.key}: ${record.value}"))
  // Commit offsets only after the batch has been processed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```
## Architecture

The Spark Streaming Kafka integration is built around several key components:

- **KafkaUtils**: Primary factory object for creating Kafka RDDs and DStreams
- **DirectKafkaInputDStream**: Core streaming implementation providing exactly-once semantics
- **KafkaRDD**: Batch-oriented RDD for precise offset control
- **Location Strategies**: Control consumer placement for optimal performance
- **Consumer Strategies**: Flexible consumer configuration (Subscribe, SubscribePattern, Assign)
- **Offset Management**: Precise offset control and commit functionality
- **Rate Control**: Per-partition rate limiting and backpressure support
## Capabilities

### Stream Creation

Core functionality for creating Kafka DStreams with exactly-once semantics and configurable consumer strategies.

```scala { .api }
object KafkaUtils {
  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ): InputDStream[ConsumerRecord[K, V]]

  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    perPartitionConfig: PerPartitionConfig
  ): InputDStream[ConsumerRecord[K, V]]
}
```

[Stream Creation](./stream-creation.md)
### Batch Processing

Batch-oriented interface for consuming specific offset ranges from Kafka with full control over exactly-once semantics.

```scala { .api }
object KafkaUtils {
  def createRDD[K, V](
    sc: SparkContext,
    kafkaParams: java.util.Map[String, Object],
    offsetRanges: Array[OffsetRange],
    locationStrategy: LocationStrategy
  ): RDD[ConsumerRecord[K, V]]
}
```

[Batch Processing](./batch-processing.md)
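As a sketch of the batch interface, the following reads two explicit offset ranges from a topic as an RDD. The broker address, topic name, offsets, and the existing SparkContext `sc` are assumptions for illustration:

```scala
import java.{util => ju}
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

// createRDD takes the Java Map form of the consumer parameters
val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams.put("group.id", "batch-read-group")

// Read offsets [0, 100) from partition 0 and [0, 50) from partition 1 (example values)
val offsetRanges = Array(
  OffsetRange.create("topic1", 0, 0L, 100L),
  OffsetRange.create("topic1", 1, 0L, 50L)
)

val rdd = KafkaUtils.createRDD[String, String](
  sc,  // an existing SparkContext (assumption)
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent
)
rdd.map(record => (record.key, record.value)).collect().foreach(println)
```

Because the ranges are fixed up front, rerunning this job reads exactly the same records, which is what makes the batch interface suitable for replay and reconciliation.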
### Location Strategies

Control how Kafka consumers are scheduled across Spark executors for optimal performance and data locality.

```scala { .api }
object LocationStrategies {
  def PreferBrokers: LocationStrategy
  def PreferConsistent: LocationStrategy
  def PreferFixed(hostMap: Map[TopicPartition, String]): LocationStrategy
}
```

[Location Strategies](./location-strategies.md)
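As an illustrative sketch, `PreferFixed` pins partitions to specific executor hosts; the hostnames below are placeholders:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// Pin each partition to a preferred executor host; partitions without an
// entry fall back to consistent placement. Hostnames are hypothetical.
val hostMap = Map(
  new TopicPartition("topic1", 0) -> "executor-host-1",
  new TopicPartition("topic1", 1) -> "executor-host-2"
)
val strategy = LocationStrategies.PreferFixed(hostMap)
```

`PreferConsistent` is the usual default; `PreferBrokers` only makes sense when executors run on the same hosts as the Kafka brokers.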
### Consumer Strategies

Flexible consumer configuration supporting topic subscription, pattern-based subscription, and partition assignment.

```scala { .api }
object ConsumerStrategies {
  def Subscribe[K, V](
    topics: Iterable[String],
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]

  def SubscribePattern[K, V](
    pattern: java.util.regex.Pattern,
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]

  def Assign[K, V](
    topicPartitions: Iterable[TopicPartition],
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]
}
```

[Consumer Strategies](./consumer-strategies.md)
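A brief sketch of the two alternatives to plain `Subscribe`, reusing the `kafkaParams` map from the Basic Usage section (topic names and the starting offset are example values):

```scala
import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Subscribe to every topic matching a regex; topics created later that
// match the pattern are picked up as Kafka metadata refreshes
val patternStrategy = ConsumerStrategies.SubscribePattern[String, String](
  Pattern.compile("events-.*"), kafkaParams)

// Assign fixed partitions; an overload also accepts explicit starting offsets
val assignStrategy = ConsumerStrategies.Assign[String, String](
  List(new TopicPartition("topic1", 0)),
  kafkaParams,
  Map(new TopicPartition("topic1", 0) -> 42L)  // start at offset 42 (example value)
)
```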
### Offset Management

Precise offset tracking and management for exactly-once processing guarantees and reliable stream processing.

```scala { .api }
final class OffsetRange private (
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long
) {
  def topicPartition(): TopicPartition
  def count(): Long
}

object OffsetRange {
  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
}

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
```

[Offset Management](./offset-management.md)
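One subtlety worth a sketch: the `HasOffsetRanges` cast only succeeds on the RDD produced directly by the Kafka DStream, before any transformation such as `map` or `repartition` discards the partition metadata. Reusing the `stream` from the Basic Usage section:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// `stream` is the direct stream from the Basic Usage example (assumption)
stream.foreachRDD { rdd =>
  // Capture offsets first, on the untransformed RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  offsetRanges.foreach { o =>
    println(s"${o.topic} p${o.partition}: ${o.fromOffset} -> ${o.untilOffset} (${o.count()} records)")
  }

  // Commit asynchronously only after this batch's output has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```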
## Types

### Core Types

```scala { .api }
// Kafka consumer record type (from kafka-clients)
import org.apache.kafka.clients.consumer.ConsumerRecord

// Kafka TopicPartition (from kafka-clients)
import org.apache.kafka.common.TopicPartition

// Spark streaming context
import org.apache.spark.streaming.StreamingContext

// Spark context for batch operations
import org.apache.spark.SparkContext
```
### Configuration Types

```scala { .api }
abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: TopicPartition): Long
  def minRatePerPartition(topicPartition: TopicPartition): Long = 1L
}
```
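A custom `PerPartitionConfig` can be passed to the `createDirectStream` overload shown under Stream Creation. A minimal sketch, assuming a hypothetical hot topic that should be throttled harder than the rest:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.PerPartitionConfig

// Hypothetical rate limiter: cap the hot topic at 500 records/sec per
// partition and everything else at 5000
class HotTopicRateConfig extends PerPartitionConfig {
  override def maxRatePerPartition(tp: TopicPartition): Long =
    if (tp.topic == "hot-topic") 500L else 5000L
}
```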
## Configuration

### Spark Configuration Parameters

Key Spark configuration parameters that affect Kafka integration behavior:

```scala { .api }
// Rate limiting per partition (default: 0 = unlimited)
"spark.streaming.kafka.maxRatePerPartition" -> "1000"

// Minimum rate per partition (default: 1)
"spark.streaming.kafka.minRatePerPartition" -> "1"

// Consumer poll timeout in milliseconds (default: 120000)
"spark.streaming.kafka.consumer.poll.ms" -> "120000"

// Allow non-consecutive offsets (default: false)
"spark.streaming.kafka.allowNonConsecutiveOffsets" -> "false"

// Consumer cache configuration
"spark.streaming.kafka.consumer.cache.enabled" -> "true"
"spark.streaming.kafka.consumer.cache.capacity" -> "64"
"spark.streaming.kafka.consumer.cache.timeout" -> "300s"
```
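These parameters are typically set on the `SparkConf` before creating the streaming context. A short sketch with example values only; tune them for your workload:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-streaming-app")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  .set("spark.streaming.kafka.consumer.cache.capacity", "64")
  // Backpressure adjusts ingest rate within the per-partition limits
  .set("spark.streaming.backpressure.enabled", "true")

// 5-second batch interval (example value)
val ssc = new StreamingContext(conf, Seconds(5))
```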
### Automatic Parameter Handling

The Kafka integration automatically overrides certain consumer parameters on executors for reliability:

- `enable.auto.commit` is always set to `false` on executors
- `auto.offset.reset` is set to `none` on executors
- `group.id` is prefixed with `spark-executor-` on executors
- `receive.buffer.bytes` is raised to at least 65536 (KAFKA-3135 workaround)

These modifications ensure proper exactly-once semantics and prevent consumer conflicts between the driver and executors.
## Error Handling

Common exceptions thrown by the Kafka integration:

- **NoOffsetForPartitionException**: Thrown when no committed offset is available for a partition and `auto.offset.reset` is `none`
- **AssertionError**: Thrown when using PreferBrokers without proper host mapping
- **IllegalArgumentException**: Thrown for invalid Kafka parameters or configurations

Always handle these exceptions appropriately and ensure proper Kafka configuration, especially `bootstrap.servers` and the consumer group settings.