Kafka Python NG

A pure Python client library for Apache Kafka that provides high-level producer and consumer APIs, admin functionality, and full protocol support. It is designed for compatibility with Kafka brokers from version 0.8.0 through 2.6+ and offers Pythonic interfaces for distributed stream-processing applications.

Package Information

  • Package Name: kafka-python-ng
  • Version: 2.2.x
  • Language: Python
  • Installation: pip install kafka-python-ng
  • Requirements: Python >= 3.8

Core Imports

import kafka

For specific functionality:

from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient

For data structures and error handling:

from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.errors import KafkaError
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.conn import BrokerConnection
from kafka.serializer import Serializer, Deserializer

For admin operations:

from kafka.admin import NewTopic, NewPartitions, ConfigResource, ConfigResourceType

Basic Usage

Producer

from kafka import KafkaProducer
import json

# Create producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send messages
producer.send('my-topic', {'key': 'value'})
producer.flush()
producer.close()

Consumer

from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Consume messages
for message in consumer:
    print(f"Topic: {message.topic}, Value: {message.value}")

Admin Client

from kafka import KafkaAdminClient
from kafka.admin import NewTopic

# Create admin client
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topics
topic = NewTopic(name='test-topic', num_partitions=3, replication_factor=1)
admin.create_topics([topic])

Architecture

The kafka-python-ng library is organized around three main client types:

  • KafkaProducer: Thread-safe producer for publishing records with batching, compression, and retry logic
  • KafkaConsumer: High-level consumer with automatic group coordination, partition assignment, and offset management
  • KafkaAdminClient: Administrative operations for topics, configs, consumer groups, and ACLs

The library is built on an asynchronous, selector-based networking layer with automatic cluster metadata management and comprehensive error handling. It supports all major Kafka features, including SASL authentication, SSL encryption, multiple compression formats (gzip, snappy, lz4, and zstd), and transactional semantics.
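
For example, reliability and throughput behavior is controlled through ordinary constructor options. A minimal sketch, assuming a broker at localhost:9092 (a placeholder address):

from kafka import KafkaProducer

# Standard producer options for acknowledgements, retries, and compression
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',               # wait for all in-sync replicas to acknowledge
    retries=5,                # retry transient send failures
    compression_type='gzip',  # compress record batches before sending
    linger_ms=10,             # wait up to 10 ms to fill larger batches
)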

Capabilities

Producer API

High-level producer for publishing records to Kafka topics with automatic batching, compression, partitioning, and retry logic. Supports asynchronous sending with futures and callback handling.

class KafkaProducer:
    def __init__(self, **configs): ...
    def send(self, topic: str, value=None, key=None, headers=None, partition=None, timestamp_ms=None): ...
    def flush(self, timeout=None): ...
    def close(self, timeout=None): ...
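
A minimal sketch of the future-based send path, assuming a local broker and a placeholder topic named my-topic:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# send() is asynchronous and returns a future; block on it for record metadata
future = producer.send('my-topic', b'raw-bytes', key=b'my-key')
try:
    metadata = future.get(timeout=10)
    print(metadata.topic, metadata.partition, metadata.offset)
except KafkaError as exc:
    print('send failed:', exc)

# Or attach callbacks instead of blocking
future = producer.send('my-topic', b'more-bytes')
future.add_callback(lambda md: print('delivered to partition', md.partition))
future.add_errback(lambda exc: print('delivery failed:', exc))
producer.flush()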


Consumer API

High-level consumer for consuming records from Kafka topics with automatic group coordination, partition assignment, and offset management. Supports both subscription-based and manual partition assignment.

class KafkaConsumer:
    def __init__(self, *topics, **configs): ...
    def subscribe(self, topics, pattern=None, listener=None): ...
    def poll(self, timeout_ms=0, max_records=None, update_offsets=True): ...
    def commit(self, offsets=None): ...
    def close(self): ...
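
A minimal sketch of a poll/commit loop with manual offset commits; the broker address, group id, and topic name are placeholders:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False,
)
consumer.subscribe(['my-topic'])

while True:  # run until interrupted
    # poll() returns a dict mapping TopicPartition to a list of records
    records = consumer.poll(timeout_ms=1000)
    for tp, messages in records.items():
        for message in messages:
            print(tp.partition, message.offset, message.value)
    consumer.commit()  # commit the offsets of the records just processed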


Admin API

Administrative client for managing Kafka cluster resources including topics, consumer groups, configurations, and access control lists (ACLs).

class KafkaAdminClient:
    def __init__(self, **configs): ...
    def create_topics(self, topic_list, timeout_ms=None, validate_only=False): ...
    def delete_topics(self, topics, timeout_ms=None): ...
    def describe_configs(self, config_resources, timeout_ms=None, include_synonyms=False): ...
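
A sketch of a few common administrative calls, assuming a local broker and an existing topic named test-topic:

from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType, NewPartitions

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Read back the topic's configuration
resource = ConfigResource(ConfigResourceType.TOPIC, 'test-topic')
configs = admin.describe_configs([resource])

# Grow the topic to six partitions
admin.create_partitions({'test-topic': NewPartitions(total_count=6)})

# List consumer groups known to the cluster, then clean up
print(admin.list_consumer_groups())
admin.delete_topics(['test-topic'])
admin.close()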


Configuration and Connection Management

Connection management, SSL/SASL authentication, and client configuration options for connecting to Kafka clusters with different security requirements.

class BrokerConnection:
    def __init__(self, host, port, **configs): ...
    def connect(self, timeout=None): ...
    def close(self): ...
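
Security settings are ordinary client configuration options. A sketch of a SASL/PLAIN-over-TLS connection; the hostname, credentials, and certificate path are placeholders:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['broker.example.com:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='user',
    sasl_plain_password='secret',
    ssl_cafile='/path/to/ca.pem',
)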


Data Structures and Types

Core data structures for representing Kafka concepts including topic partitions, offsets, broker metadata, and consumer group information.

class TopicPartition:
    def __init__(self, topic: str, partition: int): ...
    
class OffsetAndMetadata:
    def __init__(self, offset: int, metadata: str): ...
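
A sketch of manual partition assignment built on these structures; the topic, partition number, and offsets are placeholders:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], group_id='my-group')

tp = TopicPartition('my-topic', 0)
consumer.assign([tp])   # manual assignment instead of subscribe()
consumer.seek(tp, 42)   # start reading at offset 42

# Commit a specific position: the next offset to read is 43
consumer.commit({tp: OffsetAndMetadata(43, '')})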


Error Handling

Comprehensive exception hierarchy for handling various Kafka-related errors including network issues, protocol errors, and authentication failures.

class KafkaError(RuntimeError):
    retriable: bool
    invalid_metadata: bool
    
class NoBrokersAvailable(KafkaError): ...
class CommitFailedError(KafkaError): ...
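
A sketch of catching these exceptions around a blocking send, assuming a local broker and a placeholder topic:

from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
try:
    producer.send('my-topic', b'payload').get(timeout=10)
except KafkaTimeoutError:
    print('timed out waiting for the broker')
except KafkaError as exc:
    # retriable indicates whether the operation can safely be retried
    print('Kafka error (retriable=%s): %s' % (exc.retriable, exc))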


Serialization

Abstract base classes for implementing custom key and value serializers/deserializers for converting between Python objects and bytes.

class Serializer:
    def serialize(self, topic: str, value) -> bytes: ...
    def close(self): ...

class Deserializer:  
    def deserialize(self, topic: str, bytes_: bytes): ...
    def close(self): ...
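
A sketch of JSON serializer and deserializer subclasses; passing instances of these classes is an alternative to the plain callables shown in Basic Usage:

import json

from kafka import KafkaConsumer, KafkaProducer
from kafka.serializer import Serializer, Deserializer

class JsonSerializer(Serializer):
    def serialize(self, topic, value):
        return json.dumps(value).encode('utf-8')

class JsonDeserializer(Deserializer):
    def deserialize(self, topic, bytes_):
        return json.loads(bytes_.decode('utf-8'))

# value_serializer accepts a Serializer instance as well as a callable
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=JsonSerializer(),
)
producer.send('my-topic', {'event': 'signup'})
producer.flush()

# The consumer-side counterpart takes a Deserializer instance
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=JsonDeserializer(),
)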


Types

# Core data structures
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
OffsetAndMetadata = namedtuple('OffsetAndMetadata', ['offset', 'metadata'])
BrokerMetadata = namedtuple('BrokerMetadata', ['nodeId', 'host', 'port', 'rack'])
PartitionMetadata = namedtuple('PartitionMetadata', ['topic', 'partition', 'leader', 'replicas', 'isr', 'error'])
OffsetAndTimestamp = namedtuple('OffsetAndTimestamp', ['offset', 'timestamp'])

# Admin types
class NewTopic:
    def __init__(self, name: str, num_partitions: int, replication_factor: int, **configs): ...

class NewPartitions:
    def __init__(self, total_count: int, new_assignments=None): ...

class ConfigResource:
    def __init__(self, resource_type, name: str): ...

# Consumer callback interface  
class ConsumerRebalanceListener:
    def on_partitions_revoked(self, revoked): ...
    def on_partitions_assigned(self, assigned): ...

# Connection management
class BrokerConnection:
    def __init__(self, host: str, port: int, **configs): ...
    def connect(self, timeout=None): ...
    def close(self): ...

# Serialization interfaces
class Serializer:
    def serialize(self, topic: str, value) -> bytes: ...
    def close(self): ...

class Deserializer:
    def deserialize(self, topic: str, bytes_: bytes): ...
    def close(self): ...

# Future type for async operations
class FutureRecordMetadata:
    def get(self, timeout=None): ...
    def is_done(self) -> bool: ...
    def add_callback(self, callback): ...
    def add_errback(self, errback): ...
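
A sketch of a ConsumerRebalanceListener subclass passed to subscribe(); the broker, group id, and topic are placeholders, and the print calls stand in for real bookkeeping such as committing offsets on revocation:

from kafka import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener

class LoggingRebalanceListener(ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        print('partitions revoked:', revoked)

    def on_partitions_assigned(self, assigned):
        print('partitions assigned:', assigned)

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], group_id='my-group')
consumer.subscribe(['my-topic'], listener=LoggingRebalanceListener())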