or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

built-in-connectors.md · connector-framework.md · datastream-integration.md · index.md · procedure-context.md · statement-set.md · table-environment.md

docs/connector-framework.md

# Connector Framework

Provider interfaces for implementing custom table sources and sinks that integrate with the DataStream API. These interfaces enable developers to create connectors that seamlessly bridge external systems with Flink's table ecosystem.

## Capabilities

### Source Providers

Interfaces for implementing table sources that produce DataStream instances.

#### DataStream Scan Provider

Creates DataStream-based table sources with direct DataStream integration.

```java { .api }
/**
 * Provider that produces a DataStream as runtime implementation for ScanTableSource.
 * Note: For advanced connector developers. Usually prefer SourceProvider or SourceFunctionProvider.
 */
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {

    /**
     * Creates a scan DataStream from StreamExecutionEnvironment with provider context.
     * @param providerContext Context for generating unique identifiers and accessing framework services
     * @param execEnv The StreamExecutionEnvironment for DataStream creation
     * @return DataStream of RowData for table consumption
     */
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);

    /**
     * @deprecated Use produceDataStream(ProviderContext, StreamExecutionEnvironment) instead
     */
    @Deprecated
    DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv);
}
```

#### Source Function Provider

Creates SourceFunction-based table sources for more traditional streaming patterns.

```java { .api }
/**
 * Provider that produces a SourceFunction as runtime implementation for ScanTableSource.
 */
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {

    /**
     * Creates a SourceFunction instance for data production.
     * @return SourceFunction that produces RowData records
     */
    SourceFunction<RowData> createSourceFunction();

    /**
     * Indicates whether the source is bounded (finite) or unbounded (infinite).
     * @return true if source is bounded, false for unbounded streams
     */
    boolean isBounded();
}
```

**Usage Examples:**

```java
import java.util.Optional;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.connector.source.SourceFunctionProvider;

// Custom source function provider
public class CustomSourceProvider implements SourceFunctionProvider {

    @Override
    public SourceFunction<RowData> createSourceFunction() {
        return new CustomSourceFunction();
    }

    @Override
    public boolean isBounded() {
        return false; // Unbounded stream
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(4); // Fixed parallelism
    }
}

// Custom DataStream scan provider
public class CustomDataStreamProvider implements DataStreamScanProvider {

    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext,
            StreamExecutionEnvironment execEnv) {

        return execEnv
            .addSource(new CustomSourceFunction())
            .uid(providerContext.generateUid("custom-source"))
            .name("Custom Table Source");
    }
}
```

### Sink Providers

Interfaces for implementing table sinks that consume DataStream instances.

#### DataStream Sink Provider

Creates DataStream-based table sinks with direct DataStream integration.

```java { .api }
/**
 * Provider that consumes a DataStream as runtime implementation for DynamicTableSink.
 * Note: For advanced connector developers. Usually prefer SinkProvider or SinkFunctionProvider.
 */
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider {

    /**
     * Consumes a DataStream and returns the sink transformation.
     * @param providerContext Context for generating unique identifiers
     * @param dataStream The input DataStream of RowData to consume
     * @return DataStreamSink transformation for the sink operation
     */
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);

    /**
     * @deprecated Use consumeDataStream(ProviderContext, DataStream) instead
     */
    @Deprecated
    DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
}
```

#### Sink Function Provider

Creates SinkFunction-based table sinks for more traditional output patterns.

```java { .api }
/**
 * Provider that produces a SinkFunction as runtime implementation for DynamicTableSink.
 */
public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {

    /**
     * Creates a SinkFunction instance for data consumption.
     * @return SinkFunction that consumes RowData records
     */
    SinkFunction<RowData> createSinkFunction();
}
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;

// Custom sink function provider
public class CustomSinkFunctionProvider implements SinkFunctionProvider {

    @Override
    public SinkFunction<RowData> createSinkFunction() {
        return new CustomSinkFunction();
    }
}

// Custom DataStream sink provider
public class CustomDataStreamSinkProvider implements DataStreamSinkProvider {

    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext,
            DataStream<RowData> dataStream) {

        return dataStream
            .addSink(new CustomSinkFunction())
            .uid(providerContext.generateUid("custom-sink"))
            .name("Custom Table Sink");
    }
}
```

### Provider Context

Context interface providing framework services to connector implementations.

