Python 3 module for accessing LDAP directory servers with async framework support and Active Directory integration.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive Windows Active Directory support including Security Identifiers (SID), Access Control Lists (ACL), Security Descriptors, and User Account Control flags. These components enable working with Windows-specific LDAP attributes and security structures.
Windows Security Identifier handling for users, groups, and computer accounts with string and binary format conversion.
from bonsai.active_directory import SID
class SID:
    """Windows Security Identifier (SID) for users, groups, and computer accounts.

    Can be built from, and converted back to, the string and the binary
    (little-endian) representations.
    """

    def __init__(
        self,
        str_rep: Optional[str] = None,
        bytes_le: Optional[bytes] = None
    ) -> None:
        """
        Initialize SID from string or binary representation.

        Parameters:
        - str_rep: String SID format (e.g., "S-1-5-21-1234567890-987654321-1111111111-1001")
        - bytes_le: Binary SID in little-endian byte order

        Note: Exactly one parameter must be provided
        """

    def __str__(self) -> str:
        """Convert SID to standard string format."""

    def __eq__(self, other) -> bool:
        """Compare SIDs for equality."""

    def __hash__(self) -> int:
        """Hash SID for use in sets and dictionaries."""

    @property
    def revision(self) -> int:
        """SID revision number (usually 1)."""

    @property
    def identifier_authority(self) -> int:
        """Top-level authority that issued the SID."""

    @property
    def subauthorities(self) -> Tuple[int, ...]:
        """Tuple of subauthority values."""

    @property
    def bytes_le(self) -> bytes:
        """Binary representation in little-endian byte order."""

    def is_well_known(self) -> bool:
        """Check if SID is a well-known Windows SID."""

    def get_account_domain_sid(self) -> Optional["SID"]:
        """
        Get the domain SID portion by removing the RID.

        Returns:
        Domain SID if this is an account SID, None otherwise
        """

    def get_rid(self) -> Optional[int]:
        """
        Get Relative Identifier (RID) - the last subauthority.

        Returns:
        RID value if this is an account SID, None otherwise
        """

    @property
    def sddl_alias(self) -> Optional[str]:
        """SDDL alias for well-known SIDs (e.g., 'BA' for Administrators)."""

    @property
    def size(self) -> int:
        """Size of SID in bytes."""


# Windows Access Control List implementation with support for Discretionary (DACL) and System (SACL) access control lists.
from bonsai.active_directory import ACL, ACE, ACLRevision, ACEFlag, ACERight, ACEType
from enum import IntEnum
class ACLRevision(IntEnum):
    """Revision values accepted by an ACL."""

    ACL_REVISION = 2
    ACL_REVISION_DS = 4
class ACEType(IntEnum):
    """Type codes of an Access Control Entry (allow, deny, audit, alarm and
    their compound/object/callback variants)."""

    ACCESS_ALLOWED = 0
    ACCESS_DENIED = 1
    SYSTEM_AUDIT = 2
    SYSTEM_ALARM = 3
    ACCESS_ALLOWED_COMPOUND = 4
    ACCESS_ALLOWED_OBJECT = 5
    ACCESS_DENIED_OBJECT = 6
    SYSTEM_AUDIT_OBJECT = 7
    SYSTEM_ALARM_OBJECT = 8
    ACCESS_ALLOWED_CALLBACK = 9
    ACCESS_DENIED_CALLBACK = 10
    ACCESS_ALLOWED_CALLBACK_OBJECT = 11
    ACCESS_DENIED_CALLBACK_OBJECT = 12
    SYSTEM_AUDIT_CALLBACK = 13
    SYSTEM_ALARM_CALLBACK = 14
    SYSTEM_AUDIT_CALLBACK_OBJECT = 15
    SYSTEM_ALARM_CALLBACK_OBJECT = 16
