or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · connectors.md · event-time-watermarks.md · execution-jobs.md · functions-and-operators.md · index.md · state-management.md · type-system-serialization.md · utilities.md

docs/connectors.md

0

# Connectors

1

2

Apache Flink Core provides comprehensive connector APIs for building data sources and sinks that integrate with external systems. These APIs enable developers to create efficient, fault-tolerant connectors with features like checkpointing, parallelism, and exactly-once semantics.

3

4

## Source Connector Framework

5

6

### Basic Source Interface

7

8

The foundation for all Flink data sources.

9

10

```java { .api }

11

import org.apache.flink.api.connector.source.*;

12

import org.apache.flink.core.io.SimpleVersionedSerializer;

13

14

// Basic source implementation

15

public class CustomSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState> {

16

17

@Override

18

public Boundedness getBoundedness() {

19

return Boundedness.CONTINUOUS_UNBOUNDED; // or BOUNDED

20

}

21

22

@Override

23

public SourceReader<MyRecord, MySourceSplit> createReader(SourceReaderContext readerContext) {

24

return new MySourceReader(readerContext);

25

}

26

27

@Override

28

public SplitEnumerator<MySourceSplit, MyEnumeratorState> createEnumerator(

29

SplitEnumeratorContext<MySourceSplit> enumContext) {

30

return new MySplitEnumerator(enumContext);

31

}

32

33

@Override

34

public SplitEnumerator<MySourceSplit, MyEnumeratorState> restoreEnumerator(

35

SplitEnumeratorContext<MySourceSplit> enumContext,

36

MyEnumeratorState checkpoint) {

37

return new MySplitEnumerator(enumContext, checkpoint);

38

}

39

40

@Override

41

public SimpleVersionedSerializer<MySourceSplit> getSplitSerializer() {

42

return new MySourceSplitSerializer();

43

}

44

45

@Override

46

public SimpleVersionedSerializer<MyEnumeratorState> getEnumeratorCheckpointSerializer() {

47

return new MyEnumeratorStateSerializer();

48

}

49

}

50

```

51

52

### Source Split Definition

53

54

Define how data is partitioned and processed.

55

56

```java { .api }

57

import org.apache.flink.api.connector.source.SourceSplit;

58

59

// Custom source split

60

public class MySourceSplit implements SourceSplit {

61

private final String splitId;

62

private final String filepath;

63

private final long startOffset;

64

private final long endOffset;

65

66

public MySourceSplit(String splitId, String filepath, long startOffset, long endOffset) {

67

this.splitId = splitId;

68

this.filepath = filepath;

69

this.startOffset = startOffset;

70

this.endOffset = endOffset;

71

}

72

73

@Override

74

public String splitId() {

75

return splitId;

76

}

77

78

// Getters

79

public String getFilepath() { return filepath; }

80

public long getStartOffset() { return startOffset; }

81

public long getEndOffset() { return endOffset; }

82

83

@Override

84

public String toString() {

85

return String.format("MySourceSplit{id='%s', file='%s', range=[%d, %d]}",

86

splitId, filepath, startOffset, endOffset);

87

}

88

}

89

90

// Split serializer for checkpointing

91

public class MySourceSplitSerializer implements SimpleVersionedSerializer<MySourceSplit> {

92

93

@Override

94

public int getVersion() {

95

return 1;

96

}

97

98

@Override

99

public byte[] serialize(MySourceSplit split) throws IOException {

100

try (ByteArrayOutputStream baos = new ByteArrayOutputStream();

101

DataOutputStream out = new DataOutputStream(baos)) {

102

103

out.writeUTF(split.splitId());

104

out.writeUTF(split.getFilepath());

105

out.writeLong(split.getStartOffset());

106

out.writeLong(split.getEndOffset());

107

108

return baos.toByteArray();

109

}

110

}

111

112

@Override

113

public MySourceSplit deserialize(int version, byte[] serialized) throws IOException {

114

try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);

115

DataInputStream in = new DataInputStream(bais)) {

116

117

String splitId = in.readUTF();

118

String filepath = in.readUTF();

119

long startOffset = in.readLong();

120

long endOffset = in.readLong();

121

122

return new MySourceSplit(splitId, filepath, startOffset, endOffset);

123

}

124

}

125

}

126

```

