CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-api-core

Google API client core library providing common helpers, utilities, and components for Python client libraries

Pending
Overview
Eval results
Files

retry.mddocs/

Retry Logic

Configurable retry mechanisms with exponential backoff, custom predicates, and support for both synchronous and asynchronous operations including streaming. The retry system provides robust error recovery for transient failures in API communication.

Capabilities

Unary Retry Classes

Retry decorators for single request-response operations with exponential backoff and configurable retry conditions.

class Retry:
    """
    Retry decorator for synchronous unary operations.
    
    Args:
        predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
        initial (float): Initial delay between retries in seconds (default: 1.0)
        maximum (float): Maximum delay between retries in seconds (default: 60.0)  
        multiplier (float): Multiplier for exponential backoff (default: 2.0)
        deadline (float): Total time limit for all retry attempts in seconds (default: 120.0)
        on_error (Callable[[Exception], None]): Optional callback for each retry attempt
    """
    def __init__(self, predicate=None, initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0, on_error=None): ...
    
    def __call__(self, func): ...
    def with_deadline(self, deadline): ...
    def with_delay(self, initial=None, maximum=None, multiplier=None): ...
    def with_predicate(self, predicate): ...

class AsyncRetry:
    """
    Retry decorator for asynchronous unary operations.
    
    Args:
        predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
        initial (float): Initial delay between retries in seconds (default: 1.0)
        maximum (float): Maximum delay between retries in seconds (default: 60.0)
        multiplier (float): Multiplier for exponential backoff (default: 2.0)
        deadline (float): Total time limit for all retry attempts in seconds (default: 120.0)
        on_error (Callable[[Exception], None]): Optional callback for each retry attempt
    """
    def __init__(self, predicate=None, initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0, on_error=None): ...
    
    def __call__(self, func): ...
    def with_deadline(self, deadline): ...
    def with_delay(self, initial=None, maximum=None, multiplier=None): ...
    def with_predicate(self, predicate): ...

Streaming Retry Classes

Retry decorators for streaming operations that can handle connection interruptions and resume streaming.

class StreamingRetry:
    """
    Retry decorator for synchronous streaming operations.
    
    Args:
        predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
        initial (float): Initial delay between retries in seconds (default: 1.0)  
        maximum (float): Maximum delay between retries in seconds (default: 60.0)
        multiplier (float): Multiplier for exponential backoff (default: 2.0)
        deadline (float): Total time limit for all retry attempts in seconds (default: 120.0)
        on_error (Callable[[Exception], None]): Optional callback for each retry attempt
    """
    def __init__(self, predicate=None, initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0, on_error=None): ...
    
    def __call__(self, func): ...
    def with_deadline(self, deadline): ...
    def with_delay(self, initial=None, maximum=None, multiplier=None): ...
    def with_predicate(self, predicate): ...

class AsyncStreamingRetry:
    """
    Retry decorator for asynchronous streaming operations.
    
    Args:
        predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
        initial (float): Initial delay between retries in seconds (default: 1.0)
        maximum (float): Maximum delay between retries in seconds (default: 60.0)
        multiplier (float): Multiplier for exponential backoff (default: 2.0) 
        deadline (float): Total time limit for all retry attempts in seconds (default: 120.0)
        on_error (Callable[[Exception], None]): Optional callback for each retry attempt
    """
    def __init__(self, predicate=None, initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0, on_error=None): ...
    
    def __call__(self, func): ...
    def with_deadline(self, deadline): ...
    def with_delay(self, initial=None, maximum=None, multiplier=None): ...
    def with_predicate(self, predicate): ...

Retry Utility Functions

Core retry mechanism and helper functions for implementing retry logic.

def retry_target(target, predicate, sleep_generator, deadline=None, on_error=None):
    """
    Call a function and retry on transient errors.
    
    Args:
        target (Callable): Function to call
        predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
        sleep_generator: Iterator yielding sleep durations between retries
        deadline (float, optional): Deadline for all retry attempts in seconds
        on_error (Callable[[Exception], None], optional): Callback for each retry attempt
        
    Returns:
        Any: Result of successful target function call
        
    Raises:
        RetryError: When all retry attempts are exhausted
    """

async def retry_target_async(target, predicate, sleep_generator, deadline=None, on_error=None):
    """
    Async version of retry_target.
    
    Args:
        target (Callable): Async function to call
        predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
        sleep_generator: Iterator yielding sleep durations between retries
        deadline (float, optional): Deadline for all retry attempts in seconds
        on_error (Callable[[Exception], None], optional): Callback for each retry attempt
        
    Returns:
        Any: Result of successful target function call
        
    Raises:
        RetryError: When all retry attempts are exhausted
    """

def retry_target_stream(target, predicate, sleep_generator, deadline=None, on_error=None):
    """
    Call a streaming function and retry on transient errors.
    
    Args:
        target (Callable): Streaming function to call
        predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
        sleep_generator: Iterator yielding sleep durations between retries  
        deadline (float, optional): Deadline for all retry attempts in seconds
        on_error (Callable[[Exception], None], optional): Callback for each retry attempt
        
    Returns:
        Iterator: Stream results from successful target function call
        
    Raises:
        RetryError: When all retry attempts are exhausted
    """

async def retry_target_stream_async(target, predicate, sleep_generator, deadline=None, on_error=None):
    """
    Async version of retry_target_stream.
    
    Args:
        target (Callable): Async streaming function to call
        predicate (Callable[[Exception], bool]): Function to determine if exception should trigger retry
        sleep_generator: Iterator yielding sleep durations between retries
        deadline (float, optional): Deadline for all retry attempts in seconds
        on_error (Callable[[Exception], None], optional): Callback for each retry attempt
        
    Returns:
        AsyncIterator: Async stream results from successful target function call
        
    Raises:
        RetryError: When all retry attempts are exhausted
    """

