Retry code until it succeeds
Tenacity provides comprehensive support for asynchronous programming with native async/await syntax, asyncio integration, trio support, and Tornado web framework compatibility. The library automatically detects async functions and applies appropriate async retry controllers.
from tenacity import AsyncRetrying
class AsyncRetrying(BaseRetrying):
"""
Asynchronous retry controller for coroutines.
Handles retry logic for async/await functions and coroutines.
Auto-detects trio vs asyncio for appropriate sleep function.
Provides async iterator support for manual retry loops.
"""
def __init__(
self,
sleep: Callable[[float], Awaitable[None]] = None, # Auto-detected
stop: StopBaseT = stop_never,
wait: WaitBaseT = wait_none(),
retry: Union[RetryBaseT, AsyncRetryBaseT] = retry_if_exception_type(),
before: Callable[[RetryCallState], Awaitable[None]] = None,
after: Callable[[RetryCallState], Awaitable[None]] = None,
before_sleep: Optional[Callable[[RetryCallState], Awaitable[None]]] = None,
reraise: bool = False,
retry_error_cls: type = RetryError,
retry_error_callback: Optional[Callable[[RetryCallState], Awaitable[Any]]] = None
):
"""
Initialize async retry controller.
Parameters are same as BaseRetrying, but callbacks can be async functions.
If sleep is None, auto-detects asyncio.sleep or trio.sleep.
"""
async def __call__(self, fn: Callable[..., Any], *args, **kwargs) -> Any:
"""
Execute coroutine with asynchronous retry logic.
Parameters:
- fn: Async function/coroutine to execute with retries
- *args: Positional arguments to pass to fn
- **kwargs: Keyword arguments to pass to fn
Returns:
Result of successful coroutine execution.
Raises:
RetryError: When all retry attempts are exhausted
"""
def __aiter__(self) -> AsyncIterator[AttemptManager]:
"""Async iterator interface for attempt managers."""
async def __anext__(self) -> AttemptManager:
"""Async iteration support for manual retry loops."""The main @retry decorator automatically detects async functions and applies AsyncRetrying:
from tenacity import retry, stop_after_attempt, wait_exponential
# Automatically uses AsyncRetrying for async functions
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1))
async def async_api_call():
"""Async function automatically gets async retry behavior."""
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/data")
return response.json()
# Usage
async def main():
result = await async_api_call()Tenacity provides async-specific retry strategies that support async predicates:
from tenacity.asyncio.retry import async_retry_base
class async_retry_base(retry_base):
    """
    Base class for async retry strategies.

    Extends retry_base to support async predicate functions.
    """

    @abstractmethod
    async def __call__(self, retry_state: RetryCallState) -> bool:
        """
        Async method to determine whether to retry.

        Parameters:
        - retry_state: Complete state of current retry session

        Returns:
        True if another attempt should be made, False to stop retrying
        """


from tenacity.asyncio import retry_if_exception
class retry_if_exception(async_retry_base):
    """
    Async version of retry_if_exception with async predicate support.

    Allows using async functions to evaluate whether exceptions
    should trigger retries.
    """

    def __init__(self, predicate: Callable[[BaseException], Awaitable[bool]]):
        """
        Initialize with async exception predicate.

        Parameters:
        - predicate: Async function that takes exception and returns bool
        """
# Usage example
async def is_retryable_async(exc):
    """Decide asynchronously whether the raised exception warrants a retry."""
    # Could perform async operations like database lookups
    if isinstance(exc, CustomAPIError):
        # Check if error code indicates retryable condition
        return await check_error_retryable(exc.error_code)
    return isinstance(exc, (ConnectionError, TimeoutError))


@retry(retry=retry_if_exception(is_retryable_async))
async def complex_async_operation():
    pass


from tenacity.asyncio import retry_if_result
class retry_if_result(async_retry_base):
    """
    Async version of retry_if_result with async predicate support.

    Allows using async functions to evaluate whether successful
    results should trigger retries.
    """

    def __init__(self, predicate: Callable[[Any], Awaitable[bool]]):
        """
        Initialize with async result predicate.

        Parameters:
        - predicate: Async function that takes result and returns bool
        """
# Usage example
async def is_result_incomplete(result):
    """Return True while the job referenced by ``result`` is still in progress.

    Results that are not a dict carrying a ``'job_id'`` key are treated as
    complete and never trigger a retry.
    """
    if isinstance(result, dict) and 'job_id' in result:
        # Async check of job status
        status = await check_job_status(result['job_id'])
        return status in ('PENDING', 'RUNNING')
    return False
@retry(retry=retry_if_result(is_result_incomplete))
async def submit_and_wait_for_job():
    pass


from tenacity.asyncio import retry_any, retry_all
class retry_any(async_retry_base):
    """
    Async version of retry_any supporting mixed sync/async conditions.

    Combines multiple retry strategies (sync or async) with OR logic.
    """

    def __init__(self, *retries: Union[retry_base, async_retry_base]):
        """
        Initialize with retry strategies to combine.

        Parameters:
        - *retries: Mix of sync and async retry strategies
        """
class retry_all(async_retry_base):
"""
Async version of retry_all supporting mixed sync/async conditions.
Combines multiple retry strategies (sync or async) with AND logic.
"""
def __init__(self, *retries: Union[retry_base, async_retry_base]):
"""
Initialize with retry strategies to combine.
Parameters:
- *retries: Mix of sync and async retry strategies
"""Tenacity automatically integrates with asyncio by detecting the running event loop:
import asyncio
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type((asyncio.TimeoutError, ConnectionError)),
)
async def asyncio_operation():
    """Fetch JSON, retrying only on timeouts and connection errors."""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com') as response:
            return await response.json()


# Run with asyncio
async def main():
    result = await asyncio_operation()
asyncio.run(main())

Tenacity auto-detects trio and uses trio.sleep instead of asyncio.sleep:
import trio
from tenacity import retry, stop_after_delay, wait_random_exponential
@retry(
    stop=stop_after_delay(30),
    wait=wait_random_exponential(multiplier=1, max=10),
)
async def trio_operation():
    """Trio coroutine retried for up to 30 seconds with randomized backoff."""
    # Automatically uses trio.sleep for delays
    await trio.sleep(0.1)  # Your trio code here
    return "success"


# Run with trio
async def main():
    result = await trio_operation()


trio.run(main)

# Direct usage of AsyncRetrying controller
# One controller instance is built here and reused for both calls below.
async_retrying = AsyncRetrying(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type(ConnectionError)
)
# Apply to different async functions
# NOTE: `await` is only valid inside a coroutine, so in real code these
# calls must live in the body of an `async def`.
result1 = await async_retrying(async_function1, arg1, arg2)
result2 = await async_retrying(async_function2, kwarg=value)
# Create variations
fast_async_retrying = async_retrying.copy(wait=wait_fixed(0.5))

AsyncRetrying provides async iterator support for manual retry loops:
async def manual_async_retry():
async_retrying = AsyncRetrying(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1)
)
async for attempt in async_retrying:
with attempt:
# Your async code that might fail
result = await risky_async_operation()
break # Success - exit retry loop
return resultAll callback parameters in async retry controllers can be async functions:
import logging
async def async_before_callback(retry_state):
    """Async callback executed before each attempt."""
    # Only `logging` is imported by this snippet, so obtain the logger here
    # instead of relying on an undefined module-level `logger`.
    logger = logging.getLogger(__name__)
    logger.info(f"Starting attempt {retry_state.attempt_number}")
    await audit_attempt_start(retry_state)
async def async_after_callback(retry_state):
    """Async callback executed after each attempt."""
    # Route the attempt's outcome to failure monitoring or success metrics.
    if retry_state.outcome.failed:
        await log_failure_to_monitoring(retry_state)
    else:
        await record_success_metrics(retry_state)
async def async_before_sleep_callback(retry_state):
    """Async callback executed before sleeping between retries."""
    await update_retry_status(
        f"Retrying in {retry_state.upcoming_sleep} seconds"
    )
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1),
    before=async_before_callback,
    after=async_after_callback,
    before_sleep=async_before_sleep_callback,
)
async def monitored_async_operation():
    pass


