# Flink Table API Scala Bridge
The Flink Table API Scala Bridge provides seamless integration between Apache Flink's Table/SQL API and its DataStream API for Scala developers. The bridge enables bidirectional conversion between DataStreams and Tables, letting you mix declarative Table/SQL operations with procedural stream processing in a single Scala application.
## Package Information
- **Package Name**: flink-table-api-scala-bridge_2.12
- **Package Type**: Maven
- **Language**: Scala
- **Maven Coordinates**: `org.apache.flink:flink-table-api-scala-bridge_2.12:2.1.0`
- **Installation**: Add to your `pom.xml` dependencies

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
  <version>2.1.0</version>
</dependency>
```
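
For sbt-based projects, the equivalent dependency can be declared as follows (a sketch; adjust the version to match your Flink deployment):

```scala
// build.sbt -- same coordinates as the Maven snippet above;
// the Scala version suffix is spelled out, so a plain % is used
libraryDependencies += "org.apache.flink" % "flink-table-api-scala-bridge_2.12" % "2.1.0"
```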
## Core Imports
```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
```
## Basic Usage
```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

// Create the execution environment and the table environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Create a DataStream
val dataStream = env.fromElements(
  ("Alice", 25, "Engineer"),
  ("Bob", 30, "Manager"),
  ("Charlie", 35, "Developer")
)

// Convert the DataStream to a Table with named columns
val table = tableEnv.fromDataStream(dataStream, $"name", $"age", $"role")

// Apply a relational operation
val filteredTable = table.filter($"age" > 28)

// Convert back to a DataStream
val resultStream = tableEnv.toDataStream(filteredTable)

// Add a sink so the topology has an output, then execute the job
resultStream.print()
env.execute("Table Bridge Example")
```
## Architecture
The Flink Table API Scala Bridge is built around several key components:

- **StreamTableEnvironment**: Entry point for creating and managing table environments in streaming contexts
- **Conversion Classes**: `DataStreamConversions` and `TableConversions` for seamless, type-safe conversions
- **Statement Management**: `StreamStatementSet` for batching multiple table operations into one optimized execution
- **Implicit Conversions**: Scala-idiomatic conversion utilities available through the package object
- **Schema Support**: Flexible schema definition and type mapping between Scala types and table schemas
## Capabilities
### Stream Table Environment
Core table environment for streaming applications, providing the entry point for all Table/SQL API operations integrated with DataStream processing.

```scala { .api }
trait StreamTableEnvironment extends TableEnvironment

object StreamTableEnvironment {
  def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
  def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}
```

[Stream Table Environment](./stream-table-environment.md)
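A minimal sketch of both factory methods, assuming a local execution environment:

```scala
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Derive the table environment's settings from the execution environment
val tableEnv = StreamTableEnvironment.create(env)

// Or pass explicit settings (streaming mode here)
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnvWithSettings = StreamTableEnvironment.create(env, settings)
```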
### DataStream to Table Conversions
Convert DataStreams to Tables with automatic or custom schema derivation, supporting both regular streams and changelog streams.

```scala { .api }
class DataStreamConversions[T](dataStream: DataStream[T]) {
  def toTable(tableEnv: StreamTableEnvironment): Table
  def toTable(tableEnv: StreamTableEnvironment, schema: Schema): Table
  def toChangelogTable(tableEnv: StreamTableEnvironment): Table
}
```

[DataStream Conversions](./datastream-conversions.md)
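A short sketch of automatic schema derivation via `toTable`; the stream contents and column names below are illustrative:

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val readings = env.fromElements(("sensor-1", 21.5), ("sensor-2", 19.8))

// Schema is derived automatically from the tuple's type information,
// using the implicit DataStreamConversions from the bridge package object
val autoTable = readings.toTable(tableEnv)

// Rename the derived columns afterwards
val namedTable = autoTable.as("id", "temperature")
```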
### Table to DataStream Conversions
Convert Tables back to DataStreams with support for different output modes including insert-only and full changelog streams.

```scala { .api }
class TableConversions(table: Table) {
  def toDataStream: DataStream[Row]
  def toDataStream[T](targetClass: Class[T]): DataStream[T]
  def toChangelogStream: DataStream[Row]
  def toChangelogStream(targetSchema: Schema): DataStream[Row]
}
```

[Table Conversions](./table-conversions.md)
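A sketch of the insert-only and changelog conversion paths, reusing a table derived from a small bounded stream:

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val table = tableEnv.fromDataStream(
  env.fromElements(("Alice", 12), ("Bob", 10))
).as("name", "score")

// Insert-only conversion: each table row becomes one stream record
val insertOnly: DataStream[Row] = table.toDataStream

// Changelog conversion: records additionally carry a RowKind
// (all INSERT for this insert-only table)
val changelog: DataStream[Row] = table.toChangelogStream
```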
### Statement Set Operations
Batch multiple table operations together for optimized execution and resource management.

```scala { .api }
trait StreamStatementSet extends StatementSet {
  def add(tablePipeline: TablePipeline): StreamStatementSet
  def addInsertSql(statement: String): StreamStatementSet
  def addInsert(targetPath: String, table: Table): StreamStatementSet
  def attachAsDataStream(): Unit
}
```

[Statement Sets](./statement-sets.md)
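A sketch of batching two inserts and attaching them to the DataStream topology; the table names and the `blackhole` sink connector are illustrative:

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val source = tableEnv.fromDataStream(env.fromElements(("Alice", 12))).as("name", "score")
tableEnv.createTemporaryView("source_t", source)

// Illustrative sink tables using the built-in blackhole connector
tableEnv.executeSql(
  "CREATE TEMPORARY TABLE sink_a (name STRING) WITH ('connector' = 'blackhole')")
tableEnv.executeSql(
  "CREATE TEMPORARY TABLE sink_b (name STRING, score INT) WITH ('connector' = 'blackhole')")

val stmtSet = tableEnv.createStatementSet()
stmtSet.addInsertSql("INSERT INTO sink_a SELECT name FROM source_t")
stmtSet.addInsert("sink_b", source)

// Attach the batched statements to the StreamExecutionEnvironment instead of
// executing them immediately; they run together when env.execute() is called
stmtSet.attachAsDataStream()
env.execute("Statement Set Example")
```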
### Implicit Conversions
Scala-idiomatic implicit conversions for seamless integration between DataStream and Table APIs.

```scala { .api }
// Available implicit conversions from package object
implicit def tableConversions(table: Table): TableConversions
implicit def tableToChangelogDataStream(table: Table): DataStream[Row]
implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]
```

[Implicit Conversions](./implicit-conversions.md)
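With the bridge's package object imported, conversions read naturally in both directions (a sketch):

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._ // brings the implicits into scope
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val stream = env.fromElements(("Alice", 12), ("Bob", 10))

// DataStream -> Table via the implicit DataStreamConversions
val table: Table = stream.toTable(tableEnv)

// Table -> DataStream via the implicit TableConversions
val back: DataStream[Row] = table.toDataStream
```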
## Types
```scala { .api }
// Core Flink types used throughout the API
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.Schema
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.types.AbstractDataType
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableDescriptor, TablePipeline, ExplainDetail}
import org.apache.flink.types.RowKind
```
## Deprecation Notice
**Important**: All APIs in this module are deprecated as of Flink 1.18.0 (see FLIP-265, which deprecates Scala API support) and will be removed in a future major version. Consider migrating to the Java-based Table API bridge, which remains usable from Scala, as documented in the Flink migration guide.