or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

api-advanced.md  api-client.md  api-compute.md  api-core.md  api-data.md  index.md
tile.json

docs/api-advanced.md

Advanced Features

Parallel Processing

parallel_map

from metaflow import parallel_map

def parallel_map(func: callable, iterable, max_parallel: int = None,
                 dir: str = None) -> list:
    """
    Apply ``func`` to every item of ``iterable`` in parallel and return the
    results as a list ordered like the inputs.

    Args:
        func: Function to apply to each item (must be picklable).
        iterable: Input items.
        max_parallel (int): Maximum number of workers (default: CPU count).
        dir (str): Temporary directory used for inter-process communication.

    Returns:
        list: One result per input item, in the same order as the inputs.

    Example:
        def process(x):
            return x ** 2

        results = parallel_map(process, range(100), max_parallel=8)
    """

parallel_imap_unordered

from metaflow import parallel_imap_unordered

def parallel_imap_unordered(func: callable, iterable,
                             max_parallel: int = None, dir: str = None):
    """
    Apply ``func`` to the items of ``iterable`` in parallel, yielding each
    result as it completes.

    Args:
        func: Function to apply to each item.
        iterable: Input items.
        max_parallel (int): Maximum number of workers.
        dir (str): Directory argument; NOTE(review): presumably the same
            temporary IPC directory that ``parallel_map`` documents - confirm.

    Yields:
        Results in completion order (input order is not preserved; more
        memory efficient than collecting an ordered list).

    Example:
        def process(x):
            return x ** 2

        for result in parallel_imap_unordered(process, range(100), max_parallel=8):
            print(result)
    """

Parallel Examples

from metaflow import FlowSpec, step, parallel_map, S3

def process_file(path):
    """Return the size in bytes of a single S3 object (runs in a separate process).

    Args:
        path: Key or URL of the object, in any form accepted by ``S3.get``.

    Returns:
        The object's size in bytes as reported by the downloaded handle.
    """
    with S3() as s3:
        # ``S3.get`` returns an S3Object handle, not raw bytes, so ``len()``
        # does not apply to it; the byte count is exposed as ``size``.
        return s3.get(path).size

class ParallelFlow(FlowSpec):
    """Flow that measures every object under an S3 prefix within one step."""

    @step
    def start(self):
        """List the input objects, size them in parallel, and print the total."""
        listing_roots = ['s3://bucket/data/']
        with S3() as s3:
            self.files = s3.list_paths(listing_roots)

        # Fan the per-file work out with parallel_map inside this step
        # instead of creating one Metaflow task per file.
        self.sizes = parallel_map(process_file, self.files, max_parallel=16)

        print(f"Total size: {sum(self.sizes)}")
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""
        pass

if __name__ == '__main__':
    ParallelFlow()
from metaflow import parallel_imap_unordered

def compute_features(record):
    """Build the feature entry for one record, keyed by the record's ``id``."""
    features = [1, 2, 3]
    return {'id': record['id'], 'features': features}

# Consume results as they finish, reporting progress every 100 records
processed = []
result_stream = parallel_imap_unordered(compute_features, records, max_parallel=8)
for done_count, result in enumerate(result_stream, start=1):
    processed.append(result)
    if done_count % 100 == 0:
        print(f"Processed {len(processed)} records")

Profiling

from metaflow import profile

def profile(label: str, stats_dict: dict = None):
    """
    Context manager for profiling code execution.

    Args:
        label (str): Profile identifier
        stats_dict (dict, optional): Dict that collects timings. Each label
            maps to the elapsed wall-clock time as an integer number of
            milliseconds (not a nested dict); timing the same label again
            adds to its running total.

    Example:
        stats = {}

        with profile('data_loading', stats):
            data = load_data()

        with profile('training', stats):
            model = train(data)

        # Values are milliseconds, so convert before printing seconds.
        for op, elapsed_ms in stats.items():
            print(f"{op}: {elapsed_ms / 1000:.2f}s")
    """

Profiling Example

from metaflow import FlowSpec, step, profile

