or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

api-advanced.md · api-client.md · api-compute.md · api-core.md · api-data.md · index.md
tile.json

docs/api-compute.md

Compute & Environment

Cloud Execution

@batch - AWS Batch

@batch(cpu: int = None, memory: int = None, image: str = None,
       queue: str = None, iam_role: str = None, gpu: int = None,
       shared_memory: int = None, inferentia: int = None, trainium: int = None)
"""
Execute on AWS Batch.

Args:
    cpu (int): CPU units (1024 = 1 vCPU)
    memory (int): Memory in MB
    image (str): Docker image
    queue (str): Batch queue
    iam_role (str): IAM role ARN
    gpu (int): GPU count
    shared_memory (int): Shared memory in MB
    inferentia (int): AWS Inferentia chips
    trainium (int): AWS Trainium chips

Example:
    @batch(cpu=8192, memory=32000, gpu=2, queue='gpu-queue')
    @step
    def train(self):
        pass
"""

@kubernetes

@kubernetes(cpu: int = None, memory: int = None, image: str = None,
            gpu: int = None, secrets: list = None, namespace: str = None,
            service_account: str = None, persistent_volume_claims: dict = None,
            node_selector: dict = None, tolerations: list = None,
            labels: dict = None, annotations: dict = None)
"""
Execute on Kubernetes.

Args:
    cpu (int): CPU cores
    memory (int): Memory in MB
    image (str): Docker image
    gpu (int): GPU count
    secrets (list): Kubernetes secrets
    namespace (str): K8s namespace
    service_account (str): Service account
    persistent_volume_claims (dict): PVC mounts
    node_selector (dict): Node selection
    tolerations (list): Tolerations
    labels (dict): Pod labels
    annotations (dict): Pod annotations

Example:
    @kubernetes(
        cpu=8,
        memory=32000,
        gpu=2,
        node_selector={'node-type': 'gpu'},
        secrets=['aws-creds']
    )
    @step
    def train(self):
        pass
"""

@resources

@resources(cpu: int = None, memory: int = None, gpu: int = None)
"""
Declare resource requirements.

Works with @batch and @kubernetes decorators.

Example:
    @batch
    @resources(cpu=16, memory=64000, gpu=4)
    @step
    def train(self):
        pass
"""

Resource Combinations

# CPU-intensive
@batch(queue='compute')
@resources(cpu=32, memory=128000)
@step
def preprocess(self):
    pass

# GPU training
@batch(queue='gpu')
@resources(gpu=8, memory=256000)
@step
def train(self):
    pass

# AWS Inferentia inference
@batch(inferentia=1, memory=16000)
@step
def inference(self):
    pass

# Kubernetes with persistent storage
@kubernetes(
    cpu=4,
    memory=16000,
    persistent_volume_claims={'data': '/mnt/data'}
)
@step
def load(self):
    pass

Package Management

@conda

@conda(libraries: dict = None, python: str = None,
       sources: list = None, disabled: bool = False)
"""
Specify Conda environment.

Args:
    libraries (dict): Package versions {'name': 'version'}
    python (str): Python version
    sources (list): Additional conda channels
    disabled (bool): Disable conda for this step

Example:
    @conda(
        libraries={
            'numpy': '1.21.0',
            'pandas': '1.3.0',
            'scikit-learn': '0.24.2'
        },
        python='3.9'
    )
    @step
    def train(self):
        import numpy as np
        import pandas as pd
        pass
"""

@pypi

@pypi(packages: dict = None, python: str = None)
"""
Specify PyPI packages.

Args:
    packages (dict): Package versions {'name': 'version'}
    python (str): Python version

Example:
    @pypi(packages={
        'torch': '1.9.0',
        'transformers': '4.18.0',
        'datasets': '1.17.0'
    })
    @step
    def train(self):
        import torch
        import transformers
        pass
"""

@pypi_base

@pypi_base(packages: dict = None, python: str = None)
"""
Base packages for all steps (flow-level).

Example:
    from metaflow import FlowSpec, pypi_base, step

    @pypi_base(packages={'pandas': '1.3.0', 'numpy': '1.21.0'})
    class MyFlow(FlowSpec):
        @step
        def start(self):
            import pandas as pd  # Available in all steps
            pass
"""

@conda_base

@conda_base(libraries: dict = None, python: str = None)
"""
Base conda packages for all steps (flow-level).

Example:
    @conda_base(libraries={'numpy': '1.21.0'}, python='3.9')
    class MyFlow(FlowSpec):
        pass
"""

Combining Dependencies

from metaflow import FlowSpec, step, batch, pypi, conda

class MLFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.preprocess, self.train)

    # Pandas for preprocessing
    @pypi(packages={'pandas': '1.3.0', 'pyarrow': '5.0.0'})
    @step
    def preprocess(self):
        import pandas as pd
        self.data = pd.DataFrame()
        self.next(self.join)

    # PyTorch for training
    @batch(gpu=1)
    @pypi(packages={'torch': '1.9.0', 'torchvision': '0.10.0'})
    @step
    def train(self):
        import torch
        self.model = torch.nn.Linear(10, 1)
        self.next(self.join)

    @step
    def join(self, inputs):
        self.merge_artifacts(inputs)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    MLFlow()

