CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-grpcio

HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems

Pending
Overview
Eval results
Files

error-handling.mddocs/

Error Handling and Status

gRPC's error-handling framework: standard status codes, client-side exceptions that carry detailed error information, exception propagation in both synchronous and asynchronous code, and server-side error management.

Capabilities

Status Codes

Standard gRPC status codes for consistent error reporting across all RPC operations.

class StatusCode(enum.Enum):
    """Mirrors grpc_status_code in the gRPC Core.

    The ``...`` on each member is a documentation placeholder: this listing
    names the members and their meaning only, not their concrete values.
    Members are compared by identity/equality in handlers, e.g.
    ``e.code() == grpc.StatusCode.NOT_FOUND``.
    """
    
    OK = ...                    # Not an error; returned on success
    CANCELLED = ...             # The operation was cancelled (typically by the caller)
    UNKNOWN = ...               # Unknown error
    INVALID_ARGUMENT = ...      # Client specified an invalid argument
    DEADLINE_EXCEEDED = ...     # Deadline expired before operation could complete
    NOT_FOUND = ...             # Some requested entity was not found
    ALREADY_EXISTS = ...        # Some entity that we attempted to create already exists
    PERMISSION_DENIED = ...     # The caller does not have permission to execute the operation
    UNAUTHENTICATED = ...       # The request does not have valid authentication credentials
    RESOURCE_EXHAUSTED = ...    # Some resource has been exhausted (e.g., per-user quota)
    FAILED_PRECONDITION = ...   # Operation was rejected because system is not in required state
    ABORTED = ...               # The operation was aborted, typically due to concurrency issue
    OUT_OF_RANGE = ...          # Operation was attempted past the valid range
    UNIMPLEMENTED = ...         # Operation is not implemented or not supported/enabled
    INTERNAL = ...              # Internal errors; invariants expected by underlying system broken
    UNAVAILABLE = ...           # The service is currently unavailable
    DATA_LOSS = ...             # Unrecoverable data loss or corruption

Usage Examples:

# Client-side status code handling
try:
    response = stub.MyMethod(request)
except grpc.RpcError as rpc_error:
    # Fixed messages for the status codes this example singles out; any
    # other code falls through to the generic report below.
    known_messages = {
        grpc.StatusCode.NOT_FOUND: "Resource not found",
        grpc.StatusCode.PERMISSION_DENIED: "Access denied",
        grpc.StatusCode.DEADLINE_EXCEEDED: "Request timed out",
        grpc.StatusCode.UNAVAILABLE: "Service unavailable - retrying might help",
    }
    status = rpc_error.code()
    if status in known_messages:
        print(known_messages[status])
    else:
        print(f"RPC failed: {status} - {rpc_error.details()}")

# Server-side status code setting
class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Servicer showing the two ways a handler can report a non-OK status."""

    def GetUser(self, request, context):
        """Return the requested user, or report NOT_FOUND / PERMISSION_DENIED.

        A missing user is reported with set_code/set_details and an empty
        response; a permission failure is reported with abort.
        """
        found = self.find_user(request.user_id)

        if found:
            if not self.has_permission(context, found):
                context.abort(grpc.StatusCode.PERMISSION_DENIED, "Access denied")
            return my_service_pb2.GetUserResponse(user=found)

        # No such user: record the status and finish the handler normally.
        context.set_code(grpc.StatusCode.NOT_FOUND)
        context.set_details(f"User {request.user_id} not found")
        return my_service_pb2.GetUserResponse()

RPC Exceptions

Client-side exception hierarchy for handling RPC failures with comprehensive error information.

class RpcError(Exception):
    """
    Raised by the gRPC library to indicate non-OK-status RPC termination.
    Also implements Call interface for accessing RPC metadata and status.

    The four accessors below are the same ones the Call type in this
    document declares: code(), details(), initial_metadata() and
    trailing_metadata().
    """
    
    def code(self) -> StatusCode:
        """Returns the status code sent by the server.

        The value is a StatusCode member, so handlers can compare it
        directly, e.g. ``e.code() == grpc.StatusCode.NOT_FOUND``.
        """
    
    def details(self) -> str:
        """Returns the details sent by the server (a human-readable string)."""
    
    def initial_metadata(self):
        """Returns the initial metadata sent by the server.

        The examples in this document pass the result to ``dict()``, i.e.
        they treat it as an iterable of key/value pairs.
        """
    
    def trailing_metadata(self):
        """Returns the trailing metadata sent by the server.

        The examples in this document pass the result to ``dict()``, i.e.
        they treat it as an iterable of key/value pairs.
        """

class FutureTimeoutError(Exception):
    """Indicates that a method call on a Future timed out.

    Derives directly from Exception rather than from RpcError, so an
    ``except grpc.RpcError`` clause does not catch it.
    """

class FutureCancelledError(Exception):
    """Indicates that the computation underlying a Future was cancelled.

    Derives directly from Exception rather than from RpcError, so an
    ``except grpc.RpcError`` clause does not catch it.
    """

Usage Examples:

# Comprehensive error handling
def handle_rpc_call():
    """Invoke ``stub.MyMethod`` with a 10 second timeout and handle failures.

    Returns:
        The response of ``stub.MyMethod``; or, after an UNAUTHENTICATED or
        RESOURCE_EXHAUSTED failure, whatever ``retry_rpc_call()`` returns.

    Raises:
        grpc.RpcError: re-raised (after logging) for every other status code.
        grpc.FutureTimeoutError: re-raised after printing a message.
        grpc.FutureCancelledError: re-raised after printing a message.
    """
    try:
        response = stub.MyMethod(request, timeout=10.0)
        return response
    except grpc.RpcError as e:
        status = e.code()

        # Access detailed error information
        print(f"RPC failed with status: {status}")
        print(f"Error details: {e.details()}")

        # Access metadata for debugging. The accessors may return None when
        # no metadata was received (e.g. the RPC failed before reaching the
        # server); dict(None) would raise TypeError and hide the real error.
        initial_md = dict(e.initial_metadata() or ())
        trailing_md = dict(e.trailing_metadata() or ())
        print(f"Server metadata: initial={initial_md}, trailing={trailing_md}")

        # Handle specific error conditions
        if status == grpc.StatusCode.UNAUTHENTICATED:
            # Refresh authentication and retry
            refresh_credentials()
            return retry_rpc_call()
        elif status == grpc.StatusCode.RESOURCE_EXHAUSTED:
            # Implement backoff and retry
            time.sleep(1.0)
            return retry_rpc_call()
        else:
            # Log and re-raise for unhandled errors
            log_error(f"Unhandled RPC error: {e}")
            raise
    except grpc.FutureTimeoutError:
        print("RPC future timed out")
        raise
    except grpc.FutureCancelledError:
        print("RPC was cancelled")
        raise

# Async error handling
async def handle_async_rpc():
    """Await ``stub.AsyncMethod(request)`` and report failures before re-raising.

    Both an AioRpcError and an asyncio timeout are printed and then
    propagated unchanged to the caller.
    """
    try:
        return await stub.AsyncMethod(request)
    except grpc.aio.AioRpcError as rpc_error:
        status, message = rpc_error.code(), rpc_error.details()
        print(f"Async RPC failed: {status} - {message}")
        raise
    except asyncio.TimeoutError:
        print("Async operation timed out")
        raise

Server-Side Error Management

Server-side context methods for controlling RPC status and error details, and for handling errors gracefully.

class ServicerContext(RpcContext):
    """Context object for server-side error management.

    Two styles of reporting a non-OK status are offered: abort() /
    abort_with_status() end the RPC immediately by raising, while
    set_code() / set_details() only record values that are used when the
    handler completes.
    """
    
    def abort(self, code: StatusCode, details: str):
        """
        Raises an exception to terminate the RPC with a non-OK status.

        Because the call always raises, handler code placed after it does
        not run; no ``return`` is needed after an abort.

        Parameters:
        - code: A StatusCode object (must not be StatusCode.OK)
        - details: A UTF-8-encodable string to be sent to the client

        Raises:
        Exception: Always raised to signal the abortion of the RPC
        """
    
    def abort_with_status(self, status):
        """
        Raises an exception to terminate the RPC with a status object (EXPERIMENTAL).

        Parameters:
        - status: A grpc.Status object (status code must not be StatusCode.OK)

        Raises:
        Exception: Always raised to signal the abortion of the RPC
        """
    
    def set_code(self, code: StatusCode) -> None:
        """
        Sets the value to be used as status code upon RPC completion.

        Unlike abort(), this does not raise; the handler keeps running and
        is expected to finish normally.

        Parameters:
        - code: A StatusCode object to be sent to the client
        """
    
    def set_details(self, details: str) -> None:
        """
        Sets the value to be used as detail string upon RPC completion.

        Unlike abort(), this does not raise; the handler keeps running and
        is expected to finish normally.

        Parameters:
        - details: A UTF-8-encodable string to be sent to the client
        """
    
    def code(self) -> StatusCode:
        """
        Accesses the value to be used as status code upon RPC completion (EXPERIMENTAL).

        Returns:
        StatusCode: The status code value for the RPC
        """
    
    def details(self) -> str:
        """
        Accesses the value to be used as detail string upon RPC completion (EXPERIMENTAL).

        Returns:
        str: The details string of the RPC
        """

Usage Examples:

class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Example servicer covering validation, auth and streaming error handling.

    Handlers report failures through ``context``: ``context.abort`` ends the
    RPC immediately by raising, while ``set_code``/``set_details`` record a
    status and let the handler finish normally.
    """

    def CreateUser(self, request, context):
        """Create a user, mapping each failure to a gRPC status code.

        Returns:
            CreateUserResponse containing the created user.

        Aborts with:
            INVALID_ARGUMENT: username missing, shorter than 3 characters, or
                rejected by the user service (ValidationError).
            ALREADY_EXISTS: the user service raised UserAlreadyExistsError.
            INTERNAL: the user service raised DatabaseError.
            UNKNOWN: any other exception from the user service.
        """
        # Input validation
        if not request.username:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "Username is required"
            )

        if len(request.username) < 3:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "Username must be at least 3 characters long"
            )

        # Business logic with error handling. The try body contains no
        # context.abort call, so the catch-all below cannot intercept the
        # exception abort() raises.
        try:
            user = self.user_service.create_user(request.username, request.email)
            return my_service_pb2.CreateUserResponse(user=user)
        except UserAlreadyExistsError:
            context.abort(
                grpc.StatusCode.ALREADY_EXISTS,
                f"User '{request.username}' already exists"
            )
        except ValidationError as e:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Validation failed: {e}"
            )
        except DatabaseError as e:
            # Log the internal error with its traceback, but don't expose
            # details to the client.
            logger.exception("Database error creating user: %s", e)
            context.abort(
                grpc.StatusCode.INTERNAL,
                "Internal server error"
            )
        except Exception as e:
            # Catch-all for unexpected errors; keep the traceback in the log
            # since the client only sees a generic message.
            logger.exception("Unexpected error: %s", e)
            context.abort(
                grpc.StatusCode.UNKNOWN,
                "An unexpected error occurred"
            )

    def GetUsers(self, request, context):
        """Return a page of users after authentication and authorization.

        Aborts with UNAUTHENTICATED, PERMISSION_DENIED, or (when the user
        service raises ResourceExhaustedError) RESOURCE_EXHAUSTED.
        """
        # Authentication check
        if not self.is_authenticated(context):
            context.abort(
                grpc.StatusCode.UNAUTHENTICATED,
                "Authentication required"
            )

        # Authorization check
        if not self.has_permission(context, "read_users"):
            context.abort(
                grpc.StatusCode.PERMISSION_DENIED,
                "Insufficient permissions to read users"
            )

        try:
            users = self.user_service.get_users(
                limit=request.limit,
                offset=request.offset
            )
            return my_service_pb2.GetUsersResponse(users=users)
        except ResourceExhaustedError:
            context.abort(
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                "Too many requests - please try again later"
            )

    def StreamingMethod(self, request, context):
        """Example of error handling in streaming methods.

        Yields one StreamResponse per item until the data is exhausted or
        the RPC is no longer active. A DataCorruptionError is reported with
        DATA_LOSS via set_code/set_details; any other exception aborts the
        RPC with INTERNAL.
        """
        try:
            for item in self.get_stream_data(request):
                # Check if client cancelled
                if not context.is_active():
                    logger.info("Client cancelled streaming request")
                    break

                yield my_service_pb2.StreamResponse(data=item)
        except DataCorruptionError as e:
            # Set status but let the method complete normally
            context.set_code(grpc.StatusCode.DATA_LOSS)
            context.set_details(f"Data corruption detected: {e}")
        except Exception as e:
            # Keep the traceback; the client only sees the generic message.
            logger.exception("Streaming error: %s", e)
            context.abort(grpc.StatusCode.INTERNAL, "Stream processing failed")

# Graceful error handling with cleanup
class DatabaseServicer(my_service_pb2_grpc.DatabaseServicer):
    """Example servicer that rolls back its transaction before reporting errors."""

    @staticmethod
    def _rollback_if_started(transaction):
        """Roll back ``transaction`` unless it was never started (falsy)."""
        if transaction:
            transaction.rollback()

    def ProcessTransaction(self, request, context):
        """Run every operation of the request inside one transaction.

        Returns:
            TransactionResponse(success=True) once the transaction commits.

        Aborts with (after rolling back a started transaction):
            INVALID_ARGUMENT: an operation raised ValidationError.
            ABORTED: an operation raised ConcurrencyError.
            INTERNAL: any other exception.
        """
        # Stays None if begin_transaction() itself fails, so the handlers
        # know there is nothing to roll back.
        transaction = None
        try:
            # Start transaction
            transaction = self.db.begin_transaction()

            # Process operations
            for operation in request.operations:
                self.execute_operation(transaction, operation)

            # Commit transaction
            transaction.commit()
            return my_service_pb2.TransactionResponse(success=True)

        except ValidationError as e:
            self._rollback_if_started(transaction)
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Invalid operation: {e}"
            )
        except ConcurrencyError:
            self._rollback_if_started(transaction)
            context.abort(
                grpc.StatusCode.ABORTED,
                "Transaction aborted due to concurrent modification"
            )
        except Exception as e:
            self._rollback_if_started(transaction)
            # Keep the traceback; the client only sees the generic message.
            logger.exception("Transaction failed: %s", e)
            context.abort(
                grpc.StatusCode.INTERNAL,
                "Transaction processing failed"
            )