from tenacity.tornadoweb import TornadoRetrying
from tornado import gen
class TornadoRetrying(BaseRetrying):
    """
    Tornado web framework retry controller.

    Specialized for Tornado's @gen.coroutine decorated functions.
    Uses Tornado's IOLoop for asynchronous sleep operations.
    """

    @gen.coroutine
    def __call__(self, fn: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Execute Tornado coroutine with retry logic.

        Uses Tornado's generator-based coroutine model and tornado.gen.sleep.

        Parameters:
        - fn: Tornado coroutine to execute with retries
        - *args: Positional arguments to pass to fn
        - **kwargs: Keyword arguments to pass to fn

        Returns:
        tornado.concurrent.Future with result of successful execution.
        """


from tornado import gen, ioloop
from tenacity import retry, stop_after_attempt, wait_exponential
# Automatic Tornado detection
@retry(stop=stop_after_attempt(3), wait=wait_exponential())
@gen.coroutine
def tornado_operation():
    """Automatically uses TornadoRetrying for @gen.coroutine functions."""
    response = yield tornado_http_client.fetch("http://api.example.com")
    raise gen.Return(response.body)
# Manual TornadoRetrying usage
tornado_retrying = TornadoRetrying(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, max=10),
)


@gen.coroutine
def manual_tornado_retry():
    """Call ``tornado_api_call`` through the explicit controller."""
    result = yield tornado_retrying(tornado_api_call, url="http://api.example.com")
    raise gen.Return(result)


from tenacity import RetryError
async def handle_async_retry_failure():
    """Show how to inspect a RetryError raised once retries are exhausted."""
    try:
        result = await failing_async_operation()
    except RetryError as retry_err:
        # Access the last failed attempt
        last_attempt = retry_err.last_attempt
        # Get the original exception
        if last_attempt.failed:
            # .exception() returns the stored exception; .result() would
            # raise it instead of handing it back.
            original_exc = last_attempt.exception()
        # Or reraise the original exception directly
        retry_err.reraise()


@retry(
    retry=retry_if_exception_cause_type(ConnectionError),
    stop=stop_after_attempt(3),
)
async def async_with_exception_chaining():
    """Retry when the raised error was caused by a ConnectionError."""
    try:
        await external_async_api()
    except ConnectionError as e:
        # Chain exceptions for cause analysis
        raise ValueError("External API failed") from e


import asyncio
from datetime import datetime, timedelta
class AsyncCircuitBreaker:
    """Retry predicate that stops retrying after repeated failures.

    While the circuit is closed, failed attempts are counted and retried;
    the fifth counted failure opens the circuit. While it is open, retries
    are refused until five minutes have passed since it opened, after which
    it closes again and the counter is reset.
    """

    def __init__(self):
        self.failure_count = 0  # failures counted while the circuit is closed
        self.last_failure_time = None  # set when the circuit opens
        self.circuit_open = False

    async def should_retry(self, retry_state):
        """Return True if another attempt should be made.

        Parameters:
        - retry_state: object exposing ``outcome`` with a ``failed`` flag

        Returns:
        False for a successful (or missing) outcome, False while the circuit
        is open, True otherwise.
        """
        outcome = retry_state.outcome
        if outcome is None or not outcome.failed:
            # A successful attempt must never be retried, whatever the
            # circuit state is.
            return False

        if self.circuit_open:
            # Circuit is open, check if enough time has passed
            if datetime.now() - self.last_failure_time > timedelta(minutes=5):
                self.circuit_open = False
                self.failure_count = 0
                return True
            return False

        # Circuit is closed, normal retry logic
        self.failure_count += 1
        if self.failure_count >= 5:
            self.circuit_open = True
            self.last_failure_time = datetime.now()
            return False
        return True
circuit_breaker = AsyncCircuitBreaker()


# The bound coroutine method is passed as the retry predicate.
@retry(retry=circuit_breaker.should_retry)
async def circuit_protected_async_operation():
    pass


import asyncio
async def async_resource_retry():
    """Use a semaphore-guarded resource with retry protection."""
    # Pool of async resources (connections, etc.)
    resource_pool = asyncio.BoundedSemaphore(10)

    @retry(
        stop=stop_after_delay(60),
        wait=wait_exponential(multiplier=1, max=5),
    )
    async def acquire_and_use_resource():
        # The semaphore is acquired inside the retried function, so it is
        # released when an attempt fails and re-acquired on the next one.
        async with resource_pool:
            # Use resource with retry protection
            return await use_async_resource()

    return await acquire_and_use_resource()


import asyncio
from aiolimiter import AsyncLimiter
# Rate limiter integration
rate_limiter = AsyncLimiter(10, 60)  # 10 requests per minute


@retry(
    retry=retry_if_exception_type(RateLimitError),
    wait=wait_exponential(multiplier=1, min=60, max=300),  # Wait for rate limit reset
    stop=stop_after_attempt(3),
)
async def rate_limited_async_call():
    """Call the API under the limiter, retrying only on RateLimitError."""
    async with rate_limiter:
        return await api_call()


import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_retry_context(retrying_config):
    """Async context manager for retry operations.

    Parameters:
    - retrying_config: Mapping of keyword arguments passed to AsyncRetrying

    Yields:
    The AsyncRetrying controller built from retrying_config.
    """
    async_retrying = AsyncRetrying(**retrying_config)
    try:
        yield async_retrying
    finally:
        # Cleanup code
        await cleanup_async_resources()
# Usage
async def context_retry_example():
    """Run ``async_operation`` through a controller from async_retry_context."""
    config = {
        'stop': stop_after_attempt(3),
        'wait': wait_exponential(multiplier=1),
    }
    async with async_retry_context(config) as retrying:
        result = await retrying(async_operation, param=value)
        return result


# For high-frequency async operations, minimize retry overhead
@retry(
    wait=wait_none(),  # No async sleep overhead
    stop=stop_after_attempt(2),  # Quick failure
    retry=retry_if_exception_type(TransientAsyncError),
)
async def high_frequency_async_operation():
    pass


# For long-running async services, manage retry state memory
@retry(
stop=stop_never, # Infinite retry for services
wait=wait_exponential(multiplier=1, max=300), # Cap wait at 5 minutes
retry=retry_if_exception_type(RecoverableAsyncError)
)
async def persistent_async_service():
# Service code that should retry indefinitely
passThis comprehensive async support enables robust retry behavior for modern asynchronous Python applications across asyncio, trio, and Tornado frameworks with full integration into Python's async/await ecosystem.
Install with Tessl CLI
npx tessl i tessl/pypi-tenacity