# Configuration and Options

The SQL client configuration system controls result modes, display options, and execution parameters, and exposes Flink execution and performance settings through session properties.

## Capabilities

### SqlClientOptions Class

Typed configuration keys and default values for SQL client behavior.

```java { .api }
public class SqlClientOptions {
    /**
     * Maximum number of rows to cache in table result mode
     * Default: 1,000,000 rows
     * Type: Integer
     */
    public static final ConfigOption<Integer> EXECUTION_MAX_TABLE_RESULT_ROWS;

    /**
     * How to display query results - table, changelog, or tableau mode
     * Default: TABLE
     * Type: ResultMode enum
     */
    public static final ConfigOption<ResultMode> EXECUTION_RESULT_MODE;

    /**
     * Whether to output verbose error messages with stack traces
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> VERBOSE;

    /**
     * Maximum column width for display formatting
     * Default: 30 characters
     * Type: Integer
     */
    public static final ConfigOption<Integer> DISPLAY_MAX_COLUMN_WIDTH;
}
```

**Usage Example:**

```java
// Set configuration options through session properties
executor.setSessionProperty(sessionId,
    SqlClientOptions.EXECUTION_RESULT_MODE.key(), "TABLEAU");

executor.setSessionProperty(sessionId,
    SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS.key(), "50000");

executor.setSessionProperty(sessionId,
    SqlClientOptions.VERBOSE.key(), "true");

executor.setSessionProperty(sessionId,
    SqlClientOptions.DISPLAY_MAX_COLUMN_WIDTH.key(), "50");

// Access configuration values
ReadableConfig config = executor.getSessionConfig(sessionId);
ResultMode mode = config.get(SqlClientOptions.EXECUTION_RESULT_MODE);
boolean verbose = config.get(SqlClientOptions.VERBOSE);
int maxRows = config.get(SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS);
```
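
To make the idea of a typed key with a default concrete, here is a minimal standalone sketch. It does not use Flink's `ConfigOption`; `TypedOption`, `TypedOptionDemo`, and `readFrom` are hypothetical names invented for illustration, and only the property keys and defaults are taken from the listing above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a typed option: a key, a default, and a string parser. */
final class TypedOption<T> {
    final String key;
    final T defaultValue;
    final Function<String, T> parser;

    TypedOption(String key, T defaultValue, Function<String, T> parser) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.parser = parser;
    }

    /** Returns the parsed value stored under the key, or the default when the key is unset. */
    T readFrom(Map<String, String> properties) {
        String raw = properties.get(key);
        return raw == null ? defaultValue : parser.apply(raw.trim());
    }
}

final class TypedOptionDemo {
    // Keys and defaults mirror the options documented above.
    static final TypedOption<Integer> MAX_ROWS = new TypedOption<>(
            "sql-client.execution.max-table-result.rows", 1_000_000, Integer::valueOf);
    static final TypedOption<Boolean> VERBOSE = new TypedOption<>(
            "sql-client.verbose", false, Boolean::valueOf);

    public static void main(String[] args) {
        Map<String, String> session = new HashMap<>();
        session.put(MAX_ROWS.key, "50000");

        // The explicitly set key is parsed; the unset key falls back to its default.
        System.out.println(MAX_ROWS.key + " = " + MAX_ROWS.readFrom(session));
        System.out.println(VERBOSE.key + " = " + VERBOSE.readFrom(session));
    }
}
```

Session properties are set as strings but read back as typed values, which is why each option in this sketch carries its own parser.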

### ResultMode Enumeration

Query result display modes determining how results are presented to users.

```java { .api }
public enum ResultMode {
    /**
     * Materialized, paginated table view with interactive navigation
     * Results are cached in memory for random page access
     * Best for: Bounded result sets, data exploration, debugging
     */
    TABLE,

    /**
     * Continuous stream visualization showing incremental changes
     * Results stream in real-time as they arrive
     * Best for: Streaming queries, real-time monitoring, change tracking
     */
    CHANGELOG,

    /**
     * Direct tableau format display without interactive features
     * Results displayed immediately in formatted table
     * Best for: Non-interactive execution, scripting, simple output
     */
    TABLEAU
}
```

**Usage Example:**

```java
// Set result mode based on query type
if (isStreamingQuery) {
    executor.setSessionProperty(sessionId,
        SqlClientOptions.EXECUTION_RESULT_MODE.key(), ResultMode.CHANGELOG.name());
} else if (isInteractiveSession) {
    executor.setSessionProperty(sessionId,
        SqlClientOptions.EXECUTION_RESULT_MODE.key(), ResultMode.TABLE.name());
} else {
    executor.setSessionProperty(sessionId,
        SqlClientOptions.EXECUTION_RESULT_MODE.key(), ResultMode.TABLEAU.name());
}
```
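
The examples in this document write the mode both in upper case (`"TABLEAU"`) and lower case (`"table"`). When the value comes from user input, it can help to normalize and check it before passing it on. The sketch below uses a local stand-in enum rather than the real `ResultMode`, and `ResultModeParser` is a hypothetical helper, not part of the SQL client API.

```java
import java.util.Locale;

final class ResultModeParser {
    /** Local stand-in mirroring the three modes described above. */
    enum Mode { TABLE, CHANGELOG, TABLEAU }

    /** Parses a user-supplied mode name, ignoring case and surrounding whitespace. */
    static Mode parse(String raw) {
        if (raw == null) {
            throw new IllegalArgumentException("Result mode must not be null");
        }
        try {
            return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unknown result mode '" + raw + "'; expected table, changelog, or tableau", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("table"));
        System.out.println(parse(" Changelog "));
        try {
            parse("grid");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```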

## Flink Configuration Integration

### Table Execution Options

Core Flink table execution configuration options accessible through session properties:

```java
// Parallelism configuration
executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "8");

// Checkpointing configuration
executor.setSessionProperty(sessionId, "execution.checkpointing.interval", "30s");
executor.setSessionProperty(sessionId, "execution.checkpointing.mode", "EXACTLY_ONCE");

// State backend configuration
executor.setSessionProperty(sessionId, "state.backend", "rocksdb");
executor.setSessionProperty(sessionId, "state.checkpoints.dir", "s3://bucket/checkpoints");

// Resource configuration
executor.setSessionProperty(sessionId, "taskmanager.memory.process.size", "4gb");
executor.setSessionProperty(sessionId, "jobmanager.memory.process.size", "2gb");
```

### SQL Client Execution Options

Configuration of execution behavior specific to the SQL client:

```java
// Result caching and display
executor.setSessionProperty(sessionId, "sql-client.execution.max-table-result.rows", "100000");
executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "table");
executor.setSessionProperty(sessionId, "sql-client.display.max-column-width", "40");

// Error handling and debugging
executor.setSessionProperty(sessionId, "sql-client.verbose", "true");

// DML synchronous execution
executor.setSessionProperty(sessionId, "table.dml-sync", "true");
```

### Pipeline and Job Configuration

Job-level configuration options for pipeline execution:

```java
// Job naming and identification
executor.setSessionProperty(sessionId, "pipeline.name", "My Analytics Job");
executor.setSessionProperty(sessionId, "pipeline.jars", "file:///path/to/connector.jar");

// Optimization settings
executor.setSessionProperty(sessionId, "table.optimizer.join-reorder-enabled", "true");
executor.setSessionProperty(sessionId, "table.optimizer.agg-phase-strategy", "TWO_PHASE");

// Time zone settings
executor.setSessionProperty(sessionId, "table.local-time-zone", "UTC");
```

## Configuration Management Patterns

### Session Configuration Lifecycle

Complete session configuration management pattern:

```java
// Open session and configure
String sessionId = executor.openSession("analytics-session");

try {
    // Set base configuration
    executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "16");
    executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "table");

    // Add required JARs
    executor.addJar(sessionId, "flink-connector-kafka-1.14.6.jar");
    executor.addJar(sessionId, "flink-json-1.14.6.jar");

    // Execute operations with configured environment.
    // Text blocks require Java 15+. `timestamp` is a reserved keyword in Flink SQL,
    // so the column name is quoted with backticks.
    Operation createTable = executor.parseStatement(sessionId, """
        CREATE TABLE events (
            id STRING,
            `timestamp` TIMESTAMP(3),
            data ROW<field1 STRING, field2 INT>
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'events',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
        """);
    executor.executeOperation(sessionId, createTable);

    // Query with session configuration applied
    QueryOperation query = (QueryOperation) executor.parseStatement(sessionId,
        "SELECT * FROM events WHERE `timestamp` > CURRENT_TIMESTAMP - INTERVAL '1' HOUR");
    ResultDescriptor result = executor.executeQuery(sessionId, query);

} finally {
    // Always release the session, even if a statement fails
    executor.closeSession(sessionId);
}
```

### Configuration Validation and Error Handling

Validate values before setting them, and handle failures from the executor:

```java
// Declared outside the try block so the catch block can report it.
// The value is deliberately invalid to show the failure path.
String parallelism = "invalid_number";
try {
    Integer.parseInt(parallelism); // Validate numeric values before setting
    executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", parallelism);
} catch (NumberFormatException e) {
    System.err.println("Invalid parallelism value: " + parallelism);
    // Use default or prompt for correction
}

try {
    executor.setSessionProperty(sessionId, "invalid.config.key", "value");
} catch (Exception e) {
    System.err.println("Failed to set property: " + e.getMessage());
    // Handle configuration error
}

// Check configuration values
ReadableConfig config = executor.getSessionConfig(sessionId);
if (config.get(SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS) > 1_000_000) {
    System.out.println("Warning: Large result cache size may cause memory issues");
}
```
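
The validate-then-set step can be factored into a small helper. The following is a sketch under stated assumptions: `ValidatedSetter` and its methods are hypothetical, the `BiConsumer` stands in for a call such as `executor.setSessionProperty(sessionId, key, value)`, and the rule that the value must be positive is an illustrative policy, not something the SQL client enforces.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

final class ValidatedSetter {
    /** Parses raw as a positive integer, or throws with the offending key in the message. */
    static int requirePositiveInt(String key, String raw) {
        if (raw == null) {
            throw new IllegalArgumentException(key + " must not be null");
        }
        int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(key + " must be an integer, got '" + raw + "'", e);
        }
        if (value <= 0) {
            throw new IllegalArgumentException(key + " must be positive, got " + value);
        }
        return value;
    }

    /** Validates first, then forwards to the setter; returns false and sets nothing on bad input. */
    static boolean setIfValid(BiConsumer<String, String> setter, String key, String raw) {
        try {
            int value = requirePositiveInt(key, raw);
            setter.accept(key, Integer.toString(value));
            return true;
        } catch (IllegalArgumentException e) {
            System.err.println("Rejected: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // A plain map stands in for the session here.
        Map<String, String> session = new LinkedHashMap<>();
        String key = "table.exec.resource.default-parallelism";

        setIfValid(session::put, key, "invalid_number"); // rejected, map stays empty
        setIfValid(session::put, key, "8");              // accepted
        System.out.println(session);
    }
}
```

Keeping the parsing in one place means every caller reports the same message and an invalid value never reaches the session.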

### Environment-Specific Configuration

Configuration patterns for different environments:

```java
// Development environment
void configureForDevelopment(Executor executor, String sessionId) {
    executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "1");
    executor.setSessionProperty(sessionId, "sql-client.verbose", "true");
    executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "table");
    executor.setSessionProperty(sessionId, "execution.checkpointing.interval", "10s");
}

// Production environment
void configureForProduction(Executor executor, String sessionId) {
    executor.setSessionProperty(sessionId, "table.exec.resource.default-parallelism", "16");
    executor.setSessionProperty(sessionId, "sql-client.verbose", "false");
    executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "tableau");
    executor.setSessionProperty(sessionId, "execution.checkpointing.interval", "60s");
    executor.setSessionProperty(sessionId, "state.backend", "rocksdb");
    executor.setSessionProperty(sessionId, "state.checkpoints.dir", "s3://prod-checkpoints/");
}

// Streaming environment
void configureForStreaming(Executor executor, String sessionId) {
    executor.setSessionProperty(sessionId, "sql-client.execution.result-mode", "changelog");
    executor.setSessionProperty(sessionId, "table.exec.source.idle-timeout", "30s");
    executor.setSessionProperty(sessionId, "execution.checkpointing.mode", "EXACTLY_ONCE");
    executor.setSessionProperty(sessionId, "table.exec.sink.not-null-enforcer", "error");
}
```
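
One possible variation on the methods above is to keep each environment as data and apply it through a single loop, so profiles can be inspected, logged, or compared before they touch a session. This is only a sketch: `EnvironmentProfiles`, `profile`, and `apply` are hypothetical names, and the `BiConsumer` stands in for a call such as `executor.setSessionProperty(sessionId, key, value)`. The keys and values are the ones used in the methods above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

final class EnvironmentProfiles {
    /** Returns the properties for a named profile, in the order they should be applied. */
    static Map<String, String> profile(String name) {
        Map<String, String> p = new LinkedHashMap<>();
        switch (name) {
            case "development":
                p.put("table.exec.resource.default-parallelism", "1");
                p.put("sql-client.verbose", "true");
                p.put("sql-client.execution.result-mode", "table");
                p.put("execution.checkpointing.interval", "10s");
                break;
            case "production":
                p.put("table.exec.resource.default-parallelism", "16");
                p.put("sql-client.verbose", "false");
                p.put("sql-client.execution.result-mode", "tableau");
                p.put("execution.checkpointing.interval", "60s");
                p.put("state.backend", "rocksdb");
                p.put("state.checkpoints.dir", "s3://prod-checkpoints/");
                break;
            case "streaming":
                p.put("sql-client.execution.result-mode", "changelog");
                p.put("table.exec.source.idle-timeout", "30s");
                p.put("execution.checkpointing.mode", "EXACTLY_ONCE");
                p.put("table.exec.sink.not-null-enforcer", "error");
                break;
            default:
                throw new IllegalArgumentException("Unknown profile: " + name);
        }
        return p;
    }

    /** Applies every entry through the given setter and returns how many were applied. */
    static int apply(Map<String, String> profile, BiConsumer<String, String> setter) {
        profile.forEach(setter);
        return profile.size();
    }

    public static void main(String[] args) {
        // Printing stands in for setting properties on a real session.
        int applied = apply(profile("production"),
                (key, value) -> System.out.println(key + " = " + value));
        System.out.println(applied + " properties applied");
    }
}
```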

## Configuration Property Categories

### Display and Output Configuration

```java
// Result display formatting
"sql-client.display.max-column-width" = "50"            // Column width limit
"sql-client.execution.result-mode" = "table"            // Display mode
"sql-client.execution.max-table-result.rows" = "10000"  // Cache limit

// Error output configuration
"sql-client.verbose" = "true"                           // Verbose errors
```

### Execution and Performance Configuration

```java
// Core execution settings
"table.exec.resource.default-parallelism" = "8"   // Default parallelism
"table.dml-sync" = "false"                        // Async DML execution

// Optimization settings
"table.optimizer.join-reorder-enabled" = "true"   // Join optimization
"table.optimizer.agg-phase-strategy" = "AUTO"     // Aggregation strategy
```

### Checkpointing and State Configuration

```java
// Checkpointing configuration
"execution.checkpointing.interval" = "30s"           // Checkpoint interval
"execution.checkpointing.mode" = "EXACTLY_ONCE"      // Consistency mode
"execution.checkpointing.timeout" = "10min"          // Checkpoint timeout

// State backend configuration
"state.backend" = "rocksdb"                          // State backend type
"state.checkpoints.dir" = "s3://bucket/checkpoints"  // Checkpoint storage
"state.savepoints.dir" = "s3://bucket/savepoints"    // Savepoint storage
```

### Resource and Memory Configuration

```java
// TaskManager configuration
"taskmanager.memory.process.size" = "4gb"        // Total TM memory
"taskmanager.memory.task.heap.size" = "1gb"      // Task heap size
"taskmanager.memory.managed.fraction" = "0.4"    // Managed memory fraction

// JobManager configuration
"jobmanager.memory.process.size" = "2gb"         // Total JM memory
"jobmanager.memory.heap.size" = "1536m"          // JM heap size (whole numbers only)
```

## Configuration Reset and Management

### Property Reset Operations

```java
// Reset specific property to default
executor.resetSessionProperty(sessionId, "table.exec.resource.default-parallelism");

// Reset all session properties
executor.resetSessionProperties(sessionId);

// Get current configuration state
Map<String, String> currentConfig = executor.getSessionConfigMap(sessionId);
currentConfig.forEach((key, value) ->
    System.out.println(key + " = " + value));
```

### Configuration Inheritance

Configuration inheritance and override patterns:

```java
// Base configuration from CLI options
DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);

// Session inherits from default context
SessionContext sessionContext = LocalContextUtils.buildSessionContext(sessionId, defaultContext);

// Session-specific overrides
sessionContext.set("table.exec.resource.default-parallelism", "8");

// Property lookup order:
// 1. Session-specific properties
// 2. Default context properties
// 3. Flink configuration defaults
```
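
The lookup order listed above, and the effect of resetting a property, can be modelled with three plain maps. This is an illustrative sketch, not how `SessionContext` is implemented. `LayeredSessionConfig` is a hypothetical class, and the values in `main` are made up for the demonstration rather than real Flink defaults.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class LayeredSessionConfig {
    private final Map<String, String> sessionOverrides = new HashMap<>();
    private final Map<String, String> defaultContext;
    private final Map<String, String> builtInDefaults;

    LayeredSessionConfig(Map<String, String> defaultContext, Map<String, String> builtInDefaults) {
        // Defensive copies so later changes to the arguments do not leak in.
        this.defaultContext = new HashMap<>(defaultContext);
        this.builtInDefaults = new HashMap<>(builtInDefaults);
    }

    void set(String key, String value) { sessionOverrides.put(key, value); }

    /** Dropping the override makes the next lower layer visible again. */
    void reset(String key) { sessionOverrides.remove(key); }

    void resetAll() { sessionOverrides.clear(); }

    /** Checks session overrides first, then the default context, then built-in defaults. */
    Optional<String> get(String key) {
        for (Map<String, String> layer : List.of(sessionOverrides, defaultContext, builtInDefaults)) {
            String value = layer.get(key);
            if (value != null) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String key = "table.exec.resource.default-parallelism";
        Map<String, String> context = new HashMap<>();
        context.put(key, "4"); // illustrative value from the default context
        Map<String, String> builtIn = new HashMap<>();
        builtIn.put(key, "1"); // illustrative built-in default

        LayeredSessionConfig config = new LayeredSessionConfig(context, builtIn);
        System.out.println(config.get(key)); // value from the default context
        config.set(key, "8");
        System.out.println(config.get(key)); // session override wins
        config.reset(key);
        System.out.println(config.get(key)); // back to the default context value
    }
}
```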

Together, these options control SQL client behavior, execution performance, and result display, and they can be tuned per use case and environment.