class ACEFlag(IntEnum):
    """Bit values for the ACE flags field (inheritance and audit
    success/failure).

    Members are plain integers, so combining them with ``|`` yields an int
    bitmask.
    """

    OBJECT_INHERIT = 0x01
    CONTAINER_INHERIT = 0x02
    NO_PROPAGATE_INHERIT = 0x04
    INHERIT_ONLY = 0x08
    INHERITED = 0x10
    SUCCESSFUL_ACCESS = 0x40
    FAILED_ACCESS = 0x80
class ACERight(IntEnum):
    """Bit values for an ACE access rights mask.

    Members are plain integers, so combining them with ``|`` yields an int
    bitmask suitable for an ``access_mask`` argument.
    """

    GENERIC_READ = 0x80000000
    GENERIC_WRITE = 0x40000000
    GENERIC_EXECUTE = 0x20000000
    GENERIC_ALL = 0x10000000
    DELETE = 0x00010000
    READ_CONTROL = 0x00020000
    WRITE_DAC = 0x00040000
    WRITE_OWNER = 0x00080000
    SYNCHRONIZE = 0x00100000
class ACE:
    """A single Access Control Entry: an ACE type, flags, an access mask and
    the SID of the principal, with optional object-type GUIDs."""

    def __init__(
        self,
        ace_type: ACEType,
        ace_flags: int,
        access_mask: int,
        sid: SID,
        object_type: Optional[str] = None,
        inherited_object_type: Optional[str] = None
    ) -> None:
        """
        Initialize Access Control Entry.

        Parameters:
        - ace_type: Type of ACE (allow, deny, audit, etc.)
        - ace_flags: ACE flags (inheritance, audit success/failure)
        - access_mask: Access rights mask
        - sid: Security identifier for the principal
        - object_type: GUID for object-specific ACE (optional)
        - inherited_object_type: GUID for inheritance (optional)
        """

    @classmethod
    def from_binary(cls, data: bytes) -> "ACE":
        """
        Create ACE from binary data.

        Parameters:
        - data: Binary ACE data

        Returns:
        ACE object
        """

    def to_binary(self) -> bytes:
        """
        Convert ACE to binary format.

        Returns:
        Binary representation of ACE
        """

    @property
    def ace_type(self) -> ACEType:
        """Type of access control entry."""

    @property
    def ace_flags(self) -> int:
        """ACE flags bitfield."""

    @property
    def access_mask(self) -> int:
        """Access rights mask."""

    @property
    def sid(self) -> SID:
        """Security identifier."""

    @property
    def object_type(self) -> Optional[str]:
        """Object type GUID for object-specific ACEs."""

    @property
    def inherited_object_type(self) -> Optional[str]:
        """Inherited object type GUID."""

    @property
    def short_name(self) -> str:
        """Short name for ACE type (for SDDL format)."""

    @property
    def is_object_type(self) -> bool:
        """Whether this ACE type supports object-specific permissions."""
class ACL:
    """Windows Access Control List: an ordered collection of ACEs with an
    ACL revision. Used for both discretionary (DACL) and system (SACL) lists."""

    def __init__(self, revision: ACLRevision = ACLRevision.ACL_REVISION) -> None:
        """
        Initialize Access Control List.

        Parameters:
        - revision: ACL revision (ACL_REVISION or ACL_REVISION_DS)
        """

    @classmethod
    def from_binary(cls, data: bytes) -> "ACL":
        """
        Create ACL from binary data.

        Parameters:
        - data: Binary ACL data

        Returns:
        ACL object
        """

    def to_binary(self) -> bytes:
        """
        Convert ACL to binary format.

        Returns:
        Binary representation of ACL
        """

    def add_ace(self, ace: ACE, index: Optional[int] = None) -> None:
        """
        Add ACE to the ACL.

        Parameters:
        - ace: ACE object to add
        - index: Position to insert ACE (None to append)
        """

    def remove_ace(self, index: int) -> None:
        """
        Remove ACE from ACL by index.

        Parameters:
        - index: Index of ACE to remove
        """

    def __len__(self) -> int:
        """Number of ACEs in the ACL."""

    def __getitem__(self, index: int) -> ACE:
        """Get ACE by index."""

    def __iter__(self):
        """Iterate over ACEs."""

    @property
    def revision(self) -> ACLRevision:
        """ACL revision."""

    @property
    def aces(self) -> List[ACE]:
        """List of Access Control Entries."""


