# Cloud Provisioning

## Overview

Toil's cloud provisioning system enables automatic creation, scaling, and management of compute clusters across major cloud providers. The provisioning system integrates with batch systems to dynamically allocate resources based on workflow demands, supporting auto-scaling, cost optimization through spot instances, and sophisticated resource management policies. This allows workflows to seamlessly scale from single nodes to large distributed clusters without manual infrastructure management.

## Capabilities

### Abstract Provisioner Interface
{ .api }

The `AbstractProvisioner` provides the core interface for all cloud provisioning implementations.
```python
14
from toil.provisioners.abstractProvisioner import AbstractProvisioner
15
from toil.batchSystems import NodeInfo
16
from typing import List, Dict, Optional
17
18
class CustomProvisioner(AbstractProvisioner):
19
"""Custom provisioner implementation for specialized cloud environments."""
20
21
def __init__(self, clusterName: str, clusterType: str, zone: str,
22
nodeStorage: int, nodeStorageType: str, sseKey: Optional[str] = None):
23
"""Initialize provisioner with cluster configuration."""
24
super().__init__(clusterName, clusterType, zone, nodeStorage, nodeStorageType)
25
self.cluster_name = clusterName
26
self.cluster_type = clusterType
27
self.availability_zone = zone
28
self.sseKey = sseKey
29
30
# Initialize cloud provider client
31
self.cloud_client = self.initialize_cloud_client()
32
33
def launchCluster(self, leaderNodeType: str, leaderStorage: int,
34
owner: str, keyName: str, **kwargs) -> None:
35
"""Launch cluster with leader node."""
36
37
# Create leader node
38
leader_config = {
39
'instance_type': leaderNodeType,
40
'storage_size': leaderStorage,
41
'storage_type': self.nodeStorageType,
42
'key_name': keyName,
43
'security_groups': self.create_security_groups(),
44
'user_data': self.get_leader_user_data()
45
}
46
47
if self.sseKey:
48
leader_config['encryption_key'] = self.sseKey
49
50
leader_instance = self.cloud_client.create_instance(leader_config)
51
52
# Wait for leader to be ready
53
self.wait_for_instance_ready(leader_instance.id)
54
55
# Configure leader node
56
self.configure_leader_node(leader_instance)
57
58
# Store cluster metadata
59
self.store_cluster_metadata({
60
'leader_id': leader_instance.id,
61
'cluster_name': self.cluster_name,
62
'owner': owner,
63
'creation_time': time.time()
64
})
65
66
def addNodes(self, nodeTypes: List[NodeInfo], numNodes: int,
67
preemptible: bool = False, spotBid: Optional[float] = None) -> int:
68
"""Add worker nodes to cluster."""
69
70
instances_created = 0
71
72
for node_type in nodeTypes:
73
# Calculate nodes needed for this type
74
nodes_for_type = min(numNodes - instances_created,
75
self.calculate_optimal_node_count(node_type))
76
77
if nodes_for_type <= 0:
78
continue
79
80
# Configure worker node
81
worker_config = {
82
'instance_type': node_type.nodeType,
83
'count': nodes_for_type,
84
'storage_size': self.nodeStorage,
85
'storage_type': self.nodeStorageType,
86
'preemptible': preemptible,
87
'user_data': self.get_worker_user_data()
88
}
89
90
if preemptible and spotBid:
91
worker_config['spot_price'] = spotBid
92
93
# Launch instances
94
instances = self.cloud_client.create_instances(worker_config)
95
96
# Track created instances
97
for instance in instances:
98
self.register_worker_node(instance, node_type)
99
instances_created += 1
100
101
if instances_created >= numNodes:
102
break
103
104
return instances_created
105
106
def terminateNodes(self, nodes: List[str]) -> None:
107
"""Terminate specified worker nodes."""
108
109
for node_id in nodes:
110
try:
111
# Gracefully drain jobs from node
112
self.drain_node_jobs(node_id)
113
114
# Terminate instance
115
self.cloud_client.terminate_instance(node_id)
116
117
# Clean up metadata
118
self.unregister_worker_node(node_id)
119
120
except Exception as e:
121
self.logger.error(f"Failed to terminate node {node_id}: {e}")
122
123
def getNodeShape(self, nodeType: str) -> NodeInfo:
124
"""Get node configuration for specified type."""
125
126
# Query cloud provider for instance specifications
127
instance_info = self.cloud_client.get_instance_type_info(nodeType)
128
129
return NodeInfo(
130
cores=instance_info['cpu_count'],
131
memory=instance_info['memory_mb'] * 1024 * 1024, # Convert to bytes
132
disk=instance_info['storage_gb'] * 1024 * 1024 * 1024, # Convert to bytes
133
preemptible=instance_info.get('preemptible', False),
134
nodeType=nodeType
135
)
136
137
def destroyCluster(self) -> None:
138
"""Destroy entire cluster and clean up resources."""
139
140
# Get all cluster nodes
141
cluster_nodes = self.get_cluster_nodes()
142
143
# Terminate all nodes
144
for node in cluster_nodes:
145
try:
146
self.cloud_client.terminate_instance(node['instance_id'])
147
except Exception as e:
148
self.logger.warning(f"Failed to terminate {node['instance_id']}: {e}")
149
150
# Clean up security groups
151
self.cleanup_security_groups()
152
153
# Remove cluster metadata
154
self.cleanup_cluster_metadata()
155
156
self.logger.info(f"Cluster {self.cluster_name} destroyed")
157
```
### AWS Provisioner
{ .api }

AWS EC2-based provisioning with comprehensive support for AWS services and features.
```python
165
from toil.provisioners.aws.awsProvisioner import AWSProvisioner
166
from toil.lib.aws import establish_boto3_session
167
from toil.common import Config
168
169
def setup_aws_provisioning():
170
"""Configure AWS provisioning for scalable workflows."""
171
172
# Basic AWS provisioner configuration
173
config = Config()
174
config.provisioner = "aws"
175
config.nodeTypes = ["m5.large", "m5.xlarge:0.50", "c5.2xlarge"] # type:bid_price
176
config.maxNodes = 100
177
config.minNodes = 0
178
179
# AWS-specific configuration
180
config.awsRegion = "us-west-2"
181
config.zone = "us-west-2a" # Availability zone
182
config.keyPairName = "my-toil-keypair"
183
config.vpcSubnet = "subnet-12345678" # Optional: specific subnet
184
185
# Security and access
186
config.awsEc2ProfileArn = "arn:aws:iam::123456789:instance-profile/ToilRole"
187
config.sseKey = "alias/toil-encryption-key" # KMS encryption
188
189
# Storage configuration
190
config.nodeStorage = 100 # GB per node
191
config.nodeStorageType = "gp3" # EBS volume type
192
193
# Spot instance configuration
194
config.defaultPreemptible = True # Use spot instances by default
195
config.preemptibleCompensation = 1.0 # No compensation for spot
196
config.spotBid = 0.10 # Maximum spot price per hour
197
198
return config
199
200
def advanced_aws_features():
201
"""Demonstrate advanced AWS provisioning features."""
202
203
# Custom AWS provisioner with advanced features
204
provisioner = AWSProvisioner(
205
clusterName="advanced-cluster",
206
clusterType="mesos", # or "kubernetes"
207
zone="us-west-2a",
208
nodeStorage=200,
209
nodeStorageType="io2", # High-performance storage
210
sseKey="alias/my-kms-key"
211
)
212
213
# Launch cluster with custom configuration
214
provisioner.launchCluster(
215
leaderNodeType="m5.2xlarge",
216
leaderStorage=100,
217
owner="research-team",
218
keyName="research-keypair",
219
# Advanced options
220
securityGroupNames=["toil-cluster", "research-access"],
221
userData="""#!/bin/bash
222
# Custom initialization script
223
yum update -y
224
yum install -y docker
225
systemctl start docker
226
systemctl enable docker
227
228
# Install additional tools
229
pip3 install boto3 awscli
230
231
# Configure monitoring
232
yum install -y amazon-cloudwatch-agent
233
""",
234
leaderIamInstanceProfile="arn:aws:iam::123456789:instance-profile/ToilLeader"
235
)
236
237
# Add heterogeneous node types
238
node_types = [
239
NodeInfo(cores=4, memory=16*1024**3, disk=100*1024**3,
240
preemptible=True, nodeType="m5.large"),
241
NodeInfo(cores=16, memory=64*1024**3, disk=500*1024**3,
242
preemptible=False, nodeType="m5.4xlarge"),
243
NodeInfo(cores=32, memory=128*1024**3, disk=1000*1024**3,
244
preemptible=True, nodeType="c5.8xlarge")
245
]
246
247
# Add nodes with mixed instance types
248
provisioner.addNodes(
249
nodeTypes=node_types,
250
numNodes=50,
251
preemptible=True,
252
spotBid=0.25,
253
# Additional AWS options
254
availabilityZone="us-west-2a",
255
subnetID="subnet-12345678",
256
placementGroup="cluster-group" # For high-performance networking
257
)
258
259
return provisioner
260
261
def aws_auto_scaling_policies():
262
"""Configure advanced auto-scaling policies for AWS."""
263
264
class AdvancedAWSProvisioner(AWSProvisioner):
265
"""AWS provisioner with custom scaling logic."""
266
267
def __init__(self, *args, **kwargs):
268
super().__init__(*args, **kwargs)
269
self.scaling_policies = {
270
'scale_up_threshold': 0.8, # CPU utilization to scale up
271
'scale_down_threshold': 0.3, # CPU utilization to scale down
272
'scale_up_cooldown': 300, # 5 minutes cooldown
273
'scale_down_cooldown': 600, # 10 minutes cooldown
274
'max_scale_up': 10, # Max nodes to add at once
275
'max_scale_down': 5 # Max nodes to remove at once
276
}
277
278
def auto_scale_cluster(self):
279
"""Implement custom auto-scaling logic."""
280
281
# Get current cluster metrics
282
cluster_metrics = self.get_cluster_metrics()
283
284
current_nodes = len(self.get_worker_nodes())
285
cpu_utilization = cluster_metrics['avg_cpu_utilization']
286
queue_length = cluster_metrics['job_queue_length']
287
288
# Scale up decision
289
if (cpu_utilization > self.scaling_policies['scale_up_threshold'] or
290
queue_length > current_nodes * 2):
291
292
nodes_to_add = min(
293
self.scaling_policies['max_scale_up'],
294
max(1, queue_length // 5) # 1 node per 5 queued jobs
295
)
296
297
self.scale_up_cluster(nodes_to_add)
298
299
# Scale down decision
300
elif (cpu_utilization < self.scaling_policies['scale_down_threshold'] and
301
queue_length == 0 and current_nodes > 1):
302
303
nodes_to_remove = min(
304
self.scaling_policies['max_scale_down'],
305
max(1, current_nodes // 4) # Remove up to 25% of nodes
306
)
307
308
self.scale_down_cluster(nodes_to_remove)
309
310
def scale_up_cluster(self, node_count: int):
311
"""Scale up cluster with cost optimization."""
312
313
# Prioritize spot instances for cost savings
314
spot_node_types = [nt for nt in self.get_node_types() if nt.preemptible]
315
on_demand_node_types = [nt for nt in self.get_node_types() if not nt.preemptible]
316
317
# Try spot instances first
318
added_nodes = 0
319
if spot_node_types and added_nodes < node_count:
320
spot_nodes = min(node_count - added_nodes, int(node_count * 0.8))
321
added_nodes += self.addNodes(
322
nodeTypes=spot_node_types[:1], # Use most cost-effective
323
numNodes=spot_nodes,
324
preemptible=True,
325
spotBid=self.calculate_optimal_spot_bid()
326
)
327
328
# Add on-demand instances if needed
329
if added_nodes < node_count and on_demand_node_types:
330
remaining_nodes = node_count - added_nodes
331
self.addNodes(
332
nodeTypes=on_demand_node_types[:1],
333
numNodes=remaining_nodes,
334
preemptible=False
335
)
336
337
def calculate_optimal_spot_bid(self) -> float:
338
"""Calculate optimal spot instance bid price."""
339
340
# Get current spot price history
341
spot_history = self.get_spot_price_history()
342
343
# Calculate bid as 110% of average recent price
344
recent_prices = [price for price in spot_history[-24:]] # Last 24 hours
345
avg_price = sum(recent_prices) / len(recent_prices)
346
optimal_bid = avg_price * 1.1
347
348
return min(optimal_bid, self.maxSpotBid) # Cap at maximum allowed
349
```
### Google Cloud Provisioner
{ .api }

