Schema and Data Format

Fixed schema for Kafka records, with optional header support and the type definitions needed to process message content and metadata.

Capabilities

Standard Kafka Schema

The Spark SQL Kafka connector uses a fixed schema for all Kafka records, providing access to both message content and metadata.

import org.apache.spark.sql.types._

/**
 * Standard Kafka record schema structure (without headers)
 * Note: The schema is fixed and cannot be customized
 */
val schemaWithoutHeaders = StructType(Array(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)
))

/**
 * Kafka schema with headers field included
 * Note: Headers field is only present when includeHeaders=true
 */
val headersType = ArrayType(StructType(Array(
  StructField("key", StringType),
  StructField("value", BinaryType)
)))

val schemaWithHeaders = StructType(
  schemaWithoutHeaders.fields :+ StructField("headers", headersType)
)

/**
 * Get appropriate schema based on headers configuration
 * @param includeHeaders - Whether to include headers field
 * @return StructType representing Kafka record schema
 */
def kafkaSchema(includeHeaders: Boolean): StructType = {
  if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders
}
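
To confirm the fixed schema at runtime, load a Kafka DataFrame and call printSchema(). This is a minimal sketch assuming a running SparkSession and a broker at localhost:9092 with a placeholder topic named events; the commented output mirrors the schema defined above.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-schema-check").getOrCreate()

// Batch load; the returned DataFrame always carries the fixed Kafka schema
val kafkaDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
  .option("subscribe", "events")                         // placeholder topic
  .load()

kafkaDF.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)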

Schema Fields

Core Message Fields

/**
 * Message key - typically used for partitioning and keyed operations
 * Type: BinaryType (nullable)
 * Note: Keys are deserialized as byte arrays using ByteArrayDeserializer
 */
val key: Array[Byte]

/**
 * Message value - the actual message payload
 * Type: BinaryType (nullable; tombstone records carry a null value)
 * Note: Values are deserialized as byte arrays using ByteArrayDeserializer
 */
val value: Array[Byte]

Metadata Fields

/**
 * Topic name where the message was published
 * Type: StringType (non-nullable)
 */
val topic: String

/**
 * Partition number within the topic
 * Type: IntegerType (non-nullable)
 */
val partition: Int

/**
 * Message offset within the partition
 * Type: LongType (non-nullable)
 * Note: Offsets are unique within a partition and increase monotonically
 */
val offset: Long

/**
 * Message timestamp
 * Type: TimestampType (non-nullable)
 * Note: Represents either creation time or log append time depending on timestampType
 */
val timestamp: java.sql.Timestamp

/**
 * Timestamp type indicator
 * Type: IntegerType (non-nullable)
 * Values: 0 = CreateTime (set by producer), 1 = LogAppendTime (set by broker)
 */
val timestampType: Int
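
The metadata fields can be selected like ordinary columns. A small sketch, assuming a Kafka DataFrame named df loaded as shown later on this page, that labels each record's timestampType using the 0/1 values described above:

import org.apache.spark.sql.functions._

// Select per-record metadata and decode timestampType into a readable label
val metadataDF = df
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    when(col("timestampType") === 0, "CreateTime")
      .when(col("timestampType") === 1, "LogAppendTime")
      .otherwise("Unknown")
      .as("timestamp_kind")
  )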

Headers Support

Headers can be included in the Kafka record schema by setting the includeHeaders option.

/**
 * Enable the headers field in the Kafka schema
 * @param includeHeaders - "true" to include headers, "false" to exclude (default: "false")
 */
.option("includeHeaders", "true")

/**
 * Message headers as array of key-value pairs
 * Type: ArrayType[StructType] (nullable)
 * Note: Only included when includeHeaders option is true
 */
val headers: Array[Row] // Each Row contains (key: String, value: Array[Byte])

/**
 * Header structure definition
 */
val headerType = StructType(Array(
  StructField("key", StringType),     // Header key
  StructField("value", BinaryType)    // Header value as binary
))

/**
 * Complete headers array type
 */
val headersType = ArrayType(headerType)
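
Individual headers can be extracted from the headers array with SQL higher-order functions. In the sketch below the header key traceId is purely illustrative, and df is assumed to have been loaded with includeHeaders=true:

import org.apache.spark.sql.functions._

// Pull out a single header value by key; traceId is a hypothetical header name.
// With default (non-ANSI) settings, the [0] lookup yields null when the header is absent.
val withTraceIdDF = df
  .withColumn(
    "trace_id",
    expr("CAST(filter(headers, h -> h.key = 'traceId')[0].value AS STRING)")
  )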

Schema Access and Conversion

Utilities for working with the Kafka schema and converting data.

/**
 * Schema converter utilities
 */
object KafkaRecordToRowConverter {
  /**
   * Get the standard Kafka schema
   * @param includeHeaders - Whether to include headers field in schema
   * @return StructType representing Kafka record schema
   */
  def kafkaSchema(includeHeaders: Boolean): StructType
  
  /**
   * Get the headers array type definition
   * @return ArrayType for Kafka headers
   */
  def headersType: ArrayType
}
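
KafkaRecordToRowConverter is internal to the connector, so application code does not normally call it directly; the schema it defines is simply what .load() returns. A rough equivalent of kafkaSchema(includeHeaders = true) using only public APIs (broker address and topic name are placeholders):

// The schema of a loaded Kafka DataFrame already reflects the includeHeaders option
val kafkaSchemaWithHeaders = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("includeHeaders", "true")
  .load()
  .schema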

Data Conversion Examples

Reading and Converting Data

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Read from Kafka with headers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("includeHeaders", "true")
  .load()

// Convert binary fields to strings
val messagesDF = df
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    expr("CAST(key AS STRING)").as("key_str"),
    expr("CAST(value AS STRING)").as("value_str"),
    col("headers")
  )

// Extract JSON from the value field
// (jsonSchema is a placeholder schema for the message payload)
val jsonSchema = new StructType()
  .add("event", StringType)
  .add("count", IntegerType)

val jsonDF = df
  .select(
    col("topic"),
    col("offset"),
    from_json(expr("CAST(value AS STRING)"), jsonSchema).as("data")
  )
  .select("topic", "offset", "data.*")

// Work with headers
val withHeadersDF = df
  .select(
    col("topic"),
    expr("CAST(value AS STRING)").as("message"),
    col("headers")
  )
  .select(
    col("topic"),
    col("message"),
    expr("transform(headers, h -> struct(h.key, CAST(h.value AS STRING)))").as("headers_str")
  )
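
The same schema and conversions apply to batch reads; only the read API and offset options differ. A minimal batch variant, with illustrative offset settings:

// Batch read over a bounded offset range; the fixed schema is identical to the streaming case
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

val batchMessages = batchDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "offset")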

Writing Data Format

When writing to Kafka, the DataFrame must contain specific columns:

/**
 * Required and optional columns for writing to Kafka
 */
// Required: value column
val valueColumn = col("message").cast("binary")  // or expr("CAST(message AS BINARY)")

// Optional: key column for partitioning
val keyColumn = col("user_id").cast("binary")

// Optional: topic column for per-record topic routing
val topicColumn = lit("dynamic-topic")

// Optional: partition column for explicit partition assignment  
val partitionColumn = col("computed_partition").cast("integer")

// Optional: headers column
val headersColumn = array(
  struct(lit("source").as("key"), lit("spark").cast("binary").as("value")),
  struct(lit("version").as("key"), lit("1.0").cast("binary").as("value"))
)

Writing Examples:

// Simple write with value only
df.select(expr("CAST(message AS BINARY)").as("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()

// Write with key and headers
df.select(
    expr("CAST(user_id AS BINARY)").as("key"),
    expr("CAST(to_json(struct(*)) AS BINARY)").as("value"),
    array(struct(lit("source").as("key"), lit("analytics").cast("binary").as("value"))).as("headers")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "enriched-events")
  .save()

// Dynamic topic routing
df.select(
    col("target_topic").as("topic"),
    expr("CAST(payload AS BINARY)").as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()  // No default topic needed when topic column is provided
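
For streaming queries the same column contract applies, but the sink is started with writeStream and needs a checkpoint location. A sketch assuming a streaming DataFrame named streamingDF and a placeholder checkpoint path:

import org.apache.spark.sql.functions._

// Streaming write: same value/key/headers column contract, plus a required checkpoint
val query = streamingDF
  .select(expr("CAST(message AS BINARY)").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")  // placeholder path
  .start()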

Schema Validation

The connector performs automatic schema validation for write operations.

/**
 * Column name constants for writing
 */
object KafkaWriter {
  val TOPIC_ATTRIBUTE_NAME = "topic"
  val KEY_ATTRIBUTE_NAME = "key"  
  val VALUE_ATTRIBUTE_NAME = "value"
  val HEADERS_ATTRIBUTE_NAME = "headers"
  val PARTITION_ATTRIBUTE_NAME = "partition"
  
  /**
   * Validate DataFrame schema for Kafka writing
   * Checks for required value column and proper types
   */
  def validateQuery(/* parameters */): Unit
}
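
In practice, a write without a value column is rejected at analysis time rather than failing on the brokers. A hedged illustration (df is assumed to have a topic column; the exact exception message varies by Spark version):

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.functions.col

// Omitting the required value column causes validation to fail before anything is sent
try {
  df.select(col("topic"))   // no "value" column selected
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output-topic")
    .save()
} catch {
  case e: AnalysisException =>
    println(s"Validation failed: ${e.getMessage}")
}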

Types

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Standard Kafka schema components (all from org.apache.spark.sql.types)
// BinaryType     - key and value fields
// StringType     - topic and header keys
// IntegerType    - partition and timestampType
// LongType       - offset
// TimestampType  - timestamp
// ArrayType(...) - headers array

// Header structure
case class KafkaHeader(key: String, value: Array[Byte])

// Complete record structure (conceptual - actual access via DataFrame columns)
case class KafkaRecord(
  key: Option[Array[Byte]],
  value: Array[Byte],
  topic: String,
  partition: Int,
  offset: Long,
  timestamp: java.sql.Timestamp,
  timestampType: Int,
  headers: Option[Array[KafkaHeader]]
)
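
For strongly typed processing, a common pattern is to cast key and value to strings and convert to a Dataset; the KafkaRecord case class above is conceptual and not required. A minimal sketch, assuming df was loaded from Kafka and spark is the active SparkSession:

import spark.implicits._

// Cast binary key/value to strings and work with a typed Dataset
val typedDS = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]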