CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kafka-python-ng

Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities

Pending
Overview
Eval results
Files

docs/connection.md

Connection and Configuration

Connection management, SSL/SASL authentication, and client configuration options for connecting to Kafka clusters with various security configurations.

Capabilities

BrokerConnection

Low-level connection management for individual Kafka brokers.

class BrokerConnection:
    """Low-level connection to a single Kafka broker.

    Manages the connection lifecycle and the exchange of protocol
    requests/responses. Bodies are omitted in this listing; it documents
    the public interface only.
    """

    def __init__(self, host: str, port: int, **configs):
        """
        Create a connection to a Kafka broker.
        
        Args:
            host (str): Broker hostname
            port (int): Broker port
            **configs: Connection configuration options including:
                socket_timeout_ms (int): Socket timeout in milliseconds
                socket_receive_buffer_bytes (int): Socket receive buffer size
                socket_send_buffer_bytes (int): Socket send buffer size
                socket_keepalive (bool): Enable TCP keepalive
                security_protocol (str): PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL
                ssl_context: Pre-built ssl.SSLContext (takes precedence over the
                    file-based SSL options below)
                ssl_check_hostname (bool): Verify broker hostname against its certificate
                ssl_cafile (str): CA certificate file path
                ssl_certfile (str): Client certificate file path (mutual TLS)
                ssl_keyfile (str): Client private key file path (mutual TLS)
                ssl_crlfile (str): Certificate revocation list file
                ssl_password (str): Password protecting the private key
                sasl_mechanism (str): PLAIN, SCRAM-SHA-256/512, GSSAPI or OAUTHBEARER
                sasl_plain_username (str): Username for PLAIN/SCRAM mechanisms
                sasl_plain_password (str): Password for PLAIN/SCRAM mechanisms
                sasl_kerberos_service_name (str): Kerberos service principal name
                sasl_oauth_token_provider: AbstractTokenProvider instance for OAUTHBEARER
        """
        
    def connect(self, timeout: float | None = None) -> bool:
        """
        Establish connection to broker.
        
        Args:
            timeout (float): Connection timeout in seconds
                (None semantics not shown here -- TODO confirm against implementation)
            
        Returns:
            bool: True if connection successful
        """
        
    def close(self) -> None:
        """Close the connection and release the underlying socket."""
        
    def connected(self) -> bool:
        """
        Check if connection is active.
        
        Returns:
            bool: True if connected
        """
        
    def send(self, request):
        """
        Send a protocol request to the broker.
        
        Args:
            request: Kafka protocol request object
        """
        
    def recv(self):
        """
        Receive a response from the broker.
        
        Returns:
            Response object from broker
        """

Client Configuration

Common configuration options for Kafka clients.

# Bootstrap and Connection Settings
bootstrap_servers = ['localhost:9092']  # List of broker 'host:port' addresses
client_id = 'my-kafka-client'          # Client identifier reported to brokers (useful in server logs)
connections_max_idle_ms = 540000       # Close connections idle longer than this (9 minutes)
request_timeout_ms = 30000             # Per-request timeout (30 s)
retry_backoff_ms = 100                 # Delay between retries of a failed request
reconnect_backoff_ms = 50              # Initial delay before reconnecting to a broker
reconnect_backoff_max_ms = 1000        # Upper bound for reconnection backoff

# Security Settings
security_protocol = 'PLAINTEXT'        # One of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
ssl_context = None                     # Custom ssl.SSLContext; overrides the file-based SSL options below
ssl_check_hostname = True              # Verify broker hostname against its certificate
ssl_cafile = '/path/to/ca-cert.pem'   # CA certificate used to verify the broker
ssl_certfile = '/path/to/client.pem'  # Client certificate (mutual TLS)
ssl_keyfile = '/path/to/client.key'   # Client private key (mutual TLS)
ssl_password = 'key-password'          # Password protecting the private key
ssl_crlfile = '/path/to/crl.pem'      # Certificate revocation list

# SASL Authentication
sasl_mechanism = 'PLAIN'               # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER
sasl_plain_username = 'myuser'         # Username for PLAIN/SCRAM mechanisms
sasl_plain_password = 'mypassword'     # Password for PLAIN/SCRAM mechanisms
sasl_kerberos_service_name = 'kafka'   # Kerberos service principal name (GSSAPI)
sasl_oauth_token_provider = None       # AbstractTokenProvider instance (OAUTHBEARER)

# Network and Socket Settings
socket_timeout_ms = 30000              # Socket-level timeout (30 s)
socket_receive_buffer_bytes = 65536    # Socket receive buffer size (64 KiB)
socket_send_buffer_bytes = 131072      # Socket send buffer size (128 KiB)
socket_keepalive = False               # Enable TCP keepalive probes

Authentication Mechanisms

Supported authentication mechanisms with configuration examples.

# SASL/PLAIN Authentication
# WARNING: over SASL_PLAINTEXT the credentials travel in cleartext;
# prefer SASL_SSL outside of trusted networks.
sasl_plain_config = {
    'security_protocol': 'SASL_PLAINTEXT',
    'sasl_mechanism': 'PLAIN', 
    'sasl_plain_username': 'myuser',
    'sasl_plain_password': 'mypassword'
}

# SASL/SCRAM Authentication (salted challenge-response, here over TLS)
sasl_scram_config = {
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'SCRAM-SHA-256',
    'sasl_plain_username': 'myuser',   # SCRAM reuses the *_plain_* credential options
    'sasl_plain_password': 'mypassword',
    'ssl_cafile': 'ca-cert.pem'
}

# Kerberos/GSSAPI Authentication (uses the process's Kerberos ticket cache)
sasl_kerberos_config = {
    'security_protocol': 'SASL_PLAINTEXT',
    'sasl_mechanism': 'GSSAPI',
    'sasl_kerberos_service_name': 'kafka'
}

# OAuth Bearer Authentication
# NOTE: CustomTokenProvider is illustrative and not defined here --
# supply your own AbstractTokenProvider subclass.
sasl_oauth_config = {
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'OAUTHBEARER',
    'sasl_oauth_token_provider': CustomTokenProvider()
}

# SSL Client Certificate Authentication (mutual TLS; no SASL layer)
ssl_client_cert_config = {
    'security_protocol': 'SSL',
    'ssl_cafile': 'ca-cert.pem',
    'ssl_certfile': 'client-cert.pem', 
    'ssl_keyfile': 'client-key.pem',
    'ssl_password': 'key-password'
}

OAuth Token Provider

Abstract base class for implementing OAuth token providers.

class AbstractTokenProvider:
    """Abstract base class for OAUTHBEARER token providers.

    Subclass this and pass an instance as ``sasl_oauth_token_provider``
    when using ``sasl_mechanism='OAUTHBEARER'``.
    """

    def token(self) -> str:
        """
        Get the current OAuth token.
        
        Returns:
            str: Valid OAuth bearer token
        """
        
    def close(self) -> None:
        """Release any resources held by the token provider."""

Usage Examples

Basic Connection

from kafka import KafkaProducer, KafkaConsumer

# Basic connection to local Kafka
# Basic connection to a local, unauthenticated Kafka broker
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    client_id='my-producer'
)

# Consumer joining consumer group 'my-group' on the same broker
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    client_id='my-consumer',
    group_id='my-group'
)

SSL Encryption

import ssl
from kafka import KafkaProducer

# SSL with CA verification
# SSL with CA verification
producer = KafkaProducer(
    bootstrap_servers=['secure-broker:9093'],
    security_protocol='SSL',
    ssl_check_hostname=True,
    ssl_cafile='ca-cert.pem',
    ssl_certfile='client-cert.pem',  # Optional: client cert for mutual TLS
    ssl_keyfile='client-key.pem',    # Optional: client private key
    ssl_password='key-password'       # Optional: password for the key file
)

# Custom SSL context
# WARNING: disabling hostname checks and certificate verification makes the
# connection vulnerable to man-in-the-middle attacks -- do not use in production.
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

producer = KafkaProducer(
    bootstrap_servers=['broker:9093'],
    security_protocol='SSL',
    ssl_context=ssl_context
)

SASL Authentication

from kafka import KafkaProducer

# SASL/PLAIN over plaintext
# SASL/PLAIN over plaintext (credentials sent unencrypted -- trusted networks only)
producer = KafkaProducer(
    bootstrap_servers=['broker:9092'],
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='alice',
    sasl_plain_password='secret'
)

# SASL/SCRAM over SSL (SCRAM reuses the *_plain_* credential options)
producer = KafkaProducer(
    bootstrap_servers=['secure-broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='bob',
    sasl_plain_password='secret',
    ssl_cafile='ca-cert.pem'
)

# Kerberos authentication (uses the process's Kerberos ticket cache)
producer = KafkaProducer(
    bootstrap_servers=['kerb-broker:9092'],
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='GSSAPI',
    sasl_kerberos_service_name='kafka'
)

AWS MSK IAM Authentication

from kafka import KafkaProducer
from kafka.oauth import AbstractTokenProvider
import boto3

class AWSTokenProvider(AbstractTokenProvider):
    """Placeholder OAuth token provider for AWS MSK IAM authentication.

    A production implementation must generate a SigV4-signed auth token from
    the session's AWS credentials (e.g. with the
    ``aws-msk-iam-sasl-signer-python`` library); this example only sketches
    the structure required by the provider interface.
    """

    def __init__(self, region='us-east-1'):
        self.region = region            # AWS region hosting the MSK cluster
        self.session = boto3.Session()  # Holds the AWS credentials to sign with

    def token(self):
        """Return the token used in the OAUTHBEARER SASL handshake.

        TODO: replace this placeholder with a real AWS IAM token generated
        from ``self.session`` for ``self.region``. (The original example
        also constructed an unused ``kafka`` boto3 client here; that dead
        network-client creation has been removed.)
        """
        return "aws-iam-token"

# AWS MSK with IAM
# AWS MSK with IAM: MSK's IAM listener (port 9098) authenticates via
# OAUTHBEARER tokens produced by an AWS-aware token provider.
producer = KafkaProducer(
    bootstrap_servers=['msk-cluster.amazonaws.com:9098'],
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=AWSTokenProvider(),
    ssl_check_hostname=True
)

Custom OAuth Provider

from kafka import KafkaProducer
from kafka.oauth import AbstractTokenProvider
import requests

class CustomOAuthProvider(AbstractTokenProvider):
    """OAuth 2.0 client-credentials token provider with expiry-based caching.

    Fetches a bearer token from ``token_url`` via the client-credentials
    grant and caches it until shortly before it expires.
    """

    def __init__(self, client_id, client_secret, token_url):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._token = None       # cached access token
        self._expires_at = 0     # epoch seconds; 0 forces the initial fetch

    def token(self):
        """Return a valid access token, refreshing if missing or expired."""
        import time
        if self._token is None or time.time() >= self._expires_at:
            self._refresh_token()
        return self._token

    def _refresh_token(self):
        """Fetch a new token using the OAuth client-credentials grant.

        Raises:
            requests.HTTPError: if the token endpoint returns an error status.
        """
        import time
        response = requests.post(
            self.token_url,
            data={
                'grant_type': 'client_credentials',
                'client_id': self.client_id,
                'client_secret': self.client_secret
            },
            timeout=10,  # never block the client indefinitely on the auth server
        )
        # Fail loudly on HTTP errors instead of raising a confusing
        # KeyError on 'access_token' from an error payload.
        response.raise_for_status()
        data = response.json()
        self._token = data['access_token']
        # Refresh 60 s before actual expiry so in-flight requests never
        # present a stale token.
        self._expires_at = time.time() + data['expires_in'] - 60

    def close(self):
        """No resources to release; required by the provider interface."""
        pass

# OAUTHBEARER over TLS using the custom client-credentials provider
producer = KafkaProducer(
    bootstrap_servers=['oauth-broker:9092'],
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=CustomOAuthProvider(
        client_id='my-client',
        client_secret='my-secret',
        token_url='https://auth.example.com/token'
    )
)

Connection Tuning

from kafka import KafkaProducer

# High-performance connection settings
# High-performance connection settings
producer = KafkaProducer(
    bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
    
    # Connection settings
    connections_max_idle_ms=600000,        # Keep idle connections up to 10 minutes
    request_timeout_ms=30000,              # 30 s per request
    retry_backoff_ms=100,                  # Short delay between retries
    reconnect_backoff_ms=50,               # Fast initial reconnection
    reconnect_backoff_max_ms=1000,         # Cap reconnect backoff at 1 s
    
    # Socket settings
    socket_timeout_ms=30000,               # 30 s socket-level timeout
    socket_receive_buffer_bytes=131072,    # 128 KiB receive buffer
    socket_send_buffer_bytes=131072,       # 128 KiB send buffer
    socket_keepalive=True,                 # Detect dead peers via TCP keepalive
    
    # Client identification (shows up in broker logs/metrics)
    client_id='high-perf-producer'
)

Multiple Cluster Connection

from kafka import KafkaProducer, KafkaConsumer

# Connect to multiple clusters
# Connect to multiple clusters: each client holds its own connections
primary_producer = KafkaProducer(
    bootstrap_servers=['primary-broker:9092'],
    client_id='primary-producer'
)

backup_producer = KafkaProducer(
    bootstrap_servers=['backup-broker:9092'],
    client_id='backup-producer'
)

# Cross-cluster replication consumer
consumer = KafkaConsumer(
    'source-topic',
    bootstrap_servers=['source-cluster:9092'],
    group_id='replicator'
)

# Forward each consumed record to the backup cluster.
# NOTE(review): send() is asynchronous -- call backup_producer.flush()
# before shutdown if delivery must be guaranteed.
for message in consumer:
    # Replicate to backup cluster
    backup_producer.send('target-topic', 
                        key=message.key, 
                        value=message.value)

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python-ng

docs

admin.md

connection.md

consumer.md

errors.md

index.md

producer.md

serialization.md

structs.md

tile.json