Schema Conversion

The Kafka connector automatically converts Kafka ConsumerRecord objects into Spark DataFrame rows, providing a structured view of Kafka messages with optional header support.

Capabilities

Kafka Record Schema

The connector provides a fixed schema for Kafka records that cannot be customized. The schema varies based on whether headers are included.

Default Schema (Headers Disabled)

When includeHeaders is false or not specified:

val kafkaSchemaWithoutHeaders = StructType(Array(
  StructField("key", BinaryType, nullable = true),
  StructField("value", BinaryType, nullable = true),  
  StructField("topic", StringType, nullable = false),
  StructField("partition", IntegerType, nullable = false),
  StructField("offset", LongType, nullable = false),
  StructField("timestamp", TimestampType, nullable = true),
  StructField("timestampType", IntegerType, nullable = true)
))

Field Descriptions:

  • key: Message key as binary data (null if no key)
  • value: Message value as binary data (null if no value)
  • topic: Topic name where message was published
  • partition: Partition number within the topic
  • offset: Offset of the message within the partition
  • timestamp: Message timestamp (producer or broker timestamp)
  • timestampType: Type of timestamp (0 = CreateTime, 1 = LogAppendTime, -1 = NoTimestampType)

Usage Examples:

val kafkaStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// Access fields directly
val processedStream = kafkaStream
  .select(
    col("topic"),
    col("partition"), 
    col("offset"),
    col("timestamp"),
    expr("CAST(key AS STRING)").as("key_str"),
    expr("CAST(value AS STRING)").as("value_str")
  )
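
For readability, the numeric timestampType can be mapped to a label in the same query; a minimal sketch (the column name timestamp_type_name is illustrative):

import org.apache.spark.sql.functions.{col, when}

// Map the numeric timestampType code to a human-readable label
val withTimestampTypeName = kafkaStream
  .withColumn("timestamp_type_name",
    when(col("timestampType") === 0, "CreateTime")
      .when(col("timestampType") === 1, "LogAppendTime")
      .otherwise("NoTimestampType"))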

Extended Schema (Headers Enabled)

When includeHeaders is set to "true":

val kafkaSchemaWithHeaders = StructType(Array(
  StructField("key", BinaryType, nullable = true),
  StructField("value", BinaryType, nullable = true),
  StructField("topic", StringType, nullable = false),
  StructField("partition", IntegerType, nullable = false),
  StructField("offset", LongType, nullable = false),
  StructField("timestamp", TimestampType, nullable = true),
  StructField("timestampType", IntegerType, nullable = true),
  StructField("headers", ArrayType(StructType(Array(
    StructField("key", StringType, nullable = false),
    StructField("value", BinaryType, nullable = true)
  ))), nullable = true)
))

Headers Field Structure:

val headersType = ArrayType(StructType(Array(
  StructField("key", StringType, nullable = false),    // Header key as string
  StructField("value", BinaryType, nullable = true)    // Header value as binary
)))

Configuration:

val kafkaStreamWithHeaders = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "true")  // Enable headers
  .load()

Usage Examples:

// Process messages with headers
val processedWithHeaders = kafkaStreamWithHeaders
  .select(
    col("topic"),
    expr("CAST(key AS STRING)").as("key_str"),
    expr("CAST(value AS STRING)").as("value_str"),
    col("headers")
  )

// Extract specific header values
val withExtractedHeaders = kafkaStreamWithHeaders
  .select(
    col("*"),
    expr("filter(headers, x -> x.key = 'content-type')[0].value").as("content_type_header"),
    expr("filter(headers, x -> x.key = 'correlation-id')[0].value").as("correlation_id_header")
  )
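
Headers can also be collapsed into a map for keyed lookups; a sketch using map_from_entries and transform (Spark 2.4+), with header values cast to strings. Note that map_from_entries fails if the same header key appears more than once on a record:

// Turn the headers array<struct<key,value>> into a map<string,string>
val headersAsMap = kafkaStreamWithHeaders
  .select(
    col("*"),
    expr("map_from_entries(transform(headers, x -> struct(x.key, CAST(x.value AS STRING))))")
      .as("headers_map")
  )

// Look up a single header by key
val withContentType = headersAsMap
  .select(expr("headers_map['content-type']").as("content_type"))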

KafkaRecordToRowConverter

Internal converter class that handles the transformation from Kafka ConsumerRecord to Spark rows.

/**
 * Converts Kafka ConsumerRecord to Spark InternalRow/UnsafeRow
 * Handles both header-enabled and header-disabled modes
 */
class KafkaRecordToRowConverter {
  
  /** Convert to InternalRow without headers */
  val toInternalRowWithoutHeaders: ConsumerRecord[Array[Byte], Array[Byte]] => InternalRow
  
  /** Convert to InternalRow with headers */  
  val toInternalRowWithHeaders: ConsumerRecord[Array[Byte], Array[Byte]] => InternalRow
  
  /** Convert to UnsafeRow without headers */
  val toUnsafeRowWithoutHeadersProjector: ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow
  
  /** Convert to UnsafeRow with headers */
  val toUnsafeRowWithHeadersProjector: ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow
  
  /** Generic UnsafeRow projector based on header setting */
  def toUnsafeRowProjector(includeHeaders: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] => UnsafeRow
}

Companion Object:

object KafkaRecordToRowConverter {
  /** Returns appropriate schema based on header inclusion */
  def kafkaSchema(includeHeaders: Boolean): StructType
  
  /** Headers array type definition */
  val headersType: DataType = ArrayType(StructType(Array(
    StructField("key", StringType),
    StructField("value", BinaryType)
  )))
}
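
This class is internal to the connector rather than part of the public API. For illustration only, the schema selection behind kafkaSchema(includeHeaders) amounts to choosing between the two schemas shown earlier:

// Illustrative sketch, not the actual Spark internals:
// pick the fixed schema based on the includeHeaders option
def kafkaSchemaSketch(includeHeaders: Boolean): StructType =
  if (includeHeaders) kafkaSchemaWithHeaders else kafkaSchemaWithoutHeaders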

Data Type Conversions

Binary Data Handling

Kafka keys and values are always treated as binary data in Spark:

// Keys and values come as BinaryType - cast to appropriate types
val stringData = kafkaStream
  .select(
    expr("CAST(key AS STRING)").as("key_string"),
    expr("CAST(value AS STRING)").as("value_string")
  )

// Parse JSON values
val jsonData = kafkaStream
  .select(
    expr("CAST(value AS STRING)").as("json_string")
  )
  .select(
    from_json(col("json_string"), jsonSchema).as("parsed_data")
  )

Timestamp Handling

Kafka timestamps are converted to Spark TimestampType:

// Work with timestamps
val withTimestamps = kafkaStream
  .select(
    col("timestamp"),
    col("timestampType"),
    date_format(col("timestamp"), "yyyy-MM-dd HH:mm:ss").as("formatted_timestamp"),
    hour(col("timestamp")).as("hour"),
    to_date(col("timestamp")).as("date")
  )

// Filter by time ranges
val recentMessages = kafkaStream
  .filter(col("timestamp") > lit("2023-01-01 00:00:00").cast(TimestampType))

Header Processing

Process Kafka headers as structured data:

// Work with headers (when includeHeaders = "true")
val withHeaderProcessing = kafkaStreamWithHeaders
  .select(
    col("*"),
    size(col("headers")).as("header_count"),
    
    // Extract all header keys
    expr("transform(headers, x -> x.key)").as("header_keys"),
    
    // Find specific header by key
    expr("filter(headers, x -> x.key = 'content-type')").as("content_type_headers"),
    
    // Extract header value as string
    expr("CAST(filter(headers, x -> x.key = 'user-id')[0].value AS STRING)").as("user_id")
  )
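
Headers can also be flattened to one row per header, which is convenient for inspection or joining against reference data; a sketch using explode_outer so messages without headers are preserved:

import org.apache.spark.sql.functions.explode_outer

// One output row per header; explode_outer keeps messages with no headers
val explodedHeaders = kafkaStreamWithHeaders
  .select(col("topic"), col("offset"), explode_outer(col("headers")).as("header"))
  .select(
    col("topic"),
    col("offset"),
    col("header.key").as("header_key"),
    expr("CAST(header.value AS STRING)").as("header_value")
  )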

Schema Validation

The connector enforces a fixed schema and will reject custom schemas:

// This will fail - custom schemas are not supported
spark
  .readStream
  .format("kafka")
  .schema(customSchema)  // Will throw exception
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
// Exception: "Kafka source has a fixed schema and cannot be set with a custom one"
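
Since the read schema is fixed, impose your own structure after loading instead, typically by casting value and parsing it with from_json (eventSchema below is an assumed, user-defined StructType):

// Load with the fixed Kafka schema, then apply structure downstream
val typed = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
  .select(from_json(expr("CAST(value AS STRING)"), eventSchema).as("event"))
  .select(col("event.*"))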

Performance Considerations

Projection Optimization

Select only the columns you need to reduce downstream processing overhead:

// Only select needed columns to reduce processing overhead
val optimized = kafkaStream
  .select("value", "timestamp", "topic")  // Only process these columns
  .filter(col("timestamp") > recentTimestamp)

Binary vs String Conversion

// Efficient - work with binary data when possible
val binaryProcessing = kafkaStream
  .select(col("value"))
  .filter(length(col("value")) > 100)

// Less efficient - casting to string for every row
val stringProcessing = kafkaStream
  .select(expr("CAST(value AS STRING)").as("value_str"))
  .filter(length(col("value_str")) > 100)

Header Processing Performance

// Efficient - only include headers when needed
val withoutHeaders = spark
  .readStream
  .format("kafka")
  .option("includeHeaders", "false")  // Default, more efficient
  .load()

// Less efficient - headers require additional processing
val withHeaders = spark
  .readStream  
  .format("kafka")
  .option("includeHeaders", "true")   // Only when headers are needed
  .load()

Common Patterns

JSON Message Processing

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Define JSON schema
val messageSchema = StructType(Array(
  StructField("user_id", StringType),
  StructField("event_type", StringType),
  StructField("timestamp", LongType),
  StructField("properties", MapType(StringType, StringType))
))

// Parse JSON messages
val parsedMessages = kafkaStream
  .select(
    col("topic"),
    col("partition"), 
    col("offset"),
    col("timestamp").as("kafka_timestamp"),
    from_json(expr("CAST(value AS STRING)"), messageSchema).as("message")
  )
  .select(
    col("topic"),
    col("partition"),
    col("offset"), 
    col("kafka_timestamp"),
    col("message.*")
  )
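
from_json returns null for rows it cannot parse (in the default PERMISSIVE mode), so malformed messages can be separated from valid ones instead of failing the query; a sketch reusing messageSchema from above:

// Keep the raw value alongside the parsed struct so malformed
// messages can be routed to a dead-letter path
val parsedWithRaw = kafkaStream
  .select(
    expr("CAST(value AS STRING)").as("raw_value"),
    from_json(expr("CAST(value AS STRING)"), messageSchema).as("message")
  )

val validMessages = parsedWithRaw.filter(col("message").isNotNull).select(col("message.*"))
val malformedMessages = parsedWithRaw.filter(col("message").isNull).select(col("raw_value"))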

Avro Message Processing

// For Avro messages, use the external spark-avro module
import org.apache.spark.sql.avro.functions.from_avro

val avroMessages = kafkaStream
  .select(
    col("topic"),
    from_avro(col("value"), avroSchemaString).as("avro_data")
  )
  .select(
    col("topic"),
    col("avro_data.*")
  )

Message Routing by Topic

// Route messages to different parsing logic per topic. Branches of
// when/otherwise must share a single data type, so parsing each topic
// with its own schema is simplest via per-topic filters.
val userEvents = kafkaStream
  .filter(col("topic") === "user-events")
  .select(from_json(expr("CAST(value AS STRING)"), userEventSchema).as("event"))

val systemLogs = kafkaStream
  .filter(col("topic") === "system-logs")
  .select(from_json(expr("CAST(value AS STRING)"), logSchema).as("log"))