A strictly RFC 4510 conforming LDAP V3 pure Python client library
81
Extended operations for specific LDAP server implementations including Microsoft Active Directory, Novell eDirectory, and standard RFC extensions.
RFC-standard extended operations supported by most LDAP servers.
# Standard extended operations available on connection.extend.standard
def who_am_i(self, controls=None):
    """
    WHO AM I extended operation (RFC 4532).

    Retrieves the authorization identity associated with the connection.
    Called as ``connection.extend.standard.who_am_i()``.

    Args:
        controls (list, optional): LDAP controls (default: None)

    Returns:
        str: Authorization identity or None if not supported
    """
def modify_password(self, user, old_password, new_password, hash_algorithm=None,
                    salt=None, controls=None):
    """
    Password Modify extended operation (RFC 3062).

    Modify user password with optional hashing.
    Called as ``connection.extend.standard.modify_password(...)``.

    Args:
        user (str): User DN whose password to change
        old_password (str): Current password (required; no default in this signature)
        new_password (str): New password (required; no default in this signature)
        hash_algorithm (str, optional): Hash algorithm (HASHED_SHA, HASHED_MD5, etc.)
            (default: None)
        salt (bytes, optional): Salt for hashed password (default: None)
        controls (list, optional): LDAP controls (default: None)

    Returns:
        bool: True if password changed successfully
    """
def paged_search(self, search_base, search_filter, search_scope=SUBTREE,
                 dereference_aliases=DEREF_ALWAYS, attributes=None,
                 size_limit=0, time_limit=0, types_only=False,
                 get_operational_attributes=False, controls=None,
                 paged_size=1000, paged_criticality=False):
    """
    Paged search helper for large result sets.

    Called as ``connection.extend.standard.paged_search(...)``.

    Args:
        search_base (str): Search base DN
        search_filter (str): LDAP search filter
        search_scope (str): Search scope (default: SUBTREE)
        dereference_aliases (str): Alias dereferencing (default: DEREF_ALWAYS)
        attributes (list, optional): Attributes to retrieve (default: None)
        size_limit (int): Size limit per page (default: 0)
        time_limit (int): Time limit in seconds (default: 0)
        types_only (bool): Return types only (default: False)
        get_operational_attributes (bool): Include operational attributes
            (default: False)
        controls (list, optional): Additional LDAP controls (default: None)
        paged_size (int): Page size for results (default: 1000)
        paged_criticality (bool): Paged control criticality (default: False)

    Returns:
        generator: Generator yielding Entry objects
    """
def persistent_search(self, search_base, search_filter, search_scope=SUBTREE,
                      dereference_aliases=DEREF_ALWAYS, attributes=None,
                      get_operational_attributes=False, controls=None,
                      changes_only=True, events_type=None,
                      notification_changes=True):
    """
    Persistent Search (IETF Internet-Draft draft-ietf-ldapext-psearch).

    Monitor directory changes in real-time.
    Called as ``connection.extend.standard.persistent_search(...)``.

    NOTE(review): Persistent Search was never published as an RFC; RFC 3673
    is the unrelated "All Operational Attributes" feature, so it must not be
    cited as the specification for this operation.

    Args:
        search_base (str): Search base DN
        search_filter (str): LDAP search filter
        search_scope (str): Search scope (default: SUBTREE)
        dereference_aliases (str): Alias dereferencing (default: DEREF_ALWAYS)
        attributes (list, optional): Attributes to monitor (default: None)
        get_operational_attributes (bool): Include operational attributes
            (default: False)
        controls (list, optional): LDAP controls (default: None)
        changes_only (bool): Return only changes, not initial entries
            (default: True)
        events_type (list, optional): Types of changes to monitor (default: None)
        notification_changes (bool): Include change notification info
            (default: True)

    Returns:
        generator: Generator yielding change notifications
    """

