or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

api-advanced.md · api-client.md · api-compute.md · api-core.md · api-data.md · index.md
tile.json

docs/api-data.md

Data & Visualization

S3 Data Access

S3 Client

from metaflow import S3

# Basic S3 operations; the `with` block scopes the client so its resources
# are cleaned up on exit.
with S3() as s3:
    # Download
    data = s3.get('s3://bucket/file.pkl')

    # Upload (`my_data` is a placeholder for your payload)
    s3.put('s3://bucket/output.pkl', my_data)

    # List files
    files = s3.list_paths(['s3://bucket/prefix/'])

    # Metadata
    info = s3.info('s3://bucket/file.pkl')

S3 Methods

class S3:
    """
    S3 client (use as context manager).

    API-reference stub: method bodies are intentionally empty; this class
    documents the interface of ``metaflow.S3``.
    """

    def get(self, key: str, return_missing: bool = False,
            return_info: bool = True):
        """
        Download object.

        Args:
            key: Full ``s3://`` URL of the object.
            return_missing: If True, do not raise for missing keys.
            return_info: If True, attach metadata to the returned object.

        Returns:
            S3Object with .blob (bytes), .text (str), .path (str) attributes
        """

    def get_many(self, keys: list, return_missing: bool = False,
                 return_info: bool = True) -> list:
        """Download multiple objects in parallel; returns one result per key"""

    def put(self, key: str, obj, overwrite: bool = True):
        """Upload object (``obj`` should be str/bytes — serialize dicts first)"""

    def put_many(self, key_obj_pairs: list, overwrite: bool = True):
        """Upload multiple objects in parallel; takes (key, obj) pairs"""

    def list_paths(self, paths: list) -> list:
        """List objects under prefixes"""

    def info(self, key: str) -> dict:
        """
        Get object metadata.

        Returns dict with: content_length, last_modified, content_type, etag

        NOTE(review): current Metaflow versions return an S3Object (with
        .size, .last_modified, ...) rather than a plain dict — verify
        against the installed version.
        """

    def get_recursive(self, path: str) -> list:
        """Download all objects under prefix"""

    def put_files(self, key_path_pairs: list):
        """Upload local files to S3; takes (key, local_path) pairs"""

S3 Examples

from metaflow import FlowSpec, step, S3

class S3Flow(FlowSpec):
    """Example: fan out over an S3 listing, load each file, then join."""

    @step
    def start(self):
        with S3() as s3:
            # List data files
            # NOTE(review): in real Metaflow, list_paths returns S3Objects
            # rather than plain strings; the foreach below may need their
            # .url attribute — verify against the installed version.
            self.files = s3.list_paths(['s3://bucket/data/'])
        self.next(self.load, foreach='files')

    @step
    def load(self):
        # One task per element of self.files; the element is self.input.
        with S3() as s3:
            self.data = s3.get(self.input)
        self.next(self.join)

    @step
    def join(self, inputs):
        # Combine and save
        all_data = [i.data for i in inputs]

        with S3() as s3:
            s3.put('s3://bucket/combined.pkl', all_data)

        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    S3Flow()

Batch S3 Operations

with S3() as s3:
    # Parallel downloads
    files = ['s3://bucket/file1.csv', 's3://bucket/file2.csv', 's3://bucket/file3.csv']
    data_list = s3.get_many(files)

    # Parallel uploads: a list of (key, object) pairs
    uploads = [
        ('s3://bucket/out1.pkl', result1),
        ('s3://bucket/out2.pkl', result2),
        ('s3://bucket/out3.pkl', result3)
    ]
    s3.put_many(uploads)

    # Check metadata before download
    info = s3.info('s3://bucket/large-file.bin')
    if info['content_length'] < 1e9:  # < 1GB (decimal: 1e9 bytes)
        data = s3.get('s3://bucket/large-file.bin')

Local Filesystem

from metaflow.datatools import Local

# Local mirrors the S3 client API for local-filesystem paths.
with Local() as local:
    data = local.get('/path/to/file.txt')
    local.put('/path/to/output.txt', my_data)
    files = local.list_paths(['/path/to/dir/'])

Visualization Cards

@card Decorator

@card(type: str = 'default', id: str = None, options: dict = None,
      timeout: int = None, refresh_interval: int = None)
