or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12

Table/SQL API bridge for Scala, enabling interaction between Table API and DataStream/DataSet APIs in Apache Flink

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-table-api-scala-bridge_2.12@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-scala-bridge-2-12@2.1.0

0

# Flink Table API Scala Bridge

1

2

The Flink Table API Scala Bridge provides seamless integration between Apache Flink's Table/SQL API and DataStream API for Scala developers. This bridge enables bidirectional conversion between DataStreams and Tables, allowing mixed declarative SQL operations with procedural stream processing in unified Scala applications.

3

4

## Package Information

5

6

- **Package Name**: flink-table-api-scala-bridge_2.12

7

- **Package Type**: maven

8

- **Language**: Scala

9

- **Maven Coordinates**: `org.apache.flink:flink-table-api-scala-bridge_2.12:2.1.0`

10

- **Installation**: Add to your `pom.xml` dependencies

11

12

```xml

13

<dependency>

14

<groupId>org.apache.flink</groupId>

15

<artifactId>flink-table-api-scala-bridge_2.12</artifactId>

16

<version>2.1.0</version>

17

</dependency>

18

```

19

20

## Core Imports

21

22

```scala

23

import org.apache.flink.table.api._

24

import org.apache.flink.table.api.bridge.scala._

25

import org.apache.flink.streaming.api.scala._

26

```

27

28

## Basic Usage

29

30

```scala

31

import org.apache.flink.table.api._

32

import org.apache.flink.table.api.bridge.scala._

33

import org.apache.flink.streaming.api.scala._

34

35

// Create execution environment and table environment

36

val env = StreamExecutionEnvironment.getExecutionEnvironment

37

val tableEnv = StreamTableEnvironment.create(env)

38

39

// Create a DataStream

40

val dataStream = env.fromElements(

41

("Alice", 25, "Engineer"),

42

("Bob", 30, "Manager"),

43

("Charlie", 35, "Developer")

44

)

45

46

// Convert DataStream to Table

47

val table = tableEnv.fromDataStream(dataStream, $"name", $"age", $"role")

48

49

// Perform SQL operations

50

val filteredTable = table.filter($"age" > 28)

51

52

// Convert back to DataStream

53

val resultStream = tableEnv.toDataStream(filteredTable)

54

55

// Execute the job

56

env.execute("Table Bridge Example")

57

```

58

59

## Architecture

60

61

The Flink Table API Scala Bridge is built around several key components:

62

63

- **StreamTableEnvironment**: Entry point for creating and managing table environments in streaming contexts

64

- **Conversion Classes**: `DataStreamConversions` and `TableConversions` for seamless type-safe conversions

65

- **Statement Management**: `StreamStatementSet` for batching multiple table operations for optimized execution

66

- **Implicit Conversions**: Scala-idiomatic conversion utilities available through package object

67

- **Schema Support**: Flexible schema definition and type mapping between Scala types and table schemas

68

69

## Capabilities

70

71

### Stream Table Environment

72

73

Core table environment for streaming applications, providing the entry point for all Table/SQL API operations integrated with DataStream processing.

74

75

```scala { .api }

76

trait StreamTableEnvironment extends TableEnvironment

77

78

object StreamTableEnvironment {

79

def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment

80

def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment

81

}

82

```

83

84

[Stream Table Environment](./stream-table-environment.md)

85

86

### DataStream to Table Conversions

87

88

Convert DataStreams to Tables with automatic or custom schema derivation, supporting both regular streams and changelog streams.

89

90

```scala { .api }

91

class DataStreamConversions[T](dataStream: DataStream[T]) {

92

def toTable(tableEnv: StreamTableEnvironment): Table

93

def toTable(tableEnv: StreamTableEnvironment, schema: Schema): Table

94

def toChangelogTable(tableEnv: StreamTableEnvironment): Table

95

}

96

```

97

98

[DataStream Conversions](./datastream-conversions.md)

99

100

### Table to DataStream Conversions

101

102

Convert Tables back to DataStreams with support for different output modes including insert-only and full changelog streams.

103

104

```scala { .api }

105

class TableConversions(table: Table) {

106

def toDataStream: DataStream[Row]

107

def toDataStream[T](targetClass: Class[T]): DataStream[T]

108

def toChangelogStream: DataStream[Row]

109

def toChangelogStream(targetSchema: Schema): DataStream[Row]

110

}

111

```

112

113

[Table Conversions](./table-conversions.md)

114

115

### Statement Set Operations

116

117

Batch multiple table operations together for optimized execution and resource management.

118

119

```scala { .api }

120

trait StreamStatementSet extends StatementSet {

121

def add(tablePipeline: TablePipeline): StreamStatementSet

122

def addInsertSql(statement: String): StreamStatementSet

123

def addInsert(targetPath: String, table: Table): StreamStatementSet

124

def attachAsDataStream(): Unit

125

}

126

```

127

128

[Statement Sets](./statement-sets.md)

129

130

### Implicit Conversions

131

132

Scala-idiomatic implicit conversions for seamless integration between DataStream and Table APIs.

133

134

```scala { .api }

135

// Available implicit conversions from package object

136

implicit def tableConversions(table: Table): TableConversions

137

implicit def tableToChangelogDataStream(table: Table): DataStream[Row]

138

implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]

139

```

140

141

[Implicit Conversions](./implicit-conversions.md)

142

143

## Types

144

145

```scala { .api }

146

// Core Flink types used throughout the API

147

import org.apache.flink.streaming.api.datastream.DataStream

148

import org.apache.flink.table.api.Table

149

import org.apache.flink.table.api.Schema

150

import org.apache.flink.table.connector.ChangelogMode

151

import org.apache.flink.types.Row

152

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

153

import org.apache.flink.table.api.EnvironmentSettings

154

import org.apache.flink.table.expressions.Expression

155

import org.apache.flink.table.types.AbstractDataType

156

import org.apache.flink.api.common.typeinfo.TypeInformation

157

import org.apache.flink.table.api.{TableDescriptor, TablePipeline, ExplainDetail}

158

import org.apache.flink.types.RowKind

159

```

160

161

## Deprecation Notice

162

163

**Important**: All APIs in this module are deprecated as of Flink 1.18.0 (FLIP-265) and will be removed in a future major version. Consider migrating to the new unified Table API approach as documented in the Flink migration guide.