The Kafka connector automatically converts Kafka ConsumerRecord objects into Spark DataFrame rows, providing a structured view of Kafka messages with optional header support.
The connector exposes a fixed schema for Kafka records that cannot be replaced with a custom one. The schema varies only in whether the headers field is included.
When includeHeaders is false or not specified:
val kafkaSchemaWithoutHeaders = StructType(Array(
StructField("key", BinaryType, nullable = true),
StructField("value", BinaryType, nullable = true),
StructField("topic", StringType, nullable = false),
StructField("partition", IntegerType, nullable = false),
StructField("offset", LongType, nullable = false),
StructField("timestamp", TimestampType, nullable = true),
StructField("timestampType", IntegerType, nullable = true)
))

Field Descriptions:

- key: Message key as binary data (null if no key)
- value: Message value as binary data (null if no value)
- topic: Topic name where the message was published
- partition: Partition number within the topic
- offset: Offset of the message within the partition
- timestamp: Message timestamp (producer or broker timestamp)
- timestampType: Type of timestamp (0 = CreateTime, 1 = LogAppendTime); a decoding sketch follows this list
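Since timestampType is only an integer code, it is easy to decode inline. A minimal sketch, assuming a Kafka DataFrame such as the kafkaStream defined in the usage examples below:

import org.apache.spark.sql.functions._

// Translate the timestampType code into a readable label
val withTimestampLabel = kafkaStream
  .withColumn("timestamp_type_label",
    when(col("timestampType") === 0, "CreateTime")
      .when(col("timestampType") === 1, "LogAppendTime")
      .otherwise("NoTimestampType")) // Kafka uses -1 for NO_TIMESTAMP_TYPE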
Usage Examples:

val kafkaStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.load()
// Access fields directly
val processedStream = kafkaStream
.select(
col("topic"),
col("partition"),
col("offset"),
col("timestamp"),
expr("CAST(key AS STRING)").as("key_str"),
expr("CAST(value AS STRING)").as("value_str")
)
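The same fixed schema applies to batch reads, which use spark.read with bounded offsets. A minimal sketch against the same broker and topic:

// Batch read - identical schema, but bounded by starting/ending offsets
val kafkaBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()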
When includeHeaders is set to "true":

val kafkaSchemaWithHeaders = StructType(Array(
StructField("key", BinaryType, nullable = true),
StructField("value", BinaryType, nullable = true),
StructField("topic", StringType, nullable = false),
StructField("partition", IntegerType, nullable = false),
StructField("offset", LongType, nullable = false),
StructField("timestamp", TimestampType, nullable = true),
StructField("timestampType", IntegerType, nullable = true),
StructField("headers", ArrayType(StructType(Array(
StructField("key", StringType, nullable = false),
StructField("value", BinaryType, nullable = true)
))), nullable = true)
))

Headers Field Structure:
val headersType = ArrayType(StructType(Array(
StructField("key", StringType, nullable = false), // Header key as string
StructField("value", BinaryType, nullable = true) // Header value as binary
)))

Configuration:
val kafkaStreamWithHeaders = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("includeHeaders", "true") // Enable headers
.load()

Usage Examples:
// Process messages with headers
val processedWithHeaders = kafkaStreamWithHeaders
.select(
col("topic"),
expr("CAST(key AS STRING)").as("key_str"),
expr("CAST(value AS STRING)").as("value_str"),
col("headers")
)
// Extract specific header values
val withExtractedHeaders = kafkaStreamWithHeaders
.select(
col("*"),
expr("filter(headers, x -> x.key = 'content-type')[0].value").as("content_type_header"),
expr("filter(headers, x -> x.key = 'correlation-id')[0].value").as("correlation_id_header")
)

The internal KafkaRecordToRowConverter class handles the transformation from Kafka ConsumerRecord objects to Spark rows.
/**
* Converts Kafka ConsumerRecord to Spark InternalRow/UnsafeRow
* Handles both header-enabled and header-disabled modes
*/
class KafkaRecordToRowConverter {
/** Convert to InternalRow without headers */
val toInternalRowWithoutHeaders: ConsumerRecord[Array[Byte], Array[Byte]] => InternalRow
/** Convert to InternalRow with headers */
val toInternalRowWithHeaders: ConsumerRecord[Array[Byte], Array[Byte]] => InternalRow
/** Convert to UnsafeRow without headers */
val toUnsafeRowWithoutHeadersProjector: ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow
/** Convert to UnsafeRow with headers */
val toUnsafeRowWithHeadersProjector: ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow
/** Generic UnsafeRow projector based on header setting */
def toUnsafeRowProjector(includeHeaders: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow
}

Companion Object:
object KafkaRecordToRowConverter {
/** Returns appropriate schema based on header inclusion */
def kafkaSchema(includeHeaders: Boolean): StructType
/** Headers array type definition */
val headersType: DataType = ArrayType(StructType(Array(
StructField("key", StringType),
StructField("value", BinaryType)
)))
}
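The converter is internal to Spark and not part of the public API, but you can confirm the schema it produces by printing it from any Kafka DataFrame:

// Verify the fixed schema at runtime (stream defined in the examples above)
kafkaStream.printSchema()
// Prints key, value, topic, partition, offset, timestamp, timestampType
// (plus headers when includeHeaders is "true")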
Kafka keys and values are always treated as binary data in Spark:

// Keys and values come as BinaryType - cast to appropriate types
val stringData = kafkaStream
.select(
expr("CAST(key AS STRING)").as("key_string"),
expr("CAST(value AS STRING)").as("value_string")
)
// Parse JSON values
val jsonData = kafkaStream
.select(
expr("CAST(value AS STRING)").as("json_string")
)
.select(
from_json(col("json_string"), jsonSchema).as("parsed_data") // jsonSchema: a StructType for the JSON payload, defined elsewhere
)
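Because key (and value) can be null, it is worth guarding string casts. A small sketch using coalesce, where the "no-key" placeholder is an arbitrary choice:

// Null-safe key handling - substitute a placeholder for keyless messages
val withSafeKeys = kafkaStream
  .select(
    coalesce(expr("CAST(key AS STRING)"), lit("no-key")).as("key_str"),
    expr("CAST(value AS STRING)").as("value_str")
  )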
Kafka timestamps are converted to Spark TimestampType:

// Work with timestamps
val withTimestamps = kafkaStream
.select(
col("timestamp"),
col("timestampType"),
date_format(col("timestamp"), "yyyy-MM-dd HH:mm:ss").as("formatted_timestamp"),
hour(col("timestamp")).as("hour"),
to_date(col("timestamp")).as("date")
)
// Filter by time ranges
val recentMessages = kafkaStream
.filter(col("timestamp") > lit("2023-01-01 00:00:00").cast(TimestampType))Process Kafka headers as structured data:
Process Kafka headers as structured data:

// Work with headers (when includeHeaders = "true")
val withHeaderProcessing = kafkaStreamWithHeaders
.select(
col("*"),
size(col("headers")).as("header_count"),
// Extract all header keys
expr("transform(headers, x -> x.key)").as("header_keys"),
// Find specific header by key
expr("filter(headers, x -> x.key = 'content-type')").as("content_type_headers"),
// Extract header value as string
expr("CAST(filter(headers, x -> x.key = 'user-id')[0].value AS STRING)").as("user_id")
)
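When several headers are needed, converting the array to a map can simplify lookups. A sketch using map_from_entries (note that duplicate header keys make map_from_entries fail unless spark.sql.mapKeyDedupPolicy is set to LAST_WIN):

// Turn the headers array into a key -> string-value map for direct lookups
val withHeaderMap = kafkaStreamWithHeaders
  .withColumn("header_map",
    expr("map_from_entries(transform(headers, h -> struct(h.key, CAST(h.value AS STRING))))"))
  .withColumn("content_type", col("header_map").getItem("content-type"))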
The connector enforces a fixed schema and will reject custom schemas:

// This will fail - custom schemas are not supported
spark
.readStream
.format("kafka")
.schema(customSchema) // Will throw exception
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.load()
// Exception: "Kafka source has a fixed schema and cannot be set with a custom one"

The converter supports column projection for better performance:
// Only select needed columns to reduce processing overhead
val optimized = kafkaStream
.select("value", "timestamp", "topic") // Only process these columns
.filter(col("timestamp") > recentTimestamp)// Efficient - work with binary data when possible
val binaryProcessing = kafkaStream
.select(col("value"))
.filter(length(col("value")) > 100)
// Less efficient - casting to string for every row
val stringProcessing = kafkaStream
.select(expr("CAST(value AS STRING)").as("value_str"))
.filter(length(col("value_str")) > 100)

// Efficient - only include headers when needed
val withoutHeaders = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "false") // Default, more efficient
  .load()
// Less efficient - headers require additional processing
val withHeaders = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "true") // Only when headers are needed
  .load()

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// Define JSON schema
val messageSchema = StructType(Array(
StructField("user_id", StringType),
StructField("event_type", StringType),
StructField("timestamp", LongType),
StructField("properties", MapType(StringType, StringType))
))
// Parse JSON messages
val parsedMessages = kafkaStream
.select(
col("topic"),
col("partition"),
col("offset"),
col("timestamp").as("kafka_timestamp"),
from_json(expr("CAST(value AS STRING)"), messageSchema).as("message")
)
.select(
col("topic"),
col("partition"),
col("offset"),
col("kafka_timestamp"),
col("message.*")
)
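Parsed results can also be written back to Kafka by assembling a value column. A minimal sketch, where the output topic and checkpoint path are placeholders:

// Serialize each parsed row back to JSON and publish it to an output topic
val query = parsedMessages
  .select(to_json(struct(parsedMessages.columns.map(col): _*)).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "parsed-events")
  .option("checkpointLocation", "/tmp/checkpoints/parsed-events")
  .start()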
// For Avro messages, use external libraries like spark-avro
// from_avro requires the spark-avro package and this import:
import org.apache.spark.sql.avro.functions.from_avro

val avroMessages = kafkaStream
.select(
col("topic"),
from_avro(col("value"), avroSchemaString).as("avro_data")
)
.select(
col("topic"),
col("avro_data.*")
)

// Process different topics differently based on schema
// A single when(...).otherwise(...) chain must resolve to one data type, so
// parsing different schemas in different branches fails analysis unless the
// schemas are identical. Parse each topic's schema into its own column instead:
val processedMessages = kafkaStream
  .withColumn("user_event",
    when(col("topic") === "user-events",
      from_json(expr("CAST(value AS STRING)"), userEventSchema)))
  .withColumn("system_log",
    when(col("topic") === "system-logs",
      from_json(expr("CAST(value AS STRING)"), logSchema)))
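Downstream consumers can then route on whichever parsed column is non-null. A brief sketch following from the columns above:

// Route user events into their own flattened stream
val userEvents = processedMessages
  .filter(col("user_event").isNotNull)
  .select(col("topic"), col("partition"), col("offset"), col("user_event.*"))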