Pipeline management software for clusters.
Overall score: 67%
Toil's cloud provisioning system enables automatic creation, scaling, and management of compute clusters across major cloud providers. The provisioning system integrates with batch systems to dynamically allocate resources based on workflow demands, supporting auto-scaling, cost optimization through spot instances, and sophisticated resource management policies. This allows workflows to seamlessly scale from single nodes to large distributed clusters without manual infrastructure management.
{ .api }
The AbstractProvisioner provides the core interface for all cloud provisioning implementations.
import logging
import time
from typing import Dict, List, Optional

from toil.batchSystems import NodeInfo
from toil.provisioners.abstractProvisioner import AbstractProvisioner
class CustomProvisioner(AbstractProvisioner):
"""Custom provisioner implementation for specialized cloud environments."""
def __init__(self, clusterName: str, clusterType: str, zone: str,
nodeStorage: int, nodeStorageType: str, sseKey: Optional[str] = None):
"""Initialize provisioner with cluster configuration."""
super().__init__(clusterName, clusterType, zone, nodeStorage, nodeStorageType)
self.cluster_name = clusterName
self.cluster_type = clusterType
self.availability_zone = zone
self.sseKey = sseKey
# Initialize cloud provider client
self.cloud_client = self.initialize_cloud_client()
def launchCluster(self, leaderNodeType: str, leaderStorage: int,
owner: str, keyName: str, **kwargs) -> None:
"""Launch cluster with leader node."""
# Create leader node
leader_config = {
'instance_type': leaderNodeType,
'storage_size': leaderStorage,
'storage_type': self.nodeStorageType,
'key_name': keyName,
'security_groups': self.create_security_groups(),
'user_data': self.get_leader_user_data()
}
if self.sseKey:
leader_config['encryption_key'] = self.sseKey
leader_instance = self.cloud_client.create_instance(leader_config)
# Wait for leader to be ready
self.wait_for_instance_ready(leader_instance.id)
# Configure leader node
self.configure_leader_node(leader_instance)
# Store cluster metadata
self.store_cluster_metadata({
'leader_id': leader_instance.id,
'cluster_name': self.cluster_name,
'owner': owner,
'creation_time': time.time()
})
def addNodes(self, nodeTypes: List[NodeInfo], numNodes: int,
preemptible: bool = False, spotBid: Optional[float] = None) -> int:
"""Add worker nodes to cluster."""
instances_created = 0
for node_type in nodeTypes:
# Calculate nodes needed for this type
nodes_for_type = min(numNodes - instances_created,
self.calculate_optimal_node_count(node_type))
if nodes_for_type <= 0:
continue
# Configure worker node
worker_config = {
'instance_type': node_type.nodeType,
'count': nodes_for_type,
'storage_size': self.nodeStorage,
'storage_type': self.nodeStorageType,
'preemptible': preemptible,
'user_data': self.get_worker_user_data()
}
if preemptible and spotBid:
worker_config['spot_price'] = spotBid
# Launch instances
instances = self.cloud_client.create_instances(worker_config)
# Track created instances
for instance in instances:
self.register_worker_node(instance, node_type)
instances_created += 1
if instances_created >= numNodes:
break
return instances_created
def terminateNodes(self, nodes: List[str]) -> None:
"""Terminate specified worker nodes."""
for node_id in nodes:
try:
# Gracefully drain jobs from node
self.drain_node_jobs(node_id)
# Terminate instance
self.cloud_client.terminate_instance(node_id)
# Clean up metadata
self.unregister_worker_node(node_id)
except Exception as e:
self.logger.error(f"Failed to terminate node {node_id}: {e}")
def getNodeShape(self, nodeType: str) -> NodeInfo:
"""Get node configuration for specified type."""
# Query cloud provider for instance specifications
instance_info = self.cloud_client.get_instance_type_info(nodeType)
return NodeInfo(
cores=instance_info['cpu_count'],
memory=instance_info['memory_mb'] * 1024 * 1024, # Convert to bytes
disk=instance_info['storage_gb'] * 1024 * 1024 * 1024, # Convert to bytes
preemptible=instance_info.get('preemptible', False),
nodeType=nodeType
)
def destroyCluster(self) -> None:
"""Destroy entire cluster and clean up resources."""
# Get all cluster nodes
cluster_nodes = self.get_cluster_nodes()
# Terminate all nodes
for node in cluster_nodes:
try:
self.cloud_client.terminate_instance(node['instance_id'])
except Exception as e:
self.logger.warning(f"Failed to terminate {node['instance_id']}: {e}")
# Clean up security groups
self.cleanup_security_groups()
# Remove cluster metadata
self.cleanup_cluster_metadata()
self.logger.info(f"Cluster {self.cluster_name} destroyed"){ .api }
AWS EC2-based provisioning with comprehensive support for AWS services and features.
from toil.provisioners.aws.awsProvisioner import AWSProvisioner
from toil.lib.aws import establish_boto3_session
from toil.common import Config
def setup_aws_provisioning():
    """Configure AWS provisioning for scalable workflows.

    Returns a Config populated with a typical AWS setup: mixed on-demand
    and spot node types, KMS-encrypted gp3 storage, and spot instances
    enabled by default.
    """
    config = Config()
    # All settings applied in one pass below.
    aws_settings = {
        'provisioner': "aws",
        'nodeTypes': ["m5.large", "m5.xlarge:0.50", "c5.2xlarge"],  # type:bid_price
        'maxNodes': 100,
        'minNodes': 0,
        # AWS-specific placement
        'awsRegion': "us-west-2",
        'zone': "us-west-2a",  # Availability zone
        'keyPairName': "my-toil-keypair",
        'vpcSubnet': "subnet-12345678",  # Optional: specific subnet
        # Security and access
        'awsEc2ProfileArn': "arn:aws:iam::123456789:instance-profile/ToilRole",
        'sseKey': "alias/toil-encryption-key",  # KMS encryption
        # Storage configuration
        'nodeStorage': 100,  # GB per node
        'nodeStorageType': "gp3",  # EBS volume type
        # Spot instance configuration
        'defaultPreemptible': True,  # Use spot instances by default
        'preemptibleCompensation': 1.0,  # No compensation for spot
        'spotBid': 0.10,  # Maximum spot price per hour
    }
    for attr_name, attr_value in aws_settings.items():
        setattr(config, attr_name, attr_value)
    return config
