CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ldap3

A strictly RFC 4510 conforming LDAP V3 pure Python client library

81

1.08x
Overview
Eval results
Files

config.mddocs/

Configuration and Utilities

Configuration parameters, constants, exception handling, utility functions, and protocol information classes.

Capabilities

Configuration Management

Global configuration parameters that control LDAP3 library behavior.

def get_config_parameter(parameter):
    """
    Get current value of configuration parameter.

    Args:
        parameter (str): Parameter name

    Returns:
        Configuration parameter value

    Raises:
        LDAPConfigurationParameterError: If parameter name is invalid
    """

def set_config_parameter(parameter, value):
    """
    Set configuration parameter value.

    Args:
        parameter (str): Parameter name
        value: New parameter value

    Raises:
        LDAPConfigurationParameterError: If parameter name is invalid
    """

Available Configuration Parameters:

  • CASE_INSENSITIVE_ATTRIBUTE_NAMES (bool): Case insensitive attribute names (default: True)
  • CASE_INSENSITIVE_SCHEMA_NAMES (bool): Case insensitive schema names (default: True)
  • ABSTRACTION_OPERATIONAL_ATTRIBUTE_PREFIX (str): Operational attribute prefix (default: 'OPER_')
  • POOLING_LOOP_TIMEOUT (int): Pool loop timeout in seconds (default: 10)
  • RESPONSE_SLEEPTIME (float): Response sleep time in seconds (default: 0.05)
  • RESPONSE_WAITING_TIMEOUT (int): Response waiting timeout in seconds (default: 20)
  • SOCKET_SIZE (int): Socket buffer size in bytes (default: 4096)
  • CHECK_AVAILABILITY_TIMEOUT (float): Availability check timeout in seconds (default: 2.5)
  • RESTARTABLE_SLEEPTIME (int): Restartable sleep time in seconds (default: 2)
  • RESTARTABLE_TRIES (int): Number of restartable tries (default: 30)
  • REUSABLE_THREADED_POOL_SIZE (int): Reusable thread pool size (default: 10)
  • REUSABLE_THREADED_LIFETIME (int): Reusable thread lifetime in seconds (default: 3600)
  • DEFAULT_THREADED_POOL_NAME (str): Default thread pool name (default: 'reusable_default_pool')

Protocol Information Classes

Classes for handling LDAP server and schema information.

class DsaInfo:
    def __init__(self, attributes, raw_attributes):
        """
        Directory Service Agent information from server's DSE.

        Args:
            attributes (dict): Processed DSE attributes
            raw_attributes (dict): Raw DSE attributes from server
        """

    @staticmethod
    def from_json(json_info):
        """
        Create DsaInfo from JSON data.

        Args:
            json_info (str or dict): JSON string or dictionary

        Returns:
            DsaInfo: DSA information object
        """

    @staticmethod
    def from_file(json_file):
        """
        Create DsaInfo from JSON file.

        Args:
            json_file (str): Path to JSON file

        Returns:
            DsaInfo: DSA information object
        """

    def to_json(self):
        """
        Convert DSA info to JSON string.

        Returns:
            str: JSON representation
        """

    def to_file(self, json_file):
        """
        Save DSA info to JSON file.

        Args:
            json_file (str): Target file path
        """

DsaInfo Properties:

  • alt_servers: Alternative servers list
  • naming_contexts: Naming contexts list
  • supported_controls: Supported LDAP controls
  • supported_extensions: Supported extended operations
  • supported_features: Supported LDAP features
  • supported_ldap_versions: Supported LDAP versions
  • supported_sasl_mechanisms: Supported SASL mechanisms
  • vendor_name: LDAP server vendor name
  • vendor_version: LDAP server version
  • schema_entry: Schema entry DN
  • other: Other DSE attributes
class SchemaInfo:
    def __init__(self, schema_dn, attributes, raw_attributes):
        """
        LDAP schema information from server.

        Args:
            schema_dn (str): Schema entry DN
            attributes (dict): Processed schema attributes
            raw_attributes (dict): Raw schema attributes from server
        """

    @staticmethod
    def from_json(json_schema):
        """
        Create SchemaInfo from JSON data.

        Args:
            json_schema (str or dict): JSON string or dictionary

        Returns:
            SchemaInfo: Schema information object
        """

    @staticmethod
    def from_file(json_file):
        """
        Create SchemaInfo from JSON file.

        Args:
            json_file (str): Path to JSON file

        Returns:
            SchemaInfo: Schema information object
        """

    def to_json(self):
        """
        Convert schema info to JSON string.

        Returns:
            str: JSON representation
        """

    def to_file(self, json_file):
        """
        Save schema info to JSON file.

        Args:
            json_file (str): Target file path
        """

