or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-table-api-scala-bridge_2-12

Apache Flink Table API Scala Bridge provides seamless integration between Flink's Table/SQL API and Scala-specific DataStream operations for stream processing applications.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-table-api-scala-bridge_2.12@1.20.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-scala-bridge_2-12@1.20.0

0

# Flink Table API Scala Bridge

1

2

## Overview

3

4

The Flink Table API Scala Bridge provides seamless integration between Apache Flink's Table/SQL API and the Scala-specific DataStream API. This bridge library enables developers to convert DataStreams to Tables and vice-versa while leveraging Scala's type system and functional programming paradigms.

5

6

**Key Features:**

7

- Bidirectional conversion between DataStream and Table APIs

8

- Support for both bounded and unbounded data processing

9

- Integration with Flink's SQL engine for complex queries

10

- Scala-idiomatic APIs with implicit conversions

11

- Event-time processing and watermark propagation

12

- Changelog stream processing for updating tables

13

14

**⚠️ Deprecation Notice:** All Flink Scala APIs are deprecated as of version 1.18.0 and will be removed in a future major version. Users should migrate to the Java APIs. See [FLIP-265](https://s.apache.org/flip-265) for details.

15

16

## Package Information

17

18

- **Package:** `org.apache.flink:flink-table-api-scala-bridge_2.12`

19

- **Version:** 1.20.2

20

- **Language:** Scala 2.12

21

- **License:** Apache-2.0

22

23

### Installation

24

25

Add to your `build.sbt`:

26

27

```scala

28

libraryDependencies += "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.20.2"

29

```

30

31

Or in Maven `pom.xml`:

32

33

```xml

34

<dependency>

35

<groupId>org.apache.flink</groupId>

36

<artifactId>flink-table-api-scala-bridge_2.12</artifactId>

37

<version>1.20.2</version>

38

</dependency>

39

```

40

41

## Core Imports

42

43

```scala

44

// Essential imports for Table API and DataStream integration

45

import org.apache.flink.table.api._

46

import org.apache.flink.table.api.bridge.scala._

47

import org.apache.flink.streaming.api.scala._

48

49

// Common type imports

50

import org.apache.flink.types.Row

51

import org.apache.flink.table.types.DataType

52

import org.apache.flink.table.connector.ChangelogMode

53

```

54

55

## Basic Usage

56

57

### Environment Setup

58

59

```scala

60

import org.apache.flink.streaming.api.scala._

61

import org.apache.flink.table.api.bridge.scala._

62

63

// Create execution environment

64

val env = StreamExecutionEnvironment.getExecutionEnvironment

65

val tableEnv = StreamTableEnvironment.create(env)

66

```

67

68

### DataStream to Table Conversion

69

70

```scala

71

case class User(name: String, age: Int)

72

73

val users: DataStream[User] = env.fromCollection(Seq(

74

User("Alice", 25),

75

User("Bob", 30)

76

))

77

78

// Convert DataStream to Table (automatic schema derivation)

79

val userTable: Table = tableEnv.fromDataStream(users)

80

81

// Or use implicit conversion

82

import org.apache.flink.table.api.bridge.scala._

83

val userTable2: Table = users.toTable(tableEnv)

84

```

85

86

### Table to DataStream Conversion

87

88

```scala

89

// Convert Table back to DataStream

90

val resultStream: DataStream[Row] = tableEnv.toDataStream(userTable)

91

92

// Or use implicit conversion

93

val resultStream2: DataStream[Row] = userTable.toDataStream

94

```

95

96

### SQL Operations

97

98

```scala

99

// Register table for SQL queries

100

tableEnv.createTemporaryView("users", userTable)

101

102

// Execute SQL query

103

val sqlResult: Table = tableEnv.sqlQuery(

104

"SELECT name, age FROM users WHERE age > 25"

105

)

106

107

val resultStream: DataStream[Row] = tableEnv.toDataStream(sqlResult)

108

```

109

110

## Architecture

111

112

The Flink Table API Scala Bridge consists of several key components:

113

114

- **StreamTableEnvironment**: Main entry point for table operations with DataStream integration

115

- **Conversion Utilities**: Classes providing DataStream ↔ Table conversion methods

116

- **Implicit Conversions**: Package-level implicits for seamless API integration

117

- **Statement Sets**: Batch execution of multiple table operations

118

- **Schema System**: Type-safe schema definitions and transformations

119

120

## Capabilities

121

122

### [Environment and Setup](./environment-setup.md)

123

Configure and create StreamTableEnvironment instances with various settings and execution modes.

124

125

```scala { .api }

126

object StreamTableEnvironment {

127

def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment

128

def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment

129

}

130

```

131

132

### [DataStream Integration](./datastream-integration.md)

133

Convert between DataStreams and Tables with automatic schema derivation and custom schema definitions.

134

135

```scala { .api }

136

trait StreamTableEnvironment {

137

def fromDataStream[T](dataStream: DataStream[T]): Table

138

def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table

139

def toDataStream(table: Table): DataStream[Row]

140

def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]

141

}

142

```

143

144

### [Changelog Processing](./changelog-processing.md)

145

Handle updating tables and changelog streams for complex event processing scenarios.

146

147

```scala { .api }

148

trait StreamTableEnvironment {

149

def fromChangelogStream(dataStream: DataStream[Row]): Table

150

def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table

151

def toChangelogStream(table: Table): DataStream[Row]

152

def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]

153

}

154

```

155

156

### [Table Operations](./table-operations.md)

157

Create, register, and manage tables and views within the table environment.

158

159

```scala { .api }

160

trait StreamTableEnvironment {

161

def createTemporaryView[T](path: String, dataStream: DataStream[T]): Unit

162

def createTemporaryView[T](path: String, dataStream: DataStream[T], schema: Schema): Unit

163

}

164

```

165

166

### [Statement Sets](./statement-sets.md)

167

Batch multiple table operations together for optimized execution planning.

168

169

```scala { .api }

170

trait StreamTableEnvironment {

171

def createStatementSet(): StreamStatementSet

172

}

173

174

trait StreamStatementSet {

175

def addInsert(targetPath: String, table: Table): StreamStatementSet

176

def addInsertSql(statement: String): StreamStatementSet

177

def execute(): TableResult

178

}

179

```

180

181

### [Implicit Conversions](./implicit-conversions.md)

182

Package-level implicit conversions for seamless integration between DataStream and Table APIs.

183

184

```scala { .api }

185

package object scala {

186

implicit def tableConversions(table: Table): TableConversions

187

implicit def dataStreamConversions[T](dataStream: DataStream[T]): DataStreamConversions[T]

188

implicit def tableToChangelogDataStream(table: Table): DataStream[Row]

189

}

190

```

191

192

## Common Types

193

194

```scala { .api }

195

// Core Flink types

196

import org.apache.flink.streaming.api.scala.DataStream

197

import org.apache.flink.table.api.Table

198

import org.apache.flink.types.Row

199

200

// Schema and type system

201

import org.apache.flink.table.api.Schema

202

import org.apache.flink.table.types.DataType

203

import org.apache.flink.table.types.AbstractDataType

204

205

// Execution environment

206

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

207

import org.apache.flink.table.api.EnvironmentSettings

208

209

// Changelog processing

210

import org.apache.flink.table.connector.ChangelogMode

211

import org.apache.flink.types.RowKind

212

```

213

214

## Error Handling

215

216

The bridge API throws several types of exceptions:

217

218

- **ValidationException**: Invalid operations or incompatible types

219

- **TableException**: General table processing errors

220

- **UnsupportedOperationException**: Operations not supported in streaming mode

221

222

```scala

223

try {

224

val table = tableEnv.fromDataStream(dataStream)

225

val result = tableEnv.toDataStream(table)

226

} catch {

227

case e: ValidationException => // Handle validation errors

228

case e: TableException => // Handle table processing errors

229

}

230

```

231

232

## Migration Guide

233

234

Since this API is deprecated, consider migrating to the Java Table API:

235

236

```java

237

// Java equivalent

238

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

239

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

240

241

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

242

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

243

```