or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

changelog-processing.mddatastream-integration.mdenvironment-setup.mdimplicit-conversions.mdindex.mdstatement-sets.mdtable-operations.md

table-operations.mddocs/

0

# Table Operations

1

2

## Overview

3

4

Table operations provide functionality for creating, registering, and managing tables and views within the StreamTableEnvironment. This includes temporary view creation, table registration, and catalog management for DataStream-based tables.

5

6

## Core API

7

8

### View Creation

9

10

```scala { .api }

11

trait StreamTableEnvironment {

12

def createTemporaryView[T](path: String, dataStream: DataStream[T]): Unit

13

def createTemporaryView[T](path: String, dataStream: DataStream[T], schema: Schema): Unit

14

}

15

```

16

17

### DataStreamConversions View Methods

18

19

```scala { .api }

20

class DataStreamConversions[T](dataStream: DataStream[T]) {

21

def createTemporaryView(tableEnv: StreamTableEnvironment, path: String): Unit

22

def createTemporaryView(tableEnv: StreamTableEnvironment, path: String, schema: Schema): Unit

23

}

24

```

25

26

### Legacy Registration Methods (Deprecated)

27

28

```scala { .api }

29

trait StreamTableEnvironment {

30

@deprecated def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit

31

@deprecated def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit

32

@deprecated def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table

33

@deprecated def createTemporaryView[T](path: String, dataStream: DataStream[T], fields: Expression*): Unit

34

}

35

```

36

37

## Creating Temporary Views

38

39

### Basic View Creation

40

41

```scala

42

case class Order(orderId: String, userId: String, amount: Double, timestamp: Long)

43

44

val orderStream: DataStream[Order] = env.fromCollection(orders)

45

46

// Create temporary view with automatic schema

47

tableEnv.createTemporaryView("orders", orderStream)

48

49

// Now can reference in SQL

50

val result = tableEnv.sqlQuery("SELECT userId, SUM(amount) FROM orders GROUP BY userId")

51

```

52

53

### View with Custom Schema

54

55

```scala

56

import org.apache.flink.table.api.{DataTypes, Schema}

// Declared physical columns must use the field names of the stream's type
// (here the Order case class); declaring them lets you pin their data types.
// Computed columns, such as a processing-time attribute, are added on top.
val schema = Schema.newBuilder()
  .column("orderId", DataTypes.STRING())
  .column("userId", DataTypes.STRING())
  .column("amount", DataTypes.DOUBLE())
  .column("timestamp", DataTypes.BIGINT())
  .columnByExpression("order_time", "TO_TIMESTAMP_LTZ(`timestamp`, 3)")
  .columnByExpression("proc_time", "PROCTIME()")
  .build()

tableEnv.createTemporaryView("orders_with_proctime", orderStream, schema)

69

```

70

71

### Catalog Path Views

72

73

```scala

74

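// Create the view in a specific catalog and database (catalog.database.view)
tableEnv.createTemporaryView("my_catalog.my_database.orders", orderStream)

// A three-part path is always catalog.database.object: this creates the view
// "daily_orders" in database "sales" of catalog "analytics". One- and two-part
// paths are resolved against the current catalog and database.
tableEnv.createTemporaryView("analytics.sales.daily_orders", orderStream)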

79

```

80

81

### Using DataStreamConversions

82

83

```scala

84

import org.apache.flink.table.api.bridge.scala._

85

86

// Using implicit conversion

87

orderStream.createTemporaryView(tableEnv, "orders")

88

89

// With custom schema

90

orderStream.createTemporaryView(tableEnv, "orders_detailed", schema)

91

```

92

93

## Event-Time Views

94

95

### Views with Event-Time Attributes

96

97

```scala

98

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.table.api.{DataTypes, Schema}

// Physical columns (orderId, userId, amount, timestamp) are derived from Order;
// the schema only adds an event-time column and its watermark strategy
val eventTimeSchema = Schema.newBuilder()
  // Exposes the record timestamp assigned in the DataStream API as "rowtime"
  .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
  // Reuses the watermarks already generated by the DataStream
  .watermark("rowtime", "SOURCE_WATERMARK()")
  .build()

// DataStream with timestamps and watermarks. The explicit type parameter and
// the anonymous SerializableTimestampAssigner avoid Scala type-inference and
// overload-resolution problems with WatermarkStrategy.
val timestampedOrders = orderStream.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[Order](Duration.ofMinutes(1))
    .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
      override def extractTimestamp(order: Order, recordTimestamp: Long): Long =
        order.timestamp
    })
)

tableEnv.createTemporaryView("timed_orders", timestampedOrders, eventTimeSchema)

// Now can use in time-based queries
val windowedResult = tableEnv.sqlQuery("""
  SELECT
    userId,
    TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS window_start,
    SUM(amount) AS total_amount
  FROM timed_orders
  GROUP BY userId, TUMBLE(rowtime, INTERVAL '1' HOUR)
""")

124

```

125

126

## View Management

127

128

### Temporary vs Permanent Views

129

130

```scala

131

// Temporary views (session-scoped)
tableEnv.createTemporaryView("temp_orders", orderStream)

// Check if the view exists. Temporary views are not stored in any catalog, so
// look them up with listTemporaryViews() (current catalog and database)
val viewExists = tableEnv.listTemporaryViews().contains("temp_orders")

// Drop temporary view
tableEnv.executeSql("DROP TEMPORARY VIEW IF EXISTS temp_orders")

141

```

142

143

### View Shadowing

144

145

```scala

146

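// Temporary views shadow permanent tables/views with the same path
tableEnv.createTemporaryView("orders", orderStream1) // Creates temporary view

// createTemporaryView does not replace an existing temporary view: registering
// the same path again throws a ValidationException. Drop it first to replace it.
tableEnv.dropTemporaryView("orders")
tableEnv.createTemporaryView("orders", orderStream2) // Replaces temporary view

// To access the permanent object again, drop the temporary view
tableEnv.executeSql("DROP TEMPORARY VIEW orders")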

152

```

153

154

## Legacy Table Registration (Deprecated)

155

156

### Basic Registration

157

158

```scala

159

// Deprecated - use createTemporaryView instead
tableEnv.registerDataStream("orders", orderStream)

// With field expressions (deprecated). The Scala $"..." interpolator and
// .rowtime come from the Table API's implicit conversions.
import org.apache.flink.table.api._

// .rowtime turns the "timestamp" field into an event-time attribute, which
// requires a stream with timestamps and watermarks (e.g. timestampedOrders)
tableEnv.registerDataStream(
  "orders_renamed",
  timestampedOrders,
  $"orderId" as "id",
  $"userId" as "customer",
  $"amount",
  $"timestamp".rowtime as "event_time"
)

173

```

174

175

### Field Expression Patterns

176

177

```scala

178

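// Name-based: reference tuple fields by their names (_1, _2) and rename them
// (tupleStream: DataStream[(String, Int)])
tableEnv.registerDataStream(
  "tuple_stream",
  tupleStream,
  $"_1" as "first",
  $"_2" as "second"
)

// Position-based (tuples, case classes, rows): names that do not exist in the
// input type are assigned to the fields in their declared order
tableEnv.registerDataStream("tuple_stream_by_position", tupleStream, $"first", $"second")

// Reference by name with reordering; fields not listed (timestamp) are dropped
tableEnv.registerDataStream(
  "reordered",
  orderStream,
  $"amount",           // Amount first
  $"orderId" as "id",  // Rename orderId
  $"userId"            // Keep userId as-is
)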

194

```

195

196

## Complex View Scenarios

197

198

### Nested Object Views

199

200

```scala

201

case class Address(street: String, city: String, zipCode: String)

202

case class Customer(id: String, name: String, address: Address)

203

204

val customerStream: DataStream[Customer] = // ... source

205

206

// Nested case classes are not flattened: "address" becomes a single column of
// a structured (ROW-like) type with the fields street, city and zipCode
tableEnv.createTemporaryView("customers", customerStream)
// Results in columns: id, name, address

209

210

// Query nested fields

211

val cityQuery = tableEnv.sqlQuery("""

212

SELECT id, name, address.city as customer_city

213

FROM customers

214

WHERE address.city = 'New York'

215

""")

216

```

217

218

### Collection Type Views

219

220

```scala

221

case class OrderWithItems(orderId: String, items: Array[String], metadata: Map[String, String])

222

223

val orderWithItemsStream: DataStream[OrderWithItems] = // ... source

224

225

tableEnv.createTemporaryView("orders_with_items", orderWithItemsStream)

226

227

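// Query array elements (SQL arrays are 1-indexed)
val itemQuery = tableEnv.sqlQuery("""
  SELECT
    orderId,
    items[1] AS first_item,
    CARDINALITY(items) AS item_count
  FROM orders_with_items
""")

// Note: a Scala Map field is not mapped to a SQL MAP column automatically; it
// is treated as an opaque generic (RAW) type, so map access such as
// metadata['priority'] is not available on this view. To query it as a MAP,
// convert the stream to Row with explicit map type information
// (e.g. Types.MAP(Types.STRING, Types.STRING) over a java.util.Map) first.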

236

```

237

238

### Union and Join Views

239

240

```scala

241

// Create multiple views for union operations

242

tableEnv.createTemporaryView("current_orders", currentOrderStream)

243

tableEnv.createTemporaryView("historical_orders", historicalOrderStream)

244

245

val unionResult = tableEnv.sqlQuery("""

246

SELECT * FROM current_orders

247

UNION ALL

248

SELECT * FROM historical_orders

249

""")

250

251

// Views for joins

252

tableEnv.createTemporaryView("orders", orderStream)

253

tableEnv.createTemporaryView("customers", customerStream)

254

255

val joinResult = tableEnv.sqlQuery("""

256

SELECT o.orderId, c.name, o.amount

257

FROM orders o

258

JOIN customers c ON o.userId = c.id

259

""")

260

```

261

262

## Error Handling

263

264

```scala

265

try {

266

tableEnv.createTemporaryView("orders", orderStream, schema)

267

} catch {

268

case e: ValidationException =>

269

// Schema validation failed or view name conflicts

270

case e: TableException =>

271

// View creation error

272

case e: CatalogException =>

273

// Catalog-related error (invalid path, etc.)

274

}

275

```

276

277

## Performance Considerations

278

279

1. **Schema Complexity**: Complex nested schemas may impact query performance

280

2. **View Reuse**: Reuse views across multiple queries for efficiency

281

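3. **Temporary Storage**: Temporary views hold only metadata for the current session; they neither materialize nor persist the underlying stream's data

4. **Path Resolution**: Object paths are resolved once, while a query is planned, so short names and fully qualified catalog paths have the same runtime cost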

283

284

## Best Practices

285

286

1. **Use createTemporaryView**: Prefer new API over deprecated registration methods

287

2. **Meaningful Names**: Use descriptive view names that reflect data content

288

3. **Schema Definition**: Define explicit schemas for production applications

289

4. **Namespace Organization**: Use catalog paths to organize views logically

290

5. **Cleanup**: Drop temporary views when no longer needed to avoid name conflicts

291

6. **Documentation**: Document view schemas and their intended usage

292

7. **Testing**: Test view creation and querying in development environments first