Python 3 module for accessing LDAP directory servers with async framework support and Active Directory integration.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Essential LDAP functionality providing client configuration, connection management, entry operations, distinguished name handling, and URL parsing. These components form the foundation for all LDAP directory server interactions.
Configures connections to LDAP directory servers with authentication mechanisms, TLS settings, and various connection options.
class LDAPClient:
def __init__(self, url: Union[LDAPURL, str] = "ldap://", tls: bool = False) -> None:
"""
Initialize LDAP client with server URL and TLS configuration.
Parameters:
- url: LDAP URL (e.g., "ldap://localhost" or "ldaps://server.com")
- tls: Enable TLS connection
"""
def set_credentials(
self,
mechanism: str,
user: Optional[str] = None,
password: Optional[str] = None,
realm: Optional[str] = None,
authz_id: Optional[str] = None,
keytab: Optional[str] = None,
) -> None:
"""
Set authentication credentials and mechanism.
Parameters:
- mechanism: Authentication mechanism ("SIMPLE", "GSSAPI", "EXTERNAL", etc.)
- user: Username or bind DN for authentication
- password: Password for simple authentication
- realm: Kerberos realm
- authz_id: Authorization ID for EXTERNAL mechanism
- keytab: Path to Kerberos keytab file
"""
def connect(self, is_async: bool = False) -> LDAPConnection:
"""
Create connection to LDAP server.
Parameters:
- is_async: Whether to return async connection type
Returns:
LDAPConnection object for performing operations
"""
def set_raw_attributes(self, raw_list: List[str]) -> None:
"""
Configure attributes to be returned as raw bytes instead of strings.
Parameters:
- raw_list: List of attribute names to keep in binary format
"""
def set_cert_policy(self, policy: int) -> None:
"""Set certificate validation policy for TLS connections."""
def set_ca_cert(self, name: str) -> None:
"""Set CA certificate file path for TLS validation."""
def set_ca_cert_dir(self, path: str) -> None:
"""Set CA certificate directory path for TLS validation."""
def set_client_cert(self, name: str) -> None:
"""Set client certificate file path for TLS authentication."""
def set_client_key(self, name: str) -> None:
"""Set client private key file path for TLS authentication."""
def set_password_policy(self, ppolicy: bool) -> None:
"""Enable password policy control."""
def set_extended_dn(self, extdn_format: Optional[int]) -> None:
"""
Set extended DN format for Active Directory.
Parameters:
- extdn_format: Extended DN format (0=hex string, 1=standard string)
"""
def set_sd_flags(self, flags: Optional[int]) -> None:
"""
Set security descriptor flags for Windows AD.
Parameters:
- flags: Security descriptor control flags
"""
def set_auto_page_acquire(self, val: bool) -> None:
"""Enable automatic page size acquisition for paged searches."""
def set_ignore_referrals(self, val: bool) -> None:
"""Set whether to ignore LDAP referrals."""
def set_server_chase_referrals(self, val: bool) -> None:
"""Set whether server should chase referrals."""
def set_managedsait(self, val: bool) -> None:
"""Set ManageDsaIT control for directory operations."""
def get_rootDSE(self) -> Optional["LDAPEntry"]:
"""
Get root DSE entry containing server capabilities.
Returns:
LDAPEntry with root DSE information or None if unavailable
"""
@property
def url(self) -> LDAPURL:
"""LDAP URL configuration."""
@property
def mechanism(self) -> str:
"""Authentication mechanism."""
@property
def tls(self) -> bool:
"""TLS connection flag."""
@property
def password_policy(self) -> bool:
"""Password policy control status."""
@property
def extended_dn_format(self) -> Optional[int]:
"""Extended DN format setting."""
@property
def sd_flags(self) -> Optional[int]:
"""Security descriptor flags."""
@property
def auto_page_acquire(self) -> bool:
"""Automatic page acquisition status."""
@property
def ignore_referrals(self) -> bool:
"""Ignore referrals setting."""
@property
def server_chase_referrals(self) -> bool:
"""Server chase referrals setting."""
@property
def managedsait(self) -> bool:
"""ManageDsaIT control setting."""Provides all LDAP directory operations including search, add, modify, delete, and rename operations.
class LDAPConnection:
def search(
self,
base: Optional[Union[str, LDAPDN]] = None,
scope: Optional[Union[LDAPSearchScope, int]] = None,
filter_exp: Optional[str] = None,
attrlist: Optional[List[str]] = None,
timeout: Optional[float] = None,
sizelimit: int = 0,
attrsonly: bool = False,
sort_order: Optional[List[str]] = None,
page_size: int = 0,
) -> List[LDAPEntry]:
"""
Search LDAP directory for entries matching criteria.
Parameters:
- base: Base DN to search from
- scope: Search scope (BASE=0, ONELEVEL=1, SUBTREE=2)
- filter_exp: LDAP filter expression (e.g., "(objectClass=person)")
- attrlist: List of attributes to retrieve (None for all)
- timeout: Operation timeout in seconds
- sizelimit: Maximum number of entries to return
- attrsonly: Return attribute names only, no values
- sort_order: List of attributes for server-side sorting
- page_size: Enable paged results with specified page size
Returns:
List of LDAPEntry objects matching the search criteria
"""
def add(self, entry: LDAPEntry, timeout: Optional[float] = None) -> None:
"""
Add new entry to LDAP directory.
Parameters:
- entry: LDAPEntry object with DN and attributes
- timeout: Operation timeout in seconds
"""
def modify(self, entry: LDAPEntry, timeout: Optional[float] = None) -> None:
"""
Modify existing entry in LDAP directory.
Parameters:
- entry: LDAPEntry object with modifications
- timeout: Operation timeout in seconds
"""
def delete(
self,
dname: Union[str, LDAPDN],
timeout: Optional[float] = None,
recursive: bool = False,
) -> None:
"""
Delete entry from LDAP directory.
Parameters:
- dname: Distinguished name of entry to delete
- timeout: Operation timeout in seconds
- recursive: Delete entry and all children
"""
def rename(
self,
dn: Union[str, LDAPDN],
newrdn: str,
new_superior: Optional[Union[str, LDAPDN]] = None,
delete_old_rdn: bool = True,
timeout: Optional[float] = None,
) -> None:
"""
Rename/move entry in LDAP directory.
Parameters:
- dn: Current distinguished name
- newrdn: New relative distinguished name
- new_superior: New parent DN (for move operation)
- delete_old_rdn: Remove old RDN attribute values
- timeout: Operation timeout in seconds
"""
def modify_password(
self,
user: Optional[Union[str, LDAPDN]] = None,
new_password: Optional[str] = None,
old_password: Optional[str] = None,
timeout: Optional[float] = None,
) -> str:
"""
Modify user password using LDAP password modify extended operation.
Parameters:
- user: User DN (None for current authenticated user)
- new_password: New password (None for server-generated)
- old_password: Current password for verification
- timeout: Operation timeout in seconds
Returns:
New password if server-generated
"""
def abandon(self, msg_id: int) -> None:
"""
Abandon ongoing LDAP operation.
Parameters:
- msg_id: Message ID of operation to abandon
"""
def whoami(self, timeout: Optional[float] = None) -> str:
"""
Execute LDAP "Who Am I?" extended operation to get current identity.
Parameters:
- timeout: Operation timeout in seconds
Returns:
Authorization identity string
"""
def paged_search(
self,
base: Optional[Union[str, LDAPDN]] = None,
scope: Optional[Union[LDAPSearchScope, int]] = None,
filter_exp: Optional[str] = None,
attrlist: Optional[List[str]] = None,
timeout: Optional[float] = None,
page_size: int = 1,
sort_order: Optional[List[str]] = None,
) -> "LDAPSearchIter":
"""
Perform paged search returning an iterator for large result sets.
Parameters:
- base: Base DN to search from
- scope: Search scope (BASE=0, ONELEVEL=1, SUBTREE=2)
- filter_exp: LDAP filter expression
- attrlist: List of attributes to retrieve
- timeout: Operation timeout in seconds
- page_size: Number of entries per page
- sort_order: List of attributes for server-side sorting
Returns:
LDAPSearchIter object for iterating through paged results
"""
def virtual_list_search(
self,
base: Optional[Union[str, LDAPDN]] = None,
scope: Optional[Union[LDAPSearchScope, int]] = None,
filter_exp: Optional[str] = None,
attrlist: Optional[List[str]] = None,
timeout: Optional[float] = None,
offset: int = 1,
before_count: int = 0,
after_count: int = 0,
est_list_count: int = 0,
attrvalue: Optional[str] = None,
sort_order: Optional[List[str]] = None,
) -> "LDAPSearchIter":
"""
Perform virtual list view search for efficient browsing of large sorted lists.
Parameters:
- base: Base DN to search from
- scope: Search scope
- filter_exp: LDAP filter expression
- attrlist: List of attributes to retrieve
- timeout: Operation timeout in seconds
- offset: Target entry position in virtual list
- before_count: Number of entries to return before target
- after_count: Number of entries to return after target
- est_list_count: Estimated total list size
- attrvalue: Target attribute value for positioning
- sort_order: Required sort order for virtual list view
Returns:
LDAPSearchIter object for virtual list results
"""
def close(self) -> None:
"""Close LDAP connection and free resources."""
def open(self, timeout: Optional[float] = None) -> "LDAPConnection":
"""
Open connection to LDAP server.
Parameters:
- timeout: Connection timeout in seconds
Returns:
Self for method chaining
"""
@property
def is_closed(self) -> bool:
"""Connection closed status."""
@property
def tls_inuse(self) -> bool:
"""TLS in use status."""Dictionary-like interface for LDAP entries with automatic change tracking and type conversion.
class LDAPEntry:
def __init__(self, dn: Union[str, LDAPDN], conn: Optional[LDAPConnection] = None) -> None:
"""
Initialize LDAP entry.
Parameters:
- dn: Distinguished name of the entry
- conn: LDAP connection for automatic synchronization
"""
def __getitem__(self, key: str) -> LDAPValueList:
"""Get attribute value list by name."""
def __setitem__(self, key: str, value: Any) -> None:
"""Set attribute value(s)."""
def __delitem__(self, key: str) -> None:
"""Delete attribute."""
def __contains__(self, key: str) -> bool:
"""Check if attribute exists."""
def __iter__(self):
"""Iterate over attribute names."""
def keys(self):
"""Get attribute names."""
def values(self):
"""Get attribute value lists."""
def items(self):
"""Get (attribute, value_list) pairs."""
def get(self, key: str, default=None):
"""Get attribute with default value."""
def update(self, other: dict) -> None:
"""Update entry with dictionary of attributes."""
def clear(self) -> None:
"""Remove all attributes except DN."""
def modify(self, timeout: Optional[float] = None) -> None:
"""
Save changes to LDAP directory (requires connection).
Parameters:
- timeout: Operation timeout in seconds
"""
def rename(
self,
newdn: Union[str, LDAPDN],
delete_old_rdn: bool = True,
timeout: Optional[float] = None,
) -> None:
"""
Rename entry in LDAP directory (requires connection).
Parameters:
- newdn: New distinguished name
- delete_old_rdn: Remove old RDN attributes
- timeout: Operation timeout in seconds
"""
def delete(self, timeout: Optional[float] = None) -> None:
"""
Delete entry from LDAP directory (requires connection).
Parameters:
- timeout: Operation timeout in seconds
"""
@property
def dn(self) -> LDAPDN:
"""Distinguished name of the entry."""
@property
def connection(self) -> Optional[LDAPConnection]:
"""Associated LDAP connection."""
@property
def changes(self) -> dict:
"""Dictionary of pending changes."""Parse, manipulate, and validate LDAP distinguished names with support for escaping and comparison.
class LDAPDN:
def __init__(self, dnstr: str) -> None:
"""
Initialize distinguished name from string.
Parameters:
- dnstr: DN string (e.g., "cn=user,ou=people,dc=example,dc=com")
"""
def __str__(self) -> str:
"""Convert to string representation."""
def __eq__(self, other) -> bool:
"""Compare DNs for equality."""
def __getitem__(self, idx: int) -> str:
"""Get RDN component by index."""
def __len__(self) -> int:
"""Get number of RDN components."""
def __contains__(self, item: Union[str, "LDAPDN"]) -> bool:
"""Check if DN contains another DN or RDN."""
def __add__(self, other: Union[str, "LDAPDN"]) -> "LDAPDN":
"""Concatenate DNs."""
@property
def rdns(self) -> List[str]:
"""List of relative distinguished name components."""
def ancestors(self) -> List["LDAPDN"]:
"""Get list of ancestor DNs."""
def is_valid(self) -> bool:
"""Validate DN syntax."""
@staticmethod
def escape_attribute_value(value: str) -> str:
"""
Escape special characters in attribute values.
Parameters:
- value: Attribute value to escape
Returns:
Escaped attribute value
"""Parse and construct LDAP URLs with support for all standard components including host, port, base DN, scope, filter, and attributes.
class LDAPURL:
def __init__(self, url: str) -> None:
"""
Initialize LDAP URL from string.
Parameters:
- url: LDAP URL string (e.g., "ldap://server.com:389/dc=example,dc=com?cn,sn?sub?(objectClass=person)")
"""
def __str__(self) -> str:
"""Convert to URL string."""
def __eq__(self, other) -> bool:
"""Compare URLs for equality."""
@property
def scheme(self) -> str:
"""URL scheme (ldap, ldaps, ldapi)."""
@property
def host(self) -> str:
"""LDAP server hostname."""
@property
def port(self) -> int:
"""LDAP server port number."""
@property
def basedn(self) -> Optional[LDAPDN]:
"""Base DN for operations."""
@property
def attributes(self) -> List[str]:
"""List of attributes to retrieve."""
@property
def scope(self) -> Optional[LDAPSearchScope]:
"""Search scope."""
@property
def scope_num(self) -> int:
"""Search scope as integer."""
@property
def filter_exp(self) -> Optional[str]:
"""LDAP filter expression."""
@property
def extensions(self) -> Optional[dict]:
"""URL extensions dictionary."""
def get_address(self) -> str:
"""Get server address (host:port)."""
@classmethod
def from_config(
cls,
host: str,
port: int = 389,
basedn: Optional[str] = None,
**kwargs
) -> "LDAPURL":
"""
Create LDAP URL from configuration parameters.
Parameters:
- host: LDAP server hostname
- port: LDAP server port
- basedn: Base DN string
- **kwargs: Additional URL components
Returns:
LDAPURL object
"""Specialized list for LDAP attribute values with change tracking and type handling.
class LDAPValueList(list):
def __init__(self, items=None) -> None:
"""
Initialize value list with optional items.
Parameters:
- items: Initial list of values
"""
def append(self, item: Any) -> None:
"""Add value to list."""
def extend(self, items) -> None:
"""Add multiple values to list."""
def insert(self, index: int, item: Any) -> None:
"""Insert value at specific index."""
def remove(self, item: Any) -> None:
"""Remove first occurrence of value."""
def clear(self) -> None:
"""Remove all values."""
@property
def added(self) -> set:
"""Set of values marked as added."""
@property
def deleted(self) -> set:
"""Set of values marked as deleted."""
@property
def status(self) -> int:
"""Change status (0=unchanged, 1=added, 2=deleted, 3=replaced)."""Iterator for handling paged search results and large result sets.
class LDAPSearchIter:
def __init__(self, conn: LDAPConnection, base: str, scope: int, **kwargs) -> None:
"""
Initialize search iterator.
Parameters:
- conn: LDAP connection object
- base: Base DN for search
- scope: Search scope
- **kwargs: Additional search parameters
"""
def __iter__(self) -> "LDAPSearchIter":
"""Return iterator object."""
def __next__(self) -> LDAPEntry:
"""Get next entry from search results."""
def acquire_next_page(self) -> None:
"""Manually acquire next page of results."""
@property
def cookie(self) -> Optional[bytes]:
"""Paging cookie for server-side state."""
@property
def estimated_list_count(self) -> int:
"""Estimated total count for virtual list view."""Handles LDAP referrals returned by directory servers.
class LDAPReference:
def __init__(self, client: LDAPClient, references: List[Union[str, LDAPURL]]) -> None:
"""
Initialize LDAP reference object.
Parameters:
- client: LDAP client for connection configuration
- references: List of referral URLs
"""
@property
def client(self) -> LDAPClient:
"""LDAP client configuration."""
@property
def references(self) -> List[LDAPURL]:
"""List of referral LDAP URLs."""from bonsai import LDAPClient, LDAPEntry, LDAPDN
# Configure client
client = LDAPClient("ldap://localhost:389")
client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")
# Connect and search
with client.connect() as conn:
# Search for all person entries
results = conn.search(
"dc=example,dc=com",
2, # SUBTREE scope
"(objectClass=person)",
["cn", "sn", "mail"]
)
for entry in results:
print(f"Found: {entry.dn}")
print(f"Name: {entry['cn'][0]}")
if 'mail' in entry:
print(f"Email: {entry['mail'][0]}")# Create new entry
new_user = LDAPEntry("cn=jdoe,ou=people,dc=example,dc=com")
new_user['objectClass'] = ['person', 'organizationalPerson', 'inetOrgPerson']
new_user['cn'] = 'jdoe'
new_user['sn'] = 'Doe'
new_user['givenName'] = 'John'
new_user['mail'] = 'jdoe@example.com'
with client.connect() as conn:
# Add to directory
conn.add(new_user)
# Modify existing entry
user = conn.search("cn=jdoe,ou=people,dc=example,dc=com", 0)[0] # BASE scope
user['title'] = 'Software Engineer'
user['telephoneNumber'] = '+1-555-0123'
conn.modify(user)
# Delete entry
conn.delete("cn=jdoe,ou=people,dc=example,dc=com")from bonsai import LDAPDN
# Parse DN
dn = LDAPDN("cn=John Doe,ou=people,dc=example,dc=com")
print(f"RDNs: {dn.rdns}") # ['cn=John Doe', 'ou=people', 'dc=example', 'dc=com']
print(f"Length: {len(dn)}") # 4
# Check containment
base_dn = LDAPDN("dc=example,dc=com")
print(base_dn in dn) # True
# Get ancestors
ancestors = dn.ancestors()
for ancestor in ancestors:
print(ancestor) # ou=people,dc=example,dc=com, then dc=example,dc=com, then dc=comInstall with Tessl CLI
npx tessl i tessl/pypi-bonsai