or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, connectors.md, event-time-watermarks.md, execution-jobs.md, functions-and-operators.md, index.md, state-management.md, type-system-serialization.md, utilities.md

configuration.mddocs/

0

# Configuration System

1

2

Apache Flink's configuration system provides a flexible and type-safe way to manage application and cluster settings. The system supports default values, validation, documentation, and seamless integration with the execution environment.

3

4

## Configuration Basics

5

6

### Configuration Objects

7

8

The core configuration classes for reading and writing configuration values.

9

10

```java { .api }

11

import org.apache.flink.configuration.Configuration;

12

import org.apache.flink.configuration.ConfigOption;

13

import org.apache.flink.configuration.ConfigOptions;

14

15

// Basic configuration usage

16

public class ConfigurationBasics {

17

18

public static void basicConfigurationExample() {

19

Configuration config = new Configuration();

20

21

// Set basic values

22

config.setString("my.string.key", "hello world");

23

config.setInteger("my.int.key", 42);

24

config.setBoolean("my.boolean.key", true);

25

config.setLong("my.long.key", 1234567890L);

26

config.setDouble("my.double.key", 3.14159);

27

28

// Read values

29

String stringValue = config.getString("my.string.key", "default");

30

int intValue = config.getInteger("my.int.key", 0);

31

boolean boolValue = config.getBoolean("my.boolean.key", false);

32

long longValue = config.getLong("my.long.key", 0L);

33

double doubleValue = config.getDouble("my.double.key", 0.0);

34

35

// Check if key exists

36

boolean hasKey = config.containsKey("my.string.key");

37

38

// Get all keys

39

Set<String> allKeys = config.keySet();

40

41

// Convert to map

42

Map<String, String> configMap = config.toMap();

43

}

44

45

public static void createFromMap() {

46

// Create configuration from map

47

Map<String, String> properties = new HashMap<>();

48

properties.put("parallelism.default", "4");

49

properties.put("taskmanager.memory.process.size", "1024m");

50

51

Configuration config = Configuration.fromMap(properties);

52

}

53

}

54

```

55

56

### ConfigOption System

57

58

Type-safe configuration options with metadata and validation.

59

60

```java { .api }

61

import org.apache.flink.configuration.ConfigOption;

62

import org.apache.flink.configuration.ConfigOptions;

63

import org.apache.flink.configuration.description.Description;

64

65

public class MyConfigOptions {

66

67

// String configuration option

68

public static final ConfigOption<String> DATABASE_URL =

69

ConfigOptions.key("database.url")

70

.stringType()

71

.noDefaultValue()

72

.withDescription("The URL of the database to connect to");

73

74

// Integer option with default value

75

public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =

76

ConfigOptions.key("database.connection.pool.size")

77

.intType()

78

.defaultValue(10)

79

.withDescription("Maximum number of database connections in the pool");

80

81

// Boolean option

82

public static final ConfigOption<Boolean> ENABLE_METRICS =

83

ConfigOptions.key("metrics.enabled")

84

.booleanType()

85

.defaultValue(true)

86

.withDescription("Whether to enable metrics collection");

87

88

// Duration option

89

public static final ConfigOption<Duration> TIMEOUT =

90

ConfigOptions.key("request.timeout")

91

.durationType()

92

.defaultValue(Duration.ofSeconds(30))

93

.withDescription("Timeout for requests to external systems");

94

95

// Memory size option

96

public static final ConfigOption<MemorySize> BUFFER_SIZE =

97

ConfigOptions.key("buffer.size")

98

.memoryType()

99

.defaultValue(MemorySize.ofMebiBytes(64))

100

.withDescription("Size of the internal buffer");

101

102

// Enum option

103

public static final ConfigOption<CompressionType> COMPRESSION =

104

ConfigOptions.key("compression.type")

105

.enumType(CompressionType.class)

106

.defaultValue(CompressionType.GZIP)

107

.withDescription("Compression algorithm to use");

108

109

// List option

110

public static final ConfigOption<List<String>> ALLOWED_HOSTS =

111

ConfigOptions.key("security.allowed.hosts")

112

.stringType()

113

.asList()

114

.defaultValues("localhost", "127.0.0.1")

115

.withDescription("List of allowed host names");

116

117

// Map option

118

public static final ConfigOption<Map<String, String>> CUSTOM_PROPERTIES =

119

ConfigOptions.key("custom.properties")

120

.mapType()

121

.defaultValue(Collections.emptyMap())

122

.withDescription("Custom key-value properties");

123

}

124

125

// Using config options

126

public class ConfigOptionUsage {

127

128

public static void useConfigOptions() {

129

Configuration config = new Configuration();

130

131

// Set values using config options

132

config.set(MyConfigOptions.DATABASE_URL, "jdbc:postgresql://localhost:5432/mydb");

133

config.set(MyConfigOptions.CONNECTION_POOL_SIZE, 20);

134

config.set(MyConfigOptions.ENABLE_METRICS, false);

135

136

// Read values using config options

137

String dbUrl = config.get(MyConfigOptions.DATABASE_URL);

138

int poolSize = config.get(MyConfigOptions.CONNECTION_POOL_SIZE);

139

boolean metricsEnabled = config.get(MyConfigOptions.ENABLE_METRICS);

140

141

// Use optional for potentially missing values

142

Optional<String> optionalUrl = config.getOptional(MyConfigOptions.DATABASE_URL);

143

144

// Check if option has been set

145

boolean hasUrl = config.contains(MyConfigOptions.DATABASE_URL);

146

}

147

}

148

```