# Windows Security Descriptor handling with support for owner, group, DACL, and SACL components.
from bonsai.active_directory import SecurityDescriptor
class SecurityDescriptor:
    """Windows Security Descriptor: control flags plus the owner SID, group
    SID, SACL and DACL components."""

    def __init__(
        self,
        control: Dict[str, bool],
        owner_sid: Optional[SID],
        group_sid: Optional[SID],
        sacl: Optional[ACL],
        dacl: Optional[ACL],
        revision: int = 1,
        sbz1: int = 0,
    ) -> None:
        """
        Initialize Security Descriptor.

        Parameters:
        - control: Control flags dictionary
        - owner_sid: Owner security identifier
        - group_sid: Group security identifier
        - sacl: System Access Control List (for auditing)
        - dacl: Discretionary Access Control List (for access control)
        - revision: Security descriptor revision
        - sbz1: Reserved field
        """

    @classmethod
    def from_binary(cls, data: bytes) -> "SecurityDescriptor":
        """
        Create SecurityDescriptor from binary data.

        Parameters:
        - data: Binary security descriptor data in little-endian format

        Returns:
        SecurityDescriptor object
        """

    def to_binary(self) -> bytes:
        """
        Convert SecurityDescriptor to binary format.

        Returns:
        Binary representation in little-endian byte order
        """

    def set_control(self, control: Union[Dict[str, bool], int]) -> None:
        """
        Set control flags for the security descriptor.

        Parameters:
        - control: Control flags as dictionary or integer bitmask
        """

    def set_owner_sid(self, owner_sid: SID) -> None:
        """Set owner SID."""

    def set_group_sid(self, group_sid: SID) -> None:
        """Set group SID."""

    def set_dacl(self, dacl: Optional[ACL]) -> None:
        """Set discretionary access control list."""

    def set_sacl(self, sacl: Optional[ACL]) -> None:
        """Set system access control list."""

    @property
    def revision(self) -> int:
        """Security descriptor revision."""

    @property
    def control(self) -> Dict[str, bool]:
        """Control flags dictionary with keys like:
        - owner_defaulted, group_defaulted
        - dacl_present, dacl_defaulted, dacl_protected
        - sacl_present, sacl_defaulted, sacl_protected
        - self_relative, dacl_auto_inherited, sacl_auto_inherited
        """

    @property
    def owner_sid(self) -> Optional[SID]:
        """Owner security identifier."""

    @property
    def group_sid(self) -> Optional[SID]:
        """Group security identifier."""

    @property
    def dacl(self) -> Optional[ACL]:
        """Discretionary access control list."""

    @property
    def sacl(self) -> Optional[ACL]:
        """System access control list."""


