Confluent's Python client for Apache Kafka
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
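The AdminClient provides comprehensive administrative operations for managing Kafka clusters, including topics, partitions, configurations, ACLs, consumer groups, and SCRAM credentials. Most operations are asynchronous and return futures (either a dict of futures keyed by resource, or a single future for the whole request) so that several requests can run concurrently. The exceptions documented below are `list_topics`, which returns a `ClusterMetadata` object, and `set_sasl_credentials`, which has no documented return value.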
Main administrative client for Kafka cluster management.
class AdminClient:
    """Main administrative client for Kafka cluster management.

    Interface summary only: every method body is a docstring describing the
    documented arguments and return value.
    """

    def __init__(self, conf):
        """Create AdminClient instance.

        Args:
            conf (dict): Configuration properties for the admin client
        """

    def create_topics(self, new_topics, **kwargs):
        """Create topics.

        Args:
            new_topics (list): List of NewTopic objects
            **kwargs: Additional options (validate_only, request_timeout,
                operation_timeout)

        Returns:
            dict: Future objects keyed by topic name
        """

    def delete_topics(self, topics, **kwargs):
        """Delete topics.

        Args:
            topics (list): List of topic names to delete
            **kwargs: Additional options (request_timeout, operation_timeout)

        Returns:
            dict: Future objects keyed by topic name
        """

    def list_topics(self, topic=None, timeout=-1):
        """Get metadata for topics.

        Args:
            topic (str, optional): Specific topic name
            timeout (float): Request timeout in seconds

        Returns:
            ClusterMetadata: Cluster and topic metadata
        """

    def describe_topics(self, topic_names, **kwargs):
        """Describe topics.

        Args:
            topic_names (list): List of topic names to describe
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by topic name
        """

    def create_partitions(self, fs, **kwargs):
        """Create additional partitions for topics.

        Args:
            fs (list): List of NewPartitions objects
            **kwargs: Additional options (validate_only, request_timeout,
                operation_timeout)

        Returns:
            dict: Future objects keyed by topic name
        """

    def describe_configs(self, resources, **kwargs):
        """Describe configuration for resources.

        Args:
            resources (list): List of ConfigResource objects
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by ConfigResource
        """

    def alter_configs(self, resources, **kwargs):
        """Alter configuration for resources.

        Args:
            resources (dict): Dict of ConfigResource to list of ConfigEntry
            **kwargs: Additional options (validate_only, request_timeout)

        Returns:
            dict: Future objects keyed by ConfigResource
        """

    def incremental_alter_configs(self, resources, **kwargs):
        """Incrementally alter configuration for resources.

        Args:
            resources (dict): Dict of ConfigResource to list of ConfigEntry
                with AlterConfigOpType
            **kwargs: Additional options (validate_only, request_timeout)

        Returns:
            dict: Future objects keyed by ConfigResource
        """

    def create_acls(self, acl_bindings, **kwargs):
        """Create ACL bindings.

        Args:
            acl_bindings (list): List of AclBinding objects
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by AclBinding
        """

    def describe_acls(self, acl_binding_filter, **kwargs):
        """Describe ACL bindings.

        Args:
            acl_binding_filter (AclBindingFilter): Filter for ACL bindings
            **kwargs: Additional options (request_timeout)

        Returns:
            concurrent.futures.Future: Future with AclBinding results
        """

    def delete_acls(self, acl_binding_filters, **kwargs):
        """Delete ACL bindings.

        Args:
            acl_binding_filters (list): List of AclBindingFilter objects
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by AclBindingFilter
        """

    def list_consumer_groups(self, **kwargs):
        """List consumer groups.

        Args:
            **kwargs: Additional options (request_timeout, states)

        Returns:
            concurrent.futures.Future: Future with ListConsumerGroupsResult
        """

    def describe_consumer_groups(self, group_ids, **kwargs):
        """Describe consumer groups.

        Args:
            group_ids (list): List of consumer group IDs
            **kwargs: Additional options (request_timeout,
                include_authorized_operations)

        Returns:
            dict: Future objects keyed by group ID
        """

    def delete_consumer_groups(self, group_ids, **kwargs):
        """Delete consumer groups.

        Args:
            group_ids (list): List of consumer group IDs to delete
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by group ID
        """

    def list_consumer_group_offsets(self, request, **kwargs):
        """List consumer group offsets.

        Args:
            request (ConsumerGroupTopicPartitions or list): Group and
                partitions to query
            **kwargs: Additional options (request_timeout, require_stable)

        Returns:
            dict: Future objects keyed by ConsumerGroupTopicPartitions
        """

    def alter_consumer_group_offsets(self, group_topic_partitions, **kwargs):
        """Alter consumer group offsets.

        Args:
            group_topic_partitions (list): List of
                ConsumerGroupTopicPartitions
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by ConsumerGroupTopicPartitions
        """

    def describe_user_scram_credentials(self, users=None, **kwargs):
        """Describe SCRAM credentials for users.

        Args:
            users (list, optional): List of usernames (None for all)
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by username
        """

    def alter_user_scram_credentials(self, alterations, **kwargs):
        """Alter SCRAM credentials for users.

        Args:
            alterations (list): List of UserScramCredentialAlteration objects
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by username
        """

    def describe_cluster(self, **kwargs):
        """Describe cluster information.

        Args:
            **kwargs: Additional options (request_timeout,
                include_authorized_operations)

        Returns:
            concurrent.futures.Future: Future with DescribeClusterResult
        """

    def list_offsets(self, topic_partition_offsets, **kwargs):
        """List offsets for topic partitions.

        Args:
            topic_partition_offsets (dict): Dict of TopicPartition to
                OffsetSpec
            **kwargs: Additional options (request_timeout, isolation_level)

        Returns:
            dict: Future objects keyed by TopicPartition
        """

    def delete_records(self, topic_partition_offsets, **kwargs):
        """Delete records before specified offsets.

        Args:
            topic_partition_offsets (dict): Dict of TopicPartition to offset
            **kwargs: Additional options (request_timeout)

        Returns:
            dict: Future objects keyed by TopicPartition
        """

    def elect_leaders(self, election_type, partitions=None, **kwargs):
        """Elect leaders for topic partitions.

        Args:
            election_type (ElectionType): Type of election (PREFERRED or
                UNCLEAN)
            partitions (list, optional): List of TopicPartition objects
                (None for all partitions)
            **kwargs: Additional options (request_timeout)

        Returns:
            concurrent.futures.Future: Future with election results
        """

    def set_sasl_credentials(self, username, password):
        """Set SASL credentials for authentication.

        Args:
            username (str): SASL username
            password (str): SASL password
        """


# Specification for creating new topics.
class NewTopic:
    """Specification of a topic to be created.

    Instances are what AdminClient.create_topics expects in its
    ``new_topics`` list.
    """

    def __init__(self, topic, num_partitions=None, replication_factor=None, replica_assignment=None, config=None):
        """Create NewTopic specification.

        Args:
            topic (str): Topic name
            num_partitions (int, optional): Number of partitions
            replication_factor (int, optional): Replication factor
            replica_assignment (dict, optional): Manual replica assignment
            config (dict, optional): Topic configuration
        """

    @property
    def topic(self):
        """Topic name."""

    @property
    def num_partitions(self):
        """Number of partitions."""

    @property
    def replication_factor(self):
        """Replication factor."""

    @property
    def replica_assignment(self):
        """Replica assignment."""

    @property
    def config(self):
        """Topic configuration."""


# Specification for adding partitions to existing topics.
class NewPartitions:
    """Specification for growing an existing topic's partition count.

    Instances are what AdminClient.create_partitions expects in its list
    argument.
    """

    def __init__(self, topic, new_total_count, replica_assignment=None):
        """Create NewPartitions specification.

        Args:
            topic (str): Topic name
            new_total_count (int): New total partition count
            replica_assignment (list, optional): Replica assignment for new
                partitions
        """

    @property
    def topic(self):
        """Topic name."""

    @property
    def new_total_count(self):
        """New total partition count."""

    @property
    def replica_assignment(self):
        """Replica assignment."""


# Represents a configuration resource.
class ConfigResource:
    """A configuration resource identified by resource type and name.

    Used as the argument element and as the result-dict key of the
    AdminClient configuration methods, hence the hash/equality hooks.
    """

    def __init__(self, restype, name, incremental_configs=None):
        """Create ConfigResource.

        Args:
            restype (int): Resource type (RESOURCE_TOPIC, RESOURCE_BROKER,
                etc.)
            name (str): Resource name
            incremental_configs (list, optional): Incremental configuration
                entries
        """

    @property
    def restype(self):
        """Resource type."""

    @property
    def name(self):
        """Resource name."""

    @property
    def incremental_configs(self):
        """Incremental configuration entries."""

    def __hash__(self):
        """Hash for use in dicts."""

    def __eq__(self, other):
        """Equality comparison."""


# Represents a configuration entry.
class ConfigEntry:
    """A single configuration entry (name/value pair plus metadata)."""

    def __init__(self, name, value, incremental_operation=None):
        """Create ConfigEntry.

        Args:
            name (str): Configuration name
            value (str): Configuration value
            incremental_operation (AlterConfigOpType, optional): Operation
                type for incremental updates
        """

    @property
    def name(self):
        """Configuration name."""

    @property
    def value(self):
        """Configuration value."""

    @property
    def incremental_operation(self):
        """Incremental operation type."""

    @property
    def source(self):
        """Configuration source."""

    @property
    def is_default(self):
        """Whether this is a default configuration."""

    @property
    def is_read_only(self):
        """Whether this configuration is read-only."""

    @property
    def is_sensitive(self):
        """Whether this configuration is sensitive."""

    @property
    def synonyms(self):
        """Configuration synonyms."""


# Represents an ACL binding.
class AclBinding:
    """An ACL binding: a resource pattern paired with an access rule.

    Instances are what AdminClient.create_acls expects in its list argument.
    """

    def __init__(self, restype, name, resource_pattern_type, principal, host, operation, permission_type):
        """Create AclBinding.

        Args:
            restype (ResourceType): Resource type
            name (str): Resource name
            resource_pattern_type (ResourcePatternType): Pattern type
            principal (str): Principal (user/service)
            host (str): Host pattern
            operation (AclOperation): ACL operation
            permission_type (AclPermissionType): Permission type
        """

    @property
    def restype(self):
        """Resource type."""

    @property
    def name(self):
        """Resource name."""

    @property
    def resource_pattern_type(self):
        """Resource pattern type."""

    @property
    def principal(self):
        """Principal."""

    @property
    def host(self):
        """Host pattern."""

    @property
    def operation(self):
        """ACL operation."""

    @property
    def permission_type(self):
        """Permission type."""


# Filter for ACL bindings.
class AclBindingFilter:
    """Filter used to match ACL bindings.

    Takes the same fields as AclBinding; ``ANY`` / ``None`` act as
    wildcards as noted per argument below.
    """

    def __init__(self, restype, name, resource_pattern_type, principal, host, operation, permission_type):
        """Create AclBindingFilter.

        Args:
            restype (ResourceType): Resource type (can be ANY)
            name (str): Resource name (can be None for any)
            resource_pattern_type (ResourcePatternType): Pattern type (can
                be ANY)
            principal (str): Principal (can be None for any)
            host (str): Host pattern (can be None for any)
            operation (AclOperation): ACL operation (can be ANY)
            permission_type (AclPermissionType): Permission type (can be ANY)
        """


# Information about a consumer group.
class ConsumerGroupListing:
    """Summary information about one consumer group."""

    @property
    def group_id(self):
        """Consumer group ID."""

    @property
    def is_simple_consumer_group(self):
        """Whether this is a simple consumer group."""

    @property
    def state(self):
        """Consumer group state."""

    @property
    def type(self):
        """Consumer group type."""


# Detailed description of a consumer group.
class ConsumerGroupDescription:
    """Detailed description of one consumer group."""

    @property
    def group_id(self):
        """Consumer group ID."""

    @property
    def is_simple_consumer_group(self):
        """Whether this is a simple consumer group."""

    @property
    def members(self):
        """List of group members."""

    @property
    def partition_assignor(self):
        """Partition assignor strategy."""

    @property
    def state(self):
        """Consumer group state."""

    @property
    def coordinator(self):
        """Group coordinator node."""

    @property
    def authorized_operations(self):
        """Authorized operations for this group."""


# Description of a consumer group member.
class MemberDescription:
    """Description of a single consumer group member."""

    @property
    def member_id(self):
        """Member ID."""

    @property
    def group_instance_id(self):
        """Group instance ID."""

    @property
    def client_id(self):
        """Client ID."""

    @property
    def host(self):
        """Member host."""

    @property
    def assignment(self):
        """Member assignment."""


# Base class for SCRAM credential alterations.
class UserScramCredentialAlteration:
    """Base class for SCRAM credential alterations."""

    def __init__(self, user):
        """Base class for SCRAM credential alterations.

        Args:
            user (str): Username
        """

    @property
    def user(self):
        """Username."""


class UserScramCredentialUpsertion(UserScramCredentialAlteration):
    """SCRAM credential creation or update."""

    def __init__(self, user, scram_credential_info):
        """Create or update SCRAM credentials.

        Args:
            user (str): Username
            scram_credential_info (ScramCredentialInfo): Credential
                information
        """

    @property
    def scram_credential_info(self):
        """SCRAM credential information."""


class UserScramCredentialDeletion(UserScramCredentialAlteration):
    """SCRAM credential deletion."""

    def __init__(self, user, mechanism):
        """Delete SCRAM credentials.

        Args:
            user (str): Username
            mechanism (ScramMechanism): SCRAM mechanism to delete
        """

    @property
    def mechanism(self):
        """SCRAM mechanism."""


class ResourceType:
    """Resource type constants used by AclBinding / AclBindingFilter."""

    # NOTE(review): member names and values are reproduced as listed here;
    # confirm them against the installed confluent_kafka.admin.ResourceType.
    UNKNOWN = 0
    ANY = 1
    TOPIC = 2
    GROUP = 3
    CLUSTER = 4
    TRANSACTIONAL_ID = 5
    DELEGATION_TOKEN = 6
    USER = 7
class ResourcePatternType:
    """Resource pattern type constants used by AclBinding / AclBindingFilter."""

    UNKNOWN = 0
    ANY = 1
    MATCH = 2
    LITERAL = 3
    PREFIXED = 4
class AclOperation:
    """ACL operation constants used by AclBinding / AclBindingFilter."""

    UNKNOWN = 0
    ANY = 1
    ALL = 2
    READ = 3
    WRITE = 4
    CREATE = 5
    DELETE = 6
    ALTER = 7
    DESCRIBE = 8
    CLUSTER_ACTION = 9
    DESCRIBE_CONFIGS = 10
    ALTER_CONFIGS = 11
    IDEMPOTENT_WRITE = 12
class AclPermissionType:
    """ACL permission type constants used by AclBinding / AclBindingFilter."""

    UNKNOWN = 0
    ANY = 1
    DENY = 2
    ALLOW = 3
class ConfigSource:
    """Configuration source constants (see ConfigEntry.source)."""

    UNKNOWN_CONFIG = 0
    DYNAMIC_TOPIC_CONFIG = 1
    DYNAMIC_BROKER_CONFIG = 2
    DYNAMIC_DEFAULT_BROKER_CONFIG = 3
    STATIC_BROKER_CONFIG = 4
    DEFAULT_CONFIG = 5
class AlterConfigOpType:
    """Operation type constants for incremental configuration updates."""

    SET = 0
    DELETE = 1
    APPEND = 2
    SUBTRACT = 3
class ScramMechanism:
    """SCRAM mechanism constants (see UserScramCredentialDeletion)."""

    # NOTE(review): values are reproduced as listed here; confirm them
    # against the installed confluent_kafka.admin.ScramMechanism.
    SCRAM_SHA_256 = 0
    SCRAM_SHA_512 = 1
class ElectionType:
    """Election type constants accepted by AdminClient.elect_leaders."""

    PREFERRED = 0
    UNCLEAN = 1


# Example: creating topics.
from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Create topics
new_topics = [
    NewTopic('my-topic-1', num_partitions=3, replication_factor=1),
    NewTopic('my-topic-2', num_partitions=6, replication_factor=1, config={'cleanup.policy': 'compact'})
]
fs = admin_client.create_topics(new_topics, request_timeout=30)

# Wait for results: each future resolves (or raises) independently, so one
# failed topic does not hide the outcome of the others.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print(f"Topic {topic} created")
    except Exception as e:
        print(f"Failed to create topic {topic}: {e}")


# Example: listing and describing consumer groups.
from confluent_kafka.admin import AdminClient
admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# List consumer groups (a single future for the whole request)
fs = admin_client.list_consumer_groups(request_timeout=10)
try:
    result = fs.result()
    for group_listing in result.valid:
        print(f"Group: {group_listing.group_id}, State: {group_listing.state}")
except Exception as e:
    print(f"Failed to list consumer groups: {e}")

# Describe specific consumer groups (one future per group ID)
group_ids = ['my-group-1', 'my-group-2']
fs = admin_client.describe_consumer_groups(group_ids, request_timeout=10)
for group_id, f in fs.items():
    try:
        group_desc = f.result()
        print(f"Group {group_id}: {len(group_desc.members)} members")
        for member in group_desc.members:
            print(f"  Member: {member.member_id} on {member.host}")
    except Exception as e:
        print(f"Failed to describe group {group_id}: {e}")


# Example: creating and listing ACLs.
from confluent_kafka.admin import AdminClient, AclBinding, AclBindingFilter
from confluent_kafka.admin import ResourceType, ResourcePatternType, AclOperation, AclPermissionType

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Create ACL binding
acl_binding = AclBinding(
    restype=ResourceType.TOPIC,
    name='my-topic',
    resource_pattern_type=ResourcePatternType.LITERAL,
    principal='User:alice',
    host='*',
    operation=AclOperation.READ,
    permission_type=AclPermissionType.ALLOW
)
fs = admin_client.create_acls([acl_binding], request_timeout=10)
for acl, f in fs.items():
    try:
        f.result()
        print(f"ACL created for {acl.principal}")
    except Exception as e:
        print(f"Failed to create ACL: {e}")

# List ACLs
acl_filter = AclBindingFilter(
    restype=ResourceType.TOPIC,
    name=None,  # All topics
    resource_pattern_type=ResourcePatternType.ANY,
    principal=None,  # All principals
    host=None,  # All hosts
    operation=AclOperation.ANY,
    permission_type=AclPermissionType.ANY
)
fs = admin_client.describe_acls(acl_filter, request_timeout=10)
try:
    acl_bindings = fs.result()
    for acl in acl_bindings:
        print(f"ACL: {acl.principal} {acl.permission_type} {acl.operation} on {acl.name}")
except Exception as e:
    print(f"Failed to list ACLs: {e}")


# Install with Tessl CLI
npx tessl i tessl/pypi-confluent-kafka