# Built-in Connectors
The Flink Table API Java Bridge includes several built-in connectors designed for development, testing, and specific use cases. These connectors provide ready-to-use implementations for common scenarios.

## DataGen Connector

The DataGen connector generates random data for testing and development purposes. It supports various data types and generation strategies.

### DataGenTableSourceFactory
```java { .api }
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
    public static final String IDENTIFIER = "datagen";

    // Factory methods for creating DataGen sources
}
```
### Configuration Options

The DataGen connector supports the following configuration options through `DataGenConnectorOptions`:
```java { .api }
public class DataGenConnectorOptions {
    // Row generation options
    public static final ConfigOption<Long> ROWS_PER_SECOND;
    public static final ConfigOption<Long> NUMBER_OF_ROWS;

    // Field-specific options
    public static final ConfigOption<String> FIELDS_PREFIX;
    public static final ConfigOption<String> KIND_OPTION;
    public static final ConfigOption<String> START_OPTION;
    public static final ConfigOption<String> END_OPTION;
    public static final ConfigOption<String> MAX_PAST_OPTION;
    public static final ConfigOption<String> LENGTH_OPTION;
}
```
### Usage Examples

**Basic Random Data Generation:**
```sql
CREATE TABLE random_source (
    id BIGINT,
    name STRING,
    age INT,
    score DOUBLE,
    birthday TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'number-of-rows' = '1000'
);
```
**Field-Specific Configuration:**
```sql
CREATE TABLE configured_source (
    user_id BIGINT,
    username STRING,
    age INT,
    balance DECIMAL(10,2),
    registration_time TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '50',

    -- Configure user_id as sequence
    'fields.user_id.kind' = 'sequence',
    'fields.user_id.start' = '1',
    'fields.user_id.end' = '1000',

    -- Configure username with specific length
    'fields.username.length' = '10',

    -- Configure age with range
    'fields.age.min' = '18',
    'fields.age.max' = '65',

    -- Configure balance with range
    'fields.balance.min' = '0.00',
    'fields.balance.max' = '10000.00'
);
```
**Programmatic Creation:**
```java
// Create DataGen source programmatically
TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
    .option(DataGenConnectorOptions.NUMBER_OF_ROWS, 5000L)
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("value", DataTypes.DOUBLE())
        .build())
    .build();

tableEnv.createTable("generated_data", sourceDescriptor);
```
### Data Generation Strategies

The DataGen connector supports different generation strategies:
1. **Random Generation**: Default behavior for most data types
2. **Sequence Generation**: Sequential numeric values
3. **Custom Ranges**: Min/max values for numeric types
4. **String Length**: Configurable string lengths
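
The earlier examples do not exercise the `max-past` option, which bounds how far in the past randomly generated timestamps may fall. A minimal sketch, assuming the usual `fields.<name>.max-past` duration syntax (the table and column names are illustrative):

```sql
CREATE TABLE event_source (
    event_id BIGINT,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    -- Bounded sequence for the key
    'fields.event_id.kind' = 'sequence',
    'fields.event_id.start' = '1',
    'fields.event_id.end' = '10000',
    -- Random timestamps at most 60 seconds in the past
    'fields.event_time.max-past' = '60 s'
);
```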
## Print Connector
The Print connector outputs table data to standard output, useful for debugging and development.

### PrintTableSinkFactory
```java { .api }
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
    public static final String IDENTIFIER = "print";

    // Factory methods for creating Print sinks
}
```
### Usage Examples

**Basic Print Sink:**
```sql
CREATE TABLE print_sink (
    id BIGINT,
    name STRING,
    value DOUBLE
) WITH (
    'connector' = 'print'
);

-- Insert data to see output
INSERT INTO print_sink SELECT id, name, value FROM source_table;
```
**Print with Identifier:**
```sql
CREATE TABLE debug_print (
    user_id BIGINT,
    event_type STRING,
    timestamp_col TIMESTAMP(3)
) WITH (
    'connector' = 'print',
    'print-identifier' = 'DEBUG'
);
```
**Programmatic Creation:**
```java
// Create Print sink programmatically
TableDescriptor printDescriptor = TableDescriptor.forConnector("print")
    .option("print-identifier", "MyOutput")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("message", DataTypes.STRING())
        .column("timestamp_col", DataTypes.TIMESTAMP(3))
        .build())
    .build();

tableEnv.createTable("debug_output", printDescriptor);

// Use in statement set
StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsert("debug_output", sourceTable);
statementSet.attachAsDataStream();
```
### Output Format

The Print connector outputs data in a readable format:
```
DEBUG> +I[1001, Alice, 2023-12-01T10:30:00]
DEBUG> +I[1002, Bob, 2023-12-01T10:31:00]
DEBUG> -U[1001, Alice, 2023-12-01T10:30:00]
DEBUG> +U[1001, Alice Updated, 2023-12-01T10:30:00]
```
Where:

- `+I`: Insert operation
- `-U`: Update before (retract)
- `+U`: Update after
- `-D`: Delete operation
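
An aggregating query is a simple way to observe the `-U`/`+U` pairs in practice, since each incoming row retracts the affected group's old count and re-emits the new one. A minimal sketch (the `events` source table is illustrative):

```sql
CREATE TABLE count_print (
    event_type STRING,
    cnt BIGINT
) WITH (
    'connector' = 'print',
    'print-identifier' = 'COUNTS'
);

-- Each new row for an existing event_type prints a -U record for the
-- old count followed by a +U record for the updated count
INSERT INTO count_print
SELECT event_type, COUNT(*) FROM events GROUP BY event_type;
```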
## BlackHole Connector
The BlackHole connector discards all data, useful for performance testing and benchmarking.

### BlackHoleTableSinkFactory
```java { .api }
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
    public static final String IDENTIFIER = "blackhole";

    // Factory methods for creating BlackHole sinks
}
```
### Usage Examples

**Basic BlackHole Sink:**
```sql
CREATE TABLE blackhole_sink (
    id BIGINT,
    data STRING,
    timestamp_col TIMESTAMP(3)
) WITH (
    'connector' = 'blackhole'
);

-- All data inserted here will be discarded
INSERT INTO blackhole_sink SELECT * FROM high_volume_source;
```
**Performance Testing:**
```java
// Create BlackHole sink for performance testing
TableDescriptor blackholeDescriptor = TableDescriptor.forConnector("blackhole")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("payload", DataTypes.STRING())
        .column("processing_time", DataTypes.TIMESTAMP(3))
        .build())
    .build();

tableEnv.createTable("perf_test_sink", blackholeDescriptor);

// Test query performance
Table testQuery = tableEnv.sqlQuery("""
    SELECT
        id,
        UPPER(payload) AS payload,
        CURRENT_TIMESTAMP AS processing_time
    FROM source_table
    WHERE id % 1000 = 0
    """);

StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsert("perf_test_sink", testQuery);
statementSet.attachAsDataStream();
```
## Legacy CSV Connector (Testing Only)

The CSV connector is maintained only for testing the legacy connector stack and should not be used in production.

### CsvTableSource
```java { .api }
@Deprecated
public class CsvTableSource extends InputFormatTableSource<Row> {
    // Constructors and configuration methods
    public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes);
    public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes,
                          String fieldDelim, String rowDelim, Character quoteCharacter,
                          boolean ignoreFirstLine, String ignoreComments, boolean lenient);
}
```
### CsvTableSink
```java { .api }
@Deprecated
public class CsvTableSink extends OutputFormatTableSink<Row> {
    // Constructor and configuration methods
    public CsvTableSink(String path, String fieldDelim, int numFiles, WriteMode writeMode);
}
```
### Legacy Usage (Deprecated)
```java
// Legacy CSV source (deprecated; use modern file connectors instead)
CsvTableSource csvSource = CsvTableSource.builder()
    .path("/path/to/input.csv")
    .field("id", Types.LONG)
    .field("name", Types.STRING)
    .field("age", Types.INT)
    .fieldDelimiter(",")
    .ignoreFirstLine()
    .build();

tableEnv.registerTableSource("csv_input", csvSource);

// Legacy CSV sink (deprecated)
CsvTableSink csvSink = new CsvTableSink(
    "/path/to/output.csv",
    ",",                 // field delimiter
    1,                   // number of output files
    WriteMode.OVERWRITE);

tableEnv.registerTableSink("csv_output", csvSink);
```
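
For new code, the same file I/O is typically expressed with the unified `filesystem` connector and the `csv` format instead. A minimal sketch (the path is illustrative, and the `flink-csv` format dependency must be on the classpath):

```sql
CREATE TABLE csv_input (
    id BIGINT,
    name STRING,
    age INT
) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/input.csv',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true'
);
```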
## Common Patterns and Best Practices

### Testing Data Pipeline
```java
// Create test data with DataGen
TableDescriptor testDataDesc = TableDescriptor.forConnector("datagen")
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 1000L)
    .option(DataGenConnectorOptions.NUMBER_OF_ROWS, 10000L)
    .schema(Schema.newBuilder()
        .column("transaction_id", DataTypes.BIGINT())
        .column("user_id", DataTypes.STRING())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("transaction_time", DataTypes.TIMESTAMP(3))
        .build())
    .build();

tableEnv.createTable("test_transactions", testDataDesc);

// Process the data
Table processedData = tableEnv.sqlQuery("""
    SELECT
        user_id,
        COUNT(*) AS transaction_count,
        SUM(amount) AS total_amount,
        TUMBLE_START(transaction_time, INTERVAL '1' MINUTE) AS window_start
    FROM test_transactions
    GROUP BY
        user_id,
        TUMBLE(transaction_time, INTERVAL '1' MINUTE)
    """);

// Output for debugging; TableDescriptor.Builder#schema expects a Schema,
// so convert the resolved schema first
TableDescriptor printDesc = TableDescriptor.forConnector("print")
    .option("print-identifier", "PROCESSED")
    .schema(Schema.newBuilder()
        .fromResolvedSchema(processedData.getResolvedSchema())
        .build())
    .build();

tableEnv.createTable("processed_output", printDesc);

// Execute
StreamStatementSet statements = tableEnv.createStatementSet();
statements.addInsert("processed_output", processedData);
statements.attachAsDataStream();
```
### Performance Benchmarking
```java
// Generate high-volume test data
TableDescriptor highVolumeSource = TableDescriptor.forConnector("datagen")
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 10000L)
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("data", DataTypes.STRING())
        .column("timestamp_col", DataTypes.TIMESTAMP(3))
        .build())
    .build();

tableEnv.createTable("benchmark_source", highVolumeSource);

// Complex processing query
Table benchmarkQuery = tableEnv.sqlQuery("""
    SELECT
        id,
        UPPER(data) AS processed_data,
        COUNT(*) OVER (
            PARTITION BY id % 100
            ORDER BY timestamp_col
            RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
        ) AS windowed_count
    FROM benchmark_source
    """);

// Discard results for a pure processing benchmark; convert the resolved
// schema since TableDescriptor.Builder#schema expects a Schema
TableDescriptor blackholeDesc = TableDescriptor.forConnector("blackhole")
    .schema(Schema.newBuilder()
        .fromResolvedSchema(benchmarkQuery.getResolvedSchema())
        .build())
    .build();

tableEnv.createTable("benchmark_sink", blackholeDesc);

// Measure processing throughput
StreamStatementSet benchmark = tableEnv.createStatementSet();
benchmark.addInsert("benchmark_sink", benchmarkQuery);
benchmark.attachAsDataStream();
```
### Development Debugging
```java
// Debug intermediate results in complex pipelines
public void debugPipeline() {
    // Step 1: Raw data
    Table rawData = tableEnv.sqlQuery("SELECT * FROM source_table LIMIT 100");
    createPrintSink("debug_raw", rawData);

    // Step 2: After filtering
    Table filtered = tableEnv.sqlQuery("""
        SELECT * FROM source_table
        WHERE status = 'ACTIVE' AND amount > 100
        LIMIT 100
        """);
    createPrintSink("debug_filtered", filtered);

    // Step 3: After aggregation (COUNT is a reserved word, so alias as cnt)
    Table aggregated = tableEnv.sqlQuery("""
        SELECT
            user_id,
            COUNT(*) AS cnt,
            SUM(amount) AS total
        FROM source_table
        WHERE status = 'ACTIVE'
        GROUP BY user_id
        LIMIT 50
        """);
    createPrintSink("debug_aggregated", aggregated);
}

private void createPrintSink(String name, Table table) {
    // TableDescriptor.Builder#schema expects a Schema, so convert the resolved schema
    TableDescriptor desc = TableDescriptor.forConnector("print")
        .option("print-identifier", name.toUpperCase())
        .schema(Schema.newBuilder()
            .fromResolvedSchema(table.getResolvedSchema())
            .build())
        .build();

    tableEnv.createTable(name + "_sink", desc);

    StreamStatementSet statements = tableEnv.createStatementSet();
    statements.addInsert(name + "_sink", table);
    statements.attachAsDataStream();
}
```