tessl/pypi-apache-airflow-providers-apache-kafka

Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms


Administrative Operations

Administrative capabilities for managing Kafka clusters including topic creation and deletion with partition and replication configuration.

Capabilities

Kafka Admin Client Hook

Provides administrative operations for Kafka clusters using the Kafka AdminClient API. Extends the base hook with cluster management capabilities.

class KafkaAdminClientHook(KafkaBaseHook):
    """
    A hook for administrative Kafka operations.
    
    Inherits from KafkaBaseHook and provides methods for topic management
    and other administrative tasks.
    """
    
    def __init__(self, kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka admin client hook.
        
        Args:
            kafka_config_id: The ID of the Airflow connection to use (defaults to "kafka_default")
        """
    
    def create_topic(self, topics: Sequence[Sequence[Any]]) -> None:
        """
        Create Kafka topics.
        
        Args:
            topics: Sequence of topic configurations, where each topic is a sequence
                   containing [topic_name, partition_count, replication_factor]
        
        Raises:
            KafkaException: If topic creation fails
        """
    
    def delete_topic(self, topics: Sequence[str]) -> None:
        """
        Delete Kafka topics.
        
        Args:
            topics: Sequence of topic names to delete
        
        Raises:
            KafkaException: If topic deletion fails
        """
    
    def _get_client(self, config) -> AdminClient:
        """
        Get a Kafka AdminClient with the given configuration.
        
        Args:
            config: Kafka client configuration dictionary
            
        Returns:
            AdminClient: Configured confluent-kafka AdminClient instance
        """

Usage Examples

Creating Topics

from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

admin_hook = KafkaAdminClientHook(kafka_config_id="kafka_default")

# Create a single topic with default settings
topics_to_create = [
    ["my-topic", 3, 1]  # topic_name, partitions, replication_factor
]

admin_hook.create_topic(topics_to_create)
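Because create_topic accepts loosely typed sequences, a malformed spec only fails once it reaches the broker. A small pre-flight check (a hypothetical helper, not part of the provider) can catch mistakes earlier:

```python
from typing import Any, Sequence

def validate_topic_specs(topics: Sequence[Sequence[Any]]) -> None:
    """Raise ValueError unless every spec is [name, partitions, replication_factor]."""
    for spec in topics:
        if len(spec) != 3:
            raise ValueError(f"expected [name, partitions, replication_factor], got {spec!r}")
        name, partitions, replication = spec
        if not isinstance(name, str) or not name:
            raise ValueError(f"topic name must be a non-empty string, got {name!r}")
        if not isinstance(partitions, int) or partitions < 1:
            raise ValueError(f"partition count must be a positive int, got {partitions!r}")
        if not isinstance(replication, int) or replication < 1:
            raise ValueError(f"replication factor must be a positive int, got {replication!r}")

# validate before handing the specs to the hook
validate_topic_specs([["my-topic", 3, 1]])
```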

Creating Multiple Topics

from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

admin_hook = KafkaAdminClientHook()

# Create several topics in a single call
topics_config = [
    ["high-throughput-topic", 12, 3],  # topic_name, partitions, replication_factor
    ["log-compaction-topic", 6, 2]
]

admin_hook.create_topic(topics_config)

Deleting Topics

from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

admin_hook = KafkaAdminClientHook()

# Delete single topic
admin_hook.delete_topic(["old-topic"])

# Delete multiple topics
topics_to_delete = [
    "temp-topic-1",
    "temp-topic-2", 
    "test-topic"
]
admin_hook.delete_topic(topics_to_delete)
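delete_topic removes whatever names it is given, so a guard against internal or protected topics is a cheap safety net. The prefixes below are illustrative assumptions, not a provider feature:

```python
# illustrative protected prefixes; adjust to your own naming conventions
PROTECTED_PREFIXES = ("__", "prod-")

def deletable_topics(candidates, protected_prefixes=PROTECTED_PREFIXES):
    """Filter out internal (double-underscore) and protected topics before deletion."""
    return [name for name in candidates if not name.startswith(protected_prefixes)]

safe_to_delete = deletable_topics(["__consumer_offsets", "temp-topic-1", "prod-events"])
# pass safe_to_delete to admin_hook.delete_topic(...)
```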

Administrative DAG Example

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

def create_kafka_topics():
    """Create required Kafka topics for the data pipeline."""
    admin_hook = KafkaAdminClientHook(kafka_config_id="kafka_prod")
    
    topics = [
        ["user-events", 12, 3],     # topic, partitions, replication_factor
        ["processed-events", 6, 2],
        ["error-events", 3, 2]
    ]
    
    admin_hook.create_topic(topics)
    return "Topics created successfully"

def cleanup_old_topics():
    """Remove temporary or test topics."""
    admin_hook = KafkaAdminClientHook(kafka_config_id="kafka_prod")
    
    topics_to_remove = [
        "test-topic-2023",
        "temp-processing-topic",
        "old-user-data"
    ]
    
    admin_hook.delete_topic(topics_to_remove)
    return "Old topics cleaned up"

dag = DAG(
    "kafka_admin_operations",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    description="Kafka administrative operations"
)

create_topics_task = PythonOperator(
    task_id="create_topics",
    python_callable=create_kafka_topics,
    dag=dag
)

cleanup_topics_task = PythonOperator(
    task_id="cleanup_topics", 
    python_callable=cleanup_old_topics,
    dag=dag
)

create_topics_task >> cleanup_topics_task

Topic Configuration Options

Common topic-level configuration parameters. Note that KafkaAdminClientHook.create_topic accepts only the topic name, partition count, and replication factor, so settings like these must be applied through the underlying AdminClient or external tooling (for example kafka-configs.sh):

Retention Settings

{
    "retention.ms": "604800000",        # 7 days in milliseconds
    "retention.bytes": "1073741824",    # 1GB per partition
    "cleanup.policy": "delete"          # or "compact" for log compaction
}

Performance Settings

{
    "segment.ms": "86400000",           # 1 day segment roll time
    "max.message.bytes": "1000000",     # 1MB max message size
    "compression.type": "snappy",       # or "gzip", "lz4", "zstd"
    "min.insync.replicas": "2"          # Minimum in-sync replicas
}

Log Compaction Settings

{
    "cleanup.policy": "compact",
    "min.cleanable.dirty.ratio": "0.5",
    "delete.retention.ms": "86400000",   # Tombstone retention
    "min.compaction.lag.ms": "0"
}
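All of these values are passed to Kafka as strings, which makes the millisecond and byte literals easy to get wrong. A small conversion helper (hypothetical, not part of the provider) keeps the intent readable:

```python
def topic_config(retention_days=None, retention_gib=None, cleanup_policy=None,
                 compression=None):
    """Build a Kafka topic config dict from human-friendly units."""
    config = {}
    if retention_days is not None:
        config["retention.ms"] = str(retention_days * 24 * 60 * 60 * 1000)
    if retention_gib is not None:
        config["retention.bytes"] = str(retention_gib * 1024 ** 3)
    if cleanup_policy is not None:
        config["cleanup.policy"] = cleanup_policy
    if compression is not None:
        config["compression.type"] = compression
    return config

# reproduces the retention example above
print(topic_config(retention_days=7, retention_gib=1, cleanup_policy="delete"))
```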

Error Handling

Administrative operations can fail for several reasons, for example authorization errors, an unreachable cluster, or a replication factor larger than the number of available brokers. Handle exceptions appropriately:

from confluent_kafka import KafkaException
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

def safe_topic_creation():
    admin_hook = KafkaAdminClientHook()
    
    try:
        topics = [["new-topic", 3, 1]]
        admin_hook.create_topic(topics)
        print("Topic created successfully")
    except KafkaException as e:
        if "already exists" in str(e):
            print("Topic already exists, continuing...")
        else:
            print(f"Failed to create topic: {e}")
            raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise
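Transient broker errors (leader elections, brief network partitions) often succeed on retry. A generic backoff wrapper, sketched here as a hypothetical utility rather than a provider feature, keeps that logic out of the task code:

```python
import time

def with_retries(operation, attempts=3, base_delay=1.0, retriable=(Exception,)):
    """Call operation(), retrying with exponential backoff on retriable errors."""
    for attempt in range(attempts):
        try:
            return operation()
        except retriable:
            if attempt == attempts - 1:
                raise  # out of attempts, surface the error
            time.sleep(base_delay * (2 ** attempt))

# e.g. with_retries(lambda: admin_hook.create_topic([["new-topic", 3, 1]]))
```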

Best Practices

  1. Partition Planning: Choose partition count based on expected throughput and consumer parallelism
  2. Replication Factor: Use at least 2 for important topics, 3 for critical data
  3. Retention Policy: Set appropriate retention based on data lifecycle requirements
  4. Error Handling: Always handle topic creation/deletion failures gracefully
  5. Monitoring: Log administrative operations for audit and troubleshooting purposes
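The first best practice can be reduced to simple arithmetic: size for peak throughput, then round up to cover consumer parallelism. The throughput figures below are illustrative assumptions, not measured values:

```python
import math

def suggest_partition_count(peak_mb_per_s, per_partition_mb_per_s, max_consumers):
    """Rule-of-thumb partition count: enough for throughput and consumer parallelism."""
    needed_for_throughput = math.ceil(peak_mb_per_s / per_partition_mb_per_s)
    return max(needed_for_throughput, max_consumers)

# e.g. 100 MB/s peak, ~10 MB/s per partition, up to 6 parallel consumers
print(suggest_partition_count(100, 10, 6))
```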

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka
