CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-ldap3

A strictly RFC 4510 conforming LDAP V3 pure Python client library

81

1.08x
Overview
Eval results
Files

docs/extended.md

Extended Operations

Extended operations for specific LDAP server implementations including Microsoft Active Directory, Novell eDirectory, and standard RFC extensions.

Capabilities

Standard Extended Operations

RFC-standard extended operations supported by most LDAP servers.

# Standard extended operations available on connection.extend.standard

def who_am_i(self, controls=None):
    """
    WHO AM I extended operation (RFC 4532).

    Retrieves the authorization identity associated with the connection.
    Invoked as ``connection.extend.standard.who_am_i()``.

    Args:
        controls (list, optional): LDAP controls (default: None)

    Returns:
        str: Authorization identity or None if not supported
    """

def modify_password(self, user, old_password, new_password, hash_algorithm=None, 
                   salt=None, controls=None):
    """
    Password Modify extended operation (RFC 3062).

    Modify user password with optional hashing.
    Invoked as ``connection.extend.standard.modify_password(...)``.

    Args:
        user (str): User DN whose password to change
        old_password (str): Current password
        new_password (str): New password
        hash_algorithm (str, optional): Hash algorithm (HASHED_SHA, HASHED_MD5, etc.)
            (default: None)
        salt (bytes, optional): Salt for hashed password (default: None)
        controls (list, optional): LDAP controls (default: None)

    Returns:
        bool: True if password changed successfully
    """

def paged_search(self, search_base, search_filter, search_scope=SUBTREE,
                dereference_aliases=DEREF_ALWAYS, attributes=None, 
                size_limit=0, time_limit=0, types_only=False,
                get_operational_attributes=False, controls=None,
                paged_size=1000, paged_criticality=False):
    """
    Paged search helper for large result sets.

    Invoked as ``connection.extend.standard.paged_search(...)``.

    Args:
        search_base (str): Search base DN
        search_filter (str): LDAP search filter
        search_scope (str): Search scope (default: SUBTREE)
        dereference_aliases (str): Alias dereferencing (default: DEREF_ALWAYS)
        attributes (list, optional): Attributes to retrieve (default: None)
        size_limit (int): Size limit per page (default: 0)
            NOTE(review): confirm whether this limit applies per page or to
            the search as a whole; the page size is set by ``paged_size``.
        time_limit (int): Time limit in seconds (default: 0)
        types_only (bool): Return types only (default: False)
        get_operational_attributes (bool): Include operational attributes
            (default: False)
        controls (list, optional): Additional LDAP controls (default: None)
        paged_size (int): Page size for results (default: 1000)
        paged_criticality (bool): Paged control criticality (default: False)

    Returns:
        generator: Generator yielding Entry objects
    """

def persistent_search(self, search_base, search_filter, search_scope=SUBTREE,
                     dereference_aliases=DEREF_ALWAYS, attributes=None,
                     get_operational_attributes=False, controls=None,
                     changes_only=True, events_type=None, 
                     notification_changes=True):
    """
    Persistent Search operation (draft-ietf-ldapext-psearch Internet-Draft).

    Monitor directory changes in real-time.
    Invoked as ``connection.extend.standard.persistent_search(...)``.

    Args:
        search_base (str): Search base DN
        search_filter (str): LDAP search filter
        search_scope (str): Search scope (default: SUBTREE)
        dereference_aliases (str): Alias dereferencing (default: DEREF_ALWAYS)
        attributes (list, optional): Attributes to monitor (default: None)
        get_operational_attributes (bool): Include operational attributes
            (default: False)
        controls (list, optional): LDAP controls (default: None)
        changes_only (bool): Return only changes, not initial entries
            (default: True)
        events_type (list, optional): Types of changes to monitor
            (default: None)
        notification_changes (bool): Include change notification info
            (default: True)

    Returns:
        generator: Generator yielding change notifications
    """

Microsoft Active Directory Extended Operations

Extended operations specific to Microsoft Active Directory.

# Microsoft AD extended operations available on connection.extend.microsoft

