CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-confluent-kafka

Confluent's Python client for Apache Kafka

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/admin-client.md

Admin Client

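The AdminClient provides comprehensive administrative operations for managing Kafka clusters, including topics, partitions, configurations, ACLs, consumer groups, and SCRAM credentials. Most operations are asynchronous and return a future (or a dict of futures, one per requested item) so that requests can run concurrently; `list_topics` returns cluster metadata directly, and `set_sasl_credentials` only updates the client's credentials.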

Capabilities

AdminClient

Main administrative client for Kafka cluster management.

class AdminClient:
    """
    Administrative client for a Kafka cluster.

    Request methods are asynchronous: each returns either a single future or
    a dict of futures keyed per requested item, and the outcome is obtained
    by calling ``result()`` on the future. ``list_topics`` is the exception
    and returns cluster metadata directly.
    """

    def __init__(self, conf):
        """
        Create an AdminClient instance.

        Args:
            conf (dict): Configuration properties for the admin client,
                e.g. ``{'bootstrap.servers': 'localhost:9092'}``
        """

    def create_topics(self, new_topics, **kwargs):
        """
        Create one or more topics.

        Args:
            new_topics (list): List of NewTopic objects
            **kwargs: Additional options (validate_only, request_timeout, operation_timeout)

        Returns:
            dict: Future objects keyed by topic name; a successful
                future's result is None
        """

    def delete_topics(self, topics, **kwargs):
        """
        Delete one or more topics.

        Args:
            topics (list): List of topic names to delete
            **kwargs: Additional options (request_timeout, operation_timeout)

        Returns:
            dict: Future objects keyed by topic name
        """

    def list_topics(self, topic=None, timeout=-1):
        """
        Get metadata for topics.

        Unlike the other operations this call does not return a future.

        Args:
            topic (str, optional): Specific topic name
            timeout (float): Request timeout in seconds

        Returns:
            ClusterMetadata: Cluster and topic metadata
        """

    def describe_topics(self, topic_names, **kwargs):
        """
        Describe topics.

        Args:
            topic_names (TopicCollection): Collection wrapping the list of
                topic names to describe (not a bare list of strings)
            **kwargs: Additional options (request_timeout, include_authorized_operations)

        Returns:
            dict: Future objects keyed by topic name
        """

    def create_partitions(self, fs, **kwargs):
        """
        Create additional partitions for existing topics.

        Args:
            fs (list): List of NewPartitions objects, one per topic to grow
            **kwargs: Additional options (validate_only, request_timeout, operation_timeout)

        Returns:
            dict: Future objects keyed by topic name
        """

    def describe_configs(self, resources, **kwargs):
        """
        Describe configuration for resources.

        Args:
            resources (list): List of ConfigResource objects
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by ConfigResource
        """

    def alter_configs(self, resources, **kwargs):
        """
        Alter configuration for resources.

        Upstream deprecates this call in favour of
        ``incremental_alter_configs``.

        Args:
            resources (list): List of ConfigResource objects, each carrying
                the configuration to apply to that resource
            **kwargs: Additional options (validate_only, request_timeout)

        Returns:
            dict: Future objects keyed by ConfigResource
        """

    def incremental_alter_configs(self, resources, **kwargs):
        """
        Incrementally alter configuration for resources.

        Args:
            resources (list): List of ConfigResource objects whose
                ``incremental_configs`` hold ConfigEntry items, each with an
                AlterConfigOpType in ``incremental_operation``
            **kwargs: Additional options (validate_only, request_timeout)

        Returns:
            dict: Future objects keyed by ConfigResource
        """

    def create_acls(self, acl_bindings, **kwargs):
        """
        Create ACL bindings.

        Args:
            acl_bindings (list): List of AclBinding objects
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by AclBinding
        """

    def describe_acls(self, acl_binding_filter, **kwargs):
        """
        Describe the ACL bindings that match a filter.

        Args:
            acl_binding_filter (AclBindingFilter): Filter for ACL bindings
            **kwargs: Additional options (request_timeout)

        Returns:
            concurrent.futures.Future: Future whose result is the list of
                matching AclBinding objects
        """

    def delete_acls(self, acl_binding_filters, **kwargs):
        """
        Delete the ACL bindings that match the given filters.

        Args:
            acl_binding_filters (list): List of AclBindingFilter objects
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by AclBindingFilter
        """

    def list_consumer_groups(self, **kwargs):
        """
        List consumer groups.

        Args:
            **kwargs: Additional options (request_timeout, states)

        Returns:
            concurrent.futures.Future: Future with ListConsumerGroupsResult
        """

    def describe_consumer_groups(self, group_ids, **kwargs):
        """
        Describe consumer groups.

        Args:
            group_ids (list): List of consumer group IDs
            **kwargs: Additional options (request_timeout, include_authorized_operations)

        Returns:
            dict: Future objects keyed by group ID
        """

    def delete_consumer_groups(self, group_ids, **kwargs):
        """
        Delete consumer groups.

        Args:
            group_ids (list): List of consumer group IDs to delete
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by group ID
        """

    def list_consumer_group_offsets(self, request, **kwargs):
        """
        List committed offsets for consumer groups.

        Args:
            request (list): List of ConsumerGroupTopicPartitions naming the
                group and, optionally, the partitions to query
            **kwargs: Additional options (request_timeout, require_stable)

        Returns:
            dict: Future objects keyed by group ID; each future's result is
                a ConsumerGroupTopicPartitions
        """

    def alter_consumer_group_offsets(self, group_topic_partitions, **kwargs):
        """
        Alter committed offsets for consumer groups.

        Args:
            group_topic_partitions (list): List of ConsumerGroupTopicPartitions
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by group ID; each future's result is
                a ConsumerGroupTopicPartitions
        """

    def describe_user_scram_credentials(self, users=None, **kwargs):
        """
        Describe SCRAM credentials for users.

        Args:
            users (list, optional): List of usernames (None for all)
            **kwargs: Additional options (request_timeout)

        Returns:
            concurrent.futures.Future or dict: A single future covering all
                users when ``users`` is None; otherwise future objects keyed
                by username
        """

    def alter_user_scram_credentials(self, alterations, **kwargs):
        """
        Alter SCRAM credentials for users.

        Args:
            alterations (list): List of UserScramCredentialAlteration objects
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by username
        """

    def describe_cluster(self, **kwargs):
        """
        Describe cluster information.

        Args:
            **kwargs: Additional options (request_timeout, include_authorized_operations)

        Returns:
            concurrent.futures.Future: Future with DescribeClusterResult
        """

    def list_offsets(self, topic_partition_offsets, **kwargs):
        """
        List offsets for topic partitions.

        Args:
            topic_partition_offsets (dict): Dict of TopicPartition to OffsetSpec
            **kwargs: Additional options (request_timeout, isolation_level)

        Returns:
            dict: Future objects keyed by TopicPartition
        """

    def delete_records(self, topic_partition_offsets, **kwargs):
        """
        Delete records before specified offsets.

        Args:
            topic_partition_offsets (list): List of TopicPartition objects
                whose ``offset`` field is the offset before which records
                are deleted
            **kwargs: Additional options (request_timeout, operation_timeout)

        Returns:
            dict: Future objects keyed by TopicPartition
        """

    def elect_leaders(self, election_type, partitions=None, **kwargs):
        """
        Elect leaders for topic partitions.

        Args:
            election_type (ElectionType): Type of election (PREFERRED or UNCLEAN)
            partitions (list, optional): List of TopicPartition objects (None for all partitions)
            **kwargs: Additional options (request_timeout, operation_timeout)

        Returns:
            concurrent.futures.Future: Future with election results
        """

    def set_sasl_credentials(self, username, password):
        """
        Set SASL credentials for authentication.

        Args:
            username (str): SASL username
            password (str): SASL password
        """

Topic Management Classes

NewTopic

Specification for creating new topics.

class NewTopic:
    """Specification of a topic to be created with ``AdminClient.create_topics``."""

    def __init__(self, topic, num_partitions=None, replication_factor=None, replica_assignment=None, config=None):
        """
        Create NewTopic specification.

        Args:
            topic (str): Topic name
            num_partitions (int, optional): Number of partitions
            replication_factor (int, optional): Replication factor
            replica_assignment (list, optional): Manual replica assignment,
                given as one list of broker ids per partition
            config (dict, optional): Topic configuration, mapping config
                names to values (e.g. ``{'cleanup.policy': 'compact'}``)
        """

    @property
    def topic(self):
        """Topic name."""

    @property
    def num_partitions(self):
        """Number of partitions."""

    @property
    def replication_factor(self):
        """Replication factor."""

    @property
    def replica_assignment(self):
        """Manual replica assignment (list of per-partition broker id lists)."""

    @property
    def config(self):
        """Topic configuration."""

NewPartitions

Specification for adding partitions to existing topics.

class NewPartitions:
    """Request to grow an existing topic to a larger partition count."""

    def __init__(self, topic, new_total_count, replica_assignment=None):
        """
        Build a partition-growth specification.

        Args:
            topic (str): Name of the topic to extend
            new_total_count (int): Partition count the topic should have
                afterwards (the total, not the number being added)
            replica_assignment (list, optional): Replica placement for the
                newly added partitions
        """

    @property
    def topic(self):
        """Name of the topic being extended."""

    @property
    def new_total_count(self):
        """Total partition count after the change."""

    @property
    def replica_assignment(self):
        """Replica placement for the new partitions."""

Configuration Management Classes

ConfigResource

Represents a configuration resource.

class ConfigResource:
    """A resource (such as a topic or broker) whose configuration is managed."""

    def __init__(self, restype, name, incremental_configs=None):
        """
        Build a configuration resource handle.

        Args:
            restype (int): Kind of resource (RESOURCE_TOPIC, RESOURCE_BROKER, etc.)
            name (str): Name identifying the resource
            incremental_configs (list, optional): Entries to apply through
                an incremental configuration change
        """

    @property
    def restype(self):
        """Kind of resource."""

    @property
    def name(self):
        """Name identifying the resource."""

    @property
    def incremental_configs(self):
        """Entries for an incremental configuration change."""

    def __hash__(self):
        """Hash value, allowing instances to serve as dict keys."""

    def __eq__(self, other):
        """Compare this resource with ``other`` for equality."""

ConfigEntry

Represents a configuration entry.

class ConfigEntry:
    """A single configuration setting together with its metadata."""

    def __init__(self, name, value, incremental_operation=None):
        """
        Build a configuration entry.

        Args:
            name (str): Name of the setting
            value (str): Value of the setting
            incremental_operation (AlterConfigOpType, optional): How the
                entry is applied in an incremental update
        """

    @property
    def name(self):
        """Name of the setting."""

    @property
    def value(self):
        """Value of the setting."""

    @property
    def incremental_operation(self):
        """Operation applied in an incremental update."""

    @property
    def source(self):
        """Where the setting's value comes from."""

    @property
    def is_default(self):
        """True when the setting holds a default value."""

    @property
    def is_read_only(self):
        """True when the setting cannot be modified."""

    @property
    def is_sensitive(self):
        """True when the setting is sensitive."""

    @property
    def synonyms(self):
        """Synonyms of this setting."""

ACL Management Classes

AclBinding

Represents an ACL binding.

class AclBinding:
    """An access-control rule tying a principal to an operation on a resource."""

    def __init__(self, restype, name, resource_pattern_type, principal, host, operation, permission_type):
        """
        Build an ACL binding.

        Args:
            restype (ResourceType): Kind of resource the rule covers
            name (str): Name of the resource
            resource_pattern_type (ResourcePatternType): How ``name`` is matched
            principal (str): Principal (user/service), e.g. ``'User:alice'``
            host (str): Host pattern, e.g. ``'*'``
            operation (AclOperation): Operation being controlled
            permission_type (AclPermissionType): Whether the operation is
                allowed or denied
        """

    @property
    def restype(self):
        """Kind of resource the rule covers."""

    @property
    def name(self):
        """Name of the resource."""

    @property
    def resource_pattern_type(self):
        """How the resource name is matched."""

    @property
    def principal(self):
        """Principal the rule applies to."""

    @property
    def host(self):
        """Host pattern the rule applies to."""

    @property
    def operation(self):
        """Operation being controlled."""

    @property
    def permission_type(self):
        """Permission granted or denied by the rule."""

AclBindingFilter

Filter for ACL bindings.

class AclBindingFilter:
    """Pattern used to select ACL bindings when describing or deleting them."""

    def __init__(self, restype, name, resource_pattern_type, principal, host, operation, permission_type):
        """
        Build an ACL binding filter.

        Enum-typed fields accept ANY and string fields accept None to match
        every value.

        Args:
            restype (ResourceType): Resource type (can be ANY)
            name (str): Resource name (can be None for any)
            resource_pattern_type (ResourcePatternType): Pattern type (can be ANY)
            principal (str): Principal (can be None for any)
            host (str): Host pattern (can be None for any)
            operation (AclOperation): ACL operation (can be ANY)
            permission_type (AclPermissionType): Permission type (can be ANY)
        """

Consumer Group Management Classes

ConsumerGroupListing

Information about a consumer group.

class ConsumerGroupListing:
    """Summary entry for one consumer group in a listing."""

    @property
    def group_id(self):
        """ID of the consumer group."""

    @property
    def is_simple_consumer_group(self):
        """True when the group is a simple consumer group."""

    @property
    def state(self):
        """State of the consumer group."""

    @property
    def type(self):
        """Type of the consumer group."""

ConsumerGroupDescription

Detailed description of a consumer group.

class ConsumerGroupDescription:
    """Full description of one consumer group."""

    @property
    def group_id(self):
        """ID of the consumer group."""

    @property
    def is_simple_consumer_group(self):
        """True when the group is a simple consumer group."""

    @property
    def members(self):
        """Members of the group, as a list."""

    @property
    def partition_assignor(self):
        """Strategy used to assign partitions."""

    @property
    def state(self):
        """State of the consumer group."""

    @property
    def coordinator(self):
        """Node acting as the group coordinator."""

    @property
    def authorized_operations(self):
        """Operations authorized on this group."""

MemberDescription

Description of a consumer group member.

class MemberDescription:
    """Details of a single member of a consumer group."""

    @property
    def member_id(self):
        """ID of the member."""

    @property
    def group_instance_id(self):
        """Group instance ID of the member."""

    @property
    def client_id(self):
        """Client ID of the member."""

    @property
    def host(self):
        """Host the member runs on."""

    @property
    def assignment(self):
        """Assignment held by the member."""

SCRAM Credential Management

UserScramCredentialAlteration

Base class for SCRAM credential alterations.

class UserScramCredentialAlteration:
    """Common base for changes to a user's SCRAM credentials."""

    def __init__(self, user):
        """
        Record the user the alteration applies to.

        Args:
            user (str): Name of the user
        """

    @property
    def user(self):
        """Name of the user."""

UserScramCredentialUpsertion

SCRAM credential creation or update.

class UserScramCredentialUpsertion(UserScramCredentialAlteration):
    """Alteration that creates a user's SCRAM credential or replaces it."""

    def __init__(self, user, scram_credential_info):
        """
        Build an upsertion of SCRAM credentials.

        NOTE(review): the upstream constructor also appears to take the
        password (and an optional salt) - confirm against the installed
        confluent-kafka version.

        Args:
            user (str): Name of the user
            scram_credential_info (ScramCredentialInfo): Details of the
                credential to store
        """

    @property
    def scram_credential_info(self):
        """Details of the SCRAM credential."""

UserScramCredentialDeletion

SCRAM credential deletion.

class UserScramCredentialDeletion(UserScramCredentialAlteration):
    """Alteration that removes one of a user's SCRAM credentials."""

    def __init__(self, user, mechanism):
        """
        Build a deletion of SCRAM credentials.

        Args:
            user (str): Name of the user
            mechanism (ScramMechanism): Mechanism whose credential is removed
        """

    @property
    def mechanism(self):
        """Mechanism whose credential is removed."""

Enumeration Classes

class ResourceType:
    """Kinds of Kafka resource that ACLs and filters can refer to."""

    UNKNOWN = 0
    ANY = 1
    TOPIC = 2
    GROUP = 3
    # The upstream Python client names value 4 BROKER; CLUSTER is kept as an
    # alias so existing references keep working.
    BROKER = 4
    CLUSTER = 4
    TRANSACTIONAL_ID = 5
    # NOTE(review): DELEGATION_TOKEN and USER (and CLUSTER above) do not
    # appear in the upstream Python enum - confirm before relying on them.
    DELEGATION_TOKEN = 6
    USER = 7

class ResourcePatternType:
    """Ways a resource name in an ACL is matched, numbered from 0."""

    (
        UNKNOWN,   # 0
        ANY,       # 1
        MATCH,     # 2
        LITERAL,   # 3
        PREFIXED,  # 4
    ) = range(5)

class AclOperation:
    """Operations an ACL can allow or deny, numbered from 0."""

    (
        UNKNOWN,           # 0
        ANY,               # 1
        ALL,               # 2
        READ,              # 3
        WRITE,             # 4
        CREATE,            # 5
        DELETE,            # 6
        ALTER,             # 7
        DESCRIBE,          # 8
        CLUSTER_ACTION,    # 9
        DESCRIBE_CONFIGS,  # 10
        ALTER_CONFIGS,     # 11
        IDEMPOTENT_WRITE,  # 12
    ) = range(13)

class AclPermissionType:
    """Whether an ACL denies or allows its operation, numbered from 0."""

    (
        UNKNOWN,  # 0
        ANY,      # 1
        DENY,     # 2
        ALLOW,    # 3
    ) = range(4)

class ConfigSource:
    """Origins a configuration value can have, numbered from 0."""

    (
        UNKNOWN_CONFIG,                 # 0
        DYNAMIC_TOPIC_CONFIG,           # 1
        DYNAMIC_BROKER_CONFIG,          # 2
        DYNAMIC_DEFAULT_BROKER_CONFIG,  # 3
        STATIC_BROKER_CONFIG,           # 4
        DEFAULT_CONFIG,                 # 5
    ) = range(6)

class AlterConfigOpType:
    """Operations available in an incremental config change, numbered from 0."""

    (
        SET,       # 0
        DELETE,    # 1
        APPEND,    # 2
        SUBTRACT,  # 3
    ) = range(4)

class ScramMechanism:
    """SCRAM mechanisms usable for user credentials."""

    # Value 0 is reserved for UNKNOWN upstream, so the concrete mechanisms
    # start at 1 rather than 0.
    UNKNOWN = 0
    SCRAM_SHA_256 = 1
    SCRAM_SHA_512 = 2

class ElectionType:
    """Kinds of partition leader election, numbered from 0."""

    (
        PREFERRED,  # 0
        UNCLEAN,    # 1
    ) = range(2)

Usage Examples

Creating Topics

from confluent_kafka.admin import AdminClient, NewTopic

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Describe the topics to create; the second one uses log compaction.
compacted_config = {'cleanup.policy': 'compact'}
topic_specs = [
    NewTopic('my-topic-1', num_partitions=3, replication_factor=1),
    NewTopic('my-topic-2', num_partitions=6, replication_factor=1, config=compacted_config),
]

# create_topics() returns immediately with one future per topic name.
futures_by_topic = admin_client.create_topics(topic_specs, request_timeout=30)

# Block on each future in turn and report the outcome per topic.
for topic, future in futures_by_topic.items():
    try:
        future.result()  # Returns None on success, raises on failure
        print(f"Topic {topic} created")
    except Exception as e:
        print(f"Failed to create topic {topic}: {e}")

Managing Consumer Groups

from confluent_kafka.admin import AdminClient

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Listing groups yields a single future for the whole request.
listing_future = admin_client.list_consumer_groups(request_timeout=10)
try:
    listing = listing_future.result()
    for group_listing in listing.valid:
        print(f"Group: {group_listing.group_id}, State: {group_listing.state}")
except Exception as e:
    print(f"Failed to list consumer groups: {e}")

# Describing groups yields one future per requested group ID.
futures_by_group = admin_client.describe_consumer_groups(
    ['my-group-1', 'my-group-2'],
    request_timeout=10,
)

for group_id, future in futures_by_group.items():
    try:
        group_desc = future.result()
        print(f"Group {group_id}: {len(group_desc.members)} members")
        for member in group_desc.members:
            print(f"  Member: {member.member_id} on {member.host}")
    except Exception as e:
        print(f"Failed to describe group {group_id}: {e}")

Managing ACLs

from confluent_kafka.admin import AdminClient, AclBinding, AclBindingFilter
from confluent_kafka.admin import ResourceType, ResourcePatternType, AclOperation, AclPermissionType

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Allow alice to read the literal topic 'my-topic' from any host.
read_binding = AclBinding(
    restype=ResourceType.TOPIC,
    name='my-topic',
    resource_pattern_type=ResourcePatternType.LITERAL,
    principal='User:alice',
    host='*',
    operation=AclOperation.READ,
    permission_type=AclPermissionType.ALLOW,
)

# create_acls() returns one future per binding submitted.
creation_futures = admin_client.create_acls([read_binding], request_timeout=10)
for acl, future in creation_futures.items():
    try:
        future.result()
        print(f"ACL created for {acl.principal}")
    except Exception as e:
        print(f"Failed to create ACL: {e}")

# Match every ACL on any topic: None / ANY act as wildcards in a filter.
any_topic_acl = AclBindingFilter(
    restype=ResourceType.TOPIC,
    name=None,  # All topics
    resource_pattern_type=ResourcePatternType.ANY,
    principal=None,  # All principals
    host=None,  # All hosts
    operation=AclOperation.ANY,
    permission_type=AclPermissionType.ANY,
)

# describe_acls() returns a single future holding the matching bindings.
matches_future = admin_client.describe_acls(any_topic_acl, request_timeout=10)
try:
    matching_acls = matches_future.result()
    for acl in matching_acls:
        print(f"ACL: {acl.principal} {acl.permission_type} {acl.operation} on {acl.name}")
except Exception as e:
    print(f"Failed to list ACLs: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-confluent-kafka

docs

admin-client.md

core-producer-consumer.md

error-handling.md

index.md

schema-registry.md

serialization.md

tile.json