# Legacy Connector Support

The Flink Table API Java Bridge maintains backward compatibility with legacy connector interfaces that were used before the introduction of the modern connector framework (FLIP-95). These interfaces are deprecated but still supported for existing connector implementations.

## Legacy Factory Interfaces

### StreamTableSourceFactory

Factory interface for creating legacy stream table sources:

```java { .api }
@Deprecated
@PublicEvolving
public interface StreamTableSourceFactory<T> extends TableSourceFactory<T> {
    StreamTableSource<T> createStreamTableSource(Map<String, String> properties);

    // Inherited from TableSourceFactory
    TableSource<T> createTableSource(Map<String, String> properties);
}
```

**Migration Note**: New implementations should use `DynamicTableSourceFactory` instead.

### StreamTableSinkFactory

Factory interface for creating legacy stream table sinks:

```java { .api }
@Deprecated
@PublicEvolving
public interface StreamTableSinkFactory<T> extends TableSinkFactory<T> {
    StreamTableSink<T> createStreamTableSink(Map<String, String> properties);

    // Inherited from TableSinkFactory
    TableSink<T> createTableSink(Map<String, String> properties);
}
```

**Migration Note**: New implementations should use `DynamicTableSinkFactory` instead.

## Legacy Source Interfaces

### StreamTableSource

Base interface for legacy streaming table sources:

```java { .api }
@Deprecated
public interface StreamTableSource<T> extends TableSource<T> {
    default boolean isBounded() {
        return false;
    }

    DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}
```

**Usage Example (Deprecated):**

```java
@Deprecated
public class MyLegacyTableSource implements StreamTableSource<Row> {
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    private final MySourceConfig config;

    public MyLegacyTableSource(String[] fieldNames, TypeInformation<?>[] fieldTypes, MySourceConfig config) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.config = config;
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv
            .addSource(new MySourceFunction(config))
            .map(new MyRowMapper());
    }

    @Override
    public boolean isBounded() {
        return config.isBounded();
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
            .fields(fieldNames, fieldTypes)
            .build();
    }

    @Override
    public String explainSource() {
        return "MyLegacyTableSource";
    }
}
```

### InputFormatTableSource

Abstract class for bounded table sources based on InputFormat:

```java { .api }
@Deprecated
@Experimental
public abstract class InputFormatTableSource<T> implements StreamTableSource<T> {
    public abstract InputFormat<T, ?> getInputFormat();

    @Override
    public final boolean isBounded() {
        return true;
    }

    @Override
    public final DataStream<T> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.createInput(getInputFormat(), getReturnType());
    }
}
```

**Usage Example (Deprecated):**

```java
@Deprecated
public class MyInputFormatTableSource extends InputFormatTableSource<Row> {
    private final MyInputFormat inputFormat;
    private final RowTypeInfo returnType;

    public MyInputFormatTableSource(MyInputFormat inputFormat, RowTypeInfo returnType) {
        this.inputFormat = inputFormat;
        this.returnType = returnType;
    }

    @Override
    public InputFormat<Row, ?> getInputFormat() {
        return inputFormat;
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return returnType;
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(returnType);
    }
}
```

## Legacy Sink Interfaces

### StreamTableSink

Base interface for legacy streaming table sinks:

```java { .api }
@Deprecated
public interface StreamTableSink<T> extends TableSink<T> {
    DataStreamSink<?> consumeDataStream(DataStream<T> dataStream);
}
```

**Usage Example (Deprecated):**

```java
@Deprecated
public class MyLegacyTableSink implements StreamTableSink<Row> {
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    private final MySinkConfig config;

    // Initializes the final fields; configure() below relies on this constructor
    public MyLegacyTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes, MySinkConfig config) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.config = config;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream
            .map(new MyRowConverter(config))
            .addSink(new MySinkFunction(config))
            .name("MyLegacySink");
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // Return a new sink instance bound to the given schema
        return new MyLegacyTableSink(fieldNames, fieldTypes, config);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
            .fields(fieldNames, fieldTypes)
            .build();
    }
}
```

### AppendStreamTableSink

Interface for append-only stream table sinks:

```java { .api }
@Deprecated
@PublicEvolving
public interface AppendStreamTableSink<T> extends StreamTableSink<T> {
    // Inherits all methods from StreamTableSink
    // Semantically indicates append-only capability
}
```

### RetractStreamTableSink

Interface for retractable stream table sinks that can handle updates:

```java { .api }
@Deprecated
@PublicEvolving
public interface RetractStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {
    TypeInformation<T> getRecordType();

    default TypeInformation<Tuple2<Boolean, T>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
    }
}
```

**Usage Example (Deprecated):**

```java
@Deprecated
public class MyRetractStreamTableSink implements RetractStreamTableSink<Row> {
    // Schema fields used by getRecordType(); configure() and getTableSchema() are omitted for brevity
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;

    public MyRetractStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        return dataStream
            .process(new ProcessFunction<Tuple2<Boolean, Row>, MyRecord>() {
                @Override
                public void processElement(Tuple2<Boolean, Row> value, Context ctx, Collector<MyRecord> out) {
                    // f0 == true is an add message, f0 == false is a retract message
                    Boolean isInsert = value.f0;
                    Row row = value.f1;

                    if (isInsert) {
                        out.collect(MyRecord.fromRow(row, ChangeType.INSERT));
                    } else {
                        out.collect(MyRecord.fromRow(row, ChangeType.DELETE));
                    }
                }
            })
            .addSink(new MyChangelogSinkFunction());
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }
}
```
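
To make the `Tuple2<Boolean, T>` encoding more concrete, here is a minimal plain-JDK sketch of how a consumer might materialize a retract stream. It does not use any Flink classes: `RetractStreamSketch` and `materialize` are hypothetical names introduced only for illustration, and records are modeled as plain strings. An update to a row arrives as a retract message for the old value followed by an add message for the new one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-JDK illustration of retract-stream semantics; not a Flink API. */
final class RetractStreamSketch {

    /**
     * Applies (flag, record) messages to a multiset of records.
     * flag == true adds one copy of the record; flag == false retracts one copy.
     */
    static Map<String, Integer> materialize(List<Map.Entry<Boolean, String>> messages) {
        Map<String, Integer> counts = new HashMap<>();
        for (Map.Entry<Boolean, String> message : messages) {
            int delta = message.getKey() ? 1 : -1;
            // merge() stores delta for a new record, otherwise sums the counts;
            // returning null from the remapping function removes the entry.
            counts.merge(message.getValue(), delta, (current, change) -> {
                int updated = current + change;
                return updated == 0 ? null : updated;
            });
        }
        return counts;
    }
}
```

Feeding it an add for `"alice,1"`, a retract for `"alice,1"`, and an add for `"alice,2"` leaves only `"alice,2"` in the result, which is how an update is expressed in this encoding.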

### UpsertStreamTableSink

Interface for upsert stream table sinks that can handle insert/update/delete operations:

```java { .api }
@Deprecated
@PublicEvolving
public interface UpsertStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {
    void setKeyFields(String[] keys);
    void setIsAppendOnly(Boolean isAppendOnly);
    TypeInformation<T> getRecordType();

    default TypeInformation<Tuple2<Boolean, T>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
    }
}
```

**Usage Example (Deprecated):**

```java
@Deprecated
public class MyUpsertStreamTableSink implements UpsertStreamTableSink<Row> {
    // Schema fields used by getRecordType(); configure() and getTableSchema() are omitted for brevity
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    private String[] keyFields;
    private Boolean isAppendOnly;

    public MyUpsertStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    @Override
    public void setKeyFields(String[] keys) {
        this.keyFields = keys;
    }

    @Override
    public void setIsAppendOnly(Boolean isAppendOnly) {
        this.isAppendOnly = isAppendOnly;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        return dataStream
            .keyBy(new KeySelector<Tuple2<Boolean, Row>, String>() {
                @Override
                public String getKey(Tuple2<Boolean, Row> value) throws Exception {
                    Row row = value.f1;
                    // Build key from key fields
                    StringBuilder keyBuilder = new StringBuilder();
                    for (String keyField : keyFields) {
                        int index = getFieldIndex(keyField);
                        keyBuilder.append(row.getField(index)).append("|");
                    }
                    return keyBuilder.toString();
                }
            })
            .addSink(new MyUpsertSinkFunction(keyFields, isAppendOnly));
    }

    // Helper: position of a field name in the schema, or -1 if it is not present
    private int getFieldIndex(String fieldName) {
        return java.util.Arrays.asList(fieldNames).indexOf(fieldName);
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }
}
```
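
For comparison with the retract encoding, the following plain-JDK sketch shows how upsert messages could be applied to keyed state. It is only an illustration under simplifying assumptions: `UpsertStreamSketch`, `compositeKey`, `apply`, and `snapshot` are hypothetical names, rows are modeled as strings, and no Flink classes are involved. A `true` flag inserts or overwrites the row for its key, and a `false` flag deletes it, so an update needs a single message rather than a retract/add pair.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-JDK illustration of upsert-stream semantics; not a Flink API. */
final class UpsertStreamSketch {
    private final Map<String, String> rowsByKey = new LinkedHashMap<>();

    /** Joins key values with '|', in the same style as the KeySelector in the usage example. */
    static String compositeKey(Object... keyValues) {
        StringBuilder key = new StringBuilder();
        for (Object value : keyValues) {
            key.append(value).append('|');
        }
        return key.toString();
    }

    /** flag == true inserts or overwrites the row for the key; flag == false deletes it. */
    void apply(boolean flag, String key, String row) {
        if (flag) {
            rowsByKey.put(key, row);
        } else {
            rowsByKey.remove(key);
        }
    }

    /** Returns a copy of the current keyed state. */
    Map<String, String> snapshot() {
        return new LinkedHashMap<>(rowsByKey);
    }
}
```

Because state is addressed by key, the sink has to know the key fields, which is why the interface exposes `setKeyFields`.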

### OutputFormatTableSink

Abstract class for table sinks based on OutputFormat:

```java { .api }
@Deprecated
public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {
    public abstract OutputFormat<T> getOutputFormat();

    @Override
    public DataStreamSink<T> consumeDataStream(DataStream<T> dataStream) {
        return dataStream.writeUsingOutputFormat(getOutputFormat());
    }
}
```

## Legacy CSV Connector (Testing Only)

The CSV connector implementations are maintained only for testing the legacy connector stack:

### CsvTableSource

```java { .api }
@Deprecated
public class CsvTableSource extends InputFormatTableSource<Row> {
    // Constructor options
    public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes);
    public CsvTableSource(
        String path,
        String[] fieldNames,
        TypeInformation<?>[] fieldTypes,
        String fieldDelim,
        String rowDelim,
        Character quoteCharacter,
        boolean ignoreFirstLine,
        String ignoreComments,
        boolean lenient
    );

    // Builder pattern support
    public static CsvTableSource.Builder builder();
}
```

### CsvTableSink

```java { .api }
@Deprecated
public class CsvTableSink extends OutputFormatTableSink<Row> {
    public CsvTableSink(String path, String fieldDelim, int numFiles, WriteMode writeMode);

    @Override
    public OutputFormat<Row> getOutputFormat() {
        return new CsvOutputFormat<>(path, fieldDelim, numFiles, writeMode);
    }
}
```

## Migration Guidelines

### From Legacy to Modern Connectors

**Step 1: Replace Factory Interface**

```java
// Old (Deprecated)
public class MyConnectorFactory implements StreamTableSourceFactory<Row> {
    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        return new MyLegacyTableSource(properties);
    }
}

// New (Recommended)
public class MyConnectorFactory implements DynamicTableSourceFactory {
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        return new MyModernTableSource(context);
    }
}
```

**Step 2: Replace Source Interface**

```java
// Old (Deprecated)
public class MyLegacyTableSource implements StreamTableSource<Row> {
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new MySourceFunction());
    }
}

// New (Recommended)
// getScanRuntimeProvider is declared on ScanTableSource, the scan-capable subtype of DynamicTableSource.
// getChangelogMode(), copy(), and asSummaryString() are omitted for brevity.
public class MyModernTableSource implements ScanTableSource {
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new DataStreamScanProvider() {
            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv.addSource(new MySourceFunction())
                    .map(new MyRowDataMapper());
            }

            @Override
            public boolean isBounded() {
                // Unbounded streaming source, matching the legacy default
                return false;
            }
        };
    }
}
```

**Step 3: Replace Sink Interface**

```java
// Old (Deprecated)
public class MyLegacyTableSink implements StreamTableSink<Row> {
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(new MySinkFunction());
    }
}

// New (Recommended)
public class MyModernTableSink implements DynamicTableSink {
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
                return dataStream
                    .map(new MyRowDataConverter())
                    .addSink(new MySinkFunction());
            }
        };
    }
}
```

### Configuration Migration

**Legacy String-based Properties:**

```java
// Old approach
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "my-connector");
properties.put("connector.host", "localhost");
properties.put("connector.port", "8080");
```

**Modern ConfigOption-based Configuration:**

```java
// New approach
public class MyConnectorOptions {
    public static final ConfigOption<String> HOST =
        ConfigOptions.key("host")
            .stringType()
            .defaultValue("localhost");

    public static final ConfigOption<Integer> PORT =
        ConfigOptions.key("port")
            .intType()
            .defaultValue(8080);
}
```
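
When existing table definitions still carry the legacy keys, a small translation step can help during migration. The sketch below is a hypothetical plain-JDK helper, not part of Flink: `LegacyPropertyMigration`, `toModernOptions`, and `intOption` are names invented for illustration. It assumes that `connector.type` maps to the modern `connector` option and that other keys simply lose the `connector.` prefix; the real key names are whatever your factory declares.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for translating legacy property keys; not part of Flink. */
final class LegacyPropertyMigration {
    private static final String LEGACY_PREFIX = "connector.";

    /** Renames legacy keys: connector.type becomes connector, connector.x becomes x. */
    static Map<String, String> toModernOptions(Map<String, String> legacy) {
        Map<String, String> modern = new HashMap<>();
        for (Map.Entry<String, String> entry : legacy.entrySet()) {
            String key = entry.getKey();
            if (key.equals("connector.type")) {
                modern.put("connector", entry.getValue());
            } else if (key.startsWith(LEGACY_PREFIX)) {
                modern.put(key.substring(LEGACY_PREFIX.length()), entry.getValue());
            } else {
                // Keys without the legacy prefix are passed through unchanged
                modern.put(key, entry.getValue());
            }
        }
        return modern;
    }

    /** Reads an integer option, falling back to a default when the key is absent. */
    static int intOption(Map<String, String> options, String key, int defaultValue) {
        String raw = options.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }
}
```

The typed accessor mirrors what a `ConfigOption<Integer>` with a default value provides: parsing and defaulting happen in one place instead of at every call site.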

## Compatibility Considerations

### Runtime Compatibility

1. **Legacy connectors** continue to work with current Flink versions
2. **Mixed usage** of legacy and modern connectors is supported
3. **Gradual migration** can be performed incrementally

### Feature Limitations

Legacy connectors have limitations compared to modern ones:

1. **No support for** complex data types introduced in newer versions
2. **Limited metadata** access compared to modern metadata handling
3. **No support for** advanced features like watermark pushdown
4. **String-based configuration** instead of type-safe ConfigOptions

### Performance Implications

1. **Legacy connectors** may have slight performance overhead
2. **Type conversions** between Row and RowData may be needed
3. **Modern connectors** are optimized for current Flink runtime

## Best Practices for Legacy Support

### When to Use Legacy Interfaces

1. **Maintaining existing connectors** that haven't been migrated yet
2. **Quick prototyping** when familiar with legacy APIs
3. **Compatibility requirements** with older Flink versions

### Migration Strategy

1. **Plan migration** during major version upgrades
2. **Test thoroughly** with both legacy and modern implementations
3. **Migrate incrementally** by component rather than all at once
4. **Document migration** progress and remaining legacy components

### Code Organization

```java
// Organize legacy code clearly
@Deprecated
@SuppressWarnings("deprecation")
public class LegacyConnectorSupport {

    // Legacy implementation
    public static class LegacySource implements StreamTableSource<Row> {
        // Implementation
    }

    // Migration helper
    public static DynamicTableSource migrateToModern(LegacySource legacy) {
        return new ModernSourceAdapter(legacy);
    }
}
```