<!-- docs/connector-integration.md -->

# Connector Integration

Connector integration interfaces and utilities enable seamless integration between custom table sources/sinks and the Flink planner. These components provide the bridge between external data systems and Flink's internal execution model.

## Package Information

```java
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
import org.apache.flink.table.planner.connectors.DynamicSinkUtils;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.data.RowData;
```

## Capabilities

### TransformationScanProvider

Provider interface for transformation-based table sources, enabling direct integration with Flink's transformation API.

```java { .api }
public interface TransformationScanProvider extends ScanTableSource.ScanRuntimeProvider {

    /**
     * Creates the transformation for this scan provider.
     *
     * @param context The context containing runtime information
     * @return Transformation that produces the scanned data
     */
    Transformation<RowData> createTransformation(Context context);

    /**
     * Context interface providing runtime information for transformation creation.
     */
    interface Context {
        String getTableName();
        Configuration getConfiguration();
        ClassLoader getClassLoader();
        int getParallelism();
    }
}
```

The `TransformationScanProvider` allows table sources to directly provide Flink transformations rather than going through the DataStream API. This provides more control over the execution graph and better integration with the planner's optimization process.

**Usage Example:**

```java
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.transformations.SourceTransformation;

public class MyTableSource implements ScanTableSource {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                // Create source operator
                SourceOperator<RowData> sourceOperator = new SourceOperator<>(
                    mySourceFunction,
                    WatermarkStrategy.noWatermarks(),
                    SimpleVersionedSerializerAdapter.create(mySerializer)
                );

                // Create and configure transformation
                SourceTransformation<RowData> transformation =
                    new SourceTransformation<>(
                        "MyTableSource",
                        sourceOperator,
                        TypeInformation.of(RowData.class),
                        providerContext.getParallelism()
                    );

                return transformation;
            }
        };
    }
}
```

### TransformationSinkProvider

Provider interface for transformation-based table sinks, enabling direct integration with Flink's transformation API for data output.

```java { .api }
public interface TransformationSinkProvider extends DynamicTableSink.SinkRuntimeProvider {

    /**
     * Creates the transformation for this sink provider.
     *
     * @param context The context containing runtime information
     * @return Transformation that consumes the sink data
     */
    Transformation<?> createTransformation(Context context);

    /**
     * Context interface providing runtime information for transformation creation.
     */
    interface Context {
        String getTableName();
        Configuration getConfiguration();
        ClassLoader getClassLoader();
        int getParallelism();
        Transformation<RowData> getInputTransformation();
    }
}
```

The `TransformationSinkProvider` enables table sinks to integrate directly with the transformation graph, providing precise control over how data flows into external systems.

**Usage Example:**

```java
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.SinkTransformation;

public class MyTableSink implements DynamicTableSink {

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        return new TransformationSinkProvider() {
            @Override
            public Transformation<?> createTransformation(Context providerContext) {
                // Get input transformation
                Transformation<RowData> input = providerContext.getInputTransformation();

                // Create sink operator
                StreamSink<RowData> sinkOperator = new StreamSink<>(mySinkFunction);

                // Create sink transformation
                SinkTransformation<RowData> transformation =
                    new SinkTransformation<>(
                        input,
                        "MyTableSink",
                        sinkOperator,
                        providerContext.getParallelism()
                    );

                return transformation;
            }
        };
    }
}
```

### DynamicSourceUtils

Utility class for converting dynamic table sources to relational nodes in the optimization process.

```java { .api }
public final class DynamicSourceUtils {

    /**
     * Converts a DataStream to a RelNode for integration with Calcite optimization.
     */
    public static RelNode convertDataStreamToRel(
        StreamTableEnvironment tableEnv,
        DataStream<RowData> dataStream,
        List<String> fieldNames
    );

    /**
     * Converts a table source to a RelNode with statistics for optimization.
     */
    public static RelNode convertSourceToRel(
        FlinkOptimizeContext optimizeContext,
        RelOptTable relOptTable,
        DynamicTableSource tableSource,
        FlinkStatistic statistic
    );

    /**
     * Creates a scan rel node from a table source.
     */
    public static RelNode createScanRelNode(
        FlinkOptimizeContext optimizeContext,
        RelOptTable relOptTable,
        DynamicTableSource tableSource
    );
}
```

**Usage Example:**

```java
import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
import org.apache.calcite.rel.RelNode;

// Convert DataStream to RelNode for optimization
DataStream<RowData> sourceStream = // your data stream
List<String> fieldNames = Arrays.asList("id", "name", "timestamp");

RelNode relNode = DynamicSourceUtils.convertDataStreamToRel(
    tableEnv,
    sourceStream,
    fieldNames
);

// Convert table source with statistics
FlinkStatistic statistics = FlinkStatistic.builder()
    .tableStats(new TableStats(1000000L)) // 1M rows estimated
    .build();

RelNode optimizedRel = DynamicSourceUtils.convertSourceToRel(
    optimizeContext,
    relOptTable,
    myTableSource,
    statistics
);
```

### DynamicSinkUtils

Utility class for converting dynamic table sinks to relational nodes and managing sink operations.

```java { .api }
public final class DynamicSinkUtils {

    /**
     * Converts a collect sink to a RelNode for query planning.
     */
    public static RelNode convertCollectToRel(
        FlinkOptimizeContext optimizeContext,
        RelNode input,
        DynamicTableSink tableSink,
        String sinkName
    );

    /**
     * Converts a table sink to a RelNode for integration with optimization.
     */
    public static RelNode convertSinkToRel(
        FlinkOptimizeContext optimizeContext,
        RelNode input,
        RelOptTable relOptTable,
        DynamicTableSink tableSink,
        String sinkName
    );

    /**
     * Validates sink compatibility with input schema.
     */
    public static void validateSchemaCompatibility(
        ResolvedSchema inputSchema,
        ResolvedSchema sinkSchema,
        String sinkName
    );
}
```

**Usage Example:**

```java
import org.apache.flink.table.planner.connectors.DynamicSinkUtils;

// Convert sink to RelNode for optimization
RelNode inputRel = // input relation from query
RelNode sinkRel = DynamicSinkUtils.convertSinkToRel(
    optimizeContext,
    inputRel,
    relOptTable,
    myTableSink,
    "my_output_table"
);

// Validate schema compatibility
ResolvedSchema inputSchema = // schema from query result
ResolvedSchema sinkSchema = // schema declared by the sink table
// (note: myTableSink.getConsumedDataType() yields a DataType, not a ResolvedSchema)

DynamicSinkUtils.validateSchemaCompatibility(
    inputSchema,
    sinkSchema,
    "my_output_table"
);
```

### ShortcutUtils

Utility methods for shortcut operations and performance optimizations in connector integration.

```java { .api }
public final class ShortcutUtils {

    /**
     * Determines if a shortcut can be applied for the given operation.
     */
    public static boolean canApplyShortcut(
        RelNode input,
        TableSink<?> tableSink
    );

    /**
     * Applies shortcut optimization to bypass unnecessary transformations.
     */
    public static Transformation<?> applyShortcut(
        RelNode input,
        TableSink<?> tableSink,
        String sinkName
    );

    /**
     * Checks if a source supports pushed-down predicates.
     */
    public static boolean supportsPredicatePushDown(
        DynamicTableSource tableSource,
        List<Expression> predicates
    );
}
```

**Usage Example:**

```java
import org.apache.flink.table.planner.utils.ShortcutUtils;

// Check if shortcut optimization can be applied
if (ShortcutUtils.canApplyShortcut(inputRel, tableSink)) {
    // Apply shortcut to bypass unnecessary operations
    Transformation<?> optimizedTransformation = ShortcutUtils.applyShortcut(
        inputRel,
        tableSink,
        "output_table"
    );
}

// Check predicate pushdown support
List<Expression> predicates = // filter predicates from query
if (ShortcutUtils.supportsPredicatePushDown(tableSource, predicates)) {
    // Enable predicate pushdown optimization
    tableSource.applyFilters(predicates);
}
```

### DataViewUtils

Utilities for DataView operations in aggregations, essential for stateful stream processing with custom aggregates.

```java { .api }
public final class DataViewUtils {

    /**
     * Creates a state descriptor for DataView storage.
     */
    public static <T> ValueStateDescriptor<T> createDataViewStateDescriptor(
        String name,
        Class<T> dataViewClass,
        TypeInformation<T> typeInfo
    );

    /**
     * Binds DataView to state backend for persistence.
     */
    public static void bindDataViewToState(
        Object dataView,
        RuntimeContext runtimeContext,
        String stateName
    );

    /**
     * Cleans up DataView state when no longer needed.
     */
    public static void cleanupDataViewState(
        RuntimeContext runtimeContext,
        String stateName
    );

    /**
     * Checks if a class contains DataView fields.
     */
    public static boolean hasDataViewFields(Class<?> clazz);
}
```

**Usage Example:**

```java
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.api.common.state.ValueStateDescriptor;

// Create state descriptor for custom aggregate DataView
public class MyAggregateFunction extends TableAggregateFunction<Row, MyAccumulator> {

    public static class MyAccumulator {
        public MapView<String, Integer> dataView; // Custom DataView
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        // Check if accumulator has DataView fields
        if (DataViewUtils.hasDataViewFields(MyAccumulator.class)) {
            // Create state descriptor
            ValueStateDescriptor<MapView<String, Integer>> stateDesc =
                DataViewUtils.createDataViewStateDescriptor(
                    "myDataView",
                    MapView.class,
                    Types.MAP(Types.STRING, Types.INT)
                );

            // Bind DataView to state backend
            DataViewUtils.bindDataViewToState(
                accumulator.dataView,
                getRuntimeContext(),
                "myDataView"
            );
        }
    }
}
```

## Integration Patterns

### Source Integration Pattern

```java
// Complete source integration example
public class MyCustomSource implements ScanTableSource, SupportsFilterPushDown {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                // Create optimized source transformation
                return createOptimizedSourceTransformation(providerContext);
            }
        };
    }

    @Override
    public Result applyFilters(List<Expression> filters) {
        // Implement predicate pushdown
        List<Expression> acceptedFilters = new ArrayList<>();
        List<Expression> remainingFilters = new ArrayList<>();

        for (Expression filter : filters) {
            if (canPushDownFilter(filter)) {
                acceptedFilters.add(filter);
            } else {
                remainingFilters.add(filter);
            }
        }

        return Result.of(acceptedFilters, remainingFilters);
    }
}
```

### Sink Integration Pattern

```java
// Complete sink integration example
public class MyCustomSink implements DynamicTableSink, SupportsPartitioning {

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new TransformationSinkProvider() {
            @Override
            public Transformation<?> createTransformation(Context providerContext) {
                // Create optimized sink transformation
                return createOptimizedSinkTransformation(providerContext);
            }
        };
    }

    @Override
    public boolean requiresPartitionGrouping(boolean supportsGrouping) {
        // Enable partition-aware processing
        return true;
    }
}
```

### Error Handling in Connectors

```java
// Robust error handling pattern
public class RobustTableSource implements ScanTableSource {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                try {
                    return createSourceTransformation(providerContext);
                } catch (Exception e) {
                    throw new TableException(
                        "Failed to create source transformation for table: " +
                            providerContext.getTableName(), e
                    );
                }
            }
        };
    }
}
```