Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities
npx @tessl/cli install tessl/pypi-kafka-python-ng@2.2.0A pure Python client library for Apache Kafka that provides high-level producer and consumer APIs, admin functionality, and full protocol support. Designed for compatibility with Kafka brokers from version 0.8.0 to 2.6+, offering Pythonic interfaces for distributed stream processing applications.
pip install kafka-python-ngimport kafkaFor specific functionality:
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClientFor data structures and error handling:
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.errors import KafkaError
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.conn import BrokerConnection
from kafka.serializer import Serializer, DeserializerFor admin operations:
from kafka.admin import NewTopic, NewPartitions, ConfigResource, ConfigResourceTypefrom kafka import KafkaProducer
import json
# Create producer with JSON serialization
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Send messages
producer.send('my-topic', {'key': 'value'})
producer.flush()
producer.close()from kafka import KafkaConsumer
import json
# Create consumer
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
# Consume messages
for message in consumer:
print(f"Topic: {message.topic}, Value: {message.value}")from kafka import KafkaAdminClient
from kafka.admin import NewTopic
# Create admin client
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
# Create topics
topic = NewTopic(name='test-topic', num_partitions=3, replication_factor=1)
admin.create_topics([topic])The kafka-python-ng library is organized around three main client types:
The library uses an async I/O foundation with selector-based networking, automatic metadata management, and comprehensive error handling. It supports all major Kafka features including SASL authentication, SSL encryption, multiple compression formats, and transactional semantics.
High-level producer for publishing records to Kafka topics with automatic batching, compression, partitioning, and retry logic. Supports asynchronous sending with futures and callback handling.
class KafkaProducer:
def __init__(self, **configs): ...
def send(self, topic: str, value=None, key=None, headers=None, partition=None, timestamp_ms=None): ...
def flush(self, timeout=None): ...
def close(self, timeout=None): ...High-level consumer for consuming records from Kafka topics with automatic group coordination, partition assignment, and offset management. Supports both subscription-based and manual partition assignment.
class KafkaConsumer:
def __init__(self, *topics, **configs): ...
def subscribe(self, topics, pattern=None, listener=None): ...
def poll(self, timeout_ms=0, max_records=None, update_offsets=True): ...
def commit(self, offsets=None): ...
def close(self): ...Administrative client for managing Kafka cluster resources including topics, consumer groups, configurations, and access control lists (ACLs).
class KafkaAdminClient:
def __init__(self, **configs): ...
def create_topics(self, topic_list, timeout_ms=None, validate_only=False): ...
def delete_topics(self, topics, timeout_ms=None): ...
def describe_configs(self, config_resources, timeout_ms=None, include_synonyms=False): ...Connection management, SSL/SASL authentication, and client configuration options for connecting to Kafka clusters with various security configurations.
class BrokerConnection:
def __init__(self, host, port, **configs): ...
def connect(self, timeout=None): ...
def close(self): ...Core data structures for representing Kafka concepts including topic partitions, offsets, broker metadata, and consumer group information.
class TopicPartition:
def __init__(self, topic: str, partition: int): ...
class OffsetAndMetadata:
def __init__(self, offset: int, metadata: str): ...Comprehensive exception hierarchy for handling various Kafka-related errors including network issues, protocol errors, and authentication failures.
class KafkaError(RuntimeError):
retriable: bool
invalid_metadata: bool
class NoBrokersAvailable(KafkaError): ...
class CommitFailedError(KafkaError): ...Abstract base classes for implementing custom key and value serializers/deserializers for converting between Python objects and bytes.
class Serializer:
def serialize(self, topic: str, value) -> bytes: ...
def close(self): ...
class Deserializer:
def deserialize(self, topic: str, bytes_: bytes): ...
def close(self): ...# Core data structures
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
OffsetAndMetadata = namedtuple('OffsetAndMetadata', ['offset', 'metadata'])
BrokerMetadata = namedtuple('BrokerMetadata', ['nodeId', 'host', 'port', 'rack'])
PartitionMetadata = namedtuple('PartitionMetadata', ['topic', 'partition', 'leader', 'replicas', 'isr', 'error'])
OffsetAndTimestamp = namedtuple('OffsetAndTimestamp', ['offset', 'timestamp'])
# Admin types
class NewTopic:
def __init__(self, name: str, num_partitions: int, replication_factor: int, **configs): ...
class NewPartitions:
def __init__(self, total_count: int, new_assignments=None): ...
class ConfigResource:
def __init__(self, resource_type, name: str): ...
# Consumer callback interface
class ConsumerRebalanceListener:
def on_partitions_revoked(self, revoked): ...
def on_partitions_assigned(self, assigned): ...
# Connection management
class BrokerConnection:
def __init__(self, host: str, port: int, **configs): ...
def connect(self, timeout=None): ...
def close(self): ...
# Serialization interfaces
class Serializer:
def serialize(self, topic: str, value) -> bytes: ...
def close(self): ...
class Deserializer:
def deserialize(self, topic: str, bytes_: bytes): ...
def close(self): ...
# Future type for async operations
class FutureRecordMetadata:
def get(self, timeout=None): ...
def is_done(self) -> bool: ...
def add_callback(self, callback): ...
def add_errback(self, errback): ...