Data Writing

Comprehensive data writing capabilities for both streaming and batch workloads with producer connection pooling, automatic serialization, and support for multiple output modes.

Capabilities

Streaming Sink (Legacy)

Legacy streaming sink for writing DataFrames to Kafka topics in streaming queries.

/**
 * Legacy streaming sink for writing to Kafka
 */
class KafkaSink extends Sink with Logging {
  /**
   * Adds a batch of data to the sink
   * @param batchId Unique identifier for this batch
   * @param data DataFrame containing data to write
   */
  def addBatch(batchId: Long, data: DataFrame): Unit
  
  /**
   * String representation of the sink
   * @return String describing this sink
   */
  def toString(): String
}
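
The legacy sink is never constructed directly; it is selected internally when a streaming query writes with the kafka format. A minimal sketch of how it is reached (topic name and checkpoint path are illustrative):

// The sink is chosen by the kafka format; user code never instantiates KafkaSink
df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "legacy-sink-topic")
  .option("checkpointLocation", "/tmp/legacy-sink-checkpoint")
  .start()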

Stream Writer (DataSource V2)

Modern stream writer for DataSource V2 with improved performance and reliability.

/**
 * Stream writer for DataSource V2 streaming writes
 * @param topic Optional default topic for writes
 * @param producerParams Kafka producer configuration
 * @param schema Schema of input data
 */
class KafkaStreamWriter(
  topic: Option[String],
  producerParams: Map[String, String],
  schema: StructType
) extends StreamWriter {
  /**
   * Creates writer factory for this stream
   * @return KafkaStreamWriterFactory instance
   */
  def createWriterFactory(): KafkaStreamWriterFactory
  
  /**
   * Commits an epoch of writes
   * @param epochId Epoch identifier
   * @param messages Array of commit messages from writers
   */
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
  
  /**
   * Aborts an epoch of writes
   * @param epochId Epoch identifier  
   * @param messages Array of commit messages from writers
   */
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}

Stream Writer Factory

Factory for creating data writers in streaming contexts.

/**
 * Factory for creating stream data writers
 * @param topic Optional default topic
 * @param producerParams Kafka producer configuration
 * @param schema Schema of input data
 */
case class KafkaStreamWriterFactory(
  topic: Option[String],
  producerParams: Map[String, String], 
  schema: StructType
) extends DataWriterFactory[InternalRow] {
  /**
   * Creates data writer for a specific partition and task
   * @param partitionId Partition identifier
   * @param taskId Task identifier
   * @param epochId Epoch identifier
   * @return DataWriter for writing records
   */
  def createDataWriter(
    partitionId: Int,
    taskId: Long,
    epochId: Long
  ): DataWriter[InternalRow]
}

Stream Data Writer

Data writer for streaming Kafka writes with row-level processing.

/**
 * Data writer for streaming Kafka writes
 * @param targetTopic Optional target topic
 * @param producerParams Kafka producer configuration
 * @param inputSchema Schema of input data
 */
class KafkaStreamDataWriter(
  targetTopic: Option[String],
  producerParams: Map[String, String],
  inputSchema: StructType
) extends KafkaRowWriter with DataWriter[InternalRow] {
  /**
   * Writes a single row to Kafka
   * @param row InternalRow to write
   */
  def write(row: InternalRow): Unit
  
  /**
   * Commits all pending writes
   * @return WriterCommitMessage confirming completion
   */
  def commit(): WriterCommitMessage
  
  /**
   * Aborts all pending writes
   */
  def abort(): Unit
  
  /**
   * Closes the writer and releases resources
   */
  def close(): Unit
}

Batch Writer

Utilities for writing data to Kafka from batch and streaming queries.

/**
 * Utilities for writing data to Kafka from batch/streaming queries
 */
object KafkaWriter extends Logging {
  /** Topic column name in DataFrame */
  val TOPIC_ATTRIBUTE_NAME: String = "topic"
  
  /** Key column name in DataFrame */
  val KEY_ATTRIBUTE_NAME: String = "key"
  
  /** Value column name in DataFrame */  
  val VALUE_ATTRIBUTE_NAME: String = "value"
  
  /**
   * Validates query schema for Kafka write compatibility
   * @param schema Attribute schema to validate
   * @param kafkaParameters Kafka producer parameters
   * @param topic Optional default topic
   */
  def validateQuery(
    schema: Seq[Attribute],
    kafkaParameters: ju.Map[String, Object],
    topic: Option[String]
  ): Unit
  
  /**
   * Writes DataFrame data to Kafka
   * @param sparkSession Current Spark session
   * @param queryExecution Query execution plan
   * @param kafkaParameters Kafka producer parameters
   * @param topic Optional default topic
   */
  def write(
    sparkSession: SparkSession,
    queryExecution: QueryExecution,
    kafkaParameters: ju.Map[String, Object],
    topic: Option[String]
  ): Unit
  
  /**
   * String representation of the writer
   * @return String describing writer configuration
   */
  def toString: String
}
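
These entry points are internal to the connector and are normally reached through df.write.format("kafka"). The sketch below only illustrates how the documented signatures fit together, assuming internal access, an existing DataFrame df, and an illustrative topic:

// Illustrative wiring of the documented KafkaWriter API (internal use only)
import java.util.{HashMap => JHashMap}

val kafkaParams = new JHashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")

val attributes = df.queryExecution.analyzed.output        // Seq[Attribute]
KafkaWriter.validateQuery(attributes, kafkaParams, Some("user-data"))
KafkaWriter.write(df.sparkSession, df.queryExecution, kafkaParams, Some("user-data"))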

Write Task

Task for writing data to Kafka in batch mode with proper resource management.

/**
 * Task for writing data to Kafka in batch mode
 * @param producerConfiguration Kafka producer configuration
 * @param inputSchema Schema of input data
 * @param topic Optional target topic
 */
class KafkaWriteTask(
  producerConfiguration: ju.Map[String, Object],
  inputSchema: StructType,
  topic: Option[String]
) extends KafkaRowWriter {
  /**
   * Executes write task for iterator of rows
   * @param iterator Iterator of InternalRow objects to write
   */
  def execute(iterator: Iterator[InternalRow]): Unit
  
  /**
   * Closes task and releases resources
   */
  def close(): Unit
}

Row Writer Base

Base class for writing rows to Kafka with common functionality.

/**
 * Base class for writing rows to Kafka
 * @param inputSchema Schema of input data
 * @param topic Optional target topic
 */
abstract class KafkaRowWriter(inputSchema: StructType, topic: Option[String]) {
  /**
   * Sends a row to Kafka producer
   * @param row InternalRow to send
   * @param producer Kafka producer instance
   */
  protected def sendRow(row: InternalRow, producer: Producer[Array[Byte], Array[Byte]]): Unit
  
  /**
   * Checks for write errors and throws exceptions if found
   */
  protected def checkForErrors(): Unit
}

Writer Commit Message

Commit message for Kafka stream writes indicating successful completion.

/**
 * Commit message for Kafka stream writes
 */
case object KafkaWriterCommitMessage extends WriterCommitMessage

Cached Producer

Producer cache for improved performance and connection reuse.

/**
 * Cache for Kafka producers to improve performance
 */
object CachedKafkaProducer extends Logging {
  /**
   * Gets existing producer from cache or creates new one
   * @param kafkaParams Kafka producer parameters
   * @return Cached or new Producer instance
   */
  def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer[Array[Byte], Array[Byte]]
  
  /**
   * Explicitly closes and removes producer from cache
   * @param kafkaParams Producer parameters for identification
   */
  def close(kafkaParams: ju.Map[String, Object]): Unit
  
  /**
   * Clears entire producer cache
   */
  def clear(): Unit
}
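
A sketch of the cache lifecycle described above, assuming connector-internal access; the broker address is illustrative:

// Producers are keyed by their full parameter map, so identical params share one instance
import java.util.{HashMap => JHashMap}

val producerParams = new JHashMap[String, Object]()
producerParams.put("bootstrap.servers", "localhost:9092")
producerParams.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
producerParams.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

val producer = CachedKafkaProducer.getOrCreate(producerParams)  // reuse or create
// ... send records ...
CachedKafkaProducer.close(producerParams)                       // evict and close this producer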

Usage Examples

Streaming Write to Single Topic

val query = df
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

query.awaitTermination()

Streaming Write to Multiple Topics

import org.apache.spark.sql.functions._
import spark.implicits._

val multiTopicDF = inputDF
  .withColumn("topic", when($"event_type" === "user", "user-events")
                      .when($"event_type" === "order", "order-events")
                      .otherwise("other-events"))
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value", "topic")

val query = multiTopicDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/tmp/multi-topic-checkpoint")
  .start()

Batch Write

import org.apache.spark.sql.functions._

df.select(
  col("user_id").cast("string").as("key"),
  to_json(struct(col("*"))).as("value")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "user-data")
.save()

Partitioned Write

import org.apache.spark.sql.functions._
import spark.implicits._

val partitionedDF = df
  .withColumn("partition_key", hash($"user_id") % 10)
  .selectExpr(
    "CAST(partition_key AS STRING) AS key",
    "to_json(struct(*)) AS value"
  )

partitionedDF
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "partitioned-topic")
  .save()

Custom Serialization

import org.apache.spark.sql.functions._

val customSerializedDF = df
  .selectExpr(
    "CAST(id AS STRING) AS key",
    "CAST(serialize_avro(struct(*)) AS BINARY) AS value"  // Custom serialization
  )

customSerializedDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "avro-topic")
  .option("checkpointLocation", "/tmp/avro-checkpoint")
  .start()
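
If Avro output is the goal, one concrete option is the to_avro function from the separate spark-avro module (the org.apache.spark:spark-avro artifact must be on the classpath; its package location differs slightly between Spark 2.4 and 3.x). A minimal sketch producing a binary value column that can be written exactly as above:

import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.avro.functions.to_avro   // Spark 3.x package location

val avroDF = df.select(
  col("id").cast("string").as("key"),
  to_avro(struct(col("*"))).as("value")   // binary Avro payload
)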

Configuration Options

Producer Configuration

// Basic configuration
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("topic", "my-topic")

// Performance tuning
.option("kafka.acks", "all")                           // Acknowledgment level
.option("kafka.retries", "3")                          // Retry count
.option("kafka.batch.size", "16384")                   // Batch size
.option("kafka.linger.ms", "5")                        // Batching delay
.option("kafka.buffer.memory", "33554432")             // Buffer memory
.option("kafka.compression.type", "snappy")            // Compression

// Reliability
.option("kafka.enable.idempotence", "true")            // Idempotent producer
.option("kafka.max.in.flight.requests.per.connection", "5")
.option("kafka.request.timeout.ms", "30000")           // Request timeout

Security Configuration

// SSL configuration
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
.option("kafka.ssl.truststore.password", "password")
.option("kafka.ssl.keystore.location", "/path/to/keystore.jks")
.option("kafka.ssl.keystore.password", "password")

// SASL configuration  
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config", 
  "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";")

Data Format Requirements

Required Columns

The DataFrame must contain specific columns for Kafka writes:

// Minimum required: value column
df.select(
  lit("default-key").as("key"),     // Optional: key column (String or Binary)
  to_json(struct(col("*"))).as("value"), // Required: value column (String or Binary)  
  lit("my-topic").as("topic")       // Optional: topic column (String)
)
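
A minimal valid write therefore needs only the value column, with the topic supplied as an option (topic name and broker address are illustrative):

// Only "value" is present in the DataFrame; the topic comes from the option
df.selectExpr("to_json(struct(*)) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "my-topic")
  .save()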

Column Types

import org.apache.spark.sql.types._

val schemaValidation = StructType(Seq(
  StructField("key", StringType, nullable = true),      // or BinaryType
  StructField("value", StringType, nullable = false),   // or BinaryType
  StructField("topic", StringType, nullable = true)     // Optional
))

Type Conversions

import org.apache.spark.sql.functions._

// String to binary conversion
df.selectExpr(
  "CAST(key AS BINARY) AS key",
  "CAST(value AS BINARY) AS value"
)

// JSON serialization
df.select(
  col("id").cast("string").as("key"),
  to_json(struct(col("*"))).as("value")
)

// Custom serialization function
import org.apache.spark.sql.functions.udf

val customSerializer = udf((data: String) => {
  // Custom serialization logic
  data.getBytes("UTF-8")
})

df.select(
  col("key"),
  customSerializer(col("data")).as("value")
)

Error Handling

Write Failures

import org.apache.spark.sql.streaming.StreamingQueryException

try {
  val query = df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "test-topic")
    .option("checkpointLocation", "/tmp/error-handling-checkpoint")
    .start()
    
  query.awaitTermination()
} catch {
  case e: StreamingQueryException =>
    println(s"Streaming query failed: ${e.getMessage}")
    e.getCause match {
      case kafkaException: org.apache.kafka.common.KafkaException =>
        println(s"Kafka error: ${kafkaException.getMessage}")
      case _ =>
        println("Non-Kafka error occurred")
    }
}

Producer Error Handling

// Configure retry behavior
.option("kafka.retries", "3")
.option("kafka.retry.backoff.ms", "100")
.option("kafka.request.timeout.ms", "30000")

// Error tolerance
.option("kafka.acks", "1")  // or "all" for stronger guarantees

Schema Validation

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{BinaryType, StringType}

def validateKafkaSchema(df: DataFrame): Unit = {
  val schema = df.schema
  val hasValidValueType = schema.fieldNames.contains("value") &&
    (schema("value").dataType == StringType || schema("value").dataType == BinaryType)

  require(hasValidValueType,
    "DataFrame must contain 'value' column of String or Binary type")
    
  if (schema.fieldNames.contains("key")) {
    val keyType = schema("key").dataType
    require(keyType == StringType || keyType == BinaryType,
      "'key' column must be String or Binary type")
  }
  
  if (schema.fieldNames.contains("topic")) {
    require(schema("topic").dataType == StringType,
      "'topic' column must be String type")
  }
}

validateKafkaSchema(df)

Performance Optimization

Batching Configuration

// Optimize batching for throughput
.option("kafka.batch.size", "32768")        // 32KB batches
.option("kafka.linger.ms", "10")            // Wait up to 10ms
.option("kafka.buffer.memory", "67108864")  // 64MB buffer

// Optimize for latency
.option("kafka.batch.size", "0")            // No batching
.option("kafka.linger.ms", "0")             // Send immediately

Connection Management

// Connection pooling
.option("kafka.connections.max.idle.ms", "540000")  // 9 minutes
.option("kafka.max.in.flight.requests.per.connection", "5")

// Reduce connection overhead by reusing producers; kafkaParams is the
// ju.Map[String, Object] of producer properties (see Cached Producer above)
CachedKafkaProducer.getOrCreate(kafkaParams)

Compression

// Enable compression for large messages
.option("kafka.compression.type", "snappy")  // or "gzip", "lz4", "zstd"

Monitoring and Metrics

Query Progress

val query = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "metrics-topic")
  .option("checkpointLocation", "/tmp/metrics-checkpoint")
  .start()

// Monitor recent progress (recentProgress holds the last few batch reports)
query.recentProgress.foreach { progress =>
  println(s"Batch ${progress.batchId}: ${progress.inputRowsPerSecond} rows/sec")
  println(s"Processing time: ${progress.durationMs.get("triggerExecution")}ms")
}
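
For continuous monitoring over the life of a query, a StreamingQueryListener can be registered on the session; a minimal sketch (the printed fields are illustrative):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query ${event.id} started")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Batch ${event.progress.batchId}: ${event.progress.numInputRows} rows")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query ${event.id} terminated")
})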

Producer Metrics

// Access producer metrics through JMX or custom metrics collectors
import org.apache.kafka.clients.producer.ProducerConfig

val metricsReporters = "org.apache.kafka.common.metrics.JmxReporter"
.option(s"kafka.${ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG}", metricsReporters)

Best Practices

Message Design

  1. Keep keys consistent for partitioning:

    df.withColumn("key", col("user_id").cast("string"))
  2. Use efficient serialization:

    // Prefer binary formats for large messages
    df.select(col("key"), col("avro_bytes").as("value"))
  3. Include metadata in messages:

    df.select(
      col("key"),
      to_json(struct(
        col("*"),
        current_timestamp().as("processed_at"),
        lit("spark").as("source")
      )).as("value")
    )

Reliability Patterns

  1. Enable idempotence for exactly-once:

    .option("kafka.enable.idempotence", "true")
    .option("kafka.acks", "all")
  2. Use checkpointing for fault tolerance:

    .option("checkpointLocation", "/reliable/checkpoint/path")
  3. Monitor for failures:

    query.exception.foreach(e => throw e)  // Propagate the streaming query's failure, if any

Resource Management

  1. Close resources properly:

    try {
      // Write operations
    } finally {
      CachedKafkaProducer.clear()  // Clean up on shutdown
    }
  2. Configure memory appropriately:

    spark.conf.set("spark.executor.memory", "4g")
    spark.conf.set("spark.driver.memory", "2g")