CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-bonsai

Python 3 module for accessing LDAP directory servers with async framework support and Active Directory integration.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

connection-pooling.mddocs/

Connection Pooling

Connection pool management for handling multiple concurrent LDAP connections with automatic lifecycle management, configurable pool sizes, and thread-safe operations.

Capabilities

Connection Pool Management

Generic connection pool for managing multiple LDAP connections with configurable minimum and maximum connections.

from bonsai.pool import ConnectionPool, PoolError, ClosedPool, EmptyPool

class ConnectionPool:
    def __init__(
        self,
        client: LDAPClient,
        minconn: int = 1,
        maxconn: int = 10,
        **kwargs
    ) -> None:
        """
        Initialize connection pool.
        
        Parameters:
        - client: LDAPClient for creating connections
        - minconn: Minimum connections to maintain (default: 1)
        - maxconn: Maximum connections allowed (default: 10)
        - **kwargs: Additional arguments passed to LDAPClient.connect()
        """

    def open(self) -> None:
        """
        Open connection pool and create minimum number of connections.
        
        Creates minconn connections and marks pool as open for use.
        """

    def close(self) -> None:
        """
        Close connection pool and all connections.
        
        Closes all idle and used connections, marks pool as closed.
        """

    def get(self) -> LDAPConnection:
        """
        Get connection from pool.
        
        Returns:
        Available LDAP connection from pool
        
        Raises:
        - ClosedPool: If pool is closed
        - EmptyPool: If no connections available and maxconn reached
        """

    def put(self, conn: LDAPConnection) -> None:
        """
        Return connection to pool.
        
        Parameters:
        - conn: LDAP connection to return to pool
        
        Note: Connection must have been obtained from this pool
        """

    def spawn(self, minconn: Optional[int] = None) -> LDAPConnection:
        """
        Create new connection and add to pool.
        
        Parameters:  
        - minconn: Minimum connections to maintain after spawn
        
        Returns:
        New LDAP connection from pool
        """

    @property
    def closed(self) -> bool:
        """Whether the pool is closed."""

    @property
    def idle_connection(self) -> int:
        """Number of idle connections in pool."""

    @property
    def shared_connection(self) -> int:
        """Number of connections currently in use."""

    @property
    def max_connection(self) -> int:
        """Maximum number of connections allowed."""

    @property
    def min_connection(self) -> int:
        """Minimum number of connections to maintain."""

    def __enter__(self) -> "ConnectionPool":
        """Context manager entry - opens pool."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit - closes pool."""

Pool Context Manager

Context manager for automatically managing connection lifecycle within a pool.

@contextmanager
def PooledConnection(pool: ConnectionPool) -> Generator[LDAPConnection, None, None]:
    """
    Context manager for getting and returning pooled connections.
    
    Parameters:
    - pool: ConnectionPool to get connection from
    
    Yields:
    LDAP connection from pool
    
    Usage:
    with PooledConnection(pool) as conn:
        # Use connection
        results = conn.search(...)
    # Connection automatically returned to pool
    """

Pool Exceptions

Specialized exceptions for connection pool error handling.

class PoolError(Exception):
    """Base exception for connection pool related errors."""

class ClosedPool(PoolError):
    """Raised when attempting operations on a closed pool."""

class EmptyPool(PoolError):
    """Raised when pool has no available connections and cannot create more."""

AsyncIO Connection Pool

Specialized connection pool for asyncio environments with async connection management.

from bonsai.asyncio import AIOConnectionPool

