
# Built-in Connectors

Production-ready connectors included with the Flink Table API Java Bridge for common use cases, including test data generation, development output, and performance testing.

## Capabilities

### DataGen Connector

Test data generation connector that produces synthetic data based on configurable patterns and constraints.

#### DataGen Table Source Factory

Factory for creating DataGen table sources with configurable data generation options.

```java { .api }
/**
 * Factory for creating DataGen table sources.
 * Connector identifier: "datagen"
 */
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {

    /**
     * @return "datagen" - the connector identifier for SQL DDL
     */
    public String factoryIdentifier();

    /**
     * Creates a DataGen table source based on configuration.
     * @param context Factory context with configuration and schema
     * @return DynamicTableSource instance for data generation
     */
    public DynamicTableSource createDynamicTableSource(Context context);
}
```

#### DataGen Configuration Options

Configuration options for controlling data generation behavior.

```java { .api }
/**
 * Configuration options for the DataGen connector.
 */
public class DataGenConnectorOptions {

    // Rate control options
    public static final ConfigOption<Long> ROWS_PER_SECOND;       // Rate limiting (default: 10000)
    public static final ConfigOption<Long> NUMBER_OF_ROWS;        // Total rows to generate (default: unlimited)
    public static final ConfigOption<Integer> SOURCE_PARALLELISM; // Source parallelism

    // Field-specific generation options
    public static final ConfigOption<String> FIELD_KIND;      // Generation kind: sequence, random
    public static final ConfigOption<Integer> FIELD_MIN;      // Minimum value for numeric fields
    public static final ConfigOption<Integer> FIELD_MAX;      // Maximum value for numeric fields
    public static final ConfigOption<Integer> FIELD_MAX_PAST; // Max past time for timestamp fields
    public static final ConfigOption<Integer> FIELD_LENGTH;   // Length for string fields
    public static final ConfigOption<Integer> FIELD_START;    // Start value for sequence fields
    public static final ConfigOption<Integer> FIELD_END;      // End value for sequence fields
    public static final ConfigOption<Double> FIELD_NULL_RATE; // Null rate (0.0 to 1.0)
    public static final ConfigOption<Boolean> FIELD_VAR_LEN;  // Variable length for string fields
}
```

**Usage Examples:**

```sql
-- SQL DDL for DataGen table
CREATE TABLE datagen_source (
    id BIGINT,
    name STRING,
    amount DECIMAL(10,2),
    event_time TIMESTAMP_LTZ(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1000',
    'number-of-rows' = '100000',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '100000',
    'fields.name.length' = '10',
    'fields.amount.min' = '10.00',
    'fields.amount.max' = '1000.00'
);
```

```java
// Programmatic DataGen source creation
import org.apache.flink.table.api.TableDescriptor;

TableDescriptor datagenDescriptor = TableDescriptor.forConnector("datagen")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .build())
    .option("rows-per-second", "1000")
    .option("number-of-rows", "100000")
    .option("fields.id.kind", "sequence")
    .option("fields.id.start", "1")
    .option("fields.name.length", "10")
    .build();

tableEnv.createTable("datagen_source", datagenDescriptor);
```

#### DataGen Data Generation Patterns

Different data generation strategies available for various field types.

```java { .api }
// Generation kinds available
String SEQUENCE = "sequence"; // Sequential numeric values
String RANDOM = "random";     // Random values within bounds

// Field-specific generation patterns
// Numeric fields: min/max bounds with random or sequence generation
// String fields: configurable length with random character generation
// Timestamp fields: random timestamps within time bounds
// Boolean fields: random true/false with configurable distribution
```

### Print Connector

Development and debugging output connector that prints table data to standard output or log files.

#### Print Table Sink Factory

Factory for creating Print table sinks with configurable output formatting.

```java { .api }
/**
 * Factory for creating Print table sinks.
 * Connector identifier: "print"
 */
public class PrintTableSinkFactory implements DynamicTableSinkFactory {

    /**
     * @return "print" - the connector identifier for SQL DDL
     */
    public String factoryIdentifier();

    /**
     * Creates a Print table sink based on configuration.
     * @param context Factory context with configuration and schema
     * @return DynamicTableSink instance for printing output
     */
    public DynamicTableSink createDynamicTableSink(Context context);
}
```

#### Print Configuration Options

Configuration options for controlling print output behavior.

```java { .api }
/**
 * Configuration options for the Print connector.
 */
public class PrintConnectorOptions {

    public static final ConfigOption<String> PRINT_IDENTIFIER;  // Prefix for printed lines
    public static final ConfigOption<Boolean> STANDARD_ERROR;   // Print to stderr instead of stdout
    public static final ConfigOption<Integer> SINK_PARALLELISM; // Sink parallelism
}
```

**Usage Examples:**

```sql
-- SQL DDL for Print table
CREATE TABLE print_sink (
    id BIGINT,
    name STRING,
    amount DECIMAL(10,2)
) WITH (
    'connector' = 'print',
    'print-identifier' = 'orders',
    'standard-error' = 'false'
);

-- Insert data to print
INSERT INTO print_sink SELECT * FROM source_table;
```

```java
// Programmatic Print sink creation
TableDescriptor printDescriptor = TableDescriptor.forConnector("print")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .build())
    .option("print-identifier", "debug-output")
    .build();

tableEnv.createTable("print_sink", printDescriptor);

// Execute query with print output
Table result = tableEnv.sqlQuery("SELECT * FROM source WHERE amount > 100");
result.executeInsert("print_sink");
```

### BlackHole Connector

