CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kafka-python-ng

Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities

Pending
Overview
Eval results
Files

admin.mddocs/

Administrative Operations

Administrative client for managing Kafka cluster resources including topics, consumer groups, configurations, and access control lists (ACLs). Provides comprehensive cluster management capabilities.

Capabilities

KafkaAdminClient

Main administrative client for cluster management operations.

class KafkaAdminClient:
    def __init__(self, **configs):
        """
        Create a KafkaAdminClient instance.
        
        Args:
            **configs: Admin client configuration options including:
                bootstrap_servers (list): List of Kafka brokers
                client_id (str): Client identifier
                request_timeout_ms (int): Request timeout
                connections_max_idle_ms (int): Connection idle timeout
                retry_backoff_ms (int): Retry backoff time
                security_protocol (str): Security protocol
                ssl_context: SSL context
                sasl_mechanism (str): SASL mechanism
                sasl_plain_username (str): SASL username
                sasl_plain_password (str): SASL password
        """
        
    def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
        """
        Create new topics.
        
        Args:
            new_topics (list): List of NewTopic objects
            timeout_ms (int): Operation timeout
            validate_only (bool): Only validate, don't create
            
        Returns:
            dict: Dictionary mapping topic name to CreateTopicsResponse.topic_errors
        """
        
    def delete_topics(self, topics, timeout_ms=None):
        """
        Delete topics.
        
        Args:
            topics (list): List of topic names to delete
            timeout_ms (int): Operation timeout
            
        Returns:
            dict: Dictionary mapping topic name to DeleteTopicsResponse.topic_errors
        """
        
    def list_topics(self, timeout_ms=None):
        """
        List all topics in cluster.
        
        Args:
            timeout_ms (int): Operation timeout
            
        Returns:
            ClusterMetadata: Cluster metadata with topic information
        """
        
    def describe_topics(self, topics, timeout_ms=None):
        """
        Get detailed information about topics.
        
        Args:
            topics (list): List of topic names
            timeout_ms (int): Operation timeout
            
        Returns:
            dict: Dictionary mapping topic name to TopicMetadata
        """
        
    def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False):
        """
        Add partitions to existing topics.
        
        Args:
            partition_updates (dict): Dictionary mapping topic name to NewPartitions
            timeout_ms (int): Operation timeout
            validate_only (bool): Only validate, don't create
            
        Returns:
            dict: Dictionary mapping topic name to CreatePartitionsResponse.topic_errors
        """
        
    def describe_configs(self, config_resources, timeout_ms=None, include_synonyms=False):
        """
        Get configuration for resources.
        
        Args:
            config_resources (list): List of ConfigResource objects
            timeout_ms (int): Operation timeout
            include_synonyms (bool): Include config synonyms
            
        Returns:
            dict: Dictionary mapping ConfigResource to DescribeConfigsResponse.resources
        """
        
    def alter_configs(self, config_updates, timeout_ms=None, validate_only=False):
        """
        Alter configuration for resources.
        
        Args:
            config_updates (dict): Dictionary mapping ConfigResource to config changes
            timeout_ms (int): Operation timeout
            validate_only (bool): Only validate, don't alter
            
        Returns:
            dict: Dictionary mapping ConfigResource to AlterConfigsResponse.resources
        """
        
    def list_consumer_groups(self, timeout_ms=None):
        """
        List consumer groups in cluster.
        
        Args:
            timeout_ms (int): Operation timeout
            
        Returns:
            list: List of GroupInformation objects
        """
        
    def describe_consumer_groups(self, group_ids, timeout_ms=None):
        """
        Get detailed information about consumer groups.
        
        Args:
            group_ids (list): List of consumer group IDs
            timeout_ms (int): Operation timeout
            
        Returns:
            dict: Dictionary mapping group ID to GroupInformation
        """
        
    def delete_consumer_groups(self, group_ids, timeout_ms=None):
        """
        Delete consumer groups.
        
        Args:
            group_ids (list): List of consumer group IDs to delete
            timeout_ms (int): Operation timeout
            
        Returns:
            dict: Dictionary mapping group ID to delete response
        """
        
    def list_consumer_group_offsets(self, group_id, partitions=None, timeout_ms=None):
        """
        Get committed offsets for consumer group.
        
        Args:
            group_id (str): Consumer group ID
            partitions (list): List of TopicPartition objects (None = all)
            timeout_ms (int): Operation timeout
            
        Returns:
            dict: Dictionary mapping TopicPartition to OffsetAndMetadata
        """
        
    def alter_consumer_group_offsets(self, group_id, offsets, timeout_ms=None):
        """
        Alter committed offsets for consumer group.
        
        Args:
            group_id (str): Consumer group ID
            offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata
            timeout_ms (int): Operation timeout
            
        Returns:
            dict: Dictionary mapping TopicPartition to alter response
        """
        
    def create_acls(self, acls, timeout_ms=None):
        """
        Create access control lists.
        
        Args:
            acls (list): List of ACL objects
            timeout_ms (int): Operation timeout
            
        Returns:
            list: List of CreateAclsResponse.creation_responses
        """
        
    def describe_acls(self, acl_filter, timeout_ms=None):
        """
        Describe access control lists matching filter.
        
        Args:
            acl_filter (ACLFilter): Filter for ACLs to describe
            timeout_ms (int): Operation timeout
            
        Returns:
            list: List of ACL objects matching filter
        """
        
    def delete_acls(self, acl_filters, timeout_ms=None):
        """
        Delete access control lists matching filters.
        
        Args:
            acl_filters (list): List of ACLFilter objects
            timeout_ms (int): Operation timeout
            
        Returns:
            list: List of DeleteAclsResponse.filter_responses
        """
        
    def describe_cluster(self, timeout_ms=None):
        """
        Get cluster metadata including brokers and cluster ID.
        
        Args:
            timeout_ms (int): Operation timeout
            
        Returns:
            ClusterMetadata: Cluster information including brokers and cluster ID
        """
        
    def describe_log_dirs(self, broker_ids=None, timeout_ms=None):
        """
        Describe log directories on brokers.
        
        Args:
            broker_ids (list): List of broker IDs (None = all brokers)
            timeout_ms (int): Operation timeout
            
        Returns:
            dict: Dictionary mapping broker ID to log directory information
        """
        
    def close(self):
        """Close the admin client and release resources."""

