# DataStream Conversions

The `DataStreamConversions` class provides utilities for converting DataStreams to Tables with various schema and conversion options. It is a thin wrapper around a DataStream instance that exposes table-conversion methods in a fluent style.

## Capabilities

### DataStreamConversions Construction

Wrapper class for DataStream conversion operations.

```scala { .api }
/**
 * Creates conversion utilities for a DataStream.
 * @param dataStream The DataStream to provide conversion methods for
 */
class DataStreamConversions[T](dataStream: DataStream[T])
```

This class is typically accessed through implicit conversions from the package object:

```scala
import org.apache.flink.table.api.bridge.scala._

val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))
// dataStream now has conversion methods available via implicit conversion
val table = dataStream.toTable(tableEnv)
```

### Table Conversion Methods

Convert DataStreams to Tables with automatic or custom schema derivation.

```scala { .api }
/**
 * Convert to a Table with an auto-derived schema.
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @return Table representation of the DataStream
 */
def toTable(tableEnv: StreamTableEnvironment): Table

/**
 * Convert to a Table with a custom schema.
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @param schema Custom schema definition
 * @return Table representation of the DataStream
 */
def toTable(tableEnv: StreamTableEnvironment, schema: Schema): Table
```

**Usage Examples:**

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Auto-derived schema
val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))
val table = dataStream.toTable(tableEnv)

// Custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val tableWithSchema = dataStream.toTable(tableEnv, schema)
```

### Changelog Table Conversion

Convert changelog DataStreams to Tables for handling streams with insert/update/delete operations.

```scala { .api }
/**
 * Convert a changelog DataStream to a Table.
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @return Table representation of the changelog stream
 */
def toChangelogTable(tableEnv: StreamTableEnvironment): Table

/**
 * Convert a changelog DataStream to a Table with a custom schema.
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @param schema Custom schema definition
 * @return Table representation of the changelog stream
 */
def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema): Table

/**
 * Convert a changelog DataStream to a Table with a custom schema and changelog mode.
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @param schema Custom schema definition
 * @param changelogMode Changelog mode configuration
 * @return Table representation of the changelog stream
 */
def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema, changelogMode: ChangelogMode): Table
```

**Usage Examples:**

```scala
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

// Changelog stream with Row elements. Row.ofKind attaches the RowKind
// to each row; Row.of would store it as an ordinary field value.
val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26)),
  Row.ofKind(RowKind.DELETE, "Bob", Integer.valueOf(30))
)

// Convert to changelog table
val changelogTable = changelogStream.toChangelogTable(tableEnv)

// With custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val changelogTableWithSchema = changelogStream.toChangelogTable(tableEnv, schema)

// With custom changelog mode (insert-only mode expects a stream of INSERT rows)
val changelogMode = ChangelogMode.insertOnly()
val changelogTableWithMode = changelogStream.toChangelogTable(tableEnv, schema, changelogMode)
```
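`ChangelogMode` determines which row kinds the conversion accepts. As a sketch of the predefined factory methods Flink's `ChangelogMode` class provides:

```scala
import org.apache.flink.table.connector.ChangelogMode

// Only INSERT rows are allowed
val insertOnly = ChangelogMode.insertOnly()

// INSERT, UPDATE_AFTER, and DELETE rows; upsert semantics require a primary key
val upsert = ChangelogMode.upsert()

// All RowKinds, including UPDATE_BEFORE
val all = ChangelogMode.all()
```

Pick the most restrictive mode that matches the stream; it gives the planner more optimization opportunities.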

### Temporary View Creation

Create temporary views from DataStreams for use in SQL queries.

```scala { .api }
/**
 * Create a temporary view from the DataStream.
 * @param tableEnv The StreamTableEnvironment to use for view creation
 * @param path The view path/name
 */
def createTemporaryView(tableEnv: StreamTableEnvironment, path: String): Unit

/**
 * Create a temporary view from the DataStream with a custom schema.
 * @param tableEnv The StreamTableEnvironment to use for view creation
 * @param path The view path/name
 * @param schema Custom schema definition
 */
def createTemporaryView(tableEnv: StreamTableEnvironment, path: String, schema: Schema): Unit
```

**Usage Examples:**

```scala
val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))

// Create temporary view
dataStream.createTemporaryView(tableEnv, "users")

// Create temporary view with custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
dataStream.createTemporaryView(tableEnv, "users_with_schema", schema)

// The views can now be queried with SQL
val result = tableEnv.sqlQuery("SELECT * FROM users WHERE age > 25")
```

### Legacy Methods

Deprecated methods that should not be used in new code:

```scala { .api }
/**
 * Convert with field expressions (legacy).
 * @deprecated Use toTable with Schema instead
 */
@deprecated("Use toTable with Schema", "1.18.0")
def toTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table
```

The legacy method allows specifying field expressions directly, but the new approach using Schema is preferred:

```scala
// Legacy approach (deprecated)
val legacyTable = dataStream.toTable(tableEnv, $"name", $"age")

// Preferred approach
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val preferredTable = dataStream.toTable(tableEnv, schema)
```

## Type Requirements

For DataStreamConversions to work properly, the DataStream element type `T` must be one of:

- **Scala case classes** with public fields
- **Scala tuples** (up to Tuple22)
- **Row types** for changelog streams
- **POJOs** with public fields and a default constructor
- **Basic types** (String, Int, Long, etc.)

The type information is automatically derived using Flink's TypeInformation system for Scala types.
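As an illustration of the case-class requirement, field names become the column names of the derived schema. A minimal sketch (the `User` class and field names here are hypothetical, not part of the API):

```scala
// Case class fields become columns in the auto-derived schema
case class User(name: String, age: Int)

val users = env.fromElements(User("Alice", 25), User("Bob", 30))
val userTable = users.toTable(tableEnv) // columns: name, age
```

With plain Scala tuples, by contrast, the derived columns are named `_1`, `_2`, and so on, which is one reason to prefer case classes or an explicit `Schema` for anything queried by name.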