CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kafka-python

Pure Python client for Apache Kafka distributed stream processing system

Pending
Overview
Eval results
Files

docs/admin.md

Administrative API

Administrative client for managing Kafka clusters including topic operations, partition management, configuration changes, access control lists, and consumer group administration.

Capabilities

KafkaAdminClient

Main administrative client for cluster management. It covers:

- topic creation, deletion and description
- partition expansion
- configuration inspection and changes
- ACL management
- consumer-group administration

Every operation accepts an optional `timeout_ms`.

class KafkaAdminClient:
    """
    Administrative client for a Kafka cluster.

    The method bodies below are documentation stubs: each one only states
    the parameters it accepts and the shape of the value it returns.
    """

    def __init__(self, **configs):
        """
        Initialize admin client.
        
        Configuration Parameters:
        - bootstrap_servers: List[str], broker addresses
        - client_id: str, client identifier
        - connections_max_idle_ms: int, max idle time (default: 540000)
        - request_timeout_ms: int, request timeout (default: 30000)
        - retry_backoff_ms: int, retry backoff (default: 100)
        - reconnect_backoff_ms: int, reconnect backoff (default: 50)
        - security_protocol: str, 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'
        - api_version: tuple, broker API version or 'auto'
        """
    
    def create_topics(self, topic_requests, timeout_ms=None, validate_only=False):
        """
        Create new topics.
        
        Parameters:
        - topic_requests: List[NewTopic], topic creation requests
        - timeout_ms: int, operation timeout
        - validate_only: bool, validate without creating
        
        Returns:
        - Dict[str, Future]: topic name to creation result future
        """
    
    def delete_topics(self, topics, timeout_ms=None):
        """
        Delete topics.
        
        Parameters:
        - topics: List[str], topic names to delete
        - timeout_ms: int, operation timeout
        
        Returns:
        - Dict[str, Future]: topic name to deletion result future
        """
    
    def list_topics(self, timeout_ms=None):
        """
        List available topics.
        
        Parameters:
        - timeout_ms: int, operation timeout
        
        Returns:
        - Set[str]: available topic names
        """
    
    def describe_topics(self, topics, timeout_ms=None):
        """
        Get detailed topic information.
        
        Parameters:
        - topics: List[str], topic names to describe
        - timeout_ms: int, operation timeout
        
        Returns:
        - Dict[str, TopicDescription]: topic descriptions
        """
    
    def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False):
        """
        Add partitions to existing topics.
        
        Parameters:
        - partition_updates: Dict[str, NewPartitions], topic to partition updates
        - timeout_ms: int, operation timeout
        - validate_only: bool, validate without creating
        
        Returns:
        - Dict[str, Future]: topic name to result future
        """
    
    def describe_configs(self, config_resources, timeout_ms=None):
        """
        Get configuration for resources.
        
        Parameters:
        - config_resources: List[ConfigResource], resources to describe
        - timeout_ms: int, operation timeout
        
        Returns:
        - Dict[ConfigResource, ConfigResourceResult]: configuration results
        """
    
    def alter_configs(self, config_resources, timeout_ms=None):
        """
        Modify configuration for resources.
        
        Parameters:
        - config_resources: Dict[ConfigResource, Dict[str, str]], config changes
        - timeout_ms: int, operation timeout
        
        Returns:
        - Dict[ConfigResource, Future]: configuration change results
        """
    
    def describe_acls(self, acl_filter, timeout_ms=None):
        """
        Describe access control lists.
        
        Parameters:
        - acl_filter: ACLFilter, filter for ACL queries
        - timeout_ms: int, operation timeout
        
        Returns:
        - List[ACL]: matching ACL entries (principal, host, operation,
          permission_type and resource_pattern attributes, as on ACL)
        """
    
    def create_acls(self, acls, timeout_ms=None):
        """
        Create access control lists.
        
        Parameters:
        - acls: List[ACL], ACLs to create
        - timeout_ms: int, operation timeout
        
        Returns:
        - Dict[ACL, Future]: ACL creation results
        """
    
    def delete_acls(self, acl_filters, timeout_ms=None):
        """
        Delete access control lists.
        
        Parameters:
        - acl_filters: List[ACLFilter], filters for ACLs to delete
        - timeout_ms: int, operation timeout
        
        Returns:
        - List[DeleteAclsResult]: deletion results
        """
    
    def list_consumer_groups(self, timeout_ms=None):
        """
        List consumer groups.
        
        Parameters:
        - timeout_ms: int, operation timeout
        
        Returns:
        - List[GroupInformation]: consumer group information
        """
    
    def describe_consumer_groups(self, group_ids, timeout_ms=None):
        """
        Get detailed consumer group information.
        
        Parameters:
        - group_ids: List[str], group IDs to describe
        - timeout_ms: int, operation timeout
        
        Returns:
        - Dict[str, GroupDescription]: group descriptions
        """
    
    def delete_consumer_groups(self, group_ids, timeout_ms=None):
        """
        Delete consumer groups.
        
        Parameters:
        - group_ids: List[str], group IDs to delete
        - timeout_ms: int, operation timeout
        
        Returns:
        - Dict[str, Future]: deletion results
        """
    
    def close(self):
        """Close admin client and clean up resources."""