127

128

### Split Enumerator

129

130

Discovers and assigns splits to source readers.

131

132

```java { .api }

133

import org.apache.flink.api.connector.source.SplitEnumerator;

134

import org.apache.flink.api.connector.source.SplitEnumeratorContext;

135

136

public class MySplitEnumerator implements SplitEnumerator<MySourceSplit, MyEnumeratorState> {

137

private final SplitEnumeratorContext<MySourceSplit> context;

138

private final Set<String> remainingFiles;

139

private final Map<Integer, Set<String>> readerAssignments;

140

141

public MySplitEnumerator(SplitEnumeratorContext<MySourceSplit> context) {

142

this.context = context;

143

this.remainingFiles = discoverFiles();

144

this.readerAssignments = new HashMap<>();

145

}

146

147

public MySplitEnumerator(SplitEnumeratorContext<MySourceSplit> context,

148

MyEnumeratorState restoredState) {

149

this.context = context;

150

this.remainingFiles = restoredState.getRemainingFiles();

151

this.readerAssignments = restoredState.getReaderAssignments();

152

}

153

154

@Override

155

public void start() {

156

// Initialize and assign initial splits

157

assignSplitsToReaders();

158

}

159

160

@Override

161

public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {

162

// Assign more splits when requested

163

if (!remainingFiles.isEmpty()) {

164

String nextFile = remainingFiles.iterator().next();

165

remainingFiles.remove(nextFile);

166

167

MySourceSplit split = createSplitFromFile(nextFile, subtaskId);

168

context.assignSplit(split, subtaskId);

169

170

// Track assignment

171

readerAssignments.computeIfAbsent(subtaskId, k -> new HashSet<>()).add(nextFile);

172

} else {

173

// No more splits available

174

context.signalNoMoreSplits(subtaskId);

175

}

176

}

177

178

@Override

179

public void addSplitsBack(List<MySourceSplit> splits, int subtaskId) {

180

// Handle split reassignment on failure

181

for (MySourceSplit split : splits) {

182

remainingFiles.add(split.getFilepath());

183

readerAssignments.get(subtaskId).remove(split.getFilepath());

184

}

185

}

186

187

@Override

188

public void addReader(int subtaskId) {

189

// New reader registered

190

readerAssignments.put(subtaskId, new HashSet<>());

191

assignSplitsToReader(subtaskId);

192

}

193

194

@Override

195

public MyEnumeratorState snapshotState(long checkpointId) throws Exception {

196

return new MyEnumeratorState(remainingFiles, readerAssignments);

197

}

198

199

@Override

200

public void close() throws IOException {

201

// Cleanup resources

202

}

203

204

private void assignSplitsToReaders() {

205

for (int readerId : context.registeredReaders().keySet()) {

206

assignSplitsToReader(readerId);

207

}

208

}

209

210

private void assignSplitsToReader(int readerId) {

211

// Assign initial splits to reader

212

if (!remainingFiles.isEmpty()) {

213

String file = remainingFiles.iterator().next();

214

remainingFiles.remove(file);

215

216

MySourceSplit split = createSplitFromFile(file, readerId);

217

context.assignSplit(split, readerId);

218

readerAssignments.get(readerId).add(file);

219

}

220

}

221

222

private MySourceSplit createSplitFromFile(String filepath, int readerId) {

223

String splitId = String.format("%s-%d", filepath, readerId);

224

// Calculate file offsets based on parallelism

225

return new MySourceSplit(splitId, filepath, 0, getFileSize(filepath));

226

}

227

228

private Set<String> discoverFiles() {

229

// Discover files to process

230

return new HashSet<>(Arrays.asList("file1.txt", "file2.txt", "file3.txt"));

231

}

232

233

private long getFileSize(String filepath) {

234

// Get file size for split calculation

235

return 1024 * 1024; // 1MB example

236

}

237

}

238

```

