# Artifact and Resource Management

Configuration for plugin artifacts and pipeline resource allocation: selecting the artifact that provides each plugin, specifying resources for the individual pipeline components, and tuning compute resources for ETL pipeline execution.

## Capabilities

### Artifact Selection Configuration

Configuration for selecting and resolving the artifacts that provide plugins during pipeline execution.

```java { .api }
/**
12
* Configuration for selecting plugin artifacts during ETL pipeline execution
13
*/
14
public class ArtifactSelectorConfig {
15
/**
16
* Create default artifact selector (empty configuration)
17
*/
18
public ArtifactSelectorConfig();
19
20
/**
21
* Create artifact selector with specific scope, name, and version
22
* @param scope artifact scope (e.g., "SYSTEM", "USER")
23
* @param name artifact name
24
* @param version artifact version
25
*/
26
public ArtifactSelectorConfig(String scope, String name, String version);
27
28
/**
29
* Get artifact scope
30
* @return artifact scope identifier, may be null
31
*/
32
public String getScope();
33
34
/**
35
* Get artifact name
36
* @return artifact name, may be null
37
*/
38
public String getName();
39
40
/**
41
* Get artifact version
42
* @return artifact version string, may be null
43
*/
44
public String getVersion();
45
}
```

**Usage Examples:**

```java
import co.cask.cdap.etl.proto.ArtifactSelectorConfig;
52
53
// System artifact selection (built-in plugins)
54
ArtifactSelectorConfig systemArtifact = new ArtifactSelectorConfig(
55
"SYSTEM",
56
"core-plugins",
57
"2.8.0"
58
);
59
60
// User artifact selection (custom plugins)
61
ArtifactSelectorConfig userArtifact = new ArtifactSelectorConfig(
62
"USER",
63
"custom-analytics-plugins",
64
"1.2.0"
65
);
66
67
// Default artifact selection (no specific requirements)
68
ArtifactSelectorConfig defaultArtifact = new ArtifactSelectorConfig();
69
70
// Use with plugin configuration
71
Map<String, String> pluginProps = new HashMap<>();
72
pluginProps.put("algorithm", "advanced");
73
pluginProps.put("threshold", "0.85");
74
75
ETLPlugin pluginWithArtifact = new ETLPlugin(
76
"CustomTransform",
77
"transform",
78
pluginProps,
79
userArtifact
80
);
81
82
// Environment-specific artifact selection
83
String environment = System.getenv("CDAP_ENV");
84
ArtifactSelectorConfig envSpecificArtifact = new ArtifactSelectorConfig(
85
"SYSTEM",
86
"core-plugins",
87
environment.equals("prod") ? "2.8.0" : "2.9.0-SNAPSHOT"
88
);
```

### Resource Allocation Configuration

Resource specification and allocation (memory and virtual CPU cores) for the individual components of ETL pipeline execution.

```java { .api }
/**
97
* Resource allocation configuration for pipeline components
98
*/
99
public class Resources {
100
/**
101
* Create resource configuration with memory and CPU specification
102
* @param memoryMB memory allocation in megabytes
103
* @param virtualCores number of virtual CPU cores
104
*/
105
public Resources(int memoryMB, int virtualCores);
106
107
/**
108
* Create default resource configuration (1024 MB, 1 core)
109
*/
110
public Resources();
111
112
/**
113
* Get memory allocation
114
* @return memory in megabytes
115
*/
116
public int getMemoryMB();
117
118
/**
119
* Get CPU core allocation
120
* @return number of virtual cores
121
*/
122
public int getVirtualCores();
123
}
```

**Usage Examples:**

```java
import co.cask.cdap.api.Resources;
130
131
// Basic resource configurations
132
Resources smallPipeline = new Resources(1024, 1); // 1GB, 1 core
133
Resources mediumPipeline = new Resources(4096, 4); // 4GB, 4 cores
134
Resources largePipeline = new Resources(16384, 16); // 16GB, 16 cores
135
136
// Default resources
137
Resources defaultResources = new Resources(); // 1GB, 1 core
138
139
// Component-specific resource allocation
140
Resources driverResources = new Resources(2048, 2); // Driver: 2GB, 2 cores
141
Resources executorResources = new Resources(8192, 8); // Executor: 8GB, 8 cores
142
Resources clientResources = new Resources(512, 1); // Client: 512MB, 1 core
143
144
// Configure pipeline with different resource tiers
145
ETLBatchConfig resourceOptimizedConfig = ETLBatchConfig.builder()
146
.setResources(executorResources) // Main execution resources
147
.setDriverResources(driverResources) // Driver resources
148
.setClientResources(clientResources) // Client resources
149
.addStage(sourceStage)
150
.addStage(transformStage)
151
.addStage(sinkStage)
152
.build();
153
154
// Dynamic resource allocation based on data volume
155
public Resources calculateResourcesForDataVolume(long recordCount) {
156
if (recordCount < 1_000_000) {
157
return new Resources(2048, 2); // < 1M records
158
} else if (recordCount < 10_000_000) {
159
return new Resources(8192, 8); // 1M-10M records
160
} else {
161
return new Resources(32768, 32); // > 10M records
162
}
163
}
```

### Artifact Management Patterns

Common patterns for selecting and managing artifacts across different deployment scenarios.

**Environment-based Artifact Selection:**

```java
// Development environment with latest snapshots
174
public class DevelopmentArtifactSelector {
175
public static ArtifactSelectorConfig getCorePluins() {
176
return new ArtifactSelectorConfig("SYSTEM", "core-plugins", "3.0.0-SNAPSHOT");
177
}
178
179
public static ArtifactSelectorConfig getCustomPlugins() {
180
return new ArtifactSelectorConfig("USER", "dev-plugins", "latest");
181
}
182
}
183
184
// Production environment with stable versions
185
public class ProductionArtifactSelector {
186
public static ArtifactSelectorConfig getCorePlugins() {
187
return new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0");
188
}
189
190
public static ArtifactSelectorConfig getCustomPlugins() {
191
return new ArtifactSelectorConfig("USER", "analytics-plugins", "1.5.2");
192
}
193
}
194
195
// Conditional artifact selection
196
public ArtifactSelectorConfig selectArtifact(String pluginName, String environment) {
197
switch (environment.toLowerCase()) {
198
case "prod":
199
return ProductionArtifactSelector.getCorePlugins();
200
case "staging":
201
return new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.1-RC1");
202
case "dev":
203
default:
204
return DevelopmentArtifactSelector.getCorePluins();
205
}
206
}
```

**Plugin-specific Artifact Selection:**

```java
// Artifact selection based on plugin capabilities
213
public class PluginArtifactResolver {
214
private static final Map<String, ArtifactSelectorConfig> PLUGIN_ARTIFACTS = new HashMap<>();
215
216
static {
217
// Core plugins
218
PLUGIN_ARTIFACTS.put("Table", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0"));
219
PLUGIN_ARTIFACTS.put("File", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0"));
220
PLUGIN_ARTIFACTS.put("Database", new ArtifactSelectorConfig("SYSTEM", "database-plugins", "2.8.0"));
221
222
// Advanced analytics plugins
223
PLUGIN_ARTIFACTS.put("MLTransform", new ArtifactSelectorConfig("USER", "ml-plugins", "2.1.0"));
224
PLUGIN_ARTIFACTS.put("TensorFlow", new ArtifactSelectorConfig("USER", "tensorflow-plugins", "1.3.0"));
225
226
// Cloud-specific plugins
227
PLUGIN_ARTIFACTS.put("BigQuery", new ArtifactSelectorConfig("SYSTEM", "gcp-plugins", "0.20.0"));
228
PLUGIN_ARTIFACTS.put("S3", new ArtifactSelectorConfig("SYSTEM", "aws-plugins", "0.15.0"));
229
}
230
231
public static ArtifactSelectorConfig resolveArtifact(String pluginName) {
232
return PLUGIN_ARTIFACTS.getOrDefault(pluginName, new ArtifactSelectorConfig());
233
}
234
}
235
236
// Usage in plugin creation
237
ETLPlugin createPluginWithArtifact(String pluginName, String pluginType, Map<String, String> properties) {
238
ArtifactSelectorConfig artifact = PluginArtifactResolver.resolveArtifact(pluginName);
239
return new ETLPlugin(pluginName, pluginType, properties, artifact);
240
}
```