def dir_sync(self, sync_base, sync_filter='(objectclass=*)', attributes=ALL_ATTRIBUTES, cookie=None, 
             object_security=False, ancestors_first=True, public_data_only=False,
             incremental_values=True, max_length=2147483647, hex_guid=False):
    """
    DirSync extended operation for Active Directory synchronization.

    Efficiently synchronize directory changes from Active Directory.
    Invoked as ``connection.extend.microsoft.dir_sync(...)``.

    Args:
        sync_base (str): Base DN for synchronization
        sync_filter (str): LDAP filter for objects to sync (default: '(objectclass=*)')
        attributes (list): Attributes to synchronize (default: ALL_ATTRIBUTES).
            The keyword is ``attributes``; there is no ``sync_attributes`` parameter.
        cookie (bytes, optional): Synchronization cookie from previous sync
            (default: None)
        object_security (bool): Include object security information (default: False)
        ancestors_first (bool): Return parent objects before children (default: True)
        public_data_only (bool): Return only public data (default: False)
        incremental_values (bool): Return incremental value changes (default: True)
        max_length (int): Maximum response length (default: 2147483647)
        hex_guid (bool): Return GUIDs in hexadecimal format (default: False)

    Returns:
        dict: Sync results with entries and new cookie
            NOTE(review): upstream ldap3 appears to return a DirSync helper
            object (``loop()`` / ``cookie``) rather than a plain dict —
            confirm against the installed version.
    """

def modify_password(self, user, new_password, old_password=None, controls=None):
    """
    Active Directory password change operation.

    Change user password using AD-specific method.
    Invoked as ``connection.extend.microsoft.modify_password(...)``.

    Args:
        user (str): User DN whose password to change
        new_password (str): New password
        old_password (str, optional): Current password (required for user
            self-change; default: None)
        controls (list, optional): LDAP controls (default: None)

    Returns:
        bool: True if password changed successfully

    NOTE(review): Active Directory normally accepts password changes only
    over an encrypted connection — confirm for the target deployment.
    """

Novell eDirectory Extended Operations

Extended operations specific to Novell eDirectory.

# Novell eDirectory extended operations available on connection.extend.novell

def get_bind_dn(self, controls=None):
    """
    Get Bind DN extended operation.

    Retrieve the DN used for binding to eDirectory.
    Invoked as ``connection.extend.novell.get_bind_dn()``.

    Args:
        controls (list, optional): LDAP controls (default: None)

    Returns:
        str: Bind DN or None if operation fails
    """

def get_universal_password(self, user, controls=None):
    """
    Get Universal Password extended operation.

    Retrieve user's universal password from eDirectory.
    Invoked as ``connection.extend.novell.get_universal_password(user)``.

    Args:
        user (str): User DN
        controls (list, optional): LDAP controls (default: None)

    Returns:
        str: Universal password or None if not available
    """

def set_universal_password(self, user, new_password, controls=None):
    """
    Set Universal Password extended operation.

    Set user's universal password in eDirectory.
    Invoked as ``connection.extend.novell.set_universal_password(user, new_password)``.

    Args:
        user (str): User DN
        new_password (str): New universal password
        controls (list, optional): LDAP controls (default: None)

    Returns:
        bool: True if password set successfully
    """

def list_replicas(self, server_dn, controls=None):
    """
    List Replicas extended operation.

    List replicas for eDirectory server.
    Invoked as ``connection.extend.novell.list_replicas(server_dn)``.

    Args:
        server_dn (str): Server DN
        controls (list, optional): LDAP controls (default: None)

    Returns:
        list: List of replica information
    """

def partition_entry_count(self, partition_dn, controls=None):
    """
    Get Partition Entry Count extended operation.

    Get number of entries in eDirectory partition.
    Invoked as ``connection.extend.novell.partition_entry_count(partition_dn)``.

    Args:
        partition_dn (str): Partition DN
        controls (list, optional): LDAP controls (default: None)

    Returns:
        int: Number of entries in partition
    """

def replica_info(self, server_dn, partition_dn, controls=None):
    """
    Get Replica Info extended operation.

    Get replica information for specific partition.
    Invoked as ``connection.extend.novell.replica_info(server_dn, partition_dn)``.

    Args:
        server_dn (str): Server DN
        partition_dn (str): Partition DN
        controls (list, optional): LDAP controls (default: None)

    Returns:
        dict: Replica information
    """

