or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdentry-points.mdhigh-availability.mdindex.mdresource-management.mdtask-scheduling.mdutilities.md

utilities.mddocs/

0

# Utilities and Helpers

1

2

Collection of utility classes for Mesos integration, including artifact distribution, resource management, configuration helpers, and Mesos protobuf utilities. These utilities provide essential support functions for all aspects of Flink-Mesos integration.

3

4

## Capabilities

5

6

### Artifact Distribution

7

8

HTTP server and resolver interfaces for distributing job artifacts to Mesos tasks via Mesos Fetcher integration.

9

10

```java { .api }

11

/**

12

* Interface for resolving artifact URIs for Mesos Fetcher

13

* Provides URI resolution for files that need to be distributed to tasks

14

*/

15

public interface MesosArtifactResolver {

16

/**

17

* Resolve artifact URI for the given remote file path

18

* @param remoteFile - Path to file as it should appear in task container

19

* @return Optional URL where file can be fetched, empty if not found

20

*/

21

Option<URL> resolve(Path remoteFile);

22

}

23

24

/**

25

* HTTP server for distributing artifacts to Mesos tasks via Mesos Fetcher

26

* Extends resolver interface with server lifecycle management

27

*/

28

public interface MesosArtifactServer extends MesosArtifactResolver {

29

/**

30

* Add a local file path for distribution to tasks

31

* @param path - Local file system path to serve

32

* @param remoteFile - Path as it should appear in task containers

33

* @return URL where file can be fetched by Mesos Fetcher

34

*/

35

URL addPath(Path path, Path remoteFile);

36

37

/**

38

* Stop the artifact server and cleanup resources

39

* Closes all connections and releases server port

40

*/

41

void stop();

42

}

43

44

/**

45

* HTTP server implementation for artifact distribution

46

* Provides secure, scalable file distribution to Mesos tasks

47

*/

48

public class MesosArtifactServerImpl implements MesosArtifactServer {

49

/**

50

* Create artifact server with configuration

51

* @param hostname - Hostname for server binding

52

* @param port - Port for server (0 for automatic assignment)

53

* @param sslConfig - Optional SSL configuration for secure distribution

54

*/

55

public MesosArtifactServerImpl(String hostname, int port, SSLConfiguration sslConfig);

56

57

/**

58

* Start the artifact server

59

* @return Server URL for artifact access

60

*/

61

public URL start();

62

63

/**

64

* Get the server port (useful when auto-assigned)

65

* @return Actual port number being used

66

*/

67

public int getPort();

68

}

69

```

70

71

**Artifact Server Usage Example:**

72

73

```java

74

import org.apache.flink.mesos.util.MesosArtifactServer;

75

import org.apache.flink.mesos.util.MesosArtifactServerImpl;

76

77

// Create and start artifact server

78

MesosArtifactServerImpl server = new MesosArtifactServerImpl("master-host", 0, null);

79

URL serverUrl = server.start();

80

81

// Add job JAR for distribution

82

Path jobJarPath = Paths.get("/path/to/job.jar");

83

URL jarUrl = server.addPath(jobJarPath, Paths.get("lib/job.jar"));

84

85

// Add configuration files

86

Path configPath = Paths.get("/opt/flink/conf/flink-conf.yaml");

87

URL configUrl = server.addPath(configPath, Paths.get("conf/flink-conf.yaml"));

88

89

// URLs can now be used in Mesos TaskInfo URIs

90

// Mesos Fetcher will download files to task containers

91

```

92

93

### Configuration Utilities

94

95

Comprehensive utilities for creating and managing Mesos-specific configurations, TaskManager parameters, and container specifications.

96

97