Status Objects

Status object interface for comprehensive error information (EXPERIMENTAL).

class Status(abc.ABC):
    """
    Describes the status of an RPC (EXPERIMENTAL).

    This is the kind of object ServicerContext.abort_with_status() accepts;
    it bundles what would otherwise be passed to abort() separately.
    
    Attributes:
    - code: A StatusCode object to be sent to the client
    - details: A UTF-8-encodable string to be sent upon termination
    - trailing_metadata: The trailing metadata in the RPC
    """

Error Handling Patterns

Common patterns for robust error handling in gRPC applications.

Retry Logic:

import time
import random

def exponential_backoff_retry(rpc_func, max_retries=3, base_delay=1.0,
                              retryable_codes=None):
    """Call ``rpc_func`` and retry transient gRPC failures with backoff.

    Parameters:
    - rpc_func: Zero-argument callable that performs the RPC.
    - max_retries: Number of retries after the first attempt (>= 0), so
      ``rpc_func`` is called at most ``max_retries + 1`` times.
    - base_delay: Delay in seconds before the first retry; it doubles on
      each later retry, plus up to one second of random jitter.
    - retryable_codes: Optional collection of StatusCode values that are
      worth retrying. Defaults to UNAVAILABLE, DEADLINE_EXCEEDED,
      RESOURCE_EXHAUSTED and ABORTED.

    Returns:
    Whatever ``rpc_func`` returns on its first successful call.

    Raises:
    ValueError: If ``max_retries`` is negative (the RPC would otherwise
        never be attempted and None would be returned silently).
    grpc.RpcError: The last error once retries are exhausted, or
        immediately for a non-retryable status code.
    """
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    codes = retryable_codes
    for attempt in range(max_retries + 1):
        try:
            return rpc_func()
        except grpc.RpcError as e:
            if attempt == max_retries:
                raise  # Final attempt, re-raise

            if codes is None:
                # Build the default set once, on the first failure only.
                codes = frozenset((
                    grpc.StatusCode.UNAVAILABLE,
                    grpc.StatusCode.DEADLINE_EXCEEDED,
                    grpc.StatusCode.RESOURCE_EXHAUSTED,
                    grpc.StatusCode.ABORTED,
                ))

            # Only retry on transient errors
            if e.code() not in codes:
                raise  # Don't retry non-transient errors

            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Retrying in {delay:.2f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(delay)

# Usage
def make_rpc():
    """Zero-argument wrapper so the retry helper can re-issue the same call."""
    return stub.MyMethod(request, timeout=10.0)

# make_rpc is invoked once per attempt, so each attempt gets its own
# 10 second timeout.
response = exponential_backoff_retry(make_rpc)

Circuit Breaker Pattern:

import time
from collections import defaultdict
from enum import Enum

class CircuitState(Enum):
    """States of the CircuitBreaker defined below."""

    # Calls go through; consecutive failures are counted.
    CLOSED = "closed"
    # Calls are rejected without being attempted until the timeout elapses.
    OPEN = "open"
    # Timeout elapsed: calls are attempted again, and a success closes the
    # circuit.
    HALF_OPEN = "half_open"

class CircuitBreakerOpenError(grpc.RpcError):
    """Raised by ``CircuitBreaker.call`` while the circuit is open.

    Subclasses ``grpc.RpcError`` so existing ``except grpc.RpcError``
    handlers keep catching it, and provides the accessors such handlers
    typically call. A bare ``grpc.RpcError("...")`` has no ``code()`` or
    ``details()``, so those handlers would fail with AttributeError.
    """

    def code(self):
        """Return UNAVAILABLE: the call was rejected locally, not attempted."""
        return grpc.StatusCode.UNAVAILABLE

    def details(self):
        """Return the message the error was created with."""
        return str(self)

    def initial_metadata(self):
        """Return empty metadata; no server was contacted."""
        return ()

    def trailing_metadata(self):
        """Return empty metadata; no server was contacted."""
        return ()


class CircuitBreaker:
    """Stops calling a failing RPC for a cool-down period.

    After ``failure_threshold`` failures without an intervening success the
    circuit opens and calls are rejected. Once ``timeout`` seconds have
    passed since the last failure, calls are attempted again (half-open);
    a success closes the circuit and resets the failure count.
    """

    def __init__(self, failure_threshold=5, timeout=60):
        """
        Parameters:
        - failure_threshold: Failure count at which the circuit opens.
        - timeout: Seconds to keep the circuit open after the last failure.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        # time.monotonic() reading of the most recent failure (0 = none yet).
        self.last_failure_time = 0
        self.state = CircuitState.CLOSED

    def call(self, rpc_func):
        """Run ``rpc_func`` through the breaker and return its result.

        Parameters:
        - rpc_func: Zero-argument callable that performs the RPC.

        Raises:
        CircuitBreakerOpenError: (a grpc.RpcError) if the circuit is open
            and the timeout has not elapsed; ``rpc_func`` is not called.
        grpc.RpcError: Whatever ``rpc_func`` raised, after recording the
            failure.
        """
        if self.state == CircuitState.OPEN:
            # Monotonic clock: elapsed time must not be affected by
            # wall-clock adjustments.
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = rpc_func()
            # Success - reset failure count
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
            self.failure_count = 0
            return result

        except grpc.RpcError:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()

            # The count is not reset when the circuit opens, so a failed
            # half-open trial call re-opens the circuit immediately.
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

            raise

# Usage
# The failure count and state live on the instance, so the same breaker
# must be reused for every call it is meant to protect.
circuit_breaker = CircuitBreaker()
response = circuit_breaker.call(lambda: stub.MyMethod(request))

Structured Error Response:

# Server-side structured error details
import json

class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Servicer that reports all validation problems in one structured error."""

    def _collect_validation_errors(self, request):
        """Return a list of ``{"field", "message"}`` dicts, empty when valid."""
        problems = []

        email = request.email
        if not email:
            problems.append({"field": "email", "message": "Email is required"})
        elif not self.is_valid_email(email):
            problems.append({"field": "email", "message": "Invalid email format"})

        if not request.age or request.age < 0:
            problems.append({"field": "age", "message": "Age must be a positive number"})

        return problems

    def ValidateData(self, request, context):
        """Validate the request; abort with INVALID_ARGUMENT on any problem.

        The individual problems are JSON-encoded into the "error-details"
        trailing metadata entry before the RPC is aborted.
        """
        errors = self._collect_validation_errors(request)

        if errors:
            # Ship the structured details as trailing metadata, then abort.
            payload = json.dumps({"validation_errors": errors})
            context.set_trailing_metadata([("error-details", payload)])
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Validation failed: {len(errors)} errors found"
            )

        return my_service_pb2.ValidationResponse(valid=True)

# Client-side structured error handling
try:
    response = stub.ValidateData(request)
except grpc.RpcError as e:
    # Only validation failures are handled here. Anything else is
    # propagated instead of being silently swallowed, since `response`
    # would be left undefined for the code that follows.
    if e.code() != grpc.StatusCode.INVALID_ARGUMENT:
        raise

    # trailing_metadata() may return None when no metadata was received.
    trailing_md = dict(e.trailing_metadata() or ())
    if "error-details" in trailing_md:
        error_details = json.loads(trailing_md["error-details"])
        print("Validation errors:")
        for error in error_details["validation_errors"]:
            print(f"  {error['field']}: {error['message']}")
    else:
        # No structured payload attached: fall back to the plain details
        # string so the failure is still reported.
        print(f"Validation failed: {e.details()}")

Types

class RpcContext(abc.ABC):
    """Provides RPC-related information and action.

    Base type of both ServicerContext (server side) and Call (invocation
    side) as declared in this document.
    """
    
    def is_active(self) -> bool:
        """Describes whether the RPC is active or has terminated.

        The streaming example in this document polls this between items to
        stop producing once the RPC is no longer active.
        """
    
    def time_remaining(self):
        """Describes the length of allowed time remaining for the RPC."""
    
    def cancel(self):
        """Cancels the RPC. Idempotent and has no effect if already terminated."""
    
    def add_callback(self, callback) -> bool:
        """Registers a callback to be called on RPC termination."""

class Call(RpcContext):
    """Invocation-side utility object for an RPC.

    Declares the same four accessors that RpcError exposes in this
    document, in addition to what it inherits from RpcContext.
    """
    
    def initial_metadata(self):
        """Accesses the initial metadata sent by the server."""
    
    def trailing_metadata(self):
        """Accesses the trailing metadata sent by the server."""
    
    def code(self) -> StatusCode:
        """Accesses the status code sent by the server."""
    
    def details(self) -> str:
        """Accesses the details sent by the server."""

Install with Tessl CLI

npx tessl i tessl/pypi-grpcio

docs

async-api.md

channel-management.md

error-handling.md

index.md

interceptors.md

protobuf-integration.md

rpc-patterns.md

security-authentication.md

server-implementation.md

tile.json