tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-12

Kafka 0.10+ Source for Structured Streaming providing Kafka integration for Apache Spark's streaming and batch processing

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-sql-kafka-0-10_2.12@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-12@3.5.0

# Apache Spark Kafka Connector

The Apache Spark Kafka Connector (spark-sql-kafka-0-10_2.12) provides seamless integration between Apache Kafka and Apache Spark's Structured Streaming and SQL APIs. It enables both batch and streaming data processing from Kafka topics with exactly-once processing semantics, offset management, and fault tolerance.

## Package Information

- **Package Name**: spark-sql-kafka-0-10_2.12
- **Package Type**: maven
- **Language**: Scala
- **Installation**: `spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.6`
- **Dependencies**: Requires Apache Spark 3.5.6 and Kafka client libraries
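
For applications built with sbt rather than launched via spark-shell, the same artifact can be declared as a library dependency. This is a minimal sketch, assuming Scala 2.12 and Spark 3.5.6; spark-sql is marked "provided" because the cluster supplies it at runtime:

```scala
// build.sbt (sketch): keep the versions aligned with the target Spark deployment
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "3.5.6" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.6"
)
```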

## Core Usage Pattern

The connector is accessed through Spark SQL's DataSource API using the "kafka" format identifier:

```scala
// Reading from Kafka (streaming)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")
  .load()

// Writing to Kafka (streaming)
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/example") // required for streaming writes
  .outputMode("append")
  .start()
```

## Basic Usage

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("KafkaExample")
  .getOrCreate()

// Stream from Kafka
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("startingOffsets", "earliest")
  .load()

// Extract and process the value
val processedDF = kafkaDF
  .select(col("value").cast("string").as("message"))
  .filter(col("message").isNotNull)

// Write back to Kafka (streaming writes need a checkpoint location)
val query = processedDF
  .select(to_json(struct("*")).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-example")
  .outputMode("append")
  .start()
```

## Architecture

The Spark Kafka connector is built around several key components:

- **KafkaSourceProvider**: Main entry point that registers the "kafka" format with Spark SQL
- **Fixed Schema**: Standardized schema for Kafka records (key, value, topic, partition, offset, timestamp, etc.)
- **Offset Management**: Comprehensive offset tracking and recovery for exactly-once processing
- **Consumer Strategies**: Flexible topic subscription patterns (subscribe, subscribePattern, assign)
- **Producer Integration**: Seamless writing back to Kafka topics with proper serialization

## Capabilities

### Streaming Data Reading

Read data from Kafka topics in real time using Spark Structured Streaming with micro-batch or continuous processing modes.

```scala { .api }
// Streaming read operation
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribe", topics: String)        // or subscribePattern or assign
  .option("startingOffsets", offsets: String) // "earliest", "latest", or JSON
  .load(): DataFrame
```

[Streaming Operations](./streaming.md)
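
For quick inspection during development, the decoded stream can be sent to the console sink instead of another Kafka topic. A minimal sketch, assuming a running SparkSession named spark and a hypothetical topic "events":

```scala
// Decode the binary key/value columns and print incoming records to the console
val tap = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console") // console sink is for debugging and needs no checkpoint location
  .outputMode("append")
  .start()

tap.awaitTermination()
```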

### Batch Data Reading

Read historical data from Kafka topics for batch processing and analysis.

```scala { .api }
// Batch read operation
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribe", topics: String) // or subscribePattern or assign
  .option("startingOffsets", startOffsets: String)
  .option("endingOffsets", endOffsets: String)
  .load(): DataFrame
```

[Batch Operations](./batch.md)
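
As a sketch of a bounded batch query, the read below counts every record currently retained in a hypothetical topic "events" by spanning the earliest to latest offsets:

```scala
// Batch read over the full retained range of the topic, then count the records
val total = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .count()

println(s"records currently retained in 'events': $total")
```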

### Data Writing

Write DataFrame data to Kafka topics with proper serialization and partitioning.

```scala { .api }
// Streaming write operation
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("topic", topicName: String) // optional if specified in the data
  .outputMode("append")
  .start(): StreamingQuery

// Batch write operation
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("topic", topicName: String)
  .save()
```

[Write Operations](./writing.md)
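
The Kafka sink expects the key (optional) and value (required) columns to be string or binary, so typed columns generally need a cast or to_json before writing. A minimal batch-write sketch, assuming a hypothetical DataFrame ordersDF with an id column and a payload struct column:

```scala
import org.apache.spark.sql.functions._

// Shape the DataFrame into the write schema expected by the Kafka sink
ordersDF
  .select(
    col("id").cast("string").as("key"),  // optional key, used by Kafka for partitioning
    to_json(col("payload")).as("value")  // required value, serialized to a JSON string
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "orders-out")
  .save()
```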

### Configuration Management

Comprehensive configuration options for connection, performance tuning, and reliability.

```scala { .api }
// Core configuration options
.option("kafka.bootstrap.servers", servers: String)  // Required
.option("subscribe", topics: String)                 // Topic selection
.option("maxOffsetsPerTrigger", maxRecords: String)  // Performance tuning
.option("failOnDataLoss", failOnLoss: String)        // Reliability
```

[Configuration Options](./configuration.md)
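
A minimal sketch with concrete values (endpoints and settings are illustrative). Options prefixed with "kafka." are passed through to the underlying Kafka consumer, which is how broker-level client settings are supplied:

```scala
val tunedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("kafka.session.timeout.ms", "30000") // passed through to the Kafka consumer
  .option("subscribe", "events")
  .option("failOnDataLoss", "false")           // tolerate aged-out offsets instead of failing the query
  .load()
```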

## Data Schema

### Read Schema (Fixed)

All Kafka DataFrames have the following fixed schema:

```scala { .api }
// Fixed Kafka record schema
case class KafkaRecord(
  key: Array[Byte],              // Message key as byte array (nullable)
  value: Array[Byte],            // Message value as byte array
  topic: String,                 // Topic name
  partition: Int,                // Partition number
  offset: Long,                  // Message offset within partition
  timestamp: java.sql.Timestamp, // Message timestamp
  timestampType: Int,            // 0=CreateTime, 1=LogAppendTime
  headers: Array[KafkaHeader]    // Optional headers (when includeHeaders=true)
)

case class KafkaHeader(
  key: String,
  value: Array[Byte]
)
```
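
The fixed columns can be projected and cast as needed; the headers column only appears when includeHeaders is enabled. A minimal sketch, assuming a hypothetical topic "events":

```scala
// Select the fixed schema columns, decoding key and value to strings
val decoded = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("includeHeaders", "true") // adds the headers column to the fixed schema
  .load()
  .selectExpr(
    "CAST(key AS STRING) AS key",
    "CAST(value AS STRING) AS value",
    "topic", "partition", "offset", "timestamp", "headers"
  )
```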

### Write Schema (Flexible)

For writing, DataFrames can contain any combination of these fields:

```scala { .api }
// Write schema fields (all optional except value)
case class KafkaWriteRecord(
  topic: String,              // Target topic (optional if set in options)
  key: Any,                   // Message key (will be serialized)
  value: Any,                 // Message value (required, will be serialized)
  partition: Int,             // Specific partition (optional)
  headers: Array[KafkaHeader] // Message headers (optional, same key/value layout as on read)
)
```
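
Because the target topic can be supplied as a column, a single write can route rows to different topics. A sketch with hypothetical column names (eventType, body); the "events-" topic prefix is illustrative:

```scala
import org.apache.spark.sql.functions._

eventsDF
  .select(
    concat(lit("events-"), col("eventType")).as("topic"), // per-row target topic
    col("body").as("value")                               // value must be string or binary
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save() // no "topic" option needed when a topic column is present
```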

## Topic Selection Strategies

```scala { .api }
// Subscribe to specific topics by name
.option("subscribe", "topic1,topic2,topic3")

// Subscribe to topics matching a regex pattern
.option("subscribePattern", "events-.*")

// Assign specific partitions
.option("assign", """{"topic1":[0,1],"topic2":[0]}""")
```
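
Only one of these strategies can be set for a given query. A minimal sketch using subscribePattern, with hypothetical topic naming:

```scala
// Subscribe to every topic whose name matches "events-.*"
val eventsDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "events-.*")
  .load()
```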

## Offset Management Types

```scala { .api }
// Offset specification options
"earliest" // Start from earliest available offsets
"latest"   // Start from latest available offsets

// Specific offsets per partition (JSON format)
"""{"topic1":{"0":23,"1":345},"topic2":{"0":0}}"""

// Global timestamp (milliseconds since epoch)
.option("startingTimestamp", "1609459200000")

// Per-partition timestamps (JSON format, for startingOffsetsByTimestamp)
"""{"topic1":{"0":1609459200000,"1":1609459300000}}"""
```
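
As a sketch of per-partition offsets in practice, the batch read below pulls a bounded slice of a hypothetical two-partition topic "topic1" between explicit starting and ending offsets (the numbers are illustrative):

```scala
val sliceDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", """{"topic1":{"0":23,"1":345}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":500}}""")
  .load()
```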

## Error Handling

The connector provides robust error handling for common scenarios:

- **Data Loss Detection**: Configurable behavior when data is no longer available
- **Offset Validation**: Automatic validation of offset ranges and availability
- **Connection Failures**: Retry logic and graceful degradation
- **Schema Validation**: Input validation for write operations
- **Configuration Errors**: Clear error messages for invalid options

## Performance Considerations

- **Consumer Pooling**: Efficient reuse of Kafka consumers across tasks
- **Producer Caching**: Connection pooling for Kafka producers
- **Batch Size Control**: Configurable limits on records per micro-batch (see the sketch below)
- **Parallel Processing**: Automatic parallelization based on Kafka partitions
- **Memory Management**: Optimized handling of large message batches
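
As referenced in the list above, batch size and read parallelism are driven by two read options; a minimal sketch (values are illustrative):

```scala
val throttled = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", "50000") // upper bound on records consumed per micro-batch
  .option("minPartitions", "24")           // request at least 24 Spark input partitions
  .load()
```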