Google API client core library providing common helpers, utilities, and components for Python client libraries
—
Configurable retry mechanisms with exponential backoff, custom predicates, and support for both synchronous and asynchronous operations including streaming. The retry system provides robust error recovery for transient failures in API communication.
Retry decorators for single request-response operations with exponential backoff and configurable retry conditions.
class Retry:
"""
Retry decorator for synchronous unary operations.
Args:
predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
initial (float): Initial delay between retries in seconds (default: 1.0)
maximum (float): Maximum delay between retries in seconds (default: 60.0)
multiplier (float): Multiplier for exponential backoff (default: 2.0)
deadline (float): Total time limit for all retry attempts in seconds (default: 120.0)
on_error (Callable[[Exception], None]): Optional callback for each retry attempt
"""
def __init__(self, predicate=None, initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0, on_error=None): ...
def __call__(self, func): ...
def with_deadline(self, deadline): ...
def with_delay(self, initial=None, maximum=None, multiplier=None): ...
def with_predicate(self, predicate): ...
class AsyncRetry:
"""
Retry decorator for asynchronous unary operations.
Args:
predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
initial (float): Initial delay between retries in seconds (default: 1.0)
maximum (float): Maximum delay between retries in seconds (default: 60.0)
multiplier (float): Multiplier for exponential backoff (default: 2.0)
deadline (float): Total time limit for all retry attempts in seconds (default: 120.0)
on_error (Callable[[Exception], None]): Optional callback for each retry attempt
"""
def __init__(self, predicate=None, initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0, on_error=None): ...
def __call__(self, func): ...
def with_deadline(self, deadline): ...
def with_delay(self, initial=None, maximum=None, multiplier=None): ...
def with_predicate(self, predicate): ...Retry decorators for streaming operations that can handle connection interruptions and resume streaming.
class StreamingRetry:
"""
Retry decorator for synchronous streaming operations.
Args:
predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
initial (float): Initial delay between retries in seconds (default: 1.0)
maximum (float): Maximum delay between retries in seconds (default: 60.0)
multiplier (float): Multiplier for exponential backoff (default: 2.0)
deadline (float): Total time limit for all retry attempts in seconds (default: 120.0)
on_error (Callable[[Exception], None]): Optional callback for each retry attempt
"""
def __init__(self, predicate=None, initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0, on_error=None): ...
def __call__(self, func): ...
def with_deadline(self, deadline): ...
def with_delay(self, initial=None, maximum=None, multiplier=None): ...
def with_predicate(self, predicate): ...
class AsyncStreamingRetry:
"""
Retry decorator for asynchronous streaming operations.
Args:
predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
initial (float): Initial delay between retries in seconds (default: 1.0)
maximum (float): Maximum delay between retries in seconds (default: 60.0)
multiplier (float): Multiplier for exponential backoff (default: 2.0)
deadline (float): Total time limit for all retry attempts in seconds (default: 120.0)
on_error (Callable[[Exception], None]): Optional callback for each retry attempt
"""
def __init__(self, predicate=None, initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0, on_error=None): ...
def __call__(self, func): ...
def with_deadline(self, deadline): ...
def with_delay(self, initial=None, maximum=None, multiplier=None): ...
def with_predicate(self, predicate): ...Core retry mechanism and helper functions for implementing retry logic.
def retry_target(target, predicate, sleep_generator, deadline=None, on_error=None):
"""
Call a function and retry on transient errors.
Args:
target (Callable): Function to call
predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
sleep_generator: Iterator yielding sleep durations between retries
deadline (float, optional): Deadline for all retry attempts in seconds
on_error (Callable[[Exception], None], optional): Callback for each retry attempt
Returns:
Any: Result of successful target function call
Raises:
RetryError: When all retry attempts are exhausted
"""
async def retry_target_async(target, predicate, sleep_generator, deadline=None, on_error=None):
"""
Async version of retry_target.
Args:
target (Callable): Async function to call
predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
sleep_generator: Iterator yielding sleep durations between retries
deadline (float, optional): Deadline for all retry attempts in seconds
on_error (Callable[[Exception], None], optional): Callback for each retry attempt
Returns:
Any: Result of successful target function call
Raises:
RetryError: When all retry attempts are exhausted
"""
def retry_target_stream(target, predicate, sleep_generator, deadline=None, on_error=None):
"""
Call a streaming function and retry on transient errors.
Args:
target (Callable): Streaming function to call
predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
sleep_generator: Iterator yielding sleep durations between retries
deadline (float, optional): Deadline for all retry attempts in seconds
on_error (Callable[[Exception], None], optional): Callback for each retry attempt
Returns:
Iterator: Stream results from successful target function call
Raises:
RetryError: When all retry attempts are exhausted
"""
async def retry_target_stream_async(target, predicate, sleep_generator, deadline=None, on_error=None):
"""
Async version of retry_target_stream.
Args:
target (Callable): Async streaming function to call
predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
sleep_generator: Iterator yielding sleep durations between retries
deadline (float, optional): Deadline for all retry attempts in seconds
on_error (Callable[[Exception], None], optional): Callback for each retry attempt
Returns:
AsyncIterator: Async stream results from successful target function call
Raises:
RetryError: When all retry attempts are exhausted
"""Utilities for generating retry delays and determining retry conditions.
def exponential_sleep_generator(initial_delay, maximum_delay, multiplier):
"""
Generate exponentially increasing sleep delays.
Args:
initial_delay (float): Initial delay in seconds
maximum_delay (float): Maximum delay in seconds
multiplier (float): Multiplier for exponential backoff
Yields:
float: Sleep duration in seconds for each retry attempt
"""
def if_exception_type(*exception_types):
"""
Create a predicate that returns True if exception is one of the specified types.
Args:
*exception_types: Exception classes to match
Returns:
Callable[[Exception], bool]: Predicate function
"""
def if_transient_error(exception):
"""
Predicate that returns True for transient errors that should be retried.
Args:
exception (Exception): Exception to check
Returns:
bool: True if exception represents a transient error
"""
def build_retry_error(exc_list, reason):
"""
Build a RetryError from a list of exceptions and failure reason.
Args:
exc_list (List[Exception]): List of exceptions encountered during retries
reason (RetryFailureReason): Reason for retry failure
Returns:
RetryError: Constructed retry error with exception history
"""Enumeration of reasons why retry attempts failed.
from enum import Enum
class RetryFailureReason(Enum):
"""Reasons why retry attempts failed."""
TIMEOUT = "timeout"
NON_RETRYABLE_ERROR = "non_retryable_error"
EMPTY_GENERATOR = "empty_generator"from google.api_core import retry
from google.api_core import exceptions
import requests
# Configure retry with exponential backoff
retry_config = retry.Retry(
predicate=retry.if_exception_type(
exceptions.InternalServerError,
exceptions.ServiceUnavailable,
exceptions.DeadlineExceeded
),
initial=1.0,
maximum=60.0,
multiplier=2.0,
deadline=300.0
)
@retry_config
def make_api_call():
response = requests.get("https://api.example.com/data")
if response.status_code >= 500:
raise exceptions.InternalServerError("Server error")
return response.json()
# Call with automatic retry
try:
data = make_api_call()
print("Success:", data)
except retry.RetryError as e:
print("All retry attempts failed:", e)from google.api_core import retry
from google.api_core import exceptions
def custom_predicate(exception):
"""Custom logic for determining if error should be retried."""
if isinstance(exception, exceptions.TooManyRequests):
return True
if isinstance(exception, exceptions.ServiceUnavailable):
return True
if isinstance(exception, ConnectionError):
return True
return False
retry_config = retry.Retry(
predicate=custom_predicate,
initial=2.0,
maximum=120.0,
deadline=600.0
)
@retry_config
def complex_operation():
# Operation that may fail with various errors
passimport asyncio
from google.api_core import retry
from google.api_core import exceptions
import aiohttp
async_retry_config = retry.AsyncRetry(
predicate=retry.if_exception_type(
exceptions.ServiceUnavailable,
aiohttp.ClientError
),
initial=1.0,
maximum=60.0,
multiplier=1.5
)
@async_retry_config
async def async_api_call():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com/data") as response:
if response.status >= 500:
raise exceptions.ServiceUnavailable("Server unavailable")
return await response.json()
# Use async retry
async def main():
try:
data = await async_api_call()
print("Success:", data)
except retry.RetryError as e:
print("Async retry failed:", e)
asyncio.run(main())from google.api_core import retry
from google.api_core import exceptions
streaming_retry = retry.StreamingRetry(
predicate=retry.if_exception_type(
exceptions.ServiceUnavailable,
ConnectionError
),
initial=0.5,
maximum=30.0,
deadline=180.0
)
@streaming_retry
def stream_data():
"""Generator function that yields data from a stream."""
# Simulate streaming API call that may fail
for i in range(100):
if i == 50: # Simulate transient error
raise exceptions.ServiceUnavailable("Temporary unavailable")
yield f"data_item_{i}"
# Use streaming retry
try:
for item in stream_data():
print(f"Received: {item}")
except retry.RetryError as e:
print("Streaming retry exhausted:", e)from google.api_core import retry
import time
def unreliable_function():
"""Function that fails randomly."""
import random
if random.random() < 0.7: # 70% failure rate
raise ConnectionError("Random failure")
return "Success!"
# Use retry_target directly without decorator
sleep_gen = retry.exponential_sleep_generator(1.0, 60.0, 2.0)
predicate = retry.if_exception_type(ConnectionError)
try:
result = retry.retry_target(
target=unreliable_function,
predicate=predicate,
sleep_generator=sleep_gen,
deadline=120.0,
on_error=lambda exc: print(f"Retry attempt failed: {exc}")
)
print("Final result:", result)
except retry.RetryError as e:
print("All retries exhausted:", e)Install with Tessl CLI
npx tessl i tessl/pypi-google-api-core