# DataStream Connectors

Provider interfaces for advanced connector development that integrate directly with the DataStream API. These providers enable custom connectors to produce and consume DataStreams while remaining fully integrated with Flink's table ecosystem.

## Capabilities

### DataStream Scan Provider

Provider interface for creating table sources that produce DataStreams directly.

```java { .api }
/**
 * Provider that produces a Java DataStream as runtime implementation for ScanTableSource.
 * Note: This provider is only meant for advanced connector developers.
 */
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {

    /**
     * Creates a scan DataStream from a StreamExecutionEnvironment.
     * Note: Unique identifiers must be set on all transformations when the CompiledPlan feature is used.
     *
     * @param providerContext context providing utilities such as UID generation
     * @param execEnv the StreamExecutionEnvironment for creating the DataStream
     * @return a DataStream producing RowData for the table source
     */
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);
}
```

**Usage Examples:**

```java
import java.util.Optional;

import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

public class CustomDataStreamScanProvider implements DataStreamScanProvider {

    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext,
            StreamExecutionEnvironment execEnv) {

        // Create custom data source
        DataStream<RowData> sourceStream = execEnv
                .addSource(new CustomSourceFunction())
                .uid(providerContext.generateUid("custom-source")); // Unique ID for savepoint compatibility

        // Apply transformations with unique IDs
        return sourceStream
                .map(new CustomRowDataMapper())
                .uid(providerContext.generateUid("custom-mapper"));
    }

    @Override
    public boolean isBounded() {
        return false; // Required by ScanTableSource.ScanRuntimeProvider; this example source is unbounded
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(4); // Custom parallelism
    }
}
```

### DataStream Sink Provider

Provider interface for creating table sinks that consume DataStreams directly.

```java { .api }
/**
 * Provider that consumes a Java DataStream as runtime implementation for DynamicTableSink.
 * Note: This provider is only meant for advanced connector developers.
 */
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {

    /**
     * Consumes the given DataStream and returns the sink transformation.
     * Note: Unique identifiers must be set on all transformations when the CompiledPlan feature is used.
     *
     * @param providerContext context providing utilities such as UID generation
     * @param dataStream input DataStream of RowData to consume
     * @return DataStreamSink representing the sink transformation
     */
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);

    /**
     * Custom parallelism for the sink operations.
     * Note: If multiple transformations are applied, set the same parallelism on all of them to avoid changelog issues.
     *
     * @return optional parallelism setting
     */
    @Override
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
```

**Usage Examples:**

```java
import java.util.Optional;

import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;

public class CustomDataStreamSinkProvider implements DataStreamSinkProvider {

    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext,
            DataStream<RowData> dataStream) {

        // Apply transformations to the input stream
        DataStream<String> transformedStream = dataStream
                .map(new RowDataToStringMapper())
                .uid(providerContext.generateUid("sink-mapper"));

        // Create sink with unique ID
        return transformedStream
                .addSink(new CustomSinkFunction())
                .uid(providerContext.generateUid("custom-sink"));
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(2); // Custom parallelism for all sink operations
    }
}
```

### Provider Context

Context interface providing utilities for connector providers.

```java { .api }
/**
 * Context providing utilities for runtime providers.
 */
public interface ProviderContext {

    /**
     * Generates a topology-wide unique identifier for transformations.
     * Essential for stateful upgrades and savepoint compatibility.
     *
     * @param operatorName base name for the operator
     * @return unique identifier string
     */
    String generateUid(String operatorName);
}
```
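At runtime the planner supplies the `ProviderContext`; connector code only calls `generateUid()`. For unit-testing a provider in isolation, a minimal stand-in can be useful. The sketch below assumes only the interface shown above; `PrefixedProviderContext` is a hypothetical helper, not a Flink class:

```java
// Minimal test stand-in for ProviderContext (hypothetical class, not part of Flink)
public class PrefixedProviderContext implements ProviderContext {

    private final String connectorPrefix;

    public PrefixedProviderContext(String connectorPrefix) {
        this.connectorPrefix = connectorPrefix;
    }

    @Override
    public String generateUid(String operatorName) {
        // Namespace the operator name so UIDs stay unique across connectors in one topology
        return connectorPrefix + "-" + operatorName;
    }
}
```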

### Parallelism Provider

Interface for specifying custom parallelism in connector providers.

```java { .api }
/**
 * Provider interface for specifying custom parallelism.
 */
public interface ParallelismProvider {

    /**
     * Returns custom parallelism for the connector operations.
     *
     * @return optional parallelism setting; empty means use the default
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
```
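In practice the parallelism usually comes from connector configuration rather than being hard-coded. A minimal sketch, assuming a hypothetical `options` map carrying the connector's configuration (the `scan.parallelism` key is illustrative):

```java
import java.util.Map;
import java.util.Optional;

// Sketch: derive parallelism from connector options (hypothetical class, not part of Flink)
public class ConfigurableParallelismProvider implements ParallelismProvider {

    private final Map<String, String> options; // assumed connector configuration

    public ConfigurableParallelismProvider(Map<String, String> options) {
        this.options = options;
    }

    @Override
    public Optional<Integer> getParallelism() {
        // An empty result lets the planner fall back to the default parallelism
        return Optional.ofNullable(options.get("scan.parallelism")).map(Integer::parseInt);
    }
}
```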

## Advanced Implementation Patterns

### Custom Source Connector

Complete example of implementing a custom table source with DataStream integration. Helper types such as `CustomSourceFunction`, `RecordToRowDataMapper`, and the `CustomSourceConfig` holder are placeholders for connector-specific code.

```java
import java.util.Optional;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;

public class CustomTableSource implements ScanTableSource {

    // Assumed holder for connector options; details omitted
    private final CustomSourceConfig config;

    public CustomTableSource(CustomSourceConfig config) {
        this.config = config;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly(); // this example emits insert-only rows
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return new DataStreamScanProvider() {
            @Override
            public DataStream<RowData> produceDataStream(
                    ProviderContext providerContext,
                    StreamExecutionEnvironment execEnv) {

                // Create source with configuration
                CustomSourceFunction sourceFunction = new CustomSourceFunction(config);

                return execEnv
                        .addSource(sourceFunction)
                        .uid(providerContext.generateUid("custom-table-source"))
                        .map(new RecordToRowDataMapper())
                        .uid(providerContext.generateUid("record-mapper"));
            }

            @Override
            public boolean isBounded() {
                return false; // unbounded streaming source
            }

            @Override
            public Optional<Integer> getParallelism() {
                return Optional.of(config.getSourceParallelism());
            }
        };
    }

    @Override
    public DynamicTableSource copy() {
        return new CustomTableSource(config);
    }

    @Override
    public String asSummaryString() {
        return "CustomTableSource";
    }
}
```

### Custom Sink Connector

Complete example of implementing a custom table sink with DataStream integration. Helper types such as `RowDataToCustomRecordMapper`, `CustomSinkFunction`, and the `CustomSinkConfig` holder are placeholders for connector-specific code.

```java
import java.util.Optional;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;

public class CustomTableSink implements DynamicTableSink {

    // Assumed holder for connector options; details omitted
    private final CustomSinkConfig config;

    public CustomTableSink(CustomSinkConfig config) {
        this.config = config;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly(); // this example accepts insert-only changes
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(
                    ProviderContext providerContext,
                    DataStream<RowData> dataStream) {

                // Apply pre-processing transformations
                DataStream<CustomRecord> processedStream = dataStream
                        .map(new RowDataToCustomRecordMapper())
                        .uid(providerContext.generateUid("sink-mapper"))
                        .filter(new CustomRecordFilter())
                        .uid(providerContext.generateUid("sink-filter"));

                // Create sink function
                CustomSinkFunction sinkFunction = new CustomSinkFunction(config);

                return processedStream
                        .addSink(sinkFunction)
                        .uid(providerContext.generateUid("custom-table-sink"));
            }

            @Override
            public Optional<Integer> getParallelism() {
                return Optional.of(config.getSinkParallelism());
            }
        };
    }

    @Override
    public DynamicTableSink copy() {
        return new CustomTableSink(config);
    }

    @Override
    public String asSummaryString() {
        return "CustomTableSink";
    }
}
```

### Legacy Sink Function Provider

Provider for legacy sink functions in table sinks.

```java { .api }
/**
 * Provider for sink functions in legacy table sinks.
 */
public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {

    /**
     * Creates a sink function for consuming table data.
     *
     * @return SinkFunction instance
     */
    SinkFunction<RowData> createSinkFunction();

    /**
     * Optional parallelism for the sink function.
     *
     * @return optional parallelism setting
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
```
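A minimal sketch of implementing this provider, following the interface as documented above; the anonymous `SinkFunction` that just prints each row is illustrative only:

```java
import java.util.Optional;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.data.RowData;

public class PrintingSinkFunctionProvider implements SinkFunctionProvider {

    @Override
    public SinkFunction<RowData> createSinkFunction() {
        // Toy sink: print every row; a real connector would write to an external system
        return new SinkFunction<RowData>() {
            @Override
            public void invoke(RowData value, Context context) {
                System.out.println(value);
            }
        };
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(1); // e.g. force a single writer
    }
}
```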

### Legacy Source Function Provider

Provider for legacy source functions in table sources.

```java { .api }
/**
 * Provider for source functions in legacy table sources.
 */
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider {

    /**
     * Creates a source function for producing table data.
     *
     * @return SourceFunction instance
     */
    SourceFunction<RowData> createSourceFunction();

    /**
     * Optional parallelism for the source function.
     *
     * @return optional parallelism setting
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
```
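A minimal sketch of implementing this provider, following the interface as documented above. The heartbeat source emitting one row per second is illustrative only; note that `isBounded()` is inherited from `ScanTableSource.ScanRuntimeProvider` and must be implemented:

```java
import java.util.Optional;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class HeartbeatSourceFunctionProvider implements SourceFunctionProvider {

    @Override
    public SourceFunction<RowData> createSourceFunction() {
        // Toy source: emit one single-column row per second until cancelled
        return new SourceFunction<RowData>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<RowData> ctx) throws Exception {
                while (running) {
                    ctx.collect(GenericRowData.of(StringData.fromString("heartbeat")));
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        };
    }

    @Override
    public boolean isBounded() {
        return false; // required by ScanTableSource.ScanRuntimeProvider
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(1); // SourceFunction sources are typically non-parallel
    }
}
```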

## Best Practices

### Unique ID Generation

Always use `ProviderContext.generateUid()` for transformation IDs to ensure savepoint compatibility:

```java
// Good - uses context for unique IDs
DataStream<RowData> stream = execEnv
        .addSource(sourceFunction)
        .uid(providerContext.generateUid("my-source"))
        .map(mapper)
        .uid(providerContext.generateUid("my-mapper"));

// Bad - hardcoded IDs may conflict
DataStream<RowData> stream = execEnv
        .addSource(sourceFunction)
        .uid("hardcoded-source") // May conflict with other connectors
        .map(mapper)
        .uid("hardcoded-mapper");
```

### Parallelism Consistency

When using custom parallelism in sinks, ensure all transformations use the same parallelism:

```java
@Override
public DataStreamSink<?> consumeDataStream(
        ProviderContext providerContext,
        DataStream<RowData> dataStream) {

    int customParallelism = 4;

    return dataStream
            .map(mapper)
            .setParallelism(customParallelism) // Same parallelism
            .uid(providerContext.generateUid("mapper"))
            .addSink(sinkFunction)
            .setParallelism(customParallelism) // Same parallelism
            .uid(providerContext.generateUid("sink"));
}
```

## Types

### Core Connector Types

```java { .api }
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.ParallelismProvider;
```

### DataStream Integration Types

```java { .api }
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
```

### Legacy Provider Types

```java { .api }
import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
```