CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-toil

Pipeline management software for clusters.

Overall
score

67%

Overview
Eval results
Files

docs/provisioning.md

Cloud Provisioning

Overview

Toil's cloud provisioning system enables automatic creation, scaling, and management of compute clusters across major cloud providers. The provisioning system integrates with batch systems to dynamically allocate resources based on workflow demands, supporting auto-scaling, cost optimization through spot instances, and sophisticated resource management policies. This allows workflows to seamlessly scale from single nodes to large distributed clusters without manual infrastructure management.

Capabilities

Abstract Provisioner Interface

{ .api }

The AbstractProvisioner provides the core interface for all cloud provisioning implementations.

import logging
import time
from typing import Dict, List, Optional

from toil.batchSystems import NodeInfo
from toil.provisioners.abstractProvisioner import AbstractProvisioner

class CustomProvisioner(AbstractProvisioner):
    """Custom provisioner implementation for specialized cloud environments.

    Implements the AbstractProvisioner contract: launch a leader node,
    add and terminate worker nodes, report node shapes, and destroy the
    cluster. Cloud-specific helpers (initialize_cloud_client,
    create_security_groups, etc.) are assumed to be provided elsewhere.
    """

    def __init__(self, clusterName: str, clusterType: str, zone: str,
                 nodeStorage: int, nodeStorageType: str, sseKey: Optional[str] = None):
        """Initialize provisioner with cluster configuration.

        Args:
            clusterName: Unique name for the cluster.
            clusterType: Scheduler flavor (e.g. "mesos", "kubernetes").
            zone: Availability zone to provision into.
            nodeStorage: Per-node storage size (GB).
            nodeStorageType: Provider-specific volume type.
            sseKey: Optional server-side encryption key for node volumes.
        """
        super().__init__(clusterName, clusterType, zone, nodeStorage, nodeStorageType)
        self.cluster_name = clusterName
        self.cluster_type = clusterType
        self.availability_zone = zone
        self.sseKey = sseKey

        # Fix: terminateNodes/destroyCluster log through self.logger, but it
        # was never created, so every error path raised AttributeError.
        self.logger = logging.getLogger(__name__)

        # Initialize cloud provider client
        self.cloud_client = self.initialize_cloud_client()

    def launchCluster(self, leaderNodeType: str, leaderStorage: int,
                     owner: str, keyName: str, **kwargs) -> None:
        """Launch cluster with leader node.

        Creates the leader instance, waits for it to come up, configures
        it, and records cluster metadata (leader id, owner, timestamp).
        """
        # Create leader node
        leader_config = {
            'instance_type': leaderNodeType,
            'storage_size': leaderStorage,
            'storage_type': self.nodeStorageType,
            'key_name': keyName,
            'security_groups': self.create_security_groups(),
            'user_data': self.get_leader_user_data()
        }

        # Only request encryption when a key was actually supplied.
        if self.sseKey:
            leader_config['encryption_key'] = self.sseKey

        leader_instance = self.cloud_client.create_instance(leader_config)

        # Wait for leader to be ready
        self.wait_for_instance_ready(leader_instance.id)

        # Configure leader node
        self.configure_leader_node(leader_instance)

        # Store cluster metadata
        self.store_cluster_metadata({
            'leader_id': leader_instance.id,
            'cluster_name': self.cluster_name,
            'owner': owner,
            'creation_time': time.time()
        })

    def addNodes(self, nodeTypes: List[NodeInfo], numNodes: int,
                preemptible: bool = False, spotBid: Optional[float] = None) -> int:
        """Add worker nodes to cluster.

        Distributes up to numNodes across the given node types in order
        and returns the number of instances actually created (which may
        be fewer than requested).
        """
        instances_created = 0

        for node_type in nodeTypes:
            # Calculate nodes needed for this type
            nodes_for_type = min(numNodes - instances_created,
                               self.calculate_optimal_node_count(node_type))

            if nodes_for_type <= 0:
                continue

            # Configure worker node
            worker_config = {
                'instance_type': node_type.nodeType,
                'count': nodes_for_type,
                'storage_size': self.nodeStorage,
                'storage_type': self.nodeStorageType,
                'preemptible': preemptible,
                'user_data': self.get_worker_user_data()
            }

            # Fix: compare against None so an explicit bid of 0.0 is still
            # forwarded (the old truthiness check silently dropped it).
            if preemptible and spotBid is not None:
                worker_config['spot_price'] = spotBid

            # Launch instances
            instances = self.cloud_client.create_instances(worker_config)

            # Track created instances
            for instance in instances:
                self.register_worker_node(instance, node_type)
                instances_created += 1

            if instances_created >= numNodes:
                break

        return instances_created

    def terminateNodes(self, nodes: List[str]) -> None:
        """Terminate specified worker nodes, draining each one first."""
        for node_id in nodes:
            try:
                # Gracefully drain jobs from node
                self.drain_node_jobs(node_id)

                # Terminate instance
                self.cloud_client.terminate_instance(node_id)

                # Clean up metadata
                self.unregister_worker_node(node_id)

            except Exception as e:
                # Best-effort: log and keep terminating the remaining nodes.
                self.logger.error(f"Failed to terminate node {node_id}: {e}")

    def getNodeShape(self, nodeType: str) -> NodeInfo:
        """Get node configuration for specified type.

        Queries the cloud provider and converts memory/disk figures from
        MB/GB into bytes for the NodeInfo contract.
        """
        # Query cloud provider for instance specifications
        instance_info = self.cloud_client.get_instance_type_info(nodeType)

        return NodeInfo(
            cores=instance_info['cpu_count'],
            memory=instance_info['memory_mb'] * 1024 * 1024,  # Convert to bytes
            disk=instance_info['storage_gb'] * 1024 * 1024 * 1024,  # Convert to bytes
            preemptible=instance_info.get('preemptible', False),
            nodeType=nodeType
        )

    def destroyCluster(self) -> None:
        """Destroy entire cluster and clean up resources.

        Terminates every node (best-effort), then removes security
        groups and cluster metadata.
        """
        # Get all cluster nodes
        cluster_nodes = self.get_cluster_nodes()

        # Terminate all nodes
        for node in cluster_nodes:
            try:
                self.cloud_client.terminate_instance(node['instance_id'])
            except Exception as e:
                self.logger.warning(f"Failed to terminate {node['instance_id']}: {e}")

        # Clean up security groups
        self.cleanup_security_groups()

        # Remove cluster metadata
        self.cleanup_cluster_metadata()

        self.logger.info(f"Cluster {self.cluster_name} destroyed")

