
# Data Writing

This module writes DataFrames to Kafka from both streaming and batch workloads. It provides producer connection pooling, automatic serialization, and support for multiple output modes.

## Capabilities

### Streaming Sink (Legacy)

Legacy streaming sink for writing DataFrames to Kafka topics in streaming queries.

```scala { .api }
/**
 * Legacy streaming sink for writing to Kafka
 */
class KafkaSink extends Sink with Logging {
  /**
   * Adds a batch of data to the sink
   * @param batchId Unique identifier for this batch
   * @param data DataFrame containing data to write
   */
  def addBatch(batchId: Long, data: DataFrame): Unit

  /**
   * String representation of the sink
   * @return String describing this sink
   */
  def toString(): String
}
```

### Stream Writer (DataSource V2)

Modern stream writer for DataSource V2 with improved performance and reliability.

```scala { .api }
/**
 * Stream writer for DataSource V2 streaming writes
 * @param topic Optional default topic for writes
 * @param producerParams Kafka producer configuration
 * @param schema Schema of input data
 */
class KafkaStreamWriter(
  topic: Option[String],
  producerParams: Map[String, String],
  schema: StructType
) extends StreamWriter {
  /**
   * Creates writer factory for this stream
   * @return KafkaStreamWriterFactory instance
   */
  def createWriterFactory(): KafkaStreamWriterFactory

  /**
   * Commits an epoch of writes
   * @param epochId Epoch identifier
   * @param messages Array of commit messages from writers
   */
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit

  /**
   * Aborts an epoch of writes
   * @param epochId Epoch identifier
   * @param messages Array of commit messages from writers
   */
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}
```

### Stream Writer Factory

Factory for creating data writers in streaming contexts.

```scala { .api }
/**
 * Factory for creating stream data writers
 * @param topic Optional default topic
 * @param producerParams Kafka producer configuration
 * @param schema Schema of input data
 */
case class KafkaStreamWriterFactory(
  topic: Option[String],
  producerParams: Map[String, String],
  schema: StructType
) extends DataWriterFactory[InternalRow] {
  /**
   * Creates data writer for a specific partition and task
   * @param partitionId Partition identifier
   * @param taskId Task identifier
   * @param epochId Epoch identifier
   * @return DataWriter for writing records
   */
  def createDataWriter(
    partitionId: Int,
    taskId: Long,
    epochId: Long
  ): DataWriter[InternalRow]
}
```

### Stream Data Writer

Data writer for streaming Kafka writes with row-level processing.

```scala { .api }
/**
 * Data writer for streaming Kafka writes
 * @param targetTopic Optional target topic
 * @param producerParams Kafka producer configuration
 * @param inputSchema Schema of input data
 */
class KafkaStreamDataWriter(
  targetTopic: Option[String],
  producerParams: Map[String, String],
  inputSchema: StructType
) extends KafkaRowWriter with DataWriter[InternalRow] {
  /**
   * Writes a single row to Kafka
   * @param row InternalRow to write
   */
  def write(row: InternalRow): Unit

  /**
   * Commits all pending writes
   * @return WriterCommitMessage confirming completion
   */
  def commit(): WriterCommitMessage

  /**
   * Aborts all pending writes
   */
  def abort(): Unit

  /**
   * Closes the writer and releases resources
   */
  def close(): Unit
}
```

### Batch Writer

Utilities for writing data to Kafka from batch and streaming queries.

```scala { .api }
/**
 * Utilities for writing data to Kafka from batch/streaming queries
 */
object KafkaWriter extends Logging {
  /** Topic column name in DataFrame */
  val TOPIC_ATTRIBUTE_NAME: String = "topic"

  /** Key column name in DataFrame */
  val KEY_ATTRIBUTE_NAME: String = "key"

  /** Value column name in DataFrame */
  val VALUE_ATTRIBUTE_NAME: String = "value"

  /**
   * Validates query schema for Kafka write compatibility
   * @param schema Attribute schema to validate
   * @param kafkaParameters Kafka producer parameters
   * @param topic Optional default topic
   */
  def validateQuery(
    schema: Seq[Attribute],
    kafkaParameters: ju.Map[String, Object],
    topic: Option[String]
  ): Unit

  /**
   * Writes DataFrame data to Kafka
   * @param sparkSession Current Spark session
   * @param queryExecution Query execution plan
   * @param kafkaParameters Kafka producer parameters
   * @param topic Optional default topic
   */
  def write(
    sparkSession: SparkSession,
    queryExecution: QueryExecution,
    kafkaParameters: ju.Map[String, Object],
    topic: Option[String]
  ): Unit

  /**
   * String representation of the writer
   * @return String describing writer configuration
   */
  def toString: String
}
```

### Write Task

Task for writing data to Kafka in batch mode with proper resource management.

```scala { .api }
/**
 * Task for writing data to Kafka in batch mode
 * @param producerConfiguration Kafka producer configuration
 * @param inputSchema Schema of input data
 * @param topic Optional target topic
 */
class KafkaWriteTask(
  producerConfiguration: ju.Map[String, Object],
  inputSchema: StructType,
  topic: Option[String]
) extends KafkaRowWriter {
  /**
   * Executes write task for iterator of rows
   * @param iterator Iterator of InternalRow objects to write
   */
  def execute(iterator: Iterator[InternalRow]): Unit

  /**
   * Closes task and releases resources
   */
  def close(): Unit
}
```

### Row Writer Base

Base class for writing rows to Kafka with common functionality.

```scala { .api }
/**
 * Base class for writing rows to Kafka
 * @param inputSchema Schema of input data
 * @param topic Optional target topic
 */
abstract class KafkaRowWriter(inputSchema: StructType, topic: Option[String]) {
  /**
   * Sends a row to Kafka producer
   * @param row InternalRow to send
   * @param producer Kafka producer instance
   */
  protected def sendRow(row: InternalRow, producer: Producer[Array[Byte], Array[Byte]]): Unit

  /**
   * Checks for write errors and throws exceptions if found
   */
  protected def checkForErrors(): Unit
}
```
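The split between `sendRow` and `checkForErrors` suggests a pattern in which rows are sent asynchronously and a failure is surfaced on a later check. The following is a minimal pure-Scala model of that pattern, assuming only the first failure is kept. `FakeProducer` and `RowWriterModel` are hypothetical stand-ins for illustration, not Spark or Kafka classes.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for a producer whose send reports its result via a callback.
final class FakeProducer(failOn: Set[String]) {
  /** Invokes the callback with an error for values in failOn, otherwise with None. */
  def send(value: String)(callback: Option[Exception] => Unit): Unit =
    if (failOn.contains(value)) callback(Some(new RuntimeException(s"send failed: $value")))
    else callback(None)
}

// Hypothetical model of a row writer that defers error reporting.
final class RowWriterModel(producer: FakeProducer) {
  private val failedWrite = new AtomicReference[Exception](null)

  /** Sends one value; the callback remembers only the first failure. */
  def sendRow(value: String): Unit =
    producer.send(value) { result =>
      result.foreach(e => failedWrite.compareAndSet(null, e))
    }

  /** Throws the first recorded failure, if any. */
  def checkForErrors(): Unit = {
    val e = failedWrite.get()
    if (e != null) throw e
  }
}
```

In this model a caller would invoke `checkForErrors()` before each send and once more before committing, so a failed write stops the task instead of being silently dropped.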

### Writer Commit Message

Commit message for Kafka stream writes indicating successful completion.

```scala { .api }
/**
 * Commit message for Kafka stream writes
 */
case object KafkaWriterCommitMessage extends WriterCommitMessage
```

### Cached Producer

Producer cache for improved performance and connection reuse.

```scala { .api }
/**
 * Cache for Kafka producers to improve performance
 */
object CachedKafkaProducer extends Logging {
  /**
   * Gets existing producer from cache or creates new one
   * @param kafkaParams Kafka producer parameters
   * @return Cached or new Producer instance
   */
  def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer[Array[Byte], Array[Byte]]

  /**
   * Explicitly closes and removes producer from cache
   * @param kafkaParams Producer parameters for identification
   */
  def close(kafkaParams: ju.Map[String, Object]): Unit

  /**
   * Clears entire producer cache
   */
  def clear(): Unit
}
```
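To illustrate the get-or-create contract described above, here is a simplified model of a cache keyed by producer parameters. It is a sketch under assumptions, not the connector's implementation: `ProducerCacheModel` and `StubProducer` are hypothetical names, and a real cache may also expire idle entries.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for a Kafka producer; it only tracks whether it was closed.
final class StubProducer(val params: Map[String, String]) {
  @volatile var closed = false
  def close(): Unit = closed = true
}

// Simplified cache: one producer per distinct parameter map.
object ProducerCacheModel {
  private val cache = TrieMap.empty[Map[String, String], StubProducer]

  /** Returns the cached producer for these parameters, creating it on first use. */
  def getOrCreate(params: Map[String, String]): StubProducer =
    cache.getOrElseUpdate(params, new StubProducer(params))

  /** Removes the producer for these parameters from the cache and closes it. */
  def close(params: Map[String, String]): Unit =
    cache.remove(params).foreach(_.close())

  /** Removes and closes every cached producer. */
  def clear(): Unit =
    cache.keys.toList.foreach(close)
}
```

Two calls with equal parameter maps return the same instance, which is what makes reuse across tasks possible. After `close`, the next `getOrCreate` builds a fresh producer.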

## Usage Examples

### Streaming Write to Single Topic

```scala
val query = df
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

query.awaitTermination()
```

### Streaming Write to Multiple Topics

```scala
import org.apache.spark.sql.functions.when

// No "topic" option is set, so each row goes to the topic named in its "topic" column
val multiTopicDF = inputDF
  .withColumn("topic", when($"event_type" === "user", "user-events")
    .when($"event_type" === "order", "order-events")
    .otherwise("other-events"))
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value", "topic")

val query = multiTopicDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/tmp/multi-topic-checkpoint")
  .start()
```
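When both a `topic` option and a `topic` column are present, the option takes precedence and the column is ignored. With neither, the write fails. The following pure-Scala sketch models that precedence rule. `TopicResolution` and `resolve` are hypothetical names used only for illustration, not connector API.

```scala
// Illustrative model of topic precedence; not the connector's code.
object TopicResolution {
  /**
   * @param optionTopic value of the "topic" writer option, if set
   * @param rowTopic    value of the row's "topic" column, if present
   * @return the destination topic, or an error message when none can be determined
   */
  def resolve(optionTopic: Option[String], rowTopic: Option[String]): Either[String, String] =
    optionTopic.orElse(rowTopic) match {
      case Some(topic) => Right(topic)
      case None        => Left("no topic: set the 'topic' option or provide a 'topic' column")
    }
}
```

For per-row routing, as in the multi-topic example, the `topic` option must therefore be left unset.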

### Batch Write

```scala
df.select(
  col("user_id").cast("string").as("key"),
  to_json(struct(col("*"))).as("value")
)
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "user-data")
  .save()
```

### Partitioned Write

```scala
import org.apache.spark.sql.functions.{hash, lit, pmod}

val partitionedDF = df
  // hash() can be negative; pmod keeps the bucket in the range 0..9
  .withColumn("partition_key", pmod(hash($"user_id"), lit(10)))
  .selectExpr(
    "CAST(partition_key AS STRING) AS key",
    "to_json(struct(*)) AS value"
  )

partitionedDF
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "partitioned-topic")
  .save()
```
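Hash values can be negative, and `%` keeps the sign of the dividend, which is why a non-negative modulo (such as Spark's `pmod`) is the safer choice when deriving a bucket from a hash. A minimal sketch of the difference in plain Scala, using `Math.floorMod` from the JDK. `Buckets` and `bucket` are illustrative names.

```scala
object Buckets {
  /** Maps any Int hash to a bucket in the range [0, numBuckets). */
  def bucket(hash: Int, numBuckets: Int): Int = Math.floorMod(hash, numBuckets)
}

// Plain remainder keeps the sign of the hash
val remainder = -7 % 10
// Floor modulo always lands in 0..9
val bucketed = Buckets.bucket(-7, 10)
```

Note that the key only influences partitioning indirectly: the producer's partitioner hashes the serialized key to choose a partition, so ten distinct keys do not necessarily map to ten distinct partitions.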

### Custom Serialization

```scala
import org.apache.spark.sql.functions._

// serialize_avro is a placeholder for a SQL function you register yourself,
// for example a UDF that returns Array[Byte]
val customSerializedDF = df
  .selectExpr(
    "CAST(id AS STRING) AS key",
    "CAST(serialize_avro(struct(*)) AS BINARY) AS value" // Custom serialization
  )

customSerializedDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "avro-topic")
  // Needed unless spark.sql.streaming.checkpointLocation is set on the session
  .option("checkpointLocation", "/tmp/avro-checkpoint")
  .start()
```

## Configuration Options

### Producer Configuration

```scala
// Basic configuration
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("topic", "my-topic")

// Performance tuning
.option("kafka.acks", "all")                   // Acknowledgment level
.option("kafka.retries", "3")                  // Retry count
.option("kafka.batch.size", "16384")           // Batch size
.option("kafka.linger.ms", "5")                // Batching delay
.option("kafka.buffer.memory", "33554432")     // Buffer memory
.option("kafka.compression.type", "snappy")    // Compression

// Reliability
.option("kafka.enable.idempotence", "true")    // Idempotent producer
.option("kafka.max.in.flight.requests.per.connection", "5")
.option("kafka.request.timeout.ms", "30000")   // Request timeout
```
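Options prefixed with `kafka.` are passed to the Kafka producer with the prefix removed, while unprefixed options such as `topic` are interpreted by the connector itself. A minimal sketch of that split, assuming case-insensitive prefix matching. `ProducerOptions` and `producerParams` are hypothetical names, not the connector's API.

```scala
import java.util.Locale

object ProducerOptions {
  private val Prefix = "kafka."

  /** Keeps only "kafka."-prefixed options and strips the prefix from their keys. */
  def producerParams(options: Map[String, String]): Map[String, String] =
    options.collect {
      case (key, value) if key.toLowerCase(Locale.ROOT).startsWith(Prefix) =>
        key.substring(Prefix.length) -> value
    }
}

val writerOptions = Map(
  "kafka.bootstrap.servers" -> "broker1:9092",
  "kafka.acks" -> "all",
  "topic" -> "my-topic"
)
val forProducer = ProducerOptions.producerParams(writerOptions)
```

Here `forProducer` contains `bootstrap.servers` and `acks` but not `topic`, which is why a producer setting written without the `kafka.` prefix never reaches the producer.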

### Security Configuration

```scala
// SSL configuration
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
.option("kafka.ssl.truststore.password", "password")
.option("kafka.ssl.keystore.location", "/path/to/keystore.jks")
.option("kafka.ssl.keystore.password", "password")

// SASL configuration
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
  "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";")
```

## Data Format Requirements

### Required Columns

A DataFrame written to Kafka must contain a `value` column. The `key` column is optional, and the `topic` column is needed only when no `topic` option is set:

```scala
// Minimum required: value column
df.select(
  lit("default-key").as("key"),          // Optional: key column (String or Binary)
  to_json(struct(col("*"))).as("value"), // Required: value column (String or Binary)
  lit("my-topic").as("topic")            // Optional: topic column (String)
)
```

### Column Types

```scala
val schemaValidation = StructType(Seq(
  StructField("key", StringType, nullable = true),    // or BinaryType
  StructField("value", StringType, nullable = false), // or BinaryType
  StructField("topic", StringType, nullable = true)   // Optional
))
```

### Type Conversions

```scala
// String to binary conversion
df.selectExpr(
  "CAST(key AS BINARY) AS key",
  "CAST(value AS BINARY) AS value"
)

// JSON serialization
df.select(
  col("id").cast("string").as("key"),
  to_json(struct(col("*"))).as("value")
)

// Custom serialization function
import org.apache.spark.sql.functions.udf

val customSerializer = udf((data: String) => {
  // Custom serialization logic
  data.getBytes("UTF-8")
})

df.select(
  col("key"),
  customSerializer(col("data")).as("value")
)
```

## Error Handling

### Write Failures

```scala
import org.apache.spark.sql.streaming.StreamingQueryException

try {
  val query = df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "test-topic")
    // Needed unless spark.sql.streaming.checkpointLocation is set on the session
    .option("checkpointLocation", "/tmp/test-topic-checkpoint")
    .start()

  query.awaitTermination()
} catch {
  case e: StreamingQueryException =>
    println(s"Streaming query failed: ${e.getMessage}")
    // Only the immediate cause is inspected here
    e.getCause match {
      case kafkaException: org.apache.kafka.common.KafkaException =>
        println(s"Kafka error: ${kafkaException.getMessage}")
      case _ =>
        println("Non-Kafka error occurred")
    }
}
```
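Checking `e.getCause` inspects only the immediate cause, while the exception of interest may be wrapped several levels deep. A minimal sketch of a helper that walks the cause chain. `Causes` and `find` are hypothetical names, and the depth cap of 32 is an arbitrary assumption to guard against cyclic chains.

```scala
import scala.annotation.tailrec
import scala.reflect.ClassTag

object Causes {
  /** Returns the first throwable in the cause chain, starting at t, that is an instance of T. */
  def find[T <: Throwable](t: Throwable)(implicit tag: ClassTag[T]): Option[T] = {
    @tailrec
    def loop(current: Throwable, depth: Int): Option[T] =
      if (current == null || depth > 32) None // stop at the end of the chain or at the depth cap
      else current match {
        case hit: T => Some(hit)
        case _      => loop(current.getCause, depth + 1)
      }
    loop(t, 0)
  }
}
```

Inside the `catch` block, a call such as `Causes.find[org.apache.kafka.common.KafkaException](e)` could replace the single-level `e.getCause match`.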

### Producer Error Handling

```scala
// Configure retry behavior
.option("kafka.retries", "3")
.option("kafka.retry.backoff.ms", "100")
.option("kafka.request.timeout.ms", "30000")

// Error tolerance
.option("kafka.acks", "1") // or "all" for stronger guarantees
```

### Schema Validation

```scala
def validateKafkaSchema(df: DataFrame): Unit = {
  val schema = df.schema
  val hasValue = schema.fieldNames.contains("value")
  val hasValidValueType = hasValue &&
    (schema("value").dataType == StringType || schema("value").dataType == BinaryType)

  require(hasValue && hasValidValueType,
    "DataFrame must contain 'value' column of String or Binary type")

  if (schema.fieldNames.contains("key")) {
    val keyType = schema("key").dataType
    require(keyType == StringType || keyType == BinaryType,
      "'key' column must be String or Binary type")
  }

  if (schema.fieldNames.contains("topic")) {
    require(schema("topic").dataType == StringType,
      "'topic' column must be String type")
  }
}

validateKafkaSchema(df)
```

## Performance Optimization

### Batching Configuration

```scala
// Optimize batching for throughput
.option("kafka.batch.size", "32768")       // 32KB batches
.option("kafka.linger.ms", "10")           // Wait up to 10ms
.option("kafka.buffer.memory", "67108864") // 64MB buffer

// Optimize for latency
.option("kafka.batch.size", "0")           // No batching
.option("kafka.linger.ms", "0")            // Send immediately
```

### Connection Management

```scala
// Connection pooling
.option("kafka.connections.max.idle.ms", "540000") // 9 minutes
.option("kafka.max.in.flight.requests.per.connection", "5")

// Reduce connection overhead by reusing producers
CachedKafkaProducer.getOrCreate(kafkaParams)
```

### Compression

```scala
// Enable compression for large messages
.option("kafka.compression.type", "snappy") // or "gzip", "lz4", "zstd"
```

## Monitoring and Metrics

### Query Progress

```scala
val query = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "metrics-topic")
  .option("checkpointLocation", "/tmp/metrics-checkpoint")
  .start()

// Monitor progress: recentProgress holds the most recent progress updates
query.recentProgress.foreach { progress =>
  println(s"Batch ${progress.batchId}: ${progress.inputRowsPerSecond} rows/sec")
  println(s"Processing time: ${progress.durationMs.get("triggerExecution")}ms")
}
```

### Producer Metrics

```scala
// Access producer metrics through JMX or custom metrics collectors
import org.apache.kafka.clients.producer.ProducerConfig

val metricsReporters = "org.apache.kafka.common.metrics.JmxReporter"

// Add to the writer's options:
.option(s"kafka.${ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG}", metricsReporters)
```

## Best Practices

### Message Design

1. **Keep keys consistent for partitioning**:
   ```scala
   df.withColumn("key", col("user_id").cast("string"))
   ```

2. **Use efficient serialization**:
   ```scala
   // Prefer binary formats for large messages
   df.select(col("key"), col("avro_bytes").as("value"))
   ```

3. **Include metadata in messages**:
   ```scala
   df.select(
     col("key"),
     to_json(struct(
       col("*"),
       current_timestamp().as("processed_at"),
       lit("spark").as("source")
     )).as("value")
   )
   ```

### Reliability Patterns

1. **Enable idempotence to avoid duplicates from producer retries** (the Kafka sink itself provides at-least-once delivery, so consumers should still tolerate duplicates):
   ```scala
   .option("kafka.enable.idempotence", "true")
   .option("kafka.acks", "all")
   ```

2. **Use checkpointing for fault tolerance**:
   ```scala
   .option("checkpointLocation", "/reliable/checkpoint/path")
   ```

3. **Monitor for failures**:
   ```scala
   query.exception.foreach(e => throw e) // Propagate exceptions
   ```

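### Resource Management

1. **Close resources properly**:
   ```scala
   try {
     // Write operations
   } finally {
     // CachedKafkaProducer is internal to the connector and may not be accessible from application code
     CachedKafkaProducer.clear() // Clean up on shutdown
   }
   ```

2. **Configure memory appropriately**:
   ```scala
   // Set memory at launch (e.g. spark-submit --executor-memory 4g --driver-memory 2g);
   // these settings cannot be changed with spark.conf.set on a running session
   import org.apache.spark.sql.SparkSession
   val spark = SparkSession.builder().config("spark.executor.memory", "4g").getOrCreate()
   ```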