or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

command-line-interface.mdconfiguration-options.mdindex.mdresult-handling-display.mdsession-context-management.mdsql-client-application.mdsql-execution-gateway.md

session-context-management.mddocs/

0

# Session and Context Management

1

2

Configuration and context management for execution environments with property handling, dependency management, and session isolation. The context system provides the foundation for SQL execution environments with proper resource management and configuration inheritance.

3

4

## Capabilities

5

6

### DefaultContext Class

7

8

Default context configuration container providing base execution environment setup.

9

10

```java { .api }

11

public class DefaultContext {

12

/**

13

* Create default context with dependencies, configuration and command lines

14

* @param dependencies List of dependency URLs for classpath

15

* @param flinkConfig Flink configuration properties

16

* @param commandLines Custom command line processors

17

*/

18

public DefaultContext(List<URL> dependencies, Configuration flinkConfig, List<CustomCommandLine> commandLines);

19

20

/**

21

* Get underlying Flink configuration

22

* @return Configuration instance with Flink properties

23

*/

24

public Configuration getFlinkConfig();

25

26

/**

27

* Get dependency URLs for classpath construction

28

* @return List of URLs for JARs and libraries

29

*/

30

public List<URL> getDependencies();

31

}

32

```

33

34

**Usage Example:**

35

36

```java

37

import org.apache.flink.table.client.gateway.local.LocalContextUtils;

38

39

// Build default context from CLI options

40

CliOptions options = CliOptionsParser.parseEmbeddedModeClient(args);

41

DefaultContext context = LocalContextUtils.buildDefaultContext(options);

42

43

// Access configuration

44

Configuration flinkConfig = context.getFlinkConfig();

45

List<URL> dependencies = context.getDependencies();

46

47

// Use context with executor

48

LocalExecutor executor = new LocalExecutor(context);

49

```

50

51

### SessionContext Class

52

53

Session-specific context and state management with isolated configuration and dependencies.

54

55

```java { .api }

56

public class SessionContext {

57

/**

58

* Get unique session identifier

59

* @return Session ID string

60

*/

61

public String getSessionId();

62

63

/**

64

* Get session configuration as mutable Map

65

* @return Map of configuration key-value pairs

66

*/

67

public Map<String, String> getConfigMap();

68

69

/**

70

* Get session configuration as ReadableConfig for type-safe access

71

* @return ReadableConfig instance

72

*/

73

public ReadableConfig getReadableConfig();

74

}

75

```

76

77

### Session Property Management

78

79

Manage session-level configuration properties with default value handling.

80

81

```java { .api }

82

/**

83

* Set session property to specific value

84

* @param key Configuration property key

85

* @param value Configuration property value

86

*/

87

public void set(String key, String value);

88

89

/**

90

* Reset all session properties to default values

91

* Clears all session-specific overrides

92

*/

93

public void reset();

94

95

/**

96

* Reset specific property to default value

97

* @param key Property key to reset

98

*/

99

public void reset(String key);

100

```

101

102

**Usage Example:**

103

104

```java

105

SessionContext session = LocalContextUtils.buildSessionContext("my-session", defaultContext);

106

107

// Set session properties

108

session.set("table.exec.resource.default-parallelism", "4");

109

session.set("sql-client.execution.result-mode", "tableau");

110

session.set("table.exec.sink.not-null-enforcer", "error");

111

112

// Access configuration

113

ReadableConfig config = session.getReadableConfig();

114

int parallelism = config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);

115

116

// Reset specific property

117

session.reset("table.exec.resource.default-parallelism");

118

119

// Reset all session properties

120

session.reset();

121

```

122

123

### JAR and Dependency Management

124

125

Manage session-level JAR dependencies for custom functions, connectors, and formats.

126

127

```java { .api }

128

/**

129

* Add JAR to session classpath

130

* @param jarUrl URL or path to JAR file

131

*/

132

public void addJar(String jarUrl);

133

134

/**

135

* Remove JAR from session classpath

136

* @param jarUrl URL or path to JAR file to remove

137

*/

138

public void removeJar(String jarUrl);

139

140

/**

141

* List all loaded JARs in session

142

* @return List of JAR URLs/paths

143

*/

144

public List<String> listJars();

145

```

146

147

**Usage Example:**

148

149

```java

150

// Add custom connector

151

session.addJar("/path/to/flink-connector-kafka-1.14.6.jar");

152

153

// Add format JAR

154

session.addJar("file:///opt/flink/lib/flink-json-1.14.6.jar");

155

156

// Add from Maven repository URL

157

session.addJar("https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.14.6/flink-csv-1.14.6.jar");

158

159

// List loaded JARs

160

List<String> jars = session.listJars();

161

jars.forEach(jar -> System.out.println("Loaded: " + jar));

162

163

// Remove JAR when no longer needed

164

session.removeJar("/path/to/flink-connector-kafka-1.14.6.jar");

165

```

166

167

### Session Lifecycle Management

168

169

Proper resource cleanup and session management.

170

171

```java { .api }

172

/**

173

* Close session and release all associated resources

174

* Cleans up classloader, temp files, and execution contexts

175

*/

176

public void close();

177

```

178

179

**Usage Example:**

180

181

