from typing import Any, Callable, Iterable, List, Optional

from metaflow import parallel_map


def parallel_map(
    func: Callable[[Any], Any],
    iterable: Iterable[Any],
    max_parallel: Optional[int] = None,
    dir: Optional[str] = None,  # noqa: A002 - name kept for API compatibility
) -> List[Any]:
    """Execute ``func`` across inputs in parallel with ordered results.

    This documents the API and forwards to ``metaflow.parallel_map``. A
    body-less stub here would shadow the imported implementation and return
    ``None``.

    Args:
        func: Function to apply to each item (must be picklable).
        iterable: Input items.
        max_parallel: Maximum number of workers (default: CPU count).
        dir: Temporary directory used for inter-process communication.

    Returns:
        Results in the same order as the inputs.

    Example:
        def process(x):
            return x ** 2

        results = parallel_map(process, range(100), max_parallel=8)
    """
    # Private alias: the module-level name `parallel_map` now refers to this
    # wrapper, so calling it directly would recurse forever.
    from metaflow import parallel_map as _metaflow_parallel_map

    return _metaflow_parallel_map(func, iterable, max_parallel=max_parallel, dir=dir)


from metaflow import parallel_imap_unordered
from typing import Any, Callable, Iterable, Iterator, Optional


def parallel_imap_unordered(
    func: Callable[[Any], Any],
    iterable: Iterable[Any],
    max_parallel: Optional[int] = None,
    dir: Optional[str] = None,  # noqa: A002 - name kept for API compatibility
) -> Iterator[Any]:
    """Execute ``func`` in parallel, yielding results as they complete.

    This documents the API and forwards to
    ``metaflow.parallel_imap_unordered``. A body-less stub here would shadow
    the imported implementation and return ``None`` instead of an iterator.

    Args:
        func: Function to apply to each item (must be picklable).
        iterable: Input items.
        max_parallel: Maximum number of workers (default: CPU count).
        dir: Temporary directory used for inter-process communication.

    Yields:
        Results as they complete. Input order is not preserved, which makes
        this more memory efficient.

    Example:
        def process(x):
            return x ** 2

        for result in parallel_imap_unordered(process, range(100), max_parallel=8):
            print(result)
    """
    # Private alias: the module-level name now refers to this wrapper.
    from metaflow import parallel_imap_unordered as _metaflow_imap_unordered

    yield from _metaflow_imap_unordered(
        func, iterable, max_parallel=max_parallel, dir=dir
    )


from metaflow import FlowSpec, step, parallel_map, S3
def process_file(path):
    """Return the size in bytes of the S3 object at ``path``.

    Runs in a separate worker process, so it opens its own S3 client.
    """
    with S3() as s3:
        obj = s3.get(path)
        # `s3.get` returns an S3Object rather than raw bytes; its `size`
        # attribute is the object's size in bytes.
        return obj.size


class ParallelFlow(FlowSpec):
    """Example flow: fan out per-file work inside a single step."""

    @step
    def start(self):
        with S3() as s3:
            # Keep plain URL strings of real objects. They are cheap to store
            # as an artifact, safe to hand to worker processes, and accepted
            # by `s3.get`. Prefix ("directory") entries from `list_paths`
            # have exists == False and cannot be fetched.
            self.files = [
                obj.url
                for obj in s3.list_paths(['s3://bucket/data/'])
                if obj.exists
            ]
        # Process all files in parallel within this step
        self.sizes = parallel_map(process_file, self.files, max_parallel=16)
        print(f"Total size: {sum(self.sizes)}")
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == '__main__':
    ParallelFlow()


from metaflow import parallel_imap_unordered
def compute_features(record):
    """Return ``{'id': ..., 'features': ...}`` for one record (placeholder features)."""
    return {'id': record['id'], 'features': [1, 2, 3]}


# Placeholder input so the example runs as-is; replace with real records.
# Each record only needs an 'id' key for compute_features.
records = [{'id': i} for i in range(1000)]

# Streaming results: collected in completion order, not input order.
processed = []
for result in parallel_imap_unordered(compute_features, records, max_parallel=8):
    processed.append(result)
    if len(processed) % 100 == 0:
        print(f"Processed {len(processed)} records")


from metaflow import profile
from typing import Optional


def profile(label: str, stats_dict: Optional[dict] = None):
    """Context manager for profiling code execution.

    This documents the API and forwards to ``metaflow.profile``. A body-less
    stub here would shadow the imported implementation and return ``None``,
    making ``with profile(...)`` fail.

    Args:
        label: Profile identifier.
        stats_dict: Optional dict in which to store stats.

    Example:
        stats = {}
        with profile('data_loading', stats):
            data = load_data()
        with profile('training', stats):
            model = train(data)
        for op, timing in stats.items():
            print(f"{op}: {timing['duration']:.2f}s")
    """
    # NOTE(review): the example assumes each stats entry is a dict with a
    # 'duration' key in seconds; confirm against metaflow's implementation.
    # Private alias: the module-level name `profile` now refers to this wrapper.
    from metaflow import profile as _metaflow_profile

    return _metaflow_profile(label, stats_dict)


from metaflow import FlowSpec, step, profile
class ProfiledFlow(FlowSpec):
    """Example flow that times each stage of `start` and reports the slowest."""

    @step
    def start(self):
        timings = {}
        with profile('load', timings):
            self.data = self.load_data()
        with profile('preprocess', timings):
            self.processed = self.preprocess(self.data)
        with profile('train', timings):
            self.model = self.train(self.processed)

        # Report how long each profiled stage took.
        # NOTE(review): assumes each entry is a dict with a 'duration' key in
        # seconds; confirm against the profile implementation in use.
        for stage, stage_stats in timings.items():
            print(f"{stage}: {stage_stats['duration']:.2f}s")

        # Stored on self so the `end` step can read it.
        self.profiling_stats = timings
        self.next(self.end)

    @step
    def end(self):
        # Identify the stage with the largest recorded duration.
        name, stage_stats = max(
            self.profiling_stats.items(),
            key=lambda pair: pair[1]['duration'],
        )
        print(f"Slowest: {name} ({stage_stats['duration']:.2f}s)")

    # Placeholder stage implementations.
    def load_data(self):
        return []

    def preprocess(self, data):
        return data

    def train(self, data):
        return "model"


if __name__ == '__main__':
    ProfiledFlow()


from metaflow import UserStepDecorator
class UserStepDecorator:
    """Base class for custom step decorators.

    Subclasses override one or both hooks:

    * ``pre_mutate(mutable_step)``: modify the step before execution.
    * ``mutate(mutable_step)``: modify the step during execution; return
      ``USER_SKIP_STEP`` to skip the step.

    Example::

        class TimingDecorator(UserStepDecorator):
            def pre_mutate(self, mutable_step):
                mutable_step.add_init_code('import time; _start = time.time()')
                mutable_step.add_cleanup_code(
                    'print(f"Duration: {time.time() - _start:.2f}s")'
                )

        class MyFlow(FlowSpec):
            @TimingDecorator
            @step
            def start(self):
                self.next(self.end)
    """

    def pre_mutate(self, mutable_step):
        """Hook run before execution; does nothing by default."""
        return None

    def mutate(self, mutable_step):
        """Hook run during execution; does nothing by default."""
        return None


from metaflow import FlowMutator
class FlowMutator:
    """Base class for custom flow decorators.

    Override ``pre_mutate(mutable_flow)`` to modify the flow before execution.

    Example::

        class LoggingFlowDecorator(FlowMutator):
            def pre_mutate(self, mutable_flow):
                for step in mutable_flow.steps:
                    step.add_init_code('print(f"Starting {self.name}")')

        @LoggingFlowDecorator
        class MyFlow(FlowSpec):
            pass
    """

    def pre_mutate(self, mutable_flow):
        """Modify flow before execution (does nothing by default)."""
        pass


from metaflow import USER_SKIP_STEP

# USER_SKIP_STEP: return it from a decorator's mutate() to skip step execution.
#
# Example:
#
#     class ConditionalDecorator(UserStepDecorator):
#         def __init__(self, condition_attr):
#             self.condition_attr = condition_attr
#
#         def mutate(self, mutable_step):
#             flow = mutable_step.flow
#             if hasattr(flow, self.condition_attr) and getattr(flow, self.condition_attr):
#                 return USER_SKIP_STEP
#
#     class MyFlow(FlowSpec):
#         skip_training = Parameter('skip', type=bool, default=False)
#
#         @ConditionalDecorator('skip_training')
#         @step
#         def train(self):
#             # Skipped if skip_training=True
#             pass

from metaflow import UserStepDecorator, FlowSpec, step
class ValidateDecorator(UserStepDecorator):
    """Validate that the flow exposes every required attribute."""

    def __init__(self, *required_attrs):
        # Names of attributes that must be present on the flow.
        self.required_attrs = required_attrs

    def mutate(self, mutable_step):
        """Raise ValueError listing any required attribute the flow lacks."""
        target = mutable_step.flow
        absent = []
        for name in self.required_attrs:
            if not hasattr(target, name):
                absent.append(name)
        if absent:
            raise ValueError(f"Missing required attributes: {absent}")


class MyFlow(FlowSpec):
    @step
    def start(self):
        self.data = [1, 2, 3]
        self.config = {'param': 'value'}
        self.next(self.process)

    @ValidateDecorator('data', 'config')
    @step
    def process(self):
        # ValidateDecorator has checked that `data` and `config` exist.
        print(f"Processing {len(self.data)} items")
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == '__main__':
    MyFlow()


from metaflow import UserStepDecorator
class NotifyDecorator(UserStepDecorator):
    """Print a completion notification for the decorated step."""

    def __init__(self, channel='slack'):
        # Channel name reported in the notification message.
        self.channel = channel

    def pre_mutate(self, mutable_step):
        """Attach cleanup code that reports the step's completion."""
        import textwrap

        # The channel is passed into the generated code as a Python literal
        # (repr of its str form) instead of being spliced into the code's
        # source text. Quotes, braces, or newlines in the channel name
        # therefore cannot break, or inject into, the generated code.
        # dedent() keeps the try/except body validly indented.
        notify_code = textwrap.dedent(f'''
            try:
                _notify_channel = {str(self.channel)!r}
                print(f"Step {{self.name}} completed - notify {{_notify_channel}}")
            except Exception as e:
                print(f"Notification failed: {{e}}")
            ''')
        mutable_step.add_cleanup_code(notify_code)


from metaflow import FlowMutator
class TimingFlowDecorator(FlowMutator):
    """Add timing to all steps of the decorated flow."""

    def pre_mutate(self, mutable_flow):
        # The same start/report snippets are attached to every step.
        start_timer = 'import time; _start = time.time()'
        report_timer = 'print(f"{self.name}: {time.time()-_start:.2f}s")'
        for mutable_step in mutable_flow.steps:
            mutable_step.add_init_code(start_timer)
            mutable_step.add_cleanup_code(report_timer)


@TimingFlowDecorator
class MyFlow(FlowSpec):
    # All steps automatically get timing
    pass


from metaflow.exception import (
    MetaflowException,  # Base exception
    MetaflowNotFound,  # Object not found
    MetaflowNamespaceMismatch,  # Namespace mismatch
    MetaflowInvalidPathspec,  # Invalid pathspec
    MetaflowInternalError,  # Internal error
    UnhandledInMergeArtifactsException,  # Merge conflict
    MissingInMergeArtifactsException,  # Missing required artifacts
)
# `Flow` must be imported before it is used below.
from metaflow import Flow

# Catch specific exceptions: the narrower MetaflowNotFound is listed before
# its base class MetaflowException, otherwise the generic clause would win.
try:
    flow = Flow('MyFlow')
    run = flow.latest_run
except MetaflowNotFound:
    print("Flow or run not found")
