or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-api.md, channel-management.md, error-handling.md, index.md, interceptors.md, protobuf-integration.md, rpc-patterns.md, security-authentication.md, server-implementation.md

docs/error-handling.md

0

# Error Handling and Status

1

2

Comprehensive error handling framework with gRPC status codes, custom exceptions, detailed error information, proper exception propagation for both sync and async contexts, and service-side error management capabilities.

3

4

## Capabilities

5

6

### Status Codes

7

8

Standard gRPC status codes for consistent error reporting across all RPC operations.

9

10

```python { .api }

11

class StatusCode(enum.Enum):
    """Status codes for RPCs; mirrors grpc_status_code in the gRPC Core.

    The member values are elided (``...``) in this API summary.
    """

    OK = ...  # Success; not an error
    CANCELLED = ...  # Operation cancelled, typically by the caller
    UNKNOWN = ...  # Error of unknown kind
    INVALID_ARGUMENT = ...  # An argument supplied by the client was invalid
    DEADLINE_EXCEEDED = ...  # Deadline passed before the operation could complete
    NOT_FOUND = ...  # A requested entity could not be found
    ALREADY_EXISTS = ...  # The entity we attempted to create already exists
    PERMISSION_DENIED = ...  # Caller lacks permission to execute the operation
    UNAUTHENTICATED = ...  # Request lacks valid authentication credentials
    RESOURCE_EXHAUSTED = ...  # A resource (e.g., per-user quota) has run out
    FAILED_PRECONDITION = ...  # Rejected because the system is not in the required state
    ABORTED = ...  # Operation aborted, typically due to a concurrency issue
    OUT_OF_RANGE = ...  # Operation attempted past the valid range
    UNIMPLEMENTED = ...  # Operation not implemented, or not supported/enabled
    INTERNAL = ...  # Internal error; an invariant of the underlying system is broken
    UNAVAILABLE = ...  # Service is currently unavailable
    DATA_LOSS = ...  # Unrecoverable data loss or corruption

31

```

32

33

**Usage Examples:**

34

35

```python

36

# Client-side status code handling

37

try:
    response = stub.MyMethod(request)
except grpc.RpcError as rpc_error:
    # Messages for the status codes this client anticipates.
    known_messages = {
        grpc.StatusCode.NOT_FOUND: "Resource not found",
        grpc.StatusCode.PERMISSION_DENIED: "Access denied",
        grpc.StatusCode.DEADLINE_EXCEEDED: "Request timed out",
        grpc.StatusCode.UNAVAILABLE: "Service unavailable - retrying might help",
    }
    status = rpc_error.code()
    if status in known_messages:
        print(known_messages[status])
    else:
        # Anything else: report the raw code and the server's details.
        print(f"RPC failed: {status} - {rpc_error.details()}")

50

51

# Server-side status code setting

52

class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Servicer demonstrating two ways of reporting a non-OK status."""

    def GetUser(self, request, context):
        """Return the requested user, or report NOT_FOUND / PERMISSION_DENIED."""
        requested_id = request.user_id
        user = self.find_user(requested_id)

        if not user:
            # Record the status on the context; the handler itself still
            # returns an (empty) response.
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {requested_id} not found")
            return my_service_pb2.GetUserResponse()

        allowed = self.has_permission(context, user)
        if not allowed:
            # abort() terminates the RPC by raising.
            context.abort(grpc.StatusCode.PERMISSION_DENIED, "Access denied")

        return my_service_pb2.GetUserResponse(user=user)

64

```

65

66

### RPC Exceptions

67

68

Client-side exception hierarchy for handling RPC failures with comprehensive error information.

69

70

```python { .api }

71

class RpcError(Exception):
    """
    Exception the gRPC library raises when an RPC terminates with a non-OK status.

    It also implements the Call interface, giving access to the RPC's
    metadata and status.
    """

    def code(self) -> StatusCode:
        """Return the status code that the server sent."""

    def details(self) -> str:
        """Return the details that the server sent."""

    def initial_metadata(self):
        """Return the initial metadata that the server sent."""

    def trailing_metadata(self):
        """Return the trailing metadata that the server sent."""

88

89

class FutureTimeoutError(Exception):
    """Signals that a method call on a Future timed out."""

91

92

class FutureCancelledError(Exception):
    """Signals that the computation underlying a Future was cancelled."""

94

```

95

96

**Usage Examples:**

97

98

```python

99

# Comprehensive error handling

100

def handle_rpc_call():
    """Call ``stub.MyMethod`` with a 10-second timeout and react to failures.

    UNAUTHENTICATED and RESOURCE_EXHAUSTED failures are retried through
    ``retry_rpc_call``; every other failure is reported and re-raised.
    """
    try:
        return stub.MyMethod(request, timeout=10.0)
    except grpc.RpcError as rpc_error:
        status = rpc_error.code()

        # Detailed error information
        print(f"RPC failed with status: {status}")
        print(f"Error details: {rpc_error.details()}")

        # Metadata, useful when debugging
        initial_md = dict(rpc_error.initial_metadata())
        trailing_md = dict(rpc_error.trailing_metadata())
        print(f"Server metadata: initial={initial_md}, trailing={trailing_md}")

        # Prepare for a retry where the condition is recoverable
        if status == grpc.StatusCode.UNAUTHENTICATED:
            # Fresh credentials are needed before retrying
            refresh_credentials()
        elif status == grpc.StatusCode.RESOURCE_EXHAUSTED:
            # Back off before retrying
            time.sleep(1.0)
        else:
            # Not handled here: log it and let the caller see it
            log_error(f"Unhandled RPC error: {rpc_error}")
            raise
        return retry_rpc_call()
    except grpc.FutureTimeoutError:
        print("RPC future timed out")
        raise
    except grpc.FutureCancelledError:
        print("RPC was cancelled")
        raise

133

134

# Async error handling

135

async def handle_async_rpc():
    """Await ``stub.AsyncMethod``; report any failure, then re-raise it."""
    try:
        return await stub.AsyncMethod(request)
    except grpc.aio.AioRpcError as rpc_error:
        status, detail = rpc_error.code(), rpc_error.details()
        print(f"Async RPC failed: {status} - {detail}")
        raise
    except asyncio.TimeoutError:
        print("Async operation timed out")
        raise

145

```

146

147

### Server-Side Error Management

148

149

Service-side context methods for controlling RPC status, error details, and graceful error handling.

150

151

```python { .api }

152

class ServicerContext(RpcContext):
    """Server-side context object used to manage RPC errors."""

    def abort(self, code: StatusCode, details: str):
        """
        Terminates the RPC with a non-OK status by raising an exception.

        Parameters:
        - code: The StatusCode to report (StatusCode.OK is not allowed)
        - details: String sent to the client; must be UTF-8-encodable

        Raises:
            Exception: Raised on every call to signal that the RPC was aborted
        """

    def abort_with_status(self, status):
        """
        Terminates the RPC with a status object by raising an exception (EXPERIMENTAL).

        Parameters:
        - status: A grpc.Status object whose status code is not StatusCode.OK

        Raises:
            Exception: Raised on every call to signal that the RPC was aborted
        """

    def set_code(self, code: StatusCode):
        """
        Sets the status code to use when the RPC completes.

        Parameters:
        - code: The StatusCode to send to the client
        """

    def set_details(self, details: str):
        """
        Sets the detail string to use when the RPC completes.

        Parameters:
        - details: String sent to the client; must be UTF-8-encodable
        """

    def code(self) -> StatusCode:
        """
        Reads the status code that will be used when the RPC completes (EXPERIMENTAL).

        Returns:
            StatusCode: The status code value for the RPC
        """

    def details(self) -> str:
        """
        Reads the detail string that will be used when the RPC completes (EXPERIMENTAL).

        Returns:
            str: The details string of the RPC
        """

209

```

210

211

**Usage Examples:**

212

213

```python

214

class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Servicer showing how application failures map onto gRPC status codes."""

    def CreateUser(self, request, context):
        """Validate the request and create a user, aborting with a status on failure."""
        username = request.username

        # Input validation
        if not username:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Username is required")

        if len(username) < 3:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "Username must be at least 3 characters long",
            )

        # Business logic; each failure type is translated to a status code
        try:
            user = self.user_service.create_user(request.username, request.email)
            return my_service_pb2.CreateUserResponse(user=user)
        except UserAlreadyExistsError:
            context.abort(
                grpc.StatusCode.ALREADY_EXISTS,
                f"User '{request.username}' already exists",
            )
        except ValidationError as exc:
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Validation failed: {str(exc)}",
            )
        except DatabaseError as exc:
            # Keep the internal cause in the log; the client sees a generic message
            logger.error(f"Database error creating user: {exc}")
            context.abort(grpc.StatusCode.INTERNAL, "Internal server error")
        except Exception as exc:
            # Last resort for anything not anticipated above
            logger.error(f"Unexpected error: {exc}")
            context.abort(grpc.StatusCode.UNKNOWN, "An unexpected error occurred")

    def GetUsers(self, request, context):
        """Return a page of users after authentication and authorization checks."""
        # Authentication check
        if not self.is_authenticated(context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, "Authentication required")

        # Authorization check
        if not self.has_permission(context, "read_users"):
            context.abort(
                grpc.StatusCode.PERMISSION_DENIED,
                "Insufficient permissions to read users",
            )

        try:
            users = self.user_service.get_users(
                limit=request.limit, offset=request.offset
            )
            return my_service_pb2.GetUsersResponse(users=users)
        except ResourceExhaustedError:
            context.abort(
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                "Too many requests - please try again later",
            )

    def StreamingMethod(self, request, context):
        """Example of error handling in streaming methods."""
        try:
            for item in self.get_stream_data(request):
                if context.is_active():
                    yield my_service_pb2.StreamResponse(data=item)
                    continue

                # The RPC is no longer active: stop producing
                logger.info("Client cancelled streaming request")
                break
        except DataCorruptionError as exc:
            # Record the status but let the method finish normally
            context.set_code(grpc.StatusCode.DATA_LOSS)
            context.set_details(f"Data corruption detected: {str(exc)}")
        except Exception as exc:
            logger.error(f"Streaming error: {exc}")
            context.abort(grpc.StatusCode.INTERNAL, "Stream processing failed")

302

303

# Graceful error handling with cleanup

304

class DatabaseServicer(my_service_pb2_grpc.DatabaseServicer):
    """Servicer that rolls back its transaction before reporting an error."""

    @staticmethod
    def _rollback_if_started(transaction):
        """Roll back ``transaction`` unless it was never started (still falsy)."""
        if transaction:
            transaction.rollback()

    def ProcessTransaction(self, request, context):
        """Run every requested operation in one transaction and commit it.

        On any failure the transaction (if one was started) is rolled back
        and the RPC is aborted with a matching status code.
        """
        transaction = None
        try:
            transaction = self.db.begin_transaction()

            for operation in request.operations:
                self.execute_operation(transaction, operation)

            transaction.commit()
            return my_service_pb2.TransactionResponse(success=True)

        except ValidationError as exc:
            self._rollback_if_started(transaction)
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Invalid operation: {str(exc)}",
            )
        except ConcurrencyError:
            self._rollback_if_started(transaction)
            context.abort(
                grpc.StatusCode.ABORTED,
                "Transaction aborted due to concurrent modification",
            )
        except Exception as exc:
            self._rollback_if_started(transaction)
            logger.error(f"Transaction failed: {exc}")
            context.abort(grpc.StatusCode.INTERNAL, "Transaction processing failed")

341

```

342

343

### Status Objects

344

345

Status object interface for comprehensive error information (EXPERIMENTAL).

346

347

```python { .api }

348

class Status(abc.ABC):
    """
    The status of an RPC (EXPERIMENTAL).

    Attributes:
    - code: The StatusCode object sent to the client
    - details: String sent upon termination; must be UTF-8-encodable
    - trailing_metadata: The RPC's trailing metadata
    """

357

```

358

359

### Error Handling Patterns

360

361

Common patterns for robust error handling in gRPC applications.

362

363

**Retry Logic:**

364

365

```python

366

import time

367

import random

368

369

def exponential_backoff_retry(rpc_func, max_retries=3, base_delay=1.0):
    """Call ``rpc_func``, retrying transient gRPC failures with exponential backoff.

    Parameters:
    - rpc_func: Zero-argument callable that performs the RPC
    - max_retries: Number of retries allowed after the first attempt (must be >= 0)
    - base_delay: Base delay in seconds; it doubles with every further attempt

    Returns:
        Whatever ``rpc_func`` returns on its first successful call.

    Raises:
        ValueError: If ``max_retries`` is negative
        grpc.RpcError: Re-raised when the error is not transient or when the
            final attempt fails
    """
    # A negative count would skip the loop entirely and silently return None
    # without ever issuing the RPC.
    if max_retries < 0:
        raise ValueError(f"max_retries must be >= 0, got {max_retries}")

    for attempt in range(max_retries + 1):
        try:
            return rpc_func()
        except grpc.RpcError as e:
            if attempt == max_retries:
                raise  # Final attempt, re-raise

            # Only retry on transient errors
            if e.code() not in (
                grpc.StatusCode.UNAVAILABLE,
                grpc.StatusCode.DEADLINE_EXCEEDED,
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                grpc.StatusCode.ABORTED,
            ):
                raise  # Don't retry non-transient errors

            # Exponential delay plus up to one second of random jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Retrying in {delay:.2f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(delay)

390

391

# Usage

392

def make_rpc():
    """Issue the RPC once; exponential_backoff_retry calls this per attempt."""
    return stub.MyMethod(request, timeout=10.0)


response = exponential_backoff_retry(rpc_func=make_rpc)

396

```

397

398

**Circuit Breaker Pattern:**

399

400

```python

401

import time

402

from collections import defaultdict

403

from enum import Enum

404

405

class CircuitState(Enum):
    """States a CircuitBreaker moves between."""

    CLOSED = "closed"  # Calls pass through
    OPEN = "open"  # Calls are rejected until the timeout has elapsed
    HALF_OPEN = "half_open"  # Timeout elapsed; the next call is let through as a trial


class CircuitBreaker:
    """Stops calling a failing RPC after repeated gRPC errors.

    Parameters:
    - failure_threshold: Failure count at which the circuit opens
    - timeout: Seconds the circuit stays open before a trial call is allowed
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        # time.monotonic() reading taken at the most recent failure
        self.last_failure_time = 0
        self.state = CircuitState.CLOSED

    def call(self, rpc_func):
        """Invoke ``rpc_func`` through the breaker and return its result.

        Raises:
            grpc.RpcError: Raised without calling ``rpc_func`` while the
                circuit is open, or re-raised from ``rpc_func`` itself
        """
        if self.state == CircuitState.OPEN:
            # The monotonic clock keeps the open interval immune to
            # wall-clock adjustments.
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                # NOTE(review): an RpcError constructed directly may not
                # provide a meaningful code()/details() - confirm that
                # callers' handlers tolerate that.
                raise grpc.RpcError("Circuit breaker is OPEN")

        try:
            result = rpc_func()
        except grpc.RpcError:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

            raise

        # Success - close the circuit after a trial call and reset the
        # failure count on every success.
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
        self.failure_count = 0
        return result

441

442

# Usage

443

circuit_breaker = CircuitBreaker()


def _invoke_rpc():
    """Perform the RPC that the breaker guards."""
    return stub.MyMethod(request)


response = circuit_breaker.call(_invoke_rpc)

445

```

446

447

**Structured Error Response:**

448

449

```python

450

# Server-side structured error details

451

import json

452

453

class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
    """Servicer that reports validation failures as structured trailing metadata."""

    def ValidateData(self, request, context):
        """Validate ``request``; abort with INVALID_ARGUMENT if any field fails."""
        email, age = request.email, request.age
        problems = []

        # Gather every validation error before reporting
        if not email:
            problems.append({"field": "email", "message": "Email is required"})
        elif not self.is_valid_email(email):
            problems.append({"field": "email", "message": "Invalid email format"})

        if not age or age < 0:
            problems.append({"field": "age", "message": "Age must be a positive number"})

        if problems:
            # The JSON-encoded error list travels in trailing metadata
            payload = json.dumps({"validation_errors": problems})
            context.set_trailing_metadata([("error-details", payload)])
            context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                f"Validation failed: {len(problems)} errors found",
            )

        return my_service_pb2.ValidationResponse(valid=True)

476

477

# Client-side structured error handling

478

try:
    response = stub.ValidateData(request)
except grpc.RpcError as rpc_error:
    if rpc_error.code() == grpc.StatusCode.INVALID_ARGUMENT:
        trailing_md = dict(rpc_error.trailing_metadata())
        # The structured details are present only if the server attached them
        if "error-details" in trailing_md:
            error_details = json.loads(trailing_md["error-details"])
            print("Validation errors:")
            for entry in error_details["validation_errors"]:
                print(f" {entry['field']}: {entry['message']}")

488

```

489

490

## Types

491

492

```python { .api }

493

class RpcContext(abc.ABC):
    """Supplies information about an RPC and actions on it."""

    def is_active(self) -> bool:
        """Tells whether the RPC is active or has terminated."""

    def time_remaining(self):
        """Tells how much allowed time remains for the RPC."""

    def cancel(self):
        """Cancels the RPC. Idempotent; no effect if the RPC has already terminated."""

    def add_callback(self, callback) -> bool:
        """Registers a callback to be invoked when the RPC terminates."""

507

508

class Call(RpcContext):
    """Utility object for an RPC on the invocation side."""

    def initial_metadata(self):
        """Gives access to the initial metadata the server sent."""

    def trailing_metadata(self):
        """Gives access to the trailing metadata the server sent."""

    def code(self) -> StatusCode:
        """Gives access to the status code the server sent."""

    def details(self) -> str:
        """Gives access to the details the server sent."""

522

```