SchemaInfo Properties:

  • schema_dn: Schema entry distinguished name
  • attribute_types: LDAP attribute type definitions
  • object_classes: LDAP object class definitions
  • ldap_syntaxes: LDAP syntax definitions
  • matching_rules: LDAP matching rule definitions
  • matching_rule_uses: Matching rule usage definitions
  • dit_content_rules: DIT content rule definitions
  • dit_structure_rules: DIT structure rule definitions
  • name_forms: Name form definitions
  • other: Other schema attributes

Exception Hierarchy

Comprehensive exception classes for different types of LDAP errors.

# Base exceptions
class LDAPException(Exception):
    """Base class for all LDAP exceptions."""

class LDAPOperationResult(LDAPException):
    """Base class for LDAP operation result exceptions."""

# Configuration exceptions
class LDAPConfigurationError(LDAPException):
    """LDAP configuration error."""

class LDAPDefinitionError(LDAPException):
    """LDAP definition error."""

class LDAPConfigurationParameterError(LDAPException):
    """Invalid configuration parameter error."""

class LDAPUnknownStrategyError(LDAPException):
    """Unknown client strategy error."""

class LDAPUnknownAuthenticationMethodError(LDAPException):
    """Unknown authentication method error."""

# Connection exceptions
class LDAPBindError(LDAPException):
    """LDAP bind operation error."""

class LDAPInvalidServerError(LDAPException):
    """Invalid LDAP server specification error."""

class LDAPConnectionIsReadOnlyError(LDAPException):
    """Connection is read-only error."""

class LDAPCommunicationError(LDAPException):
    """Base LDAP communication error."""

class LDAPSocketOpenError(LDAPCommunicationError):
    """Socket open error."""

class LDAPSocketCloseError(LDAPCommunicationError):
    """Socket close error."""

class LDAPSocketReceiveError(LDAPCommunicationError):
    """Socket receive error."""

class LDAPSocketSendError(LDAPCommunicationError):
    """Socket send error."""

# SSL/TLS exceptions
class LDAPSSLConfigurationError(LDAPException):
    """SSL/TLS configuration error."""

class LDAPSSLNotSupportedError(LDAPException):
    """SSL not supported error."""

class LDAPStartTLSError(LDAPException):
    """Start TLS operation error."""

class LDAPCertificateError(LDAPException):
    """Certificate validation error."""

# Operation exceptions
class LDAPInvalidFilterError(LDAPException):
    """Invalid LDAP search filter error."""

class LDAPInvalidScopeError(LDAPException):
    """Invalid search scope error."""

class LDAPControlsError(LDAPException):
    """LDAP controls error."""

class LDAPExtensionError(LDAPException):
    """LDAP extended operation error."""

class LDAPResponseTimeoutError(LDAPException):
    """Response timeout error."""

class LDAPTransactionError(LDAPException):
    """LDAP transaction error."""

# Abstract layer exceptions
class LDAPAttributeError(LDAPException, KeyError, AttributeError):
    """LDAP attribute error."""

class LDAPEntryError(LDAPException):
    """LDAP entry error."""

class LDAPReaderError(LDAPException):
    """LDAP reader error."""

class LDAPObjectError(LDAPException):
    """LDAP object definition error."""

class LDAPTypeError(LDAPException):
    """LDAP type error."""

# Result code exceptions (selected examples)
class LDAPInvalidCredentialsResult(LDAPOperationResult):
    """Invalid credentials result (49)."""

class LDAPInsufficientAccessRightsResult(LDAPOperationResult):
    """Insufficient access rights result (50)."""

class LDAPTimeLimitExceededResult(LDAPOperationResult):
    """Time limit exceeded result (3)."""

class LDAPSizeLimitExceededResult(LDAPOperationResult):
    """Size limit exceeded result (4)."""

Constants and Enumerations

Important constants used throughout the library.

# Result codes (selected)
RESULT_SUCCESS = 0
RESULT_OPERATIONS_ERROR = 1
RESULT_PROTOCOL_ERROR = 2
RESULT_TIME_LIMIT_EXCEEDED = 3
RESULT_SIZE_LIMIT_EXCEEDED = 4
RESULT_COMPARE_FALSE = 5
RESULT_COMPARE_TRUE = 6
RESULT_INVALID_CREDENTIALS = 49
RESULT_INSUFFICIENT_ACCESS_RIGHTS = 50

# Authentication methods
ANONYMOUS = 'ANONYMOUS'
SIMPLE = 'SIMPLE'
SASL = 'SASL'
NTLM = 'NTLM'

# Client strategies
SYNC = 'SYNC'
ASYNC = 'ASYNC'
LDIF = 'LDIF'
RESTARTABLE = 'RESTARTABLE'
REUSABLE = 'REUSABLE'
MOCK_SYNC = 'MOCK_SYNC'
MOCK_ASYNC = 'MOCK_ASYNC'

