# DataStream Integration (Bridge API)

The bridge API provides seamless integration between Flink's Table API and DataStream API, enabling conversion between Table and DataStream objects and streaming-specific table operations.

## Core Classes

### StreamTableEnvironment

The main entry point for DataStream-Table integration, providing methods to convert between DataStream and Table objects.

```scala { .api }
trait StreamTableEnvironment extends TableEnvironment {
  // DataStream to Table conversion
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table
  def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table

  // Table to DataStream conversion
  def toDataStream(table: Table): DataStream[Row]
  def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
  def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T]

  // Changelog stream conversion
  def toChangelogStream(table: Table): DataStream[Row]
  def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]
  def toChangelogStream(
    table: Table,
    targetDataType: AbstractDataType[_],
    changelogMode: ChangelogMode
  ): DataStream[Row]
}
```

### Factory Methods

```scala { .api }
object StreamTableEnvironment {
  // Create from existing StreamExecutionEnvironment
  def create(
    executionEnvironment: StreamExecutionEnvironment
  ): StreamTableEnvironment

  def create(
    executionEnvironment: StreamExecutionEnvironment,
    settings: EnvironmentSettings
  ): StreamTableEnvironment

  def create(settings: EnvironmentSettings): StreamTableEnvironment
}
```

## Implicit Conversion Classes

### TableConversions

Provides implicit conversion methods for Table objects to DataStream.

```scala { .api }
class TableConversions(table: Table) {
  /**
   * Converts the Table to a DataStream of Row objects.
   * Equivalent to StreamTableEnvironment.toDataStream(table)
   */
  def toDataStream(): DataStream[Row]

  /**
   * Converts the Table to a typed DataStream.
   * Equivalent to StreamTableEnvironment.toDataStream(table, targetClass)
   */
  def toDataStream[T](targetClass: Class[T]): DataStream[T]

  /**
   * Converts the Table to a typed DataStream with specified data type.
   * Equivalent to StreamTableEnvironment.toDataStream(table, targetDataType)
   */
  def toDataStream[T](targetDataType: AbstractDataType[_]): DataStream[T]

  /**
   * Converts the Table to a changelog DataStream.
   * Equivalent to StreamTableEnvironment.toChangelogStream(table)
   */
  def toChangelogStream(): DataStream[Row]

  /**
   * Converts the Table to a changelog DataStream with schema.
   * Equivalent to StreamTableEnvironment.toChangelogStream(table, targetSchema)
   */
  def toChangelogStream(targetSchema: Schema): DataStream[Row]

  /**
   * Converts the Table to a changelog DataStream with data type and changelog mode.
   */
  def toChangelogStream(
    targetDataType: AbstractDataType[_],
    changelogMode: ChangelogMode
  ): DataStream[Row]
}
```

### DataStreamConversions

Provides implicit conversion methods for DataStream objects to Table.

```scala { .api }
class DataStreamConversions[T](dataStream: DataStream[T]) {
  /**
   * Converts the DataStream to a Table.
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream)
   */
  def toTable()(implicit tEnv: StreamTableEnvironment): Table

  /**
   * Converts the DataStream to a Table with schema.
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream, schema)
   */
  def toTable(schema: Schema)(implicit tEnv: StreamTableEnvironment): Table

  /**
   * Converts the DataStream to a Table with field expressions.
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream, fields)
   */
  def toTable(fields: Expression*)(implicit tEnv: StreamTableEnvironment): Table
}
```

## Package Object Implicits

The `org.apache.flink.table.api.bridge.scala` package object provides automatic implicit conversions:

```scala { .api }
package object scala {
  // Automatic Table to TableConversions
  implicit def tableConversions(table: Table): TableConversions

  // Automatic Table to DataStream[Row] conversion
  implicit def tableToChangelogDataStream(table: Table): DataStream[Row]

  // Automatic DataStream to DataStreamConversions
  implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]
}
```

## Usage Examples

### Environment Setup

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
```

### DataStream to Table Conversion

```scala
case class Order(id: Int, product: String, amount: Double)

val orders: DataStream[Order] = env.fromElements(
  Order(1, "laptop", 999.99),
  Order(2, "mouse", 29.99)
)

// Direct conversion
val ordersTable = tableEnv.fromDataStream(orders)

// With schema
val ordersTableWithSchema = tableEnv.fromDataStream(
  orders,
  Schema.newBuilder()
    .column("id", DataTypes.INT())
    .column("product", DataTypes.STRING())
    .column("amount", DataTypes.DOUBLE())
    .build()
)

// Using implicit conversion
val ordersTableImplicit = orders.toTable()
```

### Table to DataStream Conversion

```scala
val processedTable = ordersTable
  .select($"id", $"product", $"amount" * 1.1 as "amountWithTax")
  .where($"amount" > 50.0)

// Direct conversion
val resultStream: DataStream[Row] = tableEnv.toDataStream(processedTable)

// Typed conversion
val typedStream: DataStream[Order] = tableEnv.toDataStream(processedTable, classOf[Order])

// Using implicit conversion
val implicitStream: DataStream[Row] = processedTable.toDataStream()
```

### Changelog Streams

```scala
// For tables with updates/deletes
val changelogStream = tableEnv.toChangelogStream(processedTable)

// Process changelog entries
changelogStream.process(new ProcessFunction[Row, String] {
  override def processElement(
    value: Row,
    ctx: ProcessFunction[Row, String]#Context,
    out: Collector[String]
  ): Unit = {
    val rowKind = value.getKind
    val data = value.toString
    out.collect(s"$rowKind: $data")
  }
})
```

## Integration Patterns

### Hybrid Processing Pipeline

```scala
// DataStream processing
val rawStream = env.addSource(new MySourceFunction())
val cleanedStream = rawStream.filter(_.isValid)

// Convert to Table for SQL processing
val cleanedTable = tableEnv.fromDataStream(cleanedStream)
val aggregatedTable = tableEnv.sqlQuery(
  "SELECT category, COUNT(*) as cnt, AVG(amount) as avg_amount " +
  "FROM " + cleanedTable + " " +
  "GROUP BY category"
)

// Convert back to DataStream for further processing
val aggregatedStream = tableEnv.toDataStream(aggregatedTable)
aggregatedStream.addSink(new MySinkFunction())
```

## Notes

- All bridge API components are marked as `@deprecated` as part of FLIP-265
- Implicit conversions are automatically available when importing the bridge package
- Schema inference works automatically for case classes and basic types
- Changelog streams preserve Row-level change information (INSERT, UPDATE, DELETE)
- The bridge package requires both table and streaming dependencies