
tessl/pypi-kafka-python-ng

Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/kafka-python-ng@2.2.x (PyPI)

To install, run

npx @tessl/cli install tessl/pypi-kafka-python-ng@2.2.0


Kafka Python NG

A pure Python client library for Apache Kafka that provides high-level producer and consumer APIs, admin functionality, and full protocol support. It is designed for compatibility with Kafka brokers from version 0.8.0 through 2.6+ and offers Pythonic interfaces for distributed stream processing applications.

Package Information

  • Package Name: kafka-python-ng
  • Language: Python
  • Installation: pip install kafka-python-ng
  • Requirements: Python >= 3.8

Core Imports

import kafka

For specific functionality:

from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient

For data structures and error handling:

from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.errors import KafkaError
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.conn import BrokerConnection
from kafka.serializer import Serializer, Deserializer

For admin operations:

from kafka.admin import NewTopic, NewPartitions, ConfigResource, ConfigResourceType

Basic Usage

Producer

from kafka import KafkaProducer
import json

# Create producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send messages
producer.send('my-topic', {'key': 'value'})
producer.flush()
producer.close()

Consumer

from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Consume messages
for message in consumer:
    print(f"Topic: {message.topic}, Value: {message.value}")

Admin Client

from kafka import KafkaAdminClient
from kafka.admin import NewTopic

# Create admin client
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topics
topic = NewTopic(name='test-topic', num_partitions=3, replication_factor=1)
admin.create_topics([topic])

Architecture

The kafka-python-ng library is organized around three main client types:

  • KafkaProducer: Thread-safe producer for publishing records with batching, compression, and retry logic
  • KafkaConsumer: High-level consumer with automatic group coordination, partition assignment, and offset management
  • KafkaAdminClient: Administrative operations for topics, configs, consumer groups, and ACLs

The library uses an async I/O foundation with selector-based networking, automatic metadata management, and comprehensive error handling. It supports all major Kafka features including SASL authentication, SSL encryption, multiple compression formats, and transactional semantics.
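As a sketch of how these features surface in client configuration, the producer below enables gzip compression and stronger delivery guarantees; the broker address is a placeholder and the tuning values are illustrative:

from kafka import KafkaProducer

# Producer tuned for durability and throughput
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',               # wait for all in-sync replicas to acknowledge
    retries=5,                # retry transient send failures
    compression_type='gzip',  # compress record batches on the wire
    linger_ms=10,             # give batches up to 10 ms to fill
)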

Capabilities

Producer API

High-level producer for publishing records to Kafka topics with automatic batching, compression, partitioning, and retry logic. Supports asynchronous sending with futures and callback handling.

class KafkaProducer:
    def __init__(self, **configs): ...
    def send(self, topic: str, value=None, key=None, headers=None, partition=None, timestamp_ms=None): ...
    def flush(self, timeout=None): ...
    def close(self, timeout=None): ...
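send() returns a future that resolves to the record's metadata. A minimal sketch, assuming a local broker and an existing topic, of resolving it synchronously and via callbacks:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Block for the result; get() returns RecordMetadata on success
future = producer.send('my-topic', b'raw bytes')
try:
    metadata = future.get(timeout=10)
    print(metadata.topic, metadata.partition, metadata.offset)
except KafkaError as exc:
    print(f"send failed: {exc}")

# Or attach callbacks instead of blocking
producer.send('my-topic', b'more bytes') \
    .add_callback(lambda md: print('delivered at offset', md.offset)) \
    .add_errback(lambda exc: print('delivery failed:', exc))
producer.flush()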

Producer Operations

Consumer API

High-level consumer for consuming records from Kafka topics with automatic group coordination, partition assignment, and offset management. Supports both subscription-based and manual partition assignment.

class KafkaConsumer:
    def __init__(self, *topics, **configs): ...
    def subscribe(self, topics=(), pattern=None, listener=None): ...
    def poll(self, timeout_ms=0, max_records=None, update_offsets=True): ...
    def commit(self, offsets=None): ...
    def close(self, autocommit=True): ...
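A minimal sketch of the poll/commit loop with auto-commit disabled; the topic, group, and process() handler are placeholders:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False,  # commit offsets explicitly below
)

while True:
    # poll() returns a dict mapping TopicPartition to a list of records
    records = consumer.poll(timeout_ms=1000, max_records=100)
    for tp, batch in records.items():
        for record in batch:
            process(record)  # hypothetical application handler
    consumer.commit()  # commit the offsets consumed by the last poll()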

Consumer Operations

Admin API

Administrative client for managing Kafka cluster resources including topics, consumer groups, configurations, and access control lists (ACLs).

class KafkaAdminClient:
    def __init__(self, **configs): ...
    def create_topics(self, new_topics, timeout_ms=None, validate_only=False): ...
    def delete_topics(self, topics, timeout_ms=None): ...
    def describe_configs(self, config_resources, timeout_ms=None, include_synonyms=False): ...
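As a sketch, reading a topic's effective configuration with ConfigResource; the topic name is a placeholder:

from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Fetch the configuration entries for one topic
resource = ConfigResource(ConfigResourceType.TOPIC, 'my-topic')
response = admin.describe_configs([resource])  # raw protocol response objects
print(response)

admin.close()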

Administrative Operations

Configuration and Connection Management

Connection management, SSL/SASL authentication, and client configuration options for connecting to Kafka clusters with various security configurations.

class BrokerConnection:
    def __init__(self, host, port, afi, **configs): ...
    def connect(self): ...
    def close(self, error=None): ...
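Security settings are normally passed to the high-level clients rather than to BrokerConnection directly. A sketch of a SASL_SSL consumer, with placeholder host, credentials, and certificate path:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['broker.example.com:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='user',    # placeholder credentials
    sasl_plain_password='secret',
    ssl_cafile='/path/to/ca.pem',  # placeholder CA certificate
)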

Connection and Configuration

Data Structures and Types

Core data structures for representing Kafka concepts including topic partitions, offsets, broker metadata, and consumer group information.

class TopicPartition:
    def __init__(self, topic: str, partition: int): ...
    
class OffsetAndMetadata:
    def __init__(self, offset: int, metadata: str): ...
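A sketch of manual partition assignment and offset control using these types; the topic, partition, and offset are placeholders:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',  # a group id is required to commit offsets
)

# Assign a specific partition instead of subscribing
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])

# Jump to a known offset, then commit it with optional metadata
consumer.seek(tp, 42)
consumer.commit({tp: OffsetAndMetadata(42, 'replayed from checkpoint')})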

Data Structures

Error Handling

Comprehensive exception hierarchy for handling various Kafka-related errors including network issues, protocol errors, and authentication failures.

class KafkaError(RuntimeError):
    retriable: bool
    invalid_metadata: bool
    
class NoBrokersAvailable(KafkaError): ...
class CommitFailedError(KafkaError): ...
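A sketch of handling these errors around producer construction and a blocking send; the broker address and topic are placeholders:

from kafka import KafkaProducer
from kafka.errors import KafkaError, NoBrokersAvailable

try:
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
except NoBrokersAvailable:
    # Raised when none of the bootstrap servers can be reached
    raise SystemExit('Kafka cluster is unreachable')

try:
    producer.send('my-topic', b'payload').get(timeout=10)
except KafkaError as exc:
    # retriable indicates whether resending might succeed
    print(f"send failed (retriable={exc.retriable}): {exc}")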

Error Handling

Serialization

Abstract base classes for implementing custom key and value serializers/deserializers for converting between Python objects and bytes.

class Serializer:
    def serialize(self, topic: str, value) -> bytes: ...
    def close(self): ...

class Deserializer:  
    def deserialize(self, topic: str, bytes_: bytes): ...
    def close(self): ...
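A sketch of a JSON serializer/deserializer pair built on these base classes; the high-level clients accept either plain callables or Serializer/Deserializer instances for the serializer/deserializer options:

import json

from kafka import KafkaProducer
from kafka.serializer import Serializer, Deserializer

class JsonSerializer(Serializer):
    def serialize(self, topic, value):
        return json.dumps(value).encode('utf-8')

class JsonDeserializer(Deserializer):
    def deserialize(self, topic, bytes_):
        return json.loads(bytes_.decode('utf-8'))

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=JsonSerializer(),
)
producer.send('my-topic', {'event': 'signup'})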

Serialization

Types

# Core data structures
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
OffsetAndMetadata = namedtuple('OffsetAndMetadata', ['offset', 'metadata'])
BrokerMetadata = namedtuple('BrokerMetadata', ['nodeId', 'host', 'port', 'rack'])
PartitionMetadata = namedtuple('PartitionMetadata', ['topic', 'partition', 'leader', 'replicas', 'isr', 'error'])
OffsetAndTimestamp = namedtuple('OffsetAndTimestamp', ['offset', 'timestamp'])

# Admin types
class NewTopic:
    def __init__(self, name: str, num_partitions: int, replication_factor: int, **configs): ...

class NewPartitions:
    def __init__(self, total_count: int, new_assignments=None): ...

class ConfigResource:
    def __init__(self, resource_type, name: str): ...

# Consumer callback interface  
class ConsumerRebalanceListener:
    def on_partitions_revoked(self, revoked): ...
    def on_partitions_assigned(self, assigned): ...

# Connection management
class BrokerConnection:
    def __init__(self, host: str, port: int, afi: int, **configs): ...
    def connect(self): ...
    def close(self, error=None): ...

# Serialization interfaces
class Serializer:
    def serialize(self, topic: str, value) -> bytes: ...
    def close(self): ...

class Deserializer:
    def deserialize(self, topic: str, bytes_: bytes): ...
    def close(self): ...

# Future type for async operations
class FutureRecordMetadata:
    is_done: bool
    def get(self, timeout=None): ...
    def add_callback(self, callback, *args, **kwargs): ...
    def add_errback(self, errback, *args, **kwargs): ...
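As a sketch of the rebalance callback interface above, a listener that logs partition movement can be passed to subscribe(); the topic and group names are placeholders:

from kafka import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener

class LoggingRebalanceListener(ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        # Called before a rebalance takes partitions away
        print('revoked:', revoked)

    def on_partitions_assigned(self, assigned):
        # Called once the new assignment is in place
        print('assigned:', assigned)

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
)
consumer.subscribe(['my-topic'], listener=LoggingRebalanceListener())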