except MetaflowException as e:
    print(f"Metaflow error: {e}")


from metaflow import Flow
from metaflow.exception import MetaflowNotFound, MetaflowException


def get_latest_model(flow_name):
    """Safely get the ``model`` artifact from the latest successful run.

    Args:
        flow_name: Name of the flow to look up.

    Returns:
        The ``model`` artifact, or ``None`` in any of these cases (a message
        is printed for each): the flow does not exist, it has no successful
        run, the run has no ``model`` artifact, or another Metaflow error
        occurs.
    """
    try:
        run = Flow(flow_name).latest_successful_run
        # The property can be None when no successful run exists. Guard here
        # so we don't fail with an uncaught AttributeError on `run.data`.
        if run is None:
            print(f"No successful run found for flow '{flow_name}'")
            return None
        if hasattr(run.data, 'model'):
            return run.data.model
        print("No model artifact found")
        return None
    except MetaflowNotFound:
        print(f"Flow '{flow_name}' not found")
        return None
    except MetaflowException as e:
        print(f"Metaflow error: {e}")
        return None


model = get_latest_model('TrainingFlow')


from metaflow import FlowSpec, step
from metaflow.exception import UnhandledInMergeArtifactsException


class FlexibleMerge(FlowSpec):
    """Join foreach branches whose artifacts vary from branch to branch."""

    @step
    def start(self):
        self.items = [1, 2, 3, 4]
        self.next(self.process, foreach='items')

    @step
    def process(self):
        value = self.input
        self.item_id = value
        # Only even inputs produce a `result` artifact.
        if value % 2 == 0:
            self.result = value * 2
        self.next(self.join)

    @step
    def join(self, inputs):
        try:
            self.merge_artifacts(inputs, exclude=['result'])
        except UnhandledInMergeArtifactsException as e:
            # `item_id` holds a different value in every branch and is not
            # excluded, so this conflict is anticipated.
            print(f"Expected merge conflict: {e}")
        # Collect the optional `result` artifact by hand, keyed by item id.
        collected = {}
        for branch in inputs:
            if hasattr(branch, 'result'):
                collected[branch.item_id] = branch.result
        self.results = collected
        self.next(self.end)

    @step
    def end(self):
        print(f"Results: {self.results}")


if __name__ == '__main__':
    FlexibleMerge()


from metaflow import FlowSpec, step, batch, resources
from metaflow import parallel_map, profile, UserStepDecorator, Parameter
from metaflow import S3
from metaflow.exception import MetaflowException


class MonitorDecorator(UserStepDecorator):
    """Report the change in the step process's resident memory (needs psutil)."""

    def pre_mutate(self, mutable_step):
        # Record RSS in MB when the step starts...
        mutable_step.add_init_code('''
import psutil
import os
_proc = psutil.Process(os.getpid())
_start_mem = _proc.memory_info().rss / 1024 / 1024
''')
        # ...and print the delta when it finishes.
        mutable_step.add_cleanup_code('''
_end_mem = _proc.memory_info().rss / 1024 / 1024
print(f"Memory delta: {_end_mem - _start_mem:.1f} MB")
''')


def process_chunk(chunk):
    """Process one data chunk in parallel: sum its numbers (object sizes here)."""
    return sum(chunk)