# Extended operations specific to Microsoft Active Directory.
# Microsoft AD extended operations available on connection.extend.microsoft
def dir_sync(self, sync_base, sync_filter='(objectclass=*)', attributes=ALL_ATTRIBUTES, cookie=None,
             object_security=False, ancestors_first=True, public_data_only=False,
             incremental_values=True, max_length=2147483647, hex_guid=False):
    """
    DirSync extended operation for Active Directory synchronization.

    Efficiently synchronize directory changes from Active Directory.
    Called as ``connection.extend.microsoft.dir_sync(...)``.

    Note that the attribute list is passed with the keyword ``attributes``;
    unlike ``sync_base`` and ``sync_filter`` it carries no ``sync_`` prefix.

    Args:
        sync_base (str): Base DN for synchronization
        sync_filter (str): LDAP filter for objects to sync (default: '(objectclass=*)')
        attributes (list): Attributes to synchronize (default: ALL_ATTRIBUTES)
        cookie (bytes, optional): Synchronization cookie from previous sync
            (default: None)
        object_security (bool): Include object security information (default: False)
        ancestors_first (bool): Return parent objects before children (default: True)
        public_data_only (bool): Return only public data (default: False)
        incremental_values (bool): Return incremental value changes (default: True)
        max_length (int): Maximum response length (default: 2147483647)
        hex_guid (bool): Return GUIDs in hexadecimal format (default: False)

    Returns:
        dict: Sync results with entries and new cookie
    """
def modify_password(self, user, new_password, old_password=None, controls=None):
    """
    Active Directory password change operation.

    Change user password using AD-specific method.
    Called as ``connection.extend.microsoft.modify_password(...)``.

    Note the positional order here is (user, new_password, old_password),
    whereas the standard RFC 3062 variant listed earlier takes
    (user, old_password, new_password). Prefer keyword arguments to avoid
    swapping the two passwords.

    Args:
        user (str): User DN whose password to change
        new_password (str): New password
        old_password (str, optional): Current password (required for user
            self-change) (default: None)
        controls (list, optional): LDAP controls (default: None)

    Returns:
        bool: True if password changed successfully
    """

# Extended operations specific to Novell eDirectory.
# Novell eDirectory extended operations available on connection.extend.novell
def get_bind_dn(self, controls=None):
    """
    Get Bind DN extended operation.

    Retrieve the DN used for binding to eDirectory.
    Called as ``connection.extend.novell.get_bind_dn()``.

    Args:
        controls (list, optional): LDAP controls (default: None)

    Returns:
        str: Bind DN or None if operation fails
    """
def get_universal_password(self, user, controls=None):
    """
    Get Universal Password extended operation.

    Retrieve user's universal password from eDirectory.
    Called as ``connection.extend.novell.get_universal_password(user)``.

    Args:
        user (str): User DN
        controls (list, optional): LDAP controls (default: None)

    Returns:
        str: Universal password or None if not available
    """
def set_universal_password(self, user, new_password, controls=None):
    """
    Set Universal Password extended operation.

    Set user's universal password in eDirectory.
    Called as ``connection.extend.novell.set_universal_password(user, new_password)``.

    Args:
        user (str): User DN
        new_password (str): New universal password
        controls (list, optional): LDAP controls (default: None)

    Returns:
        bool: True if password set successfully
    """
def list_replicas(self, server_dn, controls=None):
    """
    List Replicas extended operation.

    List replicas for eDirectory server.
    Called as ``connection.extend.novell.list_replicas(server_dn)``.

    Args:
        server_dn (str): Server DN
        controls (list, optional): LDAP controls (default: None)

    Returns:
        list: List of replica information
    """
def partition_entry_count(self, partition_dn, controls=None):
    """
    Get Partition Entry Count extended operation.

    Get number of entries in eDirectory partition.
    Called as ``connection.extend.novell.partition_entry_count(partition_dn)``.

    Args:
        partition_dn (str): Partition DN
        controls (list, optional): LDAP controls (default: None)

    Returns:
        int: Number of entries in partition
    """
def replica_info(self, server_dn, partition_dn, controls=None):
    """
    Get Replica Info extended operation.

    Get replica information for specific partition.
    Called as ``connection.extend.novell.replica_info(server_dn, partition_dn)``.

    Args:
        server_dn (str): Server DN
        partition_dn (str): Partition DN
        controls (list, optional): LDAP controls (default: None)

    Returns:
        dict: Replica information
    """
