CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-apache-kafka

Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/message-production.md

Message Production

Message publishing capabilities with support for synchronous and asynchronous operations, custom delivery callbacks, and templated producer functions.

Capabilities

Produce to Topic Operator

Airflow operator for producing messages to Kafka topics with flexible producer function support and delivery confirmation options.

class ProduceToTopicOperator(BaseOperator):
    """
    Operator for producing messages to a Kafka topic.

    The producer function is invoked with a configured producer client as its
    first argument, followed by any supplied args/kwargs; ``synchronous`` and
    ``poll_timeout`` control delivery-confirmation behavior.

    Attributes:
        template_fields: tuple = ("topic", "producer_function_args", "producer_function_kwargs", "kafka_config_id")
    """

    def __init__(
        self,
        topic: str,
        producer_function: str | Callable[..., Any],
        kafka_config_id: str = "kafka_default",
        producer_function_args: Sequence[Any] | None = None,
        producer_function_kwargs: dict[Any, Any] | None = None,
        delivery_callback: str | None = None,
        synchronous: bool = True,
        poll_timeout: float = 0,
        **kwargs: Any
    ) -> None:
        """
        Initialize the producer operator.

        Args:
            topic: Kafka topic name to produce to
            producer_function: Function that takes a producer and produces
                messages. The ``str`` form of the annotation suggests a
                dotted-path string is also accepted — presumably resolved
                to a callable at runtime; TODO confirm against provider source.
            kafka_config_id: Airflow connection ID for Kafka configuration
            producer_function_args: Positional arguments to pass to the
                producer function (after the producer client itself)
            producer_function_kwargs: Keyword arguments to pass to producer function
            delivery_callback: Callback function name for delivery confirmations
            synchronous: Whether to wait for delivery confirmation
            poll_timeout: Timeout (seconds) for polling delivery confirmations
            **kwargs: Additional operator (BaseOperator) arguments
        """

    def execute(self, context) -> None:
        """
        Execute the producer operation.

        Calls the producer function with the configured producer client and
        the templated args/kwargs.

        Args:
            context: Airflow task context
        """

Kafka Producer Hook

Lower-level hook providing direct access to Kafka Producer client for advanced use cases.