Topic Management Types

Types for creating and managing topics.

class NewTopic:
    def __init__(self, name, num_partitions, replication_factor, replica_assignments=None, topic_configs=None):
        """
        Specification for creating a new topic.
        
        Args:
            name (str): Topic name
            num_partitions (int): Number of partitions
            replication_factor (int): Replication factor
            replica_assignments (dict): Custom replica assignments (optional)
            topic_configs (dict): Topic configuration properties (optional)
        """
        
    name: str
    num_partitions: int
    replication_factor: int
    replica_assignments: dict
    topic_configs: dict

class NewPartitions:
    def __init__(self, total_count, new_assignments=None):
        """
        Specification for adding partitions to existing topic.
        
        Args:
            total_count (int): New total partition count
            new_assignments (list): Replica assignments for new partitions (optional)
        """
        
    total_count: int
    new_assignments: list

Configuration Management Types

Types for managing resource configurations.

class ConfigResource:
    def __init__(self, resource_type, name, configs=None):
        """
        Resource for configuration operations.
        
        Args:
            resource_type (ConfigResourceType): Type of resource
            name (str): Resource name
            configs (dict): Configuration properties (optional)
        """
        
    resource_type: ConfigResourceType
    name: str
    configs: dict

class ConfigResourceType:
    BROKER = 4      # Broker configuration
    TOPIC = 2       # Topic configuration

Access Control Types

Types for managing access control lists (ACLs).

class ACL:
    def __init__(self, principal, host, operation, permission_type, resource_pattern):
        """
        Access control list entry.
        
        Args:
            principal (str): Principal (user/service account)
            host (str): Host pattern
            operation (ACLOperation): Operation type
            permission_type (ACLPermissionType): Permission type
            resource_pattern (ResourcePattern): Resource pattern
        """
        
    principal: str
    host: str
    operation: ACLOperation
    permission_type: ACLPermissionType
    resource_pattern: ResourcePattern

class ACLFilter:
    def __init__(self, principal=None, host=None, operation=None, permission_type=None, resource_pattern=None):
        """Filter for ACL operations."""
        
class ResourcePattern:
    def __init__(self, resource_type, resource_name, pattern_type):
        """
        Resource pattern for ACL matching.
        
        Args:
            resource_type (ResourceType): Type of resource
            resource_name (str): Resource name/pattern  
            pattern_type (ACLResourcePatternType): Pattern matching type
        """
        
    resource_type: ResourceType
    resource_name: str
    pattern_type: ACLResourcePatternType

class ACLOperation:
    ANY = 1
    ALL = 2
    READ = 3
    WRITE = 4
    CREATE = 5
    DELETE = 6
    ALTER = 7
    DESCRIBE = 8
    CLUSTER_ACTION = 9
    DESCRIBE_CONFIGS = 10
    ALTER_CONFIGS = 11
    IDEMPOTENT_WRITE = 12

class ACLPermissionType:
    ANY = 1
    DENY = 2
    ALLOW = 3