```java { .api }
/**
 * Context for providing runtime instances in connectors.
 */
public interface ProviderContext {

    /**
     * Generates a unique identifier for transformations/operators in the DataStream.
     * This enables stateful Flink version upgrades by providing consistent operator IDs.
     * @param name Base name for the identifier
     * @return Unique identifier string for the transformation
     */
    String generateUid(String name);
}
```

**Usage Examples:**

```java
// Using ProviderContext for unique ID generation
@Override
public DataStream<RowData> produceDataStream(
        ProviderContext providerContext,
        StreamExecutionEnvironment execEnv) {

    return execEnv
        .addSource(sourceFunction)
        .uid(providerContext.generateUid("kafka-source")) // Unique source ID
        .name("Kafka Table Source")
        .map(transformFunction)
        .uid(providerContext.generateUid("kafka-transform")) // Unique transform ID
        .name("Kafka Data Transform");
}
```

### Factory Integration

Connector providers are typically created by DynamicTableSourceFactory and DynamicTableSinkFactory implementations.

```java { .api }
// Example factory implementation pattern
public class CustomTableSourceFactory implements DynamicTableSourceFactory {

    @Override
    public String factoryIdentifier() {
        return "custom";
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        return new CustomDynamicTableSource();
    }
}

// Custom table source with provider
public class CustomDynamicTableSource implements ScanTableSource {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return new CustomDataStreamProvider();
    }
}
```

## Type Definitions

### Runtime Provider Hierarchy

```text
// Source provider hierarchy
interface ScanTableSource.ScanRuntimeProvider { }
├── SourceProvider           // New source API (preferred)
├── SourceFunctionProvider   // Legacy SourceFunction API
└── DataStreamScanProvider   // Direct DataStream integration

// Sink provider hierarchy
interface DynamicTableSink.SinkRuntimeProvider { }
├── SinkProvider             // New sink API (preferred)
├── SinkFunctionProvider     // Legacy SinkFunction API
└── DataStreamSinkProvider   // Direct DataStream integration
```

### Parallelism Provider

```java { .api }
/**
 * Optional interface for specifying parallelism constraints.
 */
public interface ParallelismProvider {

    /**
     * Returns the parallelism for the operation.
     * @return Optional parallelism value, empty for default parallelism
     */
    Optional<Integer> getParallelism();
}
```

286

287

### Context Interfaces

288

289

```java { .api }

290

// Scan context for source providers

291

interface ScanTableSource.ScanContext {

292

// Framework-provided context for scan operations

293

}

294

295

// Sink context for sink providers

296

interface DynamicTableSink.SinkContext {

297

// Framework-provided context for sink operations

298

}

299

300

// Factory context for table creation

301

interface DynamicTableFactory.Context {

302

// Configuration and metadata access

303

ReadableConfig getConfiguration();

304

String[] getIdentifier();

305

TableSchema getSchema();

306

}

307

```

308

309

### Data Types

310

311

```java { .api }

312

import org.apache.flink.table.data.RowData;

313

314

// RowData is the internal representation for table data

315

// Provides efficient serialization and field access

316

// Use RowData.createFieldGetter() for field extraction

317

// Use RowDataBuilder for construction

318

```

## Best Practices

### Unique ID Generation

Always use ProviderContext.generateUid() for operator IDs to support stateful upgrades:

```java
// Good: Consistent operator IDs
.uid(providerContext.generateUid("source"))
.uid(providerContext.generateUid("transform"))

// Bad: Manual or missing IDs
.uid("my-source") // Risk of conflicts
// Missing .uid()  // Risk of savepoint incompatibility
```

### Provider Selection

Choose the appropriate provider based on your needs:

- **SourceProvider/SinkProvider**: Use for new implementations (preferred)
- **SourceFunctionProvider/SinkFunctionProvider**: Use for legacy compatibility
- **DataStreamScanProvider/DataStreamSinkProvider**: Use for complex DataStream integration

### Error Handling

Implement proper error handling and resource cleanup in custom providers:

```java
@Override
public DataStream<RowData> produceDataStream(
        ProviderContext providerContext,
        StreamExecutionEnvironment execEnv) {

    try {
        // Create and configure source
        CustomSourceFunction source = new CustomSourceFunction(config);

        return execEnv
            .addSource(source)
            .uid(providerContext.generateUid("custom-source"));

    } catch (Exception e) {
        throw new RuntimeException("Failed to create custom source", e);
    }
}