CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-bonsai

Python 3 module for accessing LDAP directory servers with async framework support and Active Directory integration.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

utilities-errors.mddocs/

Utilities and Error Handling

Utility functions for escaping LDAP filter expressions and attribute values, system information functions, and comprehensive exception hierarchy for robust LDAP error handling.

Capabilities

String Escaping Utilities

Functions for properly escaping special characters in LDAP contexts according to RFC specifications.

from bonsai import escape_attribute_value, escape_filter_exp

def escape_attribute_value(attrval: str) -> str:
    """
    Escape special characters in attribute values according to RFC 4514.
    
    Escapes characters: \\ " + , ; < = > and leading/trailing spaces, # at start.
    
    Parameters:
    - attrval: Attribute value to escape
    
    Returns:
    Escaped attribute value safe for use in DN strings
    
    Example:
    escape_attribute_value('John, Jr.') -> 'John\\, Jr.'
    escape_attribute_value(' Leading space') -> '\\ Leading space'
    """

def escape_filter_exp(filter_exp: str) -> str:
    """
    Escape special characters in LDAP filter expressions according to RFC 4515.
    
    Escapes characters: \\ * ( ) and null byte.
    
    Parameters:
    - filter_exp: Filter expression to escape
    
    Returns:
    Escaped filter expression safe for use in LDAP searches
    
    Example:
    escape_filter_exp('name*') -> 'name\\2A'
    escape_filter_exp('(test)') -> '\\28test\\29'
    """

System Information Functions

Functions for querying LDAP library information and capabilities.

from bonsai import get_vendor_info, get_tls_impl_name, has_krb5_support, set_debug

def get_vendor_info() -> Dict[str, str]:
    """
    Get information about the underlying LDAP library.
    
    Returns:
    Dictionary with vendor information including:
    - 'library': LDAP library name (e.g., 'OpenLDAP', 'Microsoft LDAP')
    - 'version': Library version string
    - 'api_version': LDAP API version
    - 'protocol_version': Supported LDAP protocol version
    """

def get_tls_impl_name() -> str:
    """
    Get name of the TLS implementation used by the LDAP library.
    
    Returns:
    TLS implementation name (e.g., 'OpenSSL', 'GnuTLS', 'SChannel')
    """

def has_krb5_support() -> bool:
    """
    Check if the LDAP library was compiled with Kerberos support.
    
    Returns:
    True if Kerberos/GSSAPI authentication is supported, False otherwise
    """

def set_debug(level: int, file: Optional[str] = None) -> None:
    """
    Set debug logging level for the underlying LDAP library.
    
    Parameters:
    - level: Debug level (0=off, higher values for more verbose logging)
    - file: Optional file path for debug output (None for stderr)
    """

def set_connect_async(flag: bool) -> None:
    """
    Set global flag for asynchronous connection behavior.
    
    Parameters:
    - flag: Whether to use async connections by default
    """

def escape_dn_chars(dn_str: str) -> str:
    """
    Escape special characters in DN component values.
    
    Parameters:
    - dn_str: DN component string to escape
    
    Returns:
    Escaped DN component string
    """

LDAP Exception Hierarchy

Comprehensive exception classes for handling all LDAP error conditions with proper error codes.

from bonsai import (
    LDAPError, ConnectionError, AuthenticationError, AuthMethodNotSupported,
    InvalidDN, InvalidMessageID, ClosedConnection, TimeoutError, ProtocolError,
    NoSuchObjectError, NoSuchAttribute, InsufficientAccess, UnwillingToPerform,
    NotAllowedOnNonleaf, ObjectClassViolation, AlreadyExists, TypeOrValueExists,
    SizeLimitError, AffectsMultipleDSA, PasswordPolicyError, PasswordExpired,
    AccountLocked, ChangeAfterReset, PasswordModNotAllowed, MustSupplyOldPassword,
    InsufficientPasswordQuality, PasswordTooShort, PasswordTooYoung, PasswordInHistory
)

class LDAPError(Exception):
    """
    Base class for all LDAP-related errors.
    
    All LDAP errors include an error code that corresponds to standard LDAP result codes.
    """

    @classmethod
    def create(cls, code: int) -> Type["LDAPError"]:
        """
        Create new LDAPError type with specific error code.
        
        Parameters:
        - code: LDAP result code
        
        Returns:
        LDAPError class with specified code
        """

    @property
    def code(self) -> int:
        """LDAP result code."""

    @property
    def hexcode(self) -> int:
        """Error code in hexadecimal format."""

# Connection and Authentication Errors
class ConnectionError(LDAPError):
    """Cannot connect to LDAP server."""
    code = -1

class AuthenticationError(LDAPError):
    """Authentication failed."""
    code = 0x31

class AuthMethodNotSupported(LDAPError):
    """Authentication method not supported."""
    code = 0x07

class ClosedConnection(LDAPError):
    """Operation attempted on closed connection."""
    code = -2

class TimeoutError(LDAPError):
    """Operation timed out."""
    code = -3

# Data and Protocol Errors
class InvalidDN(LDAPError):
    """Invalid distinguished name format."""
    code = 0x22

class InvalidMessageID(LDAPError):
    """Invalid LDAP message ID."""
    code = -4

class ProtocolError(LDAPError):
    """LDAP protocol error."""
    code = 0x02

class NoSuchObjectError(LDAPError):
    """Requested object does not exist."""
    code = 0x20

class NoSuchAttribute(LDAPError):
    """Requested attribute does not exist."""
    code = 0x10

# Permission and Access Errors  
class InsufficientAccess(LDAPError):
    """Insufficient access rights for operation."""
    code = 0x32

class UnwillingToPerform(LDAPError):
    """Server unwilling to perform operation."""
    code = 0x35

class NotAllowedOnNonleaf(LDAPError):
    """Operation not allowed on non-leaf entry."""
    code = 0x42

# Data Validation Errors
class ObjectClassViolation(LDAPError):
    """Operation violates object class rules."""
    code = 0x41

class AlreadyExists(LDAPError):
    """Entry already exists."""
    code = 0x44

class TypeOrValueExists(LDAPError):
    """Attribute type or value already exists."""
    code = 0x14

class SizeLimitError(LDAPError):
    """Search size limit exceeded."""
    code = 0x04

class AffectsMultipleDSA(LDAPError):
    """Operation affects multiple directory servers."""
    code = 0x47

# Password Policy Errors
class PasswordPolicyError(LDAPError):
    """Base class for password policy violations."""
    code = -100

class PasswordExpired(PasswordPolicyError):
    """Password has expired."""
    code = -101

class AccountLocked(PasswordPolicyError):
    """Account is locked."""
    code = -102

class ChangeAfterReset(PasswordPolicyError):
    """Must change password after reset."""
    code = -103

class PasswordModNotAllowed(PasswordPolicyError):
    """Password modification not allowed."""
    code = -104

class MustSupplyOldPassword(PasswordPolicyError):
    """Must supply old password to change."""
    code = -105

class InsufficientPasswordQuality(PasswordPolicyError):
    """Password does not meet quality requirements."""
    code = -106

class PasswordTooShort(PasswordPolicyError):
    """Password is too short."""
    code = -107

class PasswordTooYoung(PasswordPolicyError):
    """Password is too young to change."""
    code = -108

class PasswordInHistory(PasswordPolicyError):
    """Password is in history."""
    code = -109

# Additional LDAP Errors
class InvalidSyntax(LDAPError):
    """Invalid attribute syntax."""
    code = 0x15

class UndefinedAttributeType(LDAPError):
    """Undefined attribute type."""
    code = 0x11

class InappropriateMatching(LDAPError):
    """Inappropriate matching rule."""
    code = 0x12

class ConstraintViolation(LDAPError):
    """Constraint violation."""
    code = 0x13

class InvalidAttributeValue(LDAPError):
    """Invalid attribute value."""
    code = 0x16

class NamingViolation(LDAPError):
    """Naming violation."""
    code = 0x40

# Administrative/Referral Errors
class Referral(LDAPError):
    """LDAP referral."""
    code = 0x0A

class AdminLimitExceeded(LDAPError):
    """Administrative limit exceeded."""
    code = 0x0B

class UnavailableCriticalExtension(LDAPError):
    """Critical extension unavailable."""
    code = 0x0C

class ConfidentialityRequired(LDAPError):
    """Confidentiality required."""  
    code = 0x0D

class SaslBindInProgress(LDAPError):
    """SASL bind in progress."""
    code = 0x0E

# Operation Errors
class CompareFalse(LDAPError):
    """Compare returned False."""
    code = 0x05

class CompareTrue(LDAPError):
    """Compare returned True."""
    code = 0x06

class StrongAuthRequired(LDAPError):
    """Strong authentication required."""
    code = 0x08

class PartialResults(LDAPError):
    """Partial results returned."""
    code = 0x09