class ResourceType:
    UNKNOWN = 0
    ANY = 1
    CLUSTER = 4
    DELEGATION_TOKEN = 6
    GROUP = 3
    TOPIC = 2
    TRANSACTIONAL_ID = 5

class ACLResourcePatternType:
    ANY = 1
    MATCH = 2
    LITERAL = 3
    PREFIXED = 4

Usage Examples

Topic Management

from kafka import KafkaAdminClient
from kafka.admin import NewTopic, NewPartitions

# Create admin client
admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)

# Create topics
topics = [
    NewTopic(
        name='user-events',
        num_partitions=6,
        replication_factor=3,
        topic_configs={
            'cleanup.policy': 'compact',
            'retention.ms': '604800000'  # 7 days
        }
    ),
    NewTopic(
        name='analytics',
        num_partitions=12,
        replication_factor=2
    )
]

result = admin.create_topics(topics, timeout_ms=30000)
for topic, error in result.values():
    if error is None:
        print(f"Topic {topic} created successfully")
    else:
        print(f"Failed to create topic {topic}: {error}")

# Add partitions to existing topic
partition_updates = {
    'user-events': NewPartitions(total_count=10)
}
admin.create_partitions(partition_updates)

# Delete topics
admin.delete_topics(['old-topic'], timeout_ms=30000)

admin.close()

Configuration Management

from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Get topic configuration
topic_resource = ConfigResource(ConfigResourceType.TOPIC, 'my-topic')
config_result = admin.describe_configs([topic_resource])

for resource, config_response in config_result.items():
    print(f"Configuration for {resource.name}:")
    for config in config_response.configs:
        print(f"  {config.name} = {config.value}")

# Alter topic configuration
config_updates = {
    topic_resource: {
        'retention.ms': '86400000',  # 1 day
        'segment.ms': '3600000'      # 1 hour
    }
}
admin.alter_configs(config_updates)

admin.close()

Consumer Group Management

from kafka import KafkaAdminClient
from kafka.structs import TopicPartition, OffsetAndMetadata

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# List all consumer groups
groups = admin.list_consumer_groups()
for group in groups:
    print(f"Group: {group.group}, State: {group.state}")

# Get detailed group information
group_details = admin.describe_consumer_groups(['my-consumer-group'])
for group_id, group_info in group_details.items():
    print(f"Group {group_id}:")
    print(f"  State: {group_info.state}")
    print(f"  Protocol: {group_info.protocol}")
    print(f"  Members: {len(group_info.members)}")

# Get committed offsets
group_id = 'my-consumer-group' 
offsets = admin.list_consumer_group_offsets(group_id)
for partition, offset_metadata in offsets.items():
    print(f"{partition.topic}:{partition.partition} = {offset_metadata.offset}")

# Reset offsets
new_offsets = {
    TopicPartition('my-topic', 0): OffsetAndMetadata(1000, 'reset'),
    TopicPartition('my-topic', 1): OffsetAndMetadata(2000, 'reset')
}
admin.alter_consumer_group_offsets(group_id, new_offsets)

admin.close()

Access Control Management

from kafka import KafkaAdminClient
from kafka.admin import (ACL, ACLFilter, ResourcePattern, 
                        ACLOperation, ACLPermissionType, 
                        ResourceType, ACLResourcePatternType)

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create ACLs
acls = [
    ACL(
        principal='User:alice',
        host='*',
        operation=ACLOperation.READ,
        permission_type=ACLPermissionType.ALLOW,
        resource_pattern=ResourcePattern(
            resource_type=ResourceType.TOPIC,
            resource_name='user-data',
            pattern_type=ACLResourcePatternType.LITERAL
        )
    ),
    ACL(
        principal='User:service-account',
        host='*', 
        operation=ACLOperation.WRITE,
        permission_type=ACLPermissionType.ALLOW,
        resource_pattern=ResourcePattern(
            resource_type=ResourceType.TOPIC,
            resource_name='events-',
            pattern_type=ACLResourcePatternType.PREFIXED
        )
    )
]

admin.create_acls(acls)

# List ACLs
acl_filter = ACLFilter(
    resource_pattern=ResourcePatternFilter(
        resource_type=ResourceType.TOPIC,
        resource_name=None,  # All topics
        pattern_type=ACLResourcePatternType.ANY
    )
)
existing_acls = admin.describe_acls(acl_filter)
for acl in existing_acls:
    print(f"ACL: {acl.principal} {acl.permission_type} {acl.operation} on {acl.resource_pattern}")

admin.close()

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python-ng

docs

admin.md

connection.md

consumer.md

errors.md

index.md

producer.md

serialization.md

structs.md

tile.json