@batch(cpu: int = None, memory: int = None, image: str = None,
       queue: str = None, iam_role: str = None, gpu: int = None,
       shared_memory: int = None, inferentia: int = None, trainium: int = None)
"""
Execute on AWS Batch.
Args:
cpu (int): CPU units (1024 = 1 vCPU)
memory (int): Memory in MB
image (str): Docker image
queue (str): Batch queue
iam_role (str): IAM role ARN
gpu (int): GPU count
shared_memory (int): Shared memory in MB
inferentia (int): AWS Inferentia chips
trainium (int): AWS Trainium chips
Example:
@batch(cpu=8192, memory=32000, gpu=2, queue='gpu-queue')
@step
def train(self):
pass
"""@kubernetes(cpu: int = None, memory: int = None, image: str = None,
gpu: int = None, secrets: list = None, namespace: str = None,
service_account: str = None, persistent_volume_claims: dict = None,
node_selector: dict = None, tolerations: list = None,
labels: dict = None, annotations: dict = None)
"""
Execute on Kubernetes.
Args:
cpu (int): CPU cores
memory (int): Memory in MB
image (str): Docker image
gpu (int): GPU count
secrets (list): Kubernetes secrets
namespace (str): K8s namespace
service_account (str): Service account
persistent_volume_claims (dict): PVC mounts
node_selector (dict): Node selection
tolerations (list): Tolerations
Example:
@kubernetes(
cpu=8,
memory=32000,
gpu=2,
node_selector={'node-type': 'gpu'},
secrets=['aws-creds']
)
@step
def train(self):
pass
"""@resources(cpu: int = None, memory: int = None, gpu: int = None)
"""
Declare resource requirements.
Works with @batch and @kubernetes decorators.
Example:
@batch
@resources(cpu=16, memory=64000, gpu=4)
@step
def train(self):
pass
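
    The same declaration also applies under @kubernetes, or a backend can
    be attached at run time with `python flow.py run --with batch`
    (a sketch; the step name is illustrative):

    @kubernetes
    @resources(cpu=16, memory=64000, gpu=4)
    @step
    def train_k8s(self):
        pass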
"""# CPU-intensive
@batch(queue='compute')
@resources(cpu=32, memory=128000)
@step
def preprocess(self):
pass
# GPU training
@batch(queue='gpu')
@resources(gpu=8, memory=256000)
@step
def train(self):
pass
# AWS Inferentia inference
@batch(inferentia=1, memory=16000)
@step
def inference(self):
pass
# Kubernetes with persistent storage
@kubernetes(
cpu=4,
memory=16000,
persistent_volume_claims={'data': '/mnt/data'}
)
@step
def load(self):
pass@conda(libraries: dict = None, python: str = None,
sources: list = None, disabled: bool = False)
"""
Specify Conda environment.
Args:
libraries (dict): Package versions {'name': 'version'}
python (str): Python version
sources (list): Additional conda channels
disabled (bool): Disable conda for this step
Example:
@conda(
libraries={
'numpy': '1.21.0',
'pandas': '1.3.0',
'scikit-learn': '0.24.2'
},
python='3.9'
)
@step
def train(self):
import numpy as np
import pandas as pd
pass
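
    The `disabled` flag opts a single step out of a flow-level Conda
    environment, e.g. to run it against the system interpreter
    (a minimal sketch; the step name is illustrative):

    @conda(disabled=True)
    @step
    def local_only(self):
        pass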
"""@pypi(packages: dict = None, python: str = None)
"""
Specify PyPI packages.
Args:
packages (dict): Package versions {'name': 'version'}
python (str): Python version
Example:
@pypi(packages={
'torch': '1.9.0',
'transformers': '4.18.0',
'datasets': '1.17.0'
})
@step
def train(self):
import torch
import transformers
pass
"""@pypi_base(packages: dict = None, python: str = None)
"""
Base packages for all steps (flow-level).
Example:
from metaflow import FlowSpec, pypi_base, step
@pypi_base(packages={'pandas': '1.3.0', 'numpy': '1.21.0'})
class MyFlow(FlowSpec):
@step
def start(self):
import pandas as pd # Available in all steps
pass
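
    A step-level @pypi can override the base set for an individual step
    (a sketch, assuming step-level pins take precedence for that step):

    @pypi_base(packages={'pandas': '1.3.0'})
    class OverrideFlow(FlowSpec):
        @pypi(packages={'pandas': '1.5.0'})  # overrides the base pin here only
        @step
        def special(self):
            import pandas as pd
            pass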
"""@conda_base(libraries: dict = None, python: str = None)
"""
Base conda packages for all steps (flow-level).
Example:
@conda_base(libraries={'numpy': '1.21.0'}, python='3.9')
class MyFlow(FlowSpec):
pass
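
    A flow-level environment is also how dependencies reach remote steps;
    a sketch combining @conda_base with @batch (flow name is illustrative):

    @conda_base(libraries={'numpy': '1.21.0'}, python='3.9')
    class CloudFlow(FlowSpec):
        @batch(memory=16000)
        @step
        def start(self):
            import numpy as np  # resolved from the conda env inside the Batch container
            self.next(self.end)

        @step
        def end(self):
            pass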
"""from metaflow import FlowSpec, step, batch, pypi, conda
class MLFlow(FlowSpec):
@step
def start(self):
self.next(self.preprocess, self.train)
# Pandas for preprocessing
@pypi(packages={'pandas': '1.3.0', 'pyarrow': '5.0.0'})
@step
def preprocess(self):
import pandas as pd
self.data = pd.DataFrame()
self.next(self.join)
# PyTorch for training
@batch(gpu=1)
@pypi(packages={'torch': '1.9.0', 'torchvision': '0.10.0'})
@step
def train(self):
import torch
self.model = torch.nn.Linear(10, 1)
self.next(self.join)
@step
def join(self, inputs):
self.merge_artifacts(inputs)
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
MLFlow()@environment(vars: dict = None)
"""
Set environment variables.
Args:
vars (dict): Environment variables
Example:
@environment(vars={
'AWS_DEFAULT_REGION': 'us-west-2',
'TOKENIZERS_PARALLELISM': 'false',
'OMP_NUM_THREADS': '8'
})
@step
def process(self):
import os
region = os.environ['AWS_DEFAULT_REGION']
pass
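
    @environment is commonly stacked with a compute decorator so the same
    variables are set inside the remote container (a minimal sketch):

    @batch(memory=8000)
    @environment(vars={'OMP_NUM_THREADS': '4'})
    @step
    def crunch(self):
        import os
        assert os.environ['OMP_NUM_THREADS'] == '4'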
"""@batch(
image='custom-ml:latest',
cpu=8192,
memory=32000
)
@step
def process(self):
# Runs in custom Docker image
pass@kubernetes(
image='gcr.io/my-project/ml-image:v1.2',
cpu=8,
memory=32000
)
@step
def train(self):
# Runs in custom image
pass@batch(
cpu=16384, # 16 vCPUs
memory=128000, # 128 GB
gpu=4, # 4 GPUs
queue='gpu-queue', # Specific queue
iam_role='arn:aws:iam::123:role/MetaflowRole', # IAM role
shared_memory=32000, # 32 GB shared memory
image='my-image:latest' # Custom image
)
@step
def train(self):
pass@kubernetes(
    cpu=16,
    memory=128000,
    gpu=4,
    namespace='ml-workloads',
    service_account='metaflow-sa',
    secrets=['aws-creds', 'model-registry-key'],
    persistent_volume_claims={
        'data-pvc': '/mnt/data',
        'models-pvc': '/mnt/models'
    },
    node_selector={
        'node-type': 'gpu',
        'gpu-type': 'a100'
    },
    tolerations=[
        {
            'key': 'gpu',
            'operator': 'Equal',
            'value': 'true',
            'effect': 'NoSchedule'
        }
    ],
    labels={'project': 'ml', 'team': 'research'},
    annotations={'sidecar.istio.io/inject': 'false'}
)
@step
def train(self):
    pass

from metaflow import FlowSpec, step, batch, resources
class ParallelProcessor(FlowSpec):
    @step
    def start(self):
        self.files = [f'file{i}.csv' for i in range(100)]
        self.next(self.process, foreach='files')

    @batch(queue='standard')
    @resources(cpu=4, memory=16000)
    @step
    def process(self):
        # Each file is processed in a separate Batch job
        self.result = self.process_file(self.input)
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [i.result for i in inputs]
        self.next(self.end)

    @step
    def end(self):
        print(f"Processed {len(self.results)} files")

    def process_file(self, path):
        return f"processed_{path}"

if __name__ == '__main__':
    ParallelProcessor()

from metaflow import FlowSpec, step, batch, resources
class HybridFlow(FlowSpec):
    @step
    def start(self):
        # Runs locally
        self.config = self.load_config()
        self.next(self.train)

    @batch
    @resources(cpu=32, memory=128000, gpu=8)
    @step
    def train(self):
        # Runs on the cloud with GPUs
        self.model = self.train_model(self.config)
        self.next(self.evaluate)

    @batch
    @resources(cpu=16, memory=64000)
    @step
    def evaluate(self):
        # Runs on the cloud with CPUs
        self.metrics = self.evaluate_model(self.model)
        self.next(self.end)

    @step
    def end(self):
        # Runs locally
        print(f"Metrics: {self.metrics}")

    def load_config(self):
        return {'lr': 0.01}

    def train_model(self, config):
        return "model"

    def evaluate_model(self, model):
        return {'acc': 0.95}

if __name__ == '__main__':
    HybridFlow()

from metaflow import FlowSpec, step, batch, resources, pypi
class SpecializedML(FlowSpec):
    @step
    def start(self):
        self.data = self.load_data()
        self.next(self.train_gpu, self.train_inferentia)

    @batch(queue='gpu-queue')
    @resources(gpu=8, memory=256000)
    @pypi(packages={'torch': '1.9.0'})
    @step
    def train_gpu(self):
        # Train on GPUs
        self.gpu_model = self.train_with_gpu(self.data)
        self.next(self.join)

    @batch(inferentia=4, memory=128000)
    @pypi(packages={'torch-neuron': '1.9.0'})
    @step
    def train_inferentia(self):
        # Train on AWS Inferentia
        self.inf_model = self.train_with_inferentia(self.data)
        self.next(self.join)

    @step
    def join(self, inputs):
        self.models = [i.gpu_model if hasattr(i, 'gpu_model') else i.inf_model
                       for i in inputs]
        self.next(self.end)

    @step
    def end(self):
        print(f"Trained {len(self.models)} models")

    def load_data(self):
        return []

    def train_with_gpu(self, data):
        return "gpu_model"

    def train_with_inferentia(self, data):
        return "inf_model"

if __name__ == '__main__':
    SpecializedML()