Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Administrative capabilities for managing Kafka clusters including topic creation and deletion with partition and replication configuration.
Provides administrative operations for Kafka clusters using the Kafka AdminClient API. Extends the base hook with cluster management capabilities.
class KafkaAdminClientHook(KafkaBaseHook):
    """
    A hook for administrative Kafka operations.

    Inherits from KafkaBaseHook and provides methods for topic management
    and other administrative tasks.
    """

    def __init__(self, kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka admin client hook.

        Args:
            kafka_config_id: The Airflow connection ID holding the Kafka
                client configuration (defaults to "kafka_default").
        """

    def create_topic(self, topics: Sequence[Sequence[Any]]) -> None:
        """
        Create Kafka topics.

        Args:
            topics: Sequence of topic configurations, where each topic is a
                sequence containing [topic_name, partition_count,
                replication_factor].

        Raises:
            KafkaException: If topic creation fails.
        """

    def delete_topic(self, topics: Sequence[str]) -> None:
        """
        Delete Kafka topics.

        Args:
            topics: Sequence of topic names to delete.

        Raises:
            KafkaException: If topic deletion fails.
        """

    def _get_client(self, config: dict) -> AdminClient:
        """
        Get a Kafka AdminClient with the given configuration.

        Args:
            config: Kafka client configuration dictionary passed to the
                confluent-kafka AdminClient constructor.

        Returns:
            AdminClient: Configured confluent-kafka AdminClient instance.
        """
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

# Hook bound to the default Kafka connection.
hook = KafkaAdminClientHook(kafka_config_id="kafka_default")

# Create a single topic; each spec is [topic_name, partitions, replication_factor].
hook.create_topic([["my-topic", 3, 1]])
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

# Uses the default connection id ("kafka_default").
hook = KafkaAdminClientHook()

# Batch of topic specs: [topic_name, partitions, replication_factor].
batch = [
    ["high-throughput-topic", 12, 3],
    ["log-compaction-topic", 6, 2],
]
hook.create_topic(batch)
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

hook = KafkaAdminClientHook()

# Remove a single topic.
hook.delete_topic(["old-topic"])

# Remove several topics in one call.
stale_topics = ["temp-topic-1", "temp-topic-2", "test-topic"]
hook.delete_topic(stale_topics)
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook


def create_kafka_topics():
    """Create required Kafka topics for the data pipeline."""
    admin_hook = KafkaAdminClientHook(kafka_config_id="kafka_prod")
    # Each entry is [topic_name, partitions, replication_factor].
    topics = [
        ["user-events", 12, 3],
        ["processed-events", 6, 2],
        ["error-events", 3, 2],
    ]
    admin_hook.create_topic(topics)
    return "Topics created successfully"


def cleanup_old_topics():
    """Remove temporary or test topics."""
    admin_hook = KafkaAdminClientHook(kafka_config_id="kafka_prod")
    topics_to_remove = [
        "test-topic-2023",
        "temp-processing-topic",
        "old-user-data",
    ]
    admin_hook.delete_topic(topics_to_remove)
    return "Old topics cleaned up"


# Manually triggered DAG. Note: "schedule" replaces the deprecated
# "schedule_interval" argument (removed in Airflow 3.0).
dag = DAG(
    "kafka_admin_operations",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    description="Kafka administrative operations",
)

create_topics_task = PythonOperator(
    task_id="create_topics",
    python_callable=create_kafka_topics,
    dag=dag,
)

cleanup_topics_task = PythonOperator(
    task_id="cleanup_topics",
    python_callable=cleanup_old_topics,
    dag=dag,
)
create_topics_task >> cleanup_topics_task

Common topic configuration parameters that can be specified in the config dictionary:
{
"retention.ms": "604800000", # 7 days in milliseconds
"retention.bytes": "1073741824", # 1GB per partition
"cleanup.policy": "delete" # or "compact" for log compaction
}

{
"segment.ms": "86400000", # 1 day segment roll time
"max.message.bytes": "1000000", # 1MB max message size
"compression.type": "snappy", # or "gzip", "lz4", "zstd"
"min.insync.replicas": "2" # Minimum in-sync replicas
}

{
"cleanup.policy": "compact",
"min.cleanable.dirty.ratio": "0.5",
"delete.retention.ms": "86400000", # Tombstone retention
"min.compaction.lag.ms": "0"
}

Administrative operations may fail due to various reasons. Handle exceptions appropriately:
from confluent_kafka import KafkaException
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook


def safe_topic_creation():
    """Create a topic, tolerating the "already exists" failure mode.

    Re-raises any other KafkaException or unexpected error after logging it.
    """
    admin_hook = KafkaAdminClientHook()
    try:
        # Topic spec is [topic_name, partitions, replication_factor] --
        # the hook's documented contract takes exactly three elements
        # per topic (the original example passed a stray fourth dict).
        topics = [["new-topic", 3, 1]]
        admin_hook.create_topic(topics)
        print("Topic created successfully")
    except KafkaException as e:
        if "already exists" in str(e):
            print("Topic already exists, continuing...")
        else:
            print(f"Failed to create topic: {e}")
            raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka