or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-apis.mddata-source-v2-apis.mddistributions-api.mdexpression-apis.mdindex.mdlegacy-data-source-v1.mdmetrics-api.mdstreaming-apis.mdutilities-helpers.mdvectorized-processing.md

streaming-apis.mddocs/

# Streaming APIs

The Streaming APIs in Apache Spark Catalyst (the DataSource V2 connector interfaces under `org.apache.spark.sql.connector`) support real-time data processing in both micro-batch and continuous processing modes. These APIs let you build custom streaming data sources and sinks with exactly-once semantics and fault tolerance. Later sections also sketch patterns for state management, watermarking, and retries built on top of them.

## Core Streaming Read APIs

### MicroBatchStream

Interface for streaming data sources in micro-batch mode:

```java { .api }
package org.apache.spark.sql.connector.read.streaming;

public interface MicroBatchStream {

14

/**

15

* Get the latest available offset

16

*/

17

Offset latestOffset();

18

19

/**

20

* Get the initial offset for starting the stream

21

*/

22

Offset initialOffset();

23

24

/**

25

* Deserialize offset from JSON string

26

*/

27

Offset deserializeOffset(String json);

28

29

/**

30

* Commit processing up to the given offset

31

*/

32

void commit(Offset end);

33

34

/**

35

* Stop the stream

36

*/

37

void stop();

38

}

```

### ContinuousStream

Interface for streaming data sources in continuous mode:

```java { .api }

public interface ContinuousStream {

47

/**

48

* Create continuous reader factory

49

*/

50

ContinuousPartitionReaderFactory createContinuousReaderFactory();

51

52

/**

53

* Get initial offset for the stream

54

*/

55

Offset initialOffset();

56

57

/**

58

* Merge partition offsets into a single offset

59

*/

60

Offset mergeOffsets(PartitionOffset[] offsets);

61

62

/**

63

* Deserialize offset from JSON

64

*/

65

Offset deserializeOffset(String json);

66

67

/**

68

* Commit processing up to the given offset

69

*/

70

void commit(Offset end);

71

72

/**

73

* Stop the stream

74

*/

75

void stop();

76

}

```

### Offset

Abstract base class for representing positions in streaming data:

```java { .api }

public abstract class Offset {

85

/**

86

* JSON representation of this offset

87

*/

88

public abstract String json();

89

90

@Override

91

public abstract boolean equals(Object obj);

92

93

@Override

94

public abstract int hashCode();

95

}

```

## Implementing a Micro-Batch Streaming Source

### Custom Offset Implementation

```java

public class MyStreamOffset extends Offset {

104

private final long batchId;

105

private final long recordCount;

106

107

public MyStreamOffset(long batchId, long recordCount) {

108

this.batchId = batchId;

109

this.recordCount = recordCount;

110

}

111

112

@Override

113

public String json() {

114

return String.format("{\"batchId\":%d,\"recordCount\":%d}", batchId, recordCount);

115

}

116

117

@Override

118

public boolean equals(Object obj) {

119

if (obj instanceof MyStreamOffset) {

120

MyStreamOffset other = (MyStreamOffset) obj;

121

return this.batchId == other.batchId && this.recordCount == other.recordCount;

122

}

123

return false;

124

}

125

126

@Override

127

public int hashCode() {

128

return Objects.hash(batchId, recordCount);

129

}

130

131

public long getBatchId() { return batchId; }

132

public long getRecordCount() { return recordCount; }

133

134

public static MyStreamOffset fromJson(String json) {

135

// Parse JSON and return offset

136

ObjectMapper mapper = new ObjectMapper();

137

try {

138

JsonNode node = mapper.readTree(json);

139

return new MyStreamOffset(

140

node.get("batchId").asLong(),

141

node.get("recordCount").asLong()

142

);

143

} catch (Exception e) {

144

throw new RuntimeException("Failed to parse offset: " + json, e);

145

}

146

}

147

}

```

### Complete Micro-Batch Stream Implementation

```java

public class MyMicroBatchStream implements MicroBatchStream {

154

private final String streamSource;

155

private final StructType schema;

156

private volatile MyStreamOffset currentOffset;

157

private volatile boolean stopped = false;

158

159

public MyMicroBatchStream(String streamSource, StructType schema) {

160

this.streamSource = streamSource;

161

this.schema = schema;

162

this.currentOffset = new MyStreamOffset(0, 0);

163

}

164

165

@Override

166

public Offset latestOffset() {

167

if (stopped) {

168

return currentOffset;

169

}

170

171

// Check for new data and update offset

172

long newBatchId = currentOffset.getBatchId() + 1;

173

long newRecordCount = checkForNewRecords();

174

175

if (newRecordCount > currentOffset.getRecordCount()) {

176

currentOffset = new MyStreamOffset(newBatchId, newRecordCount);

177

}

178

179

return currentOffset;

180

}

181

182

@Override

183

public Offset initialOffset() {

184

return new MyStreamOffset(0, 0);

185

}

186

187

@Override

188

public Offset deserializeOffset(String json) {

189

return MyStreamOffset.fromJson(json);

190

}

191

192

@Override

193

public void commit(Offset end) {

194

MyStreamOffset offset = (MyStreamOffset) end;

195

// Persist checkpoint information

196

persistCheckpoint(offset);

197

}

198

199

@Override

200

public void stop() {

201

stopped = true;

202

// Clean up resources

203

closeConnections();

204

}

205

206

private long checkForNewRecords() {

207

// Implementation specific - check external source for new data

208

return queryExternalSourceForRecordCount();

209

}

210

211

private void persistCheckpoint(MyStreamOffset offset) {

212

// Persist offset for fault tolerance

213

writeCheckpointToStorage(offset);

214

}

215

}

```

### Streaming Scan Integration

```java

public class MyStreamingTable implements Table, SupportsRead {

222

@Override

223

public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {

224

return new MyStreamingScanBuilder(schema, options);

225

}

226

}

227

228

public class MyStreamingScanBuilder implements ScanBuilder {

229

private final StructType schema;

230

private final CaseInsensitiveStringMap options;

231

232

@Override

233

public Scan build() {

234

return new MyStreamingScan(schema, options);

235

}

236

}

237

238

public class MyStreamingScan implements Scan {

239

private final StructType schema;

240

private final CaseInsensitiveStringMap options;

241

242

@Override

243

public MicroBatchStream toMicroBatchStream(String checkpointLocation) {

244

String streamSource = options.get("source.path");

245

return new MyMicroBatchStream(streamSource, schema);

246

}

247

248

@Override

249

public StructType readSchema() {

250

return schema;

251

}

252

}

```

## Continuous Stream Implementation

### Continuous Partition Reader

```java

public class MyContinuousPartitionReader implements ContinuousPartitionReader<InternalRow> {

261

private final int partitionId;

262

private final StructType schema;

263

private volatile boolean stopped = false;

264

private volatile PartitionOffset currentOffset;

265

266

public MyContinuousPartitionReader(int partitionId, StructType schema,

267

PartitionOffset startOffset) {

268

this.partitionId = partitionId;

269

this.schema = schema;

270

this.currentOffset = startOffset;

271

}

272

273

@Override

274

public boolean next() throws IOException {

275

if (stopped) {

276

return false;

277

}

278

279

// Continuously poll for new records

280

return pollForNewRecord();

281

}

282

283

@Override

284

public InternalRow get() {

285

// Return current record and update offset

286

InternalRow row = getCurrentRecord();

287

updateOffset();

288

return row;

289

}

290

291

@Override

292

public PartitionOffset getOffset() {

293

return currentOffset;

294

}

295

296

@Override

297

public void close() throws IOException {

298

stopped = true;

299

// Clean up partition-specific resources

300

}

301

302

private boolean pollForNewRecord() {

303

// Implementation-specific polling logic

304

return hasNewDataAvailable();

305

}

306

}

```

### Continuous Stream Factory

```java

public class MyContinuousPartitionReaderFactory implements ContinuousPartitionReaderFactory {

313

private final StructType schema;

314

315

@Override

316

public ContinuousPartitionReader<InternalRow> createReader(

317

ContinuousInputPartition partition) {

318

MyContinuousInputPartition myPartition = (MyContinuousInputPartition) partition;

319

return new MyContinuousPartitionReader(

320

myPartition.getPartitionId(),

321

schema,

322

myPartition.getStartOffset()

323

);

324

}

325

}

```

## Streaming Write APIs

### StreamingWrite

Interface for streaming write operations:

```java { .api }
package org.apache.spark.sql.connector.write.streaming;

public interface StreamingWrite {

338

/**

339

* Create writer factory for streaming

340

*/

341

StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info);

342

343

/**

344

* Whether to use Spark's commit coordinator

345

*/

346

boolean useCommitCoordinator();

347

348

/**

349

* Commit an epoch of streaming writes

350

*/

351

void commit(long epochId, WriterCommitMessage[] messages);

352

353

/**

354

* Abort an epoch of streaming writes

355

*/

356

void abort(long epochId, WriterCommitMessage[] messages);

357

}

```

