or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

built-in-connectors.mddatastream-conversions.mdindex.mdlegacy-connector-support.mdmodern-connector-framework.mdstream-table-environment.mdwatermark-strategies.md

stream-table-environment.mddocs/

0

# Stream Table Environment

1

2

The `StreamTableEnvironment` is the main entry point for creating and managing table environments that integrate with Flink's DataStream API. It provides factory methods for creating table environments and comprehensive methods for converting between DataStream and Table representations.

3

4

## Core Interface

5

6

```java { .api }

7

@PublicEvolving

8

public interface StreamTableEnvironment extends TableEnvironment {

9

// Factory methods

10

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);

11

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

12

13

// DataStream to Table conversions

14

<T> Table fromDataStream(DataStream<T> dataStream);

15

<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

16

Table fromChangelogStream(DataStream<Row> dataStream);

17

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

18

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

19

20

// Table to DataStream conversions

21

DataStream<Row> toDataStream(Table table);

22

<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

23

<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

24

DataStream<Row> toChangelogStream(Table table);

25

DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

26

DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

27

28

// Temporary view creation

29

<T> void createTemporaryView(String path, DataStream<T> dataStream);

30

<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

31

32

// Statement set creation

33

StreamStatementSet createStatementSet();

34

}

35

```

36

37

## Factory Methods

38

39

### Creating Basic Environment

40

41

Create a `StreamTableEnvironment` from an existing `StreamExecutionEnvironment`:

42

43

```java

44

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

45

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

46

```

47

48

### Creating with Custom Settings

49

50

Create a `StreamTableEnvironment` with specific configuration settings:

51

52

```java

53

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

54

EnvironmentSettings settings = EnvironmentSettings.newInstance()

55

.inStreamingMode()

56

.build();

57

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

58

```

59

60

## DataStream to Table Conversion

61

62

### Basic Conversion

63

64

Convert a `DataStream` to a `Table` using automatic schema inference:

65

66

```java

67

DataStream<MyPojo> dataStream = env.fromElements(

68

new MyPojo("Alice", 25),

69

new MyPojo("Bob", 30)

70

);

71

Table table = tableEnv.fromDataStream(dataStream);

72

```

73

74

### Conversion with Custom Schema

75

76

Convert a `DataStream` to a `Table` with a custom schema definition:

77

78

```java

79

DataStream<Row> dataStream = env.fromElements(

80

Row.of("Alice", 25),

81

Row.of("Bob", 30)

82

);

83

84

Schema schema = Schema.newBuilder()

85

.column("name", DataTypes.STRING())

86

.column("age", DataTypes.INT())

87

.build();

88

89

Table table = tableEnv.fromDataStream(dataStream, schema);

90

```

91

92

### Changelog Stream Conversion

93

94

Convert a changelog `DataStream` to a `Table`:

95

96

```java

97

DataStream<Row> changelogStream = // ... source of changelog data

98

Table table = tableEnv.fromChangelogStream(changelogStream);

99

100

// With custom schema

101

Schema schema = Schema.newBuilder()

102

.column("id", DataTypes.BIGINT())

103

.column("name", DataTypes.STRING())

104

.column("value", DataTypes.DOUBLE())

105

.build();

106

107

Table tableWithSchema = tableEnv.fromChangelogStream(changelogStream, schema);

108

109

// With changelog mode specification

110

ChangelogMode changelogMode = ChangelogMode.insertOnly();

111

Table tableWithMode = tableEnv.fromChangelogStream(changelogStream, schema, changelogMode);

112

```

113

114

## Table to DataStream Conversion

115

116

### Basic Conversion to Row

117

118

Convert a `Table` to a `DataStream<Row>`:

119

120

```java

121

Table table = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 21");

122

DataStream<Row> resultStream = tableEnv.toDataStream(table);

123

```

124

125

### Conversion to Specific Type

126

127

Convert a `Table` to a `DataStream` of a specific class:

128

129

```java

130

Table table = tableEnv.sqlQuery("SELECT name, age FROM users");

131

DataStream<MyPojo> pojoStream = tableEnv.toDataStream(table, MyPojo.class);

132

```

133

134

### Conversion with Data Type

