HTTP/2-based RPC framework with synchronous and asynchronous APIs for building distributed systems
—
Support for all four RPC patterns (unary-unary, unary-stream, stream-unary, stream-stream) with synchronous and asynchronous invocation methods, comprehensive timeout handling, metadata passing, and credential specification for flexible client-side RPC execution.
Single request to single response RPC pattern with synchronous (`__call__`, `with_call`) and asynchronous Future-based (`future`) invocation modes; completion callbacks can be attached to the returned Future.
class UnaryUnaryMultiCallable(abc.ABC):
    """Affords invoking a unary-unary RPC from client-side."""

    def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
        """
        Synchronously invokes the underlying RPC.

        Parameters:
        - request: The request value for the RPC
        - timeout: Optional duration in seconds to allow for the RPC
        - metadata: Optional metadata to be transmitted to the service-side
        - credentials: Optional CallCredentials for the RPC (secure Channel only)
        - wait_for_ready: Optional flag to enable wait_for_ready mechanism
        - compression: Optional compression element (e.g., grpc.Compression.Gzip)

        Returns:
            The response value for the RPC

        Raises:
            RpcError: Indicating that the RPC terminated with non-OK status
        """

    def with_call(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
        """
        Synchronously invokes the underlying RPC and returns Call object.

        Parameters:
            Same as __call__()

        Returns:
            tuple: (response_value, Call) - Response and Call object for RPC metadata

        Raises:
            RpcError: Indicating that the RPC terminated with non-OK status
        """

    def future(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
        """
        Asynchronously invokes the underlying RPC.

        Parameters:
            Same as __call__()

        Returns:
            Future: Call-Future object; result() returns response, exception() returns RpcError
        """

# Usage Examples:
# Create channel and stub
channel = grpc.insecure_channel('localhost:50051')
stub = my_service_pb2_grpc.MyServiceStub(channel)

# Synchronous call
request = my_service_pb2.MyRequest(message="Hello")
response = stub.UnaryMethod(request, timeout=5.0)
print(response.reply)

# Synchronous call with metadata and call info
metadata = [('user-agent', 'my-client/1.0')]
response, call = stub.UnaryMethod.with_call(
    request,
    timeout=5.0,
    metadata=metadata
)
print(f"Response: {response.reply}")
print(f"Status code: {call.code()}")
print(f"Status details: {call.details()}")

# Asynchronous call with Future
future = stub.UnaryMethod.future(request, timeout=5.0)
try:
    # The RPC deadline (5s) and the wait on the Future (10s) are independent.
    response = future.result(timeout=10.0)
    print(response.reply)
except grpc.RpcError as e:
    print(f"RPC failed: {e.code()} - {e.details()}")
except grpc.FutureTimeoutError:
    print("Future timed out")

# With credentials and compression
ssl_creds = grpc.ssl_channel_credentials()
call_creds = grpc.access_token_call_credentials("token")
channel_creds = grpc.composite_channel_credentials(ssl_creds, call_creds)
secure_channel = grpc.secure_channel('secure-server.com:443', channel_creds)
secure_stub = my_service_pb2_grpc.MyServiceStub(secure_channel)
response = secure_stub.UnaryMethod(
    request,
    timeout=30.0,
    # The public compression enum is grpc.Compression (capital C).
    compression=grpc.Compression.Gzip,
    metadata=[('request-id', 'req-123')]
)

# Single request to multiple response RPC pattern with iterator-based response consumption.
class UnaryStreamMultiCallable(abc.ABC):
    """Affords invoking a unary-stream RPC from client-side."""

    def __call__(self, request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
        """
        Invokes the underlying RPC.

        Parameters:
        - request: The request value for the RPC
        - timeout: Optional duration in seconds (None for infinite)
        - metadata: Optional metadata to be transmitted to the service-side
        - credentials: Optional CallCredentials for the RPC (secure Channel only)
        - wait_for_ready: Optional flag to enable wait_for_ready mechanism
        - compression: Optional compression element

        Returns:
            Iterator: Call-iterator for response values and Future for RPC completion

        Note:
            Drawing response values may raise RpcError indicating non-OK termination
        """

# Usage Examples:
# Unary-stream call
request = my_service_pb2.StreamRequest(count=5)
response_iterator = stub.UnaryStreamMethod(request, timeout=30.0)

# Iterate over responses
try:
    for response in response_iterator:
        print(f"Received: {response.message}")
        # Can break early if needed
        if should_stop():
            break
except grpc.RpcError as e:
    print(f"Stream failed: {e.code()} - {e.details()}")

# Access call information
response_iterator = stub.UnaryStreamMethod(request)
print(f"Initial metadata: {response_iterator.initial_metadata()}")
try:
    responses = list(response_iterator)  # Consume all responses
    print(f"Trailing metadata: {response_iterator.trailing_metadata()}")
    print(f"Final status: {response_iterator.code()}")
except grpc.RpcError as e:
    print(f"Stream terminated with error: {e}")

# Multiple request to single response RPC pattern with iterator-based request sending.
class StreamUnaryMultiCallable(abc.ABC):
    """Affords invoking a stream-unary RPC from client-side."""

    def __call__(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
        """
        Synchronously invokes the underlying RPC.

        Parameters:
        - request_iterator: An iterator that yields request values for the RPC
        - timeout: Optional duration in seconds (None for infinite)
        - metadata: Optional metadata to be transmitted to the service-side
        - credentials: Optional CallCredentials for the RPC (secure Channel only)
        - wait_for_ready: Optional flag to enable wait_for_ready mechanism
        - compression: Optional compression element

        Returns:
            The response value for the RPC

        Raises:
            RpcError: Indicating that the RPC terminated with non-OK status
        """

    def with_call(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
        """
        Synchronously invokes the underlying RPC and returns Call object.

        Parameters:
            Same as __call__()

        Returns:
            tuple: (response_value, Call) - Response and Call object for RPC metadata
        """

    def future(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
        """
        Asynchronously invokes the underlying RPC.

        Parameters:
            Same as __call__()

        Returns:
            Future: Call-Future object for asynchronous result retrieval
        """

# Usage Examples:
import time  # needed by request_generator below


# Stream-unary call with generator
def request_generator():
    for i in range(10):
        yield my_service_pb2.StreamRequest(data=f"item-{i}")
        time.sleep(0.1)  # Simulate processing time


response = stub.StreamUnaryMethod(request_generator(), timeout=30.0)
print(f"Final result: {response.summary}")

# Stream-unary with list of requests
requests = [
    my_service_pb2.StreamRequest(data="first"),
    my_service_pb2.StreamRequest(data="second"),
    my_service_pb2.StreamRequest(data="third"),
]
response, call = stub.StreamUnaryMethod.with_call(iter(requests))
print(f"Response: {response.summary}")
print(f"Metadata: {dict(call.trailing_metadata())}")


# Asynchronous stream-unary
def async_request_generator():
    for i in range(100):
        yield my_service_pb2.StreamRequest(data=f"batch-{i}")


future = stub.StreamUnaryMethod.future(async_request_generator(), timeout=60.0)

# Do other work while RPC executes
do_other_work()

# Get result when ready
try:
    response = future.result(timeout=10.0)
    print(f"Async result: {response.summary}")
except grpc.FutureTimeoutError:
    print("Still waiting for result...")
    response = future.result()  # Wait indefinitely

# Bidirectional streaming RPC pattern with full-duplex communication capabilities.
class StreamStreamMultiCallable(abc.ABC):
    """Affords invoking a stream-stream RPC on client-side."""

    def __call__(self, request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None):
        """
        Invokes the underlying RPC on the client.

        Parameters:
        - request_iterator: An iterator that yields request values for the RPC
        - timeout: Optional duration in seconds (None for infinite)
        - metadata: Optional metadata to be transmitted to the service-side
        - credentials: Optional CallCredentials for the RPC (secure Channel only)
        - wait_for_ready: Optional flag to enable wait_for_ready mechanism
        - compression: Optional compression element

        Returns:
            Iterator: Call-iterator for response values and Future for RPC completion

        Note:
            Drawing response values may raise RpcError indicating non-OK termination
        """

# Usage Examples:
import time  # needed by request_generator below


# Bidirectional streaming
def request_generator():
    for i in range(5):
        yield my_service_pb2.ChatMessage(user="client", message=f"Message {i}")
        time.sleep(1)


response_iterator = stub.StreamStreamMethod(request_generator())

# Process responses as they arrive
try:
    for response in response_iterator:
        print(f"Server says: {response.message}")
except grpc.RpcError as e:
    print(f"Stream error: {e}")
# Chat-like bidirectional streaming
import threading
import queue


def chat_client():
    """Run an interactive chat: stdin lines go out as requests, responses are printed."""
    # Queue for sending messages
    message_queue = queue.Queue()

    def request_generator():
        while True:
            try:
                message = message_queue.get(timeout=1.0)
                if message is None:  # Sentinel to stop
                    break
                yield my_service_pb2.ChatMessage(user="client", message=message)
            except queue.Empty:
                continue

    # Start streaming RPC
    response_iterator = stub.ChatMethod(request_generator())

    # Thread to handle incoming messages
    def handle_responses():
        try:
            for response in response_iterator:
                print(f"[{response.user}]: {response.message}")
        except grpc.RpcError as e:
            print(f"Chat ended: {e}")

    response_thread = threading.Thread(target=handle_responses)
    response_thread.start()

    # Send messages from user input
    try:
        while True:
            user_input = input("You: ")
            if user_input.lower() == '/quit':
                break
            message_queue.put(user_input)
    finally:
        message_queue.put(None)  # Signal to stop request generator
        response_thread.join()
# Advanced bidirectional streaming with flow control
class FlowControlledStreaming:
    """Bidirectional-streaming client that bounds the number of pending requests.

    Requests are buffered in a queue of at most 10 items; `send_request`
    drops a request instead of blocking when the buffer is full.
    """

    def __init__(self, stub):
        self.stub = stub
        self.request_queue = queue.Queue(maxsize=10)  # Limit pending requests
        self.stop_event = threading.Event()

    def request_generator(self):
        """Yield buffered requests until `stop()` has been called."""
        while not self.stop_event.is_set():
            try:
                # Poll with a timeout so the stop flag is re-checked regularly.
                request = self.request_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            yield request

    def start_streaming(self):
        """Open the stream and handle each response as it arrives."""
        response_iterator = self.stub.StreamStreamMethod(
            self.request_generator(),
            timeout=300.0
        )
        for response in response_iterator:
            self.process_response(response)
            # Flow control: only send new requests after processing response
            if not self.request_queue.full():
                self.maybe_send_request()

    def process_response(self, response):
        """Hook called for every response; the default does nothing. Override it."""

    def maybe_send_request(self):
        """Hook called after a response when the buffer has room; default does nothing."""

    def send_request(self, request):
        """Buffer `request` for sending, dropping it if the buffer is full."""
        try:
            # put_nowait avoids the check-then-put race of full() + put(),
            # which could block if another producer filled the queue in between.
            self.request_queue.put_nowait(request)
        except queue.Full:
            print("Request queue full, dropping request")

    def stop(self):
        """Ask `request_generator` to finish."""
        self.stop_event.set()

# Access to RPC metadata, status, and control information through Call objects.
class Call(RpcContext):
    """Invocation-side utility object for an RPC."""

    def initial_metadata(self):
        """
        Accesses the initial metadata sent by the server.
        This method blocks until the value is available.

        Returns:
            Metadata: The initial metadata key-value pairs
        """

    def trailing_metadata(self):
        """
        Accesses the trailing metadata sent by the server.
        This method blocks until the value is available.

        Returns:
            Metadata: The trailing metadata key-value pairs
        """

    def code(self) -> StatusCode:
        """
        Accesses the status code sent by the server.
        This method blocks until the value is available.

        Returns:
            StatusCode: The status code value for the RPC
        """

    def details(self) -> str:
        """
        Accesses the details sent by the server.
        This method blocks until the value is available.

        Returns:
            str: The details string of the RPC
        """

    def is_active(self) -> bool:
        """
        Describes whether the RPC is active or has terminated.

        Returns:
            bool: True if RPC is active, False otherwise
        """

    def time_remaining(self):
        """
        Describes the length of allowed time remaining for the RPC.

        Returns:
            float or None: Seconds remaining for RPC completion, or None if no deadline
        """

    def cancel(self):
        """
        Cancels the RPC.
        Idempotent and has no effect if the RPC has already terminated.
        """

    def add_callback(self, callback) -> bool:
        """
        Registers a callback to be called on RPC termination.

        Parameters:
        - callback: A no-parameter callable to be called on RPC termination

        Returns:
            bool: True if callback was added, False if RPC already terminated
        """

# Usage Examples:
import threading
import time

# Access call metadata and status
response, call = stub.UnaryMethod.with_call(request)
print(f"Initial metadata: {dict(call.initial_metadata())}")
print(f"Status code: {call.code()}")
print(f"Status details: {call.details()}")
print(f"Trailing metadata: {dict(call.trailing_metadata())}")

# RPC cancellation
future = stub.LongRunningMethod.future(request)


# Cancel after 5 seconds if not done
def cancel_if_needed():
    time.sleep(5)
    # cancel() reports whether the cancellation took effect; the RPC may have
    # finished between the done() check and the cancel() call.
    if not future.done() and future.cancel():
        print("RPC cancelled due to timeout")


threading.Thread(target=cancel_if_needed).start()
try:
    response = future.result()
except grpc.FutureCancelledError:
    print("RPC was cancelled")
import threading
import time

# Streaming with call control
response_iterator = stub.UnaryStreamMethod(request)


def handle_cancellation():
    time.sleep(10)
    if response_iterator.is_active():
        response_iterator.cancel()
        print("Stream cancelled")


threading.Thread(target=handle_cancellation).start()
try:
    for response in response_iterator:
        print(f"Response: {response}")
        if not response_iterator.is_active():
            break
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.CANCELLED:
        print("Stream was cancelled")


class ClientCallDetails(abc.ABC):
    """
    Describes an RPC to be invoked.

    Attributes:
    - method: The method name of the RPC
    - timeout: Optional duration of time in seconds to allow for the RPC
    - metadata: Optional metadata to be transmitted to the service-side
    - credentials: Optional CallCredentials for the RPC
    - wait_for_ready: Optional flag to enable wait_for_ready mechanism
    - compression: Optional compression element
    """
class RpcContext(abc.ABC):
    """Provides RPC-related information and action."""

    def is_active(self) -> bool:
        """Returns True if RPC is active, False otherwise."""

    def time_remaining(self):
        """Returns seconds remaining for RPC or None if no deadline."""

    def cancel(self):
        """Cancels the RPC. Idempotent."""

    def add_callback(self, callback) -> bool:
        """Registers callback for RPC termination."""

# Install with Tessl CLI
npx tessl i tessl/pypi-grpcio