Pure Python client for the Apache Kafka distributed stream-processing system.

Administrative client for managing Kafka clusters, including topic operations, partition management, configuration changes, access control lists (ACLs), and consumer-group administration.

KafkaAdminClient is the main administrative client, providing comprehensive cluster-management capabilities. It supports all Kafka administrative operations with error handling and per-operation timeouts.
class KafkaAdminClient:
    """Administrative client for a Kafka cluster.

    Covers topic, partition, configuration, ACL, and consumer-group
    management. Each operation takes an optional ``timeout_ms``.
    """

    def __init__(self, **configs):
        """Initialize the admin client.

        Keyword Args:
            bootstrap_servers (List[str]): broker addresses.
            client_id (str): client identifier.
            connections_max_idle_ms (int): max idle time (default: 540000).
            request_timeout_ms (int): request timeout (default: 30000).
            retry_backoff_ms (int): retry backoff (default: 100).
            reconnect_backoff_ms (int): reconnect backoff (default: 50).
            security_protocol (str): one of 'PLAINTEXT', 'SSL',
                'SASL_PLAINTEXT', 'SASL_SSL'.
            api_version (tuple): broker API version, or 'auto'.
        """

    def create_topics(self, topic_requests, timeout_ms=None, validate_only=False):
        """Create new topics.

        Args:
            topic_requests (List[NewTopic]): topic creation requests.
            timeout_ms (int): operation timeout.
            validate_only (bool): validate the request without creating.

        Returns:
            Dict[str, Future]: topic name -> creation-result future.
        """

    def delete_topics(self, topics, timeout_ms=None):
        """Delete topics.

        Args:
            topics (List[str]): topic names to delete.
            timeout_ms (int): operation timeout.

        Returns:
            Dict[str, Future]: topic name -> deletion-result future.
        """

    def list_topics(self, timeout_ms=None):
        """List available topics.

        Args:
            timeout_ms (int): operation timeout.

        Returns:
            Set[str]: available topic names.
        """

    def describe_topics(self, topics, timeout_ms=None):
        """Fetch detailed topic information.

        Args:
            topics (List[str]): topic names to describe.
            timeout_ms (int): operation timeout.

        Returns:
            Dict[str, TopicDescription]: topic descriptions by name.
        """

    def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False):
        """Add partitions to existing topics.

        Args:
            partition_updates (Dict[str, NewPartitions]): topic name ->
                partition update spec.
            timeout_ms (int): operation timeout.
            validate_only (bool): validate the request without applying it.

        Returns:
            Dict[str, Future]: topic name -> result future.
        """

    def describe_configs(self, config_resources, timeout_ms=None):
        """Fetch configuration for resources.

        Args:
            config_resources (List[ConfigResource]): resources to describe.
            timeout_ms (int): operation timeout.

        Returns:
            Dict[ConfigResource, ConfigResourceResult]: configuration results.
        """

    def alter_configs(self, config_resources, timeout_ms=None):
        """Modify configuration for resources.

        Args:
            config_resources (Dict[ConfigResource, Dict[str, str]]):
                configuration changes per resource.
            timeout_ms (int): operation timeout.

        Returns:
            Dict[ConfigResource, Future]: configuration-change result futures.
        """

    def describe_acls(self, acl_filter, timeout_ms=None):
        """Query access control lists.

        Args:
            acl_filter (ACLFilter): filter for the ACL query.
            timeout_ms (int): operation timeout.

        Returns:
            List[ACLBinding]: matching ACL bindings.
        """

    def create_acls(self, acls, timeout_ms=None):
        """Create access control lists.

        Args:
            acls (List[ACL]): ACL entries to create.
            timeout_ms (int): operation timeout.

        Returns:
            Dict[ACL, Future]: ACL creation result futures.
        """

    def delete_acls(self, acl_filters, timeout_ms=None):
        """Delete access control lists.

        Args:
            acl_filters (List[ACLFilter]): filters selecting ACLs to delete.
            timeout_ms (int): operation timeout.

        Returns:
            List[DeleteAclsResult]: deletion results.
        """

    def list_consumer_groups(self, timeout_ms=None):
        """List consumer groups.

        Args:
            timeout_ms (int): operation timeout.

        Returns:
            List[GroupInformation]: consumer-group information.
        """

    def describe_consumer_groups(self, group_ids, timeout_ms=None):
        """Fetch detailed consumer-group information.

        Args:
            group_ids (List[str]): group IDs to describe.
            timeout_ms (int): operation timeout.

        Returns:
            Dict[str, GroupDescription]: group descriptions by group ID.
        """

    def delete_consumer_groups(self, group_ids, timeout_ms=None):
        """Delete consumer groups.

        Args:
            group_ids (List[str]): group IDs to delete.
            timeout_ms (int): operation timeout.

        Returns:
            Dict[str, Future]: deletion result futures by group ID.
        """

    def close(self):
        """Close the admin client and clean up resources."""

