# Environment and Setup

## Overview

The StreamTableEnvironment is the main entry point for integrating Flink's Table API with the Scala DataStream API. It provides factory methods to create properly configured environments for stream processing applications.

## Core API

### StreamTableEnvironment Factory Methods

```scala { .api }
object StreamTableEnvironment {
  def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
  def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}
```

## Basic Environment Creation

### Default Configuration

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
```

### Custom Environment Settings

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Build settings explicitly instead of relying on the defaults
val settings = EnvironmentSettings.newInstance()
  .inStreamingMode()
  .build()

val tableEnv = StreamTableEnvironment.create(env, settings)
```

## Environment Configuration

### Execution Environment Setup

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Configure parallelism
env.setParallelism(4)

// Configure checkpointing (interval in milliseconds)
env.enableCheckpointing(5000)

// Configure how often watermarks are emitted (interval in milliseconds)
env.getConfig.setAutoWatermarkInterval(1000)

val tableEnv = StreamTableEnvironment.create(env)
```

### Table Environment Configuration

```scala
import java.time.ZoneId

// Access table configuration
val tableConfig = tableEnv.getConfig

// Configure timezone
tableConfig.setLocalTimeZone(ZoneId.of("UTC"))

// Configure mini-batch execution
tableConfig.getConfiguration.setString("table.exec.mini-batch.enabled", "true")
tableConfig.getConfiguration.setString("table.exec.mini-batch.allow-latency", "1s")
// Mini-batch also needs a positive batch size; 5000 is only an illustrative value
tableConfig.getConfiguration.setString("table.exec.mini-batch.size", "5000")
```

## Advanced Configuration

### Custom Planner Settings

```scala
val settings = EnvironmentSettings.newInstance()
  .inStreamingMode()
  .withBuiltInCatalogName("default_catalog")
  .withBuiltInDatabaseName("default_database")
  .build()

val tableEnv = StreamTableEnvironment.create(env, settings)
```

### Resource Configuration

```scala
// Configure for production workloads
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(Runtime.getRuntime.availableProcessors())

// Enable object reuse to avoid copying records between chained operators.
// Only safe when user functions do not cache or mutate the records they receive.
val config = env.getConfig
config.enableObjectReuse()

val tableEnv = StreamTableEnvironment.create(env)
```

## Implementation Details

The StreamTableEnvironment is implemented by `StreamTableEnvironmentImpl`, which:

- Manages catalog and function registration
- Handles query optimization and execution planning
- Provides DataStream ↔ Table conversion functionality
- Integrates with Flink's checkpoint and recovery mechanisms
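
The DataStream ↔ Table conversion mentioned above can be sketched as a round trip. This is a minimal illustration, not part of the original reference: it assumes a Flink release that ships the Scala bridge with `toDataStream` (1.13 or later), and the object name `ConversionSketch`, the sample elements, and the job name are hypothetical.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Hypothetical entry point, used only for illustration
object ConversionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // A small in-memory stream; the element values are placeholders
    val words: DataStream[String] = env.fromElements("alpha", "beta", "gamma")

    // DataStream -> Table: the schema is derived from the stream's type information
    val table = tableEnv.fromDataStream(words)

    // Table -> DataStream: an insert-only table comes back as a stream of Row
    val rows: DataStream[Row] = tableEnv.toDataStream(table)

    rows.print()
    env.execute("conversion-sketch")
  }
}
```

Both conversions go through the same `StreamTableEnvironment` instance, so the resulting pipeline runs on the `StreamExecutionEnvironment` it was created from.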

## Error Handling

Environment creation can throw exceptions for invalid configurations:

```scala
import org.apache.flink.table.api.TableException

try {
  // tableEnv is only in scope inside this block
  val tableEnv = StreamTableEnvironment.create(env, settings)
} catch {
  case e: IllegalArgumentException => // Invalid settings
  case e: TableException => // Environment creation failure
}
```

## Best Practices

1. **Reuse Environments**: Create one StreamTableEnvironment per application
2. **Configure Early**: Set all configuration before registering tables or executing queries
3. **Resource Management**: Match parallelism to available cluster resources
4. **Checkpointing**: Enable checkpointing for fault tolerance in production
5. **Time Handling**: Configure appropriate watermark intervals for event-time processing
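
One way to apply these practices together is to do all configuration in a single place and return the one table environment the application will use. The sketch below is only an illustration built from the calls shown earlier; `TableEnvFactory`, `build`, and the parameter names are hypothetical and not part of Flink.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._

// Hypothetical helper: configures everything up front and creates the environment once
object TableEnvFactory {
  def build(
      parallelism: Int,
      checkpointIntervalMs: Long,
      watermarkIntervalMs: Long): StreamTableEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Resource management: parallelism should reflect the cluster's capacity
    env.setParallelism(parallelism)

    // Checkpointing for fault tolerance
    env.enableCheckpointing(checkpointIntervalMs)

    // Time handling: how often watermarks are emitted
    env.getConfig.setAutoWatermarkInterval(watermarkIntervalMs)

    val settings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()

    // All configuration is done before any table is registered
    StreamTableEnvironment.create(env, settings)
  }
}
```

Calling `TableEnvFactory.build(4, 5000L, 1000L)` once at startup reproduces the values used in the earlier examples.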