149

150

### Advanced Configuration Options

151

152

```java { .api }

153

// Complex configuration options with validation and fallbacks

154

public class AdvancedConfigOptions {

155

156

// Option with deprecated keys

157

public static final ConfigOption<Integer> PARALLELISM =

158

ConfigOptions.key("parallelism.default")

159

.intType()

160

.defaultValue(1)

161

.withDeprecatedKeys("env.parallelism", "taskmanager.parallelism")

162

.withDescription("The default parallelism for operators");

163

164

// Option with fallback keys

165

public static final ConfigOption<String> CHECKPOINT_DIR =

166

ConfigOptions.key("state.checkpoints.dir")

167

.stringType()

168

.noDefaultValue()

169

.withFallbackKeys("state.backend.fs.checkpointdir")

170

.withDescription("Directory for storing checkpoints");

171

172

// Option with rich description

173

public static final ConfigOption<Duration> CHECKPOINT_INTERVAL =

174

ConfigOptions.key("execution.checkpointing.interval")

175

.durationType()

176

.noDefaultValue()

177

.withDescription(

178

Description.builder()

179

.text("Interval between consecutive checkpoints. ")

180

.text("Setting this value enables checkpointing. ")

181

.linebreak()

182

.text("Example: 10s, 5min, 1h")

183

.build()

184

);

185

186

// Option with validation

187

public static final ConfigOption<Integer> NETWORK_BUFFERS =

188

ConfigOptions.key("taskmanager.network.numberOfBuffers")

189

.intType()

190

.defaultValue(2048)

191

.withDescription("Number of network buffers available to each TaskManager")

192

.withValidator(value -> {

193

if (value < 1) {

194

throw new IllegalArgumentException("Number of buffers must be positive");

195

}

196

if (value > 100000) {

197

throw new IllegalArgumentException("Number of buffers too large (max: 100000)");

198

}

199

});

200

}

201

```

202

203

## Built-in Configuration Options

204

205

### Core Options

206

207

```java { .api }

208

import org.apache.flink.configuration.CoreOptions;

209

import org.apache.flink.configuration.CheckpointingOptions;

210

import org.apache.flink.configuration.TaskManagerOptions;

211

212

public class BuiltInOptions {

213

214

public static void coreConfigurationExample() {

215

Configuration config = new Configuration();

216

217

// Core execution options

218

config.set(CoreOptions.DEFAULT_PARALLELISM, 4);

219

config.set(CoreOptions.TMP_DIRS, "/tmp/flink");

220

config.set(CoreOptions.FLINK_SHUTDOWN_TIMEOUT, Duration.ofMinutes(1));

221

222

// Checkpointing options

223

config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));

224

config.set(CheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofMinutes(10));

225

config.set(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);

226

config.set(CheckpointingOptions.CHECKPOINT_STORAGE_ACCESS_ENV_VAR, "CHECKPOINT_STORAGE");

227

228

// TaskManager options

229

config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1024));

230

config.set(TaskManagerOptions.NUM_TASK_SLOTS, 2);

231

config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.ofMebiBytes(512));

232

}

233

234

public static void readBuiltInOptions() {

235

Configuration config = new Configuration();

236

237

// Read core options

238

int parallelism = config.get(CoreOptions.DEFAULT_PARALLELISM);

239

String tmpDirs = config.get(CoreOptions.TMP_DIRS);

240

241

// Read checkpointing options

242

Optional<Duration> checkpointInterval =

243

config.getOptional(CheckpointingOptions.CHECKPOINTING_INTERVAL);

244

245

if (checkpointInterval.isPresent()) {

246

System.out.println("Checkpointing enabled with interval: " +

247

checkpointInterval.get());

248

}

249

}

250

}

251

```

252

253

### Memory Configuration

254

255

```java { .api }

256

import org.apache.flink.configuration.MemorySize;

257

import org.apache.flink.configuration.TaskManagerOptions;

258

259

public class MemoryConfiguration {

260

261

public static void configureTaskManagerMemory() {

262

Configuration config = new Configuration();

263

264

// Total process memory

265

config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("2g"));

266

267

// Or configure individual memory components

268

config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.ofMebiBytes(1024));

269

config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(512));

270

config.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.ofMebiBytes(128));

271

config.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(128));

272

config.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(64));

273

274

// Network memory

275

config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(64));

276

config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.ofMebiBytes(1024));

277

config.set(TaskManagerOptions.NETWORK_MEMORY_FRACTION, 0.1f);

278

}

279

280

public static void memoryUtilities() {

281

// Creating memory sizes

282

MemorySize size1 = MemorySize.ofBytes(1024);

283

MemorySize size2 = MemorySize.ofMebiBytes(64);

284

MemorySize size3 = MemorySize.parse("1gb");

285

MemorySize size4 = MemorySize.parse("512mb");

286

287

// Memory arithmetic

288

MemorySize total = size2.add(size3);

289

MemorySize difference = size3.subtract(size2);

290

MemorySize scaled = size2.multiply(2);

291

292

// Conversions

293

long bytes = size3.getBytes();

294

long kiloBytes = size3.getKibiBytes();

295

long megaBytes = size3.getMebiBytes();

296

297

// Formatting

298

String humanReadable = size3.toHumanReadableString(); // "1 gb"

299

}

300

}

301

```

302

303

## Configuration Validation and Utilities

304

305

### Configuration Validation

306

307

```java { .api }

308

public class ConfigurationValidation {

309

310

// Custom validator function

311

public static final ConfigOption<Integer> PORT =

312

ConfigOptions.key("server.port")

313

.intType()

314

.defaultValue(8080)

315

.withDescription("Server port number")

316

.withValidator(ConfigurationValidation::validatePort);

317

318

private static void validatePort(Integer port) {

319

if (port < 1 || port > 65535) {

320

throw new IllegalArgumentException(

321

"Port must be between 1 and 65535, got: " + port);

322

}

323

if (port < 1024) {

324

System.out.println("Warning: Using privileged port " + port);

325

}

326

}

327

328

// Validate configuration object

329

public static void validateConfiguration(Configuration config) {

330

// Check required options

331

if (!config.contains(MyConfigOptions.DATABASE_URL)) {

332

throw new IllegalArgumentException("Database URL is required");

333

}

334

335

// Validate option combinations

336

boolean metricsEnabled = config.get(MyConfigOptions.ENABLE_METRICS);

337

if (metricsEnabled && !config.contains(MyConfigOptions.CUSTOM_PROPERTIES)) {

338

System.out.println("Warning: Metrics enabled but no custom properties set");

339

}

340

341

// Validate memory settings

342

MemorySize heapMemory = config.get(TaskManagerOptions.TASK_HEAP_MEMORY);

343

MemorySize totalMemory = config.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY);

344

345

if (heapMemory.compareTo(totalMemory) > 0) {

346

throw new IllegalArgumentException(

347

"Heap memory cannot exceed total process memory");

348

}

349

}

350

}

351

```

352

353

### Configuration Utilities

354

355

```java { .api }

356

import org.apache.flink.configuration.ConfigUtils;

357

import org.apache.flink.configuration.ConfigurationUtils;

358

359

public class ConfigurationUtilities {

360

361

public static void configurationUtilsExample() {

362

Configuration config = new Configuration();

363

364

// Parse string to map

365

String propertiesString = "key1:value1,key2:value2";

366

Map<String, String> parsed = ConfigurationUtils.parseStringToMap(

367

propertiesString, ",", ":");

368

369

// Encode/decode arrays and collections

370

List<String> hosts = Arrays.asList("host1", "host2", "host3");

371

String encoded = ConfigUtils.encodeCollectionToConfig(

372

config, "allowed.hosts", hosts, Object::toString);

373

374

List<String> decoded = ConfigUtils.decodeListFromConfig(

375

config, "allowed.hosts", String::valueOf);

376

377

// Parse temporary directories

378

String tempDirsConfig = "/tmp1,/tmp2,/tmp3";

379

String[] tempDirs = ConfigurationUtils.parseTempDirectories(tempDirsConfig);

380

String randomTempDir = ConfigurationUtils.getRandomTempDirectory(tempDirs);

381

}

382

383

public static void workingWithMaps() {

384

Configuration config = new Configuration();

385

386

// Set map values

387

Map<String, String> properties = new HashMap<>();

388

properties.put("timeout", "30s");

389

properties.put("retries", "3");

390

properties.put("compression", "gzip");

391

392

config.set(MyConfigOptions.CUSTOM_PROPERTIES, properties);

393

394

// Read map values

395

Map<String, String> readProperties = config.get(MyConfigOptions.CUSTOM_PROPERTIES);

396

397

// Add individual map entries

398

config.setString("custom.properties.max-connections", "100");

399

config.setString("custom.properties.buffer-size", "64kb");

400

}

401

}

402

```

403

404

## Dynamic Configuration

405

406

### Runtime Configuration Updates

407

408

```java { .api }

409

public class DynamicConfiguration {

410

411

// Configuration that can be updated at runtime

412

private volatile Configuration currentConfig;

413

private final Object configLock = new Object();

414

415

public DynamicConfiguration(Configuration initialConfig) {

416

this.currentConfig = new Configuration(initialConfig);

417

}

418

419

public void updateConfiguration(Configuration newConfig) {

420

synchronized (configLock) {

421

// Validate new configuration

422

validateConfiguration(newConfig);

423

424

// Apply updates

425

this.currentConfig = new Configuration(newConfig);

426

427

// Notify components of configuration change

428

notifyConfigurationChange();

429

}

430

}

431

432

public <T> T getConfigValue(ConfigOption<T> option) {

433

synchronized (configLock) {

434

return currentConfig.get(option);

435

}

436

}

437

438

public Configuration getSnapshot() {

439

synchronized (configLock) {

440

return new Configuration(currentConfig);

441

}

442

}

443

444

private void validateConfiguration(Configuration config) {

445

// Validation logic

446

}

447

448

private void notifyConfigurationChange() {

449

// Notify listeners about configuration changes

450

}

451

}

452

```

453

454

### Configuration Providers

455

456