class AdvancedPipeline(FlowSpec):
    batch_size = Parameter('batch_size', default=1000, type=int)

    @step
    def start(self):
        stats = {}
        with profile('data_loading', stats):
            with S3() as s3:
                # Store URL strings of real objects. Prefix entries returned
                # by `list_paths` have exists == False and cannot be downloaded.
                self.files = [
                    obj.url
                    for obj in s3.list_paths(['s3://bucket/data/'])
                    if obj.exists
                ]
        # NOTE(review): assumes profile stats entries are dicts with a
        # 'duration' key in seconds; confirm against metaflow.profile.
        print(f"Loaded {len(self.files)} files in {stats['data_loading']['duration']:.2f}s")
        self.next(self.process)

    @batch
    @resources(cpu=16, memory=64000)
    @MonitorDecorator
    @step
    def process(self):
        import pickle

        stats = {}
        # Load data: keep only the sizes (bytes). `get_many` returns
        # S3Objects, which cannot be summed, and their downloaded local files
        # do not outlive the `with S3()` scope.
        with profile('download', stats):
            with S3() as s3:
                sizes = [obj.size for obj in s3.get_many(self.files[:100])]  # Sample
        # Process in parallel
        with profile('processing', stats):
            chunks = [sizes[i:i + self.batch_size]
                      for i in range(0, len(sizes), self.batch_size)]
            self.results = parallel_map(process_chunk, chunks, max_parallel=8)
        # Save results: `S3.put` takes str/bytes, so serialize the list first.
        with profile('upload', stats):
            with S3() as s3:
                s3.put('s3://bucket/results.pkl', pickle.dumps(self.results))
        self.timing = stats
        self.next(self.end)

    @step
    def end(self):
        # Analyze performance
        print("\nPerformance Report:")
        for operation, timing in self.timing.items():
            print(f" {operation}: {timing['duration']:.2f}s")
        total = sum(t['duration'] for t in self.timing.values())
        print(f" Total: {total:.2f}s")
        # Results: one entry per chunk, each the summed size of that chunk.
        print(f"\nProcessed {len(self.results)} chunks")
        print(f"Total: {sum(self.results)}")


if __name__ == '__main__':
    AdvancedPipeline()

# Use for CPU-bound operations within a step
# Illustrative fragments, not runnable as-is: `cpu_intensive_func`, `items`
# and `self` are placeholders that must come from the surrounding code, and
# `self.next(...)` is only valid inside a FlowSpec step.
results = parallel_map(cpu_intensive_func, items, max_parallel=8)

# For I/O-bound operations, use foreach instead
self.next(self.download, foreach='urls')  # Better for I/O

# Functions must be picklable (top-level, not nested)
# NOTE(review): `dir` is documented as an IPC temp directory; confirm whether
# metaflow pickles `func` itself or only the returned results.
def process(x):  # Good
    return x * 2


def outer():
    def process(x):  # Avoid (may not pickle)
        return x * 2
    return parallel_map(process, items)

# Keep decorators focused and composable
# Illustrative fragments: `process` and `mutate` are shown at module level for
# brevity, but in real code they belong inside a FlowSpec subclass and a
# decorator class. `mutable_step` is a placeholder.
class TimingDecorator(UserStepDecorator):
    """Does one thing well"""
    pass


class ValidationDecorator(UserStepDecorator):
    """Separate concern"""
    pass


# Can be combined
@TimingDecorator
@ValidationDecorator
@step
def process(self):
    pass


# Use unique variable names in injected code (prefix with _) so they are less
# likely to collide with names the step itself uses.
mutable_step.add_init_code('_start_time = time.time()')  # Good
mutable_step.add_init_code('start_time = time.time()')  # May conflict


# Handle errors gracefully
def mutate(self, mutable_step):
    try:
        # Decorator logic
        pass
    except Exception as e:
        # Printing alone swallows the error and lets the step continue.
        print(f"Decorator error: {e}")
        # Decide: re-raise, skip, or continue

# Catch specific exceptions
# Illustrative fragments: `flow`, `connect` and `fetch` are placeholders from
# the surrounding code.
try:
    run = flow.latest_run
except MetaflowNotFound:
    print("Not found")
except MetaflowException:
    print("Other Metaflow error")

# Provide context in error messages
# (assumes `run` was successfully obtained above)
try:
    data = run.data.artifact
except MetaflowNotFound as e:
    print(f"Artifact 'artifact' not found in run {run.id}: {e}")

# Clean up resources
connection = None
try:
    connection = connect()
    data = fetch(connection)
except MetaflowException as e:
    print(f"Error: {e}")
finally:
    # Test identity, not truthiness. A connection object that evaluates as
    # falsy (e.g. defines __len__ or __bool__) must still be closed.
    if connection is not None:
        connection.close()