Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Deferrable sensors and triggers for event-driven processing, enabling non-blocking message monitoring in Kafka topics.
Deferrable sensor that waits for specific messages in Kafka topics without blocking worker slots. Uses apply functions to test message conditions and supports deferrable execution.
class AwaitMessageSensor(BaseOperator):
    """
    Sensor that waits for messages in Kafka topics using deferrable execution.

    An apply function (callable or dotted import string) is evaluated against
    each consumed message; the sensor completes when it returns a truthy value.

    Attributes:
        template_fields: tuple = ("topics", "apply_function", "kafka_config_id")
        ui_color: str = "#e8f4fd"
    """

    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str | Callable,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        commit_cadence: str = "end_of_batch",
        max_messages: int | None = None,
        max_batch_size: int = 1000,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the message sensor.

        Args:
            topics: List of topics to monitor
            apply_function: Function to test message conditions (callable or import string)
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to apply function
            apply_function_kwargs: Keyword arguments to pass to apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)
            commit_cadence: When to commit messages ("end_of_batch", "end_of_operator", "never")
            max_messages: Maximum number of messages to consume
            max_batch_size: Maximum batch size for message processing
            **kwargs: Additional operator arguments
        """

    def execute(self, context) -> Any:
        """
        Execute the sensor operation.

        Args:
            context: Airflow task context

        Returns:
            Any: Result when condition is met
        """

    def execute_complete(self, context, event=None):
        """
        Handle trigger completion.

        Args:
            context: Airflow task context
            event: Event data from trigger
        """


# Enhanced sensor with custom trigger function for complex event handling.
# Provides additional event processing capabilities beyond basic condition
# checking.
class AwaitMessageTriggerFunctionSensor(BaseOperator):
    """
    Sensor with custom trigger function for advanced message handling.

    In addition to the apply-function condition check, a user-supplied
    ``event_triggered_function`` is invoked when an event fires.

    Attributes:
        template_fields: tuple = ("topics", "apply_function", "kafka_config_id")
        ui_color: str = "#e8f4fd"
    """

    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str | Callable,
        event_triggered_function: Callable,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the trigger function sensor.

        Args:
            topics: List of topics to monitor
            apply_function: Function to test message conditions (callable or import string)
            event_triggered_function: Function to call when event triggers
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to apply function
            apply_function_kwargs: Keyword arguments to pass to apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)
            **kwargs: Additional operator arguments
        """

    def execute(self, context, event=None) -> Any:
        """
        Execute the sensor operation.

        Args:
            context: Airflow task context
            event: Event data

        Returns:
            Any: Result when condition is met
        """


