# Data Source Registration

The Kafka connector registers automatically with Spark SQL as the "kafka" data source, implementing the interfaces needed for comprehensive streaming and batch support.

## Capabilities

### KafkaSourceProvider

The main entry point: a single class that implements all of the Spark SQL interfaces required for complete Kafka integration.

```scala { .api }
/**
 * Primary provider class for all Kafka readers and writers.
 * Automatically registers with Spark SQL under the name "kafka".
 */
class KafkaSourceProvider extends DataSourceRegister
    with StreamSourceProvider
    with StreamSinkProvider
    with RelationProvider
    with CreatableRelationProvider
    with SimpleTableProvider {

  /** Returns the short name used to identify this data source */
  def shortName(): String = "kafka"

  /** Creates a streaming source for reading Kafka data */
  def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source

  /** Creates a streaming sink for writing to Kafka */
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink

  /** Creates a batch relation for reading Kafka data */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation

  /** Creates a relation for writing a DataFrame to Kafka */
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation

  /** Returns the V2 DataSource table implementation */
  def getTable(options: CaseInsensitiveStringMap): KafkaTable
}
```

**Usage Examples:**

The provider is used automatically when "kafka" is specified as the format:

```scala
// Streaming read
val stream = spark
  .readStream
  .format("kafka") // uses KafkaSourceProvider automatically
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// Batch read
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// Write
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()
```

### KafkaTable

V2 DataSource API table implementation providing modern Spark SQL integration.

```scala { .api }
/**
 * V2 DataSource table implementation for Kafka.
 * Supports both reading and writing with comprehensive capabilities.
 */
class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite {

  /** Returns the table name for identification */
  def name(): String = "KafkaTable"

  /** Returns the schema for Kafka records */
  def schema(): StructType

  /** Returns the set of supported table capabilities */
  def capabilities(): ju.Set[TableCapability]

  /** Creates a scan builder for read operations */
  def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder

  /** Creates a write builder for write operations */
  def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder
}
```

**Supported Capabilities:**

```scala { .api }
// Table capabilities supported by KafkaTable
import org.apache.spark.sql.connector.catalog.TableCapability._

val supportedCapabilities: ju.Set[TableCapability] = ju.EnumSet.of(
  BATCH_READ,        // batch reading support
  BATCH_WRITE,       // batch writing support
  MICRO_BATCH_READ,  // micro-batch streaming reads
  CONTINUOUS_READ,   // continuous streaming reads
  STREAMING_WRITE,   // streaming writes
  ACCEPT_ANY_SCHEMA  // flexible schema handling
)
```

### KafkaScan

Scan implementation for reading Kafka data in both batch and streaming modes.

```scala { .api }
/**
 * Scan implementation for reading Kafka data.
 * Handles conversion between the different read modes.
 */
class KafkaScan(options: CaseInsensitiveStringMap) extends Scan {

  /** Returns the read schema for Kafka records */
  def readSchema(): StructType

  /** Converts the scan to batch reading mode */
  def toBatch(): Batch

  /** Converts the scan to micro-batch streaming mode */
  def toMicroBatchStream(checkpointLocation: String): MicroBatchStream

  /** Converts the scan to continuous streaming mode */
  def toContinuousStream(checkpointLocation: String): ContinuousStream

  /** Returns the supported custom metrics */
  def supportedCustomMetrics(): Array[CustomMetric]
}
```

## Configuration Validation

The data source provider performs comprehensive validation of configuration options:

### Required Options Validation

```scala
// Exactly one of these subscription strategies must be specified:
// - "subscribe":        "topic1,topic2,topic3"
// - "subscribePattern": "prefix-.*"
// - "assign":           """{"topic1":[0,1],"topic2":[0,1]}"""

// Bootstrap servers must also be specified:
// - "kafka.bootstrap.servers": "localhost:9092"
```
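
These rules can be sketched in plain Scala. The helper below (`validateSubscription`, the `strategyKeys` set) is hypothetical, written only to illustrate the validation described above; it is not the connector's actual implementation.

```scala
// Hypothetical sketch of the subscription-strategy check described above;
// the real connector performs this validation internally.
val strategyKeys = Set("subscribe", "subscribepattern", "assign")

def validateSubscription(options: Map[String, String]): Unit = {
  // Option keys are matched case-insensitively, as in the connector's docs
  val specified = options.keys.map(_.toLowerCase).toSet.intersect(strategyKeys)
  require(specified.size == 1,
    "One of the following options must be specified for Kafka source: " +
      "subscribe, subscribePattern, assign")
  require(options.keys.exists(_.equalsIgnoreCase("kafka.bootstrap.servers")),
    "Option 'kafka.bootstrap.servers' must be specified")
}

// A map with exactly one strategy plus bootstrap servers passes:
validateSubscription(Map(
  "kafka.bootstrap.servers" -> "localhost:9092",
  "subscribe" -> "my-topic"))
```

Specifying zero or more than one strategy fails the `require` checks with an `IllegalArgumentException`.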

### Unsupported Kafka Options

The following Kafka consumer/producer options are not supported and will cause an exception if set:

```scala
// Unsupported consumer options
"kafka.auto.offset.reset"   // use startingOffsets instead
"kafka.key.deserializer"    // fixed to ByteArrayDeserializer
"kafka.value.deserializer"  // fixed to ByteArrayDeserializer
"kafka.enable.auto.commit"  // managed internally
"kafka.interceptor.classes" // not safe for Spark usage

// Unsupported producer options
"kafka.key.serializer"      // fixed to ByteArraySerializer
"kafka.value.serializer"    // fixed to ByteArraySerializer
```
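
As an illustration of this rejection rule, a minimal sketch in plain Scala might look like the following. The `rejectUnsupported` helper and its option set are assumptions made for this example, not the connector's internals.

```scala
// Illustrative sketch only: reject the unsupported options listed above
// before building the underlying Kafka consumer/producer configuration.
val unsupportedOptions = Set(
  "kafka.auto.offset.reset",
  "kafka.key.deserializer",
  "kafka.value.deserializer",
  "kafka.enable.auto.commit",
  "kafka.interceptor.classes",
  "kafka.key.serializer",
  "kafka.value.serializer")

def rejectUnsupported(options: Map[String, String]): Unit =
  options.keys.map(_.toLowerCase).find(unsupportedOptions.contains).foreach { key =>
    throw new IllegalArgumentException(
      s"Kafka option '$key' is not supported; it is managed by the connector")
  }
```

Failing fast here keeps misconfiguration errors at query definition time rather than surfacing them later inside the Kafka client.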

### Stream vs Batch Option Validation

```scala
// Batch-only options (not valid for streaming queries)
"endingOffsets"            // only for batch
"endingOffsetsByTimestamp" // only for batch

// Batch-specific restrictions
// startingOffsets cannot be "latest" for batch queries
// endingOffsets cannot be "earliest" for batch queries
```
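
The batch offset-boundary restrictions above can be expressed as a small check. `validateBatchOffsets` below is a hypothetical helper for illustration, not connector code.

```scala
// Hedged sketch of the batch-query offset rules described above.
def validateBatchOffsets(starting: Option[String], ending: Option[String]): Unit = {
  require(!starting.contains("latest"),
    "starting offset can't be latest for batch queries on Kafka")
  require(!ending.contains("earliest"),
    "ending offset can't be earliest for batch queries on Kafka")
}

// The typical batch defaults (earliest start, latest end) are valid:
validateBatchOffsets(Some("earliest"), Some("latest"))
```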

## Error Messages

The provider includes helpful error messages for common configuration issues:

```scala
// Missing subscription strategy
"One of the following options must be specified for Kafka source: subscribe, subscribePattern, assign"

// Custom group ID warning
"Kafka option 'kafka.group.id' has been set on this query, it is not recommended to set this option"

// Invalid offset configuration
"starting offset can't be latest for batch queries on Kafka"
"ending offset can't be earliest for batch queries on Kafka"
```