Client & Runner API

Client API - Accessing Flow Data

Flow

class Flow:
    """
    Access a flow and its runs.

    Args:
        name (str): Flow name

    Attributes:
        latest_run: Most recent run
        latest_successful_run: Most recent successful run
        runs: Iterator over all runs

    Example:
        from metaflow import Flow

        flow = Flow('MyFlow')
        run = flow.latest_run
        print(f"Run ID: {run.id}")

        # Iterate over runs
        for run in flow.runs():
            print(run.id)
    """

    def __getitem__(self, run_id: str) -> 'Run':
        """Get run by ID: flow['123']"""

Run

class Run:
    """
    Represents a flow run.

    Args:
        pathspec (str): 'FlowName/RunID'

    Attributes:
        id (str): Run ID
        created_at (datetime): Creation time
        finished_at (datetime): Completion time
        successful (bool): Whether run succeeded
        finished (bool): Whether run completed
        data: Access artifacts from final step
        tags (set): Run tags

    Example:
        from metaflow import Flow, Run

        # Via Flow
        flow = Flow('MyFlow')
        run = flow.latest_run

        # Direct access
        run = Run('MyFlow/123')

        # Access data
        result = run.data.result
        model = run.data.model

        # Check status
        if run.successful:
            print("Run succeeded")

        # Access steps
        for step in run:
            print(step.name)
    """

    def __getitem__(self, step_name: str) -> 'Step':
        """Get step: run['train']"""

    def add_tag(self, tag: str):
        """Add tag to run"""

    def add_tags(self, tags: list):
        """Add multiple tags"""

    def remove_tag(self, tag: str):
        """Remove tag"""

    def replace_tag(self, old: str, new: str):
        """Replace tag"""

Step

class Step:
    """
    Represents a workflow step.

    Attributes:
        name (str): Step name
        finished_at (datetime): Completion time
        task: Single task (for non-foreach steps)
        tasks: Iterator over all tasks

    Example:
        run = flow.latest_run
        step = run['train']

        # Single task
        task = step.task
        model = task.data.model

        # Multiple tasks (foreach)
        for task in step.tasks():
            print(task.data.result)
    """

    def __getitem__(self, task_id: str) -> 'Task':
        """Get task: step['456']"""

Task

class Task:
    """
    Represents a single execution (task) of a step.

    Attributes:
        id (str): Task ID
        pathspec (str): Full path ('FlowName/RunID/StepName/TaskID')
        finished (bool): Whether completed
        successful (bool): Whether succeeded
        finished_at (datetime): Completion time
        metadata_dict (dict): Task metadata
        exception (Exception): Exception if failed
        data: Access artifacts
        artifacts: Container of all artifacts (iterable)
        parent (Step): Parent step

    Example:
        task = step.task

        # Access artifacts
        result = task.data.result
        model = task.data.model

        # List all artifacts
        for artifact in task.artifacts:
            print(f"{artifact.name}: {artifact.data}")

        # Check success
        if not task.successful:
            print(f"Failed: {task.exception}")
    """

    def __getitem__(self, artifact_name: str):
        """Get artifact: task['result']"""

DataArtifact

class DataArtifact:
    """
    Represents a single data artifact produced by a task.

    Attributes:
        name (str): Artifact name
        data: Artifact value
        sha (str): Content hash

    Example:
        artifact = task['model']
        print(f"Name: {artifact.name}")
        model = artifact.data  # Access value
    """

Quick Access Patterns

from metaflow import Flow

# Get latest data
flow = Flow('MyFlow')
result = flow.latest_run.data.result

# Get specific run
run = Flow('MyFlow')['123']
data = run.data

# Access step/task data
run = flow.latest_run
step = run['train']
task = step.task
model = task.data.model

# Foreach step data
step = run['process']
results = [task.data.result for task in step.tasks()]

# Filter runs by tag
for run in flow.runs('production'):
    print(run.id)

# Find successful runs
for run in flow:
    if run.successful:
        print(run.data.metrics)

Runner API - Programmatic Execution

Runner

class Runner:
    """
    Execute flows programmatically.

    Args:
        flow (str): Path to flow file

    Example:
        from metaflow import Runner

        # Execute flow
        with Runner('flow.py').run(alpha=0.01) as running:
            run = running.wait()
            print(f"Run ID: {run.id}")
            print(f"Result: {run.data.result}")
    """

    def run(self, **kwargs) -> 'ExecutingRun':
        """
        Start flow execution.

        Args:
            **kwargs: Flow parameters

        Returns:
            ExecutingRun: Running flow handle

        Example:
            with Runner('flow.py').run(param1='value', param2=42) as r:
                run = r.wait()
        """

    def resume(self, run_id: str, **kwargs) -> 'ExecutingRun':
        """
        Resume a failed or interrupted run.

        Args:
            run_id (str): Run ID to resume
            **kwargs: Override parameters

        Example:
            with Runner('flow.py').resume('123') as r:
                run = r.wait()
        """

    def deploy(self, **kwargs):
        """
        Deploy flow to production.

        Example:
            Runner('flow.py').deploy(name='my-flow')
        """

ExecutingRun

class ExecutingRun:
    """
    Handle to a running flow execution.

    Attributes:
        run_id (str): Run ID
        status (str): Current status

    Example:
        with Runner('flow.py').run() as running:
            print(f"Run ID: {running.run_id}")
            print(f"Status: {running.status}")

            # Wait for completion
            run = running.wait()
            if run.successful:
                print(f"Result: {run.data.result}")
    """

    def wait(self, timeout: int = None, stream: str = None) -> 'Run':
        """
        Wait for completion.

        Args:
            timeout (int): Timeout in seconds
            stream (str): Stream logs ('stdout', 'stderr', None)

        Returns:
            Run: Completed run object

        Example:
            # Wait indefinitely
            run = running.wait()

            # Wait with timeout
            try:
                run = running.wait(timeout=3600)  # 1 hour
            except TimeoutError:
                print("Flow still running")

            # Stream logs
            run = running.wait(stream='stdout')
        """

    def kill(self):
        """Terminate execution"""

Complete Examples

Client API - Data Analysis

from metaflow import Flow
import pandas as pd

# Compare runs
flow = Flow('ExperimentFlow')
results = []

for run in flow.runs():
    if run.successful:
        results.append({
            'run_id': run.id,
            'accuracy': run.data.accuracy,
            'learning_rate': run.data.learning_rate,
            'timestamp': run.finished_at
        })

df = pd.DataFrame(results)
best_run = df.loc[df['accuracy'].idxmax()]
print(f"Best run: {best_run['run_id']} with accuracy {best_run['accuracy']}")

# Load best model
run = flow[best_run['run_id']]
model = run.data.model

Client API - Artifact Inspection

from metaflow import Flow

flow = Flow('DataPipeline')
run = flow.latest_run

# Inspect all artifacts
for step in run:
    print(f"\nStep: {step.name}")

    # Handle foreach steps
    if step.name == 'process':
        for task in step.tasks():
            for artifact in task.artifacts:
                print(f"  Task {task.id}: {artifact.name} = {artifact.data}")
    else:
        task = step.task
        for artifact in task.artifacts:
            print(f"  {artifact.name} = {artifact.data}")

Client API - Error Analysis

from metaflow import Flow

flow = Flow('ProcessingFlow')

failed_runs = []
for run in flow.runs():
    if run.finished and not run.successful:
        failed_runs.append(run)

for run in failed_runs:
    print(f"\nRun {run.id} failed:")
    for step in run:
        for task in step.tasks():
            if not task.successful and task.exception:
                print(f"  Step {step.name}: {task.exception}")

Runner API - Batch Execution

from metaflow import Runner
import time

# Execute multiple experiments
experiments = [
    {'learning_rate': 0.001, 'batch_size': 32},
    {'learning_rate': 0.01, 'batch_size': 64},
    {'learning_rate': 0.1, 'batch_size': 128},
]

for exp in experiments:
    with Runner('train.py').run(**exp) as running:
        print(f"Started run {running.run_id} with {exp}")
        time.sleep(1)  # Stagger starts

# Wait and collect results
time.sleep(3600)  # Simplistic wait; in practice, poll each run's status instead

from metaflow import Flow
flow = Flow('TrainingFlow')
for run in flow.runs():
    if run.successful:
        print(f"Run {run.id}: acc={run.data.accuracy:.2%}, lr={run.data.learning_rate}")

Runner API - Conditional Resume

from metaflow import Runner, Flow

flow = Flow('LongPipeline')
latest = flow.latest_run

if not latest.successful and not latest.finished:
    # Resume interrupted run
    print(f"Resuming run {latest.id}")
    with Runner('flow.py').resume(latest.id) as running:
        run = running.wait(stream='stdout')
elif not latest.successful:
    # Restart failed run
    print("Starting new run")
    with Runner('flow.py').run() as running:
        run = running.wait(stream='stdout')
else:
    print(f"Latest run {latest.id} succeeded")
    run = latest

print(f"Result: {run.data.result}")

Runner API - Automated Testing

from metaflow import Runner
import unittest

class TestMyFlow(unittest.TestCase):
    def test_flow_success(self):
        with Runner('myflow.py').run(test_mode=True) as running:
            run = running.wait(timeout=300)

            self.assertTrue(run.successful)
            self.assertIsNotNone(run.data.result)
            self.assertGreater(run.data.result, 0)

    def test_flow_with_invalid_data(self):
        with Runner('myflow.py').run(data_path='invalid.csv') as running:
            run = running.wait(timeout=300)

            # Should handle error gracefully
            self.assertTrue(run.successful)
            self.assertEqual(run.data.processed_count, 0)

if __name__ == '__main__':
    unittest.main()

Runner API - Deployment Pipeline

from metaflow import Runner, Flow

def deploy_if_improved(flow_name, metric_threshold):
    """Deploy new model if it beats threshold"""

    # Train new model
    with Runner('train.py').run() as running:
        run = running.wait(stream='stdout')

    if not run.successful:
        print("Training failed")
        return False

    new_metric = run.data.accuracy
    print(f"New model accuracy: {new_metric:.2%}")

    # Compare with production
    flow = Flow(flow_name)
    prod_runs = list(flow.runs('production'))

    if prod_runs:
        current_metric = prod_runs[0].data.accuracy
        print(f"Current production accuracy: {current_metric:.2%}")

        if new_metric > current_metric and new_metric > metric_threshold:
            print("Deploying improved model")
            run.add_tag('production')
            Runner('serve.py').deploy(model_run_id=run.id)
            return True
    else:
        if new_metric > metric_threshold:
            print("Deploying first model")
            run.add_tag('production')
            Runner('serve.py').deploy(model_run_id=run.id)
            return True

    print("Model did not meet deployment criteria")
    return False

# Run deployment check
deploy_if_improved('ModelTraining', metric_threshold=0.90)