HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems
—
Comprehensive error handling framework with gRPC status codes, custom exceptions, detailed error information, proper exception propagation for both sync and async contexts, and service-side error management capabilities.
Standard gRPC status codes for consistent error reporting across all RPC operations.
class StatusCode(enum.Enum):
    """Mirrors grpc_status_code in the gRPC Core.

    Each member's value is a ``(numeric_code, description)`` pair so that
    every member is a distinct enum member. Assigning one shared placeholder
    value to all members would make ``enum`` treat every member after ``OK``
    as an alias of ``OK``, and status comparisons could never tell them apart.
    """

    OK = (0, "ok")  # Not an error; returned on success
    CANCELLED = (1, "cancelled")  # The operation was cancelled (typically by the caller)
    UNKNOWN = (2, "unknown")  # Unknown error
    INVALID_ARGUMENT = (3, "invalid argument")  # Client specified an invalid argument
    DEADLINE_EXCEEDED = (4, "deadline exceeded")  # Deadline expired before operation could complete
    NOT_FOUND = (5, "not found")  # Some requested entity was not found
    ALREADY_EXISTS = (6, "already exists")  # Some entity that we attempted to create already exists
    PERMISSION_DENIED = (7, "permission denied")  # The caller does not have permission to execute the operation
    UNAUTHENTICATED = (16, "unauthenticated")  # The request does not have valid authentication credentials
    RESOURCE_EXHAUSTED = (8, "resource exhausted")  # Some resource has been exhausted (e.g., per-user quota)
    FAILED_PRECONDITION = (9, "failed precondition")  # Operation was rejected because system is not in required state
    ABORTED = (10, "aborted")  # The operation was aborted, typically due to concurrency issue
    OUT_OF_RANGE = (11, "out of range")  # Operation was attempted past the valid range
    UNIMPLEMENTED = (12, "unimplemented")  # Operation is not implemented or not supported/enabled
    INTERNAL = (13, "internal")  # Internal errors; invariants expected by underlying system broken
    UNAVAILABLE = (14, "unavailable")  # The service is currently unavailable
    DATA_LOSS = (15, "data loss")  # Unrecoverable data loss or corruption


# Usage Examples:
# Client-side status code handling
try:
    response = stub.MyMethod(request)
except grpc.RpcError as rpc_error:
    # Fixed messages for the status codes this example treats specially
    known_messages = {
        grpc.StatusCode.NOT_FOUND: "Resource not found",
        grpc.StatusCode.PERMISSION_DENIED: "Access denied",
        grpc.StatusCode.DEADLINE_EXCEEDED: "Request timed out",
        grpc.StatusCode.UNAVAILABLE: "Service unavailable - retrying might help",
    }
    message = known_messages.get(rpc_error.code())
    if message is None:
        # Any other status: report the code together with the server's details
        message = f"RPC failed: {rpc_error.code()} - {rpc_error.details()}"
    print(message)
# Server-side status code setting
class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Example servicer showing two ways of reporting a non-OK status."""

    def GetUser(self, request, context):
        """Return the requested user, or report NOT_FOUND / PERMISSION_DENIED."""
        user = self.find_user(request.user_id)
        if not user:
            # Record the status on the context and finish with an empty response
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {request.user_id} not found")
            return my_service_pb2.GetUserResponse()

        allowed = self.has_permission(context, user)
        if not allowed:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, "Access denied")

        return my_service_pb2.GetUserResponse(user=user)


# Client-side exception hierarchy for handling RPC failures with comprehensive error information.
class RpcError(Exception):
    """Signals that an RPC terminated with a non-OK status.

    Raised by the gRPC library. It also implements the Call interface, which
    gives access to the RPC's metadata and status.
    """

    def code(self) -> StatusCode:
        """Return the status code sent by the server."""

    def details(self) -> str:
        """Return the details sent by the server."""

    def initial_metadata(self):
        """Return the initial metadata sent by the server."""

    def trailing_metadata(self):
        """Return the trailing metadata sent by the server."""
class FutureTimeoutError(Exception):
    """Signals that a method call on a Future timed out."""
class FutureCancelledError(Exception):
    """Signals that the computation underlying a Future was cancelled."""


# Usage Examples:
# Comprehensive error handling
def handle_rpc_call():
    """Call ``stub.MyMethod`` and react to the different ways it can fail.

    Returns the RPC response, or the result of ``retry_rpc_call()`` for the
    two status codes that are retried. Every other failure is re-raised.
    """
    try:
        return stub.MyMethod(request, timeout=10.0)
    except grpc.RpcError as rpc_error:
        # Access detailed error information
        print(f"RPC failed with status: {rpc_error.code()}")
        print(f"Error details: {rpc_error.details()}")

        # Access metadata for debugging
        initial_md = dict(rpc_error.initial_metadata())
        trailing_md = dict(rpc_error.trailing_metadata())
        print(f"Server metadata: initial={initial_md}, trailing={trailing_md}")

        # Handle specific error conditions
        if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
            # Refresh authentication and retry
            refresh_credentials()
            return retry_rpc_call()
        if rpc_error.code() == grpc.StatusCode.RESOURCE_EXHAUSTED:
            # Implement backoff and retry
            time.sleep(1.0)
            return retry_rpc_call()

        # Log and re-raise for unhandled errors
        log_error(f"Unhandled RPC error: {rpc_error}")
        raise
    except grpc.FutureTimeoutError:
        print("RPC future timed out")
        raise
    except grpc.FutureCancelledError:
        print("RPC was cancelled")
        raise
# Async error handling
async def handle_async_rpc():
    """Await ``stub.AsyncMethod``; report the failure and re-raise on error."""
    try:
        return await stub.AsyncMethod(request)
    except grpc.aio.AioRpcError as rpc_error:
        print(f"Async RPC failed: {rpc_error.code()} - {rpc_error.details()}")
        raise
    except asyncio.TimeoutError:
        print("Async operation timed out")
        raise


# Service-side context methods for controlling RPC status, error details, and graceful error handling.
class ServicerContext(RpcContext):
    """Server-side context object used for error management."""

    def abort(self, code: StatusCode, details: str):
        """Terminate the RPC with a non-OK status by raising an exception.

        Parameters:
        - code: the StatusCode to report; must not be StatusCode.OK
        - details: a UTF-8-encodable string to be sent to the client

        Raises:
        Exception: always, to signal the abortion of the RPC
        """

    def abort_with_status(self, status):
        """Terminate the RPC with a status object by raising an exception (EXPERIMENTAL).

        Parameters:
        - status: a grpc.Status object whose status code must not be StatusCode.OK

        Raises:
        Exception: always, to signal the abortion of the RPC
        """

    def set_code(self, code: StatusCode):
        """Set the status code to be used upon RPC completion.

        Parameters:
        - code: the StatusCode to be sent to the client
        """

    def set_details(self, details: str):
        """Set the detail string to be used upon RPC completion.

        Parameters:
        - details: a UTF-8-encodable string to be sent to the client
        """

    def code(self) -> StatusCode:
        """Access the status code to be used upon RPC completion (EXPERIMENTAL).

        Returns:
        StatusCode: the status code value for the RPC
        """

    def details(self) -> str:
        """Access the detail string to be used upon RPC completion (EXPERIMENTAL).

        Returns:
        str: the details string of the RPC
        """


# Usage Examples:
class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Example servicer demonstrating server-side error reporting."""

    def CreateUser(self, request, context):
        """Create a user, mapping each failure to a gRPC status code.

        Validation failures and known domain errors are reported with a
        specific status. Database and unexpected errors are logged with their
        traceback and reported to the client with a generic message only.
        """
        # Input validation
        if not request.username:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "Username is required",
            )
        if len(request.username) < 3:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "Username must be at least 3 characters long",
            )

        # Business logic with error handling
        try:
            user = self.user_service.create_user(request.username, request.email)
            return my_service_pb2.CreateUserResponse(user=user)
        except UserAlreadyExistsError:
            context.abort(
                grpc.StatusCode.ALREADY_EXISTS,
                f"User '{request.username}' already exists",
            )
        except ValidationError as e:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Validation failed: {e}",
            )
        except DatabaseError as e:
            # Log internal error (with traceback) but don't expose details to client
            logger.exception("Database error creating user: %s", e)
            context.abort(
                grpc.StatusCode.INTERNAL,
                "Internal server error",
            )
        except Exception as e:
            # Catch-all for unexpected errors; keep the traceback in the log
            logger.exception("Unexpected error: %s", e)
            context.abort(
                grpc.StatusCode.UNKNOWN,
                "An unexpected error occurred",
            )

    def GetUsers(self, request, context):
        """Return a page of users after authentication and authorization checks."""
        # Authentication check
        if not self.is_authenticated(context):
            context.abort(
                grpc.StatusCode.UNAUTHENTICATED,
                "Authentication required",
            )

        # Authorization check
        if not self.has_permission(context, "read_users"):
            context.abort(
                grpc.StatusCode.PERMISSION_DENIED,
                "Insufficient permissions to read users",
            )

        try:
            users = self.user_service.get_users(
                limit=request.limit,
                offset=request.offset,
            )
            return my_service_pb2.GetUsersResponse(users=users)
        except ResourceExhaustedError:
            context.abort(
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                "Too many requests - please try again later",
            )

    def StreamingMethod(self, request, context):
        """Example of error handling in streaming methods."""
        try:
            for item in self.get_stream_data(request):
                # Check if client cancelled
                if not context.is_active():
                    logger.info("Client cancelled streaming request")
                    break
                yield my_service_pb2.StreamResponse(data=item)
        except DataCorruptionError as e:
            # Set status but let the method complete normally
            context.set_code(grpc.StatusCode.DATA_LOSS)
            context.set_details(f"Data corruption detected: {e}")
        except Exception as e:
            # Keep the traceback in the log; the client only sees a generic message
            logger.exception("Streaming error: %s", e)
            context.abort(grpc.StatusCode.INTERNAL, "Stream processing failed")
