Retry code until it succeeds
Tenacity provides a comprehensive callback system with hooks that execute at different stages of the retry lifecycle. These callbacks enable logging, monitoring, custom actions, and integration with external systems during retry operations.
Callbacks execute in this order during a retry operation, as the sketch below demonstrates:

1. `before`: immediately before each attempt
2. `after`: immediately after a failed attempt
3. `before_sleep`: before waiting, when another attempt is scheduled
4. `retry_error_callback`: once, when all attempts are exhausted
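A minimal sketch that prints each hook as it fires (the `trace` helper is illustrative):

```python
from tenacity import retry, stop_after_attempt, wait_fixed

def trace(stage):
    def callback(retry_state):
        print(f"{stage}: attempt {retry_state.attempt_number}")
    return callback

@retry(
    stop=stop_after_attempt(2),
    wait=wait_fixed(0),
    before=trace("before"),
    after=trace("after"),
    before_sleep=trace("before_sleep"),
    retry_error_callback=trace("retry_error_callback"),
)
def always_fails():
    raise RuntimeError("boom")

always_fails()  # returns None, because retry_error_callback swallows the RetryError
# before: attempt 1
# after: attempt 1
# before_sleep: attempt 1
# before: attempt 2
# after: attempt 2
# retry_error_callback: attempt 2
```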
Before callbacks execute immediately before each attempt, including the initial attempt.
```python
from tenacity import RetryCallState, before_nothing

def before_nothing(retry_state: RetryCallState) -> None:
    """
    Default before callback that performs no action.

    Parameters:
    - retry_state: Complete state of the current retry session
    """
```

```python
import logging
from typing import Callable

from tenacity import RetryCallState, before_log

def before_log(
    logger: logging.Logger,
    log_level: int
) -> Callable[[RetryCallState], None]:
    """
    Create a before callback that logs attempt start.

    Parameters:
    - logger: Logger instance to use for output
    - log_level: Logging level (e.g., logging.INFO, logging.WARNING)

    Returns:
    Callback function that logs before each attempt
    """
```

```python
import logging

from tenacity import retry, stop_after_attempt, before_log
# Basic logging before each attempt
logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    before=before_log(logger, logging.INFO)
)
def logged_operation():
    pass

# Custom before callback
def custom_before_callback(retry_state):
    print(f"Starting attempt {retry_state.attempt_number}")
    if retry_state.attempt_number > 1:
        print(f"Retrying; {retry_state.seconds_since_start:.2f}s elapsed since the first attempt")

@retry(before=custom_before_callback)
def monitored_operation():
    pass

# Metrics collection before callback (`metrics` is an assumed StatsD-style client)
def metrics_before_callback(retry_state):
    metrics.increment('operation.attempts', tags={
        'function': retry_state.fn.__name__,
        'attempt': retry_state.attempt_number
    })

@retry(before=metrics_before_callback)
def instrumented_operation():
    pass
```

After callbacks execute immediately after each attempt that fails or whose result triggers a retry; they are not called after a final successful attempt.
```python
from tenacity import RetryCallState, after_nothing

def after_nothing(retry_state: RetryCallState) -> None:
    """
    Default after callback that performs no action.

    Parameters:
    - retry_state: Complete state of the current retry session
    """
```

```python
import logging
from typing import Callable

from tenacity import RetryCallState, after_log

def after_log(
    logger: logging.Logger,
    log_level: int,
    sec_format: str = "%0.3f"
) -> Callable[[RetryCallState], None]:
    """
    Create an after callback that logs attempt completion.

    Parameters:
    - logger: Logger instance to use for output
    - log_level: Logging level for the log message
    - sec_format: Format string for displaying seconds (default: "%0.3f")

    Returns:
    Callback function that logs after each attempt
    """
```

```python
# Basic logging after each attempt
@retry(
    stop=stop_after_attempt(3),
    after=after_log(logger, logging.INFO)
)
def logged_operation():
    pass

# Custom after callback with outcome analysis
def analyze_after_callback(retry_state):
    if retry_state.outcome.failed:
        exc = retry_state.outcome.exception()  # .result() would re-raise here
        print(f"Attempt {retry_state.attempt_number} failed: {exc}")
    else:
        # Reached when a successful result triggers a retry (e.g. retry_if_result)
        result = retry_state.outcome.result()
        print(f"Attempt {retry_state.attempt_number} succeeded: {result}")

@retry(after=analyze_after_callback)
def analyzed_operation():
    pass

# Performance monitoring after callback (`metrics` is an assumed client)
def perf_after_callback(retry_state):
    # Tenacity does not expose a per-attempt start time, so record the
    # elapsed time since the first attempt instead
    metrics.histogram('operation.elapsed_seconds', retry_state.seconds_since_start, tags={
        'success': not retry_state.outcome.failed,
        'attempt': retry_state.attempt_number
    })

@retry(after=perf_after_callback)
def performance_monitored_operation():
    pass
```

Before sleep callbacks execute before waiting between retry attempts; they run only when another attempt is scheduled, so they are never called after a successful attempt.
```python
from tenacity import RetryCallState, before_sleep_nothing

def before_sleep_nothing(retry_state: RetryCallState) -> None:
    """
    Default before sleep callback that performs no action.

    Parameters:
    - retry_state: Complete state of the current retry session
    """
```

```python
import logging
from typing import Callable

from tenacity import RetryCallState, before_sleep_log

def before_sleep_log(
    logger: logging.Logger,
    log_level: int,
    exc_info: bool = False
) -> Callable[[RetryCallState], None]:
    """
    Create a before sleep callback that logs the retry reason and sleep time.

    Parameters:
    - logger: Logger instance to use for output
    - log_level: Logging level for the log message
    - exc_info: Whether to include exception information in logs

    Returns:
    Callback function that logs before sleeping between retries
    """
```

```python
from tenacity import retry, stop_after_attempt, wait_exponential, before_sleep_log

# Basic sleep logging
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1),
    before_sleep=before_sleep_log(logger, logging.WARNING)
)
def sleep_logged_operation():
    pass

# Custom sleep callback with detailed info
def detailed_sleep_callback(retry_state):
    if retry_state.outcome.failed:
        exc = retry_state.outcome.exception()
        print(f"Retrying due to {type(exc).__name__}: {exc}")
    print(f"Sleeping for {retry_state.upcoming_sleep:.2f} seconds...")
    print(f"Total elapsed: {retry_state.seconds_since_start:.2f}s")

@retry(before_sleep=detailed_sleep_callback)
def detailed_retry_operation():
    pass

# Exponential backoff notification (`notify_monitoring_system` is an assumed helper)
def backoff_notification_callback(retry_state):
    notify_monitoring_system({
        'event': 'retry_backoff',
        'attempt': retry_state.attempt_number,
        'sleep_duration': retry_state.upcoming_sleep,
        'total_elapsed': retry_state.seconds_since_start,
        'function': retry_state.fn.__name__
    })

@retry(before_sleep=backoff_notification_callback)
def monitored_retry_operation():
    pass
```

Retry error callbacks execute when all retry attempts are exhausted and a RetryError is about to be raised. Whatever the callback returns becomes the return value of the decorated function, in place of the RetryError.
```python
from typing import Any

from tenacity import RetryCallState, retry, stop_after_attempt

def retry_error_callback(retry_state: RetryCallState) -> Any:
    """
    Callback executed when retries are exhausted.

    Parameters:
    - retry_state: Final state of the retry session

    Returns:
    A value returned by the decorated function instead of raising RetryError
    """
    # Log final failure
    logger.error(f"All retries exhausted for {retry_state.fn.__name__}")
    # Send alert (`send_alert` is an assumed helper)
    send_alert({
        'function': retry_state.fn.__name__,
        'attempts': retry_state.attempt_number,
        'total_time': retry_state.seconds_since_start,
        'final_exception': str(retry_state.outcome.exception())
    })

@retry(
    stop=stop_after_attempt(3),
    retry_error_callback=retry_error_callback
)
def critical_operation():
    pass
```
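Because the return value replaces the exception, this hook can supply a fallback result when every attempt fails. A minimal sketch of that pattern:

```python
from tenacity import retry, stop_after_attempt

def return_none_fallback(retry_state):
    # Whatever is returned here becomes the function's return value;
    # no RetryError propagates to the caller
    return None

@retry(stop=stop_after_attempt(3), retry_error_callback=return_none_fallback)
def flaky_lookup():
    raise ConnectionError("service unavailable")

result = flaky_lookup()  # result is None after three failed attempts
```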
Multiple callback types can be used together for comprehensive monitoring:

```python
# Complete callback setup for production monitoring
production_logger = logging.getLogger('production')

def production_before(retry_state):
    production_logger.info(
        f"Starting {retry_state.fn.__name__} attempt {retry_state.attempt_number}"
    )

def production_after(retry_state):
    if retry_state.outcome.failed:
        exc = retry_state.outcome.exception()
        production_logger.warning(
            f"Attempt {retry_state.attempt_number} failed: {type(exc).__name__}"
        )

def production_sleep(retry_state):
    production_logger.info(
        f"Retrying in {retry_state.upcoming_sleep}s "
        f"(elapsed: {retry_state.seconds_since_start:.1f}s)"
    )

def production_error(retry_state):
    production_logger.error(
        f"All retries failed for {retry_state.fn.__name__} "
        f"after {retry_state.attempt_number} attempts "
        f"in {retry_state.seconds_since_start:.1f}s"
    )

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    before=production_before,
    after=production_after,
    before_sleep=production_sleep,
    retry_error_callback=production_error
)
def production_api_call():
    pass
```

All callbacks can be async functions when using AsyncRetrying, which the retry decorator selects automatically for async functions:
```python
# Async callback examples (the awaited helpers are assumed application code)
async def async_before_callback(retry_state):
    await log_attempt_to_database(
        function=retry_state.fn.__name__,
        attempt=retry_state.attempt_number
    )

async def async_after_callback(retry_state):
    if retry_state.outcome.failed:
        await record_failure_metrics(retry_state)
    else:
        await record_success_metrics(retry_state)

async def async_sleep_callback(retry_state):
    await update_retry_dashboard({
        'function': retry_state.fn.__name__,
        'status': 'retrying',
        'next_attempt_in': retry_state.upcoming_sleep
    })

async def async_error_callback(retry_state):
    await send_failure_notification({
        'function': retry_state.fn.__name__,
        'final_state': retry_state
    })

@retry(
    stop=stop_after_attempt(3),
    before=async_before_callback,
    after=async_after_callback,
    before_sleep=async_sleep_callback,
    retry_error_callback=async_error_callback
)
async def async_operation_with_callbacks():
    pass
```
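The same hooks are available when driving retries by hand with AsyncRetrying. A minimal sketch reusing the callbacks above (`some_async_call` is a hypothetical coroutine):

```python
from tenacity import AsyncRetrying, stop_after_attempt

async def fetch_with_retries():
    # `async for` yields one attempt at a time; the `with` block records
    # any exception as that attempt's outcome
    async for attempt in AsyncRetrying(
        stop=stop_after_attempt(3),
        before=async_before_callback,
        before_sleep=async_sleep_callback,
    ):
        with attempt:
            return await some_async_call()
```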
Stateful callbacks on a shared object can accumulate data across attempts:

```python
import time

class RetryMetrics:
    def __init__(self):
        self.attempt_times = []
        self.failure_reasons = []

    def before_callback(self, retry_state):
        self.attempt_times.append(time.time())

    def after_callback(self, retry_state):
        if retry_state.outcome.failed:
            exc = retry_state.outcome.exception()
            self.failure_reasons.append(type(exc).__name__)

    def error_callback(self, retry_state):
        print("Final metrics:")
        print(f"  Attempts: {len(self.attempt_times)}")
        print(f"  Failure types: {set(self.failure_reasons)}")
        print(f"  Total duration: {retry_state.seconds_since_start:.2f}s")

# Usage with stateful callbacks
metrics = RetryMetrics()

@retry(
    stop=stop_after_attempt(5),
    before=metrics.before_callback,
    after=metrics.after_callback,
    retry_error_callback=metrics.error_callback
)
def operation_with_metrics():
    pass
```

Callbacks can also act conditionally on the retry state:

```python
def conditional_sleep_callback(retry_state):
    # Only log for longer sleep periods
    if retry_state.upcoming_sleep > 5:
        logger.warning(
            f"Long backoff: sleeping {retry_state.upcoming_sleep}s "
            f"after attempt {retry_state.attempt_number}"
        )
    # Send alerts after multiple failures (`send_alert` is an assumed helper)
    if retry_state.attempt_number >= 3:
        send_alert(f"Multiple failures in {retry_state.fn.__name__}")

@retry(before_sleep=conditional_sleep_callback)
def monitored_operation():
    pass
```

Each hook accepts a single callable, so several callbacks can be chained behind one function:

```python
def chain_callbacks(*callbacks):
"""Chain multiple callbacks together."""
def chained_callback(retry_state):
for callback in callbacks:
if callback: # Skip None callbacks
callback(retry_state)
return chained_callback
# Combine multiple callback functions
logging_callback = before_log(logger, logging.INFO)
metrics_callback = lambda rs: metrics.record_attempt(rs)
alert_callback = lambda rs: maybe_send_alert(rs)
combined_before = chain_callbacks(
logging_callback,
metrics_callback,
alert_callback
)
@retry(before=combined_before)
def multi_callback_operation():
passdef create_monitoring_callbacks(service_name, alert_threshold=3):
"""Factory for creating consistent monitoring callbacks."""
def before_callback(retry_state):
metrics.increment(f'{service_name}.attempts')
def after_callback(retry_state):
if retry_state.outcome.failed:
metrics.increment(f'{service_name}.failures')
else:
metrics.increment(f'{service_name}.successes')
def sleep_callback(retry_state):
if retry_state.attempt_number >= alert_threshold:
send_alert(f'{service_name} experiencing repeated failures')
return before_callback, after_callback, sleep_callback
# Use factory for consistent monitoring
before_cb, after_cb, sleep_cb = create_monitoring_callbacks('user_service')
@retry(
stop=stop_after_attempt(5),
before=before_cb,
after=after_cb,
before_sleep=sleep_cb
)
def user_service_operation():
passdef debug_callback_suite():
"""Comprehensive debug callbacks for development."""
def debug_before(retry_state):
print(f"\n--- Attempt {retry_state.attempt_number} ---")
print(f"Function: {retry_state.fn.__name__}")
print(f"Args: {retry_state.args}")
print(f"Kwargs: {retry_state.kwargs}")
def debug_after(retry_state):
print(f"Outcome: {'FAILED' if retry_state.outcome.failed else 'SUCCESS'}")
if retry_state.outcome.failed:
print(f"Exception: {retry_state.outcome.result()}")
else:
print(f"Result: {retry_state.outcome.result()}")
def debug_sleep(retry_state):
print(f"Sleeping for {retry_state.upcoming_sleep}s")
print(f"Total elapsed: {retry_state.seconds_since_start:.2f}s")
print(f"Total idle time: {retry_state.idle_for:.2f}s")
return debug_before, debug_after, debug_sleep
# Apply debug callbacks
debug_before, debug_after, debug_sleep = debug_callback_suite()
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1),
before=debug_before,
after=debug_after,
before_sleep=debug_sleep
)
def debug_operation():
passclass TestCallbacks:
"""Callback suite for testing retry behavior."""
def __init__(self):
self.attempts = []
self.failures = []
self.sleep_times = []
def before_callback(self, retry_state):
self.attempts.append(retry_state.attempt_number)
def after_callback(self, retry_state):
if retry_state.outcome.failed:
self.failures.append(retry_state.outcome.result())
def sleep_callback(self, retry_state):
self.sleep_times.append(retry_state.upcoming_sleep)
def verify_behavior(self, expected_attempts, expected_failures):
assert len(self.attempts) == expected_attempts
assert len(self.failures) == expected_failures - 1 # Last attempt might succeed
# Usage in tests
def test_retry_behavior():
test_callbacks = TestCallbacks()
@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(1),
before=test_callbacks.before_callback,
after=test_callbacks.after_callback,
before_sleep=test_callbacks.sleep_callback
)
def failing_function():
raise ValueError("Test failure")
with pytest.raises(RetryError):
failing_function()
test_callbacks.verify_behavior(expected_attempts=3, expected_failures=3)from prometheus_client import Counter, Histogram, Gauge
# Prometheus metrics
retry_attempts = Counter('retry_attempts_total', 'Total retry attempts', ['function'])
retry_duration = Histogram('retry_duration_seconds', 'Retry operation duration', ['function'])
active_retries = Gauge('active_retries', 'Currently active retry operations', ['function'])

def prometheus_before_callback(retry_state):
    retry_attempts.labels(function=retry_state.fn.__name__).inc()
    if retry_state.attempt_number == 1:
        active_retries.labels(function=retry_state.fn.__name__).inc()

def prometheus_error_callback(retry_state):
    # Note: this sketch only decrements the gauge and observes duration on
    # exhaustion; a production version would also handle the success path
    active_retries.labels(function=retry_state.fn.__name__).dec()
    retry_duration.labels(function=retry_state.fn.__name__).observe(
        retry_state.seconds_since_start
    )

@retry(
    before=prometheus_before_callback,
    retry_error_callback=prometheus_error_callback
)
def monitored_api_call():
    pass
```

Structured logging works the same way:

```python
import structlog
structured_logger = structlog.get_logger()

def structured_logging_callbacks():
    def before_callback(retry_state):
        structured_logger.info(
            "retry_attempt_start",
            function=retry_state.fn.__name__,
            attempt=retry_state.attempt_number,
            elapsed_seconds=retry_state.seconds_since_start
        )

    def after_callback(retry_state):
        structured_logger.info(
            "retry_attempt_complete",
            function=retry_state.fn.__name__,
            attempt=retry_state.attempt_number,
            success=not retry_state.outcome.failed,
            elapsed_seconds=retry_state.seconds_since_start
        )

    return before_callback, after_callback

before_cb, after_cb = structured_logging_callbacks()

@retry(before=before_cb, after=after_cb)
def structured_logged_operation():
    pass
```

This callback system provides hooks for monitoring, logging, alerting, and custom actions throughout the retry lifecycle, giving full observability and control over retry behavior.
Install with the Tessl CLI:

```
npx tessl i tessl/pypi-tenacity
```