or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-systems.md, core-workflow.md, file-management.md, index.md, job-stores.md, provisioning.md, utilities.md, workflow-languages.md

docs/provisioning.md

# Cloud Provisioning

## Overview

Toil's cloud provisioning system enables automatic creation, scaling, and management of compute clusters across major cloud providers. The provisioning system integrates with batch systems to dynamically allocate resources based on workflow demands, supporting auto-scaling, cost optimization through spot instances, and sophisticated resource management policies. This allows workflows to seamlessly scale from single nodes to large distributed clusters without manual infrastructure management.

## Capabilities

### Abstract Provisioner Interface
{ .api }

The `AbstractProvisioner` provides the core interface for all cloud provisioning implementations.

```python

import time

from toil.provisioners.abstractProvisioner import AbstractProvisioner
from toil.batchSystems import NodeInfo
from typing import List, Dict, Optional

class CustomProvisioner(AbstractProvisioner):
    """Custom provisioner implementation for specialized cloud environments."""

    def __init__(self, clusterName: str, clusterType: str, zone: str,
                 nodeStorage: int, nodeStorageType: str, sseKey: Optional[str] = None):
        """Initialize provisioner with cluster configuration.

        :param clusterName: unique name identifying the cluster
        :param clusterType: cluster orchestration type (e.g. "mesos", "kubernetes")
        :param zone: availability zone to provision into
        :param nodeStorage: per-node storage size
        :param nodeStorageType: cloud-specific storage/volume type
        :param sseKey: optional server-side encryption key for node volumes
        """
        super().__init__(clusterName, clusterType, zone, nodeStorage, nodeStorageType)
        self.cluster_name = clusterName
        self.cluster_type = clusterType
        self.availability_zone = zone
        self.sseKey = sseKey

        # Initialize cloud provider client.
        # NOTE(review): self.logger is used by methods below but never assigned
        # here -- presumably AbstractProvisioner provides it; confirm.
        self.cloud_client = self.initialize_cloud_client()

    def launchCluster(self, leaderNodeType: str, leaderStorage: int,
                      owner: str, keyName: str, **kwargs) -> None:
        """Launch cluster with leader node."""
        # Create leader node
        leader_config = {
            'instance_type': leaderNodeType,
            'storage_size': leaderStorage,
            'storage_type': self.nodeStorageType,
            'key_name': keyName,
            'security_groups': self.create_security_groups(),
            'user_data': self.get_leader_user_data()
        }

        if self.sseKey:
            leader_config['encryption_key'] = self.sseKey

        leader_instance = self.cloud_client.create_instance(leader_config)

        # Wait for leader to be ready
        self.wait_for_instance_ready(leader_instance.id)

        # Configure leader node
        self.configure_leader_node(leader_instance)

        # Store cluster metadata (requires `time`, which the original example
        # used without importing -- fixed above).
        self.store_cluster_metadata({
            'leader_id': leader_instance.id,
            'cluster_name': self.cluster_name,
            'owner': owner,
            'creation_time': time.time()
        })

    def addNodes(self, nodeTypes: List[NodeInfo], numNodes: int,
                 preemptible: bool = False, spotBid: Optional[float] = None) -> int:
        """Add worker nodes to cluster.

        Returns the number of instances actually created, which may be
        fewer than ``numNodes``.
        """
        instances_created = 0

        for node_type in nodeTypes:
            # Calculate nodes needed for this type
            nodes_for_type = min(numNodes - instances_created,
                                 self.calculate_optimal_node_count(node_type))

            if nodes_for_type <= 0:
                continue

            # Configure worker node
            worker_config = {
                'instance_type': node_type.nodeType,
                'count': nodes_for_type,
                'storage_size': self.nodeStorage,
                'storage_type': self.nodeStorageType,
                'preemptible': preemptible,
                'user_data': self.get_worker_user_data()
            }

            if preemptible and spotBid:
                worker_config['spot_price'] = spotBid

            # Launch instances
            instances = self.cloud_client.create_instances(worker_config)

            # Track created instances
            for instance in instances:
                self.register_worker_node(instance, node_type)
                instances_created += 1

            if instances_created >= numNodes:
                break

        return instances_created

    def terminateNodes(self, nodes: List[str]) -> None:
        """Terminate specified worker nodes."""
        for node_id in nodes:
            try:
                # Gracefully drain jobs from node
                self.drain_node_jobs(node_id)

                # Terminate instance
                self.cloud_client.terminate_instance(node_id)

                # Clean up metadata
                self.unregister_worker_node(node_id)

            except Exception as e:
                # Best-effort: log and keep terminating the remaining nodes.
                self.logger.error(f"Failed to terminate node {node_id}: {e}")

    def getNodeShape(self, nodeType: str) -> NodeInfo:
        """Get node configuration for specified type."""
        # Query cloud provider for instance specifications
        instance_info = self.cloud_client.get_instance_type_info(nodeType)

        return NodeInfo(
            cores=instance_info['cpu_count'],
            memory=instance_info['memory_mb'] * 1024 * 1024,  # Convert to bytes
            disk=instance_info['storage_gb'] * 1024 * 1024 * 1024,  # Convert to bytes
            preemptible=instance_info.get('preemptible', False),
            nodeType=nodeType
        )

    def destroyCluster(self) -> None:
        """Destroy entire cluster and clean up resources."""
        # Get all cluster nodes
        cluster_nodes = self.get_cluster_nodes()

        # Terminate all nodes; failures are logged so cleanup can continue.
        for node in cluster_nodes:
            try:
                self.cloud_client.terminate_instance(node['instance_id'])
            except Exception as e:
                self.logger.warning(f"Failed to terminate {node['instance_id']}: {e}")

        # Clean up security groups
        self.cleanup_security_groups()

        # Remove cluster metadata
        self.cleanup_cluster_metadata()

        self.logger.info(f"Cluster {self.cluster_name} destroyed")

```

### AWS Provisioner
{ .api }

AWS EC2-based provisioning with comprehensive support for AWS services and features.

```python

from toil.provisioners.aws.awsProvisioner import AWSProvisioner
from toil.lib.aws import establish_boto3_session
from toil.common import Config
from toil.batchSystems import NodeInfo  # fix: used below but never imported

def setup_aws_provisioning():
    """Configure AWS provisioning for scalable workflows.

    Returns a populated :class:`Config` ready to drive an AWS-backed run.
    """
    # Basic AWS provisioner configuration
    config = Config()
    config.provisioner = "aws"
    config.nodeTypes = ["m5.large", "m5.xlarge:0.50", "c5.2xlarge"]  # type:bid_price
    config.maxNodes = 100
    config.minNodes = 0

    # AWS-specific configuration
    config.awsRegion = "us-west-2"
    config.zone = "us-west-2a"  # Availability zone
    config.keyPairName = "my-toil-keypair"
    config.vpcSubnet = "subnet-12345678"  # Optional: specific subnet

    # Security and access
    config.awsEc2ProfileArn = "arn:aws:iam::123456789:instance-profile/ToilRole"
    config.sseKey = "alias/toil-encryption-key"  # KMS encryption

    # Storage configuration
    config.nodeStorage = 100  # GB per node
    config.nodeStorageType = "gp3"  # EBS volume type

    # Spot instance configuration
    config.defaultPreemptible = True  # Use spot instances by default
    config.preemptibleCompensation = 1.0  # No compensation for spot
    config.spotBid = 0.10  # Maximum spot price per hour

    return config

def advanced_aws_features():
    """Demonstrate advanced AWS provisioning features."""
    # Custom AWS provisioner with advanced features
    provisioner = AWSProvisioner(
        clusterName="advanced-cluster",
        clusterType="mesos",  # or "kubernetes"
        zone="us-west-2a",
        nodeStorage=200,
        nodeStorageType="io2",  # High-performance storage
        sseKey="alias/my-kms-key"
    )

    # Launch cluster with custom configuration
    provisioner.launchCluster(
        leaderNodeType="m5.2xlarge",
        leaderStorage=100,
        owner="research-team",
        keyName="research-keypair",
        # Advanced options
        securityGroupNames=["toil-cluster", "research-access"],
        userData="""#!/bin/bash
# Custom initialization script
yum update -y
yum install -y docker
systemctl start docker
systemctl enable docker

# Install additional tools
pip3 install boto3 awscli

# Configure monitoring
yum install -y amazon-cloudwatch-agent
""",
        leaderIamInstanceProfile="arn:aws:iam::123456789:instance-profile/ToilLeader"
    )

    # Add heterogeneous node types
    node_types = [
        NodeInfo(cores=4, memory=16*1024**3, disk=100*1024**3,
                 preemptible=True, nodeType="m5.large"),
        NodeInfo(cores=16, memory=64*1024**3, disk=500*1024**3,
                 preemptible=False, nodeType="m5.4xlarge"),
        NodeInfo(cores=32, memory=128*1024**3, disk=1000*1024**3,
                 preemptible=True, nodeType="c5.8xlarge")
    ]

    # Add nodes with mixed instance types
    provisioner.addNodes(
        nodeTypes=node_types,
        numNodes=50,
        preemptible=True,
        spotBid=0.25,
        # Additional AWS options
        availabilityZone="us-west-2a",
        subnetID="subnet-12345678",
        placementGroup="cluster-group"  # For high-performance networking
    )

    return provisioner

def aws_auto_scaling_policies():
    """Configure advanced auto-scaling policies for AWS."""

    class AdvancedAWSProvisioner(AWSProvisioner):
        """AWS provisioner with custom scaling logic."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.scaling_policies = {
                'scale_up_threshold': 0.8,  # CPU utilization to scale up
                'scale_down_threshold': 0.3,  # CPU utilization to scale down
                'scale_up_cooldown': 300,  # 5 minutes cooldown
                'scale_down_cooldown': 600,  # 10 minutes cooldown
                'max_scale_up': 10,  # Max nodes to add at once
                'max_scale_down': 5  # Max nodes to remove at once
            }

        def auto_scale_cluster(self):
            """Implement custom auto-scaling logic."""
            # Get current cluster metrics
            cluster_metrics = self.get_cluster_metrics()

            current_nodes = len(self.get_worker_nodes())
            cpu_utilization = cluster_metrics['avg_cpu_utilization']
            queue_length = cluster_metrics['job_queue_length']

            # Scale up decision
            if (cpu_utilization > self.scaling_policies['scale_up_threshold'] or
                    queue_length > current_nodes * 2):

                nodes_to_add = min(
                    self.scaling_policies['max_scale_up'],
                    max(1, queue_length // 5)  # 1 node per 5 queued jobs
                )

                self.scale_up_cluster(nodes_to_add)

            # Scale down decision
            elif (cpu_utilization < self.scaling_policies['scale_down_threshold'] and
                    queue_length == 0 and current_nodes > 1):

                nodes_to_remove = min(
                    self.scaling_policies['max_scale_down'],
                    max(1, current_nodes // 4)  # Remove up to 25% of nodes
                )

                self.scale_down_cluster(nodes_to_remove)

        def scale_up_cluster(self, node_count: int):
            """Scale up cluster with cost optimization."""
            # Prioritize spot instances for cost savings
            spot_node_types = [nt for nt in self.get_node_types() if nt.preemptible]
            on_demand_node_types = [nt for nt in self.get_node_types() if not nt.preemptible]

            # Try spot instances first
            added_nodes = 0
            if spot_node_types and added_nodes < node_count:
                spot_nodes = min(node_count - added_nodes, int(node_count * 0.8))
                added_nodes += self.addNodes(
                    nodeTypes=spot_node_types[:1],  # Use most cost-effective
                    numNodes=spot_nodes,
                    preemptible=True,
                    spotBid=self.calculate_optimal_spot_bid()
                )

            # Add on-demand instances if needed
            if added_nodes < node_count and on_demand_node_types:
                remaining_nodes = node_count - added_nodes
                self.addNodes(
                    nodeTypes=on_demand_node_types[:1],
                    numNodes=remaining_nodes,
                    preemptible=False
                )

        def calculate_optimal_spot_bid(self) -> float:
            """Calculate optimal spot instance bid price."""
            # Get current spot price history
            spot_history = self.get_spot_price_history()

            # Last 24 hours. fix: guard against an empty history, which
            # previously raised ZeroDivisionError.
            recent_prices = spot_history[-24:]
            if not recent_prices:
                return self.maxSpotBid

            # Calculate bid as 110% of average recent price
            avg_price = sum(recent_prices) / len(recent_prices)
            optimal_bid = avg_price * 1.1

            return min(optimal_bid, self.maxSpotBid)  # Cap at maximum allowed

```

### Google Cloud Provisioner
{ .api }

Google Compute Engine provisioning with support for GCP-specific features.

```python

from toil.provisioners.gceProvisioner import GCEProvisioner
from toil.common import Config  # fix: used below but never imported
from toil.batchSystems import NodeInfo  # fix: used below but never imported

def setup_gce_provisioning():
    """Configure Google Cloud provisioning.

    Returns a populated :class:`Config` ready to drive a GCE-backed run.
    """
    config = Config()
    config.provisioner = "gce"
    config.nodeTypes = ["n1-standard-2", "n1-standard-4", "n1-highmem-8"]
    config.maxNodes = 200

    # GCP-specific configuration
    config.gcpRegion = "us-central1"
    config.zone = "us-central1-a"
    config.gcpProjectID = "my-research-project"
    config.gcpServiceAccountEmail = "toil-service@my-project.iam.gserviceaccount.com"

    # Authentication
    config.googleCredentials = "/path/to/service-account.json"

    # Networking
    config.gcpNetwork = "default"
    config.gcpSubnet = "default"

    # Storage and encryption
    config.nodeStorage = 100
    config.nodeStorageType = "pd-ssd"  # SSD persistent disk
    config.gcpDiskEncryption = True

    # Preemptible instances (Google's spot instances)
    config.defaultPreemptible = True
    config.preemptibleCompensation = 1.0

    return config

def advanced_gce_features():
    """Advanced Google Cloud Engine features."""
    provisioner = GCEProvisioner(
        clusterName="gce-research-cluster",
        clusterType="kubernetes",
        zone="us-central1-b",
        nodeStorage=200,
        nodeStorageType="pd-ssd"
    )

    # Launch with custom machine types
    provisioner.launchCluster(
        leaderNodeType="custom-4-8192",  # 4 vCPUs, 8GB RAM
        leaderStorage=100,
        owner="research-team",
        keyName=None,  # GCE uses SSH keys differently
        # GCE-specific options
        machineImage="cos-cloud/cos-stable",  # Container-optimized OS
        networkTags=["toil-cluster", "research"],
        serviceAccountScopes=[
            "https://www.googleapis.com/auth/cloud-platform",
            "https://www.googleapis.com/auth/storage-full"
        ],
        startupScript="""#!/bin/bash
# Install Docker and Kubernetes tools
curl -fsSL https://get.docker.com -o get-docker.sh
sh get-docker.sh

# Configure container runtime
systemctl start docker
systemctl enable docker

# Install kubectl
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x kubectl
mv kubectl /usr/local/bin/
"""
    )

    # Add GPU-enabled nodes
    gpu_node_types = [
        NodeInfo(
            cores=8,
            memory=32*1024**3,
            disk=200*1024**3,
            preemptible=True,
            nodeType="n1-standard-8",
            # GCE-specific: GPU configuration
            accelerators=[{
                "acceleratorCount": 1,
                "acceleratorType": "nvidia-tesla-k80"
            }]
        )
    ]

    provisioner.addNodes(
        nodeTypes=gpu_node_types,
        numNodes=5,
        preemptible=True,
        # GPU-specific configuration
        gpuType="nvidia-tesla-k80",
        gpuCount=1,
        installGpuDrivers=True
    )

    return provisioner

```

### Azure Provisioner
{ .api }

Microsoft Azure provisioning integration with Azure-specific capabilities.

```python

from toil.provisioners.azure import AzureProvisioner
from toil.common import Config  # fix: used below but never imported
from toil.batchSystems import NodeInfo  # fix: used below but never imported

def setup_azure_provisioning():
    """Configure Azure provisioning.

    Returns a populated :class:`Config` ready to drive an Azure-backed run.
    """
    config = Config()
    config.provisioner = "azure"
    config.nodeTypes = ["Standard_D2s_v3", "Standard_D4s_v3", "Standard_F8s_v2"]
    config.maxNodes = 150

    # Azure-specific configuration
    config.azureRegion = "West US 2"
    config.azureResourceGroup = "toil-research-rg"
    config.azureStorageAccount = "toilstorageaccount"

    # Authentication
    config.azureSubscriptionId = "12345678-1234-1234-1234-123456789abc"
    config.azureTenantId = "87654321-4321-4321-4321-cba987654321"
    config.azureClientId = "abcdef01-2345-6789-abcd-ef0123456789"
    # SECURITY: placeholder only -- never hard-code a real client secret;
    # load it from a secret store or environment variable instead.
    config.azureClientSecret = "your-client-secret"

    # Networking
    config.azureVirtualNetwork = "toil-vnet"
    config.azureSubnet = "toil-subnet"

    # Storage
    config.nodeStorage = 128
    config.nodeStorageType = "Premium_LRS"  # Premium SSD

    # Spot instances
    config.defaultPreemptible = True
    config.azureSpotMaxPrice = 0.15  # Maximum hourly price

    return config

def advanced_azure_features():
    """Advanced Azure provisioning features."""
    provisioner = AzureProvisioner(
        clusterName="azure-hpc-cluster",
        clusterType="batch",
        zone="West US 2",
        nodeStorage=256,
        nodeStorageType="Premium_LRS"
    )

    # Launch with Azure-specific configuration
    provisioner.launchCluster(
        leaderNodeType="Standard_D8s_v3",
        leaderStorage=128,
        owner="hpc-team",
        keyName="azure-keypair",
        # Azure-specific options
        vmImage="Canonical:0001-com-ubuntu-server-focal:20_04-lts-gen2:latest",
        availabilitySet="toil-availability-set",
        networkSecurityGroup="toil-nsg",
        customData="""#!/bin/bash
# Azure-specific initialization
apt-get update
apt-get install -y docker.io

# Configure Azure CLI
curl -sL https://aka.ms/InstallAzureCLIDeb | bash

# Install Azure batch tools
pip3 install azure-batch azure-storage-blob
"""
    )

    # Add high-performance computing nodes
    hpc_node_types = [
        NodeInfo(
            cores=16,
            memory=128*1024**3,
            disk=500*1024**3,
            preemptible=False,
            nodeType="Standard_H16r"  # High-performance computing instance
        ),
        NodeInfo(
            cores=44,
            memory=352*1024**3,
            disk=1000*1024**3,
            preemptible=True,
            nodeType="Standard_H44rs"  # High-memory HPC instance
        )
    ]

    provisioner.addNodes(
        nodeTypes=hpc_node_types,
        numNodes=20,
        preemptible=True,
        # Azure-specific HPC configuration
        enableAcceleratedNetworking=True,
        enableInfiniBand=True,  # For HPC workloads
        proximityPlacementGroup="hpc-placement-group"
    )

    return provisioner

```

