Python 3 module for accessing LDAP directory servers with async framework support and Active Directory integration.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Native integration with multiple Python async frameworks including asyncio, gevent, tornado, and trio. Each framework provides specialized connection classes optimized for their respective event loops and concurrency models.
Native Python asyncio integration with async/await syntax and asyncio event loop integration.
from bonsai.asyncio import AIOLDAPConnection, AIOConnectionPool
class AIOLDAPConnection(LDAPConnection):
    """LDAP connection for asyncio, usable with async/await syntax.

    Every operation listed here is declared as a coroutine, and the class
    implements the async context manager protocol
    (``__aenter__`` / ``__aexit__``).
    """

    def __init__(self, client: LDAPClient, loop=None) -> None:
        """
        Initialize asyncio LDAP connection.

        Parameters:
        - client: LDAPClient configuration object
        - loop: asyncio event loop (uses current loop if None)
        """

    async def __aenter__(self) -> "AIOLDAPConnection":
        """Async context manager entry point."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit point."""

    async def search(
        self,
        base: Optional[Union[str, LDAPDN]] = None,
        scope: Optional[Union[LDAPSearchScope, int]] = None,
        filter_exp: Optional[str] = None,
        attrlist: Optional[List[str]] = None,
        timeout: Optional[float] = None,
        sizelimit: int = 0,
        attrsonly: bool = False,
        sort_order: Optional[List[str]] = None,
        page_size: int = 0,
    ) -> List[LDAPEntry]:
        """
        Async search LDAP directory for entries.

        Parameters: Same as LDAPConnection.search()

        Returns:
        List of LDAPEntry objects matching search criteria
        """

    async def add(self, entry: LDAPEntry, timeout: Optional[float] = None) -> None:
        """
        Async add entry to LDAP directory.

        Parameters:
        - entry: LDAPEntry object to add
        - timeout: Operation timeout in seconds
        """

    async def modify(self, entry: LDAPEntry, timeout: Optional[float] = None) -> None:
        """
        Async modify existing entry in LDAP directory.

        Parameters:
        - entry: LDAPEntry object with modifications
        - timeout: Operation timeout in seconds
        """

    async def delete(
        self,
        dname: Union[str, LDAPDN],
        timeout: Optional[float] = None,
        recursive: bool = False,
    ) -> None:
        """
        Async delete entry from LDAP directory.

        Parameters:
        - dname: Distinguished name of entry to delete
        - timeout: Operation timeout in seconds
        - recursive: Delete entry and all children recursively
        """

    async def rename(
        self,
        dn: Union[str, LDAPDN],
        newrdn: str,
        new_superior: Optional[Union[str, LDAPDN]] = None,
        delete_old_rdn: bool = True,
        timeout: Optional[float] = None,
    ) -> None:
        """
        Async rename/move entry in LDAP directory.

        Parameters: Same as LDAPConnection.rename()
        """

    async def modify_password(
        self,
        user: Optional[Union[str, LDAPDN]] = None,
        new_password: Optional[str] = None,
        old_password: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> str:
        """
        Async modify user password.

        Parameters: Same as LDAPConnection.modify_password()

        Returns:
        New password if server-generated
        """

    async def open(self, timeout: Optional[float] = None) -> "AIOLDAPConnection":
        """
        Async open connection to LDAP server.

        Parameters:
        - timeout: Connection timeout in seconds

        Returns:
        Self for method chaining
        """
class AIOConnectionPool:
    """Pool of asyncio LDAP connections bounded by ``minconn`` and ``maxconn``."""

    def __init__(
        self,
        client: LDAPClient,
        minconn: int = 1,
        maxconn: int = 10,
        loop=None
    ) -> None:
        """
        Initialize asyncio connection pool.

        Parameters:
        - client: LDAPClient configuration
        - minconn: Minimum connections to maintain
        - maxconn: Maximum connections allowed
        - loop: asyncio event loop
        """

    async def get(self, timeout: Optional[float] = None) -> AIOLDAPConnection:
        """Get connection from pool (awaitable)."""

    async def put(self, conn: AIOLDAPConnection) -> None:
        """Return connection to pool (awaitable)."""

    # NOTE(review): declared as a plain ``async def`` here, yet the usage in
    # the docstring is ``async with pool.spawn()``. Presumably the real method
    # is wrapped as an async context manager -- confirm against the library.
    async def spawn(self, *args, **kwargs):
        """
        Async context manager for getting and returning pooled connections.

        Usage:
            async with pool.spawn() as conn:
                # Use connection
                results = await conn.search(...)
            # Connection automatically returned to pool
        """

    async def close(self) -> None:
        """Close all connections in pool."""

    @property
    def closed(self) -> bool:
        """Whether the pool is closed."""

    @property
    def idle_connection(self) -> int:
        """Number of idle connections."""

    @property
    def shared_connection(self) -> int:
        """Number of connections in use."""

    @property
    def max_connection(self) -> int:
        """Maximum number of connections allowed."""

    @property
    def min_connection(self) -> int:
        """Minimum number of connections to maintain."""

Integration with gevent greenlets for cooperative concurrency.
from bonsai.gevent import GeventLDAPConnection
class GeventLDAPConnection(LDAPConnection):
    """LDAP connection integrated with gevent greenlets.

    The operations are plain (non-``async``) methods that are documented as
    yielding to other greenlets while waiting on I/O.
    """

    def __init__(self, client: LDAPClient) -> None:
        """
        Initialize gevent LDAP connection.

        Parameters:
        - client: LDAPClient configuration object
        """

    def search(
        self,
        base: Optional[Union[str, LDAPDN]] = None,
        scope: Optional[Union[LDAPSearchScope, int]] = None,
        filter_exp: Optional[str] = None,
        attrlist: Optional[List[str]] = None,
        timeout: Optional[float] = None,
        sizelimit: int = 0,
        attrsonly: bool = False,
        sort_order: Optional[List[str]] = None,
        page_size: int = 0,
    ) -> List[LDAPEntry]:
        """
        Search LDAP directory (yields to other greenlets during I/O).

        Parameters: Same as LDAPConnection.search()

        Returns:
        List of LDAPEntry objects
        """

    def add(self, entry: LDAPEntry, timeout: Optional[float] = None) -> None:
        """Add entry (yields during I/O)."""

    def modify(self, entry: LDAPEntry, timeout: Optional[float] = None) -> None:
        """Modify entry (yields during I/O)."""

    def delete(
        self,
        dname: Union[str, LDAPDN],
        timeout: Optional[float] = None,
        recursive: bool = False,
    ) -> None:
        """Delete entry (yields during I/O)."""

    def rename(
        self,
        dn: Union[str, LDAPDN],
        newrdn: str,
        new_superior: Optional[Union[str, LDAPDN]] = None,
        delete_old_rdn: bool = True,
        timeout: Optional[float] = None,
    ) -> None:
        """Rename entry (yields during I/O)."""

    def modify_password(
        self,
        user: Optional[Union[str, LDAPDN]] = None,
        new_password: Optional[str] = None,
        old_password: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> str:
        """Modify password (yields during I/O)."""

    def open(self, timeout: Optional[float] = None) -> "GeventLDAPConnection":
        """Open connection (yields during I/O)."""

Integration with Tornado's event loop and Future-based async model.
from bonsai.tornado import TornadoLDAPConnection
from tornado.concurrent import Future
class TornadoLDAPConnection(LDAPConnection):
    """LDAP connection integrated with Tornado's event loop.

    The operations are plain methods whose declared return type is a
    ``Future`` wrapping the operation's result.
    """

    def __init__(self, client: LDAPClient, ioloop=None) -> None:
        """
        Initialize Tornado LDAP connection.

        Parameters:
        - client: LDAPClient configuration object
        - ioloop: Tornado IOLoop instance
        """

    def search(
        self,
        base: Optional[Union[str, LDAPDN]] = None,
        scope: Optional[Union[LDAPSearchScope, int]] = None,
        filter_exp: Optional[str] = None,
        attrlist: Optional[List[str]] = None,
        timeout: Optional[float] = None,
        sizelimit: int = 0,
        attrsonly: bool = False,
        sort_order: Optional[List[str]] = None,
        page_size: int = 0,
    ) -> Future[List[LDAPEntry]]:
        """
        Search LDAP directory returning Future.

        Parameters: Same as LDAPConnection.search()

        Returns:
        Future[List[LDAPEntry]] that resolves to search results
        """

    def add(self, entry: LDAPEntry, timeout: Optional[float] = None) -> Future[None]:
        """
        Add entry returning Future.

        Parameters:
        - entry: LDAPEntry object to add
        - timeout: Operation timeout in seconds

        Returns:
        Future[None] that resolves when operation completes
        """

    def modify(self, entry: LDAPEntry, timeout: Optional[float] = None) -> Future[None]:
        """Modify entry returning Future."""

    def delete(
        self,
        dname: Union[str, LDAPDN],
        timeout: Optional[float] = None,
        recursive: bool = False,
    ) -> Future[None]:
        """Delete entry returning Future."""

    def rename(
        self,
        dn: Union[str, LDAPDN],
        newrdn: str,
        new_superior: Optional[Union[str, LDAPDN]] = None,
        delete_old_rdn: bool = True,
        timeout: Optional[float] = None,
    ) -> Future[None]:
        """Rename entry returning Future."""

    def modify_password(
        self,
        user: Optional[Union[str, LDAPDN]] = None,
        new_password: Optional[str] = None,
        old_password: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> Future[str]:
        """
        Modify password returning Future.

        Returns:
        Future[str] that resolves to new password if server-generated
        """

    def open(self, timeout: Optional[float] = None) -> Future["TornadoLDAPConnection"]:
        """
        Open connection returning Future.

        Returns:
        Future[TornadoLDAPConnection] that resolves to self
        """

Integration with Trio's structured concurrency model.
from bonsai.trio import TrioLDAPConnection
class TrioLDAPConnection(LDAPConnection):
    """LDAP connection integrated with Trio.

    Every operation listed here is declared as a coroutine.
    """

    def __init__(self, client: LDAPClient) -> None:
        """
        Initialize Trio LDAP connection.

        Parameters:
        - client: LDAPClient configuration object
        """

    async def search(
        self,
        base: Optional[Union[str, LDAPDN]] = None,
        scope: Optional[Union[LDAPSearchScope, int]] = None,
        filter_exp: Optional[str] = None,
        attrlist: Optional[List[str]] = None,
        timeout: Optional[float] = None,
        sizelimit: int = 0,
        attrsonly: bool = False,
        sort_order: Optional[List[str]] = None,
        page_size: int = 0,
    ) -> List[LDAPEntry]:
        """
        Async search LDAP directory using Trio.

        Parameters: Same as LDAPConnection.search()

        Returns:
        List of LDAPEntry objects
        """

    async def add(self, entry: LDAPEntry, timeout: Optional[float] = None) -> None:
        """Async add entry using Trio."""

    async def modify(self, entry: LDAPEntry, timeout: Optional[float] = None) -> None:
        """Async modify entry using Trio."""

    async def delete(
        self,
        dname: Union[str, LDAPDN],
        timeout: Optional[float] = None,
        recursive: bool = False,
    ) -> None:
        """Async delete entry using Trio."""

    async def rename(
        self,
        dn: Union[str, LDAPDN],
        newrdn: str,
        new_superior: Optional[Union[str, LDAPDN]] = None,
        delete_old_rdn: bool = True,
        timeout: Optional[float] = None,
    ) -> None:
        """Async rename entry using Trio."""

    async def modify_password(
        self,
        user: Optional[Union[str, LDAPDN]] = None,
        new_password: Optional[str] = None,
        old_password: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> str:
        """Async modify password using Trio."""

    async def open(self, timeout: Optional[float] = None) -> "TrioLDAPConnection":
        """Async open connection using Trio."""

import asyncio
from bonsai import LDAPClient
from bonsai.asyncio import AIOLDAPConnection
async def async_ldap_operations():
    """Search for person entries, then add a new entry, over one asyncio connection."""
    from bonsai import LDAPEntry

    ldap_client = LDAPClient("ldap://localhost")
    ldap_client.set_credentials(
        "SIMPLE", user="cn=admin,dc=example,dc=com", password="secret"
    )

    # The connection is opened on entry and released on exit of the block.
    async with ldap_client.connect(is_async=True) as conn:
        # Scope 2 is SUBTREE.
        people = await conn.search("dc=example,dc=com", 2, "(objectClass=person)")
        print(f"Found {len(people)} entries")

        for person in people:
            print(f"Processing: {person.dn}")
            # Could perform other async operations here

        # Build the new entry attribute by attribute, then send it.
        fresh = LDAPEntry("cn=async-user,dc=example,dc=com")
        fresh['objectClass'] = ['person', 'organizationalPerson']
        fresh['cn'] = 'async-user'
        fresh['sn'] = 'User'
        await conn.add(fresh)
        print("Entry added successfully")
# Run async function
asyncio.run(async_ldap_operations())

from bonsai import LDAPClient
import gevent
from gevent import socket
def gevent_ldap_operations():
    """Search for person entries over a gevent-aware LDAP connection.

    The client must be told to use ``GeventLDAPConnection`` and must be
    connected with ``is_async=True``; a plain ``connect()`` returns the
    blocking synchronous connection, which does not yield to other greenlets.
    """
    from bonsai.gevent import GeventLDAPConnection

    # Configure client for gevent
    client = LDAPClient("ldap://localhost")
    client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")
    # Select the gevent connection class for async connections.
    client.set_async_connection_class(GeventLDAPConnection)

    # Connect using gevent connection
    with client.connect(is_async=True) as conn:
        # Operations yield to other greenlets during I/O
        results = conn.search("dc=example,dc=com", 2, "(objectClass=person)")
        print(f"Found {len(results)} entries")
def worker_greenlet(worker_id):
    """Run the gevent LDAP example once, logging start and finish for *worker_id*."""
    print(f"Worker {worker_id} starting")

    gevent_ldap_operations()

    print(f"Worker {worker_id} finished")
# Start five workers, one greenlet each.
greenlets = [gevent.spawn(worker_greenlet, i) for i in range(5)]

# Block until every worker has finished.
gevent.joinall(greenlets)

from tornado import gen, ioloop
from bonsai import LDAPClient
@gen.coroutine
def tornado_ldap_operations():
    """Search for person entries, then add a new entry, over a Tornado connection.

    The client must be told to use ``TornadoLDAPConnection``.
    ``connect(is_async=True)`` already opens the connection and, with that
    class selected, returns a Future that resolves to the opened connection,
    so it is yielded directly instead of calling ``open()`` on its result.
    """
    from bonsai import LDAPEntry
    from bonsai.tornado import TornadoLDAPConnection

    # Configure client for tornado
    client = LDAPClient("ldap://localhost")
    client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")
    client.set_async_connection_class(TornadoLDAPConnection)

    # Resolves to the opened TornadoLDAPConnection
    conn = yield client.connect(is_async=True)
    try:
        # All operations return Futures
        search_future = conn.search("dc=example,dc=com", 2, "(objectClass=person)")
        results = yield search_future
        print(f"Found {len(results)} entries")

        # Chain operations with Futures
        new_entry = LDAPEntry("cn=tornado-user,dc=example,dc=com")
        new_entry['objectClass'] = ['person']
        new_entry['cn'] = 'tornado-user'
        new_entry['sn'] = 'User'
        add_future = conn.add(new_entry)
        yield add_future
        print("Entry added successfully")
    finally:
        # Close the connection even if an operation failed.
        conn.close()
# Run with Tornado IOLoop
if __name__ == "__main__":
    ioloop.IOLoop.current().run_sync(tornado_ldap_operations)

import trio
from bonsai import LDAPClient
async def trio_ldap_operations():
    """Run a search and an add as two Trio tasks sharing one LDAP connection.

    The client must be told to use ``TrioLDAPConnection``.
    ``connect(is_async=True)`` already opens the connection, so it is used as
    an async context manager rather than calling ``open()`` a second time;
    the connection is closed when the block exits.
    """
    from bonsai.trio import TrioLDAPConnection

    # Configure client for trio
    client = LDAPClient("ldap://localhost")
    client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")
    client.set_async_connection_class(TrioLDAPConnection)

    async with client.connect(is_async=True) as conn:
        # Structured concurrency with nurseries: the nursery block does not
        # exit until both tasks have finished.
        # NOTE(review): both tasks use the same connection at once. Confirm
        # the Trio connection supports concurrent in-flight operations;
        # otherwise give each task its own connection.
        async with trio.open_nursery() as nursery:
            nursery.start_soon(perform_search, conn)
            nursery.start_soon(perform_add, conn)
async def perform_search(conn):
    """Search for person entries on *conn* and print how many were found."""
    # Scope 2 is SUBTREE.
    found = await conn.search("dc=example,dc=com", 2, "(objectClass=person)")
    total = len(found)
    print(f"Search found {total} entries")
async def perform_add(conn):
    """Create the ``cn=trio-user`` person entry and add it through *conn*."""
    from bonsai import LDAPEntry

    trio_user = LDAPEntry("cn=trio-user,dc=example,dc=com")
    trio_user['objectClass'] = ['person']
    trio_user['cn'] = 'trio-user'
    trio_user['sn'] = 'User'

    await conn.add(trio_user)
    print("Entry added successfully")
# Run with trio
trio.run(trio_ldap_operations)

import asyncio
from bonsai import LDAPClient
from bonsai.asyncio import AIOConnectionPool
async def pooled_operations():
    """Borrow a connection from an asyncio pool, run a search, and return it.

    The pool is opened before the first ``get()``. The borrowed connection is
    returned in a ``finally`` so that a failing search does not leave it
    checked out.
    """
    client = LDAPClient("ldap://localhost")
    client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")

    # Create connection pool
    pool = AIOConnectionPool(client, minconn=2, maxconn=10)
    try:
        # A newly created pool is closed; open it before borrowing.
        await pool.open()

        # Get connection from pool
        conn = await pool.get()
        try:
            # Perform operations
            results = await conn.search("dc=example,dc=com", 2, "(objectClass=person)")
            print(f"Found {len(results)} entries")
        finally:
            # Return connection to pool, even if the search failed
            await pool.put(conn)
    finally:
        # Close all connections in pool
        await pool.close()
asyncio.run(pooled_operations())

Install with Tessl CLI
npx tessl i tessl/pypi-bonsai