
# Spark Streaming Kafka 0.10+ Integration


Apache Spark Streaming integration for Kafka 0.10+ that provides exactly-once semantics and high-performance real-time data processing. The library takes a direct approach to Kafka integration, offering better performance and reliability than the earlier receiver-based approach.

## Package Information

- **Package Name**: spark-streaming-kafka-0-10_2.11
- **Package Type**: Maven
- **Language**: Scala (with Java API support)
- **Group ID**: org.apache.spark
- **Version**: 2.4.8
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```
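For sbt builds, an equivalent dependency declaration is sketched below (assuming `scalaVersion := "2.11.x"`, so that `%%` resolves to the `_2.11` artifact):

```scala
// build.sbt -- equivalent of the Maven coordinates above
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8"
```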

## Core Imports

```scala
import org.apache.spark.streaming.kafka010._
```

For specific components:

```scala
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies
```

Java imports:

```java
import org.apache.spark.streaming.kafka010.*;
```

## Basic Usage

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topic1", "topic2")

// streamingContext is an existing org.apache.spark.streaming.StreamingContext
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Capture offset ranges first, before any transformation breaks the
  // one-to-one mapping between RDD partitions and Kafka partitions
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Process the RDD
  rdd.foreach(record => println(s"${record.key}: ${record.value}"))
  // Commit offsets only after processing has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

## Architecture

The Spark Streaming Kafka integration is built around several key components:

- **KafkaUtils**: Primary factory object for creating Kafka RDDs and DStreams
- **DirectKafkaInputDStream**: Core streaming implementation providing exactly-once semantics
- **KafkaRDD**: Batch-oriented RDD for precise offset control
- **Location Strategies**: Control consumer placement for optimal performance
- **Consumer Strategies**: Flexible consumer configuration (Subscribe, SubscribePattern, Assign)
- **Offset Management**: Precise offset control and commit functionality
- **Rate Control**: Per-partition rate limiting and backpressure support

## Capabilities

### Stream Creation

Core functionality for creating Kafka DStreams with exactly-once semantics and configurable consumer strategies.

```scala { .api }
object KafkaUtils {
  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ): InputDStream[ConsumerRecord[K, V]]

  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    perPartitionConfig: PerPartitionConfig
  ): InputDStream[ConsumerRecord[K, V]]
}
```

[Stream Creation](./stream-creation.md)
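The returned stream is an ordinary DStream of `ConsumerRecord`s and composes with the usual DStream operators. A minimal sketch, reusing `streamingContext` and `kafkaParams` from Basic Usage (the `events` topic is hypothetical):

```scala
// Count records per micro-batch from a hypothetical "events" topic
val values = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Array("events"), kafkaParams)
).map(_.value)

values.count().print()
```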

### Batch Processing

Batch-oriented interface for consuming specific offset ranges from Kafka with full control over exactly-once semantics.

```scala { .api }
object KafkaUtils {
  def createRDD[K, V](
    sc: SparkContext,
    kafkaParams: java.util.Map[String, Object],
    offsetRanges: Array[OffsetRange],
    locationStrategy: LocationStrategy
  ): RDD[ConsumerRecord[K, V]]
}
```

[Batch Processing](./batch-processing.md)
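A minimal sketch of reading a fixed offset range as an RDD. Note that this overload takes a `java.util.Map`, hence the `asJava` conversion; `sc` is an existing `SparkContext`, `kafkaParams` is reused from Basic Usage, and the `events` topic is hypothetical:

```scala
import scala.collection.JavaConverters._

// Read offsets 0 (inclusive) until 100 (exclusive) from partition 0 of "events"
val ranges = Array(OffsetRange.create("events", 0, 0L, 100L))

val rdd = KafkaUtils.createRDD[String, String](
  sc,
  kafkaParams.asJava,
  ranges,
  LocationStrategies.PreferConsistent
)

rdd.map(record => (record.key, record.value)).collect().foreach(println)
```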

### Location Strategies

Control how Kafka consumers are scheduled across Spark executors for optimal performance and data locality.

```scala { .api }
object LocationStrategies {
  def PreferBrokers: LocationStrategy
  def PreferConsistent: LocationStrategy
  def PreferFixed(hostMap: Map[TopicPartition, String]): LocationStrategy
}
```

[Location Strategies](./location-strategies.md)
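For example, `PreferFixed` pins partitions to particular executor hosts; a sketch with hypothetical host names and topic:

```scala
import org.apache.kafka.common.TopicPartition

// Pin partition 0 of "events" to the executor on host1, partition 1 to host2
val hostMap = Map(
  new TopicPartition("events", 0) -> "host1",
  new TopicPartition("events", 1) -> "host2"
)

val locationStrategy = LocationStrategies.PreferFixed(hostMap)
```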

### Consumer Strategies

Flexible consumer configuration supporting topic subscription, pattern-based subscription, and partition assignment.

```scala { .api }
object ConsumerStrategies {
  def Subscribe[K, V](
    topics: Iterable[String],
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]

  def SubscribePattern[K, V](
    pattern: java.util.regex.Pattern,
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]

  def Assign[K, V](
    topicPartitions: Iterable[TopicPartition],
    kafkaParams: Map[String, Object]
  ): ConsumerStrategy[K, V]
}
```

[Consumer Strategies](./consumer-strategies.md)
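For instance, `SubscribePattern` subscribes to every topic matching a regular expression. A sketch, reusing `streamingContext` and `kafkaParams` from Basic Usage (the pattern is hypothetical):

```scala
import java.util.regex.Pattern

// Subscribe to all topics whose names start with "events-"
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("events-.*"), kafkaParams)
)
```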

### Offset Management

Precise offset tracking and management for exactly-once processing guarantees and reliable stream processing.

```scala { .api }
final class OffsetRange(
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long
) {
  def topicPartition(): TopicPartition
  def count(): Long
}

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
```

[Offset Management](./offset-management.md)
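The callback overload of `commitAsync` allows commit results to be inspected. A sketch, assuming the `stream` from Basic Usage (`OffsetCommitCallback` and `OffsetAndMetadata` come from the Kafka clients library):

```scala
import java.util
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the RDD ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata],
                            exception: Exception): Unit = {
      // exception is null when the commit succeeded
      if (exception != null) println(s"Offset commit failed: ${exception.getMessage}")
    }
  })
}
```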

## Types

### Core Types

```scala { .api }
// Kafka consumer record type (from kafka-clients)
import org.apache.kafka.clients.consumer.ConsumerRecord

// Kafka TopicPartition (from kafka-clients)
import org.apache.kafka.common.TopicPartition

// Spark streaming context
import org.apache.spark.streaming.StreamingContext

// Spark context for batch operations
import org.apache.spark.SparkContext
```

### Configuration Types

```scala { .api }
abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: TopicPartition): Long
  def minRatePerPartition(topicPartition: TopicPartition): Long = 1L
}
```
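Custom rate limits can be supplied by subclassing `PerPartitionConfig` and passing the instance to the four-argument `createDirectStream` overload. A sketch with hypothetical per-topic rates, reusing `streamingContext`, `topics`, and `kafkaParams` from Basic Usage:

```scala
import org.apache.kafka.common.TopicPartition

// Throttle a hypothetical high-volume topic harder than everything else
class CustomRateConfig extends PerPartitionConfig {
  override def maxRatePerPartition(tp: TopicPartition): Long =
    if (tp.topic == "high-volume-events") 500L else 2000L
}

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new CustomRateConfig
)
```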

## Configuration

### Spark Configuration Parameters

Key Spark configuration parameters that affect Kafka integration behavior:

```scala { .api }
// Rate limiting per partition (default: 0 = unlimited)
"spark.streaming.kafka.maxRatePerPartition" -> "1000"

// Minimum rate per partition (default: 1)
"spark.streaming.kafka.minRatePerPartition" -> "1"

// Consumer poll timeout in milliseconds (default: 120000)
"spark.streaming.kafka.consumer.poll.ms" -> "120000"

// Allow non-consecutive offsets (default: false)
"spark.streaming.kafka.allowNonConsecutiveOffsets" -> "false"

// Consumer cache configuration
"spark.streaming.kafka.consumer.cache.enabled" -> "true"
"spark.streaming.kafka.consumer.cache.capacity" -> "64"
"spark.streaming.kafka.consumer.cache.timeout" -> "300s"
```
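These parameters are set on the `SparkConf` before the `StreamingContext` is created; a sketch (application name and batch interval are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-streaming-app")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  .set("spark.streaming.kafka.consumer.cache.capacity", "64")

val streamingContext = new StreamingContext(conf, Seconds(5))
```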

### Automatic Parameter Handling

The Kafka integration automatically modifies certain consumer parameters on executors for reliability:

- `enable.auto.commit` is always set to `false` on executors
- `auto.offset.reset` is set to `none` on executors
- `group.id` is prefixed with `spark-executor-` on executors
- `receive.buffer.bytes` is raised to a minimum of 65536 (KAFKA-3135 workaround)

These modifications ensure proper exactly-once semantics and prevent consumer conflicts between the driver and executors.

## Error Handling

Common exceptions thrown by the Kafka integration:

- **NoOffsetForPartitionException**: Thrown when no offset is available for a partition and `auto.offset.reset` is `none`
- **AssertionError**: Thrown when using PreferBrokers without proper host mapping
- **IllegalArgumentException**: Thrown for invalid Kafka parameters or configurations

Always handle these exceptions appropriately and ensure proper Kafka configuration, especially `bootstrap.servers` and consumer group settings, as in the sketch below.
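A minimal fail-fast sketch that validates required parameters before the stream is created (the required-key list here is an assumption based on the settings shown above):

```scala
// Fail fast on missing Kafka parameters instead of failing at runtime
val requiredKeys = Seq("bootstrap.servers", "key.deserializer", "value.deserializer", "group.id")
val missing = requiredKeys.filterNot(kafkaParams.contains)
require(missing.isEmpty, s"Missing required Kafka parameters: ${missing.mkString(", ")}")
```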