or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

batch-processing.md, configuration.md, consumer-strategies.md, data-writing.md, index.md, offset-management.md, streaming-sources.md
tile.json

docs/configuration.md

Configuration

Complete configuration options for fine-tuning Kafka integration behavior, connection parameters, performance settings, and security configurations.

Capabilities

Source Configuration Options

Configuration options for reading data from Kafka in both streaming and batch modes.

// Connection Configuration
"kafka.bootstrap.servers" -> "localhost:9092"           // Required: Kafka broker addresses
"subscribe" -> "topic1,topic2,topic3"                   // Topic subscription (comma-separated)
"subscribePattern" -> "events_.*"                       // Topic pattern subscription (regex)
"assign" -> """{"topic1":[0,1],"topic2":[0]}"""        // Manual partition assignment (JSON)

// Offset Management
"startingOffsets" -> "earliest"                         // Starting position: "earliest", "latest", or JSON
"endingOffsets" -> "latest"                             // Ending position: "latest" or JSON (batch only)
"failOnDataLoss" -> "true"                              // Fail query on data loss (default: true)

// Performance Tuning  
"minPartitions" -> "10"                                 // Minimum Spark partitions
"maxOffsetsPerTrigger" -> "1000000"                     // Rate limiting for streaming
"kafkaConsumer.pollTimeoutMs" -> "10000"               // Consumer poll timeout

Sink Configuration Options

Configuration options for writing data to Kafka topics.

// Connection Configuration
"kafka.bootstrap.servers" -> "localhost:9092"           // Required: Kafka broker addresses  
"topic" -> "output-topic"                               // Default output topic

// Producer Performance
"kafka.acks" -> "all"                                   // Acknowledgment level: "0", "1", "all"
"kafka.retries" -> "3"                                  // Number of retries
"kafka.batch.size" -> "16384"                           // Batch size in bytes
"kafka.linger.ms" -> "5"                                // Batching delay in milliseconds
"kafka.buffer.memory" -> "33554432"                     // Total memory for buffering
"kafka.compression.type" -> "snappy"                    // Compression: "none", "gzip", "snappy", "lz4", "zstd"

// Reliability
"kafka.enable.idempotence" -> "true"                    // Enable idempotent producer
"kafka.max.in.flight.requests.per.connection" -> "5"   // Max unacknowledged requests
"kafka.request.timeout.ms" -> "30000"                   // Request timeout

Kafka Consumer Parameters

Complete set of Kafka consumer configuration parameters supported through the `kafka.` prefix.

// Core Consumer Settings
"kafka.bootstrap.servers" -> "broker1:9092,broker2:9092"
"kafka.client.id" -> "spark-kafka-consumer"
"kafka.session.timeout.ms" -> "30000"                   // Consumer session timeout
"kafka.heartbeat.interval.ms" -> "3000"                 // Heartbeat interval
"kafka.max.poll.records" -> "500"                       // Records per poll
"kafka.max.poll.interval.ms" -> "300000"               // Max time between polls

// Fetch Configuration
"kafka.fetch.min.bytes" -> "1"                          // Minimum bytes to fetch
"kafka.fetch.max.wait.ms" -> "500"                      // Max wait for min bytes
"kafka.fetch.max.bytes" -> "52428800"                   // Maximum bytes per fetch (50MB)
"kafka.max.partition.fetch.bytes" -> "1048576"          // Maximum bytes per partition (1MB)

// Network Configuration
"kafka.receive.buffer.bytes" -> "65536"                 // Receive buffer size (64KB)
"kafka.send.buffer.bytes" -> "131072"                   // Send buffer size (128KB)
"kafka.request.timeout.ms" -> "30000"                   // Request timeout
"kafka.reconnect.backoff.ms" -> "50"                    // Reconnect backoff
"kafka.reconnect.backoff.max.ms" -> "1000"             // Max reconnect backoff
"kafka.retry.backoff.ms" -> "100"                       // Retry backoff

// Connection Management
"kafka.connections.max.idle.ms" -> "540000"            // Connection idle timeout (9 minutes)
"kafka.metadata.max.age.ms" -> "300000"                // Metadata refresh interval (5 minutes)

Kafka Producer Parameters

Complete set of Kafka producer configuration parameters for writing data.

// Core Producer Settings
"kafka.bootstrap.servers" -> "broker1:9092,broker2:9092"
"kafka.client.id" -> "spark-kafka-producer"
"kafka.acks" -> "all"                                   // "0", "1", or "all"
"kafka.retries" -> "2147483647"                         // Max retries (Integer.MAX_VALUE)
"kafka.retry.backoff.ms" -> "100"                       // Retry backoff

// Batching and Performance
"kafka.batch.size" -> "16384"                           // Batch size (16KB)
"kafka.linger.ms" -> "0"                                // Batching delay
"kafka.buffer.memory" -> "33554432"                     // Buffer memory (32MB)
"kafka.compression.type" -> "none"                      // Compression type
"kafka.max.request.size" -> "1048576"                   // Max request size (1MB)

// Idempotence and Ordering
"kafka.enable.idempotence" -> "false"                   // Idempotent producer
"kafka.max.in.flight.requests.per.connection" -> "5"   // Max unacknowledged requests

// Timing Configuration
"kafka.request.timeout.ms" -> "30000"                   // Request timeout
"kafka.delivery.timeout.ms" -> "120000"                 // Delivery timeout (2 minutes)
"kafka.send.buffer.bytes" -> "131072"                   // Send buffer (128KB)
"kafka.receive.buffer.bytes" -> "32768"                 // Receive buffer (32KB)

// Metadata and Connections
"kafka.metadata.max.age.ms" -> "300000"                // Metadata refresh (5 minutes)
"kafka.connections.max.idle.ms" -> "540000"            // Connection idle timeout (9 minutes)
"kafka.reconnect.backoff.ms" -> "50"                   // Reconnect backoff
"kafka.reconnect.backoff.max.ms" -> "1000"             // Max reconnect backoff

Security Configuration

Security configuration for SSL and SASL authentication.

// SSL Configuration
"kafka.security.protocol" -> "SSL"                      // Security protocol
"kafka.ssl.protocol" -> "TLSv1.2"                      // SSL protocol version
"kafka.ssl.truststore.location" -> "/path/to/truststore.jks"
"kafka.ssl.truststore.password" -> "truststore-password"
"kafka.ssl.truststore.type" -> "JKS"                   // Truststore type
"kafka.ssl.keystore.location" -> "/path/to/keystore.jks"
"kafka.ssl.keystore.password" -> "keystore-password"
"kafka.ssl.keystore.type" -> "JKS"                     // Keystore type
"kafka.ssl.key.password" -> "key-password"             // Key password

// SSL Verification
"kafka.ssl.endpoint.identification.algorithm" -> "https"  // Hostname verification
"kafka.ssl.check.hostname" -> "true"                   // Check hostname

// SASL Configuration
"kafka.security.protocol" -> "SASL_SSL"                // SASL with SSL
"kafka.sasl.mechanism" -> "PLAIN"                      // SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
"kafka.sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";"

// SASL SCRAM Configuration
"kafka.sasl.mechanism" -> "SCRAM-SHA-256"
"kafka.sasl.jaas.config" -> "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"password\";"

// Kerberos (GSSAPI) Configuration
"kafka.sasl.mechanism" -> "GSSAPI"
"kafka.sasl.kerberos.service.name" -> "kafka"
"kafka.sasl.jaas.config" -> "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab=\"/path/to/kafka.keytab\" storeKey=true useTicketCache=false principal=\"kafka/hostname@REALM\";"

Configuration Examples

Basic Streaming Configuration

val streamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()

val query = streamingDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") 
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

High Performance Configuration

val highPerfDF = spark
  .readStream
  .format("kafka")
  // Connection settings
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
  .option("subscribe", "high-volume-topic")
  
  // Performance tuning
  .option("minPartitions", "50")                        // Increase parallelism
  .option("maxOffsetsPerTrigger", "5000000")           // 5M records per batch
  .option("kafkaConsumer.pollTimeoutMs", "5000")       // 5 second timeout
  
  // Consumer optimization
  .option("kafka.fetch.min.bytes", "1024")             // 1KB minimum fetch
  .option("kafka.fetch.max.wait.ms", "100")            // Fast fetching
  .option("kafka.max.poll.records", "1000")            // More records per poll
  .option("kafka.receive.buffer.bytes", "262144")      // 256KB buffer
  .option("kafka.fetch.max.bytes", "104857600")        // 100MB max fetch
  
  .load()

val writeQuery = processedDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
  .option("topic", "processed-topic")
  
  // Producer optimization
  .option("kafka.batch.size", "65536")                 // 64KB batches
  .option("kafka.linger.ms", "10")                     // 10ms batching delay
  .option("kafka.compression.type", "snappy")          // Compression
  .option("kafka.buffer.memory", "134217728")          // 128MB buffer
  .option("kafka.acks", "1")                          // Balanced reliability
  
  .start()

Secure Configuration

val secureDF = spark
  .readStream
  .format("kafka")
  // Basic connection
  .option("kafka.bootstrap.servers", "secure-broker1:9093,secure-broker2:9093")
  .option("subscribe", "secure-topic")
  
  // SSL Configuration
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/etc/kafka/truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .option("kafka.ssl.keystore.location", "/etc/kafka/keystore.jks")
  .option("kafka.ssl.keystore.password", "keystore-password")
  .option("kafka.ssl.key.password", "key-password")
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  
  .load()

// SASL/SCRAM configuration
val saslDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "sasl-broker:9094")
  .option("subscribe", "authenticated-topic")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.sasl.jaas.config", 
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"myuser\" password=\"mypassword\";")
  .load()

Batch Processing Configuration

val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "historical-data")
  
  // Batch-specific offset configuration
  .option("startingOffsets", "earliest")
  .option("endingOffsets", """{"historical-data":{"0":1000000,"1":1500000}}""")
  
  // Optimize for large batch reads
  .option("minPartitions", "20")
  .option("kafka.fetch.max.bytes", "104857600")        // 100MB
  .option("kafka.max.partition.fetch.bytes", "10485760") // 10MB per partition
  .option("kafka.receive.buffer.bytes", "1048576")     // 1MB buffer
  
  .load()

Multi-Environment Configuration

import scala.util.Properties

val environment = Properties.envOrElse("ENVIRONMENT", "dev")

val envConfig = environment match {
  case "prod" => Map(
    "kafka.bootstrap.servers" -> "prod-broker1:9092,prod-broker2:9092,prod-broker3:9092",
    "kafka.security.protocol" -> "SASL_SSL",
    "kafka.acks" -> "all",
    "kafka.retries" -> "10",
    "failOnDataLoss" -> "true"
  )
  
  case "staging" => Map(
    "kafka.bootstrap.servers" -> "staging-broker:9092",
    "kafka.acks" -> "1",
    "failOnDataLoss" -> "true"
  )
  
  case _ => Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "kafka.acks" -> "1",
    "failOnDataLoss" -> "false"
  )
}

val configuredDF = spark
  .readStream
  .format("kafka")
  .options(envConfig)
  .option("subscribe", s"${environment}-events")
  .load()

Configuration Validation

Unsupported Options

Certain Kafka parameters are managed internally and cannot be overridden:

// Automatically managed by Spark (will throw IllegalArgumentException if specified)
"kafka.group.id"                    // Unique group IDs generated per query
"kafka.auto.offset.reset"           // Controlled via startingOffsets option
"kafka.key.deserializer"            // Fixed to ByteArrayDeserializer
"kafka.value.deserializer"          // Fixed to ByteArrayDeserializer  
"kafka.enable.auto.commit"          // Disabled for offset management
"kafka.interceptor.classes"         // Not supported for safety

// Producer-specific unsupported options
"kafka.key.serializer"              // Fixed to ByteArraySerializer
"kafka.value.serializer"            // Fixed to ByteArraySerializer

Required Options

// Required for all operations
"kafka.bootstrap.servers"           // Must be specified

// Required for sources (exactly one must be specified)
"subscribe"                         // Topic list
"subscribePattern"                  // Topic pattern
"assign"                           // Partition assignment

// Schema validation occurs at runtime
// DataFrame must have a "value" column for sinks; "key" is optional, and a "topic" column is required only when the "topic" option is not set

Option Validation Examples

// Valid consumer strategy
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")           // Valid
  .load()

// Invalid: multiple strategies
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")                  // Both subscribe
  .option("subscribePattern", "topic.*")          // and pattern specified
  .load()  // Throws IllegalArgumentException

// Invalid: unsupported parameter
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "my-group")          // Unsupported
  .load()  // Throws IllegalArgumentException

Performance Tuning Guidelines

Consumer Performance

// Optimize fetch behavior
.option("kafka.fetch.min.bytes", "1024")         // Wait for 1KB
.option("kafka.fetch.max.wait.ms", "500")        // Max wait 500ms
.option("kafka.max.poll.records", "500")         // Records per poll

// Buffer optimization
.option("kafka.receive.buffer.bytes", "65536")   // 64KB receive buffer
.option("kafka.fetch.max.bytes", "52428800")     // 50MB max fetch

// Partition parallelism
.option("minPartitions", "20")                   // Increase Spark partitions

Producer Performance

// Batching optimization
.option("kafka.batch.size", "32768")             // 32KB batches
.option("kafka.linger.ms", "10")                 // 10ms linger time
.option("kafka.compression.type", "snappy")      // Enable compression

// Memory and throughput
.option("kafka.buffer.memory", "67108864")       // 64MB buffer
.option("kafka.max.in.flight.requests.per.connection", "5")

Memory Configuration

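// Spark configuration for Kafka workloads
// NOTE: executor/driver memory, executor cores, and extra JVM options are fixed when the
// application launches. Supply them via spark-submit --conf or SparkSession.builder.config(...);
// calling spark.conf.set on a running session does not resize executors or the driver.
// Of the settings below, only spark.sql.shuffle.partitions can be changed at runtime.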
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.cores", "4") 
spark.conf.set("spark.driver.memory", "4g")
spark.conf.set("spark.sql.shuffle.partitions", "200")

// JVM tuning for Kafka clients
spark.conf.set("spark.executor.extraJavaOptions", 
  "-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:+UnlockExperimentalVMOptions")

Monitoring Configuration

Metrics Collection

// Enable JMX metrics
.option("kafka.metric.reporters", "org.apache.kafka.common.metrics.JmxReporter")

// Custom metrics reporting interval
.option("kafka.metrics.sample.window.ms", "30000")    // 30 second window
.option("kafka.metrics.num.samples", "2")             // Number of samples retained to compute metrics

Logging Configuration

// Enable detailed logging for troubleshooting
import org.apache.log4j.{Level, Logger}

Logger.getLogger("org.apache.spark.sql.kafka010").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.kafka").setLevel(Level.INFO)

Troubleshooting Common Issues

Connection Issues

// Increase timeouts for unreliable networks
.option("kafka.request.timeout.ms", "60000")     // 60 second timeout
.option("kafka.reconnect.backoff.ms", "1000")    // 1 second backoff
.option("kafka.retry.backoff.ms", "1000")        // 1 second retry backoff

Offset Management Issues

// Handle offset out of range errors
.option("failOnDataLoss", "false")               // Continue on data loss
.option("startingOffsets", "latest")             // Start from latest (only applies when the query starts without a checkpoint)

Performance Issues

// Diagnose slow consumption
.option("kafkaConsumer.pollTimeoutMs", "30000")  // Longer poll timeout
.option("kafka.session.timeout.ms", "60000")     // Longer session timeout
.option("kafka.max.poll.interval.ms", "600000")  // 10 minute max poll interval

Memory Issues

// Reduce memory usage
.option("kafka.fetch.max.bytes", "10485760")     // Reduce to 10MB
.option("kafka.max.partition.fetch.bytes", "1048576") // 1MB per partition
.option("maxOffsetsPerTrigger", "100000")        // Limit batch size