A strictly RFC 4510 conforming LDAP V3 pure Python client library
81
Configuration parameters, constants, exception handling, utility functions, and protocol information classes.
Global configuration parameters that control LDAP3 library behavior.
def get_config_parameter(parameter):
"""
Get current value of configuration parameter.
Args:
parameter (str): Parameter name
Returns:
Configuration parameter value
Raises:
LDAPConfigurationParameterError: If parameter name is invalid
"""
def set_config_parameter(parameter, value):
"""
Set configuration parameter value.
Args:
parameter (str): Parameter name
value: New parameter value
Raises:
LDAPConfigurationParameterError: If parameter name is invalid
"""Available Configuration Parameters:
CASE_INSENSITIVE_ATTRIBUTE_NAMES (bool): Case insensitive attribute names (default: True)CASE_INSENSITIVE_SCHEMA_NAMES (bool): Case insensitive schema names (default: True)ABSTRACTION_OPERATIONAL_ATTRIBUTE_PREFIX (str): Operational attribute prefix (default: 'OPER_')POOLING_LOOP_TIMEOUT (int): Pool loop timeout in seconds (default: 10)RESPONSE_SLEEPTIME (float): Response sleep time in seconds (default: 0.05)RESPONSE_WAITING_TIMEOUT (int): Response waiting timeout in seconds (default: 20)SOCKET_SIZE (int): Socket buffer size in bytes (default: 4096)CHECK_AVAILABILITY_TIMEOUT (float): Availability check timeout in seconds (default: 2.5)RESTARTABLE_SLEEPTIME (int): Restartable sleep time in seconds (default: 2)RESTARTABLE_TRIES (int): Number of restartable tries (default: 30)REUSABLE_THREADED_POOL_SIZE (int): Reusable thread pool size (default: 10)REUSABLE_THREADED_LIFETIME (int): Reusable thread lifetime in seconds (default: 3600)DEFAULT_THREADED_POOL_NAME (str): Default thread pool name (default: 'reusable_default_pool')Classes for handling LDAP server and schema information.
class DsaInfo:
def __init__(self, attributes, raw_attributes):
"""
Directory Service Agent information from server's DSE.
Args:
attributes (dict): Processed DSE attributes
raw_attributes (dict): Raw DSE attributes from server
"""
@staticmethod
def from_json(json_info):
"""
Create DsaInfo from JSON data.
Args:
json_info (str or dict): JSON string or dictionary
Returns:
DsaInfo: DSA information object
"""
@staticmethod
def from_file(json_file):
"""
Create DsaInfo from JSON file.
Args:
json_file (str): Path to JSON file
Returns:
DsaInfo: DSA information object
"""
def to_json(self):
"""
Convert DSA info to JSON string.
Returns:
str: JSON representation
"""
def to_file(self, json_file):
"""
Save DSA info to JSON file.
Args:
json_file (str): Target file path
"""DsaInfo Properties:
alt_servers: Alternative servers listnaming_contexts: Naming contexts listsupported_controls: Supported LDAP controlssupported_extensions: Supported extended operationssupported_features: Supported LDAP featuressupported_ldap_versions: Supported LDAP versionssupported_sasl_mechanisms: Supported SASL mechanismsvendor_name: LDAP server vendor namevendor_version: LDAP server versionschema_entry: Schema entry DNother: Other DSE attributesclass SchemaInfo:
def __init__(self, schema_dn, attributes, raw_attributes):
"""
LDAP schema information from server.
Args:
schema_dn (str): Schema entry DN
attributes (dict): Processed schema attributes
raw_attributes (dict): Raw schema attributes from server
"""
@staticmethod
def from_json(json_schema):
"""
Create SchemaInfo from JSON data.
Args:
json_schema (str or dict): JSON string or dictionary
Returns:
SchemaInfo: Schema information object
"""
@staticmethod
def from_file(json_file):
"""
Create SchemaInfo from JSON file.
Args:
json_file (str): Path to JSON file
Returns:
SchemaInfo: Schema information object
"""
def to_json(self):
"""
Convert schema info to JSON string.
Returns:
str: JSON representation
"""
def to_file(self, json_file):
"""
Save schema info to JSON file.
Args:
json_file (str): Target file path
"""SchemaInfo Properties:
schema_dn: Schema entry distinguished nameattribute_types: LDAP attribute type definitionsobject_classes: LDAP object class definitionsldap_syntaxes: LDAP syntax definitionsmatching_rules: LDAP matching rule definitionsmatching_rule_uses: Matching rule usage definitionsdit_content_rules: DIT content rule definitionsdit_structure_rules: DIT structure rule definitionsname_forms: Name form definitionsother: Other schema attributesComprehensive exception classes for different types of LDAP errors.
# Base exceptions
class LDAPException(Exception):
"""Base class for all LDAP exceptions."""
class LDAPOperationResult(LDAPException):
"""Base class for LDAP operation result exceptions."""
# Configuration exceptions
class LDAPConfigurationError(LDAPException):
"""LDAP configuration error."""
class LDAPDefinitionError(LDAPException):
"""LDAP definition error."""
class LDAPConfigurationParameterError(LDAPException):
"""Invalid configuration parameter error."""
class LDAPUnknownStrategyError(LDAPException):
"""Unknown client strategy error."""
class LDAPUnknownAuthenticationMethodError(LDAPException):
"""Unknown authentication method error."""
# Connection exceptions
class LDAPBindError(LDAPException):
"""LDAP bind operation error."""
class LDAPInvalidServerError(LDAPException):
"""Invalid LDAP server specification error."""
class LDAPConnectionIsReadOnlyError(LDAPException):
"""Connection is read-only error."""
class LDAPCommunicationError(LDAPException):
"""Base LDAP communication error."""
class LDAPSocketOpenError(LDAPCommunicationError):
"""Socket open error."""
class LDAPSocketCloseError(LDAPCommunicationError):
"""Socket close error."""
class LDAPSocketReceiveError(LDAPCommunicationError):
"""Socket receive error."""
class LDAPSocketSendError(LDAPCommunicationError):
"""Socket send error."""
# SSL/TLS exceptions
class LDAPSSLConfigurationError(LDAPException):
"""SSL/TLS configuration error."""
class LDAPSSLNotSupportedError(LDAPException):
"""SSL not supported error."""
class LDAPStartTLSError(LDAPException):
"""Start TLS operation error."""
class LDAPCertificateError(LDAPException):
"""Certificate validation error."""
# Operation exceptions
class LDAPInvalidFilterError(LDAPException):
"""Invalid LDAP search filter error."""
class LDAPInvalidScopeError(LDAPException):
"""Invalid search scope error."""
class LDAPControlsError(LDAPException):
"""LDAP controls error."""
class LDAPExtensionError(LDAPException):
"""LDAP extended operation error."""
class LDAPResponseTimeoutError(LDAPException):
"""Response timeout error."""
class LDAPTransactionError(LDAPException):
"""LDAP transaction error."""
# Abstract layer exceptions
class LDAPAttributeError(LDAPException, KeyError, AttributeError):
"""LDAP attribute error."""
class LDAPEntryError(LDAPException):
"""LDAP entry error."""
class LDAPReaderError(LDAPException):
"""LDAP reader error."""
class LDAPObjectError(LDAPException):
"""LDAP object definition error."""
class LDAPTypeError(LDAPException):
"""LDAP type error."""
# Result code exceptions (selected examples)
class LDAPInvalidCredentialsResult(LDAPOperationResult):
"""Invalid credentials result (49)."""
class LDAPInsufficientAccessRightsResult(LDAPOperationResult):
"""Insufficient access rights result (50)."""
class LDAPTimeLimitExceededResult(LDAPOperationResult):
"""Time limit exceeded result (3)."""
class LDAPSizeLimitExceededResult(LDAPOperationResult):
"""Size limit exceeded result (4)."""Important constants used throughout the library.
# Result codes (selected)
RESULT_SUCCESS = 0
RESULT_OPERATIONS_ERROR = 1
RESULT_PROTOCOL_ERROR = 2
RESULT_TIME_LIMIT_EXCEEDED = 3
RESULT_SIZE_LIMIT_EXCEEDED = 4
RESULT_COMPARE_FALSE = 5
RESULT_COMPARE_TRUE = 6
RESULT_INVALID_CREDENTIALS = 49
RESULT_INSUFFICIENT_ACCESS_RIGHTS = 50
# Authentication methods
ANONYMOUS = 'ANONYMOUS'
SIMPLE = 'SIMPLE'
SASL = 'SASL'
NTLM = 'NTLM'
# Client strategies
SYNC = 'SYNC'
ASYNC = 'ASYNC'
LDIF = 'LDIF'
RESTARTABLE = 'RESTARTABLE'
REUSABLE = 'REUSABLE'
MOCK_SYNC = 'MOCK_SYNC'
MOCK_ASYNC = 'MOCK_ASYNC'
# Search scopes
BASE = 'BASE'
LEVEL = 'LEVEL'
SUBTREE = 'SUBTREE'
# Search attributes
ALL_ATTRIBUTES = '*'
NO_ATTRIBUTES = '1.1'
ALL_OPERATIONAL_ATTRIBUTES = '+'
# Modify operations
MODIFY_ADD = 'MODIFY_ADD'
MODIFY_DELETE = 'MODIFY_DELETE'
MODIFY_REPLACE = 'MODIFY_REPLACE'
MODIFY_INCREMENT = 'MODIFY_INCREMENT'
# Hash algorithms
HASHED_NONE = 'PLAIN'
HASHED_SHA = 'SHA'
HASHED_SHA256 = 'SHA256'
HASHED_MD5 = 'MD5'
HASHED_SALTED_SHA = 'SALTED_SHA'
HASHED_SALTED_MD5 = 'SALTED_MD5'import ldap3
# Get current configuration values
case_insensitive = ldap3.get_config_parameter('CASE_INSENSITIVE_ATTRIBUTE_NAMES')
response_timeout = ldap3.get_config_parameter('RESPONSE_WAITING_TIMEOUT')
print(f"Case insensitive attributes: {case_insensitive}")
print(f"Response timeout: {response_timeout} seconds")
# Modify configuration
ldap3.set_config_parameter('RESPONSE_WAITING_TIMEOUT', 30)
ldap3.set_config_parameter('SOCKET_SIZE', 8192)
# Configuration affects all subsequent connections
server = ldap3.Server('ldap://ldap.example.com')
conn = ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password')
# This connection will use the modified timeout and socket size# Get server information during connection
server = ldap3.Server('ldap://ldap.example.com', get_info=ldap3.ALL)
conn = ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password', auto_bind=True)
# Access DSA information
dsa_info = server.info
print(f"Server vendor: {dsa_info.vendor_name}")
print(f"Server version: {dsa_info.vendor_version}")
print(f"Supported LDAP versions: {dsa_info.supported_ldap_versions}")
print(f"Supported SASL mechanisms: {dsa_info.supported_sasl_mechanisms}")
print(f"Naming contexts: {dsa_info.naming_contexts}")
# Save server info to file
dsa_info.to_file('/tmp/server_info.json')
# Load server info from file later
loaded_dsa = ldap3.DsaInfo.from_file('/tmp/server_info.json')# Get schema information
server = ldap3.Server('ldap://ldap.example.com', get_info=ldap3.ALL)
conn = ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password', auto_bind=True)
schema = server.schema
if schema:
print(f"Schema DN: {schema.schema_dn}")
# Explore object classes
print("Available object classes:")
for oc_name, oc_def in schema.object_classes.items():
print(f" {oc_name}: {oc_def.description}")
# Explore attribute types
print("Available attribute types:")
for attr_name, attr_def in schema.attribute_types.items():
print(f" {attr_name}: {attr_def.description}")
# Save schema to file
schema.to_file('/tmp/schema_info.json')
# Check specific object class
if 'inetOrgPerson' in schema.object_classes:
inet_org_person = schema.object_classes['inetOrgPerson']
print(f"inetOrgPerson must attributes: {inet_org_person.must_contain}")
print(f"inetOrgPerson may attributes: {inet_org_person.may_contain}")import ldap3
try:
server = ldap3.Server('ldap://invalid.server.com')
conn = ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password')
conn.bind()
# Perform operations
conn.search('dc=example,dc=com', '(objectClass=person)')
except ldap3.LDAPInvalidServerError as e:
print(f"Invalid server: {e}")
except ldap3.LDAPBindError as e:
print(f"Bind failed: {e}")
except ldap3.LDAPInvalidCredentialsResult as e:
print(f"Invalid credentials: {e}")
except ldap3.LDAPSocketOpenError as e:
print(f"Cannot connect to server: {e}")
except ldap3.LDAPCommunicationError as e:
print(f"Communication error: {e}")
except ldap3.LDAPInvalidFilterError as e:
print(f"Invalid search filter: {e}")
except ldap3.LDAPException as e:
print(f"General LDAP error: {e}")# Optimize for high-performance scenarios
ldap3.set_config_parameter('SOCKET_SIZE', 16384) # Larger socket buffer
ldap3.set_config_parameter('RESPONSE_WAITING_TIMEOUT', 60) # Longer timeout
ldap3.set_config_parameter('RESPONSE_SLEEPTIME', 0.01) # Faster polling
# Configure thread pool for reusable connections
ldap3.set_config_parameter('REUSABLE_THREADED_POOL_SIZE', 20)
ldap3.set_config_parameter('REUSABLE_THREADED_LIFETIME', 7200) # 2 hours
# Create high-performance connection pool
servers = [
ldap3.Server('ldap1.example.com'),
ldap3.Server('ldap2.example.com')
]
server_pool = ldap3.ServerPool(servers, ldap3.ROUND_ROBIN, active=True)
conn = ldap3.Connection(
server_pool,
'cn=user,dc=example,dc=com',
'password',
client_strategy=ldap3.REUSABLE,
pool_name='high_perf_pool',
pool_size=10,
pool_lifetime=3600,
auto_bind=True,
fast_decoder=True
)def safe_ldap_operation(connection, operation_func, *args, **kwargs):
"""
Safely execute LDAP operation with comprehensive error handling.
"""
try:
return operation_func(*args, **kwargs)
except ldap3.LDAPInvalidCredentialsResult:
print("Authentication failed - check username/password")
return None
except ldap3.LDAPInsufficientAccessRightsResult:
print("Insufficient permissions for operation")
return None
except ldap3.LDAPTimeLimitExceededResult:
print("Operation timed out - try with smaller scope")
return None
except ldap3.LDAPSizeLimitExceededResult:
print("Too many results - use paged search")
return None
except ldap3.LDAPSocketOpenError:
print("Cannot connect to LDAP server")
return None
except ldap3.LDAPCommunicationError as e:
print(f"Network communication error: {e}")
return None
except ldap3.LDAPOperationResult as e:
print(f"LDAP operation failed with result {e.result}: {e.description}")
return None
except ldap3.LDAPException as e:
print(f"Unexpected LDAP error: {e}")
return None
# Usage
result = safe_ldap_operation(
conn,
conn.search,
'dc=example,dc=com',
'(objectClass=person)',
attributes=['cn', 'mail']
)# Use predefined offline schemas for testing
offline_schemas = {
ldap3.OFFLINE_AD_2012_R2: "Active Directory 2012 R2",
ldap3.OFFLINE_EDIR_8_8_8: "Novell eDirectory 8.8.8",
ldap3.OFFLINE_SLAPD_2_4: "OpenLDAP slapd 2.4",
ldap3.OFFLINE_DS389_1_3_3: "389 Directory Server 1.3.3"
}
# Create server with offline schema
server = ldap3.Server('ldap://test.example.com', get_info=ldap3.OFFLINE_AD_2012_R2)
# Schema information is available without connecting
print(f"Using offline schema: {offline_schemas[ldap3.OFFLINE_AD_2012_R2]}")
print(f"Object classes available: {len(server.schema.object_classes)}")
print(f"Attribute types available: {len(server.schema.attribute_types)}")import json
def save_ldap_config():
"""Save current LDAP3 configuration to file."""
config = {}
config_params = [
'CASE_INSENSITIVE_ATTRIBUTE_NAMES',
'CASE_INSENSITIVE_SCHEMA_NAMES',
'RESPONSE_WAITING_TIMEOUT',
'SOCKET_SIZE',
'RESTARTABLE_TRIES',
'REUSABLE_THREADED_POOL_SIZE'
]
for param in config_params:
try:
config[param] = ldap3.get_config_parameter(param)
except ldap3.LDAPConfigurationParameterError:
pass
with open('/tmp/ldap3_config.json', 'w') as f:
json.dump(config, f, indent=2)
def load_ldap_config():
"""Load LDAP3 configuration from file."""
try:
with open('/tmp/ldap3_config.json', 'r') as f:
config = json.load(f)
for param, value in config.items():
try:
ldap3.set_config_parameter(param, value)
print(f"Set {param} = {value}")
except ldap3.LDAPConfigurationParameterError as e:
print(f"Cannot set {param}: {e}")
except FileNotFoundError:
print("Configuration file not found")
# Usage
save_ldap_config()
# ... modify configuration in another session
load_ldap_config()Install with Tessl CLI
npx tessl i tessl/pypi-ldap3evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10