Topic Management

Classes for creating and modifying topics.

class NewTopic:
    def __init__(self, name, num_partitions, replication_factor,
                 replica_assignments=None, topic_configs=None):
        """
        Describe one topic to be created by KafkaAdminClient.create_topics.

        Parameters:
        - name: str, name of the new topic
        - num_partitions: int, how many partitions the topic gets
        - replication_factor: int, how many replicas each partition gets
        - replica_assignments: Dict[int, List[int]], optional explicit
          partition -> broker list mapping; a new empty dict when not given
        - topic_configs: Dict[str, str], optional per-topic config
          overrides; a new empty dict when not given
        """
        # Required sizing / identity fields.
        self.name, self.num_partitions, self.replication_factor = (
            name, num_partitions, replication_factor)
        # Optional mappings: a falsy argument (None or empty) is replaced by a
        # fresh dict so instances never share a default object.
        self.topic_configs = topic_configs or {}
        self.replica_assignments = replica_assignments or {}

class NewPartitions:
    def __init__(self, total_count, new_assignments=None):
        """
        Request to grow an existing topic to a new partition count.

        Parameters:
        - total_count: int, partition count the topic should have afterwards
          (the new total, not the number of partitions to add)
        - new_assignments: List[List[int]], optional replica broker lists for
          the added partitions; stored as None when omitted
        """
        self.new_assignments = new_assignments
        self.total_count = total_count

class TopicDescription:
    """Description of one topic as returned by describe_topics (attribute schema only)."""

    # List and PartitionMetadata are not imported here, and class-level
    # annotations are evaluated when the class body runs, so they are quoted
    # to keep the class definable.
    name: str                                # Topic name
    partitions: 'List[PartitionMetadata]'    # Partition metadata
    is_internal: bool                        # Internal topic flag
    # NOTE(review): camelCase differs from GroupDescription.authorized_operations — confirm the real attribute name.
    authorizedOperations: 'List[int]'        # Authorized operations

Configuration Management

Classes for managing broker and topic configurations.

class ConfigResource:
    def __init__(self, resource_type, name, configs=None):
        """
        Identify a resource (broker or topic) whose configuration is described or altered.

        Parameters:
        - resource_type: ConfigResourceType, kind of resource (e.g. BROKER or TOPIC)
        - name: str, resource name (a topic name, or a broker id given as a string)
        - configs: Dict[str, str], optional configuration key/value pairs;
          a falsy value is replaced by a new empty dict
        """
        if not configs:
            configs = {}
        self.resource_type, self.name, self.configs = resource_type, name, configs

class ConfigResourceType:
    # Integer codes selecting which kind of resource a ConfigResource refers to.
    TOPIC = 2     # Topic-level configuration
    BROKER = 4    # Broker-level configuration

