CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-api-core

Google API client core library providing common helpers, utilities, and components for Python client libraries

Pending
Overview
Eval results
Files

operations.mddocs/

Long-Running Operations

Management of Google API long-running operations with polling, cancellation, and both synchronous and asynchronous support. The operations system handles the complexities of tracking operation status, retrieving results, and managing timeouts for operations that may take minutes or hours to complete.

Capabilities

Synchronous Operations

Operations that can be polled synchronously until completion with configurable polling intervals and timeouts.

class Operation:
    """
    A Future for Google API long-running operations.
    
    Args:
        operation: Initial operation response from the API
        refresh (Callable): Function to refresh operation status  
        cancel (Callable): Function to cancel the operation
        result_type (type, optional): Type to deserialize operation result
        metadata_type (type, optional): Type to deserialize operation metadata
        retry (Retry, optional): Retry configuration for polling
    """
    def __init__(self, operation, refresh, cancel, result_type=None, metadata_type=None, retry=None): ...
    
    def done(self):
        """
        Check if the operation is complete.
        
        Returns:
            bool: True if operation is complete, False otherwise
        """
    
    def running(self):
        """
        Check if the operation is currently running.
        
        Returns:
            bool: True if operation is running, False otherwise  
        """
    
    def result(self, timeout=None):
        """
        Get the result of the operation, blocking until complete.
        
        Args:
            timeout (float, optional): Maximum time to wait in seconds
            
        Returns:
            Any: The operation result
            
        Raises:
            GoogleAPICallError: If operation failed
            TimeoutError: If timeout exceeded
        """
    
    def exception(self, timeout=None):
        """
        Get the exception from a failed operation.
        
        Args:
            timeout (float, optional): Maximum time to wait in seconds
            
        Returns:
            Exception or None: Exception if operation failed, None if successful
        """
    
    def add_done_callback(self, callback):
        """
        Add a callback to be executed when operation completes.
        
        Args:
            callback (Callable): Function to call when operation completes
        """
    
    def cancel(self):
        """
        Cancel the operation.
        
        Returns:
            bool: True if cancellation was successful
        """
    
    def cancelled(self):
        """
        Check if the operation was cancelled.
        
        Returns:
            bool: True if operation was cancelled
        """
    
    @property
    def metadata(self):
        """
        Get operation metadata.
        
        Returns:
            Any: Operation metadata if available
        """
    
    @property
    def name(self):
        """
        Get the operation name/identifier.
        
        Returns:
            str: Operation name
        """

Asynchronous Operations

Async versions of operations that integrate with Python's asyncio event loop.

class AsyncOperation:
    """
    An async Future for Google API long-running operations.
    
    Args:
        operation: Initial operation response from the API
        refresh (Callable): Async function to refresh operation status
        cancel (Callable): Async function to cancel the operation  
        result_type (type, optional): Type to deserialize operation result
        metadata_type (type, optional): Type to deserialize operation metadata
        retry (AsyncRetry, optional): Async retry configuration for polling
    """
    def __init__(self, operation, refresh, cancel, result_type=None, metadata_type=None, retry=None): ...
    
    async def done(self):
        """
        Check if the operation is complete.
        
        Returns:
            bool: True if operation is complete, False otherwise
        """
    
    def running(self):
        """
        Check if the operation is currently running.
        
        Returns:
            bool: True if operation is running, False otherwise
        """
    
    async def result(self, timeout=None):
        """
        Get the result of the operation, awaiting until complete.
        
        Args:
            timeout (float, optional): Maximum time to wait in seconds
            
        Returns:
            Any: The operation result
            
        Raises:
            GoogleAPICallError: If operation failed
            asyncio.TimeoutError: If timeout exceeded
        """
    
    async def exception(self, timeout=None):
        """
        Get the exception from a failed operation.
        
        Args:
            timeout (float, optional): Maximum time to wait in seconds
            
        Returns:
            Exception or None: Exception if operation failed, None if successful
        """
    
    def add_done_callback(self, callback):
        """
        Add a callback to be executed when operation completes.
        
        Args:
            callback (Callable): Function to call when operation completes
        """
    
    async def cancel(self):
        """
        Cancel the operation.
        
        Returns:
            bool: True if cancellation was successful
        """
    
    def cancelled(self):
        """
        Check if the operation was cancelled.
        
        Returns:
            bool: True if operation was cancelled
        """
    
    @property
    def metadata(self):
        """
        Get operation metadata.
        
        Returns:
            Any: Operation metadata if available
        """
    
    @property
    def name(self):
        """
        Get the operation name/identifier.
        
        Returns:
            str: Operation name
        """

Extended Operations

Enhanced operation handling for APIs with additional metadata and progress tracking.

class ExtendedOperation:
    """
    Extended operation with enhanced progress tracking and metadata.
    
    Args:
        operation: Initial operation response
        refresh (Callable): Function to refresh operation status
        cancel (Callable): Function to cancel operation
        polling_method (Callable, optional): Custom polling implementation
        result_type (type, optional): Type for operation result
        metadata_type (type, optional): Type for operation metadata
        retry (Retry, optional): Retry configuration
    """
    def __init__(self, operation, refresh, cancel, polling_method=None, result_type=None, metadata_type=None, retry=None): ...
    
    def done(self):
        """Check if operation is complete."""
    
    def result(self, timeout=None):
        """Get operation result with timeout."""
    
    def cancel(self):
        """Cancel the operation."""
    
    @property
    def progress_percent(self):
        """
        Get operation progress as percentage.
        
        Returns:
            float: Progress percentage (0.0 to 100.0)
        """
    
    @property
    def status_message(self):
        """
        Get current operation status message.
        
        Returns:
            str: Human-readable status message
        """

Operation Utility Functions

Helper functions for creating and managing operations.

def _refresh_http(api_request, operation_name, retry=None):
    """
    Refresh operation status via HTTP request.
    
    Args:
        api_request (Callable): HTTP request function
        operation_name (str): Name/ID of the operation
        retry (Retry, optional): Retry configuration for requests
        
    Returns:
        dict: Updated operation status
    """

def _cancel_http(api_request, operation_name):
    """
    Cancel operation via HTTP request.
    
    Args:
        api_request (Callable): HTTP request function  
        operation_name (str): Name/ID of the operation
        
    Returns:
        dict: Cancellation response
    """

Usage Examples

Basic Operation Usage

from google.api_core import operation
from google.api_core import retry
import requests

def refresh_operation(op_name):
    """Refresh operation status from API."""
    response = requests.get(f"https://api.example.com/operations/{op_name}")
    return response.json()

def cancel_operation(op_name):
    """Cancel operation via API."""
    response = requests.delete(f"https://api.example.com/operations/{op_name}")
    return response.json()

# Start a long-running operation
initial_response = requests.post("https://api.example.com/long-task", 
                                json={"task": "data_processing"})
operation_data = initial_response.json()

# Create operation object
op = operation.Operation(
    operation_data,
    refresh=lambda: refresh_operation(operation_data["name"]),
    cancel=lambda: cancel_operation(operation_data["name"]),
    retry=retry.Retry(initial=1.0, maximum=30.0)
)

# Wait for completion
try:
    result = op.result(timeout=300)  # Wait up to 5 minutes
    print("Operation completed:", result)
except Exception as e:
    print("Operation failed:", e)

Async Operation Usage

import asyncio
from google.api_core import operation_async
from google.api_core import retry
import aiohttp

async def async_refresh_operation(op_name):
    """Async refresh of operation status."""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/operations/{op_name}") as response:
            return await response.json()

async def async_cancel_operation(op_name):
    """Async cancel operation.""" 
    async with aiohttp.ClientSession() as session:
        async with session.delete(f"https://api.example.com/operations/{op_name}") as response:
            return await response.json()

async def main():
    # Start async long-running operation
    async with aiohttp.ClientSession() as session:
        async with session.post("https://api.example.com/long-task",
                               json={"task": "async_processing"}) as response:
            operation_data = await response.json()
    
    # Create async operation
    async_op = operation_async.AsyncOperation(
        operation_data,
        refresh=lambda: async_refresh_operation(operation_data["name"]),
        cancel=lambda: async_cancel_operation(operation_data["name"]),
        retry=retry.AsyncRetry(initial=1.0, maximum=30.0)
    )
    
    # Await completion
    try:
        result = await async_op.result(timeout=300)
        print("Async operation completed:", result)
    except Exception as e:
        print("Async operation failed:", e)

asyncio.run(main())

Operation with Callbacks

from google.api_core import operation
import threading

def operation_completed_callback(op):
    """Callback executed when operation completes."""
    if op.exception():
        print(f"Operation {op.name} failed: {op.exception()}")
    else:
        print(f"Operation {op.name} succeeded: {op.result()}")

# Create operation with callback
op = operation.Operation(
    operation_data,
    refresh=refresh_func,
    cancel=cancel_func
)

# Add completion callback
op.add_done_callback(operation_completed_callback)

# Start operation in background thread
def monitor_operation():
    try:
        op.result()  # This will block until complete
    except Exception:
        pass  # Callback will handle the result

thread = threading.Thread(target=monitor_operation)
thread.start()

# Continue with other work...
print("Operation running in background")

Operation Cancellation

from google.api_core import operation
import time
import threading

# Start operation
op = operation.Operation(
    operation_data,
    refresh=refresh_func,
    cancel=cancel_func
)

# Monitor in background thread
def background_monitor():
    try:
        result = op.result(timeout=600)  # 10 minute timeout
        print("Operation completed:", result)
    except Exception as e:
        print("Operation error:", e)

thread = threading.Thread(target=background_monitor)
thread.start()

# Cancel after 30 seconds if still running
time.sleep(30)
if op.running():
    print("Cancelling long-running operation...")
    success = op.cancel()
    if success:
        print("Operation cancelled successfully")
    else:
        print("Failed to cancel operation")

thread.join()

Extended Operation with Progress

from google.api_core import extended_operation

# Create extended operation with progress tracking
ext_op = extended_operation.ExtendedOperation(
    operation_data,
    refresh=refresh_func,
    cancel=cancel_func
)

# Monitor progress
while not ext_op.done():
    progress = ext_op.progress_percent
    status = ext_op.status_message
    print(f"Progress: {progress:.1f}% - {status}")
    time.sleep(5)

# Get final result
if ext_op.exception():
    print("Operation failed:", ext_op.exception())
else:
    print("Operation completed:", ext_op.result())

Install with Tessl CLI

npx tessl i tessl/pypi-google-api-core

docs

bidirectional-streaming.md

client-config.md

datetime.md

exceptions.md

gapic-framework.md

iam-policies.md

index.md

operations.md

page-iteration.md

path-templates.md

protobuf-helpers.md

retry.md

timeout.md

transport.md

universe-domain.md

tile.json