Google Compute Engine provisioning with support for GCP-specific features.
```python
357
from toil.provisioners.gceProvisioner import GCEProvisioner
358
359
def setup_gce_provisioning():
360
"""Configure Google Cloud provisioning."""
361
362
config = Config()
363
config.provisioner = "gce"
364
config.nodeTypes = ["n1-standard-2", "n1-standard-4", "n1-highmem-8"]
365
config.maxNodes = 200
366
367
# GCP-specific configuration
368
config.gcpRegion = "us-central1"
369
config.zone = "us-central1-a"
370
config.gcpProjectID = "my-research-project"
371
config.gcpServiceAccountEmail = "toil-service@my-project.iam.gserviceaccount.com"
372
373
# Authentication
374
config.googleCredentials = "/path/to/service-account.json"
375
376
# Networking
377
config.gcpNetwork = "default"
378
config.gcpSubnet = "default"
379
380
# Storage and encryption
381
config.nodeStorage = 100
382
config.nodeStorageType = "pd-ssd" # SSD persistent disk
383
config.gcpDiskEncryption = True
384
385
# Preemptible instances (Google's spot instances)
386
config.defaultPreemptible = True
387
config.preemptibleCompensation = 1.0
388
389
return config
390
391
def advanced_gce_features():
392
"""Advanced Google Cloud Engine features."""
393
394
provisioner = GCEProvisioner(
395
clusterName="gce-research-cluster",
396
clusterType="kubernetes",
397
zone="us-central1-b",
398
nodeStorage=200,
399
nodeStorageType="pd-ssd"
400
)
401
402
# Launch with custom machine types
403
provisioner.launchCluster(
404
leaderNodeType="custom-4-8192", # 4 vCPUs, 8GB RAM
405
leaderStorage=100,
406
owner="research-team",
407
keyName=None, # GCE uses SSH keys differently
408
# GCE-specific options
409
machineImage="cos-cloud/cos-stable", # Container-optimized OS
410
networkTags=["toil-cluster", "research"],
411
serviceAccountScopes=[
412
"https://www.googleapis.com/auth/cloud-platform",
413
"https://www.googleapis.com/auth/storage-full"
414
],
415
startupScript="""#!/bin/bash
416
# Install Docker and Kubernetes tools
417
curl -fsSL https://get.docker.com -o get-docker.sh
418
sh get-docker.sh
419
420
# Configure container runtime
421
systemctl start docker
422
systemctl enable docker
423
424
# Install kubectl
425
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
426
chmod +x kubectl
427
mv kubectl /usr/local/bin/
428
"""
429
)
430
431
# Add GPU-enabled nodes
432
gpu_node_types = [
433
NodeInfo(
434
cores=8,
435
memory=32*1024**3,
436
disk=200*1024**3,
437
preemptible=True,
438
nodeType="n1-standard-8",
439
# GCE-specific: GPU configuration
440
accelerators=[{
441
"acceleratorCount": 1,
442
"acceleratorType": "nvidia-tesla-k80"
443
}]
444
)
445
]
446
447
provisioner.addNodes(
448
nodeTypes=gpu_node_types,
449
numNodes=5,
450
preemptible=True,
451
# GPU-specific configuration
452
gpuType="nvidia-tesla-k80",
453
gpuCount=1,
454
installGpuDrivers=True
455
)
456
457
return provisioner
458
```
### Azure Provisioner
{ .api }