def start_transaction(self, controls=None):
    """
    Start Transaction extended operation.

    Begin eDirectory transaction for atomic operations.
    Invoked as ``connection.extend.novell.start_transaction()``.

    Args:
        controls (list, optional): LDAP controls (default: None)

    Returns:
        bytes: Transaction ID for subsequent operations
    """

def end_transaction(self, commit=True, controls=None):
    """
    End Transaction extended operation.

    Commit or abort eDirectory transaction.
    Invoked as ``connection.extend.novell.end_transaction(...)``.

    Args:
        commit (bool): True to commit, False to abort (default: True)
        controls (list, optional): LDAP controls (default: None)

    Returns:
        bool: True if transaction ended successfully
    """

def add_members_to_groups(self, members, groups, fix=True, transaction=True):
    """
    Add Members to Groups extended operation.

    Efficiently add multiple members to multiple groups.
    Invoked as ``connection.extend.novell.add_members_to_groups(...)``.

    Args:
        members (list): List of member DNs
        groups (list): List of group DNs
        fix (bool): Fix membership inconsistencies (default: True)
        transaction (bool): Use transaction for atomic operation
            (default: True)

    Returns:
        bool: True if operation successful
    """

def remove_members_from_groups(self, members, groups, fix=True, transaction=True):
    """
    Remove Members from Groups extended operation.

    Efficiently remove multiple members from multiple groups.
    Invoked as ``connection.extend.novell.remove_members_from_groups(...)``.

    Args:
        members (list): List of member DNs
        groups (list): List of group DNs
        fix (bool): Fix membership inconsistencies (default: True)
        transaction (bool): Use transaction for atomic operation
            (default: True)

    Returns:
        bool: True if operation successful
    """

def check_groups_memberships(self, members, groups, fix=False, transaction=True):
    """
    Check Groups Memberships extended operation.

    Check membership relationships between members and groups.
    Invoked as ``connection.extend.novell.check_groups_memberships(...)``.

    Args:
        members (list): List of member DNs
        groups (list): List of group DNs
        fix (bool): Fix membership inconsistencies if found (default: False)
        transaction (bool): Use transaction for atomic operation
            (default: True)

    Returns:
        dict: Membership check results
    """

Usage Examples

Standard Extended Operations

import ldap3

# Open a bound connection as the directory administrator
server = ldap3.Server('ldap://ldap.example.com')
conn = ldap3.Connection(
    server,
    'cn=admin,dc=example,dc=com',
    'password',
    auto_bind=True,
)

# Ask the server which authorization identity this connection carries
whoami = conn.extend.standard.who_am_i()
print(f"Current identity: {whoami}")

# Change a user's password through the Password Modify operation
password_change = {
    'user': 'cn=john,ou=people,dc=example,dc=com',
    'old_password': 'oldpass',
    'new_password': 'newpass123',
    'hash_algorithm': ldap3.HASHED_SHA,
}
if conn.extend.standard.modify_password(**password_change):
    print("Password changed successfully")

Paged Search with Standard Extensions

# Walk a large result set page by page
search_base = 'dc=example,dc=com'
search_filter = '(objectClass=person)'

# paged_search hands back a generator, so pages are fetched lazily
people = conn.extend.standard.paged_search(
    search_base=search_base,
    search_filter=search_filter,
    search_scope=ldap3.SUBTREE,
    attributes=['cn', 'mail'],
    paged_size=100,
)

# Consume every entry; count stays 0 when the search matches nothing
count = 0
for count, entry in enumerate(people, start=1):
    print(f"Found: {entry.cn}")

print(f"Total entries processed: {count}")

Persistent Search for Real-time Monitoring

# Monitor directory changes in real-time
# Import the exception from its defining module instead of relying on a
# top-level `ldap3.LDAPExtensionError` attribute.
from ldap3.core.exceptions import LDAPExtensionError

try:
    changes = conn.extend.standard.persistent_search(
        search_base='ou=people,dc=example,dc=com',
        search_filter='(objectClass=person)',
        attributes=['cn', 'mail', 'telephoneNumber'],
        changes_only=True
    )

    print("Monitoring directory changes... (Press Ctrl+C to stop)")

    # Each notification is treated as a dict; missing keys fall back to 'unknown'
    for change in changes:
        change_type = change.get('change_type', 'unknown')
        entry_dn = change.get('dn', 'unknown')
        print(f"Change detected: {change_type} on {entry_dn}")

        if 'attributes' in change:
            for attr, values in change['attributes'].items():
                print(f"  {attr}: {values}")