Environment Variables

@environment

@environment(vars: dict = None)
"""
Set environment variables.

Args:
    vars (dict): Environment variables

Example:
    @environment(vars={
        'AWS_DEFAULT_REGION': 'us-west-2',
        'TOKENIZERS_PARALLELISM': 'false',
        'OMP_NUM_THREADS': '8'
    })
    @step
    def process(self):
        import os
        region = os.environ['AWS_DEFAULT_REGION']
        pass
"""

Docker Images

Custom Images with @batch

@batch(
    image='custom-ml:latest',
    cpu=8192,
    memory=32000
)
@step
def process(self):
    # Runs in custom Docker image
    pass

Custom Images with @kubernetes

@kubernetes(
    image='gcr.io/my-project/ml-image:v1.2',
    cpu=8,
    memory=32000
)
@step
def train(self):
    # Runs in custom image
    pass

Advanced Batch Configuration

@batch(
    cpu=16384,              # 16 vCPUs
    memory=128000,          # 128 GB
    gpu=4,                  # 4 GPUs
    queue='gpu-queue',      # Specific queue
    iam_role='arn:aws:iam::123:role/MetaflowRole',  # IAM role
    shared_memory=32000,    # 32 GB shared memory
    image='my-image:latest' # Custom image
)
@step
def train(self):
    pass

Advanced Kubernetes Configuration

@kubernetes(
    cpu=16,
    memory=128000,
    gpu=4,
    namespace='ml-workloads',
    service_account='metaflow-sa',
    secrets=['aws-creds', 'model-registry-key'],
    persistent_volume_claims={
        'data-pvc': '/mnt/data',
        'models-pvc': '/mnt/models'
    },
    node_selector={
        'node-type': 'gpu',
        'gpu-type': 'a100'
    },
    tolerations=[
        {
            'key': 'gpu',
            'operator': 'Equal',
            'value': 'true',
            'effect': 'NoSchedule'
        }
    ],
    labels={'project': 'ml', 'team': 'research'},
    annotations={'sidecar.istio.io/inject': 'false'}
)
@step
def train(self):
    pass

Execution Examples

Parallel Cloud Processing

from metaflow import FlowSpec, step, batch, resources

class ParallelProcessor(FlowSpec):
    @step
    def start(self):
        self.files = [f'file{i}.csv' for i in range(100)]
        self.next(self.process, foreach='files')

    @batch(queue='standard')
    @resources(cpu=4, memory=16000)
    @step
    def process(self):
        # Each file processed in separate batch job
        self.result = self.process_file(self.input)
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [i.result for i in inputs]
        self.next(self.end)

    @step
    def end(self):
        print(f"Processed {len(self.results)} files")

    def process_file(self, path):
        return f"processed_{path}"

if __name__ == '__main__':
    ParallelProcessor()

Mixed Local and Cloud

from metaflow import FlowSpec, step, batch, resources

class HybridFlow(FlowSpec):
    @step
    def start(self):
        # Runs locally
        self.config = self.load_config()
        self.next(self.train)

    @batch
    @resources(cpu=32, memory=128000, gpu=8)
    @step
    def train(self):
        # Runs on cloud with GPUs
        self.model = self.train_model(self.config)
        self.next(self.evaluate)

    @batch
    @resources(cpu=16, memory=64000)
    @step
    def evaluate(self):
        # Runs on cloud with CPUs
        self.metrics = self.evaluate_model(self.model)
        self.next(self.end)

    @step
    def end(self):
        # Runs locally
        print(f"Metrics: {self.metrics}")

    def load_config(self):
        return {'lr': 0.01}

    def train_model(self, config):
        return "model"

    def evaluate_model(self, model):
        return {'acc': 0.95}

if __name__ == '__main__':
    HybridFlow()

Specialized Hardware

from metaflow import FlowSpec, step, batch, resources, pypi

class SpecializedML(FlowSpec):
    @step
    def start(self):
        self.data = self.load_data()
        self.next(self.train_gpu, self.train_inferentia)

    @batch(queue='gpu-queue')
    @resources(gpu=8, memory=256000)
    @pypi(packages={'torch': '1.9.0'})
    @step
    def train_gpu(self):
        # Train on GPUs
        self.gpu_model = self.train_with_gpu(self.data)
        self.next(self.join)

    @batch(inferentia=4, memory=128000)
    @pypi(packages={'torch-neuron': '1.9.0'})
    @step
    def train_inferentia(self):
        # Train on AWS Inferentia
        self.inf_model = self.train_with_inferentia(self.data)
        self.next(self.join)

    @step
    def join(self, inputs):
        self.models = [i.gpu_model if hasattr(i, 'gpu_model') else i.inf_model
                       for i in inputs]
        self.next(self.end)

    @step
    def end(self):
        print(f"Trained {len(self.models)} models")

    def load_data(self):
        return []

    def train_with_gpu(self, data):
        return "gpu_model"

    def train_with_inferentia(self, data):
        return "inf_model"

if __name__ == '__main__':
    SpecializedML()