A strictly RFC 4510 conforming LDAP V3 pure Python client library
81
Server definition, connection establishment, authentication methods, TLS/SSL configuration, server pooling for high availability, and connection strategies for different use cases.
Defines LDAP server connection parameters including hostname, port, SSL settings, and server information retrieval options.
class Server:
def __init__(self, host, port=None, use_ssl=False, allowed_referral_hosts=None,
get_info=NONE, tls=None, formatter=None, connect_timeout=None,
mode=IP_V6_PREFERRED):
"""
Define an LDAP server.
Args:
host (str): Server hostname or IP address
port (int, optional): Server port (389 for LDAP, 636 for LDAPS)
use_ssl (bool): Use SSL/TLS connection
allowed_referral_hosts (list, optional): Allowed hosts for referrals
get_info (str): Server info to retrieve (NONE, DSA, SCHEMA, ALL)
tls (Tls, optional): TLS configuration object
formatter (dict, optional): Custom formatters for attributes
connect_timeout (float, optional): Connection timeout in seconds
mode (str): IP addressing mode preference
"""
def check_availability(self):
"""
Check if server is available.
Returns:
bool: True if server is reachable
"""
def get_info_from_server(self, connection):
"""
Retrieve server information.
Args:
connection (Connection): Active LDAP connection
Returns:
bool: True if information retrieved successfully
"""
@staticmethod
def from_definition(host, dsa_info, dsa_schema, port=None, use_ssl=False,
tls=None, formatter=None):
"""
Create server from DSA definition.
Args:
host (str): Server hostname
dsa_info (DsaInfo): DSA information object
dsa_schema (SchemaInfo): Schema information object
port (int, optional): Server port
use_ssl (bool): Use SSL connection
tls (Tls, optional): TLS configuration
formatter (dict, optional): Custom formatters
Returns:
Server: Configured server object
"""Properties:
host: Server hostname or IP addressport: Server port numberssl: SSL enabled flagname: Complete server URLtls: TLS configuration objectinfo: DSA information (DsaInfo instance)schema: Schema information (SchemaInfo instance)current_address: Current connection addressManages LDAP connections with various authentication methods, client strategies, and connection options.
class Connection:
def __init__(self, server, user=None, password=None, auto_bind=AUTO_BIND_NONE,
version=3, authentication=None, client_strategy=SYNC,
auto_referrals=True, auto_range=False, sasl_mechanism=None,
sasl_credentials=None, check_names=True, collect_usage=False,
read_only=False, lazy=False, raise_exceptions=False,
pool_name=None, pool_size=None, pool_lifetime=None,
fast_decoder=True, receive_timeout=None,
return_empty_attributes=False):
"""
Create LDAP connection.
Args:
server (Server): Server or ServerPool object
user (str, optional): User DN for binding
password (str, optional): Password for binding
auto_bind (str): Auto-bind behavior (NONE, NO_TLS, TLS_BEFORE_BIND, TLS_AFTER_BIND)
version (int): LDAP protocol version (default: 3)
authentication (str, optional): Authentication method (ANONYMOUS, SIMPLE, SASL, NTLM)
client_strategy (str): Client strategy (SYNC, ASYNC, LDIF, RESTARTABLE, REUSABLE)
auto_referrals (bool): Follow referrals automatically
auto_range (bool): Automatic range retrieval for multi-valued attributes
sasl_mechanism (str, optional): SASL mechanism name
sasl_credentials (tuple, optional): SASL credentials
check_names (bool): Check attribute and object class names (default: True)
collect_usage (bool): Collect connection usage statistics
read_only (bool): Connection is read-only
lazy (bool): Lazy connection - connect on first operation (default: False)
raise_exceptions (bool): Raise exceptions for LDAP errors
pool_name (str, optional): Connection pool name for REUSABLE strategy
pool_size (int, optional): Connection pool size
pool_lifetime (int, optional): Connection lifetime in seconds
fast_decoder (bool): Use fast BER decoder
receive_timeout (float, optional): Receive timeout in seconds
return_empty_attributes (bool): Return empty attributes in results (default: False)
"""
def bind(self, user=None, password=None, sasl_mechanism=None, sasl_credentials=None):
"""
Bind to LDAP server.
Args:
user (str, optional): User DN to bind with
password (str, optional): Password for binding
sasl_mechanism (str, optional): SASL mechanism
sasl_credentials (tuple, optional): SASL credentials
Returns:
bool: True if bind successful
"""
def rebind(self, user=None, password=None, sasl_mechanism=None, sasl_credentials=None):
"""
Rebind with new credentials.
Args:
user (str, optional): New user DN
password (str, optional): New password
sasl_mechanism (str, optional): SASL mechanism
sasl_credentials (tuple, optional): SASL credentials
Returns:
bool: True if rebind successful
"""
def unbind(self, controls=None):
"""
Unbind from server and close connection.
Args:
controls (list, optional): LDAP controls
Returns:
bool: True if unbind successful
"""
def start_tls(self):
"""
Start TLS encryption on connection.
Returns:
bool: True if TLS started successfully
"""
def do_sasl_bind(self, controls):
"""
Perform SASL bind operation.
Args:
controls (list): LDAP controls for bind
Returns:
bool: True if SASL bind successful
"""
def do_ntlm_bind(self, controls):
"""
Perform NTLM bind operation.
Args:
controls (list): LDAP controls for bind
Returns:
bool: True if NTLM bind successful
"""Properties:
server: Associated Server or ServerPool objectuser: Bound user DNpassword: User password (masked)authentication: Authentication method usedversion: LDAP protocol versionauto_bind: Auto-bind settingstrategy: Connection strategy objectbound: True if connection is boundclosed: True if connection is closedusage: Usage statistics (if enabled)entries: Search result entries (abstract layer)response: Last operation responseresult: Last operation resultConfigures SSL/TLS encryption for secure LDAP connections with certificate validation and client certificate support.
class Tls:
def __init__(self, local_private_key_file=None, local_certificate_file=None,
validate=ssl.CERT_NONE, version=None, ca_certs_file=None,
valid_names=None, ca_certs_path=None, ca_certs_data=None,
local_private_key_password=None):
"""
Configure TLS/SSL settings.
Args:
local_private_key_file (str, optional): Client private key file path
local_certificate_file (str, optional): Client certificate file path
validate (int): Certificate validation level (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED)
version (int, optional): SSL/TLS protocol version
ca_certs_file (str, optional): CA certificates file path
valid_names (list, optional): Valid server names for certificate checking
ca_certs_path (str, optional): CA certificates directory path
ca_certs_data (str, optional): CA certificates as string data
local_private_key_password (str, optional): Private key password
"""
def wrap_socket(self, connection, do_handshake=False):
"""
Wrap socket with SSL/TLS.
Args:
connection (Connection): LDAP connection object
do_handshake (bool): Perform SSL handshake immediately
Returns:
ssl.SSLSocket: Wrapped SSL socket
"""
def start_tls(self, connection):
"""
Start TLS on existing connection.
Args:
connection (Connection): LDAP connection object
Returns:
bool: True if TLS started successfully
"""Manages multiple LDAP servers for high availability with different pooling strategies and automatic failover.
class ServerPool:
def __init__(self, servers=None, pool_strategy=ROUND_ROBIN, active=True, exhaust=False):
"""
Create server pool for high availability.
Args:
servers (list, optional): List of Server objects
pool_strategy (str): Pooling strategy (FIRST, ROUND_ROBIN, RANDOM)
active (bool): Check server availability before use
exhaust (bool): Try all servers before giving up
"""
def add(self, servers):
"""
Add server(s) to pool.
Args:
servers (Server or list): Server object(s) to add
"""
def remove(self, server):
"""
Remove server from pool.
Args:
server (Server): Server object to remove
"""
def initialize(self, connection):
"""
Initialize pool state for connection.
Args:
connection (Connection): LDAP connection object
"""
def get_server(self, connection):
"""
Get next server from pool.
Args:
connection (Connection): LDAP connection object
Returns:
Server: Selected server object
"""
def get_current_server(self, connection):
"""
Get current server for connection.
Args:
connection (Connection): LDAP connection object
Returns:
Server: Current server object
"""# Connection class context manager methods
def __enter__(self):
"""Enter connection context."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit connection context and close connection."""Additional connection management methods for advanced use cases.
def rebind(self, user=None, password=None, authentication=None, sasl_mechanism=None, sasl_credentials=None):
"""
Rebind connection with new credentials.
Args:
user (str, optional): New user DN for binding
password (str, optional): New password for binding
authentication (str, optional): Authentication method
sasl_mechanism (str, optional): SASL mechanism for SASL authentication
sasl_credentials (tuple, optional): SASL credentials
Returns:
bool: True if rebind successful
"""
def refresh_server_info(self):
"""
Refresh server information from DSE.
Returns:
bool: True if server info refreshed successfully
"""
@property
def entries(self):
"""
Get search results as Entry objects (abstract layer).
Returns:
list: List of Entry objects from last search
"""
@property
def stream(self):
"""
Get or set connection stream object.
Returns:
object: Current stream object
"""
@stream.setter
def stream(self, value):
"""Set connection stream object."""
def extended(self, request_name, request_value=None, controls=None, no_encode=None):
"""
Perform extended LDAP operation.
Args:
request_name (str): Extended operation OID
request_value (bytes, optional): Request value
controls (list, optional): LDAP controls
no_encode (bool, optional): Skip request value encoding
Returns:
bool: True if operation successful
"""Methods to convert search responses to different formats.
def response_to_ldif(self, search_result=None, all_base64=False, line_separator=None, sort_order=None, stream=None):
"""
Convert response to LDIF format.
Args:
search_result (list, optional): Search results to convert (defaults to last response)
all_base64 (bool): Encode all values in base64
line_separator (str, optional): Line separator character
sort_order (callable, optional): Sort function for entries
stream (file, optional): Stream to write LDIF to
Returns:
str: LDIF formatted response
"""
def response_to_json(self, raw=False, search_result=None, indent=4, sort=True, stream=None, checked_attributes=True):
"""
Convert response to JSON format.
Args:
raw (bool): Include raw attribute values
search_result (list, optional): Search results to convert
indent (int): JSON indentation level
sort (bool): Sort JSON keys
stream (file, optional): Stream to write JSON to
checked_attributes (bool): Use schema checking for attributes
Returns:
str: JSON formatted response
"""
def response_to_file(self, target, raw=False, encoding='utf-8'):
"""
Write response to file.
Args:
target (str): Target file path
raw (bool): Include raw attribute values
encoding (str): File encoding
"""import ldap3
# Simple connection
server = ldap3.Server('ldap.example.com')
conn = ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password')
conn.bind()
# ... perform operations
conn.unbind()import ldap3
# TLS configuration
tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED, ca_certs_file='/path/to/ca-certs.pem')
server = ldap3.Server('ldap.example.com', use_ssl=True, tls=tls_config)
conn = ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password', auto_bind=True)import ldap3
# Multiple servers with round-robin
server1 = ldap3.Server('ldap1.example.com')
server2 = ldap3.Server('ldap2.example.com')
server_pool = ldap3.ServerPool([server1, server2], ldap3.ROUND_ROBIN, active=True)
conn = ldap3.Connection(server_pool, 'cn=user,dc=example,dc=com', 'password', auto_bind=True)import ldap3
server = ldap3.Server('ldap.example.com')
with ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password', auto_bind=True) as conn:
conn.search('dc=example,dc=com', '(objectClass=person)')
# Connection automatically closed when exiting contextimport ldap3
server = ldap3.Server('ldap.example.com')
conn = ldap3.Connection(server, sasl_mechanism=ldap3.KERBEROS, sasl_credentials=())
conn.bind()import ldap3
server = ldap3.Server('ldap.example.com')
conn = ldap3.Connection(server, 'cn=user,dc=example,dc=com', 'password',
client_strategy=ldap3.ASYNC, auto_bind=True)
# Operations return immediately, check results later
message_id = conn.search('dc=example,dc=com', '(objectClass=person)')
result = conn.get_response(message_id)Install with Tessl CLI
npx tessl i tessl/pypi-ldap3evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10