Apache Airflow Kafka Provider

Apache Kafka integration provider for Apache Airflow, enabling data engineers to build workflows that interact with Kafka message queues and streaming data platforms. This provider offers a comprehensive set of operators for producing and consuming messages, hooks for low-level Kafka client operations, sensors for monitoring message availability, and triggers for asynchronous message processing.

Package Information

  • Package Name: apache-airflow-providers-apache-kafka
  • Package Type: pip
  • Language: Python
  • Installation: pip install apache-airflow-providers-apache-kafka
  • Minimum Airflow Version: 2.10.0
  • Dependencies: confluent-kafka>=2.6.0, asgiref>=2.3.0

Core Imports

# Basic imports for common usage
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor

Hook imports for advanced usage:

from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

Sensor and trigger imports:

from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor, AwaitMessageTriggerFunctionSensor
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger

Basic Usage

from datetime import datetime
from airflow import DAG
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

# Producer function: yields (key, value) pairs that the operator publishes to the topic
def produce_messages():
    for i in range(10):
        yield (f"key-{i}", f"Message {i}")

# Simple consumer processing function  
def process_message(message):
    print(f"Processing: {message.value().decode('utf-8')}")
    return True

dag = DAG(
    "kafka_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False
)

# Produce messages to Kafka topic
produce_task = ProduceToTopicOperator(
    task_id="produce_messages",
    topic="my-topic",
    producer_function=produce_messages,
    kafka_config_id="kafka_default",
    dag=dag
)

# Consume messages from Kafka topic
consume_task = ConsumeFromTopicOperator(
    task_id="consume_messages", 
    topics=["my-topic"],
    apply_function=process_message,
    max_messages=10,
    kafka_config_id="kafka_default",
    dag=dag
)

produce_task >> consume_task

Architecture

The provider is organized into five main capability areas:

  • Connection Management: Centralized Kafka connection configuration and authentication
  • Administrative Operations: Topic management and cluster administration
  • Message Production: Publishing messages to Kafka topics with flexible delivery options
  • Message Consumption: Consuming and processing messages with configurable commit strategies
  • Asynchronous Operations: Event-driven sensors and triggers for real-time workflows

Capabilities

Connection Management

Base connection functionality providing authentication, configuration management, and Google Cloud Managed Kafka integration with automatic token generation.

class KafkaBaseHook(BaseHook):
    def __init__(self, kafka_config_id: str = "kafka_default") -> None: ...
    def get_conn(self) -> Any: ...
    def test_connection(self) -> tuple[bool, str]: ...
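
As a minimal sketch (assuming a kafka_default connection already exists), connectivity can be checked through the inherited test_connection method on any concrete hook:

from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

# KafkaBaseHook is the shared base class; a concrete subclass such as the admin hook is used here
hook = KafkaAdminClientHook(kafka_config_id="kafka_default")

# test_connection() returns a (success, message) tuple, matching the signature above
ok, message = hook.test_connection()
print(ok, message)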

See docs/connection-management.md for details.

Administrative Operations

Administrative capabilities for managing Kafka clusters including topic creation and deletion with partition and replication configuration.

class KafkaAdminClientHook(KafkaBaseHook):
    def create_topic(self, topics: Sequence[Sequence[Any]]) -> None: ...
    def delete_topic(self, topics: Sequence[str]) -> None: ...
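
A short sketch of topic management, assuming each entry passed to create_topic follows a (name, partitions, replication factor) layout consistent with the Sequence[Sequence[Any]] signature above:

from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

hook = KafkaAdminClientHook(kafka_config_id="kafka_default")

# Each inner sequence describes one topic: name, number of partitions, replication factor
hook.create_topic(topics=[["orders", 3, 1], ["payments", 6, 1]])

# delete_topic takes plain topic names
hook.delete_topic(topics=["orders"])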

See docs/admin-operations.md for details.

Message Production

Message publishing capabilities with support for synchronous and asynchronous operations, custom delivery callbacks, and templated producer functions.

class ProduceToTopicOperator(BaseOperator):
    def __init__(self, topic: str, producer_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class KafkaProducerHook(KafkaBaseHook):
    def get_producer(self) -> Producer: ...
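
A hedged sketch of a parameterized producer task; producer_function_kwargs is assumed here as the way to pass arguments into the producer function, and the topic and payloads are illustrative:

from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

def keyed_messages(n):
    # Yield (key, value) pairs; the operator publishes each pair to the configured topic
    for i in range(n):
        yield (f"order-{i}", f'{{"order_id": {i}}}')

produce_orders = ProduceToTopicOperator(
    task_id="produce_orders",
    topic="orders",
    producer_function=keyed_messages,
    producer_function_kwargs={"n": 100},
    kafka_config_id="kafka_default",
)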

See docs/message-production.md for details.

Message Consumption

Message consumption capabilities with batch and individual processing, configurable commit strategies, and message filtering functions.

class ConsumeFromTopicOperator(BaseOperator):
    def __init__(self, topics: str | Sequence[str], kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class KafkaConsumerHook(KafkaBaseHook):  
    def get_consumer(self) -> Consumer: ...
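
A hedged sketch of batch consumption; apply_function_batch, max_batch_size, and commit_cadence are assumptions here and may differ by provider version:

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def handle_batch(messages):
    # Receives a list of confluent-kafka Message objects per batch
    for message in messages:
        print(message.topic(), message.offset(), message.value())

consume_orders = ConsumeFromTopicOperator(
    task_id="consume_orders",
    topics=["orders"],
    apply_function_batch=handle_batch,
    max_messages=1000,
    max_batch_size=200,
    commit_cadence="end_of_batch",
    kafka_config_id="kafka_default",
)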

See docs/message-consumption.md for details.

Asynchronous Operations

Deferrable sensors and triggers for event-driven processing, enabling non-blocking message monitoring in Kafka topics.

class AwaitMessageSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class AwaitMessageTriggerFunctionSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, event_triggered_function: Callable, **kwargs) -> None: ...

class AwaitMessageTrigger(BaseTrigger):
    def __init__(self, topics: Sequence[str], apply_function: str, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...
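
A sketch of the deferrable sensor, which defers to AwaitMessageTrigger and completes once the apply_function returns a value for a received message; the dotted path below is hypothetical and must be importable on the triggerer:

from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor

# "my_package.filters.is_target_order" is a placeholder module path for a callable
# that accepts a message and returns a value when the message is of interest.
wait_for_order = AwaitMessageSensor(
    task_id="wait_for_order",
    topics=["orders"],
    apply_function="my_package.filters.is_target_order",
    kafka_config_id="kafka_default",
)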

See docs/async-operations.md for details.

Configuration

The provider uses Airflow connections for configuration. Set up a connection with:

  • Connection Type: kafka
  • Connection ID: kafka_default (or custom)
  • Extra: JSON configuration with bootstrap.servers and other Kafka client settings

Example connection configuration:

{
  "bootstrap.servers": "localhost:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "sasl.username": "your-username", 
  "sasl.password": "your-password"
}
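
Outside the UI, the connection can be supplied through Airflow's standard AIRFLOW_CONN_<CONN_ID> environment variable; the JSON form below is a sketch with placeholder credentials and would normally be set in the deployment environment rather than in Python code:

import json
import os

# Airflow resolves connections from AIRFLOW_CONN_* environment variables (JSON form)
os.environ["AIRFLOW_CONN_KAFKA_DEFAULT"] = json.dumps(
    {
        "conn_type": "kafka",
        "extra": {
            "bootstrap.servers": "localhost:9092",
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": "PLAIN",
            "sasl.username": "your-username",
            "sasl.password": "your-password",
        },
    }
)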

Error Handling

The provider includes specialized exception handling:

class KafkaAuthenticationError(Exception):
    """Custom exception for Kafka authentication failures."""

Version Compatibility

The provider includes version compatibility helpers for different Airflow versions:

def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    """
    Get the base Airflow version as a tuple.
    
    Returns:
        tuple: (major, minor, patch) version numbers
    """

# Version flags for conditional imports and behavior
AIRFLOW_V_3_0_PLUS: bool  # True if Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool  # True if Airflow 3.1+

# Version-compatible base class imports
BaseHook      # Imports from airflow.sdk (3.1+) or airflow.hooks.base
BaseOperator  # Imports from airflow.sdk (3.0+) or airflow.models

Version-Specific Import Patterns

# Recommended: Use version-compatible imports
from airflow.providers.apache.kafka.version_compat import BaseHook, BaseOperator

# These imports resolve across Airflow versions:
# - BaseHook: airflow.sdk (Airflow 3.1+) or airflow.hooks.base (earlier versions)
# - BaseOperator: airflow.sdk (Airflow 3.0+) or airflow.models (earlier versions)
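
A minimal sketch of using the version flags to gate version-specific behavior in custom operator code:

from airflow.providers.apache.kafka.version_compat import (
    AIRFLOW_V_3_0_PLUS,
    BaseOperator,
)

class MyKafkaAwareOperator(BaseOperator):
    def execute(self, context):
        # Branch on the flag instead of parsing airflow.__version__ by hand
        if AIRFLOW_V_3_0_PLUS:
            self.log.info("Running under Airflow 3.x")
        else:
            self.log.info("Running under Airflow 2.10+")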

These utilities enable the provider to work across multiple Airflow versions by automatically importing the correct base classes and handling version-specific differences.