Confluent's Python client for Apache Kafka
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No evaluation scenarios have been run.
The core producer and consumer classes provide fundamental Kafka functionality with high performance through the underlying librdkafka C library. These classes support all Kafka features including transactions, exactly-once semantics, and custom partitioning.
High-performance Kafka producer with support for asynchronous message delivery, custom partitioning, transactions, and delivery guarantees.
class Producer:
    """
    High-performance Kafka producer with support for asynchronous message
    delivery, custom partitioning, transactions, and delivery guarantees.
    """

    def __init__(self, conf):
        """
        Create a new Producer instance.
        Args:
            conf (dict): Configuration properties for the producer
        """

    def produce(self, topic, value=None, key=None, partition=-1, on_delivery=None, timestamp=0, headers=None):
        """
        Produce message to topic. Asynchronous: the message is enqueued
        locally and delivered in the background.
        Args:
            topic (str): Topic to produce to
            value (bytes, str, optional): Message value
            key (bytes, str, optional): Message key for partitioning
            partition (int, optional): Specific partition (-1 for automatic)
            on_delivery (callable, optional): Delivery report callback,
                invoked as on_delivery(err, msg) from poll() or flush()
            timestamp (int, optional): Message timestamp (0 for current time)
            headers (dict, optional): Message headers
        Raises:
            BufferError: If local producer queue is full
            KafkaException: For other produce errors
        """

    def poll(self, timeout=-1):
        """
        Poll for events and call registered callbacks (e.g. delivery reports).
        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)
        Returns:
            int: Number of events processed
        """

    def flush(self, timeout=-1):
        """
        Wait for all messages to be delivered.
        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)
        Returns:
            int: Number of messages still in queue (0 on success)
        """

    def purge(self, in_queue=True, in_flight=True, blocking=True):
        """
        Purge messages from internal queues.
        Args:
            in_queue (bool): Purge messages in local queue
            in_flight (bool): Purge messages in flight to broker
            blocking (bool): Block until purge is complete
        Returns:
            NOTE(review): the original text claimed an int count of purged
            messages; upstream confluent-kafka's purge() returns None and
            reports purged messages through their delivery callbacks --
            confirm against the installed client version.
        """

    def abort_transaction(self, timeout=-1):
        """
        Abort ongoing transaction.
        Args:
            timeout (float): Maximum time to wait in seconds
        """

    def begin_transaction(self):
        """
        Begin a new transaction. init_transactions() must have been called
        once beforehand.
        """

    def commit_transaction(self, timeout=-1):
        """
        Commit current transaction.
        Args:
            timeout (float): Maximum time to wait in seconds
        """

    def init_transactions(self, timeout=-1):
        """
        Initialize transactions for this producer. Required once before the
        first begin_transaction().
        Args:
            timeout (float): Maximum time to wait in seconds
        """

    def send_offsets_to_transaction(self, positions, group_metadata, timeout=-1):
        """
        Send consumer offsets to transaction.
        Args:
            positions (list): List of TopicPartition objects with offsets
            group_metadata (ConsumerGroupMetadata): Consumer group metadata
                (obtained from Consumer.consumer_group_metadata())
            timeout (float): Maximum time to wait in seconds
        """

    def list_topics(self, topic=None, timeout=-1):
        """
        Get metadata for topics.
        Args:
            topic (str, optional): Specific topic name to query
            timeout (float): Maximum time to wait in seconds
        Returns:
            ClusterMetadata: Cluster and topic metadata
        """
High-performance Kafka consumer with support for consumer groups, manual/automatic offset management, and rebalancing.
class Consumer:
    """
    High-performance Kafka consumer with support for consumer groups,
    manual/automatic offset management, and rebalancing.
    """

    def __init__(self, conf):
        """
        Create a new Consumer instance.
        Args:
            conf (dict): Configuration properties for the consumer
        """

    def subscribe(self, topics, listener=None):
        """
        Subscribe to list of topics for automatic partition assignment.
        Args:
            topics (list): List of topic names to subscribe to
            listener (RebalanceCallback, optional): Rebalance callback
        """

    def unsubscribe(self):
        """
        Unsubscribe from current topic subscription.
        """

    def assign(self, partitions):
        """
        Manually assign partitions to consume from (alternative to
        subscribe(); no group rebalancing is involved).
        Args:
            partitions (list): List of TopicPartition objects
        """

    def assignment(self):
        """
        Get current partition assignment.
        Returns:
            list: List of assigned TopicPartition objects
        """

    def unassign(self):
        """
        Remove current partition assignment.
        """

    def poll(self, timeout=-1):
        """
        Poll for messages.
        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)
        Returns:
            Message: Message object or None if timeout
        """

    def consume(self, num_messages=1, timeout=-1):
        """
        Consume multiple messages.
        Args:
            num_messages (int): Maximum number of messages to return
            timeout (float): Maximum time to wait in seconds
        Returns:
            list: List of Message objects
        """

    def commit(self, message=None, offsets=None, asynchronous=True):
        """
        Commit message offset or specified offsets. message and offsets are
        mutually exclusive.
        Args:
            message (Message, optional): Commit offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
            asynchronous (bool): Commit asynchronously if True
        Returns:
            list: Committed offsets if synchronous, None if asynchronous
        """

    def committed(self, partitions, timeout=-1):
        """
        Get committed offsets for partitions.
        Args:
            partitions (list): List of TopicPartition objects
            timeout (float): Maximum time to wait in seconds
        Returns:
            list: List of TopicPartition objects with committed offsets
        """

    def position(self, partitions):
        """
        Get current position (next fetch offset) for partitions.
        Args:
            partitions (list): List of TopicPartition objects
        Returns:
            list: List of TopicPartition objects with current positions
        """

    def seek(self, partition):
        """
        Seek to offset for partition.
        Args:
            partition (TopicPartition): Partition with offset to seek to
        """

    def pause(self, partitions):
        """
        Pause consumption for partitions.
        Args:
            partitions (list): List of TopicPartition objects to pause
        """

    def resume(self, partitions):
        """
        Resume consumption for partitions.
        Args:
            partitions (list): List of TopicPartition objects to resume
        """

    def get_watermark_offsets(self, partition, timeout=-1, cached=False):
        """
        Get low and high watermark offsets for partition.
        Args:
            partition (TopicPartition): Partition to query
            timeout (float): Maximum time to wait in seconds
            cached (bool): Use cached values if available
        Returns:
            tuple: (low_offset, high_offset)
        """

    def offsets_for_times(self, partitions, timeout=-1):
        """
        Get offsets for timestamps.
        Args:
            partitions (list): List of TopicPartition objects with timestamps
            timeout (float): Maximum time to wait in seconds
        Returns:
            list: List of TopicPartition objects with offsets for timestamps
        """

    def close(self):
        """
        Close the consumer and leave consumer group. Call before discarding
        the consumer so assigned partitions are released promptly.
        """

    def store_offsets(self, message=None, offsets=None):
        """
        Store offset for message or specified offsets for later commit.
        Args:
            message (Message, optional): Store offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
        """

    def incremental_assign(self, partitions):
        """
        Incrementally add partitions to assignment.
        Args:
            partitions (list): List of TopicPartition objects to add
        """

    def incremental_unassign(self, partitions):
        """
        Incrementally remove partitions from assignment.
        Args:
            partitions (list): List of TopicPartition objects to remove
        """

    def list_topics(self, topic=None, timeout=-1):
        """
        Get metadata for topics.
        Args:
            topic (str, optional): Specific topic name to query
            timeout (float): Maximum time to wait in seconds
        Returns:
            ClusterMetadata: Cluster and topic metadata
        """

    def consumer_group_metadata(self):
        """
        Get consumer group metadata for transactional operations
        (see Producer.send_offsets_to_transaction()).
        Returns:
            ConsumerGroupMetadata: Consumer group metadata object
        """
Container for Kafka message data and metadata.
class Message:
    """
    Container for Kafka message data and metadata.
    """

    def error(self):
        """
        Get message error.
        Returns:
            KafkaError: Error object or None if no error
        """

    def key(self):
        """
        Get message key.
        Returns:
            bytes: Message key or None
        """

    def value(self):
        """
        Get message value.
        Returns:
            bytes: Message value or None
        """

    def topic(self):
        """
        Get message topic.
        Returns:
            str: Topic name
        """

    def partition(self):
        """
        Get message partition.
        Returns:
            int: Partition number
        """

    def offset(self):
        """
        Get message offset.
        Returns:
            int: Message offset
        """

    def timestamp(self):
        """
        Get message timestamp.
        Returns:
            tuple: (timestamp_type, timestamp) where timestamp_type is one of:
            TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME
        """

    def headers(self):
        """
        Get message headers.
        Returns:
            dict: Dictionary of header key-value pairs or None.
            NOTE(review): upstream confluent-kafka documents headers() as a
            list of (key, value) tuples, not a dict -- confirm before
            relying on dict semantics.
        """

    def latency(self):
        """
        Get message latency (produce time to broker acknowledgement).
        Returns:
            float: Latency in seconds or None
        """

    def leader_epoch(self):
        """
        Get leader epoch for the message.
        Returns:
            int: Leader epoch or None
        """

    def set_key(self, key):
        """
        Set message key.
        Args:
            key (bytes, str): New message key
        """

    def set_value(self, value):
        """
        Set message value.
        Args:
            value (bytes, str): New message value
        """

    def set_headers(self, headers):
        """
        Set message headers.
        Args:
            headers (dict): Dictionary of header key-value pairs
        """
Represents a Kafka topic partition with optional offset.
class TopicPartition:
    """
    Represents a Kafka topic partition with optional offset.
    """

    def __init__(self, topic, partition=None, offset=None):
        """
        Create TopicPartition object.
        NOTE(review): upstream confluent-kafka defaults are partition=-1
        (PARTITION_UA) and offset=OFFSET_INVALID rather than None --
        confirm against the installed client version.
        Args:
            topic (str): Topic name
            partition (int, optional): Partition number
            offset (int, optional): Offset value
        """

    @property
    def topic(self):
        """
        Topic name.
        Returns:
            str: Topic name
        """

    @property
    def partition(self):
        """
        Partition number.
        Returns:
            int: Partition number
        """

    @property
    def offset(self):
        """
        Offset value.
        Returns:
            int: Offset value
        """

    @offset.setter
    def offset(self, value):
        """
        Set offset value.
        Args:
            value (int): New offset value
        """

    @property
    def metadata(self):
        """
        Partition metadata.
        Returns:
            str: Metadata string
        """

    @property
    def leader_epoch(self):
        """
        Leader epoch.
        Returns:
            int: Leader epoch or None
        """

    def __hash__(self):
        """Hash function for use in sets and dicts."""

    def __eq__(self, other):
        """Equality comparison."""

    def __lt__(self, other):
        """Less than comparison for sorting."""

    def __str__(self):
        """String representation."""
Represents a UUID (Universally Unique Identifier).
class Uuid:
    """
    Represents a UUID (Universally Unique Identifier).
    """

    def __init__(self, uuid_str=None):
        """
        Create Uuid object.
        Args:
            uuid_str (str, optional): UUID string representation
        """

    def __str__(self):
        """
        Get string representation of UUID.
        Returns:
            str: UUID string
        """

    def __eq__(self, other):
        """Equality comparison."""

    def __hash__(self):
        """Hash function for use in sets and dicts."""
from confluent_kafka import Producer
# Example: basic asynchronous produce with per-message delivery reports.
conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'my-producer'
}
producer = Producer(conf)

def delivery_report(err, msg):
    """Called once for each message produced."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

# Produce messages
for i in range(10):
    # Use the documented keyword 'on_delivery' (the original used the
    # undocumented alias 'callback', inconsistent with produce()'s
    # documented signature above).
    producer.produce('my-topic',
                     key=f'key-{i}',
                     value=f'value-{i}',
                     on_delivery=delivery_report)
    # Serve delivery callbacks from earlier produce() calls without
    # blocking, so reports arrive while producing.
    producer.poll(0)

# Wait for all messages to be delivered
producer.flush()
from confluent_kafka import Consumer, KafkaError
# Example: consumer-group subscription with automatic offset commits.
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}
consumer = Consumer(conf)
consumer.subscribe(['my-topic'])
try:
    while True:
        # Block up to 1 second waiting for the next message.
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # NOTE(review): _PARTITION_EOF events are only delivered when
            # 'enable.partition.eof' is set in conf -- confirm intent.
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'End of partition {msg.topic()} [{msg.partition()}]')
            else:
                print(f'Error: {msg.error()}')
        else:
            print(f'Received: key={msg.key()}, value={msg.value()}, '
                  f'partition={msg.partition()}, offset={msg.offset()}')
finally:
    # Leave the group and release assigned partitions.
    consumer.close()
from confluent_kafka import Producer
# Example: transactional (exactly-once) produce.
conf = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-id',
    'enable.idempotence': True
}
producer = Producer(conf)
# Initialize transactions (required once, before begin_transaction)
producer.init_transactions()
try:
    # Begin transaction
    producer.begin_transaction()
    # Produce messages within transaction
    for i in range(5):
        producer.produce('my-topic', f'transactional-message-{i}')
    # Commit transaction
    producer.commit_transaction()
    print('Transaction committed successfully')
except Exception as e:
    print(f'Transaction failed: {e}')
    # NOTE(review): abort_transaction() can itself raise if the failure was
    # fatal or begin_transaction() never succeeded -- confirm error handling.
    producer.abort_transaction()
from confluent_kafka import Consumer, TopicPartition
# Example: manual partition assignment with manual offset commits.
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'enable.auto.commit': False
}
consumer = Consumer(conf)
# Manually assign specific partitions
partitions = [
    TopicPartition('my-topic', 0, offset=100),
    TopicPartition('my-topic', 1, offset=200)
]
consumer.assign(partitions)
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if not msg.error():
            print(f'Message: {msg.value()}')
            # Manual offset commit (asynchronous by default; pass
            # asynchronous=False to block until acknowledged)
            consumer.commit(message=msg)
finally:
    consumer.close()
Install with Tessl CLI
npx tessl i tessl/pypi-confluent-kafka