class ConfigResourceResult:
    """Outcome of describing one ConfigResource (attribute schema only)."""

    # Dict is not imported here and ConfigEntry is declared later in the file;
    # quoting the annotation avoids a NameError when the class body runs.
    configs: 'Dict[str, ConfigEntry]'  # Configuration entries keyed by name
    error_code: int                    # Error code (0 = success)
    error_message: str                 # Error description

class ConfigEntry:
    """One configuration key/value with its metadata flags (attribute schema only)."""

    name: str           # Configuration key
    value: str          # Configuration value
    is_default: bool    # Is default value
    is_sensitive: bool  # Is sensitive value
    is_read_only: bool  # Is read-only
    # Quoted as a whole: List is not imported here, so an eager annotation raises NameError.
    synonyms: 'List[ConfigSynonym]'  # Configuration synonyms

Access Control Lists (ACL)

Classes for managing access control and authorization.

class ACL:
    def __init__(self, principal, host, operation, permission_type, resource_pattern):
        """
        A single access-control entry: who may (or may not) do what, from where, on which resources.

        Parameters:
        - principal: str, principal the rule applies to, e.g. 'User:alice'
        - host: str, host the rule applies to ('*' is used for all hosts)
        - operation: ACLOperation, operation being allowed or denied
        - permission_type: ACLPermissionType, ALLOW or DENY
        - resource_pattern: ResourcePattern, resource(s) the rule covers
        """
        # Who, and from where
        self.principal, self.host = principal, host
        # What, and whether it is allowed or denied
        self.operation, self.permission_type = operation, permission_type
        # On which resources
        self.resource_pattern = resource_pattern

class ACLFilter:
    def __init__(self, principal=None, host=None, operation=None, 
                 permission_type=None, resource_pattern_filter=None):
        """
        ACL filter for describe/delete queries; every field is optional.

        A field left as None (the default) is intended to match any value.
        This class defines no wildcard constant of its own; for the enum-typed
        fields, ACLOperation.ANY and ACLPermissionType.ANY exist as explicit
        wildcards.

        Parameters:
        - principal: str or None, principal to match (e.g. 'User:alice')
        - host: str or None, host to match
        - operation: ACLOperation or None, operation to match
        - permission_type: ACLPermissionType or None, permission to match
        - resource_pattern_filter: ResourcePatternFilter or None, resource filter
        """
        self.principal = principal
        self.host = host
        self.operation = operation
        self.permission_type = permission_type
        self.resource_pattern_filter = resource_pattern_filter

class ResourcePattern:
    def __init__(self, resource_type, resource_name, pattern_type):
        """
        Concrete resource pattern that an ACL entry applies to.

        Parameters:
        - resource_type: ResourceType, kind of resource (topic, group, cluster, ...)
        - resource_name: str, resource name, interpreted according to pattern_type
        - pattern_type: ACLResourcePatternType, how resource_name is matched
          (e.g. LITERAL or PREFIXED)
        """
        self.pattern_type = pattern_type
        self.resource_name = resource_name
        self.resource_type = resource_type

class ResourcePatternFilter:
    def __init__(self, resource_type=None, resource_name=None, pattern_type=None):
        """
        Resource pattern used to select ACLs in queries; every field is optional.

        A field left as None (the default) is intended to match any value.
        """
        self.resource_type, self.resource_name, self.pattern_type = (
            resource_type, resource_name, pattern_type)

ACL Enumerations

class ACLOperation:
    """
    ACL operation codes, matching Kafka's AclOperation wire values.

    ANY is intended for filters (see ACLFilter); the other values name the
    concrete operations an ACL entry grants or denies.
    """
    ANY = 1
    ALL = 2
    READ = 3
    WRITE = 4
    CREATE = 5
    DELETE = 6
    ALTER = 7
    DESCRIBE = 8
    CLUSTER_ACTION = 9
    DESCRIBE_CONFIGS = 10
    ALTER_CONFIGS = 11
    IDEMPOTENT_WRITE = 12

class ResourceType:
    """Resource type codes used in ACL patterns, matching Kafka's ResourceType wire values."""
    UNKNOWN = 0
    ANY = 1               # Filter-only wildcard
    TOPIC = 2
    GROUP = 3
    CLUSTER = 4
    TRANSACTIONAL_ID = 5
    DELEGATION_TOKEN = 6

class ACLPermissionType:
    """Permission codes for ACL entries, matching Kafka's AclPermissionType wire values."""
    ANY = 1     # Filter-only wildcard
    DENY = 2
    ALLOW = 3

class ACLResourcePatternType:
    """
    Resource-name matching modes, matching Kafka's PatternType wire values.

    LITERAL matches the exact name (or every resource when the name is '*');
    PREFIXED matches every resource whose name starts with the given string.
    ANY and MATCH are only meaningful in filters.
    """
    ANY = 1
    MATCH = 2
    LITERAL = 3
    PREFIXED = 4

Consumer Group Management

Classes for consumer group administration.

class GroupDescription:
    """Detailed state of one consumer group (attribute schema only)."""

    # Annotations naming types that are not imported here (List, Node) or are
    # declared later in the file (MemberDescription) are quoted so the class
    # body runs without a NameError.
    group_id: str                          # Group ID
    is_simple_consumer_group: bool         # Simple consumer group flag
    members: 'List[MemberDescription]'     # Group members
    partition_assignor: str                # Partition assignment strategy
    state: str                             # Group state
    coordinator: 'Node'                    # Group coordinator
    authorized_operations: 'List[int]'     # Authorized operations

class MemberDescription:
    """One member of a consumer group (attribute schema only)."""

    member_id: str                  # Member ID
    client_id: str                  # Client ID
    host: str                       # Client host
    # Quoted: MemberAssignment is declared later in the file.
    assignment: 'MemberAssignment'  # Partition assignment

class MemberAssignment:
    """Partitions assigned to one consumer-group member (attribute schema only)."""

    # Quoted: Set and TopicPartition are not imported here.
    topic_partitions: 'Set[TopicPartition]'  # Assigned partitions

Usage Examples

Topic Management

from kafka import KafkaAdminClient
from kafka.admin import NewTopic, NewPartitions
from kafka.errors import TopicAlreadyExistsError, KafkaError

# Connect an admin client to the local broker.
admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)

try:
    # --- Create two topics; 'logs' overrides retention to one day ---
    new_topics = [
        NewTopic(name='events', num_partitions=3, replication_factor=1),
        NewTopic(name='logs', num_partitions=6, replication_factor=1,
                 topic_configs={'retention.ms': '86400000'}),  # 1 day retention
    ]
    creation_futures = admin.create_topics(new_topics, timeout_ms=30000)

    # Block on each topic's future in turn and report its outcome.
    for topic, fut in creation_futures.items():
        try:
            fut.result()
        except TopicAlreadyExistsError:
            print(f"Topic '{topic}' already exists")
        except KafkaError as e:
            print(f"Failed to create topic '{topic}': {e}")
        else:
            print(f"Topic '{topic}' created successfully")

    # --- List every topic visible to the client ---
    topic_names = admin.list_topics(timeout_ms=10000)
    print(f"Available topics: {list(topic_names)}")

    # --- Grow 'events' from 3 to 5 partitions (total_count is the new total) ---
    growth = {'events': NewPartitions(total_count=5)}
    growth_futures = admin.create_partitions(growth, timeout_ms=30000)
    for topic, fut in growth_futures.items():
        try:
            fut.result()
        except KafkaError as e:
            print(f"Failed to add partitions to '{topic}': {e}")
        else:
            print(f"Added partitions to topic '{topic}'")

finally:
    admin.close()

Topic Description and Metadata

from kafka import KafkaAdminClient


def _print_partition(pm):
    """Print the leader, replica list and ISR of one partition's metadata."""
    print(f"  Partition {pm.partition}:")
    print(f"    Leader: {pm.leader}")
    print(f"    Replicas: {pm.replicas}")
    print(f"    ISR: {pm.isr}")


admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # Ask for full metadata on both example topics in a single call.
    described = admin.describe_topics(['events', 'logs'], timeout_ms=10000)

    for topic_name, info in described.items():
        # Topic-level summary first, then one section per partition.
        print(f"\nTopic: {topic_name}")
        print(f"Internal: {info.is_internal}")
        print(f"Partitions: {len(info.partitions)}")
        for pm in info.partitions:
            _print_partition(pm)

