# Apache Flink Table API Java Bridge

Apache Flink's Table API Java Bridge provides seamless integration between Flink's Table/SQL API and the DataStream API. It enables developers to convert between DataStream and Table representations, register custom connectors, and execute table operations within streaming applications.

## Package Information

- **Package Name**: flink-table-api-java-bridge
- **Package Type**: maven
- **Language**: Java
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.20.2</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Schema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.table.types.AbstractDataType;
```
## Basic Usage

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Convert DataStream to Table
DataStream<Row> dataStream = // ... your data stream
Table table = tableEnv.fromDataStream(dataStream);

// Register the table as a temporary view so it can be referenced by name in SQL
tableEnv.createTemporaryView("InputTable", table);

// Execute SQL queries
Table result = tableEnv.sqlQuery("SELECT * FROM InputTable WHERE amount > 100");

// Convert back to DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(result);
```
## Architecture

The Flink Table API Java Bridge is organized around several key components:

- **StreamTableEnvironment**: Main entry point providing factory methods and conversion utilities
- **Connector Framework**: Interfaces for implementing custom table sources and sinks
- **Built-in Connectors**: Ready-to-use connectors for testing and common use cases (DataGen, Print, BlackHole)
- **Legacy APIs**: Deprecated interfaces maintained for backward compatibility
- **Procedure Context**: Framework for stored procedure execution

## Capabilities

### Table Environment Management

Core functionality for creating and managing table environments that bridge DataStream and Table APIs.

```java { .api }
public interface StreamTableEnvironment extends TableEnvironment {
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
}
```

[Table Environment](./table-environment.md)
### DataStream Integration

Bi-directional conversion between DataStream and Table with support for custom schemas and changelog processing.

```java { .api }
// DataStream to Table conversion
<T> Table fromDataStream(DataStream<T> dataStream);
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

// Table to DataStream conversion
DataStream<Row> toDataStream(Table table);
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
DataStream<Row> toChangelogStream(Table table);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

// Temporary view creation
<T> void createTemporaryView(String path, DataStream<T> dataStream);
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

// Statement set creation
StreamStatementSet createStatementSet();

// Deprecated methods (maintained for backward compatibility)
@Deprecated <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);
@Deprecated <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);
@Deprecated <T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
@Deprecated <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
@Deprecated <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
@Deprecated <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);
```

[DataStream Integration](./datastream-integration.md)
### Connector Framework

Provider interfaces for implementing custom table sources and sinks that integrate with DataStream API.

```java { .api }
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);
}

public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);
    Optional<Integer> getParallelism();
}

public interface ProviderContext {
    String generateUid(String name);
}

public interface ParallelismProvider {
    Optional<Integer> getParallelism();
}
```

[Connector Framework](./connector-framework.md)
### Built-in Connectors

Ready-to-use connectors for development and testing use cases, including test data generation, development output, and performance testing.

```java { .api }
// DataGen connector for test data generation
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
    public String factoryIdentifier(); // Returns "datagen"
}

// Print connector for development output
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
    public String factoryIdentifier(); // Returns "print"
}

// BlackHole connector for performance testing
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
    public String factoryIdentifier(); // Returns "blackhole"
}
```

[Built-in Connectors](./built-in-connectors.md)
### Statement Set Operations

Batch execution of multiple table operations for optimized query planning and execution.

```java { .api }
public interface StreamStatementSet extends StatementSet {
    StreamStatementSet add(TablePipeline tablePipeline);
    StreamStatementSet addInsertSql(String statement);
    StreamStatementSet addInsert(String targetPath, Table table);
    StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);
    void attachAsDataStream();
    StreamStatementSet printExplain(ExplainDetail... extraDetails);
}
```

[Statement Set Operations](./statement-set.md)
### Procedure Context

Framework for stored procedure execution with access to StreamExecutionEnvironment.

```java { .api }
public interface ProcedureContext {
    StreamExecutionEnvironment getExecutionEnvironment();
}

public class DefaultProcedureContext implements ProcedureContext {
    public DefaultProcedureContext(StreamExecutionEnvironment executionEnvironment);
}
```

[Procedure Context](./procedure-context.md)
## Type Definitions

### Core Types

```java { .api }
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TablePipeline;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Optional;
```

### Schema Building

```java { .api }
// Schema builder for custom table schemas
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .columnByExpression("computed", "id * 2")
    .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
    .build();
```

### Changelog Modes

```java { .api }
// Changelog mode options
ChangelogMode.all()        // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
ChangelogMode.insertOnly() // INSERT only
ChangelogMode.upsert()     // INSERT, UPDATE_AFTER, DELETE
```