tessl/pypi-kafka-python

Pure Python client for the Apache Kafka distributed stream processing system

  • Workspace: tessl
  • Visibility: Public
  • Describes: pkg:pypi/kafka-python@2.2.x

To install, run

npx @tessl/cli install tessl/pypi-kafka-python@2.2.0


kafka-python

A pure Python client library for the Apache Kafka distributed stream processing system. It provides comprehensive support for producers, consumers, and administrative operations, and is compatible with Kafka brokers from version 0.8.0 through 2.6+.

Version Compatibility

  • Core Features: Compatible with Kafka 0.8.0+
  • Consumer Groups: Requires Kafka 0.9.0+ for coordinated consumer groups
  • Idempotent Producer: Requires Kafka 0.11.0+
  • Transactional Producer: Requires Kafka 0.11.0+
  • Admin API: Requires Kafka 0.10.1.0+
  • Headers Support: Requires Kafka 0.11.0+
  • Exactly-Once Semantics: Requires Kafka 0.11.0+
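
Broker capabilities are auto-detected by default; the api_version setting lets you pin the negotiated protocol version explicitly instead. A minimal sketch (the broker address is a placeholder):

from kafka import KafkaProducer

# Pin the broker API version rather than probing at startup.
# Features in the table above are unavailable below their minimum broker version.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    api_version=(0, 10, 2)
)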

Package Information

  • Package Name: kafka-python
  • Language: Python
  • Installation: pip install kafka-python
  • Optional Dependencies:
    • pip install kafka-python[crc32c] - Faster CRC32C validation
    • pip install kafka-python[lz4] - LZ4 compression support
    • pip install kafka-python[snappy] - Snappy compression support
    • pip install kafka-python[zstd] - Zstandard compression support

Core Imports

Basic imports for common usage:

from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient

Topic and partition management:

from kafka import TopicPartition, OffsetAndMetadata

Consumer coordination:

from kafka.consumer import ConsumerRebalanceListener

Administrative objects:

from kafka.admin import NewTopic, NewPartitions, ConfigResource, ACL

Low-level client and connection:

from kafka import KafkaClient, BrokerConnection

Serialization interfaces:

from kafka import Serializer, Deserializer

Basic Usage

Simple Producer

from kafka import KafkaProducer
import json

# Configure producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message
future = producer.send('my-topic', value={'message': 'Hello World'}, key='key1')

# Block for acknowledgment  
record_metadata = future.get(timeout=10)
print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition}")

# Clean shutdown
producer.close()
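
send() returns a future, so delivery can also be handled without blocking. A sketch of the callback pattern (topic name and payload are placeholders):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

def on_send_success(record_metadata):
    print(f"Delivered to {record_metadata.topic}[{record_metadata.partition}] @ {record_metadata.offset}")

def on_send_error(exc):
    print(f"Delivery failed: {exc!r}")

# add_callback/add_errback return the future, so the calls chain
producer.send('my-topic', b'hello') \
    .add_callback(on_send_success) \
    .add_errback(on_send_error)

producer.flush()  # drain the send buffer; callbacks fire as sends complete
producer.close()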

Simple Consumer

from kafka import KafkaConsumer
import json

# Configure consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    auto_offset_reset='earliest'
)

# Consume messages
for message in consumer:
    print(f"Received: key={message.key}, value={message.value}")
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")

# Clean shutdown
consumer.close()
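
For at-least-once processing you can disable auto-commit and commit offsets only after records have been handled. A minimal sketch (process() is a hypothetical application handler):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    enable_auto_commit=False
)

for message in consumer:
    process(message)   # hypothetical application logic
    consumer.commit()  # synchronous commit of the consumed offsets

consumer.close()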

Administrative Operations

from kafka import KafkaAdminClient
from kafka.admin import NewTopic

# Configure admin client
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topic
topic = NewTopic(name='new-topic', num_partitions=3, replication_factor=1)
admin.create_topics([topic])

# List topics
topics = admin.list_topics()
print(f"Available topics: {list(topics)}")

# Clean shutdown  
admin.close()
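
A follow-on sketch for configuration inspection with ConfigResource; describe_configs returns low-level protocol response objects:

from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Inspect the configuration of an existing topic
resource = ConfigResource(ConfigResourceType.TOPIC, 'new-topic')
response = admin.describe_configs([resource])
print(response)

admin.close()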

Architecture

kafka-python follows a layered architecture designed for both simplicity and extensibility:

  • High-Level APIs: KafkaProducer, KafkaConsumer, KafkaAdminClient provide simple interfaces for common operations
  • Low-Level Client: KafkaClient handles protocol communication and connection management
  • Connection Layer: BrokerConnection manages individual TCP connections with proper error handling and reconnection
  • Protocol Layer: Complete implementation of Kafka wire protocol with support for all API versions
  • Pluggable Components: Serializers, partitioners, and metrics collectors can be customized or replaced

This design enables everything from simple fire-and-forget message sending to complex transactional processing and cluster administration.
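
For example, the partitioner hook swaps the default partitioning strategy for an arbitrary callable. A sketch (the routing rule is made up for illustration):

from kafka import KafkaProducer

# kafka-python invokes partitioner(key_bytes, all_partitions, available_partitions)
# after key serialization.
def fixed_key_partitioner(key_bytes, all_partitions, available_partitions):
    if key_bytes is None:
        return available_partitions[0]
    # note: Python's hash() is not stable across processes; a real
    # partitioner should use a deterministic hash of the key bytes
    return all_partitions[hash(key_bytes) % len(all_partitions)]

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    partitioner=fixed_key_partitioner
)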

Capabilities

Message Production

High-level producer for publishing records to Kafka topics with support for batching, compression, partitioning strategies, idempotent production, and transactional semantics.

class KafkaProducer:
    def __init__(self, **configs): ...
    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None): ...
    def flush(self, timeout=None): ...
    def close(self, timeout=None): ...

See docs/producer.md for the full Producer API.
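
A hedged sketch of the transactional flow, assuming the methods mirror the Java client naming (a transactional_id config plus init_transactions/begin_transaction/commit_transaction/abort_transaction):

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    transactional_id='order-processor-1'  # must be stable and unique per producer
)

producer.init_transactions()
producer.begin_transaction()
try:
    producer.send('orders', b'order-1')
    producer.send('audit', b'order-1 received')
    producer.commit_transaction()  # both records become visible atomically
except Exception:
    producer.abort_transaction()
    raise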

Message Consumption

High-level consumer for consuming records from Kafka topics with support for consumer groups, automatic partition assignment, offset management, and rebalancing coordination.

class KafkaConsumer:
    def __init__(self, *topics, **configs): ...
    def subscribe(self, topics=(), pattern=None, listener=None): ...
    def assign(self, partitions): ...
    def poll(self, timeout_ms=0, max_records=None, update_offsets=True): ...
    def commit(self, offsets=None): ...
    def seek(self, partition, offset): ...
class ConsumerRebalanceListener:
    def on_partitions_revoked(self, revoked): ...
    def on_partitions_assigned(self, assigned): ...

See docs/consumer.md for the full Consumer API.
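
Subscribing with a rebalance listener lets the application flush state when partition ownership changes. A minimal sketch:

from kafka import KafkaConsumer
from kafka.consumer import ConsumerRebalanceListener

class LoggingRebalanceListener(ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        # commit offsets or flush state before ownership changes
        print(f"Revoked: {revoked}")

    def on_partitions_assigned(self, assigned):
        print(f"Assigned: {assigned}")

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group'
)
consumer.subscribe(topics=['my-topic'], listener=LoggingRebalanceListener())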

Cluster Administration

Administrative client for managing Kafka clusters including topic operations, partition management, configuration changes, access control lists, and consumer group administration.

class KafkaAdminClient:
    def __init__(self, **configs): ...
    def create_topics(self, topic_requests, timeout_ms=None, validate_only=False): ...
    def delete_topics(self, topics, timeout_ms=None): ...
    def list_topics(self, timeout_ms=None): ...
    def describe_topics(self, topics, timeout_ms=None): ...
    def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False): ...
    def describe_configs(self, config_resources, timeout_ms=None): ...
    def alter_configs(self, config_resources, timeout_ms=None): ...

See docs/admin.md for the full Administrative API.

Data Structures and Metadata

Core data structures for representing Kafka concepts including topics, partitions, offsets, broker metadata, and consumer group information.

TopicPartition = NamedTuple('TopicPartition', [('topic', str), ('partition', int)])
OffsetAndMetadata = NamedTuple('OffsetAndMetadata', [('offset', int), ('metadata', str), ('leader_epoch', int)])
BrokerMetadata = NamedTuple('BrokerMetadata', [('nodeId', int), ('host', str), ('port', int), ('rack', str)])

See docs/structures.md for the full Data Structures reference.
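
These structures drive manual partition assignment and offset control. A sketch (the offsets are placeholders; a leader_epoch of -1 means unknown):

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], group_id='replayer')

# Manual assignment bypasses group-managed partition balancing
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])
consumer.seek(tp, 42)  # replay from offset 42

# Commit an explicit offset with attached metadata
consumer.commit({tp: OffsetAndMetadata(100, 'replayed', -1)})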

Error Handling and Exceptions

Comprehensive error handling with over 100 exception classes mapping all Kafka protocol errors, client-side errors, and authorization failures with appropriate retry semantics.

class KafkaError(Exception):
    retriable: bool
    invalid_metadata: bool

class KafkaTimeoutError(KafkaError): ...
class KafkaConnectionError(KafkaError): ...  
class TopicAuthorizationFailedError(AuthorizationError): ...

See docs/errors.md for the full Error Handling reference.
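
The retriable flag distinguishes transient failures from fatal ones. A minimal sketch of delivery error handling:

from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my-topic', b'payload')

try:
    metadata = future.get(timeout=10)
except KafkaTimeoutError:
    pass  # no acknowledgment within the timeout; the send may still succeed
except KafkaError as e:
    if e.retriable:
        pass  # transient error, safe to retry the send
    else:
        raise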

Low-Level Client and Connection Management

Low-level client for direct protocol communication and connection management, providing fine-grained control over cluster interactions and metadata handling.

class KafkaClient:
    def __init__(self, **configs): ...
    def bootstrap_connected(self): ...
    def check_version(self, node_id=None, timeout=2, strict=False): ...
    def cluster(self): ...

class BrokerConnection:
    def __init__(self, host, port, **configs): ...
    def connect(self): ...
    def connected(self): ...
    def close(self): ...
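
A minimal sketch of direct version probing with the low-level client:

from kafka import KafkaClient

client = KafkaClient(bootstrap_servers=['localhost:9092'])
print(client.check_version())        # probed broker version, e.g. (2, 6, 0)
print(client.bootstrap_connected())  # True once a bootstrap broker responded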

Serialization Framework

Abstract base classes for implementing custom serializers and deserializers with pluggable serialization strategies.

class Serializer:
    def serialize(self, topic, value): ...
    def close(self): ...

class Deserializer:
    def deserialize(self, topic, bytes_): ...
    def close(self): ...
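
The producer and consumer accept Serializer/Deserializer instances wherever a plain callable is allowed, which keeps topic-aware logic in one place. A sketch:

import json

from kafka import KafkaProducer, Serializer

class JsonSerializer(Serializer):
    def serialize(self, topic, value):
        # the topic argument allows per-topic encodings; ignored here
        return json.dumps(value).encode('utf-8')

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=JsonSerializer()
)
producer.send('my-topic', {'message': 'Hello World'})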

Authentication and Security

kafka-python supports multiple authentication mechanisms:

  • SASL/PLAIN: Simple username/password authentication
  • SASL/SCRAM: Challenge-response authentication with SHA-256/SHA-512
  • SASL/GSSAPI: Kerberos authentication (Unix systems)
  • SASL/OAUTHBEARER: OAuth 2.0 Bearer token authentication
  • AWS MSK IAM: AWS-specific IAM authentication for Amazon MSK
  • SSL/TLS: Encrypted connections with certificate validation
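
A configuration sketch for SASL/SCRAM over TLS (the endpoint, credentials, and CA path are placeholders):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['broker.example.com:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='alice',
    sasl_plain_password='secret',
    ssl_cafile='/path/to/ca.pem'
)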

Compression and Serialization

Built-in support for multiple compression algorithms:

  • gzip: Standard compression with configurable levels
  • snappy: Fast compression optimized for speed
  • lz4: High-speed compression with Kafka-specific framing
  • zstd: Modern compression with excellent compression ratios
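
Compression is enabled per producer via compression_type; lz4, snappy, and zstd require the optional extras listed under Package Information. A minimal sketch:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    compression_type='gzip'  # or 'snappy', 'lz4', 'zstd'
)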

Extensible serialization framework with abstract base classes for custom serializers and deserializers.

Types

Core Configuration Types

# Producer configuration dictionary with string keys and various value types
ProducerConfig = Dict[str, Any]

# Consumer configuration dictionary with string keys and various value types  
ConsumerConfig = Dict[str, Any]

# Admin client configuration dictionary with string keys and various value types
AdminConfig = Dict[str, Any]

Message Types

# Producer message future returned by send()
class FutureRecordMetadata:
    def get(self, timeout=None): ...
    def add_callback(self, callback): ...
    def add_errback(self, errback): ...

# Consumer record received from poll()
class ConsumerRecord:
    topic: str
    partition: int
    offset: int
    timestamp: int
    timestamp_type: int
    key: bytes
    value: bytes
    headers: List[Tuple[str, bytes]]
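
A sketch of batch consumption with poll(), which returns a dict mapping TopicPartition to lists of ConsumerRecord:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic', bootstrap_servers=['localhost:9092'])

batch = consumer.poll(timeout_ms=1000, max_records=100)
for tp, records in batch.items():
    for record in records:
        print(record.offset, record.key, record.value, record.headers)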