Higher Level Zookeeper Client providing distributed coordination and configuration management primitives.
—
Access control and authentication mechanisms for securing Zookeeper nodes. Includes ACL creation, permission management, and authentication schemes with support for digest, IP, SASL, and world authentication patterns.
ACL data structures and permission management for controlling node access with fine-grained permission control and multiple authentication schemes.
class ACL:
    def __init__(self, perms, id):
        """
        Access Control List entry.

        Parameters:
        - perms (int): Permission mask (combination of Permissions constants)
        - id (Id): Identity object specifying scheme and credential

        Attributes:
        - perms (int): Permission bitmask
        - id (Id): Identity specification
        """
class Id:
    def __init__(self, scheme, id):
        """
        Identity specification for ACL.

        Parameters:
        - scheme (str): Authentication scheme (world, auth, digest, ip, sasl)
        - id (str): Identity credential for the scheme

        Attributes:
        - scheme (str): Authentication scheme
        - id (str): Identity credential
        """
class Permissions:
    """Permission constants for ACL entries.

    Each constant is a single bit, so constants can be combined with ``|``
    to build the permission mask of an ACL entry.
    """

    READ: int = 1  # Read node data and list children
    WRITE: int = 2  # Write node data
    CREATE: int = 4  # Create child nodes
    DELETE: int = 8  # Delete child nodes
    ADMIN: int = 16  # Set ACL permissions
    ALL: int = 31  # All permissions (READ | WRITE | CREATE | DELETE | ADMIN)


# Common ACL configurations for typical security patterns with appropriate
# permission sets for different use cases.
# Identity constants
ANYONE_ID_UNSAFE = Id("world", "anyone")
"""Identity for world/anyone access (insecure)."""

AUTH_IDS = Id("auth", "")
"""Identity for authenticated users."""

# Pre-defined ACL lists
OPEN_ACL_UNSAFE = [ACL(Permissions.ALL, ANYONE_ID_UNSAFE)]
"""ACL allowing all permissions to anyone (insecure)."""

CREATOR_ALL_ACL = [ACL(Permissions.ALL, AUTH_IDS)]
"""ACL giving all permissions to creator only."""

READ_ACL_UNSAFE = [ACL(Permissions.READ, ANYONE_ID_UNSAFE)]
"""ACL allowing read permission to anyone (insecure)."""


# Utility functions for creating ACLs with specific permission sets and
# authentication schemes for common security patterns.
def make_acl(scheme, credential, read=False, write=False, create=False,
             delete=False, admin=False, all=False):
    """
    Create an ACL with specified permissions.

    Parameters:
    - scheme (str): Authentication scheme (world, auth, digest, ip, sasl)
    - credential (str): Authentication credential
    - read (bool): Grant read permission
    - write (bool): Grant write permission
    - create (bool): Grant create permission
    - delete (bool): Grant delete permission
    - admin (bool): Grant admin permission
    - all (bool): Grant all permissions

    Returns:
    ACL: A single ACL object. Wrap it in a list (``[make_acl(...)]``) when
    passing it as an ``acl=`` / ``acls=`` argument, which expect a list or
    tuple of ACL objects.
    """
def make_digest_acl(username, password, read=False, write=False, create=False,
                    delete=False, admin=False, all=False):
    """
    Create a digest-based ACL.

    Parameters:
    - username (str): Username for digest authentication
    - password (str): Password for digest authentication
    - read (bool): Grant read permission
    - write (bool): Grant write permission
    - create (bool): Grant create permission
    - delete (bool): Grant delete permission
    - admin (bool): Grant admin permission
    - all (bool): Grant all permissions

    Returns:
    ACL: A single digest ACL object. Wrap it in a list
    (``[make_digest_acl(...)]``) when passing it as an ``acl=`` / ``acls=``
    argument, which expect a list or tuple of ACL objects.
    """
def make_digest_acl_credential(username, password):
    """
    Create digest credential string.

    Parameters:
    - username (str): Username for authentication
    - password (str): Password for authentication

    Returns:
    str: Digest credential in format "username:base64(sha1(username:password))"
    """


# Different authentication mechanisms supported by Zookeeper with appropriate
# credential formats and usage patterns.
# Authentication scheme constants and patterns

# World scheme - open access (not recommended for production)
world_acl = [ACL(Permissions.ALL, Id("world", "anyone"))]

# Auth scheme - authenticated users only
auth_acl = [ACL(Permissions.ALL, Id("auth", ""))]

# Digest scheme - username/password authentication
digest_credential = make_digest_acl_credential("admin", "secret")
digest_acl = [ACL(Permissions.ALL, Id("digest", digest_credential))]

# IP scheme - IP address-based access control
ip_acl = [ACL(Permissions.READ, Id("ip", "192.168.1.0/24"))]

# SASL scheme - SASL authentication (Kerberos, etc.)
sasl_acl = [ACL(Permissions.ALL, Id("sasl", "principal@REALM"))]