239

240

### Source Reader

241

242

Reads records from assigned splits.

243

244

```java { .api }

245

import org.apache.flink.api.connector.source.SourceReader;

246

import org.apache.flink.api.connector.source.SourceReaderContext;

247

248

public class MySourceReader implements SourceReader<MyRecord, MySourceSplit> {

249

private final SourceReaderContext context;

250

private final Queue<MySourceSplit> pendingSplits;

251

private final Map<String, MyFileReader> activeReaders;

252

253

public MySourceReader(SourceReaderContext context) {

254

this.context = context;

255

this.pendingSplits = new LinkedList<>();

256

this.activeReaders = new HashMap<>();

257

}

258

259

@Override

260

public void start() {

261

// Initialize reader

262

}

263

264

@Override

265

public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {

266

// Check for available data

267

if (activeReaders.isEmpty() && pendingSplits.isEmpty()) {

268

// Request more splits if needed

269

context.sendSplitRequest();

270

return InputStatus.NOTHING_AVAILABLE;

271

}

272

273

// Process pending splits

274

while (!pendingSplits.isEmpty()) {

275

MySourceSplit split = pendingSplits.poll();

276

MyFileReader fileReader = new MyFileReader(split);

277

activeReaders.put(split.splitId(), fileReader);

278

}

279

280

// Read records from active readers

281

boolean hasData = false;

282

Iterator<Map.Entry<String, MyFileReader>> iterator = activeReaders.entrySet().iterator();

283

284

while (iterator.hasNext()) {

285

Map.Entry<String, MyFileReader> entry = iterator.next();

286

MyFileReader reader = entry.getValue();

287

288

MyRecord record = reader.readNext();

289

if (record != null) {

290

output.collect(record);

291

hasData = true;

292

} else if (reader.isFinished()) {

293

// Split is exhausted

294

reader.close();

295

iterator.remove();

296

}

297

}

298

299

if (activeReaders.isEmpty()) {

300

return InputStatus.END_OF_INPUT;

301

}

302

303

return hasData ? InputStatus.MORE_AVAILABLE : InputStatus.NOTHING_AVAILABLE;

304

}

305

306

@Override

307

public List<MySourceSplit> snapshotState(long checkpointId) {

308

// Return unprocessed splits for checkpointing

309

List<MySourceSplit> splitsToSnapshot = new ArrayList<>(pendingSplits);

310

311

// Add partially processed splits

312

for (MyFileReader reader : activeReaders.values()) {

313

if (!reader.isFinished()) {

314

splitsToSnapshot.add(reader.getCurrentSplit());

315

}

316

}

317

318

return splitsToSnapshot;

319

}

320

321

@Override

322

public void addSplits(List<MySourceSplit> splits) {

323

pendingSplits.addAll(splits);

324

}

325

326

@Override

327

public void notifyNoMoreSplits() {

328

// No more splits will be assigned

329

}

330

331

@Override

332

public void close() throws Exception {

333

for (MyFileReader reader : activeReaders.values()) {

334

reader.close();

335

}

336

activeReaders.clear();

337

}

338

}

339

```

340

341

### Built-in Sources

342

343

Using pre-built source implementations.

344

345

