
# Configuration

Complete configuration options for fine-tuning Kafka integration behavior, including connection parameters, offset management, performance settings, and security.

## Capabilities

### Source Configuration Options

Configuration options for reading data from Kafka in both streaming and batch modes.

```scala { .api }
// Connection Configuration
"kafka.bootstrap.servers" -> "localhost:9092"    // Required: Kafka broker addresses
"subscribe" -> "topic1,topic2,topic3"            // Topic subscription (comma-separated)
"subscribePattern" -> "events_.*"                // Topic pattern subscription (regex)
"assign" -> """{"topic1":[0,1],"topic2":[0]}"""  // Manual partition assignment (JSON)

// Offset Management
"startingOffsets" -> "earliest"   // Starting position: "earliest", "latest", or JSON
"endingOffsets" -> "latest"       // Ending position: "latest" or JSON (batch only)
"failOnDataLoss" -> "true"        // Fail the query on data loss (default: true)

// Performance Tuning
"minPartitions" -> "10"                    // Minimum number of Spark partitions
"maxOffsetsPerTrigger" -> "1000000"        // Rate limiting for streaming
"kafkaConsumer.pollTimeoutMs" -> "10000"   // Consumer poll timeout
```
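
These options can be passed individually with `.option(...)` or together via `.options(...)`. A minimal sketch of applying them to a batch read (the broker address and topic name are placeholders):

```scala
// Source options collected in a Map and applied in one call.
val sourceOptions = Map(
  "kafka.bootstrap.servers" -> "localhost:9092",
  "subscribe" -> "topic1",
  "startingOffsets" -> "earliest",
  "endingOffsets" -> "latest"
)

val df = spark.read
  .format("kafka")
  .options(sourceOptions)
  .load()
```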

### Sink Configuration Options

Configuration options for writing data to Kafka topics.

```scala { .api }
// Connection Configuration
"kafka.bootstrap.servers" -> "localhost:9092"   // Required: Kafka broker addresses
"topic" -> "output-topic"                       // Default output topic

// Producer Performance
"kafka.acks" -> "all"                  // Acknowledgment level: "0", "1", "all"
"kafka.retries" -> "3"                 // Number of retries
"kafka.batch.size" -> "16384"          // Batch size in bytes
"kafka.linger.ms" -> "5"               // Batching delay in milliseconds
"kafka.buffer.memory" -> "33554432"    // Total memory available for buffering
"kafka.compression.type" -> "snappy"   // Compression: "none", "gzip", "snappy", "lz4", "zstd"

// Reliability
"kafka.enable.idempotence" -> "true"                   // Enable idempotent producer
"kafka.max.in.flight.requests.per.connection" -> "5"   // Max unacknowledged requests
"kafka.request.timeout.ms" -> "30000"                  // Request timeout
```
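
As a sketch of how the sink options compose on a write (the broker, topic, and `df` are placeholders; `df` is assumed to already carry a `value` column):

```scala
// Hypothetical df with the required "value" column (and optionally "key").
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("kafka.acks", "all")
  .option("kafka.compression.type", "snappy")
  .save()
```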

### Kafka Consumer Parameters

Complete set of Kafka consumer configuration parameters supported through the `kafka.` prefix.

```scala { .api }
// Core Consumer Settings
"kafka.bootstrap.servers" -> "broker1:9092,broker2:9092"
"kafka.client.id" -> "spark-kafka-consumer"
"kafka.session.timeout.ms" -> "30000"       // Consumer session timeout
"kafka.heartbeat.interval.ms" -> "3000"     // Heartbeat interval
"kafka.max.poll.records" -> "500"           // Records per poll
"kafka.max.poll.interval.ms" -> "300000"    // Max time between polls

// Fetch Configuration
"kafka.fetch.min.bytes" -> "1"                    // Minimum bytes to fetch
"kafka.fetch.max.wait.ms" -> "500"                // Max wait for min bytes
"kafka.fetch.max.bytes" -> "52428800"             // Maximum bytes per fetch (50MB)
"kafka.max.partition.fetch.bytes" -> "1048576"    // Maximum bytes per partition (1MB)

// Network Configuration
"kafka.receive.buffer.bytes" -> "65536"       // Receive buffer size (64KB)
"kafka.send.buffer.bytes" -> "131072"         // Send buffer size (128KB)
"kafka.request.timeout.ms" -> "30000"         // Request timeout
"kafka.reconnect.backoff.ms" -> "50"          // Reconnect backoff
"kafka.reconnect.backoff.max.ms" -> "1000"    // Max reconnect backoff
"kafka.retry.backoff.ms" -> "100"             // Retry backoff

// Connection Management
"kafka.connections.max.idle.ms" -> "540000"   // Connection idle timeout (9 minutes)
"kafka.metadata.max.age.ms" -> "300000"       // Metadata refresh interval (5 minutes)
```
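
Any native consumer property can be forwarded by prefixing its Kafka name with `kafka.`. A sketch of deriving Spark options from plain consumer settings (the map contents are examples):

```scala
// Native Kafka consumer properties...
val kafkaProps = Map(
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "session.timeout.ms" -> "30000",
  "max.poll.records" -> "500"
)

// ...become Spark source options once the "kafka." prefix is added.
val sparkOptions = kafkaProps.map { case (k, v) => s"kafka.$k" -> v }

val reader = spark.readStream
  .format("kafka")
  .options(sparkOptions)
  .option("subscribe", "topic1")
```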

### Kafka Producer Parameters

Complete set of Kafka producer configuration parameters for writing data.

```scala { .api }
// Core Producer Settings
"kafka.bootstrap.servers" -> "broker1:9092,broker2:9092"
"kafka.client.id" -> "spark-kafka-producer"
"kafka.acks" -> "all"               // "0", "1", or "all"
"kafka.retries" -> "2147483647"     // Max retries (Integer.MAX_VALUE)
"kafka.retry.backoff.ms" -> "100"   // Retry backoff

// Batching and Performance
"kafka.batch.size" -> "16384"           // Batch size (16KB)
"kafka.linger.ms" -> "0"                // Batching delay
"kafka.buffer.memory" -> "33554432"     // Buffer memory (32MB)
"kafka.compression.type" -> "none"      // Compression type
"kafka.max.request.size" -> "1048576"   // Max request size (1MB)

// Idempotence and Ordering
"kafka.enable.idempotence" -> "false"                  // Idempotent producer
"kafka.max.in.flight.requests.per.connection" -> "5"   // Max unacknowledged requests

// Timing Configuration
"kafka.request.timeout.ms" -> "30000"     // Request timeout
"kafka.delivery.timeout.ms" -> "120000"   // Delivery timeout (2 minutes)
"kafka.send.buffer.bytes" -> "131072"     // Send buffer (128KB)
"kafka.receive.buffer.bytes" -> "32768"   // Receive buffer (32KB)

// Metadata and Connections
"kafka.metadata.max.age.ms" -> "300000"       // Metadata refresh (5 minutes)
"kafka.connections.max.idle.ms" -> "540000"   // Connection idle timeout (9 minutes)
"kafka.reconnect.backoff.ms" -> "50"          // Reconnect backoff
"kafka.reconnect.backoff.max.ms" -> "1000"    // Max reconnect backoff
```
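
For stronger producer-side delivery guarantees, idempotence is typically paired with `acks=all`. A hedged sketch (`df` and the topic name are placeholders; the Kafka sink itself remains at-least-once end to end):

```scala
// Idempotence deduplicates broker-side retries; it requires acks=all.
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("topic", "output-topic")
  .option("kafka.enable.idempotence", "true")
  .option("kafka.acks", "all")
  .save()
```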

### Security Configuration

Security configuration for SSL and SASL authentication.

```scala { .api }
// SSL Configuration
"kafka.security.protocol" -> "SSL"           // Security protocol
"kafka.ssl.protocol" -> "TLSv1.2"            // SSL/TLS protocol version
"kafka.ssl.truststore.location" -> "/path/to/truststore.jks"
"kafka.ssl.truststore.password" -> "truststore-password"
"kafka.ssl.truststore.type" -> "JKS"         // Truststore type
"kafka.ssl.keystore.location" -> "/path/to/keystore.jks"
"kafka.ssl.keystore.password" -> "keystore-password"
"kafka.ssl.keystore.type" -> "JKS"           // Keystore type
"kafka.ssl.key.password" -> "key-password"   // Key password

// SSL Verification
"kafka.ssl.endpoint.identification.algorithm" -> "https"   // Hostname verification (empty string disables it)

// SASL Configuration
"kafka.security.protocol" -> "SASL_SSL"   // SASL with SSL
"kafka.sasl.mechanism" -> "PLAIN"         // SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
"kafka.sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";"

// SASL SCRAM Configuration
"kafka.sasl.mechanism" -> "SCRAM-SHA-256"
"kafka.sasl.jaas.config" -> "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"password\";"

// Kerberos (GSSAPI) Configuration
"kafka.sasl.mechanism" -> "GSSAPI"
"kafka.sasl.kerberos.service.name" -> "kafka"
"kafka.sasl.jaas.config" -> "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab=\"/path/to/kafka.keytab\" storeKey=true useTicketCache=false principal=\"kafka/hostname@REALM\";"
```
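
Credentials should not be hard-coded in job code. A minimal sketch that assembles the JAAS string from environment variables (`KAFKA_USER` and `KAFKA_PASS` are hypothetical names):

```scala
// Hypothetical environment variables; fail fast if they are missing.
val user = sys.env.getOrElse("KAFKA_USER", sys.error("KAFKA_USER not set"))
val pass = sys.env.getOrElse("KAFKA_PASS", sys.error("KAFKA_PASS not set"))

val jaasConfig =
  s"""org.apache.kafka.common.security.scram.ScramLoginModule required username="$user" password="$pass";"""
```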

## Configuration Examples

### Basic Streaming Configuration

```scala
val streamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()

val query = streamingDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
```

### High Performance Configuration

```scala
val highPerfDF = spark
  .readStream
  .format("kafka")
  // Connection settings
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
  .option("subscribe", "high-volume-topic")
  // Performance tuning
  .option("minPartitions", "50")                   // Increase parallelism
  .option("maxOffsetsPerTrigger", "5000000")       // 5M records per batch
  .option("kafkaConsumer.pollTimeoutMs", "5000")   // 5 second timeout
  // Consumer optimization
  .option("kafka.fetch.min.bytes", "1024")         // 1KB minimum fetch
  .option("kafka.fetch.max.wait.ms", "100")        // Fast fetching
  .option("kafka.max.poll.records", "1000")        // More records per poll
  .option("kafka.receive.buffer.bytes", "262144")  // 256KB buffer
  .option("kafka.fetch.max.bytes", "104857600")    // 100MB max fetch
  .load()

// processedDF: transformations applied to highPerfDF (elided)
val writeQuery = processedDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
  .option("topic", "processed-topic")
  .option("checkpointLocation", "/tmp/checkpoint-high-perf")   // Required for streaming sinks
  // Producer optimization
  .option("kafka.batch.size", "65536")          // 64KB batches
  .option("kafka.linger.ms", "10")              // 10ms batching delay
  .option("kafka.compression.type", "snappy")   // Compression
  .option("kafka.buffer.memory", "134217728")   // 128MB buffer
  .option("kafka.acks", "1")                    // Balanced reliability
  .start()
```

### Secure Configuration

```scala
val secureDF = spark
  .readStream
  .format("kafka")
  // Basic connection
  .option("kafka.bootstrap.servers", "secure-broker1:9093,secure-broker2:9093")
  .option("subscribe", "secure-topic")
  // SSL configuration
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/etc/kafka/truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .option("kafka.ssl.keystore.location", "/etc/kafka/keystore.jks")
  .option("kafka.ssl.keystore.password", "keystore-password")
  .option("kafka.ssl.key.password", "key-password")
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .load()

// SASL/SCRAM configuration
val saslDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "sasl-broker:9094")
  .option("subscribe", "authenticated-topic")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"myuser\" password=\"mypassword\";")
  .load()
```

### Batch Processing Configuration

```scala
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "historical-data")
  // Batch-specific offset configuration
  .option("startingOffsets", "earliest")
  .option("endingOffsets", """{"historical-data":{"0":1000000,"1":1500000}}""")
  // Optimize for large batch reads
  .option("minPartitions", "20")
  .option("kafka.fetch.max.bytes", "104857600")            // 100MB
  .option("kafka.max.partition.fetch.bytes", "10485760")   // 10MB per partition
  .option("kafka.receive.buffer.bytes", "1048576")         // 1MB buffer
  .load()
```

### Multi-Environment Configuration

```scala
import scala.util.Properties

val environment = Properties.envOrElse("ENVIRONMENT", "dev")

val envConfig = environment match {
  case "prod" => Map(
    "kafka.bootstrap.servers" -> "prod-broker1:9092,prod-broker2:9092,prod-broker3:9092",
    "kafka.security.protocol" -> "SASL_SSL",
    "kafka.acks" -> "all",
    "kafka.retries" -> "10",
    "failOnDataLoss" -> "true"
  )
  case "staging" => Map(
    "kafka.bootstrap.servers" -> "staging-broker:9092",
    "kafka.acks" -> "1",
    "failOnDataLoss" -> "true"
  )
  case _ => Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "kafka.acks" -> "1",
    "failOnDataLoss" -> "false"
  )
}

val configuredDF = spark
  .readStream
  .format("kafka")
  .options(envConfig)
  .option("subscribe", s"${environment}-events")
  .load()
```

## Configuration Validation

### Unsupported Options

Certain Kafka parameters are managed internally and cannot be overridden:

```scala { .api }
// Automatically managed by Spark (specifying these throws IllegalArgumentException)
"kafka.group.id"              // Unique group IDs are generated per query
"kafka.auto.offset.reset"     // Controlled via the startingOffsets option
"kafka.key.deserializer"      // Fixed to ByteArrayDeserializer
"kafka.value.deserializer"    // Fixed to ByteArrayDeserializer
"kafka.enable.auto.commit"    // Disabled; Spark manages offsets itself
"kafka.interceptor.classes"   // Not supported for safety

// Producer-specific unsupported options
"kafka.key.serializer"        // Fixed to ByteArraySerializer
"kafka.value.serializer"      // Fixed to ByteArraySerializer
```
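
When options are loaded from external configuration files, it can be safer to strip these keys before handing the map to Spark. A hedged sketch (`externalConfig` is a hypothetical `Map[String, String]` loaded elsewhere):

```scala
// Keys Spark manages internally; passing them throws IllegalArgumentException.
val unsupported = Set(
  "kafka.group.id", "kafka.auto.offset.reset",
  "kafka.key.deserializer", "kafka.value.deserializer",
  "kafka.enable.auto.commit", "kafka.interceptor.classes",
  "kafka.key.serializer", "kafka.value.serializer"
)

val safeConfig = externalConfig.filterNot { case (k, _) => unsupported.contains(k) }
```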

### Required Options

```scala { .api }
// Required for all operations
"kafka.bootstrap.servers"   // Must be specified

// Required for sources (exactly one must be specified)
"subscribe"          // Topic list
"subscribePattern"   // Topic pattern
"assign"             // Partition assignment

// Schema validation occurs at runtime: a sink DataFrame must have a
// "value" column (and optionally "key" and "topic")
```
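
A minimal sketch of satisfying the sink schema before a write (`inputDF` and its columns are hypothetical):

```scala
import org.apache.spark.sql.functions.{col, lit}

// Project to the columns the Kafka sink expects: "value" is required,
// "key" and "topic" are optional.
val sinkReady = inputDF.select(
  col("id").cast("string").as("key"),
  col("payload").cast("string").as("value"),
  lit("output-topic").as("topic")
)
```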

### Option Validation Examples

```scala
// Valid consumer strategy
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")    // Valid
  .load()

// Invalid: multiple strategies
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")           // Both subscribe
  .option("subscribePattern", "topic.*")   // and subscribePattern specified
  .load()                                  // Throws IllegalArgumentException

// Invalid: unsupported parameter
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "my-group")    // Unsupported
  .load()                                  // Throws IllegalArgumentException
```

## Performance Tuning Guidelines

### Consumer Performance

```scala
// Option fragments: splice into spark.readStream.format("kafka")

// Optimize fetch behavior
.option("kafka.fetch.min.bytes", "1024")         // Wait for at least 1KB
.option("kafka.fetch.max.wait.ms", "500")        // Max wait 500ms
.option("kafka.max.poll.records", "500")         // Records per poll

// Buffer optimization
.option("kafka.receive.buffer.bytes", "65536")   // 64KB receive buffer
.option("kafka.fetch.max.bytes", "52428800")     // 50MB max fetch

// Partition parallelism
.option("minPartitions", "20")                   // Increase Spark partitions
```

### Producer Performance

```scala
// Option fragments: splice into a writeStream.format("kafka") sink

// Batching optimization
.option("kafka.batch.size", "32768")          // 32KB batches
.option("kafka.linger.ms", "10")              // 10ms linger time
.option("kafka.compression.type", "snappy")   // Enable compression

// Memory and throughput
.option("kafka.buffer.memory", "67108864")    // 64MB buffer
.option("kafka.max.in.flight.requests.per.connection", "5")
```

### Memory Configuration

```scala
// Spark configuration for Kafka workloads. Executor and driver resources
// cannot be changed with spark.conf.set on a running session; set them when
// the session is built (or via spark-submit).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.executor.memory", "8g")
  .config("spark.executor.cores", "4")
  .config("spark.driver.memory", "4g")
  .config("spark.sql.shuffle.partitions", "200")
  // JVM tuning for Kafka clients
  .config("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:+UnlockExperimentalVMOptions")
  .getOrCreate()
```

## Monitoring Configuration

### Metrics Collection

```scala
// Option fragments: splice into a Kafka reader or writer

// Enable JMX metrics
.option("kafka.metric.reporters", "org.apache.kafka.common.metrics.JmxReporter")

// Metrics sampling
.option("kafka.metrics.sample.window.ms", "30000")   // 30 second window
.option("kafka.metrics.num.samples", "2")            // 2 samples per window
```
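
Beyond client metrics, streaming progress is observable from the `StreamingQuery` handle returned by `.start()`. A brief sketch (the `query` value is assumed from an earlier example):

```scala
// query: org.apache.spark.sql.streaming.StreamingQuery from .start()
println(query.status)         // Current status of the query
println(query.lastProgress)   // Most recent progress: input rate, batch duration, offsets
```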

### Logging Configuration

```scala
// Enable detailed logging for troubleshooting
import org.apache.log4j.{Level, Logger}

Logger.getLogger("org.apache.spark.sql.kafka010").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.kafka").setLevel(Level.INFO)
```

## Troubleshooting Common Issues

### Connection Issues

```scala
// Increase timeouts for unreliable networks
.option("kafka.request.timeout.ms", "60000")    // 60 second timeout
.option("kafka.reconnect.backoff.ms", "1000")   // 1 second backoff
.option("kafka.retry.backoff.ms", "1000")       // 1 second retry backoff
```

### Offset Management Issues

```scala
// Handle offset-out-of-range errors
.option("failOnDataLoss", "false")     // Continue past data loss
.option("startingOffsets", "latest")   // Fresh queries start from the latest offsets
```

### Performance Issues

```scala
// Diagnose slow consumption
.option("kafkaConsumer.pollTimeoutMs", "30000")   // Longer poll timeout
.option("kafka.session.timeout.ms", "60000")      // Longer session timeout
.option("kafka.max.poll.interval.ms", "600000")   // 10 minute max poll interval
```

### Memory Issues

```scala
// Reduce memory usage
.option("kafka.fetch.max.bytes", "10485760")            // Reduce to 10MB
.option("kafka.max.partition.fetch.bytes", "1048576")   // 1MB per partition
.option("maxOffsetsPerTrigger", "100000")               // Limit batch size
```