def start_transaction(self, controls=None):
    """
    Start Transaction extended operation.

    Begin eDirectory transaction for atomic operations.
    Called as ``connection.extend.novell.start_transaction()``.

    Args:
        controls (list, optional): LDAP controls (default: None)

    Returns:
        bytes: Transaction ID for subsequent operations
    """
def end_transaction(self, commit=True, controls=None):
    """
    End Transaction extended operation.

    Commit or abort eDirectory transaction.
    Called as ``connection.extend.novell.end_transaction(commit=...)``.

    Args:
        commit (bool): True to commit, False to abort (default: True)
        controls (list, optional): LDAP controls (default: None)

    Returns:
        bool: True if transaction ended successfully
    """
def add_members_to_groups(self, members, groups, fix=True, transaction=True):
    """
    Add Members to Groups extended operation.

    Efficiently add multiple members to multiple groups.
    Called as ``connection.extend.novell.add_members_to_groups(members, groups)``.

    Args:
        members (list): List of member DNs
        groups (list): List of group DNs
        fix (bool): Fix membership inconsistencies (default: True)
        transaction (bool): Use transaction for atomic operation (default: True)

    Returns:
        bool: True if operation successful
    """
def remove_members_from_groups(self, members, groups, fix=True, transaction=True):
    """
    Remove Members from Groups extended operation.

    Efficiently remove multiple members from multiple groups.
    Called as ``connection.extend.novell.remove_members_from_groups(members, groups)``.

    Args:
        members (list): List of member DNs
        groups (list): List of group DNs
        fix (bool): Fix membership inconsistencies (default: True)
        transaction (bool): Use transaction for atomic operation (default: True)

    Returns:
        bool: True if operation successful
    """
def check_groups_memberships(self, members, groups, fix=False, transaction=True):
    """
    Check Groups Memberships extended operation.

    Check membership relationships between members and groups.
    Called as ``connection.extend.novell.check_groups_memberships(members, groups)``.

    Unlike the add/remove member operations, ``fix`` defaults to False here,
    so a plain call only reports and does not modify anything.

    Args:
        members (list): List of member DNs
        groups (list): List of group DNs
        fix (bool): Fix membership inconsistencies if found (default: False)
        transaction (bool): Use transaction for atomic operation (default: True)

    Returns:
        dict: Membership check results
    """

import ldap3
# Open an auto-bound connection with the admin DN.
server = ldap3.Server('ldap://ldap.example.com')
conn = ldap3.Connection(
    server,
    'cn=admin,dc=example,dc=com',
    'password',
    auto_bind=True,
)
standard_ops = conn.extend.standard

# WHO AM I operation
identity = standard_ops.who_am_i()
print(f"Current identity: {identity}")

# Password modification (all arguments passed by keyword)
password_change = {
    'user': 'cn=john,ou=people,dc=example,dc=com',
    'old_password': 'oldpass',
    'new_password': 'newpass123',
    'hash_algorithm': ldap3.HASHED_SHA,
}
result = standard_ops.modify_password(**password_change)
if result:
    print("Password changed successfully")

# Large result set with paged search
search_base = 'dc=example,dc=com'
search_filter = '(objectClass=person)'

# Paged search returns generator; base, filter and scope are the first three
# positional parameters of paged_search.
entries = conn.extend.standard.paged_search(
    search_base,
    search_filter,
    ldap3.SUBTREE,
    attributes=['cn', 'mail'],
    paged_size=100,
)

# Process all results regardless of size. ``count`` is pre-set so the summary
# line still works when the generator yields nothing.
count = 0
for count, entry in enumerate(entries, start=1):
    print(f"Found: {entry.cn}")
print(f"Total entries processed: {count}")