except KeyboardInterrupt:
    # Ctrl+C is the documented way to stop the monitoring loop
    print("Monitoring stopped")
except LDAPExtensionError as e:
    print(f"Persistent search not supported: {e}")

Microsoft Active Directory DirSync

# Active Directory synchronization
server = ldap3.Server('ldap://dc.company.com')
conn = ldap3.Connection(server, 'DOMAIN\\syncuser', 'password', auto_bind=True)

# Attributes requested on every sync run. dir_sync() takes them through its
# `attributes` parameter; a `sync_attributes` keyword would raise TypeError.
sync_attributes = ['cn', 'mail', 'whenChanged', 'objectGUID']

# Initial sync
# NOTE(review): the result is indexed like a dict below; upstream ldap3
# appears to return a DirSync helper object instead — confirm against the
# installed version.
sync_result = conn.extend.microsoft.dir_sync(
    sync_base='dc=company,dc=com',
    sync_filter='(objectClass=user)',
    attributes=sync_attributes,
    object_security=False,
    incremental_values=True
)

print(f"Initial sync found {len(sync_result['entries'])} entries")

# Save cookie for next sync
sync_cookie = sync_result['cookie']

# Later incremental sync: passing the saved cookie requests only the changes
# made since the previous run
incremental_result = conn.extend.microsoft.dir_sync(
    sync_base='dc=company,dc=com',
    sync_filter='(objectClass=user)',
    attributes=sync_attributes,
    cookie=sync_cookie,
    incremental_values=True
)

print(f"Incremental sync found {len(incremental_result['entries'])} changes")

Active Directory Password Management

# AD-specific password change
# Use an ldaps:// URL: a scheme given in the host URL takes precedence over
# the use_ssl argument, so 'ldap://...' would leave the connection unencrypted.
server = ldap3.Server('ldaps://dc.company.com', use_ssl=True)
conn = ldap3.Connection(server, 'DOMAIN\\admin', 'adminpass', auto_bind=True)

# Admin changing user password (no old password needed)
result = conn.extend.microsoft.modify_password(
    user='cn=John Smith,ou=users,dc=company,dc=com',
    new_password='NewComplexP@ssw0rd'
)

if result:
    print("Password changed successfully")
else:
    print(f"Password change failed: {conn.result}")

# User self-change (requires old password)
user_conn = ldap3.Connection(server, 'DOMAIN\\john.smith', 'oldpassword', auto_bind=True)
result = user_conn.extend.microsoft.modify_password(
    user='cn=John Smith,ou=users,dc=company,dc=com',
    new_password='NewComplexP@ssw0rd',
    old_password='oldpassword'
)

# Report the outcome of the self-change as well, using the user's connection
if result:
    print("Password changed successfully")
else:
    print(f"Password change failed: {user_conn.result}")

Novell eDirectory Operations

# eDirectory specific operations
# Import the exception from its defining module instead of relying on a
# top-level `ldap3.LDAPExtensionError` attribute.
from ldap3.core.exceptions import LDAPExtensionError

server = ldap3.Server('ldap://edir.company.com')
conn = ldap3.Connection(server, 'cn=admin,o=company', 'password', auto_bind=True)

# Get current bind DN
bind_dn = conn.extend.novell.get_bind_dn()
print(f"Currently bound as: {bind_dn}")

# Universal password operations
user_dn = 'cn=john,ou=people,o=company'

# Set universal password
result = conn.extend.novell.set_universal_password(user_dn, 'newpassword123')
if result:
    print("Universal password set successfully")

# Get universal password (requires special privileges)
try:
    password = conn.extend.novell.get_universal_password(user_dn)
    # Printed for demonstration only; do not log real credentials
    print(f"Universal password retrieved: {password}")
except LDAPExtensionError as e:
    print(f"Cannot retrieve password: {e}")

eDirectory Group Management

# Bulk group-membership maintenance through the eDirectory extensions
novell = conn.extend.novell

members = [
    'cn=john,ou=people,o=company',
    'cn=jane,ou=people,o=company',
    'cn=bob,ou=people,o=company',
]
groups = [
    'cn=developers,ou=groups,o=company',
    'cn=employees,ou=groups,o=company',
]