class AIOConnectionPool:
    def __init__(
        self,
        client: LDAPClient,
        minconn: int = 1,
        maxconn: int = 10,
        loop=None,
        **kwargs
    ) -> None:
        """
        Initialize asyncio connection pool.
        
        Parameters:
        - client: LDAPClient for creating connections
        - minconn: Minimum connections to maintain
        - maxconn: Maximum connections allowed  
        - loop: asyncio event loop
        - **kwargs: Additional connection arguments
        """

    async def open(self) -> None:
        """Async open connection pool."""

    async def close(self) -> None:
        """Async close connection pool and all connections."""

    async def get(self, timeout: Optional[float] = None) -> AIOLDAPConnection:
        """
        Async get connection from pool.
        
        Parameters:
        - timeout: Maximum time to wait for connection
        
        Returns:
        AsyncIO LDAP connection
        """

    async def put(self, conn: AIOLDAPConnection) -> None:
        """
        Async return connection to pool.
        
        Parameters:
        - conn: AsyncIO LDAP connection to return
        """

    async def __aenter__(self) -> "AIOConnectionPool":
        """Async context manager entry."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""

    @property
    def closed(self) -> bool:
        """Whether the pool is closed."""

    @property
    def idle_connection(self) -> int:
        """Number of idle connections."""

    @property
    def shared_connection(self) -> int:
        """Number of connections in use."""

Threaded Connection Pool

Thread-safe connection pool for multi-threaded applications with blocking behavior control.

from bonsai.pool import ThreadedConnectionPool

class ThreadedConnectionPool(ConnectionPool[LDAPConnection]):
    def __init__(
        self,
        client: LDAPClient,
        minconn: int = 1,
        maxconn: int = 10,
        block: bool = True,
        **kwargs
    ) -> None:
        """
        Initialize thread-safe connection pool.
        
        Parameters:
        - client: LDAPClient for creating connections
        - minconn: Minimum connections to maintain
        - maxconn: Maximum connections allowed
        - block: Whether to block when pool is empty (True) or raise EmptyPool (False)
        - **kwargs: Additional connection arguments
        """

    def get(self, timeout: Optional[float] = None) -> LDAPConnection:
        """
        Thread-safe get connection from pool.
        
        Parameters:
        - timeout: Maximum time to wait for connection (only if block=True)
        
        Returns:
        LDAP connection from pool
        
        Raises:
        - EmptyPool: If no connections available and block=False
        - TimeoutError: If timeout exceeded while waiting
        """

    def put(self, conn: LDAPConnection) -> None:
        """
        Thread-safe return connection to pool.
        
        Parameters:
        - conn: LDAP connection to return
        """

    @property
    def block(self) -> bool:
        """Whether get() blocks when pool is empty."""

Usage Examples

Basic Connection Pool Usage

from bonsai import LDAPClient
from bonsai.pool import ConnectionPool

# Configure client
client = LDAPClient("ldap://localhost")
client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")

# Create connection pool
pool = ConnectionPool(client, minconn=2, maxconn=10)

# Use pool with context manager
with pool:
    # Get connection from pool
    conn = pool.get()
    
    try:
        # Perform LDAP operations
        results = conn.search("dc=example,dc=com", 2, "(objectClass=person)")
        print(f"Found {len(results)} entries")
        
        # Add new entry
        from bonsai import LDAPEntry
        new_entry = LDAPEntry("cn=pooled-user,dc=example,dc=com")
        new_entry['objectClass'] = ['person']
        new_entry['cn'] = 'pooled-user'
        new_entry['sn'] = 'User'
        conn.add(new_entry)
        
    finally:
        # Return connection to pool
        pool.put(conn)

# Pool automatically closes when exiting context

Thread-Safe Pool Operations

import threading
from bonsai import LDAPClient
from bonsai.pool import ConnectionPool

client = LDAPClient("ldap://localhost")
client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")

# Create shared pool for multiple threads
pool = ConnectionPool(client, minconn=5, maxconn=20)
pool.open()

def worker_thread(thread_id, pool):
    """Worker function for thread-safe LDAP operations."""
    try:
        # Get connection from pool (thread-safe)
        conn = pool.get()
        
        try:
            # Perform operations unique to this thread
            filter_exp = f"(description=thread-{thread_id})"
            results = conn.search("dc=example,dc=com", 2, filter_exp)
            print(f"Thread {thread_id}: Found {len(results)} entries")
            
            # Each thread can perform independent operations
            if not results:
                # Create test entry for this thread
                entry = LDAPEntry(f"cn=thread-{thread_id},dc=example,dc=com")
                entry['objectClass'] = ['person', 'organizationalPerson']
                entry['cn'] = f'thread-{thread_id}'
                entry['sn'] = f'User{thread_id}'
                entry['description'] = f'thread-{thread_id}'
                conn.add(entry)
                print(f"Thread {thread_id}: Created test entry")
                
        finally:
            # Always return connection to pool
            pool.put(conn)
            
    except Exception as e:
        print(f"Thread {thread_id} error: {e}")

# Start multiple worker threads
threads = []
for i in range(10):
    thread = threading.Thread(target=worker_thread, args=(i, pool))
    threads.append(thread)
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()

# Clean up
pool.close()
print("All threads completed, pool closed")

Pooled Connection Context Manager

from bonsai.pool import ConnectionPool, PooledConnection

client = LDAPClient("ldap://localhost")
client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")

# Create and open pool
pool = ConnectionPool(client, minconn=3, maxconn=15)
pool.open()

try:
    # Use pooled connection context manager
    with PooledConnection(pool) as conn:
        # Connection automatically obtained from pool
        results = conn.search("dc=example,dc=com", 2, "(objectClass=organizationalUnit)")
        
        for entry in results:
            print(f"OU: {entry.dn}")
            
        # Create new organizational unit
        new_ou = LDAPEntry("ou=departments,dc=example,dc=com")
        new_ou['objectClass'] = ['organizationalUnit']
        new_ou['ou'] = 'departments'
        new_ou['description'] = 'Department organizational units'
        conn.add(new_ou)
        
    # Connection automatically returned to pool when exiting context
    
    # Use another pooled connection
    with PooledConnection(pool) as conn:
        # This might reuse the previous connection or get a different one
        results = conn.search("ou=departments,dc=example,dc=com", 1)
        print(f"Found {len(results)} departments")
        
finally:
    pool.close()

AsyncIO Pool Usage

import asyncio
from bonsai import LDAPClient
from bonsai.asyncio import AIOConnectionPool

async def async_pool_operations():
    client = LDAPClient("ldap://localhost")
    client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")
    
    # Create async connection pool
    async with AIOConnectionPool(client, minconn=2, maxconn=8) as pool:
        # Get connection from pool
        conn = await pool.get(timeout=5.0)
        
        try:
            # Async LDAP operations
            results = await conn.search("dc=example,dc=com", 2, "(objectClass=person)")
            print(f"Found {len(results)} person entries")
            
            # Concurrent operations using multiple connections
            tasks = []
            for i in range(3):
                task = asyncio.create_task(search_with_pool(pool, f"(cn=user{i})"))
                tasks.append(task)
            
            # Wait for all searches to complete
            search_results = await asyncio.gather(*tasks)
            for i, results in enumerate(search_results):
                print(f"Search {i}: {len(results)} results")
                
        finally:
            # Return connection to pool
            await pool.put(conn)

async def search_with_pool(pool, filter_exp):
    """Helper function for concurrent searches."""
    conn = await pool.get()
    try:
        results = await conn.search("dc=example,dc=com", 2, filter_exp)
        return results
    finally:
        await pool.put(conn)

# Run async operations
asyncio.run(async_pool_operations())

Pool Monitoring and Management

from bonsai import LDAPClient
from bonsai.pool import ConnectionPool, EmptyPool, ClosedPool
import time

client = LDAPClient("ldap://localhost")
client.set_credentials("SIMPLE", user="cn=admin,dc=example,dc=com", password="secret")

# Create pool with specific sizing
pool = ConnectionPool(client, minconn=2, maxconn=5)
pool.open()

print(f"Pool opened with {pool.min_connection} min, {pool.max_connection} max connections")
print(f"Initial state: {pool.idle_connection} idle, {pool.shared_connection} in use")

# Get multiple connections to test limits
connections = []
try:
    for i in range(7):  # Try to get more than maxconn
        try:
            conn = pool.get()
            connections.append(conn)
            print(f"Got connection {i+1}: {pool.idle_connection} idle, {pool.shared_connection} in use")
            
        except EmptyPool:
            print(f"Pool empty after {i} connections - hit maximum limit")
            break
            
    # Return some connections
    for i in range(2):
        if connections:
            pool.put(connections.pop())
            print(f"Returned connection: {pool.idle_connection} idle, {pool.shared_connection} in use")
    
    # Try operations on closed pool
    pool.close()
    print("Pool closed")
    
    try:
        pool.get()
    except ClosedPool:
        print("Cannot get connection from closed pool")
        
finally:
    # Clean up any remaining connections
    for conn in connections:
        try:
            conn.close()
        except:
            pass

Install with Tessl CLI

npx tessl i tessl/pypi-bonsai

docs

active-directory.md

async-frameworks.md

connection-pooling.md

core-ldap.md

index.md

ldif-support.md

utilities-errors.md

tile.json