
Configuration Options

The Spark Kafka connector provides comprehensive configuration options for connection management, performance tuning, security, and reliability. Kafka client configurations can be passed through with the kafka. prefix, except for the few settings the connector manages itself (such as the key/value serializers and deserializers, auto.offset.reset, and enable.auto.commit), which cannot be overridden.

Capabilities

Connection Configuration

Essential connection settings for accessing Kafka clusters.

/**
 * Required connection configuration
 */
.option("kafka.bootstrap.servers", servers: String)  // Required: Comma-separated list of Kafka brokers

/**
 * Optional connection settings
 */
.option("kafka.client.id", clientId: String)         // Client identifier for broker logs
.option("kafka.request.timeout.ms", timeout: String) // Request timeout in milliseconds
.option("kafka.connections.max.idle.ms", idle: String) // Max idle time for connections

Usage Examples:

// Basic connection
val basicConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")

// High availability setup
val haConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092")
  .option("kafka.client.id", "spark-consumer-app")
  .option("kafka.request.timeout.ms", "60000")      // 60 second timeout
  .option("kafka.connections.max.idle.ms", "300000") // 5 minute idle timeout
  .option("subscribe", "critical-events")

Topic Selection Configuration

Configure how topics are selected and subscribed to.

/**
 * Topic selection options (exactly one required)
 */
.option("subscribe", topics: String)           // Comma-separated topic names
.option("subscribepattern", pattern: String)   // Regex pattern for topic names
.option("assign", partitions: String)          // JSON specification of TopicPartitions

/**
 * Topic-related settings
 */
.option("topic", topicName: String)            // Default topic for writes (optional)

Usage Examples:

// Subscribe to specific topics
val topicSubscription = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events,logs,metrics")

// Pattern-based subscription
val patternSubscription = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribepattern", "prod-.*-events")

// Specific partition assignment
val partitionAssignment = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"events":[0,1,2],"logs":[0,1]}""")

Offset Management Configuration

Control how offsets are managed and tracked for reading operations.

/**
 * Offset specification for reads
 */
.option("startingOffsets", offsets: String)    // Starting position: "earliest", "latest", or JSON
.option("endingOffsets", offsets: String)      // Ending position: "earliest", "latest", or JSON (batch only)

/**
 * Timestamp-based offset resolution
 */
.option("startingTimestamp", timestamp: String)           // Global timestamp (ms since epoch)
.option("endingTimestamp", timestamp: String)             // Global timestamp (ms since epoch, batch only)
.option("startingOffsetsByTimestamp", timestamps: String) // Per-partition timestamps (JSON)
.option("endingOffsetsByTimestamp", timestamps: String)   // Per-partition timestamps (JSON, batch only)

/**
 * Offset resolution strategy
 */
.option("startingOffsetsByTimestampStrategy", strategy: String) // "ERROR" or "LATEST"

Usage Examples:

// Start from earliest available
val earliestConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")

// Specific offsets per partition
val specificOffsets = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("startingOffsets", """{"logs":{"0":1000,"1":2000,"2":1500}}""")
  .option("endingOffsets", """{"logs":{"0":5000,"1":6000,"2":4500}}""")

// Timestamp-based reading
val timestampConfig = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingTimestamp", "1640995200000")  // Jan 1, 2022 UTC
  .option("endingTimestamp", "1641081600000")    // Jan 2, 2022 UTC

Performance Configuration

Tune performance characteristics for streaming and batch operations.

/**
 * Streaming performance options
 */
.option("maxOffsetsPerTrigger", maxRecords: String)    // Maximum records per micro-batch
.option("minOffsetsPerTrigger", minRecords: String)    // Minimum records before triggering
.option("maxTriggerDelay", delay: String)              // Maximum delay before triggering (e.g., "30s")

/**
 * Read parallelism options (streaming and batch)
 */
.option("minPartitions", partitions: String)           // Desired minimum number of Spark partitions to read with

/**
 * Offset fetching configuration
 */
.option("fetchOffset.numRetries", retries: String)     // Number of retries for offset fetching
.option("fetchOffset.retryIntervalMs", interval: String) // Retry interval in milliseconds

Usage Examples:

// High-throughput streaming configuration
val highThroughput = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("maxOffsetsPerTrigger", "100000")      // Max 100K records per batch
  .option("minOffsetsPerTrigger", "10000")       // Min 10K records before processing
  .option("maxTriggerDelay", "60s")              // Process every 60s regardless

// Batch optimization
val batchOptimized = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("minPartitions", "200")               // Force higher parallelism
  .option("fetchOffset.numRetries", "10")       // More retries for reliability
  .option("fetchOffset.retryIntervalMs", "500") // Wait 500ms between retries

Reliability Configuration

Configure fault tolerance, error handling, and data consistency.

/**
 * Reliability and fault tolerance options
 */
.option("failOnDataLoss", failBehavior: String)        // "true" or "false"
.option("groupIdPrefix", prefix: String)               // Consumer group ID prefix
.option("includeHeaders", includeHeaders: String)      // "true" or "false"

/**
 * Consumer polling configuration
 */
.option("kafkaConsumer.pollTimeoutMs", timeout: String) // Consumer poll timeout in milliseconds

Usage Examples:

// Strict reliability mode
val strictMode = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-events")
  .option("failOnDataLoss", "true")              // Fail query on any data loss
  .option("groupIdPrefix", "critical-app")       // Custom consumer group prefix
  .option("includeHeaders", "true")              // Include message headers

// Permissive mode for non-critical data
val permissiveMode = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("failOnDataLoss", "false")             // Continue despite data loss
  .option("kafkaConsumer.pollTimeoutMs", "30000") // 30 second poll timeout

Security Configuration

Configure authentication, encryption, and access control.

/**
 * Security protocol configuration
 */
.option("kafka.security.protocol", protocol: String)   // "PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"

/**
 * SASL Authentication
 */
.option("kafka.sasl.mechanism", mechanism: String)     // "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "GSSAPI"
.option("kafka.sasl.jaas.config", jaasConfig: String)  // JAAS configuration string

/**
 * SSL Configuration
 */
.option("kafka.ssl.truststore.location", path: String)       // Truststore file path
.option("kafka.ssl.truststore.password", password: String)   // Truststore password
.option("kafka.ssl.keystore.location", path: String)         // Keystore file path (for client auth)
.option("kafka.ssl.keystore.password", password: String)     // Keystore password
.option("kafka.ssl.key.password", password: String)          // Key password

Usage Examples:

// SASL/PLAIN authentication
val saslConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-kafka:9093")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", 
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username='consumer' password='consumer-secret';")
  .option("subscribe", "secure-topic")

// SSL with mutual authentication
val sslConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ssl-kafka:9094")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .option("kafka.ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
  .option("kafka.ssl.keystore.password", "keystore-password")
  .option("kafka.ssl.key.password", "key-password")
  .option("subscribe", "ssl-topic")

// SASL/SSL combination
val saslSslConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-kafka:9095")
  .option("kafka.security.protocol", "SASL_SSL") 
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username='app-user' password='app-password';")
  .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .option("subscribe", "encrypted-topic")

Producer Configuration (Write Operations)

Configure Kafka producer settings for write operations.

/**
 * Producer reliability configuration
 */
.option("kafka.acks", acks: String)                    // "0", "1", or "all"
.option("kafka.retries", retries: String)              // Number of retry attempts
.option("kafka.enable.idempotence", idempotent: String) // "true" or "false"

/**
 * Producer performance configuration
 */
.option("kafka.batch.size", batchSize: String)         // Batch size in bytes
.option("kafka.linger.ms", lingerMs: String)           // Batching delay in milliseconds
.option("kafka.buffer.memory", bufferMemory: String)   // Total memory for buffering
.option("kafka.compression.type", compression: String)  // "none", "gzip", "snappy", "lz4", "zstd"

/**
 * Producer connection configuration
 */
.option("kafka.max.in.flight.requests.per.connection", maxInflight: String) // Max unacked requests
.option("kafka.request.timeout.ms", timeout: String)   // Request timeout
.option("kafka.delivery.timeout.ms", timeout: String)  // Total delivery timeout

Usage Examples:

// High-reliability producer configuration
dataFrame.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "critical-events")
  .option("kafka.acks", "all")                          // Wait for all replicas
  .option("kafka.retries", "10")                        // Retry up to 10 times
  .option("kafka.enable.idempotence", "true")           // Prevent duplicates
  .option("kafka.max.in.flight.requests.per.connection", "1") // Maintain ordering
  .save()

// High-throughput producer configuration
dataFrame.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "high-volume-events")
  .option("kafka.acks", "1")                            // Faster acknowledgment
  .option("kafka.batch.size", "131072")                 // 128KB batches
  .option("kafka.linger.ms", "100")                     // 100ms batching delay
  .option("kafka.compression.type", "lz4")              // Fast compression
  .option("kafka.buffer.memory", "134217728")           // 128MB buffer
  .save()

// Balanced configuration
dataFrame.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "standard-events")
  .option("kafka.acks", "1")                            // Leader acknowledgment
  .option("kafka.retries", "3")                         // Moderate retry count
  .option("kafka.batch.size", "16384")                  // 16KB batches
  .option("kafka.linger.ms", "10")                      // 10ms batching delay
  .option("kafka.compression.type", "snappy")           // Good compression ratio
  .save()

Consumer Configuration (Read Operations)

Configure Kafka consumer settings for read operations (advanced use cases).

/**
 * Consumer session and heartbeat configuration
 */
.option("kafka.session.timeout.ms", timeout: String)       // Session timeout
.option("kafka.heartbeat.interval.ms", interval: String)   // Heartbeat interval
.option("kafka.max.poll.interval.ms", interval: String)    // Max time between polls

/**
 * Consumer fetch configuration
 */
.option("kafka.fetch.min.bytes", minBytes: String)         // Minimum fetch size
.option("kafka.fetch.max.wait.ms", waitMs: String)         // Maximum fetch wait time
.option("kafka.max.partition.fetch.bytes", maxBytes: String) // Max bytes per partition

Usage Examples:

// Low-latency consumer configuration
val lowLatencyConsumer = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "real-time-events")
  .option("kafka.fetch.min.bytes", "1")                 // Fetch immediately
  .option("kafka.fetch.max.wait.ms", "100")             // Max 100ms wait
  .option("kafka.session.timeout.ms", "10000")          // 10s session timeout
  .option("kafka.heartbeat.interval.ms", "3000")        // 3s heartbeat

// High-throughput consumer configuration  
val highThroughputConsumer = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "bulk-data")
  .option("kafka.fetch.min.bytes", "1048576")           // 1MB minimum fetch
  .option("kafka.max.partition.fetch.bytes", "10485760") // 10MB max per partition
  .option("kafka.session.timeout.ms", "30000")          // 30s session timeout

Configuration Best Practices

Development Environment

// Development configuration - prioritize ease of debugging
val devConfig = Map(
  "kafka.bootstrap.servers" -> "localhost:9092",
  "failOnDataLoss" -> "false",                    // Continue despite data issues
  "startingOffsets" -> "earliest",                // Read all available data
  "includeHeaders" -> "true",                     // Include headers for debugging
  "maxOffsetsPerTrigger" -> "1000"               // Small batches for testing
)
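
Because these settings are plain maps, they can be applied in one call with options(...) instead of repeated .option(...) calls; the same pattern works for the other maps in this section. A sketch (the topic name is illustrative):

// Apply the whole configuration map at once
val devStream = spark.readStream
  .format("kafka")
  .options(devConfig)
  .option("subscribe", "dev-events")   // topic selection is still required
  .load()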

Production Environment

// Production configuration - prioritize reliability and performance
val prodConfig = Map(
  "kafka.bootstrap.servers" -> "kafka1:9092,kafka2:9092,kafka3:9092",
  "failOnDataLoss" -> "true",                     // Strict data consistency
  "groupIdPrefix" -> "prod-spark-app",            // Identifiable consumer groups
  "maxOffsetsPerTrigger" -> "100000",            // Larger batches for efficiency
  "kafka.session.timeout.ms" -> "30000",         // Longer session timeout
  "kafka.request.timeout.ms" -> "60000",         // Longer request timeout
  "fetchOffset.numRetries" -> "5",               // More retries
  "fetchOffset.retryIntervalMs" -> "1000"        // Longer retry intervals
)

Security-First Configuration

// Security-focused configuration; secrets are read from environment variables,
// since neither Spark nor Kafka expands ${...} placeholders in option values
val secureConfig = Map(
  "kafka.bootstrap.servers" -> "secure-kafka:9093",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "SCRAM-SHA-256",
  "kafka.sasl.jaas.config" ->
    ("org.apache.kafka.common.security.scram.ScramLoginModule required " +
      s"""username="spark-user" password="${sys.env("KAFKA_PASSWORD")}";"""),
  "kafka.ssl.truststore.location" -> "/etc/kafka/ssl/truststore.jks",
  "kafka.ssl.truststore.password" -> sys.env("TRUSTSTORE_PASSWORD"),
  "kafka.ssl.endpoint.identification.algorithm" -> "https"
)

Common Configuration Patterns

Auto Scaling Configuration

// Configuration that adapts to load
val autoScalingConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topics)
  .option("minOffsetsPerTrigger", "1000")        // Process at least 1K records
  .option("maxOffsetsPerTrigger", "50000")       // But no more than 50K
  .option("maxTriggerDelay", "30s")              // Force processing every 30s

Multi-Region Configuration

// Configuration for multi-region deployment
// region and applicationId are assumed to be defined by the surrounding application
val multiRegionConfig = Map(
  "kafka.bootstrap.servers" -> 
    "us-kafka1:9092,us-kafka2:9092,eu-kafka1:9092,eu-kafka2:9092",
  "kafka.client.id" -> s"spark-${region}-${applicationId}",
  "kafka.request.timeout.ms" -> "120000",        // Longer timeout for cross-region
  "kafka.session.timeout.ms" -> "60000",         // Longer session timeout
  "fetchOffset.numRetries" -> "10",              // More retries for network issues
  "fetchOffset.retryIntervalMs" -> "2000"        // Longer retry intervals
)

Schema Registry Integration

The connector does not integrate with Confluent Schema Registry through configuration options: keys and values are always read and written as binary columns, and serializer options such as kafka.key.serializer and kafka.value.serializer are rejected by the connector. Registry-aware encoding and decoding is instead done in your own code, after load() for reads or before save() for writes, with only the standard connection options set on the connector itself.
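
A minimal decoding sketch using Spark's built-in from_avro; the schema string stands in for one fetched from the registry, and the 5-byte header handling assumes Confluent's wire format (from_avro itself never contacts a registry):

import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.expr

// Hypothetical Avro schema for the value payload
val valueSchema =
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}"""

val decoded = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "avro-events")
  .load()
  // Confluent-framed records begin with a magic byte plus a 4-byte schema ID;
  // strip those 5 bytes before handing the payload to from_avro
  .select(from_avro(expr("substring(value, 6, length(value) - 5)"), valueSchema).as("event"))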