Writing to Kafka

The Kafka connector provides comprehensive writing capabilities for both batch and streaming scenarios, with support for topic routing, producer pooling, and flexible data formatting.

Capabilities

KafkaWriter

Core writer object providing the main logic for batch and streaming writes to Kafka.

/**
 * Core writer functionality for batch and streaming writes to Kafka
 * Handles data validation, topic routing, and producer management
 */
object KafkaWriter {
  
  /** Column names for Kafka message attributes */
  val TOPIC_ATTRIBUTE_NAME: String = "topic"
  val KEY_ATTRIBUTE_NAME: String = "key"
  val VALUE_ATTRIBUTE_NAME: String = "value"
  val HEADERS_ATTRIBUTE_NAME: String = "headers"
  val PARTITION_ATTRIBUTE_NAME: String = "partition"
  
  /** 
   * Validates query plan and writes data to Kafka
   * Main entry point for all Kafka write operations
   */
  def write(
    sparkSession: SparkSession,
    queryExecution: QueryExecution,
    kafkaParams: ju.Map[String, Object],
    topic: Option[String]
  ): Unit
  
  /** Validates DataFrame schema and configuration */
  def validateQuery(
    schema: Seq[Attribute],
    kafkaParameters: ju.Map[String, Object],
    topic: Option[String]
  ): Unit
  
  /** Creates expression for topic routing */
  def topicExpression(schema: Seq[Attribute], topic: Option[String]): Expression
  
  /** Creates expression for message key extraction */
  def keyExpression(schema: Seq[Attribute]): Expression
  
  /** Creates expression for message value extraction */
  def valueExpression(schema: Seq[Attribute]): Expression
  
  /** Creates expression for headers extraction */
  def headersExpression(schema: Seq[Attribute]): Expression
  
  /** Creates expression for partition assignment */
  def partitionExpression(schema: Seq[Attribute]): Expression
}

Usage Examples:

// Basic write with topic specified in options
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()

// Write with topic column for dynamic routing
dataFrame
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()

KafkaWrite

V2 DataSource write implementation supporting both batch and streaming modes.

/**
 * V2 DataSource write implementation for Kafka
 * Supports both batch and streaming write operations
 */
case class KafkaWrite(
  topic: Option[String],
  producerParams: ju.Map[String, Object],
  schema: StructType
) extends Write {
  
  /** Returns description of the write operation */
  def description(): String = "KafkaWrite"
  
  /** Creates batch write implementation */
  def toBatch: BatchWrite = new KafkaBatchWrite(topic, producerParams, schema)
  
  /** Creates streaming write implementation */
  def toStreaming: StreamingWrite = new KafkaStreamingWrite(topic, producerParams, schema)
}
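Usage Examples:

KafkaWrite is constructed internally by the data source, not by user code. As a minimal sketch, the two public entry points below correspond to its batch and streaming modes (the `events` and `eventsStream` names and paths are illustrative; which internal write path Spark resolves to depends on the Spark version and data source configuration):

// Batch mode via DataFrameWriter
events
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "events")
  .save()

// Streaming mode via DataStreamWriter
eventsStream
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "events")
  .option("checkpointLocation", "/checkpoints/events")  // illustrative path
  .start()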

KafkaSink

Legacy V1 DataSource streaming sink implementation.

/**
 * V1 DataSource streaming sink for writing to Kafka
 * Provides backward compatibility with Structured Streaming V1 API
 */
class KafkaSink(
  sqlContext: SQLContext,
  kafkaParams: ju.Map[String, Object],
  topic: Option[String]
) extends Sink {
  
  /** Adds a batch of data to Kafka */
  def addBatch(batchId: Long, data: DataFrame): Unit
}
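The sink receives data one micro-batch at a time through addBatch(batchId, data). For intuition only, the public foreachBatch API exposes the same per-micro-batch contract to user code; the sketch below (assuming an `inputStream` streaming DataFrame and illustrative topic and checkpoint names) reproduces that shape without touching any internal classes:

import org.apache.spark.sql.DataFrame

// Sketch: mirrors the addBatch(batchId, data) contract using the public API;
// each micro-batch arrives as a (DataFrame, batchId) pair and is written as a batch job
val writeMicroBatch: (DataFrame, Long) => Unit = { (batchDf, batchId) =>
  batchDf.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "micro-batch-topic")
    .save()
}

inputStream
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .foreachBatch(writeMicroBatch)
  .option("checkpointLocation", "/checkpoints/micro-batch-demo")
  .start()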

DataFrame Schema Requirements

Required Columns

The DataFrame must contain specific columns for Kafka message construction:

// Required: value column (message payload)
val validDataFrame = spark.createDataFrame(Seq(
  Tuple1("Hello World")
)).toDF("value")

// Write with just value
validDataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "messages")
  .save()

Optional Columns

Additional columns provide more control over message attributes:

// Full control with all optional columns
val fullControlDataFrame = spark.createDataFrame(Seq(
  ("user-events", "user123", """{"event":"login","timestamp":1234567890}""", 0, Array(("correlation-id", "abc123".getBytes), ("source", "web".getBytes))),
  ("user-events", "user456", """{"event":"logout","timestamp":1234567891}""", 1, Array(("correlation-id", "def456".getBytes), ("source", "mobile".getBytes)))
)).toDF("topic", "key", "value", "partition", "headers")
  // Rename the nested tuple fields (_1/_2) so headers match the required
  // array<struct<key:string,value:binary>> shape
  .withColumn("headers", col("headers").cast("array<struct<key:string,value:binary>>"))

fullControlDataFrame
  .write  
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()  // No topic option needed - using topic column

Column Types and Conversion

// Expected column types:
// topic: StringType (optional - can use option instead)
// key: StringType or BinaryType (optional - null if not provided)  
// value: StringType or BinaryType (required)
// partition: IntegerType (optional - Kafka will assign if not provided)
// headers: ArrayType of StructType with "key" (StringType) and "value" (BinaryType) (optional)

Type Conversion Examples:

// Convert different types to appropriate Kafka format
val typedDataFrame = originalDataFrame
  .select(
    col("topic"),
    col("user_id").cast(StringType).as("key"),                    // Convert to string key
    to_json(struct(col("*"))).as("value"),                       // Convert struct to JSON value
    (col("user_id").cast(LongType) % 10).cast(IntegerType).as("partition"), // Partition by user_id mod 10
    array(
      struct(lit("content-type").as("key"), lit("application/json").cast(BinaryType).as("value")),
      struct(lit("source").as("key"), lit("spark-streaming").cast(BinaryType).as("value"))
    ).as("headers")
  )

Topic Routing

Static Topic Assignment

Specify topic in write options:

// All records go to the same topic
dataFrame
  .select(col("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "static-topic")
  .save()

Dynamic Topic Routing

Use topic column for per-record topic routing:

// Route records to different topics based on data
val routedDataFrame = sourceDataFrame
  .withColumn("topic", 
    when(col("event_type") === "error", "error-topic")
    .when(col("event_type") === "warning", "warning-topic")
    .otherwise("info-topic")
  )
  .select("topic", "value")

routedDataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()

Topic Validation

// Topic must be specified either in options or as a column
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  // Fails: a topic must be supplied either via the "topic" option
  // or a "topic" column in the DataFrame
  .save()

Producer Configuration

Basic Producer Settings

// Essential producer configuration
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.acks", "all")                    // Wait for all replicas
  .option("kafka.retries", "3")                   // Retry failed sends
  .option("kafka.batch.size", "16384")            // Batch size in bytes
  .option("kafka.linger.ms", "5")                 // Wait up to 5ms to batch
  .option("kafka.buffer.memory", "33554432")      // 32MB producer buffer
  .option("topic", "reliable-topic")
  .save()

Performance Optimization

// High-throughput producer configuration
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.acks", "1")                      // Leader acknowledgment only
  .option("kafka.compression.type", "snappy")     // Enable compression
  .option("kafka.batch.size", "65536")            // Larger batch size
  .option("kafka.linger.ms", "10")                // Higher batching delay
  .option("kafka.buffer.memory", "67108864")      // 64MB producer buffer
  .option("topic", "high-throughput-topic")
  .save()

Reliability Configuration

// Maximum reliability producer configuration
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.acks", "all")                    // Wait for all in-sync replicas
  .option("kafka.retries", "10")                  // More retry attempts
  .option("kafka.retry.backoff.ms", "1000")       // 1 second retry backoff
  .option("kafka.enable.idempotence", "true")     // Exactly-once semantics
  .option("kafka.max.in.flight.requests.per.connection", "1") // Maintain order
  .option("topic", "critical-topic")
  .save()

Streaming Writes

Basic Streaming Write

// Simple streaming write to Kafka
val streamingQuery = kafkaInputStream
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-stream")
  .outputMode("append")
  .start()

Advanced Streaming Write

// Advanced streaming write with processing
val advancedStreamingQuery = kafkaInputStream
  .select(
    expr("CAST(key AS STRING)").as("key"),
    from_json(expr("CAST(value AS STRING)"), inputSchema).as("data")
  )
  .select(
    col("key"),
    col("data.user_id"),
    col("data.event_type"),
    to_json(col("data")).as("value")
  )
  .withColumn("topic",
    when(col("event_type") === "purchase", "purchase-events")
    .otherwise("general-events")
  )
  .select("topic", "key", "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/checkpoints/kafka-output")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

Streaming Write with Headers

// Include headers in streaming write
val headerStreamingQuery = kafkaInputStream
  .select(
    expr("CAST(key AS STRING)").as("key"),
    expr("CAST(value AS STRING)").as("value"),
    array(
      struct(lit("source").as("key"), lit("spark-streaming").cast(BinaryType).as("value")),
      struct(lit("version").as("key"), lit("1.0").cast(BinaryType).as("value"))
    ).as("headers")
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "enriched-stream")
  .outputMode("append")
  .start()

Batch Writes

Simple Batch Write

// Write DataFrame to Kafka in batch mode
batchDataFrame
  .select(
    col("id").cast(StringType).as("key"),
    to_json(struct(col("*"))).as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "batch-data")
  .save()

Partitioned Batch Write

// Control Kafka partitioning for batch writes
batchDataFrame
  .select(
    col("user_id").cast(StringType).as("key"),
    to_json(struct(col("*"))).as("value"),
    (col("user_id").cast(LongType) % 10).cast(IntegerType).as("partition")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "partitioned-data")
  .save()

Multi-Topic Batch Write

// Write to multiple topics in a single operation
multiTopicDataFrame
  .withColumn("topic",
    when(col("category") === "orders", "order-events")
    .when(col("category") === "payments", "payment-events")
    .when(col("category") === "inventory", "inventory-events")
    .otherwise("misc-events")
  )
  .select("topic", "key", "value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()

Error Handling and Validation

Schema Validation

The writer performs comprehensive schema validation:

// Invalid schema - missing value column
invalidDataFrame
  .select("key", "topic")  // Missing required "value" column
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .save()
// Throws: "Required attribute 'value' not found"

Producer Parameter Validation

// Invalid producer configuration
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.key.serializer", "custom.serializer")  // Not allowed
  .option("topic", "test")
  .save()
// Throws: "Kafka option 'key.serializer' is not supported as keys are serialized with ByteArraySerializer"

Save Mode Validation

// Invalid save modes for Kafka
dataFrame
  .write
  .mode(SaveMode.Overwrite)  // Not supported
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .save()
// Throws: "Save mode Overwrite not allowed for Kafka"

// Supported save modes:
dataFrame.write.mode(SaveMode.Append).format("kafka")...      // Allowed (default)
dataFrame.write.mode(SaveMode.ErrorIfExists).format("kafka")... // Allowed

Producer Pooling

The connector automatically manages producer instances for efficiency:

Producer Cache Configuration

// Configure producer cache behavior (internal settings)
// These are managed automatically but can be tuned via Spark configuration:

// spark.kafka.producer.cache.timeout = "10m"          // Producer cache timeout
// spark.kafka.producer.cache.evictorThreadRunInterval = "1m"  // Cache cleanup interval
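
Since the producer cache is controlled by static Spark settings, they are typically supplied via spark-submit --conf or when the session is built. A minimal sketch with illustrative values:

import org.apache.spark.sql.SparkSession

// Tune the producer cache at session creation time (values are illustrative)
val spark = SparkSession.builder()
  .appName("kafka-writer-job")
  .config("spark.kafka.producer.cache.timeout", "30m")
  .config("spark.kafka.producer.cache.evictorThreadRunInterval", "1m")
  .getOrCreate()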

Producer Pool Management

/**
 * Internal producer pool management
 * Automatically handles producer lifecycle and connection pooling
 */
object InternalKafkaProducerPool {
  /** Acquires a cached producer for the given configuration */
  def acquire(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer
  
  /** Releases a producer back to the pool */
  def release(producer: CachedKafkaProducer): Unit
}

Performance Optimization

Batch Size Tuning

// Optimize for high throughput
highVolumeDataFrame
  .coalesce(10)  // Reduce number of partitions for larger batches
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.batch.size", "131072")      // 128KB batches
  .option("kafka.linger.ms", "20")           // Wait longer to fill batches
  .option("kafka.compression.type", "lz4")   // Use fast compression
  .option("topic", "high-volume-topic")
  .save()

Memory Management

// Optimize memory usage for large writes
largeDataFrame
  .repartition(50)  // Increase parallelism to reduce memory per partition
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.buffer.memory", "134217728")     // 128MB producer buffer
  .option("kafka.max.request.size", "10485760")   // 10MB max request size
  .option("topic", "large-data-topic")
  .save()

Concurrent Writes

// Maximize write parallelism across concurrent producer requests
dataFrame
  .repartition(100)  // More partitions = more concurrent write tasks
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.max.in.flight.requests.per.connection", "5")  // More concurrent requests
  .option("topic", "concurrent-topic")
  .save()