# Put every member into every group, repairing inconsistencies (fix=True)
# inside a single transaction (transaction=True)
added = novell.add_members_to_groups(
    members=members, groups=groups, fix=True, transaction=True
)
if added:
    print("Members added to groups successfully")

# Inspect the resulting memberships without changing anything (fix=False)
status_by_member = novell.check_groups_memberships(
    members=members, groups=groups, fix=False
)

print("Membership status:")
for member in status_by_member:
    print(f"  {member}: {status_by_member[member]}")

eDirectory Transaction Management

# Group several directory writes into one eDirectory transaction
new_users = {
    'cn=newuser1,ou=people,o=company': {'cn': 'New User 1', 'sn': 'User1'},
    'cn=newuser2,ou=people,o=company': {'cn': 'New User 2', 'sn': 'User2'},
}

try:
    # Open the transaction
    transaction_id = conn.extend.novell.start_transaction()
    print(f"Transaction started: {transaction_id}")

    # Create the new user entries, in declaration order
    for new_dn, new_attributes in new_users.items():
        conn.add(new_dn,
                 object_class=['inetOrgPerson'],
                 attributes=new_attributes)

    # Enrol the new users in their group; transaction=False because the
    # surrounding transaction is already open
    conn.extend.novell.add_members_to_groups(
        members=list(new_users),
        groups=['cn=newusers,ou=groups,o=company'],
        transaction=False
    )

    # Commit everything performed since start_transaction()
    if conn.extend.novell.end_transaction(commit=True):
        print("Transaction committed successfully")
    else:
        print("Transaction commit failed")

except Exception as e:
    print(f"Error during transaction: {e}")
    # Roll back whatever was done so far
    conn.extend.novell.end_transaction(commit=False)
    print("Transaction aborted")

eDirectory Replica Management

# Inspect replicas and partitions through the eDirectory extensions
novell = conn.extend.novell
server_dn = 'cn=server1,ou=servers,o=company'
partition_dn = 'ou=people,o=company'

# Enumerate every replica held by the server
server_replicas = novell.list_replicas(server_dn)
print("Replicas on server:")
for item in server_replicas:
    print(f"  Partition: {item['partition_dn']}")
    print(f"  Type: {item['replica_type']}")
    print(f"  State: {item['replica_state']}")

# Count the entries stored in one partition
total_entries = novell.partition_entry_count(partition_dn)
print(f"Entries in {partition_dn}: {total_entries}")

# Fetch the replica details for that partition on this server
details = novell.replica_info(server_dn, partition_dn)
print(f"Replica info for {partition_dn}:")
print(f"  Replica ID: {details['replica_id']}")
print(f"  Replica number: {details['replica_number']}")
print(f"  Sync status: {details['sync_status']}")

Error Handling for Extended Operations

# Import the exception classes from their defining module instead of relying
# on top-level `ldap3.<ExceptionName>` attributes.
from ldap3.core.exceptions import (
    LDAPCommunicationError,
    LDAPExtensionError,
    LDAPOperationResult,
)

try:
    # Attempt extended operation
    result = conn.extend.standard.who_am_i()
    print(f"WHO AM I result: {result}")

except LDAPExtensionError as e:
    print(f"Extended operation not supported: {e}")

except LDAPOperationResult as e:
    print(f"Operation failed with result: {e.result}")

except LDAPCommunicationError as e:
    print(f"Communication error: {e}")

# Check if specific extended operation is supported
# NOTE(review): hasattr() only tests for the client-side `microsoft`
# attribute on conn.extend; it presumably says nothing about what the
# server itself supports — confirm.
if hasattr(conn.extend, 'microsoft'):
    # Microsoft AD extensions are available
    try:
        sync_result = conn.extend.microsoft.dir_sync(
            sync_base='dc=company,dc=com',
            sync_filter='(objectClass=user)'
        )
    except LDAPExtensionError:
        print("DirSync not supported or insufficient permissions")
else:
    print("Microsoft AD extensions not available")

Install with Tessl CLI

npx tessl i tessl/pypi-ldap3

docs

abstract.md

config.md

connection.md

extended.md

index.md

operations.md

tile.json