Sleep Generation and Predicates

Utilities for generating retry delays and determining retry conditions.

def exponential_sleep_generator(initial_delay, maximum_delay, multiplier):
    """
    Generate exponentially increasing sleep delays.
    
    Args:
        initial_delay (float): Initial delay in seconds
        maximum_delay (float): Maximum delay in seconds
        multiplier (float): Multiplier for exponential backoff
        
    Yields:
        float: Sleep duration in seconds for each retry attempt
    """

def if_exception_type(*exception_types):
    """
    Create a predicate that returns True if exception is one of the specified types.
    
    Args:
        *exception_types: Exception classes to match
        
    Returns:
        Callable[[Exception], bool]: Predicate function
    """

def if_transient_error(exception):
    """
    Predicate that returns True for transient errors that should be retried.
    
    Args:
        exception (Exception): Exception to check
        
    Returns:
        bool: True if exception represents a transient error
    """

def build_retry_error(exc_list, reason):
    """
    Build a RetryError from a list of exceptions and failure reason.
    
    Args:
        exc_list (List[Exception]): List of exceptions encountered during retries
        reason (RetryFailureReason): Reason for retry failure
        
    Returns:
        RetryError: Constructed retry error with exception history
    """

Retry Failure Reasons

Enumeration of reasons why retry attempts failed.

from enum import Enum

class RetryFailureReason(Enum):
    """Reasons why retry attempts failed."""
    TIMEOUT = "timeout"
    NON_RETRYABLE_ERROR = "non_retryable_error" 
    EMPTY_GENERATOR = "empty_generator"

Usage Examples

Basic Retry Configuration

from google.api_core import retry
from google.api_core import exceptions
import requests

# Configure retry with exponential backoff
retry_config = retry.Retry(
    predicate=retry.if_exception_type(
        exceptions.InternalServerError,
        exceptions.ServiceUnavailable,
        exceptions.DeadlineExceeded
    ),
    initial=1.0,
    maximum=60.0,
    multiplier=2.0,
    deadline=300.0
)

@retry_config
def make_api_call():
    response = requests.get("https://api.example.com/data")
    if response.status_code >= 500:
        raise exceptions.InternalServerError("Server error")
    return response.json()

# Call with automatic retry
try:
    data = make_api_call()
    print("Success:", data)
except retry.RetryError as e:
    print("All retry attempts failed:", e)

Custom Retry Predicate

from google.api_core import retry
from google.api_core import exceptions

def custom_predicate(exception):
    """Custom logic for determining if error should be retried."""
    if isinstance(exception, exceptions.TooManyRequests):
        return True
    if isinstance(exception, exceptions.ServiceUnavailable):
        return True
    if isinstance(exception, ConnectionError):
        return True
    return False

retry_config = retry.Retry(
    predicate=custom_predicate,
    initial=2.0,
    maximum=120.0,
    deadline=600.0
)

@retry_config
def complex_operation():
    # Operation that may fail with various errors
    pass

Async Retry Usage

import asyncio
from google.api_core import retry
from google.api_core import exceptions
import aiohttp

async_retry_config = retry.AsyncRetry(
    predicate=retry.if_exception_type(
        exceptions.ServiceUnavailable,
        aiohttp.ClientError
    ),
    initial=1.0,
    maximum=60.0,
    multiplier=1.5
)

@async_retry_config
async def async_api_call():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/data") as response:
            if response.status >= 500:
                raise exceptions.ServiceUnavailable("Server unavailable")
            return await response.json()

# Use async retry
async def main():
    try:
        data = await async_api_call()
        print("Success:", data)
    except retry.RetryError as e:
        print("Async retry failed:", e)

asyncio.run(main())

Streaming Retry Usage

from google.api_core import retry
from google.api_core import exceptions

streaming_retry = retry.StreamingRetry(
    predicate=retry.if_exception_type(
        exceptions.ServiceUnavailable,
        ConnectionError
    ),
    initial=0.5,
    maximum=30.0,
    deadline=180.0
)

@streaming_retry
def stream_data():
    """Generator function that yields data from a stream."""
    # Simulate streaming API call that may fail
    for i in range(100):
        if i == 50:  # Simulate transient error
            raise exceptions.ServiceUnavailable("Temporary unavailable")
        yield f"data_item_{i}"

# Use streaming retry
try:
    for item in stream_data():
        print(f"Received: {item}")
except retry.RetryError as e:
    print("Streaming retry exhausted:", e)

Using Retry Target Directly

from google.api_core import retry
import time

def unreliable_function():
    """Function that fails randomly."""
    import random
    if random.random() < 0.7:  # 70% failure rate
        raise ConnectionError("Random failure")
    return "Success!"

# Use retry_target directly without decorator
sleep_gen = retry.exponential_sleep_generator(1.0, 60.0, 2.0)
predicate = retry.if_exception_type(ConnectionError)

try:
    result = retry.retry_target(
        target=unreliable_function,
        predicate=predicate, 
        sleep_generator=sleep_gen,
        deadline=120.0,
        on_error=lambda exc: print(f"Retry attempt failed: {exc}")
    )
    print("Final result:", result)
except retry.RetryError as e:
    print("All retries exhausted:", e)

Install with Tessl CLI

npx tessl i tessl/pypi-google-api-core

docs

bidirectional-streaming.md

client-config.md

datetime.md

exceptions.md

gapic-framework.md

iam-policies.md

index.md

operations.md

page-iteration.md

path-templates.md

protobuf-helpers.md

retry.md

timeout.md

transport.md

universe-domain.md

tile.json