```java { .api }

346

import org.apache.flink.api.connector.source.lib.NumberSequenceSource;

347

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

348

349

public class BuiltInSourceExamples {

350

351

public static void numberSequenceExample() {

352

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

353

354

// Number sequence source

355

NumberSequenceSource source = new NumberSequenceSource(1, 1000000);

356

357

DataStream<Long> numbers = env.fromSource(

358

source,

359

WatermarkStrategy.noWatermarks(),

360

"number-sequence"

361

);

362

363

numbers.print();

364

}

365

366

// Custom bounded source

367

public static DataStream<String> createBoundedFileSource(StreamExecutionEnvironment env) {

368

CustomFileSource source = new CustomFileSource("/path/to/files");

369

370

return env.fromSource(

371

source,

372

WatermarkStrategy.noWatermarks(),

373

"file-source"

374

);

375

}

376

377

// Custom unbounded source with watermarks

378

public static DataStream<Event> createUnboundedEventSource(StreamExecutionEnvironment env) {

379

CustomEventSource source = new CustomEventSource("kafka-topic");

380

381

WatermarkStrategy<Event> watermarkStrategy =

382

WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))

383

.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

384

385

return env.fromSource(

386

source,

387

watermarkStrategy,

388

"event-source"

389

);

390

}

391

}

392

```

393

394

## Sink Connector Framework

395

396

### Basic Sink Interface

397

398

The foundation for all Flink data sinks.

399

400

```java { .api }

401

import org.apache.flink.api.connector.sink2.*;

402

403

// Simple stateless sink

404

public class MyBasicSink implements Sink<MyRecord> {

405

406

@Override

407

public SinkWriter<MyRecord> createWriter(InitContext context) throws IOException {

408

return new MyBasicSinkWriter(context);

409

}

410

}

411

412

// Basic sink writer implementation

413

public class MyBasicSinkWriter implements SinkWriter<MyRecord> {

414

private final InitContext context;

415

private final DatabaseConnection connection;

416

417

public MyBasicSinkWriter(InitContext context) throws IOException {

418

this.context = context;

419

this.connection = new DatabaseConnection();

420

}

421

422

@Override

423

public void write(MyRecord element, Context context) throws IOException, InterruptedException {

424

// Write record to external system

425

connection.insert(element);

426

}

427

428

@Override

429

public void flush(boolean endOfInput) throws IOException, InterruptedException {

430

// Flush any buffered data

431

connection.flush();

432

}

433

434

@Override

435

public void close() throws Exception {

436

connection.close();

437

}

438

}

439

```

440

441

### Stateful Sink with Checkpointing

442

443

Handle state for exactly-once guarantees.

444

445

```java { .api }

446

import org.apache.flink.api.connector.sink2.StatefulSinkWriter;

447

import org.apache.flink.api.connector.sink2.SupportsWriterState;

448

449

// Sink supporting writer state

450

public class MyStatefulSink implements Sink<MyRecord>, SupportsWriterState<MyRecord, MyWriterState> {

451

452

@Override

453

public StatefulSinkWriter<MyRecord, MyWriterState> createWriter(InitContext context)

454

throws IOException {

455

return new MyStatefulSinkWriter(context);

456

}

457

458

@Override

459

public StatefulSinkWriter<MyRecord, MyWriterState> restoreWriter(

460

InitContext context,

461

Collection<MyWriterState> recoveredState) throws IOException {

462

return new MyStatefulSinkWriter(context, recoveredState);

463

}

464

465

@Override

466

public SimpleVersionedSerializer<MyWriterState> getWriterStateSerializer() {

467

return new MyWriterStateSerializer();

468

}

469

}

470

471

// Stateful sink writer

472

public class MyStatefulSinkWriter implements StatefulSinkWriter<MyRecord, MyWriterState> {

473

private final List<MyRecord> pendingRecords;

474

private final Map<String, Long> processedCounts;

475

476

public MyStatefulSinkWriter(InitContext context) {

477

this.pendingRecords = new ArrayList<>();

478

this.processedCounts = new HashMap<>();

479

}

480

481

public MyStatefulSinkWriter(InitContext context, Collection<MyWriterState> recoveredState) {

482

this.pendingRecords = new ArrayList<>();

483

this.processedCounts = new HashMap<>();

484

485

// Restore state

486

for (MyWriterState state : recoveredState) {

487

pendingRecords.addAll(state.getPendingRecords());

488

processedCounts.putAll(state.getProcessedCounts());

489

}

490

}

491

492

@Override

493

public void write(MyRecord element, Context context) throws IOException, InterruptedException {

494

pendingRecords.add(element);

495

496

String key = element.getKey();

497

processedCounts.merge(key, 1L, Long::sum);

498

499

// Batch write when buffer is full

500

if (pendingRecords.size() >= 1000) {

501

flushPendingRecords();

502

}

503

}

504

505

@Override

506

public List<MyWriterState> snapshotState(long checkpointId) throws IOException {

507

// Create state snapshot

508

MyWriterState state = new MyWriterState(

509

new ArrayList<>(pendingRecords),

510

new HashMap<>(processedCounts),

511

checkpointId

512

);

513

514

return Collections.singletonList(state);

515

}

516

517

@Override

518

public void flush(boolean endOfInput) throws IOException, InterruptedException {

519

flushPendingRecords();

520

}

521

522

@Override

523

public void close() throws Exception {

524

flushPendingRecords();

525

}

526

527

private void flushPendingRecords() throws IOException {

528

if (!pendingRecords.isEmpty()) {

529

// Write records to external system

530

for (MyRecord record : pendingRecords) {

531

writeToExternalSystem(record);

532

}

533

pendingRecords.clear();

534

}

535

}

536

537

private void writeToExternalSystem(MyRecord record) throws IOException {

538

// Implementation specific to external system

539

}

540

}

541

```

