# Apache Flink Table API Java Bridge
2
Apache Flink Table API Java Bridge is a critical component that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs. This bridge provides the integration layer between Flink's high-level Table API and the core Java DataStream API, offering a unified programming model for both stream and batch table operations.
3
4
## Package Information
5
6
- **Package Name**: org.apache.flink:flink-table-api-java-bridge_2.11
7
- **Package Type**: Maven
8
- **Language**: Java
9
- **Version**: 1.14.6
10
- **Installation**: Add to your Maven dependencies or use as provided dependency in Flink applications
11
12
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```
19
20
## Core Imports
21
22
```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

// Static import required for Table API expressions such as $("field")
import static org.apache.flink.table.api.Expressions.$;
```
37
38
## Basic Usage
39
40
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.table.api.Schema;

import static org.apache.flink.table.api.Expressions.$;

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Convert DataStream to Table with automatic schema derivation
DataStream<MyPojo> dataStream = env.fromElements(new MyPojo("John", 25), new MyPojo("Jane", 30));
Table table = tableEnv.fromDataStream(dataStream);

// Or with explicit schema
Schema schema = Schema.newBuilder()
    .column("name", "STRING")
    .column("age", "INT")
    .build();
Table tableWithSchema = tableEnv.fromDataStream(dataStream, schema);

// Perform table operations
Table result = table.select($("name"), $("age").plus(1).as("age_plus_one"));

// Convert back to DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(result);

// Execute
env.execute("Table Bridge Example");
```
74
75
## Architecture
76
77
The Flink Table API Java Bridge consists of several key components:
78
79
1. **Core Bridge API**: Main integration interfaces (`StreamTableEnvironment`, `StreamStatementSet`)
80
2. **Modern Connector Framework**: New connector interfaces for sources and sinks
81
3. **Legacy Connector Support**: Backward compatibility with deprecated connector interfaces
82
4. **Built-in Connectors**: DataGen, Print, and BlackHole connectors for testing
83
5. **Watermark Strategies**: Time-based watermarking for event-time processing
84
85
## Capabilities
86
87
### Stream Table Environment
88
89
The main entry point for creating and managing table environments that integrate with DataStream API.
90
91
```java { .api }
92
public interface StreamTableEnvironment extends TableEnvironment {
93
// Factory methods
94
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
95
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
96
@Deprecated
97
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig);
98
99
// Function registration (deprecated)
100
@Deprecated
101
<T> void registerFunction(String name, TableFunction<T> tableFunction);
102
@Deprecated
103
<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);
104
@Deprecated
105
<T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction);
106
107
// DataStream to Table conversions
108
<T> Table fromDataStream(DataStream<T> dataStream);
109
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
110
@Deprecated
111
<T> Table fromDataStream(DataStream<T> dataStream, String fields);
112
@Deprecated
113
<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);
114
115
Table fromChangelogStream(DataStream<Row> dataStream);
116
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
117
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
118
119
// Temporary view creation
120
<T> void createTemporaryView(String path, DataStream<T> dataStream);
121
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
122
@Deprecated
123
<T> void createTemporaryView(String path, DataStream<T> dataStream, String fields);
124
@Deprecated
125
<T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);
126
@Deprecated
127
<T> void registerDataStream(String name, DataStream<T> dataStream);
128
@Deprecated
129
<T> void registerDataStream(String name, DataStream<T> dataStream, String fields);
130
131
// Table to DataStream conversions
132
DataStream<Row> toDataStream(Table table);
133
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
134
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
135
@Deprecated
136
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
137
@Deprecated
138
<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
139
140
DataStream<Row> toChangelogStream(Table table);
141
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
142
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
143
@Deprecated
144
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
145
@Deprecated
146
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);
147
148
// Statement set creation
149
StreamStatementSet createStatementSet();
150
151
// Job execution (deprecated)
152
@Deprecated
153
JobExecutionResult execute(String jobName) throws Exception;
154
}
155
```
156
157
[Stream Table Environment](./stream-table-environment.md)
158
159
### DataStream Conversions
160
161
Convert between DataStream and Table representations for seamless integration.
162
163
```java { .api }
164
// DataStream to Table conversions
165
<T> Table fromDataStream(DataStream<T> dataStream);
166
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
167
Table fromChangelogStream(DataStream<Row> dataStream);
168
169
// Table to DataStream conversions
170
DataStream<Row> toDataStream(Table table);
171
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
172
DataStream<Row> toChangelogStream(Table table);
173
```
174
175
[DataStream Conversions](./datastream-conversions.md)
176
177
### Modern Connector Framework
178
179
New connector interfaces following FLIP-95 design for better integration with DataStream API.
180
181
```java { .api }
182
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {
183
DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv);
184
}
185
186
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
187
DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
188
189
@Override
190
default Optional<Integer> getParallelism() {
191
return Optional.empty();
192
}
193
}
194
195
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider {
196
static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, boolean isBounded);
197
SourceFunction<RowData> createSourceFunction();
198
boolean isBounded();
199
}
200
201
public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
202
static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction);
203
static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, @Nullable Integer parallelism);
204
SinkFunction<RowData> createSinkFunction();
205
Optional<Integer> getParallelism();
206
}
207
```
208
209
[Modern Connector Framework](./modern-connector-framework.md)
210
211
### Built-in Connectors
212
213
Ready-to-use connectors for development and testing scenarios.
214
215
```java { .api }
216
// DataGen Connector for generating test data
217
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
218
// Factory methods and configuration
219
}
220
221
// Print Connector for outputting results
222
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
223
// Factory methods and configuration
224
}
225
226
// BlackHole Connector for performance testing
227
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
228
// Factory methods and configuration
229
}
230
```
231
232
[Built-in Connectors](./built-in-connectors.md)
233
234
### Watermark Strategies
235
236
Time-based watermarking strategies for event-time processing in streaming applications. These are legacy watermark strategy classes for table sources.
237
238
```java { .api }
239
public abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
240
public abstract void nextTimestamp(long timestamp);
241
public abstract Watermark getWatermark();
242
public abstract Map<String, String> toProperties();
243
}
244
245
public abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
246
public abstract Watermark getWatermark(Row row, long timestamp);
247
public abstract Map<String, String> toProperties();
248
}
249
250
public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
251
@Override
252
public void nextTimestamp(long timestamp);
253
@Override
254
public Watermark getWatermark();
255
@Override
256
public Map<String, String> toProperties();
257
}
258
259
public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
260
public BoundedOutOfOrderTimestamps(long maxOutOfOrderness);
261
@Override
262
public void nextTimestamp(long timestamp);
263
@Override
264
public Watermark getWatermark();
265
@Override
266
public Map<String, String> toProperties();
267
}
268
```
269
270
[Watermark Strategies](./watermark-strategies.md)
271
272
### Legacy Connector Support
273
274
Deprecated but maintained interfaces for backward compatibility with existing connector implementations.
275
276
```java { .api }
277
@Deprecated
278
public interface StreamTableSource<T> extends TableSource<T> {
279
default boolean isBounded() { return false; }
280
DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
281
}
282
283
@Deprecated
284
public interface StreamTableSink<T> extends TableSink<T> {
285
DataStreamSink<?> consumeDataStream(DataStream<T> dataStream);
286
}
287
```
288
289
[Legacy Connector Support](./legacy-connector-support.md)
290
291
## Types
292
293
```java { .api }
294
public interface StreamStatementSet extends StatementSet {
295
@Override
296
StreamStatementSet addInsertSql(String statement);
297
@Override
298
StreamStatementSet addInsert(String targetPath, Table table);
299
@Override
300
StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);
301
@Override
302
StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);
303
@Override
304
StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);
305
void attachAsDataStream();
306
}
307
308
public class Schema {
309
public static Schema.Builder newBuilder();
310
// Schema definition for DataStream to Table conversions
311
}
312
313
// ChangelogMode is a final class with static factory methods, not an enum
public final class ChangelogMode {
    public static ChangelogMode insertOnly();
    public static ChangelogMode upsert();
    public static ChangelogMode all();
}
322
323
public abstract class WatermarkStrategy {
324
public abstract Map<String, String> toProperties();
325
}
326
327
public interface ParallelismProvider {
328
Optional<Integer> getParallelism();
329
}
330
```