# Table Conversions

The TableConversions class provides utilities for converting Tables back to DataStreams with support for different output modes, type specifications, and changelog handling. It serves as a fluent wrapper around Table instances.

## Capabilities

### TableConversions Construction

Wrapper class for Table conversion operations.

```scala { .api }
/**
 * Creates conversion utilities for a Table
 * @param table The Table to provide conversion methods for
 */
class TableConversions(table: Table)
```

This class is typically accessed through implicit conversions from the package object:

```scala
import org.apache.flink.table.api.bridge.scala._

val table = tableEnv.fromDataStream(dataStream)
// table now has conversion methods available via implicit conversion
val resultStream = table.toDataStream
```

### DataStream Conversion Methods

Convert Tables to DataStreams for insert-only operations with different type specifications.

```scala { .api }
/**
 * Convert insert-only Table to DataStream of Row
 * @return DataStream containing Row elements
 */
def toDataStream: DataStream[Row]

/**
 * Convert insert-only Table to DataStream of specified class
 * @param targetClass Target class for the DataStream elements
 * @return DataStream containing elements of the specified class
 */
def toDataStream[T](targetClass: Class[T]): DataStream[T]

/**
 * Convert insert-only Table to DataStream of specified data type
 * @param targetDataType Target data type specification
 * @return DataStream containing elements of the specified data type
 */
def toDataStream[T](targetDataType: AbstractDataType[_]): DataStream[T]
```

**Usage Examples:**

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Create a table
val table = tableEnv.fromDataStream(env.fromElements(("Alice", 25), ("Bob", 30)))

// Convert to Row DataStream
val rowStream = table.toDataStream

// Convert to typed DataStream using case class
case class Person(name: String, age: Int)
val personStream = table.toDataStream(classOf[Person])

// Convert using data type specification
val typedStream = table.toDataStream[Person](DataTypes.STRUCTURED(classOf[Person]))
```

### Changelog Stream Conversion

Convert Tables to changelog DataStreams for handling streams with insert/update/delete operations.

```scala { .api }
/**
 * Convert Table to changelog DataStream
 * @return DataStream containing Row elements with changelog information
 */
def toChangelogStream: DataStream[Row]

/**
 * Convert Table to changelog DataStream with custom schema
 * @param targetSchema Custom schema for the output stream
 * @return DataStream containing Row elements with changelog information
 */
def toChangelogStream(targetSchema: Schema): DataStream[Row]

/**
 * Convert Table to changelog DataStream with custom schema and changelog mode
 * @param targetSchema Custom schema for the output stream
 * @param changelogMode Changelog mode configuration
 * @return DataStream containing Row elements with changelog information
 */
def toChangelogStream(targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row]
```

104

105

**Usage Examples:**

106

107

```scala
import org.apache.flink.types.{Row, RowKind}

// Convert to basic changelog stream
val changelogStream = table.toChangelogStream

// Process changelog stream
changelogStream.map { row =>
  val kind = row.getKind
  val name = row.getField(0).toString
  val age = row.getField(1).asInstanceOf[Int]

  kind match {
    case RowKind.INSERT => s"Added: $name, $age"
    case RowKind.UPDATE_AFTER => s"Updated: $name, $age"
    case RowKind.DELETE => s"Deleted: $name, $age"
    case _ => s"Other: $name, $age"
  }
}

// Convert with custom schema
val targetSchema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val changelogStreamWithSchema = table.toChangelogStream(targetSchema)

// Convert with custom changelog mode
val changelogMode = ChangelogMode.insertOnly()
val changelogStreamWithMode = table.toChangelogStream(targetSchema, changelogMode)
```
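
`ChangelogMode` also provides factory methods beyond `insertOnly()`, such as `upsert()` and `all()`. The sketch below shows how an upsert stream could be requested; it assumes the table is keyed by `name`, and the schema details are illustrative rather than prescribed by this API:

```scala
// Sketch: upsert mode requires a primary key in the target schema
// (column names and types here are assumptions for the example)
val upsertSchema = Schema.newBuilder()
  .column("name", DataTypes.STRING().notNull())
  .column("age", DataTypes.INT())
  .primaryKey("name")
  .build()

// upsert(): emits INSERT, UPDATE_AFTER, and DELETE rows, but no UPDATE_BEFORE
val upsertStream = table.toChangelogStream(upsertSchema, ChangelogMode.upsert())

// all(): every RowKind may appear; the most general mode
val fullStream = table.toChangelogStream(upsertSchema, ChangelogMode.all())
```

Upsert mode lets the runtime drop `UPDATE_BEFORE` rows because the primary key identifies which previous row an update replaces.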

### Legacy Methods

Deprecated methods that should not be used in new code:

```scala { .api }
/**
 * Convert to append-only stream (deprecated)
 * @deprecated Use toDataStream instead
 */
@deprecated("Use toDataStream", "1.18.0")
def toAppendStream[T: TypeInformation]: DataStream[T]

/**
 * Convert to retract stream (deprecated)
 * @deprecated Use toChangelogStream instead
 */
@deprecated("Use toChangelogStream", "1.18.0")
def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)]
```

**Migration Examples:**

```scala
// Legacy approach (deprecated)
val appendStream = table.toAppendStream[Person]
val retractStream = table.toRetractStream[Person]

// Preferred approach
val insertOnlyStream = table.toDataStream(classOf[Person])
val changelogStream = table.toChangelogStream

// Process changelog stream to get retract-style tuples if needed
val retractStyleStream = changelogStream.map { row =>
  val isInsert = row.getKind == RowKind.INSERT || row.getKind == RowKind.UPDATE_AFTER
  val person = Person(row.getField(0).toString, row.getField(1).asInstanceOf[Int])
  (isInsert, person)
}
```

## Insert-Only vs Changelog Streams

### Insert-Only Streams

Use `toDataStream` methods when:
- Your table only contains INSERT operations
- You're working with bounded data or append-only streams
- You don't need to handle updates or deletes

```scala
// Good for append-only scenarios
val insertOnlyTable = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 18")
val resultStream = insertOnlyTable.toDataStream(classOf[Person])
```
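
The inverse case is worth keeping in mind: if a query produces updates, `toDataStream` is rejected at runtime and `toChangelogStream` must be used instead. A minimal sketch; the `counts` and `ok` names are illustrative, and the exact exception raised depends on the Flink version:

```scala
// An aggregation produces an updating table, not an insert-only one
val counts = tableEnv.sqlQuery("SELECT name, COUNT(*) FROM events GROUP BY name")

// counts.toDataStream             // would fail at runtime: table is not insert-only
val ok = counts.toChangelogStream  // handles updates correctly
```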

### Changelog Streams

Use `toChangelogStream` methods when:
- Your table may contain UPDATE or DELETE operations
- You're working with aggregations or joins that produce updates
- You need to handle the full lifecycle of data changes

```scala
// Good for aggregation scenarios
// ("count" is a reserved word in Flink SQL, so the alias is escaped with backticks)
val aggregatedTable = tableEnv.sqlQuery("SELECT name, COUNT(*) AS `count` FROM events GROUP BY name")
val changelogStream = aggregatedTable.toChangelogStream

changelogStream.map { row =>
  row.getKind match {
    case RowKind.INSERT => s"New group: ${row.getField(0)}"
    case RowKind.UPDATE_AFTER => s"Updated count for: ${row.getField(0)}"
    case _ => s"Other change for: ${row.getField(0)}"
  }
}
```

## Type Conversion Requirements

For successful Table to DataStream conversion:

- **Target types** must be supported by Flink's type system
- **Schema compatibility** between table and target type is required
- **Changelog mode compatibility** must match the table's characteristics
- **Null handling** should be considered for nullable fields

Common target types include:
- **Row**: Universal type that can represent any table schema
- **Case classes**: Type-safe conversion for structured data
- **Tuples**: Simple conversion for tuple-like data
- **POJOs**: Java bean-style classes with getters/setters
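
To make the options above concrete, the sketch below converts the same two-column table (`name STRING, age INT`) to two of these target shapes; the `Person` case class and variable names are assumptions for the example, not part of the API:

```scala
// Row: the universal fallback; fields accessed by position or name
val rows: DataStream[Row] = table.toDataStream

// Case class: column names must line up with the constructor parameters
case class Person(name: String, age: Int)
val people: DataStream[Person] = table.toDataStream(classOf[Person])

// Explicit data type: useful when reflective extraction alone is not enough
val structured: DataStream[Person] =
  table.toDataStream[Person](DataTypes.STRUCTURED(classOf[Person]))
```

Row is the safe default when the schema is not known at compile time; the typed variants trade that flexibility for compile-time field access.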