def advanced_aws_features():
    """Demonstrate advanced AWS provisioning features.

    Builds an AWSProvisioner with encrypted io2 storage, launches a mesos
    leader bootstrapped by a cloud-init script, then adds a heterogeneous
    mix of spot and on-demand worker types.

    :returns: the configured AWSProvisioner.
    """
    # Custom AWS provisioner with advanced features
    provisioner = AWSProvisioner(
        clusterName="advanced-cluster",
        clusterType="mesos",  # or "kubernetes"
        zone="us-west-2a",
        nodeStorage=200,
        nodeStorageType="io2",  # High-performance storage
        sseKey="alias/my-kms-key"
    )
    # Launch cluster with custom configuration
    provisioner.launchCluster(
        leaderNodeType="m5.2xlarge",
        leaderStorage=100,
        owner="research-team",
        keyName="research-keypair",
        # Advanced options -- NOTE(review): assumes launchCluster forwards
        # these **kwargs to EC2; verify against the installed Toil version.
        securityGroupNames=["toil-cluster", "research-access"],
        userData="""#!/bin/bash
# Custom initialization script
yum update -y
yum install -y docker
systemctl start docker
systemctl enable docker
# Install additional tools
pip3 install boto3 awscli
# Configure monitoring
yum install -y amazon-cloudwatch-agent
""",
        leaderIamInstanceProfile="arn:aws:iam::123456789:instance-profile/ToilLeader"
    )
    # Add heterogeneous node types (memory/disk given in bytes via 1024**3)
    node_types = [
        NodeInfo(cores=4, memory=16*1024**3, disk=100*1024**3,
                 preemptible=True, nodeType="m5.large"),
        NodeInfo(cores=16, memory=64*1024**3, disk=500*1024**3,
                 preemptible=False, nodeType="m5.4xlarge"),
        NodeInfo(cores=32, memory=128*1024**3, disk=1000*1024**3,
                 preemptible=True, nodeType="c5.8xlarge")
    ]
    # Add nodes with mixed instance types
    provisioner.addNodes(
        nodeTypes=node_types,
        numNodes=50,
        preemptible=True,
        spotBid=0.25,
        # Additional AWS options
        availabilityZone="us-west-2a",
        subnetID="subnet-12345678",
        placementGroup="cluster-group"  # For high-performance networking
    )
    return provisioner
