
# Spark Streaming Kafka 0.10 Assembly

Apache Spark's integration with Apache Kafka 0.10 for reliable distributed streaming data processing. This assembly JAR packages the core Kafka 0.10 streaming library and all its dependencies into a single deployable JAR file, enabling consumption of data from Kafka topics as Spark DStreams and RDDs with exactly-once delivery semantics.

## Package Information

- **Package Name**: spark-streaming-kafka-0-10-assembly_2.13
- **Package Type**: Maven
- **Language**: Scala
- **Version**: 4.0.0
- **License**: Apache-2.0
- **Installation**: Add to Maven dependencies or include the assembly JAR on the Spark classpath

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10-assembly_2.13</artifactId>
  <version>4.0.0</version>
</dependency>
```

## Core Imports

```scala
import org.apache.spark.streaming.kafka010._
```

For specific functionality:

```scala
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, ConsumerStrategies}
import org.apache.spark.streaming.kafka010.{OffsetRange, HasOffsetRanges, CanCommitOffsets}
```

## Basic Usage

### Streaming with Direct Stream

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

// ConsumerStrategies.Subscribe takes a Scala Map of consumer settings.
// Disable auto-commit when committing offsets manually (see below).
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topic1", "topic2")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Process each micro-batch
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach { record =>
    println(s"${record.key}: ${record.value}")
  }
  // Commit offsets back to Kafka after processing
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()
```

### Batch Processing with RDD

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

val offsetRanges = Array(
  OffsetRange("topic1", 0, 0, 1000),
  OffsetRange("topic1", 1, 0, 1000)
)

// Note: createRDD expects the consumer settings as a java.util.Map[String, Object]
val rdd = KafkaUtils.createRDD[String, String](
  spark.sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process the RDD
rdd.foreach { record =>
  println(s"${record.key}: ${record.value}")
}
```

## Architecture

The Kafka 0.10 integration is built around several key components:

- **KafkaUtils**: Main entry point providing factory methods for creating Kafka RDDs and DStreams
- **Location Strategies**: Control where Kafka consumers are scheduled on executors for optimal performance
- **Consumer Strategies**: Define how Kafka consumers are created and configured (Subscribe, SubscribePattern, Assign)
- **Offset Management**: Handle offset ranges and commit operations for exactly-once semantics
- **Per-Partition Configuration**: Configure rate limiting and other settings on a per-partition basis
- **Consumer Caching**: Optimize performance by caching Kafka consumers across partition computations

## Capabilities

### Stream Creation

Core functionality for creating Kafka-backed Spark streams and RDDs with configurable location strategies and consumer strategies.

```scala { .api }
object KafkaUtils {
  // Scala stream creation
  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ): InputDStream[ConsumerRecord[K, V]]

  def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    perPartitionConfig: PerPartitionConfig
  ): InputDStream[ConsumerRecord[K, V]]

  // Scala RDD creation
  def createRDD[K, V](
    sc: SparkContext,
    kafkaParams: java.util.Map[String, Object],
    offsetRanges: Array[OffsetRange],
    locationStrategy: LocationStrategy
  ): RDD[ConsumerRecord[K, V]]
}
```

[Stream Creation](./stream-creation.md)

### Location Strategies

Strategies for scheduling Kafka consumers on executors to optimize performance and network locality.

```scala { .api }
object LocationStrategies {
  def PreferBrokers: LocationStrategy
  def PreferConsistent: LocationStrategy
  def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy
  def PreferFixed(hostMap: java.util.Map[TopicPartition, String]): LocationStrategy
}
```

[Location Strategies](./location-strategies.md)
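For example, a minimal sketch of pinning consumers for specific partitions to specific executor hosts with `PreferFixed` (the topic name and hostnames here are hypothetical):

```scala
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.kafka.common.TopicPartition

// Pin the consumers for two partitions to known executor hosts;
// partitions not present in the map fall back to consistent placement
val locationStrategy = LocationStrategies.PreferFixed(Map(
  new TopicPartition("topic1", 0) -> "executor-host-1",
  new TopicPartition("topic1", 1) -> "executor-host-2"
))
```

`PreferConsistent` remains the sensible default; `PreferFixed` is mainly useful when consumer load is badly skewed across partitions.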

### Consumer Strategies

Configuration strategies for creating and managing Kafka consumers with different subscription patterns.

```scala { .api }
object ConsumerStrategies {
  // Subscribe to specific topics
  def Subscribe[K, V](
    topics: Iterable[String],
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]

  // Subscribe to topics matching a pattern
  def SubscribePattern[K, V](
    pattern: java.util.regex.Pattern,
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]

  // Assign specific topic partitions
  def Assign[K, V](
    topicPartitions: Iterable[TopicPartition],
    kafkaParams: collection.Map[String, Object]
  ): ConsumerStrategy[K, V]
}
```

[Consumer Strategies](./consumer-strategies.md)
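A sketch of the two non-`Subscribe` variants (the topic names and partitions are hypothetical, and `kafkaParams` is assumed to be a Scala `Map[String, Object]` of consumer settings as in the Basic Usage section):

```scala
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.kafka.common.TopicPartition
import java.util.regex.Pattern

// SubscribePattern: follow every topic whose name matches a regex,
// picking up newly created matching topics automatically
val byPattern = ConsumerStrategies.SubscribePattern[String, String](
  Pattern.compile("events-.*"), kafkaParams)

// Assign: consume a fixed set of topic-partitions, bypassing
// consumer-group partition assignment and rebalancing
val byAssignment = ConsumerStrategies.Assign[String, String](
  List(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)),
  kafkaParams)
```

Either strategy is passed to `KafkaUtils.createDirectStream` in place of `Subscribe`.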

### Offset Management

Comprehensive offset range management and commit operations for exactly-once processing semantics.

```scala { .api }
final class OffsetRange {
  val topic: String
  val partition: Int
  val fromOffset: Long
  val untilOffset: Long

  def topicPartition(): TopicPartition
  def count(): Long
}

object OffsetRange {
  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange
  def create(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long): OffsetRange
}

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

trait CanCommitOffsets {
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
```

[Offset Management](./offset-management.md)
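A sketch of the callback variant of `commitAsync`, useful for observing commit failures (assumes a `stream` created as in the Basic Usage section):

```scala
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, CanCommitOffsets}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges,
    new OffsetCommitCallback {
      override def onComplete(
          offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
          exception: Exception): Unit = {
        if (exception != null) {
          // Commits are best-effort; log here and let a later batch retry
          println(s"Offset commit failed: ${exception.getMessage}")
        }
      }
    })
}
```

Note that committing to Kafka gives at-least-once semantics on its own; exactly-once requires storing offsets atomically with the processing results.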

### Per-Partition Configuration

Configuration interface for controlling processing rates and other settings on a per-partition basis.

```scala { .api }
abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: TopicPartition): Long
  def minRatePerPartition(topicPartition: TopicPartition): Long
}
```

[Per-Partition Configuration](./per-partition-config.md)
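A minimal sketch of a custom policy (the topic name and rate values are hypothetical), passed to the `createDirectStream` overload that accepts a `PerPartitionConfig`:

```scala
import org.apache.spark.streaming.kafka010.PerPartitionConfig
import org.apache.kafka.common.TopicPartition

// Throttle one known-hot partition harder than the rest
class HotPartitionConfig extends PerPartitionConfig {
  override def maxRatePerPartition(tp: TopicPartition): Long =
    if (tp.topic == "topic1" && tp.partition == 0) 500L else 5000L

  override def minRatePerPartition(tp: TopicPartition): Long = 1L
}
```

The default implementation reads the global `spark.streaming.kafka.maxRatePerPartition` setting; a subclass like this is only needed when partitions require different limits.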

## Configuration Parameters

The integration supports a number of Spark configuration parameters for fine-tuning performance:

- `spark.streaming.kafka.maxRatePerPartition`: Maximum records per second per partition (default: 0 = unlimited)
- `spark.streaming.kafka.minRatePerPartition`: Minimum records per second per partition (default: 1)
- `spark.streaming.kafka.consumer.cache.enabled`: Enable consumer caching (default: true)
- `spark.streaming.kafka.consumer.cache.maxCapacity`: Maximum number of cached consumers (default: 64)
- `spark.streaming.kafka.consumer.cache.initialCapacity`: Initial cache capacity (default: 16)
- `spark.streaming.kafka.consumer.cache.loadFactor`: Cache load factor (default: 0.75)
- `spark.streaming.kafka.consumer.poll.ms`: Consumer poll timeout in milliseconds (optional)
- `spark.streaming.kafka.allowNonConsecutiveOffsets`: Allow non-consecutive offsets (default: false)
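These parameters are set on the `SparkConf` before the streaming context is created. For example (the values shown are illustrative, not recommendations):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-streaming-app")
  // Cap ingestion at 1000 records/sec per partition per batch
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // Allow more cached consumers when reading many topic-partitions per executor
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
```

A `StreamingContext` built from this `SparkConf` will apply the limits to every direct stream it hosts.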

## Types

```scala { .api }
// Core Kafka types (from the kafka-clients dependency)
import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

// Spark types
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaInputDStream}

// Java types
import java.util.{Map => JMap, Collection => JCollection}
import java.util.regex.Pattern
import java.{lang => jl}
```