Python 3 module for accessing LDAP directory servers with async framework support and Active Directory integration.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Utility functions for escaping LDAP filter expressions and attribute values, system information functions, and comprehensive exception hierarchy for robust LDAP error handling.
Functions for properly escaping special characters in LDAP contexts according to RFC specifications.
from bonsai import escape_attribute_value, escape_filter_exp
def escape_attribute_value(attrval: str) -> str:
"""
Escape special characters in attribute values according to RFC 4514.
Escapes characters: \\ " + , ; < = > and leading/trailing spaces, # at start.
Parameters:
- attrval: Attribute value to escape
Returns:
Escaped attribute value safe for use in DN strings
Example:
escape_attribute_value('John, Jr.') -> 'John\\, Jr.'
escape_attribute_value(' Leading space') -> '\\ Leading space'
"""
def escape_filter_exp(filter_exp: str) -> str:
"""
Escape special characters in LDAP filter expressions according to RFC 4515.
Escapes characters: \\ * ( ) and null byte.
Parameters:
- filter_exp: Filter expression to escape
Returns:
Escaped filter expression safe for use in LDAP searches
Example:
escape_filter_exp('name*') -> 'name\\2A'
escape_filter_exp('(test)') -> '\\28test\\29'
"""Functions for querying LDAP library information and capabilities.
from bonsai import get_vendor_info, get_tls_impl_name, has_krb5_support, set_debug
def get_vendor_info() -> Dict[str, str]:
"""
Get information about the underlying LDAP library.
Returns:
Dictionary with vendor information including:
- 'library': LDAP library name (e.g., 'OpenLDAP', 'Microsoft LDAP')
- 'version': Library version string
- 'api_version': LDAP API version
- 'protocol_version': Supported LDAP protocol version
"""
def get_tls_impl_name() -> str:
"""
Get name of the TLS implementation used by the LDAP library.
Returns:
TLS implementation name (e.g., 'OpenSSL', 'GnuTLS', 'SChannel')
"""
def has_krb5_support() -> bool:
"""
Check if the LDAP library was compiled with Kerberos support.
Returns:
True if Kerberos/GSSAPI authentication is supported, False otherwise
"""
def set_debug(level: int, file: Optional[str] = None) -> None:
"""
Set debug logging level for the underlying LDAP library.
Parameters:
- level: Debug level (0=off, higher values for more verbose logging)
- file: Optional file path for debug output (None for stderr)
"""
def set_connect_async(flag: bool) -> None:
"""
Set global flag for asynchronous connection behavior.
Parameters:
- flag: Whether to use async connections by default
"""
def escape_dn_chars(dn_str: str) -> str:
"""
Escape special characters in DN component values.
Parameters:
- dn_str: DN component string to escape
Returns:
Escaped DN component string
"""Comprehensive exception classes for handling all LDAP error conditions with proper error codes.
from bonsai import (
LDAPError, ConnectionError, AuthenticationError, AuthMethodNotSupported,
InvalidDN, InvalidMessageID, ClosedConnection, TimeoutError, ProtocolError,
NoSuchObjectError, NoSuchAttribute, InsufficientAccess, UnwillingToPerform,
NotAllowedOnNonleaf, ObjectClassViolation, AlreadyExists, TypeOrValueExists,
SizeLimitError, AffectsMultipleDSA, PasswordPolicyError, PasswordExpired,
AccountLocked, ChangeAfterReset, PasswordModNotAllowed, MustSupplyOldPassword,
InsufficientPasswordQuality, PasswordTooShort, PasswordTooYoung, PasswordInHistory
)
class LDAPError(Exception):
"""
Base class for all LDAP-related errors.
All LDAP errors include an error code that corresponds to standard LDAP result codes.
"""
@classmethod
def create(cls, code: int) -> Type["LDAPError"]:
"""
Create new LDAPError type with specific error code.
Parameters:
- code: LDAP result code
Returns:
LDAPError class with specified code
"""
@property
def code(self) -> int:
"""LDAP result code."""
@property
def hexcode(self) -> int:
"""Error code in hexadecimal format."""
# Connection and Authentication Errors
class ConnectionError(LDAPError):
"""Cannot connect to LDAP server."""
code = -1
class AuthenticationError(LDAPError):
"""Authentication failed."""
code = 0x31
class AuthMethodNotSupported(LDAPError):
"""Authentication method not supported."""
code = 0x07
class ClosedConnection(LDAPError):
"""Operation attempted on closed connection."""
code = -2
class TimeoutError(LDAPError):
"""Operation timed out."""
code = -3
# Data and Protocol Errors
class InvalidDN(LDAPError):
"""Invalid distinguished name format."""
code = 0x22
class InvalidMessageID(LDAPError):
"""Invalid LDAP message ID."""
code = -4
class ProtocolError(LDAPError):
"""LDAP protocol error."""
code = 0x02
class NoSuchObjectError(LDAPError):
"""Requested object does not exist."""
code = 0x20
class NoSuchAttribute(LDAPError):
"""Requested attribute does not exist."""
code = 0x10
# Permission and Access Errors
class InsufficientAccess(LDAPError):
"""Insufficient access rights for operation."""
code = 0x32
class UnwillingToPerform(LDAPError):
"""Server unwilling to perform operation."""
code = 0x35
class NotAllowedOnNonleaf(LDAPError):
"""Operation not allowed on non-leaf entry."""
code = 0x42
# Data Validation Errors
class ObjectClassViolation(LDAPError):
"""Operation violates object class rules."""
code = 0x41
class AlreadyExists(LDAPError):
"""Entry already exists."""
code = 0x44
class TypeOrValueExists(LDAPError):
"""Attribute type or value already exists."""
code = 0x14
class SizeLimitError(LDAPError):
"""Search size limit exceeded."""
code = 0x04
class AffectsMultipleDSA(LDAPError):
"""Operation affects multiple directory servers."""
code = 0x47
# Password Policy Errors
class PasswordPolicyError(LDAPError):
"""Base class for password policy violations."""
code = -100
class PasswordExpired(PasswordPolicyError):
"""Password has expired."""
code = -101
class AccountLocked(PasswordPolicyError):
"""Account is locked."""
code = -102
class ChangeAfterReset(PasswordPolicyError):
"""Must change password after reset."""
code = -103
class PasswordModNotAllowed(PasswordPolicyError):
"""Password modification not allowed."""
code = -104
class MustSupplyOldPassword(PasswordPolicyError):
"""Must supply old password to change."""
code = -105
class InsufficientPasswordQuality(PasswordPolicyError):
"""Password does not meet quality requirements."""
code = -106
class PasswordTooShort(PasswordPolicyError):
"""Password is too short."""
code = -107
class PasswordTooYoung(PasswordPolicyError):
"""Password is too young to change."""
code = -108
class PasswordInHistory(PasswordPolicyError):
"""Password is in history."""
code = -109
# Additional LDAP Errors
class InvalidSyntax(LDAPError):
"""Invalid attribute syntax."""
code = 0x15
class UndefinedAttributeType(LDAPError):
"""Undefined attribute type."""
code = 0x11
class InappropriateMatching(LDAPError):
"""Inappropriate matching rule."""
code = 0x12
class ConstraintViolation(LDAPError):
"""Constraint violation."""
code = 0x13
class InvalidAttributeValue(LDAPError):
"""Invalid attribute value."""
code = 0x16
class NamingViolation(LDAPError):
"""Naming violation."""
code = 0x40
# Administrative/Referral Errors
class Referral(LDAPError):
"""LDAP referral."""
code = 0x0A
class AdminLimitExceeded(LDAPError):
"""Administrative limit exceeded."""
code = 0x0B
class UnavailableCriticalExtension(LDAPError):
"""Critical extension unavailable."""
code = 0x0C
class ConfidentialityRequired(LDAPError):
"""Confidentiality required."""
code = 0x0D
class SaslBindInProgress(LDAPError):
"""SASL bind in progress."""
code = 0x0E
# Operation Errors
class CompareFalse(LDAPError):
"""Compare returned False."""
code = 0x05
class CompareTrue(LDAPError):
"""Compare returned True."""
code = 0x06
class StrongAuthRequired(LDAPError):
"""Strong authentication required."""
code = 0x08
class PartialResults(LDAPError):
"""Partial results returned."""
code = 0x09
class LoopDetect(LDAPError):
"""Loop detected."""
code = 0x36
class SortControlMissing(LDAPError):
"""Sort control missing."""
code = 0x3C
class OffsetRangeError(LDAPError):
"""Virtual list view offset range error."""
code = 0x3D
class Other(LDAPError):
"""Other LDAP error."""
code = 0x50
class ServerDown(LDAPError):
"""LDAP server is down."""
code = 0x51
class LocalError(LDAPError):
"""Local error occurred."""
code = 0x52
class EncodingError(LDAPError):
"""Encoding error."""
code = 0x53
class DecodingError(LDAPError):
"""Decoding error."""
code = 0x54
class Timeout(LDAPError):
"""Operation timed out."""
code = 0x55
class AuthUnknown(LDAPError):
"""Unknown authentication method."""
code = 0x56
class FilterError(LDAPError):
"""Invalid filter."""
code = 0x57
class UserCancelled(LDAPError):
"""User cancelled operation."""
code = 0x58
class ParamError(LDAPError):
"""Parameter error."""
code = 0x59
class NoMemory(LDAPError):
"""Out of memory."""
code = 0x5A
class ConnectError(LDAPError):
"""Connection error."""
code = 0x5B
class NotSupported(LDAPError):
"""Operation not supported."""
code = 0x5C
class ControlNotFound(LDAPError):
"""Control not found."""
code = 0x5D
class NoResultsReturned(LDAPError):
"""No results returned."""
code = 0x5E
class MoreResultsToReturn(LDAPError):
"""More results available."""
code = 0x5F
class ClientLoop(LDAPError):
"""Client loop detected."""
code = 0x60
class ReferralLimitExceeded(LDAPError):
"""Referral limit exceeded."""
code = 0x61from bonsai import escape_attribute_value, escape_filter_exp, LDAPClient, LDAPDN
# Escape attribute values when constructing DNs
user_input = "Smith, Jr." # User-provided input with special chars
safe_value = escape_attribute_value(user_input)
dn = f"cn={safe_value},ou=people,dc=example,dc=com"
print(f"Safe DN: {dn}") # cn=Smith\\, Jr.,ou=people,dc=example,dc=com
# Use with LDAPDN class
escaped_dn = LDAPDN(f"cn={safe_value},ou=people,dc=example,dc=com")
# Escape filter expressions to prevent LDAP injection
user_search = "admin*" # Potentially dangerous input
safe_filter = escape_filter_exp(user_search)
filter_exp = f"(cn={safe_filter})"
print(f"Safe filter: {filter_exp}") # (cn=admin\\2A)
# Examples of various escapings
examples = [
'John, Jr.', # Comma needs escaping
' Leading space', # Leading space needs escaping
'Trailing space ', # Trailing space needs escaping
'#hashtag', # Leading # needs escaping
'equals=sign', # Equals sign needs escaping
'quotes"here', # Quotes need escaping
'backslash\\here', # Backslashes need escaping
'less<than>greater', # Angle brackets need escaping
]
for example in examples:
escaped = escape_attribute_value(example)
print(f"'{example}' -> '{escaped}'")from bonsai import get_vendor_info, get_tls_impl_name, has_krb5_support, set_debug
# Get LDAP library information
vendor_info = get_vendor_info()
print("LDAP Library Information:")
for key, value in vendor_info.items():
print(f" {key}: {value}")
# Check TLS implementation
tls_impl = get_tls_impl_name()
print(f"TLS Implementation: {tls_impl}")
# Check Kerberos support
if has_krb5_support():
print("Kerberos/GSSAPI authentication is supported")
else:
print("Kerberos/GSSAPI authentication is not available")
# Enable debug logging for troubleshooting
set_debug(7) # High debug level for detailed logging
# Example output might be:
# LDAP Library Information:
# library: OpenLDAP
# version: 2.5.13
# api_version: 3001
# protocol_version: 3
# TLS Implementation: OpenSSL
# Kerberos/GSSAPI authentication is supportedfrom bonsai import (
LDAPClient, LDAPError, ConnectionError, AuthenticationError,
NoSuchObjectError, InsufficientAccess, AlreadyExists, InvalidDN,
PasswordExpired, AccountLocked, TimeoutError
)
def robust_ldap_operation(client, operation_type, **kwargs):
"""Demonstrate comprehensive LDAP error handling."""
try:
with client.connect() as conn:
if operation_type == 'search':
return conn.search(kwargs['base'], kwargs['scope'], kwargs.get('filter'))
elif operation_type == 'add':
return conn.add(kwargs['entry'])
elif operation_type == 'modify':
return conn.modify(kwargs['entry'])
elif operation_type == 'delete':
return conn.delete(kwargs['dn'])
except ConnectionError as e:
print(f"Connection failed: {e}")
print(f"Error code: {e.code} (0x{e.hexcode:04X})")
# Could retry with different server
return None
except AuthenticationError as e:
print(f"Authentication failed: {e}")
print("Check username/password or authentication method")
return None
except InvalidDN as e:
print(f"Invalid DN format: {e}")
print("Check DN syntax and escaping")
return None
except NoSuchObjectError as e:
print(f"Object not found: {e}")
print("Entry does not exist in directory")
return None
except InsufficientAccess as e:
print(f"Access denied: {e}")
print("User lacks permissions for this operation")
return None
except AlreadyExists as e:
print(f"Entry already exists: {e}")
print("Cannot add duplicate entry")
return None
except TimeoutError as e:
print(f"Operation timed out: {e}")
print("Server may be overloaded or network is slow")
return None
except PasswordExpired as e:
print(f"Password expired: {e}")
print("User must change password before continuing")
return None
except AccountLocked as e:
print(f"Account locked: {e}")
print("Account is locked due to security policy")
return None
except LDAPError as e:
# Catch-all for other LDAP errors
print(f"LDAP error: {e}")
print(f"Error code: {e.code} (0x{e.hexcode:04X})")
return None
# Usage examples
client = LDAPClient("ldap://localhost")
client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")
# Search with error handling
results = robust_ldap_operation(
client,
'search',
base="dc=example,dc=com",
scope=2,
filter="(objectClass=person)"
)
if results is not None:
print(f"Search completed successfully: {len(results)} entries found")
else:
print("Search failed due to error")from bonsai import LDAPClient, LDAPError
import logging
import time
class LDAPConnectionManager:
"""Example of advanced error handling and retry logic."""
def __init__(self, client, max_retries=3, retry_delay=1.0):
self.client = client
self.max_retries = max_retries
self.retry_delay = retry_delay
self.logger = logging.getLogger(__name__)
def execute_with_retry(self, operation_func, *args, **kwargs):
"""Execute LDAP operation with automatic retry on certain errors."""
last_error = None
for attempt in range(self.max_retries + 1):
try:
return operation_func(*args, **kwargs)
except (ConnectionError, TimeoutError) as e:
last_error = e
if attempt < self.max_retries:
self.logger.warning(
f"Attempt {attempt + 1} failed: {e}. Retrying in {self.retry_delay}s..."
)
time.sleep(self.retry_delay)
continue
else:
self.logger.error(f"All {self.max_retries + 1} attempts failed")
break
except LDAPError as e:
# Don't retry on authentication, permission, or data errors
self.logger.error(f"Non-retryable error: {e}")
last_error = e
break
# Re-raise the last error if all retries failed
if last_error:
raise last_error
def safe_search(self, base, scope, filter_exp=None, **kwargs):
"""Search with comprehensive error handling."""
def search_operation():
with self.client.connect() as conn:
return conn.search(base, scope, filter_exp, **kwargs)
try:
return self.execute_with_retry(search_operation)
except LDAPError as e:
self.logger.error(f"Search failed permanently: {e}")
return []
# Usage
client = LDAPClient("ldap://unreliable-server.com")
client.set_credentials("SIMPLE", user="admin", password="secret")
manager = LDAPConnectionManager(client, max_retries=3, retry_delay=2.0)
# This will automatically retry on connection/timeout errors
results = manager.safe_search("dc=example,dc=com", 2, "(objectClass=person)")
print(f"Found {len(results)} entries")Install with Tessl CLI
npx tessl i tessl/pypi-bonsai