CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-grpcio

HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems

Pending
Overview
Eval results
Files

docs/rpc-patterns.md

RPC Patterns and Multi-Callables

Support for all four RPC patterns (unary-unary, unary-stream, stream-unary, stream-stream) with synchronous and asynchronous invocation methods, comprehensive timeout handling, metadata passing, and credential specification for flexible client-side RPC execution.

Capabilities

Unary-Unary Pattern

Single request to single response RPC pattern with synchronous, asynchronous, and callback-based invocation modes.

class UnaryUnaryMultiCallable(abc.ABC):
    """Client-side handle for invoking a unary-unary RPC."""

    def __call__(
        self,
        request,
        timeout=None,
        metadata=None,
        credentials=None,
        wait_for_ready=None,
        compression=None,
    ):
        """
        Invoke the underlying RPC and block until it completes.

        Parameters:
        - request: Request value to send for the RPC
        - timeout: Optional number of seconds the RPC is allowed to take
        - metadata: Optional metadata transmitted to the service-side
        - credentials: Optional CallCredentials for the RPC (secure Channel only)
        - wait_for_ready: Optional flag enabling the wait_for_ready mechanism
        - compression: Optional compression element (e.g., grpc.Compression.Gzip)

        Returns:
        The response value for the RPC

        Raises:
        RpcError: The RPC terminated with a non-OK status
        """

    def with_call(
        self,
        request,
        timeout=None,
        metadata=None,
        credentials=None,
        wait_for_ready=None,
        compression=None,
    ):
        """
        Invoke the underlying RPC synchronously, also exposing its Call object.

        Parameters:
        Identical to __call__()

        Returns:
        tuple: (response_value, Call) - the response plus the Call object
        carrying the RPC metadata

        Raises:
        RpcError: The RPC terminated with a non-OK status
        """

    def future(
        self,
        request,
        timeout=None,
        metadata=None,
        credentials=None,
        wait_for_ready=None,
        compression=None,
    ):
        """
        Invoke the underlying RPC asynchronously.

        Parameters:
        Identical to __call__()

        Returns:
        Future: Call-Future object; result() yields the response and
        exception() yields the RpcError
        """

Usage Examples:

# Create channel and stub
# The insecure channel is used for the plain examples below; the secure
# variant further down builds its own channel and stub.
channel = grpc.insecure_channel('localhost:50051')
stub = my_service_pb2_grpc.MyServiceStub(channel)

# Synchronous call
# Blocks until the response arrives or the 5 second timeout elapses.
request = my_service_pb2.MyRequest(message="Hello")
response = stub.UnaryMethod(request, timeout=5.0)
print(response.reply)

# Synchronous call with metadata and call info
# with_call() returns (response, Call); the Call exposes status information.
metadata = [('user-agent', 'my-client/1.0')]
response, call = stub.UnaryMethod.with_call(
    request,
    timeout=5.0,
    metadata=metadata
)
print(f"Response: {response.reply}")
print(f"Status code: {call.code()}")
print(f"Status details: {call.details()}")

# Asynchronous call with Future
# The RPC timeout (5s) and the wait on the future (10s) are independent:
# the first bounds the RPC itself, the second bounds how long result() blocks.
future = stub.UnaryMethod.future(request, timeout=5.0)
try:
    response = future.result(timeout=10.0)
    print(response.reply)
except grpc.RpcError as e:
    print(f"RPC failed: {e.code()} - {e.details()}")
except grpc.FutureTimeoutError:
    print("Future timed out")

# With credentials and compression
# SSL channel credentials and access-token call credentials are combined
# into a single channel credential used to open the secure channel.
ssl_creds = grpc.ssl_channel_credentials()
call_creds = grpc.access_token_call_credentials("token")
channel_creds = grpc.composite_channel_credentials(ssl_creds, call_creds)
secure_channel = grpc.secure_channel('secure-server.com:443', channel_creds)
secure_stub = my_service_pb2_grpc.MyServiceStub(secure_channel)

response = secure_stub.UnaryMethod(
    request,
    timeout=30.0,
    # Compression algorithms are members of the grpc.Compression enum;
    # there is no public grpc.compression attribute.
    compression=grpc.Compression.Gzip,
    metadata=[('request-id', 'req-123')]
)

Unary-Stream Pattern

Single request to multiple response RPC pattern with iterator-based response consumption.

class UnaryStreamMultiCallable(abc.ABC):
    """Client-side handle for invoking a unary-stream RPC."""

    def __call__(
        self,
        request,
        timeout=None,
        metadata=None,
        credentials=None,
        wait_for_ready=None,
        compression=None,
    ):
        """
        Invoke the underlying RPC.

        Parameters:
        - request: Request value to send for the RPC
        - timeout: Optional number of seconds allowed (None means infinite)
        - metadata: Optional metadata transmitted to the service-side
        - credentials: Optional CallCredentials for the RPC (secure Channel only)
        - wait_for_ready: Optional flag enabling the wait_for_ready mechanism
        - compression: Optional compression element

        Returns:
        Iterator: Call-iterator over the response values that is also a
        Future for RPC completion

        Note:
        Pulling response values may raise RpcError when the RPC terminates
        with a non-OK status
        """

Usage Examples:

# Unary-stream call
# One request goes out; the returned object is iterated to receive responses.
request = my_service_pb2.StreamRequest(count=5)
response_iterator = stub.UnaryStreamMethod(request, timeout=30.0)

# Iterate over responses
try:
    for response in response_iterator:
        print(f"Received: {response.message}")
        # Can break early if needed
        # NOTE(review): breaking only stops consumption here; nothing in this
        # snippet cancels the RPC afterwards -- confirm whether an explicit
        # response_iterator.cancel() is wanted.
        if should_stop():
            break
except grpc.RpcError as e:
    # Drawing response values may raise RpcError on non-OK termination.
    print(f"Stream failed: {e.code()} - {e.details()}")

# Access call information
# The object returned by the stub is used both as the response iterator and
# as the source of metadata/status for the call.
response_iterator = stub.UnaryStreamMethod(request)
print(f"Initial metadata: {response_iterator.initial_metadata()}")

try:
    responses = list(response_iterator)  # Consume all responses
    # Trailing metadata and the status code are read only after the stream
    # has been fully consumed above.
    print(f"Trailing metadata: {response_iterator.trailing_metadata()}")
    print(f"Final status: {response_iterator.code()}")
except grpc.RpcError as e:
    print(f"Stream terminated with error: {e}")

Stream-Unary Pattern

Multiple request to single response RPC pattern with iterator-based request sending.

class StreamUnaryMultiCallable(abc.ABC):
    """Client-side handle for invoking a stream-unary RPC."""

    def __call__(
        self,
        request_iterator,
        timeout=None,
        metadata=None,
        credentials=None,
        wait_for_ready=None,
        compression=None,
    ):
        """
        Invoke the underlying RPC and block until it completes.

        Parameters:
        - request_iterator: Iterator yielding the request values for the RPC
        - timeout: Optional number of seconds allowed (None means infinite)
        - metadata: Optional metadata transmitted to the service-side
        - credentials: Optional CallCredentials for the RPC (secure Channel only)
        - wait_for_ready: Optional flag enabling the wait_for_ready mechanism
        - compression: Optional compression element

        Returns:
        The response value for the RPC

        Raises:
        RpcError: The RPC terminated with a non-OK status
        """

    def with_call(
        self,
        request_iterator,
        timeout=None,
        metadata=None,
        credentials=None,
        wait_for_ready=None,
        compression=None,
    ):
        """
        Invoke the underlying RPC synchronously, also exposing its Call object.

        Returns:
        tuple: (response_value, Call) - the response plus the Call object
        carrying the RPC metadata
        """

    def future(
        self,
        request_iterator,
        timeout=None,
        metadata=None,
        credentials=None,
        wait_for_ready=None,
        compression=None,
    ):
        """
        Invoke the underlying RPC asynchronously.

        Returns:
        Future: Call-Future object used to retrieve the result later
        """

Usage Examples:

# Stream-unary call with generator
import time  # needed by request_generator(); not imported by any earlier snippet


def request_generator():
    """Yield ten StreamRequest messages, pausing briefly after each one."""
    for i in range(10):
        yield my_service_pb2.StreamRequest(data=f"item-{i}")
        time.sleep(0.1)  # Simulate processing time

# The call blocks until the single response arrives (or the timeout elapses).
response = stub.StreamUnaryMethod(request_generator(), timeout=30.0)
print(f"Final result: {response.summary}")

# Stream-unary with list of requests
requests = [
    my_service_pb2.StreamRequest(data="first"),
    my_service_pb2.StreamRequest(data="second"),
    my_service_pb2.StreamRequest(data="third"),
]

# The list is wrapped in iter() because the multi-callable takes an iterator.
response, call = stub.StreamUnaryMethod.with_call(iter(requests))
print(f"Response: {response.summary}")
print(f"Metadata: {dict(call.trailing_metadata())}")

# Asynchronous stream-unary
def async_request_generator():
    """Yield one hundred StreamRequest messages without pausing."""
    for i in range(100):
        yield my_service_pb2.StreamRequest(data=f"batch-{i}")

future = stub.StreamUnaryMethod.future(async_request_generator(), timeout=60.0)

# Do other work while RPC executes
do_other_work()

# Get result when ready
try:
    response = future.result(timeout=10.0)
    print(f"Async result: {response.summary}")
except grpc.FutureTimeoutError:
    # Only the 10 second wait expired; the RPC itself may still be running.
    print("Still waiting for result...")
    response = future.result()  # Wait indefinitely

Stream-Stream Pattern

Bidirectional streaming RPC pattern with full-duplex communication capabilities.

class StreamStreamMultiCallable(abc.ABC):
    """Client-side handle for invoking a stream-stream RPC."""

    def __call__(
        self,
        request_iterator,
        timeout=None,
        metadata=None,
        credentials=None,
        wait_for_ready=None,
        compression=None,
    ):
        """
        Invoke the underlying RPC on the client.

        Parameters:
        - request_iterator: Iterator yielding the request values for the RPC
        - timeout: Optional number of seconds allowed (None means infinite)
        - metadata: Optional metadata transmitted to the service-side
        - credentials: Optional CallCredentials for the RPC (secure Channel only)
        - wait_for_ready: Optional flag enabling the wait_for_ready mechanism
        - compression: Optional compression element

        Returns:
        Iterator: Call-iterator over the response values that is also a
        Future for RPC completion

        Note:
        Pulling response values may raise RpcError when the RPC terminates
        with a non-OK status
        """

Usage Examples:

# Bidirectional streaming
import time  # needed by request_generator(); not imported by any earlier snippet


def request_generator():
    """Yield five ChatMessage requests, one per second."""
    for i in range(5):
        yield my_service_pb2.ChatMessage(user="client", message=f"Message {i}")
        time.sleep(1)

# No timeout is passed here, so the call itself carries no deadline.
response_iterator = stub.StreamStreamMethod(request_generator())

# Process responses as they arrive
try:
    for response in response_iterator:
        print(f"Server says: {response.message}")
except grpc.RpcError as e:
    # Drawing response values may raise RpcError on non-OK termination.
    print(f"Stream error: {e}")
# Chat-like bidirectional streaming
import threading
import queue

def chat_client():
    """Run an interactive chat over the bidirectional ChatMethod stream.

    Text typed at the "You: " prompt is queued and streamed to the server as
    ChatMessage requests, while a background thread prints each response.
    Entering "/quit" (any letter case) ends the session.
    """
    # Outgoing user messages; None is the sentinel that ends the stream.
    outbox = queue.Queue()

    def outgoing_requests():
        # Poll with a timeout so the generator never blocks forever on an
        # empty queue.
        while True:
            try:
                text = outbox.get(timeout=1.0)
            except queue.Empty:
                continue
            if text is None:  # Sentinel to stop
                return
            yield my_service_pb2.ChatMessage(user="client", message=text)

    def print_responses():
        # Runs on its own thread so incoming messages show up while the main
        # thread is waiting on input().
        try:
            for reply in replies:
                print(f"[{reply.user}]: {reply.message}")
        except grpc.RpcError as e:
            print(f"Chat ended: {e}")

    # Start streaming RPC
    replies = stub.ChatMethod(outgoing_requests())

    printer = threading.Thread(target=print_responses)
    printer.start()

    # Send messages from user input
    try:
        typed = input("You: ")
        while typed.lower() != '/quit':
            outbox.put(typed)
            typed = input("You: ")
    finally:
        # Always end the request stream and wait for the printer thread,
        # even when input() raised.
        outbox.put(None)
        printer.join()

# Advanced bidirectional streaming with flow control
class FlowControlledStreaming:
    """Drive a stream-stream RPC whose requests pass through a bounded queue.

    Requests are buffered in ``request_queue`` (at most 10 pending) and fed
    to the RPC by ``request_generator``. ``start_streaming`` consumes the
    responses and, after each one, gives ``maybe_send_request`` a chance to
    enqueue more work if the queue has room.

    ``process_response`` and ``maybe_send_request`` are extension points:
    the defaults do nothing, so subclasses override them to supply real
    behavior.
    """

    def __init__(self, stub):
        """
        Parameters:
        - stub: Object exposing a StreamStreamMethod(request_iterator, timeout=...)
          callable that returns an iterable of responses
        """
        self.stub = stub
        self.request_queue = queue.Queue(maxsize=10)  # Limit pending requests
        self.stop_event = threading.Event()

    def request_generator(self):
        """Yield queued requests until ``stop()`` has been called.

        The queue is polled with a 0.5 second timeout so that a stop request
        is noticed even while the queue is empty.
        """
        while not self.stop_event.is_set():
            try:
                request = self.request_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            yield request

    def start_streaming(self):
        """Start the RPC and process every response until the stream ends."""
        response_iterator = self.stub.StreamStreamMethod(
            self.request_generator(),
            timeout=300.0
        )

        for response in response_iterator:
            self.process_response(response)

            # Flow control: only send new requests after processing response
            if not self.request_queue.full():
                self.maybe_send_request()

    def process_response(self, response):
        """Handle one response from the stream. Default: do nothing.

        Parameters:
        - response: A response value drawn from the response iterator
        """

    def maybe_send_request(self):
        """Optionally enqueue a follow-up request. Default: do nothing.

        Called by ``start_streaming`` after a response was processed and the
        request queue was observed to have free capacity.
        """

    def send_request(self, request):
        """Enqueue ``request`` without blocking, dropping it if the queue is full.

        Parameters:
        - request: The request value to hand to the request generator
        """
        # put_nowait avoids the check-then-put race of full() followed by a
        # blocking put(): another producer could fill the queue in between
        # and leave this call blocked.
        try:
            self.request_queue.put_nowait(request)
        except queue.Full:
            print("Request queue full, dropping request")

    def stop(self):
        """Signal ``request_generator`` to finish."""
        self.stop_event.set()

Call Objects and Context

Access to RPC metadata, status, and control information through Call objects.

class Call(RpcContext):
    """Invocation-side handle exposing an RPC's metadata, status and controls."""

    def initial_metadata(self):
        """
        Return the initial metadata sent by the server.
        Blocks until the value is available.

        Returns:
        Metadata: Key-value pairs of the initial metadata
        """

    def trailing_metadata(self):
        """
        Return the trailing metadata sent by the server.
        Blocks until the value is available.

        Returns:
        Metadata: Key-value pairs of the trailing metadata
        """

    def code(self) -> StatusCode:
        """
        Return the status code sent by the server.
        Blocks until the value is available.

        Returns:
        StatusCode: Status code value of the RPC
        """

    def details(self) -> str:
        """
        Return the details sent by the server.
        Blocks until the value is available.

        Returns:
        str: Details string of the RPC
        """

    def is_active(self) -> bool:
        """
        Report whether the RPC is still active or has terminated.

        Returns:
        bool: True while the RPC is active, False otherwise
        """

    def time_remaining(self):
        """
        Report how much of the allowed time is left for the RPC.

        Returns:
        float or None: Seconds left for the RPC to complete, or None when
        there is no deadline
        """

    def cancel(self):
        """
        Cancel the RPC.
        Idempotent; does nothing when the RPC has already terminated.
        """

    def add_callback(self, callback) -> bool:
        """
        Register a callback to run when the RPC terminates.

        Parameters:
        - callback: Callable taking no parameters, invoked on RPC termination

        Returns:
        bool: True when the callback was added, False when the RPC had
        already terminated
        """

Usage Examples:

# Access call metadata and status
import threading  # used for the watchdog threads below
import time  # used for the watchdog delays below

# with_call() returns (response, Call); the Call carries metadata and status.
response, call = stub.UnaryMethod.with_call(request)

print(f"Initial metadata: {dict(call.initial_metadata())}")
print(f"Status code: {call.code()}")
print(f"Status details: {call.details()}")
print(f"Trailing metadata: {dict(call.trailing_metadata())}")

# RPC cancellation
future = stub.LongRunningMethod.future(request)

# Cancel after 5 seconds if not done
def cancel_if_needed():
    """Cancel the pending future if it has not finished after 5 seconds."""
    time.sleep(5)
    if not future.done():
        future.cancel()
        print("RPC cancelled due to timeout")

threading.Thread(target=cancel_if_needed).start()

try:
    response = future.result()
except grpc.FutureCancelledError:
    print("RPC was cancelled")

# Streaming with call control
response_iterator = stub.UnaryStreamMethod(request)

def handle_cancellation():
    """Cancel the stream if it is still active after 10 seconds."""
    time.sleep(10)
    if response_iterator.is_active():
        response_iterator.cancel()
        print("Stream cancelled")

threading.Thread(target=handle_cancellation).start()

try:
    for response in response_iterator:
        print(f"Response: {response}")
        # Stop consuming once the call is no longer active (e.g. it was
        # cancelled by the watchdog thread).
        if not response_iterator.is_active():
            break
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.CANCELLED:
        print("Stream was cancelled")
    else:
        # Report every other failure instead of silently swallowing it.
        print(f"Stream failed: {e.code()} - {e.details()}")

Types

class ClientCallDetails(abc.ABC):
    """
    Description of an RPC that is about to be invoked.

    Attributes:
    - method: Method name of the RPC
    - timeout: Optional number of seconds the RPC is allowed to take
    - metadata: Optional metadata transmitted to the service-side
    - credentials: Optional CallCredentials for the RPC
    - wait_for_ready: Optional flag enabling the wait_for_ready mechanism
    - compression: Optional compression element
    """

class RpcContext(abc.ABC):
    """Source of RPC-related information and actions."""

    def is_active(self) -> bool:
        """Return True while the RPC is active, False otherwise."""

    def time_remaining(self):
        """Return the seconds left for the RPC, or None when there is no deadline."""

    def cancel(self):
        """Cancel the RPC. Idempotent."""

    def add_callback(self, callback) -> bool:
        """Register a callback to be invoked on RPC termination."""

Install with Tessl CLI

npx tessl i tessl/pypi-grpcio

docs

async-api.md

channel-management.md

error-handling.md

index.md

interceptors.md

protobuf-integration.md

rpc-patterns.md

security-authentication.md

server-implementation.md

tile.json