docs/connectors.md

# Connector Framework

Extensible connector architecture with built-in connectors for testing and development, plus a framework for custom connector development in Apache Flink's Table API.

## Capabilities

### Connector Framework Interfaces

Base interfaces for implementing custom table sources and sinks.

```java { .api }
/**
 * Base interface for dynamic table sources.
 */
public interface DynamicTableSource extends TableSource {
    /**
     * Create a copy of this source.
     *
     * @return New source instance with the same configuration
     */
    public DynamicTableSource copy();

    /**
     * Get a string summary of this source.
     *
     * @return Human-readable summary
     */
    public String asSummaryString();
}

/**
 * Interface for scan-based table sources (batch and streaming).
 */
public interface ScanTableSource extends DynamicTableSource {
    /**
     * Get the change log mode supported by this source.
     *
     * @return Change log mode (INSERT_ONLY, UPSERT, etc.)
     */
    public ChangelogMode getChangelogMode();

    /**
     * Create the actual source provider for runtime.
     *
     * @param context Context with parallelism and other runtime info
     * @return Source provider for the Flink runtime
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context);
}

/**
 * Interface for lookup table sources (for joins).
 */
public interface LookupTableSource extends DynamicTableSource {
    /**
     * Create the lookup runtime provider.
     *
     * @param context Context with lookup configuration
     * @return Runtime provider for lookup operations
     */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
}

/**
 * Base interface for dynamic table sinks.
 */
public interface DynamicTableSink extends TableSink {
    /**
     * Get the change log mode consumed by this sink.
     *
     * @param requestedMode Requested change log mode from query
     * @return Change log mode accepted by this sink
     */
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode);

    /**
     * Create the sink runtime provider.
     *
     * @param context Context with parallelism and other runtime info
     * @return Sink provider for the Flink runtime
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(SinkContext context);

    /**
     * Create a copy of this sink.
     *
     * @return New sink instance with the same configuration
     */
    public DynamicTableSink copy();

    /**
     * Get a string summary of this sink.
     *
     * @return Human-readable summary
     */
    public String asSummaryString();
}
```

### Connector Factory Interface

Interface for creating connectors dynamically based on configuration.

```java { .api }
/**
 * Factory interface for creating dynamic table sources.
 */
public interface DynamicTableSourceFactory extends Factory {
    /**
     * Create a table source based on the context.
     *
     * @param context Creation context with options and schema
     * @return Created table source instance
     */
    public DynamicTableSource createDynamicTableSource(Context context);
}

/**
 * Factory interface for creating dynamic table sinks.
 */
public interface DynamicTableSinkFactory extends Factory {
    /**
     * Create a table sink based on the context.
     *
     * @param context Creation context with options and schema
     * @return Created table sink instance
     */
    public DynamicTableSink createDynamicTableSink(Context context);
}

/**
 * Base factory interface with common metadata.
 */
public interface Factory {
    /**
     * Get the unique identifier for this factory.
     *
     * @return Factory identifier (e.g., "kafka", "filesystem")
     */
    public String factoryIdentifier();

    /**
     * Get required configuration options.
     *
     * @return Set of required configuration keys
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Get optional configuration options.
     *
     * @return Set of optional configuration keys
     */
    public Set<ConfigOption<?>> optionalOptions();
}
```

### Built-in Connectors

Ready-to-use connectors included in the uber JAR for testing and development.

```java { .api }
/**
 * Factory for the data generation connector (for testing).
 */
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
    @Override
    public String factoryIdentifier() {
        return "datagen";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Set.of(
            DataGenConnectorOptions.ROWS_PER_SECOND,
            DataGenConnectorOptions.NUMBER_OF_ROWS,
            DataGenConnectorOptions.FIELDS
        );
    }
}

/**
 * Factory for the print sink connector (for debugging).
 */
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
    @Override
    public String factoryIdentifier() {
        return "print";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Set.of(
            PrintConnectorOptions.PRINT_IDENTIFIER,
            PrintConnectorOptions.STANDARD_ERROR
        );
    }
}

/**
 * Factory for the blackhole sink connector (for performance testing).
 */
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
    @Override
    public String factoryIdentifier() {
        return "blackhole";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}
```

**Built-in Connector Usage:**

```java
// Data generation source for testing
tEnv.executeSql("CREATE TABLE test_source (" +
    "id BIGINT," +
    "name STRING," +
    "amount DECIMAL(10,2)," +
    "event_time TIMESTAMP(3)" +
    ") WITH (" +
    "'connector' = 'datagen'," +
    "'rows-per-second' = '100'," +
    "'number-of-rows' = '10000'," +
    "'fields.id.kind' = 'sequence'," +
    "'fields.id.start' = '1'," +
    "'fields.id.end' = '10000'," +
    "'fields.name.length' = '10'," +
    "'fields.amount.min' = '1.00'," +
    "'fields.amount.max' = '1000.00'" +
    ")");

// Print sink for debugging output
tEnv.executeSql("CREATE TABLE debug_sink (" +
    "id BIGINT," +
    "name STRING," +
    "amount DECIMAL(10,2)" +
    ") WITH (" +
    "'connector' = 'print'," +
    "'print-identifier' = 'debug'," +
    "'standard-error' = 'false'" +
    ")");

// Blackhole sink for performance testing
tEnv.executeSql("CREATE TABLE perf_sink (" +
    "id BIGINT," +
    "name STRING," +
    "processed_time TIMESTAMP(3)" +
    ") WITH (" +
    "'connector' = 'blackhole'" +
    ")");

// Use the connectors
tEnv.executeSql("INSERT INTO debug_sink SELECT id, name, amount FROM test_source");
```

### Connector Configuration Options

Configuration utilities for connector options and validation.

```java { .api }
/**
 * Configuration options for the DataGen connector.
 */
public class DataGenConnectorOptions {
    public static final ConfigOption<Long> ROWS_PER_SECOND = ConfigOptions
        .key("rows-per-second")
        .longType()
        .defaultValue(10000L)
        .withDescription("Rows per second to generate");

    public static final ConfigOption<Long> NUMBER_OF_ROWS = ConfigOptions
        .key("number-of-rows")
        .longType()
        .noDefaultValue()
        .withDescription("Total number of rows to generate");

    public static final ConfigOption<Map<String, String>> FIELDS = ConfigOptions
        .key("fields")
        .mapType()
        .noDefaultValue()
        .withDescription("Field-specific generation options");
}

/**
 * Configuration options for the Print connector.
 */
public class PrintConnectorOptions {
    public static final ConfigOption<String> PRINT_IDENTIFIER = ConfigOptions
        .key("print-identifier")
        .stringType()
        .noDefaultValue()
        .withDescription("Identifier for print output");

    public static final ConfigOption<Boolean> STANDARD_ERROR = ConfigOptions
        .key("standard-error")
        .booleanType()
        .defaultValue(false)
        .withDescription("Print to standard error instead of standard out");
}
```

### Connector Abilities

Interfaces for extending connector capabilities with additional features.

```java { .api }
/**
 * Ability for sources to support reading metadata.
 */
public interface SupportsReadingMetadata {
    /**
     * Get available metadata keys that can be read.
     *
     * @return Map of metadata key to data type
     */
    public Map<String, DataType> listReadableMetadata();

    /**
     * Apply metadata reading configuration.
     *
     * @param metadataKeys List of metadata keys to read
     * @param producedDataType Data type that includes metadata
     */
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);
}

/**
 * Ability for sinks to support writing metadata.
 */
public interface SupportsWritingMetadata {
    /**
     * Get available metadata keys that can be written.
     *
     * @return Map of metadata key to data type
     */
    public Map<String, DataType> listWritableMetadata();

    /**
     * Apply metadata writing configuration.
     *
     * @param metadataKeys List of metadata keys to write
     * @param consumedDataType Data type that includes metadata
     */
    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType);
}

/**
 * Ability for sources to support projection pushdown.
 */
public interface SupportsProjectionPushDown {
    /**
     * Apply projection pushdown optimization.
     *
     * @param projectedFields Indices of fields to project
     * @param producedDataType Data type after projection
     */
    public void applyProjection(int[][] projectedFields, DataType producedDataType);
}

/**
 * Ability for sources to support filter pushdown.
 */
public interface SupportsFilterPushDown {
    /**
     * Apply filter pushdown optimization.
     *
     * @param filters List of filters to push down
     * @return Result indicating which filters were accepted
     */
    public Result applyFilters(List<ResolvedExpression> filters);

    /**
     * Result of filter pushdown application.
     */
    public static final class Result {
        private final List<ResolvedExpression> acceptedFilters;
        private final List<ResolvedExpression> remainingFilters;

        private Result(List<ResolvedExpression> acceptedFilters,
                       List<ResolvedExpression> remainingFilters) {
            this.acceptedFilters = acceptedFilters;
            this.remainingFilters = remainingFilters;
        }

        public static Result of(List<ResolvedExpression> acceptedFilters,
                                List<ResolvedExpression> remainingFilters) {
            return new Result(acceptedFilters, remainingFilters);
        }

        public List<ResolvedExpression> getAcceptedFilters() { return acceptedFilters; }
        public List<ResolvedExpression> getRemainingFilters() { return remainingFilters; }
    }
}

/**
 * Ability for sinks to support overwrite mode.
 */
public interface SupportsOverwrite {
    /**
     * Apply overwrite mode configuration.
     *
     * @param overwrite Whether to overwrite existing data
     */
    public void applyOverwrite(boolean overwrite);
}

/**
 * Ability for sinks to support partitioning.
 */
public interface SupportsPartitioning {
    /**
     * Apply static partition configuration.
     *
     * @param partition Map of partition key name to partition value
     */
    public void applyStaticPartition(Map<String, String> partition);
}
```

### Custom Connector Development

Template and utilities for developing custom connectors.

```java { .api }
// Example custom source implementation
public class CustomTableSource implements ScanTableSource, SupportsReadingMetadata {
    private final ResolvedSchema schema;
    private final Map<String, String> options;
    private List<String> metadataKeys;

    public CustomTableSource(ResolvedSchema schema, Map<String, String> options) {
        this.schema = schema;
        this.options = options;
        this.metadataKeys = new ArrayList<>();
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return SourceProvider.of(new CustomSourceFunction(schema, options, metadataKeys));
    }

    @Override
    public Map<String, DataType> listReadableMetadata() {
        Map<String, DataType> metadata = new HashMap<>();
        metadata.put("timestamp", DataTypes.TIMESTAMP_LTZ(3));
        metadata.put("source-id", DataTypes.STRING());
        return metadata;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        this.metadataKeys = metadataKeys;
    }

    @Override
    public DynamicTableSource copy() {
        CustomTableSource copy = new CustomTableSource(schema, options);
        copy.metadataKeys = new ArrayList<>(this.metadataKeys);
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "CustomSource";
    }
}

// Example custom sink implementation
public class CustomTableSink implements DynamicTableSink, SupportsWritingMetadata {
    private final ResolvedSchema schema;
    private final Map<String, String> options;
    private List<String> metadataKeys;

    public CustomTableSink(ResolvedSchema schema, Map<String, String> options) {
        this.schema = schema;
        this.options = options;
        this.metadataKeys = new ArrayList<>();
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(SinkContext context) {
        return SinkProvider.of(new CustomSinkFunction(schema, options, metadataKeys));
    }

    @Override
    public Map<String, DataType> listWritableMetadata() {
        Map<String, DataType> metadata = new HashMap<>();
        metadata.put("timestamp", DataTypes.TIMESTAMP_LTZ(3));
        metadata.put("partition", DataTypes.STRING());
        return metadata;
    }

    @Override
    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
        this.metadataKeys = metadataKeys;
    }

    @Override
    public DynamicTableSink copy() {
        CustomTableSink copy = new CustomTableSink(schema, options);
        copy.metadataKeys = new ArrayList<>(this.metadataKeys);
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "CustomSink";
    }
}

// Custom factory implementation
public class CustomConnectorFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    @Override
    public String factoryIdentifier() {
        return "custom";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Set.of(CustomOptions.HOST, CustomOptions.PORT);
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Set.of(CustomOptions.USERNAME, CustomOptions.PASSWORD);
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        ReadableConfig options = helper.getOptions();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();

        return new CustomTableSource(schema, options.toMap());
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        ReadableConfig options = helper.getOptions();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();

        return new CustomTableSink(schema, options.toMap());
    }
}
```

