# Schema Conversion

The Kafka connector automatically converts Kafka `ConsumerRecord` objects into Spark DataFrame rows, providing a structured view of Kafka messages with optional header support.

## Capabilities

### Kafka Record Schema

The connector provides a fixed schema for Kafka records that cannot be customized. The schema varies based on whether headers are included.

### Default Schema (Headers Disabled)

When `includeHeaders` is false or not specified:

```scala { .api }
val kafkaSchemaWithoutHeaders = StructType(Array(
  StructField("key", BinaryType, nullable = true),
  StructField("value", BinaryType, nullable = true),
  StructField("topic", StringType, nullable = false),
  StructField("partition", IntegerType, nullable = false),
  StructField("offset", LongType, nullable = false),
  StructField("timestamp", TimestampType, nullable = true),
  StructField("timestampType", IntegerType, nullable = true)
))
```

**Field Descriptions:**

- `key`: Message key as binary data (null if no key)
- `value`: Message value as binary data (null if no value)
- `topic`: Topic name where the message was published
- `partition`: Partition number within the topic
- `offset`: Offset of the message within the partition
- `timestamp`: Message timestamp (producer or broker timestamp)
- `timestampType`: Type of timestamp (0 = CreateTime, 1 = LogAppendTime)
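The `timestampType` codes follow Kafka's `TimestampType` enum. A minimal helper (hypothetical, not part of the connector) makes the mapping explicit:

```scala
// Hypothetical helper for decoding the timestampType column.
// 0 = CreateTime (set by the producer), 1 = LogAppendTime (set by the broker),
// -1 = NoTimestampType (legacy messages without a timestamp).
def timestampTypeName(code: Int): String = code match {
  case 0  => "CreateTime"
  case 1  => "LogAppendTime"
  case -1 => "NoTimestampType"
  case _  => s"Unknown($code)"
}

println(timestampTypeName(0)) // CreateTime
println(timestampTypeName(1)) // LogAppendTime
```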

**Usage Examples:**

```scala
import org.apache.spark.sql.functions._

val kafkaStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// Access fields directly
val processedStream = kafkaStream
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    expr("CAST(key AS STRING)").as("key_str"),
    expr("CAST(value AS STRING)").as("value_str")
  )
```

### Extended Schema (Headers Enabled)

When `includeHeaders` is set to `"true"`:

```scala { .api }
val kafkaSchemaWithHeaders = StructType(Array(
  StructField("key", BinaryType, nullable = true),
  StructField("value", BinaryType, nullable = true),
  StructField("topic", StringType, nullable = false),
  StructField("partition", IntegerType, nullable = false),
  StructField("offset", LongType, nullable = false),
  StructField("timestamp", TimestampType, nullable = true),
  StructField("timestampType", IntegerType, nullable = true),
  StructField("headers", ArrayType(StructType(Array(
    StructField("key", StringType, nullable = false),
    StructField("value", BinaryType, nullable = true)
  ))), nullable = true)
))
```

**Headers Field Structure:**

```scala { .api }
val headersType = ArrayType(StructType(Array(
  StructField("key", StringType, nullable = false), // Header key as string
  StructField("value", BinaryType, nullable = true) // Header value as binary
)))
```
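Each element of the `headers` array is a (string key, binary value) pair. The per-record lookup that the SQL `filter(headers, ...)` expressions perform can be sketched in plain Scala (the names here are illustrative, not part of the connector):

```scala
// Model one record's headers as (key, bytes) pairs, mirroring the headers column.
val headers: Seq[(String, Array[Byte])] = Seq(
  "content-type"   -> "application/json".getBytes("UTF-8"),
  "correlation-id" -> "abc-123".getBytes("UTF-8")
)

// Equivalent of: CAST(filter(headers, x -> x.key = 'content-type')[0].value AS STRING)
def headerValue(hs: Seq[(String, Array[Byte])], key: String): Option[String] =
  hs.collectFirst { case (k, v) if k == key => new String(v, "UTF-8") }

println(headerValue(headers, "content-type")) // Some(application/json)
println(headerValue(headers, "missing"))      // None
```

Kafka permits duplicate header keys, which is why `headers` is an array rather than a map; `collectFirst` here, like `[0]` in the SQL form, keeps only the first match.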

**Configuration:**

```scala
val kafkaStreamWithHeaders = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "true") // Enable headers
  .load()
```

**Usage Examples:**

```scala
// Process messages with headers
val processedWithHeaders = kafkaStreamWithHeaders
  .select(
    col("topic"),
    expr("CAST(key AS STRING)").as("key_str"),
    expr("CAST(value AS STRING)").as("value_str"),
    col("headers")
  )

// Extract specific header values
val withExtractedHeaders = kafkaStreamWithHeaders
  .select(
    col("*"),
    expr("filter(headers, x -> x.key = 'content-type')[0].value").as("content_type_header"),
    expr("filter(headers, x -> x.key = 'correlation-id')[0].value").as("correlation_id_header")
  )
```

### KafkaRecordToRowConverter

Internal converter class that handles the transformation from a Kafka `ConsumerRecord` to Spark rows.

```scala { .api }
/**
 * Converts a Kafka ConsumerRecord to a Spark InternalRow/UnsafeRow.
 * Handles both header-enabled and header-disabled modes.
 */
class KafkaRecordToRowConverter(
  includeHeaders: Boolean
) {

  /** Convert to InternalRow without headers */
  val toInternalRowWithoutHeaders: ConsumerRecord[Array[Byte], Array[Byte]] => InternalRow

  /** Convert to InternalRow with headers */
  val toInternalRowWithHeaders: ConsumerRecord[Array[Byte], Array[Byte]] => InternalRow

  /** Convert to UnsafeRow without headers */
  val toUnsafeRowWithoutHeadersProjector: ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow

  /** Convert to UnsafeRow with headers */
  val toUnsafeRowWithHeadersProjector: ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow

  /** Generic UnsafeRow projector based on header setting */
  def toUnsafeRowProjector(includeHeaders: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow
}
```

**Companion Object:**

```scala { .api }
object KafkaRecordToRowConverter {
  /** Returns the appropriate schema based on header inclusion */
  def kafkaSchema(includeHeaders: Boolean): StructType

  /** Headers array type definition */
  val headersType: DataType = ArrayType(StructType(Array(
    StructField("key", StringType),
    StructField("value", BinaryType)
  )))
}
```
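As a rough mental model (using toy stand-ins rather than the real Kafka and Spark classes), the without-headers conversion emits one value per schema field, in schema order:

```scala
// Toy record type standing in for ConsumerRecord[Array[Byte], Array[Byte]];
// illustration only, not the connector's actual implementation.
case class ToyRecord(topic: String, partition: Int, offset: Long,
                     key: Array[Byte], value: Array[Byte],
                     timestampMs: Long, timestampType: Int)

// Sketch of the without-headers conversion: one element per schema field,
// in the order key, value, topic, partition, offset, timestamp, timestampType.
def toRowValues(r: ToyRecord): Seq[Any] =
  Seq(r.key, r.value, r.topic, r.partition, r.offset, r.timestampMs, r.timestampType)

val row = toRowValues(
  ToyRecord("my-topic", 0, 42L, null, "hi".getBytes("UTF-8"), 1700000000000L, 0))
println(row(2)) // my-topic
println(row(4)) // 42
```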

## Data Type Conversions

### Binary Data Handling

Kafka keys and values are always treated as binary data in Spark:

```scala
// Keys and values arrive as BinaryType - cast them to the appropriate types
val stringData = kafkaStream
  .select(
    expr("CAST(key AS STRING)").as("key_string"),
    expr("CAST(value AS STRING)").as("value_string")
  )

// Parse JSON values
val jsonData = kafkaStream
  .select(
    expr("CAST(value AS STRING)").as("json_string")
  )
  .select(
    from_json(col("json_string"), jsonSchema).as("parsed_data")
  )
```

### Timestamp Handling

Kafka timestamps are converted to Spark's `TimestampType`:

```scala
// Work with timestamps
val withTimestamps = kafkaStream
  .select(
    col("timestamp"),
    col("timestampType"),
    date_format(col("timestamp"), "yyyy-MM-dd HH:mm:ss").as("formatted_timestamp"),
    hour(col("timestamp")).as("hour"),
    to_date(col("timestamp")).as("date")
  )

// Filter by time ranges
val recentMessages = kafkaStream
  .filter(col("timestamp") > lit("2023-01-01 00:00:00").cast(TimestampType))
```

### Header Processing

Process Kafka headers as structured data:

```scala
// Work with headers (when includeHeaders = "true")
val withHeaderProcessing = kafkaStreamWithHeaders
  .select(
    col("*"),
    size(col("headers")).as("header_count"),

    // Extract all header keys
    expr("transform(headers, x -> x.key)").as("header_keys"),

    // Find a specific header by key
    expr("filter(headers, x -> x.key = 'content-type')").as("content_type_headers"),

    // Extract a header value as a string
    expr("CAST(filter(headers, x -> x.key = 'user-id')[0].value AS STRING)").as("user_id")
  )
```

## Schema Validation

The connector enforces a fixed schema and rejects custom schemas:

```scala
// This will fail - custom schemas are not supported
spark
  .readStream
  .format("kafka")
  .schema(customSchema) // Will throw an exception
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
// Exception: "Kafka source has a fixed schema and cannot be set with a custom one"
```

## Performance Considerations

### Projection Optimization

The converter supports column projection for better performance:

```scala
// Select only the columns you need to reduce processing overhead
val optimized = kafkaStream
  .select("value", "timestamp", "topic") // Only process these columns
  .filter(col("timestamp") > recentTimestamp)
```

### Binary vs String Conversion

```scala
// Efficient - work with binary data when possible
val binaryProcessing = kafkaStream
  .select(col("value"))
  .filter(length(col("value")) > 100)

// Less efficient - casts to string for every row
val stringProcessing = kafkaStream
  .select(expr("CAST(value AS STRING)").as("value_str"))
  .filter(length(col("value_str")) > 100)
```

### Header Processing Performance

```scala
// Efficient - only include headers when needed
val withoutHeaders = spark
  .readStream
  .format("kafka")
  .option("includeHeaders", "false") // Default, more efficient
  .load()

// Less efficient - headers require additional processing
val withHeaders = spark
  .readStream
  .format("kafka")
  .option("includeHeaders", "true") // Only when headers are needed
  .load()
```

## Common Patterns

### JSON Message Processing

```scala
import org.apache.spark.sql.types._

// Define the JSON schema
val messageSchema = StructType(Array(
  StructField("user_id", StringType),
  StructField("event_type", StringType),
  StructField("timestamp", LongType),
  StructField("properties", MapType(StringType, StringType))
))

// Parse JSON messages
val parsedMessages = kafkaStream
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp").as("kafka_timestamp"),
    from_json(expr("CAST(value AS STRING)"), messageSchema).as("message")
  )
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("kafka_timestamp"),
    col("message.*")
  )
```

### Avro Message Processing

```scala
// For Avro messages, use the spark-avro package
import org.apache.spark.sql.avro.functions.from_avro

val avroMessages = kafkaStream
  .select(
    col("topic"),
    from_avro(col("value"), avroSchemaString).as("avro_data")
  )
  .select(
    col("topic"),
    col("avro_data.*")
  )
```

### Message Routing by Topic

```scala
// Process different topics differently based on schema
val processedMessages = kafkaStream
  .withColumn("processed_value",
    when(col("topic") === "user-events",
      from_json(expr("CAST(value AS STRING)"), userEventSchema))
    .when(col("topic") === "system-logs",
      from_json(expr("CAST(value AS STRING)"), logSchema))
    .otherwise(lit(null))
  )
```