def aws_auto_scaling_policies():
"""Configure advanced auto-scaling policies for AWS."""
class AdvancedAWSProvisioner(AWSProvisioner):
"""AWS provisioner with custom scaling logic."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.scaling_policies = {
'scale_up_threshold': 0.8, # CPU utilization to scale up
'scale_down_threshold': 0.3, # CPU utilization to scale down
'scale_up_cooldown': 300, # 5 minutes cooldown
'scale_down_cooldown': 600, # 10 minutes cooldown
'max_scale_up': 10, # Max nodes to add at once
'max_scale_down': 5 # Max nodes to remove at once
}
def auto_scale_cluster(self):
"""Implement custom auto-scaling logic."""
# Get current cluster metrics
cluster_metrics = self.get_cluster_metrics()
current_nodes = len(self.get_worker_nodes())
cpu_utilization = cluster_metrics['avg_cpu_utilization']
queue_length = cluster_metrics['job_queue_length']
# Scale up decision
if (cpu_utilization > self.scaling_policies['scale_up_threshold'] or
queue_length > current_nodes * 2):
nodes_to_add = min(
self.scaling_policies['max_scale_up'],
max(1, queue_length // 5) # 1 node per 5 queued jobs
)
self.scale_up_cluster(nodes_to_add)
# Scale down decision
elif (cpu_utilization < self.scaling_policies['scale_down_threshold'] and
queue_length == 0 and current_nodes > 1):
nodes_to_remove = min(
self.scaling_policies['max_scale_down'],
max(1, current_nodes // 4) # Remove up to 25% of nodes
)
self.scale_down_cluster(nodes_to_remove)
def scale_up_cluster(self, node_count: int):
"""Scale up cluster with cost optimization."""
# Prioritize spot instances for cost savings
spot_node_types = [nt for nt in self.get_node_types() if nt.preemptible]
on_demand_node_types = [nt for nt in self.get_node_types() if not nt.preemptible]
# Try spot instances first
added_nodes = 0
if spot_node_types and added_nodes < node_count:
spot_nodes = min(node_count - added_nodes, int(node_count * 0.8))
added_nodes += self.addNodes(
nodeTypes=spot_node_types[:1], # Use most cost-effective
numNodes=spot_nodes,
preemptible=True,
spotBid=self.calculate_optimal_spot_bid()
)
# Add on-demand instances if needed
if added_nodes < node_count and on_demand_node_types:
remaining_nodes = node_count - added_nodes
self.addNodes(
nodeTypes=on_demand_node_types[:1],
numNodes=remaining_nodes,
preemptible=False
)
def calculate_optimal_spot_bid(self) -> float:
"""Calculate optimal spot instance bid price."""
# Get current spot price history
spot_history = self.get_spot_price_history()
# Calculate bid as 110% of average recent price
recent_prices = [price for price in spot_history[-24:]] # Last 24 hours
avg_price = sum(recent_prices) / len(recent_prices)
optimal_bid = avg_price * 1.1
return min(optimal_bid, self.maxSpotBid) # Cap at maximum allowed{ .api }
Google Compute Engine provisioning with support for GCP-specific features.
from toil.provisioners.gceProvisioner import GCEProvisioner
def setup_gce_provisioning():
    """Configure Google Cloud provisioning.

    Returns a Config set up for GCE with preemptible instances enabled
    and SSD persistent-disk storage.
    """
    config = Config()
    # Settings are collected once and applied in a single loop.
    gce_settings = {
        'provisioner': "gce",
        'nodeTypes': ["n1-standard-2", "n1-standard-4", "n1-highmem-8"],
        'maxNodes': 200,
        # GCP-specific placement and identity
        'gcpRegion': "us-central1",
        'zone': "us-central1-a",
        'gcpProjectID': "my-research-project",
        'gcpServiceAccountEmail': "toil-service@my-project.iam.gserviceaccount.com",
        # Authentication
        'googleCredentials': "/path/to/service-account.json",
        # Networking
        'gcpNetwork': "default",
        'gcpSubnet': "default",
        # Storage and encryption
        'nodeStorage': 100,
        'nodeStorageType': "pd-ssd",  # SSD persistent disk
        'gcpDiskEncryption': True,
        # Preemptible instances (Google's spot instances)
        'defaultPreemptible': True,
        'preemptibleCompensation': 1.0,
    }
    for option, value in gce_settings.items():
        setattr(config, option, value)
    return config
def advanced_gce_features():
    """Advanced Google Cloud Engine features.

    Builds a GCEProvisioner for a Kubernetes cluster, launches a leader
    using a custom machine type and startup script, then adds preemptible
    GPU-enabled workers.

    :returns: the configured GCEProvisioner.
    """
    provisioner = GCEProvisioner(
        clusterName="gce-research-cluster",
        clusterType="kubernetes",
        zone="us-central1-b",
        nodeStorage=200,
        nodeStorageType="pd-ssd"
    )
    # Launch with custom machine types
    provisioner.launchCluster(
        leaderNodeType="custom-4-8192",  # 4 vCPUs, 8GB RAM
        leaderStorage=100,
        owner="research-team",
        keyName=None,  # GCE uses SSH keys differently
        # GCE-specific options -- NOTE(review): assumes launchCluster
        # forwards these **kwargs; verify against the installed Toil version.
        machineImage="cos-cloud/cos-stable",  # Container-optimized OS
        networkTags=["toil-cluster", "research"],
        serviceAccountScopes=[
            "https://www.googleapis.com/auth/cloud-platform",
            "https://www.googleapis.com/auth/storage-full"
        ],
        startupScript="""#!/bin/bash
