# Environment Management

Centralized management of Spark and Hive execution contexts with optimized configurations. Provides singleton access to shared contexts used across the thrift server.

## Capabilities

### Context Variables

Shared context instances accessible throughout the thrift server components.

```scala { .api }
/**
 * Shared HiveContext instance used for SQL execution
 * Provides access to Hive metastore and Spark SQL functionality
 */
var hiveContext: HiveContext

/**
 * Underlying SparkContext instance
 * Manages Spark cluster connection and job execution
 */
var sparkContext: SparkContext
```

### Environment Initialization

Initialize Spark and Hive contexts with optimized default configurations.

```scala { .api }
/**
 * Initialize Spark and Hive contexts with default configurations
 * Creates SparkContext with optimized settings and wraps it in HiveContext
 * Should be called once before using other thrift server components
 */
def init(): Unit
```

**Default Configuration Applied:**

```scala
val sparkConf = new SparkConf(loadDefaults = true)
sparkConf
  .setAppName(maybeAppName.getOrElse(s"SparkSQL::${Utils.localHostName()}"))
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.referenceTracking", "false")

sparkContext = new SparkContext(sparkConf)
sparkContext.addSparkListener(new StatsReportListener())
hiveContext = new HiveContext(sparkContext)
```

**Hive Integration Setup:**

```scala
// Configure output streams for the Hive metastore client
hiveContext.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
hiveContext.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
hiveContext.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))

// Record the Hive version used for execution
hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
```

**Usage Example:**

```scala
import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv

// Initialize the environment (call once at startup)
SparkSQLEnv.init()

// Access shared contexts
val spark = SparkSQLEnv.sparkContext
val hive = SparkSQLEnv.hiveContext

// Execute SQL
val df = hive.sql("SELECT * FROM my_table")
df.show()
```

### Environment Cleanup

Clean shutdown of the Spark SQL environment with proper resource cleanup.

```scala { .api }
/**
 * Clean shutdown of Spark SQL environment
 * Stops SparkContext and releases all associated resources
 * Should be called during application shutdown
 */
def stop(): Unit
```

**Cleanup Process:**

```scala
logDebug("Shutting down Spark SQL Environment")
if (SparkSQLEnv.sparkContext != null) {
  sparkContext.stop()  // Stop Spark cluster connection
  sparkContext = null  // Clear reference
  hiveContext = null   // Clear reference
}
```

**Usage with Shutdown Hooks:**

```scala
import org.apache.spark.util.ShutdownHookManager

// Register shutdown hook for clean environment cleanup
ShutdownHookManager.addShutdownHook { () =>
  SparkSQLEnv.stop()
  uiTab.foreach(_.detach())  // uiTab: the thrift server's optional web UI tab
}
```
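
Outside a Spark application, the same register-cleanup-at-exit pattern can be sketched with the Scala standard library's `sys.addShutdownHook`, a simplified stand-in for Spark's `ShutdownHookManager`. The `cleanup` function here is a hypothetical placeholder for `SparkSQLEnv.stop()`:

```scala
// Minimal sketch: register a JVM shutdown hook using only the Scala standard library.
// `cleanup()` stands in for SparkSQLEnv.stop(); it is an illustrative placeholder.
object ShutdownHookSketch {
  @volatile var cleaned = false

  def cleanup(): Unit = { cleaned = true }

  // sys.addShutdownHook returns a ShutdownHookThread that can be unregistered later
  val hook: sys.ShutdownHookThread = sys.addShutdownHook(cleanup())
}
```

The returned `ShutdownHookThread` supports `remove()`, which is useful in tests or when tearing the environment down early by other means.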

### Configuration Customization

The environment respects existing Spark configuration while applying sensible defaults:

**Application Name Resolution:**

```scala
val maybeAppName = sparkConf
  .getOption("spark.app.name")
  .filterNot(_ == classOf[SparkSQLCLIDriver].getName)

sparkConf.setAppName(maybeAppName.getOrElse(s"SparkSQL::${Utils.localHostName()}"))
```

**Serializer Optimization:**

```scala
val maybeSerializer = sparkConf.getOption("spark.serializer")
sparkConf.set("spark.serializer",
  maybeSerializer.getOrElse("org.apache.spark.serializer.KryoSerializer"))
```

**Kryo Reference Tracking:**

```scala
val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking")
sparkConf.set("spark.kryo.referenceTracking",
  maybeKryoReferenceTracking.getOrElse("false"))
```
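
The three snippets above share one pattern: keep the user's value when present, otherwise apply a default. A minimal, Spark-free sketch of that pattern, using a plain immutable `Map` as a hypothetical stand-in for `SparkConf`:

```scala
// Sketch of the "respect existing setting, else apply default" pattern.
// An immutable Map stands in for SparkConf (assumption: no Spark on the classpath).
object ConfDefaults {
  def withDefault(conf: Map[String, String], key: String, default: String): Map[String, String] =
    conf + (key -> conf.getOrElse(key, default))
}
```

A user-supplied value survives (`withDefault(Map("k" -> "user"), "k", "dflt")` keeps `"user"`), while a missing key picks up the default, mirroring how `init()` treats `spark.serializer` and `spark.kryo.referenceTracking`.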

### Monitoring Integration

The environment automatically adds performance monitoring:

```scala
sparkContext.addSparkListener(new StatsReportListener())
```

This enables:

- Job execution statistics
- Stage completion metrics
- Task-level performance data
- Integration with Spark UI

### Hive Configuration Debugging

When debug logging is enabled, the environment logs all Hive configuration properties:

```scala
import scala.collection.JavaConverters._

if (log.isDebugEnabled) {
  hiveContext.hiveconf.getAllProperties.asScala.toSeq.sorted.foreach { case (k, v) =>
    logDebug(s"HiveConf var: $k=$v")
  }
}
```

This helps troubleshoot configuration issues and verify proper setup.
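
The dump relies on converting a `java.util.Properties` into a sorted Scala sequence; that conversion works without Spark and can be sketched in isolation (`PropsDump` is an illustrative name, not part of the API):

```scala
import java.util.Properties
import scala.collection.JavaConverters._

object PropsDump {
  // Convert Properties to a sorted Seq of key/value pairs,
  // the same shape the debug dump above iterates over.
  def sortedEntries(props: Properties): Seq[(String, String)] =
    props.asScala.toSeq.sorted
}
```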

### Thread Safety

The `SparkSQLEnv` object guards initialization with a null check, so repeated calls to `init()` are no-ops:

```scala
def init() {
  if (hiveContext == null) {
    // Initialization code runs only when no context exists yet;
    // subsequent calls return immediately
  }
}
```

Note that this guard is a plain null check with no synchronization: it makes `init()` idempotent rather than fully thread-safe. Call it from a single startup thread, or add external synchronization, to avoid racing concurrent first-time initializations.
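
When concurrent first calls are possible, callers can wrap the guard in their own lock. A minimal, Spark-free sketch of a synchronized init-once holder (names are illustrative, not part of `SparkSQLEnv`):

```scala
// Sketch: thread-safe, idempotent init-once guard around a shared resource.
object InitOnce {
  @volatile private var initialized = false
  private val lock = new Object
  var initCount = 0  // exposed for illustration: counts how many times the init body ran

  def init(): Unit = {
    if (!initialized) {        // fast path: already initialized, return immediately
      lock.synchronized {
        if (!initialized) {    // double-checked under the lock
          initCount += 1       // stand-in for building SparkContext / HiveContext
          initialized = true
        }
      }
    }
  }
}
```

Even with many threads calling `init()` simultaneously, the double-checked guard ensures the initialization body runs exactly once.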

### Error Scenarios

**Context Already Stopped:**

If the SparkContext is stopped externally, the thrift server detects this condition:

```scala
if (SparkSQLEnv.sparkContext.stopped.get()) {
  logError("SparkContext has stopped even if HiveServer2 has started, so exit")
  System.exit(-1)
}
```

**Initialization Failures:**

Initialization errors are propagated to calling code:

```scala
try {
  SparkSQLEnv.init()
} catch {
  case e: Exception =>
    logError("Failed to initialize SparkSQL environment", e)
    throw e
}
```

### Integration Points

The environment is used by several thrift server components:

- **HiveThriftServer2**: Uses contexts for server initialization
- **SparkSQLCLIDriver**: Accesses contexts for SQL execution
- **SparkSQLSessionManager**: Creates isolated sessions from the base context
- **SparkSQLDriver**: Executes queries against the shared context

### Best Practices

1. **Single Initialization**: Call `init()` once at application startup
2. **Shutdown Hooks**: Always register cleanup with `ShutdownHookManager`
3. **Configuration**: Set Spark configuration before calling `init()`
4. **Error Handling**: Wrap `init()` in try-catch for proper error handling
5. **Context Access**: Use the shared contexts rather than creating new ones