Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities
—
Administrative client for managing Kafka cluster resources including topics, consumer groups, configurations, and access control lists (ACLs). Provides comprehensive cluster management capabilities.
Main administrative client for cluster management operations.
class KafkaAdminClient:
    """Main administrative client for cluster management operations."""

    def __init__(self, **configs):
        """
        Create a KafkaAdminClient instance.

        Args:
            **configs: Admin client configuration options including:
                bootstrap_servers (list): List of Kafka brokers
                client_id (str): Client identifier
                request_timeout_ms (int): Request timeout
                connections_max_idle_ms (int): Connection idle timeout
                retry_backoff_ms (int): Retry backoff time
                security_protocol (str): Security protocol
                ssl_context: SSL context
                sasl_mechanism (str): SASL mechanism
                sasl_plain_username (str): SASL username
                sasl_plain_password (str): SASL password
        """

    def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
        """
        Create new topics.

        Args:
            new_topics (list): List of NewTopic objects
            timeout_ms (int): Operation timeout
            validate_only (bool): Only validate, don't create

        Returns:
            dict: Dictionary mapping topic name to CreateTopicsResponse.topic_errors
        """

    def delete_topics(self, topics, timeout_ms=None):
        """
        Delete topics.

        Args:
            topics (list): List of topic names to delete
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping topic name to DeleteTopicsResponse.topic_errors
        """

    def list_topics(self, timeout_ms=None):
        """
        List all topics in cluster.

        Args:
            timeout_ms (int): Operation timeout

        Returns:
            ClusterMetadata: Cluster metadata with topic information
        """

    def describe_topics(self, topics, timeout_ms=None):
        """
        Get detailed information about topics.

        Args:
            topics (list): List of topic names
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping topic name to TopicMetadata
        """

    def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False):
        """
        Add partitions to existing topics.

        Args:
            partition_updates (dict): Dictionary mapping topic name to NewPartitions
            timeout_ms (int): Operation timeout
            validate_only (bool): Only validate, don't create

        Returns:
            dict: Dictionary mapping topic name to CreatePartitionsResponse.topic_errors
        """

    def describe_configs(self, config_resources, timeout_ms=None, include_synonyms=False):
        """
        Get configuration for resources.

        Args:
            config_resources (list): List of ConfigResource objects
            timeout_ms (int): Operation timeout
            include_synonyms (bool): Include config synonyms

        Returns:
            dict: Dictionary mapping ConfigResource to DescribeConfigsResponse.resources
        """

    def alter_configs(self, config_updates, timeout_ms=None, validate_only=False):
        """
        Alter configuration for resources.

        Args:
            config_updates (dict): Dictionary mapping ConfigResource to config changes
            timeout_ms (int): Operation timeout
            validate_only (bool): Only validate, don't alter

        Returns:
            dict: Dictionary mapping ConfigResource to AlterConfigsResponse.resources
        """

    def list_consumer_groups(self, timeout_ms=None):
        """
        List consumer groups in cluster.

        Args:
            timeout_ms (int): Operation timeout

        Returns:
            list: List of GroupInformation objects
        """

    def describe_consumer_groups(self, group_ids, timeout_ms=None):
        """
        Get detailed information about consumer groups.

        Args:
            group_ids (list): List of consumer group IDs
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping group ID to GroupInformation
        """

    def delete_consumer_groups(self, group_ids, timeout_ms=None):
        """
        Delete consumer groups.

        Args:
            group_ids (list): List of consumer group IDs to delete
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping group ID to delete response
        """

    def list_consumer_group_offsets(self, group_id, partitions=None, timeout_ms=None):
        """
        Get committed offsets for consumer group.

        Args:
            group_id (str): Consumer group ID
            partitions (list): List of TopicPartition objects (None = all)
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping TopicPartition to OffsetAndMetadata
        """

    def alter_consumer_group_offsets(self, group_id, offsets, timeout_ms=None):
        """
        Alter committed offsets for consumer group.

        Args:
            group_id (str): Consumer group ID
            offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping TopicPartition to alter response
        """

    def create_acls(self, acls, timeout_ms=None):
        """
        Create access control lists.

        Args:
            acls (list): List of ACL objects
            timeout_ms (int): Operation timeout

        Returns:
            list: List of CreateAclsResponse.creation_responses
        """

    def describe_acls(self, acl_filter, timeout_ms=None):
        """
        Describe access control lists matching filter.

        Args:
            acl_filter (ACLFilter): Filter for ACLs to describe
            timeout_ms (int): Operation timeout

        Returns:
            list: List of ACL objects matching filter
        """

    def delete_acls(self, acl_filters, timeout_ms=None):
        """
        Delete access control lists matching filters.

        Args:
            acl_filters (list): List of ACLFilter objects
            timeout_ms (int): Operation timeout

        Returns:
            list: List of DeleteAclsResponse.filter_responses
        """

    def describe_cluster(self, timeout_ms=None):
        """
        Get cluster metadata including brokers and cluster ID.

        Args:
            timeout_ms (int): Operation timeout

        Returns:
            ClusterMetadata: Cluster information including brokers and cluster ID
        """

    def describe_log_dirs(self, broker_ids=None, timeout_ms=None):
        """
        Describe log directories on brokers.

        Args:
            broker_ids (list): List of broker IDs (None = all brokers)
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping broker ID to log directory information
        """
    def close(self):
        """Close the admin client and release resources."""

