Connection Configuration

Configuration options for connecting to Kafka clusters and managing consumer/producer behavior.

Capabilities

Bootstrap Servers

Required configuration for connecting to Kafka brokers.

/**
 * Kafka bootstrap servers configuration (required)
 * Comma-separated list of Kafka broker addresses
 */
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")

Usage Example:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

Consumer Strategies

Different strategies for subscribing to Kafka topics. You must specify exactly one strategy.

Subscribe Strategy

Subscribe to a fixed collection of topics.

/**
 * Subscribe to specific topics by name
 * @param topics - Comma-separated list of topic names
 */
.option("subscribe", "topic1,topic2,topic3")

Usage Example:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments,inventory")
  .load()

Subscribe Pattern Strategy

Subscribe to topics using a regular expression pattern.

/**
 * Subscribe to topics matching a regex pattern
 * @param pattern - Regular expression for topic names
 */
.option("subscribePattern", "logs-.*")

Usage Example:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "events-\\d{4}-\\d{2}")  // matches events-2023-01, etc.
  .load()

Assign Strategy

Assign specific topic partitions directly.

/**
 * Assign specific topic partitions
 * @param partitions - JSON object mapping topics to partition arrays
 */
.option("assign", """{"topic1":[0,1,2],"topic2":[0]}""")

Usage Example:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"high-priority":[0,1],"normal":[0]}""")
  .load()

Group ID Configuration

Control consumer group behavior for streaming queries.

/**
 * Prefix for automatically generated consumer group IDs
 * Default: "spark-kafka-source" for streaming, "spark-kafka-relation" for batch
 */
.option("groupIdPrefix", "my-app")

/**
 * Custom group ID (not recommended for concurrent queries)
 * Warning: Multiple queries sharing a group ID will interfere with each other
 */
.option("kafka.group.id", "custom-group")

Usage Example:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("groupIdPrefix", "analytics-pipeline")
  .load()
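
The same options apply to batch reads, where the generated group ID defaults to the spark-kafka-relation prefix. A minimal batch sketch, assuming the same local broker; startingOffsets and endingOffsets bound the range read:

val batchDf = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")   // read from the beginning of each partition
  .option("endingOffsets", "latest")       // ...through the current end
  .option("groupIdPrefix", "analytics-batch")
  .load()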

Kafka Client Configuration

Pass-through configuration for the underlying Kafka clients using the kafka. prefix.

/**
 * Consumer configuration options
 * All options prefixed with "kafka." are passed to KafkaConsumer
 */
.option("kafka.session.timeout.ms", "30000")
.option("kafka.request.timeout.ms", "40000")
.option("kafka.max.poll.records", "1000")
.option("kafka.fetch.min.bytes", "1024")
.option("kafka.receive.buffer.bytes", "65536")

/**
 * Producer configuration options (for writing)
 * All options prefixed with "kafka." are passed to KafkaProducer
 */  
.option("kafka.acks", "all")
.option("kafka.retries", "5")
.option("kafka.batch.size", "16384")
.option("kafka.linger.ms", "5")

Restricted Options:

The following Kafka options are restricted; unless noted otherwise, setting them causes an error (see the failure sketch after this list):

  • kafka.group.id - Allowed but discouraged; prefer groupIdPrefix (see Group ID Configuration above)
  • kafka.auto.offset.reset - Use startingOffsets instead
  • kafka.key.deserializer - Fixed to ByteArrayDeserializer
  • kafka.value.deserializer - Fixed to ByteArrayDeserializer
  • kafka.key.serializer - Fixed to ByteArraySerializer (writing)
  • kafka.value.serializer - Fixed to ByteArraySerializer (writing)
  • kafka.enable.auto.commit - Managed by Spark
  • kafka.interceptor.classes - Not supported
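
As a sketch of the failure mode (the exact exception text varies by Spark version), setting one of the fixed deserializers is rejected when the source is defined:

// Throws IllegalArgumentException: deserializers are fixed by the connector
val bad = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("kafka.value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  .load()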

Usage Example:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("kafka.session.timeout.ms", "10000")
  .option("kafka.max.poll.records", "500")
  .option("kafka.fetch.min.bytes", "50000")
  .load()

Security Configuration

Authentication and encryption settings for secure Kafka clusters.

/**
 * Security protocol configuration
 */
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";")

/**
 * SSL configuration
 */
.option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
.option("kafka.ssl.truststore.password", "truststore-password")
.option("kafka.ssl.keystore.location", "/path/to/keystore.jks")
.option("kafka.ssl.keystore.password", "keystore-password")

Usage Example:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-broker:9093")
  .option("subscribe", "secure-topic")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.sasl.jaas.config", 
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"myuser\" password=\"mypassword\";")
  .load()
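
For clusters that use TLS without SASL, a minimal sketch; the truststore path and password are placeholders:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ssl-broker:9093")
  .option("subscribe", "secure-topic")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")  // placeholder
  .option("kafka.ssl.truststore.password", "truststore-password")      // placeholder
  .load()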

Types

// Consumer strategy types (connector internals; createConsumer implementations omitted)
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

sealed trait ConsumerStrategy {
  def createConsumer(kafkaParams: java.util.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}

case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy