CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-bonsai

Python 3 module for accessing LDAP directory servers with async framework support and Active Directory integration.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/core-ldap.md

Core LDAP Operations

Essential LDAP functionality providing client configuration, connection management, entry operations, distinguished name handling, and URL parsing. These components form the foundation for all LDAP directory server interactions.

Capabilities

LDAP Client Configuration

Configures connections to LDAP directory servers with authentication mechanisms, TLS settings, and various connection options.

class LDAPClient:
    """
    Configuration holder for connections to an LDAP directory server.

    Collects the server URL, the authentication mechanism and credentials,
    TLS settings and assorted connection options. Each option has a setter
    method and a matching property that exposes the configured value.
    """

    def __init__(self, url: Union[LDAPURL, str] = "ldap://", tls: bool = False) -> None:
        """
        Initialize LDAP client with server URL and TLS configuration.

        Parameters:
        - url: LDAP URL (e.g., "ldap://localhost" or "ldaps://server.com");
          either an LDAPURL object or a string. Defaults to "ldap://".
        - tls: Enable TLS connection. Defaults to False.
        """

    def set_credentials(
        self,
        mechanism: str,
        user: Optional[str] = None,
        password: Optional[str] = None,
        realm: Optional[str] = None,
        authz_id: Optional[str] = None,
        keytab: Optional[str] = None,
    ) -> None:
        """
        Set authentication credentials and mechanism.

        Only mechanism is required; every other parameter defaults to None.

        Parameters:
        - mechanism: Authentication mechanism ("SIMPLE", "GSSAPI", "EXTERNAL", etc.)
        - user: Username or bind DN for authentication
        - password: Password for simple authentication
        - realm: Kerberos realm
        - authz_id: Authorization ID for EXTERNAL mechanism
        - keytab: Path to Kerberos keytab file
        """

    def connect(self, is_async: bool = False) -> LDAPConnection:
        """
        Create connection to LDAP server.

        Parameters:
        - is_async: Whether to return async connection type. Defaults to False.

        Returns:
        LDAPConnection object for performing operations
        """

    def set_raw_attributes(self, raw_list: List[str]) -> None:
        """
        Configure attributes to be returned as raw bytes instead of strings.

        Parameters:
        - raw_list: List of attribute names to keep in binary format
        """

    def set_cert_policy(self, policy: int) -> None:
        """
        Set certificate validation policy for TLS connections.

        Parameters:
        - policy: Integer policy value (the accepted values are not listed
          on this page; TODO confirm against the bonsai reference)
        """

    def set_ca_cert(self, name: str) -> None:
        """
        Set CA certificate file path for TLS validation.

        Parameters:
        - name: Path of the CA certificate file
        """

    def set_ca_cert_dir(self, path: str) -> None:
        """
        Set CA certificate directory path for TLS validation.

        Parameters:
        - path: Path of the directory holding CA certificates
        """

    def set_client_cert(self, name: str) -> None:
        """
        Set client certificate file path for TLS authentication.

        Parameters:
        - name: Path of the client certificate file
        """

    def set_client_key(self, name: str) -> None:
        """
        Set client private key file path for TLS authentication.

        Parameters:
        - name: Path of the client private key file
        """

    def set_password_policy(self, ppolicy: bool) -> None:
        """
        Enable password policy control.

        Parameters:
        - ppolicy: True to enable the control (presumably False turns it
          off; confirm against the bonsai reference)
        """

    def set_extended_dn(self, extdn_format: Optional[int]) -> None:
        """
        Set extended DN format for Active Directory.

        Parameters:
        - extdn_format: Extended DN format (0=hex string, 1=standard string).
          None is also accepted by the signature; presumably it turns the
          feature off (TODO confirm).
        """

    def set_sd_flags(self, flags: Optional[int]) -> None:
        """
        Set security descriptor flags for Windows AD.

        Parameters:
        - flags: Security descriptor control flags. None is also accepted by
          the signature; presumably it clears the setting (TODO confirm).
        """

    def set_auto_page_acquire(self, val: bool) -> None:
        """
        Enable automatic page size acquisition for paged searches.

        Parameters:
        - val: New value of the flag
        """

    def set_ignore_referrals(self, val: bool) -> None:
        """
        Set whether to ignore LDAP referrals.

        Parameters:
        - val: New value of the flag
        """

    def set_server_chase_referrals(self, val: bool) -> None:
        """
        Set whether server should chase referrals.

        Parameters:
        - val: New value of the flag
        """

    def set_managedsait(self, val: bool) -> None:
        """
        Set ManageDsaIT control for directory operations.

        Parameters:
        - val: New value of the flag
        """

    def get_rootDSE(self) -> Optional["LDAPEntry"]:
        """
        Get root DSE entry containing server capabilities.

        Returns:
        LDAPEntry with root DSE information or None if unavailable
        """

    @property
    def url(self) -> LDAPURL:
        """LDAP URL configuration."""

    @property
    def mechanism(self) -> str:
        """Authentication mechanism (see set_credentials)."""

    @property
    def tls(self) -> bool:
        """TLS connection flag."""

    @property
    def password_policy(self) -> bool:
        """Password policy control status (see set_password_policy)."""

    @property
    def extended_dn_format(self) -> Optional[int]:
        """Extended DN format setting (see set_extended_dn)."""

    @property
    def sd_flags(self) -> Optional[int]:
        """Security descriptor flags (see set_sd_flags)."""

    @property
    def auto_page_acquire(self) -> bool:
        """Automatic page acquisition status (see set_auto_page_acquire)."""

    @property
    def ignore_referrals(self) -> bool:
        """Ignore referrals setting (see set_ignore_referrals)."""

    @property
    def server_chase_referrals(self) -> bool:
        """Server chase referrals setting (see set_server_chase_referrals)."""

    @property
    def managedsait(self) -> bool:
        """ManageDsaIT control setting (see set_managedsait)."""

LDAP Connection Operations

Provides all LDAP directory operations including search, add, modify, delete, and rename operations.

class LDAPConnection:
    """
    Connection to an LDAP directory server.

    Provides the directory operations: search (plain, paged and virtual
    list view), add, modify, delete and rename, plus the password-modify
    and "Who Am I?" extended operations and open/close lifecycle control.
    """

    def search(
        self,
        base: Optional[Union[str, LDAPDN]] = None,
        scope: Optional[Union[LDAPSearchScope, int]] = None,
        filter_exp: Optional[str] = None,
        attrlist: Optional[List[str]] = None,
        timeout: Optional[float] = None,
        sizelimit: int = 0,
        attrsonly: bool = False,
        sort_order: Optional[List[str]] = None,
        page_size: int = 0,
    ) -> List[LDAPEntry]:
        """
        Search LDAP directory for entries matching criteria.

        Every parameter is optional.

        Parameters:
        - base: Base DN to search from
        - scope: Search scope (BASE=0, ONELEVEL=1, SUBTREE=2)
        - filter_exp: LDAP filter expression (e.g., "(objectClass=person)")
        - attrlist: List of attributes to retrieve (None for all)
        - timeout: Operation timeout in seconds
        - sizelimit: Maximum number of entries to return. Defaults to 0.
        - attrsonly: Return attribute names only, no values. Defaults to False.
        - sort_order: List of attributes for server-side sorting
        - page_size: Enable paged results with specified page size. Defaults to 0.

        Returns:
        List of LDAPEntry objects matching the search criteria
        """

    def add(self, entry: LDAPEntry, timeout: Optional[float] = None) -> None:
        """
        Add new entry to LDAP directory.

        Parameters:
        - entry: LDAPEntry object with DN and attributes
        - timeout: Operation timeout in seconds
        """

    def modify(self, entry: LDAPEntry, timeout: Optional[float] = None) -> None:
        """
        Modify existing entry in LDAP directory.

        Parameters:
        - entry: LDAPEntry object with modifications
        - timeout: Operation timeout in seconds
        """

    def delete(
        self,
        dname: Union[str, LDAPDN],
        timeout: Optional[float] = None,
        recursive: bool = False,
    ) -> None:
        """
        Delete entry from LDAP directory.

        Parameters:
        - dname: Distinguished name of entry to delete
        - timeout: Operation timeout in seconds
        - recursive: Delete entry and all children. Defaults to False.
        """

    def rename(
        self,
        dn: Union[str, LDAPDN],
        newrdn: str,
        new_superior: Optional[Union[str, LDAPDN]] = None,
        delete_old_rdn: bool = True,
        timeout: Optional[float] = None,
    ) -> None:
        """
        Rename/move entry in LDAP directory.

        Parameters:
        - dn: Current distinguished name
        - newrdn: New relative distinguished name
        - new_superior: New parent DN (for move operation)
        - delete_old_rdn: Remove old RDN attribute values. Defaults to True.
        - timeout: Operation timeout in seconds
        """

    def modify_password(
        self,
        user: Optional[Union[str, LDAPDN]] = None,
        new_password: Optional[str] = None,
        old_password: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> Optional[str]:
        """
        Modify user password using LDAP password modify extended operation.

        Parameters:
        - user: User DN (None for current authenticated user)
        - new_password: New password (None for server-generated)
        - old_password: Current password for verification
        - timeout: Operation timeout in seconds

        Returns:
        New password if server-generated (presumably None when new_password
        was supplied; confirm against the bonsai reference)
        """

    def abandon(self, msg_id: int) -> None:
        """
        Abandon ongoing LDAP operation.

        Parameters:
        - msg_id: Message ID of operation to abandon
        """

    def whoami(self, timeout: Optional[float] = None) -> str:
        """
        Execute LDAP "Who Am I?" extended operation to get current identity.

        Parameters:
        - timeout: Operation timeout in seconds

        Returns:
        Authorization identity string
        """

    def paged_search(
        self,
        base: Optional[Union[str, LDAPDN]] = None,
        scope: Optional[Union[LDAPSearchScope, int]] = None,
        filter_exp: Optional[str] = None,
        attrlist: Optional[List[str]] = None,
        timeout: Optional[float] = None,
        page_size: int = 1,
        sort_order: Optional[List[str]] = None,
    ) -> "LDAPSearchIter":
        """
        Perform paged search returning an iterator for large result sets.

        Parameters:
        - base: Base DN to search from
        - scope: Search scope (BASE=0, ONELEVEL=1, SUBTREE=2)
        - filter_exp: LDAP filter expression
        - attrlist: List of attributes to retrieve
        - timeout: Operation timeout in seconds
        - page_size: Number of entries per page. Defaults to 1.
        - sort_order: List of attributes for server-side sorting

        Returns:
        LDAPSearchIter object for iterating through paged results
        """

    def virtual_list_search(
        self,
        base: Optional[Union[str, LDAPDN]] = None,
        scope: Optional[Union[LDAPSearchScope, int]] = None,
        filter_exp: Optional[str] = None,
        attrlist: Optional[List[str]] = None,
        timeout: Optional[float] = None,
        offset: int = 1,
        before_count: int = 0,
        after_count: int = 0,
        est_list_count: int = 0,
        attrvalue: Optional[str] = None,
        sort_order: Optional[List[str]] = None,
    ) -> "LDAPSearchIter":
        """
        Perform virtual list view search for efficient browsing of large sorted lists.

        Parameters:
        - base: Base DN to search from
        - scope: Search scope (BASE=0, ONELEVEL=1, SUBTREE=2)
        - filter_exp: LDAP filter expression
        - attrlist: List of attributes to retrieve
        - timeout: Operation timeout in seconds
        - offset: Target entry position in virtual list. Defaults to 1.
        - before_count: Number of entries to return before target. Defaults to 0.
        - after_count: Number of entries to return after target. Defaults to 0.
        - est_list_count: Estimated total list size. Defaults to 0.
        - attrvalue: Target attribute value for positioning
        - sort_order: Required sort order for virtual list view (the
          signature still defaults it to None)

        Returns:
        LDAPSearchIter object for virtual list results
        """

    def close(self) -> None:
        """Close LDAP connection and free resources."""

    def open(self, timeout: Optional[float] = None) -> "LDAPConnection":
        """
        Open connection to LDAP server.

        Parameters:
        - timeout: Connection timeout in seconds

        Returns:
        Self for method chaining
        """

    @property
    def is_closed(self) -> bool:
        """Connection closed status."""

    @property
    def tls_inuse(self) -> bool:
        """TLS in use status."""

LDAP Entry Handling

Dictionary-like interface for LDAP entries with automatic change tracking and type conversion.

class LDAPEntry:
    """
    Dictionary-like representation of a single LDAP entry.

    Attributes are accessed by name with the usual mapping protocol, and
    changes are tracked (see the changes property). The modify, rename and
    delete methods write to the directory and require an associated
    connection.
    """

    def __init__(self, dn: Union[str, LDAPDN], conn: Optional[LDAPConnection] = None) -> None:
        """
        Initialize LDAP entry.

        Parameters:
        - dn: Distinguished name of the entry (string or LDAPDN)
        - conn: LDAP connection for automatic synchronization. Defaults to None.
        """

    def __getitem__(self, key: str) -> LDAPValueList:
        """
        Get attribute value list by name.

        Parameters:
        - key: Attribute name

        Returns:
        LDAPValueList holding the attribute's values
        """

    def __setitem__(self, key: str, value: Any) -> None:
        """
        Set attribute value(s).

        Parameters:
        - key: Attribute name
        - value: A single value or several values (the usage examples on
          this page assign both a plain string and a list of strings)
        """

    def __delitem__(self, key: str) -> None:
        """
        Delete attribute.

        Parameters:
        - key: Attribute name
        """

    def __contains__(self, key: str) -> bool:
        """
        Check if attribute exists.

        Parameters:
        - key: Attribute name
        """

    def __iter__(self):
        """Iterate over attribute names."""

    def keys(self):
        """Get attribute names."""

    def values(self):
        """Get attribute value lists."""

    def items(self):
        """Get (attribute, value_list) pairs."""

    def get(self, key: str, default=None):
        """
        Get attribute with default value.

        Parameters:
        - key: Attribute name
        - default: Value to fall back on (presumably returned when the
          attribute is absent, as with dict.get; confirm). Defaults to None.
        """

    def update(self, other: dict) -> None:
        """
        Update entry with dictionary of attributes.

        Parameters:
        - other: Dictionary of attributes to apply
        """

    def clear(self) -> None:
        """Remove all attributes except DN."""

    def modify(self, timeout: Optional[float] = None) -> None:
        """
        Save changes to LDAP directory (requires connection).

        Parameters:
        - timeout: Operation timeout in seconds
        """

    def rename(
        self,
        newdn: Union[str, LDAPDN],
        delete_old_rdn: bool = True,
        timeout: Optional[float] = None,
    ) -> None:
        """
        Rename entry in LDAP directory (requires connection).

        Parameters:
        - newdn: New distinguished name
        - delete_old_rdn: Remove old RDN attributes. Defaults to True.
        - timeout: Operation timeout in seconds
        """

    def delete(self, timeout: Optional[float] = None) -> None:
        """
        Delete entry from LDAP directory (requires connection).

        Parameters:
        - timeout: Operation timeout in seconds
        """

    @property
    def dn(self) -> LDAPDN:
        """Distinguished name of the entry."""

    @property
    def connection(self) -> Optional[LDAPConnection]:
        """Associated LDAP connection, or None when there is none."""

    @property
    def changes(self) -> dict:
        """Dictionary of pending changes."""

Distinguished Name Handling

Parse, manipulate, and validate LDAP distinguished names with support for escaping and comparison.

class LDAPDN:
    """
    LDAP distinguished name.

    Parses, manipulates and validates distinguished names, with support
    for escaping and comparison.
    """

    def __init__(self, dnstr: str) -> None:
        """
        Initialize distinguished name from string.

        Parameters:
        - dnstr: DN string (e.g., "cn=user,ou=people,dc=example,dc=com")
        """

    def __str__(self) -> str:
        """Convert to string representation."""

    def __eq__(self, other) -> bool:
        """
        Compare DNs for equality.

        Parameters:
        - other: Object to compare with
        """

    def __getitem__(self, idx: int) -> str:
        """
        Get RDN component by index.

        Parameters:
        - idx: Position of the RDN component

        Returns:
        The RDN component as a string
        """

    def __len__(self) -> int:
        """Get number of RDN components."""

    def __contains__(self, item: Union[str, "LDAPDN"]) -> bool:
        """
        Check if DN contains another DN or RDN.

        Parameters:
        - item: DN or RDN to look for (string or LDAPDN)
        """

    def __add__(self, other: Union[str, "LDAPDN"]) -> "LDAPDN":
        """
        Concatenate DNs.

        Parameters:
        - other: DN to combine with this one (string or LDAPDN)

        Returns:
        LDAPDN holding the concatenated name
        """

    @property
    def rdns(self) -> List[str]:
        """List of relative distinguished name components."""

    def ancestors(self) -> List["LDAPDN"]:
        """
        Get list of ancestor DNs.

        Returns:
        List of LDAPDN objects (the usage example on this page shows them
        ordered from the immediate parent outwards)
        """

    def is_valid(self) -> bool:
        """Validate DN syntax."""

    @staticmethod
    def escape_attribute_value(value: str) -> str:
        """
        Escape special characters in attribute values.

        Parameters:
        - value: Attribute value to escape

        Returns:
        Escaped attribute value
        """

LDAP URL Handling

Parse and construct LDAP URLs with support for all standard components including host, port, base DN, scope, filter, and attributes.

class LDAPURL:
    """
    LDAP URL.

    Parses and constructs LDAP URLs and exposes their standard components:
    scheme, host, port, base DN, attributes, scope, filter and extensions.
    """

    def __init__(self, url: str) -> None:
        """
        Initialize LDAP URL from string.

        Parameters:
        - url: LDAP URL string (e.g., "ldap://server.com:389/dc=example,dc=com?cn,sn?sub?(objectClass=person)")
        """

    def __str__(self) -> str:
        """Convert to URL string."""

    def __eq__(self, other) -> bool:
        """
        Compare URLs for equality.

        Parameters:
        - other: Object to compare with
        """

    @property
    def scheme(self) -> str:
        """URL scheme (ldap, ldaps, ldapi)."""

    @property
    def host(self) -> str:
        """LDAP server hostname."""

    @property
    def port(self) -> int:
        """LDAP server port number."""

    @property
    def basedn(self) -> Optional[LDAPDN]:
        """Base DN for operations (annotated as optional, so it may be None)."""

    @property
    def attributes(self) -> List[str]:
        """List of attributes to retrieve."""

    @property
    def scope(self) -> Optional[LDAPSearchScope]:
        """Search scope (annotated as optional, so it may be None)."""

    @property
    def scope_num(self) -> int:
        """Search scope as integer."""

    @property
    def filter_exp(self) -> Optional[str]:
        """LDAP filter expression (annotated as optional, so it may be None)."""

    @property
    def extensions(self) -> Optional[dict]:
        """URL extensions dictionary (annotated as optional, so it may be None)."""

    def get_address(self) -> str:
        """Get server address (host:port)."""

    @classmethod
    def from_config(
        cls,
        host: str,
        port: int = 389,
        basedn: Optional[str] = None,
        **kwargs
    ) -> "LDAPURL":
        """
        Create LDAP URL from configuration parameters.

        Parameters:
        - host: LDAP server hostname
        - port: LDAP server port. Defaults to 389.
        - basedn: Base DN string. Defaults to None.
        - **kwargs: Additional URL components

        Returns:
        LDAPURL object
        """

Value Lists

Specialized list for LDAP attribute values with change tracking and type handling.

class LDAPValueList(list):
    """
    List subclass holding the values of one LDAP attribute.

    Adds change tracking (see added, deleted and status) and type handling
    on top of the built-in list.
    """

    def __init__(self, items=None) -> None:
        """
        Initialize value list with optional items.

        Parameters:
        - items: Initial list of values. Defaults to None.
        """

    def append(self, item: Any) -> None:
        """
        Add value to list.

        Parameters:
        - item: Value to add
        """

    def extend(self, items) -> None:
        """
        Add multiple values to list.

        Parameters:
        - items: Values to add
        """

    def insert(self, index: int, item: Any) -> None:
        """
        Insert value at specific index.

        Parameters:
        - index: Position to insert at
        - item: Value to insert
        """

    def remove(self, item: Any) -> None:
        """
        Remove first occurrence of value.

        Parameters:
        - item: Value to remove
        """

    def clear(self) -> None:
        """Remove all values."""

    @property
    def added(self) -> set:
        """Set of values marked as added."""

    @property
    def deleted(self) -> set:
        """Set of values marked as deleted."""

    @property
    def status(self) -> int:
        """Change status (0=unchanged, 1=added, 2=deleted, 3=replaced)."""

Search Iterator

Iterator for handling paged search results and large result sets.

class LDAPSearchIter:
    """
    Iterator over search results.

    Handles paged search results and large result sets; it is the return
    type of LDAPConnection.paged_search and virtual_list_search.
    """

    def __init__(self, conn: LDAPConnection, base: str, scope: int, **kwargs) -> None:
        """
        Initialize search iterator.

        Parameters:
        - conn: LDAP connection object
        - base: Base DN for search
        - scope: Search scope
        - **kwargs: Additional search parameters
        """

    def __iter__(self) -> "LDAPSearchIter":
        """Return iterator object."""

    def __next__(self) -> LDAPEntry:
        """
        Get next entry from search results.

        Returns:
        The next LDAPEntry
        """

    def acquire_next_page(self) -> None:
        """Manually acquire next page of results."""

    @property
    def cookie(self) -> Optional[bytes]:
        """Paging cookie for server-side state (annotated as optional, so it may be None)."""

    @property
    def estimated_list_count(self) -> int:
        """Estimated total count for virtual list view."""

LDAP Reference

Handles LDAP referrals returned by directory servers.

class LDAPReference:
    """
    LDAP referral returned by a directory server.

    Pairs the referral URLs with the client configuration to use for them.
    """

    def __init__(self, client: LDAPClient, references: List[Union[str, LDAPURL]]) -> None:
        """
        Initialize LDAP reference object.

        Parameters:
        - client: LDAP client for connection configuration
        - references: List of referral URLs (strings or LDAPURL objects)
        """

    @property
    def client(self) -> LDAPClient:
        """LDAP client configuration."""

    @property
    def references(self) -> List[LDAPURL]:
        """List of referral LDAP URLs."""

Usage Examples

Basic LDAP Operations

from bonsai import LDAPClient, LDAPEntry, LDAPDN

# Point the client at the server and set simple-bind credentials
client = LDAPClient("ldap://localhost:389")
client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")

# Search arguments, named so the call below reads at a glance
search_base = "dc=example,dc=com"
subtree_scope = 2  # SUBTREE scope
person_filter = "(objectClass=person)"
wanted_attrs = ["cn", "sn", "mail"]

# Connect, then list every person entry under the base DN
with client.connect() as conn:
    for entry in conn.search(search_base, subtree_scope, person_filter, wanted_attrs):
        print(f"Found: {entry.dn}")
        print(f"Name: {entry['cn'][0]}")
        # mail is treated as optional here, hence the membership check
        if 'mail' in entry:
            print(f"Email: {entry['mail'][0]}")

Creating and Modifying Entries

from bonsai import LDAPClient, LDAPEntry

# Configure client (repeated here so this example runs on its own)
client = LDAPClient("ldap://localhost:389")
client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")

# DN of the entry this example creates, modifies and finally deletes
user_dn = "cn=jdoe,ou=people,dc=example,dc=com"

# Create new entry
new_user = LDAPEntry(user_dn)
new_user['objectClass'] = ['person', 'organizationalPerson', 'inetOrgPerson']
new_user['cn'] = 'jdoe'
new_user['sn'] = 'Doe'
new_user['givenName'] = 'John'
new_user['mail'] = 'jdoe@example.com'

with client.connect() as conn:
    # Add to directory
    conn.add(new_user)

    # Modify existing entry
    user = conn.search(user_dn, 0)[0]  # BASE scope
    user['title'] = 'Software Engineer'
    user['telephoneNumber'] = '+1-555-0123'
    conn.modify(user)

    # Delete entry
    conn.delete(user_dn)

Distinguished Name Operations

from bonsai import LDAPDN

# Build a DN object from its string form
dn = LDAPDN("cn=John Doe,ou=people,dc=example,dc=com")
print(f"RDNs: {dn.rdns}")  # ['cn=John Doe', 'ou=people', 'dc=example', 'dc=com']
print(f"Length: {len(dn)}")  # 4

# Membership test: is the base DN part of the full DN?
base_dn = LDAPDN("dc=example,dc=com")
print(base_dn in dn)  # True

# Walk up the tree, printing one ancestor per line
for ancestor in dn.ancestors():
    print(ancestor)  # ou=people,dc=example,dc=com, then dc=example,dc=com, then dc=com

Install with Tessl CLI

npx tessl i tessl/pypi-bonsai

docs

active-directory.md

async-frameworks.md

connection-pooling.md

core-ldap.md

index.md

ldif-support.md

utilities-errors.md

tile.json