
# Apache Spark Kafka SQL Connector

Apache Spark Kafka SQL Connector integrates Apache Kafka with Spark's Structured Streaming framework. It supports both reading from and writing to Kafka topics, with fault tolerance, automatic offset management, and exactly-once semantics on the read path, for building real-time data pipelines.

## Package Information

- **Package Name**: spark-sql-kafka-0-10_2.13
- **Package Type**: Maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-sql-kafka-0-10_2.13
- **Version**: 3.5.6
- **Installation**: Add to Maven dependencies or include when submitting Spark applications
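The coordinates above translate to a standard Maven dependency entry (sketch; match the version to your Spark deployment):

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.13</artifactId>
  <version>3.5.6</version>
</dependency>
```

Alternatively, the artifact can be pulled at submit time with `spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.6`.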

## Core Imports

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.functions._

// The connector is registered automatically as the "kafka" data source;
// no direct imports of connector classes are needed
```

**For advanced usage with types:**

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.kafka010.PartitionOffsetMap
```

## Basic Usage

### Reading from Kafka (Streaming)

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("KafkaStreaming")
  .getOrCreate()

// Read from Kafka using Structured Streaming
val kafkaStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "latest")
  .load()

// Process the stream and echo it to the console
val processedStream = kafkaStream
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()
```

### Reading from Kafka (Batch)

```scala
val kafkaBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
```

### Writing to Kafka

```scala
val dataFrame = spark.createDataFrame(Seq(
  ("key1", "value1"),
  ("key2", "value2")
)).toDF("key", "value")

dataFrame
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()
```

## Architecture

The Spark Kafka SQL Connector is built around several key components:

- **KafkaSourceProvider**: Main entry point implementing multiple Spark SQL interfaces for registration and instantiation
- **Consumer Strategies**: Flexible subscription patterns (`subscribe`, `subscribePattern`, `assign`) for different use cases
- **Offset Management**: Comprehensive offset tracking with support for earliest, latest, specific offsets, and timestamp-based positioning
- **Schema Conversion**: Automatic conversion between Kafka records and Spark rows with optional header support
- **Streaming Sources**: Both micro-batch and continuous streaming implementations with trigger support
- **Batch Sources**: Efficient batch reading with offset range optimization
- **Write Support**: Both batch and streaming write capabilities with producer pooling and configuration management

## Capabilities

### Data Source Registration

Core data source functionality for registering Kafka as a Spark SQL data source under the "kafka" identifier.

```scala { .api }
// Automatically registered - no direct usage
class KafkaSourceProvider extends DataSourceRegister
  with StreamSourceProvider
  with StreamSinkProvider
  with RelationProvider
  with CreatableRelationProvider
  with SimpleTableProvider
```

[Data Source Registration](./data-source.md)

### Consumer Strategy Configuration

Flexible subscription patterns for connecting to Kafka topics, including direct assignment, topic subscription, and pattern-based subscription.

```scala { .api }
// Consumer strategies are configured via options:
// .option("subscribe", "topic1,topic2,topic3")
// .option("subscribePattern", "prefix-.*")
// .option("assign", """{"topic1":[0,1,2],"topic2":[0,1]}""")

sealed trait ConsumerStrategy {
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
  def createAdmin(kafkaParams: ju.Map[String, Object]): Admin
  def assignedTopicPartitions(admin: Admin): Set[TopicPartition]
}
```

[Consumer Strategies](./consumer-strategies.md)
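The `assign` option value is a JSON map from topic to partition list. A minimal sketch of building that string from a Scala `Map` (a hypothetical helper, not part of the connector's API):

```scala
// Hypothetical helper: render Map("topic1" -> Seq(0, 1, 2)) as the JSON
// string expected by .option("assign", ...)
def assignJson(assignments: Map[String, Seq[Int]]): String =
  assignments
    .map { case (topic, parts) => s""""$topic":[${parts.mkString(",")}]""" }
    .mkString("{", ",", "}")

// assignJson(Map("topic1" -> Seq(0, 1, 2))) == """{"topic1":[0,1,2]}"""
```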

### Offset Management

Comprehensive offset positioning and range management supporting earliest, latest, specific offsets, and timestamp-based positioning.

```scala { .api }
// Offset range limits for controlling read boundaries
sealed trait KafkaOffsetRangeLimit

case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
case class SpecificTimestampRangeLimit(topicTimestamps: Map[TopicPartition, Long], strategy: StrategyOnNoMatchStartingOffset.Value) extends KafkaOffsetRangeLimit
case class GlobalTimestampRangeLimit(timestamp: Long, strategy: StrategyOnNoMatchStartingOffset.Value) extends KafkaOffsetRangeLimit
```

[Offset Management](./offset-management.md)
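The JSON form of `startingOffsets`/`endingOffsets` maps each topic to partition-offset pairs, where the sentinel `-2` means earliest and `-1` means latest. A hypothetical helper sketching that format (illustrative only, not connector code):

```scala
// Hypothetical helper: render per-partition offsets as the JSON expected by
// .option("startingOffsets", ...) / .option("endingOffsets", ...).
// Sentinel values: -2 = earliest, -1 = latest.
def offsetsJson(offsets: Map[String, Map[Int, Long]]): String =
  offsets
    .map { case (topic, parts) =>
      val body = parts.map { case (p, off) => s""""$p":$off""" }.mkString(",")
      s""""$topic":{$body}"""
    }
    .mkString("{", ",", "}")

// offsetsJson(Map("my-topic" -> Map(0 -> 23L, 1 -> -2L)))
//   == """{"my-topic":{"0":23,"1":-2}}"""
```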

### Schema Conversion

Schema definition and conversion between Kafka ConsumerRecord format and Spark DataFrame rows, with optional header support.

```scala { .api }
// Schema with headers disabled (default)
val schemaWithoutHeaders = StructType(Array(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)
))

// Schema with headers enabled (.option("includeHeaders", "true"))
val schemaWithHeaders = schemaWithoutHeaders.add(
  StructField("headers", ArrayType(StructType(Array(
    StructField("key", StringType),
    StructField("value", BinaryType)
  ))))
)

def kafkaSchema(includeHeaders: Boolean): StructType
```

[Schema Conversion](./schema-conversion.md)

186

187

### Streaming Sources

188

189

Micro-batch and continuous streaming implementations with comprehensive trigger support and metrics.

190

191

```scala { .api }

192

// Micro-batch streaming

193

class KafkaMicroBatchStream extends MicroBatchStream

194

with SupportsTriggerAvailableNow

195

with ReportsSourceMetrics {

196

197

def initialOffset(): Offset

198

def latestOffset(): Offset

199

def latestOffset(startOffset: Offset, readLimit: ReadLimit): Offset

200

def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]

201

def createReaderFactory(): PartitionReaderFactory

202

def commit(end: Offset): Unit

203

def stop(): Unit

204

}

205

206

// Continuous streaming

207

class KafkaContinuousStream extends ContinuousStream {

208

def mergeOffsets(offsets: Array[PartitionOffset]): Offset

209

def initialOffset(): Offset

210

def deserializeOffset(json: String): Offset

211

def commit(end: Offset): Unit

212

def stop(): Unit

213

}

214

```

215

216

[Streaming Sources](./streaming-sources.md)
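In continuous mode, `mergeOffsets` folds the per-partition offsets reported by individual readers into a single source offset. A plain-Scala sketch of that fold over (topic, partition, offset) tuples (illustrative only, not the connector's implementation):

```scala
// Illustrative only: combine per-partition progress reports into one map
// keyed by (topic, partition).
def mergePartitionOffsets(
    offsets: Seq[(String, Int, Long)]): Map[(String, Int), Long] =
  offsets.map { case (topic, partition, offset) =>
    (topic, partition) -> offset
  }.toMap

// mergePartitionOffsets(Seq(("t", 0, 5L), ("t", 1, 9L)))
//   == Map(("t", 0) -> 5L, ("t", 1) -> 9L)
```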

### Batch Reading

Efficient batch reading with offset range calculation and partition optimization.

```scala { .api }
class KafkaBatch extends Batch {
  def planInputPartitions(): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}

class KafkaBatchPartitionReader extends PartitionReader[InternalRow] {
  def next(): Boolean
  def get(): UnsafeRow
  def close(): Unit
  def currentMetricsValues(): Array[CustomTaskMetric]
}
```

[Batch Reading](./batch-reading.md)

### Writing to Kafka

Both batch and streaming write support with producer pooling, topic routing, and data validation.

```scala { .api }
// Core writer functionality
object KafkaWriter {
  val TOPIC_ATTRIBUTE_NAME: String = "topic"
  val KEY_ATTRIBUTE_NAME: String = "key"
  val VALUE_ATTRIBUTE_NAME: String = "value"
  val HEADERS_ATTRIBUTE_NAME: String = "headers"
  val PARTITION_ATTRIBUTE_NAME: String = "partition"

  def write(sparkSession: SparkSession, queryExecution: QueryExecution,
      kafkaParams: ju.Map[String, Object], topic: Option[String]): Unit
}

// V2 DataSource write implementation
case class KafkaWrite(topic: Option[String], producerParams: ju.Map[String, Object], schema: StructType) extends Write {
  def description(): String
  def toBatch: BatchWrite
  def toStreaming: StreamingWrite
}
```

[Writing to Kafka](./writing.md)
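For topic routing, the target topic can come either from a `topic` column in the data or from the `topic` option; when the option is set it applies to all rows, overriding any per-row column. A small sketch of that precedence (hypothetical helper, not connector code):

```scala
// Illustrative: the "topic" option, when set, overrides any per-row topic
// column; otherwise each row must carry its own topic value.
def resolveTopic(topicOption: Option[String], rowTopic: Option[String]): String =
  topicOption.orElse(rowTopic).getOrElse(
    throw new IllegalStateException(
      "topic option not set and no topic column present"))

// resolveTopic(Some("output-topic"), Some("row-topic")) == "output-topic"
// resolveTopic(None, Some("row-topic")) == "row-topic"
```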

## Configuration Options

### Required Options

- `kafka.bootstrap.servers`: Kafka bootstrap servers (required)
- One of: `subscribe`, `subscribePattern`, or `assign` (required)

### Common Options

- `startingOffsets`: Where to start reading ("earliest", "latest", or a JSON offset specification)
- `endingOffsets`: Where to stop reading for batch queries ("latest" or a JSON offset specification)
- `failOnDataLoss`: Whether to fail the query when data loss is detected (default: "true")
- `includeHeaders`: Include Kafka headers in the DataFrame schema (default: "false")
- `maxOffsetsPerTrigger`: Maximum number of offsets to process per trigger
- `minOffsetsPerTrigger`: Minimum number of offsets to process per trigger

### Advanced Options

- `minPartitions`: Minimum number of Spark partitions for processing
- `kafkaConsumer.pollTimeoutMs`: Consumer poll timeout in milliseconds
- `fetchOffset.numRetries`: Number of retries for offset fetching
- `fetchOffset.retryIntervalMs`: Retry interval for offset fetching
- `groupIdPrefix`: Prefix for generated consumer group IDs
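`maxOffsetsPerTrigger` caps the total records per micro-batch, and the connector spreads that cap across partitions roughly in proportion to each partition's backlog. A simplified sketch of that apportioning (illustrative; the real implementation also handles rounding and per-partition clamping differently):

```scala
// Simplified sketch: distribute a total offset cap across partitions in
// proportion to each partition's lag (available = latest - start).
def distributeCap(lags: Map[String, Long], cap: Long): Map[String, Long] = {
  val total = lags.values.sum
  if (total <= cap) lags // under the cap: take everything available
  else lags.map { case (tp, lag) =>
    tp -> (lag.toDouble / total * cap).toLong // proportional share, rounded down
  }
}

// distributeCap(Map("t-0" -> 100L, "t-1" -> 300L), 100L)
//   == Map("t-0" -> 25L, "t-1" -> 75L)
```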

## Error Handling

The connector provides structured exception handling for common Kafka integration scenarios:

- **Data Loss Detection**: Automatic detection of missing data due to Kafka retention or topic deletion
- **Offset Out of Range**: Handling of invalid offset requests
- **Connection Failures**: Retry logic for transient network issues
- **Configuration Validation**: Comprehensive validation of all configuration options

### Specific Exceptions

```scala { .api }
object KafkaExceptions {
  def mismatchedTopicPartitionsBetweenEndOffsetAndPrefetched(
      tpsForPrefetched: Set[TopicPartition],
      tpsForEndOffset: Set[TopicPartition]): SparkException

  def endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
      prefetchedOffset: Map[TopicPartition, Long],
      endOffset: Map[TopicPartition, Long]): SparkException

  def lostTopicPartitionsInEndOffsetWithTriggerAvailableNow(
      tpsForLatestOffset: Set[TopicPartition],
      tpsForEndOffset: Set[TopicPartition]): SparkException

  def endOffsetHasGreaterOffsetForTopicPartitionThanLatestWithTriggerAvailableNow(
      latestOffset: Map[TopicPartition, Long],
      endOffset: Map[TopicPartition, Long]): SparkException
}
```

## Custom Metrics

The connector exposes custom metrics for monitoring:

- `offsetOutOfRange`: Number of offsets that were out of range
- `dataLoss`: Number of data loss events detected

These metrics integrate with Spark's metrics system and can be monitored through the Spark UI and external monitoring systems.