AWS Provisioner

{ .api }

AWS EC2-based provisioning with comprehensive support for AWS services and features.

from toil.provisioners.aws.awsProvisioner import AWSProvisioner
from toil.lib.aws import establish_boto3_session
from toil.common import Config

def setup_aws_provisioning():
    """Configure AWS provisioning for scalable workflows.

    Returns a Config pre-populated with the AWS provisioner, node types,
    networking, security, storage, and spot-instance settings.
    """
    cfg = Config()

    # Attribute -> value map, grouped by concern; applied onto cfg below.
    settings = {
        # Basic AWS provisioner configuration
        'provisioner': "aws",
        'nodeTypes': ["m5.large", "m5.xlarge:0.50", "c5.2xlarge"],  # type:bid_price
        'maxNodes': 100,
        'minNodes': 0,
        # AWS-specific configuration
        'awsRegion': "us-west-2",
        'zone': "us-west-2a",  # Availability zone
        'keyPairName': "my-toil-keypair",
        'vpcSubnet': "subnet-12345678",  # Optional: specific subnet
        # Security and access
        'awsEc2ProfileArn': "arn:aws:iam::123456789:instance-profile/ToilRole",
        'sseKey': "alias/toil-encryption-key",  # KMS encryption
        # Storage configuration
        'nodeStorage': 100,  # GB per node
        'nodeStorageType': "gp3",  # EBS volume type
        # Spot instance configuration
        'defaultPreemptible': True,  # Use spot instances by default
        'preemptibleCompensation': 1.0,  # No compensation for spot
        'spotBid': 0.10,  # Maximum spot price per hour
    }
    for attr, value in settings.items():
        setattr(cfg, attr, value)

    return cfg