# Monitor directory changes in real-time
try:
    changes = conn.extend.standard.persistent_search(
        'ou=people,dc=example,dc=com',
        '(objectClass=person)',
        attributes=['cn', 'mail', 'telephoneNumber'],
        changes_only=True,
    )
    print("Monitoring directory changes... (Press Ctrl+C to stop)")

    for notification in changes:
        kind = notification.get('change_type', 'unknown')
        dn = notification.get('dn', 'unknown')
        print(f"Change detected: {change_type} on {entry_dn}".replace(
            "{change_type}", "{change_type}")) if False else print(
            f"Change detected: {kind} on {dn}")

        # A notification without an 'attributes' key simply prints nothing more.
        for attr, values in notification.get('attributes', {}).items():
            print(f" {attr}: {values}")
except KeyboardInterrupt:
    print("Monitoring stopped")
except ldap3.LDAPExtensionError as exc:
    print(f"Persistent search not supported: {exc}")

# Active Directory synchronization
# DirSync walkthrough: run an initial sync, keep the cookie it returns, then
# pass that cookie back to fetch only what changed since.
server = ldap3.Server('ldap://dc.company.com')
conn = ldap3.Connection(server, 'DOMAIN\\syncuser', 'password', auto_bind=True)

# Initial sync
# The attribute list goes in ``attributes`` -- that is the parameter name in
# the dir_sync signature. ``sync_attributes`` is not a parameter and would
# raise TypeError (unexpected keyword argument).
sync_result = conn.extend.microsoft.dir_sync(
    sync_base='dc=company,dc=com',
    sync_filter='(objectClass=user)',
    attributes=['cn', 'mail', 'whenChanged', 'objectGUID'],
    object_security=False,
    incremental_values=True
)
print(f"Initial sync found {len(sync_result['entries'])} entries")

# Save cookie for next sync
sync_cookie = sync_result['cookie']

# Later incremental sync
incremental_result = conn.extend.microsoft.dir_sync(
    sync_base='dc=company,dc=com',
    sync_filter='(objectClass=user)',
    attributes=['cn', 'mail', 'whenChanged', 'objectGUID'],
    cookie=sync_cookie,
    incremental_values=True
)
print(f"Incremental sync found {len(incremental_result['entries'])} changes")

# AD-specific password change
# Administrator connection with use_ssl enabled.
server = ldap3.Server('ldap://dc.company.com', use_ssl=True)
conn = ldap3.Connection(server, 'DOMAIN\\admin', 'adminpass', auto_bind=True)

# Admin changing user password
# (positional order for the AD variant is user, new_password)
result = conn.extend.microsoft.modify_password(
    'cn=John Smith,ou=users,dc=company,dc=com',
    'NewComplexP@ssw0rd',
)
if not result:
    print(f"Password change failed: {conn.result}")
else:
    print("Password changed successfully")

# User self-change (requires old password)
user_conn = ldap3.Connection(
    server,
    'DOMAIN\\john.smith',
    'oldpassword',
    auto_bind=True,
)
result = user_conn.extend.microsoft.modify_password(
    'cn=John Smith,ou=users,dc=company,dc=com',
    'NewComplexP@ssw0rd',
    old_password='oldpassword',
)

# eDirectory specific operations
# Bind to eDirectory, show the bind DN, then set and read back a user's
# universal password.
server = ldap3.Server('ldap://edir.company.com')
conn = ldap3.Connection(server, 'cn=admin,o=company', 'password', auto_bind=True)

# Get current bind DN
bind_dn = conn.extend.novell.get_bind_dn()
print(f"Currently bound as: {bind_dn}")

# Universal password operations
user_dn = 'cn=john,ou=people,o=company'

# Set universal password
result = conn.extend.novell.set_universal_password(user_dn, 'newpassword123')
if result:
    print("Universal password set successfully")

# Get universal password (requires special privileges)
try:
    password = conn.extend.novell.get_universal_password(user_dn)
except ldap3.LDAPExtensionError as e:
    print(f"Cannot retrieve password: {e}")
else:
    # Never echo the secret itself: console output ends up in logs and
    # terminal scrollback. Report only whether a value came back;
    # get_universal_password is documented to return None when the password
    # is not available.
    if password is None:
        print("Universal password not available")
    else:
        print("Universal password retrieved")