135

136

Convert a `Table` to a `DataStream` using a specific data type:

137

138

```java

139

Table table = tableEnv.sqlQuery("SELECT name, age FROM users");

140

AbstractDataType<?> dataType = DataTypes.ROW(

141

DataTypes.FIELD("name", DataTypes.STRING()),

142

DataTypes.FIELD("age", DataTypes.INT())

143

);

144

DataStream<Row> typedStream = tableEnv.toDataStream(table, dataType);

145

```

146

147

### Changelog Stream Conversion

148

149

Convert a `Table` to a changelog `DataStream`:

150

151

```java

152

Table table = tableEnv.sqlQuery("SELECT id, name, COUNT(*) as cnt FROM events GROUP BY id, name");

153

DataStream<Row> changelogStream = tableEnv.toChangelogStream(table);

154

155

// With target schema

156

Schema targetSchema = Schema.newBuilder()

157

.column("id", DataTypes.BIGINT())

158

.column("name", DataTypes.STRING())

159

.column("cnt", DataTypes.BIGINT())

160

.build();

161

162

DataStream<Row> changelogWithSchema = tableEnv.toChangelogStream(table, targetSchema);

163

164

// With changelog mode

165

ChangelogMode mode = ChangelogMode.upsert();

166

DataStream<Row> upsertChangelog = tableEnv.toChangelogStream(table, targetSchema, mode);

167

```

168

169

## Temporary View Creation

170

171

### Basic View Creation

172

173

Create a temporary view from a `DataStream`:

174

175

```java

176

DataStream<MyPojo> dataStream = env.fromElements(

177

new MyPojo("Alice", 25),

178

new MyPojo("Bob", 30)

179

);

180

181

tableEnv.createTemporaryView("users", dataStream);

182

183

// Now you can query the view

184

Table result = tableEnv.sqlQuery("SELECT * FROM users WHERE age > 25");

185

```

186

187

### View Creation with Schema

188

189

Create a temporary view with a custom schema:

190

191

```java

192

DataStream<Row> dataStream = env.fromElements(

193

Row.of("Alice", 25),

194

Row.of("Bob", 30)

195

);

196

197

Schema schema = Schema.newBuilder()

198

.column("name", DataTypes.STRING())

199

.column("age", DataTypes.INT())

200

.build();

201

202

tableEnv.createTemporaryView("users_with_schema", dataStream, schema);

203

Table result = tableEnv.sqlQuery("SELECT name FROM users_with_schema WHERE age > 25");

204

```

205

206

## Statement Set Creation

207

208

Create a `StreamStatementSet` for batch operations:

209

210

```java

211

StreamStatementSet statementSet = tableEnv.createStatementSet();

212

213

// Add multiple insert operations

214

statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE condition1");

215

statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE condition2");

216

217

// Execute all statements together

218

statementSet.attachAsDataStream();

219

```

220

221

## Deprecated Methods

222

223

The following methods are deprecated and should be avoided in new code:

224

225

- `registerFunction()` - Use `createFunction()` instead

226

- `fromDataStream(DataStream, String)` - Use `fromDataStream(DataStream, Schema)` instead

227

- `registerDataStream()` - Use `createTemporaryView()` instead

228

- `toAppendStream()` - Use `toDataStream()` instead

229

- `toRetractStream()` - Use `toChangelogStream()` instead

230

- `execute()` - Use `StreamExecutionEnvironment.execute()` instead

231

232

## Error Handling

233

234

Common exceptions that may be thrown:

235

236

- `ValidationException` - When schema validation fails during conversion

237

- `TableException` - When table operations fail

238

- `IllegalArgumentException` - When invalid parameters are provided

239

240

Always ensure proper error handling when working with conversions:

241

242

```java

243

try {

244

Table table = tableEnv.fromDataStream(dataStream, schema);

245

DataStream<Row> result = tableEnv.toDataStream(table);

246

} catch (ValidationException e) {

247

// Handle schema validation errors

248

log.error("Schema validation failed: " + e.getMessage());

249

} catch (TableException e) {

250

// Handle table operation errors

251

log.error("Table operation failed: " + e.getMessage());

252

}

253

```