# Built-in Connectors

Production-ready connectors included with the Flink Table API Java Bridge for common use cases including test data generation, development output, and performance testing.

## Capabilities

### DataGen Connector

A test data generation connector that produces synthetic data based on configurable patterns and constraints.

#### DataGen Table Source Factory

Factory for creating DataGen table sources with configurable data generation options.

```java { .api }
/**
 * Factory for creating DataGen table sources.
 * Connector identifier: "datagen"
 */
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {

    /**
     * @return "datagen" - the connector identifier for SQL DDL
     */
    public String factoryIdentifier();

    /**
     * Creates a DataGen table source based on configuration.
     * @param context factory context with configuration and schema
     * @return DynamicTableSource instance for data generation
     */
    public DynamicTableSource createDynamicTableSource(Context context);
}
```

#### DataGen Configuration Options

Configuration options for controlling data generation behavior.

```java { .api }
/**
 * Configuration options for the DataGen connector
 */
public class DataGenConnectorOptions {

    // Rate control options
    public static final ConfigOption<Long> ROWS_PER_SECOND;       // Rate limiting (default: 10000)
    public static final ConfigOption<Long> NUMBER_OF_ROWS;        // Total rows to generate (default: unlimited)
    public static final ConfigOption<Integer> SOURCE_PARALLELISM; // Source parallelism

    // Field-specific generation options
    public static final ConfigOption<String> FIELD_KIND;       // Generation kind: sequence, random
    public static final ConfigOption<Integer> FIELD_MIN;       // Minimum value for numeric fields
    public static final ConfigOption<Integer> FIELD_MAX;       // Maximum value for numeric fields
    public static final ConfigOption<Integer> FIELD_MAX_PAST;  // Max past time for timestamp fields
    public static final ConfigOption<Integer> FIELD_LENGTH;    // Length for string fields
    public static final ConfigOption<Integer> FIELD_START;     // Start value for sequence fields
    public static final ConfigOption<Integer> FIELD_END;       // End value for sequence fields
    public static final ConfigOption<Double> FIELD_NULL_RATE;  // Null rate (0.0 to 1.0)
    public static final ConfigOption<Boolean> FIELD_VAR_LEN;   // Variable length for string fields
}
```

**Usage Examples:**

```sql
-- SQL DDL for a DataGen table
CREATE TABLE datagen_source (
    id BIGINT,
    name STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP_LTZ(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1000',
    'number-of-rows' = '100000',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '100000',
    'fields.name.length' = '10',
    'fields.amount.min' = '10.00',
    'fields.amount.max' = '1000.00'
);
```

```java
// Programmatic DataGen source creation
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

TableDescriptor datagenDescriptor = TableDescriptor.forConnector("datagen")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .build())
    .option("rows-per-second", "1000")
    .option("number-of-rows", "100000")
    .option("fields.id.kind", "sequence")
    .option("fields.id.start", "1")
    .option("fields.name.length", "10")
    .build();

tableEnv.createTable("datagen_source", datagenDescriptor);
```

#### DataGen Data Generation Patterns

Different data generation strategies are available for different field types.

```java { .api }
// Generation kinds available
String SEQUENCE = "sequence"; // Sequential numeric values
String RANDOM = "random";     // Random values within bounds

// Field-specific generation patterns:
// - Numeric fields: min/max bounds with random or sequence generation
// - String fields: configurable length with random character generation
// - Timestamp fields: random timestamps within time bounds
// - Boolean fields: random true/false values
```

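The two kinds can be sketched in plain Java. This is an illustrative model only, with hypothetical helper names, not the connector's internal implementation: a `sequence` field emits consecutive values from start to end and then stops, while a `random` field draws uniformly within its bounds and can return null at a configured null-rate.

```java
import java.util.Random;
import java.util.function.Supplier;

// Illustrative model of the "sequence" and "random" generation kinds.
// Class and method names are hypothetical, not Flink APIs.
public class GenerationKinds {

    // "sequence": consecutive values from start to end, then null when exhausted
    static Supplier<Long> sequence(long start, long end) {
        long[] next = {start};
        return () -> next[0] > end ? null : next[0]++;
    }

    // "random": uniform values in [min, max], null at the given rate
    static Supplier<Long> random(long min, long max, double nullRate, Random rnd) {
        return () -> rnd.nextDouble() < nullRate
                ? null
                : min + (long) (rnd.nextDouble() * (max - min + 1));
    }

    public static void main(String[] args) {
        Supplier<Long> seq = sequence(1, 3);
        System.out.println(seq.get()); // 1
        System.out.println(seq.get()); // 2
        System.out.println(seq.get()); // 3
        System.out.println(seq.get()); // null

        Supplier<Long> rnd = random(10, 20, 0.0, new Random());
        long v = rnd.get();
        System.out.println(v >= 10 && v <= 20); // true
    }
}
```

Note that the real connector finishes the source once a sequence is exhausted rather than emitting nulls; the null return above merely marks exhaustion in the sketch.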
### Print Connector

A development and debugging output connector that prints table data to standard output or standard error.

#### Print Table Sink Factory

Factory for creating Print table sinks with configurable output formatting.

```java { .api }
/**
 * Factory for creating Print table sinks.
 * Connector identifier: "print"
 */
public class PrintTableSinkFactory implements DynamicTableSinkFactory {

    /**
     * @return "print" - the connector identifier for SQL DDL
     */
    public String factoryIdentifier();

    /**
     * Creates a Print table sink based on configuration.
     * @param context factory context with configuration and schema
     * @return DynamicTableSink instance for printing output
     */
    public DynamicTableSink createDynamicTableSink(Context context);
}
```

#### Print Configuration Options

Configuration options for controlling print output behavior.

```java { .api }
/**
 * Configuration options for the Print connector
 */
public class PrintConnectorOptions {

    public static final ConfigOption<String> PRINT_IDENTIFIER;  // Prefix for printed lines
    public static final ConfigOption<Boolean> STANDARD_ERROR;   // Print to stderr instead of stdout
    public static final ConfigOption<Integer> SINK_PARALLELISM; // Sink parallelism
}
```

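As a usage note, the Print sink emits each row as a changelog entry: the row kind (`+I`, `-U`, `+U`, `-D`) followed by the field values, prefixed with the configured `print-identifier` when one is set, for example `orders> +I[1, Alice, 25.50]`. A minimal sketch of that line format (hypothetical helper, not Flink's implementation):

```java
// Hypothetical sketch of Print-style line formatting.
public class PrintFormat {

    // Builds "identifier> rowKind[f1, f2, ...]"; the prefix is omitted
    // when no identifier is configured.
    static String format(String identifier, String rowKind, Object... fields) {
        String prefix = (identifier == null || identifier.isEmpty()) ? "" : identifier + "> ";
        StringBuilder row = new StringBuilder(rowKind).append('[');
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                row.append(", ");
            }
            row.append(fields[i]);
        }
        return prefix + row.append(']').toString();
    }

    public static void main(String[] args) {
        System.out.println(format("orders", "+I", 1L, "Alice", "25.50"));
        // orders> +I[1, Alice, 25.50]
        System.out.println(format(null, "-D", 7));
        // -D[7]
    }
}
```
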
**Usage Examples:**

```sql
-- SQL DDL for a Print table
CREATE TABLE print_sink (
    id BIGINT,
    name STRING,
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'print',
    'print-identifier' = 'orders',
    'standard-error' = 'false'
);

-- Insert data to print
INSERT INTO print_sink SELECT * FROM source_table;
```

```java
// Programmatic Print sink creation
TableDescriptor printDescriptor = TableDescriptor.forConnector("print")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .build())
    .option("print-identifier", "debug-output")
    .build();

tableEnv.createTable("print_sink", printDescriptor);

// Execute a query whose results are printed by the sink
Table result = tableEnv.sqlQuery("SELECT * FROM source WHERE amount > 100");
result.executeInsert("print_sink");
```

### BlackHole Connector

A performance testing connector that discards all input data without performing any I/O operations.

#### BlackHole Table Sink Factory

Factory for creating BlackHole table sinks for performance benchmarking.

```java { .api }
/**
 * Factory for creating BlackHole table sinks.
 * Connector identifier: "blackhole"
 */
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {

    /**
     * @return "blackhole" - the connector identifier for SQL DDL
     */
    public String factoryIdentifier();

    /**
     * Creates a BlackHole table sink that discards all data.
     * @param context factory context with configuration and schema
     * @return DynamicTableSink instance that discards input
     */
    public DynamicTableSink createDynamicTableSink(Context context);

    /**
     * @return empty set - the BlackHole connector has no required options
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * @return empty set - the BlackHole connector has no optional options
     */
    public Set<ConfigOption<?>> optionalOptions();
}
```

**Usage Examples:**

```sql
-- SQL DDL for a BlackHole table
CREATE TABLE blackhole_sink (
    id BIGINT,
    name STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP_LTZ(3)
) WITH (
    'connector' = 'blackhole'
);

-- Performance test query
INSERT INTO blackhole_sink
SELECT id, name, amount, event_time
FROM high_volume_source;
```

```java
// Programmatic BlackHole sink creation
TableDescriptor blackholeDescriptor = TableDescriptor.forConnector("blackhole")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
        .build())
    .build();

tableEnv.createTable("blackhole_sink", blackholeDescriptor);

// Benchmark query performance
long startTime = System.currentTimeMillis();
Table result = tableEnv.sqlQuery("SELECT * FROM large_source WHERE complex_condition");
result.executeInsert("blackhole_sink").await();
long endTime = System.currentTimeMillis();
System.out.println("Query execution time: " + (endTime - startTime) + "ms");
```

## Type Definitions

### Factory Base Interfaces

```java { .api }
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

// Factory interface hierarchy
interface DynamicTableSourceFactory extends Factory {
    String factoryIdentifier();
    DynamicTableSource createDynamicTableSource(Context context);
    Set<ConfigOption<?>> requiredOptions();
    Set<ConfigOption<?>> optionalOptions();
}

interface DynamicTableSinkFactory extends Factory {
    String factoryIdentifier();
    DynamicTableSink createDynamicTableSink(Context context);
    Set<ConfigOption<?>> requiredOptions();
    Set<ConfigOption<?>> optionalOptions();
}
```

### Configuration Options

```java { .api }
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Configuration option creation patterns
ConfigOption<String> stringOption = ConfigOptions
    .key("option.name")
    .stringType()
    .defaultValue("default")
    .withDescription("Option description");

ConfigOption<Integer> intOption = ConfigOptions
    .key("option.number")
    .intType()
    .noDefaultValue()
    .withDescription("Numeric option");

ConfigOption<Boolean> boolOption = ConfigOptions
    .key("option.flag")
    .booleanType()
    .defaultValue(false)
    .withDescription("Boolean flag");
```

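Conceptually, resolving an option against a table's `WITH` properties reduces to: use the configured value when the key is present, otherwise fall back to the option's default. A plain-Java sketch of that lookup (hypothetical helper, not a Flink API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of option resolution against table properties.
public class OptionResolution {

    // Returns the parsed property value, or the default when the key is absent.
    static long resolveLong(Map<String, String> props, String key, long defaultValue) {
        String raw = props.get(key);
        return raw == null ? defaultValue : Long.parseLong(raw);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("rows-per-second", "1000");

        // explicitly configured -> parsed from the WITH clause
        System.out.println(resolveLong(props, "rows-per-second", 10_000L)); // 1000
        // absent -> the documented default applies
        System.out.println(resolveLong(props, "number-of-rows", Long.MAX_VALUE));
    }
}
```
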
### Data Generation Types

```java { .api }
// DataGen field generation strategies
enum GenerationKind {
    SEQUENCE, // Sequential values (numeric types)
    RANDOM    // Random values within bounds
}

// DataGen type-specific options
class NumericGenerationOptions {
    Integer min;         // Minimum value
    Integer max;         // Maximum value
    GenerationKind kind; // SEQUENCE or RANDOM
}

class StringGenerationOptions {
    Integer length;         // Fixed or maximum length
    Boolean variableLength; // Enable variable-length strings
}

class TimestampGenerationOptions {
    Integer maxPast; // Maximum milliseconds in the past
}
```

## Use Cases

### Development and Testing

```sql
-- Complete test data pipeline

-- 1. Generate test data
CREATE TABLE test_orders (
    order_id BIGINT,
    customer_id INT,
    product_name STRING,
    quantity INT,
    price DECIMAL(10, 2),
    order_time TIMESTAMP_LTZ(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'number-of-rows' = '10000',
    'fields.order_id.kind' = 'sequence',
    'fields.order_id.start' = '1',
    'fields.order_id.end' = '10000',
    'fields.customer_id.min' = '1000',
    'fields.customer_id.max' = '9999',
    'fields.product_name.length' = '20',
    'fields.quantity.min' = '1',
    'fields.quantity.max' = '10',
    'fields.price.min' = '10.00',
    'fields.price.max' = '999.99'
);

-- 2. Process and debug
CREATE TABLE debug_output (
    customer_id INT,
    total_amount DECIMAL(10, 2),
    order_count BIGINT
) WITH (
    'connector' = 'print',
    'print-identifier' = 'customer-summary'
);

INSERT INTO debug_output
SELECT
    customer_id,
    SUM(price * quantity) AS total_amount,
    COUNT(*) AS order_count
FROM test_orders
GROUP BY customer_id;
```

### Performance Benchmarking

```sql
-- Benchmark query processing performance
CREATE TABLE perf_source (
    id BIGINT,
    payload STRING,
    timestamp_val TIMESTAMP_LTZ(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10000',
    'fields.payload.length' = '1000'
);

CREATE TABLE perf_sink (
    id BIGINT,
    processed_payload STRING,
    processing_time TIMESTAMP_LTZ(3)
) WITH (
    'connector' = 'blackhole'
);

-- Benchmark complex processing
INSERT INTO perf_sink
SELECT
    id,
    UPPER(SUBSTRING(payload, 1, 100)) AS processed_payload,
    CURRENT_TIMESTAMP AS processing_time
FROM perf_source
WHERE LENGTH(payload) > 500;
```