
# Modern Connector Framework

The Modern Connector Framework in the Flink Table API Java Bridge provides connector interfaces following the FLIP-95 design. These interfaces offer tighter integration with the DataStream API and greater flexibility for connector development.

## Overview

The modern connector framework introduces provider-based interfaces that integrate directly with Flink's DataStream API, replacing the legacy table source/sink interfaces. This approach provides better type safety, improved performance, and a cleaner separation of concerns.
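
Concretely, tables backed by such connectors are declared and queried through the Table API like any other. A minimal usage sketch, assuming a hypothetical connector registered under the identifier `my-connector`:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ConnectorUsageSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 'my-connector' is a hypothetical identifier; it would be supplied
        // by the connector's DynamicTableSourceFactory
        tableEnv.executeSql(
                "CREATE TABLE my_table (id BIGINT, name STRING) "
                        + "WITH ('connector' = 'my-connector')");

        // The planner resolves the table to the connector's runtime provider
        tableEnv.executeSql("SELECT id, name FROM my_table").print();
    }
}
```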

## Source Providers

### DataStreamScanProvider

The `DataStreamScanProvider` interface allows a connector to produce a `DataStream` directly:

```java { .api }
@PublicEvolving
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {
    DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv);
}
```

**Usage Example:**

```java
public class MyDataStreamScanProvider implements DataStreamScanProvider {
    private final MySourceConfig config;

    public MyDataStreamScanProvider(MySourceConfig config) {
        this.config = config;
    }

    @Override
    public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new MySourceFunction(config))
                .map(new MyRowDataMapper());
    }

    @Override
    public boolean isBounded() {
        return false; // streaming source
    }
}

// In your ScanTableSource implementation
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    return new MyDataStreamScanProvider(sourceConfig);
}
```

### SourceFunctionProvider

The `SourceFunctionProvider` interface creates a scan source from Flink's `SourceFunction`:

```java { .api }
@PublicEvolving
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider {
    static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, boolean isBounded);

    SourceFunction<RowData> createSourceFunction();

    // Note: isBounded() is inherited from the ScanRuntimeProvider parent interface
}
```

**Usage Example:**

```java
// Create a bounded source function provider
SourceFunction<RowData> mySourceFunction = new MyBoundedSourceFunction(config);
SourceFunctionProvider provider = SourceFunctionProvider.of(mySourceFunction, true);

// Create an unbounded source function provider
SourceFunction<RowData> streamingSource = new MyStreamingSourceFunction(config);
SourceFunctionProvider streamingProvider = SourceFunctionProvider.of(streamingSource, false);

// In your ScanTableSource implementation
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    if (config.isBounded()) {
        return SourceFunctionProvider.of(new MyBoundedSourceFunction(config), true);
    } else {
        return SourceFunctionProvider.of(new MyStreamingSourceFunction(config), false);
    }
}
```

## Sink Providers

### DataStreamSinkProvider

The `DataStreamSinkProvider` interface allows a connector to consume a `DataStream` directly:

```java { .api }
@PublicEvolving
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider {
    DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);

    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
```

**Usage Example:**

```java
public class MyDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;

    public MyDataStreamSinkProvider(MySinkConfig config) {
        this.config = config;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        return dataStream
                .map(new MyRowDataConverter(config))
                .addSink(new MySinkFunction(config))
                .name("My Custom Sink");
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.ofNullable(config.getParallelism());
    }
}

// In your DynamicTableSink implementation
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return new MyDataStreamSinkProvider(sinkConfig);
}
```

### SinkFunctionProvider

The `SinkFunctionProvider` interface creates a sink from Flink's `SinkFunction`:

```java { .api }
@PublicEvolving
public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
    static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction);
    static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, @Nullable Integer parallelism);

    SinkFunction<RowData> createSinkFunction();

    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
```

**Usage Example:**

```java
// Create a sink function provider without a parallelism constraint
SinkFunction<RowData> mySinkFunction = new MySinkFunction(config);
SinkFunctionProvider provider = SinkFunctionProvider.of(mySinkFunction);

// Create a sink function provider with a specific parallelism
SinkFunction<RowData> parallelSink = new MyParallelSinkFunction(config);
SinkFunctionProvider parallelProvider = SinkFunctionProvider.of(parallelSink, 4);

// In your DynamicTableSink implementation
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    SinkFunction<RowData> sinkFunction = new MySinkFunction(sinkConfig);

    if (sinkConfig.getParallelism() != null) {
        return SinkFunctionProvider.of(sinkFunction, sinkConfig.getParallelism());
    } else {
        return SinkFunctionProvider.of(sinkFunction);
    }
}
```

## Complete Connector Implementation Examples

### Custom DataStream Source

```java
public class MyCustomTableSource implements ScanTableSource {
    private final MySourceConfig config;
    private final ResolvedSchema resolvedSchema;

    public MyCustomTableSource(MySourceConfig config, ResolvedSchema resolvedSchema) {
        this.config = config;
        this.resolvedSchema = resolvedSchema;
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new DataStreamScanProvider() {
            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv
                        .addSource(new MySourceFunction(config))
                        .map(new MyToRowDataMapper(resolvedSchema))
                        .returns(context.createTypeInformation(
                                resolvedSchema.toPhysicalRowDataType()));
            }

            @Override
            public boolean isBounded() {
                return false; // streaming source
            }
        };
    }

    @Override
    public DynamicTableSource copy() {
        return new MyCustomTableSource(config, resolvedSchema);
    }

    @Override
    public String asSummaryString() {
        return "MyCustomSource";
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }
}
```

### Custom DataStream Sink

```java
public class MyCustomTableSink implements DynamicTableSink {
    private final MySinkConfig config;
    private final ResolvedSchema resolvedSchema;

    public MyCustomTableSink(MySinkConfig config, ResolvedSchema resolvedSchema) {
        this.config = config;
        this.resolvedSchema = resolvedSchema;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
                return dataStream
                        .map(new MyFromRowDataMapper(resolvedSchema))
                        .addSink(new MySinkFunction(config))
                        .name("MyCustomSink");
            }

            @Override
            public Optional<Integer> getParallelism() {
                return Optional.ofNullable(config.getParallelism());
            }
        };
    }

    @Override
    public DynamicTableSink copy() {
        return new MyCustomTableSink(config, resolvedSchema);
    }

    @Override
    public String asSummaryString() {
        return "MyCustomSink";
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }
}
```

## Advanced Integration Patterns

### Source with Watermark Strategy

```java
public class WatermarkedDataStreamScanProvider implements DataStreamScanProvider {
    private final MySourceConfig config;
    private final WatermarkStrategy<MyRecord> watermarkStrategy;

    @Override
    public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv
                .fromSource(
                        new MyFlinkSource(config),
                        watermarkStrategy,
                        "MyWatermarkedSource")
                .map(new MyToRowDataMapper());
    }
}
```

### Sink with State Backend Integration

```java
public class StatefulDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        return dataStream
                .keyBy(new MyKeySelector())
                // KeyedProcessFunction that maintains keyed state before emitting
                .process(new MyStatefulSinkFunction(config))
                .addSink(new MyOutputSink(config));
    }
}
```

### Connector with Custom Serialization

```java
public class SerializationAwareDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;
    private final ResolvedSchema schema;

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        // Create a custom serializer based on the resolved schema
        MyCustomSerializer serializer = new MyCustomSerializer(schema);

        return dataStream
                .map(rowData -> serializer.serialize(rowData))
                .addSink(new MySinkFunction(config));
    }
}
```

## Error Handling and Resilience

### Source Error Handling

```java
public class ResilientDataStreamScanProvider implements DataStreamScanProvider {
    private final MySourceConfig config;

    @Override
    public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv
                .addSource(new MySourceFunction(config))
                .map(new MyToRowDataMapper())
                .process(new ProcessFunction<RowData, RowData>() {
                    @Override
                    public void processElement(RowData value, Context ctx, Collector<RowData> out) {
                        try {
                            // Validate and forward
                            validateRowData(value);
                            out.collect(value);
                        } catch (Exception e) {
                            // Count the malformed record; it could also be sent to a side output
                            getRuntimeContext().getMetricGroup()
                                    .counter("malformed_records")
                                    .inc();
                        }
                    }
                });
    }
}
```

### Sink Error Handling

```java
public class ResilientDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;

    // Side output for records that fail conversion
    private static final OutputTag<ErrorRecord> errorOutputTag =
            new OutputTag<ErrorRecord>("error-records") {};

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        return dataStream
                .map(new MyRowDataConverter())
                .process(new ProcessFunction<MyRecord, MyRecord>() {
                    @Override
                    public void processElement(MyRecord value, Context ctx, Collector<MyRecord> out) {
                        try {
                            out.collect(value);
                        } catch (Exception e) {
                            // Route failed records to the error side output
                            ctx.output(errorOutputTag, new ErrorRecord(value, e));
                        }
                    }
                })
                .addSink(new MyResilientSinkFunction(config));
    }
}
```

## Performance Optimization

### Batching in Sinks

```java
public class BatchingDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        return dataStream
                .map(new MyRowDataConverter())
                // countWindowAll collects records into fixed-size batches
                // (note: it runs with parallelism 1)
                .countWindowAll(config.getBatchSize())
                .apply(new AllWindowFunction<MyRecord, List<MyRecord>, GlobalWindow>() {
                    @Override
                    public void apply(GlobalWindow window,
                                      Iterable<MyRecord> values,
                                      Collector<List<MyRecord>> out) {
                        List<MyRecord> batch = new ArrayList<>();
                        values.forEach(batch::add);
                        out.collect(batch);
                    }
                })
                .addSink(new MyBatchingSinkFunction(config));
    }
}
```

### Parallel Processing

```java
public class ParallelDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        return dataStream
                .rebalance() // Distribute records evenly across parallel instances
                .map(new MyRowDataConverter())
                .addSink(new MySinkFunction(config))
                .setParallelism(config.getParallelism());
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(config.getParallelism());
    }
}
```

## Migration from Legacy Interfaces

When migrating from the legacy `StreamTableSource`/`StreamTableSink` interfaces:

1. **Replace TableSource**: Implement `DynamicTableSource` with a `DataStreamScanProvider`
2. **Replace TableSink**: Implement `DynamicTableSink` with a `DataStreamSinkProvider`
3. **Update Factory**: Implement `DynamicTableSourceFactory`/`DynamicTableSinkFactory`
4. **Handle Configuration**: Use `ConfigOption` instead of string-based properties
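
For step 4, options are declared with Flink's `ConfigOptions` builder rather than raw property strings. A sketch, assuming hypothetical `hostname` and `port` options for an example connector:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class MyConnectorOptions {
    // Hypothetical options for the example connectors in this document
    public static final ConfigOption<String> HOSTNAME =
            ConfigOptions.key("hostname")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Host of the external system.");

    public static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port")
                    .intType()
                    .defaultValue(5432)
                    .withDescription("Port of the external system.");
}
```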

**Before (Legacy):**

```java
public class LegacyTableSource implements StreamTableSource<Row> {
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new MySourceFunction());
    }
}
```

**After (Modern):**

```java
public class ModernTableSource implements ScanTableSource {
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new DataStreamScanProvider() {
            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv.addSource(new MySourceFunction())
                        .map(new MyRowDataMapper());
            }

            @Override
            public boolean isBounded() {
                return false;
            }
        };
    }

    // copy(), asSummaryString(), and getChangelogMode() omitted for brevity
}
```
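
Step 3 of the migration list calls for a factory, which is how the planner discovers the connector from the `'connector'` option. A sketch, assuming the hypothetical `MyCustomTableSource` and `MySourceConfig` from earlier examples and a hypothetical `MyConnectorOptions` class holding the connector's `ConfigOption`s:

```java
import java.util.Collections;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class MyConnectorFactory implements DynamicTableSourceFactory {

    @Override
    public String factoryIdentifier() {
        return "my-connector"; // matches 'connector' = 'my-connector' in DDL
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.singleton(MyConnectorOptions.HOSTNAME);
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.singleton(MyConnectorOptions.PORT);
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate(); // rejects unknown or missing options

        ReadableConfig options = helper.getOptions();
        MySourceConfig config = new MySourceConfig(
                options.get(MyConnectorOptions.HOSTNAME),
                options.get(MyConnectorOptions.PORT));
        return new MyCustomTableSource(
                config, context.getCatalogTable().getResolvedSchema());
    }
}
```

For discovery, the factory's fully qualified class name must also be listed in `META-INF/services/org.apache.flink.table.factories.Factory`.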