0
# Table Operations
1
2
## Overview
3
4
Table operations provide functionality for creating, registering, and managing tables and views within the StreamTableEnvironment. This includes temporary view creation, table registration, and catalog management for DataStream-based tables.
5
6
## Core API
7
8
### View Creation
9
10
```scala { .api }
11
trait StreamTableEnvironment {
12
def createTemporaryView[T](path: String, dataStream: DataStream[T]): Unit
13
def createTemporaryView[T](path: String, dataStream: DataStream[T], schema: Schema): Unit
14
}
15
```
16
17
### DataStreamConversions View Methods
18
19
```scala { .api }
20
class DataStreamConversions[T](dataStream: DataStream[T]) {
21
def createTemporaryView(tableEnv: StreamTableEnvironment, path: String): Unit
22
def createTemporaryView(tableEnv: StreamTableEnvironment, path: String, schema: Schema): Unit
23
}
24
```
25
26
### Legacy Registration Methods (Deprecated)
27
28
```scala { .api }
29
trait StreamTableEnvironment {
30
@deprecated def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit
31
@deprecated def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit
32
@deprecated def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table
33
@deprecated def createTemporaryView[T](path: String, dataStream: DataStream[T], fields: Expression*): Unit
34
}
35
```
36
37
## Creating Temporary Views
38
39
### Basic View Creation
40
41
```scala
42
case class Order(orderId: String, userId: String, amount: Double, timestamp: Long)
43
44
val orderStream: DataStream[Order] = env.fromCollection(orders)
45
46
// Create temporary view with automatic schema
47
tableEnv.createTemporaryView("orders", orderStream)
48
49
// Now can reference in SQL
50
val result = tableEnv.sqlQuery("SELECT userId, SUM(amount) FROM orders GROUP BY userId")
51
```
52
53
### View with Custom Schema
54
55
```scala
56
import org.apache.flink.table.api.Schema
57
import org.apache.flink.table.api.DataTypes
58
59
val schema = Schema.newBuilder()
60
.column("order_id", DataTypes.STRING())
61
.column("user_id", DataTypes.STRING())
62
.column("amount", DataTypes.DECIMAL(10, 2))
63
.column("order_time", DataTypes.TIMESTAMP_LTZ(3))
64
.columnByExpression("proc_time", "PROCTIME()")
66
.build()
67
68
tableEnv.createTemporaryView("orders_with_proctime", orderStream, schema)
69
```
70
71
### Catalog Path Views
72
73
```scala
74
// Create view in specific catalog and database
75
tableEnv.createTemporaryView("my_catalog.my_database.orders", orderStream)
76
77
// Fully qualified paths always have the form catalog.database.view
78
tableEnv.createTemporaryView("analytics.sales.daily_orders", orderStream)
79
```
80
81
### Using DataStreamConversions
82
83
```scala
84
import org.apache.flink.table.api.bridge.scala._
85
86
// Using implicit conversion
87
orderStream.createTemporaryView(tableEnv, "orders")
88
89
// With custom schema
90
orderStream.createTemporaryView(tableEnv, "orders_detailed", schema)
91
```
92
93
## Event-Time Views
94
95
### Views with Event-Time Attributes
96
97
```scala
98
val eventTimeSchema = Schema.newBuilder()
99
.column("order_id", DataTypes.STRING())
100
.column("user_id", DataTypes.STRING())
101
.column("amount", DataTypes.DECIMAL(10, 2))
102
.column("event_time", DataTypes.TIMESTAMP_LTZ(3))
103
.columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
104
.watermark("rowtime", "SOURCE_WATERMARK()")
105
.build()
106
107
// DataStream with watermarks
108
val timestampedOrders = orderStream.assignTimestampsAndWatermarks(
109
WatermarkStrategy.forBoundedOutOfOrderness[Order](Duration.ofMinutes(1))
110
.withTimestampAssigner((order, _) => order.timestamp)
111
)
112
113
tableEnv.createTemporaryView("timed_orders", timestampedOrders, eventTimeSchema)
114
115
// Now can use in time-based queries
116
val windowedResult = tableEnv.sqlQuery("""
117
SELECT
118
user_id,
119
TUMBLE_START(rowtime, INTERVAL '1' HOUR) as window_start,
120
SUM(amount) as total_amount
121
FROM timed_orders
122
GROUP BY user_id, TUMBLE(rowtime, INTERVAL '1' HOUR)
123
""")
124
```
125
126
## View Management
127
128
### Temporary vs Permanent Views
129
130
```scala
131
// Temporary views (session-scoped)
132
tableEnv.createTemporaryView("temp_orders", orderStream)
133
134
// Check if view exists
135
val viewExists = tableEnv.listTemporaryViews().contains("temp_orders")
138
139
// Drop temporary view
140
tableEnv.executeSql("DROP TEMPORARY VIEW IF EXISTS temp_orders")
141
```
142
143
### View Shadowing
144
145
```scala
146
// Temporary views shadow permanent ones with the same name
147
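tableEnv.createTemporaryView("orders", orderStream1) // Creates temporary view (shadows a permanent "orders")
// Creating a temporary view on an existing path throws a ValidationException,
// so drop the old temporary view before registering a new one under the same name
tableEnv.dropTemporaryView("orders")
tableEnv.createTemporaryView("orders", orderStream2) // Registers the replacement temporary view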
149
150
// To access permanent view, drop temporary one first
151
tableEnv.executeSql("DROP TEMPORARY VIEW orders")
152
```
153
154
## Legacy Table Registration (Deprecated)
155
156
### Basic Registration
157
158
```scala
159
// Deprecated - use createTemporaryView instead
160
tableEnv.registerDataStream("orders", orderStream)
161
162
// With field expressions (deprecated)
163
import org.apache.flink.table.api._
164
165
tableEnv.registerDataStream(
166
"orders_renamed",
167
orderStream,
168
$"orderId" as "id",
169
$"userId" as "customer",
170
$"amount",
171
$"timestamp".rowtime as "event_time"
172
)
173
```
174
175
### Field Expression Patterns
176
177
```scala
178
// Reference by position (tuples, case classes)
179
tableEnv.registerDataStream(
180
"tuple_stream",
181
tupleStream,
182
$"_1" as "first",
183
$"_2" as "second"
184
)
185
186
// Reference by name with reordering
187
tableEnv.registerDataStream(
188
"reordered",
189
orderStream,
190
$"amount", // Amount first
191
$"orderId" as "id", // Rename orderId
192
$"userId" // Keep userId as-is
193
)
194
```
195
196
## Complex View Scenarios
197
198
### Nested Object Views
199
200
```scala
201
case class Address(street: String, city: String, zipCode: String)
202
case class Customer(id: String, name: String, address: Address)
203
204
val customerStream: DataStream[Customer] = // ... source
205
206
// Nested case classes are not flattened; they become a single structured column
tableEnv.createTemporaryView("customers", customerStream)
// Results in columns: id, name, address (nested fields: street, city, zipCode)
209
210
// Query nested fields
211
val cityQuery = tableEnv.sqlQuery("""
212
SELECT id, name, address.city as customer_city
213
FROM customers
214
WHERE address.city = 'New York'
215
""")
216
```
217
218
### Collection Type Views
219
220
```scala
221
case class OrderWithItems(orderId: String, items: Array[String], metadata: Map[String, String])
222
223
val orderWithItemsStream: DataStream[OrderWithItems] = // ... source
224
225
tableEnv.createTemporaryView("orders_with_items", orderWithItemsStream)
226
227
// Query array and map elements
228
val itemQuery = tableEnv.sqlQuery("""
229
SELECT
230
orderId,
231
items[1] as first_item,
232
CARDINALITY(items) as item_count,
233
metadata['priority'] as order_priority
234
FROM orders_with_items
235
""")
236
```
237
238
### Union and Join Views
239
240
```scala
241
// Create multiple views for union operations
242
tableEnv.createTemporaryView("current_orders", currentOrderStream)
243
tableEnv.createTemporaryView("historical_orders", historicalOrderStream)
244
245
val unionResult = tableEnv.sqlQuery("""
246
SELECT * FROM current_orders
247
UNION ALL
248
SELECT * FROM historical_orders
249
""")
250
251
// Views for joins
252
tableEnv.createTemporaryView("orders", orderStream)
253
tableEnv.createTemporaryView("customers", customerStream)
254
255
val joinResult = tableEnv.sqlQuery("""
256
SELECT o.orderId, c.name, o.amount
257
FROM orders o
258
JOIN customers c ON o.userId = c.id
259
""")
260
```
261
262
## Error Handling
263
264
```scala
265
try {
266
tableEnv.createTemporaryView("orders", orderStream, schema)
267
} catch {
268
case e: ValidationException =>
269
// Schema validation failed or view name conflicts
270
case e: TableException =>
271
// View creation error
272
case e: CatalogException =>
273
// Catalog-related error (invalid path, etc.)
274
}
275
```
276
277
## Performance Considerations
278
279
1. **Schema Complexity**: Complex nested schemas may impact query performance
280
2. **View Reuse**: Reuse views across multiple queries for efficiency
281
3. **Temporary Storage**: Temporary views don't persist data but may cache metadata
282
4. **Path Resolution**: Simple view names are resolved faster than complex catalog paths
283
284
## Best Practices
285
286
1. **Use createTemporaryView**: Prefer new API over deprecated registration methods
287
2. **Meaningful Names**: Use descriptive view names that reflect data content
288
3. **Schema Definition**: Define explicit schemas for production applications
289
4. **Namespace Organization**: Use catalog paths to organize views logically
290
5. **Cleanup**: Drop temporary views when no longer needed to avoid name conflicts
291
6. **Documentation**: Document view schemas and their intended usage
292
7. **Testing**: Test view creation and querying in development environments first