```java { .api }

98

/**

99

* Collection of Mesos-related utility methods

100

* Provides configuration creation and management functions

101

*/

102

public class MesosUtils {

103

/**

104

* Create Mesos scheduler configuration from Flink configuration

105

* @param config - Flink configuration with Mesos settings

106

* @param hostname - Hostname for framework registration

107

* @return Configured MesosConfiguration for scheduler

108

*/

109

public static MesosConfiguration createMesosSchedulerConfiguration(Configuration config,

110

String hostname);

111

112

/**

113

* Create TaskManager parameters from configuration

114

* @param config - Flink configuration

115

* @param logger - Logger for parameter validation messages

116

* @return Configured TaskManager parameters for Mesos deployment

117

*/

118

public static MesosTaskManagerParameters createTmParameters(Configuration config,

119

Logger logger);

120

121

/**

122

* Create container specification from configuration

123

* @param config - Flink configuration with container settings

124

* @return Container specification for TaskManager deployment

125

*/

126

public static ContainerSpecification createContainerSpec(Configuration config);

127

128

/**

129

* Apply configuration overlays to container specification

130

* Merges environment variables, volumes, and other container settings

131

* @param config - Flink configuration

132

* @param containerSpec - Base container specification to modify

133

*/

134

public static void applyOverlays(Configuration config,

135

ContainerSpecification containerSpec);

136

137

/**

138

* Load and merge Flink configuration from multiple sources

139

* @param baseConfig - Base configuration

140

* @param logger - Logger for configuration loading messages

141

* @return Merged configuration with all sources applied

142

*/

143

public static Configuration loadConfiguration(Configuration baseConfig, Logger logger);

144

}

145

```

146

147

**Configuration Creation Example:**

148

149

```java

150

import org.apache.flink.configuration.Configuration;

151

import org.apache.flink.mesos.util.MesosUtils;

152

import org.apache.flink.mesos.configuration.MesosOptions;

153

154

// Create base configuration

155

Configuration config = new Configuration();

156

config.setString(MesosOptions.MASTER_URL, "mesos://master:5050");

157

config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME, "flink-cluster");

158

config.setDouble("taskmanager.numberOfTaskSlots", 4.0);

159

config.setString("taskmanager.memory.process.size", "2g");

160

161

// Create Mesos scheduler configuration

162

MesosConfiguration mesosConfig = MesosUtils.createMesosSchedulerConfiguration(

163

config, "cluster-master"

164

);

165

166

// Create TaskManager parameters

167

MesosTaskManagerParameters tmParams = MesosUtils.createTmParameters(

168

config, LoggerFactory.getLogger(MyClass.class)

169

);

170

171

// Create container specification

172

ContainerSpecification containerSpec = MesosUtils.createContainerSpec(config);

173

MesosUtils.applyOverlays(config, containerSpec);

174

```

175

176

### Mesos Protobuf Utilities

177

178

Collection of utility methods for creating and manipulating Mesos protobuf objects including resources, environment variables, and URIs.

179

180

```java { .api }

181

/**

182

* Collection of Mesos protobuf and resource utility methods

183

* Provides helpers for creating Mesos protocol buffer objects

184

*/

185

public class Utils {

186

/**

187

* Create CPU resource specification

188

* @param cpus - Number of CPU cores

189

* @return Mesos Resource for CPU allocation

190

*/

191

public static Protos.Resource cpus(double cpus);

192

193

/**

194

* Create memory resource specification

195

* @param mem - Memory amount in MB

196

* @return Mesos Resource for memory allocation

197

*/

198

public static Protos.Resource mem(double mem);

199

200

/**

201

* Create GPU resource specification

202

* @param gpus - Number of GPU units

203

* @return Mesos Resource for GPU allocation

204

*/

205

public static Protos.Resource gpus(double gpus);

206

207

/**

208

* Create disk resource specification

209

* @param disk - Disk space in MB

210

* @return Mesos Resource for disk allocation

211

*/

212

public static Protos.Resource disk(double disk);

213

214

/**

215

* Create network resource specification

216

* @param bandwidth - Network bandwidth in Mbps

217

* @return Mesos Resource for network allocation

218

*/

219

public static Protos.Resource network(double bandwidth);

220

221

/**

222

* Create port range resource specification

223

* @param begin - Start of port range (inclusive)

224

* @param end - End of port range (inclusive)

225

* @return Mesos Resource for port range allocation

226

*/

227

public static Protos.Resource ports(long begin, long end);

228

229

/**

230

* Create environment variable for Mesos tasks

231

* @param key - Environment variable name

232

* @param value - Environment variable value

233

* @return Mesos Environment.Variable object

234

*/

235

public static Protos.Environment.Variable variable(String key, String value);

236

237

/**

238

* Create URI specification for Mesos Fetcher

239

* @param uri - URI to fetch

240

* @param extract - Whether to extract archives

241

* @param executable - Whether file should be executable

242

* @param cache - Whether to cache the URI

243

* @param outputFile - Optional output filename

244

* @return Mesos CommandInfo.URI object

245

*/

246

public static Protos.CommandInfo.URI uri(String uri,

247

boolean extract,

248

boolean executable,

249

boolean cache,

250

String outputFile);

251

252

/**

253

* Convert range values to string representation

254

* @param ranges - List of Value.Range objects

255

* @return String representation of ranges (e.g., "8000-8010,9000-9010")

256

*/

257

public static String rangeValues(List<Protos.Value.Range> ranges);

258

259

/**

260

* Convert Mesos resource to string representation

261

* @param resource - Mesos Resource object

262

* @return Human-readable string representation

263

*/

264

public static String toString(Protos.Resource resource);

265

}

266

```

267

268

**Protobuf Utilities Example:**

269

270

```java

271

import org.apache.flink.mesos.Utils;

272

import org.apache.mesos.Protos;

273

274

// Create resource specifications

275

Protos.Resource cpuResource = Utils.cpus(2.0);

276

Protos.Resource memResource = Utils.mem(2048.0);

277

Protos.Resource diskResource = Utils.disk(1024.0);

278

Protos.Resource portsResource = Utils.ports(8000, 8010);

279

280

// Create environment variables

281

Protos.Environment.Variable javaHome = Utils.variable("JAVA_HOME", "/usr/lib/jvm/java-8");

282

Protos.Environment.Variable flinkHome = Utils.variable("FLINK_HOME", "/opt/flink");

283

284

// Create URIs for Mesos Fetcher

285

Protos.CommandInfo.URI jobJarUri = Utils.uri(

286

"http://artifact-server:8080/job.jar",

287

false, // don't extract

288

false, // not executable

289

true, // cache

290

"lib/job.jar" // output filename

291

);

292

293

Protos.CommandInfo.URI configUri = Utils.uri(

294

"http://artifact-server:8080/flink-conf.yaml",

295

false, false, true, "conf/flink-conf.yaml"

296

);

297

298

// Use in TaskInfo creation

299

Protos.TaskInfo taskInfo = Protos.TaskInfo.newBuilder()

300

.addAllResources(Arrays.asList(cpuResource, memResource, diskResource, portsResource))

301

.setCommand(Protos.CommandInfo.newBuilder()

302

.addAllUris(Arrays.asList(jobJarUri, configUri))

303

.setEnvironment(Protos.Environment.newBuilder()

304

.addAllVariables(Arrays.asList(javaHome, flinkHome))

305

)

306

)

307

.build();

308

```

309

310

### Resource Allocation Utilities

311

312

Utilities for managing Mesos resource allocations and calculations.

313

314

```java { .api }

315

/**

316

* Represents allocated Mesos resources for a task

317

* Provides resource information and allocation details

318

*/

319

public class MesosResourceAllocation {

320

/**

321

* Create resource allocation from Mesos resources

322

* @param resources - List of allocated Mesos resources

323

*/

324

public MesosResourceAllocation(List<Protos.Resource> resources);

325

326

/**

327

* Get allocated CPU cores

328

* @return Number of CPU cores allocated

329

*/

330

public double cpus();

331

332

/**

333

* Get allocated memory in megabytes

334

* @return Memory allocation in MB

335

*/

336

public double memoryMB();

337

338

/**

339

* Get allocated disk space in megabytes

340

* @return Disk allocation in MB

341

*/

342

public double diskMB();

343

344

/**

345

* Get allocated network bandwidth in Mbps

346

* @return Network bandwidth allocation

347

*/

348

public double networkMbps();

349

350

/**

351

* Get allocated GPU units

352

* @return Number of GPUs allocated

353

*/

354

public double gpus();

355

356

/**

357

* Get allocated port ranges

358

* @return List of allocated port ranges

359

*/

360

public List<Protos.Value.Range> ports();

361

362

/**

363

* Get all allocated Mesos resources

364

* @return Complete list of Mesos Resource objects

365

*/

366

public List<Protos.Resource> mesosResources();

367

368

/**

369

* Check if allocation satisfies resource requirements

370

* @param requirements - Required resource amounts

371

* @return true if allocation meets or exceeds requirements

372

*/

373

public boolean satisfies(ResourceProfile requirements);

374

}

375

```