# Low-level trigger for asynchronous message monitoring in Kafka topics.
# Provides the core deferrable execution mechanism used by sensors.
class AwaitMessageTrigger(BaseTrigger):
    """
    Trigger for asynchronous message monitoring in Kafka topics.

    Runs in the triggerer process; ``apply_function`` must therefore be an
    import string so it can be loaded there.
    """

    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the message trigger.

        Args:
            topics: List of topics to monitor
            apply_function: Import string for function to test message conditions
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to apply function
            apply_function_kwargs: Keyword arguments to pass to apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)
            **kwargs: Additional trigger arguments
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize trigger state.

        Returns:
            tuple: (class_path, serialized_kwargs)
        """

    async def run(self):
        """
        Run the trigger asynchronously.

        Yields:
            TriggerEvent: Event when condition is met
        """


from airflow import DAG
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
from datetime import datetime
import json
def check_order_completion(message):
    """Return True if *message* marks order 12345 as completed."""
    try:
        payload = json.loads(message.value().decode("utf-8"))
        is_completed = payload.get("status") == "completed"
        is_target_order = payload.get("order_id") == "12345"
    except Exception:
        # Malformed or non-JSON payloads never satisfy the condition.
        return False
    return is_completed and is_target_order
# Example DAG: wait (up to five minutes) for the completion event of order 12345.
dag = DAG(
    "kafka_sensor_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)

wait_for_order = AwaitMessageSensor(
    task_id="wait_for_order_completion",
    topics=["order-events"],
    apply_function=check_order_completion,
    poll_timeout=1.0,
    poll_interval=5.0,
    timeout=300,  # 5 minutes timeout
    kafka_config_id="kafka_default",
    dag=dag,
)

# Define function in separate module (e.g. my_functions.py)
def detect_critical_event(message):
    """Return True when the decoded message carries critical priority."""
    try:
        body = message.value().decode("utf-8")
        return json.loads(body).get("priority") == "critical"
    except Exception:
        # Undecodable or non-JSON messages are simply not critical events.
        return False
# Pass the function as an import string so the triggerer can load it itself.
critical_sensor = AwaitMessageSensor(
    task_id="detect_critical",
    topics=["system-events"],
    apply_function="my_functions.detect_critical_event",  # Import string
    poll_timeout=1.0,
    poll_interval=2.0,
    kafka_config_id="kafka_default",
)

# Different commit strategies
sensor_end_of_batch = AwaitMessageSensor(
    task_id="sensor_batch_commit",
    topics=["events"],
    apply_function="my_functions.process_event",
    commit_cadence="end_of_batch",  # Commit after each batch
    max_batch_size=100,
    kafka_config_id="kafka_default",
)

sensor_end_of_operator = AwaitMessageSensor(
    task_id="sensor_operator_commit",
    topics=["events"],
    apply_function="my_functions.process_event",
    commit_cadence="end_of_operator",  # Commit at end of execution
    kafka_config_id="kafka_default",
)

sensor_never_commit = AwaitMessageSensor(
    task_id="sensor_no_commit",
    topics=["events"],
    apply_function="my_functions.process_event",
    commit_cadence="never",  # Never commit (external commit control)
    kafka_config_id="kafka_default",
)

from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor
def event_condition(message):
    """Return True if *message* is an alert event.

    Mirrors the other example predicates (``check_order_completion``,
    ``detect_critical_event``): malformed or non-JSON payloads fail the
    condition instead of raising inside the trigger.
    """
    try:
        data = json.loads(message.value().decode("utf-8"))
        return data.get("event_type") == "alert"
    except Exception:
        return False
def handle_alert(event_data):
    """Process a triggered alert event and report completion."""
    # Custom alert handling logic would go here.
    print(f"Alert triggered: {event_data}")
    return "Alert processed"
alert_sensor = AwaitMessageTriggerFunctionSensor(
    task_id="handle_alerts",
    topics=["alerts"],
    apply_function=event_condition,
    event_triggered_function=handle_alert,
    poll_timeout=0.5,
    poll_interval=1.0,
    kafka_config_id="kafka_default",
)

from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
from airflow.sensors.base import BaseSensorOperator


class CustomKafkaSensor(BaseSensorOperator):
    """Example sensor that drives AwaitMessageTrigger from a synchronous poke()."""

    def __init__(self, topics, apply_function, **kwargs):
        super().__init__(**kwargs)
        self.topics = topics
        self.apply_function = apply_function

    def poke(self, context):
        """Create the trigger and evaluate it via custom logic."""
        trigger = AwaitMessageTrigger(
            topics=self.topics,
            apply_function=self.apply_function,
            kafka_config_id="kafka_default",
            poll_timeout=1.0,
            poll_interval=2.0,
        )
        # NOTE(review): _check_trigger_condition is not defined in this example;
        # implement it (or inline the check) before using this class.
        return self._check_trigger_condition(trigger, context)


# The provider includes error handling mechanisms:
def error_callback(err):
    """
    Error callback function for Kafka consumer errors.

    Args:
        err: Kafka error object
    """


# Custom exception for authentication failures:
class KafkaAuthenticationError(Exception):
    """Custom exception for Kafka authentication failures."""


# Commit cadences accepted by AwaitMessageSensor's ``commit_cadence`` argument.
VALID_COMMIT_CADENCE = ["end_of_batch", "end_of_operator", "never"]

# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka