# Configuration Management

The Flink Scala Shell is configured through a small set of Scala types. These types select the execution mode (local, remote, or YARN), describe how to reach the target cluster, and, for YARN, control resource allocation.

## Imports

```scala
import org.apache.flink.annotation.Internal
import org.apache.flink.api.scala.FlinkShell.{Config, ExecutionMode, YarnConfig}
import org.apache.flink.client.program.{ClusterClient, MiniClusterClient}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.minicluster.MiniCluster
```

## Capabilities

### Core Configuration

The main configuration container for shell execution settings.

```scala { .api }
/**
 * Configuration container for Flink Scala Shell execution settings
 *
 * @param host          Optional remote cluster host address
 * @param port          Optional remote cluster port number
 * @param externalJars  Optional array of external JAR file paths
 * @param executionMode Cluster execution mode (LOCAL, REMOTE, YARN, UNDEFINED)
 * @param yarnConfig    Optional YARN-specific configuration settings
 * @param configDir     Optional path to Flink configuration directory
 */
case class Config(
  host: Option[String] = None,
  port: Option[Int] = None,
  externalJars: Option[Array[String]] = None,
  executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
  yarnConfig: Option[YarnConfig] = None,
  configDir: Option[String] = None
)
```

**Usage Examples:**
```scala
// Local execution configuration
val localConfig = Config(executionMode = ExecutionMode.LOCAL)

// Remote execution configuration
val remoteConfig = Config(
  host = Some("flink-cluster.example.com"),
  port = Some(8081),
  executionMode = ExecutionMode.REMOTE
)

// YARN execution with external JARs
// (named yarnShellConfig so it does not shadow the yarnConfig parameter)
val yarnShellConfig = Config(
  executionMode = ExecutionMode.YARN,
  externalJars = Some(Array("/path/to/lib1.jar", "/path/to/lib2.jar")),
  yarnConfig = Some(YarnConfig(
    jobManagerMemory = Some("1024m"),
    taskManagerMemory = Some("2048m")
  ))
)
```
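
`externalJars` has type `Option[Array[String]]`. The sketch below converts a colon-separated JAR list into that shape. The colon-separated input format is an assumption made for illustration, and `ShellConfig` and `ExternalJars` are hypothetical local stand-ins that let the snippet compile without Flink. The sketch also highlights a Scala detail that applies to the real `Config`: case-class equality on an `Array` field compares references. As a result, two configs built from identical JAR lists are not `==`.

```scala
// Hypothetical local stand-in mirroring the relevant fields of FlinkShell.Config,
// so this snippet compiles without Flink on the classpath.
final case class ShellConfig(
  host: Option[String] = None,
  port: Option[Int] = None,
  externalJars: Option[Array[String]] = None
)

object ExternalJars {
  // Hypothetical helper: split a colon-separated JAR list (assumed format),
  // drop empty segments, and return None when nothing remains.
  def fromClasspath(classpath: String): Option[Array[String]] = {
    val jars = classpath.split(":").map(_.trim).filter(_.nonEmpty)
    if (jars.isEmpty) None else Some(jars)
  }

  // Compare two configs by JAR contents rather than by Array reference.
  def sameJars(a: ShellConfig, b: ShellConfig): Boolean =
    (a.externalJars, b.externalJars) match {
      case (Some(x), Some(y)) => x.sameElements(y)
      case (None, None)       => true
      case _                  => false
    }
}

val fromString = ShellConfig(externalJars = ExternalJars.fromClasspath("/path/to/lib1.jar:/path/to/lib2.jar"))
val fromArray  = ShellConfig(externalJars = Some(Array("/path/to/lib1.jar", "/path/to/lib2.jar")))

println(fromString == fromArray)                      // false: Array fields compare by reference
println(ExternalJars.sameJars(fromString, fromArray)) // true: element-wise comparison
```

When configs need to be compared, for example in tests, compare `externalJars` element by element, as `sameJars` does.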

### Execution Mode Enumeration

Defines the available cluster execution modes for the shell.

```scala { .api }
/**
 * Enumeration of supported execution modes
 */
object ExecutionMode extends Enumeration {
  /**
   * Undefined execution mode (initial state)
   */
  val UNDEFINED: ExecutionMode.Value

  /**
   * Local mini-cluster execution mode for development and testing
   */
  val LOCAL: ExecutionMode.Value

  /**
   * Remote cluster connection mode for existing Flink clusters
   */
  val REMOTE: ExecutionMode.Value

  /**
   * YARN cluster execution mode for Hadoop environments
   */
  val YARN: ExecutionMode.Value
}
```
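
The stub above only lists the values. In an ordinary `scala.Enumeration`, all four values can be declared on one line with `Value`. The sketch below uses such a local copy together with a hypothetical `ModeParsing.fromCommand` helper. The helper maps the words listed in the shell's execution-mode error message (`local`, `remote`, `yarn`) to a mode and returns `UNDEFINED` for anything else, which is the state the shell rejects.

```scala
// Local stand-in with the same values as FlinkShell.ExecutionMode.
object ExecutionMode extends Enumeration {
  val UNDEFINED, LOCAL, REMOTE, YARN = Value
}

object ModeParsing {
  // Hypothetical helper: map a subcommand word to an execution mode.
  // Unknown words yield UNDEFINED instead of throwing, which leaves
  // rejection to the later validation step.
  def fromCommand(word: String): ExecutionMode.Value =
    word.trim.toLowerCase match {
      case "local"  => ExecutionMode.LOCAL
      case "remote" => ExecutionMode.REMOTE
      case "yarn"   => ExecutionMode.YARN
      case _        => ExecutionMode.UNDEFINED
    }
}

val parsed = Seq("yarn", "Local", "k8s").map(ModeParsing.fromCommand)
println(parsed == Seq(ExecutionMode.YARN, ExecutionMode.LOCAL, ExecutionMode.UNDEFINED)) // true
```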

### YARN Configuration

Specialized configuration for YARN cluster deployments, with resource-management options.

```scala { .api }
/**
 * YARN-specific configuration for cluster resource allocation and job management
 *
 * @param jobManagerMemory  Optional memory allocation for the JobManager container (e.g., "1024m", "2g")
 * @param name              Optional custom application name displayed in the YARN UI
 * @param queue             Optional YARN queue for job submission and resource isolation
 * @param slots             Optional number of task slots per TaskManager (controls available parallelism)
 * @param taskManagerMemory Optional memory allocation per TaskManager container
 */
case class YarnConfig(
  jobManagerMemory: Option[String] = None,
  name: Option[String] = None,
  queue: Option[String] = None,
  slots: Option[Int] = None,
  taskManagerMemory: Option[String] = None
)
```

**Usage Examples:**
```scala
// Basic YARN configuration
val basicYarn = YarnConfig(
  jobManagerMemory = Some("1024m"),
  taskManagerMemory = Some("2048m")
)

// Advanced YARN configuration with resource management
val advancedYarn = YarnConfig(
  jobManagerMemory = Some("2g"),
  taskManagerMemory = Some("4g"),
  name = Some("Flink Scala Shell - Data Analysis"),
  queue = Some("analytics"),
  slots = Some(8)
)
```
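
### Connection Management

Resolves the effective Flink configuration and an optional cluster client by dispatching on `executionMode`:

- `LOCAL` starts a local mini-cluster.
- `REMOTE` targets an existing cluster.
- `YARN` deploys or connects to a YARN session.
- `UNDEFINED` is rejected with an `IllegalArgumentException`.
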
```scala { .api }
/**
 * Fetches connection information and sets up cluster configuration
 *
 * @param config      Shell configuration with execution mode and parameters
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and optional cluster client
 */
@Internal
def fetchConnectionInfo(
  config: Config,
  flinkConfig: Configuration
): (Configuration, Option[ClusterClient[_]])
```
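
Below is a minimal sketch of the dispatch that `fetchConnectionInfo` performs, under several simplifying assumptions. A `Map[String, String]` stands in for `Configuration`, and a `String` label stands in for a `ClusterClient`. `newYarnSession` is a hypothetical flag meaning "a new YARN session is deployed". Only the branch structure and the two error messages come from this page; the branch bodies are placeholders.

```scala
// Local stand-ins so the sketch runs without Flink.
object ExecutionMode extends Enumeration {
  val UNDEFINED, LOCAL, REMOTE, YARN = Value
}

final case class ShellConfig(
  host: Option[String] = None,
  port: Option[Int] = None,
  executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
  newYarnSession: Boolean = false // hypothetical: "a new YARN session is deployed"
)

object Connection {
  type Conf = Map[String, String] // stands in for org.apache.flink.configuration.Configuration

  // One branch per execution mode; the branch bodies are placeholders.
  def fetchConnectionInfo(config: ShellConfig, base: Conf): (Conf, Option[String]) =
    config.executionMode match {
      case ExecutionMode.LOCAL =>
        // local: a mini-cluster is started and a client for it is returned
        (base + ("target" -> "local"), Some("mini-cluster client"))
      case ExecutionMode.REMOTE =>
        if (config.host.isEmpty || config.port.isEmpty)
          throw new IllegalArgumentException("<host> or <port> is not specified!")
        // remote: only the address is recorded; no client is created
        (base ++ Map("host" -> config.host.get, "port" -> config.port.get.toString), None)
      case ExecutionMode.YARN =>
        // yarn: a client exists only if a new session was deployed
        val client = if (config.newYarnSession) Some("yarn session client") else None
        (base + ("target" -> "yarn"), client)
      case _ => // UNDEFINED
        throw new IllegalArgumentException(
          "please specify execution mode: [local | remote <host> <port> | yarn]")
    }
}

val (conf, client) = Connection.fetchConnectionInfo(
  ShellConfig(Some("flink-cluster.example.com"), Some(8081), ExecutionMode.REMOTE), Map.empty)
println(conf("port"))   // 8081
println(client.isEmpty) // true
```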

### Local Cluster Configuration

Creates and configures a local mini-cluster for development and testing.

```scala { .api }
/**
 * Creates local mini-cluster configuration and client
 *
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and cluster client
 */
private def createLocalClusterAndConfig(
  flinkConfig: Configuration
): (Configuration, Some[MiniClusterClient])

/**
 * Creates local mini-cluster instance with specified configuration
 *
 * @param flinkConfig Configuration for cluster setup
 * @return Started MiniCluster instance
 */
private def createLocalCluster(flinkConfig: Configuration): MiniCluster
```

**Local Cluster Features:**
- Automatic port allocation (the JobManager port is set to 0, so a free port is assigned dynamically)
- JobManager and TaskManagers co-located in the shell's JVM
- Task slots per TaskManager, and hence the available parallelism, taken from the Flink configuration
- Immediate cluster startup and connection
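
The port-0 convention used for the local mini-cluster is a general operating-system feature: binding to port 0 lets the OS choose a free port, and the caller reads that port back after binding. The sketch below demonstrates the mechanism with a plain `java.net.ServerSocket`. It does not start a Flink cluster. It only shows why the effective port can be known only after startup.

```scala
import java.net.{InetAddress, ServerSocket}

object EphemeralPort {
  // Port 0 asks the operating system for any free port; the port that was
  // actually assigned is only known after binding, via getLocalPort.
  def bindEphemeral(): ServerSocket =
    new ServerSocket(0, 50, InetAddress.getLoopbackAddress)
}

val socket = EphemeralPort.bindEphemeral()
try {
  println(s"requested port 0, OS assigned port ${socket.getLocalPort}")
} finally {
  socket.close()
}
```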

### Remote Cluster Configuration

Configures the connection to an existing remote Flink cluster.

```scala { .api }
/**
 * Creates configuration for remote cluster connection
 *
 * @param config      Shell configuration with host and port
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and None (no local client)
 */
private def createRemoteConfig(
  config: Config,
  flinkConfig: Configuration
): (Configuration, None.type)

/**
 * Sets JobManager connection information in configuration
 *
 * @param config Configuration to modify
 * @param host   JobManager host address
 * @param port   JobManager port number
 */
private def setJobManagerInfoToConfig(
  config: Configuration,
  host: String,
  port: Integer
): Unit
```
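
The sketch below approximates what `createRemoteConfig` and `setJobManagerInfoToConfig` do, with a Scala map standing in for `Configuration`. It validates that both host and port are present, copies the base settings, and writes the target address into the copy. The keys `jobmanager.rpc.address`, `jobmanager.rpc.port`, `rest.address`, and `rest.port` are standard Flink option keys. Which of them a given Flink release actually writes at this point is an assumption to check against that release.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the host/port fields of FlinkShell.Config.
final case class RemoteTarget(host: Option[String], port: Option[Int])

object RemoteConfig {
  // Writes the target address into the settings. Which of these key pairs a
  // given Flink release actually sets is an assumption to verify.
  def setJobManagerInfo(conf: mutable.Map[String, String], host: String, port: Int): Unit = {
    conf("jobmanager.rpc.address") = host
    conf("jobmanager.rpc.port") = port.toString
    conf("rest.address") = host
    conf("rest.port") = port.toString
  }

  // Validate first, then copy the base settings so they are not mutated;
  // remote mode creates no local cluster client, hence None.
  def createRemoteConfig(
      target: RemoteTarget,
      base: Map[String, String]
  ): (Map[String, String], None.type) = {
    if (target.host.isEmpty || target.port.isEmpty)
      throw new IllegalArgumentException("<host> or <port> is not specified!")
    val effective = mutable.Map.empty[String, String] ++= base
    setJobManagerInfo(effective, target.host.get, target.port.get)
    (effective.toMap, None)
  }
}

val (conf, client) = RemoteConfig.createRemoteConfig(
  RemoteTarget(Some("flink-cluster.example.com"), Some(8081)),
  Map("parallelism.default" -> "4"))
println(conf("rest.port")) // 8081
```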

**Remote Configuration Requirements:**
- Valid host address (IP or hostname)
- Reachable JobManager port (the examples on this page use 8081, Flink's default REST port)
- Network connectivity from the shell to the Flink cluster
- Compatible Flink version
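
The connectivity requirement can be checked before the shell starts. The hypothetical `Reachability.canConnect` below only tests whether a TCP connection to `host:port` opens within a timeout. It is not part of Flink. It cannot detect a version mismatch, and it cannot confirm that the peer is actually a Flink JobManager.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object Reachability {
  // Hypothetical pre-flight check: true if a TCP connection to host:port
  // opens within timeoutMs. It does not prove the peer is a Flink JobManager
  // or that its version matches the shell.
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException              => false // refused, timed out, or unknown host
      case _: IllegalArgumentException => false // port outside 0..65535
    } finally {
      socket.close()
    }
  }
}
```

For example, `Reachability.canConnect("flink-cluster.example.com", 8081)` returning `false` points to a network or address problem, not a shell configuration error.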
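
### YARN Configuration Management

Handles YARN cluster deployment and connection configuration. When `yarnConfig` is set, a new YARN session is deployed. When it is `None`, the shell instead fetches connection information for an already-running YARN cluster.
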
```scala { .api }
/**
 * Creates or connects to YARN cluster based on configuration
 *
 * @param config      Shell configuration with YARN settings
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and optional cluster client
 */
private def createYarnClusterIfNeededAndGetConfig(
  config: Config,
  flinkConfig: Configuration
): (Configuration, Option[ClusterClient[_]])

/**
 * Deploys new YARN cluster with specified configuration
 *
 * @param config      Shell configuration
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and cluster client
 */
private def deployNewYarnCluster(
  config: Config,
  flinkConfig: Configuration
): (Configuration, Some[ClusterClient[_]])

/**
 * Fetches configuration information from existing YARN cluster
 *
 * @param config      Shell configuration
 * @param flinkConfig Base Flink configuration
 * @param mode        Execution mode string ("yarn-cluster", "default")
 * @return Tuple of effective configuration and None (no local client)
 */
private def fetchDeployedYarnClusterInfo(
  config: Config,
  flinkConfig: Configuration,
  mode: String
): (Configuration, None.type)

/**
 * Ensures YarnConfig exists, creating default if necessary
 *
 * @param config Shell configuration
 * @return YarnConfig instance (existing or default)
 */
@Internal
def ensureYarnConfig(config: Config): YarnConfig
```
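
`ensureYarnConfig` lets YARN options be set one at a time without losing options set earlier. Each update starts from the existing `YarnConfig`, or from an empty one, and copies in a single field. The sketch below mirrors that pattern with local stand-ins. `withQueue`, `withSlots`, and `deploysNewSession` are hypothetical helpers, and `deploysNewSession` assumes that a new YARN session is deployed only when `yarnConfig` is set.

```scala
// Local stand-ins mirroring the documented YarnConfig and Config fields.
final case class YarnConfig(
  jobManagerMemory: Option[String] = None,
  name: Option[String] = None,
  queue: Option[String] = None,
  slots: Option[Int] = None,
  taskManagerMemory: Option[String] = None
)

final case class ShellConfig(yarnConfig: Option[YarnConfig] = None)

object YarnOptions {
  // Same contract as the documented ensureYarnConfig: existing or default.
  def ensureYarnConfig(config: ShellConfig): YarnConfig =
    config.yarnConfig.getOrElse(YarnConfig())

  // Hypothetical helpers: each one updates a single YARN field and keeps
  // any fields that were set earlier.
  def withQueue(c: ShellConfig, queue: String): ShellConfig =
    c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(queue = Some(queue))))

  def withSlots(c: ShellConfig, slots: Int): ShellConfig =
    c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(slots = Some(slots))))

  // Assumption: a new YARN session is deployed only when yarnConfig is set.
  def deploysNewSession(c: ShellConfig): Boolean = c.yarnConfig.isDefined
}

val start = ShellConfig()
val configured = YarnOptions.withSlots(YarnOptions.withQueue(start, "analytics"), 8)
println(configured.yarnConfig) // Some(YarnConfig(None,None,Some(analytics),Some(8),None))
```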

### Command-Line Argument Parsing

Converts a `Config` into the command-line argument array used for YARN cluster deployment and connection.

```scala { .api }
/**
 * Converts configuration to command-line argument array for cluster deployment
 *
 * @param config Shell configuration
 * @param mode   Mode string: "default" adds no "-m" flag; any other value
 *               (e.g. "yarn-cluster") is emitted as "-m <mode>"
 * @return Array of command-line arguments
 */
def parseArgList(config: Config, mode: String): Array[String]
```

**Generated Arguments Examples:**
```scala
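// YARN mode: "-m <mode>" followed by one flag per YARN option that is set
val shellConfig = Config(yarnConfig = Some(YarnConfig(
  jobManagerMemory = Some("1024m"), taskManagerMemory = Some("2048m"), name = Some("MyApp"))))
parseArgList(shellConfig, "yarn-cluster")
// Returns: Array("-m", "yarn-cluster", "-yjm", "1024m", "-ytm", "2048m", "-ynm", "MyApp")

// Default mode: no "-m" flag; without a YarnConfig the result is empty
parseArgList(Config(), "default")
// Returns: Array()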
```
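
Below is a standalone sketch of the mapping that `parseArgList` performs, using local stand-ins for the case classes. The `-m`, `-yjm`, `-ytm`, and `-ynm` flags follow the examples above. Three details are assumptions based on Flink's YARN-prefixed CLI options: the `-yqu` (queue) and `-ys` (slots) flags, the overall flag order, and treating `"default"` as "no `-m` flag".

```scala
import scala.collection.mutable.ArrayBuffer

// Local stand-ins mirroring the documented case classes.
final case class YarnConfig(
  jobManagerMemory: Option[String] = None,
  name: Option[String] = None,
  queue: Option[String] = None,
  slots: Option[Int] = None,
  taskManagerMemory: Option[String] = None
)
final case class ShellConfig(yarnConfig: Option[YarnConfig] = None)

object ArgList {
  // "-m <mode>" unless mode is "default", then one flag per YARN field
  // that is set, in a fixed order.
  def parseArgList(config: ShellConfig, mode: String): Array[String] = {
    val args = ArrayBuffer[String]()
    if (mode != "default") args ++= Seq("-m", mode)
    config.yarnConfig.foreach { y =>
      y.jobManagerMemory.foreach(v => args ++= Seq("-yjm", v))
      y.taskManagerMemory.foreach(v => args ++= Seq("-ytm", v))
      y.name.foreach(v => args ++= Seq("-ynm", v))
      y.queue.foreach(v => args ++= Seq("-yqu", v))
      y.slots.foreach(v => args ++= Seq("-ys", v.toString))
    }
    args.toArray
  }
}

val yarn = ShellConfig(Some(YarnConfig(
  jobManagerMemory = Some("2g"), taskManagerMemory = Some("4g"), queue = Some("analytics"), slots = Some(8))))
println(ArgList.parseArgList(yarn, "yarn-cluster").mkString(" "))
// -m yarn-cluster -yjm 2g -ytm 4g -yqu analytics -ys 8
println(ArgList.parseArgList(yarn, "default").mkString(" "))
// -yjm 2g -ytm 4g -yqu analytics -ys 8
```

In this sketch, `"default"` suppresses only the `-m` flag. YARN options that are set are still emitted.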

### Configuration Directory Resolution

Manages Flink configuration directory discovery and loading.

```scala { .api }
/**
 * Resolves configuration directory from multiple sources
 * Priority: command-line option > environment variable > default
 *
 * @param config Shell configuration with optional configDir
 * @return Resolved configuration directory path
 */
private def getConfigDir(config: Config): String

/**
 * Loads global Flink configuration from resolved directory
 *
 * @param config Shell configuration
 * @return Loaded Configuration instance with merged settings
 */
private def getGlobalConfig(config: Config): Configuration
```

**Configuration Resolution Order:**
1. `--configDir` command-line option (highest priority)
2. `FLINK_CONF_DIR` environment variable (checked first by `CliFrontend.getConfigurationDirectoryFromEnv()`)
3. `CliFrontend.getConfigurationDirectoryFromEnv()` (Flink default)
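
The precedence rules can be expressed as a chain of `Option.orElse` calls. In this sketch, the environment, the fallback directories, and the existence check are passed in as parameters so the logic is testable. The sketch does not model two things: the actual fallback paths used by `CliFrontend.getConfigurationDirectoryFromEnv()`, and its behaviour when `FLINK_CONF_DIR` points to a missing directory.

```scala
import java.io.File

object ConfigDirResolution {
  // Precedence sketch: explicit configDir, then FLINK_CONF_DIR, then the
  // first fallback candidate that exists. The fallback list and existence
  // check are parameters; Flink's real fallback paths are not assumed here.
  def resolve(
      configDir: Option[String],
      env: Map[String, String],
      fallbacks: Seq[String],
      exists: String => Boolean = path => new File(path).exists()
  ): Either[String, String] =
    configDir
      .orElse(env.get("FLINK_CONF_DIR"))
      .orElse(fallbacks.find(exists))
      .toRight("no Flink configuration directory found")
}

val fakeEnv = Map("FLINK_CONF_DIR" -> "/etc/flink/conf")
println(ConfigDirResolution.resolve(Some("/opt/flink/conf"), fakeEnv, Nil)) // Right(/opt/flink/conf)
println(ConfigDirResolution.resolve(None, fakeEnv, Nil))                    // Right(/etc/flink/conf)
```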

### Error Handling and Validation

The shell validates the configuration before connecting. It fails fast with an `IllegalArgumentException` when no execution mode is given or when remote mode lacks a host or port.

**Common Configuration Errors:**
- Missing host/port for remote mode
- Invalid YARN configuration parameters
- Inaccessible configuration directories
- Network connectivity issues
- Incompatible Flink versions

**Error Handling Patterns:**
```scala
// Remote mode validation
if (config.host.isEmpty || config.port.isEmpty) {
  throw new IllegalArgumentException("<host> or <port> is not specified!")
}

// Execution mode validation
config.executionMode match {
  case ExecutionMode.UNDEFINED =>
    throw new IllegalArgumentException(
      "please specify execution mode: [local | remote <host> <port> | yarn]")
  case _ => // Continue with valid mode
}
```
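
The same two checks can also be written without exceptions, which is convenient when validating configurations programmatically. The hypothetical `Validation.validate` below returns the first problem as a `Left`, using the messages from the patterns above. `validateOrThrow` restores the exception-based behaviour. `ShellConfig` and the local `ExecutionMode` are stand-ins for the real types.

```scala
// Local stand-ins for FlinkShell.ExecutionMode and FlinkShell.Config.
object ExecutionMode extends Enumeration {
  val UNDEFINED, LOCAL, REMOTE, YARN = Value
}

final case class ShellConfig(
  host: Option[String] = None,
  port: Option[Int] = None,
  executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED
)

object Validation {
  // Hypothetical alternative to throwing: report the first problem as a Left.
  // The host/port check applies only to REMOTE mode.
  def validate(c: ShellConfig): Either[String, ShellConfig] = c.executionMode match {
    case ExecutionMode.UNDEFINED =>
      Left("please specify execution mode: [local | remote <host> <port> | yarn]")
    case ExecutionMode.REMOTE if c.host.isEmpty || c.port.isEmpty =>
      Left("<host> or <port> is not specified!")
    case _ =>
      Right(c)
  }

  // Bridge back to the exception-based style shown above.
  def validateOrThrow(c: ShellConfig): ShellConfig = validate(c) match {
    case Left(msg) => throw new IllegalArgumentException(msg)
    case Right(ok) => ok
  }
}

println(Validation.validate(ShellConfig(executionMode = ExecutionMode.REMOTE)))
// Left(<host> or <port> is not specified!)
```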

## Configuration Examples

### Development Setup (Local)
```scala
val devConfig = Config(
  executionMode = ExecutionMode.LOCAL,
  externalJars = Some(Array("/path/to/test-data.jar")),
  configDir = Some("/opt/flink/conf")
)
```

### Production Remote Cluster
```scala
val prodConfig = Config(
  host = Some("prod-flink-cluster.company.com"),
  port = Some(8081),
  executionMode = ExecutionMode.REMOTE,
  externalJars = Some(Array(
    "/shared/libs/company-commons.jar",
    "/shared/libs/data-connectors.jar"
  ))
)
```

### YARN Analytics Environment
```scala
val analyticsConfig = Config(
  executionMode = ExecutionMode.YARN,
  yarnConfig = Some(YarnConfig(
    jobManagerMemory = Some("4g"),
    taskManagerMemory = Some("8g"),
    name = Some("Analytics Shell - User Research"),
    queue = Some("analytics-queue"),
    slots = Some(16)
  )),
  externalJars = Some(Array("/analytics/libs/ml-algorithms.jar"))
)
```