0
# Utilities and Error Handling
1
2
Utility functions for escaping LDAP filter expressions and attribute values, system information functions, and a comprehensive exception hierarchy for robust LDAP error handling.
3
4
## Capabilities
5
6
### String Escaping Utilities
7
8
Functions for escaping special characters in DN attribute values (RFC 4514) and in search filter expressions (RFC 4515).
9
10
```python { .api }
11
from bonsai import escape_attribute_value, escape_filter_exp
12
13
def escape_attribute_value(attrval: str) -> str:
14
"""
15
Escape special characters in attribute values according to RFC 4514.
16
17
Escapes characters: \\ " + , ; < = > and leading/trailing spaces, # at start.
18
19
Parameters:
20
- attrval: Attribute value to escape
21
22
Returns:
23
Escaped attribute value safe for use in DN strings
24
25
Example:
26
escape_attribute_value('John, Jr.') -> 'John\\, Jr.'
27
escape_attribute_value(' Leading space') -> '\\ Leading space'
28
"""
29
30
def escape_filter_exp(filter_exp: str) -> str:
31
"""
32
Escape special characters in LDAP filter expressions according to RFC 4515.
33
34
Escapes characters: \\ * ( ) and null byte.
35
36
Parameters:
37
- filter_exp: Filter expression to escape
38
39
Returns:
40
Escaped filter expression safe for use in LDAP searches
41
42
Example:
43
escape_filter_exp('name*') -> 'name\\2A'
44
escape_filter_exp('(test)') -> '\\28test\\29'
45
"""
46
```
47
48
### System Information Functions
49
50
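Functions for querying the underlying LDAP library's information and capabilities, and for adjusting library-wide settings such as debug logging and asynchronous connection behavior.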
51
52
```python { .api }
53
from bonsai import get_vendor_info, get_tls_impl_name, has_krb5_support, set_debug
54
55
def get_vendor_info() -> Dict[str, str]:
56
"""
57
Get information about the underlying LDAP library.
58
59
Returns:
60
Dictionary with vendor information including:
61
- 'library': LDAP library name (e.g., 'OpenLDAP', 'Microsoft LDAP')
62
- 'version': Library version string
63
- 'api_version': LDAP API version
64
- 'protocol_version': Supported LDAP protocol version
65
"""
66
67
def get_tls_impl_name() -> str:
68
"""
69
Get name of the TLS implementation used by the LDAP library.
70
71
Returns:
72
TLS implementation name (e.g., 'OpenSSL', 'GnuTLS', 'SChannel')
73
"""
74
75
def has_krb5_support() -> bool:
76
"""
77
Check if the LDAP library was compiled with Kerberos support.
78
79
Returns:
80
True if Kerberos/GSSAPI authentication is supported, False otherwise
81
"""
82
83
def set_debug(level: int, file: Optional[str] = None) -> None:
84
"""
85
Set debug logging level for the underlying LDAP library.
86
87
Parameters:
88
- level: Debug level (0=off, higher values for more verbose logging)
89
- file: Optional file path for debug output (None for stderr)
90
"""
91
92
def set_connect_async(flag: bool) -> None:
93
"""
94
Set global flag for asynchronous connection behavior.
95
96
Parameters:
97
- flag: Whether to use async connections by default
98
"""
99
100
def escape_dn_chars(dn_str: str) -> str:
101
"""
102
Escape special characters in DN component values.
103
104
Parameters:
105
- dn_str: DN component string to escape
106
107
Returns:
108
Escaped DN component string
109
"""
110
```
111
112
### LDAP Exception Hierarchy
113
114
Comprehensive exception classes for handling all LDAP error conditions with proper error codes.
115
116
```python { .api }
117
from bonsai import (
118
LDAPError, ConnectionError, AuthenticationError, AuthMethodNotSupported,
119
InvalidDN, InvalidMessageID, ClosedConnection, TimeoutError, ProtocolError,
120
NoSuchObjectError, NoSuchAttribute, InsufficientAccess, UnwillingToPerform,
121
NotAllowedOnNonleaf, ObjectClassViolation, AlreadyExists, TypeOrValueExists,
122
SizeLimitError, AffectsMultipleDSA, PasswordPolicyError, PasswordExpired,
123
AccountLocked, ChangeAfterReset, PasswordModNotAllowed, MustSupplyOldPassword,
124
InsufficientPasswordQuality, PasswordTooShort, PasswordTooYoung, PasswordInHistory
125
)
126
127
class LDAPError(Exception):
128
"""
129
Base class for all LDAP-related errors.
130
131
All LDAP errors include an error code that corresponds to standard LDAP result codes.
132
"""
133
134
@classmethod
135
def create(cls, code: int) -> Type["LDAPError"]:
136
"""
137
Create new LDAPError type with specific error code.
138
139
Parameters:
140
- code: LDAP result code
141
142
Returns:
143
LDAPError class with specified code
144
"""
145
146
@property
147
def code(self) -> int:
148
"""LDAP result code."""
149
150
@property
151
def hexcode(self) -> int:
152
"""Error code in hexadecimal format."""
153
154
# Connection and Authentication Errors
155
class ConnectionError(LDAPError):
156
"""Cannot connect to LDAP server."""
157
code = -1
158
159
class AuthenticationError(LDAPError):
160
"""Authentication failed."""
161
code = 0x31
162
163
class AuthMethodNotSupported(LDAPError):
164
"""Authentication method not supported."""
165
code = 0x07
166
167
class ClosedConnection(LDAPError):
168
"""Operation attempted on closed connection."""
169
code = -2
170
171
class TimeoutError(LDAPError):
172
"""Operation timed out."""
173
code = -3
174
175
# Data and Protocol Errors
176
class InvalidDN(LDAPError):
177
"""Invalid distinguished name format."""
178
code = 0x22
179
180
class InvalidMessageID(LDAPError):
181
"""Invalid LDAP message ID."""
182
code = -4
183
184
class ProtocolError(LDAPError):
185
"""LDAP protocol error."""
186
code = 0x02
187
188
class NoSuchObjectError(LDAPError):
189
"""Requested object does not exist."""
190
code = 0x20
191
192
class NoSuchAttribute(LDAPError):
193
"""Requested attribute does not exist."""
194
code = 0x10
195
196
# Permission and Access Errors
197
class InsufficientAccess(LDAPError):
198
"""Insufficient access rights for operation."""
199
code = 0x32
200
201
class UnwillingToPerform(LDAPError):
202
"""Server unwilling to perform operation."""
203
code = 0x35
204
205
class NotAllowedOnNonleaf(LDAPError):
206
"""Operation not allowed on non-leaf entry."""
207
code = 0x42
208
209
# Data Validation Errors
210
class ObjectClassViolation(LDAPError):
211
"""Operation violates object class rules."""
212
code = 0x41
213
214
class AlreadyExists(LDAPError):
215
"""Entry already exists."""
216
code = 0x44
217
218
class TypeOrValueExists(LDAPError):
219
"""Attribute type or value already exists."""
220
code = 0x14
221
222
class SizeLimitError(LDAPError):
223
"""Search size limit exceeded."""
224
code = 0x04
225
226
class AffectsMultipleDSA(LDAPError):
227
"""Operation affects multiple directory servers."""
228
code = 0x47
229
230
# Password Policy Errors
231
class PasswordPolicyError(LDAPError):
232
"""Base class for password policy violations."""
233
code = -100
234
235
class PasswordExpired(PasswordPolicyError):
236
"""Password has expired."""
237
code = -101
238
239
class AccountLocked(PasswordPolicyError):
240
"""Account is locked."""
241
code = -102
242
243
class ChangeAfterReset(PasswordPolicyError):
244
"""Must change password after reset."""
245
code = -103
246
247
class PasswordModNotAllowed(PasswordPolicyError):
248
"""Password modification not allowed."""
249
code = -104
250
251
class MustSupplyOldPassword(PasswordPolicyError):
252
"""Must supply old password to change."""
253
code = -105
254
255
class InsufficientPasswordQuality(PasswordPolicyError):
256
"""Password does not meet quality requirements."""
257
code = -106
258
259
class PasswordTooShort(PasswordPolicyError):
260
"""Password is too short."""
261
code = -107
262
263
class PasswordTooYoung(PasswordPolicyError):
264
"""Password is too young to change."""
265
code = -108
266
267
class PasswordInHistory(PasswordPolicyError):
268
"""Password is in history."""
269
code = -109
270
271
# Additional LDAP Errors
272
class InvalidSyntax(LDAPError):
273
"""Invalid attribute syntax."""
274
code = 0x15
275
276
class UndefinedAttributeType(LDAPError):
277
"""Undefined attribute type."""
278
code = 0x11
279
280
class InappropriateMatching(LDAPError):
281
"""Inappropriate matching rule."""
282
code = 0x12
283
284
class ConstraintViolation(LDAPError):
285
"""Constraint violation."""
286
code = 0x13
287
288
class InvalidAttributeValue(LDAPError):
289
"""Invalid attribute value."""
290
code = 0x16
291
292
class NamingViolation(LDAPError):
293
"""Naming violation."""
294
code = 0x40
295
296
# Administrative/Referral Errors
297
class Referral(LDAPError):
298
"""LDAP referral."""
299
code = 0x0A
300
301
class AdminLimitExceeded(LDAPError):
302
"""Administrative limit exceeded."""
303
code = 0x0B
304
305
class UnavailableCriticalExtension(LDAPError):
306
"""Critical extension unavailable."""
307
code = 0x0C
308
309
class ConfidentialityRequired(LDAPError):
310
"""Confidentiality required."""
311
code = 0x0D
312
313
class SaslBindInProgress(LDAPError):
314
"""SASL bind in progress."""
315
code = 0x0E
316
317
# Operation Errors
318
class CompareFalse(LDAPError):
319
"""Compare returned False."""
320
code = 0x05
321
322
class CompareTrue(LDAPError):
323
"""Compare returned True."""
324
code = 0x06
325
326
class StrongAuthRequired(LDAPError):
327
"""Strong authentication required."""
328
code = 0x08
329
330
class PartialResults(LDAPError):
331
"""Partial results returned."""
332
code = 0x09
333
334
class LoopDetect(LDAPError):
335
"""Loop detected."""
336
code = 0x36
337
338
class SortControlMissing(LDAPError):
339
"""Sort control missing."""
340
code = 0x3C
341
342
class OffsetRangeError(LDAPError):
343
"""Virtual list view offset range error."""
344
code = 0x3D
345
346
class Other(LDAPError):
347
"""Other LDAP error."""
348
code = 0x50
349
350
class ServerDown(LDAPError):
351
"""LDAP server is down."""
352
code = 0x51
353
354
class LocalError(LDAPError):
355
"""Local error occurred."""
356
code = 0x52
357
358
class EncodingError(LDAPError):
359
"""Encoding error."""
360
code = 0x53
361
362
class DecodingError(LDAPError):
363
"""Decoding error."""
364
code = 0x54
365
366
class Timeout(LDAPError):
367
"""Operation timed out."""
368
code = 0x55
369
370
class AuthUnknown(LDAPError):
371
"""Unknown authentication method."""
372
code = 0x56
373
374
class FilterError(LDAPError):
375
"""Invalid filter."""
376
code = 0x57
377
378
class UserCancelled(LDAPError):
379
"""User cancelled operation."""
380
code = 0x58
381
382
class ParamError(LDAPError):
383
"""Parameter error."""
384
code = 0x59
385
386
class NoMemory(LDAPError):
387
"""Out of memory."""
388
code = 0x5A
389
390
class ConnectError(LDAPError):
391
"""Connection error."""
392
code = 0x5B
393
394
class NotSupported(LDAPError):
395
"""Operation not supported."""
396
code = 0x5C
397
398
class ControlNotFound(LDAPError):
399
"""Control not found."""
400
code = 0x5D
401
402
class NoResultsReturned(LDAPError):
403
"""No results returned."""
404
code = 0x5E
405
406
class MoreResultsToReturn(LDAPError):
407
"""More results available."""
408
code = 0x5F
409
410
class ClientLoop(LDAPError):
411
"""Client loop detected."""
412
code = 0x60
413
414
class ReferralLimitExceeded(LDAPError):
415
"""Referral limit exceeded."""
416
code = 0x61
417
```
418
419
## Usage Examples
420
421
### String Escaping for Security
422
423
```python
424
from bonsai import escape_attribute_value, escape_filter_exp, LDAPClient, LDAPDN
425
426
# Escape attribute values when constructing DNs
427
user_input = "Smith, Jr." # User-provided input with special chars
428
safe_value = escape_attribute_value(user_input)
429
dn = f"cn={safe_value},ou=people,dc=example,dc=com"
430
print(f"Safe DN: {dn}") # Prints: Safe DN: cn=Smith\, Jr.,ou=people,dc=example,dc=com
431
432
# Use with LDAPDN class
433
escaped_dn = LDAPDN(f"cn={safe_value},ou=people,dc=example,dc=com")
434
435
# Escape filter expressions to prevent LDAP injection
436
user_search = "admin*" # Potentially dangerous input
437
safe_filter = escape_filter_exp(user_search)
438
filter_exp = f"(cn={safe_filter})"
439
print(f"Safe filter: {filter_exp}") # Prints: Safe filter: (cn=admin\2A)
440
441
# Examples of various escapings
442
examples = [
443
'John, Jr.', # Comma needs escaping
444
' Leading space', # Leading space needs escaping
445
'Trailing space ', # Trailing space needs escaping
446
'#hashtag', # Leading # needs escaping
447
'equals=sign', # Equals sign needs escaping
448
'quotes"here', # Quotes need escaping
449
'backslash\\here', # Backslashes need escaping
450
'less<than>greater', # Angle brackets need escaping
451
]
452
453
for example in examples:
454
escaped = escape_attribute_value(example)
455
print(f"'{example}' -> '{escaped}'")
456
```
457
458
### System Information and Configuration
459
460
```python
461
from bonsai import get_vendor_info, get_tls_impl_name, has_krb5_support, set_debug
462
463
# Get LDAP library information
464
vendor_info = get_vendor_info()
465
print("LDAP Library Information:")
466
for key, value in vendor_info.items():
467
print(f" {key}: {value}")
468
469
# Check TLS implementation
470
tls_impl = get_tls_impl_name()
471
print(f"TLS Implementation: {tls_impl}")
472
473
# Check Kerberos support
474
if has_krb5_support():
475
print("Kerberos/GSSAPI authentication is supported")
476
else:
477
print("Kerberos/GSSAPI authentication is not available")
478
479
# Enable debug logging for troubleshooting
480
set_debug(7) # High debug level for detailed logging
481
482
# Example output might be:
483
# LDAP Library Information:
484
# library: OpenLDAP
485
# version: 2.5.13
486
# api_version: 3001
487
# protocol_version: 3
488
# TLS Implementation: OpenSSL
489
# Kerberos/GSSAPI authentication is supported
490
```
491
492
### Comprehensive Error Handling
493
494
```python
495
from bonsai import (
496
LDAPClient, LDAPError, ConnectionError, AuthenticationError,
497
NoSuchObjectError, InsufficientAccess, AlreadyExists, InvalidDN,
498
PasswordExpired, AccountLocked, TimeoutError
499
)
500
501
def robust_ldap_operation(client, operation_type, **kwargs):
    """
    Demonstrate comprehensive LDAP error handling.

    Opens a connection, runs exactly one operation on it, and reports any
    LDAP failure on stdout instead of propagating it.

    Parameters:
    - client: Object whose connect() returns a connection usable as a
      context manager (e.g. an LDAPClient)
    - operation_type: One of 'search', 'add', 'modify' or 'delete'
    - **kwargs: Operation arguments: 'base', 'scope' and optional 'filter'
      for search; 'entry' for add and modify; 'dn' for delete

    Returns:
    Whatever the connection method returns, or None when an LDAPError
    (or one of its subclasses) was raised and reported.

    Raises:
    ValueError: If operation_type is not one of the supported names
    KeyError: If a required keyword argument for the operation is missing
    """
    # Reject unknown operation names before connecting. Falling through and
    # returning None would be indistinguishable from a reported LDAP failure.
    if operation_type not in ('search', 'add', 'modify', 'delete'):
        raise ValueError(f"Unsupported operation type: {operation_type!r}")

    try:
        with client.connect() as conn:
            if operation_type == 'search':
                return conn.search(kwargs['base'], kwargs['scope'], kwargs.get('filter'))
            if operation_type == 'add':
                return conn.add(kwargs['entry'])
            if operation_type == 'modify':
                return conn.modify(kwargs['entry'])
            # Only 'delete' remains after the validation above.
            return conn.delete(kwargs['dn'])

    except ConnectionError as e:
        print(f"Connection failed: {e}")
        print(f"Error code: {e.code} (0x{e.hexcode:04X})")
        # Could retry with different server
        return None

    except AuthenticationError as e:
        print(f"Authentication failed: {e}")
        print("Check username/password or authentication method")
        return None

    except InvalidDN as e:
        print(f"Invalid DN format: {e}")
        print("Check DN syntax and escaping")
        return None

    except NoSuchObjectError as e:
        print(f"Object not found: {e}")
        print("Entry does not exist in directory")
        return None

    except InsufficientAccess as e:
        print(f"Access denied: {e}")
        print("User lacks permissions for this operation")
        return None

    except AlreadyExists as e:
        print(f"Entry already exists: {e}")
        print("Cannot add duplicate entry")
        return None

    except TimeoutError as e:
        print(f"Operation timed out: {e}")
        print("Server may be overloaded or network is slow")
        return None

    except PasswordExpired as e:
        print(f"Password expired: {e}")
        print("User must change password before continuing")
        return None

    except AccountLocked as e:
        print(f"Account locked: {e}")
        print("Account is locked due to security policy")
        return None

    except LDAPError as e:
        # Catch-all for other LDAP errors; must stay last because every
        # handler above names a subclass of LDAPError.
        print(f"LDAP error: {e}")
        print(f"Error code: {e.code} (0x{e.hexcode:04X})")
        return None
565
566
# Usage examples
567
client = LDAPClient("ldap://localhost")
568
client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")
569
570
# Search with error handling
571
results = robust_ldap_operation(
572
client,
573
'search',
574
base="dc=example,dc=com",
575
scope=2,
576
filter="(objectClass=person)"
577
)
578
579
if results is not None:
580
print(f"Search completed successfully: {len(results)} entries found")
581
else:
582
print("Search failed due to error")
583
```
584
585
### Custom Error Handling Strategies
586
587
```python
588
from bonsai import LDAPClient, LDAPError
589
import logging
590
import time
591
592
class LDAPConnectionManager:
    """Example of advanced error handling and retry logic."""

    def __init__(self, client, max_retries=3, retry_delay=1.0, retryable_errors=None):
        """
        Parameters:
        - client: LDAPClient used to open connections
        - max_retries: Number of additional attempts after the first failure
        - retry_delay: Seconds to sleep between attempts
        - retryable_errors: Exception classes that trigger a retry; defaults
          to bonsai's ConnectionError and TimeoutError
        """
        if retryable_errors is None:
            # The bare names ConnectionError/TimeoutError are Python builtins.
            # bonsai's classes of the same name derive from LDAPError instead,
            # so they must be imported explicitly or real connection and
            # timeout failures would be treated as non-retryable.
            from bonsai import ConnectionError as LDAPConnectionError
            from bonsai import TimeoutError as LDAPTimeoutError
            retryable_errors = (LDAPConnectionError, LDAPTimeoutError)

        self.client = client
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.retryable_errors = tuple(retryable_errors)
        self.logger = logging.getLogger(__name__)

    def execute_with_retry(self, operation_func, *args, **kwargs):
        """
        Execute LDAP operation with automatic retry on certain errors.

        Parameters:
        - operation_func: Callable performing the operation
        - *args, **kwargs: Passed through to operation_func

        Returns:
        The return value of the first successful call.

        Raises:
        The last retryable error once every attempt has failed, or the
        first other LDAPError immediately.
        """
        last_error = None

        for attempt in range(self.max_retries + 1):
            try:
                return operation_func(*args, **kwargs)

            except self.retryable_errors as e:
                last_error = e
                if attempt < self.max_retries:
                    self.logger.warning(
                        f"Attempt {attempt + 1} failed: {e}. Retrying in {self.retry_delay}s..."
                    )
                    time.sleep(self.retry_delay)
                else:
                    # Final attempt: the loop ends on its own after this.
                    self.logger.error(f"All {self.max_retries + 1} attempts failed")

            except LDAPError as e:
                # Don't retry on authentication, permission, or data errors
                self.logger.error(f"Non-retryable error: {e}")
                last_error = e
                break

        # Re-raise the last error if all retries failed. Compare against None
        # so the check does not depend on the exception object's truthiness.
        if last_error is not None:
            raise last_error

    def safe_search(self, base, scope, filter_exp=None, **kwargs):
        """
        Search with comprehensive error handling.

        Returns:
        The search result, or an empty list when the search ultimately
        fails with an LDAPError.
        """
        def search_operation():
            # A fresh connection per attempt, so a retry does not reuse a
            # connection that has just failed.
            with self.client.connect() as conn:
                return conn.search(base, scope, filter_exp, **kwargs)

        try:
            return self.execute_with_retry(search_operation)
        except LDAPError as e:
            self.logger.error(f"Search failed permanently: {e}")
            return []
642
643
# Usage
644
client = LDAPClient("ldap://unreliable-server.com")
645
client.set_credentials("SIMPLE", user="admin", password="secret")
646
647
manager = LDAPConnectionManager(client, max_retries=3, retry_delay=2.0)
648
649
# This will automatically retry on connection/timeout errors
650
results = manager.safe_search("dc=example,dc=com", 2, "(objectClass=person)")
651
print(f"Found {len(results)} entries")
652
```