# Table API Integration

Integration with Flink's Table API for declarative stream processing. Provides append-only table sinks with schema inference, SQL compatibility, and seamless integration with Flink's unified batch and streaming table processing.

## Capabilities

### Append Table Sink

Implementation of Flink's `AppendStreamTableSink` for writing table data to Cassandra.

```java { .api }
public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
    public CassandraAppendTableSink(ClusterBuilder builder, String cql);
    public CassandraAppendTableSink(ClusterBuilder builder, String cql, Properties properties);

    public TypeInformation<Row> getOutputType();
    public String[] getFieldNames();
    public TypeInformation<?>[] getFieldTypes();
    public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream);
}
```

### Basic Usage

**Simple Table Sink:**

```java
import org.apache.flink.streaming.connectors.cassandra.CassandraAppendTableSink;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Set up table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").build();
    }
};

// Create and register Cassandra table sink
CassandraAppendTableSink cassandraSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"
);

tableEnv.registerTableSink("cassandra_users", cassandraSink);
```

**With Configuration Properties:**

```java
import java.util.Properties;

// Additional configuration properties
Properties properties = new Properties();
properties.setProperty("cassandra.connection.timeout", "10000");
properties.setProperty("cassandra.read.timeout", "5000");

CassandraAppendTableSink configuredSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.orders (order_id, customer_id, total, created_at) VALUES (?, ?, ?, ?)",
    properties
);

tableEnv.registerTableSink("cassandra_orders", configuredSink);
```

### SQL Integration

Use SQL to write data to Cassandra:

```java
// Create source table (from Kafka, file, etc.)
tableEnv.executeSql(
    "CREATE TABLE source_users (" +
    "  id STRING," +
    "  name STRING," +
    "  age INT," +
    "  email STRING" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user_events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Register Cassandra sink
CassandraAppendTableSink cassandraSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"
);
tableEnv.registerTableSink("cassandra_users", cassandraSink);

// SQL query to transform and sink data
tableEnv.executeSql(
    "INSERT INTO cassandra_users " +
    "SELECT id, UPPER(name) as name, age, email " +
    "FROM source_users " +
    "WHERE age >= 18"
);
```

### Schema Configuration

Configure field names and types explicitly:

```java
// Define schema
String[] fieldNames = {"user_id", "full_name", "user_age", "email_address"};
TypeInformation<?>[] fieldTypes = {
    Types.STRING,
    Types.STRING,
    Types.INT,
    Types.STRING
};

// Configure sink with schema
CassandraAppendTableSink schemaConfiguredSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"
).configure(fieldNames, fieldTypes);

tableEnv.registerTableSink("typed_cassandra_users", schemaConfiguredSink);
```

### Complex Data Processing

Leverage the Table API for complex transformations before writing to Cassandra:

```java
import org.apache.flink.table.api.Table;

// Create source table with complex schema
tableEnv.executeSql(
    "CREATE TABLE event_stream (" +
    "  event_id STRING," +
    "  user_id STRING," +
    "  event_type STRING," +
    "  event_time TIMESTAMP(3)," +
    "  properties MAP<STRING, STRING>," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Process data with windowing and aggregation
Table processedData = tableEnv.sqlQuery(
    "SELECT " +
    "  user_id," +
    "  event_type," +
    "  COUNT(*) as event_count," +
    "  MAX(event_time) as last_event_time," +
    "  TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start " +
    "FROM event_stream " +
    "GROUP BY " +
    "  user_id, " +
    "  event_type, " +
    "  TUMBLE(event_time, INTERVAL '1' HOUR)"
);

// Register Cassandra sink for aggregated data
CassandraAppendTableSink aggregationSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.user_hourly_stats (user_id, event_type, event_count, last_event_time, window_start) VALUES (?, ?, ?, ?, ?)"
);

String[] aggFieldNames = {"user_id", "event_type", "event_count", "last_event_time", "window_start"};
TypeInformation<?>[] aggFieldTypes = {
    Types.STRING,
    Types.STRING,
    Types.LONG,
    Types.SQL_TIMESTAMP,
    Types.SQL_TIMESTAMP
};

tableEnv.registerTableSink("user_stats", aggregationSink.configure(aggFieldNames, aggFieldTypes));

// Insert processed data
processedData.executeInsert("user_stats");
```

### Catalog Integration

Register Cassandra tables in Flink's catalog system:

```java
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.api.Schema;

// Create catalog
GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("cassandra_catalog");
tableEnv.registerCatalog("cassandra_catalog", catalog);
tableEnv.useCatalog("cassandra_catalog");

// Define table schema
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.STRING())
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .column("created_at", DataTypes.TIMESTAMP(3))
    .build();

// Create table options
Map<String, String> properties = new HashMap<>();
properties.put("connector", "cassandra");
properties.put("cassandra.hosts", "127.0.0.1");
properties.put("cassandra.keyspace", "example");
properties.put("cassandra.table", "users");

// CatalogTable.of accepts the Schema built above; the older
// CatalogTableImpl variant works with TableSchema instead.
CatalogTable catalogTable = CatalogTable.of(
    schema,
    "Cassandra users table",
    new ArrayList<>(),
    properties
);

// Register table in catalog
catalog.createTable(
    new ObjectPath("default", "users"),
    catalogTable,
    false
);
```