# Parse and manipulate Windows User Account Control flags for user accounts.
from bonsai.active_directory import UserAccountControl
class UserAccountControl:
    """Parser for the integer ``userAccountControl`` attribute, exposing the
    individual flags as a dictionary and as convenience properties."""

    def __init__(self, flags: int) -> None:
        """
        Initialize User Account Control parser.

        Parameters:
        - flags: Integer value of userAccountControl attribute
        """

    @property
    def properties(self) -> Dict[str, bool]:
        """
        Dictionary of user account properties including:
        - script: Login script execution
        - accountdisable: Account is disabled
        - homedir_required: Home directory required
        - lockout: Account is locked out
        - passwd_notreqd: Password not required
        - passwd_cant_change: User cannot change password
        - encrypted_text_pwd_allowed: Encrypted text password allowed
        - temp_duplicate_account: Temporary duplicate account
        - normal_account: Normal user account
        - interdomain_trust_account: Interdomain trust account
        - workstation_trust_account: Workstation trust account
        - server_trust_account: Server trust account
        - dont_expire_password: Password doesn't expire
        - mns_logon_account: MNS logon account
        - smartcard_required: Smart card required for logon
        - trusted_for_delegation: Account trusted for delegation
        - not_delegated: Account cannot be delegated
        - use_des_key_only: Use DES keys only
        - dont_req_preauth: Kerberos pre-authentication not required
        - password_expired: Password expired
        - trusted_to_auth_for_delegation: Trusted to authenticate for delegation
        - partial_secrets_account: Partial secrets account (RODC)
        """

    @property
    def value(self) -> int:
        """Integer value computed from properties."""

    # Convenience properties for common flags
    @property
    def account_disabled(self) -> bool:
        """Account is disabled."""

    @property
    def password_not_required(self) -> bool:
        """Password not required."""

    @property
    def password_cant_change(self) -> bool:
        """User cannot change password."""

    @property
    def password_expired(self) -> bool:
        """Password has expired."""

    @property
    def account_locked_out(self) -> bool:
        """Account is locked out."""

    @property
    def normal_account(self) -> bool:
        """Normal user account."""

    @property
    def dont_expire_password(self) -> bool:
        """Password never expires."""

    @property
    def smartcard_required(self) -> bool:
        """Smart card required for interactive logon."""


from bonsai.active_directory import SID
# Create SID from string representation
user_sid = SID(str_rep="S-1-5-21-1234567890-987654321-1111111111-1001")
print(f"User SID: {user_sid}")
print(f"Domain SID: {user_sid.get_account_domain_sid()}")
print(f"RID: {user_sid.get_rid()}")

# Create SID from binary data (from LDAP objectSid attribute).
# NOTE: `entry` is not defined in this snippet; it is assumed to be an LDAP
# entry returned by an earlier search.
binary_sid = entry['objectSid'][0]  # Raw bytes from LDAP
sid_obj = SID(bytes_le=binary_sid)
print(f"SID from binary: {sid_obj}")

# Check whether a SID is one of the well-known Windows SIDs
admin_sid = SID(str_rep="S-1-5-32-544")  # Built-in Administrators
if admin_sid.is_well_known():
    print("This is a well-known SID")

from bonsai.active_directory import SecurityDescriptor, SID
from bonsai import LDAPClient

# Get security descriptor from Active Directory
client = LDAPClient("ldap://dc.example.com")
client.set_credentials("SIMPLE", user="admin@example.com", password="secret")
with client.connect() as conn:
    # Search for user and get ntSecurityDescriptor
    results = conn.search(
        "cn=Users,dc=example,dc=com",
        2,  # LDAP search scope 2 = whole subtree
        "(sAMAccountName=jdoe)",
        ["ntSecurityDescriptor"]
    )
    if results:
        # Parse security descriptor from binary attribute
        sd_binary = results[0]['ntSecurityDescriptor'][0]
        security_desc = SecurityDescriptor.from_binary(sd_binary)
        print(f"Owner: {security_desc.owner_sid}")
        print(f"Group: {security_desc.group_sid}")
        print(f"Control flags: {security_desc.control}")
        # Examine DACL (it may be absent, in which case `dacl` is None)
        if security_desc.dacl:
            print(f"DACL has {len(security_desc.dacl)} ACEs:")
            for i, ace in enumerate(security_desc.dacl):
                print(f" ACE {i}: {ace.ace_type.name} for {ace.sid}")
                print(f" Access mask: 0x{ace.access_mask:08x}")
                print(f" Flags: 0x{ace.ace_flags:02x}")

