tessl/pypi-apache-airflow-providers-apache-kafka

Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms

docs/async-operations.md

Asynchronous Operations

Deferrable sensors and triggers for event-driven processing, enabling non-blocking message monitoring in Kafka topics.

Capabilities

Await Message Sensor

Deferrable sensor that waits for specific messages in Kafka topics without blocking worker slots. An apply function, supplied as a callable or a dotted import string, is invoked on each polled message to test whether the condition is met.

class AwaitMessageSensor(BaseOperator):
    """
    Sensor that waits for messages in Kafka topics using deferrable execution.
    
    Attributes:
        template_fields: tuple = ("topics", "apply_function", "kafka_config_id")
        ui_color: str = "#e8f4fd"
    """
    
    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str | Callable,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        commit_cadence: str = "end_of_batch",
        max_messages: int | None = None,
        max_batch_size: int = 1000,
        **kwargs: Any
    ) -> None:
        """
        Initialize the message sensor.
        
        Args:
            topics: List of topics to monitor
            apply_function: Function to test message conditions (callable or import string)
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to apply function
            apply_function_kwargs: Keyword arguments to pass to apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)  
            commit_cadence: When to commit messages ("end_of_batch", "end_of_operator", "never")
            max_messages: Maximum number of messages to consume
            max_batch_size: Maximum batch size for message processing
            **kwargs: Additional operator arguments
        """
    
    def execute(self, context) -> Any:
        """
        Execute the sensor operation.
        
        Args:
            context: Airflow task context
            
        Returns:
            Any: Result when condition is met
        """
    
    def execute_complete(self, context, event=None):
        """
        Handle trigger completion.
        
        Args:
            context: Airflow task context
            event: Event data from trigger
        """

Await Message Trigger Function Sensor

Enhanced sensor with custom trigger function for complex event handling. Provides additional event processing capabilities beyond basic condition checking.

class AwaitMessageTriggerFunctionSensor(BaseOperator):
    """
    Sensor with custom trigger function for advanced message handling.
    
    Attributes:
        template_fields: tuple = ("topics", "apply_function", "kafka_config_id")
        ui_color: str = "#e8f4fd"
    """
    
    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str | Callable,
        event_triggered_function: Callable,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        **kwargs: Any
    ) -> None:
        """
        Initialize the trigger function sensor.
        
        Args:
            topics: List of topics to monitor
            apply_function: Function to test message conditions (callable or import string)
            event_triggered_function: Function to call when event triggers
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to apply function
            apply_function_kwargs: Keyword arguments to pass to apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)
            **kwargs: Additional operator arguments
        """
    
    def execute(self, context, event=None) -> Any:
        """
        Execute the sensor operation.
        
        Args:
            context: Airflow task context
            event: Event data
            
        Returns:
            Any: Result when condition is met
        """

Await Message Trigger

Low-level trigger for asynchronous message monitoring in Kafka topics. Provides the core deferrable execution mechanism used by sensors.

class AwaitMessageTrigger(BaseTrigger):
    """
    Trigger for asynchronous message monitoring in Kafka topics.
    """
    
    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        **kwargs: Any
    ) -> None:
        """
        Initialize the message trigger.
        
        Args:
            topics: List of topics to monitor
            apply_function: Import string for function to test message conditions
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to apply function
            apply_function_kwargs: Keyword arguments to pass to apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)
            **kwargs: Additional trigger arguments
        """
    
    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize trigger state.
        
        Returns:
            tuple: (class_path, serialized_kwargs)
        """
    
    async def run(self):
        """
        Run the trigger asynchronously.
        
        Yields:
            TriggerEvent: Event when condition is met
        """

Usage Examples

Basic Message Waiting

from airflow import DAG
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
from datetime import datetime
import json

def check_order_completion(message):
    """Check if message indicates order completion."""
    try:
        data = json.loads(message.value().decode('utf-8'))
        return data.get("status") == "completed" and data.get("order_id") == "12345"
    except Exception:
        return False

dag = DAG(
    "kafka_sensor_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False
)

wait_for_order = AwaitMessageSensor(
    task_id="wait_for_order_completion",
    topics=["order-events"],
    apply_function=check_order_completion,
    poll_timeout=1.0,
    poll_interval=5.0,
    timeout=300,  # 5 minutes timeout
    kafka_config_id="kafka_default",
    dag=dag
)

Using Apply Function as Import String

# Define function in separate module (e.g. my_functions.py)
def detect_critical_event(message):
    """Detect critical events in message stream."""
    try:
        data = json.loads(message.value().decode('utf-8'))
        return data.get("priority") == "critical"
    except Exception:
        return False

# Use import string in sensor
critical_sensor = AwaitMessageSensor(
    task_id="detect_critical",
    topics=["system-events"],
    apply_function="my_functions.detect_critical_event",  # Import string
    poll_timeout=1.0,
    poll_interval=2.0,
    kafka_config_id="kafka_default"
)

Commit Cadence Configuration

# Different commit strategies
sensor_end_of_batch = AwaitMessageSensor(
    task_id="sensor_batch_commit",
    topics=["events"],
    apply_function="my_functions.process_event",
    commit_cadence="end_of_batch",  # Commit after each batch
    max_batch_size=100,
    kafka_config_id="kafka_default"
)

sensor_end_of_operator = AwaitMessageSensor(
    task_id="sensor_operator_commit", 
    topics=["events"],
    apply_function="my_functions.process_event",
    commit_cadence="end_of_operator",  # Commit at end of execution
    kafka_config_id="kafka_default"
)

sensor_never_commit = AwaitMessageSensor(
    task_id="sensor_no_commit",
    topics=["events"], 
    apply_function="my_functions.process_event",
    commit_cadence="never",  # Never commit (external commit control)
    kafka_config_id="kafka_default"
)

Custom Trigger Function Sensor

from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor

def event_condition(message):
    """Check if message meets condition."""
    data = json.loads(message.value().decode('utf-8'))
    return data.get("event_type") == "alert"

def handle_alert(event_data):
    """Handle alert when triggered."""
    print(f"Alert triggered: {event_data}")
    # Custom alert handling logic
    return "Alert processed"

alert_sensor = AwaitMessageTriggerFunctionSensor(
    task_id="handle_alerts",
    topics=["alerts"],
    apply_function=event_condition,
    event_triggered_function=handle_alert,
    poll_timeout=0.5,
    poll_interval=1.0,
    kafka_config_id="kafka_default"
)

Using Trigger Directly

from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
from airflow.sensors.base import BaseSensorOperator

class CustomKafkaSensor(BaseSensorOperator):
    """Sensor that hands off waiting to AwaitMessageTrigger.

    Instead of polling in poke(), defer to the trigger so the worker
    slot is released while the triggerer monitors the topic.
    """

    def __init__(self, topics, apply_function, **kwargs):
        super().__init__(**kwargs)
        self.topics = topics
        self.apply_function = apply_function

    def execute(self, context):
        self.defer(
            trigger=AwaitMessageTrigger(
                topics=self.topics,
                apply_function=self.apply_function,  # import string, e.g. "my_functions.check"
                kafka_config_id="kafka_default",
                poll_timeout=1.0,
                poll_interval=2.0,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Called when the trigger fires; event carries the matched payload.
        return event

Error Handling

The provider includes the following error-handling utilities:

def error_callback(err):
    """
    Error callback function for Kafka consumer errors.
    
    Args:
        err: Kafka error object
    """

Custom exception for authentication failures:

class KafkaAuthenticationError(Exception):
    """Custom exception for Kafka authentication failures."""

Configuration Constants

VALID_COMMIT_CADENCE = ["end_of_batch", "end_of_operator", "never"]
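A sketch of how such a constant is typically used for early validation (hypothetical helper, not the provider's code):

```python
VALID_COMMIT_CADENCE = ("end_of_batch", "end_of_operator", "never")


def validate_commit_cadence(value: str) -> str:
    # Reject unsupported cadences at construction time, before any
    # consumer is created or offsets are touched.
    if value not in VALID_COMMIT_CADENCE:
        raise ValueError(
            f"commit_cadence must be one of {VALID_COMMIT_CADENCE}, got {value!r}"
        )
    return value
```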

Best Practices

  1. Deferrable Execution: Use sensors for long-running waits to free up worker slots
  2. Apply Function Design: Keep apply functions lightweight and handle exceptions gracefully
  3. Commit Strategy: Choose appropriate commit cadence based on your use case
  4. Error Handling: Implement proper error handling in apply functions
  5. Resource Management: Set reasonable batch sizes and timeouts
  6. Import Strings: Use import strings for apply functions to enable proper serialization
  7. Connection Management: Use appropriate Kafka connection configurations

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka
