0
# Error Handling and Status
1
2
A comprehensive error-handling framework: gRPC status codes, custom exceptions, detailed error information, exception propagation in both sync and async contexts, and service-side error management.
3
4
## Capabilities
5
6
### Status Codes
7
8
Standard gRPC status codes for consistent error reporting across all RPC operations.
9
10
```python { .api }
11
class StatusCode(enum.Enum):
    """Mirrors grpc_status_code in the gRPC Core.

    Member values are elided (``...``) in this API summary; only the member
    names and their meanings are documented here.
    """

    OK = ...                   # Not an error; returned on success
    CANCELLED = ...            # The operation was cancelled (typically by the caller)
    UNKNOWN = ...              # Unknown error
    INVALID_ARGUMENT = ...     # Client specified an invalid argument
    DEADLINE_EXCEEDED = ...    # Deadline expired before operation could complete
    NOT_FOUND = ...            # Some requested entity was not found
    ALREADY_EXISTS = ...       # Some entity that we attempted to create already exists
    PERMISSION_DENIED = ...    # The caller does not have permission to execute the operation
    UNAUTHENTICATED = ...      # The request does not have valid authentication credentials
    RESOURCE_EXHAUSTED = ...   # Some resource has been exhausted (e.g., per-user quota)
    FAILED_PRECONDITION = ...  # Operation was rejected because system is not in required state
    ABORTED = ...              # The operation was aborted, typically due to concurrency issue
    OUT_OF_RANGE = ...         # Operation was attempted past the valid range
    UNIMPLEMENTED = ...        # Operation is not implemented or not supported/enabled
    INTERNAL = ...             # Internal errors; invariants expected by underlying system broken
    UNAVAILABLE = ...          # The service is currently unavailable
    DATA_LOSS = ...            # Unrecoverable data loss or corruption
31
```
32
33
**Usage Examples:**
34
35
```python
36
# Client-side status code handling
37
try:
    response = stub.MyMethod(request)
except grpc.RpcError as e:
    # Read the status once, then branch on it.
    status = e.code()
    if status == grpc.StatusCode.NOT_FOUND:
        print("Resource not found")
    elif status == grpc.StatusCode.PERMISSION_DENIED:
        print("Access denied")
    elif status == grpc.StatusCode.DEADLINE_EXCEEDED:
        print("Request timed out")
    elif status == grpc.StatusCode.UNAVAILABLE:
        print("Service unavailable - retrying might help")
    else:
        print(f"RPC failed: {status} - {e.details()}")
50
51
# Server-side status code setting
52
class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Servicer showing two ways to report a non-OK status to the client."""

    def GetUser(self, request, context):
        """Return the requested user, or fail with NOT_FOUND / PERMISSION_DENIED.

        Parameters:
        - request: The incoming message; ``request.user_id`` selects the user
        - context: The ServicerContext for this RPC
        """
        user = self.find_user(request.user_id)
        if not user:
            # Option 1: set code and details explicitly, then return an
            # empty response so the method completes normally.
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {request.user_id} not found")
            return my_service_pb2.GetUserResponse()

        if not self.has_permission(context, user):
            # Option 2: abort() raises, so nothing below runs for this RPC.
            context.abort(grpc.StatusCode.PERMISSION_DENIED, "Access denied")

        return my_service_pb2.GetUserResponse(user=user)
64
```
65
66
### RPC Exceptions
67
68
Client-side exceptions for handling RPC failures, with access to the status code, details, and metadata of the failed call.
69
70
```python { .api }
71
class RpcError(Exception):
    """
    Raised by the gRPC library to indicate non-OK-status RPC termination.

    Errors raised by the library for a failed call also implement the Call
    interface, which is what provides the accessors listed below.
    NOTE(review): an RpcError constructed by hand is presumably a plain
    exception without these accessors - confirm against the grpc reference.
    """

    def code(self) -> StatusCode:
        """Returns the status code sent by the server."""

    def details(self) -> str:
        """Returns the details sent by the server."""

    def initial_metadata(self):
        """Returns the initial metadata sent by the server."""

    def trailing_metadata(self):
        """Returns the trailing metadata sent by the server."""
88
89
class FutureTimeoutError(Exception):
    """Indicates that a method call on a Future timed out."""
91
92
class FutureCancelledError(Exception):
    """Indicates that the computation underlying a Future was cancelled."""
94
```
95
96
**Usage Examples:**
97
98
```python
99
# Comprehensive error handling
100
def handle_rpc_call():
    """Call ``stub.MyMethod`` and react to a failure according to its status.

    Returns:
        The RPC response, or the result of ``retry_rpc_call()`` when the
        failure was UNAUTHENTICATED or RESOURCE_EXHAUSTED.

    Raises:
        grpc.RpcError: Re-raised for every other status code.
        grpc.FutureTimeoutError: Re-raised after printing a message.
        grpc.FutureCancelledError: Re-raised after printing a message.
    """
    try:
        response = stub.MyMethod(request, timeout=10.0)
        return response
    except grpc.RpcError as e:
        # Access detailed error information
        print(f"RPC failed with status: {e.code()}")
        print(f"Error details: {e.details()}")

        # Access metadata for debugging. Metadata may be missing when the
        # call failed before any arrived; `or ()` keeps dict() from raising
        # TypeError on None in that case.
        initial_md = dict(e.initial_metadata() or ())
        trailing_md = dict(e.trailing_metadata() or ())
        print(f"Server metadata: initial={initial_md}, trailing={trailing_md}")

        # Handle specific error conditions
        if e.code() == grpc.StatusCode.UNAUTHENTICATED:
            # Refresh authentication and retry
            refresh_credentials()
            return retry_rpc_call()
        elif e.code() == grpc.StatusCode.RESOURCE_EXHAUSTED:
            # Implement backoff and retry
            time.sleep(1.0)
            return retry_rpc_call()
        else:
            # Log and re-raise for unhandled errors
            log_error(f"Unhandled RPC error: {e}")
            raise
    # NOTE(review): the two handlers below concern method calls on a Future.
    # They presumably only fire when the call is made through the Future API
    # rather than as the blocking call above - confirm before relying on them.
    except grpc.FutureTimeoutError:
        print("RPC future timed out")
        raise
    except grpc.FutureCancelledError:
        print("RPC was cancelled")
        raise
133
134
# Async error handling
135
async def handle_async_rpc():
    """Await ``stub.AsyncMethod`` and report failures before re-raising them.

    Returns:
        The RPC response on success.

    Raises:
        grpc.aio.AioRpcError: Re-raised after printing code and details.
        asyncio.TimeoutError: Re-raised after printing a message.
    """
    try:
        response = await stub.AsyncMethod(request)
        return response
    except grpc.aio.AioRpcError as e:
        print(f"Async RPC failed: {e.code()} - {e.details()}")
        raise
    # NOTE(review): nothing in this snippet applies an asyncio timeout;
    # presumably the caller wraps this coroutine in one - confirm.
    except asyncio.TimeoutError:
        print("Async operation timed out")
        raise
145
```
146
147
### Server-Side Error Management
148
149
Service-side context methods for controlling RPC status, error details, and graceful error handling.
150
151
```python { .api }
152
class ServicerContext(RpcContext):
    """Context object for server-side error management."""

    def abort(self, code: StatusCode, details: str):
        """
        Raises an exception to terminate the RPC with a non-OK status.

        Parameters:
        - code: A StatusCode object (must not be StatusCode.OK)
        - details: A UTF-8-encodable string to be sent to the client

        Raises:
            Exception: Always raised to signal the abortion of the RPC
        """

    def abort_with_status(self, status):
        """
        Raises an exception to terminate the RPC with a status object (EXPERIMENTAL).

        Parameters:
        - status: A grpc.Status object (status code must not be StatusCode.OK)

        Raises:
            Exception: Always raised to signal the abortion of the RPC
        """

    def set_code(self, code: StatusCode):
        """
        Sets the value to be used as status code upon RPC completion.

        Parameters:
        - code: A StatusCode object to be sent to the client
        """

    def set_details(self, details: str):
        """
        Sets the value to be used as detail string upon RPC completion.

        Parameters:
        - details: A UTF-8-encodable string to be sent to the client
        """

    def code(self) -> StatusCode:
        """
        Accesses the value to be used as status code upon RPC completion (EXPERIMENTAL).

        Returns:
            StatusCode: The status code value for the RPC
        """

    def details(self) -> str:
        """
        Accesses the value to be used as detail string upon RPC completion (EXPERIMENTAL).

        Returns:
            str: The details string of the RPC
        """
209
```
210
211
**Usage Examples:**
212
213
```python
214
class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Servicer that maps validation and domain failures onto gRPC status codes."""

    def CreateUser(self, request, context):
        """Create a user, aborting the RPC with a matching status on failure."""
        # Input validation
        if not request.username:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "Username is required"
            )

        if len(request.username) < 3:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "Username must be at least 3 characters long"
            )

        # Business logic with error handling. Every abort() below sits in an
        # except handler, so the exception it raises is not caught by the
        # sibling handlers of this same try statement.
        try:
            user = self.user_service.create_user(request.username, request.email)
            return my_service_pb2.CreateUserResponse(user=user)
        except UserAlreadyExistsError:
            context.abort(
                grpc.StatusCode.ALREADY_EXISTS,
                f"User '{request.username}' already exists"
            )
        except ValidationError as e:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Validation failed: {str(e)}"
            )
        except DatabaseError as e:
            # Log internal error but don't expose details to client
            logger.error(f"Database error creating user: {e}")
            context.abort(
                grpc.StatusCode.INTERNAL,
                "Internal server error"
            )
        except Exception as e:
            # Catch-all for unexpected errors
            logger.error(f"Unexpected error: {e}")
            context.abort(
                grpc.StatusCode.UNKNOWN,
                "An unexpected error occurred"
            )

    def GetUsers(self, request, context):
        """Return a page of users after authentication and authorization checks."""
        # Authentication check
        if not self.is_authenticated(context):
            context.abort(
                grpc.StatusCode.UNAUTHENTICATED,
                "Authentication required"
            )

        # Authorization check
        if not self.has_permission(context, "read_users"):
            context.abort(
                grpc.StatusCode.PERMISSION_DENIED,
                "Insufficient permissions to read users"
            )

        try:
            users = self.user_service.get_users(
                limit=request.limit,
                offset=request.offset
            )
            return my_service_pb2.GetUsersResponse(users=users)
        except ResourceExhaustedError:
            context.abort(
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                "Too many requests - please try again later"
            )

    def StreamingMethod(self, request, context):
        """Example of error handling in streaming methods."""
        try:
            for item in self.get_stream_data(request):
                # Check if client cancelled
                if not context.is_active():
                    logger.info("Client cancelled streaming request")
                    break

                yield my_service_pb2.StreamResponse(data=item)
        except DataCorruptionError as e:
            # Set status but let the method complete normally
            context.set_code(grpc.StatusCode.DATA_LOSS)
            context.set_details(f"Data corruption detected: {str(e)}")
        except Exception as e:
            logger.error(f"Streaming error: {e}")
            context.abort(grpc.StatusCode.INTERNAL, "Stream processing failed")
302
303
# Graceful error handling with cleanup
304
class DatabaseServicer(my_service_pb2_grpc.DatabaseServicer):
    """Servicer that rolls back its transaction before reporting a failure."""

    def ProcessTransaction(self, request, context):
        """Run every requested operation in one transaction.

        On any failure the transaction (if one was started) is rolled back
        and the RPC is aborted with a status that matches the error.
        """
        transaction = None
        try:
            # Start transaction
            transaction = self.db.begin_transaction()

            # Process operations
            for operation in request.operations:
                self.execute_operation(transaction, operation)

            # Commit transaction
            transaction.commit()
            return my_service_pb2.TransactionResponse(success=True)

        except ValidationError as e:
            # `is not None` rather than truthiness: a transaction object
            # that happens to be falsy must still be rolled back.
            if transaction is not None:
                transaction.rollback()
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Invalid operation: {str(e)}"
            )
        except ConcurrencyError as e:
            if transaction is not None:
                transaction.rollback()
            context.abort(
                grpc.StatusCode.ABORTED,
                "Transaction aborted due to concurrent modification"
            )
        except Exception as e:
            if transaction is not None:
                transaction.rollback()
            logger.error(f"Transaction failed: {e}")
            context.abort(
                grpc.StatusCode.INTERNAL,
                "Transaction processing failed"
            )
341
```
342
343
### Status Objects
344
345
Status object interface for comprehensive error information (EXPERIMENTAL).
346
347
```python { .api }
348
class Status(abc.ABC):
    """
    Describes the status of an RPC (EXPERIMENTAL).

    Attributes:
    - code: A StatusCode object to be sent to the client
    - details: A UTF-8-encodable string to be sent upon termination
    - trailing_metadata: The trailing metadata in the RPC
    """
357
```
358
359
### Error Handling Patterns
360
361
Common patterns for robust error handling in gRPC applications.
362
363
**Retry Logic:**
364
365
```python
366
import time
367
import random
368
369
def exponential_backoff_retry(rpc_func, max_retries=3, base_delay=1.0):
    """Retry RPC with exponential backoff.

    Parameters:
    - rpc_func: Zero-argument callable that performs the RPC
    - max_retries: Number of retries after the first attempt (must be >= 0)
    - base_delay: Delay in seconds before the first retry; doubles each time

    Returns:
        Whatever ``rpc_func`` returns on its first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is negative.
        grpc.RpcError: The last error once retries are used up, or
            immediately for a status that is not considered transient.
    """
    # A negative count would make the loop body never run, returning None
    # without ever calling rpc_func; reject it instead.
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    for attempt in range(max_retries + 1):
        try:
            return rpc_func()
        except grpc.RpcError as e:
            if attempt == max_retries:
                raise  # Final attempt, re-raise

            # Only retry on transient errors
            if e.code() not in (
                grpc.StatusCode.UNAVAILABLE,
                grpc.StatusCode.DEADLINE_EXCEEDED,
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                grpc.StatusCode.ABORTED,
            ):
                raise  # Don't retry non-transient errors

            # Exponential growth plus up to one second of random jitter.
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Retrying in {delay:.2f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(delay)
390
391
# Usage
392
def make_rpc():
    """Issue one ``stub.MyMethod`` call with a 10-second timeout."""
    return stub.MyMethod(request, timeout=10.0)


response = exponential_backoff_retry(make_rpc)
396
```
397
398
**Circuit Breaker Pattern:**
399
400
```python
401
import time
402
from collections import defaultdict
403
from enum import Enum
404
405
class CircuitState(Enum):
    """States a CircuitBreaker can be in."""

    CLOSED = "closed"        # Calls pass through
    OPEN = "open"            # Calls are rejected without being attempted
    HALF_OPEN = "half_open"  # A trial call is let through after the timeout


class CircuitBreakerOpenError(grpc.RpcError):
    """Raised instead of attempting the call while the circuit is OPEN.

    Provides ``code()`` and ``details()`` so that handlers written for real
    RPC failures (``except grpc.RpcError as e: e.code()``) also work on it.
    NOTE(review): this assumes a bare ``grpc.RpcError`` instance offers
    neither accessor - confirm against the grpc reference.
    """

    def code(self):
        """Report the rejection as UNAVAILABLE."""
        return grpc.StatusCode.UNAVAILABLE

    def details(self):
        """Return the human-readable reason for the rejection."""
        return "Circuit breaker is OPEN"


class CircuitBreaker:
    """Stops calling a failing RPC after repeated consecutive failures.

    Parameters:
    - failure_threshold: Consecutive failures that open the circuit
    - timeout: Seconds to stay OPEN before a trial call is allowed
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        # Monotonic timestamp of the latest failure, so that wall-clock
        # adjustments cannot shorten or lengthen the open period.
        self.last_failure_time = 0.0
        self.state = CircuitState.CLOSED

    def call(self, rpc_func):
        """Run ``rpc_func`` through the breaker.

        Returns:
            Whatever ``rpc_func`` returns.

        Raises:
            CircuitBreakerOpenError: If the circuit is OPEN and the timeout
                has not elapsed; ``rpc_func`` is not called.
            grpc.RpcError: Re-raised from ``rpc_func`` after being counted.
        """
        if self.state == CircuitState.OPEN:
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = rpc_func()
        except grpc.RpcError:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

            raise

        # Success - reset failure count
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
        self.failure_count = 0
        return result
441
442
# Usage
443
circuit_breaker = CircuitBreaker()
# The lambda defers the RPC so the breaker decides whether it runs at all.
response = circuit_breaker.call(lambda: stub.MyMethod(request))
445
```
446
447
**Structured Error Response:**
448
449
```python
450
# Server-side structured error details
451
import json
452
453
class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Servicer that reports all validation failures in one structured payload."""

    def ValidateData(self, request, context):
        """Validate ``request.email`` and ``request.age``.

        On failure the collected errors are JSON-encoded into the
        ``error-details`` trailing metadata entry and the RPC is aborted
        with INVALID_ARGUMENT.
        """
        errors = []

        # Collect all validation errors
        if not request.email:
            errors.append({"field": "email", "message": "Email is required"})
        elif not self.is_valid_email(request.email):
            errors.append({"field": "email", "message": "Invalid email format"})

        if not request.age or request.age < 0:
            errors.append({"field": "age", "message": "Age must be a positive number"})

        if errors:
            # Include structured error details in metadata; the metadata
            # must be set before abort(), because abort() raises.
            error_details = json.dumps({"validation_errors": errors})
            context.set_trailing_metadata([("error-details", error_details)])
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Validation failed: {len(errors)} errors found"
            )

        return my_service_pb2.ValidationResponse(valid=True)
476
477
# Client-side structured error handling
478
try:
    response = stub.ValidateData(request)
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
        # Trailing metadata may be missing; `or ()` keeps dict() from
        # raising TypeError on None in that case.
        trailing_md = dict(e.trailing_metadata() or ())
        if "error-details" in trailing_md:
            error_details = json.loads(trailing_md["error-details"])
            print("Validation errors:")
            for error in error_details["validation_errors"]:
                print(f"  {error['field']}: {error['message']}")
488
```
489
490
## Types
491
492
```python { .api }
493
class RpcContext(abc.ABC):
    """Provides RPC-related information and action."""

    def is_active(self) -> bool:
        """Describes whether the RPC is active or has terminated."""

    def time_remaining(self):
        """Describes the length of allowed time remaining for the RPC."""

    def cancel(self):
        """Cancels the RPC. Idempotent and has no effect if already terminated."""

    def add_callback(self, callback) -> bool:
        """Registers a callback to be called on RPC termination."""
507
508
class Call(RpcContext):
    """Invocation-side utility object for an RPC."""

    def initial_metadata(self):
        """Accesses the initial metadata sent by the server."""

    def trailing_metadata(self):
        """Accesses the trailing metadata sent by the server."""

    def code(self) -> StatusCode:
        """Accesses the status code sent by the server."""

    def details(self) -> str:
        """Accesses the details sent by the server."""
522
```