### Cluster Management and Monitoring
{ .api }

Comprehensive cluster management utilities and monitoring capabilities.

```python

import time
import logging
from typing import List  # fix: used in annotations below but never imported

from toil.provisioners.clusterScaler import ClusterScaler
from toil.provisioners.abstractProvisioner import AbstractProvisioner  # fix: used below but never imported
from toil.batchSystems import NodeInfo  # fix: used below but never imported

def cluster_lifecycle_management():
    """Comprehensive cluster lifecycle management."""

    class ClusterManager:
        """High-level cluster management interface."""

        def __init__(self, provisioner: AbstractProvisioner):
            self.provisioner = provisioner
            self.logger = logging.getLogger(__name__)
            # Running counters for launches, terminations, savings, and uptime.
            self.metrics = {
                'nodes_launched': 0,
                'nodes_terminated': 0,
                'cost_savings': 0.0,
                'uptime': 0.0
            }

        def deploy_cluster(self, config: dict) -> str:
            """Deploy cluster with comprehensive configuration.

            Returns the cluster name on success; re-raises after cleanup
            on failure.
            """
            start_time = time.time()

            try:
                # Launch leader node
                self.provisioner.launchCluster(
                    leaderNodeType=config['leader_type'],
                    leaderStorage=config['leader_storage'],
                    owner=config['owner'],
                    keyName=config['key_name']
                )

                # Wait for leader to be ready
                self.wait_for_leader_ready(timeout=600)

                # Add initial worker nodes
                if config.get('initial_workers', 0) > 0:
                    self.add_workers(
                        node_types=config['worker_types'],
                        count=config['initial_workers'],
                        preemptible=config.get('use_spot', True)
                    )

                # Configure monitoring
                self.setup_cluster_monitoring()

                # Configure auto-scaling
                if config.get('auto_scaling', False):
                    self.enable_auto_scaling(config['scaling_config'])

                deployment_time = time.time() - start_time
                self.logger.info(f"Cluster deployed in {deployment_time:.2f} seconds")

                return self.provisioner.cluster_name

            except Exception as e:
                self.logger.error(f"Cluster deployment failed: {e}")
                # Cleanup on failure
                self.cleanup_failed_deployment()
                raise

        def add_workers(self, node_types: List[NodeInfo], count: int,
                        preemptible: bool = True) -> int:
            """Add worker nodes with monitoring."""
            nodes_added = self.provisioner.addNodes(
                nodeTypes=node_types,
                numNodes=count,
                preemptible=preemptible
            )

            self.metrics['nodes_launched'] += nodes_added

            # Monitor node startup
            self.monitor_node_startup(nodes_added)

            return nodes_added

        def remove_workers(self, node_count: int = None,
                           node_ids: List[str] = None) -> int:
            """Remove worker nodes gracefully.

            Returns the number of nodes removed.
            """
            if node_ids:
                nodes_to_remove = node_ids
            else:
                # Select nodes to remove (prefer spot instances).
                # fix: normalize to node IDs here -- the original mixed node
                # objects with ID strings, so drain/terminate below broke in
                # one of the two paths.
                all_nodes = self.get_cluster_nodes()
                spot_node_ids = [n.id for n in all_nodes if n.preemptible]
                nodes_to_remove = spot_node_ids[:node_count] if node_count else spot_node_ids

            # Gracefully drain jobs
            for node_id in nodes_to_remove:
                self.drain_node(node_id)

            # Terminate nodes (nodes_to_remove is now always a list of IDs)
            self.provisioner.terminateNodes(nodes_to_remove)

            self.metrics['nodes_terminated'] += len(nodes_to_remove)

            return len(nodes_to_remove)

        def monitor_cluster_health(self):
            """Monitor cluster health and performance."""
            health_metrics = {
                'total_nodes': 0,
                'healthy_nodes': 0,
                'unhealthy_nodes': 0,
                'avg_cpu_usage': 0.0,
                'avg_memory_usage': 0.0,
                'job_queue_length': 0
            }

            cluster_nodes = self.get_cluster_nodes()
            health_metrics['total_nodes'] = len(cluster_nodes)

            for node in cluster_nodes:
                node_health = self.check_node_health(node)

                if node_health['healthy']:
                    health_metrics['healthy_nodes'] += 1
                    health_metrics['avg_cpu_usage'] += node_health['cpu_usage']
                    health_metrics['avg_memory_usage'] += node_health['memory_usage']
                else:
                    health_metrics['unhealthy_nodes'] += 1

            # Calculate averages (sums were accumulated above)
            if health_metrics['healthy_nodes'] > 0:
                health_metrics['avg_cpu_usage'] /= health_metrics['healthy_nodes']
                health_metrics['avg_memory_usage'] /= health_metrics['healthy_nodes']

            # Get job queue status
            health_metrics['job_queue_length'] = self.get_job_queue_length()

            # Log health status
            self.logger.info(f"Cluster Health: {health_metrics}")

            # Take corrective actions if needed
            if health_metrics['unhealthy_nodes'] > 0:
                self.handle_unhealthy_nodes(cluster_nodes)

            return health_metrics

        def optimize_costs(self):
            """Optimize cluster costs through intelligent resource management."""
            # Get current pricing and usage
            current_costs = self.calculate_current_costs()

            # Analyze spot instance opportunities
            spot_savings = self.analyze_spot_opportunities()

            # Implement cost optimizations
            if spot_savings['potential_savings'] > 0.2:  # 20% savings threshold
                self.implement_spot_optimization(spot_savings)

            # Right-size instances based on actual usage
            resize_recommendations = self.analyze_instance_utilization()
            if resize_recommendations:
                self.implement_instance_resizing(resize_recommendations)

            # Clean up unused resources
            self.cleanup_unused_resources()

            new_costs = self.calculate_current_costs()
            savings = current_costs - new_costs
            self.metrics['cost_savings'] += savings

            self.logger.info(f"Cost optimization saved ${savings:.2f}/hour")

        def destroy_cluster(self):
            """Safely destroy cluster with cleanup."""
            try:
                # Stop all jobs gracefully
                self.stop_all_jobs(timeout=300)

                # Save final metrics
                self.save_cluster_metrics()

                # Destroy infrastructure
                self.provisioner.destroyCluster()

                self.logger.info("Cluster destroyed successfully")

            except Exception as e:
                self.logger.error(f"Error during cluster destruction: {e}")
                # Force cleanup
                self.force_cleanup()

def cluster_auto_scaling():
    """Advanced auto-scaling implementation."""

    class AutoScaler:
        """Intelligent cluster auto-scaler."""

        def __init__(self, provisioner: AbstractProvisioner, config: dict):
            self.provisioner = provisioner
            self.config = config
            self.scaling_history = []

        def run_scaling_loop(self):
            """Main auto-scaling control loop."""
            while True:
                try:
                    # Collect metrics
                    metrics = self.collect_scaling_metrics()

                    # Make scaling decision
                    scaling_decision = self.make_scaling_decision(metrics)

                    # Execute scaling action
                    if scaling_decision['action'] != 'none':
                        self.execute_scaling_action(scaling_decision)

                    # Wait before next evaluation
                    time.sleep(self.config.get('evaluation_interval', 60))

                except Exception as e:
                    logging.error(f"Auto-scaling error: {e}")
                    time.sleep(30)  # Short delay on error

        def collect_scaling_metrics(self) -> dict:
            """Collect metrics for scaling decisions."""
            return {
                'cpu_utilization': self.get_cluster_cpu_utilization(),
                'memory_utilization': self.get_cluster_memory_utilization(),
                'job_queue_length': self.get_job_queue_length(),
                'pending_jobs': self.get_pending_jobs_count(),
                'node_count': len(self.get_cluster_nodes()),
                'spot_instance_interruptions': self.get_recent_interruptions()
            }

        def make_scaling_decision(self, metrics: dict) -> dict:
            """Intelligent scaling decision based on multiple factors."""
            decision = {'action': 'none', 'node_count': 0, 'node_types': []}

            # Scale up conditions
            if (metrics['cpu_utilization'] > 0.8 or
                    metrics['job_queue_length'] > metrics['node_count'] * 2):

                decision['action'] = 'scale_up'
                decision['node_count'] = self.calculate_scale_up_amount(metrics)
                decision['node_types'] = self.select_optimal_node_types(metrics)

            # Scale down conditions
            elif (metrics['cpu_utilization'] < 0.3 and
                    metrics['job_queue_length'] == 0 and
                    metrics['node_count'] > 1):

                decision['action'] = 'scale_down'
                decision['node_count'] = self.calculate_scale_down_amount(metrics)

            # Record decision
            self.scaling_history.append({
                'timestamp': time.time(),
                'metrics': metrics,
                'decision': decision
            })

            return decision

        def calculate_scale_up_amount(self, metrics: dict) -> int:
            """Calculate optimal number of nodes to add."""
            # Base calculation on queue length and utilization
            queue_based = max(1, metrics['job_queue_length'] // 3)
            utilization_based = max(1, int(metrics['node_count'] * 0.3))

            # Consider recent interruptions
            interruption_buffer = metrics['spot_instance_interruptions']

            # Cap scaling to prevent over-provisioning
            max_scale = self.config.get('max_scale_up', 10)

            return min(max_scale, max(queue_based, utilization_based) + interruption_buffer)

```

This cloud provisioning system provides comprehensive, intelligent cluster management with cost optimization, auto-scaling, and multi-cloud support for scalable workflow execution.