# Search scopes
BASE = 'BASE'
LEVEL = 'LEVEL'
SUBTREE = 'SUBTREE'

# Search attributes
ALL_ATTRIBUTES = '*'
NO_ATTRIBUTES = '1.1'
ALL_OPERATIONAL_ATTRIBUTES = '+'

# Modify operations
MODIFY_ADD = 'MODIFY_ADD'
MODIFY_DELETE = 'MODIFY_DELETE'
MODIFY_REPLACE = 'MODIFY_REPLACE'
MODIFY_INCREMENT = 'MODIFY_INCREMENT'

# Hash algorithms
HASHED_NONE = 'PLAIN'
HASHED_SHA = 'SHA'
HASHED_SHA256 = 'SHA256'
HASHED_MD5 = 'MD5'
HASHED_SALTED_SHA = 'SALTED_SHA'
HASHED_SALTED_MD5 = 'SALTED_MD5'

Usage Examples

Configuration Management

import ldap3

# Get current configuration values
case_insensitive = ldap3.get_config_parameter('CASE_INSENSITIVE_ATTRIBUTE_NAMES')
response_timeout = ldap3.get_config_parameter('RESPONSE_WAITING_TIMEOUT')

print(f"Case insensitive attributes: {case_insensitive}")
print(f"Response timeout: {response_timeout} seconds")

# Modify configuration
ldap3.set_config_parameter('RESPONSE_WAITING_TIMEOUT', 30)
ldap3.set_config_parameter('SOCKET_SIZE', 8192)

# Configuration affects all subsequent connections
server = ldap3.Server('ldap://ldap.example.com')
conn = ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password')
# This connection will use the modified timeout and socket size

Working with Server Information

# Get server information during connection
server = ldap3.Server('ldap://ldap.example.com', get_info=ldap3.ALL)
conn = ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password', auto_bind=True)

# Access DSA information
dsa_info = server.info
print(f"Server vendor: {dsa_info.vendor_name}")
print(f"Server version: {dsa_info.vendor_version}")
print(f"Supported LDAP versions: {dsa_info.supported_ldap_versions}")
print(f"Supported SASL mechanisms: {dsa_info.supported_sasl_mechanisms}")
print(f"Naming contexts: {dsa_info.naming_contexts}")

# Save server info to file
dsa_info.to_file('/tmp/server_info.json')

# Load server info from file later
loaded_dsa = ldap3.DsaInfo.from_file('/tmp/server_info.json')

Working with Schema Information

# Get schema information
server = ldap3.Server('ldap://ldap.example.com', get_info=ldap3.ALL)
conn = ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password', auto_bind=True)

schema = server.schema
if schema:
    print(f"Schema DN: {schema.schema_dn}")
    
    # Explore object classes
    print("Available object classes:")
    for oc_name, oc_def in schema.object_classes.items():
        print(f"  {oc_name}: {oc_def.description}")
        
    # Explore attribute types
    print("Available attribute types:")
    for attr_name, attr_def in schema.attribute_types.items():
        print(f"  {attr_name}: {attr_def.description}")
        
    # Save schema to file
    schema.to_file('/tmp/schema_info.json')
    
    # Check specific object class
    if 'inetOrgPerson' in schema.object_classes:
        inet_org_person = schema.object_classes['inetOrgPerson']
        print(f"inetOrgPerson must attributes: {inet_org_person.must_contain}")
        print(f"inetOrgPerson may attributes: {inet_org_person.may_contain}")

Exception Handling

import ldap3

try:
    server = ldap3.Server('ldap://invalid.server.com')
    conn = ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password')
    conn.bind()
    
    # Perform operations
    conn.search('dc=example,dc=com', '(objectClass=person)')
    
except ldap3.LDAPInvalidServerError as e:
    print(f"Invalid server: {e}")
    
except ldap3.LDAPBindError as e:
    print(f"Bind failed: {e}")
    
except ldap3.LDAPInvalidCredentialsResult as e:
    print(f"Invalid credentials: {e}")
    
except ldap3.LDAPSocketOpenError as e:
    print(f"Cannot connect to server: {e}")
    
except ldap3.LDAPCommunicationError as e:
    print(f"Communication error: {e}")
    
except ldap3.LDAPInvalidFilterError as e:
    print(f"Invalid search filter: {e}")
    
except ldap3.LDAPException as e:
    print(f"General LDAP error: {e}")

Advanced Configuration for Performance

# Optimize for high-performance scenarios
ldap3.set_config_parameter('SOCKET_SIZE', 16384)          # Larger socket buffer
ldap3.set_config_parameter('RESPONSE_WAITING_TIMEOUT', 60) # Longer timeout
ldap3.set_config_parameter('RESPONSE_SLEEPTIME', 0.01)     # Faster polling

# Configure thread pool for reusable connections
ldap3.set_config_parameter('REUSABLE_THREADED_POOL_SIZE', 20)
ldap3.set_config_parameter('REUSABLE_THREADED_LIFETIME', 7200)  # 2 hours

# Create high-performance connection pool
servers = [
    ldap3.Server('ldap1.example.com'),
    ldap3.Server('ldap2.example.com')
]
server_pool = ldap3.ServerPool(servers, ldap3.ROUND_ROBIN, active=True)

conn = ldap3.Connection(
    server_pool,
    'cn=user,dc=example,dc=com',
    'password',
    client_strategy=ldap3.REUSABLE,
    pool_name='high_perf_pool',
    pool_size=10,
    pool_lifetime=3600,
    auto_bind=True,
    fast_decoder=True
)

Custom Exception Handling

def safe_ldap_operation(connection, operation_func, *args, **kwargs):
    """
    Safely execute LDAP operation with comprehensive error handling.
    """
    try:
        return operation_func(*args, **kwargs)
        
    except ldap3.LDAPInvalidCredentialsResult:
        print("Authentication failed - check username/password")
        return None
        
    except ldap3.LDAPInsufficientAccessRightsResult:
        print("Insufficient permissions for operation")
        return None
        
    except ldap3.LDAPTimeLimitExceededResult:
        print("Operation timed out - try with smaller scope")
        return None
        
    except ldap3.LDAPSizeLimitExceededResult:
        print("Too many results - use paged search")
        return None
        
    except ldap3.LDAPSocketOpenError:
        print("Cannot connect to LDAP server")
        return None
        
    except ldap3.LDAPCommunicationError as e:
        print(f"Network communication error: {e}")
        return None
        
    except ldap3.LDAPOperationResult as e:
        print(f"LDAP operation failed with result {e.result}: {e.description}")
        return None
        
    except ldap3.LDAPException as e:
        print(f"Unexpected LDAP error: {e}")
        return None

# Usage
result = safe_ldap_operation(
    conn, 
    conn.search, 
    'dc=example,dc=com', 
    '(objectClass=person)',
    attributes=['cn', 'mail']
)

Offline Schema Usage

# Use predefined offline schemas for testing
offline_schemas = {
    ldap3.OFFLINE_AD_2012_R2: "Active Directory 2012 R2",
    ldap3.OFFLINE_EDIR_8_8_8: "Novell eDirectory 8.8.8", 
    ldap3.OFFLINE_SLAPD_2_4: "OpenLDAP slapd 2.4",
    ldap3.OFFLINE_DS389_1_3_3: "389 Directory Server 1.3.3"
}

# Create server with offline schema
server = ldap3.Server('ldap://test.example.com', get_info=ldap3.OFFLINE_AD_2012_R2)

# Schema information is available without connecting
print(f"Using offline schema: {offline_schemas[ldap3.OFFLINE_AD_2012_R2]}")
print(f"Object classes available: {len(server.schema.object_classes)}")
print(f"Attribute types available: {len(server.schema.attribute_types)}")

Configuration File Management

import json

def save_ldap_config():
    """Save current LDAP3 configuration to file."""
    config = {}
    
    config_params = [
        'CASE_INSENSITIVE_ATTRIBUTE_NAMES',
        'CASE_INSENSITIVE_SCHEMA_NAMES', 
        'RESPONSE_WAITING_TIMEOUT',
        'SOCKET_SIZE',
        'RESTARTABLE_TRIES',
        'REUSABLE_THREADED_POOL_SIZE'
    ]
    
    for param in config_params:
        try:
            config[param] = ldap3.get_config_parameter(param)
        except ldap3.LDAPConfigurationParameterError:
            pass
            
    with open('/tmp/ldap3_config.json', 'w') as f:
        json.dump(config, f, indent=2)

def load_ldap_config():
    """Load LDAP3 configuration from file."""
    try:
        with open('/tmp/ldap3_config.json', 'r') as f:
            config = json.load(f)
            
        for param, value in config.items():
            try:
                ldap3.set_config_parameter(param, value)
                print(f"Set {param} = {value}")
            except ldap3.LDAPConfigurationParameterError as e:
                print(f"Cannot set {param}: {e}")
                
    except FileNotFoundError:
        print("Configuration file not found")

# Usage
save_ldap_config()
# ... modify configuration in another session
load_ldap_config()

Install with Tessl CLI

npx tessl i tessl/pypi-ldap3

docs

abstract.md

config.md

connection.md

extended.md

index.md

operations.md

tile.json