or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

builtin-connectors.md changelog-processing.md datastream-connectors.md index.md procedures.md statement-sets.md stream-table-environment.md watermark-strategies.md

docs/stream-table-environment.md

0

# Stream Table Environment

1

2

The StreamTableEnvironment is the entry point and central context for creating Table and SQL API programs that integrate with the Java DataStream API. It provides unified processing for both bounded and unbounded data.

3

4

## Capabilities

5

6

### Environment Creation

7

8

Factory methods for creating StreamTableEnvironment instances with optional configuration.

9

10

```java { .api }

11

/**

12

* Creates a table environment with default settings

13

* @param executionEnvironment The StreamExecutionEnvironment for the TableEnvironment

14

* @return StreamTableEnvironment instance

15

*/

16

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);

17

18

/**

19

* Creates a table environment with custom settings

20

* @param executionEnvironment The StreamExecutionEnvironment for the TableEnvironment

21

* @param settings The EnvironmentSettings for configuration

22

* @return StreamTableEnvironment instance

23

*/

24

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

25

```

26

27

**Usage Examples:**

28

29

```java

30

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

31

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

32

import org.apache.flink.table.api.EnvironmentSettings;

33

34

// Basic environment creation

35

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

36

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

37

38

// Environment with custom settings

39

EnvironmentSettings settings = EnvironmentSettings.newInstance()

40

.inStreamingMode()

41

.build();

42

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

43

```

44

45

### DataStream to Table Conversion

46

47

Convert DataStreams into Tables with optional schema customization.

48

49

```java { .api }

50

/**

51

* Converts DataStream to Table with automatic schema derivation

52

* @param dataStream The DataStream to convert

53

* @return Table with derived schema

54

*/

55

<T> Table fromDataStream(DataStream<T> dataStream);

56

57

/**

58

* Converts DataStream to Table with custom schema

59

* @param dataStream The DataStream to convert

60

* @param schema Custom schema for the resulting table

61

* @return Table with specified schema

62

*/

63

<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

64

```

65

66

**Usage Examples:**

67

68

```java

69

import org.apache.flink.streaming.api.datastream.DataStream;

70

import org.apache.flink.table.api.Table;

71

import org.apache.flink.table.api.Schema;

72

import org.apache.flink.types.Row;

73

74

// Automatic schema derivation

75

DataStream<Row> dataStream = env.fromElements(

76

Row.of("Alice", 25),

77

Row.of("Bob", 30)

78

);

79

Table table = tableEnv.fromDataStream(dataStream);

80

81

// Custom schema with renamed columns

82

Schema schema = Schema.newBuilder()

83

.column("name", "STRING")

84

.column("age", "INT")

85

.build();

86

Table customTable = tableEnv.fromDataStream(dataStream, schema);

87

88

// Schema with computed columns and watermarks

89

Schema advancedSchema = Schema.newBuilder()

90

.column("f0", "STRING")

91

.column("f1", "INT")

92

.columnByExpression("upper_name", "UPPER(f0)")

93

.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")

94

.watermark("rowtime", "SOURCE_WATERMARK()")

95

.build();

96

Table advancedTable = tableEnv.fromDataStream(dataStream, advancedSchema);

97

```

98

99

### Table to DataStream Conversion

100

101

Convert Tables back to DataStreams with type safety and optional type specification.

102

103

```java { .api }

104

/**

105

* Converts Table to DataStream<Row> for insert-only tables

106

* @param table The Table to convert (must be insert-only)

107

* @return DataStream of Row objects

108

*/

109

DataStream<Row> toDataStream(Table table);

110

111

/**

112

* Converts Table to typed DataStream for insert-only tables

113

* @param table The Table to convert (must be insert-only)

114

* @param targetClass Target class for type conversion

115

* @return Typed DataStream

116

*/

117

<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

118

119

/**

120

* Converts Table to DataStream with specific data type

121

* @param table The Table to convert (must be insert-only)

122

* @param targetDataType Target data type specification

123

* @return Typed DataStream with specified data type

124

*/

125

<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

126

```

127

128

**Usage Examples:**

129

130

