# Apache Spark Kafka Integration

Apache Spark Kafka Integration provides structured streaming and batch data processing for Apache Kafka. The module lets Spark applications read from and write to Kafka topics through DataFrames and Datasets, supporting micro-batch processing, continuous streaming, and batch operations with offset management and fault tolerance.

## Package Information

- **Package Name**: spark-sql-kafka-0-10_2.11
- **Package Type**: maven
- **Language**: Scala
- **Installation**: Add to your Spark application dependencies
- **Coordinate**: `org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8`
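For sbt builds, the coordinate above maps to the following (a sketch; the `_2.11` suffix is supplied by `%%` from your `scalaVersion`):

```scala
// build.sbt — the Kafka connector is not part of the Spark distribution,
// so it must be bundled with (or shipped alongside) your application
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.8"
```

Alternatively, supply it at submit time with `spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8`.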

## Core Imports

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._ // col, struct, to_json used in the write example
import org.apache.spark.sql.streaming.Trigger
import org.apache.kafka.common.TopicPartition
```

## Basic Usage

### Reading from Kafka (Streaming)

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder
  .appName("KafkaExample")
  .getOrCreate()

// Read from a Kafka topic as an unbounded streaming DataFrame
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "latest")
  .load()

// Process the stream; key and value arrive as binary, so cast them to strings
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()
```

### Writing to Kafka

```scala
import org.apache.spark.sql.functions.{col, struct, to_json}

// Write a DataFrame to Kafka; the sink expects "key" and "value" columns
df.select(
    col("id").cast("string").as("key"),
    to_json(struct(col("*"))).as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()
```

### Batch Processing

```scala
// Read a bounded offset range from Kafka as a batch DataFrame
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
```

## Architecture

The Spark Kafka integration is built around several key components:

- **Data Source Provider**: `KafkaSourceProvider` implements Spark's DataSource API for both V1 and V2
- **Consumer Strategies**: Flexible patterns for topic assignment (Assign, Subscribe, SubscribePattern)
- **Offset Management**: Comprehensive offset tracking with configurable start/end positions
- **Streaming Readers**: Micro-batch and continuous processing capabilities
- **Producer Integration**: Efficient writing with connection pooling and caching
- **Schema Management**: Fixed schema for Kafka records with proper type handling

## Kafka Record Schema

All Kafka records follow this fixed schema:

```scala
StructType(Seq(
  StructField("key", BinaryType),           // Message key (nullable)
  StructField("value", BinaryType),         // Message value (nullable)
  StructField("topic", StringType),         // Topic name
  StructField("partition", IntegerType),    // Partition number
  StructField("offset", LongType),          // Message offset
  StructField("timestamp", TimestampType),  // Message timestamp
  StructField("timestampType", IntegerType) // Timestamp type (0 = CreateTime, 1 = LogAppendTime)
))
```

## Capabilities

### Consumer Strategies

Flexible patterns for consuming data from Kafka topics, supporting subscription by topic names, regex patterns, or specific partition assignments.

```scala { .api }
sealed trait ConsumerStrategy {
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}

case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
```

[Consumer Strategies](./consumer-strategies.md)
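Each strategy is selected by one of three mutually exclusive source options. A sketch, assuming a `SparkSession` named `spark` as in Basic Usage (topic names illustrative):

```scala
// AssignStrategy: read only the listed partitions
val assigned = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topic1":[0,1]}""")
  .load()

// SubscribeStrategy: a fixed list of topic names
val subscribed = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")
  .load()

// SubscribePatternStrategy: any topic matching a regex
val byPattern = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "topic.*")
  .load()
```

Specifying more than one of these options on the same source is an error.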

### Offset Management

Comprehensive offset tracking and range limit handling for precise control over data consumption boundaries.

```scala { .api }
sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2
```

[Offset Management](./offset-management.md)
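The JSON accepted by `startingOffsets`/`endingOffsets` is the serialized form of `SpecificOffsetRangeLimit`: topic to (partition to offset), where `-2` means earliest and `-1` means latest. A small hypothetical helper (`offsetsJson` is not part of the library) shows the shape:

```scala
// Build the per-partition offset JSON that startingOffsets/endingOffsets accept.
// -2 = earliest, -1 = latest for an individual partition.
def offsetsJson(offsets: Map[String, Map[Int, Long]]): String =
  offsets.map { case (topic, parts) =>
    val inner = parts.map { case (p, o) => s""""$p":$o""" }.mkString(",")
    s""""$topic":{$inner}"""
  }.mkString("{", ",", "}")

val json = offsetsJson(Map("my-topic" -> Map(0 -> 42L, 1 -> -2L)))
// json == """{"my-topic":{"0":42,"1":-2}}"""
```

The resulting string can be passed directly, e.g. `.option("startingOffsets", json)`.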

### Streaming Data Sources

Advanced streaming readers supporting both micro-batch and continuous processing modes with fault tolerance and exactly-once semantics.

```scala { .api }
class KafkaMicroBatchReader extends MicroBatchReader with Logging {
  def setOffsetRange(start: Option[Offset], end: Offset): Unit
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]
  def readSchema(): StructType
}

class KafkaContinuousReader extends ContinuousReader with Logging {
  def readSchema: StructType
  def setStartOffset(start: Option[Offset]): Unit
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]
}
```

[Streaming Sources](./streaming-sources.md)
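Which reader a query uses follows from its trigger: the default and `Trigger.ProcessingTime` run through `KafkaMicroBatchReader`, while continuous processing (experimental in Spark 2.4) must be opted into explicitly. A sketch, reusing the streaming `df` from Basic Usage:

```scala
// Trigger.Continuous selects the KafkaContinuousReader path
val continuous = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
```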

### Batch Data Access

Batch relation for reading historical data from Kafka topics with configurable offset ranges.

```scala { .api }
class KafkaRelation extends BaseRelation with TableScan with Logging {
  def sqlContext: SQLContext
  def schema: StructType
  def buildScan(): RDD[Row]
}
```

[Batch Processing](./batch-processing.md)

### Data Writing

Comprehensive writing capabilities for both streaming and batch workloads with producer connection pooling and automatic serialization.

```scala { .api }
class KafkaSink extends Sink with Logging {
  def addBatch(batchId: Long, data: DataFrame): Unit
}

class KafkaStreamWriter extends StreamWriter {
  def createWriterFactory(): KafkaStreamWriterFactory
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}

object KafkaWriter extends Logging {
  def write(sparkSession: SparkSession, queryExecution: QueryExecution,
            kafkaParameters: ju.Map[String, Object], topic: Option[String]): Unit
}
```

[Data Writing](./data-writing.md)
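For streaming writes (served by `KafkaStreamWriter`/`KafkaSink` above), usage mirrors the batch write in Basic Usage except that a checkpoint location is required. A sketch, with an illustrative checkpoint path:

```scala
// Streaming write: every micro-batch/epoch is produced to the target topic
val sinkQuery = df
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()
```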

### Configuration and Options

Complete configuration options for fine-tuning Kafka integration behavior, connection parameters, and performance settings.

**Source Options**:
```scala { .api }
// Connection
"kafka.bootstrap.servers" -> "localhost:9092"
"subscribe" -> "topic1,topic2"
"subscribePattern" -> "topic.*"
"assign" -> """{"topic1":[0,1],"topic2":[0]}"""

// Offset Management
"startingOffsets" -> "earliest" // or "latest" or JSON
"endingOffsets" -> "latest"     // or JSON (batch only)
"failOnDataLoss" -> "true"

// Performance
"minPartitions" -> "10"
"maxOffsetsPerTrigger" -> "1000000"
```

**Sink Options**:
```scala { .api }
"kafka.bootstrap.servers" -> "localhost:9092"
"topic" -> "output-topic"
```

[Configuration](./configuration.md)

## Types

```scala { .api }
// Package-level type alias
type PartitionOffsetMap = Map[TopicPartition, Long]

// Data Consumer Types
case class AvailableOffsetRange(earliest: Long, latest: Long)

sealed trait KafkaDataConsumer {
  def get(offset: Long, untilOffset: Long, pollTimeoutMs: Long,
          failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]]
  def getAvailableOffsetRange(): AvailableOffsetRange
  def release(): Unit
}

// Offset Range Types
case class KafkaOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long,
  preferredLoc: Option[String]
) {
  lazy val size: Long = untilOffset - fromOffset
}

// RDD Types
case class KafkaSourceRDDOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long,
  preferredLoc: Option[String]
) {
  def topic: String = topicPartition.topic
  def partition: Int = topicPartition.partition
  def size: Long = untilOffset - fromOffset
}
```
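To make the `size` semantics concrete: both range types count the messages in the half-open interval `[fromOffset, untilOffset)`. A minimal stand-in (a simplified hypothetical class, not the real `KafkaOffsetRange`, which keys on a `TopicPartition`):

```scala
// Simplified sketch of the offset-range size computation
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  lazy val size: Long = untilOffset - fromOffset
}

val r = OffsetRange("my-topic", 0, 100L, 250L)
// r.size == 150 (offsets 100 through 249)
```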