Write Operations

Write operations enable sending data from Spark DataFrames to Kafka topics for real-time data distribution, event publishing, and data pipeline integration. The connector supports both batch and streaming write modes.

Capabilities

Streaming Write

Write streaming DataFrames to Kafka topics continuously. The Kafka sink provides at-least-once delivery: checkpointing lets a failed query recover its progress, but retried batches can produce duplicate records.

/**
 * Write streaming DataFrame to Kafka topics
 * At-least-once delivery; checkpointing enables recovery after failures
 */
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)    // Required: Kafka bootstrap servers
  .option("topic", topicName: String)                    // Target topic (optional if a 'topic' column is present in the data)
  .outputMode("append")                                  // Output mode ("append" is typical for Kafka sinks)
  .option("checkpointLocation", path: String)            // Required: checkpoint directory
  .start(): StreamingQuery

Usage Examples:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("KafkaWriter")
  .getOrCreate()

// Basic streaming write
val inputStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()

val kafkaWrite = inputStream
  .select(to_json(struct("*")).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

// Write with custom key and headers
val enrichedWrite = inputStream
  .select(
    lit("sensor-data").as("key"),
    to_json(struct("*")).as("value"),
    array(
      struct(lit("source").as("key"), lit("spark-app").cast("binary").as("value"))
    ).as("headers")
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sensor-events")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
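
Both examples return a StreamingQuery handle that the snippets never manage afterwards. A short sketch of the usual lifecycle calls on those handles (all standard StreamingQuery methods):

// Inspect the most recent micro-batch progress (null until the first batch completes)
println(kafkaWrite.lastProgress)

// Stop a query programmatically once it is no longer needed
if (enrichedWrite.isActive) {
  enrichedWrite.stop()
}

// Block the driver until the remaining query terminates or fails
kafkaWrite.awaitTermination()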

Batch Write

Write batch DataFrames to Kafka topics for bulk data publishing.

/**
 * Write batch DataFrame to Kafka topics
 * Delivery is at-least-once; retried tasks may produce duplicate records
 */
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)    // Required: Kafka bootstrap servers
  .option("topic", topicName: String)                    // Target topic (optional if a 'topic' column is present in the data)
  .save()

Usage Examples:

// Basic batch write
val batchData = spark.range(1000)
  .select(
    col("id").cast("string").as("key"),
    to_json(struct(col("id"), current_timestamp().as("timestamp"))).as("value")
  )

batchData.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "batch-events")
  .save()

// Write with producer configuration
val configuredWrite = batchData.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "secure-topic")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", 
    "org.apache.kafka.common.security.plain.PlainLoginModule required username='user' password='pass';")
  .save()

Dynamic Topic Selection

Write to different topics based on data content using the topic column.

/**
 * Dynamic topic selection using data content
 * Topic determined per row based on 'topic' column value
 */
df.select(
  expr("CASE WHEN category = 'error' THEN 'error-topic' ELSE 'info-topic' END").as("topic"),
  col("key"),
  col("value")
).write
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .save()

Usage Examples:

import org.apache.spark.sql.functions._

// Route messages by category
val categorizedData = spark.range(100)
  .select(
    (col("id") % 10).as("category"),
    col("id").cast("string").as("key"),
    to_json(struct("*")).as("value")
  )
  .select(
    when(col("category") < 3, "priority-topic")
      .when(col("category") < 7, "normal-topic")
      .otherwise("low-priority-topic").as("topic"),
    col("key"),
    col("value")
  )

categorizedData.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()

// Route by timestamp (time-based partitioning)
val timePartitioned = inputData
  .select(
    concat(lit("events-"), date_format(col("timestamp"), "yyyy-MM-dd")).as("topic"),
    col("userId").cast("string").as("key"),
    to_json(col("eventData")).as("value")
  )

Partitioning Control

Control which Kafka partition messages are sent to for load balancing and ordering.

/**
 * Partition control using the 'partition' column
 * Specify exact partition numbers for message placement
 */
df.select(
  col("key"),
  col("value"),
  expr("abs(hash(key)) % 10").as("partition")  // Custom partitioning logic
).write
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("topic", topicName: String)
  .save()

Usage Examples:

// Partition by user ID for ordering guarantees
val userPartitioned = userData
  .select(
    col("userId").cast("string").as("key"),
    to_json(struct("*")).as("value"),
    (abs(hash(col("userId"))) % 12).as("partition") // 12 partitions
  )

userPartitioned.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "user-events")
  .save()

// Load balancing with round-robin
val roundRobin = inputData
  .withColumn("row_number", monotonically_increasing_id())
  .select(
    col("key"),
    col("value"),
    (col("row_number") % 8).as("partition") // Distribute across 8 partitions
  )

Headers and Metadata

Include custom headers and metadata with Kafka messages.

/**
 * Include headers using the 'headers' column
 * The column must have type ARRAY<STRUCT<key: STRING, value: BINARY>>
 */
df.select(
  col("key"),
  col("value"),
  array(
    struct(lit("source").as("key"), lit("spark-app").cast("binary").as("value")),
    struct(lit("version").as("key"), lit("1.0").cast("binary").as("value"))
  ).as("headers")
).write
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("topic", topicName: String)
  .save()

Usage Examples:

// Add tracing headers
val tracedMessages = processedData
  .select(
    col("messageId").cast("string").as("key"),
    to_json(col("payload")).as("value"),
    array(
      struct(lit("trace-id").as("key"),
        lit(java.util.UUID.randomUUID().toString).cast("binary").as("value")), // evaluated once per write, not per row
      struct(lit("source").as("key"), lit("data-processor").cast("binary").as("value")),
      struct(lit("version").as("key"), lit("2.1.0").cast("binary").as("value"))
    ).as("headers")
  )

// Add schema information headers
val schemaHeaders = jsonData
  .select(
    col("key"),
    col("value"),
    array(
      struct(lit("schema-id").as("key"), lit("user-event-v1").cast("binary").as("value")),
      struct(lit("content-type").as("key"), lit("application/json").cast("binary").as("value"))
    ).as("headers")
  )

Producer Configuration

Configure Kafka producer settings for performance, security, and reliability.

/**
 * Producer configuration options (prefixed with 'kafka.')
 * All standard Kafka producer configurations are supported
 */
.option("kafka.acks", "all")                          // Acknowledgment level
.option("kafka.retries", "3")                         // Retry attempts
.option("kafka.batch.size", "16384")                  // Batch size in bytes
.option("kafka.linger.ms", "10")                      // Batching delay
.option("kafka.buffer.memory", "33554432")            // Total memory for buffering
.option("kafka.compression.type", "gzip")             // Compression algorithm
.option("kafka.enable.idempotence", "true")           // Idempotent producer

Usage Examples:

// High-throughput configuration
val highThroughputWrite = largeDataset.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "high-volume-topic")
  .option("kafka.acks", "1")                    // Faster but less durable
  .option("kafka.batch.size", "65536")          // Larger batches
  .option("kafka.linger.ms", "100")             // More batching delay
  .option("kafka.compression.type", "lz4")      // Fast compression
  .option("kafka.buffer.memory", "67108864")    // More buffer memory
  .save()

// High-reliability configuration
val reliableWrite = criticalData.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "critical-events")
  .option("kafka.acks", "all")                  // Wait for all replicas
  .option("kafka.retries", "10")                // More retry attempts
  .option("kafka.enable.idempotence", "true")   // Prevent duplicates
  .option("kafka.max.in.flight.requests.per.connection", "1") // Ordering guarantee
  .save()
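
When several writes share the same producer settings, the kafka.-prefixed options can be collected in a Map and applied with the standard .options(...) call; a sketch reusing the batchData DataFrame from earlier, with the kafkaProducerOptions name chosen purely for illustration:

// Shared producer settings applied in one call
val kafkaProducerOptions: Map[String, String] = Map(
  "kafka.bootstrap.servers"  -> "localhost:9092",
  "kafka.acks"               -> "all",
  "kafka.compression.type"   -> "gzip",
  "kafka.enable.idempotence" -> "true"
)

batchData.write
  .format("kafka")
  .options(kafkaProducerOptions) // equivalent to repeating .option(...) for each entry
  .option("topic", "batch-events")
  .save()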

Security Configuration

Configure authentication and encryption for secure Kafka clusters.

/**
 * Security configuration options
 * Support for SASL, SSL, and other security mechanisms
 */
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config", jaasConfig: String)
.option("kafka.ssl.truststore.location", truststorePath: String)
.option("kafka.ssl.truststore.password", truststorePassword: String)
.option("kafka.ssl.keystore.location", keystorePath: String)
.option("kafka.ssl.keystore.password", keystorePassword: String)

Usage Examples:

// SASL/PLAIN authentication
val saslWrite = secureData.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-kafka:9093")
  .option("topic", "secure-topic")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", 
    "org.apache.kafka.common.security.plain.PlainLoginModule required username='producer' password='secret';")
  .save()

// SSL with client certificates
val sslWrite = sensitiveData.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "ssl-kafka:9094")
  .option("topic", "sensitive-data")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .option("kafka.ssl.keystore.location", "/path/to/keystore.jks")
  .option("kafka.ssl.keystore.password", "keystore-password")
  .save()
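
The examples above embed credentials as string literals for brevity. A minimal sketch of sourcing them from environment variables instead; the KAFKA_USER and KAFKA_PASS variable names are illustrative assumptions:

// Build the JAAS configuration from environment variables rather than literals
val kafkaUser = sys.env.getOrElse("KAFKA_USER", "")
val kafkaPass = sys.env.getOrElse("KAFKA_PASS", "")

val jaasConfig =
  s"org.apache.kafka.common.security.plain.PlainLoginModule required " +
  s"username='$kafkaUser' password='$kafkaPass';"

secureData.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-kafka:9093")
  .option("topic", "secure-topic")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", jaasConfig)
  .save()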

Write Schema Requirements

Required Fields

/**
 * Minimum required schema for Kafka writes
 * Only the 'value' column is mandatory
 */
case class MinimalKafkaRecord(
  value: String  // Required: message payload (string or binary column)
)
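
A minimal concrete write that satisfies this schema, assuming the SparkSession from the earlier examples; a single string column named value is enough:

import spark.implicits._

// One column named "value" is all the sink requires
val minimal = Seq("hello", "world").toDF("value")

minimal.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "minimal-topic")
  .save()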

Complete Schema

/**
 * Complete schema for Kafka writes with all optional fields
 * All fields except 'value' are optional
 */
case class KafkaWriteRecord(
  topic: String,                        // Target topic (optional if set in options)
  key: String,                          // Message key (optional; string or binary column)
  value: String,                        // Message value (required; string or binary column)
  partition: Int,                       // Target partition (optional; auto-assigned if not specified)
  headers: Array[(String, Array[Byte])] // Message headers (optional; ARRAY<STRUCT<key: STRING, value: BINARY>> column)
)
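
A sketch that populates every recognized column in a single write; the data is illustrative, only the column names and types matter:

// Populate all sink-recognized columns explicitly
val fullSchemaWrite = spark.range(10)
  .select(
    lit("events-topic").as("topic"),
    col("id").cast("string").as("key"),
    to_json(struct(col("id"))).as("value"),
    (col("id") % 4).cast("int").as("partition"),
    array(
      struct(lit("source").as("key"), lit("docs-example").cast("binary").as("value"))
    ).as("headers")
  )

fullSchemaWrite.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save() // no "topic" option needed: the topic column is used per row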

Data Transformation Patterns

JSON Serialization

import org.apache.spark.sql.functions._

// Convert structured data to JSON
val jsonMessages = structuredData
  .select(
    col("id").cast("string").as("key"),
    to_json(struct(
      col("timestamp"),
      col("userId"),
      col("eventType"),
      col("properties")
    )).as("value")
  )

// Add metadata to JSON
val enrichedJson = rawData
  .select(
    col("messageId").as("key"),
    to_json(struct(
      col("*"),
      current_timestamp().as("processedAt"),
      lit("spark-processor").as("source")
    )).as("value")
  )

Binary Data Handling

// Handle binary payloads
val binaryMessages = imageData
  .select(
    col("imageId").cast("string").as("key"),
    col("imageBytes").as("value"), // Already binary
    array(
      struct(lit("content-type").as("key"), lit("image/jpeg").cast("binary").as("value")),
      struct(lit("size").as("key"), col("imageSize").cast("string").cast("binary").as("value"))
    ).as("headers")
  )

Avro Serialization

// Serialize rows to Avro with the spark-avro package's to_avro function
// (Confluent Schema Registry integration requires additional libraries and is not shown here)
import org.apache.spark.sql.avro.functions.to_avro

val avroMessages = userData
  .select(
    col("userId").cast("string").as("key"),
    to_avro(struct(col("*"))).as("value") // Avro schema is derived from the struct's type
  )

Error Handling and Monitoring

Write Error Handling

// Monitor write progress and handle failures per micro-batch.
// With foreachBatch the Kafka write happens inside the batch function,
// so the Kafka format and options belong on the inner batch writer.
import org.apache.spark.sql.DataFrame

val query = processedData
  .writeStream
  .option("checkpointLocation", "/tmp/checkpoint")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    try {
      batchDF.write
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "events")
        .save()
      println(s"Successfully wrote batch $batchId")
    } catch {
      case ex: Exception =>
        println(s"Failed to write batch $batchId: ${ex.getMessage}")
        // Handle error (retry, log, alert, etc.)
        throw ex // Re-throw to fail the streaming query
    }
  }
  .start()
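
Beyond per-batch handling, progress and failures for every streaming query on the session can be observed with Spark's StreamingQueryListener; a minimal sketch that only logs the events:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Log lifecycle and progress events for all streaming queries on this session
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Batch ${event.progress.batchId}: ${event.progress.numInputRows} input rows")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query ${event.id} terminated, exception: ${event.exception}")
})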

Dead Letter Queue Pattern

// Implement a dead letter queue for failed messages
val validated = inputData
  .withColumn("isValid",
    col("payload").isNotNull && col("userId").isNotNull)
  .cache()

val validMessages = validated.filter(col("isValid"))
val invalidMessages = validated.filter(!col("isValid"))

// Write valid messages to main topic
validMessages
  .select(col("userId").cast("string").as("key"), to_json(col("payload")).as("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "valid-events")
  .save()

// Write invalid messages to dead letter queue
invalidMessages
  .select(
    col("messageId").cast("string").as("key"),
    to_json(struct(
      col("*"),
      lit("VALIDATION_FAILED").as("errorType"),
      current_timestamp().as("errorTimestamp")
    )).as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "dead-letter-queue")
  .save()

Performance Optimization

Batch Size Tuning

// Optimize for high throughput
val optimizedWrite = largeDataset.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "high-volume")
  .option("kafka.batch.size", "131072")      // 128KB batches
  .option("kafka.linger.ms", "50")           // 50ms batching delay
  .option("kafka.compression.type", "snappy") // Fast compression
  .save()

Memory Management

// Configure the shared Kafka producer cache for long-running writes
// (Spark-level settings, typically set at session startup)
spark.conf.set("spark.kafka.producer.cache.timeout", "10m")
spark.conf.set("spark.kafka.producer.cache.evictorThreadRunInterval", "30s")

val memoryOptimizedWrite = bigDataset.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "large-messages")
  .option("kafka.buffer.memory", "134217728") // 128MB buffer
  .option("kafka.max.request.size", "10485760") // 10MB max message
  .save()
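
Writes run in parallel across the DataFrame's partitions, so repartitioning before the write is a common way to trade producer parallelism against per-task batching; a sketch with an arbitrarily chosen partition count:

// Fewer, larger tasks improve batching; more tasks increase parallelism.
// The value 24 is only an example and should be tuned per workload.
bigDataset
  .repartition(24)
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "large-messages")
  .save()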