Retry code until it succeeds
Stop conditions determine when to stop retrying and give up. Tenacity provides 7+ stop strategies based on attempt counts, elapsed time, external events, and logical combinations to control retry termination precisely.
from tenacity.stop import stop_base
class stop_base(ABC):
"""
Abstract base class for all stop strategies.
Provides logical operators for combining stop conditions
and defines the interface all stop strategies must implement.
"""
@abstractmethod
def __call__(self, retry_state: RetryCallState) -> bool:
"""
Determine whether to stop retrying based on current state.
Parameters:
- retry_state: Complete state of current retry session
Returns:
True if retrying should stop, False to continue attempting
"""
def __and__(self, other: 'stop_base') -> 'stop_all':
"""Combine with another condition using AND logic."""
def __or__(self, other: 'stop_base') -> 'stop_any':
"""Combine with another condition using OR logic."""
from tenacity.stop import StopBaseT
StopBaseT = Union[stop_base, Callable[[RetryCallState], bool]]
from tenacity import stop_never
# Singleton instance - never stop retrying
stop_never: stop_base # Always returns False
Warning: Using stop_never without other stop conditions can create infinite retry loops. Always combine it with a retry condition that will eventually return False.
from tenacity import stop_after_attempt
class stop_after_attempt(stop_base):
"""
Stop after reaching maximum number of attempts.
Most commonly used stop strategy - limits total attempts including
the initial attempt. For example, max_attempt_number=3 allows
initial attempt + 2 retries.
"""
def __init__(self, max_attempt_number: int):
"""
Initialize with maximum attempt count.
Parameters:
- max_attempt_number: Maximum number of attempts (including initial)
Must be >= 1
"""
# Stop after 3 total attempts (initial + 2 retries)
@retry(stop=stop_after_attempt(3))
def limited_retries():
pass
# Single attempt only (no retries)
@retry(stop=stop_after_attempt(1))
def no_retries():
pass
from tenacity import stop_after_delay
from tenacity._utils import time_unit_type
class stop_after_delay(stop_base):
"""
Stop after maximum elapsed time from first attempt.
Measures total time from the start of the first attempt.
May exceed max_delay due to the final sleep period before
the last attempt that causes the timeout.
"""
def __init__(self, max_delay: time_unit_type):
"""
Initialize with maximum elapsed time.
Parameters:
- max_delay: Maximum time to spend retrying
Can be int/float (seconds) or timedelta
"""
from tenacity import stop_before_delay
class stop_before_delay(stop_base):
"""
Stop before next sleep would exceed maximum delay.
More precise timing control than stop_after_delay.
Prevents starting attempts that would exceed the time limit
when including their sleep period.
"""
def __init__(self, max_delay: time_unit_type):
"""
Initialize with maximum delay threshold.
Parameters:
- max_delay: Maximum total time including next sleep
Can be int/float (seconds) or timedelta
"""
from datetime import timedelta
# Stop after 30 seconds total
@retry(stop=stop_after_delay(30))
def time_limited_operation():
pass
# Stop after 5 minutes using timedelta
@retry(stop=stop_after_delay(timedelta(minutes=5)))
def long_running_operation():
pass
# Precise timing - stop before exceeding 10 seconds
@retry(stop=stop_before_delay(10))
def precisely_timed_operation():
pass
from tenacity import stop_when_event_set
import threading
class stop_when_event_set(stop_base):
"""
Stop when a threading.Event is set.
Enables external control over retry stopping through
threading events. Useful for graceful shutdown scenarios
or coordinated stopping across multiple threads.
"""
def __init__(self, event: threading.Event):
"""
Initialize with threading event.
Parameters:
- event: threading.Event that controls stopping
When event.is_set() returns True, retrying stops
"""
import threading
# Create shutdown event
shutdown_event = threading.Event()
@retry(stop=stop_when_event_set(shutdown_event))
def interruptible_operation():
pass
# In another thread or signal handler
def shutdown_handler():
shutdown_event.set() # This will stop all retrying operations
from tenacity import stop_any
class stop_any(stop_base):
"""
Stop if ANY of the provided conditions are true (OR logic).
Combines multiple stop strategies with logical OR.
Stops as soon as any condition is met.
"""
def __init__(self, *stops: stop_base):
"""
Initialize with stop strategies to combine.
Parameters:
- *stops: Variable number of stop strategies
Returns True if any strategy returns True.
"""
from tenacity import stop_all
class stop_all(stop_base):
"""
Stop if ALL of the provided conditions are true (AND logic).
Combines multiple stop strategies with logical AND.
Only stops when all conditions are simultaneously met.
"""
def __init__(self, *stops: stop_base):
"""
Initialize with stop strategies to combine.
Parameters:
- *stops: Variable number of stop strategies
Returns True only if all strategies return True.
"""
# Stop after either 5 attempts OR 30 seconds
@retry(stop=stop_any(
stop_after_attempt(5),
stop_after_delay(30)
))
def flexible_stopping():
pass
# Stop only when BOTH 10 attempts AND 60 seconds are reached
@retry(stop=stop_all(
stop_after_attempt(10),
stop_after_delay(60)
))
def conservative_stopping():
pass
import threading
from datetime import timedelta
# Complex stopping logic
shutdown_event = threading.Event()
comprehensive_stop = stop_any(
stop_after_attempt(10), # Max 10 attempts
stop_after_delay(timedelta(minutes=5)), # Or 5 minutes
stop_when_event_set(shutdown_event) # Or external shutdown
)
@retry(stop=comprehensive_stop)
def robust_operation():
pass
# Use & and | operators for logical combinations
attempt_limit = stop_after_attempt(5)
time_limit = stop_after_delay(30)
# OR logic using | operator
flexible_stop = attempt_limit | time_limit
# AND logic using & operator
strict_stop = attempt_limit & time_limit
@retry(stop=flexible_stop)
def operation_with_flexible_stopping():
pass
# Stop before hitting rate limit reset time
@retry(
stop=stop_before_delay(3600), # API rate limit resets every hour
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def rate_limited_api_call():
pass
# Conservative database connection with multiple stop conditions
@retry(stop=stop_any(
stop_after_attempt(3), # Don't overwhelm DB
stop_after_delay(30), # Don't block application startup
stop_when_event_set(shutdown_event) # Respect shutdown requests
))
def connect_to_database():
pass
# Background task that should retry until explicitly stopped
@retry(
stop=stop_when_event_set(task_stop_event),
wait=wait_exponential(multiplier=1, max=300) # Max 5 minute wait
)
def background_sync_task():
pass
# Implement circuit breaker with time-based recovery
import time

# Number of failed attempts observed while the circuit is closed.
failure_count = 0
# time.monotonic() reading taken when the circuit opened; None while closed.
circuit_open_time = None


def circuit_breaker_stop(retry_state):
    """
    Stop callable implementing a simple circuit breaker.

    Each call made while the circuit is closed is counted as one failure
    (tenacity is expected to consult the stop callable only after an
    attempt that needs retrying -- confirm for the tenacity version in use).
    On the fifth counted failure the circuit opens and this function
    returns True for the following 60 seconds. The first call after that
    window resets the counter and closes the circuit again.

    Parameters:
    - retry_state: State of the current retry session (not inspected)

    Returns:
    True while the circuit is open (stop retrying), False otherwise
    """
    global failure_count, circuit_open_time

    # Only count failures while the circuit is closed; calls that arrive
    # while it is already open must not inflate the counter.
    if circuit_open_time is None:
        failure_count += 1

    # Open circuit after 5 failures
    if failure_count >= 5:
        if circuit_open_time is None:
            # Monotonic clock: the interval is immune to wall-clock changes.
            circuit_open_time = time.monotonic()

        # Keep circuit open for 60 seconds
        if time.monotonic() - circuit_open_time < 60:
            return True

        # Reset circuit after timeout
        failure_count = 0
        circuit_open_time = None

    return False
@retry(stop=circuit_breaker_stop)
def circuit_protected_operation():
pass
from datetime import timedelta
# All equivalent - 30 second time limit.
# These are alternative spellings of the same stop condition; they are
# shown as separate objects because stacking several @retry decorators on
# one function would nest the retry loops instead of expressing a choice.
thirty_seconds_int = stop_after_delay(30)  # int seconds
thirty_seconds_float = stop_after_delay(30.0)  # float seconds
thirty_seconds_delta = stop_after_delay(timedelta(seconds=30))  # timedelta


# Complex time specifications
@retry(stop=stop_after_delay(timedelta(minutes=2, seconds=30)))
def precisely_timed_operation():
    """Placeholder operation whose retrying is limited to 2 min 30 s."""
    pass


import signal
import threading
# Global shutdown event for coordinated stopping
global_shutdown = threading.Event()
def signal_handler(signum, frame):
global_shutdown.set()
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# All retry operations respect global shutdown
@retry(stop=stop_any(
stop_after_attempt(5),
stop_when_event_set(global_shutdown)
))
def service_operation():
pass
# Testing with immediate stop
@retry(stop=stop_after_attempt(1))
def test_no_retry():
pass
# Testing with never stop (use with mock that eventually succeeds)
@retry(stop=stop_never, retry=retry_if_result(lambda x: x != "success"))
def test_until_success():
pass
# Cheap checks first in combinations
efficient_stop = stop_any(
stop_after_attempt(3), # Fast: simple counter check
stop_when_event_set(event), # Fast: single flag check
stop_after_delay(60) # Slower: time calculation
)
# For very long-running retry scenarios, prefer stop_before_delay so that
# the final wait cannot carry the total elapsed time past the limit
@retry(
stop=stop_before_delay(timedelta(hours=24)), # 24 hour limit
wait=wait_exponential(multiplier=1, max=3600) # Max 1 hour wait
)
def daily_sync_operation():
pass
This comprehensive coverage of stop conditions provides precise control over retry termination, enabling robust retry behavior that respects time limits, attempt limits, and external coordination requirements.
Install with Tessl CLI
npx tessl i tessl/pypi-tenacity