# Classes for creating and modifying topics.
class NewTopic:
    """Specification for a topic passed to KafkaAdminClient.create_topics().

    Attributes mirror the constructor arguments; missing mappings are
    normalized to empty dicts so callers can iterate them safely.
    """

    def __init__(self, name, num_partitions, replication_factor,
                 replica_assignments=None, topic_configs=None):
        """Build a topic-creation request.

        Args:
            name (str): topic name.
            num_partitions (int): number of partitions.
            replication_factor (int): replication factor.
            replica_assignments (Dict[int, List[int]]): optional manual
                partition -> broker-id replica assignments.
            topic_configs (Dict[str, str]): topic configuration overrides.
        """
        self.name = name
        self.num_partitions = num_partitions
        self.replication_factor = replication_factor
        # Falsy inputs collapse to fresh empty dicts (never shared defaults).
        self.replica_assignments = replica_assignments or {}
        self.topic_configs = topic_configs or {}
class NewPartitions:
    """Specification for growing a topic's partition count."""

    def __init__(self, total_count, new_assignments=None):
        """Build a partition-addition request.

        Args:
            total_count (int): desired total partition count after the change.
            new_assignments (List[List[int]]): optional replica assignments
                for the newly added partitions.
        """
        self.total_count = total_count
        # None is preserved as-is (no normalization to a list).
        self.new_assignments = new_assignments
class TopicDescription:
    # Detailed topic metadata as returned by KafkaAdminClient.describe_topics().
    name: str                            # Topic name
    partitions: List[PartitionMetadata]  # Partition metadata
    is_internal: bool                    # Internal topic flag
    # NOTE(review): camelCase here is inconsistent with the snake_case naming
    # used elsewhere (cf. GroupDescription.authorized_operations) — confirm
    # the intended attribute name before relying on it.
    authorizedOperations: List[int]      # Authorized operations

# Classes for managing broker and topic configurations.
class ConfigResource:
    """Identifies a configurable cluster resource for config operations."""

    def __init__(self, resource_type, name, configs=None):
        """Build a configuration-resource reference.

        Args:
            resource_type (ConfigResourceType): resource type code.
            name (str): resource name (e.g. a topic name, or a broker id
                given as a string such as '0').
            configs (Dict[str, str]): configuration key/value pairs.
        """
        self.resource_type = resource_type
        self.name = name
        # Falsy configs collapse to a fresh empty dict.
        self.configs = configs or {}
class ConfigResourceType:
    """Resource-type codes accepted by ConfigResource."""

    TOPIC = 2   # Topic configuration
    BROKER = 4  # Broker configuration
class ConfigResourceResult:
    # Per-resource result from KafkaAdminClient.describe_configs().
    configs: Dict[str, ConfigEntry]  # Configuration entries keyed by config name
    error_code: int                  # Error code (0 = success)
    error_message: str               # Error description
class ConfigEntry:
    # A single configuration key/value together with its metadata flags.
    name: str                        # Configuration key
    value: str                       # Configuration value
    is_default: bool                 # Is default value
    is_sensitive: bool               # Is sensitive value
    is_read_only: bool               # Is read-only
    synonyms: List['ConfigSynonym']  # Configuration synonyms

# Classes for managing access control and authorization.
class ACL:
    """A single access-control-list entry binding a principal to a resource."""

    def __init__(self, principal, host, operation, permission_type, resource_pattern):
        """Build an ACL entry.

        Args:
            principal (str): principal (user/service), e.g. 'User:alice'.
            host (str): host pattern.
            operation (ACLOperation): operation type.
            permission_type (ACLPermissionType): ALLOW or DENY.
            resource_pattern (ResourcePattern): resource pattern the ACL
                applies to.
        """
        self.principal = principal
        self.host = host
        self.operation = operation
        self.permission_type = permission_type
        self.resource_pattern = resource_pattern