# Efficient group membership operations
members = [
    'cn=john,ou=people,o=company',
    'cn=jane,ou=people,o=company',
    'cn=bob,ou=people,o=company',
]
groups = [
    'cn=developers,ou=groups,o=company',
    'cn=employees,ou=groups,o=company',
]

# Add members to multiple groups atomically:
# fix=True repairs any inconsistencies, transaction=True makes it atomic.
result = conn.extend.novell.add_members_to_groups(
    members,
    groups,
    fix=True,
    transaction=True,
)
if result:
    print("Members added to groups successfully")

# Check membership status (fix=False: only check, don't fix)
membership_info = conn.extend.novell.check_groups_memberships(
    members,
    groups,
    fix=False,
)
print("Membership status:")
for member_dn in membership_info:
    print(f" {member_dn}: {membership_info[member_dn]}")

# Atomic operations using transactions
# Stays None until start_transaction() has actually returned, so the error
# handler can tell "failed before a transaction existed" apart from "failed
# inside an open transaction" and only abort in the latter case.
transaction_id = None
try:
    # Start transaction
    transaction_id = conn.extend.novell.start_transaction()
    print(f"Transaction started: {transaction_id}")

    # Perform multiple operations
    conn.add('cn=newuser1,ou=people,o=company',
             object_class=['inetOrgPerson'],
             attributes={'cn': 'New User 1', 'sn': 'User1'})
    conn.add('cn=newuser2,ou=people,o=company',
             object_class=['inetOrgPerson'],
             attributes={'cn': 'New User 2', 'sn': 'User2'})

    # Add users to group
    conn.extend.novell.add_members_to_groups(
        members=['cn=newuser1,ou=people,o=company', 'cn=newuser2,ou=people,o=company'],
        groups=['cn=newusers,ou=groups,o=company'],
        transaction=False  # Already in transaction
    )

    # Commit transaction
    result = conn.extend.novell.end_transaction(commit=True)
    if result:
        print("Transaction committed successfully")
    else:
        print("Transaction commit failed")
except Exception as e:
    print(f"Error during transaction: {e}")
    # Abort only when a transaction was really opened; calling
    # end_transaction() after a failed start would raise a second error from
    # inside this handler and hide the original one.
    if transaction_id is not None:
        conn.extend.novell.end_transaction(commit=False)
        print("Transaction aborted")

# Server and partition information
server_dn = 'cn=server1,ou=servers,o=company'
partition_dn = 'ou=people,o=company'

# List all replicas on server
replicas = conn.extend.novell.list_replicas(server_dn)
print("Replicas on server:")
for item in replicas:
    print(f" Partition: {item['partition_dn']}")
    print(f" Type: {item['replica_type']}")
    print(f" State: {item['replica_state']}")

# Get partition entry count
entry_count = conn.extend.novell.partition_entry_count(partition_dn)
print(f"Entries in {partition_dn}: {entry_count}")

# Get detailed replica information
replica_info = conn.extend.novell.replica_info(
    server_dn,
    partition_dn,
)
print(f"Replica info for {partition_dn}:")
print(f" Replica ID: {replica_info['replica_id']}")
print(f" Replica number: {replica_info['replica_number']}")
print(f" Sync status: {replica_info['sync_status']}")

# Error handling around an extended operation: each failure class gets its
# own message.
try:
    # Attempt extended operation
    result = conn.extend.standard.who_am_i()
    print(f"WHO AM I result: {result}")
except ldap3.LDAPExtensionError as exc:
    print(f"Extended operation not supported: {exc}")
except ldap3.LDAPOperationResult as exc:
    print(f"Operation failed with result: {exc.result}")
except ldap3.LDAPCommunicationError as exc:
    print(f"Communication error: {exc}")

# Check if specific extended operation is supported
if not hasattr(conn.extend, 'microsoft'):
    print("Microsoft AD extensions not available")
else:
    # Microsoft AD extensions are available
    try:
        sync_result = conn.extend.microsoft.dir_sync(
            'dc=company,dc=com',
            '(objectClass=user)',
        )
    except ldap3.LDAPExtensionError:
        print("DirSync not supported or insufficient permissions")

# Install with Tessl CLI
npx tessl i tessl/pypi-ldap3
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10