Fixed schema for Kafka records, with support for message headers and type definitions for each field.
The Spark SQL Kafka connector uses a fixed schema for all Kafka records, providing access to both message content and metadata.
import org.apache.spark.sql.types._
/**
* Standard Kafka record schema structure (without headers)
* Note: Schema is fixed and cannot be customized
*/
val schemaWithoutHeaders = StructType(Array(
StructField("key", BinaryType),
StructField("value", BinaryType),
StructField("topic", StringType),
StructField("partition", IntegerType),
StructField("offset", LongType),
StructField("timestamp", TimestampType),
StructField("timestampType", IntegerType)
))
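To see this fixed layout in practice, a minimal batch read can be inspected with printSchema(); the broker address and topic name below are placeholders:
// Minimal batch read to confirm the fixed record schema
val kafkaDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
kafkaDF.printSchema() // key, value, topic, partition, offset, timestamp, timestampType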
/**
* Kafka schema with headers field included
* Note: Headers field is only present when includeHeaders=true
*/
val headersType = ArrayType(StructType(Array(
StructField("key", StringType),
StructField("value", BinaryType)
)))
val schemaWithHeaders = StructType(
schemaWithoutHeaders.fields :+ StructField("headers", headersType)
)
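A quick sanity check of the composed schema, confirming that headers is appended as the eighth field:
// headers is appended after the seven fixed columns
assert(schemaWithHeaders.fieldNames.last == "headers")
assert(schemaWithHeaders.length == 8)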
/**
* Get appropriate schema based on headers configuration
* @param includeHeaders - Whether to include headers field
* @return StructType representing Kafka record schema
*/
def kafkaSchema(includeHeaders: Boolean): StructType = {
if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders
}
/**
* Message key - typically used for partitioning and keyed operations
* Type: BinaryType (nullable)
* Note: Keys are deserialized as byte arrays using ByteArrayDeserializer
*/
val key: Array[Byte]
/**
* Message value - the actual message payload
* Type: BinaryType (nullable; a null value represents a tombstone record)
* Note: Values are deserialized as byte arrays using ByteArrayDeserializer
*/
val value: Array[Byte]
/**
* Topic name where the message was published
* Type: StringType (non-nullable)
*/
val topic: String
/**
* Partition number within the topic
* Type: IntegerType (non-nullable)
*/
val partition: Int
/**
* Message offset within the partition
* Type: LongType (non-nullable)
* Note: Offsets are unique within a partition and increase monotonically
*/
val offset: Long
/**
* Message timestamp
* Type: TimestampType (non-nullable)
* Note: Represents either creation time or log append time depending on timestampType
*/
val timestamp: java.sql.Timestamp
/**
* Timestamp type indicator
* Type: IntegerType (non-nullable)
* Values: 0 = CreateTime (set by producer), 1 = LogAppendTime (set by broker)
*/
val timestampType: Int
Headers can be included in the Kafka record schema by setting the includeHeaders option.
/**
* Enable headers field in Kafka schema
* Value: "true" to include headers, "false" to exclude (default: "false")
*/
.option("includeHeaders", "true")
/**
* Message headers as array of key-value pairs
* Type: ArrayType[StructType] (nullable)
* Note: Only included when includeHeaders option is true
*/
val headers: Array[Row] // Each Row contains (key: String, value: Array[Byte])
/**
* Header structure definition
*/
val headerType = StructType(Array(
StructField("key", StringType), // Header key
StructField("value", BinaryType) // Header value as binary
))
/**
* Complete headers array type
*/
val headersType = ArrayType(headerType)
Utilities for working with the Kafka schema and converting data.
/**
* Schema converter utilities
*/
object KafkaRecordToRowConverter {
/**
* Get the standard Kafka schema
* @param includeHeaders - Whether to include headers field in schema
* @return StructType representing Kafka record schema
*/
def kafkaSchema(includeHeaders: Boolean): StructType
/**
* Get the headers array type definition
* @return ArrayType for Kafka headers
*/
def headersType: ArrayType
}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// Read from Kafka with headers
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("includeHeaders", "true")
.load()
// Convert binary fields to strings
val messagesDF = df
.select(
col("topic"),
col("partition"),
col("offset"),
col("timestamp"),
expr("CAST(key AS STRING)").as("key_str"),
expr("CAST(value AS STRING)").as("value_str"),
col("headers")
)
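Since timestampType is surfaced as a bare integer, it can help to decode it alongside the other metadata columns; the column name timestampTypeName and the reuse of df from the read example above are illustrative:
// Decode timestampType: 0 = CreateTime, 1 = LogAppendTime (-1 = NoTimestampType in Kafka)
val withTimestampTypeName = df.withColumn(
  "timestampTypeName",
  when(col("timestampType") === 0, "CreateTime")
    .when(col("timestampType") === 1, "LogAppendTime")
    .otherwise("NoTimestampType")
)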
// Extract JSON from the value field (jsonSchema is a placeholder; adjust to the actual payload)
val jsonSchema = new StructType()
  .add("event_id", StringType)
  .add("amount", DoubleType)
val jsonDF = df
.select(
col("topic"),
col("offset"),
from_json(expr("CAST(value AS STRING)"), jsonSchema).as("data")
)
.select("topic", "offset", "data.*")
// Work with headers
val withHeadersDF = df
.select(
col("topic"),
expr("CAST(value AS STRING)").as("message"),
col("headers")
)
.select(
col("topic"),
col("message"),
expr("transform(headers, h -> struct(h.key, CAST(h.value AS STRING)))").as("headers_str")
)
When writing to Kafka, the DataFrame must contain specific columns:
/**
* Required and optional columns for writing to Kafka
*/
// Required: value column
val valueColumn = col("message").cast("binary") // or expr("CAST(message AS BINARY)")
// Optional: key column for partitioning
val keyColumn = col("user_id").cast("binary")
// Optional: topic column for per-record topic routing
val topicColumn = col("target_topic") // or lit("fixed-topic") for a constant destination
// Optional: partition column for explicit partition assignment
val partitionColumn = col("computed_partition").cast("integer")
// Optional: headers column
val headersColumn = array(
struct(lit("source").as("key"), lit("spark").cast("binary").as("value")),
struct(lit("version").as("key"), lit("1.0").cast("binary").as("value"))
)
Writing Examples:
// Simple write with value only
df.select(expr("CAST(message AS BINARY)").as("value"))
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.save()
// Write with key and headers
df.select(
expr("CAST(user_id AS BINARY)").as("key"),
expr("CAST(to_json(struct(*)) AS BINARY)").as("value"),
array(struct(lit("source").as("key"), lit("analytics").cast("binary").as("value"))).as("headers")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "enriched-events")
.save()
// Dynamic topic routing
df.select(
col("target_topic").as("topic"),
expr("CAST(payload AS BINARY)").as("value")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.save() // No default topic option needed when a topic column is provided
The connector performs automatic schema validation for write operations.
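As an illustration, omitting the required value column is rejected during validation, before any records are sent; the noValue frame, broker address, and topic name are illustrative, and the exact exception type and message vary by Spark version:
// Sketch: a write without a "value" column fails schema validation
import scala.util.Try
val noValue = spark.range(1).selectExpr("CAST(id AS STRING) AS message") // no "value" column
val badWrite = Try {
  noValue.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output-topic")
    .save()
}
assert(badWrite.isFailure) // rejected: required "value" attribute not found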
/**
* Column name constants for writing
*/
object KafkaWriter {
val TOPIC_ATTRIBUTE_NAME = "topic"
val KEY_ATTRIBUTE_NAME = "key"
val VALUE_ATTRIBUTE_NAME = "value"
val HEADERS_ATTRIBUTE_NAME = "headers"
val PARTITION_ATTRIBUTE_NAME = "partition"
/**
* Validate DataFrame schema for Kafka writing
* Checks for required value column and proper types
*/
def validateQuery(/* parameters */): Unit
}
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Standard Kafka schema components:
//   BinaryType    - key and value fields
//   StringType    - topic and header keys
//   IntegerType   - partition and timestampType
//   LongType      - offset
//   TimestampType - timestamp
//   ArrayType     - headers array
// Header structure
case class KafkaHeader(key: String, value: Array[Byte])
// Complete record structure (conceptual - actual access via DataFrame columns)
case class KafkaRecord(
key: Option[Array[Byte]],
value: Array[Byte],
topic: String,
partition: Int,
offset: Long,
timestamp: java.sql.Timestamp,
timestampType: Int,
headers: Option[Array[KafkaHeader]]
)
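As a sketch of how these conceptual classes line up with the DataFrame columns, rows from a small batch read (here assumed to be kafkaDF, loaded with includeHeaders set to "true") can be mapped via getAs; the variable names are illustrative:
// Sketch: map collected rows into KafkaRecord (small batches only; collect() pulls data to the driver)
val records: Array[KafkaRecord] = kafkaDF.limit(10).collect().map { row =>
  KafkaRecord(
    key = Option(row.getAs[Array[Byte]]("key")),
    value = row.getAs[Array[Byte]]("value"),
    topic = row.getAs[String]("topic"),
    partition = row.getAs[Int]("partition"),
    offset = row.getAs[Long]("offset"),
    timestamp = row.getAs[java.sql.Timestamp]("timestamp"),
    timestampType = row.getAs[Int]("timestampType"),
    headers = Option(row.getAs[Seq[Row]]("headers"))
      .map(_.map(h => KafkaHeader(h.getAs[String]("key"), h.getAs[Array[Byte]]("value"))).toArray)
  )
}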