# Install Docker and Kubernetes tools
curl -fsSL https://get.docker.com -o get-docker.sh
sh get-docker.sh
# Configure container runtime
systemctl start docker
systemctl enable docker
# Install kubectl
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x kubectl
mv kubectl /usr/local/bin/
"""
    )
    # Add GPU-enabled nodes (memory/disk given in bytes via 1024**3)
    gpu_node_types = [
        NodeInfo(
            cores=8,
            memory=32*1024**3,
            disk=200*1024**3,
            preemptible=True,
            nodeType="n1-standard-8",
            # GCE-specific: GPU configuration -- NOTE(review): confirm that
            # NodeInfo accepts an `accelerators` keyword in this Toil version.
            accelerators=[{
                "acceleratorCount": 1,
                "acceleratorType": "nvidia-tesla-k80"
            }]
        )
    ]
    provisioner.addNodes(
        nodeTypes=gpu_node_types,
        numNodes=5,
        preemptible=True,
        # GPU-specific configuration
        gpuType="nvidia-tesla-k80",
        gpuCount=1,
        installGpuDrivers=True
    )
    return provisioner
{ .api }
Microsoft Azure provisioning integration with Azure-specific capabilities.
from toil.provisioners.azure import AzureProvisioner
def setup_azure_provisioning():
    """Configure Azure provisioning.

    Returns a Config for Azure with premium SSD storage and spot (low
    priority) instances enabled.  The credential values here are
    placeholders to be replaced with real secrets at deployment time.
    """
    config = Config()
    # Settings collected in one mapping, then applied via setattr.
    azure_settings = {
        'provisioner': "azure",
        'nodeTypes': ["Standard_D2s_v3", "Standard_D4s_v3", "Standard_F8s_v2"],
        'maxNodes': 150,
        # Azure-specific placement and resource grouping
        'azureRegion': "West US 2",
        'azureResourceGroup': "toil-research-rg",
        'azureStorageAccount': "toilstorageaccount",
        # Authentication (service-principal placeholders)
        'azureSubscriptionId': "12345678-1234-1234-1234-123456789abc",
        'azureTenantId': "87654321-4321-4321-4321-cba987654321",
        'azureClientId': "abcdef01-2345-6789-abcd-ef0123456789",
        'azureClientSecret': "your-client-secret",
        # Networking
        'azureVirtualNetwork': "toil-vnet",
        'azureSubnet': "toil-subnet",
        # Storage
        'nodeStorage': 128,
        'nodeStorageType': "Premium_LRS",  # Premium SSD
        # Spot instances
        'defaultPreemptible': True,
        'azureSpotMaxPrice': 0.15,  # Maximum hourly price
    }
    for option, value in azure_settings.items():
        setattr(config, option, value)
    return config
def advanced_azure_features():
    """Advanced Azure provisioning features.

    Builds an AzureProvisioner, launches a leader with a cloud-init
    customData script, then adds HPC node types with accelerated
    networking and InfiniBand.

    :returns: the configured AzureProvisioner.
    """
    provisioner = AzureProvisioner(
        clusterName="azure-hpc-cluster",
        clusterType="batch",
        zone="West US 2",
        nodeStorage=256,
        nodeStorageType="Premium_LRS"
    )
    # Launch with Azure-specific configuration
    provisioner.launchCluster(
        leaderNodeType="Standard_D8s_v3",
        leaderStorage=128,
        owner="hpc-team",
        keyName="azure-keypair",
        # Azure-specific options -- NOTE(review): assumes launchCluster
        # forwards these **kwargs; verify against the installed Toil version.
        vmImage="Canonical:0001-com-ubuntu-server-focal:20_04-lts-gen2:latest",
        availabilitySet="toil-availability-set",
        networkSecurityGroup="toil-nsg",
        customData="""#!/bin/bash