542

543

### Two-Phase Commit Sink

544

545

Implement exactly-once semantics with two-phase commit.

546

547

```java { .api }

548

import org.apache.flink.api.connector.sink2.SupportsCommitter;

549

import org.apache.flink.api.connector.sink2.CommittingSinkWriter;

550

import org.apache.flink.api.connector.sink2.Committer;

551

552

// Sink with two-phase commit

553

public class MyTransactionalSink implements

554

Sink<MyRecord>,

555

SupportsCommitter<MyCommittable> {

556

557

@Override

558

public CommittingSinkWriter<MyRecord, MyCommittable> createWriter(InitContext context)

559

throws IOException {

560

return new MyCommittingSinkWriter(context);

561

}

562

563

@Override

564

public Committer<MyCommittable> createCommitter() throws IOException {

565

return new MyCommitter();

566

}

567

568

@Override

569

public SimpleVersionedSerializer<MyCommittable> getCommittableSerializer() {

570

return new MyCommittableSerializer();

571

}

572

}

573

574

// Committing sink writer (first phase)

575

public class MyCommittingSinkWriter implements CommittingSinkWriter<MyRecord, MyCommittable> {

576

private final String transactionId;

577

private final List<MyRecord> currentBatch;

578

private final DatabaseTransaction transaction;

579

580

public MyCommittingSinkWriter(InitContext context) throws IOException {

581

this.transactionId = generateTransactionId(context);

582

this.currentBatch = new ArrayList<>();

583

this.transaction = new DatabaseTransaction(transactionId);

584

}

585

586

@Override

587

public void write(MyRecord element, Context context) throws IOException, InterruptedException {

588

currentBatch.add(element);

589

transaction.prepare(element);

590

}

591

592

@Override

593

public Collection<MyCommittable> prepareCommit() throws IOException, InterruptedException {

594

if (currentBatch.isEmpty()) {

595

return Collections.emptyList();

596

}

597

598

// Prepare transaction for commit

599

transaction.prepareForCommit();

600

601

MyCommittable committable = new MyCommittable(

602

transactionId,

603

new ArrayList<>(currentBatch),

604

System.currentTimeMillis()

605

);

606

607

currentBatch.clear();

608

return Collections.singletonList(committable);

609

}

610

611

@Override

612

public void flush(boolean endOfInput) throws IOException, InterruptedException {

613

// Ensure all data is prepared

614

prepareCommit();

615

}

616

617

@Override

618

public void close() throws Exception {

619

transaction.close();

620

}

621

622

private String generateTransactionId(InitContext context) {

623

return String.format("txn_%d_%d_%d",

624

context.getSubtaskId(),

625

context.getAttemptNumber(),

626

System.currentTimeMillis());

627

}

628

}

629

630

// Committer (second phase)

631

public class MyCommitter implements Committer<MyCommittable> {

632

633

@Override

634

public void commit(Collection<CommitRequest<MyCommittable>> requests)

635

throws IOException, InterruptedException {

636

637

for (CommitRequest<MyCommittable> request : requests) {

638

MyCommittable committable = request.getCommittable();

639

640

try {

641

// Commit the transaction

642

DatabaseTransaction transaction =

643

DatabaseTransaction.resume(committable.getTransactionId());

644

transaction.commit();

645

646

} catch (Exception e) {

647

// Handle commit failure

648

throw new IOException("Failed to commit transaction: " +

649

committable.getTransactionId(), e);

650

}

651

}

652

}

653

654

@Override

655

public void close() throws Exception {

656

// Cleanup resources

657

}

658

}

659

```

