# Configuration Options

The Spark Kafka connector provides comprehensive configuration options for connection management, performance tuning, security, and reliability. All Kafka client configurations are supported with the `kafka.` prefix.
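
The prefix is stripped before the option reaches the underlying client; a minimal sketch (`fetch.min.bytes` is a standard Kafka consumer property, used here purely for illustration):

```scala
// Any "kafka."-prefixed option is forwarded to the Kafka client with the
// prefix removed, so this sets the consumer's fetch.min.bytes property
val passthrough = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.fetch.min.bytes", "1024") // -> consumer config fetch.min.bytes
  .option("subscribe", "events")
  .load()
```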

## Capabilities

### Connection Configuration

Essential connection settings for accessing Kafka clusters.

```scala { .api }
/**
 * Required connection configuration
 */
.option("kafka.bootstrap.servers", servers: String) // Required: Comma-separated list of Kafka brokers

/**
 * Optional connection settings
 */
.option("kafka.client.id", clientId: String) // Client identifier for broker logs
.option("kafka.request.timeout.ms", timeout: String) // Request timeout in milliseconds
.option("kafka.connections.max.idle.ms", idle: String) // Max idle time for connections
```

**Usage Examples:**

```scala
// Basic connection
val basicConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// High availability setup
val haConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092")
  .option("kafka.client.id", "spark-consumer-app")
  .option("kafka.request.timeout.ms", "60000") // 60 second timeout
  .option("kafka.connections.max.idle.ms", "300000") // 5 minute idle timeout
  .option("subscribe", "critical-events")
  .load()
```

### Topic Selection Configuration

Configure how topics are selected and subscribed to.

```scala { .api }
/**
 * Topic selection options (exactly one required)
 */
.option("subscribe", topics: String) // Comma-separated topic names
.option("subscribePattern", pattern: String) // Regex pattern for topic names
.option("assign", partitions: String) // JSON specification of TopicPartitions

/**
 * Topic-related settings
 */
.option("topic", topicName: String) // Default topic for writes (optional)
```

**Usage Examples:**

```scala
// Subscribe to specific topics
val topicSubscription = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events,logs,metrics")
  .load()

// Pattern-based subscription
val patternSubscription = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "prod-.*-events")
  .load()

// Specific partition assignment
val partitionAssignment = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"events":[0,1,2],"logs":[0,1]}""")
  .load()
```
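
Note the write-side precedence: the `topic` option, when set, overrides any `topic` column in the data, so rows route themselves per record only when the option is absent. A hedged sketch, assuming an input `dataFrame` with `severity`, `key`, and `value` columns:

```scala
import org.apache.spark.sql.functions._

// Per-row routing: with no "topic" option set, each row's topic column
// chooses the destination (setting the option would override the column)
dataFrame
  .withColumn("topic",
    when(col("severity") === "ERROR", lit("alerts")).otherwise(lit("events")))
  .selectExpr("topic", "CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()
```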

### Offset Management Configuration

Control how offsets are managed and tracked for reading operations.

```scala { .api }
/**
 * Offset specification for reads
 */
.option("startingOffsets", offsets: String) // Starting position: "earliest", "latest", or JSON
.option("endingOffsets", offsets: String) // Ending position: "earliest", "latest", or JSON (batch only)

/**
 * Timestamp-based offset resolution
 */
.option("startingTimestamp", timestamp: String) // Global timestamp (ms since epoch)
.option("endingTimestamp", timestamp: String) // Global timestamp (ms since epoch, batch only)
.option("startingOffsetsByTimestamp", timestamps: String) // Per-partition timestamps (JSON)
.option("endingOffsetsByTimestamp", timestamps: String) // Per-partition timestamps (JSON, batch only)

/**
 * Offset resolution strategy
 */
.option("startingOffsetsByTimestampStrategy", strategy: String) // "error" or "latest" when no offset matches the timestamp
```

**Usage Examples:**

```scala
// Start from earliest available
val earliestConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

// Specific offsets per partition (batch)
val specificOffsets = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("startingOffsets", """{"logs":{"0":1000,"1":2000,"2":1500}}""")
  .option("endingOffsets", """{"logs":{"0":5000,"1":6000,"2":4500}}""")
  .load()

// Timestamp-based reading (batch)
val timestampConfig = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingTimestamp", "1640995200000") // Jan 1, 2022 UTC
  .option("endingTimestamp", "1641081600000") // Jan 2, 2022 UTC
  .load()
```
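
The per-partition timestamp options take JSON keyed by topic and partition; each partition starts at the first record whose timestamp is at or after the given value. A minimal sketch:

```scala
// Per-partition timestamp resolution (values are ms since epoch)
val byTimestamp = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp", """{"events":{"0":1640995200000,"1":1640995200000}}""")
  // When no record matches a timestamp, the strategy decides whether to
  // fail ("error", the default) or fall back to the latest offset ("latest")
  .option("startingOffsetsByTimestampStrategy", "latest")
  .load()
```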

### Performance Configuration

Tune performance characteristics for streaming and batch operations.

```scala { .api }
/**
 * Streaming performance options
 */
.option("maxOffsetsPerTrigger", maxRecords: String) // Maximum records per micro-batch
.option("minOffsetsPerTrigger", minRecords: String) // Minimum records before triggering
.option("maxTriggerDelay", delay: String) // Maximum delay before triggering (e.g., "30s")

/**
 * Read parallelism options
 */
.option("minPartitions", partitions: String) // Minimum number of Spark partitions to read from Kafka (streaming and batch)

/**
 * Offset fetching configuration
 */
.option("fetchOffset.numRetries", retries: String) // Number of retries for offset fetching
.option("fetchOffset.retryIntervalMs", interval: String) // Retry interval in milliseconds
```

**Usage Examples:**

```scala
// High-throughput streaming configuration
val highThroughput = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("maxOffsetsPerTrigger", "100000") // Max 100K records per batch
  .option("minOffsetsPerTrigger", "10000") // Min 10K records before processing
  .option("maxTriggerDelay", "60s") // Process every 60s regardless
  .load()

// Batch optimization
val batchOptimized = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("minPartitions", "200") // Force higher parallelism
  .option("fetchOffset.numRetries", "10") // More retries for reliability
  .option("fetchOffset.retryIntervalMs", "500") // Wait 500ms between retries
  .load()
```
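
`maxOffsetsPerTrigger` caps each micro-batch, so paired with a processing-time trigger it bounds the ingest rate. A rough sketch continuing from the `highThroughput` reader above (console sink chosen purely for illustration):

```scala
import org.apache.spark.sql.streaming.Trigger

// At most ~100K records per 10-second micro-batch, given the reader's
// maxOffsetsPerTrigger=100000 setting above
val rateBoundedQuery = highThroughput.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
```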

### Reliability Configuration

Configure fault tolerance, error handling, and data consistency.

```scala { .api }
/**
 * Reliability and fault tolerance options
 */
.option("failOnDataLoss", failBehavior: String) // "true" or "false"
.option("groupIdPrefix", prefix: String) // Consumer group ID prefix
.option("includeHeaders", includeHeaders: String) // "true" or "false"

/**
 * Consumer polling configuration
 */
.option("kafkaConsumer.pollTimeoutMs", timeout: String) // Consumer poll timeout in milliseconds
```

**Usage Examples:**

```scala
// Strict reliability mode
val strictMode = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-events")
  .option("failOnDataLoss", "true") // Fail query on any data loss
  .option("groupIdPrefix", "critical-app") // Custom consumer group prefix
  .option("includeHeaders", "true") // Include message headers
  .load()

// Permissive mode for non-critical data
val permissiveMode = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("failOnDataLoss", "false") // Continue despite data loss
  .option("kafkaConsumer.pollTimeoutMs", "30000") // 30 second poll timeout
  .load()
```
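
With `includeHeaders` enabled, the source adds a `headers` column of type `ARRAY<STRUCT<key: STRING, value: BINARY>>` alongside the usual columns; a minimal sketch of reading it:

```scala
// The headers column is only present when includeHeaders=true
val withHeaders = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-events")
  .option("includeHeaders", "true")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
```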

### Security Configuration

Configure authentication, encryption, and access control.

```scala { .api }
/**
 * Security protocol configuration
 */
.option("kafka.security.protocol", protocol: String) // "PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"

/**
 * SASL Authentication
 */
.option("kafka.sasl.mechanism", mechanism: String) // "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "GSSAPI"
.option("kafka.sasl.jaas.config", jaasConfig: String) // JAAS configuration string

/**
 * SSL Configuration
 */
.option("kafka.ssl.truststore.location", path: String) // Truststore file path
.option("kafka.ssl.truststore.password", password: String) // Truststore password
.option("kafka.ssl.keystore.location", path: String) // Keystore file path (for client auth)
.option("kafka.ssl.keystore.password", password: String) // Keystore password
.option("kafka.ssl.key.password", password: String) // Key password
```

**Usage Examples:**

```scala
// SASL/PLAIN authentication
val saslConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-kafka:9093")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
      "username='consumer' password='consumer-secret';")
  .option("subscribe", "secure-topic")
  .load()

// SSL with mutual authentication
val sslConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ssl-kafka:9094")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .option("kafka.ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
  .option("kafka.ssl.keystore.password", "keystore-password")
  .option("kafka.ssl.key.password", "key-password")
  .option("subscribe", "ssl-topic")
  .load()

// SASL/SSL combination
val saslSslConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-kafka:9095")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
      "username='app-user' password='app-password';")
  .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .option("subscribe", "encrypted-topic")
  .load()
```
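
The hard-coded credentials above are placeholders only; in practice, assemble the JAAS string at runtime (a sketch assuming `KAFKA_USER` and `KAFKA_PASS` environment variables):

```scala
// Keep secrets out of source control: build the JAAS config from the
// environment (sys.env throws if a variable is missing)
val jaasConfig =
  "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    s"username='${sys.env("KAFKA_USER")}' password='${sys.env("KAFKA_PASS")}';"

val secureReader = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-kafka:9095")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.sasl.jaas.config", jaasConfig)
  .option("subscribe", "encrypted-topic")
  .load()
```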

### Producer Configuration (Write Operations)

Configure Kafka producer settings for write operations.

```scala { .api }
/**
 * Producer reliability configuration
 */
.option("kafka.acks", acks: String) // "0", "1", or "all"
.option("kafka.retries", retries: String) // Number of retry attempts
.option("kafka.enable.idempotence", idempotent: String) // "true" or "false"

/**
 * Producer performance configuration
 */
.option("kafka.batch.size", batchSize: String) // Batch size in bytes
.option("kafka.linger.ms", lingerMs: String) // Batching delay in milliseconds
.option("kafka.buffer.memory", bufferMemory: String) // Total memory for buffering
.option("kafka.compression.type", compression: String) // "none", "gzip", "snappy", "lz4", "zstd"

/**
 * Producer connection configuration
 */
.option("kafka.max.in.flight.requests.per.connection", maxInflight: String) // Max unacked requests
.option("kafka.request.timeout.ms", timeout: String) // Request timeout
.option("kafka.delivery.timeout.ms", timeout: String) // Total delivery timeout
```

**Usage Examples:**

```scala
// High-reliability producer configuration (dataFrame must already expose
// key/value columns; see the serialization sketch after this section)
val reliableProducer = dataFrame.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "critical-events")
  .option("kafka.acks", "all") // Wait for all replicas
  .option("kafka.retries", "10") // Retry up to 10 times
  .option("kafka.enable.idempotence", "true") // Prevent duplicates
  .option("kafka.max.in.flight.requests.per.connection", "1") // Maintain ordering

// High-throughput producer configuration
val highThroughputProducer = dataFrame.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "high-volume-events")
  .option("kafka.acks", "1") // Faster acknowledgment
  .option("kafka.batch.size", "131072") // 128KB batches
  .option("kafka.linger.ms", "100") // 100ms batching delay
  .option("kafka.compression.type", "lz4") // Fast compression
  .option("kafka.buffer.memory", "134217728") // 128MB buffer

// Balanced configuration
val balancedProducer = dataFrame.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "standard-events")
  .option("kafka.acks", "1") // Leader acknowledgment
  .option("kafka.retries", "3") // Moderate retry count
  .option("kafka.batch.size", "16384") // 16KB batches
  .option("kafka.linger.ms", "10") // 10ms batching delay
  .option("kafka.compression.type", "snappy") // Balanced speed and compression

// Each val holds a configured DataFrameWriter; call save() to execute
reliableProducer.save()
```
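
The sink only consumes `key`/`value` (plus optional `topic`, `partition`, and `headers`) columns, and `value` must be string- or binary-typed; everything else has to be serialized explicitly. A hedged sketch, assuming an input `dataFrame` with an `id` column:

```scala
import org.apache.spark.sql.functions._

// Serialize arbitrary columns into the value column (JSON here) and cast
// the record key to string; other columns would be ignored by the sink
val kafkaReady = dataFrame.select(
  col("id").cast("string").as("key"),
  to_json(struct(col("*"))).as("value")
)

kafkaReady.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "standard-events")
  .save()
```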

### Consumer Configuration (Read Operations)

Configure Kafka consumer settings for read operations (advanced use cases).

```scala { .api }
/**
 * Consumer session and heartbeat configuration
 */
.option("kafka.session.timeout.ms", timeout: String) // Session timeout
.option("kafka.heartbeat.interval.ms", interval: String) // Heartbeat interval
.option("kafka.max.poll.interval.ms", interval: String) // Max time between polls

/**
 * Consumer fetch configuration
 */
.option("kafka.fetch.min.bytes", minBytes: String) // Minimum fetch size
.option("kafka.fetch.max.wait.ms", waitMs: String) // Maximum fetch wait time
.option("kafka.max.partition.fetch.bytes", maxBytes: String) // Max bytes per partition
```

**Usage Examples:**

```scala
// Low-latency consumer configuration
val lowLatencyConsumer = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "real-time-events")
  .option("kafka.fetch.min.bytes", "1") // Fetch immediately
  .option("kafka.fetch.max.wait.ms", "100") // Max 100ms wait
  .option("kafka.session.timeout.ms", "10000") // 10s session timeout
  .option("kafka.heartbeat.interval.ms", "3000") // 3s heartbeat
  .load()

// High-throughput consumer configuration
val highThroughputConsumer = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "bulk-data")
  .option("kafka.fetch.min.bytes", "1048576") // 1MB minimum fetch
  .option("kafka.max.partition.fetch.bytes", "10485760") // 10MB max per partition
  .option("kafka.session.timeout.ms", "30000") // 30s session timeout
  .load()
```
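
Whatever the fetch tuning, `key` and `value` always arrive as binary; cast or decode them explicitly before use (continuing from the reader above):

```scala
// key/value are BINARY regardless of consumer settings
val decoded = highThroughputConsumer.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value",
  "topic", "partition", "offset"
)
```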

## Configuration Best Practices

### Development Environment

```scala
// Development configuration - prioritize ease of debugging
val devConfig = Map(
  "kafka.bootstrap.servers" -> "localhost:9092",
  "failOnDataLoss" -> "false", // Continue despite data issues
  "startingOffsets" -> "earliest", // Read all available data
  "includeHeaders" -> "true", // Include headers for debugging
  "maxOffsetsPerTrigger" -> "1000" // Small batches for testing
)
```
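
Any of these maps (including the production and security ones below) can be applied in a single call via `options`:

```scala
// Apply a whole configuration map at once instead of repeated .option() calls
val devStream = spark.readStream
  .format("kafka")
  .options(devConfig)
  .option("subscribe", "events")
  .load()
```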

### Production Environment

```scala
// Production configuration - prioritize reliability and performance
val prodConfig = Map(
  "kafka.bootstrap.servers" -> "kafka1:9092,kafka2:9092,kafka3:9092",
  "failOnDataLoss" -> "true", // Strict data consistency
  "groupIdPrefix" -> "prod-spark-app", // Identifiable consumer groups
  "maxOffsetsPerTrigger" -> "100000", // Larger batches for efficiency
  "kafka.session.timeout.ms" -> "30000", // Longer session timeout
  "kafka.request.timeout.ms" -> "60000", // Longer request timeout
  "fetchOffset.numRetries" -> "5", // More retries
  "fetchOffset.retryIntervalMs" -> "1000" // Longer retry intervals
)
```

### Security-First Configuration

```scala
// Security-focused configuration - secrets pulled from the environment
// rather than hard-coded (a plain string would leave "${...}" literal)
val secureConfig = Map(
  "kafka.bootstrap.servers" -> "secure-kafka:9093",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "SCRAM-SHA-256",
  "kafka.sasl.jaas.config" ->
    ("org.apache.kafka.common.security.scram.ScramLoginModule required " +
      s"username='spark-user' password='${sys.env("KAFKA_PASSWORD")}';"),
  "kafka.ssl.truststore.location" -> "/etc/kafka/ssl/truststore.jks",
  "kafka.ssl.truststore.password" -> sys.env("TRUSTSTORE_PASSWORD"),
  "kafka.ssl.endpoint.identification.algorithm" -> "https"
)
```

## Common Configuration Patterns

### Auto Scaling Configuration

```scala
// Configuration that adapts to load (brokers and topics defined elsewhere)
val autoScalingConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topics)
  .option("minOffsetsPerTrigger", "1000") // Process at least 1K records
  .option("maxOffsetsPerTrigger", "50000") // But no more than 50K
  .option("maxTriggerDelay", "30s") // Force processing every 30s
  .load()
```

### Multi-Region Configuration

```scala
// Configuration for multi-region deployment (region and applicationId
// are supplied by the application)
val multiRegionConfig = Map(
  "kafka.bootstrap.servers" ->
    "us-kafka1:9092,us-kafka2:9092,eu-kafka1:9092,eu-kafka2:9092",
  "kafka.client.id" -> s"spark-${region}-${applicationId}",
  "kafka.request.timeout.ms" -> "120000", // Longer timeout for cross-region
  "kafka.session.timeout.ms" -> "60000", // Longer session timeout
  "fetchOffset.numRetries" -> "10", // More retries for network issues
  "fetchOffset.retryIntervalMs" -> "2000" // Longer retry intervals
)
```

### Schema Registry Integration

Note that Confluent Schema Registry settings are not connector options: the connector always reads and writes key/value as byte arrays, and Spark rejects `kafka.key.serializer` / `kafka.value.serializer` overrides. Keep registry coordinates as application configuration and decode at the DataFrame level.

```scala
// Registry coordinates held as application config, not connector options
val schemaRegistryConfig = Map(
  "schema.registry.url" -> "http://schema-registry:8081",
  "basic.auth.user.info" -> "user:password"
)

val avroBytes = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "avro-events")
  .load()
// value is BINARY here; decode it with a registry-aware Avro deserializer
// (plain from_avro does not strip Confluent's 5-byte wire-format header)
```