```java { .api }

457

// Configuration provider interface

458

public interface ConfigurationProvider {

459

Configuration getConfiguration();

460

void addListener(ConfigurationListener listener);

461

void removeListener(ConfigurationListener listener);

462

}

463

464

public interface ConfigurationListener {

465

void onConfigurationChanged(Configuration newConfig);

466

}

467

468

// File-based configuration provider

469

public class FileConfigurationProvider implements ConfigurationProvider {

470

private final Path configFile;

471

private final List<ConfigurationListener> listeners;

472

private Configuration currentConfig;

473

private final ScheduledExecutorService watcherService;

474

475

public FileConfigurationProvider(Path configFile) {

476

this.configFile = configFile;

477

this.listeners = new CopyOnWriteArrayList<>();

478

this.currentConfig = loadConfiguration();

479

this.watcherService = Executors.newSingleThreadScheduledExecutor();

480

481

// Watch for file changes

482

watcherService.scheduleWithFixedDelay(this::checkForUpdates, 5, 5, TimeUnit.SECONDS);

483

}

484

485

@Override

486

public Configuration getConfiguration() {

487

return new Configuration(currentConfig);

488

}

489

490

@Override

491

public void addListener(ConfigurationListener listener) {

492

listeners.add(listener);

493

}

494

495

@Override

496

public void removeListener(ConfigurationListener listener) {

497

listeners.remove(listener);

498

}

499

500

private Configuration loadConfiguration() {

501

try {

502

Properties props = new Properties();

503

props.load(Files.newBufferedReader(configFile));

504

505

Configuration config = new Configuration();

506

for (String key : props.stringPropertyNames()) {

507

config.setString(key, props.getProperty(key));

508

}

509

510

return config;

511

} catch (IOException e) {

512

throw new RuntimeException("Failed to load configuration from " + configFile, e);

513

}

514

}

515

516

private void checkForUpdates() {

517

try {

518

Configuration newConfig = loadConfiguration();

519

if (!newConfig.equals(currentConfig)) {

520

currentConfig = newConfig;

521

notifyListeners(newConfig);

522

}

523

} catch (Exception e) {

524

System.err.println("Error checking for configuration updates: " + e.getMessage());

525

}

526

}

527

528

private void notifyListeners(Configuration newConfig) {

529

for (ConfigurationListener listener : listeners) {

530

try {

531

listener.onConfigurationChanged(newConfig);

532

} catch (Exception e) {

533

System.err.println("Error notifying configuration listener: " + e.getMessage());

534

}

535

}

536

}

537

}

538

```

539

540

## Configuration Best Practices

541

542

### Configuration Management Patterns

543

544

