# Table API Integration

The connector provides SQL and Table API support through dynamic table factories, enabling declarative stream processing with Kinesis sources and sinks and integration with Flink's unified batch and stream processing APIs.

## Capabilities

### KinesisDynamicTableFactory

Factory class for creating Kinesis table sources and sinks that integrate with Flink's Table API ecosystem.
```java { .api }
@Internal
public class KinesisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public static final String IDENTIFIER = "kinesis";

    /**
     * Create a dynamic table source for reading from Kinesis.
     *
     * @param context Factory context with table schema and options
     * @return Configured Kinesis table source
     */
    public DynamicTableSource createDynamicTableSource(Context context);

    /**
     * Create a dynamic table sink for writing to Kinesis.
     *
     * @param context Factory context with table schema and options
     * @return Configured Kinesis table sink
     */
    public DynamicTableSink createDynamicTableSink(Context context);

    /**
     * Get the factory identifier for table DDL.
     *
     * @return Factory identifier string
     */
    public String factoryIdentifier();

    /**
     * Get required configuration options.
     *
     * @return Set of required configuration options
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Get optional configuration options.
     *
     * @return Set of optional configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();

    /**
     * Validate Kinesis partitioner configuration.
     *
     * @param tableOptions Table configuration options
     * @param targetTable Catalog table definition
     */
    public static void validateKinesisPartitioner(ReadableConfig tableOptions, CatalogTable targetTable);
}
```
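
The factory is resolved by its identifier whenever a table declares `'connector' = 'kinesis'`. A minimal sketch for verifying that the connector is on the classpath, using Flink's `FactoryUtil` (the class name here is illustrative):

```java
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class ConnectorDiscoveryCheck {
    public static void main(String[] args) {
        // Resolve the factory registered under the 'kinesis' identifier;
        // this throws a ValidationException if the connector jar is missing.
        DynamicTableSourceFactory factory = FactoryUtil.discoverFactory(
                ConnectorDiscoveryCheck.class.getClassLoader(),
                DynamicTableSourceFactory.class,
                "kinesis");
        System.out.println("Found Kinesis table factory: " + factory.getClass().getName());
    }
}
```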

### KinesisDynamicSource

Dynamic table source implementation for reading from Kinesis streams in Table API queries.
```java { .api }
@Internal
public class KinesisDynamicSource implements ScanTableSource, SupportsReadingMetadata {

    /**
     * Get the changelog mode supported by this source.
     *
     * @return Changelog mode (INSERT only for Kinesis)
     */
    public ChangelogMode getChangelogMode();

    /**
     * Create the runtime provider that supplies the actual source for reading data.
     *
     * @param context Scan context
     * @return Configured scan runtime provider
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context);

    /**
     * Create a copy of this source during planner optimization.
     *
     * @return New source instance with the same configuration
     */
    public DynamicTableSource copy();

    /**
     * Get summary string for debugging.
     *
     * @return Summary string
     */
    public String asSummaryString();
}
```

### KinesisDynamicSink

Dynamic table sink implementation for writing to Kinesis streams from Table API queries.
```java { .api }
@Internal
public class KinesisDynamicSink implements DynamicTableSink {

    /**
     * Get the changelog mode accepted by this sink.
     *
     * @param requestedMode Requested changelog mode
     * @return Accepted changelog mode
     */
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode);

    /**
     * Create the runtime provider that supplies the actual sink for writing data.
     *
     * @param context Sink context
     * @return Configured sink runtime provider
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

    /**
     * Create a copy of this sink with the same configuration.
     *
     * @return New sink copy
     */
    public DynamicTableSink copy();

    /**
     * Get summary string for debugging.
     *
     * @return Summary string
     */
    public String asSummaryString();
}
```

## Usage Examples

### Creating Kinesis Tables with DDL
```sql
-- Create a Kinesis source table
CREATE TABLE kinesis_source (
    event_id STRING,
    user_id BIGINT,
    event_type STRING,
    timestamp_col TIMESTAMP(3),
    payload ROW<
        action STRING,
        properties MAP<STRING, STRING>
    >,
    -- Kinesis metadata columns
    kinesis_partition_key STRING METADATA FROM 'partition-key',
    kinesis_sequence_number STRING METADATA FROM 'sequence-number',
    kinesis_shard_id STRING METADATA FROM 'shard-id',
    kinesis_stream_name STRING METADATA FROM 'stream-name',
    kinesis_arrival_timestamp TIMESTAMP(3) METADATA FROM 'arrival-timestamp',
    -- Watermark for event time processing
    WATERMARK FOR timestamp_col AS timestamp_col - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = 'user-events',
    'aws.region' = 'us-west-2',
    'aws.credentials.provider' = 'AUTO',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json'
);

-- Create a Kinesis sink table
CREATE TABLE kinesis_sink (
    processed_event_id STRING,
    user_id BIGINT,
    aggregated_count BIGINT,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'processed-events',
    'aws.region' = 'us-west-2',
    'aws.credentials.provider' = 'AUTO',
    'format' = 'json',
    'sink.partitioner' = 'fixed'
);
```

### Real-Time Analytics Query

```sql
-- Real-time user event aggregation over 5-minute tumbling windows
INSERT INTO kinesis_sink
SELECT
    CONCAT('agg_', CAST(user_id AS STRING), '_',
           DATE_FORMAT(TUMBLE_START(timestamp_col, INTERVAL '5' MINUTE), 'yyyyMMddHHmm')) as processed_event_id,
    user_id,
    COUNT(*) as aggregated_count,
    TUMBLE_START(timestamp_col, INTERVAL '5' MINUTE) as window_start,
    TUMBLE_END(timestamp_col, INTERVAL '5' MINUTE) as window_end
FROM kinesis_source
WHERE event_type = 'page_view'
GROUP BY
    user_id,
    TUMBLE(timestamp_col, INTERVAL '5' MINUTE);
```
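
The `INSERT INTO` statement above runs as a continuous streaming job. A minimal sketch of submitting and monitoring it from Java, assuming the `kinesis_source` and `kinesis_sink` tables from the DDL section are already registered:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class SubmitAggregationJob {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // kinesis_source and kinesis_sink are assumed to have been created
        // beforehand, e.g. by executing the CREATE TABLE statements shown above.
        TableResult result = tableEnv.executeSql(
                "INSERT INTO kinesis_sink "
                + "SELECT CAST(user_id AS STRING), user_id, COUNT(*), "
                + "       TUMBLE_START(timestamp_col, INTERVAL '5' MINUTE), "
                + "       TUMBLE_END(timestamp_col, INTERVAL '5' MINUTE) "
                + "FROM kinesis_source "
                + "GROUP BY user_id, TUMBLE(timestamp_col, INTERVAL '5' MINUTE)");

        // executeSql() returns immediately for INSERT INTO; the handle exposes
        // the cluster-side streaming job for monitoring or cancellation.
        result.getJobClient().ifPresent(client ->
                System.out.println("Submitted job " + client.getJobID()));
    }
}
```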

### Multi-Stream Processing
```sql
-- Create multiple Kinesis source tables
CREATE TABLE orders_stream (
    order_id STRING,
    customer_id STRING,
    product_id STRING,
    quantity INT,
    price DECIMAL(10,2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = 'orders',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

CREATE TABLE inventory_stream (
    product_id STRING,
    available_quantity INT,
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = 'inventory-updates',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

-- Join streams for real-time inventory management
CREATE TABLE inventory_alerts (
    product_id STRING,
    order_quantity INT,
    available_quantity INT,
    alert_message STRING,
    alert_time TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'inventory-alerts',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

INSERT INTO inventory_alerts
SELECT
    o.product_id,
    SUM(o.quantity) as order_quantity,
    LAST_VALUE(i.available_quantity) as available_quantity,
    CASE
        WHEN LAST_VALUE(i.available_quantity) < SUM(o.quantity)
            THEN 'LOW_STOCK_ALERT'
        ELSE 'STOCK_OK'
    END as alert_message,
    CURRENT_TIMESTAMP as alert_time
FROM orders_stream o
LEFT JOIN inventory_stream i
    ON o.product_id = i.product_id
    AND i.update_time BETWEEN o.order_time - INTERVAL '1' HOUR AND o.order_time + INTERVAL '5' MINUTE
GROUP BY
    o.product_id,
    TUMBLE(o.order_time, INTERVAL '1' MINUTE);
```

### DynamoDB Streams Integration
```sql
-- Create table for DynamoDB Streams
CREATE TABLE dynamodb_changes (
    event_name STRING,
    table_name STRING,
    partition_key STRING,
    sort_key STRING,
    old_image ROW<
        user_id STRING,
        username STRING,
        email STRING
    >,
    new_image ROW<
        user_id STRING,
        username STRING,
        email STRING
    >,
    approximate_creation_time TIMESTAMP(3),
    WATERMARK FOR approximate_creation_time AS approximate_creation_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'kinesis',
    'stream' = 'arn:aws:dynamodb:us-west-2:123456789012:table/Users/stream/2023-01-01T00:00:00.000',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

-- Create change log for audit purposes
CREATE TABLE user_audit_log (
    change_id STRING,
    user_id STRING,
    change_type STRING,
    old_values STRING,
    new_values STRING,
    change_timestamp TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'user-audit-log',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

INSERT INTO user_audit_log
SELECT
    CONCAT(table_name, '_', partition_key, '_',
           DATE_FORMAT(approximate_creation_time, 'yyyyMMddHHmmss')) as change_id,
    partition_key as user_id,
    event_name as change_type,
    CASE WHEN old_image IS NOT NULL THEN CAST(old_image AS STRING) ELSE NULL END as old_values,
    CASE WHEN new_image IS NOT NULL THEN CAST(new_image AS STRING) ELSE NULL END as new_values,
    approximate_creation_time as change_timestamp
FROM dynamodb_changes
WHERE event_name IN ('INSERT', 'MODIFY', 'REMOVE');
```

### Complex Event Processing
```sql
-- Create pattern detection table
CREATE TABLE user_behavior_events (
    user_id STRING,
    event_type STRING,
    page_url STRING,
    session_id STRING,
    event_timestamp TIMESTAMP(3),
    WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kinesis',
    'stream' = 'user-behavior',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

-- Fraud detection alerts
CREATE TABLE fraud_alerts (
    user_id STRING,
    alert_type STRING,
    event_count BIGINT,
    time_window_start TIMESTAMP(3),
    time_window_end TIMESTAMP(3),
    alert_timestamp TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'fraud-alerts',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

-- Detect suspicious patterns (too many events in a short time)
INSERT INTO fraud_alerts
SELECT
    user_id,
    'HIGH_FREQUENCY_ACTIVITY' as alert_type,
    COUNT(*) as event_count,
    TUMBLE_START(event_timestamp, INTERVAL '1' MINUTE) as time_window_start,
    TUMBLE_END(event_timestamp, INTERVAL '1' MINUTE) as time_window_end,
    CURRENT_TIMESTAMP as alert_timestamp
FROM user_behavior_events
GROUP BY
    user_id,
    TUMBLE(event_timestamp, INTERVAL '1' MINUTE)
HAVING COUNT(*) > 100; -- More than 100 events per minute
```

### Temporal Table Joins
```sql
-- Create a versioned product catalog table (changelog stream) for temporal joins
CREATE TABLE product_catalog_versioned (
    product_id STRING,
    product_name STRING,
    category STRING,
    price DECIMAL(10,2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '10' SECOND,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'kinesis',
    'stream' = 'product-catalog-changes',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

-- Join orders with product information as of order time
CREATE TABLE enriched_orders (
    order_id STRING,
    customer_id STRING,
    product_id STRING,
    product_name STRING,
    category STRING,
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2),
    order_time TIMESTAMP(3)
) WITH (
    'connector' = 'kinesis',
    'stream' = 'enriched-orders',
    'aws.region' = 'us-west-2',
    'format' = 'json'
);

INSERT INTO enriched_orders
SELECT
    o.order_id,
    o.customer_id,
    o.product_id,
    p.product_name,
    p.category,
    o.quantity,
    p.price as unit_price,
    o.quantity * p.price as total_amount,
    o.order_time
FROM orders_stream o
LEFT JOIN product_catalog_versioned FOR SYSTEM_TIME AS OF o.order_time AS p
    ON o.product_id = p.product_id;
```

## Java Table API Examples

### Programmatic Table Creation
```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

// Create Table Environment
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create Kinesis source table programmatically
tableEnv.createTemporaryTable("kinesis_events",
    TableDescriptor.forConnector("kinesis")
        .schema(Schema.newBuilder()
            .column("event_id", DataTypes.STRING())
            .column("user_id", DataTypes.BIGINT())
            .column("event_type", DataTypes.STRING())
            .column("timestamp_col", DataTypes.TIMESTAMP(3))
            .columnByMetadata("kinesis_partition_key", DataTypes.STRING(), "partition-key")
            .columnByMetadata("kinesis_sequence_number", DataTypes.STRING(), "sequence-number")
            .watermark("timestamp_col", "timestamp_col - INTERVAL '30' SECOND")
            .build())
        .option("stream", "user-events")
        .option("aws.region", "us-west-2")
        .option("aws.credentials.provider", "AUTO")
        .option("scan.stream.initpos", "LATEST")
        .option("format", "json")
        .build());

// Count click events per user over 1-minute tumbling windows
// (the windowed aggregation keeps the result append-only, as required by the Kinesis sink)
Table result = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as event_count, " +
    "       TUMBLE_END(timestamp_col, INTERVAL '1' MINUTE) as window_end " +
    "FROM kinesis_events " +
    "WHERE event_type = 'click' " +
    "GROUP BY user_id, TUMBLE(timestamp_col, INTERVAL '1' MINUTE)"
);

// Write to another Kinesis stream (the 'kinesis_sink' table must be registered first)
result.executeInsert("kinesis_sink");
```
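
The `executeInsert("kinesis_sink")` call requires the sink table to be registered in the catalog as well. A minimal sketch of that registration, matching the windowed query's output columns (the stream name is illustrative):

```java
// Register the Kinesis sink table referenced by executeInsert("kinesis_sink");
// the columns mirror the query result: user_id, event_count, window_end.
tableEnv.createTemporaryTable("kinesis_sink",
    TableDescriptor.forConnector("kinesis")
        .schema(Schema.newBuilder()
            .column("user_id", DataTypes.BIGINT())
            .column("event_count", DataTypes.BIGINT())
            .column("window_end", DataTypes.TIMESTAMP(3))
            .build())
        .option("stream", "processed-events")
        .option("aws.region", "us-west-2")
        .option("format", "json")
        .build());
```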

### Custom Formats and Serialization
```java
// Register a table that uses the Avro format; with 'format' = 'avro' the
// Avro schema is derived from the table's column definitions, so no explicit
// schema string needs to be configured
tableEnv.executeSql(
    "CREATE TABLE custom_format_table (" +
    "  data STRING," +
    "  metadata_field STRING METADATA FROM 'partition-key'" +
    ") WITH (" +
    "  'connector' = 'kinesis'," +
    "  'stream' = 'custom-format-stream'," +
    "  'aws.region' = 'us-west-2'," +
    "  'format' = 'avro'" +
    ")"
);
```

## Configuration Options

### Common Table Options
```properties
# Required options
connector = kinesis
stream = my-stream-name
aws.region = us-west-2

# Authentication options
aws.credentials.provider = AUTO | BASIC | PROFILE | ASSUME_ROLE | ENV_VAR | SYS_PROP
aws.credentials.basic.accesskeyid = your-access-key
aws.credentials.basic.secretkey = your-secret-key

# Source-specific options
scan.stream.initpos = LATEST | TRIM_HORIZON | AT_TIMESTAMP
scan.stream.initpos.timestamp = 2023-01-01T00:00:00.000Z
scan.shard.getrecords.maxrecordcount = 10000
scan.shard.getrecords.intervalmillis = 200

# Sink-specific options
sink.partitioner = fixed | random | <fully-qualified partitioner class>
sink.partitioner-field-delimiter = |
sink.flush-buffer.size = 1000
sink.flush-buffer.timeout = 2000

# Format options
format = json | avro | csv | raw
```
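
As an example of the start-position options above, the following sketch creates a table that begins reading at a fixed point in time rather than at `LATEST` (stream name and timestamp are illustrative):

```java
// Replay the stream from a specific timestamp using AT_TIMESTAMP
tableEnv.executeSql(
    "CREATE TABLE replayed_events (" +
    "  event_id STRING," +
    "  payload STRING" +
    ") WITH (" +
    "  'connector' = 'kinesis'," +
    "  'stream' = 'user-events'," +
    "  'aws.region' = 'us-west-2'," +
    "  'format' = 'json'," +
    "  'scan.stream.initpos' = 'AT_TIMESTAMP'," +
    "  'scan.stream.initpos.timestamp' = '2023-01-01T00:00:00.000Z'" +
    ")"
);
```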

### Advanced Configuration
```sql
-- Enhanced Fan-Out configuration
CREATE TABLE efo_source (
    data STRING
) WITH (
    'connector' = 'kinesis',
    'stream' = 'my-stream',
    'aws.region' = 'us-west-2',
    'scan.stream.recordpublisher' = 'EFO',
    'scan.stream.efo.consumername' = 'my-flink-app',
    'scan.stream.efo.registration' = 'LAZY'
);

-- Custom partitioning for the sink: set 'sink.partitioner' to the fully
-- qualified class name of a partitioner available on the classpath
CREATE TABLE partitioned_sink (
    user_id STRING,
    data STRING
) WITH (
    'connector' = 'kinesis',
    'stream' = 'partitioned-output',
    'aws.region' = 'us-west-2',
    'format' = 'json',
    'sink.partitioner' = 'com.example.MyCustomPartitioner'
);
```
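
For illustration, such a custom partitioner could look roughly like the sketch below. It assumes the legacy `KinesisPartitioner<RowData>` base class from the DataStream connector and a row whose first column holds the user ID; check the connector version you are using for the exact extension point.

```java
package com.example;

import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
import org.apache.flink.table.data.RowData;

// Routes each record to a Kinesis partition key derived from the first column
// (assumed here to be the user_id column of the partitioned_sink table).
public class MyCustomPartitioner extends KinesisPartitioner<RowData> {

    @Override
    public String getPartitionId(RowData element) {
        // Fall back to a static key if the column is NULL
        if (element.isNullAt(0)) {
            return "unknown";
        }
        return element.getString(0).toString();
    }
}
```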

## Best Practices

1. **Schema Evolution**: Use flexible formats such as JSON or Avro so schemas can evolve without breaking consumers
2. **Watermarks**: Configure watermark strategies that match your data's out-of-orderness for event-time processing
3. **Metadata**: Leverage Kinesis metadata columns (shard ID, sequence number, arrival timestamp) for debugging and monitoring
4. **Partitioning**: Choose a sink partitioning strategy that distributes load evenly across shards
5. **Error Handling**: Implement proper error handling and dead letter queues (see the sketch after this list)
6. **Resource Management**: Configure parallelism and resource allocation to match the stream's shard count and throughput
7. **Monitoring**: Use Flink metrics together with Kinesis CloudWatch metrics for end-to-end monitoring
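
One common error-handling measure is tolerating malformed records instead of failing the job. A minimal sketch using the JSON format's parse-error option (table and stream names are illustrative); this only drops bad records, so routing them to a dedicated dead-letter stream still has to be handled by the producer or an upstream process:

```java
// Skip records that cannot be parsed as JSON instead of failing the job;
// fields that fail to parse are set to NULL and fully malformed records are dropped.
tableEnv.executeSql(
    "CREATE TABLE tolerant_source (" +
    "  event_id STRING," +
    "  payload STRING" +
    ") WITH (" +
    "  'connector' = 'kinesis'," +
    "  'stream' = 'user-events'," +
    "  'aws.region' = 'us-west-2'," +
    "  'format' = 'json'," +
    "  'json.ignore-parse-errors' = 'true'" +
    ")"
);
```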