Performance testing connector that discards all input data without performing any I/O operations.

#### BlackHole Table Sink Factory

Factory for creating BlackHole table sinks for performance benchmarking.

```java { .api }
/**
 * Factory for creating BlackHole table sinks.
 * Connector identifier: "blackhole"
 */
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {

    /**
     * @return "blackhole" - the connector identifier for SQL DDL
     */
    public String factoryIdentifier();

    /**
     * Creates a BlackHole table sink that discards all data.
     * @param context Factory context with configuration and schema
     * @return DynamicTableSink instance that discards input
     */
    public DynamicTableSink createDynamicTableSink(Context context);

    /**
     * @return Empty set - the BlackHole connector has no required options
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * @return Empty set - the BlackHole connector has no optional options
     */
    public Set<ConfigOption<?>> optionalOptions();
}
```

**Usage Examples:**

```sql
-- SQL DDL for BlackHole table
CREATE TABLE blackhole_sink (
    id BIGINT,
    name STRING,
    amount DECIMAL(10,2),
    event_time TIMESTAMP_LTZ(3)
) WITH (
    'connector' = 'blackhole'
);

-- Performance test query
INSERT INTO blackhole_sink
SELECT id, name, amount, event_time
FROM high_volume_source;
```

```java
// Programmatic BlackHole sink creation
TableDescriptor blackholeDescriptor = TableDescriptor.forConnector("blackhole")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
        .build())
    .build();

tableEnv.createTable("blackhole_sink", blackholeDescriptor);

// Benchmark query performance
long startTime = System.currentTimeMillis();
Table result = tableEnv.sqlQuery("SELECT * FROM large_source WHERE complex_condition");
result.executeInsert("blackhole_sink").await();
long endTime = System.currentTimeMillis();
System.out.println("Query execution time: " + (endTime - startTime) + "ms");
```

## Type Definitions

### Factory Base Interfaces

```java { .api }
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

// Factory interface hierarchy
interface DynamicTableSourceFactory extends Factory {
    String factoryIdentifier();
    DynamicTableSource createDynamicTableSource(Context context);
    Set<ConfigOption<?>> requiredOptions();
    Set<ConfigOption<?>> optionalOptions();
}

interface DynamicTableSinkFactory extends Factory {
    String factoryIdentifier();
    DynamicTableSink createDynamicTableSink(Context context);
    Set<ConfigOption<?>> requiredOptions();
    Set<ConfigOption<?>> optionalOptions();
}
```

### Configuration Options

```java { .api }
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Configuration option creation patterns
ConfigOption<String> stringOption = ConfigOptions
    .key("option.name")
    .stringType()
    .defaultValue("default")
    .withDescription("Option description");

ConfigOption<Integer> intOption = ConfigOptions
    .key("option.number")
    .intType()
    .noDefaultValue()
    .withDescription("Numeric option");

ConfigOption<Boolean> boolOption = ConfigOptions
    .key("option.flag")
    .booleanType()
    .defaultValue(false)
    .withDescription("Boolean flag");
```

### Data Generation Types

```java { .api }
// DataGen field generation strategies
enum GenerationKind {
    SEQUENCE, // Sequential values (numeric types)
    RANDOM    // Random values within bounds
}

// DataGen type-specific options
class NumericGenerationOptions {
    Integer min;         // Minimum value
    Integer max;         // Maximum value
    GenerationKind kind; // SEQUENCE or RANDOM
}

class StringGenerationOptions {
    Integer length;         // Fixed or maximum length
    Boolean variableLength; // Enable variable-length strings
}

class TimestampGenerationOptions {
    Integer maxPast; // Maximum milliseconds in the past
}
```

## Use Cases

### Development and Testing

```sql
-- Complete test data pipeline

-- 1. Generate test data
CREATE TABLE test_orders (
    order_id BIGINT,
    customer_id INT,
    product_name STRING,
    quantity INT,
    price DECIMAL(10,2),
    order_time TIMESTAMP_LTZ(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'number-of-rows' = '10000',
    'fields.order_id.kind' = 'sequence',
    'fields.customer_id.min' = '1000',
    'fields.customer_id.max' = '9999',
    'fields.product_name.length' = '20',
    'fields.quantity.min' = '1',
    'fields.quantity.max' = '10',
    'fields.price.min' = '10.00',
    'fields.price.max' = '999.99'
);

-- 2. Process and debug
CREATE TABLE debug_output (
    customer_id INT,
    total_amount DECIMAL(10,2),
    order_count BIGINT
) WITH (
    'connector' = 'print',
    'print-identifier' = 'customer-summary'
);

INSERT INTO debug_output
SELECT
    customer_id,
    SUM(price * quantity) as total_amount,
    COUNT(*) as order_count
FROM test_orders
GROUP BY customer_id;
```

### Performance Benchmarking

```sql
-- Benchmark query processing performance
CREATE TABLE perf_source (
    id BIGINT,
    payload STRING,
    timestamp_val TIMESTAMP_LTZ(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10000',
    'fields.payload.length' = '1000'
);

CREATE TABLE perf_sink (
    id BIGINT,
    processed_payload STRING,
    processing_time TIMESTAMP_LTZ(3)
) WITH (
    'connector' = 'blackhole'
);

-- Benchmark complex processing
INSERT INTO perf_sink
SELECT
    id,
    UPPER(SUBSTRING(payload, 1, 100)) as processed_payload,
    CURRENT_TIMESTAMP as processing_time
FROM perf_source
WHERE LENGTH(payload) > 500;
```