# Methods for authenticating clients with Zookeeper using various
# authentication schemes and credential management.
# KazooClient authentication methods (from core-client.md)
def add_auth(self, scheme, credential):
    """
    Add authentication credentials to client.

    Parameters:
    - scheme (str): Authentication scheme (digest, sasl, etc.)
    - credential (str): Authentication credential

    Raises:
    - AuthFailedError: If authentication fails
    """
def add_auth_async(self, scheme, credential):
    """
    Asynchronous authentication.

    Parameters:
    - scheme (str): Authentication scheme (digest, sasl, etc.)
    - credential (str): Authentication credential

    Returns:
    IAsyncResult: Async result object
    """


# Methods for getting and setting ACLs on Zookeeper nodes with version control
# and permission validation.
# KazooClient ACL methods (from core-client.md)
def get_acls(self, path):
    """
    Get ACLs for a node.

    Parameters:
    - path (str): Node path

    Returns:
    tuple: (acl_list, ZnodeStat) containing ACLs and node metadata

    Raises:
    - NoNodeError: If node doesn't exist
    """
def set_acls(self, path, acls, version=-1):
    """
    Set ACLs for a node.

    Parameters:
    - path (str): Node path
    - acls (list): List of ACL objects
    - version (int): Expected ACL version (-1 for any version)

    Returns:
    ZnodeStat: Updated node metadata

    Raises:
    - NoNodeError: If node doesn't exist
    - BadVersionError: If ACL version doesn't match
    - InvalidACLError: If ACL format is invalid
    """


# Secure socket layer configuration for encrypted client-server communication
# with certificate management and validation.
# KazooClient SSL parameters (from core-client.md)
def __init__(self, hosts="127.0.0.1:2181",  # ... other parameters omitted ...
             keyfile=None, keyfile_password=None, certfile=None,
             ca=None, use_ssl=False, verify_certs=True):
    """
    SSL/TLS configuration parameters:

    - keyfile (str): Path to SSL private key file
    - keyfile_password (str): Password for encrypted private key
    - certfile (str): Path to SSL certificate file
    - ca (str): Path to SSL CA certificate file
    - use_ssl (bool): Enable SSL/TLS connections
    - verify_certs (bool): Verify SSL certificates
    """


# SASL (Simple Authentication and Security Layer) configuration for enterprise
# authentication integration with Kerberos and other mechanisms.
# KazooClient SASL parameters (from core-client.md)
def __init__(self, hosts="127.0.0.1:2181",  # ... other parameters omitted ...
             sasl_options=None, auth_data=None):
    """
    SASL configuration parameters:

    - sasl_options (dict): SASL mechanism options
    - auth_data (list): List of (scheme, credential) tuples
    """
# SASL configuration example
sasl_options = {
    'mechanism': 'GSSAPI',
    'principal': 'zkclient/hostname@REALM',
}


from kazoo.client import KazooClient
from kazoo.security import make_digest_acl, make_acl, Permissions
from kazoo.security import OPEN_ACL_UNSAFE, CREATOR_ALL_ACL

zk = KazooClient()
zk.start()
try:
    # Create node with open ACL (not recommended for production)
    zk.create("/public", b"public data", acl=OPEN_ACL_UNSAFE)

    # Create node with restricted access. make_digest_acl() returns a single
    # ACL object, while acl= expects a list/tuple of ACLs, so wrap it.
    admin_acl = [make_digest_acl("admin", "secret", all=True)]
    zk.create("/private", b"private data", acl=admin_acl, makepath=True)

    # Create node with mixed permissions. make_acl() already returns one ACL
    # per call; indexing its result would yield the permission int instead.
    mixed_acl = [
        make_acl("digest", "admin:hash", all=True),  # Admin full access
        make_acl("digest", "user:hash", read=True, write=True),  # User read/write
        make_acl("ip", "192.168.1.0/24", read=True),  # Network read-only
    ]
    zk.create("/mixed", b"mixed access", acl=mixed_acl)
finally:
    zk.stop()


from kazoo.client import KazooClient
from kazoo.security import make_digest_acl_credential, make_digest_acl
from kazoo.exceptions import AuthFailedError

# Create client and authenticate
zk = KazooClient()
zk.start()
try:
    # Add digest authentication
    zk.add_auth("digest", "admin:secret")

    # Create ACL for authenticated user. make_digest_acl() returns a single
    # ACL object, while acl= expects a list/tuple of ACLs, so wrap it.
    admin_acl = [make_digest_acl("admin", "secret", all=True)]

    # Create protected node
    zk.create("/secure/config", b'{"setting": "value"}',
              acl=admin_acl, makepath=True)

    # Access protected node (works because we're authenticated)
    data, stat = zk.get("/secure/config")
    print(f"Secure data: {data}")

    # Update ACLs (set_acls also requires a list/tuple of ACLs)
    read_only_acl = [make_digest_acl("admin", "secret", read=True)]
    zk.set_acls("/secure/config", read_only_acl, version=stat.aversion)