660

661

### Advanced Connector Features

662

663

#### Dynamic Parallelism Inference

664

665

```java { .api }

666

import org.apache.flink.api.connector.source.DynamicParallelismInference;

667

668

public class DynamicSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState>,

669

DynamicParallelismInference {

670

671

@Override

672

public int inferParallelism(SourceReaderFactory readerFactory) {

673

// Infer optimal parallelism based on source characteristics

674

int availableFiles = discoverAvailableFiles();

675

int maxParallelism = getMaxRecommendedParallelism();

676

677

return Math.min(availableFiles, maxParallelism);

678

}

679

680

private int discoverAvailableFiles() {

681

// Count available data partitions/files

682

return 10; // Example

683

}

684

685

private int getMaxRecommendedParallelism() {

686

// Based on external system limits or performance characteristics

687

return 50;

688

}

689

}

690

```

691

692

#### Rate Limiting

693

694

```java { .api }

695

import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;

696

697

public class RateLimitedSourceReader implements SourceReader<MyRecord, MySourceSplit> {

698

private final SourceReaderContext context;

699

private final RateLimiterStrategy rateLimiter;

700

701

public RateLimitedSourceReader(SourceReaderContext context) {

702

this.context = context;

703

this.rateLimiter = RateLimiterStrategy.perSecond(1000); // 1000 records/sec

704

}

705

706

@Override

707

public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {

708

// Check rate limit before processing

709

if (!rateLimiter.tryAcquire(1)) {

710

return InputStatus.NOTHING_AVAILABLE;

711

}

712

713

// Regular record processing

714

MyRecord record = readNextRecord();

715

if (record != null) {

716

output.collect(record);

717

return InputStatus.MORE_AVAILABLE;

718

}

719

720

return InputStatus.NOTHING_AVAILABLE;

721

}

722

723

private MyRecord readNextRecord() {

724

// Read from external source

725

return null; // Implementation specific

726

}

727

}

728

```

729

730

## Connector Utilities and Best Practices

731

732

### Error Handling and Retry Logic

733

734

```java { .api }

735

public class RobustSinkWriter implements SinkWriter<MyRecord> {

736

private final RetryPolicy retryPolicy;

737

private final DeadLetterQueue<MyRecord> dlq;

738

739

public RobustSinkWriter(InitContext context) {

740

this.retryPolicy = RetryPolicy.builder()

741

.maxAttempts(3)

742

.backoff(Duration.ofSeconds(1), Duration.ofSeconds(30))

743

.build();

744

this.dlq = new DeadLetterQueue<>();

745

}

746

747

@Override

748

public void write(MyRecord element, Context context) throws IOException, InterruptedException {

749

retryPolicy.execute(() -> {

750

try {

751

writeToExternalSystem(element);

752

} catch (TransientException e) {

753

throw new RetryableException("Transient error, will retry", e);

754

} catch (PermanentException e) {

755

// Send to dead letter queue

756

dlq.send(element, e);

757

return; // Don't retry permanent failures

758

}

759

});

760

}

761

762

private void writeToExternalSystem(MyRecord record) throws Exception {

763

// Implementation specific

764

}

765

}

766

```