class ACLFilter:
    """Filter for ACL queries; fields left unset act as wildcards.

    NOTE(review): the original documentation refers to ``ACLFilter.ANY``
    values, but no ``ANY`` attribute is defined on this class — confirm the
    intended wildcard convention (None appears to be the effective wildcard).
    """

    def __init__(self, principal=None, host=None, operation=None,
                 permission_type=None, resource_pattern_filter=None):
        """Build an ACL query filter.

        Args:
            principal (str): principal filter.
            host (str): host filter.
            operation (ACLOperation): operation filter.
            permission_type (ACLPermissionType): permission filter.
            resource_pattern_filter (ResourcePatternFilter): resource filter.
        """
        self.principal = principal
        self.host = host
        self.operation = operation
        self.permission_type = permission_type
        self.resource_pattern_filter = resource_pattern_filter
class ResourcePattern:
    """Names a resource (or set of resources) an ACL applies to."""

    def __init__(self, resource_type, resource_name, pattern_type):
        """Build a resource pattern.

        Args:
            resource_type (ResourceType): type of resource.
            resource_name (str): resource name pattern.
            pattern_type (ACLResourcePatternType): pattern matching type
                (e.g. LITERAL or PREFIXED).
        """
        self.resource_type = resource_type
        self.resource_name = resource_name
        self.pattern_type = pattern_type
class ResourcePatternFilter:
    """Resource-pattern filter for ACL queries; None fields match anything."""

    def __init__(self, resource_type=None, resource_name=None, pattern_type=None):
        self.resource_type = resource_type
        self.resource_name = resource_name
        self.pattern_type = pattern_type


class ACLOperation:
    """Operation codes used in ACL entries and filters.

    NOTE(review): this numbering differs from the Kafka protocol's
    AclOperation wire encoding (where ANY=1, ALL=2, READ=3, ...) — confirm
    these values are intentional before relying on them on the wire.
    """

    ANY = -1
    ALL = 0
    READ = 1
    WRITE = 2
    CREATE = 3
    DELETE = 4
    ALTER = 5
    DESCRIBE = 6
    CLUSTER_ACTION = 7
    DESCRIBE_CONFIGS = 8
    ALTER_CONFIGS = 9
    IDEMPOTENT_WRITE = 10
class ResourceType:
    """Resource-type codes used in ACL resource patterns.

    NOTE(review): this numbering differs from the Kafka protocol's
    ResourceType wire encoding — confirm the values before wire use.
    """

    UNKNOWN = 0
    ANY = 1
    CLUSTER = 2
    DELEGATION_TOKEN = 3
    GROUP = 4
    TOPIC = 5
    TRANSACTIONAL_ID = 6
class ACLPermissionType:
    """Permission codes for ACL entries (ALLOW/DENY, plus ANY for filters)."""

    ANY = 0
    DENY = 1
    ALLOW = 2
class ACLResourcePatternType:
    """Pattern-matching modes for ACL resource patterns."""

    ANY = 0       # Filter-only: match any pattern type
    MATCH = 1     # Filter-only: match literal, wildcard, and prefixed
    LITERAL = 2   # Exact resource name
    PREFIXED = 3  # Resource-name prefix

# Classes for consumer group administration.
class GroupDescription:
    # Detailed consumer-group information returned by
    # KafkaAdminClient.describe_consumer_groups().
    group_id: str                     # Group ID
    is_simple_consumer_group: bool    # Simple consumer group flag
    members: List[MemberDescription]  # Group members
    partition_assignor: str           # Partition assignment strategy
    state: str                        # Group state
    coordinator: Node                 # Group coordinator
    authorized_operations: List[int]  # Authorized operations
class MemberDescription:
    # Per-member detail within a GroupDescription.
    member_id: str                # Member ID
    client_id: str                # Client ID
    host: str                     # Client host
    assignment: MemberAssignment  # Partition assignment
class MemberAssignment:
    # Partitions currently assigned to a single consumer-group member.
    topic_partitions: Set[TopicPartition]  # Assigned partitions

# from kafka import KafkaAdminClient  (import belonging to the example below)
# Example: creating topics and adding partitions.
from kafka import KafkaAdminClient  # was lost when lines were fused together
from kafka.admin import NewTopic, NewPartitions
from kafka.errors import TopicAlreadyExistsError, KafkaError

# Create admin client
admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)
try:
    # Create topics
    topics = [
        NewTopic(name='events', num_partitions=3, replication_factor=1),
        NewTopic(name='logs', num_partitions=6, replication_factor=1,
                 topic_configs={'retention.ms': '86400000'})  # 1 day retention
    ]
    create_result = admin.create_topics(topics, timeout_ms=30000)
    # Wait for results
    for topic, future in create_result.items():
        try:
            future.result()  # Block until completion
            print(f"Topic '{topic}' created successfully")
        except TopicAlreadyExistsError:
            print(f"Topic '{topic}' already exists")
        except KafkaError as e:
            print(f"Failed to create topic '{topic}': {e}")
    # List topics
    topics = admin.list_topics(timeout_ms=10000)
    print(f"Available topics: {list(topics)}")
    # Add partitions to existing topic
    partition_updates = {
        'events': NewPartitions(total_count=5)  # Increase from 3 to 5 partitions
    }
    partition_result = admin.create_partitions(partition_updates, timeout_ms=30000)
    for topic, future in partition_result.items():
        try:
            future.result()
            print(f"Added partitions to topic '{topic}'")
        except KafkaError as e:
            print(f"Failed to add partitions to '{topic}': {e}")
finally:
    # Always release client resources, even on failure.
    admin.close()
# Example: inspecting detailed topic metadata.
from kafka import KafkaAdminClient  # was lost when lines were fused together

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
try:
    # Get detailed topic information
    topic_descriptions = admin.describe_topics(['events', 'logs'], timeout_ms=10000)
    for topic_name, description in topic_descriptions.items():
        print(f"\nTopic: {topic_name}")
        print(f"Internal: {description.is_internal}")
        print(f"Partitions: {len(description.partitions)}")
        for partition in description.partitions:
            print(f" Partition {partition.partition}:")
            print(f" Leader: {partition.leader}")
            print(f" Replicas: {partition.replicas}")
            print(f" ISR: {partition.isr}")
finally:
    admin.close()
# Example: describing and altering configurations.
from kafka import KafkaAdminClient  # was lost when lines were fused together
from kafka.admin import ConfigResource, ConfigResourceType
from kafka.errors import KafkaError  # fix: used below but was never imported

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
try:
    # Describe topic configurations
    topic_resource = ConfigResource(ConfigResourceType.TOPIC, 'events')
    broker_resource = ConfigResource(ConfigResourceType.BROKER, '0')
    config_results = admin.describe_configs([topic_resource, broker_resource],
                                            timeout_ms=10000)
    for resource, result in config_results.items():
        print(f"\nConfigurations for {resource.resource_type} '{resource.name}':")
        for name, entry in result.configs.items():
            if not entry.is_default:  # Only show non-default configs
                print(f" {name} = {entry.value}")
    # Alter topic configuration
    config_updates = {
        topic_resource: {
            'retention.ms': '172800000',  # 2 days
            'cleanup.policy': 'delete'
        }
    }
    alter_result = admin.alter_configs(config_updates, timeout_ms=30000)
    for resource, future in alter_result.items():
        try:
            future.result()
            print(f"Configuration updated for {resource.name}")
        except KafkaError as e:
            print(f"Failed to update configuration: {e}")
finally:
    admin.close()
# Example: creating and listing ACLs.
from kafka import KafkaAdminClient  # was lost when lines were fused together
from kafka.admin import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter,
                         ACLOperation, ACLPermissionType, ResourceType,
                         ACLResourcePatternType)
from kafka.errors import KafkaError  # fix: used below but was never imported

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
try:
    # Create ACLs
    acls = [
        ACL(
            principal='User:alice',
            host='*',
            operation=ACLOperation.READ,
            permission_type=ACLPermissionType.ALLOW,
            resource_pattern=ResourcePattern(
                resource_type=ResourceType.TOPIC,
                resource_name='events',
                pattern_type=ACLResourcePatternType.LITERAL
            )
        ),
        ACL(
            principal='User:bob',
            host='192.168.1.*',
            operation=ACLOperation.WRITE,
            permission_type=ACLPermissionType.ALLOW,
            resource_pattern=ResourcePattern(
                resource_type=ResourceType.TOPIC,
                resource_name='logs-*',
                pattern_type=ACLResourcePatternType.PREFIXED
            )
        )
    ]
    create_result = admin.create_acls(acls, timeout_ms=30000)
    for acl, future in create_result.items():
        try:
            future.result()
            print(f"ACL created for {acl.principal}")
        except KafkaError as e:
            print(f"Failed to create ACL: {e}")
    # List ACLs
    acl_filter = ACLFilter(
        resource_pattern_filter=ResourcePatternFilter(
            resource_type=ResourceType.TOPIC
        )
    )
    acl_bindings = admin.describe_acls(acl_filter, timeout_ms=10000)
    print(f"\nFound {len(acl_bindings)} ACLs:")
    for binding in acl_bindings:
        print(f" {binding.principal} {binding.permission_type} "
              f"{binding.operation} on {binding.pattern.resource_name}")
finally:
    admin.close()
# Example: listing, describing, and deleting consumer groups.
from kafka import KafkaAdminClient  # was lost when lines were fused together
from kafka.errors import KafkaError  # fix: used below but was never imported

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
try:
    # List consumer groups
    groups = admin.list_consumer_groups(timeout_ms=10000)
    print(f"Found {len(groups)} consumer groups:")
    for group in groups:
        print(f" Group: {group.group}, State: {group.state}")
    # Describe specific consumer groups
    group_ids = ['my-consumer-group', 'batch-processor']
    descriptions = admin.describe_consumer_groups(group_ids, timeout_ms=10000)
    for group_id, description in descriptions.items():
        print(f"\nGroup: {group_id}")
        print(f"State: {description.state}")
        print(f"Coordinator: {description.coordinator}")
        print(f"Assignment Strategy: {description.partition_assignor}")
        print(f"Members: {len(description.members)}")
        for member in description.members:
            print(f" Member: {member.member_id}")
            print(f" Client: {member.client_id}")
            print(f" Host: {member.host}")
            print(f" Partitions: {len(member.assignment.topic_partitions)}")
    # Delete inactive consumer group
    delete_result = admin.delete_consumer_groups(['inactive-group'], timeout_ms=30000)
    for group_id, future in delete_result.items():
        try:
            future.result()
            print(f"Consumer group '{group_id}' deleted")
        except KafkaError as e:
            print(f"Failed to delete group '{group_id}': {e}")
finally:
    admin.close()
# Example: combining the admin client with the low-level client for
# cluster metadata.
from kafka import KafkaAdminClient  # was lost when lines were fused together
from kafka.client_async import KafkaClient
from kafka.structs import TopicPartition  # fix: used below but was never imported

# Using admin client for high-level operations
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
# Using low-level client for detailed cluster info
client = KafkaClient(bootstrap_servers=['localhost:9092'])
try:
    # Wait for client to connect and load metadata
    client.poll(timeout_ms=5000)
    # Get cluster metadata
    cluster = client.cluster
    print("Cluster Information:")
    print(f"Cluster ID: {cluster.cluster_id}")
    print(f"Controller: {cluster.controller}")
    print(f"\nBrokers ({len(cluster.brokers())}):")
    for broker in cluster.brokers():
        print(f" Broker {broker.nodeId}: {broker.host}:{broker.port}")
        if broker.rack:
            print(f" Rack: {broker.rack}")
    print(f"\nTopics ({len(cluster.topics())}):")
    for topic in sorted(cluster.topics()):
        partitions = cluster.partitions_for_topic(topic)
        print(f" {topic}: {len(partitions)} partitions")
        for partition_id in sorted(partitions):
            # leader_for_partition() yields the leader, not a partition
            # object, so name the variable accordingly.
            leader = cluster.leader_for_partition(TopicPartition(topic, partition_id))
            print(f" Partition {partition_id}: Leader {leader}")
finally:
    admin.close()
    client.close()
# Example: creating many topics and processing results concurrently.
from kafka import KafkaAdminClient  # was lost when lines were fused together
from kafka.admin import NewTopic
import concurrent.futures

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
try:
    # Create multiple topics concurrently
    topics = [
        NewTopic(f'partition-{i}', num_partitions=i+1, replication_factor=1)
        for i in range(10)
    ]
    create_result = admin.create_topics(topics, timeout_ms=60000)
    # Process results as they complete
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit all futures
        future_to_topic = {
            executor.submit(future.result): topic_name
            for topic_name, future in create_result.items()
        }
        # Process completed futures
        for future in concurrent.futures.as_completed(future_to_topic, timeout=60):
            topic_name = future_to_topic[future]
            try:
                future.result()
                print(f"✓ Topic '{topic_name}' created")
            except Exception as e:
                print(f"✗ Topic '{topic_name}' failed: {e}")
finally:
    admin.close()

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-kafka-python