# Implicit Conversions

The Flink Table API Scala Bridge provides implicit conversions between the DataStream and Table APIs. They are defined in the bridge's package object and add conversion methods directly to `Table` and `DataStream` values.

## Available Implicit Conversions

All implicit conversions are defined in the `org.apache.flink.table.api.bridge.scala` package object and come into scope when you import the package's members with a wildcard import.

### Import Statement

```scala
import org.apache.flink.table.api.bridge.scala._
```

This import makes all implicit conversions available in scope.
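
To illustrate why a single wildcard import is enough, here is a minimal plain-Scala sketch of the same pattern. `MiniTable`, `MiniTableConversions`, and `MiniBridge` are hypothetical stand-ins, not Flink classes, and the bridge defines its conversions in a package object rather than in a plain object as done here.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for a table type; NOT a Flink class.
final case class MiniTable(rows: List[String])

// Wrapper that carries the extra methods, similar in role to TableConversions.
final class MiniTableConversions(table: MiniTable) {
  def toRowList: List[String] = table.rows
}

object MiniBridge {
  // Once MiniBridge._ is imported, the compiler may rewrite
  // `table.toRowList` as `miniTableConversions(table).toRowList`.
  implicit def miniTableConversions(table: MiniTable): MiniTableConversions =
    new MiniTableConversions(table)
}

object ImplicitScopeDemo {
  import MiniBridge._

  // MiniTable has no toRowList method of its own; the call compiles
  // only because the implicit conversion is in scope.
  def rowsOf(table: MiniTable): List[String] = table.toRowList
}
```

Without the `import MiniBridge._` line, `table.toRowList` would not compile, which mirrors what happens when the bridge import is missing.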

## Capabilities

### Table to TableConversions

Implicit conversion from `Table` to `TableConversions`, which adds DataStream conversion methods to a table.

```scala { .api }
/**
 * Conversions from Table to DataStream
 * @param table The table to provide conversion methods for
 * @return TableConversions wrapper with conversion methods
 */
implicit def tableConversions(table: Table): TableConversions
```

**Usage Example:**

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// tableEnv and dataStream are assumed to be defined elsewhere
val table = tableEnv.fromDataStream(dataStream)

// These methods are available via implicit conversion:
val rowStream = table.toDataStream // Returns DataStream[Row]
val changelogStream = table.toChangelogStream // Returns DataStream[Row]

// Type-safe conversion (assumes the table has columns `name` and `age`)
case class Person(name: String, age: Int)
val personStream = table.toDataStream(classOf[Person])
```

### Table to DataStream Direct Conversion

Implicit conversion from `Table` directly to a changelog `DataStream[Row]`, so a table can be used wherever a `DataStream[Row]` is expected.

```scala { .api }
/**
 * Conversions from Table to DataStream of changelog entries
 * Provides direct conversion to changelog DataStream for convenience
 * @param table The table to convert
 * @return DataStream[Row] containing changelog entries
 */
implicit def tableToChangelogDataStream(table: Table): DataStream[Row]
```

**Usage Example:**

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

val table = tableEnv.sqlQuery("SELECT name, COUNT(*) FROM events GROUP BY name")

// Direct implicit conversion to changelog stream
val changelogStream: DataStream[Row] = table

// Process the changelog stream
changelogStream.map { row =>
  s"${row.getKind}: ${row.getField(0)} -> ${row.getField(1)}"
}
```

**Important Note:** This conversion only works for tables that belong to a Scala `StreamTableEnvironment`. Applying it to a table from any other environment throws a `ValidationException`.
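
Where the implicit conversion feels too subtle, the same changelog stream can be requested explicitly from the environment. The following is a minimal sketch, assuming a Flink version that provides `StreamTableEnvironment.toChangelogStream`; the object name and sample data are illustrative only.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object ExplicitChangelogSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Tuple fields become the columns _1 and _2.
    val events = env.fromElements(("Alice", 1), ("Bob", 1), ("Alice", 1))

    // A grouped aggregation produces an updating table.
    val totals = events
      .toTable(tableEnv)
      .groupBy($"_1")
      .select($"_1", $"_2".sum as "total")

    // Explicit call on the environment instead of relying on the
    // implicit Table => DataStream[Row] conversion.
    val changelog: DataStream[Row] = tableEnv.toChangelogStream(totals)

    changelog.print()
    env.execute("Explicit changelog conversion")
  }
}
```

Calling the environment method keeps the conversion visible at the call site, at the cost of a slightly longer expression.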

### DataStream to DataStreamConversions

Implicit conversion from `DataStream[T]` to `DataStreamConversions[T]`, which adds Table conversion methods to a stream.

```scala { .api }
/**
 * Conversions from DataStream to Table
 * @param set The DataStream to provide conversion methods for
 * @return DataStreamConversions wrapper with conversion methods
 */
implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]
```

**Usage Example:**

```scala
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.{Row, RowKind}

val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))

// These methods are available via implicit conversion
// (`schema` is assumed to be a Schema defined elsewhere):
val table = dataStream.toTable(tableEnv)
val tableWithSchema = dataStream.toTable(tableEnv, schema)
dataStream.createTemporaryView(tableEnv, "my_view")

// For changelog streams: Row.ofKind sets the RowKind of each row, and the
// explicit row type tells Flink the field types instead of a generic Row.
val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25))
)(Types.ROW(Types.STRING, Types.INT))
val changelogTable = changelogStream.toChangelogTable(tableEnv)
```

## Complete Usage Examples

### Basic Conversion Example

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Create DataStream
val dataStream = env.fromElements(
  ("Alice", 25, "Engineer"),
  ("Bob", 30, "Manager"),
  ("Charlie", 35, "Developer")
)

// Convert to Table using implicit conversion
val table = dataStream.toTable(tableEnv)

// Apply table operations
val filteredTable = table.filter($"_2" > 28) // age > 28

// Convert back to DataStream using implicit conversion
val resultStream = filteredTable.toDataStream

// Or direct implicit conversion to changelog stream
val changelogResult: DataStream[Row] = filteredTable

resultStream.print("Results")
env.execute("Implicit Conversions Example")
```

### Advanced Schema Example

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

case class Employee(name: String, age: Int, department: String)

val dataStream = env.fromElements(
  Employee("Alice", 25, "Engineering"),
  Employee("Bob", 30, "Marketing"),
  Employee("Charlie", 35, "Engineering")
)

// Define custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .column("department", DataTypes.STRING())
  .build()

// Convert with schema using implicit conversion
val table = dataStream.toTable(tableEnv, schema)

// Create temporary view using implicit conversion
dataStream.createTemporaryView(tableEnv, "employees", schema)

// Query the view; select every Employee field so the result
// can be mapped back to the case class
val engineeringTable = tableEnv.sqlQuery("""
  SELECT name, age, department
  FROM employees
  WHERE department = 'Engineering' AND age > 30
""")

// Convert back to typed DataStream using implicit conversion
val engineeringStream = engineeringTable.toDataStream(classOf[Employee])

engineeringStream.print("Engineering Results")
env.execute("Schema Example")
```

### Changelog Processing Example

```scala
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.{Row, RowKind}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Create changelog stream. Row.ofKind sets the RowKind; every update or
// delete refers to a row that was inserted earlier. The explicit row type
// yields the columns f0 (name) and f1 (age).
val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.INSERT, "Bob", Integer.valueOf(30)),
  Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26)),
  Row.ofKind(RowKind.DELETE, "Bob", Integer.valueOf(30)),
  Row.ofKind(RowKind.INSERT, "Charlie", Integer.valueOf(35))
)(Types.ROW(Types.STRING, Types.INT))

// Convert to table using implicit conversion
val changelogTable = changelogStream.toChangelogTable(tableEnv)

// Apply aggregation (will produce more changelog events)
val aggregatedTable = changelogTable
  .select(($"f1" >= 30) as "age_group", $"f0") // age >= 30
  .groupBy($"age_group")
  .select($"age_group", $"f0".count as "count")

// Convert back to changelog stream using direct implicit conversion
val resultChangelogStream: DataStream[Row] = aggregatedTable

// Process changelog events
resultChangelogStream.map { row =>
  val kind = row.getKind
  val ageGroup = row.getField(0).asInstanceOf[Boolean]
  val count = row.getField(1).asInstanceOf[Long]

  val group = if (ageGroup) "30+" else "under 30"
  s"$kind: $group has $count people"
}.print("Changelog Results")

env.execute("Changelog Example")
```

## Error Handling

### ValidationException for Invalid Conversions

The direct `tableToChangelogDataStream` implicit conversion validates at runtime that the table belongs to a Scala `StreamTableEnvironment`:

```scala
try {
  // batchTableEnv is assumed to be a TableEnvironment that is not a
  // Scala StreamTableEnvironment
  val invalidTable = batchTableEnv.fromValues(1, 2, 3)
  val stream: DataStream[Row] = invalidTable // This will throw ValidationException
} catch {
  case e: ValidationException =>
    println(s"Cannot convert table: ${e.getMessage}")
}
```
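
The failure surfaces only at runtime because an implicit conversion type-checks for every value of its source type, and any check inside it runs when the conversion is applied. The plain-Scala sketch below models that behaviour; `DemoTable`, `DemoEnv`, `DemoValidationException`, and `CheckedBridge` are hypothetical stand-ins and do not reproduce Flink's actual implementation.

```scala
import scala.language.implicitConversions

// Hypothetical stand-ins used only to model the behaviour; NOT Flink classes.
sealed trait DemoEnv
case object StreamingEnv extends DemoEnv
case object OtherEnv extends DemoEnv

final case class DemoTable(env: DemoEnv, rows: List[String])
final class DemoValidationException(msg: String) extends RuntimeException(msg)

object CheckedBridge {
  // The compiler accepts this conversion for any DemoTable; the
  // environment check runs only when the conversion is applied.
  implicit def tableToRows(table: DemoTable): List[String] = table.env match {
    case StreamingEnv => table.rows
    case OtherEnv =>
      throw new DemoValidationException(
        "table does not belong to a streaming environment")
  }
}

object CheckedBridgeDemo {
  import CheckedBridge._

  // Returns the rows, or the error message if the check rejects the table.
  def convert(table: DemoTable): Either[String, List[String]] =
    try {
      val rows: List[String] = table // implicit conversion applied here
      Right(rows)
    } catch {
      case e: DemoValidationException => Left(e.getMessage)
    }
}
```

Both calls to `convert` compile, but only the table tagged with `StreamingEnv` yields a `Right`; the other yields a `Left` carrying the validation message.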

### Type Safety

The conversions keep Scala types on the DataStream side, but whether a table's schema fits a target class is checked at runtime, not by the compiler:

```scala
val dataStream: DataStream[(String, Int)] = env.fromElements(("Alice", 25))
// Tuple fields become the columns _1 and _2. They are renamed here on the
// assumption that columns are matched to the target class's fields by name.
val table = dataStream.toTable(tableEnv).as("name", "age")

// This will work:
case class Person(name: String, age: Int)
val personStream = table.toDataStream(classOf[Person])

// This will fail at runtime if types don't match:
case class WrongType(id: Int, value: String)
// val wrongStream = table.toDataStream(classOf[WrongType]) // Runtime error
```

## Best Practices

1. **Always import the package**: `import org.apache.flink.table.api.bridge.scala._`
2. **Use type-safe conversions**: Prefer strongly-typed conversions when possible
3. **Handle changelog appropriately**: Use changelog streams for operations that produce updates
4. **Schema compatibility**: Ensure target types match table schemas
5. **Environment consistency**: Keep tables and streams within the same environment type