Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities.

Connection management, SSL/SASL authentication, and client configuration options for connecting to Kafka clusters with various security configurations.

Low-level connection management for individual Kafka brokers.
class BrokerConnection:
    """Low-level connection management for a single Kafka broker.

    API stub: methods document the connection contract; bodies are
    intentionally empty (they implicitly return None).
    """

    def __init__(self, host, port, **configs):
        """
        Create a connection to a Kafka broker.

        Args:
            host (str): Broker hostname
            port (int): Broker port
            **configs: Connection configuration options including:
                socket_timeout_ms (int): Socket timeout
                socket_receive_buffer_bytes (int): Socket receive buffer size
                socket_send_buffer_bytes (int): Socket send buffer size
                socket_keepalive (bool): Enable TCP keepalive
                security_protocol (str): Security protocol
                ssl_context: SSL context
                ssl_check_hostname (bool): Verify SSL hostname
                ssl_cafile (str): CA certificate file path
                ssl_certfile (str): Client certificate file path
                ssl_keyfile (str): Client key file path
                ssl_crlfile (str): Certificate revocation list file
                ssl_password (str): Private key password
                sasl_mechanism (str): SASL mechanism
                sasl_plain_username (str): SASL PLAIN username
                sasl_plain_password (str): SASL PLAIN password
                sasl_kerberos_service_name (str): Kerberos service name
                sasl_oauth_token_provider: OAuth token provider
        """

    def connect(self, timeout=None):
        """
        Establish connection to broker.

        Args:
            timeout (float): Connection timeout in seconds

        Returns:
            bool: True if connection successful
        """

    def close(self):
        """Close the connection."""

    def connected(self):
        """
        Check if connection is active.

        Returns:
            bool: True if connected
        """

    def send(self, request):
        """
        Send request to broker.

        Args:
            request: Protocol request object
        """

    def recv(self):
        """
        Receive response from broker.

        Returns:
            Response object from broker
        """

# Common configuration options for Kafka clients.
# Bootstrap and Connection Settings
bootstrap_servers = ['localhost:9092'] # List of broker addresses (host:port)
client_id = 'my-kafka-client' # Client identifier sent to the broker
connections_max_idle_ms = 540000 # Max connection idle time (ms)
request_timeout_ms = 30000 # Request timeout (ms)
retry_backoff_ms = 100 # Retry backoff time (ms)
reconnect_backoff_ms = 50 # Reconnection backoff (ms)
reconnect_backoff_max_ms = 1000 # Max reconnection backoff (ms)
# Security Settings
security_protocol = 'PLAINTEXT' # PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
ssl_context = None # Custom SSL context (overrides the ssl_* file options)
ssl_check_hostname = True # Verify SSL hostname
ssl_cafile = '/path/to/ca-cert.pem' # CA certificate file
ssl_certfile = '/path/to/client.pem' # Client certificate
ssl_keyfile = '/path/to/client.key' # Client private key
ssl_password = 'key-password' # Private key password
ssl_crlfile = '/path/to/crl.pem' # Certificate revocation list
# SASL Authentication
sasl_mechanism = 'PLAIN' # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER
sasl_plain_username = 'myuser' # SASL PLAIN username (also used for SCRAM)
sasl_plain_password = 'mypassword' # SASL PLAIN password (also used for SCRAM)
sasl_kerberos_service_name = 'kafka' # Kerberos service name
sasl_oauth_token_provider = None # OAuth token provider (AbstractTokenProvider subclass)
# Network and Socket Settings
socket_timeout_ms = 30000 # Socket timeout (ms)
socket_receive_buffer_bytes = 65536 # Socket receive buffer
socket_send_buffer_bytes = 131072 # Socket send buffer
socket_keepalive = False # TCP keepalive
# Supported authentication mechanisms with configuration examples.
# SASL/PLAIN Authentication
sasl_plain_config = {
    'security_protocol': 'SASL_PLAINTEXT',
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': 'myuser',
    'sasl_plain_password': 'mypassword'
}

# SASL/SCRAM Authentication (credentials reuse the sasl_plain_* keys)
sasl_scram_config = {
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'SCRAM-SHA-256',
    'sasl_plain_username': 'myuser',
    'sasl_plain_password': 'mypassword',
    'ssl_cafile': 'ca-cert.pem'
}

# Kerberos/GSSAPI Authentication
sasl_kerberos_config = {
    'security_protocol': 'SASL_PLAINTEXT',
    'sasl_mechanism': 'GSSAPI',
    'sasl_kerberos_service_name': 'kafka'
}

# OAuth Bearer Authentication
# NOTE(review): CustomTokenProvider is not defined here -- the application
# must supply it (a subclass of AbstractTokenProvider) before building
# this config.
sasl_oauth_config = {
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'OAUTHBEARER',
    'sasl_oauth_token_provider': CustomTokenProvider()
}

# SSL Client Certificate Authentication
ssl_client_cert_config = {
    'security_protocol': 'SSL',
    'ssl_cafile': 'ca-cert.pem',
    'ssl_certfile': 'client-cert.pem',
    'ssl_keyfile': 'client-key.pem',
    'ssl_password': 'key-password'
}

# Abstract base class for implementing OAuth token providers.
class AbstractTokenProvider:
    """Abstract base class for OAuth token providers used with OAUTHBEARER.

    API stub: subclasses implement token() (and optionally close()).
    """

    def token(self):
        """
        Get current OAuth token.

        Returns:
            str: Valid OAuth token
        """

    def close(self):
        """Clean up token provider resources."""

from kafka import KafkaProducer, KafkaConsumer
# Basic connection to local Kafka (no authentication / PLAINTEXT)
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    client_id='my-producer'
)
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    client_id='my-consumer',
    group_id='my-group'
)

import ssl
from kafka import KafkaProducer

# SSL with CA verification
producer = KafkaProducer(
    bootstrap_servers=['secure-broker:9093'],
    security_protocol='SSL',
    ssl_check_hostname=True,
    ssl_cafile='ca-cert.pem',
    ssl_certfile='client-cert.pem',  # Optional client cert
    ssl_keyfile='client-key.pem',    # Optional client key
    ssl_password='key-password'      # Optional key password
)

# Custom SSL context
# WARNING: disabling hostname checks and certificate verification exposes
# the connection to man-in-the-middle attacks -- use only for local testing.
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
producer = KafkaProducer(
    bootstrap_servers=['broker:9093'],
    security_protocol='SSL',
    ssl_context=ssl_context
)

from kafka import KafkaProducer
# SASL/PLAIN over plaintext
producer = KafkaProducer(
    bootstrap_servers=['broker:9092'],
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='alice',
    sasl_plain_password='secret'
)

# SASL/SCRAM over SSL (SCRAM reuses the sasl_plain_* credential options)
producer = KafkaProducer(
    bootstrap_servers=['secure-broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='bob',
    sasl_plain_password='secret',
    ssl_cafile='ca-cert.pem'
)

# Kerberos authentication
producer = KafkaProducer(
    bootstrap_servers=['kerb-broker:9092'],
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='GSSAPI',
    sasl_kerberos_service_name='kafka'
)

from kafka import KafkaProducer
from kafka.oauth import AbstractTokenProvider
import boto3

class AWSTokenProvider(AbstractTokenProvider):
    """Token provider sketch for AWS MSK IAM authentication."""

    def __init__(self, region='us-east-1'):
        self.region = region
        self.session = boto3.Session()

    def token(self):
        # Generate AWS IAM token for MSK
        client = self.session.client('kafka', region_name=self.region)
        # Placeholder: a real implementation would use `client` to
        # generate a proper AWS IAM auth token here.
        return "aws-iam-token"

# AWS MSK with IAM
producer = KafkaProducer(
    bootstrap_servers=['msk-cluster.amazonaws.com:9098'],
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=AWSTokenProvider(),
    ssl_check_hostname=True
)

from kafka import KafkaProducer
from kafka.oauth import AbstractTokenProvider
import requests
import time

class CustomOAuthProvider(AbstractTokenProvider):
    """OAuth2 client-credentials token provider with expiry-based caching."""

    def __init__(self, client_id, client_secret, token_url):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._token = None
        self._expires_at = 0

    def token(self):
        # Refresh lazily: only when no token is cached or it has expired.
        if self._token is None or time.time() >= self._expires_at:
            self._refresh_token()
        return self._token

    def _refresh_token(self):
        response = requests.post(self.token_url, data={
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret
        })
        # Fail fast with a clear HTTP error instead of a KeyError below.
        response.raise_for_status()
        data = response.json()
        self._token = data['access_token']
        # Refresh 60 seconds early so a token is never used at the edge
        # of its expiry window.
        self._expires_at = time.time() + data['expires_in'] - 60

    def close(self):
        pass

producer = KafkaProducer(
    bootstrap_servers=['oauth-broker:9092'],
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=CustomOAuthProvider(
        client_id='my-client',
        client_secret='my-secret',
        token_url='https://auth.example.com/token'
    )
)

from kafka import KafkaProducer
# High-performance connection settings
producer = KafkaProducer(
    bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
    # Connection settings
    connections_max_idle_ms=600000,       # 10 minutes
    request_timeout_ms=30000,             # 30 seconds
    retry_backoff_ms=100,                 # Fast retries
    reconnect_backoff_ms=50,              # Fast reconnection
    reconnect_backoff_max_ms=1000,        # Max 1 second backoff
    # Socket settings
    socket_timeout_ms=30000,              # 30 second socket timeout
    socket_receive_buffer_bytes=131072,   # 128KB receive buffer
    socket_send_buffer_bytes=131072,      # 128KB send buffer
    socket_keepalive=True,                # Enable TCP keepalive
    # Client identification
    client_id='high-perf-producer'
)

from kafka import KafkaProducer, KafkaConsumer
# Connect to multiple clusters
primary_producer = KafkaProducer(
    bootstrap_servers=['primary-broker:9092'],
    client_id='primary-producer'
)
backup_producer = KafkaProducer(
    bootstrap_servers=['backup-broker:9092'],
    client_id='backup-producer'
)

# Cross-cluster replication consumer
consumer = KafkaConsumer(
    'source-topic',
    bootstrap_servers=['source-cluster:9092'],
    group_id='replicator'
)
for message in consumer:
    # Replicate each record to the backup cluster, preserving the key.
    backup_producer.send('target-topic',
                         key=message.key,
                         value=message.value)

# Install with Tessl CLI
npx tessl i tessl/pypi-kafka-python-ng