0
# Error Handling and Exceptions
1
2
Comprehensive error handling patterns and exception management for robust secret operations with proper authentication and network error handling. The Azure Key Vault Secrets library uses standard Azure Core exceptions for consistent error handling across Azure SDKs.
3
4
## Capabilities
5
6
### Core Exception Classes
7
8
Standard Azure Core exceptions used throughout the Key Vault Secrets library.
9
10
```python { .api }
11
# From azure.core.exceptions
12
13
class ResourceNotFoundError(HttpResponseError):
14
"""
15
Raised when a requested secret does not exist.
16
17
Common scenarios:
18
- Getting a secret that doesn't exist
19
- Updating a non-existent secret
20
- Operating on a deleted secret that has been purged
21
"""
22
23
class ResourceExistsError(HttpResponseError):
24
"""
25
Raised when attempting to create a resource that already exists.
26
27
Common scenarios:
28
- Restoring a secret when one with the same name already exists
29
- Conflicting operations during secret creation
30
"""
31
32
class ClientAuthenticationError(HttpResponseError):
33
"""
34
Raised when authentication with Azure Key Vault fails.
35
36
Common scenarios:
37
- Invalid or expired credentials
38
- Insufficient permissions for the operation
39
- Network issues preventing authentication
40
"""
41
42
class HttpResponseError(Exception):
43
"""
44
Base exception for HTTP response errors.
45
46
Properties:
47
- status_code (int): HTTP status code
48
- reason (str): HTTP reason phrase
49
- response (HttpResponse): Full HTTP response object
50
- message (str): Error message
51
"""
52
53
class ServiceRequestError(Exception):
54
"""
55
Raised when there's an error in the service request.
56
57
Common scenarios:
58
- Network connectivity issues
59
- DNS resolution problems
60
- Connection timeouts
61
"""
62
```
63
64
### Authentication Errors
65
66
Specific patterns and handling for authentication-related failures.
67
68
```python { .api }
69
# Authentication error scenarios and handling
70
71
def handle_authentication_error(error: ClientAuthenticationError) -> None:
72
"""
73
Handle authentication errors with appropriate remediation.
74
75
Common causes:
76
- Expired tokens or certificates
77
- Insufficient Key Vault permissions
78
- Invalid credentials configuration
79
- Network firewall blocking authentication endpoints
80
"""
81
```
82
83
### Permission and Access Errors
84
85
Handling authorization failures and access policy issues.
86
87
```python { .api }
88
# Permission error patterns (manifested as HttpResponseError with 403 status)
89
90
def handle_permission_error(error: HttpResponseError) -> None:
91
"""
92
Handle permission and authorization errors.
93
94
Common causes:
95
- Missing Key Vault access policies
96
- Insufficient RBAC roles
97
- Operations not allowed by access policy
98
- Vault access restricted by network rules
99
"""
100
```
101
102
### Resource State Errors
103
104
Handling errors related to secret state and lifecycle.
105
106
```python { .api }
107
def handle_secret_state_error(error: HttpResponseError) -> None:
108
"""
109
Handle errors related to secret state and lifecycle.
110
111
Common scenarios:
112
- Operating on disabled secrets
113
- Accessing expired secrets
114
- Conflicts with secret deletion state
115
- Version-specific operation errors
116
"""
117
```
118
119
## Error Handling Patterns
120
121
### Basic Exception Handling
122
123
```python
124
from azure.keyvault.secrets import SecretClient
125
from azure.identity import DefaultAzureCredential
126
from azure.core.exceptions import (
127
ResourceNotFoundError,
128
ClientAuthenticationError,
129
HttpResponseError
130
)
131
132
def safe_get_secret(client: SecretClient, name: str) -> Optional[str]:
133
"""Safely retrieve a secret with comprehensive error handling."""
134
try:
135
secret = client.get_secret(name)
136
return secret.value
137
138
except ResourceNotFoundError:
139
print(f"Secret '{name}' does not exist")
140
return None
141
142
except ClientAuthenticationError as e:
143
print(f"Authentication failed: {e}")
144
# Handle credential refresh or re-authentication
145
return None
146
147
except HttpResponseError as e:
148
if e.status_code == 403:
149
print(f"Access denied for secret '{name}': {e}")
150
elif e.status_code == 429:
151
print(f"Rate limited. Retry after: {e.response.headers.get('Retry-After', 'unknown')}")
152
else:
153
print(f"HTTP error {e.status_code}: {e}")
154
return None
155
156
except Exception as e:
157
print(f"Unexpected error: {e}")
158
return None
159
```
160
161
### Retry Logic for Transient Failures
162
163
```python
164
import time
165
from typing import Optional, Callable, TypeVar
166
from azure.core.exceptions import HttpResponseError, ServiceRequestError
167
168
T = TypeVar('T')
169
170
def retry_operation(
171
operation: Callable[[], T],
172
max_retries: int = 3,
173
initial_delay: float = 1.0,
174
backoff_multiplier: float = 2.0
175
) -> Optional[T]:
176
"""
177
Retry an operation with exponential backoff for transient failures.
178
"""
179
for attempt in range(max_retries):
180
try:
181
return operation()
182
183
except (ServiceRequestError, HttpResponseError) as e:
184
# Check if error is retryable
185
if isinstance(e, HttpResponseError):
186
# Don't retry client errors (4xx) except rate limiting (429)
187
if 400 <= e.status_code < 500 and e.status_code != 429:
188
raise
189
190
if attempt == max_retries - 1:
191
raise # Last attempt, re-raise the exception
192
193
delay = initial_delay * (backoff_multiplier ** attempt)
194
print(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
195
time.sleep(delay)
196
197
return None
198
199
# Usage example
200
def get_secret_with_retry(client: SecretClient, name: str) -> Optional[str]:
201
"""Get secret with automatic retry for transient failures."""
202
try:
203
operation = lambda: client.get_secret(name).value
204
return retry_operation(operation)
205
except Exception as e:
206
print(f"Failed to get secret after retries: {e}")
207
return None
208
```
209
210
### Comprehensive Secret Management with Error Handling
211
212
```python
213
from azure.keyvault.secrets import SecretClient
214
from azure.identity import DefaultAzureCredential
215
from azure.core.exceptions import *
216
from typing import Optional, Dict, Any
217
import logging
218
219
class SecureSecretManager:
220
"""
221
Wrapper class providing robust secret management with comprehensive error handling.
222
"""
223
224
def __init__(self, vault_url: str):
225
"""Initialize the secret manager with error handling."""
226
try:
227
credential = DefaultAzureCredential()
228
self.client = SecretClient(vault_url=vault_url, credential=credential)
229
self.vault_url = vault_url
230
logging.info(f"Initialized SecretManager for vault: {vault_url}")
231
except Exception as e:
232
logging.error(f"Failed to initialize SecretManager: {e}")
233
raise
234
235
def get_secret_safely(self, name: str, default: Optional[str] = None) -> Optional[str]:
236
"""
237
Safely retrieve a secret with fallback to default value.
238
"""
239
try:
240
secret = self.client.get_secret(name)
241
return secret.value
242
243
except ResourceNotFoundError:
244
logging.warning(f"Secret '{name}' not found, using default")
245
return default
246
247
except ClientAuthenticationError as e:
248
logging.error(f"Authentication failed for secret '{name}': {e}")
249
raise
250
251
except HttpResponseError as e:
252
if e.status_code == 403:
253
logging.error(f"Access denied for secret '{name}'")
254
elif e.status_code == 404:
255
logging.warning(f"Secret '{name}' not found")
256
return default
257
else:
258
logging.error(f"HTTP error getting secret '{name}': {e}")
259
return default
260
261
except Exception as e:
262
logging.error(f"Unexpected error getting secret '{name}': {e}")
263
return default
264
265
def set_secret_safely(
266
self,
267
name: str,
268
value: str,
269
**kwargs
270
) -> bool:
271
"""
272
Safely set a secret with error handling.
273
274
Returns:
275
bool: True if successful, False otherwise
276
"""
277
try:
278
self.client.set_secret(name, value, **kwargs)
279
logging.info(f"Successfully set secret '{name}'")
280
return True
281
282
except ClientAuthenticationError as e:
283
logging.error(f"Authentication failed setting secret '{name}': {e}")
284
return False
285
286
except HttpResponseError as e:
287
if e.status_code == 403:
288
logging.error(f"Access denied setting secret '{name}'")
289
elif e.status_code == 409:
290
logging.warning(f"Conflict setting secret '{name}' - may already exist")
291
else:
292
logging.error(f"HTTP error setting secret '{name}': {e}")
293
return False
294
295
except Exception as e:
296
logging.error(f"Unexpected error setting secret '{name}': {e}")
297
return False
298
299
def delete_secret_safely(self, name: str) -> bool:
300
"""
301
Safely delete a secret with error handling.
302
303
Returns:
304
bool: True if successful or already deleted, False on error
305
"""
306
try:
307
poller = self.client.begin_delete_secret(name)
308
deleted_secret = poller.result()
309
logging.info(f"Successfully deleted secret '{name}'")
310
return True
311
312
except ResourceNotFoundError:
313
logging.info(f"Secret '{name}' already deleted or doesn't exist")
314
return True # Consider this success
315
316
except ClientAuthenticationError as e:
317
logging.error(f"Authentication failed deleting secret '{name}': {e}")
318
return False
319
320
except HttpResponseError as e:
321
if e.status_code == 403:
322
logging.error(f"Access denied deleting secret '{name}'")
323
else:
324
logging.error(f"HTTP error deleting secret '{name}': {e}")
325
return False
326
327
except Exception as e:
328
logging.error(f"Unexpected error deleting secret '{name}': {e}")
329
return False
330
331
def list_secrets_safely(self) -> Dict[str, Dict[str, Any]]:
332
"""
333
Safely list all secrets with error handling.
334
335
Returns:
336
Dict mapping secret names to their properties
337
"""
338
secrets = {}
339
340
try:
341
for secret_props in self.client.list_properties_of_secrets():
342
secrets[secret_props.name] = {
343
'enabled': secret_props.enabled,
344
'created_on': secret_props.created_on,
345
'updated_on': secret_props.updated_on,
346
'expires_on': secret_props.expires_on,
347
'tags': secret_props.tags
348
}
349
350
except ClientAuthenticationError as e:
351
logging.error(f"Authentication failed listing secrets: {e}")
352
353
except HttpResponseError as e:
354
if e.status_code == 403:
355
logging.error("Access denied listing secrets")
356
else:
357
logging.error(f"HTTP error listing secrets: {e}")
358
359
except Exception as e:
360
logging.error(f"Unexpected error listing secrets: {e}")
361
362
return secrets
363
364
def __enter__(self):
365
return self
366
367
def __exit__(self, exc_type, exc_val, exc_tb):
368
try:
369
self.client.close()
370
except Exception as e:
371
logging.error(f"Error closing client: {e}")
372
```
373
374
### Async Error Handling
375
376
```python
377
import asyncio
378
from azure.keyvault.secrets.aio import SecretClient
379
from azure.identity.aio import DefaultAzureCredential
380
from azure.core.exceptions import *
381
382
async def safe_async_secret_operations():
383
"""
384
Example of comprehensive async error handling.
385
"""
386
credential = DefaultAzureCredential()
387
388
try:
389
async with SecretClient(
390
"https://vault.vault.azure.net/",
391
credential
392
) as client:
393
394
# Try to get a secret with error handling
395
try:
396
secret = await client.get_secret("my-secret")
397
print(f"Retrieved: {secret.name}")
398
399
except ResourceNotFoundError:
400
print("Secret not found, creating it...")
401
try:
402
new_secret = await client.set_secret("my-secret", "new-value")
403
print(f"Created: {new_secret.name}")
404
except HttpResponseError as e:
405
print(f"Failed to create secret: {e}")
406
407
except ClientAuthenticationError as e:
408
print(f"Authentication failed: {e}")
409
return
410
411
# List secrets with error handling
412
try:
413
secret_count = 0
414
async for secret_props in client.list_properties_of_secrets():
415
secret_count += 1
416
print(f"Found secret: {secret_props.name}")
417
418
print(f"Total secrets: {secret_count}")
419
420
except Exception as e:
421
print(f"Error listing secrets: {e}")
422
423
except Exception as e:
424
print(f"Failed to initialize client: {e}")
425
426
# Run the async example
427
# asyncio.run(safe_async_secret_operations())
428
```
429
430
### Custom Exception Handling
431
432
```python
433
from azure.core.exceptions import HttpResponseError
434
from typing import Optional
435
436
class KeyVaultSecretError(Exception):
437
"""Custom exception for Key Vault secret operations."""
438
pass
439
440
class SecretAccessDeniedError(KeyVaultSecretError):
441
"""Raised when access is denied to a secret."""
442
pass
443
444
class SecretExpiredError(KeyVaultSecretError):
445
"""Raised when attempting to use an expired secret."""
446
pass
447
448
def convert_azure_exception(func):
449
"""
450
Decorator to convert Azure exceptions to custom exceptions.
451
"""
452
def wrapper(*args, **kwargs):
453
try:
454
return func(*args, **kwargs)
455
except ResourceNotFoundError as e:
456
raise KeyVaultSecretError(f"Secret not found: {e}")
457
except ClientAuthenticationError as e:
458
raise KeyVaultSecretError(f"Authentication failed: {e}")
459
except HttpResponseError as e:
460
if e.status_code == 403:
461
raise SecretAccessDeniedError(f"Access denied: {e}")
462
else:
463
raise KeyVaultSecretError(f"HTTP error {e.status_code}: {e}")
464
except Exception as e:
465
raise KeyVaultSecretError(f"Unexpected error: {e}")
466
467
return wrapper
468
469
@convert_azure_exception
470
def get_secret_with_custom_exceptions(client: SecretClient, name: str) -> str:
471
"""Get secret with custom exception conversion."""
472
secret = client.get_secret(name)
473
474
# Check if secret is expired
475
if secret.properties.expires_on:
476
from datetime import datetime, timezone
477
if secret.properties.expires_on < datetime.now(timezone.utc):
478
raise SecretExpiredError(f"Secret '{name}' has expired")
479
480
return secret.value
481
```
482
483
## Required Imports
484
485
```python { .api }
486
from azure.core.exceptions import (
487
HttpResponseError,
488
ResourceNotFoundError,
489
ResourceExistsError,
490
ClientAuthenticationError,
491
ServiceRequestError
492
)
493
from typing import Optional, Dict, Any, Callable, TypeVar
494
import logging
495
import time
496
```