0
# Utilities and Helpers
1
2
Collection of utility classes for Mesos integration, including artifact distribution, resource management, configuration helpers, and Mesos protobuf utilities. These utilities provide essential support functions for all aspects of Flink-Mesos integration.
3
4
## Capabilities
5
6
### Artifact Distribution
7
8
HTTP server and resolver interfaces for distributing job artifacts to Mesos tasks via Mesos Fetcher integration.
9
10
```java { .api }
11
/**
12
* Interface for resolving artifact URIs for Mesos Fetcher
13
* Provides URI resolution for files that need to be distributed to tasks
14
*/
15
public interface MesosArtifactResolver {
16
/**
17
* Resolve artifact URI for the given remote file path
18
* @param remoteFile - Path to file as it should appear in task container
19
* @return Optional URL where file can be fetched, empty if not found
20
*/
21
Option<URL> resolve(Path remoteFile);
22
}
23
24
/**
25
* HTTP server for distributing artifacts to Mesos tasks via Mesos Fetcher
26
* Extends resolver interface with server lifecycle management
27
*/
28
public interface MesosArtifactServer extends MesosArtifactResolver {
29
/**
30
* Add a local file path for distribution to tasks
31
* @param path - Local file system path to serve
32
* @param remoteFile - Path as it should appear in task containers
33
* @return URL where file can be fetched by Mesos Fetcher
34
*/
35
URL addPath(Path path, Path remoteFile);
36
37
/**
38
* Stop the artifact server and cleanup resources
39
* Closes all connections and releases server port
40
*/
41
void stop();
42
}
43
44
/**
45
* HTTP server implementation for artifact distribution
46
* Provides secure, scalable file distribution to Mesos tasks
47
*/
48
public class MesosArtifactServerImpl implements MesosArtifactServer {
49
/**
50
* Create artifact server with configuration
51
* @param hostname - Hostname for server binding
52
* @param port - Port for server (0 for automatic assignment)
53
* @param sslConfig - Optional SSL configuration for secure distribution
54
*/
55
public MesosArtifactServerImpl(String hostname, int port, SSLConfiguration sslConfig);
56
57
/**
58
* Start the artifact server
59
* @return Server URL for artifact access
60
*/
61
public URL start();
62
63
/**
64
* Get the server port (useful when auto-assigned)
65
* @return Actual port number being used
66
*/
67
public int getPort();
68
}
69
```
70
71
**Artifact Server Usage Example:**
72
73
```java
74
import org.apache.flink.mesos.util.MesosArtifactServer;
75
import org.apache.flink.mesos.util.MesosArtifactServerImpl;
76
77
// Create and start artifact server
78
MesosArtifactServerImpl server = new MesosArtifactServerImpl("master-host", 0, null);
79
URL serverUrl = server.start();
80
81
// Add job JAR for distribution
82
Path jobJarPath = Paths.get("/path/to/job.jar");
83
URL jarUrl = server.addPath(jobJarPath, Paths.get("lib/job.jar"));
84
85
// Add configuration files
86
Path configPath = Paths.get("/opt/flink/conf/flink-conf.yaml");
87
URL configUrl = server.addPath(configPath, Paths.get("conf/flink-conf.yaml"));
88
89
// URLs can now be used in Mesos TaskInfo URIs
90
// Mesos Fetcher will download files to task containers
91
```
92
93
### Configuration Utilities
94
95
Comprehensive utilities for creating and managing Mesos-specific configurations, TaskManager parameters, and container specifications.
96
97
```java { .api }
98
/**
99
* Collection of Mesos-related utility methods
100
* Provides configuration creation and management functions
101
*/
102
public class MesosUtils {
103
/**
104
* Create Mesos scheduler configuration from Flink configuration
105
* @param config - Flink configuration with Mesos settings
106
* @param hostname - Hostname for framework registration
107
* @return Configured MesosConfiguration for scheduler
108
*/
109
public static MesosConfiguration createMesosSchedulerConfiguration(Configuration config,
110
String hostname);
111
112
/**
113
* Create TaskManager parameters from configuration
114
* @param config - Flink configuration
115
* @param logger - Logger for parameter validation messages
116
* @return Configured TaskManager parameters for Mesos deployment
117
*/
118
public static MesosTaskManagerParameters createTmParameters(Configuration config,
119
Logger logger);
120
121
/**
122
* Create container specification from configuration
123
* @param config - Flink configuration with container settings
124
* @return Container specification for TaskManager deployment
125
*/
126
public static ContainerSpecification createContainerSpec(Configuration config);
127
128
/**
129
* Apply configuration overlays to container specification
130
* Merges environment variables, volumes, and other container settings
131
* @param config - Flink configuration
132
* @param containerSpec - Base container specification to modify
133
*/
134
public static void applyOverlays(Configuration config,
135
ContainerSpecification containerSpec);
136
137
/**
138
* Load and merge Flink configuration from multiple sources
139
* @param baseConfig - Base configuration
140
* @param logger - Logger for configuration loading messages
141
* @return Merged configuration with all sources applied
142
*/
143
public static Configuration loadConfiguration(Configuration baseConfig, Logger logger);
144
}
145
```
146
147
**Configuration Creation Example:**
148
149
```java
150
import org.apache.flink.configuration.Configuration;
151
import org.apache.flink.mesos.util.MesosUtils;
152
import org.apache.flink.mesos.configuration.MesosOptions;
153
154
// Create base configuration
155
Configuration config = new Configuration();
156
config.setString(MesosOptions.MASTER_URL, "mesos://master:5050");
157
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME, "flink-cluster");
158
config.setDouble("taskmanager.numberOfTaskSlots", 4.0);
159
config.setString("taskmanager.memory.process.size", "2g");
160
161
// Create Mesos scheduler configuration
162
MesosConfiguration mesosConfig = MesosUtils.createMesosSchedulerConfiguration(
163
config, "cluster-master"
164
);
165
166
// Create TaskManager parameters
167
MesosTaskManagerParameters tmParams = MesosUtils.createTmParameters(
168
config, LoggerFactory.getLogger(MyClass.class)
169
);
170
171
// Create container specification
172
ContainerSpecification containerSpec = MesosUtils.createContainerSpec(config);
173
MesosUtils.applyOverlays(config, containerSpec);
174
```
175
176
### Mesos Protobuf Utilities
177
178
Collection of utility methods for creating and manipulating Mesos protobuf objects including resources, environment variables, and URIs.
179
180
```java { .api }
181
/**
182
* Collection of Mesos protobuf and resource utility methods
183
* Provides helpers for creating Mesos protocol buffer objects
184
*/
185
public class Utils {
186
/**
187
* Create CPU resource specification
188
* @param cpus - Number of CPU cores
189
* @return Mesos Resource for CPU allocation
190
*/
191
public static Protos.Resource cpus(double cpus);
192
193
/**
194
* Create memory resource specification
195
* @param mem - Memory amount in MB
196
* @return Mesos Resource for memory allocation
197
*/
198
public static Protos.Resource mem(double mem);
199
200
/**
201
* Create GPU resource specification
202
* @param gpus - Number of GPU units
203
* @return Mesos Resource for GPU allocation
204
*/
205
public static Protos.Resource gpus(double gpus);
206
207
/**
208
* Create disk resource specification
209
* @param disk - Disk space in MB
210
* @return Mesos Resource for disk allocation
211
*/
212
public static Protos.Resource disk(double disk);
213
214
/**
215
* Create network resource specification
216
* @param bandwidth - Network bandwidth in Mbps
217
* @return Mesos Resource for network allocation
218
*/
219
public static Protos.Resource network(double bandwidth);
220
221
/**
222
* Create port range resource specification
223
* @param begin - Start of port range (inclusive)
224
* @param end - End of port range (inclusive)
225
* @return Mesos Resource for port range allocation
226
*/
227
public static Protos.Resource ports(long begin, long end);
228
229
/**
230
* Create environment variable for Mesos tasks
231
* @param key - Environment variable name
232
* @param value - Environment variable value
233
* @return Mesos Environment.Variable object
234
*/
235
public static Protos.Environment.Variable variable(String key, String value);
236
237
/**
238
* Create URI specification for Mesos Fetcher
239
* @param uri - URI to fetch
240
* @param extract - Whether to extract archives
241
* @param executable - Whether file should be executable
242
* @param cache - Whether to cache the URI
243
* @param outputFile - Optional output filename
244
* @return Mesos CommandInfo.URI object
245
*/
246
public static Protos.CommandInfo.URI uri(String uri,
247
boolean extract,
248
boolean executable,
249
boolean cache,
250
String outputFile);
251
252
/**
253
* Convert range values to string representation
254
* @param ranges - List of Value.Range objects
255
* @return String representation of ranges (e.g., "8000-8010,9000-9010")
256
*/
257
public static String rangeValues(List<Protos.Value.Range> ranges);
258
259
/**
260
* Convert Mesos resource to string representation
261
* @param resource - Mesos Resource object
262
* @return Human-readable string representation
263
*/
264
public static String toString(Protos.Resource resource);
265
}
266
```
267
268
**Protobuf Utilities Example:**
269
270
```java
271
import org.apache.flink.mesos.Utils;
272
import org.apache.mesos.Protos;
273
274
// Create resource specifications
275
Protos.Resource cpuResource = Utils.cpus(2.0);
276
Protos.Resource memResource = Utils.mem(2048.0);
277
Protos.Resource diskResource = Utils.disk(1024.0);
278
Protos.Resource portsResource = Utils.ports(8000, 8010);
279
280
// Create environment variables
281
Protos.Environment.Variable javaHome = Utils.variable("JAVA_HOME", "/usr/lib/jvm/java-8");
282
Protos.Environment.Variable flinkHome = Utils.variable("FLINK_HOME", "/opt/flink");
283
284
// Create URIs for Mesos Fetcher
285
Protos.CommandInfo.URI jobJarUri = Utils.uri(
286
"http://artifact-server:8080/job.jar",
287
false, // don't extract
288
false, // not executable
289
true, // cache
290
"lib/job.jar" // output filename
291
);
292
293
Protos.CommandInfo.URI configUri = Utils.uri(
294
"http://artifact-server:8080/flink-conf.yaml",
295
false, false, true, "conf/flink-conf.yaml"
296
);
297
298
// Use in TaskInfo creation
299
Protos.TaskInfo taskInfo = Protos.TaskInfo.newBuilder()
300
.addAllResources(Arrays.asList(cpuResource, memResource, diskResource, portsResource))
301
.setCommand(Protos.CommandInfo.newBuilder()
302
.addAllUris(Arrays.asList(jobJarUri, configUri))
303
.setEnvironment(Protos.Environment.newBuilder()
304
.addAllVariables(Arrays.asList(javaHome, flinkHome))
305
)
306
)
307
.build();
308
```
309
310
### Resource Allocation Utilities
311
312
Utilities for managing Mesos resource allocations and calculations.
313
314
```java { .api }
315
/**
316
* Represents allocated Mesos resources for a task
317
* Provides resource information and allocation details
318
*/
319
public class MesosResourceAllocation {
320
/**
321
* Create resource allocation from Mesos resources
322
* @param resources - List of allocated Mesos resources
323
*/
324
public MesosResourceAllocation(List<Protos.Resource> resources);
325
326
/**
327
* Get allocated CPU cores
328
* @return Number of CPU cores allocated
329
*/
330
public double cpus();
331
332
/**
333
* Get allocated memory in megabytes
334
* @return Memory allocation in MB
335
*/
336
public double memoryMB();
337
338
/**
339
* Get allocated disk space in megabytes
340
* @return Disk allocation in MB
341
*/
342
public double diskMB();
343
344
/**
345
* Get allocated network bandwidth in Mbps
346
* @return Network bandwidth allocation
347
*/
348
public double networkMbps();
349
350
/**
351
* Get allocated GPU units
352
* @return Number of GPUs allocated
353
*/
354
public double gpus();
355
356
/**
357
* Get allocated port ranges
358
* @return List of allocated port ranges
359
*/
360
public List<Protos.Value.Range> ports();
361
362
/**
363
* Get all allocated Mesos resources
364
* @return Complete list of Mesos Resource objects
365
*/
366
public List<Protos.Resource> mesosResources();
367
368
/**
369
* Check if allocation satisfies resource requirements
370
* @param requirements - Required resource amounts
371
* @return true if allocation meets or exceeds requirements
372
*/
373
public boolean satisfies(ResourceProfile requirements);
374
}
375
```
376
377
## Utility Patterns
378
379
### Configuration Validation
380
381
Comprehensive validation of Mesos configurations:
382
383
```java
384
// Configuration validation utility
385
public class MesosConfigValidator {
386
public static void validateConfiguration(Configuration config) {
387
// Validate required settings
388
if (!config.contains(MesosOptions.MASTER_URL)) {
389
throw new IllegalArgumentException("Mesos master URL is required");
390
}
391
392
// Validate resource requirements
393
double cpus = config.getDouble("mesos.resourcemanager.tasks.cpus", 1.0);
394
if (cpus <= 0) {
395
throw new IllegalArgumentException("CPU requirement must be positive");
396
}
397
398
// Validate framework settings
399
String frameworkName = config.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME);
400
if (frameworkName == null || frameworkName.trim().isEmpty()) {
401
throw new IllegalArgumentException("Framework name cannot be empty");
402
}
403
404
// Validate timeout settings
405
int failoverTimeout = config.getInteger(MesosOptions.FAILOVER_TIMEOUT_SECONDS);
406
if (failoverTimeout < 0) {
407
throw new IllegalArgumentException("Failover timeout cannot be negative");
408
}
409
}
410
}
411
```
412
413
### Resource Calculation
414
415
Helper methods for resource requirement calculations:
416
417
```java
418
// Resource calculation utilities
419
public class ResourceCalculator {
420
public static ResourceProfile calculateTaskManagerProfile(Configuration config) {
421
// Calculate memory requirements
422
MemorySize processMemory = TaskExecutorResourceUtils.extractTotalProcessMemoryConfiguration(config);
423
MemorySize managedMemory = TaskExecutorResourceUtils.extractManagedMemoryConfiguration(config);
424
425
// Calculate CPU requirements
426
double cpuCores = config.getDouble("taskmanager.numberOfTaskSlots", 1.0);
427
428
// Calculate disk requirements
429
long diskSize = config.getLong("mesos.resourcemanager.tasks.disk", 1024);
430
431
return ResourceProfile.newBuilder()
432
.setCpuCores(cpuCores)
433
.setTaskHeapMemory(processMemory)
434
.setManagedMemory(managedMemory)
435
.build();
436
}
437
438
public static boolean isResourceSufficient(ResourceProfile required,
439
MesosResourceAllocation available) {
440
return available.cpus() >= required.getCpuCores().getValue() &&
441
available.memoryMB() >= required.getTotalMemory().getMebiBytes() &&
442
available.diskMB() >= 1024; // Minimum disk requirement
443
}
444
}
445
```
446
447
### Container Image Management
448
449
Utilities for Docker container image handling:
450
451
```java
452
// Container image utilities
453
public class ContainerImageUtils {
454
public static String resolveImageName(Configuration config) {
455
String imageName = config.getString("mesos.resourcemanager.tasks.container.docker.image");
456
457
if (imageName == null) {
458
// Default image based on Flink version
459
String flinkVersion = config.getString("flink.version", "1.13.6");
460
String scalaVersion = config.getString("scala.version", "2.11");
461
imageName = String.format("flink:%s-scala_%s", flinkVersion, scalaVersion);
462
}
463
464
return imageName;
465
}
466
467
public static List<String> buildDockerParameters(Configuration config) {
468
List<String> parameters = new ArrayList<>();
469
470
// Network configuration
471
String network = config.getString("mesos.resourcemanager.tasks.container.docker.network", "HOST");
472
parameters.add("--net=" + network);
473
474
// Volume mounts
475
String volumes = config.getString("mesos.resourcemanager.tasks.container.volumes", "");
476
for (String volume : volumes.split(",")) {
477
if (!volume.trim().isEmpty()) {
478
parameters.add("-v");
479
parameters.add(volume.trim());
480
}
481
}
482
483
return parameters;
484
}
485
}
486
```
487
488
## Error Handling
489
490
### Robust Error Handling
491
492
Comprehensive error handling patterns for utility operations:
493
494
- **Configuration errors**: Clear validation messages with suggestions
495
- **Resource allocation failures**: Detailed resource requirement analysis
496
- **Network failures**: Retry mechanisms with exponential backoff
497
- **File system errors**: Graceful degradation and alternative paths
498
499
### Logging and Debugging
500
501
Enhanced logging for troubleshooting:
502
503
```java
504
// Enhanced logging utilities
505
public class MesosLoggingUtils {
506
public static void logResourceAllocation(Logger logger, MesosResourceAllocation allocation) {
507
logger.info("Resource allocation: CPU={}, Memory={}MB, Disk={}MB, GPU={}",
508
allocation.cpus(), allocation.memoryMB(),
509
allocation.diskMB(), allocation.gpus());
510
}
511
512
public static void logConfigurationSummary(Logger logger, Configuration config) {
513
logger.info("Mesos configuration summary:");
514
logger.info(" Master URL: {}", config.getString(MesosOptions.MASTER_URL));
515
logger.info(" Framework: {}", config.getString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
516
logger.info(" Resources: CPU={}, Memory={}",
517
config.getDouble("mesos.resourcemanager.tasks.cpus", 1.0),
518
config.getString("taskmanager.memory.process.size", "1g"));
519
}
520
}
521
```
522
523
## Performance Optimization
524
525
### Caching Strategies
526
527
Efficient caching for expensive operations:
528
529
- **Configuration parsing**: Cache parsed configurations
530
- **Resource calculations**: Memoize resource requirement calculations
531
- **Network operations**: Cache artifact server connections
532
- **Protobuf objects**: Reuse commonly created protobuf objects
533
534
### Connection Management
535
536
Optimal connection handling for external services:
537
538
- **HTTP connection pooling**: Reuse connections for artifact distribution
539
- **ZooKeeper session management**: Persistent sessions with reconnection
540
- **Mesos master connections**: Connection pooling and load balancing
541
542
## Deprecation Notice
543
544
All utility classes are deprecated as of Flink 1.13. Migration guidance:
545
546
- **Kubernetes utilities**: Use `org.apache.flink.kubernetes.utils.*`
547
- **YARN utilities**: Use `org.apache.flink.yarn.utils.*`
548
- **Generic utilities**: Use `org.apache.flink.runtime.util.*`
549
550
## Types
551
552
```java { .api }
553
/**
554
* SSL configuration for artifact server
555
*/
556
public class SSLConfiguration {
557
public String getKeystorePath();
558
public String getKeystorePassword();
559
public String getTruststorePath();
560
public String getTruststorePassword();
561
public boolean isClientAuthRequired();
562
}
563
564
/**
565
* Container specification for TaskManager deployment
566
*/
567
public class ContainerSpecification {
568
public String getImageName();
569
public ContainerType getType();
570
public Map<String, String> getEnvironmentVariables();
571
public List<VolumeMount> getVolumeMounts();
572
public List<String> getCommand();
573
public ResourceProfile getResourceProfile();
574
}
575
576
/**
577
* Volume mount specification
578
*/
579
public class VolumeMount {
580
public String getHostPath();
581
public String getContainerPath();
582
public MountMode getMode(); // READ_ONLY, READ_WRITE
583
}
584
585
/**
586
* Artifact distribution statistics
587
*/
588
public class ArtifactStats {
589
public int getTotalArtifacts();
590
public long getTotalSizeBytes();
591
public int getDownloadCount();
592
public double getAverageDownloadTime();
593
public List<String> getMostRequestedArtifacts();
594
}
595
```