from bonsai.active_directory import ACL, ACE, ACEType, ACERight, SID
# Create new DACL
dacl = ACL()

# Add Allow ACE for user
user_sid = SID(str_rep="S-1-5-21-1234567890-987654321-1111111111-1001")
allow_ace = ACE(
    ace_type=ACEType.ACCESS_ALLOWED,
    ace_flags=0,
    # ACERight members are integers, so `|` produces the combined bitmask
    access_mask=ACERight.GENERIC_READ | ACERight.GENERIC_WRITE,
    sid=user_sid
)
dacl.add_ace(allow_ace)

# Add Deny ACE for guest account
guest_sid = SID(str_rep="S-1-5-21-1234567890-987654321-1111111111-501")
deny_ace = ACE(
    ace_type=ACEType.ACCESS_DENIED,
    ace_flags=0,
    access_mask=ACERight.GENERIC_ALL,
    sid=guest_sid
)
dacl.add_ace(deny_ace, index=0)  # Insert at beginning for precedence

print(f"DACL has {len(dacl)} ACEs")
for i, ace in enumerate(dacl):
    print(f"ACE {i}: {ace.ace_type.name} for {ace.sid}")

from bonsai.active_directory import UserAccountControl
# Parse userAccountControl from Active Directory.
# NOTE: `client` is not created in this snippet; it is assumed to be an
# already configured LDAPClient.
with client.connect() as conn:
    results = conn.search(
        "cn=Users,dc=example,dc=com",
        2,  # LDAP search scope 2 = whole subtree
        "(sAMAccountName=jdoe)",
        ["userAccountControl"]
    )
    if results:
        uac_value = int(results[0]['userAccountControl'][0])
        uac = UserAccountControl(uac_value)
        print(f"Account Control Value: {uac.value}")
        print(f"Account Disabled: {uac.account_disabled}")
        # The next two flags are stored negated, hence the `not`
        print(f"Password Required: {not uac.password_not_required}")
        print(f"Password Expires: {not uac.dont_expire_password}")
        print(f"Smart Card Required: {uac.smartcard_required}")
        print(f"Account Locked: {uac.account_locked_out}")
        # Check all properties
        for prop, value in uac.properties.items():
            if value:
                print(f" {prop}: {value}")

from bonsai.active_directory import SecurityDescriptor, ACL, ACE, ACEType, ACERight, SID
# Create new security descriptor
control_flags = {
    "dacl_present": True,
    "dacl_defaulted": False,
    "self_relative": True,
    "dacl_protected": False
}
owner_sid = SID(str_rep="S-1-5-21-1234567890-987654321-1111111111-1001")
group_sid = SID(str_rep="S-1-5-21-1234567890-987654321-1111111111-513")

# Create DACL with full control for owner
dacl = ACL()
full_control_ace = ACE(
    ace_type=ACEType.ACCESS_ALLOWED,
    ace_flags=0,
    access_mask=ACERight.GENERIC_ALL,
    sid=owner_sid
)
dacl.add_ace(full_control_ace)

# Create security descriptor
security_desc = SecurityDescriptor(
    control=control_flags,
    owner_sid=owner_sid,
    group_sid=group_sid,
    dacl=dacl,
    sacl=None
)

# Convert to binary for storing back to LDAP
sd_binary = security_desc.to_binary()

# Update LDAP entry (requires appropriate permissions).
# NOTE: `client` is not created in this snippet; it is assumed to be an
# already configured LDAPClient.
with client.connect() as conn:
    # LDAP search scope 0 = base object only; take the single matching entry
    entry = conn.search("cn=test-object,dc=example,dc=com", 0)[0]
    entry['ntSecurityDescriptor'] = sd_binary
    # In bonsai, pending changes are sent through the entry itself; the
    # connection object is not documented to offer a modify(entry) method.
    entry.modify()

# Install with Tessl CLI
npx tessl i tessl/pypi-bonsai