Microsoft Azure provisioning integration with Azure-specific capabilities.
466
from toil.provisioners.azure import AzureProvisioner
467
468
def setup_azure_provisioning():
469
"""Configure Azure provisioning."""
470
471
config = Config()
472
config.provisioner = "azure"
473
config.nodeTypes = ["Standard_D2s_v3", "Standard_D4s_v3", "Standard_F8s_v2"]
474
config.maxNodes = 150
475
476
# Azure-specific configuration
477
config.azureRegion = "West US 2"
478
config.azureResourceGroup = "toil-research-rg"
479
config.azureStorageAccount = "toilstorageaccount"
480
481
# Authentication
482
config.azureSubscriptionId = "12345678-1234-1234-1234-123456789abc"
483
config.azureTenantId = "87654321-4321-4321-4321-cba987654321"
484
config.azureClientId = "abcdef01-2345-6789-abcd-ef0123456789"
485
config.azureClientSecret = "your-client-secret"
486
487
# Networking
488
config.azureVirtualNetwork = "toil-vnet"
489
config.azureSubnet = "toil-subnet"
490
491
# Storage
492
config.nodeStorage = 128
493
config.nodeStorageType = "Premium_LRS" # Premium SSD
494
495
# Spot instances
496
config.defaultPreemptible = True
497
config.azureSpotMaxPrice = 0.15 # Maximum hourly price
498
499
return config
500
501
def advanced_azure_features():
502
"""Advanced Azure provisioning features."""
503
504
provisioner = AzureProvisioner(
505
clusterName="azure-hpc-cluster",
506
clusterType="batch",
507
zone="West US 2",
508
nodeStorage=256,
509
nodeStorageType="Premium_LRS"
510
)
511
512
# Launch with Azure-specific configuration
513
provisioner.launchCluster(
514
leaderNodeType="Standard_D8s_v3",
515
leaderStorage=128,
516
owner="hpc-team",
517
keyName="azure-keypair",
518
# Azure-specific options
519
vmImage="Canonical:0001-com-ubuntu-server-focal:20_04-lts-gen2:latest",
520
availabilitySet="toil-availability-set",
521
networkSecurityGroup="toil-nsg",
522
customData="""#!/bin/bash
523
# Azure-specific initialization
524
apt-get update
525
apt-get install -y docker.io
526
527
# Configure Azure CLI
528
curl -sL https://aka.ms/InstallAzureCLIDeb | bash
529
530
# Install Azure batch tools
531
pip3 install azure-batch azure-storage-blob
532
"""
533
)
534
535
# Add high-performance computing nodes
536
hpc_node_types = [
537
NodeInfo(
538
cores=16,
539
memory=128*1024**3,
540
disk=500*1024**3,
541
preemptible=False,
542
nodeType="Standard_H16r" # High-performance computing instance
543
),
544
NodeInfo(
545
cores=44,
546
memory=352*1024**3,
547
disk=1000*1024**3,
548
preemptible=True,
549
nodeType="Standard_H44rs" # High-memory HPC instance
550
)
551
]
552
553
provisioner.addNodes(
554
nodeTypes=hpc_node_types,
555
numNodes=20,
556
preemptible=True,
557
# Azure-specific HPC configuration
558
enableAcceleratedNetworking=True,
559
enableInfiniBand=True, # For HPC workloads
560
proximityPlacementGroup="hpc-placement-group"
561
)
562
563
return provisioner
564
```
### Cluster Management and Monitoring
{ .api }

Comprehensive cluster management utilities and monitoring capabilities.
```python
572
from toil.provisioners.clusterScaler import ClusterScaler
573
import time
574
import logging
575
576
def cluster_lifecycle_management():
577
"""Comprehensive cluster lifecycle management."""
578
579
class ClusterManager:
580
"""High-level cluster management interface."""
581
582
def __init__(self, provisioner: AbstractProvisioner):
583
self.provisioner = provisioner
584
self.logger = logging.getLogger(__name__)
585
self.metrics = {
586
'nodes_launched': 0,
587
'nodes_terminated': 0,
588
'cost_savings': 0.0,
589
'uptime': 0.0
590
}
591
592
def deploy_cluster(self, config: dict) -> str:
593
"""Deploy cluster with comprehensive configuration."""
594
595
start_time = time.time()
596
597
try:
598
# Launch leader node
599
self.provisioner.launchCluster(
600
leaderNodeType=config['leader_type'],
601
leaderStorage=config['leader_storage'],
602
owner=config['owner'],
603
keyName=config['key_name']
604
)
605
606
# Wait for leader to be ready
607
self.wait_for_leader_ready(timeout=600)
608
609
# Add initial worker nodes
610
if config.get('initial_workers', 0) > 0:
611
self.add_workers(
612
node_types=config['worker_types'],
613
count=config['initial_workers'],
614
preemptible=config.get('use_spot', True)
615
)
616
617
# Configure monitoring
618
self.setup_cluster_monitoring()
619
620
# Configure auto-scaling
621
if config.get('auto_scaling', False):
622
self.enable_auto_scaling(config['scaling_config'])
623
624
deployment_time = time.time() - start_time
625
self.logger.info(f"Cluster deployed in {deployment_time:.2f} seconds")
626
627
return self.provisioner.cluster_name
628
629
except Exception as e:
630
self.logger.error(f"Cluster deployment failed: {e}")
631
# Cleanup on failure
632
self.cleanup_failed_deployment()
633
raise
634
635
def add_workers(self, node_types: List[NodeInfo], count: int,
636
preemptible: bool = True) -> int:
637
"""Add worker nodes with monitoring."""
638
639
nodes_added = self.provisioner.addNodes(
640
nodeTypes=node_types,
641
numNodes=count,
642
preemptible=preemptible
643
)
644
645
self.metrics['nodes_launched'] += nodes_added
646
647
# Monitor node startup
648
self.monitor_node_startup(nodes_added)
649
650
return nodes_added
651
652
def remove_workers(self, node_count: int = None,
653
node_ids: List[str] = None) -> int:
654
"""Remove worker nodes gracefully."""
655
656
if node_ids:
657
nodes_to_remove = node_ids
658
else:
659
# Select nodes to remove (prefer spot instances)
660
all_nodes = self.get_cluster_nodes()
661
spot_nodes = [n for n in all_nodes if n.preemptible]
662
nodes_to_remove = spot_nodes[:node_count] if node_count else spot_nodes
663
664
# Gracefully drain jobs
665
for node_id in nodes_to_remove:
666
self.drain_node(node_id)
667
668
# Terminate nodes
669
self.provisioner.terminateNodes([n.id for n in nodes_to_remove])
670
671
self.metrics['nodes_terminated'] += len(nodes_to_remove)
672
673
return len(nodes_to_remove)
674
675
def monitor_cluster_health(self):
676
"""Monitor cluster health and performance."""
677
678
health_metrics = {
679
'total_nodes': 0,
680
'healthy_nodes': 0,
681
'unhealthy_nodes': 0,
682
'avg_cpu_usage': 0.0,
683
'avg_memory_usage': 0.0,
684
'job_queue_length': 0
685
}
686
687
cluster_nodes = self.get_cluster_nodes()
688
health_metrics['total_nodes'] = len(cluster_nodes)
689
690
for node in cluster_nodes:
691
node_health = self.check_node_health(node)
692
693
if node_health['healthy']:
694
health_metrics['healthy_nodes'] += 1
695
health_metrics['avg_cpu_usage'] += node_health['cpu_usage']
696
health_metrics['avg_memory_usage'] += node_health['memory_usage']
697
else:
698
health_metrics['unhealthy_nodes'] += 1
699
700
# Calculate averages
701
if health_metrics['healthy_nodes'] > 0:
702
health_metrics['avg_cpu_usage'] /= health_metrics['healthy_nodes']
703
health_metrics['avg_memory_usage'] /= health_metrics['healthy_nodes']
704
705
# Get job queue status
706
health_metrics['job_queue_length'] = self.get_job_queue_length()
707
708
# Log health status
709
self.logger.info(f"Cluster Health: {health_metrics}")
710
711
# Take corrective actions if needed
712
if health_metrics['unhealthy_nodes'] > 0:
713
self.handle_unhealthy_nodes(cluster_nodes)
714
715
return health_metrics
716
717
def optimize_costs(self):
718
"""Optimize cluster costs through intelligent resource management."""
719
720
# Get current pricing and usage
721
current_costs = self.calculate_current_costs()
722
723
# Analyze spot instance opportunities
724
spot_savings = self.analyze_spot_opportunities()
725
726
# Implement cost optimizations
727
if spot_savings['potential_savings'] > 0.2: # 20% savings threshold
728
self.implement_spot_optimization(spot_savings)
729
730
# Right-size instances based on actual usage
731
resize_recommendations = self.analyze_instance_utilization()
732
if resize_recommendations:
733
self.implement_instance_resizing(resize_recommendations)
734
735
# Clean up unused resources
736
self.cleanup_unused_resources()
737
738
new_costs = self.calculate_current_costs()
739
savings = current_costs - new_costs
740
self.metrics['cost_savings'] += savings
741
742
self.logger.info(f"Cost optimization saved ${savings:.2f}/hour")
743
744
def destroy_cluster(self):
745
"""Safely destroy cluster with cleanup."""
746
747
try:
748
# Stop all jobs gracefully
749
self.stop_all_jobs(timeout=300)
750
751
# Save final metrics
752
self.save_cluster_metrics()
753
754
# Destroy infrastructure
755
self.provisioner.destroyCluster()
756
757
self.logger.info("Cluster destroyed successfully")
758
759
except Exception as e:
760
self.logger.error(f"Error during cluster destruction: {e}")
761
# Force cleanup
762
self.force_cleanup()
763
764
def cluster_auto_scaling():
765
"""Advanced auto-scaling implementation."""
766
767
class AutoScaler:
768
"""Intelligent cluster auto-scaler."""
769
770
def __init__(self, provisioner: AbstractProvisioner, config: dict):
771
self.provisioner = provisioner
772
self.config = config
773
self.scaling_history = []
774
775
def run_scaling_loop(self):
776
"""Main auto-scaling control loop."""
777
778
while True:
779
try:
780
# Collect metrics
781
metrics = self.collect_scaling_metrics()
782
783
# Make scaling decision
784
scaling_decision = self.make_scaling_decision(metrics)
785
786
# Execute scaling action
787
if scaling_decision['action'] != 'none':
788
self.execute_scaling_action(scaling_decision)
789
790
# Wait before next evaluation
791
time.sleep(self.config.get('evaluation_interval', 60))
792
793
except Exception as e:
794
logging.error(f"Auto-scaling error: {e}")
795
time.sleep(30) # Short delay on error
796
797
def collect_scaling_metrics(self) -> dict:
798
"""Collect metrics for scaling decisions."""
799
800
return {
801
'cpu_utilization': self.get_cluster_cpu_utilization(),
802
'memory_utilization': self.get_cluster_memory_utilization(),
803
'job_queue_length': self.get_job_queue_length(),
804
'pending_jobs': self.get_pending_jobs_count(),
805
'node_count': len(self.get_cluster_nodes()),
806
'spot_instance_interruptions': self.get_recent_interruptions()
807
}
808
809
def make_scaling_decision(self, metrics: dict) -> dict:
810
"""Intelligent scaling decision based on multiple factors."""
811
812
decision = {'action': 'none', 'node_count': 0, 'node_types': []}
813
814
# Scale up conditions
815
if (metrics['cpu_utilization'] > 0.8 or
816
metrics['job_queue_length'] > metrics['node_count'] * 2):
817
818
decision['action'] = 'scale_up'
819
decision['node_count'] = self.calculate_scale_up_amount(metrics)
820
decision['node_types'] = self.select_optimal_node_types(metrics)
821
822
# Scale down conditions
823
elif (metrics['cpu_utilization'] < 0.3 and
824
metrics['job_queue_length'] == 0 and
825
metrics['node_count'] > 1):
826
827
decision['action'] = 'scale_down'
828
decision['node_count'] = self.calculate_scale_down_amount(metrics)
829
830
# Record decision
831
self.scaling_history.append({
832
'timestamp': time.time(),
833
'metrics': metrics,
834
'decision': decision
835
})
836
837
return decision
838
839
def calculate_scale_up_amount(self, metrics: dict) -> int:
840
"""Calculate optimal number of nodes to add."""
841
842
# Base calculation on queue length and utilization
843
queue_based = max(1, metrics['job_queue_length'] // 3)
844
utilization_based = max(1, int(metrics['node_count'] * 0.3))
845
846
# Consider recent interruptions
847
interruption_buffer = metrics['spot_instance_interruptions']
848
849
# Cap scaling to prevent over-provisioning
850
max_scale = self.config.get('max_scale_up', 10)
851
852
return min(max_scale, max(queue_based, utilization_based) + interruption_buffer)
853
```
This cloud provisioning system provides comprehensive, intelligent cluster management with cost optimization, auto-scaling, and multi-cloud support for scalable workflow execution.