376

377

## Utility Patterns

378

379

### Configuration Validation

380

381

Comprehensive validation of Mesos configurations:

382

383

```java

384

// Configuration validation utility

385

public class MesosConfigValidator {

386

public static void validateConfiguration(Configuration config) {

387

// Validate required settings

388

if (!config.contains(MesosOptions.MASTER_URL)) {

389

throw new IllegalArgumentException("Mesos master URL is required");

390

}

391

392

// Validate resource requirements

393

double cpus = config.getDouble("mesos.resourcemanager.tasks.cpus", 1.0);

394

if (cpus <= 0) {

395

throw new IllegalArgumentException("CPU requirement must be positive");

396

}

397

398

// Validate framework settings

399

String frameworkName = config.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME);

400

if (frameworkName == null || frameworkName.trim().isEmpty()) {

401

throw new IllegalArgumentException("Framework name cannot be empty");

402

}

403

404

// Validate timeout settings

405

int failoverTimeout = config.getInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS);

406

if (failoverTimeout < 0) {

407

throw new IllegalArgumentException("Failover timeout cannot be negative");

408

}

409

}

410

}

411

```

412

413

### Resource Calculation

414

415

Helper methods for resource requirement calculations:

416

417

```java

418

// Resource calculation utilities

419

public class ResourceCalculator {

420

public static ResourceProfile calculateTaskManagerProfile(Configuration config) {

421

// Calculate memory requirements

422

MemorySize processMemory = TaskExecutorResourceUtils.extractTotalProcessMemoryConfiguration(config);

423

MemorySize managedMemory = TaskExecutorResourceUtils.extractManagedMemoryConfiguration(config);

424

425

// Calculate CPU requirements

426

double cpuCores = config.getDouble("taskmanager.numberOfTaskSlots", 1.0);

427

428

// Calculate disk requirements

429

long diskSize = config.getLong("mesos.resourcemanager.tasks.disk", 1024);

430

431

return ResourceProfile.newBuilder()

432

.setCpuCores(cpuCores)

433

.setTaskHeapMemory(processMemory)

434

.setManagedMemory(managedMemory)

435

.build();

436

}

437

438

public static boolean isResourceSufficient(ResourceProfile required,

439

MesosResourceAllocation available) {

440

return available.cpus() >= required.getCpuCores().getValue() &&

441

available.memoryMB() >= required.getTotalMemory().getMebiBytes() &&

442

available.diskMB() >= 1024; // Minimum disk requirement

443

}

444

}

445

```

446

447

### Container Image Management

448

449

Utilities for Docker container image handling:

450

451

```java

452

// Container image utilities

453

public class ContainerImageUtils {

454

public static String resolveImageName(Configuration config) {

455

String imageName = config.getString("mesos.resourcemanager.tasks.container.docker.image");

456

457

if (imageName == null) {

458

// Default image based on Flink version

459

String flinkVersion = config.getString("flink.version", "1.13.6");

460

String scalaVersion = config.getString("scala.version", "2.11");

461

imageName = String.format("flink:%s-scala_%s", flinkVersion, scalaVersion);

462

}

463

464

return imageName;

465

}

466

467

public static List<String> buildDockerParameters(Configuration config) {

468

List<String> parameters = new ArrayList<>();

469

470

// Network configuration

471

String network = config.getString("mesos.resourcemanager.tasks.container.docker.network", "HOST");

472

parameters.add("--net=" + network);

473

474

// Volume mounts

475

String volumes = config.getString("mesos.resourcemanager.tasks.container.volumes", "");

476

for (String volume : volumes.split(",")) {

477

if (!volume.trim().isEmpty()) {

478

parameters.add("-v");

479

parameters.add(volume.trim());

480

}

481

}

482

483

return parameters;

484

}

485

}

486

```

