Retry code until it succeeds
Tenacity provides various utility functions, helper classes, and supporting functionality that enable advanced retry scenarios, time conversion, sleep strategies, and integration with different environments.
from tenacity import TryAgain
class TryAgain(Exception):
"""
Exception to force immediate retry regardless of retry condition.
Raise this exception to bypass all retry strategy evaluation
and force another attempt (subject to stop conditions).
Useful for explicit retry control within application logic.
"""
pass@retry(
stop=stop_after_attempt(5),
retry=retry_if_exception_type(ValueError) # Only retry ValueErrors normally
)
def conditional_retry_operation():
if some_temporary_condition():
# Force retry even though condition might not normally trigger it
raise TryAgain
# Normal operation logic
if error_condition():
raise ConnectionError("Network failed") # This would NOT normally retry
return "success"from tenacity import RetryError
class RetryError(Exception):
"""
Exception raised when all retry attempts are exhausted.
Contains information about the final failed attempt and provides
methods to access the original exception that caused the failure.
"""
def __init__(self, last_attempt: Future):
"""
Initialize with the final failed attempt.
Parameters:
- last_attempt: Future object representing the final attempt
"""
self.last_attempt = last_attempt
def reraise(self) -> None:
"""
Reraise the original exception from the last attempt.
Raises the actual exception that caused the final failure,
preserving the original traceback information.
"""
@property
def last_attempt(self) -> Future:
"""Access the final failed attempt Future object."""from tenacity import RetryError
def handle_retry_failure():
try:
result = failing_operation()
except RetryError as retry_err:
# Access the final attempt
final_attempt = retry_err.last_attempt
print(f"Failed after attempt {final_attempt.attempt_number}")
# Get the original exception
try:
original_result = final_attempt.result() # This will raise the original exception
except Exception as original_exc:
print(f"Original failure: {original_exc}")
# Or reraise the original exception directly
# retry_err.reraise() # Preserves original traceback
@retry(
stop=stop_after_attempt(3),
reraise=False # RetryError will be raised instead of original exception
)
def failing_operation():
raise ValueError("Something went wrong")from tenacity.nap import sleep
def sleep(seconds: float) -> None:
"""
Default sleep function using time.sleep().
Parameters:
- seconds: Number of seconds to sleep
Standard blocking sleep implementation used by default
in synchronous retry operations.
"""from tenacity.nap import sleep_using_event
import threading
class sleep_using_event:
"""
Sleep strategy that waits on a threading event with timeout.
Allows external interruption of sleep periods through event signaling.
Useful for graceful shutdown scenarios or coordinated stopping.
"""
def __init__(self, event: threading.Event):
"""
Initialize with threading event for sleep control.
Parameters:
- event: threading.Event that can interrupt sleep when set
"""
def __call__(self, timeout: Optional[float]) -> None:
"""
Sleep for specified timeout or until event is set.
Parameters:
- timeout: Maximum time to sleep in seconds (None = indefinite)
Returns immediately if event is already set.
"""import threading
# Create event for coordinated sleep control
shutdown_event = threading.Event()
event_sleep = sleep_using_event(shutdown_event)
@retry(
sleep=event_sleep,
stop=stop_after_attempt(10),
wait=wait_exponential(multiplier=1, max=60)
)
def interruptible_operation():
# This operation's sleep periods can be interrupted
pass
# In another thread or signal handler
def shutdown_handler():
shutdown_event.set() # Interrupts any ongoing sleep periodsfrom tenacity._utils import time_unit_type, to_seconds
from datetime import timedelta
# Type alias for time specifications
time_unit_type = Union[int, float, timedelta]
def to_seconds(time_unit: time_unit_type) -> float:
"""
Convert various time units to seconds.
Parameters:
- time_unit: Time specification as int/float (seconds) or timedelta
Returns:
Time converted to seconds as float
"""from datetime import timedelta
# All equivalent - 30 seconds
seconds_int = to_seconds(30) # 30.0
seconds_float = to_seconds(30.5) # 30.5
seconds_timedelta = to_seconds(timedelta(seconds=30)) # 30.0
# Complex time specifications
minutes = to_seconds(timedelta(minutes=5)) # 300.0
mixed = to_seconds(timedelta(minutes=2, seconds=30)) # 150.0
hours = to_seconds(timedelta(hours=1)) # 3600.0
# Usage in retry configurations
@retry(
stop=stop_after_delay(to_seconds(timedelta(minutes=10))),
wait=wait_fixed(to_seconds(timedelta(seconds=30)))
)
def time_converted_operation():
passfrom tenacity._utils import MAX_WAIT
import sys
# Maximum wait time constant
MAX_WAIT: float = sys.maxsize / 2from tenacity._utils import get_callback_name
def get_callback_name(cb: Callable[..., Any]) -> str:
"""
Get fully qualified name of callback function.
Parameters:
- cb: Callback function to get name for
Returns:
Fully qualified function name as string
Useful for logging and debugging callback configurations.
"""def my_custom_callback(retry_state):
pass
callback_name = get_callback_name(my_custom_callback)
print(callback_name) # "my_module.my_custom_callback"
# Usage in logging
def log_callback_config(retrying_instance):
before_name = get_callback_name(retrying_instance.before)
after_name = get_callback_name(retrying_instance.after)
print(f"Before callback: {before_name}")
print(f"After callback: {after_name}")from tenacity._utils import to_ordinal
def to_ordinal(pos_num: int) -> str:
"""
Convert positive number to ordinal string representation.
Parameters:
- pos_num: Positive integer to convert
Returns:
Ordinal string (1st, 2nd, 3rd, 4th, etc.)
Useful for human-friendly logging and error messages.
"""# Convert numbers to ordinals
print(to_ordinal(1)) # "1st"
print(to_ordinal(2)) # "2nd"
print(to_ordinal(3)) # "3rd"
print(to_ordinal(4)) # "4th"
print(to_ordinal(21)) # "21st"
print(to_ordinal(22)) # "22nd"
# Usage in retry messages
def ordinal_logging_callback(retry_state):
attempt_ordinal = to_ordinal(retry_state.attempt_number)
print(f"Starting {attempt_ordinal} attempt at {retry_state.fn.__name__}")
@retry(before=ordinal_logging_callback)
def operation_with_ordinal_logging():
passfrom tenacity._utils import is_coroutine_callable
def is_coroutine_callable(call: Callable[..., Any]) -> bool:
"""
Check if callable is a coroutine function.
Parameters:
- call: Callable to check
Returns:
True if callable is async/coroutine function, False otherwise
Used internally to detect async functions for automatic
AsyncRetrying application.
"""from tenacity._utils import wrap_to_async_func
def wrap_to_async_func(call: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
"""
Wrap synchronous function to async, or return as-is if already async.
Parameters:
- call: Function to wrap (sync or async)
Returns:
Async function that can be awaited
Enables using sync callbacks in async retry contexts.
"""# Check if function is async
def sync_function():
return "sync result"
async def async_function():
return "async result"
print(is_coroutine_callable(sync_function)) # False
print(is_coroutine_callable(async_function)) # True
# Wrap sync function for async use
wrapped_sync = wrap_to_async_func(sync_function)
wrapped_async = wrap_to_async_func(async_function) # Returns as-is
# Both can now be awaited
async def demo():
result1 = await wrapped_sync() # Works
result2 = await wrapped_async() # Worksfrom tenacity import NO_RESULT
# Sentinel object for unset results
NO_RESULT: objectdef check_result_status(retry_state):
if retry_state.outcome is NO_RESULT:
print("No result set yet")
elif retry_state.outcome.failed:
print("Attempt failed")
else:
print("Attempt succeeded")import asyncio
import time
class ProgressiveSleep:
"""Custom sleep with progress indication."""
def __init__(self, progress_callback=None):
self.progress_callback = progress_callback
def __call__(self, seconds):
if self.progress_callback:
# Show progress during long sleeps
for i in range(int(seconds)):
time.sleep(1)
self.progress_callback(i + 1, int(seconds))
# Handle fractional remainder
remainder = seconds - int(seconds)
if remainder > 0:
time.sleep(remainder)
else:
time.sleep(seconds)
# Usage with progress indication
def progress_indicator(current, total):
print(f"Sleeping... {current}/{total} seconds")
progressive_sleep = ProgressiveSleep(progress_indicator)
@retry(
sleep=progressive_sleep,
wait=wait_exponential(multiplier=1, min=5, max=30)
)
def operation_with_progress():
passdef analyze_retry_state(retry_state: RetryCallState) -> dict:
"""Comprehensive retry state analysis."""
return {
'function_name': retry_state.fn.__name__,
'attempt_number': retry_state.attempt_number,
'elapsed_seconds': retry_state.seconds_since_start,
'total_idle_time': retry_state.idle_for,
'active_time': retry_state.seconds_since_start - retry_state.idle_for,
'last_outcome': 'failed' if retry_state.outcome and retry_state.outcome.failed else 'success',
'upcoming_sleep': retry_state.upcoming_sleep if retry_state.upcoming_sleep else 0,
'efficiency': (retry_state.seconds_since_start - retry_state.idle_for) / retry_state.seconds_since_start if retry_state.seconds_since_start > 0 else 1.0
}
def detailed_analysis_callback(retry_state):
analysis = analyze_retry_state(retry_state)
print(f"Retry Analysis: {analysis}")
@retry(after=detailed_analysis_callback)
def analyzed_operation():
passfrom datetime import datetime, timezone
import time
class TimeZoneAwareSleep:
"""Sleep implementation that considers time zones."""
def __init__(self, timezone_offset=0):
self.timezone_offset = timezone_offset
def __call__(self, seconds):
# Could adjust sleep based on time zone considerations
current_hour = datetime.now(timezone.utc).hour + self.timezone_offset
# Example: longer sleeps during business hours
if 9 <= current_hour <= 17:
adjusted_seconds = seconds * 1.5 # 50% longer during business hours
else:
adjusted_seconds = seconds
time.sleep(adjusted_seconds)
# Usage for timezone-aware retry behavior
tz_sleep = TimeZoneAwareSleep(timezone_offset=-8) # PST
@retry(
sleep=tz_sleep,
wait=wait_exponential(multiplier=1, max=60)
)
def timezone_aware_operation():
passimport time
import threading
from collections import defaultdict
class RetryPerformanceMonitor:
"""Monitor retry performance across the application."""
def __init__(self):
self.stats = defaultdict(list)
self.lock = threading.Lock()
def record_attempt(self, retry_state):
with self.lock:
function_name = retry_state.fn.__name__
self.stats[function_name].append({
'attempt': retry_state.attempt_number,
'timestamp': time.time(),
'elapsed': retry_state.seconds_since_start,
'success': not (retry_state.outcome and retry_state.outcome.failed)
})
def get_summary(self, function_name=None):
with self.lock:
if function_name:
return self._summarize_function(function_name, self.stats[function_name])
else:
return {fn: self._summarize_function(fn, attempts)
for fn, attempts in self.stats.items()}
def _summarize_function(self, function_name, attempts):
if not attempts:
return {'total_attempts': 0}
total_attempts = len(attempts)
successful_operations = len([a for a in attempts if a['success']])
avg_duration = sum(a['elapsed'] for a in attempts) / total_attempts
return {
'function_name': function_name,
'total_attempts': total_attempts,
'successful_operations': successful_operations,
'success_rate': successful_operations / total_attempts,
'avg_duration': avg_duration
}
# Global performance monitor
perf_monitor = RetryPerformanceMonitor()
@retry(after=perf_monitor.record_attempt)
def monitored_operation():
pass
# Get performance summary
summary = perf_monitor.get_summary()
print(summary)import weakref
from contextlib import contextmanager
class RetryResourceManager:
"""Manage resources across retry attempts."""
def __init__(self):
self.active_retries = weakref.WeakSet()
def register_retry(self, retry_state):
self.active_retries.add(retry_state)
def get_active_count(self):
return len(self.active_retries)
@contextmanager
def resource_context(self, retry_state):
self.register_retry(retry_state)
try:
yield
finally:
# Cleanup happens automatically via WeakSet
pass
# Global resource manager
resource_manager = RetryResourceManager()
def resource_aware_callback(retry_state):
active_count = resource_manager.get_active_count()
if active_count > 10:
print(f"Warning: {active_count} active retry operations")
@retry(
before=resource_aware_callback,
before_sleep=lambda rs: resource_manager.register_retry(rs)
)
def resource_managed_operation():
passThis comprehensive collection of utilities and helpers provides the building blocks for advanced retry scenarios, performance monitoring, resource management, and integration with various system components.
Install with Tessl CLI
npx tessl i tessl/pypi-tenacity