# Environment and Setup

## Overview

The `StreamTableEnvironment` (package `org.apache.flink.table.api.bridge.scala`) is the main entry point for integrating Flink's Table API with the Scala DataStream API. Its companion object provides `create` factory methods that wrap an existing `StreamExecutionEnvironment`, optionally customized with `EnvironmentSettings`.

## Core API

### StreamTableEnvironment Factory Methods

```scala { .api }
object StreamTableEnvironment {
  def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment

  def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}
```
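A minimal end-to-end program makes the one-argument factory concrete. The sketch below assumes Flink 1.11 or later with the Scala DataStream API, the Table API Scala bridge, and a planner on the classpath. The object name `FactoryExample` and the inline `VALUES` query are illustrative only. The program creates the environment, runs a small bounded SQL query, and collects the rows back on the client.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

import scala.collection.mutable.ArrayBuffer

// Hypothetical example object; not part of Flink.
object FactoryExample {

  /** Runs a tiny bounded query and returns its single column as Scala Ints. */
  def run(): List[Int] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // keep the local example small

    // One-argument factory: default EnvironmentSettings
    val tableEnv = StreamTableEnvironment.create(env)

    // Inline VALUES avoids any external connector
    val result = tableEnv.sqlQuery(
      "SELECT x * 10 AS scaled FROM (VALUES (1), (2), (3)) AS t(x)")

    // execute() submits a job; collect() streams the result rows to the client
    val it = result.execute().collect()
    try {
      val buffer = ArrayBuffer[Int]()
      while (it.hasNext) {
        buffer += it.next().getField(0).asInstanceOf[java.lang.Integer].intValue
      }
      buffer.toList
    } finally it.close()
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", "))
}
```

The iterator is closed in a `finally` block because it holds a connection to the running job.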
## Basic Environment Creation

### Default Configuration

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
```
### Custom Environment Settings

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Explicitly request streaming execution mode
val settings = EnvironmentSettings.newInstance()
  .inStreamingMode()
  .build()

val tableEnv = StreamTableEnvironment.create(env, settings)
```
## Environment Configuration

### Execution Environment Setup

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Default parallelism for operators that do not set their own
env.setParallelism(4)

// Take a checkpoint every 5000 ms
env.enableCheckpointing(5000)

// Emit periodic watermarks every 1000 ms (relevant for event-time processing)
env.getConfig.setAutoWatermarkInterval(1000)

val tableEnv = StreamTableEnvironment.create(env)
```
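Because these are plain setters on the `StreamExecutionEnvironment`, one way to confirm they took effect before the table environment is created is to read them back through the matching getters. A sketch, assuming the same Flink dependencies as above; `ExecutionSetupCheck` and `configuredEnv` are hypothetical names.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

// Hypothetical example object; not part of Flink.
object ExecutionSetupCheck {

  /** Applies the execution settings shown above and returns the environment. */
  def configuredEnv(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)
    env.enableCheckpointing(5000)                // interval in milliseconds
    env.getConfig.setAutoWatermarkInterval(1000) // interval in milliseconds
    env
  }

  def main(args: Array[String]): Unit = {
    val env = configuredEnv()
    // Read the values back before handing the environment to the Table API
    println(s"parallelism         = ${env.getParallelism}")
    println(s"checkpoint interval = ${env.getCheckpointConfig.getCheckpointInterval} ms")
    println(s"watermark interval  = ${env.getConfig.getAutoWatermarkInterval} ms")

    // Table programs built on this tableEnv run on the configured env
    val tableEnv = StreamTableEnvironment.create(env)
  }
}
```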
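### Table Environment Configuration

```scala
import java.time.ZoneId

// Access the table configuration of an existing tableEnv
val tableConfig = tableEnv.getConfig

// Session time zone, used by time functions and local-time-zone timestamp types
tableConfig.setLocalTimeZone(ZoneId.of("UTC"))

// Enable mini-batch aggregation; once enabled, Flink requires a positive
// allow-latency and a positive size (1000 rows is an example value)
tableConfig.getConfiguration.setString("table.exec.mini-batch.enabled", "true")
tableConfig.getConfiguration.setString("table.exec.mini-batch.allow-latency", "1s")
tableConfig.getConfiguration.setString("table.exec.mini-batch.size", "1000")
```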
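The mini-batch keys can also be set through the typed `ConfigOption` constants in `org.apache.flink.table.api.config.ExecutionConfigOptions`, which turns a misspelled key into a compile error and fixes the value type. Note that Flink expects `table.exec.mini-batch.size` to be positive once mini-batch is enabled. A sketch under the same dependency assumptions; `TypedTableConfig` and `configured` are hypothetical names, and 1000 is an arbitrary example batch size.

```scala
import java.time.{Duration, ZoneId}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.config.ExecutionConfigOptions

// Hypothetical example object; not part of Flink.
object TypedTableConfig {

  /** Creates a table environment and applies mini-batch settings via typed options. */
  def configured(): TableConfig = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    val tableConfig = tableEnv.getConfig
    tableConfig.setLocalTimeZone(ZoneId.of("UTC"))

    val conf = tableConfig.getConfiguration
    conf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, java.lang.Boolean.TRUE)
    conf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
    conf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, java.lang.Long.valueOf(1000L))
    tableConfig
  }

  def main(args: Array[String]): Unit = {
    val conf = configured().getConfiguration
    // get() returns the stored value, or the option's default if it was never set
    println(conf.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE))
  }
}
```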
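## Advanced Configuration

### Custom Planner Settings

Besides the execution mode, `EnvironmentSettings` sets the names of the built-in catalog and database that the environment starts in. The values below are Flink's defaults; change them to register tables under different names.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val settings = EnvironmentSettings.newInstance()
  .inStreamingMode()
  // Name of the built-in in-memory catalog and of its default database
  .withBuiltInCatalogName("default_catalog")
  .withBuiltInDatabaseName("default_database")
  .build()

val tableEnv = StreamTableEnvironment.create(env, settings)
```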
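With non-default names, the built-in catalog and database become the current ones, so unqualified table names resolve inside them. A sketch; `CatalogNamesExample`, `analytics`, and `staging` are made-up names used only for illustration.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._

// Hypothetical example object; "analytics" and "staging" are illustrative names.
object CatalogNamesExample {

  def create(): StreamTableEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .withBuiltInCatalogName("analytics")
      .withBuiltInDatabaseName("staging")
      .build()
    StreamTableEnvironment.create(env, settings)
  }

  def main(args: Array[String]): Unit = {
    val tableEnv = create()
    // The built-in catalog and database are selected as current on startup
    println(s"${tableEnv.getCurrentCatalog}.${tableEnv.getCurrentDatabase}")
  }
}
```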
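### Resource Configuration

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Uses the core count of the machine running this code; on a cluster, prefer
// sizing parallelism to the available task slots (this call overrides `flink run -p`)
env.setParallelism(Runtime.getRuntime.availableProcessors())

// Object reuse skips defensive copies between chained operators, which lowers
// allocation and GC pressure; it is only safe if user functions neither keep
// references to input records nor modify them
val config = env.getConfig
config.enableObjectReuse()

val tableEnv = StreamTableEnvironment.create(env)
```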
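`Runtime.getRuntime.availableProcessors()` reflects only the machine that runs the client code, not the cluster. One option is to take the parallelism from a program argument and use the core count only as a fallback for local runs. A sketch; the `--parallelism` flag and the `resolveParallelism` helper are hypothetical, not a Flink convention.

```scala
import scala.util.Try

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

// Hypothetical helper; the "--parallelism" flag is an assumption of this sketch.
object ParallelismFromArgs {

  /** Value following "--parallelism" if it parses to a positive Int, else the fallback. */
  def resolveParallelism(args: Array[String], fallback: Int): Int = {
    val idx = args.indexOf("--parallelism")
    val parsed =
      if (idx >= 0 && idx + 1 < args.length) Try(args(idx + 1).toInt).toOption
      else None
    parsed.filter(_ > 0).getOrElse(fallback)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The local core count is only the fallback, e.g. for runs inside an IDE
    env.setParallelism(resolveParallelism(args, Runtime.getRuntime.availableProcessors()))
    val tableEnv = StreamTableEnvironment.create(env)
    println(s"parallelism = ${env.getParallelism}")
  }
}
```

Flink gives an environment-level `setParallelism` precedence over the client-level `-p` option, so passing the value in explicitly keeps the effective setting visible in the program itself.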
## Implementation Details

The `StreamTableEnvironment` is implemented by `StreamTableEnvironmentImpl`, which:

- Manages catalog and function registration
- Handles query optimization and execution planning
- Provides DataStream ↔ Table conversion functionality
- Translates table programs into transformations on the wrapped `StreamExecutionEnvironment`, so they run with its checkpointing and recovery configuration
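The DataStream ↔ Table conversion can be sketched as a round trip. This assumes Flink 1.13 or later (for `toDataStream` and `getResolvedSchema`) and uses the `fromDataStream(stream, $"word", $"cnt")` overload to name the tuple fields by position; newer releases also offer a `Schema`-based variant. The object name, data, view name, and column names are illustrative.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Hypothetical example object; data and names are illustrative.
object ConversionRoundTrip {

  /** DataStream -> Table: a tuple stream whose fields are renamed by position. */
  def wordCounts(env: StreamExecutionEnvironment, tableEnv: StreamTableEnvironment): Table = {
    val words: DataStream[(String, Int)] = env.fromElements(("flink", 1), ("scala", 2))
    tableEnv.fromDataStream(words, $"word", $"cnt")
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    tableEnv.createTemporaryView("word_counts", wordCounts(env, tableEnv))
    val doubled = tableEnv.sqlQuery("SELECT word, cnt * 2 AS doubled FROM word_counts")

    // Table -> DataStream: an insert-only result becomes a DataStream[Row]
    val out: DataStream[Row] = tableEnv.toDataStream(doubled)
    out.print()

    // Both directions share one StreamExecutionEnvironment, so one job runs them
    env.execute("conversion-round-trip")
  }
}
```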
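## Error Handling

Environment creation can fail with an exception if the settings are invalid:

```scala
import org.apache.flink.table.api.TableException

val tableEnv: StreamTableEnvironment =
  try {
    StreamTableEnvironment.create(env, settings)
  } catch {
    case e: IllegalArgumentException =>
      // Invalid settings
      Console.err.println(s"Invalid environment settings: ${e.getMessage}")
      throw e
    case e: TableException =>
      // Environment creation failure
      Console.err.println(s"Could not create the table environment: ${e.getMessage}")
      throw e
  }
```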
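When the caller would rather branch on failure than let the exception propagate, wrapping creation in `scala.util.Try` is one option. `Try` captures only non-fatal exceptions, so errors such as `OutOfMemoryError` still propagate. A sketch, assuming the bridge module and a planner are on the classpath; `SafeCreate` and `tryCreate` are hypothetical names.

```scala
import scala.util.{Failure, Success, Try}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._

// Hypothetical helper object; not part of Flink.
object SafeCreate {

  /** Wraps environment creation so non-fatal failures surface as a Failure value. */
  def tryCreate(env: StreamExecutionEnvironment,
                settings: EnvironmentSettings): Try[StreamTableEnvironment] =
    Try(StreamTableEnvironment.create(env, settings))

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()

    tryCreate(env, settings) match {
      case Success(tableEnv) => println(s"created; current catalog = ${tableEnv.getCurrentCatalog}")
      case Failure(e)        => Console.err.println(s"creation failed: $e")
    }
  }
}
```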
## Best Practices

1. **Reuse Environments**: Create one StreamTableEnvironment per application
2. **Configure Early**: Set all configuration before registering tables or executing queries
3. **Resource Management**: Match parallelism to available cluster resources
4. **Checkpointing**: Enable checkpointing for fault tolerance in production
5. **Time Handling**: Configure appropriate watermark intervals for event-time processing
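The practices above can be combined into a single factory that configures everything before any table is registered. A sketch; `TableEnvFactory` and `createTableEnv` are hypothetical names, and the concrete values simply mirror the earlier examples in this section rather than being Flink recommendations.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._

// Hypothetical factory object; values mirror the examples in this section.
object TableEnvFactory {

  /** Builds one fully configured StreamTableEnvironment for the whole application. */
  def createTableEnv(parallelism: Int): StreamTableEnvironment = {
    require(parallelism > 0, "parallelism must be positive")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(parallelism)              // match available resources
    env.enableCheckpointing(5000)                // fault tolerance: checkpoint every 5 s
    env.getConfig.setAutoWatermarkInterval(1000) // event time: watermarks every 1 s

    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Table options are set before any table is registered or query is run
    val conf = tableEnv.getConfig.getConfiguration
    conf.setString("table.exec.mini-batch.enabled", "true")
    conf.setString("table.exec.mini-batch.allow-latency", "1s")
    conf.setString("table.exec.mini-batch.size", "1000")
    tableEnv
  }
}
```

Returning the environment from one factory makes it natural to create it once at startup and pass it to the rest of the application.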