487

488

## Error Handling

489

490

### Robust Error Handling

491

492

Comprehensive error handling patterns for utility operations:

493

494

- **Configuration errors**: Clear validation messages with suggestions

495

- **Resource allocation failures**: Detailed resource requirement analysis

496

- **Network failures**: Retry mechanisms with exponential backoff

497

- **File system errors**: Graceful degradation and alternative paths

498

499

### Logging and Debugging

500

501

Enhanced logging for troubleshooting:

502

503

```java

504

// Enhanced logging utilities

505

public class MesosLoggingUtils {

506

public static void logResourceAllocation(Logger logger, MesosResourceAllocation allocation) {

507

logger.info("Resource allocation: CPU={}, Memory={}MB, Disk={}MB, GPU={}",

508

allocation.cpus(), allocation.memoryMB(),

509

allocation.diskMB(), allocation.gpus());

510

}

511

512

public static void logConfigurationSummary(Logger logger, Configuration config) {

513

logger.info("Mesos configuration summary:");

514

logger.info(" Master URL: {}", config.getString(MesosOptions.MASTER_URL));

515

logger.info(" Framework: {}", config.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));

516

logger.info(" Resources: CPU={}, Memory={}",

517

config.getDouble("mesos.resourcemanager.tasks.cpus", 1.0),

518

config.getString("taskmanager.memory.process.size", "1g"));

519

}

520

}

521

```

522

523

## Performance Optimization

524

525

### Caching Strategies

526

527

Efficient caching for expensive operations:

528

529

- **Configuration parsing**: Cache parsed configurations

530

- **Resource calculations**: Memoize resource requirement calculations

531

- **Network operations**: Cache artifact server connections

532

- **Protobuf objects**: Reuse commonly created protobuf objects

533

534

### Connection Management

535

536

Optimal connection handling for external services:

537

538

- **HTTP connection pooling**: Reuse connections for artifact distribution

539

- **ZooKeeper session management**: Persistent sessions with reconnection

540

- **Mesos master connections**: Connection pooling and load balancing

541

542

## Deprecation Notice

543

544

All utility classes are deprecated as of Flink 1.13. Migration guidance:

545

546

- **Kubernetes utilities**: Use `org.apache.flink.kubernetes.utils.*`

547

- **YARN utilities**: Use `org.apache.flink.yarn.utils.*`

548

- **Generic utilities**: Use `org.apache.flink.runtime.util.*`

549

550

## Types

551

552

```java { .api }

553

/**

554

* SSL configuration for artifact server

555

*/

556

public class SSLConfiguration {

557

public String getKeystorePath();

558

public String getKeystorePassword();

559

public String getTruststorePath();

560

public String getTruststorePassword();

561

public boolean isClientAuthRequired();

562

}

563

564

/**

565

* Container specification for TaskManager deployment

566

*/

567

public class ContainerSpecification {

568

public String getImageName();

569

public ContainerType getType();

570

public Map<String, String> getEnvironmentVariables();

571

public List<VolumeMount> getVolumeMounts();

572

public List<String> getCommand();

573

public ResourceProfile getResourceProfile();

574

}

575

576

/**

577

* Volume mount specification

578

*/

579

public class VolumeMount {

580

public String getHostPath();

581

public String getContainerPath();

582

public MountMode getMode(); // READ_ONLY, READ_WRITE

583

}

584

585

/**

586

* Artifact distribution statistics

587

*/

588

public class ArtifactStats {

589

public int getTotalArtifacts();

590

public long getTotalSizeBytes();

591

public int getDownloadCount();

592

public double getAverageDownloadTime();

593

public List<String> getMostRequestedArtifacts();

594

}

595

```