finally:
    admin.close()

Configuration Management

from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType
# Needed by the `except KafkaError` clause below; without it a failed
# alter_configs future surfaces as a NameError instead of being reported.
from kafka.errors import KafkaError

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # Describe topic configurations
    topic_resource = ConfigResource(ConfigResourceType.TOPIC, 'events')
    broker_resource = ConfigResource(ConfigResourceType.BROKER, '0')  # broker id as a string
    
    config_results = admin.describe_configs([topic_resource, broker_resource], 
                                          timeout_ms=10000)
    
    for resource, result in config_results.items():
        print(f"\nConfigurations for {resource.resource_type} '{resource.name}':")
        for name, entry in result.configs.items():
            if not entry.is_default:  # Only show non-default configs
                print(f"  {name} = {entry.value}")
    
    # Alter topic configuration (keys are the same ConfigResource objects used above)
    config_updates = {
        topic_resource: {
            'retention.ms': '172800000',  # 2 days
            'cleanup.policy': 'delete'
        }
    }
    
    alter_result = admin.alter_configs(config_updates, timeout_ms=30000)
    for resource, future in alter_result.items():
        try:
            future.result()
            print(f"Configuration updated for {resource.name}")
        except KafkaError as e:
            print(f"Failed to update configuration: {e}")

finally:
    admin.close()

Access Control Lists (ACL)

from kafka import KafkaAdminClient
from kafka.admin import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter,
                        ACLOperation, ACLPermissionType, ResourceType, 
                        ACLResourcePatternType)
# Needed by the `except KafkaError` clause below.
from kafka.errors import KafkaError

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # Create ACLs
    acls = [
        # alice may read the single topic 'events' from any host.
        ACL(
            principal='User:alice',
            host='*',
            operation=ACLOperation.READ,
            permission_type=ACLPermissionType.ALLOW,
            resource_pattern=ResourcePattern(
                resource_type=ResourceType.TOPIC,
                resource_name='events',
                pattern_type=ACLResourcePatternType.LITERAL
            )
        ),
        # bob may write to every topic whose name starts with 'logs-', but
        # only when connecting from 192.168.1.10. Kafka compares the ACL host
        # exactly (or '*' for all hosts), so a pattern such as '192.168.1.*'
        # would never match. A PREFIXED resource name is a plain prefix; a
        # trailing '*' would be taken literally.
        ACL(
            principal='User:bob',
            host='192.168.1.10',
            operation=ACLOperation.WRITE,
            permission_type=ACLPermissionType.ALLOW,
            resource_pattern=ResourcePattern(
                resource_type=ResourceType.TOPIC,
                resource_name='logs-',
                pattern_type=ACLResourcePatternType.PREFIXED
            )
        )
    ]
    
    create_result = admin.create_acls(acls, timeout_ms=30000)
    for acl, future in create_result.items():
        try:
            future.result()
            print(f"ACL created for {acl.principal}")
        except KafkaError as e:
            print(f"Failed to create ACL: {e}")
    
    # List ACLs on topics; unset filter fields are left open
    acl_filter = ACLFilter(
        resource_pattern_filter=ResourcePatternFilter(
            resource_type=ResourceType.TOPIC
        )
    )
    
    acl_bindings = admin.describe_acls(acl_filter, timeout_ms=10000)
    print(f"\nFound {len(acl_bindings)} ACLs:")
    for binding in acl_bindings:
        # Each entry carries the ACL attributes, including resource_pattern.
        print(f"  {binding.principal} {binding.permission_type} "
              f"{binding.operation} on {binding.resource_pattern.resource_name}")

finally:
    admin.close()

Consumer Group Management

from kafka import KafkaAdminClient
# Needed by the `except KafkaError` clause below; without it a failed
# deletion surfaces as a NameError instead of being reported.
from kafka.errors import KafkaError

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # List consumer groups
    groups = admin.list_consumer_groups(timeout_ms=10000)
    print(f"Found {len(groups)} consumer groups:")
    for group in groups:
        print(f"  Group: {group.group}, State: {group.state}")
    
    # Describe specific consumer groups
    group_ids = ['my-consumer-group', 'batch-processor']
    descriptions = admin.describe_consumer_groups(group_ids, timeout_ms=10000)
    
    for group_id, description in descriptions.items():
        print(f"\nGroup: {group_id}")
        print(f"State: {description.state}")
        print(f"Coordinator: {description.coordinator}")
        print(f"Assignment Strategy: {description.partition_assignor}")
        print(f"Members: {len(description.members)}")
        
        for member in description.members:
            print(f"  Member: {member.member_id}")
            print(f"    Client: {member.client_id}")
            print(f"    Host: {member.host}")
            print(f"    Partitions: {len(member.assignment.topic_partitions)}")
    
    # Delete inactive consumer group
    delete_result = admin.delete_consumer_groups(['inactive-group'], timeout_ms=30000)
    for group_id, future in delete_result.items():
        try:
            future.result()
            print(f"Consumer group '{group_id}' deleted")
        except KafkaError as e:
            print(f"Failed to delete group '{group_id}': {e}")

finally:
    admin.close()

Cluster Information

from kafka import KafkaAdminClient
from kafka.client_async import KafkaClient
# Needed to build the key passed to leader_for_partition below.
from kafka.structs import TopicPartition

# Using admin client for high-level operations
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # Using low-level client for detailed cluster info. It is created inside
    # the outer try so that `admin` is still closed if this constructor fails.
    client = KafkaClient(bootstrap_servers=['localhost:9092'])
    try:
        # Wait for client to connect and load metadata.
        # NOTE(review): a single bounded poll may return before metadata has
        # arrived; confirm this is sufficient for the target cluster.
        client.poll(timeout_ms=5000)
        
        # Get cluster metadata
        cluster = client.cluster
        
        print("Cluster Information:")
        print(f"Cluster ID: {cluster.cluster_id}")
        print(f"Controller: {cluster.controller}")
        
        print(f"\nBrokers ({len(cluster.brokers())}):")
        for broker in cluster.brokers():
            print(f"  Broker {broker.nodeId}: {broker.host}:{broker.port}")
            if broker.rack:
                print(f"    Rack: {broker.rack}")
        
        print(f"\nTopics ({len(cluster.topics())}):")
        for topic in sorted(cluster.topics()):
            # partitions_for_topic returns None for a topic it has no
            # metadata for; treat that as "no partitions".
            partitions = cluster.partitions_for_topic(topic) or set()
            print(f"  {topic}: {len(partitions)} partitions")
            
            for partition_id in sorted(partitions):
                # leader_for_partition yields the leader broker's id, not a partition.
                leader = cluster.leader_for_partition(TopicPartition(topic, partition_id))
                print(f"    Partition {partition_id}: Leader {leader}")
    finally:
        client.close()

finally:
    admin.close()

Batch Operations

from kafka import KafkaAdminClient
from kafka.admin import NewTopic
import concurrent.futures

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # Ten topics, partition-0 .. partition-9, with 1 .. 10 partitions respectively.
    batch = [NewTopic(f'partition-{i}', num_partitions=i + 1, replication_factor=1)
             for i in range(10)]
    create_result = admin.create_topics(batch, timeout_ms=60000)

    # Wait on each topic's result from a worker thread so outcomes are
    # reported in completion order rather than submission order.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        waiter_to_topic = {}
        for topic_name, result_future in create_result.items():
            waiter_to_topic[pool.submit(result_future.result)] = topic_name

        for waiter in concurrent.futures.as_completed(waiter_to_topic, timeout=60):
            topic_name = waiter_to_topic[waiter]
            try:
                waiter.result()
                print(f"✓ Topic '{topic_name}' created")
            except Exception as e:
                print(f"✗ Topic '{topic_name}' failed: {e}")

finally:
    admin.close()

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python

docs

admin.md

consumer.md

errors.md

index.md

producer.md

structures.md

tile.json