# Session and Context Management

Configuration and context management for execution environments with property handling, dependency management, and session isolation. The context system provides the foundation for SQL execution environments with proper resource management and configuration inheritance.

## Capabilities

### DefaultContext Class

Default context configuration container providing base execution environment setup.

```java { .api }
public class DefaultContext {
    /**
     * Create default context with dependencies, configuration and command lines
     * @param dependencies List of dependency URLs for classpath
     * @param flinkConfig Flink configuration properties
     * @param commandLines Custom command line processors
     */
    public DefaultContext(List<URL> dependencies, Configuration flinkConfig, List<CustomCommandLine> commandLines);

    /**
     * Get underlying Flink configuration
     * @return Configuration instance with Flink properties
     */
    public Configuration getFlinkConfig();

    /**
     * Get dependency URLs for classpath construction
     * @return List of URLs for JARs and libraries
     */
    public List<URL> getDependencies();
}
```
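
**Usage Example:**

```java
// Package names below assume the Flink 1.14 SQL Client module layout
import java.net.URL;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.client.cli.CliOptions;
import org.apache.flink.table.client.cli.CliOptionsParser;
import org.apache.flink.table.client.gateway.context.DefaultContext;
import org.apache.flink.table.client.gateway.local.LocalContextUtils;
import org.apache.flink.table.client.gateway.local.LocalExecutor;

// Build default context from CLI options
CliOptions options = CliOptionsParser.parseEmbeddedModeClient(args);
DefaultContext context = LocalContextUtils.buildDefaultContext(options);

// Access configuration
Configuration flinkConfig = context.getFlinkConfig();
List<URL> dependencies = context.getDependencies();

// Use context with executor
LocalExecutor executor = new LocalExecutor(context);
```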

### SessionContext Class

Session-specific context and state management with isolated configuration and dependencies.

```java { .api }
public class SessionContext {
    /**
     * Get unique session identifier
     * @return Session ID string
     */
    public String getSessionId();

    /**
     * Get session configuration as mutable Map
     * @return Map of configuration key-value pairs
     */
    public Map<String, String> getConfigMap();

    /**
     * Get session configuration as ReadableConfig for type-safe access
     * @return ReadableConfig instance
     */
    public ReadableConfig getReadableConfig();
}
```

### Session Property Management

Manage session-level configuration properties with default value handling.

```java { .api }
/**
 * Set session property to specific value
 * @param key Configuration property key
 * @param value Configuration property value
 */
public void set(String key, String value);

/**
 * Reset all session properties to default values
 * Clears all session-specific overrides
 */
public void reset();

/**
 * Reset specific property to default value
 * @param key Property key to reset
 */
public void reset(String key);
```

**Usage Example:**

```java
SessionContext session = LocalContextUtils.buildSessionContext("my-session", defaultContext);

// Set session properties
session.set("table.exec.resource.default-parallelism", "4");
session.set("sql-client.execution.result-mode", "tableau");
session.set("table.exec.sink.not-null-enforcer", "error");

// Access configuration
ReadableConfig config = session.getReadableConfig();
int parallelism = config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);

// Reset specific property
session.reset("table.exec.resource.default-parallelism");

// Reset all session properties
session.reset();
```
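The set and reset semantics above can be pictured as a map of session overrides layered on top of fixed defaults. The following is a minimal, self-contained sketch of that idea only; `LayeredConfig` is a hypothetical class written for illustration and is not part of the SQL Client API, and the real `SessionContext` may do additional work when a property changes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration: session overrides layered over fixed defaults. */
class LayeredConfig {
    private final Map<String, String> defaults;
    private final Map<String, String> overrides = new HashMap<>();

    LayeredConfig(Map<String, String> defaults) {
        // Copy so later changes to the caller's map cannot leak in
        this.defaults = Collections.unmodifiableMap(new HashMap<>(defaults));
    }

    /** Record a session-level override for the key. */
    void set(String key, String value) {
        overrides.put(key, value);
    }

    /** Drop one override so the default (if any) becomes visible again. */
    void reset(String key) {
        overrides.remove(key);
    }

    /** Drop every session-level override. */
    void reset() {
        overrides.clear();
    }

    /** Effective value: the override if present, otherwise the default, otherwise null. */
    String get(String key) {
        String value = overrides.get(key);
        return value != null ? value : defaults.get(key);
    }
}
```

Keeping overrides in a separate map is what makes `reset(key)` cheap: nothing has to remember the previous value, because the default was never overwritten.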

### JAR and Dependency Management

Manage session-level JAR dependencies for custom functions, connectors, and formats.

```java { .api }
/**
 * Add JAR to session classpath
 * @param jarUrl URL or path to JAR file
 */
public void addJar(String jarUrl);

/**
 * Remove JAR from session classpath
 * @param jarUrl URL or path to JAR file to remove
 */
public void removeJar(String jarUrl);

/**
 * List all loaded JARs in session
 * @return List of JAR URLs/paths
 */
public List<String> listJars();
```

**Usage Example:**

```java
// Add custom connector
session.addJar("/path/to/flink-connector-kafka-1.14.6.jar");

// Add format JAR
session.addJar("file:///opt/flink/lib/flink-json-1.14.6.jar");

// Add from Maven repository URL
session.addJar("https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.14.6/flink-csv-1.14.6.jar");

// List loaded JARs
List<String> jars = session.listJars();
jars.forEach(jar -> System.out.println("Loaded: " + jar));

// Remove JAR when no longer needed
session.removeJar("/path/to/flink-connector-kafka-1.14.6.jar");
```
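Because `jarUrl` may be a plain path or a URL, a caller that wants to compare or deduplicate JAR locations may find it useful to normalize both forms to a `java.net.URL` first. The sketch below shows one way to do that with the JDK alone; `JarLocations` and `toUrl` are hypothetical names, and this is not necessarily how the SQL Client resolves JAR locations internally.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.file.Paths;
import java.util.regex.Pattern;

/** Hypothetical helper: normalize a JAR path or URL string to a URL. */
class JarLocations {
    // Matches strings that start with a URI scheme followed by "://", e.g. file:// or https://
    private static final Pattern HAS_SCHEME = Pattern.compile("^[A-Za-z][A-Za-z0-9+.-]*://.*");

    static URL toUrl(String location) {
        try {
            if (HAS_SCHEME.matcher(location).matches()) {
                // Already a URL: parse it as-is
                return URI.create(location).toURL();
            }
            // Plain path: resolve against the working directory, then convert to a file URL
            return Paths.get(location).toAbsolutePath().normalize().toUri().toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Not a usable JAR location: " + location, e);
        }
    }
}
```

With this helper, a relative path and its absolute `file://` form map to the same URL, so a set of `URL.toString()` values can be used to detect a JAR that was added twice under different spellings.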

### Session Lifecycle Management

Proper resource cleanup and session management.

```java { .api }
/**
 * Close session and release all associated resources
 * Cleans up classloader, temp files, and execution contexts
 */
public void close();
```

**Usage Example:**

```java
SessionContext session = LocalContextUtils.buildSessionContext("session-1", defaultContext);
try {
    // Use session for SQL operations
    session.set("key", "value");
    session.addJar("path/to/connector.jar");

    // Session operations...

} finally {
    // Always close session to prevent resource leaks
    session.close();
}
```
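The `try`/`finally` pattern above can also be expressed with try-with-resources by wrapping the close call in an `AutoCloseable`. The following sketch assumes nothing about `SessionContext` beyond its `close()` method; `ManagedSession` is a hypothetical wrapper introduced here for illustration.

```java
/** Hypothetical wrapper: runs a close action exactly once, usable in try-with-resources. */
class ManagedSession implements AutoCloseable {
    private final String sessionId;
    private final Runnable closeAction;
    private boolean closed;

    ManagedSession(String sessionId, Runnable closeAction) {
        this.sessionId = sessionId;
        this.closeAction = closeAction;
    }

    String sessionId() {
        return sessionId;
    }

    @Override
    public void close() {
        // Guard against double close so cleanup is not attempted twice
        if (!closed) {
            closed = true;
            closeAction.run();
        }
    }
}
```

A caller could then write `try (ManagedSession managed = new ManagedSession("session-1", session::close)) { ... }` and rely on the compiler-generated cleanup instead of a hand-written `finally` block.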

### ExecutionContext Class

Execution environment context providing Table API integration.

```java { .api }
public class ExecutionContext {
    /**
     * Create execution context from session context
     * @param sessionContext Session-specific context
     * @param defaultContext Default context for base configuration
     */
    public static ExecutionContext create(SessionContext sessionContext, DefaultContext defaultContext);

    /**
     * Get TableEnvironment for SQL execution
     * @return TableEnvironment instance configured for this context
     */
    public TableEnvironment getTableEnvironment();

    /**
     * Get session identifier
     * @return Session ID
     */
    public String getSessionId();
}
```

## LocalContextUtils Class

Utility class providing factory methods for context creation and management.

```java { .api }
public class LocalContextUtils {
    /**
     * Build default context from CLI options
     * @param options Parsed command line options
     * @return DefaultContext configured with options
     */
    public static DefaultContext buildDefaultContext(CliOptions options);

    /**
     * Build session context with specified ID and default context
     * @param sessionId Desired session identifier (null for auto-generation)
     * @param defaultContext Base context for configuration inheritance
     * @return SessionContext ready for use
     */
    public static SessionContext buildSessionContext(String sessionId, DefaultContext defaultContext);
}
```

**Usage Example:**

```java
// Create contexts
CliOptions options = CliOptionsParser.parseEmbeddedModeClient(args);
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
SessionContext sessionContext = LocalContextUtils.buildSessionContext("my-session", defaultContext);

// Use contexts
LocalExecutor executor = new LocalExecutor(defaultContext);
executor.start();
String actualSessionId = executor.openSession(sessionContext.getSessionId());
```

## Configuration Integration

### Flink Configuration Properties

The context system integrates with Flink's configuration system:

```java
// Common configuration keys
session.set("table.exec.resource.default-parallelism", "8");
session.set("table.optimizer.join-reorder-enabled", "true");
session.set("pipeline.name", "My SQL Job");
session.set("execution.checkpointing.interval", "30s");
session.set("state.backend", "rocksdb");
```

### SQL Client Specific Options

Options under the `sql-client.` prefix apply only to the SQL Client:

```java
// Result display options
session.set("sql-client.execution.result-mode", "tableau");
session.set("sql-client.execution.max-table-result.rows", "10000");
session.set("sql-client.display.max-column-width", "50");

// Execution options
session.set("sql-client.verbose", "true");
```

### Environment Variable Integration

Context creation reads the following environment variables:

- `FLINK_CONF_DIR`: Configuration directory path
- `FLINK_LIB_DIR`: Library directory for dependencies
- `HADOOP_CONF_DIR`: Hadoop configuration for HDFS access
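When debugging context creation, it can help to check which of these variables are actually set to a non-blank value. The sketch below is a small JDK-only lookup that treats blank values as unset; `EnvDirs` and `lookup` are hypothetical names, and the precedence rules Flink itself applies to these variables are not modeled here.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: read a directory variable, treating blank values as unset. */
class EnvDirs {
    static Optional<String> lookup(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            return Optional.empty();
        }
        // Trim so stray whitespace from shell exports does not end up in the path
        return Optional.of(value.trim());
    }
}
```

Passing the environment in as a map keeps the helper testable; in an application the call would be `EnvDirs.lookup(System.getenv(), "FLINK_CONF_DIR")`.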

## Session Isolation

Each session is isolated from other sessions in the following respects:

- **Configuration**: Independent property overrides
- **Classpath**: Session-specific JAR loading
- **State**: Separate execution contexts
- **Resources**: Isolated cleanup on session close

**Example Multi-Session Usage:**

```java
// Assumes `options` is a parsed CliOptions instance, and that `analyticsQuery`
// and `etlQuery` are Operation instances prepared beforehand.
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
LocalExecutor executor = new LocalExecutor(defaultContext);
executor.start();

// Create multiple isolated sessions
String session1 = executor.openSession("analytics-session");
String session2 = executor.openSession("etl-session");

try {
    // Configure session 1 for analytics
    executor.setSessionProperty(session1, "table.exec.resource.default-parallelism", "16");
    executor.addJar(session1, "analytics-connector.jar");

    // Configure session 2 for ETL
    executor.setSessionProperty(session2, "table.exec.resource.default-parallelism", "4");
    executor.addJar(session2, "etl-connector.jar");

    // Sessions operate independently
    executor.executeOperation(session1, analyticsQuery);
    executor.executeOperation(session2, etlQuery);

} finally {
    executor.closeSession(session1);
    executor.closeSession(session2);
}
```
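The isolation shown in this example amounts to keeping a separate property map per session ID and refusing to open the same ID twice. A minimal sketch of that bookkeeping follows; `SessionRegistry` is a hypothetical class for illustration and does not reflect how `LocalExecutor` is implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical illustration: one independent property map per session ID. */
class SessionRegistry {
    private final Map<String, Map<String, String>> sessions = new HashMap<>();

    /** Open a session; a null ID requests a generated one. Duplicate IDs are rejected. */
    String open(String requestedId) {
        String id = requestedId != null ? requestedId : UUID.randomUUID().toString();
        if (sessions.containsKey(id)) {
            throw new IllegalStateException("Session already exists: " + id);
        }
        sessions.put(id, new HashMap<>());
        return id;
    }

    void setProperty(String id, String key, String value) {
        require(id).put(key, value);
    }

    String getProperty(String id, String key) {
        return require(id).get(key);
    }

    /** Closing discards only this session's state; other sessions are untouched. */
    void close(String id) {
        sessions.remove(id);
    }

    private Map<String, String> require(String id) {
        Map<String, String> props = sessions.get(id);
        if (props == null) {
            throw new IllegalArgumentException("Unknown session: " + id);
        }
        return props;
    }
}
```

Because each session owns its own map, setting the parallelism to `16` in one session and `4` in another cannot interfere, which is the same property the executor example relies on.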

## Error Handling

Context and session operations can fail under the following conditions:

- **Invalid JAR paths**: File not found or access denied
- **Configuration errors**: Invalid property keys or values
- **Resource conflicts**: Duplicate session IDs
- **Cleanup failures**: Resource release issues

**Example Error Handling:**

```java
try {
    session.addJar("/invalid/path/connector.jar");
} catch (Exception e) {
    System.err.println("Failed to load JAR: " + e.getMessage());
    // Handle JAR loading error
}

try {
    session.set("invalid.config.key", "value");
} catch (IllegalArgumentException e) {
    System.err.println("Invalid configuration: " + e.getMessage());
    // Handle configuration error
}
```

The context and session management system provides a robust foundation for SQL execution with proper resource management, configuration isolation, and dependency handling.