or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

built-in-connectors.md connector-framework.md datastream-integration.md index.md procedure-context.md statement-set.md table-environment.md

docs/table-environment.md

0

# Table Environment

1

2

The `StreamTableEnvironment` is the central entry point for integrating the Table/SQL API with the DataStream API. It provides factory methods, configuration options, and lifecycle management for table-stream operations.

3

4

## Capabilities

5

6

### Environment Creation

7

8

Creates a table environment that is bound to an existing `StreamExecutionEnvironment`.

9

10

```java { .api }

11

/**

12

* Creates a table environment with default settings

13

* @param executionEnvironment The StreamExecutionEnvironment to integrate with

14

* @return StreamTableEnvironment instance

15

*/

16

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);

17

18

/**

19

* Creates a table environment with custom settings

20

* @param executionEnvironment The StreamExecutionEnvironment to integrate with

21

* @param settings Custom environment settings for configuration

22

* @return StreamTableEnvironment instance

23

*/

24

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

25

```

26

27

**Usage Examples:**

28

29

```java

30

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

31

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

32

import org.apache.flink.table.api.EnvironmentSettings;

33

34

// Basic environment creation

35

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

36

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

37

38

// Environment with custom settings

39

EnvironmentSettings settings = EnvironmentSettings.newInstance()

40

.inStreamingMode()

41

.useBlinkPlanner()

42

.build();

43

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

44

```

45

46

### View Management

47

48

Creates temporary views from DataStream sources that can be referenced in SQL queries.

49

50

```java { .api }

51

/**

52

* Creates a temporary view from DataStream with automatic schema derivation

53

* @param path The catalog path for the view

54

* @param dataStream The DataStream to create view from

55

*/

56

<T> void createTemporaryView(String path, DataStream<T> dataStream);

57

58

/**

59

* Creates a temporary view from DataStream with custom schema

60

* @param path The catalog path for the view

61

* @param dataStream The DataStream to create view from

62

* @param schema Custom schema definition for the view

63

*/

64

<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

65

```

66

67

**Usage Examples:**

68

69

```java

70

import org.apache.flink.streaming.api.datastream.DataStream;

71

import org.apache.flink.table.api.Schema;

72

import org.apache.flink.table.api.DataTypes;

73

import org.apache.flink.types.Row;

74

75

// Create view with automatic schema

76

DataStream<Row> orderStream = // ... your data stream

77

tableEnv.createTemporaryView("orders", orderStream);

78

79

// Query the view

80

Table result = tableEnv.sqlQuery("SELECT * FROM orders WHERE amount > 100");

81

82

// Create view with custom schema

83

Schema customSchema = Schema.newBuilder()

84

.column("order_id", DataTypes.BIGINT())

85

.column("customer_name", DataTypes.STRING())

86

.column("amount", DataTypes.DECIMAL(10, 2))

87

.column("order_time", DataTypes.TIMESTAMP_LTZ(3))

88

.build();

89

90

tableEnv.createTemporaryView("custom_orders", orderStream, customSchema);

91

```

92

93

### Function Registration (Deprecated)

94

95

Legacy methods for registering user-defined functions. These are deprecated in favor of `createTemporarySystemFunction`.

96

97

```java { .api }

98

/**

99

* @deprecated Use createTemporarySystemFunction instead

100

*/

101

@Deprecated

102

<T> void registerFunction(String name, TableFunction<T> tableFunction);

103

104

/**

105

* @deprecated Use createTemporarySystemFunction instead

106

*/

107

@Deprecated

108

<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);

109

110

/**

111

* @deprecated Use createTemporarySystemFunction instead

112

*/

113

@Deprecated

114

<T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction);

115

```

116

117

### Statement Set Creation

118

119

Creates a `StreamStatementSet` that groups multiple operations so they are executed together ("batch" here means grouping statements, not Flink's batch runtime mode).

120

121

```java { .api }

122

/**

123

* Creates a statement set for batch execution of table operations

124

* @return StreamStatementSet for adding multiple operations

125

*/

126

StreamStatementSet createStatementSet();

127

```

128

129

**Usage Examples:**

130

131

```java

132

import org.apache.flink.table.api.bridge.java.StreamStatementSet;

133

134

// Create statement set for batch operations

135

StreamStatementSet statementSet = tableEnv.createStatementSet();

136

137

// Add multiple operations

138

statementSet.addInsert("sink_table1", table1);

139

statementSet.addInsert("sink_table2", table2);

140

statementSet.addInsertSql("INSERT INTO sink_table3 SELECT * FROM source_table");

141

142

// Execute all operations together

143

statementSet.execute();

144

```

145

146

## Type Definitions

147

148

### Environment Settings

149

150

```java { .api }

151

import org.apache.flink.table.api.EnvironmentSettings;

152

153

// Environment settings builder

154

EnvironmentSettings settings = EnvironmentSettings.newInstance()

155

.inStreamingMode() // or .inBatchMode()

156

.useBlinkPlanner() // Default planner

157

.build();

158

```

159

160

### Catalog Paths

161

162

View paths follow Flink's three-part naming convention, `catalog.database.object`; leading parts that are omitted fall back to the defaults:

163

164

- **Simple name**: `"my_view"` (uses default catalog and database)

165

- **Database qualified**: `"my_db.my_view"` (uses default catalog)

166

- **Fully qualified**: `"my_catalog.my_db.my_view"`

167

168

### Schema Definition

169

170

```java { .api }

171

import org.apache.flink.table.api.Schema;

172

import org.apache.flink.table.api.DataTypes;

173

174

// Schema building patterns

175

Schema schema = Schema.newBuilder()

176

.column("physical_col", DataTypes.STRING())

177

.columnByExpression("computed_col", "UPPER(physical_col)")

178

.columnByMetadata("metadata_col", DataTypes.TIMESTAMP_LTZ(3))

179

.watermark("event_time", "event_time - INTERVAL '5' SECOND")

180

.primaryKey("id")

181

.build();

182

```