CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-tenacity

Retry code until it succeeds

Overview
Eval results
Files

async-support.mddocs/

Async Support

Tenacity provides comprehensive support for asynchronous programming with native async/await syntax, asyncio integration, trio support, and Tornado web framework compatibility. The library automatically detects async functions and applies appropriate async retry controllers.

Core Async Classes

AsyncRetrying

from tenacity import AsyncRetrying

class AsyncRetrying(BaseRetrying):
    """
    Asynchronous retry controller for coroutines.
    
    Handles retry logic for async/await functions and coroutines.
    Auto-detects trio vs asyncio for appropriate sleep function.
    Provides async iterator support for manual retry loops.
    """
    
    def __init__(
        self,
        sleep: Callable[[float], Awaitable[None]] = None,  # Auto-detected
        stop: StopBaseT = stop_never,
        wait: WaitBaseT = wait_none(),
        retry: Union[RetryBaseT, AsyncRetryBaseT] = retry_if_exception_type(),
        before: Callable[[RetryCallState], Awaitable[None]] = None,
        after: Callable[[RetryCallState], Awaitable[None]] = None,
        before_sleep: Optional[Callable[[RetryCallState], Awaitable[None]]] = None,
        reraise: bool = False,
        retry_error_cls: type = RetryError,
        retry_error_callback: Optional[Callable[[RetryCallState], Awaitable[Any]]] = None
    ):
        """
        Initialize async retry controller.
        
        Parameters are same as BaseRetrying, but callbacks can be async functions.
        If sleep is None, auto-detects asyncio.sleep or trio.sleep.
        """
    
    async def __call__(self, fn: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Execute coroutine with asynchronous retry logic.
        
        Parameters:
        - fn: Async function/coroutine to execute with retries
        - *args: Positional arguments to pass to fn
        - **kwargs: Keyword arguments to pass to fn
        
        Returns:
        Result of successful coroutine execution.
        
        Raises:
        RetryError: When all retry attempts are exhausted
        """
    
    def __aiter__(self) -> AsyncIterator[AttemptManager]:
        """Async iterator interface for attempt managers."""
    
    async def __anext__(self) -> AttemptManager:
        """Async iteration support for manual retry loops."""

Automatic Async Detection

The main @retry decorator automatically detects async functions and applies AsyncRetrying:

from tenacity import retry, stop_after_attempt, wait_exponential

# Automatically uses AsyncRetrying for async functions
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1))
async def async_api_call():
    """Async function automatically gets async retry behavior."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

# Usage
async def main():
    result = await async_api_call()

Async Retry Strategies

Tenacity provides async-specific retry strategies that support async predicates:

async_retry_base

from tenacity.asyncio.retry import async_retry_base

class async_retry_base(retry_base):
    """
    Base class for async retry strategies.
    
    Extends retry_base to support async predicate functions.
    """
    
    @abstractmethod
    async def __call__(self, retry_state: RetryCallState) -> bool:
        """
        Async method to determine whether to retry.
        
        Parameters:
        - retry_state: Complete state of current retry session
        
        Returns:
        True if another attempt should be made, False to stop retrying
        """

Async Exception Strategies

from tenacity.asyncio import retry_if_exception

class retry_if_exception(async_retry_base):
    """
    Async version of retry_if_exception with async predicate support.
    
    Allows using async functions to evaluate whether exceptions
    should trigger retries.
    """
    
    def __init__(self, predicate: Callable[[BaseException], Awaitable[bool]]):
        """
        Initialize with async exception predicate.
        
        Parameters:
        - predicate: Async function that takes exception and returns bool
        """

# Usage example
async def is_retryable_async(exc):
    # Could perform async operations like database lookups
    if isinstance(exc, CustomAPIError):
        # Check if error code indicates retryable condition
        return await check_error_retryable(exc.error_code)
    return isinstance(exc, (ConnectionError, TimeoutError))

@retry(retry=retry_if_exception(is_retryable_async))
async def complex_async_operation():
    pass

Async Result Strategies

from tenacity.asyncio import retry_if_result

class retry_if_result(async_retry_base):
    """
    Async version of retry_if_result with async predicate support.
    
    Allows using async functions to evaluate whether successful
    results should trigger retries.
    """
    
    def __init__(self, predicate: Callable[[Any], Awaitable[bool]]):
        """
        Initialize with async result predicate.
        
        Parameters:
        - predicate: Async function that takes result and returns bool
        """

# Usage example  
async def is_result_incomplete(result):
    if isinstance(result, dict) and 'job_id' in result:
        # Async check of job status
        status = await check_job_status(result['job_id'])
        return status in ['PENDING', 'RUNNING']
    return False

@retry(retry=retry_if_result(is_result_incomplete))
async def submit_and_wait_for_job():
    pass

Async Logical Combinations

from tenacity.asyncio import retry_any, retry_all

class retry_any(async_retry_base):
    """
    Async version of retry_any supporting mixed sync/async conditions.
    
    Combines multiple retry strategies (sync or async) with OR logic.
    """
    
    def __init__(self, *retries: Union[retry_base, async_retry_base]):
        """
        Initialize with retry strategies to combine.
        
        Parameters:
        - *retries: Mix of sync and async retry strategies
        """

class retry_all(async_retry_base):
    """
    Async version of retry_all supporting mixed sync/async conditions.
    
    Combines multiple retry strategies (sync or async) with AND logic.
    """
    
    def __init__(self, *retries: Union[retry_base, async_retry_base]):
        """
        Initialize with retry strategies to combine.
        
        Parameters:
        - *retries: Mix of sync and async retry strategies
        """

Async Framework Integration

AsyncIO Integration

Tenacity automatically integrates with asyncio by detecting the running event loop:

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type((asyncio.TimeoutError, ConnectionError))
)
async def asyncio_operation():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com') as response:
            return await response.json()

# Run with asyncio
async def main():
    result = await asyncio_operation()
    
asyncio.run(main())

Trio Integration

Tenacity auto-detects trio and uses trio.sleep instead of asyncio.sleep:

import trio
from tenacity import retry, stop_after_delay, wait_random_exponential

@retry(
    stop=stop_after_delay(30),
    wait=wait_random_exponential(multiplier=1, max=10)
)
async def trio_operation():
    # Automatically uses trio.sleep for delays
    await trio.sleep(0.1)  # Your trio code here
    return "success"

# Run with trio
async def main():
    result = await trio_operation()
    
trio.run(main)

Manual AsyncRetrying Usage

# Direct usage of AsyncRetrying controller
async_retrying = AsyncRetrying(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type(ConnectionError)
)

# Apply to different async functions
result1 = await async_retrying(async_function1, arg1, arg2)
result2 = await async_retrying(async_function2, kwarg=value)

# Create variations
fast_async_retrying = async_retrying.copy(wait=wait_fixed(0.5))

Async Iterator Support

AsyncRetrying provides async iterator support for manual retry loops:

async def manual_async_retry():
    async_retrying = AsyncRetrying(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1)
    )
    
    async for attempt in async_retrying:
        with attempt:
            # Your async code that might fail
            result = await risky_async_operation()
            break  # Success - exit retry loop
    
    return result

Async Callbacks and Hooks

All callback parameters in async retry controllers can be async functions:

import logging

async def async_before_callback(retry_state):
    """Async callback executed before each attempt."""
    logger.info(f"Starting attempt {retry_state.attempt_number}")
    await audit_attempt_start(retry_state)

async def async_after_callback(retry_state):
    """Async callback executed after each attempt."""
    if retry_state.outcome.failed:
        await log_failure_to_monitoring(retry_state)
    else:
        await record_success_metrics(retry_state)

async def async_before_sleep_callback(retry_state):
    """Async callback executed before sleeping between retries."""
    await update_retry_status(
        f"Retrying in {retry_state.upcoming_sleep} seconds"
    )

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1),
    before=async_before_callback,
    after=async_after_callback,
    before_sleep=async_before_sleep_callback
)
async def monitored_async_operation():
    pass

Tornado Integration

TornadoRetrying

from tenacity.tornadoweb import TornadoRetrying
from tornado import gen

class TornadoRetrying(BaseRetrying):
    """
    Tornado web framework retry controller.
    
    Specialized for Tornado's @gen.coroutine decorated functions.
    Uses Tornado's IOLoop for asynchronous sleep operations.
    """
    
    @gen.coroutine
    def __call__(self, fn: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Execute Tornado coroutine with retry logic.
        
        Uses Tornado's generator-based coroutine model and IOLoop.sleep.
        
        Parameters:
        - fn: Tornado coroutine to execute with retries
        - *args: Positional arguments to pass to fn
        - **kwargs: Keyword arguments to pass to fn
        
        Returns:
        tornado.concurrent.Future with result of successful execution.
        """

Tornado Usage

from tornado import gen, ioloop
from tenacity import retry, stop_after_attempt, wait_exponential

# Automatic Tornado detection
@retry(stop=stop_after_attempt(3), wait=wait_exponential())
@gen.coroutine
def tornado_operation():
    """Automatically uses TornadoRetrying for @gen.coroutine functions."""
    response = yield tornado_http_client.fetch("http://api.example.com")
    raise gen.Return(response.body)

# Manual TornadoRetrying usage
tornado_retrying = TornadoRetrying(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, max=10)
)

@gen.coroutine
def manual_tornado_retry():
    result = yield tornado_retrying(tornado_api_call, url="http://api.example.com")
    raise gen.Return(result)

Async Error Handling

Async RetryError Handling

from tenacity import RetryError

async def handle_async_retry_failure():
    try:
        result = await failing_async_operation()
    except RetryError as retry_err:
        # Access the last failed attempt
        last_attempt = retry_err.last_attempt
        
        # Get the original exception
        if last_attempt.failed:
            original_exc = last_attempt.result()  # Raises original exception
        
        # Or reraise the original exception directly
        retry_err.reraise()

Async Exception Chaining

@retry(
    retry=retry_if_exception_cause_type(ConnectionError),
    stop=stop_after_attempt(3)
)
async def async_with_exception_chaining():
    try:
        await external_async_api()
    except ConnectionError as e:
        # Chain exceptions for cause analysis  
        raise ValueError("External API failed") from e

Advanced Async Patterns

Async Circuit Breaker

import asyncio
from datetime import datetime, timedelta

class AsyncCircuitBreaker:
    def __init__(self):
        self.failure_count = 0
        self.last_failure_time = None
        self.circuit_open = False
    
    async def should_retry(self, retry_state):
        if self.circuit_open:
            # Circuit is open, check if enough time has passed
            if datetime.now() - self.last_failure_time > timedelta(minutes=5):
                self.circuit_open = False
                self.failure_count = 0
                return True
            return False
        
        # Circuit is closed, normal retry logic
        if retry_state.outcome and retry_state.outcome.failed:
            self.failure_count += 1
            if self.failure_count >= 5:
                self.circuit_open = True
                self.last_failure_time = datetime.now()
                return False
        
        return True

circuit_breaker = AsyncCircuitBreaker()

@retry(retry=circuit_breaker.should_retry)
async def circuit_protected_async_operation():
    pass

Async Resource Pool Retry

import asyncio

async def async_resource_retry():
    # Pool of async resources (connections, etc.)
    resource_pool = asyncio.BoundedSemaphore(10)
    
    @retry(
        stop=stop_after_delay(60),
        wait=wait_exponential(multiplier=1, max=5)
    )
    async def acquire_and_use_resource():
        async with resource_pool:
            # Use resource with retry protection
            return await use_async_resource()
    
    return await acquire_and_use_resource()

Async Rate Limiting Integration

import asyncio
from aiolimiter import AsyncLimiter

# Rate limiter integration
rate_limiter = AsyncLimiter(10, 60)  # 10 requests per minute

@retry(
    retry=retry_if_exception_type(RateLimitError),
    wait=wait_exponential(multiplier=1, min=60, max=300),  # Wait for rate limit reset
    stop=stop_after_attempt(3)
)
async def rate_limited_async_call():
    async with rate_limiter:
        return await api_call()

Async Retry with Context Managers

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_retry_context(retrying_config):
    """Async context manager for retry operations."""
    async_retrying = AsyncRetrying(**retrying_config)
    
    try:
        yield async_retrying
    finally:
        # Cleanup code
        await cleanup_async_resources()

# Usage
async def context_retry_example():
    config = {
        'stop': stop_after_attempt(3),
        'wait': wait_exponential(multiplier=1)
    }
    
    async with async_retry_context(config) as retrying:
        result = await retrying(async_operation, param=value)
        return result

Performance Considerations

Async Overhead Minimization

# For high-frequency async operations, minimize retry overhead
@retry(
    wait=wait_none(),  # No async sleep overhead
    stop=stop_after_attempt(2),  # Quick failure
    retry=retry_if_exception_type(TransientAsyncError)
)
async def high_frequency_async_operation():
    pass

Memory Management in Long-Running Async Retries

# For long-running async services, manage retry state memory
@retry(
    stop=stop_never,  # Infinite retry for services
    wait=wait_exponential(multiplier=1, max=300),  # Cap wait at 5 minutes
    retry=retry_if_exception_type(RecoverableAsyncError)
)
async def persistent_async_service():
    # Service code that should retry indefinitely
    pass

This comprehensive async support enables robust retry behavior for modern asynchronous Python applications across asyncio, trio, and Tornado frameworks with full integration into Python's async/await ecosystem.

Install with Tessl CLI

npx tessl i tessl/pypi-tenacity

docs

async-support.md

callbacks-hooks.md

core-decorator.md

index.md

retry-strategies.md

stop-conditions.md

utilities.md

wait-strategies.md

tile.json