or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-sink.mdhybrid-source.mdindex.mdrate-limiting.mdsource-reader.mdtable-api.md

source-reader.mddocs/

# Source Reader Framework

The Source Reader Framework is the foundation for building custom Flink source readers. It handles split management, coordination between the split-fetcher threads and the task's main thread, and per-split state for checkpointing. It supports both single-threaded and multi-threaded split reading.

## Core Components

### SourceReaderBase

The abstract base class for source readers that fetch records through a `SplitFetcherManager` and emit them through a `RecordEmitter`.

```java { .api }

@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {

    // ------------------------------------------------------------------ construction

    // Reader without an end-of-stream evaluator.
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context)

    // Reader that consults eofRecordEvaluator to detect end-of-stream records.
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context)

    // ------------------------------------------------------------------ SourceReader contract

    // Lifecycle
    public void start()
    public void close() throws Exception

    // Record emission and availability
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception
    public CompletableFuture<Void> isAvailable()

    // Checkpointing: returns the currently assigned, unfinished splits
    public List<SplitT> snapshotState(long checkpointId)

    // Split assignment and coordination with the enumerator
    public void addSplits(List<SplitT> splits)
    public void notifyNoMoreSplits()
    public void handleSourceEvents(SourceEvent sourceEvent)
    public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume)

    // Number of splits currently assigned to this reader
    public int getNumberOfCurrentlyAssignedSplits()

    // ------------------------------------------------------------------ subclass hooks

    // Receives the final state of every split the fetchers reported as finished.
    protected abstract void onSplitFinished(Map<String, SplitStateT> finishedSplitIds)

    // Creates the mutable per-split state when a split is assigned.
    protected abstract SplitStateT initializedState(SplitT split)

    // Converts mutable split state back into an immutable split (used by snapshotState).
    protected abstract SplitT toSplitType(String splitId, SplitStateT splitState)
}

```

### SingleThreadMultiplexSourceReaderBase

A specialization of `SourceReaderBase` for sources that read all assigned splits with a single `SplitReader` running in one fetcher thread.

```java { .api }

@PublicEvolving

54

public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>

55

extends SourceReaderBase<E, T, SplitT, SplitStateT> {

56

57

// Constructors

58

public SingleThreadMultiplexSourceReaderBase(

59

Supplier<SplitReader<E, SplitT>> splitReaderSupplier,

60

RecordEmitter<E, T, SplitStateT> recordEmitter,

61

Configuration config,

62

SourceReaderContext context)

63

64

public SingleThreadMultiplexSourceReaderBase(

65

SingleThreadFetcherManager<E, SplitT> splitFetcherManager,

66

RecordEmitter<E, T, SplitStateT> recordEmitter,

67

Configuration config,

68

SourceReaderContext context)

69

70

public SingleThreadMultiplexSourceReaderBase(

71

SingleThreadFetcherManager<E, SplitT> splitFetcherManager,

72

RecordEmitter<E, T, SplitStateT> recordEmitter,

73

RecordEvaluator<T> eofRecordEvaluator,

74

Configuration config,

75

SourceReaderContext context)

76

}

```

### SplitReader

Interface for reading records from the splits assigned to a fetcher.

```java { .api }

@PublicEvolving

85

public interface SplitReader<E, SplitT> extends AutoCloseable {

86

RecordsWithSplitIds<E> fetch() throws IOException

87

void handleSplitsChanges(SplitsChange<SplitT> splitsChanges)

88

void wakeUp()

89

void pauseOrResumeSplits(Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume)

90

void close() throws Exception

91

}

```

### RecordEmitter

Converts the elements produced by a `SplitReader` into output records and updates the per-split state.

```java { .api }

@PublicEvolving

100

public interface RecordEmitter<E, T, SplitStateT> {

101

void emitRecord(E element, SourceOutput<T> output, SplitStateT splitState) throws Exception

102

}

```

### RecordEvaluator

Evaluates output records (of type `T`) to decide whether a record marks the end of its split's stream.

```java { .api }

@PublicEvolving

111

public interface RecordEvaluator<T> {

112

boolean isEndOfStream(T record)

113

}

```

## Implementation Examples

### Complete File Source Reader

```java

public class CustomFileSourceReader extends SingleThreadMultiplexSourceReaderBase<

122

FileRecord, String, FileSourceSplit, FileSourceSplitState> {

123

124

public CustomFileSourceReader(

125

Configuration config,

126

SourceReaderContext context) {

127

super(

128

() -> new FileSystemSplitReader(config), // SplitReader supplier

129

new FileRecordEmitter(), // Record emitter

130

config,

131

context

132

);

133

}

134

135

@Override

136

protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {

137

// Cleanup resources for finished splits

138

for (Map.Entry<String, FileSourceSplitState> entry : finishedSplitIds.entrySet()) {

139

String splitId = entry.getKey();

140

FileSourceSplitState splitState = entry.getValue();

141

142

LOG.info("Split {} finished at position {}", splitId, splitState.getOffset());

143

144

// Close any split-specific resources

145

splitState.cleanup();

146

}

147

}

148

149

@Override

150

protected FileSourceSplitState initializedState(FileSourceSplit split) {

151

return new FileSourceSplitState(

152

split.path(),

153

split.offset(),

154

split.length()

155

);

156

}

157

158

@Override

159

protected FileSourceSplit toSplitType(String splitId, FileSourceSplitState splitState) {

160

return new FileSourceSplit(

161

splitId,

162

splitState.getPath(),

163

splitState.getOffset(),

164

splitState.getLength()

165

);

166

}

167

}

168

169

// Split implementation

170

public class FileSourceSplit implements SourceSplit {

171

private final String splitId;

172

private final Path path;

173

private final long offset;

174

private final long length;

175

176

public FileSourceSplit(String splitId, Path path, long offset, long length) {

177

this.splitId = splitId;

178

this.path = path;

179

this.offset = offset;

180

this.length = length;

181

}

182

183

@Override

184

public String splitId() {

185

return splitId;

186

}

187

188

public Path path() { return path; }

189

public long offset() { return offset; }

190

public long length() { return length; }

191

}

192

193

// Split state implementation

194

public class FileSourceSplitState {

195

private Path path;

196

private long offset;

197

private long length;

198

private BufferedReader reader;

199

200

public FileSourceSplitState(Path path, long offset, long length) {

201

this.path = path;

202

this.offset = offset;

203

this.length = length;

204

}

205

206

public Path getPath() { return path; }

207

public long getOffset() { return offset; }

208

public void setOffset(long offset) { this.offset = offset; }

209

public long getLength() { return length; }

210

211

public BufferedReader getReader() { return reader; }

212

public void setReader(BufferedReader reader) { this.reader = reader; }

213

214

public void cleanup() {

215

if (reader != null) {

216

try {

217

reader.close();

218

} catch (IOException e) {

219

LOG.warn("Failed to close reader for split", e);

220

}

221

}

222

}

223

}

```

### SplitReader Implementation

```java

public class FileSystemSplitReader implements SplitReader<FileRecord, FileSourceSplit> {

230

private final Configuration config;

231

private final Map<String, FileReaderState> splitReaders;

232

private final Set<String> pausedSplits;

233

234

public FileSystemSplitReader(Configuration config) {

235

this.config = config;

236

this.splitReaders = new HashMap<>();

237

this.pausedSplits = new HashSet<>();

238

}

239

240

@Override

241

public RecordsWithSplitIds<FileRecord> fetch() throws IOException {

242

Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();

243

Set<String> finishedSplits = new HashSet<>();

244

245

for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {

246

String splitId = entry.getKey();

247

248

// Skip paused splits

249

if (pausedSplits.contains(splitId)) {

250

continue;

251

}

252

253

FileReaderState readerState = entry.getValue();

254

List<FileRecord> records = new ArrayList<>();

255

256

try {

257

// Read batch of records from this split

258

for (int i = 0; i < 100 && readerState.hasMore(); i++) {

259

String line = readerState.readLine();

260

if (line != null) {

261

records.add(new FileRecord(

262

splitId,

263

line,

264

readerState.getCurrentOffset(),

265

System.currentTimeMillis()

266

));

267

} else {

268

// End of split reached

269

finishedSplits.add(splitId);

270

break;

271

}

272

}

273

274

if (!records.isEmpty()) {

275

recordsBySplit.put(splitId, records);

276

}

277

} catch (IOException e) {

278

LOG.error("Error reading from split {}", splitId, e);

279

throw e;

280

}

281

}

282

283

// Clean up finished splits

284

for (String finishedSplit : finishedSplits) {

285

FileReaderState readerState = splitReaders.remove(finishedSplit);

286

if (readerState != null) {

287

readerState.close();

288

}

289

}

290

291

return RecordsBySplits.forRecords(recordsBySplit, finishedSplits);

292

}

293

294

@Override

295

public void handleSplitsChanges(SplitsChange<FileSourceSplit> splitsChanges) {

296

if (splitsChanges instanceof SplitsAddition) {

297

SplitsAddition<FileSourceSplit> addition = (SplitsAddition<FileSourceSplit>) splitsChanges;

298

for (FileSourceSplit split : addition.splits()) {

299

try {

300

FileReaderState readerState = new FileReaderState(

301

split.path(),

302

split.offset(),

303

split.length()

304

);

305

splitReaders.put(split.splitId(), readerState);

306

LOG.info("Added split {} for file {}", split.splitId(), split.path());

307

} catch (IOException e) {

308

LOG.error("Failed to open split {} for file {}", split.splitId(), split.path(), e);

309

}

310

}

311

} else if (splitsChanges instanceof SplitsRemoval) {

312

SplitsRemoval<FileSourceSplit> removal = (SplitsRemoval<FileSourceSplit>) splitsChanges;

313

for (String splitId : removal.splitIds()) {

314

FileReaderState readerState = splitReaders.remove(splitId);

315

if (readerState != null) {

316

readerState.close();

317

LOG.info("Removed split {}", splitId);

318

}

319

}

320

}

321

}

322

323

@Override

324

public void wakeUp() {

325

// Interrupt any blocking reads if needed

326

}

327

328

@Override

329

public void pauseOrResumeSplits(

330

Collection<FileSourceSplit> splitsToPause,

331

Collection<FileSourceSplit> splitsToResume) {

332

333

// Pause splits

334

for (FileSourceSplit split : splitsToPause) {

335

pausedSplits.add(split.splitId());

336

LOG.info("Paused split {}", split.splitId());

337

}

338

339

// Resume splits

340

for (FileSourceSplit split : splitsToResume) {

341

pausedSplits.remove(split.splitId());

342

LOG.info("Resumed split {}", split.splitId());

343

}

344

}

345

346

@Override

347

public void close() throws Exception {

348

// Close all split readers

349

for (FileReaderState readerState : splitReaders.values()) {

350

readerState.close();

351

}

352

splitReaders.clear();

353

}

354

}

355

356

// Helper class for managing file reading state

357

public class FileReaderState {

358

private final Path path;

359

private final long startOffset;

360

private final long length;

361

private BufferedReader reader;

362

private long currentOffset;

363

364

public FileReaderState(Path path, long startOffset, long length) throws IOException {

365

this.path = path;

366

this.startOffset = startOffset;

367

this.length = length;

368

this.currentOffset = startOffset;

369

370

// Open file and skip to start offset

371

FileInputStream fis = new FileInputStream(path.toFile());

372

fis.skip(startOffset);

373

this.reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));

374

}

375

376

public String readLine() throws IOException {

377

if (currentOffset >= startOffset + length) {

378

return null; // End of split

379

}

380

381

String line = reader.readLine();

382

if (line != null) {

383

currentOffset += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for newline

384

}

385

return line;

386

}

387

388

public boolean hasMore() {

389

return currentOffset < startOffset + length;

390

}

391

392

public long getCurrentOffset() {

393

return currentOffset;

394

}

395

396

public void close() {

397

if (reader != null) {

398

try {

399

reader.close();

400

} catch (IOException e) {

401

LOG.warn("Error closing file reader for {}", path, e);

402

}

403

}

404

}

405

}

```

### RecordEmitter Implementation

```java

public class FileRecordEmitter implements RecordEmitter<FileRecord, String, FileSourceSplitState> {

412

413

@Override

414

public void emitRecord(

415

FileRecord element,

416

SourceOutput<String> output,

417

FileSourceSplitState splitState) throws Exception {

418

419

// Update split state with current offset

420

splitState.setOffset(element.getOffset());

421

422

// Extract the actual data and emit

423

String record = element.getData();

424

425

// Emit with timestamp if available

426

if (element.getTimestamp() > 0) {

427

output.collect(record, element.getTimestamp());

428

} else {

429

output.collect(record);

430

}

431

}

432

}

// Record type for file data

/**
 * One line read from a file split, together with the id of that split, the split offset recorded
 * for the line, and a timestamp.
 */
public class FileRecord {

    private final String splitId;
    private final String data;
    private final long offset;
    private final long timestamp;

    public FileRecord(String splitId, String data, long offset, long timestamp) {
        this.splitId = splitId;
        this.data = data;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    /** Id of the split this line was read from. */
    public String getSplitId() {
        return splitId;
    }

    /** The line's text. */
    public String getData() {
        return data;
    }

    /** Split offset recorded together with the line. */
    public long getOffset() {
        return offset;
    }

    /** Timestamp recorded together with the line. */
    public long getTimestamp() {
        return timestamp;
    }
}

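```

### Advanced Source Reader with Watermarks

Note: when a `WatermarkStrategy` is passed to `env.fromSource(source, strategy, name)`, the framework already assigns timestamps and generates watermarks for each split. Generating watermarks inside the record emitter, as shown below, is only needed for custom behaviour.

```java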

public class WatermarkingFileSourceReader extends SingleThreadMultiplexSourceReaderBase<

459

FileRecord, String, FileSourceSplit, FileSourceSplitState> {

460

461

private final WatermarkStrategy<String> watermarkStrategy;

462

private final Map<String, WatermarkOutput> splitWatermarkOutputs;

463

464

public WatermarkingFileSourceReader(

465

Configuration config,

466

SourceReaderContext context,

467

WatermarkStrategy<String> watermarkStrategy) {

468

super(

469

() -> new FileSystemSplitReader(config),

470

new WatermarkingFileRecordEmitter(watermarkStrategy),

471

config,

472

context

473

);

474

this.watermarkStrategy = watermarkStrategy;

475

this.splitWatermarkOutputs = new HashMap<>();

476

}

477

478

// ... other methods same as before ...

479

}

480

481

public class WatermarkingFileRecordEmitter

482

implements RecordEmitter<FileRecord, String, FileSourceSplitState> {

483

484

private final WatermarkStrategy<String> watermarkStrategy;

485

private final Map<String, WatermarkGenerator<String>> watermarkGenerators;

486

private final Map<String, TimestampAssigner<String>> timestampAssigners;

487

488

public WatermarkingFileRecordEmitter(WatermarkStrategy<String> watermarkStrategy) {

489

this.watermarkStrategy = watermarkStrategy;

490

this.watermarkGenerators = new HashMap<>();

491

this.timestampAssigners = new HashMap<>();

492

}

493

494

@Override

495

public void emitRecord(

496

FileRecord element,

497

SourceOutput<String> output,

498

FileSourceSplitState splitState) throws Exception {

499

500

String splitId = element.getSplitId();

501

String record = element.getData();

502

503

// Update split state

504

splitState.setOffset(element.getOffset());

505

506

// Get or create watermark generator for this split

507

WatermarkGenerator<String> watermarkGenerator = watermarkGenerators.computeIfAbsent(

508

splitId,

509

k -> watermarkStrategy.createWatermarkGenerator(() -> new WatermarkGeneratorSupplier.Context() {

510

@Override

511

public MetricGroup getMetricGroup() {

512

return new UnregisteredMetricsGroup();

513

}

514

515

@Override

516

public ProcessingTimeService getProcessingTimeService() {

517

return new TestProcessingTimeService();

518

}

519

})

520

);

521

522

// Get or create timestamp assigner

523

TimestampAssigner<String> timestampAssigner = timestampAssigners.computeIfAbsent(

524

splitId,

525

k -> watermarkStrategy.createTimestampAssigner(() -> new TimestampAssignerSupplier.Context() {

526

@Override

527

public MetricGroup getMetricGroup() {

528

return new UnregisteredMetricsGroup();

529

}

530

531

@Override

532

public ProcessingTimeService getProcessingTimeService() {

533

return new TestProcessingTimeService();

534

}

535

})

536

);

537

538

// Assign timestamp

539

long timestamp = timestampAssigner.extractTimestamp(record, element.getTimestamp());

540

541

// Update watermark generator

542

watermarkGenerator.onEvent(record, timestamp, new WatermarkOutput() {

543

@Override

544

public void emitWatermark(Watermark watermark) {

545

output.emitWatermark(watermark);

546

}

547

548

@Override

549

public void markIdle() {

550

output.markIdle();

551

}

552

553

@Override

554

public void markActive() {

555

output.markActive();

556

}

557

});

558

559

// Emit record with timestamp

560

output.collect(record, timestamp);

561

}

562

}

```

## Configuration and Options

### SourceReaderOptions

```java

// Available configuration options

571

public static final ConfigOption<Integer> ELEMENT_QUEUE_CAPACITY =

572

ConfigOptions.key("source.reader.element-queue-capacity")

573

.intType()

574

.defaultValue(1000)

575

.withDescription("The capacity of the element queue in the source reader.");

576

577

public static final ConfigOption<Duration> SOURCE_READER_CLOSE_TIMEOUT =

578

ConfigOptions.key("source.reader.close-timeout")

579

.durationType()

580

.defaultValue(Duration.ofSeconds(30))

581

.withDescription("The timeout for closing the source reader.");

582

583

// Usage in source reader

584

public class ConfigurableFileSourceReader extends SingleThreadMultiplexSourceReaderBase<

585

FileRecord, String, FileSourceSplit, FileSourceSplitState> {

586

587

public ConfigurableFileSourceReader(

588

Configuration config,

589

SourceReaderContext context) {

590

super(

591

() -> new FileSystemSplitReader(config),

592

new FileRecordEmitter(),

593

config,

594

context

595

);

596

597

// Access configuration options

598

int queueCapacity = config.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);

599

Duration closeTimeout = config.get(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT);

600

601

LOG.info("Source reader configured with queue capacity: {}, close timeout: {}",

602

queueCapacity, closeTimeout);

603

}

604

}

```

## Best Practices

### Performance Optimization

1. **Batch Size Tuning**

```java

@Override

614

public RecordsWithSplitIds<FileRecord> fetch() throws IOException {

615

// Adjust batch size based on record size and processing speed

616

int batchSize = calculateOptimalBatchSize();

617

618

Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();

619

620

for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {

621

List<FileRecord> records = new ArrayList<>();

622

623

// Read optimal batch size for each split

624

for (int i = 0; i < batchSize && readerState.hasMore(); i++) {

625

// ... reading logic

626

}

627

}

628

}

629

630

private int calculateOptimalBatchSize() {

631

// Consider factors like:

632

// - Available memory

633

// - Record processing time

634

// - Network latency

635

// - Split characteristics

636

return Math.min(1000, Runtime.getRuntime().availableProcessors() * 100);

637

}

```

2. **Efficient State Management**

```java

@Override

643

protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {

644

// Efficient cleanup of finished splits

645

finishedSplitIds.forEach((splitId, splitState) -> {

646

try {

647

// Close resources immediately

648

splitState.cleanup();

649

650

// Update metrics

651

updateSplitFinishedMetrics(splitId);

652

653

// Log completion with important details

654

LOG.info("Split {} finished: processed {} bytes in {} ms",

655

splitId, splitState.getBytesProcessed(), splitState.getProcessingTime());

656

} catch (Exception e) {

657

LOG.warn("Error cleaning up finished split {}", splitId, e);

658

}

659

});

660

}

```

3. **Memory Management**

```java

public class MemoryAwareFileReader implements SplitReader<FileRecord, FileSourceSplit> {

666

private final MemoryManager memoryManager;

667

private final long maxMemoryUsage;

668

669

@Override

670

public RecordsWithSplitIds<FileRecord> fetch() throws IOException {

671

// Check memory usage before reading

672

if (memoryManager.getCurrentUsage() > maxMemoryUsage) {

673

// Return smaller batch or pause reading

674

return RecordsBySplits.forRecords(Collections.emptyMap());

675

}

676

677

// Normal fetch logic

678

return fetchRecords();

679

}

680

}

```

### Error Handling and Resilience

1. **Split-Level Error Isolation**

```java

@Override

688

public RecordsWithSplitIds<FileRecord> fetch() throws IOException {

689

Map<String, Collection<FileRecord>> recordsBySplit = new HashMap<>();

690

Set<String> finishedSplits = new HashSet<>();

691

List<String> failedSplits = new ArrayList<>();

692

693

for (Map.Entry<String, FileReaderState> entry : splitReaders.entrySet()) {

694

String splitId = entry.getKey();

695

696

try {

697

// Read from split

698

List<FileRecord> records = readFromSplit(entry.getValue());

699

if (!records.isEmpty()) {

700

recordsBySplit.put(splitId, records);

701

}

702

} catch (IOException e) {

703

LOG.error("Error reading from split {}, marking as failed", splitId, e);

704

failedSplits.add(splitId);

705

}

706

}

707

708

// Handle failed splits

709

handleFailedSplits(failedSplits);

710

711

return RecordsBySplits.forRecords(recordsBySplit, finishedSplits);

712

}

713

714

private void handleFailedSplits(List<String> failedSplits) {

715

for (String splitId : failedSplits) {

716

FileReaderState readerState = splitReaders.remove(splitId);

717

if (readerState != null) {

718

readerState.close();

719

720

// Optionally retry split or report failure

721

reportSplitFailure(splitId);

722

}

723

}

724

}

```

2. **Graceful Degradation**

```java

public class ResilientFileSourceReader extends SingleThreadMultiplexSourceReaderBase<

730

FileRecord, String, FileSourceSplit, FileSourceSplitState> {

731

732

private final AtomicInteger failedSplitCount = new AtomicInteger(0);

733

private final int maxFailedSplits;

734

735

@Override

736

protected void onSplitFinished(Map<String, FileSourceSplitState> finishedSplitIds) {

737

// Check if too many splits have failed

738

if (failedSplitCount.get() > maxFailedSplits) {

739

LOG.warn("Too many splits have failed ({}), source may be degraded",

740

failedSplitCount.get());

741

}

742

743

super.onSplitFinished(finishedSplitIds);

744

}

745

}

```

The Source Reader Framework provides a foundation for building source readers. It runs the split-fetcher threads, manages per-split state and checkpointing, and leaves implementations to supply a `SplitReader`, a `RecordEmitter`, and the conversions between splits and their mutable state.