def advanced_aws_features():
    """Demonstrate advanced AWS provisioning features.

    Builds an AWS provisioner with encrypted high-performance storage,
    launches a leader with a custom bootstrap script, then adds a mix of
    spot and on-demand worker types.
    """
    # Provisioner backed by io2 storage and KMS encryption.
    aws_cluster = AWSProvisioner(
        clusterName="advanced-cluster",
        clusterType="mesos",  # or "kubernetes"
        zone="us-west-2a",
        nodeStorage=200,
        nodeStorageType="io2",  # High-performance storage
        sseKey="alias/my-kms-key"
    )

    # Launch cluster with custom configuration
    aws_cluster.launchCluster(
        leaderNodeType="m5.2xlarge",
        leaderStorage=100,
        owner="research-team",
        keyName="research-keypair",
        # Advanced options
        securityGroupNames=["toil-cluster", "research-access"],
        userData="""#!/bin/bash
            # Custom initialization script
            yum update -y
            yum install -y docker
            systemctl start docker
            systemctl enable docker
            
            # Install additional tools
            pip3 install boto3 awscli
            
            # Configure monitoring
            yum install -y amazon-cloudwatch-agent
        """,
        leaderIamInstanceProfile="arn:aws:iam::123456789:instance-profile/ToilLeader"
    )

    # Heterogeneous worker shapes: (cores, mem GiB, disk GiB, spot?, type).
    worker_specs = [
        (4, 16, 100, True, "m5.large"),
        (16, 64, 500, False, "m5.4xlarge"),
        (32, 128, 1000, True, "c5.8xlarge"),
    ]
    worker_shapes = [
        NodeInfo(cores=cores, memory=mem * 1024**3, disk=disk * 1024**3,
                 preemptible=spot, nodeType=kind)
        for cores, mem, disk, spot, kind in worker_specs
    ]

    # Add nodes with mixed instance types
    aws_cluster.addNodes(
        nodeTypes=worker_shapes,
        numNodes=50,
        preemptible=True,
        spotBid=0.25,
        # Additional AWS options
        availabilityZone="us-west-2a",
        subnetID="subnet-12345678",
        placementGroup="cluster-group"  # For high-performance networking
    )

    return aws_cluster

