Common protocol buffer definitions used across Google APIs and client libraries
—
Support for asynchronous operations that may take significant time to complete. Provides standard patterns for operation management, polling, result retrieval, and cancellation across Google services.
Core operation message representing asynchronous operations with metadata and results.
from google.longrunning.operations_proto_pb2 import Operation, OperationInfo
class Operation(message.Message):
"""Long-running operation representation."""
name: str # Operation resource name (unique identifier)
metadata: Any # Operation-specific metadata
done: bool # True when operation is complete
# Union field 'result' (one of):
error: Status # Error result if operation failed
response: Any # Success result if operation completed
class OperationInfo(message.Message):
"""Operation type information."""
response_type: str # Expected response message type
metadata_type: str # Expected metadata message typeStandard request messages for managing long-running operations.
from google.longrunning.operations_proto_pb2 import (
GetOperationRequest, ListOperationsRequest, ListOperationsResponse,
CancelOperationRequest, DeleteOperationRequest, WaitOperationRequest
)
class GetOperationRequest(message.Message):
"""Request to get an operation by name."""
name: str # Operation name to retrieve
class ListOperationsRequest(message.Message):
"""Request to list operations."""
name: str # Parent resource name for filtering
filter: str # Filter expression
page_size: int # Maximum number of operations to return
page_token: str # Page token for pagination
class ListOperationsResponse(message.Message):
"""Response containing list of operations."""
operations: list[Operation] # List of operations
next_page_token: str # Token for next page (if any)
class CancelOperationRequest(message.Message):
"""Request to cancel an operation."""
name: str # Operation name to cancel
class DeleteOperationRequest(message.Message):
"""Request to delete an operation."""
name: str # Operation name to delete
class WaitOperationRequest(message.Message):
"""Request to wait for operation completion."""
name: str # Operation name to wait for
timeout: Duration # Maximum time to waitfrom google.longrunning.operations_proto_pb2 import Operation, GetOperationRequest
from google.rpc.status_pb2 import Status
def check_operation_status(operation: Operation):
"""Check and handle operation status."""
if operation.done:
if operation.HasField('error'):
# Operation failed
print(f"Operation failed: {operation.error.message}")
return False
elif operation.HasField('response'):
# Operation completed successfully
print("Operation completed successfully")
return True
else:
# Operation still in progress
print(f"Operation {operation.name} is still running...")
return False
# Example usage
operation = Operation()
operation.name = "projects/my-project/operations/12345"
operation.done = False
# Check status
is_complete = check_operation_status(operation)import time
from google.longrunning.operations_proto_pb2 import GetOperationRequest
from google.protobuf.duration_pb2 import Duration
def poll_operation(operation_name: str, client, max_wait_seconds: int = 300):
"""Poll operation until completion or timeout."""
start_time = time.time()
poll_interval = 1 # Start with 1 second
max_poll_interval = 60 # Cap at 60 seconds
while time.time() - start_time < max_wait_seconds:
# Get current operation status
request = GetOperationRequest()
request.name = operation_name
operation = client.get_operation(request)
if operation.done:
if operation.HasField('error'):
raise Exception(f"Operation failed: {operation.error.message}")
return operation.response
# Wait before next poll with exponential backoff
time.sleep(poll_interval)
poll_interval = min(poll_interval * 1.5, max_poll_interval)
raise TimeoutError(f"Operation {operation_name} did not complete within {max_wait_seconds} seconds")from google.longrunning.operations_proto_pb2 import Operation
from google.protobuf.any_pb2 import Any
from google.protobuf.timestamp_pb2 import Timestamp
# Custom metadata message (example)
class CreateInstanceMetadata:
"""Example metadata for instance creation."""
def __init__(self):
self.start_time = Timestamp()
self.start_time.GetCurrentTime()
self.progress_percent = 0
self.status_message = "Starting instance creation"
def create_operation_with_metadata(operation_name: str, metadata) -> Operation:
"""Create operation with custom metadata."""
operation = Operation()
operation.name = operation_name
operation.done = False
# Pack metadata into Any field
metadata_any = Any()
metadata_any.Pack(metadata)
operation.metadata.CopyFrom(metadata_any)
return operationfrom google.protobuf.any_pb2 import Any
def extract_operation_result(operation: Operation, expected_type):
"""Extract strongly-typed result from operation."""
if not operation.done:
raise ValueError("Operation is not yet complete")
if operation.HasField('error'):
raise Exception(f"Operation failed: {operation.error.message}")
if not operation.HasField('response'):
raise ValueError("Operation completed but has no response")
# Unpack the response to expected type
result = expected_type()
operation.response.Unpack(result)
return result
# Example usage with custom response type
class CreateInstanceResponse:
"""Example response for instance creation."""
def __init__(self):
self.instance_id = ""
self.instance_name = ""
self.status = "RUNNING"
# Usage
# operation = get_completed_operation()
# result = extract_operation_result(operation, CreateInstanceResponse)
# print(f"Created instance: {result.instance_id}")from google.longrunning.operations_proto_pb2 import CancelOperationRequest
def cancel_operation(operation_name: str, client):
"""Cancel a long-running operation."""
request = CancelOperationRequest()
request.name = operation_name
try:
client.cancel_operation(request)
print(f"Cancellation requested for operation: {operation_name}")
# Check if cancellation was successful
get_request = GetOperationRequest()
get_request.name = operation_name
operation = client.get_operation(get_request)
if operation.done and operation.HasField('error'):
if operation.error.code == 1: # CANCELLED
print("Operation was successfully cancelled")
else:
print(f"Operation failed with error: {operation.error.message}")
else:
print("Cancellation may still be in progress")
except Exception as e:
print(f"Failed to cancel operation: {e}")from google.longrunning.operations_proto_pb2 import ListOperationsRequest
def list_operations(parent: str, client, filter_expr: str = "", page_size: int = 50):
"""List operations with filtering and pagination."""
request = ListOperationsRequest()
request.name = parent
request.filter = filter_expr
request.page_size = page_size
all_operations = []
while True:
response = client.list_operations(request)
all_operations.extend(response.operations)
# Check if there are more pages
if not response.next_page_token:
break
# Set up next page request
request.page_token = response.next_page_token
return all_operations
# Example usage
parent = "projects/my-project"
filter_expr = "done=false" # Only incomplete operations
operations = list_operations(parent, client, filter_expr)
for op in operations:
print(f"Operation: {op.name}, Done: {op.done}")from google.longrunning.operations_proto_pb2 import WaitOperationRequest
from google.protobuf.duration_pb2 import Duration
def wait_for_operation(operation_name: str, client, timeout_seconds: int = 300):
"""Wait for operation to complete with specified timeout."""
request = WaitOperationRequest()
request.name = operation_name
# Set timeout
request.timeout.seconds = timeout_seconds
try:
# This call blocks until operation completes or timeout
operation = client.wait_operation(request)
if operation.HasField('error'):
raise Exception(f"Operation failed: {operation.error.message}")
print(f"Operation {operation_name} completed successfully")
return operation.response
except Exception as e:
if "timeout" in str(e).lower():
print(f"Operation {operation_name} did not complete within {timeout_seconds} seconds")
raisefrom google.protobuf.any_pb2 import Any
def track_operation_progress(operation: Operation):
"""Extract and display progress information from operation metadata."""
if not operation.metadata:
print("No metadata available")
return
# Try to extract common progress information
# This would depend on the specific metadata type
try:
# Example for a hypothetical progress metadata
if operation.metadata.type_url.endswith("ProgressMetadata"):
# Unpack metadata (this is service-specific)
print(f"Operation {operation.name} progress tracking:")
print(f" Status: {'Complete' if operation.done else 'In Progress'}")
if operation.done:
if operation.HasField('error'):
print(f" Error: {operation.error.message}")
else:
print(" Result: Success")
else:
print(" Still processing...")
except Exception as e:
print(f"Could not extract progress information: {e}")
# Example usage in a monitoring loop
def monitor_operation(operation_name: str, client, check_interval: int = 10):
"""Monitor operation progress with periodic updates."""
while True:
request = GetOperationRequest()
request.name = operation_name
operation = client.get_operation(request)
track_operation_progress(operation)
if operation.done:
break
time.sleep(check_interval)Install with Tessl CLI
npx tessl i tessl/pypi-googleapis-common-protos