or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

builtin-connectors.md changelog-processing.md datastream-connectors.md index.md procedures.md statement-sets.md stream-table-environment.md watermark-strategies.md

docs/builtin-connectors.md

0

# Built-in Connectors

1

2

Ready-to-use connectors for development, testing, and debugging table applications. These connectors are included in the Java Bridge module and provide essential functionality for testing data pipelines and debugging table operations.

3

4

## Capabilities

5

6

### BlackHole Connector

7

8

High-performance sink connector that discards all input records. Designed for performance testing and scenarios where output is not needed.

9

10

```sql { .api }

11

-- Create BlackHole sink table

12

CREATE TABLE sink_table (

13

user_id STRING,

14

order_count BIGINT,

15

total_amount DECIMAL(10, 2)

16

) WITH (

17

'connector' = 'blackhole'

18

);

19

20

-- Insert data (will be discarded)

21

INSERT INTO sink_table

22

SELECT user_id, COUNT(*), SUM(amount)

23

FROM orders

24

GROUP BY user_id;

25

```

26

27

**Java Factory Usage:**

28

29

```java { .api }

30

/**

31

* BlackHole table sink factory for discarding all input records

32

* Identifier: "blackhole"

33

*/

34

public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {

35

public static final String IDENTIFIER = "blackhole";

36

37

public String factoryIdentifier();

38

public Set<ConfigOption<?>> requiredOptions(); // Empty set

39

public Set<ConfigOption<?>> optionalOptions(); // Empty set

40

public DynamicTableSink createDynamicTableSink(Context context);

41

}

42

```

43

44

**Usage Examples:**

45

46

```java

47

// BlackHole sink supports all changelog modes and partitioning

48

// Automatically filters out UPDATE_BEFORE events for efficiency

49

50

// Performance testing setup

51

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

52

53

// Create source with high data volume

54

tableEnv.executeSql(

55

"CREATE TABLE source_table (" +

56

" id BIGINT," +

57

" data STRING," +

58

" ts TIMESTAMP(3)" +

59

") WITH (" +

60

" 'connector' = 'datagen'," +

61

" 'rows-per-second' = '1000000'" +

62

")"

63

);

64

65

// Create BlackHole sink for performance testing

66

tableEnv.executeSql(

67

"CREATE TABLE perf_sink (" +

68

" id BIGINT," +

69

" processed_data STRING" +

70

") WITH (" +

71

" 'connector' = 'blackhole'" +

72

")"

73

);

74

75

// Test query performance (output discarded)

76

tableEnv.executeSql(

77

"INSERT INTO perf_sink " +

78

"SELECT id, UPPER(data) FROM source_table"

79

);

80

```

81

82

### DataGen Connector

83

84

Flexible data generation connector for creating test data with configurable patterns and data types.

85

86

```java { .api }

87

/**

88

* Configuration options for DataGen connector

89

*/

90

public class DataGenConnectorOptions {

91

92

/** Control emission rate */

93

public static final ConfigOption<Long> ROWS_PER_SECOND; // Default: 10000

94

95

/** Total rows to emit (unbounded if not set) */

96

public static final ConfigOption<Long> NUMBER_OF_ROWS;

97

98

/** Source parallelism */

99

public static final ConfigOption<Integer> SOURCE_PARALLELISM;

100

101

// Field-specific configuration options

102

public static final ConfigOption<String> FIELD_KIND; // 'random' or 'sequence'

103

public static final ConfigOption<String> FIELD_MIN; // Minimum value for random

104

public static final ConfigOption<String> FIELD_MAX; // Maximum value for random

105

public static final ConfigOption<Duration> FIELD_MAX_PAST; // Max past for timestamps

106

public static final ConfigOption<Integer> FIELD_LENGTH; // Collection size/string length

107

public static final ConfigOption<String> FIELD_START; // Sequence start value

108

public static final ConfigOption<String> FIELD_END; // Sequence end value

109

public static final ConfigOption<Float> FIELD_NULL_RATE; // Proportion of nulls

110

public static final ConfigOption<Boolean> FIELD_VAR_LEN; // Variable length data

111

}

112

```

113

114

**SQL Usage Examples:**

115

116

```sql { .api }

117

-- Basic DataGen source

118

CREATE TABLE users_source (

119

user_id BIGINT,

120

username STRING,

121

age INT,

122

created_at TIMESTAMP(3)

123

) WITH (

124

'connector' = 'datagen',

125

'rows-per-second' = '100',

126

'number-of-rows' = '10000'

127

);

128

129

-- Advanced DataGen with field-specific configuration

130

CREATE TABLE orders_source (

131

order_id BIGINT,

132

user_id STRING,

133

product_name STRING,

134

quantity INT,

135

price DECIMAL(10, 2),

136

order_time TIMESTAMP(3)

137

) WITH (

138

'connector' = 'datagen',

139

'rows-per-second' = '50',

140

141

-- Sequence generator for order_id

142

'fields.order_id.kind' = 'sequence',

143

'fields.order_id.start' = '1',

144

'fields.order_id.end' = '1000000',

145

146

-- Random string for user_id

147

'fields.user_id.kind' = 'random',

148

'fields.user_id.length' = '8',

149

150

-- Random product names with variable length

151

'fields.product_name.kind' = 'random',

152

'fields.product_name.length' = '20',

153

'fields.product_name.var-len' = 'true',

154

155

-- Random quantity with bounds

156

'fields.quantity.kind' = 'random',

157

'fields.quantity.min' = '1',

158

'fields.quantity.max' = '10',

159

160

-- Random price with bounds

161

'fields.price.kind' = 'random',

162

'fields.price.min' = '10.00',

163

'fields.price.max' = '999.99',

164

165

-- Random timestamps within past 30 days

166

'fields.order_time.kind' = 'random',

167

'fields.order_time.max-past' = '30d'

168

);

169

170

-- DataGen with null values

171

CREATE TABLE sparse_data (

172

id BIGINT,

173

optional_field STRING,

174

another_field INT

175

) WITH (

176

'connector' = 'datagen',

177

'rows-per-second' = '10',

178

179

'fields.optional_field.null-rate' = '0.3', -- 30% null values

180

'fields.another_field.null-rate' = '0.1' -- 10% null values

181

);

182

```

183

184

**Java Factory Usage:**

185

186

```java { .api }

187

/**

188

* DataGen table source factory for generating test data

189

*/

190

public class DataGenTableSourceFactory implements DynamicTableSourceFactory {

191

192

public String factoryIdentifier(); // Returns "datagen"

193

public Set<ConfigOption<?>> requiredOptions(); // Empty - all options optional

194

public Set<ConfigOption<?>> optionalOptions(); // All DataGenConnectorOptions

195

public DynamicTableSource createDynamicTableSource(Context context);

196

}

197

198

/**

199

* DataGen table source implementation

200

*/

201

public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown {

202

// Supports limit push-down for bounded data generation

203

}

204

```

205

206

### Print Connector

207

208

Debug connector that outputs table data to console with configurable formatting.

209

210

```java { .api }

211

/**

212

* Configuration options for Print connector

213

*/

214

public class PrintConnectorOptions {

215

// Options: 'print-identifier' (String) - prefix prepended to every printed row

216

// 'standard-error' (Boolean, default false) - print to stderr instead of stdout

217

}

218

```

219

220

**SQL Usage Examples:**

221

222

```sql { .api }

223

-- Basic print sink

224

CREATE TABLE debug_output (

225

user_id STRING,

226

username STRING,

227

score INT

228

) WITH (

229

'connector' = 'print'

230

);

231

232

-- Print with custom prefix

233

CREATE TABLE debug_detailed (

234

order_id BIGINT,

235

status STRING,

236

`timestamp` TIMESTAMP(3)

237

) WITH (

238

'connector' = 'print',

239

'print-identifier' = 'ORDER_DEBUG'

240

);

241

242

-- Insert data to see output in console

243

INSERT INTO debug_output

244

SELECT user_id, username, score

245

FROM user_scores

246

WHERE score > 100;

247

```

248

249

**Java Factory Usage:**

250

251

```java { .api }

252

/**

253

* Print table sink factory for console output debugging

254

*/

255

public class PrintTableSinkFactory implements DynamicTableSinkFactory {

256

257

public String factoryIdentifier(); // Returns "print"

258

public Set<ConfigOption<?>> requiredOptions(); // Empty

259

public Set<ConfigOption<?>> optionalOptions(); // Print-specific options

260

public DynamicTableSink createDynamicTableSink(Context context);

261

}

262

```

263

264

**Usage Examples:**

265

266

```java

267

// Debug complex transformations

268

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

269

270

// Create source

271

tableEnv.executeSql(

272

"CREATE TABLE transactions (" +

273

" txn_id STRING," +

274

" amount DECIMAL(10, 2)," +

275

" txn_time TIMESTAMP(3)," +
" WATERMARK FOR txn_time AS txn_time - INTERVAL '5' SECOND" +

276

") WITH (" +

277

" 'connector' = 'datagen'," +

278

" 'fields.amount.min' = '1.00'," +

279

" 'fields.amount.max' = '1000.00'" +

280

")"

281

);

282

283

// Create debug sink

284

tableEnv.executeSql(

285

"CREATE TABLE debug_aggregates (" +

286

" window_start TIMESTAMP(3)," +

287

" window_end TIMESTAMP(3)," +

288

" total_amount DECIMAL(12, 2)," +

289

" txn_count BIGINT" +

290

") WITH (" +

291

" 'connector' = 'print'," +

292

" 'print-identifier' = 'WINDOW_AGGREGATES'" +

293

")"

294

);

295

296

// Debug windowed aggregation

297

tableEnv.executeSql(

298

"INSERT INTO debug_aggregates " +

299

"SELECT " +

300

" window_start, " +

301

" window_end, " +

302

" SUM(amount) as total_amount, " +

303

" COUNT(*) as txn_count " +

304

"FROM TABLE(" +

305

" TUMBLE(TABLE transactions, DESCRIPTOR(txn_time), INTERVAL '1' MINUTE)" +

306

") " +

307

"GROUP BY window_start, window_end"

308

);

309

```

310

311

## Connector Combinations

312

313

### Testing Pipeline Pattern

314

315

Combine connectors for comprehensive testing workflows.

316

317

```sql

318

-- Complete testing pipeline

319

-- 1. Generate test data

320

CREATE TABLE test_orders (

321

order_id BIGINT,

322

customer_id STRING,

323

amount DECIMAL(10, 2),

324

order_time TIMESTAMP(3),

325

WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND

326

) WITH (

327

'connector' = 'datagen',

328

'rows-per-second' = '100',

329

'fields.order_id.kind' = 'sequence',

330

'fields.order_id.start' = '1',
'fields.order_id.end' = '1000000',

331

'fields.customer_id.kind' = 'random',

332

'fields.customer_id.length' = '6',

333

'fields.amount.kind' = 'random',

334

'fields.amount.min' = '10.00',

335

'fields.amount.max' = '500.00'

336

);

337

338

-- 2. Debug intermediate results

339

CREATE TABLE debug_customer_stats (

340

customer_id STRING,

341

order_count BIGINT,

342

total_amount DECIMAL(12, 2),

343

avg_amount DECIMAL(10, 2)

344

) WITH (

345

'connector' = 'print',

346

'print-identifier' = 'CUSTOMER_STATS'

347

);

348

349

-- 3. Performance test final sink

350

CREATE TABLE perf_sink (

351

customer_id STRING,

352

order_count BIGINT,

353

total_amount DECIMAL(12, 2)

354

) WITH (

355

'connector' = 'blackhole'

356

);

357

358

-- Execute testing workflow

359

INSERT INTO debug_customer_stats

360

SELECT

361

customer_id,

362

COUNT(*) as order_count,

363

SUM(amount) as total_amount,

364

AVG(amount) as avg_amount

365

FROM test_orders

366

GROUP BY customer_id;

367

368

INSERT INTO perf_sink

369

SELECT customer_id, COUNT(*) AS order_count, SUM(amount) AS total_amount

370

FROM test_orders
GROUP BY customer_id;

371

```

372

373

## Data Generation Patterns

374

375

### Realistic Test Data

376

377

Configure DataGen for realistic business scenarios.

378

379

```sql

380

-- E-commerce user behavior simulation

381

CREATE TABLE user_events (

382

user_id STRING,

383

event_type STRING,

384

product_id STRING,

385

session_id STRING,

386

event_time TIMESTAMP(3),

387

WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND

388

) WITH (

389

'connector' = 'datagen',

390

'rows-per-second' = '1000',

391

392

-- Realistic user IDs

393

'fields.user_id.kind' = 'random',

394

'fields.user_id.length' = '10',

395

396

-- Event types: DataGen can only emit random strings here; a realistic distribution (e.g. view/click/purchase) would need a custom generator

397

'fields.event_type.kind' = 'random',

398

'fields.event_type.length' = '15',

399

400

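-- Product catalog simulation (note: a sequence field makes the source bounded; it finishes once the 900 values 100..999 are exhausted)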

401

'fields.product_id.kind' = 'sequence',

402

'fields.product_id.start' = '100',

403

'fields.product_id.end' = '999',

404

405

-- Session tracking

406

'fields.session_id.kind' = 'random',

407

'fields.session_id.length' = '32'

408

);

409

```

410

411

## Types

412

413

### Connector Configuration Types

414

415

```java { .api }

416

import org.apache.flink.configuration.ConfigOption;

417

import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;

418

import org.apache.flink.connector.print.table.PrintConnectorOptions;

419

import java.time.Duration;

420

```

421

422

### Factory Implementation Types

423

424

```java { .api }

425

import org.apache.flink.table.factories.DynamicTableSourceFactory;

426

import org.apache.flink.table.factories.DynamicTableSinkFactory;

427

import org.apache.flink.table.connector.source.ScanTableSource;

428

import org.apache.flink.table.connector.sink.DynamicTableSink;

429

```

430

431

### Built-in Connector Identifiers

432

433

```java { .api }

434

// Connector identifiers for SQL DDL

435

public static final String BLACKHOLE_IDENTIFIER = "blackhole";

436

public static final String DATAGEN_IDENTIFIER = "datagen";

437

public static final String PRINT_IDENTIFIER = "print";

438

```