```java

182

SessionContext session = LocalContextUtils.buildSessionContext("session-1", defaultContext);

183

try {

184

// Use session for SQL operations

185

session.set("key", "value");

186

session.addJar("path/to/connector.jar");

187

188

// Session operations...

189

190

} finally {

191

// Always close session to prevent resource leaks

192

session.close();

193

}

194

```

195

196

### ExecutionContext Class

197

198

Execution environment context providing Table API integration.

199

200

```java { .api }

201

public class ExecutionContext {

202

/**

203

* Create execution context from session context

204

* @param sessionContext Session-specific context

205

* @param defaultContext Default context for base configuration

206

*/

207

public static ExecutionContext create(SessionContext sessionContext, DefaultContext defaultContext);

208

209

/**

210

* Get TableEnvironment for SQL execution

211

* @return TableEnvironment instance configured for this context

212

*/

213

public TableEnvironment getTableEnvironment();

214

215

/**

216

* Get session identifier

217

* @return Session ID

218

*/

219

public String getSessionId();

220

}

221

```

222

223

## LocalContextUtils Class

224

225

Utility class providing factory methods for context creation and management.

226

227

```java { .api }

228

public class LocalContextUtils {

229

/**

230

* Build default context from CLI options

231

* @param options Parsed command line options

232

* @return DefaultContext configured with options

233

*/

234

public static DefaultContext buildDefaultContext(CliOptions options);

235

236

/**

237

* Build session context with specified ID and default context

238

* @param sessionId Desired session identifier (null for auto-generation)

239

* @param defaultContext Base context for configuration inheritance

240

* @return SessionContext ready for use

241

*/

242

public static SessionContext buildSessionContext(String sessionId, DefaultContext defaultContext);

243

}

244

```

245

246

**Usage Example:**

247

248

```java

249

// Create contexts

250

CliOptions options = CliOptionsParser.parseEmbeddedModeClient(args);

251

DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);

252

SessionContext sessionContext = LocalContextUtils.buildSessionContext("my-session", defaultContext);

253

254

// Use contexts

255

LocalExecutor executor = new LocalExecutor(defaultContext);

256

executor.start();

257

String actualSessionId = executor.openSession(sessionContext.getSessionId());

258

```

259

260

## Configuration Integration

261

262

### Flink Configuration Properties

263

264

The context system integrates with Flink's configuration system:

265

266

```java

267

// Common configuration keys

268

session.set("table.exec.resource.default-parallelism", "8");

269

session.set("table.optimizer.join-reorder-enabled", "true");

270

session.set("pipeline.name", "My SQL Job");

271

session.set("execution.checkpointing.interval", "30s");

272

session.set("state.backend", "rocksdb");

273

```

274

275

### SQL Client Specific Options

276

277

SQL Client specific configuration options:

278

279

```java

280

// Result display options

281

session.set("sql-client.execution.result-mode", "tableau");

282

session.set("sql-client.execution.max-table-result.rows", "10000");

283

session.set("sql-client.display.max-column-width", "50");

284

285

// Execution options

286

session.set("sql-client.verbose", "true");

287

```

288

289

### Environment Variable Integration

290

291

Context creation respects environment variables:

292

293

- `FLINK_CONF_DIR`: Configuration directory path

294

- `FLINK_LIB_DIR`: Library directory for dependencies

295

- `HADOOP_CONF_DIR`: Hadoop configuration for HDFS access

296

297

## Session Isolation

298

299

Each session provides complete isolation:

300

301

- **Configuration**: Independent property overrides

302

- **Classpath**: Session-specific JAR loading

303

- **State**: Separate execution contexts

304

- **Resources**: Isolated cleanup on session close

305

306

**Example Multi-Session Usage:**

307

308

```java

309

DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);

310

LocalExecutor executor = new LocalExecutor(defaultContext);

311

executor.start();

312

313

// Create multiple isolated sessions

314

String session1 = executor.openSession("analytics-session");

315

String session2 = executor.openSession("etl-session");

316

317

try {

318

// Configure session 1 for analytics

319

executor.setSessionProperty(session1, "table.exec.resource.default-parallelism", "16");

320

executor.addJar(session1, "analytics-connector.jar");

321

322

// Configure session 2 for ETL

323

executor.setSessionProperty(session2, "table.exec.resource.default-parallelism", "4");

324

executor.addJar(session2, "etl-connector.jar");

325

326

// Sessions operate independently

327

executor.executeOperation(session1, analyticsQuery);

328

executor.executeOperation(session2, etlQuery);

329

330

} finally {

331

executor.closeSession(session1);

332

executor.closeSession(session2);

333

}

334

```

335

336

## Error Handling

337

338

Context and session management handle various error conditions:

339

340

- **Invalid JAR paths**: File not found or access denied

341

- **Configuration errors**: Invalid property keys or values

342

- **Resource conflicts**: Duplicate session IDs

343

- **Cleanup failures**: Resource release issues

344

345

**Example Error Handling:**

346

347

```java

348

try {

349

session.addJar("/invalid/path/connector.jar");

350

} catch (Exception e) {

351

System.err.println("Failed to load JAR: " + e.getMessage());

352

// Handle JAR loading error

353

}

354

355

try {

356

session.set("invalid.config.key", "value");

357

} catch (IllegalArgumentException e) {

358

System.err.println("Invalid configuration: " + e.getMessage());

359

// Handle configuration error

360

}

361

```

362

363

The context and session management system provides a robust foundation for SQL execution with proper resource management, configuration isolation, and dependency handling.