```java { .api }

545

// Configuration holder with lazy initialization

546

public class ApplicationConfig {

547

private static final ApplicationConfig INSTANCE = new ApplicationConfig();

548

549

// Configuration options

550

public static final ConfigOption<String> APP_NAME =

551

ConfigOptions.key("app.name")

552

.stringType()

553

.defaultValue("MyFlinkApp")

554

.withDescription("Application name");

555

556

public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =

557

ConfigOptions.key("app.heartbeat.interval")

558

.durationType()

559

.defaultValue(Duration.ofSeconds(30))

560

.withDescription("Heartbeat interval for health checks");

561

562

private final Configuration config;

563

564

private ApplicationConfig() {

565

this.config = loadConfiguration();

566

}

567

568

public static ApplicationConfig getInstance() {

569

return INSTANCE;

570

}

571

572

public Configuration getConfiguration() {

573

return new Configuration(config);

574

}

575

576

public <T> T get(ConfigOption<T> option) {

577

return config.get(option);

578

}

579

580

private Configuration loadConfiguration() {

581

Configuration config = new Configuration();

582

583

// Load from system properties

584

System.getProperties().forEach((key, value) -> {

585

if (key.toString().startsWith("app.")) {

586

config.setString(key.toString(), value.toString());

587

}

588

});

589

590

// Load from environment variables

591

System.getenv().forEach((key, value) -> {

592

if (key.startsWith("FLINK_")) {

593

String configKey = key.toLowerCase().replace("_", ".");

594

config.setString(configKey, value);

595

}

596

});

597

598

// Load from configuration file

599

loadFromFile(config);

600

601

return config;

602

}

603

604

private void loadFromFile(Configuration config) {

605

// Load from application.properties or flink-conf.yaml

606

try {

607

Path configPath = Paths.get("conf/application.properties");

608

if (Files.exists(configPath)) {

609

Properties props = new Properties();

610

props.load(Files.newBufferedReader(configPath));

611

612

props.forEach((key, value) ->

613

config.setString(key.toString(), value.toString()));

614

}

615

} catch (IOException e) {

616

System.err.println("Could not load configuration file: " + e.getMessage());

617

}

618

}

619

}

620

621

// Configuration builder pattern

622

public class ConfigurationBuilder {

623

private final Configuration config = new Configuration();

624

625

public static ConfigurationBuilder create() {

626

return new ConfigurationBuilder();

627

}

628

629

public ConfigurationBuilder withParallelism(int parallelism) {

630

config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);

631

return this;

632

}

633

634

public ConfigurationBuilder withCheckpointing(Duration interval) {

635

config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, interval);

636

return this;

637

}

638

639

public ConfigurationBuilder withMemory(MemorySize totalMemory) {

640

config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemory);

641

return this;

642

}

643

644

public <T> ConfigurationBuilder with(ConfigOption<T> option, T value) {

645

config.set(option, value);

646

return this;

647

}

648

649

public Configuration build() {

650

return new Configuration(config);

651

}

652

}

653

654

// Usage example

655

Configuration config = ConfigurationBuilder.create()

656

.withParallelism(8)

657

.withCheckpointing(Duration.ofMinutes(2))

658

.withMemory(MemorySize.parse("4g"))

659

.with(MyConfigOptions.DATABASE_URL, "jdbc:postgresql://localhost/db")

660

.with(MyConfigOptions.ENABLE_METRICS, true)

661

.build();

662

```

663

664

### Environment-Specific Configuration

665

666

```java { .api }

667

public class EnvironmentConfiguration {

668

669

public enum Environment {

670

DEVELOPMENT, TESTING, PRODUCTION

671

}

672

673

public static Configuration createConfiguration(Environment env) {

674

Configuration baseConfig = createBaseConfiguration();

675

676

switch (env) {

677

case DEVELOPMENT:

678

return applyDevelopmentOverrides(baseConfig);

679

case TESTING:

680

return applyTestingOverrides(baseConfig);

681

case PRODUCTION:

682

return applyProductionOverrides(baseConfig);

683

default:

684

return baseConfig;

685

}

686

}

687

688

private static Configuration createBaseConfiguration() {

689

return ConfigurationBuilder.create()

690

.withParallelism(1)

691

.with(MyConfigOptions.ENABLE_METRICS, true)

692

.with(CoreOptions.TMP_DIRS, "/tmp/flink")

693

.build();

694

}

695

696

private static Configuration applyDevelopmentOverrides(Configuration base) {

697

Configuration config = new Configuration(base);

698

config.set(CoreOptions.DEFAULT_PARALLELISM, 2);

699

config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));

700

config.setString("logging.level", "DEBUG");

701

return config;

702

}

703

704

private static Configuration applyTestingOverrides(Configuration base) {

705

Configuration config = new Configuration(base);

706

config.set(CoreOptions.DEFAULT_PARALLELISM, 1);

707

config.set(MyConfigOptions.DATABASE_URL, "jdbc:h2:mem:testdb");

708

config.setString("logging.level", "INFO");

709

return config;

710

}

711

712

private static Configuration applyProductionOverrides(Configuration base) {

713

Configuration config = new Configuration(base);

714

config.set(CoreOptions.DEFAULT_PARALLELISM, 16);

715

config.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));

716

config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("8g"));

717

config.setString("logging.level", "WARN");

718

return config;

719

}

720

}

721

```

722

723

Apache Flink's configuration system provides powerful capabilities for managing application settings in a type-safe, validated, and flexible manner. By leveraging ConfigOptions, proper validation, and configuration management patterns, you can build maintainable and robust Flink applications that adapt to different deployment environments and requirements.