tessl/pypi-kafka-python

Pure Python client for Apache Kafka distributed stream processing system

kafka-python

A pure Python client library for the Apache Kafka distributed stream processing system. It supports producers, consumers, and administrative operations, and is compatible with Kafka broker versions from 0.8.0 through 2.6+.

Version Compatibility

  • Core Features: Compatible with Kafka 0.8.0+
  • Consumer Groups: Requires Kafka 0.9.0+ for coordinated consumer groups
  • Idempotent Producer: Requires Kafka 0.11.0+
  • Transactional Producer: Requires Kafka 0.11.0+
  • Admin API: Requires Kafka 0.10.1.0+
  • Headers Support: Requires Kafka 0.11.0+
  • Exactly-Once Semantics: Requires Kafka 0.11.0+
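The requirements above can be encoded as a lookup table when an application needs to decide at startup whether a feature is usable against a given broker. This is a minimal sketch: `MIN_BROKER_VERSION`, `supports()`, and `_pad()` are hypothetical helpers (not part of kafka-python), and the version tuples are copied from the list above. Tuples are used because kafka-python expresses its `api_version` setting the same way, e.g. `(0, 11)`.

```python
from typing import Dict, Tuple

Version = Tuple[int, ...]

# Minimum broker versions copied from the compatibility list above.
# These names are illustrative helpers, not kafka-python APIs.
MIN_BROKER_VERSION: Dict[str, Version] = {
    "core": (0, 8, 0),
    "consumer_groups": (0, 9, 0),
    "idempotent_producer": (0, 11, 0),
    "transactional_producer": (0, 11, 0),
    "admin_api": (0, 10, 1, 0),
    "headers": (0, 11, 0),
    "exactly_once": (0, 11, 0),
}


def _pad(version: Version, width: int) -> Version:
    """Right-pad with zeros so (0, 10, 1) and (0, 10, 1, 0) compare as equal."""
    return tuple(version) + (0,) * (width - len(version))


def supports(feature: str, broker_version: Version) -> bool:
    """Return True if a broker at broker_version meets the feature's minimum."""
    required = MIN_BROKER_VERSION[feature]
    width = max(len(required), len(broker_version))
    # Tuples compare element-wise once both have the same length
    return _pad(broker_version, width) >= _pad(required, width)
```

Padding matters because Python sorts a tuple before any longer tuple it prefixes, so without it `(0, 10, 1)` would compare as older than `(0, 10, 1, 0)`.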

Package Information

  • Package Name: kafka-python
  • Language: Python
  • Installation: pip install kafka-python
  • Optional Dependencies:
    • pip install kafka-python[crc32c] - Faster CRC32C validation
    • pip install kafka-python[lz4] - LZ4 compression support
    • pip install kafka-python[snappy] - Snappy compression support
    • pip install kafka-python[zstd] - Zstandard compression support

Core Imports

Basic imports for common usage:

from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient

Topic and partition management:

from kafka import TopicPartition, OffsetAndMetadata

Consumer coordination:

from kafka import ConsumerRebalanceListener

Administrative objects:

from kafka.admin import NewTopic, NewPartitions, ConfigResource, ACL

Low-level client and connection:

from kafka import KafkaClient, BrokerConnection

Serialization interfaces:

from kafka import Serializer, Deserializer

Basic Usage

Simple Producer

from kafka import KafkaProducer
import json

# Configure producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message
future = producer.send('my-topic', value={'message': 'Hello World'}, key='key1')

# Block for acknowledgment  
record_metadata = future.get(timeout=10)
print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition}")

# Clean shutdown
producer.close()
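Calling `future.get()` after every send waits for each acknowledgment in turn. The futures returned by `send()` also accept callbacks, which is the usual way to get asynchronous delivery reports; `add_callback()` and `add_errback()` return the future, so they can be chained. A minimal sketch: `send_async()`, `describe_metadata()`, and the logger name are illustrative choices, and `producer` is assumed to be a configured `KafkaProducer` like the one above.

```python
import logging

log = logging.getLogger("producer-example")


def describe_metadata(record_metadata) -> str:
    """Format the RecordMetadata passed to a success callback."""
    return f"{record_metadata.topic}[{record_metadata.partition}]@{record_metadata.offset}"


def on_send_success(record_metadata):
    # Called once the broker has acknowledged the record
    log.info("delivered %s", describe_metadata(record_metadata))


def on_send_error(exc):
    # Called with the exception if delivery ultimately fails
    log.error("delivery failed: %r", exc)


def send_async(producer, topic, value, key=None):
    """Send without blocking; the outcome is reported through the callbacks."""
    return (
        producer.send(topic, value=value, key=key)
        .add_callback(on_send_success)
        .add_errback(on_send_error)
    )


# Usage, assuming `producer` from the example above:
# for i in range(100):
#     send_async(producer, "my-topic", {"n": i}, key=str(i))
# producer.flush()  # wait for outstanding sends before closing
```

Callbacks are typically invoked from the producer's background I/O thread, so they should stay short and avoid blocking calls.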

Simple Consumer

from kafka import KafkaConsumer
import json

# Configure consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    auto_offset_reset='earliest'
)

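# Consume messages. Iteration blocks indefinitely by default (consumer_timeout_ms
# is unlimited), so wrap the loop to make sure the shutdown below always runs
try:
    for message in consumer:
        print(f"Received: key={message.key}, value={message.value}")
        print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")
finally:
    # Clean shutdown: leave the group and commit offsets if auto-commit is enabled
    consumer.close()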
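The iterator interface hides batching and relies on auto-commit. When offsets should be committed only after records have been processed (at-least-once delivery), a common alternative is to create the consumer with `enable_auto_commit=False`, call `poll()`, and `commit()` once the batch is handled. A sketch under those assumptions; `process_batch()` and `run_once()` are hypothetical helper names.

```python
from typing import Any, Callable, Iterable, Mapping


def process_batch(batch: Mapping[Any, Iterable[Any]], handle: Callable[[Any], None]) -> int:
    """Apply handle() to every record in a poll() result and return the count.

    poll() returns a dict mapping each TopicPartition to a list of ConsumerRecords.
    """
    count = 0
    for _partition, records in batch.items():
        for record in records:
            handle(record)
            count += 1
    return count


def run_once(consumer, handle: Callable[[Any], None], timeout_ms: int = 1000) -> int:
    """Poll one batch, process it, then commit synchronously.

    Assumes the consumer was created with enable_auto_commit=False. If handle()
    raises, commit() is skipped, so a restarted consumer resumes from the last
    committed offset and reads the batch again.
    """
    batch = consumer.poll(timeout_ms=timeout_ms)
    handled = process_batch(batch, handle)
    if handled:
        consumer.commit()  # commits the consumed positions of all assigned partitions
    return handled


# Usage, assuming a KafkaConsumer built with enable_auto_commit=False:
# while True:
#     run_once(consumer, lambda r: print(r.offset, r.value))
```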

Administrative Operations

from kafka import KafkaAdminClient
from kafka.admin import NewTopic

# Configure admin client
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topic
topic = NewTopic(name='new-topic', num_partitions=3, replication_factor=1)
admin.create_topics([topic])

# List topics
topics = admin.list_topics()
print(f"Available topics: {list(topics)}")

# Clean shutdown  
admin.close()
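Running the example above a second time fails, because Kafka rejects creating a topic that already exists (kafka-python surfaces this as `kafka.errors.TopicAlreadyExistsError`). A sketch of an idempotent variant that checks `list_topics()` first; `missing_topics()` and `ensure_topics()` are hypothetical helpers and `admin` is assumed to be a `KafkaAdminClient`.

```python
from typing import Iterable, List


def missing_topics(existing: Iterable[str], wanted: Iterable[str]) -> List[str]:
    """Return the wanted topic names that are not present yet, sorted."""
    return sorted(set(wanted) - set(existing))


def ensure_topics(admin, names, num_partitions=3, replication_factor=1):
    """Create only the topics that list_topics() does not already report.

    Another client can still create the same topic between the two calls, so
    callers may also want to catch kafka.errors.TopicAlreadyExistsError.
    """
    from kafka.admin import NewTopic  # lazy import so missing_topics() runs without kafka-python

    to_create = missing_topics(admin.list_topics(), names)
    if to_create:
        admin.create_topics([
            NewTopic(name=n, num_partitions=num_partitions,
                     replication_factor=replication_factor)
            for n in to_create
        ])
    return to_create
```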

Architecture

kafka-python follows a layered architecture designed for both simplicity and extensibility:

  • High-Level APIs: KafkaProducer, KafkaConsumer, KafkaAdminClient provide simple interfaces for common operations
  • Low-Level Client: KafkaClient handles protocol communication and connection management
  • Connection Layer: BrokerConnection manages individual TCP connections with proper error handling and reconnection
  • Protocol Layer: Implementation of the Kafka wire protocol, including multiple versions of each API so the client can work with both older and newer brokers
  • Pluggable Components: Serializers, partitioners, and metrics collectors can be customized or replaced

This design enables everything from simple fire-and-forget message sending to complex transactional processing and cluster administration.

Capabilities

Message Production

High-level producer for publishing records to Kafka topics with support for batching, compression, partitioning strategies, idempotent production, and transactional semantics.

class KafkaProducer:
    def __init__(self, **configs): ...
    def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None): ...
    def flush(self, timeout=None): ...
    def close(self, timeout=None): ...

See the Producer API reference (producer.md).
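Record headers (Kafka 0.11.0+) are passed to `send()` as a list of `(str, bytes)` tuples, and several headers may share the same key. A sketch: `to_headers()` is a hypothetical helper that UTF-8 encodes string values so callers can pass a plain dict.

```python
from typing import List, Mapping, Tuple, Union

Header = Tuple[str, bytes]


def to_headers(values: Mapping[str, Union[str, bytes]]) -> List[Header]:
    """Convert a mapping into the list-of-tuples form send() expects for headers."""
    headers: List[Header] = []
    for key, value in values.items():
        if isinstance(value, str):
            value = value.encode("utf-8")  # header values must be bytes
        headers.append((key, value))
    return headers


# Usage, assuming `producer` is a KafkaProducer:
# producer.send("my-topic", value=b"payload",
#               headers=to_headers({"trace-id": "abc123", "source": b"billing"}))
```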

Message Consumption

High-level consumer for consuming records from Kafka topics with support for consumer groups, automatic partition assignment, offset management, and rebalancing coordination.

class KafkaConsumer:
    def __init__(self, *topics, **configs): ...
    def subscribe(self, topics=(), pattern=None, listener=None): ...
    def assign(self, partitions): ...
    def poll(self, timeout_ms=0, max_records=None, update_offsets=True): ...
    def commit(self, offsets=None): ...
    def seek(self, partition, offset): ...

class ConsumerRebalanceListener:
    def on_partitions_revoked(self, revoked): ...
    def on_partitions_assigned(self, assigned): ...

See the Consumer API reference (consumer.md).
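`subscribe()` lets the group coordinator hand out partitions, whereas `assign()` pins specific partitions to this consumer, after which `seek()` can move the read position. A sketch that rewinds each assigned partition by `n` records; `replay_last()` and `rewind_positions()` are hypothetical helpers, and `position()` is the `KafkaConsumer` method returning the offset of the next record to fetch. If the target offset is older than the earliest offset the broker still retains, the consumer falls back to its `auto_offset_reset` policy.

```python
from typing import Dict, Hashable


def rewind_positions(positions: Dict[Hashable, int], n: int) -> Dict[Hashable, int]:
    """Move each offset back by n records, never below offset 0."""
    return {tp: max(0, offset - n) for tp, offset in positions.items()}


def replay_last(consumer, topic: str, partitions, n: int) -> Dict[Hashable, int]:
    """Manually assign partitions of `topic` and rewind each by n records.

    `consumer` is assumed to be a KafkaConsumer that has not called subscribe();
    assign() and subscribe() cannot be mixed on the same consumer.
    """
    from kafka import TopicPartition  # lazy import keeps rewind_positions() dependency-free

    tps = [TopicPartition(topic, p) for p in partitions]
    consumer.assign(tps)
    # position() returns the offset of the next record that would be fetched
    targets = rewind_positions({tp: consumer.position(tp) for tp in tps}, n)
    for tp, offset in targets.items():
        consumer.seek(tp, offset)
    return targets
```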

Cluster Administration

Administrative client for managing Kafka clusters including topic operations, partition management, configuration changes, access control lists, and consumer group administration.

class KafkaAdminClient:
    def __init__(self, **configs): ...
    def create_topics(self, topic_requests, timeout_ms=None, validate_only=False): ...
    def delete_topics(self, topics, timeout_ms=None): ...
    def list_topics(self, timeout_ms=None): ...
    def describe_topics(self, topics, timeout_ms=None): ...
    def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False): ...
    def describe_configs(self, config_resources, timeout_ms=None): ...
    def alter_configs(self, config_resources, timeout_ms=None): ...

See the Administrative API reference (admin.md).
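Kafka only allows a topic's partition count to grow. `create_partitions()` takes a dict mapping topic names to `kafka.admin.NewPartitions(total_count=...)`, where `total_count` is the new total rather than the number to add. Adding partitions also changes which partition a given key hashes to. A sketch: `partitions_to_grow()` and `grow_partitions()` are hypothetical helpers, and the current counts are assumed to be supplied by the caller (for example, gathered from `describe_topics()`).

```python
from typing import Dict


def partitions_to_grow(current: Dict[str, int], target: int) -> Dict[str, int]:
    """Return {topic: target} for topics whose partition count is below target.

    Topics already at or above target are skipped, because Kafka cannot
    reduce a topic's partition count.
    """
    return {topic: target for topic, count in current.items() if count < target}


def grow_partitions(admin, current: Dict[str, int], target: int) -> Dict[str, int]:
    """Raise every listed topic to `target` partitions; `admin` is a KafkaAdminClient."""
    from kafka.admin import NewPartitions  # lazy import so partitions_to_grow() runs without kafka-python

    plan = partitions_to_grow(current, target)
    if plan:
        admin.create_partitions(
            {topic: NewPartitions(total_count=total) for topic, total in plan.items()}
        )
    return plan
```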

Data Structures and Metadata

Core data structures for representing Kafka concepts including topics, partitions, offsets, broker metadata, and consumer group information.

TopicPartition = NamedTuple('TopicPartition', [('topic', str), ('partition', int)])
OffsetAndMetadata = NamedTuple('OffsetAndMetadata', [('offset', int), ('metadata', str), ('leader_epoch', int)])
BrokerMetadata = NamedTuple('BrokerMetadata', [('nodeId', int), ('host', str), ('port', int), ('rack', str)])

See the Data Structures reference (structures.md).
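These types come together when committing offsets explicitly: `commit()` accepts a dict mapping `TopicPartition` to `OffsetAndMetadata`, and the committed offset should be the next offset to read, i.e. the last processed offset + 1. The sketch defines local namedtuples with the field layouts shown above so it runs on its own; application code would import the real classes from `kafka`. `offsets_to_commit()` is a hypothetical helper, and using `-1` for an unknown `leader_epoch` is an assumption.

```python
from collections import namedtuple
from typing import Dict

# Local stand-ins matching the documented field layouts; real code would use
# `from kafka import TopicPartition, OffsetAndMetadata`.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata", "leader_epoch"])


def offsets_to_commit(last_processed: Dict[TopicPartition, int],
                      metadata: str = "") -> Dict[TopicPartition, OffsetAndMetadata]:
    """Build a commit() payload from the last offset processed per partition."""
    # Committed offsets point at the next record to consume, hence + 1.
    # leader_epoch=-1 (unknown) is an assumption made for this sketch.
    return {
        tp: OffsetAndMetadata(offset + 1, metadata, -1)
        for tp, offset in last_processed.items()
    }


# Usage, assuming `consumer` is a KafkaConsumer with enable_auto_commit=False:
# consumer.commit(offsets_to_commit({TopicPartition("my-topic", 0): 41}))
```

Because `TopicPartition` is a namedtuple, it is hashable and compares equal to a plain `(topic, partition)` tuple, which is what makes it usable as a dict key here.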

Error Handling and Exceptions

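Over 100 exception classes, rooted at KafkaError, cover Kafka protocol errors returned by brokers, client-side errors such as timeouts and connection failures, and authorization failures. Each class records whether the error is retriable and whether it indicates stale metadata.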

class KafkaError(Exception):
    retriable: bool
    invalid_metadata: bool

class KafkaTimeoutError(KafkaError): ...
class KafkaConnectionError(KafkaError): ...  
class TopicAuthorizationFailedError(AuthorizationError): ...

See the Error Handling reference (errors.md).
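The `retriable` flag lets application code separate transient failures from permanent ones. A minimal sketch of a retry wrapper, assuming exceptions expose `retriable` the way `KafkaError` subclasses do; `call_with_retries()`, `is_retriable()`, and the backoff parameters are hypothetical. `KafkaProducer` already retries sends internally according to its `retries` setting, so a wrapper like this is more useful around admin or metadata calls.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def is_retriable(exc: BaseException) -> bool:
    """True for errors flagged retriable (KafkaError defines this attribute)."""
    return bool(getattr(exc, "retriable", False))


def call_with_retries(fn: Callable[[], T], attempts: int = 3, backoff_s: float = 0.5) -> T:
    """Call fn(), retrying retriable errors with exponential backoff.

    Non-retriable errors, and the final retriable one, are re-raised unchanged.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as exc:
            if not is_retriable(exc) or attempt == attempts - 1:
                raise
            time.sleep(backoff_s * (2 ** attempt))
    raise ValueError("attempts must be at least 1")


# Usage, assuming `admin` is a KafkaAdminClient:
# topics = call_with_retries(admin.list_topics)
```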

Low-Level Client and Connection Management

Low-level client for direct protocol communication and connection management, providing fine-grained control over cluster interactions and metadata handling.

class KafkaClient:
    def __init__(self, **configs): ...
    def bootstrap_connected(self): ...
    def check_version(self, node_id=None, timeout=2, strict=False): ...
    cluster: ClusterMetadata  # attribute (not a method) holding the cluster metadata

class BrokerConnection:
    def __init__(self, host, port, afi, **configs): ...  # afi: address family, e.g. socket.AF_INET
    def connect(self): ...
    def connected(self): ...
    def close(self, error=None): ...
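Most applications never touch `KafkaClient` directly, but it is handy for connectivity checks. A sketch, assuming a reachable broker: `probe_cluster()` and `format_version()` are hypothetical helpers. `check_version()` returns the inferred broker version as a tuple, and `cluster` holds the client's `ClusterMetadata`, whose `brokers()` returns the known `BrokerMetadata` entries.

```python
from typing import Sequence, Tuple


def format_version(version: Tuple[int, ...]) -> str:
    """Render a version tuple such as (0, 10, 1) as '0.10.1'."""
    return ".".join(str(part) for part in version)


def probe_cluster(bootstrap_servers: Sequence[str]) -> dict:
    """Connect with the low-level client and report broker version and brokers."""
    from kafka import KafkaClient  # lazy import; needs kafka-python and a running broker

    client = KafkaClient(bootstrap_servers=list(bootstrap_servers))
    try:
        version = client.check_version()
        brokers = sorted((b.host, b.port) for b in client.cluster.brokers())
        return {"version": format_version(version), "brokers": brokers}
    finally:
        client.close()
```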

Serialization Framework

Abstract base classes for implementing custom serializers and deserializers with pluggable serialization strategies.

class Serializer:
    def serialize(self, topic, value): ...
    def close(self): ...

class Deserializer:
    def deserialize(self, topic, bytes_): ...
    def close(self): ...
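A custom serializer subclasses `Serializer` and implements `serialize(topic, value)`; when an instance is passed as `value_serializer`, the producer calls `serialize()` with the destination topic, whereas a plain function receives only the value. A sketch of a JSON pair; the class names are hypothetical, and the `ImportError` fallback exists only so the example runs where kafka-python is not installed.

```python
import json
from typing import Any, Optional

try:
    from kafka import Serializer, Deserializer
except ImportError:  # fallback only so this sketch runs without kafka-python
    Serializer = Deserializer = object


class JsonSerializer(Serializer):
    """Encode Python objects as compact UTF-8 JSON; None passes through (tombstones)."""

    def serialize(self, topic: str, value: Any) -> Optional[bytes]:
        if value is None:
            return None
        return json.dumps(value, separators=(",", ":")).encode("utf-8")


class JsonDeserializer(Deserializer):
    """Decode UTF-8 JSON payloads back into Python objects."""

    def deserialize(self, topic: str, bytes_: Optional[bytes]) -> Any:
        if bytes_ is None:
            return None
        return json.loads(bytes_.decode("utf-8"))


# Usage:
# producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
#                          value_serializer=JsonSerializer())
# consumer = KafkaConsumer("my-topic", bootstrap_servers=["localhost:9092"],
#                          value_deserializer=JsonDeserializer())
```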

Authentication and Security

kafka-python supports multiple authentication mechanisms:

  • SASL/PLAIN: Simple username/password authentication
  • SASL/SCRAM: Challenge-response authentication with SHA-256/SHA-512
  • SASL/GSSAPI: Kerberos authentication (Unix systems)
  • SASL/OAUTHBEARER: OAuth 2.0 Bearer token authentication
  • AWS MSK IAM: AWS-specific IAM authentication for Amazon MSK
  • SSL/TLS: Encrypted connections with certificate validation
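Authentication is configured through constructor keyword arguments that `KafkaProducer`, `KafkaConsumer`, and `KafkaAdminClient` share. A sketch for SASL/SCRAM over TLS: `sasl_ssl_config()` is a hypothetical helper and the CA path in the usage line is a placeholder, while `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, `sasl_plain_password`, and `ssl_cafile` are kafka-python settings. Despite their names, the `sasl_plain_*` settings also carry SCRAM credentials.

```python
from typing import Any, Dict, Optional

SCRAM_MECHANISMS = ("SCRAM-SHA-256", "SCRAM-SHA-512")


def sasl_ssl_config(username: str, password: str,
                    mechanism: str = "SCRAM-SHA-512",
                    cafile: Optional[str] = None) -> Dict[str, Any]:
    """Build keyword arguments for SASL/SCRAM authentication over TLS."""
    if mechanism not in SCRAM_MECHANISMS:
        raise ValueError(f"expected one of {SCRAM_MECHANISMS}, got {mechanism!r}")
    config: Dict[str, Any] = {
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": mechanism,
        # kafka-python reads SCRAM credentials from the sasl_plain_* settings
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }
    if cafile is not None:
        config["ssl_cafile"] = cafile  # CA bundle used to verify the broker certificate
    return config


# Usage (the path is a placeholder):
# producer = KafkaProducer(bootstrap_servers=["broker:9093"],
#                          **sasl_ssl_config("app", "secret", cafile="/path/to/ca.pem"))
```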

Compression and Serialization

Built-in support for multiple compression algorithms:

  • gzip: Standard compression with configurable levels
  • snappy: Fast compression optimized for speed
  • lz4: High-speed compression with Kafka-specific framing
  • zstd: Modern compression with excellent compression ratios

Extensible serialization framework with abstract base classes for custom serializers and deserializers.
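Codecs are selected with the producer's `compression_type` setting (`'gzip'`, `'snappy'`, `'lz4'`, `'zstd'`, or `None`). gzip uses the Python standard library, while the others need the optional packages listed under Package Information. A sketch that picks the first preferred codec whose Python module is importable; `pick_compression()` is hypothetical, and the module names (`snappy`, `lz4`, `zstandard`) are assumed to be the ones those optional packages install.

```python
import importlib.util
from typing import Optional, Sequence

# Python module each codec depends on; gzip ships with the standard library.
CODEC_MODULES = {"gzip": "gzip", "snappy": "snappy", "lz4": "lz4", "zstd": "zstandard"}


def pick_compression(preferred: Sequence[str] = ("zstd", "lz4", "snappy", "gzip")) -> Optional[str]:
    """Return the first codec in `preferred` whose module is importable, else None."""
    for codec in preferred:
        module = CODEC_MODULES.get(codec)
        if module and importlib.util.find_spec(module) is not None:
            return codec
    return None


# Usage:
# producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
#                          compression_type=pick_compression())
```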

Types

Core Configuration Types

from typing import Any, Dict

# Producer configuration dictionary with string keys and various value types
ProducerConfig = Dict[str, Any]

# Consumer configuration dictionary with string keys and various value types  
ConsumerConfig = Dict[str, Any]

# Admin client configuration dictionary with string keys and various value types
AdminConfig = Dict[str, Any]
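Because each configuration type is a plain dict of keyword arguments, a shared base can be defined once and specialized per client with `**` unpacking. A sketch; `BASE_PRODUCER_CONFIG` and `with_overrides()` are hypothetical, while `acks`, `retries`, `linger_ms`, and `client_id` are kafka-python producer settings.

```python
from typing import Any, Dict

ProducerConfig = Dict[str, Any]

BASE_PRODUCER_CONFIG: ProducerConfig = {
    "bootstrap_servers": ["localhost:9092"],
    "acks": "all",      # wait for all in-sync replicas to acknowledge
    "retries": 5,       # resend on retriable errors
    "linger_ms": 10,    # wait up to 10 ms to fill a batch
}


def with_overrides(base: ProducerConfig, **overrides: Any) -> ProducerConfig:
    """Return a shallow copy of base with overrides applied; base is unchanged."""
    merged = dict(base)
    merged.update(overrides)
    return merged


# Usage:
# producer = KafkaProducer(**with_overrides(BASE_PRODUCER_CONFIG, client_id="orders-service"))
```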

Message Types

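from typing import List, Optional, Tuple

# Producer message future returned by send(); get() blocks and returns RecordMetadata
class FutureRecordMetadata:
    def get(self, timeout=None): ...
    def add_callback(self, callback): ...
    def add_errback(self, errback): ...

# Consumer record yielded when iterating a KafkaConsumer, or contained in the
# per-TopicPartition lists returned by poll()
class ConsumerRecord:
    topic: str
    partition: int
    offset: int
    timestamp: int
    timestamp_type: int
    key: Optional[bytes]      # deserialized object instead if key_deserializer is set
    value: Optional[bytes]    # deserialized object instead if value_deserializer is set
    headers: List[Tuple[str, bytes]]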
