# Configuration Management

The Flink Scala Shell is configured through a small set of case classes and helper functions that cover cluster connections, execution modes, and resource allocation across local, remote, and YARN environments.

## Imports

```scala
import org.apache.flink.api.scala.FlinkShell.{Config, ExecutionMode, YarnConfig}
import org.apache.flink.configuration.Configuration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterClient}
```

## Capabilities

### Core Configuration

Main configuration container for shell execution settings.

```scala { .api }
/**
 * Configuration container for Flink Scala Shell execution settings
 * @param host Optional remote cluster host address
 * @param port Optional remote cluster port number
 * @param externalJars Optional array of external JAR file paths
 * @param executionMode Cluster execution mode (LOCAL, REMOTE, YARN, UNDEFINED)
 * @param yarnConfig Optional YARN-specific configuration settings
 * @param configDir Optional path to Flink configuration directory
 */
case class Config(
  host: Option[String] = None,
  port: Option[Int] = None,
  externalJars: Option[Array[String]] = None,
  executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
  yarnConfig: Option[YarnConfig] = None,
  configDir: Option[String] = None
)
```

**Usage Examples:**

```scala
// Local execution configuration
val localConfig = Config(executionMode = ExecutionMode.LOCAL)

// Remote execution configuration
val remoteConfig = Config(
  host = Some("flink-cluster.example.com"),
  port = Some(8081),
  executionMode = ExecutionMode.REMOTE
)

// YARN execution with external JARs
val yarnConfig = Config(
  executionMode = ExecutionMode.YARN,
  externalJars = Some(Array("/path/to/lib1.jar", "/path/to/lib2.jar")),
  yarnConfig = Some(YarnConfig(
    jobManagerMemory = Some("1024m"),
    taskManagerMemory = Some("2048m")
  ))
)
```

### Execution Mode Enumeration

Defines available cluster execution modes for the shell.

```scala { .api }
/**
 * Enumeration of supported execution modes
 */
object ExecutionMode extends Enumeration {
  /**
   * Undefined execution mode (initial state)
   */
  val UNDEFINED: ExecutionMode.Value

  /**
   * Local mini-cluster execution mode for development and testing
   */
  val LOCAL: ExecutionMode.Value

  /**
   * Remote cluster connection mode for existing Flink clusters
   */
  val REMOTE: ExecutionMode.Value

  /**
   * YARN cluster execution mode for Hadoop environments
   */
  val YARN: ExecutionMode.Value
}
```
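
Because `ExecutionMode` is a plain Scala `Enumeration`, a mode can be looked up from user input with the standard `values` set. The sketch below uses a local stand-in for the enumeration so that it runs without Flink on the classpath; `ExecutionModeSketch` and `parseMode` are hypothetical names for illustration, not part of the shell's API.

```scala
object ExecutionModeSketch {
  // Stand-in for FlinkShell.ExecutionMode (assumption: same four values).
  // Names are passed explicitly so toString does not depend on reflection.
  object ExecutionMode extends Enumeration {
    val UNDEFINED = Value("UNDEFINED")
    val LOCAL = Value("LOCAL")
    val REMOTE = Value("REMOTE")
    val YARN = Value("YARN")
  }

  // Hypothetical helper: map a user-supplied string to a mode,
  // falling back to UNDEFINED for anything unrecognized.
  def parseMode(raw: String): ExecutionMode.Value =
    ExecutionMode.values
      .find(_.toString.equalsIgnoreCase(raw.trim))
      .getOrElse(ExecutionMode.UNDEFINED)
}
```

Falling back to `UNDEFINED` rather than throwing keeps the decision about how to report an unknown mode with the caller.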

### YARN Configuration

Specialized configuration for YARN cluster deployments with resource management options.

```scala { .api }
/**
 * YARN-specific configuration for cluster resource allocation and job management
 * @param jobManagerMemory Optional memory allocation for JobManager container (e.g., "1024m", "2g")
 * @param name Optional custom application name displayed in YARN UI
 * @param queue Optional YARN queue for job submission and resource isolation
 * @param slots Optional number of task slots per TaskManager for parallelism control
 * @param taskManagerMemory Optional memory allocation per TaskManager container
 */
case class YarnConfig(
  jobManagerMemory: Option[String] = None,
  name: Option[String] = None,
  queue: Option[String] = None,
  slots: Option[Int] = None,
  taskManagerMemory: Option[String] = None
)
```

**Usage Examples:**

```scala
// Basic YARN configuration
val basicYarn = YarnConfig(
  jobManagerMemory = Some("1024m"),
  taskManagerMemory = Some("2048m")
)

// Advanced YARN configuration with resource management
val advancedYarn = YarnConfig(
  jobManagerMemory = Some("2g"),
  taskManagerMemory = Some("4g"),
  name = Some("Flink Scala Shell - Data Analysis"),
  queue = Some("analytics"),
  slots = Some(8)
)
```

### Connection Management

Handles cluster connection setup and configuration resolution based on execution mode.

```scala { .api }
/**
 * Fetches connection information and sets up cluster configuration
 * @param config Shell configuration with execution mode and parameters
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and optional cluster client
 */
@Internal
def fetchConnectionInfo(
  config: Config,
  flinkConfig: Configuration
): (Configuration, Option[ClusterClient[_]])
```
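
The resolution by execution mode can be pictured as a single pattern match that hands off to one helper per mode. The following is a sketch under stated assumptions, not the shell's actual implementation: the pairing of modes with helpers is inferred from the helper names documented in the sections below, `Config` and `ExecutionMode` are reduced local stand-ins, and `helperFor` is a hypothetical function that only reports which helper would be chosen.

```scala
object ConnectionDispatchSketch {
  // Stand-ins so the sketch runs without Flink on the classpath.
  object ExecutionMode extends Enumeration {
    val UNDEFINED = Value("UNDEFINED")
    val LOCAL = Value("LOCAL")
    val REMOTE = Value("REMOTE")
    val YARN = Value("YARN")
  }

  final case class Config(
      host: Option[String] = None,
      port: Option[Int] = None,
      executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED)

  // Hypothetical: returns the name of the documented helper assumed to
  // handle each mode, or an error message when no mode was chosen.
  def helperFor(config: Config): Either[String, String] =
    config.executionMode match {
      case ExecutionMode.LOCAL  => Right("createLocalClusterAndConfig")
      case ExecutionMode.REMOTE => Right("createRemoteConfig")
      case ExecutionMode.YARN   => Right("createYarnClusterIfNeededAndGetConfig")
      case _ =>
        Left("please specify execution mode: [local | remote <host> <port> | yarn]")
    }
}
```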

### Local Cluster Configuration

Creates and configures local mini-cluster for development and testing.

```scala { .api }
/**
 * Creates local mini-cluster configuration and client
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and cluster client
 */
private def createLocalClusterAndConfig(
  flinkConfig: Configuration
): (Configuration, Some[MiniClusterClient])

/**
 * Creates local mini-cluster instance with specified configuration
 * @param flinkConfig Configuration for cluster setup
 * @return Started MiniCluster instance
 */
private def createLocalCluster(flinkConfig: Configuration): MiniCluster
```

**Local Cluster Features:**

- Automatic port allocation (uses port 0 for dynamic assignment)
- JobManager and TaskManager co-location
- Configurable task slots and parallelism
- Immediate cluster startup and connection
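
Port 0 is the general operating-system convention for "pick any free port", not something specific to Flink. A minimal sketch of that mechanism using only `java.net.ServerSocket` from the JDK (no Flink classes are involved, and `bindToFreePort` is a hypothetical helper name):

```scala
import java.net.ServerSocket

object EphemeralPortSketch {
  // Binding to port 0 asks the OS for any free port; the concrete
  // port number is only known once the socket has been bound.
  def bindToFreePort(): Int = {
    val socket = new ServerSocket(0)
    try socket.getLocalPort
    finally socket.close()
  }
}
```

This is why a local cluster started this way has no fixed port to configure in advance: the assigned port has to be read back after startup.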

### Remote Cluster Configuration

Configures connection to existing remote Flink clusters.

```scala { .api }
/**
 * Creates configuration for remote cluster connection
 * @param config Shell configuration with host and port
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and None (no local client)
 */
private def createRemoteConfig(
  config: Config,
  flinkConfig: Configuration
): (Configuration, None.type)

/**
 * Sets JobManager connection information in configuration
 * @param config Configuration to modify
 * @param host JobManager host address
 * @param port JobManager port number
 */
private def setJobManagerInfoToConfig(
  config: Configuration,
  host: String,
  port: Integer
): Unit
```

**Remote Configuration Requirements:**

- Valid host address (IP or hostname)
- Accessible JobManager port
- Network connectivity to Flink cluster
- Compatible Flink version

### YARN Configuration Management

Handles YARN cluster deployment and connection configuration.

```scala { .api }
/**
 * Creates or connects to YARN cluster based on configuration
 * @param config Shell configuration with YARN settings
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and optional cluster client
 */
private def createYarnClusterIfNeededAndGetConfig(
  config: Config,
  flinkConfig: Configuration
): (Configuration, Option[ClusterClient[_]])

/**
 * Deploys new YARN cluster with specified configuration
 * @param config Shell configuration
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and cluster client
 */
private def deployNewYarnCluster(
  config: Config,
  flinkConfig: Configuration
): (Configuration, Some[ClusterClient[_]])

/**
 * Fetches configuration information from existing YARN cluster
 * @param config Shell configuration
 * @param flinkConfig Base Flink configuration
 * @param mode Execution mode string ("yarn-cluster", "default")
 * @return Tuple of effective configuration and None (no local client)
 */
private def fetchDeployedYarnClusterInfo(
  config: Config,
  flinkConfig: Configuration,
  mode: String
): (Configuration, None.type)

/**
 * Ensures YarnConfig exists, creating default if necessary
 * @param config Shell configuration
 * @return YarnConfig instance (existing or default)
 */
@Internal
def ensureYarnConfig(config: Config): YarnConfig
```
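
The "existing or default" contract of `ensureYarnConfig` maps naturally onto `Option.getOrElse`. The sketch below shows that behaviour with local stand-ins for `Config` and `YarnConfig` (reduced to the fields needed here); it illustrates the documented contract and is not claimed to be the shell's own code.

```scala
object EnsureYarnConfigSketch {
  // Stand-ins mirroring the documented case classes, so the sketch
  // runs without Flink on the classpath.
  final case class YarnConfig(
      jobManagerMemory: Option[String] = None,
      name: Option[String] = None,
      queue: Option[String] = None,
      slots: Option[Int] = None,
      taskManagerMemory: Option[String] = None)

  final case class Config(yarnConfig: Option[YarnConfig] = None)

  // Return the existing YarnConfig, or an all-defaults instance if none is set.
  def ensureYarnConfig(config: Config): YarnConfig =
    config.yarnConfig.getOrElse(YarnConfig())
}
```

Callers can then update individual YARN fields with `copy` without first checking whether a `YarnConfig` is present.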

### Command-Line Argument Parsing

Converts configuration objects to command-line arguments for cluster deployment.

```scala { .api }
/**
 * Converts configuration to command-line argument array for cluster deployment
 * @param config Shell configuration
 * @param mode Execution mode string ("local", "remote", "yarn-cluster", "default")
 * @return Array of command-line arguments
 */
def parseArgList(config: Config, mode: String): Array[String]
```

**Generated Arguments Examples:**

```scala
// YARN mode arguments, for a Config whose YarnConfig sets
// jobManagerMemory = Some("1024m"), taskManagerMemory = Some("2048m"), name = Some("MyApp")
parseArgList(yarnConfig, "yarn-cluster")
// Returns: Array("-m", "yarn-cluster", "-yjm", "1024m", "-ytm", "2048m", "-ynm", "MyApp")

// Default mode with no YARN settings (no specific arguments)
val defaultConfig = Config()
parseArgList(defaultConfig, "default")
// Returns: Array()
```
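
One way such a conversion could be written is to append a flag and value pair for each `Option` that is set. This is a sketch consistent with the example output above, not the shell's actual implementation: `toArgs` is a hypothetical name, the case classes are local stand-ins, and only the flags that appear in the example (`-m`, `-yjm`, `-ytm`, `-ynm`) are covered, so `queue` and `slots` are left out.

```scala
import scala.collection.mutable.ArrayBuffer

object ArgListSketch {
  // Stand-ins limited to the fields this sketch translates.
  final case class YarnConfig(
      jobManagerMemory: Option[String] = None,
      name: Option[String] = None,
      taskManagerMemory: Option[String] = None)

  final case class Config(yarnConfig: Option[YarnConfig] = None)

  def toArgs(config: Config, mode: String): Array[String] = {
    val args = ArrayBuffer.empty[String]
    // "default" contributes no "-m" flag; any other mode is passed through.
    if (mode != "default") args ++= Seq("-m", mode)
    // Each field that is set contributes one flag followed by its value.
    config.yarnConfig.foreach { yc =>
      yc.jobManagerMemory.foreach(v => args ++= Seq("-yjm", v))
      yc.taskManagerMemory.foreach(v => args ++= Seq("-ytm", v))
      yc.name.foreach(v => args ++= Seq("-ynm", v))
    }
    args.toArray
  }
}
```

Using `Option.foreach` means unset fields simply produce no arguments, which is why an all-defaults configuration in `"default"` mode yields an empty array.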

### Configuration Directory Resolution

Manages Flink configuration directory discovery and loading.

```scala { .api }
/**
 * Resolves configuration directory from multiple sources
 * Priority: command-line option > environment variable > default
 * @param config Shell configuration with optional configDir
 * @return Resolved configuration directory path
 */
private def getConfigDir(config: Config): String

/**
 * Loads global Flink configuration from resolved directory
 * @param config Shell configuration
 * @return Loaded Configuration instance with merged settings
 */
private def getGlobalConfig(config: Config): Configuration
```

**Configuration Resolution Order:**

1. `--configDir` command-line option (highest priority)
2. `FLINK_CONF_DIR` environment variable
3. `CliFrontend.getConfigurationDirectoryFromEnv()` (Flink default)
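
The priority order above amounts to a chain of fallbacks over `Option` values. A minimal sketch, assuming the environment is passed in as a `Map` so the logic can be exercised in isolation; `resolveConfigDir` is a hypothetical helper, and `fallback` stands in for whatever `CliFrontend.getConfigurationDirectoryFromEnv()` would return:

```scala
object ConfigDirSketch {
  // Hypothetical helper illustrating the documented priority:
  // command-line option, then FLINK_CONF_DIR, then the Flink default.
  def resolveConfigDir(
      cliOption: Option[String],
      env: Map[String, String],
      fallback: => String): String =
    cliOption
      .orElse(env.get("FLINK_CONF_DIR"))
      .getOrElse(fallback) // by-name: evaluated only if both sources are empty
}
```

In real use the environment argument would be `sys.env`, for example `ConfigDirSketch.resolveConfigDir(config.configDir, sys.env, defaultDir)` with `defaultDir` supplied by the caller.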

### Error Handling and Validation

The shell validates the configuration before connecting and fails fast with an `IllegalArgumentException` when required settings are missing.

**Common Configuration Errors:**

- Missing host/port for remote mode
- Invalid YARN configuration parameters
- Inaccessible configuration directories
- Network connectivity issues
- Incompatible Flink versions

**Error Handling Patterns:**

```scala
// `config` is the shell Config being validated

// Remote mode validation
if (config.host.isEmpty || config.port.isEmpty) {
  throw new IllegalArgumentException("<host> or <port> is not specified!")
}

// Execution mode validation
config.executionMode match {
  case ExecutionMode.UNDEFINED =>
    throw new IllegalArgumentException("please specify execution mode: [local | remote <host> <port> | yarn]")
  case _ => // Continue with valid mode
}
```

## Configuration Examples

### Development Setup (Local)

```scala
val devConfig = Config(
  executionMode = ExecutionMode.LOCAL,
  externalJars = Some(Array("/path/to/test-data.jar")),
  configDir = Some("/opt/flink/conf")
)
```

### Production Remote Cluster

```scala
val prodConfig = Config(
  host = Some("prod-flink-cluster.company.com"),
  port = Some(8081),
  executionMode = ExecutionMode.REMOTE,
  externalJars = Some(Array(
    "/shared/libs/company-commons.jar",
    "/shared/libs/data-connectors.jar"
  ))
)
```

### YARN Analytics Environment

```scala
val analyticsConfig = Config(
  executionMode = ExecutionMode.YARN,
  yarnConfig = Some(YarnConfig(
    jobManagerMemory = Some("4g"),
    taskManagerMemory = Some("8g"),
    name = Some("Analytics Shell - User Research"),
    queue = Some("analytics-queue"),
    slots = Some(16)
  )),
  externalJars = Some(Array("/analytics/libs/ml-algorithms.jar"))
)
```