### Resource Optimization Patterns

Strategies for sizing resource allocations according to pipeline type and data volume.

**Data Volume-based Resource Scaling:**

```java
public class ResourceOptimizer {
251
252
/**
253
* Calculate optimal resources based on input data characteristics
254
*/
255
public static Resources optimizeForDataVolume(DataVolumeMetrics metrics) {
256
long recordCount = metrics.getRecordCount();
257
long avgRecordSize = metrics.getAvgRecordSizeBytes();
258
int transformComplexity = metrics.getTransformComplexityScore();
259
260
// Base resource calculation
261
int baseMemoryMB = 1024;
262
int baseCores = 1;
263
264
// Scale based on record count
265
if (recordCount > 10_000_000) {
266
baseMemoryMB *= 8;
267
baseCores *= 8;
268
} else if (recordCount > 1_000_000) {
269
baseMemoryMB *= 4;
270
baseCores *= 4;
271
} else if (recordCount > 100_000) {
272
baseMemoryMB *= 2;
273
baseCores *= 2;
274
}
275
276
// Adjust for record size
277
if (avgRecordSize > 1024) { // Large records (>1KB)
278
baseMemoryMB *= 2;
279
}
280
281
// Adjust for transform complexity
282
if (transformComplexity > 7) { // Complex transformations
283
baseCores *= 2;
284
baseMemoryMB = Math.max(baseMemoryMB, 4096);
285
}
286
287
// Apply resource limits
288
baseMemoryMB = Math.min(baseMemoryMB, 65536); // Max 64GB
289
baseCores = Math.min(baseCores, 64); // Max 64 cores
290
291
return new Resources(baseMemoryMB, baseCores);
292
}
293
294
/**
295
* Optimize resources for streaming pipelines
296
*/
297
public static Resources optimizeForStreaming(String batchInterval, int expectedThroughput) {
298
int intervalSeconds = parseBatchInterval(batchInterval);
299
300
// More frequent batches need more resources
301
int memoryMB = 2048;
302
int cores = 2;
303
304
if (intervalSeconds < 60) { // Sub-minute batches
305
memoryMB *= 4;
306
cores *= 4;
307
} else if (intervalSeconds < 300) { // Sub-5-minute batches
308
memoryMB *= 2;
309
cores *= 2;
310
}
311
312
// Scale based on throughput
313
if (expectedThroughput > 10000) { // High throughput
314
memoryMB *= 2;
315
cores *= 2;
316
}
317
318
return new Resources(memoryMB, cores);
319
}
320
321
private static int parseBatchInterval(String interval) {
322
// Parse interval strings like "30s", "5m", "1h"
323
if (interval.endsWith("s")) {
324
return Integer.parseInt(interval.substring(0, interval.length() - 1));
325
} else if (interval.endsWith("m")) {
326
return Integer.parseInt(interval.substring(0, interval.length() - 1)) * 60;
327
} else if (interval.endsWith("h")) {
328
return Integer.parseInt(interval.substring(0, interval.length() - 1)) * 3600;
329
}
330
return 60; // Default to 1 minute
331
}
332
}
```

**Environment-specific Resource Configuration:**