# Graceful error handling with cleanup
class DatabaseServicer(my_service_pb2_grpc.DatabaseServicer):
    """Example servicer that rolls back its transaction before aborting an RPC."""

    @staticmethod
    def _rollback_if_started(transaction):
        """Roll back *transaction* unless it was never begun (still ``None``)."""
        if transaction:
            transaction.rollback()

    def ProcessTransaction(self, request, context):
        """Run all requested operations in one transaction.

        Returns a successful TransactionResponse after commit. On any failure
        the transaction (if one was started) is rolled back and the RPC is
        aborted with a status describing the failure class.
        """
        transaction = None
        try:
            # Start transaction
            transaction = self.db.begin_transaction()

            # Process operations
            for operation in request.operations:
                self.execute_operation(transaction, operation)

            # Commit transaction
            transaction.commit()
            return my_service_pb2.TransactionResponse(success=True)
        except ValidationError as e:
            self._rollback_if_started(transaction)
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Invalid operation: {e}",
            )
        except ConcurrencyError:
            self._rollback_if_started(transaction)
            context.abort(
                grpc.StatusCode.ABORTED,
                "Transaction aborted due to concurrent modification",
            )
        except Exception as e:
            self._rollback_if_started(transaction)
            # Keep the traceback in the log; the client only sees a generic message
            logger.exception("Transaction failed: %s", e)
            context.abort(
                grpc.StatusCode.INTERNAL,
                "Transaction processing failed",
            )


# Status object interface for comprehensive error information (EXPERIMENTAL).
class Status(abc.ABC):
    """Description of an RPC's status (EXPERIMENTAL).

    Attributes:
    - code: the StatusCode object to be sent to the client
    - details: a UTF-8-encodable string to be sent upon termination
    - trailing_metadata: the trailing metadata in the RPC
    """