```java

131

import org.apache.flink.streaming.api.datastream.DataStream;

132

import org.apache.flink.table.api.Table;

133

import org.apache.flink.table.api.DataTypes;

134

import org.apache.flink.types.Row;

135

136

// Convert to Row DataStream

137

Table sourceTable = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 21");

138

DataStream<Row> resultStream = tableEnv.toDataStream(sourceTable);

139

140

// Convert to POJO DataStream

141

public static class User {

142

public String name;

143

public Integer age;

144

// constructors, getters, setters...

145

}

146

147

DataStream<User> userStream = tableEnv.toDataStream(sourceTable, User.class);

148

149

// Convert with specific data type

150

DataStream<User> typedUserStream = tableEnv.toDataStream(

151

sourceTable,

152

DataTypes.of(User.class)

153

);

154

```

155

156

### Temporary View Creation

157

158

Create temporary views from DataStreams for SQL querying.

159

160

```java { .api }

161

/**

162

* Creates temporary view from DataStream with automatic schema

163

* @param path View path (catalog.database.view or database.view or view)

164

* @param dataStream The DataStream to create view from

165

*/

166

<T> void createTemporaryView(String path, DataStream<T> dataStream);

167

168

/**

169

* Creates temporary view from DataStream with custom schema

170

* @param path View path (catalog.database.view or database.view or view)

171

* @param dataStream The DataStream to create view from

172

* @param schema Custom schema for the view

173

*/

174

<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

175

```

176

177

**Usage Examples:**

178

179

```java

180

// Create view with automatic schema

181

DataStream<Row> orderStream = env.fromElements(

182

Row.of("order1", "user1", 100.0),

183

Row.of("order2", "user2", 250.0)

184

);

185

tableEnv.createTemporaryView("orders", orderStream);

186

187

// Query the view with SQL

188

Table result = tableEnv.sqlQuery("SELECT * FROM orders WHERE f2 > 150");

189

190

// Create view with custom schema

191

Schema orderSchema = Schema.newBuilder()

192

.column("order_id", "STRING")

193

.column("user_id", "STRING")

194

.column("amount", "DOUBLE")

195

.build();

196

tableEnv.createTemporaryView("orders_named", orderStream, orderSchema);

197

198

// Query with named columns

199

Table namedResult = tableEnv.sqlQuery("SELECT user_id, amount FROM orders_named WHERE amount > 150");

200

```

201

202

### Statement Set Creation

203

204

Create a StreamStatementSet to group multiple statements so they are optimized and executed together as a single job.

205

206

```java { .api }

207

/**

208

* Creates StreamStatementSet for batch execution of multiple statements

209

* @return StreamStatementSet for adding multiple operations

210

*/

211

StreamStatementSet createStatementSet();

212

```

213

214

**Usage Examples:**

215

216

```java

217

import org.apache.flink.table.api.bridge.java.StreamStatementSet;

218

219

// Create statement set for batch execution

220

StreamStatementSet statementSet = tableEnv.createStatementSet();

221

222

// Add multiple operations

223

statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE type = 'A'");

224

statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE type = 'B'");

225

statementSet.addInsert("sink3", tableEnv.sqlQuery("SELECT COUNT(*) FROM source"));

226

227

// Execute all statements together

228

statementSet.execute();

229

```

230

231

## Types

232

233

### Schema Configuration

234

235

```java { .api }

236

import org.apache.flink.table.api.Schema;

237

import org.apache.flink.table.api.EnvironmentSettings;

238

import org.apache.flink.table.expressions.Expression;

239

```

240

241

### DataStream Integration

242

243

```java { .api }

244

import org.apache.flink.streaming.api.datastream.DataStream;

245

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

246

import org.apache.flink.api.common.typeinfo.TypeInformation;

247

import org.apache.flink.table.types.AbstractDataType;

248

import org.apache.flink.types.Row;

249

```

250

251

### Deprecated Methods

252

253

The following methods are deprecated; replace each one with the alternative named in the comment above it (the Schema-based overloads, `toDataStream`, or `toChangelogStream`):

254

255

```java { .api }

256

// Deprecated - use fromDataStream(DataStream<T>, Schema) instead

257

@Deprecated

258

<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);

259

260

// Deprecated - use createTemporaryView(String, DataStream<T>, Schema) instead

261

@Deprecated

262

<T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);

263

264

// Deprecated - use toDataStream(Table, Class<T>) instead

265

@Deprecated

266

<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

267

268

// Deprecated - use toDataStream(Table, Class<T>) instead

269

@Deprecated

270

<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);

271

272

// Deprecated - use toChangelogStream(Table, Schema) instead

273

@Deprecated

274

<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);

275

276

// Deprecated - use toChangelogStream(Table, Schema) instead

277

@Deprecated

278

<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);

279

```