767

768

### Monitoring and Metrics

769

770

```java { .api }

771

public class InstrumentedSourceReader implements SourceReader<MyRecord, MySourceSplit> {

772

private final Counter recordsRead;

773

private final Counter errors;

774

private final Histogram readLatency;

775

private final Gauge<Integer> pendingSplits;

776

777

public InstrumentedSourceReader(SourceReaderContext context) {

778

MetricGroup metricGroup = context.metricGroup();

779

780

this.recordsRead = metricGroup.counter("records_read");

781

this.errors = metricGroup.counter("errors");

782

this.readLatency = metricGroup.histogram("read_latency");

783

this.pendingSplits = metricGroup.gauge("pending_splits",

784

() -> this.pendingSplitQueue.size());

785

}

786

787

@Override

788

public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {

789

long startTime = System.nanoTime();

790

791

try {

792

MyRecord record = readRecord();

793

if (record != null) {

794

output.collect(record);

795

recordsRead.inc();

796

readLatency.update(System.nanoTime() - startTime);

797

return InputStatus.MORE_AVAILABLE;

798

}

799

return InputStatus.NOTHING_AVAILABLE;

800

801

} catch (Exception e) {

802

errors.inc();

803

throw e;

804

}

805

}

806

}

807

```

808

809

### Connector Configuration

810

811

```java { .api }

812

public class ConfigurableSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState> {

813

private final MySourceConfig config;

814

815

public ConfigurableSource(MySourceConfig config) {

816

this.config = config;

817

}

818

819

public static class MySourceConfig implements Serializable {

820

private final String connectionUrl;

821

private final int batchSize;

822

private final Duration pollInterval;

823

private final boolean enableMetrics;

824

825

private MySourceConfig(Builder builder) {

826

this.connectionUrl = builder.connectionUrl;

827

this.batchSize = builder.batchSize;

828

this.pollInterval = builder.pollInterval;

829

this.enableMetrics = builder.enableMetrics;

830

}

831

832

public static Builder builder() {

833

return new Builder();

834

}

835

836

public static class Builder {

837

private String connectionUrl;

838

private int batchSize = 1000;

839

private Duration pollInterval = Duration.ofSeconds(1);

840

private boolean enableMetrics = true;

841

842

public Builder connectionUrl(String url) {

843

this.connectionUrl = url;

844

return this;

845

}

846

847

public Builder batchSize(int size) {

848

this.batchSize = size;

849

return this;

850

}

851

852

public Builder pollInterval(Duration interval) {

853

this.pollInterval = interval;

854

return this;

855

}

856

857

public Builder enableMetrics(boolean enable) {

858

this.enableMetrics = enable;

859

return this;

860

}

861

862

public MySourceConfig build() {

863

Preconditions.checkNotNull(connectionUrl, "Connection URL is required");

864

return new MySourceConfig(this);

865

}

866

}

867

868

// Getters

869

public String getConnectionUrl() { return connectionUrl; }

870

public int getBatchSize() { return batchSize; }

871

public Duration getPollInterval() { return pollInterval; }

872

public boolean isMetricsEnabled() { return enableMetrics; }

873

}

874

}

875

876

// Usage

877

MySourceConfig config = MySourceConfig.builder()

878

.connectionUrl("jdbc:postgresql://localhost:5432/mydb")

879

.batchSize(500)

880

.pollInterval(Duration.ofSeconds(2))

881

.enableMetrics(true)

882

.build();

883

884

MySource source = new MySource(config);

885

```

886

887

Apache Flink's connector framework provides a powerful foundation for building efficient, fault-tolerant data sources and sinks. By understanding these APIs and following best practices, you can create connectors that integrate seamlessly with Flink's runtime and provide reliable data processing capabilities.