def aws_auto_scaling_policies():
    """Configure advanced auto-scaling policies for AWS."""

    class AdvancedAWSProvisioner(AWSProvisioner):
        """AWS provisioner with custom scaling logic.

        Adds threshold-based scale-up/scale-down with cooldown knobs,
        spot-first capacity growth, and spot-price-aware bidding.
        """

        def __init__(self, *args, **kwargs):
            """Initialize and install the default scaling policy knobs."""
            super().__init__(*args, **kwargs)
            self.scaling_policies = {
                'scale_up_threshold': 0.8,    # CPU utilization to scale up
                'scale_down_threshold': 0.3,  # CPU utilization to scale down
                'scale_up_cooldown': 300,     # 5 minutes cooldown
                'scale_down_cooldown': 600,   # 10 minutes cooldown
                'max_scale_up': 10,           # Max nodes to add at once
                'max_scale_down': 5           # Max nodes to remove at once
            }

        def auto_scale_cluster(self):
            """Implement custom auto-scaling logic.

            Scales up on high CPU or a deep job queue; scales down only
            when idle, queue-empty, and more than one node remains.
            """
            # Get current cluster metrics
            cluster_metrics = self.get_cluster_metrics()

            current_nodes = len(self.get_worker_nodes())
            cpu_utilization = cluster_metrics['avg_cpu_utilization']
            queue_length = cluster_metrics['job_queue_length']

            # Scale up decision
            if (cpu_utilization > self.scaling_policies['scale_up_threshold'] or
                queue_length > current_nodes * 2):

                nodes_to_add = min(
                    self.scaling_policies['max_scale_up'],
                    max(1, queue_length // 5)  # 1 node per 5 queued jobs
                )

                self.scale_up_cluster(nodes_to_add)

            # Scale down decision
            elif (cpu_utilization < self.scaling_policies['scale_down_threshold'] and
                  queue_length == 0 and current_nodes > 1):

                nodes_to_remove = min(
                    self.scaling_policies['max_scale_down'],
                    max(1, current_nodes // 4)  # Remove up to 25% of nodes
                )

                self.scale_down_cluster(nodes_to_remove)

        def scale_up_cluster(self, node_count: int):
            """Scale up cluster with cost optimization.

            Fills up to 80% of the request with spot capacity first, then
            tops up with on-demand instances.
            """
            # Prioritize spot instances for cost savings
            spot_node_types = [nt for nt in self.get_node_types() if nt.preemptible]
            on_demand_node_types = [nt for nt in self.get_node_types() if not nt.preemptible]

            # Try spot instances first
            added_nodes = 0
            if spot_node_types and added_nodes < node_count:
                spot_nodes = min(node_count - added_nodes, int(node_count * 0.8))
                added_nodes += self.addNodes(
                    nodeTypes=spot_node_types[:1],  # Use most cost-effective
                    numNodes=spot_nodes,
                    preemptible=True,
                    spotBid=self.calculate_optimal_spot_bid()
                )

            # Add on-demand instances if needed
            if added_nodes < node_count and on_demand_node_types:
                remaining_nodes = node_count - added_nodes
                self.addNodes(
                    nodeTypes=on_demand_node_types[:1],
                    numNodes=remaining_nodes,
                    preemptible=False
                )

        def calculate_optimal_spot_bid(self) -> float:
            """Calculate optimal spot instance bid price.

            Bids 110% of the average of the most recent prices, capped at
            self.maxSpotBid. Falls back to the cap when no history exists.
            """
            # Get current spot price history
            spot_history = self.get_spot_price_history()

            # Fix: slice directly instead of a copy-only comprehension, and
            # guard against empty history (previously ZeroDivisionError).
            recent_prices = spot_history[-24:]  # Last 24 hours
            if not recent_prices:
                return self.maxSpotBid
            avg_price = sum(recent_prices) / len(recent_prices)
            optimal_bid = avg_price * 1.1

            return min(optimal_bid, self.maxSpotBid)  # Cap at maximum allowed

Google Cloud Provisioner

{ .api }

Google Compute Engine provisioning with support for GCP-specific features.

from toil.provisioners.gceProvisioner import GCEProvisioner

def setup_gce_provisioning():
    """Configure Google Cloud provisioning.

    Returns a Config populated with GCE node types, project identity,
    networking, storage, and preemptible-instance defaults.
    """
    gce_config = Config()

    # (attribute, value) pairs, grouped by concern; applied below.
    options = [
        ('provisioner', "gce"),
        ('nodeTypes', ["n1-standard-2", "n1-standard-4", "n1-highmem-8"]),
        ('maxNodes', 200),
        # GCP-specific configuration
        ('gcpRegion', "us-central1"),
        ('zone', "us-central1-a"),
        ('gcpProjectID', "my-research-project"),
        ('gcpServiceAccountEmail', "toil-service@my-project.iam.gserviceaccount.com"),
        # Authentication
        ('googleCredentials', "/path/to/service-account.json"),
        # Networking
        ('gcpNetwork', "default"),
        ('gcpSubnet', "default"),
        # Storage and encryption
        ('nodeStorage', 100),
        ('nodeStorageType', "pd-ssd"),  # SSD persistent disk
        ('gcpDiskEncryption', True),
        # Preemptible instances (Google's spot instances)
        ('defaultPreemptible', True),
        ('preemptibleCompensation', 1.0),
    ]
    for name, value in options:
        setattr(gce_config, name, value)

    return gce_config

def advanced_gce_features():
    """Advanced Google Cloud Engine features.

    Launches a Kubernetes-flavored cluster on container-optimized OS and
    adds preemptible GPU-equipped worker nodes.
    """
    gce_cluster = GCEProvisioner(
        clusterName="gce-research-cluster",
        clusterType="kubernetes",
        zone="us-central1-b",
        nodeStorage=200,
        nodeStorageType="pd-ssd"
    )

    # Launch with custom machine types
    gce_cluster.launchCluster(
        leaderNodeType="custom-4-8192",  # 4 vCPUs, 8GB RAM
        leaderStorage=100,
        owner="research-team",
        keyName=None,  # GCE uses SSH keys differently
        # GCE-specific options
        machineImage="cos-cloud/cos-stable",  # Container-optimized OS
        networkTags=["toil-cluster", "research"],
        serviceAccountScopes=[
            "https://www.googleapis.com/auth/cloud-platform",
            "https://www.googleapis.com/auth/storage-full"
        ],
        startupScript="""#!/bin/bash
            # Install Docker and Kubernetes tools
            curl -fsSL https://get.docker.com -o get-docker.sh
            sh get-docker.sh
            
            # Configure container runtime
            systemctl start docker
            systemctl enable docker
            
            # Install kubectl
            curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
            chmod +x kubectl
            mv kubectl /usr/local/bin/
        """
    )

    # Single GPU-enabled worker shape (one K80 attached per node).
    gpu_shape = NodeInfo(
        cores=8,
        memory=32*1024**3,
        disk=200*1024**3,
        preemptible=True,
        nodeType="n1-standard-8",
        # GCE-specific: GPU configuration
        accelerators=[{
            "acceleratorCount": 1,
            "acceleratorType": "nvidia-tesla-k80"
        }]
    )

    gce_cluster.addNodes(
        nodeTypes=[gpu_shape],
        numNodes=5,
        preemptible=True,
        # GPU-specific configuration
        gpuType="nvidia-tesla-k80",
        gpuCount=1,
        installGpuDrivers=True
    )

    return gce_cluster

Azure Provisioner

{ .api }

Microsoft Azure provisioning integration with Azure-specific capabilities.

from toil.provisioners.azure import AzureProvisioner

def setup_azure_provisioning():
    """Configure Azure provisioning.

    Returns a Config populated with Azure VM sizes, resource-group and
    service-principal identity, networking, storage, and spot settings.
    """
    az_config = Config()

    # Attribute -> value map applied onto the Config below.
    azure_settings = {
        'provisioner': "azure",
        'nodeTypes': ["Standard_D2s_v3", "Standard_D4s_v3", "Standard_F8s_v2"],
        'maxNodes': 150,
        # Azure-specific configuration
        'azureRegion': "West US 2",
        'azureResourceGroup': "toil-research-rg",
        'azureStorageAccount': "toilstorageaccount",
        # Authentication (service principal)
        'azureSubscriptionId': "12345678-1234-1234-1234-123456789abc",
        'azureTenantId': "87654321-4321-4321-4321-cba987654321",
        'azureClientId': "abcdef01-2345-6789-abcd-ef0123456789",
        'azureClientSecret': "your-client-secret",
        # Networking
        'azureVirtualNetwork': "toil-vnet",
        'azureSubnet': "toil-subnet",
        # Storage
        'nodeStorage': 128,
        'nodeStorageType': "Premium_LRS",  # Premium SSD
        # Spot instances
        'defaultPreemptible': True,
        'azureSpotMaxPrice': 0.15,  # Maximum hourly price
    }
    for key, value in azure_settings.items():
        setattr(az_config, key, value)

    return az_config

def advanced_azure_features():
    """Advanced Azure provisioning features.

    Launches a batch-flavored HPC cluster on premium storage, then adds
    InfiniBand-enabled high-performance worker nodes.
    """
    azure_cluster = AzureProvisioner(
        clusterName="azure-hpc-cluster",
        clusterType="batch",
        zone="West US 2",
        nodeStorage=256,
        nodeStorageType="Premium_LRS"
    )

    # Launch with Azure-specific configuration
    azure_cluster.launchCluster(
        leaderNodeType="Standard_D8s_v3",
        leaderStorage=128,
        owner="hpc-team",
        keyName="azure-keypair",
        # Azure-specific options
        vmImage="Canonical:0001-com-ubuntu-server-focal:20_04-lts-gen2:latest",
        availabilitySet="toil-availability-set",
        networkSecurityGroup="toil-nsg",
        customData="""#!/bin/bash
            # Azure-specific initialization
            apt-get update
            apt-get install -y docker.io
            
            # Configure Azure CLI
            curl -sL https://aka.ms/InstallAzureCLIDeb | bash
            
            # Install Azure batch tools
            pip3 install azure-batch azure-storage-blob
        """
    )

    # HPC worker shapes: (cores, mem GiB, disk GiB, spot?, VM size).
    hpc_specs = [
        (16, 128, 500, False, "Standard_H16r"),   # High-performance computing instance
        (44, 352, 1000, True, "Standard_H44rs"),  # High-memory HPC instance
    ]
    hpc_shapes = [
        NodeInfo(cores=cores, memory=mem*1024**3, disk=disk*1024**3,
                 preemptible=spot, nodeType=vm_size)
        for cores, mem, disk, spot, vm_size in hpc_specs
    ]

    azure_cluster.addNodes(
        nodeTypes=hpc_shapes,
        numNodes=20,
        preemptible=True,
        # Azure-specific HPC configuration
        enableAcceleratedNetworking=True,
        enableInfiniBand=True,  # For HPC workloads
        proximityPlacementGroup="hpc-placement-group"
    )

    return azure_cluster

Cluster Management and Monitoring

{ .api }

Comprehensive cluster management utilities and monitoring capabilities.

from toil.provisioners.clusterScaler import ClusterScaler
import time
import logging

def cluster_lifecycle_management():
    """Comprehensive cluster lifecycle management."""

    class ClusterManager:
        """High-level cluster management interface.

        Wraps an AbstractProvisioner with deployment, scaling, health
        monitoring, cost optimization, and teardown, while tracking
        basic usage metrics.
        """

        def __init__(self, provisioner: AbstractProvisioner):
            """Bind to a provisioner and reset the metrics counters."""
            self.provisioner = provisioner
            self.logger = logging.getLogger(__name__)
            self.metrics = {
                'nodes_launched': 0,
                'nodes_terminated': 0,
                'cost_savings': 0.0,
                'uptime': 0.0
            }

        def deploy_cluster(self, config: dict) -> str:
            """Deploy cluster with comprehensive configuration.

            Expects keys: leader_type, leader_storage, owner, key_name;
            optionally initial_workers/worker_types, use_spot, and
            auto_scaling/scaling_config. Returns the cluster name.
            On failure, cleans up partial resources and re-raises.
            """
            start_time = time.time()

            try:
                # Launch leader node
                self.provisioner.launchCluster(
                    leaderNodeType=config['leader_type'],
                    leaderStorage=config['leader_storage'],
                    owner=config['owner'],
                    keyName=config['key_name']
                )

                # Wait for leader to be ready
                self.wait_for_leader_ready(timeout=600)

                # Add initial worker nodes
                if config.get('initial_workers', 0) > 0:
                    self.add_workers(
                        node_types=config['worker_types'],
                        count=config['initial_workers'],
                        preemptible=config.get('use_spot', True)
                    )

                # Configure monitoring
                self.setup_cluster_monitoring()

                # Configure auto-scaling
                if config.get('auto_scaling', False):
                    self.enable_auto_scaling(config['scaling_config'])

                deployment_time = time.time() - start_time
                self.logger.info(f"Cluster deployed in {deployment_time:.2f} seconds")

                return self.provisioner.cluster_name

            except Exception as e:
                self.logger.error(f"Cluster deployment failed: {e}")
                # Cleanup on failure
                self.cleanup_failed_deployment()
                raise

        def add_workers(self, node_types: List[NodeInfo], count: int,
                       preemptible: bool = True) -> int:
            """Add worker nodes and track how many actually launched."""
            nodes_added = self.provisioner.addNodes(
                nodeTypes=node_types,
                numNodes=count,
                preemptible=preemptible
            )

            self.metrics['nodes_launched'] += nodes_added

            # Monitor node startup
            self.monitor_node_startup(nodes_added)

            return nodes_added

        def remove_workers(self, node_count: int = None,
                          node_ids: List[str] = None) -> int:
            """Remove worker nodes gracefully.

            Either terminates the explicitly listed node_ids, or selects
            up to node_count spot nodes (all spot nodes when node_count
            is None). Returns the number of nodes removed.
            """
            # Fix: the old code mixed node objects and id strings, so the
            # terminateNodes call crashed whenever node_ids was supplied
            # ([n.id for ...] on plain strings). Normalize to id strings.
            if node_ids:
                ids_to_remove = list(node_ids)
            else:
                # Select nodes to remove (prefer spot instances)
                all_nodes = self.get_cluster_nodes()
                spot_nodes = [n for n in all_nodes if n.preemptible]
                selected = spot_nodes[:node_count] if node_count else spot_nodes
                ids_to_remove = [n.id for n in selected]

            # Gracefully drain jobs
            # NOTE(review): drain_node previously received node objects on
            # one path and id strings on the other — confirm it accepts ids.
            for node_id in ids_to_remove:
                self.drain_node(node_id)

            # Terminate nodes
            self.provisioner.terminateNodes(ids_to_remove)

            self.metrics['nodes_terminated'] += len(ids_to_remove)

            return len(ids_to_remove)

        def monitor_cluster_health(self):
            """Monitor cluster health and performance.

            Aggregates per-node health into cluster-wide averages, logs
            the result, and triggers remediation for unhealthy nodes.
            Returns the metrics dict.
            """
            health_metrics = {
                'total_nodes': 0,
                'healthy_nodes': 0,
                'unhealthy_nodes': 0,
                'avg_cpu_usage': 0.0,
                'avg_memory_usage': 0.0,
                'job_queue_length': 0
            }

            cluster_nodes = self.get_cluster_nodes()
            health_metrics['total_nodes'] = len(cluster_nodes)

            for node in cluster_nodes:
                node_health = self.check_node_health(node)

                if node_health['healthy']:
                    health_metrics['healthy_nodes'] += 1
                    health_metrics['avg_cpu_usage'] += node_health['cpu_usage']
                    health_metrics['avg_memory_usage'] += node_health['memory_usage']
                else:
                    health_metrics['unhealthy_nodes'] += 1

            # Calculate averages (guard against divide-by-zero when no
            # healthy nodes exist).
            if health_metrics['healthy_nodes'] > 0:
                health_metrics['avg_cpu_usage'] /= health_metrics['healthy_nodes']
                health_metrics['avg_memory_usage'] /= health_metrics['healthy_nodes']

            # Get job queue status
            health_metrics['job_queue_length'] = self.get_job_queue_length()

            # Log health status
            self.logger.info(f"Cluster Health: {health_metrics}")

            # Take corrective actions if needed
            if health_metrics['unhealthy_nodes'] > 0:
                self.handle_unhealthy_nodes(cluster_nodes)

            return health_metrics

        def optimize_costs(self):
            """Optimize cluster costs through intelligent resource management.

            Applies spot migration (above a 20% savings threshold),
            right-sizing, and unused-resource cleanup, and accumulates
            the realized savings into the metrics.
            """
            # Get current pricing and usage
            current_costs = self.calculate_current_costs()

            # Analyze spot instance opportunities
            spot_savings = self.analyze_spot_opportunities()

            # Implement cost optimizations
            if spot_savings['potential_savings'] > 0.2:  # 20% savings threshold
                self.implement_spot_optimization(spot_savings)

            # Right-size instances based on actual usage
            resize_recommendations = self.analyze_instance_utilization()
            if resize_recommendations:
                self.implement_instance_resizing(resize_recommendations)

            # Clean up unused resources
            self.cleanup_unused_resources()

            new_costs = self.calculate_current_costs()
            savings = current_costs - new_costs
            self.metrics['cost_savings'] += savings

            self.logger.info(f"Cost optimization saved ${savings:.2f}/hour")

        def destroy_cluster(self):
            """Safely destroy cluster with cleanup.

            Stops jobs, persists final metrics, then destroys the
            infrastructure; falls back to force_cleanup on failure.
            """
            try:
                # Stop all jobs gracefully
                self.stop_all_jobs(timeout=300)

                # Save final metrics
                self.save_cluster_metrics()

                # Destroy infrastructure
                self.provisioner.destroyCluster()

                self.logger.info("Cluster destroyed successfully")

            except Exception as e:
                self.logger.error(f"Error during cluster destruction: {e}")
                # Force cleanup
                self.force_cleanup()

def cluster_auto_scaling():
    """Advanced auto-scaling implementation."""

    class AutoScaler:
        """Intelligent cluster auto-scaler.

        Periodically samples cluster metrics, decides whether to grow or
        shrink the cluster, and records every decision it makes.
        """

        def __init__(self, provisioner: AbstractProvisioner, config: dict):
            """Bind to a provisioner and start with an empty audit trail."""
            self.provisioner = provisioner
            self.config = config
            self.scaling_history = []

        def run_scaling_loop(self):
            """Main auto-scaling control loop; runs until interrupted."""
            while True:
                try:
                    # Sample, decide, and (if warranted) act.
                    sample = self.collect_scaling_metrics()
                    verdict = self.make_scaling_decision(sample)

                    if verdict['action'] != 'none':
                        self.execute_scaling_action(verdict)

                    # Pause until the next evaluation window.
                    time.sleep(self.config.get('evaluation_interval', 60))

                except Exception as e:
                    logging.error(f"Auto-scaling error: {e}")
                    time.sleep(30)  # Short delay on error

        def collect_scaling_metrics(self) -> dict:
            """Snapshot the signals that drive scaling decisions."""
            return {
                'cpu_utilization': self.get_cluster_cpu_utilization(),
                'memory_utilization': self.get_cluster_memory_utilization(),
                'job_queue_length': self.get_job_queue_length(),
                'pending_jobs': self.get_pending_jobs_count(),
                'node_count': len(self.get_cluster_nodes()),
                'spot_instance_interruptions': self.get_recent_interruptions()
            }

        def make_scaling_decision(self, metrics: dict) -> dict:
            """Choose scale_up / scale_down / none from the sampled metrics."""
            decision = {'action': 'none', 'node_count': 0, 'node_types': []}

            # Busy: CPU above 80% or queue more than twice the node count.
            busy = (metrics['cpu_utilization'] > 0.8 or
                    metrics['job_queue_length'] > metrics['node_count'] * 2)
            # Idle: low CPU, empty queue, and more than one node running.
            idle = (metrics['cpu_utilization'] < 0.3 and
                    metrics['job_queue_length'] == 0 and
                    metrics['node_count'] > 1)

            if busy:
                decision.update(
                    action='scale_up',
                    node_count=self.calculate_scale_up_amount(metrics),
                    node_types=self.select_optimal_node_types(metrics),
                )
            elif idle:
                decision.update(
                    action='scale_down',
                    node_count=self.calculate_scale_down_amount(metrics),
                )

            # Keep an audit trail of every evaluation.
            self.scaling_history.append({
                'timestamp': time.time(),
                'metrics': metrics,
                'decision': decision
            })

            return decision

        def calculate_scale_up_amount(self, metrics: dict) -> int:
            """Pick how many nodes to add, bounded by max_scale_up."""
            # One node per three queued jobs, or 30% growth — whichever wins.
            by_queue = max(1, metrics['job_queue_length'] // 3)
            by_load = max(1, int(metrics['node_count'] * 0.3))

            # Pad for capacity recently lost to spot reclamation.
            spot_buffer = metrics['spot_instance_interruptions']

            # Cap scaling to prevent over-provisioning.
            ceiling = self.config.get('max_scale_up', 10)

            return min(ceiling, max(by_queue, by_load) + spot_buffer)

This cloud provisioning system provides comprehensive, intelligent cluster management with cost optimization, auto-scaling, and multi-cloud support for scalable workflow execution.

Install with Tessl CLI

npx tessl i tessl/pypi-toil

docs

batch-systems.md

core-workflow.md

file-management.md

index.md

job-stores.md

provisioning.md

utilities.md

workflow-languages.md

tile.json