class LoopDetect(LDAPError):
    """Loop detected."""
    code = 0x36

class SortControlMissing(LDAPError):
    """Sort control missing."""
    code = 0x3C

class OffsetRangeError(LDAPError):
    """Virtual list view offset range error."""
    code = 0x3D

class Other(LDAPError):
    """Other LDAP error."""
    code = 0x50

class ServerDown(LDAPError):
    """LDAP server is down."""
    code = 0x51

class LocalError(LDAPError):
    """Local error occurred."""
    code = 0x52

class EncodingError(LDAPError):
    """Encoding error."""  
    code = 0x53

class DecodingError(LDAPError):
    """Decoding error."""
    code = 0x54

class Timeout(LDAPError):
    """Operation timed out."""
    code = 0x55

class AuthUnknown(LDAPError):
    """Unknown authentication method."""
    code = 0x56

class FilterError(LDAPError):
    """Invalid filter."""
    code = 0x57

class UserCancelled(LDAPError):
    """User cancelled operation."""
    code = 0x58

class ParamError(LDAPError):
    """Parameter error."""
    code = 0x59

class NoMemory(LDAPError):
    """Out of memory."""
    code = 0x5A

class ConnectError(LDAPError):
    """Connection error."""
    code = 0x5B

class NotSupported(LDAPError):
    """Operation not supported."""
    code = 0x5C

class ControlNotFound(LDAPError):
    """Control not found."""
    code = 0x5D

class NoResultsReturned(LDAPError):
    """No results returned."""
    code = 0x5E

class MoreResultsToReturn(LDAPError):
    """More results available."""
    code = 0x5F

class ClientLoop(LDAPError):
    """Client loop detected."""
    code = 0x60

class ReferralLimitExceeded(LDAPError):
    """Referral limit exceeded."""
    code = 0x61

Usage Examples

String Escaping for Security

from bonsai import escape_attribute_value, escape_filter_exp, LDAPClient, LDAPDN

# Escape attribute values when constructing DNs
user_input = "Smith, Jr."  # User-provided input with special chars
safe_value = escape_attribute_value(user_input)
dn = f"cn={safe_value},ou=people,dc=example,dc=com"
print(f"Safe DN: {dn}")  # cn=Smith\\, Jr.,ou=people,dc=example,dc=com

# Use with LDAPDN class
escaped_dn = LDAPDN(f"cn={safe_value},ou=people,dc=example,dc=com")

# Escape filter expressions to prevent LDAP injection
user_search = "admin*"  # Potentially dangerous input
safe_filter = escape_filter_exp(user_search)
filter_exp = f"(cn={safe_filter})"
print(f"Safe filter: {filter_exp}")  # (cn=admin\\2A)

# Examples of various escapings
examples = [
    'John, Jr.',           # Comma needs escaping
    ' Leading space',      # Leading space needs escaping  
    'Trailing space ',     # Trailing space needs escaping
    '#hashtag',           # Leading # needs escaping
    'equals=sign',        # Equals sign needs escaping
    'quotes"here',        # Quotes need escaping
    'backslash\\here',    # Backslashes need escaping
    'less<than>greater',  # Angle brackets need escaping
]

for example in examples:
    escaped = escape_attribute_value(example)
    print(f"'{example}' -> '{escaped}'")

System Information and Configuration

from bonsai import get_vendor_info, get_tls_impl_name, has_krb5_support, set_debug

# Get LDAP library information
vendor_info = get_vendor_info()
print("LDAP Library Information:")
for key, value in vendor_info.items():
    print(f"  {key}: {value}")

# Check TLS implementation
tls_impl = get_tls_impl_name()
print(f"TLS Implementation: {tls_impl}")

# Check Kerberos support
if has_krb5_support():
    print("Kerberos/GSSAPI authentication is supported")
else:
    print("Kerberos/GSSAPI authentication is not available")

# Enable debug logging for troubleshooting
set_debug(7)  # High debug level for detailed logging

# Example output might be:
# LDAP Library Information:
#   library: OpenLDAP
#   version: 2.5.13
#   api_version: 3001
#   protocol_version: 3
# TLS Implementation: OpenSSL
# Kerberos/GSSAPI authentication is supported

Comprehensive Error Handling

from bonsai import (
    LDAPClient, LDAPError, ConnectionError, AuthenticationError,
    NoSuchObjectError, InsufficientAccess, AlreadyExists, InvalidDN,
    PasswordExpired, AccountLocked, TimeoutError
)