## Advanced Table Patterns

### Dynamic Tables

Handle changing schemas with dynamic table concepts:

```java
// Source with evolving schema
tableEnv.executeSql(
    "CREATE TABLE dynamic_events (" +
    "  event_id STRING," +
    "  event_data ROW<" +
    "    user_id STRING," +
    "    action STRING," +
    "    metadata MAP<STRING, STRING>" +
    "  >," +
    "  event_time TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'dynamic_events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Flatten and transform
Table flattenedEvents = tableEnv.sqlQuery(
    "SELECT " +
    "  event_id," +
    "  event_data.user_id," +
    "  event_data.action," +
    "  event_data.metadata['source'] as source," +
    "  event_time " +
    "FROM dynamic_events"
);

// Register sink for flattened data
CassandraAppendTableSink dynamicSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.flattened_events (event_id, user_id, action, source, event_time) VALUES (?, ?, ?, ?, ?)"
);

tableEnv.registerTableSink("flattened_events", dynamicSink);
flattenedEvents.executeInsert("flattened_events");
```

### Temporal Tables

Work with time-based data processing:

```java
// Create temporal table with event time
tableEnv.executeSql(
    "CREATE TABLE temporal_data (" +
    "  key_id STRING," +
    "  value_data STRING," +
    "  process_time AS PROCTIME()," +
    "  event_time TIMESTAMP(3)," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'temporal_stream'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Filter to recent events only (a full temporal join would require a versioned reference table)
Table temporalResult = tableEnv.sqlQuery(
    "SELECT " +
    "  t1.key_id," +
    "  t1.value_data," +
    "  t1.event_time " +
    "FROM temporal_data t1 " +
    "WHERE t1.event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR"
);

// Sink recent data to Cassandra
CassandraAppendTableSink temporalSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.recent_data (key_id, value_data, event_time) VALUES (?, ?, ?)"
);

tableEnv.registerTableSink("recent_data", temporalSink);
temporalResult.executeInsert("recent_data");
```

## Error Handling and Monitoring

### Sink Error Handling

Handle errors at the table level:

```java
// Custom sink with error handling
public class ErrorHandlingCassandraAppendTableSink extends CassandraAppendTableSink {
    private final CassandraFailureHandler failureHandler;

    public ErrorHandlingCassandraAppendTableSink(
            ClusterBuilder builder,
            String cql,
            CassandraFailureHandler failureHandler) {
        super(builder, cql);
        this.failureHandler = failureHandler;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // Create sink with custom failure handler
        CassandraRowSink sink = new CassandraRowSink(
            dataStream.getType().getArity(),
            getCql(),
            getBuilder(),
            CassandraSinkBaseConfig.newBuilder().build(),
            failureHandler
        );

        return dataStream.addSink(sink)
            .setParallelism(dataStream.getParallelism())
            .name("Cassandra Table Sink");
    }
}
```

Note: this example assumes the base class exposes `getCql()` and `getBuilder()` accessors; if the fields are private in your connector version, capture the CQL string and `ClusterBuilder` in the subclass constructor instead.

### Performance Monitoring

Monitor table sink performance:

```java
// Enable latency metrics collection
env.getConfig().setLatencyTrackingInterval(1000);

// Create sink with monitoring
CassandraAppendTableSink monitoredSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.monitored_data (id, value, timestamp) VALUES (?, ?, ?)"
) {
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return super.consumeDataStream(dataStream)
            .name("Monitored Cassandra Sink")
            .setParallelism(4); // Explicit parallelism for monitoring
    }
};

tableEnv.registerTableSink("monitored_sink", monitoredSink);
```

## Limitations and Considerations

### Append-Only Semantics

The Cassandra table sink only supports append operations:

```java
// Supported: INSERT operations
table.executeInsert("cassandra_sink"); // OK

// NOT supported: UPDATE/DELETE operations
// Table API UPDATE/DELETE queries will fail with CassandraAppendTableSink
```

### Schema Evolution

Handle schema changes carefully:

```java
// Define a flexible schema to accommodate evolution
String[] flexibleFieldNames = {"id", "data", "metadata", "timestamp"};
TypeInformation<?>[] flexibleFieldTypes = {
    Types.STRING,
    Types.STRING, // JSON string for flexible data
    Types.MAP(Types.STRING, Types.STRING), // Key-value metadata
    Types.SQL_TIMESTAMP
};

CassandraAppendTableSink flexibleSink = new CassandraAppendTableSink(
    builder,
    "INSERT INTO example.flexible_data (id, data, metadata, timestamp) VALUES (?, ?, ?, ?)"
).configure(flexibleFieldNames, flexibleFieldTypes);
```

### Performance Considerations

The Table API adds processing overhead:

```java
// For high-throughput scenarios, consider the direct DataStream API.
// The Table API is better for complex transformations and SQL compatibility.

// Direct DataStream (higher performance)
stream.addSink(cassandraDirectSink);

// Table API (more functionality, slightly lower performance)
Table table = tableEnv.fromDataStream(stream);
table.executeInsert("cassandra_table_sink");
```