or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

datastream-conversions.md · implicit-conversions.md · index.md · statement-sets.md · stream-table-environment.md · table-conversions.md

docs/stream-table-environment.md

0

# Stream Table Environment

1

2

The StreamTableEnvironment is the central entry point for creating Table and SQL API programs that integrate with Flink's DataStream API in Scala. It provides unified processing for both bounded and unbounded data streams.

3

4

## Capabilities

5

6

### Environment Creation

7

8

Factory methods for creating StreamTableEnvironment instances with default or custom settings.

9

10

```scala { .api }

11

object StreamTableEnvironment {

12

/**

13

* Creates a StreamTableEnvironment with default settings

14

* @param executionEnvironment The StreamExecutionEnvironment to use

15

* @return A new StreamTableEnvironment instance

16

*/

17

def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment

18

19

/**

20

* Creates a StreamTableEnvironment with custom settings

21

* @param executionEnvironment The StreamExecutionEnvironment to use

22

* @param settings Custom environment settings

23

* @return A new StreamTableEnvironment instance

24

*/

25

def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment

26

}

27

```

28

29

**Usage Example:**

30

31

```scala

32

import org.apache.flink.streaming.api.scala._

33

import org.apache.flink.table.api._

34

import org.apache.flink.table.api.bridge.scala._

35

36

val env = StreamExecutionEnvironment.getExecutionEnvironment

37

val tableEnv = StreamTableEnvironment.create(env)

38

39

// With custom settings

40

val settings = EnvironmentSettings.newInstance()
  .inStreamingMode()
  .build()
val customTableEnv = StreamTableEnvironment.create(env, settings)

45

```

46

47

### DataStream to Table Conversion

48

49

Convert DataStreams to Tables with automatic schema derivation or custom schemas.

50

51

```scala { .api }

52

/**

53

* Convert DataStream to Table with auto-derived schema

54

* @param dataStream The DataStream to convert

55

* @return Table representation of the DataStream

56

*/

57

def fromDataStream[T](dataStream: DataStream[T]): Table

58

59

/**

60

* Convert DataStream to Table with custom schema

61

* @param dataStream The DataStream to convert

62

* @param schema Custom schema definition

63

* @return Table representation of the DataStream

64

*/

65

def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table

66

67

/**

68

* Convert changelog DataStream to Table

69

* @param dataStream Changelog DataStream with Row elements

70

* @return Table representation of the changelog stream

71

*/

72

def fromChangelogStream(dataStream: DataStream[Row]): Table

73

74

/**

75

* Convert changelog DataStream to Table with custom schema

76

* @param dataStream Changelog DataStream with Row elements

77

* @param schema Custom schema definition

78

* @return Table representation of the changelog stream

79

*/

80

def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table

81

82

/**

83

* Convert changelog DataStream to Table with custom schema and changelog mode

84

* @param dataStream Changelog DataStream with Row elements

85

* @param schema Custom schema definition

86

* @param changelogMode Changelog mode configuration

87

* @return Table representation of the changelog stream

88

*/

89

def fromChangelogStream(dataStream: DataStream[Row], schema: Schema, changelogMode: ChangelogMode): Table

90

```

91

92

**Usage Examples:**

93

94

```scala

95

// Auto-derived schema

96

val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))

97

val table = tableEnv.fromDataStream(dataStream)

98

99

// Custom schema

100

val schema = Schema.newBuilder()

101

.column("name", DataTypes.STRING())

102

.column("age", DataTypes.INT())

103

.build()

104

val tableWithSchema = tableEnv.fromDataStream(dataStream, schema)

105

106

// Changelog stream

107

val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26))
)

111

val changelogTable = tableEnv.fromChangelogStream(changelogStream)

112

```

113

114

### Table to DataStream Conversion

115

116

Convert Tables back to DataStreams with different output modes and type specifications.

117

118