class KafkaProducerHook(KafkaBaseHook):
    """
    A hook for producing messages to Kafka topics.

    Inherits from KafkaBaseHook and provides producer client access for
    lower-level use cases than ProduceToTopicOperator.
    """

    def __init__(self, kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka producer hook.

        Args:
            kafka_config_id: The Airflow connection ID whose configuration
                is used to build the producer client
        """

    def get_producer(self) -> Producer:
        """
        Get Kafka Producer client.

        Returns:
            Producer: Configured confluent-kafka Producer instance
        """

    def _get_client(self, config) -> Producer:
        """
        Get a Kafka Producer client with the given configuration.

        Internal factory — presumably called by the base hook with the
        connection's config dict; prefer get_producer() in user code.

        Args:
            config: Kafka client configuration dictionary

        Returns:
            Producer: Configured confluent-kafka Producer instance
        """

Delivery Callback Functions

Default and custom callback functions for handling message delivery confirmations.

def acked(err, msg):
    """
    Default delivery callback for message acknowledgments.

    This callback is automatically called by the producer when a message
    delivery attempt completes (either successfully or with failure).
    Exactly one of the two outcomes applies per invocation.

    Args:
        err: Error object if delivery failed, None if successful
        msg: Message object with delivery details including topic, partition, offset
    """

Usage Examples

Basic Message Production

from airflow import DAG
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from datetime import datetime

def simple_producer(producer):
    """Queue four plain-text words onto ``my-topic`` and flush."""
    for word in ("Hello", "World", "from", "Airflow"):
        producer.produce("my-topic", value=word)

    # Block until every queued message has been handed off for delivery.
    producer.flush()

# Minimal manually-triggered DAG that runs the producer once per trigger.
dag = DAG(
    "kafka_producer_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,  # NOTE(review): renamed to `schedule` in Airflow 2.4+ — confirm target Airflow version
    catchup=False
)

produce_task = ProduceToTopicOperator(
    task_id="produce_messages",
    topic="my-topic",
    producer_function=simple_producer,  # called with a configured producer client
    kafka_config_id="kafka_default",
    dag=dag
)

JSON Message Production

import json
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

def json_producer(producer):
    """Publish keyed JSON payloads (with a content-type header) to ``user-events``."""
    data_records = [
        {"user_id": 1, "action": "login", "timestamp": "2023-01-01T10:00:00Z"},
        {"user_id": 2, "action": "purchase", "timestamp": "2023-01-01T10:01:00Z"},
        {"user_id": 1, "action": "logout", "timestamp": "2023-01-01T10:02:00Z"}
    ]

    for record in data_records:
        # Keying by user_id keeps one user's events on one partition.
        producer.produce(
            "user-events",
            key=str(record["user_id"]),
            value=json.dumps(record),
            headers={"content-type": "application/json"}
        )

    producer.flush()

# Task that runs json_producer against the default Kafka connection.
produce_json_task = ProduceToTopicOperator(
    task_id="produce_json_data",
    topic="user-events", 
    producer_function=json_producer,
    kafka_config_id="kafka_default"
)

Parameterized Production

from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

def parameterized_producer(producer, batch_size, message_prefix):
    """Queue ``batch_size`` sequentially numbered messages onto ``batch-topic``."""
    for index in range(batch_size):
        # Zero-padded sequence number keeps messages lexically sortable.
        producer.produce("batch-topic", value=f"{message_prefix}-{index:04d}")

    producer.flush()
    print(f"Produced {batch_size} messages with prefix '{message_prefix}'")

produce_batch_task = ProduceToTopicOperator(
    task_id="produce_batch",
    topic="batch-topic",
    producer_function=parameterized_producer,
    producer_function_args=[100, "BATCH"],  # batch_size=100, message_prefix="BATCH"
    kafka_config_id="kafka_default"
)

Custom Delivery Callback

from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

def custom_delivery_callback(err, msg):
    """Report the outcome of one delivery attempt (err is None on success)."""
    if err is None:
        print(f"Message delivered to {msg.topic()}[{msg.partition()}] at offset {msg.offset()}")
    else:
        print(f"Message delivery failed: {err}")

def producer_with_callback(producer):
    """Queue five messages, each carrying a per-message delivery callback."""
    for seq in range(5):
        producer.produce(
            "callback-topic",
            value=f"Message {seq}",
            callback=custom_delivery_callback
        )

    # Serve pending delivery reports, then wait for outstanding messages.
    producer.poll(1.0)
    producer.flush()

produce_with_callback = ProduceToTopicOperator(
    task_id="produce_with_callback",
    topic="callback-topic",
    producer_function=producer_with_callback,
    delivery_callback="custom_delivery_callback",  # referenced by name, not by object
    kafka_config_id="kafka_default"
)

Asynchronous Production

from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

def async_producer(producer):
    """Fire-and-forget: queue 1000 keyed messages, polling only occasionally."""
    for seq in range(1000):
        producer.produce(
            "high-volume-topic",
            value=f"Message {seq}",
            key=str(seq % 10)  # Distribute across partitions
        )

        # Every 100th message, do a non-blocking poll so queued
        # delivery reports get serviced without stalling throughput.
        if seq % 100 == 0:
            producer.poll(0)

    producer.flush()

async_produce_task = ProduceToTopicOperator(
    task_id="async_produce",
    topic="high-volume-topic",
    producer_function=async_producer,
    synchronous=False,  # Don't wait for individual confirmations
    kafka_config_id="kafka_default"
)

Using Producer Hook Directly

from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

def custom_producer_logic():
    """Use producer hook for advanced scenarios.

    Obtains a producer via KafkaProducerHook, queues ten messages with
    explicit partition assignment, and flushes before returning.
    Raises whatever the produce/flush calls raise, after logging.
    """
    hook = KafkaProducerHook(kafka_config_id="kafka_default")
    producer = hook.get_producer()

    try:
        # Custom production logic. confluent-kafka's produce() enqueues
        # asynchronously and returns None — there is no future to keep.
        for i in range(10):
            producer.produce(
                "custom-topic",
                value=f"Custom message {i}",
                partition=i % 3  # Explicit partition assignment
            )

        # Wait (up to 10s) for all queued messages to be delivered.
        # flush() is also the correct teardown: confluent_kafka.Producer
        # has no close() method, so the original `finally: producer.close()`
        # would have raised AttributeError on every run.
        producer.flush(timeout=10)

    except Exception as e:
        print(f"Production failed: {e}")
        raise

# Wrap the hook-based logic in a plain PythonOperator task.
hook_task = PythonOperator(
    task_id="custom_producer",
    python_callable=custom_producer_logic
)

Advanced Configuration

Producer Configuration via Connection

# Connection extra configuration for optimized production.
# Keys are librdkafka producer settings — presumably stored as the Kafka
# connection's "extra" JSON (values given as strings); TODO confirm.
{
    "bootstrap.servers": "kafka:9092",
    "acks": "all",                    # Wait for all replicas
    "retries": "2147483647",          # Retry indefinitely (max int32)
    "batch.size": "16384",            # Batch size in bytes
    "linger.ms": "5",                 # Wait up to 5ms for batching
    "compression.type": "snappy",     # Enable compression
    "max.in.flight.requests.per.connection": "5",
    "enable.idempotence": "true"      # Exactly-once semantics
}

Error Handling in Producer Functions

def robust_producer(producer):
    """Producer with comprehensive error handling.

    Sends 100 keyed messages to "robust-topic", tracking per-message
    delivery outcomes through a callback. Raises if any messages remain
    undelivered after the final flush, or if any delivery report came
    back with an error — the original version only *printed* per-message
    failures, so a run with failed deliveries still succeeded silently.
    """
    messages_sent = 0
    failed_messages = []

    def delivery_report(err, msg):
        # Invoked once per message when its delivery attempt completes.
        nonlocal messages_sent
        if err is not None:
            failed_messages.append((msg.key(), msg.value(), str(err)))
        else:
            messages_sent += 1

    try:
        for i in range(100):
            producer.produce(
                "robust-topic",
                key=str(i),
                value=f"Message {i}",
                callback=delivery_report
            )

            # Poll periodically so delivery reports are serviced mid-batch.
            if i % 10 == 0:
                producer.poll(0.1)

        # flush() returns the number of messages still queued/in-flight.
        remaining = producer.flush(timeout=30)
        if remaining > 0:
            raise Exception(f"{remaining} messages failed to deliver")

        # Fail loudly on per-message delivery errors as well.
        if failed_messages:
            print(f"Failed messages: {failed_messages}")
            raise Exception(f"{len(failed_messages)} messages reported delivery errors")

        print(f"Successfully sent {messages_sent} messages")

    except Exception as e:
        print(f"Producer error: {e}")
        raise

robust_task = ProduceToTopicOperator(
    task_id="robust_producer",
    topic="robust-topic",
    producer_function=robust_producer,
    kafka_config_id="kafka_default"
)

Best Practices

  1. Batching: Use linger.ms and batch.size for optimal throughput
  2. Error Handling: Always implement delivery callbacks for critical data
  3. Flushing: Call producer.flush() to ensure message delivery
  4. Partitioning: Use message keys for consistent partition assignment
  5. Compression: Enable compression for better network utilization
  6. Idempotence: Enable idempotence for exactly-once delivery semantics

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka

docs

admin-operations.md

async-operations.md

connection-management.md

index.md

message-consumption.md

message-production.md

tile.json