# Azure-specific initialization
apt-get update
apt-get install -y docker.io
# Configure Azure CLI
curl -sL https://aka.ms/InstallAzureCLIDeb | bash
# Install Azure batch tools
pip3 install azure-batch azure-storage-blob
"""
    )
    # Add high-performance computing nodes (memory/disk in bytes via 1024**3)
    hpc_node_types = [
        NodeInfo(
            cores=16,
            memory=128*1024**3,
            disk=500*1024**3,
            preemptible=False,
            nodeType="Standard_H16r"  # High-performance computing instance
        ),
        NodeInfo(
            cores=44,
            memory=352*1024**3,
            disk=1000*1024**3,
            preemptible=True,
            nodeType="Standard_H44rs"  # High-memory HPC instance
        )
    ]
    provisioner.addNodes(
        nodeTypes=hpc_node_types,
        numNodes=20,
        preemptible=True,
        # Azure-specific HPC configuration
        enableAcceleratedNetworking=True,
        enableInfiniBand=True,  # For HPC workloads
        proximityPlacementGroup="hpc-placement-group"
    )
    return provisioner
{ .api }
Comprehensive cluster management utilities and monitoring capabilities.
from toil.provisioners.clusterScaler import ClusterScaler
import time
import logging
def cluster_lifecycle_management():
    """Comprehensive cluster lifecycle management.

    Defines ClusterManager, a high-level wrapper around an
    AbstractProvisioner that handles deployment, worker scaling, health
    monitoring, cost optimization, and teardown while keeping running
    metrics.
    """
    class ClusterManager:
        """High-level cluster management interface."""

        def __init__(self, provisioner: AbstractProvisioner):
            self.provisioner = provisioner
            self.logger = logging.getLogger(__name__)
            # Running totals reported by the management helpers below.
            self.metrics = {
                'nodes_launched': 0,
                'nodes_terminated': 0,
                'cost_savings': 0.0,
                'uptime': 0.0
            }

        def deploy_cluster(self, config: dict) -> str:
            """Deploy cluster with comprehensive configuration.

            :param config: requires 'leader_type', 'leader_storage', 'owner',
                'key_name'; optional 'initial_workers', 'worker_types',
                'use_spot', 'auto_scaling', 'scaling_config'.
            :returns: the cluster name on success.
            :raises Exception: re-raises any deployment failure after cleanup.
            """
            start_time = time.time()
            try:
                # Launch leader node
                self.provisioner.launchCluster(
                    leaderNodeType=config['leader_type'],
                    leaderStorage=config['leader_storage'],
                    owner=config['owner'],
                    keyName=config['key_name']
                )
                # Leader must be reachable before workers can join.
                self.wait_for_leader_ready(timeout=600)
                # Add initial worker nodes
                if config.get('initial_workers', 0) > 0:
                    self.add_workers(
                        node_types=config['worker_types'],
                        count=config['initial_workers'],
                        preemptible=config.get('use_spot', True)
                    )
                # Configure monitoring
                self.setup_cluster_monitoring()
                # Configure auto-scaling (opt-in)
                if config.get('auto_scaling', False):
                    self.enable_auto_scaling(config['scaling_config'])
                deployment_time = time.time() - start_time
                self.logger.info(f"Cluster deployed in {deployment_time:.2f} seconds")
                return self.provisioner.cluster_name
            except Exception as e:
                self.logger.error(f"Cluster deployment failed: {e}")
                # Don't leak half-built infrastructure on failure.
                self.cleanup_failed_deployment()
                raise

        def add_workers(self, node_types: List[NodeInfo], count: int,
                        preemptible: bool = True) -> int:
            """Add worker nodes with monitoring; returns nodes actually added."""
            nodes_added = self.provisioner.addNodes(
                nodeTypes=node_types,
                numNodes=count,
                preemptible=preemptible
            )
            self.metrics['nodes_launched'] += nodes_added
            # Monitor node startup
            self.monitor_node_startup(nodes_added)
            return nodes_added

        def remove_workers(self, node_count: Optional[int] = None,
                           node_ids: Optional[List[str]] = None) -> int:
            """Remove worker nodes gracefully.

            Either pass explicit node_ids, or let the manager select up to
            node_count preemptible (spot) nodes to retire.

            :returns: the number of nodes removed.
            """
            # Fix: the original mixed id strings (node_ids branch) with node
            # objects (spot branch), then unconditionally read `.id`, which
            # crashed when explicit ids were supplied.  Normalize to ids first.
            if node_ids:
                ids_to_remove = list(node_ids)
            else:
                # Select nodes to remove (prefer spot instances)
                all_nodes = self.get_cluster_nodes()
                spot_nodes = [n for n in all_nodes if n.preemptible]
                selected = spot_nodes[:node_count] if node_count else spot_nodes
                ids_to_remove = [n.id for n in selected]
            # Gracefully drain jobs before termination
            for node_id in ids_to_remove:
                self.drain_node(node_id)
            # Terminate nodes
            self.provisioner.terminateNodes(ids_to_remove)
            self.metrics['nodes_terminated'] += len(ids_to_remove)
            return len(ids_to_remove)

        def monitor_cluster_health(self):
            """Monitor cluster health; returns a dict of aggregate metrics."""
            health_metrics = {
                'total_nodes': 0,
                'healthy_nodes': 0,
                'unhealthy_nodes': 0,
                'avg_cpu_usage': 0.0,
                'avg_memory_usage': 0.0,
                'job_queue_length': 0
            }
            cluster_nodes = self.get_cluster_nodes()
            health_metrics['total_nodes'] = len(cluster_nodes)
            for node in cluster_nodes:
                node_health = self.check_node_health(node)
                if node_health['healthy']:
                    health_metrics['healthy_nodes'] += 1
                    # Accumulate raw usage; divided into averages below.
                    health_metrics['avg_cpu_usage'] += node_health['cpu_usage']
                    health_metrics['avg_memory_usage'] += node_health['memory_usage']
                else:
                    health_metrics['unhealthy_nodes'] += 1
            # Calculate averages (guard against an empty/unhealthy cluster)
            if health_metrics['healthy_nodes'] > 0:
                health_metrics['avg_cpu_usage'] /= health_metrics['healthy_nodes']
                health_metrics['avg_memory_usage'] /= health_metrics['healthy_nodes']
            # Get job queue status
            health_metrics['job_queue_length'] = self.get_job_queue_length()
            # Log health status
            self.logger.info(f"Cluster Health: {health_metrics}")
            # NOTE(review): all nodes are passed, not just the unhealthy ones;
            # handle_unhealthy_nodes is presumably expected to re-check.
            if health_metrics['unhealthy_nodes'] > 0:
                self.handle_unhealthy_nodes(cluster_nodes)
            return health_metrics

        def optimize_costs(self):
            """Optimize cluster costs through intelligent resource management."""
            # Get current pricing and usage
            current_costs = self.calculate_current_costs()
            # Analyze spot instance opportunities
            spot_savings = self.analyze_spot_opportunities()
            # Shift to spot capacity when savings clear a 20% threshold.
            if spot_savings['potential_savings'] > 0.2:
                self.implement_spot_optimization(spot_savings)
            # Right-size instances based on actual usage
            resize_recommendations = self.analyze_instance_utilization()
            if resize_recommendations:
                self.implement_instance_resizing(resize_recommendations)
            # Clean up unused resources
            self.cleanup_unused_resources()
            new_costs = self.calculate_current_costs()
            savings = current_costs - new_costs
            self.metrics['cost_savings'] += savings
            self.logger.info(f"Cost optimization saved ${savings:.2f}/hour")

        def destroy_cluster(self):
            """Safely destroy cluster with cleanup."""
            try:
                # Stop all jobs gracefully before tearing down infrastructure
                self.stop_all_jobs(timeout=300)
                # Save final metrics
                self.save_cluster_metrics()
                # Destroy infrastructure
                self.provisioner.destroyCluster()
                self.logger.info("Cluster destroyed successfully")
            except Exception as e:
                self.logger.error(f"Error during cluster destruction: {e}")
                # Last resort: force cleanup of whatever remains
                self.force_cleanup()
def cluster_auto_scaling():
    """Advanced auto-scaling implementation.

    Defines AutoScaler, which periodically samples cluster metrics,
    decides whether to scale up or down, executes the action, and records
    every decision in self.scaling_history.
    """
    class AutoScaler:
        """Intelligent cluster auto-scaler."""

        def __init__(self, provisioner: AbstractProvisioner, config: dict):
            self.provisioner = provisioner
            self.config = config
            # One entry per evaluation: timestamp, metrics, decision.
            self.scaling_history = []

        def run_scaling_loop(self):
            """Main auto-scaling control loop.

            Runs forever; intended to be started in its own thread/process.
            """
            while True:
                try:
                    # Collect metrics
                    metrics = self.collect_scaling_metrics()
                    # Make scaling decision
                    scaling_decision = self.make_scaling_decision(metrics)
                    # Execute scaling action
                    if scaling_decision['action'] != 'none':
                        self.execute_scaling_action(scaling_decision)
                    # Wait before next evaluation (default 60 s)
                    time.sleep(self.config.get('evaluation_interval', 60))
                except Exception as e:
                    logging.error(f"Auto-scaling error: {e}")
                    time.sleep(30)  # Short delay on error

        def collect_scaling_metrics(self) -> dict:
            """Collect metrics for scaling decisions."""
            return {
                'cpu_utilization': self.get_cluster_cpu_utilization(),
                'memory_utilization': self.get_cluster_memory_utilization(),
                'job_queue_length': self.get_job_queue_length(),
                'pending_jobs': self.get_pending_jobs_count(),
                'node_count': len(self.get_cluster_nodes()),
                'spot_instance_interruptions': self.get_recent_interruptions()
            }

        def make_scaling_decision(self, metrics: dict) -> dict:
            """Intelligent scaling decision based on multiple factors.

            :returns: dict with 'action' ('scale_up'/'scale_down'/'none'),
                'node_count', and 'node_types'.
            """
            decision = {'action': 'none', 'node_count': 0, 'node_types': []}
            # Scale up conditions: hot CPU or queue deep relative to size
            if (metrics['cpu_utilization'] > 0.8 or
                metrics['job_queue_length'] > metrics['node_count'] * 2):
                decision['action'] = 'scale_up'
                decision['node_count'] = self.calculate_scale_up_amount(metrics)
                decision['node_types'] = self.select_optimal_node_types(metrics)
            # Scale down conditions: idle, empty queue, >1 node remaining
            elif (metrics['cpu_utilization'] < 0.3 and
                  metrics['job_queue_length'] == 0 and
                  metrics['node_count'] > 1):
                decision['action'] = 'scale_down'
                decision['node_count'] = self.calculate_scale_down_amount(metrics)
            # Record decision for auditing and threshold tuning
            self.scaling_history.append({
                'timestamp': time.time(),
                'metrics': metrics,
                'decision': decision
            })
            return decision

        def calculate_scale_up_amount(self, metrics: dict) -> int:
            """Calculate optimal number of nodes to add."""
            # Base calculation on queue length and utilization
            queue_based = max(1, metrics['job_queue_length'] // 3)
            utilization_based = max(1, int(metrics['node_count'] * 0.3))
            # Consider recent interruptions (pad for lost spot capacity)
            interruption_buffer = metrics['spot_instance_interruptions']
            # Cap scaling to prevent over-provisioning
            max_scale = self.config.get('max_scale_up', 10)
            return min(max_scale, max(queue_based, utilization_based) + interruption_buffer)

This cloud provisioning system provides comprehensive, intelligent cluster management with cost optimization, auto-scaling, and multi-cloud support for scalable workflow execution.
Install with Tessl CLI
npx tessl i tessl/pypi-toildocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10