class ProfiledFlow(FlowSpec):
    """Flow that times each phase of its first step and reports the slowest one."""

    @step
    def start(self):
        """Run load, preprocess and train under ``profile`` and keep the timings."""
        # ``profile`` stores one integer per label: elapsed milliseconds.
        stats = {}

        with profile('load', stats):
            self.data = self.load_data()

        with profile('preprocess', stats):
            self.processed = self.preprocess(self.data)

        with profile('train', stats):
            self.model = self.train(self.processed)

        # Print performance (milliseconds converted to seconds for display)
        for operation, elapsed_ms in stats.items():
            print(f"{operation}: {elapsed_ms / 1000:.2f}s")

        self.profiling_stats = stats
        self.next(self.end)

    @step
    def end(self):
        """Report which profiled operation took the longest."""
        # Identify bottlenecks: the stats values are plain numbers, so they
        # can be compared directly.
        name, elapsed_ms = max(self.profiling_stats.items(),
                               key=lambda item: item[1])
        print(f"Slowest: {name} ({elapsed_ms / 1000:.2f}s)")

    def load_data(self):
        """Placeholder loader; returns an empty dataset."""
        return []

    def preprocess(self, data):
        """Placeholder preprocessing; returns ``data`` unchanged."""
        return data

    def train(self, data):
        """Placeholder training; returns a stand-in model value."""
        return "model"

if __name__ == '__main__':
    ProfiledFlow()

Custom Decorators

UserStepDecorator

from metaflow import UserStepDecorator

class UserStepDecorator:
    """
    Base class for custom step decorators.

    Subclasses customize a step by overriding one or both hooks below; the
    default implementations do nothing.

    Override:
        pre_mutate(mutable_step): Modify step before execution
        mutate(mutable_step): Modify step during execution (return USER_SKIP_STEP to skip)

    NOTE(review): the hook names and the ``add_init_code`` /
    ``add_cleanup_code`` helpers used in the example are reproduced from this
    page as-is - verify them against the installed Metaflow release.

    Example:
        class TimingDecorator(UserStepDecorator):
            def pre_mutate(self, mutable_step):
                mutable_step.add_init_code('import time; _start = time.time()')
                mutable_step.add_cleanup_code('print(f"Duration: {time.time() - _start:.2f}s")')

        class MyFlow(FlowSpec):
            @TimingDecorator
            @step
            def start(self):
                self.next(self.end)
    """

    def pre_mutate(self, mutable_step):
        """Hook for modifying the step before execution; no-op by default."""
        pass

    def mutate(self, mutable_step):
        """Hook for modifying the step during execution; no-op by default."""
        pass

FlowMutator

from metaflow import FlowMutator

class FlowMutator:
    """
    Base class for custom flow decorators.

    Subclasses override ``pre_mutate`` to adjust a whole flow; the default
    implementation does nothing.

    Override:
        pre_mutate(mutable_flow): Modify flow before execution

    NOTE(review): ``mutable_flow.steps`` and ``add_init_code`` in the example
    are reproduced from this page as-is - verify them against the installed
    Metaflow release.

    Example:
        class LoggingFlowDecorator(FlowMutator):
            def pre_mutate(self, mutable_flow):
                for step in mutable_flow.steps:
                    step.add_init_code('print(f"Starting {self.name}")')

        @LoggingFlowDecorator
        class MyFlow(FlowSpec):
            pass
    """

    def pre_mutate(self, mutable_flow):
        """Hook for modifying the flow before execution; no-op by default."""
        pass

USER_SKIP_STEP

from metaflow import USER_SKIP_STEP

"""
Return this value from a decorator's mutate() to skip execution of the step.

Example:
    class ConditionalDecorator(UserStepDecorator):
        def __init__(self, condition_attr):
            self.condition_attr = condition_attr

        def mutate(self, mutable_step):
            flow = mutable_step.flow
            if hasattr(flow, self.condition_attr) and getattr(flow, self.condition_attr):
                return USER_SKIP_STEP

    class MyFlow(FlowSpec):
        skip_training = Parameter('skip', type=bool, default=False)

        @ConditionalDecorator('skip_training')
        @step
        def train(self):
            # Skipped if skip_training=True
            pass
"""

Custom Decorator Examples

from metaflow import UserStepDecorator, FlowSpec, step

class ValidateDecorator(UserStepDecorator):
    """Step decorator that fails when expected flow attributes are absent."""

    def __init__(self, *required_attrs):
        # Names of the attributes the decorated step relies on.
        self.required_attrs = required_attrs

    def mutate(self, mutable_step):
        """Raise ValueError naming every required attribute the flow lacks."""
        target_flow = mutable_step.flow
        missing = []
        for name in self.required_attrs:
            if hasattr(target_flow, name):
                continue
            missing.append(name)
        if missing:
            raise ValueError(f"Missing required attributes: {missing}")

class MyFlow(FlowSpec):
    """Example flow whose ``process`` step is guarded by ValidateDecorator."""

    @step
    def start(self):
        """Create the two attributes that ``process`` requires."""
        self.data = list(range(1, 4))
        self.config = dict(param='value')
        self.next(self.process)

    @ValidateDecorator('data', 'config')
    @step
    def process(self):
        """Runs only after the decorator has confirmed data and config exist."""
        print(f"Processing {len(self.data)} items")
        self.next(self.end)

    @step
    def end(self):
        """Terminal step; nothing left to do."""
        pass

if __name__ == '__main__':
    MyFlow()
from metaflow import UserStepDecorator

class NotifyDecorator(UserStepDecorator):
    """Step decorator that appends a completion-notice snippet to a step.

    The injected snippet only prints a message naming the configured channel;
    no notification service is called here.
    """

    def __init__(self, channel='slack'):
        # Channel name embedded in the printed completion message.
        self.channel = channel

    def pre_mutate(self, mutable_step):
        """Register cleanup code that prints the completion message."""
        # The template is an f-string: ``{self.channel}`` is filled in now,
        # while the doubled braces (``{{self.name}}``, ``{{e}}``) survive as
        # single braces so they are evaluated by the injected code's own
        # f-strings when it runs.
        notify_code = f'''
try:
    print(f"Step {{self.name}} completed - notify {self.channel}")
except Exception as e:
    print(f"Notification failed: {{e}}")
'''
        mutable_step.add_cleanup_code(notify_code)
from metaflow import FlowMutator

class TimingFlowDecorator(FlowMutator):
    """Flow-level decorator that injects timing code into every step."""

    def pre_mutate(self, mutable_flow):
        """Give each step a start-timer snippet and a duration-printing cleanup."""
        init_code = 'import time; _start = time.time()'
        cleanup_code = 'print(f"{self.name}: {time.time()-_start:.2f}s")'
        # Loop variable deliberately not called ``step`` so it cannot be
        # confused with the ``step`` decorator.
        for flow_step in mutable_flow.steps:
            flow_step.add_init_code(init_code)
            flow_step.add_cleanup_code(cleanup_code)

@TimingFlowDecorator
class MyFlow(FlowSpec):
    # Timing is applied to every step by the class decorator above
    pass

Exceptions

# ``Flow`` comes from the top-level package, not from metaflow.exception;
# without this import the lookup below fails with NameError.
from metaflow import Flow
from metaflow.exception import (
    MetaflowException,          # Base exception
    MetaflowNotFound,           # Object not found
    MetaflowNamespaceMismatch,  # Namespace mismatch
    MetaflowInvalidPathspec,    # Invalid pathspec
    MetaflowInternalError,      # Internal error
    UnhandledInMergeArtifactsException,  # Merge conflict
    MissingInMergeArtifactsException     # Missing required artifacts
)

# Catch specific exceptions: the most specific handler comes first, with the
# MetaflowException base class as the fallback.
try:
    flow = Flow('MyFlow')
    run = flow.latest_run
except MetaflowNotFound:
    print("Flow or run not found")
except MetaflowException as e:
    print(f"Metaflow error: {e}")

Exception Handling Examples

from metaflow import Flow
from metaflow.exception import MetaflowNotFound, MetaflowException

def get_latest_model(flow_name):
    """Return the ``model`` artifact of a flow's latest successful run.

    Args:
        flow_name: Name of the flow to look up.

    Returns:
        The ``model`` artifact, or ``None`` when the flow is not found, has no
        successful run, has no ``model`` artifact, or another Metaflow error
        occurs. A short message is printed in each of those cases.
    """
    try:
        run = Flow(flow_name).latest_successful_run

        # ``latest_successful_run`` is None when the flow exists but no run
        # has succeeded yet. Without this guard ``run.data`` raises
        # AttributeError, which neither handler below catches.
        if run is None:
            print(f"No successful run found for flow '{flow_name}'")
            return None

        if hasattr(run.data, 'model'):
            return run.data.model

        print("No model artifact found")
        return None

    except MetaflowNotFound:
        print(f"Flow '{flow_name}' not found")
        return None

    except MetaflowException as e:
        print(f"Metaflow error: {e}")
        return None

model = get_latest_model('TrainingFlow')
from metaflow import FlowSpec, step
from metaflow.exception import UnhandledInMergeArtifactsException

class FlexibleMerge(FlowSpec):
    """Foreach flow whose branches do not all produce the same artifacts."""

    @step
    def start(self):
        """Fan out over four integer items."""
        self.items = list(range(1, 5))
        self.next(self.process, foreach='items')

    @step
    def process(self):
        """Record the item id; only even items also get a ``result``."""
        value = self.input
        self.item_id = value
        is_even = value % 2 == 0
        if is_even:
            self.result = value * 2
        self.next(self.join)

    @step
    def join(self, inputs):
        """Merge what can be merged, then gather ``result`` values by hand."""
        try:
            self.merge_artifacts(inputs, exclude=['result'])
        except UnhandledInMergeArtifactsException as e:
            print(f"Expected merge conflict: {e}")

        # Only some branches define 'result', so collect it explicitly.
        collected = {}
        for branch in inputs:
            if hasattr(branch, 'result'):
                collected[branch.item_id] = branch.result
        self.results = collected

        self.next(self.end)

    @step
    def end(self):
        """Print the collected per-item results."""
        print(f"Results: {self.results}")

if __name__ == '__main__':
    FlexibleMerge()

Complete Advanced Example

from metaflow import FlowSpec, step, batch, resources
from metaflow import parallel_map, profile, UserStepDecorator, Parameter
from metaflow import S3
from metaflow.exception import MetaflowException

class MonitorDecorator(UserStepDecorator):
    """Step decorator that injects code reporting the step's memory growth.

    The injected snippets depend on the third-party ``psutil`` package being
    importable where the step runs.
    """

    def pre_mutate(self, mutable_step):
        """Register an init snippet that samples RSS and a cleanup that prints the delta."""
        # Init: remember the process handle and its resident set size,
        # converted from bytes to MB by the two divisions by 1024.
        mutable_step.add_init_code('''
import psutil
import os
_proc = psutil.Process(os.getpid())
_start_mem = _proc.memory_info().rss / 1024 / 1024
        ''')

        # Cleanup: sample RSS again and print the difference from the
        # starting value, reusing the names defined by the init snippet.
        mutable_step.add_cleanup_code('''
_end_mem = _proc.memory_info().rss / 1024 / 1024
print(f"Memory delta: {_end_mem - _start_mem:.1f} MB")
        ''')

def process_chunk(chunk):
    """Add up the values of one chunk; meant to be run by a parallel worker."""
    total = 0
    for value in chunk:
        total = total + value
    return total

class AdvancedPipeline(FlowSpec):
    """End-to-end example combining profiling, S3 access and parallel_map.

    Timings collected with ``profile`` are integers in milliseconds, so they
    are divided by 1000 wherever seconds are printed.
    """

    batch_size = Parameter('batch_size', default=1000, type=int)

    @step
    def start(self):
        """List the input objects and record how long the listing took."""
        stats = {}

        with profile('data_loading', stats):
            with S3() as s3:
                # Keep plain URL strings rather than S3Object handles, so the
                # artifact is simple data that later steps can pass to S3.
                self.files = [obj.url
                              for obj in s3.list_paths(['s3://bucket/data/'])]

        print(f"Loaded {len(self.files)} files in {stats['data_loading'] / 1000:.2f}s")
        self.next(self.process)

    @batch
    @resources(cpu=16, memory=64000)
    @MonitorDecorator
    @step
    def process(self):
        """Download a sample, aggregate it in parallel, and upload the result."""
        import pickle  # local import: used only to serialize the upload below

        stats = {}

        # Load data
        with profile('download', stats):
            with S3() as s3:
                # Read the byte counts while the S3 context is still open.
                # The summing done in process_chunk needs numbers, not
                # S3Object handles.
                data_list = [obj.size
                             for obj in s3.get_many(self.files[:100])]  # Sample

        # Process in parallel
        with profile('processing', stats):
            chunks = [data_list[i:i+self.batch_size]
                     for i in range(0, len(data_list), self.batch_size)]

            self.results = parallel_map(
                process_chunk,
                chunks,
                max_parallel=8
            )

        # Save results
        with profile('upload', stats):
            with S3() as s3:
                # S3.put stores str/bytes payloads, so serialize the list first.
                s3.put('s3://bucket/results.pkl', pickle.dumps(self.results))

        self.timing = stats
        self.next(self.end)

    @step
    def end(self):
        """Print the per-operation timings and a summary of the results."""
        # Analyze performance
        print("\nPerformance Report:")
        for operation, elapsed_ms in self.timing.items():
            print(f"  {operation}: {elapsed_ms / 1000:.2f}s")

        total = sum(self.timing.values()) / 1000
        print(f"  Total: {total:.2f}s")

        # Results
        print(f"\nProcessed {len(self.results)} chunks")
        print(f"Total: {sum(self.results)}")

if __name__ == '__main__':
    AdvancedPipeline()

Best Practices

Parallel Processing

# Use parallel_map for CPU-bound operations inside a single step
results = parallel_map(cpu_intensive_func, items, max_parallel=8)

# For I/O-bound operations, fan out with foreach instead
self.next(self.download, foreach='urls')  # Better for I/O

# Functions passed to parallel_map must be picklable, which in practice
# means defining them at module top level rather than nested in a function
def process(x):  # Good: top-level definition
    return x * 2

def outer():
    def process(x):  # Avoid: nested definition (may not pickle)
        return x * 2
    return parallel_map(process, items)

Custom Decorators

# Keep decorators focused and composable: one concern per class
class TimingDecorator(UserStepDecorator):
    """Does one thing well"""
    pass

class ValidationDecorator(UserStepDecorator):
    """Separate concern"""
    pass

# Single-purpose decorators can be stacked on the same step
@TimingDecorator
@ValidationDecorator
@step
def process(self):
    pass

# Use unique variable names in injected code (prefix with _) so they are less
# likely to collide with names used by the step itself.
# NOTE(review): both snippets call time.time(); other examples in this document
# inject 'import time' first - confirm the module is in scope where they run.
mutable_step.add_init_code('_start_time = time.time()')  # Good
mutable_step.add_init_code('start_time = time.time()')   # May conflict

# Handle errors gracefully: a template for guarding decorator logic
def mutate(self, mutable_step):
    try:
        # Decorator logic
        pass
    except Exception as e:
        print(f"Decorator error: {e}")
        # Decide: re-raise, skip, or continue

Exception Handling

# Catch specific exceptions first, then the MetaflowException base class
try:
    run = flow.latest_run
except MetaflowNotFound:
    print("Not found")
except MetaflowException:
    print("Other Metaflow error")

# Provide context in error messages: say what was requested and from where
try:
    data = run.data.artifact
except MetaflowNotFound as e:
    print(f"Artifact 'artifact' not found in run {run.id}: {e}")

# Clean up resources
connection = None
try:
    connection = connect()
    data = fetch(connection)
except MetaflowException as e:
    print(f"Error: {e}")
finally:
    # Compare with None explicitly. A handle that defines __bool__ or
    # __len__ can be falsy while still open, and a truthiness test would
    # then skip the close.
    if connection is not None:
        connection.close()