# --- Basic S3 usage (context manager handles temp-file cleanup) ---
from metaflow import S3

with S3() as s3:
    # Download
    data = s3.get('s3://bucket/file.pkl')
    # Upload
    s3.put('s3://bucket/output.pkl', my_data)
    # List files
    files = s3.list_paths(['s3://bucket/prefix/'])
    # Metadata
    info = s3.info('s3://bucket/file.pkl')


# --- S3 client API reference (method stubs for documentation only) ---
class S3:
    """S3 client (use as context manager)"""

    def get(self, key: str, return_missing: bool = False,
            return_info: bool = True):
        """
        Download object.
        Returns:
            S3Object with .blob (bytes), .text (str), .path (str) attributes
        """

    def get_many(self, keys: list, return_missing: bool = False,
                 return_info: bool = True) -> list:
        """Download multiple objects in parallel"""

    def put(self, key: str, obj, overwrite: bool = True):
        """Upload object"""

    def put_many(self, key_obj_pairs: list, overwrite: bool = True):
        """Upload multiple objects in parallel"""

    def list_paths(self, paths: list) -> list:
        """List objects under prefixes"""

    def info(self, key: str) -> dict:
        """
        Get object metadata.
        Returns dict with: content_length, last_modified, content_type, etag
        """

    def get_recursive(self, path: str) -> list:
        """Download all objects under prefix"""

    def put_files(self, key_path_pairs: list):
        """Upload local files to S3"""


# --- Flow example: fan-out over an S3 listing ---
from metaflow import FlowSpec, step, S3
class S3Flow(FlowSpec):
    """List files under a prefix, load each in a parallel foreach, then combine."""

    @step
    def start(self):
        with S3() as s3:
            # List data files
            self.files = s3.list_paths(['s3://bucket/data/'])
        self.next(self.load, foreach='files')

    @step
    def load(self):
        # self.input is one path from the foreach split
        with S3() as s3:
            self.data = s3.get(self.input)
        self.next(self.join)

    @step
    def join(self, inputs):
        # Combine and save
        all_data = [i.data for i in inputs]
        with S3() as s3:
            s3.put('s3://bucket/combined.pkl', all_data)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == '__main__':
    S3Flow()


# --- Batch operations (parallel transfers + metadata pre-check) ---
with S3() as s3:
    # Parallel downloads
    files = ['s3://bucket/file1.csv', 's3://bucket/file2.csv', 's3://bucket/file3.csv']
    data_list = s3.get_many(files)
    # Parallel uploads
    uploads = [
        ('s3://bucket/out1.pkl', result1),
        ('s3://bucket/out2.pkl', result2),
        ('s3://bucket/out3.pkl', result3)
    ]
    s3.put_many(uploads)
    # Check metadata before download
    info = s3.info('s3://bucket/large-file.bin')
    if info['content_length'] < 1e9:  # < 1GB
        data = s3.get('s3://bucket/large-file.bin')


# --- Local datatool: same API against the local filesystem ---
from metaflow.datatools import Local
with Local() as local:
    data = local.get('/path/to/file.txt')
    local.put('/path/to/output.txt', my_data)
    files = local.list_paths(['/path/to/dir/'])


# --- @card decorator reference ---
# NOTE: pseudo-signature for documentation; annotations in a call are not
# valid Python syntax, so this reference is kept as comments.
#
# @card(type: str = 'default', id: str = None, options: dict = None,
#       timeout: int = None, refresh_interval: int = None)
#
# Attach visualization card to step.
# Args:
#     type (str): 'default', 'blank', or custom type
#     id (str): Card ID for multiple cards per step
#     refresh_interval (int): Refresh interval (seconds) for real-time updates
# Example:
#     @card(type='blank')
#     @step
#     def train(self):
#         from metaflow import current
#         from metaflow.cards import Markdown, Table
#         current.card.append(Markdown("# Training Results"))
#         current.card.append(Table([['Metric', 'Value'], ['Acc', '0.95']]))
#         self.next(self.end)


# --- Card components ---
from metaflow.cards import Markdown, Table, Image, VegaChart, Artifact, Error
# --- Card component constructors (pseudo-signatures, documentation only) ---
# Text:       Markdown(text: str)
# Table:      Table(data: list, headers: list = None)
#             Table.from_dataframe(df)
# Image:      Image(src, label: str = None)   # src = path, bytes, URL, or PIL Image
# Chart:      VegaChart(spec: dict)           # Vega/Vega-Lite spec
# Artifact:   Artifact(artifact, name: str = None, compressed: bool = False)
# Error:      Error(error: str)


