Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms
Apache Kafka integration provider for Apache Airflow, enabling data engineers to build workflows that interact with Kafka message queues and streaming data platforms. This provider offers a comprehensive set of operators for producing and consuming messages, hooks for low-level Kafka client operations, sensors for monitoring message availability, and triggers for asynchronous message processing.
Install the provider with pip:

```bash
pip install apache-airflow-providers-apache-kafka
```

Basic imports for common usage:

```python
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
```

Hook imports for advanced usage:

```python
from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
```

Sensor and trigger imports:

```python
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor, AwaitMessageTriggerFunctionSensor
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
```

A minimal producer and consumer DAG:
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


# Producer function: the operator calls it and publishes each yielded
# (key, value) pair to the target topic
def produce_messages():
    for i in range(10):
        yield (f"key-{i}", f"Message {i}")


# Consumer processing function, called once per consumed message
def process_message(message):
    print(f"Processing: {message.value().decode('utf-8')}")
    return True


dag = DAG(
    "kafka_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)

# Produce messages to a Kafka topic
produce_task = ProduceToTopicOperator(
    task_id="produce_messages",
    topic="my-topic",
    producer_function=produce_messages,
    kafka_config_id="kafka_default",
    dag=dag,
)

# Consume messages from a Kafka topic
consume_task = ConsumeFromTopicOperator(
    task_id="consume_messages",
    topics=["my-topic"],
    apply_function=process_message,
    max_messages=10,
    kafka_config_id="kafka_default",
    dag=dag,
)

produce_task >> consume_task
```

The provider is organized into five main capability areas:
Base connection functionality providing authentication, configuration management, and Google Cloud Managed Kafka integration with automatic token generation.
```python
class KafkaBaseHook(BaseHook):
    def __init__(self, kafka_config_id: str = "kafka_default") -> None: ...
    def get_conn(self) -> Any: ...
    def test_connection(self) -> tuple[bool, str]: ...
```
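Every concrete hook inherits `test_connection()` from `KafkaBaseHook`. A minimal sketch of checking a configured connection, assuming a `kafka_default` connection already exists:

```python
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

# KafkaProducerHook is a concrete subclass; test_connection() comes from
# KafkaBaseHook and returns a (success, message) tuple per the signature above.
hook = KafkaProducerHook(kafka_config_id="kafka_default")
ok, message = hook.test_connection()
print(ok, message)
```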
Administrative capabilities for managing Kafka clusters, including topic creation and deletion with partition and replication configuration.

```python
class KafkaAdminClientHook(KafkaBaseHook):
    def create_topic(self, topics: Sequence[Sequence[Any]]) -> None: ...
    def delete_topic(self, topics: Sequence[str]) -> None: ...
```
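For example, topics can be created and removed directly from the admin hook. A sketch, assuming each entry in `topics` is a `(name, num_partitions, replication_factor)` tuple as suggested by the `Sequence[Sequence[Any]]` signature:

```python
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

# Assumes a "kafka_default" connection and the (name, partitions, replication)
# tuple layout implied by the signature above.
admin = KafkaAdminClientHook(kafka_config_id="kafka_default")
admin.create_topic(topics=[("my-topic", 3, 1)])
admin.delete_topic(topics=["obsolete-topic"])
```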
Message publishing capabilities with support for synchronous and asynchronous operations, custom delivery callbacks, and templated producer functions.

```python
class ProduceToTopicOperator(BaseOperator):
    def __init__(self, topic: str, producer_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class KafkaProducerHook(KafkaBaseHook):
    def get_producer(self) -> Producer: ...
```
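A sketch of wiring a delivery callback into the operator. Treat the `delivery_callback` parameter and its dotted-path form as assumptions here, and `my_pkg.callbacks.on_delivery` as a hypothetical module path:

```python
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

# In my_pkg/callbacks.py (hypothetical module): a confluent-kafka style
# delivery callback, where err is None on success.
#
# def on_delivery(err, msg):
#     if err is not None:
#         print(f"Delivery failed: {err}")
#     else:
#         print(f"Delivered to {msg.topic()} [{msg.partition()}]")

def produce_pairs():
    # producer_function yields (key, value) pairs for the operator to publish
    yield ("key-0", "value-0")

produce_with_callback = ProduceToTopicOperator(
    task_id="produce_with_callback",
    topic="my-topic",
    producer_function=produce_pairs,
    delivery_callback="my_pkg.callbacks.on_delivery",  # assumption: dotted-path string
    kafka_config_id="kafka_default",
)
```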
Message consumption capabilities with batch and individual processing, configurable commit strategies, and message filtering functions.

```python
class ConsumeFromTopicOperator(BaseOperator):
    def __init__(self, topics: str | Sequence[str], kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class KafkaConsumerHook(KafkaBaseHook):
    def get_consumer(self) -> Consumer: ...
```
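A sketch of batch consumption with an explicit commit strategy. The parameter names `apply_function_batch`, `commit_cadence`, and `max_batch_size` mirror the provider's documented knobs but should be treated as assumptions here:

```python
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def handle_batch(messages):
    # Called with a whole poll batch of confluent-kafka Message objects at once
    for message in messages:
        print(message.value())

consume_in_batches = ConsumeFromTopicOperator(
    task_id="consume_in_batches",
    topics=["my-topic"],
    apply_function_batch=handle_batch,  # assumption: batch variant of apply_function
    commit_cadence="end_of_batch",      # assumption: commit offsets after each batch
    max_messages=1000,
    max_batch_size=100,
    kafka_config_id="kafka_default",
)
```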
Deferrable sensors and triggers for event-driven processing, enabling non-blocking message monitoring of Kafka topics.

```python
class AwaitMessageSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class AwaitMessageTriggerFunctionSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, event_triggered_function: Callable, **kwargs) -> None: ...

class AwaitMessageTrigger(BaseTrigger):
    def __init__(self, topics: Sequence[str], apply_function: str, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...
```
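Because the message check runs in the triggerer process, `apply_function` is best given as a dotted import path, as the trigger's `apply_function: str` signature above suggests. A sketch where `my_pkg.filters.is_ready` is a hypothetical module path whose function returns a truthy value when the awaited message arrives:

```python
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor

# In my_pkg/filters.py (hypothetical module):
#
# def is_ready(message):
#     return message.value() == b"ready"

wait_for_signal = AwaitMessageSensor(
    task_id="await_message",
    topics=["control-topic"],
    apply_function="my_pkg.filters.is_ready",
    kafka_config_id="kafka_default",
)
```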
The provider uses Airflow connections for configuration. Set up a connection whose extra field carries `bootstrap.servers` and any other Kafka client settings. Example connection configuration:
```json
{
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "your-username",
    "sasl.password": "your-password"
}
```
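One way to register this configuration for local testing is Airflow's JSON connection format in an environment variable. A sketch assuming the provider's `kafka` connection type:

```python
import json
import os

# Assumption: the provider registers the "kafka" connection type and reads
# client settings from the connection's extra field.
os.environ["AIRFLOW_CONN_KAFKA_DEFAULT"] = json.dumps(
    {
        "conn_type": "kafka",
        "extra": {"bootstrap.servers": "localhost:9092"},
    }
)
```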
The provider includes specialized exception handling:

```python
class KafkaAuthenticationError(Exception):
    """Custom exception for Kafka authentication failures."""
```
The provider includes version compatibility helpers for different Airflow versions:

```python
def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    """
    Get the base Airflow version as a tuple.

    Returns:
        tuple: (major, minor, patch) version numbers
    """

# Version flags for conditional imports and behavior
AIRFLOW_V_3_0_PLUS: bool  # True if Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool  # True if Airflow 3.1+

# Version-compatible base class imports
BaseHook      # Imports from airflow.sdk (3.1+) or airflow.hooks.base
BaseOperator  # Imports from airflow.sdk (3.0+) or airflow.models
```
```python
# Recommended: use version-compatible imports
from airflow.providers.apache.kafka.version_compat import BaseHook, BaseOperator

# These imports resolve per Airflow version:
# - Airflow 3.1+: BaseHook comes from airflow.sdk
# - Airflow 3.0+: BaseOperator comes from airflow.sdk
# - Earlier versions (2.10+): airflow.hooks.base and airflow.models
```
These utilities enable the provider to work across multiple Airflow versions by automatically importing the correct base classes and handling version-specific differences.
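Code that must run on both major versions can branch on these flags. A minimal sketch using the names exposed above, with a hypothetical operator:

```python
from airflow.providers.apache.kafka.version_compat import (
    AIRFLOW_V_3_0_PLUS,
    BaseOperator,
)

class VersionAwareOperator(BaseOperator):
    """Hypothetical operator that adapts behavior per Airflow version."""

    def execute(self, context):
        if AIRFLOW_V_3_0_PLUS:
            # Airflow 3.x code path
            self.log.info("Running under Airflow 3")
        else:
            # Airflow 2.x code path
            self.log.info("Running under Airflow 2")
```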
Install with Tessl CLI:

```bash
npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka
```