Google API client core library providing common helpers, utilities, and components for Python client libraries
—
Management of Google API long-running operations with polling, cancellation, and both synchronous and asynchronous support. The operations system handles the complexities of tracking operation status, retrieving results, and managing timeouts for operations that may take minutes or hours to complete.
Operations that can be polled synchronously until completion with configurable polling intervals and timeouts.
class Operation:
    """
    A Future for Google API long-running operations.

    Args:
        operation: Initial operation response from the API
        refresh (Callable): Function to refresh operation status
        cancel (Callable): Function to cancel the operation
        result_type (type, optional): Type to deserialize operation result
        metadata_type (type, optional): Type to deserialize operation metadata
        retry (Retry, optional): Retry configuration for polling
    """

    def __init__(self, operation, refresh, cancel, result_type=None, metadata_type=None, retry=None): ...

    def done(self):
        """
        Check if the operation is complete.

        Returns:
            bool: True if operation is complete, False otherwise
        """

    def running(self):
        """
        Check if the operation is currently running.

        Returns:
            bool: True if operation is running, False otherwise
        """

    def result(self, timeout=None):
        """
        Get the result of the operation, blocking until complete.

        Args:
            timeout (float, optional): Maximum time to wait in seconds

        Returns:
            Any: The operation result

        Raises:
            GoogleAPICallError: If operation failed
            TimeoutError: If timeout exceeded
        """

    def exception(self, timeout=None):
        """
        Get the exception from a failed operation.

        Args:
            timeout (float, optional): Maximum time to wait in seconds

        Returns:
            Exception or None: Exception if operation failed, None if successful
        """

    def add_done_callback(self, callback):
        """
        Add a callback to be executed when operation completes.

        Args:
            callback (Callable): Function to call when operation completes
        """

    def cancel(self):
        """
        Cancel the operation.

        Returns:
            bool: True if cancellation was successful
        """

    def cancelled(self):
        """
        Check if the operation was cancelled.

        Returns:
            bool: True if operation was cancelled
        """

    @property
    def metadata(self):
        """
        Get operation metadata.

        Returns:
            Any: Operation metadata if available
        """

    @property
    def name(self):
        """
        Get the operation name/identifier.

        Returns:
            str: Operation name
        """

Async versions of operations that integrate with Python's asyncio event loop.
class AsyncOperation:
"""
An async Future for Google API long-running operations.
Args:
operation: Initial operation response from the API
refresh (Callable): Async function to refresh operation status
cancel (Callable): Async function to cancel the operation
result_type (type, optional): Type to deserialize operation result
metadata_type (type, optional): Type to deserialize operation metadata
retry (AsyncRetry, optional): Async retry configuration for polling
"""
def __init__(self, operation, refresh, cancel, result_type=None, metadata_type=None, retry=None): ...
async def done(self):
"""
Check if the operation is complete.
Returns:
bool: True if operation is complete, False otherwise
"""
def running(self):
"""
Check if the operation is currently running.
Returns:
bool: True if operation is running, False otherwise
"""
async def result(self, timeout=None):
"""
Get the result of the operation, awaiting until complete.
Args:
timeout (float, optional): Maximum time to wait in seconds
Returns:
Any: The operation result
Raises:
GoogleAPICallError: If operation failed
asyncio.TimeoutError: If timeout exceeded
"""
async def exception(self, timeout=None):
"""
Get the exception from a failed operation.
Args:
timeout (float, optional): Maximum time to wait in seconds
Returns:
Exception or None: Exception if operation failed, None if successful
"""
def add_done_callback(self, callback):
"""
Add a callback to be executed when operation completes.
Args:
callback (Callable): Function to call when operation completes
"""
async def cancel(self):
"""
Cancel the operation.
Returns:
bool: True if cancellation was successful
"""
def cancelled(self):
"""
Check if the operation was cancelled.
Returns:
bool: True if operation was cancelled
"""
@property
def metadata(self):
"""
Get operation metadata.
Returns:
Any: Operation metadata if available
"""
@property
def name(self):
"""
Get the operation name/identifier.
Returns:
str: Operation name
"""Enhanced operation handling for APIs with additional metadata and progress tracking.
class ExtendedOperation:
    """
    Extended operation with enhanced progress tracking and metadata.

    Args:
        operation: Initial operation response
        refresh (Callable): Function to refresh operation status
        cancel (Callable): Function to cancel operation
        polling_method (Callable, optional): Custom polling implementation
        result_type (type, optional): Type for operation result
        metadata_type (type, optional): Type for operation metadata
        retry (Retry, optional): Retry configuration
    """

    def __init__(self, operation, refresh, cancel, polling_method=None, result_type=None, metadata_type=None, retry=None): ...

    def done(self):
        """Check if operation is complete."""

    def result(self, timeout=None):
        """Get operation result with timeout."""

    def cancel(self):
        """Cancel the operation."""

    # NOTE(review): confirm that progress_percent and status_message exist in
    # the google-api-core release being targeted before relying on them.
    @property
    def progress_percent(self):
        """
        Get operation progress as percentage.

        Returns:
            float: Progress percentage (0.0 to 100.0)
        """

    @property
    def status_message(self):
        """
        Get current operation status message.

        Returns:
            str: Human-readable status message
        """