### Connector Utilities

Helper utilities for connector development and configuration.

```java { .api }
/**
 * Utility for connector factory helpers.
 */
public class FactoryUtil {
    /**
     * Create a table factory helper for validation and option extraction.
     *
     * @param factory The connector factory
     * @param context Creation context
     * @return Helper for option validation and extraction
     */
    public static TableFactoryHelper createTableFactoryHelper(Factory factory,
                                                              DynamicTableFactory.Context context);

    public static final class TableFactoryHelper {
        /**
         * Validate required and optional options.
         */
        public void validate();

        /**
         * Get validated configuration options.
         *
         * @return ReadableConfig with validated options
         */
        public ReadableConfig getOptions();
    }
}

/**
 * Utility for changelog mode operations.
 */
public class ChangelogMode {
    /**
     * Create insert-only changelog mode.
     *
     * @return ChangelogMode for append-only sources/sinks
     */
    public static ChangelogMode insertOnly();

    /**
     * Create upsert changelog mode.
     *
     * @return ChangelogMode supporting inserts, updates, and deletes
     */
    public static ChangelogMode upsert();

    /**
     * Create all-changes changelog mode.
     *
     * @return ChangelogMode supporting all change types
     */
    public static ChangelogMode all();
}
```

### Testing Connectors

Utilities and patterns for testing custom connectors.

```java { .api }
// Test utilities for connector development
public class ConnectorTestUtils {
    /**
     * Create a test table environment for connector testing.
     *
     * @return TableEnvironment configured for testing
     */
    public static TableEnvironment createTestTableEnvironment() {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .inStreamingMode()
            .build();
        return TableEnvironment.create(settings);
    }

    /**
     * Execute a connector test with sample data.
     *
     * @param sourceConnector Source connector configuration
     * @param sinkConnector Sink connector configuration
     * @param testData Sample data for testing
     */
    public static void executeConnectorTest(String sourceConnector,
                                            String sinkConnector,
                                            List<Row> testData) {
        // Implementation for automated connector testing
    }
}

// Example connector test
@Test
public void testCustomConnector() {
    TableEnvironment tEnv = ConnectorTestUtils.createTestTableEnvironment();

    // Register custom connector tables
    tEnv.executeSql("CREATE TABLE source_table (" +
        "id BIGINT," +
        "name STRING," +
        "amount DECIMAL(10,2)" +
        ") WITH (" +
        "'connector' = 'custom'," +
        "'host' = 'localhost'," +
        "'port' = '9092'" +
        ")");

    tEnv.executeSql("CREATE TABLE sink_table (" +
        "id BIGINT," +
        "name STRING," +
        "amount DECIMAL(10,2)" +
        ") WITH (" +
        "'connector' = 'print'" +
        ")");

    // Test data pipeline
    TableResult result = tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");

    // Verify results
    assertThat(result.getJobClient()).isPresent();
}
```