class Flow:
    """
    Access a flow and its runs.

    Args:
        name (str): Flow name

    Attributes:
        latest_run: Most recent run
        latest_successful_run: Most recent successful run
        runs: Iterator over all runs

    Example:
        from metaflow import Flow

        flow = Flow('MyFlow')
        run = flow.latest_run
        print(f"Run ID: {run.id}")

        # Iterate over runs
        for run in flow.runs():
            print(run.id)
    """

    def __getitem__(self, run_id: str) -> 'Run':
        """Get run by ID: flow['123']"""
"""
Represents a flow run.
Args:
pathspec (str): 'FlowName/RunID'
Attributes:
id (str): Run ID
created_at (datetime): Creation time
finished_at (datetime): Completion time
successful (bool): Whether run succeeded
finished (bool): Whether run completed
data: Access artifacts from final step
tags (set): Run tags
Example:
from metaflow import Flow, Run
# Via Flow
flow = Flow('MyFlow')
run = flow.latest_run
# Direct access
run = Run('MyFlow/123')
# Access data
result = run.data.result
model = run.data.model
# Check status
if run.successful:
print("Run succeeded")
# Access steps
for step in run:
print(step.name)
"""

    def __getitem__(self, step_name: str) -> 'Step':
        """Get step: run['train']"""

    def add_tag(self, tag: str):
        """Add tag to run"""

    def add_tags(self, tags: list):
        """Add multiple tags"""

    def remove_tag(self, tag: str):
        """Remove tag"""

    def replace_tag(self, old: str, new: str):
        """Replace tag"""
"""
Represents a workflow step.
Attributes:
name (str): Step name
finished_at (datetime): Completion time
task: Single task (for non-foreach steps)
tasks: Iterator over all tasks
Example:
run = flow.latest_run
step = run['train']
# Single task
task = step.task
model = task.data.model
# Multiple tasks (foreach)
for task in step.tasks():
print(task.data.result)
"""

    def __getitem__(self, task_id: str) -> 'Task':
        """Get task: step['456']"""
"""
Represents a step task (single execution).
Attributes:
id (str): Task ID
pathspec (str): Full path
finished (bool): Whether completed
successful (bool): Whether succeeded
finished_at (datetime): Completion time
metadata_dict (dict): Task metadata
exception (Exception): Exception if failed
data: Access artifacts
artifacts: Iterator over artifacts
parent (Task): Parent task
Example:
task = step.task
# Access artifacts
result = task.data.result
model = task.data.model
# List all artifacts
for artifact in task.artifacts():
print(f"{artifact.name}: {artifact.data}")
# Check success
if not task.successful:
print(f"Failed: {task.exception}")
"""

    def __getitem__(self, artifact_name: str):
        """Get artifact: task['result']"""
"""
Represents an artifact object.
Attributes:
name (str): Artifact name
data: Artifact value
sha (str): Content hash
Example:
artifact = task['model']
print(f"Name: {artifact.name}")
model = artifact.data # Access value
"""from metaflow import Flow

from metaflow import Flow

# Get latest data
flow = Flow('MyFlow')
result = flow.latest_run.data.result

# Get specific run
run = Flow('MyFlow')['123']
data = run.data

# Access step/task data
run = flow.latest_run
step = run['train']
task = step.task
model = task.data.model

# Foreach step data
step = run['process']
results = [task.data.result for task in step.tasks()]

# Filter runs by tag
for run in flow.runs('production'):
    print(run.id)

# Find successful runs
for run in flow:
    if run.successful:
        print(run.data.metrics)
"""
Execute flows programmatically.
Args:
flow (str): Path to flow file
Example:
from metaflow import Runner
# Execute flow
with Runner('flow.py').run(alpha=0.01) as running:
run = running.wait()
print(f"Run ID: {run.id}")
print(f"Result: {run.data.result}")
"""

    def run(self, **kwargs) -> 'ExecutingRun':
        """
        Start flow execution.

        Args:
            **kwargs: Flow parameters

        Returns:
            ExecutingRun: Running flow handle

        Example:
            with Runner('flow.py').run(param1='value', param2=42) as r:
                run = r.wait()
        """

    def resume(self, run_id: str, **kwargs) -> 'ExecutingRun':
        """
        Resume a failed run.

        Args:
            run_id (str): Run ID to resume
            **kwargs: Override parameters

        Example:
            with Runner('flow.py').resume('123') as r:
                run = r.wait()
        """

    def deploy(self, **kwargs):
        """
        Deploy flow to production.

        Example:
            Runner('flow.py').deploy(name='my-flow')
        """


class ExecutingRun:
"""
Handle to running flow.
Attributes:
run_id (str): Run ID
status (str): Current status
Example:
with Runner('flow.py').run() as running:
print(f"Run ID: {running.run_id}")
print(f"Status: {running.status}")
# Wait for completion
run = running.wait()
if run.successful:
print(f"Result: {run.data.result}")
"""

    def wait(self, timeout: int = None, stream: str = None) -> 'Run':
        """
        Wait for completion.

        Args:
            timeout (int): Timeout in seconds
            stream (str): Stream logs ('stdout', 'stderr', None)

        Returns:
            Run: Completed run object

        Example:
            # Wait indefinitely
            run = running.wait()

            # Wait with timeout
            try:
                run = running.wait(timeout=3600)  # 1 hour
            except TimeoutError:
                print("Flow still running")

            # Stream logs
            run = running.wait(stream='stdout')
        """

    def kill(self):
        """Terminate execution"""

from metaflow import Flow
import pandas as pd

# Compare runs
flow = Flow('ExperimentFlow')
results = []
for run in flow.runs():
    if run.successful:
        results.append({
            'run_id': run.id,
            'accuracy': run.data.accuracy,
            'learning_rate': run.data.learning_rate,
            'timestamp': run.finished_at,
        })

df = pd.DataFrame(results)
best_run = df.loc[df['accuracy'].idxmax()]
print(f"Best run: {best_run['run_id']} with accuracy {best_run['accuracy']}")

# Load best model
run = flow[best_run['run_id']]
model = run.data.model

from metaflow import Flow

flow = Flow('DataPipeline')
run = flow.latest_run

# Inspect all artifacts
for step in run:
    print(f"\nStep: {step.name}")
    # Handle foreach steps
    if step.name == 'process':
        for task in step.tasks():
            for artifact in task.artifacts():
                print(f"  Task {task.id}: {artifact.name} = {artifact.data}")
    else:
        task = step.task
        for artifact in task.artifacts():
            print(f"  {artifact.name} = {artifact.data}")

from metaflow import Flow

flow = Flow('ProcessingFlow')
failed_runs = []
for run in flow.runs():
    if run.finished and not run.successful:
        failed_runs.append(run)

for run in failed_runs:
    print(f"\nRun {run.id} failed:")
    for step in run:
        for task in step.tasks():
            if not task.successful and task.exception:
                print(f"  Step {step.name}: {task.exception}")

from metaflow import Runner, Flow
import time

# Execute multiple experiments
experiments = [
    {'learning_rate': 0.001, 'batch_size': 32},
    {'learning_rate': 0.01, 'batch_size': 64},
    {'learning_rate': 0.1, 'batch_size': 128},
]

run_ids = []
for exp in experiments:
    with Runner('train.py').run(**exp) as running:
        print(f"Started run {running.run_id} with {exp}")
        run_ids.append(running.run_id)
    time.sleep(1)  # Stagger starts

# Poll until every launched run has finished, rather than sleeping for a
# fixed interval and hoping the runs are done
flow = Flow('TrainingFlow')
while not all(flow[run_id].finished for run_id in run_ids):
    time.sleep(60)

# Collect results
for run_id in run_ids:
    run = flow[run_id]
    if run.successful:
        print(f"Run {run.id}: acc={run.data.accuracy:.2%}, lr={run.data.learning_rate}")

from metaflow import Runner, Flow

flow = Flow('LongPipeline')
latest = flow.latest_run

if not latest.successful and not latest.finished:
    # Resume interrupted run
    print(f"Resuming run {latest.id}")
    with Runner('flow.py').resume(latest.id) as running:
        run = running.wait(stream='stdout')
elif not latest.successful:
    # Restart failed run from scratch
    print("Starting new run")
    with Runner('flow.py').run() as running:
        run = running.wait(stream='stdout')
else:
    print(f"Latest run {latest.id} succeeded")
    run = latest

print(f"Result: {run.data.result}")

from metaflow import Runner
import unittest


class TestMyFlow(unittest.TestCase):
    def test_flow_success(self):
        with Runner('myflow.py').run(test_mode=True) as running:
            run = running.wait(timeout=300)
            self.assertTrue(run.successful)
            self.assertIsNotNone(run.data.result)
            self.assertGreater(run.data.result, 0)

    def test_flow_with_invalid_data(self):
        with Runner('myflow.py').run(data_path='invalid.csv') as running:
            run = running.wait(timeout=300)
            # Should handle the error gracefully
            self.assertTrue(run.successful)
            self.assertEqual(run.data.processed_count, 0)


if __name__ == '__main__':
    unittest.main()

from metaflow import Runner, Flow


def deploy_if_improved(flow_name, metric_threshold):
    """Deploy the new model if it beats production and the threshold"""
    # Train a new model
    with Runner('train.py').run() as running:
        run = running.wait(stream='stdout')

    if not run.successful:
        print("Training failed")
        return False

    new_metric = run.data.accuracy
    print(f"New model accuracy: {new_metric:.2%}")

    # Compare with production
    flow = Flow(flow_name)
    prod_runs = list(flow.runs('production'))
    if prod_runs:
        current_metric = prod_runs[0].data.accuracy
        print(f"Current production accuracy: {current_metric:.2%}")
        if new_metric > current_metric and new_metric > metric_threshold:
            print("Deploying improved model")
            prod_runs[0].remove_tag('production')  # retire the old tag so one run carries it
            run.add_tag('production')
            Runner('serve.py').deploy(model_run_id=run.id)
            return True
    else:
        if new_metric > metric_threshold:
            print("Deploying first model")
            run.add_tag('production')
            Runner('serve.py').deploy(model_run_id=run.id)
            return True

    print("Model did not meet deployment criteria")
    return False


# Run deployment check
deploy_if_improved('ModelTraining', metric_threshold=0.90)