Helper functions for creating and managing operations.
def _refresh_http(api_request, operation_name, retry=None):
    """
    Refresh operation status via HTTP request.

    Args:
        api_request (Callable): HTTP request function
        operation_name (str): Name/ID of the operation
        retry (Retry, optional): Retry configuration for requests

    Returns:
        dict: Updated operation status
    """
def _cancel_http(api_request, operation_name):
    """
    Cancel operation via HTTP request.

    Args:
        api_request (Callable): HTTP request function
        operation_name (str): Name/ID of the operation

    Returns:
        dict: Cancellation response
    """

from google.api_core import operation
from google.api_core import retry
import requests
def refresh_operation(op_name, timeout=30.0):
    """Fetch the current status of an operation from the API.

    Args:
        op_name (str): Name/ID of the operation; interpolated into the URL path.
        timeout (float, optional): Seconds to wait for the HTTP response.
            Defaults to 30.0. Without a timeout ``requests`` may wait forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    response = requests.get(
        f"https://api.example.com/operations/{op_name}",
        timeout=timeout,
    )
    # Surface HTTP failures instead of passing an error payload off as
    # operation state.
    response.raise_for_status()
    return response.json()
def cancel_operation(op_name, timeout=30.0):
    """Ask the API to cancel an operation.

    Args:
        op_name (str): Name/ID of the operation; interpolated into the URL path.
        timeout (float, optional): Seconds to wait for the HTTP response.
            Defaults to 30.0. Without a timeout ``requests`` may wait forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    response = requests.delete(
        f"https://api.example.com/operations/{op_name}",
        timeout=timeout,
    )
    # A rejected cancellation must not look like a successful one.
    response.raise_for_status()
    return response.json()
# Start a long-running operation
initial_response = requests.post(
    "https://api.example.com/long-task",
    json={"task": "data_processing"},
    timeout=30.0,  # never wait indefinitely for the task to be accepted
)
# Stop here if the task was not accepted; an error body is not an operation.
initial_response.raise_for_status()
operation_data = initial_response.json()

# Create operation object
op = operation.Operation(
    operation_data,
    refresh=lambda: refresh_operation(operation_data["name"]),
    cancel=lambda: cancel_operation(operation_data["name"]),
    retry=retry.Retry(initial=1.0, maximum=30.0),
)

# Wait for completion
try:
    result = op.result(timeout=300)  # Wait up to 5 minutes
    print("Operation completed:", result)
except Exception as e:
    # Broad on purpose: this example reports any failure or timeout.
    print("Operation failed:", e)

import asyncio
from google.api_core import operation_async
from google.api_core import retry
import aiohttp
async def async_refresh_operation(op_name):
    """Fetch the operation's current status and return the decoded JSON body."""
    status_url = f"https://api.example.com/operations/{op_name}"
    # One short-lived session per call; session and response are both
    # released when the block exits.
    async with aiohttp.ClientSession() as http, http.get(status_url) as reply:
        payload = await reply.json()
    return payload
async def async_cancel_operation(op_name):
    """Send the cancel request for an operation and return the decoded JSON body."""
    cancel_url = f"https://api.example.com/operations/{op_name}"
    # One short-lived session per call; session and response are both
    # released when the block exits.
    async with aiohttp.ClientSession() as http, http.delete(cancel_url) as reply:
        payload = await reply.json()
    return payload
async def main():
    """Start a long-running task, wrap it in an AsyncOperation and await its result."""
    # Start async long-running operation
    task_request = {"task": "async_processing"}
    async with aiohttp.ClientSession() as http, http.post(
        "https://api.example.com/long-task", json=task_request
    ) as reply:
        operation_data = await reply.json()

    # The name is looked up on each call, as the polling hooks take no arguments.
    def poll_status():
        return async_refresh_operation(operation_data["name"])

    def request_cancel():
        return async_cancel_operation(operation_data["name"])

    # Create async operation
    async_op = operation_async.AsyncOperation(
        operation_data,
        refresh=poll_status,
        cancel=request_cancel,
        retry=retry.AsyncRetry(initial=1.0, maximum=30.0),
    )

    # Await completion
    try:
        outcome = await async_op.result(timeout=300)
    except Exception as e:
        print("Async operation failed:", e)
    else:
        print("Async operation completed:", outcome)
# Entry point: run the example coroutine to completion.
asyncio.run(main())

from google.api_core import operation
import threading
def operation_completed_callback(op):
    """Report the outcome of a finished operation on stdout.

    Args:
        op: The completed operation. It must expose ``name``, ``exception()``
            and ``result()``.
    """
    # Ask for the exception once and reuse it, so the failure branch does not
    # issue a second call.
    error = op.exception()
    if error is not None:
        print(f"Operation {op.name} failed: {error}")
    else:
        print(f"Operation {op.name} succeeded: {op.result()}")
# Create operation with callback
# (operation_data, refresh_func and cancel_func are placeholders that the
# reader must supply; they are not defined in this snippet.)
op = operation.Operation(
    operation_data,
    refresh=refresh_func,
    cancel=cancel_func
)

# Add completion callback
op.add_done_callback(operation_completed_callback)

# Start operation in background thread
# NOTE(review): upstream add_done_callback may already start its own polling
# thread — confirm whether this extra monitor thread is still needed.
def monitor_operation():
    try:
        op.result() # This will block until complete
    except Exception:
        pass # Callback will handle the result

thread = threading.Thread(target=monitor_operation)
thread.start()

# Continue with other work...
print("Operation running in background")

from google.api_core import operation
import time
import threading
# Start operation
# (operation_data, refresh_func and cancel_func are placeholders that the
# reader must supply; they are not defined in this snippet.)
op = operation.Operation(
    operation_data,
    refresh=refresh_func,
    cancel=cancel_func
)

# Monitor in background thread
def background_monitor():
    try:
        result = op.result(timeout=600) # 10 minute timeout
        print("Operation completed:", result)
    except Exception as e:
        # Any failure, including the timeout, is reported here.
        print("Operation error:", e)

thread = threading.Thread(target=background_monitor)
thread.start()

# Cancel after 30 seconds if still running
time.sleep(30)
if op.running():
    print("Cancelling long-running operation...")
    success = op.cancel()
    if success:
        print("Operation cancelled successfully")
    else:
        print("Failed to cancel operation")

# Wait for the monitor thread to finish before moving on.
thread.join()

from google.api_core import extended_operation
# Create extended operation with progress tracking
ext_op = extended_operation.ExtendedOperation(
operation_data,
refresh=refresh_func,
cancel=cancel_func
)
# Monitor progress
while not ext_op.done():
progress = ext_op.progress_percent
status = ext_op.status_message
print(f"Progress: {progress:.1f}% - {status}")
time.sleep(5)
# Get final result
if ext_op.exception():
print("Operation failed:", ext_op.exception())
else:
print("Operation completed:", ext_op.result())Install with Tessl CLI
npx tessl i tessl/pypi-google-api-core