# Apache Flink Table API Java Bridge

The Apache Flink Table API Java Bridge integrates Flink's Table/SQL API with the DataStream API for Java applications. This module lets developers convert between DataStreams and Tables, create StreamTableEnvironments for unified batch and stream processing, and run SQL queries on streaming data with comprehensive connector support.

## Package Information

- **Package Name**: flink-table-api-java-bridge
- **Package Type**: Maven
- **Group ID**: org.apache.flink
- **Language**: Java
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
```

## Basic Usage

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Create execution environment and table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Convert DataStream to Table (columns default to f0, f1)
DataStream<Row> dataStream = env.fromElements(
    Row.of("Alice", 25),
    Row.of("Bob", 30)
);
Table table = tableEnv.fromDataStream(dataStream);

// Execute SQL on the table; concatenating the Table into the
// query string registers it under an implicit unique name
Table result = tableEnv.sqlQuery("SELECT * FROM " + table + " WHERE f1 > 25");

// Convert Table back to DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(result);

// Attach a sink so the pipeline has something to run, then execute
resultStream.print();
env.execute("Table API Bridge Example");
```

## Architecture

The Flink Table API Java Bridge is built around several key components:

- **StreamTableEnvironment**: Central context for bridging Table API and DataStream API operations
- **Stream-Table Conversion**: Bidirectional conversion utilities between DataStreams and Tables
- **Schema Management**: Type-safe schema definitions and transformations with watermark support
- **Built-in Connectors**: Testing and development connectors (blackhole, datagen, print)
- **Legacy Support**: Backward compatibility for existing table sources and sinks
- **Watermark Strategies**: Time-based event processing with configurable watermark assignment

## Capabilities

### Stream Table Environment

Core environment for creating and managing tables that integrate with the DataStream API. Provides a unified context for both batch and streaming operations.

```java { .api }
public interface StreamTableEnvironment extends TableEnvironment {
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

    <T> Table fromDataStream(DataStream<T> dataStream);
    <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
    <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields); // @Deprecated

    <T> void createTemporaryView(String path, DataStream<T> dataStream);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields); // @Deprecated

    DataStream<Row> toDataStream(Table table);
    <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
    <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

    <T> DataStream<T> toAppendStream(Table table, Class<T> clazz); // @Deprecated
    <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo); // @Deprecated
    <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz); // @Deprecated
    <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo); // @Deprecated

    StreamStatementSet createStatementSet();
}
```

[Stream Table Environment](./stream-table-environment.md)
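Beyond the one-argument `fromDataStream`, the `Schema` overload lets you declare event-time attributes and watermarks on top of the columns derived from the stream. A minimal sketch (the `events` stream and its `user`/`ts` column names are illustrative, not part of the API):

```java
import java.time.Instant;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Hypothetical input: (user, ts) events with explicit type information
DataStream<Row> events = env
    .fromElements(
        Row.of("Alice", Instant.parse("2024-01-01T00:00:00Z")),
        Row.of("Bob", Instant.parse("2024-01-01T00:00:05Z")))
    .returns(Types.ROW_NAMED(new String[] {"user", "ts"}, Types.STRING, Types.INSTANT));

// Derive the physical columns, then add a computed rowtime column
// and a bounded-out-of-orderness watermark on it
Table table = tableEnv.fromDataStream(
    events,
    Schema.newBuilder()
        .columnByExpression("rowtime", "CAST(ts AS TIMESTAMP_LTZ(3))")
        .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
        .build());
```

The resulting `table` carries an event-time attribute that windowed and temporal queries can use.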

### Changelog Stream Processing

Advanced stream processing with support for changelog semantics, including inserts, updates, and deletes.

```java { .api }
public interface StreamTableEnvironment extends TableEnvironment {
    Table fromChangelogStream(DataStream<Row> dataStream);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

    DataStream<Row> toChangelogStream(Table table);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
}
```

[Changelog Processing](./changelog-processing.md)
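As a sketch of the round trip (the `name`/`score` columns are illustrative), an upsert changelog can be built from `RowKind`-tagged rows, interpreted with a primary key, and materialized back. Running it requires a local Flink execution environment:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Upsert changelog: an insert followed by an update for the same key
DataStream<Row> changelog = env
    .fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 100),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 150))
    .returns(Types.ROW_NAMED(new String[] {"name", "score"}, Types.STRING, Types.INT));

// Interpret the stream as an upsert changelog keyed by "name"
Table table = tableEnv.fromChangelogStream(
    changelog,
    Schema.newBuilder().primaryKey("name").build(),
    ChangelogMode.upsert());

// Materialize the table back into a changelog stream
List<Row> results = new ArrayList<>();
tableEnv.toChangelogStream(table).executeAndCollect().forEachRemaining(results::add);
```

Each collected `Row` exposes its change flag via `getKind()`.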

### DataStream Connector Providers

Provider interfaces for advanced connector development that integrate directly with the DataStream API.

```java { .api }
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);
}

public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);
    Optional<Integer> getParallelism();
}
```

[DataStream Connectors](./datastream-connectors.md)

### Built-in Test Connectors

Ready-to-use connectors for development, testing, and debugging table applications.

```sql { .api }
-- BlackHole connector - discards all data for performance testing
CREATE TABLE sink_table (...) WITH ('connector' = 'blackhole');

-- DataGen connector - generates test data
CREATE TABLE source_table (...) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'fields.user_id.kind' = 'sequence',
    'fields.name.kind' = 'random'
);

-- Print connector - outputs data to console for debugging
CREATE TABLE debug_table (...) WITH ('connector' = 'print');
```

[Built-in Connectors](./builtin-connectors.md)
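These connectors compose into a quick end-to-end smoke test. A sketch using a bounded datagen source feeding a print sink (the table names and datagen field options shown are illustrative choices, not required values):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Bounded datagen source: a 5-row sequence plus random 8-character names
tableEnv.executeSql(
    "CREATE TABLE source_table (user_id BIGINT, name STRING) WITH ("
        + " 'connector' = 'datagen',"
        + " 'fields.user_id.kind' = 'sequence',"
        + " 'fields.user_id.start' = '1',"
        + " 'fields.user_id.end' = '5',"
        + " 'fields.name.length' = '8')");

// Print sink: writes every incoming row to stdout
tableEnv.executeSql(
    "CREATE TABLE debug_table (user_id BIGINT, name STRING) WITH ('connector' = 'print')");

// Submit the job and block until the bounded source is exhausted
tableEnv.executeSql("INSERT INTO debug_table SELECT * FROM source_table").await();
```

Because the sequence generator is bounded, `await()` returns once all five rows have been printed.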

### Watermark Strategies

Time-based event processing with configurable watermark assignment strategies for handling out-of-order events.

```java { .api }
public abstract class PeriodicWatermarkAssigner {
    public abstract void nextTimestamp(long timestamp);
    public abstract Watermark getWatermark();
    public abstract Map<String, String> toProperties();
}

public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
    public BoundedOutOfOrderTimestamps(long delay);
}

public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
    public AscendingTimestamps();
}
```

[Watermark Strategies](./watermark-strategies.md)

### Statement Set Operations

Batch execution optimization for multiple table operations with shared planning and execution.

```java { .api }
public interface StreamStatementSet extends StatementSet {
    StreamStatementSet add(TablePipeline tablePipeline);
    StreamStatementSet addInsertSql(String statement);
    StreamStatementSet addInsert(String targetPath, Table table);
    StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);
    StreamStatementSet printExplain(ExplainDetail... extraDetails);

    void attachAsDataStream();

    // Inherited from StatementSet:
    // TableResult execute();
    // CompiledPlan compilePlan(); // @Experimental
    // String explain(ExplainDetail... extraDetails);
    // String explain(ExplainFormat format, ExplainDetail... extraDetails);
}
```

[Statement Sets](./statement-sets.md)
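A sketch of batching two inserts into a single optimized job (the datagen source and blackhole sinks are illustrative stand-ins for real tables):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Illustrative tables: a bounded generator and two discarding sinks
tableEnv.executeSql(
    "CREATE TABLE src (id BIGINT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
tableEnv.executeSql("CREATE TABLE sink_a (id BIGINT) WITH ('connector' = 'blackhole')");
tableEnv.executeSql("CREATE TABLE sink_b (id BIGINT) WITH ('connector' = 'blackhole')");

// Both inserts share planning and run as one Flink job
StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql("INSERT INTO sink_a SELECT id FROM src");
statementSet.addInsert("sink_b", tableEnv.sqlQuery("SELECT id * 2 AS id FROM src"));

statementSet.execute().await();
```

Alternatively, `attachAsDataStream()` folds the statement set into the StreamExecutionEnvironment so a later `env.execute()` runs it together with other DataStream pipelines.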

### Procedure Context

Context for stored procedure execution with access to the StreamExecutionEnvironment.

```java { .api }
public interface ProcedureContext {
    StreamExecutionEnvironment getExecutionEnvironment();
}

public class DefaultProcedureContext implements ProcedureContext {
    // Default implementation
}
```

[Procedures](./procedures.md)
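A hedged sketch of how a procedure might consume this context. The class name and return payload are hypothetical; in Flink, procedures implement the marker interface `Procedure` and declare `call` methods by convention with `ProcedureContext` as the first argument:

```java
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.table.procedures.Procedure;

// Hypothetical procedure reporting the parallelism of the
// underlying StreamExecutionEnvironment
public class ShowParallelismProcedure implements Procedure {
    public String[] call(ProcedureContext context) {
        int parallelism = context.getExecutionEnvironment().getParallelism();
        return new String[] {"parallelism: " + parallelism};
    }
}
```

Such a procedure would be registered through a catalog and invoked with `CALL` in SQL.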

## Types

### Core Schema Types

```java { .api }
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.ExplainFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
```

### DataStream Integration Types

```java { .api }
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.expressions.Expression;
```

### Connector Provider Types

```java { .api }
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.data.RowData;
import java.util.Optional;
```