### StreamingDataWriterFactory

Factory for creating streaming data writers:

```java { .api }

public interface StreamingDataWriterFactory extends DataWriterFactory {

366

/**

367

* Create writer with epoch information

368

*/

369

DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId);

370

}

```

### Complete Streaming Write Implementation

```java

public class MyStreamingWrite implements StreamingWrite {

377

private final LogicalWriteInfo writeInfo;

378

private final Map<Long, Set<String>> epochFiles = new ConcurrentHashMap<>();

379

380

@Override

381

public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) {

382

return new MyStreamingDataWriterFactory(info.schema(), writeInfo.options());

383

}

384

385

@Override

386

public boolean useCommitCoordinator() {

387

return true; // Enable exactly-once semantics

388

}

389

390

@Override

391

public void commit(long epochId, WriterCommitMessage[] messages) {

392

Set<String> files = new HashSet<>();

393

394

try {

395

// Collect all files written in this epoch

396

for (WriterCommitMessage message : messages) {

397

MyStreamingCommitMessage myMessage = (MyStreamingCommitMessage) message;

398

files.addAll(myMessage.getWrittenFiles());

399

}

400

401

// Atomically commit all files for this epoch

402

atomicCommitEpoch(epochId, files);

403

epochFiles.put(epochId, files);

404

405

// Clean up old epochs

406

cleanupOldEpochs(epochId);

407

408

} catch (Exception e) {

409

// If commit fails, abort the epoch

410

abort(epochId, messages);

411

throw new RuntimeException("Failed to commit epoch " + epochId, e);

412

}

413

}

414

415

@Override

416

public void abort(long epochId, WriterCommitMessage[] messages) {

417

// Clean up any partially written files

418

for (WriterCommitMessage message : messages) {

419

MyStreamingCommitMessage myMessage = (MyStreamingCommitMessage) message;

420

for (String file : myMessage.getWrittenFiles()) {

421

deleteFileIfExists(file);

422

}

423

}

424

epochFiles.remove(epochId);

425

}

426

427

private void atomicCommitEpoch(long epochId, Set<String> files) {

428

// Implementation-specific atomic commit

429

// This might involve:

430

// 1. Writing a commit marker

431

// 2. Moving temp files to final locations

432

// 3. Updating metadata

433

}

434

435

private void cleanupOldEpochs(long currentEpoch) {

436

// Keep only recent epochs for fault tolerance

437

long cutoffEpoch = currentEpoch - 100;

438

epochFiles.entrySet().removeIf(entry -> entry.getKey() < cutoffEpoch);

439

}

440

}

441

442

public class MyStreamingDataWriterFactory implements StreamingDataWriterFactory {

443

private final StructType schema;

444

private final CaseInsensitiveStringMap options;

445

446

@Override

447

public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {

448

return new MyStreamingDataWriter(schema, partitionId, taskId, epochId, options);

449

}

450

}

451

452

public class MyStreamingDataWriter implements DataWriter<InternalRow> {

453

private final StructType schema;

454

private final int partitionId;

455

private final long taskId;

456

private final long epochId;

457

private final List<InternalRow> buffer = new ArrayList<>();

458

private final Set<String> writtenFiles = new HashSet<>();

459

460

@Override

461

public void write(InternalRow record) throws IOException {

462

buffer.add(record.copy());

463

464

// Batch writes for efficiency

465

if (buffer.size() >= 1000) {

466

flushBuffer();

467

}

468

}

469

470

@Override

471

public WriterCommitMessage commit() throws IOException {

472

if (!buffer.isEmpty()) {

473

flushBuffer();

474

}

475

return new MyStreamingCommitMessage(partitionId, taskId, epochId, writtenFiles);

476

}

477

478

@Override

479

public void abort() throws IOException {

480

// Clean up any files written by this writer

481

for (String file : writtenFiles) {

482

deleteFileIfExists(file);

483

}

484

buffer.clear();

485

writtenFiles.clear();

486

}

487

488

private void flushBuffer() throws IOException {

489

String fileName = generateFileName(partitionId, taskId, epochId);

490

writeBufferToFile(fileName, buffer);

491

writtenFiles.add(fileName);

492

buffer.clear();

493

}

494

}

```

## Advanced Streaming Patterns

### Exactly-Once Processing with Idempotent Writes

```java

public class IdempotentStreamingWrite implements StreamingWrite {

503

private final Map<Long, String> epochCommitIds = new ConcurrentHashMap<>();

504

505

@Override

506

public void commit(long epochId, WriterCommitMessage[] messages) {

507

String commitId = generateCommitId(epochId, messages);

508

509

// Check if this epoch was already committed (for retries)

510

if (epochCommitIds.containsKey(epochId)) {

511

String existingCommitId = epochCommitIds.get(epochId);

512

if (existingCommitId.equals(commitId)) {

513

// Already committed with same data - this is a retry

514

return;

515

} else {

516

throw new IllegalStateException(

517

String.format("Epoch %d committed with different data", epochId));

518

}

519

}

520

521

// Perform idempotent commit

522

performIdempotentCommit(epochId, commitId, messages);

523

epochCommitIds.put(epochId, commitId);

524

}

525

526

private void performIdempotentCommit(long epochId, String commitId,

527

WriterCommitMessage[] messages) {

528

// Use commitId to ensure idempotency

529

// This might involve conditional writes to external systems

530

}

531

}

```

### State Management for Streaming Aggregations

```java

public class StatefulStreamingProcessor {

538

private final Map<String, Object> state = new ConcurrentHashMap<>();

539

540

public void processStreamingBatch(Iterator<InternalRow> batch, long epochId) {

541

Map<String, Object> batchState = new HashMap<>();

542

543

// Process batch and update state

544

while (batch.hasNext()) {

545

InternalRow row = batch.next();

546

String key = extractKey(row);

547

Object value = extractValue(row);

548

549

// Update batch state

550

batchState.merge(key, value, this::combineValues);

551

}

552

553

// Atomically update global state

554

synchronized (state) {

555

for (Map.Entry<String, Object> entry : batchState.entrySet()) {

556

state.merge(entry.getKey(), entry.getValue(), this::combineValues);

557

}

558

}

559

560

// Persist state for fault tolerance

561

persistState(epochId);

562

}

563

564

private Object combineValues(Object existing, Object newValue) {

565

// Implementation-specific value combination logic

566

if (existing instanceof Number && newValue instanceof Number) {

567

return ((Number) existing).doubleValue() + ((Number) newValue).doubleValue();

568

}

569

return newValue;

570

}

571

}

```

### Watermark-Based Late Data Handling

```java

public class WatermarkStreamingSource implements MicroBatchStream {

578

private volatile long currentWatermark = 0;

579

private final Duration allowedLateness;

580

581

public WatermarkStreamingSource(Duration allowedLateness) {

582

this.allowedLateness = allowedLateness;

583

}

584

585

public void updateWatermark(long eventTime) {

586

// Update watermark based on event time minus allowed lateness

587

long newWatermark = eventTime - allowedLateness.toMillis();

588

currentWatermark = Math.max(currentWatermark, newWatermark);

589

}

590

591

public boolean isLateData(long eventTime) {

592

return eventTime < currentWatermark;

593

}

594

595

@Override

596

public Offset latestOffset() {

597

// Include watermark information in offset

598

return new WatermarkOffset(getCurrentBatchId(), currentWatermark);

599

}

600

}

601

602

public class WatermarkOffset extends Offset {

603

private final long batchId;

604

private final long watermark;

605

606

public WatermarkOffset(long batchId, long watermark) {

607

this.batchId = batchId;

608

this.watermark = watermark;

609

}

610

611

@Override

612

public String json() {

613

return String.format("{\"batchId\":%d,\"watermark\":%d}", batchId, watermark);

614

}

615

616

// equals and hashCode implementations...

617

}

```

## Fault Tolerance and Recovery

### Checkpointing Implementation

```java

public class CheckpointableStreamingSource implements MicroBatchStream {

626

private final String checkpointLocation;

627

private volatile MyStreamOffset lastCheckpointedOffset;

628

629

@Override

630

public void commit(Offset end) {

631

MyStreamOffset offset = (MyStreamOffset) end;

632

633

try {

634

// Write checkpoint atomically

635

writeCheckpoint(offset);

636

lastCheckpointedOffset = offset;

637

} catch (IOException e) {

638

throw new RuntimeException("Failed to checkpoint offset: " + offset, e);

639

}

640

}

641

642

@Override

643

public Offset initialOffset() {

644

try {

645

// Try to recover from checkpoint

646

MyStreamOffset checkpointOffset = readCheckpoint();

647

if (checkpointOffset != null) {

648

return checkpointOffset;

649

}

650

} catch (IOException e) {

651

// Log warning and start from beginning

652

System.err.println("Failed to read checkpoint, starting from beginning: " + e);

653

}

654

655

return new MyStreamOffset(0, 0);

656

}

657

658

private void writeCheckpoint(MyStreamOffset offset) throws IOException {

659

String tempFile = checkpointLocation + ".tmp";

660

String finalFile = checkpointLocation;

661

662

// Atomic write: write to temp file, then rename

663

Files.write(Paths.get(tempFile), offset.json().getBytes());

664

Files.move(Paths.get(tempFile), Paths.get(finalFile));

665

}

666

667

private MyStreamOffset readCheckpoint() throws IOException {

668

Path checkpointPath = Paths.get(checkpointLocation);

669

if (!Files.exists(checkpointPath)) {

670

return null;

671

}

672

673

String json = new String(Files.readAllBytes(checkpointPath));

674

return MyStreamOffset.fromJson(json);

675

}

676

}

```