except AuthFailedError:
    print("Authentication failed - check credentials")
finally:
    zk.stop()


from kazoo.client import KazooClient
from kazoo.security import make_digest_acl_credential, ACL, Id, Permissions

zk = KazooClient()
zk.start()
try:
    # Create credentials for different users
    admin_cred = make_digest_acl_credential("admin", "admin_pass")
    user_cred = make_digest_acl_credential("user", "user_pass")
    readonly_cred = make_digest_acl_credential("readonly", "readonly_pass")

    # Create multi-user ACL
    multi_user_acl = [
        ACL(Permissions.ALL, Id("digest", admin_cred)),  # Admin: full access
        ACL(Permissions.READ | Permissions.WRITE, Id("digest", user_cred)),  # User: read/write
        ACL(Permissions.READ, Id("digest", readonly_cred)),  # ReadOnly: read only
        ACL(Permissions.READ, Id("ip", "192.168.1.0/24")),  # Network: read from LAN
    ]

    # Authenticate as admin
    zk.add_auth("digest", "admin:admin_pass")

    # Create node with multi-user access
    zk.create("/shared/resource", b"shared data", acl=multi_user_acl, makepath=True)

    # View current ACLs
    acls, stat = zk.get_acls("/shared/resource")
    print(f"Node has {len(acls)} ACL entries")
    for acl in acls:
        print(f" {acl.id.scheme}:{acl.id.id} -> permissions: {acl.perms}")
finally:
    zk.stop()


from kazoo.client import KazooClient
# Configure SSL connection
zk = KazooClient(
    hosts='secure-zk1:2282,secure-zk2:2282',  # SSL port
    use_ssl=True,
    certfile='/path/to/client.pem',  # Client certificate
    keyfile='/path/to/client-key.pem',  # Client private key
    keyfile_password='key_password',  # Key password if encrypted
    ca='/path/to/ca.pem',  # CA certificate
)
try:
    zk.start()
    print("Secure connection established")

    # Use secure connection for operations
    zk.create("/secure", b"encrypted data", makepath=True)
finally:
    zk.stop()


from kazoo.client import KazooClient
# ACL, Id and Permissions are used below to build the SASL ACL.
from kazoo.security import ACL, Id, Permissions

# Configure SASL authentication
sasl_options = {
    'mechanism': 'GSSAPI',
    'service': 'zookeeper',
    'principal': 'zkclient/hostname@EXAMPLE.COM',
}

# SASL is configured through sasl_options only. A ('sasl', ...) entry in
# auth_data is kazoo's legacy way of passing SASL settings and conflicts
# with sasl_options when both are supplied, so it is not passed here.
zk = KazooClient(
    hosts='kerb-zk1:2181,kerb-zk2:2181',
    sasl_options=sasl_options,
)
try:
    zk.start()
    print("SASL authentication successful")

    # Create node accessible to SASL principal
    sasl_acl = [ACL(Permissions.ALL, Id("sasl", "zkclient@EXAMPLE.COM"))]
    zk.create("/sasl-protected", b"kerberos data", acl=sasl_acl, makepath=True)
finally:
    zk.stop()


from kazoo.client import KazooClient
from kazoo.security import make_digest_acl, Permissions
from kazoo.exceptions import NoAuthError
# Test different permission levels
def test_permissions():
    """Show that a digest user granted only READ can read but not write.

    An admin client creates ``/readonly`` with a read-only ACL for ``user``;
    a second client authenticated as ``user`` then reads the node and
    confirms that writing raises ``NoAuthError``.
    """
    admin_zk = KazooClient()
    user_zk = KazooClient()
    # Start both clients inside the try block so that a failure while
    # connecting or authenticating still reaches the cleanup in `finally`.
    try:
        # Create admin client
        admin_zk.start()
        admin_zk.add_auth("digest", "admin:secret")

        # Create user client
        user_zk.start()
        user_zk.add_auth("digest", "user:password")

        # Admin creates node with user read-only access. make_digest_acl()
        # returns a single ACL object, while acl= expects a list/tuple.
        user_acl = [make_digest_acl("user", "password", read=True)]
        admin_zk.create("/readonly", b"test data", acl=user_acl, makepath=True)

        # User can read
        data, stat = user_zk.get("/readonly")
        print(f"User read: {data}")

        # User cannot write (will raise NoAuthError)
        try:
            user_zk.set("/readonly", b"modified")
            print("ERROR: User should not be able to write!")
        except NoAuthError:
            print("Correct: User cannot write to read-only node")
    finally:
        admin_zk.stop()
        user_zk.stop()
test_permissions()

Install with Tessl CLI:
npx tessl i tessl/pypi-kazoo