```java
public class EnvironmentResourceManager {
339
340
public static class ResourceProfile {
341
public final Resources executor;
342
public final Resources driver;
343
public final Resources client;
344
345
public ResourceProfile(Resources executor, Resources driver, Resources client) {
346
this.executor = executor;
347
this.driver = driver;
348
this.client = client;
349
}
350
}
351
352
// Predefined resource profiles for different environments
353
public static final ResourceProfile DEVELOPMENT = new ResourceProfile(
354
new Resources(1024, 1), // Small executor
355
new Resources(512, 1), // Minimal driver
356
new Resources(256, 1) // Minimal client
357
);
358
359
public static final ResourceProfile TESTING = new ResourceProfile(
360
new Resources(2048, 2), // Medium executor
361
new Resources(1024, 1), // Small driver
362
new Resources(512, 1) // Small client
363
);
364
365
public static final ResourceProfile PRODUCTION = new ResourceProfile(
366
new Resources(8192, 8), // Large executor
367
new Resources(4096, 4), // Medium driver
368
new Resources(1024, 2) // Medium client
369
);
370
371
public static final ResourceProfile HIGH_VOLUME = new ResourceProfile(
372
new Resources(32768, 32), // XL executor
373
new Resources(8192, 8), // Large driver
374
new Resources(2048, 4) // Large client
375
);
376
377
public static ResourceProfile getProfileForEnvironment(String environment) {
378
switch (environment.toLowerCase()) {
379
case "prod":
380
case "production":
381
return PRODUCTION;
382
case "staging":
383
case "test":
384
return TESTING;
385
case "high-volume":
386
case "batch":
387
return HIGH_VOLUME;
388
case "dev":
389
case "development":
390
default:
391
return DEVELOPMENT;
392
}
393
}
394
395
// Apply resource profile to pipeline configuration
396
public static ETLBatchConfig.Builder applyResourceProfile(
397
ETLBatchConfig.Builder builder, ResourceProfile profile) {
398
return builder
399
.setResources(profile.executor)
400
.setDriverResources(profile.driver)
401
.setClientResources(profile.client);
402
}
403
}
404
405
// Usage example
406
String environment = System.getProperty("environment", "dev");
407
ResourceProfile profile = EnvironmentResourceManager.getProfileForEnvironment(environment);
408
409
ETLBatchConfig config = EnvironmentResourceManager.applyResourceProfile(
410
ETLBatchConfig.builder(), profile)
411
.addStage(sourceStage)
412
.addStage(transformStage)
413
.addStage(sinkStage)
414
.build();
```

### Resource Monitoring and Adjustment

Patterns for monitoring resource usage and adjusting allocations based on observed utilization.

```java
/**
423
* Resource monitoring and adjustment utilities
424
*/
425
public class ResourceMonitor {
426
427
/**
428
* Monitor pipeline resource usage and suggest optimizations
429
*/
430
public static ResourceRecommendation analyzeResourceUsage(PipelineExecutionMetrics metrics) {
431
double cpuUtilization = metrics.getAvgCpuUtilization();
432
double memoryUtilization = metrics.getAvgMemoryUtilization();
433
long executionTimeMs = metrics.getExecutionTimeMs();
434
435
ResourceRecommendation recommendation = new ResourceRecommendation();
436
437
// CPU optimization
438
if (cpuUtilization < 30) {
439
recommendation.suggestCpuReduction();
440
} else if (cpuUtilization > 90) {
441
recommendation.suggestCpuIncrease();
442
}
443
444
// Memory optimization
445
if (memoryUtilization < 40) {
446
recommendation.suggestMemoryReduction();
447
} else if (memoryUtilization > 85) {
448
recommendation.suggestMemoryIncrease();
449
}
450
451
// Execution time optimization
452
if (executionTimeMs > metrics.getSlaTimeMs()) {
453
recommendation.suggestPerformanceImprovement();
454
}
455
456
return recommendation;
457
}
458
459
/**
460
* Auto-scale resources based on historical performance
461
*/
462
public static Resources autoScaleResources(Resources current, List<PipelineExecutionMetrics> history) {
463
if (history.isEmpty()) {
464
return current;
465
}
466
467
// Calculate average resource utilization
468
double avgCpuUtil = history.stream()
469
.mapToDouble(PipelineExecutionMetrics::getAvgCpuUtilization)
470
.average()
471
.orElse(50.0);
472
473
double avgMemoryUtil = history.stream()
474
.mapToDouble(PipelineExecutionMetrics::getAvgMemoryUtilization)
475
.average()
476
.orElse(50.0);
477
478
// Scale conservatively
479
int newCores = current.getVirtualCores();
480
int newMemoryMB = current.getMemoryMB();
481
482
if (avgCpuUtil > 80) {
483
newCores = Math.min(newCores * 2, 64);
484
} else if (avgCpuUtil < 20) {
485
newCores = Math.max(newCores / 2, 1);
486
}
487
488
if (avgMemoryUtil > 80) {
489
newMemoryMB = Math.min(newMemoryMB * 2, 65536);
490
} else if (avgMemoryUtil < 30) {
491
newMemoryMB = Math.max(newMemoryMB / 2, 512);
492
}
493
494
return new Resources(newMemoryMB, newCores);
495
}
496
}
```