```scala { .api }

119

/**

120

* Convert insert-only Table to DataStream of Row

121

* @param table The Table to convert

122

* @return DataStream containing Row elements

123

*/

124

def toDataStream(table: Table): DataStream[Row]

125

126

/**

127

* Convert insert-only Table to DataStream of specified class

128

* @param table The Table to convert

129

* @param targetClass Target class for the DataStream elements

130

* @return DataStream containing elements of the specified class

131

*/

132

def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]

133

134

/**

135

* Convert insert-only Table to DataStream of specified data type

136

* @param table The Table to convert

137

* @param targetDataType Target data type specification

138

* @return DataStream containing elements of the specified data type

139

*/

140

def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T]

141

142

/**

143

* Convert Table to changelog DataStream

144

* @param table The Table to convert

145

* @return DataStream containing Row elements with changelog information

146

*/

147

def toChangelogStream(table: Table): DataStream[Row]

148

149

/**

150

* Convert Table to changelog DataStream with custom schema

151

* @param table The Table to convert

152

* @param targetSchema Custom schema for the output stream

153

* @return DataStream containing Row elements with changelog information

154

*/

155

def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]

156

157

/**

158

* Convert Table to changelog DataStream with custom schema and changelog mode

159

* @param table The Table to convert

160

* @param targetSchema Custom schema for the output stream

161

* @param changelogMode Changelog mode configuration

162

* @return DataStream containing Row elements with changelog information

163

*/

164

def toChangelogStream(table: Table, targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row]

165

```

166

167

**Usage Examples:**

168

169

```scala

170

// Convert to Row DataStream

171

val rowStream = tableEnv.toDataStream(table)

172

173

// Convert to typed DataStream

174

case class Person(name: String, age: Int)

175

val personStream = tableEnv.toDataStream(table, classOf[Person])

176

177

// Convert to changelog stream

178

val changelogStream = tableEnv.toChangelogStream(table)

179

```

180

181

### View Creation

182

183

Create temporary views from DataStreams for use in SQL queries.

184

185

```scala { .api }

186

/**

187

* Create temporary view from DataStream

188

* @param path The view path/name

189

* @param dataStream The DataStream to create view from

190

*/

191

def createTemporaryView[T](path: String, dataStream: DataStream[T]): Unit

192

193

/**

194

* Create temporary view from DataStream with custom schema

195

* @param path The view path/name

196

* @param dataStream The DataStream to create view from

197

* @param schema Custom schema definition

198

*/

199

def createTemporaryView[T](path: String, dataStream: DataStream[T], schema: Schema): Unit

200

```

201

202

**Usage Example:**

203

204

```scala

205

val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))

206

tableEnv.createTemporaryView("users", dataStream)

207

208

// Now you can query the view with SQL

209

val result = tableEnv.sqlQuery("SELECT * FROM users WHERE age > 25")

210

```

211

212

### Statement Set Creation

213

214

Create statement sets for batching multiple table operations.

215

216

```scala { .api }

217

/**

218

* Create statement set for batch operations

219

* @return A new StreamStatementSet instance

220

*/

221

def createStatementSet(): StreamStatementSet

222

```

223

224

**Usage Example:**

225

226

```scala

227

val statementSet = tableEnv.createStatementSet()

228

statementSet

229

.addInsert("sink_table_1", table1)

230

.addInsert("sink_table_2", table2)

231

.attachAsDataStream()

232

```

233

234

### Legacy Deprecated Methods

235

236

These methods are deprecated and should not be used in new code:

237

238

```scala { .api }

239

// Deprecated - use fromDataStream with Schema instead

240

@deprecated("Use fromDataStream with Schema", "1.18.0")

241

def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table

242

243

// Deprecated - use createTemporaryView with Schema instead

244

@deprecated("Use createTemporaryView with Schema", "1.18.0")

245

def createTemporaryView[T](path: String, dataStream: DataStream[T], fields: Expression*): Unit

246

247

// Deprecated - use toDataStream instead

248

@deprecated("Use toDataStream", "1.18.0")

249

def toAppendStream[T: TypeInformation](table: Table): DataStream[T]

250

251

// Deprecated - use toChangelogStream instead

252

@deprecated("Use toChangelogStream", "1.18.0")

253

def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)]

254

```