Simple retry client for aiohttp with configurable backoff strategies and error handling
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive retry strategies for controlling timeout intervals between retry attempts. Each strategy implements different mathematical approaches to backoff timing, allowing fine-tuned control over retry behavior for various network conditions and failure scenarios.
Implements exponential backoff where the timeout increases exponentially with each attempt. This is the default strategy and is recommended for most use cases, as it provides a good balance between responsiveness and avoiding server overload.
class ExponentialRetry(RetryOptionsBase):
def __init__(
self,
attempts: int = 3,
start_timeout: float = 0.1,
max_timeout: float = 30.0,
factor: float = 2.0,
statuses: set[int] | None = None,
exceptions: set[type[Exception]] | None = None,
methods: set[str] | None = None,
retry_all_server_errors: bool = True,
evaluate_response_callback: EvaluateResponseCallbackType | None = None
): ...
def get_timeout(
self,
attempt: int,
response: ClientResponse | None = None
) -> float:
"""
Calculate exponential backoff timeout.
Args:
attempt (int): Current attempt number (1-based)
response (ClientResponse, optional): Response object from previous attempt
Returns:
float: Timeout in seconds, calculated as start_timeout * (factor ** attempt), capped at max_timeout
"""
Usage example:
from aiohttp_retry import RetryClient, ExponentialRetry
retry_options = ExponentialRetry(
attempts=4,
start_timeout=0.5, # Start with 0.5s
max_timeout=10.0, # Cap at 10s
factor=2.0 # Waits double on each retry: 1s, 2s, 4s (start_timeout * factor ** attempt, with 1-based attempt)
)
async with RetryClient(retry_options=retry_options) as client:
response = await client.get('https://api.example.com/data')
Generates random timeout intervals within specified bounds. Useful for preventing thundering herd problems when multiple clients might retry simultaneously.
class RandomRetry(RetryOptionsBase):
def __init__(
self,
attempts: int = 3,
statuses: Iterable[int] | None = None,
exceptions: Iterable[type[Exception]] | None = None,
methods: Iterable[str] | None = None,
min_timeout: float = 0.1,
max_timeout: float = 3.0,
random_func: Callable[[], float] = random.random,
retry_all_server_errors: bool = True,
evaluate_response_callback: EvaluateResponseCallbackType | None = None
): ...
def get_timeout(
self,
attempt: int,
response: ClientResponse | None = None
) -> float:
"""
Generate random timeout between min and max bounds.
Args:
attempt (int): Current attempt number (ignored)
response (ClientResponse, optional): Response object (ignored)
Returns:
float: Random timeout between min_timeout and max_timeout
"""
Usage example:
from aiohttp_retry import RetryClient, RandomRetry
retry_options = RandomRetry(
attempts=5,
min_timeout=1.0, # Minimum 1 second
max_timeout=5.0 # Maximum 5 seconds
)
async with RetryClient(retry_options=retry_options) as client:
response = await client.get('https://api.example.com/data')
Uses a predefined list of timeout values, taken in order by attempt number. Provides complete control over timeout progression and is useful when you have specific timing requirements.
class ListRetry(RetryOptionsBase):
def __init__(
self,
timeouts: list[float],
statuses: Iterable[int] | None = None,
exceptions: Iterable[type[Exception]] | None = None,
methods: Iterable[str] | None = None,
retry_all_server_errors: bool = True,
evaluate_response_callback: EvaluateResponseCallbackType | None = None
):
"""
Initialize ListRetry with predefined timeout values.
The number of attempts is automatically set to len(timeouts).
Each retry will use the corresponding timeout from the list.
Args:
timeouts: List of timeout values in seconds for each retry attempt
"""
def get_timeout(
self,
attempt: int,
response: ClientResponse | None = None
) -> float:
"""
Return timeout from predefined list.
Args:
attempt (int): Current attempt number, used as index into timeouts list
response (ClientResponse, optional): Response object (ignored)
Returns:
float: Timeout from timeouts[attempt], since attempts is set to len(timeouts)
"""
Usage example:
from aiohttp_retry import RetryClient, ListRetry
# Custom timeout sequence: quick, medium, slow, very slow
retry_options = ListRetry(
timeouts=[0.5, 2.0, 5.0, 10.0]
)
async with RetryClient(retry_options=retry_options) as client:
response = await client.get('https://api.example.com/data')
Implements Fibonacci-based timeout progression where each timeout is the sum of the two preceding timeouts. Provides a middle ground between linear and exponential growth.
class FibonacciRetry(RetryOptionsBase):
def __init__(
self,
attempts: int = 3,
multiplier: float = 1.0,
statuses: Iterable[int] | None = None,
exceptions: Iterable[type[Exception]] | None = None,
methods: Iterable[str] | None = None,
max_timeout: float = 3.0,
retry_all_server_errors: bool = True,
evaluate_response_callback: EvaluateResponseCallbackType | None = None
): ...
def get_timeout(
self,
attempt: int,
response: ClientResponse | None = None
) -> float:
"""
Calculate Fibonacci-based timeout.
Args:
attempt (int): Current attempt number (ignored, uses internal state)
response (ClientResponse, optional): Response object (ignored)
Returns:
float: Timeout following Fibonacci sequence * multiplier, capped at max_timeout
"""
Usage example:
from aiohttp_retry import RetryClient, FibonacciRetry
retry_options = FibonacciRetry(
attempts=6,
multiplier=0.5, # Scale down the sequence
max_timeout=15.0 # Cap at 15 seconds
)
# Timeout sequence: 0.5s, 0.5s, 1.0s, 1.5s, 2.5s, 4.0s
async with RetryClient(retry_options=retry_options) as client:
response = await client.get('https://api.example.com/data')
Combines exponential backoff with random jitter to prevent synchronized retry attempts across multiple clients. Helps avoid thundering herd problems while maintaining exponential backoff benefits.
class JitterRetry(ExponentialRetry):
def __init__(
self,
attempts: int = 3,
start_timeout: float = 0.1,
max_timeout: float = 30.0,
factor: float = 2.0,
statuses: set[int] | None = None,
exceptions: set[type[Exception]] | None = None,
methods: set[str] | None = None,
random_interval_size: float = 2.0,
retry_all_server_errors: bool = True,
evaluate_response_callback: EvaluateResponseCallbackType | None = None
): ...
def get_timeout(
self,
attempt: int,
response: ClientResponse | None = None
) -> float:
"""
Calculate exponential backoff with random jitter.
Formula: base_exponential_timeout + (random(0, random_interval_size) ** factor)
Where base_exponential_timeout = start_timeout * (factor ** attempt), capped at max_timeout
Args:
attempt (int): Current attempt number (1-based)
response (ClientResponse, optional): Response object (ignored)
Returns:
float: Exponential timeout + random jitter component
"""
Usage example:
from aiohttp_retry import RetryClient, JitterRetry
retry_options = JitterRetry(
attempts=4,
start_timeout=1.0,
factor=2.0,
random_interval_size=3.0 # Jitter is random(0, 3.0) ** factor, i.e. up to 9 seconds with factor=2.0
)
async with RetryClient(retry_options=retry_options) as client:
response = await client.get('https://api.example.com/data')
def RetryOptions(*args, **kwargs) -> ExponentialRetry:
"""
Deprecated alias for ExponentialRetry.
This function is deprecated and will be removed in a future version.
Use ExponentialRetry directly instead.
Returns:
ExponentialRetry: An ExponentialRetry instance with the provided arguments
"""
All retry strategies support these common configuration parameters:
The evaluate_response_callback function receives a ClientResponse object and returns True if the request should be retried, False otherwise. This allows for custom retry logic based on response content, headers, or other factors beyond just status codes.
Install with Tessl CLI
npx tessl i tessl/pypi-aiohttp-retry