### Retry and Error Handling

```java

public class ResilientStreamingProcessor {
    private final int maxRetries;
    private final Duration retryDelay;

    /**
     * @param maxRetries maximum number of attempts, including the first; must be at least 1
     *     (with 0 the operation would never run)
     * @param retryDelay pause between attempts; must be non-null and non-negative
     */
    public ResilientStreamingProcessor(int maxRetries, Duration retryDelay) {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be >= 1, got " + maxRetries);
        }
        Objects.requireNonNull(retryDelay, "retryDelay");
        if (retryDelay.isNegative()) {
            throw new IllegalArgumentException("retryDelay must not be negative: " + retryDelay);
        }
        this.maxRetries = maxRetries;
        this.retryDelay = retryDelay;
    }

    /**
     * Runs {@code operation}, retrying retriable failures up to {@code maxRetries} attempts in
     * total. Non-retriable failures stop immediately.
     *
     * @throws RuntimeException carrying the last failure as its cause once no attempt succeeded,
     *     or if the thread is interrupted while waiting to retry
     */
    public void processWithRetry(Runnable operation, String operationName) {
        int attempts = 0;
        Exception lastException = null;

        while (attempts < maxRetries) {
            try {
                operation.run();
                return; // Success
            } catch (Exception e) {
                attempts++;
                lastException = e;

                if (!isRetriableException(e) || attempts >= maxRetries) {
                    break;
                }
                System.err.printf("Operation %s failed (attempt %d/%d), retrying in %s: %s%n",
                    operationName, attempts, maxRetries, retryDelay, e.getMessage());
                sleepBeforeRetry();
            }
        }

        throw new RuntimeException(
            String.format("Operation %s failed after %d attempts", operationName, attempts),
            lastException);
    }

    // Waits retryDelay, restoring the interrupt flag if the wait is interrupted.
    private void sleepBeforeRetry() {
        try {
            Thread.sleep(retryDelay.toMillis());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted during retry", ie);
        }
    }

    // I/O failures, whether thrown directly or wrapped in a RuntimeException, are worth retrying.
    // ConnectException is an IOException, so it is covered by the first check.
    private boolean isRetriableException(Exception e) {
        return e instanceof IOException
            || (e instanceof RuntimeException && e.getCause() instanceof IOException);
    }
}

```

## Performance Optimization

### Batching for Efficiency

```java

public class BatchingStreamingWriter implements DataWriter<InternalRow> {

734

private final List<InternalRow> buffer = new ArrayList<>();

735

private final int batchSize;

736

private final Duration flushInterval;

737

private long lastFlushTime = System.currentTimeMillis();

738

739

@Override

740

public void write(InternalRow record) throws IOException {

741

buffer.add(record.copy());

742

743

// Flush based on size or time

744

if (shouldFlush()) {

745

flushBuffer();

746

}

747

}

748

749

private boolean shouldFlush() {

750

return buffer.size() >= batchSize ||

751

(System.currentTimeMillis() - lastFlushTime) > flushInterval.toMillis();

752

}

753

754

private void flushBuffer() throws IOException {

755

if (!buffer.isEmpty()) {

756

writeBatch(buffer);

757

buffer.clear();

758

lastFlushTime = System.currentTimeMillis();

759

}

760

}

761

}

```

### Parallel Processing

```java

public class ParallelStreamingProcessor {

768

private final ExecutorService executor;

769

770

public void processParallel(List<InputPartition> partitions) {

771

List<CompletableFuture<Void>> futures = partitions.stream()

772

.map(partition -> CompletableFuture.runAsync(

773

() -> processPartition(partition), executor))

774

.collect(Collectors.toList());

775

776

// Wait for all partitions to complete

777

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))

778

.join();

779

}

780

781

private void processPartition(InputPartition partition) {

782

// Partition-specific processing logic

783

}

784

}

```

The Streaming APIs provide a robust foundation for building real-time data processing systems with strong guarantees around fault tolerance, exactly-once processing, and efficient resource utilization.