Types for creating and managing topics.
class NewTopic:
    """Specification for creating a new topic."""

    def __init__(self, name, num_partitions, replication_factor, replica_assignments=None, topic_configs=None):
        """
        Specification for creating a new topic.

        Args:
            name (str): Topic name
            num_partitions (int): Number of partitions
            replication_factor (int): Replication factor
            replica_assignments (dict): Custom replica assignments (optional)
            topic_configs (dict): Topic configuration properties (optional)
        """

    # Declared attributes of a topic specification.
    name: str
    num_partitions: int
    replication_factor: int
    replica_assignments: dict
    topic_configs: dict
class NewPartitions:
    """Specification for adding partitions to existing topic."""

    def __init__(self, total_count, new_assignments=None):
        """
        Specification for adding partitions to existing topic.

        Args:
            total_count (int): New total partition count
            new_assignments (list): Replica assignments for new partitions (optional)
        """

    # Declared attributes of a partition update.
    total_count: int
    new_assignments: list

Types for managing resource configurations.
class ConfigResource:
    """Resource for configuration operations."""

    def __init__(self, resource_type, name, configs=None):
        """
        Resource for configuration operations.

        Args:
            resource_type (ConfigResourceType): Type of resource
            name (str): Resource name
            configs (dict): Configuration properties (optional)
        """

    # ConfigResourceType is declared after this class, so the annotation is a
    # string forward reference; a bare name would raise NameError here.
    resource_type: "ConfigResourceType"
    name: str
    configs: dict
class ConfigResourceType:
    """Kinds of resource that configuration operations can target."""

    BROKER = 4  # Broker configuration
    TOPIC = 2  # Topic configuration

Types for managing access control lists (ACLs).
class ACL:
    """Access control list entry."""

    def __init__(self, principal, host, operation, permission_type, resource_pattern):
        """
        Access control list entry.

        Args:
            principal (str): Principal (user/service account)
            host (str): Host pattern
            operation (ACLOperation): Operation type
            permission_type (ACLPermissionType): Permission type
            resource_pattern (ResourcePattern): Resource pattern
        """

    # ACLOperation, ACLPermissionType and ResourcePattern are declared after
    # this class, so they are written as string forward references.
    principal: str
    host: str
    operation: "ACLOperation"
    permission_type: "ACLPermissionType"
    resource_pattern: "ResourcePattern"
class ACLFilter:
    """Filter for ACL operations."""

    def __init__(self, principal=None, host=None, operation=None, permission_type=None, resource_pattern=None):
        """
        Filter for ACL operations.

        Every field is optional and defaults to None.

        Args:
            principal: Principal to filter on
            host: Host to filter on
            operation: Operation to filter on
            permission_type: Permission type to filter on
            resource_pattern: Resource pattern to filter on
        """
class ResourcePattern:
    """Resource pattern for ACL matching."""

    def __init__(self, resource_type, resource_name, pattern_type):
        """
        Resource pattern for ACL matching.

        Args:
            resource_type (ResourceType): Type of resource
            resource_name (str): Resource name/pattern
            pattern_type (ACLResourcePatternType): Pattern matching type
        """

    # ResourceType and ACLResourcePatternType are declared after this class,
    # so they are written as string forward references.
    resource_type: "ResourceType"
    resource_name: str
    pattern_type: "ACLResourcePatternType"
class ACLOperation:
    """Operation codes that an ACL entry or filter can refer to."""

    ANY = 1
    ALL = 2
    READ = 3
    WRITE = 4
    CREATE = 5
    DELETE = 6
    ALTER = 7
    DESCRIBE = 8
    CLUSTER_ACTION = 9
    DESCRIBE_CONFIGS = 10
    ALTER_CONFIGS = 11
    IDEMPOTENT_WRITE = 12
class ACLPermissionType:
    """Permission codes that an ACL entry or filter can refer to."""

    ANY = 1
    DENY = 2
    ALLOW = 3
class ResourceType:
    """Resource type codes used in ACL resource patterns."""

    # Listed in numeric order for readability; values are unchanged.
    UNKNOWN = 0
    ANY = 1
    TOPIC = 2
    GROUP = 3
    CLUSTER = 4
    TRANSACTIONAL_ID = 5
    DELEGATION_TOKEN = 6
class ACLResourcePatternType:
    """Pattern-matching codes used in ACL resource patterns."""

    ANY = 1
    MATCH = 2
    LITERAL = 3
    PREFIXED = 4

from kafka import KafkaAdminClient
from kafka.admin import NewTopic, NewPartitions

# Create admin client
admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)

# Create topics
topics = [
    NewTopic(
        name='user-events',
        num_partitions=6,
        replication_factor=3,
        topic_configs={
            'cleanup.policy': 'compact',
            'retention.ms': '604800000'  # 7 days
        }
    ),
    NewTopic(
        name='analytics',
        num_partitions=12,
        replication_factor=2
    )
]
result = admin.create_topics(topics, timeout_ms=30000)
# create_topics is documented above as returning a dict keyed by topic name,
# so iterate key/value pairs; .values() would drop the topic name.
for topic, error in result.items():
    if error is None:
        print(f"Topic {topic} created successfully")
    else:
        print(f"Failed to create topic {topic}: {error}")

# Add partitions to existing topic
partition_updates = {
    'user-events': NewPartitions(total_count=10)
}
admin.create_partitions(partition_updates)

# Delete topics
admin.delete_topics(['old-topic'], timeout_ms=30000)
admin.close()

from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Get topic configuration
topic_resource = ConfigResource(ConfigResourceType.TOPIC, 'my-topic')
config_result = admin.describe_configs([topic_resource])
for resource, config_response in config_result.items():
    print(f"Configuration for {resource.name}:")
    for config in config_response.configs:
        print(f"  {config.name} = {config.value}")

# Alter topic configuration
config_updates = {
    topic_resource: {
        'retention.ms': '86400000',  # 1 day
        'segment.ms': '3600000'  # 1 hour
    }
}
admin.alter_configs(config_updates)
admin.close()

from kafka import KafkaAdminClient
from kafka.structs import TopicPartition, OffsetAndMetadata

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# List all consumer groups
groups = admin.list_consumer_groups()
for group in groups:
    print(f"Group: {group.group}, State: {group.state}")

# Get detailed group information
group_details = admin.describe_consumer_groups(['my-consumer-group'])
for group_id, group_info in group_details.items():
    print(f"Group {group_id}:")
    print(f"  State: {group_info.state}")
    print(f"  Protocol: {group_info.protocol}")
    print(f"  Members: {len(group_info.members)}")

# Get committed offsets
group_id = 'my-consumer-group'
offsets = admin.list_consumer_group_offsets(group_id)
for partition, offset_metadata in offsets.items():
    print(f"{partition.topic}:{partition.partition} = {offset_metadata.offset}")

# Reset offsets
new_offsets = {
    TopicPartition('my-topic', 0): OffsetAndMetadata(1000, 'reset'),
    TopicPartition('my-topic', 1): OffsetAndMetadata(2000, 'reset')
}
admin.alter_consumer_group_offsets(group_id, new_offsets)
admin.close()

from kafka import KafkaAdminClient
# ResourcePatternFilter is used for the describe filter below, so it must be
# imported alongside the other ACL types.
from kafka.admin import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter,
                         ACLOperation, ACLPermissionType,
                         ResourceType, ACLResourcePatternType)

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create ACLs
acls = [
    ACL(
        principal='User:alice',
        host='*',
        operation=ACLOperation.READ,
        permission_type=ACLPermissionType.ALLOW,
        resource_pattern=ResourcePattern(
            resource_type=ResourceType.TOPIC,
            resource_name='user-data',
            pattern_type=ACLResourcePatternType.LITERAL
        )
    ),
    ACL(
        principal='User:service-account',
        host='*',
        operation=ACLOperation.WRITE,
        permission_type=ACLPermissionType.ALLOW,
        resource_pattern=ResourcePattern(
            resource_type=ResourceType.TOPIC,
            resource_name='events-',
            pattern_type=ACLResourcePatternType.PREFIXED
        )
    )
]
admin.create_acls(acls)

# List ACLs
# Filter fields are spelled out with match-any values.
# NOTE(review): upstream ACLFilter appears to validate the operation and
# permission_type types, so leaving them as None may be rejected - confirm.
acl_filter = ACLFilter(
    principal=None,
    host=None,
    operation=ACLOperation.ANY,
    permission_type=ACLPermissionType.ANY,
    resource_pattern=ResourcePatternFilter(
        resource_type=ResourceType.TOPIC,
        resource_name=None,  # All topics
        pattern_type=ACLResourcePatternType.ANY
    )
)
existing_acls = admin.describe_acls(acl_filter)
for acl in existing_acls:
    print(f"ACL: {acl.principal} {acl.permission_type} {acl.operation} on {acl.resource_pattern}")
admin.close()

Install with Tessl CLI
npx tessl i tessl/pypi-kafka-python-ng