def robust_ldap_operation(client, operation_type, **kwargs):
    """Demonstrate comprehensive LDAP error handling."""
    try:
        with client.connect() as conn:
            if operation_type == 'search':
                return conn.search(kwargs['base'], kwargs['scope'], kwargs.get('filter'))
            elif operation_type == 'add':
                return conn.add(kwargs['entry'])
            elif operation_type == 'modify':
                return conn.modify(kwargs['entry'])
            elif operation_type == 'delete':
                return conn.delete(kwargs['dn'])
                
    except ConnectionError as e:
        print(f"Connection failed: {e}")
        print(f"Error code: {e.code} (0x{e.hexcode:04X})")
        # Could retry with different server
        return None
        
    except AuthenticationError as e:
        print(f"Authentication failed: {e}")
        print("Check username/password or authentication method")
        return None
        
    except InvalidDN as e:
        print(f"Invalid DN format: {e}")
        print("Check DN syntax and escaping")
        return None
        
    except NoSuchObjectError as e:
        print(f"Object not found: {e}")
        print("Entry does not exist in directory")
        return None
        
    except InsufficientAccess as e:
        print(f"Access denied: {e}")
        print("User lacks permissions for this operation")
        return None
        
    except AlreadyExists as e:
        print(f"Entry already exists: {e}")
        print("Cannot add duplicate entry")
        return None
        
    except TimeoutError as e:
        print(f"Operation timed out: {e}")
        print("Server may be overloaded or network is slow")
        return None
        
    except PasswordExpired as e:
        print(f"Password expired: {e}")
        print("User must change password before continuing")
        return None
        
    except AccountLocked as e:
        print(f"Account locked: {e}")
        print("Account is locked due to security policy")
        return None
        
    except LDAPError as e:
        # Catch-all for other LDAP errors
        print(f"LDAP error: {e}")
        print(f"Error code: {e.code} (0x{e.hexcode:04X})")
        return None

# Usage examples
client = LDAPClient("ldap://localhost")
client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")

# Search with error handling
results = robust_ldap_operation(
    client, 
    'search',
    base="dc=example,dc=com",
    scope=2,
    filter="(objectClass=person)"
)

if results is not None:
    print(f"Search completed successfully: {len(results)} entries found")
else:
    print("Search failed due to error")

Custom Error Handling Strategies

from bonsai import LDAPClient, LDAPError
import logging
import time

class LDAPConnectionManager:
    """Example of advanced error handling and retry logic."""
    
    def __init__(self, client, max_retries=3, retry_delay=1.0):
        self.client = client
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.logger = logging.getLogger(__name__)
    
    def execute_with_retry(self, operation_func, *args, **kwargs):
        """Execute LDAP operation with automatic retry on certain errors."""
        last_error = None
        
        for attempt in range(self.max_retries + 1):
            try:
                return operation_func(*args, **kwargs)
                
            except (ConnectionError, TimeoutError) as e:
                last_error = e
                if attempt < self.max_retries:
                    self.logger.warning(
                        f"Attempt {attempt + 1} failed: {e}. Retrying in {self.retry_delay}s..."
                    )
                    time.sleep(self.retry_delay)
                    continue
                else:
                    self.logger.error(f"All {self.max_retries + 1} attempts failed")
                    break
                    
            except LDAPError as e:
                # Don't retry on authentication, permission, or data errors
                self.logger.error(f"Non-retryable error: {e}")
                last_error = e
                break
        
        # Re-raise the last error if all retries failed
        if last_error:
            raise last_error
    
    def safe_search(self, base, scope, filter_exp=None, **kwargs):
        """Search with comprehensive error handling."""
        def search_operation():
            with self.client.connect() as conn:
                return conn.search(base, scope, filter_exp, **kwargs)
        
        try:
            return self.execute_with_retry(search_operation)
        except LDAPError as e:
            self.logger.error(f"Search failed permanently: {e}")
            return []

# Usage
client = LDAPClient("ldap://unreliable-server.com")
client.set_credentials("SIMPLE", user="admin", password="secret")

manager = LDAPConnectionManager(client, max_retries=3, retry_delay=2.0)

# This will automatically retry on connection/timeout errors
results = manager.safe_search("dc=example,dc=com", 2, "(objectClass=person)")
print(f"Found {len(results)} entries")

Install with Tessl CLI

npx tessl i tessl/pypi-bonsai

docs

active-directory.md

async-frameworks.md

connection-pooling.md

core-ldap.md

index.md

ldif-support.md

utilities-errors.md

tile.json