Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-kafka@1.10.0
Apache Kafka integration provider for Apache Airflow, enabling data engineers to build workflows that interact with Kafka message queues and streaming data platforms. This provider offers a comprehensive set of operators for producing and consuming messages, hooks for low-level Kafka client operations, sensors for monitoring message availability, and triggers for asynchronous message processing.
pip install apache-airflow-providers-apache-kafka
# Basic imports for common usage
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
Hook imports for advanced usage:
from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
Sensor and trigger imports:
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor, AwaitMessageTriggerFunctionSensor
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
from datetime import datetime
from airflow import DAG
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
# Producer function: yields (key, value) pairs for the operator to publish
def produce_messages():
    for i in range(10):
        yield (f"key-{i}", f"Message {i}")
# Simple consumer processing function, called once per consumed message
def process_message(message):
    print(f"Processing: {message.value().decode('utf-8')}")
    return True
dag = DAG(
    "kafka_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)
# Produce messages to Kafka topic
produce_task = ProduceToTopicOperator(
    task_id="produce_messages",
    topic="my-topic",
    producer_function=produce_messages,
    kafka_config_id="kafka_default",
    dag=dag,
)
# Consume messages from Kafka topic
consume_task = ConsumeFromTopicOperator(
    task_id="consume_messages",
    topics=["my-topic"],
    apply_function=process_message,
    max_messages=10,
    kafka_config_id="kafka_default",
    dag=dag,
)
produce_task >> consume_task
The provider is organized into five main capability areas:
Base connection functionality providing authentication, configuration management, and Google Cloud Managed Kafka integration with automatic token generation.
class KafkaBaseHook(BaseHook):
    def __init__(self, kafka_config_id: str = "kafka_default") -> None: ...
    def get_conn(self) -> Any: ...
    def test_connection(self) -> tuple[bool, str]: ...
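For example, connectivity can be verified through the inherited test_connection() method; a minimal sketch using the admin hook, assuming a kafka_default connection is already configured:
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

# test_connection() is inherited from KafkaBaseHook and returns (success, message)
hook = KafkaAdminClientHook(kafka_config_id="kafka_default")
ok, message = hook.test_connection()
print(f"Connection OK: {ok} ({message})")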
Administrative capabilities for managing Kafka clusters including topic creation and deletion with partition and replication configuration.
class KafkaAdminClientHook(KafkaBaseHook):
    def create_topic(self, topics: Sequence[Sequence[Any]]) -> None: ...
    def delete_topic(self, topics: Sequence[str]) -> None: ...
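A brief sketch of topic administration with this hook; it assumes each entry passed to create_topic is a (name, num_partitions, replication_factor) sequence, matching the Sequence[Sequence[Any]] signature above:
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

hook = KafkaAdminClientHook(kafka_config_id="kafka_default")
# Assumption: each topic spec is (name, num_partitions, replication_factor)
hook.create_topic(topics=[("my-topic", 3, 1)])
# Delete topics by name
hook.delete_topic(topics=["my-topic"])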
Message publishing capabilities with support for synchronous and asynchronous operations, custom delivery callbacks, and templated producer functions.
class ProduceToTopicOperator(BaseOperator):
    def __init__(self, topic: str, producer_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...
class KafkaProducerHook(KafkaBaseHook):
    def get_producer(self) -> Producer: ...
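For lower-level publishing, the producer hook hands back the underlying confluent-kafka Producer, so the standard produce/flush cycle, including delivery callbacks, applies; a minimal sketch:
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

def on_delivery(err, msg):
    # Delivery callback: invoked once per message after broker acknowledgement
    if err is not None:
        print(f"Delivery failed: {err}")

hook = KafkaProducerHook(kafka_config_id="kafka_default")
producer = hook.get_producer()  # confluent_kafka.Producer built from the connection
producer.produce("my-topic", key="key-1", value="hello", on_delivery=on_delivery)
producer.flush()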
Message consumption capabilities with batch and individual processing, configurable commit strategies, and message filtering functions.
class ConsumeFromTopicOperator(BaseOperator):
    def __init__(self, topics: str | Sequence[str], kafka_config_id: str = "kafka_default", **kwargs) -> None: ...
class KafkaConsumerHook(KafkaBaseHook):
    def get_consumer(self) -> Consumer: ...
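A minimal sketch of the consumer hook; it assumes the hook is constructed with the topics to subscribe to and that get_consumer() returns a confluent-kafka Consumer ready to poll:
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook

# Assumption: the topics to subscribe to are passed to the hook's constructor
hook = KafkaConsumerHook(topics=["my-topic"], kafka_config_id="kafka_default")
consumer = hook.get_consumer()
msg = consumer.poll(timeout=10.0)  # standard confluent-kafka poll
if msg is not None and msg.error() is None:
    print(msg.value().decode("utf-8"))
consumer.close()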
Deferrable sensors and triggers for event-driven processing, enabling non-blocking message monitoring in Kafka topics.
class AwaitMessageSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...
class AwaitMessageTriggerFunctionSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, event_triggered_function: Callable, **kwargs) -> None: ...
class AwaitMessageTrigger(BaseTrigger):
    def __init__(self, topics: Sequence[str], apply_function: str, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...
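A short usage sketch for the deferrable sensor; apply_function is passed as a dot-notation string so the triggerer can import it, and the module path below is a placeholder for your own code:
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor

await_message = AwaitMessageSensor(
    task_id="await_message",
    topics=["my-topic"],
    # Placeholder dot-notation path to a callable applied to each message;
    # a truthy return value completes the sensor.
    apply_function="my_project.filters.check_message",
    kafka_config_id="kafka_default",
)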
The provider uses Airflow connections for configuration. Set up a connection whose extra field holds bootstrap.servers and any other Kafka client settings as JSON. Example connection configuration:
{
"bootstrap.servers": "localhost:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": "your-username",
"sasl.password": "your-password"
}
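The same settings can also be supplied programmatically, for example through an AIRFLOW_CONN_* environment variable; a sketch, assuming Airflow's JSON-serialized connection format:
import json
import os

# Assumption: Airflow resolves kafka_default from AIRFLOW_CONN_KAFKA_DEFAULT;
# the client settings go into the connection's extra field as JSON.
os.environ["AIRFLOW_CONN_KAFKA_DEFAULT"] = json.dumps(
    {
        "conn_type": "kafka",
        "extra": json.dumps(
            {
                "bootstrap.servers": "localhost:9092",
                "security.protocol": "SASL_SSL",
                "sasl.mechanism": "PLAIN",
                "sasl.username": "your-username",
                "sasl.password": "your-password",
            }
        ),
    }
)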
The provider includes specialized exception handling:
class KafkaAuthenticationError(Exception):
"""Custom exception for Kafka authentication failures."""The provider includes version compatibility helpers for different Airflow versions:
The provider includes version compatibility helpers for different Airflow versions:
def get_base_airflow_version_tuple() -> tuple[int, int, int]:
"""
Get the base Airflow version as a tuple.
Returns:
tuple: (major, minor, patch) version numbers
"""
# Version flags for conditional imports and behavior
AIRFLOW_V_3_0_PLUS: bool # True if Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool # True if Airflow 3.1+
# Version-compatible base class imports
BaseHook # Imports from airflow.sdk (3.1+) or airflow.hooks.base
BaseOperator  # Imports from airflow.sdk (3.0+) or airflow.models
# Recommended: Use version-compatible imports
from airflow.providers.apache.kafka.version_compat import BaseHook, BaseOperator
# These imports work across Airflow versions:
# - Airflow 3.1+: BaseHook resolves to airflow.sdk
# - Airflow 3.0+: BaseOperator resolves to airflow.sdk
# - Airflow 2.10+: falls back to airflow.hooks.base and airflow.models
These utilities enable the provider to work across multiple Airflow versions by automatically importing the correct base classes and handling version-specific differences.
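For example, code that must behave differently per Airflow version can branch on these flags; a brief sketch, assuming the flags are importable from the same version_compat module as the base classes:
from airflow.providers.apache.kafka.version_compat import (
    AIRFLOW_V_3_0_PLUS,
    BaseOperator,
)

# Report which base class implementation is in use for the running Airflow version
if AIRFLOW_V_3_0_PLUS:
    print(f"Airflow 3.x: BaseOperator from {BaseOperator.__module__}")
else:
    print(f"Airflow 2.x: BaseOperator from {BaseOperator.__module__}")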