tessl/pypi-apache-airflow-providers-apache-kafka

Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms

Apache Airflow Kafka Provider

Apache Kafka integration provider for Apache Airflow, enabling data engineers to build workflows that interact with Kafka message queues and streaming data platforms. This provider offers a comprehensive set of operators for producing and consuming messages, hooks for low-level Kafka client operations, sensors for monitoring message availability, and triggers for asynchronous message processing.

Package Information

  • Package Name: apache-airflow-providers-apache-kafka
  • Package Type: pip
  • Language: Python
  • Installation: pip install apache-airflow-providers-apache-kafka
  • Minimum Airflow Version: 2.10.0
  • Dependencies: confluent-kafka>=2.6.0, asgiref>=2.3.0

Core Imports

# Basic imports for common usage
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor

Hook imports for advanced usage:

from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

Sensor and trigger imports:

from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor, AwaitMessageTriggerFunctionSensor
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger

Basic Usage

from datetime import datetime
from airflow import DAG
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

# Producer function: the operator calls it and produces every (key, value)
# pair it yields to the operator's topic. It is not handed a producer object.
def produce_messages():
    for i in range(10):
        yield (str(i), f"Message {i}")

# Consumer function: called once per consumed confluent-kafka Message
def process_message(message):
    value = message.value()
    if value is not None:
        print(f"Processing: {value.decode('utf-8')}")

dag = DAG(
    "kafka_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False
)

# Produce messages to Kafka topic
produce_task = ProduceToTopicOperator(
    task_id="produce_messages",
    topic="my-topic",
    producer_function=produce_messages,
    kafka_config_id="kafka_default",
    dag=dag
)

# Consume messages from Kafka topic
consume_task = ConsumeFromTopicOperator(
    task_id="consume_messages", 
    topics=["my-topic"],
    apply_function=process_message,
    max_messages=10,
    kafka_config_id="kafka_default",
    dag=dag
)

produce_task >> consume_task
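ProduceToTopicOperator does not pass a producer into your function; it calls `producer_function` and sends each `(key, value)` pair the function yields to `topic`. The sketch below imitates that loop with a hypothetical `RecordingProducer` standing in for confluent-kafka's `Producer`, so the contract can be checked without a broker. `run_producer_function` is a simplified illustration, not the operator's source, and the flush-after-every-message branch is only assumed to correspond to the operator's synchronous mode.

```python
from __future__ import annotations

from typing import Any, Callable, Iterable


class RecordingProducer:
    """Hypothetical stand-in for confluent_kafka.Producer that records calls."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, Any, Any]] = []
        self.flushes = 0

    def produce(self, topic: str, key: Any = None, value: Any = None, on_delivery: Any = None) -> None:
        self.sent.append((topic, key, value))

    def poll(self, timeout: float) -> int:
        return 0  # this fake never has delivery callbacks to serve

    def flush(self) -> int:
        self.flushes += 1
        return 0  # number of messages still queued


def produce_messages() -> Iterable[tuple[str, str]]:
    # Same shape as the DAG's producer_function: yield (key, value) pairs.
    for i in range(10):
        yield (str(i), f"Message {i}")


def run_producer_function(
    producer: RecordingProducer,
    topic: str,
    producer_function: Callable[[], Iterable[tuple[Any, Any]]],
    synchronous: bool = True,
) -> None:
    # Simplified sketch of an operator-style produce loop (assumption, not the provider's code).
    for key, value in producer_function():
        producer.produce(topic, key=key, value=value)
        producer.poll(0)
        if synchronous:
            producer.flush()  # wait for each message before sending the next
    producer.flush()  # drain anything still queued


producer = RecordingProducer()
run_producer_function(producer, "my-topic", produce_messages)
print(producer.sent[0])  # ('my-topic', '0', 'Message 0')
print(len(producer.sent), producer.flushes)  # 10 11
```

Yielding pairs keeps the producer function free of client setup, so it can be unit-tested like any other generator.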

Architecture

The provider is organized into five main capability areas:

  • Connection Management: Centralized Kafka connection configuration and authentication
  • Administrative Operations: Topic management and cluster administration
  • Message Production: Publishing messages to Kafka topics with flexible delivery options
  • Message Consumption: Consuming and processing messages with configurable commit strategies
  • Asynchronous Operations: Event-driven sensors and triggers for real-time workflows

Capabilities

Connection Management

Base connection functionality providing authentication, configuration management, and Google Cloud Managed Kafka integration with automatic token generation.

class KafkaBaseHook(BaseHook):
    def __init__(self, kafka_config_id: str = "kafka_default") -> None: ...
    @cached_property
    def get_conn(self) -> Any: ...  # cached property: read as hook.get_conn, without calling it
    def test_connection(self) -> tuple[bool, str]: ...
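Every hook builds its client from the connection's Extra JSON, so it can help to sanity-check that JSON before saving it. The sketch below uses a hypothetical helper, `validate_kafka_extra` (not part of the provider), that requires a non-empty `bootstrap.servers`, the one key the hook also insists on, and splits the comma-separated broker list. `check_connection` only wraps the documented `test_connection()`; it needs Airflow, the provider, and a reachable broker, so its import is deferred and the validation part runs anywhere.

```python
from __future__ import annotations

from typing import Any


def validate_kafka_extra(extra: dict[str, Any]) -> list[str]:
    """Hypothetical pre-flight check for a kafka connection's Extra JSON.

    Returns the broker list; raises ValueError when bootstrap.servers is missing.
    """
    servers = extra.get("bootstrap.servers")
    if not servers:
        raise ValueError("config['bootstrap.servers'] must be provided.")
    # bootstrap.servers is a comma-separated list of host:port entries.
    return [entry.strip() for entry in str(servers).split(",") if entry.strip()]


def check_connection(conn_id: str = "kafka_default") -> tuple[bool, str]:
    """Run the hook's own connectivity test (requires Airflow and a live broker)."""
    from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook

    return KafkaBaseHook(kafka_config_id=conn_id).test_connection()


print(validate_kafka_extra({"bootstrap.servers": "broker1:9092, broker2:9092"}))
# ['broker1:9092', 'broker2:9092']
```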

Administrative Operations

Administrative capabilities for managing Kafka clusters including topic creation and deletion with partition and replication configuration.

class KafkaAdminClientHook(KafkaBaseHook):
    def create_topic(self, topics: Sequence[Sequence[Any]]) -> None: ...  # each item: (name, num_partitions, replication_factor)
    def delete_topic(self, topics: Sequence[str]) -> None: ...
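`create_topic` takes one `(name, num_partitions, replication_factor)` sequence per topic. The hypothetical `topic_specs` helper below builds that list and rejects non-positive counts; `ensure_topics` shows the hook call itself and only works with Airflow, the provider, and a reachable cluster. The defaults of 3 partitions and replication factor 1 are an arbitrary illustration suited to a single-broker development cluster, not a recommendation.

```python
from __future__ import annotations

from typing import Sequence


def topic_specs(
    names: Sequence[str], num_partitions: int = 3, replication_factor: int = 1
) -> list[tuple[str, int, int]]:
    """Hypothetical helper: build create_topic()'s per-topic tuples."""
    if num_partitions < 1 or replication_factor < 1:
        raise ValueError("num_partitions and replication_factor must be >= 1")
    return [(name, num_partitions, replication_factor) for name in names]


def ensure_topics(specs: Sequence[tuple[str, int, int]], conn_id: str = "kafka_default") -> None:
    # Deferred import: needs Airflow + this provider at call time.
    from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

    KafkaAdminClientHook(kafka_config_id=conn_id).create_topic(topics=specs)


print(topic_specs(["orders", "payments"]))
# [('orders', 3, 1), ('payments', 3, 1)]
```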

Message Production

Message publishing capabilities with support for synchronous and asynchronous operations, custom delivery callbacks, and templated producer functions.

class ProduceToTopicOperator(BaseOperator):
    def __init__(self, topic: str, producer_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class KafkaProducerHook(KafkaBaseHook):
    def get_producer(self) -> Producer: ...
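When you drive the producer yourself through `KafkaProducerHook.get_producer()`, delivery reports arrive through confluent-kafka's `on_delivery(err, msg)` callback, which runs during `poll()` or `flush()`. The sketch below records successes and failures in module-level lists for simplicity; `report_delivery` and `produce_with_hook` are illustrative names. A function of the same shape is assumed to be usable with `ProduceToTopicOperator` as a dotted import path via its `delivery_callback` argument; check the operator reference for your provider version before relying on that.

```python
from __future__ import annotations

from typing import Any, Iterable

delivered: list[tuple[str, int, int]] = []
failed: list[str] = []


def report_delivery(err: Any, msg: Any) -> None:
    """Delivery report in confluent-kafka's on_delivery(err, msg) shape."""
    if err is not None:
        failed.append(str(err))
    else:
        delivered.append((msg.topic(), msg.partition(), msg.offset()))


def produce_with_hook(topic: str, pairs: Iterable[tuple[Any, Any]], conn_id: str = "kafka_default") -> None:
    # Deferred import: needs Airflow + this provider and a reachable broker.
    from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

    producer = KafkaProducerHook(kafka_config_id=conn_id).get_producer()
    for key, value in pairs:
        producer.produce(topic, key=key, value=value, on_delivery=report_delivery)
        producer.poll(0)  # serve callbacks for messages already acknowledged
    producer.flush()  # block until every queued message is delivered or fails
```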

Message Consumption

Message consumption capabilities with batch and individual processing, configurable commit strategies, and message filtering functions.

class ConsumeFromTopicOperator(BaseOperator):
    def __init__(self, topics: str | Sequence[str], kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class KafkaConsumerHook(KafkaBaseHook):
    def __init__(self, topics: Sequence[str], kafka_config_id: str = "kafka_default") -> None: ...
    def get_consumer(self) -> Consumer: ...
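Consumed messages are confluent-kafka `Message` objects, so a processing function can use `key()`, `value()`, and `error()`. The hypothetical `summarize_batch` below counts data messages per key; a function of this shape is assumed to fit the operator's batch-processing mode (`apply_function_batch`, alongside a `commit_cadence` setting, in the provider versions this sketch assumes; confirm both names for your version). `read_some` shows the hook path and assumes `get_consumer()` returns a consumer already subscribed to the hook's topics.

```python
from __future__ import annotations

from typing import Any, Iterable, Sequence


def summarize_batch(messages: Iterable[Any]) -> dict[str, int]:
    """Hypothetical batch function: count data messages per key."""
    counts: dict[str, int] = {}
    for message in messages:
        if message.error() is not None:  # skip error events mixed into the batch
            continue
        key = message.key()
        name = key.decode("utf-8") if key is not None else "<no key>"
        counts[name] = counts.get(name, 0) + 1
    return counts


def read_some(topics: Sequence[str], n: int = 10, conn_id: str = "kafka_default") -> dict[str, int]:
    # Deferred import: needs Airflow + this provider and a reachable broker.
    from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook

    consumer = KafkaConsumerHook(topics=topics, kafka_config_id=conn_id).get_consumer()
    try:
        # consume() returns up to n messages, waiting at most 5 seconds.
        return summarize_batch(consumer.consume(num_messages=n, timeout=5.0))
    finally:
        consumer.close()  # leave the consumer group cleanly
```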

Asynchronous Operations

Deferrable sensors and triggers for event-driven processing, enabling non-blocking message monitoring in Kafka topics.

class AwaitMessageSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class AwaitMessageTriggerFunctionSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, event_triggered_function: Callable, **kwargs) -> None: ...

class AwaitMessageTrigger(BaseTrigger):
    def __init__(self, topics: Sequence[str], apply_function: str, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...
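The sensor and trigger pass each polled message to `apply_function`: a truthy return value ends the wait and becomes the event payload, while `None` (or any falsy value) keeps the trigger polling. Because `AwaitMessageTrigger` takes the function as a dotted import path, it must live in a module the triggerer can import. The `shipped_order` function, the `orders` topic, and the module path in the comment are hypothetical.

```python
from __future__ import annotations

import json
from typing import Any


def shipped_order(message: Any) -> dict[str, Any] | None:
    """Hypothetical apply_function: fire once an order reaches 'shipped'."""
    raw = message.value()
    if raw is None:
        return None
    try:
        payload = json.loads(raw)  # accepts bytes or str
    except ValueError:  # covers JSONDecodeError and invalid UTF-8
        return None  # ignore malformed messages instead of failing the trigger
    if isinstance(payload, dict) and payload.get("status") == "shipped":
        return payload  # truthy: the sensor completes with this payload
    return None  # falsy: keep waiting


# Wiring it up (assumes this file is importable as kafka_functions from the DAGs folder):
# AwaitMessageSensor(
#     task_id="wait_for_shipment",
#     topics=["orders"],
#     apply_function="kafka_functions.shipped_order",
#     kafka_config_id="kafka_default",
# )
```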

Configuration

The provider uses Airflow connections for configuration. Set up a connection with:

  • Connection Type: kafka
  • Connection ID: kafka_default (or custom)
  • Extra: JSON configuration with bootstrap.servers (required) and any other confluent-kafka client settings; consumers also need a group.id

Example connection configuration:

{
  "bootstrap.servers": "localhost:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "sasl.username": "your-username", 
  "sasl.password": "your-password"
}
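Connections can also be defined outside the UI with an `AIRFLOW_CONN_<CONN_ID>` environment variable, which Airflow (2.3 and later) accepts as JSON containing `conn_type` and an `extra` object. The sketch below builds such a value for the Kafka connection. The `kafka_conn_env` helper and the `group.id` and `auto.offset.reset` values are illustrative; credentials such as `sasl.password` are better kept in a secrets backend than in plain environment variables.

```python
from __future__ import annotations

import json
from typing import Any


def kafka_conn_env(conn_id: str, extra: dict[str, Any]) -> tuple[str, str]:
    """Hypothetical helper: return (variable name, JSON value) for a kafka connection."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({"conn_type": "kafka", "extra": extra})
    return name, value


name, value = kafka_conn_env(
    "kafka_default",
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "airflow-consumers",  # consumers need a group id
        "auto.offset.reset": "earliest",  # start at the oldest retained message when no offset is committed
    },
)
print(f"export {name}='{value}'")
```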

Error Handling

The provider defines a dedicated exception for authentication failures:

class KafkaAuthenticationError(Exception):
    """Custom exception for Kafka authentication failures."""

Version Compatibility

The provider includes version compatibility helpers for different Airflow versions:

def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    """
    Get the base Airflow version as a tuple.
    
    Returns:
        tuple: (major, minor, patch) version numbers
    """

# Version flags for conditional imports and behavior
AIRFLOW_V_3_0_PLUS: bool  # True if Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool  # True if Airflow 3.1+

# Version-compatible base class imports
BaseHook      # Imports from airflow.sdk (3.1+) or airflow.hooks.base
BaseOperator  # Imports from airflow.sdk (3.0+) or airflow.models
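The flags come from comparing a (major, minor, patch) tuple against fixed thresholds, and the flags in turn decide where `BaseHook` and `BaseOperator` are imported from. The sketch below reproduces that logic on a plain version string using only the standard library. The provider's own helper reads the installed Airflow version and may parse it differently (for example with the `packaging` library), so `base_version_tuple`, `version_flags`, and `base_class_modules` are illustrative names only.

```python
from __future__ import annotations

import re


def base_version_tuple(version: str) -> tuple[int, int, int]:
    """Keep (major, minor, patch) and drop suffixes such as rc1, .dev0 or +local."""
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    major, minor, patch = match.groups()
    return int(major), int(minor), int(patch or 0)


def version_flags(version: str) -> dict[str, bool]:
    base = base_version_tuple(version)
    return {
        "AIRFLOW_V_3_0_PLUS": base >= (3, 0, 0),
        "AIRFLOW_V_3_1_PLUS": base >= (3, 1, 0),
    }


def base_class_modules(version: str) -> dict[str, str]:
    # Mirrors the rules listed above: BaseHook moves to airflow.sdk at 3.1, BaseOperator at 3.0.
    flags = version_flags(version)
    return {
        "BaseHook": "airflow.sdk" if flags["AIRFLOW_V_3_1_PLUS"] else "airflow.hooks.base",
        "BaseOperator": "airflow.sdk" if flags["AIRFLOW_V_3_0_PLUS"] else "airflow.models",
    }


print(base_version_tuple("3.1.0rc1"))  # (3, 1, 0)
print(base_class_modules("3.0.2"))
```

Comparing tuples rather than strings avoids the classic trap where "2.10" sorts before "2.9".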

Version-Specific Import Patterns

# Recommended: Use version-compatible imports
from airflow.providers.apache.kafka.version_compat import BaseHook, BaseOperator

# These imports resolve to the right base class for each Airflow version:
# - BaseHook: airflow.sdk on Airflow 3.1+, otherwise airflow.hooks.base
# - BaseOperator: airflow.sdk on Airflow 3.0+, otherwise airflow.models

These utilities enable the provider to work across multiple Airflow versions by automatically importing the correct base classes and handling version-specific differences.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka
Describes: pkg:pypi/apache-airflow-providers-apache-kafka@1.10.x