or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-reading.md consumer-strategies.md data-source.md index.md offset-management.md schema-conversion.md streaming-sources.md writing.md

writing.mddocs/

0

# Writing to Kafka

1

2

The Kafka connector provides comprehensive writing capabilities for both batch and streaming scenarios, with support for topic routing, producer pooling, and flexible data formatting.

3

4

## Capabilities

5

6

### KafkaWriter

7

8

Core writer object providing the main logic for batch and streaming writes to Kafka.

9

10

```scala { .api }

11

/**

12

* Core writer functionality for batch and streaming writes to Kafka

13

* Handles data validation, topic routing, and producer management

14

*/

15

object KafkaWriter {

16

17

/** Column names for Kafka message attributes */

18

val TOPIC_ATTRIBUTE_NAME: String = "topic"

19

val KEY_ATTRIBUTE_NAME: String = "key"

20

val VALUE_ATTRIBUTE_NAME: String = "value"

21

val HEADERS_ATTRIBUTE_NAME: String = "headers"

22

val PARTITION_ATTRIBUTE_NAME: String = "partition"

23

24

/**

25

* Validates query plan and writes data to Kafka

26

* Main entry point for all Kafka write operations

27

*/

28

def write(

29

sparkSession: SparkSession,

30

queryExecution: QueryExecution,

31

kafkaParams: ju.Map[String, Object],

32

topic: Option[String]

33

): Unit

34

35

/** Validates DataFrame schema and configuration */

36

def validateQuery(

37

schema: Seq[Attribute],

38

kafkaParameters: ju.Map[String, Object],

39

topic: Option[String]

40

): Unit

41

42

/** Creates expression for topic routing */

43

def topicExpression(schema: Seq[Attribute], topic: Option[String]): Expression

44

45

/** Creates expression for message key extraction */

46

def keyExpression(schema: Seq[Attribute]): Expression

47

48

/** Creates expression for message value extraction */

49

def valueExpression(schema: Seq[Attribute]): Expression

50

51

/** Creates expression for headers extraction */

52

def headersExpression(schema: Seq[Attribute]): Expression

53

54

/** Creates expression for partition assignment */

55

def partitionExpression(schema: Seq[Attribute]): Expression

56

}

57

```

58

59

**Usage Examples:**

60

61

```scala

62

// Basic write with topic specified in options

63

dataFrame

64

.write

65

.format("kafka")

66

.option("kafka.bootstrap.servers", "localhost:9092")

67

.option("topic", "output-topic")

68

.save()

69

70

// Write with topic column for dynamic routing

71

dataFrame

72

.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")

73

.write

74

.format("kafka")

75

.option("kafka.bootstrap.servers", "localhost:9092")

76

.save()

77

```

78

79

### KafkaWrite

80

81

V2 DataSource write implementation supporting both batch and streaming modes.

82

83

```scala { .api }

84

/**

85

* V2 DataSource write implementation for Kafka

86

* Supports both batch and streaming write operations

87

*/

88

case class KafkaWrite(

89

topic: Option[String],

90

producerParams: ju.Map[String, Object],

91

schema: StructType

92

) extends Write {

93

94

/** Returns description of the write operation */

95

def description(): String = "KafkaWrite"

96

97

/** Creates batch write implementation */

98

def toBatch: BatchWrite = new KafkaBatchWrite(topic, producerParams, schema)

99

100

/** Creates streaming write implementation */

101

def toStreaming: StreamingWrite = new KafkaStreamingWrite(topic, producerParams, schema)

102

}

103

```

104

105

### KafkaSink

106

107

Legacy V1 DataSource streaming sink implementation.

108

109

```scala { .api }

110

/**

111

* V1 DataSource streaming sink for writing to Kafka

112

* Provides backward compatibility with Structured Streaming V1 API

113

*/

114

class KafkaSink(

115

sqlContext: SQLContext,

116

kafkaParams: ju.Map[String, Object],

117

topic: Option[String]

118

) extends Sink {

119

120

/** Adds a batch of data to Kafka */

121

def addBatch(batchId: Long, data: DataFrame): Unit

122

}

123

```

124

125

## DataFrame Schema Requirements

126

127

### Required Columns

128

129

The DataFrame must contain specific columns for Kafka message construction:

130

131

```scala

132

// Required: value column (message payload)

133

val validDataFrame = spark.createDataFrame(Seq(

134

Tuple1("Hello World")

135

)).toDF("value")

136

137

// Write with just value

138

validDataFrame

139

.write

140

.format("kafka")

141

.option("kafka.bootstrap.servers", "localhost:9092")

142

.option("topic", "messages")

143

.save()

144

```

145

146

### Optional Columns

147

148

Additional columns provide more control over message attributes:

149

150

```scala

151

// Full control with all optional columns

152

val fullControlDataFrame = spark.createDataFrame(Seq(

153

("user-events", "user123", """{"event":"login","timestamp":1234567890}""", 0, Array(("correlation-id", "abc123".getBytes), ("source", "web".getBytes))),

154

("user-events", "user456", """{"event":"logout","timestamp":1234567891}""", 1, Array(("correlation-id", "def456".getBytes), ("source", "mobile".getBytes)))

155

)).toDF("topic", "key", "value", "partition", "headers")

156

157

fullControlDataFrame

158

.write

159

.format("kafka")

160

.option("kafka.bootstrap.servers", "localhost:9092")

161

.save() // No topic option needed - using topic column

162

```

163

164

### Column Types and Conversion

165

166

```scala { .api }

167

// Expected column types:

168

// topic: StringType (optional - can use option instead)

169

// key: StringType or BinaryType (optional - null if not provided)

170

// value: StringType or BinaryType (required)

171

// partition: IntegerType (optional - Kafka will assign if not provided)

172

// headers: ArrayType of StructType with "key" (StringType) and "value" (BinaryType) (optional)

173

```

174

175

**Type Conversion Examples:**

176

177

```scala

178

// Convert different types to appropriate Kafka format

179

val typedDataFrame = originalDataFrame

180

.select(

181

col("topic"),

182

col("user_id").cast(StringType).as("key"), // Convert to string key

183

to_json(struct(col("*"))).as("value"), // Convert struct to JSON value

184

(col("user_id").cast(LongType) % 10).cast(IntegerType).as("partition"), // Partition by user_id mod 10

185

array(

186

struct(lit("content-type").as("key"), lit("application/json").cast(BinaryType).as("value")),

187

struct(lit("source").as("key"), lit("spark-streaming").cast(BinaryType).as("value"))

188

).as("headers")

189

)

190

```

191

192

## Topic Routing

193

194

### Static Topic Assignment

195

196

Specify topic in write options:

197

198

```scala

199

// All records go to the same topic

200

dataFrame

201

.select(col("value"))

202

.write

203

.format("kafka")

204

.option("kafka.bootstrap.servers", "localhost:9092")

205

.option("topic", "static-topic")

206

.save()

207

```

208

209

### Dynamic Topic Routing

210

211

Use topic column for per-record topic routing:

212

213

```scala

214

// Route records to different topics based on data

215

val routedDataFrame = sourceDataFrame

216

.withColumn("topic",

217

when(col("event_type") === "error", "error-topic")

218

.when(col("event_type") === "warning", "warning-topic")

219

.otherwise("info-topic")

220

)

221

.select("topic", "value")

222

223

routedDataFrame

224

.write

225

.format("kafka")

226

.option("kafka.bootstrap.servers", "localhost:9092")

227

.save()

228

```

229

230

### Topic Validation

231

232

```scala

233

// Topic must be specified either in options or as a column

234

dataFrame

235

.write

236

.format("kafka")

237

.option("kafka.bootstrap.servers", "localhost:9092")

238

// Missing topic specification will cause error:

239

// "topic option required when no 'topic' attribute is present. Use the topic option for setting a topic."

240

.save()

241

```

242

243

## Producer Configuration

244

245

### Basic Producer Settings

246

247

```scala

248

// Essential producer configuration

249

dataFrame

250

.write

251

.format("kafka")

252

.option("kafka.bootstrap.servers", "localhost:9092")

253

.option("kafka.acks", "all") // Wait for all replicas

254

.option("kafka.retries", "3") // Retry failed sends

255

.option("kafka.batch.size", "16384") // Batch size in bytes

256

.option("kafka.linger.ms", "5") // Wait up to 5ms to batch

257

.option("kafka.buffer.memory", "33554432") // 32MB producer buffer

258

.option("topic", "reliable-topic")

259

.save()

260

```

261

262

### Performance Optimization

263

264

```scala

265

// High-throughput producer configuration

266

dataFrame

267

.write

268

.format("kafka")

269

.option("kafka.bootstrap.servers", "localhost:9092")

270

.option("kafka.acks", "1") // Leader acknowledgment only

271

.option("kafka.compression.type", "snappy") // Enable compression

272

.option("kafka.batch.size", "65536") // Larger batch size

273

.option("kafka.linger.ms", "10") // Higher batching delay

274

.option("kafka.buffer.memory", "67108864") // 64MB producer buffer

275

.option("topic", "high-throughput-topic")

276

.save()

277

```

278

279

### Reliability Configuration

280

281

```scala

282

// Maximum reliability producer configuration

283

dataFrame

284

.write

285

.format("kafka")

286

.option("kafka.bootstrap.servers", "localhost:9092")

287

.option("kafka.acks", "all") // Wait for all in-sync replicas

288

.option("kafka.retries", "10") // More retry attempts

289

.option("kafka.retry.backoff.ms", "1000") // 1 second retry backoff

290

.option("kafka.enable.idempotence", "true") // Idempotent producer: retries do not create duplicates (the sink itself is still at-least-once)

291

.option("kafka.max.in.flight.requests.per.connection", "1") // Maintain order

292

.option("topic", "critical-topic")

293

.save()

294

```

295

296

## Streaming Writes

297

298

### Basic Streaming Write

299

300

```scala

301

// Simple streaming write to Kafka

302

val streamingQuery = kafkaInputStream

303

.selectExpr("CAST(value AS STRING)")

304

.writeStream

305

.format("kafka")

306

.option("kafka.bootstrap.servers", "localhost:9092")

307

.option("topic", "output-stream")

308

.outputMode("append")

309

.start()

310

```

311

312

### Advanced Streaming Write

313

314

```scala

315

// Advanced streaming write with processing

316

val advancedStreamingQuery = kafkaInputStream

317

.select(

318

expr("CAST(key AS STRING)").as("key"),

319

from_json(expr("CAST(value AS STRING)"), inputSchema).as("data")

320

)

321

.select(

322

col("key"),

323

col("data.user_id"),

324

col("data.event_type"),

325

to_json(col("data")).as("value")

326

)

327

.withColumn("topic",

328

when(col("event_type") === "purchase", "purchase-events")

329

.otherwise("general-events")

330

)

331

.select("topic", "key", "value")

332

.writeStream

333

.format("kafka")

334

.option("kafka.bootstrap.servers", "localhost:9092")

335

.option("checkpointLocation", "/checkpoints/kafka-output")

336

.outputMode("append")

337

.trigger(Trigger.ProcessingTime("30 seconds"))

338

.start()

339

```

340

341

### Streaming Write with Headers

342

343

```scala

344

// Include headers in streaming write

345

val headerStreamingQuery = kafkaInputStream

346

.select(

347

expr("CAST(key AS STRING)").as("key"),

348

expr("CAST(value AS STRING)").as("value"),

349

array(

350

struct(lit("source").as("key"), lit("spark-streaming").cast(BinaryType).as("value")),

351

struct(lit("version").as("key"), lit("1.0").cast(BinaryType).as("value"))

352

).as("headers")

353

)

354

.writeStream

355

.format("kafka")

356

.option("kafka.bootstrap.servers", "localhost:9092")

357

.option("topic", "enriched-stream")

358

.outputMode("append")

359

.start()

360

```

361

362

## Batch Writes

363

364

### Simple Batch Write

365

366

```scala

367

// Write DataFrame to Kafka in batch mode

368

batchDataFrame

369

.select(

370

col("id").cast(StringType).as("key"),

371

to_json(struct(col("*"))).as("value")

372

)

373

.write

374

.format("kafka")

375

.option("kafka.bootstrap.servers", "localhost:9092")

376

.option("topic", "batch-data")

377

.save()

378

```

379

380

### Partitioned Batch Write

381

382

```scala

383

// Control Kafka partitioning for batch writes

384

batchDataFrame

385

.select(

386

col("user_id").cast(StringType).as("key"),

387

to_json(struct(col("*"))).as("value"),

388

(col("user_id").cast(LongType) % 10).cast(IntegerType).as("partition")

389

)

390

.write

391

.format("kafka")

392

.option("kafka.bootstrap.servers", "localhost:9092")

393

.option("topic", "partitioned-data")

394

.save()

395

```

396

397

### Multi-Topic Batch Write

398

399

```scala

400

// Write to multiple topics in a single operation

401

multiTopicDataFrame

402

.withColumn("topic",

403

when(col("category") === "orders", "order-events")

404

.when(col("category") === "payments", "payment-events")

405

.when(col("category") === "inventory", "inventory-events")

406

.otherwise("misc-events")

407

)

408

.select("topic", "key", "value")

409

.write

410

.format("kafka")

411

.option("kafka.bootstrap.servers", "localhost:9092")

412

.save()

413

```

414

415

## Error Handling and Validation

416

417

### Schema Validation

418

419

The writer performs comprehensive schema validation:

420

421

```scala

422

// Invalid schema - missing value column

423

invalidDataFrame

424

.select("key", "topic") // Missing required "value" column

425

.write

426

.format("kafka")

427

.option("kafka.bootstrap.servers", "localhost:9092")

428

.option("topic", "test")

429

.save()

430

// Throws: "Required attribute 'value' not found"

431

```

432

433

### Producer Parameter Validation

434

435

```scala

436

// Invalid producer configuration

437

dataFrame

438

.write

439

.format("kafka")

440

.option("kafka.bootstrap.servers", "localhost:9092")

441

.option("kafka.key.serializer", "custom.serializer") // Not allowed

442

.option("topic", "test")

443

.save()

444

// Throws: "Kafka option 'key.serializer' is not supported as keys are serialized with ByteArraySerializer"

445

```

446

447

### Save Mode Validation

448

449

```scala

450

// Invalid save modes for Kafka

451

dataFrame

452

.write

453

.mode(SaveMode.Overwrite) // Not supported

454

.format("kafka")

455

.option("kafka.bootstrap.servers", "localhost:9092")

456

.option("topic", "test")

457

.save()

458

// Throws: "Save mode Overwrite not allowed for Kafka"

459

460

// Supported save modes:

461

dataFrame.write.mode(SaveMode.Append).format("kafka")... // Allowed

462

dataFrame.write.mode(SaveMode.ErrorIfExists).format("kafka")... // Allowed (default)

463

```

464

465

## Producer Pooling

466

467

The connector automatically manages producer instances for efficiency:

468

469

### Producer Cache Configuration

470

471

```scala

472

// Configure producer cache behavior (internal settings)

473

// These are managed automatically but can be tuned via Spark configuration:

474

475

// spark.kafka.producer.cache.timeout = "10m" // Producer cache timeout

476

// spark.kafka.producer.cache.evictorThreadRunInterval = "1m" // Cache cleanup interval

477

```

478

479

### Producer Pool Management

480

481

```scala { .api }

482

/**

483

* Internal producer pool management

484

* Automatically handles producer lifecycle and connection pooling

485

*/

486

object InternalKafkaProducerPool {

487

/** Acquires a cached producer for the given configuration */

488

def acquire(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer

489

490

/** Releases a producer back to the pool */

491

def release(producer: CachedKafkaProducer): Unit

492

}

493

```

494

495

## Performance Optimization

496

497

### Batch Size Tuning

498

499

```scala

500

// Optimize for high throughput

501

highVolumeDataFrame

502

.coalesce(10) // Reduce number of partitions for larger batches

503

.write

504

.format("kafka")

505

.option("kafka.bootstrap.servers", "localhost:9092")

506

.option("kafka.batch.size", "131072") // 128KB batches

507

.option("kafka.linger.ms", "20") // Wait longer to fill batches

508

.option("kafka.compression.type", "lz4") // Use fast compression

509

.option("topic", "high-volume-topic")

510

.save()

511

```

512

513

### Memory Management

514

515

```scala

516

// Optimize memory usage for large writes

517

largeDataFrame

518

.repartition(50) // Increase parallelism to reduce memory per partition

519

.write

520

.format("kafka")

521

.option("kafka.bootstrap.servers", "localhost:9092")

522

.option("kafka.buffer.memory", "134217728") // 128MB producer buffer

523

.option("kafka.max.request.size", "10485760") // 10MB max request size

524

.option("topic", "large-data-topic")

525

.save()

526

```

527

528

### Concurrent Writes

529

530

```scala

531

// Maximize concurrent producer connections

532

dataFrame

533

.repartition(100) // More partitions = more concurrent producers

534

.write

535

.format("kafka")

536

.option("kafka.bootstrap.servers", "localhost:9092")

537

.option("kafka.max.in.flight.requests.per.connection", "5") // Up to 5 unacknowledged requests per connection (Kafka default)

538

.option("topic", "concurrent-topic")

539

.save()

540

```