# Common patterns for robust error handling in gRPC applications.
Retry Logic:
import time
import random
def exponential_backoff_retry(rpc_func, max_retries=3, base_delay=1.0):
    """Retry RPC with exponential backoff.

    Parameters:
    - rpc_func: zero-argument callable that performs the RPC
    - max_retries: number of retries after the first attempt (must be >= 0)
    - base_delay: delay in seconds before the first retry; doubled for each
      later retry, plus up to one second of random jitter

    Returns:
    Whatever ``rpc_func`` returns on its first successful attempt.

    Raises:
    ValueError: if ``max_retries`` is negative (otherwise the RPC would never
    be attempted and ``None`` would be returned silently)
    grpc.RpcError: if the error is not transient or the retries are exhausted
    """
    if max_retries < 0:
        raise ValueError(f"max_retries must be >= 0, got {max_retries}")

    for attempt in range(max_retries + 1):
        try:
            return rpc_func()
        except grpc.RpcError as e:
            if attempt == max_retries:
                raise  # Final attempt, re-raise

            # Only retry on transient errors
            transient_codes = (
                grpc.StatusCode.UNAVAILABLE,
                grpc.StatusCode.DEADLINE_EXCEEDED,
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                grpc.StatusCode.ABORTED,
            )
            if e.code() not in transient_codes:
                raise  # Don't retry non-transient errors

            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Retrying in {delay:.2f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(delay)
# Usage
def make_rpc():
    """Perform the single RPC attempt that the retry helper wraps."""
    return stub.MyMethod(request, timeout=10.0)


response = exponential_backoff_retry(make_rpc)

# Circuit Breaker Pattern:
import time
from collections import defaultdict
from enum import Enum
class CircuitState(Enum):
    """States a circuit breaker can be in."""

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Stops calling a failing RPC until a cool-down period has passed.

    Parameters:
    - failure_threshold: number of consecutive failures that opens the circuit
    - timeout: seconds the circuit stays open before one probe call is allowed
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        # Monotonic timestamp of the most recent failure; only read while OPEN
        self.last_failure_time = 0
        self.state = CircuitState.CLOSED

    def call(self, rpc_func):
        """Invoke ``rpc_func`` through the breaker and return its result.

        Raises:
        grpc.RpcError: if the circuit is open and the cool-down has not
        elapsed, or if ``rpc_func`` itself raises it (re-raised after the
        failure has been recorded)
        """
        if self.state == CircuitState.OPEN:
            # Use the monotonic clock: wall-clock adjustments must not shorten
            # or extend the cool-down period.
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                # NOTE(review): a bare grpc.RpcError may not provide
                # code()/details() - confirm callers do not rely on them here.
                raise grpc.RpcError("Circuit breaker is OPEN")

        try:
            result = rpc_func()
        except grpc.RpcError:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()
            # A failed probe re-opens the circuit immediately; otherwise open
            # once the failure threshold has been reached.
            if (
                self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold
            ):
                self.state = CircuitState.OPEN
            raise

        # Success - reset failure count
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
        self.failure_count = 0
        return result
# Usage
circuit_breaker = CircuitBreaker()
response = circuit_breaker.call(
    lambda: stub.MyMethod(request)
)

# Structured Error Response:
# Server-side structured error details
import json
class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Example servicer that reports validation errors as structured metadata."""

    def ValidateData(self, request, context):
        """Validate the request, aborting with INVALID_ARGUMENT on any problem.

        All problems are collected first and sent to the client as a JSON
        document in the ``error-details`` trailing-metadata entry.
        """
        problems = []

        # Collect all validation errors
        email_problem = None
        if not request.email:
            email_problem = "Email is required"
        elif not self.is_valid_email(request.email):
            email_problem = "Invalid email format"
        if email_problem is not None:
            problems.append({"field": "email", "message": email_problem})

        if not request.age or request.age < 0:
            problems.append({"field": "age", "message": "Age must be a positive number"})

        if problems:
            # Include structured error details in metadata
            payload = json.dumps({"validation_errors": problems})
            context.set_trailing_metadata([("error-details", payload)])
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Validation failed: {len(problems)} errors found",
            )

        return my_service_pb2.ValidationResponse(valid=True)
# Client-side structured error handling
try:
    response = stub.ValidateData(request)
except grpc.RpcError as rpc_error:
    if rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT:
        trailing = dict(rpc_error.trailing_metadata())
        if "error-details" in trailing:
            # The server packs its validation errors as JSON in this entry
            parsed = json.loads(trailing["error-details"])
            print("Validation errors:")
            for problem in parsed["validation_errors"]:
                print(f" {problem['field']}: {problem['message']}")


class RpcContext(abc.ABC):
    """Provides RPC-related information and action."""

    def is_active(self) -> bool:
        """Tell whether the RPC is active or has terminated."""

    def time_remaining(self):
        """Tell the length of allowed time remaining for the RPC."""

    def cancel(self):
        """Cancel the RPC. Idempotent; has no effect if already terminated."""

    def add_callback(self, callback) -> bool:
        """Register a callback to be called on RPC termination."""
class Call(RpcContext):
    """Invocation-side utility object for an RPC."""

    def initial_metadata(self):
        """Access the initial metadata sent by the server."""

    def trailing_metadata(self):
        """Access the trailing metadata sent by the server."""

    def code(self) -> StatusCode:
        """Access the status code sent by the server."""

    def details(self) -> str:
        """Access the details sent by the server."""


# Install with Tessl CLI
npx tessl i tessl/pypi-grpcio