
tessl/pypi-apache-airflow-providers-apache-kafka

Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms


docs/connection-management.md

Connection Management

Base connection functionality providing authentication, configuration management, and Google Cloud Managed Kafka integration with automatic token generation.

Capabilities

Base Hook Connection

Provides the foundational connection management for all Kafka operations with support for multiple authentication mechanisms and connection testing.

class KafkaBaseHook(BaseHook):
    """
    A base hook for interacting with Apache Kafka.
    
    Attributes:
        conn_name_attr: str = "kafka_config_id"
        default_conn_name: str = "kafka_default"
        conn_type: str = "kafka"
        hook_name: str = "Apache Kafka"
    """
    
    def __init__(self, kafka_config_id: str = "kafka_default", *args, **kwargs) -> None:
        """
        Initialize the Kafka base hook.
        
        Args:
            kafka_config_id: The Airflow connection ID holding the Kafka configuration
            *args: Additional positional arguments
            **kwargs: Additional keyword arguments
        """
    
    def get_conn(self) -> Any:
        """
        Get Kafka AdminClient connection.
        
        Returns:
            AdminClient: Configured Kafka AdminClient instance
        """
    
    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Kafka connection.
        
        Returns:
            tuple: (success: bool, message: str)
        """
    
    def _get_client(self, config: dict) -> Any:
        """
        Get a Kafka client with the given configuration.
        
        Args:
            config: Kafka client configuration dictionary
            
        Returns:
            Any: Kafka client instance (AdminClient by default)
        """
    
    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """
        Return custom field behaviour for Airflow UI.
        
        Returns:
            dict: UI field configuration
        """

Connection Configuration

The hook supports various Kafka connection configurations through Airflow connections:

  • Connection Type: "kafka"
  • Connection ID: Configurable (defaults to "kafka_default")
  • Extra Configuration: JSON object with Kafka client properties
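
Connections can be registered without touching the Airflow UI via the standard `AIRFLOW_CONN_<CONN_ID>` environment-variable mechanism (JSON-valued connection definitions are supported in Airflow 2.3+). A minimal sketch, assuming the connection ID `kafka_default`:

```python
import json
import os

# Kafka client properties that belong in the connection's "extra" field.
extra = {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "PLAINTEXT",
    "client.id": "airflow-client",
}

# Airflow resolves the connection ID "kafka_default" from the
# AIRFLOW_CONN_KAFKA_DEFAULT environment variable; the value here is a
# JSON connection definition with the extras serialized as a JSON string.
os.environ["AIRFLOW_CONN_KAFKA_DEFAULT"] = json.dumps(
    {"conn_type": "kafka", "extra": json.dumps(extra)}
)
```

The same connection can equally be created through the Airflow UI or CLI; the environment-variable route is convenient for containerized deployments where connections are injected as secrets.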

Standard Configuration Options

# Connection extra field example
{
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "PLAINTEXT",  # or SASL_PLAINTEXT, SASL_SSL, SSL
    "client.id": "airflow-client"
}

SASL Authentication

# SASL/PLAIN configuration
{
    "bootstrap.servers": "broker:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "username",
    "sasl.password": "password"
}

Google Cloud Managed Kafka Integration

The provider includes automatic support for Google Cloud Managed Kafka with token-based authentication:

# Google Cloud configuration (tokens handled automatically)
{
    "bootstrap.servers": "your-cluster.kafka.gcp.example.com:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "OAUTHBEARER"
}

Usage Examples

Basic Connection Setup

from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook

# Initialize with default connection
hook = KafkaBaseHook()

# Initialize with custom connection
hook = KafkaBaseHook(kafka_config_id="my_kafka_conn")

# Test the connection
success, message = hook.test_connection()
if success:
    print("Connection successful")
else:
    print(f"Connection failed: {message}")

Using the Connection

from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook

hook = KafkaBaseHook(kafka_config_id="kafka_default")

# Get the underlying AdminClient
admin_client = hook.get_conn()

# Use for administrative operations
metadata = admin_client.list_topics()
print(f"Available topics: {list(metadata.topics.keys())}")

Error Handling

Connection-related errors are handled through standard Kafka client exceptions. The test_connection() method provides a safe way to verify connectivity:

hook = KafkaBaseHook(kafka_config_id="kafka_connection")
success, error_message = hook.test_connection()

if not success:
    print(f"Connection test failed: {error_message}")
    # Handle connection failure

Configuration Best Practices

  1. Security: Always use encrypted connections (SASL_SSL or SSL) in production
  2. Client ID: Set meaningful client.id values for monitoring and debugging
  3. Timeouts: Configure appropriate timeout values for your network environment
  4. Connection Pooling: Reuse connections when possible to reduce overhead
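
For point 3, timeouts are configured through librdkafka client properties in the connection's extra field. The values below are illustrative defaults for a moderately latent network, not recommendations; consult the librdkafka configuration reference for the full property list:

```json
{
    "bootstrap.servers": "kafka-cluster:9092",
    "socket.timeout.ms": 30000,
    "request.timeout.ms": 30000,
    "client.id": "airflow-client"
}
```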

Environment-Specific Settings

Development Environment

{
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "PLAINTEXT"
}

Production Environment

{
    "bootstrap.servers": "kafka-cluster:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "{{ var.kafka_username }}",
    "sasl.password": "{{ var.kafka_password }}",
    "ssl.ca.location": "/path/to/ca-cert",
    "client.id": "airflow-prod"
}

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka
