# Built-in Connectors

The Flink Table API Java Bridge includes several built-in connectors designed for development, testing, and specific use cases. These connectors provide ready-to-use implementations for common scenarios.

## DataGen Connector

The DataGen connector generates random data for testing and development purposes. It supports various data types and generation strategies.

### DataGenTableSourceFactory

```java { .api }
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
    public static final String IDENTIFIER = "datagen";

    // Factory methods for creating DataGen sources
}
```

### Configuration Options

The DataGen connector supports the following configuration options through `DataGenConnectorOptions`:

```java { .api }
public class DataGenConnectorOptions {
    // Row generation options
    public static final ConfigOption<Long> ROWS_PER_SECOND;
    public static final ConfigOption<Long> NUMBER_OF_ROWS;

    // Field-specific options
    public static final ConfigOption<String> FIELDS_PREFIX;
    public static final ConfigOption<String> KIND_OPTION;
    public static final ConfigOption<String> START_OPTION;
    public static final ConfigOption<String> END_OPTION;
    public static final ConfigOption<String> MAX_PAST_OPTION;
    public static final ConfigOption<String> LENGTH_OPTION;
}
```

### Usage Examples

**Basic Random Data Generation:**

```sql
CREATE TABLE random_source (
  id BIGINT,
  name STRING,
  age INT,
  score DOUBLE,
  birthday TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100',
  'number-of-rows' = '1000'
);
```

**Field-Specific Configuration:**

```sql
CREATE TABLE configured_source (
  user_id BIGINT,
  username STRING,
  age INT,
  balance DECIMAL(10,2),
  registration_time TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '50',

  -- Configure user_id as sequence
  'fields.user_id.kind' = 'sequence',
  'fields.user_id.start' = '1',
  'fields.user_id.end' = '1000',

  -- Configure username with specific length
  'fields.username.length' = '10',

  -- Configure age with range
  'fields.age.min' = '18',
  'fields.age.max' = '65',

  -- Configure balance with range
  'fields.balance.min' = '0.00',
  'fields.balance.max' = '10000.00'
);
```

**Programmatic Creation:**

```java
// Create DataGen source programmatically
TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
    .option(DataGenConnectorOptions.NUMBER_OF_ROWS, 5000L)
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("value", DataTypes.DOUBLE())
        .build())
    .build();

tableEnv.createTable("generated_data", sourceDescriptor);
```

### Data Generation Strategies

The DataGen connector supports different generation strategies:

1. **Random Generation**: Default behavior for most data types
2. **Sequence Generation**: Sequential numeric values
3. **Custom Ranges**: Min/max values for numeric types
4. **String Length**: Configurable string lengths
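
The per-field strategies above are selected with `fields.<field>.kind` and related options. A hedged sketch (column names are illustrative, and the `max-past` duration syntax may vary by Flink version) combining a bounded sequence, a numeric range, a fixed string length, and a recent timestamp:

```sql
CREATE TABLE strategy_demo (
  seq_id BIGINT,           -- sequence strategy
  rating INT,              -- random with min/max
  label STRING,            -- random with fixed length
  event_time TIMESTAMP(3)  -- random, at most 10 seconds in the past
) WITH (
  'connector' = 'datagen',
  'fields.seq_id.kind' = 'sequence',
  'fields.seq_id.start' = '1',
  'fields.seq_id.end' = '100',
  'fields.rating.min' = '1',
  'fields.rating.max' = '5',
  'fields.label.length' = '8',
  'fields.event_time.max-past' = '10 s'
);
```

A sequence field makes the source bounded once its range is exhausted, which is handy for deterministic tests.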

## Print Connector

The Print connector outputs table data to standard output, which makes it useful for debugging and development.

### PrintTableSinkFactory

```java { .api }
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
    public static final String IDENTIFIER = "print";

    // Factory methods for creating Print sinks
}
```

### Usage Examples

**Basic Print Sink:**

```sql
CREATE TABLE print_sink (
  id BIGINT,
  name STRING,
  value DOUBLE
) WITH (
  'connector' = 'print'
);

-- Insert data to see output
INSERT INTO print_sink SELECT id, name, value FROM source_table;
```

143

144

**Print with Identifier:**

145

146

```sql

147

CREATE TABLE debug_print (

148

user_id BIGINT,

149

event_type STRING,

150

timestamp_col TIMESTAMP(3)

151

) WITH (

152

'connector' = 'print',

153

'print-identifier' = 'DEBUG'

154

);

155

```

156

157

**Programmatic Creation:**

158

159

```java

160

// Create Print sink programmatically

161

TableDescriptor printDescriptor = TableDescriptor.forConnector("print")

162

.option("print-identifier", "MyOutput")

163

.schema(Schema.newBuilder()

164

.column("id", DataTypes.BIGINT())

165

.column("message", DataTypes.STRING())

166

.column("timestamp_col", DataTypes.TIMESTAMP(3))

167

.build())

168

.build();

169

170

tableEnv.createTable("debug_output", printDescriptor);

171

172

// Use in statement set

173

StreamStatementSet statementSet = tableEnv.createStatementSet();

174

statementSet.addInsert("debug_output", sourceTable);

175

statementSet.attachAsDataStream();

176

```

177

178

### Output Format

179

180

The Print connector outputs data in a readable format:

181

182

```

183

DEBUG> +I[1001, Alice, 2023-12-01T10:30:00]

184

DEBUG> +I[1002, Bob, 2023-12-01T10:31:00]

185

DEBUG> -U[1001, Alice, 2023-12-01T10:30:00]

186

DEBUG> +U[1001, Alice Updated, 2023-12-01T10:30:00]

187

```

188

189

Where:

190

- `+I`: Insert operation

191

- `-U`: Update before (retract)

192

- `+U`: Update after

193

- `-D`: Delete operation
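
Update rows (`-U`/`+U`) appear when the upstream query is an updating one, such as a non-windowed aggregate. A minimal sketch (source and column names are illustrative) whose output interleaves inserts and updates as counts change:

```sql
CREATE TABLE counts_print (
  name STRING,
  cnt  BIGINT
) WITH (
  'connector' = 'print',
  'print-identifier' = 'DEBUG'
);

-- Each time a group's count changes, the sink receives a -U for the old
-- row followed by a +U for the new one.
INSERT INTO counts_print
SELECT name, COUNT(*) AS cnt
FROM some_source
GROUP BY name;
```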

## BlackHole Connector

The BlackHole connector discards all data it receives, which is useful for performance testing and benchmarking.

### BlackHoleTableSinkFactory

```java { .api }
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
    public static final String IDENTIFIER = "blackhole";

    // Factory methods for creating BlackHole sinks
}
```

### Usage Examples

**Basic BlackHole Sink:**

```sql
CREATE TABLE blackhole_sink (
  id BIGINT,
  data STRING,
  timestamp_col TIMESTAMP(3)
) WITH (
  'connector' = 'blackhole'
);

-- All data inserted here will be discarded
INSERT INTO blackhole_sink SELECT * FROM high_volume_source;
```

**Performance Testing:**

```java
// Create BlackHole sink for performance testing
TableDescriptor blackholeDescriptor = TableDescriptor.forConnector("blackhole")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("payload", DataTypes.STRING())
        .column("processing_time", DataTypes.TIMESTAMP(3))
        .build())
    .build();

tableEnv.createTable("perf_test_sink", blackholeDescriptor);

// Test query performance
Table testQuery = tableEnv.sqlQuery("""
    SELECT
      id,
      UPPER(payload) AS payload,
      CURRENT_TIMESTAMP AS processing_time
    FROM source_table
    WHERE id % 1000 = 0
    """);

StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsert("perf_test_sink", testQuery);
statementSet.attachAsDataStream();
```

254

255

## Legacy CSV Connector (Testing Only)

256

257

The CSV connector is maintained only for testing the legacy connector stack and should not be used in production.

258

259

### CsvTableSource

260

261

```java { .api }

262

@Deprecated

263

public class CsvTableSource extends InputFormatTableSource<Row> {

264

// Constructor and configuration methods

265

public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes);

266

public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes,

267

String fieldDelim, String rowDelim, Character quoteCharacter,

268

boolean ignoreFirstLine, String ignoreComments, boolean lenient);

269

}

270

```

### CsvTableSink

```java { .api }
@Deprecated
public class CsvTableSink extends OutputFormatTableSink<Row> {
    // Constructor and configuration methods
    public CsvTableSink(String path, String fieldDelim, int numFiles, WriteMode writeMode);
}
```

### Legacy Usage (Deprecated)

```java
// Legacy CSV source (deprecated - use modern file connectors instead)
CsvTableSource csvSource = CsvTableSource.builder()
    .path("/path/to/input.csv")
    .field("id", Types.LONG)
    .field("name", Types.STRING)
    .field("age", Types.INT)
    .fieldDelimiter(",")
    .ignoreFirstLine()
    .build();

tableEnv.registerTableSource("csv_input", csvSource);

// Legacy CSV sink (deprecated)
CsvTableSink csvSink = new CsvTableSink(
    "/path/to/output.csv",
    ",",                 // field delimiter
    1,                   // number of output files
    WriteMode.OVERWRITE
);

tableEnv.registerTableSink("csv_output", csvSink);
```
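
For production file I/O, the modern `filesystem` connector combined with the `csv` format is the usual replacement for these deprecated classes. A hedged sketch (paths and options are illustrative):

```sql
CREATE TABLE csv_input (
  id   BIGINT,
  name STRING,
  age  INT
) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/input.csv',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'true'
);
```

Unlike `CsvTableSource`, this table is created through the unified DDL/`TableDescriptor` path and works with the modern connector stack.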

## Common Patterns and Best Practices

### Testing Data Pipeline

```java
// Create test data with DataGen
TableDescriptor testDataDesc = TableDescriptor.forConnector("datagen")
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 1000L)
    .option(DataGenConnectorOptions.NUMBER_OF_ROWS, 10000L)
    .schema(Schema.newBuilder()
        .column("transaction_id", DataTypes.BIGINT())
        .column("user_id", DataTypes.STRING())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("transaction_time", DataTypes.TIMESTAMP(3))
        .build())
    .build();

tableEnv.createTable("test_transactions", testDataDesc);

// Process the data
Table processedData = tableEnv.sqlQuery("""
    SELECT
      user_id,
      COUNT(*) AS transaction_count,
      SUM(amount) AS total_amount,
      TUMBLE_START(transaction_time, INTERVAL '1' MINUTE) AS window_start
    FROM test_transactions
    GROUP BY
      user_id,
      TUMBLE(transaction_time, INTERVAL '1' MINUTE)
    """);

// Output for debugging; TableDescriptor.schema(...) expects a Schema,
// so convert the resolved schema first
TableDescriptor printDesc = TableDescriptor.forConnector("print")
    .option("print-identifier", "PROCESSED")
    .schema(Schema.newBuilder()
        .fromResolvedSchema(processedData.getResolvedSchema())
        .build())
    .build();

tableEnv.createTable("processed_output", printDesc);

// Execute
StreamStatementSet statements = tableEnv.createStatementSet();
statements.addInsert("processed_output", processedData);
statements.attachAsDataStream();
```

### Performance Benchmarking

```java
// Generate high-volume test data
TableDescriptor highVolumeSource = TableDescriptor.forConnector("datagen")
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 10000L)
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("data", DataTypes.STRING())
        .column("timestamp_col", DataTypes.TIMESTAMP(3))
        .build())
    .build();

tableEnv.createTable("benchmark_source", highVolumeSource);

// Complex processing query
Table benchmarkQuery = tableEnv.sqlQuery("""
    SELECT
      id,
      UPPER(data) AS processed_data,
      COUNT(*) OVER (
        PARTITION BY id % 100
        ORDER BY timestamp_col
        RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
      ) AS windowed_count
    FROM benchmark_source
    """);

// Discard results for a pure processing benchmark; convert the resolved
// schema because TableDescriptor.schema(...) expects a Schema
TableDescriptor blackholeDesc = TableDescriptor.forConnector("blackhole")
    .schema(Schema.newBuilder()
        .fromResolvedSchema(benchmarkQuery.getResolvedSchema())
        .build())
    .build();

tableEnv.createTable("benchmark_sink", blackholeDesc);

// Measure processing throughput
StreamStatementSet benchmark = tableEnv.createStatementSet();
benchmark.addInsert("benchmark_sink", benchmarkQuery);
benchmark.attachAsDataStream();
```

### Development Debugging

```java
// Debug intermediate results in complex pipelines
public void debugPipeline() {
    // Step 1: Raw data
    Table rawData = tableEnv.sqlQuery("SELECT * FROM source_table LIMIT 100");
    createPrintSink("debug_raw", rawData);

    // Step 2: After filtering
    Table filtered = tableEnv.sqlQuery("""
        SELECT * FROM source_table
        WHERE status = 'ACTIVE' AND amount > 100
        LIMIT 100
        """);
    createPrintSink("debug_filtered", filtered);

    // Step 3: After aggregation (the alias avoids the reserved word COUNT)
    Table aggregated = tableEnv.sqlQuery("""
        SELECT
          user_id,
          COUNT(*) AS cnt,
          SUM(amount) AS total
        FROM source_table
        WHERE status = 'ACTIVE'
        GROUP BY user_id
        LIMIT 50
        """);
    createPrintSink("debug_aggregated", aggregated);
}

private void createPrintSink(String name, Table table) {
    // TableDescriptor.schema(...) expects a Schema, so convert the resolved schema
    TableDescriptor desc = TableDescriptor.forConnector("print")
        .option("print-identifier", name.toUpperCase())
        .schema(Schema.newBuilder()
            .fromResolvedSchema(table.getResolvedSchema())
            .build())
        .build();

    tableEnv.createTable(name + "_sink", desc);

    StreamStatementSet statements = tableEnv.createStatementSet();
    statements.addInsert(name + "_sink", table);
    statements.attachAsDataStream();
}
```