"""
Attach visualization card to step.

Args:
    type (str): 'default', 'blank', or custom type
    id (str): Card ID for multiple cards per step
    refresh_interval (int): Refresh interval (seconds) for real-time updates

Example:
    @card(type='blank')
    @step
    def train(self):
        from metaflow import current
        from metaflow.cards import Markdown, Table

        current.card.append(Markdown("# Training Results"))
        current.card.append(Table([['Metric', 'Value'], ['Acc', '0.95']]))
        self.next(self.end)
"""

Card Components

from metaflow.cards import Markdown, Table, Image, VegaChart, Artifact, Error

# Text
Markdown(text: str)

# Table
Table(data: list, headers: list = None)
Table.from_dataframe(df)

# Image
Image(src, label: str = None)  # src = path, bytes, URL, or PIL Image

# Chart
VegaChart(spec: dict)  # Vega/Vega-Lite spec

# Artifact display
Artifact(artifact, name: str = None, compressed: bool = False)

# Error message
Error(error: str)

Using Cards

from metaflow import FlowSpec, step, card, current
from metaflow.cards import Markdown, Table, Image, VegaChart

class VisualizationFlow(FlowSpec):
    """Example: build one 'blank' card from Markdown, Table, chart and image."""

    @card(type='blank')
    @step
    def analyze(self):
        # Title
        current.card.append(Markdown("# Analysis Results"))

        # Metrics table (first row serves as the header)
        metrics = [
            ['Metric', 'Value'],
            ['Accuracy', '95.3%'],
            ['F1 Score', '0.94']
        ]
        current.card.append(Table(metrics))

        # Plot rendered from a Vega-Lite spec
        spec = {
            "$schema": "https://vega.github.io/schema/vega-lite/v5.json",
            "data": {"values": [{"x": 1, "y": 2}, {"x": 2, "y": 4}]},
            "mark": "line",
            "encoding": {
                "x": {"field": "x", "type": "quantitative"},
                "y": {"field": "y", "type": "quantitative"}
            }
        }
        current.card.append(VegaChart(spec))

        # Image: save a matplotlib figure to disk, then embed it by path
        import matplotlib.pyplot as plt
        fig, ax = plt.subplots()
        ax.plot([1, 2, 3], [1, 4, 9])
        fig.savefig('plot.png')
        plt.close()
        current.card.append(Image('plot.png', label='Training Curve'))

        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    VisualizationFlow()

Multiple Cards

# Fragment: assumes an enclosing FlowSpec class as in the earlier examples.
@card(type='blank', id='metrics')
@card(type='blank', id='plots')
@step
def analyze(self):
    """Step with two cards; each card is addressed by its id."""
    from metaflow import current
    from metaflow.cards import Markdown

    # Add to different cards
    current.card['metrics'].append(Markdown("## Metrics"))
    current.card['plots'].append(Markdown("## Plots"))

Real-time Cards

# Fragment: assumes an enclosing FlowSpec class as in the earlier examples.
@card(type='blank', refresh_interval=5)
@step
def train(self):
    """Step whose card is re-rendered periodically while the task runs."""
    from metaflow import current
    from metaflow.cards import Markdown
    import time

    for epoch in range(10):
        loss = 1.0 / (epoch + 1)
        # Card updates every 5 seconds
        current.card.append(Markdown(f"Epoch {epoch}: loss={loss:.4f}"))
        time.sleep(1)

    self.next(self.end)

Custom Card Types

from metaflow.cards import MetaflowCard

class CustomCard(MetaflowCard):
    """Custom card renderer"""
    # NOTE(review): for @card(type='custom') to resolve, the card class
    # normally also needs a declared type name and must be registered
    # (e.g. via an installed card package) — verify against the Metaflow
    # version in use.

    def render(self, task):
        """
        Render card HTML.

        Args:
            task: Task object with .data attribute for artifacts

        Returns:
            str: HTML content
        """
        # task.data.metrics is the `self.metrics` artifact set by the step.
        metrics = task.data.metrics
        html = f"<h1>Results</h1><p>Accuracy: {metrics['acc']}</p>"
        return html

# Use custom card
@card(type='custom')
@step
def train(self):
    self.metrics = {'acc': 0.95}
    self.next(self.end)

Accessing Cards

from metaflow import Flow
from metaflow.cards import get_cards

# Inspect a finished run's cards via the Client API.
flow = Flow('MyFlow')
run = flow.latest_run
task = run['analyze'].task

cards = get_cards(task)
for card in cards:
    print(f"Card {card.id}: {card.type}")
    # NOTE(review): some Metaflow versions expose the HTML via card.get()
    # rather than a .html attribute — verify.
    with open(f'{card.id}.html', 'w') as f:
        f.write(card.html)

Complete Data Pipeline Example

from metaflow import FlowSpec, step, batch, resources, card, current, S3
from metaflow.cards import Markdown, Table, VegaChart
from datetime import datetime
import json

class DataPipeline(FlowSpec):
    """
    End-to-end example: select raw S3 files modified in the last 24h,
    process each in parallel on AWS Batch, then join the results into a
    report card and a JSON summary written back to S3.
    """

    @step
    def start(self):
        """Select raw files modified within the last 24 hours."""
        with S3() as s3:
            # Find recent files
            # NOTE(review): in real Metaflow, list_paths returns S3Objects
            # rather than plain strings; the calls below may need their
            # .url attribute — verify against the installed version.
            all_files = s3.list_paths(['s3://data/raw/'])
            self.files = []

            cutoff = datetime.now().timestamp() - 86400  # Last 24h
            for path in all_files:
                info = s3.info(path)
                if info['last_modified'].timestamp() > cutoff:
                    self.files.append(path)

        print(f"Processing {len(self.files)} files")
        self.next(self.process, foreach='files')

    @batch
    @resources(cpu=4, memory=16000)
    @step
    def process(self):
        """Load one raw file, transform it, and write the processed output."""
        with S3() as s3:
            # Load and process. s3.get returns an S3Object; transform() is
            # expected to turn it into a sized, serializable collection.
            data = s3.get(self.input)
            self.result = self.transform(data)

            # Save result next to the raw file, under /processed/
            output_path = self.input.replace('/raw/', '/processed/')
            s3.put(output_path, self.result)

        self.processed_path = output_path
        self.record_count = len(self.result)
        self.next(self.join)

    @card(type='blank')
    @step
    def join(self, inputs):
        """Aggregate per-file results, render a report card, save a summary."""
        # Summary stats across all foreach branches
        total_records = sum(i.record_count for i in inputs)
        processed_files = [i.processed_path for i in inputs]

        # Create report card
        current.card.append(Markdown("# Processing Report"))

        stats_table = [
            ['Metric', 'Value'],
            ['Files Processed', str(len(inputs))],
            ['Total Records', f'{total_records:,}'],
            ['Timestamp', datetime.now().isoformat()]
        ]
        current.card.append(Table(stats_table))

        # File sizes chart (sample the first 10 files to keep it cheap)
        with S3() as s3:
            sizes = []
            for path in processed_files[:10]:
                info = s3.info(path)
                sizes.append({
                    'file': path.split('/')[-1],
                    'size_mb': info['content_length'] / 1024 / 1024
                })

        chart_spec = {
            "$schema": "https://vega.github.io/schema/vega-lite/v5.json",
            "data": {"values": sizes},
            "mark": "bar",
            "encoding": {
                "x": {"field": "file", "type": "nominal"},
                "y": {"field": "size_mb", "type": "quantitative"}
            }
        }
        current.card.append(Markdown("## File Sizes"))
        current.card.append(VegaChart(chart_spec))

        # Save summary. S3.put expects str/bytes, so serialize the dict
        # explicitly (passing a raw dict raises a TypeError).
        with S3() as s3:
            summary = {
                'timestamp': datetime.now().isoformat(),
                'files': len(inputs),
                'records': total_records,
                'paths': processed_files
            }
            s3.put('s3://data/summary/latest.json', json.dumps(summary))

        self.next(self.end)

    @step
    def end(self):
        print("Pipeline complete")

    def transform(self, data):
        # Placeholder transformation: replace with real parsing logic.
        # NOTE(review): `data` is the S3Object returned by s3.get(); a real
        # implementation should return something len()-able and
        # str/bytes-serializable — see process().
        return data

if __name__ == '__main__':
    DataPipeline()

Card Best Practices

# Fragments below assume an enclosing FlowSpec class plus the `current`,
# `Markdown`, and `time` imports shown in earlier examples.

# Use blank cards for custom content
@card(type='blank')
@step
def step1(self):
    pass

# Use default for automatic artifact display
@card  # or @card(type='default')
@step
def step2(self):
    pass

# Multiple cards for organization
@card(type='blank', id='training')
@card(type='blank', id='validation')
@step
def train(self):
    current.card['training'].append(Markdown("## Training"))
    current.card['validation'].append(Markdown("## Validation"))

# Real-time monitoring
@card(type='blank', refresh_interval=10)
@step
def long_running(self):
    # Updates visible every 10 seconds
    for i in range(100):
        current.card.append(Markdown(f"Progress: {i}%"))
        time.sleep(5)