0
# Built-in Connectors
1
2
Ready-to-use connectors for developing, testing, and debugging Flink Table API applications. These connectors ship with the Flink Table API Java Bridge module and cover the essentials for exercising data pipelines and inspecting table operations.
3
4
## Capabilities
5
6
### BlackHole Connector
7
8
High-performance sink connector that discards all input records. Designed for performance testing and scenarios where output is not needed.
9
10
```sql { .api }
11
-- Create BlackHole sink table
12
CREATE TABLE sink_table (
13
user_id STRING,
14
order_count BIGINT,
15
total_amount DECIMAL(10, 2)
16
) WITH (
17
'connector' = 'blackhole'
18
);
19
20
-- Insert data (will be discarded)
21
INSERT INTO sink_table
22
SELECT user_id, COUNT(*), SUM(amount)
23
FROM orders
24
GROUP BY user_id;
25
```
26
27
**Java Factory Usage:**
28
29
```java { .api }
30
/**
31
* BlackHole table sink factory for discarding all input records
32
* Identifier: "blackhole"
33
*/
34
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
35
public static final String IDENTIFIER = "blackhole";
36
37
public String factoryIdentifier();
38
public Set<ConfigOption<?>> requiredOptions(); // Empty set
39
public Set<ConfigOption<?>> optionalOptions(); // Empty set
40
public DynamicTableSink createDynamicTableSink(Context context);
41
}
42
```
43
44
**Usage Examples:**
45
46
```java
47
// BlackHole sink supports all changelog modes and partitioning
48
// Automatically filters out UPDATE_BEFORE events for efficiency
49
50
// Performance testing setup
51
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
52
53
// Create source with high data volume
54
tableEnv.executeSql(
55
"CREATE TABLE source_table (" +
56
" id BIGINT," +
57
" data STRING," +
58
" ts TIMESTAMP(3)" +
59
") WITH (" +
60
" 'connector' = 'datagen'," +
61
" 'rows-per-second' = '1000000'" +
62
")"
63
);
64
65
// Create BlackHole sink for performance testing
66
tableEnv.executeSql(
67
"CREATE TABLE perf_sink (" +
68
" id BIGINT," +
69
" processed_data STRING" +
70
") WITH (" +
71
" 'connector' = 'blackhole'" +
72
")"
73
);
74
75
// Test query performance (output discarded)
76
tableEnv.executeSql(
77
"INSERT INTO perf_sink " +
78
"SELECT id, UPPER(data) FROM source_table"
79
);
80
```
81
82
### DataGen Connector
83
84
Flexible data generation connector for creating test data with configurable patterns and data types.
85
86
```java { .api }
87
/**
88
* Configuration options for DataGen connector
89
*/
90
public class DataGenConnectorOptions {
91
92
/** Control emission rate */
93
public static final ConfigOption<Long> ROWS_PER_SECOND; // Default: 10000
94
95
/** Total rows to emit (unbounded if not set) */
96
public static final ConfigOption<Long> NUMBER_OF_ROWS;
97
98
/** Source parallelism */
99
public static final ConfigOption<Integer> SOURCE_PARALLELISM;
100
101
// Field-specific configuration options
102
public static final ConfigOption<String> FIELD_KIND; // 'random' or 'sequence'
103
public static final ConfigOption<String> FIELD_MIN; // Minimum value for random
104
public static final ConfigOption<String> FIELD_MAX; // Maximum value for random
105
public static final ConfigOption<Duration> FIELD_MAX_PAST; // Max past for timestamps
106
public static final ConfigOption<Integer> FIELD_LENGTH; // Collection size/string length
107
public static final ConfigOption<String> FIELD_START; // Sequence start value
108
public static final ConfigOption<String> FIELD_END; // Sequence end value
109
public static final ConfigOption<Float> FIELD_NULL_RATE; // Proportion of nulls
110
public static final ConfigOption<Boolean> FIELD_VAR_LEN; // Variable length data
111
}
112
```
113
114
**SQL Usage Examples:**
115
116
```sql { .api }
117
-- Basic DataGen source
118
CREATE TABLE users_source (
119
user_id BIGINT,
120
username STRING,
121
age INT,
122
created_at TIMESTAMP(3)
123
) WITH (
124
'connector' = 'datagen',
125
'rows-per-second' = '100',
126
'number-of-rows' = '10000'
127
);
128
129
-- Advanced DataGen with field-specific configuration
130
CREATE TABLE orders_source (
131
order_id BIGINT,
132
user_id STRING,
133
product_name STRING,
134
quantity INT,
135
price DECIMAL(10, 2),
136
order_time TIMESTAMP(3)
137
) WITH (
138
'connector' = 'datagen',
139
'rows-per-second' = '50',
140
141
-- Sequence generator for order_id
142
'fields.order_id.kind' = 'sequence',
143
'fields.order_id.start' = '1',
144
'fields.order_id.end' = '1000000',
145
146
-- Random string for user_id
147
'fields.user_id.kind' = 'random',
148
'fields.user_id.length' = '8',
149
150
-- Random product names with variable length
151
'fields.product_name.kind' = 'random',
152
'fields.product_name.length' = '20',
153
'fields.product_name.var-len' = 'true',
154
155
-- Random quantity with bounds
156
'fields.quantity.kind' = 'random',
157
'fields.quantity.min' = '1',
158
'fields.quantity.max' = '10',
159
160
-- Random price with bounds
161
'fields.price.kind' = 'random',
162
'fields.price.min' = '10.00',
163
'fields.price.max' = '999.99',
164
165
-- Random timestamps within past 30 days
166
'fields.order_time.kind' = 'random',
167
'fields.order_time.max-past' = '30d'
168
);
169
170
-- DataGen with null values
171
CREATE TABLE sparse_data (
172
id BIGINT,
173
optional_field STRING,
174
another_field INT
175
) WITH (
176
'connector' = 'datagen',
177
'rows-per-second' = '10',
178
179
'fields.optional_field.null-rate' = '0.3', -- 30% null values
180
'fields.another_field.null-rate' = '0.1' -- 10% null values
181
);
182
```
183
184
**Java Factory Usage:**
185
186
```java { .api }
187
/**
188
* DataGen table source factory for generating test data
189
*/
190
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
191
192
public String factoryIdentifier(); // Returns "datagen"
193
public Set<ConfigOption<?>> requiredOptions(); // Empty - all options optional
194
public Set<ConfigOption<?>> optionalOptions(); // All DataGenConnectorOptions
195
public DynamicTableSource createDynamicTableSource(Context context);
196
}
197
198
/**
199
* DataGen table source implementation
200
*/
201
public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown {
202
// Supports limit push-down for bounded data generation
203
}
204
```
205
206
### Print Connector
207
208
Debug connector that writes table data to the console, with an optional identifier prefix on each output line.
209
210
```java { .api }
211
/**
212
* Configuration options for Print connector
213
*/
214
public class PrintConnectorOptions {
215
// 'print-identifier': optional prefix prepended to every printed row
216
// 'standard-error': when true, rows are printed to stderr instead of stdout
217
}
218
```
219
220
**SQL Usage Examples:**
221
222
```sql { .api }
223
-- Basic print sink
224
CREATE TABLE debug_output (
225
user_id STRING,
226
username STRING,
227
score INT
228
) WITH (
229
'connector' = 'print'
230
);
231
232
-- Print with custom prefix
233
CREATE TABLE debug_detailed (
234
order_id BIGINT,
235
status STRING,
236
`timestamp` TIMESTAMP(3)
237
) WITH (
238
'connector' = 'print',
239
'print-identifier' = 'ORDER_DEBUG'
240
);
241
242
-- Insert data to see output in console
243
INSERT INTO debug_output
244
SELECT user_id, username, score
245
FROM user_scores
246
WHERE score > 100;
247
```
248
249
**Java Factory Usage:**
250
251
```java { .api }
252
/**
253
* Print table sink factory for console output debugging
254
*/
255
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
256
257
public String factoryIdentifier(); // Returns "print"
258
public Set<ConfigOption<?>> requiredOptions(); // Empty
259
public Set<ConfigOption<?>> optionalOptions(); // Print-specific options
260
public DynamicTableSink createDynamicTableSink(Context context);
261
}
262
```
263
264
**Usage Examples:**
265
266
```java
267
// Debug complex transformations
268
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
269
270
// Create source
271
tableEnv.executeSql(
272
"CREATE TABLE transactions (" +
273
" txn_id STRING," +
274
" amount DECIMAL(10, 2)," +
275
" txn_time TIMESTAMP(3)," + " WATERMARK FOR txn_time AS txn_time - INTERVAL '5' SECOND" +
276
") WITH (" +
277
" 'connector' = 'datagen'," +
278
" 'fields.amount.min' = '1.00'," +
279
" 'fields.amount.max' = '1000.00'" +
280
")"
281
);
282
283
// Create debug sink
284
tableEnv.executeSql(
285
"CREATE TABLE debug_aggregates (" +
286
" window_start TIMESTAMP(3)," +
287
" window_end TIMESTAMP(3)," +
288
" total_amount DECIMAL(12, 2)," +
289
" txn_count BIGINT" +
290
") WITH (" +
291
" 'connector' = 'print'," +
292
" 'print-identifier' = 'WINDOW_AGGREGATES'" +
293
")"
294
);
295
296
// Debug windowed aggregation
297
tableEnv.executeSql(
298
"INSERT INTO debug_aggregates " +
299
"SELECT " +
300
" window_start, " +
301
" window_end, " +
302
" SUM(amount) as total_amount, " +
303
" COUNT(*) as txn_count " +
304
"FROM TABLE(" +
305
" TUMBLE(TABLE transactions, DESCRIPTOR(txn_time), INTERVAL '1' MINUTE)" +
306
") " +
307
"GROUP BY window_start, window_end"
308
);
309
```
310
311
## Connector Combinations
312
313
### Testing Pipeline Pattern
314
315
Combine connectors for comprehensive testing workflows.
316
317
```sql
318
-- Complete testing pipeline
319
-- 1. Generate test data
320
CREATE TABLE test_orders (
321
order_id BIGINT,
322
customer_id STRING,
323
amount DECIMAL(10, 2),
324
order_time TIMESTAMP(3),
325
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
326
) WITH (
327
'connector' = 'datagen',
328
'rows-per-second' = '100',
329
'fields.order_id.kind' = 'sequence',
330
'fields.order_id.start' = '1',
331
'fields.customer_id.kind' = 'random',
332
'fields.customer_id.length' = '6',
333
'fields.amount.kind' = 'random',
334
'fields.amount.min' = '10.00',
335
'fields.amount.max' = '500.00'
336
);
337
338
-- 2. Debug intermediate results
339
CREATE TABLE debug_customer_stats (
340
customer_id STRING,
341
order_count BIGINT,
342
total_amount DECIMAL(12, 2),
343
avg_amount DECIMAL(10, 2)
344
) WITH (
345
'connector' = 'print',
346
'print-identifier' = 'CUSTOMER_STATS'
347
);
348
349
-- 3. Performance test final sink
350
CREATE TABLE perf_sink (
351
customer_id STRING,
352
order_count BIGINT,
353
total_amount DECIMAL(12, 2)
354
) WITH (
355
'connector' = 'blackhole'
356
);
357
358
-- Execute testing workflow
359
INSERT INTO debug_customer_stats
360
SELECT
361
customer_id,
362
COUNT(*) as order_count,
363
SUM(amount) as total_amount,
364
AVG(amount) as avg_amount
365
FROM test_orders
366
GROUP BY customer_id;
367
368
INSERT INTO perf_sink
369
SELECT customer_id, COUNT(*) as order_count, SUM(amount) as total_amount
369
FROM test_orders GROUP BY customer_id;
371
```
372
373
## Data Generation Patterns
374
375
### Realistic Test Data
376
377
Configure DataGen for realistic business scenarios.
378
379
```sql
380
-- E-commerce user behavior simulation
381
CREATE TABLE user_events (
382
user_id STRING,
383
event_type STRING,
384
product_id STRING,
385
session_id STRING,
386
event_time TIMESTAMP(3),
387
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
388
) WITH (
389
'connector' = 'datagen',
390
'rows-per-second' = '1000',
391
392
-- Realistic user IDs
393
'fields.user_id.kind' = 'random',
394
'fields.user_id.length' = '10',
395
396
-- Event types: random 15-character strings (a realistic value distribution would need a custom generator)
397
'fields.event_type.kind' = 'random',
398
'fields.event_type.length' = '15',
399
400
-- Product catalog simulation
401
'fields.product_id.kind' = 'sequence',
402
'fields.product_id.start' = '100',
403
'fields.product_id.end' = '999',
404
405
-- Session tracking
406
'fields.session_id.kind' = 'random',
407
'fields.session_id.length' = '32'
408
);
409
```
410
411
## Types
412
413
### Connector Configuration Types
414
415
```java { .api }
416
import org.apache.flink.configuration.ConfigOption;
417
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;
418
import org.apache.flink.connector.print.table.PrintConnectorOptions;
419
import java.time.Duration;
420
```
421
422
### Factory Implementation Types
423
424
```java { .api }
425
import org.apache.flink.table.factories.DynamicTableSourceFactory;
426
import org.apache.flink.table.factories.DynamicTableSinkFactory;
427
import org.apache.flink.table.connector.source.ScanTableSource;
428
import org.apache.flink.table.connector.sink.DynamicTableSink;
429
```
430
431
### Built-in Connector Identifiers
432
433
```java { .api }
434
// Connector identifiers for SQL DDL
435
public static final String BLACKHOLE_IDENTIFIER = "blackhole";
436
public static final String DATAGEN_IDENTIFIER = "datagen";
437
public static final String PRINT_IDENTIFIER = "print";
438
```