# --- Full visualization flow example ---
from metaflow import FlowSpec, step, card, current
from metaflow.cards import Markdown, Table, Image, VegaChart
class VisualizationFlow(FlowSpec):
@card(type='blank')
@step
def analyze(self):
# Title
current.card.append(Markdown("# Analysis Results"))
# Metrics table
metrics = [
['Metric', 'Value'],
['Accuracy', '95.3%'],
['F1 Score', '0.94']
]
current.card.append(Table(metrics))
# Plot
spec = {
"$schema": "https://vega.github.io/schema/vega-lite/v5.json",
"data": {"values": [{"x": 1, "y": 2}, {"x": 2, "y": 4}]},
"mark": "line",
"encoding": {
"x": {"field": "x", "type": "quantitative"},
"y": {"field": "y", "type": "quantitative"}
}
}
current.card.append(VegaChart(spec))
# Image
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot([1, 2, 3], [1, 4, 9])
fig.savefig('plot.png')
plt.close()
current.card.append(Image('plot.png', label='Training Curve'))
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
VisualizationFlow()@card(type='blank', id='metrics')
@card(type='blank', id='plots')
@step
def analyze(self):
from metaflow import current
from metaflow.cards import Markdown
# Add to different cards
current.card['metrics'].append(Markdown("## Metrics"))
current.card['plots'].append(Markdown("## Plots"))@card(type='blank', refresh_interval=5)
@step
def train(self):
    from metaflow import current
    from metaflow.cards import Markdown
    import time
    for epoch in range(10):
        loss = 1.0 / (epoch + 1)
        # Card updates every 5 seconds (refresh_interval on the decorator)
        current.card.append(Markdown(f"Epoch {epoch}: loss={loss:.4f}"))
        time.sleep(1)
    self.next(self.end)


# --- Custom card type ---
from metaflow.cards import MetaflowCard
class CustomCard(MetaflowCard):
    """Custom card renderer"""

    def render(self, task):
        """
        Render card HTML.
        Args:
            task: Task object with .data attribute for artifacts
        Returns:
            str: HTML content
        """
        metrics = task.data.metrics
        html = f"<h1>Results</h1><p>Accuracy: {metrics['acc']}</p>"
        return html


# Use custom card
@card(type='custom')
@step
def train(self):
    self.metrics = {'acc': 0.95}
    self.next(self.end)


# --- Inspecting cards via the Client API ---
from metaflow import Flow
from metaflow.cards import get_cards

flow = Flow('MyFlow')
run = flow.latest_run
task = run['analyze'].task
cards = get_cards(task)
for card in cards:
    print(f"Card {card.id}: {card.type}")
    # Export each card's rendered HTML to a local file
    with open(f'{card.id}.html', 'w') as f:
        f.write(card.html)


# --- End-to-end pipeline example ---
from metaflow import FlowSpec, step, batch, resources, card, current, S3
from metaflow.cards import Markdown, Table, VegaChart
from datetime import datetime


class DataPipeline(FlowSpec):
    """Process files modified in the last 24h and publish a report card + summary."""

    @step
    def start(self):
        with S3() as s3:
            # Find recent files
            all_files = s3.list_paths(['s3://data/raw/'])
            self.files = []
            cutoff = datetime.now().timestamp() - 86400  # Last 24h
            for path in all_files:
                info = s3.info(path)
                if info['last_modified'].timestamp() > cutoff:
                    self.files.append(path)
        print(f"Processing {len(self.files)} files")
        self.next(self.process, foreach='files')

    @batch
    @resources(cpu=4, memory=16000)
    @step
    def process(self):
        with S3() as s3:
            # Load and process
            data = s3.get(self.input)
            self.result = self.transform(data)
            # Save result alongside the raw file, under /processed/
            output_path = self.input.replace('/raw/', '/processed/')
            s3.put(output_path, self.result)
            self.processed_path = output_path
        self.record_count = len(self.result)
        self.next(self.join)

    @card(type='blank')
    @step
    def join(self, inputs):
        # Summary stats
        total_records = sum(i.record_count for i in inputs)
        processed_files = [i.processed_path for i in inputs]
        # Create report card
        current.card.append(Markdown("# Processing Report"))
        stats_table = [
            ['Metric', 'Value'],
            ['Files Processed', str(len(inputs))],
            ['Total Records', f'{total_records:,}'],
            ['Timestamp', datetime.now().isoformat()]
        ]
        current.card.append(Table(stats_table))
        # File sizes chart
        with S3() as s3:
            sizes = []
            for path in processed_files[:10]:  # Sample
                info = s3.info(path)
                sizes.append({
                    'file': path.split('/')[-1],
                    'size_mb': info['content_length'] / 1024 / 1024
                })
        chart_spec = {
            "$schema": "https://vega.github.io/schema/vega-lite/v5.json",
            "data": {"values": sizes},
            "mark": "bar",
            "encoding": {
                "x": {"field": "file", "type": "nominal"},
                "y": {"field": "size_mb", "type": "quantitative"}
            }
        }
        current.card.append(Markdown("## File Sizes"))
        current.card.append(VegaChart(chart_spec))
        # Save summary
        with S3() as s3:
            summary = {
                'timestamp': datetime.now().isoformat(),
                'files': len(inputs),
                'records': total_records,
                'paths': processed_files
            }
            s3.put('s3://data/summary/latest.json', summary)
        self.next(self.end)

    @step
    def end(self):
        print("Pipeline complete")

    def transform(self, data):
        # Transformation logic
        return data


if __name__ == '__main__':
    DataPipeline()


# --- Card best practices ---
# Use blank cards for custom content
@card(type='blank')
@step
def step1(self):
    pass


# Use default for automatic artifact display
@card  # or @card(type='default')
@step
def step2(self):
    pass


# Multiple cards for organization
@card(type='blank', id='training')
@card(type='blank', id='validation')
@step
def train(self):
    current.card['training'].append(Markdown("## Training"))
    current.card['validation'].append(Markdown("## Validation"))


# Real-time monitoring
@card(type='blank', refresh_interval=10)
@step
def long_running(self):
    # Updates visible every 10 seconds
    for i in range(100):
        current.card.append(Markdown(f"Progress: {i}%"))
        time.sleep(5)