tessl/pypi-apache-airflow-providers-apache-kafka

Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms

Message Consumption

Message consumption capabilities, including batch and per-message processing, configurable offset-commit strategies, and message filtering functions.

Capabilities

Consume From Topic Operator

Airflow operator for consuming messages from Kafka topics with flexible processing functions and configurable commit strategies.

class ConsumeFromTopicOperator(BaseOperator):
    """
    Operator for consuming messages from Kafka topics.
    
    Attributes:
        template_fields: tuple = ("topics", "apply_function_args", "apply_function_kwargs", "kafka_config_id")
    """
    
    def __init__(
        self,
        topics: str | Sequence[str],
        kafka_config_id: str = "kafka_default",
        apply_function: Callable[..., Any] | str | None = None,
        apply_function_batch: Callable[..., Any] | str | None = None,
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        commit_cadence: str = "end_of_operator",
        max_messages: int | None = None,
        max_batch_size: int = 1000,
        poll_timeout: float = 60,
        **kwargs: Any
    ) -> None:
        """
        Initialize the consumer operator.
        
        Args:
            topics: Kafka topic name(s) to consume from
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function: Function to apply to each message
            apply_function_batch: Function to apply to batches of messages
            apply_function_args: Arguments to pass to apply function
            apply_function_kwargs: Keyword arguments to pass to apply function
            commit_cadence: When to commit offsets ("never", "end_of_batch", "end_of_operator")
            max_messages: Maximum number of messages to consume
            max_batch_size: Maximum messages per batch
            poll_timeout: Timeout for polling messages (seconds)
            **kwargs: Additional operator arguments
        """
    
    def execute(self, context) -> Any:
        """
        Execute the consumer operation.
        
        Args:
            context: Airflow task context
            
        Returns:
            Any: Result of apply function(s)
        """

Kafka Consumer Hook

Lower-level hook providing direct access to the Kafka Consumer client for advanced use cases.

class KafkaConsumerHook(KafkaBaseHook):
    """
    A hook for consuming messages from Kafka topics.
    
    Inherits from KafkaBaseHook and provides consumer client access.
    """
    
    def __init__(self, topics: Sequence[str], kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka consumer hook.
        
        Args:
            topics: List of topic names to subscribe to
            kafka_config_id: The connection object to use
        """
    
    def get_consumer(self) -> Consumer:
        """
        Get Kafka Consumer client.
        
        Returns:
            Consumer: Configured confluent-kafka Consumer instance
        """
    
    def _get_client(self, config) -> Consumer:
        """
        Get a Kafka Consumer client with the given configuration.
        
        Args:
            config: Kafka client configuration dictionary
            
        Returns:
            Consumer: Configured confluent-kafka Consumer instance
        """

Commit Cadence Options

VALID_COMMIT_CADENCE = ["never", "end_of_batch", "end_of_operator"]
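The operator validates `commit_cadence` against this list when the task is constructed. A minimal standalone sketch of that kind of check (the helper name `validate_commit_cadence` is illustrative, not the provider's actual code):

```python
VALID_COMMIT_CADENCE = ["never", "end_of_batch", "end_of_operator"]

def validate_commit_cadence(cadence: str) -> str:
    """Return the cadence if valid, otherwise raise ValueError.

    Hypothetical helper; the real operator performs an equivalent
    validation in its constructor.
    """
    if cadence not in VALID_COMMIT_CADENCE:
        raise ValueError(
            f"commit_cadence must be one of {VALID_COMMIT_CADENCE}, got {cadence!r}"
        )
    return cadence
```

"never" trades delivery guarantees for throughput, "end_of_batch" commits after each processed batch, and "end_of_operator" commits only once the entire task succeeds.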

Error Handling

class KafkaAuthenticationError(Exception):
    """
    Custom exception for Kafka authentication failures.
    
    Raised when consumer authentication fails during connection
    or topic subscription attempts.
    """

def error_callback(err):
    """
    Default error handling callback for consumer operations.
    
    Args:
        err: Error object from confluent-kafka consumer
    """

Usage Examples

Basic Message Consumption

from airflow import DAG
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from datetime import datetime

def process_message(message):
    """Process individual messages."""
    key = message.key().decode('utf-8') if message.key() else None
    value = message.value().decode('utf-8')
    
    print(f"Processing message - Key: {key}, Value: {value}")
    
    # Return True to indicate successful processing
    return True

dag = DAG(
    "kafka_consumer_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False
)

consume_task = ConsumeFromTopicOperator(
    task_id="consume_messages",
    topics=["my-topic"],
    apply_function=process_message,
    max_messages=100,
    kafka_config_id="kafka_default",
    dag=dag
)

Batch Processing

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from datetime import datetime
import json

def process_batch(messages):
    """Process messages in batches."""
    processed_records = []
    
    for message in messages:
        try:
            # Parse JSON message
            data = json.loads(message.value().decode('utf-8'))
            
            # Transform data
            processed_record = {
                "user_id": data["user_id"],
                "action": data["action"],
                "processed_at": datetime.now().isoformat()
            }
            processed_records.append(processed_record)
            
        except json.JSONDecodeError:
            print(f"Failed to parse message: {message.value()}")
            continue
    
    # Bulk insert or process
    print(f"Processed batch of {len(processed_records)} records")
    return processed_records

consume_batch_task = ConsumeFromTopicOperator(
    task_id="consume_batch",
    topics=["user-events"],
    apply_function_batch=process_batch,
    max_batch_size=500,
    commit_cadence="end_of_batch",
    kafka_config_id="kafka_default"
)

Multiple Topics Consumption

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def multi_topic_processor(message):
    """Process messages from multiple topics."""
    topic = message.topic()
    value = message.value().decode('utf-8')
    
    if topic == "orders":
        print(f"Processing order: {value}")
        # Handle order logic
    elif topic == "payments":
        print(f"Processing payment: {value}")
        # Handle payment logic
    elif topic == "shipments":
        print(f"Processing shipment: {value}")
        # Handle shipment logic
    
    return True

multi_topic_task = ConsumeFromTopicOperator(
    task_id="consume_multi_topics",
    topics=["orders", "payments", "shipments"],
    apply_function=multi_topic_processor,
    max_messages=1000,
    poll_timeout=30,
    kafka_config_id="kafka_default"
)

Parameterized Processing

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
import json

def parameterized_processor(filter_user_id, output_format, message):
    """Process messages with parameters.

    The operator partially applies apply_function_args before each call,
    so the message arrives as the last positional argument.
    """
    try:
        data = json.loads(message.value().decode('utf-8'))
        
        # Filter by user ID if specified
        if filter_user_id and data.get("user_id") != filter_user_id:
            return None  # Skip this message
        
        # Format output
        if output_format == "csv":
            result = f"{data['user_id']},{data['action']},{data['timestamp']}"
        else:
            result = json.dumps(data)
        
        print(f"Processed: {result}")
        return result
        
    except Exception as e:
        print(f"Error processing message: {e}")
        return None

filtered_consume_task = ConsumeFromTopicOperator(
    task_id="filtered_consume",
    topics=["user-events"],
    apply_function=parameterized_processor,
    apply_function_args=[123, "json"],  # filter_user_id=123, output_format="json"
    max_messages=500,
    kafka_config_id="kafka_default"
)

Advanced Commit Strategy

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
import json

def critical_processor(message):
    """Process critical messages with manual commit control."""
    try:
        # Critical processing logic
        data = json.loads(message.value().decode('utf-8'))
        
        # Validate data
        if not all(k in data for k in ["id", "amount", "currency"]):
            raise ValueError("Missing required fields")
        
        # Process the financial transaction; process_transaction is
        # assumed to be defined elsewhere in the DAG file
        process_transaction(data)
        
        print(f"Successfully processed transaction {data['id']}")
        return True
        
    except Exception as e:
        print(f"Failed to process transaction: {e}")
        # Don't commit this message, it will be retried
        raise

critical_consume_task = ConsumeFromTopicOperator(
    task_id="consume_critical",
    topics=["financial-transactions"],
    apply_function=critical_processor,
    commit_cadence="end_of_operator",  # Only commit after all messages processed
    max_messages=100,
    poll_timeout=60,
    kafka_config_id="kafka_prod"
)

Using Consumer Hook Directly

from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook

def custom_consumer_logic():
    """Use consumer hook for advanced scenarios."""
    hook = KafkaConsumerHook(
        topics=["custom-topic"],
        kafka_config_id="kafka_default"
    )
    
    consumer = hook.get_consumer()
    messages_processed = 0
    
    try:
        while messages_processed < 100:
            msg = consumer.poll(timeout=1.0)
            
            if msg is None:
                continue
                
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            
            # Custom processing
            print(f"Received: {msg.value().decode('utf-8')}")
            messages_processed += 1
            
            # Manual commit after processing
            consumer.commit(msg)
            
    except Exception as e:
        print(f"Consumer error: {e}")
        raise
    finally:
        consumer.close()

hook_consumer_task = PythonOperator(
    task_id="custom_consumer",
    python_callable=custom_consumer_logic
)

Advanced Consumer Configuration

Consumer Group Configuration

# Connection extra configuration for consumer groups
{
    "bootstrap.servers": "kafka:9092",
    "group.id": "airflow-consumer-group",
    "auto.offset.reset": "earliest",  # or "latest"
    "enable.auto.commit": "false",    # Manual commit control
    "max.poll.interval.ms": "300000", # Max delay between poll calls
    "session.timeout.ms": "30000",    # Consumer group session timeout
    "heartbeat.interval.ms": "3000",  # Heartbeat interval
    "fetch.min.bytes": "1024",        # Minimum bytes to fetch
    "fetch.wait.max.ms": "500"        # Maximum wait time for a fetch
}

Message Filtering and Transformation

import json

def filter_and_transform(message):
    """Filter and transform messages based on content."""
    try:
        data = json.loads(message.value().decode('utf-8'))
        
        # Filter based on message content
        if data.get("event_type") not in ["purchase", "login"]:
            return None  # Skip message
        
        # Transform message
        transformed = {
            "event_id": data["id"],
            "user": data["user_id"],
            "type": data["event_type"],
            "timestamp": data["timestamp"],
            "metadata": {
                "topic": message.topic(),
                "partition": message.partition(),
                "offset": message.offset()
            }
        }
        
        return transformed
        
    except Exception as e:
        print(f"Error filtering message: {e}")
        return None

filter_task = ConsumeFromTopicOperator(
    task_id="filter_messages",
    topics=["raw-events"],
    apply_function=filter_and_transform,
    commit_cadence="end_of_batch",
    max_batch_size=100,
    kafka_config_id="kafka_default"
)

Error Handling and Retry Logic

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
import json
import time

def resilient_processor(message):
    """Process messages with retry logic."""
    max_retries = 3
    retry_count = 0
    
    while retry_count < max_retries:
        try:
            # Processing logic that might fail
            data = json.loads(message.value().decode('utf-8'))
            
            # Simulate external API call; call_external_api is a
            # placeholder for your own integration
            result = call_external_api(data)
            
            print(f"Successfully processed message: {result}")
            return result
            
        except Exception as e:
            retry_count += 1
            print(f"Attempt {retry_count} failed: {e}")
            
            if retry_count >= max_retries:
                # Log to dead letter topic or error handling
                print(f"Message failed after {max_retries} attempts")
                return None
            
            # Wait before retry
            time.sleep(2 ** retry_count)  # Exponential backoff

resilient_task = ConsumeFromTopicOperator(
    task_id="resilient_consume",
    topics=["api-requests"],
    apply_function=resilient_processor,
    commit_cadence="end_of_operator",
    kafka_config_id="kafka_default"
)

Performance Optimization

High-Throughput Consumption

import json

def high_throughput_batch_processor(messages):
    """Optimized batch processing for high throughput.

    transform_data and bulk_store are placeholders for your own
    transformation and storage logic.
    """
    # Process in chunks for memory efficiency
    chunk_size = 100
    results = []
    
    for i in range(0, len(messages), chunk_size):
        chunk = messages[i:i + chunk_size]
        
        # Parallel processing of chunk
        chunk_results = []
        for message in chunk:
            try:
                data = json.loads(message.value().decode('utf-8'))
                # Fast processing logic
                chunk_results.append(transform_data(data))
            except Exception as e:
                print(f"Error in chunk processing: {e}")
                continue
        
        results.extend(chunk_results)
        
        # Optional: Intermediate processing/storage
        if len(results) >= 1000:
            bulk_store(results)
            results = []
    
    # Final storage
    if results:
        bulk_store(results)
    
    return len(messages)

high_throughput_task = ConsumeFromTopicOperator(
    task_id="high_throughput_consume",
    topics=["high-volume-topic"],
    apply_function_batch=high_throughput_batch_processor,
    max_batch_size=2000,
    poll_timeout=10,
    commit_cadence="end_of_batch",
    kafka_config_id="kafka_default"
)

Best Practices

  1. Consumer Groups: Use meaningful group IDs for consumer coordination
  2. Offset Management: Choose appropriate auto.offset.reset and commit strategies
  3. Error Handling: Implement robust error handling with retry logic
  4. Memory Management: Process messages in batches for better memory utilization
  5. Monitoring: Log processing metrics and errors for operational visibility
  6. Backpressure: Use max_messages and poll_timeout to control consumption rate
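For the monitoring point above, a small sketch of a wrapper that adds success/failure counters around any apply function; `with_metrics` and its `stats` dict are illustrative names, not provider API:

```python
import time

def with_metrics(process_fn):
    """Wrap an apply_function with simple processing counters."""
    stats = {"processed": 0, "failed": 0, "elapsed_s": 0.0}

    def wrapper(message, *args, **kwargs):
        start = time.monotonic()
        try:
            result = process_fn(message, *args, **kwargs)
            stats["processed"] += 1
            return result
        except Exception:
            stats["failed"] += 1
            raise
        finally:
            stats["elapsed_s"] += time.monotonic() - start

    wrapper.stats = stats
    return wrapper
```

Pass `with_metrics(process_message)` as the `apply_function` and log `wrapper.stats` after the run for basic operational visibility.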

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka
