A human-centric Python framework for building and managing production-ready AI and ML systems with built-in experiment tracking, versioning, and cloud orchestration.
```bash
npx @tessl/cli install tessl/pypi-metaflow@2.19.0
```

Metaflow is a framework for building and managing data science workflows. Version: 2.x
```python
# Core imports
from metaflow import FlowSpec, step, Parameter, current, namespace
from metaflow import Flow, Run, Step, Task
# Decorators
from metaflow import batch, kubernetes, resources, retry, catch, timeout
from metaflow import environment, conda, pypi, card
# Data & utilities
from metaflow import S3, parallel_map, parallel_imap_unordered, profile
from metaflow.cards import Markdown, Table, Image, VegaChart, get_cards
# Runner API
from metaflow import Runner
# Custom extensions
from metaflow import UserStepDecorator, FlowMutator, USER_SKIP_STEP
# Exceptions
from metaflow.exception import MetaflowException, MetaflowNotFound
```

Capability docs:

- @api-core.md Core API - FlowSpec, steps, decorators, runtime context
- @api-client.md Client & Runner API - Accessing and executing flows programmatically
- @api-compute.md Compute & Environment - Cloud execution, resource management, dependencies
- @api-data.md Data & Visualization - S3, data tools, cards
- @api-advanced.md Advanced Features - Parallel processing, custom decorators, exceptions
A minimal flow defines steps as methods; instance attributes persist as artifacts between steps:

```python
from metaflow import FlowSpec, step, Parameter

class MyFlow(FlowSpec):
    param = Parameter('name', default='value')

    @step
    def start(self):
        self.data = []  # Artifacts persist between steps
        self.next(self.process)

    @step
    def process(self):
        self.result = len(self.data)
        self.next(self.end)

    @step
    def end(self):
        print(f"Result: {self.result}")

if __name__ == '__main__':
    MyFlow()
```
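Running it is a matter of invoking the script; `Parameter('name')` is exposed as a `--name` flag (assuming the file is saved as flow.py):

```bash
python flow.py run --name hello
```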
Inside a FlowSpec, fan out over a list with `foreach` and gather results in a join step:

```python
@step
def start(self):
    self.items = [1, 2, 3]
    self.next(self.process, foreach='items')

@step
def process(self):
    self.result = self.input * 2  # self.input is this task's element
    self.next(self.join)

@step
def join(self, inputs):
    self.results = [i.result for i in inputs]
    self.next(self.end)
```
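Foreach tasks run in parallel; locally, the run command's `--max-workers` option caps how many execute at once:

```bash
python flow.py run --max-workers 4
```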
Run a step on AWS Batch with dedicated resources:

```python
@batch(cpu=4, memory=8000, queue='gpu')
@resources(gpu=1)
@step
def train(self):
    # Runs on AWS Batch with 4 CPUs, 8000 MB of memory, and 1 GPU
    pass
```
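The `@kubernetes` decorator (imported above) targets Kubernetes clusters in the same way; a minimal sketch with assumed resource values:

```python
@kubernetes(cpu=2, memory=4096)
@step
def train(self):
    pass
```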
Retry transient failures and capture errors that persist:

```python
@retry(times=3)
@catch(var='error')
@step
def risky_step(self):
    # Retried up to 3 times; if it still fails, the exception lands in self.error
    pass
```
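`@timeout` (also imported above) bounds a step's runtime; a minimal sketch:

```python
@timeout(minutes=30)
@step
def slow_step(self):
    pass
```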
Load data from S3 with the built-in client:

```python
@step
def load(self):
    with S3() as s3:
        # .blob returns the object's contents as bytes
        self.data = s3.get('s3://bucket/file.pkl').blob
    self.next(self.end)
```
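The `@card` decorator and the metaflow.cards components render per-task visual reports. A minimal sketch (CardFlow is an illustrative name):

```python
from metaflow import FlowSpec, card, current, step
from metaflow.cards import Markdown

class CardFlow(FlowSpec):
    @card
    @step
    def start(self):
        # Append a component to this task's card
        current.card.append(Markdown('# Training report'))
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    CardFlow()
```

After a run completes, view the card with `python cardflow.py card view start`.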
Inspect past runs with the Client API:

```python
from metaflow import Flow

flow = Flow('MyFlow')
run = flow.latest_run
data = run.data.result  # the 'result' artifact of the latest run
```
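The Runner API executes a flow programmatically instead of through the CLI. A minimal sketch, assuming the quickstart is saved as flow.py:

```python
from metaflow import Runner

# Blocks until the run finishes, then exposes the Run object
with Runner('flow.py').run(name='hello') as running:
    print(running.run.data.result)
```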
Common CLI commands:

```bash
# Run locally
python flow.py run

# Run with a parameter value
python flow.py run --name value

# Run every step on AWS Batch
python flow.py run --with batch

# Show the flow's steps
python flow.py show

# Resume a failed run from the failed step
python flow.py resume
```
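For parallelism within a single step, `parallel_map` (imported above) maps a function over an iterable using local worker processes. A sketch:

```python
from metaflow import parallel_map

# Squares computed across forked local processes
squares = parallel_map(lambda x: x * x, range(10))
print(squares)
```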