# Artifact and Resource Management

Configuration management for plugin artifacts and pipeline resource allocation. Handles artifact selection for plugins, resource specification for different pipeline components, and optimization of computing resources across ETL pipeline execution.

## Capabilities

### Artifact Selection Configuration

Configuration for selecting and resolving plugin artifacts during pipeline execution.

```java { .api }
/**
 * Configuration for selecting plugin artifacts during ETL pipeline execution
 */
public class ArtifactSelectorConfig {
  /**
   * Create default artifact selector (empty configuration)
   */
  public ArtifactSelectorConfig();

  /**
   * Create artifact selector with specific scope, name, and version
   * @param scope artifact scope (e.g., "SYSTEM", "USER")
   * @param name artifact name
   * @param version artifact version
   */
  public ArtifactSelectorConfig(String scope, String name, String version);

  /**
   * Get artifact scope
   * @return artifact scope identifier, may be null
   */
  public String getScope();

  /**
   * Get artifact name
   * @return artifact name, may be null
   */
  public String getName();

  /**
   * Get artifact version
   * @return artifact version string, may be null
   */
  public String getVersion();
}
```

**Usage Examples:**

```java
import co.cask.cdap.etl.proto.ArtifactSelectorConfig;

import java.util.HashMap;
import java.util.Map;

// System artifact selection (built-in plugins)
ArtifactSelectorConfig systemArtifact = new ArtifactSelectorConfig(
    "SYSTEM",
    "core-plugins",
    "2.8.0"
);

// User artifact selection (custom plugins)
ArtifactSelectorConfig userArtifact = new ArtifactSelectorConfig(
    "USER",
    "custom-analytics-plugins",
    "1.2.0"
);

// Default artifact selection (no specific requirements)
ArtifactSelectorConfig defaultArtifact = new ArtifactSelectorConfig();

// Use with plugin configuration
Map<String, String> pluginProps = new HashMap<>();
pluginProps.put("algorithm", "advanced");
pluginProps.put("threshold", "0.85");

ETLPlugin pluginWithArtifact = new ETLPlugin(
    "CustomTransform",
    "transform",
    pluginProps,
    userArtifact
);

// Environment-specific artifact selection (null-safe comparison in case CDAP_ENV is unset)
String environment = System.getenv("CDAP_ENV");
ArtifactSelectorConfig envSpecificArtifact = new ArtifactSelectorConfig(
    "SYSTEM",
    "core-plugins",
    "prod".equals(environment) ? "2.8.0" : "2.9.0-SNAPSHOT"
);
```

### Resource Allocation Configuration

Resource specification and allocation for different components of ETL pipeline execution.

```java { .api }
/**
 * Resource allocation configuration for pipeline components
 */
public class Resources {
  /**
   * Create resource configuration with memory and CPU specification
   * @param memoryMB memory allocation in megabytes
   * @param virtualCores number of virtual CPU cores
   */
  public Resources(int memoryMB, int virtualCores);

  /**
   * Create default resource configuration (1024 MB, 1 core)
   */
  public Resources();

  /**
   * Get memory allocation
   * @return memory in megabytes
   */
  public int getMemoryMB();

  /**
   * Get CPU core allocation
   * @return number of virtual cores
   */
  public int getVirtualCores();
}
```

**Usage Examples:**

```java
import co.cask.cdap.api.Resources;

// Basic resource configurations
Resources smallPipeline = new Resources(1024, 1);    // 1GB, 1 core
Resources mediumPipeline = new Resources(4096, 4);   // 4GB, 4 cores
Resources largePipeline = new Resources(16384, 16);  // 16GB, 16 cores

// Default resources
Resources defaultResources = new Resources(); // 1GB, 1 core

// Component-specific resource allocation
Resources driverResources = new Resources(2048, 2);   // Driver: 2GB, 2 cores
Resources executorResources = new Resources(8192, 8); // Executor: 8GB, 8 cores
Resources clientResources = new Resources(512, 1);    // Client: 512MB, 1 core

// Configure pipeline with different resource tiers
ETLBatchConfig resourceOptimizedConfig = ETLBatchConfig.builder()
    .setResources(executorResources)       // Main execution resources
    .setDriverResources(driverResources)   // Driver resources
    .setClientResources(clientResources)   // Client resources
    .addStage(sourceStage)
    .addStage(transformStage)
    .addStage(sinkStage)
    .build();

// Dynamic resource allocation based on data volume
public Resources calculateResourcesForDataVolume(long recordCount) {
  if (recordCount < 1_000_000) {
    return new Resources(2048, 2);   // < 1M records
  } else if (recordCount < 10_000_000) {
    return new Resources(8192, 8);   // 1M-10M records
  } else {
    return new Resources(32768, 32); // > 10M records
  }
}
```

### Artifact Management Patterns

Common patterns for artifact selection and management across different deployment scenarios.

**Environment-based Artifact Selection:**

```java
// Development environment with latest snapshots
public class DevelopmentArtifactSelector {
  public static ArtifactSelectorConfig getCorePlugins() {
    return new ArtifactSelectorConfig("SYSTEM", "core-plugins", "3.0.0-SNAPSHOT");
  }

  public static ArtifactSelectorConfig getCustomPlugins() {
    return new ArtifactSelectorConfig("USER", "dev-plugins", "latest");
  }
}

// Production environment with stable versions
public class ProductionArtifactSelector {
  public static ArtifactSelectorConfig getCorePlugins() {
    return new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0");
  }

  public static ArtifactSelectorConfig getCustomPlugins() {
    return new ArtifactSelectorConfig("USER", "analytics-plugins", "1.5.2");
  }
}

// Conditional artifact selection
public ArtifactSelectorConfig selectArtifact(String pluginName, String environment) {
  switch (environment.toLowerCase()) {
    case "prod":
      return ProductionArtifactSelector.getCorePlugins();
    case "staging":
      return new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.1-RC1");
    case "dev":
    default:
      return DevelopmentArtifactSelector.getCorePlugins();
  }
}
```
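The conditional selector can feed directly into plugin creation. The sketch below is illustrative only: `environment` is read the same way as in the later usage example, while `sourceProps` and the `"batchsource"` plugin type are assumed placeholder values rather than part of this API.

```java
// Illustrative only: resolve the artifact for the current environment, then build the plugin.
String environment = System.getProperty("environment", "dev"); // same lookup as the later usage example
ArtifactSelectorConfig coreArtifact = selectArtifact("Table", environment);

Map<String, String> sourceProps = new HashMap<>(); // placeholder plugin properties
sourceProps.put("name", "customer_records");

ETLPlugin tableSource = new ETLPlugin("Table", "batchsource", sourceProps, coreArtifact);
```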

**Plugin-specific Artifact Selection:**

```java
// Artifact selection based on plugin capabilities
public class PluginArtifactResolver {
  private static final Map<String, ArtifactSelectorConfig> PLUGIN_ARTIFACTS = new HashMap<>();

  static {
    // Core plugins
    PLUGIN_ARTIFACTS.put("Table", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0"));
    PLUGIN_ARTIFACTS.put("File", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0"));
    PLUGIN_ARTIFACTS.put("Database", new ArtifactSelectorConfig("SYSTEM", "database-plugins", "2.8.0"));

    // Advanced analytics plugins
    PLUGIN_ARTIFACTS.put("MLTransform", new ArtifactSelectorConfig("USER", "ml-plugins", "2.1.0"));
    PLUGIN_ARTIFACTS.put("TensorFlow", new ArtifactSelectorConfig("USER", "tensorflow-plugins", "1.3.0"));

    // Cloud-specific plugins
    PLUGIN_ARTIFACTS.put("BigQuery", new ArtifactSelectorConfig("SYSTEM", "gcp-plugins", "0.20.0"));
    PLUGIN_ARTIFACTS.put("S3", new ArtifactSelectorConfig("SYSTEM", "aws-plugins", "0.15.0"));
  }

  public static ArtifactSelectorConfig resolveArtifact(String pluginName) {
    return PLUGIN_ARTIFACTS.getOrDefault(pluginName, new ArtifactSelectorConfig());
  }
}

// Usage in plugin creation
ETLPlugin createPluginWithArtifact(String pluginName, String pluginType, Map<String, String> properties) {
  ArtifactSelectorConfig artifact = PluginArtifactResolver.resolveArtifact(pluginName);
  return new ETLPlugin(pluginName, pluginType, properties, artifact);
}
```

### Resource Optimization Patterns

Strategies for optimizing resource allocation across different pipeline types and data volumes.

**Data Volume-based Resource Scaling:**

```java
public class ResourceOptimizer {

  /**
   * Calculate optimal resources based on input data characteristics
   */
  public static Resources optimizeForDataVolume(DataVolumeMetrics metrics) {
    long recordCount = metrics.getRecordCount();
    long avgRecordSize = metrics.getAvgRecordSizeBytes();
    int transformComplexity = metrics.getTransformComplexityScore();

    // Base resource calculation
    int baseMemoryMB = 1024;
    int baseCores = 1;

    // Scale based on record count
    if (recordCount > 10_000_000) {
      baseMemoryMB *= 8;
      baseCores *= 8;
    } else if (recordCount > 1_000_000) {
      baseMemoryMB *= 4;
      baseCores *= 4;
    } else if (recordCount > 100_000) {
      baseMemoryMB *= 2;
      baseCores *= 2;
    }

    // Adjust for record size
    if (avgRecordSize > 1024) { // Large records (>1KB)
      baseMemoryMB *= 2;
    }

    // Adjust for transform complexity
    if (transformComplexity > 7) { // Complex transformations
      baseCores *= 2;
      baseMemoryMB = Math.max(baseMemoryMB, 4096);
    }

    // Apply resource limits
    baseMemoryMB = Math.min(baseMemoryMB, 65536); // Max 64GB
    baseCores = Math.min(baseCores, 64);          // Max 64 cores

    return new Resources(baseMemoryMB, baseCores);
  }

  /**
   * Optimize resources for streaming pipelines
   */
  public static Resources optimizeForStreaming(String batchInterval, int expectedThroughput) {
    int intervalSeconds = parseBatchInterval(batchInterval);

    // More frequent batches need more resources
    int memoryMB = 2048;
    int cores = 2;

    if (intervalSeconds < 60) {         // Sub-minute batches
      memoryMB *= 4;
      cores *= 4;
    } else if (intervalSeconds < 300) { // Sub-5-minute batches
      memoryMB *= 2;
      cores *= 2;
    }

    // Scale based on throughput
    if (expectedThroughput > 10000) { // High throughput
      memoryMB *= 2;
      cores *= 2;
    }

    return new Resources(memoryMB, cores);
  }

  private static int parseBatchInterval(String interval) {
    // Parse interval strings like "30s", "5m", "1h"
    if (interval.endsWith("s")) {
      return Integer.parseInt(interval.substring(0, interval.length() - 1));
    } else if (interval.endsWith("m")) {
      return Integer.parseInt(interval.substring(0, interval.length() - 1)) * 60;
    } else if (interval.endsWith("h")) {
      return Integer.parseInt(interval.substring(0, interval.length() - 1)) * 3600;
    }
    return 60; // Default to 1 minute
  }
}
```
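The optimizer output plugs straight into the pipeline builder. The following sketch assumes a `DataVolumeMetrics` instance (referenced by `optimizeForDataVolume` above but not defined in this document) and uses placeholder stage variables:

```java
// Sketch: size executors from observed input characteristics, then apply the result.
// "metrics" is an instance of the assumed DataVolumeMetrics type used by the optimizer above.
Resources batchResources = ResourceOptimizer.optimizeForDataVolume(metrics);

// Streaming variant: 30-second micro-batches at ~15,000 records per interval (illustrative numbers)
Resources streamingResources = ResourceOptimizer.optimizeForStreaming("30s", 15000);

ETLBatchConfig sizedConfig = ETLBatchConfig.builder()
    .setResources(batchResources)                // executor sizing derived from data volume
    .setDriverResources(new Resources(2048, 2))  // fixed, modest driver allocation
    .addStage(sourceStage)                       // placeholder stages
    .addStage(transformStage)
    .addStage(sinkStage)
    .build();
```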

**Environment-specific Resource Configuration:**

```java
public class EnvironmentResourceManager {

  public static class ResourceProfile {
    public final Resources executor;
    public final Resources driver;
    public final Resources client;

    public ResourceProfile(Resources executor, Resources driver, Resources client) {
      this.executor = executor;
      this.driver = driver;
      this.client = client;
    }
  }

  // Predefined resource profiles for different environments
  public static final ResourceProfile DEVELOPMENT = new ResourceProfile(
      new Resources(1024, 1), // Small executor
      new Resources(512, 1),  // Minimal driver
      new Resources(256, 1)   // Minimal client
  );

  public static final ResourceProfile TESTING = new ResourceProfile(
      new Resources(2048, 2), // Medium executor
      new Resources(1024, 1), // Small driver
      new Resources(512, 1)   // Small client
  );

  public static final ResourceProfile PRODUCTION = new ResourceProfile(
      new Resources(8192, 8), // Large executor
      new Resources(4096, 4), // Medium driver
      new Resources(1024, 2)  // Medium client
  );

  public static final ResourceProfile HIGH_VOLUME = new ResourceProfile(
      new Resources(32768, 32), // XL executor
      new Resources(8192, 8),   // Large driver
      new Resources(2048, 4)    // Large client
  );

  public static ResourceProfile getProfileForEnvironment(String environment) {
    switch (environment.toLowerCase()) {
      case "prod":
      case "production":
        return PRODUCTION;
      case "staging":
      case "test":
        return TESTING;
      case "high-volume":
      case "batch":
        return HIGH_VOLUME;
      case "dev":
      case "development":
      default:
        return DEVELOPMENT;
    }
  }

  // Apply resource profile to pipeline configuration
  public static ETLBatchConfig.Builder applyResourceProfile(
      ETLBatchConfig.Builder builder, ResourceProfile profile) {
    return builder
        .setResources(profile.executor)
        .setDriverResources(profile.driver)
        .setClientResources(profile.client);
  }
}

// Usage example
String environment = System.getProperty("environment", "dev");
ResourceProfile profile = EnvironmentResourceManager.getProfileForEnvironment(environment);

ETLBatchConfig config = EnvironmentResourceManager.applyResourceProfile(
        ETLBatchConfig.builder(), profile)
    .addStage(sourceStage)
    .addStage(transformStage)
    .addStage(sinkStage)
    .build();
```

### Resource Monitoring and Adjustment

Patterns for monitoring resource usage and making dynamic adjustments.

```java
/**
 * Resource monitoring and adjustment utilities
 */
public class ResourceMonitor {

  /**
   * Monitor pipeline resource usage and suggest optimizations
   */
  public static ResourceRecommendation analyzeResourceUsage(PipelineExecutionMetrics metrics) {
    double cpuUtilization = metrics.getAvgCpuUtilization();
    double memoryUtilization = metrics.getAvgMemoryUtilization();
    long executionTimeMs = metrics.getExecutionTimeMs();

    ResourceRecommendation recommendation = new ResourceRecommendation();

    // CPU optimization
    if (cpuUtilization < 30) {
      recommendation.suggestCpuReduction();
    } else if (cpuUtilization > 90) {
      recommendation.suggestCpuIncrease();
    }

    // Memory optimization
    if (memoryUtilization < 40) {
      recommendation.suggestMemoryReduction();
    } else if (memoryUtilization > 85) {
      recommendation.suggestMemoryIncrease();
    }

    // Execution time optimization
    if (executionTimeMs > metrics.getSlaTimeMs()) {
      recommendation.suggestPerformanceImprovement();
    }

    return recommendation;
  }

  /**
   * Auto-scale resources based on historical performance
   */
  public static Resources autoScaleResources(Resources current, List<PipelineExecutionMetrics> history) {
    if (history.isEmpty()) {
      return current;
    }

    // Calculate average resource utilization
    double avgCpuUtil = history.stream()
        .mapToDouble(PipelineExecutionMetrics::getAvgCpuUtilization)
        .average()
        .orElse(50.0);

    double avgMemoryUtil = history.stream()
        .mapToDouble(PipelineExecutionMetrics::getAvgMemoryUtilization)
        .average()
        .orElse(50.0);

    // Scale conservatively
    int newCores = current.getVirtualCores();
    int newMemoryMB = current.getMemoryMB();

    if (avgCpuUtil > 80) {
      newCores = Math.min(newCores * 2, 64);
    } else if (avgCpuUtil < 20) {
      newCores = Math.max(newCores / 2, 1);
    }

    if (avgMemoryUtil > 80) {
      newMemoryMB = Math.min(newMemoryMB * 2, 65536);
    } else if (avgMemoryUtil < 30) {
      newMemoryMB = Math.max(newMemoryMB / 2, 512);
    }

    return new Resources(newMemoryMB, newCores);
  }
}
```
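A closing usage sketch for the monitor, with the caveat that `PipelineExecutionMetrics` is the assumed metrics type referenced above and `metricsStore` with its `getRecentRuns` method is a hypothetical history lookup:

```java
// Sketch: between scheduled runs, rescale executor resources from recent execution history.
// "metricsStore" and getRecentRuns are hypothetical; substitute your own metrics source.
List<PipelineExecutionMetrics> recentRuns = metricsStore.getRecentRuns("customer_pipeline", 10);

Resources currentResources = new Resources(8192, 8);
Resources adjusted = ResourceMonitor.autoScaleResources(currentResources, recentRuns);

ETLBatchConfig nextRunConfig = ETLBatchConfig.builder()
    .setResources(adjusted)        // apply the rescaled allocation to the next run
    .addStage(sourceStage)         // placeholder stages
    .addStage(transformStage)
    .addStage(sinkStage)
    .build();
```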